flask_signalbus
module¶
This module adds to Flask-SQLAlchemy the capability to conveniently send messages (signals) over a message bus (RabbitMQ for example).
The processing of each message involves two steps:
One or more messages are recorded in the SQL database (as rows in tables).
The messages are sent over the message bus. This should be explicitly triggered with a method call, or through the Flask Command Line Interface.
Usage¶
To use the Flask application factory pattern with flask_signalbus, you
should subclass the SQLAlchemy
class, adding the
SignalBusMixin
mixin to it. For
example:
from flask_sqlalchemy import SQLAlchemy
from swpt_pythonlib.flask_signalbus import SignalBusMixin
class CustomSQLAlchemy(SignalBusMixin, SQLAlchemy):
pass
db = CustomSQLAlchemy()
Note that SignalBusMixin should always come before
SQLAlchemy
.
Each type of message (signal) that we plan to send over the message bus should have its own database model class defined. For example:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from swpt_pythonlib.flask_signalbus import SignalBusMixin
class CustomSQLAlchemy(SignalBusMixin, SQLAlchemy):
pass
app = Flask(__name__)
db = CustomSQLAlchemy(app)
class MySignal(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
message_text = db.Column(db.Text, nullable=False)
def send_signalbus_message(self):
# Write some code here, that sends
# the message over the message bus!
Here, MySignal
represent one particular type of message that we will be
sending over the message bus. We call this a “signal model”.
A signal model is an otherwise normal database model class (a subclass of
db.Model
), which however has a send_signalbus_message
method
defined.
The
send_signalbus_message
method should be implemented in such a way that when it returns, the message is guaranteed to be successfully sent and stored by the broker. Normally, this means that an acknowledge has been received for the message from the broker.The signal model class may have a
send_signalbus_messages
class method which accepts one positional argument: an iterable of instances of the class. The method should be implemented in such a way that when it returns, all messages for the passed instances are guaranteed to be successfully sent and stored by the broker. Implementing asend_signalbus_messages
class method can greatly improve performance, because message brokers are usually optimized to process messages in batches much more efficiently.The signal model class may have a
signalbus_burst_count
integer attribute defined, which determines how many individual signals can be sent and deleted at once, as a part of one database transaction. This can greatly improve performace in some cases when auto-flushing is disabled, especially when thesend_signalbus_messages
class method is implemented efficiently. If not defined, it defaults to1
.
Transaction Management Utilities¶
As a bonus, flask_signalbus offers some utilities for transaction
management. See
AtomicProceduresMixin
for
details.
Command Line Interface¶
Flask_signalbus will register a group of Flask CLI commands,
starting with the prefix signalbus
. To see all available commands,
use:
$ flask signalbus --help
To send all pending signals, use:
$ flask signalbus flushmany
For the last command, you can specify the exact type of signals which to send.
API Reference¶
Functions¶
Classes¶
- class swpt_pythonlib.flask_signalbus.SignalBus(db: SQLAlchemy)¶
Instances of this class send signal messages that have been recorded in the SQL database, over a message bus. The sending of the recorded messages should be triggered explicitly by a function call.
- flushmany(models: Optional[Iterable[type[flask_sqlalchemy.model.Model]]] = None) int ¶
Send pending signals over the message bus.
If your database (and its SQLAlchemy dialect) supports
FOR UPDATE SKIP LOCKED
, multiple processes will be able to run this method in parallel, without stepping on each others’ toes.- Parameters
models – If passed, flushes only signals of the specified types.
- Returns
The total number of signals that have been sent
- get_signal_models() list[type[flask_sqlalchemy.model.Model]] ¶
Return all signal types in a list.
Mixins¶
- class swpt_pythonlib.flask_signalbus.SignalBusMixin¶
A mixin class that can be used to extend
SQLAlchemy
to send signals.For example:
from flask import Flask from flask_sqlalchemy import SQLAlchemy from swpt_pythonlib.flask_signalbus import SignalBusMixin class CustomSQLAlchemy(SignalBusMixin, SQLAlchemy): pass app = Flask(__name__) db = CustomSQLAlchemy(app) db.signalbus.flush()
- class swpt_pythonlib.flask_signalbus.AtomicProceduresMixin(*args, engine_options=None, **kwargs)¶
A mixin class that adds utility functions to
flask_sqlalchemy.SQLAlchemy
.For example:
from flask_sqlalchemy import SQLAlchemy from swpt_pythonlib.flask_signalbus import AtomicProceduresMixin class CustomSQLAlchemy(AtomicProceduresMixin, SQLAlchemy): pass db = CustomSQLAlchemy() # Now `AtomicProceduresMixin` method are available in `db`.
Note that when subclassing, AtomicProceduresMixin should always come before
flask_sqlalchemy.SQLAlchemy
. AddingAtomicProceduresMixin
has several useful results:AtomicProceduresMixin methods will be available in
db
.If not explicitly configured, the database isolation level will be set to
REPEATABLE_READ
.
- atomic(func)¶
A decorator that wraps a function in an atomic block.
Example:
db = CustomSQLAlchemy() @db.atomic def f(): write_to_db('a message') return 'OK' assert f() == 'OK'
This code defines the function
f
, which is wrapped in an atomic block. Wrapping a function in an atomic block gives several guarantees:The database transaction will be automatically committed if the function returns normally, and automatically rolled back if the function raises unhandled exception.
When the transaction is committed, all objects in
db.session
will be expunged. This means that no lazy loading will be performed on them.If a transaction serialization error occurs during the execution of the function, the function will be re-executed. (It might be re-executed several times.)
Atomic blocks can be nested, but in this case the outermost block takes full control of transaction’s life-cycle, and inner blocks do nothing.
- execute_atomic(func)¶
A decorator that executes a function in an atomic block (see
atomic()
).Example:
db = CustomSQLAlchemy() @db.execute_atomic def result(): write_to_db('a message') return 'OK' assert result == 'OK'
This code defines and executes the function
result
in an atomic block. At the end, the nameresult
holds the value returned from the function.
- retry_on_integrity_error()¶
Re-raise
IntegrityError
as DBSerializationError.This is mainly useful to handle race conditions in atomic blocks. For example, even if prior to a database INSERT we have verified that there is no existing row with the given primary key, we still may get an
IntegrityError
if another transaction inserted a row with this primary key in the meantime. But if we do (within an atomic block):with db.retry_on_integrity_error(): db.session.add(instance)
then if the before-mentioned race condition occurs, DBSerializationError will be raised instead of
IntegrityError
, so that the transaction will be retried (by the atomic block), and the second time our prior-to-INSERT check will correctly detect a primary key collision.Note:
retry_on_integrity_error()
triggers a session flush.
Exceptions¶
- class swpt_pythonlib.flask_signalbus.DBSerializationError¶
The transaction is rolled back due to a race condition.