diff --git a/loopy/schedule/__init__.py b/loopy/schedule/__init__.py index 22efee0f2c8730a87f813cfb2d0581e7bb8514f5..4148d7d752f89e5c3c37825dbce20a883ba08c0d 100644 --- a/loopy/schedule/__init__.py +++ b/loopy/schedule/__init__.py @@ -1384,172 +1384,253 @@ class DependencyRecord(ImmutableRecord): var_kind=var_kind) -def get_barrier_needing_dependency(kernel, target, source, reverse, var_kind): - """If there exists a dependency between target and source and the two access - a common variable of *var_kind* in a way that requires a barrier (essentially, - at least one write), then the function will return a tuple - ``(target, source, var_name)``. Otherwise, it will return *None*. - - This function finds direct or indirect instruction dependencies, but does - not attempt to guess dependencies that exist based on common access to - variables. - - :arg reverse: a :class:`bool` indicating whether - forward or reverse dependencies are sought. (see above) - :arg var_kind: "global" or "local", the kind of variable based on which - barrier-needing dependencies should be found. +class DependencyTracker(object): """ + A utility to help track dependencies between originating from a set + of sources (as defined by :meth:`add_source`. For each target, + dependencies can then be obtained using :meth:`gen_dependencies_with_target_at`. - # If target or source are insn IDs, look up the actual instructions. - from loopy.kernel.data import InstructionBase - if not isinstance(source, InstructionBase): - source = kernel.id_to_insn[source] - if not isinstance(target, InstructionBase): - target = kernel.id_to_insn[target] - - if reverse: - source, target = target, source - - if source.id in kernel.get_nosync_set(target.id, var_kind): - return - - # {{{ check that a dependency exists - - dep_descr = None + .. automethod:: add_source + .. automethod:: gen_dependencies_with_target_at + """ - target_deps = kernel.recursive_insn_dep_map()[target.id] - if source.id in target_deps: - if reverse: - dep_descr = "{src} rev-depends on {tgt}" + def __init__(self, kernel, var_kind, reverse): + """ + :arg var_kind: "global" or "local", the kind of variable based on which + barrier-needing dependencies should be found. + :arg reverse: + In straight-line code, this only tracks 'b depends on + a'-type 'forward' dependencies. But a loop of the type:: + + for i in range(10): + A + B + + effectively glues multiple copies of 'A;B' one after the other:: + + A + B + A + B + ... + + Now, if B depends on (i.e. is required to be textually before) A in a + way requiring a barrier, then we will assume that the reverse + dependency exists as well, i.e. a barrier between the tail end of + execution of B and the next beginning of A is also needed. + + Setting *reverse* to *True* tracks these reverse (instead of forward) + dependencies. + """ + self.kernel = kernel + self.reverse = reverse + self.var_kind = var_kind + + if var_kind == "local": + self.relevant_vars = kernel.local_var_names() + elif var_kind == "global": + self.relevant_vars = kernel.global_var_names() else: - dep_descr = "{tgt} depends on {src}" - - grps = source.groups & target.conflicts_with_groups - if grps: - dep_descr = "{src} conflicts with {tgt} (via '%s')" % ", ".join(grps) - - grps = target.groups & source.conflicts_with_groups - if grps: - dep_descr = "{src} conflicts with {tgt} (via '%s')" % ", ".join(grps) - - if not dep_descr: - return None - - # }}} - - if var_kind == "local": - relevant_vars = kernel.local_var_names() - elif var_kind == "global": - relevant_vars = kernel.global_var_names() - else: - raise ValueError("unknown 'var_kind': %s" % var_kind) + raise ValueError("unknown 'var_kind': %s" % var_kind) - temp_to_base_storage = kernel.get_temporary_to_base_storage_map() + from collections import defaultdict + self.writer_map = defaultdict(lambda: set()) + self.reader_map = defaultdict(lambda: set()) + self.temp_to_base_storage = kernel.get_temporary_to_base_storage_map() - def map_to_base_storage(var_names): + def map_to_base_storage(self, var_names): result = set(var_names) for name in var_names: - bs = temp_to_base_storage.get(name) + bs = self.temp_to_base_storage.get(name) if bs is not None: result.add(bs) return result - tgt_write = map_to_base_storage( - set(target.assignee_var_names()) & relevant_vars) - tgt_read = map_to_base_storage( - target.read_dependency_names() & relevant_vars) + def discard_all_sources(self): + self.writer_map.clear() + self.reader_map.clear() - src_write = map_to_base_storage( - set(source.assignee_var_names()) & relevant_vars) - src_read = map_to_base_storage( - source.read_dependency_names() & relevant_vars) + def add_source(self, source): + """ + Specify that an instruction may be used as the source of a dependency edge. + """ + # If source is an insn ID, look up the actual instruction. + source = self.kernel.id_to_insn.get(source, source) - waw = tgt_write & src_write - raw = tgt_read & src_write - war = tgt_write & src_read + for written in self.map_to_base_storage( + set(source.assignee_var_names()) & self.relevant_vars): + self.writer_map[written].add(source.id) - for var_name in sorted(raw | war): - return DependencyRecord( - source=source, - target=target, - dep_descr=dep_descr, - variable=var_name, - var_kind=var_kind) + for read in self.map_to_base_storage( + source.read_dependency_names() & self.relevant_vars): + self.reader_map[read].add(source.id) - if source is target: - return None + def gen_dependencies_with_target_at(self, target): + """ + Generate :class:`DependencyRecord` instances for dependencies edges + whose target is the given instruction. - for var_name in sorted(waw): - return DependencyRecord( - source=source, - target=target, - dep_descr=dep_descr, - variable=var_name, - var_kind=var_kind) + :arg target: The ID of the instruction for which dependencies + with conflicting var access should be found. + """ + # If target is an insn ID, look up the actual instruction. + target = self.kernel.id_to_insn.get(target, target) + + tgt_write = self.map_to_base_storage( + set(target.assignee_var_names()) & self.relevant_vars) + tgt_read = self.map_to_base_storage( + target.read_dependency_names() & self.relevant_vars) + + for (accessed_vars, accessor_map, ignore_self) in [ + (tgt_read, self.writer_map, False), + (tgt_write, self.reader_map, False), + (tgt_write, self.writer_map, True)]: + + for dep in self.get_conflicting_accesses( + accessed_vars, accessor_map, ignore_self, target.id): + yield dep + + def get_conflicting_accesses( + self, accessed_vars, var_to_accessor_map, ignore_self, target): + + def determine_conflict_nature(source, target): + if ignore_self and source == target: + return None + if (not self.reverse and source in + self.kernel.get_nosync_set(target, scope=self.var_kind)): + return None + if (self.reverse and target in + self.kernel.get_nosync_set(source, scope=self.var_kind)): + return None + return self.describe_dependency(source, target) + + for var in sorted(accessed_vars): + for source in sorted(var_to_accessor_map[var]): + dep_descr = determine_conflict_nature(source, target) + + if dep_descr is None: + continue - return None + yield DependencyRecord( + source=self.kernel.id_to_insn[source], + target=self.kernel.id_to_insn[target], + dep_descr=dep_descr, + variable=var, + var_kind=self.var_kind) + def describe_dependency(self, source, target): + dep_descr = None -def barrier_kind_more_or_equally_global(kind1, kind2): - return (kind1 == kind2) or (kind1 == "global" and kind2 == "local") + source = self.kernel.id_to_insn[source] + target = self.kernel.id_to_insn[target] + if self.reverse: + source, target = target, source -def get_tail_starting_at_last_barrier(schedule, kind): - result = [] + target_deps = self.kernel.recursive_insn_dep_map()[target.id] + if source.id in target_deps: + if self.reverse: + dep_descr = "{tgt} rev-depends on {src}" + else: + dep_descr = "{tgt} depends on {src}" - for sched_item in reversed(schedule): - if isinstance(sched_item, Barrier): - if barrier_kind_more_or_equally_global(sched_item.kind, kind): - break + grps = source.groups & target.conflicts_with_groups + if not grps: + grps = target.groups & source.conflicts_with_groups - elif isinstance(sched_item, RunInstruction): - result.append(sched_item.insn_id) + if grps: + dep_descr = "{src} conflicts with {tgt} (via '%s')" % ", ".join(grps) - elif isinstance(sched_item, (EnterLoop, LeaveLoop)): - pass + return dep_descr - elif isinstance(sched_item, (CallKernel, ReturnFromKernel)): - pass - else: - raise ValueError("unexpected schedule item type '%s'" - % type(sched_item).__name__) +def barrier_kind_more_or_equally_global(kind1, kind2): + return (kind1 == kind2) or (kind1 == "global" and kind2 == "local") - return reversed(result) +def insn_ids_reaching_end_without_intervening_barrier(schedule, kind): + return _insn_ids_reaching_end(schedule, kind, reverse=False) -def insn_ids_from_schedule(schedule): - result = [] - for sched_item in reversed(schedule): - if isinstance(sched_item, RunInstruction): - result.append(sched_item.insn_id) - elif isinstance(sched_item, (EnterLoop, LeaveLoop, Barrier, CallKernel, - ReturnFromKernel)): - pass +def insn_ids_reachable_from_start_without_intervening_barrier(schedule, kind): + return _insn_ids_reaching_end(schedule, kind, reverse=True) - else: - raise ValueError("unexpected schedule item type '%s'" - % type(sched_item).__name__) - return result +def _insn_ids_reaching_end(schedule, kind, reverse): + if reverse: + schedule = reversed(schedule) + enter_scope_item_kind = LeaveLoop + leave_scope_item_kind = EnterLoop + else: + enter_scope_item_kind = EnterLoop + leave_scope_item_kind = LeaveLoop + + insn_ids_alive_at_scope = [set()] + + for sched_item in schedule: + if isinstance(sched_item, enter_scope_item_kind): + insn_ids_alive_at_scope.append(set()) + elif isinstance(sched_item, leave_scope_item_kind): + innermost_scope = insn_ids_alive_at_scope.pop() + # Instructions in deeper scopes are alive but could be killed by + # barriers at a shallower level, e.g.: + # + # for i + # insn0 + # end + # barrier() <= kills insn0 + # + # Hence we merge this scope into the parent scope. + insn_ids_alive_at_scope[-1].update(innermost_scope) + elif isinstance(sched_item, Barrier): + # This barrier kills only the instruction ids that are alive at + # the current scope (or deeper). Without further analysis, we + # can't assume that instructions at shallower scope can be + # killed by deeper barriers, since loops might be empty, e.g.: + # + # insn0 <= isn't killed by barrier (i loop could be empty) + # for i + # insn1 <= is killed by barrier + # for j + # insn2 <= is killed by barrier + # end + # barrier() + # end + if barrier_kind_more_or_equally_global(sched_item.kind, kind): + insn_ids_alive_at_scope[-1].clear() + else: + insn_ids_alive_at_scope[-1] |= set( + insn_id for insn_id in sched_item_to_insn_id(sched_item)) + + assert len(insn_ids_alive_at_scope) == 1 + return insn_ids_alive_at_scope[-1] + + +def append_barrier_or_raise_error(schedule, dep, verify_only): + if verify_only: + from loopy.diagnostic import MissingBarrierError + raise MissingBarrierError( + "Dependency '%s' (for variable '%s') " + "requires synchronization " + "by a %s barrier (add a 'no_sync_with' " + "instruction option to state that no " + "synchronization is needed)" + % ( + dep.dep_descr.format( + tgt=dep.target.id, src=dep.source.id), + dep.variable, + dep.var_kind)) + else: + comment = "for %s (%s)" % ( + dep.variable, dep.dep_descr.format( + tgt=dep.target.id, src=dep.source.id)) + schedule.append(Barrier(comment=comment, kind=dep.var_kind)) -def insert_barriers(kernel, schedule, reverse, kind, verify_only, level=0): +def insert_barriers(kernel, schedule, kind, verify_only, level=0): """ - :arg reverse: a :class:`bool`. For ``level > 0``, this function should be - called twice, first with ``reverse=False`` to insert barriers for - forward dependencies, and then again with ``reverse=True`` to insert - reverse depedencies. This order is preferable because the forward pass - will limit the number of instructions that need to be considered as - depedency source candidates by already inserting some number of - barriers into *schedule*. - - Calling it with ``reverse==True and level==0` is not necessary, - since the root of the schedule is in no loop, therefore not repeated, - and therefore reverse dependencies don't need to be added. :arg kind: "local" or "global". The :attr:`Barrier.kind` to be inserted. Generally, this function will be called once for each kind of barrier at the top level, where more global barriers should be inserted first. @@ -1557,184 +1638,118 @@ def insert_barriers(kernel, schedule, reverse, kind, verify_only, level=0): missing. :arg level: the current level of loop nesting, 0 for outermost. """ - result = [] - - # In straight-line code, we have only 'b depends on a'-type 'forward' - # dependencies. But a loop of the type - # - # for i in range(10): - # A - # B - # - # effectively glues multiple copies of 'A;B' one after the other: - # - # A - # B - # A - # B - # ... - # - # Now, if B depends on (i.e. is required to be textually before) A in a way - # requiring a barrier, then we will assume that the reverse dependency exists - # as well, i.e. a barrier between the tail end fo execution of B and the next - # beginning of A is also needed. - - if level == 0 and reverse: - # The global schedule is in no loop, therefore not repeated, and - # therefore reverse dependencies don't need to be added. - return schedule - - # a list of instruction IDs that could lead to barrier-needing dependencies. - if reverse: - candidates = set(get_tail_starting_at_last_barrier(schedule, kind)) - else: - candidates = set() - - past_first_barrier = [False] - - def seen_barrier(): - past_first_barrier[0] = True - # We've just gone across a barrier, so anything that needed - # one from above just got one. + # {{{ insert barriers at outermost scheduling level - candidates.clear() + def insert_barriers_at_outer_level(schedule, reverse=False): + dep_tracker = DependencyTracker(kernel, var_kind=kind, reverse=reverse) - def issue_barrier(dep): - seen_barrier() + if reverse: + # Populate the dependency tracker with sources from the tail end of + # the schedule block. + for insn_id in ( + insn_ids_reaching_end_without_intervening_barrier( + schedule, kind)): + dep_tracker.add_source(insn_id) - comment = None - if dep is not None: - comment = "for %s (%s)" % ( - dep.variable, dep.dep_descr.format( - tgt=dep.target.id, src=dep.source.id)) + result = [] - result.append(Barrier(comment=comment, kind=dep.var_kind)) + i = 0 + while i < len(schedule): + sched_item = schedule[i] - i = 0 - while i < len(schedule): - sched_item = schedule[i] + if isinstance(sched_item, EnterLoop): + subloop, new_i = gather_schedule_block(schedule, i) - if isinstance(sched_item, EnterLoop): - # {{{ recurse for nested loop + loop_head = ( + insn_ids_reachable_from_start_without_intervening_barrier( + subloop, kind)) - subloop, new_i = gather_schedule_block(schedule, i) - i = new_i + loop_tail = ( + insn_ids_reaching_end_without_intervening_barrier( + subloop, kind)) - # Run barrier insertion for inner loop - subresult = subloop[1:-1] - for sub_reverse in [False, True]: - subresult = insert_barriers( - kernel, subresult, - reverse=sub_reverse, kind=kind, - verify_only=verify_only, - level=level+1) + # Checks if a barrier is needed before the loop. This handles + # dependencies with targets that can be reached without an + # intervening barrier from the start of the loop: + # + # a[x] = ... <= source + # for i + # ... = a[y] <= target + # barrier() + # ... + from itertools import chain + for dep in chain.from_iterable( + dep_tracker.gen_dependencies_with_target_at(insn) + for insn in loop_head): + append_barrier_or_raise_error(result, dep, verify_only) + # This barrier gets inserted outside the loop, hence it is + # executed unconditionally and so kills all sources before + # the loop. + dep_tracker.discard_all_sources() + break - # {{{ find barriers in loop body + result.extend(subloop) - first_barrier_index = None - last_barrier_index = None + # Handle dependencies with sources not killed inside the loop: + # + # for i + # ... + # barrier() + # b[i] = ... <= source + # end for + # ... = f(b) <= target + for item in loop_tail: + dep_tracker.add_source(item) + + i = new_i + + elif isinstance(sched_item, Barrier): + result.append(sched_item) + if barrier_kind_more_or_equally_global(sched_item.kind, kind): + dep_tracker.discard_all_sources() + i += 1 + + elif isinstance(sched_item, RunInstruction): + for dep in dep_tracker.gen_dependencies_with_target_at( + sched_item.insn_id): + append_barrier_or_raise_error(result, dep, verify_only) + dep_tracker.discard_all_sources() + break + result.append(sched_item) + dep_tracker.add_source(sched_item.insn_id) + i += 1 - for j, sub_sched_item in enumerate(subresult): - if (isinstance(sub_sched_item, Barrier) and - barrier_kind_more_or_equally_global( - sub_sched_item.kind, kind)): + elif isinstance(sched_item, (CallKernel, ReturnFromKernel)): + result.append(sched_item) + i += 1 - last_barrier_index = j - if first_barrier_index is None: - first_barrier_index = j + else: + raise ValueError("unexpected schedule item type '%s'" + % type(sched_item).__name__) - # }}} + return result - # {{{ check if a barrier is needed before the loop - - # (for leading (before-first-barrier) bit of loop body) - for insn_id in insn_ids_from_schedule(subresult[:first_barrier_index]): - search_set = sorted(candidates) - - for dep_src_insn_id in search_set: - dep = get_barrier_needing_dependency( - kernel, - target=insn_id, - source=dep_src_insn_id, - reverse=reverse, var_kind=kind) - if dep: - if verify_only: - from loopy.diagnostic import MissingBarrierError - raise MissingBarrierError( - "Dependency '%s' (for variable '%s') " - "requires synchronization " - "by a %s barrier (add a 'no_sync_with' " - "instruction option to state that no" - "synchronization is needed)" - % ( - dep.dep_descr.format( - tgt=dep.target.id, src=dep.source.id), - dep.variable, - kind)) - else: - issue_barrier(dep=dep) - break + # }}} - # }}} + # {{{ recursively insert barriers in loops - # add trailing end (past-last-barrier) of loop body to candidates - if last_barrier_index is None: - candidates.update(insn_ids_from_schedule(subresult)) - else: - seen_barrier() - candidates.update( - insn_ids_from_schedule( - subresult[last_barrier_index+1:])) + result = [] + i = 0 + while i < len(schedule): + sched_item = schedule[i] + if isinstance(sched_item, EnterLoop): + subloop, new_i = gather_schedule_block(schedule, i) + new_subloop = insert_barriers( + kernel, subloop[1:-1], kind, verify_only, level + 1) result.append(subloop[0]) - result.extend(subresult) + result.extend(new_subloop) result.append(subloop[-1]) + i = new_i - # }}} - - elif isinstance(sched_item, Barrier): - i += 1 - - if barrier_kind_more_or_equally_global(sched_item.kind, kind): - seen_barrier() - - result.append(sched_item) - - elif isinstance(sched_item, RunInstruction): - i += 1 - - search_set = sorted(candidates) - - for dep_src_insn_id in search_set: - dep = get_barrier_needing_dependency( - kernel, - target=sched_item.insn_id, - source=dep_src_insn_id, - reverse=reverse, var_kind=kind) - if dep: - if verify_only: - from loopy.diagnostic import MissingBarrierError - raise MissingBarrierError( - "Dependency '%s' (for variable '%s') " - "requires synchronization " - "by a %s barrier (add a 'no_sync_with' " - "instruction option to state that no " - "synchronization is needed)" - % ( - dep.dep_descr.format( - tgt=dep.target.id, src=dep.source.id), - dep.variable, - kind)) - - else: - issue_barrier(dep=dep) - break - - result.append(sched_item) - candidates.add(sched_item.insn_id) - - elif isinstance(sched_item, (CallKernel, ReturnFromKernel)): + elif isinstance(sched_item, + (Barrier, RunInstruction, CallKernel, ReturnFromKernel)): result.append(sched_item) i += 1 @@ -1742,13 +1757,13 @@ def insert_barriers(kernel, schedule, reverse, kind, verify_only, level=0): raise ValueError("unexpected schedule item type '%s'" % type(sched_item).__name__) - if past_first_barrier[0] and reverse: - # We can quit here, because we're only trying add - # reverse-dep barriers to the beginning of the loop, up to - # the first barrier. + # }}} + + result = insert_barriers_at_outer_level(result) - result.extend(schedule[i:]) - break + # When level = 0 there is no loop. + if level != 0: + result = insert_barriers_at_outer_level(result, reverse=True) return result @@ -1897,11 +1912,11 @@ def generate_loop_schedules_inner(kernel, debug_args={}): if not kernel.options.disable_global_barriers: logger.info("%s: barrier insertion: global" % kernel.name) gen_sched = insert_barriers(kernel, gen_sched, - reverse=False, kind="global", verify_only=True) + kind="global", verify_only=True) logger.info("%s: barrier insertion: local" % kernel.name) - gen_sched = insert_barriers(kernel, gen_sched, - reverse=False, kind="local", verify_only=False) + gen_sched = insert_barriers(kernel, gen_sched, kind="local", + verify_only=False) logger.info("%s: barrier insertion: done" % kernel.name) new_kernel = kernel.copy( diff --git a/test/test_loopy.py b/test/test_loopy.py index 48ccd8ee024325150f8686185678eeb64a7395dd..db4a382046cc1aaf1465e81cf493415ace57e64d 100644 --- a/test/test_loopy.py +++ b/test/test_loopy.py @@ -1995,10 +1995,11 @@ def test_integer_reduction(ctx_factory): assert function(out) -def assert_barrier_between(knl, id1, id2): - from loopy.schedule import RunInstruction, Barrier +def assert_barrier_between(knl, id1, id2, ignore_barriers_in_levels=()): + from loopy.schedule import (RunInstruction, Barrier, EnterLoop, LeaveLoop) watch_for_barrier = False seen_barrier = False + loop_level = 0 for sched_item in knl.schedule: if isinstance(sched_item, RunInstruction): @@ -2008,9 +2009,13 @@ def assert_barrier_between(knl, id1, id2): assert watch_for_barrier assert seen_barrier return - if isinstance(sched_item, Barrier): - if watch_for_barrier: + elif isinstance(sched_item, Barrier): + if watch_for_barrier and loop_level not in ignore_barriers_in_levels: seen_barrier = True + elif isinstance(sched_item, EnterLoop): + loop_level += 1 + elif isinstance(sched_item, LeaveLoop): + loop_level -= 1 raise RuntimeError("id2 was not seen") @@ -2029,6 +2034,7 @@ def test_barrier_insertion_near_top_of_loop(): end """, seq_dependencies=True) + knl = lp.tag_inames(knl, dict(i="l.0")) knl = lp.set_temporary_scope(knl, "a", "local") knl = lp.set_temporary_scope(knl, "b", "local") @@ -2041,6 +2047,32 @@ def test_barrier_insertion_near_top_of_loop(): assert_barrier_between(knl, "bcomp1", "bcomp2") +def test_barrier_insertion_near_bottom_of_loop(): + knl = lp.make_kernel( + ["{[i]: 0 <= i < 10 }", + "[jmax] -> {[j]: 0 <= j < jmax}"], + """ + for i + <>a[i] = i {id=ainit} + for j + <>b[i,j] = a[i] + t {id=bcomp1} + b[i,j] = b[i,j] + 1 {id=bcomp2} + end + a[i] = i + 1 {id=aupdate} + end + """, + seq_dependencies=True) + knl = lp.tag_inames(knl, dict(i="l.0")) + knl = lp.set_temporary_scope(knl, "a", "local") + knl = lp.set_temporary_scope(knl, "b", "local") + knl = lp.get_one_scheduled_kernel(lp.preprocess_kernel(knl)) + + print(knl) + + assert_barrier_between(knl, "bcomp1", "bcomp2") + assert_barrier_between(knl, "ainit", "aupdate", ignore_barriers_in_levels=[1]) + + if __name__ == "__main__": if len(sys.argv) > 1: exec(sys.argv[1])