diff --git a/.gitignore b/.gitignore index ec1e4cd21c6b35558934b7024210a7bdade65313..94648fab1309424c88e2836ab4034eab05049871 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,4 @@ run-debug-* *.dat .cache +.pytest_cache diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a865565eba7b02d42ed589e5ea173aa4ccdd51d9..1d6bb49c5289746c2712f5f800c98305bc15d58a 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -8,19 +8,51 @@ Python 2.7 POCL: tags: - python2.7 - pocl + - mpi except: - tags -Python 3.5 POCL: +Python 3.6 POCL: script: - - export PY_EXE=python3.5 + - export PY_EXE=python3.6 - export PYOPENCL_TEST=portable - export EXTRA_INSTALL="numpy mako" - curl -L -O -k https://gitlab.tiker.net/inducer/ci-support/raw/master/build-and-test-py-project.sh - ". ./build-and-test-py-project.sh" tags: - - python3.5 + - python3.6 - pocl + - mpi + except: + - tags + +Python 2.7 POCL MPI: + script: + - export PY_EXE=python2.7 + - export PYOPENCL_TEST=portable + - export EXTRA_INSTALL="numpy mako mpi4py pymetis" + - export PYTEST_ADDOPTS="-k mpi" + - curl -L -O -k https://gitlab.tiker.net/inducer/ci-support/raw/master/build-and-test-py-project.sh + - ". ./build-and-test-py-project.sh" + tags: + - python2.7 + - pocl + - mpi + except: + - tags + +Python 3.6 POCL MPI: + script: + - export PY_EXE=python3.6 + - export PYOPENCL_TEST=portable + - export EXTRA_INSTALL="numpy mako mpi4py pymetis" + - export PYTEST_ADDOPTS="-k mpi" + - curl -L -O -k https://gitlab.tiker.net/inducer/ci-support/raw/master/build-and-test-py-project.sh + - ". ./build-and-test-py-project.sh" + tags: + - python3.6 + - pocl + - mpi except: - tags @@ -30,7 +62,7 @@ Documentation: - curl -L -O -k https://gitlab.tiker.net/inducer/ci-support/raw/master/build-docs.sh - ". ./build-docs.sh" tags: - - python3.5 + - python3.6 only: - master @@ -39,6 +71,6 @@ Flake8: - curl -L -O -k https://gitlab.tiker.net/inducer/ci-support/raw/master/prepare-and-run-flake8.sh - ". 
./prepare-and-run-flake8.sh grudge test" tags: - - python3.5 + - python3.6 except: - tags diff --git a/examples/benchmark_grudge/benchmark_mpi.py b/examples/benchmark_grudge/benchmark_mpi.py new file mode 100644 index 0000000000000000000000000000000000000000..38612322fc88ae75c05b7ec600eb63e9cbe28458 --- /dev/null +++ b/examples/benchmark_grudge/benchmark_mpi.py @@ -0,0 +1,134 @@ +import os +import numpy as np +import pyopencl as cl + +from grudge import sym, bind, DGDiscretizationWithBoundaries +from grudge.shortcuts import set_up_rk4 + + +def simple_wave_entrypoint(dim=2, num_elems=256, order=4, num_steps=30, + log_filename="grudge.dat"): + cl_ctx = cl.create_some_context() + queue = cl.CommandQueue(cl_ctx) + + from mpi4py import MPI + comm = MPI.COMM_WORLD + num_parts = comm.Get_size() + n = int(num_elems ** (1./dim)) + + from meshmode.distributed import MPIMeshDistributor + mesh_dist = MPIMeshDistributor(comm) + + if mesh_dist.is_mananger_rank(): + from meshmode.mesh.generation import generate_regular_rect_mesh + mesh = generate_regular_rect_mesh(a=(-0.5,)*dim, + b=(0.5,)*dim, + n=(n,)*dim) + + from pymetis import part_graph + _, p = part_graph(num_parts, + xadj=mesh.nodal_adjacency.neighbors_starts.tolist(), + adjncy=mesh.nodal_adjacency.neighbors.tolist()) + part_per_element = np.array(p) + + local_mesh = mesh_dist.send_mesh_parts(mesh, part_per_element, num_parts) + else: + local_mesh = mesh_dist.receive_mesh_part() + + vol_discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order, + mpi_communicator=comm) + + source_center = np.array([0.1, 0.22, 0.33])[:local_mesh.dim] + source_width = 0.05 + source_omega = 3 + + sym_x = sym.nodes(local_mesh.dim) + sym_source_center_dist = sym_x - source_center + sym_t = sym.ScalarVariable("t") + + from grudge.models.wave import StrongWaveOperator + from meshmode.mesh import BTAG_ALL, BTAG_NONE + op = StrongWaveOperator(-0.1, vol_discr.dim, + source_f=( + sym.sin(source_omega*sym_t) + * sym.exp( + -np.dot(sym_source_center_dist, sym_source_center_dist) + / source_width**2)), + dirichlet_tag=BTAG_NONE, + neumann_tag=BTAG_NONE, + radiation_tag=BTAG_ALL, + flux_type="upwind") + + from pytools.obj_array import join_fields + fields = join_fields(vol_discr.zeros(queue), + [vol_discr.zeros(queue) for i in range(vol_discr.dim)]) + + from pytools.log import LogManager, \ + add_general_quantities, \ + add_run_info, \ + IntervalTimer, EventCounter + # NOTE: LogManager hangs when using a file on a shared directory. 
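+    # The communicator lets LogManager coordinate its quantities across
+    # ranks; a node-local log_filename (or None, for in-memory logging)
+    # sidesteps the hang noted above.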
+    logmgr = LogManager(log_filename, "w", comm)
+    add_run_info(logmgr)
+    add_general_quantities(logmgr)
+    log_quantities =\
+        {"rank_data_swap_timer": IntervalTimer("rank_data_swap_timer",
+                        "Time spent evaluating RankDataSwapAssign"),
+        "rank_data_swap_counter": EventCounter("rank_data_swap_counter",
+                        "Number of RankDataSwapAssign instructions evaluated"),
+        "exec_timer": IntervalTimer("exec_timer",
+                        "Total time spent executing instructions"),
+        "insn_eval_timer": IntervalTimer("insn_eval_timer",
+                        "Time spent evaluating instructions"),
+        "future_eval_timer": IntervalTimer("future_eval_timer",
+                        "Time spent evaluating futures"),
+        "busy_wait_timer": IntervalTimer("busy_wait_timer",
+                        "Time wasted doing busy wait")}
+    for quantity in log_quantities.values():
+        logmgr.add_quantity(quantity)
+
+    bound_op = bind(vol_discr, op.sym_operator())
+
+    def rhs(t, w):
+        val, rhs.profile_data = bound_op(queue, profile_data=rhs.profile_data,
+                                         log_quantities=log_quantities,
+                                         t=t, w=w)
+        return val
+    rhs.profile_data = {}
+
+    dt = 0.04
+    dt_stepper = set_up_rk4("w", dt, fields, rhs)
+
+    logmgr.tick_before()
+    for event in dt_stepper.run(t_end=dt * num_steps):
+        if isinstance(event, dt_stepper.StateComputed):
+            logmgr.tick_after()
+            logmgr.tick_before()
+    logmgr.tick_after()
+
+    def print_profile_data(data):
+        print("""execute() for rank %d:
+\tInstruction Evaluation: %f%%
+\tFuture Evaluation: %f%%
+\tBusy Wait: %f%%
+\tTotal: %f seconds""" %
+              (comm.Get_rank(),
+               data['insn_eval_time'] / data['total_time'] * 100,
+               data['future_eval_time'] / data['total_time'] * 100,
+               data['busy_wait_time'] / data['total_time'] * 100,
+               data['total_time']))
+
+    print_profile_data(rhs.profile_data)
+    logmgr.close()
+
+
+if __name__ == "__main__":
+    assert "RUN_WITHIN_MPI" in os.environ, "Must run within MPI"
+    import sys
+    assert len(sys.argv) == 5, \
+            "Usage: %s %s num_elems order num_steps logfile" \
+            % (sys.executable, sys.argv[0])
+    simple_wave_entrypoint(num_elems=int(sys.argv[1]),
+                           order=int(sys.argv[2]),
+                           num_steps=int(sys.argv[3]),
+                           log_filename=sys.argv[4])
diff --git a/examples/benchmark_grudge/run_benchmark.sh b/examples/benchmark_grudge/run_benchmark.sh
new file mode 100755
index 0000000000000000000000000000000000000000..72eaca2bdd62d934644333f8d78b5628df8217e9
--- /dev/null
+++ b/examples/benchmark_grudge/run_benchmark.sh
@@ -0,0 +1,123 @@
+#!/bin/bash
+
+# Weak scaling: We run our code on one computer, then we buy a second computer
+# and we can run twice as much code in the same amount of time.
+
+# Strong scaling: We run our code on one computer, then we buy a second computer
+# and we can run the same code in half the time.
+
+# Examples:
+# ./run_benchmark.sh -t WEAK -n 100 -r 20 -s 1000 -l ~/weak_scaling.dat -o weak_scaling.txt
+# ./run_benchmark.sh -t STRONG -n 100 -r 20 -s 1000 -l ~/strong_scaling.dat -o strong_scaling.txt
+
+set -eu
+
+# NOTE: benchmark_mpi.py hangs when the logfile is in a shared directory.
+USAGE="Usage: $0 -t WEAK|STRONG -n num_elems -r order -s num_steps -l logfile -o outfile"
+while getopts "t:n:r:s:l:o:" OPT; do
+    case $OPT in
+        t)
+            case $OPTARG in
+                WEAK)
+                    SCALING_TYPE='WEAK'
+                    ;;
+                STRONG)
+                    SCALING_TYPE='STRONG'
+                    ;;
+                *)
+                    echo $USAGE
+                    exit 1
+                    ;;
+            esac
+            ;;
+        n)
+            NUM_ELEMS=$OPTARG
+            ;;
+        r)
+            ORDER=$OPTARG
+            ;;
+        s)
+            NUM_STEPS=$OPTARG
+            ;;
+        l)
+            LOGFILE=$OPTARG
+            ;;
+        o)
+            OUTFILE=$OPTARG
+            ;;
+        *)
+            echo $USAGE
+            exit 1
+            ;;
+    esac
+done
+
+
+# NOTE: We want to make sure we run grudge in the right environment.
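+# (Site-specific: SHARED and the conda environment name below match the
+# author's cluster; adjust them for other machines.)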
+SHARED="/home/eshoag2/shared"
+source $SHARED/miniconda3/bin/activate inteq
+PYTHON=$(which python)
+BENCHMARK_MPI="$SHARED/grudge/examples/benchmark_grudge/benchmark_mpi.py"
+
+# Assume HOSTS_LIST is sorted in increasing order starting with one host.
+HOSTS_LIST="\
+porter \
+porter,stout \
+porter,stout,koelsch"
+
+ENVIRONMENT_VARS="\
+-x RUN_WITHIN_MPI=1 \
+-x PYOPENCL_CTX=0 \
+-x POCL_AFFINITY=1"
+
+PERF_EVENTS="\
+cpu-cycles,\
+instructions,\
+task-clock"
+
+TEMPDIR=$(mktemp -d)
+trap 'rm -rf $TEMPDIR' EXIT HUP INT QUIT TERM
+
+echo "$(date): Testing $SCALING_TYPE scaling" | tee -a $OUTFILE
+
+NUM_HOSTS=1
+BASE_NUM_ELEMS=$NUM_ELEMS
+for HOSTS in $HOSTS_LIST; do
+
+    if [ $SCALING_TYPE = 'WEAK' ]; then
+        NUM_ELEMS=$(echo $BASE_NUM_ELEMS $NUM_HOSTS | awk '{ print $1 * $2 }')
+    fi
+
+    BENCHMARK_CMD="$PYTHON $BENCHMARK_MPI $NUM_ELEMS $ORDER $NUM_STEPS $LOGFILE.trial$NUM_HOSTS"
+    # NOTE: mpiexec was recently updated, so some things might act weird.
+    MPI_CMD="mpiexec --output-filename $TEMPDIR -H $HOSTS $ENVIRONMENT_VARS $BENCHMARK_CMD"
+    echo "Executing: $MPI_CMD"
+
+    # NOTE: perf does not follow mpi across different nodes.
+    # Instead, perf will follow all processes on the porter node.
+    echo "====================Using $NUM_HOSTS host(s)===================" >> $OUTFILE
+    START_TIME=$(date +%s)
+    perf stat --append -o $OUTFILE -e $PERF_EVENTS $MPI_CMD
+    DURATION=$(($(date +%s) - $START_TIME))
+    echo "Finished in $DURATION seconds"
+
+    echo "===================Output of Python===================" >> $OUTFILE
+    find $TEMPDIR -type f -exec cat {} \; >> $OUTFILE
+    echo "======================================================" >> $OUTFILE
+    rm -rf $TEMPDIR/*
+
+    if [ $NUM_HOSTS -eq 1 ]; then
+        BASE_DURATION=$DURATION
+    fi
+
+    # Efficiency is expected / actual
+    if [ $SCALING_TYPE = 'STRONG' ]; then
+        EFFICIENCY=$(echo $DURATION $BASE_DURATION $NUM_HOSTS | awk '{ print $2 / ($3 * $1) * 100"%" }')
+    elif [ $SCALING_TYPE = 'WEAK' ]; then
+        EFFICIENCY=$(echo $DURATION $BASE_DURATION | awk '{ print $2 / $1 * 100"%" }')
+    fi
+
+    echo "Efficiency for $SCALING_TYPE scaling is $EFFICIENCY for $NUM_HOSTS host(s)." | tee -a $OUTFILE
+
+    ((NUM_HOSTS++))
+done
diff --git a/examples/wave/wave-min-mpi.py b/examples/wave/wave-min-mpi.py
new file mode 100644
index 0000000000000000000000000000000000000000..04d0b8a371bef57013441932343085a22659782b
--- /dev/null
+++ b/examples/wave/wave-min-mpi.py
@@ -0,0 +1,149 @@
+"""Minimal example of a grudge driver."""
+
+from __future__ import division, print_function
+
+__copyright__ = "Copyright (C) 2015 Andreas Kloeckner"
+
+__license__ = """
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + + +import numpy as np +import pyopencl as cl +from grudge.shortcuts import set_up_rk4 +from grudge import sym, bind, DGDiscretizationWithBoundaries +from mpi4py import MPI + + +def main(write_output=True, order=4): + cl_ctx = cl.create_some_context() + queue = cl.CommandQueue(cl_ctx) + + comm = MPI.COMM_WORLD + num_parts = comm.Get_size() + + from meshmode.distributed import MPIMeshDistributor, get_partition_by_pymetis + mesh_dist = MPIMeshDistributor(comm) + + if mesh_dist.is_mananger_rank(): + dims = 2 + from meshmode.mesh.generation import generate_regular_rect_mesh + mesh = generate_regular_rect_mesh( + a=(-0.5,)*dims, + b=(0.5,)*dims, + n=(16,)*dims) + + print("%d elements" % mesh.nelements) + + part_per_element = get_partition_by_pymetis(mesh, num_parts) + + local_mesh = mesh_dist.send_mesh_parts(mesh, part_per_element, num_parts) + + del mesh + + else: + local_mesh = mesh_dist.receive_mesh_part() + + discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order, + mpi_communicator=comm) + + if local_mesh.dim == 2: + dt = 0.04 + elif local_mesh.dim == 3: + dt = 0.02 + + source_center = np.array([0.1, 0.22, 0.33])[:local_mesh.dim] + source_width = 0.05 + source_omega = 3 + + sym_x = sym.nodes(local_mesh.dim) + sym_source_center_dist = sym_x - source_center + sym_t = sym.ScalarVariable("t") + + from grudge.models.wave import StrongWaveOperator + from meshmode.mesh import BTAG_ALL, BTAG_NONE + op = StrongWaveOperator(-0.1, discr.dim, + source_f=( + sym.sin(source_omega*sym_t) + * sym.exp( + -np.dot(sym_source_center_dist, sym_source_center_dist) + / source_width**2)), + dirichlet_tag=BTAG_NONE, + neumann_tag=BTAG_NONE, + radiation_tag=BTAG_ALL, + flux_type="upwind") + + queue = cl.CommandQueue(discr.cl_context) + from pytools.obj_array import join_fields + fields = join_fields(discr.zeros(queue), + [discr.zeros(queue) for i in range(discr.dim)]) + + # FIXME + #dt = op.estimate_rk4_timestep(discr, fields=fields) + + op.check_bc_coverage(local_mesh) + + # print(sym.pretty(op.sym_operator())) + bound_op = bind(discr, op.sym_operator()) + + def rhs(t, w): + return bound_op(queue, t=t, w=w) + + dt_stepper = set_up_rk4("w", dt, fields, rhs) + + final_t = 10 + nsteps = int(final_t/dt) + print("dt=%g nsteps=%d" % (dt, nsteps)) + + from grudge.shortcuts import make_visualizer + vis = make_visualizer(discr, vis_order=order) + + step = 0 + + norm = bind(discr, sym.norm(2, sym.var("u"))) + + from time import time + t_last_step = time() + + rank = comm.Get_rank() + + for event in dt_stepper.run(t_end=final_t): + if isinstance(event, dt_stepper.StateComputed): + assert event.component_id == "w" + + step += 1 + + print(step, event.t, norm(queue, u=event.state_component[0]), + time()-t_last_step) + if step % 10 == 0: + vis.write_vtk_file( + "fld-%03d-%04d.vtu" % ( + rank, + step, + ), + [ + ("u", event.state_component[0]), + ("v", event.state_component[1:]), + ]) + t_last_step = time() + + +if __name__ == "__main__": + main() diff --git a/examples/wave/wave-min.py b/examples/wave/wave-min.py index 89f4bfa836abe91e305802a455fd19a729cac172..d7ebffd33a7732c621d159622804fed064f365b3 100644 --- a/examples/wave/wave-min.py +++ b/examples/wave/wave-min.py @@ -35,7 +35,7 @@ def main(write_output=True, order=4): cl_ctx = 
cl.create_some_context() queue = cl.CommandQueue(cl_ctx) - dims = 3 + dims = 2 from meshmode.mesh.generation import generate_regular_rect_mesh mesh = generate_regular_rect_mesh( a=(-0.5,)*dims, @@ -84,8 +84,6 @@ def main(write_output=True, order=4): # print(sym.pretty(op.sym_operator())) bound_op = bind(discr, op.sym_operator()) - # print(bound_op) - # 1/0 def rhs(t, w): return bound_op(queue, t=t, w=w) diff --git a/examples/wave/wave.py b/examples/wave/wave.py deleted file mode 100644 index 3d206d71ccac26b4360f0be97b69be201556a91f..0000000000000000000000000000000000000000 --- a/examples/wave/wave.py +++ /dev/null @@ -1,191 +0,0 @@ -# Copyright (C) 2007 Andreas Kloeckner -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - - -from __future__ import division -from __future__ import absolute_import -from __future__ import print_function -import numpy as np -from grudge.mesh import BTAG_ALL, BTAG_NONE -from six.moves import range - - -def main(write_output=True, - dir_tag=BTAG_NONE, neu_tag=TAG_NONE, rad_tag=BTAG_ALL, - flux_type_arg="upwind", dtype=np.float64, debug=[]): - from math import sin, cos, pi, exp, sqrt # noqa - - from grudge.backends import guess_run_context - rcon = guess_run_context() - - dim = 2 - - if dim == 1: - if rcon.is_head_rank: - from grudge.mesh.generator import make_uniform_1d_mesh - mesh = make_uniform_1d_mesh(-10, 10, 500) - elif dim == 2: - from grudge.mesh.generator import make_rect_mesh - if rcon.is_head_rank: - mesh = make_rect_mesh(a=(-0.5, -0.5), b=(0.5, 0.5), max_area=0.008) - elif dim == 3: - if rcon.is_head_rank: - from grudge.mesh.generator import make_ball_mesh - mesh = make_ball_mesh(max_volume=0.0005) - else: - raise RuntimeError("bad number of dimensions") - - if rcon.is_head_rank: - print("%d elements" % len(mesh.elements)) - mesh_data = rcon.distribute_mesh(mesh) - else: - mesh_data = rcon.receive_mesh() - - from grudge.timestep.runge_kutta import LSRK4TimeStepper - stepper = LSRK4TimeStepper(dtype=dtype) - - from grudge.models.wave import StrongWaveOperator - from grudge.mesh import BTAG_ALL, BTAG_NONE # noqa - - source_center = np.array([0.1, 0.22]) - source_width = 0.05 - source_omega = 3 - - import grudge.symbolic as sym - sym_x = sym.nodes(2) - sym_source_center_dist = sym_x - source_center - - op = StrongWaveOperator(-1, dim, - source_f= - sym.CFunction("sin")(source_omega*sym.ScalarParameter("t")) - * sym.CFunction("exp")( - -np.dot(sym_source_center_dist, sym_source_center_dist) - / source_width**2), - dirichlet_tag=dir_tag, - neumann_tag=neu_tag, - radiation_tag=rad_tag, - flux_type=flux_type_arg - ) - - discr = rcon.make_discretization(mesh_data, order=4, debug=debug, - default_scalar_type=dtype, - tune_for=op.sym_operator()) - - from grudge.visualization import VtkVisualizer - if write_output: - vis = VtkVisualizer(discr, rcon, "fld") - - from grudge.tools import join_fields - fields = join_fields(discr.volume_zeros(dtype=dtype), - [discr.volume_zeros(dtype=dtype) for i in 
range(discr.dimensions)]) - - # {{{ diagnostics setup - - from pytools.log import LogManager, \ - add_general_quantities, \ - add_simulation_quantities, \ - add_run_info - - if write_output: - log_file_name = "wave.dat" - else: - log_file_name = None - - logmgr = LogManager(log_file_name, "w", rcon.communicator) - add_run_info(logmgr) - add_general_quantities(logmgr) - add_simulation_quantities(logmgr) - discr.add_instrumentation(logmgr) - - from pytools.log import IntervalTimer - vis_timer = IntervalTimer("t_vis", "Time spent visualizing") - logmgr.add_quantity(vis_timer) - stepper.add_instrumentation(logmgr) - - from grudge.log import LpNorm - u_getter = lambda: fields[0] - logmgr.add_quantity(LpNorm(u_getter, discr, 1, name="l1_u")) - logmgr.add_quantity(LpNorm(u_getter, discr, name="l2_u")) - - logmgr.add_watches(["step.max", "t_sim.max", "l2_u", "t_step.max"]) - - # }}} - - # {{{ timestep loop - - rhs = op.bind(discr) - try: - from grudge.timestep import times_and_steps - step_it = times_and_steps( - final_time=4, logmgr=logmgr, - max_dt_getter=lambda t: op.estimate_timestep(discr, - stepper=stepper, t=t, fields=fields)) - - for step, t, dt in step_it: - if step % 10 == 0 and write_output: - visf = vis.make_file("fld-%04d" % step) - - vis.add_data(visf, - [ - ("u", discr.convert_volume(fields[0], kind="numpy")), - ("v", discr.convert_volume(fields[1:], kind="numpy")), - ], - time=t, - step=step) - visf.close() - - fields = stepper(fields, t, dt, rhs) - - assert discr.norm(fields) < 1 - assert fields[0].dtype == dtype - - finally: - if write_output: - vis.close() - - logmgr.close() - discr.close() - - # }}} - -if __name__ == "__main__": - main(True, BTAG_ALL, BTAG_NONE, TAG_NONE, "upwind", np.float64, - debug=["cuda_no_plan", "dump_optemplate_stages"]) - - -# {{{ entry points for py.test - -def test_wave(): - from pytools.test import mark_test - mark_long = mark_test.long - - yield ("dirichlet wave equation with SP data", mark_long(main), - False, BTAG_ALL, BTAG_NONE, TAG_NONE, "upwind", np.float64) - yield ("dirichlet wave equation with SP complex data", mark_long(main), - False, BTAG_ALL, BTAG_NONE, TAG_NONE, "upwind", np.complex64) - yield ("dirichlet wave equation with DP complex data", mark_long(main), - False, BTAG_ALL, BTAG_NONE, TAG_NONE, "upwind", np.complex128) - for flux_type in ["upwind", "central"]: - yield ("dirichlet wave equation with %s flux" % flux_type, - mark_long(main), - False, BTAG_ALL, BTAG_NONE, TAG_NONE, flux_type) - yield ("neumann wave equation", mark_long(main), - False, BTAG_NONE, BTAG_ALL, TAG_NONE) - yield ("radiation-bc wave equation", mark_long(main), - False, BTAG_NONE, TAG_NONE, BTAG_ALL) - -# }}} - -# ij diff --git a/grudge/discretization.py b/grudge/discretization.py index 289a8e82d00912143f58f3768937c61fc95d6308..20fc05053dd4cca70ee29b4909b045c52abc04e7 100644 --- a/grudge/discretization.py +++ b/grudge/discretization.py @@ -23,7 +23,9 @@ THE SOFTWARE. """ +import six from pytools import memoize_method +import pyopencl as cl from grudge import sym import numpy as np @@ -47,9 +49,10 @@ class DGDiscretizationWithBoundaries(DiscretizationBase): .. 
automethod :: zeros
     """

-    def __init__(self, cl_ctx, mesh, order, quad_tag_to_group_factory=None):
+    def __init__(self, cl_ctx, mesh, order, quad_tag_to_group_factory=None,
+            mpi_communicator=None):
         """
-        :param quad_min_degrees: A mapping from quadrature tags (typically
+        :param quad_tag_to_group_factory: A mapping from quadrature tags (typically
             strings--but may be any hashable/comparable object) to a
             :class:`meshmode.discretization.ElementGroupFactory` indicating with
             which quadrature discretization the operations are to be carried out,
@@ -78,6 +81,60 @@ class DGDiscretizationWithBoundaries(DiscretizationBase):

         # }}}

+        with cl.CommandQueue(cl_ctx) as queue:
+            self._dist_boundary_connections = \
+                    self._set_up_distributed_communication(mpi_communicator, queue)
+
+        self.mpi_communicator = mpi_communicator
+
+    def get_management_rank_index(self):
+        return 0
+
+    def is_management_rank(self):
+        if self.mpi_communicator is None:
+            return True
+        else:
+            return self.mpi_communicator.Get_rank() \
+                    == self.get_management_rank_index()
+
+    def _set_up_distributed_communication(self, mpi_communicator, queue):
+        from_dd = sym.DOFDesc("vol", sym.QTAG_NONE)
+
+        from meshmode.distributed import get_connected_partitions
+        connected_parts = get_connected_partitions(self._volume_discr.mesh)
+
+        if mpi_communicator is None and connected_parts:
+            raise RuntimeError("must supply an MPI communicator when using a "
+                    "distributed mesh")
+
+        grp_factory = self.group_factory_for_quadrature_tag(sym.QTAG_NONE)
+
+        setup_helpers = {}
+        boundary_connections = {}
+
+        from meshmode.distributed import MPIBoundaryCommSetupHelper
+        for i_remote_part in connected_parts:
+            conn = self.connection_from_dds(
+                    from_dd,
+                    sym.DOFDesc(sym.BTAG_PARTITION(i_remote_part), sym.QTAG_NONE))
+            setup_helper = setup_helpers[i_remote_part] = MPIBoundaryCommSetupHelper(
+                    mpi_communicator, queue, conn, i_remote_part, grp_factory)
+            setup_helper.post_sends()
+
+        for i_remote_part, setup_helper in six.iteritems(setup_helpers):
+            boundary_connections[i_remote_part] = setup_helper.complete_setup()
+
+        return boundary_connections
+
+    def get_distributed_boundary_swap_connection(self, dd):
+        if dd.quadrature_tag != sym.QTAG_NONE:
+            # FIXME
+            raise NotImplementedError("Distributed communication with quadrature")
+
+        assert isinstance(dd.domain_tag, sym.BTAG_PARTITION)
+
+        return self._dist_boundary_connections[dd.domain_tag.part_nr]
+
     @memoize_method
     def discr_from_dd(self, dd):
         dd = sym.as_dofdesc(dd)
diff --git a/grudge/execution.py b/grudge/execution.py
index cb52cc0bba39eb182f50fb40f54a14f8fe5427f1..9d665cb359c618bb0a07e3a030efef966cbbdc77 100644
--- a/grudge/execution.py
+++ b/grudge/execution.py
@@ -38,6 +38,9 @@ logger = logging.getLogger(__name__)
 from loopy.version import LOOPY_USE_LANGUAGE_VERSION_2018_1  # noqa: F401


+MPI_TAG_SEND_TAGS = 1729
+
+
 # {{{ exec mapper

 class ExecutionMapper(mappers.Evaluator,
@@ -248,6 +251,12 @@ class ExecutionMapper(mappers.Evaluator,
         conn = self.discrwb.connection_from_dds(op.dd_in, op.dd_out)
         return conn(self.queue, self.rec(field_expr)).with_queue(self.queue)

+    def map_opposite_partition_face_swap(self, op, field_expr):
+        assert op.dd_in == op.dd_out
+        bdry_conn = self.discrwb.get_distributed_boundary_swap_connection(op.dd_in)
+        remote_bdry_vec = self.rec(field_expr)  # swapped by RankDataSwapAssign
+        return bdry_conn(self.queue, remote_bdry_vec).with_queue(self.queue)
+
     def map_opposite_interior_face_swap(self, op, field_expr):
         return self.discrwb.opposite_face_connection()(
                 self.queue,
                self.rec(field_expr)).with_queue(self.queue)

@@ -311,7 +320,23 @@ class ExecutionMapper(mappers.Evaluator,

     # }}}

-    # {{{ code execution functions
+    # {{{ instruction execution functions
+
+    def map_insn_rank_data_swap(self, insn):
+        local_data = self.rec(insn.field).get(self.queue)
+        comm = self.discrwb.mpi_communicator
+
+        # print("Sending data to rank %d with tag %d"
+        #         % (insn.i_remote_rank, insn.send_tag))
+        send_req = comm.Isend(local_data, insn.i_remote_rank, tag=insn.send_tag)
+
+        remote_data_host = np.empty_like(local_data)
+        recv_req = comm.Irecv(remote_data_host, insn.i_remote_rank, insn.recv_tag)
+
+        return [], [
+                MPIRecvFuture(recv_req, insn.name, remote_data_host, self.queue),
+                MPISendFuture(send_req)]
+
     def map_insn_loopy_kernel(self, insn):
         kwargs = {}
         kdescr = insn.kernel_descriptor
@@ -413,6 +438,38 @@ class ExecutionMapper(mappers.Evaluator,

 # }}}


+# {{{ futures
+
+class MPIRecvFuture(object):
+    def __init__(self, recv_req, insn_name, remote_data_host, queue):
+        self.receive_request = recv_req
+        self.insn_name = insn_name
+        self.remote_data_host = remote_data_host
+        self.queue = queue
+
+    def is_ready(self):
+        return self.receive_request.Test()
+
+    def __call__(self):
+        self.receive_request.Wait()
+        remote_data = cl.array.to_device(self.queue, self.remote_data_host)
+        return [(self.insn_name, remote_data)], []
+
+
+class MPISendFuture(object):
+    def __init__(self, send_request):
+        self.send_request = send_request
+
+    def is_ready(self):
+        return self.send_request.Test()
+
+    def __call__(self):
+        self.send_request.Wait()
+        return [], []
+
+# }}}
+
+
 # {{{ bound operator

 class BoundOperator(object):
@@ -436,7 +493,7 @@ class BoundOperator(object):
                 + sep
                 + str(self.eval_code))

-    def __call__(self, queue, **context):
+    def __call__(self, queue, profile_data=None, log_quantities=None, **context):
         import pyopencl.array as cl_array

         def replace_queue(a):
@@ -465,7 +522,9 @@ class BoundOperator(object):
             new_context[name] = with_object_array_or_scalar(replace_queue, var)

         return self.eval_code.execute(
-                ExecutionMapper(queue, new_context, self))
+                ExecutionMapper(queue, new_context, self),
+                profile_data=profile_data,
+                log_quantities=log_quantities)

 # }}}


@@ -475,6 +534,7 @@ class BoundOperator(object):

 def process_sym_operator(discrwb, sym_operator, post_bind_mapper=None,
         dumper=lambda name, sym_operator: None):
+    orig_sym_operator = sym_operator
     import grudge.symbolic.mappers as mappers

     dumper("before-bind", sym_operator)
@@ -482,6 +542,30 @@ def process_sym_operator(discrwb, sym_operator, post_bind_mapper=None,

     mappers.ErrorChecker(discrwb.mesh)(sym_operator)

+    sym_operator = \
+            mappers.OppositeInteriorFaceSwapUniqueIDAssigner()(sym_operator)
+
+    # {{{ broadcast root rank's sym_operator
+
+    # also make sure all ranks had same orig_sym_operator
+
+    if discrwb.mpi_communicator is not None:
+        (mgmt_rank_orig_sym_operator, mgmt_rank_sym_operator) = \
+                discrwb.mpi_communicator.bcast(
+                    (orig_sym_operator, sym_operator),
+                    discrwb.get_management_rank_index())
+
+        from pytools.obj_array import is_equal as is_oa_equal
+        if not is_oa_equal(mgmt_rank_orig_sym_operator, orig_sym_operator):
+            raise ValueError("rank %d received a different symbolic "
+                    "operator to bind from rank %d"
+                    % (discrwb.mpi_communicator.Get_rank(),
+                        discrwb.get_management_rank_index()))
+
+        sym_operator = mgmt_rank_sym_operator
+
+    # }}}
+
     if post_bind_mapper is not None:
         dumper("before-postbind", sym_operator)
         sym_operator = post_bind_mapper(sym_operator)
@@ -514,6 +598,15 @@ def
process_sym_operator(discrwb, sym_operator, post_bind_mapper=None, dumper("before-global-to-reference", sym_operator) sym_operator = mappers.GlobalToReferenceMapper(discrwb.ambient_dim)(sym_operator) + dumper("before-distributed", sym_operator) + + volume_mesh = discrwb.discr_from_dd("vol").mesh + from meshmode.distributed import get_connected_partitions + connected_parts = get_connected_partitions(volume_mesh) + + if connected_parts: + sym_operator = mappers.DistributedMapper(connected_parts)(sym_operator) + dumper("before-imass", sym_operator) sym_operator = mappers.InverseMassContractor()(sym_operator) diff --git a/grudge/symbolic/compiler.py b/grudge/symbolic/compiler.py index bd19a6c3d1b8246d5145dbdc6db454c391b85199..976beed9b8cc6ec20d6420203509f0ba954372ff 100644 --- a/grudge/symbolic/compiler.py +++ b/grudge/symbolic/compiler.py @@ -201,6 +201,48 @@ class Assign(AssignBase): mapper_method = intern("map_insn_assign") +class RankDataSwapAssign(Instruction): + """ + .. attribute:: name + .. attribute:: field + .. attribute:: i_remote_rank + + The number of the remote rank that this instruction swaps data with. + + .. attribute:: dd_out + .. attribute:: comment + """ + # TODO: We need to be sure this does not conflict with some other tag. + MPI_TAG_GRUDGE_DATA_BASE = 0x3700d3e + + def __init__(self, name, field, op): + self.name = name + self.field = field + self.i_remote_rank = op.i_remote_part + self.dd_out = op.dd_out + self.send_tag = self.MPI_TAG_GRUDGE_DATA_BASE + op.unique_id + self.recv_tag = self.MPI_TAG_GRUDGE_DATA_BASE + op.unique_id + self.comment = "Swap data with rank %02d" % self.i_remote_rank + + @memoize_method + def get_assignees(self): + return set([self.name]) + + @memoize_method + def get_dependencies(self): + return _make_dep_mapper(include_subscripts=False)(self.field) + + def __str__(self): + return ("{\n" + + " /* %s */\n" % self.comment + + " send_tag = %s\n" % self.send_tag + + " recv_tag = %s\n" % self.recv_tag + + " %s <- %s\n" % (self.name, self.field) + + "}") + + mapper_method = intern("map_insn_rank_data_swap") + + class ToDiscretizationScopedAssign(Assign): scope_indicator = "(to discr)-" @@ -337,7 +379,7 @@ class Code(object): def __init__(self, instructions, result): self.instructions = instructions self.result = result - self.last_schedule = None + # self.last_schedule = None self.static_schedule_attempts = 5 def dump_dataflow_graph(self): @@ -433,78 +475,103 @@ class Code(object): return argmax2(available_insns), discardable_vars - def execute_dynamic(self, exec_mapper, pre_assign_check=None): - """Execute the instruction stream, make all scheduling decisions - dynamically. Record the schedule in *self.last_schedule*. 
- """ - schedule = [] - + def execute(self, exec_mapper, pre_assign_check=None, profile_data=None, + log_quantities=None): + if profile_data is not None: + from time import time + start_time = time() + if profile_data == {}: + profile_data['insn_eval_time'] = 0 + profile_data['future_eval_time'] = 0 + profile_data['busy_wait_time'] = 0 + profile_data['total_time'] = 0 + if log_quantities is not None: + exec_sub_timer = log_quantities["exec_timer"].start_sub_timer() context = exec_mapper.context - next_future_id = 0 futures = [] done_insns = set() - force_future = False - while True: - insn = None - discardable_vars = [] - - # check futures for completion - - i = 0 - while i < len(futures): - future = futures[i] - if force_future or future.is_ready(): - futures.pop(i) - - insn = self.EvaluateFuture(future.id) + try: + if profile_data is not None: + insn_start_time = time() + if log_quantities is not None: + insn_sub_timer = \ + log_quantities["insn_eval_timer"].start_sub_timer() - assignments, new_futures = future() - force_future = False - break - else: - i += 1 + insn, discardable_vars = self.get_next_step( + frozenset(list(context.keys())), + frozenset(done_insns)) - del future + done_insns.add(insn) + for name in discardable_vars: + del context[name] - # if no future got processed, pick the next insn - if insn is None: - try: - insn, discardable_vars = self.get_next_step( - frozenset(list(context.keys())), - frozenset(done_insns)) - - except self.NoInstructionAvailable: - if futures: - # no insn ready: we need a future to complete to continue - force_future = True - else: - # no futures, no available instructions: we're done - break - else: - for name in discardable_vars: - del context[name] + mapper_method = getattr(exec_mapper, insn.mapper_method) + if log_quantities is not None: + if isinstance(insn, RankDataSwapAssign): + from pytools.log import time_and_count_function + mapper_method = time_and_count_function( + mapper_method, + log_quantities["rank_data_swap_timer"], + log_quantities["rank_data_swap_counter"]) - done_insns.add(insn) - mapper_method = getattr(exec_mapper, insn.mapper_method) - assignments, new_futures = mapper_method(insn) + assignments, new_futures = mapper_method(insn) - if insn is not None: for target, value in assignments: if pre_assign_check is not None: pre_assign_check(target, value) - context[target] = value futures.extend(new_futures) + if profile_data is not None: + profile_data['insn_eval_time'] += time() - insn_start_time + if log_quantities is not None: + insn_sub_timer.stop().submit() + except self.NoInstructionAvailable: + if not futures: + # No more instructions or futures. We are done. 
+ break - schedule.append((discardable_vars, insn, len(new_futures))) - - for future in new_futures: - future.id = next_future_id - next_future_id += 1 + # Busy wait for a new future + if profile_data is not None: + busy_wait_start_time = time() + if log_quantities is not None: + busy_sub_timer =\ + log_quantities["busy_wait_timer"].start_sub_timer() + + did_eval_future = False + while not did_eval_future: + for i in range(len(futures)): + if futures[i].is_ready(): + if profile_data is not None: + profile_data['busy_wait_time'] +=\ + time() - busy_wait_start_time + future_start_time = time() + if log_quantities is not None: + busy_sub_timer.stop().submit() + future_sub_timer =\ + log_quantities["future_eval_timer"]\ + .start_sub_timer() + + future = futures.pop(i) + assignments, new_futures = future() + + for target, value in assignments: + if pre_assign_check is not None: + pre_assign_check(target, value) + context[target] = value + + futures.extend(new_futures) + did_eval_future = True + + if profile_data is not None: + profile_data['future_eval_time'] +=\ + time() - future_start_time + if log_quantities is not None: + future_sub_timer.stop().submit() + break if len(done_insns) < len(self.instructions): print("Unreachable instructions:") @@ -514,72 +581,15 @@ class Code(object): raise RuntimeError("not all instructions are reachable" "--did you forget to pass a value for a placeholder?") - if self.static_schedule_attempts: - self.last_schedule = schedule - - from pytools.obj_array import with_object_array_or_scalar - return with_object_array_or_scalar(exec_mapper, self.result) - - # }}} - - # {{{ static schedule execution - - class EvaluateFuture(object): - """A fake 'instruction' that represents evaluation of a future.""" - def __init__(self, future_id): - self.future_id = future_id - - def execute(self, exec_mapper, pre_assign_check=None): - """If we have a saved, static schedule for this instruction stream, - execute it. Otherwise, punt to the dynamic scheduler below. 
- """ - - if self.last_schedule is None: - return self.execute_dynamic(exec_mapper, pre_assign_check) - - context = exec_mapper.context - id_to_future = {} - next_future_id = 0 - - schedule_is_delay_free = True - - for discardable_vars, insn, new_future_count in self.last_schedule: - for name in discardable_vars: - del context[name] - - if isinstance(insn, self.EvaluateFuture): - future = id_to_future.pop(insn.future_id) - if not future.is_ready(): - schedule_is_delay_free = False - assignments, new_futures = future() - del future - else: - mapper_method = getattr(exec_mapper, insn.mapper_method) - assignments, new_futures = mapper_method(insn) - - for target, value in assignments: - if pre_assign_check is not None: - pre_assign_check(target, value) - - context[target] = value - - if len(new_futures) != new_future_count: - raise RuntimeError("static schedule got an unexpected number " - "of futures") - - for future in new_futures: - id_to_future[next_future_id] = future - next_future_id += 1 - - if not schedule_is_delay_free: - self.last_schedule = None - self.static_schedule_attempts -= 1 - + if log_quantities is not None: + exec_sub_timer.stop().submit() from pytools.obj_array import with_object_array_or_scalar + if profile_data is not None: + profile_data['total_time'] += time() - start_time + return (with_object_array_or_scalar(exec_mapper, self.result), + profile_data) return with_object_array_or_scalar(exec_mapper, self.result) - # }}} - # }}} @@ -1002,6 +1012,9 @@ class ToLoopyInstructionMapper(object): governing_dd=governing_dd) ) + def map_insn_rank_data_swap(self, insn): + return insn + def map_insn_assign_to_discr_scoped(self, insn): return insn @@ -1191,6 +1204,8 @@ class OperatorCompiler(mappers.IdentityMapper): def map_operator_binding(self, expr, codegen_state, name_hint=None): if isinstance(expr.op, sym.RefDiffOperatorBase): return self.map_ref_diff_op_binding(expr, codegen_state) + elif isinstance(expr.op, sym.OppositePartitionFaceSwap): + return self.map_rank_data_swap_binding(expr, codegen_state, name_hint) else: # make sure operator assignments stand alone and don't get muddled # up in vector math @@ -1249,6 +1264,20 @@ class OperatorCompiler(mappers.IdentityMapper): return self.expr_to_var[expr] + def map_rank_data_swap_binding(self, expr, codegen_state, name_hint): + try: + return self.expr_to_var[expr] + except KeyError: + field = self.rec(expr.field, codegen_state) + name = self.name_gen("raw_rank%02d_bdry_data" % expr.op.i_remote_part) + field_insn = RankDataSwapAssign(name=name, field=field, op=expr.op) + codegen_state.get_code_list(self).append(field_insn) + field_var = Variable(field_insn.name) + self.expr_to_var[expr] = self.assign_to_new_var(codegen_state, + expr.op(field_var), + prefix=name_hint) + return self.expr_to_var[expr] + # }}} # }}} diff --git a/grudge/symbolic/dofdesc_inference.py b/grudge/symbolic/dofdesc_inference.py index 7e1de6058e046154d2f5a254cb0f4c103f1eb09b..92be126f7f3f081b668247e1fe25a73b122a4887 100644 --- a/grudge/symbolic/dofdesc_inference.py +++ b/grudge/symbolic/dofdesc_inference.py @@ -201,6 +201,9 @@ class DOFDescInferenceMapper(RecursiveMapper, CSECachingMapperMixin): for name, expr in zip(insn.names, insn.exprs) ] + def map_insn_rank_data_swap(self, insn): + return [(insn.name, insn.dd_out)] + map_insn_assign_to_discr_scoped = map_insn_assign def map_insn_diff_batch_assign(self, insn): diff --git a/grudge/symbolic/mappers/__init__.py b/grudge/symbolic/mappers/__init__.py index 
ca11cbf0d92c96b6c9d00a6e035b7b840c2a85cd..5304d647f542400f12b67fecc34dd0f377b23375 100644 --- a/grudge/symbolic/mappers/__init__.py +++ b/grudge/symbolic/mappers/__init__.py @@ -148,6 +148,7 @@ class OperatorReducerMixin(LocalOpReducerMixin, FluxOpReducerMixin): map_ref_mass = _map_op_base map_ref_inverse_mass = _map_op_base + map_opposite_partition_face_swap = _map_op_base map_opposite_interior_face_swap = _map_op_base map_face_mass_operator = _map_op_base map_ref_face_mass_operator = _map_op_base @@ -196,6 +197,7 @@ class IdentityMapperMixin(LocalOpReducerMixin, FluxOpReducerMixin): map_ref_mass = map_elementwise_linear map_ref_inverse_mass = map_elementwise_linear + map_opposite_partition_face_swap = map_elementwise_linear map_opposite_interior_face_swap = map_elementwise_linear map_face_mass_operator = map_elementwise_linear map_ref_face_mass_operator = map_elementwise_linear @@ -332,6 +334,145 @@ class OperatorBinder(CSECachingMapperMixin, IdentityMapper): # }}} +# {{{ dof desc (dd) replacement + +class DOFDescReplacer(IdentityMapper): + def __init__(self, prev_dd, new_dd): + self.prev_dd = prev_dd + self.new_dd = new_dd + + def map_operator_binding(self, expr): + if (isinstance(expr.op, op.OppositeInteriorFaceSwap) + and expr.op.dd_in == self.prev_dd + and expr.op.dd_out == self.prev_dd): + field = self.rec(expr.field) + return op.OppositePartitionFaceSwap(dd_in=self.new_dd, + dd_out=self.new_dd)(field) + elif (isinstance(expr.op, op.InterpolationOperator) + and expr.op.dd_out == self.prev_dd): + return op.InterpolationOperator(dd_in=expr.op.dd_in, + dd_out=self.new_dd)(expr.field) + elif (isinstance(expr.op, op.RefDiffOperatorBase) + and expr.op.dd_out == self.prev_dd + and expr.op.dd_in == self.prev_dd): + return type(expr.op)(expr.op.rst_axis, + dd_in=self.new_dd, + dd_out=self.new_dd)(self.rec(expr.field)) + + def map_node_coordinate_component(self, expr): + if expr.dd == self.prev_dd: + return type(expr)(expr.axis, self.new_dd) + +# }}} + + +# {{{ mappers for distributed computation + +class OppositeInteriorFaceSwapUniqueIDAssigner( + CSECachingMapperMixin, IdentityMapper): + map_common_subexpression_uncached = IdentityMapper.map_common_subexpression + + def __init__(self): + super(OppositeInteriorFaceSwapUniqueIDAssigner, self).__init__() + self._next_id = 0 + self.seen_ids = set() + + def next_id(self): + while self._next_id in self.seen_ids: + self._next_id += 1 + + result = self._next_id + self._next_id += 1 + self.seen_ids.add(result) + + return result + + def map_opposite_interior_face_swap(self, expr): + if expr.unique_id is not None: + if expr.unique_id in self.seen_ids: + raise ValueError("OppositeInteriorFaceSwap unique ID '%d' " + "is not unique" % expr.unique_id) + + self.seen_ids.add(expr.unique_id) + return expr + + else: + return type(expr)(expr.dd_in, expr.dd_out, self.next_id()) + + +class DistributedMapper(CSECachingMapperMixin, IdentityMapper): + map_common_subexpression_uncached = IdentityMapper.map_common_subexpression + + def __init__(self, connected_parts): + self.connected_parts = connected_parts + + def map_operator_binding(self, expr): + from meshmode.mesh import BTAG_PARTITION + from meshmode.discretization.connection import (FACE_RESTR_ALL, + FACE_RESTR_INTERIOR) + if (isinstance(expr.op, op.InterpolationOperator) + and expr.op.dd_in.domain_tag is FACE_RESTR_INTERIOR + and expr.op.dd_out.domain_tag is FACE_RESTR_ALL): + distributed_work = 0 + for i_remote_part in self.connected_parts: + mapped_field = RankGeometryChanger(i_remote_part)(expr.field) + 
btag_part = BTAG_PARTITION(i_remote_part) + distributed_work += op.InterpolationOperator(dd_in=btag_part, + dd_out=expr.op.dd_out)(mapped_field) + return expr + distributed_work + else: + return IdentityMapper.map_operator_binding(self, expr) + + +class RankGeometryChanger(CSECachingMapperMixin, IdentityMapper): + map_common_subexpression_uncached = IdentityMapper.map_common_subexpression + + def __init__(self, i_remote_part): + from meshmode.discretization.connection import FACE_RESTR_INTERIOR + from meshmode.mesh import BTAG_PARTITION + self.prev_dd = sym.as_dofdesc(FACE_RESTR_INTERIOR) + self.new_dd = sym.as_dofdesc(BTAG_PARTITION(i_remote_part)) + + def _raise_unable(self, expr): + raise ValueError("encountered '%s' in updating subexpression for " + "changed geometry (likely for distributed computation); " + "unable to adapt from '%s' to '%s'" + % (str(expr), self.prev_dd, self.new_dd)) + + def map_operator_binding(self, expr): + if (isinstance(expr.op, op.OppositeInteriorFaceSwap) + and expr.op.dd_in == self.prev_dd + and expr.op.dd_out == self.prev_dd): + field = self.rec(expr.field) + return op.OppositePartitionFaceSwap( + dd_in=self.new_dd, + dd_out=self.new_dd, + unique_id=expr.op.unique_id)(field) + elif (isinstance(expr.op, op.InterpolationOperator) + and expr.op.dd_out == self.prev_dd): + return op.InterpolationOperator(dd_in=expr.op.dd_in, + dd_out=self.new_dd)(expr.field) + elif (isinstance(expr.op, op.RefDiffOperator) + and expr.op.dd_out == self.prev_dd + and expr.op.dd_in == self.prev_dd): + return op.RefDiffOperator(expr.op.rst_axis, + dd_in=self.new_dd, + dd_out=self.new_dd)(self.rec(expr.field)) + else: + self._raise_unable(expr) + + def map_grudge_variable(self, expr): + self._raise_unable(expr) + + def map_node_coordinate_component(self, expr): + if expr.dd == self.prev_dd: + return type(expr)(expr.axis, self.new_dd) + else: + self._raise_unable(expr) + +# }}} + + # {{{ operator specializer class OperatorSpecializer(CSECachingMapperMixin, IdentityMapper): @@ -572,6 +713,7 @@ class StringifyMapper(pymbolic.mapper.stringifier.StringifyMapper): else: return repr(s) + from meshmode.mesh import BTAG_PARTITION from meshmode.discretization.connection import ( FACE_RESTR_ALL, FACE_RESTR_INTERIOR) if dd.domain_tag is None: @@ -584,6 +726,8 @@ class StringifyMapper(pymbolic.mapper.stringifier.StringifyMapper): result = "all_faces" elif dd.domain_tag is FACE_RESTR_INTERIOR: result = "int_faces" + elif isinstance(dd.domain_tag, BTAG_PARTITION): + result = "part%d_faces" % dd.domain_tag.part_nr else: result = fmt(dd.domain_tag) @@ -671,6 +815,9 @@ class StringifyMapper(pymbolic.mapper.stringifier.StringifyMapper): def map_ref_face_mass_operator(self, expr, enclosing_prec): return "RefFaceM" + self._format_op_dd(expr) + def map_opposite_partition_face_swap(self, expr, enclosing_prec): + return "PartSwap" + self._format_op_dd(expr) + def map_opposite_interior_face_swap(self, expr, enclosing_prec): return "OppSwap" + self._format_op_dd(expr) diff --git a/grudge/symbolic/operators.py b/grudge/symbolic/operators.py index 194b071614dba86a7ff150c075e5a3c987f23a4b..53fb1422619db271786abb9c2c17df5b3f9a97c0 100644 --- a/grudge/symbolic/operators.py +++ b/grudge/symbolic/operators.py @@ -83,6 +83,8 @@ class Operator(pymbolic.primitives.Expression): dd_in=dd_in or self.dd_in, dd_out=dd_out or self.dd_out) + init_arg_names = ("dd_in", "dd_out") + def __getinitargs__(self): return (self.dd_in, self.dd_out,) @@ -105,6 +107,7 @@ class InterpolationOperator(Operator): " does not do 
anything.".format(official_dd_in, official_dd_out)) super(InterpolationOperator, self).__init__(dd_in, dd_out) + mapper_method = intern("map_interpolation") @@ -163,6 +166,8 @@ class DiffOperatorBase(Operator): self.xyz_axis = xyz_axis + init_arg_names = ("xyz_axis", "dd_in", "dd_out") + def __getinitargs__(self): return (self.xyz_axis, self.dd_in, self.dd_out) @@ -214,6 +219,8 @@ class RefDiffOperatorBase(ElementwiseLinearOperator): self.rst_axis = rst_axis + init_arg_names = ("rst_axis", "dd_in", "dd_out") + def __getinitargs__(self): return (self.rst_axis, self.dd_in, self.dd_out) @@ -410,7 +417,18 @@ class RefInverseMassOperator(RefMassOperatorBase): # {{{ boundary-related operators class OppositeInteriorFaceSwap(Operator): - def __init__(self, dd_in=None, dd_out=None): + """ + .. attribute:: unique_id + + An integer identifying this specific instances of + :class:`OppositePartitionFaceSwap` within an entire bound symbolic + operator. Is assigned automatically by :func:`grudge.bind` + if not already set by the user. This will become + :class:`OppositePartitionFaceSwap.unique_id` in distributed + runs. + """ + + def __init__(self, dd_in=None, dd_out=None, unique_id=None): sym = _sym() if dd_in is None: @@ -418,16 +436,60 @@ class OppositeInteriorFaceSwap(Operator): if dd_out is None: dd_out = dd_in - if dd_in.domain_tag is not sym.FACE_RESTR_INTERIOR: + super(OppositeInteriorFaceSwap, self).__init__(dd_in, dd_out) + if self.dd_in.domain_tag is not sym.FACE_RESTR_INTERIOR: raise ValueError("dd_in must be an interior faces domain") - if dd_out != dd_in: + if self.dd_out != self.dd_in: raise ValueError("dd_out and dd_in must be identical") - super(OppositeInteriorFaceSwap, self).__init__(dd_in, dd_out) + assert unique_id is None or isinstance(unique_id, int) + self.unique_id = unique_id + + init_arg_names = ("dd_in", "dd_out", "unique_id") + + def __getinitargs__(self): + return (self.dd_in, self.dd_out, self.unique_id) mapper_method = intern("map_opposite_interior_face_swap") +class OppositePartitionFaceSwap(Operator): + """ + .. attribute:: unique_id + + An integer corresponding to the :attr:`OppositeInteriorFaceSwap.unique_id` + which led to the creation of this object. This integer is used as an + MPI tag offset to keep different subexpressions apart in MPI traffic. 
+ """ + def __init__(self, dd_in=None, dd_out=None, unique_id=None): + sym = _sym() + + if dd_in is None and dd_out is None: + raise ValueError("dd_in or dd_out must be specified") + elif dd_in is None: + dd_in = dd_out + elif dd_out is None: + dd_out = dd_in + + super(OppositePartitionFaceSwap, self).__init__(dd_in, dd_out) + if not isinstance(self.dd_in.domain_tag, sym.BTAG_PARTITION): + raise ValueError("dd_in must be a partition boundary faces domain") + if self.dd_out != self.dd_in: + raise ValueError("dd_out and dd_in must be identical") + + self.i_remote_part = self.dd_in.domain_tag.part_nr + + assert unique_id is None or isinstance(unique_id, int) + self.unique_id = unique_id + + init_arg_names = ("dd_in", "dd_out", "unique_id") + + def __getinitargs__(self): + return (self.dd_in, self.dd_out, self.unique_id) + + mapper_method = intern("map_opposite_partition_face_swap") + + class FaceMassOperatorBase(ElementwiseLinearOperator): def __init__(self, dd_in=None, dd_out=None): sym = _sym() diff --git a/grudge/symbolic/primitives.py b/grudge/symbolic/primitives.py index 471645d62317f9521e2ce09c4d8d600a09cfdc78..35c45268508b99c2e0264f38875eac2e895f335a 100644 --- a/grudge/symbolic/primitives.py +++ b/grudge/symbolic/primitives.py @@ -28,7 +28,7 @@ from six.moves import range, intern import numpy as np import pymbolic.primitives -from meshmode.mesh import BTAG_ALL, BTAG_REALLY_ALL, BTAG_NONE # noqa +from meshmode.mesh import BTAG_ALL, BTAG_REALLY_ALL, BTAG_NONE, BTAG_PARTITION # noqa from meshmode.discretization.connection import ( # noqa FACE_RESTR_ALL, FACE_RESTR_INTERIOR) @@ -161,6 +161,7 @@ class DOFDesc(object): :class:`meshmode.discretization.BTAG_ALL`, :class:`meshmode.discretization.BTAG_NONE`, :class:`meshmode.discretization.BTAG_REALLY_ALL`, + :class:`meshmode.discretization.PARTITION`, or :class or *None* to indicate that the geometry is not yet known. @@ -188,6 +189,8 @@ class DOFDesc(object): pass elif domain_tag is None: pass + elif isinstance(domain_tag, BTAG_PARTITION): + pass elif domain_tag in [BTAG_ALL, BTAG_REALLY_ALL, BTAG_NONE]: # FIXME: Should wrap these in DTAG_BOUNDARY pass @@ -218,6 +221,7 @@ class DOFDesc(object): return ( self.domain_tag in [ BTAG_ALL, BTAG_NONE, BTAG_REALLY_ALL] + or isinstance(self.domain_tag, BTAG_PARTITION) or isinstance(self.domain_tag, DTAG_BOUNDARY)) def is_trace(self): @@ -310,6 +314,7 @@ class cse_scope(cse_scope_base): # noqa class Variable(HasDOFDesc, ExpressionBase, pymbolic.primitives.Variable): """A user-supplied input variable with a known :class:`DOFDesc`. 
""" + init_arg_names = ("name", "dd") def __init__(self, name, dd=None): if dd is None: @@ -375,6 +380,8 @@ bessel_y = CFunction("bessel_y") # {{{ technical helpers class OperatorBinding(ExpressionBase): + init_arg_names = ("op", "field") + def __init__(self, op, field): self.op = op self.field = field @@ -438,6 +445,8 @@ class NodeCoordinateComponent(DiscretizationProperty): assert dd.domain_tag is not None + init_arg_names = ("axis", "dd") + def __getinitargs__(self): return (self.axis, self.dd) diff --git a/setup.py b/setup.py index 7fb0ad4527caf90ba6972b9879454922e38880b6..1f9ecbd04b99dbc64502f4a1bd4f2dc4a8376cdf 100644 --- a/setup.py +++ b/setup.py @@ -46,7 +46,7 @@ def main(): install_requires=[ "pytest>=2.3", - "pytools>=2015.1.4", + "pytools>=2018.5.2", "modepy>=2013.3", "meshmode>=2013.3", "pyopencl>=2013.1", diff --git a/test/test_mpi_communication.py b/test/test_mpi_communication.py new file mode 100644 index 0000000000000000000000000000000000000000..091111ed37c6cf72f12e43f03accaf326b69548f --- /dev/null +++ b/test/test_mpi_communication.py @@ -0,0 +1,302 @@ +from __future__ import division, absolute_import, print_function + +__copyright__ = """ +Copyright (C) 2017 Ellis Hoag +Copyright (C) 2017 Andreas Kloeckner +""" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. 
+""" + +import pytest +import os +import numpy as np +import pyopencl as cl +import logging +logger = logging.getLogger(__name__) + +from grudge import sym, bind, DGDiscretizationWithBoundaries +from grudge.shortcuts import set_up_rk4 + + +def simple_mpi_communication_entrypoint(): + cl_ctx = cl.create_some_context() + queue = cl.CommandQueue(cl_ctx) + from meshmode.distributed import MPIMeshDistributor, get_partition_by_pymetis + + from mpi4py import MPI + comm = MPI.COMM_WORLD + num_parts = comm.Get_size() + + mesh_dist = MPIMeshDistributor(comm) + + if mesh_dist.is_mananger_rank(): + from meshmode.mesh.generation import generate_regular_rect_mesh + mesh = generate_regular_rect_mesh(a=(-1,)*2, + b=(1,)*2, + n=(3,)*2) + + part_per_element = get_partition_by_pymetis(mesh, num_parts) + + local_mesh = mesh_dist.send_mesh_parts(mesh, part_per_element, num_parts) + else: + local_mesh = mesh_dist.receive_mesh_part() + + vol_discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=5, + mpi_communicator=comm) + + sym_x = sym.nodes(local_mesh.dim) + myfunc_symb = sym.sin(np.dot(sym_x, [2, 3])) + myfunc = bind(vol_discr, myfunc_symb)(queue) + + sym_all_faces_func = sym.cse( + sym.interp("vol", "all_faces")(sym.var("myfunc"))) + sym_int_faces_func = sym.cse( + sym.interp("vol", "int_faces")(sym.var("myfunc"))) + sym_bdry_faces_func = sym.cse( + sym.interp(sym.BTAG_ALL, "all_faces")( + sym.interp("vol", sym.BTAG_ALL)(sym.var("myfunc")))) + + bound_face_swap = bind(vol_discr, + sym.interp("int_faces", "all_faces")( + sym.OppositeInteriorFaceSwap("int_faces")( + sym_int_faces_func) + ) - (sym_all_faces_func - sym_bdry_faces_func) + ) + + # print(bound_face_swap) + # 1/0 + + hopefully_zero = bound_face_swap(queue, myfunc=myfunc) + import numpy.linalg as la + error = la.norm(hopefully_zero.get()) + + np.set_printoptions(threshold=100000000, suppress=True) + print(hopefully_zero) + print(error) + + assert error < 1e-14 + + +def mpi_communication_entrypoint(): + cl_ctx = cl.create_some_context() + queue = cl.CommandQueue(cl_ctx) + + from mpi4py import MPI + comm = MPI.COMM_WORLD + i_local_rank = comm.Get_rank() + num_parts = comm.Get_size() + + from meshmode.distributed import MPIMeshDistributor, get_partition_by_pymetis + mesh_dist = MPIMeshDistributor(comm) + + dim = 2 + dt = 0.04 + order = 4 + + if mesh_dist.is_mananger_rank(): + from meshmode.mesh.generation import generate_regular_rect_mesh + mesh = generate_regular_rect_mesh(a=(-0.5,)*dim, + b=(0.5,)*dim, + n=(16,)*dim) + + part_per_element = get_partition_by_pymetis(mesh, num_parts) + + local_mesh = mesh_dist.send_mesh_parts(mesh, part_per_element, num_parts) + else: + local_mesh = mesh_dist.receive_mesh_part() + + vol_discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order, + mpi_communicator=comm) + + source_center = np.array([0.1, 0.22, 0.33])[:local_mesh.dim] + source_width = 0.05 + source_omega = 3 + + sym_x = sym.nodes(local_mesh.dim) + sym_source_center_dist = sym_x - source_center + sym_t = sym.ScalarVariable("t") + + from grudge.models.wave import StrongWaveOperator + from meshmode.mesh import BTAG_ALL, BTAG_NONE + op = StrongWaveOperator(-0.1, vol_discr.dim, + source_f=( + sym.sin(source_omega*sym_t) + * sym.exp( + -np.dot(sym_source_center_dist, sym_source_center_dist) + / source_width**2)), + dirichlet_tag=BTAG_NONE, + neumann_tag=BTAG_NONE, + radiation_tag=BTAG_ALL, + flux_type="upwind") + + from pytools.obj_array import join_fields + fields = join_fields(vol_discr.zeros(queue), + [vol_discr.zeros(queue) for i in 
range(vol_discr.dim)])
+
+    # FIXME
+    # dt = op.estimate_rk4_timestep(vol_discr, fields=fields)
+
+    # FIXME: Should meshmode consider BTAG_PARTITION to be a boundary?
+    #        Fails because: "found faces without boundary conditions"
+    # op.check_bc_coverage(local_mesh)
+
+    from pytools.log import LogManager, \
+            add_general_quantities, \
+            add_run_info, \
+            IntervalTimer, EventCounter
+    log_filename = None
+    # NOTE: LogManager hangs when using a file on a shared directory.
+    # log_filename = 'grudge_log.dat'
+    logmgr = LogManager(log_filename, "w", comm)
+    add_run_info(logmgr)
+    add_general_quantities(logmgr)
+    log_quantities =\
+        {"rank_data_swap_timer": IntervalTimer("rank_data_swap_timer",
+                        "Time spent evaluating RankDataSwapAssign"),
+        "rank_data_swap_counter": EventCounter("rank_data_swap_counter",
+                        "Number of RankDataSwapAssign instructions evaluated"),
+        "exec_timer": IntervalTimer("exec_timer",
+                        "Total time spent executing instructions"),
+        "insn_eval_timer": IntervalTimer("insn_eval_timer",
+                        "Time spent evaluating instructions"),
+        "future_eval_timer": IntervalTimer("future_eval_timer",
+                        "Time spent evaluating futures"),
+        "busy_wait_timer": IntervalTimer("busy_wait_timer",
+                        "Time wasted doing busy wait")}
+    for quantity in log_quantities.values():
+        logmgr.add_quantity(quantity)
+
+    # print(sym.pretty(op.sym_operator()))
+    bound_op = bind(vol_discr, op.sym_operator())
+    # print(bound_op)
+    # 1/0
+
+    def rhs(t, w):
+        val, rhs.profile_data = bound_op(queue, profile_data=rhs.profile_data,
+                                         log_quantities=log_quantities,
+                                         t=t, w=w)
+        return val
+    rhs.profile_data = {}
+
+    dt_stepper = set_up_rk4("w", dt, fields, rhs)
+
+    final_t = 4
+    nsteps = int(final_t/dt)
+    print("rank=%d dt=%g nsteps=%d" % (i_local_rank, dt, nsteps))
+
+    # from grudge.shortcuts import make_visualizer
+    # vis = make_visualizer(vol_discr, vis_order=order)
+
+    step = 0
+
+    norm = bind(vol_discr, sym.norm(2, sym.var("u")))
+
+    from time import time
+    t_last_step = time()
+
+    logmgr.tick_before()
+    for event in dt_stepper.run(t_end=final_t):
+        if isinstance(event, dt_stepper.StateComputed):
+            assert event.component_id == "w"
+
+            step += 1
+            print(step, event.t, norm(queue, u=event.state_component[0]),
+                  time()-t_last_step)
+
+            # if step % 10 == 0:
+            #     vis.write_vtk_file("rank%d-fld-%04d.vtu" % (i_local_rank, step),
+            #                        [("u", event.state_component[0]),
+            #                         ("v", event.state_component[1:])])
+            t_last_step = time()
+            logmgr.tick_after()
+            logmgr.tick_before()
+    logmgr.tick_after()
+
+    def print_profile_data(data):
+        print("""execute() for rank %d:
+\tInstruction Evaluation: %f%%
+\tFuture Evaluation: %f%%
+\tBusy Wait: %f%%
+\tTotal: %f seconds""" %
+              (i_local_rank,
+               data['insn_eval_time'] / data['total_time'] * 100,
+               data['future_eval_time'] / data['total_time'] * 100,
+               data['busy_wait_time'] / data['total_time'] * 100,
+               data['total_time']))
+
+    print_profile_data(rhs.profile_data)
+    logmgr.close()
+    logger.debug("Rank %d exiting", i_local_rank)
+
+
+# {{{ MPI test pytest entrypoint
+
+@pytest.mark.mpi
+@pytest.mark.parametrize("num_ranks", [2])
+def test_mpi(num_ranks):
+    pytest.importorskip("mpi4py")
+    pytest.importorskip("pymetis")
+
+    from subprocess import check_call
+    import sys
+    newenv = os.environ.copy()
+    newenv["RUN_WITHIN_MPI"] = "1"
+    newenv["TEST_MPI_COMMUNICATION"] = "1"
+    check_call([
+        "mpiexec", "-np", str(num_ranks), "-x", "RUN_WITHIN_MPI",
+        sys.executable, __file__],
+        env=newenv)
+
+
+@pytest.mark.mpi
+@pytest.mark.parametrize("num_ranks", [2])
+def test_simple_mpi(num_ranks):
+    pytest.importorskip("mpi4py")
+    pytest.importorskip("pymetis")
+
+    from subprocess import check_call
+    import sys
+    newenv = os.environ.copy()
+    newenv["RUN_WITHIN_MPI"] = "1"
+    newenv["TEST_SIMPLE_MPI_COMMUNICATION"] = "1"
+    check_call([
+        "mpiexec", "-np", str(num_ranks), "-x", "RUN_WITHIN_MPI",
+        sys.executable, __file__],
+        env=newenv)
+
+# }}}
+
+
+if __name__ == "__main__":
+    if "RUN_WITHIN_MPI" in os.environ:
+        if "TEST_MPI_COMMUNICATION" in os.environ:
+            mpi_communication_entrypoint()
+        elif "TEST_SIMPLE_MPI_COMMUNICATION" in os.environ:
+            simple_mpi_communication_entrypoint()
+    else:
+        import sys
+        if len(sys.argv) > 1:
+            exec(sys.argv[1])
+        else:
+            from py.test.cmdline import main
+            main([__file__])
+
+# vim: fdm=marker