Building a Critter Stack Application: Asynchronous Processing with Wolverine

Hey, did you know that JasperFx Software is ready for formal support plans for Marten and Wolverine? Not only are we trying to make the “Critter Stack” tools viable long-term options for your shop, we’re also interested in hearing your opinions about the tools and how they should change. We’re also certainly open to helping you succeed with your software development projects on a consulting basis, whether you’re using any part of the Critter Stack or any other .NET server-side tooling.

Let’s build a small web service application using the whole “Critter Stack” and their friends, one small step at a time. For right now, the “finished” code is at CritterStackHelpDesk on GitHub.

The posts in this series are:

  1. Event Storming
  2. Marten as Event Store
  3. Marten Projections
  4. Integrating Marten into Our Application
  5. Wolverine as Mediator
  6. Web Service Query Endpoints with Marten
  7. Dealing with Concurrency
  8. Wolverine’s Aggregate Handler Workflow FTW!
  9. Command Line Diagnostics with Oakton
  10. Integration Testing Harness
  11. Marten as Document Database
  12. Asynchronous Processing with Wolverine (this post)
  13. Durable Outbox Messaging and Why You Care!
  14. Wolverine HTTP Endpoints
  15. Easy Unit Testing with Pure Functions
  16. Vertical Slice Architecture
  17. Messaging with Rabbit MQ
  18. The “Stateful Resource” Model
  19. Resiliency

As we continue to add new functionality to our incident tracking, help desk system, we have been using Marten for persistence and Wolverine for command execution within MVC Core controllers (with cameos from Alba for testing support and Oakton for command line utilities).

In the workflow we’ve built out so far for the little system shown below, we’ve created a command called CategoriseIncident that, for the moment, is only sent to the system through HTTP calls from a user interface.

Let’s say that our system may have some domain logic rules based on customer data that we could use to try to prioritize an incident automatically once the incident is categorized. To that end, let’s create a new command named `TryAssignPriority` like this:

public class TryAssignPriority
{
    public Guid IncidentId { get; set; }
}

We’d like to kick off this work any time an incident is categorized, but we might not necessarily want to do that work within the scope of the web request that’s capturing the CategoriseIncident command. Partially that’s a scalability concern, since it lets us offload work from the web server; partially it’s to make the user interface as responsive as possible by not making it wait for slower web service responses; but mostly it’s because I want an excuse to introduce Wolverine’s ability to asynchronously process work through local, in-memory queues.

Most of the code in this post is an intermediate form that I’m using just to introduce concepts in the simplest way I can think of. In later posts I’ll show more idiomatic Wolverine ways to do things to arrive at the final version that is in GitHub.

Alright, now that we’ve got our new command class above, let’s publish that locally through Wolverine by breaking into our earlier CategoriseIncidentHandler that I’ll show here in a “before” state:

public static class CategoriseIncidentHandler
{
    public static readonly Guid SystemId = Guid.NewGuid();
    
    [AggregateHandler]
    public static IEnumerable<object> Handle(CategoriseIncident command, IncidentDetails existing)
    {
        if (existing.Category != command.Category)
        {
            yield return new IncidentCategorised
            {
                Category = command.Category,
                UserId = SystemId
            };
        }
    }
}

In this next version, I’m going to add a single call to Wolverine’s main IMessageBus entry point to publish the new TryAssignPriority command message:

public static class CategoriseIncidentHandler
{
    public static readonly Guid SystemId = Guid.NewGuid();
    
    [AggregateHandler]
    // The object? as return value will be interpreted
    // by Wolverine as appending one or zero events
    public static async Task<object?> Handle(
        CategoriseIncident command, 
        IncidentDetails existing,
        IMessageBus bus)
    {
        if (existing.Category != command.Category)
        {
            // Send the message to any and all subscribers to this message
            await bus.PublishAsync(new TryAssignPriority { IncidentId = existing.Id });
            return new IncidentCategorised
            {
                Category = command.Category,
                UserId = SystemId
            };
        }

        // Wolverine will interpret this as "do no work"
        return null;
    }
}

I didn’t do anything that is necessarily out of order here. We haven’t built a message handler for TryAssignPriority or done anything to register subscribers, but that can come later because the PublishAsync() call up above will quietly do nothing if there are no known subscribers for the message.

For asynchronous messaging veterans out there, I will discuss Wolverine’s support for a transactional outbox in a later post. For now, just know that there’s at the very least an in-memory outbox around any message handler that will not send out any pending published messages until after the original message is successfully handled. If you’re not familiar with the “transactional outbox” pattern, please come back to read the follow-up post on that later, because you absolutely need to understand it to use asynchronous messaging infrastructure like Wolverine.
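To make the "hold the messages until the handler succeeds" idea a little more concrete, here is a minimal sketch in plain C#. To be clear, this is not Wolverine's code or API. The BufferedPublisher type and its members are hypothetical names I made up purely to illustrate the buffering behavior described above, and a real outbox does a lot more than this.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical illustration only; this type is not part of Wolverine
public sealed class BufferedPublisher
{
    private readonly List<object> _pending = new List<object>();
    private readonly Func<object, Task> _send;

    // "send" stands in for whatever actually delivers a message
    public BufferedPublisher(Func<object, Task> send)
    {
        _send = send;
    }

    // Called from inside a handler: only remember the message for now
    public void Publish(object message)
    {
        _pending.Add(message);
    }

    // Run the handler, then release the buffered messages only if it did not throw
    public async Task ExecuteAsync(Func<BufferedPublisher, Task> handler)
    {
        _pending.Clear();
        try
        {
            await handler(this);
        }
        catch
        {
            // The handler failed, so none of its messages go out
            _pending.Clear();
            throw;
        }

        foreach (var message in _pending)
        {
            await _send(message);
        }

        _pending.Clear();
    }
}
```

The point of the sketch is only the ordering: nothing published inside the handler is delivered until the handler has finished without an exception, so a failed handler never leaks a message about work that did not actually happen.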

Next, let’s just add a skeleton message handler for our TryAssignPriority command message in the root API project:

public static class TryAssignPriorityHandler
{
    public static void Handle(TryAssignPriority command)
    {
        Console.WriteLine("Hey, somebody wants me to prioritize incident " + command.IncidentId);
    }
}

Switching to the command line (you may need to have the PostgreSQL database running for this next thing to work #sadtrombone), I’m going to call dotnet run -- describe to preview my help desk API a little bit.

Under the section of the textual output with the header “Wolverine Message Routing”, you’ll see the message routing tree for Wolverine’s known message types:

┌─────────────────────────────────┬──────────────────────────────────────────┬──────────────────┐
│ Message Type                    │ Destination                              │ Content Type     │
├─────────────────────────────────┼──────────────────────────────────────────┼──────────────────┤
│ Helpdesk.Api.CategoriseIncident │ local://helpdesk.api.categoriseincident/ │ application/json │
│ Helpdesk.Api.TryAssignPriority  │ local://helpdesk.api.tryassignpriority/  │ application/json │
└─────────────────────────────────┴──────────────────────────────────────────┴──────────────────┘

As you can hopefully see in that table up above, just by the fact that Wolverine “knows” there is a handler in the local application for the TryAssignPriority message type, it’s going to route messages of that type to a local queue where it will be executed later in a separate thread.

Don’t worry, this conventional routing, the parallelization settings, and just about anything else you can think of are configurable, but let’s mostly stay with defaults for right now.
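If the idea of a "local queue" processed on background threads is new to you, here is a rough conceptual sketch using only System.Threading.Channels from the .NET base class library. This is emphatically not how Wolverine implements its local queues. InMemoryWorkQueue and its maxParallel parameter are hypothetical names used only to show the general shape: the caller drops a message into an in-memory queue and moves on, while one or more background workers pull messages off and run the handler.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical illustration only; this is NOT Wolverine's implementation
public sealed class InMemoryWorkQueue<T>
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
    private readonly Task[] _workers;

    // maxParallel = 1 means strictly one message at a time, in order
    public InMemoryWorkQueue(Func<T, Task> handler, int maxParallel = 1)
    {
        if (maxParallel < 1) throw new ArgumentOutOfRangeException(nameof(maxParallel));

        _workers = new Task[maxParallel];
        for (var i = 0; i < maxParallel; i++)
        {
            // Each worker loop pulls messages off the shared channel
            // and runs the handler away from the caller's thread
            _workers[i] = Task.Run(async () =>
            {
                await foreach (var message in _channel.Reader.ReadAllAsync())
                {
                    await handler(message);
                }
            });
        }
    }

    // Returns as soon as the message is queued; it does not wait for the handler
    public bool Enqueue(T message)
    {
        return _channel.Writer.TryWrite(message);
    }

    // Stop accepting new messages and wait for the queued work to finish
    public Task DrainAsync()
    {
        _channel.Writer.Complete();
        return Task.WhenAll(_workers);
    }
}
```

With a single worker, messages are handled one at a time in the order they were queued. With more workers you get more throughput but give up ordering guarantees, which is the same trade-off you are making when you choose between sequential and parallel processing for a queue.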

Switching to the Wolverine configuration in the Program file, here’s a little taste of some of the ways we could control the exact parameters of the asynchronous processing for this local, in memory queue:

builder.Host.UseWolverine(opts =>
{
    // more configuration...

    // Adding a single Rabbit MQ messaging rule
    opts.PublishMessage<RingAllTheAlarms>()
        .ToRabbitExchange("notifications");

    opts.LocalQueueFor<TryAssignPriority>()
        // By default, local queues allow for parallel processing with a maximum
        // parallel count equal to the number of processors on the executing
        // machine, but you can override the queue to be sequential and single file
        .Sequential()

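        // Or add more to the maximum parallel count! (Sequential() and
        // MaximumParallelMessages() are alternatives shown together here
        // for illustration; pick one or the other in real code)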
        .MaximumParallelMessages(10);
    
    // Or if so desired, you can route specific messages to 
    // specific local queues when ordering is important
    opts.Policies.DisableConventionalLocalRouting();
    opts.Publish(x =>
    {
        x.Message<TryAssignPriority>();
        x.Message<CategoriseIncident>();

        x.ToLocalQueue("commands").Sequential();
    });
});

Summary and What’s Next

Through its local queue functionality, Wolverine has very strong support for managing asynchronous work within a local process. All of Wolverine’s message handling capabilities are usable within these local queues. You also have complete control over the parallelization of the messages being handled in these local queues.

This functionality does raise a lot of questions that I will try to answer in subsequent posts in this series:

  • For the sake of system consistency, we absolutely have to talk about Wolverine’s transactional outbox support
  • How we can use Wolverine’s integration testing support to test our system even when it is spawning additional messages that may be handled asynchronously
  • Wolverine’s ability to automatically forward captured events in Marten to message handlers for side effects
  • How to utilize Wolverine’s “special sauce” to craft message handlers as pure functions that are more easily unit tested than what we have so far
  • Wolverine’s built in Open Telemetry support to trace the asynchronous work end to end
  • Wolverine’s error handling policies to make our system as resilient as possible

Thanks for reading! I’ve been pleasantly surprised how well this series has been received so far. I think this will be the last entry until after Christmas, but I think I will write at least 7-8 more just to keep introducing bits of Critter Stack capabilities in small bites. In the meantime, Merry Christmas and Happy Holidays to you all!
