Event Sourcing with Marten V4: Aggregated Projections

All the code samples in this post are from alpha code, and maybe subject to change based on user feedback. At a minimum, I’d expect the configuration code to change as we write more documentation and sample code and try to sand down anything that’s awkward, confusing, or not discoverable. I’m planning on making this the first in a series of a blog posts. Please, please, please share any feedback or questions you might have about the Marten usage here.

The Marten team kicked out a new alpha this week (4.0.0-alpha.5) that among other things, includes most of our planned improvements to Marten’s event sourcing support.

Before I dive into the Marten V4 improvements, let’s rewind and talk about what event sourcing is, starting with some quotes:

The fundamental idea of Event Sourcing is that of ensuring every change to the state of an application is captured in an event object, and that these event objects are themselves stored in the sequence they were applied for the same lifetime as the application state itself.

Event Sourcing by Martin Fowler (no relation to Marten:))

And,

Event Sourcing is an alternative way to persist data. In contrast with state-oriented persistence that only keeps the latest version of the entity state, Event Sourcing stores each state mutation as a separate record called an event.

What is Event Sourcing? by Alexey Zimarev 

We just finished a client project at Calavista that used event sourcing that we generally felt to be a success. In this case, the business problem was very workflow centric and lent itself well to being modeled as a series of events reflecting user actions captured by the system or determined by business rules in background processes. Moreover, the project had significant requirements to track metrics and auditing compliance and we found event sourcing to be a very effective way to knock out the auditing requirements as well as set our client up to be able to support whatever metrics they wished to develop in the future by ingesting the raw events.

We did need to know the current state of the active workflows going on within the system for many of the business rules, so we kept a live “projected” view of the raw events in memory in a background process. That strategy certainly won’t work for every system, but one way or another, your system built with event sourcing is likely going to need some way to derive the system state from the raw events — and this is where I finally want to switch to talking about the work we’ve been doing in Marten V4 to improve Marten’s read-side projection support.

If you’re wondering, we didn’t use Marten because the project in question was written on top of Node.js. If it had been a .Net project, I absolutely believe that Marten would have been a very good fit.

Marten’s Event Sourcing Support

The value of Marten as an event store is that in one library, you get:

  1. The ability to capture events in durable, sequential storage
  2. Opt in multi-tenancy support
  3. User-defined “Projections” that compile the derived state of the system based on the raw events, including the ability to store the projected views as just regular old Marten documents. Those “projected” views can be built on the fly, updated inline when new, related events are being captured, or built asynchronously by background processes provided by Marten.
  4. Plenty of functionality to query and retrieve event data

And all of that runs using the solid, fully transactional Postgresql database engine which is well supported on every major cloud hosting platform. I’m going to argue that Marten gives .Net developers the easiest path to a full fledged event sourcing persistence subsystem within your application architecture because it’s self-contained.

A Sample Domain Model

Let’s say we’re building a system to track our user’s road trips throughout the U.S. In that domain, we’re tracking and capturing these events for each active trip starting from the beginning of the system:

  1. TripStarted — a new trip started on a certain day
  2. TripEnded — a trip in progress ended at its final destination
  3. TripAborted — a trip in progress was ended before it was completed
  4. Departure and Arrival — a trip party reached or left a U.S. state
  5. Travel — a trip party drove within a single state on a single day a series of movements, all of a single cardinal direction like it’s a 1980’s Atari video game (cut me some slack, I needed a simple domain to test the projections here:))

Getting Started with Marten

Marten completely embraces the HostBuilder concept introduced in later versions of .Net Core for easy integration into .Net applications. Starting from the “worker” template to start a new .Net project, I add a reference to the Marten Nuget package and add this call to the AddMarten() extension method like so:

public static IHostBuilder CreateHostBuilder(string[] args)
{
    return Host.CreateDefaultBuilder(args)
        .ConfigureServices((hostContext, services) =>
        {
            var configuration = hostContext.Configuration;
            var environment = hostContext.HostingEnvironment;

            services.AddHostedService<Worker>();

            // This is the absolute, simplest way to integrate Marten into your
            // .Net Core application with Marten's default configuration
            services.AddMarten(options =>
            {
                // Establish the connection string to your Marten database
                options.Connection(configuration.GetConnectionString("Marten"));

                // If we're running in development mode, let Marten just take care
                // of all necessary schema building and patching behind the scenes
                if (environment.IsDevelopment())
                {
                    options.AutoCreateSchemaObjects = AutoCreate.All;
                }
            });
        });
}

I’ll add some new V4 options later in the post, but the basics of what I did up there is described in our documentation.

Now to capture some events. Assume that a new trip is started when our system receives this command message from another system or our user interface:

public class CreateNewTrip
{
    public int Day { get; set; }
    public string State { get; set; }
    public Movement[] Movements { get; set; }
}

Next, let’s say our message handler looks like this (it varies by messaging framework, but this is what it could look like with Jasper):

public class NewTripHandler
{
    private readonly IDocumentSession _session;

    public NewTripHandler(IDocumentSession session)
    {
        _session = session;
    }

    public async Task Handle(CreateNewTrip trip)
    {
        var started = new TripStarted
        {
            Day = trip.Day
        };

        var departure = new Departure
        {
            Day = trip.Day,
            State = trip.State
        };

        var travel = new Travel
        {
            Day = trip.Day,
            Movements = new List<Movement>(trip.Movements)
        };

        // This will create a new event stream and
        // append the three events to that new stream
        // when the IDocumentSession is saved
        var action = _session.Events
            .StartStream(started, departure, travel);

        // You can also use strings as the identifier
        // for streams
        var tripId = action.Id;

        // Commit the events to the new event
        // stream for this trip
        await _session.SaveChangesAsync();
    }
}

In Marten nomenclature, a “stream” is a related set of events in the event storage. In this system, we’ll use a stream for every distinct trip. The code above takes in the CreateNewTrip command message, and creates three events to record the initial progress of the new trip, and persists the new events.

So now that we’ve captured events, let’s move on to projections in Marten V4, because that’s been both a major area of effort and also the biggest changes for usage in this forthcoming release.

Aggregate by Stream

Projections support in Marten comes in a couple different flavors, but I’m guessing that the most common is projecting a single document view of a single stream of events. In this case, we’ll create a projected Trip view like this:

public class Trip
{
    public Guid Id { get; set; }

    // the day the trip ended
    public int EndedOn { get; set; }

    // total mileage of the trip to date
    public double Traveled { get; set; }

    // what state is the trip party in
    // presently
    public string State { get; set; }

    // is the trip ongoing?
    public bool Active { get; set; }

    // what day did the trip start?
    public int StartedOn { get; set; }
}

To configure an aggregated projection for a Trip stream, we’ll subclass the new AggregateProjection<T> class like so:

public class TripAggregation: AggregateProjection<Trip>
{
    public TripAggregation()
    {
        // Delete the Trip document for this
        // stream if this event is encountered
        DeleteEvent<TripAborted>();

        ProjectionName = "Trip";

        // We'll change this later
        Lifecycle = ProjectionLifecycle.Live;
    }

    public void Apply(Arrival e, Trip trip) => trip.State = e.State;
    public void Apply(Travel e, Trip trip) => trip.Traveled += e.TotalDistance();
    public void Apply(TripEnded e, Trip trip)
    {
        trip.Active = false;
        trip.EndedOn = e.Day;
    }

    public Trip Create(TripStarted started)
    {
        return new Trip {StartedOn = started.Day, Active = true};
    }
}

A couple notes here to explain the code:

  • Marten is depending on naming conventions to know what to do with a certain kind of event type. So the Create() method is used to create the Trip aggregate from an event object of type TripStarted.
  • The Apply() methods are used to make updates to an existing aggregate document based on an event of a certain type
  • For the moment, the TripAggregation is only used for “live” aggregations that are done on the fly. We’ll change that later

Now, let’s put this new projection to use. In our call to AddMarten() up above, I’m going to add one line of code to register our new projection:

options.Events.Projections.Add(new TripAggregation());

Let’s say that in our system we write events for trips very frequently, but very rarely need to see the current state of the trip (don’t know why that would be so, but just go with it). In that case we can just lean on Marten’s ability to aggregate the projected view on the fly as shown below with the AggregateStreamAsync() method:

public class EndTrip
{
    public Guid TripId { get; set; }
    public bool Successful { get; set; }
    public string State { get; set; }
    public int Day { get; set; }
}

public class EndTripHandler
{
    private readonly IDocumentSession _session;

    public EndTripHandler(IDocumentSession session)
    {
        _session = session;
    }

    public async Task Handle(EndTrip end)
    {
        // we need to first see the current
        // state of the Trip to decide how
        // to proceed
        var trip = await _session
            .Events
            .AggregateStreamAsync<Trip>(end.TripId);
        
        // finish processing the EndTrip command...
    }
}

If instead, we’d like to keep the matching Trip document up to date and persisted in the database as new events come in, we can switch the TripProjection to using the “inline” lifecycle by setting the Lifecycle property in the constructor function of TripProjection to:

public TripAggregation()
{
    DeleteEvent<TripAborted>();

    ProjectionName = "Trip";

    // Now let's change the lifecycle to inline
    Lifecycle = ProjectionLifecycle.Inline;
}

“Inline” projections are updated when events are appended, and in the same transaction as the append event database changes. This gives you true ACID transaction integrity between the events and the projected views. This can set you up for possible concurrency issues if multiple application threads are trying to update the same stream of events simultaneously, so exercise some caution with using inline projections.

So with the Trip documents being updated inline with new events coming in, our EndTripHandler becomes:

public class EndTripHandler
{
    private readonly IDocumentSession _session;

    public EndTripHandler(IDocumentSession session)
    {
        _session = session;
    }

    public async Task Handle(EndTrip end)
    {
        // we need to first see the current
        // state of the Trip to decide how
        // to proceed, so load the pre-built
        // projected document from the database
        var trip = await _session
            .LoadAsync<Trip>(end.TripId);

        // finish processing the EndTrip command...
    }
}

You can also run projections asynchronously in a background thread, but I’m going to leave that for a subsequent post.

Some other points of interest about AggregateProjection<T>:

  • There’s some wiggle room in the signature of the conventionally named Apply() and Create() methods. These method signatures can be asynchronous. They can also take in parameters for IQuerySession to load other information from Marten as they work or the IEvent / IEvent<T> data for the event to get access to metadata about the event or the event’s stream
  • The Apply() methods happily support immutable aggregate types. You’d simple return the new aggregate document created in an Apply() method or Task<T> where the T is the aggregate document type. That might not be very efficient because of the extra object allocations, but hey, some folks really want that.
  • I didn’t show it up above, but if you dislike the conventional “magic” I used above, that’s okay, there are methods you can use to define how to update or create the aggregate document based on specific event types through inline Lambda functions.
  • You can also conditionally delete the aggregated document if the logical workflow represented by an event stream is completed based on user defined logic.
  • All of the conventional method signatures as well as the inline Lambda usages for defining how an aggregate would be updated can also accept interface or abstract class types as well. This was a user request to enable folks to use event types from external assemblies in extensibility scenarios.

Aggregate a Projected Document Across Event Streams

I thought you said to never cross the streams! – had to be said:)

This is admittedly contrived, but let’s say we want a projected view from our raw trip events that tells us for each day the system is active:

  • How many trips started?
  • How many trips ended?
  • How many miles did all the active trips drive in each direction?

And by “day” in this system, I just mean the day number since the system went online. That aggregate might look like this:

public class Day
{
    public int Id { get; set; }

    // how many trips started on this day?
    public int Started { get; set; }

    // how many trips ended on this day?
    public int Ended { get; set; }

    // how many miles did the active trips
    // drive in which direction on this day?
    public double North { get; set; }
    public double East { get; set; }
    public double West { get; set; }
    public double South { get; set; }
}

So what we want to do is to group the TripStarted, TripEnded, and Travel events by the day, and create an aggregated Day document to reflect all the events that happened on the same day. The first step is to tell Marten how to know what events are going to be associated with a specific day, and the easiest way in Marten V4 is to have the events implement a common interface like this one:

public interface IDayEvent
{
    int Day { get; }
}

And then have the relevant events implement that interface like the TripStarted event:

public class TripStarted : IDayEvent
{
    public int Day { get; set; }
}

Now, to make the Marten projection for the Day document type, we’ll use the new V4 version of ViewProjection as a subclass like so:

// The 2nd generic parameter is the identity type of
// the document type. In this case the Day document
// is identified by an integer representing the number
// of days since the system went online
public class DayProjection: ViewProjection<Day, int>
{
    public DayProjection()
    {
        // Tell the projection how to group the events
        // by Day document
        Identity<IDayEvent>(x => x.Day);
        
        // This just lets the projection work independently
        // on each Movement child of the Travel event
        // as if it were its own event
        FanOut<Travel, Movement>(x => x.Movements);
        
        ProjectionName = "Day";
    }

    public void Apply(Day day, TripStarted e) => day.Started++;
    public void Apply(Day day, TripEnded e) => day.Ended++;

    public void Apply(Day day, Movement e)
    {
        switch (e.Direction)
        {
            case Direction.East:
                day.East += e.Distance;
                break;
            case Direction.North:
                day.North += e.Distance;
                break;
            case Direction.South:
                day.South += e.Distance;
                break;
            case Direction.West:
                day.West += e.Distance;
                break;

            default:
                throw new ArgumentOutOfRangeException();
        }
    }
}

The new ViewProjection is a subclass of AggregateProjection, and therefore has all the same capabilities as its parent type.

You’d most likely need to run a projection that crosses streams in the asynchronous projection lifecycle where the projection is executed in a background process, but I’m leaving that to another blog post.

Summary & What’s Next?

I focused strictly on projections that focused on aggregations, but there are plenty of use cases that don’t fit into that mold. In subsequent posts I’ll explore the other options for projections in Marten V4. The asynchronous projection support also got a full rewrite in Marten V4, so I’ll share plenty more about that.

In other posts, I’ll discuss some other improvements in the event capture process for reliability and concurrency issues. Hopefully for the next alpha release, we’ll be able to utilize native Postgresql database sharding to allow for far more scaleability in Marten’s event sourcing support.

2 thoughts on “Event Sourcing with Marten V4: Aggregated Projections

  1. You said to run a projection that crosses streams, the asynchronous projection lifecycle is needed. Why is this?

  2. Great article.

    I have been following Alexey Zimarev’s implementations of Event Sourcing using EventStore, but the implementation examples can be a bit too framework agnostic (and frankly, too advanced for me at times) having learned to program primarily with .NET.

    Having the ability to take care of event sourcing and projections with one library (Marten) simplifies so much of this.

    I feel like A. Zimarev’s new library Eventuous is taking a lot of the EventSourcing code he uses in his example and packaging them up in to a single available library, but until I’m able to take on that complexity – Marten is going to be perfect.

Leave a comment