# SPDX-License-Identifier: Apache-2.0
# Copyright Contributors to the Rez Project
from __future__ import print_function
from rez.packages import iter_packages
from rez.exceptions import BuildProcessError, BuildContextResolveError, \
ReleaseHookCancellingError, RezError, ReleaseError, BuildError, \
ReleaseVCSError, _NeverError
from rez.utils.logging_ import print_warning
from rez.utils.colorize import heading, Printer
from rez.resolved_context import ResolvedContext
from rez.release_hook import create_release_hooks
from rez.resolver import ResolverStatus
from rez.config import config
from rez.vendor.enum import Enum
from contextlib import contextmanager
from pipes import quote
import getpass
import os.path
import sys
debug_print = config.debug_printer("package_release")
[docs]def get_build_process_types():
"""Returns the available build process implementations."""
from rez.plugin_managers import plugin_manager
return plugin_manager.get_plugins('build_process')
[docs]def create_build_process(process_type, working_dir, build_system, package=None,
vcs=None, ensure_latest=True, skip_repo_errors=False,
ignore_existing_tag=False, verbose=False, quiet=False):
"""Create a `BuildProcess` instance."""
from rez.plugin_managers import plugin_manager
process_types = get_build_process_types()
if process_type not in process_types:
raise BuildProcessError("Unknown build process: %r" % process_type)
cls = plugin_manager.get_plugin_class('build_process', process_type)
return cls(working_dir, # ignored (deprecated)
build_system,
package=package, # ignored (deprecated)
vcs=vcs,
ensure_latest=ensure_latest,
skip_repo_errors=skip_repo_errors,
ignore_existing_tag=ignore_existing_tag,
verbose=verbose,
quiet=quiet)
[docs]class BuildType(Enum):
""" Enum to represent the type of build."""
local = 0
central = 1
[docs]class BuildProcess(object):
"""A BuildProcess builds and possibly releases a package.
A build process iterates over the variants of a package, creates the
correct build environment for each variant, builds that variant using a
build system (or possibly creates a script so the user can do that
independently), and then possibly releases the package with the nominated
VCS. This is an abstract base class, you should use a BuildProcess
subclass.
"""
[docs] @classmethod
def name(cls):
raise NotImplementedError
def __init__(self, working_dir, build_system, package=None, vcs=None,
ensure_latest=True, skip_repo_errors=False,
ignore_existing_tag=False, verbose=False, quiet=False):
"""Create a BuildProcess.
Args:
working_dir (DEPRECATED): Ignored.
build_system (`BuildSystem`): Build system used to build the package.
package (DEPRECATED): Ignored.
vcs (`ReleaseVCS`): Version control system to use for the release
process.
ensure_latest: If True, do not allow the release process to occur
if an newer versioned package is already released.
skip_repo_errors: If True, proceed with the release even when errors
occur. BE CAREFUL using this option, it is here in case a package
needs to be released urgently even though there is some problem
with reading or writing the repository.
ignore_existing_tag: Perform the release even if the repository is
already tagged at the current version. If the config setting
plugins.release_vcs.check_tag is False, this has no effect.
verbose (bool): Verbose mode.
quiet (bool): Quiet mode (overrides `verbose`).
"""
self.verbose = verbose and not quiet
self.quiet = quiet
self.build_system = build_system
self.vcs = vcs
self.ensure_latest = ensure_latest
self.skip_repo_errors = skip_repo_errors
self.ignore_existing_tag = ignore_existing_tag
if vcs and vcs.pkg_root != self.working_dir:
raise BuildProcessError(
"Build process was instantiated with a mismatched VCS instance")
if os.path.isabs(self.package.config.build_directory):
self.build_path = self.package.config.build_directory
else:
self.build_path = os.path.join(self.working_dir,
self.package.config.build_directory)
@property
def package(self):
return self.build_system.package
@property
def working_dir(self):
return self.build_system.working_dir
[docs] def build(self, install_path=None, clean=False, install=False, variants=None):
"""Perform the build process.
Iterates over the package's variants, resolves the environment for
each, and runs the build system within each resolved environment.
Args:
install_path (str): The package repository path to install the
package to, if installing. If None, defaults to
`config.local_packages_path`.
clean (bool): If True, clear any previous build first. Otherwise,
rebuild over the top of a previous build.
install (bool): If True, install the build.
variants (list of int): Indexes of variants to build, all if None.
Raises:
`BuildError`: If the build failed.
Returns:
int: Number of variants successfully built.
"""
raise NotImplementedError
[docs] def release(self, release_message=None, variants=None):
"""Perform the release process.
Iterates over the package's variants, building and installing each into
the release path determined by `config.release_packages_path`.
Args:
release_message (str): Message to associate with the release.
variants (list of int): Indexes of variants to release, all if None.
Raises:
`ReleaseError`: If the release failed.
Returns:
int: Number of variants successfully released.
"""
raise NotImplementedError
[docs] def get_changelog(self):
"""Get the changelog since last package release.
Returns:
str: Changelog.
"""
raise NotImplementedError
[docs]class BuildProcessHelper(BuildProcess):
"""A BuildProcess base class with some useful functionality.
"""
[docs] @contextmanager
def repo_operation(self):
exc_type = ReleaseVCSError if self.skip_repo_errors else _NeverError
try:
yield
except exc_type as e:
print_warning("THE FOLLOWING ERROR WAS SKIPPED:\n%s" % str(e))
[docs] def visit_variants(self, func, variants=None, **kwargs):
"""Iterate over variants and call a function on each."""
if variants:
present_variants = range(self.package.num_variants)
invalid_variants = set(variants) - set(present_variants)
if invalid_variants:
raise BuildError(
"The package does not contain the variants: %s"
% ", ".join(str(x) for x in sorted(invalid_variants)))
# iterate over variants
results = []
num_visited = 0
for variant in self.package.iter_variants():
if variants and variant.index not in variants:
self._print_header(
"Skipping variant %s (%s)..."
% (variant.index, self._n_of_m(variant)))
continue
# visit the variant
result = func(variant, **kwargs)
results.append(result)
num_visited += 1
return num_visited, results
[docs] def get_package_install_path(self, path):
"""Return the installation path for a package (where its payload goes).
Args:
path (str): Package repository path.
"""
from rez.package_repository import package_repository_manager
pkg_repo = package_repository_manager.get_repository(path)
return pkg_repo.get_package_payload_path(
package_name=self.package.name,
package_version=self.package.version
)
[docs] def create_build_context(self, variant, build_type, build_path):
"""Create a context to build the variant within."""
request = variant.get_requires(build_requires=True,
private_build_requires=True)
req_strs = map(str, request)
quoted_req_strs = map(quote, req_strs)
self._print("Resolving build environment: %s", ' '.join(quoted_req_strs))
if build_type == BuildType.local:
packages_path = self.package.config.packages_path
else:
packages_path = self.package.config.nonlocal_packages_path
# It is uncommon, but possible, to define the package filters in the
# developer package. Example scenario: you may want to enable visiblity
# of *.dev packages if the current package is *.dev also, for example
# (assuming you have a production-time package filter which filters out
# *.dev packages by default).
#
if self.package.config.is_overridden("package_filter"):
from rez.package_filter import PackageFilterList
data = self.package.config.package_filter
package_filter = PackageFilterList.from_pod(data)
else:
package_filter = None
# create the build context
context = ResolvedContext(request,
package_paths=packages_path,
package_filter=package_filter,
building=True)
if self.verbose:
context.print_info()
# save context before possible fail, so user can debug
rxt_filepath = os.path.join(build_path, "build.rxt")
context.save(rxt_filepath)
if context.status != ResolverStatus.solved:
raise BuildContextResolveError(context)
return context, rxt_filepath
[docs] def pre_release(self):
release_settings = self.package.config.plugins.release_vcs
# test that the release path exists
release_path = self.package.config.release_packages_path
if not os.path.exists(release_path):
raise ReleaseError("Release path does not exist: %r" % release_path)
# test that the repo is in a state to release
if self.vcs:
self._print("Checking state of repository...")
with self.repo_operation():
self.vcs.validate_repostate()
# check if the repo is already tagged at the current version
if release_settings.check_tag and not self.ignore_existing_tag:
tag_name = self.get_current_tag_name()
tag_exists = False
with self.repo_operation():
tag_exists = self.vcs.tag_exists(tag_name)
if tag_exists:
raise ReleaseError(
"Cannot release - the current package version '%s' is "
"already tagged in the repository. Use --ignore-existing-tag "
"to force the release" % self.package.version)
it = iter_packages(self.package.name, paths=[release_path])
packages = sorted(it, key=lambda x: x.version, reverse=True)
# check UUID. This stops unrelated packages that happen to have the same
# name, being released as though they are the same package
if self.package.uuid and packages:
latest_package = packages[0]
if latest_package.uuid and latest_package.uuid != self.package.uuid:
raise ReleaseError(
"Cannot release - the packages are not the same (UUID mismatch)")
# test that a newer package version hasn't already been released
if self.ensure_latest:
for package in packages:
if package.version > self.package.version:
raise ReleaseError(
"Cannot release - a newer package version already "
"exists (%s)" % package.uri)
else:
break
[docs] def post_release(self, release_message=None):
tag_name = self.get_current_tag_name()
if self.vcs is None:
return # nothing more to do
# write a tag for the new release into the vcs
with self.repo_operation():
self.vcs.create_release_tag(tag_name=tag_name, message=release_message)
[docs] def get_current_tag_name(self):
release_settings = self.package.config.plugins.release_vcs
try:
tag_name = self.package.format(release_settings.tag_name)
except Exception as e:
raise ReleaseError("Error formatting release tag name: %s" % str(e))
if not tag_name:
tag_name = "unversioned"
return tag_name
[docs] def run_hooks(self, hook_event, **kwargs):
hook_names = self.package.config.release_hooks or []
hooks = create_release_hooks(hook_names, self.working_dir)
for hook in hooks:
debug_print("Running %s hook '%s'...",
hook_event.label, hook.name())
try:
func = getattr(hook, hook_event.__name__)
func(user=getpass.getuser(), **kwargs)
except ReleaseHookCancellingError as e:
raise ReleaseError(
"%s cancelled by %s hook '%s': %s:\n%s"
% (hook_event.noun, hook_event.label, hook.name(),
e.__class__.__name__, str(e)))
except RezError:
debug_print("Error in %s hook '%s': %s:\n%s"
% (hook_event.label, hook.name(),
e.__class__.__name__, str(e)))
[docs] def get_previous_release(self):
release_path = self.package.config.release_packages_path
it = iter_packages(self.package.name, paths=[release_path])
packages = sorted(it, key=lambda x: x.version, reverse=True)
for package in packages:
if package.version < self.package.version:
return package
return None
[docs] def get_changelog(self):
previous_package = self.get_previous_release()
if previous_package:
previous_revision = previous_package.revision
else:
previous_revision = None
changelog = None
with self.repo_operation():
changelog = self.vcs.get_changelog(
previous_revision,
max_revisions=config.max_package_changelog_revisions)
return changelog
[docs] def get_release_data(self):
"""Get release data for this release.
Returns:
dict.
"""
previous_package = self.get_previous_release()
if previous_package:
previous_version = previous_package.version
previous_revision = previous_package.revision
else:
previous_version = None
previous_revision = None
if self.vcs is None:
return dict(vcs="None",
previous_version=previous_version)
revision = None
with self.repo_operation():
revision = self.vcs.get_current_revision()
changelog = self.get_changelog()
# truncate changelog - very large changelogs can cause package load
# times to be very high, we don't want that
maxlen = config.max_package_changelog_chars
if maxlen and changelog and len(changelog) > maxlen + 3:
changelog = changelog[:maxlen] + "..."
return dict(vcs=self.vcs.name(),
revision=revision,
changelog=changelog,
previous_version=previous_version,
previous_revision=previous_revision)
def _print(self, txt, *nargs):
if self.verbose:
if nargs:
txt = txt % nargs
print(txt)
def _print_header(self, txt, n=1):
if self.quiet:
return
self._print('')
if n <= 1:
br = '=' * 80
title = "%s\n%s\n%s" % (br, txt, br)
else:
title = "%s\n%s" % (txt, '-' * len(txt))
pr = Printer(sys.stdout)
pr(title, heading)
def _n_of_m(self, variant):
num_variants = max(self.package.num_variants, 1)
index = (variant.index or 0) + 1
return "%d/%d" % (index, num_variants)