diff --git a/grudge/execution.py b/grudge/execution.py index f756d21b64c691c372d6f328bf8153ed124dd732..9d665cb359c618bb0a07e3a030efef966cbbdc77 100644 --- a/grudge/execution.py +++ b/grudge/execution.py @@ -534,6 +534,7 @@ class BoundOperator(object): def process_sym_operator(discrwb, sym_operator, post_bind_mapper=None, dumper=lambda name, sym_operator: None): + orig_sym_operator = sym_operator import grudge.symbolic.mappers as mappers dumper("before-bind", sym_operator) @@ -541,6 +542,30 @@ def process_sym_operator(discrwb, sym_operator, post_bind_mapper=None, mappers.ErrorChecker(discrwb.mesh)(sym_operator) + sym_operator = \ + mappers.OppositeInteriorFaceSwapUniqueIDAssigner()(sym_operator) + + # {{{ broadcast root rank's symn_operator + + # also make sure all ranks had same orig_sym_operator + + if discrwb.mpi_communicator is not None: + (mgmt_rank_orig_sym_operator, mgmt_rank_sym_operator) = \ + discrwb.mpi_communicator.bcast( + (orig_sym_operator, sym_operator), + discrwb.get_management_rank_index()) + + from pytools.obj_array import is_equal as is_oa_equal + if not is_oa_equal(mgmt_rank_orig_sym_operator, orig_sym_operator): + raise ValueError("rank %d received a different symbolic " + "operator to bind from rank %d" + % (discrwb.mpi_communicator.Get_rank(), + discrwb.get_management_rank_index())) + + sym_operator = mgmt_rank_sym_operator + + # }}} + if post_bind_mapper is not None: dumper("before-postbind", sym_operator) sym_operator = post_bind_mapper(sym_operator) @@ -578,37 +603,10 @@ def process_sym_operator(discrwb, sym_operator, post_bind_mapper=None, volume_mesh = discrwb.discr_from_dd("vol").mesh from meshmode.distributed import get_connected_partitions connected_parts = get_connected_partitions(volume_mesh) + if connected_parts: sym_operator = mappers.DistributedMapper(connected_parts)(sym_operator) - # Communicate send and recv tags between ranks - comm = discrwb.mpi_communicator - i_local_rank = comm.Get_rank() - - tag_mapper = mappers.MPITagCollector(i_local_rank) - sym_operator = tag_mapper(sym_operator) - - if len(tag_mapper.send_tag_lookups) > 0: - # print("Rank %d distributing tags" % i_local_rank) - send_reqs = [] - for i_remote_rank in connected_parts: - send_tags = tag_mapper.send_tag_lookups[i_remote_rank] - send_reqs.append(comm.isend(send_tags, - i_remote_rank, - MPI_TAG_SEND_TAGS)) - - # print("Rank %d receiving tags" % i_local_rank) - recv_tag_lookups = {} - for i_remote_rank in connected_parts: - recv_tags = comm.recv(source=i_remote_rank, tag=MPI_TAG_SEND_TAGS) - recv_tag_lookups[i_remote_rank] = recv_tags - - for req in send_reqs: - req.wait() - - sym_operator = mappers.MPITagDistributor(recv_tag_lookups, - i_local_rank)(sym_operator) - dumper("before-imass", sym_operator) sym_operator = mappers.InverseMassContractor()(sym_operator) diff --git a/grudge/symbolic/compiler.py b/grudge/symbolic/compiler.py index 99de29976263196b98da3f66720439747775b95c..976beed9b8cc6ec20d6420203509f0ba954372ff 100644 --- a/grudge/symbolic/compiler.py +++ b/grudge/symbolic/compiler.py @@ -209,11 +209,6 @@ class RankDataSwapAssign(Instruction): The number of the remote rank that this instruction swaps data with. - .. attribute:: mpi_tag_offset - - A tag offset for mpi that should be unique for each instance within - a particular rank. - .. attribute:: dd_out .. attribute:: comment """ @@ -225,8 +220,8 @@ class RankDataSwapAssign(Instruction): self.field = field self.i_remote_rank = op.i_remote_part self.dd_out = op.dd_out - self.send_tag = self.MPI_TAG_GRUDGE_DATA_BASE + op.send_tag_offset - self.recv_tag = self.MPI_TAG_GRUDGE_DATA_BASE + op.recv_tag_offset + self.send_tag = self.MPI_TAG_GRUDGE_DATA_BASE + op.unique_id + self.recv_tag = self.MPI_TAG_GRUDGE_DATA_BASE + op.unique_id self.comment = "Swap data with rank %02d" % self.i_remote_rank @memoize_method @@ -502,8 +497,8 @@ class Code(object): if profile_data is not None: insn_start_time = time() if log_quantities is not None: - insn_sub_timer =\ - log_quantities["insn_eval_timer"].start_sub_timer() + insn_sub_timer = \ + log_quantities["insn_eval_timer"].start_sub_timer() insn, discardable_vars = self.get_next_step( frozenset(list(context.keys())), @@ -517,9 +512,11 @@ class Code(object): if log_quantities is not None: if isinstance(insn, RankDataSwapAssign): from pytools.log import time_and_count_function - mapper_method = time_and_count_function(mapper_method, - log_quantities["rank_data_swap_timer"], - log_quantities["rank_data_swap_counter"]) + mapper_method = time_and_count_function( + mapper_method, + log_quantities["rank_data_swap_timer"], + log_quantities["rank_data_swap_counter"]) + assignments, new_futures = mapper_method(insn) for target, value in assignments: @@ -536,6 +533,7 @@ class Code(object): if not futures: # No more instructions or futures. We are done. break + # Busy wait for a new future if profile_data is not None: busy_wait_start_time = time() diff --git a/grudge/symbolic/mappers/__init__.py b/grudge/symbolic/mappers/__init__.py index 6b2512529526e5d8cfb85c220cb229def679fed1..5304d647f542400f12b67fecc34dd0f377b23375 100644 --- a/grudge/symbolic/mappers/__init__.py +++ b/grudge/symbolic/mappers/__init__.py @@ -334,90 +334,70 @@ class OperatorBinder(CSECachingMapperMixin, IdentityMapper): # }}} +# {{{ dof desc (dd) replacement + +class DOFDescReplacer(IdentityMapper): + def __init__(self, prev_dd, new_dd): + self.prev_dd = prev_dd + self.new_dd = new_dd + + def map_operator_binding(self, expr): + if (isinstance(expr.op, op.OppositeInteriorFaceSwap) + and expr.op.dd_in == self.prev_dd + and expr.op.dd_out == self.prev_dd): + field = self.rec(expr.field) + return op.OppositePartitionFaceSwap(dd_in=self.new_dd, + dd_out=self.new_dd)(field) + elif (isinstance(expr.op, op.InterpolationOperator) + and expr.op.dd_out == self.prev_dd): + return op.InterpolationOperator(dd_in=expr.op.dd_in, + dd_out=self.new_dd)(expr.field) + elif (isinstance(expr.op, op.RefDiffOperatorBase) + and expr.op.dd_out == self.prev_dd + and expr.op.dd_in == self.prev_dd): + return type(expr.op)(expr.op.rst_axis, + dd_in=self.new_dd, + dd_out=self.new_dd)(self.rec(expr.field)) + + def map_node_coordinate_component(self, expr): + if expr.dd == self.prev_dd: + return type(expr)(expr.axis, self.new_dd) + +# }}} + + # {{{ mappers for distributed computation -def make_key_from_expr(expr, i_send_rank, i_recv_rank, clean_btag): - from copy import deepcopy - expr = deepcopy(expr) - - class BTAGCleaner(IdentityMapper): - def __init__(self): - from meshmode.mesh import BTAG_PARTITION - self.prev_dd = sym.as_dofdesc(BTAG_PARTITION(i_recv_rank)) - self.new_dd = sym.as_dofdesc(BTAG_PARTITION(i_send_rank)) - - def map_operator_binding(self, expr): - if (isinstance(expr.op, op.OppositeInteriorFaceSwap) - and expr.op.dd_in == self.prev_dd - and expr.op.dd_out == self.prev_dd): - field = self.rec(expr.field) - return op.OppositePartitionFaceSwap(dd_in=self.new_dd, - dd_out=self.new_dd)(field) - elif (isinstance(expr.op, op.InterpolationOperator) - and expr.op.dd_out == self.prev_dd): - return op.InterpolationOperator(dd_in=expr.op.dd_in, - dd_out=self.new_dd)(expr.field) - elif (isinstance(expr.op, op.RefDiffOperator) - and expr.op.dd_out == self.prev_dd - and expr.op.dd_in == self.prev_dd): - return op.RefDiffOperator(expr.op.rst_axis, - dd_in=self.new_dd, - dd_out=self.new_dd)(self.rec(expr.field)) - - def map_node_coordinate_component(self, expr): - if expr.dd == self.prev_dd: - return type(expr)(expr.axis, self.new_dd) - if clean_btag: - # FIXME: Maybe there is a better way to do this - # We need to change BTAG_PARTITION so that when expr is sent over to the - # other rank, it matches one of its own expressions - expr = BTAGCleaner()(expr) - return (expr, i_send_rank, i_recv_rank) - - -class MPITagCollector(CSECachingMapperMixin, IdentityMapper): +class OppositeInteriorFaceSwapUniqueIDAssigner( + CSECachingMapperMixin, IdentityMapper): map_common_subexpression_uncached = IdentityMapper.map_common_subexpression - def __init__(self, i_local_rank): - self.i_local_rank = i_local_rank - self.send_tag_lookups = {} + def __init__(self): + super(OppositeInteriorFaceSwapUniqueIDAssigner, self).__init__() + self._next_id = 0 + self.seen_ids = set() - def map_operator_binding(self, expr): - if isinstance(expr.op, op.OppositePartitionFaceSwap): - i_remote_rank = expr.op.i_remote_part - key = make_key_from_expr(self.rec(expr.field), - i_send_rank=self.i_local_rank, - i_recv_rank=i_remote_rank, - clean_btag=True) - if i_remote_rank not in self.send_tag_lookups: - self.send_tag_lookups[i_remote_rank] = {} - assert key not in self.send_tag_lookups[i_remote_rank],\ - "Duplicate keys found in tag lookup" - tag = expr.op.send_tag_offset = len(self.send_tag_lookups[i_remote_rank]) - self.send_tag_lookups[i_remote_rank][key] = tag - return expr - else: - return IdentityMapper.map_operator_binding(self, expr) + def next_id(self): + while self._next_id in self.seen_ids: + self._next_id += 1 + result = self._next_id + self._next_id += 1 + self.seen_ids.add(result) -class MPITagDistributor(CSECachingMapperMixin, IdentityMapper): - map_common_subexpression_uncached = IdentityMapper.map_common_subexpression + return result - def __init__(self, recv_tag_lookups, i_local_rank): - self.recv_tag_lookups = recv_tag_lookups - self.i_local_rank = i_local_rank + def map_opposite_interior_face_swap(self, expr): + if expr.unique_id is not None: + if expr.unique_id in self.seen_ids: + raise ValueError("OppositeInteriorFaceSwap unique ID '%d' " + "is not unique" % expr.unique_id) - def map_operator_binding(self, expr): - if isinstance(expr.op, op.OppositePartitionFaceSwap): - i_remote_rank = expr.op.i_remote_part - key = make_key_from_expr(self.rec(expr.field), - i_send_rank=i_remote_rank, - i_recv_rank=self.i_local_rank, - clean_btag=False) - expr.op.recv_tag_offset = self.recv_tag_lookups[i_remote_rank][key] + self.seen_ids.add(expr.unique_id) return expr + else: - return IdentityMapper.map_operator_binding(self, expr) + return type(expr)(expr.dd_in, expr.dd_out, self.next_id()) class DistributedMapper(CSECachingMapperMixin, IdentityMapper): @@ -464,8 +444,10 @@ class RankGeometryChanger(CSECachingMapperMixin, IdentityMapper): and expr.op.dd_in == self.prev_dd and expr.op.dd_out == self.prev_dd): field = self.rec(expr.field) - return op.OppositePartitionFaceSwap(dd_in=self.new_dd, - dd_out=self.new_dd)(field) + return op.OppositePartitionFaceSwap( + dd_in=self.new_dd, + dd_out=self.new_dd, + unique_id=expr.op.unique_id)(field) elif (isinstance(expr.op, op.InterpolationOperator) and expr.op.dd_out == self.prev_dd): return op.InterpolationOperator(dd_in=expr.op.dd_in, diff --git a/grudge/symbolic/operators.py b/grudge/symbolic/operators.py index 41b057d31311a48c7eb21f31986a9634f1dfe8f1..53fb1422619db271786abb9c2c17df5b3f9a97c0 100644 --- a/grudge/symbolic/operators.py +++ b/grudge/symbolic/operators.py @@ -83,6 +83,8 @@ class Operator(pymbolic.primitives.Expression): dd_in=dd_in or self.dd_in, dd_out=dd_out or self.dd_out) + init_arg_names = ("dd_in", "dd_out") + def __getinitargs__(self): return (self.dd_in, self.dd_out,) @@ -97,8 +99,6 @@ class ElementwiseLinearOperator(Operator): class InterpolationOperator(Operator): - init_arg_names = ("dd_in", "dd_out") - def __init__(self, dd_in, dd_out): official_dd_in = _sym().as_dofdesc(dd_in) official_dd_out = _sym().as_dofdesc(dd_out) @@ -107,6 +107,7 @@ class InterpolationOperator(Operator): " does not do anything.".format(official_dd_in, official_dd_out)) super(InterpolationOperator, self).__init__(dd_in, dd_out) + mapper_method = intern("map_interpolation") @@ -165,6 +166,8 @@ class DiffOperatorBase(Operator): self.xyz_axis = xyz_axis + init_arg_names = ("xyz_axis", "dd_in", "dd_out") + def __getinitargs__(self): return (self.xyz_axis, self.dd_in, self.dd_out) @@ -216,6 +219,8 @@ class RefDiffOperatorBase(ElementwiseLinearOperator): self.rst_axis = rst_axis + init_arg_names = ("rst_axis", "dd_in", "dd_out") + def __getinitargs__(self): return (self.rst_axis, self.dd_in, self.dd_out) @@ -410,8 +415,53 @@ class RefInverseMassOperator(RefMassOperatorBase): # {{{ boundary-related operators + +class OppositeInteriorFaceSwap(Operator): + """ + .. attribute:: unique_id + + An integer identifying this specific instances of + :class:`OppositePartitionFaceSwap` within an entire bound symbolic + operator. Is assigned automatically by :func:`grudge.bind` + if not already set by the user. This will become + :class:`OppositePartitionFaceSwap.unique_id` in distributed + runs. + """ + + def __init__(self, dd_in=None, dd_out=None, unique_id=None): + sym = _sym() + + if dd_in is None: + dd_in = sym.DOFDesc(sym.FACE_RESTR_INTERIOR, None) + if dd_out is None: + dd_out = dd_in + + super(OppositeInteriorFaceSwap, self).__init__(dd_in, dd_out) + if self.dd_in.domain_tag is not sym.FACE_RESTR_INTERIOR: + raise ValueError("dd_in must be an interior faces domain") + if self.dd_out != self.dd_in: + raise ValueError("dd_out and dd_in must be identical") + + assert unique_id is None or isinstance(unique_id, int) + self.unique_id = unique_id + + init_arg_names = ("dd_in", "dd_out", "unique_id") + + def __getinitargs__(self): + return (self.dd_in, self.dd_out, self.unique_id) + + mapper_method = intern("map_opposite_interior_face_swap") + + class OppositePartitionFaceSwap(Operator): - def __init__(self, dd_in=None, dd_out=None): + """ + .. attribute:: unique_id + + An integer corresponding to the :attr:`OppositeInteriorFaceSwap.unique_id` + which led to the creation of this object. This integer is used as an + MPI tag offset to keep different subexpressions apart in MPI traffic. + """ + def __init__(self, dd_in=None, dd_out=None, unique_id=None): sym = _sym() if dd_in is None and dd_out is None: @@ -429,25 +479,15 @@ class OppositePartitionFaceSwap(Operator): self.i_remote_part = self.dd_in.domain_tag.part_nr - mapper_method = intern("map_opposite_partition_face_swap") - + assert unique_id is None or isinstance(unique_id, int) + self.unique_id = unique_id -class OppositeInteriorFaceSwap(Operator): - def __init__(self, dd_in=None, dd_out=None): - sym = _sym() + init_arg_names = ("dd_in", "dd_out", "unique_id") - if dd_in is None: - dd_in = sym.DOFDesc(sym.FACE_RESTR_INTERIOR, None) - if dd_out is None: - dd_out = dd_in - - super(OppositeInteriorFaceSwap, self).__init__(dd_in, dd_out) - if self.dd_in.domain_tag is not sym.FACE_RESTR_INTERIOR: - raise ValueError("dd_in must be an interior faces domain") - if self.dd_out != self.dd_in: - raise ValueError("dd_out and dd_in must be identical") + def __getinitargs__(self): + return (self.dd_in, self.dd_out, self.unique_id) - mapper_method = intern("map_opposite_interior_face_swap") + mapper_method = intern("map_opposite_partition_face_swap") class FaceMassOperatorBase(ElementwiseLinearOperator): diff --git a/grudge/symbolic/primitives.py b/grudge/symbolic/primitives.py index 5b6f63c26ddfb8895d73f74b6fd69d388bf01ec4..35c45268508b99c2e0264f38875eac2e895f335a 100644 --- a/grudge/symbolic/primitives.py +++ b/grudge/symbolic/primitives.py @@ -445,6 +445,8 @@ class NodeCoordinateComponent(DiscretizationProperty): assert dd.domain_tag is not None + init_arg_names = ("axis", "dd") + def __getinitargs__(self): return (self.axis, self.dd)