Patterns

Flask-Coney can be used to achieve different messaging patterns. A few typical patterns, taken from the RabbitMQ Tutorials, are shown here.

Hello World

In this example we have two services (i.e. Flask applications). Service 1 receives data from an HTTP POST request and sends it to the message broker with the routing key “process”. Service 2 picks up any message with the routing key “process”, then processes and stores the data. Service 2 also provides an API endpoint that allows a user to request the processed data.

Service 1

from flask import Flask, request
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)


@app.route("/process", methods=["POST"])
def process():
    data = request.get_json()
    # validation ...
    coney.publish(data, routing_key="process")

    return "will be processed, check service2"

Service 2

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)


@coney.queue(queue_name="process")
def process_queue(ch, method, props, body):
    # do something with body
    print(body, flush=True)
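
The Service 2 listing above only prints the message body; storing the data and serving it through an API endpoint are not shown. The following is a minimal sketch of that part, assuming the body arrives as UTF-8 encoded JSON and that an in-memory list is an acceptable store. The names `handle_process_message` and `list_processed` are hypothetical and not part of Flask-Coney: the first would be called from `process_queue`, the second from a Flask view.

```python
import json
import threading

# Hypothetical in-memory store; a real service would likely use a database.
_processed = []
_lock = threading.Lock()


def handle_process_message(body: bytes) -> None:
    """Decode a message body (assumed to be JSON) and store the result."""
    data = json.loads(body.decode("utf-8"))
    with _lock:
        _processed.append(data)


def list_processed() -> list:
    """Return a copy of everything stored so far, e.g. for an API endpoint."""
    with _lock:
        return list(_processed)
```

The lock is there because the queue handler and the HTTP request handlers may run in different threads.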

Work queues

In this example service1 publishes a message. This message is then processed by either service2 or service3, depending on which one reads the message first. This is useful if you want to split up the workload.
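
To illustrate the idea independently of a broker, here is a small sketch that uses only the Python standard library. Two worker threads stand in for service2 and service3 and compete for messages on one shared queue, so each message is handled by exactly one of them. The function `run_workers` is a hypothetical helper and does not use the Flask-Coney API.

```python
import queue
import threading


def run_workers(messages, worker_names):
    """Distribute messages among competing workers; return who handled what."""
    work = queue.Queue()
    handled = {name: [] for name in worker_names}

    def worker(name):
        while True:
            message = work.get()
            if message is None:  # sentinel: no more work for this worker
                break
            handled[name].append(message)

    threads = [threading.Thread(target=worker, args=(n,)) for n in worker_names]
    for thread in threads:
        thread.start()
    for message in messages:
        work.put(message)
    for _ in threads:  # one sentinel per worker
        work.put(None)
    for thread in threads:
        thread.join()
    return handled


result = run_workers([f"task-{i}" for i in range(20)], ["service2", "service3"])
```

How the messages are split between the two workers can differ from run to run, but no message is handled twice.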

Service 1

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)

Service 2

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)

Service 3

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)

Publish/Subscribe

In this example service1 publishes a message to an exchange. Because it is a fanout exchange, every queue bound to it receives a copy of the message. Thus, both service2 and service3 will process the message.
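
The copy-to-every-queue behaviour can be sketched without a broker. The class `FanoutExchange` below is a toy in-memory stand-in written for illustration only; it is not part of Flask-Coney or RabbitMQ, and the queue names are simply reused from the example.

```python
class FanoutExchange:
    """Toy in-memory stand-in for a fanout exchange."""

    def __init__(self):
        self.queues = {}  # queue name -> list of delivered messages

    def bind(self, queue_name):
        # Create the queue if it does not exist yet and attach it.
        self.queues.setdefault(queue_name, [])

    def publish(self, message, routing_key=""):
        # A fanout exchange ignores the routing key and copies the
        # message into every bound queue.
        for messages in self.queues.values():
            messages.append(message)


notify = FanoutExchange()
notify.bind("service2.notify")
notify.bind("service3.notify")
notify.publish("A message")
```

After the call to `publish`, both queues hold their own copy of the message, which is why each service uses a separate queue name.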

Service 1

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)


@app.route("/pub", methods=["POST"])
def pub():
    coney.publish("A message", exchange_name="notify", routing_key="")
    return "It will be processed by service2 and service3"

Service 2

from flask import Flask
from flask_coney import Coney, ExchangeType

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)


@coney.queue(
    queue_name="service2.notify",
    exchange_name="notify",
    exchange_type=ExchangeType.FANOUT,
)
def notify_queue(ch, method, props, body):
    print(body, flush=True)

Service 3

from flask import Flask
from flask_coney import Coney, ExchangeType

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)


@coney.queue(
    queue_name="service3.notify",
    exchange_name="notify",
    exchange_type=ExchangeType.FANOUT,
)
def notify_queue(ch, method, props, body):
    print(body, flush=True)

Routing

In this example service1 publishes messages to the exchange “logs” with the routing key “warning”, “error”, or “info”. Service2 binds one queue to “error” and one to “warning”, while service3 binds a queue to “info”, so each service only receives the messages it is interested in.

Service 1

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)


@app.route("/warning", methods=["POST"])
def warning():
    coney.publish("This is a warning", exchange_name="logs", routing_key="warning")
    return "Warning published"


@app.route("/error", methods=["POST"])
def error():
    coney.publish("This is an error", exchange_name="logs", routing_key="error")
    return "Error published"


@app.route("/info", methods=["POST"])
def info():
    coney.publish("This is an info", exchange_name="logs", routing_key="info")
    return "Info published"

Service 2

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)


@coney.queue(queue_name="logs_error", exchange_name="logs", routing_key="error")
def error_queue(ch, method, props, body):
    print(body, flush=True)


@coney.queue(queue_name="logs_warning", exchange_name="logs", routing_key="warning")
def warning_queue(ch, method, props, body):
    print(body, flush=True)

Service 3

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)


@coney.queue(queue_name="logs_info", exchange_name="logs", routing_key="info")
def info_queue(ch, method, props, body):
    print(body, flush=True)
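
The routing behaviour of the three services can be sketched in plain Python. This assumes that the “logs” exchange behaves like a direct exchange, i.e. a message is delivered only to queues whose binding key equals the routing key. `DirectExchange` is a hypothetical toy class, not the Flask-Coney API.

```python
from collections import defaultdict


class DirectExchange:
    """Toy in-memory stand-in for a direct exchange."""

    def __init__(self):
        self.queues = {}  # queue name -> list of delivered messages
        self._bindings = defaultdict(list)  # routing key -> queue names

    def bind(self, queue_name, routing_key):
        self.queues.setdefault(queue_name, [])
        self._bindings[routing_key].append(queue_name)

    def publish(self, message, routing_key):
        # Deliver only to queues bound with exactly this routing key;
        # messages without a matching binding are dropped.
        for queue_name in self._bindings.get(routing_key, []):
            self.queues[queue_name].append(message)


logs = DirectExchange()
logs.bind("logs_error", "error")      # service2
logs.bind("logs_warning", "warning")  # service2
logs.bind("logs_info", "info")        # service3

logs.publish("This is a warning", routing_key="warning")
logs.publish("This is an error", routing_key="error")
logs.publish("This is an info", routing_key="info")
```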

Topics

Service 1

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)

Service 2

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)

Service 3

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
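
The listings in this section only contain the application setup. As a broker-independent illustration of the pattern, the sketch below shows how a topic exchange compares a binding key with a routing key under the AMQP rules: keys are dot-separated words, `*` stands for exactly one word, and `#` stands for zero or more words. The function `topic_matches` is a hypothetical helper, not part of Flask-Coney.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches the topic pattern binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern used up: match only if all words are used up too.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)
```

For example, a queue bound with `logs.*` would receive `logs.error` but not `logs.error.db`, while a queue bound with `logs.#` would receive both.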

Request/Reply

In this example service1 publishes a message with the routing key “rpc” and waits for the answer. Service2 picks up the message and sends its result back to service1, which returns it as the HTTP response.

Service 1

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)


@app.route("/rpc", methods=["GET"])
def rpc():
    response = coney.publish_sync("Hi", routing_key="rpc")

    return response

Service 2

from flask import Flask
from flask_coney import Coney

app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)


@coney.queue(queue_name="rpc")
def rpc_queue(ch, method, props, body):
    result = f"{body.decode('utf-8')} touched by me"
    print(result, flush=True)
    coney.reply_sync(ch, method, props, result)
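
For readers unfamiliar with the pattern, the sketch below imitates the usual request/reply mechanics with the standard library only: the caller attaches a reply queue and a correlation id to the request, and the server sends its result to that reply queue with the same id. Whether `publish_sync` and `reply_sync` work exactly this way internally is an assumption; the names `server`, `call`, and `demo` are hypothetical.

```python
import queue
import threading
import uuid

request_queue = queue.Queue()  # stands in for the "rpc" queue


def server():
    """Answer each request on the reply queue named in the request."""
    while True:
        request = request_queue.get()
        if request is None:  # sentinel: shut down
            break
        result = f"{request['body']} touched by me"
        request["reply_to"].put(
            {"correlation_id": request["correlation_id"], "body": result}
        )


def call(body, timeout=1.0):
    """Send a request and block until the matching reply arrives."""
    reply_queue = queue.Queue()  # private reply queue for this call
    correlation_id = str(uuid.uuid4())
    request_queue.put(
        {"body": body, "reply_to": reply_queue, "correlation_id": correlation_id}
    )
    reply = reply_queue.get(timeout=timeout)
    if reply["correlation_id"] != correlation_id:
        raise RuntimeError("reply does not belong to this request")
    return reply["body"]


def demo():
    """Run one request against a server thread and return the reply."""
    thread = threading.Thread(target=server, daemon=True)
    thread.start()
    try:
        return call("Hi")
    finally:
        request_queue.put(None)
        thread.join()
```

The timeout matters in practice: without it, the caller would block forever if no service consumes the request.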