From 6046ccdadca8fa5617744e7aa9d4f81675a6aa5c Mon Sep 17 00:00:00 2001 From: Andreas Kloeckner <inform@tiker.net> Date: Mon, 22 Apr 2013 00:50:35 -0400 Subject: [PATCH] Add wait_for, return events in all higher-level algorithms. --- doc/source/algorithm.rst | 29 ++++++++-- doc/source/runtime.rst | 16 +----- doc/source/subst.rst | 14 +++++ pyopencl/algorithm.py | 115 +++++++++++++++++++++++---------------- pyopencl/array.py | 13 +++-- pyopencl/reduction.py | 13 ++++- pyopencl/scan.py | 16 +++--- test/test_algorithm.py | 12 ++-- 8 files changed, 142 insertions(+), 86 deletions(-) create mode 100644 doc/source/subst.rst diff --git a/doc/source/algorithm.rst b/doc/source/algorithm.rst index 408294ec..0c221f3c 100644 --- a/doc/source/algorithm.rst +++ b/doc/source/algorithm.rst @@ -1,6 +1,8 @@ Parallel Algorithms =================== +.. include:: subst.rst + Element-wise expression evalution ("map") ----------------------------------------- @@ -19,6 +21,8 @@ evaluate multi-stage expressions on one or several operands in a single pass. Invoke the generated scalar kernel. The arguments may either be scalars or :class:`GPUArray` instances. + |std-enqueue-blurb| + Here's a usage example:: import pyopencl as cl @@ -82,7 +86,17 @@ Sums and counts ("reduce") :meth:`pyopencl.Program.build`. *preamble* specifies a string of code that is inserted before the actual kernels. - .. method:: __call__(*args, queue=None) + .. method:: __call__(*args, queue=None, wait_for=None, return_event=False) + + |explain-waitfor| + + :return: the resulting scalar as a single-entry :class:`pyopencl.array.Array` + if *return_event* is *False*, otherwise a tuple ``(scalar_array, event)``. + + .. note:: + + The returned :class:`pyopencl.Event` corresponds only to part of the + execution of the reduction. It is not suitable for profiling. .. versionadded: 2011.1 @@ -179,13 +193,20 @@ Making Custom Scan Kernels .. autoclass:: GenericScanKernel - .. method:: __call__(*args, allocator=None, queue=None, size=None) + .. method:: __call__(*args, allocator=None, queue=None, size=None, wait_for=None) *queue* and *allocator* default to the ones provided on the first :class:`pyopencl.array.Array` in *args*. *size* may specify the length of the scan to be carried out. If not given, this length is inferred from the first array argument passed. + |std-enqueue-blurb| + + .. note:: + + The returned :class:`pyopencl.Event` corresponds only to part of the + execution of the scan. It is not suitable for profiling. + Debugging aids ~~~~~~~~~~~~~~ @@ -266,7 +287,3 @@ Building many variable-size lists .. autoclass:: ListOfListsBuilder .. automethod:: __call__ - -.. autoclass:: KeyValueSorter - - .. automethod:: __call__ diff --git a/doc/source/runtime.rst b/doc/source/runtime.rst index 4e303456..cc28e561 100644 --- a/doc/source/runtime.rst +++ b/doc/source/runtime.rst @@ -1,5 +1,7 @@ .. _reference-doc: +.. include:: subst.rst + OpenCL Platform/Runtime Documentation ===================================== @@ -55,20 +57,6 @@ Constants Platforms, Devices and Contexts ------------------------------- -.. |comparable| replace:: Instances of this class are hashable, and two - instances of this class may be compared using *"=="* and *"!="*. - (Hashability was added in version 2011.2.) - -.. |buf-iface| replace:: must implement the Python buffer interface. - (e.g. by being an :class:`numpy.ndarray`) -.. |explain-waitfor| replace:: *wait_for* - may either be *None* or a list of :class:`Event` instances for - whose completion this command waits before starting exeuction. -.. |std-enqueue-blurb| replace:: Returns a new :class:`Event`. |explain-waitfor| - -.. |copy-depr| replace:: **Note:** This function is deprecated as of PyOpenCL 2011.1. - Use :func:`enqueue_copy` instead. - .. function:: get_platforms() Return a list of :class:`Platform` instances. diff --git a/doc/source/subst.rst b/doc/source/subst.rst new file mode 100644 index 00000000..2d7393d8 --- /dev/null +++ b/doc/source/subst.rst @@ -0,0 +1,14 @@ +.. |comparable| replace:: Instances of this class are hashable, and two + instances of this class may be compared using *"=="* and *"!="*. + (Hashability was added in version 2011.2.) + +.. |buf-iface| replace:: must implement the Python buffer interface. + (e.g. by being an :class:`numpy.ndarray`) +.. |explain-waitfor| replace:: *wait_for* + may either be *None* or a list of :class:`pyopencl.Event` instances for + whose completion this command waits before starting exeuction. +.. |std-enqueue-blurb| replace:: Returns a new :class:`pyopencl.Event`. |explain-waitfor| + +.. |copy-depr| replace:: **Note:** This function is deprecated as of PyOpenCL 2011.1. + Use :func:`enqueue_copy` instead. + diff --git a/pyopencl/algorithm.py b/pyopencl/algorithm.py index 585dd4d7..9c33792a 100644 --- a/pyopencl/algorithm.py +++ b/pyopencl/algorithm.py @@ -52,7 +52,7 @@ _copy_if_template = ScanTemplate( template_processor="printf") -def copy_if(ary, predicate, extra_args=[], queue=None, preamble=""): +def copy_if(ary, predicate, extra_args=[], preamble="", queue=None, wait_for=None): """Copy the elements of *ary* satisfying *predicate* to an output array. :arg predicate: a C expression evaluating to a `bool`, represented as a string. @@ -60,9 +60,11 @@ def copy_if(ary, predicate, extra_args=[], queue=None, preamble=""): to `true`, then this value ends up in the output. :arg extra_args: |scan_extra_args| :arg preamble: |preamble| - :returns: a tuple *(out, count)* where *out* is the output array and *count* + :arg wait_for: |explain-waitfor| + :returns: a tuple *(out, count, event)* where *out* is the output array, *count* is an on-device scalar (fetch to host with `count.get()`) indicating - how many elements satisfied *predicate*. + how many elements satisfied *predicate*, and *event* is a + :class:`pyopencl.Event` for dependency management. .. versionadded:: 2013.1 """ @@ -82,15 +84,15 @@ def copy_if(ary, predicate, extra_args=[], queue=None, preamble=""): count = ary._new_with_changes(data=None, shape=(), strides=(), dtype=scan_dtype) # **dict is a Py2.5 workaround - knl(ary, out, count, *extra_args_values, **dict(queue=queue)) + evt = knl(ary, out, count, *extra_args_values, **dict(queue=queue, wait_for=wait_for)) - return out, count + return out, count, evt # }}} # {{{ remove_if -def remove_if(ary, predicate, extra_args=[], queue=None, preamble=""): +def remove_if(ary, predicate, extra_args=[], preamble="", queue=None, wait_for=None): """Copy the elements of *ary* not satisfying *predicate* to an output array. :arg predicate: a C expression evaluating to a `bool`, represented as a string. @@ -98,14 +100,16 @@ def remove_if(ary, predicate, extra_args=[], queue=None, preamble=""): to `false`, then this value ends up in the output. :arg extra_args: |scan_extra_args| :arg preamble: |preamble| - :returns: a tuple *(out, count)* where *out* is the output array and *count* + :arg wait_for: |explain-waitfor| + :returns: a tuple *(out, count, event)* where *out* is the output array, *count* is an on-device scalar (fetch to host with `count.get()`) indicating - how many elements did not satisfy *predicate*. + how many elements did not satisfy *predicate*, and *event* is a + :class:`pyopencl.Event` for dependency management. .. versionadded:: 2013.1 """ - return copy_if(ary, "!(%s)" % predicate, extra_args=extra_args, queue=queue, - preamble=preamble) + return copy_if(ary, "!(%s)" % predicate, extra_args=extra_args, + preamble=preamble, queue=queue, wait_for=wait_for) # }}} @@ -128,7 +132,7 @@ _partition_template = ScanTemplate( -def partition(ary, predicate, extra_args=[], queue=None, preamble=""): +def partition(ary, predicate, extra_args=[], preamble="", queue=None, wait_for=None): """Copy the elements of *ary* into one of two arrays depending on whether they satisfy *predicate*. @@ -136,9 +140,11 @@ def partition(ary, predicate, extra_args=[], queue=None, preamble=""): The value to test is available as `ary[i]`. :arg extra_args: |scan_extra_args| :arg preamble: |preamble| - :returns: a tuple *(out_true, out_false, count)* where *count* + :arg wait_for: |explain-waitfor| + :returns: a tuple *(out_true, out_false, count, event)* where *count* is an on-device scalar (fetch to host with `count.get()`) indicating - how many elements satisfied the predicate. + how many elements satisfied the predicate, and *event* is a + :class:`pyopencl.Event` for dependency management. .. versionadded:: 2013.1 """ @@ -161,9 +167,10 @@ def partition(ary, predicate, extra_args=[], queue=None, preamble=""): count = ary._new_with_changes(data=None, shape=(), strides=(), dtype=scan_dtype) # **dict is a Py2.5 workaround - knl(ary, out_true, out_false, count, *extra_args_values, **dict(queue=queue)) + evt = knl(ary, out_true, out_false, count, *extra_args_values, + **dict(queue=queue, wait_for=wait_for)) - return out_true, out_false, count + return out_true, out_false, count, evt # }}} @@ -185,7 +192,7 @@ _unique_template = ScanTemplate( template_processor="printf") -def unique(ary, is_equal_expr="a == b", extra_args=[], queue=None, preamble=""): +def unique(ary, is_equal_expr="a == b", extra_args=[], preamble="", queue=None, wait_for=None): """Copy the elements of *ary* into the output if *is_equal_expr*, applied to the array element and its predecessor, yields false. @@ -197,9 +204,11 @@ def unique(ary, is_equal_expr="a == b", extra_args=[], queue=None, preamble=""): yields `false`, the two are considered distinct. :arg extra_args: |scan_extra_args| :arg preamble: |preamble| - :returns: a tuple *(out, count)* where *out* is the output array and *count* + :arg wait_for: |explain-waitfor| + :returns: a tuple *(out, count, event)* where *out* is the output array, *count* is an on-device scalar (fetch to host with `count.get()`) indicating - how many elements satisfied the predicate. + how many elements satisfied the predicate, and *event* is a + :class:`pyopencl.Event` for dependency management. .. versionadded:: 2013.1 """ @@ -222,9 +231,9 @@ def unique(ary, is_equal_expr="a == b", extra_args=[], queue=None, preamble=""): count = ary._new_with_changes(data=None, shape=(), strides=(), dtype=scan_dtype) # **dict is a Py2.5 workaround - knl(ary, out, count, *extra_args_values, **dict(queue=queue)) + evt = knl(ary, out, count, *extra_args_values, **dict(queue=queue, wait_for=wait_for)) - return out, count + return out, count, evt # }}} @@ -446,13 +455,17 @@ class RadixSort(object): :arg key_bits: specify how many bits (starting from least-significant) there are in the key. + :arg allocator: See the *allocator* argument of :func:`pyopencl.array.empty`. :arg queue: A :class:`pyopencl.CommandQueue`, defaulting to the one from the first argument array. - :arg allocator: See the *allocator* argument of :func:`pyopencl.array.empty`. - :returns: Sorted copies of the arrays named in *sorted_args*, in the order - of that list. + :arg wait_for: |explain-waitfor| + :returns: A tuple ``(sorted, event)``. *sorted* consists of sorted + copies of the arrays named in *sorted_args*, in the order of that + list. *event* is a :class:`pyopencl.Event` for dependency management. """ + wait_for = kwargs.pop("wait_for", None) + # {{{ run control key_bits = kwargs.pop("key_bits", None) @@ -471,8 +484,6 @@ class RadixSort(object): args = list(args) - kwargs = dict(queue=queue) - base_bit = 0 while base_bit < key_bits: sorted_args = [ @@ -482,7 +493,8 @@ class RadixSort(object): scan_args = args + sorted_args + [base_bit] - self.scan_kernel(*scan_args, **kwargs) + last_evt = self.scan_kernel(*scan_args, **dict(queue=queue, wait_for=wait_for)) + wait_for = [last_evt] # substitute sorted for i, arg_descr in enumerate(self.arguments): @@ -493,7 +505,7 @@ class RadixSort(object): return [arg_val for arg_descr, arg_val in zip(self.arguments, args) - if arg_descr.name in self.sort_arg_names] + if arg_descr.name in self.sort_arg_names], last_evt # }}} @@ -894,7 +906,9 @@ class ListOfListsBuilder: be passed as their :attr:`pyopencl.array.Array.data` attribute instead. :arg allocator: optionally, the allocator to use to allocate new arrays. - :returns: a mapping from names to objects which have attributes + :arg wait_for: |explain-waitfor| + :returns: a tuple ``(lists, event)``, where + *lists* a mapping from (built) list names to objects which have attributes * `count` for the total number of entries in all lists combined * `lists` for the array containing all lists. @@ -905,6 +919,8 @@ class ListOfListsBuilder: even for the last list. This implies that all lists are contiguous. + + *event* is a :class:`pyopencl.Event` for dependency management. """ if n_objects >= int(np.iinfo(np.int32).max): index_dtype = np.int64 @@ -913,6 +929,7 @@ class ListOfListsBuilder: index_dtype = np.dtype(index_dtype) allocator = kwargs.pop("allocator", None) + wait_for = kwargs.pop("wait_for", None) if kwargs: raise TypeError("invalid keyword arguments: '%s'" % ", ".join(kwargs)) @@ -949,21 +966,27 @@ class ListOfListsBuilder: from pyopencl.array import splay gsize, lsize = splay(queue, n_objects) - count_kernel(queue, gsize, lsize, - *(tuple(count_list_args) + args + (n_objects,))) + count_event = count_kernel(queue, gsize, lsize, + *(tuple(count_list_args) + args + (n_objects,)), + wait_for=wait_for) + # {{{ run scans + scan_events = [] + for name, dtype in self.list_names_and_dtypes: if name in self.count_sharing: continue info_record = result[name] starts_ary = info_record.starts - scan_kernel(starts_ary) + evt = scan_kernel(starts_ary, wait_for=[count_event]) # set first entry to zero - cl.enqueue_copy(queue, starts_ary.data, index_dtype.type(0)) + evt = cl.enqueue_copy(queue, starts_ary.data, index_dtype.type(0), + wait_for=[evt]) + scan_events.append(evt) # retrieve count count = np.array(1, index_dtype) @@ -998,10 +1021,11 @@ class ListOfListsBuilder: # }}} - write_kernel(queue, gsize, lsize, - *(tuple(write_list_args) + args + (n_objects,))) + evt = write_kernel(queue, gsize, lsize, + *(tuple(write_list_args) + args + (n_objects,)), + **dict(wait_for=scan_events)) - return result + return result, evt # }}} @@ -1101,27 +1125,26 @@ class KeyValueSorter(object): bound_propagation_scan=bound_propagation_scan) def __call__(self, queue, keys, values, nkeys, - starts_dtype, allocator=None): + starts_dtype, allocator=None, wait_for=None): if allocator is None: allocator = values.allocator knl_info = self.get_kernels(keys.dtype, values.dtype, starts_dtype) - (values_sorted_by_key, keys_sorted_by_key - ) = knl_info.by_target_sorter( - values, keys, queue=queue) + (values_sorted_by_key, keys_sorted_by_key), evt = knl_info.by_target_sorter( + values, keys, queue=queue, wait_for=wait_for) - starts = cl.array.empty(queue, (nkeys+1), starts_dtype, - allocator=allocator) \ - .fill(len(values_sorted_by_key)) + starts = cl.array.empty(queue, (nkeys+1), starts_dtype, allocator=allocator) + evt = starts.fill_and_return_event(len(values_sorted_by_key), wait_for=[evt]) - knl_info.start_finder(starts, keys_sorted_by_key, - range=slice(len(keys_sorted_by_key))) + evt = knl_info.start_finder(starts, keys_sorted_by_key, + range=slice(len(keys_sorted_by_key)), + wait_for=[evt]) - knl_info.bound_propagation_scan(starts, nkeys, queue=queue) + evt = knl_info.bound_propagation_scan(starts, nkeys, queue=queue, wait_for=[evt]) - return starts, values_sorted_by_key + return starts, values_sorted_by_key, evt # }}} diff --git a/pyopencl/array.py b/pyopencl/array.py index 76122625..7d699905 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -157,6 +157,7 @@ def elwise_kernel_runner(kernel_getter): def kernel_runner(*args, **kwargs): repr_ary = args[0] queue = kwargs.pop("queue", None) or repr_ary.queue + wait_for = kwargs.pop("wait_for", None) knl = kernel_getter(*args) @@ -178,7 +179,7 @@ def elwise_kernel_runner(kernel_getter): actual_args.append(arg) actual_args.append(repr_ary.size) - return knl(queue, gs, ls, *actual_args) + return knl(queue, gs, ls, *actual_args, **dict(wait_for=wait_for)) try: from functools import update_wrapper @@ -717,11 +718,15 @@ class Array(object): __rtruediv__ = __rdiv__ - def fill(self, value, queue=None): - """fills the array with the specified value""" - self._fill(self, value, queue=queue) + def fill(self, value, queue=None, wait_for=None, return_event=False): + """Fills the array with the specified value.""" + self._fill(self, value, queue=queue, wait_for=wait_for) return self + def fill_and_return_event(self, value, queue=None, wait_for=None, return_event=False): + """Fills the array with the specified value.""" + return self._fill(self, value, queue=queue, wait_for=wait_for) + def __len__(self): """Return the size of the leading dimension of self.""" if len(self.shape): diff --git a/pyopencl/reduction.py b/pyopencl/reduction.py index abf3c354..b564abd9 100644 --- a/pyopencl/reduction.py +++ b/pyopencl/reduction.py @@ -316,6 +316,8 @@ class ReductionKernel: stage_inf = self.stage_1_inf queue = kwargs.pop("queue", None) + wait_for = kwargs.pop("wait_for", None) + return_event = kwargs.pop("return_event", False) if kwargs: raise TypeError("invalid keyword argument to reduction kernel") @@ -364,14 +366,19 @@ class ReductionKernel: (group_count,), self.dtype_out, allocator=repr_vec.allocator) - stage_inf.kernel( + last_evt = stage_inf.kernel( use_queue, (group_count*stage_inf.group_size,), (stage_inf.group_size,), - *([result.data]+invocation_args+[seq_count, sz])) + *([result.data]+invocation_args+[seq_count, sz]), + **dict(wait_for=wait_for)) + wait_for = [last_evt] if group_count == 1: - return result + if return_event: + return result, last_evt + else: + return result else: stage_inf = self.stage_2_inf args = (result,) + stage1_args diff --git a/pyopencl/scan.py b/pyopencl/scan.py index 0d0d5ef7..a2125f4b 100644 --- a/pyopencl/scan.py +++ b/pyopencl/scan.py @@ -1260,6 +1260,7 @@ class GenericScanKernel(_GenericScanKernelBase): allocator = kwargs.get("allocator") queue = kwargs.get("queue") n = kwargs.get("size") + wait_for = kwargs.get("wait_for") if len(args) != len(self.parsed_args): raise TypeError("expected %d arguments, got %d" % @@ -1329,9 +1330,9 @@ class GenericScanKernel(_GenericScanKernelBase): if self.store_segment_start_flags: scan1_args.append(segment_start_flags.data) - l1_info.kernel( + l1_evt = l1_info.kernel( queue, (num_intervals,), (l1_info.wg_size,), - *scan1_args, **dict(g_times_l=True)) + *scan1_args, **dict(g_times_l=True, wait_for=wait_for)) # }}} @@ -1349,9 +1350,9 @@ class GenericScanKernel(_GenericScanKernelBase): interval_results.data, # partial_scan_buffer num_intervals, interval_size] - l2_info.kernel( + l2_evt = l2_info.kernel( queue, (1,), (l1_info.wg_size,), - *scan2_args, **dict(g_times_l=True)) + *scan2_args, **dict(g_times_l=True, wait_for=[l1_evt])) # }}} @@ -1364,9 +1365,9 @@ class GenericScanKernel(_GenericScanKernelBase): if self.store_segment_start_flags: upd_args.append(segment_start_flags.data) - self.final_update_knl( + return self.final_update_knl( queue, (num_intervals,), (self.update_wg_size,), - *upd_args, **dict(g_times_l=True)) + *upd_args, **dict(g_times_l=True, wait_for=[l2_evt])) # }}} @@ -1445,6 +1446,7 @@ class GenericDebugScanKernel(_GenericScanKernelBase): allocator = kwargs.get("allocator") queue = kwargs.get("queue") n = kwargs.get("size") + wait_for = kwargs.get("wait_for") if len(args) != len(self.parsed_args): raise TypeError("expected %d arguments, got %d" % @@ -1467,7 +1469,7 @@ class GenericDebugScanKernel(_GenericScanKernelBase): # }}} - self.kernel(queue, (1,), (1,), *(data_args + [n])) + return self.kernel(queue, (1,), (1,), *(data_args + [n]), **dict(wait_for=wait_for)) # }}} diff --git a/test/test_algorithm.py b/test/test_algorithm.py index 9896f284..ca1a921d 100644 --- a/test/test_algorithm.py +++ b/test/test_algorithm.py @@ -538,7 +538,7 @@ def test_copy_if(ctx_factory): crit = a_dev.dtype.type(300) selected = a[a>crit] - selected_dev, count_dev = copy_if(a_dev, "ary[i] > myval", [("myval", crit)]) + selected_dev, count_dev, evt = copy_if(a_dev, "ary[i] > myval", [("myval", crit)]) assert (selected_dev.get()[:count_dev.get()] == selected).all() from gc import collect @@ -564,7 +564,7 @@ def test_partition(ctx_factory): false_host = a[a<=crit] from pyopencl.algorithm import partition - true_dev, false_dev, count_true_dev = partition(a_dev, "ary[i] > myval", [("myval", crit)]) + true_dev, false_dev, count_true_dev, evt = partition(a_dev, "ary[i] > myval", [("myval", crit)]) count_true_dev = count_true_dev.get() @@ -589,7 +589,7 @@ def test_unique(ctx_factory): a_unique_host = np.unique(a) from pyopencl.algorithm import unique - a_unique_dev, count_unique_dev = unique(a_dev) + a_unique_dev, count_unique_dev, evt = unique(a_dev) count_unique_dev = count_unique_dev.get() @@ -756,7 +756,7 @@ def test_sort(ctx_factory): dev_start = time() print(" device") - a_dev_sorted, = sort(a_dev, key_bits=16) + (a_dev_sorted,), evt = sort(a_dev, key_bits=16) queue.finish() dev_end = time() print(" numpy") @@ -789,7 +789,7 @@ def test_list_builder(ctx_factory): } """, arg_decls=[]) - result = builder(queue, 2000) + result, evt = builder(queue, 2000) inf = result["mylist"] assert inf.count == 3000 @@ -813,7 +813,7 @@ def test_key_value_sorter(ctx_factory): from pyopencl.algorithm import KeyValueSorter kvs = KeyValueSorter(context) - starts, lists = kvs(queue, keys, values, nkeys, starts_dtype=np.int32) + starts, lists, evt = kvs(queue, keys, values, nkeys, starts_dtype=np.int32) starts = starts.get() lists = lists.get() -- GitLab