import numpy as np
import numpy.linalg as la
import pyopencl as cl
import pyopencl.array  # noqa
import pyopencl.tools  # noqa
import pyopencl.clrandom  # noqa
import loopy as lp  # noqa

import sys

import logging

import pytest
from pytest import approx
from pyopencl.tools import (  # noqa
        pytest_generate_tests_for_pyopencl
        as pytest_generate_tests)

# NOTE(review): the helpers used below (get_queue, get_weno_program,
# with_root_kernel, array_from_string, compare_arrays, ...) are not defined in
# this file; presumably they are provided by this star import -- confirm.
from utilities import *


@pytest.mark.xfail
@pytest.mark.parametrize("states_str,fluxes_str,direction", [
    ("2 1,4 1,4 1,4 1,20 5.5",
        "4 1,11.2 2.6,8 1,8 1,46.4 7.1", "x"),
    ("2 1,4 1,4 1,4 1,20 5.5",
        "4 1,8 1,11.2 2.6,8 1,46.4 7.1", "y"),
    ("2 1,4 1,4 1,4 1,20 5.5",
        "4 1,8 1,8 1,11.2 2.6,46.4 7.1", "z"),
    ("1 2,-1 -4,-1 -4,-1 -4,5.5 20",
        "-1 -4,2.6 11.2,1 8,1 8,-7.1 -46.4", "x"),
    ("1 2,-1 -4,-1 -4,-1 -4,5.5 20",
        "-1 -4,1 8,2.6 11.2,1 8,-7.1 -46.4", "y"),
    ("1 2,-1 -4,-1 -4,-1 -4,5.5 20",
        "-1 -4,1 8,1 8,2.6 11.2,-7.1 -46.4", "z"),
    ("2 1,4 1,8 2,12 3,64 11",
        "4 1,11.2 2.6,16 2,24 3,134.4 12.6", "x"),
    ("2 1,4 1,8 2,12 3,64 11",
        "8 2,16 2,35.2 5.6,48 6,268.8 25.2", "y"),
    ("2 1,4 1,8 2,12 3,64 11",
        "12 3,24 3,48 6,75.2 10.6,403.2 37.8", "z")
    ])
def test_roe_uniform_grid(ctx_factory, states_str, fluxes_str, direction):
    """Check the ``roe_eigensystem`` kernel on hand-written state/flux pairs.

    The kernel is run with ``nvars=5``, ``ndim=3`` and an identity
    ``metrics_frozen`` matrix. Its outputs ``R``, ``R_inv`` and ``lambda_roe``
    are then checked against two relations, where ``dState`` and ``dFlux`` are
    the differences between column 1 and column 0 of the parsed arrays:

    * ``R @ (R_inv @ dState)`` matches ``dState``;
    * ``R @ (lambda_roe * (R_inv @ dState))`` matches ``dFlux``.

    The test is currently marked ``xfail``.

    :arg ctx_factory: pyopencl context factory fixture.
    :arg states_str: textual states, parsed by ``array_from_string``
        (presumably comma-separated rows of space-separated values -- confirm
        against ``utilities``).
    :arg fluxes_str: textual fluxes, parsed the same way as *states_str*.
    :arg direction: one of ``"x"``, ``"y"``, ``"z"``.
    """

    class RoeParams:
        """Sizes and direction index handed to the ``roe_eigensystem`` kernel."""

        def __init__(self, nvars, ndim, d):
            self.nvars = nvars
            self.ndim = ndim
            self.d = d

        def mat_bounds(self):
            """Return the ``(nvars, nvars)`` shape used for ``R``/``R_inv``."""
            return self.nvars, self.nvars

        def vec_bound(self):
            """Return the length ``nvars`` used for ``lambda_roe``."""
            return self.nvars

    def setup_roe_params(nvars, ndim, direction):
        """Build a ``RoeParams``, mapping the direction letter to 1, 2 or 3."""
        dirs = {"x": 1, "y": 2, "z": 3}
        return RoeParams(nvars, ndim, dirs[direction])

    def identity_matrix(n):
        """Return an ``n x n`` float32 identity matrix in Fortran order."""
        return np.identity(n).astype(np.float32).copy(order="F")

    def kernel_roe_eigensystem(queue, prg, params, states, metrics_frozen):
        """Run ``roe_eigensystem`` and return ``(R, R_inv, lambda_roe)``.

        Output buffers are allocated on the device, filled by the kernel and
        fetched back with ``.get()``.
        """
        R_dev = empty_array_on_device(queue, *params.mat_bounds())
        Rinv_dev = empty_array_on_device(queue, *params.mat_bounds())
        lam_dev = empty_array_on_device(queue, params.vec_bound())

        prg = with_root_kernel(prg, "roe_eigensystem")
        prg(queue, nvars=params.nvars, ndim=params.ndim, d=params.d,
                states=states, metrics_frozen=metrics_frozen,
                R=R_dev, R_inv=Rinv_dev, lambda_roe=lam_dev)

        return R_dev.get(), Rinv_dev.get(), lam_dev.get()

    def check_roe_identity(states, R, Rinv):
        """Check that ``R @ Rinv`` leaves the state difference unchanged."""
        dState = states[:, 1] - states[:, 0]
        compare_arrays(R@(Rinv@dState), dState)

    def check_roe_property(states, fluxes, R, Rinv, lam):
        """Check that ``R @ (lam * (Rinv @ dState))`` matches ``dFlux``."""
        dState = states[:, 1] - states[:, 0]
        dFlux = fluxes[:, 1] - fluxes[:, 0]

        temp = Rinv@dState
        # Element-wise scaling by lam.
        temp = np.multiply(lam, temp)
        compare_arrays(R@temp, dFlux)

    queue = get_queue(ctx_factory)
    prg = get_weno_program()

    params = setup_roe_params(nvars=5, ndim=3, direction=direction)
    states = array_from_string(states_str)
    metrics_frozen = identity_matrix(params.ndim)
    R, Rinv, lam = kernel_roe_eigensystem(
            queue, prg, params, states, metrics_frozen)

    check_roe_identity(states, R, Rinv)

    fluxes = array_from_string(fluxes_str)
    check_roe_property(states, fluxes, R, Rinv, lam)


def test_matvec(ctx_factory):
    """Check the ``mult_mat_vec`` kernel against NumPy's ``a @ b``.

    Uses a random 10x10 matrix and a random length-10 vector on the device,
    with ``alpha=1.0``.
    """
    prg = get_weno_program()
    queue = get_queue(ctx_factory)

    a = random_array_on_device(queue, 10, 10)
    b = random_array_on_device(queue, 10)

    c = empty_array_on_device(queue, 10)

    prg = with_root_kernel(prg, "mult_mat_vec")
    prg(queue, alpha=1.0, a=a, b=b, c=c)

    compare_arrays(a.get()@b.get(), c.get())


@pytest.mark.slow
def test_compute_flux_derivatives(ctx_factory):
    """Run ``lp.auto_test_vs_ref`` on the WENO program.

    The program is retargeted to the queue's device and exercised with
    ``ndim=3, nvars=5`` on a 16x16x16 grid.
    """
    prg = get_weno_program()

    queue = get_queue(ctx_factory)
    prg = prg.copy(target=lp.PyOpenCLTarget(queue.device))

    lp.auto_test_vs_ref(prg, ctx_factory(), warmup_rounds=1,
            parameters=dict(ndim=3, nvars=5, nx=16, ny=16, nz=16))


@pytest.mark.slow
def test_compute_flux_derivatives_gpu(ctx_factory, write_code=False):
    """Run ``lp.auto_test_vs_ref`` on the GPU-transformed WENO program.

    The program is passed through ``transform_weno_for_gpu``, retargeted to
    the queue's device and given the ``no_numpy`` option before testing.

    :arg ctx_factory: pyopencl context factory fixture.
    :arg write_code: if true, also call ``write_target_device_code`` on the
        transformed program before testing.
    """
    prg = get_weno_program()
    prg = transform_weno_for_gpu(prg)

    queue = get_queue(ctx_factory)
    prg = prg.copy(target=lp.PyOpenCLTarget(queue.device))
    prg = lp.set_options(prg, no_numpy=True)

    if write_code:
        write_target_device_code(prg)

    lp.auto_test_vs_ref(prg, ctx_factory(), warmup_rounds=1,
            parameters=dict(ndim=3, nvars=5, nx=16, ny=16, nz=16))


# This lets you run 'python test.py test_case(cl._csc)' without pytest.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        # A single test can be run by passing a Python expression as the first
        # command-line argument.
        logging.basicConfig(level="INFO")
        # NOTE: exec() runs the argument as arbitrary Python code; this is a
        # developer convenience and must not be fed untrusted input.
        exec(sys.argv[1])
    else:
        # No argument given: hand this file to pytest.
        pytest.main([__file__])