# Closes gh-922
# test_target.py 26.25 KiB
__copyright__ = "Copyright (C) 2012 Andreas Kloeckner"
__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import logging
import numpy as np
import pytest
import pymbolic.primitives as prim
import pyopencl as cl
import pyopencl.clmath
import pyopencl.clrandom
import pyopencl.tools
import pyopencl.version
from pyopencl.tools import ( # noqa: F401
pytest_generate_tests_for_pyopencl as pytest_generate_tests,
)
import loopy as lp
from loopy.diagnostic import LoopyError
from loopy.target.c import CTarget
from loopy.target.opencl import OpenCLTarget
from loopy.version import LOOPY_USE_LANGUAGE_VERSION_2018_2 # noqa: F401
logger = logging.getLogger(__name__)
def test_ispc_target():
    """Smoke-test ISPC code generation for a split kernel with a prefetch.

    Nothing is asserted about the generated text; the test passes as long as
    kernel construction, the transformations and code generation do not raise.
    """
    from loopy.target.ispc import ISPCTarget

    kernel_data = [
        lp.GlobalArg("out,a", np.float32, shape=lp.auto),
        "...",
    ]
    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "out[i] = 2*a[i]",
        kernel_data,
        target=ISPCTarget(),
    )

    # Split i twice, then prefetch a over the two inner inames.
    knl = lp.split_iname(knl, "i", 8, inner_tag="l.0")
    knl = lp.split_iname(knl, "i_outer", 4, outer_tag="g.0", inner_tag="ilp")
    knl = lp.add_prefetch(
        knl, "a", ["i_inner", "i_outer_inner"], default_tag="l.auto")

    generated = lp.generate_code_v2(knl)
    # Device code is produced and printed first, host code second.
    for emit in (generated.device_code, generated.host_code):
        print(emit())
def test_cuda_target():
    """Smoke-test CUDA device code generation for a split, prefetching kernel.

    The generated device code is only printed, not inspected.
    """
    from loopy.target.cuda import CudaTarget

    kernel_data = [
        lp.GlobalArg("out,a", np.float32, shape=lp.auto),
        "...",
    ]
    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "out[i] = 2*a[i]",
        kernel_data,
        target=CudaTarget(),
    )

    knl = lp.split_iname(knl, "i", 8, inner_tag="l.0")
    knl = lp.split_iname(knl, "i_outer", 4, outer_tag="g.0", inner_tag="ilp")
    knl = lp.add_prefetch(
        knl, "a", ["i_inner", "i_outer_inner"], default_tag="l.auto")

    device_code = lp.generate_code_v2(knl).device_code()
    print(device_code)
def test_generate_c_snippet():
    """Generate C code for two sum reductions built from pymbolic expressions.

    The result of code generation is printed; nothing is asserted about it.
    """
    from pymbolic import var

    I, f, df, q_v, k, u = (  # noqa: N806,E741
        var(name) for name in ("I", "f", "df", "q_v", "k", "u"))

    from functools import partial

    # Both instructions below reduce over the same iname k.
    sum_over = partial(lp.Reduction, "sum", allow_simultaneous=True)

    instructions = [
        lp.Assignment(f[I], sum_over(k, q_v[k, I]*u)),
        lp.Assignment(df[I], sum_over(k, q_v[k, I])),
    ]
    kernel_data = [
        lp.GlobalArg("q_v", np.float64, shape="nQuad, nSpace"),
        lp.GlobalArg("f,df", np.float64, shape="nSpace"),
        lp.ValueArg("u", np.float64),
        "...",
    ]
    knl = lp.make_kernel(
        "{[I, k]: 0<=I<nSpace and 0<=k<nQuad}",
        instructions,
        kernel_data,
        target=CTarget(),
        assumptions="nQuad>=1")

    # Flip to True to play with prefetching
    # (prefetch currently requires constant sizes).
    play_with_prefetching = False
    if play_with_prefetching:
        knl = lp.fix_parameters(knl, nQuad=5, nSpace=3)
        knl = lp.add_prefetch(knl, "q_v", "k,I", default_tag=None)

    knl = lp.split_iname(knl, "k", 4, inner_tag="unr", slabs=(0, 1))
    knl = lp.prioritize_loops(knl, "I,k_outer,k_inner")

    generated = lp.generate_code_v2(knl)
    print(generated)
@pytest.mark.parametrize("target", [CTarget, OpenCLTarget])
@pytest.mark.parametrize("tp", ["f32", "f64"])
def test_math_function(target, tp):
    """Check which min/max math functions the C and OpenCL backends emit.

    ``fmin``/``fmax`` must appear in the generated device code. The
    ``f``-suffixed variants are expected only for float32 data on
    :class:`CTarget` and must be absent in every other combination.
    """
    import pymbolic.primitives as p

    dtype_by_name = {"f32": np.float32, "f64": np.float64}
    data_type = dtype_by_name[tp]

    i = p.Variable("i")
    xi, yi, zi = (p.Subscript(p.Variable(name), i) for name in "xyz")

    n = 100
    domain = f"{{[i]: 0<=i<{n}}}"
    data = [lp.GlobalArg(name, data_type, shape=(n,)) for name in "xyz"]

    expect_suffixed = tp == "f32" and target == CTarget

    # Same check for both functions: loopy-level name -> C-level name.
    for loopy_name, c_name in (("min", "fmin"), ("max", "fmax")):
        inst = [lp.Assignment(xi, p.Variable(loopy_name)(yi, zi))]
        knl = lp.make_kernel(domain, inst, data, target=target())
        code = lp.generate_code_v2(knl).device_code()

        assert c_name in code

        suffixed_name = c_name + "f"
        if expect_suffixed:
            assert suffixed_name in code
        else:
            assert suffixed_name not in code
@pytest.mark.parametrize("tp", ["f32", "f64"])
def test_random123(ctx_factory, tp):
    """Run the philox4x32/threefry4x32 generators and range-check the output.

    Skipped when the installed PyOpenCL reports a version below 2016.2.
    """
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    pyopencl_version = cl.version.VERSION
    if pyopencl_version < (2016, 2):
        pytest.skip("Random123 RNG not supported in PyOpenCL < 2016.2")

    n = 150000

    # "TYPE" is a placeholder substituted with the parametrized type name.
    instructions_template = (
        "\n"
        "<> key2 = make_uint2(i, 324830944) {inames=i}\n"
        "<> key4 = make_uint4(i, 324830944, 234181, 2233) {inames=i}\n"
        "<> ctr = make_uint4(0, 1, 2, 3) {inames=i,id=init_ctr}\n"
        "<> real, ctr = philox4x32_TYPE(ctr, key2) {id=realpart,dep=init_ctr}\n"
        "<> imag, ctr = threefry4x32_TYPE(ctr, key4) {dep=init_ctr:realpart}\n"
        "out[i, 0] = real.s0 + 1j * imag.s0\n"
        "out[i, 1] = real.s1 + 1j * imag.s1\n"
        "out[i, 2] = real.s2 + 1j * imag.s2\n"
        "out[i, 3] = real.s3 + 1j * imag.s3\n"
    )

    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        instructions_template.replace("TYPE", tp))
    knl = lp.split_iname(knl, "i", 128, outer_tag="g.0", inner_tag="l.0")
    knl = lp.set_options(knl, write_code=True)

    _evt, outputs = knl(queue, n=n)
    (out,) = outputs
    out = out.get()

    assert (out < 1).all()
    assert (0 <= out).all()
def test_tuple(ctx_factory):
    """Execute a kernel whose single instruction assigns from ``make_tuple``."""
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    import islpy as isl

    domains = [isl.BasicSet("[] -> {[]: }")]
    instructions = (
        "\n"
        "a, b = make_tuple(1, 2.)\n"
    )
    knl = lp.make_kernel(domains, instructions)

    _evt, outputs = knl(queue)
    a, b = outputs

    assert a.get() == 1
    assert b.get() == 2.
def test_clamp(ctx_factory):
    """Run a kernel calling ``clamp`` on random float32 data.

    The output is not checked; the test only requires that execution succeeds.
    """
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    n = 15 * 10**6
    x = cl.clrandom.rand(queue, n, dtype=np.float32)

    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "out[i] = clamp(x[i], a, b)",
    )
    knl = lp.split_iname(knl, "i", 128, outer_tag="g.0", inner_tag="l.0")
    knl = lp.set_options(knl, write_code=True)

    bounds = {"a": np.float32(12), "b": np.float32(15)}
    _evt, (_out,) = knl(queue, x=x, **bounds)
def test_sized_integer_c_codegen(ctx_factory):
    """Compare ``int64(1) << i`` computed by a kernel against NumPy."""
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    from pymbolic import var

    # The left operand is cast to int64 before the shift is applied.
    shifted_one = lp.TypeCast(np.int64, 1) << var("i")
    knl = lp.make_kernel(
        "{[i]: 0<=i<n}",
        [lp.Assignment("a[i]", shifted_one)],
    )
    knl = lp.set_options(knl, write_code=True)

    n = 40
    _evt, outputs = knl(queue, n=n)
    (a,) = outputs

    expected = 1 << np.arange(n, dtype=np.int64)
    assert np.array_equal(expected, a.get())
def test_child_invalid_type_cast():
    """Preprocessing must reject a TypeCast whose operand is a ``make_uint2``."""
    from pymbolic import var

    cast_of_vector = lp.TypeCast(np.int64, var("ctr")) << var("i")
    instructions = [
        "<> ctr = make_uint2(0, 0)",
        lp.Assignment("a[i]", cast_of_vector),
    ]
    knl = lp.make_kernel("{[i]: 0<=i<n}", instructions)

    with pytest.raises(lp.LoopyError):
        lp.preprocess_kernel(knl)
def test_target_invalid_type_cast():
    """Constructing a TypeCast to a structured dtype must raise LoopyError."""
    struct_dtype = np.dtype([("", "<u4"), ("", "<i4")])

    with pytest.raises(lp.LoopyError):
        lp.TypeCast(struct_dtype, 1)
def test_ispc_streaming_stores():
    """Check that ISPC code for a tagged stream triad uses ``streaming_store``.

    All instructions of the kernel are tagged ``!streaming_store``; the
    generated code (host and device, via ``all_code()``) must then contain a
    ``streaming_store(`` call.
    """
    stream_dtype = np.float32
    index_dtype = np.int32

    knl = lp.make_kernel(
        "{[i]: 0<=i<n}",
        "a[i] = b[i] + scalar * c[i]",
        target=lp.ISPCTarget(), index_dtype=index_dtype,
        name="stream_triad")

    # Deliberately not called "vars", which would shadow the builtin.
    arg_names = ["a", "b", "c", "scalar"]

    knl = lp.assume(knl, "n>0")
    knl = lp.split_iname(
        knl, "i", 2**18, outer_tag="g.0", slabs=(0, 1))
    knl = lp.split_iname(knl, "i_inner", 8, inner_tag="l.0")
    knl = lp.tag_instructions(knl, "!streaming_store")

    knl = lp.add_and_infer_dtypes(knl, dict.fromkeys(arg_names, stream_dtype))
    knl = lp.set_argument_order(knl, [*arg_names, "n"])

    # Generate the code once and inspect that single result.
    all_code = lp.generate_code_v2(knl).all_code()
    assert "streaming_store(" in all_code
def test_cuda_short_vector():
    """Smoke-test CUDA code generation with a length-4 ``vec``-tagged axis.

    The device code is printed but not inspected.
    """
    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "out[i] = 2*a[i]",
        target=lp.CudaTarget(),
    )

    knl = lp.set_options(knl, write_code=True)

    # Vectorize: split the iname and the array axes by 4 and tag both as vec.
    knl = lp.split_iname(knl, "i", 4, slabs=(0, 1), inner_tag="vec")
    knl = lp.split_array_axis(knl, "a,out", axis_nr=0, count=4)
    knl = lp.tag_array_axes(knl, "a,out", "C,vec")

    knl = lp.set_options(knl, write_wrapper=True)
    knl = lp.add_and_infer_dtypes(knl, {"a": np.float32})

    device_code = lp.generate_code_v2(knl).device_code()
    print(device_code)
def test_pyopencl_execution_numpy_handling(ctx_factory):
    """A NumPy array passed for a written argument is updated and returned.

    In each scenario the first returned output must be the very same object
    as the NumPy array passed in for ``x``, and that array must hold the
    computed value afterwards.
    """
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    # Scenario 1: x is both read and written, y is a NumPy array.
    read_write_knl = lp.make_kernel("{:}", ["x[0] = y[0] + x[0]"])
    y_host = np.array([3.])
    x_host = np.array([4.])
    _evt, out = read_write_knl(queue, y=y_host, x=x_host)
    assert out[0] is x_host
    assert x_host[0] == 7.

    # Scenario 2: same kernel, but y is a pyopencl array.
    import pyopencl.array as cla
    y_dev = cla.zeros(queue, shape=(1), dtype="float64") + 3.
    x_host = np.array([4.])
    _evt, out = read_write_knl(queue, y=y_dev, x=x_host)
    assert out[0] is x_host
    assert x_host[0] == 7.

    # Scenario 3: x is only written by the kernel.
    write_only_knl = lp.make_kernel("{:}", ["x[0] = y[0] + 2"])
    y_host = np.array([3.])
    x_host = np.array([4.])
    _evt, out = write_only_knl(queue, y=y_host, x=x_host)
    assert out[0] is x_host
    assert x_host[0] == 5.
def test_opencl_support_for_bool(ctx_factory):
    """Write ``i%2`` into a ``np.bool_`` array and compare against NumPy."""
    instructions = (
        "\n"
        "y[i] = i%2\n"
    )
    knl = lp.make_kernel(
        "{[i]: 0<=i<10}",
        instructions,
        [lp.GlobalArg("y", dtype=np.bool_, shape=lp.auto)])

    cl_ctx = ctx_factory()
    queue = cl.CommandQueue(cl_ctx)
    _evt, outputs = knl(queue)
    (out,) = outputs
    out = out.get()

    # Expected pattern: False, True repeated five times.
    expected = np.tile(np.array([0, 1], dtype=np.bool_), 5)
    np.testing.assert_equal(out, expected)
@pytest.mark.parametrize("target", [lp.PyOpenCLTarget, lp.ExecutableCTarget])
def test_nan_support(ctx_factory, target):
    """Check NaN literals and ``isnan`` on the PyOpenCL and executable-C targets.

    Covers a plain ``np.nan``, ``isnan`` of a variable and of literals, and
    pymbolic ``NaN`` nodes with float32, untyped, complex64 and complex128
    data types.
    """
    from pymbolic.primitives import NaN, Variable

    from loopy.symbolic import parse

    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    # (assignee name, right-hand side), in execution order.
    assignments = [
        ("a", np.nan),
        ("b", parse("isnan(a)")),
        ("c", parse("isnan(3.14)")),
        ("d", parse("isnan(0.0)")),
        ("e", NaN(np.float32)),
        ("f", Variable("isnan")(NaN(None))),
        ("g", NaN(np.complex64)),
        ("h", NaN(np.complex128)),
    ]
    instructions = [
        lp.Assignment(parse(lhs), rhs) for lhs, rhs in assignments]

    knl = lp.make_kernel(
        "{:}",
        instructions,
        [lp.GlobalArg("a", is_input=False, shape=()), ...],
        seq_dependencies=True, target=target())
    knl = lp.set_options(knl, return_dict=True)

    if target == lp.PyOpenCLTarget:
        _evt, device_out = knl(queue)
        # Bring the device arrays back to the host.
        out_dict = {name: ary.get() for name, ary in device_out.items()}
    elif target == lp.ExecutableCTarget:
        _evt, out_dict = knl()
    else:
        raise NotImplementedError("unsupported target")

    assert np.isnan(out_dict["a"])
    for name, expected in (("b", 1), ("c", 0), ("d", 0)):
        assert out_dict[name] == expected

    assert np.isnan(out_dict["e"])
    assert out_dict["e"].dtype == np.float32
    assert out_dict["f"] == 1

    for name, expected_dtype in (("g", np.complex64), ("h", np.complex128)):
        assert np.isnan(out_dict[name])
        assert out_dict[name].dtype == expected_dtype
@pytest.mark.parametrize("target", [lp.PyOpenCLTarget, lp.ExecutableCTarget])
def test_emits_ternary_operators_correctly(ctx_factory, target):
    """Check conditional expressions whose conditions are floating-point values.

    See: https://github.com/inducer/loopy/issues/390
    """
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    instructions = (
        "\n"
        "<> tmp1 = 3.1416\n"
        "<> tmp2 = 0.000\n"
        "y1 = 1729 if tmp1 else 1.414\n"
        "y2 = 42 if 2.7183 else 13\n"
        "y3 = 127 if tmp2 else 128\n"
    )
    knl = lp.make_kernel(
        "{:}",
        instructions,
        seq_dependencies=True,
        target=target())
    knl = lp.set_options(knl, return_dict=True)

    if target == lp.PyOpenCLTarget:
        _evt, out_dict = knl(queue)
    elif target == lp.ExecutableCTarget:
        _evt, out_dict = knl()
    else:
        raise NotImplementedError("unsupported target")

    # Nonzero conditions select the first branch, zero selects the second.
    for name, expected in (("y1", 1729), ("y2", 42), ("y3", 128)):
        assert out_dict[name] == expected
def test_scalar_array_take_offset(ctx_factory):
    """Pass element 13 of a device array as a shape-``()`` argument with offset."""
    import pyopencl.array as cla

    ctx = ctx_factory()
    cq = cl.CommandQueue(ctx)

    instructions = (
        "\n"
        "y = 133*x\n"
    )
    knl = lp.make_kernel(
        "{:}",
        instructions,
        [lp.GlobalArg("x", shape=(), offset=lp.auto), ...])

    base = cla.arange(cq, 42, dtype=np.int32)
    x_in = base[13]

    _evt, outputs = knl(cq, x=x_in)
    (out,) = outputs

    # 133 * 13 == 1729
    np.testing.assert_allclose(out.get(), 1729)
@pytest.mark.parametrize("target", [lp.PyOpenCLTarget, lp.ExecutableCTarget])
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_inf_support(ctx_factory, target, dtype):
    """Assign +inf and -inf in a kernel and check both come back as such.

    See: https://github.com/inducer/loopy/issues/443 for some laughs
    """
    import math

    from loopy.symbolic import parse

    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    instructions = [
        lp.Assignment(parse("out_inf"), math.inf),
        lp.Assignment(parse("out_neginf"), -math.inf),
    ]
    kernel_data = [
        lp.GlobalArg(name, shape=lp.auto, dtype=dtype)
        for name in ("out_inf", "out_neginf")
    ]
    knl = lp.make_kernel("{:}", instructions, kernel_data, target=target())
    knl = lp.set_options(knl, return_dict=True)

    if target == lp.PyOpenCLTarget:
        _, device_out = knl(queue)
        out_dict = {name: ary.get() for name, ary in device_out.items()}
    elif target == lp.ExecutableCTarget:
        _, out_dict = knl()
    else:
        raise NotImplementedError("unsupported target")

    assert np.isinf(out_dict["out_inf"])
    assert np.isneginf(out_dict["out_neginf"])
def test_input_args_are_required(ctx_factory):
    """Calling a kernel without its input arguments must raise LoopyError.

    Each call that is expected to fail gets its own ``pytest.raises`` block:
    a block ends at the first exception, so any further statements placed in
    the same block would never run and their behavior would go unchecked.
    """
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    knl1 = lp.make_kernel(
        "{ [i]: 0<=i<2 }",
        (
            "\n"
            "g[i] = f[i] + 1.5\n"
        ),
        [lp.GlobalArg("f, g", shape=lp.auto, dtype="float64"), ...]
    )

    knl2 = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "g[i] = 3 * f[i] + g[i]",
    )

    f = np.zeros(2)
    g = np.zeros(2)

    for knl in [knl1, knl2]:
        # No arguments at all: the input f is missing.
        with pytest.raises(LoopyError):
            _ = knl(queue)

        # Passing only g still leaves the input f missing.
        with pytest.raises(LoopyError):
            _ = knl(queue, g=g)

    # knl1 only reads f, so supplying f (with or without g) is expected
    # to succeed.
    _ = knl1(queue, f=f)
    _ = knl1(queue, f=f, g=g)

    knl = lp.make_kernel(
        "{ [i]: 0<=i<2 }",
        (
            "\n"
            "f[i] = 3.\n"
            "g[i] = f[i] + 1.5\n"
        ),
        [lp.GlobalArg("f, g", shape=lp.auto, dtype="float64"), ...]
    )

    # FIXME: this should not raise!
    # https://github.com/inducer/loopy/issues/450
    with pytest.raises(LoopyError):
        _ = knl(queue)
def test_pyopencl_target_with_global_temps_with_base_storage(ctx_factory):
    """Two global temporaries sharing ``base_storage`` are allocated only once.

    A recording allocator counts every byte requested during execution; the
    total must equal the shared base plus the two outputs.
    """
    from pyopencl.tools import ImmediateAllocator

    class RecordingAllocator(ImmediateAllocator):
        """Allocator that sums up the sizes of all requested allocations."""

        def __init__(self, queue):
            super().__init__(queue)
            self.allocated_nbytes = 0

        def __call__(self, size):
            self.allocated_nbytes += size
            return super().__call__(size)

    ctx = ctx_factory()
    cq = cl.CommandQueue(ctx)

    instructions = (
        "\n"
        "tmp1[i] = 2*i {id=w_tmp1}\n"
        "y[i] = tmp1[i] {nosync=w_tmp1}\n"
        "... gbarrier\n"
        "tmp2[j] = 3*j {id=w_tmp2}\n"
        "z[j] = tmp2[j] {nosync=w_tmp2}\n"
    )
    # Both temporaries name the same base storage.
    shared_base_temps = [
        lp.TemporaryVariable(
            name,
            base_storage="base",
            address_space=lp.AddressSpace.GLOBAL)
        for name in ("tmp1", "tmp2")
    ]
    knl = lp.make_kernel(
        "{[i, j]: 0<=i, j<10}",
        instructions,
        [*shared_base_temps, ...],
        seq_dependencies=True)

    knl = lp.tag_inames(knl, {"i": "g.0", "j": "g.0"})
    knl = lp.set_options(knl, return_dict=True)
    knl = lp.preprocess_kernel(knl)
    knl = lp.allocate_temporaries_for_base_storage(knl)

    my_allocator = RecordingAllocator(cq)
    _, out = knl(cq, allocator=my_allocator)

    index_range = np.arange(10)
    np.testing.assert_allclose(out["y"].get(), 2*index_range)
    np.testing.assert_allclose(out["z"].get(), 3*index_range)

    expected_nbytes = (
        40  # base
        + 40  # y
        + 40  # z
    )
    assert my_allocator.allocated_nbytes == expected_nbytes
@pytest.mark.parametrize("dtype", ["float32", "float64"])
def test_glibc_bessel_functions(dtype):
    """Compare ``bessel_jn``/``bessel_yn`` of order 2 against SciPy.

    Skipped if SciPy is unavailable or the C toolchain's compiler is neither
    ``gcc`` nor ``g++``.
    """
    pytest.importorskip("scipy.special")
    from numpy.random import default_rng
    from scipy.special import jn, yn  # pylint: disable=no-name-in-module

    from loopy.target.c.c_execution import CCompiler

    rng = default_rng(0)
    compiler = CCompiler(cflags=["-O3"])

    n = 2

    instructions = (
        "\n"
        "first_kind_bessel[i] = bessel_jn(n, x[i])\n"
        "second_kind_bessel[i] = bessel_yn(n, x[i])\n"
    )
    knl = lp.make_kernel(
        "{[i]: 0<=i<10}",
        instructions,
        target=lp.ExecutableCWithGNULibcTarget(compiler))

    toolchain_cc = knl.target.compiler.toolchain.cc  # pylint: disable=no-member
    if toolchain_cc not in ["gcc", "g++"]:
        pytest.skip("GNU-libc not found.")

    # Fix the order to the same value used for the SciPy reference below.
    knl = lp.fix_parameters(knl, n=n)
    knl = lp.set_options(knl, return_dict=True)
    knl = lp.set_options(knl, write_code=True)

    x_in = np.abs(rng.random(10, dtype=dtype))

    _, out_dict = knl(x=x_in)

    references = ((jn, "first_kind_bessel"), (yn, "second_kind_bessel"))
    for reference_func, output_name in references:
        np.testing.assert_allclose(
            reference_func(n, x_in), out_dict[output_name],
            rtol=1e-6, atol=1e-6)
def test_zero_size_temporaries(ctx_factory):
    """Check that the invoker copes with a zero-sized global temporary.

    Zero-sized arrays in PyOpenCL allocate as "None"; executing a kernel
    with such a temporary must still work.

    See: https://github.com/inducer/loopy/pull/588
    """
    ctx = ctx_factory()
    cq = cl.CommandQueue(ctx)

    instructions = (
        "\n"
        "tmp[i] = i\n"
        "a[i] = tmp[i]\n"
    )
    kernel_data = [
        lp.TemporaryVariable(
            "tmp", address_space=lp.AddressSpace.GLOBAL, shape=(0,)),
        lp.GlobalArg("a", shape=(0,)),
        ...,
    ]
    knl = lp.make_kernel(
        "{[i]: i > 0 and i < 0}",
        instructions,
        kernel_data)

    _evt, outputs = knl(cq)
    (out,) = outputs
    assert out.shape == (0,)
def test_empty_array_output(ctx_factory):
    """A kernel without instructions still returns its zero-length output."""
    ctx = ctx_factory()
    cq = cl.CommandQueue(ctx)

    empty_output = lp.GlobalArg(
        "a", shape=(0,), dtype=np.float32,
        is_output=True, is_input=False)
    knl = lp.make_kernel(
        "{[i]: i > 0 and i < 0}",
        [],
        [empty_output])

    _evt, outputs = knl(cq)
    (out,) = outputs
    assert out.shape == (0,)
def test_empty_array_stride_check(ctx_factory):
    """An empty input is accepted; a Fortran-ordered non-empty one is rejected.

    The rejection part is skipped when argument checks are disabled.
    """
    ctx = ctx_factory()
    cq = cl.CommandQueue(ctx)

    rng = np.random.default_rng(seed=42)

    einsum = lp.make_einsum("mij,j->mi", ["a", "x"])

    # An array with a zero-length axis must be accepted.
    einsum(cq, a=rng.normal(size=(3, 0, 5)), x=rng.normal(size=5))

    if einsum.default_entrypoint.options.skip_arg_checks:
        pytest.skip("args checks disabled, cannot check")

    a_fortran = rng.normal(size=(3, 2, 5)).copy(order="F")
    x = rng.normal(size=5)
    with pytest.raises(ValueError):
        einsum(cq, a=a_fortran, x=x)
def test_no_op_with_predicate(ctx_factory):
    """Device code for a predicated no-op instruction must build as OpenCL."""
    ctx = ctx_factory()

    a_is_positive = prim.Comparison(prim.Variable("a"), ">", 0)
    instructions = [
        "<> a = 1",
        lp.NoOpInstruction(predicates=[a_is_positive]),
    ]
    knl = lp.make_kernel([], instructions)

    code = lp.generate_code_v2(knl).device_code()

    # Only buildability is checked here.
    program = cl.Program(ctx, code)
    program.build()
def test_empty_array_stride_check_fortran(ctx_factory):
    """Run a kernel on an empty Fortran-ordered device array.

    See: https://github.com/inducer/loopy/issues/583
    """
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    import pyopencl.array as cla

    empty_fortran = cla.Array(queue, (0, 2), np.float64, order="F")

    knl = lp.make_kernel(
        "{ [i,j]: 0<=i<n and 0<=j<m }",
        "output[i,j] = sqrt(input[i,j])",
    )

    knl(queue, input=empty_fortran)
@pytest.mark.parametrize("with_gbarrier", [False, True])
def test_passing_bajillions_of_svm_args(ctx_factory, with_gbarrier):
    """Run a kernel with 300 argument triples backed by SVM allocations.

    The target is created with ``limit_arg_size_nbytes=20``. Skipped when
    the device lacks coarse-grain buffer SVM.
    """
    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)

    from pyopencl.characterize import has_coarse_grain_buffer_svm
    if not has_coarse_grain_buffer_svm(queue.device):
        pytest.skip("device does not support SVM, which is required for this test")

    if not with_gbarrier:
        gbarrier_part = []
        dep = ""
    else:
        # Make this artificially have multiple subkernels to check that
        # declarations are correctly emitted in that setting as well
        # https://github.com/inducer/loopy/pull/642#pullrequestreview-1087588248
        gbarrier_part = [
            "z[j] = 0 {id=init_z}",
            "... gbarrier {dep=init_z,id=gb}"
        ]
        dep = "{dep=gb}"

    nargsets = 300

    sum_instructions = [
        f"c{iargset}[i] = a{iargset}[i]+b{iargset}[i] {dep}"
        for iargset in range(nargsets)
    ]
    array_args = [
        lp.GlobalArg(f"{name}{iargset}", shape=lp.auto, dtype=np.float32)
        for name in "abc"
        for iargset in range(nargsets)
    ]

    knl = lp.make_kernel(
        "{[i,j]: 0<=i,j<n}",
        gbarrier_part + sum_instructions,
        array_args + [...],
        target=lp.PyOpenCLTarget(limit_arg_size_nbytes=20),
        options=lp.Options(return_dict=True))

    alloc = cl.tools.SVMAllocator(
        ctx, flags=cl.svm_mem_flags.READ_WRITE, queue=queue)

    multiplier = 10_000

    def svm_filled(value):
        # Length-20 float32 SVM array with every entry equal to value.
        zeros = cl.array.zeros(queue, 20, np.float32, allocator=alloc)
        return zeros + np.float32(value)

    args = {}
    for iargset in range(nargsets):
        args[f"a{iargset}"] = svm_filled(multiplier * iargset)
        args[f"b{iargset}"] = svm_filled(iargset)

    _evt, res = knl(queue, **args, allocator=alloc)

    for iargset in range(nargsets):
        expected = iargset * multiplier + iargset
        assert (res[f"c{iargset}"].get() == expected).all()
def test_no_uint_in_cuda_code():
    """CUDA device code for uint32 operands must not contain ``uint``.

    See: https://github.com/inducer/compyte/pull/44
    """
    knl = lp.make_kernel(
        "{ [i]: 0<=i<n }",
        "out[i] = a[i] + b[i]",
        target=lp.CudaTarget())

    # Types are added one argument at a time, a first and then b.
    for arg_name in ("a", "b"):
        knl = lp.add_and_infer_dtypes(knl, {arg_name: np.dtype(np.uint32)})

    device_code = lp.generate_code_v2(knl).device_code()
    assert "uint" not in device_code
def test_ispc_private_var():
    """Smoke-test ISPC code generation with a private float32 temporary.

    See: https://github.com/inducer/loopy/issues/763
    """
    instructions = (
        "\n"
        "<float32> b = 6.0 * float_pos[k]\n"
        "output[k] = 2.0 * b\n"
    )
    kernel_data = [
        lp.ValueArg("K", is_input=True),
        lp.GlobalArg(
            "float_pos", np.float32, shape=lp.auto,
            is_input=True, is_output=False),
        lp.GlobalArg(
            "output", np.uint8, shape=lp.auto,
            is_input=False, is_output=True),
    ]
    knl = lp.make_kernel(
        "{ [k]: 0<=k<K }",
        instructions,
        kernel_data,
        target=lp.ISPCTarget(),
        assumptions="1<K")

    knl = lp.split_iname(knl, "k", 8, inner_tag="l.0")
    knl = lp.set_temporary_address_space(knl, "b", "private")

    device_code = lp.generate_code_v2(knl).device_code()
    print(device_code)
def test_to_complex_casts(ctx_factory):
    """Build OpenCL code casting each arithmetic dtype to complex128.

    Each instruction adds a cast input to a cast ``complex64`` constant.
    """
    arith_dtypes = "bhilqpBHILQPfdFD"

    out_type = lp.to_loopy_type(np.dtype(np.complex128))
    other = np.complex64(7)

    from pymbolic import var

    instructions = []
    for typename in arith_dtypes:
        cast_input = lp.TypeCast(out_type, var(f"in_{typename}"))
        cast_constant = lp.TypeCast(out_type, other)
        instructions.append(
            lp.Assignment(f"out_{typename}", cast_input + cast_constant))

    kernel_data = []
    for typename in arith_dtypes:
        kernel_data.append(
            lp.GlobalArg(f"in_{typename}", dtype=np.dtype(typename), shape=()))
    kernel_data.append(...)

    knl = lp.make_kernel([], instructions, kernel_data)

    ctx = ctx_factory()
    code = lp.generate_code_v2(knl).device_code()

    # just testing here that the generated code builds
    program = cl.Program(ctx, code)
    program.build()
def test_cl_vectorize_ternary(ctx_factory):
    """Compare a ``vec``-tagged conditional expression against ``np.where``."""
    instructions = (
        "\n"
        "b[i] = a[i]*3 if a[i] < 0 else sin(a[i])\n"
    )
    knl = lp.make_kernel("{ [i]: 0<=i<n }", instructions)

    # Vectorize with width 4 along both the iname and the array axes.
    knl = lp.split_array_axis(knl, "a,b", 0, 4)
    knl = lp.split_iname(knl, "i", 4)
    knl = lp.tag_inames(knl, {"i_inner": "vec"})
    knl = lp.tag_array_axes(knl, "a,b", "c,vec")
    knl = lp.set_options(knl, write_code=True)
    knl = lp.assume(knl, "n % 4 = 0 and n>0")

    rng = np.random.default_rng(seed=12)
    a = rng.normal(size=(16, 4))

    ctx = ctx_factory()
    queue = cl.CommandQueue(ctx)
    _evt, outputs = knl(queue, a=a, n=a.size)
    (result,) = outputs

    expected = np.where(a < 0, a*3, np.sin(a))
    assert np.allclose(result, expected)
def test_float3():
    """Generated device code for a width-3 ``vec`` axis must mention ``float3``.

    See: https://github.com/inducer/loopy/issues/922
    """
    instructions = (
        "\n"
        "out[i] = a if i == 0 else b\n"
    )
    knl = lp.make_kernel("{ [i]: 0<=i<n }", instructions)

    vec_size = 3
    knl = lp.split_array_axis(knl, "out", 0, vec_size)
    knl = lp.split_iname(knl, "i", vec_size)
    knl = lp.tag_inames(knl, {"i_inner": "vec"})
    knl = lp.tag_array_axes(knl, "out", "c,vec")
    knl = lp.assume(knl, f"n % {vec_size} = 0 and n>0")

    scalar_dtypes = {
        "a": np.dtype(np.float32),
        "b": np.dtype(np.float32),
    }
    knl = lp.add_and_infer_dtypes(knl, scalar_dtypes)

    device_code = lp.generate_code_v2(knl).device_code()
    assert "float3" in device_code
# Script entry point: run a caller-supplied statement, or the whole file
# through pytest.
if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        # Execute the first command-line argument as Python source in this
        # module's namespace.
        # NOTE(review): exec() on argv is only acceptable for trusted,
        # developer-supplied input.
        exec(sys.argv[1])
    else:
        # No argument given: hand this file to pytest.
        from pytest import main
        main([__file__])
# vim: foldmethod=marker