From 8ce6c9ee80f32bebc6aa6687e4c75297b02c32f5 Mon Sep 17 00:00:00 2001 From: Hao Gao Date: Fri, 8 May 2020 10:05:51 -0500 Subject: [PATCH 1/7] Broadcast the complete traversal object to all worker ranks --- boxtree/distributed/__init__.py | 111 +++++++-------- boxtree/distributed/calculation.py | 15 +- boxtree/distributed/local_tree.py | 213 ++++++----------------------- boxtree/distributed/partition.py | 6 +- 4 files changed, 106 insertions(+), 239 deletions(-) diff --git a/boxtree/distributed/__init__.py b/boxtree/distributed/__init__.py index 07e0f1f..b10d3cd 100644 --- a/boxtree/distributed/__init__.py +++ b/boxtree/distributed/__init__.py @@ -28,14 +28,10 @@ import numpy as np from boxtree.cost import FMMCostModel MPITags = dict( - DIST_TREE=0, - DIST_SOURCES=1, - DIST_TARGETS=2, - DIST_RADII=3, - DIST_WEIGHT=4, - GATHER_POTENTIALS=5, - REDUCE_POTENTIALS=6, - REDUCE_INDICES=7 + DIST_WEIGHT=1, + GATHER_POTENTIALS=2, + REDUCE_POTENTIALS=3, + REDUCE_INDICES=4 ) @@ -64,13 +60,13 @@ class DistributedFMMInfo(object): An object implementing :class:`ExpansionWranglerInterface`. *global_wrangler* contains reference to the global tree object and is used for distributing and collecting density/potential between the root - and worker ranks. This attribute is only present on the root rank. + and worker ranks. .. attribute:: local_wrangler An object implementing :class:`ExpansionWranglerInterface`. *local_wrangler* contains reference to the local tree object and is - used for local FMM operations. This attribute is present on all ranks. + used for local FMM operations. """ # TODO: Support box_target_counts_nonchild? @@ -78,83 +74,82 @@ class DistributedFMMInfo(object): self.comm = comm current_rank = comm.Get_rank() + # {{{ broadcast global traversal object + if current_rank == 0: self.global_trav = global_trav_dev.get(queue=queue) else: self.global_trav = None + self.global_trav = comm.bcast(self.global_trav, root=0) + + if current_rank != 0: + global_trav_dev = self.global_trav.to_device(queue) + global_trav_dev.tree = self.global_trav.tree.to_device(queue) + + # }}} + self.distributed_expansion_wrangler_factory = \ distributed_expansion_wrangler_factory # {{{ Get global wrangler - if current_rank == 0: - self.global_wrangler = distributed_expansion_wrangler_factory( - self.global_trav.tree - ) - else: - self.global_wrangler = None + self.global_wrangler = distributed_expansion_wrangler_factory( + self.global_trav.tree + ) # }}} - # {{{ Broadcast well_sep_is_n_away + # {{{ Partiton work - if current_rank == 0: - well_sep_is_n_away = self.global_trav.well_sep_is_n_away - else: - well_sep_is_n_away = None + # Construct default cost model if not supplied + cost_model = FMMCostModel(queue) - well_sep_is_n_away = comm.bcast(well_sep_is_n_away, root=0) + if calibration_params is None: + # TODO: should replace the calibration params with a reasonable + # deafult one + calibration_params = \ + FMMCostModel.get_unit_calibration_params() - # }}} + boxes_time = cost_model.cost_per_box( + global_trav_dev, self.global_wrangler.level_nterms, + calibration_params + ).get() - # {{{ Partiton work + from boxtree.distributed.partition import partition_work + responsible_boxes_list = partition_work( + boxes_time, self.global_trav, comm.Get_size() + ) - if current_rank == 0: - # Construct default cost model if not supplied - cost_model = FMMCostModel(queue) - - if calibration_params is None: - # TODO: should replace the calibration params with a reasonable - # deafult one - calibration_params = \ - FMMCostModel.get_unit_calibration_params() - - boxes_time = cost_model.cost_per_box( - global_trav_dev, self.global_wrangler.level_nterms, - calibration_params - ).get() - - from boxtree.distributed.partition import partition_work - responsible_boxes_list = partition_work( - boxes_time, self.global_trav, comm.Get_size() - ) - else: - responsible_boxes_list = None + # It is assumed that, even if each rank computes `responsible_boxes_list` + # independently, it should be the same across ranks, since ranks use the same + # calibration parameters. # }}} - # {{{ Compute and distribute local tree + # {{{ Compute local tree - if current_rank == 0: - from boxtree.distributed.partition import ResponsibleBoxesQuery - responsible_box_query = ResponsibleBoxesQuery(queue, self.global_trav) - else: - responsible_box_query = None + from boxtree.distributed.partition import ResponsibleBoxesQuery + responsible_box_query = ResponsibleBoxesQuery(queue, self.global_trav) from boxtree.distributed.local_tree import generate_local_tree - self.local_tree, self.local_data, self.box_bounding_box = \ - generate_local_tree(queue, self.global_trav, responsible_boxes_list, - responsible_box_query) + self.local_tree, self.src_idx, self.tgt_idx = generate_local_tree( + queue, self.global_trav, responsible_boxes_list, responsible_box_query + ) # }}} - # {{{ Compute traversal object on each process + # {{{ Compute traversal object on each rank from boxtree.distributed.local_traversal import generate_local_travs self.local_trav = generate_local_travs( - queue, self.local_tree, self.box_bounding_box, - well_sep_is_n_away=well_sep_is_n_away) + queue, self.local_tree, + box_bounding_box={ + "min": self.global_trav.box_target_bounding_box_min, + "max": self.global_trav.box_target_bounding_box_max + }, + well_sep_is_n_away=self.global_trav.well_sep_is_n_away + ) # }}} @@ -169,6 +164,6 @@ class DistributedFMMInfo(object): from boxtree.distributed.calculation import calculate_pot return calculate_pot( self.local_wrangler, self.global_wrangler, self.local_trav, - source_weights, self.local_data, + source_weights, self.src_idx, self.tgt_idx, _communicate_mpoles_via_allreduce=_communicate_mpoles_via_allreduce ) diff --git a/boxtree/distributed/calculation.py b/boxtree/distributed/calculation.py index 064fc9a..bb61a1b 100644 --- a/boxtree/distributed/calculation.py +++ b/boxtree/distributed/calculation.py @@ -295,7 +295,7 @@ def communicate_mpoles(wrangler, comm, trav, mpole_exps, return_stats=False): # {{{ Distribute source weights -def distribute_source_weights(source_weights, local_data, comm=MPI.COMM_WORLD): +def distribute_source_weights(source_weights, src_idx, comm=MPI.COMM_WORLD): """ This function transfers needed source_weights from root process to each worker process in communicator *comm*. @@ -303,7 +303,7 @@ def distribute_source_weights(source_weights, local_data, comm=MPI.COMM_WORLD): :arg source_weights: Source weights in tree order on root, None on worker processes. - :arg local_data: Returned from *generate_local_tree*. None on worker processes. + :arg src_idx: Returned from *generate_local_tree*. None on worker processes. :return Source weights needed for the current process. """ current_rank = comm.Get_rank() @@ -314,7 +314,7 @@ def distribute_source_weights(source_weights, local_data, comm=MPI.COMM_WORLD): local_src_weights = np.empty((total_rank,), dtype=object) for irank in range(total_rank): - local_src_weights[irank] = source_weights[local_data[irank].src_idx] + local_src_weights[irank] = source_weights[src_idx[irank]] if irank != 0: weight_req.append( @@ -336,7 +336,7 @@ def distribute_source_weights(source_weights, local_data, comm=MPI.COMM_WORLD): # {{{ FMM driver for calculating potentials def calculate_pot(local_wrangler, global_wrangler, local_trav, source_weights, - local_data, comm=MPI.COMM_WORLD, + src_idx, tgt_idx, comm=MPI.COMM_WORLD, _communicate_mpoles_via_allreduce=False): """ Calculate potentials for targets on distributed memory machines. @@ -369,7 +369,7 @@ def calculate_pot(local_wrangler, global_wrangler, local_trav, source_weights, source_weights = source_weights[global_wrangler.tree.user_source_ids] local_src_weights = distribute_source_weights( - source_weights, local_data, comm=comm + source_weights, src_idx, comm=comm ) # }}} @@ -513,7 +513,8 @@ def calculate_pot(local_wrangler, global_wrangler, local_trav, source_weights, for irank in range(1, total_rank): potentials_all_ranks[irank] = np.empty( - (local_data[irank].ntargets,), dtype=potentials.dtype) + tgt_idx[irank].shape, dtype=potentials.dtype + ) comm.Recv([potentials_all_ranks[irank], potentials_mpi_type], source=irank, tag=MPITags["GATHER_POTENTIALS"]) @@ -530,7 +531,7 @@ def calculate_pot(local_wrangler, global_wrangler, local_trav, source_weights, dtype=potentials.dtype) for irank in range(total_rank): - potentials[local_data[irank].tgt_idx] = potentials_all_ranks[irank] + potentials[tgt_idx[irank]] = potentials_all_ranks[irank] logger.debug("reorder potentials") result = global_wrangler.reorder_potentials(potentials) diff --git a/boxtree/distributed/local_tree.py b/boxtree/distributed/local_tree.py index b904e93..02171f6 100644 --- a/boxtree/distributed/local_tree.py +++ b/boxtree/distributed/local_tree.py @@ -31,7 +31,6 @@ from boxtree import Tree from mpi4py import MPI import time import numpy as np -from boxtree.distributed import MPITags import logging logger = logging.getLogger(__name__) @@ -205,17 +204,6 @@ def get_fetch_local_particles_knls(context, global_tree): ) -LocalData = namedtuple( - 'LocalData', - [ - 'nsources', - 'ntargets', - 'src_idx', - 'tgt_idx' - ] -) - - def fetch_local_particles(queue, global_tree, src_box_mask, tgt_box_mask, local_tree, knls): """ This helper function fetches particles needed for worker processes, and @@ -480,18 +468,7 @@ def fetch_local_particles(queue, global_tree, src_box_mask, tgt_box_mask, local_ # }}} - # {{{ Fetch fields to local_data - - local_data = LocalData( - nsources=local_nsources, - ntargets=local_ntargets, - src_idx=src_idx, - tgt_idx=tgt_idx - ) - - # }}} - - return local_tree, local_data + return local_tree, src_idx, tgt_idx class LocalTreeBuilder: @@ -517,7 +494,7 @@ class LocalTreeBuilder: local_tree.user_source_ids = None local_tree.sorted_target_ids = None - local_tree, local_data = fetch_local_particles( + local_tree, src_idx, tgt_idx = fetch_local_particles( self.queue, self.global_tree, src_boxes_mask, @@ -532,7 +509,7 @@ class LocalTreeBuilder: local_tree.__class__ = LocalTree - return local_tree, local_data + return local_tree, src_idx, tgt_idx class LocalTree(Tree): @@ -568,92 +545,46 @@ class LocalTree(Tree): def generate_local_tree(queue, traversal, responsible_boxes_list, - responsible_box_query, comm=MPI.COMM_WORLD, - no_targets=False): + responsible_box_query, comm=MPI.COMM_WORLD): # Get MPI information - current_rank = comm.Get_rank() - total_rank = comm.Get_size() - - if current_rank == 0: - start_time = time.time() - - if current_rank == 0: - local_data = np.empty((total_rank,), dtype=object) - else: - local_data = None - - if current_rank == 0: - tree = traversal.tree - - local_tree_builder = LocalTreeBuilder(tree, queue) - - box_mpole_is_used = cl.array.empty( - queue, (total_rank, tree.nboxes,), dtype=np.int8 - ) - - # request objects for non-blocking communication - tree_req = [] - particles_req = [] - - # buffer holding communication data so that it is not garbage collected - local_tree = np.empty((total_rank,), dtype=object) - local_targets = np.empty((total_rank,), dtype=object) - local_sources = np.empty((total_rank,), dtype=object) - local_target_radii = np.empty((total_rank,), dtype=object) - - for irank in range(total_rank): - - (responsible_boxes_mask, ancestor_boxes, src_boxes_mask, - box_mpole_is_used[irank]) = \ - responsible_box_query.get_boxes_mask(responsible_boxes_list[irank]) - - local_tree[irank], local_data[irank] = \ - local_tree_builder.from_global_tree( - responsible_boxes_list[irank], responsible_boxes_mask, - src_boxes_mask, ancestor_boxes - ) + rank = comm.Get_rank() + size = comm.Get_size() - # master process does not need to communicate with itself - if irank == 0: - continue + start_time = time.time() - # {{{ Peel sources and targets off tree + tree = traversal.tree + local_tree_builder = LocalTreeBuilder(tree, queue) - local_targets[irank] = local_tree[irank].targets - local_tree[irank].targets = None + (responsible_boxes_mask, ancestor_boxes, src_boxes_mask, box_mpole_is_used) = \ + responsible_box_query.get_boxes_mask(responsible_boxes_list[rank]) - local_sources[irank] = local_tree[irank].sources - local_tree[irank].sources = None - - if tree.targets_have_extent: - local_target_radii[irank] = local_tree[irank].target_radii - local_tree[irank].target_radii = None - - # }}} + local_tree, src_idx, tgt_idx = local_tree_builder.from_global_tree( + responsible_boxes_list[rank], responsible_boxes_mask, src_boxes_mask, + ancestor_boxes + ) - # Send the local tree skeleton without sources and targets - tree_req.append(comm.isend( - local_tree[irank], dest=irank, tag=MPITags["DIST_TREE"])) + # {{{ compute the users of multipole expansions of each box on root rank - # Send the sources and targets - particles_req.append(comm.Isend( - local_sources[irank], dest=irank, tag=MPITags["DIST_SOURCES"])) + box_mpole_is_used_all_ranks = None + if rank == 0: + box_mpole_is_used_all_ranks = np.empty( + (size, tree.nboxes), dtype=box_mpole_is_used.dtype + ) + comm.Gather(box_mpole_is_used.get(), box_mpole_is_used_all_ranks, root=0) - if not no_targets: - particles_req.append(comm.Isend( - local_targets[irank], dest=irank, tag=MPITags["DIST_TARGETS"])) + box_to_user_starts = None + box_to_user_lists = None - if tree.targets_have_extent: - particles_req.append(comm.Isend( - local_target_radii[irank], dest=irank, - tag=MPITags["DIST_RADII"]) - ) + if rank == 0: + box_mpole_is_used_all_ranks = cl.array.to_device( + queue, box_mpole_is_used_all_ranks + ) from boxtree.tools import MaskCompressorKernel matcompr = MaskCompressorKernel(queue.context) (box_to_user_starts, box_to_user_lists, evt) = \ - matcompr(queue, box_mpole_is_used.transpose(), + matcompr(queue, box_mpole_is_used_all_ranks.transpose(), list_dtype=np.int32) cl.wait_for_events([evt]) @@ -664,83 +595,23 @@ def generate_local_tree(queue, traversal, responsible_boxes_list, logger.debug("computing box_to_user: done") - # Receive the local tree from root - if current_rank == 0: - MPI.Request.Waitall(tree_req) - local_tree = local_tree[0] - else: - local_tree = comm.recv(source=0, tag=MPITags["DIST_TREE"]) - - # Receive sources and targets - if current_rank == 0: - MPI.Request.Waitall(particles_req) - else: - reqs = [] - - local_tree.sources = np.empty( - (local_tree.dimensions, local_tree.nsources), - dtype=local_tree.coord_dtype - ) - reqs.append(comm.Irecv( - local_tree.sources, source=0, tag=MPITags["DIST_SOURCES"])) - - if no_targets: - local_tree.targets = None - if local_tree.targets_have_extent: - local_tree.target_radii = None - else: - local_tree.targets = np.empty( - (local_tree.dimensions, local_tree.ntargets), - dtype=local_tree.coord_dtype - ) - - reqs.append(comm.Irecv( - local_tree.targets, source=0, tag=MPITags["DIST_TARGETS"])) - - if local_tree.targets_have_extent: - local_tree.target_radii = np.empty( - (local_tree.ntargets,), - dtype=local_tree.coord_dtype - ) - - reqs.append(comm.Irecv( - local_tree.target_radii, source=0, tag=MPITags["DIST_RADII"])) - - MPI.Request.Waitall(reqs) - - # Receive box extent - if current_rank == 0: - box_target_bounding_box_min = traversal.box_target_bounding_box_min - box_target_bounding_box_max = traversal.box_target_bounding_box_max - else: - box_target_bounding_box_min = np.empty( - (local_tree.dimensions, local_tree.aligned_nboxes), - dtype=local_tree.coord_dtype - ) - box_target_bounding_box_max = np.empty( - (local_tree.dimensions, local_tree.aligned_nboxes), - dtype=local_tree.coord_dtype - ) - comm.Bcast(box_target_bounding_box_min, root=0) - comm.Bcast(box_target_bounding_box_max, root=0) - box_bounding_box = { - "min": box_target_bounding_box_min, - "max": box_target_bounding_box_max - } - - if current_rank != 0: - box_to_user_starts = None - box_to_user_lists = None - box_to_user_starts = comm.bcast(box_to_user_starts, root=0) box_to_user_lists = comm.bcast(box_to_user_lists, root=0) local_tree.box_to_user_starts = box_to_user_starts local_tree.box_to_user_lists = box_to_user_lists - if current_rank == 0: - logger.info("Distribute local tree in {} sec.".format( - str(time.time() - start_time)) - ) + # }}} + + # {{ Gather source indices and target indices of each rank + + src_idx_all_ranks = comm.gather(src_idx, root=0) + tgt_idx_all_ranks = comm.gather(tgt_idx, root=0) + + # }}} + + logger.info("Generate local tree on rank {} in {} sec.".format( + rank, str(time.time() - start_time) + )) - return local_tree, local_data, box_bounding_box + return local_tree, src_idx_all_ranks, tgt_idx_all_ranks diff --git a/boxtree/distributed/partition.py b/boxtree/distributed/partition.py index fc15a7e..bdf052f 100644 --- a/boxtree/distributed/partition.py +++ b/boxtree/distributed/partition.py @@ -147,8 +147,6 @@ class ResponsibleBoxesQuery(object): self.from_sep_bigger_lists_dev = cl.array.to_device( queue, traversal.from_sep_bigger_lists) - # }}} - if self.tree.targets_have_extent: # list 3 close if traversal.from_sep_close_smaller_starts is not None: @@ -164,6 +162,8 @@ class ResponsibleBoxesQuery(object): self.from_sep_close_bigger_lists_dev = cl.array.to_device( queue, traversal.from_sep_close_bigger_lists) + # }}} + # helper kernel for ancestor box query self.mark_parent_knl = cl.elementwise.ElementwiseKernel( queue.context, @@ -323,7 +323,7 @@ class ResponsibleBoxesQuery(object): def get_boxes_mask(self, responsible_boxes_list): """ Given a list of responsible boxes for a process, calculates the following - three masks: + four masks: responsible_box_mask: Current process will evaluate target potentials and multipole expansions in these boxes. Sources and targets in these boxes -- GitLab From 51bf728cd41f5252b68369b874ee04ac7cafd442 Mon Sep 17 00:00:00 2001 From: Hao Gao Date: Sun, 10 May 2020 13:18:50 -0500 Subject: [PATCH 2/7] Keep level_start_box_nrs in host memory for to_device --- boxtree/tools.py | 12 +++++++++--- boxtree/tree.py | 7 +++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/boxtree/tools.py b/boxtree/tools.py index 5dc5831..23193d7 100644 --- a/boxtree/tools.py +++ b/boxtree/tools.py @@ -270,7 +270,7 @@ class DeviceDataRecord(Record): instances on the host. """ - def _transform_arrays(self, f): + def _transform_arrays(self, f, exclude_fields=frozenset()): result = {} def transform_val(val): @@ -290,6 +290,9 @@ class DeviceDataRecord(Record): return f(val) for field_name in self.__class__.fields: + if field_name in exclude_fields: + continue + try: attr = getattr(self, field_name) except AttributeError: @@ -335,9 +338,12 @@ class DeviceDataRecord(Record): return self._transform_arrays(try_with_queue) - def to_device(self, queue): + def to_device(self, queue, exclude_fields=frozenset()): """ Return a copy of `self` in all :class:`numpy.ndarray` arrays are transferred to device memory as :class:`pyopencl.array.Array` objects. + + :arg exclude_fields: a :class:`frozenset` containing fields excluding from + transferring to the device memory. """ def _to_device(attr): @@ -346,7 +352,7 @@ class DeviceDataRecord(Record): else: return attr - return self._transform_arrays(_to_device) + return self._transform_arrays(_to_device, exclude_fields) # }}} diff --git a/boxtree/tree.py b/boxtree/tree.py index c09ee65..f4614b4 100644 --- a/boxtree/tree.py +++ b/boxtree/tree.py @@ -389,6 +389,13 @@ class Tree(DeviceDataRecord): # }}} + def to_device(self, queue, exclude_fields=frozenset()): + # level_start_box_nrs should remain in host memory + exclude_fields = set(exclude_fields) + exclude_fields.add("level_start_box_nrs") + + return super().to_device(queue, frozenset(exclude_fields)) + # }}} -- GitLab From 2e60bb642ded0fd1a4916c3bf9f1b8b0589ddc7c Mon Sep 17 00:00:00 2001 From: Hao Gao Date: Sun, 10 May 2020 14:21:08 -0500 Subject: [PATCH 3/7] Python2 compatibility --- boxtree/tree.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/boxtree/tree.py b/boxtree/tree.py index f4614b4..a67a7c2 100644 --- a/boxtree/tree.py +++ b/boxtree/tree.py @@ -394,7 +394,7 @@ class Tree(DeviceDataRecord): exclude_fields = set(exclude_fields) exclude_fields.add("level_start_box_nrs") - return super().to_device(queue, frozenset(exclude_fields)) + return super(Tree, self).to_device(queue, frozenset(exclude_fields)) # }}} -- GitLab From 4eaf9f28c1eb68db09c8102264d5515d7db696fc Mon Sep 17 00:00:00 2001 From: Hao Gao Date: Sun, 10 May 2020 23:59:22 -0500 Subject: [PATCH 4/7] Broadcast tree instead of traversal --- boxtree/distributed/__init__.py | 30 +++++++++++--------------- boxtree/distributed/local_traversal.py | 12 ++--------- test/test_distributed.py | 22 +++++++++---------- 3 files changed, 25 insertions(+), 39 deletions(-) diff --git a/boxtree/distributed/__init__.py b/boxtree/distributed/__init__.py index b10d3cd..48c4e88 100644 --- a/boxtree/distributed/__init__.py +++ b/boxtree/distributed/__init__.py @@ -51,7 +51,8 @@ def dtype_to_mpi(dtype): class DistributedFMMInfo(object): - def __init__(self, queue, global_trav_dev, + def __init__(self, queue, global_tree_dev, + traversal_builder, distributed_expansion_wrangler_factory, calibration_params=None, comm=MPI.COMM_WORLD): """ @@ -69,23 +70,19 @@ class DistributedFMMInfo(object): used for local FMM operations. """ - # TODO: Support box_target_counts_nonchild? - self.comm = comm current_rank = comm.Get_rank() - # {{{ broadcast global traversal object + # {{{ Broadcast global tree + global_tree = None if current_rank == 0: - self.global_trav = global_trav_dev.get(queue=queue) - else: - self.global_trav = None - - self.global_trav = comm.bcast(self.global_trav, root=0) + global_tree = global_tree_dev.get(queue) + global_tree = comm.bcast(global_tree, root=0) + global_tree_dev = global_tree.to_device(queue).with_queue(queue) - if current_rank != 0: - global_trav_dev = self.global_trav.to_device(queue) - global_trav_dev.tree = self.global_trav.tree.to_device(queue) + global_trav_dev, _ = traversal_builder(queue, global_tree_dev) + self.global_trav = global_trav_dev.get(queue) # }}} @@ -94,9 +91,7 @@ class DistributedFMMInfo(object): # {{{ Get global wrangler - self.global_wrangler = distributed_expansion_wrangler_factory( - self.global_trav.tree - ) + self.global_wrangler = distributed_expansion_wrangler_factory(global_tree) # }}} @@ -143,12 +138,11 @@ class DistributedFMMInfo(object): from boxtree.distributed.local_traversal import generate_local_travs self.local_trav = generate_local_travs( - queue, self.local_tree, + queue, self.local_tree, traversal_builder, box_bounding_box={ "min": self.global_trav.box_target_bounding_box_min, "max": self.global_trav.box_target_bounding_box_max - }, - well_sep_is_n_away=self.global_trav.well_sep_is_n_away + } ) # }}} diff --git a/boxtree/distributed/local_traversal.py b/boxtree/distributed/local_traversal.py index 895efed..773f286 100644 --- a/boxtree/distributed/local_traversal.py +++ b/boxtree/distributed/local_traversal.py @@ -33,8 +33,7 @@ logger = logging.getLogger(__name__) def generate_local_travs( - queue, local_tree, box_bounding_box=None, - well_sep_is_n_away=1, from_sep_smaller_crit=None, + queue, local_tree, traversal_builder, box_bounding_box=None, merge_close_lists=False): start_time = time.time() @@ -117,14 +116,7 @@ def generate_local_travs( modify_own_sources_knl(d_tree.responsible_boxes_list, local_box_flags) modify_child_sources_knl(d_tree.ancestor_mask, local_box_flags) - from boxtree.traversal import FMMTraversalBuilder - tg = FMMTraversalBuilder( - queue.context, - well_sep_is_n_away=well_sep_is_n_away, - from_sep_smaller_crit=from_sep_smaller_crit - ) - - d_local_trav, _ = tg( + d_local_trav, _ = traversal_builder( queue, d_tree, debug=True, box_bounding_box=box_bounding_box, local_box_flags=local_box_flags diff --git a/test/test_distributed.py b/test/test_distributed.py index 443306f..6a7aca8 100644 --- a/test/test_distributed.py +++ b/test/test_distributed.py @@ -26,7 +26,7 @@ def _test_against_shared(dims, nsources, ntargets, dtype): rank = comm.Get_rank() # Initialize arguments for worker processes - d_trav = None + tree = None sources_weights = None helmholtz_k = 0 @@ -37,6 +37,9 @@ def _test_against_shared(dims, nsources, ntargets, dtype): def fmm_level_to_nterms(tree, level): return max(level, 3) + from boxtree.traversal import FMMTraversalBuilder + tg = FMMTraversalBuilder(ctx, well_sep_is_n_away=2) + # Generate particles and run shared-memory parallelism on rank 0 if rank == 0: @@ -60,8 +63,6 @@ def _test_against_shared(dims, nsources, ntargets, dtype): tree, _ = tb(queue, sources, targets=targets, target_radii=target_radii, stick_out_factor=0.25, max_particles_in_box=30, debug=True) - from boxtree.traversal import FMMTraversalBuilder - tg = FMMTraversalBuilder(ctx, well_sep_is_n_away=2) d_trav, _ = tg(queue, tree, debug=True) trav = d_trav.get(queue=queue) @@ -84,7 +85,8 @@ def _test_against_shared(dims, nsources, ntargets, dtype): from boxtree.distributed import DistributedFMMInfo distribued_fmm_info = DistributedFMMInfo( - queue, d_trav, distributed_expansion_wrangler_factory, comm=comm) + queue, tree, tg, distributed_expansion_wrangler_factory, comm=comm + ) pot_dfmm = distribued_fmm_info.drive_dfmm(sources_weights) if rank == 0: @@ -132,7 +134,7 @@ def _test_constantone(dims, nsources, ntargets, dtype): rank = comm.Get_rank() # Initialization - d_trav = None + tree = None sources_weights = None # Configure PyOpenCL @@ -140,6 +142,9 @@ def _test_constantone(dims, nsources, ntargets, dtype): ctx = cl.create_some_context() queue = cl.CommandQueue(ctx) + from boxtree.traversal import FMMTraversalBuilder + tg = FMMTraversalBuilder(ctx) + if rank == 0: # Generate random particles @@ -157,17 +162,12 @@ def _test_constantone(dims, nsources, ntargets, dtype): tree, _ = tb(queue, sources, targets=targets, max_particles_in_box=30, debug=True) - # Build global interaction lists - from boxtree.traversal import FMMTraversalBuilder - tg = FMMTraversalBuilder(ctx) - d_trav, _ = tg(queue, tree, debug=True) - def constantone_expansion_wrangler_factory(tree): return ConstantOneExpansionWrangler(tree) from boxtree.distributed import DistributedFMMInfo distributed_fmm_info = DistributedFMMInfo( - queue, d_trav, constantone_expansion_wrangler_factory, comm=MPI.COMM_WORLD + queue, tree, tg, constantone_expansion_wrangler_factory, comm=MPI.COMM_WORLD ) pot_dfmm = distributed_fmm_info.drive_dfmm( -- GitLab From 8e5e5c959feea485db3464092dcdeedfe872f944 Mon Sep 17 00:00:00 2001 From: "[6~" Date: Mon, 11 May 2020 17:06:40 -0500 Subject: [PATCH 5/7] Placate flake8 3.8 --- boxtree/tools.py | 6 +++--- boxtree/tree_build.py | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/boxtree/tools.py b/boxtree/tools.py index 23193d7..9c74d96 100644 --- a/boxtree/tools.py +++ b/boxtree/tools.py @@ -46,9 +46,9 @@ VectorArg = partial(_VectorArg, with_offset=True) AXIS_NAMES = ("x", "y", "z", "w") -def padded_bin(i, l): - """Format *i* as binary number, pad it to length *l*.""" - return bin(i)[2:].rjust(l, "0") +def padded_bin(i, nbits): + """Format *i* as binary number, pad it to length *nbits*.""" + return bin(i)[2:].rjust(nbits, "0") # NOTE: Order of positional args should match GappyCopyAndMapKernel.__call__() diff --git a/boxtree/tree_build.py b/boxtree/tree_build.py index ff944c6..8f652aa 100644 --- a/boxtree/tree_build.py +++ b/boxtree/tree_build.py @@ -718,8 +718,8 @@ class TreeBuilder(object): # Currently undocumented. lr_lookbehind_levels = kwargs.get("lr_lookbehind", 1) minimal_new_level_length += sum( - 2**(l*dimensions) * new_level_leaf_counts[level - l] - for l in range(1, 1 + min(level, lr_lookbehind_levels))) + 2**(lev*dimensions) * new_level_leaf_counts[level - lev] + for lev in range(1, 1 + min(level, lr_lookbehind_levels))) nboxes_minimal = \ sum(minimal_upper_level_lengths) + minimal_new_level_length @@ -740,8 +740,8 @@ class TreeBuilder(object): # Recompute the level padding. for ulevel in range(level): upper_level_padding[ulevel] = sum( - 2**(l*dimensions) * new_level_leaf_counts[ulevel - l] - for l in range( + 2**(lev*dimensions) * new_level_leaf_counts[ulevel - lev] + for lev in range( 1, 1 + min(ulevel, lr_lookbehind_levels))) new_upper_level_unused_box_counts = np.max( -- GitLab From 9937ac572b9a57491a5453e60975d913595f9e71 Mon Sep 17 00:00:00 2001 From: Hao Gao Date: Mon, 11 May 2020 17:41:21 -0500 Subject: [PATCH 6/7] Change generate_local_tree interface for accepting a tree instead of a traversal --- boxtree/distributed/__init__.py | 2 +- boxtree/distributed/calculation.py | 3 ++- boxtree/distributed/local_tree.py | 3 +-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/boxtree/distributed/__init__.py b/boxtree/distributed/__init__.py index 48c4e88..9c5d5e8 100644 --- a/boxtree/distributed/__init__.py +++ b/boxtree/distributed/__init__.py @@ -129,7 +129,7 @@ class DistributedFMMInfo(object): from boxtree.distributed.local_tree import generate_local_tree self.local_tree, self.src_idx, self.tgt_idx = generate_local_tree( - queue, self.global_trav, responsible_boxes_list, responsible_box_query + queue, global_tree, responsible_boxes_list, responsible_box_query ) # }}} diff --git a/boxtree/distributed/calculation.py b/boxtree/distributed/calculation.py index bb61a1b..a85f5d9 100644 --- a/boxtree/distributed/calculation.py +++ b/boxtree/distributed/calculation.py @@ -349,7 +349,8 @@ def calculate_pot(local_wrangler, global_wrangler, local_trav, source_weights, This argument is None on worker ranks. :param local_trav: Local traversal object returned from *generate_local_travs*. :param source_weights: Source weights for FMM. None on worker ranks. - :param local_data: LocalData object returned from *generate_local_tree*. + :param src_idx: returned from *generate_local_tree*. + :param tgt_idx: returned from *generate_local_tree*. :param comm: MPI communicator. :param _communicate_mpoles_via_allreduce: Use MPI allreduce for communicating multipole expressions. Using MPI allreduce is slower but might be helpful for diff --git a/boxtree/distributed/local_tree.py b/boxtree/distributed/local_tree.py index 02171f6..b0b145a 100644 --- a/boxtree/distributed/local_tree.py +++ b/boxtree/distributed/local_tree.py @@ -544,7 +544,7 @@ class LocalTree(Tree): return self._dimensions -def generate_local_tree(queue, traversal, responsible_boxes_list, +def generate_local_tree(queue, tree, responsible_boxes_list, responsible_box_query, comm=MPI.COMM_WORLD): # Get MPI information @@ -553,7 +553,6 @@ def generate_local_tree(queue, traversal, responsible_boxes_list, start_time = time.time() - tree = traversal.tree local_tree_builder = LocalTreeBuilder(tree, queue) (responsible_boxes_mask, ancestor_boxes, src_boxes_mask, box_mpole_is_used) = \ -- GitLab From dd7905a641207e36f8b1674bf3a17f62597f1433 Mon Sep 17 00:00:00 2001 From: Hao Gao Date: Tue, 12 May 2020 00:35:53 -0500 Subject: [PATCH 7/7] Gather source and target indices outside generate_local_tree --- boxtree/distributed/__init__.py | 9 ++++++++- boxtree/distributed/calculation.py | 14 ++++++++------ boxtree/distributed/local_tree.py | 9 +-------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/boxtree/distributed/__init__.py b/boxtree/distributed/__init__.py index 9c5d5e8..0154343 100644 --- a/boxtree/distributed/__init__.py +++ b/boxtree/distributed/__init__.py @@ -134,6 +134,13 @@ class DistributedFMMInfo(object): # }}} + # {{ Gather source indices and target indices of each rank + + self.src_idx_all_ranks = comm.gather(self.src_idx, root=0) + self.tgt_idx_all_ranks = comm.gather(self.tgt_idx, root=0) + + # }}} + # {{{ Compute traversal object on each rank from boxtree.distributed.local_traversal import generate_local_travs @@ -158,6 +165,6 @@ class DistributedFMMInfo(object): from boxtree.distributed.calculation import calculate_pot return calculate_pot( self.local_wrangler, self.global_wrangler, self.local_trav, - source_weights, self.src_idx, self.tgt_idx, + source_weights, self.src_idx_all_ranks, self.tgt_idx_all_ranks, _communicate_mpoles_via_allreduce=_communicate_mpoles_via_allreduce ) diff --git a/boxtree/distributed/calculation.py b/boxtree/distributed/calculation.py index a85f5d9..df97ec2 100644 --- a/boxtree/distributed/calculation.py +++ b/boxtree/distributed/calculation.py @@ -336,7 +336,7 @@ def distribute_source_weights(source_weights, src_idx, comm=MPI.COMM_WORLD): # {{{ FMM driver for calculating potentials def calculate_pot(local_wrangler, global_wrangler, local_trav, source_weights, - src_idx, tgt_idx, comm=MPI.COMM_WORLD, + src_idx_all_ranks, tgt_idx_all_ranks, comm=MPI.COMM_WORLD, _communicate_mpoles_via_allreduce=False): """ Calculate potentials for targets on distributed memory machines. @@ -349,8 +349,10 @@ def calculate_pot(local_wrangler, global_wrangler, local_trav, source_weights, This argument is None on worker ranks. :param local_trav: Local traversal object returned from *generate_local_travs*. :param source_weights: Source weights for FMM. None on worker ranks. - :param src_idx: returned from *generate_local_tree*. - :param tgt_idx: returned from *generate_local_tree*. + :param src_idx_all_ranks: gathered from the return value of + *generate_local_tree*. Only significant on root rank. + :param tgt_idx_all_ranks: gathered from the return value of + *generate_local_tree*. Only significant on root rank. :param comm: MPI communicator. :param _communicate_mpoles_via_allreduce: Use MPI allreduce for communicating multipole expressions. Using MPI allreduce is slower but might be helpful for @@ -370,7 +372,7 @@ def calculate_pot(local_wrangler, global_wrangler, local_trav, source_weights, source_weights = source_weights[global_wrangler.tree.user_source_ids] local_src_weights = distribute_source_weights( - source_weights, src_idx, comm=comm + source_weights, src_idx_all_ranks, comm=comm ) # }}} @@ -514,7 +516,7 @@ def calculate_pot(local_wrangler, global_wrangler, local_trav, source_weights, for irank in range(1, total_rank): potentials_all_ranks[irank] = np.empty( - tgt_idx[irank].shape, dtype=potentials.dtype + tgt_idx_all_ranks[irank].shape, dtype=potentials.dtype ) comm.Recv([potentials_all_ranks[irank], potentials_mpi_type], @@ -532,7 +534,7 @@ def calculate_pot(local_wrangler, global_wrangler, local_trav, source_weights, dtype=potentials.dtype) for irank in range(total_rank): - potentials[tgt_idx[irank]] = potentials_all_ranks[irank] + potentials[tgt_idx_all_ranks[irank]] = potentials_all_ranks[irank] logger.debug("reorder potentials") result = global_wrangler.reorder_potentials(potentials) diff --git a/boxtree/distributed/local_tree.py b/boxtree/distributed/local_tree.py index b0b145a..54ea206 100644 --- a/boxtree/distributed/local_tree.py +++ b/boxtree/distributed/local_tree.py @@ -602,15 +602,8 @@ def generate_local_tree(queue, tree, responsible_boxes_list, # }}} - # {{ Gather source indices and target indices of each rank - - src_idx_all_ranks = comm.gather(src_idx, root=0) - tgt_idx_all_ranks = comm.gather(tgt_idx, root=0) - - # }}} - logger.info("Generate local tree on rank {} in {} sec.".format( rank, str(time.time() - start_time) )) - return local_tree, src_idx_all_ranks, tgt_idx_all_ranks + return local_tree, src_idx, tgt_idx -- GitLab