Source code for nyxmon.service_layer.message_bus

import logging
from typing import Callable

from ..domain import Event, Command
from .unit_of_work import UnitOfWork

logger = logging.getLogger(__name__)

Message = Event | Command


[docs] class MessageBus: """ This is a simple implementation of a message bus that can handle messages of type Event and Command using event_handlers and command_handlers. It uses a UnitOfWork to collect new events. """ def __init__( self, uow: UnitOfWork, event_handlers: dict[type[Event], list[Callable]], command_handlers: dict[type[Command], Callable], ): self.uow = uow self.event_handlers = event_handlers self.command_handlers = command_handlers self.queue: list[Message] = []
[docs] def handle(self, message: Message): self.queue.append(message) while self.queue: message = self.queue.pop(0) if isinstance(message, Event): self.handle_event(message) elif isinstance(message, Command): self.handle_command(message) else: raise Exception(f"Cannot handle message of type {message}")
[docs] def handle_event(self, event: Event): for handler in self.event_handlers[type(event)]: try: logger.debug("handling event %s with handler %s", event, handler) handler(event) self.queue.extend(self.uow.collect_new_events()) except Exception: logger.exception("Exception handling event %s", event) continue
[docs] def handle_command(self, command: Command): logger.debug("handling command %s", command) try: handler = self.command_handlers[type(command)] handler(command) self.queue.extend(self.uow.collect_new_events()) except Exception: logger.exception("Exception handling command %s", command) raise