From 8e9be14cd08c7a43bd1847ca438101154d5954f7 Mon Sep 17 00:00:00 2001 From: "[6~" Date: Tue, 22 Oct 2019 14:18:36 -0500 Subject: [PATCH 01/11] Add initial hooks for concurrency checker --- pyopencl/check_concurrency.py | 53 +++++++++++++++++++++++++++++++++++ pyopencl/invoker.py | 4 +-- test/test_wrapper.py | 17 +++++++++++ 3 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 pyopencl/check_concurrency.py diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py new file mode 100644 index 00000000..bc7c55e1 --- /dev/null +++ b/pyopencl/check_concurrency.py @@ -0,0 +1,53 @@ +import pyopencl as cl +import weakref + + +# mapping from buffers to list of +# (queue,jjj +BUFFER_TO_OPS = weakref.WeakKeyDictionary() + +# mapping from kernel to dictionary containing {nr: buffer argument} +CURRENT_BUF_ARGS = weakref.WeakKeyDictionary() + + +prev_enqueue_nd_range_kernel = None +prev_kernel__set_arg_buf = None +prev_kernel_set_arg = None + + +def my_set_arg(kernel, index, obj): + print("SET_ARG: %s %d" % (kernel.function_name, index)) + if isinstance(obj, cl.Buffer): + arg_dict = CURRENT_BUF_ARGS.setdefault(kernel, {}) + arg_dict[index] = weakref.ref(obj) + return prev_kernel_set_arg(kernel, index, obj) + + +def my_enqueue_nd_range_kernel( + queue, kernel, global_size, local_size, + global_offset=None, wait_for=None, g_times_l=None): + print("ENQUEUE: %s" % kernel.function_name) + arg_dict = CURRENT_BUF_ARGS[kernel] + print(arg_dict) + return prev_enqueue_nd_range_kernel( + queue, kernel, global_size, local_size, + global_offset, wait_for, g_times_l) + + +def enable(): + global prev_enqueue_nd_range_kernel + global prev_kernel_set_arg + global prev_get_cl_header_version + + if prev_enqueue_nd_range_kernel is not None: + raise RuntimeError("already enabled") + + prev_enqueue_nd_range_kernel = cl.enqueue_nd_range_kernel + prev_kernel_set_arg = cl.Kernel.set_arg + prev_get_cl_header_version = cl.get_cl_header_version + + cl.Kernel.set_arg = my_set_arg + cl.enqueue_nd_range_kernel = my_enqueue_nd_range_kernel + + # I can't be bothered to handle clEnqueueFillBuffer + cl.get_cl_header_version = lambda: (1, 1) diff --git a/pyopencl/invoker.py b/pyopencl/invoker.py index b580c537..6fbf905e 100644 --- a/pyopencl/invoker.py +++ b/pyopencl/invoker.py @@ -284,7 +284,7 @@ def wrap_in_error_handler(body, arg_names): def add_local_imports(gen): gen("import numpy as np") - gen("import pyopencl._cl as _cl") + gen("import pyopencl as _cl") gen("from pyopencl import _KERNEL_ARG_CLASSES") gen("") @@ -352,7 +352,7 @@ def _generate_enqueue_and_set_args_module(function_name, invoker_cache = WriteOncePersistentDict( - "pyopencl-invoker-cache-v6", + "pyopencl-invoker-cache-v7", key_builder=_NumpyTypesKeyBuilder()) diff --git a/test/test_wrapper.py b/test/test_wrapper.py index 75b39b49..916cc0d2 100644 --- a/test/test_wrapper.py +++ b/test/test_wrapper.py @@ -1139,6 +1139,23 @@ def test_threaded_nanny_events(ctx_factory): t2.join() +def test_concurrency_checker(ctx_factory): + import pyopencl.check_concurrency as ccheck + + ccheck.enable() + + ctx = ctx_factory() + queue1 = cl.CommandQueue(ctx) + queue2 = cl.CommandQueue(ctx) + + arr1 = cl_array.zeros(queue1, (10,), np.float32) + arr2 = cl_array.zeros(queue2, (10,), np.float32) + del arr1.events[:] + del arr2.events[:] + + arr1 + arr2 + + if __name__ == "__main__": # make sure that import failures get reported, instead of skipping the tests. 
import pyopencl # noqa -- GitLab From 81295b6f1b74db86441fcea1e47778f626070bdc Mon Sep 17 00:00:00 2001 From: "[6~" Date: Tue, 22 Oct 2019 16:16:45 -0500 Subject: [PATCH 02/11] Mini step towards concurrency checker --- pyopencl/check_concurrency.py | 102 +++++++++++++++++++++++++++------- test/test_wrapper.py | 19 +++---- 2 files changed, 92 insertions(+), 29 deletions(-) diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py index bc7c55e1..8998770d 100644 --- a/pyopencl/check_concurrency.py +++ b/pyopencl/check_concurrency.py @@ -1,9 +1,39 @@ +from __future__ import division, absolute_import, print_function + +__copyright__ = "Copyright (C) 2019 Andreas Kloeckner" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + import pyopencl as cl import weakref +from collections import namedtuple + +OpRecord = namedtuple("OpRecord", [ + "kernel_name", + "queue", + ]) # mapping from buffers to list of -# (queue,jjj +# (kernel_name, queue weakref) BUFFER_TO_OPS = weakref.WeakKeyDictionary() # mapping from kernel to dictionary containing {nr: buffer argument} @@ -16,7 +46,6 @@ prev_kernel_set_arg = None def my_set_arg(kernel, index, obj): - print("SET_ARG: %s %d" % (kernel.function_name, index)) if isinstance(obj, cl.Buffer): arg_dict = CURRENT_BUF_ARGS.setdefault(kernel, {}) arg_dict[index] = weakref.ref(obj) @@ -26,28 +55,63 @@ def my_set_arg(kernel, index, obj): def my_enqueue_nd_range_kernel( queue, kernel, global_size, local_size, global_offset=None, wait_for=None, g_times_l=None): - print("ENQUEUE: %s" % kernel.function_name) - arg_dict = CURRENT_BUF_ARGS[kernel] - print(arg_dict) - return prev_enqueue_nd_range_kernel( + evt = prev_enqueue_nd_range_kernel( queue, kernel, global_size, local_size, global_offset, wait_for, g_times_l) + arg_dict = CURRENT_BUF_ARGS.get(kernel) + if arg_dict is not None: + for buf in arg_dict.values(): + buf = buf() + if buf is None: + continue + + prior_ops = BUFFER_TO_OPS.setdefault(buf, []) + for prior_op in prior_ops: + prev_queue = prior_op.queue() + + if prev_queue is not None and prev_queue.int_ptr != queue.int_ptr: + print("DIFFERENT QUEUES", + kernel.function_name, prior_op.kernel_name) + + prior_ops.append( + OpRecord( + kernel_name=kernel.function_name, + queue=weakref.ref(queue),) + ) + + return evt + + +class ConcurrencyCheck(object): + def __enter__(self): + global prev_enqueue_nd_range_kernel + global prev_kernel_set_arg + global prev_get_cl_header_version + + if prev_enqueue_nd_range_kernel is not None: + raise RuntimeError("already 
enabled") + + prev_enqueue_nd_range_kernel = cl.enqueue_nd_range_kernel + prev_kernel_set_arg = cl.Kernel.set_arg + prev_get_cl_header_version = cl.get_cl_header_version + + cl.Kernel.set_arg = my_set_arg + cl.enqueue_nd_range_kernel = my_enqueue_nd_range_kernel -def enable(): - global prev_enqueue_nd_range_kernel - global prev_kernel_set_arg - global prev_get_cl_header_version + # I can't be bothered to handle clEnqueueFillBuffer + cl.get_cl_header_version = lambda: (1, 1) - if prev_enqueue_nd_range_kernel is not None: - raise RuntimeError("already enabled") + def __exit__(self, exc_type, exc_value, traceback): + global prev_enqueue_nd_range_kernel + global prev_kernel_set_arg + global prev_get_cl_header_version - prev_enqueue_nd_range_kernel = cl.enqueue_nd_range_kernel - prev_kernel_set_arg = cl.Kernel.set_arg - prev_get_cl_header_version = cl.get_cl_header_version + cl.enqueue_nd_range_kernel = prev_enqueue_nd_range_kernel + cl.Kernel.set_arg = prev_kernel_set_arg + cl.get_cl_header_version = prev_get_cl_header_version - cl.Kernel.set_arg = my_set_arg - cl.enqueue_nd_range_kernel = my_enqueue_nd_range_kernel + prev_enqueue_nd_range_kernel = None - # I can't be bothered to handle clEnqueueFillBuffer - cl.get_cl_header_version = lambda: (1, 1) + BUFFER_TO_OPS.clear() + CURRENT_BUF_ARGS.clear() diff --git a/test/test_wrapper.py b/test/test_wrapper.py index 916cc0d2..93e70c8f 100644 --- a/test/test_wrapper.py +++ b/test/test_wrapper.py @@ -1142,18 +1142,17 @@ def test_threaded_nanny_events(ctx_factory): def test_concurrency_checker(ctx_factory): import pyopencl.check_concurrency as ccheck - ccheck.enable() - - ctx = ctx_factory() - queue1 = cl.CommandQueue(ctx) - queue2 = cl.CommandQueue(ctx) + with ccheck.ConcurrencyCheck(): + ctx = ctx_factory() + queue1 = cl.CommandQueue(ctx) + queue2 = cl.CommandQueue(ctx) - arr1 = cl_array.zeros(queue1, (10,), np.float32) - arr2 = cl_array.zeros(queue2, (10,), np.float32) - del arr1.events[:] - del arr2.events[:] + arr1 = cl_array.zeros(queue1, (10,), np.float32) + arr2 = cl_array.zeros(queue2, (10,), np.float32) + del arr1.events[:] + del arr2.events[:] - arr1 + arr2 + arr1 + arr2 if __name__ == "__main__": -- GitLab From 1b70bfab1037216e974768009af1820c7a242f4a Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Wed, 23 Oct 2019 20:16:47 -0500 Subject: [PATCH 03/11] make some global variables class attributes instead --- pyopencl/check_concurrency.py | 130 +++++++++++++++++++++++----------- pyopencl/invoker.py | 2 +- test/test_wrapper.py | 2 +- 3 files changed, 90 insertions(+), 44 deletions(-) diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py index 8998770d..da582e39 100644 --- a/pyopencl/check_concurrency.py +++ b/pyopencl/check_concurrency.py @@ -23,12 +23,17 @@ THE SOFTWARE. 
""" import pyopencl as cl +import pyopencl.invoker + +import traceback import weakref from collections import namedtuple + OpRecord = namedtuple("OpRecord", [ "kernel_name", "queue", + "event" ]) @@ -40,78 +45,119 @@ BUFFER_TO_OPS = weakref.WeakKeyDictionary() CURRENT_BUF_ARGS = weakref.WeakKeyDictionary() -prev_enqueue_nd_range_kernel = None -prev_kernel__set_arg_buf = None -prev_kernel_set_arg = None +def add_local_imports_wrapper(gen): + cl.invoker.add_local_imports(gen) + # NOTE: need to import pyopencl to be able to wrap it in generated code + gen("import pyopencl as _cl") + gen("") -def my_set_arg(kernel, index, obj): +def set_arg_wrapper(cc, kernel, index, obj): + if cc.verbose: + # FIXME: should really use logging + print('set_arg: %s %s' % (kernel.function_name, index)) + if isinstance(obj, cl.Buffer): arg_dict = CURRENT_BUF_ARGS.setdefault(kernel, {}) arg_dict[index] = weakref.ref(obj) - return prev_kernel_set_arg(kernel, index, obj) + return cc.prev_kernel_set_arg(kernel, index, obj) -def my_enqueue_nd_range_kernel( - queue, kernel, global_size, local_size, +def check_events(wait_for_events, prior_events): + for evt in wait_for_events: + if evt in prior_events: + return True + + return False + + +def enqueue_nd_range_kernel_wrapper( + cc, queue, kernel, global_size, local_size, global_offset=None, wait_for=None, g_times_l=None): - evt = prev_enqueue_nd_range_kernel( + if cc.verbose: + print('enqueue_nd_range_kernel: %s' % (kernel.function_name,)) + + evt = cc.prev_enqueue_nd_range_kernel( queue, kernel, global_size, local_size, global_offset, wait_for, g_times_l) arg_dict = CURRENT_BUF_ARGS.get(kernel) - if arg_dict is not None: - for buf in arg_dict.values(): - buf = buf() - if buf is None: + if arg_dict is None: + return evt + + for index, buf in arg_dict.items(): + buf = buf() + if buf is None: + continue + + prior_ops = BUFFER_TO_OPS.setdefault(buf, []) + prior_events = [] + for prior_op in prior_ops: + prev_queue = prior_op.queue() + if prev_queue is None: continue - prior_ops = BUFFER_TO_OPS.setdefault(buf, []) - for prior_op in prior_ops: - prev_queue = prior_op.queue() + if prev_queue.int_ptr != queue.int_ptr: + if cc.show_traceback: + print("Traceback") + traceback.print_stack() - if prev_queue is not None and prev_queue.int_ptr != queue.int_ptr: - print("DIFFERENT QUEUES", - kernel.function_name, prior_op.kernel_name) + print('DifferentQueuesInKernel: argument %d current kernel `%s` ' + 'previous kernel `%s`' % ( + index, kernel.function_name, prior_op.kernel_name)) - prior_ops.append( - OpRecord( - kernel_name=kernel.function_name, - queue=weakref.ref(queue),) - ) + prior_event = prior_op.event() + if prior_event is not None: + prior_events.append(prior_event) + + if not check_events(wait_for, prior_events): + print('EventsNotFound') + + prior_ops.append( + OpRecord( + kernel_name=kernel.function_name, + queue=weakref.ref(queue), + event=weakref.ref(evt),) + ) return evt class ConcurrencyCheck(object): - def __enter__(self): - global prev_enqueue_nd_range_kernel - global prev_kernel_set_arg - global prev_get_cl_header_version + prev_enqueue_nd_range_kernel = None + prev_kernel_set_arg = None + prev_get_cl_header_version = None - if prev_enqueue_nd_range_kernel is not None: - raise RuntimeError("already enabled") + def __init__(self, show_traceback=True, verbose=True): + self.show_traceback = show_traceback + self.verbose = verbose - prev_enqueue_nd_range_kernel = cl.enqueue_nd_range_kernel - prev_kernel_set_arg = cl.Kernel.set_arg - prev_get_cl_header_version = 
cl.get_cl_header_version + def __enter__(self): + if self.prev_enqueue_nd_range_kernel is not None: + raise RuntimeError('cannot nest `ConcurrencyCheck`s') - cl.Kernel.set_arg = my_set_arg - cl.enqueue_nd_range_kernel = my_enqueue_nd_range_kernel + self.prev_enqueue_nd_range_kernel = cl.enqueue_nd_range_kernel + self.prev_kernel_set_arg = cl.Kernel.set_arg + self.prev_get_cl_header_version = cl.get_cl_header_version + + from functools import partial + cl.Kernel.set_arg = lambda a, b, c: set_arg_wrapper(self, a, b, c) + cl.enqueue_nd_range_kernel = \ + partial(enqueue_nd_range_kernel_wrapper, self) + cl.invoker.add_local_imports = \ + add_local_imports_wrapper # I can't be bothered to handle clEnqueueFillBuffer cl.get_cl_header_version = lambda: (1, 1) def __exit__(self, exc_type, exc_value, traceback): - global prev_enqueue_nd_range_kernel - global prev_kernel_set_arg - global prev_get_cl_header_version - - cl.enqueue_nd_range_kernel = prev_enqueue_nd_range_kernel - cl.Kernel.set_arg = prev_kernel_set_arg - cl.get_cl_header_version = prev_get_cl_header_version + cl.enqueue_nd_range_kernel = self.prev_enqueue_nd_range_kernel + cl.Kernel.set_arg = self.prev_kernel_set_arg + cl.get_cl_header_version = self.prev_get_cl_header_version - prev_enqueue_nd_range_kernel = None + self.prev_enqueue_nd_range_kernel = None BUFFER_TO_OPS.clear() CURRENT_BUF_ARGS.clear() + +# vim: foldmethod=marker diff --git a/pyopencl/invoker.py b/pyopencl/invoker.py index 6fbf905e..e29db97c 100644 --- a/pyopencl/invoker.py +++ b/pyopencl/invoker.py @@ -284,7 +284,7 @@ def wrap_in_error_handler(body, arg_names): def add_local_imports(gen): gen("import numpy as np") - gen("import pyopencl as _cl") + gen("import pyopencl._cl as _cl") gen("from pyopencl import _KERNEL_ARG_CLASSES") gen("") diff --git a/test/test_wrapper.py b/test/test_wrapper.py index 93e70c8f..8822f3b2 100644 --- a/test/test_wrapper.py +++ b/test/test_wrapper.py @@ -1149,7 +1149,7 @@ def test_concurrency_checker(ctx_factory): arr1 = cl_array.zeros(queue1, (10,), np.float32) arr2 = cl_array.zeros(queue2, (10,), np.float32) - del arr1.events[:] + # del arr1.events[:] del arr2.events[:] arr1 + arr2 -- GitLab From 1575333bf9fbb3b98eaa444949e7e81981f76b05 Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Sat, 26 Oct 2019 16:55:47 -0500 Subject: [PATCH 04/11] some more work on the concurrency checker Added the following logic: * in `enqueue_nd_range_kernel`, we catch all generated events for each queue. * those events get cleared when calling `queue.finish` or `cl.wait_for_events`. * if a kernel argument's queue's events are not in the current `wait_for`, we complain. 
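The bookkeeping described above can be sketched independently of the OpenCL wrappers. The class below is purely illustrative (EventRegistry and its method names are not names used by this patch); it only shows the record / clear-on-sync / compare-against-wait_for cycle that the wrapped enqueue_nd_range_kernel, queue.finish and cl.wait_for_events implement.

    # Illustrative sketch of the per-queue event bookkeeping: record every
    # event under the queue that produced it, forget them when that queue
    # is synced, and flag events from *other* queues that the current
    # enqueue neither waited on nor synced.
    class EventRegistry:
        def __init__(self):
            self.events_by_queue = {}          # queue id -> set of event ids

        def record(self, queue_id, event_id):
            self.events_by_queue.setdefault(queue_id, set()).add(event_id)

        def mark_synced(self, queue_id):
            # models queue.finish() / cl.wait_for_events() for that queue
            self.events_by_queue.pop(queue_id, None)

        def find_unsynced(self, current_queue_id, wait_for):
            unsynced = set()
            for queue_id, events in self.events_by_queue.items():
                if queue_id != current_queue_id:
                    unsynced |= events - set(wait_for)
            return unsynced

    registry = EventRegistry()
    registry.record("queue1", "evt_a")
    # an enqueue on queue2 that does not wait on evt_a would be reported
    assert registry.find_unsynced("queue2", wait_for=[]) == {"evt_a"}
    registry.mark_synced("queue1")
    assert not registry.find_unsynced("queue2", wait_for=[])

The real check is narrower than this sketch in that it only looks at events attached to queues that previously touched the same buffer argument.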
--- pyopencl/check_concurrency.py | 233 +++++++++++++++++++++++----------- test/test_wrapper.py | 15 ++- 2 files changed, 170 insertions(+), 78 deletions(-) diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py index da582e39..f900e739 100644 --- a/pyopencl/check_concurrency.py +++ b/pyopencl/check_concurrency.py @@ -29,135 +29,216 @@ import traceback import weakref from collections import namedtuple +import logging +logger = logging.getLogger(__name__) OpRecord = namedtuple("OpRecord", [ "kernel_name", "queue", - "event" ]) # mapping from buffers to list of -# (kernel_name, queue weakref) +# (kernel_name, queue weakref) BUFFER_TO_OPS = weakref.WeakKeyDictionary() # mapping from kernel to dictionary containing {nr: buffer argument} CURRENT_BUF_ARGS = weakref.WeakKeyDictionary() +# list of events for each queue +QUEUE_TO_EVENTS = weakref.WeakKeyDictionary() + + +# {{{ helpers + +def remove_finished_events(events): + global QUEUE_TO_EVENTS + + for evt in events: + queue = evt.get_info(cl.event_info.COMMAND_QUEUE) + if queue not in QUEUE_TO_EVENTS: + continue + + logger.info('[RM] %s: %s', queue, hash(evt)) + QUEUE_TO_EVENTS[queue].remove(hash(evt)) + + +def add_events(queue, events): + global QUEUE_TO_EVENTS + + logger.debug('[ADD] %s: %s', queue, set(hash(evt) for evt in events)) + QUEUE_TO_EVENTS.setdefault(queue, set()).update( + [hash(evt) for evt in events]) + + +# }}} + + +# {{{ wrappers + +def wrapper_add_local_imports(cc, gen): + """Wraps :func:`pyopencl.invoker.add_local_imports`""" + cc.call('add_local_imports')(gen) -def add_local_imports_wrapper(gen): - cl.invoker.add_local_imports(gen) # NOTE: need to import pyopencl to be able to wrap it in generated code gen("import pyopencl as _cl") gen("") -def set_arg_wrapper(cc, kernel, index, obj): - if cc.verbose: - # FIXME: should really use logging - print('set_arg: %s %s' % (kernel.function_name, index)) +def wrapper_set_arg(cc, kernel, index, obj): + """Wraps :meth:`pyopencl.Kernel.set_arg`""" + logger.debug('set_arg: %s %s', kernel.function_name, index) if isinstance(obj, cl.Buffer): arg_dict = CURRENT_BUF_ARGS.setdefault(kernel, {}) arg_dict[index] = weakref.ref(obj) - return cc.prev_kernel_set_arg(kernel, index, obj) + return cc.call('set_arg')(kernel, index, obj) -def check_events(wait_for_events, prior_events): - for evt in wait_for_events: - if evt in prior_events: - return True - return False +def wrapper_wait_for_events(cc, events): + """Wraps :func:`pyopencl.wait_for_events`""" + remove_finished_events(events) -def enqueue_nd_range_kernel_wrapper( - cc, queue, kernel, global_size, local_size, - global_offset=None, wait_for=None, g_times_l=None): - if cc.verbose: - print('enqueue_nd_range_kernel: %s' % (kernel.function_name,)) + return cc.call('wait_for_events')(events) - evt = cc.prev_enqueue_nd_range_kernel( - queue, kernel, global_size, local_size, - global_offset, wait_for, g_times_l) - arg_dict = CURRENT_BUF_ARGS.get(kernel) - if arg_dict is None: - return evt +def wrapper_finish(cc, queue): + """Wraps :meth:`pyopencl.CommandQueue.finish`""" - for index, buf in arg_dict.items(): - buf = buf() - if buf is None: - continue + if queue in QUEUE_TO_EVENTS: + QUEUE_TO_EVENTS[queue].clear() - prior_ops = BUFFER_TO_OPS.setdefault(buf, []) - prior_events = [] - for prior_op in prior_ops: - prev_queue = prior_op.queue() - if prev_queue is None: - continue + return cc.call('finish')(queue) - if prev_queue.int_ptr != queue.int_ptr: - if cc.show_traceback: - print("Traceback") - traceback.print_stack() - 
print('DifferentQueuesInKernel: argument %d current kernel `%s` ' - 'previous kernel `%s`' % ( - index, kernel.function_name, prior_op.kernel_name)) +def wrapper_enqueue_nd_range_kernel(cc, + queue, kernel, global_size, local_size, + global_offset=None, wait_for=None, g_times_l=None): + """Wraps :func:`pyopencl.enqueue_nd_range_kernel`""" - prior_event = prior_op.event() - if prior_event is not None: - prior_events.append(prior_event) + logger.debug('enqueue_nd_range_kernel: %s', kernel.function_name) - if not check_events(wait_for, prior_events): - print('EventsNotFound') + arg_dict = CURRENT_BUF_ARGS.get(kernel) + if arg_dict is not None: + synced_events = set([hash(evt) for evt in wait_for]) \ + | QUEUE_TO_EVENTS.get(queue, set()) + logger.debug("synced events: %s", synced_events) - prior_ops.append( - OpRecord( - kernel_name=kernel.function_name, - queue=weakref.ref(queue), - event=weakref.ref(evt),) - ) + for index, buf in arg_dict.items(): + logger.debug("%s: arg %d" % (kernel.function_name, index)) + + buf = buf() + if buf is None: + continue + + prior_ops = BUFFER_TO_OPS.setdefault(buf, []) + for op in prior_ops: + prior_queue = op.queue() + if prior_queue is None: + continue + if prior_queue.int_ptr == queue.int_ptr: + continue + + prior_events = QUEUE_TO_EVENTS.get(prior_queue, set()) + unsynced_events = prior_events - synced_events + logger.debug("%s prior events: %s", prior_queue, prior_events) + logger.debug("unsynced events: %s", unsynced_events) + + if unsynced_events: + if cc.show_traceback: + print("Traceback") + traceback.print_stack() + from warnings import warn + + warn("\nEventsNotSynced: argument %d " + "current kernel `%s` previous kernel `%s`\n" + "events `%s` not found in `wait_for` " + "or synced with `queue.finish()` " + "or `cl.wait_for_events()`" % ( + index, kernel.function_name, op.kernel_name, + unsynced_events), + RuntimeWarning, stacklevel=5) + + prior_ops.append(OpRecord( + kernel_name=kernel.function_name, + queue=weakref.ref(queue),)) + + evt = cc.call('enqueue_nd_range_kernel')(queue, kernel, + global_size, local_size, global_offset, wait_for, g_times_l) + add_events(queue, [evt]) return evt +# }}} + + +# {{{ class ConcurrencyCheck(object): - prev_enqueue_nd_range_kernel = None - prev_kernel_set_arg = None - prev_get_cl_header_version = None + _entered = False - def __init__(self, show_traceback=True, verbose=True): + def __init__(self, show_traceback=False): self.show_traceback = show_traceback - self.verbose = verbose - def __enter__(self): - if self.prev_enqueue_nd_range_kernel is not None: + self._overwritten_attrs = {} + if self._entered: raise RuntimeError('cannot nest `ConcurrencyCheck`s') - self.prev_enqueue_nd_range_kernel = cl.enqueue_nd_range_kernel - self.prev_kernel_set_arg = cl.Kernel.set_arg - self.prev_get_cl_header_version = cl.get_cl_header_version + def _monkey_patch(self, obj, name, wrapper=None): + orig_attr = getattr(obj, name, None) + if wrapper is None: + from functools import partial + try: + wrapper = partial(globals()["wrapper_%s" % name], self) + except KeyError: + raise - from functools import partial - cl.Kernel.set_arg = lambda a, b, c: set_arg_wrapper(self, a, b, c) - cl.enqueue_nd_range_kernel = \ - partial(enqueue_nd_range_kernel_wrapper, self) - cl.invoker.add_local_imports = \ - add_local_imports_wrapper + setattr(obj, name, wrapper) + logger.debug('Monkey patched %s `%s` method `%s`' % ( + type(obj).__name__, obj.__name__, name)) - # I can't be bothered to handle clEnqueueFillBuffer - cl.get_cl_header_version = 
lambda: (1, 1) + self._overwritten_attrs[name] = (obj, orig_attr) - def __exit__(self, exc_type, exc_value, traceback): - cl.enqueue_nd_range_kernel = self.prev_enqueue_nd_range_kernel - cl.Kernel.set_arg = self.prev_kernel_set_arg - cl.get_cl_header_version = self.prev_get_cl_header_version + def call(self, name): + _, func = self._overwritten_attrs[name] + return func + + def __enter__(self): + self._entered = True + + # allow monkeypatching in generated code + self._monkey_patch(cl.invoker, 'add_local_imports') + # fix version to avoid handling enqueue_fill_buffer + # in pyopencl.array.Array._zero_fill::1223 + self._monkey_patch(cl, 'get_cl_header_version', + wrapper=lambda: (1, 1)) + + # catch kernel argument buffers + self._monkey_patch(cl.Kernel, 'set_arg', + wrapper=lambda a, b, c: wrapper_set_arg(self, a, b, c)) + # catch events + self._monkey_patch(cl.Event, '__hash__', + wrapper=lambda x: x.int_ptr) + self._monkey_patch(cl, 'wait_for_events') + self._monkey_patch(cl.CommandQueue, 'finish') + # catch kernel calls to check concurrency + self._monkey_patch(cl, 'enqueue_nd_range_kernel') - self.prev_enqueue_nd_range_kernel = None + def __exit__(self, exc_type, exc_value, traceback): + for name, (obj, orig) in self._overwritten_attrs.items(): + if orig is None: + delattr(obj, name) + else: + setattr(obj, name, orig) BUFFER_TO_OPS.clear() CURRENT_BUF_ARGS.clear() + self._overwritten_attrs.clear() + self._entered = False + +# }}} + # vim: foldmethod=marker diff --git a/test/test_wrapper.py b/test/test_wrapper.py index 8822f3b2..292aee47 100644 --- a/test/test_wrapper.py +++ b/test/test_wrapper.py @@ -1140,8 +1140,17 @@ def test_threaded_nanny_events(ctx_factory): def test_concurrency_checker(ctx_factory): - import pyopencl.check_concurrency as ccheck + import logging + logger = logging.getLogger('pyopencl.check_concurrency') + + formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + logger.setLevel(logging.DEBUG) + logger.addHandler(handler) + import pyopencl.check_concurrency as ccheck with ccheck.ConcurrencyCheck(): ctx = ctx_factory() queue1 = cl.CommandQueue(ctx) @@ -1152,7 +1161,9 @@ def test_concurrency_checker(ctx_factory): # del arr1.events[:] del arr2.events[:] - arr1 + arr2 + arr1 - arr2 + + print('done') if __name__ == "__main__": -- GitLab From a5ce2cfcdc460a4ef8f5248d14a0ae1a93748798 Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Sat, 26 Oct 2019 21:45:57 -0500 Subject: [PATCH 05/11] some more tweaks to concurrency checker. Uses WeakSet to store events so we don't have to worry when they get destroyed. The check for out of sync events is also more strict: we now store the event per (buffer, op) as well as per queue and only check events that are still valid in both and only for the current buffer. 
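The relevant WeakSet property is that an entry vanishes as soon as the interpreter drops the last strong reference to the event, so the checker never compares against events that no longer exist. A small stand-alone demonstration (FakeEvent is a stand-in for pyopencl.Event, which is likewise weak-referenceable):

    import gc
    import weakref

    class FakeEvent:            # stand-in; any weak-referenceable object works
        pass

    live, stale = FakeEvent(), FakeEvent()
    events = weakref.WeakSet([live, stale])
    assert len(events) == 2

    del stale                   # once the last strong reference is gone ...
    gc.collect()                # (collect() only matters off CPython)
    assert len(events) == 1     # ... the WeakSet silently drops the entry

This automatic disappearance is also why patch 07 below stops keeping events behind weakrefs inside OpRecord: an event deleted by user code would otherwise vanish from the record before the check runs.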
--- pyopencl/check_concurrency.py | 99 ++++++++++++++++------------------- test/test_wrapper.py | 50 ++++++++++-------- 2 files changed, 72 insertions(+), 77 deletions(-) diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py index f900e739..c13cabb1 100644 --- a/pyopencl/check_concurrency.py +++ b/pyopencl/check_concurrency.py @@ -35,6 +35,7 @@ logger = logging.getLogger(__name__) OpRecord = namedtuple("OpRecord", [ "kernel_name", "queue", + "event", ]) @@ -51,25 +52,9 @@ QUEUE_TO_EVENTS = weakref.WeakKeyDictionary() # {{{ helpers -def remove_finished_events(events): - global QUEUE_TO_EVENTS - - for evt in events: - queue = evt.get_info(cl.event_info.COMMAND_QUEUE) - if queue not in QUEUE_TO_EVENTS: - continue - - logger.info('[RM] %s: %s', queue, hash(evt)) - QUEUE_TO_EVENTS[queue].remove(hash(evt)) - - def add_events(queue, events): - global QUEUE_TO_EVENTS - - logger.debug('[ADD] %s: %s', queue, set(hash(evt) for evt in events)) - QUEUE_TO_EVENTS.setdefault(queue, set()).update( - [hash(evt) for evt in events]) - + logger.debug('[ADD] %s: %s', queue, events) + QUEUE_TO_EVENTS.setdefault(queue, weakref.WeakSet()).update(events) # }}} @@ -96,14 +81,6 @@ def wrapper_set_arg(cc, kernel, index, obj): return cc.call('set_arg')(kernel, index, obj) -def wrapper_wait_for_events(cc, events): - """Wraps :func:`pyopencl.wait_for_events`""" - - remove_finished_events(events) - - return cc.call('wait_for_events')(events) - - def wrapper_finish(cc, queue): """Wraps :meth:`pyopencl.CommandQueue.finish`""" @@ -119,13 +96,17 @@ def wrapper_enqueue_nd_range_kernel(cc, """Wraps :func:`pyopencl.enqueue_nd_range_kernel`""" logger.debug('enqueue_nd_range_kernel: %s', kernel.function_name) + evt = cc.call('enqueue_nd_range_kernel')(queue, kernel, + global_size, local_size, global_offset, wait_for, g_times_l) + add_events(queue, [evt]) arg_dict = CURRENT_BUF_ARGS.get(kernel) if arg_dict is not None: - synced_events = set([hash(evt) for evt in wait_for]) \ - | QUEUE_TO_EVENTS.get(queue, set()) - logger.debug("synced events: %s", synced_events) + synced_events = weakref.WeakSet() + if wait_for is not None: + synced_events |= weakref.WeakSet(wait_for) + indices = list(arg_dict.keys()) for index, buf in arg_dict.items(): logger.debug("%s: arg %d" % (kernel.function_name, index)) @@ -134,6 +115,7 @@ def wrapper_enqueue_nd_range_kernel(cc, continue prior_ops = BUFFER_TO_OPS.setdefault(buf, []) + unsynced_events = [] for op in prior_ops: prior_queue = op.queue() if prior_queue is None: @@ -141,33 +123,39 @@ def wrapper_enqueue_nd_range_kernel(cc, if prior_queue.int_ptr == queue.int_ptr: continue - prior_events = QUEUE_TO_EVENTS.get(prior_queue, set()) - unsynced_events = prior_events - synced_events - logger.debug("%s prior events: %s", prior_queue, prior_events) - logger.debug("unsynced events: %s", unsynced_events) - - if unsynced_events: - if cc.show_traceback: - print("Traceback") - traceback.print_stack() - from warnings import warn - - warn("\nEventsNotSynced: argument %d " - "current kernel `%s` previous kernel `%s`\n" - "events `%s` not found in `wait_for` " - "or synced with `queue.finish()` " - "or `cl.wait_for_events()`" % ( - index, kernel.function_name, op.kernel_name, - unsynced_events), - RuntimeWarning, stacklevel=5) + prior_event = op.event() + if prior_event is None: + continue + + prior_queue_events = QUEUE_TO_EVENTS.get( + prior_queue, weakref.WeakSet()) + if prior_event in prior_queue_events \ + and prior_event not in synced_events: + unsynced_events.append(op.kernel_name) + + 
logger.debug("unsynced events: %s", list(unsynced_events)) + if unsynced_events: + if cc.show_traceback: + print("Traceback") + traceback.print_stack() + + from warnings import warn + warn("\n[%5d] EventsNotSynced: argument `%s` in `%s`\n" + "%7s current kernel `%s` previous kernels %s\n" + "%7s %d events not found in `wait_for` " + "or synced with `queue.finish()` " + "or `cl.wait_for_events()`\n" % ( + cc.concurrency_issues, + index, indices, " ", + kernel.function_name, ", ".join(unsynced_events), " ", + len(unsynced_events)), + RuntimeWarning, stacklevel=5) + cc.concurrency_issues += 1 prior_ops.append(OpRecord( kernel_name=kernel.function_name, - queue=weakref.ref(queue),)) - - evt = cc.call('enqueue_nd_range_kernel')(queue, kernel, - global_size, local_size, global_offset, wait_for, g_times_l) - add_events(queue, [evt]) + queue=weakref.ref(queue), + event=weakref.ref(evt),)) return evt @@ -207,6 +195,7 @@ class ConcurrencyCheck(object): def __enter__(self): self._entered = True + self.concurrency_issues = 0 # allow monkeypatching in generated code self._monkey_patch(cl.invoker, 'add_local_imports') @@ -221,8 +210,8 @@ class ConcurrencyCheck(object): # catch events self._monkey_patch(cl.Event, '__hash__', wrapper=lambda x: x.int_ptr) - self._monkey_patch(cl, 'wait_for_events') - self._monkey_patch(cl.CommandQueue, 'finish') + self._monkey_patch(cl.CommandQueue, 'finish', + wrapper=lambda a: wrapper_finish(self, a)) # catch kernel calls to check concurrency self._monkey_patch(cl, 'enqueue_nd_range_kernel') diff --git a/test/test_wrapper.py b/test/test_wrapper.py index 292aee47..f14c9398 100644 --- a/test/test_wrapper.py +++ b/test/test_wrapper.py @@ -45,6 +45,25 @@ else: faulthandler.enable() +def with_concurrency_check(func): + def wrapper(*args, **kwargs): + import pyopencl.check_concurrency as cc + with cc.ConcurrencyCheck(): + func(*args, **kwargs) + + import logging + logger = logging.getLogger('pyopencl.check_concurrency') + + formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + logger.setLevel(logging.DEBUG) + logger.addHandler(handler) + + return wrapper + + def _skip_if_pocl(plat, up_to_version, msg='unsupported by pocl'): if plat.vendor == "The pocl project": if up_to_version is None or get_pocl_version(plat) <= up_to_version: @@ -1139,31 +1158,18 @@ def test_threaded_nanny_events(ctx_factory): t2.join() +@with_concurrency_check def test_concurrency_checker(ctx_factory): - import logging - logger = logging.getLogger('pyopencl.check_concurrency') - - formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') - handler = logging.StreamHandler() - handler.setFormatter(formatter) - - logger.setLevel(logging.DEBUG) - logger.addHandler(handler) - - import pyopencl.check_concurrency as ccheck - with ccheck.ConcurrencyCheck(): - ctx = ctx_factory() - queue1 = cl.CommandQueue(ctx) - queue2 = cl.CommandQueue(ctx) - - arr1 = cl_array.zeros(queue1, (10,), np.float32) - arr2 = cl_array.zeros(queue2, (10,), np.float32) - # del arr1.events[:] - del arr2.events[:] + ctx = ctx_factory() + queue1 = cl.CommandQueue(ctx) + queue2 = cl.CommandQueue(ctx) - arr1 - arr2 + arr1 = cl_array.zeros(queue1, (10,), np.float32) + arr2 = cl_array.zeros(queue2, (10,), np.float32) + # del arr1.events[:] + del arr2.events[:] - print('done') + arr1 - arr2 if __name__ == "__main__": -- GitLab From 73343d5db3ab615e94a09ddb2a26c3b30e4cd21e Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Sat, 26 Oct 2019 
21:57:58 -0500 Subject: [PATCH 06/11] add decorator --- pyopencl/check_concurrency.py | 28 ++++++++++++++++++---------- test/test_wrapper.py | 20 +------------------- 2 files changed, 19 insertions(+), 29 deletions(-) diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py index c13cabb1..5c81750e 100644 --- a/pyopencl/check_concurrency.py +++ b/pyopencl/check_concurrency.py @@ -50,15 +50,6 @@ CURRENT_BUF_ARGS = weakref.WeakKeyDictionary() QUEUE_TO_EVENTS = weakref.WeakKeyDictionary() -# {{{ helpers - -def add_events(queue, events): - logger.debug('[ADD] %s: %s', queue, events) - QUEUE_TO_EVENTS.setdefault(queue, weakref.WeakSet()).update(events) - -# }}} - - # {{{ wrappers def wrapper_add_local_imports(cc, gen): @@ -98,7 +89,7 @@ def wrapper_enqueue_nd_range_kernel(cc, logger.debug('enqueue_nd_range_kernel: %s', kernel.function_name) evt = cc.call('enqueue_nd_range_kernel')(queue, kernel, global_size, local_size, global_offset, wait_for, g_times_l) - add_events(queue, [evt]) + QUEUE_TO_EVENTS.setdefault(queue, weakref.WeakSet()).add(evt) arg_dict = CURRENT_BUF_ARGS.get(kernel) if arg_dict is not None: @@ -164,6 +155,23 @@ def wrapper_enqueue_nd_range_kernel(cc, # {{{ +def with_concurrency_check(func): + def wrapper(func, *args, **kwargs): + with ConcurrencyCheck(): + return func(*args, **kwargs) + + formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + global logger + logger.setLevel(logging.DEBUG) + logger.addHandler(handler) + + from pytools import decorator + return decorator.decorator(wrapper, func) + + class ConcurrencyCheck(object): _entered = False diff --git a/test/test_wrapper.py b/test/test_wrapper.py index f14c9398..d798a417 100644 --- a/test/test_wrapper.py +++ b/test/test_wrapper.py @@ -35,6 +35,7 @@ import pyopencl.clrandom from pyopencl.tools import ( # noqa pytest_generate_tests_for_pyopencl as pytest_generate_tests) from pyopencl.characterize import get_pocl_version +from pyopencl.check_concurrency import with_concurrency_check # Are CL implementations crashy? You be the judge. 
:) try: @@ -45,25 +46,6 @@ else: faulthandler.enable() -def with_concurrency_check(func): - def wrapper(*args, **kwargs): - import pyopencl.check_concurrency as cc - with cc.ConcurrencyCheck(): - func(*args, **kwargs) - - import logging - logger = logging.getLogger('pyopencl.check_concurrency') - - formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') - handler = logging.StreamHandler() - handler.setFormatter(formatter) - - logger.setLevel(logging.DEBUG) - logger.addHandler(handler) - - return wrapper - - def _skip_if_pocl(plat, up_to_version, msg='unsupported by pocl'): if plat.vendor == "The pocl project": if up_to_version is None or get_pocl_version(plat) <= up_to_version: -- GitLab From 8f30b807729b73bb868b5d9bdc34874012c887b3 Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Sun, 3 Nov 2019 20:33:04 -0600 Subject: [PATCH 07/11] do not use weakrefs for Events in case someone deletes them --- pyopencl/check_concurrency.py | 58 ++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py index 5c81750e..d1e4a86f 100644 --- a/pyopencl/check_concurrency.py +++ b/pyopencl/check_concurrency.py @@ -33,6 +33,7 @@ import logging logger = logging.getLogger(__name__) OpRecord = namedtuple("OpRecord", [ + "arg_name", "kernel_name", "queue", "event", @@ -72,6 +73,16 @@ def wrapper_set_arg(cc, kernel, index, obj): return cc.call('set_arg')(kernel, index, obj) +def wrapper_wait_for_events(cc, wait_for): + for evt in wait_for: + queue = evt.get_info(cl.event_info.COMMAND_QUEUE) + if queue not in QUEUE_TO_EVENTS: + continue + + if evt in QUEUE_TO_EVENTS[queue]: + QUEUE_TO_EVENTS[queue].remove(evt) + + def wrapper_finish(cc, queue): """Wraps :meth:`pyopencl.CommandQueue.finish`""" @@ -93,18 +104,22 @@ def wrapper_enqueue_nd_range_kernel(cc, arg_dict = CURRENT_BUF_ARGS.get(kernel) if arg_dict is not None: - synced_events = weakref.WeakSet() + synced_events = set() if wait_for is not None: - synced_events |= weakref.WeakSet(wait_for) + synced_events |= set(wait_for) - indices = list(arg_dict.keys()) - for index, buf in arg_dict.items(): + for ibuf, (index, buf) in enumerate(arg_dict.items()): logger.debug("%s: arg %d" % (kernel.function_name, index)) buf = buf() if buf is None: continue + try: + arg_name = kernel.get_arg_info(index, cl.kernel_arg_info.NAME) + except cl.RuntimeError: + arg_name = str(ibuf) + prior_ops = BUFFER_TO_OPS.setdefault(buf, []) unsynced_events = [] for op in prior_ops: @@ -114,39 +129,35 @@ def wrapper_enqueue_nd_range_kernel(cc, if prior_queue.int_ptr == queue.int_ptr: continue - prior_event = op.event() - if prior_event is None: - continue - - prior_queue_events = QUEUE_TO_EVENTS.get( - prior_queue, weakref.WeakSet()) - if prior_event in prior_queue_events \ - and prior_event not in synced_events: - unsynced_events.append(op.kernel_name) + prior_queue_events = QUEUE_TO_EVENTS.get(prior_queue, set()) + if op.event in prior_queue_events \ + and op.event not in synced_events: + unsynced_events.append((op.arg_name, op.kernel_name)) logger.debug("unsynced events: %s", list(unsynced_events)) if unsynced_events: if cc.show_traceback: print("Traceback") traceback.print_stack() + cc.concurrency_issues += 1 from warnings import warn - warn("\n[%5d] EventsNotSynced: argument `%s` in `%s`\n" - "%7s current kernel `%s` previous kernels %s\n" + warn("\n[%5d] EventsNotSynced: argument `%s` kernel `%s`\n" + "%7s previous kernels %s\n" "%7s %d events not found in `wait_for` " 
"or synced with `queue.finish()` " "or `cl.wait_for_events()`\n" % ( cc.concurrency_issues, - index, indices, " ", - kernel.function_name, ", ".join(unsynced_events), " ", + arg_name, kernel.function_name, " ", + ", ".join([str(x) for x in unsynced_events]), " ", len(unsynced_events)), RuntimeWarning, stacklevel=5) - cc.concurrency_issues += 1 prior_ops.append(OpRecord( + arg_name=arg_name, kernel_name=kernel.function_name, queue=weakref.ref(queue), - event=weakref.ref(evt),)) + event=evt,)) return evt @@ -179,6 +190,7 @@ class ConcurrencyCheck(object): self.show_traceback = show_traceback self._overwritten_attrs = {} + print(self, self._entered) if self._entered: raise RuntimeError('cannot nest `ConcurrencyCheck`s') @@ -202,7 +214,7 @@ class ConcurrencyCheck(object): return func def __enter__(self): - self._entered = True + ConcurrencyCheck._entered = True self.concurrency_issues = 0 # allow monkeypatching in generated code @@ -218,11 +230,14 @@ class ConcurrencyCheck(object): # catch events self._monkey_patch(cl.Event, '__hash__', wrapper=lambda x: x.int_ptr) + self._monkey_patch(cl, 'wait_for_events') self._monkey_patch(cl.CommandQueue, 'finish', wrapper=lambda a: wrapper_finish(self, a)) # catch kernel calls to check concurrency self._monkey_patch(cl, 'enqueue_nd_range_kernel') + return self + def __exit__(self, exc_type, exc_value, traceback): for name, (obj, orig) in self._overwritten_attrs.items(): if orig is None: @@ -232,9 +247,10 @@ class ConcurrencyCheck(object): BUFFER_TO_OPS.clear() CURRENT_BUF_ARGS.clear() + QUEUE_TO_EVENTS.clear() self._overwritten_attrs.clear() - self._entered = False + ConcurrencyCheck._entered = False # }}} -- GitLab From ddab6ab14e749ca85065f99358ee1a2a74ddb078 Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Tue, 5 Nov 2019 13:23:33 -0600 Subject: [PATCH 08/11] check_concurrency: cleanup a bit --- pyopencl/check_concurrency.py | 125 +++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 55 deletions(-) diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py index d1e4a86f..a96d31e8 100644 --- a/pyopencl/check_concurrency.py +++ b/pyopencl/check_concurrency.py @@ -37,8 +37,10 @@ OpRecord = namedtuple("OpRecord", [ "kernel_name", "queue", "event", + "stack", ]) +ArgRecord = namedtuple("ArgRecord", ["name", "buf"]) # mapping from buffers to list of # (kernel_name, queue weakref) @@ -55,7 +57,7 @@ QUEUE_TO_EVENTS = weakref.WeakKeyDictionary() def wrapper_add_local_imports(cc, gen): """Wraps :func:`pyopencl.invoker.add_local_imports`""" - cc.call('add_local_imports')(gen) + cc.func('add_local_imports')(gen) # NOTE: need to import pyopencl to be able to wrap it in generated code gen("import pyopencl as _cl") @@ -68,9 +70,18 @@ def wrapper_set_arg(cc, kernel, index, obj): logger.debug('set_arg: %s %s', kernel.function_name, index) if isinstance(obj, cl.Buffer): arg_dict = CURRENT_BUF_ARGS.setdefault(kernel, {}) - arg_dict[index] = weakref.ref(obj) - return cc.call('set_arg')(kernel, index, obj) + try: + arg_name = kernel.get_arg_info(index, cl.kernel_arg_info.NAME) + except cl.RuntimeError: + arg_name = str(index) + + arg_dict[index] = ArgRecord( + name=arg_name, + buf=weakref.ref(obj), + ) + + return cc.func('set_arg')(kernel, index, obj) def wrapper_wait_for_events(cc, wait_for): @@ -89,7 +100,7 @@ def wrapper_finish(cc, queue): if queue in QUEUE_TO_EVENTS: QUEUE_TO_EVENTS[queue].clear() - return cc.call('finish')(queue) + return cc.func('finish')(queue) def wrapper_enqueue_nd_range_kernel(cc, @@ -98,9 +109,9 @@ def 
wrapper_enqueue_nd_range_kernel(cc, """Wraps :func:`pyopencl.enqueue_nd_range_kernel`""" logger.debug('enqueue_nd_range_kernel: %s', kernel.function_name) - evt = cc.call('enqueue_nd_range_kernel')(queue, kernel, + evt = cc.func('enqueue_nd_range_kernel')(queue, kernel, global_size, local_size, global_offset, wait_for, g_times_l) - QUEUE_TO_EVENTS.setdefault(queue, weakref.WeakSet()).add(evt) + QUEUE_TO_EVENTS.setdefault(queue, set()).add(evt) arg_dict = CURRENT_BUF_ARGS.get(kernel) if arg_dict is not None: @@ -108,18 +119,13 @@ def wrapper_enqueue_nd_range_kernel(cc, if wait_for is not None: synced_events |= set(wait_for) - for ibuf, (index, buf) in enumerate(arg_dict.items()): - logger.debug("%s: arg %d" % (kernel.function_name, index)) + for arg in arg_dict.values(): + logger.debug("%s: arg %s" % (kernel.function_name, arg.name)) - buf = buf() + buf = arg.buf() if buf is None: continue - try: - arg_name = kernel.get_arg_info(index, cl.kernel_arg_info.NAME) - except cl.RuntimeError: - arg_name = str(ibuf) - prior_ops = BUFFER_TO_OPS.setdefault(buf, []) unsynced_events = [] for op in prior_ops: @@ -135,10 +141,9 @@ def wrapper_enqueue_nd_range_kernel(cc, unsynced_events.append((op.arg_name, op.kernel_name)) logger.debug("unsynced events: %s", list(unsynced_events)) + + tb = "".join(traceback.format_stack()) if unsynced_events: - if cc.show_traceback: - print("Traceback") - traceback.print_stack() cc.concurrency_issues += 1 from warnings import warn @@ -148,16 +153,22 @@ def wrapper_enqueue_nd_range_kernel(cc, "or synced with `queue.finish()` " "or `cl.wait_for_events()`\n" % ( cc.concurrency_issues, - arg_name, kernel.function_name, " ", + arg.name, kernel.function_name, " ", ", ".join([str(x) for x in unsynced_events]), " ", len(unsynced_events)), RuntimeWarning, stacklevel=5) + if cc.show_traceback: + print("Traceback\n%s" % tb) + import pudb + pudb.set_trace() + prior_ops.append(OpRecord( - arg_name=arg_name, + arg_name=arg.name, kernel_name=kernel.function_name, queue=weakref.ref(queue), - event=evt,)) + event=evt, + stack=tb)) return evt @@ -168,7 +179,7 @@ def wrapper_enqueue_nd_range_kernel(cc, def with_concurrency_check(func): def wrapper(func, *args, **kwargs): - with ConcurrencyCheck(): + with ConcurrencyCheck(show_traceback=True): return func(*args, **kwargs) formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') @@ -183,6 +194,29 @@ def with_concurrency_check(func): return decorator.decorator(wrapper, func) +class MonkeyPatch(object): + def __init__(self, cc, obj, attr, wrapper=None): + def wrapper_partial(*args, **kwargs): + func = globals()["wrapper_%s" % attr] + return func(cc, *args, **kwargs) + + if wrapper is None: + wrapper = wrapper_partial + + func = getattr(obj, attr, None) + setattr(obj, attr, wrapper) + + self.obj = obj + self.attr = attr + self.func = func + + def undo(self): + if self.func is None: + delattr(self.obj, self.attr) + else: + setattr(self.obj, self.attr, self.func) + + class ConcurrencyCheck(object): _entered = False @@ -190,60 +224,41 @@ class ConcurrencyCheck(object): self.show_traceback = show_traceback self._overwritten_attrs = {} - print(self, self._entered) if self._entered: raise RuntimeError('cannot nest `ConcurrencyCheck`s') - def _monkey_patch(self, obj, name, wrapper=None): - orig_attr = getattr(obj, name, None) - if wrapper is None: - from functools import partial - try: - wrapper = partial(globals()["wrapper_%s" % name], self) - except KeyError: - raise - - setattr(obj, name, wrapper) - logger.debug('Monkey patched %s 
`%s` method `%s`' % ( - type(obj).__name__, obj.__name__, name)) - - self._overwritten_attrs[name] = (obj, orig_attr) - - def call(self, name): - _, func = self._overwritten_attrs[name] - return func + def func(self, attr): + return self._overwritten_attrs[attr].func def __enter__(self): ConcurrencyCheck._entered = True self.concurrency_issues = 0 + def monkeypatch(obj, attr, wrapper=None): + p = MonkeyPatch(self, obj, attr, wrapper=wrapper) + self._overwritten_attrs[attr] = p + # allow monkeypatching in generated code - self._monkey_patch(cl.invoker, 'add_local_imports') + monkeypatch(cl.invoker, 'add_local_imports') # fix version to avoid handling enqueue_fill_buffer # in pyopencl.array.Array._zero_fill::1223 - self._monkey_patch(cl, 'get_cl_header_version', - wrapper=lambda: (1, 1)) + monkeypatch(cl, 'get_cl_header_version', wrapper=lambda: (1, 1)) # catch kernel argument buffers - self._monkey_patch(cl.Kernel, 'set_arg', - wrapper=lambda a, b, c: wrapper_set_arg(self, a, b, c)) + monkeypatch(cl.Kernel, 'set_arg') # catch events - self._monkey_patch(cl.Event, '__hash__', + monkeypatch(cl.Event, '__hash__', wrapper=lambda x: x.int_ptr) - self._monkey_patch(cl, 'wait_for_events') - self._monkey_patch(cl.CommandQueue, 'finish', - wrapper=lambda a: wrapper_finish(self, a)) + monkeypatch(cl, 'wait_for_events') + monkeypatch(cl.CommandQueue, 'finish') # catch kernel calls to check concurrency - self._monkey_patch(cl, 'enqueue_nd_range_kernel') + monkeypatch(cl, 'enqueue_nd_range_kernel') return self def __exit__(self, exc_type, exc_value, traceback): - for name, (obj, orig) in self._overwritten_attrs.items(): - if orig is None: - delattr(obj, name) - else: - setattr(obj, name, orig) + for p in self._overwritten_attrs.values(): + p.undo() BUFFER_TO_OPS.clear() CURRENT_BUF_ARGS.clear() -- GitLab From 00f82a3eae5831110739d9c0dede58175927b1cb Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Tue, 5 Nov 2019 16:25:47 -0600 Subject: [PATCH 09/11] add a nice menu with options --- pyopencl/check_concurrency.py | 170 +++++++++++++++++++++++----------- 1 file changed, 118 insertions(+), 52 deletions(-) diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py index a96d31e8..5aaff348 100644 --- a/pyopencl/check_concurrency.py +++ b/pyopencl/check_concurrency.py @@ -53,6 +53,75 @@ CURRENT_BUF_ARGS = weakref.WeakKeyDictionary() QUEUE_TO_EVENTS = weakref.WeakKeyDictionary() +# {{{ helpers + +def show_traceback_menu(kernel, arg, current_op, ops): + def print_menu(): + m = 1 + + def print_option(text): + nonlocal m + print(' [%2d] %s' % (m, text)) + m += 1 + + print() + print("Choose action:") + print_option("Show info for current kernel call") + print_option("Show traceback for current kernel call") + for op in ops: + print_option("Show info for kernel `%s` call" % op.kernel_name) + print_option("Show traceback for kernel `%s` call" % op.kernel_name) + print_option("Enter debugger (pudb)") + print_option("Continue") + return m - 1 + + def print_op(op, knl): + print('Argument: %s' % op.arg_name) + print('Kernel: %s' % op.kernel_name) + if knl is not None: + args = CURRENT_BUF_ARGS.get(knl) + args = [x.name for x in args.values()] + print("Arguments: %s" % args) + print('Event: %s' % op.event) + print('Queue: %s (dead %s)' % (op.queue, op.queue() is None)) + + while True: + nchoices = print_menu() + choice = input("Choice [%d]: " % nchoices) + print() + + if not choice: + break + + try: + choice = int(choice) + except ValueError: + print("ValueError: Invalid choice") + continue + + if 
choice <= 0 or choice > nchoices: + print("ValueError: Invalid choice") + continue + + if choice == 1: + print_op(current_op, kernel) + elif choice == 2: + print("Traceback") + print("".join(traceback.format_stack()[:-3])) + elif choice == nchoices - 1: + import pudb + pudb.set_trace() + elif choice == nchoices: + break + elif (choice - 3) % 2 == 0: + print_op(ops[(choice - 3) // 2], None) + elif (choice - 3) % 2 == 1: + print("Traceback") + print("".join(ops[(choice - 3) // 2].stack[:-2])) + +# }}} + + # {{{ wrappers def wrapper_add_local_imports(cc, gen): @@ -114,61 +183,58 @@ def wrapper_enqueue_nd_range_kernel(cc, QUEUE_TO_EVENTS.setdefault(queue, set()).add(evt) arg_dict = CURRENT_BUF_ARGS.get(kernel) - if arg_dict is not None: - synced_events = set() - if wait_for is not None: - synced_events |= set(wait_for) + if arg_dict is None: + return evt + + synced_events = set() if wait_for is None else set(wait_for) + for arg in arg_dict.values(): + logger.debug("%s: arg %s" % (kernel.function_name, arg.name)) - for arg in arg_dict.values(): - logger.debug("%s: arg %s" % (kernel.function_name, arg.name)) + buf = arg.buf() + if buf is None: + continue - buf = arg.buf() - if buf is None: + current_op = OpRecord( + arg_name=arg.name, + kernel_name=kernel.function_name, + queue=weakref.ref(queue), + event=evt, + stack=traceback.format_stack()) + prior_ops = BUFFER_TO_OPS.setdefault(buf, []) + + unsynced_events = [] + for op in prior_ops: + prior_queue = op.queue() + if prior_queue is None: + continue + if prior_queue.int_ptr == queue.int_ptr: continue - prior_ops = BUFFER_TO_OPS.setdefault(buf, []) - unsynced_events = [] - for op in prior_ops: - prior_queue = op.queue() - if prior_queue is None: - continue - if prior_queue.int_ptr == queue.int_ptr: - continue - - prior_queue_events = QUEUE_TO_EVENTS.get(prior_queue, set()) - if op.event in prior_queue_events \ - and op.event not in synced_events: - unsynced_events.append((op.arg_name, op.kernel_name)) - - logger.debug("unsynced events: %s", list(unsynced_events)) - - tb = "".join(traceback.format_stack()) - if unsynced_events: - cc.concurrency_issues += 1 - - from warnings import warn - warn("\n[%5d] EventsNotSynced: argument `%s` kernel `%s`\n" - "%7s previous kernels %s\n" - "%7s %d events not found in `wait_for` " - "or synced with `queue.finish()` " - "or `cl.wait_for_events()`\n" % ( - cc.concurrency_issues, - arg.name, kernel.function_name, " ", - ", ".join([str(x) for x in unsynced_events]), " ", - len(unsynced_events)), - RuntimeWarning, stacklevel=5) - - if cc.show_traceback: - print("Traceback\n%s" % tb) - import pudb - pudb.set_trace() - - prior_ops.append(OpRecord( - arg_name=arg.name, - kernel_name=kernel.function_name, - queue=weakref.ref(queue), - event=evt, - stack=tb)) + prior_queue_events = QUEUE_TO_EVENTS.get(prior_queue, set()) + if op.event in prior_queue_events \ + and op.event not in synced_events: + unsynced_events.append(op) + + logger.debug("unsynced events: %s", + [op.event for op in unsynced_events]) + + if unsynced_events: + from warnings import warn + warn("\nEventsNotSynced: argument `%s` kernel `%s`\n" + "%7s previous kernels %s\n" + "%7s %d events not found in `wait_for` " + "or synced with `queue.finish()` " + "or `cl.wait_for_events()`\n" % ( + arg.name, kernel.function_name, " ", + ", ".join([str((op.arg_name, op.kernel_name)) + for op in unsynced_events]), " ", + len(unsynced_events)), + RuntimeWarning, stacklevel=5) + + if cc.show_traceback: + show_traceback_menu(kernel, arg, current_op, unsynced_events) + + 
prior_ops.append(current_op) return evt @@ -179,7 +245,7 @@ def wrapper_enqueue_nd_range_kernel(cc, def with_concurrency_check(func): def wrapper(func, *args, **kwargs): - with ConcurrencyCheck(show_traceback=True): + with ConcurrencyCheck(show_traceback=False): return func(*args, **kwargs) formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') -- GitLab From a564ed93ab3fe3c5b172abab9d88fd27a16f8a02 Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Tue, 5 Nov 2019 16:56:37 -0600 Subject: [PATCH 10/11] better menu --- pyopencl/check_concurrency.py | 74 +++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py index 5aaff348..414016e1 100644 --- a/pyopencl/check_concurrency.py +++ b/pyopencl/check_concurrency.py @@ -55,38 +55,45 @@ QUEUE_TO_EVENTS = weakref.WeakKeyDictionary() # {{{ helpers -def show_traceback_menu(kernel, arg, current_op, ops): - def print_menu(): - m = 1 +def print_menu(ops): + m = {"count": 1} - def print_option(text): - nonlocal m - print(' [%2d] %s' % (m, text)) - m += 1 + def print_option(text): + nonlocal m + print(' [%2d] %s' % (m["count"], text)) + m["count"] += 1 + + print("Choose action:") + print_option("Show info for current kernel call") + print_option("Show traceback for current kernel call") + for op in ops: + print_option("Show info for kernel `%s` call" % op.kernel_name) + print_option("Show traceback for kernel `%s` call" % op.kernel_name) + print_option("Enter debugger (pudb)") + print_option("Continue") + + return m["count"] - 1 + + +def print_op(op, knl): + print('Argument: %s' % op.arg_name) + print('Kernel: %s' % op.kernel_name) + if knl is not None: + args = CURRENT_BUF_ARGS.get(knl) + args = [x.name for x in args.values()] + print("Arguments: %s" % args) + print('Event: %s' % op.event) + print('Queue: %s (dead %s)' % (op.queue, op.queue() is None)) - print() - print("Choose action:") - print_option("Show info for current kernel call") - print_option("Show traceback for current kernel call") - for op in ops: - print_option("Show info for kernel `%s` call" % op.kernel_name) - print_option("Show traceback for kernel `%s` call" % op.kernel_name) - print_option("Enter debugger (pudb)") - print_option("Continue") - return m - 1 - - def print_op(op, knl): - print('Argument: %s' % op.arg_name) - print('Kernel: %s' % op.kernel_name) - if knl is not None: - args = CURRENT_BUF_ARGS.get(knl) - args = [x.name for x in args.values()] - print("Arguments: %s" % args) - print('Event: %s' % op.event) - print('Queue: %s (dead %s)' % (op.queue, op.queue() is None)) + +def show_traceback_menu(kernel, arg, current_op, ops): + print() + print("Current buffer `%s` in kernel `%s` has unsynced events " + "with calls to the kernels %s" % (arg.name, kernel.function_name, + [op.kernel_name for op in ops])) while True: - nchoices = print_menu() + nchoices = print_menu(ops) choice = input("Choice [%d]: " % nchoices) print() @@ -109,8 +116,11 @@ def show_traceback_menu(kernel, arg, current_op, ops): print("Traceback") print("".join(traceback.format_stack()[:-3])) elif choice == nchoices - 1: - import pudb - pudb.set_trace() + try: + import pudb + pudb.set_trace() + except ImportError: + print("ImportError: pudb not found") elif choice == nchoices: break elif (choice - 3) % 2 == 0: @@ -119,6 +129,8 @@ def show_traceback_menu(kernel, arg, current_op, ops): print("Traceback") print("".join(ops[(choice - 3) // 2].stack[:-2])) + print() + # }}} @@ -144,6 +156,8 @@ 
def wrapper_set_arg(cc, kernel, index, obj): arg_name = kernel.get_arg_info(index, cl.kernel_arg_info.NAME) except cl.RuntimeError: arg_name = str(index) + except AttributeError: + arg_name = str(index) arg_dict[index] = ArgRecord( name=arg_name, -- GitLab From 34a6acc0cce8fca74fedda059f097190b5645481 Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Tue, 5 Nov 2019 17:17:40 -0600 Subject: [PATCH 11/11] remove nonlocal --- pyopencl/check_concurrency.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyopencl/check_concurrency.py b/pyopencl/check_concurrency.py index 414016e1..3bce3aef 100644 --- a/pyopencl/check_concurrency.py +++ b/pyopencl/check_concurrency.py @@ -59,7 +59,6 @@ def print_menu(ops): m = {"count": 1} def print_option(text): - nonlocal m print(' [%2d] %s' % (m["count"], text)) m["count"] += 1 -- GitLab
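After the full series, the user-facing surface is the ConcurrencyCheck context manager and the with_concurrency_check decorator in pyopencl.check_concurrency. A minimal usage sketch, mirroring test_concurrency_checker above and assuming a PyOpenCL checkout that includes these patches (create_some_context may prompt for a device):

    import numpy as np
    import pyopencl as cl
    import pyopencl.array as cl_array
    from pyopencl.check_concurrency import ConcurrencyCheck

    ctx = cl.create_some_context()
    queue1 = cl.CommandQueue(ctx)
    queue2 = cl.CommandQueue(ctx)

    # Kernel launches inside the block are checked: using a buffer on one
    # queue while another queue still has unsynced work on it triggers an
    # EventsNotSynced warning (and, with show_traceback=True, the menu).
    with ConcurrencyCheck():
        arr1 = cl_array.zeros(queue1, (10,), np.float32)
        arr2 = cl_array.zeros(queue2, (10,), np.float32)
        del arr2.events[:]      # discard the dependency info, as in the test
        arr1 - arr2             # cross-queue use that nothing waited for

The decorator form added in patch 06 wraps a whole function in the same context manager and configures the module logger, which is how the test above ends up using it.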