Skip to content
Snippets Groups Projects
dagrt-fusion.py 42.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • Matt Wala's avatar
    Matt Wala committed
    #!/usr/bin/env python3
    
    Matt Wala's avatar
    Matt Wala committed
    """Study of operator fusion (inlining) for time integration operators in Grudge.
    
    from __future__ import division, print_function
    
    
    Matt Wala's avatar
    Matt Wala committed
    __copyright__ = """
    Copyright (C) 2015 Andreas Kloeckner
    Copyright (C) 2019 Matt Wala
    """
    
    
    __license__ = """
    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:
    
    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.
    
    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    THE SOFTWARE.
    """
    
    
    
    import contextlib
    
    import logging
    import numpy as np
    
    Matt Wala's avatar
    Matt Wala committed
    import six
    
    import pyopencl as cl
    
    Matt Wala's avatar
    Matt Wala committed
    import pyopencl.array  # noqa
    
    Matt Wala's avatar
    Matt Wala committed
    import pytest
    
    
    import dagrt.language as lang
    import pymbolic.primitives as p
    import grudge.symbolic.mappers as gmap
    
    import grudge.symbolic.operators as sym_op
    
    Matt Wala's avatar
    Matt Wala committed
    from grudge.execution import ExecutionMapper
    
    from grudge.function_registry import base_function_registry
    
    from pymbolic.mapper import Mapper
    
    from pymbolic.mapper.evaluator import EvaluationMapper \
            as PymbolicEvaluationMapper
    
    from pytools import memoize
    
    from pytools.obj_array import join_fields
    
    
    from grudge import sym, bind, DGDiscretizationWithBoundaries
    
    from leap.rk import LSRK4MethodBuilder
    
    Matt Wala's avatar
    Matt Wala committed
    from pyopencl.tools import (  # noqa
            pytest_generate_tests_for_pyopencl as pytest_generate_tests)
    
    
    
    logging.basicConfig(level=logging.INFO)
    
    logger = logging.getLogger(__name__)
    
    
    SKIP_TESTS = int(os.environ.get("SKIP_TESTS", 0))
    PAPER_OUTPUT = int(os.environ.get("PAPER_OUTPUT", 0))
    OUT_DIR = os.environ.get("OUT_DIR", ".")
    
    
    
    @contextlib.contextmanager
    
    def open_output_file(filename):
        if not PAPER_OUTPUT:
    
    Matt Wala's avatar
    Matt Wala committed
            yield sys.stdout
    
    Matt Wala's avatar
    Matt Wala committed
            sys.stdout.flush()
    
            try:
                outfile = open(os.path.join(OUT_DIR, filename), "w")
                yield outfile
            finally:
                outfile.close()
    
    
    # {{{ topological sort
    
    def topological_sort(stmts, root_deps):
        id_to_stmt = {stmt.id: stmt for stmt in stmts}
    
        ordered_stmts = []
        satisfied = set()
    
        def satisfy_dep(name):
            if name in satisfied:
                return
    
            stmt = id_to_stmt[name]
    
    Matt Wala's avatar
    Matt Wala committed
            for dep in sorted(stmt.depends_on):
    
                satisfy_dep(dep)
            ordered_stmts.append(stmt)
            satisfied.add(name)
    
        for d in root_deps:
            satisfy_dep(d)
    
        return ordered_stmts
    
    # }}}
    
    
    
    Matt Wala's avatar
    Matt Wala committed
    # {{{ leap to grudge translation
    
    
    # Use evaluation, not identity mappers to propagate symbolic vectors to
    # outermost level.
    
    class DagrtToGrudgeRewriter(PymbolicEvaluationMapper):
        def __init__(self, context):
            self.context = context
    
        def map_variable(self, expr):
            return self.context[expr.name]
    
        def map_call(self, expr):
            raise ValueError("function call not expected")
    
    
    class GrudgeArgSubstitutor(gmap.SymbolicEvaluator):
        def __init__(self, args):
            super().__init__(context={})
            self.args = args
    
        def map_grudge_variable(self, expr):
            if expr.name in self.args:
                return self.args[expr.name]
    
    Matt Wala's avatar
    Matt Wala committed
            return super().map_variable(expr)
    
    
    
    def transcribe_phase(dag, field_var_name, field_components, phase_name,
                         sym_operator):
    
    Matt Wala's avatar
    Matt Wala committed
        """Generate a Grudge operator for a Dagrt time integrator phase.
    
    Matt Wala's avatar
    Matt Wala committed
        Arguments:
    
            dag: The Dagrt code object for the time integrator
    
    Matt Wala's avatar
    Matt Wala committed
    
            field_var_name: The name of the simulation variable
    
            field_components: The number of components (fields) in the variable
    
    
    Matt Wala's avatar
    Matt Wala committed
            phase_name: The name of the phase to transcribe
    
    Matt Wala's avatar
    Matt Wala committed
    
            sym_operator: The Grudge symbolic expression to substitue for the
                right-hand side evaluation in the Dagrt code
        """
    
        sym_operator = gmap.OperatorBinder()(sym_operator)
        phase = dag.phases[phase_name]
    
        ctx = {
                "<t>": sym.var("input_t", sym.DD_SCALAR),
                "<dt>": sym.var("input_dt", sym.DD_SCALAR),
                f"<state>{field_var_name}": sym.make_sym_array(
                    f"input_{field_var_name}", field_components),
    
                "<p>residual": sym.make_sym_array(
    
                    "input_residual", field_components),
        }
    
        rhs_name = f"<func>{field_var_name}"
        output_vars = [v for v in ctx]
        yielded_states = []
    
        from dagrt.codegen.transform import isolate_function_calls_in_phase
        ordered_stmts = topological_sort(
                isolate_function_calls_in_phase(
                    phase,
                    dag.get_stmt_id_generator(),
                    dag.get_var_name_generator()).statements,
                phase.depends_on)
    
        for stmt in ordered_stmts:
            if stmt.condition is not True:
                raise NotImplementedError(
                    "non-True condition (in statement '%s') not supported"
                    % stmt.id)
    
            if isinstance(stmt, lang.Nop):
                pass
    
    
    Matt Wala's avatar
    Matt Wala committed
            elif isinstance(stmt, lang.Assign):
    
                if not isinstance(stmt.lhs, p.Variable):
                    raise NotImplementedError("lhs of statement %s is not a variable: %s"
                            % (stmt.id, stmt.lhs))
                ctx[stmt.lhs.name] = sym.cse(
    
                        DagrtToGrudgeRewriter(ctx)(stmt.rhs),
                        (
                            stmt.lhs.name
                            .replace("<", "")
                            .replace(">", "")))
    
    
            elif isinstance(stmt, lang.AssignFunctionCall):
                if stmt.function_id != rhs_name:
                    raise NotImplementedError(
                            "statement '%s' calls unsupported function '%s'"
                            % (stmt.id, stmt.function_id))
    
                if stmt.parameters:
                    raise NotImplementedError(
                        "statement '%s' calls function '%s' with positional arguments"
                        % (stmt.id, stmt.function_id))
    
                kwargs = {name: sym.cse(DagrtToGrudgeRewriter(ctx)(arg))
                          for name, arg in stmt.kw_parameters.items()}
    
                if len(stmt.assignees) != 1:
                    raise NotImplementedError(
                        "statement '%s' calls function '%s' "
                        "with more than one LHS"
                        % (stmt.id, stmt.function_id))
    
                assignee, = stmt.assignees
                ctx[assignee] = GrudgeArgSubstitutor(kwargs)(sym_operator)
    
            elif isinstance(stmt, lang.YieldState):
                d2g = DagrtToGrudgeRewriter(ctx)
                yielded_states.append(
    
                        (
                            stmt.time_id,
                            d2g(stmt.time),
                            stmt.component_id,
                            d2g(stmt.expression)))
    
    
            else:
                raise NotImplementedError("statement %s is of unsupported type ''%s'"
                            % (stmt.id, type(stmt).__name__))
    
        return output_vars, [ctx[ov] for ov in output_vars], yielded_states
    
    
    Matt Wala's avatar
    Matt Wala committed
    # }}}
    
    Matt Wala's avatar
    Matt Wala committed
    # {{{ time integrator implementations
    
    
    class RK4TimeStepperBase(object):
    
    
    Matt Wala's avatar
    Matt Wala committed
        def __init__(self, queue, component_getter):
            self.queue = queue
            self.component_getter = component_getter
    
    
        def get_initial_context(self, fields, t_start, dt):
    
            # Flatten fields.
            flattened_fields = []
            for field in fields:
                if isinstance(field, list):
                    flattened_fields.extend(field)
                else:
                    flattened_fields.append(field)
            flattened_fields = join_fields(*flattened_fields)
            del fields
    
            return {
                    "input_t": t_start,
                    "input_dt": dt,
                    self.state_name: flattened_fields,
                    "input_residual": flattened_fields,
            }
    
    
    Matt Wala's avatar
    Matt Wala committed
        def set_up_stepper(self, discr, field_var_name, sym_rhs, num_fields,
    
                           function_registry=base_function_registry,
    
    Matt Wala's avatar
    Matt Wala committed
                           exec_mapper_factory=ExecutionMapper):
    
            dt_method = LSRK4MethodBuilder(component_id=field_var_name)
    
            dt_code = dt_method.generate()
            self.field_var_name = field_var_name
            self.state_name = f"input_{field_var_name}"
    
            # Transcribe the phase.
            output_vars, results, yielded_states = transcribe_phase(
                    dt_code, field_var_name, num_fields,
                    "primary", sym_rhs)
    
            # Build the bound operator for the time integrator.
            output_t = results[0]
            output_dt = results[1]
            output_states = results[2]
            output_residuals = results[3]
    
            assert len(output_states) == num_fields
            assert len(output_states) == len(output_residuals)
    
            flattened_results = join_fields(output_t, output_dt, *output_states)
    
    
    Matt Wala's avatar
    Matt Wala committed
            self.bound_op = bind(
    
                    discr, flattened_results,
                    function_registry=function_registry,
                    exec_mapper_factory=exec_mapper_factory)
    
    Matt Wala's avatar
    Matt Wala committed
    
        def run(self, fields, t_start, dt, t_end, return_profile_data=False):
            context = self.get_initial_context(fields, t_start, dt)
    
            t = t_start
    
            while t <= t_end:
                if return_profile_data:
                    profile_data = dict()
                else:
                    profile_data = None
    
                results = self.bound_op(
                        self.queue,
                        profile_data=profile_data,
                        **context)
    
                if return_profile_data:
                    results = results[0]
    
                t = results[0]
                context["input_t"] = t
                context["input_dt"] = results[1]
                output_states = results[2:]
                context[self.state_name] = output_states
    
                result = (t, self.component_getter(output_states))
                if return_profile_data:
                    result += (profile_data,)
    
                yield result
    
    
    
    class RK4TimeStepper(RK4TimeStepperBase):
    
        def __init__(self, queue, discr, field_var_name, grudge_bound_op,
    
    Matt Wala's avatar
    Matt Wala committed
                     num_fields, component_getter, exec_mapper_factory=ExecutionMapper):
    
    Matt Wala's avatar
    Matt Wala committed
            """Arguments:
    
                field_var_name: The name of the simulation variable
    
                grudge_bound_op: The BoundExpression for the right-hand side
    
                num_fields: The number of components in the simulation variable
    
                component_getter: A function, which, given an object array
                   representing the simulation variable, splits the array into
                   its components
    
            """
    
    Matt Wala's avatar
    Matt Wala committed
            super().__init__(queue, component_getter)
    
    
            # Construct sym_rhs to have the effect of replacing the RHS calls in the
            # dagrt code with calls of the grudge operator.
    
            from grudge.symbolic.primitives import FunctionSymbol, Variable
    
            call = sym.cse(
                    FunctionSymbol("grudge_op")(*(
    
                        (Variable("t", dd=sym.DD_SCALAR),)
                        + tuple(
                            Variable(field_var_name, dd=sym.DD_VOLUME)[i]
    
                            for i in range(num_fields)))))
    
            sym_rhs = join_fields(*(call[i] for i in range(num_fields)))
    
            self.queue = queue
            self.grudge_bound_op = grudge_bound_op
    
            from grudge.function_registry import register_external_function
    
    
            freg = register_external_function(
                    base_function_registry,
                    "grudge_op",
                    implementation=self._bound_op,
                    dd=sym.DD_VOLUME)
    
    
    Matt Wala's avatar
    Matt Wala committed
            self.set_up_stepper(
    
                    discr, field_var_name, sym_rhs, num_fields,
                    freg,
                    exec_mapper_factory)
    
    
            self.component_getter = component_getter
    
    
        def _bound_op(self, queue, t, *args, profile_data=None):
    
            context = {
                    "t": t,
                    self.field_var_name: join_fields(*args)}
    
    Matt Wala's avatar
    Matt Wala committed
            result = self.grudge_bound_op(
    
                    queue, profile_data=profile_data, **context)
    
    Matt Wala's avatar
    Matt Wala committed
            if profile_data is not None:
                result = result[0]
            return result
    
    
        def get_initial_context(self, fields, t_start, dt):
            context = super().get_initial_context(fields, t_start, dt)
            context["grudge_op"] = self._bound_op
            return context
    
    
    class FusedRK4TimeStepper(RK4TimeStepperBase):
    
        def __init__(self, queue, discr, field_var_name, sym_rhs, num_fields,
    
    Matt Wala's avatar
    Matt Wala committed
                     component_getter, exec_mapper_factory=ExecutionMapper):
    
    Matt Wala's avatar
    Matt Wala committed
            super().__init__(queue, component_getter)
    
    Matt Wala's avatar
    Matt Wala committed
            self.set_up_stepper(
    
                    discr, field_var_name, sym_rhs, num_fields,
                    base_function_registry,
                    exec_mapper_factory)
    
    Matt Wala's avatar
    Matt Wala committed
    # }}}
    
    Matt Wala's avatar
    Matt Wala committed
    # {{{ problem setup code
    
    
    Matt Wala's avatar
    Matt Wala committed
    def get_strong_wave_op_with_discr(cl_ctx, dims=2, order=4):
    
    Matt Wala's avatar
    Matt Wala committed
        from meshmode.mesh.generation import generate_regular_rect_mesh
        mesh = generate_regular_rect_mesh(
                a=(-0.5,)*dims,
                b=(0.5,)*dims,
    
    Matt Wala's avatar
    Matt Wala committed
                n=(16,)*dims)
    
        logger.debug("%d elements", mesh.nelements)
    
    Matt Wala's avatar
    Matt Wala committed
    
        discr = DGDiscretizationWithBoundaries(cl_ctx, mesh, order=order)
    
        source_center = np.array([0.1, 0.22, 0.33])[:dims]
        source_width = 0.05
        source_omega = 3
    
        sym_x = sym.nodes(mesh.dim)
        sym_source_center_dist = sym_x - source_center
        sym_t = sym.ScalarVariable("t")
    
        from grudge.models.wave import StrongWaveOperator
        from meshmode.mesh import BTAG_ALL, BTAG_NONE
        op = StrongWaveOperator(-0.1, dims,
                source_f=(
                    sym.sin(source_omega*sym_t)
                    * sym.exp(
                        -np.dot(sym_source_center_dist, sym_source_center_dist)
                        / source_width**2)),
                dirichlet_tag=BTAG_NONE,
                neumann_tag=BTAG_NONE,
                radiation_tag=BTAG_ALL,
                flux_type="upwind")
    
        op.check_bc_coverage(mesh)
    
    
        return (op.sym_operator(), discr)
    
    
    def dg_flux(c, tpair):
        u = tpair[0]
        v = tpair[1:]
    
        dims = len(v)
    
        normal = sym.normal(tpair.dd, dims)
    
        flux_weak = join_fields(
    
        flux_weak -= (1 if c > 0 else -1)*join_fields(
    
                0.5*(u.int-u.ext),
                0.5*(normal * np.dot(normal, v.int-v.ext)))
    
    
        flux_strong = join_fields(
    
                np.dot(v.int, normal),
                u.int * normal) - flux_weak
    
        return sym.interp(tpair.dd, "all_faces")(c*flux_strong)
    
    
    def get_strong_wave_op_with_discr_direct(cl_ctx, dims=2, order=4):
        from meshmode.mesh.generation import generate_regular_rect_mesh
        mesh = generate_regular_rect_mesh(
                a=(-0.5,)*dims,
                b=(0.5,)*dims,
                n=(16,)*dims)
    
        logger.debug("%d elements", mesh.nelements)
    
        discr = DGDiscretizationWithBoundaries(cl_ctx, mesh, order=order)
    
        source_center = np.array([0.1, 0.22, 0.33])[:dims]
        source_width = 0.05
        source_omega = 3
    
        sym_x = sym.nodes(mesh.dim)
        sym_source_center_dist = sym_x - source_center
        sym_t = sym.ScalarVariable("t")
    
        from meshmode.mesh import BTAG_ALL
    
        c = -0.1
        sign = -1
    
        w = sym.make_sym_array("w", dims+1)
        u = w[0]
        v = w[1:]
    
        source_f = (
                    sym.sin(source_omega*sym_t)
                    * sym.exp(
                        -np.dot(sym_source_center_dist, sym_source_center_dist)
                        / source_width**2))
    
        rad_normal = sym.normal(BTAG_ALL, dims)
    
        rad_u = sym.cse(sym.interp("vol", BTAG_ALL)(u))
        rad_v = sym.cse(sym.interp("vol", BTAG_ALL)(v))
    
    
        rad_bc = sym.cse(join_fields(
    
                0.5*(rad_u - sign*np.dot(rad_normal, rad_v)),
                0.5*rad_normal*(np.dot(rad_normal, rad_v) - sign*rad_u)
                ), "rad_bc")
    
        sym_operator = (
    
                    -c*np.dot(sym.nabla(dims), v) - source_f,
                    -c*(sym.nabla(dims)*u)
                    )
                + sym.InverseMassOperator()(
                    sym.FaceMassOperator()(
                        dg_flux(c, sym.int_tpair(w))
                        + dg_flux(c, sym.bv_tpair(BTAG_ALL, w, rad_bc))
                        )))
    
        return (sym_operator, discr)
    
    
    
    def get_strong_wave_component(state_component):
        return (state_component[0], state_component[1:])
    
    
    Matt Wala's avatar
    Matt Wala committed
    # }}}
    
    
    Matt Wala's avatar
    Matt Wala committed
    # {{{ equivalence check between fused and non-fused versions
    
    Matt Wala's avatar
    Matt Wala committed
    def test_stepper_equivalence(ctx_factory, order=4):
        cl_ctx = ctx_factory()
    
        queue = cl.CommandQueue(cl_ctx)
    
        dims = 2
    
    
        sym_operator, _ = get_strong_wave_op_with_discr(
                cl_ctx, dims=dims, order=order)
        sym_operator_direct, discr = get_strong_wave_op_with_discr_direct(
                cl_ctx, dims=dims, order=order)
    
    
        if dims == 2:
            dt = 0.04
        elif dims == 3:
            dt = 0.02
    
        ic = join_fields(discr.zeros(queue),
                [discr.zeros(queue) for i in range(discr.dim)])
    
    
    
        stepper = RK4TimeStepper(
                queue, discr, "w", bound_op, 1 + discr.dim, get_strong_wave_component)
    
        fused_stepper = FusedRK4TimeStepper(
    
                queue, discr, "w", sym_operator_direct, 1 + discr.dim,
    
                get_strong_wave_component)
    
        t_start = 0
        t_end = 0.5
    
    Matt Wala's avatar
    Matt Wala committed
        nsteps = int(np.ceil((t_end + 1e-9) / dt))
    
        print("dt=%g nsteps=%d" % (dt, nsteps))
    
        step = 0
    
        norm = bind(discr, sym.norm(2, sym.var("u_ref") - sym.var("u")))
    
        fused_steps = fused_stepper.run(ic, t_start, dt, t_end)
    
        for t_ref, (u_ref, v_ref) in stepper.run(ic, t_start, dt, t_end):
            step += 1
    
    Matt Wala's avatar
    Matt Wala committed
            logger.debug("step %d/%d", step, nsteps)
    
            t, (u, v) = next(fused_steps)
            assert t == t_ref, step
            assert norm(queue, u=u, u_ref=u_ref) <= 1e-13, step
    
    # }}}
    
    
    
    # {{{ execution mapper wrapper
    
    class ExecutionMapperWrapper(Mapper):
    
        def __init__(self, queue, context, bound_op):
            self.inner_mapper = ExecutionMapper(queue, context, bound_op)
            self.queue = queue
            self.context = context
            self.bound_op = bound_op
    
        def map_variable(self, expr):
            # Needed, because bound op execution can ask for variable values.
            return self.inner_mapper.map_variable(expr)
    
        def map_grudge_variable(self, expr):
            # See map_variable()
            return self.inner_mapper.map_grudge_variable(expr)
    
    # }}}
    
    
    
    Matt Wala's avatar
    Matt Wala committed
    # {{{ mem op counter implementation
    
    
    class ExecutionMapperWithMemOpCounting(ExecutionMapperWrapper):
    
    Matt Wala's avatar
    Matt Wala committed
        # This is a skeleton implementation that only has just enough functionality
        # for the wave-min example to work.
    
    Matt Wala's avatar
    Matt Wala committed
        # {{{ expressions
    
        def map_profiled_call(self, expr, profile_data):
    
            args = [self.inner_mapper.rec(p) for p in expr.parameters]
            return self.inner_mapper.function_registry[expr.function.name](
    
                    self.queue, *args, profile_data=profile_data)
    
    Matt Wala's avatar
    Matt Wala committed
        def map_profiled_essentially_elementwise_linear(self, op, field_expr,
                                                        profile_data):
    
            result = getattr(self.inner_mapper, op.mapper_method)(op, field_expr)
    
    Matt Wala's avatar
    Matt Wala committed
    
            if profile_data is not None:
                # We model the cost to load the input and write the output.  In
                # particular, we assume the elementwise matrices are negligible in
                # size and thus ignorable.
    
    
                field = self.inner_mapper.rec(field_expr)
    
    Matt Wala's avatar
    Matt Wala committed
                profile_data["bytes_read"] = (
                        profile_data.get("bytes_read", 0) + field.nbytes)
                profile_data["bytes_written"] = (
                        profile_data.get("bytes_written", 0) + result.nbytes)
    
    
    Matt Wala's avatar
    Matt Wala committed
                if op.mapper_method == "map_interpolation":
                    profile_data["interp_bytes_read"] = (
                            profile_data.get("interp_bytes_read", 0) + field.nbytes)
                    profile_data["interp_bytes_written"] = (
                            profile_data.get("interp_bytes_written", 0) + result.nbytes)
    
    
    Matt Wala's avatar
    Matt Wala committed
            return result
    
    
    Matt Wala's avatar
    Matt Wala committed
        # }}}
    
    Matt Wala's avatar
    Matt Wala committed
        # {{{ instruction mappings
    
    Matt Wala's avatar
    Matt Wala committed
    
        def process_assignment_expr(self, expr, profile_data):
    
            if isinstance(expr, p.Call):
                assert expr.mapper_method == "map_call"
                val = self.map_profiled_call(expr, profile_data)
    
    Matt Wala's avatar
    Matt Wala committed
    
            elif isinstance(expr, sym.OperatorBinding):
                if isinstance(
                        expr.op,
                        (
                            # TODO: Not comprehensive.
    
                            sym_op.InterpolationOperator,
                            sym_op.RefFaceMassOperator,
                            sym_op.RefInverseMassOperator,
                            sym_op.OppositeInteriorFaceSwap)):
    
    Matt Wala's avatar
    Matt Wala committed
                    val = self.map_profiled_essentially_elementwise_linear(
                            expr.op, expr.field, profile_data)
    
                else:
                    assert False, ("unknown operator: %s" % expr.op)
    
            else:
                logger.debug("assignment not profiled: %s", expr)
    
                val = self.inner_mapper.rec(expr)
    
    Matt Wala's avatar
    Matt Wala committed
        def map_insn_assign(self, insn, profile_data):
            result = []
            for name, expr in zip(insn.names, insn.exprs):
    
    Matt Wala's avatar
    Matt Wala committed
                result.append((name, self.process_assignment_expr(expr, profile_data)))
    
    Matt Wala's avatar
    Matt Wala committed
            return result, []
    
        def map_insn_loopy_kernel(self, insn, profile_data):
            kwargs = {}
            kdescr = insn.kernel_descriptor
            for name, expr in six.iteritems(kdescr.input_mappings):
    
                val = self.inner_mapper.rec(expr)
    
    Matt Wala's avatar
    Matt Wala committed
                kwargs[name] = val
                assert not isinstance(val, np.ndarray)
                if profile_data is not None and isinstance(val, pyopencl.array.Array):
                    profile_data["bytes_read"] = (
                            profile_data.get("bytes_read", 0) + val.nbytes)
    
                    profile_data["bytes_read_by_scalar_assignments"] = (
                            profile_data.get("bytes_read_by_scalar_assignments", 0)
    
    Matt Wala's avatar
    Matt Wala committed
                            + val.nbytes)
    
            discr = self.inner_mapper.discrwb.discr_from_dd(kdescr.governing_dd)
    
    Matt Wala's avatar
    Matt Wala committed
            for name in kdescr.scalar_args():
                v = kwargs[name]
                if isinstance(v, (int, float)):
                    kwargs[name] = discr.real_dtype.type(v)
                elif isinstance(v, complex):
                    kwargs[name] = discr.complex_dtype.type(v)
                elif isinstance(v, np.number):
                    pass
                else:
                    raise ValueError("unrecognized scalar type for variable '%s': %s"
                            % (name, type(v)))
    
            kwargs["grdg_n"] = discr.nnodes
            evt, result_dict = kdescr.loopy_kernel(self.queue, **kwargs)
    
            for val in result_dict.values():
                assert not isinstance(val, np.ndarray)
                if profile_data is not None and isinstance(val, pyopencl.array.Array):
                    profile_data["bytes_written"] = (
                            profile_data.get("bytes_written", 0) + val.nbytes)
    
                    profile_data["bytes_written_by_scalar_assignments"] = (
                            profile_data.get("bytes_written_by_scalar_assignments", 0)
    
    Matt Wala's avatar
    Matt Wala committed
                            + val.nbytes)
    
    Matt Wala's avatar
    Matt Wala committed
    
            return list(result_dict.items()), []
    
    
    Matt Wala's avatar
    Matt Wala committed
        def map_insn_assign_to_discr_scoped(self, insn, profile_data=None):
            assignments = []
    
            for name, expr in zip(insn.names, insn.exprs):
                logger.debug("assignment not profiled: %s <- %s", name, expr)
    
                inner_mapper = self.inner_mapper
                value = inner_mapper.rec(expr)
                inner_mapper.discrwb._discr_scoped_subexpr_name_to_value[name] = value
    
    Matt Wala's avatar
    Matt Wala committed
                assignments.append((name, value))
    
            return assignments, []
    
        def map_insn_assign_from_discr_scoped(self, insn, profile_data=None):
    
            return [(
                insn.name,
                self.inner_mapper.
                    discrwb._discr_scoped_subexpr_name_to_value[insn.name])], []
    
    Matt Wala's avatar
    Matt Wala committed
    
        def map_insn_rank_data_swap(self, insn, profile_data):
            raise NotImplementedError("no profiling for instruction: %s" % insn)
    
        def map_insn_diff_batch_assign(self, insn, profile_data):
    
            assignments, futures = self.inner_mapper.map_insn_diff_batch_assign(insn)
    
    Matt Wala's avatar
    Matt Wala committed
    
            if profile_data is not None:
                # We model the cost to load the input and write the output.  In
                # particular, we assume the elementwise matrices are negligible in
                # size and thus ignorable.
    
    
                field = self.inner_mapper.rec(insn.field)
    
    Matt Wala's avatar
    Matt Wala committed
                profile_data["bytes_read"] = (
                        profile_data.get("bytes_read", 0) + field.nbytes)
    
                for _, value in assignments:
                    profile_data["bytes_written"] = (
    
    Matt Wala's avatar
    Matt Wala committed
                            profile_data.get("bytes_written", 0) + value.nbytes)
    
    Matt Wala's avatar
    Matt Wala committed
    
            return assignments, futures
    
    
    Matt Wala's avatar
    Matt Wala committed
        # }}}
    
    # }}}
    
    
    # {{{ mem op counter check
    
    
    def test_assignment_memory_model(ctx_factory):
        cl_ctx = ctx_factory()
        queue = cl.CommandQueue(cl_ctx)
    
    
        _, discr = get_strong_wave_op_with_discr_direct(cl_ctx, dims=2, order=3)
    
    
        # Assignment instruction
        bound_op = bind(
                discr,
                sym.Variable("input0", sym.DD_VOLUME)
                + sym.Variable("input1", sym.DD_VOLUME),
                exec_mapper_factory=ExecutionMapperWithMemOpCounting)
    
        input0 = discr.zeros(queue)
        input1 = discr.zeros(queue)
    
        result, profile_data = bound_op(
                queue,
                profile_data={},
                input0=input0,
                input1=input1)
    
        assert profile_data["bytes_read"] == input0.nbytes + input1.nbytes
        assert profile_data["bytes_written"] == result.nbytes
    
    
    
    Matt Wala's avatar
    Matt Wala committed
    @pytest.mark.parametrize("use_fusion", (True, False))
    
    Matt Wala's avatar
    Matt Wala committed
    def test_stepper_mem_ops(ctx_factory, use_fusion):
        cl_ctx = ctx_factory()
    
    Matt Wala's avatar
    Matt Wala committed
        queue = cl.CommandQueue(cl_ctx)
    
        dims = 2
    
    Matt Wala's avatar
    Matt Wala committed
    
    
        sym_operator, discr = get_strong_wave_op_with_discr_direct(
                cl_ctx, dims=dims, order=3)
    
    Matt Wala's avatar
    Matt Wala committed
    
        t_start = 0
        dt = 0.04
        t_end = 0.2
    
        ic = join_fields(discr.zeros(queue),
                [discr.zeros(queue) for i in range(discr.dim)])
    
    
    Matt Wala's avatar
    Matt Wala committed
        if not use_fusion:
    
    Matt Wala's avatar
    Matt Wala committed
            bound_op = bind(
    
    Matt Wala's avatar
    Matt Wala committed
                    exec_mapper_factory=ExecutionMapperWithMemOpCounting)
    
    Matt Wala's avatar
    Matt Wala committed
    
    
    Matt Wala's avatar
    Matt Wala committed
            stepper = RK4TimeStepper(
                    queue, discr, "w", bound_op, 1 + discr.dim,
                    get_strong_wave_component,
    
    Matt Wala's avatar
    Matt Wala committed
                    exec_mapper_factory=ExecutionMapperWithMemOpCounting)
    
    Matt Wala's avatar
    Matt Wala committed
    
        else:
            stepper = FusedRK4TimeStepper(
    
                    queue, discr, "w", sym_operator, 1 + discr.dim,
    
    Matt Wala's avatar
    Matt Wala committed
                    get_strong_wave_component,
    
    Matt Wala's avatar
    Matt Wala committed
                    exec_mapper_factory=ExecutionMapperWithMemOpCounting)
    
    Matt Wala's avatar
    Matt Wala committed
    
        step = 0
    
        nsteps = int(np.ceil((t_end + 1e-9) / dt))
        for (_, _, profile_data) in stepper.run(
                ic, t_start, dt, t_end, return_profile_data=True):
            step += 1
    
    Matt Wala's avatar
    Matt Wala committed
            logger.info("step %d/%d", step, nsteps)
    
    Matt Wala's avatar
    Matt Wala committed
        logger.info("using fusion? %s", use_fusion)
    
    Matt Wala's avatar
    Matt Wala committed
        logger.info("bytes read: %d", profile_data["bytes_read"])
        logger.info("bytes written: %d", profile_data["bytes_written"])
    
    Matt Wala's avatar
    Matt Wala committed
        logger.info("bytes total: %d",
                profile_data["bytes_read"] + profile_data["bytes_written"])
    
    # }}}
    
    
    
    Matt Wala's avatar
    Matt Wala committed
    # {{{ execution mapper with timing
    
    
    Matt Wala's avatar
    Matt Wala committed
    SECONDS_PER_NANOSECOND = 10**9
    
    
    class TimingFuture(object):
    
        def __init__(self, start_event, stop_event):
            self.start_event = start_event
            self.stop_event = stop_event
    
        def elapsed(self):
            cl.wait_for_events([self.start_event, self.stop_event])
            return (
                    self.stop_event.profile.end
                    - self.start_event.profile.end) / SECONDS_PER_NANOSECOND
    
    
    from collections.abc import MutableSequence
    
    
    class TimingFutureList(MutableSequence):
    
        def __init__(self, *args, **kwargs):
            self._list = list(*args, **kwargs)
    
        def __len__(self):
            return len(self._list)
    
        def __getitem__(self, idx):
            return self._list[idx]
    
        def __setitem__(self, idx, val):
            self._list[idx] = val
    
        def __delitem__(self, idx):
            del self._list[idx]
    
        def insert(self, idx, val):
            self._list.insert(idx, val)
    
        def elapsed(self):
            return sum(future.elapsed() for future in self._list)
    
    
    
    Matt Wala's avatar
    Matt Wala committed
    def time_insn(f):
        time_field_name = "time_%s" % f.__name__
    
        def wrapper(self, insn, profile_data):
    
    Matt Wala's avatar
    Matt Wala committed
            if profile_data is None:
                return f(self, insn, profile_data)
    
    Matt Wala's avatar
    Matt Wala committed
    
    
    Matt Wala's avatar
    Matt Wala committed
            start = cl.enqueue_marker(self.queue)
            retval = f(self, insn, profile_data)
            end = cl.enqueue_marker(self.queue)
            profile_data\
                    .setdefault(time_field_name, TimingFutureList())\
                    .append(TimingFuture(start, end))
    
    Matt Wala's avatar
    Matt Wala committed
    
            return retval
    
        return wrapper
    
    
    
    class ExecutionMapperWithTiming(ExecutionMapperWrapper):
    
    Matt Wala's avatar
    Matt Wala committed
    
    
        def map_profiled_call(self, expr, profile_data):
    
            args = [self.inner_mapper.rec(p) for p in expr.parameters]
            return self.inner_mapper.function_registry[expr.function.name](
    
                    self.queue, *args, profile_data=profile_data)
    
    Matt Wala's avatar
    Matt Wala committed
    
        def map_profiled_operator_binding(self, expr, profile_data):
    
    Matt Wala's avatar
    Matt Wala committed
            if profile_data is None:
    
                return self.inner_mapper.map_operator_binding(expr)
    
    Matt Wala's avatar
    Matt Wala committed
    
            start = cl.enqueue_marker(self.queue)
    
            retval = self.inner_mapper.map_operator_binding(expr)
    
    Matt Wala's avatar
    Matt Wala committed
            end = cl.enqueue_marker(self.queue)
            time_field_name = "time_op_%s" % expr.op.mapper_method
            profile_data\
                    .setdefault(time_field_name, TimingFutureList())\
                    .append(TimingFuture(start, end))
    
    
    Matt Wala's avatar
    Matt Wala committed
            return retval
    
    
        def map_insn_assign_to_discr_scoped(self, insn, profile_data):
            return self.inner_mapper.map_insn_assign_to_discr_scoped(insn, profile_data)
    
        def map_insn_assign_from_discr_scoped(self, insn, profile_data):
            return self.\
                inner_mapper.map_insn_assign_from_discr_scoped(insn, profile_data)
    
    
    Matt Wala's avatar
    Matt Wala committed
        @time_insn
        def map_insn_loopy_kernel(self, *args, **kwargs):
    
            return self.inner_mapper.map_insn_loopy_kernel(*args, **kwargs)
    
    Matt Wala's avatar
    Matt Wala committed
    
        def map_insn_assign(self, insn, profile_data):
            if len(insn.exprs) == 1:
    
                if isinstance(insn.exprs[0], p.Call):
                    assert insn.exprs[0].mapper_method == "map_call"
                    val = self.map_profiled_call(insn.exprs[0], profile_data)
    
    Matt Wala's avatar
    Matt Wala committed
                    return [(insn.names[0], val)], []
                elif isinstance(insn.exprs[0], sym.OperatorBinding):
                    assert insn.exprs[0].mapper_method == "map_operator_binding"
                    val = self.map_profiled_operator_binding(insn.exprs[0], profile_data)
                    return [(insn.names[0], val)], []
    
    
            return self.inner_mapper.map_insn_assign(insn, profile_data)
    
    Matt Wala's avatar
    Matt Wala committed
    
        @time_insn
        def map_insn_diff_batch_assign(self, insn, profile_data):
    
            return self.inner_mapper.map_insn_diff_batch_assign(insn, profile_data)
    
    Matt Wala's avatar
    Matt Wala committed
    
    # }}}
    
    
    # {{{ timing check
    
    @pytest.mark.parametrize("use_fusion", (True, False))
    def test_stepper_timing(ctx_factory, use_fusion):
        cl_ctx = ctx_factory()
    
    Matt Wala's avatar
    Matt Wala committed
        queue = cl.CommandQueue(
                cl_ctx,
                properties=cl.command_queue_properties.PROFILING_ENABLE)
    
    Matt Wala's avatar
    Matt Wala committed
    
    
    Matt Wala's avatar
    Matt Wala committed
        dims = 3
    
    Matt Wala's avatar
    Matt Wala committed
    
    
        sym_operator, discr = get_strong_wave_op_with_discr_direct(
                cl_ctx, dims=dims, order=3)
    
    Matt Wala's avatar
    Matt Wala committed
    
        t_start = 0
        dt = 0.04
        t_end = 0.1
    
        ic = join_fields(discr.zeros(queue),
                [discr.zeros(queue) for i in range(discr.dim)])
    
        if not use_fusion:
            bound_op = bind(
    
    Matt Wala's avatar
    Matt Wala committed
                    exec_mapper_factory=ExecutionMapperWithTiming)
    
            stepper = RK4TimeStepper(
                    queue, discr, "w", bound_op, 1 + discr.dim,
                    get_strong_wave_component,
                    exec_mapper_factory=ExecutionMapperWithTiming)
    
        else:
            stepper = FusedRK4TimeStepper(
    
                    queue, discr, "w", sym_operator, 1 + discr.dim,
    
    Matt Wala's avatar
    Matt Wala committed
                    get_strong_wave_component,
                    exec_mapper_factory=ExecutionMapperWithTiming)
    
        step = 0
    
        import time
        t = time.time()
        nsteps = int(np.ceil((t_end + 1e-9) / dt))
        for (_, _, profile_data) in stepper.run(
                ic, t_start, dt, t_end, return_profile_data=True):
            step += 1
            tn = time.time()
            logger.info("step %d/%d: %f", step, nsteps, tn - t)
            t = tn