diff --git a/.gitignore b/.gitignore index e271a149bb7acedf3510b6272681d4480b817cc6..6bd925ce9df7b1c6439507dda00b51c0b74678a2 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,8 @@ distribute*tar.gz a.out .cache +.pytest_cache + +.DS_Store +.idea +.vscode diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 795a715029b86f2cb7cc242077034e23a4eaae46..a417e101c088e88584161fb219282963ef81755c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -59,6 +59,20 @@ Python 3 POCL K40: reports: junit: test/pytest.xml +Python 3 POCL MPI: + script: + - export PY_EXE=python3 + - export PYOPENCL_TEST=portable + - export EXTRA_INSTALL="numpy mako mpi4py pybind11" + - export PYTEST_ADDOPTS="-k mpi --capture=no" + - curl -L -O -k https://gitlab.tiker.net/inducer/ci-support/raw/master/build-and-test-py-project.sh + - ". ./build-and-test-py-project.sh" + tags: + - python3 + - pocl + except: + - tags + Python 3 POCL Examples: script: - test -n "$SKIP_EXAMPLES" && exit @@ -76,10 +90,11 @@ Python 3 POCL Examples: reports: junit: test/pytest.xml + Pylint: script: - export PY_EXE=python3 - - EXTRA_INSTALL="pybind11 numpy mako matplotlib" + - EXTRA_INSTALL="pybind11 numpy mako matplotlib mpi4py" - curl -L -O https://gitlab.tiker.net/inducer/ci-support/raw/main/prepare-and-run-pylint.sh - ". ./prepare-and-run-pylint.sh boxtree test/test_*.py" tags: @@ -89,7 +104,7 @@ Pylint: Documentation: script: - - EXTRA_INSTALL="pybind11 numpy mako" + - EXTRA_INSTALL="pybind11 numpy mako mpi4py" - curl -L -O https://gitlab.tiker.net/inducer/ci-support/raw/main/build-docs.sh - ". ./build-docs.sh" tags: diff --git a/.test-conda-env-py3.yml b/.test-conda-env-py3.yml index eb1d395a817bb9db89ac0eaeb8365dccb9200b7e..0ab7d188ce5f71ad553957128fdedda0fa9a3c5b 100644 --- a/.test-conda-env-py3.yml +++ b/.test-conda-env-py3.yml @@ -13,6 +13,7 @@ dependencies: - pyopencl - islpy - pyfmmlib +- mpi4py # Only needed to make pylint succeed - matplotlib-base diff --git a/boxtree/distributed/__init__.py b/boxtree/distributed/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..26dd34662e7d844ee4c59a4203bb9948a9b63c65 --- /dev/null +++ b/boxtree/distributed/__init__.py @@ -0,0 +1,245 @@ +__copyright__ = "Copyright (C) 2013 Andreas Kloeckner \ + Copyright (C) 2018 Hao Gao" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +__doc__ = """ +High-level Point FMM Interface +------------------------------ + +To perform point-FMM, first construct a +:class:`boxtree.distributed.DistributedFMMRunner` object. The constructor will +distribute the necessary information from the root rank to all worker ranks. Then, +the :meth:`boxtree.distributed.DistributedFMMRunner.drive_dfmm` can be used for +launching FMM. + +.. autoclass:: boxtree.distributed.DistributedFMMRunner + + +Distributed Algorithm Overview +------------------------------ + +1. Construct the global tree and traversal lists on the root rank and broadcast to + all worker ranks. +2. Partition boxes into disjoint sets, where the number of sets is the number of MPI + ranks. (See :ref:`partition-boxes`) +3. Each rank constructs the local tree and traversal lists independently, according + to the partition. (See :ref:`construct-local-tree-traversal`) +4. Distribute source weights from the root rank to all worker ranks. (See + :ref:`distributed-wrangler`) +5. Each rank independently forms multipole expansions from the leaf nodes of the + local tree and propagates the partial multipole expansions upwards. +6. Communicate multipole expansions so that all ranks have the complete multipole + expansions needed. +7. Each ranks indepedently forms local expansions, propagates the local expansions + downwards, and evaluate potentials of target points in its partition. The + calculated potentials are then assembled on the root rank. + +For step 5-7, see :ref:`distributed-fmm-evaluation`. + +Note that step 4-7 may be repeated multiple times with the same tree and traversal +object built from step 1-3. For example, when iteratively solving a PDE, step 4-7 is +executed for each iteration of the linear solver. + +The next sections will cover the interfaces of these steps. + +.. _partition-boxes: + +Partition Boxes +--------------- + +.. autofunction:: boxtree.distributed.partition.partition_work + +.. autoclass:: boxtree.distributed.partition.BoxMasks + +.. autofunction:: boxtree.distributed.partition.get_box_masks + +.. _construct-local-tree-traversal: + +Construct Local Tree and Traversal +---------------------------------- + +.. autoclass:: boxtree.distributed.local_tree.LocalTree + +.. autofunction:: boxtree.distributed.local_tree.generate_local_tree + +.. autofunction:: boxtree.distributed.local_traversal.generate_local_travs + +.. _distributed-wrangler: + +Distributed Wrangler +-------------------- + +.. autoclass:: boxtree.distributed.calculation.DistributedExpansionWrangler + +.. _distributed-fmm-evaluation: + +Distributed FMM Evaluation +-------------------------- + +The distributed version of the FMM evaluation shares the same interface as the +shared-memory version. To evaluate FMM in a distributed manner, use a subclass +of :class:`boxtree.distributed.calculation.DistributedExpansionWrangler` in +:func:`boxtree.fmm.drive_fmm`. + +""" + +from mpi4py import MPI +import numpy as np +from enum import IntEnum +import warnings +from boxtree.cost import FMMCostModel + +__all__ = ["DistributedFMMRunner"] + + +class MPITags(IntEnum): + DIST_WEIGHT = 1 + GATHER_POTENTIALS = 2 + REDUCE_POTENTIALS = 3 + REDUCE_INDICES = 4 + + +def dtype_to_mpi(dtype): + """ This function translates a numpy datatype into the corresponding type used in + mpi4py. + """ + if hasattr(MPI, "_typedict"): + mpi_type = MPI._typedict[np.dtype(dtype).char] + elif hasattr(MPI, "__TypeDict__"): + mpi_type = MPI.__TypeDict__[np.dtype(dtype).char] + else: + raise RuntimeError("There is no dictionary to translate from Numpy dtype to " + "MPI type") + return mpi_type + + +class DistributedFMMRunner: + """Helper class for setting up and running distributed point FMM. + + .. automethod:: __init__ + .. automethod:: drive_dfmm + """ + def __init__(self, queue, global_tree_dev, + traversal_builder, + wrangler_factory, + calibration_params=None, comm=MPI.COMM_WORLD): + """Distributes the global tree from the root rank to each worker rank. + + :arg global_tree_dev: a :class:`boxtree.Tree` object in device memory. + :arg traversal_builder: an object which, when called, takes a + :class:`pyopencl.CommandQueue` object and a :class:`boxtree.Tree` object, + and generates a :class:`boxtree.traversal.FMMTraversalInfo` object from + the tree using the command queue. + :arg wrangler_factory: an object which, when called, takes the local + traversal and the global traversal objects and returns an + :class:`boxtree.fmm.ExpansionWranglerInterface` object. + :arg calibration_params: Calibration parameters for the cost model, + if supplied. The cost model is used for estimating the execution time of + each box, which is used for improving load balancing. + :arg comm: MPI communicator. + """ + self.comm = comm + mpi_rank = comm.Get_rank() + + # {{{ Broadcast global tree + + global_tree = None + if mpi_rank == 0: + global_tree = global_tree_dev.get(queue) + global_tree = comm.bcast(global_tree, root=0) + global_tree_dev = global_tree.to_device(queue).with_queue(queue) + + global_trav_dev, _ = traversal_builder(queue, global_tree_dev) + global_trav = global_trav_dev.get(queue) + + # }}} + + # {{{ Partition work + + cost_per_box = None + + if mpi_rank == 0: + cost_model = FMMCostModel() + + if calibration_params is None: + # Use default calibration parameters if not supplied + # TODO: should replace the default calibration params with a more + # accurate one + warnings.warn("Calibration parameters for the cost model are not " + "supplied. The default one will be used.") + calibration_params = \ + FMMCostModel.get_unit_calibration_params() + + # We need to construct a wrangler in order to access `level_nterms` + global_wrangler = wrangler_factory(global_trav, global_trav) + + cost_per_box = cost_model.cost_per_box( + # Currently only pyfmmlib has `level_nterms` field. + # See https://gitlab.tiker.net/inducer/boxtree/-/issues/25. + queue, global_trav_dev, global_wrangler.level_nterms, + calibration_params + ).get() + + from boxtree.distributed.partition import partition_work + responsible_boxes_list = partition_work(cost_per_box, global_trav, comm) + + # }}} + + # {{{ Compute local tree + + from boxtree.distributed.local_tree import generate_local_tree + self.local_tree, self.src_idx, self.tgt_idx = generate_local_tree( + queue, global_trav, responsible_boxes_list, comm) + + # }}} + + # {{ Gather source indices and target indices of each rank + + self.src_idx_all_ranks = comm.gather(self.src_idx, root=0) + self.tgt_idx_all_ranks = comm.gather(self.tgt_idx, root=0) + + # }}} + + # {{{ Compute traversal object on each rank + + from boxtree.distributed.local_traversal import generate_local_travs + local_trav = generate_local_travs( + queue, self.local_tree, traversal_builder, + box_bounding_box={ + "min": global_trav.box_target_bounding_box_min, + "max": global_trav.box_target_bounding_box_max + } + ) + + # }}} + + self.wrangler = wrangler_factory(local_trav.get(None), global_trav) + + def drive_dfmm(self, source_weights, timing_data=None): + """Calculate potentials at target points. + """ + from boxtree.fmm import drive_fmm + return drive_fmm( + self.wrangler, source_weights, + timing_data=timing_data, + global_src_idx_all_ranks=self.src_idx_all_ranks, + global_tgt_idx_all_ranks=self.tgt_idx_all_ranks) diff --git a/boxtree/distributed/calculation.py b/boxtree/distributed/calculation.py new file mode 100644 index 0000000000000000000000000000000000000000..6df05ec8a73d0b7c1c20e52451e7c33b89a84f24 --- /dev/null +++ b/boxtree/distributed/calculation.py @@ -0,0 +1,418 @@ +from __future__ import division + +__copyright__ = "Copyright (C) 2013 Andreas Kloeckner \ + Copyright (C) 2018 Hao Gao" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +import numpy as np +import pyopencl as cl +from boxtree.distributed import MPITags +from mpi4py import MPI +from boxtree.pyfmmlib_integration import FMMLibExpansionWrangler +from boxtree.fmm import ExpansionWranglerInterface +from pytools import memoize_method +from pyopencl.tools import dtype_to_ctype +from pyopencl.elementwise import ElementwiseKernel +from mako.template import Template + +import logging +logger = logging.getLogger(__name__) + + +# {{{ Distributed FMM wrangler + +class DistributedExpansionWrangler(ExpansionWranglerInterface): + """Distributed expansion wrangler base class. + + This is an abstract class and should not be directly instantiated. Instead, it is + expected that all distributed wranglers should be subclasses of this class. + + .. automethod:: __init__ + .. automethod:: distribute_source_weights + .. automethod:: gather_potential_results + .. automethod:: communicate_mpoles + """ + def __init__(self, context, comm, global_traversal, + communicate_mpoles_via_allreduce=False): + self.context = context + self.comm = comm + self.global_traversal = global_traversal + self.communicate_mpoles_via_allreduce = communicate_mpoles_via_allreduce + + def distribute_source_weights(self, src_weight_vecs, src_idx_all_ranks): + mpi_rank = self.comm.Get_rank() + mpi_size = self.comm.Get_size() + + if mpi_rank == 0: + distribute_weight_req = [] + local_src_weight_vecs = np.empty((mpi_size,), dtype=object) + + for irank in range(mpi_size): + local_src_weight_vecs[irank] = [ + source_weights[src_idx_all_ranks[irank]] + for source_weights in src_weight_vecs] + + if irank != 0: + distribute_weight_req.append(self.comm.isend( + local_src_weight_vecs[irank], dest=irank, + tag=MPITags.DIST_WEIGHT + )) + + MPI.Request.Waitall(distribute_weight_req) + local_src_weight_vecs = local_src_weight_vecs[0] + else: + local_src_weight_vecs = self.comm.recv(source=0, tag=MPITags.DIST_WEIGHT) + + return local_src_weight_vecs + + def gather_potential_results(self, potentials, tgt_idx_all_ranks): + mpi_rank = self.comm.Get_rank() + mpi_size = self.comm.Get_size() + + from boxtree.distributed import dtype_to_mpi + potentials_mpi_type = dtype_to_mpi(potentials.dtype) + + gathered_potentials = None + + if mpi_rank == 0: + # The root rank received calculated potentials from all worker ranks + potentials_all_ranks = np.empty((mpi_size,), dtype=object) + potentials_all_ranks[0] = potentials + + recv_reqs = [] + + for irank in range(1, mpi_size): + potentials_all_ranks[irank] = np.empty( + tgt_idx_all_ranks[irank].shape, dtype=potentials.dtype) + + recv_reqs.append( + self.comm.Irecv( + [potentials_all_ranks[irank], potentials_mpi_type], + source=irank, tag=MPITags.GATHER_POTENTIALS)) + + MPI.Request.Waitall(recv_reqs) + + # Assemble potentials from worker ranks together on the root rank + gathered_potentials = np.empty( + self.global_traversal.tree.ntargets, dtype=potentials.dtype) + + for irank in range(mpi_size): + gathered_potentials[tgt_idx_all_ranks[irank]] = ( + potentials_all_ranks[irank]) + else: + # Worker ranks send calculated potentials to the root rank + self.comm.Send([potentials, potentials_mpi_type], + dest=0, tag=MPITags.GATHER_POTENTIALS) + + return gathered_potentials + + def _slice_mpoles(self, mpoles, slice_indices): + if len(slice_indices) == 0: + return np.empty((0,), dtype=mpoles.dtype) + + level_start_slice_indices = np.searchsorted( + slice_indices, self.traversal.tree.level_start_box_nrs) + mpoles_list = [] + + for ilevel in range(self.traversal.tree.nlevels): + start, stop = level_start_slice_indices[ilevel:ilevel+2] + if stop > start: + level_start_box_idx, mpoles_current_level = ( + self.multipole_expansions_view(mpoles, ilevel)) + mpoles_list.append( + mpoles_current_level[ + slice_indices[start:stop] - level_start_box_idx + ].reshape(-1) + ) + + return np.concatenate(mpoles_list) + + def _update_mpoles(self, mpoles, mpole_updates, slice_indices): + if len(slice_indices) == 0: + return + + level_start_slice_indices = np.searchsorted( + slice_indices, self.traversal.tree.level_start_box_nrs) + mpole_updates_start = 0 + + for ilevel in range(self.traversal.tree.nlevels): + start, stop = level_start_slice_indices[ilevel:ilevel+2] + if stop > start: + level_start_box_idx, mpoles_current_level = ( + self.multipole_expansions_view(mpoles, ilevel)) + mpoles_shape = (stop - start,) + mpoles_current_level.shape[1:] + + from pytools import product + mpole_updates_end = mpole_updates_start + product(mpoles_shape) + + mpoles_current_level[ + slice_indices[start:stop] - level_start_box_idx + ] += mpole_updates[ + mpole_updates_start:mpole_updates_end + ].reshape(mpoles_shape) + + mpole_updates_start = mpole_updates_end + + @memoize_method + def find_boxes_used_by_subrange_kernel(self, box_id_dtype): + return ElementwiseKernel( + self.context, + Template(r""" + ${box_id_t} *contributing_boxes_list, + int subrange_start, + int subrange_end, + ${box_id_t} *box_to_user_rank_starts, + int *box_to_user_rank_lists, + char *box_in_subrange + """).render( + box_id_t=dtype_to_ctype(box_id_dtype), + ), + Template(r""" + ${box_id_t} ibox = contributing_boxes_list[i]; + ${box_id_t} iuser_start = box_to_user_rank_starts[ibox]; + ${box_id_t} iuser_end = box_to_user_rank_starts[ibox + 1]; + for(${box_id_t} iuser = iuser_start; iuser < iuser_end; iuser++) { + int useri = box_to_user_rank_lists[iuser]; + if(subrange_start <= useri && useri < subrange_end) { + box_in_subrange[i] = 1; + } + } + """).render( + box_id_t=dtype_to_ctype(box_id_dtype) + ), + "find_boxes_used_by_subrange" + ) + + def find_boxes_used_by_subrange( + self, subrange, box_to_user_rank_starts, box_to_user_rank_lists, + contributing_boxes_list): + """Test whether the multipole expansions of the contributing boxes are used + by at least one box in a range. + + :arg subrange: the range is represented by ``(subrange[0], subrange[1])``. + :arg box_to_user_rank_starts: a :class:`pyopencl.array.Array` object + indicating the start and end index in *box_to_user_rank_lists* for each + box in *contributing_boxes_list*. + :arg box_to_user_rank_lists: a :class:`pyopencl.array.Array` object storing + the users of each box in *contributing_boxes_list*. + :returns: a :class:`pyopencl.array.Array` object with the same shape as + *contributing_boxes_list*, where the i-th entry is 1 if + ``contributing_boxes_list[i]`` is used by at least on box in the + subrange specified. + """ + box_in_subrange = cl.array.zeros( + contributing_boxes_list.queue, + contributing_boxes_list.shape[0], + dtype=np.int8 + ) + knl = self.find_boxes_used_by_subrange_kernel( + self.traversal.tree.box_id_dtype) + + knl( + contributing_boxes_list, + subrange[0], + subrange[1], + box_to_user_rank_starts, + box_to_user_rank_lists, + box_in_subrange + ) + + return box_in_subrange + + def communicate_mpoles(self, mpole_exps, return_stats=False): + """Based on Algorithm 3: Reduce and Scatter in Lashuk et al. [1]_. + + The main idea is to mimic an allreduce as done on a hypercube network, but to + decrease the bandwidth cost by sending only information that is relevant to + the rank receiving the message. + + .. [1] Lashuk, Ilya, Aparna Chandramowlishwaran, Harper Langston, + Tuan-Anh Nguyen, Rahul Sampath, Aashay Shringarpure, Richard Vuduc, + Lexing Ying, Denis Zorin, and George Biros. “A massively parallel + adaptive fast multipole method on heterogeneous architectures." + Communications of the ACM 55, no. 5 (2012): 101-109. + """ + mpi_rank = self.comm.Get_rank() + mpi_size = self.comm.Get_size() + tree = self.traversal.tree + + if self.communicate_mpoles_via_allreduce: + # Use MPI allreduce for communicating multipole expressions. It is slower + # but might be helpful for debugging purposes. + mpole_exps_all = np.zeros_like(mpole_exps) + self.comm.Allreduce(mpole_exps, mpole_exps_all) + mpole_exps[:] = mpole_exps_all + return + + stats = {} + + # contributing_boxes: + # + # A mask of the the set of boxes that the current process contributes + # to. This process contributes to a box when: + # + # (a) this process owns sources that contribute to the multipole expansion + # in the box (via the upward pass) or + # (b) this process has received a portion of the multipole expansion in this + # box from another process. + # + # Initially, this set consists of the boxes satisfying condition (a), which + # are precisely the boxes owned by this process and their ancestors. + contributing_boxes = tree.ancestor_mask.copy() + contributing_boxes[tree.responsible_boxes_list] = 1 + + from boxtree.tools import AllReduceCommPattern + comm_pattern = AllReduceCommPattern(mpi_rank, mpi_size) + + # Temporary buffers for receiving data + mpole_exps_buf = np.empty(mpole_exps.shape, dtype=mpole_exps.dtype) + boxes_list_buf = np.empty(tree.nboxes, dtype=tree.box_id_dtype) + + stats["bytes_sent_by_stage"] = [] + stats["bytes_recvd_by_stage"] = [] + + with cl.CommandQueue(self.context) as queue: + box_to_user_rank_starts_dev = cl.array.to_device( + queue, tree.box_to_user_rank_starts).with_queue(None) + + box_to_user_rank_lists_dev = cl.array.to_device( + queue, tree.box_to_user_rank_lists).with_queue(None) + + while not comm_pattern.done(): + send_requests = [] + + # Send data to other processors. + if comm_pattern.sinks(): + # Compute the subset of boxes to be sent. + message_subrange = comm_pattern.messages() + + contributing_boxes_list = np.nonzero(contributing_boxes)[0].astype( + tree.box_id_dtype + ) + + with cl.CommandQueue(self.context) as queue: + contributing_boxes_list_dev = cl.array.to_device( + queue, contributing_boxes_list) + + box_in_subrange = self.find_boxes_used_by_subrange( + message_subrange, + box_to_user_rank_starts_dev, box_to_user_rank_lists_dev, + contributing_boxes_list_dev + ) + + box_in_subrange_host = box_in_subrange.get().astype(bool) + + relevant_boxes_list = contributing_boxes_list[ + box_in_subrange_host + ].astype(tree.box_id_dtype) + + """ + # Pure Python version for debugging purpose + relevant_boxes_list = [] + for contrib_box in contributing_boxes_list: + iuser_start, iuser_end = tree.box_to_user_rank_starts[ + contrib_box:contrib_box + 2 + ] + for user_box in tree.box_to_user_rank_lists[ + iuser_start:iuser_end]: + if subrange_start <= user_box < subrange_end: + relevant_boxes_list.append(contrib_box) + break + """ + + relevant_boxes_list = np.array( + relevant_boxes_list, dtype=tree.box_id_dtype + ) + + relevant_mpole_exps = self._slice_mpoles( + mpole_exps, relevant_boxes_list) + + # Send the box subset to the other processors. + for sink in comm_pattern.sinks(): + req = self.comm.Isend(relevant_mpole_exps, dest=sink, + tag=MPITags.REDUCE_POTENTIALS) + send_requests.append(req) + + req = self.comm.Isend(relevant_boxes_list, dest=sink, + tag=MPITags.REDUCE_INDICES) + send_requests.append(req) + + # Receive data from other processors. + for source in comm_pattern.sources(): + self.comm.Recv(mpole_exps_buf, source=source, + tag=MPITags.REDUCE_POTENTIALS) + + status = MPI.Status() + self.comm.Recv( + boxes_list_buf, source=source, tag=MPITags.REDUCE_INDICES, + status=status) + nboxes = status.Get_count() // boxes_list_buf.dtype.itemsize + + # Update data structures. + self._update_mpoles( + mpole_exps, mpole_exps_buf, boxes_list_buf[:nboxes]) + + contributing_boxes[boxes_list_buf[:nboxes]] = 1 + + for req in send_requests: + req.wait() + + comm_pattern.advance() + + if return_stats: + return stats + + def finalize_potentials(self, potentials, template_ary): + if self.comm.Get_rank() == 0: + return super().finalize_potentials(potentials, template_ary) + else: + return None + + +class DistributedFMMLibExpansionWrangler( + DistributedExpansionWrangler, FMMLibExpansionWrangler): + def __init__( + self, context, comm, tree_indep, local_traversal, global_traversal, + fmm_level_to_nterms=None, + communicate_mpoles_via_allreduce=False, + **kwargs): + DistributedExpansionWrangler.__init__( + self, context, comm, global_traversal, + communicate_mpoles_via_allreduce=communicate_mpoles_via_allreduce) + FMMLibExpansionWrangler.__init__( + self, tree_indep, local_traversal, + fmm_level_to_nterms=fmm_level_to_nterms, **kwargs) + + #TODO: use log_process like FMMLibExpansionWrangler? + def reorder_sources(self, source_array): + if self.comm.Get_rank() == 0: + return source_array[..., self.global_traversal.tree.user_source_ids] + else: + return None + + def reorder_potentials(self, potentials): + if self.comm.Get_rank() == 0: + return potentials[self.global_traversal.tree.sorted_target_ids] + else: + return None + +# }}} diff --git a/boxtree/distributed/local_traversal.py b/boxtree/distributed/local_traversal.py new file mode 100644 index 0000000000000000000000000000000000000000..37514965596478a1f7ad5b10eb48005c317efb4c --- /dev/null +++ b/boxtree/distributed/local_traversal.py @@ -0,0 +1,68 @@ +__copyright__ = "Copyright (C) 2013 Andreas Kloeckner \ + Copyright (C) 2018 Hao Gao" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +import time +import logging + +logger = logging.getLogger(__name__) + + +def generate_local_travs( + queue, local_tree, traversal_builder, box_bounding_box=None, + merge_close_lists=False): + """Generate local traversal from local tree. + + :arg queue: a :class:`pyopencl.CommandQueue` object. + :arg local_tree: the local tree of class + `boxtree.tools.ImmutableHostDeviceArray` on which the local traversal + object will be constructed. + :arg traversal_builder: a function, taken a :class:`pyopencl.CommandQueue` and + a tree, returns the traversal object based on the tree. + + :return: generated local traversal object in host memory + """ + start_time = time.time() + + local_tree.with_queue(queue) + + # We need `source_boxes_mask` and `source_parent_boxes_mask` here to restrict the + # multipole formation and upward propagation within the rank's responsible boxes + # region. Had there not been such restrictions, some sources might be distributed + # to more than 1 rank and counted multiple times. + d_local_trav, _ = traversal_builder( + queue, local_tree.to_device(queue), + box_bounding_box=box_bounding_box, + source_boxes_mask=local_tree.responsible_boxes_mask.device, + source_parent_boxes_mask=local_tree.ancestor_mask.device + ) + + if merge_close_lists and local_tree.targets_have_extent: + d_local_trav = d_local_trav.merge_close_lists(queue) + + local_trav = d_local_trav.get(queue=queue) + + logger.info("Generate local traversal in {} sec.".format( + str(time.time() - start_time)) + ) + + return local_trav diff --git a/boxtree/distributed/local_tree.py b/boxtree/distributed/local_tree.py new file mode 100644 index 0000000000000000000000000000000000000000..59b5844cd0e053ddfe4a623b7f99ad81d0b20bdd --- /dev/null +++ b/boxtree/distributed/local_tree.py @@ -0,0 +1,469 @@ +__copyright__ = "Copyright (C) 2013 Andreas Kloeckner \ + Copyright (C) 2018 Hao Gao" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +from boxtree import Tree +from mako.template import Template +from pyopencl.tools import dtype_to_ctype +from pytools import memoize_method +import numpy as np +import pyopencl as cl +from dataclasses import dataclass +from typing import Optional +import time + +import logging +logger = logging.getLogger(__name__) + + +# FIXME: The logic in this file has a lot in common with +# the particle filtering functionality that already exists. +# We should refactor this to make use of this commonality. +# https://documen.tician.de/boxtree/tree.html#filtering-the-lists-of-targets + + +class LocalTreeGeneratorCodeContainer: + """Objects of this type serve as a place to keep the code needed for + :func:`generate_local_tree`. + """ + def __init__(self, cl_context, dimensions, particle_id_dtype, coord_dtype): + self.cl_context = cl_context + self.dimensions = dimensions + self.particle_id_dtype = particle_id_dtype + self.coord_dtype = coord_dtype + + @memoize_method + def particle_mask_kernel(self): + return cl.elementwise.ElementwiseKernel( + self.cl_context, + arguments=Template(""" + __global char *responsible_boxes, + __global ${particle_id_t} *box_particle_starts, + __global ${particle_id_t} *box_particle_counts_nonchild, + __global ${particle_id_t} *particle_mask + """, strict_undefined=True).render( + particle_id_t=dtype_to_ctype(self.particle_id_dtype) + ), + operation=Template(""" + if(responsible_boxes[i]) { + for(${particle_id_t} pid = box_particle_starts[i]; + pid < box_particle_starts[i] + + box_particle_counts_nonchild[i]; + ++pid) { + particle_mask[pid] = 1; + } + } + """).render(particle_id_t=dtype_to_ctype(self.particle_id_dtype)) + ) + + @memoize_method + def mask_scan_kernel(self): + from pyopencl.scan import GenericScanKernel + return GenericScanKernel( + self.cl_context, self.particle_id_dtype, + arguments=Template(""" + __global ${mask_t} *ary, + __global ${mask_t} *scan + """, strict_undefined=True).render( + mask_t=dtype_to_ctype(self.particle_id_dtype) + ), + input_expr="ary[i]", + scan_expr="a+b", neutral="0", + output_statement="scan[i + 1] = item;" + ) + + fetch_local_paticles_arguments = Template(""" + __global const ${mask_t} *particle_mask, + __global const ${mask_t} *particle_scan + % for dim in range(ndims): + , __global const ${coord_t} *particles_${dim} + % endfor + % for dim in range(ndims): + , __global ${coord_t} *local_particles_${dim} + % endfor + % if particles_have_extent: + , __global const ${coord_t} *particle_radii + , __global ${coord_t} *local_particle_radii + % endif + """, strict_undefined=True) + + fetch_local_particles_prg = Template(""" + if(particle_mask[i]) { + ${particle_id_t} des = particle_scan[i]; + % for dim in range(ndims): + local_particles_${dim}[des] = particles_${dim}[i]; + % endfor + % if particles_have_extent: + local_particle_radii[des] = particle_radii[i]; + % endif + } + """, strict_undefined=True) + + @memoize_method + def fetch_local_particles_kernel(self, particles_have_extent): + return cl.elementwise.ElementwiseKernel( + self.cl_context, + self.fetch_local_paticles_arguments.render( + mask_t=dtype_to_ctype(self.particle_id_dtype), + coord_t=dtype_to_ctype(self.coord_dtype), + ndims=self.dimensions, + particles_have_extent=particles_have_extent + ), + self.fetch_local_particles_prg.render( + particle_id_t=dtype_to_ctype(self.particle_id_dtype), + ndims=self.dimensions, + particles_have_extent=particles_have_extent + ) + ) + + @memoize_method + def mask_compressor_kernel(self): + from boxtree.tools import MaskCompressorKernel + return MaskCompressorKernel(self.cl_context) + + @memoize_method + def modify_target_flags_kernel(self): + from boxtree import box_flags_enum + box_flag_t = dtype_to_ctype(box_flags_enum.dtype) + + return cl.elementwise.ElementwiseKernel( + self.cl_context, + Template(""" + __global ${particle_id_t} *box_target_counts_nonchild, + __global ${particle_id_t} *box_target_counts_cumul, + __global ${box_flag_t} *box_flags + """).render( + particle_id_t=dtype_to_ctype(self.particle_id_dtype), + box_flag_t=box_flag_t + ), + Template(r""" + // reset HAS_OWN_TARGETS and HAS_CHILD_TARGETS bits in the flag of + // each box + box_flags[i] &= (~${HAS_OWN_TARGETS}); + box_flags[i] &= (~${HAS_CHILD_TARGETS}); + + // rebuild HAS_OWN_TARGETS and HAS_CHILD_TARGETS bits + if(box_target_counts_nonchild[i]) box_flags[i] |= ${HAS_OWN_TARGETS}; + if(box_target_counts_nonchild[i] < box_target_counts_cumul[i]) + box_flags[i] |= ${HAS_CHILD_TARGETS}; + """).render( + HAS_OWN_TARGETS=( + "(" + box_flag_t + ") " + str(box_flags_enum.HAS_OWN_TARGETS) + ), + HAS_CHILD_TARGETS=( + "(" + box_flag_t + ") " + str(box_flags_enum.HAS_CHILD_TARGETS) + ) + ) + ) + + +@dataclass +class LocalParticlesAndLists: + particles: cl.array.Array + particle_radii: Optional[cl.array.Array] + box_particle_starts: cl.array.Array + box_particle_counts_nonchild: cl.array.Array + box_particle_counts_cumul: cl.array.Array + particle_idx: np.ndarray + + +def construct_local_particles_and_lists( + queue, code, dimensions, num_boxes, num_global_particles, + particle_id_dtype, coord_dtype, particles_have_extent, + box_mask, + global_particles, global_particle_radii, + box_particle_starts, box_particle_counts_nonchild, + box_particle_counts_cumul): + """This helper function generates particles (either sources or targets) of the + local tree, and reconstructs list of lists indexing accordingly. + """ + # {{{ calculate the particle mask + + particle_mask = cl.array.zeros( + queue, num_global_particles, dtype=particle_id_dtype) + + code.particle_mask_kernel()( + box_mask, box_particle_starts, box_particle_counts_nonchild, particle_mask) + + # }}} + + # {{{ calculate the scan of the particle mask + + global_to_local_particle_index = cl.array.empty( + queue, num_global_particles + 1, dtype=particle_id_dtype) + + global_to_local_particle_index[0] = 0 + code.mask_scan_kernel()(particle_mask, global_to_local_particle_index) + + # }}} + + # {{{ fetch the local particles + + num_local_particles = global_to_local_particle_index[-1].get(queue).item() + + local_particles = cl.array.empty( + queue, (dimensions, num_local_particles), dtype=coord_dtype) + + local_particles_list = [local_particles[idim, :] for idim in range(dimensions)] + + local_particle_radii = None + if particles_have_extent: + local_particle_radii = cl.array.empty( + queue, num_local_particles, dtype=coord_dtype) + + code.fetch_local_particles_kernel(True)( + particle_mask, global_to_local_particle_index, + *global_particles.tolist(), + *local_particles_list, + global_particle_radii, + local_particle_radii) + else: + code.fetch_local_particles_kernel(False)( + particle_mask, global_to_local_particle_index, + *global_particles.tolist(), + *local_particles_list) + + # {{{ construct the list of list indices + + local_box_particle_starts = global_to_local_particle_index[box_particle_starts] + + box_counts_all_zeros = cl.array.zeros(queue, num_boxes, dtype=particle_id_dtype) + + local_box_particle_counts_nonchild = cl.array.if_positive( + box_mask, box_particle_counts_nonchild, box_counts_all_zeros) + + box_particle_ends_cumul = box_particle_starts + box_particle_counts_cumul + + local_box_particle_counts_cumul = ( + global_to_local_particle_index[box_particle_ends_cumul] + - global_to_local_particle_index[box_particle_starts]) + + # }}} + + particle_mask = particle_mask.get(queue=queue).astype(bool) + particle_idx = np.arange(num_global_particles)[particle_mask] + + return LocalParticlesAndLists( + local_particles, + local_particle_radii, + local_box_particle_starts, + local_box_particle_counts_nonchild, + local_box_particle_counts_cumul, + particle_idx) + + +class LocalTree(Tree): + """ + Inherits from :class:`boxtree.Tree`. + + .. attribute:: box_to_user_rank_starts + + ``box_id_t [nboxes + 1]`` + + .. attribute:: box_to_user_rank_lists + + ``int32 [*]`` + + A :ref:`csr` array, together with :attr:`box_to_user_rank_starts`. + For each box, the list of ranks which own targets that *use* the + multipole expansion at this box, via either List 3 or (possibly downward + propagated from an ancestor) List 2. + """ + + +def generate_local_tree(queue, global_traversal, responsible_boxes_list, comm): + """Generate the local tree for the current rank. + + This is an MPI-collective routine on *comm*. + + :arg queue: a :class:`pyopencl.CommandQueue` object. + :arg global_traversal: Global :class:`boxtree.traversal.FMMTraversalInfo` object + on host memory. + :arg responsible_boxes_list: a :class:`numpy.ndarray` object containing the + responsible boxes of the current rank. + + :return: a tuple of ``(local_tree, src_idx, tgt_idx)``, where ``local_tree`` is + an object with class :class:`boxtree.distributed.local_tree.LocalTree` of the + generated local tree, ``src_idx`` is the indices of the local sources in the + global tree, and ``tgt_idx`` is the indices of the local targets in the + global tree. ``src_idx`` and ``tgt_idx`` are needed for distributing source + weights from root rank and assembling calculated potentials on the root rank. + """ + global_tree = global_traversal.tree + code = LocalTreeGeneratorCodeContainer( + queue.context, global_tree.dimensions, + global_tree.particle_id_dtype, global_tree.coord_dtype) + + mpi_rank = comm.Get_rank() + mpi_size = comm.Get_size() + + start_time = time.time() + + from boxtree.distributed.partition import get_box_masks + box_masks = get_box_masks(queue, global_traversal, responsible_boxes_list) + + global_tree_dev = global_tree.to_device(queue).with_queue(queue) + + local_sources_and_lists = construct_local_particles_and_lists( + queue, code, global_tree.dimensions, global_tree.nboxes, + global_tree.nsources, + global_tree.particle_id_dtype, global_tree.coord_dtype, + global_tree.sources_have_extent, + box_masks.point_src_boxes, + global_tree_dev.sources, + global_tree_dev.sources_radii if global_tree.sources_have_extent else None, + global_tree_dev.box_source_starts, + global_tree_dev.box_source_counts_nonchild, + global_tree_dev.box_source_counts_cumul) + + local_targets_and_lists = construct_local_particles_and_lists( + queue, code, global_tree.dimensions, global_tree.nboxes, + global_tree.ntargets, + global_tree.particle_id_dtype, global_tree.coord_dtype, + global_tree.targets_have_extent, + box_masks.responsible_boxes, + global_tree_dev.targets, + global_tree_dev.target_radii if global_tree.targets_have_extent else None, + global_tree_dev.box_target_starts, + global_tree_dev.box_target_counts_nonchild, + global_tree_dev.box_target_counts_cumul) + + # {{{ compute the users of multipole expansions of each box on the root rank + + multipole_src_boxes_all_ranks = None + if mpi_rank == 0: + multipole_src_boxes_all_ranks = np.empty( + (mpi_size, global_tree.nboxes), + dtype=box_masks.multipole_src_boxes.dtype) + comm.Gather( + box_masks.multipole_src_boxes.get(), multipole_src_boxes_all_ranks, root=0) + + box_to_user_rank_starts = None + box_to_user_rank_lists = None + + if mpi_rank == 0: + multipole_src_boxes_all_ranks = cl.array.to_device( + queue, multipole_src_boxes_all_ranks) + + (box_to_user_rank_starts, box_to_user_rank_lists, evt) = \ + code.mask_compressor_kernel()( + queue, multipole_src_boxes_all_ranks.transpose(), + list_dtype=np.int32) + + cl.wait_for_events([evt]) + + box_to_user_rank_starts = box_to_user_rank_starts.get() + box_to_user_rank_lists = box_to_user_rank_lists.get() + + logger.debug("computing box_to_user: done") + + box_to_user_rank_starts = comm.bcast(box_to_user_rank_starts, root=0) + box_to_user_rank_lists = comm.bcast(box_to_user_rank_lists, root=0) + + # }}} + + # {{{ Reconstruct the target box flags + + # Note: We do not change the source box flags despite the local tree may only + # contain a subset of sources. This is because evaluating target potentials in + # the responsible boxes of the current rank may depend on the multipole + # expansions formed by souces in other ranks. Modifying the source box flags + # could result in incomplete interaction lists. + + local_box_flags = global_tree_dev.box_flags.copy(queue=queue) + code.modify_target_flags_kernel()( + local_targets_and_lists.box_particle_counts_nonchild, + local_targets_and_lists.box_particle_counts_cumul, + local_box_flags) + + # }}} + + local_sources = local_sources_and_lists.particles.get(queue=queue) + local_targets = local_targets_and_lists.particles.get(queue=queue) + + local_tree = LocalTree( + sources_are_targets=global_tree.sources_are_targets, + sources_have_extent=global_tree.sources_have_extent, + targets_have_extent=global_tree.targets_have_extent, + + particle_id_dtype=global_tree.particle_id_dtype, + box_id_dtype=global_tree.box_id_dtype, + coord_dtype=global_tree.coord_dtype, + box_level_dtype=global_tree.box_level_dtype, + + root_extent=global_tree.root_extent, + stick_out_factor=global_tree.stick_out_factor, + extent_norm=global_tree.extent_norm, + + bounding_box=global_tree.bounding_box, + level_start_box_nrs=global_tree.level_start_box_nrs, + level_start_box_nrs_dev=global_tree.level_start_box_nrs_dev, + + sources=local_sources, + targets=local_targets, + source_radii=(local_sources_and_lists.particle_radii.get(queue=queue) + if global_tree.sources_have_extent else None), + target_radii=(local_targets_and_lists.particle_radii.get(queue=queue) + if global_tree.targets_have_extent else None), + + box_source_starts=( + local_sources_and_lists.box_particle_starts.get(queue=queue)), + box_source_counts_nonchild=( + local_sources_and_lists.box_particle_counts_nonchild.get(queue=queue)), + box_source_counts_cumul=( + local_sources_and_lists.box_particle_counts_cumul.get(queue=queue)), + box_target_starts=( + local_targets_and_lists.box_particle_starts.get(queue=queue)), + box_target_counts_nonchild=( + local_targets_and_lists.box_particle_counts_nonchild.get(queue=queue)), + box_target_counts_cumul=( + local_targets_and_lists.box_particle_counts_cumul.get(queue=queue)), + + box_parent_ids=global_tree.box_parent_ids, + box_child_ids=global_tree.box_child_ids, + box_centers=global_tree.box_centers, + box_levels=global_tree.box_levels, + box_flags=local_box_flags.get(queue=queue), + + user_source_ids=None, + sorted_target_ids=None, + + _is_pruned=global_tree._is_pruned, + + responsible_boxes_list=responsible_boxes_list, + responsible_boxes_mask=box_masks.responsible_boxes.get(), + ancestor_mask=box_masks.ancestor_boxes.get(), + box_to_user_rank_starts=box_to_user_rank_starts, + box_to_user_rank_lists=box_to_user_rank_lists + ) + + local_tree = local_tree.to_host_device_array(queue) + local_tree.with_queue(None) + + logger.info("Generate local tree on rank {} in {} sec.".format( + mpi_rank, str(time.time() - start_time) + )) + + return ( + local_tree, + local_sources_and_lists.particle_idx, + local_targets_and_lists.particle_idx) diff --git a/boxtree/distributed/partition.py b/boxtree/distributed/partition.py new file mode 100644 index 0000000000000000000000000000000000000000..5f8d43db93e236ca04535748f11211e88deabb77 --- /dev/null +++ b/boxtree/distributed/partition.py @@ -0,0 +1,349 @@ +__copyright__ = "Copyright (C) 2012 Andreas Kloeckner \ + Copyright (C) 2018 Hao Gao" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +import numpy as np +import pyopencl as cl +from pyopencl.tools import dtype_to_ctype +from mako.template import Template +from pytools import memoize_method +from dataclasses import dataclass + + +def get_box_ids_dfs_order(tree): + """Helper function for getting box ids of a tree in depth-first order. + + :arg tree: A :class:`boxtree.Tree` object in the host memory. See + :meth:`boxtree.Tree.get` for getting a tree object in host memory. + :return: A numpy array of box ids in depth-first order. + """ + # FIXME: optimize the performance with OpenCL + dfs_order = np.empty((tree.nboxes,), dtype=tree.box_id_dtype) + idx = 0 + stack = [0] + while stack: + box_id = stack.pop() + dfs_order[idx] = box_id + idx += 1 + for i in range(2**tree.dimensions): + child_box_id = tree.box_child_ids[i][box_id] + if child_box_id > 0: + stack.append(child_box_id) + return dfs_order + + +def partition_work(cost_per_box, traversal, comm): + """This function assigns responsible boxes for each rank. + + If a rank is responsible for a box, it will calculate the multiple expansion of + the box and evaluate target potentials in the box. + + :arg cost_per_box: The expected running time of each box. This argument is only + significant on the root rank. + :arg traversal: The global traversal object containing all particles. This + argument is significant on all ranks. + :arg comm: MPI communicator. + :return: A numpy array containing the responsible boxes of the current rank. + """ + tree = traversal.tree + mpi_rank = comm.Get_rank() + mpi_size = comm.Get_size() + + if mpi_size > tree.nboxes: + raise RuntimeError("Fail to partition work because the number of boxes is " + "less than the number of processes.") + + # transform tree from the level order to the morton dfs order + # dfs_order[i] stores the level-order box index of dfs index i + dfs_order = get_box_ids_dfs_order(tree) + + # partition all boxes in dfs order evenly according to workload on the root rank + + responsible_boxes_segments = None + # contains: [start_index, end_index) + responsible_boxes_current_rank = np.empty(2, dtype=tree.box_id_dtype) + + # FIXME: Right now, the responsible boxes assigned to all ranks are computed + # centrally on the root rank to avoid inconsistency risks of floating point + # operations. We could improve the efficiency by letting each rank compute the + # costs of a subset of boxes, and use MPI_Scan to aggregate the results. + if mpi_rank == 0: + total_workload = np.sum(cost_per_box) + + # second axis: [start_index, end_index) + responsible_boxes_segments = np.empty((mpi_size, 2), dtype=tree.box_id_dtype) + segment_idx = 0 + start = 0 + workload_count = 0 + for box_idx_dfs_order in range(tree.nboxes): + if segment_idx + 1 == mpi_size: + responsible_boxes_segments[segment_idx, :] = [start, tree.nboxes] + break + + box_idx = dfs_order[box_idx_dfs_order] + workload_count += cost_per_box[box_idx] + if (workload_count > (segment_idx + 1) * total_workload / mpi_size + or box_idx_dfs_order == tree.nboxes - 1): + # record "end of rank segment" + responsible_boxes_segments[segment_idx, :] = ( + [start, box_idx_dfs_order + 1]) + start = box_idx_dfs_order + 1 + segment_idx += 1 + + comm.Scatter(responsible_boxes_segments, responsible_boxes_current_rank, root=0) + + return dfs_order[ + responsible_boxes_current_rank[0]:responsible_boxes_current_rank[1]] + + +class GetBoxMasksCodeContainer: + def __init__(self, cl_context, box_id_dtype): + self.cl_context = cl_context + self.box_id_dtype = box_id_dtype + + @memoize_method + def add_interaction_list_boxes_kernel(self): + """Given a ``responsible_boxes_mask`` and an interaction list, mark source + boxes for target boxes in ``responsible_boxes_mask`` in a new separate mask. + """ + return cl.elementwise.ElementwiseKernel( + self.cl_context, + Template(""" + __global ${box_id_t} *box_list, + __global char *responsible_boxes_mask, + __global ${box_id_t} *interaction_boxes_starts, + __global ${box_id_t} *interaction_boxes_lists, + __global char *src_boxes_mask + """, strict_undefined=True).render( + box_id_t=dtype_to_ctype(self.box_id_dtype) + ), + Template(r""" + typedef ${box_id_t} box_id_t; + box_id_t current_box = box_list[i]; + if(responsible_boxes_mask[current_box]) { + for(box_id_t box_idx = interaction_boxes_starts[i]; + box_idx < interaction_boxes_starts[i + 1]; + ++box_idx) + src_boxes_mask[interaction_boxes_lists[box_idx]] = 1; + } + """, strict_undefined=True).render( + box_id_t=dtype_to_ctype(self.box_id_dtype) + ), + ) + + @memoize_method + def add_parent_boxes_kernel(self): + return cl.elementwise.ElementwiseKernel( + self.cl_context, + "__global char *current, __global char *parent, " + "__global %s *box_parent_ids" % dtype_to_ctype(self.box_id_dtype), + "if(i != 0 && current[i]) parent[box_parent_ids[i]] = 1" + ) + + +def get_ancestor_boxes_mask(queue, code, traversal, responsible_boxes_mask): + """Query the ancestors of responsible boxes. + + :arg responsible_boxes_mask: A :class:`pyopencl.array.Array` object of shape + ``(tree.nboxes,)`` whose i-th entry is 1 if ``i`` is a responsible box. + :return: A :class:`pyopencl.array.Array` object of shape ``(tree.nboxes,)`` whose + i-th entry is 1 if ``i`` is an ancestor of the responsible boxes specified by + *responsible_boxes_mask*. + """ + ancestor_boxes = cl.array.zeros(queue, (traversal.tree.nboxes,), dtype=np.int8) + ancestor_boxes_last = responsible_boxes_mask.copy() + + while ancestor_boxes_last.any(): + ancestor_boxes_new = cl.array.zeros( + queue, (traversal.tree.nboxes,), dtype=np.int8) + code.add_parent_boxes_kernel()( + ancestor_boxes_last, ancestor_boxes_new, traversal.tree.box_parent_ids) + ancestor_boxes_new = ancestor_boxes_new & (~ancestor_boxes) + ancestor_boxes = ancestor_boxes | ancestor_boxes_new + ancestor_boxes_last = ancestor_boxes_new + + return ancestor_boxes + + +def get_point_src_boxes_mask( + queue, code, traversal, responsible_boxes_mask, ancestor_boxes_mask): + """Query the boxes whose sources are needed in order to evaluate potentials + of boxes represented by *responsible_boxes_mask*. + + :arg responsible_boxes_mask: A :class:`pyopencl.array.Array` object of shape + ``(tree.nboxes,)`` whose i-th entry is 1 if ``i`` is a responsible box. + :param ancestor_boxes_mask: A :class:`pyopencl.array.Array` object of shape + ``(tree.nboxes,)`` whose i-th entry is 1 if ``i`` is either a responsible box + or an ancestor of the responsible boxes. + :return: A :class:`pyopencl.array.Array` object of shape ``(tree.nboxes,)`` whose + i-th entry is 1 if souces of box ``i`` are needed for evaluating the + potentials of targets in boxes represented by *responsible_boxes_mask*. + """ + + src_boxes_mask = responsible_boxes_mask.copy() + + # Add list 1 of responsible boxes + code.add_interaction_list_boxes_kernel()( + traversal.target_boxes, responsible_boxes_mask, + traversal.neighbor_source_boxes_starts, + traversal.neighbor_source_boxes_lists, src_boxes_mask, + queue=queue) + + # Add list 4 of responsible boxes or ancestor boxes + code.add_interaction_list_boxes_kernel()( + traversal.target_or_target_parent_boxes, + responsible_boxes_mask | ancestor_boxes_mask, + traversal.from_sep_bigger_starts, traversal.from_sep_bigger_lists, + src_boxes_mask, + queue=queue) + + if traversal.tree.targets_have_extent: + # Add list 3 close of responsible boxes + if traversal.from_sep_close_smaller_starts is not None: + code.add_interaction_list_boxes_kernel()( + traversal.target_boxes, + responsible_boxes_mask, + traversal.from_sep_close_smaller_starts, + traversal.from_sep_close_smaller_lists, + src_boxes_mask, + queue=queue + ) + + # Add list 4 close of responsible boxes + if traversal.from_sep_close_bigger_starts is not None: + code.add_interaction_list_boxes_kernel()( + traversal.target_boxes, + responsible_boxes_mask | ancestor_boxes_mask, + traversal.from_sep_close_bigger_starts, + traversal.from_sep_close_bigger_lists, + src_boxes_mask, + queue=queue + ) + + return src_boxes_mask + + +def get_multipole_src_boxes_mask( + queue, code, traversal, responsible_boxes_mask, ancestor_boxes_mask): + """Query the boxes whose multipoles are used in order to evaluate + potentials of targets in boxes represented by *responsible_boxes_mask*. + + :arg responsible_boxes_mask: A :class:`pyopencl.array.Array` object of shape + ``(tree.nboxes,)`` whose i-th entry is 1 if ``i`` is a responsible box. + :arg ancestor_boxes_mask: A :class:`pyopencl.array.Array` object of shape + ``(tree.nboxes,)`` whose i-th entry is 1 if ``i`` is either a responsible box + or an ancestor of the responsible boxes. + :return: A :class:`pyopencl.array.Array` object of shape ``(tree.nboxes,)`` + whose i-th entry is 1 if multipoles of box ``i`` are needed for evaluating + the potentials of targets in boxes represented by *responsible_boxes_mask*. + """ + + multipole_boxes_mask = cl.array.zeros( + queue, (traversal.tree.nboxes,), dtype=np.int8 + ) + + # A mpole is used by process p if it is in the List 2 of either a box + # owned by p or one of its ancestors. + code.add_interaction_list_boxes_kernel()( + traversal.target_or_target_parent_boxes, + responsible_boxes_mask | ancestor_boxes_mask, + traversal.from_sep_siblings_starts, + traversal.from_sep_siblings_lists, + multipole_boxes_mask, + queue=queue + ) + multipole_boxes_mask.finish() + + # A mpole is used by process p if it is in the List 3 of a box owned by p. + for ilevel in range(traversal.tree.nlevels): + code.add_interaction_list_boxes_kernel()( + traversal.target_boxes_sep_smaller_by_source_level[ilevel], + responsible_boxes_mask, + traversal.from_sep_smaller_by_level[ilevel].starts, + traversal.from_sep_smaller_by_level[ilevel].lists, + multipole_boxes_mask, + queue=queue + ) + + multipole_boxes_mask.finish() + + return multipole_boxes_mask + + +@dataclass +class BoxMasks: + """ + Box masks needed for the distributed calculation. Each of these masks is a + PyOpenCL array with length ``tree.nboxes``, whose `i`-th entry is 1 if box `i` is + set. + + .. attribute:: responsible_boxes + + Current process will evaluate target potentials and multipole expansions in + these boxes. Sources and targets in these boxes are needed. + + .. attribute:: ancestor_boxes + + Ancestors of the responsible boxes. + + .. attribute:: point_src_boxes + + Current process needs sources but not targets in these boxes. + + .. attribute:: multipole_src_boxes + + Current process needs multipole expressions in these boxes. + """ + responsible_boxes: cl.array.Array + ancestor_boxes: cl.array.Array + point_src_boxes: cl.array.Array + multipole_src_boxes: cl.array.Array + + +def get_box_masks(queue, traversal, responsible_boxes_list): + """Given the responsible boxes for a rank, this helper function calculates the + relevant masks. + + :arg responsible_boxes_list: A numpy array of responsible box indices. + + :returns: A :class:`BoxMasks` object of the relevant masks. + """ + code = GetBoxMasksCodeContainer(queue.context, traversal.tree.box_id_dtype) + + traversal = traversal.to_device(queue) + + responsible_boxes_mask = np.zeros((traversal.tree.nboxes,), dtype=np.int8) + responsible_boxes_mask[responsible_boxes_list] = 1 + responsible_boxes_mask = cl.array.to_device(queue, responsible_boxes_mask) + + ancestor_boxes_mask = get_ancestor_boxes_mask( + queue, code, traversal, responsible_boxes_mask) + + point_src_boxes_mask = get_point_src_boxes_mask( + queue, code, traversal, responsible_boxes_mask, ancestor_boxes_mask) + + multipole_src_boxes_mask = get_multipole_src_boxes_mask( + queue, code, traversal, responsible_boxes_mask, ancestor_boxes_mask) + + return BoxMasks( + responsible_boxes_mask, ancestor_boxes_mask, point_src_boxes_mask, + multipole_src_boxes_mask) diff --git a/boxtree/fmm.py b/boxtree/fmm.py index 33d3a1b5a03729dfb51bd64aa593b906b8709d58..2cac0f6bbced96addc5f6982c641964b70236b07 100644 --- a/boxtree/fmm.py +++ b/boxtree/fmm.py @@ -266,11 +266,65 @@ class ExpansionWranglerInterface(ABC): type (typically :class:`pyopencl.array.Array`). """ + def distribute_source_weights(self, src_weight_vecs, src_idx_all_ranks): + """Used by the distributed implementation for transferring needed source + weights from root rank to each worker rank in the communicator. + + This method needs to be called collectively by all ranks in the communicator. + + :arg src_weight_vecs: a sequence of :class:`numpy.ndarray`, each with length + ``nsources``, representing the weights of sources on the root rank. + *None* on worker ranks. + :arg src_idx_all_ranks: a :class:`list` of length ``nranks``, including the + root rank, where the i-th entry is a :class:`numpy.ndarray` of indices, + of which *src_weight_vecs* to be sent from the root rank to rank *i*. + Each entry can be generated by :func:`.generate_local_tree`. *None* on + worker ranks. + + :return: Received source weights of the current rank, including the root + rank. + """ + return src_weight_vecs + + def gather_potential_results(self, potentials, tgt_idx_all_ranks): + """Used by the distributed implementation for gathering calculated potentials + from all worker ranks in the communicator to the root rank. + + This method needs to be called collectively by all ranks in the communicator. + + :arg potentials: Calculated potentials on each rank. This argument is + significant on all ranks, including the root rank. + :arg tgt_idx_all_ranks: a :class:`list` of length ``nranks``, where the + i-th entry is a :class:`numpy.ndarray` of the global potential indices + of potentials from rank *i*. This argument is only significant on the + root rank. + + :return: Gathered potentials on the root rank. *None* on worker ranks. + """ + return potentials + + def communicate_mpoles(self, mpole_exps, return_stats=False): + """Used by the distributed implementation for forming the complete multipole + expansions from the partial multipole expansions. + + This function accepts partial multipole expansions in the argument + *mpole_exps*, and modifies *mpole_exps* in place with the communicated and + reduced multipole expansions. + + This function needs to be called collectively by all ranks in the + communicator. + + :returns: Statistics of the communication if *return_stats* is True. *None* + otherwise. + """ + pass + # }}} def drive_fmm(wrangler: ExpansionWranglerInterface, src_weight_vecs, - timing_data=None): + timing_data=None, + global_src_idx_all_ranks=None, global_tgt_idx_all_ranks=None): """Top-level driver routine for a fast multipole calculation. In part, this is intended as a template for custom FMMs, in the sense that @@ -282,14 +336,27 @@ def drive_fmm(wrangler: ExpansionWranglerInterface, src_weight_vecs, covered by supplying the right *expansion_wrangler* to this routine. :arg expansion_wrangler: An object exhibiting the - :class:`ExpansionWranglerInterface`. + :class:`ExpansionWranglerInterface`. For distributed implementation, this + wrangler should be a subclass of + :class:`boxtree.distributed.calculation.DistributedExpansionWrangler`. :arg src_weight_vecs: A sequence of source 'density/weights/charges'. - Passed unmodified to *expansion_wrangler*. + Passed unmodified to *expansion_wrangler*. For distributed + implementation, this argument is only significant on the root rank. :arg timing_data: Either *None*, or a :class:`dict` that is populated with timing information for the stages of the algorithm (in the form of :class:`~boxtree.timing.TimingResult`), if such information is available. - - Returns the potentials computed by *expansion_wrangler*. + :arg global_src_idx_all_ranks: a :class:`list` of length ``nranks``, where the + i-th entry is a :class:`numpy.ndarray` representing the global indices of + sources in the local tree on rank *i*. Each entry can be returned from + *generate_local_tree*. This argument is significant only on the root rank. + :arg global_tgt_idx_all_ranks: a :class:`list` of length ``nranks``, where the + i-th entry is a :class:`numpy.ndarray` representing the global indices of + targets in the local tree on rank *i*. Each entry can be returned from + *generate_local_tree*. This argument is significant only on the root rank. + + :return: the potentials computed by *expansion_wrangler*. For the distributed + implementation, the potentials are gathered and returned on the root rank; + this function returns *None* on the worker ranks. """ traversal = wrangler.traversal @@ -304,6 +371,9 @@ def drive_fmm(wrangler: ExpansionWranglerInterface, src_weight_vecs, src_weight_vecs = [wrangler.reorder_sources(weight) for weight in src_weight_vecs] + src_weight_vecs = wrangler.distribute_source_weights( + src_weight_vecs, global_src_idx_all_ranks) + # {{{ "Step 2.1:" Construct local multipoles mpole_exps, timing_future = wrangler.form_multipoles( @@ -328,6 +398,8 @@ def drive_fmm(wrangler: ExpansionWranglerInterface, src_weight_vecs, # }}} + wrangler.communicate_mpoles(mpole_exps) + # {{{ "Stage 3:" Direct evaluation from neighbor source boxes ("list 1") potentials, timing_future = wrangler.eval_direct( @@ -439,6 +511,9 @@ def drive_fmm(wrangler: ExpansionWranglerInterface, src_weight_vecs, # }}} + potentials = wrangler.gather_potential_results( + potentials, global_tgt_idx_all_ranks) + result = wrangler.reorder_potentials(potentials) result = wrangler.finalize_potentials(result, template_ary=src_weight_vecs[0]) diff --git a/boxtree/tools.py b/boxtree/tools.py index 7d2e4b6da95c3a980438d83a253b7a2f99125f7d..d9549be9d4a9f664806e50aea347718c625c9255 100644 --- a/boxtree/tools.py +++ b/boxtree/tools.py @@ -25,8 +25,7 @@ import numpy as np from pytools import Record, memoize_method import pyopencl as cl import pyopencl.array # noqa -from pyopencl.tools import dtype_to_c_struct, VectorArg as _VectorArg -from pyopencl.tools import ScalarArg # noqa +from pyopencl.tools import dtype_to_c_struct, ScalarArg, VectorArg as _VectorArg from mako.template import Template from pytools.obj_array import make_obj_array import loopy as lp @@ -34,12 +33,12 @@ import loopy as lp from loopy.version import LOOPY_USE_LANGUAGE_VERSION_2018_2 # noqa from functools import partial +import sys # Use offsets in VectorArg by default. VectorArg = partial(_VectorArg, with_offset=True) - AXIS_NAMES = ("x", "y", "z", "w") @@ -304,11 +303,13 @@ class DeviceDataRecord(Record): def get(self, queue, **kwargs): """Return a copy of `self` in which all data lives on the host, i.e. - all :class:`pyopencl.array.Array` objects are replaced by corresponding - :class:`numpy.ndarray` instances on the host. + all :class:`pyopencl.array.Array` and `ImmutableHostDeviceArray` objects are + replaced by corresponding :class:`numpy.ndarray` instances on the host. """ - def try_get(attr): + if isinstance(attr, ImmutableHostDeviceArray): + return attr.host + try: get_meth = attr.get except AttributeError: @@ -339,7 +340,7 @@ class DeviceDataRecord(Record): return self._transform_arrays(try_with_queue) def to_device(self, queue, exclude_fields=frozenset()): - """ Return a copy of `self` in all :class:`numpy.ndarray` arrays are + """Return a copy of `self` in all :class:`numpy.ndarray` arrays are transferred to device memory as :class:`pyopencl.array.Array` objects. :arg exclude_fields: a :class:`frozenset` containing fields excluding from @@ -349,10 +350,33 @@ class DeviceDataRecord(Record): def _to_device(attr): if isinstance(attr, np.ndarray): return cl.array.to_device(queue, attr).with_queue(None) + elif isinstance(attr, ImmutableHostDeviceArray): + return attr.device + elif isinstance(attr, DeviceDataRecord): + return attr.to_device(queue) else: return attr - return self._transform_arrays(_to_device, exclude_fields) + return self._transform_arrays(_to_device, exclude_fields=exclude_fields) + + def to_host_device_array(self, queue, exclude_fields=frozenset()): + """Return a copy of `self` where all device and host arrays are transformed + to `ImmutableHostDeviceArray` objects. + + :arg exclude_fields: a :class:`frozenset` containing fields excluding from + transformed to `ImmutableHostDeviceArray`. + """ + def _to_host_device_array(attr): + if isinstance(attr, (np.ndarray, cl.array.Array)): + return ImmutableHostDeviceArray(queue, attr) + elif isinstance(attr, DeviceDataRecord): + return attr.to_host_device_array(queue) + else: + return attr + + return self._transform_arrays( + _to_host_device_array, exclude_fields=exclude_fields + ) # }}} @@ -587,4 +611,301 @@ class InlineBinarySearch: # }}} +# {{{ compress a masked array into a list / list of lists + + +MASK_LIST_COMPRESSOR_BODY = r""" +void generate(LIST_ARG_DECL USER_ARG_DECL index_type i) +{ + if (mask[i]) + { + APPEND_output(i); + } +} +""" + + +MASK_MATRIX_COMPRESSOR_BODY = r""" +void generate(LIST_ARG_DECL USER_ARG_DECL index_type i) +{ + for (int j = 0; j < ncols; ++j) + { + if (mask[outer_stride * i + j * inner_stride]) + { + APPEND_output(j); + } + } +} +""" + + +class MaskCompressorKernel: + """ + .. automethod:: __call__ + """ + def __init__(self, context): + self.context = context + + @memoize_method + def get_list_compressor_kernel(self, mask_dtype, list_dtype): + from pyopencl.algorithm import ListOfListsBuilder + + return ListOfListsBuilder( + self.context, + [("output", list_dtype)], + MASK_LIST_COMPRESSOR_BODY, + [ + _VectorArg(mask_dtype, "mask"), + ], + name_prefix="compress_list") + + @memoize_method + def get_matrix_compressor_kernel(self, mask_dtype, list_dtype): + from pyopencl.algorithm import ListOfListsBuilder + + return ListOfListsBuilder( + self.context, + [("output", list_dtype)], + MASK_MATRIX_COMPRESSOR_BODY, + [ + ScalarArg(np.int32, "ncols"), + ScalarArg(np.int32, "outer_stride"), + ScalarArg(np.int32, "inner_stride"), + _VectorArg(mask_dtype, "mask"), + ], + name_prefix="compress_matrix") + + def __call__(self, queue, mask, list_dtype=None): + """Convert a mask to a list in :ref:`csr` format. + + :arg mask: Either a 1D or 2D array. + * If *mask* is 1D, it should represent a masked list, where + *mask[i]* is true if and only if *i* is in the list. + * If *mask* is 2D, it should represent a list of masked lists, + so that *mask[i,j]* is true if and only if *j* is in list *i*. + + :arg list_dtype: The dtype for the output list(s). Defaults to the mask + dtype. + + :returns: The return value depends on the type of the input. + * If mask* is 1D, returns a tuple *(list, evt)*. + * If *mask* is 2D, returns a tuple *(starts, lists, event)*, as a + :ref:`csr` list. + """ + if list_dtype is None: + list_dtype = mask.dtype + + if len(mask.shape) == 1: + knl = self.get_list_compressor_kernel(mask.dtype, list_dtype) + result, evt = knl(queue, mask.shape[0], mask.data) + return (result["output"].lists, evt) + elif len(mask.shape) == 2: + # FIXME: This is efficient for small column sizes but may not be + # for larger ones since the work is partitioned by row. + knl = self.get_matrix_compressor_kernel(mask.dtype, list_dtype) + size = mask.dtype.itemsize + assert size > 0 + result, evt = knl(queue, mask.shape[0], mask.shape[1], + mask.strides[0] // size, mask.strides[1] // size, + mask.data) + return (result["output"].starts, result["output"].lists, evt) + else: + raise ValueError("unsupported dimensionality") + +# }}} + + +# {{{ Communication pattern for partial multipole expansions + +class AllReduceCommPattern: + """Describes a tree-like communication pattern for exchanging and reducing + multipole expansions. Supports an arbitrary number of processes. + + A user must instantiate a version of this with identical *size* and varying + *rank* on each rank. During each stage, each rank sends its contribution to + the reduction results on ranks returned by :meth:`sinks` and listens for + contributions from :meth:`source`. :meth:`messages` can be used for determining + array indices whose partial results need to be sent during the current stage. + Then, all ranks call :meth:`advance` and use :meth:`done` to check whether the + communication is complete. In the use case of multipole communication, the + reduction result is a vector of multipole expansions to which all ranks add + contribution. These contributions are communicated sparsely via arrays of box + indices and expansions. + + .. automethod:: __init__ + .. automethod:: sources + .. automethod:: sinks + .. automethod:: messages + .. automethod:: advance + .. automethod:: done + """ + + def __init__(self, rank, size): + """ + :arg rank: Current rank. + :arg size: Total number of ranks. + """ + assert 0 <= rank < size + self.rank = rank + self.left = 0 + self.right = size + self.midpoint = size // 2 + + def sources(self): + """Return the set of source nodes at the current communication stage. The + current rank receives messages from these ranks. + """ + if self.rank < self.midpoint: + partner = self.midpoint + (self.rank - self.left) + if self.rank == self.midpoint - 1 and partner == self.right: + partners = set() + elif self.rank == self.midpoint - 1 and partner == self.right - 2: + partners = {partner, partner + 1} + else: + partners = {partner} + else: + partner = self.left + (self.rank - self.midpoint) + if self.rank == self.right - 1 and partner == self.midpoint: + partners = set() + elif self.rank == self.right - 1 and partner == self.midpoint - 2: + partners = {partner, partner + 1} + else: + partners = {partner} + + return partners + + def sinks(self): + """Return the set of sink nodes at this communication stage. The current rank + sends a message to these ranks. + """ + if self.rank < self.midpoint: + partner = self.midpoint + (self.rank - self.left) + if partner == self.right: + partner -= 1 + else: + partner = self.left + (self.rank - self.midpoint) + if partner == self.midpoint: + partner -= 1 + + return {partner} + + def messages(self): + """Return a range of ranks, such that the partial results of array indices + used by these ranks are sent to the sinks. This is returned as a + [start, end) pair. By design, it is a consecutive range. + """ + if self.rank < self.midpoint: + return (self.midpoint, self.right) + else: + return (self.left, self.midpoint) + + def advance(self): + """Advance to the next stage in the communication pattern. + """ + if self.done(): + raise RuntimeError("finished communicating") + + if self.rank < self.midpoint: + self.right = self.midpoint + self.midpoint = (self.midpoint + self.left) // 2 + else: + self.left = self.midpoint + self.midpoint = (self.midpoint + self.right) // 2 + + def done(self): + """Return whether the current rank is finished communicating. + """ + return self.left + 1 == self.right + +# }}} + + +# {{{ MPI launcher + +def run_mpi(script, num_processes, env): + """Launch MPI processes. + + This function forks another process and uses mpiexec to launch *num_processes* + copies of program *script*. + + :arg script: the Python script to run + :arg num_processes: the number of copies of program to launch + :arg env: a Python `dict` of environment variables + """ + import subprocess + from mpi4py import MPI + + # Using "-m mpi4py" is necessary for avoiding deadlocks on exception cleanup + # See https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html for details. + + mpi_library_name = MPI.Get_library_version() + if mpi_library_name.startswith("Open MPI"): + command = ["mpiexec", "-np", str(num_processes), "--oversubscribe"] + for env_variable_name in env: + command.append("-x") + command.append(env_variable_name) + command.extend([sys.executable, "-m", "mpi4py", script]) + + subprocess.run(command, env=env, check=True) + else: + subprocess.run( + ["mpiexec", "-np", str(num_processes), sys.executable, + "-m", "mpi4py", script], + env=env, check=True + ) + +# }}} + + +# {{{ HostDeviceArray + +class ImmutableHostDeviceArray: + """Interface for arrays on both host and device. + + .. note:: This interface assumes the array is immutable. The behavior of + modifying the content of either the host array or the device array is undefined. + + @TODO: Once available, replace this implementation with PyOpenCL's in-house + implementation. + """ + def __init__(self, queue, array): + self.queue = queue + self.shape = array.shape + self.host_array = None + self.device_array = None + + if isinstance(array, np.ndarray): + self.host_array = array + elif isinstance(array, cl.array.Array): + self.device_array = array + + def with_queue(self, queue): + self.queue = queue + + @property + def svm_capable(self): + svm_capabilities = \ + self.queue.device.get_info(cl.device_info.SVM_CAPABILITIES) + if svm_capabilities & cl.device_svm_capabilities.FINE_GRAIN_BUFFER != 0: + return True + else: + return False + + @property + def host(self): + if self.host_array is None: + self.host_array = self.device_array.get(self.queue) + return self.host_array + + @property + def device(self): + if self.device_array is None: + # @TODO: Use SVM + self.device_array = cl.array.to_device(self.queue, self.host_array) + + self.device_array.with_queue(self.queue) + return self.device_array + +# }}} + # vim: foldmethod=marker:filetype=pyopencl diff --git a/boxtree/traversal.py b/boxtree/traversal.py index d54912ba9280aeab42f67173dffe717f1ba53842..ce03738b04704eab7d68c5ee4be9ef514d0c5b53 100644 --- a/boxtree/traversal.py +++ b/boxtree/traversal.py @@ -285,11 +285,21 @@ void generate(LIST_ARG_DECL USER_ARG_DECL box_id_t box_id) { box_flags_t flags = box_flags[box_id]; - if (flags & BOX_HAS_OWN_SOURCES) - { APPEND_source_boxes(box_id); } + %if source_boxes_has_mask: + if (flags & BOX_HAS_OWN_SOURCES && source_boxes_mask[box_id]) + { APPEND_source_boxes(box_id); } + %else: + if (flags & BOX_HAS_OWN_SOURCES) + { APPEND_source_boxes(box_id); } + %endif - if (flags & BOX_HAS_CHILD_SOURCES) - { APPEND_source_parent_boxes(box_id); } + %if source_parent_boxes_has_mask: + if (flags & BOX_HAS_CHILD_SOURCES && source_parent_boxes_mask[box_id]) + { APPEND_source_parent_boxes(box_id); } + %else: + if (flags & BOX_HAS_CHILD_SOURCES) + { APPEND_source_parent_boxes(box_id); } + %endif %if not sources_are_targets: if (flags & BOX_HAS_OWN_TARGETS) @@ -1763,7 +1773,9 @@ class FMMTraversalBuilder: def get_kernel_info(self, dimensions, particle_id_dtype, box_id_dtype, coord_dtype, box_level_dtype, max_levels, sources_are_targets, sources_have_extent, targets_have_extent, - extent_norm): + extent_norm, + source_boxes_has_mask, + source_parent_boxes_has_mask): # {{{ process from_sep_smaller_crit @@ -1826,6 +1838,8 @@ class FMMTraversalBuilder: targets_have_extent=targets_have_extent, well_sep_is_n_away=self.well_sep_is_n_away, from_sep_smaller_crit=from_sep_smaller_crit, + source_boxes_has_mask=source_boxes_has_mask, + source_parent_boxes_has_mask=source_parent_boxes_has_mask ) from pyopencl.algorithm import ListOfListsBuilder from boxtree.tools import VectorArg, ScalarArg @@ -1839,6 +1853,12 @@ class FMMTraversalBuilder: + SOURCES_PARENTS_AND_TARGETS_TEMPLATE, strict_undefined=True).render(**render_vars) + arg_decls = [VectorArg(box_flags_enum.dtype, "box_flags")] + if source_boxes_has_mask: + arg_decls.append(VectorArg(np.int8, "source_boxes_mask")) + if source_parent_boxes_has_mask: + arg_decls.append(VectorArg(np.int8, "source_parent_boxes_mask")) + result["sources_parents_and_targets_builder"] = \ ListOfListsBuilder(self.context, [ @@ -1850,9 +1870,7 @@ class FMMTraversalBuilder: if not sources_are_targets else []), str(src), - arg_decls=[ - VectorArg(box_flags_enum.dtype, "box_flags"), - ], + arg_decls=arg_decls, debug=debug, name_prefix="sources_parents_and_targets") @@ -1971,18 +1989,25 @@ class FMMTraversalBuilder: # {{{ driver def __call__(self, queue, tree, wait_for=None, debug=False, - _from_sep_smaller_min_nsources_cumul=None): + _from_sep_smaller_min_nsources_cumul=None, + box_bounding_box=None, + source_boxes_mask=None, + source_parent_boxes_mask=None): """ :arg queue: A :class:`pyopencl.CommandQueue` instance. :arg tree: A :class:`boxtree.Tree` instance. :arg wait_for: may either be *None* or a list of :class:`pyopencl.Event` instances for whose completion this command waits before starting exeuction. + :arg source_boxes_mask: Only boxes passing this mask will be considered for + `source_boxes`. Used by the distributed implementation. + :arg source_parent_boxes_mask: Only boxes passing this mask will be + considered for `source_parent_boxes`. Used by the distributed + implementation. :return: A tuple *(trav, event)*, where *trav* is a new instance of :class:`FMMTraversalInfo` and *event* is a :class:`pyopencl.Event` for dependency management. """ - if _from_sep_smaller_min_nsources_cumul is None: # default to old no-threshold behavior _from_sep_smaller_min_nsources_cumul = 0 @@ -2006,7 +2031,9 @@ class FMMTraversalBuilder: tree.coord_dtype, tree.box_level_dtype, max_levels, tree.sources_are_targets, tree.sources_have_extent, tree.targets_have_extent, - tree.extent_norm) + tree.extent_norm, + source_boxes_mask is not None, + source_parent_boxes_mask is not None) def fin_debug(s): if debug: @@ -2020,8 +2047,16 @@ class FMMTraversalBuilder: fin_debug("building list of source boxes, their parents, and target boxes") + extra_args = [] + if source_boxes_mask is not None: + extra_args.append(source_boxes_mask) + if source_parent_boxes_mask is not None: + extra_args.append(source_parent_boxes_mask) + result, evt = knl_info.sources_parents_and_targets_builder( - queue, tree.nboxes, tree.box_flags, wait_for=wait_for) + queue, tree.nboxes, tree.box_flags, *extra_args, wait_for=wait_for + ) + wait_for = [evt] source_parent_boxes = result["source_parent_boxes"].lists @@ -2086,84 +2121,91 @@ class FMMTraversalBuilder: # {{{ box extents fin_debug("finding box extents") - - box_source_bounding_box_min = cl.array.empty( - queue, (tree.dimensions, tree.aligned_nboxes), - dtype=tree.coord_dtype) - box_source_bounding_box_max = cl.array.empty( - queue, (tree.dimensions, tree.aligned_nboxes), - dtype=tree.coord_dtype) - - if tree.sources_are_targets: - box_target_bounding_box_min = box_source_bounding_box_min - box_target_bounding_box_max = box_source_bounding_box_max + if box_bounding_box is not None: + box_target_bounding_box_min = cl.array.to_device( + queue, box_bounding_box["min"]) + box_target_bounding_box_max = cl.array.to_device( + queue, box_bounding_box["max"]) + box_source_bounding_box_min = None + box_source_bounding_box_max = None else: - box_target_bounding_box_min = cl.array.empty( + box_source_bounding_box_min = cl.array.empty( queue, (tree.dimensions, tree.aligned_nboxes), dtype=tree.coord_dtype) - box_target_bounding_box_max = cl.array.empty( + box_source_bounding_box_max = cl.array.empty( queue, (tree.dimensions, tree.aligned_nboxes), dtype=tree.coord_dtype) - bogus_radii_array = cl.array.empty(queue, 1, dtype=tree.coord_dtype) - - # nlevels-1 is the highest valid level index - for level in range(tree.nlevels-1, -1, -1): - start, stop = tree.level_start_box_nrs[level:level+2] - - for (skip, enable_radii, bbox_min, bbox_max, - pstarts, pcounts, radii_tree_attr, particles) in [ - ( - # never skip - False, - - tree.sources_have_extent, - box_source_bounding_box_min, - box_source_bounding_box_max, - tree.box_source_starts, - tree.box_source_counts_nonchild, - "source_radii", - tree.sources), - ( - # skip the 'target' round if sources and targets - # are the same. - tree.sources_are_targets, - - tree.targets_have_extent, - box_target_bounding_box_min, - box_target_bounding_box_max, - tree.box_target_starts, - tree.box_target_counts_nonchild, - "target_radii", - tree.targets), - ]: - - if skip: - continue - - args = ( + if tree.sources_are_targets: + box_target_bounding_box_min = box_source_bounding_box_min + box_target_bounding_box_max = box_source_bounding_box_max + else: + box_target_bounding_box_min = cl.array.empty( + queue, (tree.dimensions, tree.aligned_nboxes), + dtype=tree.coord_dtype) + box_target_bounding_box_max = cl.array.empty( + queue, (tree.dimensions, tree.aligned_nboxes), + dtype=tree.coord_dtype) + + bogus_radii_array = cl.array.empty(queue, 1, dtype=tree.coord_dtype) + + # nlevels-1 is the highest valid level index + for level in range(tree.nlevels-1, -1, -1): + start, stop = tree.level_start_box_nrs[level:level+2] + + for (skip, enable_radii, bbox_min, bbox_max, + pstarts, pcounts, radii_tree_attr, particles) in [ ( - tree.aligned_nboxes, - tree.box_child_ids, - tree.box_centers, - pstarts, pcounts,) - + tuple(particles) - + ( - getattr(tree, radii_tree_attr, bogus_radii_array), - enable_radii, - - bbox_min, - bbox_max)) - - evt = knl_info.box_extents_finder( - *args, - - range=slice(start, stop), - queue=queue, wait_for=wait_for) - - wait_for = [evt] - - del bogus_radii_array + # never skip + False, + + tree.sources_have_extent, + box_source_bounding_box_min, + box_source_bounding_box_max, + tree.box_source_starts, + tree.box_source_counts_nonchild, + "source_radii", + tree.sources), + ( + # skip the 'target' round if sources and targets + # are the same. + tree.sources_are_targets, + + tree.targets_have_extent, + box_target_bounding_box_min, + box_target_bounding_box_max, + tree.box_target_starts, + tree.box_target_counts_nonchild, + "target_radii", + tree.targets), + ]: + + if skip: + continue + + args = ( + ( + tree.aligned_nboxes, + tree.box_child_ids, + tree.box_centers, + pstarts, pcounts,) + + tuple(particles) + + ( + getattr(tree, radii_tree_attr, bogus_radii_array), + enable_radii, + + bbox_min, + bbox_max)) + + evt = knl_info.box_extents_finder( + *args, + + range=slice(start, stop), + queue=queue, wait_for=wait_for) + + wait_for = [evt] + + del bogus_radii_array # }}} diff --git a/boxtree/tree.py b/boxtree/tree.py index a0586c08b3eb3f67a7eddd50825c507d2cdbb088..cc672b032790a432067928e29f28000520bd26fd 100644 --- a/boxtree/tree.py +++ b/boxtree/tree.py @@ -379,11 +379,11 @@ class Tree(DeviceDataRecord): @property def nsources(self): - return len(self.user_source_ids) + return len(self.sources[0]) @property def ntargets(self): - return len(self.sorted_target_ids) + return len(self.targets[0]) @property def nlevels(self): @@ -460,6 +460,14 @@ class Tree(DeviceDataRecord): return super().to_device(queue, frozenset(exclude_fields)) + def to_host_device_array(self, queue, exclude_fields=frozenset()): + # level_start_box_nrs should remain in host memory + exclude_fields = set(exclude_fields) + exclude_fields.add("level_start_box_nrs") + + return super(Tree, self).to_host_device_array( + queue, frozenset(exclude_fields)) + # }}} diff --git a/doc/distributed.rst b/doc/distributed.rst new file mode 100644 index 0000000000000000000000000000000000000000..91fbe7cd60e9908ff6d53348b7037ffd5b3479de --- /dev/null +++ b/doc/distributed.rst @@ -0,0 +1,4 @@ +Distributed Computation +======================= + +.. automodule:: boxtree.distributed diff --git a/doc/index.rst b/doc/index.rst index 87ece52ba2574b944f3f12bd0f70e9e41ca9d1a1..e73de51a55fcc5da4e834a58f64c201f6bdb696f 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -41,6 +41,7 @@ Overview fmm lookup cost + distributed tools misc 🚀 Github diff --git a/pytest.ini b/pytest.ini index a1462477d1f8a9791c2e8c4e4644899acd644a73..72057bc54ede5842143e7f45b8b3a270cd7edc30 100644 --- a/pytest.ini +++ b/pytest.ini @@ -3,3 +3,4 @@ markers = opencl: uses OpenCL geo_lookup: test geometric lookups area_query: test area queries + mpi: test distributed FMM diff --git a/setup.py b/setup.py index ac579477805b0829fe419db2a858a88e9ef80077..fc663dd4789cb14bedb01f1da565030e1b32d099 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ def main(): - from setuptools import setup + from setuptools import setup, find_packages version_dict = {} init_filename = "boxtree/version.py" @@ -36,8 +36,7 @@ def main(): 'Topic :: Software Development :: Libraries', 'Topic :: Utilities', ], - - packages=["boxtree"], + packages=find_packages(), python_requires="~=3.6", install_requires=[ "pytools>=2018.4", diff --git a/test/test_distributed.py b/test/test_distributed.py new file mode 100644 index 0000000000000000000000000000000000000000..bab888a5fc23adf986780335f0e10734816df6b5 --- /dev/null +++ b/test/test_distributed.py @@ -0,0 +1,314 @@ +__copyright__ = "Copyright (C) 2021 Hao Gao" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +import numpy as np +import pyopencl as cl +import numpy.linalg as la +from boxtree.pyfmmlib_integration import \ + Kernel, FMMLibTreeIndependentDataForWrangler, FMMLibExpansionWrangler +from boxtree.constant_one import ( + ConstantOneExpansionWrangler as ConstantOneExpansionWranglerBase, + ConstantOneTreeIndependentDataForWrangler) +from boxtree.tools import run_mpi +import logging +import os +import pytest +import sys + +# Note: Do not import mpi4py.MPI object at the module level, because OpenMPI does not +# support recursive invocations. + +# Configure logging +logging.basicConfig(level=os.environ.get("LOGLEVEL", "WARNING")) +logging.getLogger("boxtree.distributed").setLevel(logging.INFO) + + +def set_cache_dir(comm): + """Make each rank use a differnt cache location to avoid conflict. + """ + from pathlib import Path + if "XDG_CACHE_HOME" in os.environ: + cache_home = Path(os.environ["XDG_CACHE_HOME"]) + else: + cache_home = Path.home() / ".cache" + os.environ["XDG_CACHE_HOME"] = str(cache_home / str(comm.Get_rank())) + + +def _test_against_shared( + dims, nsources, ntargets, dtype, communicate_mpoles_via_allreduce=False): + from mpi4py import MPI + + # Get the current rank + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + set_cache_dir(comm) + + # Initialize arguments for worker processes + global_tree_dev = None + sources_weights = None + helmholtz_k = 0 + + # Configure PyOpenCL + ctx = cl.create_some_context() + queue = cl.CommandQueue(ctx) + + def fmm_level_to_nterms(tree, level): + return max(level, 3) + + from boxtree.traversal import FMMTraversalBuilder + tg = FMMTraversalBuilder(ctx, well_sep_is_n_away=2) + + tree_indep = FMMLibTreeIndependentDataForWrangler( + dims, Kernel.HELMHOLTZ if helmholtz_k else Kernel.LAPLACE) + + # Generate particles and run shared-memory parallelism on rank 0 + if rank == 0: + # Generate random particles and source weights + from boxtree.tools import make_normal_particle_array as p_normal + sources = p_normal(queue, nsources, dims, dtype, seed=15) + targets = p_normal(queue, ntargets, dims, dtype, seed=18) + + from pyopencl.clrandom import PhiloxGenerator + rng = PhiloxGenerator(queue.context, seed=20) + sources_weights = rng.uniform(queue, nsources, dtype=np.float64).get() + + from pyopencl.clrandom import PhiloxGenerator + rng = PhiloxGenerator(queue.context, seed=22) + target_radii = rng.uniform( + queue, ntargets, a=0, b=0.05, dtype=np.float64).get() + + # Build the tree and interaction lists + from boxtree import TreeBuilder + tb = TreeBuilder(ctx) + global_tree_dev, _ = tb( + queue, sources, targets=targets, target_radii=target_radii, + stick_out_factor=0.25, max_particles_in_box=30, debug=True) + + d_trav, _ = tg(queue, global_tree_dev, debug=True) + global_traversal_host = d_trav.get(queue=queue) + + # Get pyfmmlib expansion wrangler + wrangler = FMMLibExpansionWrangler( + tree_indep, global_traversal_host, + fmm_level_to_nterms=fmm_level_to_nterms) + + # Compute FMM using shared memory parallelism + from boxtree.fmm import drive_fmm + pot_fmm = drive_fmm(wrangler, [sources_weights]) * 2 * np.pi + + # Compute FMM using distributed memory parallelism + + def wrangler_factory(local_traversal, global_traversal): + from boxtree.distributed.calculation import \ + DistributedFMMLibExpansionWrangler + + return DistributedFMMLibExpansionWrangler( + queue.context, comm, tree_indep, local_traversal, global_traversal, + fmm_level_to_nterms=fmm_level_to_nterms, + communicate_mpoles_via_allreduce=communicate_mpoles_via_allreduce) + + from boxtree.distributed import DistributedFMMRunner + distribued_fmm_info = DistributedFMMRunner( + queue, global_tree_dev, tg, wrangler_factory, comm=comm) + + timing_data = {} + pot_dfmm = distribued_fmm_info.drive_dfmm( + [sources_weights], timing_data=timing_data) + assert timing_data + + # Uncomment the following section to print the time taken of each stage + """ + if rank == 1: + from pytools import Table + table = Table() + table.add_row(["stage", "time (s)"]) + for stage in timing_data: + table.add_row([stage, "%.2f" % timing_data[stage]["wall_elapsed"]]) + print(table) + """ + + if rank == 0: + error = (la.norm(pot_fmm - pot_dfmm * 2 * np.pi, ord=np.inf) + / la.norm(pot_fmm, ord=np.inf)) + print(error) + assert error < 1e-14 + + +@pytest.mark.mpi +@pytest.mark.parametrize( + "num_processes, dims, nsources, ntargets, communicate_mpoles_via_allreduce", [ + (4, 3, 10000, 10000, True), + (4, 3, 10000, 10000, False) + ] +) +def test_against_shared( + num_processes, dims, nsources, ntargets, communicate_mpoles_via_allreduce): + pytest.importorskip("mpi4py") + + newenv = os.environ.copy() + newenv["PYTEST"] = "1" + newenv["dims"] = str(dims) + newenv["nsources"] = str(nsources) + newenv["ntargets"] = str(ntargets) + newenv["communicate_mpoles_via_allreduce"] = \ + str(communicate_mpoles_via_allreduce) + newenv["OMP_NUM_THREADS"] = "1" + + run_mpi(__file__, num_processes, newenv) + + +def _test_constantone(dims, nsources, ntargets, dtype): + from boxtree.distributed.calculation import DistributedExpansionWrangler + + class ConstantOneExpansionWrangler( + ConstantOneExpansionWranglerBase, DistributedExpansionWrangler): + def __init__( + self, queue, comm, tree_indep, local_traversal, global_traversal): + DistributedExpansionWrangler.__init__( + self, queue, comm, global_traversal, + communicate_mpoles_via_allreduce=True) + ConstantOneExpansionWranglerBase.__init__( + self, tree_indep, local_traversal) + self.level_nterms = np.ones(local_traversal.tree.nlevels, dtype=np.int32) + + def reorder_sources(self, source_array): + if self.comm.Get_rank() == 0: + return source_array[self.global_traversal.tree.user_source_ids] + else: + return None + + def reorder_potentials(self, potentials): + if self.comm.Get_rank() == 0: + return potentials[self.global_traversal.tree.sorted_target_ids] + else: + return None + + from mpi4py import MPI + + # Get the current rank + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + set_cache_dir(comm) + + # Initialization + tree = None + sources_weights = None + + # Configure PyOpenCL + import pyopencl as cl + ctx = cl.create_some_context() + queue = cl.CommandQueue(ctx) + + from boxtree.traversal import FMMTraversalBuilder + tg = FMMTraversalBuilder(ctx) + + if rank == 0: + + # Generate random particles + from boxtree.tools import make_normal_particle_array as p_normal + sources = p_normal(queue, nsources, dims, dtype, seed=15) + targets = (p_normal(queue, ntargets, dims, dtype, seed=18) + + np.array([2, 0, 0])[:dims]) + + # Constant one source weights + sources_weights = np.ones((nsources,), dtype=dtype) + + # Build the global tree + from boxtree import TreeBuilder + tb = TreeBuilder(ctx) + tree, _ = tb(queue, sources, targets=targets, max_particles_in_box=30, + debug=True) + + tree_indep = ConstantOneTreeIndependentDataForWrangler() + + def wrangler_factory(local_traversal, global_traversal): + return ConstantOneExpansionWrangler( + queue, comm, tree_indep, local_traversal, global_traversal) + + from boxtree.distributed import DistributedFMMRunner + distributed_fmm_info = DistributedFMMRunner( + queue, tree, tg, wrangler_factory, comm=MPI.COMM_WORLD) + + pot_dfmm = distributed_fmm_info.drive_dfmm([sources_weights]) + + if rank == 0: + assert (np.all(pot_dfmm == nsources)) + + +@pytest.mark.mpi +@pytest.mark.parametrize("num_processes, dims, nsources, ntargets", [ + (4, 3, 10000, 10000) +]) +def test_constantone(num_processes, dims, nsources, ntargets): + pytest.importorskip("mpi4py") + + newenv = os.environ.copy() + newenv["PYTEST"] = "2" + newenv["dims"] = str(dims) + newenv["nsources"] = str(nsources) + newenv["ntargets"] = str(ntargets) + newenv["OMP_NUM_THREADS"] = "1" + + run_mpi(__file__, num_processes, newenv) + + +if __name__ == "__main__": + + dtype = np.float64 + + if "PYTEST" in os.environ: + if os.environ["PYTEST"] == "1": + # Run "test_against_shared" test case + dims = int(os.environ["dims"]) + nsources = int(os.environ["nsources"]) + ntargets = int(os.environ["ntargets"]) + + from distutils.util import strtobool + communicate_mpoles_via_allreduce = bool( + strtobool(os.environ["communicate_mpoles_via_allreduce"])) + + _test_against_shared( + dims, nsources, ntargets, dtype, communicate_mpoles_via_allreduce) + + elif os.environ["PYTEST"] == "2": + # Run "test_constantone" test case + dims = int(os.environ["dims"]) + nsources = int(os.environ["nsources"]) + ntargets = int(os.environ["ntargets"]) + + _test_constantone(dims, nsources, ntargets, dtype) + + else: + if len(sys.argv) > 1: + + # You can test individual routines by typing + # $ python test_distributed.py 'test_constantone(4, 3, 10000, 10000)' + exec(sys.argv[1]) + + elif len(sys.argv) == 1: + + # Run against_shared test case with default parameter + dims = 3 + nsources = 10000 + ntargets = 10000 + + _test_against_shared(dims, nsources, ntargets, dtype) diff --git a/test/test_tools.py b/test/test_tools.py index 32ec7d36e876025ed89d6d219d54624e9ba39e34..e8286b5d43ed436bfee484a7c2b47443748239a6 100644 --- a/test/test_tools.py +++ b/test/test_tools.py @@ -1,7 +1,5 @@ -import numpy as np -import pyopencl as cl - __copyright__ = "Copyright (C) 2012 Andreas Kloeckner \ + Copyright (C) 2017 Matt Wala \ Copyright (C) 2018 Hao Gao" __license__ = """ @@ -24,8 +22,109 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ - +import logging import pytest +import numpy as np +import pyopencl as cl +import pyopencl.array # noqa +from pyopencl.tools import ( # noqa + pytest_generate_tests_for_pyopencl as pytest_generate_tests) + + +logger = logging.getLogger(__name__) + + +@pytest.mark.parametrize("p", [1, 2, 3, 4, 5, 6, 7, 8, 16, 17]) +def test_allreduce_comm_pattern(p): + from boxtree.tools import AllReduceCommPattern + + # This models the parallel allreduce communication pattern. + + # processor -> communication pattern of the processor + patterns = [AllReduceCommPattern(i, p) for i in range(p)] + # processor -> list of data items on the processor + data = [[i] for i in range(p)] + from copy import deepcopy + + while not all(pat.done() for pat in patterns): + new_data = deepcopy(data) + + for i in range(p): + if patterns[i].done(): + for pat in patterns: + if not pat.done(): + assert i not in pat.sources() | pat.sinks() + continue + + # Check sources / sinks match up + for s in patterns[i].sinks(): + assert i in patterns[s].sources() + + for s in patterns[i].sources(): + assert i in patterns[s].sinks() + + # Send / recv data + for s in patterns[i].sinks(): + new_data[s].extend(data[i]) + + for pat in patterns: + if not pat.done(): + pat.advance() + data = new_data + + for item in data: + assert len(item) == p + assert set(item) == set(range(p)) + + +@pytest.mark.parametrize("order", "CF") +def test_masked_matrix_compression(ctx_getter, order): + cl_context = ctx_getter() + + from boxtree.tools import MaskCompressorKernel + matcompr = MaskCompressorKernel(cl_context) + + n = 40 + m = 10 + + np.random.seed(15) + + arr = (np.random.rand(n, m) > 0.5).astype(np.int8).copy(order=order) + + with cl.CommandQueue(cl_context) as q: + d_arr = cl.array.Array(q, (n, m), arr.dtype, order=order) + d_arr[:] = arr + arr_starts, arr_lists, evt = matcompr(q, d_arr) + cl.wait_for_events([evt]) + arr_starts = arr_starts.get(q) + arr_lists = arr_lists.get(q) + + for i in range(n): + items = arr_lists[arr_starts[i]:arr_starts[i+1]] + assert set(items) == set(arr[i].nonzero()[0]) + + +def test_masked_list_compression(ctx_getter): + cl_context = ctx_getter() + + from boxtree.tools import MaskCompressorKernel + listcompr = MaskCompressorKernel(cl_context) + + n = 20 + + np.random.seed(15) + + arr = (np.random.rand(n) > 0.5).astype(np.int8) + + with cl.CommandQueue(cl_context) as q: + d_arr = cl.array.to_device(q, arr) + arr_list, evt = listcompr(q, d_arr) + cl.wait_for_events([evt]) + arr_list = arr_list.get(q) + + assert set(arr_list) == set(arr.nonzero()[0]) + + from pyopencl.tools import ( # noqa pytest_generate_tests_for_pyopencl as pytest_generate_tests)