A Vision for Stateful Resources at Development or Deployment Time

As is not atypical, I found a couple little issues with both Oakton and Jasper in the course of writing this post. To that end, if you want to use the functionality shown here yourself, just make sure you’re on at least Oakton 4.6.1 and Jasper 2.0-alpha-3.

I’ve spit out quite a bit of blogging content the past several weeks on both Marten and Jasper.

I’ve been showing some new integration between Jasper, Marten, and Rabbit MQ. This time out, I want to show the new “stateful resource” model in a third tool named Oakton to remove development and deployment time friction when using these tools on a software project. Oakton itself is a command line processing tool that, more importantly, can be used to quickly add command line utilities directly to your .Net executable.

Drawing from a sample project in the Jasper codebase, here’s the configuration for an issue tracking application that uses Jasper, Marten, and RabbitMQ with Jasper’s inbox/outbox integration using Postgresql:

using IntegrationTests;
using Jasper;
using Jasper.Persistence.Marten;
using Jasper.RabbitMQ;
using Marten;
using MartenAndRabbitIssueService;
using MartenAndRabbitMessages;
using Oakton;
using Oakton.Resources;

var builder = WebApplication.CreateBuilder(args);

builder.Host.ApplyOaktonExtensions();

builder.Host.UseJasper(opts =>
{
    // I'm setting this up to publish to the same process
    // just to see things work
    opts.PublishAllMessages()
        .ToRabbitExchange("issue_events", exchange => exchange.BindQueue("issue_events"))
        .UseDurableOutbox();

    opts.ListenToRabbitQueue("issue_events").UseInbox();

    opts.UseRabbitMq(factory =>
    {
        // Just connecting with defaults, but showing
        // how you *could* customize the connection to Rabbit MQ
        factory.HostName = "localhost";
        factory.Port = 5672;
    });
});

// This is actually important, this directs
// the app to build out all declared Postgresql and
// Rabbit MQ objects on start up if they do not already
// exist
builder.Services.AddResourceSetupOnStartup();

// Just pumping out a bunch of messages so we can see
// statistics
builder.Services.AddHostedService<Worker>();

builder.Services.AddMarten(opts =>
{
    // I think you would most likely pull the connection string from
    // configuration like this:
    // var martenConnectionString = builder.Configuration.GetConnectionString("marten");
    // opts.Connection(martenConnectionString);

    opts.Connection(Servers.PostgresConnectionString);
    opts.DatabaseSchemaName = "issues";

    // Just letting Marten know there's a document type
    // so we can see the tables and functions created on startup
    opts.RegisterDocumentType<Issue>();

    // I'm putting the inbox/outbox tables into a separate "issue_service" schema
}).IntegrateWithJasper("issue_service");

var app = builder.Build();

app.MapGet("/", () => "Hello World!");

// Actually important to return the exit code here!
return await app.RunOaktonCommands(args);

Just to describe what’s going on, the .NET code above is going to depend on:

  1. A Postgresql database with the necessary tables and functions that Marten needs to be able to persist issue data
  2. Additional tables in the Postgresql database for persisting the outgoing and incoming messages in the inbox/outbox usage
  3. A Rabbit MQ broker with the necessary exchanges, queues, and bindings for the issue application as it’s configured

In a perfect world, scratch that, in an acceptable world, a developer should be able to start from a fresh clone of this issue tracking codebase and be able to run the system and/or any integration tests locally almost immediately with very minimal friction along the way.

At this point, I’m a big fan of trying to run development infrastructure in Docker where it’s easy to spin things up on demand, and just as quickly shut it all down when you no longer need it. To that end, let’s just say we’ve got a docker-compose.yml file for both Postgresql and Rabbit MQ. Having that, I’ll type docker compose up -d from the command line to spin up both infrastructure elements.

Cool, but now I need to have the database schemas built out with Marten tables and the Jasper inbox/outbox tables plus the Rabbit MQ queues for the application. This is where Oakton and its new “stateful resource” model comes into play. Jasper’s Rabbit MQ plugin and the inbox/outbox storage both expose Oakton’s IStatefulResource interface for easy setup. Likewise, Marten has support for this model as well (in this case it’s just a very slim wrapper around Marten’s longstanding database schema management functionality).
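
To give a sense of just how slim that wrapper is, the heavy lifting is Marten’s existing schema migration support. Roughly speaking, the Marten resource adapter ends up calling something like the sketch below (using the app variable from the bootstrapping sample above; the resource adapter just layers the check/setup/teardown semantics on top):

using Marten;
using Microsoft.Extensions.DependencyInjection;

// Resolve the Marten document store from the application's container...
var store = app.Services.GetRequiredService<IDocumentStore>();

// ...and let Marten build out any missing schemas, tables, and functions,
// including the extra inbox/outbox tables registered by IntegrateWithJasper()
await store.Storage.ApplyAllConfiguredChangesToDatabaseAsync();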

If you’re not familiar with this, the double dash “--” argument in dotnet run helps .NET know which arguments (like “run”) apply to the dotnet executable itself, while everything to the right of the “--” is passed along to the application.

Opening up the command line terminal of your preference to the root of the project, I type dotnet run -- help to see what options are available in our Jasper application through the usage of Oakton:

There’s a couple commands up there that will help us out with the database management, but I want to focus on the resources command. To that end, I’m going to type dotnet run -- resources list just to see what resources our issue tracker application has:

Just through the configuration up above, the various Jasper elements have registered “stateful resource” adapters with Oakton for the underlying Marten database, the inbox/outbox data (Envelope Storage above), and Rabbit MQ.

In the next case, I’m going to use dotnet run -- resources check to see if all our infrastructure is configured the way our application needs — and I’m going to do this without first starting the database or the message broker, so this should fail spectacularly!

Here’s the summary output:

If you were to scroll up a bit, you’d see a lot of exceptions thrown describing what’s wrong (helpfully color coded by Spectre.Console) including this one explaining that an expected Rabbit MQ queue is missing:

So that’s not good. No worries though, I’ll start up the docker containers, then go back to the command line and type:

dotnet run -- resources setup

And here’s some of the output:

Forget the command line…

If you’ll notice the single line of code `builder.Services.AddResourceSetupOnStartup();` in the bootstrapping code above, that’s adding a hosted service to our application from Oakton that will verify and apply all configured setup to the known Marten database, the inbox/outbox storage, and the required Rabbit MQ objects. No command line chicanery necessary. I’m hopeful that this will enable developers to be more productive by dealing with this kind of environmental setup directly inside the application itself rather than recreating the definition of what’s necessary in external scripts.

This was a fair amount of work, so I’d very much welcome any kind of feedback here.

Using Rabbit MQ with Jasper

I’ve spit out quite a bit of blogging content the past several weeks on both Marten and Jasper.

As a follow up today, I’d like to introduce Rabbit MQ integration with Jasper as the actual transport between processes. I’m again going to use a “Ping/Pong” sample of sending messages between two processes (you may want to refer to my previous ping/pong post). You can find the sample code for this post on GitHub.

Sending messages betwixt processes

The message types and Jasper handlers are basically identical to those in my last post if you want a reference. In the Pinger application, I’ve added a reference to the Jasper.RabbitMQ Nuget library, which adds transitive references to Jasper itself and the Rabbit MQ client library. In the application bootstrapping, I’ve got this to connect to Rabbit MQ, send Ping messages to a Rabbit MQ exchange named “pings”, and add a hosted service just to send a new Ping message once a second:

using System.Net.NetworkInformation;
using Jasper;
using Jasper.RabbitMQ;
using Microsoft.Extensions.Hosting;
using Oakton;
using Pinger;

return await Host.CreateDefaultBuilder(args)
    .UseJasper(opts =>
    {
        // Listen for messages coming into the pongs queue
        opts
            .ListenToRabbitQueue("pongs")

            // This won't be necessary by the time Jasper goes 2.0
            // but for now, I've got to help Jasper out a little bit
            .UseForReplies();

        // Publish messages to the pings queue
        opts.PublishMessage<Ping>().ToRabbitExchange("pings");

        // Configure Rabbit MQ connection properties programmatically
        // against a ConnectionFactory
        opts.UseRabbitMq(rabbit =>
        {
            // Using a local installation of Rabbit MQ
            // via a running Docker image
            rabbit.HostName = "localhost";
        })
            // Directs Jasper to build any declared queues, exchanges, or
            // bindings with the Rabbit MQ broker as part of bootstrapping time
            .AutoProvision();

        // This will send ping messages on a continuous
        // loop
        opts.Services.AddHostedService<PingerService>();
    }).RunOaktonCommands(args);

On the Ponger side, I’ve got this setup:

using Baseline.Dates;
using Jasper;
using Jasper.RabbitMQ;
using Microsoft.Extensions.Hosting;
using Oakton;

return await Host.CreateDefaultBuilder(args)
    .UseJasper(opts =>
    {
        // Going to listen to a queue named "pings", but disregard any messages older than
        // 15 seconds
        opts.ListenToRabbitQueue("pings", queue => queue.TimeToLive(15.Seconds()));

        // Configure Rabbit MQ connections and optionally declare Rabbit MQ
        // objects through an extension method on JasperOptions.Endpoints
        opts.UseRabbitMq() // This is short hand to connect locally
            .DeclareExchange("pings", exchange =>
            {
                // Also declares the queue too
                exchange.BindQueue("pings");
            })
            .AutoProvision()

            // Option to blow away existing messages in
            // all queues on application startup
            .AutoPurgeOnStartup();
    })
    .RunOaktonCommands(args);

When running the applications side by side, I’ll get output like this from Pinger:

Got pong #55
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PongMessage#01818741-30c2-4ab7-9fa8-d7870d194754 from rabbitmq://queue/pings
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://exchange/pings has resumed
Got pong #56
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PongMessage#01818741-34b3-4d34-910a-c9fc4c191bfc from rabbitmq://queue/pings
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://exchange/pings has resumed

and this from Ponger:

Got ping #57
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PingMessage#01818741-389e-4ddd-96cf-fb8a76e4f5f1 from rabbitmq://queue/pongs
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://queue/pongs has resumed
Got ping #58
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PingMessage#01818741-3c8e-469f-a330-e8c9e173dc40 from rabbitmq://queue/pongs
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://queue/pongs has resumed

and of course, since Jasper is a work in progress, there’s a bunch of erroneous or at least misleading messages about a sending agent getting resumed that I need to take care of…

So what’s in place right now that can be inferred from the code samples above? The Jasper + Rabbit MQ integration today provides:

  • A way to subscribe to incoming messages from a Rabbit MQ queue. It’s not shown here, but you have all the options you’d expect to configure Rabbit MQ prefetch counts and message handling parallelism, and to opt into persistent, inbox messaging
  • Mechanisms to declare Rabbit MQ queues, exchanges, and bindings, as well as the ability to fine tune Rabbit MQ options for these objects
  • The ability to create declared Rabbit MQ objects at system startup time
  • The ability to automatically purge old messages out of Rabbit MQ queues on start up. It’s not shown here, but you can do this on a queue by queue basis as well
  • Publishing rules to direct outgoing messages from a Jasper process to Rabbit MQ exchanges or directly to queues

What’s also available but not shown here is opt in, conventional routing to route outgoing messages to Rabbit MQ exchanges based on the message type names, or to automatically declare and configure subscriptions to Rabbit MQ queues based on the message types that this process handles. This conventional routing is suspiciously similar to (okay, copied from) MassTransit because:

  1. I like MassTransit’s conventional routing functionality
  2. Bi-directional wire compatibility mostly out of the box with MassTransit is a first class goal for Jasper 2.0

Why Rabbit MQ? What about other things?

Jasper has focused on Rabbit MQ as the main transport option out of the box because that’s what my shop already uses (and I’m most definitely trying to get us to use Jasper at work), it has a great local development option through Docker hosting, and frankly because it’s easy to use. Jasper also supports Pulsar of all things, and will definitely have a Kafka integration before 2.0. Other transports will be added based on user requests, and that’s also a good place for other folks to get involved.

I had someone asking about using Jasper with Amazon SQS, and that’s such a simple model that I might build it out as a reference for other folks.

What’s up next?

Now that I’ve introduced Jasper, its outbox functionality, and its integration with Rabbit MQ, my next post will visit the utilities baked into Jasper itself for dealing with stateful resources like databases or broker configuration at development and production time. This is going to include a lot of command line functionality baked into your application.

Ping/Pong Jasper Style

I’ve spit out quite a bit of blogging content the past several weeks on both Marten and Jasper.

For this post though, I just want to show off a very small sample “Ping/Pong” example of using Jasper to do messaging between different .NET processes. All of the code is drawn from a brand new sample on GitHub. I’m just using Jasper’s little built in TCP transport that’s probably just going to be useful for local development, but it happily serves for this sample.

First off, I’m going to build out a very small shared library just to hold the messages we’re going to exchange:

public class Ping
{
    public int Number { get; set; }
}

public class Pong
{
    public int Number { get; set; }
}

And next, I’ll start a small Pinger service with the dotnet new worker template. There are just three pieces of code, starting with the bootstrapping code:

using Jasper;
using Jasper.Transports.Tcp;
using Messages;
using Oakton;
using Pinger;

return await Host.CreateDefaultBuilder(args)
    .UseJasper(opts =>
    {
        // Using Jasper's built in TCP transport

        // listen to incoming messages at port 5580
        opts.ListenAtPort(5580);

        // route all Ping messages to port 5581
        opts.PublishMessage<Ping>().ToPort(5581);

        // Registering the hosted service here, but could do
        // that with a separate call to IHostBuilder.ConfigureServices()
        opts.Services.AddHostedService<Worker>();
    })
    .RunOaktonCommands(args);

and the Worker class that’s just going to publish a new Ping message once a second:

using Jasper;
using Messages;

namespace Pinger;

public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly IMessagePublisher _publisher;

    public Worker(ILogger<Worker> logger, IMessagePublisher publisher)
    {
        _logger = logger;
        _publisher = publisher;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var pingNumber = 1;

        while (!stoppingToken.IsCancellationRequested)
        {

            await Task.Delay(1000, stoppingToken);
            _logger.LogInformation("Sending Ping #{Number}", pingNumber);
            await _publisher.PublishAsync(new Ping { Number = pingNumber });
            pingNumber++;
        }
    }
}

and lastly a message handler for any Pong messages coming back from the Ponger we’ll build next:

using Messages;

namespace Pinger;

public class PongHandler
{
    public void Handle(Pong pong, ILogger<PongHandler> logger)
    {
        logger.LogInformation("Received Pong #{Number}", pong.Number);
    }
}

Okay then, next let’s move on to building the Ponger application. This time I’ll use dotnet new console to start the new project, then add references to our Messages library and Jasper itself. For the bootstrapping, add this code:

using Jasper;
using Jasper.Transports.Tcp;
using Microsoft.Extensions.Hosting;
using Oakton;

return await Host.CreateDefaultBuilder(args)
    .UseJasper(opts =>
    {
        // Using Jasper's built in TCP transport
        opts.ListenAtPort(5581);
    })
    .RunOaktonCommands(args);

And a message handler for the Ping messages that will turn right around and shoot a Pong response right back to the original sender:

using Jasper;
using Messages;
using Microsoft.Extensions.Logging;

namespace Ponger;

public class PingHandler
{
    public ValueTask Handle(Ping ping, ILogger<PingHandler> logger, IExecutionContext context)
    {
        logger.LogInformation("Got Ping #{Number}", ping.Number);
        return context.RespondToSenderAsync(new Pong { Number = ping.Number });
    }
}

For comparison, here’s a more heavily annotated variant of the same kind of handler, this one written against the PingMessage/PongMessage types from the Rabbit MQ sample shown earlier:

public static class PingHandler
{
    // Simple message handler for the PingMessage message type
    public static ValueTask Handle(
        // The first argument is assumed to be the message type
        PingMessage message,

        // Jasper supports method injection similar to ASP.Net Core MVC
        // In this case though, IMessageContext is scoped to the message
        // being handled
        IExecutionContext context)
    {
        ConsoleWriter.Write(ConsoleColor.Blue, $"Got ping #{message.Number}");

        var response = new PongMessage
        {
            Number = message.Number
        };

        // This usage will send the response message
        // back to the original sender. Jasper uses message
        // headers to embed the reply address for exactly
        // this use case
        return context.RespondToSenderAsync(response);
    }
}

If I start up the Ponger service first, then the Pinger service, I’ll see console output like this from Pinger:

info: Pinger.Worker[0]
      Sending Ping #11
info: Pinger.PongHandler[0]
      Received Pong #1
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message Pong#01817277-f692-42d5-a3e4-35d9b7d119fb from tcp://localhost:5581/
info: Pinger.PongHandler[0]
      Received Pong #2
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message Pong#01817277-f699-4340-a59d-9616aee61cb8 from tcp://localhost:5581/
info: Pinger.PongHandler[0]
      Received Pong #3
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message Pong#01817277-f699-48ea-988b-9e835bc53020 from tcp://localhost:5581/
info: Pinger.PongHandler[0]

and output like this in the Ponger process:

info: Ponger.PingHandler[0]
      Got Ping #1
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message Ping#01817277-d673-4357-84e3-834c36f3446c from tcp://localhost:5580/
info: Ponger.PingHandler[0]
      Got Ping #2
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message Ping#01817277-da61-4c9d-b381-6cda92038d41 from tcp://localhost:5580/
info: Ponger.PingHandler[0]
      Got Ping #3

Building a More Useful Outbox for Reliable Messaging

I blog not at all, or a lot at once. Nothing in between. In the past couple of weeks I’ve written several related posts on Marten and Jasper.

As a follow up to those posts, Jasper comes with a robust transactional inbox/outbox implementation that partially runs as a background service using .NET’s hosted service mechanism. Jasper’s version of this functionality differs quite a bit in implementation from other toolsets in the .NET space. Here’s a rundown so far:

  • When used, Jasper is persisting the outgoing message, metadata, and the intended destination in the hosting service’s application database (or at least that’s the way it’s intended to work).
  • A background service is running in every node to ensure that these outgoing messages don’t get marooned or fail to be sent out
  • Pending messages do not get “lost” if the process dies or is shut down unexpectedly
  • Pending messages are resumed if a process is just restarted
  • If running in a cluster, the other nodes can detect that another node is no longer running, and take on the persisted, outbound messages from that original node
  • So far, the outbox has support for either Postgresql or Sql Server as the backing database.
  • Not surprisingly, there’s much deeper integration with Marten, but there’s also support for using the outbox with EF Core
  • Jasper’s outbox is actually usable completely outside of Jasper command handlers. You might have to write a little more explicit code to use it within ASP.Net Core controllers or Minimal API methods for example, but it’s perfectly usable in those scenarios. This is unique to Jasper and currently unsupported in the most popular service bus tools in .NET at the time I’m writing this post.
  • Jasper comes with some database schema management baked in for the necessary database objects needed to integrate the outbox to make things easier to get up and going.
  • There’s also some command line tooling baked into Jasper that will hopefully be helpful at development or deployment time to manage the persisted message storage

Now, on to some sample code!

To reuse a code sample from earlier this week, let’s say we’re building a new web service for potential customers to make online reservations for their favorite restaurants. We’re also going to be using Jasper as a command and/or message bus (and Marten as the backing persistence infrastructure), so we might have a command handler for confirming a reservation like this simple example:

public static class ConfirmReservationHandler
{
    [Transactional]
    public static async Task<ReservationConfirmed> Handle(ConfirmReservation command, IDocumentSession session)
    {
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        session.Store(reservation);

        // Kicking out a "cascaded" message
        return new ReservationConfirmed(reservation.Id);
    }
}

As I described in an earlier post, that handler — in conjunction with some Jasper middleware brought in by the [Transactional] attribute — is making database changes and publishing a corresponding message about those very changes. Those two actions are an atomic unit of logical work. They need to either both succeed, or not happen at all. We also need the message to only go out after the database changes succeed. Otherwise, there’s a possible race condition between the database changes being committed and the ReservationConfirmed message being processed in another thread with incorrect or inconsistent system data. And take my word for it on that last point, because I’ve seen that happen and directly cause production bugs.

Let’s quickly rule out an old fashioned two phase commit because that brings a lot of headaches I could do without for the rest of my career. As I mentioned in some of my earlier posts, Jasper comes out of the box with a transactional outbox built in for specifically the scenario shown in the handler above. Jasper has an add on Nuget library called Jasper.Persistence.Marten that adds some helpful integration between Jasper’s built in inbox/outbox “durability agent” and Marten. The setup code for that integration is just this:

// Normal Marten integration
builder.Services.AddMarten(opts =>
{
    opts.Connection("Host=localhost;Port=5433;Database=postgres;Username=postgres;password=postgres");
})
    // NEW! Adding Jasper outbox integration to Marten in the "messages"
    // database schema
    .IntegrateWithJasper("messages");

and Jasper’s own integration setup:

builder.Host.UseJasper(opts =>
{
    // more Jasper configuration for error handling etc.
});

To add some context, here’s the actual code (with some reformatting and annotations from me) that Jasper builds up at runtime around our command handler:

public class ConfirmReservationHandler615381178 : MessageHandler
{
    private readonly OutboxedSessionFactory _outboxedSessionFactory;

    public ConfirmReservationHandler615381178(OutboxedSessionFactory outboxedSessionFactory)
    {
        _outboxedSessionFactory = outboxedSessionFactory;
    }

    public override async Task HandleAsync(IExecutionContext context, CancellationToken cancellation)
    {
        var confirmReservation = (ConfirmReservation)context.Envelope.Message;

        // This is creating a Marten session that's hooked into the Jasper outbox for
        // the incoming ConfirmReservation message being processed here
        await using var documentSession = _outboxedSessionFactory.OpenSession(context);

        var reservationConfirmed = await ConfirmReservationHandler.Handle(confirmReservation, documentSession)
            .ConfigureAwait(false);
        // Outgoing, cascaded message
        await context.EnqueueCascadingAsync(reservationConfirmed).ConfigureAwait(false);

        // Commit the unit of work *and* start sending the ReservationConfirmed message
        await documentSession.SaveChangesAsync(cancellation).ConfigureAwait(false);
    }
}

There’s a lot going on up above behind the scenes, but the key elements of the message handler workflow code with the outbox above are:

  • The outgoing message will not actually be sent to its destination (a Rabbit MQ exchange? a local queue? a Pulsar topic?) until the database changes succeed. That’s true for messages directly sent to Jasper’s IMessagePublisher or IExecutionContext services within a command handler method as well.
  • The outgoing messages and all necessary metadata (including the destination) are persisted to a database table inside the exact same database transaction as the changes to the persistent model in the handler above. In the case of the Marten integration, the messages are persisted in the same database command as the Reservation document changes as a way to improve performance by reducing network round trips to the database server.
  • After the transaction succeeds, the previously registered Jasper outbox is notified by a Marten session listener to start publishing the outgoing “cascaded” messages in Jasper’s background sending queues which span across command executions.
  • If the transaction fails, no messages will be sent out

A Vision for Low Ceremony CQRS with Event Sourcing

Let me just put this stake into the ground. The combination of Marten and Jasper will quickly become the most productive and lowest ceremony tooling for event sourcing and CQRS architectures on the .NET stack.

And for any George Strait fans out there, this song may be relevant to the previous paragraph:)

CQRS and Event Sourcing

Just to define some terminology, the Command Query Responsibility Segregation (CQRS) pattern was first described by Greg Young as an approach to software architecture where you very consciously separate “writes” that change the state of the system from “reads” against the system state. The key to CQRS is to use separate models of the system state for writing versus querying. That leads to something I think of as the “scary view of CQRS”:

The “scary” view of CQRS

The “query database” in my diagram is meant to be an optimized storage for the queries needed by the system’s clients. The “domain model storage” is some kind of persisted storage that’s optimized for writes — both capturing new data and presenting exactly the current system state that the command handlers would need in order to process or validate incoming commands.

Many folks have a visceral, negative reaction to this kind of CQRS diagram as it does appear to be more complexity than the more traditional approach where you have a single database model — almost inevitably in a relational database of some kind — and both reads and writes work off the same set of database tables. Hell, I walked out of an early presentation by Greg Young about what later came to be known as CQRS at QCon 2008 shaking my head, thinking this was all nuts.

Here’s the deal though: when we’re writing systems against the one, true database model, we’re potentially spending a lot of time mapping incoming messages to the stored model, and probably even more time transforming the raw database model into a shape that’s appropriate for our system’s clients in a way that also creates some decoupling between clients and the ugly, raw details of our underlying database. The “scary” CQRS architecture arguably just brings that sometimes hidden work and complexity into the light of day.

Now let’s move on to Event Sourcing. Event sourcing is a style of system persistence where each state change is captured and stored as an explicit event object. There’s all sorts of value from keeping the raw change events around for later, but you of course do need to compile the events into derived, or “projected” models that represent the current system state. When combining event sourcing into a CQRS architecture, some projected models from the raw events can serve as both the “write model” inside of incoming commands, while others can be built as the “query model” that clients will use to query our system.
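
To make that concrete with a tiny, hypothetical example (the event and document types below are purely illustrative, but the Apply() methods follow the convention Marten uses for “self-aggregated” projections):

// Hypothetical events recording each state change to an issue
public record IssueOpened(Guid IssueId, string Title);
public record IssueAssigned(Guid IssueId, string Assignee);
public record IssueClosed(Guid IssueId);

// A projected, "current state" view built by folding over the raw events.
// Marten can discover these Apply() methods by convention and keep this
// document up to date as new events are captured
public class IssueSummary
{
    public Guid Id { get; set; }
    public string Title { get; set; } = string.Empty;
    public string? Assignee { get; set; }
    public bool IsOpen { get; set; } = true;

    public void Apply(IssueOpened opened)
    {
        Id = opened.IssueId;
        Title = opened.Title;
    }

    public void Apply(IssueAssigned assigned) => Assignee = assigned.Assignee;

    public void Apply(IssueClosed closed) => IsOpen = false;
}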

At the end of this section, I want to be clear that event sourcing and CQRS can be used independently of each other, but to paraphrase Forrest Gump, event sourcing and CQRS go together like peas and carrots.

Now, the event sourcing side of this especially might sound a little scary if you’re not familiar with some of the existing tooling, so let’s move on first to Marten…

Marten has a mature feature set for event sourcing already, and we’ve grown quite a bit in capability with our support for projections (read-only views of the raw events). If you’ll peek back at the “scary” view of CQRS above, Marten’s projection support solves the problem of keeping the raw events synchronized to both any kind of “write model” for incoming commands and the richer “query model” views for system clients within a single database. In my opinion, Marten takes the “scary” out of event sourcing, and we’re on our way to being the best possible “event sourcing in a box” solution for .NET.
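
For illustration, registering a projected view like that with Marten is a one-liner inside the AddMarten() configuration. This is just a sketch reusing the hypothetical IssueSummary type from above, and whether a projection runs inline or asynchronously is a per-projection choice:

builder.Services.AddMarten(opts =>
{
    // Connection string configuration elided, same as the other samples in this post

    // Keep the IssueSummary "query model" document updated from the raw events
    // as part of the same transaction that captures the events
    opts.Projections.SelfAggregate<IssueSummary>(ProjectionLifecycle.Inline);
});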

As that support has gotten more capable and robust though, our users have frequently been asking about how to subscribe to events being captured in Marten and how to relay those events reliably to some other kind of infrastructure — which could be a completely separate database, outgoing message queues, or some other kind of event streaming scenario.

Oskar Dudycz and I have been discussing how to solve this in Marten, and we’ve come up with these three main mechanisms for supporting subscriptions to event data:

  1. Some or all of the events being captured by Marten should be forwarded automatically at the time of event capture (IDocumentSession.SaveChangesAsync())
  2. When strict ordering is required, we’ll need some kind of integration into Marten’s async daemon to relay events to external infrastructure
  3. For massive amounts of data with ambitious performance targets, use something like Debezium to directly stream Marten events from Postgresql to Kafka/Pulsar/etc.

Especially for the first item in that list above, we need some kind of outbox integration with Marten sessions to reliably relay events from Marten to outgoing transports while keeping the system in a consistent state (i.e., don’t publish the events if the transaction fails).

Fortunately, there’s a functional outbox implementation for Marten in…

To be clear, Jasper as a project was pretty well mothballed by the combination of COVID and the massive slog toward Marten 4.0 (and then a smaller slog to 5.0). However, I’ve been able to get back to Jasper and yesterday kicked out a new Jasper 2.0.0-alpha-2 release and wrote (Re) Introducing Jasper as a Command Bus.

For this post though, I want to explore the potential of the Jasper + Marten combination for CQRS architectures. To that end, a couple of weeks ago I published Marten just got better for CQRS architectures, which showed some new APIs in Marten to simplify repetitive code around using Marten event sourcing within CQRS architectures. Part of the sample code in that post was this MVC Core controller that used some newer Marten functionality to handle an incoming command:

public async Task CompleteCharting(
    [FromBody] CompleteCharting charting, 
    [FromServices] IDocumentSession session)
{
    var stream = await session
        .Events.FetchForExclusiveWriting<ProviderShift>(charting.ShiftId);
 
    // Validation on the ProviderShift aggregate
    if (stream.Aggregate.Status != ProviderStatus.Charting)
    {
        throw new Exception("The shift is not currently charting");
    }
     
    // We "decided" to emit one new event
    stream.AppendOne(new ChartingFinished(stream.Aggregate.AppointmentId.Value, stream.Aggregate.BoardId));
 
    await session.SaveChangesAsync();
}

To review, that controller method:

  1. Takes in a command message of type CompleteCharting
  2. Loads the current state of the aggregate ProviderShift model referred to by the incoming command, and does so in a way that takes care of concurrency for us by waiting to get an exclusive lock on the particular ProviderShift
  3. Assuming that the validation against the ProviderShift succeeds, emits a new ChartingFinished event
  4. Saves the pending work with a database transaction

In that post, I pointed out that there were some potential flaws or missing functionality with this approach:

  1. We probably want some error handling to retry the operation if we hit concurrency exceptions or timeout trying to get the exclusive lock. In other words, we have to plan for concurrency exceptions
  2. It’d be good to be able to automatically publish the new ChartingFinished event to a queue to take further action within our system (or an external service if we were using messaging here)
  3. Lastly, I’d argue there’s some repetitive code up there that could be simplified

To address these points, I’m going to introduce Jasper and its integration with Marten (Jasper.Persistence.Marten) to the telehealth portal sample from my previous blog post.

I’m going to move the actual handling of the CompleteCharting to a Jasper handler shown below that is functionally equivalent to the controller method shown earlier (except I switched the concurrency protection to being optimistic):

// This is auto-discovered by Jasper
public class CompleteChartingHandler
{
    [MartenCommandWorkflow] // this opts into some Jasper middleware 
    public ChartingFinished Handle(CompleteCharting charting, ProviderShift shift)
    {
        if (shift.Status != ProviderStatus.Charting)
        {
            throw new Exception("The shift is not currently charting");
        }

        return new ChartingFinished(charting.AppointmentId, shift.BoardId);
    }
}

And the controller method gets simplified down to just relaying the command to Jasper:

    public Task CompleteCharting(
        [FromBody] CompleteCharting charting, 
        [FromServices] ICommandBus bus)
    {
        // Just delegating to Jasper here
        return bus.InvokeAsync(charting, HttpContext.RequestAborted);
    }

There’s some opportunity for mechanisms to make the code above a little less repetitive and more efficient, maybe by riding on Minimal APIs. That’s for a later date though:)

By using the new [MartenCommandWorkflow] attribute, we’re directing Jasper to surround the command handler with middleware that handles much of the Marten mechanics by:

  1. Loading the aggregate ProviderShift for the incoming CompleteCharting command (I’m omitting some details here for brevity, but there’s a naming convention that can be explicitly overridden to pluck the aggregate identity off the incoming command)
  2. Passing that ProviderShift aggregate into the Handle() method above
  3. Applying the returned event to the event stream for the ProviderShift
  4. Committing the outstanding changes in the active Marten session

The Handle() code above becomes an example of a Decider function. Even better yet, it’s completely decoupled from any kind of infrastructure and fully synchronous. I’m going to argue that this approach will make command handlers much easier to unit test, definitely easier to write, and easier to read later just because you’re only focused on the business logic.
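
As a quick illustration of that testability claim, a unit test against the handler above could look something like this sketch (assuming xUnit, and assuming ProviderShift exposes settable Status, AppointmentId, and BoardId properties and that CompleteCharting carries ShiftId and AppointmentId; the real sample may differ in the details):

using System;
using Xunit;

public class CompleteChartingHandlerTests
{
    [Fact]
    public void emits_charting_finished_when_the_shift_is_charting()
    {
        // Pure state setup: no Marten session, no Jasper runtime, no database
        var shift = new ProviderShift
        {
            Status = ProviderStatus.Charting,
            AppointmentId = Guid.NewGuid(),
            BoardId = Guid.NewGuid()
        };

        var command = new CompleteCharting(shift.Id, shift.AppointmentId.Value);

        var @event = new CompleteChartingHandler().Handle(command, shift);

        Assert.Equal(shift.AppointmentId.Value, @event.AppointmentId);
        Assert.Equal(shift.BoardId, @event.BoardId);
    }
}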

So that covers the repetitive code problem, but let’s move on to automatically publishing the ChartingFinished event and some error handling. I’m going to add Jasper through the application’s bootstrapping code as shown below:

builder.Host.UseJasper(opts =>
{
    // I'm choosing to process any ChartingFinished event messages
    // in a separate, local queue with persistent messages for the inbox/outbox
    opts.PublishMessage<ChartingFinished>()
        .ToLocalQueue("charting")
        .DurablyPersistedLocally();
    
    // If we encounter a concurrency exception, just try it immediately 
    // up to 3 times total
    opts.Handlers.OnException<ConcurrencyException>().RetryNow(3); 
    
    // It's an imperfect world, and sometimes transient connectivity errors
    // to the database happen
    opts.Handlers.OnException<NpgsqlException>()
        .RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());
});

Jasper comes with some pretty strong exception handling policy capabilities you’ll need to do grown up development. In this case, I’m just setting up some global policies in the application to retry message failures on either Marten concurrency exceptions or the inevitable, transient Postgresql connectivity hiccups. In the case of the concurrency exception, you may just need to start the work over to ensure you’re starting from the most recent aggregate changes. I used globally applied policies here, but Jasper will also allow you to override that on a message type by message type basis.

Lastly, let’s add the Jasper outbox integration for Marten and opt into automatic event publishing with this bit of configuration chained to the standard AddMarten() usage:

builder.Services.AddMarten(opts =>
{
    // Marten configuration...
})
    // I added this to enroll Marten in the Jasper outbox
    .IntegrateWithJasper()
    
    // I also added this to opt into events being forward to
    // the Jasper outbox during SaveChangesAsync()
    .EventForwardingToJasper();

And that’s actually that. The configuration above will add the Jasper outbox tables to the Marten database for us, and let Marten’s database schema management manage those extra database objects.

Back to the command handler (mildly elided):

public class CompleteChartingHandler
{
    [MartenCommandWorkflow] 
    public ChartingFinished Handle(CompleteCharting charting, ProviderShift shift)
    {
        // validation code goes here!

        return new ChartingFinished(charting.AppointmentId, shift.BoardId);
    }
}

By opting into the outbox integration and event forwarding to Jasper from Marten, when this command handler is executed, the ChartingFinished events will be published — in this case just to an in-memory queue, but it could also be to an external transport — with Jasper’s outbox implementation that guarantees that the message will be delivered at least once as long as the database transaction to save the new event succeeds.

Conclusion and What’s Next?

There’s a tremendous amount of work in the rear window to get to the functionality that I demonstrated here, and a substantial amount of ambition in the future to drive this forward. I would love any possible feedback both positive and negative. Marten is a team effort, but Jasper’s been mostly my baby for the past 3-4 years, and I’d be happy for anybody who would want to get involved with that. I’m way behind in documentation for Jasper and somewhat for Marten, but that’s in flight.

My next couple posts to follow up on this are to:

  • Do a deeper dive into Jasper’s outbox and explain why it’s different and arguably more useful than the outbox implementations in other leading .NET tools
  • Introduce the usage of Rabbit MQ with Jasper for external messaging
  • Take a detour into the development and deployment time command line utilities built into Jasper & Marten through Oakton

(Re) Introducing Jasper as a Command Bus

EDIT 6/15/2022: The correct Nuget is “Jasper.Persistence.Marten”

I just released a second alpha of Jasper 2.0 to Nuget. You can find the project goals for Jasper 2.0 here, and an update from a couple weeks ago here. Be aware that the published documentation for Jasper is very, very far behind. I’m working on it:)

Jasper is a long running open source project with the goal of creating a low ceremony and highly productive framework for building systems in .Net that would benefit from either an in memory command bus or utilize asynchronous messaging. The big driver for Jasper right now is using it in combination with the event sourcing capabilities of Marten as a full stack CQRS architectural framework. Later this week I’ll preview the ongoing Marten + Jasper integration, but for today I’d like to just introduce Jasper itself a little bit.

For a simplistic sample application, let’s say that we’re building an online system for potential customers to make reservations at any number of participating restaurants. I’m going to start by laying down a brand new .Net 6 Web API project. I’m obviously going to choose Marten as my persistence tooling, so the next step is to add a reference to the Jasper.Persistence.Marten Nuget, which will bring transitive dependencies over for both Jasper and Marten.

Jasper also has some working integration with EF Core using either Sql Server or Postgresql as the backing store so far.

Let’s build an online reservation system for your favorite restaurants!

Let’s say that as a result of an event storming requirements session, we’ve determined that we want both a command message to confirm a reservation, and a corresponding event message out to the internal systems of the various restaurants. I’m going to eschew event sourcing to keep this simpler and just opt for a persistent Reservation document in Marten. All that being said, here’s our code to model everything I just described:

public record ConfirmReservation(Guid ReservationId);
public record ReservationConfirmed(Guid ReservationId);

public class Reservation
{
    public Guid Id { get; set; }
    public DateTimeOffset Time { get; set; }
    public string RestaurantName { get; set; }
    public bool IsConfirmed { get; set; }
}

Building Our First Message Handlers

In this contrived example, the ReservationConfirmed event message will be published separately because it spawns a call to an external system where I’d strongly prefer to have a separate “retry loop” around just that call. That being said, this is the first cut for a command handler for the ConfirmReservation message:

public class ConfirmReservationHandler
{
    public async Task Handle(ConfirmReservation command, IDocumentSession session, IExecutionContext publisher)
    {
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        // Watch out, this could be a race condition!!!!
        await publisher.PublishAsync(new ReservationConfirmed(reservation.Id));

        // We're coming back to this in a bit......
        await session.SaveChangesAsync();
    }
}

To be technical, Jasper uses an in memory outbox for all message processing even when there’s no durable message storage, which at least guarantees that outgoing messages are only published when the original message is successfully handled. I just wanted to show the potential danger here.

So a couple things to note that are different from existing tools like NServiceBus or MassTransit:

  • Jasper locates message handlers strictly through naming conventions: public methods named either Handle() or Consume() on public types that are suffixed by Handler or Consumer (see the small example just after this list). There are no mandatory attributes or interfaces. Hell, there’s not even a mandatory method signature except that the first argument is always assumed to be the message type.
  • Jasper Handle() methods can happily support method injection, meaning that the IDocumentSession parameter above is pushed into the method from Jasper itself. In my experience, using method injection frequently simplifies the message handler code as opposed to the idiomatic C# approach of using constructor injection and relaying things through private fields.
  • Message types in Jasper are just concrete types, and there’s no necessary Event or Message base classes of any type — but that may be introduced later strictly for optional diagnostics.
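
Just to illustrate those conventions, the hypothetical consumer below would be discovered and wired up exactly the same way as the handler classes above. The ReservationExpired message type is made up for this example:

// A "Consumer" suffix plus a Consume() method works just like "Handler" plus Handle()
public class ReservationExpiredConsumer
{
    // The first argument is assumed to be the message type, and the
    // Marten session is supplied through method injection
    public async Task Consume(ReservationExpired expired, IDocumentSession session)
    {
        var reservation = await session.LoadAsync<Reservation>(expired.ReservationId);

        session.Delete(reservation!);

        await session.SaveChangesAsync();
    }
}

// Still just a plain record, no marker interfaces or base classes required
public record ReservationExpired(Guid ReservationId);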

Lastly, notice my comment about the race condition between publishing the outgoing ReservationConfirmed event message and committing the database work through IDocumentSession.SaveChangesAsync(). That’s obviously a problem waiting to bite us, so we’ll come back to that.

Next, let’s move on to the separate handler for ReservationConfirmed:

[LocalQueue("Notifications")]
[RetryNow(typeof(HttpRequestException), 50, 100, 250)]
public class ReservationConfirmedHandler
{
    public async Task Handle(ReservationConfirmed confirmed, IQuerySession session, IRestaurantProxy restaurant)
    {
        var reservation = await session.LoadAsync<Reservation>(confirmed.ReservationId);

        // Make a call to an external web service through a proxy
        await restaurant.NotifyRestaurant(reservation);
    }
}

All this handler does is look up the current state of the reservation and post that to an external system through a proxy interface (IRestaurantProxy).

As I said before, I strongly prefer that calls out to external systems be isolated to their own retry loops. In this case, the [RetryNow] attribute is setting up Jasper to retry a command through this handler on transient HttpRequestException errors with a 50, 100, and then 250 millisecond cooldown period between attempts. Jasper’s error handling policies go much deeper than this, but hopefully you can already see what’s possible.

The usage of the [LocalQueue("Notifications")] attribute is directing Jasper to execute the ReservationConfirmed messages in a separate, local queue named “Notifications”. In effect, we’ve got a producer/consumer solution between the incoming ConfirmReservation command and ReservationConfirmed event messages. The local queueing is done with the TPL Dataflow library. Maybe Jasper will eventually move to using System.Threading.Channels, but for right now there’s just bigger issues to worry about.

Don’t fret if you don’t care for sprinkling attributes all over your code; all of the configuration I’ve done above with attributes can also be done with a fluent interface at bootstrapping time, or even within the message handler classes themselves.
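
If attributes aren’t your thing, here’s a rough sketch of the fluent equivalent at bootstrapping time, composed from the configuration calls shown elsewhere in this post (the exact method names may well shift before 2.0 is final):

builder.Host.UseJasper(opts =>
{
    // Route ReservationConfirmed messages to the same local "Notifications" queue
    // that the [LocalQueue] attribute pointed at
    opts.PublishMessage<ReservationConfirmed>().ToLocalQueue("Notifications");

    // Retry on transient HTTP errors when calling out to the external restaurant system
    opts.Handlers.OnException<HttpRequestException>()
        .RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());
});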

Bootstrapping Jasper

Stepping into the Program.cs file for our new system, I’m going to add bootstrapping for both Marten and Jasper in the simplest possible way like so:

using CommandBusSamples;
using Jasper;
using Marten;
using Oakton;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMarten(opts =>
{
    opts.Connection("Host=localhost;Port=5433;Database=postgres;Username=postgres;password=postgres");
});

// Adding Jasper as a straight up Command Bus with all
// its default configuration
builder.Host.UseJasper();

var app = builder.Build();

// This isn't *quite* the most efficient way to do this,
// but it's simple to understand, so please just let it go...
// I'm just delegating the HTTP request body as a command 
// directly to Jasper's in memory ICommandBus
app.MapPost("/reservations/confirm", (ConfirmReservation command, ICommandBus bus) => bus.InvokeAsync(command));


// This opts into using Oakton for extended command line options for this app
// Oakton is also a transitive dependency of Jasper itself
return await app.RunOaktonCommands(args);

Okay, so let’s talk about some of the things in that code up above:

  • Jasper tries to embrace the generic host building and core abstractions that came with .Net Core (IHostBuilder, ILogger, IHostedService etc.) wherever possible, so hence the integration happens with the UseJasper() call seen above.
  • The call to UseJasper() also quietly sets up Lamar as the underlying IoC container for your application. I won’t get into that much here, but there are optimizations in Jasper’s runtime model that require Lamar.
  • I used Oakton as the command line parser. That’s not 100% necessary, but there are a lot of development time utilities with Oakton for Jasper development. I’ll show some of that in later posts building on this one.

The one single HTTP route calls directly into the Jasper ICommandBus.InvokeAsync() method to immediately execute the message handler inline for the ConfirmReservation message. As someone who’s a skeptic of “mediator” tools in AspNetCore, I’m not sure this really adds much value as the handler for ConfirmReservation is currently written. However, we can add some transient error handling to our application’s bootstrapping that would apply to the ICommandBus.InvokeAsync() calls like so:

builder.Host.UseJasper(opts =>
{
    // Just setting up some retries on transient database connectivity errors
    opts.Handlers.OnException<NpgsqlException>().OrInner<NpgsqlException>()
        .RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());
});

We can also opt to make our ConfirmReservation commands be processed in background threads rather than inline through our web request by changing the Minimal API route to:

// This isn't *quite* the most efficient way to do this,
// but it's simple to understand, so please just let it go...
app.MapPost("/reservations/confirm", (ConfirmReservation command, ICommandBus bus) => bus.EnqueueAsync(command));

The EnqueueAsync() method above places the incoming command message into an in-memory queue.

What if the process dies mid-flight?!?

An experienced user of asynchronous messaging tools will have happily spotted several potential problems in the solution so far. One, there’s a potential race condition in the ConfirmReservationHandler code between database changes being committed and the outgoing message being processed. Two, what if the process dies? If we’re using all this in-memory queueing stuff, that all dies when the process dies, right?

Fortunately, Jasper, with some significant help from Postgresql and Marten, already has a robust inbox and outbox implementation that we’ll add next for durable messaging.

For clarity, here’s the original handler code again for the ConfirmReservation message:

public class ConfirmReservationHandler
{
    public async Task Handle(ConfirmReservation command, IDocumentSession session, IExecutionContext publisher)
    {
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        // Watch out, this could be a race condition!!!!
        await publisher.PublishAsync(new ReservationConfirmed(reservation.Id));

        // We're coming back to this in a bit......
        await session.SaveChangesAsync();
    }
}

Please note the comment about the race condition in the code. What we need to do is to introduce Jasper’s outbox feature, then revisit this handler.

First though, I need to go back to the bootstrapping code in Program and add a little more code:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMarten(opts =>
{
    opts.Connection("Host=localhost;Port=5433;Database=postgres;Username=postgres;password=postgres");
})
    // NEW! Adding Jasper outbox integration to Marten in the "messages"
    // database schema
    .IntegrateWithJasper("messages");

// Adding Jasper as a straight up Command Bus
builder.Host.UseJasper(opts =>
{
    // Just setting up some retries on transient database connectivity errors
    opts.Handlers.OnException<NpgsqlException>().OrInner<NpgsqlException>()
        .RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());

    // NEW! Apply the durable inbox/outbox functionality to the two in-memory queues
    opts.DefaultLocalQueue.DurablyPersistedLocally();
    opts.LocalQueue("Notifications").DurablyPersistedLocally();

    // And I just opened a GitHub issue to make this config easier...
});

var app = builder.Build();

// This isn't *quite* the most efficient way to do this,
// but it's simple to understand, so please just let it go...
app.MapPost("/reservations/confirm", (ConfirmReservation command, ICommandBus bus) => bus.EnqueueAsync(command));

// This opts into using Oakton for extended command line options for this app
// Oakton is also a transitive dependency of Jasper itself
return await app.RunOaktonCommands(args);

This time I chained a call on the Marten configuration through the IntegrateWithJasper() extension method. I also added a couple lines of code within the UseJasper() block to mark the in memory queues as being enrolled in the inbox/outbox mechanics through the DurablyPersistedLocally() method.

Now, back to our handler code. Keeping things explicit for now, I’m going to add some necessary mechanics for opting into outbox sending:

public class ConfirmReservationHandler
{
    public async Task Handle(ConfirmReservation command, IDocumentSession session, IExecutionContext publisher)
    {
        // Enroll the execution context and Marten session in
        // outbox sending
        // This is an extension method in Jasper.Persistence.Marten
        await publisher.EnlistInOutboxAsync(session);
        
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        // No longer a race condition, I'll explain more below:)
        await publisher.PublishAsync(new ReservationConfirmed(reservation.Id));

        // Persist, and kick out the outgoing messages
        await session.SaveChangesAsync();
    }
}

I’m going to utilize Jasper/Marten middleware in the last section to greatly simplify the code above, so please read to the end:)

With this version of the handler code, things are working a little differently:

  • The call to PublishAsync() does not immediately release the message to the in memory queue. Instead, it’s routed and held in memory by the IExecutionContext for later
  • When IDocumentSession.SaveChangesAsync() is called, the outgoing messages are persisted into the underlying database in the same database transaction as the change to the Reservation document. Using the Marten integration, the Jasper outbox can even do this within the exact same batched database command as a minor performance optimization
  • At the end of IDocumentSession.SaveChangesAsync() upon a successful transaction, the outgoing messages are kicked out into the outgoing message sending queues in Jasper.

The outbox usage here solves a couple issues. First, it eliminates the race condition between the outgoing messages and the database changes. Secondly, it prevents situations of system inconsistency where either the message succeeds and the database changes fail, or vice versa.

I’ll be writing up a follow up post later this week or next diving deeper into Jasper’s outbox implementation. For a quick preview, by taking a very different approach than existing messaging tools in .Net, Jasper’s outbox is already usable in more scenarios than other alternatives and I’ll try to back that assertion up next time. To answer the obvious question in the meantime, Jasper’s outbox gives you an at least once delivery guarantee even if the current process fails.

Streamline the Handler Mechanics

So the “register outbox, publish messages, commit session transaction” dance could easily get repetitive in your code. Jasper’s philosophy is that repetitive code is wasteful, so let’s eliminate the cruft-y code we were writing strictly for Marten or Jasper in the ConfirmReservationHandler. The code below is the exact functional equivalent to the earlier handler — even down to enrolling in the outbox:

public static class ConfirmReservationHandler
{
    [Transactional]
    public static async Task<ReservationConfirmed> Handle(ConfirmReservation command, IDocumentSession session)
    {
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        session.Store(reservation);

        // Kicking out a "cascaded" message
        return new ReservationConfirmed(reservation.Id);
    }
}

The usage of the [Transactional] attribute opts this one handler into the Marten transactional middleware that handles all the outbox enrollment mechanics and calls IDocumentSession.SaveChangesAsync() for us after the handler method runs.

By returning Task<ReservationConfirmed>, I’m directing Jasper to publish the returned value as a message once the incoming message has been handled successfully. I personally like the cascading message pattern in Jasper as a way to make unit testing handler code easier. This was based on a much older custom service bus I helped build and run in production in the mid-10’s.
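To make that testability claim concrete, here’s a minimal sketch of what a unit test against the cascading handler above could look like. I’m assuming xUnit plus NSubstitute to stub the Marten session, and the exact shapes of Reservation, ConfirmReservation, and ReservationConfirmed are assumptions on my part based purely on how they’re used in the handler:

public class ConfirmReservationHandlerTests
{
    [Fact]
    public async Task cascades_a_reservation_confirmed_message()
    {
        // Assuming Reservation is a typical Marten document with a settable Id
        var reservation = new Reservation { Id = Guid.NewGuid() };

        // Stub just enough of IDocumentSession for the handler to do its work
        var session = Substitute.For<IDocumentSession>();
        session.LoadAsync<Reservation>(reservation.Id).Returns(reservation);

        var confirmed = await ConfirmReservationHandler
            .Handle(new ConfirmReservation(reservation.Id), session);

        // No mock publisher to verify, just assert on the cascaded message
        // (assuming ReservationConfirmed exposes the reservation id as Id)
        Assert.Equal(reservation.Id, confirmed.Id);
    }
}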

On the next episode of “please, please pay attention to my OSS projects!”

My very next post is a sequel to Marten just got better for CQRS architectures, but this time using some new Jasper functionality with Marten to strip out even more repetitive code.

Following behind that, I want to write follow ups doing a deeper dive on Jasper’s outbox implementation, using Rabbit MQ with Jasper, then a demonstration of all the copious command line utilities built into both Jasper and Marten.

As my mentor from Dell used to say, “thanks for listening, I feel better now”

Marten just got better for CQRS architectures

I’m assuming some prior knowledge of Event Sourcing as an architectural pattern here. I highly recommend Oskar Dudycz’s Introduction to Event Sourcing training kit or this video from Derek Comartin. While both Event Sourcing and the closely associated CQRS architectural style are both useful without the other, I’m still assuming here that you’re interested in using Marten for event sourcing within a larger CQRS architecture.

So you’re adopting an event sourcing style with Marten for your persistence within a larger CQRS architectural style. Crudely speaking, all “writes” to the system state involve sending a command message to your CQRS service with a workflow something like this:

In the course of handling the command message, our command handler (or HTTP endpoint) needs to:

  1. Fetch a “write model” that represents the state for the current workflow. This projected “write model” will be used by the command handler to validate the incoming command and also to…
  2. Decide what subsequent events should be published to update the state of the system based on the existing state and the incoming command
  3. Persist the new events to the ongoing Marten event store
  4. Possibly publish some or all of the new events to an outgoing transport to be acted upon asynchronously
  5. Deal with concurrency concerns, especially if there’s any significant chance that other related commands may be coming in for the same logical workflow at the same time

Do note that as I shift to implementations, I’m going to mostly bypass any discussion of design patterns or what I personally consider to be useless cruft from common CQRS approaches in the .Net or JVM worlds. I.e., no repositories will be used in any of this code.

As an example system, let’s say that we’re building a new, online telehealth system that among other things will track how a medical provider spends their time during a shift helping patients during their workday. Using Marten’s “self-aggregate” support, a simplified version of the provider shift state is represented by this model:

public class ProviderShift
{
    public Guid Id { get; set; }
    
    // Pay attention to this, this will come into play
    // later
    public int Version { get; set; }
    
    public Guid BoardId { get; private set; }
    public Guid ProviderId { get; init; }
    public ProviderStatus Status { get; private set; }
    public string Name { get; init; }
    
    public Guid? AppointmentId { get; set; }
    
    public static async Task<ProviderShift> Create(ProviderJoined joined, IQuerySession session)
    {
        var provider = await session.LoadAsync<Provider>(joined.ProviderId);
        return new ProviderShift
        {
            Name = $"{provider.FirstName} {provider.LastName}",
            Status = ProviderStatus.Ready,
            ProviderId = joined.ProviderId,
            BoardId = joined.BoardId
        };
    }

    public void Apply(ProviderReady ready)
    {
        AppointmentId = null;
        Status = ProviderStatus.Ready;
    }

    public void Apply(ProviderAssigned assigned)
    {
        Status = ProviderStatus.Assigned;
        AppointmentId = assigned.AppointmentId;
    }
    
    public void Apply(ProviderPaused paused)
    {
        Status = ProviderStatus.Paused;
        AppointmentId = null;
    }

    // This is kind of a catch all for any paperwork the
    // provider has to do after an appointment has ended
    // for the just concluded appointment
    public void Apply(ChartingStarted charting) => Status = ProviderStatus.Charting;
}
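For reference, here’s a minimal sketch of the status enum and event types the aggregate above assumes. The real sample certainly has richer versions of these; the shapes below only cover the members that the Create() and Apply() methods actually touch:

public enum ProviderStatus
{
    Ready,
    Assigned,
    Paused,
    Charting
}

// Only the members consumed by ProviderShift are shown here
public record ProviderJoined(Guid ProviderId, Guid BoardId);
public record ProviderReady();
public record ProviderAssigned(Guid AppointmentId);
public record ProviderPaused();
public record ChartingStarted();
public record ChartingFinished(Guid AppointmentId, Guid BoardId);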

Next up, let’s play out the user story for a provider to complete their “charting” activity after a patient appointment concludes. Looking at the sequence diagram and the bullet list of concerns for each command handler, we’ve got a few things to worry about. Never fear though, because Marten has you (mostly) covered today with a couple of new features introduced in Marten v5.4 last week.

Starting with this simple command:

public record CompleteCharting(
    Guid ShiftId, 
    Guid AppointmentId, 
    int Version);

We’ll use Marten’s brand new IEventStore.FetchForWriting<T>() API to whip up the basic command handler (just a small ASP.Net Core Controller endpoint):

    public async Task CompleteCharting(
        [FromBody] CompleteCharting charting, 
        [FromServices] IDocumentSession session)
    {
        /* We've got options for concurrency here! */
        var stream = await session
            .Events.FetchForWriting<ProviderShift>(charting.ShiftId);

        // Validation on the ProviderShift aggregate
        if (stream.Aggregate.Status != ProviderStatus.Charting)
        {
            throw new Exception("The shift is not currently charting");
        }
        
        // We "decided" to emit one new event
        stream.AppendOne(new ChartingFinished(stream.Aggregate.AppointmentId.Value, stream.Aggregate.BoardId));

        await session.SaveChangesAsync();
    }

The FetchForWriting() method used above is doing a couple different things:

  1. Finding the current, persisted version of the event stream for the provider shift and loading that into the current document session to help with optimistic concurrency checks
  2. Fetching the current state of the ProviderShift aggregate for the shift id coming in on the command. Note that this API papers over whether the aggregate in question is a “live aggregate” that needs to be calculated on the fly from the raw events, or was previously persisted as just a Marten document by either an inline or asynchronous projection. I would strongly recommend that “write model” aggregates be either inline or live to avoid eventual consistency issues (see the registration sketch just below).
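Since I’m recommending that the “write model” aggregate be inline or live, here’s a minimal registration sketch for keeping ProviderShift updated inline. Treat the exact option names as my best recollection of the Marten v5 configuration API rather than gospel:

var store = DocumentStore.For(opts =>
{
    opts.Connection("connection string");

    // Update the ProviderShift document in the same transaction
    // as the events being appended to its stream...
    opts.Projections.SelfAggregate<ProviderShift>(ProjectionLifecycle.Inline);

    // ...or use ProjectionLifecycle.Live instead to have Marten fold
    // the raw events into a ProviderShift on demand
});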

Concurrency?!?

Hey, the hard truth is that it’s easy for the command to be accidentally or incidentally dispatched to your service multiple times from messaging infrastructure, multiple users doing the same action in different sessions, or somebody clumsy like me just accidentally clicking a button too many times. One way or another, we may need to harden our command handler against concurrency concerns.

The usage of FetchForWriting<T>() will actually set you up for optimistic concurrency checks. If someone else manages to successfully process a command against the same provider shift between the call to FetchForWriting<T>() and IDocumentSession.SaveChangesAsync(), you’ll get a Marten ConcurrencyException thrown by SaveChangesAsync() that will abort and rollback the transaction.

Moving on though, let’s tighten up the optimistic version check by telling Marten up front which version of the provider shift our command expects the server to be at. First though, we need to get the current version back to the client that’s collecting changes to our provider shift. If you scan back to the ProviderShift aggregate above, you’ll see this property:

    public int Version { get; set; }

With another new little feature in Marten v5.4, the Marten projection support will automatically set the value of a Version to the latest stream version for a single stream aggregate like the ProviderShift. Knowing that, and assuming that ProviderShift is updated inline, we could just deliver the whole ProviderShift to the client with this little web service endpoint (using Marten.AspNetCore extensions):

    [HttpGet("/shift/{shiftId}")]
    public Task GetProviderShift(Guid shiftId, [FromServices] IQuerySession session)
    {
        return session.Json.WriteById<ProviderShift>(shiftId, HttpContext);
    }

The Version property can be a field, scoped as internal, or read-only. Marten is using a dynamically generated Lambda that can happily bypass whatever scoping rules you have to set the version to the latest event for the stream represented by this aggregate. The Version naming convention can also be explicitly ignored, or redirected to a totally differently named member. Lastly, it can even be a .Net Int64 type too — but if you’re doing that, you probably have some severe modeling issues that should be addressed first!
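To make that flexibility concrete, here are a couple of alternative shapes for the version member that, per the rules above, the convention should still pick up:

public class ProviderShift
{
    public Guid Id { get; set; }

    // Marten sets this through generated code, so the member doesn't
    // need to be publicly settable, or even public at all
    internal int Version { get; private set; }

    // ...the rest of the aggregate is unchanged from the earlier sample...
}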

Back to our command handler. If the client has what’s effectively the “expected starting version” of the ProviderShift and sends the CompleteCharting command with that version, we can change the first line of our handler method code to this:

        var stream = await session
                
            // Note: I'm passing in the expected, starting provider shift
            // version from the command
            .Events.FetchForWriting<ProviderShift>(charting.ShiftId, charting.Version);

This new version will throw a ConcurrencyException right off the bat if the expected, starting version is not the same as the last, persisted version in the database. After that, it’s the same optimistic concurrency check at the point of calling SaveChangesAsync() to commit the changes.
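Error handling is mostly out of scope for this post, but just to show one option, here’s a hedged sketch of surfacing that failure at the HTTP boundary. ConcurrencyException comes from Marten, while the 409 response is simply my own choice for this controller action:

try
{
    var stream = await session
        .Events.FetchForWriting<ProviderShift>(charting.ShiftId, charting.Version);

    stream.AppendOne(new ChartingFinished(
        stream.Aggregate.AppointmentId.Value, stream.Aggregate.BoardId));

    await session.SaveChangesAsync();
}
catch (ConcurrencyException)
{
    // Someone else changed this provider shift first, so tell the caller
    // to refetch the latest version and retry the command
    HttpContext.Response.StatusCode = StatusCodes.Status409Conflict;
}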

Lastly, since Marten is built upon a real database instead of trying to be its own specialized storage engine like many other event sourcing tools, we’ve got one last trick. Instead of putzing around with optimistic concurrency checks, let’s go to a pessimistic, exclusive lock on the specific provider shift so that only one session at a time can ever be writing to that provider shift with this variation:

        var stream = await session
                
            // Note: This will try to "wait" to claim an exclusive lock for writing
            // on the provider shift event stream
            .Events.FetchForExclusiveWriting<ProviderShift>(charting.ShiftId);
        

As you can see, Marten has some new functionality to make it even easier to use Marten within CQRS architectures by eliminating some previously repetitive code in both queries on projected state and in command handlers where you need to use Marten’s concurrency control.

Wait, not so fast, you missed some things!

I missed a couple very big things in the sample code above. For one, we’d probably want to broadcast the new events through some kind of service bus to allow other systems or just our own system to asynchronously do other work (like trying to assign our provider to another ready patient appointment). To do that reliably so that the event capture and the outgoing events being published succeed or fail together in one atomic action, I really need an “outbox” of some sort integrated into Marten.

I also left out any kind of potential error handling or message retry capabilities around the concurrency exceptions. And lastly (that I can think of offhand), I completely left out any discussion of the instrumentation you’d want in any kind of grown up system.

Since we’re in the middle of the NBA playoffs, I’m reminded of a Shaquille O’Neal quote from when his backup was Alonzo Mourning, and Mourning had a great game off the bench: “sometimes Superman needs some help from the Incredible Hulk.” In this case, part of the future of Marten is to be combined with another project called Jasper that is going to add external messaging with a robust outbox implementation for Marten to create a full stack for CQRS architectures. Maybe as soon as late next week or at least in June, I’ll write a follow up showing the Marten + Jasper combination that deals with the big missing pieces of this post.

Update on Jasper v2 with an actual alpha

First off, my super power (stop laughing at me!) is having a much longer attention span than the average software developer. In positive ways, this has enabled me to tackle very complex problems. In negative ways, I’ve probably wasted a tremendous amount of time in my career working on systems or projects long after they had probably already failed and I just wouldn’t admit it.

So late last year I started working on a reboot of Jasper, my attempt at creating a “next generation” messaging framework for .Net. The goal of Jasper has changed quite a bit since I started jotting down notes for it in 2014, but the current vision is to be a highly productive command execution engine and asynchronous messaging tool for .Net with less code ceremony than the currently popular tools in this space.

I kicked out a Jasper v 2.0.0-alpha-1 release this week just barely in time for my talk at That Conference yesterday (but didn’t end up showing it at all). Right now the intermediate goals to get to a full Jasper 2.0 rebooted project are to:

  • Finish the baked in Open Telemetry support. It’s there, but there’s holes in what’s being captured
  • Get the interop with MassTransit via Rabbit MQ working for more scenarios. I’ve got a successful proof of concept of bi-directional interaction between Jasper and MassTransit services
  • Finish documentation for the new 2.0 version. I moved the docs to VitePress and started re-writing the docs from scratch, and that takes time

The first two bullet points are all about getting Jasper ready to be something I could dogfood at work.

While I absolutely intend both Jasper and Marten to be perfectly usable without the other, there’s also going to be some specific integration between Jasper and Marten to create a full blown, opinionated CQRS stack for .Net development (think Axon for .Net, but hopefully with much less code ceremony). For this combination, the Marten team is talking about adding messaging subscriptions for the Marten event store functionality, Jasper middleware to reduce repetitive CQRS handler code, and using the outbox functionality in Jasper to also integrate Marten with external messaging infrastructure.

I’ll kick out actual content about all this in the next couple weeks, but a couple folks have noticed the big uptick in Jasper work and asked what was going on, so here’s a little blog post on it:)

Resetting Marten Database State Between Tests

TL;DR: Marten has a new method in V5 called ResetAllData() that’s very handy for rolling back database state to a known point in automated tests.

I’m a big believer in utilizing intermediate level integration tests. By this I mean the middle layer of the typical automated testing pyramid where you’re most definitely testing through your application’s infrastructure, but not necessarily running the system end to end.

Now, any remotely successful test automation strategy means that you have to be able to exert some level of control over the state of the system leading into a test because all automated tests need the combination of known inputs and expected outcomes. To that end, Marten has built in support for completely rolling back the state of a Marten-ized database between tests that I’ll be demonstrating in this post.

When I’m working on a system that uses a relational database, I’m a fan of using Respawn from Jimmy Bogard, which helps you roll back the state of a database to a known starting point as part of integration test setup. Likewise, Marten has the “clean” functionality for the same purpose:

public async Task clean_out_documents(IDocumentStore store)
{
    // Completely remove all the database schema objects related
    // to the User document type
    await store.Advanced.Clean.CompletelyRemoveAsync(typeof(User));

    // Tear down and remove all Marten related database schema objects
    await store.Advanced.Clean.CompletelyRemoveAllAsync();

    // Deletes all the documents stored in a Marten database
    await store.Advanced.Clean.DeleteAllDocumentsAsync();

    // Deletes all of the persisted User documents
    await store.Advanced.Clean.DeleteDocumentsByTypeAsync(typeof(User));

    // For cases where you may want to keep some document types,
    // but eliminate everything else. This is here specifically to support
    // automated testing scenarios where you have some static data that can
    // be safely reused across tests
    await store.Advanced.Clean.DeleteDocumentsExceptAsync(typeof(Company), typeof(User));
    
    // And get at event storage too!
    await store.Advanced.Clean.DeleteAllEventDataAsync();
}

So that’s tearing down data, but many if not most systems will need some baseline reference data to function. We’re still in business though, because Marten has long had a concept of initial data applied to a document store on its startup with the IInitialData interface. To illustrate that interface, here’s a small sample implementation:

    internal class BaselineUsers: IInitialData
    {
        public async Task Populate(IDocumentStore store, CancellationToken cancellation)
        {
            using var session = store.LightweightSession();
            session.Store(new User
            {
                UserName = "magic",
                FirstName = "Earvin",
                LastName = "Johnson"
            });

            session.Store(new User
            {
                UserName = "sircharles",
                FirstName = "Charles",
                LastName = "Barkley"
            });

            await session.SaveChangesAsync(cancellation);
        }
    }

And the BaselineUsers type could be applied like this during initial application configuration:

using var host = await Host.CreateDefaultBuilder()
    .ConfigureServices(services =>
    {
        services.AddMarten(opts =>
        {
            opts.Connection("some connection string");
        }).InitializeWith<BaselineUsers>();
    }).StartAsync();

Or, maybe a little more likely, if you have some reference data that’s only applicable to automated testing, we can attach our BaselineUsers data set to Marten, but only in our test harness, with usage like this:

// First, delegate to your system under test project's
// Program.CreateHostBuilder() method to get the normal system configuration
var host = Program.CreateHostBuilder(Array.Empty<string>())

    // But next, apply initial data to Marten that we need just for testing
    .ConfigureServices(services =>
    {
        // This will add the initial data to the DocumentStore
        // on application startup
        services.InitializeMartenWith<BaselineUsers>();
    }).StartAsync();

For some background, as of V5 the mechanics for the initial data set feature moved to executing in an IHostedService, so there’s no more issue of asynchronous code being called from synchronous code with the dreaded “will it deadlock or am I feeling lucky?” GetAwaiter().GetResult() mechanics.

Putting it all together with xUnit

The way I like to do integration testing with xUnit (the NUnit mechanics would involve static members, but the same lifetime concepts still apply) is to have a “fixture” class that will bootstrap and hold onto a shared IHost instance for the system under test across tests, like this one:

    public class MyAppFixture: IAsyncLifetime
    {
        public IHost Host { get; private set; }

        public async Task InitializeAsync()
        {
            // First, delegate to your system under test project's
            // Program.CreateHostBuilder() method to get the normal system configuration
            Host = await Program.CreateHostBuilder(Array.Empty<string>())

                // But next, apply initial data to Marten that we need just for testing
                .ConfigureServices(services =>
                {
                    services.InitializeMartenWith<BaselineUsers>();
                }).StartAsync();
        }

        public async Task DisposeAsync()
        {
            await Host.StopAsync();
        }
    }

Next, I like to have a base class for integration tests that in this case will consume the MyAppFixture above, but also reset the Marten database between tests with the new V5 IDocumentStore.Advanced.ResetAllData() method, like this one:

    public abstract class IntegrationContext : IAsyncLifetime
    {
        protected IntegrationContext(MyAppFixture fixture)
        {
            Services = fixture.Host.Services;
        }

        public IServiceProvider Services { get; set; }

        public Task InitializeAsync()
        {
            var store = Services.GetRequiredService<IDocumentStore>();

            // This cleans out all existing data, and reapplies
            // the initial data set before each test
            return store.Advanced.ResetAllData();
        }

        public virtual Task DisposeAsync()
        {
            return Task.CompletedTask;
        }
    }

Do note that I left out some xUnit ICollectionFixture mechanics that you might need in order to make sure that MyAppFixture is really shared between test classes. See xUnit’s Shared Context documentation, and the wiring sketch below.
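Here’s what that wiring can look like. The test class name is just an example, but the attributes are the standard xUnit mechanics for sharing one fixture instance across multiple test classes:

// Defines a named collection that shares a single MyAppFixture
// (and therefore a single IHost) across every test class in it
[CollectionDefinition("integration")]
public class IntegrationCollection : ICollectionFixture<MyAppFixture>
{
}

// Each integration test class then opts into the shared collection
[Collection("integration")]
public class UserEndpointTests : IntegrationContext
{
    public UserEndpointTests(MyAppFixture fixture) : base(fixture)
    {
    }
}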

Improving the Development and Production Time Experience with Marten V5

Marten V5 dropped last week, with significant new features for multi-tenancy scenarios and enabling users to use multiple Marten document stores in one .Net application. A big chunk of the V5 work was mostly behind the scenes trying to address user feedback from the much larger V4 release late last year. As always, the Marten documentation is here.

First, why didn’t you just…

I’d advise developers and architects to largely eliminate the word “just” and any other lullaby language from their vocabulary when talking about technical problems and solutions.

That being said:

  • Why didn’t you just use source generators instead? Most of this was done before source generators were released, and source generators are limited to information that’s available at compile time. The dynamic code generation in Marten is potentially using information that is only available at run time
  • Why didn’t you just use IL generation instead? Because I despise working directly with IL and I think that would have dramatically curtailed what was easily possible. It’s also possible that we end up having to go there eventually.

Setting the Stage

Consider this simplistic code to start a new Marten DocumentStore against a blank database and persist a single User document:

var store = DocumentStore.For("connection string");

await using var session = store.LightweightSession();
var user = new User
{
    UserName = "pmahomes", 
    FirstName = "Patrick", 
    LastName = "Mahomes"
};

session.Store(user);
await session.SaveChangesAsync();

Hopefully that code is simple enough for new users to follow and immediately start being productive with Marten. The major advantage of document databases over the more traditional RDBMS, with or without an ORM, is the ability to just get stuff done without spending a lot of time configuring databases or object-to-database mappings, or writing anywhere near as much underlying code just to read and write data. To that end, there’s a lot of stuff going on behind the scenes of that code up above.

First off, there’s some automatic database schema management. In the default configuration used up above, Marten is quietly checking the underlying database on the first usage of the User document type to see if the database matches Marten’s configuration for the User document, and applies database migrations at runtime to change the database as necessary.

Secondly, there’s some runtime code generation happening to “bake in” the internal handling of how User documents are read from and written to the database. It’s not apparent here, but there are a lot of knobs you can twist in Marten to change how a document type is stored and retrieved from the database (soft deletes, turning on more metadata tracking, turning off default metadata tracking to be leaner, etc.). That behavior even varies between the lightweight session I used up above and the behavior of IDocumentStore.OpenSession() that adds identity map behavior to the session. To be more efficient overall, Marten generates the tightest possible C# code to handle each document type, then in the default mode, actually compiles that code in memory with Roslyn and uses the dynamically built assembly.
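As a tiny example of those knobs, the sketch below opts the User document into soft deletes. Any of these tweaks changes both the database schema and the code Marten generates for that document type; treat this as one illustrative option rather than an exhaustive list:

var store = DocumentStore.For(opts =>
{
    opts.Connection("connection string");

    // Keep deleted User documents in the table, just marked as deleted,
    // which changes both the schema and the generated persistence code
    opts.Schema.For<User>().SoftDeleted();
});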

Cool, right? I’d argue that Marten can make teams be far more productive than they would be with the more typical EF Core or Dapper backed approach. Now let’s move on to the unfortunately very real downsides of Marten’s approach and what we’ve done to improve matters:

  • The dynamic Roslyn code generation can sometimes incur a major “cold start” issue on the very first usage. It’s definitely not consistent, as some people do not see any noticeable impact and other folks tell me they get a 9 second delay on the first usage. This cold start issue is especially problematic for folks using Marten in a Serverless architecture
  • The dynamically generated code can’t be used for any kind of potentially valuable AOT optimization
  • Roslyn usage sometimes causes a big ol’ memory leak no matter what we try. This isn’t consistent, so I don’t know why
  • The database change tracking does do some in memory locking, and that’s been prone to deadlock issues in some flavors of .Net (Blazor, WPF)
  • Some of you won’t want to give your application rights to modify a database at runtime
  • In Marten V4 there were a few too many places where Marten was executing the database change detection asynchronously, but from within synchronous calls using the dreaded .GetAwaiter().GetResult() approach. Occasional deadlock issues occurred, mostly in Marten usage within Blazor.

Database Migration Improvements

Alright, let’s tackle the database migration issues first. Marten has long had some command line support so that you could detect and apply any outstanding database changes from your application itself with this call:

dotnet run -- marten-apply

If you use the command line tooling for migrations, you can now optimize Marten to just turn off all runtime database migrations like so:

using var host = Host.CreateDefaultBuilder()
    .ConfigureServices(services =>
    {
        services
            .AddMarten(opts =>
            {
                opts.Connection("connection string");
                opts.AutoCreateSchemaObjects = AutoCreate.None;
            });
    }).StartAsync();

Other folks won’t want to use the command line tooling, so there’s another option in Marten V5 to do all database migrations once at application startup, but otherwise completely eliminate all other potential locking at runtime. This time I have to use the IHost integration:

using var host = Host.CreateDefaultBuilder()
    .ConfigureServices(services =>
    {
        services
            .AddMarten(opts =>
            {
                opts.Connection("connection string");
                
                // Mild compromise, now I've got to tell
                // Marten about the User document
                opts.RegisterDocumentType<User>();
            })

            // This tells the app to do all database migrations
            // at application startup time
            .ApplyAllDatabaseChangesOnStartup();
    }).StartAsync();

In case you’re wondering, this option is safe to use even if you have multiple application nodes starting up simultaneously. The V5 version here relies on global locks in Postgresql itself to prevent simultaneous database changes that previously resulted in interestingly chaotic failure:(
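Just to illustrate the mechanism rather than Marten’s actual internals, the “global lock” idea leans on Postgresql advisory locks, roughly like this sketch using Npgsql directly (the lock id is arbitrary, it just has to be the same number on every node):

await using var conn = new NpgsqlConnection("connection string");
await conn.OpenAsync();

await using var tx = await conn.BeginTransactionAsync();

// Blocks until any other node holding the same advisory lock commits or
// rolls back, so schema changes are applied by one node at a time
await using (var lockCmd = new NpgsqlCommand("select pg_advisory_xact_lock(4004)", conn, tx))
{
    await lockCmd.ExecuteNonQueryAsync();
}

// ...detect and apply any outstanding schema changes here...

await tx.CommitAsync();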

Pre-building the Generated Types

Now, onto dealing with the dynamic codegen aspect of things. V4 created a “build types ahead” model where you can generate all the dynamic code with this command line call:

dotnet run -- codegen write

You can now completely dodge the runtime code generation issue by this sequence of events:

  1. In your deployment scripts, run dotnet run -- codegen write first
  2. Compile your application, which will embed the newly generated code right into your application’s entry assembly
  3. Use the below setting to completely disable all dynamic codegen:
using var host = Host.CreateDefaultBuilder()
    .ConfigureServices(services =>
    {
        services
            .AddMarten(opts =>
            {
                opts.Connection("connection string");

                // Turn off all dynamic code generation, but this
                // will blow up if the necessary type isn't compiled
                // into the entry assembly
                opts.GeneratedCodeMode = TypeLoadMode.Static;
            });
    }).StartAsync();

Again though, this depends on you having all document types registered with Marten instead of depending on runtime discovery as we did in the very first sample in this post, and that’s a bit of friction. What we’ve found is that folks find the original pre-built generation model to be clumsy, so we went back to the drawing board for Marten V5 and came up with the…

“Auto” Generated Code Mode

For V5, we have the option shown below:

using var host = Host.CreateDefaultBuilder()
    .ConfigureServices(services =>
    {
        services
            .AddMarten(opts =>
            {
                opts.Connection("connection string");

                // use pre-built code if it exists, or
                // generate code if it doesn't and "just work"
                opts.GeneratedCodeMode = TypeLoadMode.Auto;
            });
    }).StartAsync();

My thinking here is that you’d just keep this on all the time, and as long as you’re running the application locally or through your integration test suite (you have one of those, right?), you’d have the dynamic types written to your main project’s code automatically (in an /Internal/Generated folder). Unless you purposely add those to your source control’s ignore list, that code will also be checked in. Woohoo, right?

Now, finally let’s put this all together and bundle all of what I would recommend as Marten best practices into the new…

Optimized Artifact Workflow

New in Marten V5 is what I named the “optimized artifact workflow” (I say “I” because I don’t think other folks like the name:)) as shown below:

using var host = Host.CreateDefaultBuilder()
    .ConfigureServices(services =>
    {
        services
            .AddMarten(opts =>
            {
                opts.Connection("connection string");
            })
            // This is the call you want!
            .OptimizeArtifactWorkflow(TypeLoadMode.Static)
            .ApplyAllDatabaseChangesOnStartup();
    })
    
    // In testing harnesses, or with AWS Lambda / Azure Functions,
    // you may have to help out .Net by explicitly setting
    // the main application assembly
    .UseApplicationProject(typeof(User).Assembly)
    
    .StartAsync();

With the OptimizeArtifactWorkflow(TypeLoadMode.Static) usage above, Marten is running with automatic database management and “Auto” code generation if the host’s environment name is “Development,” as it would typically be on a local developer box. In “Production” mode, Marten runs with all automatic database management disabled at runtime besides the initial database change application at startup. In “Production” mode, Marten also turns off all dynamic code generation, with the assumption that all necessary types can be found in the entry assembly.

The goal here was to have a quick setting that optimized Marten usage in both development and production time without having to add in a bunch of nested conditional logic for IHostEnvironment.IsDevelopment() throughout the IHost configuration code.
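To make that concrete, here’s roughly the kind of environment-sensitive configuration that the single OptimizeArtifactWorkflow() call replaces. This is only meant to illustrate the behavior described above, not to be Marten’s literal implementation:

using var host = Host.CreateDefaultBuilder()
    .ConfigureServices((context, services) =>
    {
        services.AddMarten(opts =>
        {
            opts.Connection("connection string");

            if (context.HostingEnvironment.IsDevelopment())
            {
                // Local development: fix up the database and generate code
                // on the fly as Marten discovers document types
                opts.AutoCreateSchemaObjects = AutoCreate.CreateOrUpdate;
                opts.GeneratedCodeMode = TypeLoadMode.Auto;
            }
            else
            {
                // Production: no runtime schema changes beyond the startup
                // application, and only pre-built generated types
                opts.AutoCreateSchemaObjects = AutoCreate.None;
                opts.GeneratedCodeMode = TypeLoadMode.Static;
            }
        }).ApplyAllDatabaseChangesOnStartup();
    }).StartAsync();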

Exterminating Sync over Async Calls

Back to the very original sample code:

var store = DocumentStore.For("connection string");

await using var session = store.LightweightSession();
var user = new User
{
    UserName = "pmahomes", 
    FirstName = "Patrick", 
    LastName = "Mahomes"
};

session.Store(user);
await session.SaveChangesAsync();

In Marten V4, the first call to session.Store(user) would trigger the database schema detection, which behind the scenes would end up doing a .GetAwaiter().GetResult() trick to call asynchronous code within the synchronous Store() command (not gonna get into that here, but we eliminated all synchronous database schema detection functionality for unrelated reasons in V4).

In V5, we rewired a lot of the internal guts such that the database schema detection is happening instead in the call to IDocumentSession.SaveChangesAsync(), which is of course, asynchronous. That allowed us to eliminate usages of “sync over async” calls. Likewise, we made similar changes throughout other areas of Marten.

Summary

The hope here is that we can make our users more successful with Marten, and sidestep the problems our users have had specifically with using Marten with AWS Lambda, Azure Functions, Blazor, and inside of WPF applications. I’m also hoping that the OptimizeArtifactWorkflow() usage greatly simplifies the adoption of Marten “best practices.”