nuropb.interface
Module Contents
Classes
For compatibility with better futureproof serialisation support, Any encoded payload type is supported.It is encouraged to use a json compatible key/value Type e.g. Dict[str, Any] |
|
Type[RequestPayloadDict]: represents a request that is sent to a service: A request has a response, it is acknowledged by the transport layer after the destination service has handled the request. |
|
Type[CommandPayloadDict]: represents a command that is sent to a service: A command has no response, it is acked immediately by the transport layer. The originator of the command has now knowledge of the execution of the command. If a response is required, a request should be used. |
|
Type[EventPayloadDict]: represents an event that is published to a topic: |
|
Type[ResponsePayloadDict]: represents a response to a request: |
|
Type[TransportServicePayload]: represents valid service instruction encoded payload. Depending on the transport implementation, there wire encoding and serialization may be different, and some of the fields may be in the body or header of the message. |
|
Type[TransportRespondPayload]: represents valid service response message, valid nuropb encoded payload types are ResponsePayloadDict, and EventPayloadDict |
|
NuropbInterface: represents the interface that must be implemented by a nuropb API implementation |
Data
AcknowledgeCallbackFunction: represents a callable with the inputs: |
|
MessageCompleteFunction: represents a callable with the inputs: |
|
MessageCallbackFunction: represents a callable with the inputs: |
|
ConnectionCallbackFunction: represents a callable with the inputs: |
API
- nuropb.interface.logger
None
- nuropb.interface.NUROPB_VERSION
‘0.1.8’
- nuropb.interface.NUROPB_PROTOCOL_VERSION
‘0.1.1’
- nuropb.interface.NUROPB_PROTOCOL_VERSIONS_SUPPORTED
(‘0.1.1’,)
- nuropb.interface.NUROPB_MESSAGE_TYPES
(‘request’, ‘response’, ‘event’, ‘command’)
- nuropb.interface.NuropbSerializeType
None
- nuropb.interface.NuropbMessageType
None
- nuropb.interface.NuropbLifecycleState
None
- class nuropb.interface.ErrorDescriptionType[source]
Bases:
typing.TypedDict- error: str
None
- description: Optional[str]
None
- context: Optional[Dict[str, Any]]
None
- class nuropb.interface.EventType[source]
Bases:
typing.TypedDictFor compatibility with better futureproof serialisation support, Any encoded payload type is supported.It is encouraged to use a json compatible key/value Type e.g. Dict[str, Any]
- Target:
is currently provided here as an aid for the implementation, there are use cases where events are targeted to a specified audience or list or targets. In the NuroPb paradigm, targets could be individual users or other services. A service would represent the service as a whole, NOT any individual instance of that service.
It is also advised not to use NuroPb for communication between instances of a service.
Reference the notes on EventPayloadDict
Initialization
Initialize self. See help(type(self)) for accurate signature.
- topic: str
None
- payload: Any
None
- target: Optional[List[Any]]
None
- class nuropb.interface.RequestPayloadDict[source]
Bases:
typing.TypedDictType[RequestPayloadDict]: represents a request that is sent to a service: A request has a response, it is acknowledged by the transport layer after the destination service has handled the request.
REMINDER FOR FUTURE: It is very tempting to support the Tuple[…,Any] for the param key. There is much broader downstream compatibility in keeping this as a dictionary / key-value mapping.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- tag: Literal[request]
None
- correlation_id: str
None
- context: Dict[str, Any]
None
- trace_id: Optional[str]
None
- service: str
None
- method: str
None
- params: Dict[str, Any]
None
- class nuropb.interface.CommandPayloadDict[source]
Bases:
typing.TypedDictType[CommandPayloadDict]: represents a command that is sent to a service: A command has no response, it is acked immediately by the transport layer. The originator of the command has now knowledge of the execution of the command. If a response is required, a request should be used.
A command is useful when used as a type of directed event.
REMINDER FOR FUTURE: It is very tempting to support the Tuple[…,Any] for the param key. There is much broader downstream compatibility in keeping this as a dictionary / key-value mapping.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- tag: Literal[command]
None
- correlation_id: str
None
- context: Dict[str, Any]
None
- trace_id: Optional[str]
None
- service: str
None
- method: str
None
- params: Dict[str, Any]
None
- class nuropb.interface.EventPayloadDict[source]
Bases:
typing.TypedDictType[EventPayloadDict]: represents an event that is published to a topic:
- Target:
is currently provided here as an aid for the implementation, there are use cases where events are targeted to a specified audience or list or targets. In the NuroPb paradigm, targets could be individual users or other services. A service would represent the service as a whole, NOT any individual instance of that service.
Reference the notes on EventType
Initialization
Initialize self. See help(type(self)) for accurate signature.
- tag: Literal[nuropb.interface.EventPayloadDict.event]
None
- correlation_id: str
None
- context: Dict[str, Any]
None
- trace_id: Optional[str]
None
- topic: str
None
- event: Any
None
- target: Optional[List[Any]]
None
- class nuropb.interface.ResponsePayloadDict[source]
Bases:
typing.TypedDictType[ResponsePayloadDict]: represents a response to a request:
Initialization
Initialize self. See help(type(self)) for accurate signature.
- tag: Literal[response]
None
- correlation_id: str
None
- context: Dict[str, Any]
None
- trace_id: Optional[str]
None
- result: Any
None
- error: Optional[Dict[str, Any]]
None
- warning: Optional[str]
None
- reply_to: str
None
- nuropb.interface.PayloadDict
None
- nuropb.interface.ServicePayloadTypes
None
- nuropb.interface.ResponsePayloadTypes
None
- class nuropb.interface.TransportServicePayload[source]
Bases:
typing.TypedDictType[TransportServicePayload]: represents valid service instruction encoded payload. Depending on the transport implementation, there wire encoding and serialization may be different, and some of the fields may be in the body or header of the message.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- nuropb_protocol: str
None
- correlation_id: str
None
- trace_id: Optional[str]
None
- ttl: Optional[int]
None
- nuropb_type: nuropb.interface.NuropbMessageType
None
- nuropb_payload: Dict[str, Any]
None
- class nuropb.interface.TransportRespondPayload[source]
Bases:
typing.TypedDictType[TransportRespondPayload]: represents valid service response message, valid nuropb encoded payload types are ResponsePayloadDict, and EventPayloadDict
Initialization
Initialize self. See help(type(self)) for accurate signature.
- nuropb_protocol: str
None
- correlation_id: str
None
- trace_id: Optional[str]
None
- ttl: Optional[int]
None
- nuropb_type: nuropb.interface.NuropbMessageType
None
- nuropb_payload: nuropb.interface.ResponsePayloadTypes
None
- nuropb.interface.ResultFutureResponsePayload
None
- nuropb.interface.ResultFutureAny
None
- nuropb.interface.AcknowledgeAction
None
- nuropb.interface.AcknowledgeCallbackFunction
None
AcknowledgeCallbackFunction: represents a callable with the inputs:
action: AcknowledgeAction # one of “ack”, “nack”, “reject”
- nuropb.interface.MessageCompleteFunction
None
MessageCompleteFunction: represents a callable with the inputs:
response_messages: List[TransportRespondPayload] # the responses to be sent
acknowledge_action: AcknowledgeAction # one of “ack”, “nack”, “reject”
- nuropb.interface.MessageCallbackFunction
None
MessageCallbackFunction: represents a callable with the inputs:
message: TransportServicePayload # the decoded message
message_complete: Optional[AcknowledgeCallbackFunction] # a function that is called to acknowledge the message
metadata: Dict[str: Any] # the context of the message
- nuropb.interface.ConnectionCallbackFunction
None
ConnectionCallbackFunction: represents a callable with the inputs:
instance: type of NuropbInterface
status: str # the status of the connection (connected, disconnected)
reason: str # the reason for the connection status change
- exception nuropb.interface.NuropbException(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
ExceptionNuropbException: represents a base exception for all exceptions raised by the nuropb API although the input parameters are optional, it is recommended that the message is set to a meaningful value and the nuropb_message is set to the values that were present when the exception was raised.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- description: str
None
- payload: nuropb.interface.PayloadDict | nuropb.interface.TransportServicePayload | nuropb.interface.TransportRespondPayload | Dict[str, Any] | None
None
- exception: BaseException | None
None
- exception nuropb.interface.NuropbTimeoutError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbTimeoutError: represents an error that occurred when a timeout was reached.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception nuropb.interface.NuropbTransportError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None, close_connection: bool = False)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbTransportError: represents an error that inside the plumbing.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- _close_connection: bool
None
- property close_connection: bool
close_connection: returns True if the connection should be closed
- exception nuropb.interface.NuropbMessageError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbMessageError: represents an error that occurred during the encoding or decoding of a message.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception nuropb.interface.NuropbHandlingError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbHandlingError: represents an error that occurred during the execution or fulfilment of a request or command. An error response is returned to the requester.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception nuropb.interface.NuropbDeprecatedError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
nuropb.interface.NuropbHandlingErrorNuropbDeprecatedError: represents an error that occurred during the execution or fulfilment of a request, command or event topic that has been marked deprecated.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception nuropb.interface.NuropbValidationError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbValidationError: represents an error that occurred during the validation of a request or command. An error response is returned to the requester.
An error response is returned to the requester ONLY for requests and commands. Events will be rejected with a NACK with requeue=False.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception nuropb.interface.NuropbAuthenticationError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbAuthenticationError: when this exception is raised, the transport layer will ACK the message and return an authentication error response to the requester.
This exception occurs whe the identity of the requester can not be validated. for example an unknown, invalid or expired user identifier or auth token.
In most cases, the requester will not be able to recover from this error and will need provide valid credentials and retry the request. The approach to this retry outside the scope of the nuropb API.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception nuropb.interface.NuropbAuthorizationError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbAuthorizationError: when this exception is raised, the transport layer will ACK the message and return an authorization error response to the requester.
This exception occurs whe the requester does not have the required privileges to perform the requested action of either a request or command.
In most cases, the requester will not be able to recover from this error and will need provide valid credentials and retry the request. The approach to this retry outside the scope of the nuropb API.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception nuropb.interface.NuropbNotDeliveredError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbNotDeliveredError: when this exception is raised, the transport layer will ACK the message and return an error response to the requester.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception nuropb.interface.NuropbCallAgainReject(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbCallAgainReject: when this exception is raised, the transport layer will REJECT the message
To prevent accidental use of the redelivered parameter and to ensure system predictability on the Call Again feature, messages are only allowed to be redelivered once and only once. To this end all messages that have redelivered == True will be rejected.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception nuropb.interface.NuropbCallAgain(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbCallAgain: when this exception is raised, the transport layer will NACK the message and schedule it to be redelivered. The delay is determined by the transport layer or message broker. A call again will result in a forced repeated call of the original message, with the same correlation_id and trace_id.
The call again “feature” is ignored for event service messages and response messages, as these are acked in all cases. The call again feature by implication is only supported for request and command messages.
WARNING: with request messages, if a response has been returned, then this pattern SHOULD NOT be used. The requester will receive the same response again, which will be ignored as an unpaired response. if the underlying service method has no idempotence guarantees, the service could end up in an inconsistent state.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception nuropb.interface.NuropbSuccess(result: Any, description: Optional[str] = None, payload: Optional[nuropb.interface.ResponsePayloadDict] = None, events: Optional[List[nuropb.interface.EventType]] = None)[source]
Bases:
nuropb.interface.NuropbExceptionNuropbSuccessError: when this exception is raised, the transport layer will ACK the message and return a success response if service encoded payload is a ‘request’. This is useful when the request is a command or event and is executed asynchronously.
There are some use cases where the service may want to return a success response irrespective to the handling of the request.
A useful example is to short circuit processing when an outcome can be predetermined from the inputs alone. For end to end request-response consistency, this class must be instantiated with ResponsePayloadDict that contains a result consistent with the method and inputs provided.
Another use case is for the transmission of events raised during the execution of an event, command or request. Events will only be sent to the transports layer after the successful processing of a service message.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- result: Any
None
- payload: nuropb.interface.ResponsePayloadDict | None
None
- events: List[nuropb.interface.EventType]
[]
- class nuropb.interface.NuropbInterface[source]
Bases:
abc.ABCNuropbInterface: represents the interface that must be implemented by a nuropb API implementation
- _service_name: str
None
- _instance_id: str
None
- _service_instance: object
None
- property service_name: str
service_name: returns the service name
- property instance_id: str
instance_id: returns the instance id
- abstract async connect() None[source]
connect: waits for the underlying transport to connect, an exception is raised if the connection fails to be established
- Returns:
None
- abstract async disconnect() None[source]
disconnect: disconnects from the underlying transport
- Returns:
None
- abstract property connected: bool
connected: returns the connection status of the underlying transport
- Returns:
bool
- abstract property is_leader: bool
is_leader: returns the leader status of the service instance
- Returns:
bool
- abstract receive_transport_message(service_message: nuropb.interface.TransportServicePayload, message_complete_callback: nuropb.interface.MessageCompleteFunction, metadata: Dict[str, Any]) None[source]
handle_message: does the processing of a NuroPb message received from the transport layer.
All response, request, command and event messages received from the transport layer are handled here.
For failures service messages are handled, other than for events, a response including details of the error is returned to the flow originator.
- Parameters:
service_message – TransportServicePayload
message_complete_callback – MessageCompleteFunction
metadata – Dict[str, Any] - metric gathering information
- Returns:
None
- abstract async request(service: str, method: str, params: Dict[str, Any], context: Dict[str, Any], ttl: Optional[int] = None, trace_id: Optional[str] = None, rpc_response: bool = True) Union[nuropb.interface.ResponsePayloadDict, Any][source]
request: sends a request to the target service and waits for a response. It is up to the implementation to manage message idempotency and message delivery guarantees.
- Parameters:
service – the service name
method – the method name
params – the method arguments, these must be easily serializable to JSON
context – additional information that represent the context in which the request is executed. The must be easily serializable to JSON.
ttl – the time to live of the request in milliseconds. After this time and dependent on the state and underlying transport, it will not be consumed by the target service and should be assumed by the requester to have failed with an undetermined state.
trace_id – an identifier to trace the request over the network (e.g. uuid4 hex string)
rpc_response – if True (default), the actual response of the RPC call is returned and where there was an error, that is raised as an exception. Where rpc_response is a ResponsePayloadDict, it is returned.
- Returns:
ResponsePayloadDict
- abstract command(service: str, method: str, params: Dict[str, Any], context: Dict[str, Any], ttl: Optional[int] = None, trace_id: Optional[str] = None) None[source]
command: sends a command to the target service. It is up to the implementation to manage message idempotency and message delivery guarantees.
any response from the target service is ignored.
- Parameters:
service – the service name
method – the method name
params – the method arguments, these must be easily serializable to JSON
context – additional information that represent the context in which the request is executed. The must be easily serializable to JSON.
ttl – the time to live of the request in milliseconds. After this time and dependent on the underlying transport, it will not be consumed by the target or assumed by the requester to have failed with an undetermined state.
trace_id – an identifier to trace the request over the network (e.g. uuid4 hex string)
- Returns:
None
- abstract publish_event(topic: str, event: Any, context: Dict[str, Any], trace_id: Optional[str] = None) None[source]
publish_event: publishes an event to the transport layer. the event sender should not have any open transaction that is waiting for a response from this message. It is up to the implementation to manage message idempotency and message delivery guarantees.
- Parameters:
topic – the topic to publish to
event – the message to publish, must be easily serializable to JSON
context – additional information that represent the context in which the event is executed. The must be easily serializable to JSON.
trace_id – an identifier to trace the request over the network (e.g. uuid4 hex string)
- Returns:
None