diff --git a/doc/index.rst b/doc/index.rst
index d113cce65741f2d97f0f89b9248587ab4a2a4c5a..303a75895f1b44d652973fb1335cc83c377d5796 100644
--- a/doc/index.rst
+++ b/doc/index.rst
@@ -10,6 +10,8 @@ Contents:
     dof_desc
     geometry
     operators
+    utils
+    references
     misc
     🚀 Github <https://github.com/inducer/grudge>
     💾 Download Releases <https://pypi.org/project/grudge>
diff --git a/doc/misc.rst b/doc/misc.rst
index 9f1f28b10899961adc6f92acfb50e3157080a99c..79eb5de443dc14e05b71b720bf24d23f55d49754 100644
--- a/doc/misc.rst
+++ b/doc/misc.rst
@@ -104,7 +104,7 @@ Licensing
 
 :mod:`grudge` is licensed to you under the MIT/X Consortium license:
 
-Copyright (c) 2014-16 Andreas Klöckner and Contributors.
+Copyright (c) 2014-21 Andreas Klöckner and Contributors.
 
 Permission is hereby granted, free of charge, to any person
 obtaining a copy of this software and associated documentation
diff --git a/doc/references.rst b/doc/references.rst
new file mode 100644
index 0000000000000000000000000000000000000000..e56859ee9b77554770d99761f0ed7d394019c9c2
--- /dev/null
+++ b/doc/references.rst
@@ -0,0 +1,22 @@
+References
+==========
+
+..
+    When adding references here, please use the demonstrated format:
+    [FirstAuthor_pubyear]
+
+.. [Shewchuk_2002] J. R. Shewchuk (2002), \
+    What Is a Good Linear Finite Element? - \
+    Interpolation, Conditioning, Anisotropy, and Quality Measures, \
+    In Proc. of the 11th International Meshing Roundtable \
+    `(link) <http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.19.2164>`__
+
+.. [Hesthaven_2008] J. S. Hesthaven and T. Warburton (2008), \
+    Nodal Discontinuous Galerkin Methods: Algorithms, Analysis, and Applications, \
+    Springer \
+    `(doi) <https://doi.org/10.1007/978-0-387-72067-8>`__
+
+.. [Chan_2016] J. Chan, R. J. Hewett, and T. Warburton (2016), \
+    Weight-Adjusted Discontinuous Galerkin Methods: Curvilinear Meshes, \
+    SIAM J Sci Comput \
+    `(doi) <https://doi.org/10.1137/16M1089198>`__
diff --git a/doc/utils.rst b/doc/utils.rst
new file mode 100644
index 0000000000000000000000000000000000000000..d269517a0f785142bf466745bf3bd8172f365cc3
--- /dev/null
+++ b/doc/utils.rst
@@ -0,0 +1,7 @@
+Helper functions
+================
+
+Estimating stable time-steps
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. automodule:: grudge.dt_utils
diff --git a/examples/advection/surface.py b/examples/advection/surface.py
index 74771af007aa2b3c1f124e0976140cdb4793e28e..9dcdc93ec550acdb0e50d210ce470c897bb04d24 100644
--- a/examples/advection/surface.py
+++ b/examples/advection/surface.py
@@ -116,8 +116,6 @@ def main(ctx_factory, dim=2, order=4, use_quad=False, visualize=False):
     # sphere resolution
     resolution = 64 if dim == 2 else 1
 
-    # cfl
-    dt_factor = 2.0
     # final time
     final_time = np.pi
 
@@ -205,11 +203,8 @@ def main(ctx_factory, dim=2, order=4, use_quad=False, visualize=False):
 
     # {{{ time stepping
 
-    # compute time step
-    h_min = op.h_max_from_volume(dcoll, dim=dcoll.dim)
-    dt = dt_factor * h_min/order**2
+    dt = adv_operator.estimate_rk4_timestep(dcoll, fields=u0)
     nsteps = int(final_time // dt) + 1
-    dt = final_time/nsteps + 1.0e-15
 
     logger.info("dt:        %.5e", dt)
     logger.info("nsteps:    %d", nsteps)
@@ -270,12 +265,14 @@ if __name__ == "__main__":
 
     parser = argparse.ArgumentParser()
     parser.add_argument("--dim", choices=[2, 3], default=2, type=int)
-    parser.add_argument("--use-quad", action="store_false")
+    parser.add_argument("--order", default=4, type=int)
+    parser.add_argument("--use-quad", action="store_true")
     parser.add_argument("--visualize", action="store_true")
     args = parser.parse_args()
 
     logging.basicConfig(level=logging.INFO)
     main(cl.create_some_context,
             dim=args.dim,
+            order=args.order,
             use_quad=args.use_quad,
             visualize=args.visualize)
diff --git a/examples/advection/var-velocity.py b/examples/advection/var-velocity.py
index 1d10c917d6ba03ec2922dcf6dfaa131c51a55aca..b050c60b8e9648a6ae51159346ae27eee5dfa50b 100644
--- a/examples/advection/var-velocity.py
+++ b/examples/advection/var-velocity.py
@@ -112,17 +112,9 @@ def main(ctx_factory, dim=2, order=4, use_quad=False, visualize=False):
     d = 1.0
     # number of points in each dimension
     npoints = 25
-    # grid spacing
-    h = d / npoints
 
-    # cfl
-    dt_factor = 1.0
     # finale time
     final_time = 0.5
-    # time steps
-    dt = dt_factor * h/order**2
-    nsteps = int(final_time // dt) + 1
-    dt = final_time/nsteps + 1.0e-15
 
     # flux
     flux_type = "upwind"
@@ -203,6 +195,10 @@ def main(ctx_factory, dim=2, order=4, use_quad=False, visualize=False):
     def rhs(t, u):
         return adv_operator.operator(t, u)
 
+    dt = adv_operator.estimate_rk4_timestep(dcoll, fields=u)
+
+    logger.info("Timestep size: %g", dt)
+
     # }}}
 
     # {{{ time stepping
@@ -236,10 +232,14 @@ if __name__ == "__main__":
 
     parser = argparse.ArgumentParser()
     parser.add_argument("--dim", default=2, type=int)
-    parser.add_argument("--use-quad", action="store_false")
+    parser.add_argument("--order", default=4, type=int)
+    parser.add_argument("--use-quad", action="store_true")
+    parser.add_argument("--visualize", action="store_true")
     args = parser.parse_args()
 
     logging.basicConfig(level=logging.INFO)
     main(cl.create_some_context,
-            dim=args.dim,
-            use_quad=args.use_quad)
+         dim=args.dim,
+         order=args.order,
+         use_quad=args.use_quad,
+         visualize=args.visualize)
diff --git a/examples/advection/weak.py b/examples/advection/weak.py
index 5c9ad1098033532c6782a2c1b730a5252358c2e3..9300ab01f4e9a7aa7288bf847429f49c05d5c914 100644
--- a/examples/advection/weak.py
+++ b/examples/advection/weak.py
@@ -112,21 +112,14 @@ def main(ctx_factory, dim=2, order=4, visualize=False):
     d = 1.0
     # number of points in each dimension
     npoints = 20
-    # grid spacing
-    h = d / npoints
 
-    # cfl
-    dt_factor = 2.0
     # final time
     final_time = 1.0
-    # compute number of steps
-    dt = dt_factor * h/order**2
-    nsteps = int(final_time // dt) + 1
-    dt = final_time/nsteps + 1.0e-15
 
     # velocity field
     c = np.array([0.5] * dim)
     norm_c = la.norm(c)
+
     # flux
     flux_type = "central"
 
@@ -171,6 +164,10 @@ def main(ctx_factory, dim=2, order=4, visualize=False):
     def rhs(t, u):
         return adv_operator.operator(t, u)
 
+    dt = adv_operator.estimate_rk4_timestep(dcoll, fields=u)
+
+    logger.info("Timestep size: %g", dt)
+
     # }}}
 
     # {{{ time stepping
@@ -205,8 +202,12 @@ if __name__ == "__main__":
 
     parser = argparse.ArgumentParser()
     parser.add_argument("--dim", default=2, type=int)
+    parser.add_argument("--order", default=4, type=int)
+    parser.add_argument("--visualize", action="store_true")
     args = parser.parse_args()
 
     logging.basicConfig(level=logging.INFO)
     main(cl.create_some_context,
-            dim=args.dim)
+         dim=args.dim,
+         order=args.order,
+         visualize=args.visualize)
diff --git a/examples/maxwell/cavities.py b/examples/maxwell/cavities.py
index 57006e45d5c4499d3ea81bcea3781f93a457fa80..3d68e513fdb51b5b66801d01e90bb2dfa39860e6 100644
--- a/examples/maxwell/cavities.py
+++ b/examples/maxwell/cavities.py
@@ -39,12 +39,12 @@ from grudge.models.em import get_rectangular_cavity_mode
 
 import grudge.op as op
 
+import logging
+logger = logging.getLogger(__name__)
 
-STEPS = 60
 
-
-def main(dims, write_output=False, order=4):
-    cl_ctx = cl.create_some_context()
+def main(ctx_factory, dim=3, order=4, visualize=False):
+    cl_ctx = ctx_factory()
     queue = cl.CommandQueue(cl_ctx)
     actx = PyOpenCLArrayContext(
         queue,
@@ -53,9 +53,9 @@ def main(dims, write_output=False, order=4):
 
     from meshmode.mesh.generation import generate_regular_rect_mesh
     mesh = generate_regular_rect_mesh(
-            a=(0.0,)*dims,
-            b=(1.0,)*dims,
-            nelements_per_axis=(4,)*dims)
+            a=(0.0,)*dim,
+            b=(1.0,)*dim,
+            nelements_per_axis=(4,)*dim)
 
     dcoll = DiscretizationCollection(actx, mesh, order=order)
 
@@ -75,36 +75,31 @@ def main(dims, write_output=False, order=4):
         epsilon,
         mu,
         flux_type=0.5,
-        dimensions=dims
+        dimensions=dim
     )
 
     def cavity_mode(x, t=0):
-        if dims == 3:
+        if dim == 3:
             return get_rectangular_cavity_mode(actx, x, t, 1, (1, 2, 2))
         else:
             return get_rectangular_cavity_mode(actx, x, t, 1, (2, 3))
 
     fields = cavity_mode(thaw(op.nodes(dcoll), actx), t=0)
 
-    # FIXME
-    # dt = maxwell_operator.estimate_rk4_timestep(dcoll, fields=fields)
-
     maxwell_operator.check_bc_coverage(mesh)
 
     def rhs(t, w):
         return maxwell_operator.operator(t, w)
 
-    if mesh.dim == 2:
-        dt = 0.004
-    elif mesh.dim == 3:
-        dt = 0.002
+    dt = maxwell_operator.estimate_rk4_timestep(dcoll, fields=fields)
 
     dt_stepper = set_up_rk4("w", dt, fields, rhs)
 
-    final_t = dt * STEPS
-    nsteps = int(final_t/dt)
+    target_steps = 60
+    final_t = dt * target_steps
+    nsteps = int(final_t/dt) + 1
 
-    print("dt=%g nsteps=%d" % (dt, nsteps))
+    logger.info("dt = %g nsteps = %d", dt, nsteps)
 
     from grudge.shortcuts import make_visualizer
     vis = make_visualizer(dcoll)
@@ -114,12 +109,9 @@ def main(dims, write_output=False, order=4):
     def norm(u):
         return op.norm(dcoll, u, 2)
 
-    from time import time
-    t_last_step = time()
-
     e, h = maxwell_operator.split_eh(fields)
 
-    if write_output:
+    if visualize:
         vis.write_vtk_file(
             f"fld-cavities-{step:04d}.vtu",
             [
@@ -133,17 +125,21 @@ def main(dims, write_output=False, order=4):
             assert event.component_id == "w"
 
             step += 1
+            e, h = maxwell_operator.split_eh(event.state_component)
 
             norm_e0 = norm(u=e[0])
             norm_e1 = norm(u=e[1])
             norm_h0 = norm(u=h[0])
             norm_h1 = norm(u=h[1])
-            print(step, event.t,
-                  norm_e0, norm_e1, norm_h0, norm_h1,
-                  time()-t_last_step)
+
+            logger.info(
+                "[%04d] t = %.5f |e0| = %.5e, |e1| = %.5e, |h0| = %.5e, |h1| = %.5e",
+                step, event.t,
+                norm_e0, norm_e1, norm_h0, norm_h1
+            )
+
             if step % 10 == 0:
-                if write_output:
-                    e, h = maxwell_operator.split_eh(event.state_component)
+                if visualize:
                     vis.write_vtk_file(
                         f"fld-cavities-{step:04d}.vtu",
                         [
@@ -151,7 +147,6 @@ def main(dims, write_output=False, order=4):
                             ("h", h),
                         ]
                     )
-            t_last_step = time()
 
             # NOTE: These are here to ensure the solution is bounded for the
             # time interval specified
@@ -162,4 +157,16 @@ def main(dims, write_output=False, order=4):
 
 
 if __name__ == "__main__":
-    main(3)
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--dim", default=3, type=int)
+    parser.add_argument("--order", default=4, type=int)
+    parser.add_argument("--visualize", action="store_true")
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.INFO)
+    main(cl.create_some_context,
+         dim=args.dim,
+         order=args.order,
+         visualize=args.visualize)
diff --git a/examples/wave/var-propagation-speed.py b/examples/wave/var-propagation-speed.py
index a1a211c923e1726ae80fcaee6cc584014585fb8f..0b8d51b9bdfa0388612366c1d95cedcc931eedb2 100644
--- a/examples/wave/var-propagation-speed.py
+++ b/examples/wave/var-propagation-speed.py
@@ -39,21 +39,23 @@ from pytools.obj_array import flat_obj_array
 
 import grudge.op as op
 
+import logging
+logger = logging.getLogger(__name__)
 
-def main(write_output=False, order=4):
-    cl_ctx = cl.create_some_context()
+
+def main(ctx_factory, dim=2, order=4, visualize=False):
+    cl_ctx = ctx_factory()
     queue = cl.CommandQueue(cl_ctx)
     actx = PyOpenCLArrayContext(
         queue,
         allocator=cl_tools.MemoryPool(cl_tools.ImmediateAllocator(queue))
     )
 
-    dims = 2
     from meshmode.mesh.generation import generate_regular_rect_mesh
     mesh = generate_regular_rect_mesh(
-            a=(-0.5,)*dims,
-            b=(0.5,)*dims,
-            nelements_per_axis=(20,)*dims)
+            a=(-0.5,)*dim,
+            b=(0.5,)*dim,
+            nelements_per_axis=(20,)*dim)
 
     dcoll = DiscretizationCollection(actx, mesh, order=order)
 
@@ -100,16 +102,15 @@ def main(write_output=False, order=4):
     def rhs(t, w):
         return wave_op.operator(t, w)
 
-    if mesh.dim == 2:
-        dt = 0.04 * 0.3
-    elif mesh.dim == 3:
-        dt = 0.02 * 0.1
+    dt_scaling_const = 2/3
+    dt = dt_scaling_const * wave_op.estimate_rk4_timestep(dcoll, fields=fields)
 
     dt_stepper = set_up_rk4("w", dt, fields, rhs)
 
     final_t = 1
-    nsteps = int(final_t/dt)
-    print("dt=%g nsteps=%d" % (dt, nsteps))
+    nsteps = int(final_t/dt) + 1
+
+    logger.info("dt=%g nsteps=%d", dt, nsteps)
 
     from grudge.shortcuts import make_visualizer
     vis = make_visualizer(dcoll)
@@ -122,7 +123,7 @@ def main(write_output=False, order=4):
     from time import time
     t_last_step = time()
 
-    if write_output:
+    if visualize:
         u = fields[0]
         v = fields[1:]
         vis.write_vtk_file(
@@ -141,9 +142,9 @@ def main(write_output=False, order=4):
             step += 1
 
             if step % 10 == 0:
-                print(f"step: {step} t: {time()-t_last_step} "
-                      f"L2: {norm(u=event.state_component[0])}")
-                if write_output:
+                logger.info(f"step: {step} t: {time()-t_last_step} "
+                            f"L2: {norm(u=event.state_component[0])}")
+                if visualize:
                     vis.write_vtk_file(
                         f"fld-var-propogation-speed-{step:04d}.vtu",
                         [
@@ -160,4 +161,16 @@ def main(write_output=False, order=4):
 
 
 if __name__ == "__main__":
-    main()
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--dim", default=2, type=int)
+    parser.add_argument("--order", default=4, type=int)
+    parser.add_argument("--visualize", action="store_true")
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.INFO)
+    main(cl.create_some_context,
+         dim=args.dim,
+         order=args.order,
+         visualize=args.visualize)
diff --git a/examples/wave/wave-min-mpi.py b/examples/wave/wave-min-mpi.py
index 1fb3eb9eaafb19bf1e576b9408985a609f8db467..a3e75c7b525628bcaa6bd861a1009ec9cd79d985 100644
--- a/examples/wave/wave-min-mpi.py
+++ b/examples/wave/wave-min-mpi.py
@@ -41,8 +41,11 @@ from pytools.obj_array import flat_obj_array
 
 import grudge.op as op
 
+import logging
+logger = logging.getLogger(__name__)
 
-def main(write_output=False, order=4):
+
+def main(ctx_factory, dim=2, order=4, visualize=False):
     cl_ctx = cl.create_some_context()
     queue = cl.CommandQueue(cl_ctx)
     actx = PyOpenCLArrayContext(
@@ -57,14 +60,13 @@ def main(write_output=False, order=4):
     mesh_dist = MPIMeshDistributor(comm)
 
     if mesh_dist.is_mananger_rank():
-        dims = 2
         from meshmode.mesh.generation import generate_regular_rect_mesh
         mesh = generate_regular_rect_mesh(
-                a=(-0.5,)*dims,
-                b=(0.5,)*dims,
-                nelements_per_axis=(16,)*dims)
+                a=(-0.5,)*dim,
+                b=(0.5,)*dim,
+                nelements_per_axis=(16,)*dim)
 
-        print("%d elements" % mesh.nelements)
+        logger.info("%d elements", mesh.nelements)
 
         part_per_element = get_partition_by_pymetis(mesh, num_parts)
 
@@ -78,11 +80,6 @@ def main(write_output=False, order=4):
     dcoll = DiscretizationCollection(actx, local_mesh, order=order,
             mpi_communicator=comm)
 
-    if local_mesh.dim == 2:
-        dt = 0.04
-    elif local_mesh.dim == 3:
-        dt = 0.02
-
     def source_f(actx, dcoll, t=0):
         source_center = np.array([0.1, 0.22, 0.33])[:dcoll.dim]
         source_width = 0.05
@@ -117,8 +114,8 @@ def main(write_output=False, order=4):
         [dcoll.zeros(actx) for i in range(dcoll.dim)]
     )
 
-    # FIXME
-    # dt = wave_op.estimate_rk4_timestep(dcoll, fields=fields)
+    dt_scaling_const = 2/3
+    dt = dt_scaling_const * wave_op.estimate_rk4_timestep(dcoll, fields=fields)
 
     wave_op.check_bc_coverage(local_mesh)
 
@@ -128,10 +125,10 @@ def main(write_output=False, order=4):
     dt_stepper = set_up_rk4("w", dt, fields, rhs)
 
     final_t = 10
-    nsteps = int(final_t/dt)
+    nsteps = int(final_t/dt) + 1
 
     if comm.rank == 0:
-        print("dt=%g nsteps=%d" % (dt, nsteps))
+        logger.info("dt=%g nsteps=%d", dt, nsteps)
 
     from grudge.shortcuts import make_visualizer
     vis = make_visualizer(dcoll)
@@ -144,7 +141,7 @@ def main(write_output=False, order=4):
     from time import time
     t_last_step = time()
 
-    if write_output:
+    if visualize:
         u = fields[0]
         v = fields[1:]
         vis.write_parallel_vtk_file(
@@ -163,9 +160,10 @@ def main(write_output=False, order=4):
             step += 1
 
             if step % 10 == 0:
-                print(f"step: {step} t: {time()-t_last_step} "
-                        f"L2: {norm(u=event.state_component[0])}")
-                if write_output:
+                logger.info(f"rank: {comm.rank} step: {step} "
+                            f"t: {time()-t_last_step} "
+                            f"L2: {norm(u=event.state_component[0])}")
+                if visualize:
                     vis.write_parallel_vtk_file(
                         comm,
                         f"fld-wave-min-mpi-{{rank:03d}}-{step:04d}.vtu",
@@ -182,4 +180,16 @@ def main(write_output=False, order=4):
 
 
 if __name__ == "__main__":
-    main()
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--dim", default=2, type=int)
+    parser.add_argument("--order", default=4, type=int)
+    parser.add_argument("--visualize", action="store_true")
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.INFO)
+    main(cl.create_some_context,
+         dim=args.dim,
+         order=args.order,
+         visualize=args.visualize)
diff --git a/examples/wave/wave-min.py b/examples/wave/wave-min.py
index a9947483b8bb8918c31972ebb519c9d9fedf613e..7b18d3dfaec68ab38af936c45f395bd10ad29a75 100644
--- a/examples/wave/wave-min.py
+++ b/examples/wave/wave-min.py
@@ -39,28 +39,25 @@ from pytools.obj_array import flat_obj_array
 
 import grudge.op as op
 
+import logging
+logger = logging.getLogger(__name__)
 
-def main(write_output=False, order=4):
-    cl_ctx = cl.create_some_context()
+
+def main(ctx_factory, dim=2, order=4, visualize=False):
+    cl_ctx = ctx_factory()
     queue = cl.CommandQueue(cl_ctx)
     actx = PyOpenCLArrayContext(
         queue,
         allocator=cl_tools.MemoryPool(cl_tools.ImmediateAllocator(queue))
     )
 
-    dims = 2
     from meshmode.mesh.generation import generate_regular_rect_mesh
     mesh = generate_regular_rect_mesh(
-            a=(-0.5,)*dims,
-            b=(0.5,)*dims,
-            nelements_per_axis=(16,)*dims)
-
-    if mesh.dim == 2:
-        dt = 0.04
-    elif mesh.dim == 3:
-        dt = 0.02
+            a=(-0.5,)*dim,
+            b=(0.5,)*dim,
+            nelements_per_axis=(16,)*dim)
 
-    print("%d elements" % mesh.nelements)
+    logger.info("%d elements", mesh.nelements)
 
     dcoll = DiscretizationCollection(actx, mesh, order=order)
 
@@ -98,8 +95,8 @@ def main(write_output=False, order=4):
         [dcoll.zeros(actx) for i in range(dcoll.dim)]
     )
 
-    # FIXME
-    # dt = wave_op.estimate_rk4_timestep(dcoll, fields=fields)
+    dt_scaling_const = 2/3
+    dt = dt_scaling_const * wave_op.estimate_rk4_timestep(dcoll, fields=fields)
 
     wave_op.check_bc_coverage(mesh)
 
@@ -109,8 +106,9 @@ def main(write_output=False, order=4):
     dt_stepper = set_up_rk4("w", dt, fields, rhs)
 
     final_t = 10
-    nsteps = int(final_t/dt)
-    print("dt=%g nsteps=%d" % (dt, nsteps))
+    nsteps = int(final_t/dt) + 1
+
+    logger.info("dt=%g nsteps=%d", dt, nsteps)
 
     from grudge.shortcuts import make_visualizer
     vis = make_visualizer(dcoll)
@@ -123,7 +121,7 @@ def main(write_output=False, order=4):
     from time import time
     t_last_step = time()
 
-    if write_output:
+    if visualize:
         u = fields[0]
         v = fields[1:]
         vis.write_vtk_file(
@@ -141,9 +139,9 @@ def main(write_output=False, order=4):
             step += 1
 
             if step % 10 == 0:
-                print(f"step: {step} t: {time()-t_last_step} "
-                      f"L2: {norm(u=event.state_component[0])}")
-                if write_output:
+                logger.info(f"step: {step} t: {time()-t_last_step} "
+                            f"L2: {norm(u=event.state_component[0])}")
+                if visualize:
                     vis.write_vtk_file(
                         f"fld-wave-min-{step:04d}.vtu",
                         [
@@ -159,4 +157,16 @@ def main(write_output=False, order=4):
 
 
 if __name__ == "__main__":
-    main()
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--dim", default=2, type=int)
+    parser.add_argument("--order", default=4, type=int)
+    parser.add_argument("--visualize", action="store_true")
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.INFO)
+    main(cl.create_some_context,
+         dim=args.dim,
+         order=args.order,
+         visualize=args.visualize)
diff --git a/examples/wave/wave-op-mpi.py b/examples/wave/wave-op-mpi.py
index 55253d17e04a723cbcec75e39b54856e46967aa5..3fb49d7d1b65eb8d9caa5ac6a5d68c79625c5a94 100644
--- a/examples/wave/wave-op-mpi.py
+++ b/examples/wave/wave-op-mpi.py
@@ -42,6 +42,9 @@ from grudge.shortcuts import make_visualizer
 
 import grudge.op as op
 
+import logging
+logger = logging.getLogger(__name__)
+
 from mpi4py import MPI
 
 
@@ -111,6 +114,23 @@ def rk4_step(y, t, h, f):
     return y + h/6*(k1 + 2*k2 + 2*k3 + k4)
 
 
+def estimate_rk4_timestep(dcoll, c):
+    from grudge.dt_utils import (dt_non_geometric_factor,
+                                 dt_geometric_factors)
+
+    # FIXME: We should make the nodal reductions respect MPI.
+    # See: https://github.com/inducer/grudge/issues/112 and
+    # https://github.com/inducer/grudge/issues/113
+    dt_factor = (dt_non_geometric_factor(dcoll)
+                 * op.nodal_min(dcoll, "vol", dt_geometric_factors(dcoll)))
+
+    mpi_comm = dcoll.mpi_communicator
+    if mpi_comm is None:
+        return dt_factor * (1 / c)
+
+    return (1 / c) * mpi_comm.allreduce(dt_factor, op=MPI.MIN)
+
+
 def bump(actx, dcoll, t=0):
     source_center = np.array([0.2, 0.35, 0.1])[:dcoll.dim]
     source_width = 0.05
@@ -129,8 +149,8 @@ def bump(actx, dcoll, t=0):
             / source_width**2))
 
 
-def main(write_output=False):
-    cl_ctx = cl.create_some_context()
+def main(ctx_factory, dim=2, order=3, visualize=False):
+    cl_ctx = ctx_factory()
     queue = cl.CommandQueue(cl_ctx)
     actx = PyOpenCLArrayContext(
         queue,
@@ -143,7 +163,6 @@ def main(write_output=False):
     from meshmode.distributed import MPIMeshDistributor, get_partition_by_pymetis
     mesh_dist = MPIMeshDistributor(comm)
 
-    dim = 2
     nel_1d = 16
 
     if mesh_dist.is_mananger_rank():
@@ -153,7 +172,7 @@ def main(write_output=False):
                 b=(0.5,)*dim,
                 nelements_per_axis=(nel_1d,)*dim)
 
-        print("%d elements" % mesh.nelements)
+        logger.info("%d elements", mesh.nelements)
 
         part_per_element = get_partition_by_pymetis(mesh, num_parts)
 
@@ -164,29 +183,24 @@ def main(write_output=False):
     else:
         local_mesh = mesh_dist.receive_mesh_part()
 
-    order = 3
-
     dcoll = DiscretizationCollection(actx, local_mesh, order=order,
                     mpi_communicator=comm)
 
-    if dim == 2:
-        # no deep meaning here, just a fudge factor
-        dt = 0.7/(nel_1d*order**2)
-    elif dim == 3:
-        # no deep meaning here, just a fudge factor
-        dt = 0.45/(nel_1d*order**2)
-    else:
-        raise ValueError("don't have a stable time step guesstimate")
-
     fields = flat_obj_array(
             bump(actx, dcoll),
             [dcoll.zeros(actx) for i in range(dcoll.dim)]
             )
 
+    c = 1
+    dt_scaling_const = 0.45
+    dt = dt_scaling_const * estimate_rk4_timestep(dcoll, c)
+
     vis = make_visualizer(dcoll)
 
     def rhs(t, w):
-        return wave_operator(dcoll, c=1, w=w)
+        return wave_operator(dcoll, c=c, w=w)
+
+    logger.info("dt = %g", dt)
 
     t = 0
     t_final = 3
@@ -195,13 +209,13 @@ def main(write_output=False):
         fields = rk4_step(fields, t, dt, rhs)
 
         if istep % 10 == 0:
-            if comm.rank == 0:
-                print(f"step: {istep} t: {t} "
-                      f"L2: {op.norm(dcoll, fields[0], 2)} "
-                      f"Linf: {op.norm(dcoll, fields[0], np.inf)} "
-                      f"sol max: {op.nodal_max(dcoll, 'vol', fields[0])} "
-                      f"sol min: {op.nodal_min(dcoll, 'vol', fields[0])}")
-            if write_output:
+            logger.info(f"rank: {comm.rank} "
+                        f"step: {istep} t: {t} "
+                        f"L2: {op.norm(dcoll, fields[0], 2)} "
+                        f"Linf: {op.norm(dcoll, fields[0], np.inf)} "
+                        f"sol max: {op.nodal_max(dcoll, 'vol', fields[0])} "
+                        f"sol min: {op.nodal_min(dcoll, 'vol', fields[0])}")
+            if visualize:
                 vis.write_parallel_vtk_file(
                     comm,
                     f"fld-wave-eager-mpi-{{rank:03d}}-{istep:04d}.vtu",
@@ -220,6 +234,18 @@ def main(write_output=False):
 
 
 if __name__ == "__main__":
-    main()
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--dim", default=2, type=int)
+    parser.add_argument("--order", default=3, type=int)
+    parser.add_argument("--visualize", action="store_true")
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.INFO)
+    main(cl.create_some_context,
+         dim=args.dim,
+         order=args.order,
+         visualize=args.visualize)
 
 # vim: foldmethod=marker
diff --git a/examples/wave/wave-op-var-velocity.py b/examples/wave/wave-op-var-velocity.py
index 7f842b809484444b64afe2b1aa7d0262e62dfcf5..e48c8d7141285b40d0a0935c032449e7bbe5a2a4 100644
--- a/examples/wave/wave-op-var-velocity.py
+++ b/examples/wave/wave-op-var-velocity.py
@@ -43,6 +43,9 @@ from grudge.shortcuts import make_visualizer
 
 import grudge.op as op
 
+import logging
+logger = logging.getLogger(__name__)
+
 
 # {{{ wave equation bits
 
@@ -126,6 +129,16 @@ def rk4_step(y, t, h, f):
     return y + h/6*(k1 + 2*k2 + 2*k3 + k4)
 
 
+def estimate_rk4_timestep(dcoll, c):
+    from grudge.dt_utils import (dt_non_geometric_factor,
+                                 dt_geometric_factors)
+
+    dt_factor = (dt_non_geometric_factor(dcoll)
+                 * op.nodal_min(dcoll, "vol", dt_geometric_factors(dcoll)))
+
+    return dt_factor * (1 / c)
+
+
 def bump(actx, dcoll, t=0, width=0.05, center=None):
     if center is None:
         center = np.array([0.2, 0.35, 0.1])
@@ -146,15 +159,14 @@ def bump(actx, dcoll, t=0, width=0.05, center=None):
             / width**2))
 
 
-def main(write_output=False):
-    cl_ctx = cl.create_some_context()
+def main(ctx_factory, dim=2, order=3, visualize=False):
+    cl_ctx = ctx_factory()
     queue = cl.CommandQueue(cl_ctx)
     actx = PyOpenCLArrayContext(
         queue,
         allocator=cl_tools.MemoryPool(cl_tools.ImmediateAllocator(queue))
     )
 
-    dim = 2
     nel_1d = 16
     from meshmode.mesh.generation import generate_regular_rect_mesh
     mesh = generate_regular_rect_mesh(
@@ -162,18 +174,7 @@ def main(write_output=False):
             b=(0.5,)*dim,
             nelements_per_axis=(nel_1d,)*dim)
 
-    order = 3
-
-    if dim == 2:
-        # no deep meaning here, just a fudge factor
-        dt = 0.75/(nel_1d*order**2)
-    elif dim == 3:
-        # no deep meaning here, just a fudge factor
-        dt = 0.45/(nel_1d*order**2)
-    else:
-        raise ValueError("don't have a stable time step guesstimate")
-
-    print("%d elements" % mesh.nelements)
+    logger.info("%d elements", mesh.nelements)
 
     from meshmode.discretization.poly_element import \
             QuadratureSimplexGroupFactory, \
@@ -188,6 +189,8 @@ def main(write_output=False):
 
     # bounded above by 1
     c = 0.2 + 0.8*bump(actx, dcoll, center=np.zeros(3), width=0.5)
+    dt_scaling_const = 0.5
+    dt = dt_scaling_const * estimate_rk4_timestep(dcoll, c=1)
 
     fields = flat_obj_array(
             bump(actx, dcoll, ),
@@ -199,6 +202,8 @@ def main(write_output=False):
     def rhs(t, w):
         return wave_operator(dcoll, c=c, w=w)
 
+    logger.info("dt = %g", dt)
+
     t = 0
     t_final = 3
     istep = 0
@@ -206,12 +211,12 @@ def main(write_output=False):
         fields = rk4_step(fields, t, dt, rhs)
 
         if istep % 10 == 0:
-            print(f"step: {istep} t: {t} "
-                  f"L2: {op.norm(dcoll, fields[0], 2)} "
-                  f"Linf: {op.norm(dcoll, fields[0], np.inf)} "
-                  f"sol max: {op.nodal_max(dcoll, 'vol', fields[0])} "
-                  f"sol min: {op.nodal_min(dcoll, 'vol', fields[0])}")
-            if write_output:
+            logger.info(f"step: {istep} t: {t} "
+                        f"L2: {op.norm(dcoll, fields[0], 2)} "
+                        f"Linf: {op.norm(dcoll, fields[0], np.inf)} "
+                        f"sol max: {op.nodal_max(dcoll, 'vol', fields[0])} "
+                        f"sol min: {op.nodal_min(dcoll, 'vol', fields[0])}")
+            if visualize:
                 vis.write_vtk_file(
                     f"fld-wave-eager-var-velocity-{istep:04d}.vtu",
                     [
@@ -230,6 +235,18 @@ def main(write_output=False):
 
 
 if __name__ == "__main__":
-    main()
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--dim", default=2, type=int)
+    parser.add_argument("--order", default=3, type=int)
+    parser.add_argument("--visualize", action="store_true")
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.INFO)
+    main(cl.create_some_context,
+         dim=args.dim,
+         order=args.order,
+         visualize=args.visualize)
 
 # vim: foldmethod=marker
diff --git a/examples/wave/wave-op.py b/examples/wave/wave-op.py
index 0a669a2270b31202f398b487e03ad96d25fe6f26..c0494d9f0196da5cb7d951a6950f82250a2e9aa6 100644
--- a/examples/wave/wave-op.py
+++ b/examples/wave/wave-op.py
@@ -42,6 +42,9 @@ from grudge.shortcuts import make_visualizer
 
 import grudge.op as op
 
+import logging
+logger = logging.getLogger(__name__)
+
 
 # {{{ wave equation bits
 
@@ -109,6 +112,16 @@ def rk4_step(y, t, h, f):
     return y + h/6*(k1 + 2*k2 + 2*k3 + k4)
 
 
+def estimate_rk4_timestep(dcoll, c):
+    from grudge.dt_utils import (dt_non_geometric_factor,
+                                 dt_geometric_factors)
+
+    dt_factor = (dt_non_geometric_factor(dcoll)
+                 * op.nodal_min(dcoll, "vol", dt_geometric_factors(dcoll)))
+
+    return dt_factor * (1 / c)
+
+
 def bump(actx, dcoll, t=0):
     source_center = np.array([0.2, 0.35, 0.1])[:dcoll.dim]
     source_width = 0.05
@@ -127,15 +140,14 @@ def bump(actx, dcoll, t=0):
             / source_width**2))
 
 
-def main(write_output=False):
-    cl_ctx = cl.create_some_context()
+def main(ctx_factory, dim=2, order=3, visualize=False):
+    cl_ctx = ctx_factory()
     queue = cl.CommandQueue(cl_ctx)
     actx = PyOpenCLArrayContext(
         queue,
         allocator=cl_tools.MemoryPool(cl_tools.ImmediateAllocator(queue))
     )
 
-    dim = 2
     nel_1d = 16
     from meshmode.mesh.generation import generate_regular_rect_mesh
     mesh = generate_regular_rect_mesh(
@@ -143,18 +155,7 @@ def main(write_output=False):
             b=(0.5,)*dim,
             nelements_per_axis=(nel_1d,)*dim)
 
-    order = 3
-
-    if dim == 2:
-        # no deep meaning here, just a fudge factor
-        dt = 0.7/(nel_1d*order**2)
-    elif dim == 3:
-        # no deep meaning here, just a fudge factor
-        dt = 0.45/(nel_1d*order**2)
-    else:
-        raise ValueError("don't have a stable time step guesstimate")
-
-    print("%d elements" % mesh.nelements)
+    logger.info("%d elements", mesh.nelements)
 
     dcoll = DiscretizationCollection(actx, mesh, order=order)
 
@@ -163,10 +164,16 @@ def main(write_output=False):
             [dcoll.zeros(actx) for i in range(dcoll.dim)]
             )
 
+    c = 1
+    dt_scaling_const = 0.45
+    dt = dt_scaling_const * estimate_rk4_timestep(dcoll, c)
+
     vis = make_visualizer(dcoll)
 
     def rhs(t, w):
-        return wave_operator(dcoll, c=1, w=w)
+        return wave_operator(dcoll, c=c, w=w)
+
+    logger.info("dt = %g", dt)
 
     t = 0
     t_final = 3
@@ -175,12 +182,12 @@ def main(write_output=False):
         fields = rk4_step(fields, t, dt, rhs)
 
         if istep % 10 == 0:
-            print(f"step: {istep} t: {t} "
-                  f"L2: {op.norm(dcoll, fields[0], 2)} "
-                  f"Linf: {op.norm(dcoll, fields[0], np.inf)} "
-                  f"sol max: {op.nodal_max(dcoll, 'vol', fields[0])} "
-                  f"sol min: {op.nodal_min(dcoll, 'vol', fields[0])}")
-            if write_output:
+            logger.info(f"step: {istep} t: {t} "
+                        f"L2: {op.norm(dcoll, fields[0], 2)} "
+                        f"Linf: {op.norm(dcoll, fields[0], np.inf)} "
+                        f"sol max: {op.nodal_max(dcoll, 'vol', fields[0])} "
+                        f"sol min: {op.nodal_min(dcoll, 'vol', fields[0])}")
+            if visualize:
                 vis.write_vtk_file(
                     f"fld-wave-eager-{istep:04d}.vtu",
                     [
@@ -198,6 +205,18 @@ def main(write_output=False):
 
 
 if __name__ == "__main__":
-    main()
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--dim", default=2, type=int)
+    parser.add_argument("--order", default=3, type=int)
+    parser.add_argument("--visualize", action="store_true")
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.INFO)
+    main(cl.create_some_context,
+         dim=args.dim,
+         order=args.order,
+         visualize=args.visualize)
 
 # vim: foldmethod=marker
diff --git a/grudge/dt_finding.py b/grudge/dt_finding.py
deleted file mode 100644
index fd5a3d8e2ddcc2d13170182656eb59d40a23b798..0000000000000000000000000000000000000000
--- a/grudge/dt_finding.py
+++ /dev/null
@@ -1,92 +0,0 @@
-"""Helpers for estimating a stable time step."""
-
-__copyright__ = "Copyright (C) 2015 Andreas Kloeckner"
-
-__license__ = """
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
-"""
-
-from pytools import memoize_on_first_arg
-from meshmode.discretization.poly_element import PolynomialWarpAndBlendElementGroup
-import numpy.linalg as la
-
-
-class WarpAndBlendTimestepInfo:
-    @staticmethod
-    def dt_non_geometric_factor(discr, grp):
-        if grp.dim == 1:
-            if grp.order == 0:
-                return 1
-            else:
-                unodes = grp.unit_nodes
-                return la.norm(unodes[0] - unodes[1]) * 0.85
-        else:
-            unodes = grp.unit_nodes
-            vertex_indices = grp.vertex_indices()
-            return 2 / 3 * \
-                    min(min(min(
-                        la.norm(unodes[face_node_index]-unodes[vertex_index])
-                        for vertex_index in vertex_indices
-                        if vertex_index != face_node_index)
-                        for face_node_index in face_indices)
-                        for face_indices in self.face_indices())
-
-    @staticmethod
-    def dt_geometric_factor(discr, grp):
-        if grp.dim == 1:
-            return abs(el.map.jacobian())
-
-        elif grp.dim == 2:
-            area = abs(2 * el.map.jacobian())
-            semiperimeter = sum(la.norm(vertices[vi1] - vertices[vi2])
-                    for vi1, vi2 in [(0, 1), (1, 2), (2, 0)])/2
-            return area / semiperimeter
-
-        elif grp.dim == 3:
-            result = abs(el.map.jacobian())/max(abs(fj) for fj in el.face_jacobians)
-            if grp.order in [1, 2]:
-                from warnings import warn
-                warn("cowardly halving timestep for order 1 and 2 tets "
-                        "to avoid CFL issues")
-                result /= 2
-
-            return result
-
-        else:
-            raise NotImplementedError("cannot estimate timestep for "
-                    "%d-dimensional elements" % grp.dim)
-
-
-_GROUP_TYPE_TO_INFO = {
-        PolynomialWarpAndBlendElementGroup: WarpAndBlendTimestepInfo
-        }
-
-
-@memoize_on_first_arg
-def dt_non_geometric_factor(discr):
-    return min(
-            _GROUP_TYPE_TO_INFO[type(grp)].dt_non_geometric_factor(discr, grp)
-            for grp in discr.groups)
-
-
-@memoize_on_first_arg
-def dt_geometric_factor(discr):
-    return min(
-            _GROUP_TYPE_TO_INFO[type(grp)].dt_geometric_factor(discr, grp)
-            for grp in discr.groups)
diff --git a/grudge/dt_utils.py b/grudge/dt_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..14c955595fc5ca41ddcd707066e0238865b273e5
--- /dev/null
+++ b/grudge/dt_utils.py
@@ -0,0 +1,177 @@
+"""Helper functions for estimating stable time steps for RKDG methods.
+
+.. autofunction:: dt_non_geometric_factor
+.. autofunction:: dt_geometric_factors
+"""
+
+__copyright__ = """
+Copyright (C) 2021 University of Illinois Board of Trustees
+"""
+
+__license__ = """
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+
+import numpy as np
+
+from arraycontext import FirstAxisIsElementsTag
+
+from grudge.dof_desc import DD_VOLUME, DOFDesc
+from grudge.discretization import DiscretizationCollection
+
+import grudge.op as op
+
+from meshmode.dof_array import DOFArray
+
+from pytools import memoize_on_first_arg
+
+
+@memoize_on_first_arg
+def dt_non_geometric_factor(dcoll: DiscretizationCollection, dd=None) -> float:
+    r"""Computes the non-geometric scale factor following [Hesthaven_2008]_,
+    section 6.4:
+
+    .. math::
+
+        c_{ng} = \operatorname{min}\left( \Delta r_i \right),
+
+    where :math:`\Delta r_i` denotes the distance between two distinct
+    nodes on the reference element.
+
+    :arg dd: a :class:`~grudge.dof_desc.DOFDesc`, or a value convertible to one.
+        Defaults to the base volume discretization if not provided.
+    :returns: a :class:`float` denoting the minimum node distance on the
+        reference element.
+    """
+    if dd is None:
+        dd = DD_VOLUME
+
+    discr = dcoll.discr_from_dd(dd)
+    min_delta_rs = []
+    for grp in discr.groups:
+        nodes = np.asarray(list(zip(*grp.unit_nodes)))
+        nnodes = grp.nunit_dofs
+
+        # NOTE: order 0 elements have 1 node located at the centroid of
+        # the reference element and is equidistant from each vertex
+        if grp.order == 0:
+            assert nnodes == 1
+            min_delta_rs.append(
+                np.linalg.norm(
+                    nodes[0] - grp.mesh_el_group.vertex_unit_coordinates()[0]
+                )
+            )
+        else:
+            min_delta_rs.append(
+                min(
+                    np.linalg.norm(nodes[i] - nodes[j])
+                    for i in range(nnodes) for j in range(nnodes) if i != j
+                )
+            )
+
+    # Return minimum over all element groups in the discretization
+    return min(min_delta_rs)
+
+
+@memoize_on_first_arg
+def dt_geometric_factors(
+        dcoll: DiscretizationCollection, dd=None) -> DOFArray:
+    r"""Computes a geometric scaling factor for each cell following [Hesthaven_2008]_,
+    section 6.4, defined as the inradius (radius of an inscribed circle/sphere).
+
+    Specifically, the inradius for each element is computed using the following
+    formula from [Shewchuk_2002]_, Table 1, for simplicial cells
+    (triangles/tetrahedra):
+
+    .. math::
+
+        r_D = \frac{d V}{\sum_{i=1}^{N_{faces}} F_i},
+
+    where :math:`d` is the topological dimension, :math:`V` is the cell volume,
+    and :math:`F_i` are the areas of each face of the cell.
+
+    :arg dd: a :class:`~grudge.dof_desc.DOFDesc`, or a value convertible to one.
+        Defaults to the base volume discretization if not provided.
+    :returns: a :class:`~meshmode.dof_array.DOFArray` containing the geometric
+        factors for each cell and at each nodal location.
+    """
+    from meshmode.discretization.poly_element import SimplexElementGroupBase
+
+    if dd is None:
+        dd = DD_VOLUME
+
+    actx = dcoll._setup_actx
+    volm_discr = dcoll.discr_from_dd(dd)
+
+    if any(not isinstance(grp, SimplexElementGroupBase)
+           for grp in volm_discr.groups):
+        raise NotImplementedError(
+            "Geometric factors are only implemented for simplex element groups"
+        )
+
+    cell_vols = abs(
+        op.elementwise_integral(
+            dcoll, dd, volm_discr.zeros(actx) + 1.0
+        )
+    )
+
+    if dcoll.dim == 1:
+        return cell_vols
+
+    dd_face = DOFDesc("all_faces", dd.discretization_tag)
+    face_discr = dcoll.discr_from_dd(dd_face)
+
+    # To get a single value for the total surface area of a cell, we
+    # take the sum over the averaged face areas on each face.
+    # NOTE: The face areas are the *same* at each face nodal location.
+    # This assumes there are the *same* number of face nodes on each face.
+    surface_areas = abs(
+        op.elementwise_integral(
+            dcoll, dd_face, face_discr.zeros(actx) + 1.0
+        )
+    )
+    surface_areas = DOFArray(
+        actx,
+        data=tuple(
+            actx.einsum("fej->e",
+                        face_ae_i.reshape(
+                            vgrp.mesh_el_group.nfaces,
+                            vgrp.nelements,
+                            afgrp.nunit_dofs
+                        ),
+                        tagged=(FirstAxisIsElementsTag(),)) / afgrp.nunit_dofs
+
+            for vgrp, afgrp, face_ae_i in zip(volm_discr.groups,
+                                              face_discr.groups,
+                                              surface_areas)
+        )
+    )
+
+    return DOFArray(
+        actx,
+        data=tuple(
+            actx.einsum("e,ei->ei",
+                        1/sae_i,
+                        cv_i,
+                        tagged=(FirstAxisIsElementsTag(),)) * dcoll.dim
+
+            for cv_i, sae_i in zip(cell_vols, surface_areas)
+        )
+    )
diff --git a/grudge/models/__init__.py b/grudge/models/__init__.py
index bc780365cd89169aae11358288b17d57825c61cd..579a86017a401345d53c3ab1594e2bf56cb09116 100644
--- a/grudge/models/__init__.py
+++ b/grudge/models/__init__.py
@@ -22,8 +22,10 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
 """
 
+from abc import ABCMeta, abstractmethod
 
-class Operator:
+
+class Operator(metaclass=ABCMeta):
     """A base class for Discontinuous Galerkin operators.
 
     You may derive your own operators from this class, but, at present
@@ -36,16 +38,31 @@ class Operator:
 class HyperbolicOperator(Operator):
     """A base class for hyperbolic Discontinuous Galerkin operators."""
 
-    def max_eigenvalue(self, t, fields, discr):
-        raise NotImplementedError
-
-    def estimate_rk4_timestep(self, discr, t=None, fields=None):
-        """Estimate the largest stable timestep for an RK4 method.
+    @abstractmethod
+    def max_characteristic_velocity(self, t, fields, dcoll):
+        """Return an upper bound on the characteristic
+        velocities of the operator.
         """
 
-        from grudge.dt_finding import (
-                dt_non_geometric_factor,
-                dt_geometric_factor)
-        return 1 / self.max_eigenvalue(t, fields, discr) \
-                * (dt_non_geometric_factor(discr)
-                * dt_geometric_factor(discr))
+    def estimate_rk4_timestep(self, dcoll, t=None, fields=None):
+        """Estimate the largest stable timestep for an RK4 method."""
+        from grudge.dt_utils import (dt_non_geometric_factor,
+                                     dt_geometric_factors)
+        import grudge.op as op
+
+        # FIXME: We should make the nodal reductions respect MPI.
+        # See: https://github.com/inducer/grudge/issues/112 and
+        # https://github.com/inducer/grudge/issues/113
+        max_lambda = self.max_characteristic_velocity(t, fields, dcoll)
+        dt_factor = \
+            (dt_non_geometric_factor(dcoll)
+             * op.nodal_min(dcoll, "vol", dt_geometric_factors(dcoll)))
+
+        mpi_comm = dcoll.mpi_communicator
+        if mpi_comm is None:
+            return dt_factor * (1 / max_lambda)
+
+        # NOTE: Do NOT move this import; only import MPI when we need it
+        from mpi4py import MPI
+
+        return mpi_comm.allreduce(dt_factor, op=MPI.MIN) * (1 / max_lambda)
diff --git a/grudge/models/advection.py b/grudge/models/advection.py
index 37ed429be37250f102812127c5eab28a7a495b91..2469498aef61fff823d794123992f586cfa1cb39 100644
--- a/grudge/models/advection.py
+++ b/grudge/models/advection.py
@@ -84,8 +84,8 @@ class AdvectionOperatorBase(HyperbolicOperator):
     def weak_flux(self, u_tpair):
         return advection_weak_flux(self.dcoll, self.flux_type, u_tpair, self.v)
 
-    def max_eigenvalue(self, t=None, fields=None, discr=None):
-        return np.linalg.norm(self.v)
+    def max_characteristic_velocity(self, t=None, fields=None, dcoll=None):
+        return op.norm(self.dcoll, self.v, 2)
 
 
 class StrongAdvectionOperator(AdvectionOperatorBase):
diff --git a/grudge/models/em.py b/grudge/models/em.py
index 44ec4a6edeccb7abc2d0a3bfe14f0673f8db8e14..38b73d3d2a598acef851586cd74c982761efc102 100644
--- a/grudge/models/em.py
+++ b/grudge/models/em.py
@@ -38,6 +38,7 @@ from pytools import memoize_method
 from pytools.obj_array import flat_obj_array, make_obj_array
 
 import grudge.op as op
+import numpy as np
 
 
 class MaxwellOperator(HyperbolicOperator):
@@ -332,27 +333,14 @@ class MaxwellOperator(HyperbolicOperator):
         """
         return 6*(True,)
 
-    def max_eigenvalue_expr(self):
-        """Return the largest eigenvalue of Maxwell's equations as a hyperbolic
-        system.
-        """
-        from math import sqrt
+    def max_characteristic_velocity(self, t, fields=None, dcoll=None):
         if self.fixed_material:
-            return 1/sqrt(self.epsilon*self.mu)  # a number
+            return 1/np.sqrt(self.epsilon*self.mu)  # a number
         else:
             actx = self.dcoll._setup_actx
             return op.nodal_max(self.dcoll, "vol",
                                 1 / actx.np.sqrt(self.epsilon * self.mu))
 
-    def max_eigenvalue(self, t, fields=None, discr=None, context=None):
-        if context is None:
-            context = {}
-        if self.fixed_material:
-            return self.max_eigenvalue_expr()
-        else:
-            raise ValueError("max_eigenvalue is no longer supported for "
-                    "variable-coefficient problems--use max_eigenvalue_expr")
-
     def check_bc_coverage(self, mesh):
         from meshmode.mesh import check_bc_coverage
         check_bc_coverage(mesh, [
@@ -439,15 +427,14 @@ def get_rectangular_cavity_mode(actx, nodes, t, E_0, mode_indices):  # noqa: N80
     dims = len(mode_indices)
     if dims != 2 and dims != 3:
         raise ValueError("Improper mode_indices dimensions")
-    import numpy
 
-    factors = [n*numpy.pi for n in mode_indices]
+    factors = [n*np.pi for n in mode_indices]
 
     kx, ky = factors[0:2]
     if dims == 3:
         kz = factors[2]
 
-    omega = numpy.sqrt(sum(f**2 for f in factors))
+    omega = np.sqrt(sum(f**2 for f in factors))
 
     x = nodes[0]
     y = nodes[1]
@@ -469,15 +456,15 @@ def get_rectangular_cavity_mode(actx, nodes, t, E_0, mode_indices):  # noqa: N80
         result = flat_obj_array(
             zeros,
             zeros,
-            actx.np.sin(kx * x) * actx.np.sin(ky * y) * actx.np.cos(tfac),  # ez
+            actx.np.sin(kx * x) * actx.np.sin(ky * y) * np.cos(tfac),  # ez
             (-ky * actx.np.sin(kx * x) * actx.np.cos(ky * y)
-             * actx.np.sin(tfac) / omega),  # hx
+             * np.sin(tfac) / omega),  # hx
             (kx * actx.np.cos(kx * x) * actx.np.sin(ky * y)
-             * actx.np.sin(tfac) / omega),  # hy
+             * np.sin(tfac) / omega),  # hy
             zeros,
         )
     else:
-        tdep = numpy.exp(-1j * omega * t)
+        tdep = np.exp(-1j * omega * t)
 
         gamma_squared = ky**2 + kx**2
         result = flat_obj_array(
diff --git a/grudge/models/wave.py b/grudge/models/wave.py
index ed0d40198397f070f9fba35300a0b932e99a1497..e101fd0443918c3cbb3b5f530a002ff2437578fd 100644
--- a/grudge/models/wave.py
+++ b/grudge/models/wave.py
@@ -175,7 +175,7 @@ class WeakWaveOperator(HyperbolicOperator):
             self.neumann_tag,
             self.radiation_tag])
 
-    def max_eigenvalue(self, t, fields=None, discr=None):
+    def max_characteristic_velocity(self, t, fields=None, dcoll=None):
         return abs(self.c)
 
 
@@ -331,7 +331,7 @@ class VariableCoefficientWeakWaveOperator(HyperbolicOperator):
             self.neumann_tag,
             self.radiation_tag])
 
-    def max_eigenvalue(self, t, fields=None, discr=None):
+    def max_characteristic_velocity(self, t, fields=None, dcoll=None):
         actx = self.dcoll._setup_actx
         return op.nodal_max(self.dcoll, "vol", actx.np.fabs(self.c))
 
diff --git a/grudge/op.py b/grudge/op.py
index c784537a9ac10b8e4ee06b106135f6dca7e2df54..200c89324a6ba82d2c33f55e0c7bd79a80b48368 100644
--- a/grudge/op.py
+++ b/grudge/op.py
@@ -55,6 +55,7 @@ Elementwise reductions
 ----------------------
 
 .. autofunction:: elementwise_sum
+.. autofunction:: elementwise_integral
 """
 
 __copyright__ = """
@@ -795,7 +796,7 @@ def inverse_mass(dcoll: DiscretizationCollection, vec):
     scaling factor (see :func:`grudge.geometry.area_element`).
 
     For non-affine :math:`E`, :math:`J^e` is not constant. In this case, a
-    weight-adjusted approximation is used instead:
+    weight-adjusted approximation is used instead following [Chan_2016]_:
 
     .. math::
 
@@ -1051,8 +1052,9 @@ def norm(dcoll: DiscretizationCollection, vec, p, dd=None):
         Defaults to the base volume discretization if not provided.
     :returns: a nonegative scalar denoting the norm.
     """
-    # FIXME: Make MPI-aware
-    # NOTE: Must retain a way to do local reductions
+    # FIXME: We should make the nodal reductions respect MPI.
+    # See: https://github.com/inducer/grudge/issues/112 and
+    # https://github.com/inducer/grudge/issues/113
 
     if dd is None:
         dd = dof_desc.DD_VOLUME
@@ -1082,8 +1084,9 @@ def nodal_sum(dcoll: DiscretizationCollection, dd, vec):
     :arg vec: a :class:`~meshmode.dof_array.DOFArray`.
     :returns: a scalar denoting the nodal sum.
     """
-    # FIXME: Make MPI-aware
-    # NOTE: Must retain a way to do local reductions
+    # FIXME: We should make the nodal reductions respect MPI.
+    # See: https://github.com/inducer/grudge/issues/112 and
+    # https://github.com/inducer/grudge/issues/113
     actx = vec.array_context
     return sum([actx.np.sum(grp_ary) for grp_ary in vec])
 
@@ -1096,8 +1099,9 @@ def nodal_min(dcoll: DiscretizationCollection, dd, vec):
     :arg vec: a :class:`~meshmode.dof_array.DOFArray`.
     :returns: a scalar denoting the nodal minimum.
     """
-    # FIXME: Make MPI-aware
-    # NOTE: Must retain a way to do local reductions
+    # FIXME: We should make the nodal reductions respect MPI.
+    # See: https://github.com/inducer/grudge/issues/112 and
+    # https://github.com/inducer/grudge/issues/113
     actx = vec.array_context
     return reduce(lambda acc, grp_ary: actx.np.minimum(acc, actx.np.min(grp_ary)),
                   vec, np.inf)
@@ -1111,8 +1115,9 @@ def nodal_max(dcoll: DiscretizationCollection, dd, vec):
     :arg vec: a :class:`~meshmode.dof_array.DOFArray`.
     :returns: a scalar denoting the nodal maximum.
     """
-    # FIXME: Make MPI-aware
-    # NOTE: Must retain a way to do local reductions
+    # FIXME: We should make the nodal reductions respect MPI.
+    # See: https://github.com/inducer/grudge/issues/112 and
+    # https://github.com/inducer/grudge/issues/113
     actx = vec.array_context
     return reduce(lambda acc, grp_ary: actx.np.maximum(acc, actx.np.max(grp_ary)),
                   vec, -np.inf)
@@ -1185,7 +1190,6 @@ def elementwise_sum(dcoll: DiscretizationCollection, *args):
         )
 
     actx = vec.array_context
-    vec = project(dcoll, "vol", dd, vec)
 
     return DOFArray(
         actx,
@@ -1198,6 +1202,26 @@ def elementwise_sum(dcoll: DiscretizationCollection, *args):
         )
     )
 
+
+def elementwise_integral(dcoll: DiscretizationCollection, dd, vec):
+    """Numerically integrates a function represented by a
+    :class:`~meshmode.dof_array.DOFArray` of degrees of freedom in
+    each element of a discretization, given by *dd*.
+
+    :arg dd: a :class:`~grudge.dof_desc.DOFDesc`, or a value convertible to one.
+    :arg vec: a :class:`~meshmode.dof_array.DOFArray`
+    :returns: a :class:`~meshmode.dof_array.DOFArray` containing the
+        elementwise integral of *vec*, represented as a constant function on each
+        cell.
+    """
+
+    dd = dof_desc.as_dofdesc(dd)
+
+    ones = dcoll.discr_from_dd(dd).zeros(vec.array_context) + 1.0
+    return elementwise_sum(
+        dcoll, dd, vec * _apply_mass_operator(dcoll, dd, dd, ones)
+    )
+
 # }}}
 
 
diff --git a/test/test_dt_utils.py b/test/test_dt_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..9db4f15e668df5c0cb678cb011496268ef8881ef
--- /dev/null
+++ b/test/test_dt_utils.py
@@ -0,0 +1,125 @@
+__copyright__ = """
+Copyright (C) 2021 University of Illinois Board of Trustees
+"""
+
+__license__ = """
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+import numpy as np
+
+from arraycontext import (  # noqa
+    pytest_generate_tests_for_pyopencl_array_context
+    as pytest_generate_tests
+)
+
+from grudge import DiscretizationCollection
+
+import grudge.op as op
+
+import pytest
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.mark.parametrize("name", ["interval", "box2d", "box3d"])
+def test_geometric_factors_regular_refinement(actx_factory, name):
+    from grudge.dt_utils import dt_geometric_factors
+
+    actx = actx_factory()
+
+    # {{{ cases
+
+    if name == "interval":
+        from mesh_data import BoxMeshBuilder
+        builder = BoxMeshBuilder(ambient_dim=1)
+    elif name == "box2d":
+        from mesh_data import BoxMeshBuilder
+        builder = BoxMeshBuilder(ambient_dim=2)
+    elif name == "box3d":
+        from mesh_data import BoxMeshBuilder
+        builder = BoxMeshBuilder(ambient_dim=3)
+    else:
+        raise ValueError("unknown geometry name: %s" % name)
+
+    # }}}
+
+    min_factors = []
+    for resolution in builder.resolutions:
+        mesh = builder.get_mesh(resolution, builder.mesh_order)
+        dcoll = DiscretizationCollection(actx, mesh, order=builder.order)
+        min_factors.append(op.nodal_min(dcoll, "vol", dt_geometric_factors(dcoll)))
+
+    # Resolution is doubled each refinement, so the ratio of consecutive
+    # geometric factors should satisfy: gfi+1 / gfi = 2
+    min_factors = np.asarray(min_factors)
+    ratios = min_factors[:-1] / min_factors[1:]
+    assert np.all(np.isclose(ratios, 2))
+
+
+@pytest.mark.parametrize("name", ["interval", "box2d", "box3d"])
+def test_non_geometric_factors(actx_factory, name):
+    from grudge.dt_utils import dt_non_geometric_factor
+
+    actx = actx_factory()
+
+    # {{{ cases
+
+    if name == "interval":
+        from mesh_data import BoxMeshBuilder
+        builder = BoxMeshBuilder(ambient_dim=1)
+    elif name == "box2d":
+        from mesh_data import BoxMeshBuilder
+        builder = BoxMeshBuilder(ambient_dim=2)
+    elif name == "box3d":
+        from mesh_data import BoxMeshBuilder
+        builder = BoxMeshBuilder(ambient_dim=3)
+    else:
+        raise ValueError("unknown geometry name: %s" % name)
+
+    # }}}
+
+    factors = []
+    degrees = list(range(1, 8))
+    for degree in degrees:
+        mesh = builder.get_mesh(1, degree)
+        dcoll = DiscretizationCollection(actx, mesh, order=degree)
+        factors.append(dt_non_geometric_factor(dcoll))
+
+    # Crude estimate, factors should behave like 1/N**2
+    factors = np.asarray(factors)
+    lower_bounds = 1/(np.asarray(degrees)**2)
+    upper_bounds = 6.295*lower_bounds
+
+    assert all(lower_bounds <= factors)
+    assert all(factors <= upper_bounds)
+
+
+# You can test individual routines by typing
+# $ python test_grudge.py 'test_routine()'
+
+if __name__ == "__main__":
+    import sys
+    if len(sys.argv) > 1:
+        exec(sys.argv[1])
+    else:
+        pytest.main([__file__])
diff --git a/test/test_grudge.py b/test/test_grudge.py
index 25c2e469f1d2fb8858d4d995b0cc1f84c332ca1e..4963642ecf46cb300f719686f87b7e63049afd73 100644
--- a/test/test_grudge.py
+++ b/test/test_grudge.py
@@ -859,7 +859,7 @@ def test_convergence_maxwell(actx_factory,  order):
         def rhs(t, w):
             return maxwell_operator.operator(t, w)
 
-        dt = 0.002
+        dt = maxwell_operator.estimate_rk4_timestep(dcoll)
         final_t = dt * 5
         nsteps = int(final_t/dt)