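"""Train, test, and evaluate a performance (cost) model for distributed QBX
FMM evaluation in pytential, using Green's identity on urchin geometries as
the benchmark operator. Run with "train", "test", or "eval" (see __main__)."""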
import pyopencl as cl
import pyopencl.array  # noqa: F401 (makes cl.array available)
from sympy.core.cache import clear_cache
import numpy as np
from sumpy.kernel import LaplaceKernel
from pytential.qbx import QBXLayerPotentialSource
from pytential.qbx.distributed import DistributedQBXLayerPotentialSource
from mpi4py import MPI
import functools
import logging
import os
import pickle
import time
import warnings

from pytential import bind, sym
# NOTE: CLQBXCostModel ships with the experimental cost-model/distributed
# branch of pytential (module name inferred from the loggers configured below).
from pytential.qbx.perf_model import CLQBXCostModel
# Parameters
d = 3  # ambient dimension (the urchin geometries below are 3D)
k = 0  # Helmholtz parameter; 0 selects the Laplace kernel
qbx_order = 3
fmm_order = 10
_expansion_stick_out_factor = 0.5
logger = logging.getLogger(__name__)
logging.basicConfig(level=os.environ.get("LOGLEVEL", "WARNING"))
logging.getLogger("pytential.qbx.distributed").setLevel(logging.INFO)
logging.getLogger("pytential.qbx.perf_model").setLevel(logging.INFO)
logging.getLogger("boxtree.distributed.local_tree").setLevel(logging.INFO)
logging.getLogger("boxtree.distributed.local_traversal").setLevel(logging.INFO)
logging.getLogger("boxtree.distributed.calculation").setLevel(logging.INFO)
logging.getLogger(__name__).setLevel(logging.INFO)
warnings.filterwarnings("ignore", category=UserWarning)
# Disable sympy cache
clear_cache()
# Setup PyOpenCL
ctx = cl.create_some_context()
queue = cl.CommandQueue(ctx)
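# Green's representation theorem: for u harmonic in the interior,
# S[dn_u] - D[u] - u/2 = 0 on the boundary, so the operator below should
# evaluate to (numerically) zero and exercises one S and one D layer potential.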
class GreenExpr(object):
zero_op_name = "green"
def get_zero_op(self, kernel, **knl_kwargs):
u_sym = sym.var("u")
dn_u_sym = sym.var("dn_u")
return (
sym.S(kernel, dn_u_sym, qbx_forced_limit=-1, **knl_kwargs)
- sym.D(kernel, u_sym, qbx_forced_limit="avg", **knl_kwargs)
- 0.5*u_sym)
order_drop = 0
def get_urchin_mesh(m, n, order, est_rel_interp_tolerance):
    from meshmode.mesh.generation import refine_mesh_and_get_urchin_warper
    refiner, warp_mesh = refine_mesh_and_get_urchin_warper(
        order, m, n, est_rel_interp_tolerance
    )
    # warp the locally refined sphere mesh into the urchin shape
    return warp_mesh(refiner.get_current_mesh())
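# (m, n) select the spherical harmonic Y_m^n that shapes the urchin surface:
# larger values give a more oscillatory, harder geometry.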
def replicate_along_axes(mesh, shape, sep_ratio):
    """
    :arg shape: Replication shape, e.g. (1, 2, 1) for 2 copies along the y axis
    :arg sep_ratio: Distance between copies as a ratio of the bounding box size
    """
    from meshmode.mesh.processing import (
        find_bounding_box, affine_map, merge_disjoint_meshes)
    bbox = find_bounding_box(mesh)
    sizes = bbox[1] - bbox[0]
    meshes = [mesh]
    for i in range(mesh.ambient_dim):
        for j in range(1, shape[i]):
            vec = np.zeros(mesh.ambient_dim)
            vec[i] = j * sizes[i] * (1 + sep_ratio)
            meshes.append(affine_map(mesh, A=None, b=vec))
        # merge after each axis so the next axis replicates the whole block
        # FIXME: https://gitlab.tiker.net/inducer/pytential/issues/6
        mesh = merge_disjoint_meshes(meshes, single_group=True)
        meshes = [mesh]
    return mesh
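# Example: replicate_along_axes(mesh, (2, 1, 1), 0.05) yields two copies of
# the geometry side by side along x, separated by 5% of the bounding-box width
# (this is how the large distributed test case below is built).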
def generate_expr(m, n, target_order, est_rel_interp_tolerance,
        lp_source_constructor, replicate_parameters=None):
    mesh = get_urchin_mesh(m, n, target_order, est_rel_interp_tolerance)
    if replicate_parameters is not None:
        mesh = replicate_along_axes(mesh, *replicate_parameters)
lap_k_sym = LaplaceKernel(d)
k_sym = lap_k_sym
knl_kwargs = {}
from meshmode.discretization import Discretization
from meshmode.discretization.poly_element import \
InterpolatoryQuadratureSimplexGroupFactory
pre_density_discr = Discretization(
ctx, mesh,
InterpolatoryQuadratureSimplexGroupFactory(target_order)
)
refiner_extra_kwargs = {}
    qbx, _ = lp_source_constructor(
pre_density_discr, 4 * target_order,
qbx_order,
fmm_order=fmm_order,
_expansions_in_tree_have_extent=True,
_expansion_stick_out_factor=_expansion_stick_out_factor,
).with_refinement(**refiner_extra_kwargs)
density_discr = qbx.density_discr
# {{{ compute values of a solution to the PDE
nodes_host = density_discr.nodes().get(queue)
    normal = bind(density_discr, sym.normal(d))(queue).as_vector(object)
normal_host = [normal[j].get() for j in range(d)]
center = np.array([3, 1, 2])[:d]
diff = nodes_host - center[:, np.newaxis]
dist_squared = np.sum(diff ** 2, axis=0)
dist = np.sqrt(dist_squared)
if d == 2:
u = np.log(dist)
grad_u = diff / dist_squared
elif d == 3:
u = 1 / dist
grad_u = -diff / dist ** 3
else:
assert False
dn_u = 0
for i in range(d):
dn_u = dn_u + normal_host[i] * grad_u[i]
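    # u is the free-space Laplace Green's function with source point off the
    # surface, hence harmonic inside the domain; (u, dn_u) therefore satisfy
    # Green's identity exactly, up to discretization and QBX error.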
# }}}
u_dev = cl.array.to_device(queue, u)
dn_u_dev = cl.array.to_device(queue, dn_u)
grad_u_dev = cl.array.to_device(queue, grad_u)
    expr = GreenExpr()
    bound_op = bind(qbx, expr.get_zero_op(k_sym, **knl_kwargs))
return bound_op, {'u': u_dev, 'dn_u': dn_u_dev, 'grad_u': grad_u_dev, 'k': k}
def train():
    training_cases = [
        (2, 3, 5, 0.01),  # dummy run, excluded from the fit
        (3, 5, 5, 0.01),
        (3, 5, 5, 0.01),
        (4, 8, 5, 0.01),
        (4, 8, 5, 0.01),
# (8, 15, 5, 0.01) # 2m
# (10, 16, 5, 0.01) # 2m
# (4, 8, 5, 0.001),
# (4, 8, 5, 0.001),
# (6, 10, 5, 0.001), # 2m
# (6, 10, 5, 0.001),
# (8, 12, 5, 0.001),
# (8, 12, 5, 0.001),
# (10, 15, 5, 0.001), # 5m
# (10, 15, 5, 0.001)
# (20, 30, 5, 0.001) # 17m
# (30, 50, 5, 0.001) # 33m
# (40, 70, 5, 0.001) # 72m
    ]
cost_model = CLQBXCostModel(
queue, CLQBXCostModel.get_constantone_calibration_params()
)
    lp_source_constructor = functools.partial(
QBXLayerPotentialSource,
fmm_backend='fmmlib',
cost_model=cost_model
)
model_results = []
timing_results = []
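    # For each training case, record the model's per-stage cost predictions
    # and the measured per-stage wall times; the first case only warms up code
    # generation and is excluded from the least-squares fit below.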
for icase, (m, n, target_order, est_rel_interp_tolerance) in \
enumerate(training_cases):
expr, context = generate_expr(
            m, n, target_order, est_rel_interp_tolerance, lp_source_constructor
)
modeled_cost = expr.get_modeled_cost(queue, **context)
timing_data = {}
expr.eval(queue, context, timing_data=timing_data)
if icase != 0:
for key in modeled_cost:
model_results.append(modeled_cost[key])
timing_results.append(timing_data[key])
    calibration_params = cost_model.estimate_calibration_params(
        model_results, timing_results
    )
    return calibration_params
def test_cost_model_accuracy(calibration_params):
m, n, target_order, est_rel_interp_tolerance = 10, 16, 5, 0.01
cost_model = CLQBXCostModel(queue, calibration_params)
    lp_source_constructor = functools.partial(
QBXLayerPotentialSource,
fmm_backend='fmmlib',
cost_model=cost_model
)
# dummy run
expr, context = generate_expr(
        2, 3, 5, 0.01, lp_source_constructor
)
expr.eval(queue, context)
# actual run
expr, context = generate_expr(
        m, n, target_order, est_rel_interp_tolerance, lp_source_constructor
)
modeled_cost = expr.get_modeled_cost(queue, **context)
for key in modeled_cost:
modeled_cost_aggregate = {}
for stage in modeled_cost[key]:
modeled_cost_aggregate[stage] = cost_model.aggregate(modeled_cost[key][stage])
print(modeled_cost_aggregate)
timing_data = {}
expr.eval(queue, context, timing_data=timing_data)
for key in timing_data:
actual_time = {}
for stage in timing_data[key]:
actual_time[stage] = timing_data[key][stage]["wall_elapsed"]
print(actual_time)
def evaluate(calibration_params):
    # MPI information
    comm = MPI.COMM_WORLD
    current_rank = comm.Get_rank()
if current_rank == 0:
cost_model = CLQBXCostModel(queue, calibration_params)
# {{{ dummy run
        lp_source_constructor = functools.partial(
DistributedQBXLayerPotentialSource,
comm,
cost_model=cost_model
)
bound_op, context = generate_expr(
            2, 3, 5, 0.01, lp_source_constructor
)
bound_op.eval(queue, context=context)
# }}}
comm.barrier()
logger.info("Dummy run ends!")
# {{{ actual run
        lp_source_constructor = functools.partial(
            DistributedQBXLayerPotentialSource,
            comm,
            cost_model=cost_model
        )
        generate_expr_start_time = time.time()
        bound_op, context = generate_expr(
            40, 70, 5, 0.001, lp_source_constructor,
            replicate_parameters=((2, 1, 1), 0.05)
        )
        logger.info("Generating geometry/expression elapsed {} secs.".format(
            time.time() - generate_expr_start_time)
        )
        bound_op.eval(queue, context=context)
        # }}}
    else:
        from pytential.qbx.distributed import drive_dfmm
# {{{ dummy run
for _ in range(2):
lp_source = DistributedQBXLayerPotentialSource(comm, None, None)
distribute_geo_data = lp_source.distibuted_geo_data(
None, queue, None, None
)
weights = None
drive_dfmm(
queue, weights, distribute_geo_data, comm=comm
)
# }}}
comm.barrier()
# {{{ actual run
        # two FMM drives: the Green expression contains two layer potentials
        for _ in range(2):
            lp_source = DistributedQBXLayerPotentialSource(comm, None, None)
            distribute_geo_data = lp_source.distibuted_geo_data(
                None, queue, None, None
            )
            weights = None
            drive_dfmm(
                queue, weights, distribute_geo_data, comm=comm
            )
        # }}}
if __name__ == '__main__':
import sys
    if len(sys.argv) != 2:
        raise RuntimeError("Please specify train, eval, or test")
    if sys.argv[1] == 'train':
        params = train()
        pickle.dump(params, open("param", "wb"))
    elif sys.argv[1] == 'eval':
        params = pickle.load(open("param", "rb"))
        evaluate(params)
elif sys.argv[1] == 'test':
params = pickle.load(open("param", "rb"))
test_cost_model_accuracy(params)
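# Typical usage (script name assumed here):
#   python cost_model.py train              # fit calibration params, save to "param"
#   python cost_model.py test               # compare modeled vs. measured cost
#   mpiexec -n 4 python cost_model.py eval  # distributed evaluation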