# Tests for loopy kernel construction, scheduling and code generation on
# OpenCL devices.  Each test receives a ``ctx_factory`` fixture (generated by
# ``pytest_generate_tests``) that returns a pyopencl context.
#
# NOTE(review): several kernel-domain strings in this file had lost the text
# between a "<name" and the next ">" (apparently stripped as markup).  Every
# place that was reconstructed or left damaged carries its own NOTE(review);
# confirm each one against version control.

from __future__ import division

import numpy as np
import loopy as lp
import pyopencl as cl

from pyopencl.tools import pytest_generate_tests_for_pyopencl \
        as pytest_generate_tests

__all__ = ["pytest_generate_tests",
    "cl"  # 'cl.create_some_context'
    ]


def test_owed_barriers(ctx_factory):
    """Schedule, check and compile a 100-element copy kernel whose iname is
    tagged ``l.0``, printing the generated code for each schedule."""
    ctx = ctx_factory()

    knl = lp.make_kernel(ctx.devices[0],
            "{[i]: 0<=i<100}",
            [
                "[i:l.0] z[i] = a[i]"
                ],
            [lp.GlobalArg("a", np.float32, shape=(100,))]
            )

    kernel_gen = lp.generate_loop_schedules(knl)
    kernel_gen = lp.check_kernels(kernel_gen)

    for gen_knl in kernel_gen:
        compiled = lp.CompiledKernel(ctx, gen_knl)
        print(compiled.code)


def test_wg_too_small(ctx_factory):
    """Expect compilation to fail with a RuntimeError when a 100-wide ``l.0``
    iname is combined with a requested local size of 16."""
    ctx = ctx_factory()

    knl = lp.make_kernel(ctx.devices[0],
            "{[i]: 0<=i<100}",
            [
                "[i:l.0] z[i] = a[i] {id=copy}"
                ],
            [lp.GlobalArg("a", np.float32, shape=(100,))],
            local_sizes={0: 16})

    kernel_gen = lp.generate_loop_schedules(knl)
    kernel_gen = lp.check_kernels(kernel_gen)

    for gen_knl in kernel_gen:
        try:
            lp.CompiledKernel(ctx, gen_knl)
        except RuntimeError as e:
            # expected!
            assert "implemented and desired" in str(e)
        else:
            assert False  # expecting an error


def test_multi_cse(ctx_factory):
    """Compile a kernel that reads ``a[i]`` twice in one expression after
    splitting ``i`` and prefetching ``a``, printing the generated code."""
    ctx = ctx_factory()

    knl = lp.make_kernel(ctx.devices[0],
            "{[i]: 0<=i<100}",
            [
                "[i] z[i] = a[i] + a[i]**2"
                ],
            [lp.GlobalArg("a", np.float32, shape=(100,))],
            local_sizes={0: 16})

    knl = lp.split_dimension(knl, "i", 16, inner_tag="l.0")
    knl = lp.add_prefetch(knl, "a", [])

    kernel_gen = lp.generate_loop_schedules(knl)
    kernel_gen = lp.check_kernels(kernel_gen)

    for gen_knl in kernel_gen:
        compiled = lp.CompiledKernel(ctx, gen_knl)
        print(compiled.code)


def test_stencil(ctx_factory):
    """Check a tiled, prefetched five-point stencil against the untransformed
    reference kernel via ``lp.auto_test_vs_ref``."""
    ctx = ctx_factory()

    # n=32 causes corner case behavior in size calculations for temporary (a
    # non-unifiable, two-constant-segments PwAff as the base index)

    n = 256
    knl = lp.make_kernel(ctx.devices[0],
            "{[i,j]: 0<= i,j < %d}" % n,
            [
                "a_offset(ii, jj) := a[ii+1, jj+1]",
                "z[i,j] = -2*a_offset(i,j)"
                " + a_offset(i,j-1)"
                " + a_offset(i,j+1)"
                " + a_offset(i-1,j)"
                " + a_offset(i+1,j)"
                ],
            [
                lp.GlobalArg("a", np.float32, shape=(n+2,n+2,)),
                lp.GlobalArg("z", np.float32, shape=(n+2,n+2,))
                ])

    ref_knl = knl

    def variant_1(knl):
        # 16x16 tiles mapped onto a 2D work-group grid, with the tile of `a`
        # prefetched over the two inner inames.
        knl = lp.split_dimension(knl, "i", 16, outer_tag="g.1", inner_tag="l.1")
        knl = lp.split_dimension(knl, "j", 16, outer_tag="g.0", inner_tag="l.0")
        knl = lp.add_prefetch(knl, "a", ["i_inner", "j_inner"])
        return knl

    for variant in [variant_1]:
        kernel_gen = lp.generate_loop_schedules(variant(knl),
                loop_priority=["i_outer", "i_inner_0", "j_0"])
        kernel_gen = lp.check_kernels(kernel_gen)

        lp.auto_test_vs_ref(ref_knl, ctx, kernel_gen,
                fills_entire_output=False, print_ref_code=True,
                op_count=[n*n], op_label=["cells"])


def test_eq_constraint(ctx_factory):
    """Generate and print code for a kernel whose iname ``i`` is split by 16
    twice (the second split acting on ``i_inner``)."""
    ctx = ctx_factory()

    knl = lp.make_kernel(ctx.devices[0],
            "{[i,j]: 0<= i,j < 32}",
            [
                "a[i] = b[i]"
                ],
            [
                lp.GlobalArg("a", np.float32, shape=(1000,)),
                lp.GlobalArg("b", np.float32, shape=(1000,))
                ])

    knl = lp.split_dimension(knl, "i", 16, outer_tag="g.0")
    knl = lp.split_dimension(knl, "i_inner", 16, outer_tag=None, inner_tag="l.0")

    kernel_gen = lp.generate_loop_schedules(knl)
    kernel_gen = lp.check_kernels(kernel_gen)

    for knl in kernel_gen:
        print(lp.generate_code(knl))


def test_argmax(ctx_factory):
    """Run an ``argmax`` reduction over ``fabs(a[i])`` and compare the returned
    value and index with numpy."""
    dtype = np.dtype(np.float32)
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    order = "C"

    n = 10000

    knl = lp.make_kernel(ctx.devices[0],
            "{[i]: 0<=i<%d}" % n,
            [
                "<> result = argmax(i, fabs(a[i]))",
                "max_idx = result.index",
                "max_val = result.value",
                ],
            [
                lp.GlobalArg("a", dtype, shape=(n,), order=order),
                lp.GlobalArg("max_idx", np.int32, shape=(), order=order),
                lp.GlobalArg("max_val", dtype, shape=(), order=order),
                ])

    # Size the input from n so it cannot drift from the kernel's domain/shape.
    a = np.random.randn(n).astype(dtype)
    cknl = lp.CompiledKernel(ctx, knl)
    evt, (max_idx, max_val) = cknl(queue, a=a, out_host=True)
    assert max_val == np.max(np.abs(a))
    assert max_idx == np.where(np.abs(a)==max_val)[-1]


def make_random_value():
    """Return a random nonzero integer in [-1000, 1000), a random float in
    [-10, 10], or a random complex number, each kind with equal probability."""
    from random import randrange, uniform
    v = randrange(3)
    if v == 0:
        # Redraw until nonzero.
        while True:
            z = randrange(-1000, 1000)
            if z:
                return z
    elif v == 1:
        return uniform(-10, 10)
    else:
        return uniform(-10, 10) + 1j*uniform(-10, 10)


def make_random_expression(var_values, size):
    """Recursively build a random expression from pymbolic sums, products,
    differences, quotients, constants and variables.

    :arg var_values: dict that receives a random value for every variable
        created; modified in place.
    :arg size: one-element list counting the nodes generated so far; modified
        in place.  Once it reaches 40, no further sums/products are created.
    """
    from random import randrange
    import pymbolic.primitives as p
    v = randrange(1500)
    size[0] += 1
    if v < 500 and size[0] < 40:
        term_count = randrange(2, 5)
        if randrange(2) < 1:
            cls = p.Sum
        else:
            cls = p.Product
        return cls(tuple(
            make_random_expression(var_values, size)
            for _ in range(term_count)))
    elif v < 750:
        return make_random_value()
    elif v < 1000:
        var_name = "var_%d" % len(var_values)
        assert var_name not in var_values
        var_values[var_name] = make_random_value()
        return p.Variable(var_name)
    elif v < 1250:
        return make_random_expression(var_values, size) - make_random_expression(var_values, size)
    elif v < 1500:
        return make_random_expression(var_values, size) / make_random_expression(var_values, size)


def generate_random_fuzz_examples(count):
    """Yield *count* pairs ``(expr, var_values)`` of a random expression and
    the values assigned to its variables."""
    for _ in range(count):
        size = [0]
        var_values = {}
        expr = make_random_expression(var_values, size)
        yield expr, var_values


def test_fuzz_code_generator(ctx_factory):
    """Evaluate 20 random expressions both with pymbolic and through a
    compiled loopy kernel, failing if the relative error exceeds 1e-10."""
    from pymbolic import evaluate

    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    def get_dtype(x):
        if isinstance(x, complex):
            return np.complex128
        else:
            return np.float64

    rule = "---------------------------------------------------------------------"

    #from expr_fuzz import get_fuzz_examples
    for expr, var_values in generate_random_fuzz_examples(20):
    #for expr, var_values in get_fuzz_examples():
        true_value = evaluate(expr, var_values)

        knl = lp.make_kernel(ctx.devices[0], "{ : }",
                [lp.Instruction(None, "value", expr)],
                [lp.GlobalArg("value", np.complex128, shape=())]
                + [
                    lp.ValueArg(name, get_dtype(val))
                    for name, val in var_values.items()
                    ])
        ck = lp.CompiledKernel(ctx, knl)
        evt, (lp_value,) = ck(queue, out_host=True, **var_values)
        err = abs(true_value-lp_value)/abs(true_value)
        if abs(err) > 1e-10:
            print(rule)
            print("WRONG: rel error=%g" % err)
            print("true=%r" % true_value)
            print("loopy=%r" % lp_value)
            print(rule)
            print(ck.code)
            print(rule)
            print(var_values)
            print(rule)
            print(repr(expr))
            print(rule)
            print(expr)
            print(rule)
            # Fail explicitly (this used to be a deliberate `1/0`).
            raise AssertionError(
                    "loopy result differs from pymbolic evaluation "
                    "(rel error=%g)" % err)


def test_empty_reduction(ctx_factory):
    """Check that a sum over the empty domain ``0<=j<0`` yields zero."""
    dtype = np.dtype(np.float32)
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    knl = lp.make_kernel(ctx.devices[0],
            [
                "{[i]: 0<=i<20}",
                "{[j]: 0<=j<0}"
                ],
            [
                "a[i] = sum(j, j)",
                ],
            [
                lp.GlobalArg("a", dtype, (20,)),
                ])
    cknl = lp.CompiledKernel(ctx, knl)

    evt, (a,) = cknl(queue)

    assert (a.get() == 0).all()


def test_nested_dependent_reduction(ctx_factory):
    """Check a reduction whose inner bound depends on a per-``i`` temporary
    loaded from the array ``l``."""
    dtype = np.dtype(np.int32)
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    # NOTE(review): the domain list below was truncated to the single string
    # "{[i]: 0<=i sumlen = l[i]" and has been reconstructed.  The bound on j
    # is inferred from tgt_result (sum of j for 0<=j<2*l[i], with l[i]==i);
    # "0<=j<2*sumlen" would satisfy the assertion equally -- confirm against
    # version control.
    knl = lp.make_kernel(ctx.devices[0],
            [
                "{[i]: 0<=i<n}",
                "{[j]: 0<=j<i+sumlen}"
                ],
            [
                "<> sumlen = l[i]",
                "a[i] = sum(j, j)",
                ],
            [
                lp.ValueArg("n", np.int32),
                lp.GlobalArg("a", dtype, ("n",)),
                lp.GlobalArg("l", np.int32, ("n",)),
                ])

    cknl = lp.CompiledKernel(ctx, knl)

    n = 330
    l = np.arange(n, dtype=np.int32)
    evt, (a,) = cknl(queue, l=l, n=n, out_host=True)

    tgt_result = (2*l-1)*2*l/2
    assert (a == tgt_result).all()


def test_dependent_loop_bounds(ctx_factory):
    """Compile and print code for a row-sum kernel whose inner loop length
    ``row_len`` is computed from ``a_rowstarts``."""
    dtype = np.dtype(np.float32)
    ctx = ctx_factory()

    # NOTE(review): the domain list below was truncated to
    # "{[i]: 0<=i row_len = a_rowstarts[i+1] - a_rowstarts[i]" and has been
    # reconstructed from the inames and the `assumptions` string -- confirm
    # against version control.
    knl = lp.make_kernel(ctx.devices[0],
            [
                "{[i]: 0<=i<n}",
                "{[jj]: 0<=jj<row_len}",
                ],
            [
                "<> row_len = a_rowstarts[i+1] - a_rowstarts[i]",
                "ax[i] = sum(jj, a_values[a_rowstarts[i]+jj])",
                ],
            [
                lp.GlobalArg("a_rowstarts", np.int32),
                lp.GlobalArg("a_indices", np.int32),
                lp.GlobalArg("a_values", dtype),
                lp.GlobalArg("x", dtype),
                lp.GlobalArg("ax", dtype),
                lp.ValueArg("n", np.int32),
                ],
            assumptions="n>=1 and row_len>=1")

    cknl = lp.CompiledKernel(ctx, knl)
    print("---------------------------------------------------")
    cknl.print_code()
    print("---------------------------------------------------")


def test_dependent_loop_bounds_2(ctx_factory):
    """Like ``test_dependent_loop_bounds``, but with the row start held in its
    own temporary and ``i`` split across work groups and work items."""
    dtype = np.dtype(np.float32)
    ctx = ctx_factory()

    # NOTE(review): the domain list below was truncated to
    # "{[i]: 0<=i row_start = a_rowstarts[i]" and has been reconstructed from
    # the inames and the `assumptions` string -- confirm against version
    # control.
    knl = lp.make_kernel(ctx.devices[0],
            [
                "{[i]: 0<=i<n}",
                "{[jj]: 0<=jj<row_len}",
                ],
            [
                "<> row_start = a_rowstarts[i]",
                "<> row_len = a_rowstarts[i+1] - row_start",
                "ax[i] = sum(jj, a_values[row_start+jj])",
                ],
            [
                lp.GlobalArg("a_rowstarts", np.int32),
                lp.GlobalArg("a_indices", np.int32),
                lp.GlobalArg("a_values", dtype),
                lp.GlobalArg("x", dtype),
                lp.GlobalArg("ax", dtype),
                lp.ValueArg("n", np.int32),
                ],
            assumptions="n>=1 and row_len>=1")

    knl = lp.split_dimension(knl, "i", 128, outer_tag="g.0",
            inner_tag="l.0")
    cknl = lp.CompiledKernel(ctx, knl)
    print("---------------------------------------------------")
    cknl.print_code()
    print("---------------------------------------------------")


def test_dependent_loop_bounds_3(ctx_factory):
    """Compile a kernel whose ``jj`` domain depends on ``row_len``, then check
    that splitting ``jj`` onto ``g.1``/``l.1`` makes scheduling raise
    RuntimeError."""
    # The point of this test is that it shows a dependency between
    # domains that is exclusively mediated by the row_len temporary.
    # It also makes sure that row_len gets read before any
    # conditionals use it.

    dtype = np.dtype(np.float32)
    ctx = ctx_factory()

    # NOTE(review): the domain list below was truncated to
    # "{[i]: 0<=i row_len = a_row_lengths[i]" and has been reconstructed from
    # the inames and the comment above -- confirm against version control.
    knl = lp.make_kernel(ctx.devices[0],
            [
                "{[i]: 0<=i<n}",
                "{[jj]: 0<=jj<row_len}",
                ],
            [
                "<> row_len = a_row_lengths[i]",
                "a[i,jj] = 1",
                ],
            [
                lp.GlobalArg("a_row_lengths", np.int32),
                lp.GlobalArg("a", dtype, shape=("n,n"), order="C"),
                lp.ValueArg("n", np.int32),
                ])

    # The second domain must be nested inside the first.
    assert knl.parents_per_domain()[1] == 0

    knl = lp.split_dimension(knl, "i", 128, outer_tag="g.0",
            inner_tag="l.0")

    cknl = lp.CompiledKernel(ctx, knl)
    print("---------------------------------------------------")
    cknl.print_code()
    print("---------------------------------------------------")

    knl_bad = lp.split_dimension(knl, "jj", 128, outer_tag="g.1",
            inner_tag="l.1")

    import pytest
    with pytest.raises(RuntimeError):
        list(lp.generate_loop_schedules(knl_bad))


def test_independent_multi_domain(ctx_factory):
    """Run a multi-domain kernel that fills ``a`` with ones and check the
    result's shape and contents."""
    dtype = np.dtype(np.float32)
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    # NOTE(review): the first string below is damaged -- text is missing after
    # each "0<=i", presumably including the "], [" that separated the domains
    # from the instructions, so the argument list currently sits in the
    # instruction position.  The surrounding code does not determine the
    # missing text, so the literal is left untouched; restore it from version
    # control.
    knl = lp.make_kernel(ctx.devices[0],
            [
                "{[i]: 0<=i {[i]: 0<=i znirp = n",
                "a[i] = 1",
                ],
            [
                lp.GlobalArg("a", dtype, shape=("n"), order="C"),
                lp.ValueArg("n", np.int32),
                ])

    cknl = lp.CompiledKernel(ctx, knl)

    n = 20000
    evt, (a,) = cknl(queue, n=n, out_host=True)

    assert a.shape == (n,)
    assert (a == 1).all()


def test_split(ctx_factory):
    """Check a two-stage batched matrix-vector product, with both matrices
    prefetched, against the untransformed kernel via ``lp.auto_test_vs_ref``."""
    dtype = np.float32
    ctx = ctx_factory()

    order = "C"

    K = 10000
    Np = 36
    Nq = 50

    # NOTE(review): the domain string was truncated after "0<=k", together
    # with the "[" opening the instruction list (leaving an unmatched "]").
    # The bounds below are reconstructed from the argument shapes and from how
    # each iname indexes them -- confirm against version control.
    knl = lp.make_kernel(ctx.devices[0],
            "[K] -> {[i,j,k,ii,jj]: 0<=k<K and 0<= i,j,ii < Np and 0<= jj < Nq}",
            [
                "<> temp[ii] = sum(jj, d[ii, jj]*f[k, jj])",
                "result[k, i] = sum(j, d2[i, j]*temp[j])"
                ],
            [
                lp.GlobalArg("d", dtype, shape="Np, Nq", order=order),
                lp.GlobalArg("d2", dtype, shape="Np, Np", order=order),
                lp.GlobalArg("f", dtype, shape="K, Nq", order=order),
                lp.GlobalArg("result", dtype, shape="K, Np", order=order),
                lp.ValueArg("K", np.int32, approximately=1000),
                ],
            name="batched_matvec", assumptions="K>=1",
            defines=dict(Np=Np, Nq=Nq))

    seq_knl = knl

    knl = lp.add_prefetch(knl, 'd[:,:]')
    knl = lp.add_prefetch(knl, 'd2[:,:]')

    kernel_gen = lp.generate_loop_schedules(knl)
    kernel_gen = lp.check_kernels(kernel_gen, dict(K=K))

    lp.auto_test_vs_ref(seq_knl, ctx, kernel_gen,
            op_count=[K*2*(Np**2+Np*Nq)/1e9], op_label=["GFlops"],
            parameters=dict(K=K), print_ref_code=True)


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        # Run a statement given on the command line, e.g. a single test call.
        exec(sys.argv[1])
    else:
        from py.test.cmdline import main
        main([__file__])