Testing Asynchronous Projections in Marten

Hey, did you know that JasperFx Software offers formal support plans for Marten and Wolverine? Not only are we making the “Critter Stack” tools viable long-term options for your shop, we’re also interested in hearing your opinions about the tools and how they should change. We’re also open to helping you succeed with your software development projects on a consulting basis, whether you’re using any part of the Critter Stack or some completely different .NET server-side tooling.

As a follow-up to my post yesterday on Wolverine’s Baked In Integration Testing Support, I want to talk about some improvements that just went live in Marten 7.5 and are meant to make asynchronous projections much easier to test.

First off, let’s say that you have a simple document that can “self-aggregate” as a “Snapshot” in Marten like this:

public record InvoiceCreated(string Description, decimal Amount);

public record InvoiceApproved;
public record InvoiceCancelled;
public record InvoicePaid;
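public record InvoiceRejected;

// Not shown in the original sample: the status enum used by Invoice below.
// The member names come from the code; their order is an assumption.
public enum InvoiceStatus
{
    Created,
    Approved,
    Paid,
    Rejected,
    Cancelled
}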

public class Invoice
{
    public Invoice()
    {
    }

    public static Invoice Create(IEvent<InvoiceCreated> created)
    {
        return new Invoice
        {
            Amount = created.Data.Amount,
            Description = created.Data.Description,

            // Capture the timestamp from the event
            // metadata captured by Marten
            Created = created.Timestamp,
            Status = InvoiceStatus.Created
        };
    }

    public int Version { get; set; }

    public decimal Amount { get; set; }
    public string Description { get; set; }
    public Guid Id { get; set; }
    public DateTimeOffset Created { get; set; }
    public InvoiceStatus Status { get; set; }

    public void Apply(InvoiceCancelled _) => Status = InvoiceStatus.Cancelled;
    public void Apply(InvoiceRejected _) => Status = InvoiceStatus.Rejected;
    public void Apply(InvoicePaid _) => Status = InvoiceStatus.Paid;
    public void Apply(InvoiceApproved _) => Status = InvoiceStatus.Approved;
}
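To make the "self-aggregating" convention a little more concrete, here is a rough, Marten-free sketch of the idea: the first event creates the document, and each later event is applied in order to mutate it. The InvoiceSketch and SnapshotSketch names are hypothetical, the types are redeclared in simplified form so the block stands alone, and this is only a conceptual illustration, not how Marten implements aggregation internally.

```csharp
using System;
using System.Collections.Generic;

public enum InvoiceStatus { Created, Approved, Paid, Rejected, Cancelled }

public record InvoiceCreated(string Description, decimal Amount);
public record InvoiceApproved;
public record InvoicePaid;

// Simplified, hypothetical stand-in for the Invoice document above
public class InvoiceSketch
{
    public string Description { get; set; } = "";
    public decimal Amount { get; set; }
    public InvoiceStatus Status { get; set; }
}

public static class SnapshotSketch
{
    // Replays the events of one stream in order and returns the
    // resulting state, or null if nothing ever created the invoice
    public static InvoiceSketch? Fold(IEnumerable<object> events)
    {
        InvoiceSketch? invoice = null;

        foreach (var e in events)
        {
            switch (e)
            {
                // Plays the role of Invoice.Create()
                case InvoiceCreated created:
                    invoice = new InvoiceSketch
                    {
                        Description = created.Description,
                        Amount = created.Amount,
                        Status = InvoiceStatus.Created
                    };
                    break;

                // These play the role of the Invoice.Apply() methods
                case InvoiceApproved _ when invoice != null:
                    invoice.Status = InvoiceStatus.Approved;
                    break;

                case InvoicePaid _ when invoice != null:
                    invoice.Status = InvoiceStatus.Paid;
                    break;
            }
        }

        return invoice;
    }
}
```

Feeding this the same three events used in the tests later in this post (created, approved, paid) ends with a status of Paid, which is the state we expect Marten to persist for the real Invoice.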

For asynchronous projections of any kind, testing gets a bit more complicated. I strongly recommend testing a projection within its integration with Marten rather than in solitary unit tests with fakes, so in a classic “Arrange, Act, Assert” test we’d like to exercise our projection with a workflow like this:

  1. Pump in some new events to Marten
  2. Somehow magically wait for Marten’s asynchronous daemon, running in a background thread, to progress to the point where it has handled all of our newly appended events for all known, running projections
  3. Load the expected documents that should have been persisted or updated from our new events by the projections running in the daemon, and run some assertions on the expected system state
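To see why step 2 is the awkward one, consider the generic workaround a test would otherwise fall back on: polling until some condition holds or a timeout expires. The sketch below is a hypothetical helper (PollingSketch.WaitUntilAsync is not part of Marten), shown only to illustrate the kind of hand-rolled waiting a purpose-built API can replace.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class PollingSketch
{
    // Hypothetical helper: repeatedly evaluates the condition until it
    // returns true or the timeout elapses. Returns false on timeout.
    public static async Task<bool> WaitUntilAsync(
        Func<Task<bool>> condition,
        TimeSpan timeout,
        TimeSpan pollInterval)
    {
        var stopwatch = Stopwatch.StartNew();

        while (stopwatch.Elapsed < timeout)
        {
            if (await condition())
            {
                return true;
            }

            await Task.Delay(pollInterval);
        }

        // One last check so a condition that became true during the
        // final delay is not reported as a timeout
        return await condition();
    }
}
```

In a test, the condition would be something like "the projected document exists and has the expected status." The weakness of that approach is that every test has to know what "caught up" looks like for its own data, and a poorly chosen condition or interval leads straight to flaky tests.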

For right now, I want to focus on the second step and introduce a new (well, old, but it actually works correctly now) WaitForNonStaleProjectionDataAsync API that shipped in Marten 7.5. You can see the API used in this test from the new documentation on Testing Projections:

[Fact]
public async Task test_async_aggregation_with_wait_for()
{
    // In your tests, you would most likely use the IHost for your
    // application as it is normally built
    using var host = await Host.CreateDefaultBuilder()
        .ConfigureServices(services =>
        {
            services.AddMarten(opts =>
                {
                    opts.Connection(
                        "Host=localhost;Port=5432;Database=marten_testing;Username=postgres;password=postgres;Command Timeout=5");
                    opts.DatabaseSchemaName = "incidents";

                    // Notice that the "snapshot" is running asynchronously
                    opts.Projections.Snapshot<Invoice>(SnapshotLifecycle.Async);
                })

                // Using Solo in tests will help it start up a little quicker
                .AddAsyncDaemon(DaemonMode.Solo);
        }).StartAsync();

    var store = host.Services.GetRequiredService<IDocumentStore>();

    var invoiceId = Guid.NewGuid();

    // Pump in events
    using (var session = store.LightweightSession())
    {
        session.Events.StartStream<Invoice>(invoiceId, new InvoiceCreated("Blue Shoes", 112.24m));
        await session.SaveChangesAsync();

        session.Events.Append(invoiceId, new InvoiceApproved());
        session.Events.Append(invoiceId, new InvoicePaid());
        await session.SaveChangesAsync();
    }

    // Now, this is going to pause here in this thread until the async daemon
    // running in our IHost is completely caught up to at least the point of the
    // last event captured at the point this method was called
    await store.WaitForNonStaleProjectionDataAsync(5.Seconds());

    // NOW, we should expect reliable results by just loading the
    // documents that the async daemon has already built and persisted
    await using var query = store.QuerySession();

    // Load the document that was "projected" from the events above
    // and persisted to the document store by the async projection
    var invoice = await query.LoadAsync<Invoice>(invoiceId);

    // Run assertions
    invoice.Description.ShouldBe("Blue Shoes");
    invoice.Status.ShouldBe(InvoiceStatus.Paid);
}

Time. What about System Time?

See Andrew Lock’s blog post Avoiding flaky tests with TimeProvider and ITimer for more information on using TimeProvider in tests.

In the example projection, I’ve been capturing the timestamp in the Invoice document from the Marten event metadata:

public static Invoice Create(IEvent<InvoiceCreated> created)
{
    return new Invoice
    {
        Amount = created.Data.Amount,
        Description = created.Data.Description,

        // Capture the timestamp from the event
        // metadata captured by Marten
        Created = created.Timestamp,
        Status = InvoiceStatus.Created
    };
}

But of course, if that timestamp has some meaning later on and you have any kind of business rules that may need to key off that time, it’s very helpful to be able to control the timestamps that Marten is assigning to create predictable automated tests. As of Marten 7.5, Marten uses the newer .NET TimeProvider behind the scenes, and you can replace it in testing like so:

[Fact]
public async Task test_async_aggregation_with_wait_for_and_fake_time_provider()
{
    // Hang on to this for later!!!
    var eventsTimeProvider = new FakeTimeProvider();

    // In your tests, you would most likely use the IHost for your
    // application as it is normally built
    using var host = await Host.CreateDefaultBuilder()
        .ConfigureServices(services =>
        {
            services.AddMarten(opts =>
                {
                    opts.Connection(
                        "Host=localhost;Port=5432;Database=marten_testing;Username=postgres;password=postgres;Command Timeout=5");
                    opts.DatabaseSchemaName = "incidents";

                    // Notice that the "snapshot" is running asynchronously
                    opts.Projections.Snapshot<Invoice>(SnapshotLifecycle.Async);

                    opts.Events.TimeProvider = eventsTimeProvider;
                })

                // Using Solo in tests will help it start up a little quicker
                .AddAsyncDaemon(DaemonMode.Solo);
        }).StartAsync();

    var store = host.Services.GetRequiredService<IDocumentStore>();

    var invoiceId = Guid.NewGuid();

    // Pump in events
    using (var session = store.LightweightSession())
    {
        session.Events.StartStream<Invoice>(invoiceId, new InvoiceCreated("Blue Shoes", 112.24m));
        await session.SaveChangesAsync();

        session.Events.Append(invoiceId, new InvoiceApproved());
        session.Events.Append(invoiceId, new InvoicePaid());
        await session.SaveChangesAsync();
    }

    // Now, this is going to pause here in this thread until the async daemon
    // running in our IHost is completely caught up to at least the point of the
    // last event captured at the point this method was called
    await store.WaitForNonStaleProjectionDataAsync(5.Seconds());

    // NOW, we should expect reliable results by just loading the
    // documents that the async daemon has already built and persisted
    await using var query = store.QuerySession();

    // Load the document that was "projected" from the events above
    // and persisted to the document store by the async projection
    var invoice = await query.LoadAsync<Invoice>(invoiceId);

    // Run assertions, and we'll use the faked timestamp
    // from our time provider
    invoice.Created.ShouldBe(eventsTimeProvider.Start);
}

In the sample above, I used the FakeTimeProvider from the Microsoft.Extensions.TimeProvider.Testing NuGet package.
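As an illustration of why controlling the clock matters, here is a minimal sketch of a business rule that keys off the Created timestamp. Everything in it is hypothetical: the InvoiceRules.IsOverdue rule, the 30-day payment term, and the FixedTimeProvider class, which is a tiny stand-in for FakeTimeProvider so the block has no package dependencies. It assumes .NET 8 or later, where System.TimeProvider is available.

```csharp
using System;

// Hypothetical stand-in for FakeTimeProvider: a clock that only
// moves when the test tells it to
public sealed class FixedTimeProvider : TimeProvider
{
    private DateTimeOffset _now;

    public FixedTimeProvider(DateTimeOffset start) => _now = start;

    public override DateTimeOffset GetUtcNow() => _now;

    public void Advance(TimeSpan delta) => _now += delta;
}

public static class InvoiceRules
{
    // Hypothetical payment term, chosen only for this example
    private static readonly TimeSpan PaymentTerm = TimeSpan.FromDays(30);

    // Hypothetical rule: an invoice is overdue once more than
    // 30 days have passed since it was created
    public static bool IsOverdue(DateTimeOffset created, TimeProvider clock)
        => clock.GetUtcNow() - created > PaymentTerm;
}
```

With a controllable clock, a test can capture the creation time, advance the clock by 31 days, and assert that IsOverdue flips from false to true without any real waiting. FakeTimeProvider exposes a similar Advance(TimeSpan) method, so the same pattern should carry over to the provider handed to Marten.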

Summary

We take testability and automated testing very seriously throughout the entire “Critter Stack.” The testing of asynchronous projections has long been a soft spot that we hope is improved by the new capabilities in this post. As always, feel free to pop into the Critter Stack Discord for any questions.
