Event Enrichment in Marten Projections

So here’s a common scenario when building a system using Event Sourcing with Marten:

  1. Some of the data in your system is just reference data stored as plain old Marten documents: user data (which I’ll use in just a bit), company data, or some other kind of static reference data that doesn’t justify the usage of Event Sourcing. Or maybe you have some data that is event sourced, but it changes so rarely that you can essentially treat the projected documents as just documents.
  2. You have workflows modeled with event sourcing, and you want some of the projections from those events to also include information from the reference data documents.

As an example, let’s say that your application has some reference information about system users saved in this document type (from the Marten testing suite):

public class User
{
    public User()
    {
        Id = Guid.NewGuid();
    }

    public List<Friend> Friends { get; set; }

    public string[] Roles { get; set; }
    public Guid Id { get; set; }
    public string UserName { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public string FullName => $"{FirstName} {LastName}";
}

And you also have events for some kind of UserTask aggregate that manages the workflow of some kind of work tracking. You might have some events like this:

public record TaskLogged(string Name);
public record TaskStarted;
public record TaskFinished;

public class UserAssigned
{
    public Guid UserId { get; set; }

    // You don't *have* to do this with a mutable
    // property, but it is *an* easy way to pull this off
    public User? User { get; set; }
}

In a “query model” view of the event data, you’d love to be able to put human readable User information, like the user’s full name, right into the projected document:

public class UserTask
{
    public Guid Id { get; set; }
    public bool HasStarted { get; set; }
    public bool HasCompleted { get; set; }
    public Guid? UserId { get; set; }

    // This would be sourced from the User
    // documents
    public string UserFullName { get; set; }
}

In the projection for UserTask, you can always reach out to Marten in an ad hoc way to grab the right User document, as in this possible code from the projection definition for UserTask:

    // We're just gonna go look up the user we need right here and now!
    public async Task Apply(UserAssigned assigned, IQuerySession session, UserTask snapshot)
    {
        var user = await session.LoadAsync<User>(assigned.UserId);
        snapshot.UserFullName = user.FullName;
    }

The ability to just pull in IQuerySession and go look up whatever data you need as you need it is certainly powerful, but hold on a bit, because what if:

  1. You’re running the projection for UserTask asynchronously using Marten’s async daemon, where it updates potentially hundreds of UserTask documents at the same time?
  2. You expect the UserAssigned events to be quite common, so there’s a lot of potential User lookups to process the projection?
  3. You are quite aware that the code above could easily turn into an N+1 Query Problem that won’t be helpful at all for your system’s performance. And if you weren’t aware of that before, please be so now!

Instead of the N+1 Query Problem you could easily get from doing the User lookup one single event at a time, what if we were able to batch up the calls to look up all the necessary User information for a batch of UserTask data being updated by the async daemon?
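To make the difference concrete, here is a minimal, self-contained sketch that does not use Marten at all. FakeUserStore, LoadOne, LoadMany, and LookupDemo are hypothetical names made up for this illustration, and the RoundTrips counter just stands in for trips to the database:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a document store. This is NOT Marten;
// it only counts how many "round trips" a lookup strategy would make.
public class FakeUserStore
{
    private readonly Dictionary<Guid, string> _fullNames;

    public FakeUserStore(Dictionary<Guid, string> fullNames) => _fullNames = fullNames;

    public int RoundTrips { get; private set; }

    // One round trip per call, like loading a single document by id
    public string? LoadOne(Guid id)
    {
        RoundTrips++;
        return _fullNames.TryGetValue(id, out var name) ? name : null;
    }

    // One round trip for the whole set of ids, like a batched load
    public Dictionary<Guid, string> LoadMany(IEnumerable<Guid> ids)
    {
        RoundTrips++;
        return ids.Distinct()
            .Where(_fullNames.ContainsKey)
            .ToDictionary(id => id, id => _fullNames[id]);
    }
}

public static class LookupDemo
{
    // Returns (round trips for per-event lookups, round trips for one batched lookup)
    public static (int PerEvent, int Batched) Run(int users, int assignments)
    {
        var ids = Enumerable.Range(0, users).Select(_ => Guid.NewGuid()).ToArray();
        var names = ids.ToDictionary(id => id, id => $"User {id:N}");

        // Simulated UserAssigned events, cycling through the known users
        var assignedUserIds = Enumerable.Range(0, assignments)
            .Select(i => ids[i % users])
            .ToArray();

        // Naive approach: one lookup per event
        var naive = new FakeUserStore(names);
        foreach (var id in assignedUserIds) naive.LoadOne(id);

        // Batched approach: one lookup up front, then in-memory dictionary hits
        var batched = new FakeUserStore(names);
        var lookup = batched.LoadMany(assignedUserIds);
        foreach (var id in assignedUserIds) lookup.TryGetValue(id, out _);

        return (naive.RoundTrips, batched.RoundTrips);
    }
}
```

Calling LookupDemo.Run(5, 200) returns (200, 1): two hundred round trips for the per-event approach against a single one for the batched approach, no matter how many events are in the batch.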

Enter Marten 8.11 (hopefully by the time you read this!) and our newly introduced hook for “event enrichment” and you can now do exactly that as a way of wringing more performance and scalability out of your Marten usage! Let’s build a single stream projection for the UserTask aggregate type shown up above that batches the User lookup:

public class UserTaskProjection: SingleStreamProjection<UserTask, Guid>
{
    // This is where you have a hook to "enrich" event data *after* slicing,
    // but before processing
    public override async Task EnrichEventsAsync(
        SliceGroup<UserTask, Guid> group, 
        IQuerySession querySession, 
        CancellationToken cancellation)
    {
        // First, let's find all the events that need a little bit of data lookup
        var assigned = group
            .Slices
            .SelectMany(x => x.Events().OfType<IEvent<UserAssigned>>())
            .ToArray();

        // Don't bother doing anything else if there are no matching events
        if (!assigned.Any()) return;

        var userIds = assigned.Select(x => x.Data.UserId)
            // Hey, watch this. Marten is going to helpfully sort this out for you anyway
            // but we're still going to make it a touch easier on PostgreSQL by
            // weeding out multiple ids
            .Distinct().ToArray();
        var users = await querySession.LoadManyAsync<User>(cancellation, userIds);

        // Just a convenience
        var lookups = users.ToDictionary(x => x.Id);
        foreach (var e in assigned)
        {
            if (lookups.TryGetValue(e.Data.UserId, out var user))
            {
                e.Data.User = user;
            }
        }
    }

    // This is the Marten 8 way of just writing explicit code in your projection
    public override UserTask Evolve(UserTask snapshot, Guid id, IEvent e)
    {
        snapshot ??= new UserTask { Id = id };
        switch (e.Data)
        {
            case UserAssigned assigned:
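                // "assigned" can't be null inside this case. It's the User
                // property that stays null if enrichment found no match
                snapshot.UserId = assigned.UserId;
                snapshot.UserFullName = assigned.User?.FullName;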
                break;

            case TaskStarted:
                snapshot.HasStarted = true;
                break;

            case TaskFinished:
                snapshot.HasCompleted = true;
                break;
        }

        return snapshot;
    }
}

Please focus on the EnrichEventsAsync() method above. That’s a new hook in Marten 8.11 that lets you define a step in asynchronous projection running to potentially do batched data lookups immediately after Marten has “sliced” and grouped a batch of events by each aggregate identity that is about to be updated, but before the actual updates are made to any of the UserTask snapshot documents.

In the code above, we’re looking for all the unique user ids that are referenced by any UserAssigned events in this batch of events, and making one single call to Marten to fetch the matching User documents. Lastly, we’re looping over the UserAssigned events and actually “enriching” them by setting the User property on each one with the data we just looked up.
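Since the hook is a step in asynchronous projection running, the projection has to be registered to run in the async daemon. Here’s a rough sketch of what that bootstrapping might look like. The connection string name is hypothetical, and the namespaces for ProjectionLifecycle and DaemonMode are my assumption about where those types live in Marten 8, so check them against your installed version:

```csharp
using JasperFx.Events.Daemon;       // DaemonMode (assumed Marten 8 namespace)
using JasperFx.Events.Projections;  // ProjectionLifecycle (assumed Marten 8 namespace)
using Marten;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddMarten(opts =>
{
    // "Marten" is a hypothetical connection string name
    opts.Connection(builder.Configuration.GetConnectionString("Marten")!);

    // Run the projection asynchronously so events are processed in batches
    // by the async daemon instead of inline with each append
    opts.Projections.Add(new UserTaskProjection(), ProjectionLifecycle.Async);
})
// Host the async daemon inside this process
.AddAsyncDaemon(DaemonMode.HotCold);

await builder.Build().RunAsync();
```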

A couple other things:

  • It might not be terribly obvious, but you could still use immutable types for your event data and “just” quietly swap out single event objects within the EventSlice groupings as well.
  • You can also do “event enrichment” in any kind of custom grouping within MultiStreamProjection types without this new hook method, but I felt like we needed this to have an easy recipe at least for SingleStreamProjection classes. You might find this hook easier to use than doing database lookups in custom grouping anyway.

Summary

That EnrichEventsAsync() code is admittedly some busy code that really isn’t the most obvious thing in the world to do, but when you need better throughput, the ability to batch up queries to the database can be a hugely effective way to improve your system’s performance and we think this will be a very worthy addition to the Marten projection model. I cannot possibly stress enough how insidious N+1 Query issues can be in enterprise systems.

This work was more or less spawned by conversations with a JasperFx Software client and some of their upcoming development needs. Just saying, if you want any help being more successful with any part of the Critter Stack, drop us a line at sales@jasperfx.net.
