
Wolverine has long been able to schedule messages to be executed locally at a later time or to be delivered at a later time. The Wolverine 4.12 release last night added some probably overdue test automation helpers for dealing with scheduled messaging in integration tests against Wolverine applications, and that makes now a perfectly good time to talk about this capability within Wolverine!
First, let’s say that we’re just using Wolverine locally within the current system with a setup like this:
    var builder = Host.CreateApplicationBuilder();
    builder.Services.AddWolverine(opts =>
    {
        // The only thing that matters here is that you have *some* kind of
        // envelope persistence for Wolverine configured for your application
        var connectionString = builder.Configuration.GetConnectionString("postgres");
        opts.PersistMessagesWithPostgresql(connectionString);
    });
The only point is that we have some kind of message persistence set up in our Wolverine application, because message and execution scheduling depends on persisted envelope storage.
Wolverine actually does support in-memory scheduling without any persistence, but that’s really only useful for scheduled error handling or fire-and-forget semantics because you’d lose everything if the process is stopped.
So now let’s move on to simply telling Wolverine to execute a message locally at a later time with the IMessageBus service:
    public static async Task use_message_bus(IMessageBus bus)
    {
        // Send a message to be sent or executed at a specific time
        await bus.SendAsync(new DebitAccount(1111, 100),
            new() { ScheduledTime = DateTimeOffset.UtcNow.AddDays(1) });

        // Same mechanics w/ some syntactical sugar
        await bus.ScheduleAsync(new DebitAccount(1111, 100), DateTimeOffset.UtcNow.AddDays(1));

        // Or do the same, but this time express the time as a delay
        await bus.SendAsync(new DebitAccount(1111, 225), new() { ScheduleDelay = 1.Days() });

        // And the same with the syntactic sugar
        await bus.ScheduleAsync(new DebitAccount(1111, 225), 1.Days());
    }
In the system above, all messages are being handled locally. To actually process the scheduled messages, Wolverine is, as you’ve probably guessed, polling the message storage (PostgreSQL in the case above) and looking for any messages that are ready to be played. Here are a few notes on the mechanics:
- Every node within a cluster is trying to pull in scheduled messages, but there’s some randomness in the timing to keep the nodes from stomping on each other
- Any one node will only pull in a limited “page” of scheduled jobs at a time so that if you happen to be going bonkers scheduling thousands of messages at one time, Wolverine can share the load across nodes and keep any one node from blowing up
- The scheduled messages are in Wolverine’s transactional inbox storage with a Scheduled status. When Wolverine decides to “play” the messages, they move to an Incoming status before finally getting marked as Handled when they are successful
- When scheduled messages for local execution are “played” in a Wolverine node, they are put into the local queue for that message, so all the normal rules for ordering or parallelization for that queue still apply.
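To make those mechanics a little more concrete, here’s a purely conceptual, in-memory sketch of paged polling with randomized timing and the status transitions. StoredEnvelope, EnvelopeStatus, and ScheduledJobPoller are hypothetical names made up for this illustration. Wolverine’s real implementation works against the database and differs in its details.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration only; these are not Wolverine's classes
public enum EnvelopeStatus { Scheduled, Incoming, Handled }

public class StoredEnvelope
{
    public StoredEnvelope(string body, DateTimeOffset executionTime)
    {
        Body = body;
        ExecutionTime = executionTime;
    }

    public string Body { get; }
    public DateTimeOffset ExecutionTime { get; }
    public EnvelopeStatus Status { get; set; } = EnvelopeStatus.Scheduled;
}

public class ScheduledJobPoller
{
    private readonly List<StoredEnvelope> _storage;
    private readonly int _pageSize;
    private readonly Random _random = new Random();

    public ScheduledJobPoller(List<StoredEnvelope> storage, int pageSize)
    {
        _storage = storage;
        _pageSize = pageSize;
    }

    // Add up to a second of random jitter so nodes don't all poll at the same moment
    public TimeSpan NextPollingDelay(TimeSpan baseline)
    {
        return baseline + TimeSpan.FromMilliseconds(_random.Next(0, 1000));
    }

    // Pull in at most one "page" of due messages and mark them as Incoming
    public IReadOnlyList<StoredEnvelope> PullDueMessages(DateTimeOffset now)
    {
        var page = _storage
            .Where(x => x.Status == EnvelopeStatus.Scheduled && x.ExecutionTime <= now)
            .OrderBy(x => x.ExecutionTime)
            .Take(_pageSize)
            .ToList();

        foreach (var envelope in page)
        {
            envelope.Status = EnvelopeStatus.Incoming;
        }

        return page;
    }

    // Once the handler succeeds, the envelope is marked as Handled
    public void MarkHandled(StoredEnvelope envelope)
    {
        envelope.Status = EnvelopeStatus.Handled;
    }
}
```

With a page size of 2 and three due messages, the first poll picks up the two oldest and leaves the third for the next poll (or for another node), which is the load-sharing idea described above.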
Now, let’s move on to scheduling message delivery to external brokers. Say you have some external routing rules like this:
    using var host = await Host.CreateDefaultBuilder()
        .UseWolverine(opts =>
        {
            opts.UseRabbitMq()
                // Opt into conventional Rabbit MQ routing
                .UseConventionalRouting();
        }).StartAsync();
And go back to the same syntax for sending messages, but this time the message will get routed to a Rabbit MQ exchange:
    await bus.ScheduleAsync(new DebitAccount(1111, 100), DateTimeOffset.UtcNow.AddDays(1));
This time, Wolverine is still using its transactional inbox, but with a twist. When Wolverine knows that it is scheduling message delivery to an outside messaging mechanism, it actually schedules a local ScheduledEnvelope message that, when executed, sends the original message to the outbound delivery point. In this way, Wolverine is able to support scheduled message delivery to every single messaging transport that Wolverine supports with a common mechanism.
With idiomatic Wolverine usage, you do want to try to keep most of your handler methods as “pure functions” for easier testing and frankly less code noise due to async/await mechanics. To that end, there are a couple of helpers to schedule messages in Wolverine using its cascading messages syntax:
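As a conceptual sketch only, the “schedule a local wrapper that forwards the real message later” idea looks something like the code below. ForwardLater, ForwardLaterHandler, IOutboundSender, and RecordingSender are hypothetical names invented for this illustration and are not Wolverine’s internal types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstraction over "some external transport"
public interface IOutboundSender
{
    void Send(object message, string destination);
}

// The locally scheduled wrapper that remembers where the real message is headed
public record ForwardLater(object Inner, string Destination, DateTimeOffset ExecutionTime);

public class ForwardLaterHandler
{
    private readonly IOutboundSender _sender;

    public ForwardLaterHandler(IOutboundSender sender)
    {
        _sender = sender;
    }

    // When the wrapper comes due locally, relay the original message to the transport
    public void Handle(ForwardLater wrapper)
    {
        _sender.Send(wrapper.Inner, wrapper.Destination);
    }
}

// A trivial sender that just records what it was asked to send
public class RecordingSender : IOutboundSender
{
    public List<(object Message, string Destination)> Sent { get; } = new List<(object, string)>();

    public void Send(object message, string destination)
    {
        Sent.Add((message, destination));
    }
}
```

Because the scheduling itself only ever deals with the local wrapper, the same mechanism works no matter which transport sits behind the sender.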
    public IEnumerable<object> Consume(MyMessage message)
    {
        // Go West in an hour
        yield return new GoWest().DelayedFor(1.Hours());

        // Go East at midnight local time
        yield return new GoEast().ScheduledAt(DateTime.Today.AddDays(1));
    }
The extension methods above give you the raw message wrapped in a Wolverine DeliveryMessage<T> object where T is the wrapped message type. You can still use that type to write assertions in your unit tests.
There’s also another helper called “timeout messages” that helps you create scheduled messages by subclassing a Wolverine base class. This is largely associated with sagas just because timing out saga workflows is such a common need.
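Here’s a minimal sketch of what such a unit test might look like, assuming xUnit and Shouldly. The MyMessageHandler class name and the empty message records are hypothetical. It also assumes that DeliveryMessage<T> exposes the wrapped message and its delivery options through Message and Options properties, and that the 1.Hours() extension method lives in the JasperFx.Core namespace, so verify both against your Wolverine version.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using JasperFx.Core; // assumed home of the 1.Hours() extension method
using Shouldly;
using Wolverine;
using Xunit;

// Hypothetical message types mirroring the handler above
public record MyMessage;
public record GoWest;
public record GoEast;

public class MyMessageHandler
{
    public IEnumerable<object> Consume(MyMessage message)
    {
        yield return new GoWest().DelayedFor(1.Hours());
        yield return new GoEast().ScheduledAt(DateTime.Today.AddDays(1));
    }
}

public class MyMessageHandlerTests
{
    [Fact]
    public void schedules_the_cascaded_messages()
    {
        // No Wolverine host is needed; the handler is just a plain method call
        var cascaded = new MyMessageHandler().Consume(new MyMessage()).ToArray();

        // Assumption: the Message and Options property names on DeliveryMessage<T>
        var goWest = cascaded.OfType<DeliveryMessage<GoWest>>().Single();
        goWest.Message.ShouldNotBeNull();
        goWest.Options.ScheduleDelay.ShouldBe(1.Hours());

        var goEast = cascaded.OfType<DeliveryMessage<GoEast>>().Single();
        goEast.Options.ScheduledTime.ShouldNotBeNull();
    }
}
```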
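As a rough sketch, a timeout message is a record that inherits from Wolverine’s TimeoutMessage base type and hands the delay to the base constructor. OrderTimeout, StartOrder, and StartOrderHandler are hypothetical names for illustration, and the JasperFx.Core namespace for 1.Minutes() is an assumption to check against your Wolverine version.

```csharp
using JasperFx.Core; // assumed home of the 1.Minutes() extension method
using Wolverine;

// Hypothetical timeout message; the delay is baked into the message type itself
public record OrderTimeout(string Id) : TimeoutMessage(1.Minutes());

// Hypothetical command that kicks off the workflow
public record StartOrder(string Id);

public static class StartOrderHandler
{
    // Returning the timeout as a cascading message should let Wolverine
    // schedule it using the delay declared on the message type
    public static OrderTimeout Handle(StartOrder command)
    {
        return new OrderTimeout(command.Id);
    }
}
```

The nice part of this style is that the handler stays a pure function, and the delay lives in exactly one place.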
Error Handling
The scheduled message support is also useful in error handling. Consider this code:
    using var host = await Host.CreateDefaultBuilder()
        .UseWolverine(opts =>
        {
            opts.Policies.OnException<TimeoutException>().ScheduleRetry(5.Seconds());
            opts.Policies.OnException<SecurityException>().MoveToErrorQueue();

            // You can also apply an additional filter on the
            // exception type for finer grained policies
            opts.Policies
                .OnException<SocketException>(ex => ex.Message.Contains("not responding"))
                .ScheduleRetry(5.Seconds());
        }).StartAsync();
In the case above, Wolverine uses message scheduling to take a message that just failed, move it out of the current receiving endpoint so other messages can proceed, then retry it no sooner than 5 seconds later (the timing won’t be perfectly precise). This is an important difference from the RetryWithCooldown() mechanism, which is effectively just doing an await Task.Delay(timespan) inline to purposely slow down the application.
As an example of how this might be useful, I’ve had to work with 3rd party systems where users can create a pessimistic lock on a bank account, so any commands against that account would always fail because of that lock. If you can tell that the command failure is because of a pessimistic lock in the exception message, you might tell Wolverine to retry that message an hour later when hopefully the lock is released, but clear out the current receiving endpoint and/or queue for other work that can proceed.
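That lock scenario could be expressed with the same policy syntax shown earlier. In this sketch, AccountCommandException and the "locked" message text are hypothetical stand-ins for whatever the 3rd party client actually throws, and the Wolverine.ErrorHandling and JasperFx.Core namespaces are assumptions to check against your Wolverine version.

```csharp
using System;
using JasperFx.Core; // assumed home of the 1.Hours() extension method
using Microsoft.Extensions.Hosting;
using Wolverine;
using Wolverine.ErrorHandling; // assumed home of the OnException() policies

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        // Only the failures caused by the lock are rescheduled; the message
        // leaves the receiving endpoint so other work can proceed
        opts.Policies
            .OnException<AccountCommandException>(ex => ex.Message.Contains("locked"))
            .ScheduleRetry(1.Hours());
    }).StartAsync();

// Hypothetical exception type thrown by the 3rd party account client
public class AccountCommandException : Exception
{
    public AccountCommandException(string message) : base(message)
    {
    }
}
```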
Testing with Scheduled Messaging
We’re having some trouble with the documentation publishing for some reason that we haven’t figured out yet, but there will be docs soon on this new feature.
Finally, on to some new functionality! Wolverine 4.12 just added some improvements to Wolverine’s tracked session testing feature specifically to help you with scheduled messages.
First, for some background, let’s say you have these simple handlers:
    public static DeliveryMessage<ScheduledMessage> Handle(TriggerScheduledMessage message)
    {
        // This causes a message to be scheduled for delivery 5 minutes from now
        return new ScheduledMessage(message.Text).DelayedFor(5.Minutes());
    }

    public static void Handle(ScheduledMessage message) => Debug.WriteLine("Got scheduled message");
And now here is a test using the tracked session that shows the new first-class support for scheduled messaging:
    [Fact]
    public async Task deal_with_locally_scheduled_execution()
    {
        // In this case everything executes locally within a single host
        using var host = await Host.CreateDefaultBuilder()
            .UseWolverine(opts =>
            {
                opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "wolverine");
                opts.Policies.UseDurableInboxOnAllListeners();
            }).StartAsync();

        // Should finish cleanly, even though there's going to be a message that is scheduled
        // and doesn't complete
        var tracked = await host.SendMessageAndWaitAsync(new TriggerScheduledMessage("Chiefs"));

        // Here's how you can query against the messages that were detected to be scheduled
        tracked.Scheduled.SingleMessage<ScheduledMessage>()
            .Text.ShouldBe("Chiefs");

        // This API will try to play any scheduled messages immediately
        var replayed = await tracked.PlayScheduledMessagesAsync(10.Seconds());
        replayed.Executed.SingleMessage<ScheduledMessage>().Text.ShouldBe("Chiefs");
    }
And a similar test, but this time where the scheduled messages are being routed externally:
    var port1 = PortFinder.GetAvailablePort();
    var port2 = PortFinder.GetAvailablePort();

    using var sender = await Host.CreateDefaultBuilder()
        .UseWolverine(opts =>
        {
            opts.PublishMessage<ScheduledMessage>().ToPort(port2);
            opts.ListenAtPort(port1);
        }).StartAsync();

    using var receiver = await Host.CreateDefaultBuilder()
        .UseWolverine(opts =>
        {
            opts.ListenAtPort(port2);
        }).StartAsync();

    // Should finish cleanly
    var tracked = await sender
        .TrackActivity()
        .IncludeExternalTransports()
        .AlsoTrack(receiver)
        .InvokeMessageAndWaitAsync(new TriggerScheduledMessage("Broncos"));

    tracked.Scheduled.SingleMessage<ScheduledMessage>()
        .Text.ShouldBe("Broncos");

    var replayed = await tracked.PlayScheduledMessagesAsync(10.Seconds());
    replayed.Executed.SingleMessage<ScheduledMessage>().Text.ShouldBe("Broncos");
Here’s what’s new in the code above:
- ITrackedSession.Scheduled is a special collection of all the activity during the tracked session that led to messages being scheduled. You can use it to interrogate what scheduled messages resulted from the original activity.
- ITrackedSession.PlayScheduledMessagesAsync() will “play” all scheduled messages right now and return a new ITrackedSession for those messages. This method immediately executes any messages that were scheduled for local execution and tries to immediately send any messages that were scheduled for later delivery to external transports.
The new support in the existing tracked session feature further extends Wolverine’s already extensive test automation story. This new work was done at the behest of a JasperFx Software client who is quite aggressive in their test automation. Certainly reach out to us at sales@jasperfx.net for any help you might want with your own efforts!