nuropb.rmq_lib

RabbitMQ Utility library for NuroPb

Module Contents

Functions

build_amqp_url

Creates an AMQP URL for connecting to RabbitMQ

build_rmq_api_url

Creates an HTTP URL for connecting to RabbitMQ management API

rmq_api_url_from_amqp_url

Creates an HTTP URL for connecting to RabbitMQ management API from an AMQP URL

get_client_connection_properties

Returns the client connection properties for the transport

get_connection_parameters

Return the connection parameters for the transport

management_api_session_info

Creates a requests session for connecting to RabbitMQ management API

blocking_rabbitmq_channel

Useful for initialisation of queues / exchanges.

configure_nuropb_rmq

Configure the RabbitMQ broker for this transport.

nack_message

nack_message: nack the message and requeue it, there was likely a recoverable problem with this instance while processing the message

reject_message

reject_message: If the message is not a request, then reject the message and move on

ack_message

ack_message: ack the message

get_virtual_host_queues

Creates a virtual host on the RabbitMQ server using the REST API

get_virtual_hosts

Creates a virtual host on the RabbitMQ server using the REST API

create_virtual_host

Creates a virtual host on the RabbitMQ server using the REST API

delete_virtual_host

Deletes a virtual host on the RabbitMQ server using the REST API

Data

logger

API

nuropb.rmq_lib.logger

None

nuropb.rmq_lib.build_amqp_url(host: str, port: str | int, username: str, password: str, vhost: str, scheme: str = 'amqp') str

Creates an AMQP URL for connecting to RabbitMQ

nuropb.rmq_lib.build_rmq_api_url(scheme: str, host: str, port: str | int, username: str | None, password: str | None) str

Creates an HTTP URL for connecting to RabbitMQ management API

nuropb.rmq_lib.rmq_api_url_from_amqp_url(amqp_url: str, scheme: Optional[str] = None, port: Optional[int | str] = None) str

Creates an HTTP URL for connecting to RabbitMQ management API from an AMQP URL

Parameters:
  • amqp_url – the AMQP URL to use

  • scheme – the scheme to use, defaults to http

  • port – the port to use, defaults to 15672

Returns:

the RabbitMQ management API URL

nuropb.rmq_lib.get_client_connection_properties(name: Optional[str] = None, instance_id: Optional[str] = None, client_only: Optional[bool] = None) Dict[str, str]

Returns the client connection properties for the transport

nuropb.rmq_lib.get_connection_parameters(amqp_url: str | Dict[str, Any], name: Optional[str] = None, instance_id: Optional[str] = None, client_only: Optional[bool] = None, **overrides: Any) pika.ConnectionParameters | pika.URLParameters

Return the connection parameters for the transport

Parameters:
  • amqp_url – the AMQP URL or connection parameters to use

  • name – the name of the service or client

  • instance_id – the instance id of the service or client

  • client_only

  • overrides – additional keyword arguments to override the connection parameters

nuropb.rmq_lib.management_api_session_info(scheme: str, host: str, port: str | int, username: Optional[str] = None, password: Optional[str] = None, bearer_token: Optional[str] = None, verify: bool = False, **headers: Any) Dict[str, Any]

Creates a requests session for connecting to RabbitMQ management API

Parameters:
  • scheme – http or https

  • host – the host name or ip address of the RabbitMQ server

  • port – the port number of the RabbitMQ server

  • username – the username to use for authentication

  • password – the password to use for authentication

  • bearer_token – the bearer token to use for authentication

  • verify – whether to verify the SSL certificate

Returns:

a requests session

nuropb.rmq_lib.blocking_rabbitmq_channel(rmq_url: str | Dict[str, Any]) pika.channel.Channel

Useful for initialisation of queues / exchanges.

nuropb.rmq_lib.configure_nuropb_rmq(rmq_url: str | Dict[str, Any], events_exchange: str, rpc_exchange: str, dl_exchange: str, dl_queue: str, **kwargs: Any) bool

Configure the RabbitMQ broker for this transport.

Calls to this function are IDEMPOTENT. However, previously named exchanges, queues, and declared bindings are not be removed. These will have to be done manually as part of broker housekeeping. This is to prevent accidental removal of queues and exchanges. It is safe to call this function multiple times and while other services are running, as it will not re-declare exchanges, queues, or bindings that already exist.

PRODUCTION AUTHORISATION NOTE: The RabbitMQ user used to connect to the broker must have the following permissions:

  • configure: .*

  • write: .*

  • read: .*

  • access to the vhost Client only applications and services should not have configuration permissions. For completeness, there may be specific implementation need to for a client only services to register service queue bindings, for example a client only service that is also a gateway or proxy service. In this case, the treating it as a service is the correct approach.

Settings for Exchange and default dead letter configuration apply to all services that use the same RabbitMQ broker. The rpc and event bindings are exclusive to the service, and are not shared with other services.

The response queues are not durable, and are auto-deleted when the connections close. This approach is taken as response queues are only used for RPC responses, and there is no need to keep and have to handle stale responses.

There is experimental work underway using etcd to manage the runtime configuration of a service leader and service followers. This will allow for persistent response queues and other configuration settings to be shared across multiple instances of the same service. This is not yet ready for production use. Experimentation scope:

  • service leader election

  • named instances of a service each with their own persistent response queue

  • Notification and handling of dead letter messages relating to a service

Parameters:
  • rmq_url (str) – The URL of the RabbitMQ broker

  • events_exchange (str) – The name of the events exchange

  • rpc_exchange (str) – The name of the RPC exchange

  • dl_exchange (str) – The name of the dead letter exchange

  • dl_queue (str) – The name of the dead letter queue

  • kwargs

    Additional keyword argument overflow from the transport settings.

    • client_only: bool - True if this is a client only service, False otherwise

Returns:

True if the RabbitMQ broker was configured successfully

nuropb.rmq_lib.nack_message(channel: pika.channel.Channel, delivery_tag: int, properties: pika.spec.BasicProperties, mesg: nuropb.interface.PayloadDict | None, error: Exception | None = None) None

nack_message: nack the message and requeue it, there was likely a recoverable problem with this instance while processing the message

nuropb.rmq_lib.reject_message(channel: pika.channel.Channel, delivery_tag: int, properties: pika.spec.BasicProperties, mesg: nuropb.interface.PayloadDict | None, error: Exception | None = None) None

reject_message: If the message is not a request, then reject the message and move on

nuropb.rmq_lib.ack_message(channel: pika.channel.Channel, delivery_tag: int, properties: pika.spec.BasicProperties, mesg: nuropb.interface.PayloadDict | None, error: Exception | None = None) None

ack_message: ack the message

nuropb.rmq_lib.get_virtual_host_queues(api_url: str, vhost_url: str) Any | None

Creates a virtual host on the RabbitMQ server using the REST API

Parameters:
  • api_url – the url to the RabbitMQ API

  • vhost_url – the virtual host to create

Returns:

None

nuropb.rmq_lib.get_virtual_hosts(api_url: str, vhost_url: str | Dict[str, Any]) Any | None

Creates a virtual host on the RabbitMQ server using the REST API

Parameters:
  • api_url – the url to the RabbitMQ API

  • vhost_url – the virtual host to create

Returns:

None

nuropb.rmq_lib.create_virtual_host(api_url: str, vhost_url: str | Dict[str, Any]) None

Creates a virtual host on the RabbitMQ server using the REST API

Parameters:
  • api_url – the url to the RabbitMQ API

  • vhost_url – the virtual host to create

Returns:

None

nuropb.rmq_lib.delete_virtual_host(api_url: str, vhost_url: str | Dict[str, Any]) None

Deletes a virtual host on the RabbitMQ server using the REST API

Parameters:
  • api_url – the url to the RabbitMQ API

  • vhost_url – the virtual host to delete

Returns:

None