# SPDX-License-Identifier: Apache-2.0
# Copyright Contributors to the Rez Project
from __future__ import print_function
from rez.utils.execution import create_forwarding_script
from rez.exceptions import SuiteError, ResolvedContextError
from rez.resolved_context import ResolvedContext
from rez.utils.data_utils import cached_property
from rez.utils.formatting import columnise, PackageRequest
from rez.utils.colorize import warning, critical, Printer, alias as alias_col
from rez.vendor import yaml
from rez.vendor.yaml.error import YAMLError
from rez.utils.yaml import dump_yaml
from rez.vendor.six import six
from collections import defaultdict
import os
import os.path
import shutil
import sys
basestring = six.string_types[0]
[docs]class Suite(object):
"""A collection of contexts.
A suite is a collection of contexts. A suite stores its contexts in a
single directory, and creates wrapper scripts for each tool in each context,
which it stores into a single bin directory. When a tool is invoked, it
executes the actual tool in its associated context. When you add a suite's
bin directory to PATH, you have access to all these tools, which will
automatically run in correctly configured environments.
Tool clashes can occur when a tool of the same name is present in more than
one context. When a context is added to a suite, or prefixed/suffixed, that
context's tools override tools from other contexts.
There are several ways to avoid tool name clashes:
- Hide a tool. This removes it from the suite even if it does not clash;
- Prefix/suffix a context. When you do this, all the tools in the context
have the prefix/suffix applied;
- Explicitly alias a tool using the `alias_tool` method. This takes
precedence over context prefix/suffixing.
"""
def __init__(self):
"""Create a suite."""
self.load_path = None
self.contexts = {}
self.next_priority = 1
self.tools = None
self.tool_conflicts = None
self.hidden_tools = None
@property
def context_names(self):
"""Get the names of the contexts in the suite.
Reurns:
List of strings.
"""
return list(self.contexts.keys())
@cached_property
def tools_path(self):
"""Get the path that should be added to $PATH to expose this suite's
tools.
Returns:
Absolute path as a string, or None if this suite was not loaded
from disk.
"""
return os.path.join(self.load_path, "bin") if self.load_path else None
[docs] def activation_shell_code(self, shell=None):
"""Get shell code that should be run to activate this suite."""
from rez.shells import create_shell
from rez.rex import RexExecutor
executor = RexExecutor(interpreter=create_shell(shell),
parent_variables=["PATH"],
shebang=False)
executor.env.PATH.append(self.tools_path)
return executor.get_output().strip()
def __str__(self):
return "%s(%s)" % (self.__class__.__name__, " ".join(self.context_names))
[docs] def context(self, name):
"""Get a context.
Args:
name (str): Name to store the context under.
Returns:
`ResolvedContext` object.
"""
data = self._context(name)
context = data.get("context")
if context:
return context
assert self.load_path
context_path = os.path.join(self.load_path, "contexts", "%s.rxt" % name)
context = ResolvedContext.load(context_path)
data["context"] = context
data["loaded"] = True
return context
[docs] def add_context(self, name, context, prefix_char=None):
"""Add a context to the suite.
Args:
name (str): Name to store the context under.
context (ResolvedContext): Context to add.
"""
if name in self.contexts:
raise SuiteError("Context already in suite: %r" % name)
if not context.success:
raise SuiteError("Context is not resolved: %r" % name)
self.contexts[name] = dict(name=name,
context=context.copy(),
tool_aliases={},
hidden_tools=set(),
priority=self._next_priority,
prefix_char=prefix_char)
self._flush_tools()
[docs] def find_contexts(self, in_request=None, in_resolve=None):
"""Find contexts in the suite based on search criteria.
Args:
in_request (str): Match contexts that contain the given package in
their request.
in_resolve (str or `Requirement`): Match contexts that contain the
given package in their resolve. You can also supply a conflict
requirement - '!foo' will match any contexts whos resolve does
not contain any version of package 'foo'.
Returns:
List of context names that match the search criteria.
"""
names = self.context_names
if in_request:
def _in_request(name):
context = self.context(name)
packages = set(x.name for x in context.requested_packages(True))
return (in_request in packages)
names = [x for x in names if _in_request(x)]
if in_resolve:
if isinstance(in_resolve, basestring):
in_resolve = PackageRequest(in_resolve)
def _in_resolve(name):
context = self.context(name)
variant = context.get_resolved_package(in_resolve.name)
if variant:
overlap = (variant.version in in_resolve.range)
return (
(in_resolve.conflict and not overlap)
or (overlap and not in_resolve.conflict)
)
else:
return in_resolve.conflict
names = [x for x in names if _in_resolve(x)]
return names
[docs] def remove_context(self, name):
"""Remove a context from the suite.
Args:
name (str): Name of the context to remove.
"""
self._context(name)
del self.contexts[name]
self._flush_tools()
[docs] def set_context_prefix(self, name, prefix):
"""Set a context's prefix.
This will be applied to all wrappers for the tools in this context. For
example, a tool called 'foo' would appear as '<prefix>foo' in the
suite's bin path.
Args:
name (str): Name of the context to prefix.
prefix (str): Prefix to apply to tools.
"""
data = self._context(name)
data["prefix"] = prefix
self._flush_tools()
[docs] def remove_context_prefix(self, name):
"""Remove a context's prefix.
Args:
name (str): Name of the context to de-prefix.
"""
self.set_context_prefix(name, "")
[docs] def set_context_suffix(self, name, suffix):
"""Set a context's suffix.
This will be applied to all wrappers for the tools in this context. For
example, a tool called 'foo' would appear as 'foo<suffix>' in the
suite's bin path.
Args:
name (str): Name of the context to suffix.
suffix (str): Suffix to apply to tools.
"""
data = self._context(name)
data["suffix"] = suffix
self._flush_tools()
[docs] def remove_context_suffix(self, name):
"""Remove a context's suffix.
Args:
name (str): Name of the context to de-suffix.
"""
self.set_context_suffix(name, "")
[docs] def bump_context(self, name):
"""Causes the context's tools to take priority over all others."""
data = self._context(name)
data["priority"] = self._next_priority
self._flush_tools()
[docs] def get_tool_context(self, tool_alias):
"""Given a visible tool alias, return the name of the context it
belongs to.
Args:
tool_alias (str): Tool alias to search for.
Returns:
(str): Name of the context that exposes a visible instance of this
tool alias, or None if the alias is not available.
"""
tools_dict = self.get_tools()
data = tools_dict.get(tool_alias)
if data:
return data["context_name"]
return None
[docs] def get_conflicting_aliases(self):
"""Get a list of tool aliases that have one or more conflicts.
Returns:
List of strings.
"""
self._update_tools()
return list(self.tool_conflicts.keys())
[docs] def get_alias_conflicts(self, tool_alias):
"""Get a list of conflicts on the given tool alias.
Args:
tool_alias (str): Alias to check for conflicts.
Returns: None if the alias has no conflicts, or a list of dicts, where
each dict contains:
- tool_name (str): The original, non-aliased name of the tool;
- tool_alias (str): Aliased tool name (same as key);
- context_name (str): Name of the context containing the tool;
- variant (`Variant`): Variant providing the tool.
"""
self._update_tools()
return self.tool_conflicts.get(tool_alias)
[docs] def validate(self):
"""Validate the suite."""
for context_name in self.context_names:
context = self.context(context_name)
try:
context.validate()
except ResolvedContextError as e:
raise SuiteError("Error in context %r: %s"
% (context_name, str(e)))
[docs] def to_dict(self):
contexts_ = {}
for k, data in self.contexts.items():
data_ = data.copy()
if "context" in data_:
del data_["context"]
if "loaded" in data_:
del data_["loaded"]
contexts_[k] = data_
return dict(contexts=contexts_)
[docs] @classmethod
def from_dict(cls, d):
s = Suite.__new__(Suite)
s.load_path = None
s.tools = None
s.tool_conflicts = None
s.contexts = d["contexts"]
if s.contexts:
s.next_priority = max(x["priority"]
for x in s.contexts.values()) + 1
else:
s.next_priority = 1
return s
[docs] def save(self, path, verbose=False):
"""Save the suite to disk.
Args:
path (str): Path to save the suite to. If a suite is already saved
at `path`, then it will be overwritten. Otherwise, if `path`
exists, an error is raised.
"""
path = os.path.realpath(path)
if os.path.exists(path):
if self.load_path and self.load_path == path:
if verbose:
print("saving over previous suite...")
for context_name in self.context_names:
self.context(context_name) # load before dir deleted
shutil.rmtree(path)
else:
raise SuiteError("Cannot save, path exists: %r" % path)
contexts_path = os.path.join(path, "contexts")
os.makedirs(contexts_path)
# write suite data
data = self.to_dict()
filepath = os.path.join(path, "suite.yaml")
with open(filepath, "w") as f:
f.write(dump_yaml(data))
# write contexts
for context_name in self.context_names:
context = self.context(context_name)
context._set_parent_suite(path, context_name)
filepath = self._context_path(context_name, path)
if verbose:
print("writing %r..." % filepath)
context.save(filepath)
# create alias wrappers
tools_path = os.path.join(path, "bin")
os.makedirs(tools_path)
if verbose:
print("creating alias wrappers in %r..." % tools_path)
tools = self.get_tools()
for tool_alias, d in tools.items():
tool_name = d["tool_name"]
context_name = d["context_name"]
data = self._context(context_name)
prefix_char = data.get("prefix_char")
if verbose:
print("creating %r -> %r (%s context)..."
% (tool_alias, tool_name, context_name))
filepath = os.path.join(tools_path, tool_alias)
create_forwarding_script(filepath,
module="suite",
func_name="_FWD__invoke_suite_tool_alias",
context_name=context_name,
tool_name=tool_name,
prefix_char=prefix_char)
[docs] @classmethod
def load(cls, path):
if not os.path.exists(path):
open(path) # raise IOError
filepath = os.path.join(path, "suite.yaml")
if not os.path.isfile(filepath):
raise SuiteError("Not a suite: %r" % path)
try:
with open(filepath) as f:
data = yaml.load(f.read(), Loader=yaml.FullLoader)
except YAMLError as e:
raise SuiteError("Failed loading suite: %s" % str(e))
s = cls.from_dict(data)
s.load_path = os.path.realpath(path)
return s
[docs] @classmethod
def visible_suite_paths(cls, paths=None):
"""Get a list of paths to suites that are visible on $PATH.
Returns:
List of str.
"""
suite_paths = []
if paths is None:
paths = os.getenv("PATH", "").split(os.pathsep)
for path in paths:
if path and os.path.isdir(path):
path_ = os.path.dirname(path)
filepath = os.path.join(path_, "suite.yaml")
if os.path.isfile(filepath):
suite_paths.append(path_)
return suite_paths
[docs] @classmethod
def load_visible_suites(cls, paths=None):
"""Get a list of suites whos bin paths are visible on $PATH.
Returns:
List of `Suite` objects.
"""
suite_paths = cls.visible_suite_paths(paths)
suites = [cls.load(x) for x in suite_paths]
return suites
[docs] def print_info(self, buf=sys.stdout, verbose=False):
"""Prints a message summarising the contents of the suite."""
_pr = Printer(buf)
if not self.contexts:
_pr("Suite is empty.")
return
context_names = sorted(self.contexts.keys())
_pr("Suite contains %d contexts:" % len(context_names))
if not verbose:
_pr(' '.join(context_names))
return
tools = self.get_tools().values()
context_tools = defaultdict(set)
context_variants = defaultdict(set)
for entry in tools:
context_name = entry["context_name"]
context_tools[context_name].add(entry["tool_name"])
context_variants[context_name].add(str(entry["variant"]))
_pr()
rows = [["NAME", "VISIBLE TOOLS", "PATH"],
["----", "-------------", "----"]]
for context_name in context_names:
context_path = self._context_path(context_name) or '-'
ntools = len(context_tools.get(context_name, []))
if ntools:
nvariants = len(context_variants[context_name])
short_desc = "%d tools from %d packages" % (ntools, nvariants)
else:
short_desc = "no tools"
rows.append((context_name, short_desc, context_path))
_pr("\n".join(columnise(rows)))
def _context(self, name):
data = self.contexts.get(name)
if not data:
raise SuiteError("No such context: %r" % name)
return data
def _context_path(self, name, suite_path=None):
suite_path = suite_path or self.load_path
if not suite_path:
return None
filepath = os.path.join(suite_path, "contexts", "%s.rxt" % name)
return filepath
def _sorted_contexts(self):
return sorted(self.contexts.values(), key=lambda x: x["priority"])
@property
def _next_priority(self):
p = self.next_priority
self.next_priority += 1
return p
def _flush_tools(self):
self.tools = None
self.tool_conflicts = None
self.hidden_tools = None
def _validate_tool(self, context_name, tool_name):
context = self.context(context_name)
context_tools = context.get_tools(request_only=True)
for _, tool_names in context_tools.values():
if tool_name in tool_names:
return
raise SuiteError("No such tool %r in context %r"
% (tool_name, context_name))
def _update_tools(self):
if self.tools is not None:
return
self.tools = {}
self.hidden_tools = []
self.tool_conflicts = defaultdict(list)
for data in reversed(self._sorted_contexts()):
context_name = data["name"]
tool_aliases = data["tool_aliases"]
hidden_tools = data["hidden_tools"]
prefix = data.get("prefix", "")
suffix = data.get("suffix", "")
context = self.context(context_name)
context_tools = context.get_tools(request_only=True)
for variant, tool_names in context_tools.values():
for tool_name in tool_names:
alias = tool_aliases.get(tool_name)
if alias is None:
alias = "%s%s%s" % (prefix, tool_name, suffix)
entry = dict(tool_name=tool_name,
tool_alias=alias,
context_name=context_name,
variant=variant)
if tool_name in hidden_tools:
self.hidden_tools.append(entry)
continue
existing_entry = self.tools.get(alias)
if existing_entry:
if existing_entry.get("context_name") == context_name:
# the same tool is provided in the same context by
# more than one package.
existing_variant = existing_entry["variant"]
if isinstance(existing_variant, set):
existing_variant.add(variant)
else:
existing_entry["variant"] = set([existing_variant,
variant])
else:
self.tool_conflicts[alias].append(entry)
else:
self.tools[alias] = entry
def _FWD__invoke_suite_tool_alias(context_name, tool_name, prefix_char=None,
_script=None, _cli_args=None):
suite_path = os.path.dirname(os.path.dirname(_script))
path = os.path.join(suite_path, "contexts", "%s.rxt" % context_name)
context = ResolvedContext.load(path)
from rez.wrapper import Wrapper
w = Wrapper.__new__(Wrapper)
w._init(suite_path, context_name, context, tool_name, prefix_char)
retcode = w.run(*(_cli_args or []))
sys.exit(retcode)