From 849b9397c0f5744a0796af53afd041d3a8bd2e07 Mon Sep 17 00:00:00 2001 From: Andreas Kloeckner Date: Tue, 26 Jun 2018 23:10:47 -0500 Subject: [PATCH] Do not use func-local classes for MPI futures --- examples/wave/wave-min-mpi.py | 143 +++++++++++++++++++++++++ examples/wave/wave.py | 191 ---------------------------------- grudge/execution.py | 63 ++++++----- 3 files changed, 178 insertions(+), 219 deletions(-) create mode 100644 examples/wave/wave-min-mpi.py delete mode 100644 examples/wave/wave.py diff --git a/examples/wave/wave-min-mpi.py b/examples/wave/wave-min-mpi.py new file mode 100644 index 00000000..26d22226 --- /dev/null +++ b/examples/wave/wave-min-mpi.py @@ -0,0 +1,143 @@ +"""Minimal example of a grudge driver.""" + +from __future__ import division, print_function + +__copyright__ = "Copyright (C) 2015 Andreas Kloeckner" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. 
+"""
+
+
+import numpy as np
+import pyopencl as cl
+from grudge.shortcuts import set_up_rk4
+from grudge import sym, bind, DGDiscretizationWithBoundaries
+from mpi4py import MPI
+
+
+def main(write_output=True, order=4):  # write_output gates the VTK dumps
+    cl_ctx = cl.create_some_context()
+    queue = cl.CommandQueue(cl_ctx)
+
+    comm = MPI.COMM_WORLD
+    num_parts = comm.Get_size()
+
+    from meshmode.distributed import MPIMeshDistributor, get_partition_by_pymetis
+    mesh_dist = MPIMeshDistributor(comm)
+
+    if mesh_dist.is_mananger_rank():
+        dims = 2
+        from meshmode.mesh.generation import generate_regular_rect_mesh
+        mesh = generate_regular_rect_mesh(
+                a=(-0.5,)*dims,
+                b=(0.5,)*dims,
+                n=(16,)*dims)
+
+        print("%d elements" % mesh.nelements)
+
+        part_per_element = get_partition_by_pymetis(mesh, num_parts)
+
+        local_mesh = mesh_dist.send_mesh_parts(mesh, part_per_element, num_parts)
+
+        del mesh
+
+    else:
+        local_mesh = mesh_dist.receive_mesh_part()
+
+    discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order,
+            mpi_communicator=comm)
+
+    if local_mesh.dim == 2:
+        dt = 0.04
+    elif local_mesh.dim == 3:
+        dt = 0.02
+
+    source_center = np.array([0.1, 0.22, 0.33])[:local_mesh.dim]
+    source_width = 0.05
+    source_omega = 3
+
+    sym_x = sym.nodes(local_mesh.dim)
+    sym_source_center_dist = sym_x - source_center
+    sym_t = sym.ScalarVariable("t")
+
+    from grudge.models.wave import StrongWaveOperator
+    from meshmode.mesh import BTAG_ALL, BTAG_NONE
+    op = StrongWaveOperator(-0.1, discr.dim,
+            source_f=(
+                sym.sin(source_omega*sym_t)
+                * sym.exp(
+                    -np.dot(sym_source_center_dist, sym_source_center_dist)
+                    / source_width**2)),
+            dirichlet_tag=BTAG_NONE,
+            neumann_tag=BTAG_NONE,
+            radiation_tag=BTAG_ALL,
+            flux_type="upwind")
+
+    queue = cl.CommandQueue(discr.cl_context)
+    from pytools.obj_array import join_fields
+    fields = join_fields(discr.zeros(queue),
+            [discr.zeros(queue) for i in range(discr.dim)])
+
+    # FIXME
+    #dt = op.estimate_rk4_timestep(discr, fields=fields)
+
+    op.check_bc_coverage(local_mesh)
+
+    # print(sym.pretty(op.sym_operator()))
+    bound_op = bind(discr, op.sym_operator())
+
+    def rhs(t, w):
+        return bound_op(queue, t=t, w=w)
+
+    dt_stepper = set_up_rk4("w", dt, fields, rhs)
+
+    final_t = 10
+    nsteps = int(final_t/dt)
+    print("dt=%g nsteps=%d" % (dt, nsteps))
+
+    from grudge.shortcuts import make_visualizer
+    vis = make_visualizer(discr, vis_order=order)
+
+    step = 0
+
+    norm = bind(discr, sym.norm(2, sym.var("u")))
+
+    from time import time
+    t_last_step = time()
+
+    for event in dt_stepper.run(t_end=final_t):
+        if isinstance(event, dt_stepper.StateComputed):
+            assert event.component_id == "w"
+
+            step += 1
+
+            print(step, event.t, norm(queue, u=event.state_component[0]),
+                    time()-t_last_step)
+            if write_output and step % 10 == 0:
+                vis.write_vtk_file("fld-%03d-%04d.vtu" % (comm.Get_rank(), step),
+                        [
+                            ("u", event.state_component[0]),
+                            ("v", event.state_component[1:]),
+                            ])
+            t_last_step = time()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/wave/wave.py b/examples/wave/wave.py
deleted file mode 100644
index 3d206d71..00000000
--- a/examples/wave/wave.py
+++ /dev/null
@@ -1,191 +0,0 @@
-# Copyright (C) 2007 Andreas Kloeckner
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
- - -from __future__ import division -from __future__ import absolute_import -from __future__ import print_function -import numpy as np -from grudge.mesh import BTAG_ALL, BTAG_NONE -from six.moves import range - - -def main(write_output=True, - dir_tag=BTAG_NONE, neu_tag=TAG_NONE, rad_tag=BTAG_ALL, - flux_type_arg="upwind", dtype=np.float64, debug=[]): - from math import sin, cos, pi, exp, sqrt # noqa - - from grudge.backends import guess_run_context - rcon = guess_run_context() - - dim = 2 - - if dim == 1: - if rcon.is_head_rank: - from grudge.mesh.generator import make_uniform_1d_mesh - mesh = make_uniform_1d_mesh(-10, 10, 500) - elif dim == 2: - from grudge.mesh.generator import make_rect_mesh - if rcon.is_head_rank: - mesh = make_rect_mesh(a=(-0.5, -0.5), b=(0.5, 0.5), max_area=0.008) - elif dim == 3: - if rcon.is_head_rank: - from grudge.mesh.generator import make_ball_mesh - mesh = make_ball_mesh(max_volume=0.0005) - else: - raise RuntimeError("bad number of dimensions") - - if rcon.is_head_rank: - print("%d elements" % len(mesh.elements)) - mesh_data = rcon.distribute_mesh(mesh) - else: - mesh_data = rcon.receive_mesh() - - from grudge.timestep.runge_kutta import LSRK4TimeStepper - stepper = LSRK4TimeStepper(dtype=dtype) - - from grudge.models.wave import StrongWaveOperator - from grudge.mesh import BTAG_ALL, BTAG_NONE # noqa - - source_center = np.array([0.1, 0.22]) - source_width = 0.05 - source_omega = 3 - - import grudge.symbolic as sym - sym_x = sym.nodes(2) - sym_source_center_dist = sym_x - source_center - - op = StrongWaveOperator(-1, dim, - source_f= - sym.CFunction("sin")(source_omega*sym.ScalarParameter("t")) - * sym.CFunction("exp")( - -np.dot(sym_source_center_dist, sym_source_center_dist) - / source_width**2), - dirichlet_tag=dir_tag, - neumann_tag=neu_tag, - radiation_tag=rad_tag, - flux_type=flux_type_arg - ) - - discr = rcon.make_discretization(mesh_data, order=4, debug=debug, - default_scalar_type=dtype, - tune_for=op.sym_operator()) - - 
from grudge.visualization import VtkVisualizer - if write_output: - vis = VtkVisualizer(discr, rcon, "fld") - - from grudge.tools import join_fields - fields = join_fields(discr.volume_zeros(dtype=dtype), - [discr.volume_zeros(dtype=dtype) for i in range(discr.dimensions)]) - - # {{{ diagnostics setup - - from pytools.log import LogManager, \ - add_general_quantities, \ - add_simulation_quantities, \ - add_run_info - - if write_output: - log_file_name = "wave.dat" - else: - log_file_name = None - - logmgr = LogManager(log_file_name, "w", rcon.communicator) - add_run_info(logmgr) - add_general_quantities(logmgr) - add_simulation_quantities(logmgr) - discr.add_instrumentation(logmgr) - - from pytools.log import IntervalTimer - vis_timer = IntervalTimer("t_vis", "Time spent visualizing") - logmgr.add_quantity(vis_timer) - stepper.add_instrumentation(logmgr) - - from grudge.log import LpNorm - u_getter = lambda: fields[0] - logmgr.add_quantity(LpNorm(u_getter, discr, 1, name="l1_u")) - logmgr.add_quantity(LpNorm(u_getter, discr, name="l2_u")) - - logmgr.add_watches(["step.max", "t_sim.max", "l2_u", "t_step.max"]) - - # }}} - - # {{{ timestep loop - - rhs = op.bind(discr) - try: - from grudge.timestep import times_and_steps - step_it = times_and_steps( - final_time=4, logmgr=logmgr, - max_dt_getter=lambda t: op.estimate_timestep(discr, - stepper=stepper, t=t, fields=fields)) - - for step, t, dt in step_it: - if step % 10 == 0 and write_output: - visf = vis.make_file("fld-%04d" % step) - - vis.add_data(visf, - [ - ("u", discr.convert_volume(fields[0], kind="numpy")), - ("v", discr.convert_volume(fields[1:], kind="numpy")), - ], - time=t, - step=step) - visf.close() - - fields = stepper(fields, t, dt, rhs) - - assert discr.norm(fields) < 1 - assert fields[0].dtype == dtype - - finally: - if write_output: - vis.close() - - logmgr.close() - discr.close() - - # }}} - -if __name__ == "__main__": - main(True, BTAG_ALL, BTAG_NONE, TAG_NONE, "upwind", np.float64, - 
debug=["cuda_no_plan", "dump_optemplate_stages"]) - - -# {{{ entry points for py.test - -def test_wave(): - from pytools.test import mark_test - mark_long = mark_test.long - - yield ("dirichlet wave equation with SP data", mark_long(main), - False, BTAG_ALL, BTAG_NONE, TAG_NONE, "upwind", np.float64) - yield ("dirichlet wave equation with SP complex data", mark_long(main), - False, BTAG_ALL, BTAG_NONE, TAG_NONE, "upwind", np.complex64) - yield ("dirichlet wave equation with DP complex data", mark_long(main), - False, BTAG_ALL, BTAG_NONE, TAG_NONE, "upwind", np.complex128) - for flux_type in ["upwind", "central"]: - yield ("dirichlet wave equation with %s flux" % flux_type, - mark_long(main), - False, BTAG_ALL, BTAG_NONE, TAG_NONE, flux_type) - yield ("neumann wave equation", mark_long(main), - False, BTAG_NONE, BTAG_ALL, TAG_NONE) - yield ("radiation-bc wave equation", mark_long(main), - False, BTAG_NONE, TAG_NONE, BTAG_ALL) - -# }}} - -# ij diff --git a/grudge/execution.py b/grudge/execution.py index 875db9d9..f756d21b 100644 --- a/grudge/execution.py +++ b/grudge/execution.py @@ -333,34 +333,9 @@ class ExecutionMapper(mappers.Evaluator, remote_data_host = np.empty_like(local_data) recv_req = comm.Irecv(remote_data_host, insn.i_remote_rank, insn.recv_tag) - class RecvFuture: - def __init__(self, recv_req, insn_name, remote_data_host, queue): - self.receive_request = recv_req - self.insn_name = insn_name - self.remote_data_host = remote_data_host - self.queue = queue - - def is_ready(self): - return self.receive_request.Test() - - def __call__(self): - self.receive_request.Wait() - remote_data = cl.array.to_device(self.queue, self.remote_data_host) - return [(self.insn_name, remote_data)], [] - - class SendFuture: - def __init__(self, send_request): - self.send_request = send_request - - def is_ready(self): - return self.send_request.Test() - - def __call__(self): - self.send_request.wait() - return [], [] - - return [], [RecvFuture(recv_req, insn.name, 
remote_data_host, self.queue),
-                    SendFuture(send_req)]
+        return [], [
+                MPIRecvFuture(recv_req, insn.name, remote_data_host, self.queue),
+                MPISendFuture(send_req)]
 
     def map_insn_loopy_kernel(self, insn):
         kwargs = {}
@@ -463,6 +438,38 @@ class ExecutionMapper(mappers.Evaluator,
 # }}}
 
 
+# {{{ futures
+
+class MPIRecvFuture(object):  # completes a receive request, yields its data
+    def __init__(self, recv_req, insn_name, remote_data_host, queue):
+        self.receive_request = recv_req
+        self.insn_name = insn_name
+        self.remote_data_host = remote_data_host
+        self.queue = queue
+
+    def is_ready(self):
+        return self.receive_request.Test()
+
+    def __call__(self):
+        self.receive_request.Wait()
+        remote_data = cl.array.to_device(self.queue, self.remote_data_host)
+        return [(self.insn_name, remote_data)], []
+
+
+class MPISendFuture(object):  # completes a send request, yields no data
+    def __init__(self, send_request):
+        self.send_request = send_request
+
+    def is_ready(self):
+        return self.send_request.Test()
+
+    def __call__(self):
+        self.send_request.Wait()  # uppercase, consistent with Test()/Wait() above
+        return [], []
+
+# }}}
+
+
 # {{{ bound operator
 
 class BoundOperator(object):
-- 
GitLab