diff --git a/bin/logtool b/bin/logtool
index 0ee4c4fa4e4408a10954d7ea1e6959d1c7f32216..01710934d0c55c782ca4b747ca1db97e9e202f9c 100755
--- a/bin/logtool
+++ b/bin/logtool
@@ -48,15 +48,13 @@ following:
         parser.print_help()
         sys.exit(1)
 
-    logmgr = LogManager()
+    logmgr = None
     did_plot = False
-    did_load = False
 
     def check_no_file():
-        if not did_load:
-            from warnings import warn
-            warn("No file loaded -- is this what you want?")
+        if logmgr is None:
+            raise RuntimeError, "no file loaded"
 
     while args:
         cmd = args.pop(0)
 
@@ -69,7 +67,10 @@ following:
             items = list(logmgr.quantity_data.iteritems())
             items.sort(lambda a,b: cmp(a[0], b[0]))
 
-            col0_len = max(len(k) for k, v in items) + 1
+            if items:
+                col0_len = max(len(k) for k, v in items) + 1
+            else:
+                col0_len = 0
 
             for key, qdat in items:
                 print "%s\t%s" % (key.ljust(col0_len), qdat.description)
@@ -80,7 +81,10 @@ following:
             items = list(logmgr.constants.iteritems())
             items.sort(lambda a,b: cmp(a[0], b[0]))
 
-            col0_len = max(len(k) for k, v in items) + 1
+            if items:
+                col0_len = max(len(k) for k, v in items) + 1
+            else:
+                col0_len = 0
 
             for key, value in items:
                 print "%s\t%s" % (key.ljust(col0_len), str(value))
@@ -122,15 +126,10 @@ following:
         elif cmd == "table":
             check_no_file()
 
-            if not did_load:
-                from warnings import warn
-                warn("No file loaded -- is this what you want?")
-
            print logmgr.quantity_data[args.pop(0)].table
         else:
             # not a known command, interpret as file name
-            logmgr.load(cmd)
-            did_load = True
+            logmgr = LogManager(cmd)
 
     if did_plot:
         from pylab import show, title, legend, axis, grid
diff --git a/src/log.py b/src/log.py
index f11f3a87895e2232cc04ab1b16243b9ee7e38d6d..3ad4505ae752dcf09839ef7219117da1444cb5d3 100644
--- a/src/log.py
+++ b/src/log.py
@@ -165,25 +165,20 @@ class LogManager(object):
     data in a saved log.
     """
 
-    def __init__(self, filename=None, mpi_comm=None):
+    def __init__(self, filename, mpi_comm=None):
         """Initialize this log manager instance.
 
-        @arg filename: If given, the log is periodically written to this file.
+        @arg filename: If given, the name of the sqlite database to which this
+          log is bound. If the database already exists, its state is loaded.
         @arg mpi_comm: A C{boost.mpi} communicator. If given, logs are
           periodically synchronized to the head node, which then writes them
           out to disk.
""" self.quantity_data = {} self.gather_descriptors = [] self.tick_count = 0 - self.filename = filename self.constants = {} - if filename is not None: - from os import access, R_OK - if access(self.filename, R_OK): - raise IOError, "cowardly refusing to overwrite '%s'" % self.filename - # self-timing self.start_time = time() self.t_log = 0 @@ -192,6 +187,7 @@ class LogManager(object): self.head_rank = 0 self.mpi_comm = mpi_comm self.is_parallel = mpi_comm is not None + if mpi_comm is None: self.rank = 0 self.last_checkpoint = self.start_time @@ -204,6 +200,47 @@ class LogManager(object): self.watches = [] self.next_watch_tick = 1 + # database binding + import sqlite3 + + if filename is not None: + self.db_conn = sqlite3.connect(filename) + try: + self.db_conn.execute("select * from quantities;") + self._load() + except sqlite3.OperationalError: + # initialize new database + self.db_conn.execute(""" + create table quantities ( + name text, + unit text, + description text, + default_aggregator blob)""") + self.db_conn.execute(""" + create table constants ( + name text, + value blob)""") + self.set_constant("is_parallel", self.is_parallel) + else: + self.db_conn = None + + def _load(self): + if self.mpi_comm and self.mpi_comm.rank != self.head_rank: + return + + from pickle import loads + for name, value in self.db_conn.execute("select name, value from constants"): + self.constants[name] = loads(value) + + for name, unit, description, def_agg in self.db_conn.execute( + "select name, unit, description, default_aggregator from quantities"): + qdat = self.quantity_data[name] = _QuantityData( + unit, description, loads(def_agg)) + + for row in self.db_conn.execute( + "select step, rank, value from %s" % name): + qdat.table.insert_row(row) + def add_watches(self, watches): """Add quantities that are printed after every time step.""" @@ -223,8 +260,22 @@ class LogManager(object): def set_constant(self, name, value): """Make a named, constant value available in the log.""" + existed = name in self.constants self.constants[name] = value + if self.db_conn is not None: + from pickle import dumps + value = buffer(dumps(value)) + + if existed: + self.db_conn.execute("update constants set value = ? where name = ?", + (value, name)) + else: + self.db_conn.execute("insert into constants values (?,?)", + (name, value)) + + self.db_conn.commit() + def tick(self): """Record data points from each added L{LogQuantity}. 
@@ -233,20 +284,24 @@ class LogManager(object):
        """
        start_time = time()
 
+        def insert_datapoint(name, value):
+            self.quantity_data[name].table.insert_row(
+                (self.tick_count, self.rank, value))
+            if self.db_conn is not None:
+                self.db_conn.execute("insert into %s values (?,?,?)" % name,
+                        (self.tick_count, self.rank, value))
+
        for gd in self.gather_descriptors:
            if self.tick_count % gd.interval == 0:
                q_value = gd.quantity()
                if isinstance(gd.quantity, MultiLogQuantity):
                    for name, value in zip(gd.quantity.names, q_value):
-                        self.quantity_data[name].table.insert_row(
-                            (self.tick_count, self.rank, value))
+                        insert_datapoint(name, value)
                else:
-                    self.quantity_data[gd.quantity.name].table.insert_row(
-                        (self.tick_count, self.rank, q_value))
+                    insert_datapoint(gd.quantity.name, q_value)
        self.tick_count += 1
 
        end_time = time()
-        self.t_log = end_time - start_time
 
        # print watches
        if self.tick_count == self.next_watch_tick:
@@ -256,23 +311,21 @@
        if self.mpi_comm is not None:
            # parallel-case : sync, then checkpoint
            if self.tick_count == self.next_sync_tick:
-                if self.filename is not None:
-                    # implicitly synchronizes
-                    self.save()
-                else:
-                    self.synchronize_logs()
+                self.synchronize_logs()
 
                # figure out next sync tick, broadcast to peers
                ticks_per_20_sec = 20*self.tick_count/max(1, end_time-self.start_time)
                next_sync_tick = self.tick_count + int(max(10, ticks_per_20_sec))
                from boost.mpi import broadcast
                self.next_sync_tick = broadcast(self.mpi_comm, next_sync_tick, self.head_rank)
-        else:
-            # non-parallel-case : checkpoint log to disk, if necessary
-            if self.filename is not None:
-                if end_time - self.last_checkpoint > 10:
-                    self.save()
-                    self.last_checkpoint = end_time
+
+        if self.db_conn is not None:
+            self.db_conn.commit()
+
+        self.t_log = time() - start_time
+
+    def save(self):
+        self.synchronize_logs()
 
    def synchronize_logs(self):
        """Transfer data from client ranks to the head rank.
@@ -286,6 +339,13 @@
            for rank_data in gather(self.mpi_comm, None, self.head_rank)[1:]:
                for name, rows in rank_data:
                    self.quantity_data[name].table.insert_rows(rows)
+                    if self.db_conn is not None:
+                        for row in rows:
+                            self.db_conn.execute("insert into %s values (?,?,?)" % name,
+                                    row)
+
+            if self.db_conn is not None:
+                self.db_conn.commit()
        else:
            # send non-head data away
            gather(self.mpi_comm,
@@ -299,19 +359,35 @@
    def add_quantity(self, quantity, interval=1):
        """Add an object derived from L{LogQuantity} to this manager."""
 
+        def add_internal(name, unit, description, def_agg):
+            if name in self.quantity_data:
+                raise RuntimeError("cannot add the same quantity '%s' twice" % name)
+            self.quantity_data[name] = _QuantityData(
+                    unit, description, def_agg)
+
+            if self.db_conn is not None:
+                from pickle import dumps
+                self.db_conn.execute("""insert into quantities values (?,?,?,?)""", (
+                        name, unit, description,
+                        buffer(dumps(def_agg))))
+                self.db_conn.execute("""create table %s
+                        (step integer, rank integer, value real)""" % name)
+
+                self.db_conn.commit()
+
        self.gather_descriptors.append(_GatherDescriptor(quantity, interval))
+
        if isinstance(quantity, MultiLogQuantity):
            for name, unit, description, def_agg in zip(
                    quantity.names,
                    quantity.units,
                    quantity.descriptions,
                    quantity.default_aggregators):
-                self.quantity_data[name] = _QuantityData(
-                        unit, description, def_agg)
+                add_internal(name, unit, description, def_agg)
        else:
-            self.quantity_data[quantity.name] = _QuantityData(
-                    quantity.unit, quantity.description,
-                    quantity.default_aggregator)
+            add_internal(quantity.name,
+                    quantity.unit, quantity.description,
+                    quantity.default_aggregator)
 
    def get_expr_dataset(self, expression, description=None, unit=None):
        """Prepare a time-series dataset for a given expression.
@@ -392,37 +468,6 @@
 
        return zipped_dubs
 
-    def save(self, filename=None):
-        """Save log data to a file.
-
-        L{synchronize_logs} is called before saving.
-
-        @arg filename: Specify the file name. If not given, the globally set name
-          is used.
-        """
-        self.synchronize_logs()
-        if self.mpi_comm and self.mpi_comm.rank != self.head_rank:
-            return
-
-        if filename is not None:
-            from os import access, R_OK
-            if access(filename, R_OK):
-                raise IOError, "cowardly refusing to overwrite '%s'" % filename
-        else:
-            filename = self.filename
-
-        from cPickle import dump, HIGHEST_PROTOCOL
-        dump((self.quantity_data, self.constants, self.is_parallel),
-                open(filename, "w"), protocol=HIGHEST_PROTOCOL)
-
-    def load(self, filename):
-        """Load saved log data from C{filename}."""
-        if self.mpi_comm and self.mpi_comm.rank != self.head_rank:
-            return
-
-        from cPickle import load
-        self.quantity_data, self.constants, self.is_parallel = load(open(filename))
-
    def get_plot_data(self, expr_x, expr_y, min_step=None, max_step=None):
        """Generate plot-ready data.
 
@@ -767,8 +812,6 @@ def add_simulation_quantities(mgr, dt):
    mgr.add_quantity(SimulationTime(dt))
    mgr.add_quantity(Timestep(dt))
 
-    add_general_quantities(mgr)
-
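
What follows is not part of the patch: a usage sketch of the sqlite-backed
API introduced above. The module path "log", the database name "log.db", and
the Fifteen/"fifteen" quantity are illustrative assumptions, and the
LogQuantity constructor signature (name, unit, description) is inferred from
the surrounding code rather than confirmed by the diff.

# illustrative only -- names and signatures as assumed above
from log import LogManager, LogQuantity

class Fifteen(LogQuantity):
    """A gatherable quantity that always reports the value 15."""
    def __init__(self):
        LogQuantity.__init__(self, "fifteen", None, "the constant fifteen")

    def __call__(self):
        return 15

# Write side: binding to a file creates the schema on first use --
# the catalog tables "quantities" and "constants", plus one data
# table per added quantity.
logmgr = LogManager("log.db")
logmgr.set_constant("dt", 1e-4)         # pickled into "constants"
logmgr.add_quantity(Fifteen())          # creates table "fifteen"
for i in range(10):
    logmgr.tick()                       # inserts (step, rank, value) rows

# Read side: the same constructor re-opens an existing database and
# repopulates constants and quantity_data via _load().
viewer = LogManager("log.db")
print viewer.constants["dt"]
print viewer.quantity_data["fifteen"].table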
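
Because the on-disk format is now plain sqlite rather than a pickle stream, a
saved log can also be inspected without LogManager at all. A minimal sketch,
reusing the assumed "log.db" and "fifteen" names from the example above:

import sqlite3

db = sqlite3.connect("log.db")

# "quantities" is the catalog table; default_aggregator is stored as a
# pickled blob, so only the text columns are printed here.
for name, unit, description in db.execute(
        "select name, unit, description from quantities"):
    print "%s [%s]: %s" % (name, unit, description)

# Each quantity's samples live in a table named after the quantity.
for step, rank, value in db.execute(
        "select step, rank, value from fifteen order by step"):
    print step, rank, value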