nuropb.rmq_transport
Module Contents
Classes
RMQTransport is the base class for the RabbitMQ transport. It wraps the NuroPb service mesh patterns and rules over the AMQP protocol. |
Functions
Map the incoming RabbitMQ message to python compatible dictionary as defined by ServicePayloadDict |
Data
The wait when shutting down consumers before closing the connection |
|
API
- class nuropb.rmq_transport.RabbitMQConfiguration
Bases:
typing.TypedDict- rpc_exchange: str
None
- events_exchange: str
None
- dl_exchange: str
None
- dl_queue: str
None
- service_queue: str
None
- response_queue: str
None
- rpc_bindings: List[str]
None
- event_bindings: List[str]
None
- default_ttl: int
None
- client_only: bool
None
- nuropb.rmq_transport.logger
None
- nuropb.rmq_transport.CONSUMER_CLOSED_WAIT_TIMEOUT
10
The wait when shutting down consumers before closing the connection
- nuropb.rmq_transport._verbose
False
- nuropb.rmq_transport.verbose() bool
- nuropb.rmq_transport.decode_rmq_body(method: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes) nuropb.interface.TransportServicePayload
Map the incoming RabbitMQ message to python compatible dictionary as defined by ServicePayloadDict
- exception nuropb.rmq_transport.ServiceNotConfigured
Bases:
ExceptionRaised when a service is not properly configured on the RabbitMQ broker. the leader will be expected to configure the Exchange and service queues
Initialization
Initialize self. See help(type(self)) for accurate signature.
- class nuropb.rmq_transport.RMQTransport(service_name: str, instance_id: str, amqp_url: str | Dict[str, Any], message_callback: nuropb.interface.MessageCallbackFunction, default_ttl: Optional[int] = None, client_only: Optional[bool] = None, encryptor: Optional[nuropb.encodings.encryption.Encryptor] = None, **kwargs: Any)
RMQTransport is the base class for the RabbitMQ transport. It wraps the NuroPb service mesh patterns and rules over the AMQP protocol.
When RabbitMQ closes the connection, this class will stop and alert that reconnection is necessary, in some cases re-connection takes place automatically. Disconnections should be continuously monitored, there are various reasons why a connection or channel may be closed after being successfully opened, and usually related to authentication, permissions, protocol violation or networking.
TODO: Configure the Pika client connection attributes in the pika client properties.
Initialization
Create a new instance of the consumer class, passing in the AMQP URL used to connect to RabbitMQ.
- Parameters:
service_name (str) – The name of the service
instance_id (str) – The instance id of the service
amqp_url (str) –
The AMQP url to connect string or TLS connection details
cafile: str | None
certfile: str | None
keyfile: str | None
verify: bool (default True)
hostname: str
port: int
username: str
password: str
message_callback (MessageCallbackFunction) – The callback to call when a message is received
default_ttl (int) – The default time to live for messages in milliseconds, defaults to 12 hours.
client_only (bool) –
encryptor (Encryptor) – The encryptor to use for encrypting and decrypting messages
kwargs –
Mostly transport configuration options
str rpc_exchange: The name of the RPC exchange
str events_exchange: The name of the events exchange
str dl_exchange: The name of the dead letter exchange
str dl_queue: The name of the dead letter queue
str service_queue: The name of the requests queue
str response_queue: The name of the responses queue
List[str] rpc_bindings: The list of RPC bindings
List[str] event_bindings: The list of events bindings
int prefetch_count: The number of messages to prefetch defaults to 1, unlimited is 0. Experiment with larger values for higher throughput in your user case.
When an existing transport is initialised and connected, and a subsequent transport instance is connected with the same service_name and instance_id as the first, the broker will shut down the channel of subsequent instances when they attempt to configure their response queue. This is because the response queue is opened in exclusive mode. The exclusive mode is used to ensure that only one consumer (nuropb api connection) is consuming from the response queue.
Deliberately specifying a fixed instance_id, is a valid mechanism to ensure that a service can only run in single instance mode. This is useful for services that are not designed to be run in a distributed manner or where there is specific service configuration required.
- _service_name: str
None
- _instance_id: str
None
- _amqp_url: str | Dict[str, Any]
None
- _rpc_exchange: str
None
- _events_exchange: str
None
- _dl_exchange: str
None
- _dl_queue: str
None
- _service_queue: str
None
- _response_queue: str
None
- _rpc_bindings: Set[str]
None
- _event_bindings: Set[str]
None
- _prefetch_count: int
None
- _default_ttl: int
None
- _client_only: bool
None
- _message_callback: nuropb.interface.MessageCallbackFunction
None
- _encryptor: nuropb.encodings.encryption.Encryptor | None
None
- _connected_future: Any
None
- _disconnected_future: Any
None
- _is_leader: bool
None
- _is_rabbitmq_configured: bool
None
- _connection: pika.adapters.asyncio_connection.AsyncioConnection | None
None
- _channel: pika.channel.Channel | None
None
- _consumer_tags: Set[Any]
None
- _consuming: bool
None
- _connecting: bool
None
- _closing: bool
None
- _connected: bool
None
- _was_consuming: bool
None
- property service_name: str
- property instance_id: str
- property amqp_url: str | Dict[str, Any]
- property is_leader: bool
- property connected: bool
connected: returns the connection status of the underlying transport
- Returns:
bool
- property rpc_exchange: str
rpc_exchange: returns the name of the RPC exchange
- Returns:
str
- property events_exchange: str
events_exchange: returns the name of the events exchange
- Returns:
str
- property response_queue: str
response_queue: returns the name of the response queue
- Returns:
str
- property rmq_configuration: nuropb.rmq_transport.RabbitMQConfiguration
rmq_configuration: returns the RabbitMQ configuration
- Returns:
Dict[str, Any]
- configure_rabbitmq(rmq_configuration: Optional[nuropb.rmq_transport.RabbitMQConfiguration] = None, amqp_url: Optional[str | Dict[str, Any]] = None, rmq_api_url: Optional[str] = None) None
configure_rabbitmq: configure the RabbitMQ transport with the provided configuration
if rmq_configuration is None, then the transport will be configured with the configuration provided during the transport’s init().
if the virtual host in the build_amqp_url is not configured, then it will be created.
- Parameters:
rmq_configuration – RabbitMQConfiguration
amqp_url – Optional[str] if not provided then self._amqp_url is used
rmq_api_url – Optional[str] if not provided then it is created amqp_url
- Returns:
None
- async start() None
Start the transport by connecting to RabbitMQ
- async stop() None
Cleanly shutdown the connection to RabbitMQ by stopping the consumer with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok will be invoked by pika, which will then closing the channel and connection. The IOLoop is started again because this method is invoked when CTRL-C is pressed raising a KeyboardInterrupt exception. This exception stops the IOLoop which needs to be running for pika to communicate with RabbitMQ. All commands issued prior to starting the IOLoop will be buffered but not processed.
- connect() asyncio.Future[bool]
This method initiates a connection to RabbitMQ, returning the connection handle. When the connection is established, the on_connection_open method will be invoked by pika.
When the connection and channel is successfully opened, the incoming messages will automatically be handled by _handle_message()
- Return type:
asyncio.Future
- disconnect() Awaitable[bool]
This method closes the connection to RabbitMQ. the pika library events will drive the closing and reconnection process.
- Returns:
asyncio.Future
- on_connection_open(_connection: pika.adapters.asyncio_connection.AsyncioConnection) None
This method is called by pika once the connection to RabbitMQ has been established. It passes the handle to the connection object in case we need it, but in this case, we’ll just mark it unused.
- Parameters:
_connection (pika.adapters.asyncio_connection.AsyncioConnection) – The connection
- on_connection_open_error(conn: pika.adapters.asyncio_connection.AsyncioConnection, reason: Exception) None
This method is called by pika if the connection to RabbitMQ can’t be established.
- Parameters:
conn (pika.adapters.asyncio_connection.AsyncioConnection) –
reason (Exception) – The error
- on_connection_closed(_connection: pika.adapters.asyncio_connection.AsyncioConnection, reason: Exception) None
This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects.
- Parameters:
_connection (pika.connection.Connection) – The closed connection obj
reason (Exception) – exception representing reason for loss of connection.
- open_channel() None
Open a new channel with RabbitMQ by issuing the Channel.Open RPC command. When RabbitMQ responds that the channel is open, the on_channel_open callback will be invoked by pika.
- on_channel_open(channel: pika.channel.Channel) None
This method is invoked by pika when the channel has been opened. The channel object is passed in so that we can make use of it.
- Parameters:
channel (pika.channel.Channel) – The channel object
- on_channel_closed(channel: pika.channel.Channel, reason: Exception) None
Invoked by pika when the channel is closed. Channels are at times close by the broker for various reasons, the most common being protocol violations e.g. acknowledging messages using an invalid message_tag. In most cases when the channel is closed by the broker, nuropb will automatically open a new channel and re-declare the service queue.
In the following cases the channel is not automatically re-opened: * When the connection is closed by this transport API * When the connection is close by the broker 403 (ACCESS_REFUSED): Typically these examples are seen: - “Provided JWT token has expired at timestamp”. In this case the transport will require a fresh JWT token before attempting to reconnect. - queue ‘XXXXXXX’ in vhost ‘YYYYY’ in exclusive use. In this case there is another response queue setup with the same name. Having a fixed response queue name is a valid mechanism to enforce a single instance of a service. * When the connection is close by the broker 403 (NOT_FOUND): Typically where there is an exchange configuration issue.
Always investigate the reasons why a channel is closed and introduce logic to handle that scenario accordingly. It is important to continuously monitor for this condition.
TODO: When there is message processing in flight, and particularly with a prefetch count > 1, then those messages are now not able to be acknowledged. By doing so will result in a forced channel closure by the broker and potentially a poison pill type scenario.
- Parameters:
channel (pika.channel.Channel) – The closed channel
reason (Exception) – why the channel was closed
- declare_service_queue(frame: pika.frame.Method) None
Refresh the request queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is complete, the on_service_queue_declareok method will be invoked by pika.
This call is idempotent and will not fail if the queue already exists.
- on_service_queue_declareok(frame: pika.frame.Method, _userdata: str) None
- declare_response_queue() None
Set up the response queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is complete, the on_response_queue_declareok method will be invoked by pika.
- on_response_queue_declareok(frame: pika.frame.Method, _userdata: str) None
Method invoked by pika when the Queue.Declare RPC call made in setup_response_queue has completed. In this method we will bind request queue and the response queues. When this command is complete, the on_bindok method will be invoked by pika.
No explicit binds required for the response queue as it relies on the default exchange and routing key to the name of the queue.
- Parameters:
frame (pika.frame.Method) – The Queue.DeclareOk frame
_userdata (str|unicode) – Extra user data (queue name)
- on_bindok(_frame: pika.frame.Method, userdata: str) None
Invoked by pika when the Queue.Bind method has completed. At this point we will set the prefetch count for the channel.
- Parameters:
_frame (pika.frame.Method) – The Queue.BindOk response frame
userdata (str|unicode) – Extra user data (queue name)
- on_basic_qos_ok(_frame: pika.frame.Method) None
Invoked by pika when the Basic.QoS method has completed. At this point we will start consuming messages.
A callback is added that will be invoked if RabbitMQ cancels the consumer for some reason. If RabbitMQ does cancel the consumer, on_consumer_cancelled will be invoked by pika.
- Parameters:
_frame (pika.frame.Method) – The Basic.QosOk response frame
- on_consumer_cancelled(method_frame: pika.frame.Method) None
Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer receiving messages.
- Parameters:
method_frame (pika.frame.Method) – The Basic.Cancel frame
- on_message_returned(channel: pika.channel.Channel, method: pika.spec.Basic.Return, properties: pika.spec.BasicProperties, body: bytes) None
Called when message has been rejected by the server.
callable callback: The function to call, having the signature callback(channel, method, properties, body)
- Parameters:
channel – pika.Channel
method – pika.spec.Basic.Deliver
properties – pika.spec.BasicProperties
body – bytes
- send_message(payload: Dict[str, Any], expiry: Optional[int] = None, priority: Optional[int] = None, encoding: str = 'json', encrypted: bool = False) None
Send a message to over the RabbitMQ Transport
TODO: Consider alternative handling when the channel is closed. also refer to the notes in the on_channel_closed method. - Wait and retry on a new channel? - setup a retry queue? - should there be a high water mark for the number of retries? - should new messages not be consumed until the channel is re-established and retry queue drained?- Parameters:
payload (Dict[str, Any]) – The message contents
expiry – The message expiry in milliseconds
priority – The message priority
encoding – The encoding of the message
encrypted – True if the message is to be encrypted
- classmethod acknowledge_service_message(channel: pika.channel.Channel, delivery_tag: int, action: Literal[ack, nack, reject], redelivered: bool) None
Acknowledgement of a service message
In NuroPb, acknowledgements of service message requests have three possible outcomes:
ack: Successfully processed, acknowledged and removed from the queue
nack: A recoverable error occurs, the message is not acknowledged and requeued
reject: An unrecoverable error occurs, the message is not acknowledged and dropped
To prevent accidental use of the redelivered parameter and to ensure system predictability on the Call Again feature, messages are only allowed to be redelivered once and only once. To this end all messages that have redelivered == True will be rejected. if redelivered is overridden with None, it is assumed True.
- Parameters:
channel (pika.channel.Channel) – The channel object
delivery_tag (int) – The delivery tag
action (str) – The action to take, one of ack, nack or reject
redelivered (bool) – True if the message is being requeued / replayed.
- classmethod metadata_metrics(metadata: Dict[str, Any]) None
Invoked by the transport after a service message has been processed.
NOTE - METADATA: keep this metadata in sync with across all these methods: - on_service_message, on_service_message_complete - on_response_message, on_response_message_complete - metadata_metrics
- Parameters:
metadata – information to drive message processing metrics
- Returns:
None
- on_service_message_complete(channel: pika.channel.Channel, basic_deliver: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, private_metadata: Dict[str, Any], response_messages: List[nuropb.interface.TransportRespondPayload], acknowledgement: nuropb.interface.AcknowledgeAction) None
Invoked by the implementation after a service message has been processed.
This is provided to the implementation as a helper function to complete the message flow. The message flow state parameters: channel, delivery_tag, reply_to and private_metadata are hidden from the implementation through the use of functools.partial. The interface of this function as it appears to the implementation is: response_messages: List[TransportRespondPayload] acknowledgement: Literal[“ack”, “nack”, “reject”]
NOTE: The acknowledgement references the incoming service message that resulted in these responses
- Parameters:
channel –
basic_deliver (pika.spec.Basic.Deliver) – basic_deliver method
properties (pika.spec.BasicProperties) – properties
private_metadata – information to drive message processing metrics
response_messages – List[TransportRespondPayload]
acknowledgement –
- Returns:
- on_service_message(queue_name: str, channel: pika.channel.Channel, basic_deliver: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes) None
Invoked when a message is delivered to the service_queue.
DESIGN PARAMETERS: - Incoming service messages handling (this) require a deliberate acknowledgement. * Acknowledgements must be sent on the same channel and with the same delivery tag - Incoming response messages (on_response_message) are automatically acknowledged - Not all service messages require a response, e.g. events and commands - Transport must remain agnostic to the handling of the service message
OTHER DESIGN CONSIDERATIONS: - Connections to the RabbitMQ broker should be authenticated, and encrypted using TLS - Payload data not used for NuroPb message handling should be separately encrypted, and specially when containing sensitive data e.g. PI or PII .
FLOW DESIGN:
Python json->dict compatible message is received from the service request (rpc) queue
message is decoded into a python typed dictionary: TransportServicePayload
this message is passed to the message_callback which has no return value, but may raise any type of exception.
message_callback is provided by the implementation and supplied on transport instantiation
message_callback is responsible for handling all exceptions, acknowledgements and appropriate responses
a helper function is provided that will handle message acknowledgement and any responses. the function has all the required state to do so. See note 4 below.
State required to be preserved until the message is acknowledged:
channel + delivery_tag
reply_to
correlation_id
trace_id
incoming decoded message
This state can only exist in-memory as it’s not all “picklable” and bound to the current open channel
If an exception is raised, then the message is immediately rejected and dropped.
(transport) -> on_service_message -> (transport) message_callback(TransportServicePayload, MessageCompleteFunction, Dict[str, Any]) -> (api) message_complete_callback(List[TransportRespondPayload], AcknowledgeAction) -> (transport) acknowledge_service_message(channel, delivery_tag, action) -> (transport) send_response_messages(reply_to, response_messages) -> (transport) metadata_metrics(metadata)
TODO: Needing to think about excessive errors and decide on a strategy for for shutting down
the service instance. Should this take place in the transport layer or the API ?
- what happens to the result if the channel is closed?
- What happens if the request is resent, can we leverage the existing result
- what happens if the request is resent to another service worker?
- what happens to the request sender waiting for a response?
NOTES:
Call Again: When a message is nack’d and requeued, there is no current way to track how many times this may have occurred for the same message. To ensure stability, behaviour predictability and to limit the abuse of patterns, the use of NuropbCallAgain is limited to once and only once. This is enforced at the transport layer. For RabbitMQ if the incoming message is a requeued message, the basic_deliver.redelivered is True. Call again for all redelivered messages will be rejected.
- Parameters:
queue_name (str) – The name of the queue that the message was received on
channel (pika.channel.Channel) – The channel object
basic_deliver (pika.spec.Basic.Deliver) – basic_deliver method
properties (pika.spec.BasicProperties) – properties
body (bytes) – The message body
- on_response_message_complete(channel: pika.channel.Channel, basic_deliver: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, private_metadata: Dict[str, Any], response_messages: List[nuropb.interface.TransportRespondPayload], acknowledgement: nuropb.interface.AcknowledgeAction) None
Invoked by the implementation after a service message has been processed.
This is provided to the implementation as a helper function to complete the message flow. The message flow state parameters: channel, delivery_tag, reply_to and private_metadata are hidden from the implementation through the use of functools.partial. The interface of this function as it appears to the implementation is: response_messages: List[TransportRespondPayload] acknowledgement: Literal[“ack”, “nack”, “reject”]
NOTE: The acknowledgement references the incoming service message that resulted in these responses
- Parameters:
channel –
basic_deliver (pika.spec.Basic.Deliver) – basic_deliver method
properties (pika.spec.BasicProperties) – properties
private_metadata – information to drive message processing metrics
response_messages – List[TransportRespondPayload]
acknowledgement –
- Returns:
- on_response_message(_queue_name: str, channel: pika.channel.Channel, basic_deliver: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes) None
Invoked when a message is delivered to the response_queue. The channel is passed for your convenience. The basic_deliver object that is passed in carries the exchange, routing key, delivery tag and a redelivered flag for the message. The properties passed in is an instance of BasicProperties with the message properties and the body is the message that was sent.
- Parameters:
_queue_name (str) – The name of the queue that the message was received on
channel (pika.channel.Channel) – The channel object
basic_deliver (pika.spec.Basic.Deliver) – basic_deliver
properties (pika.spec.BasicProperties) – properties
body (bytes) – The message body
- async stop_consuming() None
Tell RabbitMQ that you would like to stop consuming by sending the Basic.Cancel RPC command.
- close_channel() None
Call to close the channel with RabbitMQ cleanly by issuing the Channel.Close RPC command.