Newer
Older
import numpy as np
import numpy.linalg as la
import pyopencl as cl
import pyopencl.array # noqa
import pyopencl.tools # noqa
import pyopencl.clrandom # noqa
import loopy as lp # noqa
Timothy A. Smith
committed
import pytest
from pyopencl.tools import ( # noqa
pytest_generate_tests_for_pyopencl
as pytest_generate_tests)
Timothy A. Smith
committed
from utilities import *
@pytest.mark.parametrize("states_str,fluxes_str,direction", [
    ("2 1,4 1,4 1,4 1,20 5.5", "4 1,11.2 2.6,8 1,8 1,46.4 7.1", "x"),
    ("2 1,4 1,4 1,4 1,20 5.5", "4 1,8 1,11.2 2.6,8 1,46.4 7.1", "y"),
    ("2 1,4 1,4 1,4 1,20 5.5", "4 1,8 1,8 1,11.2 2.6,46.4 7.1", "z"),
    ("1 2,-1 -4,-1 -4,-1 -4,5.5 20", "-1 -4,2.6 11.2,1 8,1 8,-7.1 -46.4", "x"),
    ("1 2,-1 -4,-1 -4,-1 -4,5.5 20", "-1 -4,1 8,2.6 11.2,1 8,-7.1 -46.4", "y"),
    ("1 2,-1 -4,-1 -4,-1 -4,5.5 20", "-1 -4,1 8,1 8,2.6 11.2,-7.1 -46.4", "z"),
    ("2 1,4 1,8 2,12 3,64 11", "4 1,11.2 2.6,16 2,24 3,134.4 12.6", "x"),
    ("2 1,4 1,8 2,12 3,64 11", "8 2,16 2,35.2 5.6,48 6,268.8 25.2", "y"),
    ("2 1,4 1,8 2,12 3,64 11", "12 3,24 3,48 6,75.2 10.6,403.2 37.8", "z")
    ])
def test_roe_uniform_grid(ctx_factory, states_str, fluxes_str, direction):
    """Check the output of the ``roe_eigensystem`` kernel for one state pair.

    The kernel is run with an identity matrix as ``metrics_frozen`` and its
    three outputs ``R``, ``R_inv`` and ``lambda_roe`` are checked against the
    difference of the two state columns (``dState``) and of the two flux
    columns (``dFlux``):

    * ``R @ (R_inv @ dState)`` is compared with ``dState``
    * ``R @ (lambda_roe * (R_inv @ dState))`` is compared with ``dFlux``

    :arg ctx_factory: pytest fixture; passed to ``get_queue``.
    :arg states_str: text form of the state array, parsed by
        ``array_from_string`` (presumably one comma-separated entry per
        variable, each holding the two state values -- confirm against the
        helper).
    :arg fluxes_str: text form of the flux array, parsed the same way.
    :arg direction: ``"x"``, ``"y"`` or ``"z"``; passed to the kernel as
        ``d`` = 1, 2 or 3 respectively.
    """

    class RoeParams:
        """Size and direction arguments for the ``roe_eigensystem`` kernel."""

        def __init__(self, nvars, ndim, d):
            self.nvars = nvars
            self.ndim = ndim
            self.d = d

        def mat_bounds(self):
            """Return the ``(nvars, nvars)`` shape used for ``R``/``R_inv``."""
            return self.nvars, self.nvars

        def vec_bound(self):
            """Return the length ``nvars`` used for ``lambda_roe``."""
            return self.nvars

    def setup_roe_params(nvars, ndim, direction):
        """Build a :class:`RoeParams`, mapping *direction* to 1, 2 or 3."""
        dirs = {"x": 1, "y": 2, "z": 3}
        return RoeParams(nvars, ndim, dirs[direction])

    def identity_matrix(n):
        """Return an *n* x *n* float32 identity matrix in Fortran order."""
        return np.identity(n).astype(np.float32).copy(order="F")

    def kernel_roe_eigensystem(queue, prg, params, states, metrics_frozen):
        """Run the ``roe_eigensystem`` kernel; return ``(R, R_inv, lambda)``.

        The three results are fetched with ``.get()`` before being returned.
        """
        R_dev = empty_array_on_device(queue, *params.mat_bounds())
        Rinv_dev = empty_array_on_device(queue, *params.mat_bounds())
        lam_dev = empty_array_on_device(queue, params.vec_bound())

        prg = with_root_kernel(prg, "roe_eigensystem")
        prg(queue, nvars=params.nvars, ndim=params.ndim, d=params.d,
                states=states, metrics_frozen=metrics_frozen,
                R=R_dev, R_inv=Rinv_dev, lambda_roe=lam_dev)

        return R_dev.get(), Rinv_dev.get(), lam_dev.get()

    def check_roe_identity(states, R, Rinv):
        """Compare ``R @ (Rinv @ dState)`` with ``dState``."""
        dState = states[:, 1] - states[:, 0]
        compare_arrays(R@(Rinv@dState), dState)

    def check_roe_property(states, fluxes, R, Rinv, lam):
        """Compare ``R @ (lam * (Rinv @ dState))`` with ``dFlux``."""
        dState = states[:, 1] - states[:, 0]
        dFlux = fluxes[:, 1] - fluxes[:, 0]

        temp = Rinv@dState
        temp = np.multiply(lam, temp)
        compare_arrays(R@temp, dFlux)

    queue = get_queue(ctx_factory)
    prg = get_weno_program()

    params = setup_roe_params(nvars=5, ndim=3, direction=direction)
    states = array_from_string(states_str)
    metrics_frozen = identity_matrix(params.ndim)
    R, Rinv, lam = kernel_roe_eigensystem(
            queue, prg, params, states, metrics_frozen)

    check_roe_identity(states, R, Rinv)

    # The expected fluxes arrive as text, like the states, and must be parsed
    # before use; without this assignment `fluxes` is an undefined name.
    fluxes = array_from_string(fluxes_str)
    check_roe_property(states, fluxes, R, Rinv, lam)
# NOTE(review): these statements read like the body of a separate test for the
# "mult_mat_vec" kernel whose `def` line is not visible here -- confirm against
# the full file.
# NOTE(review): `c` is passed to the kernel and read back below, but it is
# never assigned in the visible lines; presumably an output array is allocated
# before the kernel call -- verify.
a = random_array_on_device(queue, 10, 10)
b = random_array_on_device(queue, 10)
prg = with_root_kernel(prg, "mult_mat_vec")
prg(queue, alpha=1.0, a=a, b=b, c=c)
# Compare the host-side matrix-vector product with what the kernel wrote to c.
compare_arrays(a.get()@b.get(), c.get())
Kaushik Kulkarni
committed
def test_compute_flux_derivatives(ctx_factory):
    """Run loopy's automatic test-vs-reference check on the WENO program.

    The program is retargeted to the device of the queue obtained from
    *ctx_factory* and then handed to ``lp.auto_test_vs_ref`` with
    ``ndim=3, nvars=5`` and a 16 x 16 x 16 grid.

    :arg ctx_factory: pytest fixture; passed to ``get_queue`` and also called
        directly to supply the second argument of ``lp.auto_test_vs_ref``.
    """
    # `prg` must be bound before `prg.copy(...)` below; otherwise the read of
    # the local name raises UnboundLocalError.
    prg = get_weno_program()

    queue = get_queue(ctx_factory)
    prg = prg.copy(target=lp.PyOpenCLTarget(queue.device))

    lp.auto_test_vs_ref(prg, ctx_factory(), warmup_rounds=1,
            parameters=dict(ndim=3, nvars=5, nx=16, ny=16, nz=16))
Kaushik Kulkarni
committed
def test_compute_flux_derivatives_gpu(ctx_factory, write_code=False):
    """Run loopy's test-vs-reference check on the GPU-transformed program.

    The WENO program is passed through ``transform_weno_for_gpu``, retargeted
    to the device of the queue obtained from *ctx_factory*, and has the
    ``no_numpy`` option set before being handed to ``lp.auto_test_vs_ref``
    with ``ndim=3, nvars=5`` and a 16 x 16 x 16 grid.

    :arg ctx_factory: pytest fixture; passed to ``get_queue`` and also called
        directly to supply the second argument of ``lp.auto_test_vs_ref``.
    :arg write_code: if true, the transformed program is passed to
        ``write_to_cl`` before the check runs.
    """
    # `prg` must be bound before it is handed to the GPU transformation;
    # otherwise the read of the local name raises UnboundLocalError.
    prg = get_weno_program()
    prg = transform_weno_for_gpu(prg)

    queue = get_queue(ctx_factory)
    prg = prg.copy(target=lp.PyOpenCLTarget(queue.device))
    prg = lp.set_options(prg, no_numpy=True)

    if write_code:
        write_to_cl(prg)

    lp.auto_test_vs_ref(prg, ctx_factory(), warmup_rounds=1,
            parameters=dict(ndim=3, nvars=5, nx=16, ny=16, nz=16))
# This lets you run 'python test.py test_case(cl._csc)' without pytest.
if __name__ == "__main__":
if len(sys.argv) > 1:
Timothy A. Smith
committed
logging.basicConfig(level="INFO")
exec(sys.argv[1])
else: