# SPDX-License-Identifier: Apache-2.0
# Copyright Contributors to the Rez Project
from __future__ import print_function
from rez.config import config
from rez.vendor.memcache.memcache import Client as Client_, \
SERVER_MAX_KEY_LENGTH, __version__ as memcache_client_version
from rez.utils import py23
from threading import local
from contextlib import contextmanager
from functools import update_wrapper
from inspect import isgeneratorfunction
from hashlib import md5
from uuid import uuid4
from rez.vendor.six import six
basestring = six.string_types[0]
# this version should be changed if and when the caching interface changes
cache_interface_version = 2
[docs]class Client(object):
"""Wrapper for memcache.Client instance.
Adds the features:
- unlimited key length;
- hard/soft flushing;
- ability to cache None.
"""
class _Miss(object):
def __nonzero__(self):
return False
__bool__ = __nonzero__ # py3 compat
miss = _Miss()
logger = config.debug_printer("memcache")
def __init__(self, servers, debug=False):
"""Create a memcached client.
Args:
servers (str or list of str): Server URI(s), eg '127.0.0.1:11211'.
debug (bool): If True, quasi human readable keys are used. This helps
debugging - run 'memcached -vv' in the foreground to see the keys
being get/set/stored.
"""
self.servers = [servers] if isinstance(servers, basestring) else servers
self.key_hasher = self._debug_key_hash if debug else self._key_hash
self._client = None
self.debug = debug
self.current = ''
def __nonzero__(self):
return bool(self.servers)
__bool__ = __nonzero__ # py3 compat
@property
def client(self):
"""Get the native memcache client.
Returns:
`memcache.Client` instance.
"""
if self._client is None:
self._client = Client_(self.servers)
return self._client
[docs] def test_servers(self):
"""Test that memcached servers are servicing requests.
Returns:
set: URIs of servers that are responding.
"""
responders = set()
for server in self.servers:
client = Client_([server])
key = uuid4().hex
client.set(key, 1)
if client.get(key) == 1:
responders.add(server)
return responders
[docs] def set(self, key, val, time=0, min_compress_len=0):
"""See memcache.Client."""
if not self.servers:
return
key = self._qualified_key(key)
hashed_key = self.key_hasher(key)
val = (key, val)
self.client.set(key=hashed_key,
val=val,
time=time,
min_compress_len=min_compress_len)
self.logger("SET: %s", key)
[docs] def get(self, key):
"""See memcache.Client.
Returns:
object: A value if cached, else `self.miss`. Note that this differs
from `memcache.Client`, which returns None on cache miss, and thus
cannot cache the value None itself.
"""
if not self.servers:
return self.miss
key = self._qualified_key(key)
hashed_key = self.key_hasher(key)
entry = self.client.get(hashed_key)
if isinstance(entry, tuple) and len(entry) == 2:
key_, result = entry
if key_ == key:
self.logger("HIT: %s", key)
return result
self.logger("MISS: %s", key)
return self.miss
[docs] def delete(self, key):
"""See memcache.Client."""
if self.servers:
key = self._qualified_key(key)
hashed_key = self.key_hasher(key)
self.client.delete(hashed_key)
[docs] def flush(self, hard=False):
"""Drop existing entries from the cache.
Args:
hard (bool): If True, all current entries are flushed from the
server(s), which affects all users. If False, only the local
process is affected.
"""
if not self.servers:
return
if hard:
self.client.flush_all()
self.reset_stats()
else:
from uuid import uuid4
tag = uuid4().hex
if self.debug:
tag = "flushed" + tag
self.current = tag
[docs] def get_stats(self):
"""Get server statistics.
Returns:
A list of tuples (server_identifier, stats_dictionary).
"""
return self._get_stats()
[docs] def reset_stats(self):
"""Reset the server stats."""
self._get_stats("reset")
[docs] def disconnect(self):
"""Disconnect from server(s). Behaviour is undefined after this call."""
if self.servers and self._client:
self._client.disconnect_all()
# print("Disconnected memcached client %s" % str(self))
def _qualified_key(self, key):
"""
Qualify cache key so that:
* changes to schemas don't break compatibility (cache_interface_version)
* we're shielded from potential compatibility bugs in newer versions of
python-memcached
"""
return "%s:%s:%s:%s" % (
memcache_client_version,
cache_interface_version,
self.current,
key
)
def _get_stats(self, stat_args=None):
return self.client.get_stats(stat_args=stat_args)
@classmethod
def _key_hash(cls, key):
return md5(key.encode("utf-8")).hexdigest()
@classmethod
def _debug_key_hash(cls, key):
import re
h = cls._key_hash(key)[:16]
value = "%s:%s" % (h, key)
value = value[:SERVER_MAX_KEY_LENGTH]
value = re.sub("[^0-9a-zA-Z]+", '_', value)
return value
class _ScopedInstanceManager(local):
def __init__(self):
self.clients = {}
def acquire(self, servers, debug=False):
key = (tuple(servers or []), debug)
entry = self.clients.get(key)
if entry:
entry[1] += 1
return entry[0], key
else:
client = Client(servers, debug=debug)
self.clients[key] = [client, 1]
return client, key
def release(self, key):
entry = self.clients.get(key)
assert entry
entry[1] -= 1
if not entry[1]:
client = entry[0]
del self.clients[key]
client.disconnect()
scoped_instance_manager = _ScopedInstanceManager()
[docs]@contextmanager
def memcached_client(servers=config.memcached_uri, debug=config.debug_memcache):
"""Get a shared memcached instance.
This function shares the same memcached instance across nested invocations.
This is done so that memcached connections can be kept to a minimum, but at
the same time unnecessary extra reconnections are avoided. Typically an
initial scope (using 'with' construct) is made around parts of code that hit
the cache server many times - such as a resolve, or executing a context. On
exit of the topmost scope, the memcached client is disconnected.
Returns:
`Client`: Memcached instance.
"""
key = None
try:
client, key = scoped_instance_manager.acquire(servers, debug=debug)
yield client
finally:
if key:
scoped_instance_manager.release(key)
[docs]def pool_memcached_connections(func):
"""Function decorator to pool memcached connections.
Use this to wrap functions that might make multiple calls to memcached. This
will cause a single memcached client to be shared for all connections.
"""
if isgeneratorfunction(func):
def wrapper(*nargs, **kwargs):
with memcached_client():
for result in func(*nargs, **kwargs):
yield result
else:
def wrapper(*nargs, **kwargs):
with memcached_client():
return func(*nargs, **kwargs)
return update_wrapper(wrapper, func)
[docs]def memcached(servers, key=None, from_cache=None, to_cache=None, time=0,
min_compress_len=0, debug=False):
"""memcached memoization function decorator.
The wrapped function is expected to return a value that is stored to a
memcached server, first translated by `to_cache` if provided. In the event
of a cache hit, the data is translated by `from_cache` if provided, before
being returned. If you do not want a result to be cached, wrap the return
value of your function in a `DoNotCache` object.
Example:
@memcached('127.0.0.1:11211')
def _listdir(path):
return os.path.listdir(path)
Note:
If using the default key function, ensure that repr() is implemented on
all your arguments and that they are hashable.
Note:
`from_cache` and `to_cache` both accept the value as first parameter,
then the target function's arguments follow.
Args:
servers (str or list of str): memcached server uri(s), eg '127.0.0.1:11211'.
This arg can be None also, in which case memcaching is disabled.
key (callable, optional): Function that, given the target function's args,
returns the string key to use in memcached.
from_cache (callable, optional): If provided, and a cache hit occurs, the
cached value will be translated by this function before being returned.
to_cache (callable, optional): If provided, and a cache miss occurs, the
function's return value will be translated by this function before
being cached.
time (int): Tells memcached the time which this value should expire, either
as a delta number of seconds, or an absolute unix time-since-the-epoch
value. See the memcached protocol docs section "Storage Commands"
for more info on <exptime>. We default to 0 == cache forever.
min_compress_len (int): The threshold length to kick in auto-compression
of the value using the zlib.compress() routine. If the value being cached is
a string, then the length of the string is measured, else if the value is an
object, then the length of the pickle result is measured. If the resulting
attempt at compression yeilds a larger string than the input, then it is
discarded. For backwards compatability, this parameter defaults to 0,
indicating don't ever try to compress.
debug (bool): If True, memcache keys are kept human readable, so you can
read them if running a foreground memcached proc with 'memcached -vv'.
However this increases chances of key clashes so should not be left
turned on.
"""
def default_key(func, *nargs, **kwargs):
parts = [func.__module__]
argnames = py23.get_function_arg_names(func)
if argnames:
if argnames[0] == "cls":
cls_ = nargs[0]
parts.append(cls_.__name__)
nargs = nargs[1:]
elif argnames[0] == "self":
cls_ = nargs[0].__class__
parts.append(cls_.__name__)
nargs = nargs[1:]
parts.append(func.__name__)
value = ('.'.join(parts), nargs, tuple(sorted(kwargs.items())))
# make sure key is hashable. We don't strictly need it to be, but this
# is a way of hopefully avoiding object types that are not ordered (these
# would give an unreliable key). If you need to key on unhashable args,
# you should provide your own `key` functor.
#
_ = hash(value) # noqa
return repr(value)
def identity(value, *nargs, **kwargs):
return value
from_cache = from_cache or identity
to_cache = to_cache or identity
def decorator(func):
if servers:
def wrapper(*nargs, **kwargs):
with memcached_client(servers, debug=debug) as client:
if key:
cache_key = key(*nargs, **kwargs)
else:
cache_key = default_key(func, *nargs, **kwargs)
# get
result = client.get(cache_key)
if result is not client.miss:
return from_cache(result, *nargs, **kwargs)
# cache miss - run target function
result = func(*nargs, **kwargs)
if isinstance(result, DoNotCache):
return result.result
# store
cache_result = to_cache(result, *nargs, **kwargs)
client.set(key=cache_key,
val=cache_result,
time=time,
min_compress_len=min_compress_len)
return result
else:
def wrapper(*nargs, **kwargs):
result = func(*nargs, **kwargs)
if isinstance(result, DoNotCache):
return result.result
return result
def forget():
"""Forget entries in the cache.
Note that this does not delete entries from a memcached server - that
would be slow and error-prone. Calling this function only ensures
that entries set by the current process will no longer be seen during
this process.
"""
with memcached_client(servers, debug=debug) as client:
client.flush()
wrapper.forget = forget
wrapper.__wrapped__ = func
return update_wrapper(wrapper, func)
return decorator
[docs]class DoNotCache(object):
def __init__(self, result):
self.result = result