Simple Event Store - jak to działa?

ses, eventsourcing Komentarze

Zanim uruchomimy projekt z komponentem SES kilka słów o tym jak on działa. SimpleEventStore jest zbiorem funkcji infrastrukturalnych wspomagających pisanie aplikacji w oparciu o architekturę CQRS+ES. Wiele osób uważa ten pattern za jakąś ciekawostkę przyrodniczą, a tak naprawdę koncept jest bardzo prosty, a jego budowa prosta jak budowa cepa. O CQRS i ES słychać teraz niemal wszędzie, ale niewielu decyduje się na praktyczne użycie.

CQRS

W kontekście budowy projektu, w skrócie, można by to opisać jako rozbicie aplikacji na dwie części.

W jednej przetwarzamy logikę domenową i zapisujemy wynik tego przetwarzania (na potrzeby podejmowania kolejnych decyzji domenowych). W drugiej odczytujemy stan wyprodukowany przez wcześniej zapisane zmiany. To by było na tyle. Implementacja tych założeń jest całkowicie dowolna i można ją wykonać na wiele sposobów. Pamiętacie video Maćka?

Event Sourcing

Stosując event sourcing zapisujemy ten wynik przetwarzania w postaci serii wyemitowanych zdarzeń, a nie jak w tradycyjnym podejściu, w formie jednego obecnego stanu. No tak, ale jak w takim razie działają nasze obiekty domenowe? Otóż przy takim podejściu, do odbudowy stanu na potrzeby obiektów domenowych musimy odczytać wszystkie wcześniej odpisane zdarzenia (wszystkie, które wyemitował dany agregat).

No dobrze. A co z tą drugą częścią, służącą do odczytu danych? Przecież coś trzeba wyświetlić użytkownikowi w UI? Druga część w takim projekcie budowana jest w następujący sposób.

Wszystkie wyemitowane i zapisane zdarzenia musimy obsłużyć, wydobyć z nich informacje i gdzieś zapisać (w dowolnym formacie: rdbms, no-sql, struktura w pamięci, etc.). Do tego służą projekcje. To właśnie one są odpowiedzialne za to, aby nasłuchiwać na zdarzenia i na ich podstawie budować widoki (danych) do późniejszego użycia (odczytu).

Co to jest agregat (Aggregate), zdarzenie (Event), projekcja (Projection)? Więcej o tym możecie się dowiedzieć czytając popularny post Martina Fowlera lub ciekawą serię postów Darka Pawlukiewicza.

SimpleEventStore

Wszystko to co powyżej zostało opisane zostało opakowane w odpowiednie komponenty zawarte właśnie w SES. Programista nie musi się zastanawiać za każdym razem jak tą infrastrukturę napisać/użyć. Instalujemy paczki SES i wszystko jest gotowe. Cały 'boilerplate code' został schowany, aby nie zaprzątać sobie nim głowy.

Przykład?

Napiszmy sobie zatem przypadek użycia. Posłużymy się przykładem z projektu SimpleEventStore.Samples. Klasa ShoppingCart - koszyk sklepowy, gdzie przechowujemy wybrane przez użytkownika towary. Problem banalny, logiki w przykładzie nie za wiele, ale im bardziej rozbudowany sklep tym logika zapewne będzie bogatsza. Co więcej raz wyemitowane zdarzenia pozwalają na wyciąganie wniosków z zachowań klientów (pastwienia się nad koszykiem) w dużo późniejszym czasie, czyli wtedy, gdy pojawi się taka potrzeba biznesowa. Zdarzenia są zapisane po wsze czasy.

public class ShoppingCart : Aggregate<ShoppingCartState>
{
    public ShoppingCart() { }

    public ShoppingCart(Guid id, Guid customerId)
    {
        Id = id;
        Apply(new ShoppingCartCreated(Id, customerId));
    }

    public void AddItem(Guid itemId, string name, int quantity)
    {
        if(quantity > 10) throw new InvalidOperationException("Adding items with quantity larger than 10 is not allowed.");
        Apply(new ItemAddedToShoppingCart(Id, itemId, name, quantity));
    }

    public void RemoveItem(Guid itemId)
    {
        if (State.Items.All(x => x.Id != itemId))
            throw new InvalidOperationException($"Item {itemId} not found in cart {Id}.");
        Apply(new ItemRemovedFromShoppingCart(Id, itemId));
    }
}

Pierwszy konstruktor (bez parametrów) jest potrzebny do załadowania obiektu z repozytorium. Drugi konstruktor potrzebny będzie wtedy, gdy chcemy utworzyć nowy koszyk. Jak widać obiekt ten posiada tylko i wyłącznie zachowania (metody). Nie ma tu właściwości zwracających stan logiczny (brak getterów), ani tym bardziej właściwości pozwalających na zmianę stanu (brak setterów). Podstawowa sprawa - nie posłgujemy się tymi obiektami w UI.

Stwórzmy zatem nowy koszyk, dodajmy do niego jedną pozycję z produktem i zapiszmy zmiany do bazy.

var commitId = SequentialGuid.NewGuid();
var streamId = SequentialGuid.NewGuid();
var itemId = SequentialGuid.NewGuid();

var cart = new ShoppingCart(streamId, customerId); // nowy koszyk dla klienta customerId
cart.AddItem(itemId, name: "Product 1", quantity: 3); // dodajemy pozycję

await repo.SaveChangesAsync(cart, commitId); // zapisujemy zmiany

SequentialGuid, a nie po prostu Guid?

SequentialGuid jest specjalną implementacją generującą Guid w takiej postaci, którą lubi mechanizm indeksowania bazy SQL Server. Jak nazwa wskazuje kolejne guidy są sekwencyjne. Dzięki temu indeksy w bazie nie fragmentują się w tak brzydki sposób jak przy standardowym Guid.NewGuid(). :)

Przetwarzanie zdarzeń

Co tak naprawdę się dzieje w powyższym kodzie? Konstruktor emituje zdarzenie ShoppingCartCreated. Dzięki temu wiemy, że powstał nowy koszyk dla klienta o identyfikatorze customerId.

Metoda AddItem jest banalna i pozwala na dodawanie pozycji do koszyka, ale wykonuje dodatkowe sprawdzenie, które nie dopuszcza pozycji zawierających większą ilość niż 10 sztuk.

Wykonanie SaveChangesAsync zapisuje zmiany do bazy. Co dokładnie zapisuje? Aggregate posiada metodę TakeUncommittedEvents, która zwraca wszystkie nowo wyemitowane zdarzenia. SES zbierze je razem i zapisze w bazie (implementacja SimpleEventStore.MsSql).

Metoda Apply w agregacie ma dodatkowe działanie. Oprócz tego, że kolekcjonuje nowe zdarzenia, to dodatkowo uruchamia proces budowania stanu agregatu na potrzeby sprawdzenia warunków logicznych. Innymi slowy uruchamia projekcję do odbudowania wewnętrznego stanu agregatu.

Przykładowym warunkiem logicznym jest sprawdzenie istnienia pozycji koszyka w metodzie RemoveItem. Jak zatem to wygląda? Zwróćcie uwagę na parametr generyczny koszyka, gdzie mamy zadeklarowany typ ShoppingCartState.

Ten obiekt jest odpowiedzialny za przechowanie odbudowanego stanu.

public class ShoppingCartState : IMemento
{
    public List<CartItem> Items { get; private set; }

    public ShoppingCartState()
    {
        Items = new List<CartItem>(2);
    }

    private void On(ItemAddedToShoppingCart obj)
    {
        Items.Add(new CartItem
        {
            Id = obj.ItemId,
            Name = obj.Name,
            Quantity = obj.Quantity
        });
    }

    private void On(ItemRemovedFromShoppingCart obj)
    {
        var item = Items.FirstOrDefault(x => x.Id == obj.ItemId);
        if (item != null) Items.Remove(item);
    }
}

Metoda Apply emitując zdarzenie danego typu stara się wykonać metodę On, gdzie parametrem wejściowym jest właśnie typ zdarzenia. Jeśli tej metody nie ma to po prostu nic się nie dzieje.

Metody On uruchamiane są w dwóch przypadkach. Pierwszy - w momencie jawnego wywołania Apply, na przykład w metodzie AddItem. W tym przypadku wykona się On(ItemAddedToShoppingCart obj). Drugi przypadek to odbudowa stanu agregatu w momencie załadowania z bazy.

Skąd taki podział? Dlaczego nie modyfikujemy stanu od razu w metodzie AddItem? Przecież to podwójna robota i tyle klepania kodu?

Rozdzielenie operacji sprawdzenia logiki domenowej od zmiany stanu jest właśnie na potrzeby odbudowy stanu ze zdarzeń wcześniej wyemitowanych. Metoda AddItem emituje zdarzenie zaś odpowiednia metoda On(...) jest tylko projekcją. Właśnie te projekcje odbudowują stan agregatu w trakcie ładowania zdarzeń z bazy.

var cart = await repo.LoadAsync(streamId);

Jeszcze raz. Zasada działania jest taka, że metoda na agregacie wykonuje tylko i wyłącznie logikę domenową i jeśli odpowiednie warunki zachodzą emituje jedno lub więcej zdarzeń.

Metody projekcji On(...) służą tylko i wyłącznie do odbudowy stanu agregatu na podstawie danych zawartych w zdarzeniach. Metody te nie mogą zawierać żadnej logiki. Ich działanie musi być zawsze takie same niezależnie od danych. Jeśli taka metoda zawiera instrukcje warunkowe to jest to bardzo podejrzane. Z reguły metody projekcji także nie rzucają wyjątkami. Skąd takie podejście? Pomyślcie jak zachowywałby się agregat podczas każdego ładowania danych z bazy. Nie chcemy, aby odbudowa stanu była zmienna, ani niestabilna ze względu na możliwość wystąpienia wyjątku.

Ale w podanym przykładzie w metodzie On(ItemRemovedFromShoppingCart obj) jest if? Tak, jest, ale nie jest częścią żadnej logiki. Wykonywane jest dodatkowe sprawdzenie, aby właśnie nie nastąpił wyjątek w przypadku braku pozycji.

Snapshot

Słyszałem głosy, że ES ma w swych założeniach luki, bo jak się tych zdarzeń uzbiera to ładowanie takiego agragatu będzie wolne. Bo weź za każdym wczytuj obiekt co ma 1000 zdarzeń... Fakt. Czasem może się tak zdarzyć, ale starzy wyjadacze w tym temacie na bazie doświadczeń szybciutko sprostują, że takich obiektów, które mają setki, tysiące lub więcej zdarzeń raczej nie spotyka się dużo, a nawet twierdzą, że większość systemów (poprawnie zamodelowanych) nie będzie posiadać takich w ogóle.

No, ale co jeśli się zdarzy? Wtedy możemy użyć do tego migawkę (piekne polskie słowo), czyli Snapshot. SimpleEventStore posiada od razu wbudowany mechanizm do prostego tworzenia migawek. W sumie to powyższy kod zawiera wszystko co jest do tego potrzebne. :)

Klasa ShoppingCartState implementuje pusty interface IMemento, który mówi, że ten obiekt może być wykorzystany jako migawka dla agregatu ShoppingCart. Jak stworzyć i zapisać migawkę?

var snapshot = cart.GetSnapshot(); // zwraca migawkę, której stan to obecna instancja ShoppingCartState
await store.Advanced.UpdateSnapshotAsync(cart.Id, snapshot.Version, snapshot.State); // zapisuje migawkę

Każda migawka posiada numer wersji. Dzięki temu SES wie, że przy ładowaniu danych agregatu, jeśli posiada migawkę w wersji 1000, musi załadować zdarzenia nowsze od wersji 1000. Oznacza to, że nie ładuje pierwszego tysiąca zdarzeń. Dzięki temu zmniejsza się znacznie ilość danych, które trzeba wyczytać z bazy i czas załadowania agregatu będzie dużo krótszy.

Dużo kodu, infrastruktury brak

Jak widać powyżej, podczas działania przedstawionych przykładów skupiamy się głównie nad obsługą logiki domenowej bez wnikania w bebeszki infrastrukturalne. Dzieje się to dlatego, że wszystko zostało zapakowane i ukryte pod kilkoma prostymi abstrakcjami.

Chcecie infrastrukturkę? No dobrze, niech już będzie, proszę bardzo.

IEventStore store = new EventStoreBuilder()
    .WithSerializer(new JilSerializer())
    .WithLogger(new NLogLogger("Ses"))
    .WithDefaultContractsRegistry(typeof(SampleRunner).Assembly)
    .WithMsSqlPersistor(connectionString, x =>
    {
        x.RunLinearizer(TimeSpan.FromMilliseconds(20), TimeSpan.FromMinutes(20));
    })
    .Build();

Powyższy kawałek kodu jest potrzebny do tego, aby uruchomić działanie SimpleEventStore po stronie logiki domenowej. Jak widać nie wymusza konkretnego komponentu potrzebnego do serializacji zdarzeń ani logera. Dwa interfejsy ISerializer i ILogger, które dostarcza SES powalają na całkowicie dowolną implementację. Ja stosuję odpowiednio Jil do serializacji i deserializacji oraz NLog jako loger.

Hmm... Tak mi się przypomniało... Pamiętacie post Jimmiego Bogarda, który pisał o mitach CQRS/ES?

Jeśli chcecie, aby projekcje były tworzone w tej samej transakcji co logika domenowa to w sumie już nic więcej nie potrzeba. Repository posiada bowiem metodę OnAfterSaveChanges, gdzie w prosty sposób można wpiąć własny mechanizm propagacji zdarzeń i ich przechwytywania w celu wykonania projekcji. Takim przykładowym mechanizmem może być MediatR lub jakakolwiek inna implementacja IObserver/IObservable. Wszystko wtedy będzie 'in-sync'. Ha. :)

Co dalej?

Wszystko to co powyżej opisałem służy do obsługi tej części systemu, która jest odpowiedzialna za przetwarzanie logiki domenowej. W kolejnym wpisie rozłożymy na łopatki tą drugą część, aby użytkownicy naszego systemu mogli cokolwiek zobaczyć w UI. Dokładniej rzecz biorąc będziemy posługiwać się komponentem SimpleEventStore.Subscriptions. Sprawdzimy jak to będzie działać kiedy projekcje będą budowane 'async' i wprowadzimy sobie pojęcie 'eventual consistency' :)

Komentarze