diff --git a/bin/runalyzer-gather b/bin/runalyzer-gather
index 86e8582c421a1d23e94646ed0b2fe8a291728ce9..47c0e67f6f44ded902afad2717b77991632539b1 100644
--- a/bin/runalyzer-gather
+++ b/bin/runalyzer-gather
@@ -102,7 +102,7 @@ class FeatureGatherer:
                     self.dir_to_features[line[:colon_idx]] = features
 
 
-    def get_db_features(self, dbname):
+    def get_db_features(self, dbname, logmgr):
         from os.path import dirname
         dn = dirname(dbname)
 
@@ -112,36 +112,55 @@ class FeatureGatherer:
             features.extend(parse_dir_feature(feat, i)
                     for i, feat in enumerate(dn.split("-")))
 
-        from pytools.log import LogManager
-        logmgr = LogManager(dbname, "r")
         for name, value in logmgr.constants.iteritems():
             features.append((name,) + sql_type_and_value(value))
-        logmgr.close()
 
         return features
 
-    def gather_feature_types(self, dbnames, progress=True):
-        features = {}
-        from pytools import ProgressBar
-        if progress:
-            pb = ProgressBar("Scanning...", len(dbnames))
-
-        for dbname in dbnames:
-            if progress:
-                pb.progress()
-            for fname, ftype, fvalue in self.get_db_features(dbname):
-                if fname in features:
-                    features[fname] = larger_sql_type(ftype, features[fname])
-                else:
-                    if ftype is None:
-                        ftype = "text"
-                    features[fname] = ftype
+
+
+def scan(fg, dbnames, progress=True):
+    features = {}
+    dbname_to_run_id = {}
+    uid_to_run_id = {}
+    next_run_id = 1
+
+    from pytools import ProgressBar
+    if progress:
+        pb = ProgressBar("Scanning...", len(dbnames))
+
+    for dbname in dbnames:
+        from pytools.log import LogManager
+        logmgr = LogManager(dbname, "r")
+
+        unique_run_id = logmgr.constants.get("unique_run_id")
+        run_id = uid_to_run_id.get(unique_run_id)
+
+        if run_id is None:
+            run_id = next_run_id
+            next_run_id += 1
+
+            if unique_run_id is not None:
+                uid_to_run_id[unique_run_id] = run_id
+
+        dbname_to_run_id[dbname] = run_id
 
         if progress:
-            pb.finished()
+            pb.progress()
 
-        return features
+        for fname, ftype, fvalue in fg.get_db_features(dbname, logmgr):
+            if fname in features:
+                features[fname] = larger_sql_type(ftype, features[fname])
+            else:
+                if ftype is None:
+                    ftype = "text"
+                features[fname] = ftype
+
+    if progress:
+        pb.finished()
+
+    return features, dbname_to_run_id
 
@@ -164,50 +183,76 @@ def make_name_map(map_str):
 
 
 
-def main():
-    import sys
-    from optparse import OptionParser
+def transfer_data_table(db_conn, tbl_name, data_table):
+    db_conn.executemany("insert into %s (%s) values (%s)" %
+            (tbl_name,
+            ", ".join(data_table.column_names),
+            ", ".join("?" * len(data_table.column_names))),
+            data_table.data)
 
-    parser = OptionParser(usage="%prog OUTDB DBFILES ...")
-    parser.add_option("-s", "--show-features", action="store_true",
-            help="Only print the features found and quit")
-    parser.add_option("-d", "--dir-features", action="store_true",
-            help="Extract features from directory names")
-    parser.add_option("-f", "--file-features", default=None,
-            metavar="FILENAME",
-            help="Read additional features from file, with lines like: "
-            "'dirname: key=value, key=value'")
-    parser.add_option("-m", "--feature-map", default=None,
-            help="Specify a feature name map.",
-            metavar="F1=FNAME1,F2=FNAME2")
-    parser.add_option("-q", "--quantity-map", default=None,
-            help="Specify a quantity name map.",
-            metavar="Q1=QNAME1,Q2=QNAME2")
-    options, args = parser.parse_args()
+
 
-    if len(args) < 2:
-        parser.print_help()
-        sys.exit(1)
 
-    outfile = args[0]
-    infiles = args[1:]
+def gather_single_file(outfile, infiles):
+    from pytools import ProgressBar
+    pb = ProgressBar("Importing...", len(infiles))
 
-    # list of run features as {name: sql_type}
-    fg = FeatureGatherer(options.dir_features, options.file_features)
-    features = fg.gather_feature_types(infiles)
+    import sqlite3
+    db_conn = sqlite3.connect(outfile)
 
-    fmap = make_name_map(options.feature_map)
-    qmap = make_name_map(options.quantity_map)
+    from pytools.log import _set_up_schema
+    _set_up_schema(db_conn)
 
-    if options.show_features:
-        for feat_name, feat_type in features.iteritems():
-            print fmap.get(feat_name, feat_name), feat_type
-        sys.exit(0)
+    from pickle import dumps
+
+    seen_constants = set()
+    seen_quantities = set()
+
+    for dbname in infiles:
+        pb.progress()
+
+        from pytools.log import LogManager
+        logmgr = LogManager(dbname, "r")
+
+        # transfer warnings
+        transfer_data_table(db_conn, "warnings", logmgr.get_warnings())
+
+        # transfer constants
+        for key, val in logmgr.constants.iteritems():
+            if key not in seen_constants:
+                db_conn.execute("insert into constants values (?,?)",
+                        (key, buffer(dumps(val))))
+                seen_constants.add(key)
+
+        for qname, qdata in logmgr.quantity_data.iteritems():
+            db_conn.execute("""insert into quantities values (?,?,?,?)""", (
+                qname, qdata.unit, qdata.description,
+                buffer(dumps(qdata.default_aggregator))))
+
+            if qname not in seen_quantities:
+                db_conn.execute("""create table %s
+                      (step integer, rank integer, value real)""" % qname)
+                seen_quantities.add(qname)
+
+            transfer_data_table(db_conn, qname, logmgr.get_table(qname))
+
+    pb.finished()
+
+    db_conn.commit()
+    db_conn.close()
+
+
+
+
+def gather_multi_file(outfile, infiles, fmap, qmap, fg, features,
+        dbname_to_run_id):
+    from pytools import ProgressBar
+    pb = ProgressBar("Importing...", len(infiles))
 
     import sqlite3
     db_conn = sqlite3.connect(outfile)
 
     run_columns = [
-            "id integer primary key autoincrement",
+            "id integer primary key",
             "dirname text"] + ["%s %s" % (fmap.get(fname, fname), ftype)
             for fname, ftype in features.iteritems()]
     db_conn.execute("create table runs (%s)" % ",".join(run_columns))
@@ -223,23 +268,28 @@ def main():
     created_tables = set()
 
-    from pytools import ProgressBar
-    pb = ProgressBar("Importing...", len(infiles))
-
     from os.path import dirname
 
+    uid_to_run_id = {}
+    written_run_ids = set()
+
     for dbname in infiles:
         pb.progress()
 
-        dbfeatures = fg.get_db_features(dbname)
-        qry = "insert into runs (%s) values (%s)" % (
-                ",".join(["dirname"]+[fmap.get(f[0], f[0]) for f in dbfeatures]),
-                ",".join("?" * (len(dbfeatures)+1)))
-        rows = db_conn.execute(qry, [dirname(dbname)]+[f[2] for f in dbfeatures])
-        run_id = rows.lastrowid
+        run_id = dbname_to_run_id[dbname]
 
         from pytools.log import LogManager
         logmgr = LogManager(dbname, "r")
+
+        if run_id not in written_run_ids:
+            dbfeatures = fg.get_db_features(dbname, logmgr)
+            qry = "insert into runs (%s) values (%s)" % (
+                    ",".join(["id", "dirname"]+[fmap.get(f[0], f[0]) for f in dbfeatures]),
+                    ",".join("?" * (len(dbfeatures)+2)))
+            rows = db_conn.execute(qry, [run_id, dirname(dbname)]+[f[2] for f in dbfeatures])
+
+            written_run_ids.add(run_id)
+
         for qname, qdat in logmgr.quantity_data.iteritems():
             tgt_qname = qmap.get(qname, qname)
 
@@ -271,6 +321,63 @@ def main():
     pb.finished()
 
     db_conn.commit()
+    db_conn.close()
+
+
+
+
+def main():
+    import sys
+    from optparse import OptionParser
+
+    parser = OptionParser(usage="%prog OUTDB DBFILES ...")
+    parser.add_option("-1", "--single", action="store_true",
+            help="Gather single-run instead of multi-run file")
+    parser.add_option("-s", "--show-features", action="store_true",
+            help="Only print the features found and quit")
+    parser.add_option("-d", "--dir-features", action="store_true",
+            help="Extract features from directory names")
+    parser.add_option("-f", "--file-features", default=None,
+            metavar="FILENAME",
+            help="Read additional features from file, with lines like: "
+            "'dirname: key=value, key=value'")
+    parser.add_option("-m", "--feature-map", default=None,
+            help="Specify a feature name map.",
+            metavar="F1=FNAME1,F2=FNAME2")
+    parser.add_option("-q", "--quantity-map", default=None,
+            help="Specify a quantity name map.",
+            metavar="Q1=QNAME1,Q2=QNAME2")
+    options, args = parser.parse_args()
+
+    if len(args) < 2:
+        parser.print_help()
+        sys.exit(1)
+
+    outfile = args[0]
+    infiles = args[1:]
+
+    # list of run features as {name: sql_type}
+    fg = FeatureGatherer(options.dir_features, options.file_features)
+    features, dbname_to_run_id = scan(fg, infiles)
+
+    fmap = make_name_map(options.feature_map)
+    qmap = make_name_map(options.quantity_map)
+
+    if options.show_features:
+        for feat_name, feat_type in features.iteritems():
+            print fmap.get(feat_name, feat_name), feat_type
+        sys.exit(0)
+
+    if options.single:
+        if len(set(dbname_to_run_id.values())) > 1:
+            raise ValueError(
+                    "data seems to come from more than one run--"
+                    "can't write single-run file")
+        gather_single_file(outfile, infiles)
+    else:
+        gather_multi_file(outfile, infiles, fmap, qmap, fg, features,
+                dbname_to_run_id)
+
diff --git a/pytools/log.py b/pytools/log.py
index af15c0cf0aeca492be16b64e398ab8b2b495aeca..ea4688baf57b7bef2367d39d7a1b8c6ef3cd40a8 100644
--- a/pytools/log.py
+++ b/pytools/log.py
@@ -163,6 +163,58 @@ def _join_by_first_of_tuple(list_of_iterables):
 
 
 
+def _get_unique_id():
+    try:
+        from uuid import uuid1
+    except ImportError:
+        try:
+            import hashlib
+            checksum = hashlib.md5()
+        except ImportError:
+            # for Python < 2.5
+            import md5
+            checksum = md5.new()
+
+        from random import Random
+        rng = Random()
+        rng.seed()
+        for i in range(20):
+            checksum.update(str(rng.randrange(1<<30)))
+        return checksum.hexdigest()
+    else:
+        return uuid1().hex
+
+
+
+
+def _set_up_schema(db_conn):
+    # initialize new database
+    db_conn.execute("""
+      create table quantities (
+        name text,
+        unit text,
+        description text,
+        default_aggregator blob)""")
+    db_conn.execute("""
+      create table constants (
+        name text,
+        value blob)""")
+    db_conn.execute("""
+      create table warnings (
+        rank integer,
+        step integer,
+        message text,
+        category text,
+        filename text,
+        lineno integer
+        )""")
+
+    schema_version = 2
+    return schema_version
+
+
+
+
 class LogManager(object):
     """A parallel-capable diagnostic time-series logging facility.
 
@@ -216,14 +268,12 @@ class LogManager(object):
             self.rank = mpi_comm.rank
         self.head_rank = 0
 
-        self.next_sync_tick = 10
-
         # watch stuff
         self.watches = []
         self.next_watch_tick = 1
 
         # database binding
-        if filename is not None and self.rank == self.head_rank:
+        if filename is not None:
             try:
                 import sqlite3 as sqlite
             except ImportError:
@@ -232,6 +282,9 @@ class LogManager(object):
                 except ImportError:
                     raise ImportError, "could not find a usable version of sqlite."
 
+            if self.is_parallel:
+                filename += "-rank%d" % self.rank
+
             self.db_conn = sqlite.connect(filename, timeout=30)
             self.mode = mode
             try:
@@ -243,28 +296,19 @@ class LogManager(object):
                 if mode == "r":
                     raise RuntimeError, "Log database '%s' not found" % filename
 
-                # initialize new database
-                self.db_conn.execute("""
-                  create table quantities (
-                    name text,
-                    unit text,
-                    description text,
-                    default_aggregator blob)""")
-                self.db_conn.execute("""
-                  create table constants (
-                    name text,
-                    value blob)""")
-                self.db_conn.execute("""
-                  create table warnings (
-                    step integer,
-                    message text,
-                    category text,
-                    filename text,
-                    lineno integer
-                    )""")
-                self.set_constant("is_parallel", self.is_parallel)
-                self.schema_version = 1
+                self.schema_version = _set_up_schema(self.db_conn)
                 self.set_constant("schema_version", self.schema_version)
+
+                self.set_constant("is_parallel", self.is_parallel)
+
+                # set globally unique run_id
+                if self.is_parallel:
+                    from boostmpi import broadcast
+                    self.set_constant("unique_run_id",
+                            broadcast(self.mpi_comm, _get_unique_id(),
+                                root=self.head_rank))
+                else:
+                    self.set_constant("unique_run_id", _get_unique_id())
         else:
             self.db_conn = None
 
@@ -273,8 +317,6 @@ class LogManager(object):
         self.capture_warnings(True)
 
     def capture_warnings(self, enable=True):
-        # FIXME warning capture on multiple processors
-
         def _showwarning(message, category, filename, lineno, file=None, line=None):
             try:
                 self.old_showwarning(message, category, filename, lineno, file, line)
@@ -285,8 +327,14 @@ class LogManager(object):
             if (self.db_conn is not None
                     and self.schema_version >= 1
                     and self.mode == "w"):
-                self.db_conn.execute("insert into warnings values (?,?,?,?,?)",
-                        (self.tick_count, str(message), str(category), filename, lineno))
+                if self.schema_version >= 2:
+                    self.db_conn.execute("insert into warnings values (?,?,?,?,?,?)",
+                            (self.rank, self.tick_count, str(message), str(category),
+                                filename, lineno))
+                else:
+                    self.db_conn.execute("insert into warnings values (?,?,?,?,?)",
+                            (self.tick_count, str(message), str(category),
+                                filename, lineno))
 
         import warnings
         if enable:
@@ -342,12 +390,16 @@ class LogManager(object):
         return result
 
     def get_warnings(self):
+        columns = ["step", "message", "category", "filename", "lineno"]
+        if self.db_conn is not None and self.schema_version >= 2:
+            columns.insert(0, "rank")
+
         from pytools.datatable import DataTable
-        result = DataTable(["step", "message", "category", "filename", "lineno"])
+        result = DataTable(columns)
 
-        if self.schema_version >= 1 and self.db_conn is not None:
+        if self.db_conn is not None:
             for row in self.db_conn.execute(
-                    "select step, message, category, filename, lineno from warnings"):
+                    "select %s from warnings" % (", ".join(columns))):
                 result.insert_row(row)
 
         return result
@@ -432,53 +484,13 @@ class LogManager(object):
             if self.tick_count == self.next_watch_tick:
                 self._watch_tick()
 
-            if self.tick_count == self.next_sync_tick:
-                # sync every few seconds:
-                self.save()
-
-                # figure out next sync tick, broadcast to peers
-                ticks_per_10_sec = 10*self.tick_count/max(1, end_time-self.start_time)
-                self.next_sync_tick = self.tick_count + int(max(50, ticks_per_10_sec))
-                if self.mpi_comm is not None:
-                    from boostmpi import broadcast
-                    self.next_sync_tick = broadcast(self.mpi_comm, self.next_sync_tick, self.head_rank)
-
             self.t_log = time() - start_time
 
     def save(self):
-        self.synchronize_logs()
-
         if self.db_conn is not None:
             # then, to disk
             self.db_conn.commit()
 
-    def synchronize_logs(self):
-        """Transfer data from client ranks to the head rank.
-
-        Must be called on all ranks simultaneously."""
-        if self.mpi_comm is None:
-            return
-
-        from boostmpi import gather
-        if self.mpi_comm.rank == self.head_rank:
-            for rank_data in gather(self.mpi_comm, None, self.head_rank)[1:]:
-                for name, rows in rank_data:
-                    self.get_table(name).insert_rows(rows)
-                    if self.db_conn is not None:
-                        for row in rows:
-                            self.db_conn.execute(
-                                    "insert into %s values (?,?,?)" % name, row)
-        else:
-            # send non-head data away
-            gather(self.mpi_comm,
-                    [(name, self.get_table(name).data)
-                        for name, qdat in self.quantity_data.iteritems()],
-                    self.head_rank)
-
-            # and erase it
-            for qname in self.quantity_data.iterkeys():
-                self.get_table(qname).clear()
-
     def add_quantity(self, quantity, interval=1):
         """Add an object derived from L{LogQuantity} to this manager."""
        def add_internal(name, unit, description, def_agg):
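
For context on the log.py side of this patch: each rank now writes its own SQLite file (the given filename plus "-rank%d"), and all ranks of one run record the same "unique_run_id" constant, which scan() in runalyzer-gather uses to group the per-rank files into a single run. The snippet below is a minimal illustrative sketch, not part of the patch: the file names are hypothetical, and it assumes constant values are stored as pickled blobs, as the buffer(dumps(val)) round-trip in gather_single_file suggests.

    # Sketch (illustrative only, not part of the patch): read the
    # "unique_run_id" constant back out of a per-rank log database.
    import sqlite3
    from pickle import loads

    def read_unique_run_id(dbname):
        db = sqlite3.connect(dbname)
        try:
            row = db.execute(
                    "select value from constants where name = 'unique_run_id'"
                    ).fetchone()
            if row is None:
                return None
            # assumption: constant values are pickled blobs
            return loads(str(row[0]))
        finally:
            db.close()

    # hypothetical file names; per-rank files of one run share the same id,
    # which is what lets runalyzer-gather assign them a single run_id
    print [read_unique_run_id(f) for f in ("sim.dat-rank0", "sim.dat-rank1")]

If the two printed values are equal, scan() maps both files to the same run.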
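
On the runalyzer-gather side, the multi-run output keeps one row per run in the runs table, now keyed by the explicit id assigned during scanning rather than an autoincrement value, alongside dirname and the discovered feature columns. A minimal sketch of listing those rows follows; it is illustrative only, and "summary.db" is a hypothetical stand-in for the OUTDB argument passed to runalyzer-gather.

    # Sketch (not part of the patch): list runs recorded in a gathered
    # multi-run database. "summary.db" stands in for the OUTDB argument.
    import sqlite3

    db_conn = sqlite3.connect("summary.db")
    for run_id, run_dirname in db_conn.execute(
            "select id, dirname from runs order by id"):
        print run_id, run_dirname
    db_conn.close()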