Marten 5.8 dropped over the weekend with mostly bug fixes, plus one potentially useful new feature for projecting event data to plain old SQL tables. One of the strengths of Marten that we’ve touted from the beginning is the ability to mix document database features with event sourcing and old-fashioned relational tables, all within one database in a single application, as your needs dictate.
Let’s dive right into a sample usage of this. If you’re a software developer long enough and move around just a little bit, you’re going to get sucked into building a workflow for importing flat files of dubious quality from external partners or customers. I’m going to claim that event sourcing is a good fit for this problem domain (and I’m also suggesting this pretty strongly at work). That being said, here’s what the event types that record the progress of a file import might look like:
    public record ImportStarted(
        DateTimeOffset Started,
        string ActivityType,
        string CustomerId,
        int PlannedSteps);

    public record ImportProgress(
        string StepName,
        int Records,
        int Invalids);

    public record ImportFinished(DateTimeOffset Finished);

    public record ImportFailed;
At some point, we’re going to want to apply some metrics to the execution history to understand the average size of the incoming files, what times of the day have more or less traffic, and performance information broken down by file size, file type, and who knows what. This sounds to me like a perfect use case for SQL queries against a flat table.
Enter Marten 5.8’s new functionality. First off, let’s do this simply by writing some explicit SQL in a new projection that we can replay against the existing events when we’re ready. I’m going to use Marten’s EventProjection as a base class in this case:
    using System;
    using Marten;
    using Marten.Events;
    using Marten.Events.Projections;
    using Weasel.Postgresql.Tables;

    public class ImportSqlProjection: EventProjection
    {
        public ImportSqlProjection()
        {
            // Define the table structure here so that
            // Marten can manage this for us in its schema
            // management
            var table = new Table("import_history");
            table.AddColumn<Guid>("id").AsPrimaryKey();
            table.AddColumn<string>("activity_type").NotNull();
            table.AddColumn<DateTimeOffset>("started").NotNull();
            table.AddColumn<DateTimeOffset>("finished");

            SchemaObjects.Add(table);

            // Telling Marten to delete the table data as the
            // first step in rebuilding this projection
            Options.DeleteDataInTableOnTeardown(table.Identifier);
        }

        // Insert one row per stream when an import starts
        public void Project(IEvent<ImportStarted> e, IDocumentOperations ops)
        {
            ops.QueueSqlCommand("insert into import_history (id, activity_type, started) values (?, ?, ?)",
                e.StreamId, e.Data.ActivityType, e.Data.Started
            );
        }

        // Stamp the finished time on the existing row
        public void Project(IEvent<ImportFinished> e, IDocumentOperations ops)
        {
            ops.QueueSqlCommand("update import_history set finished = ? where id = ?",
                e.Data.Finished, e.StreamId
            );
        }

        // Remove the row entirely for a failed import
        public void Project(IEvent<ImportFailed> e, IDocumentOperations ops)
        {
            ops.QueueSqlCommand("delete from import_history where id = ?", e.StreamId);
        }
    }
A couple notes about the code above:
- We’ve invested a huge amount of time in Marten and the related Weasel library building in robust schema management. The Table model I’m using up above comes from Weasel, and this allows a Marten application using this projection to manage the table creation in the underlying database for us. This new table would be part of all of Marten’s built-in schema management functionality.
- The QueueSqlCommand() functionality came in a couple minor releases ago, and gives you the ability to add raw SQL commands to be executed as part of a Marten unit of work transaction. It’s important to note that the QueueSqlCommand() method doesn’t execute inline; rather, it adds the SQL you enqueue to be executed in a batch query when you eventually call the holding IDocumentSession.SaveChangesAsync(). I can’t stress this enough: it has consistently been a big performance gain in Marten to batch up queries to the database server and reduce the number of network round trips.
- The Project() methods are a naming convention with Marten’s EventProjection. The first argument is always assumed to be the event type. In this case though, it’s legal to use Marten’s IEvent<T> envelope type to allow you access to event metadata like timestamps, version information, and the containing stream identity.
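To make the batching behavior concrete, here is a minimal sketch of registering the projection and appending an event. It assumes the ImportSqlProjection and ImportStarted types from above are in the same project; the connection string and the sample event values are placeholders, and the choice of the Inline lifecycle is only for illustration (an Async registration would defer the SQL to the async daemon instead).

```csharp
using System;
using Marten;
using Marten.Events.Projections;

// Hypothetical connection string; substitute your own
const string connectionString =
    "Host=localhost;Database=imports;Username=postgres;Password=postgres";

using var store = DocumentStore.For(opts =>
{
    opts.Connection(connectionString);

    // Registering the projection is what lets Marten manage
    // the import_history table alongside its own schema objects
    opts.Projections.Add(new ImportSqlProjection(), ProjectionLifecycle.Inline);
});

using var session = store.LightweightSession();

// Nothing is sent to the database yet; the event is only queued
var streamId = Guid.NewGuid();
session.Events.StartStream(streamId,
    new ImportStarted(DateTimeOffset.UtcNow, "orders", "customer-1", 3));

// With an inline projection, the event append and the queued
// insert into import_history are committed together here
await session.SaveChangesAsync();
```

Because the projected SQL rides along with the unit of work, the row in import_history should never be visible without its ImportStarted event, or the reverse.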
Now, let’s use Marten’s brand new FlatTableProjection recipe to build a slightly more advanced version of the earlier projection:
    using System;
    using Marten.Events.Projections.Flattened;

    public class FlatImportProjection: FlatTableProjection
    {
        // I'm telling Marten to use the same database schema as the events from
        // the Marten configuration in this application
        public FlatImportProjection() : base("import_history", SchemaNameSource.EventSchema)
        {
            // We need to explicitly add a primary key
            Table.AddColumn<Guid>("id").AsPrimaryKey();

            TeardownDataOnRebuild = true;

            Project<ImportStarted>(map =>
            {
                // Set values in the table from the event
                map.Map(x => x.ActivityType).NotNull();
                map.Map(x => x.CustomerId);
                map.Map(x => x.PlannedSteps, "total_steps")
                    .DefaultValue(0);
                map.Map(x => x.Started);

                // Initial values
                map.SetValue("status", "started");
                map.SetValue("step_number", 0);
                map.SetValue("records", 0);
            });

            Project<ImportProgress>(map =>
            {
                // Add 1 to this column when this event is encountered
                map.Increment("step_number");

                // Update a running sum of records progressed
                // by the number of records on this event
                map.Increment(x => x.Records);

                map.SetValue("status", "working");
            });

            Project<ImportFinished>(map =>
            {
                map.Map(x => x.Finished);
                map.SetValue("status", "completed");
            });

            // Just gonna delete the record of any failures
            Delete<ImportFailed>();
        }
    }
A couple notes on this version of the code:
- FlatTableProjection is adding columns to its table based on the designated column mappings. You can happily customize the FlatTableProjection.Table object to add indexes, constraints, or defaults.
- Marten is able to apply schema migrations and manage the table from the FlatTableProjection as long as it’s registered with Marten.
- When you call Map(x => x.ActivityType), Marten is by default mapping that to a snake-cased derivation of the member name for the column, so “activity_type”. You can explicitly map the column name yourself.
- The call to Map(expression) chains a fluent builder for the table column if you want to further customize the table column with default values or constraints like NotNull().
- In this case, I’m building a database row per event stream. The FlatTableProjection can also map to arbitrary members of each event type.
- The Project<T>(lambda) configuration leads to runtime code generation of a PostgreSQL upsert command so as to not be completely dependent upon events being captured in the exact right order. I think this will be more robust in real-life usage than the first, more explicit version.
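With either projection populating import_history, the metrics mentioned earlier become ordinary SQL. The following is a sketch only, using Npgsql directly rather than any Marten API: the ImportMetrics class and its method name are made up for illustration, the connection string is supplied by the caller, and the query assumes import_history is reachable on the connection’s search path (with SchemaNameSource.EventSchema the table may live in a non-default schema, in which case the name needs qualifying).

```csharp
using System;
using System.Threading.Tasks;
using Npgsql;

// Hypothetical helper, not part of Marten
public static class ImportMetrics
{
    public static async Task PrintAverageDurationsAsync(string connectionString)
    {
        // Average duration in seconds of completed imports, per activity type.
        // The cast keeps the result type stable across PostgreSQL versions.
        const string sql = @"
select activity_type,
       count(*) as imports,
       avg(extract(epoch from (finished - started)))::double precision as avg_seconds
from import_history
where finished is not null
group by activity_type
order by activity_type";

        await using var conn = new NpgsqlConnection(connectionString);
        await conn.OpenAsync();

        await using var cmd = new NpgsqlCommand(sql, conn);
        await using var reader = await cmd.ExecuteReaderAsync();

        while (await reader.ReadAsync())
        {
            var activityType = reader.GetString(0);
            var imports = reader.GetInt64(1);     // count(*) is a bigint
            var avgSeconds = reader.GetDouble(2); // cast to double precision above

            Console.WriteLine($"{activityType}: {imports} imports, {avgSeconds:F1}s on average");
        }
    }
}
```

The query only touches the activity_type, started, and finished columns, which both versions of the projection define, so it should work against either table shape.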
The FlatTableProjection in its first incarnation is not yet able to use event metadata because I got impatient to finish up 5.8 and punted on that for now. I think it’s safe to say this feature will evolve when it hits some real world usage.
Can we use event metadata today with MartenDB 7.x?
Marten has had direct support for event metadata for quite a while. See https://martendb.io/events/metadata.html#event-metadata
Yes, you are right, I forgot to mention that I meant in FlatTableProjection. Actually, we use Marten.Events.Projections.EventProjection to create the flat table, and for each event we use QueueSqlCommand to insert, update, or delete rows. Like:
    ops.QueueSqlCommand($"update flat_table set statut = ? where id = ?", "newStatut", e.StreamId);
Sometimes with id, sometimes I do the update where otherColumn = "…"
I tried to change the creation/modification of this flat_table to use FlatTableProjection, like this:
    Project<MyEvent>(map =>
    {
        map.SetValue("status", "newStatut");
    });
But I cannot find how to do the where clause.
Is it really this complicated?
Any answer to Mohamed’s question?
Also, how do you get access to the aggregate inside FlatImportProjection?
Hey all, please use Discord for questions like this. I don’t monitor comments here well enough that you can expect answers:
https://discord.gg/xtWfkv2U