Source code for rez.utils.amqp

# SPDX-License-Identifier: Apache-2.0
# Copyright Contributors to the Rez Project


import atexit
import socket
import time
import threading
import logging

from rez.utils import json
from rez.utils.logging_ import print_error
from rez.vendor.six.six.moves import queue, urllib
from rez.vendor.pika.adapters.blocking_connection import BlockingConnection
from rez.vendor.pika.connection import ConnectionParameters
from rez.vendor.pika.credentials import PlainCredentials
from rez.vendor.pika.spec import BasicProperties
from rez.config import config


_lock = threading.Lock()
_queue = queue.Queue()
_thread = None
_num_pending = 0


def publish_message(host, amqp_settings, routing_key, data, block=True):
    """Publish an AMQP message, either synchronously or via a worker thread.

    Args:
        host: Broker address, forwarded unchanged to `_publish_message`.
        amqp_settings: AMQP settings, forwarded unchanged.
        routing_key: Routing key, forwarded unchanged.
        data: Message payload, forwarded unchanged.
        block (bool): If True, publish in the calling thread and return the
            result. If False, queue the message for the background
            publisher thread and return immediately.

    Returns:
        bool: When `block` is True, the result of `_publish_message` (True if
        message was sent successfully). When `block` is False, always True -
        the message has only been queued at that point.
    """
    global _thread
    global _num_pending

    message = dict(
        host=host,
        amqp_settings=amqp_settings,
        routing_key=routing_key,
        data=data,
    )

    # Synchronous path: publish right here and hand back the outcome.
    if block:
        return _publish_message(**message)

    # Asynchronous path: start the single worker thread on first use. The
    # second check, made while holding the lock, stops two callers from both
    # starting a worker.
    if _thread is None:
        with _lock:
            if _thread is None:
                _thread = threading.Thread(target=_publish_messages_async)
                _thread.daemon = True
                _thread.start()

    # Count the message as pending before it becomes visible to the worker,
    # which decrements the counter once it has handled the message.
    with _lock:
        _num_pending += 1

    _queue.put(message)
    return True
def _publish_message(host, amqp_settings, routing_key, data):
    """Publish a single AMQP message in the calling thread.

    Args:
        host (str): Broker address, split into host and port by
            `parse_host_and_port`. The special value "stdout" prints the
            message instead of contacting a broker.
        amqp_settings: Mapping of settings. "userid", "password",
            "connect_timeout" and "message_delivery_mode" are read with
            `.get()`; "exchange_name" is read with `[]` inside the publish
            `try` block, so a missing key is reported as a failed publish.
        routing_key: Routing key passed to `basic_publish`.
        data: Payload; serialized with `json.dumps` to form the message body.

    Returns:
        bool: True if message was sent successfully.
    """
    if host == "stdout":
        print("Published to %s: %s" % (routing_key, data))
        return True

    set_pika_log_level()

    conn_kwargs = dict()

    # name the conn like 'rez.publish.{host}'
    conn_kwargs["client_properties"] = {
        "connection_name": "rez.publish.%s" % socket.gethostname()
    }

    host, port = parse_host_and_port(url=host)
    conn_kwargs["host"] = host
    # Only pass a port when one was given, so the connection parameters keep
    # their own default otherwise.
    if port is not None:
        conn_kwargs["port"] = port

    # Credentials are optional; they are only set when a userid is configured.
    if amqp_settings.get("userid"):
        conn_kwargs["credentials"] = PlainCredentials(
            username=amqp_settings.get("userid"),
            password=amqp_settings.get("password")
        )

    params = ConnectionParameters(
        socket_timeout=amqp_settings.get("connect_timeout"),
        **conn_kwargs
    )

    props = BasicProperties(
        content_type="application/json",
        content_encoding="utf-8",
        delivery_mode=amqp_settings.get("message_delivery_mode")
    )

    try:
        conn = BlockingConnection(params)
    except socket.error as e:
        print_error("Cannot connect to the message broker: %s" % e)
        return False

    try:
        channel = conn.channel()

        channel.basic_publish(
            exchange=amqp_settings["exchange_name"],
            routing_key=routing_key,
            body=json.dumps(data),
            properties=props
        )
    except Exception as e:
        print_error("Failed to publish message: %s" % (e))
        return False
    finally:
        # Always release the connection, whether or not the publish worked.
        conn.close()

    return True


def _publish_messages_async():
    """Worker loop: take queued messages and publish them one at a time.

    Runs forever, blocking on `_queue.get()` between messages. Each queue
    item is the keyword-argument dict for `_publish_message`. After every
    message, successful or not, `_num_pending` is decremented under `_lock`.

    An exception escaping `_publish_message` is reported with `print_error`
    and the loop continues. Letting it propagate would end the loop, leaving
    every later queued message unsent and `_num_pending` stuck above zero.
    """
    global _num_pending

    while True:
        kwargs = _queue.get()

        try:
            _publish_message(**kwargs)
        except Exception as e:
            # Report and carry on: one bad message must not stop the loop.
            print_error("Failed to publish message: %s" % e)
        finally:
            with _lock:
                _num_pending -= 1
@atexit.register
def on_exit():
    """Wait briefly at interpreter exit for queued messages to be handled.

    Without this, a short-lived command such as 'rez-env --output ...' could
    exit before a queued message has been published. Polls `_num_pending`
    every 0.1 seconds and gives up after 5 seconds.
    """
    poll_interval = 0.1
    max_wait = 5
    started = time.time()

    while True:
        # Nothing left to wait for.
        if not _num_pending:
            break
        # Waited long enough; let the process exit anyway.
        if (time.time() - started) >= max_wait:
            break
        time.sleep(poll_interval)
def parse_host_and_port(url):
    """Split a broker address into its host and port.

    Accepts either a full URL such as "amqp://host:5672" or a bare
    "host" / "host:port" string.

    Args:
        url (str): Address to parse.

    Returns:
        2-tuple: (host, port) as reported by `urlsplit` - `hostname` and
        `port` of the parsed address. Port is None when the address does not
        specify one.
    """
    parts = urllib.parse.urlsplit(url)

    # A bare "host:port" has no "//", so urlsplit finds no network location.
    # Checking the scheme is not reliable here: depending on the Python
    # version, "localhost:5672" is parsed either as a scheme-less path or as
    # scheme "localhost" with path "5672". Re-parse with a "//" prefix
    # whenever no netloc was found so the host and port are recognised.
    if not parts.netloc:
        parts = urllib.parse.urlsplit("//" + url)

    return parts.hostname, parts.port
def set_pika_log_level():
    """Set the log level of the "rez.vendor.pika" logger.

    The level is DEBUG when `config.debug("context_tracking")` is truthy,
    and WARNING otherwise.
    """
    if config.debug("context_tracking"):
        level = logging.DEBUG
    else:
        level = logging.WARNING

    logging.getLogger("rez.vendor.pika").setLevel(level)