Command Line Diagnostics in Wolverine

Wolverine 0.9.12 just went up on Nuget with new bug fixes, documentation improvements, improved Rabbit MQ usage of topics, local queue usage, and a lot of new functionality around the command line diagnostics. See the whole release notes here.

In this post, I want to zero in on “command line diagnostics.” Speaking from a mix of concerns about being both a user of Wolverine and one of the people needing to support other people using Wolverine online, here’s a non-exhaustive list of real challenges I’ve already seen or anticipate as Wolverine gets out into the wild more in the near future:

  • How is Wolverine configured? What extensions are found?
  • What middleware is registered, and is it hooked up correctly?
  • How is Wolverine handling a specific message exactly?
  • How is Wolverine HTTP handling an HTTP request for a specific route?
  • Is Wolverine finding all the handlers? Where is it looking?
  • Where is Wolverine trying to send each message?
  • Are we missing any configuration items? Is the database reachable? Is the url for a web service proxy in our application valid?
  • When Wolverine has to interact with databases or message brokers, are those servers configured correctly to run the application?

That’s a big list of potentially scary issues, so let’s run down a list of command line diagnostic tools that come out of the box with Wolverine to help developers be more productive in real world development. First off, Wolverine’s command line support is all through the Oakton library, and you’ll want to enable Oakton command handling directly in your main application through this line of code at the very end of a typical Program file:

// This is an extension method within Oakton
// And it's important to relay the exit code
// from Oakton commands to the command line
// if you want to use these tools in CI or CD
// pipelines to denote success or failure
return await app.RunOaktonCommands(args);

You’ll know Oakton is configured correctly if you go to the command line terminal of your preference at the root of your project and type:

dotnet run -- help

In a simple Wolverine application, you’d get these options out of the box:

The available commands are:
                                                                                                    
  Alias       Description                                                                           
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  check-env   Execute all environment checks against the application                                
  codegen     Utilities for working with JasperFx.CodeGeneration and JasperFx.RuntimeCompiler       
  describe    Writes out a description of your running application to either the console or a file  
  help        List all the available commands                                                       
  resources   Check, setup, or teardown stateful resources of this system                           
  run         Start and run this .Net application                                                   
  storage     Administer the Wolverine message storage                                                       
                                                                                                    

Use dotnet run -- ? [command name] or dotnet run -- help [command name] to see usage help about a specific command

Let me admit that there’s a little bit of “magic” in the way that Wolverine uses naming or type conventions to “know” how to call into your application code. It’s great (in my opinion) that Wolverine doesn’t force you to pollute your code with framework concerns or require you to shape your code around Wolverine’s APIs the way most other .NET frameworks do.

Cool, so let’s move on to…

Describe the Configured Application

The annoying --framework flag is only necessary if your application targets multiple .NET frameworks, but no sane person would ever do that for a real application.

Partially for my own sanity, there’s a lot more support for Wolverine in the describe command. To see this in usage, consider the sample DiagnosticsApp from the Wolverine codebase. If I use the dotnet run --framework net7.0 -- describe command from that project, I get this copious textual output.

Just to summarize, what you’ll see in the command line report is:

  • “Wolverine Options” – the basic properties as configured, including what Wolverine thinks is the application assembly and any registered extensions
  • “Wolverine Listeners” – a tabular list of all the configured listening endpoints, including local queues, within the system and information about how they are configured
  • “Wolverine Message Routing” – a tabular list of all the message routing for known messages published within the system
  • “Wolverine Sending Endpoints” – a tabular list of all known, configured endpoints that send messages externally
  • “Wolverine Error Handling” – a preview of the message failure policies active within the system
  • “Wolverine Http Endpoints” – shows all Wolverine HTTP endpoints. This is only active if WolverineFx.Http is used within the system

The latest Wolverine release added some optional message type discovery, specifically to make the describe command more usable. Using a mix of marker interface types and/or attributes, you can tell Wolverine about message types that will be sent at runtime but cannot easily be recognized as such strictly from configuration:

// These are all published messages that aren't
// obvious to Wolverine from message handler endpoint
// signatures
public record InvoiceShipped(Guid Id) : IEvent;
public record CreateShippingLabel(Guid Id) : ICommand;

[WolverineMessage]
public record AddItem(Guid Id, string ItemName);

Environment Checks

Have you ever made a deployment to production just to find out that a database connection string was wrong? Or the credentials to a message broker were wrong? Or your service wasn’t running under an account that had read access to a file share your application needed to scan? Me too!

Wolverine adds several environment checks so that you can use Oakton’s Environment Check functionality to self-diagnose potential configuration issues with:

dotnet run -- check-env

You could conceivably use this as part of your continuous delivery pipeline to quickly verify the configuration of an application and fail fast & roll back if the checks fail.
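As a rough sketch of how an application-specific check could sit next to the ones Wolverine adds, Oakton lets you register ad hoc environment checks against the service collection. The check description, the "Marten" connection string name, and the use of Npgsql here are assumptions for illustration and are not taken from the DiagnosticsApp sample:

```csharp
using Microsoft.Extensions.Configuration;
using Npgsql;
using Oakton;
using Oakton.Environment;

var builder = WebApplication.CreateBuilder(args);

// Hypothetical check: any exception thrown from this lambda
// is reported as a failed check by `dotnet run -- check-env`
builder.Services.CheckEnvironment<IConfiguration>(
    "Can connect to the application database",
    config =>
    {
        // Assumes a connection string named "Marten" exists in configuration
        using var conn = new NpgsqlConnection(config.GetConnectionString("Marten"));
        conn.Open();
    });

var app = builder.Build();

// Relay Oakton's exit code so a failed check can fail a pipeline step
return await app.RunOaktonCommands(args);
```

Because the exit code is returned from the Program file, a pipeline step that runs check-env should be able to treat a non-zero result as a failed deployment gate.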

How is Wolverine calling my message handlers?!?

Wolverine admittedly involves some “magic” about how it calls into your message handlers, and it’s not unlikely you may be confused about whether or how some kind of registered middleware is working within your system. Or maybe you’re just mildly curious about how Wolverine works at all.

To that end, you can preview — or just generate ahead of time for better “cold starts” — the dynamic source code that Wolverine generates for your message handlers or HTTP handlers with:

dotnet run -- codegen preview

Or just write the code to the file system so you can look at it to your heart’s content with your IDE with:

dotnet run -- codegen write

Which should write the source code files to /Internal/Generated/WolverineHandlers. Here’s a sample from the same diagnostics app sample:

// <auto-generated/>
#pragma warning disable

namespace Internal.Generated.WolverineHandlers
{
    public class CreateInvoiceHandler360502188 : Wolverine.Runtime.Handlers.MessageHandler
    {


        public override System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation)
        {
            var createInvoice = (IntegrationTests.CreateInvoice)context.Envelope.Message;
            var outgoing1 = IntegrationTests.CreateInvoiceHandler.Handle(createInvoice);
            // Outgoing, cascaded message
            return context.EnqueueCascadingAsync(outgoing1);

        }

    }
}
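Reading that generated code backwards, the handler it wraps is presumably just a static method that takes the message and returns another message to cascade. A minimal sketch of what such a handler could look like follows; the InvoiceCreated type and both record shapes are invented for illustration and may not match the actual types in the sample project:

```csharp
using System;

namespace IntegrationTests
{
    // Hypothetical message shapes; the real sample types may differ
    public record CreateInvoice(Guid Id);
    public record InvoiceCreated(Guid Id);

    public static class CreateInvoiceHandler
    {
        // The generated code calls this method directly with the incoming
        // message, then enqueues the return value as a cascading message
        public static InvoiceCreated Handle(CreateInvoice command)
        {
            return new InvoiceCreated(command.Id);
        }
    }
}
```

Note that nothing in the handler itself references Wolverine; the generated MessageHandler subclass is the only place the framework types appear.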

Database or Message Broker Setup

Your application will require some configuration of external resources if you’re using any mix of Wolverine’s transactional inbox/outbox support which targets Postgresql or Sql Server or message brokers like Rabbit MQ, Amazon SQS, or Azure Service Bus. Not to worry (too much), Wolverine exposes some command line support for making any necessary configuration setup in these resources with the Oakton resources command.

In the diagnostics app, we could ensure that our connected Postgresql database has all the necessary schema tables and the Rabbit MQ broker has all the necessary queues, exchanges, and bindings that our application needs to function with:

dotnet run -- resources setup

In testing or normal development work, I may also want to reset the state of these resources to delete now obsolete messages in either the database or the Rabbit MQ queues, and we can fortunately do that with:

dotnet run -- resources clear

There are also resource options for:

  • teardown — remove all the database objects or message broker objects that the Wolverine application placed there
  • statistics — glean some information about the number of records or messages in the stateful resources
  • check — do environment checks on the configuration of the stateful resources. This is purely a diagnostic function
  • list — just show you information about the known, stateful resources

Summary

Is any of this wall of textual reports being spit out at the command line sexy? Not in the slightest. Will this functionality help development teams be more productive with Wolverine? Will it help me and other Wolverine team members support remote users in the future? I’m hopeful that the answer to the first of those two questions is “yes” and pretty confident that it’s a “hell, yes” to the second.

I would also hope that folks see this functionality and agree with my assessment that Wolverine (and Marten) are absolutely appropriate for real life usage and well beyond the toy project phase.

Anyway, more on Wolverine next week starting with an exploration of Wolverine’s local queuing support for asynchronous processing.

Wolverine’s New HTTP Endpoint Model

UPDATE: If you pull down the sample code, it’s not quite working with Swashbuckle yet. It *does* publish the metadata and the actual endpoints work, but it’s not showing up in the OpenAPI spec. Always something.

I just published Wolverine 0.9.10 to Nuget (after a much bigger 0.9.9 yesterday). There are several bug fixes, some admitted breaking changes to advanced configuration items, and one significant change to the “mediator” behavior that’s described in the section at the very bottom of this post.

The big addition is a new library that enables Wolverine’s runtime model directly for HTTP endpoints in ASP.Net Core services without having to jump through the typical sequence of delegating from a Minimal API method to Wolverine’s mediator functionality like this:

app.MapPost("/items/create", (CreateItemCommand cmd, IMessageBus bus) => bus.InvokeAsync(cmd));

app.MapPost("/items/create2", (CreateItemCommand cmd, IMessageBus bus) => bus.InvokeAsync<ItemCreated>(cmd));

Instead, Wolverine now has the WolverineFx.Http library to use Wolverine’s runtime model, including its unique middleware approach, directly from HTTP endpoints.

Shamelessly stealing the Todo sample application from the Minimal API documentation, let’s build a similar service with WolverineFx.Http, but I’m also going to switch to Marten for persistence just out of personal preference.

To bootstrap the application, I used the dotnet new webapi template, then added the WolverineFx.Marten and WolverineFx.Http nugets. The application bootstrapping for basic integration of Wolverine, Marten, and the new Wolverine HTTP model becomes:

using Marten;
using Oakton;
using Wolverine;
using Wolverine.Http;
using Wolverine.Marten;

var builder = WebApplication.CreateBuilder(args);

// Adding Marten for persistence
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("Marten"));
        opts.DatabaseSchemaName = "todo";
    })
    .IntegrateWithWolverine()
    .ApplyAllDatabaseChangesOnStartup();

// Wolverine usage is required for WolverineFx.Http
builder.Host.UseWolverine(opts =>
{
    // This middleware will apply to the HTTP
    // endpoints as well
    opts.Policies.AutoApplyTransactions();
    
    // Setting up the outbox on all locally handled
    // background tasks
    opts.Policies.UseDurableLocalQueues();
});

// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// Let's add in Wolverine HTTP endpoints to the routing tree
app.MapWolverineEndpoints();

return await app.RunOaktonCommands(args);

Do note that the only thing in that sample that pertains to WolverineFx.Http itself is the call to IEndpointRouteBuilder.MapWolverineEndpoints().

Let’s move on to “Hello, World” with a new Wolverine HTTP endpoint from this class we’ll add to the sample project:

public class HelloEndpoint
{
    [WolverineGet("/")]
    public string Get() => "Hello.";
}

At application startup, WolverineFx.Http will find the HelloEndpoint.Get() method and treat it as a Wolverine HTTP endpoint with the route pattern GET: / specified in the [WolverineGet] attribute.

As you’d expect, that route will write the return value back to the HTTP response and behave as specified by this Alba specification:

[Fact]
public async Task hello_world()
{
    var result = await _host.Scenario(x =>
    {
        x.Get.Url("/");
        x.Header("content-type").SingleValueShouldEqual("text/plain");
    });
    
    result.ReadAsText().ShouldBe("Hello.");
}

Moving on to the actual Todo problem domain, let’s assume we’ve got a class like this:

public class Todo
{
    public int Id { get; set; }
    public string? Name { get; set; }
    public bool IsComplete { get; set; }
}

In a sample class called TodoEndpoints let’s add an HTTP service endpoint for listing all the known Todo documents:

[WolverineGet("/todoitems")]
public static Task<IReadOnlyList<Todo>> Get(IQuerySession session) 
    => session.Query<Todo>().ToListAsync();

As you’d guess, this method will serialize all the known Todo documents from the database into the HTTP response and return a 200 status code. In this particular case the code is a little bit noisier than the Minimal API equivalent, but that’s okay, because you can happily use Minimal API and WolverineFx.Http together in the same project. WolverineFx.Http, however, will shine in more complicated endpoints.

Consider this endpoint just to return the data for a single Todo document:

// Wolverine can infer the 200/404 status codes for you here
// so there's no code noise just to satisfy OpenAPI tooling
[WolverineGet("/todoitems/{id}")]
public static Task<Todo?> GetTodo(int id, IQuerySession session, CancellationToken cancellation) 
    => session.LoadAsync<Todo>(id, cancellation);

At this point it’s effectively de rigueur for any web service to support OpenAPI documentation directly in the service. Fortunately, WolverineFx.Http is able to glean most of the necessary metadata to support OpenAPI documentation with Swashbuckle from the method signature up above. The method up above will also cleanly set a status code of 404 if the requested Todo document does not exist.

Now, the bread and butter for WolverineFx.Http is using it in conjunction with Wolverine itself. In this sample, let’s create a new Todo based on submitted data, but also publish a new event message with Wolverine to do some background processing after the HTTP call succeeds. And, oh, yeah, let’s make sure this endpoint is actively using Wolverine’s transactional outbox support for consistency:

[WolverinePost("/todoitems")]
public static async Task<IResult> Create(CreateTodo command, IDocumentSession session, IMessageBus bus)
{
    var todo = new Todo { Name = command.Name };
    session.Store(todo);

    // Going to raise an event within our system to be processed later
    await bus.PublishAsync(new TodoCreated(todo.Id));
    
    return Results.Created($"/todoitems/{todo.Id}", todo);
}

The endpoint code above is automatically enrolled in the Marten transactional middleware by simple virtue of having a dependency on Marten’s IDocumentSession. By also taking in the IMessageBus dependency, WolverineFx.Http is wrapping the transactional outbox behavior around the method so that the TodoCreated message is only sent after the database transaction succeeds.
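The CreateTodo and TodoCreated types are not shown above. A plausible sketch follows, with the shapes inferred from how the endpoint uses them and a purely hypothetical handler standing in for the background processing:

```csharp
using System;

// Inferred from the use of command.Name in the endpoint above
public record CreateTodo(string Name);

// Inferred from new TodoCreated(todo.Id) in the endpoint above
public record TodoCreated(int Id);

// Hypothetical handler for the background work; the class and method
// names follow the Handler/Handle convention used elsewhere in this post
public static class TodoCreatedHandler
{
    public static void Handle(TodoCreated created)
    {
        // Placeholder for whatever the background processing really is
        Console.WriteLine($"Got a new Todo with id {created.Id}");
    }
}
```

With the outbox behavior described above, a handler like this would only see the TodoCreated message after the database transaction for the new Todo has succeeded.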

Lastly for this page, consider the need to update a Todo from a PUT call. Your HTTP endpoint may vary its handling and response by whether or not the document actually exists. Just to show off Wolverine’s “composite handler” functionality and also how WolverineFx.Http supports middleware, consider this more complex endpoint:

public static class UpdateTodoEndpoint
{
    public static async Task<(Todo? todo, IResult result)> LoadAsync(UpdateTodo command, IDocumentSession session)
    {
        var todo = await session.LoadAsync<Todo>(command.Id);
        return todo != null 
            ? (todo, new WolverineContinue()) 
            : (todo, Results.NotFound());
    }

    [WolverinePut("/todoitems")]
    public static void Put(UpdateTodo command, Todo todo, IDocumentSession session)
    {
        // Copy the submitted values onto the loaded document
        todo.Name = command.Name;
        todo.IsComplete = command.IsComplete;
        session.Store(todo);
    }
}

In the WolverineFx.Http model, the generated code tests any IResult object returned from middleware. If that result is anything other than the built in WolverineContinue type, the generated code executes the IResult and stops all further processing. This is intended to enable validation or authorization type middleware where you may need to filter calls to the inner HTTP handler.
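To make that flow concrete, here is a rough, framework-free model of the decision the generated code makes. Every type in it (IResultLike, ContinueResult, NotFoundResult, EndpointPipeline) is a made-up stand-in for illustration, not the real ASP.NET Core IResult or Wolverine’s WolverineContinue, and the actual generated code will look different:

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for IResult; returns a string instead of writing to an HTTP response
public interface IResultLike
{
    Task<string> ExecuteAsync();
}

// Stand-in for WolverineContinue: a marker meaning "keep going"
public sealed class ContinueResult : IResultLike
{
    public Task<string> ExecuteAsync() => Task.FromResult("continue");
}

// Stand-in for Results.NotFound()
public sealed class NotFoundResult : IResultLike
{
    public Task<string> ExecuteAsync() => Task.FromResult("404");
}

public static class EndpointPipeline
{
    // Runs the middleware first and only calls the inner handler
    // when the middleware returns the "continue" marker
    public static async Task<string> RunAsync(
        Func<Task<IResultLike>> middleware,
        Func<Task<string>> handler)
    {
        var result = await middleware();
        if (result is not ContinueResult)
        {
            // Execute the middleware's own result and stop all further processing
            return await result.ExecuteAsync();
        }

        return await handler();
    }
}
```

In this model, a NotFoundResult from the middleware means RunAsync returns the middleware’s response and the handler never runs, while a ContinueResult lets the handler run as normal.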

With the sample application out of the way, here’s a rundown of the significant things about this library:

  • It’s actually a pretty small library in the greater scheme of things and all it really does is connect ASP.Net Core’s endpoint routing to the Wolverine runtime model. Wolverine’s runtime model is likely going to be somewhat more efficient than Minimal API and much more efficient than MVC Core
  • It can be happily combined with Minimal API, MVC Core, or any other ASP.Net Core model that exploits endpoint routing, even within the same application
  • Wolverine is allowing you to use the Minimal API IResult model
  • The JSON serialization is strictly System.Text.Json and uses the same options as Minimal API within an ASP.Net Core application
  • It’s possible to use Wolverine’s middleware strategy with the HTTP endpoints
  • Wolverine is trying to glean necessary metadata from the method signatures to feed OpenAPI usage within ASP.Net Core without developers having to jump through hoops adding attributes or goofy TypedResult noise code just for Swashbuckle
  • This model plays nicely with Wolverine’s transactional outbox model for common cases where you need to both make database changes and publish additional messages for background processing in the same HTTP call. That’s a bit of important functionality that I feel is missing or is clumsy at best in many leading .NET server side technologies.

For the handful of you reading this that still remember FubuMVC, Wolverine’s HTTP model retains some of FubuMVC’s old strengths in terms of still not ramming framework concerns into your application code, but learned some hard lessons from FubuMVC’s ultimate failure:

  • FubuMVC was an ambitious, sprawling framework that was trying to be its own ecosystem with its own bootstrapping model, logging abstractions, and even IoC abstractions. WolverineFx.Http is just a citizen within the greater ASP.Net Core ecosystem and uses common .NET abstractions, concepts, and idiomatic naming conventions at every possible turn
  • FubuMVC relied too much on conventions, which was great when the convention was exactly what you needed, and kinda hurtful when you needed something besides the exact conventions. Not to worry, WolverineFx.Http lets you drop right down to the HttpContext level at will or use any of the IResult objects in existing ASP.Net Core whenever the Wolverine conventions don’t fit.
  • FubuMVC could technically be used with old ASP.Net MVC, but it was a Frankenstein’s monster to pull off. Wolverine can be mixed and matched at will with either Minimal API, MVC Core, or even other OSS projects that exploit ASP.Net Core endpoint routing.
  • Wolverine is trying to play nicely in terms of OpenAPI metadata and security related metadata for usage of standard ASP.Net Core middleware like the authorization or authentication middleware
  • FubuMVC’s “Behavior” model gave you a very powerful “Russian Doll” middleware ability that was maximally flexible — and also maximally inefficient in runtime. Wolverine’s runtime model takes a very different approach to still allow for the “Russian Doll” flexibility, but to do so in a way that is more efficient at runtime than basically every other commonly used framework today in the .NET community.
  • When things went boom in FubuMVC, you got monumentally huge stack traces that could overwhelm developers who hadn’t had a week’s worth of good nights’ sleep. It sounds minor, but Wolverine is valuable in the sense that the stack traces from HTTP (or message handler) failures will have very minimal Wolverine related framework noise in the stack trace for easier readability by developers.

Big Change to In Memory Mediator Model

I’ve been caught off guard a bit by how folks have mostly been interested in Wolverine as an alternative to MediatR with typical usage like this where users just delegate to Wolverine in memory within a Minimal API route:

app.MapPost("/items/create2", (CreateItemCommand cmd, IMessageBus bus) => bus.InvokeAsync<ItemCreated>(cmd));

With the corresponding message handler being this:

public class ItemHandler
{
    // This attribute applies Wolverine's EF Core transactional
    // middleware
    [Transactional]
    public static ItemCreated Handle(
        // This would be the message
        CreateItemCommand command,

        // Any other arguments are assumed
        // to be service dependencies
        ItemsDbContext db)
    {
        // Create a new Item entity
        var item = new Item
        {
            Name = command.Name
        };

        // Add the item to the current
        // DbContext unit of work
        db.Items.Add(item);

        // This event being returned
        // by the handler will be automatically sent
        // out as a "cascading" message
        return new ItemCreated
        {
            Id = item.Id
        };
    }
}

Prior to the latest release, the ItemCreated event returned by the handler above was not published as a message when the handler was invoked through IMessageBus.InvokeAsync<ItemCreated>(), because my original assumption was that in that case you were using the return value explicitly as a return value. Early users have been surprised that ItemCreated was not published, so I changed the behavior to publish it, which makes the cascading message behavior more consistent and closer to what folks seem to actually want.

New Wolverine Release & Future Plans

After plenty of Keystone Cops shenanigans with CI automation today that made me question my own basic technical competency, there’s a new Wolverine 0.9.8 release on Nuget today with a variety of fixes and some new features. The documentation website was also re-published.

First, some thanks:

  • Wojtek Suwala made several fixes and improvements to the EF Core integration
  • Ivan Milosavljevic helped fix several hanging tests on CI, built the MemoryPack integration, and improved the FluentValidation integration
  • Anthony made his first OSS contribution (?) to help fix quite a few issues with the documentation
  • My boss and colleague Denys Grozenok for all his support with reviewing docs and reporting issues
  • Kebin for improving the dead letter queue mechanics

The highlights:

Dogfooding baby!

Conveniently enough, I’m part of a little officially sanctioned skunkworks team at work experimenting with converting a massive distributed monolithic application to the full Marten + Wolverine “critter stack.” I’m very encouraged by the effort so far, and it’s driven some recent features in Wolverine’s execution model to handle complexity in enterprise systems. More on that soon.

It’s also pushing the story for interoperability with NServiceBus on the other end of Rabbit MQ queues. Strangely enough, no one is interested in trying to convert a humongous distributed system to Wolverine in one round of work. Go figure.

When will Wolverine hit 1.0?

There’s a little bit of awkwardness in that Marten V6.0 (don’t worry, that’s a much smaller release than 4/5) needs to be released first and I haven’t been helping Oskar & Babu with that recently, but I think we’ll be able to clear that soon.

My “official” plan is to finish the documentation website by the end of February and make the 1.0 release by March 1st. Right now, Wolverine is having its tires kicked by plenty of early users and there’s plenty of feedback (read: bugs or usability issues) coming in that I’m trying to address quickly. Feature wise, the only things I’m hoping to have done by 1.0 are:

  • Using more native capabilities of Azure Service Bus, Rabbit MQ, and AWS SQS for dead letter queues and delayed messaging. That’s mostly to solidify some internal abstractions.
  • It’s a stretch goal, but have Wolverine support Marten’s multi-tenancy through a database per tenant strategy. We’ll want that for internal MedeAnalytics usage, so it might end up being a priority
  • Some better integration with ASP.Net Core Minimal API

Marten and Friend’s (Hopefully) Big Future!

Marten was conceived and launched way back in 2016 as an attempt to quickly improve the performance and stability of a mission critical web application by utilizing Postgresql and its new JSON capabilities as a replacement for a 3rd party document database – and do that in a hurry before the next busy season. My former colleagues and I did succeed in that endeavor, but more importantly for the longer run, Marten was also launched as an open source project on GitHub and quickly attracted attention from other developers. The addition of an originally small feature set for event sourcing dramatically increased interest and participation in Marten. 

Fast forward to today, and we have a vibrant community of engaged users and a core team of contributors that are constantly improving the tool and discussing ideas about how to make it even better. The giant V4 release last year brought an overhaul of almost all the library internals and plenty of new capabilities. V5 followed early in 2022 with more multi-tenancy options and better tooling for development lifecycles and database management based on early issues with V4. 

At this point, I’d list the strong points of Marten that we’ve already achieved as:

  • A very useful document database option that provides the powerful developer productivity you expect from NoSQL solutions while also supporting a strong consistency model that’s usually missing from NoSQL databases. 
  • A wide range of viable hosting options by virtue of being on top of Postgresql. No cloud vendor lock-in with Marten!
  • Quite possibly the easiest way to build an application using Event Sourcing in .NET with both event storage and user defined view projections in the box
  • A great local development story through the simple ability to run Postgresql in a Docker container and Marten’s focus on an “it just works” style database schema management subsystem
  • The aforementioned core team and active user base makes Marten a viable OSS tool for teams wanting some reassurance that Marten is going to be well supported in the future

Great! But now it’s time to talk about the next steps we’re planning to take Marten to even greater heights in the forthcoming Marten V6 that’s being planned now. The overarching theme is to remove the most common reasons for not choosing Marten. By and large, I think the biggest themes for Marten are:

  1. Scalability, so Marten can be used for much larger data sets. From user feedback, Marten is able to handle data sets of 10 million events today, but there are opportunities to go far, far larger than that.
  2. Improvements to operational support. Database migrations when documents change, rebuilding projections without downtime, usage metrics, and better support for using multiple databases for multi-tenancy
  3. Marten is in good shape as a purely storage option for Event Sourcing, but users are very often asking for an array of subscription options to propagate events captured by Marten
  4. More powerful options for aggregating event data into more complex projected views
  5. Improving the Linq and other querying support is a seemingly never-ending battle
  6. The lack of professional support for Marten. Obviously a lot of shops and teams are perfectly comfortable with using FOSS tools knowing that they may have to roll up their sleeves and pitch in with support, but other shops are not comfortable with this at all and will not allow FOSS usage for critical functions. More on this later.

First though, Marten is getting a new “critter” friend in the larger JasperFx project family:

Wolverine is a new/old OSS command bus and messaging tool for .NET. It’s what was formerly being developed as Jasper, but the Marten team decided to rebrand the tool as a natural partner with Marten (both animals plus Weasel are members of the Mustelidae family). While both Marten and Wolverine are happily usable without each other, we think that the integration of these tools gives us the opportunity to build a full fledged platform for building applications in .NET using a CQRS architecture with Event Sourcing. Moreover, we think there’s a significant gap in .NET for this kind of tooling and we hope to fill that. 

So, onto future plans…

There are a couple of immediate ways to improve the scalability of Marten we’re planning to build in Marten V6. The first idea is to utilize Postgresql table sharding in a couple different ways.

First, we can enable sharding on document tables based on user defined criteria through Marten configuration. The big challenge there is to provide a good migration strategy for doing this as it requires at least a 3 step process of copying the existing table data off to the side before creating the new tables. 

The next idea is to shard the event storage tables as well, with the immediate idea being to shard off of archived status to effectively create a “hot” storage of recent events and a “cold” storage of older events that are much less frequently accessed. This would allow Marten users to keep the active “hot” event storage to a much smaller size and therefore greatly improve potential performance even as the database continues to grow.

We’re not done “sharding” yet, but this time we need to shift to the asynchronous projection support in Marten. The core team has some ideas to improve the throughput of the asynchronous projection code as it is, but today it’s limited to only running on one single application node with “hot/cold” rollover support. With some help from Wolverine, we’re hoping to build a “sharded” asynchronous projection that can shard the processing of single projections and distribute the projection work across potentially many nodes as shown in the following diagram:

The asynchronous projection sharding is going to be a big deal for Marten all by itself, but there’s some other potentially big wins for Marten V6 with better tooling for projection rebuilds and asynchronous projections in general:

  1. Some kind of user interface to monitor and manage the asynchronous projections
  2. Faster projection rebuilds
  3. Zero downtime projection rebuilds

Marten + Wolverine == “Critter Stack” 

Again, both Marten and Wolverine will be completely usable independently, but we think there’s some potential synergy through the combination. One of the potential advantages of combining the tools is to use Wolverine’s messaging to give Marten a full fledged subscription model for Marten events. All told we’re planning three different mechanisms for propagating Marten events to the rest of your system:

  1. Through Wolverine’s transactional outbox right at the point of event capture when you care more about immediate delivery than strict ordering (this is already working)
  2. Through Marten’s asynchronous daemon when you do need strict ordering
  3. If this works out, through CDC event streaming straight from the database to Kafka/Pulsar/Kinesis

That brings me to the last topic I wanted to talk about in this post. Marten and Wolverine in their current form will remain FOSS under the MIT license, but it’s past time to make a real business out of these tools.

I don’t know exactly how this is going to work out yet, but the core Marten team is actively planning on building a business around Marten and now Wolverine. I’m not sure if this will be the front company, but I personally have formed a new company named “Jasper Fx Software” for my own activity. That’s going to be limited to just being side work for at least a while.

The general idea – so far – is to offer:

  • Support contracts for Marten 
  • Consulting services, especially for help modeling and maximizing the usage of the event sourcing support
  • Training workshops
  • Add on products that add the advanced features I described earlier in this post

Maybe success leads us to offering a SaaS model for Marten, but I see that as a long way down the road.

What think you gentle reader? Does any of this sound attractive? Should we be focusing on something else altogether?

Developing Error Handling Strategies for Asynchronous Messaging

I’m furiously working on what I hope is the last sprint toward a big new Jasper 2.0 release. Part of that work has been a big overhaul of the error handling strategies with an eye toward solving the real world problems I’ve personally experienced over the years doing asynchronous messaging in enterprise applications.

Whether you’re purposely using micro-services, having to integrate with 3rd party systems, or just the team down the hall’s services, it’s almost inevitable that an enterprise system will have to communicate with something else. Or at the very least have a need to do some kind of background processing within the same logical system. For all those reasons, it’s not unlikely that you’ll have to pull in some kind of asynchronous messaging tooling into your system.

It’s also an imperfect world, and despite your best efforts your software systems will occasionally encounter exceptions at runtime. What you really need to do is to plan around potential failures in your application, especially around integration points. Fortunately, your asynchronous messaging toolkit should have a robust set of error handling capabilities baked in — and this is maybe the single most important reason to use asynchronous messaging toolkits like MassTransit, NServiceBus, or the Jasper project I’m involved with rather than trying to roll your own one off message handling code or depend strictly on web communication via web services.

In no particular order, I think you need to have at least these goals in mind:

  • Craft your exception handling in such a way that it will very seldom require manual intervention to recover work in the system.
  • Build in resiliency to transient errors like networking hiccups or database timeouts that are common when systems get overtaxed.
  • Limit your temporal coupling to external systems, whether within your organization or from 3rd party systems. The point here is that you want your system to be at least somewhat functional even if external dependencies are unavailable. My off the cuff recommendation is to try to isolate calls to an external dependency within a small, atomic command handler so that you have a “retry loop” directly around the interaction with that dependency.
  • Prevent inconsistent state within your greater enterprise systems. I think of this as the “no leaky pipe” rule where you try to avoid any messages getting lost along the way, but it also applies to ordering operations sometimes. To illustrate this further, consider the canonical example of recording a matching debit and credit transaction between two banking accounts. If you process one operation, you have to do the other as well to avoid system inconsistencies. Asynchronous messaging makes that just a teeny bit harder maybe by introducing eventual consistency into the mix rather than trying to depend on two phase commits between systems. But who’s kidding who here, we’re probably all trying to avoid 2PC transactions like the plague.

Quick Introduction to Jasper

I’m using the Jasper framework for all the error handling samples here. Just to explain the syntax in the code samples, Jasper is configured at bootstrapping time with the JasperOptions type as shown in this sample below:

using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        opts.Handlers.OnException<TimeoutException>()
        // Just retry the message again on the
        // first failure
        .RetryOnce()

        // On the 2nd failure, put the message back into the
        // incoming queue to be retried later
        .Then.Requeue()

        // On the 3rd failure, retry the message again after a configurable
        // cool-off period. This schedules the message to be retried later
        .Then.ScheduleRetry(15.Seconds())

        // On the 4th failure, move the message to the dead letter queue
        .Then.MoveToErrorQueue()

        // Or instead you could just discard the message and stop
        // all processing too!
        .Then.Discard().AndPauseProcessing(5.Minutes());
    }).StartAsync();

The exception handling policies are “fall through”, meaning that you probably want to put more specific rules before more generic rules. The rules can also be configured either globally for all message types, or for specific message types. In most of the code snippets the variable opts will refer to the JasperOptions for the application.

More in the brand new docs on error handling in Jasper.

Transient or concurrency errors that hopefully go away?

Let’s assume that you’ve done enough testing to remove most of the purely functional errors in your system. Once you’ve reached that point, the most common kind of error in my experience with system development is the transient error, such as:

  • Network timeouts
  • Database connectivity errors, which could be related to network issues or connection exhaustion
  • Concurrent access errors
  • Resource locking issues

For these types of errors, I think I’d recommend some sort of exponential backoff strategy that attempts to retry the message inline, but with an increasingly longer pause in between attempts like so:

// Retry the message again, but wait for the specified time
// The message will be dead lettered if it exhausts the delay
// attempts
opts.Handlers
    .OnException<SqlException>()
    .RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());

What you’re doing here is retrying the message a certain number of times, but with a pause to slow down processing in the system to allow for more time for a distressed resource to stabilize before trying again. I’d also recommend this approach for certain types of concurrency exceptions where only one process at a time is allowed to work with a resource (a database row? a file? an event store stream?). This is especially helpful with optimistic concurrency strategies where you might just need to start processing over against the newly changed system state.

I’m leaving it out for the sake of brevity, but Jasper will also let you put a message back into the end of the incoming queue or even schedule the next attempt out of process for a later time.
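To make the idea of an inline retry with cooldowns a little more concrete, here’s a minimal, standalone sketch of the general technique in plain C#. To be clear, this is not Jasper’s implementation: the CooldownRetry class and its ExecuteAsync method are hypothetical names I’m using purely for illustration. The sketch retries an action once per configured delay, then lets the final exception escape, which is roughly the point where a messaging tool would dead letter the message.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration only, not part of Jasper
public static class CooldownRetry
{
    // Runs the action and retries it after each delay in turn whenever
    // a TException is thrown. Returns the number of attempts it took.
    // Once every delay has been used, the last exception propagates
    // to the caller.
    public static async Task<int> ExecuteAsync<TException>(
        Func<Task> action, params TimeSpan[] delays)
        where TException : Exception
    {
        var attempts = 0;

        while (true)
        {
            attempts++;

            try
            {
                await action();
                return attempts;
            }
            catch (TException) when (attempts <= delays.Length)
            {
                // Pause to give the distressed resource time to recover
                await Task.Delay(delays[attempts - 1]);
            }
        }
    }
}
```

Calling CooldownRetry.ExecuteAsync<TimeoutException>(action, delay1, delay2, delay3) makes up to four attempts in total: the original attempt plus one retry after each of the three delays.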

You shall not pass! (because a subsystem is down)

A few years ago I helped design a series of connected applications in a large banking ecosystem that ultimately transferred money from incoming payments in a flat file to a commercial, off the shelf (COTS) system. The COTS system exposed a web service endpoint we could use for our necessary integrations. Fortunately, we designed the system so that inputs to this service happened in a message handler fed by a messaging queue, so we could retry just the final call to the COTS web service in case of its common transient failures.

Great! Except that what also happened was that this COTS system could very easily be put into an invalid state where it could not handle any incoming transactions. In our then naive “retry all errors up to 3 times then move into a dead letter queue” strategy, literally hundreds of transactions would get retried those three times, spam the hell out of the error logs and production monitoring systems, and all end up in the dead letter queue where a support person would have to manually move them back to the real queue later after the COTS system was fixed.

This is obviously not a good situation. For future projects, Jasper will let you pause all incoming messages from a receiving endpoint (like a message queue) if a particular type of error is encountered like this:

using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        // The failing message is requeued for later processing, then
        // the specific listener is paused for 10 minutes
        opts.Handlers.OnException<SystemIsCompletelyUnusableException>()
            .Requeue().AndPauseProcessing(10.Minutes());

    }).StartAsync();

Using that capability above, if you have all incoming requests to use an external web service coming through a single queue and receiving endpoint, you will be able to pause all processing of that queue if you detect an error that implies that the external system is completely invalid, but also try to restart listening later. All without user intervention.

Jasper would also enable you to chain additional actions to take after encountering that exception to send other messages or maybe raise some kind of alert through email or text that the listening has been paused. At the very worst, you could also use some kind of log monitoring tool to raise alerts when it sees the log message from Jasper about a listening endpoint being paused.

Dealing with a distressed resource

All of the other error handling strategies I’ve discussed so far have revolved around a single message. But what if you’re seeing a high percentage of exceptions across all messages for a single endpoint, which may imply that some kind of resource like a database is overloaded?

To that end, we could use a circuit breaker approach to temporarily pause message handling when a high number of exceptions are happening across incoming messages. This might help alleviate the load on the distressed subsystem and allow it to catch up before processing additional messages. That usage in Jasper is shown below:

using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        opts.Handlers.OnException<InvalidOperationException>()
            .Discard();

        opts.ListenToRabbitQueue("incoming")
            .CircuitBreaker(cb =>
            {
                // Minimum number of messages encountered within the tracking period
                // before the circuit breaker will be evaluated
                cb.MinimumThreshold = 10;

                // The time to pause the message processing before trying to restart
                cb.PauseTime = 1.Minutes();

                // The tracking period over which failure statistics are evaluated
                cb.TrackingPeriod = 5.Minutes();

                // If the failure percentage is higher than this number, trip
                // the circuit and stop processing
                cb.FailurePercentageThreshold = 10;

                // Optional allow list
                cb.Include<SqlException>(e => e.Message.Contains("Failure"));
                cb.Include<SocketException>();

                // Optional ignore list
                cb.Exclude<InvalidOperationException>();
            });
    }).StartAsync();
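Just to illustrate how the minimum threshold and the failure percentage threshold described in the comments above interact, here’s a small sketch of the decision itself. This is my reading of those comments rather than Jasper’s actual internals, and the CircuitBreakerMath class and ShouldTrip method are hypothetical names for this example only.

```csharp
using System;

// Hypothetical helper for illustration only, not part of Jasper
public static class CircuitBreakerMath
{
    // Decides whether the circuit should trip for one tracking period.
    // Nothing is evaluated until at least minimumThreshold messages
    // have been seen. After that, the circuit trips only when the
    // failure percentage is strictly higher than the threshold.
    public static bool ShouldTrip(
        int totalMessages,
        int failures,
        int minimumThreshold,
        int failurePercentageThreshold)
    {
        if (totalMessages < minimumThreshold)
        {
            return false;
        }

        var failurePercentage = failures * 100.0 / totalMessages;
        return failurePercentage > failurePercentageThreshold;
    }
}
```

With the settings above (a minimum of 10 messages and a 10% threshold), 9 failures out of 9 messages would not trip the circuit because too few messages have been seen, while 3 failures out of 20 messages (15%) would.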

Nope, that message is bad, no soup for you!

Hey, sometimes you’re going to get an exception that implies that the incoming message is invalid and can never be processed. Maybe it applies to a domain object that no longer exists, maybe it’s a security violation. The point being the message can never be processed, so there’s no use in clogging up your system with useless retry attempts. Instead, you want that message shoved out of the way immediately. Jasper gives you two options:

using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        // Bad message, get this thing out of here!
        opts.Handlers.OnException<InvalidMessageYouWillNeverBeAbleToProcessException>()
            .Discard();
        
        // Or keep it around in case someone cares about the details later
        opts.Handlers.OnException<InvalidMessageYouWillNeverBeAbleToProcessException>()
            .MoveToErrorQueue();

    }).StartAsync();

Related Topics

Now we come to the point of the post when I’m getting tired and wanting to get this finished, so it’s time to just mention some related concepts for later research.

For the sake of consistency within your distributed system, I think you almost have to be aware of the outbox pattern — and conveniently enough, Jasper has a robust implementation of that pattern. MassTransit also recently added a “real outbox.” I know that NServiceBus has an improved outbox planned, but I don’t have a link handy for that.

Again for consistency within your distributed system, I’d recommend you familiarize yourself with the concept of compensating actions, especially if you’re trying to use eventual consistency in your system and the secondary actions fail.
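If compensating actions are new to you, the shape of the idea can be sketched in a few lines of plain C#. This is a deliberately simplified, in-process illustration with a hypothetical CompensationSketch class. In a real messaging system the compensation would more likely be triggered by a separate message rather than a try/catch block.

```csharp
using System;

// Hypothetical helper for illustration only
public static class CompensationSketch
{
    // Runs the primary action followed by the secondary action.
    // If the secondary action throws, the compensating action runs
    // to undo the primary action and the method returns false.
    public static bool TryRunBoth(
        Action primary, Action secondary, Action compensate)
    {
        primary();

        try
        {
            secondary();
            return true;
        }
        catch (Exception)
        {
            compensate();
            return false;
        }
    }
}
```

In the banking example from earlier, the primary action would debit the first account, the secondary action would credit the second account, and the compensating action would put the money back into the first account if the credit could not be applied.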

And lastly, I’m not the world’s foremost expert here, but you really want some kind of system monitoring that detects and alerts folks to distressed subsystems, circuit breakers tripping off, or dead letter queues growing quickly.

Command Line Support for Marten Projections

Marten 5.7 was published earlier this week with mostly bug fixes. The one, big new piece of functionality was an improved version of the command line support for event store projections. Specifically, Marten added support for multi-tenancy through multiple databases and the ability to use separate document stores in one application as part of our V5 release earlier this year, but the projections command didn’t really catch up and support that — but now it can with Marten v5.7.0.

From a sample project in Marten we use to test this functionality, here’s part of the Marten setup that has a mix of asynchronous and inline projections, as well as uses the database per tenant strategy:

services.AddMarten(opts =>
{
    opts.AutoCreateSchemaObjects = AutoCreate.All;
    opts.DatabaseSchemaName = "cli";

    // Note this app uses multiple databases for multi-tenancy
    opts.MultiTenantedWithSingleServer(ConnectionSource.ConnectionString)
        .WithTenants("tenant1", "tenant2", "tenant3");

    // Register all event store projections ahead of time
    opts.Projections
        .Add(new TripAggregationWithCustomName(), ProjectionLifecycle.Async);
    
    opts.Projections
        .Add(new DayProjection(), ProjectionLifecycle.Async);
    
    opts.Projections
        .Add(new DistanceProjection(), ProjectionLifecycle.Async);

    opts.Projections
        .Add(new SimpleAggregate(), ProjectionLifecycle.Inline);

    // This is actually important to register "live" aggregations too for the code generation
    opts.Projections.SelfAggregate<SelfAggregatingTrip>(ProjectionLifecycle.Live);
}).AddAsyncDaemon(DaemonMode.Solo);

At this point, let’s introduce the Marten.CommandLine Nuget dependency to the system just to add Marten related command line options directly to our application for typical database management utilities. Marten.CommandLine brings with it a dependency on Oakton that we’ll actually use as the command line parser for our built in tooling. Using the now “old-fashioned” pre-.NET 6 manner of running a console application, I add Oakton to the system like this:

public static Task<int> Main(string[] args)
{
    // Use Oakton for running the command line
    return CreateHostBuilder(args).RunOaktonCommands(args);
}

When you use the dotnet command line options, just keep in mind that the “--” separator you’re seeing me use here separates the options meant for the dotnet executable itself on the left from the arguments being passed to the application on the right.
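If it helps to see what actually reaches the application, here’s a tiny sketch that formats whatever lands in the args array of Main(string[] args). The ArgsEcho class is a hypothetical name for illustration and has nothing to do with Oakton itself.

```csharp
using System;

// Hypothetical helper for illustration only
public static class ArgsEcho
{
    // Formats the arguments that the application itself received,
    // meaning everything to the right of the "--" separator
    public static string Describe(string[] args)
    {
        return args.Length == 0
            ? "(no application arguments)"
            : string.Join(", ", args);
    }
}
```

When the application is started with dotnet run -- projections --list, the args array holds projections and --list, while run is consumed by the dotnet executable and never reaches the application.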

Now, turning to the command line at the root of our project, I’m going to type out this command to see the Oakton options for our application:

dotnet run -- help

Which gives us this output:

If you’re wondering, the commands db-apply and marten-apply are synonyms. Both are there so as not to break older users when we introduced the newer, more generic “db” commands.

And next I’m going to see the usage for the projections command with dotnet run -- help projections, which gives me this output:

For the simplest usage, I’m just going to list off the known projections for the entire system with dotnet run -- projections --list:

Which will show us the four registered projections in the main IDocumentStore, and tells us that there are no registered projections in the separate IOtherStore.

Now, I’m just going to continuously run the asynchronous projections for the entire application — while another process is constantly pumping random events into the system so there’s always new work to be doing — with dotnet run -- projections, which will spit out this continuously updating table (with an assist from Spectre.Console):

What I hope you can tell here is that every asynchronous projection is actively running for each separate tenant database. The blue “High Water Mark” is telling us where the current event store for each database is at.

And finally, for the main reason why I tackled the projections command line overhaul last week, folks needed a way to rebuild projections for every database when using a database per tenant strategy.

While the new projections command will happily let you rebuild any combination of database, store, and projection name by flags or even an interactive mode, we can quickly trigger a full rebuild of all the asynchronous projections with dotnet run -- projections --rebuild, which is going to loop through every store and database like so:

For the moment, the rebuild works on all the projections for a single database at a time. I’m sure we’ll attempt some optimizations of the rebuilding process and try to understand how much more we can really parallelize, but for right now, our users have an out of the box way to rebuild projections across separate databases or separate stores.

This *might* be a YouTube video soon just to kick off my new channel for Marten/Jasper/Oakton/Alba/Lamar content.

A Vision for Stateful Resources at Development or Deployment Time

As is not atypical, I found a couple little issues with both Oakton and Jasper in the course of writing this post. To that end, if you want to use the functionality shown here yourself, just make sure you’re on at least Oakton 4.6.1 and Jasper 2.0-alpha-3.

I’ve spit out quite a bit of blogging content the past several weeks on both Marten and Jasper:

I’ve been showing some new integration between Jasper, Marten, and Rabbit MQ. This time out, I want to show the new “stateful resource” model in a third tool named Oakton to remove development and deployment time friction when using these tools on a software project. Oakton itself is a command line processing tool that, more importantly, can be used to quickly add command line utilities directly to your .NET executable.

Drawing from a sample project in the Jasper codebase, here’s the configuration for an issue tracking application that uses Jasper, Marten, and RabbitMQ with Jasper’s inbox/outbox integration using Postgresql:

using IntegrationTests;
using Jasper;
using Jasper.Persistence.Marten;
using Jasper.RabbitMQ;
using Marten;
using MartenAndRabbitIssueService;
using MartenAndRabbitMessages;
using Oakton;
using Oakton.Resources;

var builder = WebApplication.CreateBuilder(args);

builder.Host.ApplyOaktonExtensions();

builder.Host.UseJasper(opts =>
{
    // I'm setting this up to publish to the same process
    // just to see things work
    opts.PublishAllMessages()
        .ToRabbitExchange("issue_events", exchange => exchange.BindQueue("issue_events"))
        .UseDurableOutbox();

    opts.ListenToRabbitQueue("issue_events").UseInbox();

    opts.UseRabbitMq(factory =>
    {
        // Just connecting with defaults, but showing
        // how you *could* customize the connection to Rabbit MQ
        factory.HostName = "localhost";
        factory.Port = 5672;
    });
});

// This is actually important, this directs
// the app to build out all declared Postgresql and
// Rabbit MQ objects on start up if they do not already
// exist
builder.Services.AddResourceSetupOnStartup();

// Just pumping out a bunch of messages so we can see
// statistics
builder.Services.AddHostedService<Worker>();

builder.Services.AddMarten(opts =>
{
    // I think you would most likely pull the connection string from
    // configuration like this:
    // var martenConnectionString = builder.Configuration.GetConnectionString("marten");
    // opts.Connection(martenConnectionString);

    opts.Connection(Servers.PostgresConnectionString);
    opts.DatabaseSchemaName = "issues";

    // Just letting Marten know there's a document type
    // so we can see the tables and functions created on startup
    opts.RegisterDocumentType<Issue>();

    // I'm putting the inbox/outbox tables into a separate "issue_service" schema
}).IntegrateWithJasper("issue_service");

var app = builder.Build();

app.MapGet("/", () => "Hello World!");

// Actually important to return the exit code here!
return await app.RunOaktonCommands(args);

Just to describe what’s going on up above, the .NET code above is going to depend on:

  1. A Postgresql database with the necessary tables and functions that Marten needs to be able to persist issue data
  2. Additional tables in the Postgresql database for persisting the outgoing and incoming messages in the inbox/outbox usage
  3. A Rabbit MQ broker with the necessary exchanges, queues, and bindings for the issue application as it’s configured

In a perfect world, scratch that, in an acceptable world, a developer should be able to start from a fresh clone of this issue tracking codebase and be able to run the system and/or any integration tests locally almost immediately with very minimal friction along the way.

At this point, I’m a big fan of trying to run development infrastructure in Docker where it’s easy to spin things up on demand, and just as quickly shut it all down when you no longer need it. To that end, let’s just say we’ve got a docker-compose.yml file for both Postgresql and Rabbit MQ. Having that, I’ll type docker compose up -d from the command line to spin up both infrastructure elements.

Cool, but now I need to have the database schemas built out with Marten tables and the Jasper inbox/outbox tables plus the Rabbit MQ queues for the application. This is where Oakton and its new “stateful resource” model come into play. Jasper’s Rabbit MQ plugin and the inbox/outbox storage both expose Oakton’s IStatefulResource interface for easy setup. Likewise, Marten has support for this model as well (in this case it’s just a very slim wrapper around Marten’s longstanding database schema management functionality).

If you’re not familiar with this, the double dash “--” argument in dotnet run tells .NET which arguments (“run”) apply to the dotnet executable itself, while the arguments to the right of the “--” are passed into the application.

Opening up the command line terminal of your preference to the root of the project, I type dotnet run -- help to see what options are available in our Jasper application through the usage of Oakton:

There’s a couple commands up there that will help us out with the database management, but I want to focus on the resources command. To that end, I’m going to type dotnet run -- resources list just to see what resources our issue tracker application has:

Just through the configuration up above, the various Jasper elements have registered “stateful resource” adapters for Oakton for the underlying Marten database, the inbox/outbox data (Envelope Storage above), and Rabbit MQ.

In the next case, I’m going to use dotnet run -- resources check to see if all our infrastructure is configured the way our application needs — and I’m going to do this without first starting the database or the message broker, so this should fail spectacularly!

Here’s the summary output:

If you were to scroll up a bit, you’d see a lot of exceptions thrown describing what’s wrong (helpfully color coded by Spectre.Console) including this one explaining that an expected Rabbit MQ queue is missing:

So that’s not good. No worries though, I’ll start up the docker containers, then go back to the command line and type:

dotnet run -- resources setup

And here’s some of the output:

Forget the command line…

If you’ll notice the single line of code `builder.Services.AddResourceSetupOnStartup();` in the bootstrapping code above, that’s adding a hosted service to our application from Oakton that will verify and apply all configured setup to the known Marten database, the inbox/outbox storage, and the required Rabbit MQ objects. No command line chicanery necessary. I’m hopeful that this will enable developers to be more productive by dealing with this kind of environmental setup directly inside the application itself rather than recreating the definition of what’s necessary in external scripts.

This was a fair amount of work, so I’d very much welcome any kind of feedback here.

Using Rabbit MQ with Jasper

I’ve spit out quite a bit of blogging content the past several weeks on both Marten and Jasper:

As a follow up today, I’d like to introduce Rabbit MQ integration with Jasper as the actual transport between processes. I’m again going to use a “Ping/Pong” sample of sending messages between two processes (you may want to refer to my previous ping/pong post). You can find the sample code for this post on GitHub.

Sending messages betwixt processes

The message types and Jasper handlers are basically identical to those in my last post if you want a reference. In the Pinger application, I’ve added a reference to the Jasper.RabbitMQ Nuget library, which adds transitive references to Jasper itself and the Rabbit MQ client library. In the application bootstrapping, I’ve got this to connect to Rabbit MQ, send Ping messages to a Rabbit MQ exchange named *pings*, and add a hosted service just to send a new Ping message once a second:

using System.Net.NetworkInformation;
using Jasper;
using Jasper.RabbitMQ;
using Oakton;
using Pinger;

return await Host.CreateDefaultBuilder(args)
    .UseJasper(opts =>
    {
        // Listen for messages coming into the pongs queue
        opts
            .ListenToRabbitQueue("pongs")

            // This won't be necessary by the time Jasper goes 2.0
            // but for now, I've got to help Jasper out a little bit
            .UseForReplies();

        // Publish Ping messages to the pings exchange
        opts.PublishMessage<Ping>().ToRabbitExchange("pings");

        // Configure Rabbit MQ connection properties programmatically
        // against a ConnectionFactory
        opts.UseRabbitMq(rabbit =>
        {
            // Using a local installation of Rabbit MQ
            // via a running Docker image
            rabbit.HostName = "localhost";
        })
            // Directs Jasper to build any declared queues, exchanges, or
            // bindings with the Rabbit MQ broker as part of bootstrapping time
            .AutoProvision();

        // This will send ping messages on a continuous
        // loop
        opts.Services.AddHostedService<PingerService>();
    }).RunOaktonCommands(args);

On the Ponger side, I’ve got this setup:

using Baseline.Dates;
using Jasper;
using Jasper.RabbitMQ;
using Oakton;

return await Host.CreateDefaultBuilder(args)
    .UseJasper(opts =>
    {
        // Going to listen to a queue named "pings", but disregard any messages older than
        // 15 seconds
        opts.ListenToRabbitQueue("pings", queue => queue.TimeToLive(15.Seconds()));

        // Configure Rabbit MQ connections and optionally declare Rabbit MQ
        // objects through an extension method on JasperOptions.Endpoints
        opts.UseRabbitMq() // This is short hand to connect locally
            .DeclareExchange("pings", exchange =>
            {
                // Also declares the queue too
                exchange.BindQueue("pings");
            })
            .AutoProvision()

            // Option to blow away existing messages in
            // all queues on application startup
            .AutoPurgeOnStartup();
    })
    .RunOaktonCommands(args);

When running the applications side by side, I’ll get output like this from Pinger:

Got pong #55
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PongMessage#01818741-30c2-4ab7-9fa8-d7870d194754 from rabbitmq://queue/pings
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://exchange/pings has resumed
Got pong #56
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PongMessage#01818741-34b3-4d34-910a-c9fc4c191bfc from rabbitmq://queue/pings
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://exchange/pings has resumed

and this from Ponger:

Got ping #57
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PingMessage#01818741-389e-4ddd-96cf-fb8a76e4f5f1 from rabbitmq://queue/pongs
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://queue/pongs has resumed
Got ping #58
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PingMessage#01818741-3c8e-469f-a330-e8c9e173dc40 from rabbitmq://queue/pongs
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://queue/pongs has resumed

and of course, since Jasper is a work in progress, there’s a bunch of erroneous or at least misleading messages about a sending agent getting resumed that I need to take care of…

So what’s in place right now that can be inferred from the code samples above? The Jasper + Rabbit MQ integration today provides:

  • A way to subscribe to incoming messages from a Rabbit MQ queue. It’s not shown here, but you have all the options you’d expect to configure Rabbit MQ prefetch counts, message handling parallelism, and opt into persistent, inbox messaging
  • Mechanisms to declare Rabbit MQ queues, exchanges, and bindings as well as the ability to fine tune Rabbit MQ options for these objects
  • Create declared Rabbit MQ objects at system startup time
  • Automatically purge old messages out of Rabbit MQ queues on start up. It’s not shown here, but you can do this on a queue by queue basis as well
  • Create publishing rules from a Jasper process to direct outgoing messages to Rabbit MQ exchanges or directly to queues

What’s also available but not shown here is opt in, conventional routing to route outgoing messages to Rabbit MQ exchanges based on the message type names or to automatically declare and configure subscriptions to Rabbit MQ queues based on the message types of the messages that this process handles. This conventional routing is suspiciously similar to (copied from) MassTransit’s because:

  1. I like MassTransit’s conventional routing functionality
  2. Bi-directional wire compatibility mostly out of the box with MassTransit is a first class goal for Jasper 2.0

Why Rabbit MQ? What about other things?

Jasper has focused on Rabbit MQ as the main transport option out of the box because that’s what my shop already uses (and I’m most definitely trying to get us to use Jasper at work), it has a great local development option through Docker hosting, and frankly because it’s easy to use. Jasper also supports Pulsar of all things, and will definitely have a Kafka integration before 2.0. Other transports will be added based on user requests, and that’s also a good place for other folks to get involved.

I had someone asking about using Jasper with Amazon SQS, and that’s such a simple model, that I might build that out as a reference for other folks.

What’s up next?

Now that I’ve introduced Jasper, its outbox functionality, and its integration with Rabbit MQ, my next post is visiting the utilities baked into Jasper itself for dealing with stateful resources like databases or broker configuration at development and production time. This is going to include a lot of command line functionality baked into your application.

A Vision for Low Ceremony CQRS with Event Sourcing

Let me just put this stake into the ground. The combination of Marten and Jasper will quickly become the most productive and lowest ceremony tooling for event sourcing and CQRS architectures on the .NET stack.

And for any George Strait fans out there, this song may be relevant to the previous paragraph:)

CQRS and Event Sourcing

Just to define some terminology, the Command Query Responsibility Segregation (CQRS) pattern was first described by Greg Young as an approach to software architecture where you very consciously separate “writes” that change the state of the system from “reads” against the system state. The key to CQRS is to use separate models of the system state for writing versus querying. That leads to something I think of as the “scary view of CQRS”:

The “scary” view of CQRS

The “query database” in my diagram is meant to be an optimized storage for the queries needed by the system’s clients. The “domain model storage” is some kind of persisted storage that’s optimized for writes — both capturing new data and presenting exactly the current system state that the command handlers would need in order to process or validate incoming commands.

Many folks have a visceral, negative reaction to this kind of CQRS diagram as it does appear to be more complexity than the more traditional approach where you have a single database model (almost inevitably in a relational database of some kind) and both reads and writes work off the same set of database tables. Hell, I walked out of an early presentation by Greg Young about what later came to be known as CQRS at QCon 2008 shaking my head thinking this was all nuts.

Here’s the deal though, when we’re writing systems against the one, true database model, we’re potentially spending a lot of time mapping incoming messages to the stored model, and probably even more time transforming the raw database model into a shape that’s appropriate for our system’s clients in a way that also creates some decoupling between clients and the ugly, raw details of our underlying database. The “scary” CQRS architecture arguably just brings that sometimes hidden work and complexity into the light of day.

Now let’s move on to Event Sourcing. Event sourcing is a style of system persistence where each state change is captured and stored as an explicit event object. There’s all sorts of value from keeping the raw change events around for later, but you of course do need to compile the events into derived, or “projected” models that represent the current system state. When combining event sourcing into a CQRS architecture, some projected models from the raw events can serve as the “write model” used when handling incoming commands, while others can be built as the “query model” that clients will use to query our system.

At the end of this section, I want to be clear that event sourcing and CQRS can be used independently of each other, but to paraphrase Forrest Gump, event sourcing and CQRS go together like peas and carrots.
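To make the “projected model” idea a bit more concrete, here’s a minimal sketch in plain C# with no Marten involved at all. The event records and the ShiftState class are hypothetical, simplified stand-ins I made up for illustration, and real projection tooling does a lot more than this. The only point is that current state is just the result of replaying the events in order:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified events for illustration only
public record ProviderAssigned(Guid AppointmentId);
public record ChartingStarted();
public record ProviderReady();

// A "projected" model: the current state derived from raw events
public class ShiftState
{
    public string Status { get; private set; } = "Ready";
    public Guid? AppointmentId { get; private set; }

    // Number of events applied so far
    public int Version { get; private set; }

    public void Apply(object @event)
    {
        switch (@event)
        {
            case ProviderAssigned assigned:
                Status = "Assigned";
                AppointmentId = assigned.AppointmentId;
                break;
            case ChartingStarted _:
                Status = "Charting";
                break;
            case ProviderReady _:
                Status = "Ready";
                AppointmentId = null;
                break;
            default:
                throw new ArgumentException($"Unknown event type {@event.GetType().Name}");
        }

        Version++;
    }

    // Rebuild the current state by replaying every event in order
    public static ShiftState Replay(IEnumerable<object> events)
    {
        var state = new ShiftState();
        foreach (var e in events)
        {
            state.Apply(e);
        }

        return state;
    }
}
```

Replaying a ProviderAssigned event followed by a ChartingStarted event leaves the state in the “Charting” status at version 2, and the raw events are still there if you later want to build a different view from them.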

Now, the event sourcing side of this especially might sound a little scary if you’re not familiar with some of the existing tooling, so let’s move on first to Marten…

Marten has a mature feature set for event sourcing already, and we’ve grown quite a bit in capability toward our support for projections (read-only views of the raw events). If you’ll peek back at the “scary” view of CQRS above, Marten’s projection support solves the problem of keeping the raw events synchronized to both any kind of “write model” for incoming commands and the richer “query model” views for system clients within a single database. In my opinion, Marten removes the “scary” out of event sourcing and we’re on our way to being the best possible “event sourcing in a box” solution for .NET.

As that support has gotten more capable and robust though, our users have frequently been asking about how to subscribe to events being captured in Marten and how to relay those events reliably to some other kind of infrastructure — which could be a completely separate database, outgoing message queues, or some other kind of event streaming scenario.

Oskar Dudycz and I have been discussing how to solve this in Marten, and we’ve come up with these three main mechanisms for supporting subscriptions to event data:

  1. Some or all of the events being captured by Marten should be forwarded automatically at the time of event capture (IDocumentSession.SaveChangesAsync())
  2. When strict ordering is required, we’ll need some kind of integration into Marten’s async daemon to relay events to external infrastructure
  3. For massive amounts of data with ambitious performance targets, use something like Debezium to directly stream Marten events from Postgresql to Kafka/Pulsar/etc.

Especially for the first item in that list above, we need some kind of outbox integration with Marten sessions to reliably relay events from Marten to outgoing transports while keeping the system in a consistent state (i.e., don’t publish the events if the transaction fails).

Fortunately, there’s a functional outbox implementation for Marten in…

To be clear, Jasper as a project was pretty well mothballed by the combination of COVID and the massive slog toward Marten 4.0 (and then a smaller slog to 5.0). However, I’ve been able to get back to Jasper and yesterday kicked out a new Jasper 2.0.0-alpha-2 release along with the post (Re) Introducing Jasper as a Command Bus.

For this post though, I want to explore the potential of the Jasper + Marten combination for CQRS architectures. To that end, a couple weeks ago I published Marten just got better for CQRS architectures, which showed some new APIs in Marten to simplify repetitive code around using Marten event sourcing within CQRS architectures. Part of the sample code in that post was this MVC Core controller that used some newer Marten functionality to handle an incoming command:

public async Task CompleteCharting(
    [FromBody] CompleteCharting charting, 
    [FromServices] IDocumentSession session)
{
    var stream = await session
        .Events.FetchForExclusiveWriting<ProviderShift>(charting.ShiftId);
 
    // Validation on the ProviderShift aggregate
    if (stream.Aggregate.Status != ProviderStatus.Charting)
    {
        throw new Exception("The shift is not currently charting");
    }
     
    // We "decided" to emit one new event
    stream.AppendOne(new ChartingFinished(stream.Aggregate.AppointmentId.Value, stream.Aggregate.BoardId));
 
    await session.SaveChangesAsync();
}

To review, that controller method:

  1. Takes in a command message of type CompleteCharting
  2. Loads the current state of the aggregate ProviderShift model referred to by the incoming command, and does so in a way that takes care of concurrency for us by waiting to get an exclusive lock on the particular ProviderShift
  3. Assuming that the validation against the ProviderShift succeeds, emits a new ChartingFinished event
  4. Saves the pending work with a database transaction

In that post, I pointed out that there were some potential flaws or missing functionality with this approach:

  1. We probably want some error handling to retry the operation if we hit concurrency exceptions or time out trying to get the exclusive lock. In other words, we have to plan for concurrency exceptions
  2. It’d be good to be able to automatically publish the new ChartingFinished event to a queue to take further action within our system (or an external service if we were using messaging here)
  3. Lastly, I’d argue there’s some repetitive code up there that could be simplified

To address these points, I’m going to introduce Jasper and its integration with Marten (Jasper.Persistence.Marten) to the telehealth portal sample from my previous blog post.

I’m going to move the actual handling of the CompleteCharting to a Jasper handler shown below that is functionally equivalent to the controller method shown earlier (except I switched the concurrency protection to being optimistic):

// This is auto-discovered by Jasper
public class CompleteChartingHandler
{
    [MartenCommandWorkflow] // this opts into some Jasper middleware
    public ChartingFinished Handle(CompleteCharting charting, ProviderShift shift)
    {
        if (shift.Status != ProviderStatus.Charting)
        {
            throw new Exception("The shift is not currently charting");
        }

        return new ChartingFinished(charting.AppointmentId, shift.BoardId);
    }
}

And the controller method gets simplified down to just relaying the command to Jasper:

    public Task CompleteCharting(
        [FromBody] CompleteCharting charting, 
        [FromServices] ICommandBus bus)
    {
        // Just delegating to Jasper here
        return bus.InvokeAsync(charting, HttpContext.RequestAborted);
    }

There’s some opportunity for some mechanisms to make the code above a little less repetitive and more efficient. Maybe by riding on Minimal APIs. That’s for a later date though:)

By using the new [MartenCommandWorkflow] attribute, we’re directing Jasper to surround the command handler with middleware that handles much of the Marten mechanics by:

  1. Loading the aggregate ProviderShift for the incoming CompleteCharting command (I’m omitting some details here for brevity, but there’s a naming convention that can be explicitly overridden to pluck the aggregate identity off the incoming command)
  2. Passing that ProviderShift aggregate into the Handle() method above
  3. Applying the returned event to the event stream for the ProviderShift
  4. Committing the outstanding changes in the active Marten session

The Handle() code above becomes an example of a Decider function. Even better yet, it’s completely decoupled from any kind of infrastructure and fully synchronous. I’m going to argue that this approach will make command handlers much easier to unit test, definitely easier to write, and easier to read later just because you’re only focused on the business logic.
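To back up the unit testing claim, here’s a rough sketch of checking that decider logic with nothing but plain objects. Everything in this block is a simplified stand-in so that it compiles on its own: the ProviderShift here has public setters (the real aggregate in this post does not), the [MartenCommandWorkflow] attribute is left off so there’s no Jasper reference, and the CompleteChartingHandlerChecks class is a hypothetical name. In a real project you’d use your test framework of choice instead of throwing exceptions by hand:

```csharp
using System;

public enum ProviderStatus { Ready, Assigned, Charting, Paused }

// Simplified stand-in for the aggregate, with public setters for test setup
public class ProviderShift
{
    public Guid BoardId { get; set; }
    public ProviderStatus Status { get; set; }
}

public record CompleteCharting(Guid ShiftId, Guid AppointmentId, int Version);
public record ChartingFinished(Guid AppointmentId, Guid BoardId);

public class CompleteChartingHandler
{
    // Attribute omitted so this sketch has no Jasper dependency
    public ChartingFinished Handle(CompleteCharting charting, ProviderShift shift)
    {
        if (shift.Status != ProviderStatus.Charting)
        {
            throw new Exception("The shift is not currently charting");
        }

        return new ChartingFinished(charting.AppointmentId, shift.BoardId);
    }
}

public static class CompleteChartingHandlerChecks
{
    // Happy path: a charting shift yields a ChartingFinished event
    public static void Emits_event_when_shift_is_charting()
    {
        var shift = new ProviderShift { BoardId = Guid.NewGuid(), Status = ProviderStatus.Charting };
        var command = new CompleteCharting(Guid.NewGuid(), Guid.NewGuid(), 1);

        var @event = new CompleteChartingHandler().Handle(command, shift);

        if (@event.AppointmentId != command.AppointmentId || @event.BoardId != shift.BoardId)
        {
            throw new Exception("Unexpected event contents");
        }
    }

    // Validation path: any other status is rejected
    public static void Rejects_command_when_shift_is_not_charting()
    {
        var shift = new ProviderShift { Status = ProviderStatus.Ready };
        var command = new CompleteCharting(Guid.NewGuid(), Guid.NewGuid(), 1);

        try
        {
            new CompleteChartingHandler().Handle(command, shift);
        }
        catch (Exception)
        {
            return; // expected
        }

        throw new Exception("Expected the handler to reject the command");
    }
}
```

Notice there’s no database, no session, and no mocking anywhere in there, which is the whole appeal of keeping the handler a pure function.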

So that covers the repetitive code problem, but let’s move on to automatically publishing the ChartingFinished event and some error handling. I’m going to add Jasper through the application’s bootstrapping code as shown below:

builder.Host.UseJasper(opts =>
{
    // I'm choosing to process any ChartingFinished event messages
    // in a separate, local queue with persistent messages for the inbox/outbox
    opts.PublishMessage<ChartingFinished>()
        .ToLocalQueue("charting")
        .DurablyPersistedLocally();
    
    // If we encounter a concurrency exception, just try it immediately 
    // up to 3 times total
    opts.Handlers.OnException<ConcurrencyException>().RetryNow(3); 
    
    // It's an imperfect world, and sometimes transient connectivity errors
    // to the database happen
    opts.Handlers.OnException<NpgsqlException>()
        .RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());
});

Jasper comes with some pretty strong exception handling policy capabilities you’ll need to do grown up development. In this case, I’m just setting up some global policies in the application to retry message failures on either Marten concurrency exceptions or the inevitable, transient Postgresql connectivity hiccups. In the case of the concurrency exception, you may just need to start the work over to ensure you’re starting from the most recent aggregate changes. I used globally applied policies here, but Jasper will also allow you to override that on a message type by message type basis.
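If the “retry with cooldown” idea is new to you, here’s a minimal sketch of the behavior in plain C#. To be clear, this is not Jasper’s implementation, and RetrySketch is a hypothetical helper I’m making up for illustration. It only shows the general shape: try the work, and if the named exception type is thrown, pause for the next configured interval and try again until the intervals run out:

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Runs the action. If it throws TException, waits for the next cooldown
    // and tries again. Once the cooldowns are used up, the exception is
    // allowed to propagate. Returns the number of attempts that were made.
    public static async Task<int> ExecuteAsync<TException>(
        Func<Task> action,
        params TimeSpan[] cooldowns) where TException : Exception
    {
        var attempts = 0;

        while (true)
        {
            attempts++;

            try
            {
                await action();
                return attempts;
            }
            catch (TException) when (attempts <= cooldowns.Length)
            {
                // Pause before the next attempt
                await Task.Delay(cooldowns[attempts - 1]);
            }
        }
    }
}
```

With three cooldown intervals like the Npgsql policy above, this sketch makes at most four attempts in total. Any exception type other than TException is not retried at all.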

Lastly, let’s add the Jasper outbox integration for Marten and opt into automatic event publishing with this bit of configuration chained to the standard AddMarten() usage:

builder.Services.AddMarten(opts =>
{
    // Marten configuration...
})
    // I added this to enroll Marten in the Jasper outbox
    .IntegrateWithJasper()
    
    // I also added this to opt into events being forwarded to
    // the Jasper outbox during SaveChangesAsync()
    .EventForwardingToJasper();

And that’s actually that. The configuration above will add the Jasper outbox tables to the Marten database for us, and let Marten’s database schema management manage those extra database objects.

Back to the command handler (mildly elided):

public class CompleteChartingHandler
{
    [MartenCommandWorkflow] 
    public ChartingFinished Handle(CompleteCharting charting, ProviderShift shift)
    {
        // validation code goes here!

        return new ChartingFinished(charting.AppointmentId, shift.BoardId);
    }
}

By opting into the outbox integration and event forwarding to Jasper from Marten, when this command handler is executed, the ChartingFinished events will be published — in this case just to an in-memory queue, but it could also be to an external transport — with Jasper’s outbox implementation that guarantees that the message will be delivered at least once as long as the database transaction to save the new event succeeds.
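If the outbox idea is still fuzzy, here’s a toy, in-memory sketch of the guarantee described above. This is emphatically not how Jasper or Marten implement it (the real thing stores outgoing messages in database tables within the same transaction), and OutboxSketch and its members are hypothetical names for illustration. The two things to notice are that events and outgoing messages are stored together or not at all, and that a message only leaves the outbox after it has been sent:

```csharp
using System;
using System.Collections.Generic;

public class OutboxSketch
{
    private readonly List<object> _pendingEvents = new();
    private readonly List<object> _pendingMessages = new();

    // Stand-ins for the event storage and the outbox storage
    public List<object> StoredEvents { get; } = new();
    public List<object> StoredOutgoing { get; } = new();

    // Queue an event for storage and for forwarding
    public void Append(object @event)
    {
        _pendingEvents.Add(@event);
        _pendingMessages.Add(@event);
    }

    // Events and outgoing messages are saved together, or neither is saved
    public void SaveChanges(bool simulateFailure = false)
    {
        try
        {
            if (simulateFailure)
            {
                throw new InvalidOperationException("Simulated transaction failure");
            }

            StoredEvents.AddRange(_pendingEvents);
            StoredOutgoing.AddRange(_pendingMessages);
        }
        finally
        {
            _pendingEvents.Clear();
            _pendingMessages.Clear();
        }
    }

    // Relay stored messages. A message is removed only after send() returns,
    // so a failed send leaves it in place to be retried (at-least-once).
    public int Relay(Action<object> send)
    {
        var sent = 0;

        while (StoredOutgoing.Count > 0)
        {
            send(StoredOutgoing[0]);
            StoredOutgoing.RemoveAt(0);
            sent++;
        }

        return sent;
    }
}
```

If SaveChanges() fails, nothing is stored and nothing is ever relayed. If it succeeds, the message sits in the outbox until a send succeeds, which is why the delivery guarantee is “at least once” rather than “exactly once.”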

Conclusion and What’s Next?

There’s a tremendous amount of work in the rear view mirror to get to the functionality that I demonstrated here, and a substantial amount of ambition in the future to drive this forward. I would love any possible feedback both positive and negative. Marten is a team effort, but Jasper’s been mostly my baby for the past 3-4 years, and I’d be happy for anybody who would want to get involved with that. I’m way behind in documentation for Jasper and somewhat for Marten, but that’s in flight.

My next couple posts to follow up on this are to:

  • Do a deeper dive into Jasper’s outbox and explain why it’s different and arguably more useful than the outbox implementations in other leading .NET tools
  • Introduce the usage of Rabbit MQ with Jasper for external messaging
  • Take a detour into the development and deployment time command line utilities built into Jasper & Marten through Oakton

Marten just got better for CQRS architectures

I’m assuming some prior knowledge of Event Sourcing as an architectural pattern here. I highly recommend Oskar Dudycz’s Introduction to Event Sourcing training kit or this video from Derek Comartin. While Event Sourcing and the closely associated CQRS architectural style are each useful without the other, I’m still assuming here that you’re interested in using Marten for event sourcing within a larger CQRS architecture.

So you’re adopting an event sourcing style with Marten for your persistence within a larger CQRS architectural style. Crudely speaking, all “writes” to the system state involve sending a command message to your CQRS service with a workflow something like this:

In the course of handling the command message, our command handler (or HTTP endpoint) needs to:

  1. Fetch a “write model” that represents the state for the current workflow. This projected “write model” will be used by the command handler to validate the incoming command and also to…
  2. Decide what subsequent events should be published to update the state of the system based on the existing state and the incoming command
  3. Persist the new events to the ongoing Marten event store
  4. Possibly publish some or all of the new events to an outgoing transport to be acted upon asynchronously
  5. Deal with concurrency concerns, especially if there’s any significant chance that other related commands may be coming in for the same logical workflow at the same time

Do note that as I shift to implementations, I’m going to mostly bypass any discussion of design patterns or what I personally consider to be useless cruft from common CQRS approaches in the .Net or JVM worlds. I.e., no repositories will be used in any of this code.

As an example system, let’s say that we’re building a new, online telehealth system that among other things will track how a medical provider spends their time during a shift helping patients during their workday. Using Marten’s “self-aggregate” support, a simplified version of the provider shift state is represented by this model:

public class ProviderShift
{
    public Guid Id { get; set; }
    
    // Pay attention to this, this will come into play
    // later
    public int Version { get; set; }
    
    public Guid BoardId { get; private set; }
    public Guid ProviderId { get; init; }
    public ProviderStatus Status { get; private set; }
    public string Name { get; init; }
    
    public Guid? AppointmentId { get; set; }
    
    public static async Task<ProviderShift> Create(ProviderJoined joined, IQuerySession session)
    {
        var provider = await session.LoadAsync<Provider>(joined.ProviderId);
        return new ProviderShift
        {
            Name = $"{provider.FirstName} {provider.LastName}",
            Status = ProviderStatus.Ready,
            ProviderId = joined.ProviderId,
            BoardId = joined.BoardId
        };
    }

    public void Apply(ProviderReady ready)
    {
        AppointmentId = null;
        Status = ProviderStatus.Ready;
    }

    public void Apply(ProviderAssigned assigned)
    {
        Status = ProviderStatus.Assigned;
        AppointmentId = assigned.AppointmentId;
    }
    
    public void Apply(ProviderPaused paused)
    {
        Status = ProviderStatus.Paused;
        AppointmentId = null;
    }

    // This is kind of a catch all for any paperwork the
    // provider has to do after an appointment has ended
    // for the just concluded appointment
    public void Apply(ChartingStarted charting) => Status = ProviderStatus.Charting;
}

Next up, let’s play the user story for a provider to make their “charting” activity complete after a patient appointment concludes. Looking at the sequence diagram and the bullet list of concerns for each command handler, we’ve got a few things to worry about. Never fear though, because Marten has you (mostly) covered today with a couple new features introduced in Marten v5.4 last week.

Starting with this simple command:

public record CompleteCharting(
    Guid ShiftId, 
    Guid AppointmentId, 
    int Version);

We’ll use Marten’s brand new IEventStore.FetchForWriting<T>() API to whip up the basic command handler (just a small ASP.Net Core Controller endpoint):

    public async Task CompleteCharting(
        [FromBody] CompleteCharting charting, 
        [FromServices] IDocumentSession session)
    {
        /* We've got options for concurrency here! */
        var stream = await session
            .Events.FetchForWriting<ProviderShift>(charting.ShiftId);

        // Validation on the ProviderShift aggregate
        if (stream.Aggregate.Status != ProviderStatus.Charting)
        {
            throw new Exception("The shift is not currently charting");
        }
        
        // We "decided" to emit one new event
        stream.AppendOne(new ChartingFinished(stream.Aggregate.AppointmentId.Value, stream.Aggregate.BoardId));

        await session.SaveChangesAsync();
    }

The FetchForWriting() method used above is doing a couple different things:

  1. Finding the current, persisted version of the event stream for the provider shift and loading that into the current document session to help with optimistic concurrency checks
  2. Fetching the current state of the ProviderShift aggregate for the shift id coming up on the command. Note that this API papers over whether or not the aggregate in question is a “live aggregate” that needs to be calculated on the fly from the raw events or previously persisted as just a Marten document by either an inline or asynchronous projection. I think I would strongly recommend that “write model” aggregates be either inline or live to avoid eventual consistency issues.

Concurrency?!?

Hey, the hard truth is that it’s easy for the command to be accidentally or incidentally dispatched to your service multiple times from messaging infrastructure, multiple users doing the same action in different sessions, or somebody clumsy like me just accidentally clicking a button too many times. One way or another, we may need to harden our command handler against concurrency concerns.

The usage of FetchForWriting<T>() will actually set you up for optimistic concurrency checks. If someone else manages to successfully process a command against the same provider shift between the call to FetchForWriting<T>() and IDocumentSession.SaveChangesAsync(), you’ll get a Marten ConcurrencyException thrown by SaveChangesAsync() that will abort and rollback the transaction.

Moving on though, let’s tighten up the optimistic version check by telling Marten which version of the provider shift our command expects to find on the server. First though, we need to get the current version back to the client that’s collecting changes to our provider shift. If you scan back to the ProviderShift aggregate above, you’ll see this property:

    public int Version { get; set; }

With another new little feature in Marten v5.4, the Marten projection support will automatically set the value of a Version to the latest stream version for a single stream aggregate like the ProviderShift. Knowing that, and assuming that ProviderShift is updated inline, we could just deliver the whole ProviderShift to the client with this little web service endpoint (using Marten.AspNetCore extensions):

    [HttpGet("/shift/{shiftId}")]
    public Task GetProviderShift(Guid shiftId, [FromServices] IQuerySession session)
    {
        return session.Json.WriteById<ProviderShift>(shiftId, HttpContext);
    }

The Version property can be a field, scoped as internal, or read-only. Marten is using a dynamically generated Lambda that can happily bypass whatever scoping rules you have to set the version to the latest event for the stream represented by this aggregate. The Version naming convention can also be explicitly ignored, or redirected to a totally differently named member. Lastly, it can even be a .Net Int64 type too — but if you’re doing that, you probably have some severe modeling issues that should be addressed first!

Back to our command handler. If the client has what’s effectively the “expected starting version” of the ProviderShift and sends the CompleteCharting command with that version, we can change the first line of our handler method code to this:

        var stream = await session
                
            // Note: I'm passing in the expected, starting provider shift
            // version from the command
            .Events.FetchForWriting<ProviderShift>(charting.ShiftId, charting.Version);

This new version will throw a ConcurrencyException right off the bat if the expected, starting version is not the same as the last, persisted version in the database. After that, it’s the same optimistic concurrency check at the point of calling SaveChangesAsync() to commit the changes.
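For anyone who hasn’t worked with optimistic concurrency before, the “expected version” check boils down to something like the sketch below. This is a toy, in-memory stand-in and not how Marten does it (Marten enforces this inside Postgresql), and both InMemoryEventStream and VersionConflictException are hypothetical names I’m using for illustration:

```csharp
using System;
using System.Collections.Generic;

public class VersionConflictException : Exception
{
    public VersionConflictException(int expected, int actual)
        : base($"Expected stream version {expected}, but it was {actual}")
    {
    }
}

public class InMemoryEventStream
{
    private readonly List<object> _events = new();

    // The stream version is simply the number of events appended so far
    public int Version => _events.Count;

    // Append only if nobody else has written since the caller last read
    public void Append(int expectedVersion, params object[] events)
    {
        if (expectedVersion != Version)
        {
            throw new VersionConflictException(expectedVersion, Version);
        }

        _events.AddRange(events);
    }
}
```

Two clients that both read the stream at version 2 can’t both win. The first append moves the stream to version 3, and the second append, still expecting version 2, gets the exception and has to reload and decide again.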

Lastly, since Marten is built upon a real database instead of trying to be its own specialized storage engine like many other event sourcing tools, we’ve got one last trick. Instead of putzing around with optimistic concurrency checks let’s go to a pessimistic, exclusive lock on the specific provider shift so that only one session at a time can ever be writing to that provider shift with this variation:

        var stream = await session
                
            // Note: This will try to "wait" to claim an exclusive lock for writing
            // on the provider shift event stream
            .Events.FetchForExclusiveWriting<ProviderShift>(charting.ShiftId);
        

As you can see, Marten has some new functionality to make it even easier to use Marten within CQRS architectures by eliminating some previously repetitive code in both queries on projected state and in command handlers where you need to use Marten’s concurrency control.

Wait, not so fast, you missed some things!

I missed a couple very big things in the sample code above. For one, we’d probably want to broadcast the new events through some kind of service bus to allow other systems or just our own system to asynchronously do other work (like trying to assign our provider to another ready patient appointment). To do that reliably so that the event capture and the outgoing events being published succeed or fail together in one atomic action, I really need an “outbox” of some sort integrated into Marten.

I also left out any kind of potential error handling or message retry capabilities around the concurrency exceptions. And lastly (that I can think of offhand), I completely left out any discussion of the instrumentation you’d want in any kind of grown up system.

Since we’re in the middle of the NBA playoffs, I’m reminded of a Shaquille O’Neal quote from when his backup was Alonzo Mourning, and Mourning had a great game off the bench: “sometimes Superman needs some help from the Incredible Hulk.” In this case, part of the future of Marten is to be combined with another project called Jasper that is going to add external messaging with a robust outbox implementation for Marten to create a full stack for CQRS architectures. Maybe as soon as late next week or at least in June, I’ll write a follow up showing the Marten + Jasper combination that deals with the big missing pieces of this post.