import hmac import hashlib import base64 import pika import os import logging import time from datetime import datetime, timezone USER = os.getenv("RABBITMQ_USER", "rabbitmquser") ACCESS_KEY = os.getenv("RABBITMQ_ACCESS_KEY", "abc") SECRET_KEY = os.getenv("RABBITMQ_SECRET_KEY", "def") VIRTUAL_HOST = os.getenv("RABBITMQ_VIRTUAL_HOST", "public") QUEUE_NAMES = os.getenv("RABBITMQ_QUEUE_NAMES", "WHATEVER-[TenantName]-Listener|Whatever2-[TenantName]-Listener") HOST = os.getenv("RABBITMQ_HOST", "publicapi.nt.cloud.bewotec.de") PORT = int(os.getenv("RABBITMQ_PORT", "5672")) logging.basicConfig(level=logging.INFO) def get_authorization(): """Generate HMAC-based authorization header.""" utc_date = datetime.now(timezone.utc).isoformat() iso_date = utc_date[:10].replace("-", "") + utc_date[11:19].replace(":", "") assemble = iso_date + HOST.upper() + VIRTUAL_HOST + "?q=" + QUEUE_NAMES hmac_digest = hmac.new(SECRET_KEY.encode(), assemble.encode(), hashlib.sha256).digest() signature_in_base64 = base64.b64encode(hmac_digest).decode() authorization = f"DirectGrant {USER} {ACCESS_KEY} {iso_date} {signature_in_base64}" return authorization def callback(ch, method, properties, body): logging.info(f"Received message: {body.decode()}") def main(): max_retries = 1 retry_delay = 2 # seconds attempt = 0 while attempt <= max_retries: connection = None try: # Each time we need to generate a new authorization authorization = get_authorization() base64_encoded_password = base64.b64encode(authorization.encode()) queue_names = QUEUE_NAMES.split("|") username = USER + "?q=" + QUEUE_NAMES logging.info(f"Using password (base64 auth): {base64_encoded_password}") logging.info(f"Connecting to RabbitMQ at {HOST} with user {username} on vhost {VIRTUAL_HOST}") logging.info(f"Subscribing to queues: {queue_names}") credentials = pika.PlainCredentials(username, base64_encoded_password) parameters = pika.ConnectionParameters( host=HOST, port=PORT, virtual_host=VIRTUAL_HOST, credentials=credentials, client_properties={ 'connection_name': 'PythonConsumer', } ) connection = pika.BlockingConnection(parameters) channel = connection.channel() # Declaring a queue is not allowed here because the user has no permission to do so. # channel.queue_declare(queue=QUEUE_NAME, durable=True, arguments={"x-queue-type": "quorum"}) # It is possible to consume messages from multiple queues by single channel, but if one queue is forbidden, connection will be closed. for queue in queue_names: channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) logging.info(f'Waiting for messages on queues: {QUEUE_NAMES}. To exit press CTRL+C') channel.start_consuming() break # Exit loop if successful except Exception as e: logging.error(f"Error: {e}") # Check for CONNECTION_FORCED error and retry attempt += 1 logging.info(f"Retrying connection in {retry_delay} seconds (attempt {attempt}/{max_retries})...") time.sleep(retry_delay) retry_delay *= 2 # Exponential backoff finally: if connection: try: connection.close() except Exception: pass else: logging.error("Max retries reached. Exiting.") if __name__ == "__main__": main()