__copyright__ = "Copyright (C) 2012 Andreas Kloeckner"

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

import logging

import numpy as np
import pytest

import pymbolic.primitives as prim
import pyopencl as cl
import pyopencl.clmath
import pyopencl.clrandom
import pyopencl.tools
import pyopencl.version
from pyopencl.tools import (  # noqa: F401
    pytest_generate_tests_for_pyopencl as pytest_generate_tests,
)

import loopy as lp
from loopy.diagnostic import LoopyError
from loopy.target.c import CTarget
from loopy.target.opencl import OpenCLTarget
from loopy.version import LOOPY_USE_LANGUAGE_VERSION_2018_2  # noqa: F401


logger = logging.getLogger(__name__)


def test_ispc_target():
    """Generate and print device and host code for a tiled kernel on ISPC."""
    from loopy.target.ispc import ISPCTarget

    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "out[i] = 2*a[i]",
        [
            lp.GlobalArg("out,a", np.float32, shape=lp.auto),
            "..."
        ],
        target=ISPCTarget())

    knl = lp.split_iname(knl, "i", 8, inner_tag="l.0")
    knl = lp.split_iname(knl, "i_outer", 4, outer_tag="g.0", inner_tag="ilp")
    knl = lp.add_prefetch(knl, "a", ["i_inner", "i_outer_inner"],
                          default_tag="l.auto")

    codegen_result = lp.generate_code_v2(knl)

    print(codegen_result.device_code())
    print(codegen_result.host_code())


def test_cuda_target():
    """Generate and print device code for a tiled kernel on the CUDA target."""
    from loopy.target.cuda import CudaTarget

    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "out[i] = 2*a[i]",
        [
            lp.GlobalArg("out,a", np.float32, shape=lp.auto),
            "..."
        ],
        target=CudaTarget())

    knl = lp.split_iname(knl, "i", 8, inner_tag="l.0")
    knl = lp.split_iname(knl, "i_outer", 4, outer_tag="g.0", inner_tag="ilp")
    knl = lp.add_prefetch(knl, "a", ["i_inner", "i_outer_inner"],
                          default_tag="l.auto")

    print(lp.generate_code_v2(knl).device_code())


def test_generate_c_snippet():
    """Build a two-reduction kernel from pymbolic expressions for CTarget
    and print the code generation result."""
    from pymbolic import var
    I = var("I")  # noqa: N806,E741
    f = var("f")
    df = var("df")
    q_v = var("q_v")
    k = var("k")
    u = var("u")

    from functools import partial
    l_sum = partial(lp.Reduction, "sum", allow_simultaneous=True)

    Instr = lp.Assignment  # noqa: N806

    knl = lp.make_kernel(
        "{[I, k]: 0<=I<nSpace and 0<=k<nQuad}",
        [
            Instr(f[I], l_sum(k, q_v[k, I]*u)),
            Instr(df[I], l_sum(k, q_v[k, I])),
        ],
        [
            lp.GlobalArg("q_v", np.float64, shape="nQuad, nSpace"),
            lp.GlobalArg("f,df", np.float64, shape="nSpace"),
            lp.ValueArg("u", np.float64),
            "...",
        ],
        target=CTarget(),
        assumptions="nQuad>=1")

    if 0:  # enable to play with prefetching
        # (prefetch currently requires constant sizes)
        knl = lp.fix_parameters(knl, nQuad=5, nSpace=3)
        knl = lp.add_prefetch(knl, "q_v", "k,I", default_tag=None)

    knl = lp.split_iname(knl, "k", 4, inner_tag="unr", slabs=(0, 1))
    knl = lp.prioritize_loops(knl, "I,k_outer,k_inner")

    print(lp.generate_code_v2(knl))


@pytest.mark.parametrize("target", [CTarget, OpenCLTarget])
@pytest.mark.parametrize("tp", ["f32", "f64"])
def test_math_function(target, tp):
    """Check which ``fmin``/``fmax`` spelling is emitted per target and dtype."""
    # Test that the correct math functions are generated for the C and
    # OpenCL backends for different data types.

    data_type = {"f32": np.float32,
                 "f64": np.float64}[tp]

    import pymbolic.primitives as p

    i = p.Variable("i")
    xi = p.Subscript(p.Variable("x"), i)
    yi = p.Subscript(p.Variable("y"), i)
    zi = p.Subscript(p.Variable("z"), i)

    n = 100
    domain = "{[i]: 0<=i<%d}" % n
    data = [lp.GlobalArg("x", data_type, shape=(n,)),
            lp.GlobalArg("y", data_type, shape=(n,)),
            lp.GlobalArg("z", data_type, shape=(n,))]

    inst = [lp.Assignment(xi, p.Variable("min")(yi, zi))]
    knl = lp.make_kernel(domain, inst, data, target=target())
    code = lp.generate_code_v2(knl).device_code()

    assert "fmin" in code

    # Only the f32/CTarget combination is expected to use the "f"-suffixed name.
    if tp == "f32" and target == CTarget:
        assert "fminf" in code
    else:
        assert "fminf" not in code

    inst = [lp.Assignment(xi, p.Variable("max")(yi, zi))]
    knl = lp.make_kernel(domain, inst, data, target=target())
    code = lp.generate_code_v2(knl).device_code()

    assert "fmax" in code

    if tp == "f32" and target == CTarget:
        assert "fmaxf" in code
    else:
        assert "fmaxf" not in code


@pytest.mark.parametrize("tp", ["f32", "f64"])
def test_random123(ctx_factory, tp):
    """Run the philox/threefry generators and check all outputs lie in [0, 1)."""
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    if cl.version.VERSION < (2016, 2):
        pytest.skip("Random123 RNG not supported in PyOpenCL < 2016.2")

    n = 150000

    # "TYPE" in the kernel text is substituted with the parametrized *tp*.
    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        """
        <> key2 = make_uint2(i, 324830944) {inames=i}
        <> key4 = make_uint4(i, 324830944, 234181, 2233) {inames=i}
        <> ctr = make_uint4(0, 1, 2, 3) {inames=i,id=init_ctr}
        <> real, ctr = philox4x32_TYPE(ctr, key2) {id=realpart,dep=init_ctr}
        <> imag, ctr = threefry4x32_TYPE(ctr, key4) {dep=init_ctr:realpart}

        out[i, 0] = real.s0 + 1j * imag.s0
        out[i, 1] = real.s1 + 1j * imag.s1
        out[i, 2] = real.s2 + 1j * imag.s2
        out[i, 3] = real.s3 + 1j * imag.s3
        """.replace("TYPE", tp))

    knl = lp.split_iname(knl, "i", 128, outer_tag="g.0", inner_tag="l.0")
    knl = lp.set_options(knl, write_code=True)

    _evt, (out,) = knl(queue, n=n)

    out = out.get()
    assert (out < 1).all()
    assert (0 <= out).all()


def test_tuple(ctx_factory):
    """Check that ``make_tuple`` assigns both of its values to two outputs."""
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    import islpy as isl
    knl = lp.make_kernel(
        [isl.BasicSet("[] -> {[]: }")],
        """
        a, b = make_tuple(1, 2.)
        """)

    _evt, (a, b) = knl(queue)

    assert a.get() == 1
    assert b.get() == 2.


def test_clamp(ctx_factory):
    """Run a kernel calling ``clamp``; the result itself is not inspected."""
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    n = 15 * 10**6
    x = cl.clrandom.rand(queue, n, dtype=np.float32)

    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "out[i] = clamp(x[i], a, b)")

    knl = lp.split_iname(knl, "i", 128, outer_tag="g.0", inner_tag="l.0")
    knl = lp.set_options(knl, write_code=True)

    _evt, (_out,) = knl(queue, x=x, a=np.float32(12), b=np.float32(15))


def test_sized_integer_c_codegen(ctx_factory):
    """Check that shifting an int64-cast ``1`` by ``i`` matches numpy's
    ``1 << arange(n)`` for ``n = 40``."""
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    from pymbolic import var
    knl = lp.make_kernel(
        "{[i]: 0<=i<n}",
        [lp.Assignment("a[i]", lp.TypeCast(np.int64, 1) << var("i"))]
        )

    knl = lp.set_options(knl, write_code=True)
    n = 40

    _evt, (a,) = knl(queue, n=n)
    a_ref = 1 << np.arange(n, dtype=np.int64)

    assert np.array_equal(a_ref, a.get())


def test_child_invalid_type_cast():
    """Check that preprocessing rejects a ``TypeCast`` applied to a
    ``make_uint2`` value."""
    from pymbolic import var
    knl = lp.make_kernel(
        "{[i]: 0<=i<n}",
        ["<> ctr = make_uint2(0, 0)",
         lp.Assignment("a[i]", lp.TypeCast(np.int64, var("ctr")) << var("i"))]
        )

    with pytest.raises(lp.LoopyError):
        knl = lp.preprocess_kernel(knl)


def test_target_invalid_type_cast():
    """Check that ``TypeCast`` to a structured dtype raises ``LoopyError``."""
    dtype = np.dtype([("", "<u4"), ("", "<i4")])
    with pytest.raises(lp.LoopyError):
        lp.TypeCast(dtype, 1)


def test_ispc_streaming_stores():
    """Check that tagging instructions with ``!streaming_store`` makes the
    ISPC code contain a ``streaming_store(`` call."""
    stream_dtype = np.float32
    index_dtype = np.int32

    knl = lp.make_kernel(
        "{[i]: 0<=i<n}",
        "a[i] = b[i] + scalar * c[i]",
        target=lp.ISPCTarget(), index_dtype=index_dtype,
        name="stream_triad")

    # Named var_names rather than "vars" to avoid shadowing the builtin.
    var_names = ["a", "b", "c", "scalar"]
    knl = lp.assume(knl, "n>0")
    knl = lp.split_iname(
        knl, "i", 2**18, outer_tag="g.0", slabs=(0, 1))
    knl = lp.split_iname(knl, "i_inner", 8, inner_tag="l.0")
    knl = lp.tag_instructions(knl, "!streaming_store")

    knl = lp.add_and_infer_dtypes(knl, dict.fromkeys(var_names, stream_dtype))

    knl = lp.set_argument_order(knl, [*var_names, "n"])

    # Generate once and inspect the result (previously generated twice, with
    # the first result thrown away).
    all_code = lp.generate_code_v2(knl).all_code()
    assert "streaming_store(" in all_code


def test_cuda_short_vector():
    """Generate and print CUDA code for arrays split by 4 and tagged ``vec``."""
    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "out[i] = 2*a[i]",
        target=lp.CudaTarget())

    knl = lp.set_options(knl, write_code=True)
    knl = lp.split_iname(knl, "i", 4, slabs=(0, 1), inner_tag="vec")
    knl = lp.split_array_axis(knl, "a,out", axis_nr=0, count=4)
    knl = lp.tag_array_axes(knl, "a,out", "C,vec")

    knl = lp.set_options(knl, write_wrapper=True)
    knl = lp.add_and_infer_dtypes(knl, {"a": np.float32})

    print(lp.generate_code_v2(knl).device_code())


def test_pyopencl_execution_numpy_handling(ctx_factory):
    """Check that a numpy array passed for ``x`` is updated in place and is
    the very object returned as output."""
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    # test numpy input for x is written to and returned
    knl = lp.make_kernel("{:}", ["x[0] = y[0] + x[0]"])

    y = np.array([3.])
    x = np.array([4.])
    _evt, out = knl(queue, y=y, x=x)
    assert out[0] is x
    assert x[0] == 7.

    # test numpy input for x is written to and returned, even when a pyopencl
    # array is passed for y
    import pyopencl.array as cla
    y = cla.zeros(queue, shape=(1), dtype="float64") + 3.
    x = np.array([4.])
    _evt, out = knl(queue, y=y, x=x)
    assert out[0] is x
    assert x[0] == 7.

    # test numpy input for x is written to and returned, even when output-only
    knl = lp.make_kernel("{:}", ["x[0] = y[0] + 2"])
    y = np.array([3.])
    x = np.array([4.])
    _evt, out = knl(queue, y=y, x=x)
    assert out[0] is x
    assert x[0] == 5.
def test_opencl_support_for_bool(ctx_factory):
    """Check that a ``bool``-typed output array receives ``i % 2``."""
    knl = lp.make_kernel(
        "{[i]: 0<=i<10}",
        """
        y[i] = i%2
        """,
        [lp.GlobalArg("y", dtype=np.bool_, shape=lp.auto)])

    cl_ctx = ctx_factory()
    _evt, (out, ) = knl(cl.CommandQueue(cl_ctx))
    out = out.get()

    np.testing.assert_equal(out, np.tile(np.array([0, 1], dtype=np.bool_), 5))


@pytest.mark.parametrize("target", [lp.PyOpenCLTarget, lp.ExecutableCTarget])
def test_nan_support(ctx_factory, target):
    """Check NaN literals and ``isnan`` for real and complex dtypes."""
    from pymbolic.primitives import NaN, Variable

    from loopy.symbolic import parse
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    knl = lp.make_kernel(
        "{:}",
        [lp.Assignment(parse("a"), np.nan),
         lp.Assignment(parse("b"), parse("isnan(a)")),
         lp.Assignment(parse("c"), parse("isnan(3.14)")),
         lp.Assignment(parse("d"), parse("isnan(0.0)")),
         lp.Assignment(parse("e"), NaN(np.float32)),
         lp.Assignment(parse("f"), Variable("isnan")(NaN(None))),
         lp.Assignment(parse("g"), NaN(np.complex64)),
         lp.Assignment(parse("h"), NaN(np.complex128)),
         ],
        [lp.GlobalArg("a", is_input=False, shape=()), ...],
        seq_dependencies=True, target=target())

    knl = lp.set_options(knl, return_dict=True)

    if target == lp.PyOpenCLTarget:
        _evt, out_dict = knl(queue)
        # Results are fetched with .get() on this target only.
        out_dict = {k: v.get() for k, v in out_dict.items()}
    elif target == lp.ExecutableCTarget:
        _evt, out_dict = knl()
    else:
        raise NotImplementedError("unsupported target")

    assert np.isnan(out_dict["a"])
    assert out_dict["b"] == 1
    assert out_dict["c"] == 0
    assert out_dict["d"] == 0
    assert np.isnan(out_dict["e"])
    assert out_dict["e"].dtype == np.float32
    assert out_dict["f"] == 1
    assert np.isnan(out_dict["g"])
    assert out_dict["g"].dtype == np.complex64
    assert np.isnan(out_dict["h"])
    assert out_dict["h"].dtype == np.complex128


@pytest.mark.parametrize("target", [lp.PyOpenCLTarget, lp.ExecutableCTarget])
def test_emits_ternary_operators_correctly(ctx_factory, target):
    """Check the values chosen by ``if``/``else`` expressions whose
    conditions are floating-point values."""
    # See: https://github.com/inducer/loopy/issues/390
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    knl = lp.make_kernel(
        "{:}",
        """
        <> tmp1 = 3.1416
        <> tmp2 = 0.000
        y1 = 1729 if tmp1 else 1.414
        y2 = 42 if 2.7183 else 13
        y3 = 127 if tmp2 else 128
        """,
        seq_dependencies=True,
        target=target())

    knl = lp.set_options(knl, return_dict=True)

    if target == lp.PyOpenCLTarget:
        _evt, out_dict = knl(queue)
    elif target == lp.ExecutableCTarget:
        _evt, out_dict = knl()
    else:
        raise NotImplementedError("unsupported target")

    assert out_dict["y1"] == 1729
    assert out_dict["y2"] == 42
    assert out_dict["y3"] == 128


def test_scalar_array_take_offset(ctx_factory):
    """Check that a scalar obtained by indexing a pyopencl array can be
    passed as a shape-``()`` argument declared with ``offset=lp.auto``."""
    import pyopencl.array as cla

    ctx = ctx_factory()
    cq = cl.CommandQueue(ctx)

    knl = lp.make_kernel(
        "{:}",
        """
        y = 133*x
        """,
        [lp.GlobalArg("x", shape=(), offset=lp.auto),
         ...])

    x_in_base = cla.arange(cq, 42, dtype=np.int32)
    x_in = x_in_base[13]

    _evt, (out,) = knl(cq, x=x_in)
    # 133 * 13 == 1729
    np.testing.assert_allclose(out.get(), 1729)


@pytest.mark.parametrize("target", [lp.PyOpenCLTarget, lp.ExecutableCTarget])
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_inf_support(ctx_factory, target, dtype):
    """Check that ``math.inf`` and ``-math.inf`` survive code generation."""
    import math

    from loopy.symbolic import parse

    # See: https://github.com/inducer/loopy/issues/443 for some laughs
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    knl = lp.make_kernel(
        "{:}",
        [lp.Assignment(parse("out_inf"), math.inf),
         lp.Assignment(parse("out_neginf"), -math.inf)],
        [lp.GlobalArg("out_inf", shape=lp.auto, dtype=dtype),
         lp.GlobalArg("out_neginf", shape=lp.auto, dtype=dtype)
         ],
        target=target())

    knl = lp.set_options(knl, return_dict=True)

    if target == lp.PyOpenCLTarget:
        _, out_dict = knl(queue)
        out_dict = {k: v.get() for k, v in out_dict.items()}
    elif target == lp.ExecutableCTarget:
        _, out_dict = knl()
    else:
        raise NotImplementedError("unsupported target")

    assert np.isinf(out_dict["out_inf"])
    assert np.isneginf(out_dict["out_neginf"])


def test_input_args_are_required(ctx_factory):
    """Check that invoking a kernel without its input arguments raises
    ``LoopyError``."""
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    knl1 = lp.make_kernel(
        "{ [i]: 0<=i<2 }",
        """
        g[i] = f[i] + 1.5
        """,
        [lp.GlobalArg("f, g", shape=lp.auto, dtype="float64"), ...]
    )

    knl2 = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "g[i] = 3 * f[i] + g[i]",
    )

    f = np.zeros(2)
    g = np.zeros(2)

    for knl in [knl1, knl2]:
        with pytest.raises(LoopyError):
            _ = knl(queue)
            # NOTE(review): if the call above raises as expected, this second
            # call is never executed. Consider giving it its own
            # pytest.raises block.
            _ = knl(queue, g=g)

    # NOTE(review): the original indentation of these two calls was lost;
    # they are placed after the loop here. Confirm against upstream.
    _ = knl1(queue, f=f)
    _ = knl1(queue, f=f, g=g)

    knl = lp.make_kernel(
        "{ [i]: 0<=i<2 }",
        """
        f[i] = 3.
        g[i] = f[i] + 1.5
        """,
        [lp.GlobalArg("f, g", shape=lp.auto, dtype="float64"), ...]
    )

    # FIXME: this should not raise!
    # https://github.com/inducer/loopy/issues/450
    with pytest.raises(LoopyError):
        _ = knl(queue)


def test_pyopencl_target_with_global_temps_with_base_storage(ctx_factory):
    """Check that two global temporaries sharing ``base_storage`` result in
    one base allocation, by counting the bytes requested from the allocator."""
    from pyopencl.tools import ImmediateAllocator

    class RecordingAllocator(ImmediateAllocator):
        """Allocator that sums the sizes of all requests made through it."""

        def __init__(self, queue):
            super().__init__(queue)
            self.allocated_nbytes = 0

        def __call__(self, size):
            self.allocated_nbytes += size
            return super().__call__(size)

    ctx = ctx_factory()
    cq = cl.CommandQueue(ctx)

    knl = lp.make_kernel(
        "{[i, j]: 0<=i, j<10}",
        """
        tmp1[i] = 2*i {id=w_tmp1}
        y[i] = tmp1[i] {nosync=w_tmp1}
        ... gbarrier
        tmp2[j] = 3*j {id=w_tmp2}
        z[j] = tmp2[j] {nosync=w_tmp2}
        """,
        [lp.TemporaryVariable("tmp1",
                              base_storage="base",
                              address_space=lp.AddressSpace.GLOBAL),
         lp.TemporaryVariable("tmp2",
                              base_storage="base",
                              address_space=lp.AddressSpace.GLOBAL),
         ...],
        seq_dependencies=True)

    knl = lp.tag_inames(knl, {"i": "g.0", "j": "g.0"})
    knl = lp.set_options(knl, return_dict=True)
    knl = lp.preprocess_kernel(knl)
    knl = lp.allocate_temporaries_for_base_storage(knl)

    my_allocator = RecordingAllocator(cq)
    _, out = knl(cq, allocator=my_allocator)

    np.testing.assert_allclose(out["y"].get(), 2*np.arange(10))
    np.testing.assert_allclose(out["z"].get(), 3*np.arange(10))
    assert my_allocator.allocated_nbytes == (40  # base
                                             + 40  # y
                                             + 40  # z
                                             )


@pytest.mark.parametrize("dtype", ["float32", "float64"])
def test_glibc_bessel_functions(dtype):
    """Compare ``bessel_jn``/``bessel_yn`` kernel results against scipy."""
    pytest.importorskip("scipy.special")

    from numpy.random import default_rng
    from scipy.special import jn, yn  # pylint: disable=no-name-in-module

    from loopy.target.c.c_execution import CCompiler

    rng = default_rng(0)
    compiler = CCompiler(cflags=["-O3"])

    n = 2
    knl = lp.make_kernel(
        "{[i]: 0<=i<10}",
        """
        first_kind_bessel[i] = bessel_jn(n, x[i])
        second_kind_bessel[i] = bessel_yn(n, x[i])
        """, target=lp.ExecutableCWithGNULibcTarget(compiler))

    if knl.target.compiler.toolchain.cc not in ["gcc", "g++"]:  # pylint: disable=no-member
        pytest.skip("GNU-libc not found.")

    # Use the same order *n* as the scipy reference below.
    knl = lp.fix_parameters(knl, n=n)
    knl = lp.set_options(knl, return_dict=True)
    knl = lp.set_options(knl, write_code=True)

    x_in = np.abs(rng.random(10, dtype=dtype))

    _, out_dict = knl(x=x_in)

    np.testing.assert_allclose(jn(n, x_in), out_dict["first_kind_bessel"],
                               rtol=1e-6, atol=1e-6)
    np.testing.assert_allclose(yn(n, x_in), out_dict["second_kind_bessel"],
                               rtol=1e-6, atol=1e-6)


def test_zero_size_temporaries(ctx_factory):
    """Zero-sized arrays in PyOpenCL allocate as "None". This tests that the
    invoker is OK with that.
    """
    # https://github.com/inducer/loopy/pull/588
    ctx = ctx_factory()
    cq = cl.CommandQueue(ctx)

    knl = lp.make_kernel(
        "{[i]: i > 0 and i < 0}",
        """
        tmp[i] = i
        a[i] = tmp[i]
        """,
        [lp.TemporaryVariable("tmp",
                              address_space=lp.AddressSpace.GLOBAL,
                              shape=(0,)),
         lp.GlobalArg("a", shape=(0,)),
         ...])

    _evt, (out, ) = knl(cq)
    assert out.shape == (0,)


def test_empty_array_output(ctx_factory):
    """Check that a kernel with no instructions returns its zero-length
    output array."""
    ctx = ctx_factory()
    cq = cl.CommandQueue(ctx)

    knl = lp.make_kernel(
        "{[i]: i > 0 and i < 0}",
        [],
        [lp.GlobalArg("a", shape=(0,), dtype=np.float32,
                      is_output=True, is_input=False)])

    _evt, (out, ) = knl(cq)
    assert out.shape == (0,)


def test_empty_array_stride_check(ctx_factory):
    """Check that an empty input is accepted while a non-empty
    Fortran-ordered input raises ``ValueError``."""
    ctx = ctx_factory()
    cq = cl.CommandQueue(ctx)

    rng = np.random.default_rng(seed=42)

    einsum = lp.make_einsum("mij,j->mi", ["a", "x"])
    einsum(cq, a=rng.normal(size=(3, 0, 5)), x=rng.normal(size=5))

    if einsum.default_entrypoint.options.skip_arg_checks:
        pytest.skip("args checks disabled, cannot check")

    with pytest.raises(ValueError):
        einsum(cq,
               a=rng.normal(size=(3, 2, 5)).copy(order="F"),
               x=rng.normal(size=5))


def test_no_op_with_predicate(ctx_factory):
    """Check that code for a predicated ``NoOpInstruction`` builds as an
    OpenCL program."""
    ctx = ctx_factory()

    predicate = prim.Comparison(prim.Variable("a"), ">", 0)
    knl = lp.make_kernel([],
                         ["<> a = 1",
                          lp.NoOpInstruction(predicates=[predicate])])

    code = lp.generate_code_v2(knl).device_code()
    cl.Program(ctx, code).build()


def test_empty_array_stride_check_fortran(ctx_factory):
    """Check that an empty Fortran-ordered pyopencl array is accepted as
    kernel input."""
    # https://github.com/inducer/loopy/issues/583
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    import pyopencl.array as cla

    a_f = cla.Array(queue, (0, 2), np.float64, order="F")

    knl = lp.make_kernel(
        "{ [i,j]: 0<=i<n and 0<=j<m }",
        "output[i,j] = sqrt(input[i,j])")

    knl(queue, input=a_f)


@pytest.mark.parametrize("with_gbarrier", [False, True])
def test_passing_bajillions_of_svm_args(ctx_factory, with_gbarrier):
    """Run a kernel with 3 x 300 SVM array arguments under a 20-byte
    argument size limit and check every result."""
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    from pyopencl.characterize import has_coarse_grain_buffer_svm
    if not has_coarse_grain_buffer_svm(queue.device):
        pytest.skip("device does not support SVM, which is required for this test")

    if with_gbarrier:
        gbarrier_part = [
            # Make this artificially have multiple subkernels to check that
            # declarations are correctly emitted in that setting as well
            # https://github.com/inducer/loopy/pull/642#pullrequestreview-1087588248
            "z[j] = 0 {id=init_z}",
            "... gbarrier {dep=init_z,id=gb}"
        ]
        dep = "{dep=gb}"
    else:
        gbarrier_part = []
        dep = ""

    nargsets = 300
    knl = lp.make_kernel(
        "{[i,j]: 0<=i,j<n}",
        gbarrier_part + [
            f"c{iargset}[i] = a{iargset}[i]+b{iargset}[i] {dep}"
            for iargset in range(nargsets)
        ],
        [
            lp.GlobalArg(f"{name}{iargset}", shape=lp.auto, dtype=np.float32)
            for name in "abc"
            for iargset in range(nargsets)
        ] + [...],
        target=lp.PyOpenCLTarget(limit_arg_size_nbytes=20),
        options=lp.Options(return_dict=True))

    alloc = cl.tools.SVMAllocator(
        ctx, flags=cl.svm_mem_flags.READ_WRITE, queue=queue)

    multiplier = 10_000
    args = {}
    for iargset in range(nargsets):
        # a<k> holds k*multiplier and b<k> holds k, so c<k> is unique per set.
        args[f"a{iargset}"] = (
            cl.array.zeros(queue, 20, np.float32, allocator=alloc)
            + np.float32(multiplier * iargset))
        args[f"b{iargset}"] = (
            cl.array.zeros(queue, 20, np.float32, allocator=alloc)
            + np.float32(iargset))

    _evt, res = knl(queue, **args, allocator=alloc)

    for iargset in range(nargsets):
        assert (res[f"c{iargset}"].get() == iargset * multiplier + iargset).all()


def test_no_uint_in_cuda_code():
    """Check that CUDA code for ``uint32`` arguments never contains "uint"."""
    # https://github.com/inducer/compyte/pull/44
    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "out[i] = a[i] + b[i]",
        target=lp.CudaTarget())

    knl = lp.add_and_infer_dtypes(knl, {"a": np.dtype(np.uint32)})
    knl = lp.add_and_infer_dtypes(knl, {"b": np.dtype(np.uint32)})
    assert "uint" not in lp.generate_code_v2(knl).device_code()


def test_ispc_private_var():
    """Generate and print ISPC code for a kernel whose temporary ``b`` is
    placed in the private address space."""
    # https://github.com/inducer/loopy/issues/763
    knl = lp.make_kernel(
        "{ [k]: 0<=k<K }",
        """
        <float32> b = 6.0 * float_pos[k]
        output[k] = 2.0 * b
        """,
        [lp.ValueArg("K", is_input=True),
         lp.GlobalArg("float_pos", np.float32,
                      shape=lp.auto, is_input=True, is_output=False),
         lp.GlobalArg("output", np.uint8,
                      shape=lp.auto, is_input=False, is_output=True)],
        target=lp.ISPCTarget(),
        assumptions="1<K")

    knl = lp.split_iname(knl, "k", 8, inner_tag="l.0")
    knl = lp.set_temporary_address_space(knl, "b", "private")

    cg_result = lp.generate_code_v2(knl)

    print(cg_result.device_code())


def test_to_complex_casts(ctx_factory):
    """Check that code casting each arithmetic dtype to ``complex128``
    builds as an OpenCL program."""
    arith_dtypes = "bhilqpBHILQPfdFD"
    out_type = lp.to_loopy_type(np.dtype(np.complex128))
    other = np.complex64(7)

    from pymbolic import var

    knl = lp.make_kernel(
        [],
        [
            lp.Assignment(
                f"out_{typename}",
                lp.TypeCast(out_type, var(f"in_{typename}"))
                + lp.TypeCast(out_type, other)
            )
            for typename in arith_dtypes
        ],
        [
            lp.GlobalArg(f"in_{typename}", dtype=np.dtype(typename), shape=())
            for typename in arith_dtypes
        ] + [...]
    )

    ctx = ctx_factory()
    code = lp.generate_code_v2(knl).device_code()

    # just testing here that the generated code builds
    cl.Program(ctx, code).build()


def test_cl_vectorize_ternary(ctx_factory):
    """Compare a vectorized ``if``/``else`` kernel against ``np.where``."""
    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        """
        b[i] = a[i]*3 if a[i] < 0 else sin(a[i])
        """)

    knl = lp.split_array_axis(knl, "a,b", 0, 4)
    knl = lp.split_iname(knl, "i", 4)
    knl = lp.tag_inames(knl, {"i_inner": "vec"})
    knl = lp.tag_array_axes(knl, "a,b", "c,vec")
    knl = lp.set_options(knl, write_code=True)
    knl = lp.assume(knl, "n % 4 = 0 and n>0")

    rng = np.random.default_rng(seed=12)
    a = rng.normal(size=(16, 4))
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)
    _evt, (result,) = knl(queue, a=a, n=a.size)

    result_ref = np.where(a < 0, a*3, np.sin(a))
    assert np.allclose(result, result_ref)


def test_float3():
    """Check that a 3-wide ``vec``-tagged float32 axis yields "float3" in
    the generated device code."""
    # https://github.com/inducer/loopy/issues/922
    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        """
        out[i] = a if i == 0 else b
        """
    )
    vec_size = 3
    knl = lp.split_array_axis(knl, "out", 0, vec_size)
    knl = lp.split_iname(knl, "i", vec_size)
    knl = lp.tag_inames(knl, {"i_inner": "vec"})
    knl = lp.tag_array_axes(knl, "out", "c,vec")
    knl = lp.assume(knl, f"n % {vec_size} = 0 and n>0")
    knl = lp.add_and_infer_dtypes(knl, {"a": np.dtype(np.float32),
                                        "b": np.dtype(np.float32)})
    device_code = lp.generate_code_v2(knl).device_code()

    assert "float3" in device_code


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        # Developer convenience: run the snippet given on the command line.
        # NOTE(review): exec() on argv is only acceptable for local, trusted use.
        exec(sys.argv[1])
    else:
        from pytest import main
        main([__file__])

# vim: foldmethod=marker