From aaccb9661e140005cb274c4b07a901bccd7acf76 Mon Sep 17 00:00:00 2001
From: Ellis Hoag <ellis.sparky.hoag@gmail.com>
Date: Thu, 10 May 2018 15:29:17 -0500
Subject: [PATCH] Add benchmarking code for MPI

---
 examples/benchmark_grudge/benchmark_mpi.py | 144 +++++++++++++++++++++
 examples/benchmark_grudge/run_benchmark.sh | 129 ++++++++++++++++++++
 2 files changed, 273 insertions(+)
 create mode 100644 examples/benchmark_grudge/benchmark_mpi.py
 create mode 100755 examples/benchmark_grudge/run_benchmark.sh

diff --git a/examples/benchmark_grudge/benchmark_mpi.py b/examples/benchmark_grudge/benchmark_mpi.py
new file mode 100644
index 0000000..3861232
--- /dev/null
+++ b/examples/benchmark_grudge/benchmark_mpi.py
@@ -0,0 +1,144 @@
+import os
+import numpy as np
+import pyopencl as cl
+
+from grudge import sym, bind, DGDiscretizationWithBoundaries
+from grudge.shortcuts import set_up_rk4
+
+
+def simple_wave_entrypoint(dim=2, num_elems=256, order=4, num_steps=30,
+                           log_filename="grudge.dat"):
+    cl_ctx = cl.create_some_context()
+    queue = cl.CommandQueue(cl_ctx)
+
+    from mpi4py import MPI
+    comm = MPI.COMM_WORLD
+    num_parts = comm.Get_size()
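+    # n is the number of points per axis; total mesh size scales with num_elems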
+    n = int(num_elems ** (1./dim))
+
+    from meshmode.distributed import MPIMeshDistributor
+    mesh_dist = MPIMeshDistributor(comm)
+
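+    # NOTE: "mananger" is the spelling used by meshmode's API.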
+    if mesh_dist.is_mananger_rank():
+        from meshmode.mesh.generation import generate_regular_rect_mesh
+        mesh = generate_regular_rect_mesh(a=(-0.5,)*dim,
+                                          b=(0.5,)*dim,
+                                          n=(n,)*dim)
+
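+        # Partition the mesh across ranks by handing its nodal adjacency
+        # graph to METIS.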
+        from pymetis import part_graph
+        _, p = part_graph(num_parts,
+                          xadj=mesh.nodal_adjacency.neighbors_starts.tolist(),
+                          adjncy=mesh.nodal_adjacency.neighbors.tolist())
+        part_per_element = np.array(p)
+
+        local_mesh = mesh_dist.send_mesh_parts(mesh, part_per_element, num_parts)
+    else:
+        local_mesh = mesh_dist.receive_mesh_part()
+
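+    # Passing mpi_communicator enables distributed execution; inter-rank
+    # boundary exchanges appear as RankDataSwapAssign instructions.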
+    vol_discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order,
+                                               mpi_communicator=comm)
+
+    source_center = np.array([0.1, 0.22, 0.33])[:local_mesh.dim]
+    source_width = 0.05
+    source_omega = 3
+
+    sym_x = sym.nodes(local_mesh.dim)
+    sym_source_center_dist = sym_x - source_center
+    sym_t = sym.ScalarVariable("t")
+
+    from grudge.models.wave import StrongWaveOperator
+    from meshmode.mesh import BTAG_ALL, BTAG_NONE
+    op = StrongWaveOperator(-0.1, vol_discr.dim,
+            source_f=(
+                sym.sin(source_omega*sym_t)
+                * sym.exp(
+                    -np.dot(sym_source_center_dist, sym_source_center_dist)
+                    / source_width**2)),
+            dirichlet_tag=BTAG_NONE,
+            neumann_tag=BTAG_NONE,
+            radiation_tag=BTAG_ALL,
+            flux_type="upwind")
+
+    from pytools.obj_array import join_fields
+    fields = join_fields(vol_discr.zeros(queue),
+            [vol_discr.zeros(queue) for i in range(vol_discr.dim)])
+
+    from pytools.log import LogManager, \
+            add_general_quantities, \
+            add_run_info, \
+            IntervalTimer, EventCounter
+    # NOTE: LogManager hangs when the log file is in a shared directory.
+    logmgr = LogManager(log_filename, "w", comm)
+    add_run_info(logmgr)
+    add_general_quantities(logmgr)
+    log_quantities =\
+        {"rank_data_swap_timer": IntervalTimer("rank_data_swap_timer",
+                        "Time spent evaluating RankDataSwapAssign"),
+        "rank_data_swap_counter": EventCounter("rank_data_swap_counter",
+                        "Number of RankDataSwapAssign instructions evaluated"),
+        "exec_timer": IntervalTimer("exec_timer",
+                        "Total time spent executing instructions"),
+        "insn_eval_timer": IntervalTimer("insn_eval_timer",
+                        "Time spend evaluating instructions"),
+        "future_eval_timer": IntervalTimer("future_eval_timer",
+                        "Time spent evaluating futures"),
+        "busy_wait_timer": IntervalTimer("busy_wait_timer",
+                        "Time wasted doing busy wait")}
+    for quantity in log_quantities.values():
+        logmgr.add_quantity(quantity)
+
+    bound_op = bind(vol_discr, op.sym_operator())
+
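+    # bound_op returns a (value, profile_data) pair when profiling is
+    # requested; timing data accumulates on the rhs function object.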
+    def rhs(t, w):
+        val, rhs.profile_data = bound_op(queue, profile_data=rhs.profile_data,
+                                                log_quantities=log_quantities,
+                                                t=t, w=w)
+        return val
+    rhs.profile_data = {}
+
+    dt = 0.04
+    dt_stepper = set_up_rk4("w", dt, fields, rhs)
+
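+    # Bracket each completed timestep with tick_before()/tick_after() so
+    # LogManager records per-step quantities.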
+    logmgr.tick_before()
+    for event in dt_stepper.run(t_end=dt * num_steps):
+        if isinstance(event, dt_stepper.StateComputed):
+            logmgr.tick_after()
+            logmgr.tick_before()
+    logmgr.tick_after()
+
+    def print_profile_data(data):
+        print("execute() for rank %d:\n"
+              "\tInstruction Evaluation: %f%%\n"
+              "\tFuture Evaluation: %f%%\n"
+              "\tBusy Wait: %f%%\n"
+              "\tTotal: %f seconds" %
+            (comm.Get_rank(),
+             data['insn_eval_time'] / data['total_time'] * 100,
+             data['future_eval_time'] / data['total_time'] * 100,
+             data['busy_wait_time'] / data['total_time'] * 100,
+             data['total_time']))
+
+    print_profile_data(rhs.profile_data)
+    logmgr.close()
+
+
+if __name__ == "__main__":
+    assert "RUN_WITHIN_MPI" in os.environ, "Must run within mpi"
+    import sys
+    assert len(sys.argv) == 5, \
+        "Usage: %s %s num_elems order num_steps logfile" \
+        % (sys.executable, sys.argv[0])
+    simple_wave_entrypoint(num_elems=int(sys.argv[1]),
+                           order=int(sys.argv[2]),
+                           num_steps=int(sys.argv[3]),
+                           log_filename=sys.argv[4])
diff --git a/examples/benchmark_grudge/run_benchmark.sh b/examples/benchmark_grudge/run_benchmark.sh
new file mode 100755
index 0000000..6c535df
--- /dev/null
+++ b/examples/benchmark_grudge/run_benchmark.sh
@@ -0,0 +1,129 @@
+#!/bin/bash
+
+# Weak scaling: We run our code on one computer, then we buy a second computer
+# and we can solve a problem twice as large in the same amount of time.
+
+# Strong scaling: We run our code on one computer, then we buy a second computer
+# and we can solve the same problem in half the time.
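+
+# Example: if one host takes 100s, ideal strong scaling finishes in 50s on
+# two hosts; a measured 60s gives 100 / (2 * 60) ~= 83% efficiency.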
+
+# Examples:
+# ./run_benchmark.sh -t WEAK -n 100 -r 20 -s 1000 -l ~/weak_scaling.dat -o weak_scaling.txt
+# ./run_benchmark.sh -t STRONG -n 100 -r 20 -s 1000 -l ~/strong_scaling.dat -o strong_scaling.txt
+
+set -eu
+
+# NOTE: benchmark_mpi.py hangs when logfile is in a shared directory.
+USAGE="Usage: $0 -t <WEAK|STRONG> -n num_elems -r order -s num_steps -l logfile -o outfile"
+while getopts "t:n:r:s:l:o:" OPT; do
+  case $OPT in
+    t)
+      case $OPTARG in
+        WEAK)
+          SCALING_TYPE='WEAK'
+          ;;
+        STRONG)
+          SCALING_TYPE='STRONG'
+          ;;
+        *)
+          echo "$USAGE"
+          exit 1
+          ;;
+      esac
+      ;;
+    n)
+      NUM_ELEMS=$OPTARG
+      ;;
+    r)
+      ORDER=$OPTARG
+      ;;
+    s)
+      NUM_STEPS=$OPTARG
+      ;;
+    l)
+      LOGFILE=$OPTARG
+      ;;
+    o)
+      OUTFILE=$OPTARG
+      ;;
+    *)
+      echo "$USAGE"
+      exit 1
+      ;;
+  esac
+done
+
+
+# NOTE: We want to make sure we run grudge in the right environment.
+SHARED="/home/eshoag2/shared"
+source $SHARED/miniconda3/bin/activate inteq
+PYTHON=$(which python)
+BENCHMARK_MPI="$SHARED/grudge/examples/benchmark_grudge/benchmark_mpi.py"
+
+# Assume HOSTS_LIST is sorted in increasing order starting with one host.
+HOSTS_LIST="\
+porter \
+porter,stout \
+porter,stout,koelsch"
+
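+# RUN_WITHIN_MPI gates the Python entry point; PYOPENCL_CTX selects the
+# OpenCL device non-interactively; POCL_AFFINITY pins pocl threads to cores.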
+ENVIRONMENT_VARS="\
+-x RUN_WITHIN_MPI=1 \
+-x PYOPENCL_CTX=0 \
+-x POCL_AFFINITY=1"
+
+PERF_EVENTS="\
+cpu-cycles,\
+instructions,\
+task-clock"
+
+TEMPDIR=$(mktemp -d)
+trap 'rm -rf $TEMPDIR' EXIT HUP INT QUIT TERM
+
+echo "$(date): Testing $SCALING_TYPE scaling" | tee -a $OUTFILE
+
+NUM_HOSTS=1
+BASE_NUM_ELEMS=$NUM_ELEMS
+for HOSTS in $HOSTS_LIST; do
+
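+  # For weak scaling, grow the problem with the host count so per-host
+  # work stays constant.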
+  if [ $SCALING_TYPE = 'WEAK' ]; then
+    NUM_ELEMS=$(echo $BASE_NUM_ELEMS $NUM_HOSTS | awk '{ print $1 * $2 }')
+  fi
+
+  BENCHMARK_CMD="$PYTHON $BENCHMARK_MPI $NUM_ELEMS $ORDER $NUM_STEPS $LOGFILE.trial$NUM_HOSTS"
+  MPI_CMD="mpiexec --output-filename $TEMPDIR/output -H $HOSTS $ENVIRONMENT_VARS $BENCHMARK_CMD"
+  echo "Executing: $MPI_CMD"
+
+  # NOTE: perf does not follow MPI processes across different nodes.
+  # Instead, perf will follow all processes on the porter node.
+  echo "====================Using $NUM_HOSTS host(s)===================" >> $OUTFILE
+  START_TIME=$(date +%s)
+  perf stat --append -o $OUTFILE -e $PERF_EVENTS $MPI_CMD
+  DURATION=$(($(date +%s) - $START_TIME))
+  echo "Finished in $DURATION seconds"
+
+  echo "===================Output of Python===================" >> $OUTFILE
+  cat $TEMPDIR/* >> $OUTFILE
+  echo "======================================================" >> $OUTFILE
+  rm $TEMPDIR/*
+
+  if [ $NUM_HOSTS -eq 1 ]; then
+    BASE_DURATION=$DURATION
+  fi
+
+  # Efficiency = expected duration / actual duration: T1/(N*TN) strong, T1/TN weak.
+  if [ $SCALING_TYPE = 'STRONG' ]; then
+    EFFICIENCY=$(echo $DURATION $BASE_DURATION $NUM_HOSTS | awk '{ print $2 / ($3 * $1) * 100"%" }')
+  elif [ $SCALING_TYPE = 'WEAK' ]; then
+    EFFICIENCY=$(echo $DURATION $BASE_DURATION | awk '{ print $2 / $1 * 100"%" }')
+  fi
+
+  echo "Efficiency for $SCALING_TYPE scaling is $EFFICIENCY for $NUM_HOSTS host(s)." | tee -a $OUTFILE
+
+  ((NUM_HOSTS++))
+done
-- 
GitLab