diff --git a/pyopencl/array.py b/pyopencl/array.py index 5d77482c4d9f852d211522b7ccd7ef6d1b48050f..01eaf395bd3d82a9d4e611d89d88f5a69f04ecd0 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -423,6 +423,23 @@ class Array(object): .. automethod :: __le__ .. automethod :: __gt__ .. automethod :: __ge__ + + .. rubric:: Event management + + If an array is used from within an out-of-order queue, it needs to take + care of its own operation ordering. The facilities in this section make + this possible. + + .. versionadded:: 2014.1.1 + + .. attribute:: events + + A list of :class:`pyopencl.Event` instances that the current content of + this array depends on. User code may read, but should never modify this + list directly. To update this list, instead use the following methods. + + .. automethod:: add_event + .. automethod:: finish """ __array_priority__ = 100 @@ -573,7 +590,7 @@ class Array(object): def _new_with_changes(self, data, offset, shape=None, dtype=None, strides=None, queue=_copy_queue): """ - :arg data: *None* means alocate a new array. + :arg data: *None* means allocate a new array. """ if shape is None: shape = self.shape @@ -584,14 +601,23 @@ class Array(object): if queue is _copy_queue: queue = self.queue + # If we're allocating new data, then there's not likely to be + # a data dependency. Otherwise, the two arrays should probably + # share the same events list. + + if data is None: + events = None + else: + events = self.events + if queue is not None: return Array(queue, shape, dtype, allocator=self.allocator, strides=strides, data=data, offset=offset, - events=self.events) + events=events) else: return Array(self.context, shape, dtype, queue=queue, strides=strides, data=data, offset=offset, - events=self.events, allocator=self.allocator) + events=events, allocator=self.allocator) def with_queue(self, queue): """Return a copy of *self* with the default queue set to *queue*. @@ -844,7 +870,8 @@ class Array(object): """ result = self._new_like_me( _get_common_dtype(self, other, queue or self.queue)) - self._axpbyz(result, selffac, self, otherfac, other) + result.add_event( + self._axpbyz(result, selffac, self, otherfac, other)) return result def __add__(self, other): @@ -854,9 +881,12 @@ class Array(object): # add another vector result = self._new_like_me( _get_common_dtype(self, other, self.queue)) - self._axpbyz(result, - self.dtype.type(1), self, - other.dtype.type(1), other) + + result.add_event( + self._axpbyz(result, + self.dtype.type(1), self, + other.dtype.type(1), other)) + return result else: # add a scalar @@ -865,8 +895,9 @@ class Array(object): else: common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - self._axpbz(result, self.dtype.type(1), - self, common_dtype.type(other)) + result.add_event( + self._axpbz(result, self.dtype.type(1), + self, common_dtype.type(other))) return result __radd__ = __add__ @@ -877,9 +908,11 @@ class Array(object): if isinstance(other, Array): result = self._new_like_me( _get_common_dtype(self, other, self.queue)) - self._axpbyz(result, - self.dtype.type(1), self, - other.dtype.type(-1), other) + result.add_event( + self._axpbyz(result, + self.dtype.type(1), self, + other.dtype.type(-1), other)) + return result else: # subtract a scalar @@ -888,7 +921,8 @@ class Array(object): else: result = self._new_like_me( _get_common_dtype(self, other, self.queue)) - self._axpbz(result, self.dtype.type(1), self, -other) + result.add_event( + self._axpbz(result, self.dtype.type(1), self, -other)) return result def __rsub__(self, other): @@ -899,24 +933,28 @@ class Array(object): common_dtype = _get_common_dtype(self, other, self.queue) # other must be a scalar result = self._new_like_me(common_dtype) - self._axpbz(result, self.dtype.type(-1), self, - common_dtype.type(other)) + result.add_event( + self._axpbz(result, self.dtype.type(-1), self, + common_dtype.type(other))) return result def __iadd__(self, other): if isinstance(other, Array): - self._axpbyz(self, - self.dtype.type(1), self, - other.dtype.type(1), other) + self.add_event( + self._axpbyz(self, + self.dtype.type(1), self, + other.dtype.type(1), other)) return self else: - self._axpbz(self, self.dtype.type(1), self, other) + self.add_event( + self._axpbz(self, self.dtype.type(1), self, other)) return self def __isub__(self, other): if isinstance(other, Array): - self._axpbyz(self, self.dtype.type(1), self, - other.dtype.type(-1), other) + self.add_event( + self._axpbyz(self, self.dtype.type(1), self, + other.dtype.type(-1), other)) return self else: self._axpbz(self, self.dtype.type(1), self, -other) @@ -924,35 +962,40 @@ class Array(object): def __neg__(self): result = self._new_like_me() - self._axpbz(result, -1, self, 0) + result.add_event(self._axpbz(result, -1, self, 0)) return result def __mul__(self, other): if isinstance(other, Array): result = self._new_like_me( _get_common_dtype(self, other, self.queue)) - self._elwise_multiply(result, self, other) + result.add_event( + self._elwise_multiply(result, self, other)) return result else: common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - self._axpbz(result, - common_dtype.type(other), self, self.dtype.type(0)) + result.add_event( + self._axpbz(result, + common_dtype.type(other), self, self.dtype.type(0))) return result def __rmul__(self, scalar): common_dtype = _get_common_dtype(self, scalar, self.queue) result = self._new_like_me(common_dtype) - self._axpbz(result, - common_dtype.type(scalar), self, self.dtype.type(0)) + result.add_event( + self._axpbz(result, + common_dtype.type(scalar), self, self.dtype.type(0))) return result def __imul__(self, other): if isinstance(other, Array): - self._elwise_multiply(self, self, other) + self.add_event( + self._elwise_multiply(self, self, other)) else: # scalar - self._axpbz(self, other, self, self.dtype.type(0)) + self.add_event( + self._axpbz(self, other, self, self.dtype.type(0))) return self @@ -962,7 +1005,7 @@ class Array(object): if isinstance(other, Array): result = self._new_like_me( _get_common_dtype(self, other, self.queue)) - self._div(result, self, other) + result.add_event(self._div(result, self, other)) else: if other == 1: return self.copy() @@ -970,8 +1013,9 @@ class Array(object): # create a new array for the result common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - self._axpbz(result, - common_dtype.type(1/other), self, self.dtype.type(0)) + result.add_event( + self._axpbz(result, + common_dtype.type(1/other), self, self.dtype.type(0))) return result @@ -984,12 +1028,13 @@ class Array(object): if isinstance(other, Array): result = self._new_like_me( _get_common_dtype(self, other, self.queue)) - other._div(result, self) + result.add_event(other._div(result, self)) else: # create a new array for the result common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - self._rdiv_scalar(result, self, common_dtype.type(other)) + result.add_event( + self._rdiv_scalar(result, self, common_dtype.type(other))) return result @@ -1000,7 +1045,7 @@ class Array(object): :returns: *self*. """ - self.events.append( + self.add_event( self._fill(self, value, queue=queue, wait_for=wait_for)) return self @@ -1018,7 +1063,7 @@ class Array(object): """ result = self._new_like_me(self.dtype.type(0).real.dtype) - self._abs(result, self) + result.add_event(self._abs(result, self)) return result def __pow__(self, other): @@ -1031,11 +1076,12 @@ class Array(object): result = self._new_like_me( _get_common_dtype(self, other, self.queue)) - self._pow_array(result, self, other) + result.add_event( + self._pow_array(result, self, other)) else: result = self._new_like_me( _get_common_dtype(self, other, self.queue)) - self._pow_scalar(result, self, other) + result.add_event(self._pow_scalar(result, self, other)) return result @@ -1043,7 +1089,8 @@ class Array(object): # other must be a scalar common_dtype = _get_common_dtype(self, other, self.queue) result = self._new_like_me(common_dtype) - self._rpow_scalar(result, common_dtype.type(other), self) + result.add_event( + self._rpow_scalar(result, common_dtype.type(other), self)) return result # }}} @@ -1054,7 +1101,8 @@ class Array(object): """ result = self._new_like_me() - self._reverse(result, self) + result.add_event( + self._reverse(result, self)) return result def astype(self, dtype, queue=None): @@ -1063,7 +1111,7 @@ class Array(object): return self.copy() result = self._new_like_me(dtype=dtype) - self._copy(result, self, queue=queue) + result.add_event(self._copy(result, self, queue=queue)) return result # {{{ rich comparisons, any, all @@ -1102,27 +1150,32 @@ class Array(object): def __eq__(self, other): if isinstance(other, Array): result = self._new_like_me(np.int8) - self._array_comparison(result, self, other, op="==") + result.add_event( + self._array_comparison(result, self, other, op="==")) return result else: result = self._new_like_me(np.int8) - self._scalar_comparison(result, self, other, op="==") + result.add_event( + self._scalar_comparison(result, self, other, op="==")) return result def __ne__(self, other): if isinstance(other, Array): result = self._new_like_me(np.int8) - self._array_comparison(result, self, other, op="!=") + result.add_event( + self._array_comparison(result, self, other, op="!=")) return result else: result = self._new_like_me(np.int8) - self._scalar_comparison(result, self, other, op="!=") + result.add_event( + self._scalar_comparison(result, self, other, op="!=")) return result def __le__(self, other): if isinstance(other, Array): result = self._new_like_me(np.int8) - self._array_comparison(result, self, other, op="<=") + result.add_event( + self._array_comparison(result, self, other, op="<=")) return result else: result = self._new_like_me(np.int8) @@ -1132,31 +1185,37 @@ class Array(object): def __ge__(self, other): if isinstance(other, Array): result = self._new_like_me(np.int8) - self._array_comparison(result, self, other, op=">=") + result.add_event( + self._array_comparison(result, self, other, op=">=")) return result else: result = self._new_like_me(np.int8) - self._scalar_comparison(result, self, other, op=">=") + result.add_event( + self._scalar_comparison(result, self, other, op=">=")) return result def __lt__(self, other): if isinstance(other, Array): result = self._new_like_me(np.int8) - self._array_comparison(result, self, other, op="<") + result.add_event( + self._array_comparison(result, self, other, op="<")) return result else: result = self._new_like_me(np.int8) - self._scalar_comparison(result, self, other, op="<") + result.add_event( + self._scalar_comparison(result, self, other, op="<")) return result def __gt__(self, other): if isinstance(other, Array): result = self._new_like_me(np.int8) - self._array_comparison(result, self, other, op=">") + result.add_event( + self._array_comparison(result, self, other, op=">")) return result else: result = self._new_like_me(np.int8) - self._scalar_comparison(result, self, other, op=">") + result.add_event( + self._scalar_comparison(result, self, other, op=">")) return result # }}} @@ -1166,7 +1225,8 @@ class Array(object): def real(self): if self.dtype.kind == "c": result = self._new_like_me(self.dtype.type(0).real.dtype) - self._real(result, self) + result.add_event( + self._real(result, self)) return result else: return self @@ -1175,7 +1235,8 @@ class Array(object): def imag(self): if self.dtype.kind == "c": result = self._new_like_me(self.dtype.type(0).real.dtype) - self._imag(result, self) + result.add_event( + self._imag(result, self)) return result else: return zeros_like(self) @@ -1185,19 +1246,38 @@ class Array(object): """.. versionadded:: 2012.1""" if self.dtype.kind == "c": result = self._new_like_me() - self._conj(result, self) + result.add_event(self._conj(result, self)) return result else: return self # }}} + # {{{ event management + + def add_event(self, evt): + """Add *evt* to :attr:`events`. If :attr:`events` is too long, this method + may implicitly wait for a subset of :attr:`events` and clear them from the + list. + """ + n_wait = 4 + + self.events.append(evt) + + if len(self.events) > 3*n_wait: + wait_events = self.events[:n_wait] + cl.wait_for_events(wait_events) + del self.events[:n_wait] + def finish(self): - # undoc + """Wait for the entire contents of :attr:`events`, clear it.""" + if self.events: cl.wait_for_events(self.events) del self.events[:] + # }}} + # {{{ views def reshape(self, *shape, **kwargs): @@ -1418,7 +1498,7 @@ class Array(object): if isinstance(value, np.ndarray): if subarray.shape == value.shape and subarray.strides == value.strides: - self.events.append( + self.add_event( cl.enqueue_copy(queue, subarray.base_data, value, device_offset=subarray.offset, wait_for=wait_for)) return @@ -1436,7 +1516,8 @@ class Array(object): raise ValueError("cannot assign between arrays of " "differing strides") - self._copy(subarray, value, queue=queue, wait_for=wait_for) + self.add_event( + self._copy(subarray, value, queue=queue, wait_for=wait_for)) else: # Let's assume it's a scalar @@ -1634,7 +1715,7 @@ def arange(queue, *args, **kwargs): size = int(ceil((stop-start)/step)) result = Array(queue, (size,), dtype, allocator=inf.allocator) - result.events.append( + result.add_event( _arange_knl(result, start, step, queue=queue, wait_for=wait_for)) return result @@ -1659,7 +1740,7 @@ def take(a, indices, out=None, queue=None, wait_for=None): out = Array(queue, indices.shape, a.dtype, allocator=a.allocator) assert len(indices.shape) == 1 - out.events.append( + out.add_event( _take(out, a, indices, queue=queue, wait_for=wait_for)) return out @@ -1860,7 +1941,7 @@ def multi_put(arrays, dest_indices, dest_shape=None, out=None, queue=None, # FIXME should wait on incoming events for o in out[chunk_slice]: - o.events.append(evt) + o.add_event(evt) return out diff --git a/pyopencl/clrandom.py b/pyopencl/clrandom.py index 2fc377cb38e32dc8ffd5543f15008fe3f94a30ed..13cfbc20ff4a8857be53bef7db83a71fb8a5616a 100644 --- a/pyopencl/clrandom.py +++ b/pyopencl/clrandom.py @@ -292,13 +292,19 @@ class RanluxGenerator(object): def fill_uniform(self, ary, a=0, b=1, queue=None): """Fill *ary* with uniformly distributed random numbers in the interval *(a, b)*, endpoints excluded. + + :return: a :class:`pyopencl.Event` + + .. versionchanged:: 2014.1.1 + + Added return value. """ if queue is None: queue = ary.queue knl, size_multiplier = self.get_gen_kernel(ary.dtype, "uniform") - knl(queue, + return knl(queue, (self.num_work_items,), None, self.state.data, ary.data, ary.size*size_multiplier, b-a, a) @@ -317,13 +323,17 @@ class RanluxGenerator(object): def fill_normal(self, ary, mu=0, sigma=1, queue=None): """Fill *ary* with normally distributed numbers with mean *mu* and standard deviation *sigma*. + + .. versionchanged:: 2014.1.1 + + Added return value. """ if queue is None: queue = ary.queue knl, size_multiplier = self.get_gen_kernel(ary.dtype, "normal") - knl(queue, + return knl(queue, (self.num_work_items,), self.wg_size, self.state.data, ary.data, ary.size*size_multiplier, sigma, mu) @@ -335,7 +345,8 @@ class RanluxGenerator(object): result = cl_array.empty(*args, **kwargs) - self.fill_normal(result, queue=result.queue, mu=mu, sigma=sigma) + result.add_event( + self.fill_normal(result, queue=result.queue, mu=mu, sigma=sigma)) return result @memoize_method @@ -393,7 +404,8 @@ def rand(queue, shape, dtype, luxury=None, a=0, b=1): from pyopencl.array import Array gen = _get_generator(queue, luxury) result = Array(queue, shape, dtype) - gen.fill_uniform(result, a=a, b=b) + result.add_event( + gen.fill_uniform(result, a=a, b=b)) return result diff --git a/test/test_array.py b/test/test_array.py index bfe227849e4f3f2d7ccaaf0da1e97c4f128d986b..eb29b9b6cac86e134da49293b975f6c2a90644fc 100644 --- a/test/test_array.py +++ b/test/test_array.py @@ -719,6 +719,45 @@ def test_view_and_strides(ctx_factory): assert (y.get() == X.get()[:3, :5]).all() +def test_event_management(ctx_factory): + context = ctx_factory() + queue = cl.CommandQueue(context) + + from pyopencl.clrandom import rand as clrand + + x = clrand(queue, (5, 10), dtype=np.float32) + assert len(x.events) == 1, len(x._events) + + x.finish() + + assert len(x.events) == 0 + + y = x+x + assert len(y.events) == 1 + y = x*x + assert len(y.events) == 1 + y = 2*x + assert len(y.events) == 1 + y = 2/x + assert len(y.events) == 1 + y = x/2 + assert len(y.events) == 1 + y = x**2 + assert len(y.events) == 1 + y = 2**x + assert len(y.events) == 1 + + for i in range(10): + x.fill(0) + + assert len(x.events) == 10 + + for i in range(1000): + x.fill(0) + + assert len(x.events) < 100 + + if __name__ == "__main__": # make sure that import failures get reported, instead of skipping the # tests.