From 6f39c2d6b790001f65d28ccf75fdfdcbbdf56919 Mon Sep 17 00:00:00 2001 From: Matt Wala Date: Mon, 18 Sep 2017 19:25:15 -0500 Subject: [PATCH] Add an LRU cache to PersistentDict. This adds an optional in-memory LRU cache. To use the cache, you supply a *in_mem_cache_size* parameter to the PersistentDict. In order to properly support cache invalidation this also implements version tracking of files. This change is backwards compatible with existing persistent dict caches. --- pytools/persistent_dict.py | 244 +++++++++++++++++++++++++++++++---- test/test_persistent_dict.py | 57 ++++++++ 2 files changed, 278 insertions(+), 23 deletions(-) diff --git a/pytools/persistent_dict.py b/pytools/persistent_dict.py index 3fe6948..6a1a707 100644 --- a/pytools/persistent_dict.py +++ b/pytools/persistent_dict.py @@ -54,6 +54,15 @@ except ImportError: new_hash = sha.new +def _make_dir_recursively(dir): + try: + os.makedirs(dir) + except OSError as e: + from errno import EEXIST + if e.errno != EEXIST: + raise + + def _erase_dir(dir): from os import listdir, unlink, rmdir from os.path import join, isdir @@ -274,12 +283,127 @@ class NoSuchEntryError(KeyError): pass +class _LinkedList(object): + """The list operates on nodes of the form [value, leftptr, rightpr]. To create a + node of this form you can use `LinkedList.new_node().` + + Supports inserting at the left and deleting from an arbitrary location. + """ + + def __init__(self): + self.count = 0 + self.head = None + self.end = None + + @staticmethod + def new_node(element): + return [element, None, None] + + def __len__(self): + return self.count + + def appendleft_node(self, node): + self.count += 1 + + if self.head is None: + self.head = self.end = node + return + + self.head[1] = node + node[2] = self.head + + self.head = node + + def pop_node(self): + end = self.end + self.remove_node(end) + return end + + def remove_node(self, node): + self.count -= 1 + + if self.head is self.end: + self.head = self.end = None + return + + left = node[1] + right = node[2] + + if left is None: + self.head = right + else: + left[2] = right + + if right is None: + self.tail = left + else: + right[1] = left + + node[1] = node[2] = None + + +class _LRUCache(object): + """A mapping that keeps at most *maxsize* items with an LRU replacement policy. + """ + + def __init__(self, maxsize): + self.lru_order = _LinkedList() + self.maxsize = maxsize + self.cache = {} + + def __delitem__(self, item): + node = self.cache[item] + self.lru_order.remove_node(node) + del self.cache[item] + + def __getitem__(self, item): + node = self.cache[item] + self.lru_order.remove_node(node) + self.lru_order.appendleft_node(node) + # A linked list node contains a tuple of the form (item, value). + return node[0][1] + + def __contains__(self, item): + return item in self.cache + + def discard(self, item): + if item in self.cache: + del self[item] + + def clear(self): + self.cache.clear() + del self.lru_order + self.lru_order = _LinkedList() + + def __setitem__(self, item, value): + if self.maxsize < 1: + return + + try: + node = self.cache[item] + self.lru_order.remove_node(node) + except KeyError: + if len(self.lru_order) >= self.maxsize: + # Make room for new elements. + end_node = self.lru_order.pop_node() + del self.cache[end_node[0][0]] + + node = self.lru_order.new_node((item, value)) + self.cache[item] = node + + self.lru_order.appendleft_node(node) + return node[0] + + class PersistentDict(object): - def __init__(self, identifier, key_builder=None, container_dir=None): + def __init__(self, identifier, key_builder=None, container_dir=None, + in_mem_cache_size=None): """ :arg identifier: a file-name-compatible string identifying this dictionary :arg key_builder: a subclass of :class:`KeyBuilder` + :arg in_mem_cache_size: If not *None*, retain an in-memory cache of + *in_mem_cache_size* items. The replacement policy is LRU. .. automethod:: __getitem__ .. automethod:: __setitem__ @@ -304,20 +428,20 @@ class PersistentDict(object): ".".join(str(i) for i in sys.version_info),)) self.container_dir = container_dir + self.version_dir = join(container_dir, "version") - self._make_container_dir() + self._make_container_dirs() - def _make_container_dir(self): - # {{{ ensure container directory exists + if in_mem_cache_size is None: + in_mem_cache_size = 0 - try: - os.makedirs(self.container_dir) - except OSError as e: - from errno import EEXIST - if e.errno != EEXIST: - raise + self._use_cache = (in_mem_cache_size >= 1) + self._read_key_cache = _LRUCache(in_mem_cache_size) + self._read_contents_cache = _LRUCache(in_mem_cache_size) - # }}} + def _make_container_dirs(self): + _make_dir_recursively(self.container_dir) + _make_dir_recursively(self.version_dir) def store(self, key, value, info_files={}): hexdigest_key = self.key_builder(key) @@ -348,6 +472,8 @@ class PersistentDict(object): logger.debug("%s: cache store [key=%s]" % ( self.identifier, hexdigest_key)) + self._tick_version(hexdigest_key) + # Write key last, so that if the reader below key_path = item_dir_m.sub("key") with open(key_path, "wb") as outf: @@ -359,6 +485,39 @@ class PersistentDict(object): finally: cleanup_m.clean_up() + def _tick_version(self, hexdigest_key): + from os.path import join + version_path = join(self.version_dir, hexdigest_key) + + from six.moves.cPickle import load, dump, HIGHEST_PROTOCOL + + try: + with open(version_path, "r+b") as versionf: + version = 1 + load(versionf) + versionf.seek(0) + dump(version, versionf, protocol=HIGHEST_PROTOCOL) + + except (IOError, EOFError): + _make_dir_recursively(self.version_dir) + with open(version_path, "wb") as versionf: + dump(0, versionf, protocol=HIGHEST_PROTOCOL) + + def _read_cached(self, file_name, version, cache): + try: + value, cached_version = cache[file_name] + if version == cached_version: + return value + except KeyError: + pass + + with open(file_name, "rb") as inf: + from six.moves.cPickle import load + value = load(inf) + + cache[file_name] = (value, version) + + return value + def fetch(self, key): hexdigest_key = self.key_builder(key) @@ -377,19 +536,51 @@ class PersistentDict(object): item_dir_m = ItemDirManager(cleanup_m, item_dir) key_path = item_dir_m.sub("key") value_path = item_dir_m.sub("contents") + version_path = join(self.version_dir, hexdigest_key) - from six.moves.cPickle import load + # {{{ read version + + version = None + exc = None + + try: + with open(version_path, "rb") as versionf: + from six.moves.cPickle import load + version = load(versionf) + except IOError: + # Not a fatal error - but we won't be able to use the cache. + self._read_key_cache.discard(key_path) + self._read_contents_cache.discard(value_path) + except (OSError, EOFError) as e: + exc = e + + if version is None: + try: + # If the version doesn't exist, reset the version + # counter. + self._tick_version(hexdigest_key) + except (OSError, IOError, EOFError) as e: + exc = e + + if exc is not None: + item_dir_m.reset() + from warnings import warn + warn("pytools.persistent_dict.PersistentDict(%s) " + "encountered an invalid " + "key file for key %s. Entry deleted." + % (self.identifier, hexdigest_key)) + raise NoSuchEntryError(key) + + # }}} # {{{ load key file exc = None try: - with open(key_path, "rb") as inf: - read_key = load(inf) - except IOError as e: - exc = e - except EOFError as e: + read_key = self._read_cached(key_path, version, + self._read_key_cache) + except (OSError, IOError, EOFError) as e: exc = e if exc is not None: @@ -423,11 +614,9 @@ class PersistentDict(object): exc = None try: - with open(value_path, "rb") as inf: - read_contents = load(inf) - except IOError as e: - exc = e - except EOFError as e: + read_contents = self._read_cached(value_path, version, + self._read_contents_cache) + except (OSError, IOError, EOFError) as e: exc = e if exc is not None: @@ -457,6 +646,9 @@ class PersistentDict(object): if not isdir(item_dir): raise NoSuchEntryError(key) + key_file = join(item_dir, "key") + contents_file = join(item_dir, "contents") + cleanup_m = CleanupManager() try: try: @@ -471,6 +663,9 @@ class PersistentDict(object): finally: cleanup_m.clean_up() + self._read_key_cache.discard(key_file) + self._read_contents_cache.discard(contents_file) + def __getitem__(self, key): return self.fetch(key) @@ -487,7 +682,10 @@ class PersistentDict(object): if e.errno != errno.ENOENT: raise - self._make_container_dir() + self._make_container_dirs() + + self._read_key_cache.clear() + self._read_contents_cache.clear() # }}} diff --git a/test/test_persistent_dict.py b/test/test_persistent_dict.py index 3da5748..7904a03 100644 --- a/test/test_persistent_dict.py +++ b/test/test_persistent_dict.py @@ -37,6 +37,63 @@ def test_persistent_dict(): assert d[k] + 1 == pdict[k] +class PDictTestValue(object): + + def __init__(self, val): + self.val = val + + def __getstate__(self): + return {"val": self.val} + + def update_persistent_hash(self, key_hash, key_builder): + key_builder.rec(key_hash, self.val) + + +def test_persistent_dict_in_memory_cache(): + from pytools.persistent_dict import PersistentDict + pdict = PersistentDict("pytools-in-memory-cache-test", in_mem_cache_size=3) + pdict.clear() + + pdict[1] = PDictTestValue(1) + pdict[2] = PDictTestValue(2) + pdict[3] = PDictTestValue(3) + pdict[4] = PDictTestValue(4) + + # {{{ test LRU policy + + val1 = pdict[1] + val1.val = 0 + + assert pdict[1].val == 0 + pdict[2] + assert pdict[1].val == 0 + pdict[3] + assert pdict[1].val == 0 + pdict[2] + assert pdict[1].val == 0 + pdict[4] + assert pdict[1].val == 1 + + # }}} + + # {{{ test cache invalidation by versioning + + assert pdict[1].val == 1 + pdict2 = PersistentDict("pytools-in-memory-cache-test") + pdict2[1] = PDictTestValue(5) + assert pdict[1].val == 5 + + # }}} + + # {{{ test cache invalidation by deletion + + del pdict2[1] + pdict2[1] = PDictTestValue(10) + assert pdict[1].val == 10 + + # }}} + + if __name__ == "__main__": if len(sys.argv) > 1: exec(sys.argv[1]) -- GitLab