Building a Producer Consumer Queue with TPL Dataflow

I had never used the TPL Dataflow library until this summer and I was very pleasantly surprised at how easy and effective it was. 

In my last post I introduced the new “Async Daemon” feature in Marten that allows you to continuously update projected views over the event store as new events are captured in the system. In essence, the async daemon has to do two things:

  1. Fetch event data from the underlying Postgresql database and put it into the form that the projections and event processors expect
  2. Run the event data previously fetched through each projection or event processor and commit any projected document views back to the database.

Looking at it that way, the async daemon looks like a good fit for a producer/consumer queue. In this case, the event fetching “produces” batches of events for the projection “consumer” to process downstream. The goal of this approach is to improve overall throughput by allowing the fetching and processing to happen in parallel.

I had originally assumed that I would use Reactive Extensions for the async daemon, but after way too much research and dithering back and forth on my part, I decided that the TPL Dataflow library was a better fit in this particular case.

The producer/consumer queue inside of the async daemon consists of a couple main players:

  • The Fetcher class is the “producer” that continuously polls the database for new events. It’s smart enough to pause the polling if there are no new events in the database, but otherwise it’s pretty dumb.
  • An instance of the IProjection interface that does the actual work of processing events or updating projected documents from the events.
  • The ProjectionTrack class acts as a logical controller for both the Fetcher and the IProjection.
  • A pair of ActionBlock’s from the TPL Dataflow library: one acts as the consumer queue for processing events, and a second queue coordinates the activities within ProjectionTrack.

 

In the pure happy path workflow of the async daemon, it functions like this sequence diagram below:

[Sequence diagram: AsyncDaemonSequence]

The Fetcher object runs continuously, fetching a new “page” of events and queueing each page to be consumed by ProjectionTrack’s ExecutePage() method on a different thread.

The usage of the ActionBlock objects to connect the workflow together turned out to be pretty simple. In the following code taken from the ProjectionTrack class, I’m setting up the ActionBlock for the execution queue with a lambda to call the ExecutePage() method. One thing to notice is that I had to configure a couple options to ensure that each item enqueued to that ActionBlock is executed serially in the same order that it was received.

_executionTrack = new ActionBlock<EventPage>(page => ExecutePage(page, _cancellation.Token),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 1,
        EnsureOrdered = true
    });

The value of using ActionBlock is that it does all the heavy lifting for me with regard to threading. The ActionBlock will invoke the ExecutePage() method on a different thread and ensure that every page is executed sequentially, in order.
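If you haven’t used TPL Dataflow before, here’s a minimal, self-contained sketch of the same pattern outside of Marten: an ActionBlock configured for strictly ordered, single-threaded consumption. The EventPage and ExecutePage() pieces above are Marten’s; this sketch just pushes strings through to show the mechanics.

    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    internal class OrderedConsumerSample
    {
        private static async Task Main()
        {
            // The "consumer": processes one item at a time, in the order received
            var block = new ActionBlock<string>(
                page => Console.WriteLine($"Processing {page} on thread {Environment.CurrentManagedThreadId}"),
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    EnsureOrdered = true
                });

            // The "producer": Post() hands the item to the block without blocking the caller
            for (var i = 1; i <= 5; i++)
            {
                block.Post($"page {i}");
            }

            // Signal that no more items are coming, then wait for the block to drain
            block.Complete();
            await block.Completion;
        }
    }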

Incorporating Backpressure

I also wanted to incorporate the idea of “back pressure” so that if the event fetching producer gets too far ahead of the event processing consumer, the async daemon stops fetching new events. That prevents spikes in memory usage and frees up system resources for the consumer until it can catch up.

To do that, there’s a little bit of logic in ProjectionTrack that checks how many events are queued up in the execution track shown above and pauses the Fetcher if the configured threshold is exceeded:

public async Task CachePage(EventPage page)
{
    // Accumulator is just a little helper used to
    // track how many events are in flight
    Accumulator.Store(page);

    // If the consumer is backed up, stop fetching
    if (Accumulator.CachedEventCount > _projection.AsyncOptions.MaximumStagedEventCount)
    {
        _logger.ProjectionBackedUp(this, Accumulator.CachedEventCount, page);
        await _fetcher.Pause().ConfigureAwait(false);
    }

    _executionTrack?.Post(page);
}

When the consumer works through enough of the staged events, ProjectionTrack knows to restart the Fetcher to begin producing new pages of events:

// This method is called after every EventPage is successfully
// executed
public Task StoreProgress(Type viewType, EventPage page)
{
	Accumulator.Prune(page.To);

	if (shouldRestartFetcher())
	{
		_fetcher.Start(this, Lifecycle);
	}

	return Task.CompletedTask;
}

The actual “cooldown” logic inside of ProjectionTrack is implemented in this method:

private bool shouldRestartFetcher()
{
	if (_fetcher.State == FetcherState.Active) return false;

	if (Lifecycle == DaemonLifecycle.StopAtEndOfEventData && _atEndOfEventLog) return false;

	if (Accumulator.CachedEventCount <= _projection.AsyncOptions.CooldownStagedEventCount &&
		_fetcher.State == FetcherState.Paused)
	{
		return true;
	}

	return false;
}

To make this more concrete, by default Marten will pause a Fetcher if the consuming queue has over 1,000 events and won’t restart the Fetcher until the queue goes below 500. Both thresholds are configurable.
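If you do need to tune those thresholds, here’s a hedged sketch of what that might look like. It assumes the AsyncOptions object referenced in the ProjectionTrack code above exposes MaximumStagedEventCount and CooldownStagedEventCount as settable properties on your projection; check the Marten source for the exact configuration surface.

    // A hedged sketch only: assumes the two thresholds referenced above
    // are settable properties on the projection's AsyncOptions
    public static void TuneBackpressure(IProjection projection)
    {
        // Pause the Fetcher once this many events are staged and waiting...
        projection.AsyncOptions.MaximumStagedEventCount = 5000;

        // ...and only restart it after the staged count drops below this
        projection.AsyncOptions.CooldownStagedEventCount = 2000;
    }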

 

As I said in my last post, I thought that the async daemon overall was very challenging, but I felt that the usage of TPL Dataflow went very smoothly.

Doing it the Old Way with BlockingCollection

In the past, I’ve used BlockingCollection to build producer/consumer queues in .Net. In the Storyteller project, I used producer/consumer queues to parallelize executing batches of specifications by dividing the work into stages that each do some kind of work on a “SpecExecutionRequest” object (read in the specification file, do some preparation work to build a “plan”, and finally actually execute the specification). At the heart of that is the ConsumingQueue class that allows you to queue up tasks for one of these SpecExecutionRequest stages:

    public class ConsumingQueue : IDisposable, IConsumingQueue
    {
        private readonly BlockingCollection<SpecExecutionRequest> _collection =
            new BlockingCollection<SpecExecutionRequest>(new ConcurrentBag<SpecExecutionRequest>());

        private Task _readingTask;
        private readonly Action<SpecExecutionRequest> _handler;

        public ConsumingQueue(Action<SpecExecutionRequest> handler)
        {
            _handler = handler;
        }

        public void Dispose()
        {
            _collection.CompleteAdding();
            _collection.Dispose();
        }

        // This does not block the caller
        public void Enqueue(SpecExecutionRequest plan)
        {
            _collection.Add(plan);
        }

        private void runSpecs()
        {
            // This loop runs continuously and calls _handler() for
            // each plan added to the queue in the method above
            foreach (var request in _collection.GetConsumingEnumerable())
            {
                if (request.IsCancelled) continue;

                _handler(request);
            }
        }

        public void Start()
        {
            _readingTask = Task.Factory.StartNew(runSpecs);
        }
    }

For more context, you can see how these ConsumingQueue objects are assembled and used in the SpecificationEngine class in the Storyteller codebase.
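For a rough idea of the mechanics, here’s a hypothetical usage sketch of the ConsumingQueue above; the handler body and the request variable are purely illustrative.

    // Hypothetical usage sketch of the ConsumingQueue shown above
    var queue = new ConsumingQueue(request =>
    {
        // Do the work for this stage of the pipeline,
        // e.g. read, plan, or execute the specification
        Console.WriteLine("Handling " + request);
    });

    // Kicks off the background task that drains the BlockingCollection
    queue.Start();

    // Enqueue() never blocks the caller; the handler runs on the reading task
    queue.Enqueue(nextRequest); // a SpecExecutionRequest built elsewhere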

After doing it both ways, I think I prefer the TPL Dataflow approach over the older BlockingCollection mechanism.

Offline Event Processing in Marten with the new “Async Daemon”

The feature I’m talking about here was very difficult to write, brand new, and definitely in need of some serious user testing from anyone interested in kicking the tires on it. We’re getting a lot of interest in the Marten Gitter room about doing the kinds of use cases that the async daemon described below is meant to address. This was also the very last feature on Marten’s “must have for 1.0” list, so there’s a new 1.0-alpha nuget for Marten. 1.0 is still at least a couple months away, but it’s getting closer.

A couple weeks ago I pulled the trigger on a new, but long planned, feature in Marten we’ve been calling the “async daemon” that allows users to build and update projected views against the event store data in a background process hosted in your application or an external service.

To put this in context, let’s say that you are building an application to track the status of GitHub repositories, using event sourcing for the persistence. In this application, you would record events for things like:

  • Project started
  • A commit pushed into the main branch
  • Issue opened
  • Issue closed
  • Issue re-opened

There’s a lot of value to be had by recording the raw event data, but you still need to frequently see a rolled up view of each project that can tell you the total number of open issues, closed issues, how many lines of code are in the project, and how many unique contributors are involved.

To do that rollup, you can build a new document type called ActiveProject just to present that information. Optionally, you can use Marten’s built-in support for making aggregated projections across a stream by adding Apply([Event Type]) methods to consume events. In my end-to-end tests for the async daemon, I used this version of ActiveProject (the raw code is on GitHub if the formatting is cut off for you):

    public class ActiveProject
    {
        public ActiveProject()
        {
        }

        public ActiveProject(string organizationName, string projectName)
        {
            ProjectName = projectName;
            OrganizationName = organizationName;
        }

        public Guid Id { get; set; }
        public string ProjectName { get; set; }

        public string OrganizationName { get; set; }

        public int LinesOfCode { get; set; }

        public int OpenIssueCount { get; set; }

        private readonly IList<string> _contributors = new List<string>();

        public string[] Contributors
        {
            get { return _contributors.OrderBy(x => x).ToArray(); }
            set
            {
                _contributors.Clear();
                _contributors.AddRange(value);
            }
        }

        public void Apply(ProjectStarted started)
        {
            ProjectName = started.Name;
            OrganizationName = started.Organization;
        }

        public void Apply(IssueCreated created)
        {
            OpenIssueCount++;
        }

        public void Apply(IssueReopened reopened)
        {
            OpenIssueCount++;
        }

        public void Apply(IssueClosed closed)
        {
            OpenIssueCount--;
        }

        public void Apply(Commit commit)
        {
            _contributors.Fill(commit.UserName);
            LinesOfCode += (commit.Additions - commit.Deletions);
        }
    }

Now, you can update projected views in Marten at the time of event capture with what we call “inline projections.” You could also build the aggregated view on demand from the underlying event data. Both of those solutions can be appropriate in some cases, but if our GitHub projects are very active with a fair amount of concurrent writes to any given project stream, we’d probably be much better off to move the aggregation updates to a background process.

That’s where the async daemon comes into play. If you have a Marten document store, you can start up a new instance of the async daemon like so (the underlying code shown below is in GitHub):

[Fact] 
public async Task build_continuously_as_events_flow_in()
{
    // In the test here, I'm just adding an aggregation for ActiveProject
    StoreOptions(_ =>
    {
        _.Events.AsyncProjections.AggregateStreamsWith<ActiveProject>();
    });

    using (var daemon = theStore.BuildProjectionDaemon(logger: _logger, settings: new DaemonSettings
    {
        LeadingEdgeBuffer = 1.Seconds()
    }))
    {
        // Start all of the configured async projections
        daemon.StartAll();

        // This just publishes event data
        await _fixture.PublishAllProjectEventsAsync(theStore);


        // Runs all projections until there are no more events coming in
        await daemon.WaitForNonStaleResults().ConfigureAwait(false);

        await daemon.StopAll().ConfigureAwait(false);
    }

    // Compare the actual data in the ActiveProject documents with 
    // the expectation
    _fixture.CompareActiveProjects(theStore);
}

In the code sample above I’m starting an async daemon to run the ActiveProject projection updating, and running a series of events through the event store. The async daemon is continuously detecting newly available events and applying those to the correct ActiveProject document. This is the only place in Marten where we utilize the idea of eventual consistency to allow for faster writes, but it’s clearly warranted in some cases.

Rebuilding a Projection From Existing Data

If you’re going to use event sourcing with read side projections (the “Q” in your CQRS architecture), you’re probably going to need a way to rebuild projected views from the existing data to fix bugs or add new data. You’ll also likely introduce new projected views after the initial rollout to production. You’ll absolutely need to rebuild projected view data in development as you’re iterating your system.

To that end, you can also use the async daemon to completely tear down and rebuild the population of a projected document view from the existing event store data.

// This is just some test setup to establish the DocumentStore
StoreOptions(_ => { _.Events.AsyncProjections.AggregateStreamsWith<ActiveProject>(); });

// Publishing some pre-canned event data
_fixture.PublishAllProjectEvents(theStore);


using (var daemon = theStore.BuildProjectionDaemon(logger: _logger, settings: new DaemonSettings
{
    LeadingEdgeBuffer = 0.Seconds()
}))
{
    await daemon.Rebuild<ActiveProject>().ConfigureAwait(false);
}

Taken from the tests for the async daemon on Github.

Other Functionality Possibilities

The async daemon can be described as just a mechanism to accurately and reliably execute the events in order through the IProjection interface shown below:

    public interface IProjection
    {
        Type[] Consumes { get; }
        Type Produces { get; }

        AsyncOptions AsyncOptions { get; }
        void Apply(IDocumentSession session, EventStream[] streams);
        Task ApplyAsync(IDocumentSession session, EventStream[] streams, CancellationToken token);
    }

Today, the only built-in projections in Marten are one-for-one transformations of a single event type to a view document and the aggregation-by-stream use case shown above in the ActiveProject example. However, there’s nothing preventing you from creating your own custom IProjection classes (see the skeleton sketch after this list) to:

  • Aggregate views across streams grouped by some kind of classification like region, country, person, etc.
  • Project event data into flat relational tables for more efficient reporting
  • Do complex event processing
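To make that a little more concrete, here’s a skeleton sketch of a custom IProjection against the interface above. The OrganizationActivity document type is hypothetical, and I’m assuming a default-constructed AsyncOptions is usable as-is; the real work would live in ApplyAsync().

    // A skeleton sketch only -- OrganizationActivity is a hypothetical rollup document
    public class CommitsPerOrganization : IProjection
    {
        public Type[] Consumes { get; } = new[] { typeof(Commit) };
        public Type Produces { get; } = typeof(OrganizationActivity);

        // Assuming the default thresholds are good enough here
        public AsyncOptions AsyncOptions { get; } = new AsyncOptions();

        public void Apply(IDocumentSession session, EventStream[] streams)
        {
            // The synchronous version just mirrors ApplyAsync()
            ApplyAsync(session, streams, CancellationToken.None).Wait();
        }

        public Task ApplyAsync(IDocumentSession session, EventStream[] streams, CancellationToken token)
        {
            // Group the incoming events however you need -- by organization,
            // region, person, etc. -- then store or update the projected
            // documents (or flat reporting tables) through the session
            foreach (var stream in streams)
            {
                // examine the events in each stream and update OrganizationActivity documents
            }

            return Task.CompletedTask;
        }
    }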


What’s Next for the Async Daemon

The async daemon is the only major thing missing from the Marten documentation, and I need to fill that in soon. This blog post is just a down payment on the async daemon docs.

I cut a lot of content out on how the async daemon works. Since I thought this was one of the hardest things I’ve ever coded myself, I’d like to write a post next week just about designing and building the async daemon and see if I can trick some folks into effectively doing a code review on it;)

This was my first usage of the TPL Dataflow library and I was very pleasantly surprised by how much I liked using it. If I’m ambitious enough, I’ll write a post later on building producer/consumer queues and using back pressure with the dataflow classes.

StructureMap 4.3 Fully Embraces CoreCLR

EDIT 8/2: A lot of folks are asking me why SM targets both Netstandard 1.3 and 1.5 and I left that explanation out of the blog post because I was in too much of a hurry yesterday. The only single difference is that with 1.5 StructureMap can try to load an assembly by file path, which only comes into play if you’re using StructureMap to discover assemblies from the file path and an assembly name does not match the file name. I thought it was worthwhile to drop down to 1.3 without that small feature to expand StructureMap’s reach. We’ll see if I’m even remotely right.

I just uploaded StructureMap 4.3 to Nuget today. The big change (95% of my work) was to completely embrace the new world order of CoreCLR, the dotnet cli, and (for now) the project.json system. As such, I consolidated all of the real code back into the root StructureMap.dll project and relied on conditional compilation. This release also adds a great deal of functionality for type scanning and assembly scanning to the CoreCLR targets that were previously only available in the full .Net framework version.

StructureMap >=4.0 supported the CoreCLR through the old “dotnet” target, but we were only compiling to that target. Between users having Nuget issues with the old nomenclature and a CoreCLR specific bug, it was time to convert all the way and make sure that the tests were running against the CoreCLR version of StructureMap on our CI server.

What a StructureMap user needs to know before adopting 4.3…

  • The Nuget now targets .Net 4.5, Netstandard 1.3, and Netstandard 1.5
  • PCL profiles have been dropped for now, but I’m willing to try to put that back in if anyone requests it. That’s definitely a place where I’d love to have some help because I don’t do any mobile development that would test out that build.
  • The old StructureMap.Net4 assembly that used to be in the StructureMap nuget is gone. I’m relying on conditional compilation instead.
  • Any project that uses FubuMVC 3’s service bus should probably update to 4.3 for a big performance optimization that was impacting that functionality.

 

The complete list of changes and bug fixes is here.

Indexing Options in Marten

The road to Marten 1.0 continues with a discussion of the indexing options that we directly support against Postgresql.

We’re aiming to make Marten be usable for a wide range of application scenarios, and an obvious one is to make querying faster. To that end, Marten has direct support for adding a couple different flavors of indexes to optimize querying.

In all cases, Marten’s schema migration support can detect changes, additions, and removals of index definitions.

Calculated Index

After getting some feedback from a 2nd Quadrant consultant, the recommended path for optimizing queries against JSON documents in Marten is to use a Postgresql calculated index, which Marten can build for you with:

    var store = DocumentStore.For(_ =>
    {
        _.Connection(ConnectionSource.ConnectionString);
        _.Schema.For<Issue>().Index(x => x.Number);
    });

Marten creates this index behind the scenes against the Issue storage table:

CREATE INDEX mt_doc_issue_idx_number ON public.mt_doc_issue 
    ((CAST(data ->> 'Number' as integer)));

The advantages of using a calculated index are that you’re not duplicating storage and you’re causing fewer database schema changes as compared to our original “Duplicated Field” approach that’s described in the next section.

It’s not shown here, but there is some ability to configure how the calculated index is created. See the documentation on calculated indexes for an example of that usage.

I’ve been asked several times what Sql Server would need to add before it could support Marten. The calculated index feature as applied to the JSONB data type isn’t strictly necessary, but it’s a big advantage that Postgresql has over Sql Server at the moment.

Duplicated Field

Marten’s original approach was to optimize querying against designated fields by just duplicating the value within the JSON document into a separate database table column, and indexing that column. Marten does this behind the scenes when you use the Foreign Key option. Some of our users will opt for a duplicated field if they want to issue their own queries against the document table without having to worry about JSON locators.

To make a field duplicated, you can either use the [DuplicateField] attribute:

    public class Team
    {
        public Guid Id { get; set; }

        [DuplicateField]
        public string Name { get; set; }
    }

Or you can specify the duplicated fields in the StoreOptions for your document store:

    using (var store = DocumentStore.For(_ =>
    {
        _.Connection(ConnectionSource.ConnectionString);

        _.Schema.For<User>().Duplicate(x => x.UserName);
    }))
    {

    }

If you decide to add a duplicated field to an existing document type, Marten’s schema migration support is good enough to add the column and fill in the values from the JSON document as part of patching. Even so, we still recommend the calculated index approach from the previous section to simplify your schema migrations.

It’s not shown here, but you have quite a bit of flexibility in configuring exactly which index type is used and how it’s applied. See the documentation on duplicated fields for an example.

Gin Index

If you need to issue a lot of variable, ad hoc queries against a Marten document type, you may want to opt for a Gin index. A Gin index against a Postgresql JSONB column creates a generalized index of the key/value pairs and arrays within the parsed JSON document. To add a Gin index to a Marten document type, you need to explicitly configure that document type like this:

    var store = DocumentStore.For(_ =>
    {
        _.Schema.For<Issue>().GinIndexJsonData();
    });

You can also decorate your document class with the [GinIndexed] attribute. It’s not shown above, but there are options to customize the index generated.
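The attribute flavor looks like this, sketched against the Issue document from the earlier examples:

    // Attribute-based alternative to the GinIndexJsonData() configuration above
    [GinIndexed]
    public class Issue
    {
        public Guid Id { get; set; }

        public int Number { get; set; }
    }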

When the DDL for the Issue document is generated, you would see a new index added to its table like this one:

CREATE INDEX mt_doc_issue_idx_data ON public.mt_doc_issue USING gin ("data" jsonb_path_ops);

Do note that using a Gin index against a document type will result in slightly slower inserts and updates to that table. From our testing, it’s not that big of a hit, but still something to be aware of.

Soft Deletes in Marten

Yet more content on new features in Marten leading up to the 1.0 release coming soon. These posts aren’t getting many reads, but they’ll be stuck in Google/Bing for later users, so expect a couple more of these.

As part of the 1.0 release work, Marten gained the capability last week (>0.9.8) to support the concept of “soft deletes.” Instead of deleting a document completely out of the database, a soft delete just marks a “deleted” column in the database table to denote that the row is now obsolete. The value of a “soft delete” is simply that you don’t lose any data from the historical record. The downsides are that you now have more rows cluttering up your database and you probably need to filter deleted documents out of your queries.

To see this in action, let’s first configure a document type in Marten as soft deleted:

    var store = DocumentStore.For(_ =>
    {
        _.Connection(ConnectionSource.ConnectionString);
        _.Schema.For<User>().SoftDeleted();
    });

By default, Marten does a hard delete of the row, so you’ll need to explicitly opt into soft deletes per document type. There is also a [SoftDeleted] attribute in Marten that you could use to decorate a document type to specify that it should be soft deleted.
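The attribute flavor would look something like this, using the User document from the sample above:

    // Attribute-based alternative to the SoftDeleted() configuration above
    [SoftDeleted]
    public class User
    {
        public Guid Id { get; set; }

        public string UserName { get; set; }
    }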

When a document type is soft deleted, Marten adds a couple of extra columns to the document storage table, “mt_deleted” and “mt_deleted_at”, just to track whether and when a document was deleted.

In usage, it’s transparent to the user that you’re doing a soft delete instead of a hard delete:

    // Create a new User document
    var user = new User();
    session.Store(user);
    session.SaveChanges();

    // Mark it deleted
    session.Delete(user);
    session.SaveChanges();

As I said earlier, one of your challenges is to filter out deleted documents in queries. Fortunately, Marten has you covered. As the following acceptance test from Marten shows, deleted documents are automatically filtered out of the Linq query results:

    [Fact]
    public void query_soft_deleted_docs()
    {
        var user1 = new User { UserName = "foo" };
        var user2 = new User { UserName = "bar" };
        var user3 = new User { UserName = "baz" };
        var user4 = new User { UserName = "jack" };

        using (var session = theStore.OpenSession())
        {
            session.Store(user1, user2, user3, user4);
            session.SaveChanges();

            // Deleting 'bar' and 'baz'
            session.DeleteWhere<User>(x => x.UserName.StartsWith("b"));
            session.SaveChanges();

            // no where clause, deleted docs should be filtered out
            session.Query<User>().OrderBy(x => x.UserName).Select(x => x.UserName)
                .ToList().ShouldHaveTheSameElementsAs("foo", "jack");

            var sql = session.Query<User>().OrderBy(x => x.UserName).Select(x => x.UserName).ToCommand().CommandText;
            _output.WriteLine(sql);

            // with a where clause
            session.Query<User>().Where(x => x.UserName != "jack")
                .ToList().Single().UserName.ShouldBe("foo");
        }
    }

Easy peasy. Of course, you may want to query against the deleted documents or against all the documents. Marten’s got you covered there too: you can use these two custom Linq extensions to include deleted documents (a quick sketch follows the list):

  1. “IDocumentSession.Query<T>().MaybeDeleted()…” will include all documents in the query, regardless of whether or not they have been deleted
  2. “IDocumentSession.Query<T>().IsDeleted()…” will only include documents marked as deleted
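Here’s a hedged sketch of both extensions, picking up from the test above where “bar” and “baz” were soft deleted. I’m assuming the extensions are used inside the Where() clause; check the Marten documentation for the exact call form.

    using (var query = theStore.QuerySession())
    {
        // Everything, including the soft deleted "bar" and "baz"
        var everything = query.Query<User>()
            .Where(x => x.MaybeDeleted())
            .OrderBy(x => x.UserName)
            .Select(x => x.UserName)
            .ToList();

        // Only the documents that have been marked as deleted
        var deleted = query.Query<User>()
            .Where(x => x.IsDeleted())
            .OrderBy(x => x.UserName)
            .Select(x => x.UserName)
            .ToList();
    }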

Avoid the Serialization Burn with Marten’s Patching API

This is a logical follow up to my last post on Document Transformations in Marten with Javascript and yet another signpost on the way to Marten 1.0. Instead of having to write your own Javascript, Marten supplies the “Patch API” described here for very common scenarios.

Before I started working on Marten, I read an article comparing the performance of writing and querying JSON data between MongoDB and Postgresql (I couldn’t find the link when I was writing this post). Long story short, Postgresql very clearly comes out on top in terms of throughput, but the author still wanted to stick with MongoDB because of its ability to do document patching where you’re able to change elements within the persisted document without having to first load it into your application, change it, and persist the whole thing back. It’s a fair point and a realistic scenario that I used with RavenDb’s Patch Commands in the past.

Fortunately, that argument is a moot point because we have a working “Patch API” model in Marten for doing document patches. This feature does require that PLV8 be enabled in your Postgresql database if you want to play with it in our latest nugets.

For an example, let’s say that you want to change the user name of a User document without first loading it. To update a single property or field in a document by its Id, it’s just this:

public void change_user_name(IDocumentSession session, Guid userId, string newName)
{
    session.Patch<User>(userId).Set(x => x.UserName, newName);
    session.SaveChanges();
}

When IDocumentSession.SaveChanges() (or its async equivalent) is called, it will send all the patching requests queued up with all of the pending document changes in a single database call.

I should also point out that the Set() mechanism can be used with nested properties or fields and non-primitive types.
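As a quick, hypothetical sketch of that (the Address and City members below are illustrative and not part of the earlier User samples):

    public void change_city(IDocumentSession session, Guid userId, string city)
    {
        // Address/City are hypothetical nested members used only for illustration
        session.Patch<User>(userId).Set(x => x.Address.City, city);
        session.SaveChanges();
    }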

Looking at another example, what if you just want to add a new role to an existing User? For that, Marten exposes the Append() method:

public void append_role(IDocumentSession session, Guid userId, string role)
{
    session.Patch<User>(userId).Append(x => x.Roles, role);
    session.SaveChanges();
}

In the case above, the new role will be appended to the “Roles” collection in the persisted JSON document in the Postgresql database. Again, this method can be used for nested or deep properties or fields and with non-primitive elements.

As a third example, let’s say that you only want to increment some kind of counter in the JSON document:

public void increment_login_count(IDocumentSession session, Guid userId)
{
    session.Patch<User>(userId).Increment(x => x.LoginCount);
    session.SaveChanges();
}

When the above command is issued, Marten will find the current numeric value in the JSON document, add 1 to it (the increment amount is an optional argument not shown here), and persist the new JSON data without ever fetching it into the client. The Increment() method can be used with ints, longs, doubles, and floats.
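Here’s a sketch of using that optional increment amount, assuming it’s simply the second argument to Increment():

    public void increment_login_count_by(IDocumentSession session, Guid userId, int count)
    {
        // Same Increment() mechanism, with the increment amount
        // assumed to be the optional second argument
        session.Patch<User>(userId).Increment(x => x.LoginCount, count);
        session.SaveChanges();
    }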

Lastly, if you want to make a patch update to many documents by some kind of criteria, you can do that too:

public void append_role_to_internal_users(IDocumentSession session, Guid userId, string role)
{
    // Adds the role to all internal users
    session.Patch<User>(x => x.Internal)
        .Append(x => x.Roles, role);

    session.SaveChanges();
}

Other “Patch” mechanisms include the ability to rename a property or field within the JSON document and the ability to insert an item into a child collection at a given index. Other patch mechanisms are planned for later as well.

Document Transformations in Marten with Javascript

In all likelihood, Marten would garner much more rapid adoption if we were able to build on top of Sql Server 2016 instead of Postgresql. Hopefully, .Net folks will be willing to try switching databases after they see how many helpful capabilities that Postgresql has that Sql Server can’t match yet. This blog post, yet one more stop along the way to Marten 1.0, demonstrates how we’re taking advantage of Postgresql’s built in Javascript engine (PLV8). 

A Common .Net Approach to “Readside” Views

Let’s say that you’re building HTTP services, and some of your HTTP endpoints will need to return some sort of “readside” representation of your persisted domain model. For the purpose of making Marten shine, let’s say that you’re going to need to work with hierarchical data. In a common .Net technology stack, you’d:

  1. Load the top level model object through Entity Framework or some other kind of ORM. EF would issue a convoluted SQL query with lots of OUTER JOINs so that it can make a single call to the database to fetch the entire hierarchy of data you need from various tables. EF would then iterate through the sparsely populated recordset coming back and turn that into the actual domain model objects with lots of internally generated code.
  2. You’d then use something like AutoMapper to transform the domain model object into a “read side” Data Transfer Object (view models, etc.) that’s more suitable to going over the wire to clients outside of your service.
  3. Serialize your DTO to a JSON string and write that out to the HTTP response

Depending on how deep your hierarchy is, #1 can be expensive in the database query. The serialization in #3 is also somewhat CPU intensive.

As a contrast, here’s an example of how you might approach that exact same use case with Marten:

    var json = session.Query<User>()
        .Where(x => x.Id == user.Id)
        .TransformToJson("get_fullname").Single();

In the usage above, I’m retrieving the data for a single User document from Marten and having Postgresql transform the persisted JSON data to the format I need for the client with a pre-loaded Javascript transformation. In the case of Marten, the workflow is to:

  1. Find the entire hierarchical document JSON in a single database row by its primary key
  2. Apply a Javascript function to transform the persisted JSON to the format that the client needs and return a JSON representation as a String
  3. Stream the JSON from the Linq query directly to the HTTP response without any additional serialization work

Not to belabor the point too much, but the Marten mechanics are simpler and probably much more efficient at runtime because:

  • The underlying database query is much simpler if all the data is in one field in one row
  • The Javascript transformation probably isn’t that much faster or slower than the equivalent AutoMapper mechanics, so let’s call that a wash
  • You don’t have the in memory allocations to load a rich model object just to immediately transform that into a completely different model object
  • You avoid the now unnecessary cost of serializing the DTO view models to a JSON string

A couple additional points:

  • Jimmy Bogard reviewed this and pointed out that in some cases you could bypass the Domain Model to DTO transformation by selecting straight to the DTO, but that wouldn’t cover all cases by any means. The same limitations apply to Marten and its Select() transformation features.
  • To get even more efficient in your Marten usage, the Javascript transformations can be used inside of Marten’s Compiled Query feature to avoid the CPU cost of repetitively parsing Linq statements. You can also do Javascript transformations inside of batched queries – which can of course, also be combined with the aforementioned compiled queries;)

 

Now, let’s see how it all works…

 

Building the Javascript Function

The way this works in Marten is that you write your Javascript function into a single file and export the main function with the “module.exports = ” CommonJS syntax. Marten is expecting the main function to have the signature “function(doc)” and return the transformed document.

Here’s a sample Javascript function I used to test this feature that works against a User document type:

module.exports = function(doc) {
    return {fullname: doc.FirstName + ' ' + doc.LastName};
}

Given the persisted JSON for a User document, this transformation would return a different object that would then be streamed back to the client as a JSON string.

There is some thought and even infrastructure for doing Javascript transformations with multiple, related documents, but that feature won’t make it into Marten 1.0.

To load the function into a Javascript-enabled Postgresql schema, Marten exposes this method:

    var store = DocumentStore.For(_ =>
    {
        _.Connection(ConnectionSource.ConnectionString);

        // Let Marten derive the transform name
        // from the file name
        _.Transforms.LoadFile("get_fullname.js");

        // or override the transform name
        _.Transforms.LoadFile("get_fullname.js", "fullname");
    });

Internally, Marten will wrap a PLV8 function wrapper around your Javascript function like this:

CREATE OR REPLACE FUNCTION public.mt_transform_get_fullname(doc jsonb)
  RETURNS jsonb AS
$BODY$

  var module = {export: {}};

module.exports = function (doc) {
    return {fullname: doc.FirstName + ' ' + doc.LastName};
}

  var func = module.exports;

  return func(doc);

$BODY$
  LANGUAGE plv8 IMMUTABLE STRICT;


My intention with the approach shown above was to allow users to write simple Javascript functions and be able to test their transformations in simple test harnesses like Mocha. By having Marten wrap the raw Javascript in a generated PLV8 function, users won’t have to be down in the weeds and worrying about Postgresql mechanics.

Depending on the configuration, Marten is good enough to build, or rebuild, the function to match the current version of the Javascript code on the first usage of that transformation. The Javascript transforms are also part of our schema management support for database migrations.

 

Transformations

The persisted JSON documents in Marten are a reflection of your .Net classes. Great, that makes it absurdly easy to keep the database schema in sync with your application code at development time — especially compared to the typical development process against a relational database. However, what happens when you really do need to make breaking changes or additions to a document type, but you already have loads of persisted documents in your Marten database with the old structure?

To that end, Marten allows you to use Javascript functions to alter the existing documents in the database. As an example, let’s go back to the User document type and assume for some crazy reason that we didn’t immediately issue a user name to some subset of users. As a default, we might just assign their user names by combining their first and last names like so:

module.exports = function (doc) {
    doc.UserName = (doc.FirstName + '.' + doc.LastName).toLowerCase();

    return doc;
}

To apply this transformation to existing rows in the database, Marten exposes this syntax:

    var store = DocumentStore.For(_ =>
    {
        _.Connection(ConnectionSource.ConnectionString);

        _.Transforms.LoadFile("default_username.js");
    });

    store.Transform
        .Where<User>("default_username", x => x.UserName == null);

When you run the code above, Marten will issue a single SQL statement that runs an UPDATE against the rows matching the given criteria, applying the Javascript function above to alter the existing documents in place. No data is ever fetched or processed in the actual application tier.

Supercharging Marten with the Jil Serializer

Some blog posts you write for attention or self promotion, some you write just because you’re excited about the topic, and some posts you write just to try to stick some content for later into Google searches. This one’s all about users googling for this information down the road.

Out of the box, Marten uses Newtonsoft.Json as its primary JSON serialization mechanism. While Newtonsoft has outstanding customizability and the most flexible feature set, you can opt to forgo some of that flexibility in favor of higher performance by switching instead to the Jil serializer.

In the last couple months I finally made a big effort to be able to run Marten’s test suite using the Jil serializer. I had to make one small adjustment to our JilSerializer (turning on includeInherited), and a distressingly intrusive structural change to make the internal handling of Enum values (!) in Linq queries be dependent upon the internal serializer’s behavior for enum storage.

At this point, we’re not supplying a separate Marten.Jil adapter package, but the code to swap in Jil is just this class:

public class JilSerializer : ISerializer
{
    private readonly Options _options 
        = new Options(dateFormat: DateTimeFormat.ISO8601, includeInherited:true);

    public string ToJson(object document)
    {
        return JSON.Serialize(document, _options);
    }

    public T FromJson<T>(string json)
    {
        return JSON.Deserialize<T>(json, _options);
    }

    public T FromJson<T>(Stream stream)
    {
        return JSON.Deserialize<T>(new StreamReader(stream), _options);
    }

    public object FromJson(Type type, string json)
    {
        return JSON.Deserialize(json, type, _options);
    }

    public string ToCleanJson(object document)
    {
        return ToJson(document);
    }

    public EnumStorage EnumStorage => EnumStorage.AsString;
}

And this one line of code in your document store set up:

var store = DocumentStore.For(_ =>
{
    _.Connection("the connection string");

    // Replace the ISerializer w/ the JilSerializer
    _.Serializer<JilSerializer>();
});

A couple things to note about using Jil in place of Newtonsoft:

  • The enumeration persistence behavior is different from Newtonsoft as it stores enum values by their string representation. Most Marten users seem to prefer this anyway, but watch the value of the “EnumStorage” property in your custom serializer.
  • We’ve tried very hard with Marten to ensure that the Json stored in the database doesn’t require .Net type metadata, but the one thing we can’t address is having polymorphic child collections. For that particular use case, you’ll have to stick with Newtonsoft.Json and turn on its type metadata handling.

Optimistic Concurrency with Marten

The Marten community has been making substantial progress toward a potential 1.0-alpha release early next week. As part of that effort, I’m going to be blogging about the new changes and features.

Recent versions of Marten (>0.9.5) have a new feature that allows you to enforce offline optimistic concurrency checks against documents that you are attempting to persist. You would use this feature if you’re concerned about a document in your current session having been modified by another session since you originally loaded the document.

I first learned about this concept from Martin Fowler’s PEAA book. From his definition, offline optimistic concurrency:

Prevents conflicts between concurrent business transactions by detecting a conflict and rolling back the transaction.

In Marten’s case, you have to explicitly opt into optimistic versioning for each document type. You can do that with either an attribute on your document type like so:

    [UseOptimisticConcurrency]
    public class CoffeeShop : Shop
    {
        // Guess where I'm at as I code this?
        public string Name { get; set; } = "Starbucks";
    }

Or by using Marten’s configuration API to do it programmatically:

    var store = DocumentStore.For(_ =>
    {
        _.Connection(ConnectionSource.ConnectionString);

        // Configure optimistic concurrency checks
        _.Schema.For<CoffeeShop>().UseOptimisticConcurrency(true);
    });

Once optimistic concurrency is turned on for the CoffeeShop document type, a session will now only be able to update a document if the document has been unchanged in the database since it was initially loaded.

To demonstrate the failure case, consider the following acceptance test from Marten’s codebase:

[Fact]
public void update_with_stale_version_standard()
{
    var doc1 = new CoffeeShop();
    using (var session = theStore.OpenSession())
    {
        session.Store(doc1);
        session.SaveChanges();
    }

    var session1 = theStore.DirtyTrackedSession();
    var session2 = theStore.DirtyTrackedSession();

    var session1Copy = session1.Load<CoffeeShop>(doc1.Id);
    var session2Copy = session2.Load<CoffeeShop>(doc1.Id);

    try
    {
        session1Copy.Name = "Mozart's";
        session2Copy.Name = "Dominican Joe's";

        // Should go through just fine
        session2.SaveChanges();

        // When session1 tries to save its changes, Marten will detect
        // that the doc1 document has been modified and Marten will
        // throw an AggregateException
        var ex = Exception<AggregateException>.ShouldBeThrownBy(() =>
        {
            session1.SaveChanges();
        });

        // Marten will throw a ConcurrencyException for each document
        // that failed its concurrency check
        var concurrency = ex.InnerExceptions.OfType<ConcurrencyException>().Single();
        concurrency.Id.ShouldBe(doc1.Id);
        concurrency.Message.ShouldBe($"Optimistic concurrency check failed for {typeof(CoffeeShop).FullName} #{doc1.Id}");
    }
    finally
    {
        session1.Dispose();
        session2.Dispose();
    }

    // Just proving that the document was not overwritten
    using (var query = theStore.QuerySession())
    {
        query.Load<CoffeeShop>(doc1.Id).Name.ShouldBe("Dominican Joe's");
    }

}

Marten is throwing an AggregateException for the entire batch of changes being persisted from SaveChanges()/SaveChangesAsync() after rolling back the current database transaction. The individual ConcurrencyException’s inside of the aggregated exception expose information about the actual document type and identity that failed.

Schema Management with Marten (Why document databases rock)

This blog post describes the preliminary support and thinking behind how we’ll manage schema changes to production using Marten. I’m writing this to further the conversation one of our teams is having about how best to accomplish this. Expect the details of how this works to change after we face real world usage for a while ;)

Some of our projects at work are transitioning from RavenDb to an OSS project I help lead called Marten that uses Postgresql as a fully fledged document database. Among the very large advantages of document databases over relational databases is how much simpler it is to evolve a system over time because it takes so much less mechanical work to keep your document database synchronized to the application code.

Exhibit #1 in the case against relational databases is the need for laboriously tracking database migrations (assuming you give a damn about halfway decent software engineering in regards to your database).

Let’s compare the steps in adding a new property to one of your persisted objects in your system. Using a relational database with any kind of ORM (even if it describes itself as “micro” or “simple”), your steps in some order would be to:

  1. Add the new property
  2. Add a migration script that adds a new column to your database schema
  3. Change your ORM mapping or SQL statements to reflect the new property

Using a document database approach like Marten’s, you’d:

  1. Add the new property and continue on with your day

Notice which list is clearly shorter and simpler — not to mention less error prone for that matter.

Marten does still need to create matching schema objects in your Postgresql database, and it’s unlikely that any self-respecting DBA is going to allow your application to have rights to execute schema changes programmatically, so we’re stuck needing some kind of migration strategy as we add document types, Javascript transformations, and retrofitted indexes. Fortunately, we’ve got a decent start on doing just that, demonstrated below:

 

Just Get Stuff Done in Development!

As long as you have rights to alter your Postgresql database, you can happily set up Marten in one of the “AutoCreate” modes and not worry about schema changes at all as you happily code new features and change existing document types:

var store = DocumentStore.For(_ =>
{
    // Marten will create any new objects that are missing,
    // attempt to update tables if it can, but drop and replace
    // tables that it cannot patch. 
    _.AutoCreateSchemaObjects = AutoCreate.All;


    // Marten will create any new objects that are missing or
    // attempt to update tables if it can. Will *never* drop
    // any existing objects, so no data loss
    _.AutoCreateSchemaObjects = AutoCreate.CreateOrUpdate;


    // Marten will create missing objects on demand, but
    // will not change any existing schema objects
    _.AutoCreateSchemaObjects = AutoCreate.CreateOnly;
});

As long as you’re using a permissive auto creation mode, you should be able to code in your application model and let Marten change your development database as needed behind the scenes.

Patching Production Databases

In the next section, I demonstrate how to dump the entire data definition language (DDL) that matches your Marten configuration as if you were starting from an empty database, but first, I want to focus on how to make incremental changes between production or staging releases.

In the real world, you’re generally not going to allow your application to make willy-nilly changes to the running schema, so you’ll be forced into this setting:

var store = DocumentStore.For(_ =>
{
    // Marten will not create or update any schema objects 
    // and throws an exception in the case of a schema object
    // not reflecting the Marten configuration
    _.AutoCreateSchemaObjects = AutoCreate.None;
});

This leaves us with the problem of how to get our production database matching however we’ve configured Marten in our application code. At this point, our theory is that we’ll use the “WritePatch” feature to generate delta DDL files:

IDocumentStore.Schema.WritePatch(string file);

When this is executed against a configured Marten document store, it will loop through all of the known document types, javascript transforms, the event store usage, and check the configured storage against the actual database. Marten writes two files, one to move your schema “up” to match the configured document store, and a second “drop” file that would rollback your database schema to reverse the changes in the “up” file.
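As a concrete sketch of that workflow (the file name below is illustrative):

    // Generate the delta scripts at deployment time against the locked-down store
    var store = DocumentStore.For(_ =>
    {
        _.Connection("the production connection string");
        _.AutoCreateSchemaObjects = AutoCreate.None;
    });

    // Writes the "up" patch file, plus the matching "drop" rollback file
    store.Schema.WritePatch("1.initial_patch.sql");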

The patching today is able to:

  1. Add all new tables, indexes, and functions
  2. Detect when a generated function has changed and rebuild it after dropping the old version
  3. Determine which indexes are new or modified and generate the necessary DDL to match
  4. Add the event store schema objects if they’re active and missing
  5. Add the database objects Marten needs for its “Hilo” identity strategy

This is very preliminary, but my concept of how we’ll use this in real life (admittedly with some gaps) is to:

  • Use “AutoCreateSchemaObjects = AutoCreate.All” in development and CI and basically not worry at all about incremental schema changes.
  • For each deployment to staging or production, we’ll use the WritePatch() method shown above to generate a patch SQL file that will then be committed to Git.
  • I’m assuming that the patch SQL files generated by Marten could feed into a real database migration tool like RoundhousE, and we would incorporate RoundhousE into our automated deployments to execute the “up” scripts to the most current database version.

Dump all the Sql

If you just want a database script that will build all the necessary schema objects for your Marten configuration, you can export either a single file:

// Export the SQL to a file
store.Schema.WriteDDL("my_database.sql");

Or write a SQL file for each document type and functional area of Marten to a directory like this:

// Or instead, write a separate sql script
// to the named directory
// for each type of document
store.Schema.WriteDDLByType("some folder");

In the second usage, Marten also writes a file called “all.sql” that executes the constituent sql files in the correct order just in case you’re using Marten’s support for foreign keys between document types.

The SQL dumps from the two methods shown above will write out every possible database schema object necessary to support your Marten configuration (document types, the event store, and a few other things) including tables, the generated functions, indexes, and even a stray sequence or two.

Relational Databases are the Buggy Whips of Software Development

I think that there’s going to be a day when you tell your children stories about how we built systems against relational databases with ORM’s or stored procedures or hand written SQL and they’re going to be appalled at how bad we had it, much like I did when my grandfather told me stories about ploughing with a horse during the Great Depression.