Building a Critter Stack Application: Durable Outbox Messaging and Why You Care!

Hey, did you know that JasperFx Software is ready for formal support plans for Marten and Wolverine? Not only are we trying to make the “Critter Stack” tools viable long-term options for your shop, we’re also interested in hearing your opinions about the tools and how they should change. We’re also certainly open to helping you succeed with your software development projects on a consulting basis, whether you’re using any part of the Critter Stack or any other .NET server side tooling.

Let’s build a small web service application using the whole “Critter Stack” and its friends, one small step at a time. For right now, the “finished” code is at CritterStackHelpDesk on GitHub.

The posts in this series are:

  1. Event Storming
  2. Marten as Event Store
  3. Marten Projections
  4. Integrating Marten into Our Application
  5. Wolverine as Mediator
  6. Web Service Query Endpoints with Marten
  7. Dealing with Concurrency
  8. Wolverine’s Aggregate Handler Workflow FTW!
  9. Command Line Diagnostics with Oakton
  10. Integration Testing Harness
  11. Marten as Document Database
  12. Asynchronous Processing with Wolverine
  13. Durable Outbox Messaging and Why You Care! (this post)
  14. Wolverine HTTP Endpoints
  15. Easy Unit Testing with Pure Functions
  16. Vertical Slice Architecture
  17. Messaging with Rabbit MQ
  18. The “Stateful Resource” Model
  19. Resiliency

As we layer in new technical concepts from both Wolverine and Marten to build out our incident tracking, help desk API, we looked in the last post at this message handler that both saved data and published a message to an asynchronous, local queue that would act upon the newly saved data at some point.

public static class CategoriseIncidentHandler
{
    public static readonly Guid SystemId = Guid.NewGuid();
     
    [AggregateHandler]
    // The object? as return value will be interpreted
    // by Wolverine as appending one or zero events
    public static async Task<object?> Handle(
        CategoriseIncident command, 
        IncidentDetails existing,
        IMessageBus bus)
    {
        if (existing.Category != command.Category)
        {
            // Send the message to any and all subscribers to this message
            await bus.PublishAsync(
                new TryAssignPriority { IncidentId = existing.Id });
            return new IncidentCategorised
            {
                Category = command.Category,
                UserId = SystemId
            };
        }
 
        // Wolverine will interpret this as "do no work"
        return null;
    }
}

To recap, that message handler is potentially appending an IncidentCategorised event to an Incident event stream and publishing a command message named TryAssignPriority that will trigger a downstream action to try to assign a new priority to our Incident.

This relatively simple message handler (and we’ll make it even simpler in a later post in this series) creates a host of potential problems for our system:

  • In a naive usage of messaging tools, there’s a race condition between the outbound `TryAssignPriority` message being picked up by its handler and the database changes getting committed to the database. I have seen this cause nasty, hard to reproduce bugs in real life production applications when, once in a while, the message is processed before the database changes are made, and the system behaves incorrectly because the expected data has not yet been committed by the original command.
  • Maybe the actual message sending fails, but the database changes succeed, so the system is in an inconsistent state.
  • Maybe the outgoing message is happily published successfully, but the database changes fail, so that when the TryAssignPriority message is handled, it’s working against old system state.
  • Even if everything succeeds perfectly, the outgoing message should never actually be published until the transaction is complete.

To be clear, even without the usage of the outbox feature we’re about to use, Wolverine will apply an “in memory outbox” in message handlers such that all the messages published through IMessageBus.PublishAsync()/SendAsync()/etc. will be held in memory until the successful completion of the message handler. That by itself is enough to prevent the race condition between the database changes and the outgoing messages.
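To make the “in memory outbox” idea concrete, here is a minimal sketch in plain C#. It is not Wolverine’s implementation; `BufferedBus` and `InMemoryOutboxDemo` are hypothetical names invented for this illustration. The only point is that “publishing” inside a handler merely records the message, and nothing is actually sent until the handler has completed successfully.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a bus that buffers outgoing messages.
// This is NOT Wolverine's code, just the idea behind an in-memory outbox.
public sealed class BufferedBus
{
    private readonly List<object> _pending = new();
    private readonly Action<object> _send;

    public BufferedBus(Action<object> send) => _send = send;

    // "Publishing" inside a handler only records the message
    public void Publish(object message) => _pending.Add(message);

    // Called only after the handler has completed successfully
    public void Flush()
    {
        foreach (var message in _pending) _send(message);
        _pending.Clear();
    }

    // Called if the handler throws, so nothing is ever sent
    public void Discard() => _pending.Clear();
}

public static class InMemoryOutboxDemo
{
    // Runs a handler and releases its messages only if the handler succeeds
    public static async Task<bool> ExecuteAsync(
        BufferedBus bus, Func<BufferedBus, Task> handler)
    {
        try
        {
            await handler(bus);
            bus.Flush();
            return true;
        }
        catch (Exception)
        {
            bus.Discard();
            return false;
        }
    }
}
```

Note what this sketch does not give you: the buffered messages live only in memory, so they are lost if the process dies between the database commit and the flush. That gap is what a durable outbox closes.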

At this point, let’s introduce Wolverine’s transactional outbox support, which was built specifically to solve or prevent the potential problems I listed above. In this case, Wolverine has transactional outbox and inbox support built into its integrations with PostgreSQL and Marten.

To rewind a little bit, in an earlier post where we first introduced the Marten + Wolverine integration, I had added a call to IntegrateWithWolverine() to the Marten configuration in our Program file:

using Wolverine.Marten;
 
var builder = WebApplication.CreateBuilder(args);
 
builder.Services.AddMarten(opts =>
{
    // This would be from your configuration file in typical usage
    opts.Connection(Servers.PostgresConnectionString);
    opts.DatabaseSchemaName = "wolverine_middleware";
})
    // This is the wolverine integration for the outbox/inbox,
    // transactional middleware, saga persistence we don't care about
    // yet
    .IntegrateWithWolverine()
     
    // Just letting Marten build out known database schema elements upfront
    // Helps with Wolverine integration in development
    .ApplyAllDatabaseChangesOnStartup();

Among other things, the call to IntegrateWithWolverine() up above directs Wolverine to use the PostgreSQL database for Marten as the durable storage for incoming and outgoing messages as part of Wolverine’s transactional inbox and outbox. The basic goal of this subsystem is to create consistency (really “eventual consistency”) between database transactions and outgoing messages without having to resort to endlessly painful distributed transactions.

Now, we’ve got another step to make. As of right now, Wolverine decides whether or not to use the durable outbox storage based on the destination of the outgoing message, with the theory that teams might easily want to mix and match durable messaging and less resource intensive “fire and forget” messaging within the same application. In this help desk service, we’ll make that easy and just say that all message processing in local queues (we set up TryAssignPriority to be handled through a local queue in the previous post) should be durable. In the UseWolverine() configuration, I’ll add the durability policies shown below to do that:

builder.Host.UseWolverine(opts =>
{
    // More configuration...

    // Automatic transactional middleware
    opts.Policies.AutoApplyTransactions();
    
    // Opt into the transactional inbox for local 
    // queues
    opts.Policies.UseDurableLocalQueues();
    
    // Opt into the transactional outbox on all sending
    // endpoints
    opts.Policies.UseDurableOutboxOnAllSendingEndpoints();

    // Set up from the previous post
    opts.LocalQueueFor<TryAssignPriority>()
        // By default, local queues allow for parallel processing with a maximum
        // parallel count equal to the number of processors on the executing
        // machine, but you can override the queue to be sequential and single file
        .Sequential()

        // Or add more to the maximum parallel count! (The two calls are
        // alternatives, chained here only for illustration; pick one)
        .MaximumParallelMessages(10);
});

I (Jeremy) may very well declare this “endpoint by endpoint” declaration of durability to have been a big mistake because it has confused some users, and vote to change this in a later version of Wolverine.
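For comparison, the “endpoint by endpoint” style would look something like the fragment below, which makes only the TryAssignPriority local queue durable instead of applying a policy to every local queue. Treat this as a sketch: it assumes the `builder` variable from the Program file shown earlier, and that `UseDurableInbox()` is available on the local queue configuration in your version of Wolverine.

```csharp
builder.Host.UseWolverine(opts =>
{
    // Assumption: opt just this one local queue into the durable inbox
    // rather than calling opts.Policies.UseDurableLocalQueues()
    opts.LocalQueueFor<TryAssignPriority>()
        .UseDurableInbox();
});
```

Any other local queue would stay in “fire and forget” mode under this configuration, which is exactly the mix and match scenario described above.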

With this outbox functionality in place, the messaging and transaction workflow behind the scenes of that handler shown above is to:

  1. When the outgoing TryAssignPriority message is published, Wolverine will “route” that message into its internal Envelope structure that includes the message itself and all the necessary metadata and information Wolverine would need to actually send the message later.
  2. The outbox integration will append the outgoing message as a pending operation to the current Marten session.
  3. The IncidentCategorised event will be appended to the current Marten session.
  4. The Marten session is committed (IDocumentSession.SaveChangesAsync()), which persists the new event and a copy of the outgoing Envelope to the outbox or inbox tables in one single, batched database command within a native PostgreSQL transaction. Scheduled messages and messages bound for local queues are persisted in the incoming table.
  5. Assuming the database transaction succeeds, the outgoing messages are “released” to Wolverine’s in-memory outgoing message publishing (we’re coming back to that last point in a bit).
  6. Once Wolverine is able to successfully publish the message to the outgoing transport, it will delete the database table record for that outgoing message.

The 4th point is important I think. The close integration between Marten & Wolverine allows for more efficient processing by combining the database operations to minimize database round trips. In cases where the outgoing message transport is also batched (Azure Service Bus or AWS SQS for example), the database command to delete messages is also optimized for one call using PostgreSQL array support. I guess the main point of bringing this up is just to say there’s been quite a bit of thought and outright micro-optimizations done to this infrastructure.
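The six-step workflow above can be simulated in a few lines of plain C#. Everything here is a hypothetical, in-memory stand-in (`Envelope`, `FakeDatabase`, and `OutboxWorkflow` are names made up for this sketch, not Marten or Wolverine types), and a real implementation batches and retries far more carefully. The sketch only demonstrates the ordering guarantees: the event and the envelope are committed together, sending happens strictly after the commit, and the outbox record is removed only after a successful send.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins. None of these types exist in Marten or Wolverine.
public sealed record Envelope(Guid Id, object Message);

public sealed class FakeDatabase
{
    public List<object> Events { get; } = new();
    public Dictionary<Guid, Envelope> Outbox { get; } = new();

    // Steps 2-4: events and envelopes are committed together or not at all
    public void Commit(
        IReadOnlyList<object> events,
        IReadOnlyList<Envelope> envelopes,
        bool fail = false)
    {
        if (fail) throw new InvalidOperationException("Simulated transaction failure");

        Events.AddRange(events);
        foreach (var envelope in envelopes) Outbox[envelope.Id] = envelope;
    }
}

public static class OutboxWorkflow
{
    public static void Run(
        FakeDatabase db,
        object @event,
        object message,
        Func<Envelope, bool> trySend,
        bool failCommit = false)
    {
        // Step 1: wrap the outgoing message in an envelope
        var envelope = new Envelope(Guid.NewGuid(), message);

        // Steps 2-4: one atomic commit for the event and the envelope.
        // If this throws, nothing below runs and nothing is sent.
        db.Commit(new[] { @event }, new[] { envelope }, failCommit);

        // Step 5: only now is the message released for sending.
        // Step 6: delete the outbox record once the send succeeds.
        if (trySend(envelope)) db.Outbox.Remove(envelope.Id);
    }
}
```

If `trySend` returns false, the envelope stays in the simulated outbox table, which is what makes a later retry or recovery by another node possible.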

But what if…?

  • the process is shut down cleanly? Wolverine tries to “drain” all in flight work first, and then “release” that process’s ownership of the persisted messages.
  • the process crashes before the messages in flight in local queues or in outgoing message publishing are finished? Wolverine is able to detect a “dormant node” and reassign the persisted incoming and outgoing messages to be processed by another node. In the case of a single node, that work is restarted when the process is restarted.
  • the Wolverine tables don’t yet exist in the database? Wolverine has similar database management to Marten (it’s all the shared Weasel library doing that behind the scenes) and will happily build out missing tables in its default setting.
  • an application uses a database per tenant multi-tenancy strategy? Wolverine creates separate inbox and outbox storage in each tenant database. It’s complicated and took quite a while to build, but it works. If no tenant is specified, the inbox/outbox in a “default” database is used.
  • I need to use the outbox approach for consistency outside of a message handler, like when handling an HTTP request that happens to make both database changes and publish messages? That’s a really good question, and arguably one of the best reasons to use Wolverine over other .NET messaging tools because, as we’ll see in later posts, that’s perfectly possible and quite easy. There is also a documented recipe for using the Wolverine outbox functionality with MVC Core or Minimal API.
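As a rough sketch of that last scenario, the Minimal API endpoint below enrolls a Marten session in the outbox by hand. It assumes the `IMartenOutbox` service from Wolverine.Marten with its `Enroll()` and `PublishAsync()` methods, so check the names against the Wolverine version you are using. `IncidentNote` and the route are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Threading.Tasks;
using Marten;
using Wolverine.Marten;

// Hypothetical document type, only for illustration
public record IncidentNote(Guid Id, Guid IncidentId, string Text);

public static class IncidentNoteEndpoint
{
    // Could be mapped with something like:
    // app.MapPost("/api/incidents/notes", IncidentNoteEndpoint.Post);
    public static async Task Post(
        IncidentNote note,
        IDocumentSession session,
        IMartenOutbox outbox)
    {
        // Tie the outbox to this session so that outgoing messages are
        // persisted by the same transaction as the document changes
        outbox.Enroll(session);

        session.Store(note);
        await outbox.PublishAsync(
            new TryAssignPriority { IncidentId = note.IncidentId });

        // Commits the document and the outgoing envelope together.
        // The message is only released for sending after this succeeds.
        await session.SaveChangesAsync();
    }
}
```

The ordering matters here: because the message is published through the enrolled outbox rather than sent directly, a failed SaveChangesAsync() means the TryAssignPriority message never goes out.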

Summary and What’s Next

The outbox (and closely related inbox) support is hugely important inside of any system that uses asynchronous messaging as a way of creating consistency and resiliency. Wolverine’s implementation is significantly different (and honestly more complicated) than typical implementations that depend on just polling from an outbound database table. That’s a positive in some ways because we believe that Wolverine’s approach is more efficient and will lead to greater throughput.

There is also similar inbox/outbox functionality and optimizations for Wolverine with EF Core using either PostgreSQL or SQL Server as the backing storage. In the future, I hope to see the EF Core and SQL Server support improve, but for right now, the Marten integration is getting the most attention and usage. I’d also love to see Wolverine grow to include support for alternative databases, with Azure Cosmos DB and AWS DynamoDB being leading contenders. We’ll see.

As for what’s next, let me figure out what sounds easy for the next post in January. In the meantime, Happy New Year’s everybody!
