diff --git a/pytools/log.py b/pytools/log.py index 3b25a4886cd8e5a2df7ff892d49ac233a608f960..f95674b0a5c49cff076bada5a84d27738d669c19 100644 --- a/pytools/log.py +++ b/pytools/log.py @@ -4,9 +4,8 @@ import logging logger = logging.getLogger(__name__) +# {{{ timing function - -# timing function ------------------------------------------------------------- def time(): """Return elapsed CPU time, as a float, in seconds.""" import os @@ -18,12 +17,13 @@ def time(): from resource import getrusage, RUSAGE_SELF return getrusage(RUSAGE_SELF).ru_utime else: - raise RuntimeError, "invalid timing method '%s'" % time_opt + raise RuntimeError("invalid timing method '%s'" % time_opt) +# }}} +# {{{ abstract logging interface -# abstract logging interface -------------------------------------------------- class LogQuantity(object): """A source of loggable scalars.""" @@ -35,7 +35,8 @@ class LogQuantity(object): self.description = description @property - def default_aggregator(self): return None + def default_aggregator(self): + return None def tick(self): """Perform updates required at every :class:`LogManager` tick.""" @@ -43,15 +44,13 @@ class LogQuantity(object): def __call__(self): """Return the current value of the diagnostic represented by this - L{LogQuantity} or None if no value is available. + :class:`LogQuantity` or None if no value is available. This is only called if the invocation interval calls for it. """ raise NotImplementedError - - class PostLogQuantity(LogQuantity): """A source of loggable scalars.""" @@ -61,7 +60,6 @@ class PostLogQuantity(LogQuantity): pass - class MultiLogQuantity(object): """A source of multiple loggable scalars.""" @@ -88,21 +86,17 @@ class MultiLogQuantity(object): def __call__(self): """Return an iterable of the current values of the diagnostic represented - by this L{MultiLogQuantity}. + by this :class:`MultiLogQuantity`. This is only called if the invocation interval calls for it. """ raise NotImplementedError - - class MultiPostLogQuantity(MultiLogQuantity, PostLogQuantity): pass - - class DtConsumer(object): def __init__(self, dt): self.dt = dt @@ -111,8 +105,6 @@ class DtConsumer(object): self.dt = dt - - class TimeTracker(DtConsumer): def __init__(self, dt): DtConsumer.__init__(self, dt) @@ -122,8 +114,6 @@ class TimeTracker(DtConsumer): self.t += self.dt - - class SimulationLogQuantity(PostLogQuantity, DtConsumer): """A source of loggable scalars that needs to know the simulation timestep.""" @@ -132,9 +122,6 @@ class SimulationLogQuantity(PostLogQuantity, DtConsumer): DtConsumer.__init__(self, dt) - - - class PushLogQuantity(LogQuantity): def __init__(self, name, unit=None, description=None): LogQuantity.__init__(self, name, unit, description) @@ -142,7 +129,7 @@ class PushLogQuantity(LogQuantity): def push_value(self, value): if self.value is not None: - raise RuntimeError, "can't push two values per cycle" + raise RuntimeError("can't push two values per cycle") self.value = value def __call__(self): @@ -151,10 +138,8 @@ class PushLogQuantity(LogQuantity): return v - - class CallableLogQuantityAdapter(LogQuantity): - """Adapt a 0-ary callable as a L{LogQuantity}.""" + """Adapt a 0-ary callable as a :class:`LogQuantity`.""" def __init__(self, callable, name, unit=None, description=None): self.callable = callable LogQuantity.__init__(self, name, unit, description) @@ -162,18 +147,17 @@ class CallableLogQuantityAdapter(LogQuantity): def __call__(self): return self.callable() +# }}} +# {{{ manager functionality -# manager functionality ------------------------------------------------------- class _GatherDescriptor(object): def __init__(self, quantity, interval): self.quantity = quantity self.interval = interval - - class _QuantityData(object): def __init__(self, unit, description, default_aggregator): self.unit = unit @@ -181,8 +165,6 @@ class _QuantityData(object): self.default_aggregator = default_aggregator - - def _join_by_first_of_tuple(list_of_iterables): loi = [i.__iter__() for i in list_of_iterables] if not loi: @@ -218,8 +200,6 @@ def _join_by_first_of_tuple(list_of_iterables): force_advance = True - - def _get_unique_id(): try: from uiid import uuid1 @@ -236,14 +216,12 @@ def _get_unique_id(): rng = Random() rng.seed() for i in range(20): - checksum.update(str(rng.randrange(1<<30))) + checksum.update(str(rng.randrange(1 << 30))) return checksum.hexdigest() else: return uuid1().hex - - def _get_random_suffix(n): characters = ( [chr(65+i) for i in range(26)] @@ -254,10 +232,6 @@ def _get_random_suffix(n): return "".join(choice(characters) for i in xrange(n)) - - - - def _set_up_schema(db_conn): # initialize new database db_conn.execute(""" @@ -284,8 +258,6 @@ def _set_up_schema(db_conn): return schema_version - - class LogManager(object): """A parallel-capable diagnostic time-series logging facility. It is meant to log data from a computation, with certain log @@ -330,8 +302,9 @@ class LogManager(object): :param mode: One of "w", "r" for write, read. "w" assumes that the database is initially empty. May also be "wu" to indicate that a unique filename should be chosen automatically. - :arg mpi_comm: An C{mpi4py} communicator. If given, logs are periodically - synchronized to the head node, which then writes them out to disk. + :arg mpi_comm: A :mod:`mpi4py.MPI.Communicator`. If given, logs are + periodically synchronized to the head node, which then writes them + out to disk. :param capture_warnings: Tap the Python warnings facility and save warnings to the log file. :param commit_interval: actually perform a commit only every N times a commit @@ -381,7 +354,7 @@ class LogManager(object): try: from pysqlite2 import dbapi2 as sqlite except ImportError: - raise ImportError, "could not find a usable version of sqlite." + raise ImportError("could not find a usable version of sqlite.") if filename is None: filename = ":memory:" @@ -402,7 +375,7 @@ class LogManager(object): except sqlite.OperationalError: # we're building a new database if mode == "r": - raise RuntimeError, "Log database '%s' not found" % filename + raise RuntimeError("Log database '%s' not found" % filename) self.schema_version = _set_up_schema(self.db_conn) self.set_constant("schema_version", self.schema_version) @@ -412,7 +385,8 @@ class LogManager(object): # set globally unique run_id if self.is_parallel: self.set_constant("unique_run_id", - self.mpi_comm.bcast(_get_unique_id(), root=self.head_rank)) + self.mpi_comm.bcast(_get_unique_id(), + root=self.head_rank)) else: self.set_constant("unique_run_id", _get_unique_id()) @@ -424,7 +398,7 @@ class LogManager(object): else: # we've opened an existing database if mode == "w": - raise RuntimeError, "Log database '%s' already exists" % filename + raise RuntimeError("Log database '%s' already exists" % filename) elif mode == "wu": # try again with a new suffix continue @@ -462,10 +436,11 @@ class LogManager(object): self.old_showwarning = warnings.showwarning warnings.showwarning = _showwarning else: - raise RuntimeError, "Warnings capture was enabled twice" + raise RuntimeError("Warnings capture was enabled twice") else: if self.old_showwarning is None: - raise RuntimeError, "Warnings capture was disabled, but never enabled" + raise RuntimeError( + "Warnings capture was disabled, but never enabled") else: warnings.showwarning = self.old_showwarning self.old_showwarning = None @@ -483,8 +458,9 @@ class LogManager(object): self.is_parallel = self.constants["is_parallel"] for name, unit, description, def_agg in self.db_conn.execute( - "select name, unit, description, default_aggregator from quantities"): - qdat = self.quantity_data[name] = _QuantityData( + "select name, unit, description, default_aggregator " + "from quantities"): + self.quantity_data[name] = _QuantityData( unit, description, loads(def_agg)) def close(self): @@ -494,10 +470,9 @@ class LogManager(object): self.save() self.db_conn.close() - def get_table(self, q_name): if q_name not in self.quantity_data: - raise KeyError, "invalid quantity name '%s'" % q_name + raise KeyError("invalid quantity name '%s'" % q_name) from pytools.datatable import DataTable result = DataTable(["step", "rank", "value"]) @@ -526,7 +501,9 @@ class LogManager(object): """Add quantities that are printed after every time step.""" from pytools import Record - class WatchInfo(Record): pass + + class WatchInfo(Record): + pass for watch in watches: if isinstance(watch, tuple): @@ -590,13 +567,14 @@ class LogManager(object): self._insert_datapoint(gd.quantity.name, q_value) def tick(self): - """Record data points from each added L{LogQuantity}. + """Record data points from each added :class:`LogQuantity`. May also checkpoint data to disk, and/or synchronize data points to the head rank. """ from warnings import warn - warn("LogManager.tick() is deprecated. Use LogManager.tick_{before,after}().", + warn("LogManager.tick() is deprecated. " + "Use LogManager.tick_{before,after}().", DeprecationWarning) self.tick_before() @@ -644,8 +622,6 @@ class LogManager(object): if tick_start_time > self.last_save_time + save_interval: self.save() - end_time = time() - # print watches if self.tick_count == self.next_watch_tick: self._watch_tick() @@ -680,8 +656,8 @@ class LogManager(object): from pickle import dumps self.db_conn.execute("""insert into quantities values (?,?,?,?)""", ( - name, unit, description, - buffer(dumps(def_agg)))) + name, unit, description, + buffer(dumps(def_agg)))) self.db_conn.execute("""create table %s (step integer, rank integer, value real)""" % name) @@ -712,7 +688,7 @@ class LogManager(object): """Prepare a time-series dataset for a given expression. @arg expression: A C{pymbolic} expression that may involve - the time-series variables and the constants in this L{LogManager}. + the time-series variables and the constants in this :class:`LogManager`. If there is data from multiple ranks for a quantity occuring in this expression, an aggregator may have to be specified. @return: C{(description, unit, table)}, where C{table} @@ -798,7 +774,7 @@ class LogManager(object): def get_plot_data(self, expr_x, expr_y, min_step=None, max_step=None): """Generate plot-ready data. - @return: C{(data_x, descr_x, unit_x), (data_y, descr_y, unit_y)} + :return: ``(data_x, descr_x, unit_x), (data_y, descr_y, unit_y)`` """ (descr_x, descr_y), (unit_x, unit_y), data = \ self.get_joint_dataset([expr_x, expr_y]) @@ -822,8 +798,8 @@ class LogManager(object): """Plot data to Gnuplot.py. @arg gp: a Gnuplot.Gnuplot instance to which the plot is sent. - @arg expr_x: an allowed argument to L{get_joint_dataset}. - @arg expr_y: an allowed argument to L{get_joint_dataset}. + @arg expr_x: an allowed argument to :meth:`get_joint_dataset`. + @arg expr_y: an allowed argument to :meth:`get_joint_dataset`. @arg kwargs: keyword arguments that are directly passed on to C{Gnuplot.Data}. """ @@ -855,7 +831,8 @@ class LogManager(object): ylabel("%s [%s]" % (descr_y, unit_y)) plot(data_x, data_y) - # private functionality --------------------------------------------------- + # {{{ private functionality + def _parse_expr(self, expr): from pymbolic import parse, substitute parsed = parse(expr) @@ -892,7 +869,8 @@ class LogManager(object): agg_func = self.quantity_data[name].default_aggregator if agg_func is None: if self.is_parallel: - raise ValueError, "must specify explicit aggregator for '%s'" % name + raise ValueError( + "must specify explicit aggregator for '%s'" % name) else: agg_func = lambda lst: lst[0] elif isinstance(dep, Lookup): @@ -917,7 +895,7 @@ class LogManager(object): agg_func = lambda iterable: sqrt( sum(entry**2 for entry in iterable)) else: - raise ValueError, "invalid rank aggregator '%s'" % agg_name + raise ValueError("invalid rank aggregator '%s'" % agg_name) elif isinstance(dep, Subscript): assert isinstance(dep.aggregate, Variable) name = dep.aggregate.name @@ -928,7 +906,9 @@ class LogManager(object): qdat = self.quantity_data[name] from pytools import Record - class DependencyData(Record): pass + + class DependencyData(Record): + pass this_dep_data = DependencyData(name=name, qdat=qdat, agg_func=agg_func, varname="logvar%d" % dep_idx, expr=dep, @@ -979,11 +959,13 @@ class LogManager(object): self.next_watch_tick = self.mpi_comm.bcast( self.next_watch_tick, self.head_rank) + # }}} +# }}} +# {{{ actual data loggers -# actual data loggers --------------------------------------------------------- class _SubTimer: def __init__(self, itimer): self.itimer = itimer @@ -999,6 +981,7 @@ class _SubTimer: self.itimer.add_time(self.elapsed) del self.elapsed + class IntervalTimer(PostLogQuantity): """Records elapsed times.""" @@ -1019,8 +1002,6 @@ class IntervalTimer(PostLogQuantity): return result - - class LogUpdateDuration(LogQuantity): """Records how long the last :meth:`LogManager.tick` invocation took.""" @@ -1034,9 +1015,8 @@ class LogUpdateDuration(LogQuantity): return self.log_manager.t_log - class EventCounter(PostLogQuantity): - """Counts events signaled by L{add}.""" + """Counts events signaled by :meth:`add`.""" def __init__(self, name="interval", description=None): PostLogQuantity.__init__(self, name, "1", description) @@ -1056,9 +1036,6 @@ class EventCounter(PostLogQuantity): return result - - - def time_and_count_function(f, timer, counter=None, increment=1): def inner_f(*args, **kwargs): if counter is not None: @@ -1072,12 +1049,8 @@ def time_and_count_function(f, timer, counter=None, increment=1): return inner_f - - - - class TimestepCounter(LogQuantity): - """Counts the number of times L{LogManager.tick} is called.""" + """Counts the number of times :meth:`LogManager.tick` is called.""" def __init__(self, name="step"): LogQuantity.__init__(self, name, "1", "Timesteps") @@ -1089,8 +1062,6 @@ class TimestepCounter(LogQuantity): return result - - class StepToStepDuration(PostLogQuantity): """Records the CPU time between invocations of :meth:`LogManager.tick_before` and @@ -1113,9 +1084,6 @@ class StepToStepDuration(PostLogQuantity): return self.last_start_time - self.last2_start_time - - - class TimestepDuration(PostLogQuantity): """Records the CPU time between the starts of time steps. :meth:`LogManager.tick_before` and @@ -1140,9 +1108,6 @@ class TimestepDuration(PostLogQuantity): return result - - - class CPUTime(LogQuantity): """Records (monotonically increasing) CPU time.""" def __init__(self, name="t_cpu"): @@ -1154,8 +1119,6 @@ class CPUTime(LogQuantity): return time()-self.start - - class ETA(LogQuantity): """Records an estimate of how long the computation will still take.""" def __init__(self, total_steps, name="t_eta"): @@ -1175,10 +1138,8 @@ class ETA(LogQuantity): return 0 - - def add_general_quantities(mgr): - """Add generally applicable L{LogQuantity} objects to C{mgr}.""" + """Add generally applicable :class:`LogQuantity` objects to C{mgr}.""" mgr.add_quantity(TimestepDuration()) mgr.add_quantity(StepToStepDuration()) @@ -1187,8 +1148,6 @@ def add_general_quantities(mgr): mgr.add_quantity(TimestepCounter()) - - class SimulationTime(TimeTracker, LogQuantity): """Record (monotonically increasing) simulation time.""" @@ -1200,8 +1159,6 @@ class SimulationTime(TimeTracker, LogQuantity): return self.t - - class Timestep(SimulationLogQuantity): """Record the magnitude of the simulated time step.""" @@ -1212,9 +1169,8 @@ class Timestep(SimulationLogQuantity): return self.dt - def set_dt(mgr, dt): - """Set the simulation timestep on L{LogManager} C{mgr} to C{dt}.""" + """Set the simulation timestep on :class:`LogManager` C{mgr} to C{dt}.""" for gd_lst in [mgr.before_gather_descriptors, mgr.after_gather_descriptors]: @@ -1223,10 +1179,8 @@ def set_dt(mgr, dt): gd.quantity.set_dt(dt) - - def add_simulation_quantities(mgr, dt=None): - """Add L{LogQuantity} objects relating to simulation time.""" + """Add :class:`LogQuantity` objects relating to simulation time.""" if dt is not None: from warnings import warn warn("Specifying dt ahead of time is a deprecated practice. " @@ -1236,8 +1190,6 @@ def add_simulation_quantities(mgr, dt=None): mgr.add_quantity(Timestep(dt)) - - def add_run_info(mgr): """Add generic run metadata, such as command line, host, and time.""" @@ -1248,3 +1200,7 @@ def add_run_info(mgr): from time import localtime, strftime, time mgr.set_constant("date", strftime("%a, %d %b %Y %H:%M:%S %Z", localtime())) mgr.set_constant("unixtime", time()) + +# }}} + +# vim: foldmethod=marker