Building a Critter Stack Application: Dealing with Concurrency

Hey, did you know that JasperFx Software is ready to provide formal support plans for Marten and Wolverine? Not only are we trying to make the “Critter Stack” tools viable long term options for your shop, we’re also interested in hearing your opinions about the tools and how they should change. We’re also certainly open to helping you succeed with your software development projects on a consulting basis, whether you’re using any part of the Critter Stack or any other .NET server-side tooling.

Let’s build a small web service application using the whole “Critter Stack” and their friends, one small step at a time. For right now, the “finished” code is at CritterStackHelpDesk on GitHub.

The posts in this series are:

  1. Event Storming
  2. Marten as Event Store
  3. Marten Projections
  4. Integrating Marten into Our Application
  5. Wolverine as Mediator
  6. Web Service Query Endpoints with Marten
  7. Dealing with Concurrency (this post)
  8. Wolverine’s Aggregate Handler Workflow FTW!
  9. Command Line Diagnostics with Oakton
  10. Integration Testing Harness
  11. Marten as Document Database
  12. Asynchronous Processing with Wolverine
  13. Durable Outbox Messaging and Why You Care!
  14. Wolverine HTTP Endpoints
  15. Easy Unit Testing with Pure Functions
  16. Vertical Slice Architecture
  17. Messaging with Rabbit MQ
  18. The “Stateful Resource” Model
  19. Resiliency

Last time out we talked about using Marten’s projection data in the context of building query services inside our CQRS architecture for our new incident tracking help desk application. Today I want to talk about how to protect our system from concurrency and ordering issues as more users or subsystems start to access, and even modify, the same incidents.

Imagine some of these unfortunately likely scenarios:

  1. A user gets impatient with our user interface and clicks on a button multiple times which sends multiple requests to our back end to add the same note to an incident
  2. A technician pulls up the incident details for something new, but then gets called away (or goes to lunch). A second technician pulls up the same incident and carries out some actions to change the category or priority. The first technician comes back to their desk and tries to change the priority of the incident based on the stale data about that incident they already had open on their screen
  3. Later on, we may have several automated workflows happening that could conceivably try to change an incident simultaneously. In this case it might be important that actions involving an incident only happen one at a time to prevent inconsistent system state

In later posts I’ll talk about how Wolverine works with Marten to make your system much more robust in the face of concurrency issues while still keeping your code low ceremony. Today though, I strictly want to talk about Marten’s built in protections for concurrency before getting fancy.

To review from a couple of posts ago when I introduced Wolverine command handlers, we had this code to process a CategoriseIncident command in our system:

    public static async Task Handle(
        CategoriseIncident command, 
        IDocumentSession session, 
        CancellationToken cancellationToken)
    {
        // Find the existing state of the referenced Incident
        var existing = await session
            .Events
            .AggregateStreamAsync<IncidentDetails>(command.Id, token: cancellationToken);

        // Don't worry, we're going to clean this up later
        if (existing == null)
        {
            throw new ArgumentOutOfRangeException(nameof(command), "Unknown incident id " + command.Id);
        }
        
        // We need to validate whether this command actually 
        // should do anything
        if (existing.Category != command.Category)
        {
            var categorised = new IncidentCategorised
            {
                Category = command.Category,
                UserId = SystemId
            };

            session.Events.Append(command.Id, categorised);
            await session.SaveChangesAsync(cancellationToken);
        }
    }

I’m going to change this handler to introduce some concurrency protection for the single incident referred to by the CategoriseIncident command. To do that, I’m going to use Marten’s FetchForWriting() API, which we introduced specifically to make Marten easier to use within CQRS command handling, and rewrite this handler to use optimistic concurrency protections:

    public static async Task Handle(
        CategoriseIncident command, 
        IDocumentSession session, 
        CancellationToken cancellationToken)
    {
        // Find the existing state of the referenced Incident
        // but also set Marten up for optimistic version checking on
        // the incident upon the call to SaveChangesAsync()
        var stream = await session
            .Events
            .FetchForWriting<IncidentDetails>(command.Id, cancellationToken);

        // Don't worry, we're going to clean this up later
        if (stream.Aggregate == null)
        {
            throw new ArgumentOutOfRangeException(nameof(command), "Unknown incident id " + command.Id);
        }
        
        // We need to validate whether this command actually 
        // should do anything
        if (stream.Aggregate.Category != command.Category)
        {
            var categorised = new IncidentCategorised
            {
                Category = command.Category,
                UserId = SystemId
            };

            stream.AppendOne(categorised);
            
            // This call may throw a ConcurrencyException!
            await session.SaveChangesAsync(cancellationToken);
        }
    }

Notice the call to FetchForWriting(). That loads the current IncidentDetails aggregate data for our incident event stream. Under the covers, Marten is also loading the current revision number for that incident event stream and tracking that. When the IDocumentSession.SaveChangesAsync() is called, it will attempt to append the new event(s) to the incident event stream, but this operation will throw a Marten ConcurrencyException and roll back the underlying database transaction if the incident event stream has been revisioned between the call to FetchForWriting() and SaveChangesAsync().
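If you’re wondering what calling code could actually do with that exception, here’s a minimal sketch of one option. To be clear, this wrapper is my own illustration and not from the finished sample project — the CategoriseIncidentEndpoint/CategoriseIncidentHandler names and the HTTP 409 mapping are assumptions — but Marten.Exceptions.ConcurrencyException is the real exception type Marten throws:

    using Marten;
    using Marten.Exceptions;

    public static class CategoriseIncidentEndpoint
    {
        // Hypothetical HTTP wrapper around the handler shown above.
        // The endpoint and handler names are illustrative only.
        public static async Task<IResult> Post(
            CategoriseIncident command,
            IDocumentSession session,
            CancellationToken cancellationToken)
        {
            try
            {
                await CategoriseIncidentHandler.Handle(command, session, cancellationToken);
                return Results.Ok();
            }
            catch (ConcurrencyException)
            {
                // Someone else appended to this incident stream first,
                // so tell the client to reload and retry with fresh data
                return Results.Conflict("The incident was modified by another user");
            }
        }
    }

Surfacing a 409 pushes the retry decision back to the client, which is usually the right call when the client was genuinely working from stale data.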

Do note that the call to FetchForWriting() can happily work with aggregate projections that are configured as either “live” or persisted to the database. Our strong recommendation is to rely on this API in any command handler that appends events, so that you can easily change up projection lifecycles as necessary.

While this crude protection might be helpful by itself, we can go further and avoid doing work that is just going to fail anyway by telling Marten that the current command was issued assuming the event stream is currently at an expected revision.

Just as a reminder to close the loop here, when we write the aggregated projection for the IncidentDetails document type shown below:

    public record IncidentDetails(
        Guid Id,
        Guid CustomerId,
        IncidentStatus Status,
        IncidentNote[] Notes,
        IncidentCategory? Category = null,
        IncidentPriority? Priority = null,
        Guid? AgentId = null,
        
        // This is meant to be the revision number
        // of the event stream for this incident
        int Version = 1
    );

Marten will “automagically” set the value of a Version property of the aggregated document to the latest revision number of the event stream. This (hopefully) makes it relatively easy for systems built with Marten to transfer the current event stream revision number to user interfaces or other clients specifically to make optimistic concurrency protection easier.
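To make that transfer concrete, here’s a hypothetical query endpoint in the spirit of the previous post (the route is my own invention, and this assumes the IncidentDetails projection is persisted as a document rather than run “live”) that hands the projected document, Version and all, to the client:

    // Hypothetical minimal API route. Because IncidentDetails carries
    // its stream's revision in Version, the client gets the number it
    // needs for later optimistic concurrency checks "for free"
    app.MapGet("/api/incidents/{id:guid}",
        async (Guid id, IQuerySession session, CancellationToken ct) =>
        {
            var details = await session.LoadAsync<IncidentDetails>(id, ct);
            return details is null ? Results.NotFound() : Results.Ok(details);
        });

Nothing special happens here; the point is just that the revision number rides along with the normal query payload.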

Now that our user interface “knows” what it thinks the current version of the incident data is, we’ll also transmit that version number through our command that we’re posting to the service:

    public class CategoriseIncident
    {
        public Guid Id { get; set; }
        public IncidentCategory Category { get; set; }

        // This is to communicate to the server that
        // this command was issued assuming that the 
        // incident is currently at this revision
        // number
        public int Version { get; set; }
    }

We’re going to change our message handler one more time, but this time we want a little stronger concurrency protection upfront to disallow any work from proceeding if the incident has been revisioned past the version the client knew about, while still retaining the optimistic concurrency check on SaveChangesAsync(). Squint really hard at the call to FetchForWriting() where I pass in the version number from the command, as that’s the only change:

    public static async Task Handle(
        CategoriseIncident command, 
        IDocumentSession session, 
        CancellationToken cancellationToken)
    {
        // Find the existing state of the referenced Incident
        // *But*, throw a ConcurrencyException if the stream has been revisioned past
        // the expected, starting version communicated by the command
        var stream = await session
            .Events
            .FetchForWriting<IncidentDetails>(command.Id, command.Version, cancellationToken);

        // Don't worry, we're going to clean this up later
        if (stream.Aggregate == null)
        {
            throw new ArgumentOutOfRangeException(nameof(command), "Unknown incident id " + command.Id);
        }
        
        // We need to validate whether this command actually 
        // should do anything
        if (stream.Aggregate.Category != command.Category)
        {
            var categorised = new IncidentCategorised
            {
                Category = command.Category,
                UserId = SystemId
            };

            stream.AppendOne(categorised);
            
            // This call may throw a ConcurrencyException!
            await session.SaveChangesAsync(cancellationToken);
        }
    }

In the previous couple of revisions, I’ve strictly used “optimistic concurrency,” where you work on the assumption that it’s more likely than not okay to proceed, but update the database in such a way that it will reject the changes if the expected starting revision does not match the current revision stored in the database. Marten also has the option to use exclusive database locks so that only the current transaction is allowed to edit the event stream. That usage is shown below, but yet again, just squint at the changed call to FetchForExclusiveWriting():

    public static async Task Handle(
        CategoriseIncident command, 
        IDocumentSession session, 
        CancellationToken cancellationToken)
    {

        // Careful! This will try to wait until the database can grant us exclusive
        // write access to the specific incident event stream
        var stream = await session
            .Events
            .FetchForExclusiveWriting<IncidentDetails>(command.Id, cancellationToken);

        // Don't worry, we're going to clean this up later
        if (stream.Aggregate == null)
        {
            throw new ArgumentOutOfRangeException(nameof(command), "Unknown incident id " + command.Id);
        }
        
        // We need to validate whether this command actually 
        // should do anything
        if (stream.Aggregate.Category != command.Category)
        {
            var categorised = new IncidentCategorised
            {
                Category = command.Category,
                UserId = SystemId
            };

            stream.AppendOne(categorised);
            
            await session.SaveChangesAsync(cancellationToken);
        }
    }

This approach is something I think of as a “guilty until proven innocent” tool. While this is absolutely more rigid protection against concurrent access or concurrent processing of commands against a single incident event stream, it comes with some drawbacks. One issue is that using the exclusive lock makes your database engine work harder and use more resources. The database might also cause timeouts on the initial call to FetchForExclusiveWriting() as it has to wait for any previous locks from an ongoing transaction to be released. In your application you may need to handle this kind of TimeoutException differently from the optimistic ConcurrencyException (we’ll talk about that a lot more in later posts). This usage also comes with a bit of risk of deadlocks in the database.
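As a rough sketch of what that differentiated handling might look like — and this is purely my own illustration, not something from the sample project; the handler and logger names are hypothetical, and the exact exception surfaced on a lock wait timeout can vary with your Npgsql configuration — calling code could separate the two failure modes like so:

    using Marten.Exceptions;
    using Npgsql;

    try
    {
        await CategoriseIncidentHandler.Handle(command, session, cancellationToken);
    }
    catch (ConcurrencyException)
    {
        // The client was working from stale data. Only the client can
        // decide what to do next, so don't retry on the server
        throw;
    }
    catch (NpgsqlException e) when (e.InnerException is TimeoutException)
    {
        // Couldn't acquire the exclusive lock in time. Unlike the
        // stale-data case this is transient, so a retry with a fresh
        // session (or a messaging retry policy) may be reasonable
        logger.LogWarning(e, "Timed out waiting for exclusive access to incident {Id}", command.Id);
        throw;
    }

The key distinction is that a stale-data failure should go back to the user, while a lock timeout is transient and a candidate for automated retries, which is exactly where Wolverine’s error handling policies will come in later.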

Lastly, you can technically use serializable transactions with Marten to really, really make the data access be serialized on a single event stream like so:

    public static async Task Handle(
        CategoriseIncident command, 
        IDocumentStore store, 
        CancellationToken cancellationToken)
    {
        // This is your last resort approach!
        await using var session = 
            await store.LightweightSerializableSessionAsync(cancellationToken);
        
        var stream = await session
            .Events
            .FetchForWriting<IncidentDetails>(command.Id, cancellationToken);

        // Don't worry, we're going to clean this up later
        if (stream.Aggregate == null)
        {
            throw new ArgumentOutOfRangeException(nameof(command), "Unknown incident id " + command.Id);
        }
        
        // We need to validate whether this command actually 
        // should do anything
        if (stream.Aggregate.Category != command.Category)
        {
            var categorised = new IncidentCategorised
            {
                Category = command.Category,
                UserId = SystemId
            };

            stream.AppendOne(categorised);
            
            await session.SaveChangesAsync(cancellationToken);
        }
    }

But if the exclusive lock was a “guilty until proven innocent” approach, then serializable transactions, because of their even heavier overhead, are a “break glass in case of emergency” option you should keep in your back pocket until you really, really need it.

Summary and What’s Next?

In this post I introduced Marten’s built in concurrency protections for appending data to event streams. For the most part, I think you should assume the usage of optimistic concurrency as a default, as that’s lighter on your PostgreSQL database. I also showed how to expose the current event stream version through projections in CQRS queries, where clients can pick it up and pass the expected starting version back in commands for optimistic concurrency checks within our CQRS command handlers.

In the next post, I think I’m going to introduce Wolverine’s aggregate handler workflow with Marten as a way of making the message handler in this post much simpler and easier to test.
