Source code for rez.utils.sourcecode

# SPDX-License-Identifier: Apache-2.0
# Copyright Contributors to the Rez Project


from rez.utils.formatting import indent
from rez.utils.data_utils import cached_property
from rez.utils.logging_ import print_debug
from rez.utils import py23
from inspect import getsourcelines
from textwrap import dedent
from glob import glob
import traceback
import os.path


[docs]def early(): """Used by functions in package.py to harden to the return value at build time. The term 'early' refers to the fact these package attribute are evaluated early, ie at build time and before a package is installed. """ def decorated(fn): setattr(fn, "_early", True) return fn return decorated
[docs]def late(): """Used by functions in package.py that are evaluated lazily. The term 'late' refers to the fact these package attributes are evaluated late, ie when the attribute is queried for the first time. If you want to implement a package.py attribute as a function, you MUST use this decorator - otherwise it is understood that you want your attribute to be a function, not the return value of that function. """ from rez.package_resources import package_rex_keys def decorated(fn): # this is done here rather than in standard schema validation because # the latter causes a very obfuscated error message if fn.__name__ in package_rex_keys: raise ValueError("Cannot use @late decorator on function '%s'" % fn.__name__) setattr(fn, "_late", True) _add_decorator(fn, "late") return fn return decorated
[docs]def include(module_name, *module_names): """Used by functions in package.py to have access to named modules. See the 'package_definition_python_path' config setting for more info. """ def decorated(fn): _add_decorator(fn, "include", nargs=[module_name] + list(module_names)) return fn return decorated
def _add_decorator(fn, name, **kwargs): if not hasattr(fn, "_decorators"): setattr(fn, "_decorators", []) kwargs.update({"name": name}) fn._decorators.append(kwargs)
[docs]class SourceCodeError(Exception): def __init__(self, msg, short_msg): super(SourceCodeError, self).__init__(msg) self.short_msg = short_msg
[docs]class SourceCodeCompileError(SourceCodeError): pass
[docs]class SourceCodeExecError(SourceCodeError): pass
[docs]class SourceCode(object): """Wrapper for python source code. This object is aware of the decorators defined in this sourcefile (such as 'include') and deals with them appropriately. """ def __init__(self, source=None, func=None, filepath=None, eval_as_function=True): self.source = (source or '').rstrip() self.func = func self.filepath = filepath self.eval_as_function = eval_as_function self.package = None self.funcname = None self.decorators = [] if self.func is not None: self._init_from_func()
[docs] def copy(self): other = SourceCode.__new__(SourceCode) other.source = self.source other.func = self.func other.filepath = self.filepath other.eval_as_function = self.eval_as_function other.package = self.package other.funcname = self.funcname other.decorators = self.decorators return other
def _init_from_func(self): self.funcname = self.func.__name__ self.decorators = getattr(self.func, "_decorators", []) # get txt of function body. Skips sig and any decorators. Assumes that # only the decorators in this file (such as 'include') are used. loc = getsourcelines(self.func)[0][len(self.decorators) + 1:] code = dedent(''.join(loc)) # align lines that start with a comment (#) codelines = code.split('\n') linescount = len(codelines) for i, line in enumerate(codelines): if line.startswith('#'): nextindex = i + 1 if i < linescount else i - 1 nextline = codelines[nextindex] while nextline.startswith('#'): nextline = codelines[nextindex] nextindex = (nextindex + 1 if nextindex < linescount else nextindex - 1) firstchar = len(nextline) - len(nextline.lstrip()) codelines[i] = '%s%s' % (nextline[:firstchar], line) code = '\n'.join(codelines).rstrip() code = dedent(code) self.source = code @cached_property def includes(self): info = self._get_decorator_info("include") if not info: return None return set(info.get("nargs", [])) @cached_property def late_binding(self): info = self._get_decorator_info("late") return bool(info) @cached_property def evaluated_code(self): if self.eval_as_function: funcname = self.funcname or "_unnamed" code = indent(self.source) code = ( "def %s():\n" % funcname + code + "\n_result = %s()" % funcname ) else: code = "if True:\n" + indent(self.source) return code @property def sourcename(self): if self.filepath: filename = self.filepath else: filename = "string" if self.funcname: filename += ":%s" % self.funcname return "<%s>" % filename @cached_property def compiled(self): try: pyc = compile(self.evaluated_code, self.sourcename, 'exec') except Exception as e: stack = traceback.format_exc() raise SourceCodeCompileError( "Failed to compile %s:\n%s" % (self.sourcename, stack), short_msg=str(e)) return pyc
[docs] def set_package(self, package): # this is needed to load @included modules self.package = package
[docs] def exec_(self, globals_={}): # bind import modules if self.package is not None and self.includes: for name in self.includes: module = include_module_manager.load_module(name, self.package) globals_[name] = module # exec pyc = self.compiled try: exec(pyc, globals_) except Exception as e: stack = traceback.format_exc() raise SourceCodeExecError( "Failed to execute %s:\n%s" % (self.sourcename, stack), short_msg=str(e)) return globals_.get("_result")
[docs] def to_text(self, funcname): # don't indent code if already indented if self.source[0] in (' ', '\t'): source = self.source else: source = indent(self.source) txt = "def %s():\n%s" % (funcname, source) for entry in self.decorators: nargs_str = ", ".join(map(repr, entry.get("nargs", []))) name_str = entry.get("name") sig = "@%s(%s)" % (name_str, nargs_str) txt = sig + '\n' + txt return txt
def _get_decorator_info(self, name): matches = [x for x in self.decorators if x.get("name") == name] if not matches: return None return matches[0] def __getstate__(self): return { "source": self.source, "filepath": self.filepath, "funcname": self.funcname, "eval_as_function": self.eval_as_function, "decorators": self.decorators } def __setstate__(self, state): self.source = state["source"] self.filepath = state["filepath"] self.funcname = state["funcname"] self.eval_as_function = state["eval_as_function"] self.decorators = state["decorators"] self.func = None self.package = None def __eq__(self, other): return ( isinstance(other, SourceCode) and other.source == self.source ) def __ne__(self, other): return not (other == self) def __str__(self): return self.source def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self.source)
[docs]class IncludeModuleManager(object): """Manages a cache of modules imported via '@include' decorator. """ # subdirectory under package 'base' path where we expect to find copied # sourcefiles referred to by the '@include' function decorator. # include_modules_subpath = ".rez/include" def __init__(self): self.modules = {}
[docs] def load_module(self, name, package): from hashlib import sha1 from rez.config import config # avoiding circular import from rez.developer_package import DeveloperPackage # in rare cases, a @late bound function may get called before the # package is built. An example is 'requires' and the other requires-like # functions. These need to be evaluated before a build, but it does also # make sense to sometimes implement these as late-bound functions. We # detect this case here, and load the modules from the original (pre- # copied into package payload) location. # if isinstance(package, DeveloperPackage): # load sourcefile from original location path = config.package_definition_python_path filepath = os.path.join(path, "%s.py" % name) if not os.path.exists(filepath): return None with open(filepath, "rb") as f: hash_str = sha1(f.read().strip()).hexdigest() else: # load sourcefile that's been copied into package install payload path = os.path.join(package.base, self.include_modules_subpath) pathname = os.path.join(path, "%s.py" % name) hashname = os.path.join(path, "%s.sha1" % name) if os.path.isfile(pathname) and os.path.isfile(hashname): with open(hashname, "r") as f: hash_str = f.readline() filepath = pathname else: # Fallback for backward compat pathname = os.path.join(path, "%s-*.py" % name) hashnames = glob(pathname) if not hashnames: return None filepath = hashnames[0] hash_str = filepath.rsplit('-', 1)[-1].split('.', 1)[0] # End, for details of backward compat, # see https://github.com/nerdvegas/rez/issues/934 # and https://github.com/nerdvegas/rez/pull/935 module = self.modules.get(hash_str) if module is not None: return module if config.debug("file_loads"): print_debug("Loading include sourcefile: %s" % filepath) module = py23.load_module_from_file(name, filepath) self.modules[hash_str] = module return module
# singleton include_module_manager = IncludeModuleManager()