Offline Event Processing in Marten with the new “Async Daemon”

The feature I’m talking about here was very difficult to write, is brand new, and is definitely in need of some serious user testing from anyone interested in kicking the tires on it. We’re getting a lot of interest in the Marten Gitter room in the kinds of use cases that the async daemon described below is meant to address. This was also the very last feature on Marten’s “must have for 1.0” list, so there’s a new 1.0-alpha NuGet for Marten. 1.0 is still at least a couple of months away, but it’s getting closer.

A couple of weeks ago I pulled the trigger on a new, but long-planned, feature in Marten that we’ve been calling the “async daemon.” It allows users to build and update projected views against the event store data in a background process hosted in your application or in an external service.

To put this in context, let’s say that you are building an application to track the status of GitHub repositories, using event sourcing for persistence. In this application, you would record events for things like:

  • Project started
  • A commit pushed into the main branch
  • Issue opened
  • Issue closed
  • Issue re-opened

There’s a lot of value in recording the raw event data, but you still frequently need a rolled-up view of each project that can tell you the total number of open and closed issues, how many lines of code are in the project, and how many unique contributors are involved.

To do that rollup, you can build a new document type called ActiveProject just to present that information. Optionally, you can use Marten’s built-in support for aggregating a stream into a view by adding Apply([Event Type]) methods that consume events. In my end-to-end tests for the async daemon, I used this version of ActiveProject (the raw code is on GitHub if the formatting is cut off for you):

    public class ActiveProject
    {
        public ActiveProject()
        {
        }

        public ActiveProject(string organizationName, string projectName)
        {
            ProjectName = projectName;
            OrganizationName = organizationName;
        }

        public Guid Id { get; set; }
        public string ProjectName { get; set; }

        public string OrganizationName { get; set; }

        public int LinesOfCode { get; set; }

        public int OpenIssueCount { get; set; }

        private readonly IList<string> _contributors = new List<string>();

        public string[] Contributors
        {
            get { return _contributors.OrderBy(x => x).ToArray(); }
            set
            {
                _contributors.Clear();
                _contributors.AddRange(value);
            }
        }

        // Each Apply() method below tells Marten how to fold one event type into this aggregate
        public void Apply(ProjectStarted started)
        {
            ProjectName = started.Name;
            OrganizationName = started.Organization;
        }

        public void Apply(IssueCreated created)
        {
            OpenIssueCount++;
        }

        public void Apply(IssueReopened reopened)
        {
            OpenIssueCount++;
        }

        public void Apply(IssueClosed closed)
        {
            OpenIssueCount--;
        }

        public void Apply(Commit commit)
        {
            // Fill() is an extension method (from Marten's Baseline dependency) that
            // adds the contributor only if they aren't already in the list
            _contributors.Fill(commit.UserName);
            LinesOfCode += (commit.Additions - commit.Deletions);
        }
    }
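
For reference, the event types consumed by those Apply() methods could be as simple as the classes below. This is a minimal sketch inferred from the aggregate above; the actual event classes in the Marten test suite may carry additional data:

    public class ProjectStarted
    {
        public string Name { get; set; }
        public string Organization { get; set; }
    }

    // The aggregate above only counts these issue events, so no extra
    // properties are strictly required for this example
    public class IssueCreated { }

    public class IssueReopened { }

    public class IssueClosed { }

    public class Commit
    {
        public string UserName { get; set; }
        public int Additions { get; set; }
        public int Deletions { get; set; }
    }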

Now, you can update projected views in Marten at the time of event capture with what we call “inline projections,” or you can build the aggregated view on demand from the underlying event data. Both of those solutions are appropriate in some cases, but if our GitHub projects are very active, with a fair amount of concurrent writes to any given project stream, we’d probably be much better off moving the aggregation updates to a background process.
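
For comparison, registering the same aggregation as an inline projection, or skipping the stored view entirely and aggregating on demand, looks roughly like this. This is only a sketch of the 1.0-era configuration API; the connection string and streamId are placeholders:

    // Option 1: keep ActiveProject documents up to date "inline", in the
    // same transaction that captures the events
    var store = DocumentStore.For(_ =>
    {
        _.Connection("<your postgresql connection string>");
        _.Events.InlineProjections.AggregateStreamsWith<ActiveProject>();
    });

    // Option 2: skip the stored view and aggregate on demand from the raw events
    var streamId = Guid.NewGuid(); // stand-in for the id of an existing project stream
    using (var session = store.OpenSession())
    {
        var project = session.Events.AggregateStream<ActiveProject>(streamId);
    }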

That’s where the async daemon comes into play. If you have a Marten document store, you can start up a new instance of the async daemon like so (the underlying code shown below is on GitHub):

    [Fact]
    public async Task build_continuously_as_events_flow_in()
    {
        // In the test here, I'm just adding an aggregation for ActiveProject
        StoreOptions(_ =>
        {
            _.Events.AsyncProjections.AggregateStreamsWith<ActiveProject>();
        });

        using (var daemon = theStore.BuildProjectionDaemon(logger: _logger, settings: new DaemonSettings
        {
            LeadingEdgeBuffer = 1.Seconds()
        }))
        {
            // Start all of the configured async projections
            daemon.StartAll();

            // This just publishes event data
            await _fixture.PublishAllProjectEventsAsync(theStore);

            // Runs all projections until there are no more events coming in
            await daemon.WaitForNonStaleResults().ConfigureAwait(false);

            await daemon.StopAll().ConfigureAwait(false);
        }

        // Compare the actual data in the ActiveProject documents with
        // the expectation
        _fixture.CompareActiveProjects(theStore);
    }

In the code sample above, I’m starting an async daemon to run the ActiveProject aggregation and then publishing a series of events through the event store. The async daemon continuously detects newly available events and applies them to the correct ActiveProject document. This is the only place in Marten where we rely on eventual consistency to allow for faster writes, but it’s clearly warranted in some cases.

Rebuilding a Projection From Existing Data

If you’re going to use event sourcing with read-side projections (the “Q” in your CQRS architecture), you’re probably going to need a way to rebuild projected views from the existing data in order to fix bugs or add new data. You’ll also likely introduce new projected views after the initial rollout to production. And you’ll absolutely need to rebuild projected view data in development as you iterate on your system.

To that end, you can also use the async daemon to completely tear down and rebuild the population of a projected document view from the existing event store data.

    // This is just some test setup to establish the DocumentStore
    StoreOptions(_ => { _.Events.AsyncProjections.AggregateStreamsWith<ActiveProject>(); });

    // Publishing some pre-canned event data
    _fixture.PublishAllProjectEvents(theStore);

    using (var daemon = theStore.BuildProjectionDaemon(logger: _logger, settings: new DaemonSettings
    {
        LeadingEdgeBuffer = 0.Seconds()
    }))
    {
        await daemon.Rebuild<ActiveProject>().ConfigureAwait(false);
    }

Taken from the tests for the async daemon on GitHub.

Other Functionality Possibilities

The async daemon can be described as just a mechanism to accurately and reliably push events, in order, through the IProjection interface shown below:

    public interface IProjection
    {
        Type[] Consumes { get; }
        Type Produces { get; }

        AsyncOptions AsyncOptions { get; }
        void Apply(IDocumentSession session, EventStream[] streams);
        Task ApplyAsync(IDocumentSession session, EventStream[] streams, CancellationToken token);
    }

Today, the only built-in projections in Marten are one-for-one transformations of a single event type to a view document and the aggregation-by-stream use case shown in the ActiveProject example above. However, there’s nothing preventing you from creating your own custom IProjection classes to:

  • Aggregate views across streams grouped by some kind of classification like region, country, or person (see the sketch after this list)
  • Project event data into flat relational tables for more efficient reporting
  • Do complex event processing
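
As a rough illustration of that first idea, here’s a sketch of a custom projection that rolls commit activity up per contributor. Treat it as pseudocode against the interface above: the ContributorActivity document is hypothetical, and I’m assuming that each EventStream exposes its persisted events through an Events collection with the event body on a Data property, and that AsyncOptions has a usable default constructor.

    // Hypothetical read-side document keyed by the contributor's user name
    public class ContributorActivity
    {
        public string Id { get; set; }          // the contributor's user name
        public int TotalCommits { get; set; }
        public int NetLinesOfCode { get; set; }
    }

    public class ContributorActivityProjection : IProjection
    {
        public Type[] Consumes => new[] { typeof(Commit) };
        public Type Produces => typeof(ContributorActivity);

        // Assumes AsyncOptions can be used with its defaults
        public AsyncOptions AsyncOptions { get; } = new AsyncOptions();

        public void Apply(IDocumentSession session, EventStream[] streams)
        {
            // Fold every Commit event in this page of streams into a
            // per-contributor rollup document
            foreach (var stream in streams)
            {
                foreach (var @event in stream.Events)
                {
                    var commit = @event.Data as Commit;
                    if (commit == null) continue;

                    var activity = session.Load<ContributorActivity>(commit.UserName)
                                   ?? new ContributorActivity { Id = commit.UserName };

                    activity.TotalCommits++;
                    activity.NetLinesOfCode += commit.Additions - commit.Deletions;

                    session.Store(activity);
                }
            }
        }

        public Task ApplyAsync(IDocumentSession session, EventStream[] streams, CancellationToken token)
        {
            Apply(session, streams);
            return Task.CompletedTask;
        }
    }

A custom projection like this would then be registered through the same async projection configuration used for ActiveProject earlier, so the daemon can run it alongside the built-in aggregations.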

What’s Next for the Async Daemon

The async daemon is the only major thing missing from the Marten documentation, and I need to fill that in soon. This blog post is just a down payment on the async daemon docs.

I cut a lot of the content about how the async daemon works out of this post. Since I think this was one of the hardest things I’ve ever coded, I’d like to write a post next week just about designing and building the async daemon and see if I can trick some folks into effectively doing a code review on it. ;)

This was my first use of the TPL Dataflow library, and I was very pleasantly surprised by how much I liked working with it. If I’m ambitious enough, I’ll write a post later on building producer/consumer queues and using back pressure with the dataflow classes.
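
For anyone curious in the meantime, the basic shape of a bounded producer/consumer queue with TPL Dataflow looks roughly like this. It’s a generic sketch, not the daemon’s actual internals:

    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class DataflowSketch
    {
        public static async Task Run()
        {
            // A consumer that processes one message at a time and holds at most
            // 100 queued messages; the bounded capacity is what creates back pressure
            var consumer = new ActionBlock<string>(
                async message =>
                {
                    await Task.Delay(50);            // simulate real work
                    Console.WriteLine(message);
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 100
                });

            // The producer: SendAsync() won't complete until the block has room,
            // so a slow consumer naturally throttles a fast producer
            for (var i = 0; i < 1000; i++)
            {
                await consumer.SendAsync("event #" + i);
            }

            consumer.Complete();
            await consumer.Completion;
        }
    }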
