Source code for rez.tests.test_shells

# SPDX-License-Identifier: Apache-2.0
# Copyright Contributors to the Rez Project


"""
test shell invocation
"""
from __future__ import print_function

from rez.system import system
from rez.shells import create_shell, get_shell_types, get_shell_class
from rez.resolved_context import ResolvedContext
from rez.rex import literal, expandable
from rez.plugin_managers import plugin_manager
from rez.utils.execution import ExecutableScriptMode, _get_python_script_files
from rez.tests.util import TestBase, TempdirMixin, per_available_shell, \
    install_dependent
from rez.bind import hello_world
from rez.config import config
import unittest
import subprocess
import tempfile
import inspect
import textwrap
import os


def _stdout(proc):
    out_, _ = proc.communicate()
    if proc.returncode:
        raise RuntimeError(
            "The subprocess failed with exitcode %d" % proc.returncode
        )
    return out_.strip()


[docs]class TestShells(TestBase, TempdirMixin):
[docs] @classmethod def setUpClass(cls): TempdirMixin.setUpClass() packages_path = os.path.join(cls.root, "packages") os.makedirs(packages_path) hello_world.bind(packages_path) cls.settings = dict( packages_path=[packages_path], package_filter=None, implicit_packages=[], warn_untimestamped=False)
[docs] @classmethod def tearDownClass(cls): TempdirMixin.tearDownClass()
@classmethod def _create_context(cls, pkgs): return ResolvedContext(pkgs, caching=False)
[docs] def test_aaa_shell_presence(self): """Ensure specific shell types are present as loaded plugins. The env var _REZ_ENSURE_TEST_SHELLS should be set by a CI system (such as github actions) to make sure the shells we expect to be installed, are installed, and are getting tested. Note 'aaa' forces unittest to run this test first. """ shells = os.getenv("_REZ_ENSURE_TEST_SHELLS", "").split(',') shells = set(x for x in shells if x) if not shells: self.skipTest("Not ensuring presence of shells from explicit list") return # check for missing shells missing_shells = shells - set(get_shell_types()) if missing_shells: raise RuntimeError( "The following shells should be available for testing but are " "not present: %r" % list(missing_shells) ) # check for unavailable shells for shell in shells: if not get_shell_class(shell).is_available(): raise RuntimeError( "The shell %r is not available (executable not found)" % shell ) # check for shell plugins that failed to load for (name, reason) in plugin_manager.get_failed_plugins("shell"): if name in shells: raise RuntimeError( "The shell plugin %r failed to load: %s" % (name, reason) )
[docs] @per_available_shell() def test_no_output(self, shell): sh = create_shell(shell) _, _, _, command = sh.startup_capabilities(command=True) if command: r = self._create_context(["hello_world"]) p = r.execute_shell(command="hello_world -q", stdout=subprocess.PIPE, text=True) self.assertEqual( _stdout(p), '', "This test and others will fail, because one or more of your " "startup scripts are printing to stdout. Please remove the " "printout and try again.")
[docs] def test_create_executable_script(self): script_file = os.path.join(self.root, "script") py_script_file = os.path.join(self.root, "script.py") for platform in ['windows', 'linux']: files = _get_python_script_files(script_file, ExecutableScriptMode.py, platform) self.assertListEqual(files, [py_script_file]) files = _get_python_script_files(py_script_file, ExecutableScriptMode.py, platform) self.assertListEqual(files, [py_script_file]) files = _get_python_script_files(script_file, ExecutableScriptMode.single, platform) self.assertListEqual(files, [script_file]) files = _get_python_script_files(py_script_file, ExecutableScriptMode.single, platform) self.assertListEqual(files, [py_script_file]) files = _get_python_script_files(script_file, ExecutableScriptMode.both, platform) self.assertListEqual(files, [script_file, py_script_file]) files = _get_python_script_files(py_script_file, ExecutableScriptMode.both, platform) self.assertListEqual(files, [py_script_file]) files = _get_python_script_files(script_file, ExecutableScriptMode.platform_specific, platform) if platform == "windows": self.assertListEqual(files, [py_script_file]) else: self.assertListEqual(files, [script_file]) files = _get_python_script_files(py_script_file, ExecutableScriptMode.platform_specific, platform) self.assertListEqual(files, [py_script_file])
[docs] @per_available_shell() def test_command(self, shell): sh = create_shell(shell) _, _, _, command = sh.startup_capabilities(command=True) if command: r = self._create_context(["hello_world"]) p = r.execute_shell(command="hello_world", stdout=subprocess.PIPE, text=True) self.assertEqual(_stdout(p), "Hello Rez World!")
[docs] @per_available_shell() def test_command_returncode(self, shell): sh = create_shell(shell) _, _, _, command = sh.startup_capabilities(command=True) if command: r = self._create_context(["hello_world"]) command = "hello_world -q -r 66" commands = (command, command.split()) for cmd in commands: with r.execute_shell(command=cmd, stdout=subprocess.PIPE) as p: p.wait() self.assertEqual(p.returncode, 66)
[docs] @per_available_shell() def test_norc(self, shell): sh = create_shell(shell) _, norc, _, command = sh.startup_capabilities(norc=True, command=True) if norc and command: r = self._create_context(["hello_world"]) p = r.execute_shell(norc=True, command="hello_world", stdout=subprocess.PIPE, text=True) self.assertEqual(_stdout(p), "Hello Rez World!")
[docs] @per_available_shell() def test_stdin(self, shell): sh = create_shell(shell) _, _, stdin, _ = sh.startup_capabilities(stdin=True) if stdin: r = self._create_context(["hello_world"]) p = r.execute_shell(stdout=subprocess.PIPE, stdin=subprocess.PIPE, norc=True) stdout, _ = p.communicate(input="hello_world\n") stdout = stdout.strip() self.assertEqual(stdout, "Hello Rez World!")
[docs] @per_available_shell() def test_rcfile(self, shell): sh = create_shell(shell) rcfile, _, _, command = sh.startup_capabilities(rcfile=True, command=True) if rcfile and command: f, path = tempfile.mkstemp() os.write(f, "hello_world\n") os.close(f) r = self._create_context(["hello_world"]) p = r.execute_shell(rcfile=path, command="hello_world -q", stdout=subprocess.PIPE, text=True) self.assertEqual(_stdout(p), "Hello Rez World!") os.remove(path)
# TODO fix cmd shell command string escape # as per https://github.com/nerdvegas/rez/pull/1130, then remove this # exclusion #
[docs] @per_available_shell(exclude=["cmd"]) @install_dependent() def test_rez_env_output(self, shell): def _test(txt): # Assumes that the shell has an echo command, built-in or alias binpath = os.path.join(system.rez_bin_path, "rez-env") args = [binpath, "--shell", shell, "--", "echo", txt] process = subprocess.Popen( args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) sh_out = process.communicate() # because powershell may not exit with !0 on error, depending on # how it's been configured # if sh_out[1]: raise RuntimeError("Command %r failed:\n%s" % (txt, sh_out[1])) self.assertEqual(sh_out[0].strip(), txt) # please note - it's no coincidence that there are no substrings like # '$you' here. These would expand to the equivalent env-var (as # intended), which would be an empty string. We're not testing that # here though. # _test("hey") # simple case _test("hey you") # with a space _test("<hey>") # special characters _test("!hey>$") # more special characters _test("'hey'") # single quotes _test('"hey"') # double quotes _test("hey `") # backtick _test("hey $ ?yeah> 'you'..^!") # throw lots of stuff at it
[docs] @per_available_shell() @install_dependent() def test_rez_command(self, shell): sh = create_shell(shell) _, _, _, command = sh.startup_capabilities(command=True) if command: r = self._create_context([]) with r.execute_shell(command="rezolve -h") as p: p.wait() self.assertEqual(p.returncode, 0) with r.execute_shell(command="rez-env -h") as p: p.wait() self.assertEqual(p.returncode, 0)
[docs] @per_available_shell() def test_rex_code(self, shell): """Test that Rex code run in the shell creates the environment variable values that we expect. """ config.override("default_shell", shell) def _execute_code(func, expected_output): loc = inspect.getsourcelines(func)[0][1:] code = textwrap.dedent('\n'.join(loc)) r = self._create_context([]) p = r.execute_rex_code(code, stdout=subprocess.PIPE, text=True) out, _ = p.communicate() self.assertEqual(p.returncode, 0) output = out.strip().split("\n") self.assertEqual(output, expected_output) def _rex_assigning(): from rez.shells import create_shell sh = create_shell() def _print(value): env.FOO = value # Wrap the output in quotes to prevent the shell from # interpreting parts of our output as commands. This can happen # when we include special characters (&, <, >, ^) in a # variable. info('"${FOO}"') env.GREET = "hi" env.WHO = "Gary" _print("ello") _print(literal("ello")) _print(expandable("ello")) info('') _print("\\") _print("\\'") _print("\\\"") _print(literal("\\")) _print(literal("\\'")) _print(literal("\\\"")) _print("\\path1\\path2\\path3") _print(literal("\\path1").e("\\path2\\path3")) _print("hello world") _print("hello 'world'") _print('hello "world"') _print(literal("hello world")) _print(literal("hello 'world'")) _print(literal('hello "world"')) # Generic form of variables _print("hey $WHO") _print("hey ${WHO}") _print(expandable("${GREET} ").e("$WHO")) _print(expandable("${GREET} ").l("$WHO")) _print(literal("${WHO}")) _print(literal("${WHO}").e(" $WHO")) # Make sure we are escaping &, <, >, ^ properly. _print('hey & world') _print('hey > world') _print('hey < world') _print('hey ^ world') # Platform dependent form of variables. for token in sh.get_all_key_tokens("WHO"): _print("hey " + token) _print(expandable("${GREET} ").e(token)) _print(expandable("${GREET} ").l(token)) _print(literal(token)) _print(literal(token).e(" " + token)) expected_output = [ "ello", "ello", "ello", "", "\\", "\\'", "\\\"", "\\", "\\'", "\\\"", "\\path1\\path2\\path3", "\\path1\\path2\\path3", "hello world", "hello 'world'", 'hello "world"', "hello world", "hello 'world'", 'hello "world"', "hey Gary", "hey Gary", "hi Gary", "hi $WHO", "${WHO}", "${WHO} Gary", "hey & world", "hey > world", "hey < world", "hey ^ world" ] # Assertions for other environment variable types from rez.shells import create_shell sh = create_shell() for token in sh.get_all_key_tokens("WHO"): expected_output += [ "hey Gary", "hi Gary", "hi " + token, token, token + " Gary", ] # We are wrapping all variable outputs in quotes in order to make sure # our shell isn't interpreting our output as instructions when echoing # it but this means we need to wrap our expected output as well. Only # exception is empty string, which is just passed through. expected_output = ['"{}"'.format(o) if o else o for o in expected_output] _execute_code(_rex_assigning, expected_output) def _rex_appending(): from rez.shells import create_shell sh = create_shell() env.FOO.append("hey") info(sh.get_key_token("FOO")) env.FOO.append(literal("$DAVE")) info(sh.get_key_token("FOO")) env.FOO.append("Dave's not here man") info(sh.get_key_token("FOO")) expected_output = [ "hey", sh.pathsep.join(["hey", "$DAVE"]), sh.pathsep.join(["hey", "$DAVE", "Dave's not here man"]) ] _execute_code(_rex_appending, expected_output)
[docs] @per_available_shell() def test_rex_code_alias(self, shell): """Ensure PATH changes do not influence the alias command. This is important for Windows because the doskey.exe might not be on the PATH anymore at the time it's executed. That's why we figure out the absolute path to doskey.exe before we modify PATH and continue to use the absolute path after the modifications. """ config.override("default_shell", shell) def _execute_code(func): loc = inspect.getsourcelines(func)[0][1:] code = textwrap.dedent('\n'.join(loc)) r = self._create_context([]) p = r.execute_rex_code(code, stdout=subprocess.PIPE) out, _ = p.communicate() self.assertEqual(p.returncode, 0) def _alias_after_path_manipulation(): # Appending something to the PATH and creating an alias afterwards # did fail before we implemented a doskey specific fix. env.PATH.append("hey") alias("alias_test", "echo test_echo") # We don't expect any output, the shell should just return with exit # code 0. _execute_code(_alias_after_path_manipulation)
[docs] @per_available_shell() def test_alias_command(self, shell): """Testing alias can be passed in as command This is important for Windows CMD shell because the doskey.exe isn't executed yet when the alias is being passed. """ config.override("default_shell", shell) def _make_alias(ex): ex.alias('hi', 'echo "hi"') r = self._create_context([]) p = r.execute_shell(command='hi', actions_callback=_make_alias, stdout=subprocess.PIPE) out, _ = p.communicate() self.assertEqual(0, p.returncode)
[docs] @per_available_shell() def test_alias_command_with_args(self, shell): """Testing alias can be passed in as command with args This is important for Windows CMD shell because the doskey.exe isn't executed yet when the alias is being passed. """ config.override("default_shell", shell) def _make_alias(ex): ex.alias('tell', 'echo') r = self._create_context([]) p = r.execute_shell(command='tell "hello"', actions_callback=_make_alias, stdout=subprocess.PIPE) out, _ = p.communicate() self.assertEqual(0, p.returncode)
if __name__ == '__main__': unittest.main()