diff --git a/grudge/execution.py b/grudge/execution.py
index ff14c6f3bddd2555c3ab22dafb694845b38d7d6a..177cd52e281445bc312330064355662aafeb8fd8 100644
--- a/grudge/execution.py
+++ b/grudge/execution.py
@@ -281,6 +281,95 @@ class ExecutionMapper(mappers.Evaluator,
 
     def map_opposite_partition_face_swap(self, op, field_expr):
         raise NotImplementedError("map_opposite_partition_face_swap")
+
+        # TODO: Fetch these variables
+        local_mesh = None
+        vol_discr = None
+        group_factory = None
+        TAG_SEND_MESH = 1
+
+        from mpi4py import MPI
+        comm = MPI.COMM_WORLD
+        # FIXME: Assumes rank 0 is a 'central hub' and
+        #        i_part = rank - 1 for all other ranks
+        rank = comm.Get_rank()
+        num_parts = comm.Get_size() - 1
+
+        i_local_part = rank - 1
+        local_bdry_conns = {}
+        for i_remote_part in range(num_parts):
+            if i_local_part == i_remote_part:
+                continue
+            # Mark faces within local_mesh that are connected to remote_mesh
+            from meshmode.discretization.connection import make_face_restriction
+            from meshmode.mesh import BTAG_PARTITION
+            # TODO: May not be necessary to compute every time
+            local_bdry_conns[i_remote_part] =\
+                    make_face_restriction(vol_discr, group_factory,
+                                          BTAG_PARTITION(i_remote_part))
+
+        # Send boundary data
+        send_reqs = []
+        for i_remote_part in range(num_parts):
+            if i_local_part == i_remote_part:
+                continue
+            bdry_nodes = local_bdry_conns[i_remote_part].to_discr.nodes()
+            if bdry_nodes.size == 0:
+                # local_mesh is not connected to remote_mesh; send None
+                send_reqs.append(comm.isend(None,
+                                            dest=i_remote_part+1,
+                                            tag=TAG_SEND_MESH))
+                continue
+
+            # Gather information to send to other ranks
+            local_bdry = local_bdry_conns[i_remote_part].to_discr
+            local_adj_groups = [local_mesh.facial_adjacency_groups[i][None]
+                                for i in range(len(local_mesh.groups))]
+            local_batches = [local_bdry_conns[i_remote_part].groups[i].batches
+                             for i in range(len(local_mesh.groups))]
+            local_to_elem_faces = [[batch.to_element_face for batch in grp_batches]
+                                   for grp_batches in local_batches]
+            local_to_elem_indices = [[batch.to_element_indices.get(queue=self.queue)
+                                      for batch in grp_batches]
+                                     for grp_batches in local_batches]
+
+            local_data = {'bdry_mesh': local_bdry.mesh,
+                          'adj': local_adj_groups,
+                          'to_elem_faces': local_to_elem_faces,
+                          'to_elem_indices': local_to_elem_indices}
+            send_reqs.append(comm.isend(local_data,
+                                        dest=i_remote_part+1,
+                                        tag=TAG_SEND_MESH))
+
+        # Receive boundary data
+        remote_buf = {}
+        for i_remote_part in range(num_parts):
+            if i_local_part == i_remote_part:
+                continue
+            remote_rank = i_remote_part + 1
+            status = MPI.Status()
+            comm.probe(source=remote_rank, tag=TAG_SEND_MESH, status=status)
+            remote_buf[i_remote_part] = np.empty(status.count, dtype=bytes)
+
+        recv_reqs = {}
+        for i_remote_part, buf in remote_buf.items():
+            remote_rank = i_remote_part + 1
+            recv_reqs[i_remote_part] = comm.irecv(buf=buf,
+                                                  source=remote_rank,
+                                                  tag=TAG_SEND_MESH)
+
+        remote_data = {}
+        for i_remote_part, req in recv_reqs.items():
+            status = MPI.Status()
+            remote_data[i_remote_part] = req.wait(status=status)
+            # Free the buffer
+            remote_buf[i_remote_part] = None  # FIXME: Is this a good idea?
+            print('Rank {0}: Received rank {1} data ({2} bytes)'
+                  .format(rank, i_remote_part + 1, status.count))
+
+        for req in send_reqs:
+            req.wait()
+        return None
 
     def map_opposite_interior_face_swap(self, op, field_expr):
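
The exchange above relies on probing for the pickled message size before posting the matching irecv. The following is a minimal two-rank sketch of that same probe/irecv pattern, for illustration only: the payload dict, the tag value, and the rank layout are stand-ins, not grudge data or the patch's actual communication setup.

    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    TAG_SEND_MESH = 1  # stand-in tag, mirrors the constant in the patch

    if comm.Get_rank() == 0:
        # Sender: any picklable object works; the patch sends the boundary-data dict
        payload = {'example': list(range(10))}
        comm.isend(payload, dest=1, tag=TAG_SEND_MESH).wait()
    elif comm.Get_rank() == 1:
        status = MPI.Status()
        # Probe first so the receive buffer can be sized from status.count
        comm.probe(source=0, tag=TAG_SEND_MESH, status=status)
        buf = bytearray(status.count)
        data = comm.irecv(buf=buf, source=0, tag=TAG_SEND_MESH).wait()
        print('Rank 1 received {0} bytes'.format(status.count))

Here a plain bytearray serves as the scratch buffer for the pickled message; whether that is preferable to the np.empty(status.count, dtype=bytes) allocation in the patch is an open question, in line with the FIXMEs already noted there.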