From 3f1a3e75f26ed2cc50868e219846c06756fa7210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20G=C3=B3recki?= Date: Fri, 21 Jul 2023 13:14:11 +0200 Subject: [PATCH 1/4] add outbox, add outbox middleware to store integration events --- src/config/container.py | 22 ++++++--- .../bidding/application/event/__init__.py | 6 +-- .../bidding/application/event/eventual.py | 11 +++++ .../application/event/notify_outbid_winner.py | 8 ---- ...shed_start_auction.py => transactional.py} | 24 +++++++++- src/modules/catalog/application/__init__.py | 1 + .../catalog/application/event/__init__.py | 1 + .../do_nothing_when_listing_published.py | 2 + .../application/test_delete_listing_draft.py | 4 +- src/seedwork/application/__init__.py | 21 +++++++-- src/seedwork/application/events.py | 4 +- src/seedwork/application/inbox_outbox.py | 16 ++++++- src/seedwork/domain/events.py | 8 +++- src/seedwork/domain/value_objects.py | 5 ++ .../infrastructure/postgres_outbox.py | 47 +++++++++++++++++++ .../tests/application/test_application.py | 8 ++-- src/seedwork/tests/application/test_utils.py | 4 +- 17 files changed, 158 insertions(+), 34 deletions(-) create mode 100644 src/modules/bidding/application/event/eventual.py delete mode 100644 src/modules/bidding/application/event/notify_outbid_winner.py rename src/modules/bidding/application/event/{when_listing_is_published_start_auction.py => transactional.py} (51%) create mode 100644 src/seedwork/infrastructure/postgres_outbox.py diff --git a/src/config/container.py b/src/config/container.py index cad67bb..dba2b4f 100644 --- a/src/config/container.py +++ b/src/config/container.py @@ -1,6 +1,7 @@ import inspect import json import uuid +from logging import Logger from typing import Optional from uuid import UUID @@ -22,8 +23,8 @@ from modules.iam.application.services import IamService from modules.iam.infrastructure.repository import PostgresJsonUserRepository from seedwork.application import Application, DependencyProvider -from seedwork.application.inbox_outbox import InMemoryOutbox from seedwork.infrastructure.logging import logger +from seedwork.infrastructure.postgres_outbox import Outbox, PostgresOutbox def resolve_provider_by_type(container: Container, cls: type) -> Optional[Provider]: @@ -114,6 +115,7 @@ def on_enter_transaction_context(ctx): @application.on_exit_transaction_context def on_exit_transaction_context(ctx, exc_type, exc_val, exc_tb): session = ctx.dependency_provider.get_dependency("db_session") + if exc_type: session.rollback() logger.debug(f"rollback due to {exc_type}") @@ -125,7 +127,7 @@ def on_exit_transaction_context(ctx, exc_type, exc_val, exc_tb): logger.correlation_id.set(uuid.UUID(int=0)) @application.transaction_middleware - def logging_middleware(ctx, next, command=None, query=None, event=None): + def logging_middleware(ctx, call_next, command=None, query=None, event=None): if command: prefix = "Executing" task = command @@ -135,13 +137,20 @@ def logging_middleware(ctx, next, command=None, query=None, event=None): elif event: prefix = "Handling" task = event - task = command or query or event session = ctx.dependency_provider.get_dependency("db_session") logger.info(f"{id(session)} {prefix} {task}") - result = next() + result = call_next() logger.info(f"{id(session)} {prefix} completed, result: {result}") return result + @application.transaction_middleware + def outbox_middleware(ctx, call_next, command=None, query=None, event=None): + result = call_next() + outbox = ctx.get_dependency(Outbox) + for event in ctx.collect_integration_events(): + outbox.add(event) + return result + return application @@ -153,9 +162,7 @@ class TransactionContainer(containers.DeclarativeContainer): correlation_id = providers.Dependency(instance_of=UUID) db_session = providers.Dependency(instance_of=Session) - logger = providers.Dependency() - - outbox = providers.Singleton(InMemoryOutbox) + logger = providers.Dependency(instance_of=Logger) catalog_listing_repository = providers.Singleton( CatalogPostgresJsonListingRepository, @@ -173,6 +180,7 @@ class TransactionContainer(containers.DeclarativeContainer): ) iam_service = providers.Singleton(IamService, user_repository=user_repository) + outbox = providers.Singleton(PostgresOutbox, db_session=db_session) class TopLevelContainer(containers.DeclarativeContainer): diff --git a/src/modules/bidding/application/event/__init__.py b/src/modules/bidding/application/event/__init__.py index ff1954c..1b1eba1 100644 --- a/src/modules/bidding/application/event/__init__.py +++ b/src/modules/bidding/application/event/__init__.py @@ -1,4 +1,2 @@ -from .notify_outbid_winner import notify_outbid_winner -from .when_listing_is_published_start_auction import ( - when_listing_is_published_start_auction, -) +from .eventual import * +from .transactional import * diff --git a/src/modules/bidding/application/event/eventual.py b/src/modules/bidding/application/event/eventual.py new file mode 100644 index 0000000..2c36f62 --- /dev/null +++ b/src/modules/bidding/application/event/eventual.py @@ -0,0 +1,11 @@ +from modules.bidding.application import bidding_module +from modules.bidding.application.event.transactional import ( + SendEmailToSellerThatBidWasPlaced, +) + + +@bidding_module.integration_event_handler +def send_email_to_seller_that_bid_was_placed(event: SendEmailToSellerThatBidWasPlaced): + print("send_email_to_seller_that_bid_was_placed") + print(event.listing_id) + print(event.bidder_id) diff --git a/src/modules/bidding/application/event/notify_outbid_winner.py b/src/modules/bidding/application/event/notify_outbid_winner.py deleted file mode 100644 index f6a34f0..0000000 --- a/src/modules/bidding/application/event/notify_outbid_winner.py +++ /dev/null @@ -1,8 +0,0 @@ -from modules.bidding.application import bidding_module -from modules.bidding.domain.events import BidWasPlaced -from seedwork.infrastructure.logging import logger - - -@bidding_module.domain_event_handler -def notify_outbid_winner(event: BidWasPlaced): - logger.info(f"Message from a handler: Listing {event.listing_id} was published") diff --git a/src/modules/bidding/application/event/when_listing_is_published_start_auction.py b/src/modules/bidding/application/event/transactional.py similarity index 51% rename from src/modules/bidding/application/event/when_listing_is_published_start_auction.py rename to src/modules/bidding/application/event/transactional.py index f54bab1..7af77c5 100644 --- a/src/modules/bidding/application/event/when_listing_is_published_start_auction.py +++ b/src/modules/bidding/application/event/transactional.py @@ -2,16 +2,36 @@ from modules.bidding.application import bidding_module from modules.bidding.domain.entities import Listing +from modules.bidding.domain.events import BidWasPlaced from modules.bidding.domain.repositories import ListingRepository from modules.bidding.domain.value_objects import Seller from modules.catalog.domain.events import ListingPublishedEvent -from seedwork.application import EventResult +from seedwork.application.events import IntegrationEvent +from seedwork.domain.value_objects import GenericUUID + + +class SendEmailToSellerThatBidWasPlaced(IntegrationEvent): + listing_id: GenericUUID + bidder_id: GenericUUID + + +@bidding_module.domain_event_handler +def notify_outbid_winner(event: BidWasPlaced, logger): + logger.info(f"Message from a handler: Listing {event.listing_id} was published") + + +@bidding_module.domain_event_handler +def notify_seller_of_new_bid(event: BidWasPlaced, logger): + logger.info("New bid was placed") + return SendEmailToSellerThatBidWasPlaced( + listing_id=event.listing_id, bidder_id=event.bidder_id + ) @bidding_module.domain_event_handler def when_listing_is_published_start_auction( event: ListingPublishedEvent, listing_repository: ListingRepository -) -> EventResult: +): listing = Listing( id=event.listing_id, seller=Seller(id=event.seller_id), diff --git a/src/modules/catalog/application/__init__.py b/src/modules/catalog/application/__init__.py index 07c61da..a04f734 100644 --- a/src/modules/catalog/application/__init__.py +++ b/src/modules/catalog/application/__init__.py @@ -3,3 +3,4 @@ catalog_module = ApplicationModule("catalog") catalog_module.import_from("modules.catalog.application.command") catalog_module.import_from("modules.catalog.application.query") +catalog_module.import_from("modules.catalog.application.event") diff --git a/src/modules/catalog/application/event/__init__.py b/src/modules/catalog/application/event/__init__.py index e69de29..4d7ab7d 100644 --- a/src/modules/catalog/application/event/__init__.py +++ b/src/modules/catalog/application/event/__init__.py @@ -0,0 +1 @@ +from .do_nothing_when_listing_published import do_nothing_when_listing_published diff --git a/src/modules/catalog/application/event/do_nothing_when_listing_published.py b/src/modules/catalog/application/event/do_nothing_when_listing_published.py index 9d8d1e1..40dc059 100644 --- a/src/modules/catalog/application/event/do_nothing_when_listing_published.py +++ b/src/modules/catalog/application/event/do_nothing_when_listing_published.py @@ -1,6 +1,8 @@ +from modules.catalog.application import catalog_module from modules.catalog.domain.events import ListingPublishedEvent from seedwork.infrastructure.logging import logger +@catalog_module.domain_event_handler def do_nothing_when_listing_published(event: ListingPublishedEvent): logger.info(f"Message from a handler: Listing {event.listing_id} was published") diff --git a/src/modules/catalog/tests/application/test_delete_listing_draft.py b/src/modules/catalog/tests/application/test_delete_listing_draft.py index 4e96b79..ae59047 100644 --- a/src/modules/catalog/tests/application/test_delete_listing_draft.py +++ b/src/modules/catalog/tests/application/test_delete_listing_draft.py @@ -38,7 +38,9 @@ def test_delete_listing_draft(): # assert assert result.is_success() - assert result.events == [ListingDraftDeletedEvent(listing_id=listing.id)] + assert len(result.events) == 1 + assert result.events[0].__class__ is ListingDraftDeletedEvent + assert result.events[0].listing_id == listing_id @pytest.mark.integration diff --git a/src/seedwork/application/__init__.py b/src/seedwork/application/__init__.py index c059736..589c5ba 100644 --- a/src/seedwork/application/__init__.py +++ b/src/seedwork/application/__init__.py @@ -171,7 +171,7 @@ def execute_command(self, command) -> CommandResult: while len(event_queue) > 0: event = event_queue.pop(0) if isinstance(event, IntegrationEvent): - self.collect_integration_event(event) + self.store_integration_event(event) elif isinstance(event, DomainEvent): event_results = self.handle_domain_event(event) @@ -196,12 +196,21 @@ def handle_domain_event(self, event) -> EventResultSet: event_results.append(event_result) return EventResultSet(event_results) - def collect_integration_event(self, event): + def store_integration_event(self, event): self.integration_events.append(event) + def collect_integration_events(self) -> list[IntegrationEvent]: + integration_events = self.integration_events + self.integration_events = [] + return integration_events + + def get_dependency(self, identifier: Any) -> Any: + """Get a dependency from the dependency provider""" + return self.dependency_provider.get_dependency(identifier) + def get_service(self, service_cls) -> Any: """Get a dependency from the dependency provider""" - return self.dependency_provider.get_dependency(service_cls) + return self.get_dependency(service_cls) def __getitem__(self, item) -> Any: return self.get_service(item) @@ -237,6 +246,12 @@ def domain_event_handler(self, handler_func): self.event_handlers[event_cls].add(handler_func) return handler_func + def integration_event_handler(self, handler_func): + """Event handler decorator""" + event_cls, _ = get_function_arguments(handler_func) + self.event_handlers[event_cls].add(handler_func) + return handler_func + def import_from(self, module_name): importlib.import_module(module_name) diff --git a/src/seedwork/application/events.py b/src/seedwork/application/events.py index e8084c9..8efeb08 100644 --- a/src/seedwork/application/events.py +++ b/src/seedwork/application/events.py @@ -2,7 +2,7 @@ from dataclasses import dataclass, field from typing import Any -from pydantic import BaseModel +from pydantic import BaseModel, Field from seedwork.domain.type_hints import DomainEvent from seedwork.domain.value_objects import GenericUUID @@ -19,6 +19,8 @@ class IntegrationEvent(BaseModel): As a result, integration events are handled asynchronously. """ + event_id: GenericUUID = Field(default_factory=GenericUUID) + @dataclass class EventResult: diff --git a/src/seedwork/application/inbox_outbox.py b/src/seedwork/application/inbox_outbox.py index 9d4ceba..5ef94b7 100644 --- a/src/seedwork/application/inbox_outbox.py +++ b/src/seedwork/application/inbox_outbox.py @@ -1,4 +1,16 @@ +import abc import contextlib +from typing import Optional + +from seedwork.application.events import IntegrationEvent +from seedwork.domain.entities import AggregateRoot + + +class Outbox(abc.ABC): + @abc.abstractmethod + def add(self, event: IntegrationEvent, source: Optional[AggregateRoot] = None): + """Add event to the outbox""" + raise NotImplementedError() class InMemoryInbox: @@ -24,9 +36,9 @@ def should_process_next_event(self): return not self.inbox.is_empty() -class InMemoryOutbox: +class InMemoryOutbox(Outbox): def __init__(self): self.events = [] - def save(self, event): + def add(self, event: IntegrationEvent, source: Optional[AggregateRoot] = None): self.events.append(event) diff --git a/src/seedwork/domain/events.py b/src/seedwork/domain/events.py index 4071fd7..c0072f4 100644 --- a/src/seedwork/domain/events.py +++ b/src/seedwork/domain/events.py @@ -1,4 +1,6 @@ -from pydantic import BaseModel +from pydantic import BaseModel, Field + +from seedwork.domain.value_objects import GenericUUID class DomainEvent(BaseModel): @@ -7,6 +9,10 @@ class DomainEvent(BaseModel): Domain events are synchronous in nature. """ + event_id: GenericUUID = Field(default_factory=GenericUUID) + # correlation_id: GenericUUID + # causation_id: GenericUUID + def __next__(self): yield self diff --git a/src/seedwork/domain/value_objects.py b/src/seedwork/domain/value_objects.py index cb403b2..ce5a2a6 100644 --- a/src/seedwork/domain/value_objects.py +++ b/src/seedwork/domain/value_objects.py @@ -4,6 +4,11 @@ class GenericUUID(uuid.UUID): + def __init__(self, *args, **kwargs): + if not args and not kwargs: + kwargs["int"] = uuid.uuid4().int + super().__init__(*args, **kwargs) + @classmethod def next_id(cls): return cls(int=uuid.uuid4().int) diff --git a/src/seedwork/infrastructure/postgres_outbox.py b/src/seedwork/infrastructure/postgres_outbox.py new file mode 100644 index 0000000..e4f4ceb --- /dev/null +++ b/src/seedwork/infrastructure/postgres_outbox.py @@ -0,0 +1,47 @@ +from datetime import datetime +from typing import Optional + +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import Session +from sqlalchemy.sql.schema import Column +from sqlalchemy.sql.sqltypes import JSON, DateTime, Enum, String + +from seedwork.application.events import IntegrationEvent +from seedwork.application.inbox_outbox import Outbox +from seedwork.domain.entities import AggregateRoot +from seedwork.infrastructure.database import Base + + +class OutboxMessageStatus(str, Enum): + PENDING = "pending" + SENT = "sent" + FAILED = "failed" + + +class OutboxMessage(Base): + __tablename__ = "outbox_messages" + + id = Column(UUID(as_uuid=True), primary_key=True) + event_type = Column(String, nullable=False) + event_data = Column(JSON, nullable=False) + created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + aggregate_id = Column(UUID(as_uuid=True), nullable=True) + aggregate_type = Column(String, nullable=True) + status = Column(String, nullable=False, default=OutboxMessageStatus.PENDING) + + +class PostgresOutbox(Outbox): + def __init__(self, db_session: Session): + self._session = db_session + + def add( + self, event: IntegrationEvent, source: Optional[AggregateRoot] = None + ) -> None: + message = OutboxMessage( + id=event.event_id, + event_type=str(event.__class__), + event_data=event.__dict__, + aggregate_type=type(source).__name__ if source else None, + aggregate_id=source.id if source else None, + ) + self._session.add(message) diff --git a/src/seedwork/tests/application/test_application.py b/src/seedwork/tests/application/test_application.py index 8b58d69..b5a7f68 100644 --- a/src/seedwork/tests/application/test_application.py +++ b/src/seedwork/tests/application/test_application.py @@ -113,14 +113,14 @@ def test_transaction_context_middleware(): app = Application(trace=[]) @app.transaction_middleware - def middleware1(ctx, next, command=None, query=None, event=None): + def middleware1(ctx, call_next, command=None, query=None, event=None): ctx.dependency_provider["trace"].append("middleware1") - return next() + return call_next() @app.transaction_middleware - def middleware1(ctx, next, command=None, query=None, event=None): + def middleware1(ctx, call_next, command=None, query=None, event=None): ctx.dependency_provider["trace"].append("middleware2") - return next() + return call_next() @app.command_handler def handle_ping(command: SendPing): diff --git a/src/seedwork/tests/application/test_utils.py b/src/seedwork/tests/application/test_utils.py index 34ef1c0..bfab939 100644 --- a/src/seedwork/tests/application/test_utils.py +++ b/src/seedwork/tests/application/test_utils.py @@ -12,7 +12,9 @@ def test_successful_command_result_as_event_result(): event_result = as_event_result(command_result) assert event_result.is_success() assert event_result.payload == "foo" - assert event_result.events == [FooEvent()] + assert ( + len(event_result.events) == 1 and event_result.events[0].__class__ is FooEvent + ) assert event_result.errors == [] From a19c6d352e97aa3e1e8eca6ee6182b5d6874d748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20G=C3=B3recki?= Date: Wed, 9 Aug 2023 08:56:05 +0200 Subject: [PATCH 2/4] fix overrides bugs in ctx.call --- src/seedwork/application/__init__.py | 99 +++++++++++++------ .../tests/application/test_application.py | 52 ++++++++++ 2 files changed, 123 insertions(+), 28 deletions(-) diff --git a/src/seedwork/application/__init__.py b/src/seedwork/application/__init__.py index 589c5ba..9d5b714 100644 --- a/src/seedwork/application/__init__.py +++ b/src/seedwork/application/__init__.py @@ -1,6 +1,6 @@ import importlib import inspect -from collections import defaultdict +from collections import defaultdict, OrderedDict from functools import partial from typing import Any, Type, TypeVar @@ -19,11 +19,19 @@ def get_function_arguments(func): handler_signature = inspect.signature(func) kwargs_iterator = iter(handler_signature.parameters.items()) - _, first_param = next(kwargs_iterator) - first_parameter = first_param.annotation + parameters = OrderedDict() + for name, param in kwargs_iterator: + parameters[name] = param.annotation + return parameters + + +def get_handler_arguments(func): + parameters = get_function_arguments(func) + kwargs_iterator = iter(parameters.items()) + _, first_parameter = next(kwargs_iterator) remaining_parameters = {} for name, param in kwargs_iterator: - remaining_parameters[name] = param.annotation + remaining_parameters[name] = param return first_parameter, remaining_parameters @@ -54,33 +62,40 @@ def register_dependency(self, identifier, dependency_instance): def get_dependency(self, identifier): return self.dependencies[identifier] - def _get_arguments(self, func): - return get_function_arguments(func) - - def _resolve_arguments(self, handler_parameters) -> dict: + def _resolve_arguments(self, handler_parameters, overrides) -> dict: """Match handler_parameters with dependencies""" + + def _resolve(identifier, overrides): + if identifier in overrides: + return overrides[identifier] + return self.get_dependency(identifier) + kwargs = {} for param_name, param_type in handler_parameters.items(): + # first, try to resolve by type + if param_type is not inspect._empty: + try: + kwargs[param_name] = _resolve(param_type, overrides) + continue + except (ValueError, KeyError): + pass + # then, try to resolve by name try: - if param_type is inspect._empty: - raise ValueError("No type annotation") - kwargs[param_name] = self.get_dependency(param_type) - continue - except (ValueError, KeyError): - pass - - try: - kwargs[param_name] = self.get_dependency(param_name) + kwargs[param_name] = _resolve(param_name, overrides) continue except (ValueError, KeyError): pass return kwargs - def get_handler_kwargs(self, func, **overrides): - _, handler_parameters = self._get_arguments(func) - kwargs = self._resolve_arguments(handler_parameters) - kwargs.update(**overrides) + def get_function_kwargs(self, func, overrides=None): + func_parameters = get_function_arguments(func) + kwargs = self._resolve_arguments(func_parameters, overrides or {}) + return kwargs + + def get_handler_kwargs(self, func, overrides=None): + _, handler_parameters = get_handler_arguments(func) + kwargs = self._resolve_arguments(handler_parameters, overrides or {}) return kwargs def __getitem__(self, key): @@ -126,6 +141,30 @@ def _wrap_with_middlewares( for middleware in self.app._transaction_middlewares: p = partial(middleware, self, p, command, query, event) return p + + def _get_overrides(self, **kwargs): + overrides = dict(ctx=self) + overrides.update(self.overrides) + overrides.update(kwargs) + + type_match = defaultdict(list) + for name, value in overrides.items(): + type_match[type(value)].append(value) + with_unique_type = dict((k, v[0]) for k, v in type_match.items() if len(v) == 1) + + overrides.update(with_unique_type) + + return overrides + + def call(self, handler_func, **kwargs): + overrides = self._get_overrides(**kwargs) + handler_kwargs = self.dependency_provider.get_function_kwargs( + handler_func, overrides + ) + p = partial(handler_func, **handler_kwargs) + wrapped_handler = self._wrap_with_middlewares(p) + result = wrapped_handler() + return result def execute_query(self, query) -> QueryResult: assert ( @@ -135,7 +174,7 @@ def execute_query(self, query) -> QueryResult: handler_func = self.app.get_query_handler(query) handler_kwargs = self.dependency_provider.get_handler_kwargs( - handler_func, **self.overrides + handler_func, self._get_overrides() ) p = partial(handler_func, query, **handler_kwargs) wrapped_handler = self._wrap_with_middlewares(p, query=query) @@ -153,7 +192,7 @@ def execute_command(self, command) -> CommandResult: handler_func = self.app.get_command_handler(command) handler_kwargs = self.dependency_provider.get_handler_kwargs( - handler_func, **self.overrides + handler_func, self._get_overrides() ) p = partial(handler_func, command, **handler_kwargs) wrapped_handler = self._wrap_with_middlewares(p, command=command) @@ -184,7 +223,7 @@ def handle_domain_event(self, event) -> EventResultSet: event_results = [] for handler_func in self.app.get_event_handlers(event): handler_kwargs = self.dependency_provider.get_handler_kwargs( - handler_func, **self.overrides + handler_func, self._get_overrides() ) p = partial(handler_func, event, **handler_kwargs) wrapped_handler = self._wrap_with_middlewares(p, event=event) @@ -196,6 +235,10 @@ def handle_domain_event(self, event) -> EventResultSet: event_results.append(event_result) return EventResultSet(event_results) + def handle_integration_event(self, event): + # TODO: do we need to handle domain and integration events differently??? + return self.handle_domain_event(event) + def store_integration_event(self, event): self.integration_events.append(event) @@ -230,25 +273,25 @@ def __init__(self, name, version=1.0): def query_handler(self, handler_func): """Query handler decorator""" - query_cls, _ = get_function_arguments(handler_func) + query_cls, _ = get_handler_arguments(handler_func) self.query_handlers[query_cls] = handler_func return handler_func def command_handler(self, handler_func): """Command handler decorator""" - command_cls, _ = get_function_arguments(handler_func) + command_cls, _ = get_handler_arguments(handler_func) self.command_handlers[command_cls] = handler_func return handler_func def domain_event_handler(self, handler_func): """Event handler decorator""" - event_cls, _ = get_function_arguments(handler_func) + event_cls, _ = get_handler_arguments(handler_func) self.event_handlers[event_cls].add(handler_func) return handler_func def integration_event_handler(self, handler_func): """Event handler decorator""" - event_cls, _ = get_function_arguments(handler_func) + event_cls, _ = get_handler_arguments(handler_func) self.event_handlers[event_cls].add(handler_func) return handler_func diff --git a/src/seedwork/tests/application/test_application.py b/src/seedwork/tests/application/test_application.py index b5a7f68..44d9ae1 100644 --- a/src/seedwork/tests/application/test_application.py +++ b/src/seedwork/tests/application/test_application.py @@ -142,3 +142,55 @@ def handle_ping(command: SendPing, missing_dependency): with pytest.raises(TypeError): app.execute_command(SendPing()) + + +@pytest.mark.unit +def test_call_any_function(): + def some_function(foo, bar): + return foo + bar + + app = Application() + + with app.transaction_context(foo=1, bar=0) as ctx: + result = ctx.call(some_function, bar=2) + + assert result == 3 + + +@pytest.mark.unit +def test_call_with_no_dependencies(): + def foo(): + return True + + app = Application() + + with app.transaction_context() as ctx: + result = ctx.call(foo) + + assert result is True + + +@pytest.mark.unit +def test_call_skips_unneeded_dependencies(): + def foo(): + return True + + app = Application() + + with app.transaction_context(bar=1) as ctx: + result = ctx.call(foo) + + assert result is True + + +@pytest.mark.unit +def test_call_injects_self(): + def get_name(ctx): + return ctx.__class__.__name__ + + app = Application() + + with app.transaction_context(foo=1, bar=0) as ctx: + result = ctx.call(get_name) + + assert result == "TransactionContext" \ No newline at end of file From 384d614160b8c6580839782c146f05ef1ead0f07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20G=C3=B3recki?= Date: Fri, 11 Aug 2023 08:59:47 +0200 Subject: [PATCH 3/4] WIP: inbox --- poetry.lock | 752 +++++++++++++++--- pyproject.toml | 7 +- src/config/container.py | 14 + .../bidding/application/event/eventual.py | 6 +- .../application/event/transactional.py | 4 +- .../test_seller_is_notified_of_new_bid.py | 20 + src/seedwork/application/__init__.py | 20 +- src/seedwork/application/commands.py | 3 +- src/seedwork/application/inbox_outbox.py | 155 +++- .../infrastructure/postgres_outbox.py | 32 +- .../tests/application/test_application.py | 16 +- .../tests/application/test_inbox_outbox.py | 26 + src/worker/__init__.py | 36 + 13 files changed, 942 insertions(+), 149 deletions(-) create mode 100644 src/modules/bidding/tests/application/test_seller_is_notified_of_new_bid.py create mode 100644 src/seedwork/tests/application/test_inbox_outbox.py create mode 100644 src/worker/__init__.py diff --git a/poetry.lock b/poetry.lock index 958d601..e006b9b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,3 +1,34 @@ +[[package]] +name = "aiohttp" +version = "3.8.5" +description = "Async http client/server framework (asyncio)" +category = "dev" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +aiosignal = ">=1.1.2" +async-timeout = ">=4.0.0a3,<5.0" +attrs = ">=17.3.0" +charset-normalizer = ">=2.0,<4.0" +frozenlist = ">=1.1.1" +multidict = ">=4.5,<7.0" +yarl = ">=1.0,<2.0" + +[package.extras] +speedups = ["aiodns", "brotli", "cchardet"] + +[[package]] +name = "aiosignal" +version = "1.3.1" +description = "aiosignal: a list of registered asynchronous callbacks" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +frozenlist = ">=1.1.0" + [[package]] name = "alembic" version = "1.8.1" @@ -13,6 +44,17 @@ SQLAlchemy = ">=1.3.0" [package.extras] tz = ["python-dateutil"] +[[package]] +name = "amqp" +version = "5.1.1" +description = "Low-level AMQP client for Python (fork of amqplib)." +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +vine = ">=5.0.0" + [[package]] name = "anyio" version = "3.6.2" @@ -42,26 +84,27 @@ python-versions = ">=3.7" tests = ["pytest", "pytest-asyncio", "mypy (>=0.800)"] [[package]] -name = "atomicwrites" -version = "1.4.1" -description = "Atomic file writes." -category = "main" +name = "async-timeout" +version = "4.0.2" +description = "Timeout context manager for asyncio programs" +category = "dev" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +python-versions = ">=3.6" [[package]] name = "attrs" -version = "22.1.0" +version = "23.1.0" description = "Classes Without Boilerplate" -category = "main" +category = "dev" optional = false -python-versions = ">=3.5" +python-versions = ">=3.7" [package.extras] -dev = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "mypy (>=0.900,!=0.940)", "pytest-mypy-plugins", "zope.interface", "furo", "sphinx", "sphinx-notfound-page", "pre-commit", "cloudpickle"] -docs = ["furo", "sphinx", "zope.interface", "sphinx-notfound-page"] -tests = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "mypy (>=0.900,!=0.940)", "pytest-mypy-plugins", "zope.interface", "cloudpickle"] -tests_no_zope = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "mypy (>=0.900,!=0.940)", "pytest-mypy-plugins", "cloudpickle"] +cov = ["attrs", "coverage[toml] (>=5.3)"] +dev = ["attrs", "pre-commit"] +docs = ["furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier", "zope-interface"] +tests = ["attrs", "zope-interface"] +tests-no-zope = ["cloudpickle", "hypothesis", "mypy (>=1.1.1)", "pympler", "pytest-mypy-plugins", "pytest-xdist", "pytest (>=4.3.0)"] [[package]] name = "bcrypt" @@ -75,35 +118,91 @@ python-versions = ">=3.6" tests = ["pytest (>=3.2.1,!=3.3.0)"] typecheck = ["mypy"] +[[package]] +name = "billiard" +version = "4.1.0" +description = "Python multiprocessing fork with improvements and bugfixes" +category = "main" +optional = false +python-versions = ">=3.7" + [[package]] name = "black" -version = "21.12b0" +version = "23.7.0" description = "The uncompromising code formatter." -category = "main" +category = "dev" optional = false -python-versions = ">=3.6.2" +python-versions = ">=3.8" [package.dependencies] -click = ">=7.1.2" +click = ">=8.0.0" mypy-extensions = ">=0.4.3" -pathspec = ">=0.9.0,<1" +packaging = ">=22.0" +pathspec = ">=0.9.0" platformdirs = ">=2" -tomli = ">=0.2.6,<2.0.0" -typing-extensions = [ - {version = ">=3.10.0.0", markers = "python_version < \"3.10\""}, - {version = "!=3.10.0.1", markers = "python_version >= \"3.10\""}, -] +tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} [package.extras] colorama = ["colorama (>=0.4.3)"] d = ["aiohttp (>=3.7.4)"] jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] -python2 = ["typed-ast (>=1.4.3)"] uvloop = ["uvloop (>=0.15.2)"] +[[package]] +name = "celery" +version = "5.3.1" +description = "Distributed Task Queue." +category = "main" +optional = false +python-versions = ">=3.8" + +[package.dependencies] +billiard = ">=4.1.0,<5.0" +click = ">=8.1.2,<9.0" +click-didyoumean = ">=0.3.0" +click-plugins = ">=1.1.1" +click-repl = ">=0.2.0" +kombu = ">=5.3.1,<6.0" +python-dateutil = ">=2.8.2" +tzdata = ">=2022.7" +vine = ">=5.0.0,<6.0" + +[package.extras] +arangodb = ["pyArango (>=2.0.1)"] +auth = ["cryptography (==41.0.1)"] +azureblockblob = ["azure-storage-blob (>=12.15.0)"] +brotli = ["brotli (>=1.0.0)", "brotlipy (>=0.7.0)"] +cassandra = ["cassandra-driver (>=3.25.0,<4)"] +consul = ["python-consul2 (==0.1.5)"] +cosmosdbsql = ["pydocumentdb (==2.3.5)"] +couchbase = ["couchbase (>=3.0.0)"] +couchdb = ["pycouchdb (==1.14.2)"] +django = ["Django (>=2.2.28)"] +dynamodb = ["boto3 (>=1.26.143)"] +elasticsearch = ["elasticsearch (<8.0)"] +eventlet = ["eventlet (>=0.32.0)"] +gevent = ["gevent (>=1.5.0)"] +librabbitmq = ["librabbitmq (>=2.0.0)"] +memcache = ["pylibmc (==1.6.3)"] +mongodb = ["pymongo[srv] (>=4.0.2)"] +msgpack = ["msgpack (==1.0.5)"] +pymemcache = ["python-memcached (==1.59)"] +pyro = ["pyro4 (==4.82)"] +pytest = ["pytest-celery (==0.0.0)"] +redis = ["redis (>=4.5.2,!=4.5.5)"] +s3 = ["boto3 (>=1.26.143)"] +slmq = ["softlayer-messaging (>=1.0.3)"] +solar = ["ephem (==4.1.4)"] +sqlalchemy = ["sqlalchemy (>=1.4.48,<2.1)"] +sqs = ["boto3 (>=1.26.143)", "kombu[sqs] (>=5.3.0)", "urllib3 (>=1.26.16)", "pycurl (>=7.43.0.5)"] +tblib = ["tblib (>=1.3.0)", "tblib (>=1.5.0)"] +yaml = ["PyYAML (>=3.10)"] +zookeeper = ["kazoo (>=1.3.1)"] +zstd = ["zstandard (==0.21.0)"] + [[package]] name = "certifi" -version = "2022.12.7" +version = "2023.7.22" description = "Python package for providing Mozilla's CA Bundle." category = "main" optional = false @@ -119,26 +218,63 @@ python-versions = ">=3.6.1" [[package]] name = "charset-normalizer" -version = "2.1.1" +version = "3.2.0" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." category = "main" optional = false -python-versions = ">=3.6.0" - -[package.extras] -unicode_backport = ["unicodedata2"] +python-versions = ">=3.7.0" [[package]] name = "click" -version = "8.0.4" +version = "8.1.6" description = "Composable command line interface toolkit" category = "main" optional = false -python-versions = ">=3.6" +python-versions = ">=3.7" [package.dependencies] colorama = {version = "*", markers = "platform_system == \"Windows\""} +[[package]] +name = "click-didyoumean" +version = "0.3.0" +description = "Enables git-like *did-you-mean* feature in click" +category = "main" +optional = false +python-versions = ">=3.6.2,<4.0.0" + +[package.dependencies] +click = ">=7" + +[[package]] +name = "click-plugins" +version = "1.1.1" +description = "An extension module for click to enable registering CLI commands via setuptools entry-points." +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +click = ">=4.0" + +[package.extras] +dev = ["pytest (>=3.6)", "pytest-cov", "wheel", "coveralls"] + +[[package]] +name = "click-repl" +version = "0.3.0" +description = "REPL plugin for Click" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +click = ">=7.0" +prompt-toolkit = ">=3.0.36" + +[package.extras] +testing = ["pytest-cov (>=4.0.0)", "pytest (>=7.2.1)", "tox (>=4.4.3)"] + [[package]] name = "colorama" version = "0.4.6" @@ -188,12 +324,23 @@ yaml = ["pyyaml"] [[package]] name = "distlib" -version = "0.3.6" +version = "0.3.7" description = "Distribution utilities" category = "main" optional = false python-versions = "*" +[[package]] +name = "exceptiongroup" +version = "1.1.2" +description = "Backport of PEP 654 (exception groups)" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.extras] +test = ["pytest (>=6)"] + [[package]] name = "fastapi" version = "0.95.2" @@ -214,15 +361,40 @@ test = ["anyio[trio] (>=3.2.1,<4.0.0)", "black (==23.1.0)", "coverage[toml] (>=6 [[package]] name = "filelock" -version = "3.8.2" +version = "3.12.2" description = "A platform independent file lock." category = "main" optional = false python-versions = ">=3.7" [package.extras] -docs = ["furo (>=2022.9.29)", "sphinx (>=5.3)", "sphinx-autodoc-typehints (>=1.19.5)"] -testing = ["covdefaults (>=2.2.2)", "coverage (>=6.5)", "pytest (>=7.2)", "pytest-cov (>=4)", "pytest-timeout (>=2.1)"] +docs = ["furo (>=2023.5.20)", "sphinx-autodoc-typehints (>=1.23,!=1.23.4)", "sphinx (>=7.0.1)"] +testing = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "diff-cover (>=7.5)", "pytest-cov (>=4.1)", "pytest-mock (>=3.10)", "pytest-timeout (>=2.1)", "pytest (>=7.3.1)"] + +[[package]] +name = "fire" +version = "0.5.0" +description = "A library for automatically generating command line interfaces." +category = "dev" +optional = false +python-versions = "*" + +[package.dependencies] +six = "*" +termcolor = "*" + +[[package]] +name = "flake8" +version = "6.0.0" +description = "the modular source code checker: pep8 pyflakes and co" +category = "dev" +optional = false +python-versions = ">=3.8.1" + +[package.dependencies] +mccabe = ">=0.7.0,<0.8.0" +pycodestyle = ">=2.10.0,<2.11.0" +pyflakes = ">=3.0.0,<3.1.0" [[package]] name = "freezegun" @@ -235,6 +407,36 @@ python-versions = ">=3.6" [package.dependencies] python-dateutil = ">=2.7" +[[package]] +name = "frozenlist" +version = "1.4.0" +description = "A list-like structure which implements collections.abc.MutableSequence" +category = "dev" +optional = false +python-versions = ">=3.8" + +[[package]] +name = "gitdb" +version = "4.0.10" +description = "Git Object Database" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +smmap = ">=3.0.1,<6" + +[[package]] +name = "gitpython" +version = "3.1.32" +description = "GitPython is a Python library used to interact with Git repositories" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +gitdb = ">=4.0.1,<5" + [[package]] name = "greenlet" version = "2.0.1" @@ -314,11 +516,54 @@ python-versions = ">=3.5" [[package]] name = "iniconfig" -version = "1.1.1" -description = "iniconfig: brain-dead simple config-ini parsing" +version = "2.0.0" +description = "brain-dead simple config-ini parsing" +category = "dev" +optional = false +python-versions = ">=3.7" + +[[package]] +name = "isort" +version = "5.12.0" +description = "A Python utility / library to sort Python imports." +category = "dev" +optional = false +python-versions = ">=3.8.0" + +[package.extras] +colors = ["colorama (>=0.4.3)"] +requirements-deprecated-finder = ["pip-api", "pipreqs"] +pipfile-deprecated-finder = ["pip-shims (>=0.5.2)", "pipreqs", "requirementslib"] +plugins = ["setuptools"] + +[[package]] +name = "kombu" +version = "5.3.1" +description = "Messaging library for Python." category = "main" optional = false -python-versions = "*" +python-versions = ">=3.8" + +[package.dependencies] +amqp = ">=5.1.1,<6.0.0" +vine = "*" + +[package.extras] +azureservicebus = ["azure-servicebus (>=7.10.0)"] +azurestoragequeues = ["azure-identity (>=1.12.0)", "azure-storage-queue (>=12.6.0)"] +confluentkafka = ["confluent-kafka (==2.1.1)"] +consul = ["python-consul2"] +librabbitmq = ["librabbitmq (>=2.0.0)"] +mongodb = ["pymongo (>=4.1.1)"] +msgpack = ["msgpack"] +pyro = ["pyro4"] +qpid = ["qpid-python (>=0.26)", "qpid-tools (>=0.26)"] +redis = ["redis (>=4.5.2)"] +slmq = ["softlayer-messaging (>=1.0.3)"] +sqlalchemy = ["sqlalchemy (>=1.4.48,<2.1)"] +sqs = ["boto3 (>=1.26.143)", "urllib3 (>=1.26.16)", "pycurl (>=7.43.0.5)"] +yaml = ["PyYAML (>=3.10)"] +zookeeper = ["kazoo (>=2.8.0)"] [[package]] name = "mako" @@ -344,6 +589,78 @@ category = "main" optional = false python-versions = ">=3.7" +[[package]] +name = "mccabe" +version = "0.7.0" +description = "McCabe checker, plugin for flake8" +category = "dev" +optional = false +python-versions = ">=3.6" + +[[package]] +name = "mentat-ai" +version = "0.1.9" +description = "AI coding assistant on your command line" +category = "dev" +optional = false +python-versions = ">=3.10" + +[package.dependencies] +aiohttp = "3.8.5" +aiosignal = "1.3.1" +async-timeout = "4.0.2" +attrs = "23.1.0" +black = "23.7.0" +certifi = "2023.7.22" +charset-normalizer = "3.2.0" +click = "8.1.6" +exceptiongroup = "1.1.2" +fire = "0.5.0" +flake8 = "6.0.0" +frozenlist = "1.4.0" +gitdb = "4.0.10" +GitPython = "3.1.32" +idna = "3.4" +iniconfig = "2.0.0" +isort = "5.12.0" +mccabe = "0.7.0" +multidict = "6.0.4" +mypy-extensions = "1.0.0" +openai = "0.27.8" +packaging = "23.1" +pathspec = "0.11.2" +platformdirs = "3.10.0" +pluggy = "1.2.0" +prompt-toolkit = "3.0.39" +pycodestyle = "2.10.0" +pyflakes = "3.0.1" +Pygments = "2.15.1" +pytest = "7.4.0" +pytest-mock = "3.11.1" +pytest-repeat = "0.9.1" +pytest-reportlog = "0.4.0" +python-dotenv = "1.0.0" +regex = "2023.6.3" +requests = "2.31.0" +ruff = "0.0.280" +six = "1.16.0" +smmap = "5.0.0" +termcolor = "2.3.0" +tiktoken = "0.4.0" +tomli = "2.0.1" +tqdm = "4.65.0" +urllib3 = "2.0.4" +wcwidth = "0.2.6" +yarl = "1.9.2" + +[[package]] +name = "multidict" +version = "6.0.4" +description = "multidict implementation" +category = "dev" +optional = false +python-versions = ">=3.7" + [[package]] name = "mypy" version = "1.4.1" @@ -379,11 +696,30 @@ category = "main" optional = false python-versions = ">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*" +[[package]] +name = "openai" +version = "0.27.8" +description = "Python client library for the OpenAI API" +category = "dev" +optional = false +python-versions = ">=3.7.1" + +[package.dependencies] +aiohttp = "*" +requests = ">=2.20" +tqdm = "*" + +[package.extras] +datalib = ["numpy", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)", "openpyxl (>=3.0.7)"] +dev = ["black (>=21.6b0,<22.0.0)", "pytest (>=6.0.0,<7.0.0)", "pytest-asyncio", "pytest-mock"] +embeddings = ["scikit-learn (>=1.0.2)", "tenacity (>=8.0.1)", "matplotlib", "plotly", "numpy", "scipy", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)", "openpyxl (>=3.0.7)"] +wandb = ["wandb", "numpy", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)", "openpyxl (>=3.0.7)"] + [[package]] name = "packaging" -version = "22.0" +version = "23.1" description = "Core utilities for Python packages" -category = "main" +category = "dev" optional = false python-versions = ">=3.7" @@ -397,31 +733,31 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] name = "pathspec" -version = "0.10.2" +version = "0.11.2" description = "Utility library for gitignore style pattern matching of file paths." -category = "main" +category = "dev" optional = false python-versions = ">=3.7" [[package]] name = "platformdirs" -version = "2.6.0" +version = "3.10.0" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." category = "main" optional = false python-versions = ">=3.7" [package.extras] -docs = ["furo (>=2022.9.29)", "proselint (>=0.13)", "sphinx-autodoc-typehints (>=1.19.4)", "sphinx (>=5.3)"] -test = ["appdirs (==1.4.4)", "pytest-cov (>=4)", "pytest-mock (>=3.10)", "pytest (>=7.2)"] +docs = ["furo (>=2023.7.26)", "proselint (>=0.13)", "sphinx-autodoc-typehints (>=1.24)", "sphinx (>=7.1.1)"] +test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.11.1)", "pytest (>=7.4)"] [[package]] name = "pluggy" -version = "1.0.0" +version = "1.2.0" description = "plugin and hook calling mechanisms for python" -category = "main" +category = "dev" optional = false -python-versions = ">=3.6" +python-versions = ">=3.7" [package.extras] dev = ["pre-commit", "tox"] @@ -455,6 +791,17 @@ pyyaml = ">=5.1" toml = "*" virtualenv = ">=20.0.8" +[[package]] +name = "prompt-toolkit" +version = "3.0.39" +description = "Library for building powerful interactive command lines in Python" +category = "main" +optional = false +python-versions = ">=3.7.0" + +[package.dependencies] +wcwidth = "*" + [[package]] name = "psycopg2-binary" version = "2.9.5" @@ -464,12 +811,12 @@ optional = false python-versions = ">=3.6" [[package]] -name = "py" -version = "1.11.0" -description = "library with cross-python path, ini-parsing, io, code, log facilities" -category = "main" +name = "pycodestyle" +version = "2.10.0" +description = "Python style guide checker" +category = "dev" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +python-versions = ">=3.6" [[package]] name = "pydantic" @@ -486,26 +833,43 @@ typing-extensions = ">=4.1.0" dotenv = ["python-dotenv (>=0.10.4)"] email = ["email-validator (>=1.0.3)"] +[[package]] +name = "pyflakes" +version = "3.0.1" +description = "passive checker of Python programs" +category = "dev" +optional = false +python-versions = ">=3.6" + +[[package]] +name = "pygments" +version = "2.15.1" +description = "Pygments is a syntax highlighting package written in Python." +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.extras] +plugins = ["importlib-metadata"] + [[package]] name = "pytest" -version = "6.2.5" +version = "7.4.0" description = "pytest: simple powerful testing with Python" -category = "main" +category = "dev" optional = false -python-versions = ">=3.6" +python-versions = ">=3.7" [package.dependencies] -atomicwrites = {version = ">=1.0", markers = "sys_platform == \"win32\""} -attrs = ">=19.2.0" colorama = {version = "*", markers = "sys_platform == \"win32\""} +exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""} iniconfig = "*" packaging = "*" pluggy = ">=0.12,<2.0" -py = ">=1.8.2" -toml = "*" +tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""} [package.extras] -testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xmlschema"] +testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] [[package]] name = "pytest-cov" @@ -523,6 +887,45 @@ toml = "*" [package.extras] testing = ["fields", "hunter", "process-tests", "six", "pytest-xdist", "virtualenv"] +[[package]] +name = "pytest-mock" +version = "3.11.1" +description = "Thin-wrapper around the mock package for easier use with pytest" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +pytest = ">=5.0" + +[package.extras] +dev = ["pre-commit", "tox", "pytest-asyncio"] + +[[package]] +name = "pytest-repeat" +version = "0.9.1" +description = "pytest plugin for repeating tests" +category = "dev" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" + +[package.dependencies] +pytest = ">=3.6" + +[[package]] +name = "pytest-reportlog" +version = "0.4.0" +description = "Replacement for the --resultlog option, focused in simplicity and extensibility" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +pytest = "*" + +[package.extras] +dev = ["pre-commit", "tox"] + [[package]] name = "python-dateutil" version = "2.8.2" @@ -534,6 +937,17 @@ python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" [package.dependencies] six = ">=1.5" +[[package]] +name = "python-dotenv" +version = "1.0.0" +description = "Read key-value pairs from a .env file and set them as environment variables" +category = "dev" +optional = false +python-versions = ">=3.8" + +[package.extras] +cli = ["click (>=5.0)"] + [[package]] name = "python-json-logger" version = "2.0.4" @@ -561,19 +975,27 @@ category = "main" optional = false python-versions = ">=3.6" +[[package]] +name = "regex" +version = "2023.6.3" +description = "Alternative regular expression module, to replace re." +category = "dev" +optional = false +python-versions = ">=3.6" + [[package]] name = "requests" -version = "2.28.1" +version = "2.31.0" description = "Python HTTP for Humans." category = "main" optional = false -python-versions = ">=3.7, <4" +python-versions = ">=3.7" [package.dependencies] certifi = ">=2017.4.17" -charset-normalizer = ">=2,<3" +charset-normalizer = ">=2,<4" idna = ">=2.5,<4" -urllib3 = ">=1.21.1,<1.27" +urllib3 = ">=1.21.1,<3" [package.extras] socks = ["PySocks (>=1.5.6,!=1.5.7)"] @@ -593,6 +1015,14 @@ idna = {version = "*", optional = true, markers = "extra == \"idna2008\""} [package.extras] idna2008 = ["idna"] +[[package]] +name = "ruff" +version = "0.0.280" +description = "An extremely fast Python linter, written in Rust." +category = "dev" +optional = false +python-versions = ">=3.7" + [[package]] name = "six" version = "1.16.0" @@ -601,6 +1031,14 @@ category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" +[[package]] +name = "smmap" +version = "5.0.0" +description = "A pure Python implementation of a sliding window memory map manager" +category = "dev" +optional = false +python-versions = ">=3.6" + [[package]] name = "sniffio" version = "1.3.0" @@ -703,6 +1141,32 @@ python-versions = ">=3.7" [package.dependencies] starlette = "*" +[[package]] +name = "termcolor" +version = "2.3.0" +description = "ANSI color formatting for output in terminal" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.extras] +tests = ["pytest", "pytest-cov"] + +[[package]] +name = "tiktoken" +version = "0.4.0" +description = "tiktoken is a fast BPE tokeniser for use with OpenAI's models" +category = "dev" +optional = false +python-versions = ">=3.8" + +[package.dependencies] +regex = ">=2022.1.18" +requests = ">=2.26.0" + +[package.extras] +blobfile = ["blobfile (>=2)"] + [[package]] name = "toml" version = "0.10.2" @@ -713,11 +1177,11 @@ python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" [[package]] name = "tomli" -version = "1.2.3" +version = "2.0.1" description = "A lil' TOML parser" category = "main" optional = false -python-versions = ">=3.6" +python-versions = ">=3.7" [[package]] name = "tomlkit" @@ -727,6 +1191,23 @@ category = "dev" optional = false python-versions = ">=3.6" +[[package]] +name = "tqdm" +version = "4.65.0" +description = "Fast, Extensible Progress Meter" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[package.extras] +dev = ["py-make (>=0.1.0)", "twine", "wheel"] +notebook = ["ipywidgets (>=6)"] +slack = ["slack-sdk"] +telegram = ["requests"] + [[package]] name = "typed-ast" version = "1.5.4" @@ -743,18 +1224,27 @@ category = "main" optional = false python-versions = ">=3.7" +[[package]] +name = "tzdata" +version = "2023.3" +description = "Provider of IANA time zone data" +category = "main" +optional = false +python-versions = ">=2" + [[package]] name = "urllib3" -version = "1.26.13" +version = "2.0.4" description = "HTTP library with thread-safe connection pooling, file post, and more." category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" +python-versions = ">=3.7" [package.extras] -brotli = ["brotlicffi (>=0.8.0)", "brotli (>=1.0.9)", "brotlipy (>=0.6.0)"] -secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "certifi", "urllib3-secure-extra", "ipaddress"] -socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] +brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] +secure = ["certifi", "cryptography (>=1.9)", "idna (>=2.0.0)", "pyopenssl (>=17.1.0)", "urllib3-secure-extra"] +socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] +zstd = ["zstandard (>=0.18.0)"] [[package]] name = "uvicorn" @@ -772,22 +1262,30 @@ h11 = ">=0.8" [package.extras] standard = ["websockets (>=9.1)", "httptools (>=0.2.0,<0.3.0)", "watchgod (>=0.6)", "python-dotenv (>=0.13)", "PyYAML (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "colorama (>=0.4)"] +[[package]] +name = "vine" +version = "5.0.0" +description = "Promises, promises, promises." +category = "main" +optional = false +python-versions = ">=3.6" + [[package]] name = "virtualenv" -version = "20.17.1" +version = "20.24.2" description = "Virtual Python Environment builder" category = "main" optional = false -python-versions = ">=3.6" +python-versions = ">=3.7" [package.dependencies] -distlib = ">=0.3.6,<1" -filelock = ">=3.4.1,<4" -platformdirs = ">=2.4,<3" +distlib = ">=0.3.7,<1" +filelock = ">=3.12.2,<4" +platformdirs = ">=3.9.1,<4" [package.extras] -docs = ["proselint (>=0.13)", "sphinx (>=5.3)", "sphinx-argparse (>=0.3.2)", "sphinx-rtd-theme (>=1)", "towncrier (>=22.8)"] -testing = ["coverage (>=6.2)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=21.3)", "pytest (>=7.0.1)", "pytest-env (>=0.6.2)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.6.1)", "pytest-randomly (>=3.10.3)", "pytest-timeout (>=2.1)"] +docs = ["furo (>=2023.5.20)", "proselint (>=0.13)", "sphinx-argparse (>=0.4)", "sphinx (>=7.0.1)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=23.6)"] +test = ["covdefaults (>=2.3)", "coverage-enable-subprocess (>=1)", "coverage (>=7.2.7)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8)", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "pytest (>=7.4)", "setuptools (>=68)", "time-machine (>=2.10)"] [[package]] name = "vulture" @@ -800,19 +1298,50 @@ python-versions = ">=3.6" [package.dependencies] toml = "*" +[[package]] +name = "wcwidth" +version = "0.2.6" +description = "Measures the displayed width of unicode strings in a terminal" +category = "main" +optional = false +python-versions = "*" + +[[package]] +name = "yarl" +version = "1.9.2" +description = "Yet another URL library" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +idna = ">=2.0" +multidict = ">=4.0" + [metadata] lock-version = "1.1" python-versions = "^3.10.0" -content-hash = "eff24f1c1eeba97a5529a8db362115f43e88aa4a80c27303055863593d04221e" +content-hash = "5b159e35146201f34770f116a4a3a82e10fd9972bb493ab5e240736f0bc7c051" [metadata.files] +aiohttp = [] +aiosignal = [] alembic = [] +amqp = [ + {file = "amqp-5.1.1-py3-none-any.whl", hash = "sha256:6f0956d2c23d8fa6e7691934d8c3930eadb44972cbbd1a7ae3a520f735d43359"}, + {file = "amqp-5.1.1.tar.gz", hash = "sha256:2c1b13fecc0893e946c65cbd5f36427861cffa4ea2201d8f6fca22e2a373b5e2"}, +] anyio = [] asgiref = [] -atomicwrites = [] +async-timeout = [ + {file = "async-timeout-4.0.2.tar.gz", hash = "sha256:2163e1640ddb52b7a8c80d0a67a08587e5d245cc9c553a74a847056bc2976b15"}, + {file = "async_timeout-4.0.2-py3-none-any.whl", hash = "sha256:8ca1e4fcf50d07413d66d1a5e416e42cfdf5851c981d679a09851a6853383b3c"}, +] attrs = [] bcrypt = [] +billiard = [] black = [] +celery = [] certifi = [] cfgv = [ {file = "cfgv-3.3.1-py2.py3-none-any.whl", hash = "sha256:c6a0883f3917a037485059700b9e75da2464e6c27051014ad85ba6aaa5884426"}, @@ -820,6 +1349,15 @@ cfgv = [ ] charset-normalizer = [] click = [] +click-didyoumean = [ + {file = "click-didyoumean-0.3.0.tar.gz", hash = "sha256:f184f0d851d96b6d29297354ed981b7dd71df7ff500d82fa6d11f0856bee8035"}, + {file = "click_didyoumean-0.3.0-py3-none-any.whl", hash = "sha256:a0713dc7a1de3f06bc0df5a9567ad19ead2d3d5689b434768a6145bff77c0667"}, +] +click-plugins = [ + {file = "click-plugins-1.1.1.tar.gz", hash = "sha256:46ab999744a9d831159c3411bb0c79346d94a444df9a3a3742e9ed63645f264b"}, + {file = "click_plugins-1.1.1-py2.py3-none-any.whl", hash = "sha256:5d262006d3222f5057fd81e1623d4443e41dcda5dc815c06b442aa3c02889fc8"}, +] +click-repl = [] colorama = [] colorlog = [ {file = "colorlog-5.0.1-py2.py3-none-any.whl", hash = "sha256:4e6be13d9169254e2ded6526a6a4a1abb8ac564f2fa65b310a98e4ca5bea2c04"}, @@ -828,24 +1366,33 @@ colorlog = [ coverage = [] dependency-injector = [] distlib = [] +exceptiongroup = [] fastapi = [] filelock = [] +fire = [] +flake8 = [] freezegun = [] +frozenlist = [] +gitdb = [] +gitpython = [] greenlet = [] h11 = [] httpcore = [] httpx = [] identify = [] idna = [] -iniconfig = [ - {file = "iniconfig-1.1.1-py2.py3-none-any.whl", hash = "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3"}, - {file = "iniconfig-1.1.1.tar.gz", hash = "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32"}, -] +iniconfig = [] +isort = [] +kombu = [] mako = [] markupsafe = [] +mccabe = [] +mentat-ai = [] +multidict = [] mypy = [] mypy-extensions = [] nodeenv = [] +openai = [] packaging = [] pastel = [ {file = "pastel-0.2.1-py2.py3-none-any.whl", hash = "sha256:4349225fcdf6c2bb34d483e523475de5bb04a5c10ef711263452cb37d7dd4364"}, @@ -853,30 +1400,31 @@ pastel = [ ] pathspec = [] platformdirs = [] -pluggy = [ - {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"}, - {file = "pluggy-1.0.0.tar.gz", hash = "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"}, -] +pluggy = [] poethepoet = [ {file = "poethepoet-0.10.0-py3-none-any.whl", hash = "sha256:6fb3021603d4421c6fcc40072bbcf150a6c52ef70ff4d3be089b8b04e015ef5a"}, {file = "poethepoet-0.10.0.tar.gz", hash = "sha256:70b97cb194b978dc464c70793e85e6f746cddf82b84a38bfb135946ad71ae19c"}, ] pre-commit = [] +prompt-toolkit = [] psycopg2-binary = [] -py = [ - {file = "py-1.11.0-py2.py3-none-any.whl", hash = "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"}, - {file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"}, -] +pycodestyle = [] pydantic = [] +pyflakes = [] +pygments = [] pytest = [] pytest-cov = [ {file = "pytest-cov-2.12.1.tar.gz", hash = "sha256:261ceeb8c227b726249b376b8526b600f38667ee314f910353fa318caa01f4d7"}, {file = "pytest_cov-2.12.1-py2.py3-none-any.whl", hash = "sha256:261bb9e47e65bd099c89c3edf92972865210c36813f80ede5277dceb77a4a62a"}, ] +pytest-mock = [] +pytest-repeat = [] +pytest-reportlog = [] python-dateutil = [ {file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"}, {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"}, ] +python-dotenv = [] python-json-logger = [] python-multipart = [ {file = "python-multipart-0.0.5.tar.gz", hash = "sha256:f7bb5f611fc600d15fa47b3974c8aa16e93724513b49b5f95c81e6624c83fa43"}, @@ -916,12 +1464,15 @@ pyyaml = [ {file = "PyYAML-6.0-cp39-cp39-win_amd64.whl", hash = "sha256:b3d267842bf12586ba6c734f89d1f5b871df0273157918b0ccefa29deb05c21c"}, {file = "PyYAML-6.0.tar.gz", hash = "sha256:68fb519c14306fec9720a2a5b45bc9f0c8d1b9c72adf45c37baedfcd949c35a2"}, ] +regex = [] requests = [] rfc3986 = [] +ruff = [] six = [ {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, ] +smmap = [] sniffio = [] sqlalchemy = [] sqlalchemy-json = [ @@ -931,18 +1482,31 @@ sqlalchemy-json = [ sqlalchemy-utils = [] starlette = [] starlette-context = [] +termcolor = [] +tiktoken = [] toml = [ {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, ] -tomli = [] +tomli = [ + {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, + {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, +] tomlkit = [] +tqdm = [] typed-ast = [] typing-extensions = [] +tzdata = [] urllib3 = [] uvicorn = [ {file = "uvicorn-0.14.0-py3-none-any.whl", hash = "sha256:2a76bb359171a504b3d1c853409af3adbfa5cef374a4a59e5881945a97a93eae"}, {file = "uvicorn-0.14.0.tar.gz", hash = "sha256:45ad7dfaaa7d55cab4cd1e85e03f27e9d60bc067ddc59db52a2b0aeca8870292"}, ] +vine = [ + {file = "vine-5.0.0-py2.py3-none-any.whl", hash = "sha256:4c9dceab6f76ed92105027c49c823800dd33cacce13bdedc5b914e3514b7fb30"}, + {file = "vine-5.0.0.tar.gz", hash = "sha256:7d3b1624a953da82ef63462013bbd271d3eb75751489f9807598e8f340bd637e"}, +] virtualenv = [] vulture = [] +wcwidth = [] +yarl = [] diff --git a/pyproject.toml b/pyproject.toml index eb9cff3..02baa18 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,9 +6,7 @@ authors = ["Przemysław Górecki "] [tool.poetry.dependencies] python = "^3.10.0" -pytest = "^6.2.4" pydantic = "^1.8.2" -black = "^21.5b1" fastapi = "^0.95.2" uvicorn = "^0.14.0" starlette-context = "^0.3.3" @@ -24,16 +22,19 @@ psycopg2-binary = "^2.9.2" freezegun = "^1.1.0" SQLAlchemy-Utils = "^0.38.3" pre-commit = "^2.20.0" -click = "8.0.4" httpx = "^0.23.1" requests = "^2.28.1" bcrypt = "^4.0.1" mypy = "^1.4.1" +celery = "^5.3.1" [tool.poetry.dev-dependencies] poethepoet = "^0.10.0" pytest-cov = "^2.12.1" vulture = "^2.7" +black = "^23.7.0" +mentat-ai = "^0.1.9" +pytest = "^7.4.0" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/src/config/container.py b/src/config/container.py index dba2b4f..a97b9da 100644 --- a/src/config/container.py +++ b/src/config/container.py @@ -23,6 +23,7 @@ from modules.iam.application.services import IamService from modules.iam.infrastructure.repository import PostgresJsonUserRepository from seedwork.application import Application, DependencyProvider +from seedwork.application.events import DomainEvent, EventResult, IntegrationEvent from seedwork.infrastructure.logging import logger from seedwork.infrastructure.postgres_outbox import Outbox, PostgresOutbox @@ -126,6 +127,19 @@ def on_exit_transaction_context(ctx, exc_type, exc_val, exc_tb): logger.debug(f"transaction ended ") logger.correlation_id.set(uuid.UUID(int=0)) + @application.transaction_middleware + def auto_type_middleware(ctx, call_next, command=None, query=None, event=None): + result = call_next() + if command: + ... + elif query: + ... + elif event: + # we are allowing event handlers to return events, if so they are converted to EventResult + if isinstance(result, DomainEvent) or isinstance(result, IntegrationEvent): + return EventResult.success(event=result) + return result + @application.transaction_middleware def logging_middleware(ctx, call_next, command=None, query=None, event=None): if command: diff --git a/src/modules/bidding/application/event/eventual.py b/src/modules/bidding/application/event/eventual.py index 2c36f62..fb07c38 100644 --- a/src/modules/bidding/application/event/eventual.py +++ b/src/modules/bidding/application/event/eventual.py @@ -1,11 +1,9 @@ from modules.bidding.application import bidding_module -from modules.bidding.application.event.transactional import ( - SendEmailToSellerThatBidWasPlaced, -) +from modules.bidding.application.event.transactional import BidWasPlacedNotification @bidding_module.integration_event_handler -def send_email_to_seller_that_bid_was_placed(event: SendEmailToSellerThatBidWasPlaced): +def send_email_to_seller_that_bid_was_placed(event: BidWasPlacedNotification): print("send_email_to_seller_that_bid_was_placed") print(event.listing_id) print(event.bidder_id) diff --git a/src/modules/bidding/application/event/transactional.py b/src/modules/bidding/application/event/transactional.py index 7af77c5..718dbfb 100644 --- a/src/modules/bidding/application/event/transactional.py +++ b/src/modules/bidding/application/event/transactional.py @@ -10,7 +10,7 @@ from seedwork.domain.value_objects import GenericUUID -class SendEmailToSellerThatBidWasPlaced(IntegrationEvent): +class BidWasPlacedNotification(IntegrationEvent): listing_id: GenericUUID bidder_id: GenericUUID @@ -23,7 +23,7 @@ def notify_outbid_winner(event: BidWasPlaced, logger): @bidding_module.domain_event_handler def notify_seller_of_new_bid(event: BidWasPlaced, logger): logger.info("New bid was placed") - return SendEmailToSellerThatBidWasPlaced( + return BidWasPlacedNotification( listing_id=event.listing_id, bidder_id=event.bidder_id ) diff --git a/src/modules/bidding/tests/application/test_seller_is_notified_of_new_bid.py b/src/modules/bidding/tests/application/test_seller_is_notified_of_new_bid.py new file mode 100644 index 0000000..91b0282 --- /dev/null +++ b/src/modules/bidding/tests/application/test_seller_is_notified_of_new_bid.py @@ -0,0 +1,20 @@ +import pytest + +from modules.bidding.domain.events import BidWasPlaced +from seedwork.domain.value_objects import GenericUUID, Money + + +@pytest.mark.integration +def test_seller_is_notified_of_new_bid(app): + with app.transaction_context() as ctx: + ctx.handle_domain_event( + BidWasPlaced( + listing_id=GenericUUID(int=1), + bidder_id=GenericUUID(int=2), + amount=Money(10), + ) + ) + + with app.transaction_context() as ctx: + outbox = ctx.get_dependency("outbox") + assert outbox.get_messages() == 123 diff --git a/src/seedwork/application/__init__.py b/src/seedwork/application/__init__.py index 9d5b714..7b5638d 100644 --- a/src/seedwork/application/__init__.py +++ b/src/seedwork/application/__init__.py @@ -1,6 +1,6 @@ import importlib import inspect -from collections import defaultdict, OrderedDict +from collections import OrderedDict, defaultdict from functools import partial from typing import Any, Type, TypeVar @@ -26,6 +26,7 @@ def get_function_arguments(func): def get_handler_arguments(func): + """Handlers can have multiple arguments, but only the first of them can be a command, query or event.""" parameters = get_function_arguments(func) kwargs_iterator = iter(parameters.items()) _, first_parameter = next(kwargs_iterator) @@ -64,12 +65,12 @@ def get_dependency(self, identifier): def _resolve_arguments(self, handler_parameters, overrides) -> dict: """Match handler_parameters with dependencies""" - + def _resolve(identifier, overrides): if identifier in overrides: return overrides[identifier] return self.get_dependency(identifier) - + kwargs = {} for param_name, param_type in handler_parameters.items(): # first, try to resolve by type @@ -92,7 +93,7 @@ def get_function_kwargs(self, func, overrides=None): func_parameters = get_function_arguments(func) kwargs = self._resolve_arguments(func_parameters, overrides or {}) return kwargs - + def get_handler_kwargs(self, func, overrides=None): _, handler_parameters = get_handler_arguments(func) kwargs = self._resolve_arguments(handler_parameters, overrides or {}) @@ -141,21 +142,21 @@ def _wrap_with_middlewares( for middleware in self.app._transaction_middlewares: p = partial(middleware, self, p, command, query, event) return p - + def _get_overrides(self, **kwargs): overrides = dict(ctx=self) overrides.update(self.overrides) overrides.update(kwargs) - + type_match = defaultdict(list) for name, value in overrides.items(): type_match[type(value)].append(value) with_unique_type = dict((k, v[0]) for k, v in type_match.items() if len(v) == 1) - + overrides.update(with_unique_type) - + return overrides - + def call(self, handler_func, **kwargs): overrides = self._get_overrides(**kwargs) handler_kwargs = self.dependency_provider.get_function_kwargs( @@ -228,6 +229,7 @@ def handle_domain_event(self, event) -> EventResultSet: p = partial(handler_func, event, **handler_kwargs) wrapped_handler = self._wrap_with_middlewares(p, event=event) event_result = wrapped_handler() or EventResult.success() + assert isinstance( event_result, EventResult ), f"Got {event_result} instead of EventResult from {handler_func}" diff --git a/src/seedwork/application/commands.py b/src/seedwork/application/commands.py index 7c096d1..cdd75c5 100644 --- a/src/seedwork/application/commands.py +++ b/src/seedwork/application/commands.py @@ -1,5 +1,6 @@ from abc import ABC -from .command_handlers import CommandResult + +from .command_handlers import CommandResult # type: ignore class Command(ABC): diff --git a/src/seedwork/application/inbox_outbox.py b/src/seedwork/application/inbox_outbox.py index 5ef94b7..690f438 100644 --- a/src/seedwork/application/inbox_outbox.py +++ b/src/seedwork/application/inbox_outbox.py @@ -1,44 +1,165 @@ import abc -import contextlib +from datetime import datetime +from enum import Enum from typing import Optional from seedwork.application.events import IntegrationEvent from seedwork.domain.entities import AggregateRoot +class MessageStatus(str, Enum): + PENDING = "PENDING" + PROCESSING = "PROCESSING" + PROCESSED = "PROCESSED" + FAILED = "FAILED" + + +class Message: + """ + Message carries IntegrationEvent. + """ + + def __init__( + self, + event: IntegrationEvent, + status: MessageStatus = MessageStatus.PENDING, + sender: Optional[str] = None, + ): + self.event = event + self.status = status + self.sender = sender + self.crated_at = datetime.now() + self.updated_at = datetime.now() + + def mark_as_processing(self): + self.status = MessageStatus.PROCESSING + self.updated_at = datetime.now() + + def mark_as_processed(self): + self.status = MessageStatus.PROCESSED + self.updated_at = datetime.now() + + def mark_as_failed(self): + self.status = MessageStatus.FAILED + self.updated_at = datetime.now() + + +class Inbox(abc.ABC): + def add(self, event: IntegrationEvent): + """Add event to the inbox""" + raise NotImplementedError() + + def get_next_pending(self) -> Optional[IntegrationEvent]: + """ + Get next event from the inbox. Only one event can be retrieved at a time. + Call mark_as_processed() to mark event as processed. + Call mark_as_failed() to mark event as failed. + """ + raise NotImplementedError() + + def mark_as_processed(self): + """Mark retrieved event as processed""" + raise NotImplementedError() + + def mark_as_failed(self): + """Mark retrieved event as sent""" + raise NotImplementedError() + + class Outbox(abc.ABC): @abc.abstractmethod def add(self, event: IntegrationEvent, source: Optional[AggregateRoot] = None): """Add event to the outbox""" raise NotImplementedError() + def get_next_pending(self) -> Optional[IntegrationEvent]: + """Get pending events from the outbox""" + raise NotImplementedError() -class InMemoryInbox: - def __init__(self): - self.events = [] + def mark_as_sent(self): + """Mark retrieved event as sent""" + raise NotImplementedError() - def is_empty(self): - return len(self.events) == 0 + def mark_as_failed(self): + """Mark retrieved event as sent""" + raise NotImplementedError() + + def get_all_deliveries(self) -> list[Message]: + """Get all deliveries""" + raise NotImplementedError() - @contextlib.contextmanager - def get_next_event(self): - yield self.events.pop(0) - def enqueue(self, event): - self.events.append(event) +class InMemoryInbox(Inbox): + events: list[Message] + current: Optional[Message] + def __init__(self): + self.events = [] + self.current = None -class ProcessInboxUntilEmptyStrategy: - def __init__(self, inbox: InMemoryInbox): - self.inbox = inbox + def add(self, event: IntegrationEvent): + self.events.append(Message(event, MessageStatus.PENDING)) - def should_process_next_event(self): - return not self.inbox.is_empty() + def get_next_pending(self) -> Optional[IntegrationEvent]: + """ + Get next event from the inbox. Only one event can be retrieved at a time. + Call mark_as_processed() to mark event as processed. + Call mark_as_failed() to mark event as failed. + """ + assert self.current is None, "Only one event can be retrieved at a time" + generator = (x for x in self.events if x.status == MessageStatus.PENDING) + first_match = next(generator, None) + self.current = first_match + return first_match.event if first_match else None + + def mark_as_processed(self): + """Mark retrieved event as processed""" + self.current.mark_as_processed() + self.current = None + + def mark_as_failed(self): + """Mark retrieved event as sent""" + self.current.mark_as_failed() + self.current = None class InMemoryOutbox(Outbox): + events: list[Message] + current: Optional[Message] + def __init__(self): self.events = [] + self.current = None def add(self, event: IntegrationEvent, source: Optional[AggregateRoot] = None): - self.events.append(event) + sender = str(source) if source else None + self.events.append(Message(event, MessageStatus.PENDING, sender=sender)) + + def get_next_pending(self) -> Optional[IntegrationEvent]: + """Get pending events from the outbox""" + assert self.current is None, "Only one event can be retrieved at a time" + generator = (x for x in self.events if x.status == MessageStatus.PENDING) + first_match = next(generator, None) + self.current = first_match + return first_match.event if first_match else None + + def mark_as_sent(self): + """Mark retrieved event as sent""" + self.current.mark_as_processed() + self.current = None + + def mark_as_failed(self): + """Mark retrieved event as sent""" + self.current.mark_as_failed() + self.current = None + + +def move_events_from_outbox_to_inbox(outbox: Outbox, inbox: Inbox, limit: int = 100): + """Move events from outbox to inbox""" + while limit > 0: + event = outbox.get_next_pending() + if event is None: + break + inbox.add(event) + outbox.mark_as_sent() + limit -= 1 diff --git a/src/seedwork/infrastructure/postgres_outbox.py b/src/seedwork/infrastructure/postgres_outbox.py index e4f4ceb..2cc5592 100644 --- a/src/seedwork/infrastructure/postgres_outbox.py +++ b/src/seedwork/infrastructure/postgres_outbox.py @@ -4,21 +4,15 @@ from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import Session from sqlalchemy.sql.schema import Column -from sqlalchemy.sql.sqltypes import JSON, DateTime, Enum, String +from sqlalchemy.sql.sqltypes import JSON, DateTime, String from seedwork.application.events import IntegrationEvent -from seedwork.application.inbox_outbox import Outbox +from seedwork.application.inbox_outbox import Message, MessageStatus, Outbox from seedwork.domain.entities import AggregateRoot from seedwork.infrastructure.database import Base -class OutboxMessageStatus(str, Enum): - PENDING = "pending" - SENT = "sent" - FAILED = "failed" - - -class OutboxMessage(Base): +class OutboxMessageModel(Base): __tablename__ = "outbox_messages" id = Column(UUID(as_uuid=True), primary_key=True) @@ -27,7 +21,7 @@ class OutboxMessage(Base): created_at = Column(DateTime, nullable=False, default=datetime.utcnow) aggregate_id = Column(UUID(as_uuid=True), nullable=True) aggregate_type = Column(String, nullable=True) - status = Column(String, nullable=False, default=OutboxMessageStatus.PENDING) + status = Column(String, nullable=False, default=MessageStatus.PENDING) class PostgresOutbox(Outbox): @@ -37,7 +31,7 @@ def __init__(self, db_session: Session): def add( self, event: IntegrationEvent, source: Optional[AggregateRoot] = None ) -> None: - message = OutboxMessage( + message = OutboxMessageModel( id=event.event_id, event_type=str(event.__class__), event_data=event.__dict__, @@ -45,3 +39,19 @@ def add( aggregate_id=source.id if source else None, ) self._session.add(message) + + def get_next_pending(self) -> Optional[IntegrationEvent]: + """Get pending events from the outbox""" + raise NotImplementedError() + + def mark_as_sent(self): + """Mark retrieved event as sent""" + raise NotImplementedError() + + def mark_as_failed(self): + """Mark retrieved event as sent""" + raise NotImplementedError() + + def get_messages(self) -> list[Message]: + """Get all deliveries""" + return self._session.query(OutboxMessageModel).all() diff --git a/src/seedwork/tests/application/test_application.py b/src/seedwork/tests/application/test_application.py index 44d9ae1..dc8d72b 100644 --- a/src/seedwork/tests/application/test_application.py +++ b/src/seedwork/tests/application/test_application.py @@ -142,18 +142,18 @@ def handle_ping(command: SendPing, missing_dependency): with pytest.raises(TypeError): app.execute_command(SendPing()) - - + + @pytest.mark.unit def test_call_any_function(): def some_function(foo, bar): return foo + bar - + app = Application() - + with app.transaction_context(foo=1, bar=0) as ctx: result = ctx.call(some_function, bar=2) - + assert result == 3 @@ -168,8 +168,8 @@ def foo(): result = ctx.call(foo) assert result is True - - + + @pytest.mark.unit def test_call_skips_unneeded_dependencies(): def foo(): @@ -193,4 +193,4 @@ def get_name(ctx): with app.transaction_context(foo=1, bar=0) as ctx: result = ctx.call(get_name) - assert result == "TransactionContext" \ No newline at end of file + assert result == "TransactionContext" diff --git a/src/seedwork/tests/application/test_inbox_outbox.py b/src/seedwork/tests/application/test_inbox_outbox.py new file mode 100644 index 0000000..a949e0c --- /dev/null +++ b/src/seedwork/tests/application/test_inbox_outbox.py @@ -0,0 +1,26 @@ +from seedwork.application.inbox_outbox import ( + InMemoryInbox, + InMemoryOutbox, + IntegrationEvent, + MessageStatus, + move_events_from_outbox_to_inbox, +) + + +class FooEvent(IntegrationEvent): + message: str + + +def test_move_messages_from_outbox_to_inbox(): + event = FooEvent(message="Hello") + outbox = InMemoryOutbox() + inbox = InMemoryInbox() + outbox.add(event) + + move_events_from_outbox_to_inbox(outbox, inbox) + + assert len(outbox.events) == 1 + assert outbox.events[0].status == MessageStatus.PROCESSED + assert len(inbox.events) == 1 + assert inbox.events[0].status == MessageStatus.PENDING + assert inbox.events[0].event == outbox.events[0].event diff --git a/src/worker/__init__.py b/src/worker/__init__.py new file mode 100644 index 0000000..89dab49 --- /dev/null +++ b/src/worker/__init__.py @@ -0,0 +1,36 @@ +from celery import Celery + +from config.api_config import ApiConfig +from config.container import TopLevelContainer +from seedwork.application.inbox_outbox import ( + Inbox, + Outbox, + move_events_from_outbox_to_inbox, +) + +c = Celery("hello", broker="amqp://guest@localhost//") + +container = TopLevelContainer() +container.config.from_pydantic(ApiConfig()) + +app = container.application() + + +@c.task +def send_events_from_outbox_to_inbox(): + with app.transaction_context() as ctx: + ctx.call(move_events_from_outbox_to_inbox) + + +@c.task +def process_incoming_events(): + def handle_incoming_events(ctx, inbox): + while event := inbox.get_next_pending(): + try: + ctx.handle_integration_event(event) + inbox.mark_as_processed() + except: + inbox.mark_as_failed() + + with app.transaction_context() as ctx: + ctx.call(handle_incoming_events) From e6d63beb6cdf7587a3ca5f3c5c0bb775b8c58f87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20G=C3=B3recki?= Date: Tue, 10 Oct 2023 15:16:38 +0200 Subject: [PATCH 4/4] Refactor event dispatching during command/event execution. Introduce the concept of execution chain. Refactor storing integration events in the outbox --- src/config/container.py | 47 ++++--- .../application/event/transactional.py | 10 +- .../test_seller_is_notified_of_new_bid.py | 8 +- src/seedwork/application/__init__.py | 117 +++++++++++------- src/seedwork/application/command_handlers.py | 3 + src/seedwork/application/events.py | 31 +---- src/seedwork/application/inbox_outbox.py | 4 + src/seedwork/application/results.py | 45 +++++++ src/seedwork/domain/exceptions.py | 1 - src/seedwork/infrastructure/postgres_inbox.py | 58 +++++++++ .../infrastructure/postgres_outbox.py | 7 +- .../test_application_with_outbox.py | 16 +-- 12 files changed, 231 insertions(+), 116 deletions(-) create mode 100644 src/seedwork/application/results.py create mode 100644 src/seedwork/infrastructure/postgres_inbox.py diff --git a/src/config/container.py b/src/config/container.py index a97b9da..7a7497b 100644 --- a/src/config/container.py +++ b/src/config/container.py @@ -22,10 +22,10 @@ ) from modules.iam.application.services import IamService from modules.iam.infrastructure.repository import PostgresJsonUserRepository -from seedwork.application import Application, DependencyProvider +from seedwork.application import Application, DependencyProvider, TransactionContext from seedwork.application.events import DomainEvent, EventResult, IntegrationEvent from seedwork.infrastructure.logging import logger -from seedwork.infrastructure.postgres_outbox import Outbox, PostgresOutbox +from seedwork.infrastructure.postgres_outbox import PostgresOutbox def resolve_provider_by_type(container: Container, cls: type) -> Optional[Provider]: @@ -102,11 +102,11 @@ def create_application(db_engine): application.include_module(bidding_module) @application.on_enter_transaction_context - def on_enter_transaction_context(ctx): + def on_enter_transaction_context(ctx: TransactionContext): engine = ctx.app.dependency_provider["db_engine"] session = Session(engine) correlation_id = uuid.uuid4() - logger.correlation_id.set(uuid.uuid4()) + logger.correlation_id.set(uuid.uuid4()) # type: ignore transaction_container = TransactionContainer( db_session=session, correlation_id=correlation_id, logger=logger ) @@ -114,8 +114,8 @@ def on_enter_transaction_context(ctx): logger.debug(f"transaction started") @application.on_exit_transaction_context - def on_exit_transaction_context(ctx, exc_type, exc_val, exc_tb): - session = ctx.dependency_provider.get_dependency("db_session") + def on_exit_transaction_context(ctx: TransactionContext, exc_type, exc_val, exc_tb): + session = ctx.get_dependency("db_session") if exc_type: session.rollback() @@ -125,23 +125,12 @@ def on_exit_transaction_context(ctx, exc_type, exc_val, exc_tb): logger.debug(f"committed") session.close() logger.debug(f"transaction ended ") - logger.correlation_id.set(uuid.UUID(int=0)) + logger.correlation_id.set(uuid.UUID(int=0)) # type: ignore @application.transaction_middleware - def auto_type_middleware(ctx, call_next, command=None, query=None, event=None): - result = call_next() - if command: - ... - elif query: - ... - elif event: - # we are allowing event handlers to return events, if so they are converted to EventResult - if isinstance(result, DomainEvent) or isinstance(result, IntegrationEvent): - return EventResult.success(event=result) - return result - - @application.transaction_middleware - def logging_middleware(ctx, call_next, command=None, query=None, event=None): + def logging_middleware( + ctx: TransactionContext, call_next, command=None, query=None, event=None + ): if command: prefix = "Executing" task = command @@ -158,11 +147,19 @@ def logging_middleware(ctx, call_next, command=None, query=None, event=None): return result @application.transaction_middleware - def outbox_middleware(ctx, call_next, command=None, query=None, event=None): + def event_to_event_result_middleware( + ctx: TransactionContext, call_next, command=None, query=None, event=None + ): + """If event handler returns an event, convert it to EventResult""" result = call_next() - outbox = ctx.get_dependency(Outbox) - for event in ctx.collect_integration_events(): - outbox.add(event) + if command: + ... + elif query: + ... + elif event: + # we are allowing event handlers to return events, if so they are converted to EventResult + if isinstance(result, DomainEvent) or isinstance(result, IntegrationEvent): + return EventResult.success(event=result) return result return application diff --git a/src/modules/bidding/application/event/transactional.py b/src/modules/bidding/application/event/transactional.py index 718dbfb..3875431 100644 --- a/src/modules/bidding/application/event/transactional.py +++ b/src/modules/bidding/application/event/transactional.py @@ -2,7 +2,7 @@ from modules.bidding.application import bidding_module from modules.bidding.domain.entities import Listing -from modules.bidding.domain.events import BidWasPlaced +from modules.bidding.domain.events import BidWasPlaced, HighestBidderWasOutbid from modules.bidding.domain.repositories import ListingRepository from modules.bidding.domain.value_objects import Seller from modules.catalog.domain.events import ListingPublishedEvent @@ -16,8 +16,12 @@ class BidWasPlacedNotification(IntegrationEvent): @bidding_module.domain_event_handler -def notify_outbid_winner(event: BidWasPlaced, logger): - logger.info(f"Message from a handler: Listing {event.listing_id} was published") +def notify_outbid_winner(event: HighestBidderWasOutbid, outbox): + outbox.add_message() + + return BidWasPlacedNotification( + listing_id=event.listing_id, bidder_id=event.bidder_id + ) @bidding_module.domain_event_handler diff --git a/src/modules/bidding/tests/application/test_seller_is_notified_of_new_bid.py b/src/modules/bidding/tests/application/test_seller_is_notified_of_new_bid.py index 91b0282..cd55ff1 100644 --- a/src/modules/bidding/tests/application/test_seller_is_notified_of_new_bid.py +++ b/src/modules/bidding/tests/application/test_seller_is_notified_of_new_bid.py @@ -1,5 +1,6 @@ import pytest +from modules.bidding.application.event import BidWasPlacedNotification from modules.bidding.domain.events import BidWasPlaced from seedwork.domain.value_objects import GenericUUID, Money @@ -7,14 +8,13 @@ @pytest.mark.integration def test_seller_is_notified_of_new_bid(app): with app.transaction_context() as ctx: - ctx.handle_domain_event( + chain = ctx.handle_domain_event( BidWasPlaced( listing_id=GenericUUID(int=1), bidder_id=GenericUUID(int=2), amount=Money(10), ) ) + events = chain.triggered_events(BidWasPlacedNotification) - with app.transaction_context() as ctx: - outbox = ctx.get_dependency("outbox") - assert outbox.get_messages() == 123 + assert len(events) == 1 diff --git a/src/seedwork/application/__init__.py b/src/seedwork/application/__init__.py index 7b5638d..6659644 100644 --- a/src/seedwork/application/__init__.py +++ b/src/seedwork/application/__init__.py @@ -6,11 +6,12 @@ from seedwork.application.command_handlers import CommandResult from seedwork.application.commands import Command -from seedwork.application.events import EventResult, EventResultSet, IntegrationEvent +from seedwork.application.events import EventResult, IntegrationEvent from seedwork.application.exceptions import ApplicationException from seedwork.application.inbox_outbox import InMemoryInbox from seedwork.application.queries import Query from seedwork.application.query_handlers import QueryResult +from seedwork.application.results import ExecutionChain, ExecutionStep from seedwork.domain.events import DomainEvent from seedwork.domain.repositories import GenericRepository from seedwork.utils.data_structures import OrderedSet @@ -25,6 +26,29 @@ def get_function_arguments(func): return parameters +def dispatch_triggered_events(handler): + def decorator(self, *args, **kwargs) -> ExecutionChain: + integration_events = [] + execution_chain = handler(self, *args, **kwargs) + pending_events = execution_chain.triggered_events() + while len(pending_events) > 0: + event = pending_events.pop(0) + if isinstance(event, IntegrationEvent): + integration_events.append(event) + execution_chain.add( + ExecutionStep(task=event, handler=None, result=None) + ) + + elif isinstance(event, DomainEvent): + subchain = self._handle_domain_event(event) + execution_chain.extend(subchain) + pending_events.extend(subchain.triggered_events()) + self.last_execution_chain = execution_chain + return execution_chain + + return decorator + + def get_handler_arguments(func): """Handlers can have multiple arguments, but only the first of them can be a command, query or event.""" parameters = get_function_arguments(func) @@ -40,15 +64,21 @@ def get_handler_arguments(func): T = TypeVar("T", CommandResult, EventResult) -def collect_domain_events(result: T, handler_kwargs) -> T: +def collect_domain_events(handler_kwargs) -> list[DomainEvent]: + """ + Collect domain events, captured by repositories used by a handler. + + The handler is used to change the state of an entity (which is accessed via repository) by calling one of its + methods. The repository is responsible for tracking changes in entities and also for collecting domain events that + are raised by entities. Hence, we can use repositories to collect domain events that are raised by entities. + """ domain_events = [] repositories = filter( lambda x: isinstance(x, GenericRepository), handler_kwargs.values() ) for repo in repositories: domain_events.extend(repo.collect_events()) - result.events.extend(domain_events) - return result + return domain_events class DependencyProvider: @@ -122,9 +152,6 @@ def __init__(self, app, **overrides): self.app = app self.overrides = overrides self.dependency_provider = app.dependency_provider - self.task = None - self.next_commands = [] - self.integration_events = [] def __enter__(self): """Should be used to start a transaction""" @@ -167,30 +194,26 @@ def call(self, handler_func, **kwargs): result = wrapped_handler() return result - def execute_query(self, query) -> QueryResult: - assert ( - self.task is None - ), "Cannot execute query while another task is being executed" - self.task = query - + def execute_query(self, query) -> ExecutionChain: handler_func = self.app.get_query_handler(query) handler_kwargs = self.dependency_provider.get_handler_kwargs( handler_func, self._get_overrides() ) p = partial(handler_func, query, **handler_kwargs) wrapped_handler = self._wrap_with_middlewares(p, query=query) - result = wrapped_handler() + query_result = wrapped_handler() assert isinstance( - result, QueryResult - ), f"Got {result} instead of QueryResult from {handler_func}" - return result + query_result, QueryResult + ), f"Got {query_result} instead of QueryResult from {handler_func}" - def execute_command(self, command) -> CommandResult: - assert ( - self.task is None - ), "Cannot execute command while another task is being executed" - self.task = command + return ExecutionChain.one( + ExecutionStep( + task=query, handler=handler_func.__name__, result=query_result + ) + ) + @dispatch_triggered_events + def execute_command(self, command) -> ExecutionChain: handler_func = self.app.get_command_handler(command) handler_kwargs = self.dependency_provider.get_handler_kwargs( handler_func, self._get_overrides() @@ -203,25 +226,23 @@ def execute_command(self, command) -> CommandResult: assert isinstance( command_result, CommandResult ), f"Got {command_result} instead of CommandResult from {handler_func}" - command_result = collect_domain_events(command_result, handler_kwargs) - self.next_commands = [] - self.integration_events = [] - event_queue = command_result.events.copy() - while len(event_queue) > 0: - event = event_queue.pop(0) - if isinstance(event, IntegrationEvent): - self.store_integration_event(event) + # collect events from entities used by the handler + collected_domain_events = collect_domain_events(handler_kwargs) + command_result.extend_with_events(collected_domain_events) - elif isinstance(event, DomainEvent): - event_results = self.handle_domain_event(event) - self.next_commands.extend(event_results.commands) - event_queue.extend(event_results.events) + return ExecutionChain.one( + ExecutionStep( + task=command, handler=handler_func.__name__, result=command_result + ) + ) - return CommandResult.success(payload=command_result.payload) + @dispatch_triggered_events + def handle_domain_event(self, event) -> ExecutionChain: + return self._handle_domain_event(event) - def handle_domain_event(self, event) -> EventResultSet: - event_results = [] + def _handle_domain_event(self, event) -> ExecutionChain: + execution_sequence = ExecutionChain() for handler_func in self.app.get_event_handlers(event): handler_kwargs = self.dependency_provider.get_handler_kwargs( handler_func, self._get_overrides() @@ -229,25 +250,25 @@ def handle_domain_event(self, event) -> EventResultSet: p = partial(handler_func, event, **handler_kwargs) wrapped_handler = self._wrap_with_middlewares(p, event=event) event_result = wrapped_handler() or EventResult.success() - assert isinstance( event_result, EventResult ), f"Got {event_result} instead of EventResult from {handler_func}" - event_result = collect_domain_events(event_result, handler_kwargs) - event_results.append(event_result) - return EventResultSet(event_results) + collected_events = collect_domain_events(handler_kwargs) + event_result.extend_with_events(collected_events) + execution_sequence.add( + ExecutionStep( + task=event, handler=handler_func.__name__, result=event_result + ) + ) + return execution_sequence def handle_integration_event(self, event): # TODO: do we need to handle domain and integration events differently??? return self.handle_domain_event(event) - def store_integration_event(self, event): - self.integration_events.append(event) - - def collect_integration_events(self) -> list[IntegrationEvent]: - integration_events = self.integration_events - self.integration_events = [] - return integration_events + def collect_integration_events(self): + assert self.last_execution_chain, "No execution chain available" + return self.last_execution_chain.triggered_events(type_of=IntegrationEvent) def get_dependency(self, identifier: Any) -> Any: """Get a dependency from the dependency provider""" diff --git a/src/seedwork/application/command_handlers.py b/src/seedwork/application/command_handlers.py index 54a28fd..25af6c3 100644 --- a/src/seedwork/application/command_handlers.py +++ b/src/seedwork/application/command_handlers.py @@ -22,6 +22,9 @@ def add_error(self, message, exception, exception_info): def is_success(self) -> bool: return not self.has_errors() + def extend_with_events(self, events: list[DomainEvent]): + self.events.extend(events) + @classmethod def failure(cls, message="Failure", exception=None) -> "CommandResult": """Creates a failed result""" diff --git a/src/seedwork/application/events.py b/src/seedwork/application/events.py index 8efeb08..d90e746 100644 --- a/src/seedwork/application/events.py +++ b/src/seedwork/application/events.py @@ -30,10 +30,7 @@ class EventResult: event_id: EventId = field(default_factory=EventId.next_id) payload: Any = None - command: Any = ( - None # command to be executed as a result of this event (experimental) - ) - events: list[DomainEvent] = field(default_factory=list) + events: list[DomainEvent] = field(default_factory=list) # triggered events errors: list[Any] = field(default_factory=list) def has_errors(self) -> bool: @@ -44,6 +41,9 @@ def is_success(self) -> bool: """Returns True if an event was successfully executed""" return not self.has_errors() + def extend_with_events(self, events: list[DomainEvent]): + self.events.extend(events) + def __hash__(self): return id(self) @@ -57,30 +57,11 @@ def failure(cls, message="Failure", exception=None) -> "EventResult": @classmethod def success( - cls, event_id=None, payload=None, command=None, event=None, events=None + cls, event_id=None, payload=None, event=None, events=None ) -> "EventResult": """Creates a successful result""" if events is None: events = [] if event: events.append(event) - return cls(event_id=event_id, payload=payload, command=command, events=events) - - -class EventResultSet(set): - """For now just aa fancy name for a set""" - - def is_success(self): - return all([r.is_success() for r in self]) - - @property - def events(self): - all_events = [] - for event in self: - all_events.extend(event.events) - return all_events - - @property - def commands(self): - all_commands = [event.command for event in self if event.command] - return all_commands + return cls(event_id=event_id, payload=payload, events=events) diff --git a/src/seedwork/application/inbox_outbox.py b/src/seedwork/application/inbox_outbox.py index 690f438..0be3ac3 100644 --- a/src/seedwork/application/inbox_outbox.py +++ b/src/seedwork/application/inbox_outbox.py @@ -89,6 +89,10 @@ def get_all_deliveries(self) -> list[Message]: raise NotImplementedError() +class MessageBroker: + ... + + class InMemoryInbox(Inbox): events: list[Message] current: Optional[Message] diff --git a/src/seedwork/application/results.py b/src/seedwork/application/results.py new file mode 100644 index 0000000..3e2ad21 --- /dev/null +++ b/src/seedwork/application/results.py @@ -0,0 +1,45 @@ +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class ExecutionStep: + task: Any + handler: Any + result: Any + + +@dataclass +class ExecutionChain: + """This class captures the sequence of execution steps and their results during command/event execution.""" + + steps: list[ExecutionStep] = field(default_factory=list) + + def add(self, step: ExecutionStep): + self.steps.append(step) + + def is_success(self): + return all([r.is_success() for r in self.steps]) + + def triggered_events(self, type_of=None): + all_events = [] + for step in self.steps: + all_events.extend(step.result.events if step.result else []) + + if type_of: + return list(filter(lambda e: isinstance(e, type_of), all_events)) + return all_events + + def extend(self, chain): + self.steps.extend(chain.steps) + + @property + def payload(self): + command_result = self.steps[0].result + return command_result.payload if command_result else None + + @classmethod + def one(cls, step: ExecutionStep): + chain = cls() + chain.add(step) + return chain diff --git a/src/seedwork/domain/exceptions.py b/src/seedwork/domain/exceptions.py index 37a1dc2..62f8133 100644 --- a/src/seedwork/domain/exceptions.py +++ b/src/seedwork/domain/exceptions.py @@ -12,7 +12,6 @@ def __str__(self): class EntityNotFoundException(Exception): def __init__(self, repository, **kwargs): - message = f"Entity with {kwargs} not found" super().__init__(message) self.repository = repository diff --git a/src/seedwork/infrastructure/postgres_inbox.py b/src/seedwork/infrastructure/postgres_inbox.py new file mode 100644 index 0000000..db8a0ca --- /dev/null +++ b/src/seedwork/infrastructure/postgres_inbox.py @@ -0,0 +1,58 @@ +from datetime import datetime +from typing import Optional + +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import Session +from sqlalchemy.sql.schema import Column +from sqlalchemy.sql.sqltypes import JSON, DateTime, String + +from seedwork.application.events import IntegrationEvent +from seedwork.application.inbox_outbox import Inbox, Message, MessageStatus +from seedwork.domain.entities import AggregateRoot +from seedwork.infrastructure.database import Base + + +class InboxMessageModel(Base): + __tablename__ = "inbox" + + id = Column(UUID(as_uuid=True), primary_key=True) + event_type = Column(String, nullable=False) + event_data = Column(JSON, nullable=False) + created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + aggregate_id = Column(UUID(as_uuid=True), nullable=True) + aggregate_type = Column(String, nullable=True) + status = Column(String, nullable=False, default=MessageStatus.PENDING) + + +class PostgresInbox(Inbox): + def __init__(self, db_session: Session): + self._session = db_session + + def add( + self, event: IntegrationEvent, source: Optional[AggregateRoot] = None + ) -> None: + message = InboxMessageModel( + id=event.event_id, + event_type=str(event.__class__.__name__), + event_data=event.__dict__, + aggregate_type=type(source).__name__ if source else None, + aggregate_id=source.id if source else None, + ) + self._session.add(message) + + def get_next_pending(self) -> Optional[IntegrationEvent]: + """Get pending events from the outbox""" + raise NotImplementedError() + + def mark_as_processed(self): + """Mark event as completed""" + raise NotImplementedError() + + def mark_as_failed(self): + """Mark event as failed""" + raise NotImplementedError() + + def get_messages(self) -> list[Message]: + """Get all deliveries""" + messages = self._session.query(InboxMessageModel).all() + return messages diff --git a/src/seedwork/infrastructure/postgres_outbox.py b/src/seedwork/infrastructure/postgres_outbox.py index 2cc5592..50090c7 100644 --- a/src/seedwork/infrastructure/postgres_outbox.py +++ b/src/seedwork/infrastructure/postgres_outbox.py @@ -13,7 +13,7 @@ class OutboxMessageModel(Base): - __tablename__ = "outbox_messages" + __tablename__ = "outbox" id = Column(UUID(as_uuid=True), primary_key=True) event_type = Column(String, nullable=False) @@ -33,7 +33,7 @@ def add( ) -> None: message = OutboxMessageModel( id=event.event_id, - event_type=str(event.__class__), + event_type=str(event.__class__.__name__), event_data=event.__dict__, aggregate_type=type(source).__name__ if source else None, aggregate_id=source.id if source else None, @@ -54,4 +54,5 @@ def mark_as_failed(self): def get_messages(self) -> list[Message]: """Get all deliveries""" - return self._session.query(OutboxMessageModel).all() + messages = self._session.query(OutboxMessageModel).all() + return messages diff --git a/src/seedwork/tests/application/test_application_with_outbox.py b/src/seedwork/tests/application/test_application_with_outbox.py index b503259..61e3c13 100644 --- a/src/seedwork/tests/application/test_application_with_outbox.py +++ b/src/seedwork/tests/application/test_application_with_outbox.py @@ -15,6 +15,7 @@ def test_command_execution_returns_integration_events(): In this test, we want to verify that the application stores integration events in the outbox. """ + # arrange @dataclass class CompleteOrder(Command): order_id: int @@ -27,7 +28,7 @@ class NotifyBuyerOfOrderCompletion(IntegrationEvent): order_id: int buyer_email: str - class PrepareOrderForShipping(IntegrationEvent): + class NotifyPrepareOrderForShipping(IntegrationEvent): order_id: int outbox = [] @@ -45,18 +46,19 @@ def complete_order(command: CompleteOrder): @app.domain_event_handler def on_order_completed(event: OrderCompleted): - integration_event = PrepareOrderForShipping(order_id=event.order_id) + integration_event = NotifyPrepareOrderForShipping(order_id=event.order_id) return EventResult.success(event=integration_event) @app.on_exit_transaction_context def on_exit_transaction_context(ctx, exc_type, exc_val, exc_tb): - outbox = ctx.dependency_provider["outbox"] + outbox = ctx.get_dependency("outbox") if exc_type is None: - outbox.extend(ctx.integration_events) + outbox.extend(ctx.collect_integration_events()) + # act + command = CompleteOrder(order_id=1, buyer_email="john.doe@example.com") with app.transaction_context() as ctx: - ctx.execute_command( - CompleteOrder(order_id=1, buyer_email="john.doe@example.com") - ) + ctx.execute_command(command) + # assert assert len(outbox) == 2