When I was at Codemash this year, Matthew Groves was kind enough to let me do a podcast with him on Marten for the Cross Cutting Concerns podcast. Check it out.
Marten 1.3 is Out: Bugfixes, Usability Improvements, and a lot less Memory Usage
I just uploaded Marten 1.3.0 to Nuget (but note that Nuget has had issues today with the index updating being delayed). This release is mostly bugfixes, but there’s some new functionality, and significant improvements to performance on document updates and bulk inserts. You can see the entire list of changes here with some highlights below.
I’d like to thank Marten contributors Eric Green, James Hopper, Michał Gajek, Barry Hagan, and Babu Annamalai for their contributions in this release. A special thanks goes out to Szymon Kulec for all his efforts in both Marten and Npgsql to reduce Marten’s memory allocations.
Thanks to Phillip Haydon, there’s a slew of new documentation on our website about Postgresql for Sql Server folks.
What’s New?
It wasn’t a huge release for new features, but these were added:
- New “AsPagedList()” helper for fetching documents by page
- Query for deleted, not deleted, or all documents marked as “soft deleted”
- Indexes on Marten’s metadata columns
- Querying by the document metadata
What’s Next?
The next release is going to be Marten 2.0 because we need to make a handful of breaking API changes (don’t worry, it’s very unlikely that most users would hit this). The big ticket item is a lot more work to reduce memory allocations throughout Marten. The other, not-in-the-slightest-bit-sexy change is to standardize and streamline Marten’s facilities for database change tracking with the hope that this work will make it far easier to start adding new features again.
Marten 1.2 — Improved Linq support and way more polish
Marten is a library for .Net that turns Postgresql into a document database and event store.
I just published the Marten 1.2 release to Nuget. While I hoped to fit a lot more new functionality into this release, 1.2 really just adds a lot more polish to Marten by fixing several bugs, making some performance improvements based on my company’s trial by fire usage of Marten during our peak “season”, and largely reworking the internals of the Linq support.
Marten continues to have a vibrant community of interested folks and contributors that are helping push the project on. Probably missing some names, but I’d like to call out James Hopper, jokokko, Barry Hagan, Alexander Langer, and Robin van der Knaap for their contributions to this release. I’d also like to thank all of you who have opened and commented on Github issues to help improve Marten. If this all keeps up long enough, I may finally stop being so cynical about OSS on the .Net platform;)
Here’s the entire list of changes from the GitHub milestone. The highlights of the 1.2 release are:
- Support for the SelectMany() operator in Linq queries (this story spurred an absurd amount of rework in our Linq support that I think will make it easier to add more features in subsequent releases)
- Distinct() Linq query support
- Named parameter usage in user supplied queries
- Better logging and exception messages
- Marten’s sequential Guid algorithm was corrected to order consistently with Postgresql. This should result in better write performance in Marten usage with Guid id’s.
- Marten tries harder to warn you when you use unsupported Linq operators
- Several improvements to querying against child collections
- The ability to use event metadata in the built in aggregation projections
- Cleaned up some of the database connection mechanics to stop mixing blocking and async calls, and made Marten much more aggressive about closing database connections
What’s Next?
I’m not 100% sure I want to commit to another new release before the holiday season, but 1.3 is looking like it’s going to be a lot of improvements for querying against multiple documents, new types of Select() transformations, and working over the internals to optimize performance.
The tentative list of 1.3 enhancements can be seen here.
Marten 1.1 Release Notes
Marten 1.1 was released just now (as in, hold your horses until Nuget gets done indexing it) with an assortment of bug fixes, performance & reliability improvements, and a couple of new convenience methods. As our teams have used Marten more at work, we’ve also had to make some adjustments for running Marten under reduced Postgresql security privileges and with the “AutoCreateSchemaObjects == None” mode. Finally, we had to add a couple new public members to existing API’s, so SemVer rules mean this had to be a minor point bump.
So what’s new or different? You can find the entire 1.1 issue and pull request list in GitHub. The highlights are described below:
Distinct() Support in Linq
From a pull request by John Campion, Marten now supports the Linq Distinct() operator:
public void use_distinct(IQuerySession session)
{
    var surnames = session
        .Query<User>()
        .Select(x => x.LastName)
        .Distinct();
}
Better Connection and Transaction Hygiene
I’m a little embarrassed by this one, but at least we got it before it did too much harm. Marten had been too aggressive in starting transactions in sessions which has had the effect of making Npgsql send extraneous ROLLBACK; messages to Postgresql to close out the empty transactions. In some failure cases, our team at work was seeing this cause a connection to hang. We made two fixes for this behavior:
First off, if IDocumentSession.SaveChanges() or SaveChangesAsync() is called when there are no outstanding changes queued up, Marten does absolutely nothing. No connection opened, no transaction started, just nothing.
Secondly, Marten now starts transactions lazily within an IDocumentSession. So instead of starting a transaction on the first time a session opens a connection to Postgresql, it defers that until SaveChanges() or SaveChangesAsync() is called.
public void lazy_tx(IDocumentSession session)
{
    // Executing this query will *not* start
    // a new transaction
    var users = session
        .Query<User>()
        .Where(x => x.Internal)
        .ToList();

    session.Store(new User {UserName = "lebron"});

    // This starts a transaction against the open
    // connection before doing any writes
    session.SaveChanges();
}
Data Migration Improvements
From our work on moving document storage from RavenDb to Marten (and other users too), we’ve bumped into a little bit of friction in Marten. The bulk inserts in either of the non-default modes left out the last modified data. That impacts either of these options:
public void bulk_inserts(IDocumentStore store, Target[] documents)
{
    store.BulkInsert(documents, BulkInsertMode.IgnoreDuplicates);

    // or

    store.BulkInsert(documents, BulkInsertMode.OverwriteExisting);
}
To make it easier to migrate data in documents that use a Hilo sequence for identity assignment, we added a convenience method to establish a new “floor” in the sequence to avoid conflicting with the existing data being brought over from a new system.
public void reset_hilo(IDocumentStore store)
{
    // This resets the Hilo state in the database
    // for the IntDoc document type so that
    // all id's assigned will be greater than the floor
    // value.
    store.Advanced.ResetHiloSequenceFloor<IntDoc>(3000);
}
Do note that it’s possible and even likely that there will be gaps in the id sequence in the database when you do this.
Building Marten’s Async Daemon
A couple weeks ago I wrote a blog post on the new “Async Daemon” feature in Marten. This post is a bit that I cut out of that post just describing the challenges I faced and what I did to slide around the problems. For all Marten users that have been asking me about writing their own subsystem to read and process events offline, you really want to read this post to understand why that’s much harder than you’d think and why you do probably want to just help make the async daemon solid.
The first challenge for the async daemon was “knowing” when there are new events that need to be processed by async projections. When a projection runs, it needs to process the events in the same order that they were captured in. Since the async daemon was inevitably going to use some sort of polling (NOTIFY/LISTEN in Postgresql was not adequate by itself) to read events out of the event table, we needed a very efficient way to be able to page the event fetching without missing events.
We started Marten with the thought that we would try to accomplish that by having the event store enqueue the events in a rolling buffer table that some kind of offline process would poll and read, but we were talked out of that approach in discussions with a Postgresql consultant who was helping us at work. Moreover, as I worked through other use cases to rebuild projections from scratch or add new projections later, we realized that the rolling buffer table would never have worked for the async daemon.
We also experimented with using sequential Guid’s as the global identifier for events in the event store with the idea that we would be able to use that to key off of for the projections by always querying for “Id > [last event id encountered].” In my testing I was unable to get the sequential Guid algorithm to accurately order the event id’s, especially under a heavy parallel load.
In the end, we opted to make the event store table in Marten use a sequential long integer as its primary key, and backed that with a database SEQUENCE. That gave us a more reliable way to “know” what events were new for each individual projection. In testing I figured out pretty quickly that the async daemon was missing events when there’s a lot of concurrent events streaming in because of event sequence id’s being reserved from in flight transactions. To counteract that problem, I ended up taking a two step process:
- Limit the async daemon to only querying against events that were captured before some kind of time threshold (3 seconds is the default) to avoid missing events that are still in flight
- When the async daemon fetches a new page of events, it checks that there are no gaps in the event sequence. If there are gaps, it pauses a little bit and tries again until either there are no gaps in the sequence or the subsequent fetch turns up the exact same data (leading the async daemon to believe that the missing events were rejected).
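The gap check in the second step can be sketched in a few lines. This is only an illustration of the idea and not the code inside Marten: the EventSequence class, FindGaps(), and IsReady() are hypothetical names, and the sketch assumes each page arrives as an ascending list of sequence numbers.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of Marten
public static class EventSequence
{
    // Returns the sequence numbers that are missing between the last
    // processed event and the end of the page. Assumes the page is
    // sorted in ascending order.
    public static List<long> FindGaps(long lastProcessed, IReadOnlyList<long> page)
    {
        var gaps = new List<long>();
        var expected = lastProcessed + 1;

        foreach (var seq in page)
        {
            for (var missing = expected; missing < seq; missing++)
            {
                gaps.Add(missing);
            }

            if (seq >= expected) expected = seq + 1;
        }

        return gaps;
    }

    // A page is ready to process if it has no gaps, or if it is
    // identical to the previous attempt (the missing events are then
    // assumed to have been rejected). Pass an empty list as
    // previousAttempt on the first try.
    public static bool IsReady(long lastProcessed, IReadOnlyList<long> page,
        IReadOnlyList<long> previousAttempt)
    {
        if (FindGaps(lastProcessed, page).Count == 0) return true;

        return page.SequenceEqual(previousAttempt);
    }
}
```

A caller would fetch a page, call IsReady(), and if it returns false, wait briefly and fetch again, passing the earlier page as previousAttempt.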
Those two steps — as far as I can tell — have eliminated the problems I was seeing before about missing events in flight. It did completely ruin a family dinner at our favorite Thai restaurant when I couldn’t make myself stop thinking about how to slide around the problems in event ordering;)
The other killer problem was in trying to make the async daemon resilient in the face of potential connectivity problems and occasional projection failures without losing any results. I’ll try to blog about that in a later post.
Proposed Roadmap for Marten 1.0 and Beyond
I’m just thinking out loud here and hoping for some usable feedback.
I feel like Marten is getting very close to an official 1.0 release, and the latest Nuget is marked as 1.0-alpha. The Marten community voted on our minimum feature set for 1.0 earlier this year and we’ve finished everything on that list as of late July (right before I went on a long family vacation;)).
Some thoughts on the big 1.0:
- I’m a big believer in semantic versioning, so an OSS tool reaching 1.0 is a big deal because that starts the draconian versioning rules about backward compatibility. You want to get pretty close to a livable API before you throw that switch to 1.0.
- It’s a chicken and egg kind of conundrum. What we need right now is more users spawning yet more feedback about Marten. I’d love to have more usage before flipping Marten to 1.0, but we’ll get a lot more users after we release it as 1.0.
- In this day and age of package managers like Nuget, it’s a lot less friction to make more frequent releases and update your dependencies, so going 1.0 now knowing that 1.0.* bug fix releases and 1.* feature releases will be coming soon just isn’t that worrisome.
- I feel pretty good about the document database side of Marten, but the event store functionality is still churning and it’s less mature.
- We’re basically out of low hanging fruit kind of features on the document storage and Linq support
- My shop is doing the work right now to transition a very large web application from RavenDb to Marten. Right now I’m thinking that the first version of Marten that goes into production across all of that application will be declared to be 1.0.
All that being said, my best guess for an official Marten 1.0 release is around October 1st. Right now my biggest issues on my plate are really all around schema management and our database team’s requirements for the DDL generation. And more documentation, but that battle never ends. Plenty of pull requests are still flowing in, but I think I’m personally done with any kind of major feature work for awhile unless there’s noticeable demand from the community for specific features.
Marten 1.1 and Beyond
Based on our current issue list and requests from the Marten Gitter room, I think this list is where Marten goes next after the 1.0 release:
- Better support for child collections on documents
- Support the Linq SelectMany() operator
- Be able to do Include() against child collections
- Maybe be able to use child collections in Select() transformations
- More types of event store projections — if you’re looking to get into doing some OSS work, I think these are our most approachable stories in the backlog
- Project to a flat table for better reporting?
- Projections that use the output of other projections
- Arbitrary categorization of projected views (by customer, by region, etc.). Some of our users have already done this themselves, but it’s not in Marten itself yet
- Multi-tenancy support. My thinking right now is that we don’t directly put this into Marten, but make sure that there are adequate hooks to do this easily yourself. There’s a lot more information in the GitHub issue linked to above.
- Possibly try to support the Linq GroupBy() operator. That might also lead into some kind of map/reduce capability within Marten. We’ve had the feedback that “Marten isn’t a real document db because it doesn’t have map/reduce.” I think that’s nonsense, but we might very well need to have a better story for creating aggregated views into the document state — which may or may not be best done as some kind of formal map/reduce strategy.
- More support on document structural changes. Marten can already handle transformations of a single document type, but we’ll need to be able to later address document type names being changed, multiple document types getting combined (this is potentially a big deal for one of our systems), and whatever else we bump into next spring when we start optimizing a big system at work;)
- Being able to do document transformation with more than one document at a time. This would mean being able to use related documents in the same Select() transformation. Also, we’ll probably need to be able to use Javascript transformations across multiple document types.
There’s some other things in the GitHub issue list, but the above is what I’m thinking about right now for 1.1 and beyond.
Thoughts? Concerns? Requests? Let us know either here, the GitHub issue list, or the Marten Gitter room.
Proposed Marten Tooling for Database Management
This is an update to an earlier post on schema management using Marten.
At this point, I think the biggest challenges facing us at work for using Marten are strictly in the realm of database change management. To that end, we’re adding what will be a new package for command line tooling around Marten schema management and investigating possible usage of Sqitch to handle database migrations in our ecosystem. The command line usage shown in this post is in Marten master, but not pushed up to Nuget in any way yet. The Sqitch usage here is purely hypothetical.
When you’re using Marten, all the data definition language (DDL) for the underlying Postgresql database is generated to match by code within Marten. In development, you’d just run with the setting to auto-create database objects on the fly to match the code for faster iteration. For production deployment, however, you probably don’t get to do that and you’ll need some kind of database migration strategy to get the changes that your Marten application needs to the real database. That’s the gap that this post is trying to fill.
Command Line Tooling
My concept for supporting command line tooling suitable for build automation at this point is to publish a new library package called Marten.CommandLine that you can use to expose your own application and database through the command line. To use this tooling, follow these steps:
- Create a new console application in your solution
- Add the forthcoming Marten.CommandLine nuget
- Add a reference to the projects in your system that would express the configuration for your Marten-enabled application
- In the “Main()” entry point of your new console application, add code like this below to build up your Marten configuration via the StoreOptions class and then delegate to Marten to parse the command line arguments and execute the proper command:
public class Program
{
    public static int Main(string[] args)
    {
        var options = buildStoreOptions();
        return MartenCommands.Execute(options, args);
    }

    private static StoreOptions buildStoreOptions()
    {
        // build your own StoreOptions that
        // establishes the configuration of your
        // Marten application
        throw new System.NotImplementedException();
    }
}
You can see an example of building the console application from the SampleConsoleApp project I used in the Marten codebase to test this functionality.
Once you have the code above, you’re actually ready to go. If you’re using the new dotnet CLI, running “dotnet run” in the root of your console application project yields this output listing the valid commands:
------------------------------------------------------------------------------------------------------------------------------------
Available commands:
------------------------------------------------------------------------------------------------------------------------------------
apply -> Applies all outstanding changes to the database based on the current configuration
assert -> Assert that the existing database matches the current Marten configuration
dump -> Dumps the entire DDL for the configured Marten database
patch -> Evaluates the current configuration against the database and writes a patch and drop file if there are any differences
------------------------------------------------------------------------------------------------------------------------------------
If you’re not using the dotnet CLI yet, you’d just need to compile your new console application like you’ve always done and call the exe directly. If you’re familiar with the *nix style of command line interfaces ala Git, you should feel right at home with the command line usage in Marten.
For the sake of usability, let’s say that you stick a file named “marten.cmd” (or the *nix shell file equivalent) at the root of your codebase like so:
dotnet run --project src/MyConsoleApp %*
All the example above does is delegate any arguments to your console application. Once you have that file, some sample usages are shown below:
# Assert that the database matches the current configuration. This
# command will fail if there are differences
marten assert --log log.txt

# This command tries to update the database
# to reflect the application configuration
marten apply --log log.txt

# This dumps a single file named "database.sql" with
# all the DDL necessary to build the database to
# match the application configuration
marten dump database.sql

# This dumps the DDL to separate files per document
# type to a folder named "scripts"
marten dump scripts --by-type

# Create a patch file called "patch1.sql" and
# the corresponding rollback file "patch1.drop.sql" if any
# differences are found between the application configuration
# and the database
marten patch patch1.sql --drop patch1.drop.sql
In all cases, the commands expose usage help through “marten help [command].” Each of the commands also exposes a “--conn” (or “-c” if you prefer) flag to override the database connection string and a “--log” flag to record all the command output to a file.
My Current Thinking about Marten + Sqitch
Our team doing the RavenDb to Marten transition work has turned us on to using Sqitch for database migrations. From my point of view, I like this choice because Sqitch just uses script files in whatever the underlying database’s SQL dialect is. That means that Marten can use our existing “WritePatch()” schema management to tie into Sqitch’s migration scheme.
The way that I think this could work for us is first to have a Sqitch project established in our codebase with its folders for updates, rollbacks, and verify’s. In our build script that runs in our master continuous integration (CI) build, we would:
1. Call sqitch to update the CI database (or whatever database we declare to be the source of truth) with the latest known migrations
2. Call the “marten assert” command shown above to detect if there are outstanding differences between the application configuration and the database by examining the exit code from that command
3. If there are any differences detected, figure out what the next migration name would be based on our naming convention and use sqitch to start a new migration with that name
4. Run the “marten patch” command to write the update and rollback scripts to the file locations previously determined in steps 2 & 3
5. Commit the new migration file back to the underlying git repository
I’m insisting on doing this on our CI server instead of making developers do it locally because I think it’ll lead to less duplicated work and fewer problems from these migrations being created against work in progress feature branches.
For production (and staging/QA) deployments, we’d just use sqitch out of the box to bring the databases up to date.
I like this approach because it keeps the monotony of repetitive database change tracking out of our developer’s hair, while also allowing them to integrate database changes from outside of Marten objects into the database versioning.
Moving from RavenDb to Marten
EDIT 8/19: Couple other things came up about indexing yesterday that I added here.
For the purpose of this post, I’m only talking about the document database features in Marten. Our immediate need is to replace RavenDb before our busy season starts. Using the event store half of Marten probably won’t happen for us until next year.
The planets have finally aligned for us at work to begin converting our largest and most active application from RavenDb to Marten for persistence. I’m meeting with a couple of our teams this morning to talk over the transition, and this blog post is just an attempt to get my talking points prepared for them.
Moving to Marten
First off, Marten is really just a fancy data access library against the outstanding Postgresql database engine. Marten utilizes Postgresql’s JSONB type to efficiently store and query against our document data. We have deliberately based some of the most basic API usage on RavenDb where that made sense in order to make the transition to Marten easier for our teams, but Marten has deviated quite a bit in more advanced usage.
Here’s what I want our teams to know when we switch things over:
- Marten is ACID all the way down. No more WaitForNonStaleResults() nonsense, no more subtle bugs or unstable automated tests from stale data. Some folks have poked back at this in Marten by claiming that eventual consistency is necessary for performance or scalability. So far, all our experimentation suggests that Marten’s Postgresql-backed writes – with ACID – are measurably faster than RavenDb.
- Marten does not force you to declare which indexes you want to use for any given query. Postgresql itself can figure out the most efficient execution plan for itself. This is going to be advantageous for us in a couple ways. First by letting us rip a lot of RavenDb index code out. Secondly by making it much easier to optimize database performance without having to have so much impact on the code like it is today with RavenDb.
- We need more documentation and blog posts on this topic, but it is perfectly possible to use the relational database features of Postgresql where that’s still valuable.
- If it’s useful, it is possible to use Dapper in conjunction with Marten and even in the same unit of work/transaction.
- Just like RavenDb, Marten’s IDocumentSession is effectively the unit of work and should be scoped to a logical transaction. In most cases in our systems that effectively translates to an IDocumentSession per HTTP request or service bus message.
- There is no hard request throttling in Marten. You should be aware of how many network round trips you’re making during a single operation and there are diagnostics to track that, but Marten will not blow up in production because an operation happened to make too many requests.
- There’s no equivalent to RavenDb’s embedded data store option. That was the killer feature in RavenDb we’re going to miss the most. Fortunately, it’s pretty easy to spin up Postgresql on your own box. For automated testing scenarios where today we just use a brand new RavenDb data store, we’ll just take advantage of Marten’s “database cleaner” to wipe out state in between tests. In a way, this will simplify some of our testing against distributed systems. If this becomes a problem for test performance, we have a couple fallback plans to either host Postgresql in disposable Docker images or to enhance our testing harnesses to leapfrog clean schemas between tests.
- Most importantly, if there’s something in Marten you don’t like, you can either do a pull request or at least raise an issue in GitHub where I’ll see it and we can get it fixed. OSS FTW!
- We don’t use this in our internal systems (but we should), but the “Include()” feature in Marten for fetching related documents in one round trip is quite different than Raven’s.
- Batch querying in Marten is more explicit and different mechanically than RavenDb’s “Futures.” We should be using this feature to reduce network chattiness between applications and the database.
- I am highly recommending the usage of the Compiled Query feature in Marten that has no equivalent in RavenDb for better runtime performance and even as a declarative query model. This feature can be used in combination with “Include()” and batch querying to maximize the performance of your Marten backed persistence.
- You can use any tooling you want that’s compatible with Postgresql to poke and prod a Marten-ized database. I just use pgAdmin, but Datagrip or even just Visual Studio is useful.
- Marten has quite a few more useful diagnostic abilities you can use to analyze the SQL being generated or track database activity by session. In a later blog post, I’ll talk about the reusable recipe we’ve built for Marten integration into FubuMVC applications.
Why we’re getting off of RavenDb
I’ve been asked several times since we started working on Marten in public what it would take for us to change our minds and continue with RavenDb. I think it’s quite possible that Voron will make a positive difference, but as I’ll explain a little below, we just don’t trust RavenDb’s quality and software engineering practices.
So why are we wanting to move away from RavenDb?
- We’ve had multiple day+ outages due to RavenDb indexes getting corrupted and being unable to rebuild. That in a nutshell is more than enough reason to move on.
- We’ve been concerned for years with RavenDb’s internal quality. We’ve experienced a number of regression bugs when changing versions of RavenDb to the point where we’re unwilling to even try upgrading it.
- Their release and versioning strategies are not consistent with Semantic Versioning, so you never know if you’re going to get breaking changes in minor or revision level version changes
- Unresponsive support when we’ve had production issues with RavenDb
- We’ve not had a lot of success with the DevOps type tooling around RavenDb (replication, etc.) and we’re hopeful that adopting Postgresql helps out on that front.
- Resource utilization. RavenDb requires a lot of handholding to keep the memory utilization reasonable. Naive usage of RavenDb almost invariably leads to problems.
- The stale data issue as a result of RavenDb’s eventual consistency strategy has been a major source of friction for us
Building a Producer Consumer Queue with TPL Dataflow
I had never used the TPL Dataflow library until this summer and I was very pleasantly surprised at how easy and effective it was.
In my last post I introduced the new “Async Daemon” feature in Marten that allows you to continuously update projected views over the event store as new events are captured in the system. In essence, the async daemon has to do two things:
- Fetch event data from the underlying Postgresql database and put it into the form that the projections and event processors expect
- Run the event data previously fetched through each projection or event processor and commit any projected document views back to the database.
Looking at it that way, the async daemon looks like a good fit for a producer/consumer queue. In this case, the event fetching “produces” batches of events for the projection “consumer” to process downstream. The goal of this approach is to improve overall throughput by allowing the fetching and processing to happen in parallel.
I had originally assumed that I would use Reactive Extensions for the async daemon, but after way too much research and dithering back and forth on my part, I decided that the TPL Dataflow library was a better fit in this particular case.
The producer/consumer queue inside of the async daemon consists of a couple main players:
- The Fetcher class is the “producer” that continuously polls the database for the new events. It’s smart enough to pause the polling if there are no new events in the database, but otherwise it’s pretty dumb.
- An instance of the IProjection interface that does the actual work of processing events or updating projected documents from the events.
- The ProjectionTrack class acts as a logical controller to both Fetcher and IProjection
- A pair of ActionBlocks from the TPL Dataflow library: one used as the consumer queue for processing events, and a second queue for coordinating the activities within ProjectionTrack.
In the pure happy path workflow of the async daemon, it functions like this sequence diagram below:

The Fetcher object runs continuously fetching a new “page” of events and queues each page where it will be consumed by ProjectionTrack in its ExecutePage() method in a different thread.
The usage of the ActionBlock objects to connect the workflow together turned out to be pretty simple. In the following code taken from the ProjectionTrack class, I’m setting up the ActionBlock for the execution queue with a lambda to call the ExecutePage() method. One thing to notice is that I had to configure a couple options to ensure that each item enqueued to that ActionBlock is executed serially in the same order that it was received.
_executionTrack
    = new ActionBlock<EventPage>(page => ExecutePage(page, _cancellation.Token),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            EnsureOrdered = true
        });
The value of the ActionBlock class usage is that it does all the heavy lifting for me in regards to the threading. The ActionBlock will trigger the ExecutePage() method in a different thread and ensure that every page is executed sequentially.
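To see that ordering behavior in isolation, here is a minimal, self-contained sketch of the same ActionBlock configuration outside of Marten. The SerialQueueSketch class and its ProcessInOrder() method are made up for illustration, plain integers stand in for EventPage, and depending on your target framework you may need the System.Threading.Tasks.Dataflow Nuget package.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical example, not taken from the Marten codebase
public static class SerialQueueSketch
{
    public static async Task<List<int>> ProcessInOrder(IEnumerable<int> pages)
    {
        var processed = new List<int>();

        // One item at a time, in the order the items were posted
        var block = new ActionBlock<int>(async page =>
        {
            // Stand-in for the real work done per page
            await Task.Delay(1).ConfigureAwait(false);
            processed.Add(page);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            EnsureOrdered = true
        });

        // The "producer" side just posts to the block
        foreach (var page in pages)
        {
            block.Post(page);
        }

        // Signal that no more items are coming and wait for the
        // consumer to drain the queue
        block.Complete();
        await block.Completion.ConfigureAwait(false);

        return processed;
    }
}
```

Because only one page is processed at a time, the plain List<int> is safe to use here without any locking.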
Incorporating Backpressure
I also wanted to incorporate the idea of “back pressure” so that if the event fetching producer is getting too far ahead of the event processing consumer, the async daemon would stop fetching new events to prevent spikes in memory usage and possibly reserve more system resources for the consumer until the consumer could catch up.
To do that, there’s a little bit of logic in ProjectionTrack that checks how many events are queued up in the execution track shown above and pauses the Fetcher if the configured threshold is exceeded:
public async Task CachePage(EventPage page)
{
    // Accumulator is just a little helper used to
    // track how many events are in flight
    Accumulator.Store(page);

    // If the consumer is backed up, stop fetching
    if (Accumulator.CachedEventCount > _projection.AsyncOptions.MaximumStagedEventCount)
    {
        _logger.ProjectionBackedUp(this, Accumulator.CachedEventCount, page);
        await _fetcher.Pause().ConfigureAwait(false);
    }

    _executionTrack?.Post(page);
}
When the consumer works through enough of the staged events, ProjectionTrack knows to restart the Fetcher to begin producing new pages of events:
// This method is called after every EventPage is successfully
// executed
public Task StoreProgress(Type viewType, EventPage page)
{
    Accumulator.Prune(page.To);

    if (shouldRestartFetcher())
    {
        _fetcher.Start(this, Lifecycle);
    }

    return Task.CompletedTask;
}
The actual “cooldown” logic inside of ProjectionTrack is implemented in this method:
private bool shouldRestartFetcher()
{
    if (_fetcher.State == FetcherState.Active) return false;

    if (Lifecycle == DaemonLifecycle.StopAtEndOfEventData && _atEndOfEventLog) return false;

    if (Accumulator.CachedEventCount <= _projection.AsyncOptions.CooldownStagedEventCount &&
        _fetcher.State == FetcherState.Paused)
    {
        return true;
    }

    return false;
}
To make this more concrete, by default Marten will pause a Fetcher once the consuming queue holds more than 1,000 events, and won’t restart the Fetcher until the queue drops back to 500 or fewer. Both thresholds are configurable.
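The pause and cooldown thresholds work as a pair of high and low water marks. Here is a rough, standalone sketch of that idea, using the default numbers quoted above. The BackpressureGate class is hypothetical and is not Marten’s Accumulator or ProjectionTrack; it only mirrors the two comparisons shown in CachePage() and shouldRestartFetcher().

```csharp
using System;

// Hypothetical helper, not part of Marten: tracks how many events are
// staged and decides when a producer should pause and resume.
public class BackpressureGate
{
    private readonly int _maximum;
    private readonly int _cooldown;

    public BackpressureGate(int maximum, int cooldown)
    {
        if (cooldown >= maximum)
        {
            throw new ArgumentException("cooldown must be lower than maximum");
        }

        _maximum = maximum;
        _cooldown = cooldown;
    }

    public int StagedCount { get; private set; }
    public bool IsPaused { get; private set; }

    // Producer side: a page of events was queued for the consumer
    public void Stored(int eventCount)
    {
        StagedCount += eventCount;
        if (StagedCount > _maximum) IsPaused = true;
    }

    // Consumer side: a page of events was fully processed
    public void Pruned(int eventCount)
    {
        StagedCount -= eventCount;
        if (IsPaused && StagedCount <= _cooldown) IsPaused = false;
    }
}

public static class BackpressureDemo
{
    public static void Main()
    {
        var gate = new BackpressureGate(maximum: 1000, cooldown: 500);

        gate.Stored(600);
        gate.Stored(600);                 // 1,200 staged, over the maximum
        Console.WriteLine(gate.IsPaused); // prints "True"

        gate.Pruned(600);                 // 600 staged, still above the cooldown mark
        Console.WriteLine(gate.IsPaused); // prints "True"

        gate.Pruned(600);                 // 0 staged, at or below the cooldown mark
        Console.WriteLine(gate.IsPaused); // prints "False"
    }
}
```

Keeping a gap between the two thresholds stops the producer from flapping between paused and active every time a single page is consumed.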
As I said in my last post, I thought that the async daemon overall was very challenging, but I felt that the usage of TPL Dataflow went very smoothly.
Doing it the Old Way with BlockingCollection
In the past, I’ve used the BlockingCollection to build producer/consumer queues in .Net. In the Storyteller project, I used producer/consumer queues to parallelize executing batches of specifications by dividing the work into stages that each do some kind of work on a “SpecExecutionRequest” object (read in the specification file, do some preparation work to build a “plan”, and finally execute the specification). At the heart of that is the ConsumingQueue class that allows you to queue up tasks for one of these SpecExecutionRequest stages:
public class ConsumingQueue : IDisposable, IConsumingQueue
{
    private readonly BlockingCollection<SpecExecutionRequest> _collection =
        new BlockingCollection<SpecExecutionRequest>(new ConcurrentBag<SpecExecutionRequest>());

    private Task _readingTask;
    private readonly Action<SpecExecutionRequest> _handler;

    public ConsumingQueue(Action<SpecExecutionRequest> handler)
    {
        _handler = handler;
    }

    public void Dispose()
    {
        _collection.CompleteAdding();
        _collection.Dispose();
    }

    // This does not block the caller
    public void Enqueue(SpecExecutionRequest plan)
    {
        _collection.Add(plan);
    }

    private void runSpecs()
    {
        // This loop runs continuously and calls _handler() for
        // each plan added to the queue in the method above
        foreach (var request in _collection.GetConsumingEnumerable())
        {
            if (request.IsCancelled) continue;

            _handler(request);
        }
    }

    public void Start()
    {
        _readingTask = Task.Factory.StartNew(runSpecs);
    }
}
For more context, you can see how these ConsumingQueue objects are assembled and used in the SpecificationEngine class in the Storyteller codebase.
After doing it both ways, I think I prefer the TPL Dataflow approach over the older BlockingCollection mechanism.
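For comparison, here is roughly what a similar queue could look like on top of ActionBlock. This is only a sketch: DataflowConsumingQueue and its Drain() method are hypothetical and do not exist in Storyteller, and I’ve made the class generic so that it doesn’t depend on SpecExecutionRequest.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical rewrite for comparison only; this class is not in Storyteller
public class DataflowConsumingQueue<T> : IDisposable
{
    private readonly ActionBlock<T> _block;

    public DataflowConsumingQueue(Action<T> handler)
    {
        // One item is handled at a time, in the order it was enqueued
        _block = new ActionBlock<T>(handler, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1
        });
    }

    // This does not block the caller
    public void Enqueue(T item) => _block.Post(item);

    // Stops accepting new items and completes once the queue is empty
    public Task Drain()
    {
        _block.Complete();
        return _block.Completion;
    }

    public void Dispose() => _block.Complete();
}

public static class QueueDemo
{
    public static async Task<List<string>> Run()
    {
        var handled = new List<string>();

        using (var queue = new DataflowConsumingQueue<string>(spec => handled.Add(spec)))
        {
            queue.Enqueue("spec-1");
            queue.Enqueue("spec-2");
            queue.Enqueue("spec-3");

            await queue.Drain().ConfigureAwait(false);
        }

        return handled;
    }

    public static async Task Main()
    {
        var handled = await Run();
        Console.WriteLine(string.Join(", ", handled)); // prints "spec-1, spec-2, spec-3"
    }
}
```

There is no Start() method and no reading task or consuming loop to manage here, which is a large part of why the Dataflow version feels simpler to me.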
Offline Event Processing in Marten with the new “Async Daemon”
The feature I’m talking about here was very difficult to write, is brand new, and is definitely in need of some serious user testing from anyone interested in kicking the tires on it. We’re getting a lot of interest in the Marten Gitter room about doing the kinds of use cases that the async daemon described below is meant to address. This was also the very last feature on Marten’s “must have for 1.0” list, so there’s a new 1.0-alpha nuget for Marten. 1.0 is still at least a couple months away, but it’s getting closer.
A couple weeks ago I pulled the trigger on a new, but long planned, feature in Marten we’ve been calling the “async daemon” that allows users to build and update projected views against the event store data in a background process hosted in your application or an external service.
To put this in context, let’s say that you are building an application to track the status of GitHub repositories, with event sourcing for the persistence. In this application, you would record events for things like:
- Project started
- A commit pushed into the main branch
- Issue opened
- Issue closed
- Issue re-opened
There’s a lot of value to be had by recording the raw event data, but you still need to frequently see a rolled up view of each project that can tell you the total number of open issues, closed issues, how many lines of code are in the project, and how many unique contributors are involved.
To do that rollup, you can build a new document type called ActiveProject just to present that information. Optionally, you can use Marten’s built-in support for making aggregated projections across a stream by adding Apply([Event Type]) methods to consume events. In my end to end tests for the async daemon, I used this version of ActiveProject (the raw code is on GitHub if the formatting is cut off for you):
public class ActiveProject
{
    public ActiveProject()
    {
    }

    public ActiveProject(string organizationName, string projectName)
    {
        ProjectName = projectName;
        OrganizationName = organizationName;
    }

    public Guid Id { get; set; }
    public string ProjectName { get; set; }
    public string OrganizationName { get; set; }
    public int LinesOfCode { get; set; }
    public int OpenIssueCount { get; set; }

    private readonly IList<string> _contributors = new List<string>();

    public string[] Contributors
    {
        get { return _contributors.OrderBy(x => x).ToArray(); }
        set
        {
            _contributors.Clear();
            _contributors.AddRange(value);
        }
    }

    public void Apply(ProjectStarted started)
    {
        ProjectName = started.Name;
        OrganizationName = started.Organization;
    }

    public void Apply(IssueCreated created)
    {
        OpenIssueCount++;
    }

    public void Apply(IssueReopened reopened)
    {
        OpenIssueCount++;
    }

    public void Apply(IssueClosed closed)
    {
        OpenIssueCount--;
    }

    public void Apply(Commit commit)
    {
        _contributors.Fill(commit.UserName);
        LinesOfCode += (commit.Additions - commit.Deletions);
    }
}
Now, you can update projected views in Marten at the time of event capture with what we call “inline projections.” You could also build the aggregated view on demand from the underlying event data. Both of those solutions can be appropriate in some cases, but if our GitHub projects are very active with a fair amount of concurrent writes to any given project stream, we’d probably be much better off moving the aggregation updates to a background process.
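To make the “on demand” option a little more concrete, the idea is simply to replay a stream’s events, in order, through the Apply() methods of a fresh view object. The sketch below is plain C# and does not use Marten at all: the empty event classes, IssueCounter, and LiveAggregation are simplified, hypothetical stand-ins, and the switch statement is just one way to dispatch events, not how Marten does it internally.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-ins for the issue events (assumed shapes, no data)
public class IssueCreated { }
public class IssueClosed { }
public class IssueReopened { }

// A cut-down view that only tracks the open issue count
public class IssueCounter
{
    public int OpenIssueCount { get; private set; }

    public void Apply(IssueCreated created) => OpenIssueCount++;
    public void Apply(IssueReopened reopened) => OpenIssueCount++;
    public void Apply(IssueClosed closed) => OpenIssueCount--;
}

// Hypothetical helper, not a Marten API
public static class LiveAggregation
{
    // Builds the view from scratch by replaying every event in order
    public static IssueCounter Build(IEnumerable<object> events)
    {
        var view = new IssueCounter();

        foreach (var @event in events)
        {
            switch (@event)
            {
                case IssueCreated created:
                    view.Apply(created);
                    break;
                case IssueReopened reopened:
                    view.Apply(reopened);
                    break;
                case IssueClosed closed:
                    view.Apply(closed);
                    break;
                // Event types the view doesn't care about are skipped
            }
        }

        return view;
    }

    public static void Main()
    {
        var events = new object[]
        {
            new IssueCreated(), new IssueCreated(), new IssueClosed(), new IssueReopened()
        };

        Console.WriteLine(Build(events).OpenIssueCount); // prints "2"
    }
}
```

The cost of this approach grows with the length of the stream on every read, which is one more reason to consider pushing the work into the background for busy streams.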
That’s where the async daemon comes into play. If you have a Marten document store, you can start up a new instance of the async daemon like so (the underlying code shown below is in GitHub):
[Fact]
public async Task build_continuously_as_events_flow_in()
{
    // In the test here, I'm just adding an aggregation for ActiveProject
    StoreOptions(_ =>
    {
        _.Events.AsyncProjections.AggregateStreamsWith<ActiveProject>();
    });

    using (var daemon = theStore.BuildProjectionDaemon(logger: _logger, settings: new DaemonSettings
    {
        LeadingEdgeBuffer = 1.Seconds()
    }))
    {
        // Start all of the configured async projections
        daemon.StartAll();

        // This just publishes event data
        await _fixture.PublishAllProjectEventsAsync(theStore);

        // Runs all projections until there are no more events coming in
        await daemon.WaitForNonStaleResults().ConfigureAwait(false);

        await daemon.StopAll().ConfigureAwait(false);
    }

    // Compare the actual data in the ActiveProject documents with
    // the expectation
    _fixture.CompareActiveProjects(theStore);
}
In the code sample above I’m starting an async daemon to run the ActiveProject projection updating, and running a series of events through the event store. The async daemon is continuously detecting newly available events and applying those to the correct ActiveProject document. This is the only place in Marten where we utilize the idea of eventual consistency to allow for faster writes, but it’s clearly warranted in some cases.
Rebuilding a Projection From Existing Data
If you’re going to use event sourcing with read side projections (the “Q” in your CQRS architecture), you’re probably going to need a way to rebuild projected views from the existing data to fix bugs or add new data. You’ll also likely introduce new projected views after the initial rollout to production. You’ll absolutely need to rebuild projected view data in development as you’re iterating on your system.
To that end, you can also use the async daemon to completely tear down and rebuild the population of a projected document view from the existing event store data.
// This is just some test setup to establish the DocumentStore
StoreOptions(_ => { _.Events.AsyncProjections.AggregateStreamsWith<ActiveProject>(); });

// Publishing some pre-canned event data
_fixture.PublishAllProjectEvents(theStore);

using (var daemon = theStore.BuildProjectionDaemon(logger: _logger, settings: new DaemonSettings
{
    LeadingEdgeBuffer = 0.Seconds()
}))
{
    await daemon.Rebuild<ActiveProject>().ConfigureAwait(false);
}
Taken from the tests for the async daemon on GitHub.
Other Functionality Possibilities
The async daemon can be described as just a mechanism to accurately and reliably execute the events in order through the IProjection interface shown below:
public interface IProjection
{
    Type[] Consumes { get; }
    Type Produces { get; }
    AsyncOptions AsyncOptions { get; }

    void Apply(IDocumentSession session, EventStream[] streams);
    Task ApplyAsync(IDocumentSession session, EventStream[] streams, CancellationToken token);
}
Today, Marten has only two kinds of built-in projections: one-for-one transformations of a certain event type to a view document, and the aggregation by stream use case shown above in the ActiveProject example. However, there’s nothing preventing you from creating your own custom IProjection classes to:
- Aggregate views across streams grouped by some kind of classification like region, country, person, etc.
- Project event data into flat relational tables for more efficient reporting
- Do complex event processing
What’s Next for the Async Daemon
The async daemon is the only major thing missing from the Marten documentation, and I need to fill that in soon. This blog post is just a down payment on the async daemon docs.
I cut out a lot of content on how the async daemon works. Since I thought this was one of the hardest things I’ve ever coded myself, I’d like to write a post next week just about designing and building the async daemon and see if I can trick some folks into effectively doing a code review on it ;)
This was my first usage of the TPL Dataflow library and I was very pleasantly surprised by how much I liked using it. If I’m ambitious enough, I’ll write a post later on building producer/consumer queues and using back pressure with the dataflow classes.