# SPDX-License-Identifier: Apache-2.0
# Copyright Contributors to the Rez Project
from __future__ import print_function
from rez.resolved_context import ResolvedContext
from rez.utils.colorize import heading, local, critical, Printer
from rez.utils.data_utils import cached_property
from rez.utils.formatting import columnise
from rez.vendor import yaml
from rez.vendor.yaml.error import YAMLError
from rez.exceptions import RezSystemError, SuiteError
from rez.config import config
import os.path
import sys
class Wrapper(object):
    """A Wrapper.

    A wrapper is a tool created by a `Suite`. Wrappers reside in the ./bin
    directory of a suite. They are executable yaml files that are run with the
    internal '_rez-forward' tool.

    When a wrapper is executed, it runs the associated tool within the matching
    context in the suite.
    """

    def __init__(self, filepath):
        """Create a wrapper given its executable file.

        Args:
            filepath (str): Path to the wrapper's yaml file. The suite root is
                taken to be two directories above this file.

        Raises:
            RezSystemError: If the file is not valid yaml, lacks the expected
                'kwargs' / 'context_name' / 'tool_name' entries, or its
                enclosing suite cannot be loaded.
        """
        from rez.suite import Suite

        def _err(msg):
            raise RezSystemError("Invalid executable file %s: %s"
                                 % (filepath, msg))

        with open(filepath) as f:
            content = f.read()

        # NOTE(review): FullLoader is applied to the wrapper file's contents;
        # if wrapper files can originate from untrusted sources, a safe loader
        # may be preferable - confirm.
        try:
            doc = yaml.load(content, Loader=yaml.FullLoader)
            doc = doc["kwargs"]
            context_name = doc["context_name"]
            tool_name = doc["tool_name"]
            prefix_char = doc.get("prefix_char")
        except YAMLError as e:
            _err(str(e))
        except (KeyError, TypeError, AttributeError) as e:
            # Parsed fine as yaml but does not have the wrapper layout (empty
            # file, non-mapping document, missing key): report it as an
            # invalid executable rather than leaking the raw lookup error.
            _err("missing or malformed wrapper data (%s: %s)"
                 % (type(e).__name__, e))

        # check that the suite is there - a wrapper may have been moved out of
        # a suite's ./bin path, which renders it useless.
        suite_path = os.path.dirname(os.path.dirname(filepath))
        try:
            Suite.load(suite_path)
        except SuiteError as e:
            _err(str(e))

        path = os.path.join(suite_path, "contexts", "%s.rxt" % context_name)
        context = ResolvedContext.load(path)
        self._init(suite_path, context_name, context, tool_name, prefix_char)

    def _init(self, suite_path, context_name, context, tool_name, prefix_char=None):
        """Store the wrapper's state on the instance.

        Args:
            suite_path (str): Root directory of the owning suite.
            context_name (str): Name of the suite context the tool runs in.
            context: Context object used to execute the tool.
            tool_name (str): Name of the wrapped tool.
            prefix_char (str): Prefix character for rez-specific arguments;
                None defers to `config.suite_alias_prefix_char` at run time.
        """
        self.suite_path = suite_path
        self.context_name = context_name
        self.context = context
        self.tool_name = tool_name
        self.prefix_char = prefix_char

    @cached_property
    def suite(self):
        """The suite loaded from `suite_path`."""
        from rez.suite import Suite
        return Suite.load(self.suite_path)

    def run(self, *args):
        """Invoke the wrapped script.

        Args:
            *args (str): Command line arguments given to the wrapper.

        Returns:
            Return code of the command, or 0 if the command is not run.
        """
        if self.prefix_char is None:
            prefix_char = config.suite_alias_prefix_char
        else:
            prefix_char = self.prefix_char

        if prefix_char == '':
            # empty prefix char means we don't support the '+' args
            return self._run_no_args(args)
        else:
            return self._run(prefix_char, args)

    def _run_no_args(self, args):
        """Run the tool in the context, forwarding every argument to it."""
        cmd = [self.tool_name] + list(args)
        retcode, _, _ = self.context.execute_shell(command=cmd, block=True)
        return retcode

    def _run(self, prefix_char, args):
        """Parse rez-specific arguments, then run the tool or the requested
        alternative action.

        Args:
            prefix_char (str): Character(s) that introduce rez-specific
                arguments (the '=' in the option templates below is replaced
                by this).
            args (sequence of str): Arguments given to the wrapper.

        Returns:
            Return code of the executed command or of the info action.
        """
        import argparse

        parser = argparse.ArgumentParser(prog=self.tool_name,
                                         prefix_chars=prefix_char)

        def _add_argument(*nargs, **kwargs):
            # option names are written with '=' as a placeholder prefix
            names = [narg.replace('=', prefix_char) for narg in nargs]
            parser.add_argument(*names, **kwargs)

        _add_argument(
            "=a", "==about", action="store_true",
            help="print information about the tool")
        _add_argument(
            "=i", "==interactive", action="store_true",
            help="launch an interactive shell within the tool's configured "
            "environment")
        _add_argument(
            "=p", "==patch", type=str, nargs='*', metavar="PKG",
            help="run the tool in a patched environment")
        _add_argument(
            "==versions", action="store_true",
            help="list versions of package providing this tool")
        _add_argument(
            "==command", type=str, nargs='+', metavar=("COMMAND", "ARG"),
            help="read commands from string, rather than executing the tool")
        _add_argument(
            "==stdin", action="store_true",
            help="read commands from standard input, rather than executing the tool")
        _add_argument(
            "==strict", action="store_true",
            help="strict patching. Ignored if ++patch is not present")
        _add_argument(
            "==nl", "==no-local", dest="no_local", action="store_true",
            help="don't load local packages when patching")
        _add_argument(
            "==peek", action="store_true",
            help="diff against the tool's context and a re-resolved copy - "
            "this shows how 'stale' the context is")
        _add_argument(
            "==verbose", action="count", default=0,
            help="verbose mode, repeat for more verbosity")
        _add_argument(
            "==quiet", action="store_true",
            help="hide welcome message when entering interactive mode")
        _add_argument(
            "==no-rez-args", dest="no_rez_args", action="store_true",
            help="pass all args to the tool, even if they start with '%s'" % prefix_char)

        opts, tool_args = parser.parse_known_args(args)

        if opts.no_rez_args:
            # forward everything except the flag itself, and fall back to
            # default values for all rez options
            args = list(args)
            args.remove("==no-rez-args".replace('=', prefix_char))
            tool_args = args
            opts = parser.parse_args([])

        # print info
        if opts.about:
            return self.print_about()
        elif opts.versions:
            return self.print_package_versions()
        elif opts.peek:
            return self.peek()

        # patching
        context = self.context
        if opts.patch is not None:
            new_request = opts.patch
            request = context.get_patched_request(new_request, strict=opts.strict)
            config.remove_override("quiet")
            pkg_paths = (config.nonlocal_packages_path
                         if opts.no_local else None)

            context = ResolvedContext(request,
                                      package_paths=pkg_paths,
                                      verbosity=opts.verbose)

            # reapply quiet mode (see cli.forward)
            if "REZ_QUIET" not in os.environ:
                config.override("quiet", True)

        if opts.stdin:
            # generally shells will behave as though the '-s' flag was not present
            # when no stdin is available. So here we replicate this behaviour.
            import select

            try:
                if not select.select([sys.stdin], [], [], 0.0)[0]:
                    opts.stdin = False
            except select.error:
                pass  # because windows

        # construct command; None is passed for an interactive shell
        cmd = None
        if opts.command:
            cmd = opts.command
        elif opts.interactive:
            label = self.context_name
            if opts.patch:
                label += '*'
            config.override("prompt", "%s>" % label)
        else:
            cmd = [self.tool_name] + tool_args

        retcode, _, _ = context.execute_shell(command=cmd,
                                              stdin=opts.stdin,
                                              quiet=opts.quiet,
                                              block=True)
        return retcode

    def print_about(self):
        """Print an info message about the tool.

        Returns:
            int: Always 0.
        """
        filepath = os.path.join(self.suite_path, "bin", self.tool_name)
        print("Tool:     %s" % self.tool_name)
        print("Path:     %s" % filepath)
        print("Suite:    %s" % self.suite_path)

        msg = "%s (%r)" % (self.context.load_path, self.context_name)
        print("Context:  %s" % msg)

        variants = self.context.get_tool_variants(self.tool_name)
        if variants:
            if len(variants) > 1:
                self._print_conflicting(variants)
            else:
                variant = next(iter(variants))
                print("Package:  %s" % variant.qualified_package_name)
        return 0

    def print_package_versions(self):
        """Print a list of versions of the package this tool comes from, and
        indicate which version this tool is from.

        Returns:
            int: 1 if more than one variant provides the tool, otherwise 0.
        """
        variants = self.context.get_tool_variants(self.tool_name)
        if variants:
            if len(variants) > 1:
                self._print_conflicting(variants)
                return 1
            else:
                from rez.packages import iter_packages
                variant = next(iter(variants))
                it = iter_packages(name=variant.name)
                rows = []
                colors = []

                for pkg in sorted(it, key=lambda x: x.version, reverse=True):
                    if pkg.version == variant.version:
                        name = "* %s" % pkg.qualified_name
                        col = heading
                    else:
                        # two spaces so names line up under the "* " marker
                        name = "  %s" % pkg.qualified_name
                        col = local if pkg.is_local else None

                    label = "(local)" if pkg.is_local else ""
                    rows.append((name, pkg.path, label))
                    colors.append(col)

                _pr = Printer()
                for col, line in zip(colors, columnise(rows)):
                    _pr(line, col)
        return 0

    def peek(self):
        """Re-resolve the context's request and print the diff against the
        stored context.

        Returns:
            int: Always 0.
        """
        config.remove_override("quiet")
        new_context = ResolvedContext(self.context.requested_packages(),
                                      package_paths=self.context.package_paths)

        # reapply quiet mode (see cli.forward)
        if "REZ_QUIET" not in os.environ:
            config.override("quiet", True)

        self.context.print_resolve_diff(new_context)
        return 0

    @classmethod
    def _print_conflicting(cls, variants):
        """Print the qualified names of the given variants as a conflict."""
        vars_str = " ".join(x.qualified_package_name for x in variants)
        msg = "Packages (in conflict): %s" % vars_str
        Printer()(msg, critical)