From 3e24063b72dce18afa166716a9816a440c559943 Mon Sep 17 00:00:00 2001 From: Andreas Kloeckner <inform@tiker.net> Date: Mon, 26 Sep 2011 01:06:27 -0400 Subject: [PATCH] Much more schedule hacking--not far from working. --- MEMO | 28 +++++++ loopy/__init__.py | 186 +++++++++++++++++++++++----------------------- loopy/kernel.py | 101 ++++++++++++------------- loopy/schedule.py | 101 +++++++++++++++++-------- 4 files changed, 240 insertions(+), 176 deletions(-) diff --git a/MEMO b/MEMO index d3c511f4c..bd6512aec 100644 --- a/MEMO +++ b/MEMO @@ -57,6 +57,8 @@ Things to consider - FIXME: Deal with insns losing a seq iname dep in a CSE realization + a <- cse(reduce(stuff)) + - Every loop in loopy is opened at most once. Dealt with @@ -72,6 +74,32 @@ Dealt with - Generalize reduction to be over multiple variables +Should a dependency on an iname be forced in a CSE? +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Local var: + +l | n +g | y +dl | Err +d | Err + +Private var: + +l | y +g | y +dl | Err +d | Err + +dg: Invalid-> error + +d: is duplicate +l: is tagged as local idx +g: is tagged as group idx + +Raise error if dl is targeting a private variable, regardless of whether it's +a dependency or not. + How to represent the schedule ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/loopy/__init__.py b/loopy/__init__.py index d8c688f0a..cf246e4f9 100644 --- a/loopy/__init__.py +++ b/loopy/__init__.py @@ -100,18 +100,6 @@ def split_dimension(kernel, iname, inner_length, padded_length=None, new_expr = subst_mapper(rls(insn.expression)) - old_iname_tag = insn.iname_to_tag.get(iname) - new_iname_to_tag = insn.iname_to_tag.copy() - - from loopy.kernel import UniqueTag - if not isinstance(old_iname_tag, UniqueTag): - new_iname_to_tag.pop(iname, None) - new_iname_to_tag[outer_iname] = old_iname_tag - new_iname_to_tag[inner_iname] = old_iname_tag - else: - raise RuntimeError("cannot split already unique-tagged iname '%s'" - % iname) - if iname in insn.forced_iname_deps: new_forced_iname_deps = insn.forced_iname_deps[:] new_forced_iname_deps.remove(iname) @@ -122,7 +110,6 @@ def split_dimension(kernel, iname, inner_length, padded_length=None, insn = insn.copy( assignee=subst_mapper(insn.assignee), expression=new_expr, - iname_to_tag=new_iname_to_tag, forced_iname_deps=new_forced_iname_deps ) @@ -144,43 +131,28 @@ def split_dimension(kernel, iname, inner_length, padded_length=None, -def tag_dimensions(kernel, iname_to_tag, insn_id=None): - from loopy.kernel import UniqueTag, ParallelTag, parse_tag +def tag_dimensions(kernel, iname_to_tag): + from loopy.kernel import parse_tag iname_to_tag = dict((iname, parse_tag(tag)) for iname, tag in iname_to_tag.iteritems()) - new_insns = [] - for insn in kernel.instructions: - if insn_id is None or insn_id == insn.id: - new_iname_to_tag = insn.iname_to_tag.copy() + new_iname_to_tag = kernel.iname_to_tag.copy() + for iname, new_tag in iname_to_tag.iteritems(): + if new_tag is None: + continue - existing_unique_tag_keys = set( - tag.key for tag in new_iname_to_tag.itervalues() - if isinstance(tag, UniqueTag)) + if iname not in kernel.all_inames(): + raise ValueError("cannot tag '%s'--not known" % iname) - for iname, tag in iname_to_tag.iteritems(): - if iname not in insn.all_inames(): - continue + old_tag = kernel.iname_to_tag.get(iname) + if old_tag is not None and (old_tag != new_tag): + raise RuntimeError("'%s' is already tagged '%s'--cannot retag" + % (iname, old_tag)) - if isinstance(tag, ParallelTag) and iname in insn.sequential_inames(): - raise 
NotImplementedError("cannot parallelize reduction dimension (yet)") + new_iname_to_tag[iname] = new_tag - new_iname_to_tag[iname] = tag - - if isinstance(tag, UniqueTag): - tag_key = tag.key - if tag_key in existing_unique_tag_keys: - raise RuntimeError("repeated unique tag key: %s" % tag_key) - - existing_unique_tag_keys.add(tag_key) - - new_insns.append( - insn.copy(iname_to_tag=new_iname_to_tag)) - else: - new_insns.append(insn) - - return kernel.copy(instructions=new_insns) + return kernel.copy(iname_to_tag=new_iname_to_tag) @@ -189,8 +161,8 @@ def tag_dimensions(kernel, iname_to_tag, insn_id=None): def realize_cse(kernel, cse_tag, dtype, duplicate_inames=[], parallel_inames=None, dup_iname_to_tag={}, new_inames=None): """ - :arg duplicate_inames: - also determines index order of temporary array + :arg duplicate_inames: which inames are supposed to be separate loops + in the CSE. Also determines index order of temporary array. :arg parallel_inames: only a convenient interface for dup_iname_to_tag """ @@ -233,10 +205,18 @@ def realize_cse(kernel, cse_tag, dtype, duplicate_inames=[], parallel_inames=Non new_inames = temp_new_inames + old_to_new_iname = dict(zip(duplicate_inames, new_inames)) + # }}} target_var_name = kernel.make_unique_var_name(cse_tag) + from loopy.kernel import (TAG_WORK_ITEM_IDX, TAG_AUTO_WORK_ITEM_IDX, + TAG_GROUP_IDX) + target_var_is_local = any( + isinstance(tag, (TAG_WORK_ITEM_IDX, TAG_AUTO_WORK_ITEM_IDX)) + for tag in dup_iname_to_tag.itervalues()) + cse_lookup_table = {} cse_result_insns = [] @@ -257,6 +237,58 @@ def realize_cse(kernel, cse_tag, dtype, duplicate_inames=[], parallel_inames=Non if cse_result_insns: raise RuntimeError("CSE tag '%s' is not unique" % cse_tag) + + # {{{ decide what to do with each iname + + parent_inames = insn.all_inames() + forced_iname_deps = [] + + from loopy.symbolic import IndexVariableFinder + dependencies = IndexVariableFinder()(expr.child) + + assert dependencies <= parent_inames + + for iname in parent_inames: + if iname in duplicate_inames: + tag = dup_iname_to_tag[iname] + else: + tag = kernel.iname_to_tag[iname] + + if isinstance(tag, (TAG_WORK_ITEM_IDX, TAG_AUTO_WORK_ITEM_IDX)): + kind = "l" + elif isinstance(tag, TAG_GROUP_IDX): + kind = "g" + else: + kind = "o" + + if iname in duplicate_inames and kind == "g": + raise RuntimeError("duplicating inames into " + "group index axes is not helpful, as they cannot " + "collaborate in computing a local variable") + + if iname in dependencies: + if not target_var_is_local and iname in duplicate_inames and kind == "l": + raise RuntimeError("invalid: parallelized " + "fetch into private variable") + + # otherwise: all happy + continue + + # the iname is *not* a dependency of the fetch expression + if iname in duplicate_inames: + raise RuntimeError("duplicating an iname " + "that the CSE does not depend on " + "does not make sense") + + force_dependency = True + if kind == "l" and target_var_is_local: + force_dependency = False + + if force_dependency: + forced_iname_deps.append(iname) + + # }}} + # {{{ concoct new inner and outer expressions from pymbolic import var @@ -278,34 +310,12 @@ def realize_cse(kernel, cse_tag, dtype, duplicate_inames=[], parallel_inames=Non # }}} - # {{{ build the new instruction's iname_to tag - - from loopy.symbolic import IndexVariableFinder - new_iname_to_tag = {} - for old_iname, new_iname in zip(duplicate_inames, new_inames): - new_iname_to_tag[new_iname] = dup_iname_to_tag[old_iname] - - index_deps = ( - IndexVariableFinder()(new_inner_expr) - | 
set(new_inames)) - - for iname in index_deps: - if iname not in new_iname_to_tag: - # assume generating instruction's view on - # inames on which we don't have an opinion. - - if iname in insn.iname_to_tag: - new_iname_to_tag[iname] = insn.iname_to_tag[iname] - - # }}} - from loopy.kernel import Instruction new_insn = Instruction( id=kernel.make_unique_instruction_id(based_on=cse_tag), assignee=assignee, expression=new_inner_expr, - iname_to_tag=new_iname_to_tag, - ) + forced_iname_deps=forced_iname_deps) cse_result_insns.append(new_insn) @@ -368,8 +378,8 @@ def realize_cse(kernel, cse_tag, dtype, duplicate_inames=[], parallel_inames=Non # {{{ set up data for temp variable - temp_var_base_indices = [] - temp_var_shape = [] + target_var_base_indices = [] + target_var_shape = [] for iname in new_inames: lower_bound_pw_aff = new_domain.dim_min(new_name_to_dim[iname][1]) @@ -378,28 +388,32 @@ def realize_cse(kernel, cse_tag, dtype, duplicate_inames=[], parallel_inames=Non from loopy.isl import static_max_of_pw_aff from loopy.symbolic import pw_aff_to_expr - temp_var_shape.append(static_max_of_pw_aff( + target_var_shape.append(static_max_of_pw_aff( upper_bound_pw_aff - lower_bound_pw_aff + 1)) - temp_var_base_indices.append(pw_aff_to_expr(lower_bound_pw_aff)) + target_var_base_indices.append(pw_aff_to_expr(lower_bound_pw_aff)) - from loopy.kernel import TemporaryVariable, ParallelTag + from loopy.kernel import TemporaryVariable new_temporary_variables = kernel.temporary_variables + [ TemporaryVariable( name=target_var_name, dtype=dtype, - base_indices=temp_var_base_indices, - shape=temp_var_shape, - is_local=any(isinstance(tag, ParallelTag) - for tag in dup_iname_to_tag.iterkeys())) + base_indices=target_var_base_indices, + shape=target_var_shape, + is_local=target_var_is_local) ] # }}} + new_iname_to_tag = kernel.iname_to_tag.copy() + for old_iname, new_iname in zip(duplicate_inames, new_inames): + new_iname_to_tag[new_iname] = dup_iname_to_tag[old_iname] + return kernel.copy( domain=new_domain, instructions=new_insns, temporary_variables=new_temporary_variables, - name_to_dim=new_name_to_dim) + name_to_dim=new_name_to_dim, + iname_to_tag=new_iname_to_tag) @@ -433,17 +447,12 @@ def realize_reduction(kernel, inames=None, reduction_tag=None): shape=(), is_local=False)) - init_iname_to_tag = insn.iname_to_tag.copy() - for iname in expr.inames: - del init_iname_to_tag[iname] - init_insn = Instruction( id=kernel.make_unique_instruction_id( extra_used_ids=set(ni.id for ni in new_insns)), assignee=target_var, forced_iname_deps=list(insn.all_inames() - set(expr.inames)), - expression=expr.operation.neutral_element, - iname_to_tag=init_iname_to_tag) + expression=expr.operation.neutral_element) new_insns.append(init_insn) @@ -453,8 +462,7 @@ def realize_reduction(kernel, inames=None, reduction_tag=None): assignee=target_var, expression=expr.operation(target_var, sub_expr), insn_deps=[init_insn.id], - forced_iname_deps=list(insn.all_inames()), - iname_to_tag=insn.iname_to_tag) + forced_iname_deps=list(insn.all_inames())) new_insns.append(reduction_insn) @@ -472,17 +480,11 @@ def realize_reduction(kernel, inames=None, reduction_tag=None): new_expression = cb_mapper(insn.expression) - new_iname_to_tag = insn.iname_to_tag.copy() - for iname in new_insn_removed_inames: - del new_iname_to_tag[iname] - new_insns.append( insn.copy( expression=new_expression, insn_deps=insn.insn_deps - + new_insn_insn_deps, - iname_to_tag=new_iname_to_tag, - )) + + new_insn_insn_deps)) return kernel.copy( instructions=new_insns, 
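(Sketch, for reference while reading the realize_cse hunk above: a minimal,
self-contained version of the dependency-forcing rule it implements, matching
the l/g decision table added to the MEMO. The function and argument names
below are illustrative only and are not part of the loopy API.)

    def force_dependency_for_iname(kind, is_duplicated, is_dependency,
            target_var_is_local):
        # kind is "l" (work-item axis), "g" (group axis) or "o" (other);
        # target_var_is_local says whether the CSE result lives in local
        # (as opposed to private) storage.

        if is_duplicated and kind == "g":
            raise RuntimeError("duplicating an iname onto a group axis is "
                    "invalid: work groups cannot collaborate on a local "
                    "variable")

        if is_dependency:
            if is_duplicated and kind == "l" and not target_var_is_local:
                raise RuntimeError("parallelized fetch into a private "
                        "variable is invalid")
            # already a dependency of the CSE--nothing to force
            return False

        if is_duplicated:
            raise RuntimeError("duplicating an iname that the CSE does not "
                    "depend on does not make sense")

        # a work-item iname need not be forced onto a local target variable;
        # every other non-dependency becomes a forced iname dependency
        return not (kind == "l" and target_var_is_local)

    # Reproduces the MEMO table, e.g.:
    #   force_dependency_for_iname("l", False, False, True)   -> False  (l | n)
    #   force_dependency_for_iname("l", False, False, False)  -> True   (l | y)
    #   force_dependency_for_iname("g", False, False, True)   -> True   (g | y)
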
diff --git a/loopy/kernel.py b/loopy/kernel.py index 87cd2a9c9..a0463bded 100644 --- a/loopy/kernel.py +++ b/loopy/kernel.py @@ -214,52 +214,16 @@ class Instruction(Record): :ivar insn_deps: a list of ids of :class:`Instruction` instances that *must* be executed before this one. Note that loop scheduling augments this by adding dependencies on any writes to temporaries read by this instruction. - :ivar iname_to_tag: a map from loop domain variables to subclasses - of :class:`IndexTag` """ def __init__(self, id, assignee, expression, - forced_iname_deps=[], insn_deps=[], - iname_to_tag={}): - - # {{{ find and properly tag reduction inames - - reduction_inames = set() - - from loopy.symbolic import ReductionCallbackMapper - - def map_reduction(expr, rec): - rec(expr.expr) - reduction_inames.update(expr.inames) - - ReductionCallbackMapper(map_reduction)(expression) - - if reduction_inames: - iname_to_tag = iname_to_tag.copy() - - for iname in reduction_inames: - tag = iname_to_tag.get(iname) - if not (tag is None or isinstance(tag, SequentialTag)): - raise RuntimeError("inconsistency detected: " - "sequential/reduction iname '%s' was " - "tagged otherwise" % iname) - - iname_to_tag[iname] = SequentialTag() - - # }}} + forced_iname_deps=[], insn_deps=[]): Record.__init__(self, id=id, assignee=assignee, expression=expression, forced_iname_deps=forced_iname_deps, insn_deps=insn_deps, - iname_to_tag=dict( - (iname, parse_tag(tag)) - for iname, tag in iname_to_tag.iteritems())) - - unused_tags = set(self.iname_to_tag.iterkeys()) - self.all_inames() - if unused_tags: - raise RuntimeError("encountered tags for unused inames: " - + ", ".join(unused_tags)) + ) @memoize_method def all_inames(self): @@ -271,27 +235,19 @@ class Instruction(Record): return index_vars | set(self.forced_iname_deps) @memoize_method - def sequential_inames(self): + def sequential_inames(self, iname_to_tag): result = set() - for iname, tag in self.iname_to_tag.iteritems(): + for iname in self.all_inames(): + tag = iname_to_tag.get(iname) if isinstance(tag, SequentialTag): result.add(iname) return result def __str__(self): - loop_descrs = [] - for iname in sorted(self.all_inames()): - tag = self.iname_to_tag.get(iname) - - if tag is None: - loop_descrs.append(iname) - else: - loop_descrs.append("%s: %s" % (iname, tag)) - result = "%s: %s <- %s\n [%s]" % (self.id, - self.assignee, self.expression, ", ".join(loop_descrs)) + self.assignee, self.expression, ", ".join(sorted(self.all_inames()))) if self.insn_deps: result += "\n : " + ", ".join(self.insn_deps) @@ -418,6 +374,7 @@ class LoopKernel(Record): :ivar workgroup_size: :ivar name_to_dim: A lookup table from inames to ISL-style (dim_type, index) tuples + :ivar iname_to_tag: """ def __init__(self, device, domain, instructions, args=None, schedule=None, @@ -426,7 +383,8 @@ class LoopKernel(Record): iname_slab_increments={}, temporary_variables=[], workgroup_size=None, - name_to_dim=None): + name_to_dim=None, + iname_to_tag={}): """ :arg domain: a :class:`islpy.BasicSet`, or a string parseable to a basic set by the isl. 
Example: "{[i,j]: 0<=i < 10 and 0<= j < 9}" @@ -469,6 +427,33 @@ class LoopKernel(Record): if len(set(insn.id for insn in insns)) != len(insns): raise RuntimeError("instruction ids do not appear to be unique") + # {{{ find and properly tag reduction inames + + reduction_inames = set() + + from loopy.symbolic import ReductionCallbackMapper + + def map_reduction(expr, rec): + rec(expr.expr) + reduction_inames.update(expr.inames) + + for insn in insns: + ReductionCallbackMapper(map_reduction)(insn.expression) + + iname_to_tag = iname_to_tag.copy() + + if reduction_inames: + for iname in reduction_inames: + tag = iname_to_tag.get(iname) + if not (tag is None or isinstance(tag, SequentialTag)): + raise RuntimeError("inconsistency detected: " + "sequential/reduction iname '%s' was " + "tagged otherwise" % iname) + + iname_to_tag[iname] = SequentialTag() + + # }}} + if assumptions is None: assumptions = isl.Set.universe(domain.get_space()) elif isinstance(assumptions, str): @@ -492,7 +477,8 @@ class LoopKernel(Record): iname_slab_increments=iname_slab_increments, temporary_variables=temporary_variables, workgroup_size=workgroup_size, - name_to_dim=name_to_dim) + name_to_dim=name_to_dim, + iname_to_tag=iname_to_tag) def make_unique_instruction_id(self, insns=None, based_on="insn", extra_used_ids=set()): if insns is None: @@ -622,6 +608,17 @@ class LoopKernel(Record): return sum(lv.nbytes for lv in self.temporary_variables if lv.is_local) + def __str__(self): + lines = [] + + for insn in self.instructions: + lines.append(str(insn)) + lines.append("") + for iname in sorted(self.all_inames()): + lines.append("%s: %s" % (iname, self.iname_to_tag.get(iname))) + + return "\n".join(lines) + # }}} diff --git a/loopy/schedule.py b/loopy/schedule.py index d0a40d293..0dbeeaa70 100644 --- a/loopy/schedule.py +++ b/loopy/schedule.py @@ -37,7 +37,7 @@ def generate_loop_dep_graph(kernel): :return: a dict mapping an iname to the ones that need to be entered before it. """ - # FIXME likely not useful + # FIXME perhaps useful? 
result = {} print "------------------------------------------------------" @@ -68,6 +68,14 @@ def generate_loop_dep_graph(kernel): +def adjust_local_temp_var_storage(kernel): + from warnings import warn + warn("adjust_local_temp_var_storage is unimplemented") + return kernel + + + + def find_writers(kernel): """ :return: a dict that maps variable names to ids of insns that @@ -143,23 +151,33 @@ def add_automatic_dependencies(kernel): def generate_loop_schedules_internal(kernel, schedule=[]): + print schedule + + #if len(schedule) == 8: + #from pudb import set_trace; set_trace() + all_insn_ids = set(insn.id for insn in kernel.instructions) - print schedule scheduled_insn_ids = set(sched_item.insn_id for sched_item in schedule if isinstance(sched_item, RunInstruction)) # {{{ find active and entered loops - active_loops = set() + active_loops = [] entered_loops = set() for sched_item in schedule: if isinstance(sched_item, EnterLoop): - active_loops.add(sched_item.iname) + active_loops.append(sched_item.iname) entered_loops.add(sched_item.iname) if isinstance(sched_item, LeaveLoop): - active_loops.remove(sched_item.iname) + active_loops.pop() + + if active_loops: + last_entered_loop = active_loops[-1] + else: + last_entered_loop = None + active_loops = set(active_loops) # }}} @@ -167,17 +185,17 @@ def generate_loop_schedules_internal(kernel, schedule=[]): # {{{ see if any insn can be scheduled now - available_insn_ids = list(all_insn_ids - scheduled_insn_ids) + unscheduled_insn_ids = list(all_insn_ids - scheduled_insn_ids) - for insn_id in available_insn_ids: + for insn_id in unscheduled_insn_ids: insn = kernel.id_to_insn[insn_id] - if (active_loops == set(insn.all_inames()) + if (active_loops == insn.all_inames() and set(insn.insn_deps) <= scheduled_insn_ids): scheduled_insn_ids.add(insn.id) schedule = schedule + [RunInstruction(insn_id=insn.id)] made_progress = True - available_insn_ids = list(all_insn_ids - scheduled_insn_ids) + unscheduled_insn_ids = list(all_insn_ids - scheduled_insn_ids) # }}} @@ -186,41 +204,54 @@ def generate_loop_schedules_internal(kernel, schedule=[]): available_loops = kernel.all_inames() - entered_loops if available_loops: + found_something_useful = False + for iname in available_loops: + # {{{ determine if that gets us closer to being able to scheduling an insn + + useful = False + + hypothetical_active_loops = active_loops | set([iname]) + for insn_id in unscheduled_insn_ids: + insn = kernel.id_to_insn[insn_id] + if hypothetical_active_loops <= insn.all_inames(): + useful = True + break + + if not useful: + continue + + found_something_useful = True + + # }}} + new_schedule = schedule + [EnterLoop(iname=iname)] for sub_sched in generate_loop_schedules_internal( kernel, new_schedule): yield sub_sched - return + + if found_something_useful: + return # }}} # {{{ see if we're ready to leave a loop - leavable_loops = set() - - for iname in active_loops: - leavable = True - for insn_id in available_insn_ids: + if last_entered_loop is not None: + can_leave = True + for insn_id in unscheduled_insn_ids: insn = kernel.id_to_insn[insn_id] - if iname in insn.all_inames(): - leavable = False + if last_entered_loop in insn.all_inames(): + can_leave = False break - if leavable: - leavable_loops.add(iname) - - if leavable_loops: - for iname in leavable_loops: - new_schedule = schedule + [LeaveLoop(iname=iname)] - for sub_sched in generate_loop_schedules_internal( - kernel, new_schedule): - yield sub_sched - return + if can_leave: + schedule = schedule + 
[LeaveLoop(iname=last_entered_loop)] + made_progress = True # }}} - if not active_loops and not available_loops and not available_insn_ids: + if not active_loops and not available_loops and not unscheduled_insn_ids: # if done, yield result yield schedule else: @@ -240,7 +271,9 @@ def generate_loop_schedules(kernel): from loopy import realize_reduction kernel = realize_reduction(kernel) - # {{{ check that all CSEs + kernel = adjust_local_temp_var_storage(kernel) + + # {{{ check that all CSEs have been realized from loopy.symbolic import CSECallbackMapper @@ -262,17 +295,21 @@ def generate_loop_schedules(kernel): kernel = add_automatic_dependencies(kernel) - for insn_a in kernel.instructions: - print insn_a + print kernel #grid_size, group_size = find_known_grid_and_group_sizes(kernel) #kernel = assign_grid_and_group_indices(kernel) for gen_sched in generate_loop_schedules_internal(kernel): - #yield kernel.copy(schedule=gen_sched) print gen_sched + if False: + gen_sched = insert_barriers(gen_sched) + schedule = insert_parallel_dim_check_points(schedule=gen_sched) + yield kernel.copy(schedule=gen_sched) + + 1/0 -- GitLab
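
(Sketch, for reference while reading the scheduler hunks above: the search in
generate_loop_schedules_internal is a depth-first enumeration over three
moves--run an instruction whose loop nest equals the set of active loops,
enter a not-yet-entered loop that brings some unscheduled instruction closer
to being runnable, or leave the innermost active loop once nothing pending
still needs it. The following is a stripped-down, self-contained version of
that search; Insn is a stand-in namedtuple rather than loopy.kernel.Instruction,
and barriers, slabs and grid/group-size checks are ignored.)

    from collections import namedtuple

    Insn = namedtuple("Insn", ["id", "inames", "deps"])

    def schedules(insns, all_inames, sched=(), active=()):
        done = set(name for kind, name in sched if kind == "run")
        made_progress = False

        # run every instruction whose loop nest is exactly the active loops
        # and whose dependencies have already been scheduled
        for insn in insns:
            if (insn.id not in done
                    and set(insn.inames) == set(active)
                    and set(insn.deps) <= done):
                sched += (("run", insn.id),)
                done.add(insn.id)
                made_progress = True

        pending = [insn for insn in insns if insn.id not in done]
        entered = set(name for kind, name in sched if kind == "enter")

        # branch: enter any loop that gets a pending instruction closer to
        # having its full loop nest active
        entered_any = False
        for iname in sorted(all_inames - entered):
            if any(set(active) | set([iname]) <= set(insn.inames)
                    for insn in pending):
                entered_any = True
                for sub in schedules(insns, all_inames,
                        sched + (("enter", iname),), active + (iname,)):
                    yield sub
        if entered_any:
            return

        # leave the innermost loop once no pending instruction still needs it
        if active and all(active[-1] not in insn.inames for insn in pending):
            sched += (("leave", active[-1]),)
            active = active[:-1]
            made_progress = True

        if not active and not pending:
            yield sched
        elif made_progress:
            for sub in schedules(insns, all_inames, sched, active):
                yield sub

    # Example:
    #   insns = [Insn("a", frozenset(["i"]), frozenset()),
    #            Insn("b", frozenset(["i", "j"]), frozenset(["a"]))]
    #   next(schedules(insns, set(["i", "j"])))
    #   -> (("enter", "i"), ("run", "a"), ("enter", "j"), ("run", "b"),
    #       ("leave", "j"), ("leave", "i"))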