What Is Good Code?

This is the second part of a 3 or 4 part series where I’m formulating my thoughts about an ongoing initiative at MedeAnalytics. I started yesterday with a related post called On Giving Technical Guidance to Others that’s a synopsis of an impromptu lecture I gave our architecture team about all the things I wish I’d known before becoming any kind of technical leader. I’ll hopefully follow this post up as soon as tomorrow with my reasoning about why prescriptive architectures are harmful and my own spin on the SOLID Principles.

I’m part of an architectural team that’s been charged with modernizing and improving our very large, existing systems. We have an initiative just getting off the ground to break off part of one of our monoliths into a separate system to begin a strangler application strategy to modernize the system over time. This gives us a chance to work out how we want our systems to be structured and built going forward in a smaller subset of work instead of trying to boil the ocean to update the entire monolith codebase at one time.

As part of that effort, I’m trying to put some stakes in the ground to:

  • Ban all usage of formal, prescriptive architectural styles like the Onion Architecture or Clean Architecture, because I find that they do more harm than good. Rather, I’m pushing hard for vertical slice or feature folder code organization (see the sketch just after this list) while still satisfying the need for separation of concerns and decent structuring of the code
  • Generally choose lower code ceremony approaches whenever possible because that promotes easier evolution of the code, and in the end, the only truly guaranteed path to good code is adaptation and evolution in the face of feedback about the code.
  • Be very cautious about how we abstract database access to avoid causing unnecessary complexity or poor performance, which means I probably want to ban any usage of naive IRepository<T> type abstractions
  • Put the SOLID Principles into a little bit of perspective as we do this work and make sure our developers and architects have a wider range of mental tools in their design toolbox than just an easy to remember but hard to interpret or apply acronym developed by C++ developers before many of our developers were even born
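
To make that first bullet a little more concrete, here’s a hypothetical feature folder layout (all of the names are invented for illustration) where everything for a single feature lives together instead of being smeared across horizontal layers:

src/Invoicing/
    AddInvoice/
        AddInvoice.cs          // the command itself
        AddInvoiceHandler.cs   // validation, business logic, persistence
        AddInvoiceEndpoint.cs  // the HTTP endpoint
    PayInvoice/
        PayInvoice.cs
        PayInvoiceHandler.cs
        PayInvoiceEndpoint.cs

The separation of concerns still exists within each slice, but you can reason about and change one feature at a time without spelunking through project-wide layers.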

The rest of this post is just trying to support those opinions.

First, What is Good?

More on this in a later post as I give my take on SOLID, but Dan North made an attempt at describing “good code” that’s worth your time.

Let’s talk a little bit about the qualities you want in your code. Quite a few folks are going to say that the most important quality is that the code satisfies the business needs and delivers value to the business! If you’ll please get that bit of self-righteousness out of your system, let’s move on to the kind of technical quality that’s necessary to continue to efficiently deliver business value over time.

  • You can understand what the code is doing, navigate within the codebase, and generally find code where you would expect it to be based on the evident and documented rules of the system architecture.
  • The code exhibits separation of concerns, meaning that you’re generally able to reason about and change one responsibility of the code at a time (data access, business logic, validation logic, data presentation, etc.). Cohesion and coupling are the alpha and omega of software design. I’m a very strong believer in evolutionary approaches to designing software as the only truly reliable method to arrive at good code, but that’s largely dependent upon the qualities of cohesion and coupling within your code.
  • Rapid feedback is vital to effective coding, so testability of the code is a major factor for me. This can mean that code is structured in a way that it’s easy to unit test in isolation (i.e., you can effectively test business rules without having to run the full stack application or in one hurtful extreme, be forced to use a tool like Selenium). This version of testability is very largely a restatement of cohesion and coupling. Alternatively, if the code depends on some kind of infrastructure that’s easy to deal with in integration testing (like Marten!) and the integration tests run “fast enough,” I say you can relax separation of concerns and jumble things together as long as the code is still easy to reason about.
  • I don’t know a pithy way to describe this, but the code needs to carefully expose the places where system state is changed or “mutated” to make the code’s behavior predictable and prevent bugs. Whether that’s adopting command query segregation, using elements of functional programming, or uni-directional data flow in place of two way data binding in user interface development, system state changes are an awfully easy way to introduce bugs in code and should be dealt with consciously and with some care (see the sketch just after this list).
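
As a minimal sketch of that last bullet (the types and the late fee math are invented for illustration), command query segregation can be as simple as keeping the decision logic in a pure function and funneling the actual mutation through one obvious method:

public class Invoice
{
    public decimal Amount { get; set; }
    public DateTimeOffset DueDate { get; set; }
    public decimal LateFee { get; set; }
}

public static class LateFees
{
    // The "query" side: a pure function that decides what should happen
    // without changing any state, so it's trivial to unit test in isolation
    public static decimal CalculateLateFee(Invoice invoice, DateTimeOffset now)
        => now > invoice.DueDate ? invoice.Amount * 0.05m : 0m;

    // The "command" side: the single place where state actually changes,
    // which makes the mutation easy to find and reason about
    public static void ApplyLateFee(Invoice invoice, DateTimeOffset now)
        => invoice.LateFee = CalculateLateFee(invoice, now);
}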

I think most of us would say that code should be “simple,” and I’d add that I personally want code to be written in a low ceremony way that reduces noise in the code. The problem with that whole statement is that “simple” is very subjective.

Which is just to say that saying “just write simple code!” isn’t necessarily all that helpful or descriptive. What’s helpful is to have some mental tools to help developers judge whether or not their code is “good” and move in the direction of more successful code. Better yet, do that without introducing unnecessary complexity or code ceremony through well-intentioned prescriptive architectures like “Onion” or “Clean” that purposely try to force developers to write code “the right way.”

And next time on Jeremy tries to explain his twitter ranting…

This has inevitably taken longer than I wished to write, so I’m breaking things up. I will follow up tomorrow and Thursday with my analysis of SOLID, an explanation of why I think the Onion/Clean Architecture style of code organization is best avoided, and eventually some thoughts on database abstractions.

On Giving Technical Guidance to Others

I’m still working on my promised SOLID/Anti-Onion/Anti-Clean/Database Abstraction post, but it’s as usual taking longer than I’d like and I’m publishing this section separately.

Just as a quirk of circumstances, I pretty well went straight from being a self-taught “Shadow IT” developer to being a lead developer and de facto architect on a mission critical supply chain application for a then Fortune 500 company. The system was an undeniable success in the short term, but it came at a cost to me because as a first time lead I had zero ability to enable the other developers working with me to be productive. As such, I ended up writing the vast majority of the code and inevitably became the bottleneck on all subsequent production issues. That doesn’t scale.

The following year I had another chance to lead a follow up project and vowed to do a better job with the other developers (plus I was getting a lot of heat from various management types to do so). In a particular case that I remember to this day, I wrote up a detailed Word document for a coding assignment for another developer. I got all the way down to class and method names and even had some loose sample code I think. I handed that off, patted myself on the back for being a better lead, and went off on my merry way.

As you might have surmised, when I got his code back later it was unusable because he did exactly what I said to do — which turned out to be wrong based on factors I hadn’t anticipated. Worse, he only did exactly what I said to do and missed some concerns that I didn’t think needed to be explicitly called out. I’ve thought a lot about this over the years and come to some conclusions about how I should have tried to work differently with that developer. Before diving into that, let’s first talk about you for a while!

Congratulations! You’ve made it to some kind of senior technical role in your company. You’ve attained enough skill and knowledge to be recognized for your individual contributions, and now your company is putting you in a position to positively influence other developers, determine technical strategies, and serve as a steward for your company’s systems.

Hopefully you’ll still be hands on in the coding and testing, but increasingly, your role is going to involve trying to create and evolve technical guidance for other developers within your systems. More and more, your success is going to be dependent on your ability to explain ideas, concepts, and approaches to other developers. Not that I’m the fount of all wisdom about this, but here are some of the things I wish I’d understood before being put into technical leadership roles:

  • It’s crucial to provide the context, reasoning, and applicability behind any technical guidance. Explaining the “why” and “when” is just as important as the “what” or “how.”
  • Being too specific in the guidance or instructions to another developer can easily come with the unintended consequence of turning off their brains, and that will frequently lead to poor results. Expanding on my first point, it’s better to explain the goals, how their work fits into the larger system, and the qualities of the code you’re hoping to achieve rather than trying to make them automatons just following directions. It’s quite possible that JIRA-driven development exacerbates this potential problem.
  • You need to provide some kind of off-ramp for developers to understand the limitations of the guidance. The last thing you want is for developers to blindly follow guidance that is inappropriate for a circumstance that wasn’t anticipated during the formulation of said guidance.
  • Recommendations about technology usage probably need to come as some kind of decision tree with multiple options for applicability, because there’s just about never a one size fits all tool.
  • By all means, allow and encourage the actual developers to actively look for better approaches because they’re the ones closest to their code. Especially with talented younger developers, you never want to take away their sense of initiative or close them off from providing feedback, adjustments, or flat out innovation to the “official” guidance. At the very least, you as a senior technical person need to pay attention when a developer tells you that the current approach is confusing or laborious or feels too complicated.
  • Treat every possible recommendation or technical guidance as a theory that hasn’t yet been perfectly proven.

I’ve talked a lot about giving technical guidance, but you should never think that you or any other individual are responsible for doing all the thinking within a software ecosystem. What you might be responsible for is facilitating the sharing of learning and knowledge through the company. I was lucky enough early in my career to spend just a little bit of time working with Martin Fowler who effectively acts as a sort of industry wide, super bumble bee gathering useful knowledge from lots of different projects and cross-pollinating what he’s learned to other teams and other projects. Maybe you don’t impact the entire software development industry like he has, but you can at least facilitate that within your own team or maybe within your larger organization.

As an aside, a very helpful technique to use when trying to explain something in code to another developer is to ask them to explain it back to you in their own words — or conversely, I try to do this when I’m the one getting the explanation to make sure I’ve really understood what I’m being told. My wife is an educator and tells me this is a common technique for teachers as well.

Next time…

In my next post I’m going to cover a lot of ground about why I think prescriptive architectural styles like the “Onion” or “Clean” are harmful, alternatives, a discussion about what use is SOLID these days (more than none, but much less than the focus many people put on it is really worth), and a discussion about database abstractions I find to be harmful that tend to be side effects of prescriptive architectures.

Projecting Marten Events to a Flat Table

Marten 5.8 dropped over the weekend with mostly bug fixes, but one potentially useful new feature for projecting event data to plain old SQL tables. One of the strengths of Marten that we’ve touted from the beginning was the ability to mix document database features with event sourcing and old fashioned relational tables all with one database in a single application as your needs dictate.

Let’s dive right into a sample usage of this. If you’re a software developer long enough and move around just a little bit, you’re going to get sucked into building a workflow for importing flat files of dubious quality from external partners or customers. I’m going to claim that event sourcing is a good fit for this problem domain (and I’m also suggesting this pretty strongly at work). That being said, here’s what the event types might look like that record the progress of a file import:

public record ImportStarted(
    DateTimeOffset Started,
    string ActivityType,
    string CustomerId,
    int PlannedSteps);

public record ImportProgress(
    string StepName,
    int Records,
    int Invalids);

public record ImportFinished(DateTimeOffset Finished);

public record ImportFailed;

At some point, we’re going to want to apply some metrics to the execution history to understand the average size of the incoming files, what times of the day have more or less traffic, and performance information broken down by file size, file type, and who knows what. This sounds to me like a perfect use case for SQL queries against a flat table.
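
Just to show where this is headed, once the events are projected into the flat import_history table defined below, those metrics really are plain SQL. Here’s a minimal sketch using Npgsql directly (the connection string and the specific query are my own illustration, not part of Marten):

using Npgsql;

await using var conn = new NpgsqlConnection("Host=localhost;Database=marten;Username=postgres;Password=postgres");
await conn.OpenAsync();

// Import volume and average duration broken down by activity type
await using var cmd = new NpgsqlCommand(@"
    select activity_type, count(*) as total, avg(finished - started) as avg_duration
    from import_history
    where finished is not null
    group by activity_type", conn);

await using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
    Console.WriteLine($"{reader.GetString(0)}: {reader.GetInt64(1)} imports");
}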

Enter Marten 5.8’s new functionality. First off, let’s do this simply by writing some explicit SQL in a new projection that we can replay against the existing events when we’re ready. I’m going to use Marten’s EventProjection as a base class in this case:

public class ImportSqlProjection: EventProjection
{
    public ImportSqlProjection()
    {
        // Define the table structure here so that 
        // Marten can manage this for us in its schema
        // management
        var table = new Table("import_history");
        table.AddColumn<Guid>("id").AsPrimaryKey();
        table.AddColumn<string>("activity_type").NotNull();
        table.AddColumn<DateTimeOffset>("started").NotNull();
        table.AddColumn<DateTimeOffset>("finished");

        SchemaObjects.Add(table);

        // Telling Marten to delete the table data as the 
        // first step in rebuilding this projection
        Options.DeleteDataInTableOnTeardown(table.Identifier);
    }

    public void Project(IEvent<ImportStarted> e, IDocumentOperations ops)
    {
        ops.QueueSqlCommand("insert into import_history (id, activity_type, started) values (?, ?, ?)",
            e.StreamId, e.Data.ActivityType, e.Data.Started
        );
    }

    public void Project(IEvent<ImportFinished> e, IDocumentOperations ops)
    {
        ops.QueueSqlCommand("update import_history set finished = ? where id = ?",
            e.Data.Finished, e.StreamId
        );
    }

    public void Project(IEvent<ImportFailed> e, IDocumentOperations ops)
    {
        ops.QueueSqlCommand("delete from import_history where id = ?", e.StreamId);
    }
}

A couple notes about the code above:

  • We’ve invested a huge amount of time in Marten and the related Weasel library building in robust schema management. The Table model I’m using up above comes from Weasel, and this allows a Marten application using this projection to manage the table creation in the underlying database for us. This new table would be part of all Marten’s built in schema management functionality.
  • The QueueSqlCommand() functionality came in a couple minor releases ago, and gives you the ability to add raw SQL commands to be executed as part of a Marten unit of work transaction. It’s important to note that QueueSqlCommand() doesn’t execute inline; rather, it adds the SQL you enqueue to a batched query that executes when you eventually call the owning IDocumentSession.SaveChangesAsync() (see the sketch just after this list). I can’t stress this enough: it has consistently been a big performance gain in Marten to batch up queries to the database server and reduce the number of network round trips.
  • The Project() methods are a naming convention with Marten’s EventProjection. The first argument is always assumed to be the event type. In this case though, it’s legal to use Marten’s IEvent<T> envelope type to allow you access to event metadata like timestamps, version information, and the containing stream identity.
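
As a quick illustration of that batching behavior, here’s a minimal sketch outside of any projection (assuming store is the application’s IDocumentStore and the SQL is just for show). Nothing below touches the database until SaveChangesAsync():

await using var session = store.LightweightSession();

// These two commands are only queued in memory at this point
session.QueueSqlCommand("insert into import_history (id, activity_type, started) values (?, ?, ?)",
    Guid.NewGuid(), "flat-file", DateTimeOffset.UtcNow);
session.QueueSqlCommand("delete from import_history where finished < ?",
    DateTimeOffset.UtcNow.AddDays(-30));

// Everything queued above goes to the database in one batched round trip
await session.SaveChangesAsync();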

Now, let’s use Marten’s brand new FlatTableProjection recipe to do a little more advanced version of the earlier projection:

public class FlatImportProjection: FlatTableProjection
{
    // I'm telling Marten to use the same database schema as the events from
    // the Marten configuration in this application
    public FlatImportProjection() : base("import_history", SchemaNameSource.EventSchema)
    {
        // We need to explicitly add a primary key
        Table.AddColumn<Guid>("id").AsPrimaryKey();

        TeardownDataOnRebuild = true;

        Project<ImportStarted>(map =>
        {
            // Set values in the table from the event
            map.Map(x => x.ActivityType).NotNull();
            map.Map(x => x.CustomerId);
            map.Map(x => x.PlannedSteps, "total_steps")
                .DefaultValue(0);
            
            map.Map(x => x.Started);

            // Initial values
            map.SetValue("status", "started");
            map.SetValue("step_number", 0);
            map.SetValue("records", 0);
        });

        Project<ImportProgress>(map =>
        {
            // Add 1 to this column when this event is encountered
            map.Increment("step_number");

            // Update a running sum of records progressed
            // by the number of records on this event
            map.Increment(x => x.Records);

            map.SetValue("status", "working");
        });

        Project<ImportFinished>(map =>
        {
            map.Map(x => x.Finished);
            map.SetValue("status", "completed");
        });

        // Just gonna delete the record of any failures
        Delete<ImportFailed>();

    }
}

A couple notes on this version of the code:

  • FlatTableProjection is adding columns to its table based on the designated column mappings. You can happily customize the FlatTableProjection.Table object to add indexes, constraints, or defaults.
  • Marten is able to apply schema migrations and manage the table from the FlatTableProjection as long as it’s registered with Marten
  • When you call Map(x => x.ActivityType), Marten is by default mapping that to a snake_cased derivation of the member name for the column, so “activity_type”. You can explicitly map the column name yourself.
  • The call to Map(expression) chains a fluent builder for the table column if you want to further customize the table column with default values or constraints like the NotNull()
  • In this case, I’m building a database row per event stream. The FlatTableProjection can also map to arbitrary members of each event type
  • The Project<T>(lambda) configuration leads to runtime code generation of a Postgresql upsert command so that the projection is not completely dependent upon events being captured in exactly the right order. I think this will be more robust in real life usage than the first, more explicit version.

The FlatTableProjection in its first incarnation is not yet able to use event metadata because I got impatient to finish up 5.8 and punted on that for now. I think it’s safe to say this feature will evolve when it hits some real world usage.

Command Line Support for Marten Projections

Marten 5.7 was published earlier this week with mostly bug fixes. The one big new piece of functionality was an improved version of the command line support for event store projections. Specifically, Marten added support for multi-tenancy through multiple databases and the ability to use separate document stores in one application as part of our V5 release earlier this year, but the projections command didn’t catch up and support all of that until now with Marten v5.7.0.

From a sample project in Marten we use to test this functionality, here’s part of the Marten setup that has a mix of asynchronous and inline projections, as well as uses the database per tenant strategy:

services.AddMarten(opts =>
{
    opts.AutoCreateSchemaObjects = AutoCreate.All;
    opts.DatabaseSchemaName = "cli";

    // Note this app uses multiple databases for multi-tenancy
    opts.MultiTenantedWithSingleServer(ConnectionSource.ConnectionString)
        .WithTenants("tenant1", "tenant2", "tenant3");

    // Register all event store projections ahead of time
    opts.Projections
        .Add(new TripAggregationWithCustomName(), ProjectionLifecycle.Async);
    
    opts.Projections
        .Add(new DayProjection(), ProjectionLifecycle.Async);
    
    opts.Projections
        .Add(new DistanceProjection(), ProjectionLifecycle.Async);

    opts.Projections
        .Add(new SimpleAggregate(), ProjectionLifecycle.Inline);

    // This is actually important to register "live" aggregations too for the code generation
    opts.Projections.SelfAggregate<SelfAggregatingTrip>(ProjectionLifecycle.Live);
}).AddAsyncDaemon(DaemonMode.Solo);

At this point, let’s introduce the Marten.CommandLine Nuget dependency to add Marten-related command line options for typical database management utilities directly to our application. Marten.CommandLine brings with it a dependency on Oakton, which we’ll use as the command line parser for our built in tooling. Using the now “old-fashioned” pre-.NET 6 manner of running a console application, I add Oakton to the system like this:

public static Task<int> Main(string[] args)
{
    // Use Oakton for running the command line
    return CreateHostBuilder(args).RunOaktonCommands(args);
}

When you use the dotnet command line options, just keep in mind that the “--” separator you see me using here separates options for the dotnet executable itself on the left from arguments being passed to the application on the right.

Now, turning to the command line at the root of our project, I’m going to type out this command to see the Oakton options for our application:

dotnet run -- help

Which gives us this output:

If you’re wondering, the commands db-apply and marten-apply are synonyms that exist so as not to break older users when we introduced the newer, more generic “db” commands.

And next I’m going to see the usage for the projections command with dotnet run -- help projections, which gives me this output:

For the simplest usage, I’m just going to list off the known projections for the entire system with dotnet run -- projections --list:

Which will show us the four registered projections in the main IDocumentStore and tell us that there are no registered projections in the separate IOtherStore.

Now, I’m just going to continuously run the asynchronous projections for the entire application — while another process is constantly pumping random events into the system so there’s always new work to be doing — with dotnet run -- projections, which will spit out this continuously updating table (with an assist from Spectre.Console):

What I hope you can tell here is that every asynchronous projection is actively running for each separate tenant database. The blue “High Water Mark” is telling us how far along the event store for each database currently is.

And finally, for the main reason why I tackled the projections command line overhaul last week, folks needed a way to rebuild projections for every database when using a database per tenant strategy.

While the new projections command will happily let you rebuild any combination of database, store, and projection name by flags or even an interactive mode, we can quickly trigger a full rebuild of all the asynchronous projections with dotnet run -- projections --rebuild, which is going to loop through every store and database like so:

For the moment, the rebuild works on all the projections for a single database at a time. I’m sure we’ll attempt some optimizations of the rebuilding process and try to understand how much more we can really parallelize, but for right now, our users have an out of the box way to rebuild projections across separate databases or separate stores.

This *might* be a YouTube video soon just to kick off my new channel for Marten/Jasper/Oakton/Alba/Lamar content.

Low Code Ceremony Sagas with Jasper & Marten

You’ll need at least Jasper v2.0.0-alpha-4 if you want to recreate the saga support in this post. All the sample code for this post is in an executable sample on GitHub. Jasper does support sagas with EF Core and Sql Server or Postgresql, but Marten is where most of the effort is going just at the moment.

The Saga pattern is a way to solve the issue of logical, long-running transactions that necessarily need to span over multiple operations. In the approaches I’ve encountered throughout my career, this has generally meant persisting a “saga state” of some sort in a database that is used within a message handling framework to “know” what steps have been completed, and what’s outstanding.

Jumping right into an example, consider a very simple order management service that will have steps to:

  1. Create a new order
  2. Complete the order
  3. Or alternatively, delete new orders if they have not been completed within 1 minute

For the moment, I’m going to ignore the underlying persistence and just focus on the Jasper message handlers to implement the order saga workflow with this simplistic saga code:

using Baseline.Dates;
using Jasper;

namespace OrderSagaSample;

public record StartOrder(string Id);

public record CompleteOrder(string Id);

public record OrderTimeout(string Id) : TimeoutMessage(1.Minutes());

public class Order : Saga
{
    public string? Id { get; set; }

    // By returning the OrderTimeout, we're triggering a "timeout"
    // condition that will process the OrderTimeout message at least
    // one minute after an order is started
    public OrderTimeout Start(StartOrder order, ILogger<Order> logger)
    {
        Id = order.Id; // defining the Saga Id.

        logger.LogInformation("Got a new order with id {Id}", order.Id);
        // creating a timeout message for the saga
        return new OrderTimeout(order.Id);
    }

    public void Handle(CompleteOrder complete, ILogger<Order> logger)
    {
        logger.LogInformation("Completing order {Id}", complete.Id);

        // That's it, we're done. This directs Jasper to delete the
        // persisted saga state after the message is done.
        MarkCompleted();
    }

    public void Handle(OrderTimeout timeout, ILogger<Order> logger)
    {
        logger.LogInformation("Applying timeout to order {Id}", timeout.Id);

        // That's it, we're done. Delete the saga state after the message is done.
        MarkCompleted();
    }
}

I’m just aiming for a quick sample rather than exhaustive documentation here, but a few notes:

  • Jasper leans a bit on type and naming conventions to discover message handlers and to “know” how to call these message handlers. Some folks will definitely not like the magic, but this approach leads to substantially less code and arguably less complexity compared to existing .NET tools
  • Jasper supports the idea of scheduled messages, and the new TimeoutMessage base class up there is just a way to utilize that support for “saga timeout” conditions
  • Jasper generally tries to adapt to your application code rather than force a lot of mandatory framework artifacts into your message handler code

Now let’s move over to the service bootstrapping and add Marten in as our persistence mechanism in the Program file:

using Jasper;
using Jasper.Persistence.Marten;
using Marten;
using Oakton;
using Oakton.Resources;
using OrderSagaSample;

var builder = WebApplication.CreateBuilder(args);

// Not 100% necessary, but enables some extra command line diagnostics
builder.Host.ApplyOaktonExtensions();

// Adding Marten
builder.Services.AddMarten(opts =>
    {
        var connectionString = builder.Configuration.GetConnectionString("Marten");
        opts.Connection(connectionString);
        opts.DatabaseSchemaName = "orders";
    })

    // Adding the Jasper integration for Marten.
    .IntegrateWithJasper();


builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Do all necessary database setup on startup
builder.Services.AddResourceSetupOnStartup();

// The defaults are good enough here
builder.Host.UseJasper();

var app = builder.Build();

// Just delegating to Jasper's local command bus for all
app.MapPost("/start", (StartOrder start, ICommandBus bus) => bus.InvokeAsync(start));
app.MapPost("/complete", (CompleteOrder start, ICommandBus bus) => bus.InvokeAsync(start));
app.MapGet("/all", (IQuerySession session) => session.Query<Order>().ToListAsync());
app.MapGet("/", (HttpResponse response) =>
{
    response.Headers.Add("Location", "/swagger");
    response.StatusCode = 301;
});

app.UseSwagger();
app.UseSwaggerUI();

return await app.RunOaktonCommands(args);

Off screen, I’ve started up a docker container for Postgresql to get a blank database. With that running, I’ll start the application up with the usual dotnet run command and open up the Swagger page:

You’ll get a lot of SQL in your terminal on the first run as Marten sets up the database for you; that’s perfectly normal.

I’m going to first create a new order for “Shoes” and execute the /start endpoint:

And verify that it’s persisted by checking the /all endpoint:

If I’m quick enough, I’ll post {"Id": "Shoes"} to /complete, and then verify through the /all endpoint that the “Shoes” order has been completed.
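
If you’d rather exercise the endpoints from code than from the Swagger page, here’s a minimal client sketch (the base address is just my local assumption):

using System.Net.Http.Json;

var client = new HttpClient { BaseAddress = new Uri("http://localhost:5000") };

// Start the "Shoes" order saga
await client.PostAsJsonAsync("/start", new StartOrder("Shoes"));

// Complete it before the one minute timeout message fires
await client.PostAsJsonAsync("/complete", new CompleteOrder("Shoes"));

// Read back the persisted saga state
var orders = await client.GetFromJsonAsync<List<Order>>("/all");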

Otherwise, if I’m too slow to complete the order, the timeout message will be applied to our order and you’ll see evidence of that in the logging output like so:

And that’s it, one working saga implementation with database backed persistence through Marten. The goal of Jasper is to make this kind of server side development as low ceremony and easy to use as possible, so any feedback about what you do or don’t like in this sample would be very helpful.

A Vision for Stateful Resources at Development or Deployment Time

As is not atypical, I found a couple little issues with both Oakton and Jasper in the course of writing this post. To that end, if you want to use the functionality shown here yourself, just make sure you’re on at least Oakton 4.6.1 and Jasper 2.0-alpha-3.

I’ve been showing some new integration between Jasper, Marten, and Rabbit MQ. This time out, I want to show the new “stateful resource” model in a third tool named Oakton to remove development and deployment time friction when using these tools on a software project. Oakton itself is a command line processing tool that, more importantly, can be used to quickly add command line utilities directly to your .NET executable.

Drawing from a sample project in the Jasper codebase, here’s the configuration for an issue tracking application that uses Jasper, Marten, and RabbitMQ with Jasper’s inbox/outbox integration using Postgresql:

using IntegrationTests;
using Jasper;
using Jasper.Persistence.Marten;
using Jasper.RabbitMQ;
using Marten;
using MartenAndRabbitIssueService;
using MartenAndRabbitMessages;
using Oakton;
using Oakton.Resources;

var builder = WebApplication.CreateBuilder(args);

builder.Host.ApplyOaktonExtensions();

builder.Host.UseJasper(opts =>
{
    // I'm setting this up to publish to the same process
    // just to see things work
    opts.PublishAllMessages()
        .ToRabbitExchange("issue_events", exchange => exchange.BindQueue("issue_events"))
        .UseDurableOutbox();

    opts.ListenToRabbitQueue("issue_events").UseInbox();

    opts.UseRabbitMq(factory =>
    {
        // Just connecting with defaults, but showing
        // how you *could* customize the connection to Rabbit MQ
        factory.HostName = "localhost";
        factory.Port = 5672;
    });
});

// This is actually important, this directs
// the app to build out all declared Postgresql and
// Rabbit MQ objects on start up if they do not already
// exist
builder.Services.AddResourceSetupOnStartup();

// Just pumping out a bunch of messages so we can see
// statistics
builder.Services.AddHostedService<Worker>();

builder.Services.AddMarten(opts =>
{
    // I think you would most likely pull the connection string from
    // configuration like this:
    // var martenConnectionString = builder.Configuration.GetConnectionString("marten");
    // opts.Connection(martenConnectionString);

    opts.Connection(Servers.PostgresConnectionString);
    opts.DatabaseSchemaName = "issues";

    // Just letting Marten know there's a document type
    // so we can see the tables and functions created on startup
    opts.RegisterDocumentType<Issue>();

    // I'm putting the inbox/outbox tables into a separate "issue_service" schema
}).IntegrateWithJasper("issue_service");

var app = builder.Build();

app.MapGet("/", () => "Hello World!");

// Actually important to return the exit code here!
return await app.RunOaktonCommands(args);

Just to describe what’s going on, the .NET code above is going to depend on:

  1. A Postgresql database with the necessary tables and functions that Marten needs to be able to persist issue data
  2. Additional tables in the Postgresql database for persisting the outgoing and incoming messages in the inbox/outbox usage
  3. A Rabbit MQ broker with the necessary exchanges, queues, and bindings for the issue application as it’s configured

In a perfect world, scratch that, in an acceptable world, a developer should be able to start from a fresh clone of this issue tracking codebase and be able to run the system and/or any integration tests locally almost immediately with very minimal friction along the way.

At this point, I’m a big fan of trying to run development infrastructure in Docker where it’s easy to spin things up on demand, and just as quickly shut it all down when you no longer need it. To that end, let’s just say we’ve got a docker-compose.yml file for both Postgresql and Rabbit MQ. Having that, I’ll type docker compose up -d from the command line to spin up both infrastructure elements.
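
For reference, that docker-compose.yml might look something like this minimal sketch (the image tags, ports, and credentials are just my assumptions):

version: '3'
services:
  postgresql:
    image: postgres:14
    ports:
      - "5433:5432"
    environment:
      POSTGRES_PASSWORD: postgres
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"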

Cool, but now I need to have the database schemas built out with Marten tables and the Jasper inbox/outbox tables plus the Rabbit MQ queues for the application. This is where Oakton and its new “stateful resource” model comes into play. Jasper’s Rabbit MQ plugin and the inbox/outbox storage both expose Oakton’s IStatefulResource interface for easy setup. Likewise, Marten has support for this model as well (in this case it’s just a very slim wrapper around Marten’s longstanding database schema management functionality).

If you’re not familiar with this, the double dash “--” argument in dotnet run helps .NET know which arguments (“run”) apply to the dotnet executable itself, while the arguments to the right of the “--” are passed into the application.

Opening up the command line terminal of your preference to the root of the project, I type dotnet run -- help to see what options are available in our Jasper application through the usage of Oakton:

There’s a couple commands up there that will help us out with the database management, but I want to focus on the resources command. To that end, I’m going to type dotnet run -- resources list just to see what resources our issue tracker application has:

Just through the configuration up above, the various Jasper elements have registered “stateful resource” adapters with Oakton for the underlying Marten database, the inbox/outbox storage (Envelope Storage above), and Rabbit MQ.

In the next case, I’m going to use dotnet run -- resources check to see if all our infrastructure is configured the way our application needs — and I’m going to do this without first starting the database or the message broker, so this should fail spectacularly!

Here’s the summary output:

If you were to scroll up a bit, you’d see a lot of exceptions thrown describing what’s wrong (helpfully color coded by Spectre.Console) including this one explaining that an expected Rabbit MQ queue is missing:

So that’s not good. No worries though, I’ll start up the docker containers, then go back to the command line and type:

dotnet run -- resources setup

And here’s some of the output:

Forget the command line…

If you’ll notice the single line of code `builder.Services.AddResourceSetupOnStartup();` in the bootstrapping code above, that’s adding a hosted service from Oakton to our application that will verify and apply all configured setup to the known Marten database, the inbox/outbox storage, and the required Rabbit MQ objects. No command line chicanery necessary. I’m hopeful that this will enable developers to be more productive by dealing with this kind of environmental setup directly inside the application itself rather than recreating the definition of what’s necessary in external scripts.

This was a fair amount of work, so I’d very much welcome any kind of feedback here.

Using Rabbit MQ with Jasper

As a follow up today, I’d like to introduce Rabbit MQ integration with Jasper as the actual transport between processes. I’m again going to use a “Ping/Pong” sample of sending messages between two processes (you may want to refer to my previous ping/pong post). You can find the sample code for this post on GitHub.

Sending messages betwixt processes

The message types and Jasper handlers are basically identical to those in my last post if you want a reference. In the Pinger application, I’ve added a reference to the Jasper.RabbitMQ Nuget library, which adds transitive references to Jasper itself and the Rabbit MQ client library. In the application bootstrapping, I’ve got this to connect to Rabbit MQ, send Ping messages to a Rabbit MQ exchange named “pings”, and add a hosted service just to send a new Ping message once a second:

using System.Net.NetworkInformation;
using Jasper;
using Jasper.RabbitMQ;
using Oakton;
using Pinger;

return await Host.CreateDefaultBuilder(args)
    .UseJasper(opts =>
    {
        // Listen for messages coming into the pongs queue
        opts
            .ListenToRabbitQueue("pongs")

            // This won't be necessary by the time Jasper goes 2.0
            // but for now, I've got to help Jasper out a little bit
            .UseForReplies();

        // Publish messages to the pings queue
        opts.PublishMessage<Ping>().ToRabbitExchange("pings");

        // Configure Rabbit MQ connection properties programmatically
        // against a ConnectionFactory
        opts.UseRabbitMq(rabbit =>
        {
            // Using a local installation of Rabbit MQ
            // via a running Docker image
            rabbit.HostName = "localhost";
        })
            // Directs Jasper to build any declared queues, exchanges, or
            // bindings with the Rabbit MQ broker as part of bootstrapping time
            .AutoProvision();

        // This will send ping messages on a continuous
        // loop
        opts.Services.AddHostedService<PingerService>();
    }).RunOaktonCommands(args);

On the Ponger side, I’ve got this setup:

using Baseline.Dates;
using Jasper;
using Jasper.RabbitMQ;
using Oakton;

return await Host.CreateDefaultBuilder(args)
    .UseJasper(opts =>
    {
        // Going to listen to a queue named "pings", but disregard any messages older than
        // 15 seconds
        opts.ListenToRabbitQueue("pings", queue => queue.TimeToLive(15.Seconds()));

        // Configure Rabbit MQ connections and optionally declare Rabbit MQ
        // objects through an extension method on JasperOptions.Endpoints
        opts.UseRabbitMq() // This is short hand to connect locally
            .DeclareExchange("pings", exchange =>
            {
                // Also declares the queue too
                exchange.BindQueue("pings");
            })
            .AutoProvision()

            // Option to blow away existing messages in
            // all queues on application startup
            .AutoPurgeOnStartup();
    })
    .RunOaktonCommands(args);

When running the applications side by side, I’ll get output like this from Pinger:

Got pong #55
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PongMessage#01818741-30c2-4ab7-9fa8-d7870d194754 from rabbitmq://queue/pings
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://exchange/pings has resumed
Got pong #56
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PongMessage#01818741-34b3-4d34-910a-c9fc4c191bfc from rabbitmq://queue/pings
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://exchange/pings has resumed

and this from Ponger:

Got ping #57
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PingMessage#01818741-389e-4ddd-96cf-fb8a76e4f5f1 from rabbitmq://queue/pongs
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://queue/pongs has resumed
Got ping #58
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message PingMessage#01818741-3c8e-469f-a330-e8c9e173dc40 from rabbitmq://queue/pongs
info: Jasper.Runtime.JasperRuntime[204]
      Sending agent for rabbitmq://queue/pongs has resumed

and of course, since Jasper is a work in progress, there’s a bunch of erroneous or at least misleading messages about a sending agent getting resumed that I need to take care of…

So what’s in place right now that can be inferred from the code samples above? The Jasper + Rabbit MQ integration today provides:

  • A way to subscribe to incoming messages from a Rabbit MQ queue. It’s not shown here, but you have all the options you’d expect to configure Rabbit MQ prefetch counts, message handling parallelism, and opt into persistent, inbox messaging
  • Mechanisms to declare Rabbit MQ queues, exchanges, and bindings as well as the ability to fine tune Rabbit MQ options for these objects
  • Create declared Rabbit MQ objects at system startup time
  • Automatically purge old messages out of Rabbit MQ queues on start up. It’s not shown here, but you can do this on a queue by queue basis as well
  • Create publishing rules from a Jasper process to direct outgoing messages to Rabbit MQ exchanges or directly to queues

What’s also available but not shown here is opt in, conventional routing to route outgoing messages to Rabbit MQ exchanges based on the message type names or to automatically declare and configure subscriptions to Rabbit MQ queues based on the message types of the messages that this process handles. This conventional routing is suspiciously similar to (read: copied from) MassTransit because:

  1. I like MassTransit’s conventional routing functionality
  2. Bi-directional wire compatibility mostly out of the box with MassTransit is a first class goal for Jasper 2.0

Why Rabbit MQ? What about other things?

Jasper has focused on Rabbit MQ as the main transport option out of the box because that’s what my shop already uses (and I’m most definitely trying to get us to use Jasper at work), it has a great local development option through Docker hosting, and frankly because it’s easy to use. Jasper also supports Pulsar of all things, and will definitely have a Kafka integration before 2.0. Other transports will be added based on user requests, and that’s also a good place for other folks to get involved.

I had someone asking about using Jasper with Amazon SQS, and that’s such a simple model that I might build it out as a reference for other folks.

What’s up next?

Now that I’ve introduced Jasper, its outbox functionality, and its integration with Rabbit MQ, my next post is visiting the utilities baked into Jasper itself for dealing with stateful resources like databases or broker configuration at development and production time. This is going to include a lot of command line functionality baked into your application.

Ping/Pong Jasper Style

For this post though, I just want to show off a very small sample “Ping/Pong” example of using Jasper to do messaging between different .NET processes. All of the code is drawn from a brand new sample on GitHub. I’m just using Jasper’s little built in TCP transport that’s probably just going to be useful for local development, but it happily serves for this sample.

First off, I’m going to build out a very small shared library just to hold the messages we’re going to exchange:

public class Ping
{
    public int Number { get; set; }
}

public class Pong
{
    public int Number { get; set; }
}

And next, I’ll start a small Pinger service with the dotnet new worker template. There are just three pieces of code, starting with the bootstrapping code:

using Jasper;
using Jasper.Transports.Tcp;
using Messages;
using Oakton;
using Pinger;

return await Host.CreateDefaultBuilder(args)
    .UseJasper(opts =>
    {
        // Using Jasper's built in TCP transport

        // listen to incoming messages at port 5580
        opts.ListenAtPort(5580);

        // route all Ping messages to port 5581
        opts.PublishMessage<Ping>().ToPort(5581);

        // Registering the hosted service here, but could do
        // that with a separate call to IHostBuilder.ConfigureServices()
        opts.Services.AddHostedService<Worker>();
    })
    .RunOaktonCommands(args);

and the Worker class that’s just going to publish a new Ping message once a second:

using Jasper;
using Messages;

namespace Pinger;

public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly IMessagePublisher _publisher;

    public Worker(ILogger<Worker> logger, IMessagePublisher publisher)
    {
        _logger = logger;
        _publisher = publisher;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var pingNumber = 1;

        while (!stoppingToken.IsCancellationRequested)
        {

            await Task.Delay(1000, stoppingToken);
            _logger.LogInformation("Sending Ping #{Number}", pingNumber);
            await _publisher.PublishAsync(new Ping { Number = pingNumber });
            pingNumber++;
        }
    }
}

and lastly a message handler for any Pong messages coming back from the Ponger we’ll build next:

using Messages;

namespace Pinger;

public class PongHandler
{
    public void Handle(Pong pong, ILogger<PongHandler> logger)
    {
        logger.LogInformation("Received Pong #{Number}", pong.Number);
    }
}

Okay then, next let’s move on to building the Ponger application. This time I’ll use dotnet new console to start the new project, then add references to our Messages library and Jasper itself. For the bootstrapping, add this code:

using Jasper;
using Jasper.Transports.Tcp;
using Microsoft.Extensions.Hosting;
using Oakton;

return await Host.CreateDefaultBuilder(args)
    .UseJasper(opts =>
    {
        // Using Jasper's built in TCP transport
        opts.ListenAtPort(5581);
    })
    .RunOaktonCommands(args);

And a message handler for the Ping messages that will turn right around and shoot a Pong response right back to the original sender:

using Jasper;
using Messages;
using Microsoft.Extensions.Logging;

namespace Ponger;

public class PingHandler
{
    public ValueTask Handle(Ping ping, ILogger<PingHandler> logger, IExecutionContext context)
    {
        logger.LogInformation("Got Ping #{Number}", ping.Number);
        return context.RespondToSenderAsync(new Pong { Number = ping.Number });
    }
}

If I start up first the Ponger service, then the Pinger service, I’ll see console output like this from Pinger:

info: Pinger.Worker[0]
      Sending Ping #11
info: Pinger.PongHandler[0]
      Received Pong #1
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message Pong#01817277-f692-42d5-a3e4-35d9b7d119fb from tcp://localhost:5581/
info: Pinger.PongHandler[0]
      Received Pong #2
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message Pong#01817277-f699-4340-a59d-9616aee61cb8 from tcp://localhost:5581/
info: Pinger.PongHandler[0]
      Received Pong #3
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message Pong#01817277-f699-48ea-988b-9e835bc53020 from tcp://localhost:5581/
info: Pinger.PongHandler[0]

and output like this in the Ponger process:

info: Ponger.PingHandler[0]
      Got Ping #1
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message Ping#01817277-d673-4357-84e3-834c36f3446c from tcp://localhost:5580/
info: Ponger.PingHandler[0]
      Got Ping #2
info: Jasper.Runtime.JasperRuntime[104]
      Successfully processed message Ping#01817277-da61-4c9d-b381-6cda92038d41 from tcp://localhost:5580/
info: Ponger.PingHandler[0]
      Got Ping #3

Building a More Useful Outbox for Reliable Messaging

I blog not at all, or a lot at once. Nothing in between. In the past couple weeks I’ve written quite a few related posts on both Marten and Jasper.

As a follow up to those posts, Jasper comes with a robust transactional inbox/outbox implementation that partially runs as a background service using .NET’s hosted service mechanism. Jasper’s version of this functionality differs quite a bit in implementation from other toolsets in the .NET space. Here’s a rundown so far:

  • When used, Jasper is persisting the outgoing message, metadata, and the intended destination in the hosting service’s application database (or at least that’s the way it’s intended to work).
  • A background service is running in every node to ensure that these outgoing messages don’t get marooned or fail to be sent out
  • Pending messages do not get “lost” if the process dies or is shut down unexpectedly
  • Pending messages are resumed if a process is just restarted
  • If running in a cluster, the other nodes can detect that another node is no longer running, and take on the persisted, outbound messages from that original node
  • So far, the outbox has support for either Postgresql or Sql Server as the backing database.
  • Not surprisingly, there’s much deeper integration with Marten, but there’s also support for using the outbox with EF Core
  • Jasper’s outbox is actually usable completely outside of Jasper command handlers. You might have to write a little more explicit code to use it within ASP.Net Core controllers or Minimal API methods for example, but it’s perfectly usable in those scenarios. This is unique to Jasper and currently unsupported in the most popular service bus tools in .NET at the time I’m writing this post.
  • Jasper comes with some database schema management baked in for the necessary database objects needed to integrate the outbox to make things easier to get up and going.
  • There’s also some command line tooling baked into Jasper that will hopefully be helpful at development or deployment time to manage the persisted message storage

Now, on to some sample code!

To reuse a code sample from earlier this week, let’s say we’re building a new web service for potential customers to make online reservations for their favorite restaurants. We’re also going to be using Jasper as a command and/or message bus (and Marten as the backing persistence infrastructure), so we might have a command handler for confirming a reservation like this simple example:

public static class ConfirmReservationHandler
{
    [Transactional]
    public static async Task<ReservationConfirmed> Handle(ConfirmReservation command, IDocumentSession session)
    {
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        session.Store(reservation);

        // Kicking out a "cascaded" message
        return new ReservationConfirmed(reservation.Id);
    }
}

As I described in an earlier post, that handler — in conjunction with some Jasper middleware brought in by the [Transactional] attribute — is making database changes and publishing a corresponding message about those very changes. Those two actions are an atomic unit of logical work. They need to either both succeed, or not happen at all. We also need the message to only go out after the database changes succeed. Otherwise, there’s a possible race condition between the database changes being committed and the ReservationConfirmed message being processed in another thread with incorrect or inconsistent system data. And take my word for it on that last point, because I’ve seen that happen and directly cause production bugs.

Let’s quickly rule out an old fashioned two phase commit because that brings a lot of headaches I could do without for the rest of my career. As I mentioned in some of my earlier posts, Jasper comes out of the box with a transactional outbox built in for specifically the scenario shown in the handler above. Jasper has an add on Nuget library called Jasper.Persistence.Marten that adds some helpful integration between Jasper’s built in inbox/outbox “durability agent” and Marten. The setup code for that integration is just this:

// Normal Marten integration
builder.Services.AddMarten(opts =>
{
    opts.Connection("Host=localhost;Port=5433;Database=postgres;Username=postgres;password=postgres");
})
    // NEW! Adding Jasper outbox integration to Marten in the "messages"
    // database schema
    .IntegrateWithJasper("messages");

and Jasper’s own integration setup:

builder.Host.UseJasper(opts =>
{
    // more Jasper configuration for error handling etc.
});

To add some context, here’s the actual code (with some reformatting and annotations from me) that Jasper builds up at runtime around our command handler:

public class ConfirmReservationHandler615381178 : MessageHandler
{
    private readonly OutboxedSessionFactory _outboxedSessionFactory;

    public ConfirmReservationHandler615381178(OutboxedSessionFactory outboxedSessionFactory)
    {
        _outboxedSessionFactory = outboxedSessionFactory;
    }

    public override async Task HandleAsync(IExecutionContext context, CancellationToken cancellation)
    {
        var confirmReservation = (ConfirmReservation)context.Envelope.Message;

        // This is creating a Marten session that's hooked into the Jasper outbox for
        // the incoming ConfirmReservation message being processed here
        await using var documentSession = _outboxedSessionFactory.OpenSession(context);

        var reservationConfirmed = await ConfirmReservationHandler.Handle(confirmReservation, documentSession)
            .ConfigureAwait(false);
        // Outgoing, cascaded message
        await context.EnqueueCascadingAsync(reservationConfirmed).ConfigureAwait(false);

        // Commit the unit of work *and* start sending the ReservationConfirmed message
        await documentSession.SaveChangesAsync(cancellation).ConfigureAwait(false);
    }
}

There’s a lot going on up above behind the scenes, but the key elements of the message handler workflow code with the outbox above are:

  • The outgoing message will not actually be sent to its destination (a Rabbit MQ exchange? a local queue? a Pulsar topic?) until the database changes succeed. That’s true for messages directly sent to Jasper’s IMessagePublisher or IExecutionContext services within a command handler method as well (see the sketch just after this list).
  • The outgoing messages and all necessary metadata (including the destination) are persisted to a database table inside the exact same database transaction as the changes to the persistent model in the handler above. In the case of the Marten integration, the messages are persisted in the same database command as the Reservation document changes as a way to improve performance by reducing network round trips to the database server.
  • After the transaction succeeds, the previously registered Jasper outbox is notified by a Marten session listener to start publishing the outgoing “cascaded” messages in Jasper’s background sending queues which span across command executions.
  • If the transaction fails, no messages will be sent out

A Vision for Low Ceremony CQRS with Event Sourcing

Let me just put this stake into the ground. The combination of Marten and Jasper will quickly become the most productive and lowest ceremony tooling for event sourcing and CQRS architectures on the .NET stack.

And for any George Strait fans out there, this song may be relevant to the previous paragraph:)

CQRS and Event Sourcing

Just to define some terminology, the Command Query Responsibility Segregation (CQRS) pattern was first described by Greg Young as an approach to software architecture where you very consciously separate “writes” that change the state of the system from “reads” against the system state. The key to CQRS is to use separate models of the system state for writing versus querying. That leads to something I think of as the “scary view of CQRS”:

The “scary” view of CQRS

The “query database” in my diagram is meant to be an optimized storage for the queries needed by the system’s clients. The “domain model storage” is some kind of persisted storage that’s optimized for writes — both capturing new data and presenting exactly the current system state that the command handlers would need in order to process or validate incoming commands.

Many folks have a visceral, negative reaction to this kind of CQRS diagram, as it does appear to be more complexity than the more traditional approach where you have a single database model — almost inevitably in a relational database of some kind — and both reads and writes work off the same set of database tables. Hell, I walked out of an early presentation by Greg Young at QCon 2008 about what later became known as CQRS shaking my head, thinking this was all nuts.

Here’s the deal though: when we’re writing systems against the one true database model, we’re potentially spending a lot of time mapping incoming messages to the stored model, and probably even more time transforming the raw database model into a shape that’s appropriate for our system’s clients in a way that also creates some decoupling between clients and the ugly, raw details of our underlying database. The “scary” CQRS architecture arguably just brings that sometimes hidden work and complexity into the light of day.

Now let’s move on to Event Sourcing. Event sourcing is a style of system persistence where each state change is captured and stored as an explicit event object. There’s all sorts of value in keeping the raw change events around for later, but you do of course need to compile the events into derived, or “projected,” models that represent the current system state. When combining event sourcing with a CQRS architecture, some projected models built from the raw events can serve as the “write model” used while processing incoming commands, while others can be built as the “query model” that clients will use to query our system.
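
As a quick, hand-rolled illustration of that last point, here's what folding raw events into a projected "current state" model might look like. The Order domain below is entirely made up for this example:

// Hypothetical events capturing each state change to an order
public record OrderPlaced(Guid OrderId, string CustomerName);
public record ItemAdded(Guid OrderId, string Sku, decimal Price);
public record OrderShipped(Guid OrderId);

// A "projected" model compiled from the raw events. Conventional
// Apply() methods like these are how Marten's aggregation projections
// can fold events into a document
public class OrderStatus
{
    public Guid Id { get; set; }
    public string CustomerName { get; set; }
    public decimal Total { get; set; }
    public bool Shipped { get; set; }

    public void Apply(OrderPlaced e) { Id = e.OrderId; CustomerName = e.CustomerName; }
    public void Apply(ItemAdded e) => Total += e.Price;
    public void Apply(OrderShipped e) => Shipped = true;
}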

Before closing out this section, I want to be clear that event sourcing and CQRS can be used independently of each other, but to paraphrase Forrest Gump, event sourcing and CQRS go together like peas and carrots.

Now, the event sourcing side of this especially might sound a little scary if you’re not familiar with some of the existing tooling, so let’s move on first to Marten…

Marten already has a mature feature set for event sourcing, and we’ve grown quite a bit in our support for projections (read-only views compiled from the raw events). If you’ll peek back at the “scary” view of CQRS above, Marten’s projection support solves the problem of keeping the raw events synchronized with both any kind of “write model” for incoming commands and the richer “query model” views for system clients within a single database. In my opinion, Marten takes the “scary” out of event sourcing, and we’re on our way to being the best possible “event sourcing in a box” solution for .NET.
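
As a sketch of how that can look with the hypothetical OrderStatus aggregate from above (assuming Marten v4/v5-style projection APIs), you can register a self-aggregating projection to run inline with event capture:

builder.Services.AddMarten(opts =>
{
    opts.Connection("your connection string here");

    // Keep the projected OrderStatus document up to date in the very
    // same transaction as the event capture itself
    opts.Projections.SelfAggregate<OrderStatus>(ProjectionLifecycle.Inline);
});

and later, when capturing events against a stream:

var orderId = Guid.NewGuid();
session.Events.StartStream<OrderStatus>(orderId, new OrderPlaced(orderId, "Austin"));
await session.SaveChangesAsync();

// With an Inline projection, the compiled view is immediately queryable
var status = await session.LoadAsync<OrderStatus>(orderId);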

As that support has gotten more capable and robust though, our users have frequently been asking about how to subscribe to events being captured in Marten and how to relay those events reliably to some other kind of infrastructure — which could be a completely separate database, outgoing message queues, or some other kind of event streaming scenario.

Oskar Dudycz and I have been discussing how to solve this in Marten, and we’ve come up with these three main mechanisms for supporting subscriptions to event data:

  1. Some or all of the events being captured by Marten should be forwarded automatically at the time of event capture (IDocumentSession.SaveChangesAsync())
  2. When strict ordering is required, we’ll need some kind of integration into Marten’s async daemon to relay events to external infrastructure
  3. For massive amounts of data with ambitious performance targets, use something like Debezium to directly stream Marten events from Postgresql to Kafka/Pulsar/etc.

Especially for the first item in that list above, we need some kind of outbox integration with Marten sessions to reliably relay events from Marten to outgoing transports while keeping the system in a consistent state (i.e., don’t publish the events if the transaction fails).

Fortunately, there’s a functional outbox implementation for Marten in…

To be clear, Jasper as a project was pretty well mothballed by the combination of COVID and the massive slog toward Marten 4.0 (and then a smaller slog to 5.0). However, I’ve been able to get back to Jasper, and yesterday I kicked out a new Jasper 2.0.0-alpha-2 release along with a companion post, (Re) Introducing Jasper as a Command Bus.

For this post though, I want to explore the potential of the Jasper + Marten combination for CQRS architectures. To that end, a couple weeks ago I published Marten just got better for CQRS architectures, which showed some new APIs in Marten that simplify repetitive code around using Marten event sourcing within CQRS architectures. Part of the sample code in that post was this MVC Core controller method that used some newer Marten functionality to handle an incoming command:

public async Task CompleteCharting(
    [FromBody] CompleteCharting charting, 
    [FromServices] IDocumentSession session)
{
    var stream = await session
        .Events.FetchForExclusiveWriting<ProviderShift>(charting.ShiftId);
 
    // Validation on the ProviderShift aggregate
    if (stream.Aggregate.Status != ProviderStatus.Charting)
    {
        throw new Exception("The shift is not currently charting");
    }
     
    // We "decided" to emit one new event
    stream.AppendOne(new ChartingFinished(stream.Aggregate.AppointmentId.Value, stream.Aggregate.BoardId));
 
    await session.SaveChangesAsync();
}

To review, that controller method:

  1. Takes in a command message of type CompleteCharting
  2. Loads the current state of the aggregate ProviderShift model referred to by the incoming command, and does so in a way that takes care of concurrency for us by waiting to get an exclusive lock on the particular ProviderShift
  3. Assuming that the validation against the ProviderShift succeeds, emits a new ChartingFinished event
  4. Saves the pending work with a database transaction

In that post, I pointed out that there were some potential flaws or missing functionality with this approach:

  1. We probably want some error handling to retry the operation if we hit concurrency exceptions or time out trying to get the exclusive lock. In other words, we have to plan for concurrency exceptions
  2. It’d be good to be able to automatically publish the new ChartingFinished event to a queue to take further action within our system (or an external service if we were using messaging here)
  3. Lastly, I’d argue there’s some repetitive code up there that could be simplified

To address these points, I’m going to introduce Jasper and its integration with Marten (Jasper.Persistence.Marten) to the telehealth portal sample from my previous blog post.

I’m going to move the actual handling of the CompleteCharting command to the Jasper handler shown below, which is functionally equivalent to the controller method shown earlier (except that I switched the concurrency protection to being optimistic):

// This is auto-discovered by Jasper
public class CompleteChartingHandler
{
    [MartenCommandWorkflow] // this opts into some Jasper middleware
    public ChartingFinished Handle(CompleteCharting charting, ProviderShift shift)
    {
        if (shift.Status != ProviderStatus.Charting)
        {
            throw new Exception("The shift is not currently charting");
        }

        return new ChartingFinished(charting.AppointmentId, shift.BoardId);
    }
}

And the controller method gets simplified down to just relaying the command to Jasper:

public Task CompleteCharting(
    [FromBody] CompleteCharting charting, 
    [FromServices] ICommandBus bus)
{
    // Just delegating to Jasper here
    return bus.InvokeAsync(charting, HttpContext.RequestAborted);
}

There’s some opportunity to make the code above a little less repetitive and more efficient, maybe by riding on Minimal APIs as in the sketch below, but that’s for a later date :)
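
For the curious, a hypothetical Minimal API version of that delegation (the route here is made up) could be as simple as:

app.MapPost("/shift/charting/complete",
    (CompleteCharting charting, ICommandBus bus, CancellationToken cancellation)
        => bus.InvokeAsync(charting, cancellation));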

By using the new [MartenCommandWorkflow] attribute, we’re directing Jasper to surround the command handler with middleware that handles much of the Marten mechanics by:

  1. Loading the aggregate ProviderShift for the incoming CompleteCharting command (I’m omitting some details here for brevity, but there’s a naming convention that can be explicitly overridden to pluck the aggregate identity off the incoming command)
  2. Passing that ProviderShift aggregate into the Handle() method above
  3. Applying the returned event to the event stream for the ProviderShift
  4. Committing the outstanding changes in the active Marten session

The Handle() code above becomes an example of a Decider function. Better yet, it’s completely decoupled from any kind of infrastructure and is fully synchronous. I’m going to argue that this approach makes command handlers much easier to unit test, definitely easier to write, and easier to read later, just because you’re only focused on the business logic.
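
To back up that claim about testability, here's a minimal unit test sketch using xUnit. I'm assuming ProviderShift and CompleteCharting expose settable members like the ones shown; the real shapes live in the sample project, so treat the setup as illustrative:

using System;
using Xunit;

public class CompleteChartingHandlerTests
{
    [Fact]
    public void emits_charting_finished_when_the_shift_is_charting()
    {
        var appointmentId = Guid.NewGuid();
        var boardId = Guid.NewGuid();

        // Assumed member shapes, purely for illustration
        var shift = new ProviderShift { Status = ProviderStatus.Charting, BoardId = boardId };
        var command = new CompleteCharting { AppointmentId = appointmentId };

        // Pure function: no mocks, no database, no async plumbing
        var @event = new CompleteChartingHandler().Handle(command, shift);

        Assert.Equal(appointmentId, @event.AppointmentId);
        Assert.Equal(boardId, @event.BoardId);
    }
}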

So that covers the repetitive code problem, but let’s move on to automatically publishing the ChartingFinished event and adding some error handling. I’m going to add Jasper through the application’s bootstrapping code as shown below:

builder.Host.UseJasper(opts =>
{
    // I'm choosing to process any ChartingFinished event messages
    // in a separate, local queue with persistent messages for the inbox/outbox
    opts.PublishMessage<ChartingFinished>()
        .ToLocalQueue("charting")
        .DurablyPersistedLocally();
    
    // If we encounter a concurrency exception, just try it immediately 
    // up to 3 times total
    opts.Handlers.OnException<ConcurrencyException>().RetryNow(3); 
    
    // It's an imperfect world, and sometimes transient connectivity errors
    // to the database happen
    opts.Handlers.OnException<NpgsqlException>()
        .RetryWithCooldown(50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds());
});

Jasper comes with some pretty strong exception handling policy capabilities that you’ll need for grown-up development. In this case, I’m just setting up some global policies to retry message failures on either Marten concurrency exceptions or the inevitable, transient Postgresql connectivity hiccups. In the case of a concurrency exception, you may just need to start the work over to ensure you’re working from the most recent aggregate changes. I used globally applied policies here, but Jasper also allows you to override the error handling on a message type by message type basis.

Lastly, let’s add the Jasper outbox integration for Marten and opt into automatic event publishing with this bit of configuration chained to the standard AddMarten() usage:

builder.Services.AddMarten(opts =>
{
    // Marten configuration...
})
    // I added this to enroll Marten in the Jasper outbox
    .IntegrateWithJasper()
    
    // I also added this to opt into events being forwarded to
    // the Jasper outbox during SaveChangesAsync()
    .EventForwardingToJasper();

And that’s actually that. The configuration above will add the Jasper outbox tables to the Marten database for us, and let Marten’s database schema management manage those extra database objects.

Back to the command handler (mildly elided):

public class CompleteChartingHandler
{
    [MartenCommandWorkflow] 
    public ChartingFinished Handle(CompleteCharting charting, ProviderShift shift)
    {
        // validation code goes here!

        return new ChartingFinished(charting.AppointmentId, shift.BoardId);
    }
}

By opting into the outbox integration and the event forwarding from Marten to Jasper, when this command handler is executed, the new ChartingFinished event will be published through Jasper’s outbox — in this case just to an in-memory local queue, but it could also go out to an external transport. The outbox guarantees that the message will be delivered at least once, as long as the database transaction that saves the new event succeeds.
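
To close the loop, consuming that ChartingFinished message from the local queue is just another conventional Jasper handler. The downstream handler below is hypothetical:

// A hypothetical downstream handler for the local "charting" queue,
// discovered by the same Jasper handler naming conventions
public class ChartingFinishedHandler
{
    public void Handle(ChartingFinished message)
    {
        // Kick off whatever should happen after charting completes,
        // e.g. updating a dashboard or notifying the front desk
        Console.WriteLine($"Charting finished for appointment {message.AppointmentId}");
    }
}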

Conclusion and What’s Next?

There’s a tremendous amount of work in the rearview mirror to get to the functionality that I demonstrated here, and a substantial amount of ambition to drive this forward in the future. I would love any possible feedback, both positive and negative. Marten is a team effort, but Jasper’s been mostly my baby for the past 3-4 years, and I’d be happy for anybody who would want to get involved with that. I’m way behind on documentation for Jasper, and somewhat for Marten, but that’s in flight.

My next couple posts to follow up on this are to:

  • Do a deeper dive into Jasper’s outbox and explain why it’s different and arguably more useful than the outbox implementations in other leading .NET tools
  • Introduce the usage of Rabbit MQ with Jasper for external messaging
  • Take a detour into the development- and deployment-time command line utilities built into Jasper & Marten through Oakton