(Re) Introducing Jasper as a Command Bus

EDIT 6/15/2022: The correct Nuget is “Jasper.Persistence.Marten”

I just released a second alpha of Jasper 2.0 to Nuget. You can find the project goals for Jasper 2.0 here, and an update from a couple weeks ago here. Be aware that the published documentation for Jasper is very, very far behind. I’m working on it:)

Jasper is a long-running open source project with the goal of creating a low ceremony and highly productive framework for building systems in .Net that would benefit from either an in memory command bus or asynchronous messaging. The big driver for Jasper right now is using it in combination with the event sourcing capabilities of Marten as a full stack CQRS architectural framework. Later this week I’ll preview the ongoing Marten + Jasper integration, but for today I’d like to just introduce Jasper itself a little bit.

For a simplistic sample application, let’s say that we’re building an online system for potential customers to make reservations at any number of participating restaurants. I’m going to start by laying down a brand new .Net 6 Web API project. I’m obviously going to choose Marten as my persistence tooling, so the next step is to add a reference to the Jasper.Persistence.Marten Nuget, which will bring in both Jasper and Marten as transitive dependencies.

Jasper also has some working integration with EF Core using either Sql Server or Postgresql as the backing store so far.

Let’s build an online reservation system for your favorite restaurants!

Let’s say that as a result of an event storming requirements session, we’ve determined that we want both a command message to confirm a reservation, and a corresponding event message out to the internal systems of the various restaurants. I’m going to eschew event sourcing to keep this simpler and just opt for a persistent Reservation document in Marten. All that being said, here’s our code to model everything I just described:

public record ConfirmReservation(Guid ReservationId);
public record ReservationConfirmed(Guid ReservationId);

public class Reservation
{
    public Guid Id { get; set; }
    public DateTimeOffset Time { get; set; }
    public string RestaurantName { get; set; }
    public bool IsConfirmed { get; set; }
}

Building Our First Message Handlers

In this contrived example, the ReservationConfirmed event message will be published separately because it spawns a call to an external system where I’d strongly prefer to have a separate “retry loop” around just that call. That being said, this is the first cut for a command handler for the ConfirmReservation message:

public class ConfirmReservationHandler
{
    public async Task Handle(ConfirmReservation command, IDocumentSession session, IExecutionContext publisher)
    {
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        // Queue the changed document for the next SaveChangesAsync() call
        session.Store(reservation);

        // Watch out, this could be a race condition!!!!
        await publisher.PublishAsync(new ReservationConfirmed(reservation.Id));

        // We're coming back to this in a bit......
        await session.SaveChangesAsync();
    }
}

To be technical, Jasper uses an in memory outbox for all message processing, even when there’s no durable message storage, to at least guarantee that outgoing messages are only published when the original message is successfully handled. I just wanted to show the potential danger here.

So a couple things to note that are different from existing tools like NServiceBus or MassTransit:

  • Jasper locates message handlers strictly through naming conventions: public methods named either Handle() or Consume() on public types whose names are suffixed with Handler or Consumer. There are no mandatory attributes or interfaces. Hell, there’s not even a mandatory method signature except that the first argument is always assumed to be the message type.
  • Jasper Handle() methods can happily support method injection, meaning that the IDocumentSession parameter above is pushed into the method from Jasper itself. In my experience, using method injection frequently simplifies the message handler code as opposed to the idiomatic C# approach of using constructor injection and relaying things through private fields.
  • Message types in Jasper are just concrete types, and there are no required Event or Message base classes of any kind, though some may be introduced later strictly for optional diagnostics.
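
To make the naming convention above a little more concrete, here is a minimal reflection-based sketch of how a framework could discover handler methods by convention. This is not Jasper's actual implementation: HandlerFinder, Ping, and PingHandler are all hypothetical names invented for this illustration, and the sketch ignores everything a real framework has to do after discovery.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical message and handler, only here to give the finder something to discover
public record Ping(string Text);

public class PingHandler
{
    public void Handle(Ping ping) { }
}

// Hypothetical helper, NOT part of Jasper: finds handler methods by naming convention
public static class HandlerFinder
{
    private static readonly string[] TypeSuffixes = { "Handler", "Consumer" };
    private static readonly string[] MethodNames = { "Handle", "Consume" };

    // Returns (message type, method) pairs for every conventionally named
    // public method on the public types of the given assembly
    public static IEnumerable<(Type MessageType, MethodInfo Method)> FindHandlers(Assembly assembly)
    {
        return assembly.GetExportedTypes()
            .Where(type => TypeSuffixes.Any(suffix => type.Name.EndsWith(suffix, StringComparison.Ordinal)))
            .SelectMany(type => type.GetMethods(BindingFlags.Public | BindingFlags.Instance | BindingFlags.Static))
            .Where(method => MethodNames.Contains(method.Name) && method.GetParameters().Length > 0)
            // By convention, the first parameter is assumed to be the message type
            .Select(method => (MessageType: method.GetParameters()[0].ParameterType, Method: method));
    }
}
```

Calling HandlerFinder.FindHandlers(typeof(PingHandler).Assembly) would include the pair (Ping, PingHandler.Handle) in its results. Any further parameters on a discovered method are what method injection would have to fill in.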

Lastly, notice my comment about the race condition between publishing the outgoing ReservationConfirmed event message and committing the database work through IDocumentSession.SaveChangesAsync(). That’s obviously a problem waiting to bite us, so we’ll come back to that.

Next, let’s move on to the separate handler for ReservationConfirmed:

[LocalQueue("Notifications")]
[RetryNow(typeof(HttpRequestException), 50, 100, 250)]
public class ReservationConfirmedHandler
{
    public async Task Handle(ReservationConfirmed confirmed, IQuerySession session, IRestaurantProxy restaurant)
    {
        var reservation = await session.LoadAsync<Reservation>(confirmed.ReservationId);

        // Make a call to an external web service through a proxy
        await restaurant.NotifyRestaurant(reservation);
    }
}

All this handler does is look up the current state of the reservation and post that to an external system through a proxy interface (IRestaurantProxy).

As I said before, I strongly prefer that calls out to external systems be isolated to their own retry loops. In this case, the [RetryNow] attribute is setting up Jasper to retry a command through this handler on transient HttpRequestException errors with a 50, 100, and then 250 millisecond cooldown period between attempts. Jasper’s error handling policies go much deeper than this, but hopefully you can already see what’s possible.
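
For readers who haven't used this kind of policy before, the general "retry with cooldown" idea can be sketched in a few lines of plain C#. This is only an illustration of the technique, not how Jasper implements it; RetryPolicy and ExecuteAsync are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, NOT Jasper's implementation: retries an action on one
// exception type, pausing for the next cooldown before each new attempt
public static class RetryPolicy
{
    public static async Task ExecuteAsync<TException>(Func<Task> action, params TimeSpan[] cooldowns)
        where TException : Exception
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await action();
                return;
            }
            // Only retry while cooldowns remain; after that the exception propagates
            catch (TException) when (attempt < cooldowns.Length)
            {
                await Task.Delay(cooldowns[attempt]);
            }
        }
    }
}
```

In this sketch, passing three cooldowns means the action is attempted at most four times, and any exception type other than TException fails immediately without a retry.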

The usage of the [LocalQueue("Notifications")] attribute is directing Jasper to execute the ReservationConfirmed messages in a separate, local queue named “Notifications”. In effect, we’ve got a producer/consumer solution between the incoming ConfirmReservation command and ReservationConfirmed event messages. The local queueing is done with the TPL Dataflow library. Maybe Jasper will eventually move to using System.Threading.Channels, but for right now there are just bigger issues to worry about.
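
As a rough picture of what a local queue on top of TPL Dataflow can look like, here is a minimal producer/consumer sketch built on ActionBlock<T>. LocalQueueSketch is a hypothetical type made up for this post and says nothing about Jasper's internals beyond the fact that it also uses TPL Dataflow.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for a named local queue, NOT Jasper's implementation
public sealed class LocalQueueSketch<T>
{
    private readonly ActionBlock<T> _block;

    public LocalQueueSketch(Func<T, Task> handler, int maxParallelism = 1)
    {
        // The ActionBlock buffers posted messages and runs the handler in the background
        _block = new ActionBlock<T>(handler, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelism
        });
    }

    // Producer side: returns as soon as the message is buffered
    public bool Enqueue(T message) => _block.Post(message);

    // Stop accepting messages and wait for the buffered ones to be handled
    public Task DrainAsync()
    {
        _block.Complete();
        return _block.Completion;
    }
}
```

The point of the separation is that the code enqueuing a message never waits on the handler, so a slow or flaky consumer can't hold up the producer.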

Don’t fret if you don’t care for sprinkling attributes all over your code. All of the configuration I’ve done above with attributes can also be done with a fluent interface at bootstrapping time, or even within the message handler classes themselves.

Bootstrapping Jasper

Stepping into the Program.cs file for our new system, I’m going to add bootstrapping for both Marten and Jasper in the simplest possible way like so:

using CommandBusSamples;
using Jasper;
using Marten;
using Oakton;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMarten(opts =>
{
    opts.Connection("Host=localhost;Port=5433;Database=postgres;Username=postgres;password=postgres");
});

// Adding Jasper as a straight up Command Bus with all
// its default configuration
builder.Host.UseJasper();

var app = builder.Build();

// This isn't *quite* the most efficient way to do this,
// but it's simple to understand, so please just let it go...
// I'm just delegating the HTTP request body as a command 
// directly to Jasper's in memory ICommandBus
app.MapPost("/reservations/confirm", (ConfirmReservation command, ICommandBus bus) => bus.InvokeAsync(command));


// This opts into using Oakton for extended command line options for this app
// Oakton is also a transitive dependency of Jasper itself
return await app.RunOaktonCommands(args);

Okay, so let’s talk about some of the things in that code up above:

  • Jasper tries to embrace the generic host building and core abstractions that came with .Net Core (IHostBuilder, ILogger, IHostedService etc.) wherever possible, hence the integration happens with the UseJasper() call seen above.
  • The call to UseJasper() also quietly sets up Lamar as the underlying IoC container for your application. I won’t get into that much here, but there are optimizations in Jasper’s runtime model that require Lamar.
  • I used Oakton as the command line parser. That’s not 100% necessary, but there are a lot of development time utilities with Oakton for Jasper development. I’ll show some of that in later posts building on this one.

The one single HTTP route calls directly into the Jasper ICommandBus.InvokeAsync() method to immediately execute the message handler inline for the ConfirmReservation message. As someone who’s a skeptic of “mediator” tools in AspNetCore, I’m not sure this really adds much value as the handler for ConfirmReservation is currently written. However, we can add some transient error handling to our application’s bootstrapping that would apply to the ICommandBus.InvokeAsync() calls like so:

builder.Host.UseJasper(opts =>
{
    // Just setting up some retries on transient database connectivity errors
    opts.Handlers.OnException<NpgsqlException>().OrInner<NpgsqlException>()
        .RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());
});

We can also opt to make our ConfirmReservation commands be processed in background threads rather than inline through our web request by changing the Minimal API route to:

// This isn't *quite* the most efficient way to do this,
// but it's simple to understand, so please just let it go...
app.MapPost("/reservations/confirm", (ConfirmReservation command, ICommandBus bus) => bus.EnqueueAsync(command));

The EnqueueAsync() method above places the incoming command message into an in-memory queue.

What if the process dies mid-flight?!?

An experienced user of asynchronous messaging tools will have happily spotted several potential problems in the solution so far. One, there’s a potential race condition in the ConfirmReservationHandler code between database changes being committed and the outgoing message being processed. Two, what if the process dies? If we’re using all this in-memory queueing stuff, that all dies when the process dies, right?

Fortunately, Jasper, with some significant help from Postgresql and Marten here, already has a robust inbox and outbox implementation that we’ll add next for durable messaging.

For clarity, here’s the original handler code again for the ConfirmReservation message:

public class ConfirmReservationHandler
{
    public async Task Handle(ConfirmReservation command, IDocumentSession session, IExecutionContext publisher)
    {
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        // Queue the changed document for the next SaveChangesAsync() call
        session.Store(reservation);

        // Watch out, this could be a race condition!!!!
        await publisher.PublishAsync(new ReservationConfirmed(reservation.Id));

        // We're coming back to this in a bit......
        await session.SaveChangesAsync();
    }
}

Please note the comment about the race condition in the code. What we need to do is to introduce Jasper’s outbox feature, then revisit this handler.

First though, I need to go back to the bootstrapping code in Program and add a little more code:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMarten(opts =>
{
    opts.Connection("Host=localhost;Port=5433;Database=postgres;Username=postgres;password=postgres");
})
    // NEW! Adding Jasper outbox integration to Marten in the "messages"
    // database schema
    .IntegrateWithJasper("messages");

// Adding Jasper as a straight up Command Bus
builder.Host.UseJasper(opts =>
{
    // Just setting up some retries on transient database connectivity errors
    opts.Handlers.OnException<NpgsqlException>().OrInner<NpgsqlException>()
        .RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());

    // NEW! Apply the durable inbox/outbox functionality to the two in-memory queues
    opts.DefaultLocalQueue.DurablyPersistedLocally();
    opts.LocalQueue("Notifications").DurablyPersistedLocally();

    // And I just opened a GitHub issue to make this config easier...
});

var app = builder.Build();

// This isn't *quite* the most efficient way to do this,
// but it's simple to understand, so please just let it go...
app.MapPost("/reservations/confirm", (ConfirmReservation command, ICommandBus bus) => bus.EnqueueAsync(command));

// This opts into using Oakton for extended command line options for this app
// Oakton is also a transitive dependency of Jasper itself
return await app.RunOaktonCommands(args);

This time I chained a call on the Marten configuration to the IntegrateWithJasper() extension method. I also added a couple lines of code within the UseJasper() block to mark the in memory queues as being enrolled in the inbox/outbox mechanics through the DurablyPersistedLocally() method.

Now, back to our handler code. Keeping things explicit for now, I’m going to add some necessary mechanics for opting into outbox sending:

public class ConfirmReservationHandler
{
    public async Task Handle(ConfirmReservation command, IDocumentSession session, IExecutionContext publisher)
    {
        // Enroll the execution context and Marten session in
        // outbox sending
        // This is an extension method in Jasper.Persistence.Marten
        await publisher.EnlistInOutboxAsync(session);
        
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        // Queue the changed document for the next SaveChangesAsync() call
        session.Store(reservation);

        // No longer a race condition, I'll explain more below:)
        await publisher.PublishAsync(new ReservationConfirmed(reservation.Id));

        // Persist, and kick out the outgoing messages
        await session.SaveChangesAsync();
    }
}

I’m going to utilize Jasper/Marten middleware in the last section to greatly simplify the code above, so please read to the end:)

With this version of the handler code, things are working a little differently:

  • The call to PublishAsync() does not immediately release the message to the in memory queue. Instead, it’s routed and held in memory by the IExecutionContext for later.
  • When IDocumentSession.SaveChangesAsync() is called, the outgoing messages are persisted into the underlying database in the same database transaction as the change to the Reservation document. Using the Marten integration, the Jasper outbox can even do this within the exact same batched database command as a minor performance optimization.
  • At the end of IDocumentSession.SaveChangesAsync() upon a successful transaction, the outgoing messages are kicked out into the outgoing message sending queues in Jasper.

The outbox usage here solves a couple issues. First, it eliminates the race condition between the outgoing messages and the database changes. Secondly, it prevents situations of system inconsistency where either the message succeeds and the database changes fail, or vice versa.
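
The ordering described above (hold the messages, commit, and only then release them) can be modeled in a few lines. The sketch below is an in-memory illustration only: OutboxSketch is a hypothetical type, and unlike a real outbox it does not persist the pending messages with the transaction, so it does nothing for the "process dies" scenario.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory model of outbox ordering, NOT Jasper's implementation.
// A real outbox would also persist pending messages in the same database transaction.
public sealed class OutboxSketch
{
    private readonly List<object> _pending = new();
    private readonly Func<object, Task> _send;

    public OutboxSketch(Func<object, Task> send) => _send = send;

    // "Publishing" only buffers the message; nothing is sent yet
    public void Publish(object message) => _pending.Add(message);

    // Run the commit first, then release the buffered messages only if it succeeded
    public async Task CommitAsync(Func<Task> commitTransaction)
    {
        try
        {
            await commitTransaction();
        }
        catch
        {
            // The transaction failed, so these messages must never go out
            _pending.Clear();
            throw;
        }

        foreach (var message in _pending.ToArray())
        {
            await _send(message);
        }

        _pending.Clear();
    }
}
```

With this shape, a consumer can never see a message before the database change it describes has been committed, and a failed commit sends nothing.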

I’ll be writing up a follow up post later this week or next diving deeper into Jasper’s outbox implementation. For a quick preview, by taking a very different approach than existing messaging tools in .Net, Jasper’s outbox is already usable in more scenarios than other alternatives and I’ll try to back that assertion up next time. To answer the obvious question in the meantime, Jasper’s outbox gives you an at least once delivery guarantee even if the current process fails.

Streamline the Handler Mechanics

So the “register outbox, publish messages, commit session transaction” dance could easily get repetitive in your code. Jasper’s philosophy is that repetitive code is wasteful, so let’s eliminate the cruft-y code we were writing strictly for Marten or Jasper in the ConfirmReservationHandler. The code below is the exact functional equivalent to the earlier handler, even down to enrolling in the outbox:

public static class ConfirmReservationHandler
{
    [Transactional]
    public static async Task<ReservationConfirmed> Handle(ConfirmReservation command, IDocumentSession session)
    {
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        session.Store(reservation);

        // Kicking out a "cascaded" message
        return new ReservationConfirmed(reservation.Id);
    }
}

The usage of the [Transactional] attribute opts only this handler into using Marten transactional middleware that handles all the outbox enrollment mechanics and calls IDocumentSession.SaveChangesAsync() for us after this method is called.

By returning Task<ReservationConfirmed>, I’m directing Jasper to publish the returned value as a message upon the successful completion of the incoming message. I personally like the cascading message pattern in Jasper as a way to make unit testing handler code easier. This pattern is based on a much older custom service bus I helped build and run in production in the mid-2010s.
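
To show why a returned message is easier to test than a call to a publisher, here is a deliberately simplified sketch. The handler below takes the Reservation directly instead of a Marten session, which is an assumption made purely to keep the example self-contained; ConfirmReservationSketch and its test class are hypothetical and use no test framework.

```csharp
using System;

// Simplified stand-ins for the types in this post, redeclared so the sketch compiles alone
public record ConfirmReservation(Guid ReservationId);
public record ReservationConfirmed(Guid ReservationId);

public class Reservation
{
    public Guid Id { get; set; }
    public bool IsConfirmed { get; set; }
}

// Hypothetical handler shape: the outgoing message is just the return value
public static class ConfirmReservationSketch
{
    public static ReservationConfirmed Handle(ConfirmReservation command, Reservation reservation)
    {
        reservation.IsConfirmed = true;
        return new ReservationConfirmed(reservation.Id);
    }
}

public static class ConfirmReservationSketchTests
{
    // A plain state-based check with no message bus, mocks, or spies involved
    public static void Returns_the_confirmation_event()
    {
        var reservation = new Reservation { Id = Guid.NewGuid() };

        var result = ConfirmReservationSketch.Handle(new ConfirmReservation(reservation.Id), reservation);

        if (!reservation.IsConfirmed || result.ReservationId != reservation.Id)
            throw new Exception("Expected a confirmed reservation and a matching event");
    }
}
```

The test only has to look at the return value, which is the whole appeal: nothing needs to be intercepted to find out what the handler would have published.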

On the next episode of “please, please pay attention to my OSS projects!”

My very next post is a sequel to Marten just got better for CQRS architectures, but this time using some new Jasper functionality with Marten to further streamline out repetitive code.

Following behind that, I want to write follow-ups doing a deeper dive on Jasper’s outbox implementation, using RabbitMQ with Jasper, then a demonstration of all the copious command line utilities built into both Jasper and Marten.

As my mentor from Dell used to say, “thanks for listening, I feel better now”
