nuropb.rmq_transport

Module Contents

Classes

RabbitMQConfiguration

RMQTransport

RMQTransport is the base class for the RabbitMQ transport. It wraps the NuroPb service mesh patterns and rules over the AMQP protocol.

Functions

verbose

decode_rmq_body

Map the incoming RabbitMQ message to a Python-compatible dictionary as defined by ServicePayloadDict

Data

logger

CONSUMER_CLOSED_WAIT_TIMEOUT

The wait when shutting down consumers before closing the connection

_verbose

API

class nuropb.rmq_transport.RabbitMQConfiguration

Bases: typing.TypedDict

rpc_exchange: str

None

events_exchange: str

None

dl_exchange: str

None

dl_queue: str

None

service_queue: str

None

response_queue: str

None

rpc_bindings: List[str]

None

event_bindings: List[str]

None

default_ttl: int

None

client_only: bool

None
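As a minimal sketch of how these fields fit together, the following builds a dictionary with the same keys. RabbitMQConfigurationSketch is a local stand-in defined here so the example runs without nuropb installed, and every value shown (exchange names, queue names, binding keys) is an illustrative assumption rather than a library default.

```python
from typing import List, TypedDict


class RabbitMQConfigurationSketch(TypedDict):
    """Local mirror of the documented RabbitMQConfiguration fields."""

    rpc_exchange: str
    events_exchange: str
    dl_exchange: str
    dl_queue: str
    service_queue: str
    response_queue: str
    rpc_bindings: List[str]
    event_bindings: List[str]
    default_ttl: int
    client_only: bool


# All names and values below are hypothetical placeholders.
example_config: RabbitMQConfigurationSketch = {
    "rpc_exchange": "example-rpc",
    "events_exchange": "example-events",
    "dl_exchange": "example-dl",
    "dl_queue": "example-dl-queue",
    "service_queue": "example-service-queue",
    "response_queue": "example-response-queue",
    "rpc_bindings": ["example-service"],
    "event_bindings": [],
    "default_ttl": 12 * 60 * 60 * 1000,  # 12 hours expressed in milliseconds
    "client_only": False,
}
```

Because a TypedDict is an ordinary dict at runtime, a value like this can be inspected or merged with plain dictionary operations.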

nuropb.rmq_transport.logger

None

nuropb.rmq_transport.CONSUMER_CLOSED_WAIT_TIMEOUT

10

The wait when shutting down consumers before closing the connection

nuropb.rmq_transport._verbose

False

nuropb.rmq_transport.verbose() bool
nuropb.rmq_transport.decode_rmq_body(method: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes) nuropb.interface.TransportServicePayload

Map the incoming RabbitMQ message to a Python-compatible dictionary as defined by ServicePayloadDict

exception nuropb.rmq_transport.ServiceNotConfigured

Bases: Exception

Raised when a service is not properly configured on the RabbitMQ broker. The leader is expected to configure the exchange and service queues.

Initialization

Initialize self. See help(type(self)) for accurate signature.

class nuropb.rmq_transport.RMQTransport(service_name: str, instance_id: str, amqp_url: str | Dict[str, Any], message_callback: nuropb.interface.MessageCallbackFunction, default_ttl: Optional[int] = None, client_only: Optional[bool] = None, encryptor: Optional[nuropb.encodings.encryption.Encryptor] = None, **kwargs: Any)

RMQTransport is the base class for the RabbitMQ transport. It wraps the NuroPb service mesh patterns and rules over the AMQP protocol.

When RabbitMQ closes the connection, this class will stop and alert that reconnection is necessary; in some cases reconnection takes place automatically. Disconnections should be continuously monitored: there are various reasons why a connection or channel may be closed after being successfully opened, usually related to authentication, permissions, protocol violations or networking.

TODO: Configure the Pika client connection attributes in the pika client properties.

Initialization

Create a new instance of the consumer class, passing in the AMQP URL used to connect to RabbitMQ.

Parameters:
  • service_name (str) – The name of the service

  • instance_id (str) – The instance id of the service

  • amqp_url (str | Dict[str, Any]) –

    The AMQP URL connection string, or a dictionary of TLS connection details:

    • cafile: str | None

    • certfile: str | None

    • keyfile: str | None

    • verify: bool (default True)

    • hostname: str

    • port: int

    • username: str

    • password: str

  • message_callback (MessageCallbackFunction) – The callback to call when a message is received

  • default_ttl (int) – The default time to live for messages in milliseconds, defaults to 12 hours.

  • client_only (bool) –

  • encryptor (Encryptor) – The encryptor to use for encrypting and decrypting messages

  • kwargs

    Mostly transport configuration options

    • str rpc_exchange: The name of the RPC exchange

    • str events_exchange: The name of the events exchange

    • str dl_exchange: The name of the dead letter exchange

    • str dl_queue: The name of the dead letter queue

    • str service_queue: The name of the requests queue

    • str response_queue: The name of the responses queue

    • List[str] rpc_bindings: The list of RPC bindings

    • List[str] event_bindings: The list of events bindings

    • int prefetch_count: The number of messages to prefetch; defaults to 1, and 0 means unlimited. Experiment with larger values for higher throughput in your use case.

    When an existing transport is initialised and connected, and a subsequent transport instance is connected with the same service_name and instance_id as the first, the broker will shut down the channel of subsequent instances when they attempt to configure their response queue. This is because the response queue is opened in exclusive mode. The exclusive mode is used to ensure that only one consumer (nuropb api connection) is consuming from the response queue.

    Deliberately specifying a fixed instance_id is a valid mechanism to ensure that a service can only run in single instance mode. This is useful for services that are not designed to be run in a distributed manner or where there is specific service configuration required.
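The constructor arguments described above can be sketched as plain dictionaries. This is only an illustration under stated assumptions: the host, port, credentials, service name and instance id are placeholders, and the final commented line shows how the values might be passed when nuropb is installed and a message_callback has been defined.

```python
from typing import Any, Dict

# TLS connection details, using the keys listed for amqp_url above.
# Every value here is a placeholder.
amqp_url: Dict[str, Any] = {
    "cafile": None,
    "certfile": None,
    "keyfile": None,
    "verify": True,
    "hostname": "localhost",
    "port": 5671,
    "username": "guest",
    "password": "guest",
}

transport_kwargs: Dict[str, Any] = {
    "service_name": "example-service",
    # A fixed instance_id means a second instance with the same id has its
    # channel closed when it declares the exclusive response queue.
    "instance_id": "singleton",
    "amqp_url": amqp_url,
    "default_ttl": 12 * 60 * 60 * 1000,  # 12 hours in milliseconds
    "client_only": False,
    "prefetch_count": 1,  # 0 would mean unlimited
}

# With nuropb installed and a callback defined, these could be passed as:
#   transport = RMQTransport(message_callback=my_callback, **transport_kwargs)
```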

_service_name: str

None

_instance_id: str

None

_amqp_url: str | Dict[str, Any]

None

_rpc_exchange: str

None

_events_exchange: str

None

_dl_exchange: str

None

_dl_queue: str

None

_service_queue: str

None

_response_queue: str

None

_rpc_bindings: Set[str]

None

_event_bindings: Set[str]

None

_prefetch_count: int

None

_default_ttl: int

None

_client_only: bool

None

_message_callback: nuropb.interface.MessageCallbackFunction

None

_encryptor: nuropb.encodings.encryption.Encryptor | None

None

_connected_future: Any

None

_disconnected_future: Any

None

_is_leader: bool

None

_is_rabbitmq_configured: bool

None

_connection: pika.adapters.asyncio_connection.AsyncioConnection | None

None

_channel: pika.channel.Channel | None

None

_consumer_tags: Set[Any]

None

_consuming: bool

None

_connecting: bool

None

_closing: bool

None

_connected: bool

None

_was_consuming: bool

None

property service_name: str
property instance_id: str
property amqp_url: str | Dict[str, Any]
property is_leader: bool
property connected: bool

connected: returns the connection status of the underlying transport

Returns:

bool

property rpc_exchange: str

rpc_exchange: returns the name of the RPC exchange

Returns:

str

property events_exchange: str

events_exchange: returns the name of the events exchange

Returns:

str

property response_queue: str

response_queue: returns the name of the response queue

Returns:

str

property rmq_configuration: nuropb.rmq_transport.RabbitMQConfiguration

rmq_configuration: returns the RabbitMQ configuration

Returns:

Dict[str, Any]

configure_rabbitmq(rmq_configuration: Optional[nuropb.rmq_transport.RabbitMQConfiguration] = None, amqp_url: Optional[str | Dict[str, Any]] = None, rmq_api_url: Optional[str] = None) None

configure_rabbitmq: configure the RabbitMQ transport with the provided configuration

If rmq_configuration is None, then the transport will be configured with the configuration provided during the transport’s __init__().

If the virtual host in the build_amqp_url is not configured, then it will be created.

Parameters:
  • rmq_configuration – RabbitMQConfiguration

  • amqp_url – Optional[str] if not provided then self._amqp_url is used

  • rmq_api_url – Optional[str] if not provided then it is derived from amqp_url

Returns:

None

async start() None

Start the transport by connecting to RabbitMQ

async stop() None

Cleanly shut down the connection to RabbitMQ by stopping the consumer with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok will be invoked by pika, which will then close the channel and connection. The IOLoop is started again because this method is invoked when CTRL-C is pressed, raising a KeyboardInterrupt exception. This exception stops the IOLoop, which needs to be running for pika to communicate with RabbitMQ. All commands issued prior to starting the IOLoop will be buffered but not processed.

connect() asyncio.Future[bool]

This method initiates a connection to RabbitMQ, returning the connection handle. When the connection is established, the on_connection_open method will be invoked by pika.

When the connection and channel are successfully opened, the incoming messages will automatically be handled by _handle_message()

Return type:

asyncio.Future
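The future-plus-callback handshake described for connect() can be illustrated with a small stand-in. ConnectSketch is hypothetical and does not use pika; it simulates the broker's reply with loop.call_soon so the pattern can be run in isolation.

```python
import asyncio
from typing import Optional


class ConnectSketch:
    """Hypothetical stand-in mimicking the connect() / on_connection_open handshake."""

    def __init__(self) -> None:
        self._connected_future: Optional["asyncio.Future[bool]"] = None

    def connect(self) -> "asyncio.Future[bool]":
        loop = asyncio.get_running_loop()
        self._connected_future = loop.create_future()
        # A real transport would hand control to pika here; this sketch
        # simulates the broker's reply on the next loop iteration.
        loop.call_soon(self.on_connection_open)
        return self._connected_future

    def on_connection_open(self) -> None:
        # Resolve the future exactly once when the connection is reported open.
        future = self._connected_future
        if future is not None and not future.done():
            future.set_result(True)


async def main() -> bool:
    transport = ConnectSketch()
    return await transport.connect()


result = asyncio.run(main())
```

Awaiting the returned future lets the caller suspend until the open callback fires, without blocking the event loop that delivers that callback.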

disconnect() Awaitable[bool]

This method closes the connection to RabbitMQ. The pika library events will drive the closing and reconnection process.

Returns:

asyncio.Future

on_connection_open(_connection: pika.adapters.asyncio_connection.AsyncioConnection) None

This method is called by pika once the connection to RabbitMQ has been established. It passes the handle to the connection object in case we need it, but in this case, we’ll just mark it unused.

Parameters:

_connection (pika.adapters.asyncio_connection.AsyncioConnection) – The connection

on_connection_open_error(conn: pika.adapters.asyncio_connection.AsyncioConnection, reason: Exception) None

This method is called by pika if the connection to RabbitMQ can’t be established.

Parameters:
  • conn (pika.adapters.asyncio_connection.AsyncioConnection) –

  • reason (Exception) – The error

on_connection_closed(_connection: pika.adapters.asyncio_connection.AsyncioConnection, reason: Exception) None

This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects.

Parameters:
  • _connection (pika.connection.Connection) – The closed connection obj

  • reason (Exception) – exception representing reason for loss of connection.

open_channel() None

Open a new channel with RabbitMQ by issuing the Channel.Open RPC command. When RabbitMQ responds that the channel is open, the on_channel_open callback will be invoked by pika.

on_channel_open(channel: pika.channel.Channel) None

This method is invoked by pika when the channel has been opened. The channel object is passed in so that we can make use of it.

Parameters:

channel (pika.channel.Channel) – The channel object

on_channel_closed(channel: pika.channel.Channel, reason: Exception) None

Invoked by pika when the channel is closed. Channels are at times closed by the broker for various reasons, the most common being protocol violations, e.g. acknowledging messages using an invalid message_tag. In most cases when the channel is closed by the broker, nuropb will automatically open a new channel and re-declare the service queue.

In the following cases the channel is not automatically re-opened:

  • When the connection is closed by this transport API.

  • When the connection is closed by the broker with 403 (ACCESS_REFUSED). Typically these examples are seen:

    • “Provided JWT token has expired at timestamp”. In this case the transport will require a fresh JWT token before attempting to reconnect.

    • “queue ‘XXXXXXX’ in vhost ‘YYYYY’ in exclusive use”. In this case there is another response queue set up with the same name. Having a fixed response queue name is a valid mechanism to enforce a single instance of a service.

  • When the connection is closed by the broker with 404 (NOT_FOUND), typically where there is an exchange configuration issue.

Always investigate the reasons why a channel is closed and introduce logic to handle that scenario accordingly. It is important to continuously monitor for this condition.

TODO: When there is message processing in flight, and particularly with a prefetch count > 1, those messages can no longer be acknowledged. Attempting to do so will result in a forced channel closure by the broker and potentially a poison pill type scenario.

Parameters:
  • channel (pika.channel.Channel) – The closed channel

  • reason (Exception) – why the channel was closed
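The reopen rules described for on_channel_closed can be sketched as a small decision function. should_reopen_channel and its parameters are hypothetical names for illustration; the reply codes are the standard AMQP 0-9-1 values, and the library's own logic may consider more cases.

```python
ACCESS_REFUSED = 403  # AMQP 0-9-1 reply code
NOT_FOUND = 404  # AMQP 0-9-1 reply code


def should_reopen_channel(reply_code: int, closed_by_transport: bool) -> bool:
    """Return True when a closed channel may be reopened automatically."""
    if closed_by_transport:
        # A deliberate close by the transport API is never undone.
        return False
    if reply_code in (ACCESS_REFUSED, NOT_FOUND):
        # Expired credentials, an exclusive queue already in use, or a missing
        # exchange need intervention before reconnecting.
        return False
    # Other broker-initiated closures, e.g. 406 PRECONDITION_FAILED after an
    # invalid acknowledgement, are treated as recoverable in this sketch.
    return True
```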

declare_service_queue(frame: pika.frame.Method) None

Refresh the request queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is complete, the on_service_queue_declareok method will be invoked by pika.

This call is idempotent and will not fail if the queue already exists.

on_service_queue_declareok(frame: pika.frame.Method, _userdata: str) None
declare_response_queue() None

Set up the response queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is complete, the on_response_queue_declareok method will be invoked by pika.

on_response_queue_declareok(frame: pika.frame.Method, _userdata: str) None

Method invoked by pika when the Queue.Declare RPC call made in declare_response_queue has completed. In this method we will bind the request queue and the response queue. When this command is complete, the on_bindok method will be invoked by pika.

No explicit binds are required for the response queue, as it relies on the default exchange with the routing key set to the name of the queue.

Parameters:
  • frame (pika.frame.Method) – The Queue.DeclareOk frame

  • _userdata (str|unicode) – Extra user data (queue name)
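Routing through the default exchange, as relied on for the response queue, can be illustrated as follows. ReplyRecordingChannel is a hypothetical stand-in that only records publishes, and publish_reply is an illustrative helper; a real pika channel exposes a basic_publish method that accepts the same exchange, routing_key and body arguments.

```python
from typing import List, Tuple


class ReplyRecordingChannel:
    """Hypothetical stand-in for a pika channel; it only records publishes."""

    def __init__(self) -> None:
        self.published: List[Tuple[str, str, bytes]] = []

    def basic_publish(self, exchange: str, routing_key: str, body: bytes) -> None:
        self.published.append((exchange, routing_key, body))


def publish_reply(channel: ReplyRecordingChannel, reply_to: str, body: bytes) -> None:
    # The default exchange is named by the empty string; the broker routes the
    # message to the queue whose name equals the routing key, so no explicit
    # binding is needed.
    channel.basic_publish(exchange="", routing_key=reply_to, body=body)


reply_channel = ReplyRecordingChannel()
publish_reply(reply_channel, "example-response-queue", b'{"result": 1}')
```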

on_bindok(_frame: pika.frame.Method, userdata: str) None

Invoked by pika when the Queue.Bind method has completed. At this point we will set the prefetch count for the channel.

Parameters:
  • _frame (pika.frame.Method) – The Queue.BindOk response frame

  • userdata (str|unicode) – Extra user data (queue name)

on_basic_qos_ok(_frame: pika.frame.Method) None

Invoked by pika when the Basic.QoS method has completed. At this point we will start consuming messages.

A callback is added that will be invoked if RabbitMQ cancels the consumer for some reason. If RabbitMQ does cancel the consumer, on_consumer_cancelled will be invoked by pika.

Parameters:

_frame (pika.frame.Method) – The Basic.QosOk response frame

on_consumer_cancelled(method_frame: pika.frame.Method) None

Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer receiving messages.

Parameters:

method_frame (pika.frame.Method) – The Basic.Cancel frame

on_message_returned(channel: pika.channel.Channel, method: pika.spec.Basic.Return, properties: pika.spec.BasicProperties, body: bytes) None

Called when a message has been rejected by the server.

callable callback: The function to call, having the signature callback(channel, method, properties, body)

Parameters:
  • channel – pika.Channel

  • method – pika.spec.Basic.Return

  • properties – pika.spec.BasicProperties

  • body – bytes

send_message(payload: Dict[str, Any], expiry: Optional[int] = None, priority: Optional[int] = None, encoding: str = 'json', encrypted: bool = False) None

Send a message over the RabbitMQ transport

TODO: Consider alternative handling when the channel is closed.
    also refer to the notes in the on_channel_closed method.
    - Wait and retry on a new channel?
    - setup a retry queue?
    - should there be a high water mark for the number of retries?
    - should new messages not be consumed until the channel is re-established and retry queue drained?
Parameters:
  • payload (Dict[str, Any]) – The message contents

  • expiry – The message expiry in milliseconds

  • priority – The message priority

  • encoding – The encoding of the message

  • encrypted – True if the message is to be encrypted

classmethod acknowledge_service_message(channel: pika.channel.Channel, delivery_tag: int, action: Literal[ack, nack, reject], redelivered: bool) None

Acknowledgement of a service message

In NuroPb, acknowledgements of service message requests have three possible outcomes:

  • ack: Successfully processed, acknowledged and removed from the queue

  • nack: A recoverable error occurs, the message is not acknowledged and requeued

  • reject: An unrecoverable error occurs, the message is not acknowledged and dropped

To prevent accidental use of the redelivered parameter and to ensure system predictability on the Call Again feature, messages are only allowed to be redelivered once and only once. To this end, all messages that have redelivered == True will be rejected. If redelivered is overridden with None, it is assumed to be True.

Parameters:
  • channel (pika.channel.Channel) – The channel object

  • delivery_tag (int) – The delivery tag

  • action (str) – The action to take, one of ack, nack or reject

  • redelivered (bool) – True if the message is being requeued / replayed.
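The three outcomes and the redelivery rule can be sketched against a recording stand-in for the channel. This is one reading of the rules above, in which a nack on an already redelivered message is downgraded to a reject; the library's actual implementation may differ. AckRecordingChannel and acknowledge are hypothetical names, while basic_ack, basic_nack and basic_reject mirror methods that exist on a pika channel.

```python
from typing import List, Optional, Tuple


class AckRecordingChannel:
    """Hypothetical stand-in for a pika channel; it records acknowledgement calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, int, Optional[bool]]] = []

    def basic_ack(self, delivery_tag: int) -> None:
        self.calls.append(("basic_ack", delivery_tag, None))

    def basic_nack(self, delivery_tag: int, requeue: bool) -> None:
        self.calls.append(("basic_nack", delivery_tag, requeue))

    def basic_reject(self, delivery_tag: int, requeue: bool) -> None:
        self.calls.append(("basic_reject", delivery_tag, requeue))


def acknowledge(
    channel: AckRecordingChannel,
    delivery_tag: int,
    action: str,
    redelivered: Optional[bool],
) -> None:
    if action == "ack":
        channel.basic_ack(delivery_tag=delivery_tag)
    elif action == "nack":
        # None is treated as True, so an unknown history is never requeued.
        if redelivered is None or redelivered:
            channel.basic_reject(delivery_tag=delivery_tag, requeue=False)
        else:
            channel.basic_nack(delivery_tag=delivery_tag, requeue=True)
    elif action == "reject":
        channel.basic_reject(delivery_tag=delivery_tag, requeue=False)
    else:
        raise ValueError(f"Invalid action: {action!r}")
```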

classmethod metadata_metrics(metadata: Dict[str, Any]) None

Invoked by the transport after a service message has been processed.

NOTE - METADATA: keep this metadata in sync across all these methods:

  • on_service_message, on_service_message_complete

  • on_response_message, on_response_message_complete

  • metadata_metrics

Parameters:

metadata – information to drive message processing metrics

Returns:

None

on_service_message_complete(channel: pika.channel.Channel, basic_deliver: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, private_metadata: Dict[str, Any], response_messages: List[nuropb.interface.TransportRespondPayload], acknowledgement: nuropb.interface.AcknowledgeAction) None

Invoked by the implementation after a service message has been processed.

This is provided to the implementation as a helper function to complete the message flow. The message flow state parameters: channel, delivery_tag, reply_to and private_metadata are hidden from the implementation through the use of functools.partial. The interface of this function as it appears to the implementation is:

  • response_messages: List[TransportRespondPayload]

  • acknowledgement: Literal[“ack”, “nack”, “reject”]

NOTE: The acknowledgement references the incoming service message that resulted in these responses
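How functools.partial hides the flow state can be shown with a simplified stand-in. on_complete below is hypothetical: it takes the four state parameters named in the text (not the full documented signature) and simply returns what it would act on, and all bound values are placeholders.

```python
from functools import partial
from typing import Any, Dict, List


def on_complete(
    channel: str,
    delivery_tag: int,
    reply_to: str,
    private_metadata: Dict[str, Any],
    response_messages: List[Dict[str, Any]],
    acknowledgement: str,
) -> Dict[str, Any]:
    """Hypothetical completion helper; returns the state it would act on."""
    return {
        "channel": channel,
        "delivery_tag": delivery_tag,
        "reply_to": reply_to,
        "private_metadata": private_metadata,
        "response_messages": response_messages,
        "acknowledgement": acknowledgement,
    }


# Transport side: bind the per-message flow state up front.
message_complete = partial(
    on_complete, "channel-1", 42, "example-response-queue", {"start": 0.0}
)

# Implementation side: only the two remaining arguments are visible.
outcome = message_complete([{"result": "ok"}], "ack")
```

Binding the state positionally keeps the implementation from needing to know about channels or delivery tags, while the acknowledgement is still sent for the right message.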

Parameters:
  • channel

  • basic_deliver (pika.spec.Basic.Deliver) – basic_deliver method

  • properties (pika.spec.BasicProperties) – properties

  • private_metadata – information to drive message processing metrics

  • response_messages – List[TransportRespondPayload]

  • acknowledgement

Returns:

on_service_message(queue_name: str, channel: pika.channel.Channel, basic_deliver: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes) None

Invoked when a message is delivered to the service_queue.

DESIGN PARAMETERS:

  • Incoming service message handling (this method) requires a deliberate acknowledgement.

    • Acknowledgements must be sent on the same channel and with the same delivery tag

  • Incoming response messages (on_response_message) are automatically acknowledged

  • Not all service messages require a response, e.g. events and commands

  • Transport must remain agnostic to the handling of the service message

OTHER DESIGN CONSIDERATIONS:

  • Connections to the RabbitMQ broker should be authenticated and encrypted using TLS

  • Payload data not used for NuroPb message handling should be separately encrypted, especially when it contains sensitive data, e.g. PI or PII.

FLOW DESIGN:

  1. Python json->dict compatible message is received from the service request (rpc) queue

  2. message is decoded into a python typed dictionary: TransportServicePayload

  3. this message is passed to the message_callback which has no return value, but may raise any type of exception.

    • message_callback is provided by the implementation and supplied on transport instantiation

    • message_callback is responsible for handling all exceptions, acknowledgements and appropriate responses

    • a helper function is provided that will handle message acknowledgement and any responses. the function has all the required state to do so. See note 4 below.

  4. State required to be preserved until the message is acknowledged:

    • channel + delivery_tag

    • reply_to

    • correlation_id

    • trace_id

    • incoming decoded message

  5. This state can only exist in-memory as it’s not all “picklable” and bound to the current open channel

  6. If an exception is raised, then the message is immediately rejected and dropped.

(transport) -> on_service_message
  -> (transport) message_callback(TransportServicePayload, MessageCompleteFunction, Dict[str, Any])
  -> (api) message_complete_callback(List[TransportRespondPayload], AcknowledgeAction)
  -> (transport) acknowledge_service_message(channel, delivery_tag, action)
  -> (transport) send_response_messages(reply_to, response_messages)
  -> (transport) metadata_metrics(metadata)
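From the implementation's side, the flow above might look like the following sketch. The payload keys "a" and "b", the record helper and the MessageComplete alias are all hypothetical; the point is only that the callback returns nothing, handles its own exceptions, and reports the outcome through the completion function it was given.

```python
from typing import Any, Callable, Dict, List, Tuple

# Simplified stand-in for the completion function type:
# (response_messages, acknowledgement) -> None
MessageComplete = Callable[[List[Dict[str, Any]], str], None]


def message_callback(
    message: Dict[str, Any],
    message_complete: MessageComplete,
    metadata: Dict[str, Any],
) -> None:
    """Hypothetical implementation-side callback following the flow above."""
    try:
        result = message["a"] + message["b"]  # placeholder service logic
    except Exception:
        # Treated as unrecoverable in this sketch: drop the message and
        # send no responses.
        message_complete([], "reject")
        return
    message_complete([{"result": result}], "ack")


completed: List[Tuple[List[Dict[str, Any]], str]] = []


def record(responses: List[Dict[str, Any]], acknowledgement: str) -> None:
    """Stand-in for the transport's completion helper."""
    completed.append((responses, acknowledgement))


message_callback({"a": 1, "b": 2}, record, {})
message_callback({"a": 1}, record, {})  # missing key triggers the reject path
```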

TODO: Think about excessive errors and decide on a strategy for shutting down the service instance. Should this take place in the transport layer or the API?

- What happens to the result if the channel is closed?

- What happens if the request is resent? Can we leverage the existing result?

- What happens if the request is resent to another service worker?

- What happens to the request sender waiting for a response?

NOTES:

  • Call Again: When a message is nack’d and requeued, there is currently no way to track how many times this may have occurred for the same message. To ensure stability and predictable behaviour, and to limit the abuse of patterns, the use of NuropbCallAgain is limited to once and only once. This is enforced at the transport layer. For RabbitMQ, if the incoming message is a requeued message, basic_deliver.redelivered is True. Call Again requests for redelivered messages will be rejected.

Parameters:
  • queue_name (str) – The name of the queue that the message was received on

  • channel (pika.channel.Channel) – The channel object

  • basic_deliver (pika.spec.Basic.Deliver) – basic_deliver method

  • properties (pika.spec.BasicProperties) – properties

  • body (bytes) – The message body

on_response_message_complete(channel: pika.channel.Channel, basic_deliver: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, private_metadata: Dict[str, Any], response_messages: List[nuropb.interface.TransportRespondPayload], acknowledgement: nuropb.interface.AcknowledgeAction) None

Invoked by the implementation after a service message has been processed.

This is provided to the implementation as a helper function to complete the message flow. The message flow state parameters: channel, delivery_tag, reply_to and private_metadata are hidden from the implementation through the use of functools.partial. The interface of this function as it appears to the implementation is:

  • response_messages: List[TransportRespondPayload]

  • acknowledgement: Literal[“ack”, “nack”, “reject”]

NOTE: The acknowledgement references the incoming service message that resulted in these responses

Parameters:
  • channel

  • basic_deliver (pika.spec.Basic.Deliver) – basic_deliver method

  • properties (pika.spec.BasicProperties) – properties

  • private_metadata – information to drive message processing metrics

  • response_messages – List[TransportRespondPayload]

  • acknowledgement

Returns:

on_response_message(_queue_name: str, channel: pika.channel.Channel, basic_deliver: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes) None

Invoked when a message is delivered to the response_queue. The channel is passed for your convenience. The basic_deliver object that is passed in carries the exchange, routing key, delivery tag and a redelivered flag for the message. The properties passed in is an instance of BasicProperties with the message properties and the body is the message that was sent.

Parameters:
  • _queue_name (str) – The name of the queue that the message was received on

  • channel (pika.channel.Channel) – The channel object

  • basic_deliver (pika.spec.Basic.Deliver) – basic_deliver

  • properties (pika.spec.BasicProperties) – properties

  • body (bytes) – The message body

async stop_consuming() None

Tell RabbitMQ that you would like to stop consuming by sending the Basic.Cancel RPC command.

close_channel() None

Call to close the channel with RabbitMQ cleanly by issuing the Channel.Close RPC command.