Improvements to Event Sourcing in Marten V4

Marten V4 is still in flight, but everything I’m showing in this post is in the latest alpha release (4.0.0-alpha.6) published to NuGet.

TL;DR: Marten V4 has some significant improvements to its event sourcing support that will help developers deal with potential concurrency issues. The related event projection support in Marten V4 is also significantly more robust than previous versions.

A Sample Project Management Event Store

Imagine if you will, a simplistic application that uses Marten’s event sourcing functionality to do project management tracking with a conceptual CQRS architecture. The domain will include tracking each project task within the greater project. In this case, I’m choosing to model the activity of a single project and its related tasks as a stream of events like:

  1. ProjectStarted
  2. TaskRecorded
  3. TaskStarted
  4. TaskFinished
  5. ProjectCompleted
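The event classes themselves never appear in this post, so here is a minimal sketch of what they might look like. The property names are inferred from how the handlers and the Project aggregate further down use the events; treat the exact shapes as assumptions rather than the real definitions.

```csharp
using System;

// Hypothetical event shapes, inferred from how the handlers
// and the Project aggregate in this post use them
public class ProjectStarted
{
    public string Name { get; set; }
    public DateTimeOffset Timestamp { get; set; }
}

public class TaskRecorded
{
    public string Title { get; set; }
    public DateTimeOffset Timestamp { get; set; }
}

public class TaskStarted
{
    // The number of the task within its project
    public int Number { get; set; }
    public DateTimeOffset Timestamp { get; set; }
}

public class TaskFinished
{
    // The number of the task within its project
    public int Number { get; set; }
    public DateTimeOffset Timestamp { get; set; }
}

public class ProjectCompleted
{
    public DateTimeOffset Timestamp { get; set; }
}
```

Events are plain data: each one records that something happened and when, and nothing more.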

Starting a new Event Stream

Using event sourcing as our persistence strategy, the real system state is the raw events that model state changes or transitions of our project. As an example, let’s say that our system records and initializes a “stream” of events for a new project with this command handler:

    public class NewProjectCommand
    {
        public string Name { get; set; }
        public string[] Tasks { get; set; }
        public Guid ProjectId { get; set; } = Guid.NewGuid();
    }

    public class NewProjectHandler
    {
        private readonly IDocumentSession _session;

        public NewProjectHandler(IDocumentSession session)
        {
            _session = session;
        }

        public Task Handle(NewProjectCommand command)
        {
            var timestamp = DateTimeOffset.UtcNow;
            var started = new ProjectStarted  
            {
                Name = command.Name, 
                Timestamp = timestamp
            };

            var tasks = command.Tasks
                .Select(name => new TaskRecorded {
                    Timestamp = timestamp, 
                    Title = name
                });

            // This tells Marten that it should create a new project
            // stream
            _session.Events.StartStream(command.ProjectId, started);

            // This quietly appends events to the event stream
            // created in the line of code above
            _session.Events.Append(command.ProjectId, tasks);

            // The actual persistence to Postgresql happens here
            return _session.SaveChangesAsync();
        }
    }

The call to StartStream() up above tells Marten that it should create a new event stream with the supplied project id. As a long-requested improvement in Marten for V4, StartStream() guarantees that the transaction cannot succeed if another project event stream with that project id already exists. This helps the system prevent users from accidentally duplicating projects, at least by project id anyway.

Assuming that there is no existing stream with the project id, Marten will create a new record to track the new project stream and individual records in Postgresql to persist the raw event data along with metadata like the stream version, time stamps, and the event type.
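To make that bookkeeping a little more concrete, here is a rough in-memory sketch of a stream record plus versioned event records. This is not how Marten is implemented (Marten does this work in Postgresql tables inside a transaction); InMemoryEventStore and StoredEvent are hypothetical names invented for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape of one stored event: the raw data plus metadata
public record StoredEvent(
    Guid StreamId,
    long Version,
    string EventType,
    DateTimeOffset Timestamp,
    object Data);

public class InMemoryEventStore
{
    private readonly Dictionary<Guid, List<StoredEvent>> _streams = new();

    public void StartStream(Guid streamId, params object[] events)
    {
        // Mirrors the guarantee described above: starting a stream
        // whose id is already taken is an error
        if (_streams.ContainsKey(streamId))
        {
            throw new InvalidOperationException(
                $"Stream {streamId} already exists");
        }

        _streams[streamId] = new List<StoredEvent>();
        Append(streamId, events);
    }

    public void Append(Guid streamId, params object[] events)
    {
        var stream = _streams[streamId];
        foreach (var e in events)
        {
            // Versions start at 1 and grow by one per event in the stream
            stream.Add(new StoredEvent(
                streamId,
                stream.Count + 1,
                e.GetType().Name,
                DateTimeOffset.UtcNow,
                e));
        }
    }

    public IReadOnlyList<StoredEvent> FetchStream(Guid streamId)
    {
        return _streams[streamId];
    }
}
```

Starting a stream with one event and appending two more leaves three stored events with versions 1, 2, and 3, and a second StartStream() call with the same id throws.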

But we need a “read side” projection of the Projects!

So great, we can persist the raw events, and there are plenty of valuable things we can do later with those events. However, sooner or later our application is going to need to know the current state of an ongoing project for user screens or validation logic, so we need some way to compile the events for a single project into some kind of “here’s the current state of the Project” data structure. That’s where Marten’s support for projections comes into play.

To model a “projected” view of a project from its raw events, I’m creating a single Project class to model the full state of a single, ongoing project. To reduce the number of moving parts, I’m going to make Project be “self-aggregating” such that it’s responsible for mutating itself based on incoming events:

    public class Project
    {
        private readonly List<ProjectTask> _tasks = new List<ProjectTask>();

        public Project(ProjectStarted started)
        {
            Version = 1;
            Name = started.Name;
            StartedTime = started.Timestamp;
        }

        // This gets set by Marten
        public Guid Id { get; set; }

        public long Version { get; set; }
        public DateTimeOffset StartedTime { get; private set; }
        public DateTimeOffset? CompletedTime { get; private set; }

        public string Name { get; private set; }

        public ProjectTask[] Tasks
        {
            get
            {
                return _tasks.ToArray();
            }
            set
            {
                _tasks.Clear();
                _tasks.AddRange(value);
            }
        }

        public void Apply(TaskRecorded recorded, IEvent e)
        {
            Version = e.Version;
            var task = new ProjectTask
            {
                Title = recorded.Title,
                // Max() throws on an empty sequence, so start numbering at 1
                Number = _tasks.Count == 0 ? 1 : _tasks.Max(x => x.Number) + 1,
                Recorded = recorded.Timestamp
            };

            _tasks.Add(task);
        }

        public void Apply(TaskStarted started, IEvent e)
        {
            Version = e.Version;
            var task = _tasks.FirstOrDefault(x => x.Number == started.Number);

            // Remember this isn't production code:)
            if (task != null) task.Started = started.Timestamp;
        }

        public void Apply(TaskFinished finished, IEvent e)
        {
            Version = e.Version;
            var task = _tasks.FirstOrDefault(x => x.Number == finished.Number);

            // Remember this isn't production code:)
            if (task != null) task.Finished = finished.Timestamp;
        }

        public void Apply(ProjectCompleted completed, IEvent e)
        {
            Version = e.Version;
            CompletedTime = completed.Timestamp;
        }
    }

I didn’t choose to use them here, but I want to point out that you can use immutable aggregates with Marten V4. In that case you’d simply have the Apply() methods return a new Project object (as far as Marten is concerned anyway). Functional programming enthusiasts will cheer the built-in support for immutability, some of you will point out that it leads to less efficient code by increasing the number of object allocations and adds more code ceremony, and I will happily say that Marten V4 lets you make that decision to suit your own needs and preferences;-)
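For a feel of what the immutable style could look like, here is a minimal sketch in plain C# using records. ImmutableProject, TaskSummary, and the static Create() method are hypothetical names, the events are repeated as records only so the sketch compiles on its own, and nothing here shows how Marten would actually discover or call these methods.

```csharp
using System;
using System.Collections.Immutable;

// Events repeated as records so this sketch compiles on its own
public record ProjectStarted(string Name, DateTimeOffset Timestamp);
public record TaskRecorded(string Title, DateTimeOffset Timestamp);
public record ProjectCompleted(DateTimeOffset Timestamp);

public record TaskSummary(int Number, string Title);

// Hypothetical immutable variant of the Project aggregate
public record ImmutableProject(
    string Name,
    DateTimeOffset StartedTime,
    DateTimeOffset? CompletedTime,
    ImmutableList<TaskSummary> Tasks)
{
    public static ImmutableProject Create(ProjectStarted started)
    {
        return new ImmutableProject(
            started.Name,
            started.Timestamp,
            null,
            ImmutableList<TaskSummary>.Empty);
    }

    // Each Apply() returns a new copy instead of mutating this one
    public ImmutableProject Apply(TaskRecorded recorded)
    {
        var task = new TaskSummary(Tasks.Count + 1, recorded.Title);
        return this with { Tasks = Tasks.Add(task) };
    }

    public ImmutableProject Apply(ProjectCompleted completed)
    {
        return this with { CompletedTime = completed.Timestamp };
    }
}
```

The trade-off mentioned above is visible here: every event produces a fresh ImmutableProject (and a new list node), in exchange for never having to worry about shared mutable state.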

Also see Event Sourcing with Marten V4: Aggregated Projections for more information on other alternatives for expressing aggregated projections in Marten.

Working in our project management domain, I’d like the Project aggregate document to be updated every time events are captured for a project. I’ll add the Project document as a self-aggregating, inline projection in my Marten system like this:

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMarten(opts =>
            {
                opts.Connection("some connection");

                // Direct Marten to update the Project aggregate
                // inline as new events are captured
                opts.Events
                    .Projections
                    .SelfAggregate<Project>(ProjectionLifecycle.Inline);

            });
        }
    }

As an example of how inline projections come into play, let’s look at another sample command handler for adding a new task to an ongoing project with a new task title:

    public class CreateTaskCommand
    {
        public Guid ProjectId { get; set; }
        public string Title { get; set; }
    }

    public class CreateTaskHandler
    {
        private readonly IDocumentSession _session;

        public CreateTaskHandler(IDocumentSession session)
        {
            _session = session;
        }

        public Task Handle(CreateTaskCommand command)
        {
            var recorded = new TaskRecorded
            {
                Timestamp = DateTimeOffset.UtcNow, 
                Title = command.Title
            };

            _session.Events.Append(command.ProjectId, recorded);
            return _session.SaveChangesAsync();
        }
    }

When SaveChangesAsync() is called above, Marten will issue a single database commit that captures a new TaskRecorded event. In that same commit Marten will fetch the persisted Project document for that project event stream, apply the changes from the new event, and persist the Project document that reflects the information in the events.

Let’s say that we have a user interface in our project management system that allows users to review and edit projects and tasks. Independent of the event streams, the “query” side of our application can happily retrieve the latest data for the Project documents by querying with Marten’s built-in document database functionality, as in this simple MVC controller endpoint:

    public class ProjectController: ControllerBase
    {
        private readonly IQuerySession _session;

        public ProjectController(IQuerySession session)
        {
            _session = session;
        }

        [HttpGet("/api/project/{projectId}")]
        public Task<Project> Get(Guid projectId)
        {
            return _session.LoadAsync<Project>(projectId);
        }
    }

For a future blog post, I’ll show you a new Marten V4 feature for ultra efficient “read side” web services by streaming JSON data directly from Postgresql straight down the HTTP response body without ever wasting any time with JSON serialization.

Concurrency Problems Everywhere!!!

Don’t name a software project “Genesis”. Hubristic project names lead to massive project failures.

The “inline” projection strategy is easy to use in Marten, but it’s vulnerable to oddball concurrency problems if you’re not careful in your system design. Let’s say that we have two coworkers named Jack and Jill using our project management system. Jack pulls up the data for the “Genesis Project” in the UI client, then gets pulled into a hallway conversation (remember when we used to have those in the before times?). While he’s distracted, Jill makes some changes to the “Genesis Project” to record some tasks she and Jack had talked about earlier and saves the changes to the server. Jack finally gets back to his machine and makes basically the same edits to “Genesis” that Jill already did and saves the data. If we build our project management system naively, we’ve now allowed Jack and Jill to duplicate work and the “Genesis Project” task management is all messed up.

One way to prevent that concurrent change issue is to detect that the project has been changed by Jill when Jack tries to persist his duplicate changes and give him the chance to update his screen with the latest data before pushing new project task changes.

Going back to the Project aggregate document, as Marten appends events to a stream, it assigns an incrementing numeric version number to each event within the stream. Let’s say that in our project management system we always want to know the latest version of the project. Marten V4 finally allows you to use event metadata like the event version number within inline projections (this has been a long-running request from some of our users). You can see that in action in the method below that updates Project based on a TaskStarted event. You might notice that I also pass in the IEvent object that lets us access the event metadata:

    public void Apply(TaskStarted started, IEvent e)
    {
        // Update the Project document based on the event version
        Version = e.Version;
        var task = _tasks.FirstOrDefault(x => x.Number == started.Number);

        // Remember this isn't production code:)
        if (task != null) task.Started = started.Timestamp;
    }

Now, having the stream version number on the Project document turns out to be very helpful for concurrency issues. Since we’re worried about a Project event stream getting out of sync when the system receives concurrent updates, we can pass the current version, which is conveniently kept up to date on the Project read side document, down to the user interface, and have the user interface send back what it thinks is the current version of the project when it tries to make updates. If the underlying project event stream has changed since the user interface fetched the original data, we can make Marten reject the additional events. This is a form of offline optimistic concurrency check: we assume that everything is hunky dory, and just let the infrastructure reject the changes as an exceptional case.

To put that into motion, let’s say that our project management user interface posts this command up to the server to close a single project task:

    public class CompleteTaskCommand
    {
        public Guid ProjectId { get; set; }
        public int TaskNumber { get; set; }
        
        // This is the version of the project data
        // that was being edited in the user interface
        public long ExpectedVersion { get; set; }
    }

In the command handler, we can direct Marten to reject the new events being appended if the stream has been changed in between the user interface having fetched the project data and submitting its updates to the server:

    public class CompleteTaskHandler
    {
        private readonly IDocumentSession _session;

        public CompleteTaskHandler(IDocumentSession session)
        {
            _session = session;
        }

        public Task Handle(CompleteTaskCommand command)
        {
            var @event = new TaskFinished
            {
                Number = command.TaskNumber,
                Timestamp = DateTimeOffset.UtcNow
            };

            _session.Events.Append(
                command.ProjectId,

                // Using this overload will make Marten do
                // an optimistic concurrency check against
                // the version of the project event stream
                // as it commits. Marten expects the version
                // the stream will have *after* this append,
                // so add one for the single new event
                command.ExpectedVersion + 1,

                @event);
            return _session.SaveChangesAsync();
        }
    }

In the CompleteTaskHandler above, the call to SaveChangesAsync() will throw an EventStreamUnexpectedMaxEventIdException if the project event stream has advanced past the version assumed by the originator of the command in command.ExpectedVersion. To make our system more resilient, we would need to catch the Marten exception and handle it with the proper application workflow.
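The version check itself is easy to picture with a small in-memory sketch. This only illustrates the idea and is not Marten’s implementation (Marten performs the check in the database as part of the commit). VersionedStream and ConcurrencyException are hypothetical names, and the expected version in this sketch means the version the caller last saw.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception type for a rejected append
public class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message)
    {
    }
}

public class VersionedStream
{
    private readonly List<object> _events = new();

    // The version is simply the number of events appended so far
    public long Version => _events.Count;

    // Reject the append when the caller's view of the stream is stale
    public void Append(long expectedCurrentVersion, params object[] events)
    {
        if (Version != expectedCurrentVersion)
        {
            throw new ConcurrencyException(
                $"Expected version {expectedCurrentVersion}, " +
                $"but the stream is at version {Version}");
        }

        _events.AddRange(events);
    }
}
```

In terms of the Jack and Jill story: both load the project at version 2, Jill’s Append(2, ...) succeeds and moves the stream to version 3, and Jack’s later Append(2, ...) is rejected so that he can reload before trying again.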

What I showed above is pretty draconian in terms of what edits it allows to go through. In other cases you may get by with a simple workflow that just tries to guarantee that a single Project aggregate is only being updated by a single process at any time. Marten V4 comes through with a couple of new ways to append events with more control over concurrent updates to a single event stream.

Let’s stick to optimistic checks for now and look at the new AppendOptimistic() method for appending events to an existing event stream with a rewritten version of our CompleteTaskCommand handling:

    public class CompleteTaskHandler2
    {
        private readonly IDocumentSession _session;

        public CompleteTaskHandler2(IDocumentSession session)
        {
            _session = session;
        }

        public Task Handle(CompleteTaskCommand command)
        {
            var @event = new TaskFinished
            {
                Number = command.TaskNumber,
                Timestamp = DateTimeOffset.UtcNow
            };

            // If some other process magically zips
            // in and updates this project event stream
            // between the call to AppendOptimistic()
            // and SaveChangesAsync(), Marten will detect
            // that and reject the transaction
            _session.Events.AppendOptimistic(
                command.ProjectId,
                @event);

            return _session.SaveChangesAsync();
        }
    }

We would still need to catch the Marten exceptions and handle those somehow. I myself would typically try to handle that with a messaging or command execution framework like MassTransit or my own Jasper project that comes with robust exception handling and retry capabilities.
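If you aren’t using one of those frameworks, the core of a retry policy can be sketched in a few lines. This is a bare-bones illustration under stated assumptions, not a replacement for a real framework: Retry and ExecuteAsync are hypothetical names, and the exception type to retry on is left as a type parameter instead of assuming a particular Marten exception.

```csharp
using System;
using System.Threading.Tasks;

public static class Retry
{
    // Runs the action, retrying when it throws TException,
    // for at most maxAttempts attempts in total
    public static async Task ExecuteAsync<TException>(
        Func<Task> action,
        int maxAttempts,
        TimeSpan delay)
        where TException : Exception
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return;
            }
            catch (TException) when (attempt < maxAttempts)
            {
                // Give the competing writer a moment before trying again.
                // On the final attempt the filter is false, so the
                // exception propagates to the caller instead.
                await Task.Delay(delay);
            }
        }
    }
}
```

Each attempt should redo the whole unit of work, including the append itself, so that the check runs against the latest state of the stream. Retrying only makes sense for the AppendOptimistic() style of handler; a command that carries a stale version from the user interface would just fail again on every attempt.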

At this point everything I’ve shown for concurrency control involves optimistic locks that result in exceptions being thrown and transactions being rejected. For another alternative, Marten V4 leverages Postgresql’s robust row locking support with this version of our CompleteTaskCommand handler that uses the new V4 AppendExclusive() method:

    public class CompleteTaskHandler3
    {
        private readonly IDocumentSession _session;

        public CompleteTaskHandler3(IDocumentSession session)
        {
            _session = session;
        }

        public Task Handle(CompleteTaskCommand command)
        {
            var @event = new TaskFinished
            {
                Number = command.TaskNumber,
                Timestamp = DateTimeOffset.UtcNow
            };

            // This tries to acquire an exclusive
            // lock on the stream identified by
            // command.ProjectId in the database
            // so that only one process at a time
            // can update this event stream
            _session.Events.AppendExclusive(
                command.ProjectId,
                @event);
            return _session.SaveChangesAsync();
        }
    }

This is heavier in terms of its impact on the database, but simpler to implement as there is no flow control by exceptions to worry about.
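As a loose analogy for what an exclusive lock buys you, here is an in-process sketch that serializes work per stream id. It is only an analogy: Marten’s version relies on Postgresql row locks and therefore works across processes and servers, which this sketch does not. StreamLocks and WithExclusiveAsync are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class StreamLocks
{
    // One gate per stream id, each allowing a single holder at a time
    private readonly ConcurrentDictionary<Guid, SemaphoreSlim> _locks = new();

    // Waits until no other caller holds the lock for this stream,
    // runs the work, then releases the lock even if the work throws
    public async Task WithExclusiveAsync(Guid streamId, Func<Task> work)
    {
        var gate = _locks.GetOrAdd(streamId, _ => new SemaphoreSlim(1, 1));

        await gate.WaitAsync();
        try
        {
            await work();
        }
        finally
        {
            gate.Release();
        }
    }
}
```

Callers working on the same stream id wait their turn instead of failing, which is why there is no exception-driven flow control in this style. The cost is that waiting callers hold resources while they queue up.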

I should also note that both the AppendOptimistic() and AppendExclusive() methods will verify that the stream already exists and throw an exception if the stream does not.
