nuropb.rmq_lib
RabbitMQ Utility library for NuroPb
Module Contents
Functions
Creates an AMQP URL for connecting to RabbitMQ |
|
Creates an HTTP URL for connecting to RabbitMQ management API |
|
Creates an HTTP URL for connecting to RabbitMQ management API from an AMQP URL |
|
Returns the client connection properties for the transport |
|
Return the connection parameters for the transport |
|
Creates a requests session for connecting to RabbitMQ management API |
|
Useful for initialisation of queues / exchanges. |
|
Configure the RabbitMQ broker for this transport. |
|
nack_message: nack the message and requeue it, there was likely a recoverable problem with this instance while processing the message |
|
reject_message: If the message is not a request, then reject the message and move on |
|
ack_message: ack the message |
|
Creates a virtual host on the RabbitMQ server using the REST API |
|
Creates a virtual host on the RabbitMQ server using the REST API |
|
Creates a virtual host on the RabbitMQ server using the REST API |
|
Deletes a virtual host on the RabbitMQ server using the REST API |
Data
API
- nuropb.rmq_lib.logger
None
- nuropb.rmq_lib.build_amqp_url(host: str, port: str | int, username: str, password: str, vhost: str, scheme: str = 'amqp') str
Creates an AMQP URL for connecting to RabbitMQ
- nuropb.rmq_lib.build_rmq_api_url(scheme: str, host: str, port: str | int, username: str | None, password: str | None) str
Creates an HTTP URL for connecting to RabbitMQ management API
- nuropb.rmq_lib.rmq_api_url_from_amqp_url(amqp_url: str, scheme: Optional[str] = None, port: Optional[int | str] = None) str
Creates an HTTP URL for connecting to RabbitMQ management API from an AMQP URL
- Parameters:
amqp_url – the AMQP URL to use
scheme – the scheme to use, defaults to http
port – the port to use, defaults to 15672
- Returns:
the RabbitMQ management API URL
- nuropb.rmq_lib.get_client_connection_properties(name: Optional[str] = None, instance_id: Optional[str] = None, client_only: Optional[bool] = None) Dict[str, str]
Returns the client connection properties for the transport
- nuropb.rmq_lib.get_connection_parameters(amqp_url: str | Dict[str, Any], name: Optional[str] = None, instance_id: Optional[str] = None, client_only: Optional[bool] = None, **overrides: Any) pika.ConnectionParameters | pika.URLParameters
Return the connection parameters for the transport
- Parameters:
amqp_url – the AMQP URL or connection parameters to use
name – the name of the service or client
instance_id – the instance id of the service or client
client_only –
overrides – additional keyword arguments to override the connection parameters
- nuropb.rmq_lib.management_api_session_info(scheme: str, host: str, port: str | int, username: Optional[str] = None, password: Optional[str] = None, bearer_token: Optional[str] = None, verify: bool = False, **headers: Any) Dict[str, Any]
Creates a requests session for connecting to RabbitMQ management API
- Parameters:
scheme – http or https
host – the host name or ip address of the RabbitMQ server
port – the port number of the RabbitMQ server
username – the username to use for authentication
password – the password to use for authentication
bearer_token – the bearer token to use for authentication
verify – whether to verify the SSL certificate
- Returns:
a requests session
- nuropb.rmq_lib.blocking_rabbitmq_channel(rmq_url: str | Dict[str, Any]) pika.channel.Channel
Useful for initialisation of queues / exchanges.
- nuropb.rmq_lib.configure_nuropb_rmq(rmq_url: str | Dict[str, Any], events_exchange: str, rpc_exchange: str, dl_exchange: str, dl_queue: str, **kwargs: Any) bool
Configure the RabbitMQ broker for this transport.
Calls to this function are IDEMPOTENT. However, previously named exchanges, queues, and declared bindings are not be removed. These will have to be done manually as part of broker housekeeping. This is to prevent accidental removal of queues and exchanges. It is safe to call this function multiple times and while other services are running, as it will not re-declare exchanges, queues, or bindings that already exist.
PRODUCTION AUTHORISATION NOTE: The RabbitMQ user used to connect to the broker must have the following permissions:
configure: .*
write: .*
read: .*
access to the vhost Client only applications and services should not have configuration permissions. For completeness, there may be specific implementation need to for a client only services to register service queue bindings, for example a client only service that is also a gateway or proxy service. In this case, the treating it as a service is the correct approach.
Settings for Exchange and default dead letter configuration apply to all services that use the same RabbitMQ broker. The rpc and event bindings are exclusive to the service, and are not shared with other services.
The response queues are not durable, and are auto-deleted when the connections close. This approach is taken as response queues are only used for RPC responses, and there is no need to keep and have to handle stale responses.
There is experimental work underway using etcd to manage the runtime configuration of a service leader and service followers. This will allow for persistent response queues and other configuration settings to be shared across multiple instances of the same service. This is not yet ready for production use. Experimentation scope:
service leader election
named instances of a service each with their own persistent response queue
Notification and handling of dead letter messages relating to a service
- Parameters:
rmq_url (str) – The URL of the RabbitMQ broker
events_exchange (str) – The name of the events exchange
rpc_exchange (str) – The name of the RPC exchange
dl_exchange (str) – The name of the dead letter exchange
dl_queue (str) – The name of the dead letter queue
kwargs –
Additional keyword argument overflow from the transport settings.
client_only: bool - True if this is a client only service, False otherwise
- Returns:
True if the RabbitMQ broker was configured successfully
- nuropb.rmq_lib.nack_message(channel: pika.channel.Channel, delivery_tag: int, properties: pika.spec.BasicProperties, mesg: nuropb.interface.PayloadDict | None, error: Exception | None = None) None
nack_message: nack the message and requeue it, there was likely a recoverable problem with this instance while processing the message
- nuropb.rmq_lib.reject_message(channel: pika.channel.Channel, delivery_tag: int, properties: pika.spec.BasicProperties, mesg: nuropb.interface.PayloadDict | None, error: Exception | None = None) None
reject_message: If the message is not a request, then reject the message and move on
- nuropb.rmq_lib.ack_message(channel: pika.channel.Channel, delivery_tag: int, properties: pika.spec.BasicProperties, mesg: nuropb.interface.PayloadDict | None, error: Exception | None = None) None
ack_message: ack the message
- nuropb.rmq_lib.get_virtual_host_queues(api_url: str, vhost_url: str) Any | None
Creates a virtual host on the RabbitMQ server using the REST API
- Parameters:
api_url – the url to the RabbitMQ API
vhost_url – the virtual host to create
- Returns:
None
- nuropb.rmq_lib.get_virtual_hosts(api_url: str, vhost_url: str | Dict[str, Any]) Any | None
Creates a virtual host on the RabbitMQ server using the REST API
- Parameters:
api_url – the url to the RabbitMQ API
vhost_url – the virtual host to create
- Returns:
None
- nuropb.rmq_lib.create_virtual_host(api_url: str, vhost_url: str | Dict[str, Any]) None
Creates a virtual host on the RabbitMQ server using the REST API
- Parameters:
api_url – the url to the RabbitMQ API
vhost_url – the virtual host to create
- Returns:
None
- nuropb.rmq_lib.delete_virtual_host(api_url: str, vhost_url: str | Dict[str, Any]) None
Deletes a virtual host on the RabbitMQ server using the REST API
- Parameters:
api_url – the url to the RabbitMQ API
vhost_url – the virtual host to delete
- Returns:
None