nuropb.interface

Module Contents

Classes

ErrorDescriptionType

EventType

For compatibility and better future-proof serialisation support, any encoded payload type is supported. It is encouraged to use a JSON-compatible key/value type, e.g. Dict[str, Any].

RequestPayloadDict

Type[RequestPayloadDict]: represents a request that is sent to a service. A request has a response; it is acknowledged by the transport layer after the destination service has handled the request.

CommandPayloadDict

Type[CommandPayloadDict]: represents a command that is sent to a service. A command has no response; it is acked immediately by the transport layer. The originator of the command has no knowledge of the execution of the command. If a response is required, a request should be used.

EventPayloadDict

Type[EventPayloadDict]: represents an event that is published to a topic:

ResponsePayloadDict

Type[ResponsePayloadDict]: represents a response to a request:

TransportServicePayload

Type[TransportServicePayload]: represents a valid service instruction encoded payload. Depending on the transport implementation, the wire encoding and serialization may differ, and some of the fields may be in the body or header of the message.

TransportRespondPayload

Type[TransportRespondPayload]: represents a valid service response message. Valid nuropb encoded payload types are ResponsePayloadDict and EventPayloadDict.

NuropbInterface

NuropbInterface: represents the interface that must be implemented by a nuropb API implementation

Data

logger

NUROPB_VERSION

NUROPB_PROTOCOL_VERSION

NUROPB_PROTOCOL_VERSIONS_SUPPORTED

NUROPB_MESSAGE_TYPES

NuropbSerializeType

NuropbMessageType

NuropbLifecycleState

PayloadDict

ServicePayloadTypes

ResponsePayloadTypes

ResultFutureResponsePayload

ResultFutureAny

AcknowledgeAction

AcknowledgeCallbackFunction

AcknowledgeCallbackFunction: represents a callable with the inputs:

MessageCompleteFunction

MessageCompleteFunction: represents a callable with the inputs:

MessageCallbackFunction

MessageCallbackFunction: represents a callable with the inputs:

ConnectionCallbackFunction

ConnectionCallbackFunction: represents a callable with the inputs:

API

nuropb.interface.logger

None

nuropb.interface.NUROPB_VERSION

‘0.1.8’

nuropb.interface.NUROPB_PROTOCOL_VERSION

‘0.1.1’

nuropb.interface.NUROPB_PROTOCOL_VERSIONS_SUPPORTED

(‘0.1.1’,)

nuropb.interface.NUROPB_MESSAGE_TYPES

(‘request’, ‘response’, ‘event’, ‘command’)

nuropb.interface.NuropbSerializeType

None

nuropb.interface.NuropbMessageType

None

nuropb.interface.NuropbLifecycleState

None

class nuropb.interface.ErrorDescriptionType[source]

Bases: typing.TypedDict

error: str

None

description: Optional[str]

None

context: Optional[Dict[str, Any]]

None

class nuropb.interface.EventType[source]

Bases: typing.TypedDict

For compatibility and better future-proof serialisation support, any encoded payload type is supported. It is encouraged to use a JSON-compatible key/value type, e.g. Dict[str, Any].

Target:

is currently provided here as an aid for the implementation. There are use cases where events are targeted to a specified audience or list of targets. In the NuroPb paradigm, targets could be individual users or other services. A service would represent the service as a whole, NOT any individual instance of that service.

It is also advised not to use NuroPb for communication between instances of a service.

Reference the notes on EventPayloadDict

Initialization

Initialize self. See help(type(self)) for accurate signature.

topic: str

None

payload: Any

None

target: Optional[List[Any]]

None

class nuropb.interface.RequestPayloadDict[source]

Bases: typing.TypedDict

Type[RequestPayloadDict]: represents a request that is sent to a service. A request has a response; it is acknowledged by the transport layer after the destination service has handled the request.

REMINDER FOR FUTURE: It is very tempting to support Tuple[…,Any] for the params key, but keeping it as a dictionary / key-value mapping gives much broader downstream compatibility.

Initialization

Initialize self. See help(type(self)) for accurate signature.

tag: Literal[request]

None

correlation_id: str

None

context: Dict[str, Any]

None

trace_id: Optional[str]

None

service: str

None

method: str

None

params: Dict[str, Any]

None
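As a rough illustration of the fields listed above, the following sketch builds a request payload. It uses a local TypedDict stand-in rather than importing nuropb, and the `make_request` helper, the service name and the method name are hypothetical.

```python
from typing import Any, Dict, Literal, Optional, TypedDict
from uuid import uuid4


class RequestPayloadSketch(TypedDict):
    """Local stand-in mirroring the documented RequestPayloadDict fields."""

    tag: Literal["request"]
    correlation_id: str
    context: Dict[str, Any]
    trace_id: Optional[str]
    service: str
    method: str
    params: Dict[str, Any]


def make_request(
    service: str, method: str, params: Dict[str, Any]
) -> RequestPayloadSketch:
    """Hypothetical helper: build a request with a fresh correlation id."""
    return {
        "tag": "request",
        "correlation_id": uuid4().hex,
        "context": {},
        "trace_id": None,
        "service": service,
        "method": method,
        "params": params,  # kept as a key/value mapping, not a tuple
    }


request = make_request("order_service", "get_order", {"order_id": 42})
```

A command payload has the same fields, with the tag set to "command" instead.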

class nuropb.interface.CommandPayloadDict[source]

Bases: typing.TypedDict

Type[CommandPayloadDict]: represents a command that is sent to a service. A command has no response; it is acked immediately by the transport layer. The originator of the command has no knowledge of the execution of the command. If a response is required, a request should be used.

A command is useful when used as a type of directed event.

REMINDER FOR FUTURE: It is very tempting to support Tuple[…,Any] for the params key, but keeping it as a dictionary / key-value mapping gives much broader downstream compatibility.

Initialization

Initialize self. See help(type(self)) for accurate signature.

tag: Literal[command]

None

correlation_id: str

None

context: Dict[str, Any]

None

trace_id: Optional[str]

None

service: str

None

method: str

None

params: Dict[str, Any]

None

class nuropb.interface.EventPayloadDict[source]

Bases: typing.TypedDict

Type[EventPayloadDict]: represents an event that is published to a topic:

Target:

is currently provided here as an aid for the implementation. There are use cases where events are targeted to a specified audience or list of targets. In the NuroPb paradigm, targets could be individual users or other services. A service would represent the service as a whole, NOT any individual instance of that service.

Reference the notes on EventType

Initialization

Initialize self. See help(type(self)) for accurate signature.

tag: Literal[event]

None

correlation_id: str

None

context: Dict[str, Any]

None

trace_id: Optional[str]

None

topic: str

None

event: Any

None

target: Optional[List[Any]]

None
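The sketch below shows what a targeted event payload could look like. The TypedDict is a local stand-in that mirrors the fields above, and the topic name and target value are made up for illustration. Per the note on targets, the target names a service as a whole, not an instance.

```python
import json
from typing import Any, Dict, List, Literal, Optional, TypedDict
from uuid import uuid4


class EventPayloadSketch(TypedDict):
    """Local stand-in mirroring the documented EventPayloadDict fields."""

    tag: Literal["event"]
    correlation_id: str
    context: Dict[str, Any]
    trace_id: Optional[str]
    topic: str
    event: Any
    target: Optional[List[Any]]


event_payload: EventPayloadSketch = {
    "tag": "event",
    "correlation_id": uuid4().hex,
    "context": {},
    "trace_id": None,
    "topic": "order.created",          # hypothetical topic name
    "event": {"order_id": 42},         # JSON-compatible key/value body
    "target": ["billing_service"],     # a whole service, not one instance
}

# A JSON-compatible event body survives a serialisation round trip unchanged.
round_tripped = json.loads(json.dumps(event_payload))
```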

class nuropb.interface.ResponsePayloadDict[source]

Bases: typing.TypedDict

Type[ResponsePayloadDict]: represents a response to a request:

Initialization

Initialize self. See help(type(self)) for accurate signature.

tag: Literal[response]

None

correlation_id: str

None

context: Dict[str, Any]

None

trace_id: Optional[str]

None

result: Any

None

error: Optional[Dict[str, Any]]

None

warning: Optional[str]

None

reply_to: str

None
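A minimal sketch of building a response for a given request, using plain dictionaries shaped like the fields above. The `make_response` helper is hypothetical, the empty `reply_to` value is a placeholder, and copying `correlation_id` from the request is an assumption about how responses are paired with requests. The error value follows the shape of ErrorDescriptionType.

```python
from typing import Any, Dict, Optional


def make_response(
    request: Dict[str, Any],
    result: Any = None,
    error: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build a response that echoes the request's ids."""
    return {
        "tag": "response",
        "correlation_id": request["correlation_id"],  # assumed pairing key
        "context": request["context"],
        "trace_id": request.get("trace_id"),
        "result": result,
        "error": error,
        "warning": None,
        "reply_to": "",  # placeholder; the real value is transport specific
    }


request = {
    "tag": "request",
    "correlation_id": "abc123",
    "context": {},
    "trace_id": None,
    "service": "order_service",
    "method": "get_order",
    "params": {"order_id": 42},
}

ok = make_response(request, result={"order_id": 42, "status": "paid"})
failed = make_response(
    request,
    error={"error": "NotFound", "description": "no such order", "context": None},
)
```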

nuropb.interface.PayloadDict

None

nuropb.interface.ServicePayloadTypes

None

nuropb.interface.ResponsePayloadTypes

None

class nuropb.interface.TransportServicePayload[source]

Bases: typing.TypedDict

Type[TransportServicePayload]: represents a valid service instruction encoded payload. Depending on the transport implementation, the wire encoding and serialization may differ, and some of the fields may be in the body or header of the message.

Initialization

Initialize self. See help(type(self)) for accurate signature.

nuropb_protocol: str

None

correlation_id: str

None

trace_id: Optional[str]

None

ttl: Optional[int]

None

nuropb_type: nuropb.interface.NuropbMessageType

None

nuropb_payload: Dict[str, Any]

None
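To make the envelope concrete, here is a sketch that wraps a request payload in a dictionary shaped like TransportServicePayload and encodes it as JSON. The `wrap_for_transport` helper is hypothetical, JSON is only one possible wire encoding, and setting `nuropb_type` from the payload's `tag` is an assumption of this sketch.

```python
import json
from typing import Any, Dict, Optional

NUROPB_PROTOCOL_VERSION = "0.1.1"  # value listed for nuropb.interface


def wrap_for_transport(
    payload: Dict[str, Any], ttl: Optional[int] = None
) -> Dict[str, Any]:
    """Hypothetical helper: build a transport envelope around a payload."""
    return {
        "nuropb_protocol": NUROPB_PROTOCOL_VERSION,
        "correlation_id": payload["correlation_id"],
        "trace_id": payload.get("trace_id"),
        "ttl": ttl,
        "nuropb_type": payload["tag"],  # assumption: type mirrors the tag
        "nuropb_payload": payload,
    }


request = {
    "tag": "request",
    "correlation_id": "abc123",
    "context": {},
    "trace_id": None,
    "service": "order_service",
    "method": "get_order",
    "params": {"order_id": 42},
}

envelope = wrap_for_transport(request, ttl=5000)
wire_bytes = json.dumps(envelope).encode("utf-8")  # one possible encoding
decoded = json.loads(wire_bytes)
```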

class nuropb.interface.TransportRespondPayload[source]

Bases: typing.TypedDict

Type[TransportRespondPayload]: represents a valid service response message. Valid nuropb encoded payload types are ResponsePayloadDict and EventPayloadDict.

Initialization

Initialize self. See help(type(self)) for accurate signature.

nuropb_protocol: str

None

correlation_id: str

None

trace_id: Optional[str]

None

ttl: Optional[int]

None

nuropb_type: nuropb.interface.NuropbMessageType

None

nuropb_payload: nuropb.interface.ResponsePayloadTypes

None

nuropb.interface.ResultFutureResponsePayload

None

nuropb.interface.ResultFutureAny

None

nuropb.interface.AcknowledgeAction

None

nuropb.interface.AcknowledgeCallbackFunction

None

AcknowledgeCallbackFunction: represents a callable with the inputs:

  • action: AcknowledgeAction # one of “ack”, “nack”, “reject”

nuropb.interface.MessageCompleteFunction

None

MessageCompleteFunction: represents a callable with the inputs:

  • response_messages: List[TransportRespondPayload] # the responses to be sent

  • acknowledge_action: AcknowledgeAction # one of “ack”, “nack”, “reject”
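A callable with these two inputs could be sketched as follows. The type aliases are local stand-ins written from the description above, not the library's own definitions, and the recording function is purely illustrative.

```python
from typing import Any, Callable, Dict, List, Literal, Tuple

# Stand-in aliases based on the inputs described above.
AcknowledgeActionSketch = Literal["ack", "nack", "reject"]
MessageCompleteSketch = Callable[
    [List[Dict[str, Any]], AcknowledgeActionSketch], None
]

completed: List[Tuple[int, str]] = []


def on_message_complete(
    response_messages: List[Dict[str, Any]],
    acknowledge_action: AcknowledgeActionSketch,
) -> None:
    """Record how many responses would be sent and which action was chosen."""
    completed.append((len(response_messages), acknowledge_action))


callback: MessageCompleteSketch = on_message_complete
callback([], "ack")                                   # nothing to send back
callback([{"nuropb_type": "response"}], "ack")        # one response to send
```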

nuropb.interface.MessageCallbackFunction

None

MessageCallbackFunction: represents a callable with the inputs:

  • message: TransportServicePayload # the decoded message

  • message_complete: Optional[AcknowledgeCallbackFunction] # a function that is called to acknowledge the message

  • metadata: Dict[str, Any] # the context of the message

nuropb.interface.ConnectionCallbackFunction

None

ConnectionCallbackFunction: represents a callable with the inputs:

  • instance: type of NuropbInterface

  • status: str # the status of the connection (connected, disconnected)

  • reason: str # the reason for the connection status change
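A connection callback with these three inputs might look like the sketch below. The alias is a local stand-in, and whether the real callback is expected to be a coroutine is not stated here, so this sketch uses a plain function. The reason strings are made up.

```python
from typing import Any, Callable, List, Tuple

# Stand-in alias: (instance, status, reason) -> None
ConnectionCallbackSketch = Callable[[Any, str, str], None]

status_log: List[Tuple[str, str]] = []


def on_connection_change(instance: Any, status: str, reason: str) -> None:
    """Keep a simple history of connection status changes."""
    status_log.append((status, reason))


callback: ConnectionCallbackSketch = on_connection_change
api_instance = object()  # placeholder for a NuropbInterface implementation
callback(api_instance, "connected", "initial connection established")
callback(api_instance, "disconnected", "broker closed the connection")
```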

exception nuropb.interface.NuropbException(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: Exception

NuropbException: the base exception for all exceptions raised by the nuropb API. Although the input parameters are optional, it is recommended that the message is set to a meaningful value and the nuropb_message is set to the values that were present when the exception was raised.

Initialization

Initialize self. See help(type(self)) for accurate signature.

description: str

None

payload: nuropb.interface.PayloadDict | nuropb.interface.TransportServicePayload | nuropb.interface.TransportRespondPayload | Dict[str, Any] | None

None

exception: BaseException | None

None

to_dict() → Dict[str, Any][source]

exception nuropb.interface.NuropbTransportError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: nuropb.interface.NuropbException

NuropbTimeoutError: represents an error that occurred when a timeout was reached.

Initialization

Initialize self. See help(type(self)) for accurate signature.

exception nuropb.interface.NuropbTransportError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None, close_connection: bool = False)[source]

Bases: nuropb.interface.NuropbException

NuropbTransportError: represents an error that occurred inside the plumbing.

Initialization

Initialize self. See help(type(self)) for accurate signature.

_close_connection: bool

None

property close_connection: bool

close_connection: returns True if the connection should be closed
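The following sketch shows how a caller might use the `close_connection` flag. The two classes are local stand-ins written from the signatures above, not the nuropb classes themselves, and `should_close` is a hypothetical helper.

```python
from typing import Any, Dict, Optional


class SketchException(Exception):
    """Stand-in for NuropbException (description, payload, exception)."""

    def __init__(
        self,
        description: Optional[str] = None,
        payload: Optional[Dict[str, Any]] = None,
        exception: Optional[BaseException] = None,
    ) -> None:
        super().__init__(description)
        self.description = description
        self.payload = payload
        self.exception = exception


class SketchTransportError(SketchException):
    """Stand-in for NuropbTransportError with the extra close_connection flag."""

    def __init__(
        self,
        description: Optional[str] = None,
        payload: Optional[Dict[str, Any]] = None,
        exception: Optional[BaseException] = None,
        close_connection: bool = False,
    ) -> None:
        super().__init__(description, payload, exception)
        self._close_connection = close_connection

    @property
    def close_connection(self) -> bool:
        return self._close_connection


def should_close(error: Exception) -> bool:
    """Hypothetical helper: close only when a transport error asks for it."""
    return isinstance(error, SketchTransportError) and error.close_connection


fatal = SketchTransportError("channel lost", close_connection=True)
minor = SketchTransportError("publish failed")
```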

exception nuropb.interface.NuropbMessageError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: nuropb.interface.NuropbException

NuropbMessageError: represents an error that occurred during the encoding or decoding of a message.

Initialization

Initialize self. See help(type(self)) for accurate signature.

exception nuropb.interface.NuropbHandlingError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: nuropb.interface.NuropbException

NuropbHandlingError: represents an error that occurred during the execution or fulfilment of a request or command. An error response is returned to the requester.

Initialization

Initialize self. See help(type(self)) for accurate signature.

exception nuropb.interface.NuropbDeprecatedError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: nuropb.interface.NuropbHandlingError

NuropbDeprecatedError: represents an error that occurred during the execution or fulfilment of a request, command or event topic that has been marked deprecated.

Initialization

Initialize self. See help(type(self)) for accurate signature.

exception nuropb.interface.NuropbValidationError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: nuropb.interface.NuropbException

NuropbValidationError: represents an error that occurred during the validation of a request or command.

An error response is returned to the requester ONLY for requests and commands. Events will be rejected with a NACK with requeue=False.

Initialization

Initialize self. See help(type(self)) for accurate signature.

exception nuropb.interface.NuropbAuthenticationError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: nuropb.interface.NuropbException

NuropbAuthenticationError: when this exception is raised, the transport layer will ACK the message and return an authentication error response to the requester.

This exception occurs when the identity of the requester cannot be validated, for example an unknown, invalid or expired user identifier or auth token.

In most cases, the requester will not be able to recover from this error and will need to provide valid credentials and retry the request. The approach to this retry is outside the scope of the nuropb API.

Initialization

Initialize self. See help(type(self)) for accurate signature.

exception nuropb.interface.NuropbAuthorizationError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: nuropb.interface.NuropbException

NuropbAuthorizationError: when this exception is raised, the transport layer will ACK the message and return an authorization error response to the requester.

This exception occurs when the requester does not have the required privileges to perform the requested action of either a request or command.

In most cases, the requester will not be able to recover from this error and will need to provide valid credentials and retry the request. The approach to this retry is outside the scope of the nuropb API.

Initialization

Initialize self. See help(type(self)) for accurate signature.

exception nuropb.interface.NuropbNotDeliveredError(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: nuropb.interface.NuropbException

NuropbNotDeliveredError: when this exception is raised, the transport layer will ACK the message and return an error response to the requester.

Initialization

Initialize self. See help(type(self)) for accurate signature.

exception nuropb.interface.NuropbCallAgainReject(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: nuropb.interface.NuropbException

NuropbCallAgainReject: when this exception is raised, the transport layer will REJECT the message

To prevent accidental use of the redelivered parameter and to ensure system predictability on the Call Again feature, messages are only allowed to be redelivered once and only once. To this end all messages that have redelivered == True will be rejected.

Initialization

Initialize self. See help(type(self)) for accurate signature.

exception nuropb.interface.NuropbCallAgain(description: Optional[str] = None, payload: Optional[nuropb.interface.PayloadDict] = None, exception: Optional[BaseException] = None)[source]

Bases: nuropb.interface.NuropbException

NuropbCallAgain: when this exception is raised, the transport layer will NACK the message and schedule it to be redelivered. The delay is determined by the transport layer or message broker. A call again will result in a forced repeated call of the original message, with the same correlation_id and trace_id.

The call again “feature” is ignored for event service messages and response messages, as these are acked in all cases. The call again feature by implication is only supported for request and command messages.

WARNING: with request messages, if a response has been returned, then this pattern SHOULD NOT be used. The requester will receive the same response again, which will be ignored as an unpaired response. If the underlying service method has no idempotence guarantees, the service could end up in an inconsistent state.

Initialization

Initialize self. See help(type(self)) for accurate signature.
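The call again rules for request and command messages can be summarised in a small sketch. The exception classes are local stand-ins, `acknowledge_action_for` is a hypothetical helper, and returning "ack" for every other error is a simplifying assumption of this sketch.

```python
class CallAgainSketch(Exception):
    """Stand-in for NuropbCallAgain."""


class CallAgainRejectSketch(Exception):
    """Stand-in for NuropbCallAgainReject."""


def acknowledge_action_for(error: Exception, redelivered: bool) -> str:
    """Pick an acknowledge action for a failed request or command message."""
    if isinstance(error, CallAgainRejectSketch):
        return "reject"
    if isinstance(error, CallAgainSketch):
        # A message may be redelivered only once, so a call again raised
        # for an already redelivered message is rejected instead.
        return "reject" if redelivered else "nack"
    return "ack"  # assumed default for other errors in this sketch


first_attempt = acknowledge_action_for(CallAgainSketch(), redelivered=False)
second_attempt = acknowledge_action_for(CallAgainSketch(), redelivered=True)
```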

exception nuropb.interface.NuropbSuccess(result: Any, description: Optional[str] = None, payload: Optional[nuropb.interface.ResponsePayloadDict] = None, events: Optional[List[nuropb.interface.EventType]] = None)[source]

Bases: nuropb.interface.NuropbException

NuropbSuccess: when this exception is raised, the transport layer will ACK the message and return a success response if the service encoded payload is a ‘request’. This is useful when the request is a command or event and is executed asynchronously.

There are some use cases where the service may want to return a success response irrespective of the handling of the request.

A useful example is to short circuit processing when an outcome can be predetermined from the inputs alone. For end to end request-response consistency, this class must be instantiated with a ResponsePayloadDict that contains a result consistent with the method and inputs provided.

Another use case is for the transmission of events raised during the execution of an event, command or request. Events will only be sent to the transport layer after the successful processing of a service message.

Initialization

Initialize self. See help(type(self)) for accurate signature.

result: Any

None

payload: nuropb.interface.ResponsePayloadDict | None

None

events: List[nuropb.interface.EventType]

[]
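The short circuit use case could be sketched as below. `SketchSuccess` is a local stand-in built from the signature above, and `total_price` and `call_handler` are hypothetical. The event dictionary follows the EventType fields (topic, payload, target), with a made-up topic name.

```python
from typing import Any, Dict, List, Optional, Tuple


class SketchSuccess(Exception):
    """Stand-in for NuropbSuccess (result, description, payload, events)."""

    def __init__(
        self,
        result: Any,
        description: Optional[str] = None,
        payload: Optional[Dict[str, Any]] = None,
        events: Optional[List[Dict[str, Any]]] = None,
    ) -> None:
        super().__init__(description)
        self.result = result
        self.payload = payload
        self.events = list(events) if events else []


def total_price(items: List[Dict[str, Any]]) -> int:
    """Hypothetical service method that short circuits on empty input."""
    if not items:
        # The outcome is known from the inputs alone, so stop here.
        raise SketchSuccess(
            result=0,
            events=[{"topic": "cart.empty", "payload": {}, "target": None}],
        )
    return sum(item["price"] for item in items)


def call_handler(items: List[Dict[str, Any]]) -> Tuple[Any, List[Dict[str, Any]]]:
    """Hypothetical caller: treat the success exception as a normal result."""
    try:
        return total_price(items), []
    except SketchSuccess as success:
        return success.result, success.events
```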

class nuropb.interface.NuropbInterface[source]

Bases: abc.ABC

NuropbInterface: represents the interface that must be implemented by a nuropb API implementation

_service_name: str

None

_instance_id: str

None

_service_instance: object

None

property service_name: str

service_name: returns the service name

property instance_id: str

instance_id: returns the instance id

abstract async connect() → None[source]

connect: waits for the underlying transport to connect; an exception is raised if the connection fails to be established

Returns:

None

abstract async disconnect() → None[source]

disconnect: disconnects from the underlying transport

Returns:

None

abstract property connected: bool

connected: returns the connection status of the underlying transport

Returns:

bool

abstract property is_leader: bool

is_leader: returns the leader status of the service instance

Returns:

bool

abstract receive_transport_message(service_message: nuropb.interface.TransportServicePayload, message_complete_callback: nuropb.interface.MessageCompleteFunction, metadata: Dict[str, Any]) → None[source]

receive_transport_message: does the processing of a NuroPb message received from the transport layer.

All response, request, command and event messages received from the transport layer are handled here.

When the handling of a service message fails, a response that includes details of the error is returned to the flow originator, except for events.

Parameters:
  • service_message – TransportServicePayload

  • message_complete_callback – MessageCompleteFunction

  • metadata – Dict[str, Any] - metric gathering information

Returns:

None
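A very reduced sketch of this flow for request messages is shown below. It is not the library's implementation: the `HANDLERS` registry is hypothetical, plain dictionaries stand in for the typed payloads, the empty `reply_to` is a placeholder, and the sketch always acknowledges with "ack".

```python
from typing import Any, Callable, Dict, List

# Hypothetical registry of service methods, keyed by method name.
HANDLERS: Dict[str, Callable[..., Any]] = {"add": lambda a, b: a + b}


def receive_transport_message(
    service_message: Dict[str, Any],
    message_complete_callback: Callable[[List[Dict[str, Any]], str], None],
    metadata: Dict[str, Any],
) -> None:
    """Run the handler and report completion; only requests get a response."""
    payload = service_message["nuropb_payload"]
    responses: List[Dict[str, Any]] = []
    if service_message["nuropb_type"] == "request":
        result, error = None, None
        try:
            result = HANDLERS[payload["method"]](**payload["params"])
        except Exception as err:  # report the failure to the flow originator
            error = {"error": type(err).__name__, "description": str(err)}
        responses.append({
            "nuropb_protocol": service_message["nuropb_protocol"],
            "correlation_id": service_message["correlation_id"],
            "trace_id": service_message.get("trace_id"),
            "ttl": None,
            "nuropb_type": "response",
            "nuropb_payload": {
                "tag": "response",
                "correlation_id": service_message["correlation_id"],
                "context": payload.get("context", {}),
                "trace_id": service_message.get("trace_id"),
                "result": result,
                "error": error,
                "warning": None,
                "reply_to": "",
            },
        })
    message_complete_callback(responses, "ack")


outcomes: List[Any] = []
receive_transport_message(
    {
        "nuropb_protocol": "0.1.1",
        "correlation_id": "abc123",
        "trace_id": None,
        "ttl": None,
        "nuropb_type": "request",
        "nuropb_payload": {"method": "add", "params": {"a": 1, "b": 2}, "context": {}},
    },
    lambda responses, action: outcomes.append((responses, action)),
    metadata={},
)
```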

abstract async request(service: str, method: str, params: Dict[str, Any], context: Dict[str, Any], ttl: Optional[int] = None, trace_id: Optional[str] = None, rpc_response: bool = True) → Union[nuropb.interface.ResponsePayloadDict, Any][source]

request: sends a request to the target service and waits for a response. It is up to the implementation to manage message idempotency and message delivery guarantees.

Parameters:
  • service – the service name

  • method – the method name

  • params – the method arguments, these must be easily serializable to JSON

  • context – additional information that represents the context in which the request is executed. This must be easily serializable to JSON.

  • ttl – the time to live of the request in milliseconds. After this time and dependent on the state and underlying transport, it will not be consumed by the target service and should be assumed by the requester to have failed with an undetermined state.

  • trace_id – an identifier to trace the request over the network (e.g. uuid4 hex string)

  • rpc_response – if True (default), the actual response of the RPC call is returned and, where there was an error, it is raised as an exception. If False, the ResponsePayloadDict is returned.

Returns:

ResponsePayloadDict
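The effect of `rpc_response` can be illustrated with an in-process stand-in. `LoopbackApi` is hypothetical and does not use any transport, the handler registry is made up, and returning the full response dictionary when `rpc_response` is False is this sketch's reading of the parameter description.

```python
import asyncio
from typing import Any, Callable, Dict, Optional, Tuple


class LoopbackApi:
    """Hypothetical in-process stand-in; not a nuropb implementation."""

    def __init__(self, handlers: Dict[Tuple[str, str], Callable[..., Any]]) -> None:
        self._handlers = handlers

    async def request(
        self,
        service: str,
        method: str,
        params: Dict[str, Any],
        context: Dict[str, Any],
        ttl: Optional[int] = None,
        trace_id: Optional[str] = None,
        rpc_response: bool = True,
    ) -> Any:
        response: Dict[str, Any] = {
            "tag": "response",
            "correlation_id": "local",
            "context": context,
            "trace_id": trace_id,
            "result": None,
            "error": None,
            "warning": None,
            "reply_to": "",
        }
        try:
            response["result"] = self._handlers[(service, method)](**params)
        except Exception as err:
            response["error"] = {"error": type(err).__name__, "description": str(err)}
        if not rpc_response:
            return response  # full response dictionary, errors included
        if response["error"] is not None:
            raise RuntimeError(response["error"]["description"])
        return response["result"]  # only the method's result


api = LoopbackApi({("math", "add"): lambda a, b: a + b})
result = asyncio.run(api.request("math", "add", {"a": 1, "b": 2}, context={}))
full = asyncio.run(
    api.request("math", "add", {"a": 1, "b": 2}, context={}, rpc_response=False)
)
```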

abstract command(service: str, method: str, params: Dict[str, Any], context: Dict[str, Any], ttl: Optional[int] = None, trace_id: Optional[str] = None) → None[source]

command: sends a command to the target service. It is up to the implementation to manage message idempotency and message delivery guarantees.

Any response from the target service is ignored.

Parameters:
  • service – the service name

  • method – the method name

  • params – the method arguments, these must be easily serializable to JSON

  • context – additional information that represents the context in which the command is executed. This must be easily serializable to JSON.

  • ttl – the time to live of the command in milliseconds. After this time and dependent on the underlying transport, it will not be consumed by the target and should be assumed by the requester to have failed with an undetermined state.

  • trace_id – an identifier to trace the request over the network (e.g. uuid4 hex string)

Returns:

None

abstract publish_event(topic: str, event: Any, context: Dict[str, Any], trace_id: Optional[str] = None) → None[source]

publish_event: publishes an event to the transport layer. The event sender should not have any open transaction that is waiting for a response from this message. It is up to the implementation to manage message idempotency and message delivery guarantees.

Parameters:
  • topic – the topic to publish to

  • event – the message to publish, must be easily serializable to JSON

  • context – additional information that represents the context in which the event is executed. This must be easily serializable to JSON.

  • trace_id – an identifier to trace the request over the network (e.g. uuid4 hex string)

Returns:

None
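Since params, context and event bodies are all expected to be easily serializable to JSON, a caller may want to check a value before sending it. The helper below is a hypothetical convenience built on the standard library, not part of nuropb.

```python
import json
from typing import Any


def is_json_serializable(value: Any) -> bool:
    """Return True if the standard json module can encode the value."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        # TypeError: unsupported type; ValueError: e.g. circular reference
        return False


good_event = {"order_id": 42, "tags": ["new", "priority"]}
bad_event = {"order_id": 42, "tags": {"new", "priority"}}  # a set is not JSON
```

Running such a check before calling request, command or publish_event keeps encoding failures on the caller's side.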