Log manager: keep each quantity in a DataTable and synchronize logs over MPI.

* bin/logtool: follow the rename LogManager.quantity_buffers -> quantity_data
  and print an empty line before the "Constants" section.
* src/log.py:
  * _QuantityBuffer becomes _QuantityData; samples are stored as
    (step, rank, value) rows in a pytools DataTable.
  * LogManager accepts an optional boost.mpi communicator.  tick() measures
    the time spent logging (t_log) and calls synchronize_logs() when more
    than 10 seconds have passed since the last synchronization.
  * synchronize_logs() gathers every rank's rows on rank 0 and inserts them
    into the matching quantity's table.
  * save() and load() touch the log file on rank 0 only; all other ranks
    return early (save() still takes part in the synchronization first).
  * get_expr_dataset() aggregates per-rank values for each step: a plain
    name uses the average, name.min / name.max / name.avg pick the
    aggregator, and name[i] picks the i-th value.
  * New LogUpdateDuration quantity and add_run_info() helper.

diff --git a/bin/logtool b/bin/logtool
index 22ae36362d281e85ae4f365ee5d0ae215a39ef7f..780fdd4f2880173c8f6912aa66c78c28769b5ae8 100755
--- a/bin/logtool
+++ b/bin/logtool
@@ -45,14 +45,15 @@ following:
         print "Time series"
         print "-----------"
 
-        items = list(logmgr.quantity_buffers.iteritems())
+        items = list(logmgr.quantity_data.iteritems())
         items.sort(lambda a,b: cmp(a[0], b[0]))
 
         col0_len = max(len(k) for k, v in items) + 1
 
-        for key, qbuf in items:
-            print "%s\t%s" % (key.ljust(col0_len), qbuf.quantity.description)
+        for key, qdat in items:
+            print "%s\t%s" % (key.ljust(col0_len), qdat.quantity.description)
 
+        print
         print "Constants"
         print "---------"
         items = list(logmgr.variables.iteritems())
diff --git a/src/log.py b/src/log.py
index d96e47fbffc0f887fd984e02f3cf111b5e7a78dd..e91db3ca78516642704672d64c5cda072e051271 100644
--- a/src/log.py
+++ b/src/log.py
@@ -25,11 +25,19 @@ class CallableLogQuantityAdapter(LogQuantity):
 
 
 # manager functionality -------------------------------------------------------
-class _QuantityBuffer:
-    def __init__(self, quantity, interval=1, buffer=[]):
+class _QuantityData:
+    def __init__(self, quantity, interval=1, table=None):
         self.quantity = quantity
         self.interval = interval
-        self.buffer = buffer[:]
+
+        if table is None:
+            from pytools.datatable import DataTable
+            self.table = DataTable(["step", "rank", "value"])
+        else:
+            self.table = table.copy()
+
+
+
 
 def _join_by_first_of_tuple(list_of_iterables):
     loi = [i.__iter__() for i in list_of_iterables]
@@ -69,8 +77,14 @@ def _join_by_first_of_tuple(list_of_iterables):
 
 
 class LogManager:
-    def __init__(self, filename=None):
-        self.quantity_buffers = {}
+    def __init__(self, filename=None, mpi_comm=None):
+        """Initialize this log manager instance.
+
+        @arg filename: If given, the log is periodically written to this file.
+        @arg mpi_comm: A C{boost.mpi} communicator. If given, logs are periodically
+          synchronized to the head node, which then writes them out to disk.
+        """
+        self.quantity_data = {}
         self.tick_count = 0
         self.filename = filename
 
@@ -84,66 +98,146 @@ class LogManager:
         from time import time
         self.last_checkpoint = time()
 
+        self.mpi_comm = mpi_comm
+        if mpi_comm is None:
+            self.rank = 0
+        else:
+            self.rank = mpi_comm.rank
+            self.last_sync = self.last_checkpoint
+
+        self.t_log = 0
+
     def set_variable(self, name, value):
         self.variables[name] = value
 
     def tick(self):
-        for qbuf in self.quantity_buffers.itervalues():
+        from time import time
+        start_time = time()
+
+        for qbuf in self.quantity_data.itervalues():
             if self.tick_count % qbuf.interval == 0:
-                qbuf.buffer.append((self.tick_count, qbuf.quantity()))
+                qbuf.table.insert_row((self.tick_count, self.rank, qbuf.quantity()))
         self.tick_count += 1
 
+        end_time = time()
+        self.t_log = end_time - start_time
+
+        # synchronize logs with parallel peers, if necessary
+        if self.mpi_comm is not None:
+            if end_time - self.last_sync > 10:
+                self.synchronize_logs()
+                self.last_sync = end_time
+
+        # checkpoint log to disk, if necessary
         if self.filename is not None:
-            from time import time
-            now = time()
-            if now - self.last_checkpoint > 10:
+            if end_time - self.last_checkpoint > 10:
                 self.save()
-                self.last_checkpoint = now
+                self.last_checkpoint = end_time
+
+    def synchronize_logs(self):
+        """Send logs to head node."""
+        if self.mpi_comm is None:
+            return
+
+        from boost.mpi import gather
+        root = 0
+        if self.mpi_comm.rank == root:
+            for rank_data in gather(self.mpi_comm, None, root)[1:]:
+                for name, rows in rank_data:
+                    self.quantity_data[name].table.insert_rows(rows)
+        else:
+            gather(self.mpi_comm,
+                    [(name, qdat.table.data)
+                        for name, qdat in self.quantity_data.iteritems()],
+                    root)
 
     def add_quantity(self, quantity, interval=1):
         """Add an object derived from L{LogQuantity} to this manager."""
-        self.quantity_buffers[quantity.name] = _QuantityBuffer(quantity, interval)
+        self.quantity_data[quantity.name] = _QuantityData(quantity, interval)
 
     def get_expr_dataset(self, expression, description=None, unit=None):
-        """Return a triple C{(description, unit, buffer)} for the given expression.
+        """Return a triple C{(description, unit, table)} for the given expression.
 
-        C{buffer} consists of a list of tuples C{(tick_nbr, value)}.
+        C{table} is a list of tuples C{(tick_nbr, value)}.
         """
-        try:
-            qbuf = self.quantity_buffers[expression]
-        except KeyError:
-            from pymbolic import parse, get_dependencies, evaluate, \
-                    var, substitute
-            parsed = parse(expression)
-            deps = [dep.name for dep in get_dependencies(parsed)]
-
-            if unit is None:
-                unit = substitute(parsed,
-                        dict((name,
-                            var(self.quantity_buffers[name].quantity.unit))
-                            for name in deps))
-            if description is None:
-                description = expression
-
-            def make_eval_context(seq):
-                ctx = dict(seq)
-                ctx.update(self.variables)
-                return ctx
-
-            return (description,
-                    unit,
-                    [(key, evaluate(parsed,
-                        make_eval_context((name, value)
-                            for name, value in zip(deps, values))
-                        ))
-
-                    for key, values in _join_by_first_of_tuple(
-                        self.quantity_buffers[dep].buffer for dep in deps)
-                    ])
-        else:
-            return (description or qbuf.quantity.description,
-                    unit or qbuf.quantity.unit,
-                    qbuf.buffer)
+
+        from pymbolic import parse, get_dependencies, substitute
+
+        parsed = parse(expression)
+        deps = get_dependencies(parsed)
+
+        # gather information on aggregation expressions
+        dep_data = []
+        from pymbolic.primitives import Variable, Lookup, Subscript
+        for dep_idx, dep in enumerate(deps):
+            if isinstance(dep, Variable):
+                name = dep.name
+                from pytools import average
+                agg_func = average
+            elif isinstance(dep, Lookup):
+                assert isinstance(dep.aggregate, Variable)
+                name = dep.aggregate.name
+                agg_name = dep.name
+                if agg_name == "min":
+                    agg_func = min
+                elif agg_name == "max":
+                    agg_func = max
+                elif agg_name == "avg":
+                    from pytools import average
+                    agg_func = average
+                else:
+                    raise ValueError, "invalid rank aggregator '%s'" % agg_name
+            elif isinstance(dep, Subscript):
+                assert isinstance(dep.aggregate, Variable)
+                name = dep.aggregate.name
+
+                class Nth:
+                    def __init__(self, n):
+                        self.n = n
+
+                    def __call__(self, lst):
+                        return lst[self.n]
+
+                from pymbolic import evaluate
+                agg_func = Nth(evaluate(dep.index))
+
+            quantity = self.quantity_data[name].quantity
+            table = self.quantity_data[name].table
+            table.sort(["step"])
+            table = table.aggregated(["step"], "value", agg_func).data
+
+            from pytools import Record
+            this_dep_data = Record(table=table, quantity=quantity,
+                    varname="logvar%d" % dep_idx, expr=dep)
+            dep_data.append(this_dep_data)
+
+        # evaluate unit and description, if necessary
+        if unit is None:
+            unit = substitute(parsed,
+                    dict((dd.expr, parse(dd.quantity.unit)) for dd in dep_data)
+                    )
+
+        if description is None:
+            description = expression
+
+        # substitute in the "logvar" variable names
+        from pymbolic import var
+        parsed = substitute(parsed,
+                dict((dd.expr, var(dd.varname)) for dd in dep_data))
+
+        # substitute in global variables
+        parsed = substitute(parsed, self.variables)
+
+        # compile and evaluate
+        from pymbolic import compile
+        compiled = compile(parsed, [dd.varname for dd in dep_data])
+
+        return (description,
+                unit,
+                [(key, compiled(*values))
+                    for key, values in _join_by_first_of_tuple(
+                        dd.table for dd in dep_data)
+                    ])
 
     def get_joint_dataset(self, expressions):
         """Return a joint data set for a list of expressions.
@@ -153,11 +247,11 @@ class LogManager:
           In the former case, the description and the unit are
           found automatically, if possible. In the latter case,
           they are used as specified.
-        @return: A triple C{(descriptions, units, buffer)}, where
-          C{buffer} is a a list of C{[(tstep, (val_expr1, val_expr2,...)...]}.
+        @return: A triple C{(descriptions, units, table)}, where
+          C{table} is a list of C{[(tstep, (val_expr1, val_expr2,...)...]}.
         """
 
-        # dubs is a list of (desc, unit, buffer) triples as
+        # dubs is a list of (desc, unit, table) triples as
         # returned by get_expr_dataset
         dubs = []
         for expr in expressions:
@@ -179,6 +273,10 @@ class LogManager:
         return zipped_dubs
 
     def save(self, filename=None):
+        self.synchronize_logs()
+        if self.mpi_comm is not None and self.mpi_comm.rank != 0:
+            return
+
         if filename is not None:
             from os import access, R_OK
             if access(filename, R_OK):
@@ -187,23 +285,26 @@ class LogManager:
             filename = self.filename
 
         save_buffers = dict(
-                (name, _QuantityBuffer(
+                (name, _QuantityData(
                     LogQuantity(
                         qbuf.quantity.name,
                         qbuf.quantity.unit,
                         qbuf.quantity.description,
                         ),
                     qbuf.interval,
-                    qbuf.buffer))
-                for name, qbuf in self.quantity_buffers.iteritems())
+                    qbuf.table))
+                for name, qbuf in self.quantity_data.iteritems())
         from cPickle import dump, HIGHEST_PROTOCOL
         dump((save_buffers, self.variables),
                 open(filename, "w"),
                 protocol=HIGHEST_PROTOCOL)
 
     def load(self, filename):
+        if self.mpi_comm is not None and self.mpi_comm.rank != 0:
+            return
+
         from cPickle import load
-        self.quantity_buffers, self.variables = load(open(filename))
+        self.quantity_data, self.variables = load(open(filename))
 
     def get_plot_data(self, expr_x, expr_y):
         """Generate plot-ready data.
@@ -282,6 +383,16 @@ class IntervalTimer(LogQuantity):
 
 
 
+class LogUpdateDuration(LogQuantity):
+    def __init__(self, mgr, name="t_log"):
+        LogQuantity.__init__(self, name, "s", "Time spent updating the log")
+        self.log_manager = mgr
+
+    def __call__(self):
+        return self.log_manager.t_log
+
+
+
 class EventCounter(LogQuantity):
     def __init__(self, name="interval", description=None):
         LogQuantity.__init__(self, name, "1", description)
@@ -376,8 +487,8 @@ class Timestep(LogQuantity):
 
 
 def set_dt(mgr, dt):
-    mgr.quantity_buffers["dt"].quantity.set_dt(dt)
-    mgr.quantity_buffers["t_sim"].quantity.set_dt(dt)
+    mgr.quantity_data["dt"].quantity.set_dt(dt)
+    mgr.quantity_data["t_sim"].quantity.set_dt(dt)
 
 
 
@@ -387,8 +498,15 @@ def add_general_quantities(mgr, dt):
     mgr.add_quantity(WallTime())
     mgr.add_quantity(SimulationTime(dt))
     mgr.add_quantity(Timestep(dt))
+    mgr.add_quantity(LogUpdateDuration(mgr))
     mgr.add_quantity(TimestepCounter())
 
 
 
-
+def add_run_info(mgr):
+    import sys
+    mgr.set_variable("cmdline", " ".join(sys.argv))
+    from socket import gethostname
+    mgr.set_variable("machine", gethostname())
+    from time import gmtime, strftime
+    mgr.set_variable("date", strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime()))