diff --git a/examples/dagrt_fusion/fusion-study.py b/examples/dagrt_fusion/fusion-study.py index ce12f1691a5fbfa5c9dff7228a5d61784b1321f9..4c451ef63189e8be5a8218f920574671405f76f1 100644 --- a/examples/dagrt_fusion/fusion-study.py +++ b/examples/dagrt_fusion/fusion-study.py @@ -25,11 +25,14 @@ THE SOFTWARE. import logging import numpy as np +import six import pyopencl as cl +import pyopencl.array # noqa import dagrt.language as lang import pymbolic.primitives as p import grudge.symbolic.mappers as gmap +from grudge.execution import ExecutionMapper from pymbolic.mapper.evaluator import EvaluationMapper \ as PymbolicEvaluationMapper @@ -68,6 +71,8 @@ def topological_sort(stmts, root_deps): # }}} +# {{{ leap to grudge translation + # Use evaluation, not identity mappers to propagate symbolic vectors to # outermost level. @@ -174,43 +179,10 @@ def transcribe_phase(dag, field_var_name, field_components, phase_name, return output_vars, [ctx[ov] for ov in output_vars], yielded_states +# }}} -def get_strong_wave_op_with_discr(cl_ctx, dims=3, order=4): - from meshmode.mesh.generation import generate_regular_rect_mesh - mesh = generate_regular_rect_mesh( - a=(-0.5,)*dims, - b=(0.5,)*dims, - n=(16,)*dims) - - logger.info("%d elements" % mesh.nelements) - - discr = DGDiscretizationWithBoundaries(cl_ctx, mesh, order=order) - - source_center = np.array([0.1, 0.22, 0.33])[:dims] - source_width = 0.05 - source_omega = 3 - - sym_x = sym.nodes(mesh.dim) - sym_source_center_dist = sym_x - source_center - sym_t = sym.ScalarVariable("t") - - from grudge.models.wave import StrongWaveOperator - from meshmode.mesh import BTAG_ALL, BTAG_NONE - op = StrongWaveOperator(-0.1, dims, - source_f=( - sym.sin(source_omega*sym_t) - * sym.exp( - -np.dot(sym_source_center_dist, sym_source_center_dist) - / source_width**2)), - dirichlet_tag=BTAG_NONE, - neumann_tag=BTAG_NONE, - radiation_tag=BTAG_ALL, - flux_type="upwind") - - op.check_bc_coverage(mesh) - - return (op, discr) +# {{{ time integrator implementations class RK4TimeStepperBase(object): @@ -234,7 +206,8 @@ class RK4TimeStepperBase(object): "input_residual": flattened_fields, } - def set_up_stepper(self, discr, field_var_name, sym_rhs, num_fields): + def set_up_stepper(self, discr, field_var_name, sym_rhs, num_fields, + exec_mapper_factory=ExecutionMapper): dt_method = LSRK4Method(component_id=field_var_name) dt_code = dt_method.generate() self.field_var_name = field_var_name @@ -257,13 +230,45 @@ class RK4TimeStepperBase(object): from pytools.obj_array import join_fields flattened_results = join_fields(output_t, output_dt, *output_states) - self.bound_op = bind(discr, flattened_results) + self.bound_op = bind( + discr, flattened_results, exec_mapper_factory=exec_mapper_factory) + + def run(self, fields, t_start, dt, t_end, return_profile_data=False): + context = self.get_initial_context(fields, t_start, dt) + + t = t_start + + while t <= t_end: + if return_profile_data: + profile_data = dict() + else: + profile_data = None + + results = self.bound_op( + self.queue, + profile_data=profile_data, + **context) + + if return_profile_data: + results = results[0] + + t = results[0] + context["input_t"] = t + context["input_dt"] = results[1] + output_states = results[2:] + context[self.state_name] = output_states + + result = (t, self.component_getter(output_states)) + if return_profile_data: + result += (profile_data,) + + yield result class RK4TimeStepper(RK4TimeStepperBase): def __init__(self, queue, discr, field_var_name, grudge_bound_op, - num_fields, component_getter): + num_fields, component_getter, exec_mapper_factory=ExecutionMapper): from pymbolic import var # Construct sym_rhs to have the effect of replacing the RHS calls in the @@ -283,62 +288,82 @@ class RK4TimeStepper(RK4TimeStepperBase): self.queue = queue self.grudge_bound_op = grudge_bound_op - self.set_up_stepper(discr, field_var_name, sym_rhs, num_fields) + self.set_up_stepper(discr, field_var_name, sym_rhs, num_fields, exec_mapper_factory) self.component_getter = component_getter - def _bound_op(self, t, *args): + def _bound_op(self, t, *args, profile_data=None): from pytools.obj_array import join_fields context = { "t": t, self.field_var_name: join_fields(*args)} - return self.grudge_bound_op(self.queue, **context) + result = self.grudge_bound_op( + self.queue, profile_data=profile_data, **context) + if profile_data is not None: + result = result[0] + return result def get_initial_context(self, fields, t_start, dt): context = super().get_initial_context(fields, t_start, dt) context["grudge_op"] = self._bound_op return context - def run(self, fields, t_start, dt, t_end): - context = self.get_initial_context(fields, t_start, dt) - - t = t_start - - while t <= t_end: - results = self.bound_op(self.queue, **context) - t = results[0] - context["input_t"] = t - context["input_dt"] = results[1] - output_states = results[2:] - context[self.state_name] = output_states - yield (t, self.component_getter(output_states)) - class FusedRK4TimeStepper(RK4TimeStepperBase): def __init__(self, queue, discr, field_var_name, sym_rhs, num_fields, - component_getter): + component_getter, exec_mapper_factory=ExecutionMapper): self.queue = queue - self.set_up_stepper(discr, field_var_name, sym_rhs, num_fields) + self.set_up_stepper( + discr, field_var_name, sym_rhs, num_fields, exec_mapper_factory) self.component_getter = component_getter - def run(self, fields, t_start, dt, t_end): - context = self.get_initial_context(fields, t_start, dt) +# }}} - t = t_start - while t <= t_end: - results = self.bound_op(self.queue, **context) - t = results[0] - context["input_t"] = t - context["input_dt"] = results[1] - output_states = results[2:] - context[self.state_name] = output_states - yield (t, self.component_getter(output_states)) +# {{{ problem setup code + +def get_strong_wave_op_with_discr(cl_ctx, dims=3, order=4): + from meshmode.mesh.generation import generate_regular_rect_mesh + mesh = generate_regular_rect_mesh( + a=(-0.5,)*dims, + b=(0.5,)*dims, + n=(16,)*dims) + + logger.info("%d elements" % mesh.nelements) + + discr = DGDiscretizationWithBoundaries(cl_ctx, mesh, order=order) + + source_center = np.array([0.1, 0.22, 0.33])[:dims] + source_width = 0.05 + source_omega = 3 + + sym_x = sym.nodes(mesh.dim) + sym_source_center_dist = sym_x - source_center + sym_t = sym.ScalarVariable("t") + + from grudge.models.wave import StrongWaveOperator + from meshmode.mesh import BTAG_ALL, BTAG_NONE + op = StrongWaveOperator(-0.1, dims, + source_f=( + sym.sin(source_omega*sym_t) + * sym.exp( + -np.dot(sym_source_center_dist, sym_source_center_dist) + / source_width**2)), + dirichlet_tag=BTAG_NONE, + neumann_tag=BTAG_NONE, + radiation_tag=BTAG_ALL, + flux_type="upwind") + + op.check_bc_coverage(mesh) + + return (op, discr) def get_strong_wave_component(state_component): return (state_component[0], state_component[1:]) +# }}} + # {{{ equivalence check @@ -370,7 +395,7 @@ def test_stepper_equivalence(order=4): t_start = 0 t_end = 0.5 - nsteps = int(t_end/dt) + nsteps = int(np.ceil((t_end + 1e-9) / dt)) print("dt=%g nsteps=%d" % (dt, nsteps)) step = 0 @@ -381,7 +406,7 @@ def test_stepper_equivalence(order=4): for t_ref, (u_ref, v_ref) in stepper.run(ic, t_start, dt, t_end): step += 1 - logger.info("step %d", step) + logger.info("step %d/%d", step, nsteps) t, (u, v) = next(fused_steps) assert t == t_ref, step assert norm(queue, u=u, u_ref=u_ref) <= 1e-13, step @@ -389,5 +414,128 @@ def test_stepper_equivalence(order=4): # }}} +# {{{ mem op counter implementation + +class MemOpCountingExecutionMapper(ExecutionMapper): + + def __init__(self, queue, context, bound_op): + super().__init__(queue, context, bound_op) + + # {{{ expression mappings + + def map_common_subexpression(self, expr): + raise ValueError("CSE not expected") + + def map_profiled_external_call(self, expr, profile_data): + from pymbolic.primitives import Variable + assert isinstance(expr.function, Variable) + args = [self.rec(p) for p in expr.parameters] + return self.context[expr.function.name](*args, profile_data=profile_data) + + # }}} + + # {{{ instruction mappings + + def map_insn_assign(self, insn, profile_data): + result = [] + for name, expr in zip(insn.names, insn.exprs): + if isinstance(expr, sym.ExternalCall): + assert expr.mapper_method == "map_external_call" + val = self.map_profiled_external_call(expr, profile_data) + else: + val = self.rec(expr) + result.append((name, val)) + return result, [] + + def map_insn_loopy_kernel(self, insn, profile_data): + kwargs = {} + kdescr = insn.kernel_descriptor + for name, expr in six.iteritems(kdescr.input_mappings): + val = self.rec(expr) + kwargs[name] = val + assert not isinstance(val, np.ndarray) + if profile_data is not None and isinstance(val, pyopencl.array.Array): + profile_data["bytes_read"] = ( + profile_data.get("bytes_read", 0) + val.nbytes) + + discr = self.discrwb.discr_from_dd(kdescr.governing_dd) + for name in kdescr.scalar_args(): + v = kwargs[name] + if isinstance(v, (int, float)): + kwargs[name] = discr.real_dtype.type(v) + elif isinstance(v, complex): + kwargs[name] = discr.complex_dtype.type(v) + elif isinstance(v, np.number): + pass + else: + raise ValueError("unrecognized scalar type for variable '%s': %s" + % (name, type(v))) + + kwargs["grdg_n"] = discr.nnodes + evt, result_dict = kdescr.loopy_kernel(self.queue, **kwargs) + + for val in result_dict.values(): + assert not isinstance(val, np.ndarray) + if profile_data is not None and isinstance(val, pyopencl.array.Array): + profile_data["bytes_written"] = ( + profile_data.get("bytes_written", 0) + val.nbytes) + + return list(result_dict.items()), [] + + # }}} + +# }}} + + +# {{{ mem op counter check + +def test_stepper_mem_ops(): + cl_ctx = cl.create_some_context() + queue = cl.CommandQueue(cl_ctx) + + dims = 2 + op, discr = get_strong_wave_op_with_discr(cl_ctx, dims=2, order=3) + + t_start = 0 + dt = 0.04 + t_end = 0.2 + + from pytools.obj_array import join_fields + ic = join_fields(discr.zeros(queue), + [discr.zeros(queue) for i in range(discr.dim)]) + + bound_op = bind( + discr, op.sym_operator(), + exec_mapper_factory=MemOpCountingExecutionMapper) + + if 1: + stepper = RK4TimeStepper( + queue, discr, "w", bound_op, 1 + discr.dim, + get_strong_wave_component, + exec_mapper_factory=MemOpCountingExecutionMapper) + + else: + stepper = FusedRK4TimeStepper( + queue, discr, "w", op.sym_operator(), 1 + discr.dim, + get_strong_wave_component, + exec_mapper_factory=MemOpCountingExecutionMapper) + + step = 0 + + norm = bind(discr, sym.norm(2, sym.var("u_ref") - sym.var("u"))) + + nsteps = int(np.ceil((t_end + 1e-9) / dt)) + for (_, _, profile_data) in stepper.run( + ic, t_start, dt, t_end, return_profile_data=True): + step += 1 + logger.info("step %d/%d", step, nsteps) + + print("bytes read", profile_data["bytes_read"]) + print("bytes written", profile_data["bytes_written"]) + +# }}} + + if __name__ == "__main__": - test_stepper_equivalence() + #test_stepper_equivalence() + test_stepper_mem_ops()