From 94af5e25b3898a50a916bde6dd9ef8d90a740d9e Mon Sep 17 00:00:00 2001
From: ellis
Date: Wed, 23 Aug 2017 11:59:47 -0500
Subject: [PATCH] working

---
 meshmode/mesh/__init__.py |  10 +++
 test/test_meshmode.py     |  12 ++++-
 test/testmpi.py           | 139 ++++++++++++++++++++++++++++++++++++++++
 testmpi.py                | 118 ---------------------------------
 4 files changed, 159 insertions(+), 120 deletions(-)
 create mode 100644 test/testmpi.py
 delete mode 100644 testmpi.py

diff --git a/meshmode/mesh/__init__.py b/meshmode/mesh/__init__.py
index b9f8e9fe..f5147001 100644
--- a/meshmode/mesh/__init__.py
+++ b/meshmode/mesh/__init__.py
@@ -548,6 +548,16 @@ class InterPartitionAdjacencyGroup(FacialAdjacencyGroup):
     .. versionadded:: 2017.1
     """
+    # FIXME
+    '''
+    After a mesh is pickled and unpickled, neighbor_partitions no longer
+    exists in mesh.facial_adjacency_groups[i][None]. Pickle presumably does
+    not restore attributes it does not know about, so class-level defaults
+    are defined here as a workaround.
+    '''
+    neighbor_partitions = None
+    global_neighbors = None
+
     def __init__(self,
             elements,
             element_faces,
             neighbors,
diff --git a/test/test_meshmode.py b/test/test_meshmode.py
index f0009aa7..c4944132 100644
--- a/test/test_meshmode.py
+++ b/test/test_meshmode.py
@@ -49,6 +49,14 @@
 import logging
 logger = logging.getLogger(__name__)
 
+@pytest.mark.parametrize("num_parts", [3])
+def test_interpartition_comm(num_parts):
+    from pytools.mpi import run_with_mpi_ranks
+    from testmpi import interpartition_communication
+    run_with_mpi_ranks("testmpi.py", num_parts + 1, interpartition_communication,
+                       (num_parts,))
+
+
 # {{{ partition_interpolation
 
 @pytest.mark.parametrize("group_factory", [PolynomialWarpAndBlendGroupFactory])
@@ -134,7 +142,7 @@ def test_partition_interpolation(ctx_getter, group_factory, dim, mesh_pars,
 
         # Gather just enough information for the connection
         local_bdry = local_bdry_conn.to_discr
-        local_mesh = local_bdry_conn.from_discr.mesh
+        local_mesh = part_meshes[i_local_part]
         local_adj_groups = [local_mesh.facial_adjacency_groups[i][None]
                             for i in range(len(local_mesh.groups))]
         local_batches = [local_bdry_conn.groups[i].batches
@@ -147,7 +155,7 @@ def test_partition_interpolation(ctx_getter, group_factory, dim, mesh_pars,
                              for grp_batches in local_batches]
 
         remote_bdry = remote_bdry_conn.to_discr
-        remote_mesh = remote_bdry_conn.from_discr.mesh
+        remote_mesh = part_meshes[i_remote_part]
         remote_adj_groups = [remote_mesh.facial_adjacency_groups[i][None]
                              for i in range(len(remote_mesh.groups))]
         remote_batches = [remote_bdry_conn.groups[i].batches
diff --git a/test/testmpi.py b/test/testmpi.py
new file mode 100644
index 00000000..5c142c10
--- /dev/null
+++ b/test/testmpi.py
@@ -0,0 +1,139 @@
+import numpy as np
+import pyopencl as cl
+
+
+def interpartition_communication(num_parts):
+    from mpi4py import MPI
+    comm = MPI.COMM_WORLD
+    rank = comm.Get_rank()
+
+    if rank == 0:
+        np.random.seed(42)
+        from meshmode.mesh.generation import generate_warped_rect_mesh
+        meshes = [generate_warped_rect_mesh(3, order=4, n=5) for _ in range(2)]
+
+        from meshmode.mesh.processing import merge_disjoint_meshes
+        mesh = merge_disjoint_meshes(meshes)
+
+        part_per_element = np.random.randint(num_parts, size=mesh.nelements)
+
+        from meshmode.mesh.processing import partition_mesh
+        parts = [partition_mesh(mesh, part_per_element, i)[0]
+                 for i in range(num_parts)]
+
+        reqs = []
+        for r in range(num_parts):
+            reqs.append(comm.isend(parts[r], dest=r+1, tag=1))
+        print('Sent all mesh partitions.')
+        for req in reqs:
+            req.wait()
+
+    elif (rank - 1) in range(num_parts):
+        status = MPI.Status()
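+        # Each partition arrives as a single pickled message; recv blocks
+        # until the matching isend from rank 0 completes.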
+        local_mesh = comm.recv(source=0, tag=1, status=status)
+        print('Received mesh (size = {0})'.format(status.count))
+
+        from meshmode.discretization.poly_element\
+                import PolynomialWarpAndBlendGroupFactory
+        group_factory = PolynomialWarpAndBlendGroupFactory(4)
+        cl_ctx = cl.create_some_context()
+        queue = cl.CommandQueue(cl_ctx)
+
+        from meshmode.discretization import Discretization
+        vol_discr = Discretization(cl_ctx, local_mesh, group_factory)
+
+        send_reqs = []
+        i_local_part = rank - 1
+        local_bdry_conns = {}
+        for i_remote_part in range(num_parts):
+            if i_local_part == i_remote_part:
+                continue
+            # Mark faces within local_mesh that are connected to remote_mesh
+            from meshmode.discretization.connection import make_face_restriction
+            from meshmode.mesh import BTAG_PARTITION
+            local_bdry_conns[i_remote_part] =\
+                    make_face_restriction(vol_discr, group_factory,
+                                          BTAG_PARTITION(i_remote_part))
+
+        for i_remote_part in range(num_parts):
+            if i_local_part == i_remote_part:
+                continue
+            bdry_nodes = local_bdry_conns[i_remote_part].to_discr.nodes()
+            if bdry_nodes.size == 0:
+                # local_mesh is not connected to remote_mesh, send None
+                send_reqs.append(comm.isend(None, dest=i_remote_part+1, tag=2))
+                continue
+
+            # Gather information to send to other ranks
+            local_bdry = local_bdry_conns[i_remote_part].to_discr
+            local_adj_groups = [local_mesh.facial_adjacency_groups[i][None]
+                                for i in range(len(local_mesh.groups))]
+            local_batches = [local_bdry_conns[i_remote_part].groups[i].batches
+                             for i in range(len(local_mesh.groups))]
+            local_to_elem_faces = [[batch.to_element_face for batch in grp_batches]
+                                   for grp_batches in local_batches]
+            local_to_elem_indices = [[batch.to_element_indices.get(queue=queue)
+                                      for batch in grp_batches]
+                                     for grp_batches in local_batches]
+
+            local_data = {'bdry_mesh': local_bdry.mesh,
+                          'adj': local_adj_groups,
+                          'to_elem_faces': local_to_elem_faces,
+                          'to_elem_indices': local_to_elem_indices}
+            send_reqs.append(comm.isend(local_data, dest=i_remote_part+1, tag=2))
+
+        recv_reqs = {}
+        for i_remote_part in range(num_parts):
+            if i_local_part == i_remote_part:
+                continue
+            # TODO: Send the size of the data first so that an exactly
+            # sized buffer can be allocated.
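+            # NOTE: buf=1000000 is a fixed upper bound on the size of the
+            # incoming pickled message (see the TODO above).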
+            recv_reqs[i_remote_part] = comm.irecv(buf=1000000,
+                                                  source=i_remote_part+1,
+                                                  tag=2)
+
+        remote_data = {}
+        status = MPI.Status()
+        for i_part, req in recv_reqs.items():
+            remote_data[i_part] = req.wait(status=status)
+            print('Received remote data (size = {0})'.format(status.count))
+        for req in send_reqs:
+            req.wait()
+
+        connection = {}
+        for i_remote_part, data in remote_data.items():
+            if data is None:
+                # Local mesh is not connected to remote mesh
+                continue
+            remote_bdry_mesh = data['bdry_mesh']
+            remote_bdry = Discretization(cl_ctx, remote_bdry_mesh, group_factory)
+            remote_adj_groups = data['adj']
+            remote_to_elem_faces = data['to_elem_faces']
+            remote_to_elem_indices = data['to_elem_indices']
+            # Connect local_mesh to remote_mesh
+            from meshmode.discretization.connection import make_partition_connection
+            connection[i_remote_part] =\
+                    make_partition_connection(local_bdry_conns[i_remote_part],
+                                              i_local_part,
+                                              remote_bdry,
+                                              remote_adj_groups,
+                                              remote_to_elem_faces,
+                                              remote_to_elem_indices)
+            from meshmode.discretization.connection import check_connection
+            check_connection(connection[i_remote_part])
+
+
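+# When launched through run_with_mpi_ranks, this script is re-executed by
+# mpirun with a pickled (callable, args) pair on its command line;
+# check_for_mpi_relaunch unpickles that pair and calls it on every rank.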
+if __name__ == "__main__":
+    import sys
+    from pytools.mpi import check_for_mpi_relaunch
+    check_for_mpi_relaunch(sys.argv)
+
+    if len(sys.argv) > 1:
+        exec(sys.argv[1])
diff --git a/testmpi.py b/testmpi.py
deleted file mode 100644
index bb3c1978..00000000
--- a/testmpi.py
+++ /dev/null
@@ -1,118 +0,0 @@
-from mpi4py import MPI
-import numpy as np
-import pyopencl as cl
-
-comm = MPI.COMM_WORLD
-rank = comm.Get_rank()
-
-num_parts = 3
-if rank == 0:
-    np.random.seed(42)
-    from meshmode.mesh.generation import generate_warped_rect_mesh
-    meshes = [generate_warped_rect_mesh(3, order=4, n=5) for _ in range(2)]
-
-    from meshmode.mesh.processing import merge_disjoint_meshes
-    mesh = merge_disjoint_meshes(meshes)
-
-    part_per_element = np.random.randint(num_parts, size=mesh.nelements)
-
-    from meshmode.mesh.processing import partition_mesh
-    parts = [partition_mesh(mesh, part_per_element, i)[0] for i in range(num_parts)]
-
-    reqs = []
-    for r in range(num_parts):
-        reqs.append(comm.isend(parts[r], dest=r+1, tag=1))
-    print('Sent all mesh parts.')
-    for req in reqs:
-        req.wait()
-
-elif (rank - 1) in range(num_parts):
-    mesh = comm.recv(source=0, tag=1)
-    print('Recieved mesh')
-
-    cl_ctx = cl.create_some_context()
-    queue = cl.CommandQueue(cl_ctx)
-
-    from meshmode.discretization.poly_element\
-            import PolynomialWarpAndBlendGroupFactory
-    group_factory = PolynomialWarpAndBlendGroupFactory(4)
-
-    from meshmode.discretization import Discretization
-    vol_discr = Discretization(cl_ctx, mesh, group_factory)
-
-    send_reqs = []
-    i_local_part = rank - 1
-    local_bdry_conns = {}
-    for i_remote_part in range(num_parts):
-        if i_local_part == i_remote_part:
-            continue
-        # Mark faces within local_mesh that are connected to remote_mesh
-        from meshmode.discretization.connection import make_face_restriction
-        from meshmode.mesh import BTAG_PARTITION
-        local_bdry_conns[i_remote_part] =\
-            make_face_restriction(vol_discr, group_factory,
-                                  BTAG_PARTITION(i_remote_part))
-
-    for i_remote_part in range(num_parts):
-        if i_local_part == i_remote_part:
-            continue
-        bdry_nodes = local_bdry_conns[i_remote_part].to_discr.nodes()
-        if bdry_nodes.size == 0:
-            # local_mesh is not connected to remote_mesh, send None
-            send_reqs.append(comm.isend(None, dest=i_remote_part+1, tag=2))
-            continue
-
-        # Gather information to send to other ranks
-        local_bdry = local_bdry_conns[i_remote_part].to_discr
-        local_mesh = local_bdry_conns[i_remote_part].from_discr.mesh
-        local_adj_groups = [local_mesh.facial_adjacency_groups[i][None]
-                            for i in range(len(local_mesh.groups))]
-        local_batches = [local_bdry_conns[i_remote_part].groups[i].batches
-                         for i in range(len(local_mesh.groups))]
-        local_to_elem_faces = [[batch.to_element_face for batch in grp_batches]
-                               for grp_batches in local_batches]
-        local_to_elem_indices = [[batch.to_element_indices.get(queue=queue)
-                                  for batch in grp_batches]
-                                 for grp_batches in local_batches]
-
-        print(local_bdry.groups)
-        local_data = {'bdry': local_bdry,
-                      'adj': local_adj_groups,
-                      'to_elem_faces': local_to_elem_faces,
-                      'to_elem_indices': local_to_elem_indices}
-        send_reqs.append(comm.isend(local_data, dest=i_remote_part+1, tag=2))
-
-    recv_reqs = {}
-    for i_remote_part in range(num_parts):
-        if i_local_part == i_remote_part:
-            continue
-        recv_reqs[i_remote_part] = comm.irecv(source=i_remote_part+1, tag=2)
-
-    remote_data = {}
-    for i_part, req in recv_reqs.items():
-        remote_data[i_part] = req.wait()
-    for req in send_reqs:
-        req.wait()
-
-
-    connection = {}
-    for i_remote_part, data in remote_data.items():
-        if data is None:
-            # Local mesh is not connected to remote mesh
-            continue
-        remote_bdry = data['bdry']
-        remote_adj_groups =data['adj']
-        remote_to_elem_faces = data['to_elem_faces']
-        remote_to_elem_indices = data['to_elem_indices']
-        # Connect local_mesh to remote_mesh
-        from meshmode.discretization.connection import make_partition_connection
-        connection[i_remote_part] =\
-            make_partition_connection(local_bdry_conns[i_remote_part],
-                                      i_local_part,
-                                      remote_bdry,
-                                      remote_adj_groups,
-                                      remote_to_elem_faces,
-                                      remote_to_elem_indices)
-        from meshmode.discretization.connection import check_connection
-        check_connection(connection[i_remote_part])
-
--
GitLab