Commit aaccb966 authored by Ellis Hoag

Add benchmarking code for MPI

parent 9842b869
import os
import numpy as np
import pyopencl as cl
from grudge import sym, bind, DGDiscretizationWithBoundaries
from grudge.shortcuts import set_up_rk4
def simple_wave_entrypoint(dim=2, num_elems=256, order=4, num_steps=30,
                           log_filename="grudge.dat"):
    cl_ctx = cl.create_some_context()
    queue = cl.CommandQueue(cl_ctx)

    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    num_parts = comm.Get_size()

    n = int(num_elems ** (1./dim))

    from meshmode.distributed import MPIMeshDistributor
    mesh_dist = MPIMeshDistributor(comm)

    if mesh_dist.is_mananger_rank():
        from meshmode.mesh.generation import generate_regular_rect_mesh
        mesh = generate_regular_rect_mesh(a=(-0.5,)*dim,
                                          b=(0.5,)*dim,
                                          n=(n,)*dim)

        from pymetis import part_graph
        _, p = part_graph(num_parts,
                          xadj=mesh.nodal_adjacency.neighbors_starts.tolist(),
                          adjncy=mesh.nodal_adjacency.neighbors.tolist())
        part_per_element = np.array(p)

        local_mesh = mesh_dist.send_mesh_parts(mesh, part_per_element, num_parts)
    else:
        local_mesh = mesh_dist.receive_mesh_part()

    vol_discr = DGDiscretizationWithBoundaries(cl_ctx, local_mesh, order=order,
                                               mpi_communicator=comm)

    source_center = np.array([0.1, 0.22, 0.33])[:local_mesh.dim]
    source_width = 0.05
    source_omega = 3

    sym_x = sym.nodes(local_mesh.dim)
    sym_source_center_dist = sym_x - source_center
    sym_t = sym.ScalarVariable("t")

    from grudge.models.wave import StrongWaveOperator
    from meshmode.mesh import BTAG_ALL, BTAG_NONE
    op = StrongWaveOperator(-0.1, vol_discr.dim,
            source_f=(
                sym.sin(source_omega*sym_t)
                * sym.exp(
                    -np.dot(sym_source_center_dist, sym_source_center_dist)
                    / source_width**2)),
            dirichlet_tag=BTAG_NONE,
            neumann_tag=BTAG_NONE,
            radiation_tag=BTAG_ALL,
            flux_type="upwind")

    from pytools.obj_array import join_fields
    fields = join_fields(vol_discr.zeros(queue),
                         [vol_discr.zeros(queue) for i in range(vol_discr.dim)])
    from pytools.log import LogManager, \
            add_general_quantities, \
            add_run_info, \
            IntervalTimer, EventCounter
    # NOTE: LogManager hangs when using a file on a shared directory.
    logmgr = LogManager(log_filename, "w", comm)
    add_run_info(logmgr)
    add_general_quantities(logmgr)
    log_quantities = \
        {"rank_data_swap_timer": IntervalTimer("rank_data_swap_timer",
                "Time spent evaluating RankDataSwapAssign"),
         "rank_data_swap_counter": EventCounter("rank_data_swap_counter",
                "Number of RankDataSwapAssign instructions evaluated"),
         "exec_timer": IntervalTimer("exec_timer",
                "Total time spent executing instructions"),
         "insn_eval_timer": IntervalTimer("insn_eval_timer",
                "Time spent evaluating instructions"),
         "future_eval_timer": IntervalTimer("future_eval_timer",
                "Time spent evaluating futures"),
         "busy_wait_timer": IntervalTimer("busy_wait_timer",
                "Time wasted doing busy wait")}
    for quantity in log_quantities.values():
        logmgr.add_quantity(quantity)
    bound_op = bind(vol_discr, op.sym_operator())

    # Accumulate profiling data across right-hand-side evaluations by stashing
    # it on the function object itself.
    def rhs(t, w):
        val, rhs.profile_data = bound_op(queue, profile_data=rhs.profile_data,
                                         log_quantities=log_quantities,
                                         t=t, w=w)
        return val
    rhs.profile_data = {}

    dt = 0.04
    dt_stepper = set_up_rk4("w", dt, fields, rhs)

    logmgr.tick_before()
    for event in dt_stepper.run(t_end=dt * num_steps):
        if isinstance(event, dt_stepper.StateComputed):
            logmgr.tick_after()
            logmgr.tick_before()
    logmgr.tick_after()

    def print_profile_data(data):
        print("""execute() for rank %d:
\tInstruction Evaluation: %f%%
\tFuture Evaluation: %f%%
\tBusy Wait: %f%%
\tTotal: %f seconds""" %
              (comm.Get_rank(),
               data['insn_eval_time'] / data['total_time'] * 100,
               data['future_eval_time'] / data['total_time'] * 100,
               data['busy_wait_time'] / data['total_time'] * 100,
               data['total_time']))

    print_profile_data(rhs.profile_data)
    logmgr.close()

if __name__ == "__main__":
    assert "RUN_WITHIN_MPI" in os.environ, "Must be run within MPI"
    import sys
    assert len(sys.argv) == 5, \
            "Usage: %s %s num_elems order num_steps logfile" \
            % (sys.executable, sys.argv[0])
    simple_wave_entrypoint(num_elems=int(sys.argv[1]),
                           order=int(sys.argv[2]),
                           num_steps=int(sys.argv[3]),
                           log_filename=sys.argv[4])
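A minimal by-hand launch sketch for benchmark_mpi.py (the rank count, argument values, and logfile path are placeholders; run_benchmark.sh, the second file in this commit, automates the full sweep and exports the same environment variables):

mpiexec -x RUN_WITHIN_MPI=1 -x PYOPENCL_CTX=0 -n 2 python benchmark_mpi.py 256 4 30 /tmp/grudge.dat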
#!/bin/bash
# Weak scaling: we run our code on one computer, then we buy a second computer
# and we can solve a problem twice as large in the same amount of time.
# Strong scaling: we run our code on one computer, then we buy a second computer
# and we can solve the same problem in half the time.
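# Worked example (hypothetical timings, not measured data): if one host takes
# 100 s, perfect strong scaling means two hosts solve the same problem in 50 s
# (efficiency = 100 / (2 * 50) = 100%), while perfect weak scaling means two
# hosts solve a doubled problem in the same 100 s (efficiency = 100 / 100 = 100%).
# These are the formulas used to compute EFFICIENCY at the end of the loop below.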
# Examples:
# ./run_benchmark.sh -t WEAK -n 100 -r 20 -s 1000 -l ~/weak_scaling.dat -o weak_scaling.txt
# ./run_benchmark.sh -t STRONG -n 100 -r 20 -s 1000 -l ~/strong_scaling.dat -o strong_scaling.txt
set -eu
# NOTE: benchmark_mpi.py hangs when logfile is in a shared directory.
USAGE="Usage: $0 -t <WEAK|STRONG> -n num_elems -r order -s num_steps -l logfile -o outfile"
while getopts "t:n:r:s:l:o:" OPT; do
    case $OPT in
        t)
            case $OPTARG in
                WEAK)
                    SCALING_TYPE='WEAK'
                    ;;
                STRONG)
                    SCALING_TYPE='STRONG'
                    ;;
                *)
                    echo $USAGE
                    exit 1
                    ;;
            esac
            ;;
        n)
            NUM_ELEMS=$OPTARG
            ;;
        r)
            ORDER=$OPTARG
            ;;
        s)
            NUM_STEPS=$OPTARG
            ;;
        l)
            LOGFILE=$OPTARG
            ;;
        o)
            OUTFILE=$OPTARG
            ;;
        *)
            echo $USAGE
            exit 1
            ;;
    esac
done
# NOTE: We want to make sure we run grudge in the right environment.
SHARED="/home/eshoag2/shared"
source $SHARED/miniconda3/bin/activate inteq
PYTHON=$(which python)
BENCHMARK_MPI="$SHARED/grudge/examples/benchmark_grudge/benchmark_mpi.py"
# Assume HOSTS_LIST is sorted in increasing order starting with one host.
HOSTS_LIST="\
porter \
porter,stout \
porter,stout,koelsch"
ENVIRONMENT_VARS="\
-x RUN_WITHIN_MPI=1 \
-x PYOPENCL_CTX=0 \
-x POCL_AFFINITY=1"
PERF_EVENTS="\
cpu-cycles,\
instructions,\
task-clock"
TEMPDIR=$(mktemp -d)
trap 'rm -rf $TEMPDIR' EXIT HUP INT QUIT TERM
echo "$(date): Testing $SCALING_TYPE scaling" | tee -a $OUTFILE
NUM_HOSTS=1
BASE_NUM_ELEMS=$NUM_ELEMS
for HOSTS in $HOSTS_LIST; do
    if [ $SCALING_TYPE = 'WEAK' ]; then
        NUM_ELEMS=$(echo $BASE_NUM_ELEMS $NUM_HOSTS | awk '{ print $1 * $2 }')
    fi

    BENCHMARK_CMD="$PYTHON $BENCHMARK_MPI $NUM_ELEMS $ORDER $NUM_STEPS $LOGFILE.trial$NUM_HOSTS"
    MPI_CMD="mpiexec --output-filename $TEMPDIR/output -H $HOSTS $ENVIRONMENT_VARS $BENCHMARK_CMD"
    echo "Executing: $MPI_CMD"

    # NOTE: perf does not follow MPI processes across different nodes.
    # Instead, perf will follow all processes on the porter node.
    echo "====================Using $NUM_HOSTS host(s)===================" >> $OUTFILE

    START_TIME=$(date +%s)
    perf stat --append -o $OUTFILE -e $PERF_EVENTS $MPI_CMD
    DURATION=$(($(date +%s) - $START_TIME))
    echo "Finished in $DURATION seconds"

    echo "===================Output of Python===================" >> $OUTFILE
    cat $TEMPDIR/* >> $OUTFILE
    echo "======================================================" >> $OUTFILE
    rm $TEMPDIR/*

    if [ $NUM_HOSTS -eq 1 ]; then
        BASE_DURATION=$DURATION
    fi

    # Efficiency is expected time / actual time:
    # strong scaling uses T(1) / (num_hosts * T(num_hosts)), weak scaling uses T(1) / T(num_hosts).
    if [ $SCALING_TYPE = 'STRONG' ]; then
        EFFICIENCY=$(echo $DURATION $BASE_DURATION $NUM_HOSTS | awk '{ print $2 / ($3 * $1) * 100"%" }')
    elif [ $SCALING_TYPE = 'WEAK' ]; then
        EFFICIENCY=$(echo $DURATION $BASE_DURATION | awk '{ print $2 / $1 * 100"%" }')
    fi
    echo "Efficiency for $SCALING_TYPE scaling is $EFFICIENCY for $NUM_HOSTS host(s)." | tee -a $OUTFILE

    ((NUM_HOSTS++))
done