diff --git a/.gitignore b/.gitignore
index b450b3b751d1f738042168a937b2734d73530a3a..f2792e88875767bd3d1f269728fd48c212843668 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,9 @@ a.out
 hk
 test/*.pdf
 examples/*.pdf
+.idea
+.pytest_cache
+.vscode
 
 *.dot
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index effbd8ce64d935a2821477a5598d923b3e47a8a9..706c5901f8c403fe5e629a46b0477d9d93173325 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -18,6 +18,7 @@ Python 2.7 POCL:
   - large-node
   except:
   - tags
+  - distributed-fmm
   artifacts:
     reports:
       junit: test/pytest.xml
@@ -36,10 +37,27 @@ Python 3 POCL:
   - large-node
   except:
   - tags
+  - distributed-fmm
   artifacts:
     reports:
       junit: test/pytest.xml
 
+Python 3 POCL MPI:
+  script:
+  - export PY_EXE=python3
+  - export PYOPENCL_TEST=portable:pthread
+  - export PYTEST_ADDOPTS="-m mpi --capture=no"
+  - export EXTRA_INSTALL="Cython pybind11 numpy scipy mako mpi4py"
+  - curl -L -O -k https://gitlab.tiker.net/inducer/ci-support/raw/master/build-and-test-py-project.sh
+  - ". ./build-and-test-py-project.sh"
+  tags:
+  - python3
+  - pocl
+  - large-node
+  except:
+  - tags
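+  # NB: no "- distributed-fmm" under "except:" here -- unlike the other
+  # jobs, this one is meant to run on the distributed-fmm branch.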
+
+
 Python 3 Intel:
   script:
   - export PY_EXE=python3
@@ -55,6 +73,7 @@ Python 3 Intel:
   - large-node
   except:
   - tags
+  - distributed-fmm
   artifacts:
     reports:
       junit: test/pytest.xml
@@ -73,6 +92,10 @@ Python 3 POCL Examples:
   - large-node
   except:
   - tags
+  - distributed-fmm
+  artifacts:
+    reports:
+      junit: test/pytest.xml
 
 Python 3 Conda:
   script:
@@ -86,7 +109,7 @@ Python 3 Conda:
   - large-node
   except:
   - tags
-
+  - distributed-fmm
   artifacts:
     reports:
       junit: test/pytest.xml
@@ -108,6 +131,7 @@ Python 3 Conda Apple:
   - apple
   except:
   - tags
+  - distributed-fmm
 
   retry: 2
   artifacts:
@@ -130,7 +154,7 @@ Pylint:
     # Pylint won't find the Cython bits without this
     - PROJECT_INSTALL_FLAGS="--editable"
     - export PY_EXE=python3
-    - EXTRA_INSTALL="Cython pybind11 numpy mako matplotlib"
+    - EXTRA_INSTALL="Cython pybind11 numpy mako matplotlib mpi4py"
     - curl -L -O -k https://gitlab.tiker.net/inducer/ci-support/raw/master/prepare-and-run-pylint.sh
     - ". ./prepare-and-run-pylint.sh pytential test/test_*.py"
   tags:
diff --git a/.test-conda-env-py3-macos.yml b/.test-conda-env-py3-macos.yml
index 901576dcf85d5fae32db5acb200702e2ad73c352..2b96b8e634f7a526d4615c848b30829856a69509 100644
--- a/.test-conda-env-py3-macos.yml
+++ b/.test-conda-env-py3-macos.yml
@@ -23,7 +23,7 @@ dependencies:
 - pip
 - pip:
     - git+https://github.com/inducer/pytools
-    - git+https://gitlab.tiker.net/inducer/boxtree
+    - git+https://gitlab.tiker.net/inducer/boxtree@distributed-fmm-global
     - git+https://github.com/inducer/pymbolic
     - git+https://github.com/inducer/loopy
     - git+https://gitlab.tiker.net/inducer/sumpy
diff --git a/.test-conda-env-py3.yml b/.test-conda-env-py3.yml
index 748855b09faf2accacd8b0970597d4732740a2a4..49f7d468781e0aa26df08eee7e28fe9db9029600 100644
--- a/.test-conda-env-py3.yml
+++ b/.test-conda-env-py3.yml
@@ -19,7 +19,7 @@ dependencies:
 - pip
 - pip:
     - git+https://github.com/inducer/pytools
-    - git+https://gitlab.tiker.net/inducer/boxtree
+    - git+https://gitlab.tiker.net/inducer/boxtree@distributed-fmm-global
     - git+https://github.com/inducer/pymbolic
     - git+https://github.com/inducer/loopy
     - git+https://gitlab.tiker.net/inducer/sumpy
diff --git a/examples/distributed/test_layer_pot_identity.py b/examples/distributed/test_layer_pot_identity.py
new file mode 100644
index 0000000000000000000000000000000000000000..e67464d64e2483f4a4ec6bc51ab06facb51d9b30
--- /dev/null
+++ b/examples/distributed/test_layer_pot_identity.py
@@ -0,0 +1,176 @@
+import pyopencl as cl
+from pytential import bind, sym, norm
+from pytential.symbolic.execution import bind_distributed
+import numpy as np
+from sympy.core.cache import clear_cache
+from pytools.convergence import EOCRecorder
+from mpi4py import MPI
+from pytential.qbx.distributed import DistributedQBXLayerPotentialSource
+from sumpy.kernel import LaplaceKernel
+
+comm = MPI.COMM_WORLD
+current_rank = comm.Get_rank()
+total_rank = comm.Get_size()
+
+# prevent cache 'splosion
+clear_cache()
+
+# Setup PyOpenCL
+ctx = cl.create_some_context()
+queue = cl.CommandQueue(ctx)
+
+
+class GreenExpr(object):
+    zero_op_name = "green"
+
+    def get_zero_op(self, kernel, **knl_kwargs):
+
+        u_sym = sym.var("u")
+        dn_u_sym = sym.var("dn_u")
+
+        return (
+            sym.S(kernel, dn_u_sym, qbx_forced_limit=-1, **knl_kwargs)
+            - sym.D(kernel, u_sym, qbx_forced_limit="avg", **knl_kwargs)
+            - 0.5*u_sym)
+
+    order_drop = 0
+
+
+def get_sphere_mesh(refinement_increment, target_order):
+    from meshmode.mesh.generation import generate_icosphere
+    mesh = generate_icosphere(1, target_order)
+    from meshmode.mesh.refinement import Refiner
+
+    refiner = Refiner(mesh)
+    for i in range(refinement_increment):
+        flags = np.ones(mesh.nelements, dtype=bool)
+        refiner.refine(flags)
+        mesh = refiner.get_current_mesh()
+
+    return mesh
+
+
+class SphereGeometry(object):
+    mesh_name = "sphere"
+    dim = 3
+
+    resolutions = [0, 1]
+
+    def get_mesh(self, resolution, tgt_order):
+        return get_sphere_mesh(resolution, tgt_order)
+
+
+target_order = 8
+k = 0
+qbx_order = 3
+fmm_order = 10
+resolutions = [0, 1]
+_expansion_stick_out_factor = 0.5
+visualize = False
+
+eoc_rec = EOCRecorder()
+
+for resolution in resolutions:
+    if current_rank == 0:
+        expr = GreenExpr()
+        geometry = SphereGeometry()
+        mesh = geometry.get_mesh(resolution, target_order)
+        if mesh is None:
+            break
+
+        d = mesh.ambient_dim
+
+        lap_k_sym = LaplaceKernel(d)
+        k_sym = lap_k_sym
+        knl_kwargs = {}
+
+        from meshmode.discretization import Discretization
+        from meshmode.discretization.poly_element import \
+            InterpolatoryQuadratureSimplexGroupFactory
+
+        pre_density_discr = Discretization(
+            ctx, mesh,
+            InterpolatoryQuadratureSimplexGroupFactory(target_order))
+
+        refiner_extra_kwargs = {}
+
+        qbx, _ = DistributedQBXLayerPotentialSource(
+            pre_density_discr,
+            fine_order=4 * target_order,
+            qbx_order=qbx_order,
+            comm=comm,
+            fmm_order=fmm_order,
+            knl_specific_calibration_params="constant_one",
+            _expansions_in_tree_have_extent=True,
+            _expansion_stick_out_factor=_expansion_stick_out_factor
+        ).with_refinement(**refiner_extra_kwargs)
+
+        density_discr = qbx.density_discr
+
+        # {{{ compute values of a solution to the PDE
+
+        nodes_host = density_discr.nodes().get(queue)
+        normal = bind(density_discr, sym.normal(d))(queue).as_vector(np.object)
+        normal_host = [normal[j].get() for j in range(d)]
+
+        center = np.array([3, 1, 2])[:d]
+        diff = nodes_host - center[:, np.newaxis]
+        dist_squared = np.sum(diff ** 2, axis=0)
+        dist = np.sqrt(dist_squared)
+        if d == 2:
+            u = np.log(dist)
+            grad_u = diff / dist_squared
+        elif d == 3:
+            u = 1 / dist
+            grad_u = -diff / dist ** 3
+        else:
+            assert False
+
+        dn_u = 0
+        for i in range(d):
+            dn_u = dn_u + normal_host[i] * grad_u[i]
+
+        # }}}
+
+        u_dev = cl.array.to_device(queue, u)
+        dn_u_dev = cl.array.to_device(queue, dn_u)
+        grad_u_dev = cl.array.to_device(queue, grad_u)
+
+        key = (qbx_order, geometry.mesh_name, resolution,
+               expr.zero_op_name)
+
+        op = expr.get_zero_op(k_sym, **knl_kwargs)
+        qbx_ctx = {"u": u_dev, "dn_u": dn_u_dev, "grad_u": grad_u_dev, "k": k}
+    else:
+        qbx = None
+        op = None
+        qbx_ctx = {}
+
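+    # Collective call: every rank, including the workers that only hold
+    # None/{} placeholders, must execute bind_distributed and the bound
+    # operator so the MPI communication inside the distributed FMM matches up.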
+    bound_op = bind_distributed(comm, qbx, op)
+    error = bound_op(queue, **qbx_ctx)
+
+    if current_rank == 0:
+        linf_error_norm = norm(density_discr, queue, error, p=np.inf)
+        print("--->", key, linf_error_norm)
+
+        h_max = bind(qbx, sym.h_max(qbx.ambient_dim))(queue)
+        eoc_rec.add_data_point(h_max, linf_error_norm)
+
+        if visualize:
+            from meshmode.discretization.visualization import make_visualizer
+
+            bdry_vis = make_visualizer(queue, density_discr, target_order)
+
+            bdry_normals = bind(density_discr, sym.normal(mesh.ambient_dim))(queue) \
+                .as_vector(dtype=object)
+
+            bdry_vis.write_vtk_file("source-%s.vtu" % resolution, [
+                ("u", u_dev),
+                ("bdry_normals", bdry_normals),
+                ("error", error),
+            ])
+
+if current_rank == 0:
+    print(eoc_rec)
+    tgt_order = qbx_order - expr.order_drop
+    assert eoc_rec.order_estimate() > tgt_order - 1.6
diff --git a/examples/distributed/test_off_surface_eval.py b/examples/distributed/test_off_surface_eval.py
new file mode 100644
index 0000000000000000000000000000000000000000..3e0679d6ea265bc4ad59c8d5ffa8f3406553d383
--- /dev/null
+++ b/examples/distributed/test_off_surface_eval.py
@@ -0,0 +1,91 @@
+import pyopencl as cl
+from meshmode.mesh.generation import (
+    make_curve_mesh, ellipse)
+import functools
+from sympy.core.cache import clear_cache
+import numpy as np
+from pytential.qbx.distributed import DistributedQBXLayerPotentialSource
+from pytential.symbolic.execution import bind_distributed
+from meshmode.discretization import Discretization
+from meshmode.discretization.poly_element import (
+    InterpolatoryQuadratureSimplexGroupFactory)
+from sumpy.kernel import LaplaceKernel
+import pytential
+from sumpy.visualization import FieldPlotter
+from pytential.target import PointsTarget
+import matplotlib.pyplot as pt
+from mpi4py import MPI
+import logging
+import os
+
+# Set up logging infrastructure
+logging.basicConfig(level=os.environ.get("LOGLEVEL", "WARNING"))
+logging.getLogger("boxtree.distributed").setLevel(logging.INFO)
+logging.getLogger("pytential.qbx.distributed").setLevel(logging.INFO)
+
+# Get MPI information
+comm = MPI.COMM_WORLD
+current_rank = comm.Get_rank()
+total_rank = comm.Get_size()
+
+# Disable sympy cache
+clear_cache()
+
+# Setup PyOpenCL
+ctx = cl.create_some_context()
+queue = cl.CommandQueue(ctx)
+
+# Parameters
+nelements = 30
+target_order = 8
+qbx_order = 3
+fmm_order = qbx_order
+
+if current_rank == 0:  # master rank
+    mesh = make_curve_mesh(functools.partial(ellipse, 3),
+                           np.linspace(0, 1, nelements + 1),
+                           target_order)
+
+    pre_density_discr = Discretization(
+        ctx, mesh, InterpolatoryQuadratureSimplexGroupFactory(target_order))
+
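+    # Only the master rank builds the layer potential source; the
+    # communicator passed here is what the distributed FMM uses to ship
+    # geometry and work to the other ranks. "constant_one" selects unit
+    # calibration parameters for the cost model that drives load balancing.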
+    qbx = DistributedQBXLayerPotentialSource(
+        pre_density_discr,
+        fine_order=4 * target_order,
+        comm=comm,
+        qbx_order=qbx_order,
+        fmm_order=fmm_order,
+        knl_specific_calibration_params="constant_one"
+    )
+
+    op = pytential.sym.D(
+        LaplaceKernel(2), pytential.sym.var("sigma"), qbx_forced_limit=-2)
+
+    qbx, _ = qbx.with_refinement()
+    density_discr = qbx.density_discr
+    sigma = density_discr.zeros(queue) + 1
+    qbx_ctx = {"sigma": sigma}
+
+    fplot = FieldPlotter(np.zeros(2), extent=0.54, npoints=30)
+    targets = PointsTarget(fplot.points)
+else:
+    qbx = None
+    targets = None
+    op = None
+    qbx_ctx = {}
+
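+# Collective evaluation: binding to (qbx, targets) evaluates the layer
+# potential at the off-surface plot targets; worker ranks pass the None
+# placeholders set above and contribute compute through MPI.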
+""" + + +import numpy as np +import numpy.linalg as la +import pyopencl as cl +import pyopencl.clmath # noqa +import pytest +from pytools import Record +from pyopencl.tools import ( # noqa + pytest_generate_tests_for_pyopencl as pytest_generate_tests) + +from functools import partial +from meshmode.mesh.generation import ( # noqa + ellipse, cloverleaf, starfish, drop, n_gon, qbx_peanut, WobblyCircle, + make_curve_mesh) +from meshmode.discretization.visualization import make_visualizer +from sumpy.symbolic import USE_SYMENGINE +from pytential import bind, sym +from pytential.qbx import QBXTargetAssociationFailedException + +from mpi4py import MPI +from pytential.qbx.distributed import DistributedQBXLayerPotentialSource +from pytential.symbolic.execution import bind_distributed + +import logging +logger = logging.getLogger(__name__) + +circle = partial(ellipse, 1) + +try: + import matplotlib.pyplot as pt +except ImportError: + pass + + +def make_circular_point_group(ambient_dim, npoints, radius, + center=np.array([0., 0.]), func=lambda x: x): + t = func(np.linspace(0, 1, npoints, endpoint=False)) * (2 * np.pi) + center = np.asarray(center) + result = np.zeros((ambient_dim, npoints)) + result[:2, :] = center[:, np.newaxis] + radius*np.vstack((np.cos(t), np.sin(t))) + return result + + +# {{{ test cases + +class IntEqTestCase: + + @property + def default_helmholtz_k(self): + raise NotImplementedError + + @property + def name(self): + raise NotImplementedError + + @property + def qbx_order(self): + raise NotImplementedError + + @property + def target_order(self): + raise NotImplementedError + + def __init__(self, helmholtz_k, bc_type, prob_side): + """ + :arg prob_side: may be -1, +1, or ``'scat'`` for a scattering problem + """ + + if helmholtz_k is None: + helmholtz_k = self.default_helmholtz_k + + self.helmholtz_k = helmholtz_k + self.bc_type = bc_type + self.prob_side = prob_side + + @property + def k(self): + return self.helmholtz_k + + def __str__(self): + return ("name: %s, bc_type: %s, prob_side: %s, " + "helmholtz_k: %s, qbx_order: %d, target_order: %d" + % (self.name, self.bc_type, self.prob_side, self.helmholtz_k, + self.qbx_order, self.target_order)) + + fmm_backend = "sumpy" + gmres_tol = 1e-14 + + +class CurveIntEqTestCase(IntEqTestCase): + resolutions = [40, 50, 60] + + def curve_func(self, *args, **kwargs): + raise NotImplementedError + + def get_mesh(self, resolution, target_order): + return make_curve_mesh( + self.curve_func, + np.linspace(0, 1, resolution+1), + target_order) + + use_refinement = True + + inner_radius = 0.1 + outer_radius = 2 + + qbx_order = 5 + target_order = qbx_order + fmm_backend = None + + check_tangential_deriv = True + check_gradient = False + + +class EllipseIntEqTestCase(CurveIntEqTestCase): + name = "3-to-1 ellipse" + + def curve_func(self, x): + return ellipse(3, x) + + +class Helmholtz3DIntEqTestCase(IntEqTestCase): + fmm_backend = "fmmlib" + use_refinement = False + + @property + def target_order(self): + return self.qbx_order + + check_tangential_deriv = False + + gmres_tol = 1e-7 + + +class EllipsoidIntEqTestCase(Helmholtz3DIntEqTestCase): + resolutions = [2, 0.8] + name = "ellipsoid" + + def get_mesh(self, resolution, target_order): + from meshmode.mesh.io import generate_gmsh, FileSource + mesh = generate_gmsh( + FileSource("../ellipsoid.step"), 2, order=2, + other_options=[ + "-string", + "Mesh.CharacteristicLengthMax = %g;" % resolution]) + + from meshmode.mesh.processing import perform_flips + # Flip elements--gmsh generates inside-out 
geometry. + return perform_flips(mesh, np.ones(mesh.nelements)) + + qbx_order = 5 + fmm_order = 13 + + inner_radius = 0.4 + outer_radius = 5 + + check_gradient = True + + +class SphereIntEqTestCase(IntEqTestCase): + resolutions = [1, 2] + name = "sphere" + + def get_mesh(self, resolution, target_order): + from meshmode.mesh.generation import generate_icosphere + from meshmode.mesh.refinement import refine_uniformly + mesh = refine_uniformly( + generate_icosphere(1, target_order), + resolution) + + return mesh + + fmm_backend = "fmmlib" + use_refinement = False + + fmm_tol = 1e-4 + + inner_radius = 0.4 + outer_radius = 5 + + qbx_order = 5 + target_order = 8 + check_gradient = False + check_tangential_deriv = False + + gmres_tol = 1e-7 + + +class MergedCubesIntEqTestCase(Helmholtz3DIntEqTestCase): + resolutions = [1.4] + name = "merged-cubes" + + def get_mesh(self, resolution, target_order): + from meshmode.mesh.io import generate_gmsh, FileSource + mesh = generate_gmsh( + FileSource("../merged-cubes.step"), 2, order=2, + other_options=[ + "-string", + "Mesh.CharacteristicLengthMax = %g;" % resolution]) + + from meshmode.mesh.processing import perform_flips + # Flip elements--gmsh generates inside-out geometry. + mesh = perform_flips(mesh, np.ones(mesh.nelements)) + + return mesh + + use_refinement = True + + inner_radius = 0.4 + outer_radius = 12 + + +class ManyEllipsoidIntEqTestCase(Helmholtz3DIntEqTestCase): + resolutions = [2, 1] + name = "ellipsoid" + + nx = 2 + ny = 2 + nz = 2 + + def get_mesh(self, resolution, target_order): + from meshmode.mesh.io import generate_gmsh, FileSource + base_mesh = generate_gmsh( + FileSource("../ellipsoid.step"), 2, order=2, + other_options=[ + "-string", + "Mesh.CharacteristicLengthMax = %g;" % resolution]) + + from meshmode.mesh.processing import perform_flips + # Flip elements--gmsh generates inside-out geometry. + base_mesh = perform_flips(base_mesh, np.ones(base_mesh.nelements)) + + from meshmode.mesh.processing import affine_map, merge_disjoint_meshes + from meshmode.mesh.tools import rand_rotation_matrix + pitch = 10 + meshes = [ + affine_map( + base_mesh, + A=rand_rotation_matrix(3), + b=pitch*np.array([ + (ix-self.nx//2), + (iy-self.ny//2), + (iz-self.ny//2)])) + for ix in range(self.nx) + for iy in range(self.ny) + for iz in range(self.nz) + ] + + mesh = merge_disjoint_meshes(meshes, single_group=True) + return mesh + + inner_radius = 0.4 + # This should sit in the area just outside the middle ellipsoid + outer_radius = 5 + + +class ElliptiplaneIntEqTestCase(IntEqTestCase): + name = "elliptiplane" + + resolutions = [0.1] + + fmm_backend = "fmmlib" + use_refinement = True + + qbx_order = 3 + fmm_tol = 1e-4 + target_order = qbx_order + check_gradient = False + check_tangential_deriv = False + + # We're only expecting three digits based on FMM settings. Who are we + # kidding? 
+ gmres_tol = 1e-5 + + # to match the scheme given in the GIGAQBX3D paper + box_extent_norm = "l2" + from_sep_smaller_crit = "static_l2" + + def get_mesh(self, resolution, target_order): + from pytools import download_from_web_if_not_present + + download_from_web_if_not_present( + "https://raw.githubusercontent.com/inducer/geometries/master/" + "surface-3d/elliptiplane.brep") + + from meshmode.mesh.io import generate_gmsh, FileSource + mesh = generate_gmsh( + FileSource("elliptiplane.brep"), 2, order=2, + other_options=[ + "-string", + "Mesh.CharacteristicLengthMax = %g;" % resolution]) + + # now centered at origin and extends to -1,1 + + # Flip elements--gmsh generates inside-out geometry. + from meshmode.mesh.processing import perform_flips + return perform_flips(mesh, np.ones(mesh.nelements)) + + inner_radius = 0.2 + outer_radius = 12 # was '-13' in some large-scale run (?) + + +class BetterplaneIntEqTestCase(IntEqTestCase): + name = "betterplane" + + default_helmholtz_k = 20 + resolutions = [0.2] + # refine_on_helmholtz_k = False + + fmm_backend = "fmmlib" + use_refinement = True + + qbx_order = 3 + fmm_tol = 1e-4 + target_order = 6 + check_gradient = False + check_tangential_deriv = False + + visualize_geometry = True + + #scaled_max_curvature_threshold = 1 + expansion_disturbance_tolerance = 0.3 + + # We're only expecting three digits based on FMM settings. Who are we + # kidding? + gmres_tol = 1e-5 + + vis_grid_spacing = (0.025, 0.2, 0.025) + vis_extend_factor = 0.2 + + def get_mesh(self, resolution, target_order): + from pytools import download_from_web_if_not_present + + download_from_web_if_not_present( + "https://raw.githubusercontent.com/inducer/geometries/a869fc3/" + "surface-3d/betterplane.brep") + + from meshmode.mesh.io import generate_gmsh, ScriptWithFilesSource + mesh = generate_gmsh( + ScriptWithFilesSource(""" + Merge "betterplane.brep"; + + Mesh.CharacteristicLengthMax = %(lcmax)f; + Mesh.ElementOrder = 2; + Mesh.CharacteristicLengthExtendFromBoundary = 0; + + // 2D mesh optimization + // Mesh.Lloyd = 1; + + l_superfine() = Unique(Abs(Boundary{ Surface{ + 27, 25, 17, 13, 18 }; })); + l_fine() = Unique(Abs(Boundary{ Surface{ 2, 6, 7}; })); + l_coarse() = Unique(Abs(Boundary{ Surface{ 14, 16 }; })); + + // p() = Unique(Abs(Boundary{ Line{l_fine()}; })); + // Characteristic Length{p()} = 0.05; + + Field[1] = Attractor; + Field[1].NNodesByEdge = 100; + Field[1].EdgesList = {l_superfine()}; + + Field[2] = Threshold; + Field[2].IField = 1; + Field[2].LcMin = 0.075; + Field[2].LcMax = %(lcmax)f; + Field[2].DistMin = 0.1; + Field[2].DistMax = 0.4; + + Field[3] = Attractor; + Field[3].NNodesByEdge = 100; + Field[3].EdgesList = {l_fine()}; + + Field[4] = Threshold; + Field[4].IField = 3; + Field[4].LcMin = 0.1; + Field[4].LcMax = %(lcmax)f; + Field[4].DistMin = 0.15; + Field[4].DistMax = 0.4; + + Field[5] = Attractor; + Field[5].NNodesByEdge = 100; + Field[5].EdgesList = {l_coarse()}; + + Field[6] = Threshold; + Field[6].IField = 5; + Field[6].LcMin = 0.15; + Field[6].LcMax = %(lcmax)f; + Field[6].DistMin = 0.2; + Field[6].DistMax = 0.4; + + Field[7] = Min; + Field[7].FieldsList = {2, 4, 6}; + + Background Field = 7; + """ % { + "lcmax": resolution, + }, ["betterplane.brep"]), 2) + + # Flip elements--gmsh generates inside-out geometry. 
+ from meshmode.mesh.processing import perform_flips + return perform_flips(mesh, np.ones(mesh.nelements)) + + inner_radius = 0.2 + outer_radius = 15 + +# }}} + + +# {{{ test backend + +def run_int_eq_test(cl_ctx, queue, case, resolution, visualize): + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + + if rank == 0: + mesh = case.get_mesh(resolution, case.target_order) + print("%d elements" % mesh.nelements) + + from meshmode.discretization import Discretization + from meshmode.discretization.poly_element import \ + InterpolatoryQuadratureSimplexGroupFactory + pre_density_discr = Discretization( + cl_ctx, mesh, + InterpolatoryQuadratureSimplexGroupFactory(case.target_order)) + + source_order = 4*case.target_order + + refiner_extra_kwargs = {} + + qbx_lpot_kwargs = {} + if case.fmm_backend is None: + qbx_lpot_kwargs["fmm_order"] = False + else: + if hasattr(case, "fmm_tol"): + from sumpy.expansion.level_to_order import SimpleExpansionOrderFinder + qbx_lpot_kwargs["fmm_level_to_order"] = SimpleExpansionOrderFinder( + case.fmm_tol) + + elif hasattr(case, "fmm_order"): + qbx_lpot_kwargs["fmm_order"] = case.fmm_order + else: + qbx_lpot_kwargs["fmm_order"] = case.qbx_order + 5 + + qbx = DistributedQBXLayerPotentialSource( + pre_density_discr, + comm=comm, + fine_order=source_order, + qbx_order=case.qbx_order, + _box_extent_norm=getattr(case, "box_extent_norm", None), + _from_sep_smaller_crit=getattr(case, "from_sep_smaller_crit", None), + knl_specific_calibration_params="constant_one", + **qbx_lpot_kwargs + ) + + if case.use_refinement: + if case.k != 0 and getattr(case, "refine_on_helmholtz_k", True): + refiner_extra_kwargs["kernel_length_scale"] = 5/case.k + + if hasattr(case, "scaled_max_curvature_threshold"): + refiner_extra_kwargs["_scaled_max_curvature_threshold"] = \ + case.scaled_max_curvature_threshold + + if hasattr(case, "expansion_disturbance_tolerance"): + refiner_extra_kwargs["_expansion_disturbance_tolerance"] = \ + case.expansion_disturbance_tolerance + + if hasattr(case, "refinement_maxiter"): + refiner_extra_kwargs["maxiter"] = case.refinement_maxiter + + #refiner_extra_kwargs["visualize"] = True + + print("%d elements before refinement" % pre_density_discr.mesh.nelements) + qbx, _ = qbx.with_refinement(**refiner_extra_kwargs) + print("%d stage-1 elements after refinement" + % qbx.density_discr.mesh.nelements) + print("%d stage-2 elements after refinement" + % qbx.stage2_density_discr.mesh.nelements) + print("quad stage-2 elements have %d nodes" + % qbx.quad_stage2_density_discr.groups[0].nunit_nodes) + + density_discr = qbx.density_discr + + if hasattr(case, "visualize_geometry") and case.visualize_geometry: + bdry_normals = bind( + density_discr, sym.normal(mesh.ambient_dim) + )(queue).as_vector(dtype=object) + + bdry_vis = make_visualizer(queue, density_discr, case.target_order) + bdry_vis.write_vtk_file("geometry.vtu", [ + ("normals", bdry_normals) + ]) + + # {{{ plot geometry + + if 0: + if mesh.ambient_dim == 2: + # show geometry, centers, normals + nodes_h = density_discr.nodes().get(queue=queue) + pt.plot(nodes_h[0], nodes_h[1], "x-") + normal = bind( + density_discr, sym.normal(2))(queue).as_vector(np.object) + pt.quiver(nodes_h[0], nodes_h[1], + normal[0].get(queue), normal[1].get(queue)) + pt.gca().set_aspect("equal") + pt.show() + + elif mesh.ambient_dim == 3: + bdry_vis = make_visualizer(queue, density_discr, case.target_order+3) + + bdry_normals = bind(density_discr, sym.normal(3))(queue)\ + .as_vector(dtype=object) + + 
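+        # The distributed source is built on the root rank only; worker
+        # ranks receive their portion of the tree and geometry during the
+        # distributed evaluation below.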
bdry_vis.write_vtk_file("pre-solve-source-%s.vtu" % resolution, [ + ("bdry_normals", bdry_normals), + ]) + + else: + raise ValueError("invalid mesh dim") + + # }}} + + # {{{ set up operator + + from pytential.symbolic.pde.scalar import ( + DirichletOperator, + NeumannOperator) + + from sumpy.kernel import LaplaceKernel, HelmholtzKernel + if case.k: + knl = HelmholtzKernel(mesh.ambient_dim) + knl_kwargs = {"k": sym.var("k")} + concrete_knl_kwargs = {"k": case.k} + else: + knl = LaplaceKernel(mesh.ambient_dim) + knl_kwargs = {} + concrete_knl_kwargs = {} + + if knl.is_complex_valued: + dtype = np.complex128 + else: + dtype = np.float64 + + loc_sign = +1 if case.prob_side in [+1, "scat"] else -1 + + if case.bc_type == "dirichlet": + op = DirichletOperator(knl, loc_sign, use_l2_weighting=True, + kernel_arguments=knl_kwargs) + elif case.bc_type == "neumann": + op = NeumannOperator(knl, loc_sign, use_l2_weighting=True, + use_improved_operator=False, kernel_arguments=knl_kwargs) + else: + assert False + + op_u = op.operator(sym.var("u")) + + # }}} + + # {{{ set up test data + + if case.prob_side == -1: + test_src_geo_radius = case.outer_radius + test_tgt_geo_radius = case.inner_radius + elif case.prob_side == +1: + test_src_geo_radius = case.inner_radius + test_tgt_geo_radius = case.outer_radius + elif case.prob_side == "scat": + test_src_geo_radius = case.outer_radius + test_tgt_geo_radius = case.outer_radius + else: + raise ValueError("unknown problem_side") + + point_sources = make_circular_point_group( + mesh.ambient_dim, 10, test_src_geo_radius, + func=lambda x: x**1.5) + test_targets = make_circular_point_group( + mesh.ambient_dim, 20, test_tgt_geo_radius) + + np.random.seed(22) + source_charges = np.random.randn(point_sources.shape[1]) + source_charges[-1] = -np.sum(source_charges[:-1]) + source_charges = source_charges.astype(dtype) + assert np.sum(source_charges) < 1e-15 + + source_charges_dev = cl.array.to_device(queue, source_charges) + + # }}} + + # {{{ establish BCs + + from pytential.source import PointPotentialSource + from pytential.target import PointsTarget + + point_source = PointPotentialSource(cl_ctx, point_sources) + + pot_src = sym.IntG( + # FIXME: qbx_forced_limit--really? 
+ knl, sym.var("charges"), qbx_forced_limit=None, **knl_kwargs) + + test_direct = bind((point_source, PointsTarget(test_targets)), pot_src)( + queue, charges=source_charges_dev, **concrete_knl_kwargs) + + if case.bc_type == "dirichlet": + bc = bind((point_source, density_discr), pot_src)( + queue, charges=source_charges_dev, **concrete_knl_kwargs) + + elif case.bc_type == "neumann": + bc = bind( + (point_source, density_discr), + sym.normal_derivative( + qbx.ambient_dim, pot_src, dofdesc=sym.DEFAULT_TARGET) + )(queue, charges=source_charges_dev, **concrete_knl_kwargs) + + rhs = bind(density_discr, op.prepare_rhs(sym.var("bc")))(queue, bc=bc) + + # }}} + else: + qbx = None + op_u = None + dtype = None + rhs = None + concrete_knl_kwargs = {} + + # {{{ solve + + bound_op = bind_distributed(comm, qbx, op_u) + + try: + from pytential.solve import distributed_gmres + gmres_result = distributed_gmres( + comm, + bound_op.scipy_op(queue, "u", dtype, **concrete_knl_kwargs), + rhs, + tol=case.gmres_tol, + progress=True, + hard_failure=True, + stall_iterations=50, no_progress_factor=1.05) + except QBXTargetAssociationFailedException as e: + bdry_vis = make_visualizer(queue, density_discr, case.target_order+3) + + bdry_vis.write_vtk_file("failed-targets-%s.vtu" % resolution, [ + ("failed_targets", e.failed_target_flags), + ]) + raise + + if rank == 0: + print("gmres state:", gmres_result.state) + weighted_u = gmres_result.solution + + # }}} + + # {{{ error check + + if case.prob_side != "scat": + + if rank == 0: + points_target = PointsTarget(test_targets) + tgt_op = op.representation(sym.var("u")) + qbx_ctx = { + "u": weighted_u, + "k": case.k + } + else: + points_target = None + tgt_op = None + qbx_ctx = {} + + bound_tgt_op = bind_distributed(comm, (qbx, points_target), tgt_op) + + test_via_bdry = bound_tgt_op(queue, **qbx_ctx) + + if rank == 0: + err = test_via_bdry - test_direct + + err = err.get() + test_direct = test_direct.get() + test_via_bdry = test_via_bdry.get() + + # {{{ remove effect of net source charge + + if case.k == 0 and case.bc_type == "neumann" and loc_sign == -1: + + # remove constant offset in interior Laplace Neumann error + tgt_ones = np.ones_like(test_direct) + tgt_ones = tgt_ones/la.norm(tgt_ones) + err = err - np.vdot(tgt_ones, err)*tgt_ones + + # }}} + + rel_err_2 = la.norm(err)/la.norm(test_direct) + rel_err_inf = la.norm(err, np.inf)/la.norm(test_direct, np.inf) + + print("rel_err_2: %g rel_err_inf: %g" % (rel_err_2, rel_err_inf)) + else: + rel_err_2 = None + rel_err_inf = None + + # }}} + + # {{{ test gradient + + if case.check_gradient and case.prob_side != "scat": + + if rank == 0: + tgt_op = op.representation( + sym.var("u"), + map_potentials=lambda pot: sym.grad(mesh.ambient_dim, pot), + qbx_forced_limit=None + ) + qbx_ctx = concrete_knl_kwargs.copy() + qbx_ctx["u"] = weighted_u + else: + tgt_op = None + qbx_ctx = {} + + bound_grad_op = bind_distributed(comm, (qbx, points_target), tgt_op) + + grad_from_src = bound_grad_op(queue, **qbx_ctx) + + if rank == 0: + grad_ref = (bind( + (point_source, points_target), + sym.grad(mesh.ambient_dim, pot_src) + )(queue, charges=source_charges_dev, **concrete_knl_kwargs) + ) + + grad_err = (grad_from_src - grad_ref) + + rel_grad_err_inf = ( + la.norm(grad_err[0].get(), np.inf) + / la.norm(grad_ref[0].get(), np.inf)) + + print("rel_grad_err_inf: %g" % rel_grad_err_inf) + + # }}} + + # {{{ test tangential derivative + + if case.check_tangential_deriv and case.prob_side != "scat": + + if rank == 0: + deriv_op = op.representation( + 
sym.var("u"), + map_potentials=lambda pot: sym.tangential_derivative(2, pot), + qbx_forced_limit=loc_sign + ) + qbx_ctx = concrete_knl_kwargs.copy() + qbx_ctx["u"] = weighted_u + else: + deriv_op = None + qbx_ctx = None + + bound_t_deriv_op = bind_distributed(comm, qbx, deriv_op) + + tang_deriv_from_src = bound_t_deriv_op(queue, **qbx_ctx) + + if rank == 0: + tang_deriv_from_src = tang_deriv_from_src.as_scalar().get() + + tang_deriv_ref = (bind( + (point_source, density_discr), + sym.tangential_derivative(2, pot_src) + )(queue, charges=source_charges_dev, **concrete_knl_kwargs) + .as_scalar().get()) + + if 0: + pt.plot(tang_deriv_ref.real) + pt.plot(tang_deriv_from_src.real) + pt.show() + + td_err = (tang_deriv_from_src - tang_deriv_ref) + + rel_td_err_inf = la.norm(td_err, np.inf)/la.norm(tang_deriv_ref, np.inf) + + print("rel_td_err_inf: %g" % rel_td_err_inf) + else: + rel_td_err_inf = None + + # }}} + + if rank == 0: + + # {{{ any-D file plotting + + if visualize: + bdry_vis = make_visualizer(queue, density_discr, case.target_order+3) + + bdry_normals = bind(density_discr, sym.normal(qbx.ambient_dim))(queue)\ + .as_vector(dtype=object) + + sym_sqrt_j = sym.sqrt_jac_q_weight(density_discr.ambient_dim) + u = bind(density_discr, sym.var("u")/sym_sqrt_j)(queue, u=weighted_u) + + bdry_vis.write_vtk_file("source-%s.vtu" % resolution, [ + ("u", u), + ("bc", bc), + #("bdry_normals", bdry_normals), + ]) + + from sumpy.visualization import make_field_plotter_from_bbox # noqa + from meshmode.mesh.processing import find_bounding_box + + vis_grid_spacing = (0.1, 0.1, 0.1)[:qbx.ambient_dim] + if hasattr(case, "vis_grid_spacing"): + vis_grid_spacing = case.vis_grid_spacing + vis_extend_factor = 0.2 + if hasattr(case, "vis_extend_factor"): + vis_grid_spacing = case.vis_grid_spacing + + fplot = make_field_plotter_from_bbox( + find_bounding_box(mesh), + h=vis_grid_spacing, + extend_factor=vis_extend_factor) + + qbx_tgt_tol = qbx.copy(target_association_tolerance=0.15) + from pytential.target import PointsTarget + + try: + solved_pot = bind( + (qbx_tgt_tol, PointsTarget(fplot.points)), + op.representation(sym.var("u")) + )(queue, u=weighted_u, k=case.k) + except QBXTargetAssociationFailedException as e: + fplot.write_vtk_file( + "failed-targets.vts", + [ + ("failed_targets", e.failed_target_flags.get(queue)) + ]) + raise + + from sumpy.kernel import LaplaceKernel + ones_density = density_discr.zeros(queue) + ones_density.fill(1) + indicator = bind( + (qbx_tgt_tol, PointsTarget(fplot.points)), + -sym.D(LaplaceKernel(density_discr.ambient_dim), + sym.var("sigma"), + qbx_forced_limit=None))( + queue, sigma=ones_density).get() + + solved_pot = solved_pot.get() + + true_pot = bind((point_source, PointsTarget(fplot.points)), pot_src)( + queue, charges=source_charges_dev, **concrete_knl_kwargs).get() + + #fplot.show_scalar_in_mayavi(solved_pot.real, max_val=5) + if case.prob_side == "scat": + fplot.write_vtk_file( + "potential-%s.vts" % resolution, + [ + ("pot_scattered", solved_pot), + ("pot_incoming", -true_pot), + ("indicator", indicator), + ] + ) + else: + fplot.write_vtk_file( + "potential-%s.vts" % resolution, + [ + ("solved_pot", solved_pot), + ("true_pot", true_pot), + ("indicator", indicator), + ] + ) + + # }}} + + class Result(Record): + pass + + h_max = bind(qbx, sym.h_max(qbx.ambient_dim))(queue) + return Result( + h_max=h_max, + rel_err_2=rel_err_2, + rel_err_inf=rel_err_inf, + rel_td_err_inf=rel_td_err_inf, + gmres_result=gmres_result) + +# }}} + + +# {{{ test frontend + 
+@pytest.mark.parametrize("case", [ + EllipseIntEqTestCase(helmholtz_k=helmholtz_k, bc_type=bc_type, + prob_side=prob_side) + for helmholtz_k in [0, 1.2] + for bc_type in ["dirichlet", "neumann"] + for prob_side in [-1, +1] + ]) +# Sample test run: +# 'test_integral_equation(cl._csc, EllipseIntEqTestCase(0, "dirichlet", +1), visualize=True)' # noqa: E501 +def test_integral_equation(ctx_factory, case, visualize=False): + logging.basicConfig(level=logging.INFO) + + cl_ctx = ctx_factory() + queue = cl.CommandQueue(cl_ctx) + + if USE_SYMENGINE and case.fmm_backend is None: + pytest.skip("https://gitlab.tiker.net/inducer/sumpy/issues/25") + + # prevent cache 'splosion + from sympy.core.cache import clear_cache + clear_cache() + + # get MPI information + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + + if rank == 0: + from pytools.convergence import EOCRecorder + print("qbx_order: %d, %s" % (case.qbx_order, case)) + + eoc_rec_target = EOCRecorder() + eoc_rec_td = EOCRecorder() + + have_error_data = False + + for resolution in case.resolutions: + result = run_int_eq_test(cl_ctx, queue, case, resolution, + visualize=visualize) + + if rank == 0: + if result.rel_err_2 is not None: + have_error_data = True + eoc_rec_target.add_data_point(result.h_max, result.rel_err_2) + + if result.rel_td_err_inf is not None: + eoc_rec_td.add_data_point(result.h_max, result.rel_td_err_inf) + + if rank == 0: + if case.bc_type == "dirichlet": + tgt_order = case.qbx_order + elif case.bc_type == "neumann": + tgt_order = case.qbx_order-1 + else: + assert False + + if have_error_data: + print("TARGET ERROR:") + print(eoc_rec_target) + assert eoc_rec_target.order_estimate() > tgt_order - 1.3 + + if case.check_tangential_deriv: + print("TANGENTIAL DERIVATIVE ERROR:") + print(eoc_rec_td) + assert eoc_rec_td.order_estimate() > tgt_order - 2.3 + +# }}} + + +# You can test individual routines by typing +# $ python test_scalar_int_eq.py 'test_routine()' + +if __name__ == "__main__": + import sys + if len(sys.argv) > 1: + exec(sys.argv[1]) + else: + from pytest import main + main([__file__]) + +# vim: fdm=marker diff --git a/pytential/qbx/__init__.py b/pytential/qbx/__init__.py index 67115241dbbf75b1adbc1abc4280d1cb8e89f5cf..b507986082fb91bcc556bdcf8bfa677d0cb72241 100644 --- a/pytential/qbx/__init__.py +++ b/pytential/qbx/__init__.py @@ -29,6 +29,7 @@ import numpy as np from pytools import memoize_method from pytential.qbx.target_assoc import QBXTargetAssociationFailedException from pytential.source import LayerPotentialSourceBase +from pytential.qbx.cost import AbstractQBXCostModel, QBXCostModel import pyopencl as cl @@ -87,6 +88,7 @@ class QBXLayerPotentialSource(LayerPotentialSourceBase): _use_target_specific_qbx=None, geometry_data_inspector=None, cost_model=None, + knl_specific_calibration_params=None, fmm_backend="sumpy", target_stick_out_factor=_not_provided): """ @@ -216,6 +218,7 @@ class QBXLayerPotentialSource(LayerPotentialSourceBase): cost_model = QBXCostModel(queue) self.cost_model = cost_model + self.knl_specific_calibration_params = knl_specific_calibration_params # /!\ *All* parameters set here must also be set by copy() below, # otherwise they will be reset to their default values behind your @@ -239,8 +242,9 @@ class QBXLayerPotentialSource(LayerPotentialSourceBase): _tree_kind=None, _use_target_specific_qbx=_not_provided, geometry_data_inspector=None, - cost_model=_not_provided, fmm_backend=None, + cost_model=_not_provided, + knl_specific_calibration_params=_not_provided, debug=_not_provided, 
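+        # Same contract as drive_fmm below, but hands off to the distributed
+        # driver; timing data is collected into a dict when requested.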
+        def drive_dfmm(*args, **kwargs):
+            if return_timing_data:
+                timing_data = {}
+            else:
+                timing_data = None
+            kwargs.update({"timing_data": timing_data})
+
+            from pytential.qbx.distributed import drive_dfmm
+            return drive_dfmm(*args, **kwargs), timing_data
+
         def drive_fmm(wrangler, strengths, geo_data, kernel, kernel_arguments):
             del geo_data, kernel, kernel_arguments
             from pytential.qbx.fmm import drive_fmm
@@ -423,7 +441,11 @@ class QBXLayerPotentialSource(LayerPotentialSourceBase):
                 timing_data = None
             return drive_fmm(wrangler, strengths, timing_data), timing_data
 
-        extra_args["fmm_driver"] = drive_fmm
+        from pytential.qbx.distributed import DistributedQBXLayerPotentialSource
+        if isinstance(self, DistributedQBXLayerPotentialSource):
+            extra_args["fmm_driver"] = drive_dfmm
+        else:
+            extra_args["fmm_driver"] = drive_fmm
 
         return self._dispatch_compute_potential_insn(
                 queue, insn, bound_expr, evaluate, func, extra_args)
@@ -610,11 +632,60 @@ class QBXLayerPotentialSource(LayerPotentialSourceBase):
 
         # }}}
 
-        # Execute global QBX.
-        all_potentials_on_every_target, extra_outputs = (
+        # {{{ execute global QBX
+
+        from pytential.qbx.distributed import DistributedQBXLayerPotentialSource
+        if isinstance(self, DistributedQBXLayerPotentialSource):
+            # FIXME: If the expansion wrangler is not FMMLib, the argument
+            # 'uses_pde_expansions' might be different
+            if self.cost_model is None:
+                cost_model = QBXCostModel(queue)
+            else:
+                cost_model = self.cost_model
+
+            if self.knl_specific_calibration_params is None:
+                import warnings
+                warnings.warn(
+                    "Kernel-specific calibration parameters are not supplied "
+                    "when using distributed FMM."
+                )
+                # TODO: supply better default calibration parameters
+                calibration_params = \
+                    AbstractQBXCostModel.get_unit_calibration_params()
+            elif (isinstance(self.knl_specific_calibration_params, str)
+                    and self.knl_specific_calibration_params == "constant_one"):
+                calibration_params = \
+                    AbstractQBXCostModel.get_unit_calibration_params()
+            else:
+                knls = frozenset(knl for knl in insn.kernels)
+                calibration_params = self.knl_specific_calibration_params[knls]
+
+            kernel_args = {}
+            for arg_name, arg_expr in six.iteritems(insn.kernel_arguments):
+                kernel_args[arg_name] = evaluate(arg_expr)
+
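+            # Per-box cost estimates from the QBX cost model serve as the
+            # workload measure when partitioning boxes across ranks.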
+            boxes_time, _ = cost_model.qbx_cost_per_box(
+                geo_data, insn.base_kernel, kernel_args, calibration_params
+            )
+            boxes_time = boxes_time.get()
+
+            distributed_geo_data = self.distibuted_geo_data(  # noqa pylint:disable=no-member
+                geo_data, queue, wrangler, boxes_time
+            )
+
+            all_potentials_on_every_target, extra_outputs = fmm_driver(
+                queue, strengths, distributed_geo_data, wrangler,
+                comm=self.comm  # noqa pylint:disable=no-member
+            )
+
+        else:
+            # Execute global QBX.
+            all_potentials_on_every_target, extra_outputs = (
                 fmm_driver(
                     wrangler, strengths, geo_data, fmm_kernel,
                     kernel_extra_kwargs))
 
+        # }}}
+
         result = []
 
         for o in insn.outputs:
@@ -623,8 +694,10 @@ class QBXLayerPotentialSource(LayerPotentialSourceBase):
             target_slice = slice(*geo_data.target_info().target_discr_starts[
                 target_side_number:target_side_number+2])
 
-            result.append((o.name,
-                all_potentials_on_every_target[o.kernel_index][target_slice]))
+            result.append(
+                (o.name,
+                 all_potentials_on_every_target[o.kernel_index][target_slice])
+            )
 
         return result, extra_outputs
@@ -813,8 +886,6 @@ class QBXLayerPotentialSource(LayerPotentialSourceBase):
 
     # }}}
 
-    # }}}
-
 # }}}
diff --git a/pytential/qbx/distributed.py b/pytential/qbx/distributed.py
new file mode 100644
index 0000000000000000000000000000000000000000..2b430c52b9098127b9a9f356511fa341a09f08bf
--- /dev/null
+++ b/pytential/qbx/distributed.py
@@ -0,0 +1,1180 @@
+from pytential.qbx.fmmlib import QBXFMMLibExpansionWrangler
+from pytential.qbx import QBXLayerPotentialSource
+from pytential.qbx.utils import ToHostTransferredGeoDataWrapper
+from pytential.qbx.fmm import QBXExpansionWrangler
+from boxtree.tree import FilteredTargetListsInTreeOrder
+from boxtree.traversal import FMMTraversalBuilder
+from boxtree.distributed.partition import ResponsibleBoxesQuery
+from boxtree.distributed.calculation import DistributedExpansionWrangler
+from boxtree.distributed.calculation import DistributedFMMLibExpansionWrangler
+from boxtree.fmm import TimingRecorder
+from boxtree.tools import DeviceDataRecord
+from mpi4py import MPI
+import numpy as np
+import pyopencl as cl
+from collections import namedtuple
+import logging
+import time
+from pytools import memoize_method
+from pytools.obj_array import with_object_array_or_scalar
+
+logger = logging.getLogger(__name__)
+
+# {{{ MPITags used in this module
+
+# TODO: remove unused tags
+MPITags = {
+    "dipole_vec": 0,
+    "non_qbx_potentials": 1,
+    "qbx_potentials": 2
+}
+
+# }}}
+
+
+# {{{ Expansion Wrangler
+
+class DistribtuedQBXFMMLibExpansionWrangler(
+        QBXFMMLibExpansionWrangler, DistributedFMMLibExpansionWrangler):
+
+    @classmethod
+    def distribute_wrangler(
+            cls, queue, global_wrangler, distributed_geo_data,
+            comm=MPI.COMM_WORLD):
+        mpi_rank = comm.Get_rank()
+
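+        # The global wrangler only exists on the root rank. Strip the members
+        # that should not (or cannot) travel through pickling -- the command
+        # queue, geometry data and tree -- broadcast the rest, and reattach
+        # per-rank equivalents afterwards.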
+        if mpi_rank == 0:
+            import copy
+            distributed_wrangler = copy.copy(global_wrangler)
+            distributed_wrangler.queue = None
+            distributed_wrangler.geo_data = None
+            distributed_wrangler.rotation_data = None
+            distributed_wrangler.code = None
+            distributed_wrangler.tree = None
+            distributed_wrangler.__class__ = cls
+        else:  # worker process
+            distributed_wrangler = None
+
+        distributed_wrangler = comm.bcast(distributed_wrangler, root=0)
+        distributed_wrangler.tree = distributed_geo_data.local_tree
+        distributed_wrangler.geo_data = distributed_geo_data
+        distributed_wrangler.rotation_data = distributed_geo_data
+        distributed_wrangler.queue = queue
+
+        # {{{ Compute local dipole_vec from the global one
+
+        if distributed_wrangler.dipole_vec is not None:
+            src_idx = distributed_geo_data.src_idx
+            local_dipole_vec = distributed_wrangler.dipole_vec[:, src_idx]
+            distributed_wrangler.dipole_vec = local_dipole_vec
+
+        # }}}
+
+        return distributed_wrangler
+
+    def eval_qbx_output_zeros(self):
+        from pytools.obj_array import make_obj_array
+        ctt = self.geo_data.center_to_tree_targets()
+        output = make_obj_array([np.zeros(len(ctt.lists), self.dtype)
+                                 for k in self.outputs])
+        return output
+
+
+class DistribtuedQBXSumpyExpansionWrangler(
+        QBXExpansionWrangler, DistributedExpansionWrangler):
+    @classmethod
+    def distribute_wrangler(
+            cls, queue, global_wrangler, distributed_geo_data,
+            comm=MPI.COMM_WORLD):
+        mpi_rank = comm.Get_rank()
+
+        if mpi_rank == 0:
+            import copy
+            distributed_wrangler = copy.copy(global_wrangler)
+            distributed_wrangler.code._cl_context = None
+            distributed_wrangler.code.cl_context = None
+
+            distributed_wrangler.queue = None
+            distributed_wrangler.geo_data = None
+            distributed_wrangler.rotation_data = None
+            distributed_wrangler.tree = None
+
+            from pytential.qbx.utils import sumpy_wrangler_extra_kwargs_to_host
+            distributed_wrangler.source_extra_kwargs = \
+                sumpy_wrangler_extra_kwargs_to_host(
+                    distributed_wrangler.source_extra_kwargs, queue
+                )
+
+            distributed_wrangler.extra_kwargs = \
+                sumpy_wrangler_extra_kwargs_to_host(
+                    distributed_wrangler.extra_kwargs, queue
+                )
+
+            distributed_wrangler.__class__ = cls
+        else:  # worker process
+            distributed_wrangler = None
+
+        distributed_wrangler = comm.bcast(distributed_wrangler, root=0)
+        distributed_wrangler.tree = distributed_geo_data.local_tree
+        distributed_wrangler.geo_data = distributed_geo_data
+        # distributed_wrangler.rotation_data = distributed_geo_data
+        distributed_wrangler.queue = queue
+        distributed_wrangler.code.cl_context = queue.context
+
+        # {{{ compute local dsource_vec
+
+        if "dsource_vec" in distributed_wrangler.source_extra_kwargs:
+            dsource_vec = distributed_wrangler.source_extra_kwargs["dsource_vec"]
+            for idim in range(len(dsource_vec)):
+                dsource_vec[idim] = \
+                    dsource_vec[idim][distributed_geo_data.src_idx]
+
+        if "dsource_vec" in distributed_wrangler.extra_kwargs:
+            dsource_vec = distributed_wrangler.extra_kwargs["dsource_vec"]
+            for idim in range(len(dsource_vec)):
+                dsource_vec[idim] = \
+                    dsource_vec[idim][distributed_geo_data.src_idx]
+
+        # }}}
+
+        from pytential.qbx.utils import sumpy_wrangler_extra_kwargs_to_device
+        distributed_wrangler.source_extra_kwargs = \
+            sumpy_wrangler_extra_kwargs_to_device(
+                distributed_wrangler.source_extra_kwargs, queue
+            )
+
+        distributed_wrangler.extra_kwargs = \
+            sumpy_wrangler_extra_kwargs_to_device(
+                distributed_wrangler.extra_kwargs, queue
+            )
+
+        return distributed_wrangler
+
+    def distribute_source_weights(
+            self, source_weights, src_idx_all_ranks, comm=MPI.COMM_WORLD):
+        """This method transfers needed source_weights from the root rank to
+        each worker rank in communicator *comm*.
+
+        This method needs to be called collectively by all ranks in *comm*.
+
+        :arg source_weights: Source weights in tree order on root, None on worker
+            processes.
+        :arg src_idx_all_ranks: Returned from *generate_local_tree*. None on
+            worker processes.
+        :arg comm: MPI communicator.
+        :returns: source weights needed by the current rank.
+        """
+        mpi_rank = comm.Get_rank()
+
+        if mpi_rank == 0:
+            source_weights = source_weights.get(self.queue)
+
+        local_source_weights = super(
+            DistribtuedQBXSumpyExpansionWrangler, self
+        ).distribute_source_weights(
+            source_weights, src_idx_all_ranks, comm=comm
+        )
+
+        return cl.array.to_device(self.queue, local_source_weights)
+
+    def eval_qbx_output_zeros(self):
+        from pytools.obj_array import make_obj_array
+        ctt = self.geo_data.center_to_tree_targets()
+        return make_obj_array([
+            cl.array.zeros(
+                self.queue,
+                len(ctt.lists),
+                dtype=self.dtype)
+            for k in self.code.out_kernels])
+
+# }}}
+
+
+# {{{ Traversal builder
+
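+# Thin wrapper around FMMTraversalBuilder that pins the traversal parameters,
+# so that worker ranks build traversal objects consistent with the root
+# rank's global traversal (see the NOTE in distribute_geo_data below).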
+ """ + + mpi_rank = comm.Get_rank() + + global_traversal = None + global_tree = None + centers = None + expansion_radii = None + global_qbx_centers = None + qbx_center_to_target_box = None + non_qbx_box_target_lists = None + center_to_tree_targets = None + + if mpi_rank == 0: + global_traversal = global_geometry_data.traversal() + global_tree = global_traversal.tree + + centers = global_geometry_data.centers() + expansion_radii = global_geometry_data.expansion_radii() + global_qbx_centers = global_geometry_data.global_qbx_centers() + qbx_center_to_target_box = global_geometry_data.qbx_center_to_target_box() + non_qbx_box_target_lists = global_geometry_data.non_qbx_box_target_lists() + center_to_tree_targets = global_geometry_data.center_to_tree_targets() + + global_tree = comm.bcast(global_tree, root=0) + global_tree_dev = global_tree.to_device(queue).with_queue(queue) + + centers = comm.bcast(centers, root=0) + expansion_radii = comm.bcast(expansion_radii, root=0) + global_qbx_centers = comm.bcast(global_qbx_centers, root=0) + qbx_center_to_target_box = comm.bcast(qbx_center_to_target_box, root=0) + non_qbx_box_target_lists = comm.bcast(non_qbx_box_target_lists, root=0) + center_to_tree_targets = comm.bcast(center_to_tree_targets, root=0) + + if mpi_rank != 0: + global_traversal, _ = traversal_builder(queue, global_tree_dev) + + if global_tree_dev.targets_have_extent: + global_traversal = global_traversal.merge_close_lists(queue) + + global_traversal = global_traversal.get(queue) + + return DistributedGlobalQBXFMMGeometryData( + global_traversal=global_traversal, + centers=centers, + expansion_radii=expansion_radii, + global_qbx_centers=global_qbx_centers, + qbx_center_to_target_box=qbx_center_to_target_box, + non_qbx_box_target_lists=non_qbx_box_target_lists, + center_to_tree_targets=center_to_tree_targets + ) + + +class LocalQBXFMMGeometryData(DeviceDataRecord): + def non_qbx_box_target_lists(self): + return self._non_qbx_box_target_lists + + def traversal(self): + return self.local_trav + + def tree(self): + return self.traversal().tree + + def centers(self): + return self._local_centers + + @property + def ncenters(self): + return self._local_centers.shape[1] + + def global_qbx_centers(self): + return self._global_qbx_centers + + def expansion_radii(self): + return self._expansion_radii + + def qbx_center_to_target_box(self): + return self._local_qbx_center_to_target_box + + def center_to_tree_targets(self): + return self._local_center_to_tree_targets + + def qbx_targets(self): + return self._qbx_targets + + def qbx_center_to_target_box_source_level(self, source_level): + return self._qbx_center_to_target_box_source_level[source_level] + + @memoize_method + def build_rotation_classes_lists(self): + with cl.CommandQueue(self.cl_context) as queue: + trav = self.traversal().to_device(queue) + tree = self.tree().to_device(queue) + + from boxtree.rotation_classes import RotationClassesBuilder + return RotationClassesBuilder(self.cl_context)( + queue, trav, tree)[0].get(queue) + + def eval_qbx_targets(self): + return self.qbx_targets() + + @memoize_method + def m2l_rotation_lists(self): + return self.build_rotation_classes_lists().from_sep_siblings_rotation_classes + + @memoize_method + def m2l_rotation_angles(self): + return (self + .build_rotation_classes_lists() + .from_sep_siblings_rotation_class_to_angle) + + +def compute_local_geometry_data( + queue, comm, global_geometry_data, boxes_time, traversal_builder): + mpi_rank = comm.Get_rank() + + global_traversal = 
+    from boxtree.distributed.local_tree import get_fetch_local_particles_knls
+    knls = get_fetch_local_particles_knls(queue.context, global_tree)
+
+    box_target_starts = cl.array.to_device(
+        queue, non_qbx_box_target_lists.box_target_starts
+    )
+    box_target_counts_nonchild = cl.array.to_device(
+        queue, non_qbx_box_target_lists.box_target_counts_nonchild
+    )
+    nfiltered_targets = non_qbx_box_target_lists.nfiltered_targets
+    targets = non_qbx_box_target_lists.targets
+
+    particle_mask = cl.array.zeros(
+        queue, (nfiltered_targets,), dtype=global_tree.particle_id_dtype
+    )
+
+    responsible_boxes_mask = np.zeros(global_tree.nboxes, dtype=np.int8)
+    responsible_boxes_mask[responsible_boxes_list[mpi_rank]] = 1
+    responsible_boxes_mask = cl.array.to_device(queue, responsible_boxes_mask)
+
+    knls.particle_mask_knl(
+        responsible_boxes_mask,
+        box_target_starts,
+        box_target_counts_nonchild,
+        particle_mask
+    )
+
+    particle_scan = cl.array.empty(
+        queue, (nfiltered_targets + 1,),
+        dtype=global_tree.particle_id_dtype
+    )
+    particle_scan[0] = 0
+    knls.mask_scan_knl(particle_mask, particle_scan)
+
+    local_box_target_starts = cl.array.empty(
+        queue, (global_tree.nboxes,), dtype=global_tree.particle_id_dtype
+    )
+    knls.generate_box_particle_starts(
+        box_target_starts, particle_scan,
+        local_box_target_starts
+    )
+
+    local_box_target_counts_nonchild = cl.array.zeros(
+        queue, (global_tree.nboxes,), dtype=global_tree.particle_id_dtype
+    )
+    knls.generate_box_particle_counts_nonchild(
+        responsible_boxes_mask,
+        box_target_counts_nonchild,
+        local_box_target_counts_nonchild
+    )
+
+    local_nfiltered_targets = int(particle_scan[-1].get(queue))
+
+    particle_mask = particle_mask.get().astype(bool)
+    particle_mask_all_ranks = comm.gather(particle_mask, root=0)
+    local_targets = np.empty((global_tree.dimensions,), dtype=object)
+    for idimension in range(global_tree.dimensions):
+        local_targets[idimension] = targets[idimension][particle_mask]
+
+    local_non_qbx_box_target_lists = {
+        "nfiltered_targets": local_nfiltered_targets,
+        "box_target_starts": local_box_target_starts.get(),
+        "box_target_counts_nonchild":
+            local_box_target_counts_nonchild.get(),
+        "targets": local_targets
+    }
+
+    non_qbx_box_target_lists = FilteredTargetListsInTreeOrder(
+        nfiltered_targets=local_non_qbx_box_target_lists["nfiltered_targets"],
+        box_target_starts=local_non_qbx_box_target_lists["box_target_starts"],
+        box_target_counts_nonchild=local_non_qbx_box_target_lists[
+            "box_target_counts_nonchild"],
+        targets=local_non_qbx_box_target_lists["targets"],
+        unfiltered_from_filtered_target_indices=None
+    )
+
+    # }}}
+
+    tgt_mask = np.zeros((global_tree.ntargets,), dtype=bool)
+    tgt_mask[tgt_idx] = True
+
+    tgt_mask_user_order = tgt_mask[global_tree.sorted_target_ids]
+    centers_mask = tgt_mask_user_order[:ncenters]
+    centers_scan = np.empty(
+        (ncenters + 1,), dtype=global_tree.particle_id_dtype)
+    centers_scan[1:] = np.cumsum(
+        centers_mask.astype(global_tree.particle_id_dtype))
+    centers_scan[0] = 0
+
+    # {{{ local centers
+
+    nlocal_centers = np.sum(centers_mask.astype(np.int32))
+    centers_dims = centers.shape[0]
+    local_centers = np.empty(
+        (centers_dims, nlocal_centers), dtype=centers[0].dtype
+    )
+    for idims in range(centers_dims):
+        local_centers[idims, :] = centers[idims][centers_mask]
+
+    # }}}
+
+    # {{{ local global_qbx_centers
+
+    local_global_qbx_centers = centers_scan[
+        global_qbx_centers[centers_mask[global_qbx_centers]]
+    ]
+
+    # }}}
+
+    # {{{ local expansion_radii
+
+    local_expansion_radii = expansion_radii[centers_mask]
+
+    # }}}
+
+    # {{{ local qbx_center_to_target_box
+
+    # Transform local qbx_center_to_target_box to global indexing
+    local_qbx_center_to_target_box = global_traversal.target_boxes[
+        qbx_center_to_target_box[centers_mask]
+    ]
+
+    # Transform local_qbx_center_to_target_box to local target_boxes indexing
+    global_boxes_to_target_boxes = np.ones(
+        (local_tree.nboxes,), dtype=local_tree.particle_id_dtype
+    )
+    # make sure accessing invalid position raises an error
+    global_boxes_to_target_boxes *= -1
+    global_boxes_to_target_boxes[local_trav.target_boxes] = \
+        np.arange(local_trav.target_boxes.shape[0])
+    local_qbx_center_to_target_box = \
+        global_boxes_to_target_boxes[local_qbx_center_to_target_box]
+
+    # }}}
+
+    # {{{ local_qbx_targets and local center_to_tree_targets
+
+    starts = center_to_tree_targets.starts
+    lists = center_to_tree_targets.lists
+    local_starts = np.empty((nlocal_centers + 1,), dtype=starts.dtype)
+    local_lists = np.empty(lists.shape, dtype=lists.dtype)
+
+    qbx_target_mask = np.zeros((global_tree.ntargets,), dtype=bool)
+    current_start = 0  # index into local_lists
+    ilocal_center = 0
+    local_starts[0] = 0
+
nlocal_qbx_target = qbx_target_scan[-1]
+
+    local_qbx_targets = np.empty(
+        (global_tree.dimensions, nlocal_qbx_target),
+        dtype=global_tree.targets[0].dtype
+    )
+    for idim in range(global_tree.dimensions):
+        local_qbx_targets[idim, :] = global_tree.targets[idim][qbx_target_mask]
+
+    local_lists = qbx_target_scan[local_lists]
+    local_center_to_tree_targets = {
+        "starts": local_starts,
+        "lists": local_lists
+    }
+
+    from pytential.qbx.geometry import CenterToTargetList
+    local_center_to_tree_targets = CenterToTargetList(
+        starts=local_center_to_tree_targets["starts"],
+        lists=local_center_to_tree_targets["lists"]
+    )
+
+    # }}}
+
+    # {{{ Construct qbx_center_to_target_box_source_level
+
+    # This is modified from pytential.qbx.geometry.QBXFMMGeometryData
+    # .qbx_center_to_target_box_source_level, but runs on the host using
+    # numpy instead of PyOpenCL.
+
+    tree = local_trav.tree
+
+    qbx_center_to_target_box_source_level = np.empty(
+        (tree.nlevels,), dtype=object)
+
+    for source_level in range(tree.nlevels):
+        sep_smaller = local_trav.from_sep_smaller_by_level[source_level]
+
+        target_box_to_target_box_source_level = np.empty(
+            len(local_trav.target_boxes),
+            dtype=tree.box_id_dtype
+        )
+        target_box_to_target_box_source_level.fill(-1)
+        target_box_to_target_box_source_level[sep_smaller.nonempty_indices] = (
+            np.arange(sep_smaller.num_nonempty_lists,
+                      dtype=tree.box_id_dtype)
+        )
+
+        qbx_center_to_target_box_source_level[source_level] = (
+            target_box_to_target_box_source_level[
+                local_qbx_center_to_target_box
+            ]
+        )
+
+    # }}}
+
+    return LocalQBXFMMGeometryData(
+        cl_context=queue.context,
+        local_tree=local_tree,
+        local_trav=local_trav,
+        _local_centers=local_centers,
+        _global_qbx_centers=local_global_qbx_centers,
+        src_idx=src_idx,
+        tgt_idx=tgt_idx,
+        src_idx_all_ranks=src_idx_all_ranks,
+        tgt_idx_all_ranks=tgt_idx_all_ranks,
+        particle_mask=particle_mask_all_ranks,
+        qbx_target_mask=qbx_target_mask_all_ranks,
+        _non_qbx_box_target_lists=non_qbx_box_target_lists,
+        _local_qbx_center_to_target_box=local_qbx_center_to_target_box,
+        _expansion_radii=local_expansion_radii,
+        _qbx_targets=local_qbx_targets,
+        _local_center_to_tree_targets=local_center_to_tree_targets,
+        _qbx_center_to_target_box_source_level=(
+            qbx_center_to_target_box_source_level)
+    )
+
+
+def distribute_geo_data(geo_data, queue, boxes_time, comm=MPI.COMM_WORLD):
+    mpi_rank = comm.Get_rank()
+
+    # {{{ traversal builder
+
+    trav_param = None
+    if mpi_rank == 0:
+        trav_param = {
+            "well_sep_is_n_away":
+                geo_data.geo_data.code_getter.build_traversal.well_sep_is_n_away,
+            "from_sep_smaller_crit":
+                geo_data.geo_data.code_getter.build_traversal.
+                from_sep_smaller_crit,
+            "_from_sep_smaller_min_nsources_cumul":
+                geo_data.geo_data.lpot_source.
+                _from_sep_smaller_min_nsources_cumul
+        }
+    trav_param = comm.bcast(trav_param, root=0)
+
+    # NOTE: The distributed implementation relies on each worker rank building
+    # the same traversal object as the global traversal object on the root
+    # rank. Consequently, the traversal builder here must use the same
+    # parameters as `QBXFMMGeometryData.traversal`.
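+    # For illustration only (the example values below are hypothetical; the
+    # actual entries come from the root rank's geometry data, as gathered
+    # above), the broadcast parameter dict has the shape
+    #
+    #     trav_param = {
+    #         "well_sep_is_n_away": 2,
+    #         "from_sep_smaller_crit": "precise_linf",
+    #         "_from_sep_smaller_min_nsources_cumul": 0,
+    #     }
+    #
+    # so that every rank constructs its traversal builder with identical
+    # settings.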
+ + traversal_builder = QBXFMMGeometryDataTraversalBuilder( + queue.context, + well_sep_is_n_away=trav_param["well_sep_is_n_away"], + from_sep_smaller_crit=trav_param["from_sep_smaller_crit"], + _from_sep_smaller_min_nsources_cumul=trav_param[ + "_from_sep_smaller_min_nsources_cumul" + ] + ) + + # }}} + + # {{{ Broadcast necessary parts of geometry data to worker ranks + + global_geometry_data = broadcast_global_geometry_data( + comm, queue, traversal_builder, geo_data + ) + + if mpi_rank != 0: + nboxes = global_geometry_data.global_traversal.tree.nboxes + boxes_time = np.empty(nboxes, dtype=np.float64) + + comm.Bcast(boxes_time, root=0) + + # }}} + + local_geo_data = compute_local_geometry_data( + queue, comm, global_geometry_data, boxes_time, traversal_builder + ) + + return local_geo_data + + +class DistributedQBXLayerPotentialSource(QBXLayerPotentialSource): + + def __init__(self, *args, **kwargs): + # process communicator argument + comm = kwargs.pop("comm", MPI.COMM_WORLD) + self.comm = comm + current_rank = comm.Get_rank() + + # set fmmlib as default fmm backend + if "fmm_backend" not in kwargs: + kwargs["fmm_backend"] = "fmmlib" + + # "_from_sep_smaller_min_nsources_cumul" will be forced to 0 for distributed + # implementation. If not, the potential contribution of a list 3 box might be + # computed particle-to-particle instead of using its multipole expansion. + # However, the particle information may not be distributed to the target + # rank. + if "_from_sep_smaller_min_nsources_cumul" not in kwargs: + kwargs["_from_sep_smaller_min_nsources_cumul"] = 0 + elif kwargs["_from_sep_smaller_min_nsources_cumul"] != 0: + raise ValueError( + "_from_sep_smaller_min_nsources_cumul has to be 0 for distributed " + "implementation" + ) + + # report geometric parameters + report_parameters = kwargs.pop("report_parameters", False) + self.report_parameters = report_parameters + + self.distributed_geo_data_cache = {} + + if current_rank == 0: + self.next_geo_data_id = 0 + self.arg_to_id = {} + + super(DistributedQBXLayerPotentialSource, self).__init__(*args, **kwargs) + + def copy(self, *args, **kwargs): + comm = kwargs.pop("comm", self.comm) + report_parameters = kwargs.pop("report_parameters", self.report_parameters) + + obj = super(DistributedQBXLayerPotentialSource, self).copy(*args, **kwargs) + + # obj.__class__ = DistributedQBXLayerPotentialSource + obj.comm = comm + obj.report_parameters = report_parameters + + obj.distributed_geo_data_cache = self.distributed_geo_data_cache + if comm.Get_rank() == 0: + obj.next_geo_data_id = self.next_geo_data_id + obj.arg_to_id = self.arg_to_id + + return obj + + def distibuted_geo_data(self, geo_data, queue, wrangler, boxes_time): + """ Note: This method needs to be called collectively by all processes of + self.comm + """ + current_rank = self.comm.Get_rank() + + ''' + TODO: Cache is not working at the moment because workers' layer + potential source is recomputed after each iteration + + if current_rank == 0: + + target_discrs_and_qbx_sides = geo_data.target_discrs_and_qbx_sides + + if target_discrs_and_qbx_sides in self.arg_to_id: + geo_data_id = self.arg_to_id[target_discrs_and_qbx_sides] + else: + geo_data_id = self.next_geo_data_id + self.arg_to_id[target_discrs_and_qbx_sides] = geo_data_id + self.next_geo_data_id += 1 + else: + geo_data_id = None + + geo_data_id = self.comm.bcast(geo_data_id, root=0) + + if geo_data_id == -1: + return None + + if geo_data_id in self.distributed_geo_data_cache: + return self.distributed_geo_data_cache[geo_data_id] 
+ + ''' + + # no cached result found, construct a new distributed_geo_data + if current_rank == 0: + host_geo_data = ToHostTransferredGeoDataWrapper(queue, geo_data) + + if self.report_parameters: + from pytools import Table + table = Table() + table.add_row(["name", "value"]) + + table.add_row(["nsources", host_geo_data.tree().nsources]) + table.add_row(["ntargets", host_geo_data.tree().ntargets]) + table.add_row(["ncenters", host_geo_data.ncenters]) + table.add_row([ + "non-qbx targets", + host_geo_data.non_qbx_box_target_lists().nfiltered_targets + ]) + table.add_row([ + "qbx targets", + host_geo_data.tree().ntargets - host_geo_data.ncenters + ]) + + print(table) + + distributed_geo_data = distribute_geo_data( + host_geo_data, queue, boxes_time, comm=self.comm + ) + + else: + distributed_geo_data = distribute_geo_data( + None, queue, None, self.comm + ) + + # self.distributed_geo_data_cache[geo_data_id] = distributed_geo_data + + return distributed_geo_data + + +# {{{ FMM Driver + +def add_dicts(dict1, dict2): + rtv = {} + + for key in set(dict1) | set(dict2): + if key not in dict1: + rtv[key] = dict2[key] + elif key not in dict2: + rtv[key] = dict1[key] + else: + rtv[key] = dict1[key] + dict2[key] + + return rtv + + +def drive_dfmm(queue, src_weights, distributed_geo_data, global_wrangler, + comm=MPI.COMM_WORLD, + timing_data=None, + _communicate_mpoles_via_allreduce=False): + + current_rank = comm.Get_rank() + total_rank = comm.Get_size() + + if current_rank == 0: + start_time = time.time() + + cls = None + if current_rank == 0: + fmm_backend = global_wrangler.geo_data.lpot_source.fmm_backend + if fmm_backend == "sumpy": + cls = DistribtuedQBXSumpyExpansionWrangler + elif fmm_backend == "fmmlib": + cls = DistribtuedQBXFMMLibExpansionWrangler + else: + raise RuntimeError("Unknown fmm backend") + cls = comm.bcast(cls, root=0) + + wrangler = cls.distribute_wrangler( + queue, global_wrangler, distributed_geo_data, comm=comm + ) + + local_traversal_host = distributed_geo_data.local_trav + + if isinstance(wrangler, DistribtuedQBXSumpyExpansionWrangler): + local_traversal = \ + local_traversal_host.copy().to_device(queue).with_queue(queue) + else: + local_traversal = local_traversal_host + + # {{{ Distribute source weights + + if current_rank == 0: + src_weights = global_wrangler.reorder_sources(src_weights) + + local_source_weights = wrangler.distribute_source_weights( + src_weights, distributed_geo_data.src_idx_all_ranks, comm=comm + ) + + # }}} + + recorder = TimingRecorder() + + # {{{ construct local multipoles + + mpole_exps, timing_future = wrangler.form_multipoles( + local_traversal.level_start_source_box_nrs, + local_traversal.source_boxes, + local_source_weights) + + recorder.add("form_multipoles", timing_future) + + # }}} + + # {{{ propagate multipoles upward + + mpole_exps, timing_future = wrangler.coarsen_multipoles( + local_traversal.level_start_source_parent_box_nrs, + local_traversal.source_parent_boxes, + mpole_exps) + + recorder.add("coarsen_multipoles", timing_future) + + # }}} + + # {{{ Communicate mpoles + + from boxtree.distributed.calculation import communicate_mpoles + + comm_start_time = time.time() + + if isinstance(wrangler, DistribtuedQBXSumpyExpansionWrangler): + mpole_exps_host = mpole_exps.get(queue=queue) + else: + mpole_exps_host = mpole_exps + + if _communicate_mpoles_via_allreduce: + mpole_exps_all = np.zeros_like(mpole_exps_host) + comm.Allreduce(mpole_exps_host, mpole_exps_all) + mpole_exps_host = mpole_exps_all + else: + communicate_mpoles(wrangler, comm, 
local_traversal_host, mpole_exps_host) + + if isinstance(wrangler, DistribtuedQBXSumpyExpansionWrangler): + mpole_exps[:] = cl.array.to_device(queue, mpole_exps_host) + else: + mpole_exps = mpole_exps_host + + from boxtree.tools import DummyTimingFuture + timing_future = DummyTimingFuture(wall_elapsed=(time.time() - comm_start_time)) + + recorder.add("multipole communication", timing_future) + + # }}} + + # {{{ direct evaluation from neighbor source boxes ("list 1") + + non_qbx_potentials, timing_future = wrangler.eval_direct( + local_traversal.target_boxes, + local_traversal.neighbor_source_boxes_starts, + local_traversal.neighbor_source_boxes_lists, + local_source_weights) + + recorder.add("eval_direct", timing_future) + + # }}} + + # {{{ translate separated siblings' ("list 2") mpoles to local + + local_exps, timing_future = wrangler.multipole_to_local( + local_traversal.level_start_target_or_target_parent_box_nrs, + local_traversal.target_or_target_parent_boxes, + local_traversal.from_sep_siblings_starts, + local_traversal.from_sep_siblings_lists, + mpole_exps) + + recorder.add("multipole_to_local", timing_future) + + # }}} + + # {{{ evaluate sep. smaller mpoles ("list 3") at particles + + # (the point of aiming this stage at particles is specifically to keep its + # contribution *out* of the downward-propagating local expansions) + + mpole_result, timing_future = wrangler.eval_multipoles( + local_traversal.target_boxes_sep_smaller_by_source_level, + local_traversal.from_sep_smaller_by_level, + mpole_exps) + + recorder.add("eval_multipoles", timing_future) + + non_qbx_potentials = non_qbx_potentials + mpole_result + + # assert that list 3 close has been merged into list 1 + assert local_traversal.from_sep_close_smaller_starts is None + + # }}} + + # {{{ form locals for separated bigger source boxes ("list 4") + + local_result, timing_future = wrangler.form_locals( + local_traversal.level_start_target_or_target_parent_box_nrs, + local_traversal.target_or_target_parent_boxes, + local_traversal.from_sep_bigger_starts, + local_traversal.from_sep_bigger_lists, + local_source_weights) + + recorder.add("form_locals", timing_future) + + local_exps = local_exps + local_result + + # assert that list 4 close has been merged into list 1 + assert local_traversal.from_sep_close_bigger_starts is None + + # }}} + + # {{{ propagate local_exps downward + + local_exps, timing_future = wrangler.refine_locals( + local_traversal.level_start_target_or_target_parent_box_nrs, + local_traversal.target_or_target_parent_boxes, + local_exps) + + recorder.add("refine_locals", timing_future) + + # }}} + + # {{{ evaluate locals + + local_result, timing_future = wrangler.eval_locals( + local_traversal.level_start_target_box_nrs, + local_traversal.target_boxes, + local_exps) + + recorder.add("eval_locals", timing_future) + + non_qbx_potentials = non_qbx_potentials + local_result + + # }}} + + # {{{ wrangle qbx expansions + + # form_global_qbx_locals and eval_target_specific_qbx_locals are responsible + # for the same interactions (directly evaluated portion of the potentials + # via unified List 1). Which one is used depends on the wrangler. If one of + # them is unused the corresponding output entries will be zero. 
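+    # Illustrative sketch of the contract stated above (hypothetical wrangler
+    # `w`): with TSQBX disabled, form_global_qbx_locals carries the direct
+    # contribution and eval_target_specific_qbx_locals returns zeros; with
+    # TSQBX enabled it is the other way around. Either way, both results can
+    # simply be accumulated:
+    #
+    #     qbx_exps, _ = w.form_global_qbx_locals(weights)
+    #     pot, _ = w.eval_qbx_expansions(qbx_exps)
+    #     ts_pot, _ = w.eval_target_specific_qbx_locals(weights)
+    #     pot = pot + ts_pot  # one of the two contributions is all zeros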
+ + qbx_expansions, timing_future = \ + wrangler.form_global_qbx_locals(local_source_weights) + + recorder.add("form_global_qbx_locals", timing_future) + + local_result, timing_future = \ + wrangler.translate_box_multipoles_to_qbx_local(mpole_exps) + + recorder.add("translate_box_multipoles_to_qbx_local", timing_future) + + qbx_expansions = qbx_expansions + local_result + + local_result, timing_future = \ + wrangler.translate_box_local_to_qbx_local(local_exps) + + recorder.add("translate_box_local_to_qbx_local", timing_future) + + qbx_expansions = qbx_expansions + local_result + + qbx_potentials, timing_future = wrangler.eval_qbx_expansions(qbx_expansions) + + recorder.add("eval_qbx_expansions", timing_future) + + ts_result, timing_future = \ + wrangler.eval_target_specific_qbx_locals(local_source_weights) + + recorder.add("eval_target_specific_qbx_locals", timing_future) + + qbx_potentials = qbx_potentials + ts_result + + # }}} + + if isinstance(wrangler, DistribtuedQBXSumpyExpansionWrangler): + qbx_potentials = \ + with_object_array_or_scalar(lambda x: x.get(queue), qbx_potentials) + non_qbx_potentials = \ + with_object_array_or_scalar(lambda x: x.get(queue), non_qbx_potentials) + + if current_rank != 0: # worker process + comm.send(non_qbx_potentials, dest=0, tag=MPITags["non_qbx_potentials"]) + comm.send(qbx_potentials, dest=0, tag=MPITags["qbx_potentials"]) + result = None + + else: # master process + + merge_start_time = time.time() + + ndims = len(non_qbx_potentials) + all_potentials_in_tree_order = global_wrangler.full_output_zeros() + nqbtl = global_wrangler.geo_data.non_qbx_box_target_lists() + + from pytools.obj_array import make_obj_array + non_qbx_potentials_all_rank = make_obj_array([ + np.zeros(nqbtl.nfiltered_targets, global_wrangler.dtype) + for _ in range(ndims)] + ) + + for irank in range(total_rank): + + if irank == 0: + non_qbx_potentials_cur_rank = non_qbx_potentials + else: + non_qbx_potentials_cur_rank = comm.recv( + source=irank, tag=MPITags["non_qbx_potentials"]) + + for idim in range(ndims): + non_qbx_potentials_all_rank[idim][ + distributed_geo_data.particle_mask[irank] + ] = non_qbx_potentials_cur_rank[idim] + + if isinstance(wrangler, DistribtuedQBXSumpyExpansionWrangler): + non_qbx_potentials_all_rank = with_object_array_or_scalar( + lambda x: cl.array.to_device(queue, x), non_qbx_potentials_all_rank + ) + + for ap_i, nqp_i in zip( + all_potentials_in_tree_order, non_qbx_potentials_all_rank): + ap_i[nqbtl.unfiltered_from_filtered_target_indices] = nqp_i + + for irank in range(total_rank): + + if irank == 0: + qbx_potentials_cur_rank = qbx_potentials + else: + qbx_potentials_cur_rank = comm.recv( + source=irank, tag=MPITags["qbx_potentials"] + ) + + qbx_target_mask = distributed_geo_data.qbx_target_mask[irank] + + if isinstance(wrangler, DistribtuedQBXSumpyExpansionWrangler): + qbx_potentials_cur_rank = with_object_array_or_scalar( + lambda x: cl.array.to_device(queue, x), + qbx_potentials_cur_rank + ) + + qbx_target_idx = np.arange( + len(qbx_target_mask), + dtype=local_traversal.tree.particle_id_dtype + ) + qbx_target_idx = qbx_target_idx[qbx_target_mask] + qbx_target_idx = cl.array.to_device(queue, qbx_target_idx) + + for idim in range(ndims): + all_potentials_in_tree_order[idim][qbx_target_idx] += ( + qbx_potentials_cur_rank[idim] + ) + else: + for idim in range(ndims): + all_potentials_in_tree_order[idim][qbx_target_mask] += ( + qbx_potentials_cur_rank[idim] + ) + + def reorder_and_finalize_potentials(x): + # "finalize" gives host FMMs (like FMMlib) a 
chance to turn the + # potential back into a CL array. + return global_wrangler.finalize_potentials( + x[global_wrangler.tree.sorted_target_ids]) + + result = with_object_array_or_scalar( + reorder_and_finalize_potentials, all_potentials_in_tree_order) + + timing_future = DummyTimingFuture( + wall_elapsed=(time.time() - merge_start_time) + ) + recorder.add("merge potentials", timing_future) + + if current_rank == 0: + logger.info("Distributed FMM evaluation finished in {} secs.".format( + time.time() - start_time)) + + if timing_data is not None: + timing_data.update(add_dicts(timing_data, recorder.summarize())) + + return result + +# }}} diff --git a/pytential/qbx/fmm.py b/pytential/qbx/fmm.py index 8b9ea4292868fc7c811f46d8fe8b966555b5784e..5700c452ebc9cba426e3fabd1cafcffe4e9cf71d 100644 --- a/pytential/qbx/fmm.py +++ b/pytential/qbx/fmm.py @@ -341,9 +341,12 @@ QBXFMMGeometryData.non_qbx_box_target_lists`), return (qbx_expansions, SumpyTimingFuture(self.queue, events)) + def eval_qbx_output_zeros(self): + return self.full_output_zeros() + @log_process(logger) def eval_qbx_expansions(self, qbx_expansions): - pot = self.full_output_zeros() + pot = self.eval_qbx_output_zeros() geo_data = self.geo_data events = [] @@ -364,7 +367,7 @@ QBXFMMGeometryData.non_qbx_box_target_lists`), center_to_targets_starts=ctt.starts, center_to_targets_lists=ctt.lists, - targets=self.tree.targets, + targets=geo_data.eval_qbx_targets(), qbx_expansions=qbx_expansions, result=pot, @@ -378,7 +381,7 @@ QBXFMMGeometryData.non_qbx_box_target_lists`), @log_process(logger) def eval_target_specific_qbx_locals(self, src_weights): - return (self.full_output_zeros(), SumpyTimingFuture(self.queue, events=())) + return self.eval_qbx_output_zeros(), SumpyTimingFuture(self.queue, events=()) # }}} @@ -387,8 +390,21 @@ QBXFMMGeometryData.non_qbx_box_target_lists`), # {{{ FMM top-level -def drive_fmm(expansion_wrangler, src_weights, timing_data=None, - traversal=None): +def add_dicts(dict1, dict2): + rtv = {} + + for key in set(dict1) | set(dict2): + if key not in dict1: + rtv[key] = dict2[key] + elif key not in dict2: + rtv[key] = dict1[key] + else: + rtv[key] = dict1[key] + dict2[key] + + return rtv + + +def drive_fmm(expansion_wrangler, src_weights, timing_data=None, traversal=None): """Top-level driver routine for the QBX fast multipole calculation. :arg geo_data: A :class:`QBXFMMGeometryData` instance. 
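For reference, a minimal sketch of what the `add_dicts` helper introduced above does when `drive_fmm` is called repeatedly with the same `timing_data` dict: timings for a key accumulate instead of being overwritten. (Plain floats stand in for the recorder's timing results here, assuming only that the values support `+`.)

    timing_data = {}
    timing_data.update(add_dicts(timing_data, {"form_multipoles": 1.5}))
    timing_data.update(add_dicts(timing_data, {"form_multipoles": 2.0,
                                               "eval_locals": 0.7}))
    assert timing_data == {"form_multipoles": 3.5, "eval_locals": 0.7}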
@@ -591,7 +607,8 @@ def drive_fmm(expansion_wrangler, src_weights, timing_data=None, fmm_proc.done() if timing_data is not None: - timing_data.update(recorder.summarize()) + timing_data.update(add_dicts(timing_data, recorder.summarize())) + return result # }}} diff --git a/pytential/qbx/fmmlib.py b/pytential/qbx/fmmlib.py index 98df707e296fd8e0d128edb1dafcae7e7f6b7fa1..6ebf7d10f6729f51c4948891bcb8221fcda7f495 100644 --- a/pytential/qbx/fmmlib.py +++ b/pytential/qbx/fmmlib.py @@ -225,6 +225,9 @@ class QBXFMMLibExpansionWrangler(FMMLibExpansionWrangler): np.zeros(self.tree.ntargets, self.dtype) for k in self.outputs]) + def eval_qbx_output_zeros(self): + return self.full_output_zeros() + def reorder_sources(self, source_array): if isinstance(source_array, cl.array.Array): source_array = source_array.get(queue=self.queue) @@ -547,7 +550,7 @@ class QBXFMMLibExpansionWrangler(FMMLibExpansionWrangler): @log_process(logger) @return_timing_data def eval_qbx_expansions(self, qbx_expansions): - output = self.full_output_zeros() + output = self.eval_qbx_output_zeros() geo_data = self.geo_data ctt = geo_data.center_to_tree_targets() @@ -555,7 +558,7 @@ class QBXFMMLibExpansionWrangler(FMMLibExpansionWrangler): qbx_centers = geo_data.centers() qbx_radii = geo_data.expansion_radii() - all_targets = geo_data.all_targets() + all_targets = geo_data.eval_qbx_targets() taeval = self.get_expn_eval_routine("ta") @@ -582,9 +585,12 @@ class QBXFMMLibExpansionWrangler(FMMLibExpansionWrangler): @log_process(logger) @return_timing_data def eval_target_specific_qbx_locals(self, src_weights): + output = self.eval_qbx_output_zeros() + if not self.using_tsqbx: - return self.full_output_zeros() + return output + noutput_targets = len(output[0]) geo_data = self.geo_data trav = geo_data.traversal() @@ -599,9 +605,9 @@ class QBXFMMLibExpansionWrangler(FMMLibExpansionWrangler): ifgrad = self.ifgrad # Create temporary output arrays for potential / gradient. 
- pot = np.zeros(self.tree.ntargets, np.complex) if ifpot else None + pot = np.zeros(noutput_targets, np.complex) if ifpot else None grad = ( - np.zeros((self.dim, self.tree.ntargets), np.complex) + np.zeros((self.dim, noutput_targets), np.complex) if ifgrad else None) ts.eval_target_specific_qbx_locals( @@ -611,7 +617,7 @@ class QBXFMMLibExpansionWrangler(FMMLibExpansionWrangler): ifdipole=ifdipole, order=self.qbx_order, sources=self._get_single_sources_array(), - targets=geo_data.all_targets(), + targets=geo_data.eval_qbx_targets(), centers=self._get_single_centers_array(), qbx_centers=geo_data.global_qbx_centers(), qbx_center_to_target_box=geo_data.qbx_center_to_target_box(), @@ -628,7 +634,6 @@ class QBXFMMLibExpansionWrangler(FMMLibExpansionWrangler): pot=pot, grad=grad) - output = self.full_output_zeros() self.add_potgrad_onto_output(output, slice(None), pot, grad) return output diff --git a/pytential/qbx/geometry.py b/pytential/qbx/geometry.py index 7a991ddfa2b17c6f5b6fa3dc81d1c8d9fdecab06..b93e5406c59a162707d313ef3d1e718e962c6f3e 100644 --- a/pytential/qbx/geometry.py +++ b/pytential/qbx/geometry.py @@ -701,6 +701,9 @@ class QBXFMMGeometryData(FMMLibRotationDataInterface): return result.with_queue(None) + def eval_qbx_targets(self): + return self.tree().targets + @memoize_method @log_process(logger) def global_qbx_centers(self): diff --git a/pytential/qbx/utils.py b/pytential/qbx/utils.py index b872152a2002ddec5678073e15255c25842db4ed..a6cc68f4e1a234995b59237baa4c90e55d28bbf4 100644 --- a/pytential/qbx/utils.py +++ b/pytential/qbx/utils.py @@ -31,6 +31,7 @@ from boxtree.tree import Tree import pyopencl as cl import pyopencl.array # noqa from pytools import memoize_method +from pytools.obj_array import with_object_array_or_scalar from boxtree.pyfmmlib_integration import FMMLibRotationDataInterface import logging @@ -450,6 +451,9 @@ class ToHostTransferredGeoDataWrapper(FMMLibRotationDataInterface): """All (not just non-QBX) targets packaged into a single array.""" return np.array(list(self.tree().targets)) + def eval_qbx_targets(self): + return self.all_targets() + def m2l_rotation_lists(self): # Already on host return self.geo_data.m2l_rotation_lists() @@ -460,4 +464,40 @@ class ToHostTransferredGeoDataWrapper(FMMLibRotationDataInterface): # }}} + +# {{{ + +def _transform_dict(f, old_dict): + new_dict = {} + + for key in old_dict: + if isinstance(old_dict[key], np.ndarray) and old_dict[key].dtype == object: + new_dict[key] = with_object_array_or_scalar(f, old_dict[key]) + else: + new_dict[key] = f(key) + + return new_dict + + +def sumpy_wrangler_extra_kwargs_to_host(wrangler_kwargs, queue): + def to_host(attr): + if not isinstance(attr, cl.array.Array): + return attr + + return attr.get(queue=queue) + + return _transform_dict(to_host, wrangler_kwargs) + + +def sumpy_wrangler_extra_kwargs_to_device(wrangler_kwargs, queue): + def to_device(attr): + if not isinstance(attr, np.ndarray): + return attr + + return cl.array.to_device(queue, attr).with_queue(queue) + + return _transform_dict(to_device, wrangler_kwargs) + +# }}} + # vim: foldmethod=marker:filetype=pyopencl diff --git a/pytential/solve.py b/pytential/solve.py index c2e932432b8db59a2734d0c8225f1edd500f84bc..1a6a230961dd52801278de0e76eb7baa60550a70 100644 --- a/pytential/solve.py +++ b/pytential/solve.py @@ -347,6 +347,222 @@ def gmres(op, rhs, restart=None, tol=None, x0=None, return result.copy(solution=chopper.chop(result.solution)) + +gmres_status = { + "EVALUATE": 0, + "TERMINATE": 1 +} + + +def 
_distributed_gmres_master(comm, A, b,
+        restart=None, tol=None, x0=None, dot=None,  # noqa
+        maxiter=None, hard_failure=None,
+        require_monotonicity=True, no_progress_factor=None,
+        stall_iterations=None, callback=None):
+
+    # {{{ input processing
+
+    n, _ = A.shape
+
+    if not callable(A):
+        a_call = A.matvec
+    else:
+        a_call = A
+
+    if restart is None:
+        restart = min(n, 20)
+
+    if tol is None:
+        tol = 1e-5
+
+    if maxiter is None:
+        maxiter = 2*n
+
+    if hard_failure is None:
+        hard_failure = True
+
+    if stall_iterations is None:
+        stall_iterations = 10
+    if no_progress_factor is None:
+        no_progress_factor = 1.25
+
+    # }}}
+
+    def norm(x):
+        return np.sqrt(abs(dot(x, x)))
+
+    if x0 is None:
+        x = 0*b
+        r = b
+        recalc_r = False
+    else:
+        x = x0
+        del x0
+        recalc_r = True
+
+    Ae = [None]*restart  # noqa
+    e = [None]*restart
+
+    k = 0
+
+    norm_b = norm(b)
+    last_resid_norm = None
+    residual_norms = []
+
+    for iteration in range(maxiter):
+        # restart if required
+        if k == restart:
+            k = 0
+            orth_count = restart
+        else:
+            orth_count = k
+
+        # recalculate the residual every 10 steps
+        if recalc_r:
+            comm.bcast(gmres_status["EVALUATE"], root=0)
+            r = b - a_call(x)
+
+        norm_r = norm(r)
+        residual_norms.append(norm_r)
+
+        if callback is not None:
+            callback(r)
+
+        if abs(norm_r) < tol*norm_b:
+            comm.bcast(gmres_status["TERMINATE"], root=0)
+            return GMRESResult(solution=x,
+                    residual_norms=residual_norms,
+                    iteration_count=iteration, success=True,
+                    state="success")
+
+        if last_resid_norm is not None:
+            if norm_r > 1.25*last_resid_norm:
+                state = "non-monotonic residuals"
+                if require_monotonicity:
+                    # tell the worker ranks to terminate before bailing out,
+                    # so that they do not block in comm.bcast forever
+                    comm.bcast(gmres_status["TERMINATE"], root=0)
+                    if hard_failure:
+                        raise GMRESError(state)
+                    else:
+                        return GMRESResult(solution=x,
+                                residual_norms=residual_norms,
+                                iteration_count=iteration, success=False,
+                                state=state)
+                else:
+                    print("*** WARNING: non-monotonic residuals in GMRES")
+
+            if (stall_iterations
+                    and len(residual_norms) > stall_iterations
+                    and norm_r > (
+                        residual_norms[-stall_iterations]  # noqa pylint:disable=invalid-unary-operand-type
+                        / no_progress_factor)):
+
+                state = "stalled"
+                # let the worker ranks terminate in either case
+                comm.bcast(gmres_status["TERMINATE"], root=0)
+                if hard_failure:
+                    raise GMRESError(state)
+                else:
+                    return GMRESResult(solution=x,
+                            residual_norms=residual_norms,
+                            iteration_count=iteration, success=False,
+                            state=state)
+
+        last_resid_norm = norm_r
+
+        # initial new direction guess
+        comm.bcast(gmres_status["EVALUATE"], root=0)
+        w = a_call(r)
+
+        # {{{ double-orthogonalize the new direction against preceding ones
+
+        rp = r
+
+        for orth_trips in range(2):
+            for j in range(0, orth_count):
+                d = dot(Ae[j], w)
+                w = w - d * Ae[j]
+                rp = rp - d * e[j]
+
+        # normalize
+        d = 1/norm(w)
+        w = d*w
+        rp = d*rp
+
+        # }}}
+
+        Ae[k] = w
+        e[k] = rp
+
+        # update the residual and solution
+        d = dot(Ae[k], r)
+
+        recalc_r = (iteration+1) % 10 == 0
+        if not recalc_r:
+            r = r - d*Ae[k]
+
+        x = x + d*e[k]
+
+        k += 1
+
+    state = "max iterations"
+    # let the worker ranks terminate in either case
+    comm.bcast(gmres_status["TERMINATE"], root=0)
+    if hard_failure:
+        raise GMRESError(state)
+    else:
+        return GMRESResult(solution=x,
+                residual_norms=residual_norms,
+                iteration_count=iteration, success=False,
+                state=state)
+
+
+def distributed_gmres(comm, op, rhs, restart=None, tol=None, x0=None,
+        inner_product=None,
+        maxiter=None, hard_failure=None,
+        no_progress_factor=None, stall_iterations=None,
+        callback=None, progress=False):
+
+    # TODO: It is not clear that this check is sufficient to ensure that op
+    # actually evaluates distributed layer potentials.
+    from pytential.symbolic.execution import
DistributedMatVecOp + if not isinstance(op, DistributedMatVecOp): + raise NotImplementedError("Only support op of DistributedMatVecOp class") + + if comm.Get_rank() == 0: + amod = get_array_module(rhs) + + chopper = VectorChopper(rhs) + stacked_rhs = chopper.stack(rhs) + + if inner_product is None: + inner_product = amod.vdot + + if callback is None: + if progress: + callback = ResidualPrinter(inner_product) + else: + callback = None + + result = _distributed_gmres_master( + comm, op, stacked_rhs, restart=restart, tol=tol, x0=x0, + dot=inner_product, + maxiter=maxiter, hard_failure=hard_failure, + no_progress_factor=no_progress_factor, + stall_iterations=stall_iterations, callback=callback + ) + + return result.copy(solution=chopper.chop(result.solution)) + else: + while True: + gmres_current_status = comm.bcast(None, root=0) + + if gmres_current_status == gmres_status["TERMINATE"]: + break + else: + if not callable(op): + a_call = op.matvec + else: + a_call = op + + a_call(None) + # }}} # }}} diff --git a/pytential/symbolic/compiler.py b/pytential/symbolic/compiler.py index 3fcfbf2ec6eb4c42cb2a6d1af24bdb0bb1eca297..fc93d0ec81ebe69b09143ca12e8d6587a99ba07a 100644 --- a/pytential/symbolic/compiler.py +++ b/pytential/symbolic/compiler.py @@ -417,6 +417,88 @@ class Code(object): # }}} + +class DistributedCode(Code): + def __init__(self, comm, instructions, result): + Code.__init__(self, instructions, result) + self.comm = comm + + def execute(self, exec_mapper, pre_assign_check=None): + insn_type = { + "ASSIGN": 0, + "COMPUTE_POTENTIAL": 1, + "TERMINAL": 2 + } + + if self.comm.Get_rank() == 0: + context = exec_mapper.context + + done_insns = set() + + while True: + try: + insn, discardable_vars = self.get_next_step( + frozenset(list(context.keys())), + frozenset(done_insns)) + + except self.NoInstructionAvailable: + # no available instructions: we're done + self.comm.bcast(insn_type["TERMINAL"], root=0) + break + else: + for name in discardable_vars: + del context[name] + + if isinstance(insn, ComputePotentialInstruction): + self.comm.bcast(insn_type["COMPUTE_POTENTIAL"], root=0) + else: + self.comm.bcast(insn_type["ASSIGN"], root=0) + + done_insns.add(insn) + assignments = ( + self.get_exec_function(insn, exec_mapper) + (exec_mapper.queue, insn, exec_mapper.bound_expr, + exec_mapper)) + + assignees = insn.get_assignees() + for target, value in assignments: + if pre_assign_check is not None: + pre_assign_check(target, value) + + assert target in assignees + context[target] = value + + if len(done_insns) < len(self.instructions): + print("Unreachable instructions:") + for insn in set(self.instructions) - done_insns: + print(" ", str(insn).replace("\n", "\n ")) + from pymbolic import var + print(" missing: ", ", ".join( + str(s) for s in + set(insn.get_dependencies()) + - set(var(v) for v in six.iterkeys(context)))) + + raise RuntimeError("not all instructions are reachable" + "--did you forget to pass a value for a placeholder?") + + from pytools.obj_array import with_object_array_or_scalar + return with_object_array_or_scalar(exec_mapper, self.result) + + else: + while True: + current_insn_type = self.comm.bcast(None, root=0) + + if current_insn_type == insn_type["ASSIGN"]: + continue + elif current_insn_type == insn_type["TERMINAL"]: + break + elif current_insn_type == insn_type["COMPUTE_POTENTIAL"]: + exec_mapper.exec_compute_potential_insn( + exec_mapper.queue, None, None, None + ) + else: + raise RuntimeError("Unknown instruction type") + # }}} diff --git a/pytential/symbolic/execution.py 
b/pytential/symbolic/execution.py
index aeba89e53c8b711b67b68466f93b877659ce44d8..4b3747fd1258ccdc81e70acf891c0544181037aa 100644
--- a/pytential/symbolic/execution.py
+++ b/pytential/symbolic/execution.py
@@ -27,6 +27,7 @@ THE SOFTWARE.
 
 import six
 from six.moves import zip
+import copy
 
 from pymbolic.mapper.evaluator import (
         EvaluationMapper as PymbolicEvaluationMapper)
@@ -342,6 +343,60 @@ class EvaluationMapper(EvaluationMapperBase):
 
         return result
 
+
+class DistributedEvaluationMapper(EvaluationMapper):
+    def __init__(self, comm, bound_expr, queue, context=None, timing_data=None):
+        """
+        Note: :arg timing_data: must be either None or a valid dict on *all*
+        ranks; a deadlock can result if some ranks pass a dict while others
+        pass None.
+        """
+        self.comm = comm
+
+        if comm.Get_rank() == 0:
+            EvaluationMapper.__init__(self, bound_expr, queue, context, timing_data)
+        else:
+            self.timing_data = timing_data
+            self.queue = queue
+
+    def exec_compute_potential_insn(self, queue, insn, bound_expr, evaluate):
+        return_timing_data = self.timing_data is not None
+        if return_timing_data:
+            insn_str = self.comm.bcast(str(insn), root=0)
+
+        if self.comm.Get_rank() == 0:
+            result = EvaluationMapper.exec_compute_potential_insn(
+                self, queue, insn, bound_expr, evaluate
+            )
+        else:
+            # TODO: use the evaluation mapper for launching the FMM in the
+            # following code
+            from pytential.qbx.distributed import DistributedQBXLayerPotentialSource
+            lp_source = DistributedQBXLayerPotentialSource(None, None, comm=self.comm)
+
+            distributed_geo_data = lp_source.distibuted_geo_data(
+                None, queue, None, None
+            )
+
+            if return_timing_data:
+                timing_data = {}
+            else:
+                timing_data = None
+
+            from pytential.qbx.distributed import drive_dfmm
+            weights = None
+            global_wrangler = None
+            result = drive_dfmm(
+                queue, weights, distributed_geo_data, global_wrangler,
+                comm=self.comm, timing_data=timing_data
+            )
+
+            if return_timing_data:
+                # The compiler ensures this.
+                assert insn not in self.timing_data
+
+                self.timing_data[insn_str] = timing_data
+
+        return result
+
 # }}}
 
 
@@ -458,6 +513,31 @@ class MatVecOp:
 
         return result
 
+
+class DistributedMatVecOp(MatVecOp):
+    def __init__(self,
+            comm, bound_expr, queue, arg_name, dtype, total_dofs,
+            starts_and_ends, extra_args):
+        self.comm = comm
+        MatVecOp.__init__(
+            self, bound_expr, queue, arg_name, dtype, total_dofs,
+            starts_and_ends, extra_args
+        )
+
+    @classmethod
+    def from_mat_vec_op(cls, comm, mat_vec_op):
+        distributed_mat_vec_op = copy.copy(mat_vec_op)
+        distributed_mat_vec_op.__class__ = cls
+        distributed_mat_vec_op.comm = comm
+
+        return distributed_mat_vec_op
+
+    def matvec(self, x):
+        if self.comm.Get_rank() == 0:
+            return MatVecOp.matvec(self, x)
+        else:
+            return self.bound_expr(self.queue)
+
 # }}}
 
 
@@ -948,6 +1028,56 @@ class BoundExpression(object):
 
         return self.eval(queue, args)
 
+
+class DistributedBoundExpression(BoundExpression):
+    def __init__(self, comm, places, sym_op_expr):
+        self.comm = comm
+        rank = comm.Get_rank()
+
+        from pytential.symbolic.compiler import DistributedCode
+
+        if rank == 0:
+            BoundExpression.__init__(self, places, sym_op_expr)
+            self.code = DistributedCode(
+                comm, self.code.instructions, self.code.result
+            )
+        else:
+            self.code = DistributedCode(comm, None, None)
+
+    def cost_per_stage(self, queue, calibration_params, **args):
+        if self.comm.Get_rank() == 0:
+            return BoundExpression.cost_per_stage(
+                self, queue, calibration_params, **args
+            )
+        else:
+            raise RuntimeError("Cost model is not available on worker nodes")
+
+    def cost_per_box(self, queue, calibration_params, **args):
+        if self.comm.Get_rank() == 0:
+            return BoundExpression.cost_per_box(
+                self, queue, calibration_params, **args
+            )
+        else:
+            raise RuntimeError("Cost model is not available on worker nodes")
+
+    def scipy_op(self, queue, arg_name, dtype, domains=None, **extra_args):
+        if self.comm.Get_rank() == 0:
+            mat_vec_op = BoundExpression.scipy_op(
+                self, queue, arg_name, dtype, domains=domains, **extra_args
+            )
+        else:
+            mat_vec_op = MatVecOp(self, queue, None, None, None, None, None)
+
+        return DistributedMatVecOp.from_mat_vec_op(self.comm, mat_vec_op)
+
+    def eval(self, queue, context=None, timing_data=None):
+        if context is None:
+            context = {}
+        exec_mapper = DistributedEvaluationMapper(
+            self.comm, self, queue, context, timing_data=timing_data
+        )
+        return self.code.execute(exec_mapper)
+
+
 def bind(places, expr, auto_where=None):
     """
     :arg places: a :class:`~pytential.symbolic.execution.GeometryCollection`.
@@ -968,6 +1098,22 @@ def bind(places, expr, auto_where=None):
 
     return BoundExpression(places, expr)
 
+
+def bind_distributed(comm, places, expr, auto_where=None):
+    """
+    Same as bind, except `places` and `expr` can be None on worker ranks.
+    """
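+    # Collective usage sketch (this mirrors test/test_distributed.py below;
+    # `places`, `op`, `queue` and `context` are placeholders):
+    #
+    #     if comm.Get_rank() == 0:
+    #         bound_op = bind_distributed(comm, places, op)
+    #     else:
+    #         bound_op = bind_distributed(comm, None, None)
+    #     result = bound_op(queue, **context)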
+ """ + + if comm.Get_rank() == 0: + if not isinstance(places, GeometryCollection): + places = GeometryCollection(places, auto_where=auto_where) + expr = _prepare_expr(places, expr) + else: + places = None + expr = None + + return DistributedBoundExpression(comm, places, expr) + # }}} diff --git a/requirements.txt b/requirements.txt index 625deb28d7e5a04253bc3ffb5c12b2ba263362b5..f8d0764d1cb473e07b642db3e300010182717c96 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ git+https://github.com/inducer/modepy git+https://github.com/inducer/pyopencl git+https://github.com/inducer/islpy git+https://github.com/inducer/loopy -git+https://gitlab.tiker.net/inducer/boxtree +git+https://gitlab.tiker.net/inducer/boxtree@distributed-fmm-global git+https://github.com/inducer/meshmode git+https://gitlab.tiker.net/inducer/sumpy git+https://gitlab.tiker.net/inducer/pyfmmlib diff --git a/test/test_distributed.py b/test/test_distributed.py new file mode 100644 index 0000000000000000000000000000000000000000..243c6093d29351bae302166cf7d01f52e5c3840d --- /dev/null +++ b/test/test_distributed.py @@ -0,0 +1,337 @@ +import numpy as np +import numpy.linalg as la # noqa +import pyopencl as cl +import pyopencl.clmath # noqa +from pyopencl.tools import ( # noqa + pytest_generate_tests_for_pyopencl as pytest_generate_tests) + +from meshmode.mesh.generation import make_curve_mesh, ellipse +from sumpy.visualization import FieldPlotter +from pytential import bind, sym, GeometryCollection +from boxtree.tools import run_mpi + +import pytest +from functools import partial +import sys +import os + +from mpi4py import MPI +comm = MPI.COMM_WORLD +rank = comm.Get_rank() +size = comm.Get_size() + +import logging +logger = logging.getLogger(__name__) + + +# {{{ test off-surface eval + +def _test_off_surface_eval( + ctx_factory, use_fmm, do_plot=False, fmm_backend='fmmlib'): + logging.basicConfig(level=logging.INFO) + + cl_ctx = ctx_factory() + queue = cl.CommandQueue(cl_ctx) + + # prevent cache 'splosion + from sympy.core.cache import clear_cache + clear_cache() + + nelements = 30 + target_order = 8 + qbx_order = 3 + if use_fmm: + fmm_order = qbx_order + else: + fmm_order = False + + if rank == 0: + + mesh = make_curve_mesh(partial(ellipse, 3), + np.linspace(0, 1, nelements+1), + target_order) + + from pytential.qbx.distributed import DistributedQBXLayerPotentialSource + from meshmode.discretization import Discretization + from meshmode.discretization.poly_element import \ + InterpolatoryQuadratureSimplexGroupFactory + + pre_density_discr = Discretization( + cl_ctx, mesh, InterpolatoryQuadratureSimplexGroupFactory(target_order)) + qbx = DistributedQBXLayerPotentialSource( + pre_density_discr, + fine_order=4*target_order, + qbx_order=qbx_order, + fmm_order=fmm_order, + comm=comm, + knl_specific_calibration_params="constant_one", + fmm_backend=fmm_backend + ) + + from pytential.target import PointsTarget + fplot = FieldPlotter(np.zeros(2), extent=0.54, npoints=30) + targets = PointsTarget(fplot.points) + + places = GeometryCollection((qbx, targets)) + density_discr = places.get_discretization(places.auto_source.geometry) + + from sumpy.kernel import LaplaceKernel + op = sym.D(LaplaceKernel(2), sym.var("sigma"), qbx_forced_limit=-2) + + sigma = density_discr.zeros(queue) + 1 + qbx_ctx = {"sigma": sigma} + + else: + places = None + op = None + qbx_ctx = {} + + from pytential.symbolic.execution import bind_distributed + fld_in_vol = bind_distributed(comm, places, op)(queue, **qbx_ctx) + + if rank == 0: + # test 
against shared memory result + + from pytential.qbx import QBXLayerPotentialSource + qbx = QBXLayerPotentialSource( + pre_density_discr, + 4 * target_order, + qbx_order, + fmm_order=fmm_order, + _from_sep_smaller_min_nsources_cumul=0 + ) + + places = GeometryCollection((qbx, targets)) + fld_in_vol_single_node = bind(places, op)(queue, **qbx_ctx) + + linf_err = cl.array.max( + cl.clmath.fabs(fld_in_vol - fld_in_vol_single_node) + ) + + assert linf_err < 1e-13 + + # test against analytical solution + + err = cl.clmath.fabs(fld_in_vol - (-1)) + + linf_err = cl.array.max(err).get() + print("l_inf error:", linf_err) + + if do_plot: + fplot.show_scalar_in_matplotlib(fld_in_vol.get()) + import matplotlib.pyplot as pt + pt.colorbar() + pt.show() + + assert linf_err < 1e-3 + + +@pytest.mark.mpi +@pytest.mark.parametrize("num_processes, use_fmm, fmm_backend", [ + (4, True, 'fmmlib'), + (4, True, 'sumpy') +]) +@pytest.mark.skipif(sys.version_info < (3, 5), + reason="distributed implementation requires 3.5 or higher") +def test_off_surface_eval( + num_processes, use_fmm, fmm_backend, do_plot=False): + pytest.importorskip("mpi4py") + + newenv = os.environ.copy() + newenv["PYTEST"] = "1" + newenv["OMP_NUM_THREADS"] = "1" + newenv["POCL_MAX_PTHREAD_COUNT"] = "1" + newenv["use_fmm"] = str(use_fmm) + newenv["do_plot"] = str(do_plot) + newenv["fmm_backend"] = fmm_backend + + run_mpi(__file__, num_processes, newenv) + +# }}} + + +# {{{ compare on-surface urchin geometry against single-rank result + +def single_layer_wrapper(kernel): + u_sym = sym.var("u") + return sym.S(kernel, u_sym, qbx_forced_limit=-1) + + +def double_layer_wrapper(kernel): + u_sym = sym.var("u") + return sym.D(kernel, u_sym, qbx_forced_limit="avg") + + +def _test_urchin_against_single_rank( + ctx_factory, m, n, op_wrapper, fmm_backend, use_tsqbx): + logging.basicConfig(level=logging.INFO) + + qbx_order = 3 + fmm_order = 10 + target_order = 8 + est_rel_interp_tolerance = 1e-10 + _expansion_stick_out_factor = 0.5 + + ctx = ctx_factory() + queue = cl.CommandQueue(ctx) + + # prevent cache 'splosion + from sympy.core.cache import clear_cache + clear_cache() + + if rank == 0: + from meshmode.mesh.generation import generate_urchin + mesh = generate_urchin(target_order, m, n, est_rel_interp_tolerance) + d = mesh.ambient_dim + + from sumpy.kernel import LaplaceKernel + k_sym = LaplaceKernel(d) + op = op_wrapper(k_sym) + + from meshmode.discretization import Discretization + from meshmode.discretization.poly_element import \ + InterpolatoryQuadratureSimplexGroupFactory + + pre_density_discr = Discretization( + ctx, mesh, + InterpolatoryQuadratureSimplexGroupFactory(target_order) + ) + + params = { + "qbx_order": qbx_order, + "fmm_order": fmm_order, + "_from_sep_smaller_min_nsources_cumul": 0, + "_expansions_in_tree_have_extent": True, + "_expansion_stick_out_factor": _expansion_stick_out_factor, + "_use_target_specific_qbx": use_tsqbx, + "fmm_backend": fmm_backend + } + + from pytential.qbx.distributed import DistributedQBXLayerPotentialSource + qbx = DistributedQBXLayerPotentialSource( + density_discr=pre_density_discr, + fine_order=4 * target_order, + comm=comm, + knl_specific_calibration_params="constant_one", + **params + ) + + places = GeometryCollection(qbx) + density_discr = places.get_discretization(places.auto_source.geometry) + + # {{{ compute values of a solution to the PDE + + nodes_host = density_discr.nodes().get(queue) + + center = np.array([3, 1, 2])[:d] + diff = nodes_host - center[:, np.newaxis] + dist_squared = np.sum(diff ** 2, 
axis=0) + dist = np.sqrt(dist_squared) + if d == 2: + u = np.log(dist) + grad_u = diff / dist_squared + elif d == 3: + u = 1 / dist + grad_u = -diff / dist ** 3 + else: + assert False + + # }}} + + u_dev = cl.array.to_device(queue, u) + grad_u_dev = cl.array.to_device(queue, grad_u) + context = {'u': u_dev, 'grad_u': grad_u_dev} + else: + places = None + op = None + context = {} + + from pytential.symbolic.execution import bind_distributed + bound_op = bind_distributed(comm, places, op) + distributed_result = bound_op(queue, **context) + + if rank == 0: + from pytential.qbx import QBXLayerPotentialSource + qbx = QBXLayerPotentialSource( + density_discr=pre_density_discr, + fine_order=4 * target_order, + **params + ) + places = GeometryCollection(qbx) + + single_node_result = bind(places, op)(queue, **context) + + distributed_result = distributed_result.get() + single_node_result = single_node_result.get() + + linf_err = la.norm(distributed_result - single_node_result, ord=np.inf) + + print("l_inf error:", linf_err) + + assert linf_err < 1e-13 + + +@pytest.mark.mpi +@pytest.mark.parametrize("num_processes, m, n, op_wrapper, fmm_backend, use_tsqbx", [ + (4, 1, 3, "single_layer_wrapper", "fmmlib", True), + (4, 1, 3, "single_layer_wrapper", "sumpy", False), + (4, 1, 3, "double_layer_wrapper", "fmmlib", True), + (4, 1, 3, "double_layer_wrapper", "sumpy", False), +]) +@pytest.mark.skipif(sys.version_info < (3, 5), + reason="distributed implementation requires 3.5 or higher") +def test_urchin_against_single_rank( + num_processes, m, n, op_wrapper, fmm_backend, use_tsqbx): + pytest.importorskip("mpi4py") + + newenv = os.environ.copy() + newenv["PYTEST"] = "2" + newenv["OMP_NUM_THREADS"] = "1" + newenv["POCL_MAX_PTHREAD_COUNT"] = "1" + newenv["m"] = str(m) + newenv["n"] = str(n) + newenv["op_wrapper"] = op_wrapper + newenv["fmm_backend"] = fmm_backend + newenv["use_tsqbx"] = str(use_tsqbx) + + run_mpi(__file__, num_processes, newenv) + +# }}} + + +if __name__ == "__main__": + if "PYTEST" in os.environ: + if os.environ["PYTEST"] == "1": + # Run "test_off_surface_eval" test case + use_fmm = (os.environ["use_fmm"] == 'True') + do_plot = (os.environ["do_plot"] == 'True') + fmm_backend = os.environ["fmm_backend"] + + _test_off_surface_eval( + cl.create_some_context, use_fmm, + do_plot=do_plot, fmm_backend=fmm_backend + ) + elif os.environ["PYTEST"] == "2": + # Run "test_urchin_against_single_rank" test case + m = int(os.environ["m"]) + n = int(os.environ["n"]) + op_wrapper_str = os.environ["op_wrapper"] + fmm_backend = os.environ["fmm_backend"] + use_tsqbx = (os.environ["use_tsqbx"] == 'True') + + if op_wrapper_str == "single_layer_wrapper": + op_wrapper = single_layer_wrapper + elif op_wrapper_str == "double_layer_wrapper": + op_wrapper = double_layer_wrapper + else: + raise ValueError("unknown op wrapper") + + _test_urchin_against_single_rank( + cl.create_some_context, m, n, op_wrapper, fmm_backend, use_tsqbx + ) + else: + if len(sys.argv) > 1: + # You can test individual routines by typing + # $ python test_distributed.py 'test_off_surface_eval(4, True, "fmmlib", + # True)' + exec(sys.argv[1])
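As a usage note (assuming a working MPI installation plus mpi4py): each mpi-marked test above re-launches this file under MPI through boxtree's run_mpi helper, so the suite can be driven either through pytest or by hand:

    # run all distributed tests; each test spawns its own MPI processes
    python -m pytest -m mpi test/test_distributed.py

    # or invoke a single routine directly, as the comment above shows
    python test/test_distributed.py 'test_off_surface_eval(4, True, "fmmlib", True)'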