# SPDX-License-Identifier: Apache-2.0
# Copyright Contributors to the Rez Project
"""
Pluggable API for creating subshells using different programs, such as bash.
"""
from rez.rex import RexExecutor, ActionInterpreter, OutputStyle
from rez.util import shlex_join, is_non_string_iterable
from rez.utils.which import which
from rez.utils.logging_ import print_warning
from rez.utils.execution import Popen
from rez.system import system
from rez.exceptions import RezSystemError
from rez.rex import EscapedString
from rez.config import config
from rez.vendor.six import six
import os
import os.path
import pipes
basestring = six.string_types[0]
[docs]def get_shell_types():
"""Returns the available shell types: bash, tcsh etc.
Returns:
List of str: Shells.
"""
from rez.plugin_managers import plugin_manager
return list(plugin_manager.get_plugins('shell'))
[docs]def get_shell_class(shell=None):
"""Get the plugin class associated with the given or current shell.
Returns:
class: Plugin class for shell.
"""
if not shell:
shell = config.default_shell
if not shell:
from rez.system import system
shell = system.shell
from rez.plugin_managers import plugin_manager
return plugin_manager.get_plugin_class("shell", shell)
[docs]def create_shell(shell=None, **kwargs):
"""Returns a Shell of the given or current type.
Returns:
`Shell`: Instance of given shell.
"""
if not shell:
shell = config.default_shell
if not shell:
from rez.system import system
shell = system.shell
from rez.plugin_managers import plugin_manager
return plugin_manager.create_instance('shell', shell, **kwargs)
[docs]class Shell(ActionInterpreter):
"""Class representing a shell, such as bash or tcsh.
"""
schema_dict = {
"prompt": basestring}
[docs] @classmethod
def name(cls):
"""Plugin name.
"""
raise NotImplementedError
[docs] @classmethod
def executable_name(cls):
"""Name of executable to create shell instance.
"""
return cls.name()
[docs] @classmethod
def executable_filepath(cls):
"""Get full filepath to executable, or raise if not found.
"""
return cls.find_executable(cls.executable_name())
@property
def executable(self):
return self.__class__.executable_filepath()
[docs] @classmethod
def is_available(cls):
"""Determine if the shell is available to instantiate.
Returns:
bool: True if the shell can be created.
"""
try:
return cls.executable_filepath() is not None
except RuntimeError:
return False
[docs] @classmethod
def file_extension(cls):
"""Get the file extension associated with the shell.
Returns:
str: Shell file extension.
"""
raise NotImplementedError
[docs] @classmethod
def startup_capabilities(cls, rcfile=False, norc=False, stdin=False,
command=False):
"""
Given a set of options related to shell startup, return the actual
options that will be applied.
@returns 4-tuple representing applied value of each option.
"""
raise NotImplementedError
[docs] @classmethod
def get_syspaths(cls):
raise NotImplementedError
def __init__(self):
self._lines = []
self.settings = config.plugins.shell[self.name()]
def _addline(self, line):
self._lines.append(line)
[docs] def get_output(self, style=OutputStyle.file):
if style == OutputStyle.file:
script = '\n'.join(self._lines) + '\n'
else: # eval style
lines = []
for line in self._lines:
if not line.startswith('#'): # strip comments
line = line.rstrip().rstrip(';')
lines.append(line)
script = ';'.join(lines)
return script
[docs] def new_shell(self):
"""Returns A new, reset shell of the same type."""
return self.__class__()
@classmethod
def _unsupported_option(cls, option, val):
if val and config.warn("shell_startup"):
print_warning("%s ignored, not supported by %s shell"
% (option, cls.name()))
@classmethod
def _overruled_option(cls, option, overruling_option, val):
if val and config.warn("shell_startup"):
print_warning("%s ignored by %s shell - overruled by %s option"
% (option, cls.name(), overruling_option))
[docs] @classmethod
def find_executable(cls, name, check_syspaths=False):
"""Find an executable.
Args:
name (str): Program name.
check_syspaths (bool): If True, check the standard system paths as
well, if program was not found on current $PATH.
Returns:
str: Full filepath of executable.
"""
settings = config.plugins.shell[cls.name()]
if settings.executable_fullpath:
if not os.path.exists(settings.executable_fullpath):
raise RuntimeError(
"Couldn't find executable '%s'." % settings.executable_fullpath
)
else:
return settings.executable_fullpath
exe = which(name)
if not exe and check_syspaths:
paths = cls.get_syspaths()
env = os.environ.copy()
env["PATH"] = os.pathsep.join(paths)
exe = which(name, env=env)
if not exe:
raise RuntimeError("Couldn't find executable '%s'." % name)
return exe
[docs] def spawn_shell(self, context_file, tmpdir, rcfile=None, norc=False,
stdin=False, command=None, env=None, quiet=False,
pre_command=None, add_rez=True,
package_commands_sourced_first=None, **Popen_args):
"""Spawn a possibly interactive subshell.
Args:
context:_file File that must be sourced in the new shell, this
configures the Rez environment.
tmpdir: Tempfiles, if needed, should be created within this path.
rcfile: Custom startup script.
norc: Don't run startup scripts. Overrides rcfile.
stdin: If True, read commands from stdin in a non-interactive shell.
If a different non-False value, such as subprocess.PIPE, the same
occurs, but stdin is also passed to the resulting subprocess.Popen
object.
command: If not None, execute this command in a non-interactive shell.
If an empty string, don't run a command, but don't open an
interactive shell either.
env: Environ dict to execute the shell within; uses the current
environment if None.
quiet: If True, don't show the configuration summary, and suppress
any stdout from startup scripts.
pre_command: Command to inject before the shell command itself. This
is for internal use.
add_rez: If True, assume this shell is being used with rez, and do
things such as set the prompt etc.
package_commands_sourced_first: If True, source the context file before
sourcing startup scripts (such as .bashrc). If False, source
the context file AFTER. If None, use the configured setting.
popen_args: args to pass to the shell process object constructor.
Returns:
A subprocess.Popen object representing the shell process.
"""
raise NotImplementedError
[docs] @classmethod
def convert_tokens(cls, value):
"""
Converts any token like ${VAR} and $VAR to shell specific form.
Uses the ENV_VAR_REGEX to correctly parse tokens.
Args:
value: str to convert
Returns:
str with shell specific variables
"""
return cls.ENV_VAR_REGEX.sub(
lambda m: "".join(cls.get_key_token(g) for g in m.groups() if g),
value
)
[docs] @classmethod
def get_key_token(cls, key):
"""
Encodes the environment variable into the shell specific form.
Shells might implement multiple forms, but the most common/safest
should be returned here.
Args:
key: Variable name to encode
Returns:
str of encoded token form
"""
return cls.get_all_key_tokens(key)[0]
[docs] @classmethod
def get_all_key_tokens(cls, key):
"""
Encodes the environment variable into the shell specific forms.
Shells might implement multiple forms, but the most common/safest
should be always returned at index 0.
Args:
key: Variable name to encode
Returns:
list of str with encoded token forms
"""
raise NotImplementedError
[docs] @classmethod
def line_terminator(cls):
"""
Returns:
str: default line terminator
"""
raise NotImplementedError
[docs] @classmethod
def join(cls, command):
"""
Note: Default to unix sh/bash- friendly behaviour.
Args:
command:
A sequence of program arguments to be joined into a single
string that can be executed in the current shell.
Returns:
A string object representing the command.
"""
replacements = [
# escape ` as \`
('`', "\\`"),
# use double quotes, and put double quotes into single quotes -
# the string $"b is then quoted as "$"'"'"b". This mimics py3.8+
# shlex.join function, except with double instead of single quotes.
#
# We do this because a command like rez-env -- echo 'hey $FOO' needs
# to be interpreted as echo "hey $FOO" in the runtime - ie we would
# expect $FOO to get expanded.
#
('"', '"\'"\'"')
]
return shlex_join(command, replacements=replacements)
[docs]class UnixShell(Shell):
"""
A base class for common *nix shells, such as bash and tcsh.
"""
rcfile_arg = None
norc_arg = None
histfile = None
histvar = None
command_arg = '-c'
stdin_arg = '-s'
last_command_status = '$?'
syspaths = None
#
# startup rules
#
[docs] @classmethod
def supports_norc(cls):
return True
[docs] @classmethod
def supports_command(cls):
return True
[docs] @classmethod
def supports_stdin(cls):
return True
[docs] @classmethod
def get_startup_sequence(cls, rcfile, norc, stdin, command):
"""
Return a dict containing:
- 'stdin': resulting stdin setting.
- 'command': resulting command setting.
- 'do_rcfile': True if a file should be sourced directly.
- 'envvar': Env-var that points at a file to source at startup. Can be None.
- 'files': Existing files that will be sourced (non-user-expanded), in source
order. This may also incorporate rcfile, and file pointed at via envvar.
Can be empty.
- 'bind_files': Files to inject Rez binding into, even if that file doesn't
already exist.
- 'source_bind_files': Whether to source bind files, if they exist.
"""
raise NotImplementedError
[docs] def spawn_shell(self, context_file, tmpdir, rcfile=None, norc=False,
stdin=False, command=None, env=None, quiet=False,
pre_command=None, add_rez=True,
package_commands_sourced_first=None, **Popen_args):
d = self.get_startup_sequence(rcfile, norc, bool(stdin), command)
envvar = d["envvar"]
files = d["files"]
bind_files = d["bind_files"]
do_rcfile = d["do_rcfile"]
shell_command = None
if package_commands_sourced_first is None:
package_commands_sourced_first = config.package_commands_sourced_first
def _record_shell(ex, files, bind_rez=True, print_msg=False):
if bind_rez and package_commands_sourced_first:
ex.source(context_file)
for file_ in files:
if os.path.exists(os.path.expanduser(file_)):
ex.source(file_)
if bind_rez and not package_commands_sourced_first:
ex.source(context_file)
if envvar:
ex.unsetenv(envvar)
if add_rez and bind_rez:
ex.interpreter._bind_interactive_rez()
if print_msg and add_rez and not quiet:
ex.info('')
ex.info('You are now in a rez-configured environment.')
ex.info('')
if system.is_production_rez_install:
ex.command('rezolve context')
def _write_shell(ex, filename):
code = ex.get_output()
target_file = os.path.join(tmpdir, filename)
with open(target_file, 'w') as f:
f.write(code)
return target_file
def _create_ex():
return RexExecutor(interpreter=self.new_shell(),
parent_environ={},
add_default_namespaces=False)
executor = _create_ex()
if self.settings.prompt:
newprompt = '${REZ_ENV_PROMPT}%s' % self.settings.prompt
executor.interpreter._saferefenv('REZ_ENV_PROMPT')
executor.env.REZ_ENV_PROMPT = newprompt
if d["command"] is not None:
_record_shell(executor, files=files)
shell_command = d["command"]
else:
if d["stdin"]:
assert(self.stdin_arg)
shell_command = "%s %s" % (self.executable, self.stdin_arg)
quiet = True
elif do_rcfile:
assert(self.rcfile_arg)
shell_command = "%s %s" % (self.executable, self.rcfile_arg)
else:
shell_command = self.executable
if do_rcfile:
# hijack rcfile to insert our own script
ex = _create_ex()
_record_shell(ex, files=files, print_msg=(not quiet))
filename = "rcfile.%s" % self.file_extension()
filepath = _write_shell(ex, filename)
shell_command += " %s" % filepath
elif envvar:
# hijack env-var to insert our own script
ex = _create_ex()
_record_shell(ex, files=files, print_msg=(not quiet))
filename = "%s.%s" % (envvar, self.file_extension())
filepath = _write_shell(ex, filename)
executor.setenv(envvar, filepath)
else:
# hijack $HOME to insert our own script
files = [x for x in files if x not in bind_files] + list(bind_files)
if files:
for file_ in files:
if file_ in bind_files:
bind_rez = True
files_ = [file_] if d["source_bind_files"] else []
else:
bind_rez = False
files_ = [file_]
ex = _create_ex()
ex.setenv('HOME', os.environ.get('HOME', ''))
_record_shell(ex, files=files_, bind_rez=bind_rez,
print_msg=bind_rez)
_write_shell(ex, os.path.basename(file_))
executor.setenv("HOME", tmpdir)
# keep history
if self.histfile and self.histvar:
histfile = os.path.expanduser(self.histfile)
if os.path.exists(histfile):
executor.setenv(self.histvar, histfile)
else:
if config.warn("shell_startup"):
print_warning(
"WARNING: Could not configure environment from "
"within the target shell (%s); this has been done "
"in the parent process instead." % self.name())
executor.source(context_file)
if shell_command: # an empty string means 'run no command and exit'
executor.command(shell_command)
executor.command("exit %s" % self.last_command_status)
code = executor.get_output()
target_file = os.path.join(tmpdir, "rez-shell.%s" % self.file_extension())
with open(target_file, 'w') as f:
f.write(code)
if d["stdin"] and stdin and (stdin is not True):
Popen_args["stdin"] = stdin
cmd = []
if pre_command:
if isinstance(pre_command, basestring):
cmd = pre_command.strip().split()
else:
cmd = pre_command
cmd.extend([self.executable, target_file])
try:
p = Popen(cmd, env=env, **Popen_args)
except Exception as e:
cmd_str = ' '.join(map(pipes.quote, cmd))
raise RezSystemError("Error running command:\n%s\n%s"
% (cmd_str, str(e)))
return p
[docs] def resetenv(self, key, value, friends=None):
self._addline(self.setenv(key, value))
[docs] def info(self, value):
for line in value.split('\n'):
line = self.escape_string(line)
self._addline('echo %s' % line)
[docs] def error(self, value):
for line in value.split('\n'):
line = self.escape_string(line)
self._addline('echo %s 1>&2' % line)
# escaping is allowed in args, but not in program string
[docs] def command(self, value):
if is_non_string_iterable(value):
it = iter(value)
cmd = EscapedString.disallow(next(it))
args_str = ' '.join(self.escape_string(x) for x in it)
value = "%s %s" % (cmd, args_str)
else:
value = EscapedString.disallow(value)
self._addline(value)
[docs] def shebang(self):
self._addline("#!%s" % self.executable)
[docs] @classmethod
def get_all_key_tokens(cls, key):
return ["${%s}" % key, "$%s" % key]
[docs] @classmethod
def line_terminator(cls):
return "\n"