From 08be423154464a682f695369ee9e7f01238cd12f Mon Sep 17 00:00:00 2001
From: Yichao Yu <yyc1992@gmail.com>
Date: Sun, 25 May 2014 11:54:00 -0400
Subject: [PATCH] async

---
 pyopencl/_cffi.py       | 12 +++++-
 pyopencl/cffi_cl.py     |  8 ++--
 setup.py                |  8 ++--
 src/c_wrapper/async.cpp | 88 +++++++++++++++++++++++++++++++++++++++++
 src/c_wrapper/async.h   | 13 ++++++
 5 files changed, 120 insertions(+), 9 deletions(-)
 create mode 100644 src/c_wrapper/async.cpp
 create mode 100644 src/c_wrapper/async.h

diff --git a/pyopencl/_cffi.py b/pyopencl/_cffi.py
index 5775078f..9b15c2b4 100644
--- a/pyopencl/_cffi.py
+++ b/pyopencl/_cffi.py
@@ -175,7 +175,7 @@ def _py_deref(_id):
     except:
         pass
 
-def _get_insert_func(obj):
+def _get_ref_func(obj):
     @_ffi.callback('void(unsigned long)')
     def _insert(_id):
         _pyref[_id] = obj
@@ -185,3 +185,13 @@ def _find_obj(_id):
     return _pyref.get(_id, None)
 
 _lib.set_deref(_py_deref)
+
+import traceback
+def _to_c_callback(func, *args, **kwargs):
+    # Bind func and its arguments into a cffi 'void()' callback object.
+    def _thunk():
+        try:
+            func(*args, **kwargs)
+        except:  # report the error here instead of letting it propagate
+            traceback.print_exc()
+    return _ffi.callback('void()')(_thunk)
diff --git a/pyopencl/cffi_cl.py b/pyopencl/cffi_cl.py
index a2cef461..ba6d6eb8 100644
--- a/pyopencl/cffi_cl.py
+++ b/pyopencl/cffi_cl.py
@@ -32,7 +32,7 @@ import sys
 
 # TODO: can we do without ctypes?
 import ctypes
-from pyopencl._cffi import _ffi, _lib, _get_insert_func, _find_obj
+from pyopencl._cffi import _ffi, _lib, _get_ref_func, _find_obj, _to_c_callback
 
 # {{{ compatibility shims
 
@@ -814,7 +814,7 @@ def _enqueue_read_buffer(queue, mem, hostbuf, device_offset=0,
     _handle_error(_lib.enqueue_read_buffer(
         ptr_event, queue.ptr, mem.ptr, c_buf, size, device_offset,
         c_wait_for, num_wait_for, bool(is_blocking),
-        _get_insert_func(hostbuf)))
+        _get_ref_func(hostbuf)))
     return _create_instance(NannyEvent, ptr_event[0])
 
 
@@ -836,7 +836,7 @@ def _enqueue_write_buffer(queue, mem, hostbuf, device_offset=0,
     _handle_error(_lib.enqueue_write_buffer(
         ptr_event, queue.ptr, mem.ptr, c_buf, size, device_offset,
         c_wait_for, num_wait_for, bool(is_blocking),
-        _get_insert_func(c_ref)))
+        _get_ref_func(c_ref)))
     return _create_instance(NannyEvent, ptr_event[0])
 
 # }}}
@@ -853,7 +853,7 @@ def _enqueue_read_image(queue, mem, origin, region, hostbuf, row_pitch=0,
     _handle_error(_lib.enqueue_read_image(
         ptr_event, queue.ptr, mem.ptr, origin, region, c_buf, row_pitch,
         slice_pitch, c_wait_for, num_wait_for, bool(is_blocking),
-        _get_insert_func(c_buf)))
+        _get_ref_func(c_buf)))
     return _create_instance(NannyEvent, ptr_event[0])
 
 # TODO: write_image copy_image fill_image
diff --git a/setup.py b/setup.py
index 062681a2..1cdcc559 100644
--- a/setup.py
+++ b/setup.py
@@ -235,13 +235,13 @@ def main():
                                 "src/c_wrapper/wrap_constants.cpp",
                                 "src/c_wrapper/bitlog.cpp",
                                 "src/c_wrapper/pyhelper.cpp",
-                                #"src/c_wrapper/wrap_mempool.cpp",
+                                "src/c_wrapper/async.cpp",
+                                # "src/c_wrapper/wrap_mempool.cpp",
                                 ],
                                include_dirs=(
                                    conf["CL_INC_DIR"]
-                                   + ["src/c_wrapper/"]
-                                   + ["pyopencl/c_wrapper/"]
-                                   ),
+                                   + ["src/c_wrapper/",
+                                      "pyopencl/c_wrapper/"]),
                                library_dirs=conf["CL_LIB_DIR"],
                                libraries=conf["CL_LIBNAME"],
                                define_macros=list(EXTRA_DEFINES.items()),
diff --git a/src/c_wrapper/async.cpp b/src/c_wrapper/async.cpp
new file mode 100644
index 00000000..abe9f9f9
--- /dev/null
+++ b/src/c_wrapper/async.cpp
@@ -0,0 +1,88 @@
+#include "async.h"
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <utility>
+
+namespace pyopencl {
+
+// Blocking FIFO: pop() waits until another thread has push()ed an item.
+template <typename T>
+class Queue {
+    std::queue<T> m_queue;
+    std::mutex m_mutex;
+    std::condition_variable m_cond;
+public:
+    // Wait for an item, then remove it from the front and return it.
+    T
+    pop()
+    {
+        std::unique_lock<std::mutex> mlock(m_mutex);
+        m_cond.wait(mlock, [this] { return !m_queue.empty(); });
+        T item = std::move(m_queue.front()); // slot is popped next: no copy
+        m_queue.pop();
+        return item;
+    }
+    // Append a copy of item and wake one thread blocked in pop().
+    void
+    push(const T &item)
+    {
+        {
+            std::lock_guard<std::mutex> mlock(m_mutex);
+            m_queue.push(item);
+        }
+        m_cond.notify_one(); // after unlocking, so the waiter can lock at once
+    }
+};
+
+// Runs queued callables in FIFO order on one lazily started worker thread.
+class AsyncCaller {
+    Queue<std::function<void()> > m_queue;
+    std::once_flag m_flag;
+    // Thread body: take the next callable and invoke it, forever.
+    void
+    worker()
+    {
+        for (;;) {
+            try {
+                std::function<void()> task = m_queue.pop();
+                task();
+            } catch (...) {
+                // Anything a task throws is dropped; the loop keeps going.
+            }
+        }
+    }
+public:
+    // Start the detached worker thread on the first call only.
+    void
+    ensure_thread()
+    {
+        std::call_once(m_flag, [this] {
+            std::thread(&AsyncCaller::worker, this).detach();
+        });
+    }
+    // Make sure the worker exists, then hand func over to it.
+    void
+    push(const std::function<void()> &func)
+    {
+        ensure_thread();
+        m_queue.push(func);
+    }
+};
+
+// Never destroyed: the detached worker may still wait on it at exit.
+static AsyncCaller &async_caller = *new AsyncCaller;
+void
+call_async(const std::function<void()> &func)
+{
+    async_caller.push(func); // starts the worker on first use, then enqueues
+}
+
+void
+init_async()
+{
+    async_caller.ensure_thread(); // start the worker now rather than on first push
+}
+
+}
diff --git a/src/c_wrapper/async.h b/src/c_wrapper/async.h
new file mode 100644
index 00000000..32143925
--- /dev/null
+++ b/src/c_wrapper/async.h
@@ -0,0 +1,13 @@
+#ifndef PYOPENCL_ASYNC_H
+#define PYOPENCL_ASYNC_H
+
+#include <functional>
+
+namespace pyopencl {
+// Start the background worker thread unless it is already running.
+void init_async();
+// Queue func to be invoked later on the background worker thread.
+void call_async(const std::function<void()> &func);
+}
+
+#endif
-- 
GitLab