diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a6790524ec737d52f3507bb74f84a13aad07f548..4c1545e1b1011e0f35b3aa2316a9cdde464a4304 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -137,6 +137,7 @@ jobs:
                 cd "$DOWNSTREAM_PROJECT"
                 echo "*** $DOWNSTREAM_PROJECT version: $(git rev-parse --short HEAD)"
 
+                # Use this version of arraycontext instead of what downstream would install
                 sed -i "/egg=arraycontext/ c git+file://$(readlink -f ..)#egg=arraycontext" requirements.txt
 
                 # Avoid slow or complicated tests in downstream projects
diff --git a/arraycontext/__init__.py b/arraycontext/__init__.py
index e5ae4a01d680396a0f25abc8329d247c039a3898..a338059d1caa3e27d680321b5e4c58da51308aa3 100644
--- a/arraycontext/__init__.py
+++ b/arraycontext/__init__.py
@@ -56,6 +56,7 @@ from .container.traversal import (
         from_numpy, to_numpy)
 
 from .impl.pyopencl import PyOpenCLArrayContext
+from .impl.pytato import PytatoPyOpenCLArrayContext
 
 from .pytest import (
         PytestPyOpenCLArrayContextFactory,
@@ -85,7 +86,7 @@ __all__ = (
         "thaw", "freeze",
         "from_numpy", "to_numpy",
 
-        "PyOpenCLArrayContext",
+        "PyOpenCLArrayContext", "PytatoPyOpenCLArrayContext",
 
         "make_loopy_program",
 
diff --git a/arraycontext/container/traversal.py b/arraycontext/container/traversal.py
index 5dd68afaeb13b778bad7a3b829c3ee1d0b0166e8..70babe6ad0eb31cfcd577c94c211d1a81719fdd3 100644
--- a/arraycontext/container/traversal.py
+++ b/arraycontext/container/traversal.py
@@ -48,7 +48,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
 """
 
-from typing import Any, Callable, List, Optional
+from typing import Any, Callable, List, Optional, Union, Tuple
 from functools import update_wrapper, partial, singledispatch
 
 import numpy as np
@@ -101,14 +101,14 @@ def _multimap_array_container_impl(
     """
     def rec(*_args: Any) -> Any:
         template_ary = _args[container_indices[0]]
-        assert all(
-                type(_args[i]) is type(template_ary) for i in container_indices[1:]
-                ), f"expected type '{type(template_ary).__name__}'"
-
         if (type(template_ary) is leaf_cls
                 or not is_array_container(template_ary)):
             return f(*_args)
 
+        assert all(
+                type(_args[i]) is type(template_ary) for i in container_indices[1:]
+                ), f"expected type '{type(template_ary).__name__}'"
+
         result = []
         new_args = list(_args)
 
@@ -233,6 +233,49 @@ def multimapped_over_array_containers(
     update_wrapper(wrapper, f)
     return wrapper
 
+
+def keyed_map_array_container(f: Callable[[Any, Any], Any],
+                              ary: ArrayContainerT) -> ArrayContainerT:
+    r"""Applies *f* to all components of an :class:`ArrayContainer`.
+
+    Works similarly to :func:`map_array_container`, but *f* also takes an
+    identifier of the array in the container *ary*.
+
+    For a recursive version, see :func:`rec_keyed_map_array_container`.
+
+    :param ary: a (potentially nested) structure of :class:`ArrayContainer`\ s,
+        or an instance of a base array type.
+    """
+    if is_array_container(ary):
+        return deserialize_container(ary, [
+                (key, f(key, subary)) for key, subary in serialize_container(ary)
+                ])
+    else:
+        raise ValueError("Not an array-container, i.e. unknown key to pass.")
+
+
+def rec_keyed_map_array_container(f: Callable[[Tuple[Any, ...], Any], Any],
+                                  ary: ArrayContainerT) -> ArrayContainerT:
+    """
+    Works similarly to :func:`rec_map_array_container`, except that *f* also
+    takes in a traversal path to the leaf array. The traversal path argument is
+    passed in as a tuple of identifiers of the arrays traversed before reaching
+    the current array.
+    """
+
+    def rec(keys: Tuple[Union[str, int], ...],
+            _ary: ArrayContainerT) -> ArrayContainerT:
+        if is_array_container(_ary):
+
+            return deserialize_container(_ary, [
+                    (key, rec(keys+(key,), subary))
+                    for key, subary in serialize_container(_ary)
+                    ])
+        else:
+            return f(keys, _ary)
+
+    return rec((), ary)
+
 # }}}
 
 
diff --git a/arraycontext/context.py b/arraycontext/context.py
index 58c4299e9df667bb9c2f710d0108a034ec116592..13ce197ab2f02a62c8fe0a221c618dc3f27555b2 100644
--- a/arraycontext/context.py
+++ b/arraycontext/context.py
@@ -102,7 +102,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
 """
 
-from typing import Sequence, Union
+from typing import Sequence, Union, Callable, Any
 from abc import ABC, abstractmethod, abstractproperty
 
 import numpy as np
@@ -148,6 +148,7 @@ class ArrayContext(ABC):
     .. automethod:: thaw
     .. automethod:: tag
     .. automethod:: tag_axis
+    .. automethod:: compile
     """
 
     def __init__(self):
@@ -193,7 +194,7 @@ class ArrayContext(ABC):
         """Execute the :mod:`loopy` program *program* on the arguments
         *kwargs*.
 
-        *program* is a :class:`loopy.LoopKernel` or :class:`loopy.LoopKernel`.
+        *program* is a :class:`loopy.LoopKernel` or :class:`loopy.TranslationUnit`.
         It is expected to not yet be transformed for execution speed.
         It must have :attr:`loopy.Options.return_dict` set.
 
@@ -322,6 +323,26 @@ class ArrayContext(ABC):
             "setup-only" array context "leaks" into the application.
         """
 
+    def compile(self, f: Callable[..., Any]) -> Callable[..., Any]:
+        """Compiles *f* for repeated use on this array context. *f* is expected
+        to be a `pure function <https://en.wikipedia.org/wiki/Pure_function>`__
+        performing an array computation.
+
+        Control flow statements (``if``, ``while``) that might take different
+        paths depending on the data lead to undefined behavior and are illegal.
+        Any data-dependent control flow must be expressed via array functions,
+        such as ``actx.np.where``.
+
+        *f* may be called on placeholder data, to obtain a representation
+        of the computation performed, or it may be called as part of the actual
+        computation, on actual data. If *f* is called on placeholder data,
+        it may be called only once (or a few times).
+
+        :arg f: the function executing the computation.
+        :return: a function with the same signature as *f*.
+        """
+        return f
+
     # undocumented for now
     @abstractproperty
     def permits_inplace_modification(self):
diff --git a/arraycontext/impl/pytato/__init__.py b/arraycontext/impl/pytato/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..b6f035e4132a79d06276f5c16cc04fdfee04715c
--- /dev/null
+++ b/arraycontext/impl/pytato/__init__.py
@@ -0,0 +1,183 @@
+"""
+.. currentmodule:: arraycontext
+
+A :mod:`pytato`-based array context defers the evaluation of an array until its
+frozen. The execution contexts for the evaluations are specific to an
+:class:`~arraycontext.ArrayContext` type. For ex.
+:class:`~arraycontext.PytatoPyOpenCLArrayContext` uses :mod:`pyopencl` to
+JIT-compile and execute the array expressions.
+
+Following :mod:`pytato`-based array context are provided:
+
+.. autoclass:: PytatoPyOpenCLArrayContext
+
+
+Compiling a python callable
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. automodule:: arraycontext.impl.pytato.compile
+"""
+__copyright__ = """
+Copyright (C) 2020-1 University of Illinois Board of Trustees
+"""
+
+__license__ = """
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+from arraycontext.context import ArrayContext
+import numpy as np
+from typing import Any, Callable, Union, Sequence
+from pytools.tag import Tag
+
+
+class PytatoPyOpenCLArrayContext(ArrayContext):
+    """
+    A :class:`ArrayContext` that uses :mod:`pytato` data types to represent
+    the arrays targeting OpenCL for offloading operations.
+
+    .. attribute:: queue
+
+        A :class:`pyopencl.CommandQueue`.
+
+    .. attribute:: allocator
+
+        A :mod:`pyopencl` memory allocator. Can also be None (default) or False
+        to use the default allocator.
+
+    .. automethod:: __init__
+    """
+
+    def __init__(self, queue, allocator=None):
+        super().__init__()
+        self.queue = queue
+        self.allocator = allocator
+
+        # unused, but necessary to keep the context alive
+        self.context = self.queue.context
+
+    def _get_fake_numpy_namespace(self):
+        from arraycontext.impl.pytato.fake_numpy import PytatoFakeNumpyNamespace
+        return PytatoFakeNumpyNamespace(self)
+
+    # {{{ ArrayContext interface
+
+    def clone(self):
+        return type(self)(self.queue, self.allocator)
+
+    def empty(self, shape, dtype):
+        raise ValueError("PytatoPyOpenCLArrayContext does not support empty")
+
+    def zeros(self, shape, dtype):
+        import pytato as pt
+        return pt.zeros(shape, dtype)
+
+    def from_numpy(self, np_array: np.ndarray):
+        import pytato as pt
+        import pyopencl.array as cla
+        cl_array = cla.to_device(self.queue, np_array)
+        return pt.make_data_wrapper(cl_array)
+
+    def to_numpy(self, array):
+        cl_array = self.freeze(array)
+        return cl_array.get(queue=self.queue)
+
+    def call_loopy(self, program, **kwargs):
+        import pyopencl.array as cla
+        from pytato.loopy import call_loopy
+        entrypoint, = set(program.callables_table)
+
+        # thaw frozen arrays
+        kwargs = {kw: (self.thaw(arg) if isinstance(arg, cla.Array) else arg)
+                  for kw, arg in kwargs.items()}
+
+        return call_loopy(program, kwargs, entrypoint)
+
+    def freeze(self, array):
+        # TODO: This should store a cache of pytato DAG -> build pyopencl
+        # program instead of re-compiling the DAG for every freeze.
+
+        import pytato as pt
+        import pyopencl.array as cla
+
+        if isinstance(array, cla.Array):
+            return array.with_queue(None)
+        if not isinstance(array, pt.Array):
+            raise TypeError("PytatoPyOpenCLArrayContext.freeze invoked with "
+                            f"non-pytato array of type '{type(array)}'")
+
+        pt_prg = pt.generate_loopy(array, cl_device=self.queue.device)
+        pt_prg = pt_prg.with_transformed_program(self.transform_loopy_program)
+
+        evt, (cl_array,) = pt_prg(self.queue)
+        evt.wait()
+
+        return cl_array.with_queue(None)
+
+    def thaw(self, array):
+        import pytato as pt
+        import pyopencl.array as cla
+
+        if not isinstance(array, cla.Array):
+            raise TypeError("PytatoPyOpenCLArrayContext.thaw expects CL arrays, got "
+                    f"{type(array)}")
+
+        return pt.make_data_wrapper(array.with_queue(self.queue))
+
+    # }}}
+
+    def compile(self, f: Callable[..., Any]) -> Callable[..., Any]:
+        from arraycontext.impl.pytato.compile import LazilyCompilingFunctionCaller
+        return LazilyCompilingFunctionCaller(self, f)
+
+    def transform_loopy_program(self, t_unit):
+        raise ValueError("PytatoPyOpenCLArrayContext does not implement "
+                         "transform_loopy_program. Sub-classes are supposed "
+                         "to implement it.")
+
+    def tag(self, tags: Union[Sequence[Tag], Tag], array):
+        return array.tagged(tags)
+
+    def tag_axis(self, iaxis, tags: Union[Sequence[Tag], Tag], array):
+        # TODO
+        from warnings import warn
+        warn("tagging PytatoPyOpenCLArrayContext's array axes: not yet implemented",
+             stacklevel=2)
+        return array
+
+    def einsum(self, spec, *args, arg_names=None, tagged=()):
+        import pyopencl.array as cla
+        import pytato as pt
+        if arg_names is not None:
+            from warnings import warn
+            warn("'arg_names' don't bear any significance in "
+                 "PytatoPyOpenCLArrayContext.", stacklevel=2)
+
+        def preprocess_arg(arg):
+            if isinstance(arg, cla.Array):
+                return self.thaw(arg)
+            else:
+                assert isinstance(arg, pt.Array)
+                return arg
+
+        return pt.einsum(spec, *(preprocess_arg(arg) for arg in args))
+
+    @property
+    def permits_inplace_modification(self):
+        return False
diff --git a/arraycontext/impl/pytato/compile.py b/arraycontext/impl/pytato/compile.py
new file mode 100644
index 0000000000000000000000000000000000000000..4f608e09243484e8a7d8dd26255e7b892c21d853
--- /dev/null
+++ b/arraycontext/impl/pytato/compile.py
@@ -0,0 +1,320 @@
+"""
+.. currentmodule:: arraycontext.impl.pytato.compile
+.. autoclass:: LazilyCompilingFunctionCaller
+.. autoclass:: CompiledFunction
+"""
+__copyright__ = """
+Copyright (C) 2020-1 University of Illinois Board of Trustees
+"""
+
+__license__ = """
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+from arraycontext.container import ArrayContainer
+from arraycontext import PytatoPyOpenCLArrayContext
+from arraycontext.container.traversal import (rec_keyed_map_array_container,
+                                              is_array_container)
+
+import numpy as np
+from typing import Any, Callable, Tuple, Dict, Mapping
+from dataclasses import dataclass, field
+from pyrsistent import pmap, PMap
+
+import pyopencl.array as cla
+import pytato as pt
+
+
+# {{{ helper classes: AbstractInputDescriptor
+
+class AbstractInputDescriptor:
+    """
+    Used internally in :class:`LazilyCompilingFunctionCaller` to characterize
+    an input.
+    """
+    def __eq__(self, other):
+        raise NotImplementedError
+
+    def __hash__(self):
+        raise NotImplementedError
+
+
+@dataclass(frozen=True, eq=True)
+class ScalarInputDescriptor(AbstractInputDescriptor):
+    dtype: np.dtype
+
+
+@dataclass(frozen=True, eq=True)
+class LeafArrayDescriptor(AbstractInputDescriptor):
+    dtype: np.dtype
+    shape: Tuple[int, ...]
+
+# }}}
+
+
+def _ary_container_key_stringifier(keys: Tuple[Any, ...]) -> str:
+    """
+    Helper for :meth:`LazilyCompilingFunctionCaller.__call__`. Stringifies an
+    array-container's component's key. Goals of this routine:
+
+    * No two different keys should have the same stringification
+    * Stringified key must a valid identifier according to :meth:`str.isidentifier`
+    * (informal) Shorter identifiers are preferred
+    """
+    def _rec_str(key: Any) -> str:
+        if isinstance(key, (str, int)):
+            return str(key)
+        elif isinstance(key, tuple):
+            # t in '_actx_t': stands for tuple
+            return "_actx_t" + "_".join(_rec_str(k) for k in key) + "_actx_endt"
+        else:
+            raise NotImplementedError("Key-stringication unimplemented for "
+                                      f"'{type(key).__name__}'.")
+
+    return "_".join(_rec_str(key) for key in keys)
+
+
+def _get_arg_id_to_arg_and_arg_id_to_descr(args: Tuple[Any, ...]
+                                           ) -> "Tuple[PMap[Tuple[Any, ...],\
+                                                            Any],\
+                                                       PMap[Tuple[Any, ...],\
+                                                            AbstractInputDescriptor]\
+                                                       ]":
+    """
+    Helper for :meth:`LazilyCompilingFunctionCaller.__call__`. Extracts
+    mappings from argument id to argument values and from argument id to
+    :class:`AbstractInputDescriptor`. See
+    :attr:`CompiledFunction.input_id_to_name_in_program` for argument-id's
+    representation.
+    """
+    arg_id_to_arg: Dict[Tuple[Any, ...], Any] = {}
+    arg_id_to_descr: Dict[Tuple[Any, ...], AbstractInputDescriptor] = {}
+
+    for iarg, arg in enumerate(args):
+        if np.isscalar(arg):
+            arg_id = (iarg,)
+            arg_id_to_arg[arg_id] = arg
+            arg_id_to_descr[arg_id] = ScalarInputDescriptor(np.dtype(arg))
+        elif is_array_container(arg):
+            def id_collector(keys, ary):
+                arg_id = (iarg,) + keys
+                arg_id_to_arg[arg_id] = ary
+                arg_id_to_descr[arg_id] = LeafArrayDescriptor(np.dtype(ary.dtype),
+                                                              ary.shape)
+                return ary
+
+            rec_keyed_map_array_container(id_collector, arg)
+        else:
+            raise ValueError("Argument to a compiled operator should be"
+                             " either a scalar or an array container. Got"
+                             f" '{arg}'.")
+
+    return pmap(arg_id_to_arg), pmap(arg_id_to_descr)
+
+
+def _get_f_placeholder_args(arg, iarg, arg_id_to_name):
+    """
+    Helper for :class:`LazilyCompilingFunctionCaller.__call__`. Returns the
+    placeholder version of an argument to
+    :attr:`LazilyCompilingFunctionCaller.f`.
+    """
+    if np.isscalar(arg):
+        name = arg_id_to_name[(iarg,)]
+        return pt.make_placeholder((), np.dtype(arg), name)
+    elif is_array_container(arg):
+        def _rec_to_placeholder(keys, ary):
+            name = arg_id_to_name[(iarg,) + keys]
+            return pt.make_placeholder(ary.shape, ary.dtype,
+                                       name)
+        return rec_keyed_map_array_container(_rec_to_placeholder,
+                                                arg)
+    else:
+        raise NotImplementedError(type(arg))
+
+
+@dataclass
+class LazilyCompilingFunctionCaller:
+    """
+    Records a side-effect-free callable
+    :attr:`LazilyCompilingFunctionCaller.f` that can be specialized for the
+    input types with which :meth:`LazilyCompilingFunctionCaller.__call__` is
+    invoked.
+
+    .. attribute:: f
+
+        The callable that will be called to obtain :mod:`pytato` DAGs.
+
+    .. automethod:: __call__
+    """
+
+    actx: PytatoPyOpenCLArrayContext
+    f: Callable[..., Any]
+    program_cache: Dict["PMap[Tuple[Any, ...], AbstractInputDescriptor]",
+                        "CompiledFunction"] = field(default_factory=lambda: {})
+
+    def __call__(self, *args: Any) -> Any:
+        """
+        Returns the result of :attr:`~LazilyCompilingFunctionCaller.f`'s
+        function application on *args*.
+
+        Before applying :attr:`~LazilyCompilingFunctionCaller.f`, it is compiled
+        to a :mod:`pytato` DAG that would apply
+        :attr:`~LazilyCompilingFunctionCaller.f` with *args* in a lazy-sense.
+        The intermediary pytato DAG for *args* is memoized in *self*.
+        """
+        from pytato.target.loopy import BoundPyOpenCLProgram
+        arg_id_to_arg, arg_id_to_descr = _get_arg_id_to_arg_and_arg_id_to_descr(args)
+
+        try:
+            compiled_f = self.program_cache[arg_id_to_descr]
+        except KeyError:
+            pass
+        else:
+            return compiled_f(arg_id_to_arg)
+
+        dict_of_named_arrays = {}
+        # output_naming_map: result id to name of the named array in the
+        # generated pytato DAG.
+        output_naming_map = {}
+        # input_naming_map: argument id to placeholder name in the generated
+        # pytato DAG.
+        input_naming_map = {
+            arg_id: f"_actx_in_{_ary_container_key_stringifier(arg_id)}"
+            for arg_id in arg_id_to_arg}
+
+        outputs = self.f(*[_get_f_placeholder_args(arg, iarg, input_naming_map)
+                           for iarg, arg in enumerate(args)])
+
+        if not is_array_container(outputs):
+            # TODO: We could possibly just short-circuit this interface if the
+            # returned type is a scalar. Not sure if it's worth it though.
+            raise NotImplementedError(
+                f"Function '{self.f.__name__}' to be compiled "
+                "did not return an array container, but an instance of "
+                f"'{outputs.__class__}' instead.")
+
+        def _as_dict_of_named_arrays(keys, ary):
+            name = "_pt_out_" + "_".join(str(key)
+                                         for key in keys)
+            output_naming_map[keys] = name
+            dict_of_named_arrays[name] = ary
+            return ary
+
+        rec_keyed_map_array_container(_as_dict_of_named_arrays,
+                                      outputs)
+
+        pytato_program = pt.generate_loopy(dict_of_named_arrays,
+                                           options={"return_dict": True},
+                                           cl_device=self.actx.queue.device)
+        assert isinstance(pytato_program, BoundPyOpenCLProgram)
+
+        pytato_program = (pytato_program
+                          .with_transformed_program(self
+                                                    .actx
+                                                    .transform_loopy_program))
+
+        self.program_cache[arg_id_to_descr] = CompiledFunction(
+                                                self.actx, pytato_program,
+                                                input_naming_map, output_naming_map,
+                                                output_template=outputs)
+
+        return self.program_cache[arg_id_to_descr](arg_id_to_arg)
+
+
+@dataclass
+class CompiledFunction:
+    """
+    A callable which captures the :class:`pytato.target.BoundProgram`  resulting
+    from calling :attr:`~LazilyCompilingFunctionCaller.f` with a given set of
+    input types, and generating :mod:`loopy` IR from it.
+
+    .. attribute:: pytato_program
+
+    .. attribute:: input_id_to_name_in_program
+
+        A mapping from input id to the placholder name in
+        :attr:`CompiledFunction.pytato_program`. Input id is represented as the
+        position of :attr:`~LazilyCompilingFunctionCaller.f`'s argument augmented
+        with the leaf array's key if the argument is an array container.
+
+    .. attribute:: output_id_to_name_in_program
+
+        A mapping from output id to the name of
+        :class:`pytato.array.NamedArray` in
+        :attr:`CompiledFunction.pytato_program`. Output id is represented by
+        the key of a leaf array in the array container
+        :attr:`CompiledFunction.output_template`.
+
+    .. attribute:: output_template
+
+       An instance of :class:`arraycontext.ArrayContainer` that is the return
+       type of the callable.
+    """
+
+    actx: PytatoPyOpenCLArrayContext
+    pytato_program: pt.target.BoundProgram
+    input_id_to_name_in_program: Mapping[Tuple[Any, ...], str]
+    output_id_to_name_in_program: Mapping[Tuple[Any, ...], str]
+    output_template: ArrayContainer
+
+    def __call__(self, arg_id_to_arg) -> ArrayContainer:
+        """
+        :arg arg_id_to_arg: Mapping from input id to the passed argument. See
+            :attr:`CompiledFunction.input_id_to_name_in_program` for input id's
+            representation.
+        """
+        from arraycontext.container.traversal import rec_keyed_map_array_container
+
+        input_kwargs_to_loopy = {}
+
+        # {{{ preprocess args to get arguments (CL buffers) to be fed to the
+        # loopy program
+
+        for arg_id, arg in arg_id_to_arg.items():
+            if np.isscalar(arg):
+                arg = cla.to_device(self.actx.queue, np.array(arg))
+            elif isinstance(arg, pt.array.DataWrapper):
+                # got a Datwwrapper => simply gets its data
+                arg = arg.data
+            elif isinstance(arg, cla.Array):
+                # got a frozen array  => do nothing
+                pass
+            elif isinstance(arg, pt.Array):
+                # got an array expression => evaluate it
+                arg = self.actx.freeze(arg).with_queue(self.actx.queue)
+            else:
+                raise NotImplementedError(type(arg))
+
+            input_kwargs_to_loopy[self.input_id_to_name_in_program[arg_id]] = arg
+
+        evt, out_dict = self.pytato_program(queue=self.actx.queue,
+                                            allocator=self.actx.allocator,
+                                            **input_kwargs_to_loopy)
+        # FIXME Kernels (for now) allocate tons of memory in temporaries. If we
+        # race too far ahead with enqueuing, there is a distinct risk of
+        # running out of memory. This mitigates that risk a bit, for now.
+        evt.wait()
+
+        # }}}
+
+        def to_output_template(keys, _):
+            return self.actx.thaw(out_dict[self.output_id_to_name_in_program[keys]])
+
+        return rec_keyed_map_array_container(to_output_template,
+                                             self.output_template)
diff --git a/arraycontext/impl/pytato/fake_numpy.py b/arraycontext/impl/pytato/fake_numpy.py
new file mode 100644
index 0000000000000000000000000000000000000000..9793dde7ab8c4ede3d9b9a52ae9ecc9e0ad71b11
--- /dev/null
+++ b/arraycontext/impl/pytato/fake_numpy.py
@@ -0,0 +1,148 @@
+__copyright__ = """
+Copyright (C) 2021 University of Illinois Board of Trustees
+"""
+
+__license__ = """
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+from arraycontext.fake_numpy import \
+        BaseFakeNumpyNamespace, BaseFakeNumpyLinalgNamespace
+from arraycontext.container.traversal import \
+        rec_multimap_array_container, rec_map_array_container
+import pytato as pt
+
+
+class PytatoFakeNumpyLinalgNamespace(BaseFakeNumpyLinalgNamespace):
+    # Everything is implemented in the base class for now.
+    pass
+
+
+class PytatoFakeNumpyNamespace(BaseFakeNumpyNamespace):
+    """
+    A :mod:`numpy` mimic for :class:`PytatoPyOpenCLArrayContext`.
+
+    .. note::
+
+        :mod:`pytato` does not define any memory layout for its arrays. See
+        :ref:`Pytato docs <pytato:memory-layout>` for more on this.
+    """
+    def _get_fake_numpy_linalg_namespace(self):
+        return PytatoFakeNumpyLinalgNamespace(self._array_context)
+
+    def __getattr__(self, name):
+
+        pt_funcs = ["abs", "sin", "cos", "tan", "arcsin", "arccos", "arctan",
+                    "sinh", "cosh", "tanh", "exp", "log", "log10", "isnan",
+                    "sqrt", "exp"]
+        if name in pt_funcs:
+            from functools import partial
+            return partial(rec_map_array_container, getattr(pt, name))
+
+        return super().__getattr__(name)
+
+    def reshape(self, a, newshape):
+        return rec_multimap_array_container(pt.reshape, a, newshape)
+
+    def transpose(self, a, axes=None):
+        return rec_multimap_array_container(pt.transpose, a, axes)
+
+    def concatenate(self, arrays, axis=0):
+        return rec_multimap_array_container(pt.concatenate, arrays, axis)
+
+    def ones_like(self, ary):
+        def _ones_like(subary):
+            return pt.ones(subary.shape, subary.dtype)
+
+        return self._new_like(ary, _ones_like)
+
+    def maximum(self, x, y):
+        return rec_multimap_array_container(pt.maximum, x, y)
+
+    def minimum(self, x, y):
+        return rec_multimap_array_container(pt.minimum, x, y)
+
+    def where(self, criterion, then, else_):
+        return rec_multimap_array_container(pt.where, criterion, then, else_)
+
+    def sum(self, a, dtype=None):
+        if dtype not in [a.dtype, None]:
+            raise NotImplementedError
+        return pt.sum(a)
+
+    def min(self, a):
+        return pt.amin(a)
+
+    def max(self, a):
+        return pt.amax(a)
+
+    def stack(self, arrays, axis=0):
+        return rec_multimap_array_container(lambda *args: pt.stack(arrays=args,
+                                                                   axis=axis),
+                                            *arrays)
+
+    # {{{ relational operators
+
+    def equal(self, x, y):
+        return rec_multimap_array_container(pt.equal, x, y)
+
+    def not_equal(self, x, y):
+        return rec_multimap_array_container(pt.not_equal, x, y)
+
+    def greater(self, x, y):
+        return rec_multimap_array_container(pt.greater, x, y)
+
+    def greater_equal(self, x, y):
+        return rec_multimap_array_container(pt.greater_equal, x, y)
+
+    def less(self, x, y):
+        return rec_multimap_array_container(pt.less, x, y)
+
+    def less_equal(self, x, y):
+        return rec_multimap_array_container(pt.less_equal, x, y)
+
+    def conj(self, x):
+        return rec_multimap_array_container(pt.conj, x)
+
+    def arctan2(self, y, x):
+        return rec_multimap_array_container(pt.arctan2, y, x)
+
+    def ravel(self, a, order="C"):
+        """
+        :arg order: A :class:`str` describing the order in which the elements
+            must be traversed while flattening. Can be one of 'F', 'C', 'A' or
+            'K'. Since, :mod:`pytato` arrays don't have a memory layout, if
+            *order* is 'A' or 'K', the traversal order while flattening is
+            undefined.
+        """
+
+        def _rec_ravel(a):
+            if order in "FC":
+                return pt.reshape(a, (-1,), order=order)
+            elif order in "AK":
+                # flattening in a C-order
+                # memory layout is assumed to be "C"
+                return pt.reshape(a, (-1,), order="C")
+            else:
+                raise ValueError("`order` can be one of 'F', 'C', 'A' or 'K'. "
+                                 f"(got {order})")
+
+        return rec_map_array_container(_rec_ravel, a)
+
+    # }}}
diff --git a/arraycontext/pytest.py b/arraycontext/pytest.py
index 9cc4b4e5f779594a1906d4a20a467edcc2d312c7..e93a8b38bd8528d8719dfe818bd58c56214a3c66 100644
--- a/arraycontext/pytest.py
+++ b/arraycontext/pytest.py
@@ -90,7 +90,7 @@ class _PytestPyOpenCLArrayContextFactoryWithClass(PytestPyOpenCLArrayContextFact
                 force_device_scalars=self.force_device_scalars)
 
     def __str__(self):
-        return ("<%s for <pyopencl.Device '%s' on '%s'>" %
+        return ("<%s for <pyopencl.Device '%s' on '%s'>>" %
                 (
                     self.actx_class.__name__,
                     self.device.name.strip(),
@@ -102,11 +102,36 @@ class _PytestPyOpenCLArrayContextFactoryWithClassAndHostScalars(
     force_device_scalars = False
 
 
+class _PytestPytatoPyOpenCLArrayContextFactory(
+        PytestPyOpenCLArrayContextFactory):
+
+    @property
+    def actx_class(self):
+        from arraycontext import PytatoPyOpenCLArrayContext
+        return PytatoPyOpenCLArrayContext
+
+    def __call__(self):
+        # The ostensibly pointless assignment to *ctx* keeps the CL context alive
+        # long enough to create the array context, which will then start
+        # holding a reference to the context to keep it alive in turn.
+        # On some implementations (notably Intel CPU), holding a reference
+        # to a queue does not keep the context alive.
+        ctx, queue = self.get_command_queue()
+        return self.actx_class(queue)
+
+    def __str__(self):
+        return ("<PytatoPyOpenCLArrayContext for <pyopencl.Device '%s' on '%s'>>" %
+                (
+                    self.device.name.strip(),
+                    self.device.platform.name.strip()))
+
+
 _ARRAY_CONTEXT_FACTORY_REGISTRY: \
         Dict[str, Type[PytestPyOpenCLArrayContextFactory]] = {
                 "pyopencl": _PytestPyOpenCLArrayContextFactoryWithClass,
                 "pyopencl-deprecated":
                 _PytestPyOpenCLArrayContextFactoryWithClassAndHostScalars,
+                "pytato-pyopencl": _PytestPytatoPyOpenCLArrayContextFactory,
                 }
 
 
@@ -157,6 +182,8 @@ def pytest_generate_tests_for_array_contexts(
     * ``"pyopencl-deprecated"``, which creates a
       :class:`~arraycontext.PyOpenCLArrayContext` with
       ``force_device_scalars=False``.
+    * ``"pytato-pyopencl"``, which creates a
+      :class:`~arraycontext.PytatoPyOpenCLArrayContext`.
 
     :arg factories: a list of identifiers or
         :class:`PytestPyOpenCLArrayContextFactory` classes (not instances)
@@ -234,6 +261,9 @@ def pytest_generate_tests_for_array_contexts(
 
         # }}}
 
+        # Sort the actx's so that parallel pytest works
+        arg_value_tuples = sorted(arg_value_tuples, key=lambda x: x.__str__())
+
         metafunc.parametrize(arg_names, arg_value_tuples, ids=ids)
 
     return inner
diff --git a/doc/array_context.rst b/doc/array_context.rst
index 4b3b00486dbb6699a04e0c020e71c789d7aad189..7a12dfb607790494910eb9232e2220e4e9bf8ffa 100644
--- a/doc/array_context.rst
+++ b/doc/array_context.rst
@@ -12,3 +12,9 @@ Array context based on :mod:`pyopencl.array`
 --------------------------------------------
 
 .. automodule:: arraycontext.impl.pyopencl
+
+
+Lazy/Deferred evaluation array context based on :mod:`pytato`
+-------------------------------------------------------------
+
+.. automodule:: arraycontext.impl.pytato
diff --git a/test/test_arraycontext.py b/test/test_arraycontext.py
index 12af22c7db2ff1b1b434c152087359889117b069..e1a2c08b4cc63026621b6ceb7b11734015baae53 100644
--- a/test/test_arraycontext.py
+++ b/test/test_arraycontext.py
@@ -32,10 +32,14 @@ from arraycontext import (
         serialize_container, deserialize_container,
         freeze, thaw,
         FirstAxisIsElementsTag,
-        PyOpenCLArrayContext)
+        PyOpenCLArrayContext,
+        PytatoPyOpenCLArrayContext,
+        ArrayContainer,)
 from arraycontext import (  # noqa: F401
-        pytest_generate_tests_for_array_contexts)
-from arraycontext.pytest import _PytestPyOpenCLArrayContextFactoryWithClass
+        pytest_generate_tests_for_array_contexts,
+        )
+from arraycontext.pytest import (_PytestPyOpenCLArrayContextFactoryWithClass,
+                                 _PytestPytatoPyOpenCLArrayContextFactory)
 
 
 import logging
@@ -53,6 +57,16 @@ class _PyOpenCLArrayContextForTests(PyOpenCLArrayContext):
         return t_unit
 
 
+class _PytatoPyOpenCLArrayContextForTests(PytatoPyOpenCLArrayContext):
+    """Like :class:`PytatoPyOpenCLArrayContext`, but applies no program
+    transformations whatsoever. Only to be used for testing internal to
+    :mod:`arraycontext`.
+    """
+
+    def transform_loopy_program(self, t_unit):
+        return t_unit
+
+
 class _PyOpenCLArrayContextWithHostScalarsForTestsFactory(
         _PytestPyOpenCLArrayContextFactoryWithClass):
     actx_class = _PyOpenCLArrayContextForTests
@@ -63,9 +77,15 @@ class _PyOpenCLArrayContextForTestsFactory(
     force_device_scalars = True
 
 
+class _PytatoPyOpenCLArrayContextForTestsFactory(
+        _PytestPytatoPyOpenCLArrayContextFactory):
+    actx_class = _PytatoPyOpenCLArrayContextForTests
+
+
 pytest_generate_tests = pytest_generate_tests_for_array_contexts([
     _PyOpenCLArrayContextForTestsFactory,
     _PyOpenCLArrayContextWithHostScalarsForTestsFactory,
+    _PytatoPyOpenCLArrayContextForTestsFactory,
     ])
 
 
@@ -299,7 +319,7 @@ def test_dof_array_arithmetic_same_as_numpy(actx_factory):
         return ary.real
 
     def get_imag(ary):
-        return ary.real
+        return ary.imag
 
     import operator
     from pytools import generate_nonnegative_integer_tuples_below as gnitb
@@ -357,6 +377,19 @@ def test_dof_array_arithmetic_same_as_numpy(actx_factory):
                 # can't do in place operations with a scalar lhs
                 continue
 
+            if op_func == operator.ge:
+                op_func_actx = actx.np.greater_equal
+            elif op_func == operator.lt:
+                op_func_actx = actx.np.less
+            elif op_func == operator.gt:
+                op_func_actx = actx.np.greater
+            elif op_func == operator.eq:
+                op_func_actx = actx.np.equal
+            elif op_func == operator.ne:
+                op_func_actx = actx.np.not_equal
+            else:
+                op_func_actx = op_func
+
             args = [
                     (0.5+np.random.rand(ndofs)
                         if not use_integers else
@@ -385,7 +418,7 @@ def test_dof_array_arithmetic_same_as_numpy(actx_factory):
                     if isinstance(arg, np.ndarray) else arg
                     for arg in args]
 
-            actx_result = actx.to_numpy(op_func(*actx_args)[0])
+            actx_result = actx.to_numpy(op_func_actx(*actx_args)[0])
 
             assert np.allclose(actx_result, ref_result)
 
@@ -417,7 +450,7 @@ def test_dof_array_arithmetic_same_as_numpy(actx_factory):
                         for arg in actx_args]
 
                 obj_array_result = actx.to_numpy(
-                        op_func(*obj_array_args)[0][0])
+                        op_func_actx(*obj_array_args)[0][0])
 
                 assert np.allclose(obj_array_result, ref_result)
 
@@ -427,22 +460,23 @@ def test_dof_array_arithmetic_same_as_numpy(actx_factory):
 
 
 # {{{ reductions same as numpy
-
-def test_dof_array_reductions_same_as_numpy(actx_factory):
+@pytest.mark.parametrize("op", ["sum", "min", "max"])
+def test_dof_array_reductions_same_as_numpy(actx_factory, op):
     actx = actx_factory()
 
+    ary = np.random.randn(3000)
+    np_red = getattr(np, op)(ary)
+    actx_red = getattr(actx.np, op)(actx.from_numpy(ary))
+    actx_red = actx.to_numpy(actx_red)
+
     from numbers import Number
-    for name in ["sum", "min", "max"]:
-        ary = np.random.randn(3000)
-        np_red = getattr(np, name)(ary)
-        actx_red = getattr(actx.np, name)(actx.from_numpy(ary))
 
-        if actx._force_device_scalars:
-            assert actx_red.shape == ()
-        else:
-            assert isinstance(actx_red, Number)
+    if isinstance(actx, PyOpenCLArrayContext) and (not actx._force_device_scalars):
+        assert isinstance(actx_red, Number)
+    else:
+        assert actx_red.shape == ()
 
-        assert np.allclose(np_red, actx.to_numpy(actx_red))
+    assert np.allclose(np_red, actx_red)
 
 # }}}
 
@@ -569,8 +603,8 @@ def test_container_multimap(actx_factory):
 
     # {{{ check
 
-    def _check_allclose(f, arg1, arg2, atol=1.0e-14):
-        assert np.linalg.norm((f(arg1) - arg2).get()) < atol
+    def _check_allclose(f, arg1, arg2, atol=2.0e-14):
+        assert np.linalg.norm(actx.to_numpy(f(arg1) - arg2)) < atol
 
     def func_all_scalar(x, y):
         return x + y
@@ -610,8 +644,8 @@ def test_container_arithmetic(actx_factory):
 
     # {{{ check
 
-    def _check_allclose(f, arg1, arg2, atol=1.0e-14):
-        assert np.linalg.norm((f(arg1) - arg2).get()) < atol
+    def _check_allclose(f, arg1, arg2, atol=5.0e-14):
+        assert np.linalg.norm(actx.to_numpy(f(arg1) - arg2)) < atol
 
     from functools import partial
     from arraycontext import rec_multimap_array_container
@@ -638,7 +672,8 @@ def test_container_arithmetic(actx_factory):
     bcast_result = ary_dof + bcast_dc_of_dofs
     bcast_dc_of_dofs + ary_dof
 
-    assert actx.np.linalg.norm(bcast_result.mass - 2*ary_of_dofs) < 1e-8
+    assert actx.to_numpy(actx.np.linalg.norm(bcast_result.mass
+                                             - 2*ary_of_dofs)) < 1e-8
 
     mock_gradient = MyContainerDOFBcast(
             name="yo",
@@ -649,7 +684,9 @@ def test_container_arithmetic(actx_factory):
     grad_matvec_result = mock_gradient @ ary_of_dofs
     assert isinstance(grad_matvec_result.mass, DOFArray)
     assert grad_matvec_result.momentum.shape == (3,)
-    assert actx.np.linalg.norm(grad_matvec_result.mass - 3*ary_of_dofs**2) < 1e-8
+
+    assert actx.to_numpy(actx.np.linalg.norm(grad_matvec_result.mass
+                                             - 3*ary_of_dofs**2)) < 1e-8
 
     # }}}
 
@@ -700,9 +737,6 @@ def test_container_freeze_thaw(actx_factory):
 def test_container_norm(actx_factory, ord):
     actx = actx_factory()
 
-    ary_dof, ary_of_dofs, mat_of_dofs, dc_of_dofs, bcast_dc_of_dofs = \
-            _get_test_containers(actx)
-
     from pytools.obj_array import make_obj_array
     c = MyContainer(name="hey", mass=1, momentum=make_obj_array([2, 3]), enthalpy=5)
     n1 = actx.np.linalg.norm(make_obj_array([c, c]), ord)
@@ -754,6 +788,8 @@ def test_norm_complex(actx_factory, norm_ord):
     norm_a_ref = np.linalg.norm(a, norm_ord)
     norm_a = actx.np.linalg.norm(actx.from_numpy(a), norm_ord)
 
+    norm_a = actx.to_numpy(norm_a)
+
     assert abs(norm_a_ref - norm_a)/norm_a < 1e-13
 
 
@@ -774,6 +810,45 @@ def test_norm_ord_none(actx_factory, ndim):
     np.testing.assert_allclose(actx.to_numpy(norm_a), norm_a_ref)
 
 
+# {{{ test_actx_compile helpers
+
+@with_container_arithmetic(bcast_obj_array=True, rel_comparison=True)
+@dataclass_array_container
+@dataclass(frozen=True)
+class Velocity2D:
+    u: ArrayContainer
+    v: ArrayContainer
+    array_context: ArrayContext
+
+
+def scale_and_orthogonalize(alpha, vel):
+    from arraycontext import rec_map_array_container
+    actx = vel.array_context
+    scaled_vel = rec_map_array_container(lambda x: alpha * x,
+                                         vel)
+    return Velocity2D(-scaled_vel.v, scaled_vel.u, actx)
+
+# }}}
+
+
+def test_actx_compile(actx_factory):
+    from arraycontext import (to_numpy, from_numpy)
+    actx = actx_factory()
+
+    compiled_rhs = actx.compile(scale_and_orthogonalize)
+
+    v_x = np.random.rand(10)
+    v_y = np.random.rand(10)
+
+    vel = from_numpy(Velocity2D(v_x, v_y, actx), actx)
+
+    scaled_speed = compiled_rhs(np.float64(3.14), vel)
+
+    result = to_numpy(scaled_speed, actx)
+    np.testing.assert_allclose(result.u, -3.14*v_y)
+    np.testing.assert_allclose(result.v, 3.14*v_x)
+
+
 def test_container_equality(actx_factory):
     actx = actx_factory()
 
diff --git a/test/test_utils.py b/test/test_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..2228152f68f18d89cbd6d4bb188ade3b70414e9a
--- /dev/null
+++ b/test/test_utils.py
@@ -0,0 +1,48 @@
+"""Testing for internal  utilities."""
+
+
+__copyright__ = "Copyright (C) 2021 University of Illinois Board of Trustees"
+
+__license__ = """
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+def test_pt_actx_key_stringification_uniqueness():
+    from arraycontext.impl.pytato.compile import _ary_container_key_stringifier
+
+    assert (_ary_container_key_stringifier(((3, 2), 3))
+            != _ary_container_key_stringifier((3, (2, 3))))
+
+    assert (_ary_container_key_stringifier(("tup", 3, "endtup"))
+            != _ary_container_key_stringifier(((3,),)))
+
+
+if __name__ == "__main__":
+    import sys
+    if len(sys.argv) > 1:
+        exec(sys.argv[1])
+    else:
+        from pytest import main
+        main([__file__])
+
+# vim: fdm=marker