Skip to content
test.py 9.07 KiB
Newer Older
import numpy as np
import numpy.linalg as la
import pyopencl as cl
import pyopencl.array  # noqa
import pyopencl.tools  # noqa
import pyopencl.clrandom  # noqa
import loopy as lp  # noqa

from pytest import approx
from pyopencl.tools import (  # noqa
        pytest_generate_tests_for_pyopencl
        as pytest_generate_tests)

class RoeParams:
    def __init__(self, nvars, ndim, d):
        self.nvars = nvars
        self.ndim = ndim
        self.d = d

    def mat_bounds(self):
        return self.nvars, self.nvars

    def vec_bounds(self):
        return self.nvars


class FluxDerivativeParams:
    def __init__(self, nvars, ndim, nx, ny, nz):
        self.nvars = nvars
        self.ndim = ndim

        self.nx = nx
        self.ny = ny
        self.nz = nz

        self.nhalo = 3
        self.nx_halo = self.nx + 2*self.nhalo
        self.ny_halo = self.ny + 2*self.nhalo
        self.nz_halo = self.nz + 2*self.nhalo

    def state_bounds(self):
        return self.nvars, self.nx_halo, self.ny_halo, self.nz_halo

    def flux_bounds(self):
        return self.nvars, self.ndim, self.nx_halo, self.ny_halo, self.nz_halo

    def metric_bounds(self):
        return self.ndim, self.ndim, self.nx_halo, self.ny_halo, self.nz_halo

    def jacobian_bounds(self):
        return self.nx_halo, self.ny_halo, self.nz_halo


class FluxDerivativeArrays:
    def __init__(self, states, fluxes, metrics, metric_jacobians):
        self.states = states
        self.fluxes = fluxes
        self.metrics = metrics
        self.metric_jacobians = metric_jacobians


def setup_roe_params(nvars, ndim, direction):
    dirs = {"x" : 1, "y" : 2, "z" : 3}
    return RoeParams(nvars, ndim, dirs[direction])


def setup_flux_derivative_params(nvars, ndim, n):
    return FluxDerivativeParams(nvars, ndim, n, n, n)


def setup_empty_array_on_device(queue, shape):
    return cl.array.empty(queue, shape, dtype=np.float32, order="F")


def setup_identity(n):
    return np.identity(n).astype(np.float32).copy(order="F")


def setup_random_array(*shape):
    return np.random.random_sample(shape).astype(np.float32).copy(order="F")


def setup_random_array_on_device(queue, *shape):
    return cl.array.to_device(queue, setup_random_array(*shape))


def setup_random_flux_derivative_arrays(params):
    states = setup_random_array(*params.state_bounds())
    fluxes = setup_random_array(*params.flux_bounds())
    metrics = setup_random_array(*params.metric_bounds())
    metric_jacobians = setup_random_array(*params.jacobian_bounds())

    return FluxDerivativeArrays(states, fluxes, metrics, metric_jacobians)


def setup_random_flux_derivative_arrays_on_device(ctx_factory, params):
    queue = get_queue(ctx_factory)

    states = setup_random_array_on_device(queue, *params.state_bounds())
    fluxes = setup_random_array_on_device(queue, *params.flux_bounds())
    metrics = setup_random_array_on_device(queue, *params.metric_bounds())
    metric_jacobians = setup_random_array_on_device(queue, *params.jacobian_bounds())

    return FluxDerivativeArrays(states, fluxes, metrics, metric_jacobians)


def arrays_from_string(string_arrays):
    return split_map_to_list(string_arrays, array_from_string, ":")


def array_from_string(string_array):
    if ";" not in string_array:
        if "," not in string_array:
            array = array_from_string_1d(string_array)
        else:
            array = array_from_string_2d(string_array)
    else:
        array = array_from_string_3d(string_array)
    return array.copy(order="F")


def array_from_string_3d(string_array):
    if string_array[0] == ";":
        return array_from_string_1d(string_array[1:]).reshape((-1, 1, 1))
    else:
        return np.array(split_map_to_list(string_array, array_from_string_2d, ";"))


def array_from_string_2d(string_array):
    if string_array[0] == ",":
        return array_from_string_1d(string_array[1:]).reshape((-1, 1))
    else:
        return np.array(split_map_to_list(string_array, array_from_string_1d, ","))


def array_from_string_1d(string_array):
    if string_array[0] == "i":
        return np.array(split_map_to_list(string_array[1:], int, " "))
    else:
        return np.array(split_map_to_list(string_array, float, " "), dtype=np.float32)


def split_map_to_list(string, map_func, splitter):
    return list(map(map_func, string.split(splitter)))


def with_root_kernel(prg, root_name):
    # FIXME This is a little less beautiful than it could be
    new_prg = prg.copy(name=root_name)
    for name in prg:
        clbl = new_prg[name]
        if isinstance(clbl, lp.LoopKernel) and clbl.is_called_from_host:
            new_prg = new_prg.with_kernel(clbl.copy(is_called_from_host=False))

    new_prg = new_prg.with_kernel(prg[root_name].copy(is_called_from_host=True))
    return new_prg


def kernel_roe_eigensystem(queue, prg, params, states, metrics_frozen):
    R_dev = setup_empty_array_on_device(queue, params.mat_bounds())
    Rinv_dev = setup_empty_array_on_device(queue, params.mat_bounds())
    lam_dev = setup_empty_array_on_device(queue, params.vec_bounds())

    prg = with_root_kernel(prg, "roe_eigensystem")
    prg(queue, nvars=params.nvars, ndim=params.ndim, d=params.d,
            states=states, metrics_frozen=metrics_frozen,
            R=R_dev, R_inv=Rinv_dev, lambda_roe=lam_dev)

    return R_dev.get(), Rinv_dev.get(), lam_dev.get()


def kernel_mult_mat_vec(queue, prg, alpha, a, b):
    c_dev = setup_empty_array_on_device(queue, b.shape)

    prg = with_root_kernel(prg, "mult_mat_vec")
    prg(queue, a=a, b=b, c=c_dev, alpha=alpha)

    return c_dev.get()


def compare_arrays(a, b):
    assert a == approx(b)


def compare_roe_identity(states, R, Rinv):
    dState = states[:,1] - states[:,0]
    compare_arrays(R@(Rinv@dState), dState)


def compare_roe_property(states, fluxes, R, Rinv, lam):
    dState = states[:,1] - states[:,0]
    dFlux = fluxes[:,1] - fluxes[:,0]

    temp = Rinv@dState
    temp = np.multiply(lam, temp)
    compare_arrays(R@temp, dFlux)

@pytest.mark.xfail
@pytest.mark.parametrize("states_str,fluxes_str,direction", [
    ("2 1,4 1,4 1,4 1,20 5.5", "4 1,11.2 2.6,8 1,8 1,46.4 7.1", "x"),
    ("2 1,4 1,4 1,4 1,20 5.5", "4 1,8 1,11.2 2.6,8 1,46.4 7.1", "y"),
    ("2 1,4 1,4 1,4 1,20 5.5", "4 1,8 1,8 1,11.2 2.6,46.4 7.1", "z"),
    ("1 2,-1 -4,-1 -4,-1 -4,5.5 20", "-1 -4,2.6 11.2,1 8,1 8,-7.1 -46.4", "x"),
    ("1 2,-1 -4,-1 -4,-1 -4,5.5 20", "-1 -4,1 8,2.6 11.2,1 8,-7.1 -46.4", "y"),
    ("1 2,-1 -4,-1 -4,-1 -4,5.5 20", "-1 -4,1 8,1 8,2.6 11.2,-7.1 -46.4", "z"),
    ("2 1,4 1,8 2,12 3,64 11", "4 1,11.2 2.6,16 2,24 3,134.4 12.6", "x"),
    ("2 1,4 1,8 2,12 3,64 11", "8 2,16 2,35.2 5.6,48 6,268.8 25.2", "y"),
    ("2 1,4 1,8 2,12 3,64 11", "12 3,24 3,48 6,75.2 10.6,403.2 37.8", "z")
    ])
def test_roe_uniform_grid(ctx_factory, states_str, fluxes_str, direction):
    queue = get_queue(ctx_factory)
    prg = get_weno_program()
    params = setup_roe_params(nvars=5, ndim=3, direction=direction)
    states = array_from_string(states_str)
    metrics_frozen = setup_identity(params.ndim)
    R, Rinv, lam = kernel_roe_eigensystem(queue, prg, params, states, metrics_frozen)
    compare_roe_identity(states, R, Rinv)
    fluxes = array_from_string(fluxes_str)
    compare_roe_property(states, fluxes, R, Rinv, lam)
def test_matvec(ctx_factory):
    queue = get_queue(ctx_factory)
    prg = get_weno_program()
    a = setup_random_array(10, 10)
    b = setup_random_array(10)
    c = kernel_mult_mat_vec(queue, prg, alpha=1.0, a=a, b=b)
    compare_arrays(a@b, c)
#@pytest.mark.slow
def test_compute_flux_derivatives(ctx_factory):
    queue = get_queue(ctx_factory)
    prg = get_weno_program()
    prg = transform_compute_flux_derivative_basic(prg)
    params = setup_flux_derivative_params(ndim=3, nvars=5, n=10)
    arrays = setup_random_flux_derivative_arrays(params)
    flux_derivatives_dev = setup_empty_array_on_device(queue, (params.nvars, params.ndim,
        params.nx_halo, params.ny_halo, params.nz_halo))

    prg(queue, nvars=params.nvars, ndim=params.ndim,
            states=arrays.states, fluxes=arrays.fluxes, metrics=arrays.metrics,
            metric_jacobians=arrays.metric_jacobians,
            flux_derivatives=flux_derivatives_dev)
#@pytest.mark.slow
Andreas Klöckner's avatar
Andreas Klöckner committed
def test_compute_flux_derivatives_gpu(ctx_factory):
    queue = get_queue(ctx_factory)
    prg = get_weno_program()
    prg = transform_compute_flux_derivative_gpu(queue, prg)
    params = setup_flux_derivative_params(ndim=3, nvars=5, n=10)
    arrays = setup_random_flux_derivative_arrays_on_device(ctx_factory, params)
    flux_derivatives_dev = setup_empty_array_on_device(queue, (params.nvars, params.ndim,
        params.nx_halo, params.ny_halo, params.nz_halo))

    prg(queue, nvars=params.nvars, ndim=params.ndim,
            states=arrays.states, fluxes=arrays.fluxes, metrics=arrays.metrics,
            metric_jacobians=arrays.metric_jacobians,
            flux_derivatives=flux_derivatives_dev)
Andreas Klöckner's avatar
Andreas Klöckner committed
# This lets you run 'python test.py test_case(cl._csc)' without pytest.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        exec(sys.argv[1])
    else:
        pytest.main([__file__])