diff --git a/loopy/kernel/creation.py b/loopy/kernel/creation.py
index e25320729df3d5b4cceda46c9c406eb80e13ebdf..bcc387494e84c10dfd4087c74835ac64c3bd2705 100644
--- a/loopy/kernel/creation.py
+++ b/loopy/kernel/creation.py
@@ -655,33 +655,6 @@ class ArgumentGuesser:
     # }}}
 
 
-# {{{ tag reduction inames as sequential
-
-def tag_reduction_inames_as_sequential(knl):
-    result = set()
-
-    for insn in knl.instructions:
-        result.update(insn.reduction_inames())
-
-    from loopy.kernel.data import ParallelTag, ForceSequentialTag
-
-    new_iname_to_tag = {}
-    for iname in result:
-        tag = knl.iname_to_tag.get(iname)
-        if tag is not None and isinstance(tag, ParallelTag):
-            raise RuntimeError("inconsistency detected: "
-                    "reduction iname '%s' has "
-                    "a parallel tag" % iname)
-
-        if tag is None:
-            new_iname_to_tag[iname] = ForceSequentialTag()
-
-    from loopy import tag_inames
-    return tag_inames(knl, new_iname_to_tag)
-
-# }}}
-
-
 # {{{ sanity checking
 
 def check_for_duplicate_names(knl):
@@ -1325,7 +1298,6 @@ def make_kernel(domains, instructions, kernel_data=["..."], **kwargs):
 
     knl = create_temporaries(knl, default_order)
     knl = determine_shapes_of_temporaries(knl)
-    knl = tag_reduction_inames_as_sequential(knl)
     knl = expand_defines_in_shapes(knl, defines)
     knl = guess_arg_shape_if_requested(knl, default_order)
     knl = apply_default_order_to_args(knl, default_order)
diff --git a/loopy/preprocess.py b/loopy/preprocess.py
index 8b7be4f0407bb0b9e52f0b6915841af1c5dd7ac5..7b94d1ea98202188bf002e02ec3469c9aa1e9b62 100644
--- a/loopy/preprocess.py
+++ b/loopy/preprocess.py
@@ -28,7 +28,10 @@ from loopy.diagnostic import (
         LoopyError, WriteRaceConditionWarning, warn,
         LoopyAdvisory, DependencyTypeInferenceFailure)
 
+import islpy as isl
+
 from pytools.persistent_dict import PersistentDict
+
 from loopy.tools import LoopyKeyBuilder
 from loopy.version import DATA_MODEL_VERSION
 from loopy.kernel.data import make_assignment
@@ -479,6 +482,7 @@ def realize_reduction(kernel, insn_id_filter=None):
     logger.debug("%s: realize reduction" % kernel.name)
 
     new_insns = []
+    new_iname_tags = {}
 
     insn_id_gen = kernel.get_instruction_id_generator()
 
@@ -488,27 +492,14 @@
     from loopy.expression import TypeInferenceMapper
     type_inf_mapper = TypeInferenceMapper(kernel)
 
-    def map_reduction(expr, rec, multiple_values_ok=False):
-        # Only expand one level of reduction at a time, going from outermost to
-        # innermost. Otherwise we get the (iname + insn) dependencies wrong.
-
-        try:
-            arg_dtype = type_inf_mapper(expr.expr)
-        except DependencyTypeInferenceFailure:
-            raise LoopyError("failed to determine type of accumulator for "
-                    "reduction '%s'" % expr)
-
-        arg_dtype = arg_dtype.with_target(kernel.target)
-
-        reduction_dtypes = expr.operation.result_dtypes(
-                kernel, arg_dtype, expr.inames)
-        reduction_dtypes = tuple(
-                dt.with_target(kernel.target) for dt in reduction_dtypes)
+    # {{{ sequential
+    def map_reduction_seq(expr, rec, multiple_values_ok, arg_dtype,
+            reduction_dtypes):
+        outer_insn_inames = temp_kernel.insn_inames(insn)
 
         ncomp = len(reduction_dtypes)
 
         from pymbolic import var
-
         acc_var_names = [
                 var_name_gen("acc_"+"_".join(expr.inames))
                 for i in range(ncomp)]
@@ -523,12 +514,6 @@
                     dtype=dtype,
                     scope=temp_var_scope.PRIVATE)
 
-        outer_insn_inames = temp_kernel.insn_inames(insn)
-        bad_inames = frozenset(expr.inames) & outer_insn_inames
-        if bad_inames:
-            raise LoopyError("reduction used within loop(s) that it was "
-                    "supposed to reduce over: " + ", ".join(bad_inames))
-
         init_id = insn_id_gen(
                 "%s_%s_init" % (insn.id, "_".join(expr.inames)))
 
@@ -562,7 +547,7 @@
 
         generated_insns.append(reduction_insn)
 
-        new_insn_depends_on.add(reduction_insn.id)
+        new_insn_add_depends_on.add(reduction_insn.id)
 
         if multiple_values_ok:
             return acc_vars
@@ -570,17 +555,272 @@
         assert len(acc_vars) == 1
         return acc_vars[0]
 
+    # }}}
+
+    # {{{ local-parallel
+
+    def _get_int_iname_size(iname):
+        from loopy.isl_helpers import static_max_of_pw_aff
+        from loopy.symbolic import pw_aff_to_expr
+        size = pw_aff_to_expr(
+                static_max_of_pw_aff(
+                    kernel.get_iname_bounds(iname).size,
+                    constants_only=True))
+        assert isinstance(size, six.integer_types)
+        return size
+
+    def _make_slab_set(iname, size):
+        v = isl.make_zero_and_vars([iname])
+        bs, = (
+                v[0].le_set(v[iname])
+                &
+                v[iname].lt_set(v[0] + size)).get_basic_sets()
+        return bs
+
+    def map_reduction_local(expr, rec, multiple_values_ok, arg_dtype,
+            reduction_dtypes):
+        red_iname, = expr.inames
+        ncomp = len(reduction_dtypes)
+
+        size = _get_int_iname_size(red_iname)
+
+        outer_insn_inames = temp_kernel.insn_inames(insn)
+
+        from loopy.kernel.data import LocalIndexTagBase
+        outer_local_inames = tuple(
+                oiname
+                for oiname in outer_insn_inames
+                if isinstance(
+                    kernel.iname_to_tag.get(oiname),
+                    LocalIndexTagBase))
+
+        from pymbolic import var
+        outer_local_iname_vars = tuple(
+                var(oiname) for oiname in outer_local_inames)
+
+        outer_local_iname_sizes = tuple(
+                _get_int_iname_size(oiname)
+                for oiname in outer_local_inames)
+
+        # {{{ add separate iname to carry out the reduction
+
+        # Doing this sheds any odd conditionals that may be active
+        # on our red_iname.
+
+        base_exec_iname = var_name_gen("red_"+red_iname)
+        domains.append(_make_slab_set(base_exec_iname, size))
+        new_iname_tags[base_exec_iname] = kernel.iname_to_tag[red_iname]
+
+        # }}}
+
+        acc_var_names = [
+                var_name_gen("acc_"+red_iname)
+                for i in range(ncomp)]
+        acc_vars = tuple(var(n) for n in acc_var_names)
+
+        from loopy.kernel.data import TemporaryVariable, temp_var_scope
+        for name, dtype in zip(acc_var_names, reduction_dtypes):
+            new_temporary_variables[name] = TemporaryVariable(
+                    name=name,
+                    shape=outer_local_iname_sizes + (size,),
+                    dtype=dtype,
+                    scope=temp_var_scope.LOCAL)
+
+        base_iname_deps = outer_insn_inames - frozenset(expr.inames)
+
+        neutral = expr.operation.neutral_element(arg_dtype, expr.inames)
+
+        init_id = insn_id_gen("%s_%s_init" % (insn.id, red_iname))
+        init_insn = make_assignment(
+                id=init_id,
+                assignees=tuple(
+                    acc_var[outer_local_iname_vars + (var(base_exec_iname),)]
+                    for acc_var in acc_vars),
+                expression=neutral,
+                forced_iname_deps=base_iname_deps | frozenset([base_exec_iname]),
+                forced_iname_deps_is_final=insn.forced_iname_deps_is_final,
+                depends_on=frozenset())
+        generated_insns.append(init_insn)
+
+        transfer_id = insn_id_gen("%s_%s_transfer" % (insn.id, red_iname))
+        transfer_insn = make_assignment(
+                id=transfer_id,
+                assignees=tuple(
+                    acc_var[outer_local_iname_vars + (var(red_iname),)]
+                    for acc_var in acc_vars),
+                expression=expr.operation(
+                    arg_dtype, neutral, expr.expr, expr.inames),
+                forced_iname_deps=(
+                    (outer_insn_inames - frozenset(expr.inames))
+                    | frozenset([red_iname])),
+                forced_iname_deps_is_final=insn.forced_iname_deps_is_final,
+                depends_on=frozenset([init_id]),
+                no_sync_with=frozenset([init_id]))
+        generated_insns.append(transfer_insn)
+
+        def _strip_if_scalar(c):
+            if len(acc_vars) == 1:
+                return c[0]
+            else:
+                return c
+
+        cur_size = 1
+        while cur_size < size:
+            cur_size *= 2
+
+        prev_id = transfer_id
+        bound = size
+
+        istage = 0
+        while cur_size > 1:
+
+            new_size = cur_size // 2
+            assert new_size * 2 == cur_size
+
+            stage_exec_iname = var_name_gen("red_%s_s%d" % (red_iname, istage))
+            domains.append(_make_slab_set(stage_exec_iname, bound-new_size))
+            new_iname_tags[stage_exec_iname] = kernel.iname_to_tag[red_iname]
+
+            stage_id = insn_id_gen("red_%s_stage_%d" % (red_iname, istage))
+            stage_insn = make_assignment(
+                    id=stage_id,
+                    assignees=tuple(
+                        acc_var[outer_local_iname_vars + (var(stage_exec_iname),)]
+                        for acc_var in acc_vars),
+                    expression=expr.operation(
+                        arg_dtype,
+                        _strip_if_scalar([
+                            acc_var[
+                                outer_local_iname_vars + (var(stage_exec_iname),)]
+                            for acc_var in acc_vars]),
+                        _strip_if_scalar([
+                            acc_var[
+                                outer_local_iname_vars + (
+                                    var(stage_exec_iname) + new_size,)]
+                            for acc_var in acc_vars]),
+                        expr.inames),
+                    forced_iname_deps=(
+                        base_iname_deps | frozenset([stage_exec_iname])),
+                    forced_iname_deps_is_final=insn.forced_iname_deps_is_final,
+                    depends_on=frozenset([prev_id]),
+                    )
+
+            generated_insns.append(stage_insn)
+            prev_id = stage_id
+
+            cur_size = new_size
+            bound = cur_size
+            istage += 1
+
+        new_insn_add_depends_on.add(prev_id)
+        new_insn_add_no_sync_with.add(prev_id)
+        new_insn_add_forced_iname_deps.add(stage_exec_iname or base_exec_iname)
+
+        if multiple_values_ok:
+            return [acc_var[outer_local_iname_vars + (0,)] for acc_var in acc_vars]
+        else:
+            assert len(acc_vars) == 1
+            return acc_vars[0][outer_local_iname_vars + (0,)]
+
+    # }}}
+
+    # {{{ seq/par dispatch
+
+    def map_reduction(expr, rec, multiple_values_ok=False):
+        # Only expand one level of reduction at a time, going from outermost to
+        # innermost. Otherwise we get the (iname + insn) dependencies wrong.
+
+        try:
+            arg_dtype = type_inf_mapper(expr.expr)
+        except DependencyTypeInferenceFailure:
+            raise LoopyError("failed to determine type of accumulator for "
+                    "reduction '%s'" % expr)
+
+        arg_dtype = arg_dtype.with_target(kernel.target)
+
+        reduction_dtypes = expr.operation.result_dtypes(
+                kernel, arg_dtype, expr.inames)
+        reduction_dtypes = tuple(
+                dt.with_target(kernel.target) for dt in reduction_dtypes)
+
+        outer_insn_inames = temp_kernel.insn_inames(insn)
+        bad_inames = frozenset(expr.inames) & outer_insn_inames
+        if bad_inames:
+            raise LoopyError("reduction used within loop(s) that it was "
+                    "supposed to reduce over: " + ", ".join(bad_inames))
+
+        n_sequential = 0
+        n_local_par = 0
+
+        from loopy.kernel.data import (
+                LocalIndexTagBase, UnrolledIlpTag, UnrollTag, VectorizeTag,
+                ParallelTag)
+        for iname in expr.inames:
+            iname_tag = kernel.iname_to_tag.get(iname)
+
+            if isinstance(iname_tag, (UnrollTag, UnrolledIlpTag)):
+                # These are nominally parallel, but we can live with
+                # them as sequential.
+                n_sequential += 1
+
+            elif isinstance(iname_tag, LocalIndexTagBase):
+                n_local_par += 1
+
+            elif isinstance(iname_tag, (ParallelTag, VectorizeTag)):
+                raise LoopyError("the only form of parallelism supported "
+                        "by reductions is 'local'--found iname '%s' "
+                        "tagged '%s'"
+                        % (iname, type(iname_tag).__name__))
+
+            else:
+                n_sequential += 1
+
+        if n_local_par and n_sequential:
+            raise LoopyError("Reduction over '%s' contains both parallel and "
+                    "sequential inames. It must be split "
+                    "(using split_reduction_{in,out}ward) "
+                    "before code generation."
+                    % ", ".join(expr.inames))
+
+        if n_local_par > 1:
+            raise LoopyError("Reduction over '%s' contains more than "
+                    "one parallel iname. It must be split "
+                    "(using split_reduction_{in,out}ward) "
+                    "before code generation."
+                    % ", ".join(expr.inames))
+
+        if n_sequential:
+            assert n_local_par == 0
+            return map_reduction_seq(expr, rec, multiple_values_ok, arg_dtype,
+                    reduction_dtypes)
+        elif n_local_par:
+            return map_reduction_local(expr, rec, multiple_values_ok, arg_dtype,
+                    reduction_dtypes)
+        else:
+            from loopy.diagnostic import warn
+            warn(kernel, "empty_reduction",
+                    "Empty reduction found (no inames to reduce over). "
" + "Eliminating.") + + return expr.expr + + # }}} + from loopy.symbolic import ReductionCallbackMapper cb_mapper = ReductionCallbackMapper(map_reduction) insn_queue = kernel.instructions[:] insn_id_replacements = {} + domains = kernel.domains[:] temp_kernel = kernel import loopy as lp while insn_queue: - new_insn_depends_on = set() + new_insn_add_depends_on = set() + new_insn_add_no_sync_with = set() + new_insn_add_forced_iname_deps = set() + generated_insns = [] insn = insn_queue.pop(0) @@ -603,8 +843,13 @@ def realize_reduction(kernel, insn_id_filter=None): kwargs = insn.get_copy_kwargs( depends_on=insn.depends_on - | frozenset(new_insn_depends_on), - forced_iname_deps=temp_kernel.insn_inames(insn)) + | frozenset(new_insn_add_depends_on), + no_sync_with=insn.no_sync_with + | frozenset(new_insn_add_no_sync_with), + forced_iname_deps=( + temp_kernel.insn_inames(insn) + | new_insn_add_forced_iname_deps) + ) kwargs.pop("id") kwargs.pop("expression") @@ -624,8 +869,6 @@ def realize_reduction(kernel, insn_id_filter=None): insn_id_replacements[insn.id] = [ rinsn.id for rinsn in replacement_insns] - # FIXME: Track dep rename - insn_queue = generated_insns + replacement_insns + insn_queue # The reduction expander needs an up-to-date kernel @@ -637,23 +880,28 @@ def realize_reduction(kernel, insn_id_filter=None): else: # nothing happened, we're done with insn - assert not new_insn_depends_on + assert not new_insn_add_depends_on new_insns.append(insn) kernel = kernel.copy( instructions=new_insns, - temporary_variables=new_temporary_variables) + temporary_variables=new_temporary_variables, + domains=domains) - return lp.replace_instruction_ids(kernel, insn_id_replacements) + kernel = lp.replace_instruction_ids(kernel, insn_id_replacements) + + kernel = lp.tag_inames(kernel, new_iname_tags) + + return kernel # }}} -# {{{ find boostability of instructions +# {{{ find idempotence ("boostability") of instructions -def find_boostability(kernel): - logger.debug("%s: boostability" % kernel.name) +def find_idempotence(kernel): + logger.debug("%s: idempotence" % kernel.name) writer_map = kernel.writer_map() @@ -661,16 +909,20 @@ def find_boostability(kernel): var_names = arg_names | set(six.iterkeys(kernel.temporary_variables)) - dep_map = dict( + reads_map = dict( (insn.id, insn.read_dependency_names() & var_names) for insn in kernel.instructions) - non_boostable_vars = set() + non_idempotently_updated_vars = set() + + # FIXME: This can be made more efficient by simply starting + # from all written variables and not even considering + # instructions as the start of the first pass. 
 
     new_insns = []
     for insn in kernel.instructions:
         all_my_var_writers = set()
-        for var in dep_map[insn.id]:
+        for var in reads_map[insn.id]:
             var_writers = writer_map.get(var, set())
             all_my_var_writers |= var_writers
 
@@ -680,7 +932,7 @@
             last_all_my_var_writers = all_my_var_writers
 
             for writer_insn_id in last_all_my_var_writers:
-                for var in dep_map[writer_insn_id]:
+                for var in reads_map[writer_insn_id]:
                     all_my_var_writers = \
                             all_my_var_writers | writer_map.get(var, set())
 
@@ -692,19 +944,20 @@
         boostable = insn.id not in all_my_var_writers
 
         if not boostable:
-            non_boostable_vars.update(
+            non_idempotently_updated_vars.update(
                    var_name for var_name, _ in insn.assignees_and_indices())
 
         insn = insn.copy(boostable=boostable)
         new_insns.append(insn)
 
-    # {{{ remove boostability from isns that access non-boostable vars
+    # {{{ remove boostability from insns that access non-idempotently updated vars
 
     new2_insns = []
    for insn in new_insns:
         accessed_vars = insn.dependency_names()
-        boostable = insn.boostable and not bool(non_boostable_vars & accessed_vars)
+        boostable = insn.boostable and not bool(
+                non_idempotently_updated_vars & accessed_vars)
 
         new2_insns.append(insn.copy(boostable=boostable))
 
     # }}}
@@ -837,7 +1090,7 @@
 
     kernel = add_axes_to_temporaries_for_ilp_and_vec(kernel)
     kernel = find_temporary_scope(kernel)
-    kernel = find_boostability(kernel)
+    kernel = find_idempotence(kernel)
     kernel = limit_boostability(kernel)
 
     kernel = kernel.target.preprocess(kernel)
diff --git a/loopy/version.py b/loopy/version.py
index cfaa4b9a01d4eeb57b4cdf1b64a114e3b8c3bb33..7e1870ef718cd3c4a1ef197f0fb25e86d7a6175d 100644
--- a/loopy/version.py
+++ b/loopy/version.py
@@ -32,4 +32,4 @@ except ImportError:
 else:
     _islpy_version = islpy.version.VERSION_TEXT
 
-DATA_MODEL_VERSION = "v33-islpy%s" % _islpy_version
+DATA_MODEL_VERSION = "v34-islpy%s" % _islpy_version
diff --git a/test/test_loopy.py b/test/test_loopy.py
index 57f90f49503504ed435f3fef6baa91f14fa15665..61cf56792fa1b93767503c8191f250bfae9ff23c 100644
--- a/test/test_loopy.py
+++ b/test/test_loopy.py
@@ -681,6 +681,41 @@ def test_recursive_nested_dependent_reduction(ctx_factory):
     # FIXME: Actually test functionality.
 
 
+@pytest.mark.parametrize("size", [128, 5, 113, 67])
+def test_local_parallel_reduction(ctx_factory, size):
+    ctx = ctx_factory()
+    queue = cl.CommandQueue(ctx)
+
+    knl = lp.make_kernel(
+            "{[i, j]: 0 <= i < n and 0 <= j < 5}",
+            """
+            z[j] = sum(i, i+j)
+            """)
+
+    knl = lp.fix_parameters(knl, n=size)
+
+    ref_knl = knl
+
+    def variant0(knl):
+        return lp.tag_inames(knl, "i:l.0")
+
+    def variant1(knl):
+        return lp.tag_inames(knl, "i:l.0,j:l.1")
+
+    def variant2(knl):
+        return lp.tag_inames(knl, "i:l.0,j:g.0")
+
+    for variant in [
+            variant0,
+            variant1,
+            variant2
+            ]:
+        knl = variant(ref_knl)
+        evt, (z,) = knl(queue)
+
+        lp.auto_test_vs_ref(ref_knl, ctx, knl)
+
+
 def test_dependent_loop_bounds(ctx_factory):
     dtype = np.dtype(np.float32)
     ctx = ctx_factory()
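
Note on the staged reduction introduced above: the cur_size/bound halving loop
in map_reduction_local can be modeled in plain Python. The sketch below is a
hypothetical standalone illustration (tree_reduce and op are made-up names,
not loopy API); each iteration of its outer while loop corresponds to one
generated stage_insn, with work items 0 <= i < bound - new_size each folding
in the partner element new_size slots away:

    import operator

    def tree_reduce(vals, op=operator.add):
        # Accumulator array, standing in for the acc_var LOCAL temporary.
        acc = list(vals)
        size = len(acc)

        # Round the stage width up to the next power of two, mirroring
        # the cur_size computation in map_reduction_local.
        cur_size = 1
        while cur_size < size:
            cur_size *= 2

        bound = size
        while cur_size > 1:
            new_size = cur_size // 2
            # One stage: every index read stays below bound, so no
            # neutral-element padding of the accumulator is needed.
            for i in range(bound - new_size):
                acc[i] = op(acc[i], acc[i + new_size])
            cur_size = new_size
            bound = cur_size
        return acc[0]

    assert tree_reduce(range(113)) == sum(range(113))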
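The slab domains built by the _make_slab_set helper can be checked
interactively with islpy, using the same calls as in the patch (a quick
sketch; the iname and size are arbitrary, and the output is shown up to
isl's preferred formatting):

    import islpy as isl

    v = isl.make_zero_and_vars(["red_i"])
    bs, = (v[0].le_set(v["red_i"])
            & v["red_i"].lt_set(v[0] + 16)).get_basic_sets()
    print(bs)  # { [red_i] : 0 <= red_i <= 15 }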