Quickstart
The best way to get started is to look through the examples. These quick fire steps are the minimum to get going.:
Install Python >= 3.10
An accessible RabbitMQ >= 3.8.0 + Management Plugin
Install the nuropb package,
pip install nuropb
Run this code block to see a client and service running in the same Python module.
import logging
from typing import Any, Dict
from uuid import uuid4
import asyncio
from nuropb.contexts.context_manager import NuropbContextManager
from nuropb.contexts.context_manager_decorator import nuropb_context
from nuropb.contexts.describe import publish_to_mesh
from nuropb.rmq_api import RMQAPI
logger = logging.getLogger("nuropb-all-in-one")
def get_claims_from_token(bearer_token: str) -> Dict[str, Any] | None:
""" This is a stub for the required implementation of validating and decoding the bearer token
"""
_ = bearer_token
return {
"sub": "test_user",
"user_id": "test_user",
"scope": "openid, profile",
"roles": "user, admin",
}
class QuickExampleService:
_service_name = "quick-example"
_instance_id = uuid4().hex
@nuropb_context
@publish_to_mesh(authorize_func=get_claims_from_token)
def test_requires_user_claims(self, ctx, **kwargs: Any) -> str:
logger.info("test_requires_user_claims called")
assert isinstance(ctx, NuropbContextManager)
return f"hello {ctx.user_claims['user_id']}"
def test_method(self, param1, param2: Dict[str, Any]) -> Dict[str, Any]:
logger.info("test_method called")
_ = self
return {
"param1": param1,
"param2": param2,
"reply": "response from test_method",
}
async def main():
logging.info("All in one example done")
amqp_url = "amqp://guest:guest@localhost:5672/nuropb-example"
service_instance = QuickExampleService()
transport_settings = {
"rpc_bindings": [service_instance._service_name],
}
service_api = RMQAPI(
service_instance=service_instance,
service_name=service_instance._service_name,
instance_id=service_instance._instance_id,
amqp_url=amqp_url,
transport_settings=transport_settings,
)
await service_api.connect()
logger.info("Service Ready")
client_api = RMQAPI(
amqp_url=amqp_url,
)
await client_api.connect()
logger.info("Client connected")
context = {
"Authorization": "Bearer 1234567890",
}
response = await client_api.request(
service="quick-example",
method="test_requires_user_claims",
params={},
context=context,
)
logger.info(f"Response: {response}")
response = await client_api.request(
service="quick-example",
method="test_method",
params={
"param1": "value1",
"param2": {
"param2a": "value2a",
}
},
context={},
)
logger.info(f"Response: {response}")
await client_api.disconnect()
await service_api.disconnect()
logging.info("All in one example done")
if __name__ == "__main__":
log_format = (
"%(levelname).1s %(asctime)s %(name) -25s %(funcName) "
"-35s %(lineno) -5d: %(message)s"
)
logging.basicConfig(level=logging.INFO, format=log_format)
logging.getLogger("pika").setLevel(logging.WARNING)
logging.getLogger("etcd3").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
asyncio.run(main())