
Through a combination of Marten community members and in collaboration with some JasperFx Software clients, we’re able to push some new fixes and functionality in Marten 7.34 just today.
For the F# Person in your Life
You can now use F# Option types in LINQ Where() clauses in Marten. Check out the pull request for that to see samples. The LINQ provider code is just a difficult problem domain, and I can’t tell you how grateful I am to have gotten the community pull request for this.
Fetch the Latest Aggregate
Marten has had the FetchForWriting() API for awhile now as our recommended way to build CQRS command handlers with Marten event sourcing as I wrote about recently in CQRS Command Handlers with Marten. Great, but…
- What if you just want a read only view of the current data for an aggregate projection over a single event stream and wouldn’t mind a lighter weight API than
FetchForWriting()? - What if in your command handler you used
FetchForWriting(), but now you want to return the now updated version of your aggregate projection for the caller of the command? And by the way, you want this to be as performant as possible no matter how the projection is configured.
Now you’re in luck, because Marten 7.34 adds the new FetchLatest() API for both of the bullets above.
Let’s pretend we’re building an invoicing system with Marten event sourcing and have this “self-aggregating” version of an Invoice:
public record InvoiceCreated(string Description, decimal Amount);
public record InvoiceApproved;
public record InvoiceCancelled;
public record InvoicePaid;
public record InvoiceRejected;
public class Invoice
{
public Invoice()
{
}
public static Invoice Create(IEvent<InvoiceCreated> created)
{
return new Invoice
{
Amount = created.Data.Amount,
Description = created.Data.Description,
// Capture the timestamp from the event
// metadata captured by Marten
Created = created.Timestamp,
Status = InvoiceStatus.Created
};
}
public int Version { get; set; }
public decimal Amount { get; set; }
public string Description { get; set; }
public Guid Id { get; set; }
public DateTimeOffset Created { get; set; }
public InvoiceStatus Status { get; set; }
public void Apply(InvoiceCancelled _) => Status = InvoiceStatus.Cancelled;
public void Apply(InvoiceRejected _) => Status = InvoiceStatus.Rejected;
public void Apply(InvoicePaid _) => Status = InvoiceStatus.Paid;
public void Apply(InvoiceApproved _) => Status = InvoiceStatus.Approved;
}
And for now, we’re going to let our command handlers just use a Live aggregation of the Invoice from the raw events on demand:
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
opts.Connection(builder.Configuration.GetConnectionString("marten"));
// Just telling Marten upfront that we will use
// live aggregation for the Invoice aggregate
// This would be the default anyway if you didn't explicitly
// register Invoice this way, but doing so let's
// Marten "know" about Invoice for code generation
opts.Projections.LiveStreamAggregation<Projections.Invoice>();
});
Now we can get at the latest, greatest, current view of the Invoice that is consistent with the captured events for that invoice stream at this very moment with this usage:
public static async Task read_latest(
// Watch this, only available on the full IDocumentSession
IDocumentSession session,
Guid invoiceId)
{
var invoice = await session
.Events.FetchLatest<Projections.Invoice>(invoiceId);
}
The usage of the API above would be completely unchanged if you were to switch the projection lifecycle of the Invoice to be either Inline (where the view is updated in the database at the same time new events are captured) or Async. That usage gives you a little bit of what we called “reversibility” in the XP days, which just means that you’re easily able to change your mind later about exactly what projection lifecycle you want to use for Invoice views.
The main reason that FetchLatest() was envisioned, however, was to pair up with FetchForWriting() in command handlers. It’s turned out to be a common use case that folks want their command handlers to:
- Use the current state of the projected aggregate for the event stream to…
- “Decide” what new events should be appended to this stream based on the incoming command and existing state of the aggregate
- Save the changes
- Return a now updated version of the projected aggregate for the event stream with the newly captured events reflected in the projected aggregate.
There is going to be a slicker integration of this with Wolverine’s aggregate handler workflow with Marten by early next week, but for now, let’s pretend we’re working with Marten from within maybe ASP.Net Minimal API and want to just work that way. Let’s say that we have a little helper method for a mini-“Decider” pattern implementation for our Invoice event streams like this one:
public static class MutationExtensions
{
public static async Task<Projections.Invoice> MutateInvoice(this IDocumentSession session, Guid id, Func<Projections.Invoice, IEnumerable<object>> decider,
CancellationToken token = default)
{
var stream = await session.Events.FetchForWriting<Projections.Invoice>(id, token);
// Decide what new events should be appended based on the current
// state of the aggregate and application logic
var events = decider(stream.Aggregate);
stream.AppendMany(events);
// Persist any new events
await session.SaveChangesAsync(token);
return await session.Events.FetchLatest<Projections.Invoice>(id, token);
}
}
Which could be used something like:
public static Task Approve(IDocumentSession session, Guid invoiceId)
{
// I'd maybe suggest taking the lambda being passed in
// here out somewhere where it's easy to test
// Wolverine does that for you, so maybe just use that!
return session.MutateInvoice(invoiceId, invoice =>
{
if (invoice.Status != InvoiceStatus.Approved)
{
return [new InvoiceApproved()];
}
return [];
});
}
New Marten System Level “Archived” Event
Much more on this soon with an example more end to end with Wolverine explaining how this would add value for performance and testability.
Marten now has a built in event named Archived that can be appended to any event stream:
namespace Marten.Events;
/// <summary>
/// The presence of this event marks a stream as "archived" when it is processed
/// by a single stream projection of any sort
/// </summary>
public record Archived(string Reason);
When this event is appended to an event stream and that event is processed through any type of single stream projection for that event stream (snapshot or what we used to call a “self-aggregate”, SingleStreamProjection, or CustomProjection with the AggregateByStream option), Marten will automatically mark that entire event stream as archived as part of processing the projection. This applies for both Inline and Async execution of projections.
Let’s try to make this concrete by building a simple order processing system that might include this aggregate:
public class Item
{
public string Name { get; set; }
public bool Ready { get; set; }
}
public class Order
{
// This would be the stream id
public Guid Id { get; set; }
// This is important, by Marten convention this would
// be the
public int Version { get; set; }
public Order(OrderCreated created)
{
foreach (var item in created.Items)
{
Items[item.Name] = item;
}
}
public void Apply(IEvent<OrderShipped> shipped) => Shipped = shipped.Timestamp;
public void Apply(ItemReady ready) => Items[ready.Name].Ready = true;
public DateTimeOffset? Shipped { get; private set; }
public Dictionary<string, Item> Items { get; set; } = new();
public bool IsReadyToShip()
{
return Shipped == null && Items.Values.All(x => x.Ready);
}
}
Next, let’s say we’re having the Order aggregate snapshotted so that it’s updated every time new events are captured like so:
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
opts.Connection("some connection string");
// The Order aggregate is updated Inline inside the
// same transaction as the events being appended
opts.Projections.Snapshot<Order>(SnapshotLifecycle.Inline);
// Opt into an optimization for the inline aggregates
// used with FetchForWriting()
opts.Projections.UseIdentityMapForAggregates = true;
})
// This is also a performance optimization in Marten to disable the
// identity map tracking overall in Marten sessions if you don't
// need that tracking at runtime
.UseLightweightSessions();
Now, let’s say as a way to keep our application performing as well as possible, we’d like to be aggressive about archiving shipped orders to keep the “hot” event storage table small. One way we can do that is to append the Archived event as part of processing a command to ship an order like so:
public static async Task HandleAsync(ShipOrder command, IDocumentSession session)
{
var stream = await session.Events.FetchForWriting<Order>(command.OrderId);
var order = stream.Aggregate;
if (!order.Shipped.HasValue)
{
// Mark it as shipped
stream.AppendOne(new OrderShipped());
// But also, the order is done, so let's mark it as archived too!
stream.AppendOne(new Archived("Shipped"));
await session.SaveChangesAsync();
}
}
If an Order hasn’t already shipped, one of the outcomes of that command handler executing is that the entire event stream for the Order will be marked as archived.









