Scaling Marten with PostgreSQL Read Replicas

JasperFx Software is open for business and offering consulting services (like helping you craft scalability strategies!) and support contracts for both Marten and Wolverine so you know you can feel secure taking a big technical bet on these tools and reap all the advantages they give for productive and maintainable server side .NET development.

First off, big thanks to Jaedyn Tonee for actually doing all the work I’m writing about here. JT recently accepted a long standing “offer” to be part of the official “Critter Stack Core Team.”

Marten 7 embraced several new-ish features in Npgsql, including the NpgsqlDataSource concept for connection management. This opened Marten up for a couple other capabilities like integrating Marten with .NET Aspire. It also enabled Marten to utilize PostgreSQL Read Replicas for read only query usage. Read Replicas are valuable both for high availability of database systems and for offloading read intensive operations from the main database server.

To opt into read replicas with Marten, you need to utilize the new NpgsqlMultiHostDataSource type in Npgsql together with Marten, as shown in this sample code:

// services is an IServiceCollection
services.AddMultiHostNpgsqlDataSource(ConnectionSource.ConnectionString);

services.AddMarten(x =>
    {
        // Will prefer standby nodes for querying.
        x.Advanced.MultiHostSettings.ReadSessionPreference = TargetSessionAttributes.PreferStandby;
    })
    .UseLightweightSessions()
    .UseNpgsqlDataSource();

Behind the scenes, if you are opting into this model, when you make a query with Marten like this:

       // theSession is an IDocumentSession 
       var users = await theSession
            .Query<User>()
            .Where(x => x.FirstName == "Sam")
            .ToListAsync();

Marten will be trying to connect to a PostgreSQL read replica to service that LINQ query.
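
For a feel of what that means one level down, here is a minimal sketch that goes straight to Npgsql rather than through Marten. The host names, database name, and credentials are placeholders I made up, and this only illustrates the multi-host idea; it is not a claim about what Marten does internally.

```csharp
using Npgsql;

// Placeholder hosts and credentials: a multi-host connection string lists
// the primary and its replicas as a comma-separated Host value.
var connectionString =
    "Host=pg-primary,pg-replica-1,pg-replica-2;Database=app;Username=app;Password=secret";

await using var dataSource = new NpgsqlDataSourceBuilder(connectionString).BuildMultiHost();

// Writes have to go to the primary server
await using var writeConnection =
    await dataSource.OpenConnectionAsync(TargetSessionAttributes.Primary);

// Read-only work can prefer a standby, falling back to another server
// when no standby is available
await using var readConnection =
    await dataSource.OpenConnectionAsync(TargetSessionAttributes.PreferStandby);
```

Running this requires a reachable PostgreSQL primary with at least one replica behind those host names.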

Summary

I hope this is an important new tool for Marten users to achieve both high availability and scalability within systems with bigger data loads.

Recent Marten & Wolverine Improvements and Roadmap Update

I’d love any feedback on any of this, of course. Here’s something I wrote yesterday in a survey of sorts about the commercial product ideas described down below (partially in response to a recent query wanting to know how Marten stacks up against AxonIQ from the JVM world):

There’s definitely an opportunity for a full blown Event Driven Architecture stack in the .NET ecosystem – and frankly, Jeremy really wants the Critter Stack to grow into the very best Event Driven Architecture toolset on the planet to the point where shops will purposely adopt .NET just because of the Critter Stack

I’m honestly just thinking out loud in this post, but a lot has been released for both Marten and Wolverine since the big Marten 7.0 release and the last time I published a roadmap update for the two big toolsets.

Here’s some recent highlights you might have missed from the past two months:

What’s Next?

Getting Marten 7.0 and the accompanying Wolverine 2.0 release across the finish line enabled a flurry of follow up features over the past two months, largely driven by a couple JasperFx Software client projects (yeah!). Moving forward, I think these are the remaining strategic features that will hopefully go in soon:

  • Marten will get the ability to use PostgreSQL read replicas for read-only queries very soon as a way to scale applications
  • A new, alternative “Quick Append Event” workflow to Marten. The current internal mechanism in Marten for appending events is built for maximal support for “Inline” projections. This new model would simplify the runtime mechanics for appending events and hopefully make the Marten event store more robust in the face of concurrent requests than it is today. This model should also allow for faster performance if users opt into this mechanism.
  • Some ability to efficiently raise or append events (or side effects of some sort) from running projections. This has been in the backlog for a long time. I’d certainly feel better about this if we had some concrete use cases that folks want to do here. The “Quick Append Event” workflow would be a prerequisite
  • Using PostgreSQL partitioning on the Marten streams and events tables. This is the oldest item in the Marten backlog that’s been kicked down the road forever, but I think this is potentially huge for Marten scalability. This would probably be done in conjunction with some tactical improvements to the Marten projection model and the Wolverine aggregate handler workflow to make the archiving more accessible. The biggest issue has always been in how to handle the database migration model for this feature to convert brownfield applications
  • Wolverine 3.0
    • Try to eliminate the hard dependency on Lamar as the IoC container for Wolverine. Most people don’t care, but the folks who do care really don’t like that. So far from my research it looks like the answer is going to be supporting the built in .NET DI container or Lamar with the current Wolverine model — and we can maybe think about supporting other IoC containers with a step back in the runtime optimizations that Wolverine can do today with Lamar. I think it’s quickly coming to the point where all other IoC libraries besides the built in ServiceProvider container from Microsoft die off — even though there are still plenty of areas where that container is lacking compared to alternatives. Alas.
    • Try to apply the Wolverine error handling policies that today only work for Wolverine message handlers to HTTP endpoints

Critter Stack Pro

The Marten & Wolverine community is helping Babu, Jeffry Gonzalez & I brainstorm ideas for the future “Critter Stack Pro” suite of commercial add on tools. The goal is to (make money) make the “Critter Stack” be much more manageable in production environments, help troubleshoot production support issues, heal the system from runtime problems, and understand resource utilization. We don’t have the exact roadmap or exact technical approach locked down yet.

Right now that looks like:

  • A headless library to better distribute Marten projections and subscriptions across a running cluster of processes. This is “ready for testing” by a JasperFx customer
  • A management console application that will be offered both as an ASP.Net Core add on library for a single application or distributed as a standalone Docker image for managing multiple systems from one console
    • Analyze system configuration
    • Manage Wolverine’s “dead letter queue” for messages, including the ability to replay messages
    • Some integration with Open Telemetry and metrics data emitted from Marten and/or Wolverine applications, probably at a summary level with navigation to the “real” observability platform (Prometheus? Grafana? Something totally different?)
    • Management for Marten Asynchronous Projections and Subscriptions
      • Performance information
      • Triggering rebuilds or replays
      • Pausing/restarting projections or subscriptions
    • Tenant Management
      • Dynamically add or remove tenant databases
      • Pause tenants
      • Understand resource utilization and performance on a tenant by tenant basis
    • Marten Event Store Explorer — and we’re collecting several ideas for this
    • Wolverine Message Processing Explorer — ditto
    • Wolverine Scheduled Message Dashboard

My fervent hope is that this tooling will be demonstrable for friendly early adopters at the end of the 2nd quarter, and looking good in the 4th quarter to try to make a serious push for sales in the all important 1st quarter of next year.

And Beyond!

I’m very interested in porting just the event store functionality from Marten to a new library targeting SQL Server as the backing store. The goal here would be to give it the same Wolverine support as the existing Marten functionality. This would be pending some of the Marten projection model stabilizing up above.

Maybe adding CosmosDb and/or DynamoDb support to Wolverine.

And who knows? It’s likely something I’m not even aware of now will be the highest priority in the 3rd and 4th quarters!

Critter Stack Improvements for Event Driven Architecture

As a follow on post from First Class Event Subscriptions in Marten last week, let’s introduce Wolverine into the mix for end to end Event Driven Architecture approaches. Using Wolverine’s new Event Subscriptions model, “Critter Stack” systems can automatically process Marten event data with Wolverine message handlers:

If all we want to do is publish Marten event data through Wolverine’s message publishing (which remember, can be either to local queues or external message brokers), we have this simple recipe:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.Services
            .AddMarten()
            
            // Just pulling the connection information from 
            // the IoC container at runtime.
            .UseNpgsqlDataSource()
            
            // You don't absolutely have to have the Wolverine
            // integration active here for subscriptions, but it's
            // more than likely that you will want this anyway
            .IntegrateWithWolverine()
            
            // The Marten async daemon must be active
            .AddAsyncDaemon(DaemonMode.HotCold)
            
            // This would attempt to publish every non-archived event
            // from Marten to Wolverine subscribers
            .PublishEventsToWolverine("Everything")
            
            // You wouldn't do this *and* the above option, but just to show
            // the filtering
            .PublishEventsToWolverine("Orders", relay =>
            {
                // Filtering 
                relay.FilterIncomingEventsOnStreamType(typeof(Order));

                // Optionally, tell Marten to only subscribe to new
                // events whenever this subscription is first activated
                relay.Options.SubscribeFromPresent();
            });
    }).StartAsync();

First off, what’s a “subscriber?” That would mean any event that Wolverine recognizes as having:

  • A local message handler in the application for the specific event type, which would effectively direct Wolverine to publish the event data to a local queue
  • A local message handler in the application for the specific IEvent<T> type, which would effectively direct Wolverine to publish the event with its IEvent Marten metadata wrapper to a local queue
  • Any event type where Wolverine can discover subscribers through routing rules

All the Wolverine subscription is doing is effectively calling IMessageBus.PublishAsync() against the event data or the IEvent<T> wrapper. You can make the subscription run more efficiently by applying event or stream type filters for the subscription.
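
As a rough sketch of the first two cases in the list above, handlers that would make an event type "subscribed" might look like this. OrderCreated comes from the samples in this post; the handler class names and the console output are made up for illustration, and I'd expect you to pick one message type or the other in a real system rather than both.

```csharp
using System;
using Marten.Events;

public record OrderCreated(string OrderNumber, Guid CustomerId);

// Case 1: a handler for the raw event data
public static class OrderCreatedHandler
{
    public static void Handle(OrderCreated e)
        => Console.WriteLine($"Order {e.OrderNumber} was created");
}

// Case 2: a handler for the event wrapped in Marten's IEvent<T> metadata
public static class OrderCreatedWithMetadataHandler
{
    public static void Handle(IEvent<OrderCreated> e)
        => Console.WriteLine($"Order {e.Data.OrderNumber} was captured at sequence {e.Sequence}");
}
```

Both classes follow Wolverine's naming conventions for handler discovery, so no extra registration is shown.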

If you need to transform the raw IEvent<T> or the internal event type into some kind of external event type before publishing to external systems, so that other subscribers are not directly coupled to your system’s internals, you can build a message handler that does the transformation and publishes a cascading message like so:

public record OrderCreated(string OrderNumber, Guid CustomerId);

// I wouldn't use this kind of suffix in real life, but it helps
// document *what* this is for the sample in the docs:)
public record OrderCreatedIntegrationEvent(string OrderNumber, string CustomerName, DateTimeOffset Timestamp);

// We're going to use the Marten IEvent metadata and some other Marten reference
// data to transform the internal OrderCreated event
// to an OrderCreatedIntegrationEvent that will be more appropriate for publishing to
// external systems
public static class InternalOrderCreatedHandler
{
    public static Task<Customer?> LoadAsync(IEvent<OrderCreated> e, IQuerySession session,
        CancellationToken cancellationToken)
        => session.LoadAsync<Customer>(e.Data.CustomerId, cancellationToken);
    
    
    public static OrderCreatedIntegrationEvent Handle(IEvent<OrderCreated> e, Customer customer)
    {
        return new OrderCreatedIntegrationEvent(e.Data.OrderNumber, customer.Name, e.Timestamp);
    }
}

Process Events as Messages in Strict Order

In some cases you may want the events to be executed by Wolverine message handlers in strict order. You can do that with the recipe below:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.Services
            .AddMarten(o =>
            {
                // This is the default setting, but just showing
                // you that Wolverine subscriptions will be able
                // to skip over messages that fail without
                // shutting down the subscription
                o.Projections.Errors.SkipApplyErrors = true;
            })

            // Just pulling the connection information from 
            // the IoC container at runtime.
            .UseNpgsqlDataSource()

            // You don't absolutely have to have the Wolverine
            // integration active here for subscriptions, but it's
            // more than likely that you will want this anyway
            .IntegrateWithWolverine()
            
            // The Marten async daemon must be active
            .AddAsyncDaemon(DaemonMode.HotCold)
            
            // Notice the allow list filtering of event types and the possibility of overriding
            // the starting point for this subscription at runtime
            .ProcessEventsWithWolverineHandlersInStrictOrder("Orders", o =>
            {
                // It's more important to create an allow list of event types that can be processed
                o.IncludeType<OrderCreated>();

                // Optionally mark the subscription as only starting from events from a certain time
                o.Options.SubscribeFromTime(new DateTimeOffset(new DateTime(2023, 12, 1)));
            });
    }).StartAsync();

In this recipe, Marten & Wolverine are working together to call IMessageBus.InvokeAsync() on each event in order. You can use either the actual event type (OrderCreated) or the wrapped Marten event type (IEvent<OrderCreated>) as the message type for your message handler.

In the case of exceptions from processing the event with Wolverine:

  1. Any built in “retry” error handling will kick in to retry the event processing inline
  2. If the retries are exhausted, and the Marten setting for StoreOptions.Projections.Errors.SkipApplyErrors is true, Wolverine will persist the event to its PostgreSQL backed dead letter queue and proceed to the next event. This setting is the default with Marten when the daemon is running continuously in the background, but false in rebuilds or replays
  3. If the retries are exhausted, and SkipApplyErrors = false, Wolverine will tell Marten to pause the subscription at the last event sequence that succeeded
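
The three steps above boil down to a small decision flow. Here's a toy model of it in plain C#. Every name in it (Outcome, StrictOrderErrorFlow, maxAttempts) is hypothetical and only there for illustration; this is not Wolverine's actual implementation, just the logic as I've described it.

```csharp
using System;

public enum Outcome { Succeeded, DeadLettered, SubscriptionPaused }

public static class StrictOrderErrorFlow
{
    // maxAttempts stands in for whatever retry policy is configured
    public static Outcome Process(Action handler, int maxAttempts, bool skipApplyErrors)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                handler();
                return Outcome.Succeeded;
            }
            catch (Exception)
            {
                // Step 1: swallow the failure and retry inline
                // until the attempts are exhausted
            }
        }

        // Step 2: skip the event and move on, or
        // Step 3: pause the subscription at the last good event
        return skipApplyErrors ? Outcome.DeadLettered : Outcome.SubscriptionPaused;
    }
}
```

With skipApplyErrors set to true, a handler that always throws ends up as DeadLettered; with false, the same handler yields SubscriptionPaused.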

Custom, Batched Subscriptions

The base type for all Wolverine subscriptions is the Wolverine.Marten.Subscriptions.BatchSubscription class. If you need to do something completely custom, or just to take action on a batch of events at one time, subclass that type. Here is an example usage where I’m using event carried state transfer to publish batches of reference data about customers being activated or deactivated within our system:

public record CompanyActivated(string Name);

public record CompanyDeactivated();

public record NewCompany(Guid Id, string Name);

// Message type we're going to publish to external
// systems to keep them up to date on new companies
public class CompanyActivations
{
    public List<NewCompany> Additions { get; set; } = new();
    public List<Guid> Removals { get; set; } = new();

    public void Add(Guid companyId, string name)
    {
        Removals.Remove(companyId);
        
        // Fill is an extension method in JasperFx.Core that adds the 
        // record to a list if the value does not already exist
        Additions.Fill(new NewCompany(companyId, name));
    }

    public void Remove(Guid companyId)
    {
        Removals.Fill(companyId);

        Additions.RemoveAll(x => x.Id == companyId);
    }
}

public class CompanyTransferSubscription : BatchSubscription
{
    public CompanyTransferSubscription() : base("CompanyTransfer")
    {
        IncludeType<CompanyActivated>();
        IncludeType<CompanyDeactivated>();
    }

    public override async Task ProcessEventsAsync(EventRange page, ISubscriptionController controller, IDocumentOperations operations,
        IMessageBus bus, CancellationToken cancellationToken)
    {
        var activations = new CompanyActivations();
        foreach (var e in page.Events)
        {
            switch (e)
            {
                // In all cases, I'm assuming that the Marten stream id is the identifier for a customer
                case IEvent<CompanyActivated> activated:
                    activations.Add(activated.StreamId, activated.Data.Name);
                    break;
                case IEvent<CompanyDeactivated> deactivated:
                    activations.Remove(deactivated.StreamId);
                    break;
            }
        }
        
        // At the end of all of this, publish a single message
        // In case you're wondering, this will opt into Wolverine's
        // transactional outbox with the same transaction as any changes
        // made by Marten's IDocumentOperations passed in, including Marten's
        // own work to track the progression of this subscription
        await bus.PublishAsync(activations);
    }
}
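
To see why CompanyActivations nets out changes within a single page, here's a standalone sketch that runs without Marten, Wolverine, or JasperFx.Core. It repeats a trimmed copy of the class, and AddIfMissing is a hypothetical stand-in I wrote for the Fill extension method, so take it as an illustration only.

```csharp
using System;
using System.Collections.Generic;

var activations = new CompanyActivations();
var acme = Guid.NewGuid();
var globex = Guid.NewGuid();

activations.Add(acme, "Acme");      // CompanyActivated
activations.Remove(acme);           // CompanyDeactivated later in the same page
activations.Add(globex, "Globex");  // CompanyActivated for another stream

// Acme's activation was cancelled out, so only Globex is left as an
// addition and Acme is reported as a removal
Console.WriteLine($"Additions: {activations.Additions.Count}, Removals: {activations.Removals.Count}");

public record NewCompany(Guid Id, string Name);

public class CompanyActivations
{
    public List<NewCompany> Additions { get; set; } = new();
    public List<Guid> Removals { get; set; } = new();

    public void Add(Guid companyId, string name)
    {
        Removals.Remove(companyId);
        AddIfMissing(Additions, new NewCompany(companyId, name));
    }

    public void Remove(Guid companyId)
    {
        AddIfMissing(Removals, companyId);
        Additions.RemoveAll(x => x.Id == companyId);
    }

    // Hypothetical stand-in for Fill: add only when not already present
    private static void AddIfMissing<T>(List<T> list, T item)
    {
        if (!list.Contains(item)) list.Add(item);
    }
}
```

The point of the design is that downstream systems get one message per page with the net effect, not one message per event.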

And the related code to register this subscription:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.UseRabbitMq(); 
        
        // There needs to be *some* kind of subscriber for CompanyActivations
        // for this to work at all
        opts.PublishMessage<CompanyActivations>()
            .ToRabbitExchange("activations");
        
        opts.Services
            .AddMarten()

            // Just pulling the connection information from 
            // the IoC container at runtime.
            .UseNpgsqlDataSource()
            
            .IntegrateWithWolverine()
            
            // The Marten async daemon must be active
            .AddAsyncDaemon(DaemonMode.HotCold)

                                
            // Register the new subscription
            .SubscribeToEvents(new CompanyTransferSubscription());
    }).StartAsync();

Summary

The feature set shown here has been a very long planned set of capabilities to truly extend the “Critter Stack” into the realm of supporting Event Driven Architecture approaches from soup to nuts. Using the Wolverine subscriptions automatically gets you support to publish Marten events to any transport supported by Wolverine itself, and does so in a much more robust way than you can easily roll by hand like folks did previously with Marten’s IProjection interface. I’m currently helping a JasperFx Software client utilize this functionality for data exchange that has strict ordering and at least once delivery guarantees.

Marten, PostgreSQL, and .NET Aspire walk into a bar…

This is somewhat a follow up from yesterday’s post on Marten, Metrics, and Open Telemetry Support. I was very hopeful about the defunct Project Tye, and have been curious about .NET Aspire as a more ambitious offering since it was announced. As part of the massive Marten V7 release, we took some steps to ensure that Marten could use PostgreSQL databases controlled by .NET Aspire.

I finally got a chance to put together a sample Marten system using .NET Aspire, called MartenWithProjectAspire, on GitHub. Simplified from some longstanding Marten test projects, consider this little system:

At runtime, the EventPublisher service continuously appends events that represent progress in a Trip event stream to the Marten-ized PostgreSQL database. The TripBuildingService is running Marten’s async daemon subsystem that constantly reads new events from the PostgreSQL database and builds or updates projected documents back to the database to represent the current state of the event store.

The end result was a single project named AspireHost that when executed, will use .NET Aspire to start a new PostgreSQL docker container and start up the EventPublisher and TripBuildingService services while passing the connection string to the new PostgreSQL database to these services at runtime with a little bit of Environment variable sleight of hand.

You can see the various projects and containers from the Aspire dashboard:

and even see some of the Open Telemetry activity traced by Marten and visualized through Aspire:

Honestly, it took me a bit of trial and error to get this all working together. First, we need to configure Marten to use an NpgsqlDataSource connection to the PostgreSQL database that will be loaded from each service’s IoC container — then tell Marten to use that NpgsqlDataSource.

After adding Nuget references for Aspire.Npgsql and Marten itself, I added the second line of code shown below to the top of the Program file for both services using Marten:

var builder = Host.CreateApplicationBuilder();

// Register the NpgsqlDataSource in the IoC container using
// connection string named "marten" from IConfiguration
builder.AddNpgsqlDataSource("marten");

That’s really just a hook to add a registration for the NpgsqlDataSource type to the application’s IoC container with the expectation that the connection string will be in the application’s configuration connection string collection with the key “marten.”
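
To make the "connection string collection" part concrete, here's a small sketch using nothing but Microsoft.Extensions.Configuration. The connection string value is a placeholder I made up. My understanding is that Aspire hands the real value to each service as an environment variable that maps onto this same key, but treat that as an assumption and check it against the Aspire docs.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Simulate what the service sees in IConfiguration at runtime.
// "ConnectionStrings:marten" is the key that GetConnectionString("marten") reads.
var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string?>
    {
        // Placeholder value for illustration only
        ["ConnectionStrings:marten"] = "Host=localhost;Database=marten;Username=postgres;Password=postgres"
    })
    .Build();

Console.WriteLine(configuration.GetConnectionString("marten"));
```

Since environment variables use a double underscore in place of the colon, ConnectionStrings__marten would land on the same key.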

One of the major efforts with Marten 7 was rewiring Marten’s internals (then Wolverine’s) to strictly use the new NpgsqlDataSource concept for database connectivity. If you maybe caught me being less than polite about Npgsql on what’s left of Twitter, just know that the process was very painful but it’s completely done now and working well outside of the absurd noisiness of built in Npgsql logging.

Next, I have to explicitly tell Marten itself to load the NpgsqlDataSource object from the application’s IoC container instead of the older, idiomatic approach of passing a connection string directly to Marten as shown below:

builder.Services.AddMarten(opts =>
    {
        // Other configuration, but *not* the connection
    })
    
    // Use PostgreSQL data source from the IoC container
    .UseNpgsqlDataSource();

Now, switching to the AspireHost, I needed to add a Nuget reference to Aspire.Hosting.PostgreSQL in order to be able to bootstrap the PostgreSQL database at runtime. I also made project references from AspireHost to EventPublisher and TripBuildingService, which is important because Aspire does some source generation to build a strongly typed enumeration representing your projects that we’ll use next. That last step confused me when I was first playing with Aspire, so hopefully now you get to bypass that confusion. Maybe.

In the Program file for AspireHost, it’s just this:

var builder = DistributedApplication.CreateBuilder(args);

var postgresdb = builder.AddPostgres("marten");

builder.AddProject<Projects.EventPublisher>("publisher")
    .WithReference(postgresdb);

builder.AddProject<Projects.TripBuildingService>("trip-building")
    .WithReference(postgresdb);

builder.Build().Run();

Now, run the AspireHost project and you are able to run the two other services with the newly activated PostgreSQL Docker container, which you can see from the Docker Desktop dashboard:

Ship it!

Summary

Is .NET Aspire actually useful (I think so, even if it’s just for local development and testing maybe)? Can I explain the new Open Telemetry data exported from Marten? Would I use this instead of a dirt simple Docker Compose file like I do today (probably not to be honest)? Is this all fake?

All these questions and more will be somewhat addressed tomorrow-ish when I try to launch a new YouTube channel for JasperFx Software using the sample from this blog post as the subject for my first ever solo YouTube video.

One more thing…

I did alter the launchSettings.json file of the Aspire host project so it didn’t need to care about HTTPS to this:

{
  "$schema": "https://json.schemastore.org/launchsettings.json",
  "profiles": {
    "http": {
      "commandName": "Project",
      "dotnetRunMessages": true,
      "launchBrowser": true,
      "applicationUrl": "http://localhost:15242",
      "environmentVariables": {
        "ASPNETCORE_ENVIRONMENT": "Development",
        "DOTNET_ENVIRONMENT": "Development",
        "DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:19076",
        "DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "http://localhost:20101",
        "ASPIRE_ALLOW_UNSECURED_TRANSPORT": "true"
      }
    }
  }
}

Note the usage of the ASPIRE_ALLOW_UNSECURED_TRANSPORT environment variable.

Marten, Metrics, and Open Telemetry Support

Marten 7.10 was released today, and (finally) brings some built in support for monitoring Marten performance by exporting Open Telemetry and Metrics about Marten activity and performance within your system.

To use a little example, there’s a sample application in the Marten codebase called EventPublisher that we use to manually test some of the command line tooling. All that EventPublisher does is to continuously publish randomized events to a Marten event store when it runs. That made it a good place to start with a test harness for our new Open Telemetry support and performance related metrics.

For testing, I was just using the Project Aspire dashboard for viewing metrics and Open Telemetry tracing. First off, I enabled the “opt in” connection tracing for Marten, and put it into a verbose mode that’s probably only suitable for debugging or performance optimization work:

        // builder is a HostApplicationBuilder object
        builder.Services.AddMarten(opts =>
        {
            // Other Marten configuration...
            
            // Turn on Otel tracing for connection activity, and
            // also tag events to each span for all the Marten "write"
            // operations
            opts.OpenTelemetry.TrackConnections = TrackLevel.Verbose;

            // This opts into exporting a counter just on the number
            // of events being appended. Kinda a duplication
            opts.OpenTelemetry.TrackEventCounters();
        });

That’s just the Marten side of things, so to hook up an Open Telemetry exporter for the Aspire dashboard, I added (really copy/pasted) this code (note that you’ll need the OpenTelemetry.Extensions.Hosting and OpenTelemetry.Exporter.OpenTelemetryProtocol Nugets added to your project):

        builder.Logging.AddOpenTelemetry(logging =>
        {
            logging.IncludeFormattedMessage = true;
            logging.IncludeScopes = true;
        });

        builder.Services.AddOpenTelemetry()
            .WithMetrics(metrics =>
            {
                metrics
                    .AddRuntimeInstrumentation().AddMeter("EventPublisher");
            })
            .WithTracing(tracing =>
            {
                tracing.AddAspNetCoreInstrumentation()
                    .AddHttpClientInstrumentation();
            });

        var endpointUri = builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"];
        builder.Services.AddOpenTelemetry().UseOtlpExporter();

        builder.Services.AddOpenTelemetry()
            // Enable exports of Open Telemetry activities
            .WithTracing(tracing =>
            {
                tracing.AddSource("Marten");
            })
            
            // Enable exports of metrics
            .WithMetrics(metrics =>
            {
                metrics.AddMeter("Marten");
            });

And now after running that with Aspire, you can see the output:

By itself, these spans, especially when shown in context of being nested within an HTTP request or a message being handled in a service bus framework, can point out where you may have performance issues from chattiness between the application server and the database — which I have found to be a very common source of performance problems out in the real world.

This is an opt in mode, but there are metrics and Open Telemetry tracing that are automatic for the background, async daemon subsystem. Skipping ahead a little bit, here’s a preview of some performance metrics in a related application that shows the “health” of a projection running in Marten’s async daemon subsystem by visualizing the “gap” between the projection’s current progression and the “high water mark” of Marten’s event store (how far the projection is sequentially compared to how far the whole event store itself is):
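
The "gap" itself is just arithmetic. As a minimal sketch, assuming you already have both sequence numbers in hand (ProjectionHealth and its parameter names are hypothetical, not Marten API):

```csharp
using System;

public static class ProjectionHealth
{
    // How many events the projection still has to process to catch up
    // with the event store's high water mark
    public static long Gap(long highWaterMark, long projectionSequence)
        => Math.Max(0, highWaterMark - projectionSequence);
}
```

For example, ProjectionHealth.Gap(10_500, 10_200) is 300, and a gap that keeps growing over time suggests the projection is falling behind.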

Summary

This is a short blog post, but I hope even this is enough to demonstrate how useful this new tracing is going to be in this new world order of Open Telemetry tracing tools.

First Class Event Subscriptions in Marten

This feature has been planned for Marten for years, but finally happened this month because a JasperFx Software client needed it as part of a complicated, multi-tenanted, and order sensitive data integration.

Marten recently (these samples are pulled from Marten 7.9) got a first class “event subscription” feature that allows users to take action upon events being appended to Marten’s event store in strict sequential order in a background process. While you’ve long been able to integrate Marten with other systems by using Marten’s older projection model, the newer subscription model is leaner and more efficient for background processing.

Before I get to “what” it is, let’s say that you need to carry out some kind of background processing on these events as they are captured. For example, maybe you need to:

  • Publish events to an external system as some kind of integration
  • Carry out background processing based on a captured event
  • Build a view representation of the events in something outside of the current PostgreSQL database, like maybe an Elastic Search view for better searching

With this recently added feature, you can utilize Marten’s ISubscription model that runs within Marten’s async daemon subsystem to “push” events into your subscriptions as events flow into your system. Note that this is a background process within your application, and it happens in a completely different thread than the initial work of appending and saving events to the Marten event storage.

Subscriptions will always be an implementation of the ISubscription interface shown below:

/// <summary>
/// Basic abstraction for custom subscriptions to Marten events through the async daemon. Use this in
/// order to do custom processing against an ordered stream of the events
/// </summary>
public interface ISubscription : IAsyncDisposable
{
    /// <summary>
    /// Processes a page of events at a time
    /// </summary>
    /// <param name="page"></param>
    /// <param name="controller">Use to log dead letter events that are skipped or to stop the subscription from processing based on an exception</param>
    /// <param name="operations">Access to Marten queries and writes that will be committed with the progress update for this subscription</param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller,
        IDocumentOperations operations,
        CancellationToken cancellationToken);
}

So far, the subscription model gives you these abilities:

  • Access to the Marten IDocumentOperations service that is scoped to the processing of a single page and can be used to either query additional data or to make database writes within the context of the same transaction that Marten will use to record the current progress of the subscription to the database
  • Error handling abilities via the ISubscriptionController interface argument that can be used to record events that were skipped by the subscription or to completely stop all further processing
  • By returning an IChangeListener, the subscription can be notified right before and right after Marten commits the database transaction for any changes including recording the current progress of the subscription for the current page. This was done purposely to enable transactional outbox approaches like the one in Wolverine. See the async daemon diagnostics for more information.
  • The ability to filter the event types or stream types that the subscription is interested in as a way to greatly optimize the runtime performance by preventing Marten from having to fetch events that the subscription will not process
  • The ability to create the actual subscription objects from the application’s IoC container when that is necessary
  • Flexible control over where or when the subscription starts when it is first applied to an existing event store
  • Some facility to “rewind and replay” subscriptions
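As a rough mental model of the page-at-a-time processing and progress tracking described above, here is a minimal in-memory sketch. It is not how Marten's async daemon is implemented: StoredEvent and PagedSubscriptionRunner are hypothetical names invented for illustration, and the "progress" here is just a property rather than a row committed in a database transaction.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a stored event; not a Marten type
public record StoredEvent(long Sequence, string Description);

// Simplified, in-memory model of pushing ordered pages of events to a handler
public class PagedSubscriptionRunner
{
    private readonly IReadOnlyList<StoredEvent> _eventLog;
    private readonly int _batchSize;

    public PagedSubscriptionRunner(IReadOnlyList<StoredEvent> eventLog, int batchSize)
    {
        _eventLog = eventLog;
        _batchSize = batchSize;
    }

    // Sequence of the last event in the last fully processed page
    public long Progress { get; private set; }

    // Pushes pages to the handler in sequence order until the log is exhausted.
    // An exception from the handler propagates to the caller and leaves
    // Progress at the end of the previous page.
    public void RunToEnd(Action<IReadOnlyList<StoredEvent>> handlePage)
    {
        while (true)
        {
            var page = _eventLog
                .Where(e => e.Sequence > Progress)
                .OrderBy(e => e.Sequence)
                .Take(_batchSize)
                .ToList();

            if (page.Count == 0) return;

            handlePage(page);

            // Only reached when the handler completed without throwing
            Progress = page[page.Count - 1].Sequence;
        }
    }
}
```

In this model, a page whose handler throws is pushed again on the next run because the progress marker never moved past it. That is why a handler in a design like this should tolerate seeing the same events more than once.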

To make this concrete, here’s the simplest possible subscription you can make to simply write out a console message for every event:

public class ConsoleSubscription: ISubscription
{
    public Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller, IDocumentOperations operations,
        CancellationToken cancellationToken)
    {
        Console.WriteLine($"Starting to process events from {page.SequenceFloor} to {page.SequenceCeiling}");
        foreach (var e in page.Events)
        {
            Console.WriteLine($"Got event of type {e.Data.GetType().NameInCode()} from stream {e.StreamId}");
        }

        // If you don't care about being signaled before or after
        // the commit, the no-op listener is fine
        return Task.FromResult(NullChangeListener.Instance);
    }

    public ValueTask DisposeAsync()
    {
        return new ValueTask();
    }
}

And to register that with our Marten store:

var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("marten"));

        // Because this subscription has no service dependencies, we
        // can use this simple mechanism
        opts.Events.Subscribe(new ConsoleSubscription());

        // Or with additional configuration like:
        opts.Events.Subscribe(new ConsoleSubscription(), s =>
        {
            s.SubscriptionName = "Console"; // Override Marten's naming
            s.SubscriptionVersion = 2; // Potentially version as an all new subscription

            // Optionally create an allow list of
            // event types to subscribe to
            s.IncludeType<InvoiceApproved>();
            s.IncludeType<InvoiceCreated>();

            // Only subscribe to new events, and don't try
            // to apply this subscription to existing events
            s.Options.SubscribeFromPresent();
        });
    })
    .AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

Here’s a slightly more complicated sample that publishes events to a configured Kafka topic:

public class KafkaSubscription: SubscriptionBase
{
    private readonly KafkaProducerConfig _config;

    public KafkaSubscription(KafkaProducerConfig config)
    {
        _config = config;

        SubscriptionName = "Kafka";

        // Access to any or all filtering rules
        IncludeType<InvoiceApproved>();

        // Fine grained control over how the subscription runs
        // in the async daemon
        Options.BatchSize = 1000;
        Options.MaximumHopperSize = 10000;

        // Effectively run as a hot observable
        Options.SubscribeFromPresent();
    }

    // The daemon will "push" a page of events at a time to this subscription
    public override async Task<IChangeListener> ProcessEventsAsync(
        EventRange page,
        ISubscriptionController controller,
        IDocumentOperations operations,
        CancellationToken cancellationToken)
    {
        using var kafkaProducer =
            new ProducerBuilder<string, string>(_config.ProducerConfig).Build();

        foreach (var @event in page.Events)
        {
            await kafkaProducer.ProduceAsync(_config.Topic,
                new Message<string, string>
                {
                    // store event type name in message Key
                    Key = @event.Data.GetType().Name,
                    // serialize event to message Value
                    Value = JsonConvert.SerializeObject(@event.Data)
                }, cancellationToken);

        }

        // We don't need any kind of callback, so the nullo is fine
        return NullChangeListener.Instance;
    }

}

// Just assume this is registered in your IoC container
public class KafkaProducerConfig
{
    public ProducerConfig? ProducerConfig { get; set; }
    public string? Topic { get; set; }
}

This time, it’s requiring IoC services injected through its constructor, so we’re going to use this mechanism to add it to Marten:

var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("marten"));
    })
    // Marten also supports a Scoped lifecycle, and quietly forwards Transient
    // to Scoped
    .AddSubscriptionWithServices<KafkaSubscription>(ServiceLifetime.Singleton, o =>
    {
        // This is a default, but just showing what's possible
        o.IncludeArchivedEvents = false;

        o.FilterIncomingEventsOnStreamType(typeof(Invoice));

        // Process no more than 10 events at a time
        o.Options.BatchSize = 10;
    })
    .AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

But there’s more!

The subscriptions run with Marten’s async daemon process, which just got a world of improvements in the Marten V7 release, including the ability to distribute work across running nodes in your application at runtime.

I didn’t show it in this blog post, but there are also facilities to configure whether a new subscription will start by working through all the events from the beginning of the system, or whether the subscription should start from the current sequence of the event store, or even go back to an explicitly stated sequence or timestamp, then play forward. Marten also has support — similar to its projection rebuild functionality — to rewind and replay subscriptions.
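A minimal sketch of what that starting position configuration might look like, reusing the ConsoleSubscription class and the opts variable from the registration sample earlier. SubscribeFromPresent() appears in the samples above. The SubscribeFromSequence() and SubscribeFromTime() calls are written from memory of Marten's async options, so treat their exact names and signatures as assumptions to check against the Marten version you are using.

```csharp
// Fragment: belongs inside the AddMarten(opts => { ... }) lambda shown earlier
opts.Events.Subscribe(new ConsoleSubscription(), s =>
{
    // Pick at most one starting position.
    // Start at the current end of the event store and only see new events
    s.Options.SubscribeFromPresent();

    // Assumed alternative: start from an explicit event sequence number
    // s.Options.SubscribeFromSequence(1000);

    // Assumed alternative: start from the first event at or after a timestamp
    // s.Options.SubscribeFromTime(new DateTimeOffset(2024, 4, 1, 0, 0, 0, TimeSpan.Zero));
});
```

Leaving all three out should give you the first behavior described above, where the subscription works through the existing events from the beginning.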

Wolverine already has specific integrations to utilize Marten event subscriptions to process events with Wolverine message handlers, or to forward events as messages through Wolverine publishing (Kafka? Rabbit MQ? Azure Service Bus?), or to do something completely custom with batches of events at a time (which I’ll demonstrate in the next couple weeks). I’ll post about that soon after that functionality gets fully documented with decent examples.

Lastly, and this is strictly in the hopefully near term future, there will be specific support for Marten subscriptions in the planned “Critter Stack Pro” add on product to Marten & Wolverine to:

  • Distribute subscription work across running nodes within your system — which actually exists in a crude, but effective form, and will absolutely be in Critter Stack Pro V1!
  • User interface monitoring and control panel to manually turn subscriptions on and off, review performance, and manually “rewind” subscriptions

Hopefully much more on this soon. It’s taken much longer than I’d hoped, but it’s still coming.

Strict Ordered Message Handling with Wolverine

The feature was built for a current JasperFx Software client, and came with a wave of developments across both Marten and Wolverine to support a fairly complex, mission critical set of application integrations. The PostgreSQL transport new to Wolverine was part of this wave. Some time next week I’ll be blogging about the Marten event subscription capabilities that were built into Marten & Wolverine to support this client as well. The point being, JasperFx is wide open for business and we can help your shop succeed with challenging project work!

Wolverine now has the ability to support strict messaging order with its message listeners. Given any random listening endpoint in Wolverine, just add this directive below to make the message processing be strictly sequential (with the proviso that your error handling policies may impact the order on failures):

var host = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
    opts.UseRabbitMq().EnableWolverineControlQueues();
    
    opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "listeners");

    opts.ListenToRabbitQueue("ordered")
        
        // This option is available on all types of Wolverine
        // endpoints that can be configured to be a listener
        .ListenWithStrictOrdering();
}).StartAsync();

Some notes about the ListenWithStrictOrdering() directive:

  1. It’s supported with every external messaging broker that Wolverine supports, including Kafka, Azure Service Bus, AWS SQS, and Rabbit MQ. It is also supported with the two database backed transports (we have both kinds, Sql Server and PostgreSQL!)
  2. When this directive is applied, Wolverine will only make the listener for each endpoint (in the case above, the Rabbit MQ queue named “ordered”) be active on a single node within your application. Today that distribution is just crudely spreading out the “exclusive listeners” evenly across the whole application cluster. Definitely note that the strict ordering comes at the cost of reduced throughput, so use this feature wisely! Did I mention that JasperFx Software is here and ready to work with your company on Critter Stack projects?
  3. Every exclusive listener will quickly start up on a single node if WolverineOptions.Durability.Mode = DurabilityMode.Solo, and you may want to do that for local testing and development just to be a little quicker on cold starts
  4. The ListenWithStrictOrdering will make the internal worker queue (Wolverine uses an internal TPL Dataflow ActionBlock in these cases) for “buffered” or “durable” endpoints be strictly sequential
  5. You will have to have a durable message store configured for your application in order for Wolverine to perform the leadership election and “agent tracking” (what’s running where)
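To illustrate what a “strictly sequential” worker queue means in note 4, here is a small standalone sketch using a TPL Dataflow ActionBlock with a maximum degree of parallelism of 1. This is not Wolverine's internal code. SequentialWorkerSketch is a hypothetical name, and the artificial delays exist only to show that a slow message holds back the ones behind it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SequentialWorkerSketch
{
    // Handles every message one at a time, in the order it was posted,
    // and returns the order in which the messages were actually handled
    public static async Task<List<int>> ProcessInOrderAsync(IEnumerable<int> messages)
    {
        var handled = new List<int>();

        var block = new ActionBlock<int>(async message =>
        {
            // Simulate uneven handler latency: even numbers are slower
            await Task.Delay(message % 2 == 0 ? 20 : 1);
            handled.Add(message);
        }, new ExecutionDataflowBlockOptions
        {
            // 1 is also the default, but being explicit documents the intent
            MaxDegreeOfParallelism = 1
        });

        foreach (var message in messages)
        {
            block.Post(message);
        }

        // Signal that no more messages are coming, then wait for the queue to drain
        block.Complete();
        await block.Completion;

        return handled;
    }
}
```

Raising MaxDegreeOfParallelism would let the fast messages overtake the slow ones. That is exactly the throughput-for-ordering trade described in note 2.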

Summary

This is a powerful tool in the continually growing Wolverine tool belt. The strict ordering may also be used to alleviate some concurrency issues that some users have hit with event sourcing using Marten when a single stream may be receiving bursts of commands that impact the event stream. The leadership election and agent distribution in Wolverine, in conjunction with this “sticky” listener assignment, gives Wolverine a nascent ability for virtual actors that we will continue to exploit. More soon-ish!

Modular Monoliths and the “Critter Stack”

JasperFx Software is open for business and offering consulting services (like helping you craft modular monolith strategies!) and support contracts for both Marten and Wolverine so you know you can feel secure taking a big technical bet on these tools and reap all the advantages they give for productive and maintainable server side .NET development.

I’ve been thinking, discussing, and writing a bit lately about the whole “modular monolith” idea, starting with Thoughts on “Modular Monoliths” and continuing onto Actually Talking about Modular Monoliths. This time out I think I’d like to just put out some demos and thoughts about where Marten and Wolverine fit well into the modular monolith idea — and also some areas where I think there’s room for improvement.

First off, let’s talk about…

Modular Configuration

Both tools use the idea of a “configuration model” (what Martin Fowler coined a Semantic Model years ago) that is compiled and built from a combination of attributes in the code, explicit configuration, user supplied policies, and built in policies in baseline Marten or Wolverine as shown below with an indication of the order of precedence:

In code, when you as a user configure Marten and Wolverine inside of the Program file for your system like so:

builder.Services.AddMarten(opts =>
{
    var connectionString = builder.Configuration.GetConnectionString("marten");
    opts.Connection(connectionString);

    // This will create a btree index within the JSONB data
    opts.Schema.For<Customer>().Index(x => x.Region);
})
    // Adds Wolverine transactional middleware for Marten
    // and the Wolverine transactional outbox support as well
    .IntegrateWithWolverine();

builder.Host.UseWolverine(opts =>
{
    opts.CodeGeneration.TypeLoadMode = TypeLoadMode.Static;
    
    // Let's build in some durability for transient errors
    opts.OnException<NpgsqlException>().Or<MartenCommandException>()
        .RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());

    // Shut down the listener for whatever queue experienced this exception
    // for 5 minutes, and put the message back on the queue
    opts.OnException<MakeBelieveSubsystemIsDownException>()
        .PauseThenRequeue(5.Minutes());

    // Log the bad message sure, but otherwise throw away this message because
    // it can never be processed
    opts.OnException<InvalidInputThatCouldNeverBeProcessedException>()
        .Discard();
    
    
    
    // Apply the validation middleware *and* discover and register
    // Fluent Validation validators
    opts.UseFluentValidation();
    
    // Automatic transactional middleware
    opts.Policies.AutoApplyTransactions();
    
    // Opt into the transactional inbox for local 
    // queues
    opts.Policies.UseDurableLocalQueues();
    
    // Opt into the transactional inbox/outbox on all messaging
    // endpoints
    opts.Policies.UseDurableOutboxOnAllSendingEndpoints();
    
    // Connecting to a local Rabbit MQ broker
    // at the default port
    opts.UseRabbitMq();

    // Adding a single Rabbit MQ messaging rule
    opts.PublishMessage<RingAllTheAlarms>()
        .ToRabbitExchange("notifications");

    opts.LocalQueueFor<TryAssignPriority>()
        // By default, local queues allow for parallel processing with a maximum
        // parallel count equal to the number of processors on the executing
        // machine, but you can override the queue to be sequential and single file
        .Sequential()

        // Or add more to the maximum parallel count!
        .MaximumParallelMessages(10)

        // Pause processing on this local queue for 1 minute if there's
        // more than 20% failures for a period of 2 minutes
        .CircuitBreaker(cb =>
        {
            cb.PauseTime = 1.Minutes();
            cb.SamplingPeriod = 2.Minutes();
            cb.FailurePercentageThreshold = 20;
            
            // Definitely worry about this type of exception
            cb.Include<TimeoutException>();
            
            // Don't worry about this type of exception
            cb.Exclude<InvalidInputThatCouldNeverBeProcessedException>();
        });
    
    // Or if so desired, you can route specific messages to 
    // specific local queues when ordering is important
    opts.Policies.DisableConventionalLocalRouting();
    opts.Publish(x =>
    {
        x.Message<TryAssignPriority>();
        x.Message<CategoriseIncident>();

        x.ToLocalQueue("commands").Sequential();
    });
});

The nested lambdas in AddMarten() and UseWolverine() are configuring the MartenOptions and WolverineOptions models respectively (the “configuration model” in that diagram above).

I’m not aware of any one commonly used .NET idiom for building modular configuration, but I do commonly see folks using extension methods for IServiceCollection or IHostBuilder to segregate configuration that’s specific to a single module, and that’s what I think I’d propose. Assuming that we have a module in our modular monolith system for handling workflow around “incidents”, there might be an extension method something like this:

public static class IncidentsConfigurationExtensions
{
    public static WebApplicationBuilder AddIncidentModule(this WebApplicationBuilder builder)
    {
        // Whatever other configuration, services, et al
        // we need for just the Incidents module
        
        // Extra Marten configuration
        builder.Services.ConfigureMarten(opts =>
        {
            // I'm just adding an index for a document type within
            // this module
            opts.Schema.For<IncidentDetails>()
                .Index(x => x.Priority);
            
            // Purposely segregating all document types in this module's assembly
            // to a separate database schema
            opts.Policies.ForAllDocuments(m =>
            {
                if (m.DocumentType.Assembly == typeof(IncidentsConfigurationExtensions).Assembly)
                {
                    m.DatabaseSchemaName = "incidents";
                }
            });
        });
        
        return builder;
    }
}

Which would be called from the overall system’s Program file like so:

var builder = WebApplication.CreateBuilder(args);

builder.AddIncidentModule();

// Much more below...

Between them, the two main Critter Stack tools have a lot of support for modularity through:

All of the facilities I described above can be used to separate specific configuration for different modules within the module code itself.

Modular Monoliths and Backing Persistence

In every single experience report you’ll ever find about a team trying to break up and modernize a large monolithic application the authors will invariably say that breaking apart the database was the single most challenging task. If we are really doing things better this time with the modular monolith approach, we’d probably better take steps ahead of time to make it easier to extract services by attempting to keep the persistence for each module at least somewhat decoupled from the persistence of other modules.

Going even farther in terms of separation, it’s not unlikely that some modules have quite different persistence needs and might be better served by using a completely different style of persistence than the other modules. Just as an example, one of our current JasperFx Software clients has a large monolithic application where some workflow-centric modules would be a good fit for an event sourcing approach, while other modules are more CRUD centric or reporting-centric where a straight up RDBMS approach is probably much more appropriate.

So let’s finally bring Marten and Wolverine into the mix and talk about the Good, the Bad, and the (sigh) Ugly of how the Critter Stack fits into modular monoliths:

wah, wah, wah…

Let’s start with a positive. Marten sits on top of the very robust PostgreSQL database. So in addition to Marten’s ability to use PostgreSQL as a document database and as an event store, PostgreSQL out of the box is a rock solid relational database. Heck, PostgreSQL even has some ability to be used as a graph database! The point is that using the Marten + PostgreSQL combination gives you a lot of flexibility in terms of persistence style between different modules in a modular monolith without introducing a lot more infrastructure. Moreover, Wolverine can happily utilize its PostgreSQL-backed transactional outbox with both Entity Framework Core and Marten targeting the same PostgreSQL database in the same application.

Continuing with another positive, let’s say that we want to create some logical separation between our modules in the database, and one way to do so would be to simply keep Marten documents in separate database schemas for each module. Repeating a code sample from above, you can see that configuration below:

    public static WebApplicationBuilder AddIncidentModule(this WebApplicationBuilder builder)
    {
        // Whatever other configuration, services, et al
        // we need for just the Incidents module
        
        // Extra Marten configuration
        builder.Services.ConfigureMarten(opts =>
        {
            // Purposely segregating all document types in this module's assembly
            // to a separate database schema
            opts.Policies.ForAllDocuments(m =>
            {
                if (m.DocumentType.Assembly == typeof(IncidentsConfigurationExtensions).Assembly)
                {
                    m.DatabaseSchemaName = "incidents";
                }
            });
        });
        
        return builder;
    }

So, great, the storage for Marten documents could easily be segregated by schema. Especially considering there’s little or no referential integrity relationships between Marten document tables, it should be relatively easy to move these document tables to completely different databases later!

And with that, let’s move more into “Bad” or hopefully not too “Ugly” territory.

The event store data in Marten is all in one single set of tables (mt_streams and mt_events). So every module utilizing Marten’s event sourcing could be intermingling their events in just these tables through the one single AddMarten() store for the application’s IHost. You could depend on marking event streams by their aggregate type like so:

public static async Task start_stream(IDocumentSession session)
{
    // the Incident type argument is strictly a marker for
    // Marten
    session.Events.StartStream<Incident>(new IncidentLogged());
    await session.SaveChangesAsync();
}

I think we could ameliorate this situation with a couple future changes:

  1. A new flag in Marten that would make it mandatory to mark every new event stream with an aggregate type specifically to make it easier to separate the events later to extract a service and its event storage
  2. Some kind of helper to move event streams from one database to another. It’s just not something we have in our tool belt at the moment

Of course, it would also help immeasurably if we had a way to split the event store storage for different types of event streams, but somehow that idea has never gotten any traction within Marten and never rises to the level of a high priority. Most of our discussions about sharding or partitioning the event store data have been geared around scalability, which is certainly an issue here too of course.

Marten also has its concept of “separate stores” that was meant to allow an application to interact with multiple Marten-ized databases from a single .NET process. This could be used with modular monoliths to segregate the event store data, even if targeting the same physical database in the end. The very large downside to this approach is that Wolverine’s Marten integration does not today do anything with the separate store model. So no Wolverine transactional middleware, event forwarding, transactional inbox/outbox integration, and no aggregate handler workflow. So basically everything about the full “Critter Stack” integration that makes that tooling the single most productive event sourcing development experience in all of .NET (in my obviously biased opinion). Ugly.

Randomly, I heard an NPR interview with Eli Wallach very late in his life who was the actor who played the “Ugly” character in the famous western, and I could only describe him as effusively jolly. So basically a 180 degree difference from his character!

Module to Module Communication

I’ve now spent much more time on this post than I had allotted, so it’s time to go fast…

In my last post I used this diagram to illustrate the risk of coupling modules through direct usage of internals (the red arrows):

Instead of the red arrows everywhere above, I think I’m in favor of trying to limit the module to module communication to using some mix of a “mediator” tool or an in memory message bus between modules. That’s obviously going to come with some overhead, but I think (hope) that overhead is a net positive.

For a current client, I’m recommending they further utilize MediatR as they move a little more in the direction of modularity in their current monolith. For greenfield codebases, I’d recommend Wolverine instead because I think it does much, much more.

First, Wolverine has a full set of functionality to be “just a Mediator” to decouple modules from too much of the internals of another module. Secondly, Wolverine has a lot of support for background processing through local, in memory queues that could be very advantageous in modular monoliths where Wolverine can de facto be an in memory message bus. Moreover, Wolverine’s main entry point usage is identical for messages processed locally versus messages published through external messaging brokers to external processes:

    public static async Task using_message_bus(IMessageBus bus)
    {
        // Use Wolverine as a "mediator"
        // This is normally executed inline, in process, but *could*
        // also be invoking this command in an external process
        // and waiting for the success or failure ack
        await bus.InvokeAsync(new CategoriseIncident());


        // Use Wolverine for asynchronous messaging. This could 
        // start by publishing to a local, in process queue, or
        // it could be routed to an external message broker -- but
        // the calling code doesn't have to know that
        await bus.PublishAsync(new CategoriseIncident());
    }

The point here is that Wolverine can potentially set your modular monolith architecture up so that it’s possible to extract or move functionality out into separate services later.
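On the receiving side, the module that owns the command only needs a handler. Here is a minimal sketch, assuming Wolverine's default handler discovery conventions (a public class whose name ends in Handler with a method named Handle) and its treatment of a returned object as a cascading message. The properties on CategoriseIncident and the IncidentCategorised type are hypothetical shapes invented for illustration.

```csharp
using System;

// Hypothetical message shape; the sample above only shows `new CategoriseIncident()`
public class CategoriseIncident
{
    public Guid IncidentId { get; init; }
    public string Category { get; init; } = "Unknown";
}

// Hypothetical event that other modules could subscribe to
public record IncidentCategorised(Guid IncidentId, string Category);

// Lives inside the Incidents module. Callers in other modules only know
// about the CategoriseIncident message, not about this class.
public static class CategoriseIncidentHandler
{
    public static IncidentCategorised Handle(CategoriseIncident command)
    {
        // Module-internal work would go here. The returned object is what
        // Wolverine would publish onward as a cascading message.
        return new IncidentCategorised(command.IncidentId, command.Category);
    }
}
```

Because the handler is a plain static method with no Wolverine types in its signature, it can be unit tested by calling Handle() directly, and the calling module never references it.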

All that being said about messaging or mediator tools, some of the ugliest systems I’ve ever seen utilized messaging or proto-MediatR command handlers between logical modules. Those systems had code that was almost indecipherable by introducing too many layers and far too much internal messaging. I think I’d say that some of the root cause of the poor system code was from getting the bounded context boundaries wrong so that the messaging was too chatty. Using high ceremony anti-corruption layers also adds a lot of mental overhead to follow information flow through several mapping transformations. One of these systems was using the IDesign architectural approach that I think naturally leads to very poorly factored software architectures and too much harmful code ceremony. I do not recommend.

I guess my only point here is that no matter what well intentioned advice people like me try to give, or any theory of how to make code more maintainable any of us might have, if you find yourself saying to yourself about code that “this is way harder than it should be” you should challenge the approach and look for something different — even if that just leads you right back to where you are now if the alternatives don’t look any better.

One important point here about either modular monoliths or a micro service strategy or a mix of the two: if two or more services/modules are chatty between themselves and very frequently have to be modified at the same time, they’re best described as a single bounded context and should probably be combined into a single service or module.

Summary

Anyway, that’s enough from me on this subject for now, and this took way longer than I meant to spend on it. Time to get my nose back to the grindstone. I am certainly very open to any feedback about the Critter Stack tools’ limitations for modular monolith construction and any suggestions or requests to improve those tools.

Testing Asynchronous Projections in Marten

Hey, did you know that JasperFx Software offers formal support plans for Marten and Wolverine? Not only are we making the “Critter Stack” tools be viable long term options for your shop, we’re also interested in hearing your opinions about the tools and how they should change. We’re also certainly open to help you succeed with your software development projects on a consulting basis whether you’re using any part of the Critter Stack or some completely different .NET server side tooling.

As a kind of follow up to my post yesterday on Wolverine’s Baked In Integration Testing Support, I want to talk about some improvements to Marten that just went live in Marten 7.5 that are meant to make asynchronous projections much easier to test.

First off, let’s say that you have a simplistic document that can “self-aggregate” itself as a “Snapshot” in Marten like this:

public record InvoiceCreated(string Description, decimal Amount);

public record InvoiceApproved;
public record InvoiceCancelled;
public record InvoicePaid;
public record InvoiceRejected;

public class Invoice
{
    public Invoice()
    {
    }

    public static Invoice Create(IEvent<InvoiceCreated> created)
    {
        return new Invoice
        {
            Amount = created.Data.Amount,
            Description = created.Data.Description,

            // Capture the timestamp from the event
            // metadata captured by Marten
            Created = created.Timestamp,
            Status = InvoiceStatus.Created
        };
    }

    public int Version { get; set; }

    public decimal Amount { get; set; }
    public string Description { get; set; }
    public Guid Id { get; set; }
    public DateTimeOffset Created { get; set; }
    public InvoiceStatus Status { get; set; }

    public void Apply(InvoiceCancelled _) => Status = InvoiceStatus.Cancelled;
    public void Apply(InvoiceRejected _) => Status = InvoiceStatus.Rejected;
    public void Apply(InvoicePaid _) => Status = InvoiceStatus.Paid;
    public void Apply(InvoiceApproved _) => Status = InvoiceStatus.Approved;
}

For asynchronous projections of any kind, we have a little bit of complication for testing. In a classic “Arrange, Act, Assert” test workflow, we’d like to exercise our projection — and mind you, I strongly recommend that testing happen within its integration with Marten rather than some kind of solitary unit tests with fakes — with a workflow like this:

  1. Pump in some new events to Marten
  2. Somehow magically wait for Marten’s asynchronous daemon running in a background thread to progress to the point where it’s handled all of our newly appended events for all known, running projections
  3. Load the expected documents that should have been persisted or updated from our new events by the projections running in the daemon, and run some assertions on the expected system state

For right now, I want to worry about the second step and introduce a new (old, but it actually works correctly now) WaitForNonStaleProjectionDataAsync API introduced in Marten 7.5. You can see the new API used in this test from the new documentation on Testing Projections:

[Fact]
public async Task test_async_aggregation_with_wait_for()
{
    // In your tests, you would most likely use the IHost for your
    // application as it is normally built
    using var host = await Host.CreateDefaultBuilder()
        .ConfigureServices(services =>
        {
            services.AddMarten(opts =>
                {
                    opts.Connection(
                        "Host=localhost;Port=5432;Database=marten_testing;Username=postgres;password=postgres;Command Timeout=5");
                    opts.DatabaseSchemaName = "incidents";

                    // Notice that the "snapshot" is running asynchronously
                    opts.Projections.Snapshot<Invoice>(SnapshotLifecycle.Async);
                })

                // Using Solo in tests will help it start up a little quicker
                .AddAsyncDaemon(DaemonMode.Solo);
        }).StartAsync();

    var store = host.Services.GetRequiredService<IDocumentStore>();

    var invoiceId = Guid.NewGuid();

    // Pump in events
    using (var session = store.LightweightSession())
    {
        session.Events.StartStream<Invoice>(invoiceId, new InvoiceCreated("Blue Shoes", 112.24m));
        await session.SaveChangesAsync();

        session.Events.Append(invoiceId, new InvoiceApproved());
        session.Events.Append(invoiceId, new InvoicePaid());
        await session.SaveChangesAsync();
    }

    // Now, this is going to pause here in this thread until the async daemon
    // running in our IHost is completely caught up to at least the point of the
    // last event captured at the point this method was called
    await store.WaitForNonStaleProjectionDataAsync(5.Seconds());

    // NOW, we should expect reliable results by just loading the already
    // persisted documents built by the projection running in the async daemon
    await using var query = store.QuerySession();

    // Load the document that was "projected" from the events above
    // and immediately persisted to the document store
    var invoice = await query.LoadAsync<Invoice>(invoiceId);

    // Run assertions
    invoice.Description.ShouldBe("Blue Shoes");
    invoice.Status.ShouldBe(InvoiceStatus.Paid);
}
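To make the "wait until caught up" idea a little more concrete, here is a minimal, dependency-free sketch of the general polling pattern. This is not Marten's implementation. The CatchUp class, the WaitUntilCaughtUpAsync method, and the readProgress delegate are all hypothetical names made up for illustration. The sketch only assumes that there is some way to read how far a background processor has progressed and that the target is captured at the moment the wait begins.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; NOT part of Marten
public static class CatchUp
{
    // Polls readProgress until it reports a value at or beyond the target
    // that was captured when the wait started, or throws on timeout
    public static async Task WaitUntilCaughtUpAsync(
        Func<Task<long>> readProgress,
        long target,
        TimeSpan timeout,
        TimeSpan? pollInterval = null)
    {
        var interval = pollInterval ?? TimeSpan.FromMilliseconds(50);
        var stopwatch = Stopwatch.StartNew();

        while (true)
        {
            var current = await readProgress();

            // The background work has reached the target, so it is
            // safe to start asserting on the persisted state
            if (current >= target) return;

            if (stopwatch.Elapsed > timeout)
            {
                throw new TimeoutException(
                    $"Only reached {current} of {target} within {timeout}");
            }

            await Task.Delay(interval);
        }
    }
}
```

In a real Marten test you would call WaitForNonStaleProjectionDataAsync as shown above rather than hand-rolling anything like this. The sketch is only meant to show why the timeout argument matters: if the daemon never catches up, the test should fail loudly instead of hanging.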

Time. What about System Time?

See Andrew Lock’s blog post Avoiding flaky tests with TimeProvider and ITimer for more information on using TimeProvider in tests.

In the example projection, I’ve been capturing the timestamp in the Invoice document from the Marten event metadata:

public static Invoice Create(IEvent<InvoiceCreated> created)
{
    return new Invoice
    {
        Amount = created.Data.Amount,
        Description = created.Data.Description,

        // Capture the timestamp from the event
        // metadata captured by Marten
        Created = created.Timestamp,
        Status = InvoiceStatus.Created
    };
}

But of course, if that timestamp has some meaning later on and you have any kind of business rules that may need to key off that time, it’s very helpful to be able to control the timestamps that Marten is assigning to create predictable automated tests. As of Marten 7.5, Marten uses the newer .NET TimeProvider behind the scenes, and you can replace it in testing like so:

[Fact]
public async Task test_async_aggregation_with_wait_for_and_fake_time_provider()
{
    // Hang on to this for later!!!
    var eventsTimeProvider = new FakeTimeProvider();

    // In your tests, you would most likely use the IHost for your
    // application as it is normally built
    using var host = await Host.CreateDefaultBuilder()
        .ConfigureServices(services =>
        {
            services.AddMarten(opts =>
                {
                    opts.Connection(
                        "Host=localhost;Port=5432;Database=marten_testing;Username=postgres;password=postgres;Command Timeout=5");
                    opts.DatabaseSchemaName = "incidents";

                    // Notice that the "snapshot" is running asynchronously
                    opts.Projections.Snapshot<Invoice>(SnapshotLifecycle.Async);

                    opts.Events.TimeProvider = eventsTimeProvider;
                })

                // Using Solo in tests will help it start up a little quicker
                .AddAsyncDaemon(DaemonMode.Solo);
        }).StartAsync();

    var store = host.Services.GetRequiredService<IDocumentStore>();

    var invoiceId = Guid.NewGuid();

    // Pump in events
    using (var session = store.LightweightSession())
    {
        session.Events.StartStream<Invoice>(invoiceId, new InvoiceCreated("Blue Shoes", 112.24m));
        await session.SaveChangesAsync();

        session.Events.Append(invoiceId, new InvoiceApproved());
        session.Events.Append(invoiceId, new InvoicePaid());
        await session.SaveChangesAsync();
    }

    // Now, this is going to pause here in this thread until the async daemon
    // running in our IHost is completely caught up to at least the point of the
    // last event captured at the point this method was called
    await store.WaitForNonStaleProjectionDataAsync(5.Seconds());

    // NOW, we should expect reliable results by just loading the already
    // persisted documents built by the projection running in the async daemon
    await using var query = store.QuerySession();

    // Load the document that was "projected" from the events above
    // and immediately persisted to the document store
    var invoice = await query.LoadAsync<Invoice>(invoiceId);

    // Run assertions, and we'll use the faked timestamp
    // from our time provider
    invoice.Created.ShouldBe(eventsTimeProvider.Start);
}

In the sample above, I used the FakeTimeProvider from the Microsoft.Extensions.TimeProvider.Testing NuGet package.
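As a rough illustration of why controlling time matters, here is a sketch of the kind of business rule that might key off the Created timestamp. Everything in it is hypothetical: InvoiceRules, IsOverdue, and the payment terms are invented for this example and are not part of Marten or the sample application. To keep the sketch free of extra packages, it uses a small hand-rolled TimeProvider subclass (ManualTimeProvider, also a made-up name) in place of FakeTimeProvider, and it assumes .NET 8 or later, where System.TimeProvider is available.

```csharp
using System;

// Hypothetical business rule for illustration only
public static class InvoiceRules
{
    // An invoice is overdue once more than the payment terms have
    // elapsed since it was created, according to the supplied clock
    public static bool IsOverdue(DateTimeOffset created, TimeProvider clock, TimeSpan terms)
        => clock.GetUtcNow() - created > terms;
}

// Minimal stand-in for a fake clock so the sketch has no package dependencies
public sealed class ManualTimeProvider : TimeProvider
{
    private DateTimeOffset _now;

    public ManualTimeProvider(DateTimeOffset start) => _now = start;

    public override DateTimeOffset GetUtcNow() => _now;

    // Move the clock forward by hand, as a test would
    public void Advance(TimeSpan delta) => _now += delta;
}
```

A test can create an invoice timestamp, advance the clock past the payment terms, and assert on IsOverdue without ever sleeping or depending on the real system clock. FakeTimeProvider offers its own Advance method for the same purpose, so in practice you would likely reach for it rather than a hand-written subclass.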

Summary

We take testability and automated testing very seriously throughout the entire “Critter Stack.” The testing of asynchronous projections has long been a soft spot that we hope is improved by the new capabilities in this post. As always, feel free to pop into the Critter Stack Discord for any questions.

“Partial” Document Updates in Marten 7

Just continuing a loose series about recent improvements in the big Marten 7.0 release.

As part of the big Marten 7 release a couple weeks ago, core team member Babu Annamalai made a huge contribution with a brand new model for making “partial” document updates that’s backed with native PostgreSQL operations. See Babu’s post Marten native partial updates – patching for much more information and details about everything that’s supported now.

Let’s put this into perspective with a quick sample. Here’s a simplistic Wolverine handler that updates a single member on a stored document:

public static async Task Handle(ApproveInvoice command, IDocumentSession session)
{
    // Load the invoice
    var invoice = await session.LoadAsync<Invoice>(command.InvoiceId);
    invoice.Approved = true;

    // Tell Marten to persist the new version
    session.Store(invoice);

    // Commit the one pending change
    await session.SaveChangesAsync();
}

In the code above, I’m loading the whole document, changing one property, and committing the changes back to the database. Under the covers we’re making two round trips to the database, deserializing the starting state of the Invoice document, then serializing the end state of the Invoice document.

With the new patching support, let’s rewrite that handler to this:

public static async Task Handle(ApproveInvoice command, IDocumentSession session)
{
    // Tell Marten what to change
    session.Patch<Invoice>(command.InvoiceId).Set(x => x.Approved, true);

    // Commit the one pending change
    await session.SaveChangesAsync();
}

In that second version, we’re doing a lot less work. There’s only one database call to overwrite the Approved value within the existing Invoice document. While it’s a nontrivial operation to reach inside the JSON in the database, we don’t have to deserialize or serialize the whole Invoice document in memory.
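If "overwriting one value within the existing document" feels abstract, here is a purely in-memory analogy using JsonNode from System.Text.Json. To be clear, this is only an analogy: Marten applies the patch inside PostgreSQL, not in your application's memory, and the JsonPatchSketch class and SetMember method are hypothetical names invented for this sketch.

```csharp
using System;
using System.Text.Json.Nodes;

// Hypothetical helper for illustration only; NOT how Marten patches documents
public static class JsonPatchSketch
{
    // Overwrites a single top-level member of a JSON document without
    // ever materializing the document as a strongly typed .NET object
    public static string SetMember(string json, string name, JsonNode? value)
    {
        var document = JsonNode.Parse(json)!.AsObject();

        // Only this one member changes; everything else is left as it was
        document[name] = value;

        return document.ToJsonString();
    }
}
```

Calling JsonPatchSketch.SetMember(json, "Approved", true) flips the one flag and leaves the other members alone, which is roughly the shape of what the Set() patch asks the database to do to the stored JSON.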

Marten technically had this feature set already, but our older support depended on the PLv8 extension to PostgreSQL that’s more or less deprecated now. Babu’s work for Marten 7 brings this very important feature set back into play for the majority of users who don’t want to or can’t utilize the older PLv8-backed support.