diff --git a/grudge/execution.py b/grudge/execution.py
index cdc7579f850bfba9ff5384549bf668946990b98c..a12c6dbe029958611cf66ea24baeef1d39ff31d6 100644
--- a/grudge/execution.py
+++ b/grudge/execution.py
@@ -325,29 +325,49 @@ class ExecutionMapper(mappers.Evaluator,
         local_data = self.rec(insn.field).get(self.queue)
         comm = self.discrwb.mpi_communicator
 
-        send_req = comm.Isend(local_data, insn.i_remote_rank, tag=insn.tag)
+        # print("Sending data to rank %d with tag %d"
+        #         % (insn.i_remote_rank, insn.send_tag))
+        send_req = comm.Isend(local_data, insn.i_remote_rank, tag=insn.send_tag)
 
         remote_data_host = np.empty_like(local_data)
-        comm.Recv(remote_data_host, source=insn.i_remote_rank, tag=insn.tag)
-        send_req.wait()
-
-        remote_data = cl.array.to_device(self.queue, remote_data_host)
-
-        return [(insn.name, remote_data)], []
-
-        # class Future:
-        #     def is_ready(self):
-        #         return comm.improbe(source=insn.i_remote_rank, tag=insn.tag)
-        #
-        #     def __call__(self):
-        #         remote_data_host = np.empty_like(local_data)
-        #         comm.Recv(remote_data_host, source=insn.i_remote_rank, tag=insn.tag)
-        #         send_req.wait()
-        #
-        #         remote_data = cl.array.to_device(queue, remote_data_host)
-        #         return [(insn.name, remote_data)], []
-        #
-        # return [], [Future()]
+        recv_req = comm.Irecv(remote_data_host, insn.i_remote_rank, insn.recv_tag)
+        # Do all instructions complete before futures?
+        # FIXME: We CANNOT have any possibility of deadlock
+        # One option is to add an attribute that tells the scheduler that this
+        # should not be forced
+
+        class RecvFuture:
+            def __init__(self, recv_req, insn_name, remote_data_host, queue):
+                self.receive_request = recv_req
+                self.insn_name = insn_name
+                self.remote_data_host = remote_data_host
+                self.queue = queue
+
+            def is_ready(self):
+                return self.receive_request.Test()
+
+            def __call__(self):
+                # assert self.is_ready(), "RecvFuture was not ready to be called!"
+                self.receive_request.Wait()
+                remote_data = cl.array.to_device(self.queue, self.remote_data_host)
+                return [(self.insn_name, remote_data)], []
+
+
+        class SendFuture:
+            def __init__(self, send_request):
+                self.send_request = send_request
+
+            def is_ready(self):
+                return self.send_request.Test()
+
+            def __call__(self):
+                # assert self.is_ready(), "SendFuture was not ready to be called!"
+                self.send_request.wait()
+                return [], []
+
+
+        return [], [RecvFuture(recv_req, insn.name, remote_data_host, self.queue),
+                    SendFuture(send_req)]
 
     def map_insn_loopy_kernel(self, insn):
         kwargs = {}
@@ -558,6 +578,37 @@ def process_sym_operator(discrwb, sym_operator, post_bind_mapper=None,
     connected_parts = get_connected_partitions(volume_mesh)
     sym_operator = mappers.DistributedMapper(connected_parts)(sym_operator)
 
+    # TODO
+    # This MPI communication may not be necessary. The goal is to define unique and
+    # consistent tags for each OppSwap. This could be achieved by defining some
+    # ordering of these operators and assigning tags accordingly.
+
+    comm = discrwb.mpi_communicator
+    i_local_rank = comm.Get_rank()
+
+    # NOTE: MPITagCollector does not modify sym_operator
+    tag_mapper = mappers.MPITagCollector(i_local_rank)
+    sym_operator = tag_mapper(sym_operator)
+
+    if len(tag_mapper.send_tag_lookups) > 0:
+        # TODO: Tag should probably be global
+        MPI_TAG_SEND_TAGS = 1729
+        send_reqs = []
+        for i_remote_rank in connected_parts:
+            send_tags = tag_mapper.send_tag_lookups[i_remote_rank]
+            send_reqs.append(comm.isend(send_tags, dest=i_remote_rank,
+                                        tag=MPI_TAG_SEND_TAGS))
+
+        recv_tag_lookups = {}
+        for i_remote_rank in connected_parts:
+            recv_tags = comm.recv(source=i_remote_rank, tag=MPI_TAG_SEND_TAGS)
+            recv_tag_lookups[i_remote_rank] = recv_tags
+
+        for req in send_reqs:
+            req.wait()
+
+        sym_operator = mappers.MPITagDistributor(recv_tag_lookups,
+                                                 i_local_rank)(sym_operator)
+
     dumper("before-imass", sym_operator)
     sym_operator = mappers.InverseMassContractor()(sym_operator)
 
diff --git a/grudge/symbolic/compiler.py b/grudge/symbolic/compiler.py
index 450b3cd426dcdff3172cd900d4bafd61f941cc59..340ffb3a80d1433c1133713024710946e28e1ce2 100644
--- a/grudge/symbolic/compiler.py
+++ b/grudge/symbolic/compiler.py
@@ -222,7 +222,8 @@ class RankDataSwapAssign(Instruction):
         self.field = field
         self.i_remote_rank = op.i_remote_part
         self.dd_out = op.dd_out
-        self.tag = self.MPI_TAG_GRUDGE_DATA + op.mpi_tag_offset
+        self.send_tag = self.MPI_TAG_GRUDGE_DATA + op.send_tag_offset
+        self.recv_tag = self.MPI_TAG_GRUDGE_DATA + op.recv_tag_offset
         self.comment = "Swap data with rank %02d" % self.i_remote_rank
 
     @memoize_method
@@ -235,9 +236,11 @@ class RankDataSwapAssign(Instruction):
 
     def __str__(self):
         return ("{\n"
-                "   /* %s */\n"
-                "   %s <- %s\n"
-                "}\n" % (self.comment, self.name, self.field))
+                + "   /* %s */\n" % self.comment
+                + "   send_tag = %s\n" % self.send_tag
+                + "   recv_tag = %s\n" % self.recv_tag
+                + "   %s <- %s\n" % (self.name, self.field)
+                + "}")
 
     mapper_method = intern("map_insn_rank_data_swap")
 
@@ -520,7 +523,9 @@ class Code(object):
             except self.NoInstructionAvailable:
                 if futures:
                     # no insn ready: we need a future to complete to continue
+                    # FIXME: May induce deadlock in RankDataSwapAssign
                     force_future = True
+                    # pass
                 else:
                     # no futures, no available instructions: we're done
                     break
@@ -1170,7 +1175,7 @@ class OperatorCompiler(mappers.IdentityMapper):
             if isinstance(expr.op, sym.RefDiffOperatorBase):
                 return self.map_ref_diff_op_binding(expr, codegen_state)
             elif isinstance(expr.op, sym.OppositePartitionFaceSwap):
-                return self.map_rank_data_swap_binding(expr, codegen_state)
+                return self.map_rank_data_swap_binding(expr, codegen_state, name_hint)
             else:
                 # make sure operator assignments stand alone and don't get muddled
                 # up in vector math
@@ -1229,7 +1234,7 @@ class OperatorCompiler(mappers.IdentityMapper):
 
             return self.expr_to_var[expr]
 
-    def map_rank_data_swap_binding(self, expr, codegen_state):
+    def map_rank_data_swap_binding(self, expr, codegen_state, name_hint):
         try:
             return self.expr_to_var[expr]
         except KeyError:
@@ -1242,7 +1247,7 @@ class OperatorCompiler(mappers.IdentityMapper):
             # self.expr_to_var[field] = field_var
             self.expr_to_var[expr] = self.assign_to_new_var(codegen_state,
                                                             expr.op(field_var),
-                                                            prefix="other")
+                                                            prefix=name_hint)
             return self.expr_to_var[expr]
 
     # }}}
diff --git a/grudge/symbolic/mappers/__init__.py b/grudge/symbolic/mappers/__init__.py
index 9db1ab3173c76a8de63fb3862d273c430986c945..27713a484ddbb16fb1cc5aebe4694f18560e06c5 100644
--- a/grudge/symbolic/mappers/__init__.py
+++ b/grudge/symbolic/mappers/__init__.py
@@ -336,6 +336,50 @@ class OperatorBinder(CSECachingMapperMixin, IdentityMapper):
 
 
 # {{{ mappers for distributed computation
 
+class MPITagCollector(CSECachingMapperMixin, IdentityMapper):
+    map_common_subexpression_uncached = IdentityMapper.map_common_subexpression
+
+    def __init__(self, i_local_rank):
+        self.i_local_rank = i_local_rank
+        self.send_tag_lookups = {}
+
+    def map_operator_binding(self, expr):
+        if isinstance(expr.op, op.OppositePartitionFaceSwap):
+            field = self.rec(expr.field)
+            i_remote_rank = expr.op.i_remote_part
+            # FIXME: Come up with a better key
+            # We MUST be sure that tags are UNIQUE for each pair of neighboring ranks
+            key = (field.field.index, self.i_local_rank, i_remote_rank)
+            tag = expr.op.send_tag_offset
+            if i_remote_rank not in self.send_tag_lookups:
+                self.send_tag_lookups[i_remote_rank] = {key: tag}
+            else:
+                assert key not in self.send_tag_lookups[i_remote_rank],\
+                        "Duplicate keys found in tag lookup"
+                self.send_tag_lookups[i_remote_rank][key] = tag
+            return expr
+        else:
+            return IdentityMapper.map_operator_binding(self, expr)
+
+
+class MPITagDistributor(CSECachingMapperMixin, IdentityMapper):
+    map_common_subexpression_uncached = IdentityMapper.map_common_subexpression
+
+    def __init__(self, recv_tag_lookups, i_local_rank):
+        self.recv_tag_lookups = recv_tag_lookups
+        self.i_local_rank = i_local_rank
+
+    def map_operator_binding(self, expr):
+        if isinstance(expr.op, op.OppositePartitionFaceSwap):
+            field = self.rec(expr.field)
+            i_remote_rank = expr.op.i_remote_part
+            key = (field.field.index, i_remote_rank, self.i_local_rank)
+            expr.op.recv_tag_offset = self.recv_tag_lookups[i_remote_rank][key]
+            return expr
+        else:
+            return IdentityMapper.map_operator_binding(self, expr)
+
+
 class DistributedMapper(CSECachingMapperMixin, IdentityMapper):
     map_common_subexpression_uncached = IdentityMapper.map_common_subexpression
 
@@ -379,9 +423,9 @@ class RankGeometryChanger(CSECachingMapperMixin, IdentityMapper):
         if (isinstance(expr.op, op.OppositeInteriorFaceSwap)
                 and expr.op.dd_in == self.prev_dd
                 and expr.op.dd_out == self.prev_dd):
+            field = self.rec(expr.field)
             return op.OppositePartitionFaceSwap(dd_in=self.new_dd,
-                                                dd_out=self.new_dd)(
-                                                        self.rec(expr.field))
+                                                dd_out=self.new_dd)(field)
         elif (isinstance(expr.op, op.InterpolationOperator)
                 and expr.op.dd_out == self.prev_dd):
             return op.InterpolationOperator(dd_in=expr.op.dd_in,
@@ -750,7 +794,7 @@ class StringifyMapper(pymbolic.mapper.stringifier.StringifyMapper):
         return "RefFaceM" + self._format_op_dd(expr)
 
     def map_opposite_partition_face_swap(self, expr, enclosing_prec):
-        return "RankSwap" + self._format_op_dd(expr)
+        return "PartSwap" + self._format_op_dd(expr)
 
     def map_opposite_interior_face_swap(self, expr, enclosing_prec):
         return "OppSwap" + self._format_op_dd(expr)
diff --git a/grudge/symbolic/operators.py b/grudge/symbolic/operators.py
index 7cdb3d2b4cf39b7d4e153f13092de4b1b38e5a98..041cac3969b8e79d05ab715fa0f59f70e9893b8a 100644
--- a/grudge/symbolic/operators.py
+++ b/grudge/symbolic/operators.py
@@ -408,8 +408,11 @@ class RefInverseMassOperator(RefMassOperatorBase):
 
 # {{{ boundary-related operators
 
-
 class OppositePartitionFaceSwap(Operator):
+    # FIXME: Static attribute, super hacky
+    from itertools import count
+    _num_instances = count(0)
+
     def __init__(self, dd_in=None, dd_out=None):
         sym = _sym()
 
@@ -427,8 +430,8 @@ class OppositePartitionFaceSwap(Operator):
             raise ValueError("dd_out and dd_in must be identical")
 
         self.i_remote_part = self.dd_in.domain_tag.part_nr
-        # FIXME: We should have a unique offset for each instance on a particular rank
-        self.mpi_tag_offset = 0
+        self.send_tag_offset = next(self._num_instances)
+        # self.recv_tag_offset = -0x3700d3e  # Some magic bad value
 
     mapper_method = intern("map_opposite_partition_face_swap")
 
diff --git a/test/test_mpi_communication.py b/test/test_mpi_communication.py
index 3bf012f352d893cd9b27304e1be20433d3351448..96c460a3d4e5e8b5af825121427328606b806444 100644
--- a/test/test_mpi_communication.py
+++ b/test/test_mpi_communication.py
@@ -78,6 +78,9 @@ def simple_mpi_communication_entrypoint():
         sym.interp(sym.BTAG_ALL, "all_faces")(
             sym.interp("vol", sym.BTAG_ALL)(sym.var("myfunc"))))
 
+    # FIXME: Since this is the second call to bind, something weird happens with
+    # MPITagCollector and MPITagDistributor. I think it has a distributed mesh,
+    # but does not have any OppositePartitionFaceSwap operators bound.
     bound_face_swap = bind(vol_discr,
             sym.interp("int_faces", "all_faces")(
                 sym.OppositeInteriorFaceSwap("int_faces")(
@@ -85,7 +88,7 @@ def simple_mpi_communication_entrypoint():
             ) - (sym_all_faces_func - sym_bdry_faces_func)
             )
 
-    print(bound_face_swap)
+    # print(bound_face_swap)
     # 1/0
 
     hopefully_zero = bound_face_swap(queue, myfunc=myfunc)
@@ -102,24 +105,24 @@ def simple_mpi_communication_entrypoint():
 def mpi_communication_entrypoint():
     cl_ctx = cl.create_some_context()
     queue = cl.CommandQueue(cl_ctx)
-    from meshmode.distributed import MPIMeshDistributor
 
     from mpi4py import MPI
     comm = MPI.COMM_WORLD
-    rank = comm.Get_rank()
+    i_local_rank = comm.Get_rank()
    num_parts = comm.Get_size()
 
+    from meshmode.distributed import MPIMeshDistributor
     mesh_dist = MPIMeshDistributor(comm)
 
-    dims = 2
+    dim = 2
     dt = 0.04
     order = 4
 
     if mesh_dist.is_mananger_rank():
         from meshmode.mesh.generation import generate_regular_rect_mesh
-        mesh = generate_regular_rect_mesh(a=(-0.5,)*dims,
-                                          b=(0.5,)*dims,
-                                          n=(16,)*dims)
+        mesh = generate_regular_rect_mesh(a=(-0.5,)*dim,
+                                          b=(0.5,)*dim,
+                                          n=(16,)*dim)
 
         from pymetis import part_graph
         _, p = part_graph(num_parts,
@@ -132,7 +135,7 @@ def mpi_communication_entrypoint():
         local_mesh = mesh_dist.receive_mesh_part()
 
     vol_discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order,
-            mpi_communicator=comm)
+                                               mpi_communicator=comm)
 
     source_center = np.array([0.1, 0.22, 0.33])[:local_mesh.dim]
     source_width = 0.05
@@ -176,9 +179,9 @@ def mpi_communication_entrypoint():
 
     dt_stepper = set_up_rk4("w", dt, fields, rhs)
 
-    final_t = 10
+    final_t = 4
     nsteps = int(final_t/dt)
-    print("rank=%d dt=%g nsteps=%d" % (rank, dt, nsteps))
+    print("rank=%d dt=%g nsteps=%d" % (i_local_rank, dt, nsteps))
 
     from grudge.shortcuts import make_visualizer
     vis = make_visualizer(vol_discr, vis_order=order)
@@ -197,21 +200,20 @@ def mpi_communication_entrypoint():
             step += 1
 
             print(step, event.t, norm(queue, u=event.state_component[0]),
-                    time()-t_last_step)
+                  time()-t_last_step)
+
             if step % 10 == 0:
-                vis.write_vtk_file("rank%d-fld-%04d.vtu" % (rank, step),
-                        [
-                            ("u", event.state_component[0]),
-                            ("v", event.state_component[1:]),
-                            ])
+                vis.write_vtk_file("rank%d-fld-%04d.vtu" % (i_local_rank, step),
+                                   [("u", event.state_component[0]),
+                                    ("v", event.state_component[1:])])
             t_last_step = time()
-    logger.debug("Rank %d exiting", rank)
+    logger.debug("Rank %d exiting", i_local_rank)
 
 
 # {{{ MPI test pytest entrypoint
 
 @pytest.mark.mpi
-@pytest.mark.parametrize("num_ranks", [2])
+@pytest.mark.parametrize("num_ranks", [3])
 def test_mpi(num_ranks):
     pytest.importorskip("mpi4py")
 
@@ -227,8 +229,7 @@ def test_mpi(num_ranks):
 
 
 @pytest.mark.mpi
-@pytest.mark.parametrize("num_ranks", [2])
-def test_simple_mpi(num_ranks):
+def test_simple_mpi():
     pytest.importorskip("mpi4py")
 
     from subprocess import check_call
@@ -236,6 +237,7 @@ def test_simple_mpi(num_ranks):
     newenv = os.environ.copy()
     newenv["RUN_WITHIN_MPI"] = "1"
     newenv["TEST_SIMPLE_MPI_COMMUNICATION"] = "1"
+    num_ranks = 2
     check_call([
         "mpiexec", "-np", str(num_ranks), "-x", "RUN_WITHIN_MPI",
         sys.executable, __file__],
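
A minimal standalone sketch (not part of the patch; names and tag values are
illustrative only) of the tag-exchange handshake that MPITagCollector and
MPITagDistributor perform in process_sym_operator: each rank picks a send-tag
offset locally, mails its {key: offset} table to the neighbor over a dedicated
handshake tag, and uses the neighbor's table to pick a matching receive tag, so
that the later Isend/Irecv pairs agree. Assumes mpi4py; run with
"mpiexec -np 2 python <script>".

import numpy as np
from mpi4py import MPI

MPI_TAG_SEND_TAGS = 1729   # handshake tag, used only to exchange the tag tables
MPI_TAG_BASE = 1000        # illustrative stand-in for MPI_TAG_GRUDGE_DATA


def main():
    comm = MPI.COMM_WORLD
    i_local_rank = comm.Get_rank()
    i_remote_rank = 1 - i_local_rank   # two ranks, each other's only neighbor

    # One entry per swap, keyed by (swap id, sending rank, receiving rank) so
    # both sides can name the same exchange; the value is the sender's locally
    # chosen offset (the patch draws it from a per-process itertools.count).
    send_tag_lookup = {("face0", i_local_rank, i_remote_rank): 10 + i_local_rank}

    # Handshake: exchange the tag tables with the neighbor.
    req = comm.isend(send_tag_lookup, dest=i_remote_rank, tag=MPI_TAG_SEND_TAGS)
    recv_tag_lookup = comm.recv(source=i_remote_rank, tag=MPI_TAG_SEND_TAGS)
    req.wait()

    # My send tag comes from my own table; my receive tag is whatever the
    # neighbor chose for the same swap, looked up in the table it sent me.
    send_tag = MPI_TAG_BASE + send_tag_lookup[("face0", i_local_rank, i_remote_rank)]
    recv_tag = MPI_TAG_BASE + recv_tag_lookup[("face0", i_remote_rank, i_local_rank)]

    # The actual data swap: tags now match up across the rank pair.
    local_data = np.full(4, i_local_rank, dtype=np.float64)
    remote_data = np.empty_like(local_data)
    send_req = comm.Isend(local_data, dest=i_remote_rank, tag=send_tag)
    comm.Recv(remote_data, source=i_remote_rank, tag=recv_tag)
    send_req.Wait()

    print("rank %d received %s" % (i_local_rank, remote_data))


if __name__ == "__main__":
    main()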