Wolverine’s Improved Azure Service Bus Support

Wolverine 1.6.0 came out today, and one of its main themes was a series of improvements to the Azure Service Bus integration. In addition to the basic support Wolverine already had for messaging with Azure Service Bus queues, topics, and subscriptions, you can now use native scheduled delivery, session identifiers for FIFO delivery, and expanded options for conventional routing topology.

First though, to get started with Azure Service Bus and Wolverine, install the WolverineFx.AzureServiceBus NuGet package with the mechanism of your choice:

dotnet add package WolverineFx.AzureServiceBus

Next, you’ll add just a little bit to your Wolverine bootstrapping like this:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine((context, opts) =>
    {
        // One way or another, you're probably pulling the Azure Service Bus
        // connection string out of configuration
        var azureServiceBusConnectionString = context
            .Configuration
            .GetConnectionString("azure-service-bus");

        // Connect to the broker in the simplest possible way
        opts.UseAzureServiceBus(azureServiceBusConnectionString)
            .AutoProvision()
            .UseConventionalRouting();
    }).StartAsync();

Native Message Scheduling

You can now use native Azure Service Bus scheduled delivery within Wolverine without any explicit configuration beyond what you already do to connect to Azure Service Bus. Putting that into perspective, if you have a message type named ValidateInvoiceIsNotLate that is routed to an Azure Service Bus queue or subscription, you can use this feature:

public async Task SendScheduledMessage(IMessageContext bus, Guid invoiceId)
{
    var message = new ValidateInvoiceIsNotLate
    {
        InvoiceId = invoiceId
    };

    // Schedule the message to be processed in a certain amount
    // of time
    await bus.ScheduleAsync(message, 30.Days());

    // Schedule the message to be processed at a certain time
    await bus.ScheduleAsync(message, DateTimeOffset.Now.AddDays(30));
}
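For comparison, here is a minimal sketch of what "native" scheduled delivery looks like when you talk to the broker directly through the Azure.Messaging.ServiceBus client. This is not Wolverine's internal code, just an illustration of the broker feature being used. The class name, method name, and parameters are hypothetical placeholders.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper for illustration only; not part of Wolverine
public static class RawScheduledSendSketch
{
    // connectionString, queueName, and jsonBody are placeholders
    // supplied by the caller
    public static async Task<long> ScheduleAsync(
        string connectionString, string queueName, string jsonBody)
    {
        await using var client = new ServiceBusClient(connectionString);
        ServiceBusSender sender = client.CreateSender(queueName);

        var message = new ServiceBusMessage(jsonBody);

        // The broker itself holds the message until the enqueue time.
        // The returned sequence number identifies the scheduled message.
        return await sender.ScheduleMessageAsync(
            message, DateTimeOffset.UtcNow.AddDays(30));
    }
}
```

The point of the comparison is that the delay is held by Azure Service Bus itself rather than by the sending application.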

Native scheduled delivery also applies to scheduled retries in error handling if the endpoint is also Inline:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.Policies.OnException<TimeoutException>()
            // Just retry the message again on the
            // first failure
            .RetryOnce()

            // On the 2nd failure, put the message back into the
            // incoming queue to be retried later
            .Then.Requeue()

            // On the 3rd failure, retry the message again after a configurable
            // cool-off period. This schedules the message
            .Then.ScheduleRetry(15.Seconds())

            // On the next failure, move the message to the dead letter queue
            .Then.MoveToErrorQueue();

    }).StartAsync();

Topic & Subscription Conventions

The original conventional routing with Azure Service Bus just sent to and listened on queues named after the message types within the application. Wolverine 1.6 adds a second routing convention that publishes outgoing messages to topics and listens for known handled messages through topics and subscriptions. In all cases, you can customize the convention naming and any element of Wolverine’s listening or sending behavior, or any of the affected Azure Service Bus topics or subscriptions.

The syntax for this option is shown below:

opts.UseAzureServiceBusTesting()
    .UseTopicAndSubscriptionConventionalRouting(convention =>
    {
        // Optionally control every aspect of the convention and
        // its applicability to types
        // as well as overriding any listener, sender, topic, or subscription
        // options
    })

    .AutoProvision()
    .AutoPurgeOnStartup();

Session Identifiers and FIFO Delivery

You can now take advantage of sessions and first-in, first-out queues in Azure Service Bus with Wolverine. To tell Wolverine that an Azure Service Bus queue or subscription should require sessions, use the syntax below, taken from one of Wolverine’s internal tests:

_host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.UseAzureServiceBusTesting()
            .AutoProvision().AutoPurgeOnStartup();

        opts.ListenToAzureServiceBusQueue("send_and_receive");
        opts.PublishMessage<AsbMessage1>().ToAzureServiceBusQueue("send_and_receive");

        opts.ListenToAzureServiceBusQueue("fifo1")
            
            // Require session identifiers with this queue
            .RequireSessions()
            
            // This controls the Wolverine handling to force it to process
            // messages sequentially
            .Sequential();
        
        opts.PublishMessage<AsbMessage2>()
            .ToAzureServiceBusQueue("fifo1");

        opts.PublishMessage<AsbMessage3>().ToAzureServiceBusTopic("asb3");
        opts.ListenToAzureServiceBusSubscription("asb3")
            .FromTopic("asb3")
            
            // Require sessions on this subscription
            .RequireSessions(1)
            
            .ProcessInline();
    }).StartAsync();

Wolverine is using the “group-id” nomenclature from the AMQP standard, but for Azure Service Bus, this is mapped directly to the SessionId property on the Azure Service Bus client internally.
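To make that mapping concrete, here is a rough sketch of the equivalent operation against the raw Azure.Messaging.ServiceBus client, where the session identifier is just a property on the outgoing message. This is an illustration under my own assumptions, not Wolverine's actual implementation, and the class name, method name, and parameters are hypothetical.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper for illustration only; not part of Wolverine
public static class RawSessionSendSketch
{
    // connectionString, queueName, body, and groupId are placeholders
    // supplied by the caller
    public static async Task SendWithSessionAsync(
        string connectionString, string queueName, string body, string groupId)
    {
        await using var client = new ServiceBusClient(connectionString);
        ServiceBusSender sender = client.CreateSender(queueName);

        var message = new ServiceBusMessage(body)
        {
            // What Wolverine calls GroupId ends up here
            SessionId = groupId
        };

        await sender.SendMessageAsync(message);
    }
}
```

Messages that share a SessionId are delivered in order within that session, which is what makes the FIFO behavior possible on a session-enabled queue or subscription.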

To publish messages to Azure Service Bus with a session id, you will of course need to supply the session id:

// bus is an IMessageBus
await bus.SendAsync(new AsbMessage3("Red"), new DeliveryOptions { GroupId = "2" });
await bus.SendAsync(new AsbMessage3("Green"), new DeliveryOptions { GroupId = "2" });
await bus.SendAsync(new AsbMessage3("Refactor"), new DeliveryOptions { GroupId = "2" });

You can also send messages with session identifiers through cascading messages as shown in a fake message handler below:

public static IEnumerable<object> Handle(IncomingMessage message)
{
    yield return new Message1().WithGroupId("one");
    yield return new Message2().WithGroupId("one");

    yield return new Message3().ScheduleToGroup("one", 5.Minutes());

    // Long hand
    yield return new Message4().WithDeliveryOptions(new DeliveryOptions
    {
        GroupId = "one"
    });
}
