diff --git a/pytools/persistent_dict.py b/pytools/persistent_dict.py
new file mode 100644
--- /dev/null
+++ b/pytools/persistent_dict.py
@@ -0,0 +1,497 @@
+"""Generic persistent, concurrent dictionary-like facility."""
+
+from __future__ import division, with_statement
+
+__copyright__ = "Copyright (C) 2011,2014 Andreas Kloeckner"
+
+__license__ = """
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+import sys
+import os
+import errno
+
+try:
+    import hashlib
+    new_hash = hashlib.sha256
+except ImportError:
+    # for Python << 2.5
+    import sha
+    new_hash = sha.new
+
+try:
+    import cPickle as pickle
+except ImportError:
+    # Python 3 has no cPickle module
+    import pickle
+
+try:
+    _text_type = unicode  # noqa
+except NameError:
+    # Python 3: str is the text type
+    _text_type = str
+
+
+def _erase_dir(dir):
+    """Recursively delete the directory *dir* and everything in it.
+
+    Symbolic links are unlinked, not followed, so that nothing outside
+    of *dir* gets deleted.
+    """
+    from os import listdir, unlink, rmdir
+    from os.path import join, isdir, islink
+    for name in listdir(dir):
+        sub_name = join(dir, name)
+        if isdir(sub_name) and not islink(sub_name):
+            _erase_dir(sub_name)
+        else:
+            unlink(sub_name)
+
+    rmdir(dir)
+
+
+def update_checksum(checksum, obj):
+    """Feed *obj* to *checksum*, encoding text as UTF-8 first."""
+    if isinstance(obj, _text_type):
+        checksum.update(obj.encode("utf8"))
+    else:
+        checksum.update(obj)
+
+
+# {{{ cleanup managers
+
+class CleanupBase(object):
+    """Base class of objects that can be registered for cleanup."""
+
+
+class CleanupManager(CleanupBase):
+    """Runs registered cleanups, most recently registered first."""
+
+    def __init__(self):
+        self.cleanups = []
+
+    def register(self, c):
+        # Prepend, so that cleanups run in reverse registration order.
+        self.cleanups.insert(0, c)
+
+    def clean_up(self):
+        for c in self.cleanups:
+            c.clean_up()
+
+    def error_clean_up(self):
+        for c in self.cleanups:
+            c.error_clean_up()
+
+
+class LockManager(CleanupBase):
+    """Holds the lock file ``<container_dir>/lock`` until cleaned up.
+
+    The lock is taken by exclusively creating the lock file. While the
+    file exists, another attempt is made once per second.
+    """
+
+    def __init__(self, cleanup_m, container_dir):
+        # NOTE(review): container_dir=None presumably means "do not
+        # lock"--confirm. Nothing is acquired or released in that case.
+        self.lock_file = None
+        self.fd = None
+
+        if container_dir is not None:
+            from time import sleep
+
+            self.lock_file = os.path.join(container_dir, "lock")
+
+            attempts = 0
+            while True:
+                try:
+                    self.fd = os.open(self.lock_file,
+                            os.O_CREAT | os.O_WRONLY | os.O_EXCL)
+                    break
+                except OSError as e:
+                    # Only an existing lock file means that the lock
+                    # is taken. Retrying on any other error (missing
+                    # permissions, no such directory) would never end.
+                    if e.errno != errno.EEXIST:
+                        raise
+
+                sleep(1)
+
+                attempts += 1
+
+                if attempts > 10:
+                    from warnings import warn
+                    warn("could not obtain lock--delete '%s' if necessary"
+                            % self.lock_file)
+
+        cleanup_m.register(self)
+
+    def clean_up(self):
+        if self.fd is not None:
+            os.close(self.fd)
+            os.unlink(self.lock_file)
+
+    def error_clean_up(self):
+        # Nothing to do: clean_up() runs after errors as well.
+        pass
+
+
+class ItemDirManager(CleanupBase):
+    """Creates the directory of an entry, or notes that it existed.
+
+    A directory created here is erased again by :meth:`error_clean_up`.
+    A directory that existed before is not registered for cleanup.
+    """
+
+    def __init__(self, cleanup_m, path):
+        from os import mkdir
+
+        self.path = path
+        try:
+            mkdir(self.path)
+        except OSError as e:
+            # Do not take e.g. missing permissions for an existing entry.
+            if e.errno != errno.EEXIST:
+                raise
+            self.existed = True
+        else:
+            cleanup_m.register(self)
+            self.existed = False
+
+    def sub(self, n):
+        """Return the path of the file named *n* in the directory."""
+        from os.path import join
+        return join(self.path, n)
+
+    def reset(self):
+        """Empty the directory by erasing and re-creating it."""
+        _erase_dir(self.path)
+        os.mkdir(self.path)
+
+    def clean_up(self):
+        pass
+
+    def error_clean_up(self):
+        _erase_dir(self.path)
+
+# }}}
+
+
+# {{{ key generation
+
+class KeyBuilder(object):
+    """Turns keys into hex digests, which serve as directory names.
+
+    A key is hashed by its own ``update_persistent_hash(key_hash,
+    key_builder)`` method if it has one, and by the method
+    ``update_for_<type name>`` of the builder otherwise.
+    """
+
+    def rec(self, key_hash, key):
+        """Feed *key* to the hash object *key_hash*.
+
+        :raises TypeError: if the type of *key* is not supported
+        """
+        try:
+            method = key.update_persistent_hash
+        except AttributeError:
+            pass
+        else:
+            method(key_hash, self)
+            return
+
+        try:
+            method = getattr(self, "update_for_"+type(key).__name__)
+        except AttributeError:
+            pass
+        else:
+            method(key_hash, key)
+            return
+
+        raise TypeError("unsupported type for persistent hash keying: %s"
+                % type(key))
+
+    def __call__(self, key):
+        """Return the hex digest of *key*."""
+        key_hash = new_hash()
+        self.rec(key_hash, key)
+        return key_hash.hexdigest()
+
+    # {{{ mappers
+
+    def update_for_int(self, key_hash, key):
+        # Hash objects only accept bytes on Python 3.
+        key_hash.update(str(key).encode("utf8"))
+
+    update_for_long = update_for_int
+
+    def update_for_float(self, key_hash, key):
+        key_hash.update(repr(key).encode("utf8"))
+
+    if sys.version_info >= (3,):
+        def update_for_str(self, key_hash, key):
+            key_hash.update(key.encode('utf8'))
+
+        def update_for_bytes(self, key_hash, key):
+            key_hash.update(key)
+    else:
+        def update_for_str(self, key_hash, key):
+            key_hash.update(key)
+
+        def update_for_unicode(self, key_hash, key):
+            key_hash.update(key.encode('utf8'))
+
+    def update_for_tuple(self, key_hash, key):
+        # NOTE: Entries are hashed back to back, without separators, so
+        # that e.g. (1, 23) and (12, 3) get the same digest. fetch()
+        # compares the stored key, so such keys never return each
+        # other's values. They do replace each other's entries, though.
+        for obj_i in key:
+            self.rec(key_hash, obj_i)
+
+    # }}}
+
+# }}}
+
+
+# {{{ top-level
+
+class NoSuchEntryError(KeyError):
+    """Raised if a key has no valid entry in a :class:`PersistentDict`."""
+
+
+class PersistentDict(object):
+    """Dictionary-like store keeping each entry in a directory of its own.
+
+    The entry of a key lives in ``<container_dir>/<hex digest of key>``.
+    Storing, fetching and removing entries is guarded by a lock file.
+
+    NOTE(review): Keys and values are read back using pickle, which can
+    run arbitrary code. The default container directory has a
+    predictable name inside the system temp dir, so pass a private
+    *container_dir* if other users of the machine are not trusted.
+    """
+
+    def __init__(self, identifier, key_builder=None, container_dir=None):
+        """
+        :arg identifier: a file-name-compatible string identifying this
+            dictionary
+        :arg key_builder: an instance of (a subclass of)
+            :class:`KeyBuilder`
+        :arg container_dir: the directory to keep the entries in. By
+            default, a directory in the system temp dir named after
+            *identifier*, the user name and the Python version.
+        """
+
+        self.identifier = identifier
+
+        if key_builder is None:
+            key_builder = KeyBuilder()
+
+        self.key_builder = key_builder
+
+        from os.path import join
+        if container_dir is None:
+            from tempfile import gettempdir
+            import getpass
+            container_dir = join(gettempdir(),
+                    "pytools-pdict-%s-uid%s-py%s" % (
+                        identifier,
+                        getpass.getuser(),
+                        ".".join(str(i) for i in sys.version_info)))
+
+        self.container_dir = container_dir
+
+        self._make_container_dir()
+
+    def _make_container_dir(self):
+        # {{{ ensure container directory exists
+
+        try:
+            os.mkdir(self.container_dir)
+        except OSError as e:
+            if e.errno != errno.EEXIST:
+                raise
+
+        # }}}
+
+    def store(self, key, value, info_files=None):
+        """Store *value* under *key*, replacing any previous entry.
+
+        :arg info_files: a mapping from names to strings. Each string
+            is written to the file ``info_<name>`` next to the value.
+        """
+        if info_files is None:
+            info_files = {}
+
+        hexdigest_key = self.key_builder(key)
+
+        cleanup_m = CleanupManager()
+        try:
+            try:
+                LockManager(cleanup_m, self.container_dir)
+
+                from os.path import join
+                item_dir_m = ItemDirManager(cleanup_m,
+                        join(self.container_dir, hexdigest_key))
+
+                if item_dir_m.existed:
+                    item_dir_m.reset()
+
+                for info_name, info_value in info_files.items():
+                    info_path = item_dir_m.sub("info_"+info_name)
+
+                    with open(info_path, "wt") as outf:
+                        outf.write(info_value)
+
+                value_path = item_dir_m.sub("contents")
+                with open(value_path, "wb") as outf:
+                    pickle.dump(value, outf)
+
+                # Write key last, so that a reader that finds a complete
+                # key file can expect the contents to be there as well.
+                key_path = item_dir_m.sub("key")
+                with open(key_path, "wb") as outf:
+                    pickle.dump(key, outf)
+
+            except:
+                cleanup_m.error_clean_up()
+                raise
+        finally:
+            cleanup_m.clean_up()
+
+    def _drop_invalid_entry(self, item_dir_m, file_kind, hexdigest_key):
+        """Empty the entry whose *file_kind* file is unreadable and warn."""
+        item_dir_m.reset()
+        from warnings import warn
+        warn("pytools.persistent_dict.PersistentDict(%s) "
+                "encountered an invalid "
+                "%s file for key %s. Entry deleted."
+                % (self.identifier, file_kind, hexdigest_key))
+
+    def fetch(self, key):
+        """Return the value stored under *key*.
+
+        An entry whose key or contents file cannot be read is emptied,
+        and a warning is issued.
+
+        :raises NoSuchEntryError: if *key* has no valid entry
+        """
+        hexdigest_key = self.key_builder(key)
+
+        from os.path import join, isdir
+        item_dir = join(self.container_dir, hexdigest_key)
+        if not isdir(item_dir):
+            raise NoSuchEntryError(key)
+
+        cleanup_m = CleanupManager()
+        try:
+            try:
+                LockManager(cleanup_m, self.container_dir)
+
+                item_dir_m = ItemDirManager(cleanup_m, item_dir)
+                key_path = item_dir_m.sub("key")
+                value_path = item_dir_m.sub("contents")
+
+                # {{{ load key file
+
+                try:
+                    with open(key_path, "rb") as inf:
+                        read_key = pickle.load(inf)
+                except (IOError, EOFError):
+                    self._drop_invalid_entry(
+                            item_dir_m, "key", hexdigest_key)
+                    raise NoSuchEntryError(key)
+
+                # }}}
+
+                if read_key != key:
+                    # Key collision, oh well.
+                    raise NoSuchEntryError(key)
+
+                # {{{ load value
+
+                try:
+                    with open(value_path, "rb") as inf:
+                        read_contents = pickle.load(inf)
+                except (IOError, EOFError):
+                    self._drop_invalid_entry(
+                            item_dir_m, "contents", hexdigest_key)
+                    raise NoSuchEntryError(key)
+
+                # }}}
+
+                return read_contents
+
+            except:
+                cleanup_m.error_clean_up()
+                raise
+        finally:
+            cleanup_m.clean_up()
+
+    def remove(self, key):
+        """Delete the entry stored under *key*.
+
+        :raises NoSuchEntryError: if *key* has no entry
+        """
+        hexdigest_key = self.key_builder(key)
+
+        from os.path import join, isdir
+        item_dir = join(self.container_dir, hexdigest_key)
+        if not isdir(item_dir):
+            raise NoSuchEntryError(key)
+
+        cleanup_m = CleanupManager()
+        try:
+            try:
+                LockManager(cleanup_m, self.container_dir)
+
+                # Check again, the entry may have gone away while this
+                # process was waiting for the lock.
+                if not isdir(item_dir):
+                    raise NoSuchEntryError(key)
+
+                # Erase the directory itself. If it were only emptied,
+                # fetch() would find an entry without a key file and
+                # warn about it, and the key could be removed again.
+                _erase_dir(item_dir)
+
+            except:
+                cleanup_m.error_clean_up()
+                raise
+        finally:
+            cleanup_m.clean_up()
+
+    def __getitem__(self, key):
+        return self.fetch(key)
+
+    def __setitem__(self, key, value):
+        return self.store(key, value)
+
+    def __delitem__(self, key):
+        self.remove(key)
+
+    def clear(self):
+        """Delete all entries. Note that this does not take the lock."""
+        _erase_dir(self.container_dir)
+        self._make_container_dir()
+
+# }}}
+
+# vim: foldmethod=marker
diff --git a/test/test_persistent_dict.py b/test/test_persistent_dict.py
new file mode 100644
--- /dev/null
+++ b/test/test_persistent_dict.py
@@ -0,0 +1,59 @@
+from __future__ import division, with_statement
+
+import pytest  # noqa
+import sys  # noqa
+
+
+def test_persistent_dict():
+    from pytools.persistent_dict import PersistentDict
+    pdict = PersistentDict("pytools-test")
+    pdict.clear()
+
+    from random import randrange
+
+    def rand_str(n=20):
+        return "".join(
+                chr(65+randrange(26))
+                for i in range(n))
+
+    keys = [(randrange(2000), rand_str()) for i in range(20)]
+    values = [randrange(2000) for i in range(20)]
+
+    d = dict(zip(keys, values))
+
+    for k, v in zip(keys, values):
+        pdict[k] = v
+        pdict.store(k, v, info_files={"hey": str(v)})
+
+    for k, v in d.items():
+        assert d[k] == pdict[k]
+
+    for k, v in zip(keys, values):
+        pdict.store(k, v+1, info_files={"hey": str(v)})
+
+    for k, v in d.items():
+        assert d[k] + 1 == pdict[k]
+
+
+def test_persistent_dict_remove():
+    from pytools.persistent_dict import PersistentDict, NoSuchEntryError
+    pdict = PersistentDict("pytools-test-remove")
+    pdict.clear()
+
+    pdict["key"] = 1
+    del pdict["key"]
+
+    with pytest.raises(NoSuchEntryError):
+        pdict["key"]
+
+    # The entry is gone entirely, so removing it again must fail.
+    with pytest.raises(NoSuchEntryError):
+        del pdict["key"]
+
+
+if __name__ == "__main__":
+    if len(sys.argv) > 1:
+        exec(sys.argv[1])
+    else:
+        from py.test.cmdline import main
+        main([__file__])