From c0844c5a386fbbc80a5bb1ea99ec43a2180d74b5 Mon Sep 17 00:00:00 2001
From: Hao Gao
Date: Tue, 19 Sep 2017 22:45:27 -0500
Subject: [PATCH 1/6] Distribute the source array for FMM

---
 boxtree/dfmm.py | 88 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 88 insertions(+)
 create mode 100644 boxtree/dfmm.py

diff --git a/boxtree/dfmm.py b/boxtree/dfmm.py
new file mode 100644
index 0000000..79bc426
--- /dev/null
+++ b/boxtree/dfmm.py
@@ -0,0 +1,88 @@
+from __future__ import division
+
+__copyright__ = "Copyright (C) 2012 Andreas Kloeckner \
+                 Copyright (C) 2017 Hao Gao"
+
+__license__ = """
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+import logging
+logger = logging.getLogger(__name__)
+
+import numpy as np
+import hpx
+
+@hpx.create_action()
+def main(sources, num_particles_per_block):
+
+    # {{{ Distribute source array
+
+    num_particles = sources.shape[0]
+    import math
+    num_block = math.ceil(num_particles / num_particles_per_block)
+    d_sources = hpx.GlobalMemory.alloc_cyclic(num_block,
+        (num_particles_per_block, 2), sources.dtype)
+    finished_copy = hpx.And(num_block)
+    for i in range(num_block):
+        d_sources[i].set(sources[i*num_particles_per_block : (i+1)*num_particles_per_block],
+            sync='async', rsync_lco=finished_copy)
+    finished_copy.wait()
+
+    # }}}
+
+    # WIP: this is a placeholder
+    potentials = np.empty((num_particles,), dtype=float)
+    hpx.exit(array=potentials)
+
+def ddrive_fmm(traversal, expansion_wrangler, src_weights, num_particles_per_block=10000,
+        hpx_options=[]):
+    """Distributed implementation of the top-level driver routine for a fast
+    multipole calculation.
+
+    :arg traversal: A :class:`boxtree.traversal.FMMTraversalInfo` instance.
+    :arg expansion_wrangler: An object exhibiting the
+        :class:`ExpansionWranglerInterface`.
+    :arg src_weights: Source 'density/weights/charges'.
+        Passed unmodified to *expansion_wrangler*.
+    :arg hpx_options: Options for the HPX runtime. Passed directly to *hpx.init*.
+
+    Returns the potentials computed by *expansion_wrangler*.
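
The block arithmetic in main() determines how the source array maps onto the cyclically allocated global memory: every block holds num_particles_per_block sources, and the last block is generally only partially filled. A minimal NumPy-only sketch of that slicing, with hypothetical sizes and no HPX runtime required:

    import math

    import numpy as np

    num_particles = 25               # hypothetical particle count
    num_particles_per_block = 10     # hypothetical block size
    sources = np.ones((num_particles, 2))

    num_block = math.ceil(num_particles / num_particles_per_block)
    blocks = [sources[i*num_particles_per_block:(i+1)*num_particles_per_block]
              for i in range(num_block)]

    # Prints [10, 10, 5]: the final slice is shorter, which is why each
    # global-memory block above is allocated at the full block size while
    # the last .set() writes fewer rows than the block can hold.
    print([len(b) for b in blocks])
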
+ """ + wrangler = expansion_wrangler + logger.info("start fmm") + + logger.debug("reorder source weights") + src_weights = wrangler.reorder_sources(src_weights) + + logger.debug("start hpx runtime") + hpx.init(argv=hpx_options) + + # launch the main action + sources = np.stack([wrangler.tree.sources[0], wrangler.tree.sources[1]], + axis=-1) + num_particles = sources.shape[0] + potentials = hpx.run(main, sources, num_particles_per_block, + shape=(num_particles,), dtype=float) + + logger.debug("finalize hpx runtime") + hpx.finalize() + + return potentials + -- GitLab From e11f35fd0c38834eca61587e82ba620f7b24443e Mon Sep 17 00:00:00 2001 From: Hao Gao Date: Mon, 16 Oct 2017 23:55:34 -0500 Subject: [PATCH 2/6] Add a test script for distributed FMM so that the result can compare --- test/test_dfmm.py | 70 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 test/test_dfmm.py diff --git a/test/test_dfmm.py b/test/test_dfmm.py new file mode 100644 index 0000000..56db48d --- /dev/null +++ b/test/test_dfmm.py @@ -0,0 +1,70 @@ +import numpy as np +import sys +from mpi4py import MPI + +# Get MPI information +comm = MPI.COMM_WORLD +rank = comm.Get_rank() + +# Parameters +dims = 2 +nsources = 3000 +ntargets = 1000 +dtype = np.float64 + +# Generate particles and run shared-memory parallelism on rank 0 +if rank == 0: + # Configure PyOpenCL + import pyopencl as cl + ctx = cl.create_some_context() + queue = cl.CommandQueue(ctx) + + # Generate random particles and source weights + from boxtree.tools import make_normal_particle_array as p_normal + sources = p_normal(queue, nsources, dims, dtype, seed=15) + targets = p_normal(queue, ntargets, dims, dtype, seed=18) + np.array([2, 0, 0])[:dims] + + from boxtree.tools import particle_array_to_host + sources_host = particle_array_to_host(sources) + targets_host = particle_array_to_host(targets) + + from pyopencl.clrandom import PhiloxGenerator + rng = PhiloxGenerator(queue.context, seed=20) + sources_weights = rng.uniform(queue, nsources, dtype=np.float64).get() + + # Display sources and targets + if "--display" in sys.argv: + import matplotlib.pyplot as plt + plt.plot(sources_host[:, 0], sources_host[:, 1], "bo") + plt.plot(targets_host[:, 0], targets_host[:, 1], "ro") + plt.show() + + # Calculate potentials in naive algorithm + import numpy.linalg as la + distances = la.norm(sources_host.reshape(1, nsources, 2) - \ + targets_host.reshape(ntargets, 1, 2), + ord=2, axis=2) + pot_naive = np.sum(-np.log(distances)*sources_weights, axis=1) + + # Build the tree and interaction lists + from boxtree import TreeBuilder + tb = TreeBuilder(ctx) + tree, _ = tb(queue, sources, targets=targets, max_particles_in_box=30, debug=True) + + from boxtree.traversal import FMMTraversalBuilder + tg = FMMTraversalBuilder(ctx) + trav, _ = tg(queue, tree, debug=True) + trav = trav.get(queue=queue) + + # Get pyfmmlib expansion wrangler + from boxtree.pyfmmlib_integration import FMMLibExpansionWrangler + def fmm_level_to_nterms(tree, level): + return 20 + wrangler = FMMLibExpansionWrangler(trav.tree, 0, fmm_level_to_nterms=fmm_level_to_nterms) + + # Compute FMM using shared memory parallelism + from boxtree.fmm import drive_fmm + pot_fmm = drive_fmm(trav, wrangler, sources_weights)* 2 * np.pi + print(la.norm(pot_fmm - pot_naive, ord=2)) + +# Next: Compute FMM using distributed memory parallelism -- GitLab From 74e58373f2cd1e155838b4b886e8877a5329d089 Mon Sep 17 00:00:00 2001 From: Hao Gao Date: Tue, 17 Oct 2017 12:31:42 -0500 Subject: [PATCH 
From e11f35fd0c38834eca61587e82ba620f7b24443e Mon Sep 17 00:00:00 2001
From: Hao Gao
Date: Mon, 16 Oct 2017 23:55:34 -0500
Subject: [PATCH 2/6] Add a test script for distributed FMM so that the
 results can be compared

---
 test/test_dfmm.py | 70 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)
 create mode 100644 test/test_dfmm.py

diff --git a/test/test_dfmm.py b/test/test_dfmm.py
new file mode 100644
index 0000000..56db48d
--- /dev/null
+++ b/test/test_dfmm.py
@@ -0,0 +1,70 @@
+import numpy as np
+import sys
+from mpi4py import MPI
+
+# Get MPI information
+comm = MPI.COMM_WORLD
+rank = comm.Get_rank()
+
+# Parameters
+dims = 2
+nsources = 3000
+ntargets = 1000
+dtype = np.float64
+
+# Generate particles and run shared-memory parallelism on rank 0
+if rank == 0:
+    # Configure PyOpenCL
+    import pyopencl as cl
+    ctx = cl.create_some_context()
+    queue = cl.CommandQueue(ctx)
+
+    # Generate random particles and source weights
+    from boxtree.tools import make_normal_particle_array as p_normal
+    sources = p_normal(queue, nsources, dims, dtype, seed=15)
+    targets = p_normal(queue, ntargets, dims, dtype, seed=18) \
+        + np.array([2, 0, 0])[:dims]
+
+    from boxtree.tools import particle_array_to_host
+    sources_host = particle_array_to_host(sources)
+    targets_host = particle_array_to_host(targets)
+
+    from pyopencl.clrandom import PhiloxGenerator
+    rng = PhiloxGenerator(queue.context, seed=20)
+    sources_weights = rng.uniform(queue, nsources, dtype=np.float64).get()
+
+    # Display sources and targets
+    if "--display" in sys.argv:
+        import matplotlib.pyplot as plt
+        plt.plot(sources_host[:, 0], sources_host[:, 1], "bo")
+        plt.plot(targets_host[:, 0], targets_host[:, 1], "ro")
+        plt.show()
+
+    # Calculate potentials in naive algorithm
+    import numpy.linalg as la
+    distances = la.norm(sources_host.reshape(1, nsources, 2) - \
+                        targets_host.reshape(ntargets, 1, 2),
+                        ord=2, axis=2)
+    pot_naive = np.sum(-np.log(distances)*sources_weights, axis=1)
+
+    # Build the tree and interaction lists
+    from boxtree import TreeBuilder
+    tb = TreeBuilder(ctx)
+    tree, _ = tb(queue, sources, targets=targets, max_particles_in_box=30,
+                 debug=True)
+
+    from boxtree.traversal import FMMTraversalBuilder
+    tg = FMMTraversalBuilder(ctx)
+    trav, _ = tg(queue, tree, debug=True)
+    trav = trav.get(queue=queue)
+
+    # Get pyfmmlib expansion wrangler
+    from boxtree.pyfmmlib_integration import FMMLibExpansionWrangler
+    def fmm_level_to_nterms(tree, level):
+        return 20
+    wrangler = FMMLibExpansionWrangler(trav.tree, 0,
+                                       fmm_level_to_nterms=fmm_level_to_nterms)
+
+    # Compute FMM using shared memory parallelism
+    from boxtree.fmm import drive_fmm
+    pot_fmm = drive_fmm(trav, wrangler, sources_weights) * 2 * np.pi
+    print(la.norm(pot_fmm - pot_naive, ord=2))
+
+# Next: Compute FMM using distributed memory parallelism
--
GitLab

From 74e58373f2cd1e155838b4b886e8877a5329d089 Mon Sep 17 00:00:00 2001
From: Hao Gao
Date: Tue, 17 Oct 2017 12:31:42 -0500
Subject: [PATCH 3/6] Distribute source particles across all ranks

---
 boxtree/dfmm.py | 97 +++++++++++++++++++++++--------------------------
 1 file changed, 46 insertions(+), 51 deletions(-)

diff --git a/boxtree/dfmm.py b/boxtree/dfmm.py
index 79bc426..aace408 100644
--- a/boxtree/dfmm.py
+++ b/boxtree/dfmm.py
@@ -26,63 +26,58 @@ THE SOFTWARE.
 
 import logging
 logger = logging.getLogger(__name__)
 
+from mpi4py import MPI
 import numpy as np
-import hpx
 
-@hpx.create_action()
-def main(sources, num_particles_per_block):
+def drive_dfmm(traversal, expansion_wrangler, src_weights):
 
-    # {{{ Distribute source array
-
-    num_particles = sources.shape[0]
-    import math
-    num_block = math.ceil(num_particles / num_particles_per_block)
-    d_sources = hpx.GlobalMemory.alloc_cyclic(num_block,
-        (num_particles_per_block, 2), sources.dtype)
-    finished_copy = hpx.And(num_block)
-    for i in range(num_block):
-        d_sources[i].set(sources[i*num_particles_per_block : (i+1)*num_particles_per_block],
-            sync='async', rsync_lco=finished_copy)
-    finished_copy.wait()
+    # {{{ Get MPI information
 
-    # }}}
+    comm = MPI.COMM_WORLD
+    current_rank = comm.Get_rank()
+    total_rank = comm.Get_size()
 
-    # WIP: this is a placeholder
-    potentials = np.empty((num_particles,), dtype=float)
-    hpx.exit(array=potentials)
-
-def ddrive_fmm(traversal, expansion_wrangler, src_weights, num_particles_per_block=10000,
-        hpx_options=[]):
-    """Distributed implementation of the top-level driver routine for a fast
-    multipole calculation.
-
-    :arg traversal: A :class:`boxtree.traversal.FMMTraversalInfo` instance.
-    :arg expansion_wrangler: An object exhibiting the
-        :class:`ExpansionWranglerInterface`.
-    :arg src_weights: Source 'density/weights/charges'.
-        Passed unmodified to *expansion_wrangler*.
-    :arg hpx_options: Options for the HPX runtime. Passed directly to *hpx.init*.
-
-    Returns the potentials computed by *expansion_wrangler*.
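
The source distribution above pads the send buffer so every rank receives an equal-sized chunk, then trims the padding on the last rank; the padding is needed because comm.Scatter sends equal counts to all ranks. A self-contained sketch of the same pattern, runnable under an MPI launcher (e.g. "mpiexec -n 3 python scatter_sketch.py"; the script name is hypothetical):

    from mpi4py import MPI
    import numpy as np

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    nsources = 10  # deliberately not divisible by the number of ranks
    per_rank = (nsources + size - 1) // size  # ceiling division, as in drive_dfmm

    if rank == 0:
        # Pad to per_rank * size so Scatter can send equal chunks
        sendbuf = np.zeros(per_rank * size, dtype=np.float64)
        sendbuf[:nsources] = np.arange(nsources, dtype=np.float64)
    else:
        sendbuf = None

    recvbuf = np.empty(per_rank, dtype=np.float64)
    comm.Scatter(sendbuf, recvbuf, root=0)

    # Only the last rank's chunk contains padding; trim it off
    if rank == size - 1:
        recvbuf = recvbuf[:nsources - per_rank * (size - 1)]
    print(rank, recvbuf)
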
- """ - wrangler = expansion_wrangler - logger.info("start fmm") - - logger.debug("reorder source weights") - src_weights = wrangler.reorder_sources(src_weights) - - logger.debug("start hpx runtime") - hpx.init(argv=hpx_options) + # }}} - # launch the main action - sources = np.stack([wrangler.tree.sources[0], wrangler.tree.sources[1]], - axis=-1) - num_particles = sources.shape[0] - potentials = hpx.run(main, sources, num_particles_per_block, - shape=(num_particles,), dtype=float) + # {{{ Distribute problem parameters - logger.debug("finalize hpx runtime") - hpx.finalize() + if current_rank == 0: + tree = traversal.tree + parameters = {"nsources":tree.nsources, + "dimensions":tree.sources.shape[0], + "coord_dtype":tree.coord_dtype} + else: + parameters = None + parameters = comm.bcast(parameters, root=0) + + # }}} - return potentials + # {{{ Distribute source particles + num_sources_per_rank = (parameters["nsources"] + total_rank - 1) // total_rank + sources = [] + + for i in range(parameters["dimensions"]): + # Prepare send buffer + if current_rank == 0: + sendbuf = np.empty((num_sources_per_rank * total_rank,), + dtype=parameters['coord_dtype']) + sendbuf[:parameters["nsources"]] = tree.sources[i] + else: + sendbuf = None + + # Prepare receive buffer + recvbuf = np.empty((num_sources_per_rank,), dtype=parameters['coord_dtype']) + + # Scatter send buffer + comm.Scatter(sendbuf, recvbuf, root=0) + + # Trim the receive buffer for the last rank + if current_rank == total_rank - 1: + num_sources_current_rank = parameters["nsources"] - \ + num_sources_per_rank * (total_rank - 1) + sources.append(recvbuf[:num_sources_current_rank]) + else: + sources.append(recvbuf) + + # }}} -- GitLab From 2fb275099607ce65ed23a7477f8354ec2a68b20d Mon Sep 17 00:00:00 2001 From: Hao Gao Date: Wed, 18 Oct 2017 22:07:07 -0500 Subject: [PATCH 4/6] Update test script --- test/test_dfmm.py | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/test/test_dfmm.py b/test/test_dfmm.py index 56db48d..2a6c724 100644 --- a/test/test_dfmm.py +++ b/test/test_dfmm.py @@ -2,16 +2,21 @@ import numpy as np import sys from mpi4py import MPI -# Get MPI information -comm = MPI.COMM_WORLD -rank = comm.Get_rank() - # Parameters dims = 2 -nsources = 3000 -ntargets = 1000 +nsources = 30 +ntargets = 10 dtype = np.float64 +# Get the current rank +comm = MPI.COMM_WORLD +rank = comm.Get_rank() + +# Initialization +trav = None +sources_weights = None +wrangler = None + # Generate particles and run shared-memory parallelism on rank 0 if rank == 0: # Configure PyOpenCL @@ -39,7 +44,7 @@ if rank == 0: plt.plot(targets_host[:, 0], targets_host[:, 1], "ro") plt.show() - # Calculate potentials in naive algorithm + # Calculate potentials using direct evaluation import numpy.linalg as la distances = la.norm(sources_host.reshape(1, nsources, 2) - \ targets_host.reshape(ntargets, 1, 2), @@ -67,4 +72,9 @@ if rank == 0: pot_fmm = drive_fmm(trav, wrangler, sources_weights)* 2 * np.pi print(la.norm(pot_fmm - pot_naive, ord=2)) -# Next: Compute FMM using distributed memory parallelism +# Compute FMM using distributed memory parallelism +from boxtree.dfmm import drive_dfmm +# Note: The drive_dfmm interface works as follows: +# Rank 0 passes the correct trav, wrangler, and sources_weights +# All other ranks pass None to these arguments +pot_dfmm = drive_dfmm(trav, wrangler, sources_weights) -- GitLab From a55da65a2e64da27c5ca32ea40a078b839eb71cc Mon Sep 17 00:00:00 2001 From: Hao Gao Date: Sun, 22 Oct 2017 20:01:51 
From a55da65a2e64da27c5ca32ea40a078b839eb71cc Mon Sep 17 00:00:00 2001
From: Hao Gao
Date: Sun, 22 Oct 2017 20:01:51 -0500
Subject: [PATCH 5/6] Construct ancestor mask for all boxes in a rank

---
 boxtree/dfmm.py | 81 ++++++++++++++++++++++++++++++-------------------
 1 file changed, 50 insertions(+), 31 deletions(-)

diff --git a/boxtree/dfmm.py b/boxtree/dfmm.py
index aace408..35261f0 100644
--- a/boxtree/dfmm.py
+++ b/boxtree/dfmm.py
@@ -39,45 +39,64 @@ def drive_dfmm(traversal, expansion_wrangler, src_weights):
 
     # }}}
 
-    # {{{ Distribute problem parameters
+    # {{{ Distribute tree parameters
 
     if current_rank == 0:
         tree = traversal.tree
-        parameters = {"nsources": tree.nsources,
+        # TODO: distribute more parameters of the tree
+        parameters = {"sources_are_targets": tree.sources_are_targets,
+                      "sources_have_extent": tree.sources_have_extent,
+                      "nsources": tree.nsources,
+                      "nboxes": tree.box_source_starts.shape[0],
                       "dimensions": tree.sources.shape[0],
-                      "coord_dtype": tree.coord_dtype}
+                      "coord_dtype": tree.coord_dtype,
+                      "box_id_dtype": tree.box_id_dtype}
     else:
         parameters = None
     parameters = comm.bcast(parameters, root=0)
 
     # }}}
 
-    # {{{ Distribute source particles
-
-    num_sources_per_rank = (parameters["nsources"] + total_rank - 1) // total_rank
-    sources = []
-
-    for i in range(parameters["dimensions"]):
-        # Prepare send buffer
-        if current_rank == 0:
-            sendbuf = np.empty((num_sources_per_rank * total_rank,),
-                               dtype=parameters['coord_dtype'])
-            sendbuf[:parameters["nsources"]] = tree.sources[i]
-        else:
-            sendbuf = None
-
-        # Prepare receive buffer
-        recvbuf = np.empty((num_sources_per_rank,), dtype=parameters['coord_dtype'])
-
-        # Scatter send buffer
-        comm.Scatter(sendbuf, recvbuf, root=0)
-
-        # Trim the receive buffer for the last rank
-        if current_rank == total_rank - 1:
-            num_sources_current_rank = parameters["nsources"] - \
-                num_sources_per_rank * (total_rank - 1)
-            sources.append(recvbuf[:num_sources_current_rank])
-        else:
-            sources.append(recvbuf)
-
+    # {{{ Fill tree parameters into the locally essential tree
+
+    from boxtree import Tree
+    letree = Tree()
+    # TODO: add more parameters to the locally essential tree
+    letree.sources_are_targets = parameters["sources_are_targets"]
+
+    # }}}
+
+    # {{{ Construct locally essential tree mask for each rank
+
+    # Problem: the current implementation divides all boxes with targets evenly across
+    # all ranks. This scheme is subject to significant load imbalance. A better
+    # way is to assign each box a weight according to its interaction list, and
+    # then divide boxes evenly by total weight.
+
+    if current_rank == 0:
+        # mask[i][j] is true iff box j is in the locally essential tree of rank i
+        mask = np.zeros((total_rank, parameters["nboxes"]), dtype=bool)
+        target_boxes = traversal.target_boxes
+        num_boxes_per_rank = (len(target_boxes) + total_rank - 1) // total_rank
+
+        for i in range(total_rank):
+            # Get the start and end box index for rank i
+            box_start_idx = num_boxes_per_rank * i
+            if i == total_rank - 1:
+                box_end_idx = len(target_boxes)
+            else:
+                box_end_idx = num_boxes_per_rank * (i + 1)
+
+            # Mark all ancestors of the boxes of rank i
+            new_mask = np.zeros(parameters["nboxes"], dtype=bool)
+            new_mask[target_boxes[box_start_idx:box_end_idx]] = True
+            while np.count_nonzero(new_mask) != 0:
+                np.logical_or(mask[i, :], new_mask, out=mask[i, :])
+                new_mask_idx = np.nonzero(new_mask)
+                new_mask_parent_idx = tree.box_parent_ids[new_mask_idx]
+                new_mask[:] = False
+                new_mask[new_mask_parent_idx] = True
+                new_mask = np.logical_and(new_mask, np.logical_not(mask[i, :]),
+                                          out=new_mask)
+
     # }}}
--
GitLab
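
The while loop above walks each rank's boxes up the tree one level at a time: mark the current frontier in the rank's mask, replace the frontier by its parents, and drop parents that are already marked so the walk terminates at the root (whose parent is itself). A standalone sketch on a hypothetical hand-built box_parent_ids array:

    import numpy as np

    # Hypothetical tree: box 0 is the root (its own parent), boxes 1-2 are
    # its children, and boxes 3-4 are children of box 1.
    box_parent_ids = np.array([0, 0, 0, 1, 1])
    nboxes = len(box_parent_ids)

    mask = np.zeros(nboxes, dtype=bool)
    new_mask = np.zeros(nboxes, dtype=bool)
    new_mask[[3, 4]] = True  # the boxes assigned to this rank

    while np.count_nonzero(new_mask) != 0:
        np.logical_or(mask, new_mask, out=mask)
        parents = box_parent_ids[np.nonzero(new_mask)]
        new_mask[:] = False
        new_mask[parents] = True
        # Keep only parents not seen yet, so the walk stops at the root
        np.logical_and(new_mask, np.logical_not(mask), out=new_mask)

    print(np.nonzero(mask)[0])  # [0 1 3 4]: assigned boxes plus ancestors
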
From e254f9472c0c1af1d8e9a408ad78a7ba1215cea2 Mon Sep 17 00:00:00 2001
From: Hao Gao
Date: Mon, 23 Oct 2017 13:49:05 -0500
Subject: [PATCH 6/6] Generate masks for list 1 and list 2

---
 boxtree/dfmm.py | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/boxtree/dfmm.py b/boxtree/dfmm.py
index 35261f0..ebd8350 100644
--- a/boxtree/dfmm.py
+++ b/boxtree/dfmm.py
@@ -68,7 +68,7 @@ def drive_dfmm(traversal, expansion_wrangler, src_weights):
 
     # {{{ Construct locally essential tree mask for each rank
 
-    # Problem: the current implementation divides all boxes with targets evenly across
+    # Problem: the current implementation divides all boxes evenly across
     # all ranks. This scheme is subject to significant load imbalance. A better
     # way is to assign each box a weight according to its interaction list, and
     # then divide boxes evenly by total weight.
@@ -76,20 +76,19 @@ def drive_dfmm(traversal, expansion_wrangler, src_weights):
     if current_rank == 0:
         # mask[i][j] is true iff box j is in the locally essential tree of rank i
         mask = np.zeros((total_rank, parameters["nboxes"]), dtype=bool)
-        target_boxes = traversal.target_boxes
-        num_boxes_per_rank = (len(target_boxes) + total_rank - 1) // total_rank
+        num_boxes_per_rank = (parameters["nboxes"] + total_rank - 1) // total_rank
 
         for i in range(total_rank):
             # Get the start and end box index for rank i
             box_start_idx = num_boxes_per_rank * i
             if i == total_rank - 1:
-                box_end_idx = len(target_boxes)
+                box_end_idx = parameters["nboxes"]
             else:
                 box_end_idx = num_boxes_per_rank * (i + 1)
 
             # Mark all ancestors of the boxes of rank i
             new_mask = np.zeros(parameters["nboxes"], dtype=bool)
-            new_mask[target_boxes[box_start_idx:box_end_idx]] = True
+            new_mask[box_start_idx:box_end_idx] = True
             while np.count_nonzero(new_mask) != 0:
                 np.logical_or(mask[i, :], new_mask, out=mask[i, :])
                 new_mask_idx = np.nonzero(new_mask)
@@ -98,5 +97,21 @@ def drive_dfmm(traversal, expansion_wrangler, src_weights):
                 new_mask[new_mask_parent_idx] = True
                 new_mask = np.logical_and(new_mask, np.logical_not(mask[i, :]),
                                           out=new_mask)
+
+            # Generate interaction list mask for mask[i, :]
+            interaction_mask = np.zeros(parameters["nboxes"], dtype=bool)
+            box_indices = np.nonzero(mask[i, :])[0]
+            for j in range(len(box_indices)):
+                box_index = box_indices[j]
+                # List 1
+                start, end = traversal.neighbor_source_boxes_starts[box_index:box_index + 2]
+                list1_idx = traversal.neighbor_source_boxes_lists[start:end]
+                interaction_mask[list1_idx] = True
+                # List 2
+                start, end = traversal.from_sep_siblings_starts[box_index:box_index + 2]
+                list2_idx = traversal.from_sep_siblings_lists[start:end]
+                interaction_mask[list2_idx] = True
+                # List 3
 
     # }}}
--
GitLab
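
The list-1/list-2 lookups follow the CSR convention used throughout boxtree: for an index i into the relevant box list, the interacting boxes are lists[starts[i]:starts[i+1]]. A toy sketch of marking a mask from such arrays, with hand-made data (note, as a caveat, that in an actual traversal neighbor_source_boxes_starts appears to be indexed by position within traversal.target_boxes, and from_sep_siblings_starts by position within traversal.target_or_target_parent_boxes, rather than by global box id):

    import numpy as np

    # Toy CSR-style interaction lists for three boxes: the neighbors of
    # box i are neighbor_lists[neighbor_starts[i]:neighbor_starts[i + 1]].
    neighbor_starts = np.array([0, 2, 3, 5])
    neighbor_lists = np.array([1, 2, 0, 0, 1])

    mask = np.array([True, False, True])  # boxes on this rank
    interaction_mask = np.zeros(3, dtype=bool)

    for box_index in np.nonzero(mask)[0]:
        start, end = neighbor_starts[box_index:box_index + 2]
        interaction_mask[neighbor_lists[start:end]] = True

    print(np.nonzero(interaction_mask)[0])  # [0 1 2]
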