From b5af14daa2f1967a49e58413cef62b78984573cc Mon Sep 17 00:00:00 2001 From: Andreas Kloeckner Date: Tue, 15 Jan 2008 11:29:02 -0500 Subject: [PATCH] Lots of log improvements. (+) Uniproc-functional log watches. Use getrusage for timing. Default aggregators. Documentation. Fix sync timing. --- src/log.py | 359 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 264 insertions(+), 95 deletions(-) diff --git a/src/log.py b/src/log.py index f063989..36130db 100644 --- a/src/log.py +++ b/src/log.py @@ -3,17 +3,35 @@ from __future__ import division +# timing function ------------------------------------------------------------- +def time(): + """Return elapsed CPU time, as a float, in seconds.""" + from resource import getrusage, RUSAGE_SELF + return getrusage(RUSAGE_SELF).ru_utime + + + + # abstract logging interface -------------------------------------------------- class LogQuantity: + """A source of loggable scalars.""" def __init__(self, name, unit=None, description=None): self.name = name self.unit = unit self.description = description + default_aggregator = None + def __call__(self): + """Return the current value of the diagnostic represented by this + L{LogQuantity}.""" raise NotImplementedError + + + class CallableLogQuantityAdapter(LogQuantity): + """Adapt a 0-ary callable as a L{LogQuantity}.""" def __init__(self, callable, name, unit=None, description=None): self.callable = callable LogQuantity.__init__(self, name, unit, description) @@ -77,6 +95,19 @@ def _join_by_first_of_tuple(list_of_iterables): class LogManager: + """A parallel-capable diagnostic time-series logging facility. + + A C{LogManager} logs any number of named time series of floats to + a file. Non-time-series data, in the form of constants, is also + supported and saved. + + If MPI parallelism is used, the "head rank" below always refers to + rank 0. + + A command line tool called C{logtool} is available for looking at the + data in a saved log. + """ + def __init__(self, filename=None, mpi_comm=None): """Initialize this log manager instance. @@ -88,32 +119,60 @@ class LogManager: self.tick_count = 0 self.filename = filename - self.variables = {} + self.constants = {} if filename is not None: from os import access, R_OK if access(self.filename, R_OK): raise IOError, "cowardly refusing to overwrite '%s'" % self.filename - from time import time + # self-timing self.start_time = time() + self.t_log = 0 + # parallel support + self.head_rank = 0 self.mpi_comm = mpi_comm + self.is_parallel = mpi_comm is not None if mpi_comm is None: self.rank = 0 self.last_checkpoint = self.start_time else: self.rank = mpi_comm.rank - self.next_sync = 10 + self.next_sync_tick = 10 self.head_rank = 0 - self.t_log = 0 + # watch stuff + self.watches = [] + self.next_watch_tick = 1 + + def add_watches(self, watches): + """Add quantities that are printed after every time step.""" + + from pytools import Record + + for watch in watches: + parsed = self._parse_expr(watch) + parsed, dep_data = self._get_expr_dep_data(parsed) - def set_variable(self, name, value): - self.variables[name] = value + from pymbolic import compile + compiled = compile(parsed, [dd.varname for dd in dep_data]) + + watch_info = Record(expr=watch, parsed=parsed, dep_data=dep_data, + compiled=compiled) + + self.watches.append(watch_info) + + def set_constant(self, name, value): + """Make a named, constant value available in the log.""" + self.constants[name] = value def tick(self): - from time import time + """Record data points from each added L{LogQuantity}. + + May also checkpoint data to disk, and/or synchronize data points + to the head rank. + """ start_time = time() for qbuf in self.quantity_data.itervalues(): @@ -124,22 +183,25 @@ class LogManager: end_time = time() self.t_log = end_time - start_time + # print watches + if self.tick_count == self.next_watch_tick: + self._watch_tick() + # synchronize logs with parallel peers, if necessary if self.mpi_comm is not None: # parallel-case : sync, then checkpoint - if self.tick_count == self.next_sync: - + if self.tick_count == self.next_sync_tick: if self.filename is not None: # implicitly synchronizes self.save() else: self.synchronize_logs() - # figure out next sync tick + # figure out next sync tick, broadcast to peers ticks_per_20_sec = 20*self.tick_count/max(1, end_time-self.start_time) - self.next_sync = self.tick_count + min(10, ticks_per_20_sec) + next_sync_tick = self.tick_count + int(max(10, ticks_per_20_sec)) from boost.mpi import broadcast - self.next_sync = broadcast(self.mpi_comm, self.next_sync, self.head_rank) + self.next_sync_tick = broadcast(self.mpi_comm, next_sync_tick, self.head_rank) else: # non-parallel-case : checkpoint log to disk, if necessary if self.filename is not None: @@ -148,7 +210,9 @@ class LogManager: self.last_checkpoint = end_time def synchronize_logs(self): - """Send logs to head node.""" + """Transfer data from client ranks to the head rank. + + Must be called on all ranks simultaneously.""" if self.mpi_comm is None: return @@ -173,69 +237,33 @@ class LogManager: self.quantity_data[quantity.name] = _QuantityData(quantity, interval) def get_expr_dataset(self, expression, description=None, unit=None): - """Return a triple C{(description, unit, table)} for the given expression. - - C{table} is a list of tuples C{(tick_nbr, value)}. + """Prepare a time-series dataset for a given expression. + + @arg expression: A C{pymbolic} expression that may involve + the time-series variables and the constants in this L{LogManager}. + If there is data from multiple ranks for a quantity occuring in + this expression, an aggregator may have to be specified. + @return: C{(description, unit, table)}, where C{table} + is a list of tuples C{(tick_nbr, value)}. + + Aggregators are specified as follows: + - C{qty.min}, C{qty.max}, C{qty.avg}, C{qty.sum}, C{qty.norm2} + - C{qty[rank_nbr] """ - from pymbolic import parse, get_dependencies, substitute - - parsed = parse(expression) - deps = get_dependencies(parsed) - - # gather information on aggregation expressions - dep_data = [] - from pymbolic.primitives import Variable, Lookup, Subscript - for dep_idx, dep in enumerate(deps): - if isinstance(dep, Variable): - name = dep.name - from pytools import average - agg_func = average - elif isinstance(dep, Lookup): - assert isinstance(dep.aggregate, Variable) - name = dep.aggregate.name - agg_name = dep.name - if agg_name == "min": - agg_func = min - elif agg_name == "max": - agg_func = max - elif agg_name == "avg": - from pytools import average - agg_func = average - elif agg_name == "sum": - agg_func = sum - elif agg_name == "norm2": - from math import sqrt - agg_func = lambda iterable: sqrt( - sum(entry**2 for entry in iterable)) - else: - raise ValueError, "invalid rank aggregator '%s'" % agg_name - elif isinstance(dep, Subscript): - assert isinstance(dep.aggregate, Variable) - name = dep.aggregate.name - - class Nth: - def __init__(self, n): - self.n = n + parsed = self._parse_expr(expression) + parsed, dep_data = self._get_expr_dep_data(parsed) - def __call__(self, lst): - return lst[self.n] - - from pymbolic import evaluate - agg_func = Nth(evaluate(dep.index)) - - quantity = self.quantity_data[name].quantity - table = self.quantity_data[name].table + # aggregate table data + for dd in dep_data: + table = self.quantity_data[dd.name].table table.sort(["step"]) - table = table.aggregated(["step"], "value", agg_func).data - - from pytools import Record - this_dep_data = Record(table=table, quantity=quantity, - varname="logvar%d" % dep_idx, expr=dep) - dep_data.append(this_dep_data) + dd.table = table.aggregated(["step"], "value", dd.agg_func).data # evaluate unit and description, if necessary if unit is None: + from pymbolic import substitute, parse + unit = substitute(parsed, dict((dd.expr, parse(dd.quantity.unit)) for dd in dep_data) ) @@ -243,14 +271,6 @@ class LogManager: if description is None: description = expression - # substitute in the "logvar" variable names - from pymbolic import var - parsed = substitute(parsed, - dict((dd.expr, var(dd.varname)) for dd in dep_data)) - - # substitute in global variables - parsed = substitute(parsed, self.variables) - # compile and evaluate from pymbolic import compile compiled = compile(parsed, [dd.varname for dd in dep_data]) @@ -296,6 +316,13 @@ class LogManager: return zipped_dubs def save(self, filename=None): + """Save log data to a file. + + L{synchronize_logs} is called before saving. + + @arg filename: Specify the file name. If not given, the globally set name + is used. + """ self.synchronize_logs() if self.mpi_comm and self.mpi_comm.rank != self.head_rank: return @@ -319,15 +346,16 @@ class LogManager: for name, qbuf in self.quantity_data.iteritems()) from cPickle import dump, HIGHEST_PROTOCOL - dump((save_buffers, self.variables), + dump((save_buffers, self.constants, self.is_parallel), open(filename, "w"), protocol=HIGHEST_PROTOCOL) def load(self, filename): + """Load saved log data from C{filename}.""" if self.mpi_comm and self.mpi_comm.rank != self.head_rank: return from cPickle import load - self.quantity_data, self.variables = load(open(filename)) + self.quantity_data, self.constants, self.is_parallel = load(open(filename)) def get_plot_data(self, expr_x, expr_y): """Generate plot-ready data. @@ -380,21 +408,131 @@ class LogManager: ylabel(label_y) plot(data_x, data_y) + # private functionality --------------------------------------------------- + def _parse_expr(self, expr): + from pymbolic import parse, substitute + parsed = parse(expr) + + # substitute in global constants + parsed = substitute(parsed, self.constants) + + return parsed + + def _get_expr_dep_data(self, parsed): + from pymbolic import get_dependencies + + deps = get_dependencies(parsed) + + # gather information on aggregation expressions + dep_data = [] + from pymbolic.primitives import Variable, Lookup, Subscript + for dep_idx, dep in enumerate(deps): + if isinstance(dep, Variable): + name = dep.name + agg_func = self.quantity_data[name].quantity.default_aggregator + if agg_func is None: + if self.is_parallel: + raise ValueError, "must specify explicit aggregator for '%s'" % name + else: + agg_func = max # use something simple + elif isinstance(dep, Lookup): + assert isinstance(dep.aggregate, Variable) + name = dep.aggregate.name + agg_name = dep.name + if agg_name == "min": + agg_func = min + elif agg_name == "max": + agg_func = max + elif agg_name == "avg": + from pytools import average + agg_func = average + elif agg_name == "sum": + agg_func = sum + elif agg_name == "norm2": + from math import sqrt + agg_func = lambda iterable: sqrt( + sum(entry**2 for entry in iterable)) + else: + raise ValueError, "invalid rank aggregator '%s'" % agg_name + elif isinstance(dep, Subscript): + assert isinstance(dep.aggregate, Variable) + name = dep.aggregate.name + + class Nth: + def __init__(self, n): + self.n = n + + def __call__(self, lst): + return lst[self.n] + + from pymbolic import evaluate + agg_func = Nth(evaluate(dep.index)) + + quantity = self.quantity_data[name].quantity + + from pytools import Record + this_dep_data = Record(name=name, quantity=quantity, agg_func=agg_func, + varname="logvar%d" % dep_idx, expr=dep) + dep_data.append(this_dep_data) + + # substitute in the "logvar" variable names + from pymbolic import var, substitute + parsed = substitute(parsed, + dict((dd.expr, var(dd.varname)) for dd in dep_data)) + + return parsed, dep_data + + def _watch_tick(self): + def get_last_value(table): + if table: + return table.data[-1][2] + else: + return 0 + + data_block = dict((name, get_last_value(qdat.table)) + for name, qdat in self.quantity_data.iteritems()) + + if self.mpi_comm is not None: + from boost.mpi import broadcast, gather + + gathered_data = gather(self.mpi_comm, data_block, self.head_rank) + else: + gathered_data = [data_block] + + if self.rank == self.head_rank: + values = {} + for data_block in gathered_data: + for name, value in data_block.iteritems(): + values.setdefault(name, []).append(value) + + print " | ".join( + "%s=%g" % (watch.expr, watch.compiled( + *[dd.agg_func(values[dd.name]) for dd in watch.dep_data])) + for watch in self.watches + ) + + ticks_per_sec = self.tick_count/max(1, time()-self.start_time) + self.next_watch_tick = self.tick_count + int(max(1, ticks_per_sec)) + + if self.mpi_comm is not None: + self.next_watch_tick = broadcast(self.mpi_comm, next_watch_tick, self.head_rank) + + + # actual data loggers --------------------------------------------------------- class IntervalTimer(LogQuantity): + """Records the elapsed time between L{start} and L{stop} calls.""" def __init__(self, name="interval", description=None): LogQuantity.__init__(self, name, "s", description) self.elapsed = 0 def start(self): - from time import time self.start_time = time() def stop(self): - from time import time self.elapsed += time() - self.start_time del self.start_time @@ -407,16 +545,21 @@ class IntervalTimer(LogQuantity): class LogUpdateDuration(LogQuantity): + """Records how long the last L{LogManager.tick} invocation took.""" def __init__(self, mgr, name="t_log"): LogQuantity.__init__(self, name, "s", "Time spent updating the log") self.log_manager = mgr + default_aggregator = max + def __call__(self): return self.log_manager.t_log class EventCounter(LogQuantity): + """Counts events signaled by L{add}.""" + def __init__(self, name="interval", description=None): LogQuantity.__init__(self, name, "1", description) self.events = 0 @@ -436,10 +579,14 @@ class EventCounter(LogQuantity): class TimestepCounter(LogQuantity): + """Counts the number of times L{LogManager.tick} is called.""" + def __init__(self, name="step"): LogQuantity.__init__(self, name, "1", "Timesteps") self.steps = 0 + default_aggregator = max + def __call__(self): result = self.steps self.steps += 1 @@ -449,14 +596,16 @@ class TimestepCounter(LogQuantity): class TimestepDuration(LogQuantity): + """Records the CPU time between invocations of L{LogManager.tick}.""" + def __init__(self, name="t_step"): LogQuantity.__init__(self, name, "s", "Time step duration") - from time import time self.last_start = time() + default_aggregator = max + def __call__(self): - from time import time now = time() result = now - self.last_start self.last_start = now @@ -465,26 +614,40 @@ class TimestepDuration(LogQuantity): -class WallTime(LogQuantity): - def __init__(self, name="t_wall"): +class CPUTime(LogQuantity): + """Records (monotonically increasing) CPU time.""" + def __init__(self, name="t_cpu"): LogQuantity.__init__(self, name, "s", "Wall time") - from time import time self.start = time() def __call__(self): - from time import time return time()-self.start +def add_general_quantities(mgr): + """Add generally applicable L{LogQuantity} objects to C{mgr}.""" + + mgr.add_quantity(TimestepDuration()) + mgr.add_quantity(CPUTime()) + mgr.add_quantity(LogUpdateDuration(mgr)) + mgr.add_quantity(TimestepCounter()) + + + + class SimulationTime(LogQuantity): + """Record (monotonically increasing) simulation time.""" + def __init__(self, dt, name="t_sim", start=0): LogQuantity.__init__(self, name, "s", "Simulation Time") self.dt = dt self.t = 0 + default_aggregator = max + def set_dt(self, dt): self.dt = dt @@ -497,6 +660,8 @@ class SimulationTime(LogQuantity): class Timestep(LogQuantity): + """Record the magnitude of the simulated time step.""" + def __init__(self, dt, name="dt"): LogQuantity.__init__(self, name, "s", "Simulation Timestep") self.dt = dt @@ -510,26 +675,30 @@ class Timestep(LogQuantity): def set_dt(mgr, dt): + """Set the simulation timestep on L{LogManager} C{mgr} to C{dt}.""" + mgr.quantity_data["dt"].quantity.set_dt(dt) mgr.quantity_data["t_sim"].quantity.set_dt(dt) -def add_general_quantities(mgr, dt): - mgr.add_quantity(TimestepDuration()) - mgr.add_quantity(WallTime()) +def add_simulation_quantities(mgr, dt): + """Add L{LogQuantity} objects relating to simulation time.""" mgr.add_quantity(SimulationTime(dt)) mgr.add_quantity(Timestep(dt)) - mgr.add_quantity(LogUpdateDuration(mgr)) - mgr.add_quantity(TimestepCounter()) + + add_general_quantities(mgr) + def add_run_info(mgr): + """Add generic run metadata, such as command line, host, and time.""" + import sys - mgr.set_variable("cmdline", " ".join(sys.argv)) + mgr.set_constant("cmdline", " ".join(sys.argv)) from socket import gethostname - mgr.set_variable("machine", gethostname()) + mgr.set_constant("machine", gethostname()) from time import localtime, strftime - mgr.set_variable("date", strftime("%a, %d %b %Y %H:%M:%S %Z", localtime())) + mgr.set_constant("date", strftime("%a, %d %b %Y %H:%M:%S %Z", localtime())) -- GitLab