From b9041e8977ebe82624ea0adaaa17dc2b15c4819a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20G=C3=B3recki?= Date: Thu, 9 Feb 2023 07:03:15 +0100 Subject: [PATCH] external events: wip --- src/seedwork/application/event_dispatcher.py | 12 +- src/seedwork/application/message_broker.py | 22 +++ src/seedwork/application/modules.py | 72 +++++++--- src/seedwork/application/registry.py | 26 ++-- src/seedwork/domain/events.py | 6 +- ..._modules_interacting_via_domain_events.py} | 0 ...ules_interacting_via_integration_events.py | 134 ++++++++++++++++++ 7 files changed, 233 insertions(+), 39 deletions(-) create mode 100644 src/seedwork/application/message_broker.py rename src/seedwork/tests/application/{test_two_modules_interacting.py => test_two_modules_interacting_via_domain_events.py} (100%) create mode 100644 src/seedwork/tests/application/test_two_modules_interacting_via_integration_events.py diff --git a/src/seedwork/application/event_dispatcher.py b/src/seedwork/application/event_dispatcher.py index 88f830b..e1a9496 100644 --- a/src/seedwork/application/event_dispatcher.py +++ b/src/seedwork/application/event_dispatcher.py @@ -1,14 +1,16 @@ import abc from collections import defaultdict -from seedwork.domain.events import Event +from seedwork.domain.events import SystemEvent class EventDispatcher(metaclass=abc.ABCMeta): """An interface for a generic event dispatcher""" @abc.abstractmethod - def add_event_handler(self, event_class: type[Event], event_handler: callable): + def add_event_handler( + self, event_class: type[SystemEvent], event_handler: callable + ): raise NotImplementedError() @abc.abstractmethod @@ -20,10 +22,12 @@ class InMemoryEventDispatcher(EventDispatcher): def __init__(self): self._handlers = defaultdict(set) - def add_event_handler(self, event_class: type[Event], event_handler: callable): + def add_event_handler( + self, event_class: type[SystemEvent], event_handler: callable + ): self._handlers[event_class].add(event_handler) - def dispatch(self, event: type[Event]): + def dispatch(self, event: type[SystemEvent]): event_class = type(event) for event_handler in self._handlers[event_class]: event_handler(event) diff --git a/src/seedwork/application/message_broker.py b/src/seedwork/application/message_broker.py new file mode 100644 index 0000000..8b65371 --- /dev/null +++ b/src/seedwork/application/message_broker.py @@ -0,0 +1,22 @@ +class Inbox: + ... + + +class Outbox: + ... + + +class MessageBroker: + ... + + +class InMemoryInbox(Inbox): + ... + + +class InMemoryOutbox(Outbox): + ... + + +class InMemoryMessageBroker(MessageBroker): + ... diff --git a/src/seedwork/application/modules.py b/src/seedwork/application/modules.py index 733a9c1..ee879a6 100644 --- a/src/seedwork/application/modules.py +++ b/src/seedwork/application/modules.py @@ -29,8 +29,8 @@ def handle(module, query_or_command, *args, **kwargs): from sqlalchemy.orm import Session from seedwork.application.decorators import registry as default_registry -from seedwork.domain.events import DomainEvent -from seedwork.domain.repositories import GenericRepository +from seedwork.application.message_broker import Inbox, Outbox +from seedwork.domain.events import DomainEvent, IntegrationEvent from seedwork.infrastructure.request_context import request_context @@ -44,10 +44,9 @@ class UnitOfWork: db_session: Session correlation_id: uuid.UUID - def get_repositories(self): - for attr in self.__dict__.values(): - if isinstance(attr, GenericRepository): - yield attr + def handler_arguments(self) -> dict: + """Return arguments that are injectable to command/query handlers""" + return self.__dict__.items() class BusinessModule: @@ -67,7 +66,13 @@ class BusinessModule: event_handlers = () registry = default_registry - def __init__(self, domain_event_dispatcher: type[EventDispatcher], **kwargs): + def __init__( + self, + domain_event_dispatcher: EventDispatcher, + inbox: Inbox = None, + outbox: Outbox = None, + **kwargs, + ): self._uow: ContextVar[UnitOfWork] = ContextVar("_uow", default=None) self.init_kwargs = kwargs self._domain_event_dispatcher = domain_event_dispatcher @@ -78,12 +83,19 @@ def register_event_handlers(self): if self._domain_event_dispatcher is None: return - for event_class in self.get_handleable_domain_events(): - self._domain_event_dispatcher.add_event_handler( - event_class=event_class, event_handler=self.handle_domain_event - ) - - def get_handleable_domain_events(self) -> list[type[DomainEvent]]: + for event_class in self.get_handleable_events(): + if issubclass(event_class, DomainEvent): + self._domain_event_dispatcher.add_event_handler( + event_class=event_class, event_handler=self.handle_domain_event + ) + elif issubclass(event_class, IntegrationEvent): + self._integration_event_dispatcher.add_event_handler( + event_class=event_class, event_handler=self.handle_integration_event + ) + else: + NotImplementedError(f"Unsupported event class {event_class}") + + def get_handleable_events(self) -> list[type[DomainEvent]]: """Returns a list of domain event classes that this module is capable of handling""" handled_event_types = set() for handler in self.event_handlers: @@ -134,12 +146,13 @@ def end_unit_of_work(self, uow): def execute_command(self, command): """Module entrypoint. Use it to change the state of the module by passing a command object""" command_class = type(command) + arguments = self.uow.handler_arguments() assert ( command_class in self.supported_commands ), f"{command_class} is not included in {type(self).__name__}.supported_commands" handler = self.registry.get_command_handler_for(command_class) - kwarg_params = self.registry.get_command_handler_parameters_for(command_class) - kwargs = self.resolve_handler_kwargs(kwarg_params) + handler_params = self.registry.get_command_handler_parameters_for(command_class) + kwargs = self.resolve_handler_kwargs(handler_params, arguments) command_result = handler(command, **kwargs) if command_result.is_success(): self.publish_domain_events(command_result.events) @@ -148,12 +161,13 @@ def execute_command(self, command): def execute_query(self, query): """Module entrypoint. Use it to read the state of the module by passing a query object""" query_class = type(query) + arguments = self.uow.handler_arguments() assert ( query_class in self.supported_queries ), f"{query_class} is not included in {type(self).__name__}.supported_queries" handler = self.registry.get_query_handler_for(query_class) - kwarg_params = self.registry.get_query_handler_parameters_for(query_class) - kwargs = self.resolve_handler_kwargs(kwarg_params) + handler_params = self.registry.get_query_handler_parameters_for(query_class) + kwargs = self.resolve_handler_kwargs(handler_params, arguments) return handler(query, **kwargs) @property @@ -166,19 +180,23 @@ def uow(self) -> UnitOfWork: ) return uow - def resolve_handler_kwargs(self, kwarg_params) -> dict: + def resolve_handler_kwargs(self, handler_params: dict, arguments: dict) -> dict: """Match kwargs required by a function to attributes available in a unit of work""" kwargs = {} - for param_name, param_type in kwarg_params.items(): - for attr_name, attr_value in self.uow.__dict__.items(): - if attr_name == param_name or isinstance(attr_value, param_type): - kwargs[param_name] = attr_value + for param_name, param_type in handler_params.items(): + for arg_name, arg_value in arguments: + if arg_name == param_name or isinstance(arg_value, param_type): + kwargs[param_name] = arg_value return kwargs def publish_domain_events(self, events): for event in events: self._domain_event_dispatcher.dispatch(event=event) + def publish_integration_events(self, events): + for event in events: + raise NotImplementedError() + def handle_domain_event(self, event: type[DomainEvent]): """Execute all registered handlers within this module for this event type""" @@ -188,3 +206,13 @@ def handle_domain_event(self, event: type[DomainEvent]): ) if event_class is type(event): handler(event, self) + + def handle_integration_event(self, event: type[IntegrationEvent]): + """Execute all registered handlers within this module for this event type""" + + for handler in self.event_handlers: + event_class, handler_parameters = self.registry.inspect_handler_parameters( + handler + ) + if event_class is type(event): + handler(event, self) diff --git a/src/seedwork/application/registry.py b/src/seedwork/application/registry.py index 31a8b48..cae43a6 100644 --- a/src/seedwork/application/registry.py +++ b/src/seedwork/application/registry.py @@ -8,7 +8,7 @@ from seedwork.application.commands import Command from seedwork.application.queries import Query from seedwork.application.query_handlers import QueryResult -from seedwork.domain.events import DomainEvent +from seedwork.domain.events import DomainEvent, IntegrationEvent, SystemEvent from seedwork.domain.exceptions import BusinessRuleValidationException from seedwork.infrastructure.logging import logger @@ -49,9 +49,9 @@ def get_query_handler_for(self, query_class) -> dict: def get_query_handler_parameters_for(self, query_class) -> Callable: return self.query_handlers[query_class][1].copy() - def register_domain_event_handler( + def register_event_handler( self, - domain_event_class: type[DomainEvent], + event_class: type[SystemEvent], handler: Callable, handler_parameters, ): @@ -126,26 +126,32 @@ def decorator(*args, **kwargs): self.register_query_handler(query_class, decorator, handler_parameters) return decorator - def domain_event_handler(self, fn: Callable): + def _event_handler(self, fn: Callable, event_class): """Domain Event handler decorator""" @functools.wraps(fn) def decorator(*args, **kwargs): - event = find_object_of_class(args, DomainEvent) or find_object_of_class( + event = find_object_of_class(args, event_class) or find_object_of_class( kwargs.items(), DomainEvent ) print("handling event", f"{type(event).__module__}.{type(event).__name__}") return fn(*args, **kwargs) - domain_event_class, handler_parameters = self.inspect_handler_parameters(fn) + handler_event_class, handler_parameters = self.inspect_handler_parameters(fn) assert issubclass( - domain_event_class, DomainEvent + handler_event_class, event_class ), "The first parameter must be of type DomainEvent" - self.register_domain_event_handler( - domain_event_class, decorator, handler_parameters - ) + self.register_event_handler(handler_event_class, decorator, handler_parameters) return decorator + def domain_event_handler(self, fn: Callable): + """Domain Event handler decorator""" + return self._event_handler(fn, DomainEvent) + + def integration_event_handler(self, fn: Callable): + """Integration Event handler decorator""" + return self._event_handler(fn, IntegrationEvent) + def find_object_of_class(iterable, cls): for item in iterable: diff --git a/src/seedwork/domain/events.py b/src/seedwork/domain/events.py index e2b9228..e166db3 100644 --- a/src/seedwork/domain/events.py +++ b/src/seedwork/domain/events.py @@ -1,13 +1,13 @@ from pydantic import BaseModel -class Event(BaseModel): +class SystemEvent(BaseModel): pass -class DomainEvent(Event): +class DomainEvent(SystemEvent): pass -class IntegrationEvent(Event): +class IntegrationEvent(SystemEvent): pass diff --git a/src/seedwork/tests/application/test_two_modules_interacting.py b/src/seedwork/tests/application/test_two_modules_interacting_via_domain_events.py similarity index 100% rename from src/seedwork/tests/application/test_two_modules_interacting.py rename to src/seedwork/tests/application/test_two_modules_interacting_via_domain_events.py diff --git a/src/seedwork/tests/application/test_two_modules_interacting_via_integration_events.py b/src/seedwork/tests/application/test_two_modules_interacting_via_integration_events.py new file mode 100644 index 0000000..edbf52b --- /dev/null +++ b/src/seedwork/tests/application/test_two_modules_interacting_via_integration_events.py @@ -0,0 +1,134 @@ +from dataclasses import dataclass + +import pytest + +from seedwork.application.command_handlers import CommandResult +from seedwork.application.commands import Command +from seedwork.application.event_dispatcher import InMemoryEventDispatcher +from seedwork.application.message_broker import ( + InMemoryInbox, + InMemoryMessageBroker, + InMemoryOutbox, +) +from seedwork.application.modules import BusinessModule, UnitOfWork +from seedwork.application.registry import Registry +from seedwork.domain.events import IntegrationEvent + +ping_registry = Registry() +pong_registry = Registry() + +# Common / Contract + + +@dataclass +class CommonUnitOfWork(UnitOfWork): + history: list + + +class PingSent(IntegrationEvent): + message: str + + +class PongSent(IntegrationEvent): + reply: str + + +# Ping Module + + +@dataclass +class SendPing(Command): + message: str + + +@ping_registry.command_handler +def send_ping(command: SendPing, history): + history.append(f"send_ping: message is {command.message}") + return CommandResult.success(payload=None, event=PingSent(message=command.message)) + + +@ping_registry.integration_event_handler +def when_pong_received_sit_and_relax_policy( + event: PongSent, module: type[BusinessModule] +): + module.uow.history.append(f"PongSent") + + +class PingModule(BusinessModule): + registry = ping_registry + unit_of_work_class = CommonUnitOfWork + supported_commands = (SendPing,) + supported_queries = () + event_handlers = (when_pong_received_sit_and_relax_policy,) + + def get_unit_of_work_init_kwargs(self): + return dict(history=self.init_kwargs["history"]) + + +# Pong Module + + +@dataclass +class SendPong(Command): + message: str + reply: str + + +@pong_registry.command_handler +def send_pong(command: SendPong, history): + history.append(f"send_pong: reply to {command.message}i s {command.reply}") + return CommandResult.success(payload=None, event=PongSent()) + + +@pong_registry.integration_event_handler +def when_ping_received_send_pong_policy(event: PingSent, module: type[BusinessModule]): + module.uow.history.append(f"PingSent") + module.execute_command(SendPong()) + + +class PongModule(BusinessModule): + registry = pong_registry + unit_of_work_class = CommonUnitOfWork + supported_commands = (SendPong,) + supported_queries = () + event_handlers = (when_ping_received_send_pong_policy,) + + +@pytest.mark.integration +def test_handing_of_integration_event_across_modules(): + """ + In this scenario, a domain event published by ping module, + cannot be handled by pong module, because these modules + have separate unit of works. This is by design. + If such inter module communication is requires, is should be carried out + by integration events. + """ + history = [] + domain_event_dispatcher = InMemoryEventDispatcher() + integration_event_dispatcher = InMemoryEventDispatcher() + + ping_module = PingModule( + domain_event_dispatcher=domain_event_dispatcher, + inbox=InMemoryInbox(), + outbox=InMemoryOutbox(), + history=history, + ) + pong_module = PongModule( + domain_event_dispatcher=domain_event_dispatcher, + inbox=InMemoryInbox(), + outbox=InMemoryOutbox(), + history=history, + ) + + message_broker = InMemoryMessageBroker(modules=[ping_module, pong_module]) + + with ping_module.unit_of_work(): + ping_module.execute_command(SendPing(message="Greetings from Ping module")) + + assert ping_module.outbox[0] == PingSent() + assert len(pong_module.inbox) == 0 + + message_broker.deliver_messages() + + assert len(ping_module.outbox) == 0 + assert pong_module.inbox[0] == PingSent()