diff --git a/pytools/log.py b/pytools/log.py index 50f4311a51a63f0c3d3115c0708c095996ca4108..dc85e2b7cecc2bf7015ef251d942b7014a38eed1 100644 --- a/pytools/log.py +++ b/pytools/log.py @@ -27,12 +27,12 @@ class LogQuantity(object): self.name = name self.unit = unit self.description = description - + @property def default_aggregator(self): return None def __call__(self): - """Return the current value of the diagnostic represented by this + """Return the current value of the diagnostic represented by this L{LogQuantity} or None if no value is available.""" raise NotImplementedError @@ -45,12 +45,12 @@ class MultiLogQuantity(object): self.names = names self.units = units self.descriptions = descriptions - + @property def default_aggregators(self): return [None] * len(self.names) def __call__(self): - """Return an iterable of the current values of the diagnostic represented + """Return an iterable of the current values of the diagnostic represented by this L{MultiLogQuantity}.""" raise NotImplementedError @@ -73,7 +73,7 @@ class SimulationLogQuantity(LogQuantity, DtConsumer): def __init__(self, dt, name, unit=None, description=None): LogQuantity.__init__(self, name, unit, description) DtConsumer.__init__(self, dt) - + @@ -93,7 +93,7 @@ class PushLogQuantity(LogQuantity): self.value = None return v - + class CallableLogQuantityAdapter(LogQuantity): @@ -191,19 +191,19 @@ def _set_up_schema(db_conn): # initialize new database db_conn.execute(""" create table quantities ( - name text, - unit text, + name text, + unit text, description text, default_aggregator blob)""") db_conn.execute(""" create table constants ( - name text, + name text, value blob)""") db_conn.execute(""" create table warnings ( rank integer, step integer, - message text, + message text, category text, filename text, lineno integer @@ -218,8 +218,8 @@ def _set_up_schema(db_conn): class LogManager(object): """A parallel-capable diagnostic time-series logging facility. - A C{LogManager} logs any number of named time series of floats to - a file. Non-time-series data, in the form of constants, is also + A C{LogManager} logs any number of named time series of floats to + a file. Non-time-series data, in the form of constants, is also supported and saved. If MPI parallelism is used, the "head rank" below always refers to @@ -234,7 +234,7 @@ class LogManager(object): @arg filename: If given, the filename to which this log is bound. If this database exists, the current state is loaded from it. - @arg mode: One of "w", "r" for write, read. "w" assumes that the + @arg mode: One of "w", "r" for write, read. "w" assumes that the database is initially empty. @arg mpi_comm: An C{boostmpi} communicator. If given, logs are periodically synchronized to the head node, which then writes them out to disk. @@ -305,8 +305,8 @@ class LogManager(object): # set globally unique run_id if self.is_parallel: from boostmpi import broadcast - self.set_constant("unique_run_id", - broadcast(self.mpi_comm, _get_unique_id(), + self.set_constant("unique_run_id", + broadcast(self.mpi_comm, _get_unique_id(), root=self.head_rank)) else: self.set_constant("unique_run_id", _get_unique_id()) @@ -336,16 +336,16 @@ class LogManager(object): # cater to Python 2.5 and earlier self.old_showwarning(message, category, filename, lineno) - if (self.db_conn is not None - and self.schema_version >= 1 + if (self.db_conn is not None + and self.schema_version >= 1 and self.mode == "w"): if self.schema_version >= 2: self.db_conn.execute("insert into warnings values (?,?,?,?,?,?)", - (self.rank, self.tick_count, str(message), str(category), + (self.rank, self.tick_count, str(message), str(category), filename, lineno)) else: self.db_conn.execute("insert into warnings values (?,?,?,?,?)", - (self.tick_count, str(message), str(category), + (self.tick_count, str(message), str(category), filename, lineno)) import warnings @@ -471,7 +471,7 @@ class LogManager(object): def tick(self): """Record data points from each added L{LogQuantity}. - May also checkpoint data to disk, and/or synchronize data points + May also checkpoint data to disk, and/or synchronize data points to the head rank. """ tick_start_time = time() @@ -540,7 +540,7 @@ class LogManager(object): self.db_conn.execute("""insert into quantities values (?,?,?,?)""", ( name, unit, description, buffer(dumps(def_agg)))) - self.db_conn.execute("""create table %s + self.db_conn.execute("""create table %s (step integer, rank integer, value real)""" % name) self.db_conn.commit() @@ -555,8 +555,8 @@ class LogManager(object): quantity.default_aggregators): add_internal(name, unit, description, def_agg) else: - add_internal(quantity.name, - quantity.unit, quantity.description, + add_internal(quantity.name, + quantity.unit, quantity.description, quantity.default_aggregator) def get_expr_dataset(self, expression, description=None, unit=None): @@ -566,7 +566,7 @@ class LogManager(object): the time-series variables and the constants in this L{LogManager}. If there is data from multiple ranks for a quantity occuring in this expression, an aggregator may have to be specified. - @return: C{(description, unit, table)}, where C{table} + @return: C{(description, unit, table)}, where C{table} is a list of tuples C{(tick_nbr, value)}. Aggregators are specified as follows: @@ -671,7 +671,7 @@ class LogManager(object): @arg gp: a Gnuplot.Gnuplot instance to which the plot is sent. @arg expr_x: an allowed argument to L{get_joint_dataset}. @arg expr_y: an allowed argument to L{get_joint_dataset}. - @arg kwargs: keyword arguments that are directly passed on to + @arg kwargs: keyword arguments that are directly passed on to C{Gnuplot.Data}. """ (data_x, descr_x, unit_x), (data_y, descr_y, unit_y) = \ @@ -735,7 +735,7 @@ class LogManager(object): if isinstance(dep, Variable): name = dep.name agg_func = self.quantity_data[name].default_aggregator - if agg_func is None: + if agg_func is None: if self.is_parallel: raise ValueError, "must specify explicit aggregator for '%s'" % name else: @@ -782,7 +782,7 @@ class LogManager(object): # substitute in the "logvar" variable names from pymbolic import var, substitute - parsed = substitute(parsed, + parsed = substitute(parsed, dict((dd.expr, var(dd.varname)) for dd in dep_data)) return parsed, dep_data @@ -816,7 +816,7 @@ class LogManager(object): def compute_watch_str(watch): try: return "%s=%g" % (watch.display, watch.compiled( - *[dd.agg_func(values[dd.name]) + *[dd.agg_func(values[dd.name]) for dd in watch.dep_data])) except ZeroDivisionError: return "%s:div0" % watch.display @@ -831,7 +831,7 @@ class LogManager(object): self.next_watch_tick = self.tick_count + int(max(1, ticks_per_sec)) if self.mpi_comm is not None and self.have_nonlocal_watches: - self.next_watch_tick = broadcast(self.mpi_comm, + self.next_watch_tick = broadcast(self.mpi_comm, self.next_watch_tick, self.head_rank) @@ -866,7 +866,7 @@ class IntervalTimer(LogQuantity): def start(self): from warnings import warn - warn("IntervalTimer.start() is deprecated. Use start_sub_timer() instead.", + warn("IntervalTimer.start() is deprecated. Use start_sub_timer() instead.", DeprecationWarning, stacklevel=2) self.start_time = time() @@ -905,7 +905,7 @@ class LogUpdateDuration(LogQuantity): class EventCounter(LogQuantity): """Counts events signaled by L{add}.""" - + def __init__(self, name="interval", description=None): LogQuantity.__init__(self, name, "1", description) self.events = 0