__copyright__ = "Copyright (C) 2016 Matt Wala" __license__ = """ Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import logging from pickle import dumps, loads import pytest import loopy as lp from loopy.version import LOOPY_USE_LANGUAGE_VERSION_2018_2 # noqa: F401 logger = logging.getLogger(__name__) def test_kernel_pickling_and_hashing(): knl = lp.make_kernel("{[i]: 0<=i<10}", """ y[i] = i """) from loopy.tools import LoopyKeyBuilder reconst_knl = loads(dumps(knl)) assert LoopyKeyBuilder()(knl) == LoopyKeyBuilder()(reconst_knl) def test_set_trie(): from loopy.kernel.tools import SetTrie s = SetTrie() s.add_or_update({1, 2, 3}) s.add_or_update({4, 2, 1}) s.add_or_update({1, 5}) result = [] s.descend(lambda prefix: result.extend(prefix)) assert result == [1, 2, 3, 4, 5] with pytest.raises(ValueError): s.add_or_update({1, 4}) class PickleDetector: """Contains a class attribute which flags if any instance was unpickled. """ @classmethod def reset(cls): cls.instance_unpickled = False def __getstate__(self): return {"state": self.state} def __setstate__(self, state): self.__class__.instance_unpickled = True self.state = state["state"] class PickleDetectorForLazilyUnpicklingDict(PickleDetector): instance_unpickled = False def __init__(self): self.state = None def test_lazily_unpickling_dict(): from loopy.tools import LazilyUnpicklingDict cls = PickleDetectorForLazilyUnpicklingDict mapping = LazilyUnpicklingDict({0: cls()}) assert not cls.instance_unpickled pickled_mapping = dumps(mapping) # {{{ test lazy loading mapping = loads(pickled_mapping) assert not cls.instance_unpickled list(mapping.keys()) assert not cls.instance_unpickled assert isinstance(mapping[0], cls) assert cls.instance_unpickled # }}} # {{{ conversion cls.reset() mapping = loads(pickled_mapping) dict(mapping) assert cls.instance_unpickled # }}} # {{{ test multi round trip mapping = loads(dumps(loads(pickled_mapping))) assert isinstance(mapping[0], cls) # }}} # {{{ test empty map mapping = LazilyUnpicklingDict({}) mapping = loads(dumps(mapping)) assert len(mapping) == 0 # }}} class PickleDetectorForLazilyUnpicklingList(PickleDetector): instance_unpickled = False def __init__(self): self.state = None def test_lazily_unpickling_list(): from loopy.tools import LazilyUnpicklingList cls = PickleDetectorForLazilyUnpicklingList lst = LazilyUnpicklingList([cls()]) assert not cls.instance_unpickled pickled_lst = dumps(lst) # {{{ test lazy loading lst = loads(pickled_lst) assert not cls.instance_unpickled assert isinstance(lst[0], cls) assert cls.instance_unpickled # }}} # {{{ conversion cls.reset() lst = loads(pickled_lst) list(lst) assert cls.instance_unpickled # }}} # {{{ test multi round trip lst = loads(dumps(loads(dumps(lst)))) assert isinstance(lst[0], cls) # }}} # {{{ test empty list lst = LazilyUnpicklingList([]) lst = loads(dumps(lst)) assert len(lst) == 0 # }}} class PickleDetectorForLazilyUnpicklingListWithEqAndPersistentHashing( PickleDetector): instance_unpickled = False def __init__(self, comparison_key): self.state = comparison_key def __repr__(self): return repr(self.state) def update_persistent_hash(self, key_hash, key_builder): key_builder.rec(key_hash, repr(self)) def test_lazily_unpickling_list_eq_and_persistent_hashing(): from loopy.tools import LazilyUnpicklingListWithEqAndPersistentHashing cls = PickleDetectorForLazilyUnpicklingListWithEqAndPersistentHashing # {{{ test comparison of a pair of lazy lists lst0 = LazilyUnpicklingListWithEqAndPersistentHashing( [cls(0), cls(1)], eq_key_getter=repr, persistent_hash_key_getter=repr) lst1 = LazilyUnpicklingListWithEqAndPersistentHashing( [cls(0), cls(1)], eq_key_getter=repr, persistent_hash_key_getter=repr) assert not cls.instance_unpickled assert lst0 == lst1 assert not cls.instance_unpickled lst0 = loads(dumps(lst0)) lst1 = loads(dumps(lst1)) assert lst0 == lst1 assert not cls.instance_unpickled lst0.append(cls(3)) lst1.append(cls(2)) assert lst0 != lst1 # }}} # {{{ comparison with plain lists lst = [cls(0), cls(1), cls(3)] assert lst == lst0 assert lst0 == lst assert not cls.instance_unpickled # }}} # {{{ persistent hashing from loopy.tools import LoopyKeyBuilder kb = LoopyKeyBuilder() assert kb(lst0) == kb(lst) assert not cls.instance_unpickled # }}} def test_optional(): from loopy import Optional # {{{ test API opt = Optional() assert not opt.has_value with pytest.raises(AttributeError): opt.value # noqa: B018 opt = Optional(1) assert opt.has_value assert 1 == opt.value assert Optional(1) == Optional(1) assert Optional(1) != Optional(2) assert Optional() == Optional() assert Optional() != Optional(1) # }}} # {{{ test pickling import pickle assert not pickle.loads(pickle.dumps(Optional())).has_value assert pickle.loads(pickle.dumps(Optional(1))).value == 1 # }}} # {{{ test key builder from loopy.tools import LoopyKeyBuilder kb = LoopyKeyBuilder() kb(Optional()) kb(Optional(None)) # }}} @lp.memoize_on_disk def very_costly_transform(knl, iname): from time import sleep sleep(5) return lp.split_iname(knl, iname, 4) def test_memoize_on_disk(): if not lp.CACHING_ENABLED: # if caching is disabled => don't test the caching behavior pytest.skip("cannot test memoization if caching disabled") knl = lp.make_kernel("{[i]: 0<=i<10}", """ y[i] = i """) from time import time uncached_knl = very_costly_transform(knl, "i") start = time() cached_knl = very_costly_transform(knl, "i") end = time() assert (end - start) < 4 assert cached_knl == uncached_knl @lp.memoize_on_disk def get_twice_of_pym_expr(expr): from time import sleep sleep(2) return 2 * expr def test_memoize_on_disk_with_pym_expr(): if not lp.CACHING_ENABLED: # if caching is disabled => don't test the caching behavior pytest.skip("cannot test memoization if caching disabled") from pymbolic import parse expr = parse("a[i] + b[i]") from time import time uncached_result = get_twice_of_pym_expr(expr) start = time() cached_result = get_twice_of_pym_expr(expr) end = time() assert (end - start) < 1.5 assert cached_result == uncached_result if __name__ == "__main__": import sys if len(sys.argv) > 1: exec(sys.argv[1]) else: from pytest import main main([__file__]) # vim: foldmethod=marker