import pyopencl as cl from sympy.core.cache import clear_cache import numpy as np from pytential import bind, sym from sumpy.kernel import LaplaceKernel from pytential.qbx import QBXLayerPotentialSource from pytential.qbx.cost import CLQBXCostModel from pytential.qbx.distributed import DistributedQBXLayerPotentialSource from mpi4py import MPI import logging import os import warnings import functools import time import pickle # Parameters k = 0 qbx_order = 3 fmm_order = 10 _expansion_stick_out_factor = 0.5 # Configure logging logger = logging.getLogger(__name__) logging.basicConfig(level=os.environ.get("LOGLEVEL", "WARNING")) logging.getLogger("pytential.qbx.distributed").setLevel(logging.INFO) logging.getLogger("pytential.qbx.perf_model").setLevel(logging.INFO) logging.getLogger("boxtree.distributed.local_tree").setLevel(logging.INFO) logging.getLogger("boxtree.distributed.local_traversal").setLevel(logging.INFO) logging.getLogger("boxtree.distributed.calculation").setLevel(logging.INFO) logging.getLogger(__name__).setLevel(logging.INFO) warnings.filterwarnings("ignore", category=UserWarning) # Disable sympy cache clear_cache() # Setup PyOpenCL ctx = cl.create_some_context() queue = cl.CommandQueue(ctx) class GreenExpr(object): zero_op_name = "green" def get_zero_op(self, kernel, **knl_kwargs): u_sym = sym.var("u") dn_u_sym = sym.var("dn_u") return ( sym.S(kernel, dn_u_sym, qbx_forced_limit=-1, **knl_kwargs) - sym.D(kernel, u_sym, qbx_forced_limit="avg", **knl_kwargs) - 0.5*u_sym) order_drop = 0 def get_urchin_mesh(m, n, order, est_rel_interp_tolerance): from meshmode.mesh.generation import refine_mesh_and_get_urchin_warper refiner, warp_mesh = refine_mesh_and_get_urchin_warper( order, m, n, est_rel_interp_tolerance ) return refiner.get_current_mesh() def replicate_along_axes(mesh, shape, sep_ratio): """ :arg shape: Replication shape, e.g. (1, 2, 1) for 2 copies along y axis :arg sep_ratio: Distance between copies as a ratio of the bounding box size """ from meshmode.mesh.processing import ( find_bounding_box, affine_map, merge_disjoint_meshes) bbox = find_bounding_box(mesh) sizes = bbox[1] - bbox[0] meshes = [mesh] for i in range(mesh.ambient_dim): for j in range(1, shape[i]): vec = np.zeros(mesh.ambient_dim) vec[i] = j * sizes[i] * (1 + sep_ratio) meshes.append(affine_map(mesh, A=None, b=vec)) # FIXME: https://gitlab.tiker.net/inducer/pytential/issues/6 mesh = merge_disjoint_meshes(meshes, single_group=True) meshes = [mesh] return mesh def generate_expr(m, n, target_order, est_rel_interp_tolerance, lp_source_contructor, replicate_parameters=None): expr = GreenExpr() mesh = get_urchin_mesh(m, n, target_order, est_rel_interp_tolerance) if replicate_parameters is not None: mesh = replicate_along_axes(mesh, *replicate_parameters) d = mesh.ambient_dim lap_k_sym = LaplaceKernel(d) k_sym = lap_k_sym knl_kwargs = {} from meshmode.discretization import Discretization from meshmode.discretization.poly_element import \ InterpolatoryQuadratureSimplexGroupFactory pre_density_discr = Discretization( ctx, mesh, InterpolatoryQuadratureSimplexGroupFactory(target_order) ) refiner_extra_kwargs = {} qbx, _ = lp_source_contructor( pre_density_discr, 4 * target_order, qbx_order, fmm_order=fmm_order, _expansions_in_tree_have_extent=True, _expansion_stick_out_factor=_expansion_stick_out_factor, ).with_refinement(**refiner_extra_kwargs) density_discr = qbx.density_discr # {{{ compute values of a solution to the PDE nodes_host = density_discr.nodes().get(queue) normal = bind(density_discr, sym.normal(d))(queue).as_vector(np.object) normal_host = [normal[j].get() for j in range(d)] center = np.array([3, 1, 2])[:d] diff = nodes_host - center[:, np.newaxis] dist_squared = np.sum(diff ** 2, axis=0) dist = np.sqrt(dist_squared) if d == 2: u = np.log(dist) grad_u = diff / dist_squared elif d == 3: u = 1 / dist grad_u = -diff / dist ** 3 else: assert False dn_u = 0 for i in range(d): dn_u = dn_u + normal_host[i] * grad_u[i] # }}} u_dev = cl.array.to_device(queue, u) dn_u_dev = cl.array.to_device(queue, dn_u) grad_u_dev = cl.array.to_device(queue, grad_u) bound_op = bind(qbx, expr.get_zero_op(k_sym, **knl_kwargs)) return bound_op, {'u': u_dev, 'dn_u': dn_u_dev, 'grad_u': grad_u_dev, 'k': k} def train(): training_cases = [ (2, 3, 5, 0.01), # dummy run (3, 5, 5, 0.01), (3, 5, 5, 0.01), (4, 8, 5, 0.01), (4, 8, 5, 0.01) # (8, 15, 5, 0.01) # 2m # (10, 16, 5, 0.01) # 2m # (4, 8, 5, 0.001), # (4, 8, 5, 0.001), # (6, 10, 5, 0.001), # 2m # (6, 10, 5, 0.001), # (8, 12, 5, 0.001), # (8, 12, 5, 0.001), # (10, 15, 5, 0.001), # 5m # (10, 15, 5, 0.001) # (20, 30, 5, 0.001) # 17m # (30, 50, 5, 0.001) # 33m # (40, 70, 5, 0.001) # 72m ] cost_model = CLQBXCostModel( queue, CLQBXCostModel.get_constantone_calibration_params() ) lp_source_contructor = functools.partial( QBXLayerPotentialSource, fmm_backend='fmmlib', cost_model=cost_model ) model_results = [] timing_results = [] for icase, (m, n, target_order, est_rel_interp_tolerance) in \ enumerate(training_cases): expr, context = generate_expr( m, n, target_order, est_rel_interp_tolerance, lp_source_contructor ) modeled_cost = expr.get_modeled_cost(queue, **context) timing_data = {} expr.eval(queue, context, timing_data=timing_data) if icase != 0: for key in modeled_cost: model_results.append(modeled_cost[key]) timing_results.append(timing_data[key]) calibration_params = cost_model.estimate_calibration_params( model_results, timing_results ) return calibration_params def test_cost_model_accuracy(calibration_params): m, n, target_order, est_rel_interp_tolerance = 10, 16, 5, 0.01 cost_model = CLQBXCostModel(queue, calibration_params) lp_source_contructor = functools.partial( QBXLayerPotentialSource, fmm_backend='fmmlib', cost_model=cost_model ) # dummy run expr, context = generate_expr( 2, 3, 5, 0.01, lp_source_contructor ) expr.eval(queue, context) # actual run expr, context = generate_expr( m, n, target_order, est_rel_interp_tolerance, lp_source_contructor ) modeled_cost = expr.get_modeled_cost(queue, **context) for key in modeled_cost: modeled_cost_aggregate = {} for stage in modeled_cost[key]: modeled_cost_aggregate[stage] = cost_model.aggregate(modeled_cost[key][stage]) print(modeled_cost_aggregate) timing_data = {} expr.eval(queue, context, timing_data=timing_data) for key in timing_data: actual_time = {} for stage in timing_data[key]: actual_time[stage] = timing_data[key][stage]["wall_elapsed"] print(actual_time) def evaluate(calibration_params): # MPI information comm = MPI.COMM_WORLD current_rank = comm.Get_rank() if current_rank == 0: cost_model = CLQBXCostModel(queue, calibration_params) # {{{ dummy run lp_source_contructor = functools.partial( DistributedQBXLayerPotentialSource, comm, cost_model=cost_model ) bound_op, context = generate_expr( 2, 3, 5, 0.01, lp_source_contructor ) bound_op.eval(queue, context=context) # }}} comm.barrier() logger.info("Dummy run ends!") # {{{ actual run lp_source_contructor = functools.partial( DistributedQBXLayerPotentialSource, comm, cost_model=cost_model ) generate_expr_start_time = time.time() bound_op, context = generate_expr( 40, 70, 5, 0.001, lp_source_contructor, replicate_parameters=((2, 1, 1), 0.05) ) logger.info("Generating geometry/expression elapsed {} secs.".format( time.time() - generate_expr_start_time) ) bound_op.eval(queue, context=context) # }}} else: from pytential.qbx.distributed import drive_dfmm # {{{ dummy run for _ in range(2): lp_source = DistributedQBXLayerPotentialSource(comm, None, None) distribute_geo_data = lp_source.distibuted_geo_data( None, queue, None, None ) weights = None drive_dfmm( queue, weights, distribute_geo_data, comm=comm ) # }}} comm.barrier() # {{{ actual run for _ in range(2): lp_source = DistributedQBXLayerPotentialSource(comm, None, None) distribute_geo_data = lp_source.distibuted_geo_data( None, queue, None, None ) weights = None drive_dfmm( queue, weights, distribute_geo_data, comm=comm ) # }}} if __name__ == '__main__': import sys if len(sys.argv) != 2: raise RuntimeError("Please specify train or eval") if sys.argv[1] == 'train': params = train() pickle.dump(params, open("param", "wb")) elif sys.argv[1] == 'eval': params = pickle.load(open("param", "rb")) evaluate(params) elif sys.argv[1] == 'test': params = pickle.load(open("param", "rb")) test_cost_model_accuracy(params) else: raise RuntimeError("Unrecognized command line argument. Please specify train or eval.")