CQRS Command Handlers with Marten

Hey, did you know that JasperFx Software offers both consulting services and support plans for the “Critter Stack” tools? One of the common areas where we’ve helped our clients is using Marten or Wolverine in systems with significant concurrency concerns. As I write this, I’m working with a JasperFx client to implement the FetchForWriting API shown in this post as a way of improving their system’s resiliency to concurrency problems.

You’ve decided to use event sourcing as your persistence strategy, so your persisted state of record is the actual business events, segregated into streams that each represent changes in state to some kind of logical business entity (an invoice? an order? an incident? a project?). Of course there has to be some way of resolving or “projecting” the raw events into a usable view of the system state, but we’ll get to that.

You’ve also decided to organize your system around a CQRS (Command Query Responsibility Segregation) architectural style. With a CQRS approach, the backend code is mostly organized around the “verbs” of your system: the “command” messages that are handled to capture changes to the system (events, in our case), and the “query” endpoints or APIs that strictly serve up information about your system. Commands could just as well be HTTP services; I’m not implying that there automatically has to be any asynchronous messaging.

While it’s certainly possible to do either Event Sourcing or CQRS without the other, the two go together, as Forrest Gump would say, like peas and carrots. Marten is certainly valuable as part of a CQRS with Event Sourcing approach within a range of .NET messaging or web frameworks, but there is quite a bit of synergy between Marten and its “Critter Stack” stable mate Wolverine (the Wolverine documentation covers the integration in detail).

And lastly of course, you’ve quite logically decided to use Marten as the persistence mechanism for the events. Marten is also a strong fit because it comes with some important functionality that we’ll need for CQRS command handlers:

  • Marten’s event projection support can give us a representation of the current state of the raw event data in a usable way that we’ll need within our command handlers to both validate requested actions and to “decide” what additional events should be persisted to our system
  • The FetchForWriting API in Marten will not only give us access to the projected event data, but it provides an easy mechanism for both optimistic and pessimistic concurrency protections in our system
  • Marten offers a few different projection lifecycle options that can be valuable for performance optimization under differing system needs

For a sample problem domain, I’ll draw on a successful effort I was part of during the worst of the pandemic to stand up a new “telehealth” web portal using event sourcing. One of the concepts we needed to track in that system was the activity of a health care provider (nurse, doctor, nurse practitioner), with events for when they were available and what they were doing at any particular time during the day, for later decision making:

public record ProviderAssigned(Guid AppointmentId);

public record ProviderJoined(Guid BoardId, Guid ProviderId);

public record ProviderReady;

public record ProviderPaused;

public record ProviderSignedOff;

// "Charting" is basically just whatever
// paperwork they need to do after
// an appointment, and it was important
// for us to track that time as part
// of their availability and future
// planning
public record ChartingFinished;

public record ChartingStarted;

public enum ProviderStatus
{
    Ready,
    Assigned,
    Charting,
    Paused
}

At several points, though, you do need to know the actual state of the provider’s current shift to make decisions within the command handlers, so we had a “write” model something like this:

// I'm sticking the Marten "projection" logic for updating
// state from the events directly into this "write" model,
// but you could separate that into a different class if you
// prefer
public class ProviderShift
{
    public Guid Id { get; set; }

    // This is important, this would be set by Marten to the 
    // current event number or revision of the ProviderShift
    // aggregate. This is going to be important later for
    // concurrency protections
    public int Version { get; set; }
    public Guid BoardId { get; private set; }
    public Guid ProviderId { get; init; }
    public ProviderStatus Status { get; private set; }
    public string Name { get; init; }
    public Guid? AppointmentId { get; set; }
    
    // The Create & Apply methods are conventional targets
    // for Marten's "projection" capabilities
    // But don't worry, you would never *have* to take a reference
    // to Marten itself like I did below just out of laziness
    public static ProviderShift Create(
        ProviderJoined joined)
    {
        return new ProviderShift
        {
            Status = ProviderStatus.Ready,
            ProviderId = joined.ProviderId,
            BoardId = joined.BoardId
        };
    }

    public void Apply(ProviderReady ready)
    {
        AppointmentId = null;
        Status = ProviderStatus.Ready;
    }

    public void Apply(ProviderAssigned assigned)
    {
        Status = ProviderStatus.Assigned;
        AppointmentId = assigned.AppointmentId;
    }

    public void Apply(ProviderPaused paused)
    {
        Status = ProviderStatus.Paused;
        AppointmentId = null;
    }

    // This is kind of a catch all for any paperwork the
    // provider has to do after an appointment has ended
    // for the just concluded appointment
    public void Apply(ChartingStarted charting)
    {
        Status = ProviderStatus.Charting;
    }
}

The whole purpose of the ProviderShift type above is to be a “write” model that contains enough information for the command handlers to “decide” what should be done, as opposed to a “read” model that might carry richer information, like the provider’s name, that is more suitable for use within a user interface. “Write” or “read” in this case is just a role within the system: at some times it’s valuable to have separate models for different consumers of the information, and at other times you can happily get by with a single model.
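To make the Create/Apply conventions a bit more concrete, here is a minimal, Marten-free sketch of what building the write model from raw events amounts to conceptually: fold the stream’s events, oldest first, into a ProviderShift. The `LiveAggregator` class is a hypothetical name for illustration only; Marten discovers and calls the Create/Apply methods through its own conventions and code generation, not through anything like this.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public record ProviderJoined(Guid BoardId, Guid ProviderId);
public record ProviderAssigned(Guid AppointmentId);
public record ChartingStarted;

public enum ProviderStatus { Ready, Assigned, Charting, Paused }

// Trimmed-down copy of the ProviderShift write model from above
public class ProviderShift
{
    public int Version { get; set; }
    public Guid ProviderId { get; init; }
    public ProviderStatus Status { get; private set; }
    public Guid? AppointmentId { get; private set; }

    public static ProviderShift Create(ProviderJoined joined) =>
        new() { ProviderId = joined.ProviderId, Status = ProviderStatus.Ready };

    public void Apply(ProviderAssigned assigned)
    {
        Status = ProviderStatus.Assigned;
        AppointmentId = assigned.AppointmentId;
    }

    public void Apply(ChartingStarted charting) => Status = ProviderStatus.Charting;
}

// Hypothetical helper, NOT a Marten API: folds raw events in stream order
public static class LiveAggregator
{
    public static ProviderShift? Aggregate(IEnumerable<object> events)
    {
        ProviderShift? shift = null;
        foreach (var @event in events)
        {
            switch (@event)
            {
                case ProviderJoined joined:
                    shift = ProviderShift.Create(joined);
                    break;
                case ProviderAssigned assigned:
                    shift?.Apply(assigned);
                    break;
                case ChartingStarted charting:
                    shift?.Apply(charting);
                    break;
            }

            // One increment per event, mirroring a stream version
            // where the first event is version 1
            if (shift is not null) shift.Version++;
        }

        return shift;
    }
}
```

Feeding it ProviderJoined, ProviderAssigned, and ChartingStarted yields a shift in the Charting status at version 3, which is exactly the kind of state a command handler needs before deciding anything.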

Alright, so let’s finally look at a very simple command handler related to providers that tries to mark the provider as being finished charting:

using Marten;
using Microsoft.AspNetCore.Mvc;

// Since we're focusing on Marten, I'm using an MVC Core
// controller just because it's commonly used and understood
public class CompleteChartingController : ControllerBase
{
    [HttpPost("/provider/charting/complete")]
    public async Task Post(
        [FromBody] CompleteCharting charting,
        [FromServices] IDocumentSession session)
    {
        // We're looking up the current state of the ProviderShift aggregate
        // for the designated provider
        var stream = await session
            .Events
            .FetchForWriting<ProviderShift>(charting.ProviderShiftId, HttpContext.RequestAborted);

        // The current state, which will be null if
        // there is no such stream yet
        var shift = stream.Aggregate;
        
        if (shift?.Status != ProviderStatus.Charting)
        {
            // Obviously do something smarter in your app, but you 
            // get the point
            throw new Exception("The shift is not currently charting");
        }
        
        // Append a single new event just to say 
        // "charting is finished". I'm relying on 
        // Marten's automatic metadata to capture
        // the timestamp of this event for me
        stream.AppendOne(new ChartingFinished());

        // Commit the transaction
        await session.SaveChangesAsync(HttpContext.RequestAborted);
    }
}

I’m using the Marten FetchForWriting() API to get at the current state of the event stream for the designated provider shift (a provider’s activity during a single day). I’m also using this API to capture a new event marking the provider as being finished with charting. FetchForWriting() is doing two important things for us:

  1. Executes or finds the projected data for ProviderShift from the raw events. More on this a little later
  2. Provides a little bit of optimistic concurrency protection for our provider shift stream

Taking concurrency first, the command handler above will “remember” the current version of the ProviderShift stream at the point that FetchForWriting() is called. Upon SaveChangesAsync(), Marten will reject the transaction and throw a ConcurrencyException if somehow, some way, another request came through and completed against that same ProviderShift stream between the calls to FetchForWriting() and SaveChangesAsync().
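The mechanics of that save-time check can be sketched without a database. The following is a hypothetical in-memory illustration, not Marten’s implementation: `InMemoryEventStore`, `PendingStream`, and `StreamConcurrencyException` are invented names, and the real check happens inside PostgreSQL against Marten’s stream version.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical exception type; Marten throws its own ConcurrencyException
public class StreamConcurrencyException : Exception
{
    public StreamConcurrencyException(Guid id, long expected, long actual)
        : base($"Stream {id} expected at version {expected}, but was at {actual}") { }
}

// Hypothetical in-memory store illustrating the idea, NOT Marten's code
public class InMemoryEventStore
{
    private readonly Dictionary<Guid, List<object>> _streams = new();

    public long CurrentVersion(Guid id) =>
        _streams.TryGetValue(id, out var events) ? events.Count : 0;

    // Like FetchForWriting(): remember the version observed at fetch time
    public PendingStream Fetch(Guid id) => new(this, id, CurrentVersion(id));

    internal void Commit(Guid id, long versionAtFetch, List<object> newEvents)
    {
        var actual = CurrentVersion(id);

        // Reject the whole batch if anyone else appended since our fetch
        if (actual != versionAtFetch)
            throw new StreamConcurrencyException(id, versionAtFetch, actual);

        if (!_streams.TryGetValue(id, out var events))
        {
            events = new List<object>();
            _streams[id] = events;
        }
        events.AddRange(newEvents);
    }
}

public class PendingStream
{
    private readonly InMemoryEventStore _store;
    private readonly List<object> _pending = new();

    public PendingStream(InMemoryEventStore store, Guid id, long versionAtFetch)
    {
        _store = store;
        Id = id;
        VersionAtFetch = versionAtFetch;
    }

    public Guid Id { get; }
    public long VersionAtFetch { get; }

    public void AppendOne(object @event) => _pending.Add(@event);

    // Like SaveChangesAsync(): the version check happens at commit time
    public void Save()
    {
        _store.Commit(Id, VersionAtFetch, _pending);
        _pending.Clear();
    }
}
```

If two requests both fetch the same stream at version 0 and both append an event, the first Save() succeeds and the second throws, leaving the stream at version 1 rather than silently accepting a decision made against stale state.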

That level of concurrency protection is baked in, but we can do a little bit better. Remember that the ProviderShift has this property:

    // This is important, this would be set by Marten to the 
    // current event number or revision of the ProviderShift
    // aggregate. This is going to be important later for
    // concurrency protections
    public int Version { get; set; }

The projection capability of Marten makes it easy for us to “know” and track the current version of the ProviderShift stream so that we can feed it back to command handlers later. Here’s the full definition of the CompleteCharting command:

public record CompleteCharting(
    Guid ProviderShiftId,
    
    // This version is meant to mean "I was issued
    // assuming that the ProviderShift is currently
    // at this version in the server, and if the version
    // has shifted since, then this command is now invalid"
    int Version
);

Let’s tighten up the optimistic concurrency protection by passing the command’s version right into this overload of FetchForWriting(), so that Marten can reject a stale command early, before we waste system resources doing unnecessary work:

// Since we're focusing on Marten, I'm using an MVC Core
// controller just because it's commonly used and understood
public class CompleteChartingController : ControllerBase
{
    [HttpPost("/provider/charting/complete")]
    public async Task Post(
        [FromBody] CompleteCharting charting,
        [FromServices] IDocumentSession session)
    {
        // We're looking up the current state of the ProviderShift aggregate
        // for the designated provider
        var stream = await session
            .Events
            .FetchForWriting<ProviderShift>(
                charting.ProviderShiftId, 
                
                // Passing the expected, starting version of ProviderShift
                // into Marten
                charting.Version,
                HttpContext.RequestAborted);

        // And the rest of the controller stays the same as
        // before....
    }
}

In the usage above, Marten will do a version check both at the point of FetchForWriting() using the version we passed in, and again during the call to SaveChangesAsync() to reject any changes made if there was a concurrent update to that same stream.
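The two checkpoints can be sketched the same way, again as a hypothetical in-memory illustration rather than Marten’s code. `VersionedStreams` and `StaleCommandException` are invented names; the point is only that the expected version from the command is compared once before any handler work and once more at commit.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical exception; Marten has its own ConcurrencyException type
public class StaleCommandException : Exception
{
    public StaleCommandException(string message) : base(message) { }
}

// Hypothetical in-memory sketch of the two checkpoints, NOT Marten's code
public class VersionedStreams
{
    private readonly Dictionary<Guid, List<object>> _streams = new();

    public long VersionOf(Guid id) =>
        _streams.TryGetValue(id, out var events) ? events.Count : 0;

    // Checkpoint 1: fail fast before the handler does any work
    public long FetchForWriting(Guid id, long expectedVersion)
    {
        var actual = VersionOf(id);
        if (actual != expectedVersion)
            throw new StaleCommandException(
                $"Command expected version {expectedVersion}, stream is at {actual}");
        return actual;
    }

    // Checkpoint 2: re-check at commit in case someone appended in between
    public void Save(Guid id, long versionAtFetch, params object[] newEvents)
    {
        var actual = VersionOf(id);
        if (actual != versionAtFetch)
            throw new StaleCommandException(
                $"Stream moved from {versionAtFetch} to {actual} before commit");

        if (!_streams.TryGetValue(id, out var events))
        {
            events = new List<object>();
            _streams[id] = events;
        }
        events.AddRange(newEvents);
    }
}
```

A command stamped with Version 0 against a stream that has already moved to version 1 is rejected at the first checkpoint, before any validation logic runs or any commit is attempted.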

Lastly, Marten gives you the ability to opt into heavier, exclusive access to the ProviderShift with this option:

// We're looking up the current state of the ProviderShift aggregate
// for the designated provider
var stream = await session
    .Events
    .FetchForExclusiveWriting<ProviderShift>(
        charting.ProviderShiftId, 
        HttpContext.RequestAborted);

In that last usage, we’re relying on the underlying PostgreSQL database to get us an exclusive row lock on the ProviderShift event stream such that only our current operation is allowed to write to that event stream while we have the lock. This is heavier and comes with some risk of database locking problems, but solves the concurrency issue.
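Marten delegates exclusive access to a PostgreSQL row lock. Purely as an in-process analogy, the hypothetical sketch below serializes writers per stream with a SemaphoreSlim, so a second writer waits until the first has finished its whole read, decide, append cycle. `ExclusiveStreamWriter` is an invented name, and this does nothing across processes the way a database lock does.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// In-process analogy of exclusive stream locking, NOT how Marten works
// (Marten relies on a PostgreSQL row lock for FetchForExclusiveWriting)
public class ExclusiveStreamWriter
{
    private readonly ConcurrentDictionary<Guid, SemaphoreSlim> _locks = new();
    private readonly ConcurrentDictionary<Guid, long> _versions = new();

    public long VersionOf(Guid id) => _versions.TryGetValue(id, out var v) ? v : 0;

    // Hold the stream's lock for the whole read-decide-append cycle
    public async Task WriteExclusivelyAsync(Guid id, Func<long, int> decideEventCount)
    {
        var gate = _locks.GetOrAdd(id, _ => new SemaphoreSlim(1, 1));
        await gate.WaitAsync();
        try
        {
            var current = VersionOf(id);

            // Simulate handler work while holding the lock; without the
            // lock, other writers could read the same "current" here
            await Task.Yield();

            _versions[id] = current + decideEventCount(current);
        }
        finally
        {
            gate.Release();
        }
    }
}
```

Fifty concurrent writers that each append one event end with the stream at version 50. The trade-off mirrors the database case: no write is ever rejected, but every writer queues behind whoever holds the lock.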

So that’s concurrency protection in FetchForWriting(), but I mostly skipped over when and how that API will execute the projection logic to go from the raw events like ProviderJoined, ProviderReady, or ChartingStarted to the projected ProviderShift.

Any projection in Marten can be calculated or executed with three different “projection lifecycles”:

  1. Live — in this case, a projection is calculated on the fly by loading the raw events in memory and calculating the current state right then and there. In the absence of any other configuration, this is the default lifecycle for the ProviderShift per stream aggregation.
  2. Inline — a projection is updated at the time any events are appended by Marten and persisted by Marten as a document in the PostgreSQL database.
  3. Async — a projection is updated in a background process as events are captured by Marten across the system. The projected state is persisted as a Marten document to the underlying PostgreSQL database.
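For orientation, the lifecycle is chosen at configuration time. The fragment below is a sketch assuming Marten 7’s `StoreOptions` and an ASP.NET Core host; `builder`, `connectionString`, and the ProviderShift type are assumed to exist elsewhere in your app, and these namespaces and APIs have shifted between Marten major versions, so check the docs for the version you run.

```csharp
using Marten;
using Marten.Events.Projections;

builder.Services.AddMarten(opts =>
{
    // Assumed to be defined elsewhere in your app
    opts.Connection(connectionString);

    // Pick ONE of these per aggregate type:

    // Live: calculated on the fly from the raw events. This is already
    // the default, so registering it explicitly is optional
    opts.Projections.LiveStreamAggregation<ProviderShift>();

    // Inline: updated and stored as a document in the same
    // transaction that appends the events
    // opts.Projections.Snapshot<ProviderShift>(SnapshotLifecycle.Inline);

    // Async: updated in the background; this also requires running
    // Marten's async daemon in your application
    // opts.Projections.Snapshot<ProviderShift>(SnapshotLifecycle.Async);
});
```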

The first two options give you strong consistency, where the projection always reflects the current state of the events captured to the database. Live is probably a little more efficient when you have many writes but few reads and want to optimize the “write” side. Inline is better when you have few writes but many reads and want to optimize the “read” side (or need to query against the projected data rather than just load it by id). The Async model lets you move the work of projecting events into the aggregated state out of both the “write” and “read” sides. That can easily pay off for performance, and it is frequently necessary for ordering or concurrency concerns.

In the case of the FetchForWriting() API, you will always have a strongly consistent view of the raw events because that API is happily wallpapering over the lifecycle for you. Live aggregation works as you’d expect, Inline aggregation works by just loading the expected document directly from the database, and Async aggregation is a hybrid model that starts from the last known persisted value for the aggregate and applies any missing events right on top of that (the async behavior was a big feature added in Marten 7).
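The hybrid behavior for Async aggregates can be sketched as “start from the persisted snapshot, then apply only the events newer than its version.” This is a hypothetical, simplified illustration of the idea, not Marten’s internals; `StoredEvent` and `CatchUp.ApplyMissing` are invented names.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public enum ProviderStatus { Ready, Assigned, Charting, Paused }

public record ProviderReady;
public record ProviderAssigned(Guid AppointmentId);
public record ChartingStarted;

// Hypothetical: an event as stored in the stream, with its 1-based version
public record StoredEvent(long Version, object Data);

// Simplified write model; Version is the last event applied
public class ProviderShift
{
    public long Version { get; set; }
    public ProviderStatus Status { get; set; }

    public void Apply(object data)
    {
        switch (data)
        {
            case ProviderReady _: Status = ProviderStatus.Ready; break;
            case ProviderAssigned _: Status = ProviderStatus.Assigned; break;
            case ChartingStarted _: Status = ProviderStatus.Charting; break;
        }
    }
}

// Hypothetical sketch of "start from the snapshot, apply what's missing"
public static class CatchUp
{
    public static ProviderShift ApplyMissing(ProviderShift snapshot, IEnumerable<StoredEvent> stream)
    {
        // Only events newer than the snapshot are replayed, in order
        var missing = stream
            .Where(x => x.Version > snapshot.Version)
            .OrderBy(x => x.Version);

        foreach (var stored in missing)
        {
            snapshot.Apply(stored.Data);
            snapshot.Version = stored.Version;
        }

        return snapshot;
    }
}
```

If the background process has only persisted the aggregate through version 2 while the stream already holds a third ChartingStarted event, the caller still sees a Charting shift at version 3, which is why the command handler gets a strongly consistent view regardless of the lifecycle.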

By hiding the actual lifecycle behavior behind the FetchForWriting() signature, teams are able to experiment with different approaches to optimize their application without breaking existing code.

Summary

FetchForWriting() was built specifically to ease the use of Marten within CQRS command handlers after we saw how much boilerplate code teams previously had to write with Marten. At this point, it is our strongly recommended approach for command handlers. Also note that this API is used within the Wolverine + Marten “aggregate handler workflow”, which does even more to remove ceremony from CQRS command handler code. To some degree, what is now Wolverine was purposely rebooted and saved from the scrap heap specifically because of that combination with Marten and the FetchForWriting API.

Personally, I’m opposed to any kind of IAggregateRepository, or any approach where the “write” model itself tracks the events that are applied or uncommitted. I’m trying hard to steer folks using Marten away from this still somewhat popular older approach in favor of a more Functional Programming-ish approach.

FetchForWriting could be used as part of a homegrown “Decider” pattern if that’s something you prefer (personally, I think the “decider” pattern in real-life usage ends up devolving into brute-force procedural code built on massive switch statements).
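For a sense of what that functional style looks like, here is a hypothetical, framework-free sketch of a pure “decide” step for the CompleteCharting command: it takes the current write model and the command and returns the events to append, with no I/O and no Marten types. `ChartingDecider` is an invented name, and ProviderShift is reduced to a small record for this sketch.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public enum ProviderStatus { Ready, Assigned, Charting, Paused }

public record ChartingFinished;
public record CompleteCharting(Guid ProviderShiftId, int Version);

// Minimal read-only stand-in for the ProviderShift write model
public record ProviderShift(Guid Id, int Version, ProviderStatus Status);

// Hypothetical pure "decide" function: no I/O, no Marten types
public static class ChartingDecider
{
    public static IReadOnlyList<object> Decide(ProviderShift? shift, CompleteCharting command)
    {
        if (shift is null)
            throw new InvalidOperationException(
                $"Unknown provider shift {command.ProviderShiftId}");

        if (shift.Status != ProviderStatus.Charting)
            throw new InvalidOperationException("The shift is not currently charting");

        // The decision is simply "which events should be appended?"
        return new object[] { new ChartingFinished() };
    }
}
```

In a command handler, you would pass `stream.Aggregate` from FetchForWriting() into a function like this and then call `stream.AppendOne()` for each returned event before SaveChangesAsync(). Keeping the decision pure makes it trivially unit-testable without a database.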

The “telehealth” system I mentioned before was actually built with a hand-rolled Node.js event sourcing implementation, but that experience has had plenty of influence over later Marten work, including a feature, added to Marten just this past weekend for a JasperFx client, for emitting “side effect” actions and messages during projection updates.

I was deeply unimpressed with the existing Node.js tooling for event sourcing at that time (~2020), but I would hope it’s much better now. Marten has absolutely grown in capability in the past couple years.
