import logging
from types import TracebackType
from typing import Any, Dict, List, Optional, Type
logger = logging.getLogger(__name__)
_test_token_cache: Dict[str, Any] = {}
_test_user_id_cache: Dict[str, Any] = {}
[docs]
class NuropbContextManager:
"""This class is a context manager that's used to manage a transaction's context relating
to an incoming nuropb service message. When a class instance's method is decorated with the
nuropb_context decorator, the context manager is instantiated and injected into the method
as a ctx parameter.
The nuropb context manager is both a sync and async context manager. Events can be added
to the context manager and will be sent to the service mesh when the context manager
successfully exits. If an exception is raised while the context manager is in scope, the
exception is recorded and the transaction in context is considered to have failed. Any
events added to the context manager are discarded.
"""
_suppress_exceptions: bool
_nuropb_payload: Dict[str, Any] | None
_context: Dict[str, Any]
_user_claims: Dict[str, Any] | None
_events: List[Dict[str, Any]]
_exc_type: Type[BaseException] | None
_exec_value: BaseException | None
_exc_tb: TracebackType | None
_started: bool
_done: bool
def __init__(
self, context: Dict[str, Any], suppress_exceptions: Optional[bool] = True
):
if context is None:
raise TypeError("context cannot be None")
self._suppress_exceptions = (
True if suppress_exceptions is None else suppress_exceptions
)
self._nuropb_payload = None
self._context = context
self._user_claims = None
self._events = []
self._exc_type = None
self._exec_value = None
self._exc_tb = None
self._started = False
self._done = False
@property
def context(self) -> Dict[str, Any]:
return self._context
@property
def user_claims(self) -> Dict[str, Any] | None:
return self._user_claims
@user_claims.setter
def user_claims(self, claims: Dict[str, Any] | None) -> None:
if self._user_claims is not None:
raise ValueError("user_claims can only be set once")
self._user_claims = claims
@property
def events(self) -> List[Dict[str, Any]]:
return self._events
@property
def error(self) -> Dict[str, Any] | None:
if self._exc_type is None:
return None
return {
"error": self._exc_type.__name__,
"description": str(self._exec_value),
}
[docs]
def add_event(self, event: Dict[str, Any]) -> None:
"""Add an event to the context manager. The event will be sent to the service mesh when the context manager
exits successfully.
Event format
- "topic": "test_topic",
- "event": "test_event_payload",
- "context": {}
:param event:
:return:
"""
self._events.append(event)
[docs]
def _handle_context_exit(
self,
exc_type: Type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
"""This method is for customising the behaviour when a context manager exits. It has the
same signature as __exit__ or __aexit__.
If an exception was raised while the context manager was running, the exception information
is recorded and content transaction is considered to have failed. If any events were added
to the context manager, they are discarded.
"""
if self._done:
raise RuntimeError("Context manager has already exited")
self._done = True
self._exc_type = exc_type
self._exec_value = exc_value
self._exc_tb = exc_tb
if exc_type is not None:
self._events = []
return self._suppress_exceptions
"""
**** Context Manager sync and async methods ****
"""
[docs]
def __enter__(self) -> Any:
"""This method is called when entering a context manager with a with statement"""
if self._done:
raise RuntimeError("Context manager has already exited")
if self._started:
raise RuntimeError("Context manager has already entered")
self._started = True
return self
[docs]
async def __aenter__(self) -> Any:
return self.__enter__()
[docs]
def __exit__(
self,
exc_type: Type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None:
return self._handle_context_exit(exc_type, exc_value, exc_tb)
[docs]
async def __aexit__(
self,
exc_type: Type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None:
return self.__exit__(exc_type, exc_value, exc_tb)