rabbitmq module¶
This module contains utilities for processing RabbitMQ messages.
Classes¶
- class swpt_pythonlib.rabbitmq.Publisher(app: Optional[Flask] = None, *, url_config_key: str = 'SIGNALBUS_RABBITMQ_URL')¶
A RabbitMQ message publisher
Each instance maintains a separate RabbitMQ connection in every thread. If a connection has not been used for longer than the heartbeat interval set for the connection, it will be closed automatically. A new connection will be opened when needed.
- Parameters
app – Optional Flask app object. If not provided init_app must be called later, providing the Flask app object.
url_config_key – Optional configuration key for the RabbitMQ connection URL
Example:
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    from swpt_pythonlib import rabbitmq

    app = Flask(__name__)

    headers = {'header1': 'value1', 'header2': 'value2'}
    properties = rabbitmq.MessageProperties(
        delivery_mode=2,  # This makes the message persistent!
        app_id='example-publisher',
        content_type='application/json',
        headers=headers,
    )
    m1 = rabbitmq.Message(exchange='', routing_key='test', body='Message 1', properties=properties)
    m2 = rabbitmq.Message(exchange='', routing_key='test', body='Message 2', properties=properties, mandatory=True)

    mq = rabbitmq.Publisher(app)
    mq.publish_messages([m1, m2])
- init_app(app: Flask) None ¶
Bind the instance to a Flask app object.
- Parameters
app – A Flask app object
- publish_messages(messages: Iterable[Message], *, timeout: Optional[int] = None, allow_retry: bool = True) None ¶
Publishes messages, waiting for delivery confirmations.
This method will block until a confirmation from the RabbitMQ broker has been received for each of the messages.
- Parameters
messages – The messages to publish
timeout – Optional timeout in seconds
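Because publish_messages blocks until every message passed in the call has been confirmed, a long stream of messages can be sent in bounded chunks. The following is a minimal sketch of that idea, not part of swpt_pythonlib: publish_in_batches and batch_size are hypothetical names, and publisher is assumed to be any object that exposes the publish_messages(messages, timeout=...) method documented above.

```python
from itertools import islice
from typing import Any, Iterable, Optional


def publish_in_batches(
    publisher: Any,
    messages: Iterable[Any],
    *,
    batch_size: int = 100,
    timeout: Optional[int] = None,
) -> int:
    """Publish `messages` in chunks of at most `batch_size` (hypothetical helper).

    Returns the number of messages handed to the publisher.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    iterator = iter(messages)
    published = 0
    while True:
        # Take the next chunk without materializing the whole iterable.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return published
        # Each call blocks until the broker has confirmed the whole chunk.
        publisher.publish_messages(batch, timeout=timeout)
        published += len(batch)
```

With a real publisher this would be called as publish_in_batches(mq, messages, batch_size=500, timeout=30). If a call raises, the chunks published before it have already been confirmed.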
- class swpt_pythonlib.rabbitmq.Message(exchange: str, routing_key: str, body: bytes, properties: Optional[MessageProperties] = None, mandatory: bool = False)¶
A typing.NamedTuple representing a RabbitMQ message to be sent
- Parameters
exchange – RabbitMQ exchange name
routing_key – RabbitMQ routing key
body – The message’s body
properties – Message properties (see pika.BasicProperties)
mandatory – If True, requires the message to be added to at least one queue.
- properties: Optional[MessageProperties]¶
Alias for field number 3
- class swpt_pythonlib.rabbitmq.MessageProperties(content_type=None, content_encoding=None, headers=None, delivery_mode=None, priority=None, correlation_id=None, reply_to=None, expiration=None, message_id=None, timestamp=None, type=None, user_id=None, app_id=None, cluster_id=None)¶
Basic message properties
This is an alias for pika.BasicProperties.
- class swpt_pythonlib.rabbitmq.Consumer(app: Optional[Flask] = None, *, config_prefix: str = 'SIGNALBUS_RABBITMQ', url: Optional[str] = None, queue: Optional[str] = None, threads: Optional[int] = None, prefetch_size: Optional[int] = None, prefetch_count: Optional[int] = None)¶
A RabbitMQ message consumer
- Parameters
app – Optional Flask app object. If not provided init_app must be called later, providing the Flask app object.
config_prefix – A prefix for the Flask configuration settings for this consumer instance.
url – RabbitMQ’s connection URL. If not passed, the value of the {config_prefix}_URL Flask configuration setting will be used.
queue – The name of the RabbitMQ queue to consume from. If not passed, the value of the {config_prefix}_QUEUE Flask configuration setting will be used.
threads – The number of worker threads in the pool. If not passed, the value of the {config_prefix}_THREADS Flask configuration setting will be used (the default is 1).
prefetch_size – Specifies the prefetch window size. RabbitMQ will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). If not passed, the value of the {config_prefix}_PREFETCH_SIZE Flask configuration setting will be used (the default is 0, meaning “no specific limit”).
prefetch_count – Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch_size field. A message will only be sent in advance if both prefetch windows allow it. Setting a bigger value may give a performance improvement. If not passed, the value of the {config_prefix}_PREFETCH_COUNT Flask configuration setting will be used (the default is 1).
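The fallback rule shared by these parameters (explicit argument first, then the {config_prefix}_... setting, then the documented default) can be illustrated with a small sketch. It is not the library's implementation: resolve_setting is a hypothetical helper, the configuration values are made up, and treating None as "not passed" is an assumption.

```python
from typing import Any, Mapping, Optional


def resolve_setting(
    config: Mapping[str, Any],
    config_prefix: str,
    name: str,
    explicit: Optional[Any] = None,
    default: Optional[Any] = None,
) -> Any:
    """Return `explicit` if given, else `{config_prefix}_{NAME}`, else `default`."""
    if explicit is not None:
        return explicit
    return config.get(f"{config_prefix}_{name.upper()}", default)


# Hypothetical Flask-style configuration for a consumer that uses the
# default prefix, 'SIGNALBUS_RABBITMQ'.
config = {
    "SIGNALBUS_RABBITMQ_URL": "amqp://guest:guest@localhost:5672/%2F",
    "SIGNALBUS_RABBITMQ_QUEUE": "test",
    "SIGNALBUS_RABBITMQ_PREFETCH_COUNT": 10,
}

# Not configured, so the documented default of 1 applies.
threads = resolve_setting(config, "SIGNALBUS_RABBITMQ", "threads", default=1)
# Configured, so the configuration value wins over the default.
prefetch_count = resolve_setting(config, "SIGNALBUS_RABBITMQ", "prefetch_count", default=1)
# An explicitly passed value wins over the configuration.
queue = resolve_setting(config, "SIGNALBUS_RABBITMQ", "queue", explicit="other")
```

Using a different config_prefix for each consumer instance lets several consumers in the same Flask app read independent sets of settings.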
The received messages will be processed by a pool of worker threads, created by the consumer instance, after the start method is called. Each consumer instance maintains a separate RabbitMQ connection. If the connection has been closed for some reason, the start method will throw an exception. To continue consuming, the start method can be called again.
This class is meant to be subclassed. For example:
    from swpt_pythonlib import rabbitmq

    class ExampleConsumer(rabbitmq.Consumer):
        def process_message(self, body, properties):
            if len(body) == 0:
                return False  # Malformed (empty) message

            # Process the message here.

            return True  # Successfully processed
- init_app(app: Flask)¶
Bind the instance to a Flask app object.
- Parameters
app – A Flask app object
- process_message(body: bytes, properties: MessageProperties) bool ¶
This method must be implemented by the sub-classes.
- Parameters
body – message body
properties – message properties
The method should return True if the message has been successfully processed and can be removed from the queue. Returning False means that the message is malformed and cannot be processed. (Usually, malformed messages will be sent to a “dead letter queue”.)
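As an illustration of the True/False contract, the sketch below treats a body as malformed when it is not a UTF-8 encoded JSON object. The JSON format and the helper name parse_json_object are assumptions made for this example. The function is written standalone so it can be tried without a broker; in a Consumer subclass the same logic would live in the process_message method.

```python
import json
from typing import Any, Optional


def parse_json_object(body: bytes) -> Optional[dict]:
    """Decode `body` as a UTF-8 JSON object, or return None if it is malformed."""
    try:
        data = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    # Valid JSON that is not an object (a list, a number, ...) is rejected too.
    return data if isinstance(data, dict) else None


def process_message(body: bytes, properties: Any) -> bool:
    """Follow the documented contract: False for malformed, True when processed."""
    data = parse_json_object(body)
    if data is None:
        return False  # Malformed message; it can not be processed.

    # Application-specific processing of `data` would go here.

    return True  # Successfully processed; the message can leave the queue.
```

Errors that are not caused by a malformed message (for example, a database outage) are deliberately not mapped to False here, because False is reserved for messages that can never be processed.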
- start() None ¶
Opens a RabbitMQ connection and processes messages until one of the following happens:
The connection has been lost.
An error has occurred during message processing.
The stop method has been called on the consumer instance.
This method blocks and never returns normally. If one of the events listed above happens, a TerminatedConsumtion exception will be raised. This method may also raise pika.exceptions.AMQPError when, for some reason, a proper RabbitMQ connection cannot be established.
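Since start can be called again after it raises, a supervising loop is one way to keep a consumer running. The following is a rough sketch under stated assumptions, not a recommended implementation: run_with_restarts, retry_on, max_restarts, delay, and should_stop are all hypothetical names, and the caller is assumed to pass the exception types it wants to retry on (for instance the exceptions named above) and to report through should_stop whether stop was requested deliberately.

```python
import time
from typing import Any, Callable, Tuple, Type


def run_with_restarts(
    consumer: Any,
    retry_on: Tuple[Type[BaseException], ...],
    *,
    max_restarts: int = 3,
    delay: float = 1.0,
    should_stop: Callable[[], bool] = lambda: False,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Call consumer.start(), restarting it after the failures in `retry_on`."""
    restarts = 0
    while True:
        try:
            consumer.start()  # Blocks while messages are being consumed.
            return  # Only reached if start() ever returns normally.
        except retry_on:
            # Do not restart after a deliberate stop or too many failures.
            if should_stop() or restarts >= max_restarts:
                raise
            restarts += 1
            sleep(delay)
```

The should_stop check matters because the same exception is documented for both a lost connection and a call to stop; without it, the loop would restart a consumer that was asked to shut down.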
- stop(signum: Any = None, frame: Any = None) None ¶
Orders the consumer to stop.
This is useful for properly handling process termination. For example:
    import signal

    consumer = Consumer(...)  # creates the instance

    signal.signal(signal.SIGINT, consumer.stop)
    signal.signal(signal.SIGTERM, consumer.stop)
    consumer.start()
Exceptions¶
- class swpt_pythonlib.rabbitmq.ConnectionError¶
Bases: DeliveryError
Cannot connect to the server.
- class swpt_pythonlib.rabbitmq.TimeoutError¶
Bases: DeliveryError
The attempt to deliver messages has timed out.