import asyncio
import logging
from abc import ABC
from typing import (
Dict,
Any,
Callable,
Optional,
TypedDict,
Literal,
Type,
Union,
List,
)
logger = logging.getLogger(__name__)
NUROPB_VERSION = "0.1.8"
NUROPB_PROTOCOL_VERSION = "0.1.1"
NUROPB_PROTOCOL_VERSIONS_SUPPORTED = ("0.1.1",)
NUROPB_MESSAGE_TYPES = (
"request",
"response",
"event",
"command",
)
NuropbSerializeType = Literal["json"]
NuropbMessageType = Literal["request", "response", "event", "command"]
NuropbLifecycleState = Literal[
"client-start",
"client-encode",
"client-send",
"service-receive",
"service-decode",
"service-handle",
"service-encode",
"service-reply",
"service-ack",
"client-receive",
"client-decode",
"client-handle",
"client-ack",
"client-end",
]
[docs]
class ErrorDescriptionType(TypedDict):
error: str
description: Optional[str]
context: Optional[Dict[str, Any]]
[docs]
class EventType(TypedDict):
"""For compatibility with better futureproof serialisation support, Any encoded payload type is
supported.It is encouraged to use a json compatible key/value Type e.g. Dict[str, Any]
:target: is currently provided here as an aid for the implementation, there are use cases
where events are targeted to a specified audience or list or targets. In the NuroPb paradigm,
targets could be individual users or other services. A service would represent the service
as a whole, NOT any individual instance of that service.
It is also advised not to use NuroPb for communication between instances of a service.
Reference the notes on EventPayloadDict
"""
topic: str
payload: Any
target: Optional[List[Any]]
[docs]
class RequestPayloadDict(TypedDict):
"""Type[RequestPayloadDict]: represents a request that is sent to a service:
A request has a response, it is acknowledged by the transport layer after the destination
service has handled the request.
REMINDER FOR FUTURE: It is very tempting to support the Tuple[...,Any] for the
param key. There is much broader downstream compatibility in keeping this as a
dictionary / key-value mapping.
"""
tag: Literal["request"]
correlation_id: str
context: Dict[str, Any]
trace_id: Optional[str]
service: str
method: str
params: Dict[str, Any]
[docs]
class CommandPayloadDict(TypedDict):
"""Type[CommandPayloadDict]: represents a command that is sent to a service:
A command has no response, it is acked immediately by the transport layer. The
originator of the command has now knowledge of the execution of the command. If
a response is required, a request should be used.
A command is useful when used as a type of directed event.
REMINDER FOR FUTURE: It is very tempting to support the Tuple[...,Any] for the
param key. There is much broader downstream compatibility in keeping this as a
dictionary / key-value mapping.
"""
tag: Literal["command"]
correlation_id: str
context: Dict[str, Any]
trace_id: Optional[str]
service: str
method: str
params: Dict[str, Any]
[docs]
class EventPayloadDict(TypedDict):
"""Type[EventPayloadDict]: represents an event that is published to a topic:
:target: is currently provided here as an aid for the implementation, there are use cases
where events are targeted to a specified audience or list or targets. In the NuroPb paradigm,
targets could be individual users or other services. A service would represent the service
as a whole, NOT any individual instance of that service.
Reference the notes on EventType
"""
tag: Literal["event"]
correlation_id: str
context: Dict[str, Any]
trace_id: Optional[str]
topic: str
event: Any
target: Optional[List[Any]]
[docs]
class ResponsePayloadDict(TypedDict):
"""Type[ResponsePayloadDict]: represents a response to a request:"""
tag: Literal["response"]
correlation_id: str
context: Dict[str, Any]
trace_id: Optional[str]
result: Any
error: Optional[Dict[str, Any]]
warning: Optional[str]
reply_to: str
PayloadDict = Union[
ResponsePayloadDict, RequestPayloadDict, CommandPayloadDict, EventPayloadDict
]
ServicePayloadTypes = Union[ResponsePayloadDict, CommandPayloadDict, EventPayloadDict]
ResponsePayloadTypes = Union[ResponsePayloadDict, EventPayloadDict]
[docs]
class TransportServicePayload(TypedDict):
"""Type[TransportServicePayload]: represents valid service instruction encoded payload.
Depending on the transport implementation, there wire encoding and serialization may
be different, and some of the fields may be in the body or header of the message.
"""
nuropb_protocol: str # nuropb defined and validated
correlation_id: str # nuropb defined
trace_id: Optional[str] # implementation defined
ttl: Optional[int] # time to live in milliseconds
nuropb_type: NuropbMessageType
nuropb_payload: Dict[str, Any] # ServicePayloadTypes
[docs]
class TransportRespondPayload(TypedDict):
"""Type[TransportRespondPayload]: represents valid service response message,
valid nuropb encoded payload types are ResponsePayloadDict, and EventPayloadDict
"""
nuropb_protocol: str # nuropb defined and validated
correlation_id: str # nuropb defined
trace_id: Optional[str] # implementation defined
ttl: Optional[int]
nuropb_type: NuropbMessageType
nuropb_payload: ResponsePayloadTypes
ResultFutureResponsePayload = asyncio.Future[ResponsePayloadDict]
ResultFutureAny = asyncio.Future[Any]
AcknowledgeAction = Literal["ack", "nack", "reject"]
AcknowledgeCallbackFunction = Callable[[AcknowledgeAction], None]
""" AcknowledgeCallbackFunction: represents a callable with the inputs:
- action: AcknowledgeAction # one of "ack", "nack", "reject"
"""
MessageCompleteFunction = Callable[
[List[TransportRespondPayload], AcknowledgeAction], None
]
""" MessageCompleteFunction: represents a callable with the inputs:
- response_messages: List[TransportRespondPayload] # the responses to be sent
- acknowledge_action: AcknowledgeAction # one of "ack", "nack", "reject"
"""
MessageCallbackFunction = Callable[
[TransportServicePayload, MessageCompleteFunction, Dict[str, Any]], None
]
""" MessageCallbackFunction: represents a callable with the inputs:
- message: TransportServicePayload # the decoded message
- message_complete: Optional[AcknowledgeCallbackFunction] # a function that is called to acknowledge the message
- metadata: Dict[str: Any] # the context of the message
"""
ConnectionCallbackFunction = Callable[[Type["NuropbInterface"], str, str], None]
""" ConnectionCallbackFunction: represents a callable with the inputs:
- instance: type of NuropbInterface
- status: str # the status of the connection (connected, disconnected)
- reason: str # the reason for the connection status change
"""
[docs]
class NuropbException(Exception):
"""NuropbException: represents a base exception for all exceptions raised by the nuropb API
although the input parameters are optional, it is recommended that the message is set to a
meaningful value and the nuropb_message is set to the values that were present when the
exception was raised.
"""
description: str
payload: PayloadDict | TransportServicePayload | TransportRespondPayload | Dict[
str, Any
] | None
exception: BaseException | None
def __init__(
self,
description: Optional[str] = None,
payload: Optional[PayloadDict] = None,
exception: Optional[BaseException] = None,
):
if description is None:
description = (
f" {exception}"
if exception is not None
else f"{self.__class__.__name__}"
)
super().__init__(description)
self.description = description
self.payload = payload
self.exception = exception
[docs]
def to_dict(self) -> Dict[str, Any]:
underlying_exception = str(self.exception) if self.exception else str(self)
description = self.description if self.description else underlying_exception
return {
"error": self.__class__.__name__,
"description": description,
}
[docs]
class NuropbTimeoutError(NuropbException):
"""NuropbTimeoutError: represents an error that occurred when a timeout was reached."""
[docs]
class NuropbTransportError(NuropbException):
"""NuropbTransportError: represents an error that inside the plumbing."""
_close_connection: bool
def __init__(
self,
description: Optional[str] = None,
payload: Optional[PayloadDict] = None,
exception: Optional[BaseException] = None,
close_connection: bool = False,
):
super().__init__(
description=description,
payload=payload,
exception=exception,
)
self._close_connection = close_connection
@property
def close_connection(self) -> bool:
"""close_connection: returns True if the connection should be closed"""
return self._close_connection
[docs]
class NuropbMessageError(NuropbException):
"""NuropbMessageError: represents an error that occurred during the encoding or decoding of a
message.
"""
[docs]
class NuropbHandlingError(NuropbException):
"""NuropbHandlingError: represents an error that occurred during the execution or fulfilment
of a request or command. An error response is returned to the requester.
"""
[docs]
class NuropbDeprecatedError(NuropbHandlingError):
"""NuropbDeprecatedError: represents an error that occurred during the execution or fulfilment
of a request, command or event topic that has been marked deprecated.
"""
[docs]
class NuropbValidationError(NuropbException):
"""NuropbValidationError: represents an error that occurred during the validation of a
request or command. An error response is returned to the requester.
An error response is returned to the requester ONLY for requests and commands.
Events will be rejected with a NACK with requeue=False.
"""
[docs]
class NuropbAuthenticationError(NuropbException):
"""NuropbAuthenticationError: when this exception is raised, the transport layer will ACK the
message and return an authentication error response to the requester.
This exception occurs whe the identity of the requester can not be validated. for example
an unknown, invalid or expired user identifier or auth token.
In most cases, the requester will not be able to recover from this error and will need provide
valid credentials and retry the request. The approach to this retry outside the scope of the
nuropb API.
"""
[docs]
class NuropbAuthorizationError(NuropbException):
"""NuropbAuthorizationError: when this exception is raised, the transport layer will ACK the
message and return an authorization error response to the requester.
This exception occurs whe the requester does not have the required privileges to perform the
requested action of either a request or command.
In most cases, the requester will not be able to recover from this error and will need provide
valid credentials and retry the request. The approach to this retry outside the scope of the
nuropb API.
"""
[docs]
class NuropbNotDeliveredError(NuropbException):
"""NuropbNotDeliveredError: when this exception is raised, the transport layer will ACK the
message and return an error response to the requester.
"""
[docs]
class NuropbCallAgainReject(NuropbException):
"""NuropbCallAgainReject: when this exception is raised, the transport layer will REJECT
the message
To prevent accidental use of the redelivered parameter and to ensure system predictability
on the Call Again feature, messages are only allowed to be redelivered once and only once.
To this end all messages that have redelivered == True will be rejected.
"""
[docs]
class NuropbCallAgain(NuropbException):
"""NuropbCallAgain: when this exception is raised, the transport layer will NACK the message
and schedule it to be redelivered. The delay is determined by the transport layer or message
broker. A call again will result in a forced repeated call of the original message, with the
same correlation_id and trace_id.
The call again "feature" is ignored for event service messages and response messages, as
these are acked in all cases. The call again feature by implication is only supported for
request and command messages.
WARNING: with request messages, if a response has been returned, then this pattern
SHOULD NOT be used. The requester will receive the same response again, which will be
ignored as an unpaired response. if the underlying service method has no idempotence
guarantees, the service could end up in an inconsistent state.
"""
[docs]
class NuropbSuccess(NuropbException):
"""NuropbSuccessError: when this exception is raised, the transport layer will ACK the message
and return a success response if service encoded payload is a 'request'. This is useful when the request
is a command or event and is executed asynchronously.
There are some use cases where the service may want to return a success response irrespective
to the handling of the request.
A useful example is to short circuit processing when an outcome can be predetermined from the
inputs alone. For end to end request-response consistency, this class must be instantiated with
ResponsePayloadDict that contains a result consistent with the method and inputs provided.
Another use case is for the transmission of events raised during the execution of an event,
command or request. Events will only be sent to the transports layer after the successful
processing of a service message.
"""
result: Any
payload: ResponsePayloadDict | None
events: List[EventType] = []
def __init__(
self,
result: Any,
description: Optional[str] = None,
payload: Optional[ResponsePayloadDict] = None,
events: Optional[List[EventType]] = None,
):
super().__init__(
description=description,
payload=payload,
exception=None,
)
self.result = result
self.payload = payload
self.events = [] if events is None else events
[docs]
class NuropbInterface(ABC):
"""NuropbInterface: represents the interface that must be implemented by a nuropb API implementation"""
_service_name: str
_instance_id: str
_service_instance: object
@property
def service_name(self) -> str:
"""service_name: returns the service name"""
return self._service_name
@property
def instance_id(self) -> str:
"""instance_id: returns the instance id"""
return self._instance_id
[docs]
async def connect(self) -> None:
"""connect: waits for the underlying transport to connect, an exception is raised if the connection fails
to be established
:return: None
"""
raise NotImplementedError() # pragma: no cover
[docs]
async def disconnect(self) -> None:
"""disconnect: disconnects from the underlying transport
:return: None
"""
raise NotImplementedError() # pragma: no cover
@property
def connected(self) -> bool:
"""connected: returns the connection status of the underlying transport
:return: bool
"""
raise NotImplementedError() # pragma: no cover
@property
def is_leader(self) -> bool:
"""is_leader: returns the leader status of the service instance
:return: bool
"""
raise NotImplementedError() # pragma: no cover
[docs]
def receive_transport_message(
self,
service_message: TransportServicePayload,
message_complete_callback: MessageCompleteFunction,
metadata: Dict[str, Any],
) -> None:
"""handle_message: does the processing of a NuroPb message received from the transport
layer.
All response, request, command and event messages received from the transport layer are
handled here.
For failures service messages are handled, other than for events, a response including
details of the error is returned to the flow originator.
:param service_message: TransportServicePayload
:param message_complete_callback: MessageCompleteFunction
:param metadata: Dict[str, Any] - metric gathering information
:return: None
"""
raise NotImplementedError() # pragma: no cover
[docs]
async def request(
self,
service: str,
method: str,
params: Dict[str, Any],
context: Dict[str, Any],
ttl: Optional[int] = None,
trace_id: Optional[str] = None,
rpc_response: bool = True,
) -> Union[ResponsePayloadDict, Any]:
"""request: sends a request to the target service and waits for a response. It is up to the
implementation to manage message idempotency and message delivery guarantees.
:param service: the service name
:param method: the method name
:param params: the method arguments, these must be easily serializable to JSON
:param context: additional information that represent the context in which the request is executed.
The must be easily serializable to JSON.
:param ttl: the time to live of the request in milliseconds. After this time and dependent on the
state and underlying transport, it will not be consumed by the target service and
should be assumed by the requester to have failed with an undetermined state.
:param trace_id: an identifier to trace the request over the network (e.g. uuid4 hex string)
:param rpc_response: if True (default), the actual response of the RPC call is returned and where
there was an error, that is raised as an exception. Where rpc_response is a ResponsePayloadDict,
it is returned.
:return: ResponsePayloadDict
"""
raise NotImplementedError() # pragma: no cover
[docs]
def command(
self,
service: str,
method: str,
params: Dict[str, Any],
context: Dict[str, Any],
ttl: Optional[int] = None,
trace_id: Optional[str] = None,
) -> None:
"""command: sends a command to the target service. It is up to the implementation to manage message
idempotency and message delivery guarantees.
any response from the target service is ignored.
:param service: the service name
:param method: the method name
:param params: the method arguments, these must be easily serializable to JSON
:param context: additional information that represent the context in which the request is executed.
The must be easily serializable to JSON.
:param ttl: the time to live of the request in milliseconds. After this time and dependent on the
underlying transport, it will not be consumed by the target
or
assumed by the requester to have failed with an undetermined state.
:param trace_id: an identifier to trace the request over the network (e.g. uuid4 hex string)
:return: None
"""
raise NotImplementedError() # pragma: no cover
[docs]
def publish_event(
self,
topic: str,
event: Any,
context: Dict[str, Any],
trace_id: Optional[str] = None,
) -> None:
"""publish_event: publishes an event to the transport layer. the event sender should not have
any open transaction that is waiting for a response from this message. It is up to the
implementation to manage message idempotency and message delivery guarantees.
:param topic: the topic to publish to
:param event: the message to publish, must be easily serializable to JSON
:param context: additional information that represent the context in which the event is executed.
The must be easily serializable to JSON.
:param trace_id: an identifier to trace the request over the network (e.g. uuid4 hex string)
:return: None
"""
raise NotImplementedError() # pragma: no cover