diff --git a/examples/dagrt_fusion/fusion-study.py b/examples/dagrt_fusion/fusion-study.py index 4c451ef63189e8be5a8218f920574671405f76f1..ab33eb9105c0cc4d441ccf597594fb86a6c34051 100644 --- a/examples/dagrt_fusion/fusion-study.py +++ b/examples/dagrt_fusion/fusion-study.py @@ -1,6 +1,13 @@ +#!/usr/bin/env python3 +"""A study of operator fusion for time integration operators in Grudge. +""" + from __future__ import division, print_function -__copyright__ = "Copyright (C) 2015 Andreas Kloeckner" +__copyright__ = """ +Copyright (C) 2015 Andreas Kloeckner +Copyright (C) 2019 Matt Wala +""" __license__ = """ Permission is hereby granted, free of charge, to any person obtaining a copy @@ -28,13 +35,17 @@ import numpy as np import six import pyopencl as cl import pyopencl.array # noqa +import pytest import dagrt.language as lang +import loopy as lp import pymbolic.primitives as p import grudge.symbolic.mappers as gmap +import grudge.symbolic.operators as op from grudge.execution import ExecutionMapper from pymbolic.mapper.evaluator import EvaluationMapper \ as PymbolicEvaluationMapper +from pytools import memoize_in from grudge import sym, bind, DGDiscretizationWithBoundaries from leap.rk import LSRK4Method @@ -44,6 +55,8 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + # {{{ topological sort @@ -100,6 +113,19 @@ class GrudgeArgSubstitutor(gmap.SymbolicEvaluator): def transcribe_phase(dag, field_var_name, field_components, phase_name, sym_operator): + """Arguments: + + dag: The Dagrt code for the time integrator + + field_var_name: The name of the simulation variable + + field_components: The number of components (fields) in the variable + + phase_name: The name of the phase to transcribe in the Dagrt code + + sym_operator: The Grudge symbolic expression to substitue for the + right-hand side evaluation in the Dagrt code + """ sym_operator = gmap.OperatorBinder()(sym_operator) phase = dag.phases[phase_name] @@ -269,6 +295,19 @@ class RK4TimeStepper(RK4TimeStepperBase): def __init__(self, queue, discr, field_var_name, grudge_bound_op, num_fields, component_getter, exec_mapper_factory=ExecutionMapper): + """Arguments: + + field_var_name: The name of the simulation variable + + grudge_bound_op: The BoundExpression for the right-hand side + + num_fields: The number of components in the simulation variable + + component_getter: A function, which, given an object array + representing the simulation variable, splits the array into + its components + + """ from pymbolic import var # Construct sym_rhs to have the effect of replacing the RHS calls in the @@ -322,14 +361,14 @@ class FusedRK4TimeStepper(RK4TimeStepperBase): # {{{ problem setup code -def get_strong_wave_op_with_discr(cl_ctx, dims=3, order=4): +def get_strong_wave_op_with_discr(cl_ctx, dims=2, order=4): from meshmode.mesh.generation import generate_regular_rect_mesh mesh = generate_regular_rect_mesh( a=(-0.5,)*dims, b=(0.5,)*dims, n=(16,)*dims) - logger.info("%d elements" % mesh.nelements) + logger.debug("%d elements" % mesh.nelements) discr = DGDiscretizationWithBoundaries(cl_ctx, mesh, order=order) @@ -365,7 +404,7 @@ def get_strong_wave_component(state_component): # }}} -# {{{ equivalence check +# {{{ equivalence check between fused and non-fused versions def test_stepper_equivalence(order=4): cl_ctx = cl.create_some_context() @@ -406,7 +445,7 @@ def test_stepper_equivalence(order=4): for t_ref, (u_ref, v_ref) in stepper.run(ic, t_start, dt, t_end): step += 1 - logger.info("step %d/%d", step, nsteps) + logger.debug("step %d/%d", step, nsteps) t, (u, v) = next(fused_steps) assert t == t_ref, step assert norm(queue, u=u, u_ref=u_ref) <= 1e-13, step @@ -417,14 +456,17 @@ def test_stepper_equivalence(order=4): # {{{ mem op counter implementation class MemOpCountingExecutionMapper(ExecutionMapper): + # This is a skeleton implementation that only has just enough functionality + # for the wave-min example to work. def __init__(self, queue, context, bound_op): super().__init__(queue, context, bound_op) - # {{{ expression mappings + def map_external_call(self, expr): + # Should have been caught by our op counter + assert False, ("map_external_call called: %s" % expr) - def map_common_subexpression(self, expr): - raise ValueError("CSE not expected") + # {{{ expressions def map_profiled_external_call(self, expr, profile_data): from pymbolic.primitives import Variable @@ -432,19 +474,56 @@ class MemOpCountingExecutionMapper(ExecutionMapper): args = [self.rec(p) for p in expr.parameters] return self.context[expr.function.name](*args, profile_data=profile_data) + def map_profiled_essentially_elementwise_linear(self, op, field_expr, profile_data): + result = getattr(self, op.mapper_method)(op, field_expr) + + if profile_data is not None: + # We model the cost to load the input and write the output. In + # particular, we assume the elementwise matrices are negligible in + # size and thus ignorable. + + field = self.rec(field_expr) + profile_data["bytes_read"] = ( + profile_data.get("bytes_read", 0) + field.nbytes) + profile_data["bytes_written"] = ( + profile_data.get("bytes_written", 0) + result.nbytes) + + return result + # }}} - + # {{{ instruction mappings - + + def process_assignment_expr(self, expr, profile_data): + if isinstance(expr, sym.ExternalCall): + assert expr.mapper_method == "map_external_call" + val = self.map_profiled_external_call(expr, profile_data) + + elif isinstance(expr, sym.OperatorBinding): + if isinstance( + expr.op, + ( + # TODO: Not comprehensive. + op.InterpolationOperator, + op.RefFaceMassOperator, + op.RefInverseMassOperator, + op.OppositeInteriorFaceSwap)): + val = self.map_profiled_essentially_elementwise_linear( + expr.op, expr.field, profile_data) + + else: + assert False, ("unknown operator: %s" % expr.op) + + else: + logger.debug("assignment not profiled: %s", expr) + val = self.rec(expr) + + return val + def map_insn_assign(self, insn, profile_data): result = [] for name, expr in zip(insn.names, insn.exprs): - if isinstance(expr, sym.ExternalCall): - assert expr.mapper_method == "map_external_call" - val = self.map_profiled_external_call(expr, profile_data) - else: - val = self.rec(expr) - result.append((name, val)) + result.append((name, self.process_assignment_expr(expr, profile_data))) return result, [] def map_insn_loopy_kernel(self, insn, profile_data): @@ -482,6 +561,42 @@ class MemOpCountingExecutionMapper(ExecutionMapper): return list(result_dict.items()), [] + def map_insn_assign_to_discr_scoped(self, insn, profile_data=None): + assignments = [] + + for name, expr in zip(insn.names, insn.exprs): + logger.debug("assignment not profiled: %s <- %s", name, expr) + value = self.rec(expr) + self.discrwb._discr_scoped_subexpr_name_to_value[name] = value + assignments.append((name, value)) + + return assignments, [] + + def map_insn_assign_from_discr_scoped(self, insn, profile_data=None): + return [(insn.name, + self.discrwb._discr_scoped_subexpr_name_to_value[insn.name])], [] + + def map_insn_rank_data_swap(self, insn, profile_data): + raise NotImplementedError("no profiling for instruction: %s" % insn) + + def map_insn_diff_batch_assign(self, insn, profile_data): + assignments, futures = super().map_insn_diff_batch_assign(insn) + + if profile_data is not None: + # We model the cost to load the input and write the output. In + # particular, we assume the elementwise matrices are negligible in + # size and thus ignorable. + + field = self.rec(insn.field) + profile_data["bytes_read"] = ( + profile_data.get("bytes_read", 0) + field.nbytes) + + for _, value in assignments: + profile_data["bytes_written"] = ( + profile_data.get("bytes_writte", 0) + value.nbytes) + + return assignments, futures + # }}} # }}} @@ -489,7 +604,8 @@ class MemOpCountingExecutionMapper(ExecutionMapper): # {{{ mem op counter check -def test_stepper_mem_ops(): +@pytest.mark.parametrize("use_fusion", (True, False)) +def test_stepper_mem_ops(use_fusion=True): cl_ctx = cl.create_some_context() queue = cl.CommandQueue(cl_ctx) @@ -508,7 +624,7 @@ def test_stepper_mem_ops(): discr, op.sym_operator(), exec_mapper_factory=MemOpCountingExecutionMapper) - if 1: + if not use_fusion: stepper = RK4TimeStepper( queue, discr, "w", bound_op, 1 + discr.dim, get_strong_wave_component, @@ -528,10 +644,10 @@ def test_stepper_mem_ops(): for (_, _, profile_data) in stepper.run( ic, t_start, dt, t_end, return_profile_data=True): step += 1 - logger.info("step %d/%d", step, nsteps) + logger.debug("step %d/%d", step, nsteps) - print("bytes read", profile_data["bytes_read"]) - print("bytes written", profile_data["bytes_written"]) + logger.info("bytes read: %d", profile_data["bytes_read"]) + logger.info("bytes written: %d", profile_data["bytes_written"]) # }}}