The code shown in this post isn’t quite out on Nuget yet, but will be part of Marten v0.9 later this week when I catch up on documentation.
From the very beginning, we’ve always intended to use Marten and Postgresql’s JSONB support as the technical foundation for event sourcing persistence. The document database half of Marten has been the more urgent need for my company, but lately we have a couple project teams and several folks in our community interested in seeing the event store functionality proceed. Between a couple substantial pull requests and me finally getting some time to work on it, I think we finally have a simple event store implementation that’s ready for early adopters to pull down and kick the tires on.
Why a new Event Store?
NEventStore has been around for years, and we do use an older version of it on one of our big applications. It’s working, but I’m not a big fan of how it integrates into the rest of our architecture, and it doesn’t provide any support for “readside” projections. Part of the rationale behind Marten’s event store design is the belief that it could eliminate a lot of the hand-coded projection support in a big application.
There’s also GetEventStore as a standalone, language agnostic event store based on an HTTP protocol. While I like many elements of GetEventStore and am taking a lot of inspiration from that tool for Marten’s event store usage, we prefer to have our event sourcing based on the Postgresql database instead of a standalone store.
Why, you ask?
- Since we’re already going to be using Postgresql, this leads to fewer things to deploy and monitor
- We’ll be able to just use all the existing DevOps tools for Postgresql like replication and backups without having to bring in anything new
- Building on a much larger community and a more proven technical foundation
- Being able to do event capture, document database changes, and potentially just raw SQL commands inside of the same native database transaction. No application in our portfolio strictly uses event sourcing as the sole persistence mechanism, and I think this makes Marten’s event sourcing feature compelling.
- The projection documents published by Marten’s event store functionality are just more document types and you’ll have the full power of Marten’s Linq querying, compiled querying, and batched querying to fetch the readside data from the event storage.
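To make the last two points concrete, here’s a rough sketch of mixing event capture and document storage in one unit of work. The `Monster` document type here is purely hypothetical for illustration; only the session calls reflect the usage shown later in this post:

```csharp
// A hypothetical document type stored alongside the event streams
public class Monster
{
    public Guid Id { get; set; }
    public string Name { get; set; }
}

using (var session = store.OpenSession())
{
    // Append an event to an existing quest stream...
    session.Events.Append(questId, new MembersJoined(20, "Moria", "Gimli"));

    // ...and store a plain document in the same session
    session.Store(new Monster { Name = "Balrog" });

    // Both changes are committed in the same native
    // Postgresql transaction
    session.SaveChanges();
}
```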
Getting Started with Simple Event Capture
Marten is just a .Net client library (targeting .Net 4.6 at the moment), so all you need to get started is access to a Postgresql 9.4/9.5 database and to install Marten via Nuget.
Because I’ve read way too many epic fantasy series, my sample problem domain is an application that records, analyzes, and visualizes the status of quests. During a quest, you may want to record events like:
- QuestStarted
- MembersJoined
- MembersDeparted
- ArrivedAtLocation
With Marten, you would want to describe these events with simple, serializable DTO classes like this:
public class MembersJoined
{
    public MembersJoined()
    {
    }

    public MembersJoined(int day, string location, params string[] members)
    {
        Day = day;
        Location = location;
        Members = members;
    }

    public int Day { get; set; }
    public string Location { get; set; }
    public string[] Members { get; set; }

    public override string ToString()
    {
        return $"Members {Members.Join(", ")} joined at {Location} on Day {Day}";
    }
}
Now that we have some event classes identified and built, we can start to store a “stream” of events for a given quest as shown in this code below:
var store = DocumentStore.For("your connection string");

var questId = Guid.NewGuid();

using (var session = store.OpenSession())
{
    var started = new QuestStarted { Name = "Destroy the One Ring" };
    var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry");

    // Start a brand new stream and commit the new events as
    // part of a transaction
    session.Events.StartStream(questId, started, joined1);
    session.SaveChanges();

    // Append more events to the same stream
    var joined2 = new MembersJoined(3, "Buckland", "Merry", "Pippen");
    var joined3 = new MembersJoined(10, "Bree", "Aragorn");
    var arrived = new ArrivedAtLocation { Day = 15, Location = "Rivendell" };
    session.Events.Append(questId, joined2, joined3, arrived);
    session.SaveChanges();
}
Now, if we want to fetch back all of the events for our new quest, we can do that with:
using (var session = store.OpenSession())
{
    // events are an array of little IEvent objects
    // that contain both the actual event object captured
    // previously and metadata like the Id and stream version
    var events = session.Events.FetchStream(questId);
    events.Each(evt =>
    {
        Console.WriteLine($"{evt.Version}.) {evt.Data}");
    });
}
When I execute this code, this is the output that I get:
1.) Quest Destroy the One Ring started
2.) Members Frodo, Merry joined at Hobbiton on Day 1
3.) Members Merry, Pippen joined at Buckland on Day 3
4.) Members Aragorn joined at Bree on Day 10
5.) Arrived at Rivendell on Day 15
At this point, Marten is assigning a Guid id, timestamp, and version number to each event (but thanks to a recent pull request, you do not have to have an Id property or field on your event classes). I didn’t show it here, but you can also fetch all of the events for a stream by the version number or timestamp to perform historical state queries.
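As a sketch of what those historical state queries might look like — the exact parameter names here are my assumptions, so check the documentation for your Marten version:

```csharp
using (var session = store.OpenSession())
{
    // The quest stream as of version 3, i.e. only
    // the first three events
    var earlyEvents = session.Events.FetchStream(questId, version: 3);

    // Or the stream as of a point in time
    var yesterday = session.Events.FetchStream(
        questId, timestamp: DateTime.UtcNow.AddDays(-1));
}
```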
Projection Support
What I’m showing in this section is brand spanking new and isn’t out on Nuget yet, but will be released as Marten v0.9 by the middle of this week (after I get the documentation updated).
So raw event streams might be useful to some of you, but we think that Marten will be most useful when you combine the raw event sourcing with “projections” that create parallel “readside” views of the event data suitable for consumption in the rest of your application for concerns like validation, business logic, or supplying data through HTTP services.
Let’s say that we need a view of our quest data just to see what the current member composition of our quest party is. In Marten’s event store usage, you would create an “aggregate” document class and teach Marten how to update that aggregate based on event data types. The easiest way to define the aggregation right now is to expose public “Apply([event type])” methods like the QuestParty class shown below:
public class QuestParty
{
    private readonly IList<string> _members = new List<string>();

    public string[] Members
    {
        get { return _members.ToArray(); }
        set
        {
            _members.Clear();
            _members.AddRange(value);
        }
    }

    public IList<string> Slayed { get; } = new List<string>();

    public void Apply(MembersJoined joined)
    {
        _members.Fill(joined.Members);
    }

    public void Apply(MembersDeparted departed)
    {
        _members.RemoveAll(x => departed.Members.Contains(x));
    }

    public void Apply(QuestStarted started)
    {
        Name = started.Name;
    }

    public string Name { get; set; }
    public Guid Id { get; set; }

    public override string ToString()
    {
        return $"Quest party '{Name}' is {Members.Join(", ")}";
    }
}
Without any further configuration, I could create a live aggregation of a given quest stream calculated on the fly by using this syntax:
using (var session = store.OpenSession())
{
    var party = session.Events.AggregateStream<QuestParty>(questId);
    Console.WriteLine(party);
}
When I execute the code above for our new “Destroy the One Ring” stream of events, this is the output:
Quest party 'Destroy the One Ring' is Frodo, Merry, Pippen, Aragorn
Great, but since it’s potentially expensive to calculate the QuestParty from large streams of events, we may opt to have the aggregated view built in our Marten database ahead of time. Let’s say that we’re okay with just having the aggregate view updated every time a quest event stream is appended to. For that case, you can register an “inline” aggregation in your DocumentStore initialization with this code:
var store2 = DocumentStore.For(_ =>
{
    _.Events.AggregateStreamsInlineWith<QuestParty>();
});
With the configuration above, the “QuestParty” view of a stream of related quest events is updated as part of the transaction that is persisting the new events being appended to the event log. If we were using this configuration, then we would be able to query Marten for the “QuestParty” as just another document type:
using (var session = store.OpenSession())
{
    var party = session.Load<QuestParty>(questId);
    Console.WriteLine(party);

    // or

    var party2 = session.Query<QuestParty>()
        .Where(x => x.Name == "Destroy the One Ring")
        .FirstOrDefault();
}
Our Projections “Vision”
A couple years ago, I got to do what turned into a proof of concept project for building out an event store on top of Postgresql’s JSON support. My thought for Marten’s projection support is largely taken from this blog post I wrote on the earlier attempt at writing an event store on Postgresql.
Today the projection support is very limited. So far you can use the live or “inline” aggregation of a single stream shown above, or a simple pattern that allows you to create a single readside document for a given event type.
The end state we envision is to be able to allow users to:
- Express projections in either .Net code or by using Javascript functions running inside of Postgresql itself
- To execute the projection building either “inline” with event capture for pure ACID consistency, asynchronously for complicated aggregations or better performance (bringing eventual consistency back into our lives), or “live” on demand. We think this breakdown of projection timings will let users handle systems with many writes but few reads via on-demand projections, and systems with few writes but many reads via inline projections.
- To provide an out-of-the-box “async daemon” that you would host as a stateful process within your applications to continuously calculate projections in the background. We want to at least experiment with using Postgresql’s NOTIFY/LISTEN functionality to avoid making this a purely polling process.
- Support hooks to perform your own form of event stream processing using the existing IDocumentSessionListener mechanism and maybe some way to plug more processors into the queue reading in the async daemon described above
- Add some “snapshotting” functionality that allows you to perform aggregated views on top of occasional snapshots every X times an event is captured on an aggregate
- Aggregate data across streams
- Support arbitrary categorization of events across streams
Anything I didn’t cover that you’re wondering about — or if you just want to give us some “constructive feedback” — please feel free to pop into Marten’s Gitter room, since we’re talking about this today anyway ;)
What DevOps tools for Postgresql do you use for replication and backups? Looking to replace MS-Sql Server with Postgresql.
I found a bug in your code — do I get street cred?
> var joined1 = new MembersJoined(1, “Hobbiton”, “Frodo”, “Merry”);
Heh, I fixed that in the docs somewhere, but I had to look up “Buckland.”