__copyright__ = "Copyright (C) 2012 Andreas Kloeckner" __license__ = """ Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import logging import numpy as np import pytest import pymbolic.primitives as prim import pyopencl as cl import pyopencl.clmath import pyopencl.clrandom import pyopencl.tools import pyopencl.version from pyopencl.tools import ( # noqa: F401 pytest_generate_tests_for_pyopencl as pytest_generate_tests, ) import loopy as lp from loopy.diagnostic import LoopyError from loopy.target.c import CTarget from loopy.target.opencl import OpenCLTarget from loopy.version import LOOPY_USE_LANGUAGE_VERSION_2018_2 # noqa: F401 logger = logging.getLogger(__name__) def test_ispc_target(): from loopy.target.ispc import ISPCTarget knl = lp.make_kernel( "{ [i]: 0<=i key2 = make_uint2(i, 324830944) {inames=i} <> key4 = make_uint4(i, 324830944, 234181, 2233) {inames=i} <> ctr = make_uint4(0, 1, 2, 3) {inames=i,id=init_ctr} <> real, ctr = philox4x32_TYPE(ctr, key2) {id=realpart,dep=init_ctr} <> imag, ctr = threefry4x32_TYPE(ctr, key4) {dep=init_ctr:realpart} out[i, 0] = real.s0 + 1j * imag.s0 out[i, 1] = real.s1 + 1j * imag.s1 out[i, 2] = real.s2 + 1j * imag.s2 out[i, 3] = real.s3 + 1j * imag.s3 """.replace("TYPE", tp)) knl = lp.split_iname(knl, "i", 128, outer_tag="g.0", inner_tag="l.0") knl = lp.set_options(knl, write_code=True) _evt, (out,) = knl(queue, n=n) out = out.get() assert (out < 1).all() assert (0 <= out).all() def test_tuple(ctx_factory): ctx = ctx_factory() queue = cl.CommandQueue(ctx) import islpy as isl knl = lp.make_kernel( [isl.BasicSet("[] -> {[]: }")], """ a, b = make_tuple(1, 2.) """) _evt, (a, b) = knl(queue) assert a.get() == 1 assert b.get() == 2. def test_clamp(ctx_factory): ctx = ctx_factory() queue = cl.CommandQueue(ctx) n = 15 * 10**6 x = cl.clrandom.rand(queue, n, dtype=np.float32) knl = lp.make_kernel( "{ [i]: 0<=i ctr = make_uint2(0, 0)", lp.Assignment("a[i]", lp.TypeCast(np.int64, var("ctr")) << var("i"))] ) with pytest.raises(lp.LoopyError): knl = lp.preprocess_kernel(knl) def test_target_invalid_type_cast(): dtype = np.dtype([("", "0") knl = lp.split_iname( knl, "i", 2**18, outer_tag="g.0", slabs=(0, 1)) knl = lp.split_iname(knl, "i_inner", 8, inner_tag="l.0") knl = lp.tag_instructions(knl, "!streaming_store") knl = lp.add_and_infer_dtypes(knl, dict.fromkeys(vars, stream_dtype)) knl = lp.set_argument_order(knl, [*vars, "n"]) lp.generate_code_v2(knl).all_code() assert "streaming_store(" in lp.generate_code_v2(knl).all_code() def test_cuda_short_vector(): knl = lp.make_kernel( "{ [i]: 0<=i tmp1 = 3.1416 <> tmp2 = 0.000 y1 = 1729 if tmp1 else 1.414 y2 = 42 if 2.7183 else 13 y3 = 127 if tmp2 else 128 """, seq_dependencies=True, target=target()) knl = lp.set_options(knl, return_dict=True) if target == lp.PyOpenCLTarget: _evt, out_dict = knl(queue) elif target == lp.ExecutableCTarget: _evt, out_dict = knl() else: raise NotImplementedError("unsupported target") assert out_dict["y1"] == 1729 assert out_dict["y2"] == 42 assert out_dict["y3"] == 128 def test_scalar_array_take_offset(ctx_factory): import pyopencl.array as cla ctx = ctx_factory() cq = cl.CommandQueue(ctx) knl = lp.make_kernel( "{:}", """ y = 133*x """, [lp.GlobalArg("x", shape=(), offset=lp.auto), ...]) x_in_base = cla.arange(cq, 42, dtype=np.int32) x_in = x_in_base[13] _evt, (out,) = knl(cq, x=x_in) np.testing.assert_allclose(out.get(), 1729) @pytest.mark.parametrize("target", [lp.PyOpenCLTarget, lp.ExecutableCTarget]) @pytest.mark.parametrize("dtype", [np.float32, np.float64]) def test_inf_support(ctx_factory, target, dtype): import math from loopy.symbolic import parse # See: https://github.com/inducer/loopy/issues/443 for some laughs ctx = ctx_factory() queue = cl.CommandQueue(ctx) knl = lp.make_kernel( "{:}", [lp.Assignment(parse("out_inf"), math.inf), lp.Assignment(parse("out_neginf"), -math.inf)], [lp.GlobalArg("out_inf", shape=lp.auto, dtype=dtype), lp.GlobalArg("out_neginf", shape=lp.auto, dtype=dtype) ], target=target()) knl = lp.set_options(knl, return_dict=True) if target == lp.PyOpenCLTarget: _, out_dict = knl(queue) out_dict = {k: v.get() for k, v in out_dict.items()} elif target == lp.ExecutableCTarget: _, out_dict = knl() else: raise NotImplementedError("unsupported target") assert np.isinf(out_dict["out_inf"]) assert np.isneginf(out_dict["out_neginf"]) def test_input_args_are_required(ctx_factory): ctx = ctx_factory() queue = cl.CommandQueue(ctx) knl1 = lp.make_kernel( "{ [i]: 0<=i<2 }", """ g[i] = f[i] + 1.5 """, [lp.GlobalArg("f, g", shape=lp.auto, dtype="float64"), ...] ) knl2 = lp.make_kernel( "{ [i]: 0<=i 0 and i < 0}", """ tmp[i] = i a[i] = tmp[i] """, [lp.TemporaryVariable("tmp", address_space=lp.AddressSpace.GLOBAL, shape=(0,)), lp.GlobalArg("a", shape=(0,)), ...]) _evt, (out, ) = knl(cq) assert out.shape == (0,) def test_empty_array_output(ctx_factory): ctx = ctx_factory() cq = cl.CommandQueue(ctx) knl = lp.make_kernel( "{[i]: i > 0 and i < 0}", [], [lp.GlobalArg("a", shape=(0,), dtype=np.float32, is_output=True, is_input=False)]) _evt, (out, ) = knl(cq) assert out.shape == (0,) def test_empty_array_stride_check(ctx_factory): ctx = ctx_factory() cq = cl.CommandQueue(ctx) rng = np.random.default_rng(seed=42) einsum = lp.make_einsum("mij,j->mi", ["a", "x"]) einsum(cq, a=rng.normal(size=(3, 0, 5)), x=rng.normal(size=5)) if einsum.default_entrypoint.options.skip_arg_checks: pytest.skip("args checks disabled, cannot check") with pytest.raises(ValueError): einsum(cq, a=rng.normal(size=(3, 2, 5)).copy(order="F"), x=rng.normal(size=5)) def test_no_op_with_predicate(ctx_factory): ctx = ctx_factory() predicate = prim.Comparison(prim.Variable("a"), ">", 0) knl = lp.make_kernel([], ["<> a = 1", lp.NoOpInstruction(predicates=[predicate])]) code = lp.generate_code_v2(knl).device_code() cl.Program(ctx, code).build() def test_empty_array_stride_check_fortran(ctx_factory): # https://github.com/inducer/loopy/issues/583 ctx = ctx_factory() queue = cl.CommandQueue(ctx) import pyopencl.array as cla a_f = cla.Array(queue, (0, 2), np.float64, order="F") knl = lp.make_kernel( "{ [i,j]: 0<=i b = 6.0 * float_pos[k] output[k] = 2.0 * b """, [lp.ValueArg("K", is_input=True), lp.GlobalArg("float_pos", np.float32, shape=lp.auto, is_input=True, is_output=False), lp.GlobalArg("output", np.uint8, shape=lp.auto, is_input=False, is_output=True)], target=lp.ISPCTarget(), assumptions="10") rng = np.random.default_rng(seed=12) a = rng.normal(size=(16, 4)) ctx = ctx_factory() queue = cl.CommandQueue(ctx) _evt, (result,) = knl(queue, a=a, n=a.size) result_ref = np.where(a < 0, a*3, np.sin(a)) assert np.allclose(result, result_ref) def test_float3(): # https://github.com/inducer/loopy/issues/922 knl = lp.make_kernel( "{ [i]: 0<=i0") knl = lp.add_and_infer_dtypes(knl, {"a": np.dtype(np.float32), "b": np.dtype(np.float32)}) device_code = lp.generate_code_v2(knl).device_code() assert "float3" in device_code if __name__ == "__main__": import sys if len(sys.argv) > 1: exec(sys.argv[1]) else: from pytest import main main([__file__]) # vim: foldmethod=marker