Skip to content
test.py 3.44 KiB
Newer Older
import numpy as np
import numpy.linalg as la
import pyopencl as cl
import pyopencl.array  # noqa
import pyopencl.clrandom  # noqa
import loopy as lp
import sys

from pyopencl.tools import (  # noqa
        pytest_generate_tests_for_pyopencl
        as pytest_generate_tests)
Timothy A. Smith's avatar
Timothy A. Smith committed
from pytest import approx
class LoopyFixture:
    _WENO_PRG = []
    _QUEUE = []
    def __init__(self):
        self.prg = self.get_weno_program()

    def get_weno_program(self):
        if self._WENO_PRG:
            return self._WENO_PRG[0]
        fn = "WENO.F90"
        with open(fn, "r") as infile:
            infile_content = infile.read()
        prg = lp.parse_transformed_fortran(infile_content, filename=fn)
        self._WENO_PRG.append(prg)
        return prg
    def with_root_kernel(self, prg, root_name):
        # FIXME This is a little less beautiful than it could be
        new_prg = prg.copy(name=root_name)
        for name in prg:
            clbl = new_prg[name]
            if isinstance(clbl, lp.LoopKernel) and clbl.is_called_from_host:
                new_prg = new_prg.with_kernel(clbl.copy(is_called_from_host=False))
        new_prg = new_prg.with_kernel(prg[root_name].copy(is_called_from_host=True))
        return new_prg
    def get_queue(self, ctx_factory):
        if not self._QUEUE:
            ctx = ctx_factory()
            self._QUEUE.append(cl.CommandQueue(ctx))
        return self._QUEUE[0]
    def random_array(self, *args):
        return np.random.random_sample(args).astype(np.float32).copy(order="F")

    def mult_mat_vec(self, ctx_factory, alpha, a, b):
        queue = self.get_queue(ctx_factory)
        c_dev = cl.array.empty(queue, 10, dtype=np.float32)
        prg = self.with_root_kernel(self.prg, "mult_mat_vec")
        prg(queue, a=a, b=b, c=c_dev, alpha=alpha)
        return c_dev.get()

    def compute_flux_derivatives(self, ctx_factory,
            nvars, ndim, nx, ny, nz,
            states, fluxes, metrics, metric_jacobians):

        queue = self.get_queue(ctx_factory)

        prg = lp.fix_parameters(self.prg, nx=nx, ny=ny, nz=nz, _remove=False)

        flux_derivatives_dev = cl.array.empty(queue, (nvars, ndim, nx+6, ny+6,
            nz+6), dtype=np.float32, order="F")

        prg(queue, nvars=nvars, ndim=ndim, nx=nx, ny=ny, nz=nz,
                states=states, fluxes=fluxes, metrics=metrics,
                metric_jacobians=metric_jacobians,
                flux_derivatives=flux_derivatives_dev)
        return flux_derivatives_dev.get()
f = LoopyFixture()

def test_matvec(ctx_factory):
    a = f.random_array(10, 10)
    b = f.random_array(10)
    c = f.mult_mat_vec(ctx_factory, a=a, b=b, alpha=1.0)

    assert la.norm(a@b - c, 2)/la.norm(c) < 1e-5
def test_compute_flux_derivatives(ctx_factory):
    states = f.random_array(nvars, nx+6, ny+6, nz+6)
    fluxes = f.random_array(nvars, ndim, nx+6, ny+6, nz+6)
    metrics = f.random_array(ndim, ndim, nx+6, ny+6, nz+6)
    metric_jacobians = f.random_array(nx+6, ny+6, nz+6)
    flux_derivatives = f.compute_flux_derivatives(ctx_factory,
            nvars=nvars, ndim=ndim, nx=nx, ny=ny, nz=nz,
            states=states, fluxes=fluxes, metrics=metrics,
            metric_jacobians=metric_jacobians)
# This lets you run 'python test.py test_case(cl._csc)' without pytest.

if __name__ == "__main__":
    if len(sys.argv) > 1:
        exec(sys.argv[1])
    else:
        from pytest import main
        main([__file__])