diff --git a/pyopencl/array.py b/pyopencl/array.py index d7ef138b021bdf21162cade3a70b59136b616b3b..b904c7deae4d85901babc66195ed41170e227df7 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -1291,12 +1291,20 @@ class Array(object): def any(self, queue=None, wait_for=None): from pyopencl.reduction import get_any_kernel krnl = get_any_kernel(self.context, self.dtype) - return krnl(self, queue=queue, wait_for=wait_for) + if wait_for is None: + wait_for = [] + result, event1 = krnl(self, queue=queue, wait_for=wait_for + self.events, return_event=True) + result.add_event(event1) + return result def all(self, queue=None, wait_for=None): from pyopencl.reduction import get_all_kernel krnl = get_all_kernel(self.context, self.dtype) - return krnl(self, queue=queue, wait_for=wait_for) + if wait_for is None: + wait_for = [] + result, event1 = krnl(self, queue=queue, wait_for=wait_for + self.events, return_event=True) + result.add_event(event1) + return result @staticmethod @elwise_kernel_runner @@ -1677,11 +1685,13 @@ class Array(object): if flags is None: flags = cl.map_flags.READ | cl.map_flags.WRITE + if wait_for is None: + wait_for=[] ary, evt = cl.enqueue_map_buffer( queue or self.queue, self.base_data, flags, self.offset, - self.shape, self.dtype, strides=self.strides, wait_for=wait_for, - is_blocking=is_blocking) + self.shape, self.dtype, strides=self.strides, + wait_for=wait_for + self.events, is_blocking=is_blocking) if is_blocking: return ary @@ -1800,6 +1810,9 @@ class Array(object): """ queue = queue or self.queue or value.queue + if wait_for is None: + wait_for = [] + wait_for = wait_for + self.events if isinstance(subscript, Array): if subscript.dtype.kind != "i": @@ -2149,11 +2162,16 @@ def multi_take(arrays, indices, out=None, queue=None): cl.kernel_work_group_info.WORK_GROUP_SIZE, queue.device)) - knl(queue, gs, ls, + wait_for_this = (indices.events + + _builtin_sum((i.events for i in arrays[chunk_slice]), []) + + _builtin_sum((o.events for o in out[chunk_slice]), [])) + evt = knl(queue, gs, ls, indices.data, *([o.data for o in out[chunk_slice]] + [i.data for i in arrays[chunk_slice]] - + [indices.size])) + + [indices.size]), wait_for=wait_for_this) + for o in out[chunk_slice]: + o.add_event(evt) return out @@ -2223,7 +2241,10 @@ def multi_take_put(arrays, dest_indices, src_indices, dest_shape=None, queue.device)) from pytools import flatten - knl(queue, gs, ls, + wait_for_this = (dest_indices.events + src_indices.events + + _builtin_sum((i.events for i in arrays[chunk_slice]), []) + + _builtin_sum((o.events for o in out[chunk_slice]), [])) + evt = knl(queue, gs, ls, *([o.data for o in out[chunk_slice]] + [dest_indices.base_data, dest_indices.offset, @@ -2234,6 +2255,8 @@ def multi_take_put(arrays, dest_indices, src_indices, dest_shape=None, for i in arrays[chunk_slice])) + src_offsets_list[chunk_slice] + [src_indices.size])) + for o in out[chunk_slice]: + o.add_event(evt) return out @@ -2248,6 +2271,10 @@ def multi_put(arrays, dest_indices, dest_shape=None, out=None, queue=None, a_allocator = arrays[0].allocator context = dest_indices.context queue = queue or dest_indices.queue + if wait_for is None: + wait_for = [] + wait_for = wait_for + dest_indices.events + vec_count = len(arrays) @@ -2299,6 +2326,9 @@ def multi_put(arrays, dest_indices, dest_shape=None, out=None, queue=None, queue.device)) from pytools import flatten + wait_for_this = (wait_for + + _builtin_sum((i.events for i in arrays[chunk_slice]), []) + + _builtin_sum((o.events for o in out[chunk_slice]), [])) evt = knl(queue, gs, ls, *( list(flatten( @@ -2311,9 +2341,7 @@ def multi_put(arrays, dest_indices, dest_shape=None, out=None, queue=None, + [use_fill_cla.base_data, use_fill_cla.offset] + [array_lengths_cla.base_data, array_lengths_cla.offset] + [dest_indices.size]), - **dict(wait_for=wait_for)) - - # FIXME should wait on incoming events + **dict(wait_for=wait_for_this)) for o in out[chunk_slice]: o.add_event(evt) @@ -2391,7 +2419,8 @@ def diff(array, queue=None, allocator=None): allocator = allocator or array.allocator result = empty(queue, (n-1,), array.dtype, allocator=allocator) - _diff(result, array, queue=queue) + event1 = _diff(result, array, queue=queue) + result.add_event(event1) return result @@ -2472,7 +2501,8 @@ def if_positive(criterion, then_, else_, out=None, queue=None): if out is None: out = empty_like(then_) - _if_positive(out, criterion, then_, else_, queue=queue) + event1 = _if_positive(out, criterion, then_, else_, queue=queue) + out.add_event(event1) return out @@ -2505,7 +2535,9 @@ def sum(a, dtype=None, queue=None, slice=None): """ from pyopencl.reduction import get_sum_kernel krnl = get_sum_kernel(a.context, dtype, a.dtype) - return krnl(a, queue=queue, slice=slice) + result, event1 = krnl(a, queue=queue, slice=slice, wait_for=a.events, return_event=True) + result.add_event(event1) + return result def dot(a, b, dtype=None, queue=None, slice=None): @@ -2514,7 +2546,9 @@ def dot(a, b, dtype=None, queue=None, slice=None): """ from pyopencl.reduction import get_dot_kernel krnl = get_dot_kernel(a.context, dtype, a.dtype, b.dtype) - return krnl(a, b, queue=queue, slice=slice) + result, event1 = krnl(a, b, queue=queue, slice=slice, wait_for=a.events + b.events, return_event=True) + result.add_event(event1) + return result def vdot(a, b, dtype=None, queue=None, slice=None): @@ -2525,7 +2559,9 @@ def vdot(a, b, dtype=None, queue=None, slice=None): from pyopencl.reduction import get_dot_kernel krnl = get_dot_kernel(a.context, dtype, a.dtype, b.dtype, conjugate_first=True) - return krnl(a, b, queue=queue, slice=slice) + result, event1 = krnl(a, b, queue=queue, slice=slice, wait_for=a.events + b.events, return_event=True) + result.add_event(event1) + return result def subset_dot(subset, a, b, dtype=None, queue=None, slice=None): @@ -2535,14 +2571,19 @@ def subset_dot(subset, a, b, dtype=None, queue=None, slice=None): from pyopencl.reduction import get_subset_dot_kernel krnl = get_subset_dot_kernel( a.context, dtype, subset.dtype, a.dtype, b.dtype) - return krnl(subset, a, b, queue=queue, slice=slice) + result, event1 = krnl(subset, a, b, queue=queue, slice=slice, + wait_for=subset.events + a.events + b.events, return_event=True) + result.add_event(event1) + return result def _make_minmax_kernel(what): def f(a, queue=None): from pyopencl.reduction import get_minmax_kernel krnl = get_minmax_kernel(a.context, what, a.dtype) - return krnl(a, queue=queue) + result, event1 = krnl(a, queue=queue, wait_for=a.events, return_event=True) + result.add_event(event1) + return result return f @@ -2562,8 +2603,10 @@ def _make_subset_minmax_kernel(what): def f(subset, a, queue=None, slice=None): from pyopencl.reduction import get_subset_minmax_kernel krnl = get_subset_minmax_kernel(a.context, what, a.dtype, subset.dtype) - return krnl(subset, a, queue=queue, slice=slice) - + result, event1 = krnl(subset, a, queue=queue, slice=slice, + wait_for=a.events + subset.events, return_event=True) + result.add_event(event1) + return result return f @@ -2587,12 +2630,15 @@ def cumsum(a, output_dtype=None, queue=None, if output_dtype is None: output_dtype = a.dtype + if wait_for is None: + wait_for = [] result = a._new_like_me(output_dtype) from pyopencl.scan import get_cumsum_kernel krnl = get_cumsum_kernel(a.context, a.dtype, output_dtype) - evt = krnl(a, result, queue=queue, wait_for=wait_for) + evt = krnl(a, result, queue=queue, wait_for=wait_for + a.events) + result.add_event(evt) if return_event: return evt, result diff --git a/test/test_array.py b/test/test_array.py index fdfcfce3a7ce857125c38b832a4db084c4fd86c5..c9771ac0b5bd15153080bd06a5dbdb658009b4ed 100644 --- a/test/test_array.py +++ b/test/test_array.py @@ -1241,6 +1241,41 @@ def test_outoforderqueue_copy(ctx_factory): b = 10 * (a**2 - 7) assert np.abs(b1 - b).mean() < 1e-5 +def test_outoforderqueue_indexing(ctx_factory): + context = ctx_factory() + try: + queue = cl.CommandQueue(context, properties=cl.command_queue_properties.OUT_OF_ORDER_EXEC_MODE_ENABLE) + except Exception: + pytest.skip("out-of-order queue not available") + a = np.random.rand(10**6).astype(np.dtype('float32')) + i = (8e5 + 1e5 * np.random.rand(10**5)).astype(np.dtype('int32')) + a_gpu = cl_array.to_device(queue, a) + i_gpu = cl_array.to_device(queue, i) + c_gpu = (a_gpu**2)[i_gpu - 10000] + b_gpu = 10 - a_gpu + b_gpu[:] = 8 * a_gpu + b_gpu[i_gpu + 10000] = c_gpu - 10 + queue.finish() + b1 = b_gpu.get() + c = (a**2)[i - 10000] + b = 8 * a + b[i + 10000] = c - 10 + assert np.abs(b1 - b).mean() < 1e-5 + +def test_outoforderqueue_reductions(ctx_factory): + context = ctx_factory() + try: + queue = cl.CommandQueue(context, properties=cl.command_queue_properties.OUT_OF_ORDER_EXEC_MODE_ENABLE) + except Exception: + pytest.skip("out-of-order queue not available") + a = (np.random.rand(10**6) > 0.5).astype(np.dtype('float32')) # 0/1 values to avoid accumulated rounding error + a[800000] = 10 # all<5 looks true until near the end + a_gpu = cl_array.to_device(queue, a) + b1 = cl_array.sum(a_gpu).get() + b2 = cl_array.dot(a_gpu, 3 - a_gpu).get() + b3 = (a_gpu < 5).all().get() + assert b1 == a.sum() and b2 == a.dot(3 - a) and b3 == 0 + if __name__ == "__main__": # make sure that import failures get reported, instead of skipping the # tests.