Simple Event Store - jak to działa?

ses, eventsourcing

Zanim uruchomimy projekt z komponentem SES kilka słów o tym jak on działa. SimpleEventStore jest zbiorem funkcji infrastrukturalnych wspomagających pisanie aplikacji w oparciu o architekturę CQRS+ES. Wiele osób uważa ten pattern za jakąś ciekawostkę przyrodniczą, a tak naprawdę koncept jest bardzo prosty, a jego budowa prosta jak budowa cepa. O CQRS i ES słychać teraz niemal wszędzie, ale niewielu decyduje się na praktyczne użycie.

CQRS

W kontekście budowy projektu, w skrócie, można by to opisać jako rozbicie aplikacji na dwie części.

W jednej przetwarzamy logikę domenową i zapisujemy wynik tego przetwarzania (na potrzeby podejmowania kolejnych decyzji domenowych). W drugiej odczytujemy stan wyprodukowany przez wcześniej zapisane zmiany. To by było na tyle. Implementacja tych założeń jest całkowicie dowolna i można ją wykonać na wiele sposobów. Pamiętacie video Maćka?

Event Sourcing

Stosując event sourcing zapisujemy ten wynik przetwarzania w postaci serii wyemitowanych zdarzeń, a nie jak w tradycyjnym podejściu, w formie jednego obecnego stanu. No tak, ale jak w takim razie działają nasze obiekty domenowe? Otóż przy takim podejściu, do odbudowy stanu na potrzeby obiektów domenowych musimy odczytać wszystkie wcześniej odpisane zdarzenia (wszystkie, które wyemitował dany agregat).

No dobrze. A co z tą drugą częścią, służącą do odczytu danych? Przecież coś trzeba wyświetlić użytkownikowi w UI? Druga część w takim projekcie budowana jest w następujący sposób.

Wszystkie wyemitowane i zapisane zdarzenia musimy obsłużyć, wydobyć z nich informacje i gdzieś zapisać (w dowolnym formacie: rdbms, no-sql, struktura w pamięci, etc.). Do tego służą projekcje. To właśnie one są odpowiedzialne za to, aby nasłuchiwać na zdarzenia i na ich podstawie budować widoki (danych) do późniejszego użycia (odczytu).

Co to jest agregat (Aggregate), zdarzenie (Event), projekcja (Projection)? Więcej o tym możecie się dowiedzieć czytając popularny post Martina Fowlera lub ciekawą serię postów Darka Pawlukiewicza.

SimpleEventStore

Wszystko to co powyżej zostało opisane zostało opakowane w odpowiednie komponenty zawarte właśnie w SES. Programista nie musi się zastanawiać za każdym razem jak tą infrastrukturę napisać/użyć. Instalujemy paczki SES i wszystko jest gotowe. Cały 'boilerplate code' został schowany, aby nie zaprzątać sobie nim głowy.

Przykład?

Napiszmy sobie zatem przypadek użycia. Posłużymy się przykładem z projektu SimpleEventStore.Samples. Klasa ShoppingCart - koszyk sklepowy, gdzie przechowujemy wybrane przez użytkownika towary. Problem banalny, logiki w przykładzie nie za wiele, ale im bardziej rozbudowany sklep tym logika zapewne będzie bogatsza. Co więcej raz wyemitowane zdarzenia pozwalają na wyciąganie wniosków z zachowań klientów (pastwienia się nad koszykiem) w dużo późniejszym czasie, czyli wtedy, gdy pojawi się taka potrzeba biznesowa. Zdarzenia są zapisane po wsze czasy.

public class ShoppingCart : Aggregate<ShoppingCartState>
{
    public ShoppingCart() { }

    public ShoppingCart(Guid id, Guid customerId)
    {
        Id = id;
        Apply(new ShoppingCartCreated(Id, customerId));
    }

    public void AddItem(Guid itemId, string name, int quantity)
    {
        if(quantity > 10) throw new InvalidOperationException("Adding items with quantity larger than 10 is not allowed.");
        Apply(new ItemAddedToShoppingCart(Id, itemId, name, quantity));
    }

    public void RemoveItem(Guid itemId)
    {
        if (State.Items.All(x => x.Id != itemId))
            throw new InvalidOperationException($"Item {itemId} not found in cart {Id}.");
        Apply(new ItemRemovedFromShoppingCart(Id, itemId));
    }
}

Pierwszy konstruktor (bez parametrów) jest potrzebny do załadowania obiektu z repozytorium. Drugi konstruktor potrzebny będzie wtedy, gdy chcemy utworzyć nowy koszyk. Jak widać obiekt ten posiada tylko i wyłącznie zachowania (metody). Nie ma tu właściwości zwracających stan logiczny (brak getterów), ani tym bardziej właściwości pozwalających na zmianę stanu (brak setterów). Podstawowa sprawa - nie posłgujemy się tymi obiektami w UI.

Stwórzmy zatem nowy koszyk, dodajmy do niego jedną pozycję z produktem i zapiszmy zmiany do bazy.

var commitId = SequentialGuid.NewGuid();
var streamId = SequentialGuid.NewGuid();
var itemId = SequentialGuid.NewGuid();

var cart = new ShoppingCart(streamId, customerId); // nowy koszyk dla klienta customerId
cart.AddItem(itemId, name: "Product 1", quantity: 3); // dodajemy pozycję

await repo.SaveChangesAsync(cart, commitId); // zapisujemy zmiany

SequentialGuid, a nie po prostu Guid?

SequentialGuid jest specjalną implementacją generującą Guid w takiej postaci, którą lubi mechanizm indeksowania bazy SQL Server. Jak nazwa wskazuje kolejne guidy są sekwencyjne. Dzięki temu indeksy w bazie nie fragmentują się w tak brzydki sposób jak przy standardowym Guid.NewGuid(). :)

Przetwarzanie zdarzeń

Co tak naprawdę się dzieje w powyższym kodzie? Konstruktor emituje zdarzenie ShoppingCartCreated. Dzięki temu wiemy, że powstał nowy koszyk dla klienta o identyfikatorze customerId.

Metoda AddItem jest banalna i pozwala na dodawanie pozycji do koszyka, ale wykonuje dodatkowe sprawdzenie, które nie dopuszcza pozycji zawierających większą ilość niż 10 sztuk.

Wykonanie SaveChangesAsync zapisuje zmiany do bazy. Co dokładnie zapisuje? Aggregate posiada metodę TakeUncommittedEvents, która zwraca wszystkie nowo wyemitowane zdarzenia. SES zbierze je razem i zapisze w bazie (implementacja SimpleEventStore.MsSql).

Metoda Apply w agregacie ma dodatkowe działanie. Oprócz tego, że kolekcjonuje nowe zdarzenia, to dodatkowo uruchamia proces budowania stanu agregatu na potrzeby sprawdzenia warunków logicznych. Innymi slowy uruchamia projekcję do odbudowania wewnętrznego stanu agregatu.

Przykładowym warunkiem logicznym jest sprawdzenie istnienia pozycji koszyka w metodzie RemoveItem. Jak zatem to wygląda? Zwróćcie uwagę na parametr generyczny koszyka, gdzie mamy zadeklarowany typ ShoppingCartState.

Ten obiekt jest odpowiedzialny za przechowanie odbudowanego stanu.

public class ShoppingCartState : IMemento
{
    public List<CartItem> Items { get; private set; }

    public ShoppingCartState()
    {
        Items = new List<CartItem>(2);
    }

    private void On(ItemAddedToShoppingCart obj)
    {
        Items.Add(new CartItem
        {
            Id = obj.ItemId,
            Name = obj.Name,
            Quantity = obj.Quantity
        });
    }

    private void On(ItemRemovedFromShoppingCart obj)
    {
        var item = Items.FirstOrDefault(x => x.Id == obj.ItemId);
        if (item != null) Items.Remove(item);
    }
}

Metoda Apply emitując zdarzenie danego typu stara się wykonać metodę On, gdzie parametrem wejściowym jest właśnie typ zdarzenia. Jeśli tej metody nie ma to po prostu nic się nie dzieje.

Metody On uruchamiane są w dwóch przypadkach. Pierwszy - w momencie jawnego wywołania Apply, na przykład w metodzie AddItem. W tym przypadku wykona się On(ItemAddedToShoppingCart obj). Drugi przypadek to odbudowa stanu agregatu w momencie załadowania z bazy.

Skąd taki podział? Dlaczego nie modyfikujemy stanu od razu w metodzie AddItem? Przecież to podwójna robota i tyle klepania kodu?

Rozdzielenie operacji sprawdzenia logiki domenowej od zmiany stanu jest właśnie na potrzeby odbudowy stanu ze zdarzeń wcześniej wyemitowanych. Metoda AddItem emituje zdarzenie zaś odpowiednia metoda On(...) jest tylko projekcją. Właśnie te projekcje odbudowują stan agregatu w trakcie ładowania zdarzeń z bazy.

var cart = await repo.LoadAsync(streamId);

Jeszcze raz. Zasada działania jest taka, że metoda na agregacie wykonuje tylko i wyłącznie logikę domenową i jeśli odpowiednie warunki zachodzą emituje jedno lub więcej zdarzeń.

Metody projekcji On(...) służą tylko i wyłącznie do odbudowy stanu agregatu na podstawie danych zawartych w zdarzeniach. Metody te nie mogą zawierać żadnej logiki. Ich działanie musi być zawsze takie same niezależnie od danych. Jeśli taka metoda zawiera instrukcje warunkowe to jest to bardzo podejrzane. Z reguły metody projekcji także nie rzucają wyjątkami. Skąd takie podejście? Pomyślcie jak zachowywałby się agregat podczas każdego ładowania danych z bazy. Nie chcemy, aby odbudowa stanu była zmienna, ani niestabilna ze względu na możliwość wystąpienia wyjątku.

Ale w podanym przykładzie w metodzie On(ItemRemovedFromShoppingCart obj) jest if? Tak, jest, ale nie jest częścią żadnej logiki. Wykonywane jest dodatkowe sprawdzenie, aby właśnie nie nastąpił wyjątek w przypadku braku pozycji.

Snapshot

Słyszałem głosy, że ES ma w swych założeniach luki, bo jak się tych zdarzeń uzbiera to ładowanie takiego agragatu będzie wolne. Bo weź za każdym wczytuj obiekt co ma 1000 zdarzeń... Fakt. Czasem może się tak zdarzyć, ale starzy wyjadacze w tym temacie na bazie doświadczeń szybciutko sprostują, że takich obiektów, które mają setki, tysiące lub więcej zdarzeń raczej nie spotyka się dużo, a nawet twierdzą, że większość systemów (poprawnie zamodelowanych) nie będzie posiadać takich w ogóle.

No, ale co jeśli się zdarzy? Wtedy możemy użyć do tego migawkę (piekne polskie słowo), czyli Snapshot. SimpleEventStore posiada od razu wbudowany mechanizm do prostego tworzenia migawek. W sumie to powyższy kod zawiera wszystko co jest do tego potrzebne. :)

Klasa ShoppingCartState implementuje pusty interface IMemento, który mówi, że ten obiekt może być wykorzystany jako migawka dla agregatu ShoppingCart. Jak stworzyć i zapisać migawkę?

var snapshot = cart.GetSnapshot(); // zwraca migawkę, której stan to obecna instancja ShoppingCartState
await store.Advanced.UpdateSnapshotAsync(cart.Id, snapshot.Version, snapshot.State); // zapisuje migawkę

Każda migawka posiada numer wersji. Dzięki temu SES wie, że przy ładowaniu danych agregatu, jeśli posiada migawkę w wersji 1000, musi załadować zdarzenia nowsze od wersji 1000. Oznacza to, że nie ładuje pierwszego tysiąca zdarzeń. Dzięki temu zmniejsza się znacznie ilość danych, które trzeba wyczytać z bazy i czas załadowania agregatu będzie dużo krótszy.

Dużo kodu, infrastruktury brak

Jak widać powyżej, podczas działania przedstawionych przykładów skupiamy się głównie nad obsługą logiki domenowej bez wnikania w bebeszki infrastrukturalne. Dzieje się to dlatego, że wszystko zostało zapakowane i ukryte pod kilkoma prostymi abstrakcjami.

  • mamy abstrakcyjną klasę bazową Aggregate, która zajmuje się emitowaniem zdarzeń i odbudową stanu poprzez wewnętrzne projekcje
  • Repository powołuje agregat o zadanym id, wstrzykuje do niego zdarzenia wyczytane z bazy oraz zapisuje nowo wyemitowane zdarzenia do bazy
  • IMemento - migawka (Snapshot)

Chcecie infrastrukturkę? No dobrze, niech już będzie, proszę bardzo.

IEventStore store = new EventStoreBuilder()
    .WithSerializer(new JilSerializer())
    .WithLogger(new NLogLogger("Ses"))
    .WithDefaultContractsRegistry(typeof(SampleRunner).Assembly)
    .WithMsSqlPersistor(connectionString, x =>
    {
        x.RunLinearizer(TimeSpan.FromMilliseconds(20), TimeSpan.FromMinutes(20));
    })
    .Build();

Powyższy kawałek kodu jest potrzebny do tego, aby uruchomić działanie SimpleEventStore po stronie logiki domenowej. Jak widać nie wymusza konkretnego komponentu potrzebnego do serializacji zdarzeń ani logera. Dwa interfejsy ISerializer i ILogger, które dostarcza SES powalają na całkowicie dowolną implementację. Ja stosuję odpowiednio Jil do serializacji i deserializacji oraz NLog jako loger.

Hmm... Tak mi się przypomniało... Pamiętacie post Jimmiego Bogarda, który pisał o mitach CQRS/ES?

Jeśli chcecie, aby projekcje były tworzone w tej samej transakcji co logika domenowa to w sumie już nic więcej nie potrzeba. Repository posiada bowiem metodę OnAfterSaveChanges, gdzie w prosty sposób można wpiąć własny mechanizm propagacji zdarzeń i ich przechwytywania w celu wykonania projekcji. Takim przykładowym mechanizmem może być MediatR lub jakakolwiek inna implementacja IObserver/IObservable. Wszystko wtedy będzie 'in-sync'. Ha. :)

Co dalej?

Wszystko to co powyżej opisałem służy do obsługi tej części systemu, która jest odpowiedzialna za przetwarzanie logiki domenowej. W kolejnym wpisie rozłożymy na łopatki tą drugą część, aby użytkownicy naszego systemu mogli cokolwiek zobaczyć w UI. Dokładniej rzecz biorąc będziemy posługiwać się komponentem SimpleEventStore.Subscriptions. Sprawdzimy jak to będzie działać kiedy projekcje będą budowane 'async' i wprowadzimy sobie pojęcie 'eventual consistency' :)

Simple Event Store - działa na produkcji

ses, eventsourcing

Troszkę mi to zajęło. Od kilku dni Simple Event Store (w skrócie SES) działa w środowisku produkcyjnym. Prace co prawda jeszcze w pełni nie zakończone, ale podstawowa funkcjonalność już działa i to całkiem nieźle (o tym będzie w innym poście).

SES jest dosyć prostą implementacją, która nie zrywa czapek z głów jeśli chodzi o prędkość. Jego zaleta to niski próg wejścia oraz łatwość w użyciu.

Co najważniejsze SES to nie jest framework. SES to biblioteczka, która wspomaga, porządkuje i ukrywa cały kod infrastrukturalny dając developerowi możliwość skupienia się nad samą logiką biznesową. SES jako odrębna biblioteka powstał w wyniku wyjęcia infrastrukturalnego kodu z całego projektu.

Instalacja

SES można pobrać bezpośrednio z githuba i skompilować samemu lub zainstalować bezpośrednio z serwera nuget.

SimpleEventStore.Abstracts

NuGet Status

Tu są zgromadzone wszystkie interfejsy oraz pomocnicze implementacje, które są potrzebne do działania zarówno podstawowego komponentu SimpleEventStore jak i SimpleEventStore.Subscriptions.

SimpleEventStore

NuGet Status

Podstawowa implementacja, która implementuje główny interfejs do zarządzania SES, czyli IEventStore.

SimpleEventStore.MsSql

NuGet Status

Implementacja SimpleEventStore dla bazy danych MS Sql Server.

SimpleEventStore.Domain

NuGet Status

Podstawowe interfejsy i implementacje dla obiektów tworzonej domeny biznesowej takich jak: Aggregate, Repository czy ValueObject. Biblioteka ta nie jest potrzebna do działania SES, ale znacznie ułatwia posługiwanie się nim.

SimpleEventStore.Subscriptions

NuGet Status

SimpleEventSore.Subscriptions automatyzuje dla nas proces czytania logu ze zdarzeniami. Zajmuje się wywoływaniem handlerów, a także utrzymuje stan ostatnio przetworzonych danych.

SimpleEventStore.Subscriptions.MsSql

NuGet Status

Implementacja SimpleEventStore.Subscriptions dla bazy danych MS Sql Server.

Możesz zainstalować wszystkie paczki wykonując pokolei poniższe linie w okienku Package Manager Console.

Podstawowe paczki pozwalające na zapis zdarzeń do bazy:

    Install-Package SimpleEventStore.Abstracts
    Install-Package SimpleEventStore
    Install-Package SimpleEventStore.MsSql
    Install-Package SimpleEventStore.Domain

Paczki pozwalające na zautomatyzowane przetwarzanie zdarzeń z bazy:

    Install-Package SimpleEventStore.Subscriptions
    Install-Package SimpleEventStore.Subscriptions.MsSql

W kolejnych postach opiszę dokładniej jak wygląda proces konfiguracji oraz podam przykładowe przypadki użycia. Stay tuned... :)

Dodanie parametru table-valued spowalnia wykonanie zapytania

sql, optymalizacja, ses, eventsourcing

Próba optymalizacji

Podczas implementacji Ses.MsSql chciałem jak najbardziej zoptymalizować zapis danych do bazy. Jedną ze znanych metod jest zmniejszenie ilości odwołań do serwera bazy danych.

Kiedy pojawia się więcej niż jeden rekord do zapisania standardowo zapisujemy w pętli jeden po drugim. Niestety każdy zapis to wywołanie zapytania typu INSERT, czyli strzał do bazy. A gdyby tak wysłać do serwera wszystkie rekordy naraz?

MS SQL Server od wersji 2008 posiada taką możliwość. Możemy zdefiniować własny typ, na przykład:

CREATE TYPE dbo.NewEvents AS TABLE (
    [Version] INT NOT NULL,
    [ContractName] NVARCHAR(225) NOT NULL,
    [Payload] VARBINARY(MAX) NOT NULL
)

Teraz kawałek SQLa, w którym skorzystamy z naszego nowego typu:

--DECLARE @Events NewEvents

INSERT INTO [Streams]([StreamId],[CommitId],[Version],[ContractName],[Payload],[CreatedAtUtc])
SELECT
    @StreamId,
    @CommitId,
    [Version],
    [ContractName],
    [Payload],
    @CreatedAtUtc
FROM
    @Events
ORDER BY
    [Version];

Taka konstrukcja pozwala na wrzucenie do tabeli Streams wszystkich rekordów przesłanych w strukturze NewEvents. Pokazany powyżej przykład zapytania to tylko wycinek procedury, która odpowiedzialna jest za dodawanie nowych zdarzeń do bazy projektu Ses.MsSql.

No dobra, mamy załatwioną część po stronie sqla. Teraz kod c#:

await cmd
    .AddInputParam(SqlQueries.InsertEvents.ParamStreamId, DbType.Guid, streamId)
    .AddInputParam(SqlQueries.InsertEvents.ParamCommitId, DbType.Guid, commitId)
    .AddInputParam(SqlQueries.InsertEvents.ParamCreatedAtUtc, DbType.DateTime, DateTime.UtcNow)
    .AddInputParam(SqlQueries.InsertEvents.ParamMetadataPayload, DbType.Binary, metadata, true)
    .AddInputParam(SqlQueries.InsertEvents.ParamIsLockable, DbType.Boolean, isLockable)
    .AddInputParam(SqlQueries.InsertEvents.ParamEvents, records)
    .ExecuteNonQueryAsync(cancellationToken)
    .ConfigureAwait(false);

gdzie records to poprostu:

IEnumerable<SqlDataRecord>

Typ SqlDataRecord został dodany do framework.net już w wersji 2.0.

Pokazany powyżej przykład daje (powinien dać) nam taką przewagę nad pojedynczym insertem, że przesłanie jednego czy wielu rekordów nie powinno zwiększać czasu wykonania (przynajmniej w pewnej skali).

I faktycznie tak jest. Czy przesyłam 1, 10 czy 100 rekordów to nadal czasy wykonania są podobne - zachowuje pewną liniowość.

Fiasko

Niestety kiedy wykonałem pomiary dla procedury bez parametru typu table-valued to wyszło mi, że cała konstrukcja wykonuje się ponad 9 razy szybciej. Problem opisywałem już jakiś czas temu na SO. Do tej pory nie znalazłem rozwiązania.

Wnioski

Taka optymalizacja przynajmniej w tym przypadku traci jakikolwiek sens.

Co więcej, większość biznesowych operacji generuje zazwyczaj jedno (dwa) zdarzenia. Dla takiej ilości wystarczająco wydajnym mechanizmem jest zapis zdarzeń rekord po rekordzie, a na dodatek w przypadku wystąpienia jakiegokolwiek błędu (błąd wersji agregatu - concurrency) od razu dostajemy informację, na którym zdarzeniu się to stało i możemy bezpośrednio podjąć odpowiednie kroki. O czym w kolejnych wpisach.

SES - Podstawowe interfejsy

ses, eventstore, eventsourcing

Dzisiaj zrobiłem kilka zmian, choć nie wiele, bo miałem na głowie inną pilną robotę. Wieczorkiem usiadłem i w sumie nie wszystko co wcześniej napisałem mi się podoba. Jutro zatem duże zmiany.

Mimo to jest kilka rzeczy, które w miarę się krystalizują i prawdopodobnie w tej postaci zostaną zachowane.

IEventStore

Podstawowy interface całej biblioteki, czyli IEventStore.

public interface IEventStore
{
    Task<IReadOnlyEventStream> Load(
        Guid streamId,
        bool pessimisticLock,
        CancellationToken cancellationToken = default(CancellationToken));

    Task SaveChanges(
        Guid streamId,
        int expectedVersion,
        IEventStream stream,
        CancellationToken cancellationToken = default(CancellationToken));
}

Definiuje tylko dwie metody:

  • Load - wczytuje dane z bazy danych
  • SaveChanges - zapisuje nowe zdarzenia do bazy danych

Oczywiście pod nimi kryje się troszkę logiki takiej jak serializacja/deserializacja, konwersja do nowszej wersji, rozwiązywanie konfliktów w przypadku wykrycia kolizji wersji, itd.

IEvent (i IMemento)

Zdarzenie marker, ale ułatwia kilka rzeczy. Po pierwsze w łatwy sposób mogę posługiwać się snapshotem, gdzie jeśli istnieje to zawsze będzie pierwszym elementem w liście odczytanych zdarzeń, bo IMemento dziedziczy po IEvent.

Dzięki temu mogę zrobić tak:

var events = await _settings.Persistor.Load(streamId, pessimisticLock);
var snapshot = events[0] as IMemento;
var currentVersion = snapshot?.Version + events.Count ?? events.Count;
return new ReadOnlyEventStream(events, currentVersion);

Słowem wyjaśnienia. Ładujemy zdarzenia. Sprawdzamy, czy pierwszy element to IMemento. Jeśli tak to znaczy, że mamy załadowany snapshot, a zdarzenia są tylko te, które mają wersję od niego starszą. Jeśli nie jest to IMemento to znaczy, że mamy załadowane wszystkie zdarzenia dla danego streamId.

Takie podejście przekłada się także na to jak będzie wyglądała implementacja IAggregate, ale o tym innym razem.

IReadOnlyEventStream

Definuje obiekt, w którym dostaniemy w całości załadowane dane dla danego streamId.

public interface IReadOnlyEventStream
{
    IReadOnlyList<IEvent> Events { get; } // Lista zdarzeń
    int CurrentVersion { get; } // Bieżąca wersja w bazie danych
}

IEventStream

W odróżnieniu od poprzedniego interfejsu IEventStream definiuje nam obiekt, który wysyłamy do eventstore nowe zdarzenia, aby zapisał je do bazy danych.

public interface IEventStream
{
    Guid CommitId { get; } // Identyfikator zmiany
    IEvent[] Events { get; } // Lista nowych zdarzeń
    IDictionary<string, object> Metadata { get; set; } // Metadane, związane z zapisywanymi zdarzeniami
}

PesimissticLock

Pewnie intryguje Was ten parametr. I słusznie. W sumie to się zastanawiam czy nie powinienem zmienić, aby miał domyślną wartość na false. A skąd w ogóle taki pomysł, aby coś takiego umieszczać w EventStore? O tym w kolejnym wpisie. :)

Simple Event Store

ses, eventstore, eventsourcing

Właśnie umieściłem na githubie zarys projektu SimpleEventStore (w skrócie SES). W założeniach ma to być biblioteczka (library - not framework) wspomagająca tworzenie aplikacji opartych o eventsouring (ES), gdzie dane są przechowywane w relacyjnej bazie danych. Na początek w podstawowej implementacji MS SQL Server.

Główne założenia do projektu:

  • całe API asynchroniczne (async/await)
  • bez dodatkowych zależności
  • możliwość użycia dowolnego silnika relacyjnej bazy danych
  • wsparcie dla optymistycznego (i pesymistycznego dla szczególnych przypadków) blokowania
  • możliwość użycia dowolnej biblioteki do logowania dzięki interfejsowi ILogger
  • możliwość użycia dowolnej biblioteki do serializacji/deserializacji danych dzięki interfejsowi ISerializer
  • automatyczna konwersja zdarzeń do nowszej wersji
  • wbudowany i łatwo podmienialny mechanizm subskrypcji
  • idempotentny subskrybent z automatu

Dodatkową zaletą ma być odczyt zdarzeń przy zachowaniu porządku zapisu. Przedstawię to na następującym przykładzie. Załóżmy, że klient rejestruje się w naszej aplikacji. Z punktu widzenia logiki biznesowej ważne jest, aby wiedzieć, że klient się zarejestrował (jedno zdarzenie) oraz dodatkową informacją jest to, że zaakceptował możliwość odbierania informacji handlowych w postaci newslettera (drugie zdarzenie).

Nasz agregat (User) wyemituje zatem dwa zdarzenia:

  • UserRegistered
  • UserNewsletterAccepted

SimpleEventStore w podstawowej implementacji (czyli dla MS SQL Server) będzie gwarantował, że zapisane zdarzenia będą mogły być przetwarzane przez subskrybentów zawsze w tej samej kolejności. Nie będzie możliwa sytuacja, że system będzie chciał oznaczać możliwość wysyłania newslettera do niezarejestrowanego jeszcze użytkownika.

Przetwarzanie zdarzeń bez zachowania kolejności budzi wiele problemów. Ten temat powraca jak bumerang na łamach grupy DDD/CQRS/ES.

Wszystko to co powyżej zostało określone w założeniach da się wykonać dzięki temu, że relacyjne bazy danych wspierają transakcje. To bardzo ułatwia. :)

Poniżej przykładowy (oczekiwany) kod obrazujący api SES:

var options = new TransactionOptions {IsolationLevel = IsolationLevel.ReadCommitted};
using (var scope = new TransactionScope(TransactionScopeOption.Required, options))
{
    var id = SequentalGuid.NewGuid();

    var aggregate = new ShoppingCart();
    aggregate.AddItem(SequentalGuid.NewGuid(), name: "Product 1", quantity: 3);


    var stream = new EventStream(id)

    // Appending events
    stream.Append(aggregate.TakeUncommittedEvents());

    // Adding metadata item (key, value)
    stream.Advanced.AddMetadata("RequestIP", "0.0.0.0");
    stream.Advanced.AddMetadata("User", "John Doe");

    await store.SaveChanges(stream);

    await scope.Complete();
}

A teraz z użyciem Repository, który skrzętnie ukrywa niepotrzebne szczegóły z punktu widzenia obsługi logiki samej aplikacji:

using (var scope = new TransactionScope(TransactionScopeOption.Required, options))
{
    using (var repo = new SourcedRepo<ShoppingCart>(store))
    {
        var aggregate = await repo.Load(id);

        aggregate.AddItem(Guid.NewGuid(), "Product 1", 3);

        await repo.SaveChanges(aggregate);
    }
    scope.Complete();
}

Na potrzeby jednego z projektów popełniłem już podobne rozwiązanie. Niestety wcześniej zbyt mocno wzorowałem się na NEventStore. Nie wpłynęło to dobrze na prawidłową implementację wcześniejszego rozwiązania. Zresztą jeden z commiterów NES także sam zauważył, że NES poszedł w złym kierunku.

Na pierwszy rzut oka można sobie pomyśleć 'to nie będzie działać wydajnie'. Nic bardziej mylnego. W poprzednim rozwiązaniu na moim i5 (dysk SSD) spokojnie dało się wyciągnąć ponad 6000 msg/sec. Jak na mój gust całkiem nieźle.