flask_signalbus module

This module adds to Flask-SQLAlchemy the capability to conveniently send messages (signals) over a message bus (RabbitMQ for example).

The processing of each message involves two steps:

  1. One or more messages are recorded in the SQL database (as rows in tables).

  2. The messages are sent over the message bus. This should be explicitly triggered with a method call, or through the Flask Command Line Interface.
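The two steps above can be illustrated with a minimal, standard-library sketch. It is not how flask_signalbus is implemented: the my_signal table, the record_signal and flush_signals helpers, and the send callback are all hypothetical names chosen for this illustration.

```python
import sqlite3


def record_signal(conn, text):
    # Step 1: store the message as a row. The caller commits it together
    # with the rest of its database changes.
    conn.execute("INSERT INTO my_signal (message_text) VALUES (?)", (text,))


def flush_signals(conn, send):
    # Step 2: send every pending row, deleting each row after it is sent.
    rows = conn.execute(
        "SELECT id, message_text FROM my_signal ORDER BY id"
    ).fetchall()
    for row_id, text in rows:
        send(text)  # assumed to return only after the broker accepted it
        conn.execute("DELETE FROM my_signal WHERE id = ?", (row_id,))
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE my_signal ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, message_text TEXT NOT NULL)"
)
record_signal(conn, "hello")
record_signal(conn, "world")
conn.commit()

sent = []  # stands in for the message bus
count = flush_signals(conn, sent.append)
remaining = conn.execute("SELECT COUNT(*) FROM my_signal").fetchone()[0]
```

Because a row is deleted only after the message has been sent, a crash between the two steps leaves the message in the table, to be sent by a later flush.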

Usage

To use the Flask application factory pattern with flask_signalbus, you should subclass the SQLAlchemy class, adding the SignalBusMixin mixin to it. For example:

from flask_sqlalchemy import SQLAlchemy
from swpt_pythonlib.flask_signalbus import SignalBusMixin

class CustomSQLAlchemy(SignalBusMixin, SQLAlchemy):
    pass

db = CustomSQLAlchemy()

Note that SignalBusMixin should always come before SQLAlchemy.

Each type of message (signal) that we plan to send over the message bus should have its own database model class defined. For example:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from swpt_pythonlib.flask_signalbus import SignalBusMixin

class CustomSQLAlchemy(SignalBusMixin, SQLAlchemy):
    pass

app = Flask(__name__)
db = CustomSQLAlchemy(app)

class MySignal(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    message_text = db.Column(db.Text, nullable=False)

    def send_signalbus_message(self):
        # Write some code here that sends
        # the message over the message bus!
        raise NotImplementedError

Here, MySignal represents one particular type of message that we will be sending over the message bus. We call this a “signal model”.

A signal model is an otherwise normal database model class (a subclass of db.Model), which however has a send_signalbus_message method defined.

  • The send_signalbus_message method should be implemented in such a way that when it returns, the message is guaranteed to be successfully sent and stored by the broker. Normally, this means that an acknowledgement has been received for the message from the broker.

  • The signal model class may have a send_signalbus_messages class method which accepts one positional argument: an iterable of instances of the class. The method should be implemented in such a way that when it returns, all messages for the passed instances are guaranteed to be successfully sent and stored by the broker. Implementing a send_signalbus_messages class method can greatly improve performance, because message brokers are usually optimized to process messages in batches much more efficiently.

  • The signal model class may have a signalbus_burst_count integer attribute defined, which determines how many individual signals can be sent and deleted at once, as a part of one database transaction. This can greatly improve performance in some cases when auto-flushing is disabled, especially when the send_signalbus_messages class method is implemented efficiently. If not defined, it defaults to 1.
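The interplay between send_signalbus_messages and signalbus_burst_count can be sketched without a database. In the sketch below, MySignal is a plain class standing in for a db.Model subclass, and FakeBroker and flush_in_bursts are hypothetical helpers written for this illustration. The flush loop inside the library is certainly more involved.

```python
from itertools import islice


class FakeBroker:
    """Hypothetical stand-in for a message broker client."""

    def __init__(self):
        self.published = []
        self.round_trips = 0

    def publish_batch(self, bodies):
        # One round trip confirms the whole batch.
        self.published.extend(bodies)
        self.round_trips += 1


broker = FakeBroker()


class MySignal:
    # A plain class standing in for a db.Model subclass.
    signalbus_burst_count = 3

    def __init__(self, message_text):
        self.message_text = message_text

    def send_signalbus_message(self):
        broker.publish_batch([self.message_text])

    @classmethod
    def send_signalbus_messages(cls, instances):
        # Send all passed instances in a single batch.
        broker.publish_batch([obj.message_text for obj in instances])


def flush_in_bursts(model, pending):
    """Hypothetical flush loop: send at most one burst per iteration."""
    burst = getattr(model, "signalbus_burst_count", 1)
    iterator = iter(pending)
    sent = 0
    while True:
        chunk = list(islice(iterator, burst))
        if not chunk:
            return sent
        model.send_signalbus_messages(chunk)
        sent += len(chunk)


pending = [MySignal(f"msg {i}") for i in range(7)]
total = flush_in_bursts(MySignal, pending)
```

With a burst count of 3, the seven pending signals need three broker round trips instead of seven.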

Transaction Management Utilities

As a bonus, flask_signalbus offers some utilities for transaction management. See AtomicProceduresMixin for details.

Command Line Interface

flask_signalbus registers a group of Flask CLI commands under the signalbus prefix. To see all available commands, use:

$ flask signalbus --help

To send all pending signals, use:

$ flask signalbus flushmany

For the last command, you can specify the exact types of signals to send.

API Reference

Functions

swpt_pythonlib.flask_signalbus.get_models_to_flush(signalbus: SignalBus, model_names: list[str]) -> list[type[flask_sqlalchemy.model.Model]]

Given model names, return model classes.

Invalid model names are ignored with a warning. This function is useful for implementing CLI tools.
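The described behaviour (look up classes by name, skip unknown names with a warning) can be sketched as follows. The pick_models_by_name helper and the two placeholder classes are hypothetical. The sketch assumes that a model's name is its class name, and the real function takes a SignalBus instance rather than a list of classes.

```python
import logging

logger = logging.getLogger(__name__)


def pick_models_by_name(all_models, model_names):
    """Return the classes whose names are listed, warning about the rest."""
    by_name = {model.__name__: model for model in all_models}
    selected = []
    for name in model_names:
        model = by_name.get(name)
        if model is None:
            logger.warning('A signal with name "%s" does not exist.', name)
        else:
            selected.append(model)
    return selected


class MySignal:
    """Placeholder for a signal model class."""


class OtherSignal:
    """Placeholder for a signal model class."""


found = pick_models_by_name([MySignal, OtherSignal], ["OtherSignal", "Typo"])
```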

Classes

class swpt_pythonlib.flask_signalbus.SignalBus(db: SQLAlchemy)

Instances of this class send signal messages that have been recorded in the SQL database, over a message bus. The sending of the recorded messages should be triggered explicitly by a function call.

flushmany(models: Optional[Iterable[type[flask_sqlalchemy.model.Model]]] = None) -> int

Send pending signals over the message bus.

If your database (and its SQLAlchemy dialect) supports FOR UPDATE SKIP LOCKED, multiple processes will be able to run this method in parallel, without stepping on each others’ toes.

Parameters

models – If passed, flushes only signals of the specified types.

Returns

The total number of signals that have been sent.

get_signal_models() -> list[type[flask_sqlalchemy.model.Model]]

Return all signal types in a list.

Mixins

class swpt_pythonlib.flask_signalbus.SignalBusMixin

A mixin class that can be used to extend SQLAlchemy to send signals.

For example:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from swpt_pythonlib.flask_signalbus import SignalBusMixin

class CustomSQLAlchemy(SignalBusMixin, SQLAlchemy):
    pass

app = Flask(__name__)
db = CustomSQLAlchemy(app)
db.signalbus.flushmany()

class swpt_pythonlib.flask_signalbus.AtomicProceduresMixin(*args, engine_options=None, **kwargs)

A mixin class that adds utility functions to flask_sqlalchemy.SQLAlchemy.

For example:

from flask_sqlalchemy import SQLAlchemy
from swpt_pythonlib.flask_signalbus import AtomicProceduresMixin

class CustomSQLAlchemy(AtomicProceduresMixin, SQLAlchemy):
    pass

db = CustomSQLAlchemy()

# Now `AtomicProceduresMixin` methods are available in `db`.

Note that when subclassing, AtomicProceduresMixin should always come before flask_sqlalchemy.SQLAlchemy. Adding AtomicProceduresMixin has several useful results:

  1. AtomicProceduresMixin methods will be available in db.

  2. If not explicitly configured, the database isolation level will be set to REPEATABLE_READ.

atomic(func)

A decorator that wraps a function in an atomic block.

Example:

db = CustomSQLAlchemy()

@db.atomic
def f():
    write_to_db('a message')
    return 'OK'

assert f() == 'OK'

This code defines the function f, which is wrapped in an atomic block. Wrapping a function in an atomic block gives several guarantees:

  1. The database transaction will be automatically committed if the function returns normally, and automatically rolled back if the function raises an unhandled exception.

  2. When the transaction is committed, all objects in db.session will be expunged. This means that no lazy loading will be performed on them.

  3. If a transaction serialization error occurs during the execution of the function, the function will be re-executed. (It might be re-executed several times.)

Atomic blocks can be nested, but in this case the outermost block takes full control of the transaction’s life cycle, and inner blocks do nothing.
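The commit, rollback, retry and nesting rules above can be sketched with a toy decorator. This is not the library's implementation: make_atomic, FakeSession, SerializationFailure and the max_retries limit are hypothetical, and the sketch is neither thread-safe nor does it expunge session objects.

```python
import functools


class SerializationFailure(Exception):
    """Hypothetical stand-in for a transaction serialization error."""


class FakeSession:
    """Hypothetical stand-in for a database session; it only records calls."""

    def __init__(self):
        self.log = []

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")


def make_atomic(session, max_retries=5):
    depth = 0  # how many atomic blocks are currently active

    def atomic(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal depth
            if depth > 0:
                # Nested block: the outermost block owns the transaction.
                return func(*args, **kwargs)
            for attempt in range(max_retries + 1):
                depth += 1
                try:
                    result = func(*args, **kwargs)
                    session.commit()
                    return result
                except SerializationFailure:
                    session.rollback()
                    if attempt == max_retries:
                        raise
                except Exception:
                    session.rollback()
                    raise
                finally:
                    depth -= 1

        return wrapper

    return atomic


session = FakeSession()
atomic = make_atomic(session)
attempts = []


@atomic
def inner():
    attempts.append("inner")
    if len(attempts) < 3:
        raise SerializationFailure()  # simulate two failed attempts
    return "OK"


@atomic
def outer():
    return inner()


value = outer()
```

Here the failure raised inside the nested block propagates to the outermost block, which rolls back and re-executes the whole function until it succeeds.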

execute_atomic(func)

A decorator that executes a function in an atomic block (see atomic()).

Example:

db = CustomSQLAlchemy()

@db.execute_atomic
def result():
    write_to_db('a message')
    return 'OK'

assert result == 'OK'

This code defines and executes the function result in an atomic block. At the end, the name result holds the value returned from the function.
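The "define and execute" behaviour comes from a decorator that returns the function's result rather than a function. A minimal sketch, with the hypothetical name execute_now and without any transaction handling:

```python
def execute_now(func):
    """Call `func` immediately and return its result, not the function."""
    return func()


@execute_now
def result():
    return "OK"
```

After the decorated definition, result is the string returned by the function body, not a callable. Presumably execute_atomic does the same after first wrapping the function in an atomic block.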

retry_on_integrity_error()

Re-raise IntegrityError as DBSerializationError.

This is mainly useful to handle race conditions in atomic blocks. For example, even if prior to a database INSERT we have verified that there is no existing row with the given primary key, we still may get an IntegrityError if another transaction inserted a row with this primary key in the meantime. But if we do (within an atomic block):

with db.retry_on_integrity_error():
    db.session.add(instance)

then if the aforementioned race condition occurs, DBSerializationError will be raised instead of IntegrityError, so that the transaction will be retried (by the atomic block), and the second time our prior-to-INSERT check will correctly detect a primary key collision.

Note: retry_on_integrity_error() triggers a session flush.
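The exception translation can be sketched with the standard library alone. In this sketch, translate_integrity_error and RetryableError are hypothetical stand-ins for retry_on_integrity_error() and DBSerializationError, and sqlite3 replaces the SQLAlchemy session. With sqlite3 the error surfaces on execute, so no flush step is needed.

```python
import sqlite3
from contextlib import contextmanager


class RetryableError(Exception):
    """Hypothetical stand-in for DBSerializationError."""


@contextmanager
def translate_integrity_error():
    # Re-raise integrity errors as an error that signals "retry".
    try:
        yield
    except sqlite3.IntegrityError as e:
        raise RetryableError from e


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE account (id INTEGER PRIMARY KEY)")
# Simulates a row inserted by a concurrent transaction.
conn.execute("INSERT INTO account (id) VALUES (1)")

try:
    with translate_integrity_error():
        conn.execute("INSERT INTO account (id) VALUES (1)")
    outcome = "inserted"
except RetryableError:
    outcome = "retry"
```

An enclosing retry loop that catches the translated error can then re-run the whole procedure, including the existence check that precedes the INSERT.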

Exceptions

class swpt_pythonlib.flask_signalbus.DBSerializationError

The transaction is rolled back due to a race condition.