Operations that Span Multiple Event Streams with the Critter Stack

Let’s just say that Marten gains some serious benefits from sitting on top of PostgreSQL and its very strong support for transactional integrity, as opposed to some of the high profile commercial Event Sourcing tools that are spending a lot of time and energy on their “Dynamic Consistency Boundary” concept because they lack the ACID-compliant transactions that Marten gets for free by riding on top of PostgreSQL.

Marten has long been able to both read from and append to multiple event streams at one time, with guarantees about data consistency and even strongly consistent transactional writes across multiple streams. Wolverine just added some syntactic sugar to make cross-stream command handlers more declarative with its “aggregate handler workflow” integration with Marten.

Let’s use the canonical example of moving money from one account to another, where both changes need to be persisted in one atomic transaction. We’ll start with a simplified domain model of events and a “self-aggregating” Account type like this:

public record AccountCreated(double InitialAmount);
public record Debited(double Amount);
public record Withdrawn(double Amount);

public class Account
{
    public Guid Id { get; set; }
    public double Amount { get; set; }

    public static Account Create(IEvent<AccountCreated> e)
        => new Account { Id = e.StreamId, Amount = e.Data.InitialAmount};

    public void Apply(Debited e) => Amount += e.Amount;
    public void Apply(Withdrawn e) => Amount -= e.Amount;
}

Moving on, here’s what a command handler for a TransferMoney command that impacts two different accounts could look like:

public record TransferMoney(Guid FromId, Guid ToId, double Amount);

public static class TransferMoneyEndpoint
{
    [WolverinePost("/accounts/transfer")]
    public static void Post(
        TransferMoney command,

        [Aggregate(nameof(TransferMoney.FromId))] IEventStream<Account> fromAccount,
        
        [Aggregate(nameof(TransferMoney.ToId))] IEventStream<Account> toAccount)
    {
        // Would already 404 if either referenced account does not exist
        if (fromAccount.Aggregate.Amount >= command.Amount)
        {
            fromAccount.AppendOne(new Withdrawn(command.Amount));
            toAccount.AppendOne(new Debited(command.Amount));
        }
    }
}

The IEventStream<T> abstraction comes from Marten’s FetchForWriting() API, which is our recommended way to interact with Marten streams in typical command handlers. This API is used underneath Wolverine’s “aggregate handler workflow”, but is normally hidden from user-written code if you’re only working with one stream at a time. In this case though, we need to work with the raw IEventStream<T> objects, which both wrap the projected aggregation of each Account and provide a point where we can explicitly append events to each event stream separately. FetchForWriting() guarantees that you get the most up to date information for the Account view of each event stream regardless of how you have configured Marten’s ProjectionLifecycle for Account (kind of an important detail here!).
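
For a sense of what that workflow is doing for us, here’s a rough sketch of the same transfer written directly against Marten’s FetchForWriting() API, outside of Wolverine entirely. The class and method names here are purely illustrative:

using Marten;

public static class TransferMoneyWithRawMarten
{
    // A sketch of roughly what the aggregate handler workflow does on our behalf
    public static async Task Handle(TransferMoney command, IDocumentSession session)
    {
        // FetchForWriting() returns the current Account projection plus a handle
        // for appending new events to that specific stream
        var fromAccount = await session.Events.FetchForWriting<Account>(command.FromId);
        var toAccount = await session.Events.FetchForWriting<Account>(command.ToId);

        if (fromAccount.Aggregate.Amount >= command.Amount)
        {
            fromAccount.AppendOne(new Withdrawn(command.Amount));
            toAccount.AppendOne(new Debited(command.Amount));
        }

        // Both appends are committed in a single PostgreSQL transaction
        await session.SaveChangesAsync();
    }
}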

The typical Marten transactional middleware within Wolverine calls SaveChangesAsync() for us on the Marten unit of work (the IDocumentSession) for the command. If there are enough funds in the “From” account, this command appends a Withdrawn event to the “From” account and a Debited event to the “To” account. If either account has been written to between fetching the original information and committing the changes, Marten will reject the changes and throw its ConcurrencyException as an optimistic concurrency check.
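
If you do drop down and call SaveChangesAsync() yourself, that concurrency rejection is just an exception you can catch. A minimal sketch, assuming you want to decide between retrying and reporting a conflict at the call site (the helper class and bool return are only for illustration):

using Marten;
using Marten.Exceptions;

public static class TransferCommitter
{
    public static async Task<bool> TryCommit(IDocumentSession session)
    {
        try
        {
            // The Wolverine transactional middleware normally makes this call for you
            await session.SaveChangesAsync();
            return true;
        }
        catch (ConcurrencyException)
        {
            // Another writer appended to one of the streams after we fetched it.
            // Re-fetch the streams and retry the command, or report a conflict to the caller.
            return false;
        }
    }
}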

For unit testing, we could write a test for the “happy path” where there are enough funds to cover the transfer like this:

public class when_transferring_money
{
    [Fact]
    public void happy_path_have_enough_funds()
    {
        // StubEventStream<T> is a type that was recently added to Marten
        // specifically to facilitate testing logic like this
        var fromAccount = new StubEventStream<Account>(new Account { Amount = 1000 }){ Id = Guid.NewGuid() };
        var toAccount = new StubEventStream<Account>(new Account { Amount = 100 }){ Id = Guid.NewGuid() };
        
        TransferMoneyEndpoint.Post(new TransferMoney(fromAccount.Id, toAccount.Id, 100), fromAccount, toAccount);

        // Now check the events we expected to be appended
        fromAccount.Events.Single().ShouldBeOfType<Withdrawn>().Amount.ShouldBe(100);
        toAccount.Events.Single().ShouldBeOfType<Debited>().Amount.ShouldBe(100);
    }
}
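
The “sad path” where the funds aren’t sufficient is just as mechanical to test. A quick sketch along the same lines, reusing the StubEventStream<T> helper from above (this test class is my own illustration, not from the post):

public class when_transferring_money_without_sufficient_funds
{
    [Fact]
    public void no_events_are_appended()
    {
        var fromAccount = new StubEventStream<Account>(new Account { Amount = 50 }){ Id = Guid.NewGuid() };
        var toAccount = new StubEventStream<Account>(new Account { Amount = 100 }){ Id = Guid.NewGuid() };

        TransferMoneyEndpoint.Post(new TransferMoney(fromAccount.Id, toAccount.Id, 100), fromAccount, toAccount);

        // The guard clause should leave both streams untouched
        fromAccount.Events.ShouldBeEmpty();
        toAccount.Events.ShouldBeEmpty();
    }
}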

Alright, so there are a few remaining items we still need to improve over time:

  1. Today there’s no way to pass in the expected starting version of each individual stream (see the sketch after this list for a lower-level workaround)
  2. There’s some ongoing work to allow Wolverine to intelligently parallelize work between business entities or event streams while doing work sequentially within a business entity or event stream to side step concurrency problems
  3. We’re working toward making Wolverine utilize Marten’s batch querying support any time you use Wolverine’s declarative persistence helpers against Marten and request more than one item. You can already use Marten’s batch querying with its FetchForWriting() API today by dropping down to the lower level and working directly against Marten, but wouldn’t it be nice if Wolverine just did that automatically for you in cases like the TransferMoney command handler above? We think this will be a significant performance improvement because network round trips are evil.

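On the first item, there is at least a workaround today: if you drop down to Marten’s FetchForWriting() API directly, you can supply the expected starting version of each stream yourself. A minimal sketch, assuming the FetchForWriting() overload that accepts an expected version; the version parameters here are purely illustrative:

using Marten;

public static class TransferMoneyWithExpectedVersions
{
    // Illustrative only: the expected versions would come in from the client or command
    public static async Task Handle(
        TransferMoney command,
        long expectedFromVersion,
        long expectedToVersion,
        IDocumentSession session)
    {
        // Passing an expected version makes Marten reject the commit if either
        // stream has advanced past that version in the meantime
        var fromAccount = await session.Events
            .FetchForWriting<Account>(command.FromId, expectedFromVersion);
        var toAccount = await session.Events
            .FetchForWriting<Account>(command.ToId, expectedToVersion);

        if (fromAccount.Aggregate.Amount >= command.Amount)
        {
            fromAccount.AppendOne(new Withdrawn(command.Amount));
            toAccount.AppendOne(new Debited(command.Amount));
        }

        await session.SaveChangesAsync();
    }
}
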
I covered this example at the end of a live stream we did last week on Event Sourcing with the Critter Stack.

One thought on “Operations that Span Multiple Event Streams with the Critter Stack”

  1. I often have workflows where individual claim lines are processed first, and later grouped into a single claim (invoice). By the time they’re finalized, each claim line has gone through ~20 events, all handled by single-stream projections.

    At the end of the process, I want to:

    1. Append a ClaimLineBilled event to each claim line stream.
    2. Append a ClaimStarted event to the claim stream itself.

    If I only appended the event to the claim, I’d have to make all those claim line projections multi-stream just so they could react to it. By appending directly to each line’s own stream, I can keep them single-stream and avoid that complexity.

    Here’s the proposed API for handling an unknown number of streams from a message:

    public interface IEventStreams<TId, TAggregate>
    {
        IReadOnlyList<TId> Ids { get; }
        IReadOnlyList<IEventStream<TAggregate>> Items { get; }
        IEventStream<TAggregate> this[TId id] { get; }
        bool TryGet(TId id, out IEventStream<TAggregate> stream);
    }

    public record FinalizeClaim(Guid ClaimId, IReadOnlyList<Guid> LineIds, BilledAt BilledAt);

    // ClaimLine and Claim are the aggregate types implied by the description above
    [WolverinePost("/claims/finalize")]
    public static void Post(
        FinalizeClaim cmd,
        [Aggregate(nameof(FinalizeClaim.LineIds))] IEventStreams<Guid, ClaimLine> lines,
        [Aggregate(nameof(FinalizeClaim.ClaimId))] IEventStream<Claim> claim)
    {
        foreach (var stream in lines.Items)
        {
            stream.AppendOne(new ClaimLineBilled(cmd.ClaimId, cmd.BilledAt));
        }

        claim.AppendOne(new ClaimStarted(cmd.BilledAt));
    }
