From 220a5938bbac757390944d581c7429db7e9ddb24 Mon Sep 17 00:00:00 2001 From: Andreas Kloeckner Date: Tue, 28 Jan 2014 18:17:10 -0600 Subject: [PATCH] Complete more disciplined rewrite of barrier insertion --- loopy/schedule.py | 439 ++++++++++++++++++++++++++++++++------------- test/test_loopy.py | 15 +- test/test_nbody.py | 34 ++-- 3 files changed, 343 insertions(+), 145 deletions(-) diff --git a/loopy/schedule.py b/loopy/schedule.py index 70cbe41b8..697322f11 100644 --- a/loopy/schedule.py +++ b/loopy/schedule.py @@ -26,6 +26,7 @@ THE SOFTWARE. from pytools import Record import sys import islpy as isl +from loopy.diagnostic import LoopyError # noqa import logging logger = logging.getLogger(__name__) @@ -50,7 +51,16 @@ class RunInstruction(ScheduleItem): class Barrier(ScheduleItem): - __slots__ = ["comment"] + """ + .. attribute:: comment + + A plain-text comment explaining why the barrier was inserted. + + .. attribute:: kind + + ``"local"`` or ``"global"`` + """ + __slots__ = ["comment", "kind"] # }}} @@ -100,57 +110,6 @@ def generate_sub_sched_items(schedule, start_idx): assert False -def get_barrier_needing_dependency(kernel, target, source, unordered=False): - from loopy.kernel.data import InstructionBase - if not isinstance(source, InstructionBase): - source = kernel.id_to_insn[source] - if not isinstance(target, InstructionBase): - target = kernel.id_to_insn[target] - - local_vars = kernel.local_var_names() - - tgt_write = set(target.assignee_var_names()) & local_vars - tgt_read = target.read_dependency_names() & local_vars - - src_write = set(source.assignee_var_names()) & local_vars - src_read = source.read_dependency_names() & local_vars - - waw = tgt_write & src_write - raw = tgt_read & src_write - war = tgt_write & src_read - - for var_name in raw | war: - if not unordered: - assert source.id in target.insn_deps - return (target, source, var_name) - - if source is target: - return None - - for var_name in waw: - assert (source.id in target.insn_deps - or source is target) - return (target, source, var_name) - - return None - - -def get_barrier_dependent_in_schedule(kernel, source, schedule, - unordered): - """ - :arg source: an instruction id for the source of the dependency - """ - - for sched_item in schedule: - if isinstance(sched_item, RunInstruction): - temp_res = get_barrier_needing_dependency( - kernel, sched_item.insn_id, source, unordered=unordered) - if temp_res: - return temp_res - elif isinstance(sched_item, Barrier): - return - - def find_active_inames_at(kernel, sched_index): active_inames = [] @@ -698,114 +657,341 @@ def generate_loop_schedules_internal(sched_state, loop_priority, schedule=[], # {{{ barrier insertion -def insert_barriers(kernel, schedule, level=0): +class DependencyRecord(Record): + """ + .. attribute:: source + + A :class:`loopy.InstructionBase` instance. + + .. attribute:: target + + A :class:`loopy.InstructionBase` instance. + + .. attribute:: variable + + A string, the name of the variable that caused the dependency to arise. + + .. attribute:: var_kind + + "global" or "local" + + .. attribute:: is_forward + + A :class:`bool` indicating whether this is a forward or reverse + dependency. + + In a 'forward' dependency, the target depends on the source. + In a 'reverse' dependency, the source depends on the target. + """ + + def __init__(self, source, target, variable, var_kind, is_forward): + Record.__init__(self, + source=source, + target=target, + variable=variable, + var_kind=var_kind, + is_forward=is_forward) + + +def get_barrier_needing_dependency(kernel, target, source, reverse, var_kind): + """If there exists a depdency between target and source and the two access + a common variable of *var_kind* in a way that requires a barrier (essentially, + at least one write), then the function will return a tuple + ``(target, source, var_name)``. Otherwise, it will return *None*. + + This function finds direct or indirect instruction dependencies, but does + not attempt to guess dependencies that exist based on common access to + variables. + + :arg reverse: a :class:`bool` indicating whether + forward or reverse dependencies are sought. (see above) + :arg var_kind: "global" or "local", the kind of variable based on which + barrier-needing dependencies should be found. + """ + + # If target or source are insn IDs, look up the actual instructions. + from loopy.kernel.data import InstructionBase + if not isinstance(source, InstructionBase): + source = kernel.id_to_insn[source] + if not isinstance(target, InstructionBase): + target = kernel.id_to_insn[target] + + if reverse: + source, target = target, source + + # Check that a dependency exists. + target_deps = kernel.recursive_insn_dep_map()[target.id] + if source.id not in target_deps: + return None + + if var_kind == "local": + relevant_vars = kernel.local_var_names() + elif var_kind == "global": + relevant_vars = kernel.global_var_names() + else: + raise ValueError("unknown 'var_kind': %s" % var_kind) + + tgt_write = set(target.assignee_var_names()) & relevant_vars + tgt_read = target.read_dependency_names() & relevant_vars + + src_write = set(source.assignee_var_names()) & relevant_vars + src_read = source.read_dependency_names() & relevant_vars + + waw = tgt_write & src_write + raw = tgt_read & src_write + war = tgt_write & src_read + + for var_name in raw | war: + return DependencyRecord( + source=source, + target=target, + variable=var_name, + var_kind=var_kind, + is_forward=not reverse) + + if source is target: + return None + + for var_name in waw: + return DependencyRecord( + source=source, + target=target, + variable=var_name, + var_kind=var_kind, + is_forward=not reverse) + + return None + + +def barrier_kind_more_or_equally_global(kind1, kind2): + return (kind1 == kind2) or (kind1 == "global" and kind2 == "local") + + +def get_tail_starting_at_last_barrier(schedule, kind): result = [] - owed_barriers = set() - loop_had_barrier = [False] + for sched_item in reversed(schedule): + if isinstance(sched_item, Barrier): + if barrier_kind_more_or_equally_global(sched_item.kind, kind): + break - # A 'pre-barrier' is a special case that is only necessary once per loop - # iteration to protect the tops of local-mem variable assignments from - # being entered before all reads in the previous loop iteration are - # complete. Once the loop has had a barrier, this is not a concern any - # more, and any further write-after-read hazards will be covered by - # dependencies for which the 'normal' mechanism below will generate - # barriers. + elif isinstance(sched_item, RunInstruction): + result.append(sched_item.insn_id) - def issue_barrier(is_pre_barrier, dep): - if result and isinstance(result[-1], Barrier): - return + elif isinstance(sched_item, (EnterLoop, LeaveLoop)): + pass - if is_pre_barrier: - if loop_had_barrier[0] or level == 0: - return + else: + raise ValueError("unexpected schedule item type '%s'" + % type(sched_item).__name__) + + return reversed(result) + + +def insn_ids_from_schedule(schedule): + result = [] + for sched_item in reversed(schedule): + if isinstance(sched_item, RunInstruction): + result.append(sched_item.insn_id) + + elif isinstance(sched_item, (EnterLoop, LeaveLoop, Barrier)): + pass + + else: + raise ValueError("unexpected schedule item type '%s'" + % type(sched_item).__name__) + + return result + + +def insert_barriers(kernel, schedule, reverse, kind, level=0): + """ + :arg reverse: a :class:`bool`. For ``level > 0``, this function should be + called twice, first with ``reverse=False`` to insert barriers for + forward dependencies, and then again with ``reverse=True`` to insert + reverse depedencies. This order is preferable because the forward pass + will limit the number of instructions that need to be considered as + depedency source candidates by already inserting some number of + barriers into *schedule*. + + Calling it with ``reverse==True and level==0` is not necessary, + since the root of the schedule is in no loop, therefore not repeated, + and therefore reverse dependencies don't need to be added. + :arg kind: "local" or "global". The :attr:`Barrier.kind` to be inserted. + Generally, this function will be called once for each kind of barrier + at the top level, where more global barriers should be inserted first. + :arg level: the current level of loop nesting, 0 for outermost. + """ + result = [] + + # In straight-line code, we have only 'b depends on a'-type 'forward' + # dependencies. But a loop of the type + # + # for i in range(10): + # A + # B + # + # effectively glues multiple copies of 'A;B' one after the other: + # + # A + # B + # A + # B + # ... + # + # Now, if B depends on (i.e. is required to be textually before) A in a way + # requiring a barrier, then we will assume that the reverse dependency exists + # as well, i.e. a barrier between the tail end fo execution of B and the next + # beginning of A is also needed. + + if level == 0 and reverse: + # The global schedule is in no loop, therefore not repeated, and + # therefore reverse dependencies don't need to be added. + return schedule + + # a list of instruction IDs that could lead to barrier-needing dependencies. + if reverse: + candidates = set(get_tail_starting_at_last_barrier(schedule, kind)) + else: + candidates = set() - owed_barriers.clear() + past_first_barrier = [False] - cmt = None + def seen_barrier(): + past_first_barrier[0] = True + + # We've just gone across a barrier, so anything that needed + # one from above just got one. + + candidates.clear() + + def issue_barrier(dep): + seen_barrier() + + comment = None if dep is not None: - target, source, var = dep - if is_pre_barrier: - cmt = "pre-barrier: %s" % var + if dep.is_forward: + comment = "for %s (%s depends on %s)" % ( + dep.variable, dep.target.id, dep.source.id) else: - cmt = "dependency: %s" % var + comment = "for %s (%s rev-depends on %s)" % ( + dep.variable, dep.source.id, dep.target.id) - loop_had_barrier[0] = True - result.append(Barrier(comment=cmt)) + result.append(Barrier(comment=comment, kind=dep.var_kind)) i = 0 while i < len(schedule): sched_item = schedule[i] if isinstance(sched_item, EnterLoop): + # {{{ recurse for nested loop + subloop, new_i = gather_schedule_subloop(schedule, i) + i = new_i - subresult, sub_owed_barriers = insert_barriers( - kernel, subloop[1:-1], level+1) + # Run barrier insertion for inner loop + subresult = subloop[1:-1] + for sub_reverse in [False, True]: + subresult = insert_barriers( + kernel, subresult, + reverse=sub_reverse, kind=kind, + level=level+1) - # {{{ issue dependency-based barriers for contents of nested loop + # {{{ find barriers in loop body - # (i.e. if anything *in* the loop depends on something beforehand) + first_barrier_index = None + last_barrier_index = None - for insn_id in owed_barriers: - dep = get_barrier_dependent_in_schedule(kernel, insn_id, subresult, - unordered=False) - if dep: - issue_barrier(is_pre_barrier=False, dep=dep) - break + for j, sub_sched_item in enumerate(subresult): + if (isinstance(sched_item, Barrier) and + barrier_kind_more_or_equally_global(sched_item.kind, kind)): + + seen_barrier() + last_barrier_index = j + if first_barrier_index is None: + first_barrier_index = j # }}} - # {{{ issue pre-barriers for contents of nested loop - if not loop_had_barrier[0]: - for insn_id in sub_owed_barriers: - dep = get_barrier_dependent_in_schedule( - kernel, insn_id, schedule, unordered=True) + # {{{ check if a barrier is needed before the loop + + # (for leading (before-first-barrier) bit of loop body) + for insn_id in insn_ids_from_schedule(subresult[:first_barrier_index]): + search_set = candidates + if not reverse: + # can limit search set in case of forward dep + search_set = search_set \ + & kernel.recursive_insn_dep_map()[insn_id] + + for dep_src_insn_id in search_set: + dep = get_barrier_needing_dependency( + kernel, + target=insn_id, + source=dep_src_insn_id, + reverse=reverse, var_kind=kind) if dep: - issue_barrier(is_pre_barrier=True, dep=dep) + issue_barrier(dep=dep) + break # }}} + # add trailing end (past-last-barrier) of loop body to candidates + if last_barrier_index is None: + candidates.update(insn_ids_from_schedule(subresult)) + else: + candidates.update( + insn_ids_from_schedule( + subresult[last_barrier_index+1:])) + result.append(subloop[0]) result.extend(subresult) result.append(subloop[-1]) - owed_barriers.update(sub_owed_barriers) - - i = new_i + # }}} - elif isinstance(sched_item, RunInstruction): + elif isinstance(sched_item, Barrier): i += 1 - insn = kernel.id_to_insn[sched_item.insn_id] + if barrier_kind_more_or_equally_global(sched_item.kind, kind): + seen_barrier() - # {{{ issue dependency-based barriers for this instruction - - for dep_src_insn_id in set(insn.insn_deps) & owed_barriers: - dep = get_barrier_needing_dependency(kernel, insn, dep_src_insn_id) - if dep: - issue_barrier(is_pre_barrier=False, dep=dep) - - # }}} + result.append(sched_item) - for assignee_name in insn.assignee_var_names(): - assignee_temp_var = kernel.temporary_variables.get( - assignee_name) - if assignee_temp_var is not None and assignee_temp_var.is_local: - dep = get_barrier_dependent_in_schedule( - kernel, insn.id, schedule, - unordered=True) + elif isinstance(sched_item, RunInstruction): + i += 1 - if dep: - issue_barrier(is_pre_barrier=True, dep=dep) + search_set = candidates + if not reverse: + # can limit search set in case of forward dep + search_set = search_set \ + & kernel.recursive_insn_dep_map()[sched_item.insn_id] + + for dep_src_insn_id in search_set: + dep = get_barrier_needing_dependency( + kernel, + target=sched_item.insn_id, + source=dep_src_insn_id, + reverse=reverse, var_kind=kind) + if dep: + issue_barrier(dep=dep) + break - owed_barriers.add(insn.id) result.append(sched_item) + candidates.add(sched_item.insn_id) else: - assert False + raise ValueError("unexpected schedule item type '%s'" + % type(sched_item).__name__) + + if past_first_barrier[0] and reverse: + # We can quit here, because we're only trying add + # reverse-dep barriers to the beginning of the loop, up to + # the first barrier. + + result.extend(schedule[i:]) + break - return result, owed_barriers + return result # }}} @@ -851,15 +1037,16 @@ def generate_loop_schedules(kernel, debug_args={}): debug=debug)] for gen in generators: for gen_sched in gen: - gen_sched, owed_barriers = insert_barriers(kernel, gen_sched) - if owed_barriers: - from warnings import warn - from loopy.diagnostic import LoopyAdvisory - warn("Barrier insertion finished without inserting barriers for " - "local memory writes in these instructions: '%s'. " - "This often means that local memory was " - "written, but never read." - % ",".join(owed_barriers), LoopyAdvisory) + # gen_sched = insert_barriers(kernel, gen_sched, + # reverse=False, kind="global") + + # for sched_item in gen_sched: + # if isinstance(sched_item, Barrier) and sched_item.kind == "global": + # raise LoopyError("kernel requires a global barrier %s" + # % sched_item.comment) + + gen_sched = insert_barriers(kernel, gen_sched, + reverse=False, kind="local") debug.stop() yield kernel.copy(schedule=gen_sched) diff --git a/test/test_loopy.py b/test/test_loopy.py index 6a3df9d9e..95b8dd349 100644 --- a/test/test_loopy.py +++ b/test/test_loopy.py @@ -805,10 +805,10 @@ def test_equality_constraints(ctx_factory): ], [ "a[i,j] = 5 {id=set_all}", - "a[i,k] = 22 {dep=set_all}", + "b[i,k] = 22 {dep=set_all}", ], [ - lp.GlobalArg("a", dtype, shape="n, n", order=order), + lp.GlobalArg("a,b", dtype, shape="n, n", order=order), lp.ValueArg("n", np.int32, approximately=1000), ], name="equality_constraints", assumptions="n>=1") @@ -821,7 +821,8 @@ def test_equality_constraints(ctx_factory): #print knl.domains[0].detect_equalities() lp.auto_test_vs_ref(seq_knl, ctx, knl, - parameters=dict(n=n), print_ref_code=True) + parameters=dict(n=n), print_ref_code=True, + fills_entire_output=False) def test_stride(ctx_factory): @@ -864,10 +865,10 @@ def test_domain_dependency_via_existentially_quantified_variable(ctx_factory): ], [ "a[i] = 5 {id=set}", - "a[k] = 6 {dep=set}", + "b[k] = 6 {dep=set}", ], [ - lp.GlobalArg("a", dtype, shape="n", order=order), + lp.GlobalArg("a,b", dtype, shape="n", order=order), lp.ValueArg("n", np.int32, approximately=1000), ], assumptions="n>=1") @@ -875,7 +876,8 @@ def test_domain_dependency_via_existentially_quantified_variable(ctx_factory): seq_knl = knl lp.auto_test_vs_ref(seq_knl, ctx, knl, - parameters=dict(n=n), ) + parameters=dict(n=n), + fills_entire_output=False) def test_double_sum(ctx_factory): @@ -1655,6 +1657,7 @@ def test_multiple_writes_to_local_temporary(ctx_factory): knl = lp.tag_inames(knl, dict(i="l.0")) code, _ = lp.generate_code(knl) + print code if __name__ == "__main__": diff --git a/test/test_nbody.py b/test/test_nbody.py index 2d80c1383..7ec973156 100644 --- a/test/test_nbody.py +++ b/test/test_nbody.py @@ -25,28 +25,32 @@ THE SOFTWARE. import numpy as np import loopy as lp +import pyopencl as cl # noqa from pyopencl.tools import ( # noqa pytest_generate_tests_for_pyopencl as pytest_generate_tests) +import logging +logger = logging.getLogger(__name__) + def test_nbody(ctx_factory): + logging.basicConfig(level=logging.INFO) + dtype = np.float32 ctx = ctx_factory() knl = lp.make_kernel(ctx.devices[0], "[N] -> {[i,j,k]: 0<=i,j