From 202e431a235fbd026abe19a142cce2295d07a9f9 Mon Sep 17 00:00:00 2001
From: Andreas Kloeckner
Date: Thu, 1 Feb 2018 18:03:18 -0600
Subject: [PATCH] Restructure MPI comm into setup/send+receive combo

---
 grudge/discretization.py            | 49 ++++++++++++++++++++++++++++-
 grudge/execution.py                 | 39 ++++++++++++++---------
 grudge/symbolic/mappers/__init__.py |  2 +-
 test/test_mpi_communication.py      |  6 ++--
 4 files changed, 78 insertions(+), 18 deletions(-)

diff --git a/grudge/discretization.py b/grudge/discretization.py
index 6f39762c..bc59299f 100644
--- a/grudge/discretization.py
+++ b/grudge/discretization.py
@@ -23,7 +23,9 @@ THE SOFTWARE.
 """
 
+import six
 from pytools import memoize_method
+import pyopencl as cl
 from grudge import sym
 import numpy as np
 
 
@@ -47,7 +49,8 @@ class DGDiscretizationWithBoundaries(DiscretizationBase):
     .. automethod :: zeros
     """
 
-    def __init__(self, cl_ctx, mesh, order, quad_min_degrees=None):
+    def __init__(self, cl_ctx, mesh, order, quad_min_degrees=None,
+            mpi_communicator=None):
         """
         :param quad_min_degrees: A mapping from quadrature tags to the degrees
             to which the desired quadrature is supposed to be exact.
@@ -74,6 +77,50 @@ class DGDiscretizationWithBoundaries(DiscretizationBase):
 
         # }}}
 
+        with cl.CommandQueue(cl_ctx) as queue:
+            self._dist_boundary_connections = \
+                    self._set_up_distributed_communication(mpi_communicator, queue)
+
+        self.mpi_communicator = mpi_communicator
+
+    def _set_up_distributed_communication(self, mpi_communicator, queue):
+        from_dd = sym.DOFDesc("vol", sym.QTAG_NONE)
+
+        from meshmode.distributed import get_connected_partitions
+        connected_parts = get_connected_partitions(self._volume_discr.mesh)
+
+        if mpi_communicator is None and connected_parts:
+            raise RuntimeError("must supply an MPI communicator when using a "
+                    "distributed mesh")
+
+        grp_factory = self.group_factory_for_quadrature_tag(sym.QTAG_NONE)
+
+        setup_helpers = {}
+        boundary_connections = {}
+
+        from meshmode.distributed import MPIBoundaryCommSetupHelper
+        for i_remote_part in connected_parts:
+            conn = self.connection_from_dds(
+                    from_dd,
+                    sym.DOFDesc(sym.BTAG_PARTITION(i_remote_part), sym.QTAG_NONE))
+            setup_helper = setup_helpers[i_remote_part] = MPIBoundaryCommSetupHelper(
+                    mpi_communicator, queue, conn, i_remote_part, grp_factory)
+            setup_helper.post_sends()
+
+        for i_remote_part, setup_helper in six.iteritems(setup_helpers):
+            boundary_connections[i_remote_part] = setup_helper.complete_setup()
+
+        return boundary_connections
+
+    def get_distributed_boundary_swap_connection(self, dd):
+        if dd.quadrature_tag != sym.QTAG_NONE:
+            # FIXME
+            raise NotImplementedError("Distributed communication with quadrature")
+
+        assert isinstance(dd.domain_tag, sym.BTAG_PARTITION)
+
+        return self._dist_boundary_connections[dd.domain_tag.part_nr]
+
     @memoize_method
     def discr_from_dd(self, dd):
         dd = sym.as_dofdesc(dd)
diff --git a/grudge/execution.py b/grudge/execution.py
index 56ae7c1c..1fdec1b9 100644
--- a/grudge/execution.py
+++ b/grudge/execution.py
@@ -36,6 +36,9 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+MPI_TAG_GRUDGE_DATA = 0x3700d3e
+
+
 # {{{ exec mapper
 
 class ExecutionMapper(mappers.Evaluator,
@@ -246,20 +249,28 @@ class ExecutionMapper(mappers.Evaluator,
         return conn(self.queue, self.rec(field_expr)).with_queue(self.queue)
 
     def map_opposite_partition_face_swap(self, op, field_expr):
-        from mpi4py import MPI
-        mpi_comm = MPI.COMM_WORLD
-
-        grp_factory = self.discrwb.group_factory_for_quadrature_tag(sym.QTAG_NONE)
-
-        volume_discr = self.discrwb.discr_from_dd("vol")
-        from meshmode.distributed import MPIBoundaryCommunicator
-        bdry_conn_future = MPIBoundaryCommunicator(mpi_comm, self.queue,
-                                                   volume_discr,
-                                                   grp_factory,
-                                                   op.i_remote_part)
-        # TODO: Need to tell the future what boundary data to transfer
-        bdry_conn, _ = bdry_conn_future()
-        return bdry_conn(self.queue, self.rec(field_expr)).with_queue(self.queue)
+        assert op.dd_in == op.dd_out
+
+        bdry_conn = self.discrwb.get_distributed_boundary_swap_connection(op.dd_in)
+        loc_bdry_vec = self.rec(field_expr).get(self.queue)
+
+        comm = self.discrwb.mpi_communicator
+
+        remote_rank = op.dd_in.domain_tag.part_nr
+
+        send_req = comm.Isend(loc_bdry_vec, remote_rank,
+                tag=MPI_TAG_GRUDGE_DATA)
+
+        recv_vec_host = np.empty_like(loc_bdry_vec)
+        comm.Recv(recv_vec_host, source=remote_rank, tag=MPI_TAG_GRUDGE_DATA)
+        send_req.wait()
+
+        recv_vec_dev = cl.array.to_device(self.queue, recv_vec_host)
+
+        shuffled_recv_vec = bdry_conn(self.queue, recv_vec_dev) \
+                .with_queue(self.queue)
+
+        return shuffled_recv_vec
 
     def map_opposite_interior_face_swap(self, op, field_expr):
         dd = op.dd_in
diff --git a/grudge/symbolic/mappers/__init__.py b/grudge/symbolic/mappers/__init__.py
index ddba6c8d..a810a335 100644
--- a/grudge/symbolic/mappers/__init__.py
+++ b/grudge/symbolic/mappers/__init__.py
@@ -334,7 +334,7 @@ class OperatorBinder(CSECachingMapperMixin, IdentityMapper):
 # }}}
 
 
-# {{{ distributed mappers
+# {{{ mappers for distributed computation
 
 class DistributedMapper(CSECachingMapperMixin, IdentityMapper):
     map_common_subexpression_uncached = IdentityMapper.map_common_subexpression
diff --git a/test/test_mpi_communication.py b/test/test_mpi_communication.py
index 68901da5..208de1af 100644
--- a/test/test_mpi_communication.py
+++ b/test/test_mpi_communication.py
@@ -65,7 +65,8 @@ def simple_mpi_communication_entrypoint(order):
     else:
         local_mesh = mesh_dist.receive_mesh_part()
 
-    vol_discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order)
+    vol_discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order,
+            mpi_communicator=comm)
 
     sym_x = sym.nodes(local_mesh.dim)
     myfunc_symb = sym.sin(np.dot(sym_x, [2, 3]))
@@ -129,7 +130,8 @@ def mpi_communication_entrypoint():
     else:
         local_mesh = mesh_dist.receive_mesh_part()
 
-    vol_discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order)
+    vol_discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order,
+            mpi_communicator=comm)
 
     source_center = np.array([0.1, 0.22, 0.33])[:local_mesh.dim]
     source_width = 0.05
-- 
GitLab
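Note (not part of the patch): the execution-side half of this restructuring, map_opposite_partition_face_swap above, reduces to a plain MPI exchange keyed by a fixed message tag. The standalone sketch below shows the same Isend/Recv pattern in isolation with mpi4py and NumPy; the tag constant, the dummy data vector, and the two-rank pairing are illustrative assumptions (run with mpiexec -n 2).

    from mpi4py import MPI
    import numpy as np

    # Placeholder tag, mirroring the role of MPI_TAG_GRUDGE_DATA in the patch.
    TAG_BDRY_DATA = 0x3700d3e

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    remote_rank = 1 - rank  # assumes exactly two ranks

    # Stand-in for the local boundary data vector pulled off the device.
    loc_bdry_vec = np.full(16, float(rank))

    # Post the non-blocking send first so both ranks can make progress, then
    # block on the matching receive; reuse the send buffer only after the wait.
    send_req = comm.Isend(loc_bdry_vec, remote_rank, tag=TAG_BDRY_DATA)

    recv_vec = np.empty_like(loc_bdry_vec)
    comm.Recv(recv_vec, source=remote_rank, tag=TAG_BDRY_DATA)
    send_req.Wait()

    assert (recv_vec == remote_rank).all()

Posting the Isend before the blocking Recv on both ranks is what keeps the symmetric exchange from deadlocking; in the patch, the received host buffer is then copied back to the device and pushed through the boundary swap connection precomputed in _set_up_distributed_communication.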