A couple of weeks ago I wrote a blog post on the new “Async Daemon” feature in Marten. This post is material I cut out of that one, describing the challenges I faced and how I slid around the problems. If you’re a Marten user who has been asking me about writing your own subsystem to read and process events offline, you really want to read this post to understand why that’s much harder than you’d think, and why you probably do want to just help make the async daemon solid.
The first challenge for the async daemon was “knowing” when there are new events that need to be processed by async projections. When a projection runs, it needs to process events in the same order in which they were captured. Since the async daemon was inevitably going to use some sort of polling (NOTIFY/LISTEN in Postgresql was not adequate by itself) to read events out of the event table, we needed a very efficient way to page through the event fetching without missing any events.
We started Marten with the thought that we would try to accomplish that by having the event store enqueue the events in a rolling buffer table that some kind of offline process would poll and read, but we were talked out of that approach in discussions with a Postgresql consultant who was helping us at work. Moreover, as I worked through other use cases to rebuild projections from scratch or add new projections later, we realized that the rolling buffer table would never have worked for the async daemon.
We also experimented with using sequential Guids as the global identifier for events in the event store, with the idea that the projections could key off of them by always querying for “Id > [last event id encountered].” In my testing I was unable to get the sequential Guid algorithm to order the event ids accurately, especially under heavy parallel load.
In the end, we opted to make the event store table in Marten use a sequential long integer as its primary key, backed by a database SEQUENCE. That gave us a more reliable way to “know” which events were new for each individual projection. In testing I figured out pretty quickly that the async daemon was missing events when a lot of concurrent events were streaming in, because event sequence ids were being reserved by in-flight transactions that had not yet committed. To counteract that problem, I ended up taking a two-step approach:
- Limit the async daemon to only querying against events that were captured before some threshold of time (3 seconds is the default) to avoid missing events that are still in flight
- When the async daemon fetches a new page of events, it checks that there are no gaps in the event sequence; if there are, it pauses a little bit and tries again, until either there are no gaps in the sequence or a subsequent fetch turns up the exact same data (leading the async daemon to conclude that the missing events were rejected). A rough sketch of this fetch loop follows below.
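To make that concrete, here is a rough sketch of the two-step fetch. This is not Marten’s actual implementation; the table and column names (mt_events, seq_id, timestamp), the 3 second threshold, and the Npgsql usage are all assumptions for illustration:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Npgsql;

public class EventPage
{
    public List<long> SequenceNumbers { get; } = new List<long>();

    // The page is "complete" when the sequence numbers are contiguous,
    // starting just after the last number the daemon has already processed.
    public bool HasNoGaps(long lastEncountered)
    {
        var expected = lastEncountered + 1;
        foreach (var number in SequenceNumbers)
        {
            if (number != expected) return false;
            expected++;
        }
        return true;
    }
}

public static class EventFetcher
{
    public static EventPage FetchPage(NpgsqlConnection conn, long lastEncountered, int pageSize)
    {
        // Step 1: only consider events older than the threshold, so that most
        // in-flight transactions have had time to commit.
        const string sql = @"select seq_id from mt_events
                             where seq_id > @last
                               and timestamp < (now() - interval '3 seconds')
                             order by seq_id
                             limit @size";

        var page = new EventPage();
        using (var cmd = new NpgsqlCommand(sql, conn))
        {
            cmd.Parameters.AddWithValue("last", lastEncountered);
            cmd.Parameters.AddWithValue("size", pageSize);
            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    page.SequenceNumbers.Add(reader.GetInt64(0));
                }
            }
        }
        return page;
    }

    public static EventPage FetchWithGapRetry(NpgsqlConnection conn, long lastEncountered, int pageSize)
    {
        EventPage previous = null;
        while (true)
        {
            var page = FetchPage(conn, lastEncountered, pageSize);

            // Step 2: accept the page when it is contiguous, or when a retry
            // returns exactly the same data (the missing sequence numbers are
            // assumed to belong to rejected / rolled back events).
            if (page.HasNoGaps(lastEncountered)) return page;
            if (previous != null &&
                page.SequenceNumbers.SequenceEqual(previous.SequenceNumbers)) return page;

            previous = page;
            Thread.Sleep(250); // pause a little bit before trying again
        }
    }
}
```

The important property is that the daemon never advances its “last encountered” number past a gap until it is confident the gap will never be filled.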
As far as I can tell, those two steps have eliminated the problems I was seeing with missing in-flight events. It did completely ruin a family dinner at our favorite Thai restaurant when I couldn’t make myself stop thinking about how to slide around the problems in event ordering ;)
The other killer problem was in trying to make the async daemon resilient in the face of potential connectivity problems and occasional projection failures without losing any results. I’ll try to blog about that in a later post.
You know what they say: “If at first you don’t succeed, Thai, Thai, Thai again.”
Thanks for the post. Informative.
Was thinking about the gap problem just the other day. I think having a single transactor would help here as it could guarantee order without gaps and potentially signal the daemon to start read model projections.
I believe Datomic takes a similar approach although most likely for different reasons.
Thoughts?
I’ve approached a similar problem building an event store in Elixir using PostgreSQL as the underlying storage.[1] Currently I’ve gone for using a `bigint` event id column and a single writer process that assigns an incrementing identifier to each event on write. This limits throughput but is simple to implement and reliable as individual processes in Elixir are single threaded.
PostgreSQL sequences may contain gaps, which can cause issues if you assume a contiguous sequence and don’t expect any gaps. A potential solution is to use a separate table with a single row containing the next id, and a function that reads this value, increments it, and uses it as the event id, all within a single transaction. With concurrent writes you then need to aggregate the written events, ordered by their id, when consuming.
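A rough sketch of that single-row counter pattern (shown here in C# with Npgsql purely for illustration; the event_counter and events table names are made up):

```csharp
using Npgsql;

public static class GaplessAppender
{
    // Claims the next id and writes the event in the same transaction, so a
    // rollback never consumes an id and the committed ids stay gapless.
    public static long AppendEvent(NpgsqlConnection conn, string body)
    {
        using (var tx = conn.BeginTransaction())
        {
            long id;

            // The row lock taken by this update serializes concurrent writers.
            using (var claim = new NpgsqlCommand(
                "update event_counter set next_id = next_id + 1 returning next_id", conn, tx))
            {
                id = (long)claim.ExecuteScalar();
            }

            using (var insert = new NpgsqlCommand(
                "insert into events (id, body) values (@id, @body)", conn, tx))
            {
                insert.Parameters.AddWithValue("id", id);
                insert.Parameters.AddWithValue("body", body);
                insert.ExecuteNonQuery();
            }

            tx.Commit();
            return id;
        }
    }
}
```

The row lock on the counter is also what limits throughput: only one writer can claim an id at a time, which is the same trade-off as the single writer process above.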
An optimal solution is to use PostgreSQL’s logical decoding feature[3]. This “provides infrastructure to stream the modifications performed via SQL to external consumers”, which is ideal for the projection use case. Unfortunately it requires writing and installing a C extension in the database engine. Here’s an example plugin that streams PostgreSQL changes to Kafka[4].
[1] https://github.com/slashdotdash/eventstore
[2] http://www.varlena.com/GeneralBits/130.php
[3] https://www.postgresql.org/docs/9.5/static/logicaldecoding.html
[4] https://github.com/confluentinc/bottledwater-pg
WRT “NOTIFY/LISTEN in Postgresql was not adequate by itself” could you elaborate for those of us unfamiliar as to why?
A set of libraries for event sourcing and CQRS that my team built and has been using for the last few years dealt with these same problems and arrived at a similar design. The bottleneck that we hit next resulted from the global incrementing integer for all cursors: map-type projections can’t be run in parallel. While projection type X and projection type Y can advance in parallel on different machines, two different instances of projection X have to proceed serially. Read model database rebuild time went up steadily as a result.
We approached the problem from a different angle with our new projection library, which allows streams (along with the concept of what constitutes the order of data in a stream) to be defined by the projection code rather than wedded to the data schema. This enables parallelization of the projections and throughput that scales with the number of machines you want to throw at the problem, at least until your database runs out of connections. If it’s of interest, that library is here: https://github.com/jonsequitur/Alluvial.
I will definitely take a look, thanks!