nuropb.rmq_api

Module Contents

Classes

RMQAPI

The primary nuropb API.

Data

logger

verbose

API

nuropb.rmq_api.logger

None

nuropb.rmq_api.verbose

False

class nuropb.rmq_api.RMQAPI(amqp_url: str | Dict[str, Any], service_name: str | None = None, instance_id: str | None = None, service_instance: object | None = None, rpc_exchange: Optional[str] = None, events_exchange: Optional[str] = None, transport_settings: Optional[Dict[str, Any]] = None)

Bases: nuropb.interface.NuropbInterface

The primary nuropb API.

When an existing transport is initialised and connected, and a subsequent transport instance connects with the same service_name and instance_id as the first, the broker will shut down the channel of the subsequent instance when it attempts to configure its response queue. This is because the response queue is opened in exclusive mode. Exclusive mode ensures that only one consumer (nuropb api connection) is consuming from the response queue.

Deliberately specifying a fixed instance_id is a valid mechanism to ensure that a service can only run in single instance mode. This is useful for services that are not designed to run in a distributed manner, or where specific service configuration is required.
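The exclusive-queue behaviour described above can be illustrated with a toy in-memory model. This is only a sketch: ToyBroker, ChannelClosed and response_queue_name are hypothetical names invented for the illustration, and the queue naming scheme is an assumption, not the one nuropb or RabbitMQ actually uses.

```python
from typing import Dict


class ChannelClosed(Exception):
    """Raised when the toy broker refuses a second exclusive consumer."""


class ToyBroker:
    """In-memory stand-in for a broker; NOT RabbitMQ and NOT part of nuropb."""

    def __init__(self) -> None:
        # Maps queue name -> connection that holds it exclusively.
        self._exclusive_owners: Dict[str, str] = {}

    def declare_exclusive(self, queue: str, connection: str) -> None:
        owner = self._exclusive_owners.get(queue)
        if owner is not None and owner != connection:
            raise ChannelClosed(f"{queue} is held exclusively by {owner}")
        self._exclusive_owners[queue] = connection


def response_queue_name(service_name: str, instance_id: str) -> str:
    # Hypothetical naming scheme, used only to make the collision visible.
    return f"{service_name}.{instance_id}.response"


broker = ToyBroker()
queue = response_queue_name("orders", "singleton")
broker.declare_exclusive(queue, connection="conn-1")  # first instance succeeds

try:
    # Same service_name and instance_id, so the same queue name: rejected.
    broker.declare_exclusive(queue, connection="conn-2")
    second_instance_started = True
except ChannelClosed:
    second_instance_started = False

print(second_instance_started)  # False
```

With a fixed instance_id every instance derives the same response queue, so only the first one can start, which is the single instance mode described above.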

Initialization

RMQAPI: A NuropbInterface implementation that uses RabbitMQ as the underlying transport.

Where the exchange inputs are None but are present in transport_settings, the values from transport_settings are used.
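A minimal sketch of that fallback rule follows. The helper resolve_exchange is hypothetical, and the assumption that transport_settings uses the keys "rpc_exchange" and "events_exchange" is made for illustration only. What the library does when neither source supplies a value is not modelled here.

```python
from typing import Any, Dict, Optional


def resolve_exchange(
    explicit: Optional[str],
    transport_settings: Optional[Dict[str, Any]],
    key: str,
) -> Optional[str]:
    """Prefer the explicit argument; otherwise fall back to transport_settings."""
    if explicit is not None:
        return explicit
    # Assumed key names; an absent key yields None in this sketch.
    return (transport_settings or {}).get(key)


settings = {"rpc_exchange": "mesh-rpc"}
print(resolve_exchange(None, settings, "rpc_exchange"))          # mesh-rpc
print(resolve_exchange("custom-rpc", settings, "rpc_exchange"))  # custom-rpc
```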

_mesh_name: str

None

_connection_name: str

None

_response_futures: Dict[str, nuropb.interface.ResultFutureResponsePayload]

None

_transport: nuropb.rmq_transport.RMQTransport

None

_rpc_exchange: str

None

_events_exchange: str

None

_service_instance: object | None

None

_default_ttl: int

None

_client_only: bool

None

_encryptor: nuropb.encodings.encryption.Encryptor

None

_service_discovery: Dict[str, Any]

None

_service_public_keys: Dict[str, Any]

None

classmethod _get_vhost(amqp_url: str | Dict[str, Any]) str

_service_name

None

Is also a label for the api whether in client or service mode.

property service_name: str

service_name: returns the service_name of the underlying transport

property is_leader: bool

property client_only: bool

client_only: returns the client_only status of the underlying transport

property connected: bool

connected: returns the connection status of the underlying transport

Returns:

bool

property transport: nuropb.rmq_transport.RMQTransport

transport: returns the underlying transport

Returns:

RMQTransport

async connect() None

connect: connects to the underlying transport

Returns:

None

async disconnect() None

disconnect: disconnects from the underlying transport

Returns:

None

receive_transport_message(service_message: nuropb.interface.TransportServicePayload, message_complete_callback: nuropb.interface.MessageCompleteFunction, metadata: Dict[str, Any]) None

receive_transport_message: handles a message received from the transport layer. Both incoming service messages and response messages pass through this method.

Returns:

None

classmethod _handle_immediate_request_error(rpc_response: bool, payload: nuropb.interface.RequestPayloadDict | nuropb.interface.ResponsePayloadDict, error: Dict[str, Any] | BaseException) nuropb.interface.ResponsePayloadDict

async request(service: str, method: str, params: Dict[str, Any], context: Dict[str, Any], ttl: Optional[int] = None, trace_id: Optional[str] = None, rpc_response: bool = True, encrypted: bool = False) Union[nuropb.interface.ResponsePayloadDict, Any]

Makes an RPC request for a method on a service mesh service and waits until the response is received.

Parameters:
  • service – str, The routing key on the rpc exchange to direct the request to the desired service request queue.

  • method – str, the name of the api call / method on the service

  • params – dict, The method input parameters

  • context

    dict, The context of the request. This is used to pass information to the service manager and is not used by the transport. Example content includes:

    • str user_id: a unique user identifier or token of the user that made the request

    • str correlation_id: a unique identifier of the request, used to correlate the response to the request or to trace the request over the network (e.g. a uuid4 hex string)

    • str service:

    • str method:

  • ttl – int, optional. The expiry time in milliseconds that the message will be kept on the queue before being moved to the dead letter queue. If None, the message expiry configured on the transport is used.

  • trace_id – str optional an identifier to trace the request over the network (e.g. uuid4 hex string)

  • rpc_response – bool, optional. If True (default), the actual response of the RPC call is returned, and where there was an error, it is raised as an exception. If False, the ResponsePayloadDict is returned.

  • encrypted – bool if True then the message will be encrypted in transit

Return ResponsePayloadDict | Any:

representing the response from the requested service with any exceptions raised
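The following sketch shows one way the request parameters might be assembled. StubApi is a hypothetical stand-in that only mirrors the documented request() signature and echoes its inputs; it is not RMQAPI and sends nothing over a transport. build_context is likewise a hypothetical helper, and the service and method names are made up.

```python
import asyncio
import uuid
from typing import Any, Dict, Optional


def build_context(user_id: str, service: str, method: str) -> Dict[str, Any]:
    """Assemble a context dict with the example keys listed above."""
    return {
        "user_id": user_id,
        "correlation_id": uuid.uuid4().hex,  # uuid4 hex string
        "service": service,
        "method": method,
    }


class StubApi:
    """Hypothetical stand-in that mirrors the documented request() signature."""

    async def request(
        self,
        service: str,
        method: str,
        params: Dict[str, Any],
        context: Dict[str, Any],
        ttl: Optional[int] = None,
        trace_id: Optional[str] = None,
        rpc_response: bool = True,
        encrypted: bool = False,
    ) -> Any:
        # Echo the inputs instead of sending anything over a transport.
        return {"service": service, "method": method, "params": params, "ttl": ttl}


async def main() -> Any:
    api = StubApi()
    context = build_context("user-1", "orders", "get_order")
    return await api.request(
        service="orders",
        method="get_order",
        params={"order_id": 42},
        context=context,
        ttl=5_000,  # milliseconds
        trace_id=uuid.uuid4().hex,
    )


result = asyncio.run(main())
print(result["method"])  # get_order
```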

command(service: str, method: str, params: Dict[str, Any], context: Dict[str, Any], ttl: Optional[int] = None, trace_id: Optional[str] = None, encrypted: bool = False) None

command: sends a command to the target service, i.e. a targeted event. A response is not expected, and any response is ignored.

Parameters:
  • service – the service name

  • method – the method name

  • params – the method arguments, these must be easily serializable to JSON

  • context – additional information that represents the context in which the request is executed. It must be easily serializable to JSON.

  • ttl – the time to live of the request in milliseconds. After this time, and dependent on the underlying transport, the request will either not be consumed by the target or be assumed by the requester to have failed with an undetermined state.

  • trace_id – an identifier to trace the request over the network (e.g. uuid4 hex string)

  • encrypted – bool, if True then the message will be encrypted in transit

Returns:

None
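Because params and context must be easily serializable to JSON, a caller may want to check this before sending a command. The helper below is a hypothetical pre-flight check written for illustration; it is not part of nuropb, and the library may perform its own validation.

```python
import json
from typing import Any, Dict


def ensure_json_serializable(name: str, value: Dict[str, Any]) -> None:
    """Raise TypeError early if value cannot be encoded as JSON."""
    try:
        json.dumps(value)
    except (TypeError, ValueError) as exc:
        raise TypeError(f"{name} is not JSON serializable: {exc}") from exc


# Plain dicts of strings, numbers and lists pass silently.
ensure_json_serializable("params", {"order_id": 42, "tags": ["a", "b"]})

try:
    # An arbitrary object has no JSON representation, so this is rejected.
    ensure_json_serializable("context", {"when": object()})
    rejected = False
except TypeError:
    rejected = True

print(rejected)  # True
```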

publish_event(topic: str, event: Dict[str, Any], context: Dict[str, Any], trace_id: Optional[str] = None, encrypted: bool = False) None

Broadcasts an event with the given topic.

Parameters:
  • topic – str, The routing key on the events exchange

  • event – json-encodable Python Dict.

  • context

    dict, The context around event generation. Example content includes:

    • str user_id: a unique user identifier or token of the user that made the request

    • str correlation_id: a unique identifier of the request, used to correlate the response to the request or to trace the request over the network (e.g. a uuid4 hex string)

    • str service:

    • str method:

  • trace_id – str, optional. An identifier to trace the request over the network (e.g. a uuid4 hex string)

  • encrypted – bool, if True then the message will be encrypted in transit

async describe_service(service_name: str, refresh: bool = False) Dict[str, Any] | None

describe_service: returns the service information for the given service_name. If the information is not already cached, or refresh is True, then the service discovery is queried directly.

Parameters:
  • service_name – str

  • refresh – bool

Returns:

dict
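The cache-or-query behaviour described for describe_service can be sketched as follows. DiscoveryCache and fake_fetch are hypothetical names used for illustration, and the shape of the returned service information is an assumption; the actual implementation may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

Fetch = Callable[[str], Awaitable[Optional[Dict[str, Any]]]]


class DiscoveryCache:
    """Toy cache illustrating the 'cached unless refresh' behaviour."""

    def __init__(self, fetch: Fetch) -> None:
        self._fetch = fetch
        self._cache: Dict[str, Dict[str, Any]] = {}

    async def describe_service(
        self, service_name: str, refresh: bool = False
    ) -> Optional[Dict[str, Any]]:
        # Query only on a cache miss or when a refresh is forced.
        if refresh or service_name not in self._cache:
            info = await self._fetch(service_name)
            if info is None:
                return None
            self._cache[service_name] = info
        return self._cache[service_name]


calls: List[str] = []


async def fake_fetch(service_name: str) -> Optional[Dict[str, Any]]:
    # Stand-in for a service discovery query; records each invocation.
    calls.append(service_name)
    return {"service_name": service_name, "methods": {}}


async def main() -> int:
    cache = DiscoveryCache(fake_fetch)
    await cache.describe_service("orders")                # miss: queries
    await cache.describe_service("orders")                # hit: no query
    await cache.describe_service("orders", refresh=True)  # forced: queries
    return len(calls)


query_count = asyncio.run(main())
print(query_count)  # 2
```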

async requires_encryption(service_name: str, method_name: str) bool

requires_encryption: queries the service discovery information for the service_name and returns True if encryption is required for the method, else False.

Parameters:
  • service_name

  • method_name

Returns:

bool

async has_public_key(service_name: str) bool

has_public_key: returns True if the service has a public key registered, else False
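One possible way to combine requires_encryption and has_public_key when deciding the value of the encrypted flag is sketched below. StubApi is a hypothetical stand-in that mirrors only the two documented signatures, should_encrypt is an invented helper, and raising when no public key is known is an assumption about a sensible policy rather than documented library behaviour.

```python
import asyncio
from typing import Any, Dict, Set


class StubApi:
    """Hypothetical stand-in mirroring the documented method signatures."""

    def __init__(self, encrypted_methods: Dict[str, Set[str]], public_keys: Set[str]) -> None:
        self._encrypted_methods = encrypted_methods
        self._public_keys = public_keys

    async def requires_encryption(self, service_name: str, method_name: str) -> bool:
        return method_name in self._encrypted_methods.get(service_name, set())

    async def has_public_key(self, service_name: str) -> bool:
        return service_name in self._public_keys


async def should_encrypt(api: Any, service: str, method: str) -> bool:
    """Return the value to pass as `encrypted`, or raise if no key is available."""
    if not await api.requires_encryption(service, method):
        return False
    if not await api.has_public_key(service):
        # Assumed policy: refuse to send rather than fall back to plaintext.
        raise RuntimeError(f"{service}.{method} requires encryption but no public key is known")
    return True


api = StubApi({"vault": {"get_secret"}}, public_keys={"vault"})
print(asyncio.run(should_encrypt(api, "vault", "get_secret")))  # True
print(asyncio.run(should_encrypt(api, "vault", "ping")))        # False
```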