nuropb.rmq_api
Module Contents
Classes
The primary nuropb API. |
Data
API
- nuropb.rmq_api.logger
None
- nuropb.rmq_api.verbose
False
- class nuropb.rmq_api.RMQAPI(amqp_url: str | Dict[str, Any], service_name: str | None = None, instance_id: str | None = None, service_instance: object | None = None, rpc_exchange: Optional[str] = None, events_exchange: Optional[str] = None, transport_settings: Optional[Dict[str, Any]] = None)
Bases:
nuropb.interface.NuropbInterfaceThe primary nuropb API.
When an existing transport initialised and connected, and a subsequent transport instance is connected with the same service_name and instance_id as the first, the broker will shut down the channel of subsequent instances when they attempt to configure their response queue. This is because the response queue is opened in exclusive mode. The exclusive mode is used to ensure that only one consumer (nuropb api connection) is consuming from the response queue.
Deliberately specifying a fixed instance_id, is a valid mechanism to ensure that a service can only run in single instance mode. This is useful for services that are not designed to be run in a distributed manner or where there is specific service configuration required.
Initialization
RMQAPI: A NuropbInterface implementation that uses RabbitMQ as the underlying transport.
Where exchange inputs are none, but they user present in transport_settings, then use the values from transport_settings
- _mesh_name: str
None
- _connection_name: str
None
- _response_futures: Dict[str, nuropb.interface.ResultFutureResponsePayload]
None
- _transport: nuropb.rmq_transport.RMQTransport
None
- _rpc_exchange: str
None
- _events_exchange: str
None
- _service_instance: object | None
None
- _default_ttl: int
None
- _client_only: bool
None
- _encryptor: nuropb.encodings.encryption.Encryptor
None
- _service_discovery: Dict[str, Any]
None
- _service_public_keys: Dict[str, Any]
None
- classmethod _get_vhost(amqp_url: str | Dict[str, Any]) str
- _service_name
None
Is also a label for the api whether in client or service mode.
- property service_name: str
service_name: returns the service_name of the underlying transport
- property is_leader: bool
- property client_only: bool
client_only: returns the client_only status of the underlying transport
- property connected: bool
connected: returns the connection status of the underlying transport
- Returns:
bool
- property transport: nuropb.rmq_transport.RMQTransport
transport: returns the underlying transport
- Returns:
RMQTransport
- async connect() None
connect: connects to the underlying transport
- Returns:
None
- async disconnect() None
disconnect: disconnects from the underlying transport
- Returns:
None
- receive_transport_message(service_message: nuropb.interface.TransportServicePayload, message_complete_callback: nuropb.interface.MessageCompleteFunction, metadata: Dict[str, Any]) None
receive_transport_message: handles a messages received from the transport layer. Both incoming service messages and response messages pass through this method.
- Returns:
None
- classmethod _handle_immediate_request_error(rpc_response: bool, payload: nuropb.interface.RequestPayloadDict | nuropb.interface.ResponsePayloadDict, error: Dict[str, Any] | BaseException) nuropb.interface.ResponsePayloadDict
- async request(service: str, method: str, params: Dict[str, Any], context: Dict[str, Any], ttl: Optional[int] = None, trace_id: Optional[str] = None, rpc_response: bool = True, encrypted: bool = False) Union[nuropb.interface.ResponsePayloadDict, Any]
Makes a rpc request for a method on a service mesh service and waits until the response is received.
- Parameters:
service – str, The routing key on the rpc exchange to direct the request to the desired service request queue.
method – str, the name of the api call / method on the service
params – dict, The method input parameters
context – dict The context of the request. This is used to pass information to the service manager and is not used by the transport. Example content includes: - user_id: str # a unique user identifier or token of the user that made the request - correlation_id: str # a unique identifier of the request used to correlate the response to the # request or trace the request over the network (e.g. uuid4 hex string) - service: str - method: str
ttl – int optional expiry is the time in milliseconds that the message will be kept on the queue before being moved to the dead letter queue. If None, then the message expiry configured on the transport is used.
trace_id – str optional an identifier to trace the request over the network (e.g. uuid4 hex string)
rpc_response – bool optional if True (default), the actual response of the RPC call is returned and where there was an error, that is raised as an exception. Where rpc_response is a ResponsePayloadDict, is returned.
encrypted – bool if True then the message will be encrypted in transit
- Return ResponsePayloadDict | Any:
representing the response from the requested service with any exceptions raised
- command(service: str, method: str, params: Dict[str, Any], context: Dict[str, Any], ttl: Optional[int] = None, trace_id: Optional[str] = None, encrypted: bool = False) None
command: sends a command to the target service. I.e. a targeted event. response is not expected and ignored.
- Parameters:
service – the service name
method – the method name
params – the method arguments, these must be easily serializable to JSON
context – additional information that represent the context in which the request is executed. The must be easily serializable to JSON.
ttl – the time to live of the request in milliseconds. After this time and dependent on the underlying transport, it will not be consumed by the target or assumed by the requester to have failed with an undetermined state.
trace_id – an identifier to trace the request over the network (e.g. uuid4 hex string)
encrypted – bool, if True then the message will be encrypted in transit
- Returns:
None
- publish_event(topic: str, event: Dict[str, Any], context: Dict[str, Any], trace_id: Optional[str] = None, encrypted: bool = False) None
Broadcasts an event with the given topic.
- Parameters:
topic – str, The routing key on the events exchange
event – json-encodable Python Dict.
context –
dict, The context around gent generation, example content includes:
str user_id: # a unique user identifier or token of the user that made the request
str correlation_id: # a unique identifier of the request used to correlate the response # to the request # or trace the request over the network (e.g. an uuid4 hex string)
str service:
str method:
trace_id – str optional an identifier to trace the request over the network (e.g. an uuid4 hex string)
encrypted – bool, if True then the message will be encrypted in transit
- async describe_service(service_name: str, refresh: bool = False) Dict[str, Any] | None
describe_service: returns the service information for the given service_name, if it is not already cached or refresh is try then the service discovery is queried directly.
- Parameters:
service_name – str
refresh – bool
- Returns:
dict
- async requires_encryption(service_name: str, method_name: str) bool
requires_encryption: Queries the service discovery information for the service_name and returns True if encryption is required else False. none of encryption is not required.
- Parameters:
service_name –
method_name –
- Returns:
bool
- async has_public_key(service_name: str) bool
service_has_public_key: returns True if the service has a public key registered, else False