import numpy as np import numpy.linalg as la import pyopencl as cl import pyopencl.array # noqa import pyopencl.tools # noqa import pyopencl.clrandom # noqa import loopy as lp # noqa import sys import logging import pytest from pytest import approx from pyopencl.tools import ( # noqa pytest_generate_tests_for_pyopencl as pytest_generate_tests) from utilities import * class RoeParams: def __init__(self, nvars, ndim, d): self.nvars = nvars self.ndim = ndim self.d = d def mat_bounds(self): return self.nvars, self.nvars def vec_bounds(self): return self.nvars class FluxDerivativeParams: def __init__(self, nvars, ndim, nx, ny, nz): self.nvars = nvars self.ndim = ndim self.nx = nx self.ny = ny self.nz = nz self.nhalo = 3 self.nx_halo = self.nx + 2*self.nhalo self.ny_halo = self.ny + 2*self.nhalo self.nz_halo = self.nz + 2*self.nhalo def state_bounds(self): return self.nvars, self.nx_halo, self.ny_halo, self.nz_halo def flux_bounds(self): return self.nvars, self.ndim, self.nx_halo, self.ny_halo, self.nz_halo def metric_bounds(self): return self.ndim, self.ndim, self.nx_halo, self.ny_halo, self.nz_halo def jacobian_bounds(self): return self.nx_halo, self.ny_halo, self.nz_halo class FluxDerivativeArrays: def __init__(self, states, fluxes, metrics, metric_jacobians): self.states = states self.fluxes = fluxes self.metrics = metrics self.metric_jacobians = metric_jacobians def setup_roe_params(nvars, ndim, direction): dirs = {"x" : 1, "y" : 2, "z" : 3} return RoeParams(nvars, ndim, dirs[direction]) def setup_flux_derivative_params(nvars, ndim, n): return FluxDerivativeParams(nvars, ndim, n, n, n) def setup_empty_array_on_device(queue, shape): return cl.array.empty(queue, shape, dtype=np.float32, order="F") def setup_identity(n): return np.identity(n).astype(np.float32).copy(order="F") def setup_random_array(*shape): return np.random.random_sample(shape).astype(np.float32).copy(order="F") def setup_random_array_on_device(queue, *shape): return cl.array.to_device(queue, setup_random_array(*shape)) def setup_random_flux_derivative_arrays(params): states = setup_random_array(*params.state_bounds()) fluxes = setup_random_array(*params.flux_bounds()) metrics = setup_random_array(*params.metric_bounds()) metric_jacobians = setup_random_array(*params.jacobian_bounds()) return FluxDerivativeArrays(states, fluxes, metrics, metric_jacobians) def setup_random_flux_derivative_arrays_on_device(ctx_factory, params): queue = get_queue(ctx_factory) states = setup_random_array_on_device(queue, *params.state_bounds()) fluxes = setup_random_array_on_device(queue, *params.flux_bounds()) metrics = setup_random_array_on_device(queue, *params.metric_bounds()) metric_jacobians = setup_random_array_on_device(queue, *params.jacobian_bounds()) return FluxDerivativeArrays(states, fluxes, metrics, metric_jacobians) def arrays_from_string(string_arrays): return split_map_to_list(string_arrays, array_from_string, ":") def array_from_string(string_array): if ";" not in string_array: if "," not in string_array: array = array_from_string_1d(string_array) else: array = array_from_string_2d(string_array) else: array = array_from_string_3d(string_array) return array.copy(order="F") def array_from_string_3d(string_array): if string_array[0] == ";": return array_from_string_1d(string_array[1:]).reshape((-1, 1, 1)) else: return np.array(split_map_to_list(string_array, array_from_string_2d, ";")) def array_from_string_2d(string_array): if string_array[0] == ",": return array_from_string_1d(string_array[1:]).reshape((-1, 1)) else: return np.array(split_map_to_list(string_array, array_from_string_1d, ",")) def array_from_string_1d(string_array): if string_array[0] == "i": return np.array(split_map_to_list(string_array[1:], int, " ")) else: return np.array(split_map_to_list(string_array, float, " "), dtype=np.float32) def split_map_to_list(string, map_func, splitter): return list(map(map_func, string.split(splitter))) def with_root_kernel(prg, root_name): # FIXME This is a little less beautiful than it could be new_prg = prg.copy(name=root_name) for name in prg: clbl = new_prg[name] if isinstance(clbl, lp.LoopKernel) and clbl.is_called_from_host: new_prg = new_prg.with_kernel(clbl.copy(is_called_from_host=False)) new_prg = new_prg.with_kernel(prg[root_name].copy(is_called_from_host=True)) return new_prg def kernel_roe_eigensystem(queue, prg, params, states, metrics_frozen): R_dev = setup_empty_array_on_device(queue, params.mat_bounds()) Rinv_dev = setup_empty_array_on_device(queue, params.mat_bounds()) lam_dev = setup_empty_array_on_device(queue, params.vec_bounds()) prg = with_root_kernel(prg, "roe_eigensystem") prg(queue, nvars=params.nvars, ndim=params.ndim, d=params.d, states=states, metrics_frozen=metrics_frozen, R=R_dev, R_inv=Rinv_dev, lambda_roe=lam_dev) return R_dev.get(), Rinv_dev.get(), lam_dev.get() def kernel_mult_mat_vec(queue, prg, alpha, a, b): c_dev = setup_empty_array_on_device(queue, b.shape) prg = with_root_kernel(prg, "mult_mat_vec") prg(queue, a=a, b=b, c=c_dev, alpha=alpha) return c_dev.get() def compare_arrays(a, b): assert a == approx(b) def compare_roe_identity(states, R, Rinv): dState = states[:,1] - states[:,0] compare_arrays(R@(Rinv@dState), dState) def compare_roe_property(states, fluxes, R, Rinv, lam): dState = states[:,1] - states[:,0] dFlux = fluxes[:,1] - fluxes[:,0] temp = Rinv@dState temp = np.multiply(lam, temp) compare_arrays(R@temp, dFlux) @pytest.mark.xfail @pytest.mark.parametrize("states_str,fluxes_str,direction", [ ("2 1,4 1,4 1,4 1,20 5.5", "4 1,11.2 2.6,8 1,8 1,46.4 7.1", "x"), ("2 1,4 1,4 1,4 1,20 5.5", "4 1,8 1,11.2 2.6,8 1,46.4 7.1", "y"), ("2 1,4 1,4 1,4 1,20 5.5", "4 1,8 1,8 1,11.2 2.6,46.4 7.1", "z"), ("1 2,-1 -4,-1 -4,-1 -4,5.5 20", "-1 -4,2.6 11.2,1 8,1 8,-7.1 -46.4", "x"), ("1 2,-1 -4,-1 -4,-1 -4,5.5 20", "-1 -4,1 8,2.6 11.2,1 8,-7.1 -46.4", "y"), ("1 2,-1 -4,-1 -4,-1 -4,5.5 20", "-1 -4,1 8,1 8,2.6 11.2,-7.1 -46.4", "z"), ("2 1,4 1,8 2,12 3,64 11", "4 1,11.2 2.6,16 2,24 3,134.4 12.6", "x"), ("2 1,4 1,8 2,12 3,64 11", "8 2,16 2,35.2 5.6,48 6,268.8 25.2", "y"), ("2 1,4 1,8 2,12 3,64 11", "12 3,24 3,48 6,75.2 10.6,403.2 37.8", "z") ]) def test_roe_uniform_grid(ctx_factory, states_str, fluxes_str, direction): queue = get_queue(ctx_factory) prg = get_weno_program() params = setup_roe_params(nvars=5, ndim=3, direction=direction) states = array_from_string(states_str) metrics_frozen = setup_identity(params.ndim) R, Rinv, lam = kernel_roe_eigensystem(queue, prg, params, states, metrics_frozen) compare_roe_identity(states, R, Rinv) fluxes = array_from_string(fluxes_str) compare_roe_property(states, fluxes, R, Rinv, lam) def test_matvec(ctx_factory): queue = get_queue(ctx_factory) prg = get_weno_program() a = setup_random_array(10, 10) b = setup_random_array(10) c = kernel_mult_mat_vec(queue, prg, alpha=1.0, a=a, b=b) compare_arrays(a@b, c) #@pytest.mark.slow def test_compute_flux_derivatives(ctx_factory): prg = get_weno_program() prg = transform_compute_flux_derivative_basic(prg) lp.auto_test_vs_ref(prg, ctx_factory(), parameters=dict(ndim=3, nvars=5, nx=16, ny=16, nz=16)) #@pytest.mark.slow def test_compute_flux_derivatives_gpu(ctx_factory): prg = get_weno_program() prg = transform_compute_flux_derivative_gpu(get_queue(ctx_factory), prg) lp.auto_test_vs_ref(prg, ctx_factory(), parameters=dict(ndim=3, nvars=5, nx=16, ny=16, nz=16)) # This lets you run 'python test.py test_case(cl._csc)' without pytest. if __name__ == "__main__": if len(sys.argv) > 1: logging.basicConfig(level="INFO") exec(sys.argv[1]) else: pytest.main([__file__])