nuropb.service_runner

This module provides the runtime configuration for the RabbitMQ transport.

  • NuroPb Service leader election

  • RMQ Exchange and Queue configuration

A service leader is elected using etcd. The leader is responsible for creating the RMQ Exchange and Queues, and binding the Queues to the Exchange. Because of these additional administrative responsibilities, the leader’s message prefetch size should be smaller than that of the other service instances.

TODO: The service leader is also responsible for monitoring and managing overall service health.

  • If the service queue is not draining fast enough, the leader will signal for additional instances to be started.

  • If the service queue is draining too fast, the leader will signal for instances to be stopped.

  • Connections to RMQ that are initiating service messages with high error rates will be terminated.

  • The leader will also monitor the dead letter queue and take action if the dead letter queue is growing too fast.
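
Because the monitoring behaviour above is still a TODO, the following is only a sketch of how a start/stop decision might look. It uses queue depth as a proxy for drain rate, which is an assumption. The function name scaling_signal and the lw_mark parameter are hypothetical and not part of nuropb; only hw_mark appears in this module.

```python
def scaling_signal(queue_depth: int, hw_mark: int, lw_mark: int = 0) -> str:
    """Return "start", "stop" or "hold" for the observed queue depth.

    Hypothetical helper: hw_mark is the high threshold, lw_mark the low one.
    """
    if lw_mark > hw_mark:
        raise ValueError("lw_mark must not exceed hw_mark")
    if queue_depth > hw_mark:
        return "start"  # queue is backing up: ask for more instances
    if queue_depth < lw_mark:
        return "stop"  # queue is nearly empty: instances can be stopped
    return "hold"  # within bounds: no change


if __name__ == "__main__":
    for depth in (120, 50, 5):
        print(depth, scaling_signal(depth, hw_mark=100, lw_mark=10))
```

Keeping a gap between the two thresholds avoids flapping between "start" and "stop" when the depth hovers around a single value.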

NuroPb service instances wait for the elected leader to signal that the RMQ Exchange and Queues are ready before connecting and starting to consume messages from the RMQ Queues.
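
The wait-for-ready behaviour can be illustrated with a small asyncio sketch. Here an asyncio.Event stands in for the readiness signal that the module publishes through etcd, and the leader, follower and run_demo functions are hypothetical names used only for this illustration.

```python
import asyncio
from typing import List


async def leader(ready: asyncio.Event, log: List[str]) -> None:
    # Stand-in for creating the exchange, the queues and their bindings.
    await asyncio.sleep(0.01)
    log.append("leader: exchange and queues configured")
    ready.set()  # signal the waiting instances that the broker is ready


async def follower(name: str, ready: asyncio.Event, log: List[str]) -> None:
    await ready.wait()  # do not connect or consume before the signal
    log.append(f"{name}: consuming")


async def run_demo() -> List[str]:
    ready = asyncio.Event()
    log: List[str] = []
    await asyncio.gather(
        follower("instance-1", ready, log),
        follower("instance-2", ready, log),
        leader(ready, log),
    )
    return log


if __name__ == "__main__":
    for line in asyncio.run(run_demo()):
        print(line)
```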

Module Contents

Classes

ServiceRunner

ServiceRunner represents the state of the service configuration.

ServiceContainer

Data

logger

LEADER_KEY

LEASE_TTL

ContainerRunningState

API

nuropb.service_runner.logger

None

nuropb.service_runner.LEADER_KEY

'/leader'

nuropb.service_runner.LEASE_TTL

15

class nuropb.service_runner.ServiceRunner

ServiceRunner represents the state of the service configuration.

service_name: str

None

service_name: the name of the service.

leader_id: str

None

service_leader_id: the identifier of the service leader.

configured: bool

None

rmq_configured: indicates if the RMQ Exchange and Queues have been configured.

ready: bool

None

rmq_ready: indicates if the RMQ Exchange and Queues are ready to receive messages.

consume: bool

None

rmq_consume: indicates whether all service instances should consume messages from the RMQ Queues.

hw_mark: int

None

trigger for when to start or stop service instances.

_etc_client: etcd3.Client

None

_etc_client: the etcd3 client used to communicate with the etcd3 server.
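
As a rough picture of the state described above, the fields can be mirrored in a dataclass. This is a sketch only: the real ServiceRunner may not be a dataclass, the default values are assumptions, the etcd client field is omitted, and the may_consume helper is hypothetical.

```python
from dataclasses import dataclass


@dataclass
class ServiceRunnerState:
    """Illustrative mirror of the documented ServiceRunner fields."""

    service_name: str
    leader_id: str
    configured: bool = False  # RMQ Exchange and Queues have been configured
    ready: bool = False  # RMQ Exchange and Queues can receive messages
    consume: bool = False  # service instances should consume messages
    hw_mark: int = 0  # trigger for starting or stopping instances

    def may_consume(self) -> bool:
        # Hypothetical helper: consume only once the broker is set up.
        return self.configured and self.ready and self.consume


if __name__ == "__main__":
    state = ServiceRunnerState(service_name="orders", leader_id="instance-1")
    print(state.may_consume())
```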

nuropb.service_runner.ContainerRunningState

None

class nuropb.service_runner.ServiceContainer(rmq_api_url: str, instance: nuropb.rmq_api.RMQAPI, etcd_config: Optional[Dict[str, Any]] = None)

Bases: nuropb.service_runner.ServiceRunner

_instance: nuropb.rmq_api.RMQAPI

None

_rmq_api_url: str

None

_service_name: str

None

_transport: nuropb.rmq_transport.RMQTransport

None

_running_state: nuropb.service_runner.ContainerRunningState

None

_rmq_config_ok: bool

None

_shutdown: bool

None

_is_leader: bool

None

_leader_reference: str

None

_etcd_config: Dict[str, Any]

None

_etcd_client: etcd3.Client | None

None

_etcd_lease: etcd3.stateful.lease.Lease | None

None

_etcd_watcher: etcd3.stateful.watch.Watcher | None

None

_etcd_prefix: str

None

_container_running_future: Awaitable[bool] | None

None

_container_shutdown_future: Awaitable[bool] | None

None

property running_state: nuropb.service_runner.ContainerRunningState

running_state: the current running state of the service container.

async init_etcd(on_startup: bool = True) bool

init_etcd: initialises the etcd client and connection. Defining a watcher and calling runDaemon() starts a background thread, and the lease keepalive also relies on threading. A fully asynchronous etcd3 client would be preferable.

NOTES:

  • this function will run until a successful etcd connection is established, or until self._shutdown is set to True.

Parameters:

on_startup – True, if the etcd3 client is being initialised at startup.

Returns:

None
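
The retry behaviour described in the notes can be sketched as a loop that keeps trying until a connection succeeds or shutdown is requested. The EtcdConnector class, its connect callable and the retry_delay value are hypothetical stand-ins. No real etcd3 calls are made here.

```python
import asyncio
from typing import Callable


class EtcdConnector:
    """Illustrative retry loop; connect() stands in for the etcd client."""

    def __init__(self, connect: Callable[[], bool], retry_delay: float = 0.01) -> None:
        self._connect = connect
        self._retry_delay = retry_delay
        self._shutdown = False
        self.attempts = 0

    async def init_etcd(self) -> bool:
        # Keep trying until connected, or until shutdown is requested.
        while not self._shutdown:
            self.attempts += 1
            try:
                if self._connect():
                    return True
            except ConnectionError:
                pass  # treat as a failed attempt and retry after a pause
            await asyncio.sleep(self._retry_delay)
        return False
```

Sleeping with asyncio.sleep between attempts keeps the event loop free for other tasks while the connection is retried.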

nominate_as_leader() None

nominate_as_leader: nominates the service instance as the leader. If the instance is the first to secure a lease on the leader key, it becomes the leader. Otherwise, the instance is a follower.

Handling the following scenarios:

On Startup: [No transition to leader or from leader required]

  • Can secure the leader key lease - startup: the first instance to start, no existing entry becomes the leader.

  • Cannot secure the leader key lease - startup: there is an existing leader entry, all services must wait until key’s lease expires and can then compete to become the leader.

Post Startup: [Possible transition to leader or from leader is required]

  • No change to self._is_leader and continue as before. - No transition required

  • Can secure the leader key lease - running: the leader entry is reset to None, followers then compete to become leader.

ETCD NOTES: it appears that a lease can only be associated with a key if that key was created with the lease. If the key is created without a lease, then the lease cannot be associated with the key.

Returns:

None
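
The election rule above (the first instance to create the leader key under a lease wins, and others compete again once the lease expires) can be sketched with an in-memory stand-in for etcd. InMemoryLeaseStore and put_if_absent are hypothetical names for this illustration and do not reflect the etcd3 client API. LEADER_KEY and LEASE_TTL reuse the module's documented values.

```python
import time
from typing import Callable, Dict, Optional, Tuple

LEADER_KEY = "/leader"
LEASE_TTL = 15  # seconds


class InMemoryLeaseStore:
    """Hypothetical stand-in for etcd: a key vanishes when its lease expires."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._data: Dict[str, Tuple[str, float]] = {}

    def get(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._data[key]  # lease expired, so the key is gone
            return None
        return value

    def put_if_absent(self, key: str, value: str, ttl: float) -> bool:
        # The key is created together with its lease, and only if unheld.
        if self.get(key) is not None:
            return False
        self._data[key] = (value, self._clock() + ttl)
        return True


def nominate_as_leader(store: InMemoryLeaseStore, instance_id: str) -> bool:
    """Return True if this instance secured the leader key."""
    return store.put_if_absent(LEADER_KEY, instance_id, LEASE_TTL)
```

In this sketch a leader that stops refreshing its lease simply loses the key after LEASE_TTL seconds, at which point any follower can win the next nomination.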

update_etcd_service_property(key: str, value: Any) bool

update_etcd_service_property: updates the etcd3 service property.

The use of this method is restricted to the service leader.

Parameters:
  • key

  • value

Returns:
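
A leader-only guard of the kind described above might look like the following sketch. The PropertyWriter class, the dictionary used in place of etcd, the key layout (prefix plus "/" plus key) and the example prefix are all assumptions made for illustration.

```python
from typing import Any, Dict


class PropertyWriter:
    """Illustrative leader-only writer; a dict stands in for etcd."""

    def __init__(self, is_leader: bool, prefix: str = "/nuropb/example-service") -> None:
        self._is_leader = is_leader
        self._etcd_prefix = prefix  # hypothetical prefix value
        self.store: Dict[str, str] = {}

    def update_etcd_service_property(self, key: str, value: Any) -> bool:
        if not self._is_leader:
            return False  # followers must not write service properties
        self.store[f"{self._etcd_prefix}/{key}"] = str(value)
        return True
```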

check_and_configure_rmq() None

check_and_configure_rmq: checks that the RMQ Exchange and Queues are correctly configured. If not, then the NuroPb RMQ configuration for self.service_name is applied.

Returns:

None

etcd_event_handler(event: etcd3.stateful.watch.Event) None

etcd_event_handler: handles the etcd3 events.

async startup_steps() None

startup_steps: the startup steps for the service container.

  • wait for the leader to be elected.

  • check and configure the RMQ Exchange and Queues.

  • connect the service instance to RMQ.

  • start the etcd lease refresh loop.

Returns:

None

async start() bool

  • Starts the etcd client if configured

  • Startup Steps:

    • Leader election

    • Configures the broker’s NuroPb service mesh configuration if not already done

    • Starts the container service instance.

Returns:

None

async stop() None

stop: stops the container service instance.

  • primary entry point to stop the service container.

Returns:

None