nuropb.service_runner
This module provides the runtime configuration for the RabbitMQ transport.
NuroPb Service leader election
RMQ Exchange and Queue configuration
A service Leader is elected using etcd. The leader is responsible for creating the RMQ Exchange and Queues, and binding the Queues to the Exchange. Due to the additional administrative responsibilities, the leader’s message prefetch size should be smaller relative to the other service instances.
TODO: The service leader is also responsible for monitoring and managing overall service health.
if the service queue is not draining fast enough, the leader will signal for additional instances to be started.
if the service queue is draining too fast, the leader will signal for instances to be stopped.
Connections to RMQ that are initiating service messages with high error rates will be terminated.
The leader will also monitor the dead letter queue and take action if the dead letter queue is growing too fast.
NuroPb services instances wait for the elected leader to signal that the RMQ Exchange and Queues are ready before connecting and starting to consume messages from the RMQ Queues.
Module Contents
Classes
ServiceRunner represents the state of the service configuration. |
|
Data
API
- nuropb.service_runner.logger
None
- nuropb.service_runner.LEADER_KEY
‘/leader’
- nuropb.service_runner.LEASE_TTL
15
- class nuropb.service_runner.ServiceRunner
ServiceRunner represents the state of the service configuration.
- service_name: str
None
service_name: the name of the service.
- leader_id: str
None
service_leader_id: the identifier of the service leader.
- configured: bool
None
rmq_configured: indicates if the RMQ Exchange and Queues have been configured.
- ready: bool
None
rmq_ready: indicates if the RMQ Exchange and Queues are ready to receive messages.
- consume: bool
None
rmq_consume: indicates if all service instance should consume messages or not from the RMQ Queues.
- hw_mark: int
None
trigger for when to start or stop service instances.
- _etc_client: etcd3.Client
None
_etc_client: the etcd3 client used to communicate with the etcd3 server.
- nuropb.service_runner.ContainerRunningState
None
- class nuropb.service_runner.ServiceContainer(rmq_api_url: str, instance: nuropb.rmq_api.RMQAPI, etcd_config: Optional[Dict[str, Any]] = None)
Bases:
nuropb.service_runner.ServiceRunner- _instance: nuropb.rmq_api.RMQAPI
None
- _rmq_api_url: str
None
- _service_name: str
None
- _transport: nuropb.rmq_transport.RMQTransport
None
- _running_state: nuropb.service_runner.ContainerRunningState
None
- _rmq_config_ok: bool
None
- _shutdown: bool
None
- _is_leader: bool
None
- _leader_reference: str
None
- _etcd_config: Dict[str, Any]
None
- _etcd_client: etcd3.Client | None
None
- _etcd_lease: etcd3.stateful.lease.Lease | None
None
- _etcd_watcher: etcd3.stateful.watch.Watcher | None
None
- _etcd_prefix: str
None
- _container_running_future: Awaitable[bool] | None
None
- _container_shutdown_future: Awaitable[bool] | None
None
- property running_state: nuropb.service_runner.ContainerRunningState
running_state: the current running state of the service container.
- async init_etcd(on_startup: bool = True) bool
init_etcd: initialises the etcd client and connection. Not a fan of the threading started here, when defining a watcher and calling runDaemon(). It would be way nicer to have a fully asynchronous etcd3 client. There is also threading used for the lease keepalive.
NOTES:
this function will run until successful etcd connection is established, or until self._shutdown is set to True.
- Parameters:
on_startup – True, if the etcd3 client is being initialised at startup.
- Returns:
None
- nominate_as_leader() None
nominate_as_leader: nominates the service instance as the leader. if first to secure a lease on the leader key, then the instance is the leader. Otherwise, the instance is a follower.
Handling the following scenarios: On Startup: [No transition to leader or from leader required] * Can secure the leader key lease - startup: the first instance to start, no existing entry becomes the leader. * Cannot secure the leader key lease - startup: there is an existing leader entry, all services must wait until key’s lease expires and can then compete to become the leader.
Post Startup: [Possible transition to leader or from leader is required] * No change to self._is_leader and continue as before. - No transition required * Can secure the leader key lease - running: the leader entry is reset to None, followers then compete to become leader.ETCD NOTES: it appears that a lease can only be associated with a key it that key was created with the lease. If the key is created without a lease, then the lease cannot be associated with the key.
- Returns:
None
- update_etcd_service_property(key: str, value: Any) bool
update_etcd_service_property: updates the etcd3 service property.
The use of this method is restricted to the service leader
- Parameters:
key –
value –
- Returns:
- check_and_configure_rmq() None
check_and_configure_rmq: checks that the RMQ Exchange and Queues are correctly configured. If not, then the NuroPb RMQ configuration for self.service_name is applied.
- Returns:
None
- etcd_event_handler(event: etcd3.stateful.watch.Event) None
etc_event_handler: handles the etcd3 events.
- async startup_steps() None
startup_steps: the startup steps for the service container.
wait for the leader to be elected.
check and configure the RMQ Exchange and Queues.
connect the service instance to RMQ.
start the etcd lease refresh loop.
- Returns:
None
- async start() bool
Starts the etcd client if configured
Startup Steps:
Leader election
Configures the brokers nuropb service mesh configuration if not done
Starts the container service instance.
- Returns:
None
- async stop() None
stop: stops the container service instance.
primary entry point to stop the service container.
- Returns:
None