API

Configuration

class flask_coney.Coney(app=None, testing=False)

This class controls the Coney integration with one or more Flask applications. Depending on how you initialize the object, it is either usable right away or attaches to a Flask application as needed.

There are two usage modes, which work very similarly. The first binds the instance to a specific Flask application:

app = Flask(__name__)
coney = Coney(app)

The second creates the object once and configures the application later to support it:

coney = Coney()

def create_app():
    app = Flask(__name__)
    coney.init_app(app)
    return app
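
The difference between the two modes can be illustrated with a dependency-free sketch. FakeApp and SketchExtension below are hypothetical stand-ins, not part of flask_coney or Flask; they only mirror the app=None / init_app shape documented here and say nothing about what Coney does internally.

```python
class FakeApp:
    """Hypothetical stand-in for a Flask application."""

    def __init__(self, name):
        self.name = name
        self.extensions = {}


class SketchExtension:
    """Hypothetical extension mirroring the Coney(app=None) constructor shape."""

    def __init__(self, app=None):
        self.app = app
        # Mode 1: an app was passed, so bind to it immediately.
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        # Mode 2: bind later, for example from an application factory.
        app.extensions["sketch"] = self


# Mode 1: bound at construction time.
bound_app = FakeApp("bound")
bound = SketchExtension(bound_app)

# Mode 2: created first, attached inside a factory.
deferred = SketchExtension()


def create_app():
    app = FakeApp("factory")
    deferred.init_app(app)
    return app


factory_app = create_app()
```

In the deferred mode the extension object holds no application of its own, which is why most methods in this reference accept an optional app argument.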

To listen on a queue use:

coney = Coney(app)

@coney.queue(queue_name="test")
def queue_test(ch, method, props, body):
    pass

To publish a message use:

coney = Coney(app)

coney.publish({"test": 1})

Parameters
  • app (Optional[Flask]) – A flask app

  • testing (bool) – Set up testing mode. No threads are started in this mode

channel(app=None)

Provides context for a channel.

Example:

with coney.channel(app) as ch:
    ch.basic_publish(exchange="", routing_key="test", body="hello")

Parameters

app (Optional[Flask]) – A flask app

Return type

Channel

connection(app=None)

Provides context for a connection.

Example:

with coney.connection(app) as c:
    c.channel()

Parameters

app (Optional[Flask]) – A flask app

Return type

BlockingConnection

get_app(reference_app=None)

Helper method that implements the logic to look up an application.

Parameters

reference_app (Optional[Flask]) – A flask app

init_app(app)

This callback can be used to initialize an application for use with Coney.

publish(body, exchange_name='', routing_key='', durable=False, properties=None, app=None)

Publishes a message.

Example:

@app.route('/process')
def process():
    coney.publish({"text": "process me"})
    return "published"

Parameters
  • body (Union[str, dict]) – Body of the message, either a string or a dict

  • exchange_name (str) – The exchange

  • exchange_type – The type of the exchange

  • routing_key (str) – The routing key

  • durable (bool) – Should the exchange be durable

  • app (Optional[Flask]) – A flask app
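
Because body may be either a string or a dict, some conversion to a wire payload has to happen before the message is sent. The sketch below shows one plausible approach. The helper encode_body is hypothetical, and the choice of JSON for dicts is an assumption that this reference does not confirm.

```python
import json
from typing import Union


def encode_body(body: Union[str, dict]) -> str:
    """Hypothetical helper: turn a str-or-dict body into a string payload."""
    if isinstance(body, dict):
        # Assumption: dicts are serialized as JSON; the source does not say so.
        return json.dumps(body)
    # Strings are passed through unchanged.
    return body


payload = encode_body({"text": "process me"})
```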

publish_sync(body, exchange_name='', routing_key='', properties=None, timeout=10, app=None)

Publishes a message and waits for the response.

Example:

# client
@app.route('/concat')
def concat():
    a = request.args.get('a')
    b = request.args.get('b')

    body = {'a': a, 'b': b}
    result = coney.publish_sync(body, routing_key="rpc")
    return result

# server
@coney.queue(queue_name="rpc")
def concat_callback(ch, method, props, body):
    result = body["a"] + body["b"]
    body = {"result": result}
    coney.reply_sync(ch, method, props, body)

Raises

SyncTimeoutError: if no response is received before the timeout expires
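
The request/response flow can be illustrated without a broker. The following in-memory sketch uses the standard library only: publish_sync_sketch, serve_one, and SketchTimeoutError are hypothetical names, the queues stand in for RabbitMQ queues, and treating the timeout as seconds is an assumption of the sketch. It shows the general reply_to / correlation_id pattern, not the library's actual implementation.

```python
import queue
import threading
import uuid

requests = queue.Queue()  # stands in for the "rpc" queue on the broker
reply_queues = {}         # reply_to name -> private reply queue


class SketchTimeoutError(Exception):
    """Hypothetical stand-in for SyncTimeoutError."""


def publish_sync_sketch(body, timeout=10):
    # Tag the request with a private reply queue and a correlation id.
    reply_to = uuid.uuid4().hex
    correlation_id = uuid.uuid4().hex
    reply_queues[reply_to] = queue.Queue()
    requests.put({"reply_to": reply_to, "correlation_id": correlation_id, "body": body})
    try:
        # Block until a reply arrives or the timeout (seconds here) expires.
        reply = reply_queues[reply_to].get(timeout=timeout)
    except queue.Empty:
        raise SketchTimeoutError("no response within timeout") from None
    finally:
        del reply_queues[reply_to]
    # The echoed correlation id ties the reply to this request.
    assert reply["correlation_id"] == correlation_id
    return reply["body"]


def serve_one():
    # Server side: handle one request and send the reply back to reply_to.
    request = requests.get()
    result = request["body"]["a"] + request["body"]["b"]
    reply_queues[request["reply_to"]].put(
        {"correlation_id": request["correlation_id"], "body": {"result": result}}
    )


worker = threading.Thread(target=serve_one, daemon=True)
worker.start()
answer = publish_sync_sketch({"a": "foo", "b": "bar"}, timeout=2)
worker.join()
```

If no server picks up the request, the blocking get runs into its timeout and the sketch raises its timeout error, which corresponds to the Raises entry above.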

queue(queue_name='', exchange_name='', exchange_type=<ExchangeType.DIRECT: 'direct'>, routing_key=None, routing_keys=None, app=None)

A decorator for consuming a queue. A background thread is started unless one has already been started for this purpose. There is only one thread per queue.

Example:

@coney.queue(queue_name="test")
def queue_test(ch, method, props, body):
    pass

You can also bind the queue to multiple routing keys:

@coney.queue(routing_keys=["first", "second"])
def queue_multiple(ch, method, props, body):
    pass

If both routing_keys and routing_key are provided, they are combined.
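
As a rough illustration of what "combined" could mean, the sketch below merges the two arguments into a single list of binding keys. The helper combine_routing_keys is hypothetical, and both the ordering and the duplicate handling are assumptions, since this reference does not specify them.

```python
from typing import List, Optional


def combine_routing_keys(
    routing_key: Optional[str] = None,
    routing_keys: Optional[List[str]] = None,
) -> List[str]:
    """Hypothetical helper: merge both arguments into one list of binding keys."""
    combined = list(routing_keys or [])
    # Assumption: the single key is appended once and duplicates are skipped.
    if routing_key is not None and routing_key not in combined:
        combined.append(routing_key)
    return combined


keys = combine_routing_keys("third", ["first", "second"])
```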

Parameters
  • type – ExchangeType

  • queue_name (str) – Name of the queue

  • exchange_name (str) – Name of the exchange

  • exchange_type (ExchangeType) – Type of the exchange

  • routing_key (Optional[str]) – The routing key

  • routing_keys (Optional[List[str]]) – A list of routing keys

  • app (Optional[Flask]) – A flask app

Return type

Callable

reply_sync(ch, method, properties, body, app=None)

Replies to a message that was sent by publish_sync().

Example:

@coney.queue(queue_name="rpc")
def concat_callback(ch, method, props, body):
    result = body["a"] + body["b"]
    body = {"result": result}
    coney.reply_sync(ch, method, props, body)

This is a convenience shorthand for:

@coney.queue(queue_name="rpc")
def concat_callback(ch, method, props, body):
    result = body["a"] + body["b"]
    body = {"result": result}
    coney.publish(
        body,
        routing_key=props.reply_to,
        properties={"correlation_id": props.correlation_id},
    )

class flask_coney.ExchangeType(value)

Defines all possible exchange types

DIRECT = 'direct'

direct exchange

FANOUT = 'fanout'

fanout exchange

HEADERS = 'headers'

headers exchange

TOPIC = 'topic'

topic exchange
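
The ExchangeType(value) call form and the repr shown in the queue() signature suggest a standard Python Enum. Under that assumption, members can be looked up by value or by name. The sketch below uses a local mirror class, ExchangeTypeSketch, rather than importing from flask_coney.

```python
from enum import Enum


class ExchangeTypeSketch(Enum):
    """Local mirror of the documented members; not imported from flask_coney."""

    DIRECT = "direct"
    FANOUT = "fanout"
    HEADERS = "headers"
    TOPIC = "topic"


# Lookup by value, matching the ExchangeType(value) call form.
topic = ExchangeTypeSketch("topic")

# Lookup by member name.
fanout = ExchangeTypeSketch["FANOUT"]

# All values, in definition order.
all_values = [member.value for member in ExchangeTypeSketch]
```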