Marten 7 makes “Write Model” Projections Super

Marten 7.0 was released this week with a bevy of improvements and some important new features. One important area of concentration was a series of changes, sponsored by a JasperFx Software client, to improve Marten’s scalability and to achieve more seamless deployments that involve changes to event store projections.

To show off this new work, I’d like to review the CQRS command handler from my earlier Building a Critter Stack Application series. In that series, we had a command handler that attempted to apply changes to an existing event stream representing an Incident within our system:

public static async Task Handle(
    CategoriseIncident command, 
    IDocumentSession session, 
    CancellationToken cancellationToken)
{
    // Find the existing state of the referenced Incident
    // but also set Marten up for optimistic version checking on
    // the incident upon the call to SaveChangesAsync()
    var stream = await session
        .Events
        .FetchForWriting<IncidentDetails>(command.Id, cancellationToken);
 
    // Don't worry, we're going to clean this up later
    if (stream.Aggregate == null)
    {
        throw new ArgumentOutOfRangeException(nameof(command), "Unknown incident id " + command.Id);
    }
     
    // We need to validate whether this command actually 
    // should do anything
    if (stream.Aggregate.Category != command.Category)
    {
        var categorised = new IncidentCategorised
        {
            Category = command.Category,
            UserId = SystemId
        };
 
        stream.AppendOne(categorised);
         
        // This call may throw a ConcurrencyException!
        await session.SaveChangesAsync(cancellationToken);
    }
}
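The comment on SaveChangesAsync() warns that it may throw a ConcurrencyException. To show the idea behind optimistic version checking, here is a toy in-memory C# model. It is not Marten’s mechanism (Marten relies on the database for this); ToyEventStore, ToyConcurrencyException, and the rule “stream version = number of events in the stream” are all hypothetical simplifications.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception type for this toy model only
public class ToyConcurrencyException : Exception
{
    public ToyConcurrencyException(string message) : base(message) { }
}

// Toy, single-threaded event store illustrating optimistic version checks
public class ToyEventStore
{
    private readonly Dictionary<Guid, List<object>> _streams = new();

    // In this toy, a stream's version is simply its event count (0 if it doesn't exist)
    public long CurrentVersion(Guid streamId) =>
        _streams.TryGetValue(streamId, out var events) ? events.Count : 0;

    // Append only if nobody else appended since the caller read expectedVersion
    public long Append(Guid streamId, long expectedVersion, params object[] newEvents)
    {
        var actual = CurrentVersion(streamId);
        if (actual != expectedVersion)
        {
            throw new ToyConcurrencyException(
                $"Stream {streamId} expected at version {expectedVersion} but was at {actual}");
        }

        if (!_streams.TryGetValue(streamId, out var events))
        {
            events = new List<object>();
            _streams[streamId] = events;
        }

        events.AddRange(newEvents);
        return events.Count;
    }
}
```

If two handlers both read the stream at version 1, whichever appends first wins and the second gets the exception, at which point it can re-fetch and retry. A real store must make the check-and-append atomic, which this toy does not attempt.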

Mostly, I want to call your attention to this Marten API:

    var stream = await session
        .Events
        .FetchForWriting<IncidentDetails>(command.Id, cancellationToken);

The FetchForWriting() API is an important building block for using Marten inside a CQRS command handler, where you need to quickly fetch the “write model” for an event stream: just enough of that stream’s state to make decisions about next steps.

This API completely hides how Marten derives that aggregated state, and it also sets the command handler up to easily opt into Marten’s optimistic concurrency features. Before 7.0, this API could either do “Live” aggregation, where Marten fetches the raw events and applies them in memory to create the aggregated state, or use “Inline” aggregation, where Marten always updates the projected aggregate at the time that new events are captured.

Both of those strategies give you the all-important strong consistency between the raw event data and the aggregated state, but the “Live” mode can be expensive with bigger or longer event streams, and the “Inline” mode adds some expense to every single Marten transaction that captures new events.

With an important assist from the new numeric revisioning strategy, Marten 7.0 now lets users use Marten’s asynchronous projection feature as a potentially much more efficient way to achieve strong consistency between the raw event data and the aggregated state served up by the FetchForWriting() API.

In the sample incident-tracking help desk application, we have this projection for calculating the IncidentDetails model:

public class IncidentDetailsProjection: SingleStreamProjection<IncidentDetails>
{
    public static IncidentDetails Create(IEvent<IncidentLogged> logged) =>
        new(logged.StreamId, logged.Data.CustomerId, IncidentStatus.Pending, Array.Empty<IncidentNote>());

    public IncidentDetails Apply(IncidentCategorised categorised, IncidentDetails current) =>
        current with { Category = categorised.Category };

    public IncidentDetails Apply(IncidentPrioritised prioritised, IncidentDetails current) =>
        current with { Priority = prioritised.Priority };

    public IncidentDetails Apply(AgentAssignedToIncident assigned, IncidentDetails current) =>
        current with { AgentId = assigned.AgentId };

    public IncidentDetails Apply(IncidentResolved resolved, IncidentDetails current) =>
        current with { Status = IncidentStatus.Resolved };

    public IncidentDetails Apply(ResolutionAcknowledgedByCustomer acknowledged, IncidentDetails current) =>
        current with { Status = IncidentStatus.ResolutionAcknowledgedByCustomer };

    public IncidentDetails Apply(IncidentClosed closed, IncidentDetails current) =>
        current with { Status = IncidentStatus.Closed };
}

Now we have three options for registering the projection, all of which are supported by the FetchForWriting() API:

builder.Services.AddMarten(opts =>
{
    // Do note that the three options below are all mutually
    // exclusive
    
    // Pre 7.0, our options were to either run the projection inline like this:
    opts.Projections.Add<IncidentDetailsProjection>(ProjectionLifecycle.Inline);
    
    // or rely on in memory calculation like:
    opts.Projections.Add<IncidentDetailsProjection>(ProjectionLifecycle.Live);
    
    // NOW though, with 7.0, we have this option:
    opts.Projections.Add<IncidentDetailsProjection>(ProjectionLifecycle.Async);

    // other configuration
});

Running the “write model” aggregation of the IncidentDetails projection asynchronously gives us some immediate benefits:

  • There’s less work going on at the time of writing new event data to an incident stream if we don’t also have to do the work of updating the IncidentDetails model. That may make our system more responsive to users or outside clients of our application’s services
  • When FetchForWriting() is called, Marten starts with whatever state of the IncidentDetails aggregate document is currently persisted in the database, then applies any newer events on top of it to produce a model that is strongly consistent with the raw events
  • The asynchronous daemon does some significant batching that can be more efficient overall than the “Inline” projection calculation. The difference could be considerable for systems that need to capture events against multiple streams in the same transaction (that came up very recently from a user in our Discord room)
  • As I’m about to show next, this “Async” aggregation lets Marten support zero-downtime deployments even when our IncidentDetails projection changes. It even enables blue/green deployments, where the newer and older versions of IncidentDetails coexist at runtime on the same database before the older version is completely retired!
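The idea of “start from the persisted aggregate, then apply only newer events” can be shown with a toy C# model that compares it to a full “Live” replay. This is a hypothetical sketch, not Marten’s FetchForWriting() internals: Categorised, Prioritised, Details, Snapshot, and ToyAggregation are simplified stand-ins, and a snapshot’s Version is assumed to mean “the number of stream events it already reflects.”

```csharp
using System.Collections.Generic;
using System.Linq;

// Simplified, hypothetical stand-ins for the incident events and model
public record Categorised(string Category);
public record Prioritised(string Priority);
public record Details(string? Category, string? Priority);

// A persisted aggregate plus how many stream events it already reflects
public record Snapshot(Details State, long Version);

public static class ToyAggregation
{
    static readonly Details Empty = new(null, null);

    static Details Apply(Details current, object @event) => @event switch
    {
        Categorised c => current with { Category = c.Category },
        Prioritised p => current with { Priority = p.Priority },
        _ => current // other event types are ignored by this toy projection
    };

    // "Live": replay every event in the stream, so cost grows with stream length
    public static Details Live(IReadOnlyList<object> stream) =>
        stream.Aggregate(Empty, (state, e) => Apply(state, e));

    // Async-style read: start from a possibly stale snapshot and apply only
    // the events recorded after it, which yields the same state as Live
    public static Details FromSnapshot(Snapshot? snapshot, IReadOnlyList<object> stream)
    {
        var state = snapshot?.State ?? Empty;
        var alreadyApplied = (int)(snapshot?.Version ?? 0);
        return stream.Skip(alreadyApplied).Aggregate(state, (s, e) => Apply(s, e));
    }
}
```

The point of the comparison is that both paths give the same answer, but the snapshot path only touches the tail of the stream; with no snapshot at all, it degrades gracefully into a full replay.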

I’d also point out that, with asynchronous projections, FetchForWriting() batches the necessary database queries into a single round trip. I’d argue that this reflects our attention to detail with Marten, and that Marten represents some serious-minded engineering you can feel good about as a technical dependency for important systems.

Over the life of our system, someone higher up decides we need to track whether issues are being duplicated. Maybe we add a new event like so:

public record IncidentDuplicated(Guid OtherId);

And update our projection to use this new event like so:

    public IncidentDetails Apply(IncidentDuplicated duplicated, IncidentDetails current) =>
        current with { DuplicateIds = current.DuplicateIds.Concat(new[] { duplicated.OtherId }).ToArray() };

To make this sample more realistic, let’s say that there were also some breaking changes to the IncidentDetails projection that made it incompatible with the old projected model.

Now, we’d like to use this new version of the projection immediately, but wait! The existing system already has the old projection model persisted in database tables! With the “Inline” lifecycle, you would have to shut down the system and rebuild the projection from scratch to replace the persisted data.

Now though, with Marten 7.0, we can instead make one more change to our projection and add this line to mark it as version 2:

    public IncidentDetailsProjection()
    {
        ProjectionVersion = 2;
    }

And finally, we’ll deploy our new version of the application — but because we’re being fancy, we’ll deploy in a blue/green way to only some of the application nodes while the older version of the application runs on other nodes.

When the application starts up with the new version of IncidentDetails, Marten will (this can vary based on your configuration, but let’s assume defaults for now):

  • Treat V2 of IncidentDetailsProjection as completely separate from the original version. This means that Marten will create parallel tables for the V2 IncidentDetails document and, in the background, start calculating IncidentDetailsProjection V2 from the beginning of the event store (this is worth a much longer conversation later). Marten pulls this off by adding an “_#” suffix to the tables and functions for the document, where “#” is the version number, whenever the version is greater than 1.
  • When FetchForWriting() is called on an individual stream, it may effectively be doing a “Live” aggregation until the projection catches up in the background with the “high water mark” of the event store, but at least your IncidentDetails “write model” requires zero downtime.
  • Marten’s async daemon subsystem got some spiffy improvements for V7. When you use the HotCold mode, the daemon now ensures that every projection for every database runs on exactly one node at a time, so the newly revisioned projection will be started and actively run on one of the nodes running the latest revision of the system.
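The “_#” naming rule from the first bullet is simple enough to express as a tiny helper. This is a hypothetical illustration, not a Marten API, and the base name mt_doc_incidentdetails used in the usage note is an assumption about the default table name for IncidentDetails.

```csharp
using System;

public static class ToyProjectionNaming
{
    // Hypothetical helper: version 1 keeps the base name, later versions
    // get an "_<version>" suffix so old and new tables can coexist
    public static string VersionedName(string baseName, int version)
    {
        if (version < 1)
            throw new ArgumentOutOfRangeException(nameof(version), "Projection versions start at 1");

        return version > 1 ? $"{baseName}_{version}" : baseName;
    }
}
```

Under that assumption, nodes still running the old code keep using mt_doc_incidentdetails while upgraded nodes build mt_doc_incidentdetails_2, so the two versions never write to the same table during a blue/green rollout.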

On the nodes still running the older version of the application, Marten can happily skip the “unknown” IncidentDuplicated events when its running asynchronous projections encounter them, rather than failing with repetitive exceptions.
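To see why skipping matters, consider an older node whose projection has no handler (or even a CLR type) for IncidentDuplicated. The toy C# sketch below uses hypothetical StoredEvent and ToyV1Projection types and made-up snake_case type aliases; it is not how Marten implements this internally. It records and skips unknown event types while still advancing its progress marker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape of a stored event row: global sequence, type alias, raw JSON
public record StoredEvent(long Sequence, string TypeAlias, string Json);

// Toy "version 1" projection deployed before IncidentDuplicated existed
public class ToyV1Projection
{
    private readonly Dictionary<string, Action<string>> _handlers;

    public string Status { get; private set; } = "pending";
    public long LastProcessed { get; private set; }
    public List<string> Skipped { get; } = new();

    public ToyV1Projection()
    {
        // Only the event types this older version knows how to handle
        _handlers = new Dictionary<string, Action<string>>
        {
            ["incident_resolved"] = _ => Status = "resolved",
            ["incident_closed"] = _ => Status = "closed",
        };
    }

    public void Process(IEnumerable<StoredEvent> events)
    {
        foreach (var e in events)
        {
            if (_handlers.TryGetValue(e.TypeAlias, out var handle))
            {
                handle(e.Json);
            }
            else
            {
                // Unknown to this version: note it and move on instead of throwing
                Skipped.Add(e.TypeAlias);
            }

            // Progress advances either way, so one unknown event can't stall the projection
            LastProcessed = e.Sequence;
        }
    }
}
```

If the lookup threw instead, the projection would fail on the same event every time it restarted, which is exactly the “repetitive exceptions” failure mode described above.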

Summary

Hey, this was a humongous set of improvements that I threw out pretty quickly. All told, I’d summarize this by saying that Marten V7:

  1. Provides potentially better system throughput and scalability
  2. Allows for zero downtime deployments when a “write model” projection is revisioned (we’ve still got some work to do for multi-stream projections)
  3. Even manages to support blue/green deployments for much more flexibility in promoting changes to your systems that use Marten
