Marten Event Sourcing Gets Some New Tools

JasperFx Software has recently had the chance to build out several strategic improvements to both Marten and Wolverine through collaborations with clients who had some specific needs. This has been highly advantageous because it’s helped push through some significant, long planned technical improvements while getting all-important feedback as clients integrate the new features. Today I’d like to show off a couple of valuable features and capabilities that Marten has gained as part of recent client work.

“Side Effects” in Projections

In a recent post called Multi Step Workflows with the Critter Stack, I talked about using Wolverine sagas (really process managers, if you have to be precise about the pattern name, because I’m sloppy about using “saga” and “process manager” interchangeably) for long running workflows. In that post I described how an incoming file would be:

  1. Broken up into batches of rows
  2. Each batch would be validated as a separately handled message for some parallelization and more granular retries
  3. Once validation results were recorded for every record batch, the file processing itself would either stop, sending a callback message that summarizes the failures to the upstream sender, or continue to the next stage.
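Steps 1 and 2 are easy enough to picture in code. The following is a minimal C# sketch, assuming .NET 6 or later for Enumerable.Chunk; the ValidateRecordBatch message and the FileBatcher helper are hypothetical names for illustration, not code from the earlier post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// HYPOTHETICAL message: one is sent per batch so that each batch can be
// validated (and retried) independently of the others
public record ValidateRecordBatch(Guid FileId, int BatchNumber, string[] Rows);

public static class FileBatcher
{
    // Splits the rows of an incoming file into fixed-size batches and wraps
    // each batch in its own message. The last batch may be smaller.
    public static List<ValidateRecordBatch> SplitIntoBatches(
        Guid fileId, IEnumerable<string> rows, int batchSize)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        return rows
            .Chunk(batchSize) // Enumerable.Chunk requires .NET 6 or later
            .Select((batch, index) => new ValidateRecordBatch(fileId, index, batch))
            .ToList();
    }
}
```

Each resulting message could then be handled separately, which is where the parallelization and more granular retries come from.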

As it turns out, event sourcing with a projected aggregate document for the state of the file import is another good way to implement this workflow, especially with the new “side effects” model recently introduced in Marten at the behest of a JasperFx client.

For this usage, let’s say that we have this aggregated state for a file being imported:

public class FileImportState
{

    // Identity for this file import, which is also the id of its event stream
    public Guid Id { get; set; }
    public string FileName { get; set; }
    public string PartnerTrackingNumber { get; set; }
    public DateTimeOffset Created { get; set; } = DateTimeOffset.UtcNow;
    public List<RecordBatchTracker> RecordBatches { get; set; } = new();

    public FileImportStage Stage { get; set; } = FileImportStage.Validating;
}
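The aggregate above refers to a few types that the post doesn’t show. Here is one hypothetical way they might look, along with a BatchValidated event and a helper doing roughly what the projection’s elided Apply method for that event could do. Every name and member below is an assumption chosen to fit how the projection code uses them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// HYPOTHETICAL: these definitions are not shown in the post; names and
// members are assumptions chosen to match how the projection uses them
public enum FileImportStage { Validating, Validated, Rejected }

public enum RecordStatus { Pending, Valid, Invalid }

public class RecordBatchTracker
{
    public Guid BatchId { get; set; }

    // Each batch waits in Pending until its validation result arrives
    public RecordStatus ValidationStatus { get; set; } = RecordStatus.Pending;

    // Validation failures reported for the rows in this batch
    public List<string> ValidationMessages { get; set; } = new();
}

// HYPOTHETICAL event appended when one batch finishes validation
public record BatchValidated(Guid BatchId, string[] Messages);

public static class RecordBatchTracking
{
    // Roughly what an Apply(BatchValidated) method might do to the
    // aggregate's RecordBatches list
    public static void Record(List<RecordBatchTracker> batches, BatchValidated e)
    {
        var batch = batches.Single(x => x.BatchId == e.BatchId);
        batch.ValidationMessages.AddRange(e.Messages);

        // No messages means the batch passed validation
        batch.ValidationStatus = e.Messages.Length == 0
            ? RecordStatus.Valid
            : RecordStatus.Invalid;
    }
}
```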

The FileImportState would be updated by appending events like BatchValidated, with Marten “projecting” those events into the rolled up state of the entire file. Marten’s async daemon runs projections in a background process and works through a certain range of events at a time (think events 10,000 to 11,000). As the daemon processes events into a projection for the FileImportState, it groups the events in that range into event “slices” by file id.
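To make the slicing idea concrete, here is a toy C# model of grouping one range of events into per-stream slices. EventEnvelope and EventSlicing are hypothetical stand-ins; Marten’s actual slicing happens inside the daemon and carries much more event metadata.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// HYPOTHETICAL envelope for illustration; not Marten's event type
public record EventEnvelope(long Sequence, Guid StreamId, object Data);

public static class EventSlicing
{
    // Groups one range of events (e.g. sequence 10,000 to 11,000) into
    // per-stream "slices". GroupBy preserves source order within each group,
    // so every slice stays in sequence order.
    public static Dictionary<Guid, List<EventEnvelope>> SliceByStream(
        IEnumerable<EventEnvelope> range)
    {
        return range
            .OrderBy(e => e.Sequence)
            .GroupBy(e => e.StreamId)
            .ToDictionary(g => g.Key, g => g.ToList());
    }
}
```

Each slice then holds everything needed to update one FileImportState document in a single pass over that range.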

To manage the workflow, we can now append new events as a “side effect” of processing an event slice in the daemon, while the aggregated data is updated in the background. Let’s say that we have a single stream projection for our FileImportState aggregation like the one below:

public class FileImportProjection : SingleStreamProjection<FileImportState>
{
    // Other Apply / Create methods to update the state of the 
    // FileImportState aggregate document

    public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<FileImportState> slice)
    {
        var state = slice.Aggregate;
        if (state.Stage == FileImportStage.Validating &&
            state.RecordBatches.All(x => x.ValidationStatus != RecordStatus.Pending))
        {
            // At this point, the file is completely validated, and we can decide what should happen next with the
            // file
            
            // Are there any validation message failures?
            var rejected = state.RecordBatches.SelectMany(x => x.ValidationMessages).ToArray();
            if (rejected.Any())
            {
                // Append a validation failed event to the stream
                slice.AppendEvent(new ValidationFailed());
                
                // Also, send an outgoing command message that summarizes
                // the validation failures
                var message = new FileRejectionSummary()
                {
                    FileName = state.FileName,
                    Messages = rejected,
                    TrackingNumber = state.PartnerTrackingNumber
                };
                
                // This will "publish" a message once the daemon
                // has successfully committed all changes for the 
                // current batch of events
                // Unsurprisingly, there's a Wolverine integration 
                // for this
                slice.PublishMessage(message);
            }
            else
            {
                slice.AppendEvent(new ValidationSucceeded());
            }
        }

        return new ValueTask();
    }
}

As the code above shows, you can also “publish” outgoing messages while processing events through asynchronous projections and, unsurprisingly, there is an integration with Wolverine to send them.
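The “publish once the daemon has committed” behavior described in the code comments can be pictured as a buffer that holds messages until the database changes succeed. This is a conceptual sketch with hypothetical names (SideEffectBuffer, CommitAsync), not Marten’s or Wolverine’s implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// CONCEPTUAL sketch, not Marten's implementation: messages raised while
// processing a batch of events are held back and only sent after the
// batch's database changes commit successfully
public class SideEffectBuffer
{
    private readonly List<object> _pending = new();

    // Called while processing slices; nothing leaves the process yet
    public void PublishMessage(object message) => _pending.Add(message);

    // commit: persists the projected documents and any appended events
    // send: hands one message to the messaging layer
    public async Task CommitAsync(Func<Task> commit, Func<object, Task> send)
    {
        try
        {
            await commit();
        }
        catch
        {
            // Reprocessing the range would raise these side effects again,
            // so drop them rather than risk sending duplicates later
            _pending.Clear();
            throw;
        }

        // Only reached once the commit succeeded
        foreach (var message in _pending)
        {
            await send(message);
        }

        _pending.Clear();
    }
}
```

Dropping buffered messages on a failed commit is a design choice of this sketch: if the daemon retries the range, the side effects are raised again anyway.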

This feature has long, long been planned and I was glad to get the chance to build it out this fall for a client. I’m happy to say that this is in production for them — after the obligatory shakedown cruise and some bug fixes.

Optimized Projection Rebuilds

Another JasperFx client retrofitted Marten into an in-flight, event sourced system with a very large data set, but hadn’t taken advantage of many Marten capabilities, including the ability to pre-build (or “snapshot”) projected data to make reads of system state efficient.

With a little bit of work in their system, we knew we could introduce the new projection snapshotting by using Marten’s blue/green deployment model for projections, in which Marten immediately starts pre-building a new projection (or a new version of an existing projection) from scratch. Great! Except we also knew that this was potentially going to be a major performance problem until the projection caught up to the current “high water mark” of the event store.

To ease the cost of introducing a new, persisted projection on top of ~100 million events, we built out Marten’s new optimized projection rebuild feature. To demonstrate what I mean, let’s first opt into using this feature (it had to be opt in because it forces users to make additive changes to existing database tables):

builder.Services.AddMarten(opts =>
{
    opts.Connection("some connection string");

    // Opts into a mode where Marten is able to rebuild single
    // stream projections faster by building one stream at a time
    // Does require new table migrations for Marten 7 users though
    opts.Events.UseOptimizedProjectionRebuilds = true; 
});
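For the optimized rebuild to come into play, the projection itself also has to run asynchronously under the async daemon. A minimal registration sketch, assuming Marten 7 and the FileImportProjection shown earlier; DaemonMode.HotCold is just one choice of daemon mode, and the namespaces for ProjectionLifecycle and DaemonMode depend on your Marten version, so check the Marten docs.

```csharp
builder.Services.AddMarten(opts =>
{
    // ...connection string and UseOptimizedProjectionRebuilds as above...

    // Run the projection in the background through the async daemon,
    // which is where the optimized rebuild kicks in
    opts.Projections.Add<FileImportProjection>(ProjectionLifecycle.Async);
})
// Start the async daemon; HotCold runs it on only one node at a time
.AddAsyncDaemon(DaemonMode.HotCold);
```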

Now, when our users redeploy their system with the new snapshotted projection running asynchronously for the first time, Marten will see that the projection has never been processed before and will try to use an “optimized rebuild mode.” Since we’ve turned on optimized projection rebuilds, Marten runs a single stream projection in “rebuild” mode by:

  1. First building a new table to track each event stream that relates to the aggregate type in question, ordered from the most recently changed stream to the least recently changed. The whole point is to make sure the optimized rebuild process deals with the most recently changed event streams first, so that the system can perform well even while the rebuild process is running
  2. Rebuilding the aggregates one event stream at a time, as a way of minimizing the number of database reads and writes it takes to rebuild the single stream projection. Compare that to the previous, naive “left fold” approach that just works from event sequence = 1 up to the high water mark and constantly writes and reads back the same projection document as it’s encountered throughout the event store
  3. Switching the projection to its normal, continuous mode from the point at which the rebuild started, once the optimized rebuild is complete
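The difference between the two strategies shows up even in a toy model. The sketch below is a hypothetical in-memory illustration, not Marten’s code: it approximates “most recently changed” by each stream’s highest event sequence, counts aggregate writes, and leaves out step 3 entirely.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// HYPOTHETICAL in-memory model for illustration only
public record StoredEvent(long Sequence, Guid StreamId, int Amount);

public static class RebuildSketch
{
    // Naive "left fold": walk the whole store in global sequence order, one
    // page at a time, reading and writing each touched aggregate per page
    public static (Dictionary<Guid, int> Aggregates, int Writes) NaiveRebuild(
        IReadOnlyList<StoredEvent> events, int pageSize)
    {
        var store = new Dictionary<Guid, int>();
        var writes = 0;
        foreach (var page in events.OrderBy(e => e.Sequence).Chunk(pageSize))
        {
            foreach (var slice in page.GroupBy(e => e.StreamId))
            {
                store.TryGetValue(slice.Key, out var current); // read back
                store[slice.Key] = current + slice.Sum(e => e.Amount);
                writes++; // one write per stream per page
            }
        }
        return (store, writes);
    }

    // Stream-at-a-time rebuild: order streams from most to least recently
    // changed, then fold each stream's events once and write once
    public static (Dictionary<Guid, int> Aggregates, int Writes, List<Guid> Order)
        StreamByStreamRebuild(IReadOnlyList<StoredEvent> events)
    {
        var order = events
            .GroupBy(e => e.StreamId)
            .OrderByDescending(g => g.Max(e => e.Sequence))
            .Select(g => g.Key)
            .ToList();

        var store = new Dictionary<Guid, int>();
        var writes = 0;
        foreach (var streamId in order)
        {
            // A real store would read only this stream's events
            store[streamId] = events.Where(e => e.StreamId == streamId).Sum(e => e.Amount);
            writes++;
        }
        return (store, writes, order);
    }
}
```

For example, five events alternating between two streams, paged two at a time, cost five aggregate writes under the naive fold but only two under the stream-by-stream rebuild, and the gap grows for long-lived streams whose events are spread across many pages.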

That’s a lot of words and maybe a complicated explanation, but the point is that Marten makes it possible to introduce new projections to a large, in-flight system without incurring system downtime or even showing inconsistent data to users.

Other Recent Improvements for Clients

Some other recent work that JasperFx has done for our clients includes:

Summary

I was part of a discussion slash argument a couple weeks ago about whether or not it was necessary to use an off the shelf event sourcing library or framework like Marten, or if you were just fine rolling your own. While I’ll gladly admit that you can easily build a pure storage subsystem for events, it’s not even remotely feasible to quickly roll your own tooling that matches advanced features in Marten such as the work I presented here.
