Critter Stack Improvements for Event Driven Architecture

JasperFx Software is open for business and offering consulting services (like helping you craft modular monolith strategies!) and support contracts for both Marten and Wolverine so you know you can feel secure taking a big technical bet on these tools and reap all the advantages they give for productive and maintainable server side .NET development.

As a follow on post from First Class Event Subscriptions in Marten last week, let’s introduce Wolverine into the mix for end to end Event Driven Architecture approaches. Using Wolverine’s new Event Subscriptions model, “Critter Stack” systems can automatically process Marten event data with Wolverine message handlers:

If all we want to do is publish Marten event data through Wolverine’s message publishing (which remember, can be either to local queues or external message brokers), we have this simple recipe:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.Services
            .AddMarten()
            
            // Just pulling the connection information from 
            // the IoC container at runtime.
            .UseNpgsqlDataSource()
            
            // You don't absolutely have to have the Wolverine
            // integration active here for subscriptions, but it's
            // more than likely that you will want this anyway
            .IntegrateWithWolverine()
            
            // The Marten async daemon most be active
            .AddAsyncDaemon(DaemonMode.HotCold)
            
            // This would attempt to publish every non-archived event
            // from Marten to Wolverine subscribers
            .PublishEventsToWolverine("Everything")
            
            // You wouldn't do this *and* the above option, but just to show
            // the filtering
            .PublishEventsToWolverine("Orders", relay =>
            {
                // Filtering 
                relay.FilterIncomingEventsOnStreamType(typeof(Order));

                // Optionally, tell Marten to only subscribe to new
                // events whenever this subscription is first activated
                relay.Options.SubscribeFromPresent();
            });
    }).StartAsync();

First off, what’s a “subscriber?” That would mean any event that Wolverine recognizes as having:

  • A local message handler in the application for the specific event type, which would effectively direct Wolverine to publish the event data to a local queue
  • A local message handler in the application for the specific IEvent<T> type, which would effectively direct Wolverine to publish the event with its IEvent Marten metadata wrapper to a local queue
  • Any event type where Wolverine can discover subscribers through routing rules

All the Wolverine subscription is doing is effectively calling IMessageBus.PublishAsync() against the event data or the IEvent<T> wrapper. You can make the subscription run more efficiently by applying event or stream type filters for the subscription.

If you need to do a transformation of the raw IEvent<T> or the internal event type to some kind of external event type for publishing to external systems when you want to avoid directly coupling other subscribers to your system’s internals, you can accomplish that by just building a message handler that does the transformation and publishes a cascading message like so:

public record OrderCreated(string OrderNumber, Guid CustomerId);

// I wouldn't use this kind of suffix in real life, but it helps
// document *what* this is for the sample in the docs:)
public record OrderCreatedIntegrationEvent(string OrderNumber, string CustomerName, DateTimeOffset Timestamp);

// We're going to use the Marten IEvent metadata and some other Marten reference
// data to transform the internal OrderCreated event
// to an OrderCreatedIntegrationEvent that will be more appropriate for publishing to
// external systems
public static class InternalOrderCreatedHandler
{
    public static Task<Customer?> LoadAsync(IEvent<OrderCreated> e, IQuerySession session,
        CancellationToken cancellationToken)
        => session.LoadAsync<Customer>(e.Data.CustomerId, cancellationToken);
    
    
    public static OrderCreatedIntegrationEvent Handle(IEvent<OrderCreated> e, Customer customer)
    {
        return new OrderCreatedIntegrationEvent(e.Data.OrderNumber, customer.Name, e.Timestamp);
    }
}

Process Events as Messages in Strict Order

In some cases you may want the events to be executed by Wolverine message handlers in strict order. With the recipe below:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.Services
            .AddMarten(o =>
            {
                // This is the default setting, but just showing
                // you that Wolverine subscriptions will be able
                // to skip over messages that fail without
                // shutting down the subscription
                o.Projections.Errors.SkipApplyErrors = true;
            })

            // Just pulling the connection information from 
            // the IoC container at runtime.
            .UseNpgsqlDataSource()

            // You don't absolutely have to have the Wolverine
            // integration active here for subscriptions, but it's
            // more than likely that you will want this anyway
            .IntegrateWithWolverine()
            
            // The Marten async daemon most be active
            .AddAsyncDaemon(DaemonMode.HotCold)
            
            // Notice the allow list filtering of event types and the possibility of overriding
            // the starting point for this subscription at runtime
            .ProcessEventsWithWolverineHandlersInStrictOrder("Orders", o =>
            {
                // It's more important to create an allow list of event types that can be processed
                o.IncludeType<OrderCreated>();

                // Optionally mark the subscription as only starting from events from a certain time
                o.Options.SubscribeFromTime(new DateTimeOffset(new DateTime(2023, 12, 1)));
            });
    }).StartAsync();

In this recipe, Marten & Wolverine are working together to call IMessageBus.InvokeAsync() on each event in order. You can use both the actual event type (OrderCreated) or the wrapped Marten event type (IEvent<OrderCreated>) as the message type for your message handler.

In the case of exceptions from processing the event with Wolverine:

  1. Any built in “retry” error handling will kick in to retry the event processing inline
  2. If the retries are exhausted, and the Marten setting for StoreOptions.Projections.Errors.SkipApplyErrors is true, Wolverine will persist the event to its PostgreSQL backed dead letter queue and proceed to the next event. This setting is the default with Marten when the daemon is running continuously in the background, but false in rebuilds or replays
  3. If the retries are exhausted, and SkipApplyErrors = false, Wolverine will tell Marten to pause the subscription at the last event sequence that succeeded

Custom, Batched Subscriptions

The base type for all Wolverine subscriptions is the Wolverine.Marten.Subscriptions.BatchSubscription class. If you need to do something completely custom, or just to take action on a batch of events at one time, subclass that type. Here is an example usage where I’m using event carried state transfer to publish batches of reference data about customers being activated or deactivated within our system:

public record CompanyActivated(string Name);

public record CompanyDeactivated();

public record NewCompany(Guid Id, string Name);

// Message type we're going to publish to external
// systems to keep them up to date on new companies
public class CompanyActivations
{
    public List<NewCompany> Additions { get; set; } = new();
    public List<Guid> Removals { get; set; } = new();

    public void Add(Guid companyId, string name)
    {
        Removals.Remove(companyId);
        
        // Fill is an extension method in JasperFx.Core that adds the 
        // record to a list if the value does not already exist
        Additions.Fill(new NewCompany(companyId, name));
    }

    public void Remove(Guid companyId)
    {
        Removals.Fill(companyId);

        Additions.RemoveAll(x => x.Id == companyId);
    }
}

public class CompanyTransferSubscription : BatchSubscription
{
    public CompanyTransferSubscription() : base("CompanyTransfer")
    {
        IncludeType<CompanyActivated>();
        IncludeType<CompanyDeactivated>();
    }

    public override async Task ProcessEventsAsync(EventRange page, ISubscriptionController controller, IDocumentOperations operations,
        IMessageBus bus, CancellationToken cancellationToken)
    {
        var activations = new CompanyActivations();
        foreach (var e in page.Events)
        {
            switch (e)
            {
                // In all cases, I'm assuming that the Marten stream id is the identifier for a customer
                case IEvent<CompanyActivated> activated:
                    activations.Add(activated.StreamId, activated.Data.Name);
                    break;
                case IEvent<CompanyDeactivated> deactivated:
                    activations.Remove(deactivated.StreamId);
                    break;
            }
        }
        
        // At the end of all of this, publish a single message
        // In case you're wondering, this will opt into Wolverine's
        // transactional outbox with the same transaction as any changes
        // made by Marten's IDocumentOperations passed in, including Marten's
        // own work to track the progression of this subscription
        await bus.PublishAsync(activations);
    }
}

And the related code to register this subscription:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.UseRabbitMq(); 
        
        // There needs to be *some* kind of subscriber for CompanyActivations
        // for this to work at all
        opts.PublishMessage<CompanyActivations>()
            .ToRabbitExchange("activations");
        
        opts.Services
            .AddMarten()

            // Just pulling the connection information from 
            // the IoC container at runtime.
            .UseNpgsqlDataSource()
            
            .IntegrateWithWolverine()
            
            // The Marten async daemon most be active
            .AddAsyncDaemon(DaemonMode.HotCold)

                                
            // Register the new subscription
            .SubscribeToEvents(new CompanyTransferSubscription());
    }).StartAsync();

Summary

The feature set shown here has been a very long planned set of capabilities to truly extend the “Critter Stack” into the realm of supporting Event Driven Architecture approaches from soup to nuts. Using the Wolverine subscriptions automatically gets you support to publish Marten events to any transport supported by Wolverine itself, and does so in a much more robust way than you can easily roll by hand like folks did previously with Marten’s IProjection interface. I’m currently helping a JasperFx Software client utilize this functionality for data exchange that has strict ordering and at least once delivery guarantees.

Marten, PostgreSQL, and .NET Aspire walk into a bar…

This is somewhat a follow up from yesterday’s post on Marten, Metrics, and Open Telemetry Support. I was very hopeful about the defunct Project Tye, and have been curious about .NET Aspire as a more ambitious offering since it was announced. As part of the massive Marten V7 release, we took some steps to ensure that Marten could use PostgreSQL databases controlled by .NET Aspire.

I finally got a chance to put together a sample Marten system named using .NET Aspire called MartenWithProjectAspire on GitHub. Simplified from some longstanding Marten test projects, consider this little system:

At runtime, the EventPublisher service continuously appends events that represent progress in a Trip event stream to the Marten-ized PostgreSQL database. The TripBuildingService is running Marten’s async daemon subsystem that constantly reads in new events to the PostgreSQL database and builds or updates projected documents back to the database to represent the current state of the event store.

The end result was a single project named AspireHost that when executed, will use .NET Aspire to start a new PostgreSQL docker container and start up the EventPublisher and TripBuildingService services while passing the connection string to the new PostgreSQL database to these services at runtime with a little bit of Environment variable sleight of hand.

You can see the various projects and containers from the Aspire dashboard:

and even see some of the Open Telemetry activity traced by Marten and visualized through Aspire:

Honestly, it took me a bit of trial and error to get this all working together. First, we need to configure Marten to use an NpgsqlDataSource connection to the PostgreSQL database that will be loaded from each service’s IoC container — then tell Marten to use that NpgsqlDataSource.

After adding Nuget references for Aspire.Npgsql and Marten itself, I added the second line of code shown below to the top of the Program file for both services using Marten:

var builder = Host.CreateApplicationBuilder();

// Register the NpgsqlDataSource in the IoC container using
// connection string named "marten" from IConfiguration
builder.AddNpgsqlDataSource("marten");

That’s really just a hook to add a registration for the NpgsqlDataSource type to the application’s IoC container with the expectation that the connection string will be in the application’s configuration connection string collection with the key “marten.”

One of the major efforts with Marten 7 was rewiring Marten’s internals (then Wolverine’s) to strictly use the new NpgsqlDataSource concept for database connectivity. If you maybe caught me being less than polite about Npgsql on what’s left of Twitter, just know that the process was very painful but it’s completely done now and working well outside of the absurd noisiness of built in Npgsql logging.

Next, I have to explicitly tell Marten itself to load the NpgsqlDataSource object from the application’s IoC container instead of the older, idiomatic approach of passing a connection string directly to Marten as shown below:

builder.Services.AddMarten(opts =>
    {
        // Other configuration, but *not* the connection
    })
    
    // Use PostgreSQL data source from the IoC container
    .UseNpgsqlDataSource();

Now, switching to the AspireHost, I needed to add a Nuget reference to Aspire.Hosting.PostgreSQL in order to be able to bootstrap the PostgreSQL database at runtime. I also made project references from AspireHost to EventPublisher and TripBuildingService — which is important because Aspire does some source generation build a strong typed enumeration representing your projects that we’ll use next. That last step confused me when I was first playing with Aspire, so hopefully now you get to bypass that confusion. Maybe.

In the Program file for AspireHost, it’s just this:

var builder = DistributedApplication.CreateBuilder(args);

var postgresdb = builder.AddPostgres("marten");

builder.AddProject<Projects.EventPublisher>("publisher")
    .WithReference(postgresdb);

builder.AddProject<Projects.TripBuildingService>("trip-building")
    .WithReference(postgresdb);

builder.Build().Run();

Now, run the AspireHost project and you are able to run the two other services with the newly activated PostgreSQL Docker container, which you can see from the Docker Desktop dashboard:

Ship it!

Summary

Is .NET Aspire actually useful (I think so, even if it’s just for local development and testing maybe)? Can I explain the new Open Telemetry data exported from Marten? Would I use this instead of a dirt simple Docker Compose file like I do today (probably not to be honest)? Is this all fake?

All these questions and more will be somewhat addressed tomorrow-ish when I try to launch a new YouTube channel for JasperFx Software using the sample from this blog post as the subject for my first ever solo YouTube video.

One more thing…

I did alter the launchSettings.json file of the Aspire host project so it didn’t need to care about HTTPS to this:

{
  "$schema": "https://json.schemastore.org/launchsettings.json",
  "profiles": {
    "http": {
      "commandName": "Project",
      "dotnetRunMessages": true,
      "launchBrowser": true,
      "applicationUrl": "http://localhost:15242",
      "environmentVariables": {
        "ASPNETCORE_ENVIRONMENT": "Development",
        "DOTNET_ENVIRONMENT": "Development",
        "DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:19076",
        "DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "http://localhost:20101",
        "ASPIRE_ALLOW_UNSECURED_TRANSPORT": "true"
      }
    }
  }
}

Note the usage of the ASPIRE_ALLOW_UNSECURED_TRANSPORT environment variable.

Marten, Metrics, and Open Telemetry Support

Marten 7.10 was released today, and (finally) brings some built in support for monitoring Marten performance by exporting Open Telemetry and Metrics about Marten activity and performance within your system.

To use a little example, there’s a sample application in the Marten codebase called EventPublisher that we use to manually test some of the command line tooling. All that EventPublisher does is to continuously publish randomized events to a Marten event store when it runs. That made it a good place to start with a test harness for our new Open Telemetry support and performance related metrics.

For testing, I was just using the Project Aspire dashboard for viewing metrics and Open Telemetry tracing. First off, I enabled the “opt in” connection tracing for Marten, and put it into a verbose mode that’s probably only suitable for debugging or performance optimization work:

        // builder is a HostApplicationBuilder object
        builder.Services.AddMarten(opts =>
        {
            // Other Marten configuration...
            
            // Turn on Otel tracing for connection activity, and
            // also tag events to each span for all the Marten "write"
            // operations
            opts.OpenTelemetry.TrackConnections = TrackLevel.Verbose;

            // This opts into exporting a counter just on the number
            // of events being appended. Kinda a duplication
            opts.OpenTelemetry.TrackEventCounters();
            );
        });

That’s just the Marten side of things, so to hook up an Open Telemetry exporter for the Aspire dashboard, I added (really copy/pasted) this code (note that you’ll need the OpenTelemetry.Extensions.Hosting and OpenTelemetry.Exporter.OpenTelemetryProtocol Nugets added to your project):

        builder.Logging.AddOpenTelemetry(logging =>
        {
            logging.IncludeFormattedMessage = true;
            logging.IncludeScopes = true;
        });

        builder.Services.AddOpenTelemetry()
            .WithMetrics(metrics =>
            {
                metrics
                    .AddRuntimeInstrumentation().AddMeter("EventPublisher");
            })
            .WithTracing(tracing =>
            {
                tracing.AddAspNetCoreInstrumentation()
                    .AddHttpClientInstrumentation();
            });

        var endpointUri = builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"];
        builder.Services.AddOpenTelemetry().UseOtlpExporter();

        builder.Services.AddOpenTelemetry()
            // Enable exports of Open Telemetry activities
            .WithTracing(tracing =>
            {
                tracing.AddSource("Marten");
            })
            
            // Enable exports of metrics
            .WithMetrics(metrics =>
            {
                metrics.AddMeter("Marten");
            });

And now after running that with Aspire, you can see the output:

By itself, these spans, especially when shown in context of being nested within an HTTP request or a message being handled in a service bus framework, can point out where you may have performance issues from chattiness between the application server and the database — which I have found to be a very common source of performance problems out in the real world.

This is an opt in mode, but there are metrics and Open Telemetry tracing that are automatic for the background, async daemon subsystem. Skipping ahead a little bit, here’s a preview of some performance metrics in a related application that shows the “health” of a projection running in Marten’s async daemon subsystem by visualizing the “gap” between the projection’s current progression and the “high water mark” of Marten’s event store (how far the projection is sequentially compared to how far the whole event store itself is):

Summary

This is a short blog post, but I hope even this is enough to demonstrate how useful this new tracing is going to be in this new world order of Open Telemetry tracing tools.

The Decorator Pattern is sometimes helpful

I’ve been occasionally writing posts about old design patterns that are still occasionally useful despite the decades long backlash to the old “Gang of Four” book:

According to the original Gang of Four book, the “Decorator Pattern”:

…dynamically adds/overrides behavior in an existing method of an object.

Or more concretely, a decorator is a wrapper for an inner object that intercepts all calls to the inner object and potentially does some kind of work before or after the inner call. As a simple example, consider this ancient interface from the testing suite in StructureMap & Lamar:

    public interface IWidget
    {
        void DoSomething();
    }

And here’s a potential decorator for the IWidget service:

    public class ConsoleWritingWidgetDecorator : IWidget
    {
        private readonly IWidget _inner;

        public ConsoleWritingWidgetDecorator(IWidget inner)
        {
            _inner = inner;
        }

        public void DoSomething()
        {
            Console.WriteLine("I'm about to do something!");
            _inner.DoSomething();
            Console.WriteLine("I did something!");
        }
    }

The mechanics are simple enough, so let’s dive into some more complex use cases from the Marten 7.* codebase.

The most common usage of a decorator for me has been to separate out some kind of infrastructural concern like logging, error handling, or security from the core behavior. Just think on this. Instrumentation, security, and error handling are all very important elements of successful production code, but how many times in your career have you struggled to comprehend, modify, or debug code that is almost completely obfuscated by technical concerns.

Instead, I’ve sometimes found it helpful to separate out some of the technical concerns to a wrapping decorator just to allow the core functionality code to be easier to write, read later, and definitely to test. As an example from Marten 7.*, we have this interface for a service within Marten’s async daemon subsystem that’s used to fetch a page of event data at a time for a given projection or subscription:

public class EventRequest
{
    public long Floor { get; init; }
    public long HighWater { get; init; }
    public int BatchSize { get; init; }

    public ShardName Name { get; init; }

    public ErrorHandlingOptions ErrorOptions { get; init; }

    // other stuff...
}

public interface IEventLoader
{
    Task<EventPage> LoadAsync(EventRequest request, CancellationToken token);
}

This is for an asynchronous, background process, and we fully expect for there to be occasional issues with database connectivity, network hiccups, command timeouts when the database is stressed, and who knows what else. It’s obviously very important for this code to be as resilient as possible and be able to do some selected retries on transient errors at runtime. At the same time though, the actual functionality of that one LoadAsync() method was busy enough all by itself, so I opted to write the “real” functionality first with this — then test the heck out of that first:

internal class EventLoader: IEventLoader
{
    private readonly int _aggregateIndex;
    private readonly int _batchSize;
    private readonly NpgsqlParameter _ceiling;
    private readonly NpgsqlCommand _command;
    private readonly NpgsqlParameter _floor;
    private readonly IEventStorage _storage;
    private readonly IDocumentStore _store;

    public EventLoader(DocumentStore store, MartenDatabase database, AsyncProjectionShard shard, AsyncOptions options) : this(store, database, options, shard.BuildFilters(store).ToArray())
    {

    }

    public EventLoader(DocumentStore store, MartenDatabase database, AsyncOptions options, ISqlFragment[] filters)
    {
        _store = store;
        Database = database;

        _storage = (IEventStorage)store.Options.Providers.StorageFor<IEvent>().QueryOnly;
        _batchSize = options.BatchSize;

        var schemaName = store.Options.Events.DatabaseSchemaName;

        var builder = new CommandBuilder();
        builder.Append($"select {_storage.SelectFields().Select(x => "d." + x).Join(", ")}, s.type as stream_type");
        builder.Append(
            $" from {schemaName}.mt_events as d inner join {schemaName}.mt_streams as s on d.stream_id = s.id");

        if (_store.Options.Events.TenancyStyle == TenancyStyle.Conjoined)
        {
            builder.Append(" and d.tenant_id = s.tenant_id");
        }

        var parameters = builder.AppendWithParameters(" where d.seq_id > ? and d.seq_id <= ?");
        _floor = parameters[0];
        _ceiling = parameters[1];
        _floor.NpgsqlDbType = _ceiling.NpgsqlDbType = NpgsqlDbType.Bigint;

        foreach (var filter in filters)
        {
            builder.Append(" and ");
            filter.Apply(builder);
        }

        builder.Append(" order by d.seq_id limit ");
        builder.Append(_batchSize);

        _command = builder.Compile();
        _aggregateIndex = _storage.SelectFields().Length;
    }

    public IMartenDatabase Database { get; }

    public async Task<EventPage> LoadAsync(EventRequest request,
        CancellationToken token)
    {
        // There's an assumption here that this method is only called sequentially
        // and never at the same time on the same instance
        var page = new EventPage(request.Floor);

        await using var session = (QuerySession)_store.QuerySession(SessionOptions.ForDatabase(Database));
        _floor.Value = request.Floor;
        _ceiling.Value = request.HighWater;

        await using var reader = await session.ExecuteReaderAsync(_command, token).ConfigureAwait(false);
        while (await reader.ReadAsync(token).ConfigureAwait(false))
        {
            try
            {
                // as a decorator
                var @event = await _storage.ResolveAsync(reader, token).ConfigureAwait(false);

                if (!await reader.IsDBNullAsync(_aggregateIndex, token).ConfigureAwait(false))
                {
                    @event.AggregateTypeName =
                        await reader.GetFieldValueAsync<string>(_aggregateIndex, token).ConfigureAwait(false);
                }

                page.Add(@event);
            }
            catch (UnknownEventTypeException e)
            {
                if (request.ErrorOptions.SkipUnknownEvents)
                {
                    request.Runtime.Logger.LogWarning("Skipping unknown event type '{EventTypeName}'", e.EventTypeName);
                }
                else
                {
                    // Let any other exception throw
                    throw;
                }
            }
            catch (EventDeserializationFailureException e)
            {
                if (request.ErrorOptions.SkipSerializationErrors)
                {
                    await request.Runtime.RecordDeadLetterEventAsync(e.ToDeadLetterEvent(request.Name)).ConfigureAwait(false);
                }
                else
                {
                    // Let any other exception throw
                    throw;
                }
            }
        }

        page.CalculateCeiling(_batchSize, request.HighWater);

        return page;
    }

At runtime, that type is wrapped by a decorator that adds resiliency through the Polly library like so:

internal class ResilientEventLoader: IEventLoader
{
    private readonly ResiliencePipeline _pipeline;
    private readonly EventLoader _inner;

    internal record EventLoadExecution(EventRequest Request, IEventLoader Loader)
    {
        public async ValueTask<EventPage> ExecuteAsync(CancellationToken token)
        {
            var results = await Loader.LoadAsync(Request, token).ConfigureAwait(false);
            return results;
        }
    }

    public ResilientEventLoader(ResiliencePipeline pipeline, EventLoader inner)
    {
        _pipeline = pipeline;
        _inner = inner;
    }

    public Task<EventPage> LoadAsync(EventRequest request, CancellationToken token)
    {
        try
        {
            var execution = new EventLoadExecution(request, _inner);
            return _pipeline.ExecuteAsync(static (x, t) => x.ExecuteAsync(t),
                execution, token).AsTask();
        }
        catch (Exception e)
        {
            // This would only happen after a chain of repeated
            // failures -- which can of course happen!
            throw new EventLoaderException(request.Name, _inner.Database, e);
        }
    }
}

In the case above, using a decorator allowed me to focus on one set of concerns at a time and punt the Polly usage for resiliency to something else. The “something else” being a decorator that only really deals with the error handling and resiliency while letting the inner IEventFetcher “know” how to fetch the requested event data and turn that into the right .NET objects.

Here’s a more recent example written by Sean Farrow where we’re purposely using a decorator to add extra functionality to a core bit of the Marten command execution. If you go spelunking around in the Marten codebase, you’ll fine an interface called IConnectionLifetime that is used to actually execute database commands or queries within most common Marten operations (it was actually featured in my post on the State pattern) partially shown below:

public interface IConnectionLifetime: IAsyncDisposable, IDisposable
{
    // Other stuff...

    Task<DbDataReader> ExecuteReaderAsync(NpgsqlCommand command,
        CancellationToken token = default);
}

As we’re adding Open Telemetry support into Marten for the 7.10 release, we know that some folks will want some control to turn up or down the telemetry data emitted by Marten (more can be noise, and sometimes less can mean better performance anyway). One possible data collection element is to track the number of database requests in a given session and the number of subsequent database exceptions. That’s being accomplished with a decorator around the IConnectionLifetime like this:

internal class EventTracingConnectionLifetime:
    IConnectionLifetime
{
    private const string MartenCommandExecutionStarted = "marten.command.execution.started";
    private const string MartenBatchExecutionStarted = "marten.batch.execution.started";
    private const string MartenBatchPagesExecutionStarted = "marten.batch.pages.execution.started";
    private readonly IConnectionLifetime _innerConnectionLifetime;
    private readonly Activity? _databaseActivity;

    public EventTracingConnectionLifetime(IConnectionLifetime innerConnectionLifetime, string tenantId)
    {
        if (innerConnectionLifetime == null)
        {
            throw new ArgumentNullException(nameof(innerConnectionLifetime));
        }

        if (string.IsNullOrWhiteSpace(tenantId))
        {
            throw new ArgumentException("The tenant id cannot be null, an empty string or whitespace.", nameof(tenantId));
        }

        Logger = innerConnectionLifetime.Logger;
        CommandTimeout = innerConnectionLifetime.CommandTimeout;
        _innerConnectionLifetime = innerConnectionLifetime;

        var currentActivity = Activity.Current ?? null;
        var tags = new ActivityTagsCollection(new[] { new KeyValuePair<string, object?>(MartenTracing.MartenTenantId, tenantId) });
        _databaseActivity = MartenTracing.StartConnectionActivity(currentActivity, tags);
    }

    public ValueTask DisposeAsync()
    {
        _databaseActivity?.Stop();
        return _innerConnectionLifetime.DisposeAsync();
    }

    public void Dispose()
    {
        _databaseActivity?.Stop();
        _innerConnectionLifetime.Dispose();
    }

    public IMartenSessionLogger Logger { get; set; }
    public int CommandTimeout { get; }
    public int Execute(NpgsqlCommand cmd)
    {
        _databaseActivity?.AddEvent(new ActivityEvent(MartenCommandExecutionStarted));

        try
        {
            return _innerConnectionLifetime.Execute(cmd);
        }
        catch (Exception e)
        {
            _databaseActivity?.RecordException(e);

            throw;
        }
    }

    public async Task<DbDataReader> ExecuteReaderAsync(NpgsqlCommand command, CancellationToken token = default)
    {
        _databaseActivity?.AddEvent(new ActivityEvent(MartenCommandExecutionStarted));

        try
        {
            return await _innerConnectionLifetime.ExecuteReaderAsync(command, token).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            _databaseActivity?.RecordException(e);

            throw;
        }
    }

    // And much more...
}

That decorator is only selectively applied depending on whether or not the system developers have opted into this tracing and also if there’s an active listener for the data (no sense wasting extra CPU time on emitting data into the void!):

    internal IConnectionLifetime Initialize(DocumentStore store, CommandRunnerMode mode)
    {
        Mode = mode;
        Tenant ??= TenantId != Tenancy.DefaultTenantId ? store.Tenancy.GetTenant(TenantId) : store.Tenancy.Default;

        if (!AllowAnyTenant && !store.Options.Advanced.DefaultTenantUsageEnabled &&
            Tenant.TenantId == Tenancy.DefaultTenantId)
        {
            throw new DefaultTenantUsageDisabledException();
        }

        var innerConnectionLifetime = GetInnerConnectionLifetime(store, mode);

        return !OpenTelemetryOptions.TrackConnectionEvents || !MartenTracing.ActivitySource.HasListeners()
            ? innerConnectionLifetime
            : new EventTracingConnectionLifetime(innerConnectionLifetime, Tenant.TenantId);
    }

Summary

I showed off a couple examples where I feel like the decorator pattern is adding value to the Marten code by helping us expose extra functionality or just to separate concerns a little more cleanly in these particular cases. I’ve absolutely seen codebases where the code was dreadfully hard to follow because of the copious usage of decorators. Using decorators can also help blow up your object allocations (potential performance issue) and lead to some extraordinarily noisy exception stack traces from failures in the inner most objects. That being said, I’d still rather deal with nested decorators where you can at least see the boundaries between object responsibilities than wrestle with deep inheritance relationships.

As with all patterns, the decorator pattern is sometimes helpful and sometimes harmful. Just be cautious with its usage on a case by case basis and always filter it through the lens of “is using this making the code easier to understand or harder?”

But regardless, decorators are commonly used, and it’s just good to recognize the pattern when you see it and understand what the original author was trying to do.

First Class Event Subscriptions in Marten

This feature has been planned for Marten for years, but finally happened this month because a JasperFx Software client had a complicated multi-tenanted integration need for this as part of a complicated multi-tenanted and order sensitive data integration.

Marten recently (these samples are pulled from Marten 7.9) got a first class “event subscription” feature that allows users to take action upon events being appended to Marten’s event store in strict sequential order in a background process. While you’ve long been able to integrate Marten with other systems by using Marten’s older projection model, the newer subscription model is leaner and more efficient for background processing.

Before I get to “what” it is, let’s say that you need to carry out some kind of background processing on these events as they are captured? For example, maybe you need to:

  • Publish events to an external system as some kind of integration?
  • Carry out background processing based on a captured event
  • Build a view representation of the events in something outside of the current PostgreSQL database, like maybe an Elastic Search view for better searching

With this recently added feature, you can utilize Marten’s ISubscription model that runs within Marten’s async daemon subsystem to “push” events into your subscriptions as events flow into your system. Note that this is a background process within your application, and happen in a completely different thread than the initial work of appending and saving events to the Marten event storage.

Subscriptions will always be an implementation of the ISubscription interface shown below:

/// <summary>
/// Basic abstraction for custom subscriptions to Marten events through the async daemon. Use this in
/// order to do custom processing against an ordered stream of the events
/// </summary>
public interface ISubscription : IAsyncDisposable
{
    /// <summary>
    /// Processes a page of events at a time
    /// </summary>
    /// <param name="page"></param>
    /// <param name="controller">Use to log dead letter events that are skipped or to stop the subscription from processing based on an exception</param>
    /// <param name="operations">Access to Marten queries and writes that will be committed with the progress update for this subscription</param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller,
        IDocumentOperations operations,
        CancellationToken cancellationToken);
}

So far, the subscription model gives you these abilities:

  • Access to the Marten IDocumentOperations service that is scoped to the processing of a single page and can be used to either query additional data or to make database writes within the context of the same transaction that Marten will use to record the current progress of the subscription to the database
  • Error handling abilities via the ISubscriptionController interface argument that can be used to record events that were skipped by the subscription or to completely stop all further processing
  • By returning an IChangeListener, the subscription can be notified right before and right after Marten commits the database transaction for any changes including recording the current progress of the subscription for the current page. This was done purposely to enable transactional outbox approaches like the one in Wolverine. See the async daemon diagnostics for more information.
  • The ability to filter the event types or stream types that the subscription is interested in as a way to greatly optimize the runtime performance by preventing Marten from having to fetch events that the subscription will not process
  • The ability to create the actual subscription objects from the application’s IoC container when that is necessary
  • Flexible control over where or when the subscription starts when it is first applied to an existing event store
  • Some facility to “rewind and replay” subscriptions

To make this concrete, here’s the simplest possible subscription you can make to simply write out a console message for every event:

public class ConsoleSubscription: ISubscription
{
    public Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller, IDocumentOperations operations,
        CancellationToken cancellationToken)
    {
        Console.WriteLine($"Starting to process events from {page.SequenceFloor} to {page.SequenceCeiling}");
        foreach (var e in page.Events)
        {
            Console.WriteLine($"Got event of type {e.Data.GetType().NameInCode()} from stream {e.StreamId}");
        }

        // If you don't care about being signaled for
        return Task.FromResult(NullChangeListener.Instance);
    }

    public ValueTask DisposeAsync()
    {
        return new ValueTask();
    }
}

And to register that with our Marten store:

var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("marten"));

        // Because this subscription has no service dependencies, we
        // can use this simple mechanism
        opts.Events.Subscribe(new ConsoleSubscription());

        // Or with additional configuration like:
        opts.Events.Subscribe(new ConsoleSubscription(), s =>
        {
            s.SubscriptionName = "Console"; // Override Marten's naming
            s.SubscriptionVersion = 2; // Potentially version as an all new subscription

            // Optionally create an allow list of
            // event types to subscribe to
            s.IncludeType<InvoiceApproved>();
            s.IncludeType<InvoiceCreated>();

            // Only subscribe to new events, and don't try
            // to apply this subscription to existing events
            s.Options.SubscribeFromPresent();
        });
    })
    .AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

Here’s a slightly more complicated sample that publishes events to a configured Kafka topic:

public class KafkaSubscription: SubscriptionBase
{
    private readonly KafkaProducerConfig _config;

    public KafkaSubscription(KafkaProducerConfig config)
    {
        _config = config;

        SubscriptionName = "Kafka";

        // Access to any or all filtering rules
        IncludeType<InvoiceApproved>();

        // Fine grained control over how the subscription runs
        // in the async daemon
        Options.BatchSize = 1000;
        Options.MaximumHopperSize = 10000;

        // Effectively run as a hot observable
        Options.SubscribeFromPresent();
    }

    // The daemon will "push" a page of events at a time to this subscription
    public override async Task<IChangeListener> ProcessEventsAsync(
        EventRange page,
        ISubscriptionController controller,
        IDocumentOperations operations,
        CancellationToken cancellationToken)
    {
        using var kafkaProducer =
            new ProducerBuilder<string, string>(_config.ProducerConfig).Build();

        foreach (var @event in page.Events)
        {
            await kafkaProducer.ProduceAsync(_config.Topic,
                new Message<string, string>
                {
                    // store event type name in message Key
                    Key = @event.Data.GetType().Name,
                    // serialize event to message Value
                    Value = JsonConvert.SerializeObject(@event.Data)
                }, cancellationToken);

        }

        // We don't need any kind of callback, so the nullo is fine
        return NullChangeListener.Instance;
    }

}

// Just assume this is registered in your IoC container
public class KafkaProducerConfig
{
    public ProducerConfig? ProducerConfig { get; set; }
    public string? Topic { get; set; }
}

This time, it’s requiring IoC services injected through its constructor, so we’re going to use this mechanism to add it to Marten:

var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("marten"));
    })
    // Marten also supports a Scoped lifecycle, and quietly forward Transient
    // to Scoped
    .AddSubscriptionWithServices<KafkaSubscription>(ServiceLifetime.Singleton, o =>
    {
        // This is a default, but just showing what's possible
        o.IncludeArchivedEvents = false;

        o.FilterIncomingEventsOnStreamType(typeof(Invoice));

        // Process no more than 10 events at a time
        o.Options.BatchSize = 10;
    })
    .AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

But there’s more!

The subscriptions run with Marten’s async daemon process, which just got a world of improvements in the Marten V7 release, including the ability to distribute work across running nodes in your application at runtime.

I didn’t show it in this blog post, but there are also facilities to configure whether a new subscription will start by working through all the events from the beginning of the system, or whether the subscription should start from the current sequence of the event store, or even go back to an explicitly stated sequence or timestamp, then play forward. Marten also has support — similar to its projection rebuild functionality — to rewind and replay subscriptions.

Wolverine already has specific integrations to utilize Marten event subscriptions to process events with Wolverine message handlers, or to forward events as messages through Wolverine publishing (Kafka? Rabbit MQ? Azure Service Bus?), or to do something completely custom with batches of events at a time (which I’ll demonstrate in the next couple weeks). I’ll post about that soon after that functionality gets fully documented with decent examples.

Lastly, and this is strictly in the hopefully near term future, there will be specific support for Marten subscriptions in the planned “Critter Stack Pro” add on product to Marten & Wolverine to:

  • Distribute subscription work across running nodes within your system — which actually exists in a crude, but effective form, and will absolutely be in Critter Stack Pro V1!
  • User interface monitoring and control pane to manually turn on and off subscriptions, review performance, and manually “rewind” subscriptions

Hopefully much more on this soon. It’s taken much longer than I’d hoped, but it’s still coming.

Embedding Database Migrations with Weasel

A woodworking weasel building a table, of course!

Let’s say that you’re building a system that needs to directly work with a handful of database tables. Or maybe more aptly, you’re building a redistributable class library that will expect to interact with a small number of database tables, functions, view, or sequences — and you’d love to make that class library be responsible for building those database objects as necessary at least at development time so your users can just get to work without any kind of laborious setup before hand.

If you’ve worked with the main “Critter Stack” tools (Marten and Wolverine), you’re familiar with how they can quietly set up your development or even your production database as necessary to reflect your system’s configuration. The actual work of database migrations built into these tools is done by the third member of the “Critter Stack, ” a helper library named Weasel.

You can also use Weasel in your own class library to do the same kind of automatic database migration — as long as you’re using either PostgreSQL or Sql Server (for now).

With Weasel, you can define the requirements for a new database table with a class originally named Table in the Weasel.Postgresql Nuget which exposes an API for just about anything you could do to configure a table including columns, primary keys, foreign key relationships to other tables, and indexes:

        var table = new Table("tables.people");
        table.AddColumn<int>("id").AsPrimaryKey();
        table.AddColumn<string>("first_name");
        table.AddColumn<string>("last_name");

Inside your code, you can at any time migrate the existing database to reflect your Table object with this convenience extension method added in Weasel 7.4:

var table = new Table("tables.people");
table.AddColumn<int>("id").AsPrimaryKey();
table.AddColumn<string>("first_name");
table.AddColumn<string>("last_name");

await using var conn = new NpgsqlConnection("some connection string");
await conn.OpenAsync();

// This will apply any necessary changes to make
// the database reflect the specified table structure
await table.MigrateAsync(conn);

Behind the scenes, Weasel reaches into the database to find the current status — if any — of the specified table. If the table doesn’t exist, Weasel creates it based on the in memory specification. If the table does already exist in the database, Weasel can figure out if there is any “delta” between the expected table from the Table specification and the actual database table. Weasel can issue SQL patches to:

  • Add missing columns
  • Remove columns in the database that are not part of the specification
  • Modify the primary key if necessary
  • Add missing indexes
  • Remove indexes that are not reflected in the specification
  • Deal with foreign keys

And of course, Weasel will do absolutely nothing else if it does not find any differences between the tables.

Likewise, Weasel supports functions and sequences for PostgreSQL. The Weasel.SqlServer has similar support for tables, stored procedures, and custom types (Wolverine uses quite a few user defined table types as an optimization to batch up updates and inserts with its Sql Server integration).

So Weasel definitely isn’t the best documented or visible library in the Critter Stack, but it is useful outside of Marten and Wolverine, and the documentation story might improve dramatically if there’s more demand for that.

Strict Ordered Message Handling wth Wolverine

The feature was built for a current JasperFx Software client, and came with a wave of developments across both Marten and Wolverine to support a fairly complex, mission critical set of application integrations. The PostgreSQL transport new to Wolverine was part of this wave. Some time next week I’ll be blogging about the Marten event subscription capabilities that were built into Marten & Wolverine to support this client as well. The point being, JasperFx is wide open for business and we can help your shop succeed with challenging project work!

Wolverine now has the ability to support strict messaging order with its message listeners. Given any random listening endpoint in Wolverine, just add this directive below to make the message processing be strictly sequential (with the proviso that your error handling policies may impact the order on failures):

var host = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
    opts.UseRabbitMq().EnableWolverineControlQueues();
    
opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "listeners");

    opts.ListenToRabbitQueue("ordered")
        
        // This option is available on all types of Wolverine
        // endpoints that can be configured to be a listener
        .ListenWithStrictOrdering();
}).StartAsync();

Some notes about the ListenWithStrictOrdering() directive you might have:

  1. It’s supported with every external messaging broker that Wolverine supports, including Kafka, Azure Service Bus, AWS SQS, and Rabbit MQ. It is also supported with the two database backed transports (we have both kinds, Sql Server and PostgreSQL!)
  2. When this directive is applied, Wolverine will only make the listener for each endpoint (in the case above, the Rabbit MQ named “ordered”) be active on a single node within your application. Today that distribution is just crudely spreading out the “exclusive listeners” evenly across the whole application cluster. Definitely note that the strict ordering comes at the cost of reduced throughput, so use this feature wisely! Did I mention that JasperFx Software is here and ready to work with your company on Critter Stack projects?
  3. Every exclusive listener will quickly start up on a single node if WolverineOptions.Durability.Mode = DurabilityMode.Solo, and you may want to do that for local testing and development just to be a little quicker on cold starts
  4. The ListenWithStrictOrdering will make the internal worker queue (Wolverine uses an internal TPL Dataflow ActionBlock in these cases) for “buffered” or “durable” endpoints be strictly sequential
  5. You will have to have a durable message store configured for your application in order for Wolverine to perform the leadership election and “agent tracking” (what’s running where)

Summary

This is a powerful tool in the continually growing Wolverine tool belt. The strict ordering may also be used to alleviate some concurrency issues that some users have hit with event sourcing using Marten when a single stream may be receiving bursts of commands that impact the event stream. The leadership election and agent distribution in Wolverine, in conjunction with this “sticky” listener assignment, gives Wolverine a nascent ability for virtual actors that we will continue to exploit. More soon-ish!

Wolverine’s New PostgreSQL Messaging Transport

Wolverine just got a new PostgreSQL-backed messaging transport (with the work sponsored by a JasperFx Software client!). The use case is just this, say you’re already using Wolverine to build a system with PostgreSQL as your backing database, and want to introduce some asynchronous, background processing in your system — which you could already do with just a database backed, local queue. Going farther though, let’s say that we’d like to have a competing consumers setup for our queueing for load balancing between active nodes and we’d like to do that without having to introduce some kind of new message broker infrastructure into our existing architecture.

That’s time to bring in Wolverine’s new option for asynchronous messaging just using our existing PostgreSQL database. To set that up by itself (without using Marten, but we’ll get to that in a second), it’s these couple lines of code:

var builder = WebApplication.CreateBuilder(args);
var connectionString = builder.Configuration.GetConnectionString("postgres");

builder.Host.UseWolverine(opts =>
{
    // Setting up Postgresql-backed message storage
    // This requires a reference to Wolverine.Postgresql
    opts.PersistMessagesWithPostgresql(connectionString);

    // Other Wolverine configuration
});

Of course, you’d want to setup PostgreSQL queues for Wolverine to send to and to listen to for messages to process. That’s shown below:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine((context, opts) =>
    {
        var connectionString = context.Configuration.GetConnectionString("postgres");
        opts.UsePostgresqlPersistenceAndTransport(connectionString, "myapp")
            
            // Tell Wolverine to build out all necessary queue or scheduled message
            // tables on demand as needed
            .AutoProvision()
            
            // Optional that may be helpful in testing, but probably bad
            // in production!
            .AutoPurgeOnStartup();

        // Use this extension method to create subscriber rules
        opts.PublishAllMessages().ToPostgresqlQueue("outbound");

        // Use this to set up queue listeners
        opts.ListenToPostgresqlQueue("inbound")
            
            .CircuitBreaker(cb =>
            {
                // fine tune the circuit breaker
                // policies here
            })
            
            // Optionally specify how many messages to 
            // fetch into the listener at any one time
            .MaximumMessagesToReceive(50);
    }).StartAsync();

And that’s that, we’re completely set up for messaging via the PostgreSQL database we already have with our Wolverine application!

Just a couple things to note before you run off and try to use this:

  • Like I alluded to earlier, the PostgreSQL queueing mechanism supports competing consumers, so different nodes at runtime can be pulling and processing messages from the PostgreSQL queues
  • There is a separate set of tables for each named queue (one for the actual inbound/outbound messages, and a separate table to segregate “scheduled” messages). Utilize that separation for better performance as needed by effectively sharding the message transfers
  • As that previous bullet point implies, the PostgreSQL transport is able to support scheduled message delivery
  • As in most cases, Wolverine is able to detect whether or not the necessary tables all exist in your database, and create any missing tables for you at runtime
  • In the case of using Wolverine with Marten multi-tenancy through separate databases, the queue tables will exist in all tenant databases
  • There’s some optimizations and integration between these queues and the transactional inbox/outbox support in Wolverine for performance by reducing database chattiness whenever possible

Summary

I’m not sure I’d recommend this approach over dedicated messaging infrastructure for high volumes of messages, but it’s a way to get things done with less infrastructure in some cases and it’s a valuable tool in the Wolverine toolbox.

Durable Background Processing with Wolverine

A couple weeks back I started a new blog series meant to explore Wolverine’s capabilities for background processing. Working in very small steps and only one new concept at a time, the first time out just showed how to set up Wolverine inside a new ASP.Net Core web api service and immediately use it for offloading some processing from HTTP endpoints to background processing by using Wolverine’s local queues and message handlers for background processing.

In that previous post though, the messages held in those in memory, local queues could conceivably be lost if the application is shut down unexpectedly (Wolverine will attempt to “drain” the local queues of outstanding work on graceful process shutdowns). That’s perfectly acceptable sometimes, but in other times you really need those queued up messages to be durable so that the in flight messages can be processed even if the service process is unexpectedly killed while work is in flight — so let’s opt into Wolverine’s ability to do exactly that!

To that end, let’s just assume that we’re a very typical .NET shop and we’re already using Sql Server as our backing database for the system. Knowing that, let’s add a new Nuget reference to our project:

dotnet add package WolverineFx.SqlServer

And let’s break into our Program file for the service where all the system configuration is, and expand the Wolverine configuration within the UseWolverine() call to this:

// This is good enough for what we're trying to do
// at the moment
builder.Host.UseWolverine(opts =>
{
    // Just normal .NET stuff to get the connection string to our Sql Server database
    // for this service
    var connectionString = builder.Configuration.GetConnectionString("SqlServer");
    
    // Telling Wolverine to build out message storage with Sql Server at 
    // this database and using the "wolverine" schema to somewhat segregate the 
    // wolverine tables away from the rest of the real application
    opts.PersistMessagesWithSqlServer(connectionString, "wolverine");
    
    // In one fell swoop, let's tell Wolverine to make *all* local
    // queues be durable and backed up by Sql Server 
    opts.Policies.UseDurableLocalQueues();
});

Nothing else in our previous code needs to change. As a matter of fact, once you restart your application — assuming that your box can reach the Sql Server database in the appsettings.json file — Wolverine is going to happily see that those necessary tables are missing, and build them out for you in your database so that Wolverine “can just work” on its first usage. That automatic schema creation can of course be disabled and/or done with pure SQL through other Wolverine facilities, but for right now, we’re taking the easy road.

Before I get into the runtime mechanics, here’s a refresher about our first message handler:

public static class SendWelcomeEmailHandler
{
    public static void Handle(SignUpRequest @event, ILogger logger)
    {
        // Just logging, a real handler would obviously do something real
        // to send an email
        logger.LogInformation("Send a Send a welcome email to {Name} at {Email}", @event.Name, @event.Email);
    }
}

And the code that publishes a SignUpRequest message to a local Wolverine queue in a Minimal API endpoint:

app.MapPost("/signup", (SignUpRequest request, IMessageBus bus) 
    => bus.PublishAsync(request));

After our new configuration up above to add message durability to our local queues, when a service client posts a SignUpRequest message is published to Wolverine as a result of a client posting valid data to the /signup Url, Wolverine will:

  1. Persist all the necessary information about the new SignUpRequest message that Wolverine uses to process that message in the Sql Server database (this is using the “Envelope Wrapper” pattern from the old EIP book, which is quite originally called Envelope in the Wolverine internals).
  2. If the message is successfully processed, Wolverine will delete that stored record for the message in Sql Server
  3. If the message processing fails, and there’s some kind of retry policy in effect, Wolverine will increment the number of failed attempts in the Sql Server database (with an UPDATE statement because it’s trying to be as efficient as possible)
  4. If the process somehow fails while the message is floating around in the in memory queues, Wolverine will be able to recover that local message from the database storage later when the system is restarted. Or if the system is running in a load balanced cluster, a different Wolverine node will be able to see that the messages are orphaned in the database and will steal that work into another node so that the messages eventually get processed

Summary and What’s Next?

That’s a lot of detail about what is happening in your system, but I’d argue that was very little code necessary to make the background processing with Wolverine be durable. And all without introducing any other new infrastructure other than the Sql Server database we were probably already using. Moreover, Wolverine can do a lot to make the necessary database setup for you at runtime so there’s hopefully very little friction getting up and running after a fresh git clone.

I’ll add at least a couple more entries to this series by looking at error handling strategies, controller the parallelism or strict ordering of message processing, a simple implementation of the Producer/Consumer pattern with Wolverine, and message scheduling.

Actually Talking about Modular Monoliths

That’s Winterfell from a Game of Thrones if you were curious. I have no earthly idea whether or not this mini-series of posts will be the slightest bit useful for anyone else, but it’s probably been good for me to read up much more on what other people think and to just flat out ponder this a lot more as it’s suddenly relevant to several different current JasperFx Software clients.

In my last post Thoughts on “Modular Monoliths” I was starting write a massive blog post about my reservations about modular monoliths and where and how the “Critter Stack” tools (Marten and Wolverine) are or aren’t already well suited for modular monolith projection construction — but I really only got around to talking about the problems with both traditional monolith structures and micro-service architectures before running out of steam. This post is to finally lay out my thoughts on “modular monoliths” with yet another 3rd post coming later to get into some specifics about how the “Critter Stack” tools (Marten and Wolverine) fit into this architectural style. I do think even the specifics of the Critter Stack tooling will help illuminate some potential challenges for folks building modular monolith systems with completely different tooling.

So let’s get started by me saying that my conceptual understanding of a “modular monolith” is that it’s a single codebase like we’ve traditionally done and consistently failed at, but this time we’ll pay more attention to isolating logical modules of functionality within that single codebase:

As I said in the earlier post, I’m very dubious about how effective the modular monolith strategy will really be for our industry because I think it’s still going to run into some very real friction. Do keep in mind that I’m coming from a primarily .NET background, and that means there’s no easy way to run multiple versions of the same library in the same process. That being said, here’s what else I’m still worried about:

  • You may still suffer from the same kind of friction with slow builds and sluggish IDEs you almost inevitably get from large codebases if you have to work with the whole codebase at one time — but maybe that’s not actually a necessity all the time, so let’s keep talking!
  • I really think it’s important for the long term health of a big system to be able to do technical upgrades or switch out technologies incrementally instead of having to upgrade the whole codebase at one time because you can never, ever (or at least very rarely) convince a non-technical product owner to let you stop and take six months to upgrade technologies without adding any new features — nor honestly should you even try to do that without some very good reasons
  • It’s still just as hard to find the right boundaries between modules as it was to make the proper boundaries for truly independent micro-services

A massive pet peeve of mine is hearing people exclaim something to the effect of “just use Domain Driven Design and your service and/or module boundaries will always be right!” I think the word “just” is doing a lot of work there and anybody who says that should read the classic essay Lullaby Language.

After writing my previous blog post, working on the proposals for a client that spawned this conversation in the first place, and reading up on a lot of other people’s thoughts about this subject, I’ve got a few more positive things to say.

I’m a proponent of “Vertical Slice Architecture” code organization and a harsh critic of layered architecture approaches (Clean/Onion/Hexagonal) as they are commonly practiced, so the idea of organizing related functionality together in modules throughout the system instead of giant, horizontal layers first definitely appeals to me and I think that’s a hugely valuable shift in thinking.

I’m much more bullish on modular monoliths after thinking more about the Citadel and Outpost approach where you start with the assumption that some modules of the monolith will be spun out into separate processes later when the team feels like the service boundaries are truly settled enough to make that viable. To that end, I liked the way that Glenn Henriksen put this over the weekend:

Continuing the “Citadel” theme where you assume that you will later spawn separate “Outpost” processes later, I’m now highly concerned with building the initial system in such a way that it’s relatively easy to excise out modules to separate processes later. In the early days of Extreme Programming, we talked a little bit about the concept of “Reversibility“, which just means how easy or hard it will be to change your mind and technical direction about any given technology or approach. Likewise with the “modular monolith” approach, I actually want to think a little bit upfront about having a path to easily break out modules into separate processes or services later.

I’m probably a little more confident about introducing some level of asynchronous messaging and distributed development than some folks, so I’m going to come right out and say that I would be willing to start with some modules split into separate processes right off the bat, but ameliorate that by assuming that all these processes will have to be deployed together and will live together in one single mono-repository. To circle back to the earlier “Reversibility” theme, I think this compromise will make it much easier for teams to adjust service boundaries later as everything will be living in the same repository.

Lastly on this topic, it’s .NET-centric, but I’m hopeful that Project Aspire makes it much easier to work with this kind of distributed monolith. Likewise, I’m keeping an eye on tooling like Incrementalist as a way of better working with mono-repository codebases.

What about the Database?

There are potential dangers that might make our modular monolith decision less reversible than we’d like, but for right now let’s focus on just the database (or databases). Specifically, I’m concerned about avoiding what I’ve called the Pond Scum Anti-Pattern where a lot of different applications (or modules in our new world) float on top of a murky shared database like pond scum on brackish water in sweltering summer heat.

I grew up on farms fishing in that exact kind of farm pond, hence the metaphor:)

Taking the ideal from micro-services, I’d love it if each module had logically separate database storage such that even if they are all targeting the same physical database server, there’s some relatively easy way later to pull out the storage for an individual module and move it out later.

If scalability was an issue, I would happily go for breaking the storage for some modules out into separate databases, even though that’s a little more complexity. Some of the other folks I read in researching this topic suggested using foreign data wrappers to offload database work while still making it look to your modular monolith like it’s one big happy family database — but I personally think that’s crazy town. There’s also a very real benefit to allowing different modules to use different styles of databases or persistence based on their needs.

This probably won’t happen, but I did at least raise the possibility to a client of using event sourcing in some of their workflow-centric modules while allowing simpler modules to be remain CRUD-centric.

How Do the Modules Stay Decoupled?

Assuming we all buy off into the idea of our modules remaining loosely coupled over time such that we have a pathway to pull them out into separate “Outpost” processes later, we absolutely don’t want many of the red arrows popping up as shown below:

Hat tip to Steve Smith for this way of describing the modularity issues.

Mechanically, my first inclination to enforce the modularity is to say that we’ll use some kind of mediator tooling like either MediatR or my own Wolverine to handle cross module interactions. That comes with its own set of complications:

  • Potentially more code as is almost inevitable when purposely putting in any kind of anti-corruption layer
  • What to do when one module needs the data from a second module to do its work? One answer is to use Domain Queries between modules — again, probably with some kind of mediator tool. I’ve always been dubious about that strategy because of the extra code ceremony and the simple fact that any possible technique that adds abstractions between your top level code and the raw data access code has a high tendency to cause performance problems later. If you go down this path, I’d be cognizant of the potential performance penalties and look maybe for some way to batch up queries later
  • You potentially just say that if “Module 1 consistently needs access to data managed by Module 2” then you should probably merge the two modules. One fast way to get into trouble in any kind of complicated system is to organize first by different logical persisted entities rather than by operations. I think you’re far more likely to arrive at cleanly separate module boundaries by focusing on the command and query use cases of the system rather than dividing code up by entities like “Invoice” or “Order” or “Shipment.”

Just for now, I think there’s one last conversation to have about how a team will go about enforcing the usage of proper patterns and encapsulation of the various modules without devolving into a morass of the red arrows from the picture above. You could:

  1. Favor internalized discipline and socialized design goals by doing whatever it takes to be able to trust the developers to naturally do the right thing. Kumbaya, up with people, stop laughing at me! I think that internalized discipline will deliver better results than high ceremony approaches that try to straight jacket developers into doing the right things, but I’m prepared to be wrong on this one
  2. Utilize architectural tests or maybe some kind of fancy static code analysis that can spot violations of the architectural “who can talk to who” rules
  3. Try to separate out projects or packages for modules or parts of modules to enforce rules about “who can talk to who.” I hate this approach as part of something like the Onion Architecture, and I’m probably naturally suspicious of it inside of modular monoliths too — but at least this time you’re hopefully dividing along the lines of closely related functionality rather than organizing by broad layers first.

Summary and What’s Next

My only summary is that I’m still dubious that the modular monolith idea is going to be a panacea, but this has been helpful to me personally just to think on it much harder and see what other folks are doing and saying about this architectural style.

My next and hopefully last post in this series will be taking a look at how Wolverine and Marten do or do not lend themselves to the modular monolith approach, and what might need to be improved later in these tools.