rabbitmq module

This module contains utilities for processing RabbitMQ messages.

Classes

class swpt_pythonlib.rabbitmq.Publisher(app: Optional[Flask] = None, *, url_config_key: str = 'SIGNALBUS_RABBITMQ_URL')

A RabbitMQ message publisher

Each instance maintains a separate RabbitMQ connection in every thread. If a connection has not been used for longer than the heartbeat interval set for the connection, it will be automatically closed. A new connection will be open when needed.

Parameters
  • app – Optional Flask app object. If not provided init_app must be called later, providing the Flask app object.

  • url_config_key – Optional configuration key for the RabbitMQ’s connection URL

Example:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from swpt_pythonlib import rabbitmq

app = Flask(__name__)

headers = {'header1': 'value1', 'header2': 'value2'}
properties = rabbitmq.MessageProperties(
    delivery_mode=2,  # This makes the message persistent!
    app_id='example-publisher',
    content_type='application/json',
    headers=headers,
)
m1 = rabbitmq.Message(exchange='', routing_key='test',
                      body='Message 1', properties=properties)
m2 = rabbitmq.Message(exchange='', routing_key='test',
                      body='Message 2', properties=properties,
                      mandatory=True)

mq = rabbitmq.Publisher(app)
mq.publish_messages([m1, m2])
init_app(app: Flask) None

Bind the instance to a Flask app object.

Parameters

app – A Flask app object

publish_messages(messages: Iterable[Message], *, timeout: Optional[int] = None, allow_retry: bool = True) None

Publishes messages, waiting for delivery confirmations.

This method will block until a confirmation from the RabbitMQ broker has been received for each of the messages.

Parameters
  • messages – The messages to publish

  • timeout – Optional timeout in seconds

class swpt_pythonlib.rabbitmq.Message(exchange: str, routing_key: str, body: bytes, properties: Optional[MessageProperties] = None, mandatory: bool = False)

A typing.NamedTuple representing a RabbitMQ message to be send

Parameters
  • exchange – RabbitMQ exchange name

  • routing_key – RabbitMQ routing key

  • body – The message’s body

  • properties – Message properties (see pika.BasicProperties)

  • mandatory – If True, requires the message to be added to at least one queue.

body: bytes

Alias for field number 2

exchange: str

Alias for field number 0

mandatory: bool

Alias for field number 4

properties: Optional[MessageProperties]

Alias for field number 3

routing_key: str

Alias for field number 1

class swpt_pythonlib.rabbitmq.MessageProperties(content_type=None, content_encoding=None, headers=None, delivery_mode=None, priority=None, correlation_id=None, reply_to=None, expiration=None, message_id=None, timestamp=None, type=None, user_id=None, app_id=None, cluster_id=None)

Basic message properties

This is an alias for pika.BasicProperties.

class swpt_pythonlib.rabbitmq.Consumer(app: Optional[Flask] = None, *, config_prefix: str = 'SIGNALBUS_RABBITMQ', url: Optional[str] = None, queue: Optional[str] = None, threads: Optional[int] = None, prefetch_size: Optional[int] = None, prefetch_count: Optional[int] = None)

A RabbitMQ message consumer

Parameters
  • app – Optional Flask app object. If not provided init_app must be called later, providing the Flask app object.

  • config_prefix – A prefix for the Flask configuration settings for this consumer instance.

  • url – RabbitMQ’s connection URL. If not passed, the value of the {config_prefix}_URL Flask configuration setting will be used.

  • queue – The name of the RabbitMQ queue to consume from. If not passed, the value of the {config_prefix}_QUEUE Flask configuration setting will be used.

  • threads – The number of worker threads in the pool. If not passed, the value of the {config_prefix}_THREADS Flask configuration setting will be used (the default is 1).

  • prefetch_size – Specifies the prefetch window size. RabbitMQ will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). If not passed, the value of the {config_prefix}_PREFETCH_SIZE Flask configuration setting will be used (the default is 0, meaning “no specific limit”).

  • prefetch_count – Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch_size field. A message will only be sent in advance if both prefetch windows allow it. Setting a bigger value may give a performance improvement. If not passed, the value of the {config_prefix}_PREFETCH_COUNT Flask configuration setting will be used (the default is 1).

The received messages will be processed by a pool of worker threads, created by the consumer instance, after the start method is called. Each consumer instance maintains a separate RabbitMQ connection. If the connection has been closed for some reason, the start method will throw an exception. To continue consuming, the start method can be called again.

This class is meant to be subclassed. For example:

from swpt_pythonlib import rabbitmq

class ExampleConsumer(rabbitmq.Consumer):
    def process_message(self, body, properties):
        if len(body) == 0:
            return False  # Malformed (empty) message

        # Process the  message here.

        return True  # Successfully processed
init_app(app: Flask)

Bind the instance to a Flask app object.

Parameters

app – A Flask app object

process_message(body: bytes, properties: MessageProperties) bool

This method must be implemented by the sub-classes.

Parameters
  • body – message body

  • properties – message properties

The method should return True if the message has been successfully processed, and can be removed from the queue. If False is returned, this means that the message is malformed, and can not be processed. (Usually, malformed messages will be send to a “dead letter queue”.)

start() None

Opens a RabbitMQ connection and starts processing messages until one of the following things happen:

  • The connection has been lost.

  • An error has occurred during message processing.

  • The stop method has been called on the consumer instance.

This method blocks and never returns normally. If one of the previous things happen an TerminatedConsumtion exception will be raised. Also, this method may raise pika.exceptions.AMQPError when, for some reason, a proper RabbitMQ connection can not be established.

stop(signum: Any = None, frame: Any = None) None

Orders the consumer to stop.

This is useful for properly handling process termination. For example:

import signal

consumer = Consumer(...)  # creates the instance

signal.signal(signal.SIGINT, consumer.stop)
signal.signal(signal.SIGTERM, consumer.stop)
consumer.start()

Exceptions

class swpt_pythonlib.rabbitmq.DeliveryError

Bases: Exception

A failed attempt to deliver messages.

class swpt_pythonlib.rabbitmq.ConnectionError

Bases: DeliveryError

Can not connect to the server.

class swpt_pythonlib.rabbitmq.TimeoutError

Bases: DeliveryError

The attempt to deliver messages has timed out.

class swpt_pythonlib.rabbitmq.TerminatedConsumtion

Bases: Exception

The consumption has been terminated.