Wolverine Interoperability with Others

Wolverine is the relative newcomer on the scene for asynchronous messaging in the .NET ecosystem. While many Wolverine users are starting in greenfield circumstances, it’s far more likely that the shops most interested in Wolverine’s messaging support already have a significant number of existing systems communicating through other messaging infrastructure. And while I absolutely believe in Wolverine, there is likely no world in which it makes sense to replace every bit of existing messaging infrastructure all at once. Moreover, it’s also common to have systems built on entirely different platforms, or 3rd party systems, that communicate through message queueing.

All that said, Wolverine obviously needs a strong interoperability story to enable its adoption. Other than the interoperability with NServiceBus through Rabbit MQ that we needed at my previous company, I admittedly skimped a little on that in the original push to 1.0, as I inevitably started triaging user stories to make my self-imposed deadline for 1.0 this summer.

In the past couple of weeks, several folks were trying to use Wolverine to receive messages from external systems over various transports, so it turned into the perfect time to focus on improving Wolverine’s interoperability features in the recent Wolverine 1.7 release.

First, some links for more information:

Feel very free to skip down to the samples below that.

A Little Background

For just a little background, each messaging transport has a slightly different API for shuffling data between systems, but that mostly boils down to message body data and message metadata (headers). Wolverine (like other messaging tools) maps the specific messaging API of Rabbit MQ, Azure Service Bus, or AWS SQS into Wolverine’s internal Envelope representation. The message body itself is deserialized into the actual .NET message type, and the rest of that metadata helps Wolverine perform distributed tracing through correlation identifiers, “know” how to send replies back to the original sender, and even just know what the incoming message type is. That all works seamlessly when Wolverine is on both sides of the messaging pipe, but when interoperating with a non-Wolverine system you have to override Wolverine’s mapping between its Envelope model and the incoming and outgoing API for the underlying transport.

Fortunately, this mapping is either completely pluggable on an endpoint-by-endpoint basis, or you can now start with the built-in mapping from Wolverine and selectively override a subset of the metadata mappings.
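To make the body-plus-headers idea a little more concrete, here is a minimal sketch of what such a mapping boils down to. This is not Wolverine’s actual code: `SimpleEnvelope`, `SimpleEnvelopeMapper`, and the header names `message-type`, `correlation-id`, and `reply-to` are all hypothetical stand-ins that I made up for illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical, heavily simplified stand-in for Wolverine's Envelope.
// The real type carries far more metadata than this.
public sealed class SimpleEnvelope
{
    public byte[] Data { get; set; } = Array.Empty<byte>();
    public string? MessageType { get; set; }
    public string? CorrelationId { get; set; }
    public string? ReplyTo { get; set; }
}

public static class SimpleEnvelopeMapper
{
    // Copies the raw body and the headers we recognize onto an envelope.
    // defaultMessageType is the fallback for senders that supply no
    // message type header at all, e.g. a service publishing "just" JSON.
    public static SimpleEnvelope MapIncoming(
        byte[] body,
        IReadOnlyDictionary<string, string> headers,
        string? defaultMessageType = null)
    {
        var envelope = new SimpleEnvelope { Data = body };

        // Header names here are assumptions, not Wolverine's real keys
        envelope.MessageType = headers.TryGetValue("message-type", out var type)
            ? type
            : defaultMessageType;

        if (headers.TryGetValue("correlation-id", out var correlationId))
        {
            envelope.CorrelationId = correlationId;
        }

        if (headers.TryGetValue("reply-to", out var replyTo))
        {
            envelope.ReplyTo = replyTo;
        }

        return envelope;
    }
}
```

When the sender is another Wolverine application, every one of those headers is present. When the sender is something else entirely, some or all of them are missing or named differently, and that gap is what the interoperability settings in the rest of this post are filling in.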

Receive “Just” JSON via Rabbit MQ

A prospective Wolverine user reached out to us on Discord to tell us about trying to receive pure JSON messages from a service written in Python (hence the image up above). After some internal changes in Wolverine 1.7, you can now receive “just” JSON at a Rabbit MQ queue, as long as that queue will only ever receive one message type, by telling Wolverine what the default message type is like this:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine((context, opts) =>
    {
        var rabbitMqConnectionString = context.Configuration.GetConnectionString("rabbit");

        opts.UseRabbitMq(rabbitMqConnectionString);

        opts.ListenToRabbitQueue("emails")
            // Tell Wolverine to assume that all messages
            // received at this queue are the SendEmail
            // message type
            .DefaultIncomingMessage<SendEmail>();
    }).StartAsync();

Also see the documentation on Rabbit MQ interoperability.
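The `SendEmail` type itself isn’t shown above, so as a rough sketch, here is one hypothetical shape for it along with what deserializing a “just” JSON body looks like when nothing in the message says what type it is. The property names and the serializer options are my assumptions for illustration, and this uses System.Text.Json directly rather than going through Wolverine’s own pipeline.

```csharp
#nullable enable
using System;
using System.Text.Json;

// Hypothetical shape for the SendEmail message. The property names are
// assumptions for illustration only.
public record SendEmail(string To, string Subject, string Body);

public static class RawJsonExample
{
    // Assumed settings so that camelCase JSON from a non-.NET sender
    // binds to the PascalCase record properties.
    private static readonly JsonSerializerOptions Options = new()
    {
        PropertyNameCaseInsensitive = true
    };

    // With no message type header to go on, the target type has to be
    // known up front, which is what a per-queue default type provides.
    public static SendEmail Deserialize(byte[] body)
    {
        return JsonSerializer.Deserialize<SendEmail>(body, Options)
               ?? throw new InvalidOperationException("The message body was a JSON null");
    }
}
```

A Python service would only need to publish UTF-8 bytes for something like `{"to": "user@example.com", "subject": "Hi", "body": "Hello"}` to the queue, with no extra headers, and `RawJsonExample.Deserialize()` would turn that body into a `SendEmail`.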

Interop with AWS SQS

Wolverine has to work with AWS SQS in a much different way than the other transports. Via a pull request in Wolverine 1.7, you can now receive “just” JSON from external systems via AWS SQS like this:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.UseAmazonSqsTransport();

        opts.ListenToSqsQueue("incoming").ReceiveRawJsonMessage(
            // Specify the single message type for this queue
            typeof(Message1), 
            
            // Optionally customize System.Text.Json configuration
            o =>
            {
                o.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
            });
    }).StartAsync();

To send “just” JSON to external systems, use this:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.UseAmazonSqsTransport();

        opts.PublishAllMessages().ToSqsQueue("outgoing").SendRawJsonMessage(
            // Specify the single message type for this queue
            typeof(Message1), 
            
            // Optionally customize System.Text.Json configuration
            o =>
            {
                o.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
            });
    }).StartAsync();

For custom interoperability strategies using AWS SQS, you can create your own implementation of the `ISqsEnvelopeMapper` interface like this one:

public class CustomSqsMapper : ISqsEnvelopeMapper
{
    public string BuildMessageBody(Envelope envelope)
    {
        // Serialized data from the Wolverine message
        return Encoding.Default.GetString(envelope.Data);
    }

    // Specify header values for the SQS message from the Wolverine envelope
    public IEnumerable<KeyValuePair<string, MessageAttributeValue>> ToAttributes(Envelope envelope)
    {
        if (envelope.TenantId.IsNotEmpty())
        {
            yield return new KeyValuePair<string, MessageAttributeValue>("tenant-id", new MessageAttributeValue{StringValue = envelope.TenantId});
        }
    }

    public void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary<string, MessageAttributeValue> attributes)
    {
        envelope.Data = Encoding.Default.GetBytes(messageBody);

        if (attributes.TryGetValue("tenant-id", out var att))
        {
            envelope.TenantId = att.StringValue;
        }
    }
}

And apply that to Wolverine endpoints like this:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.UseAmazonSqsTransport()
            .UseConventionalRouting()

            .ConfigureListeners(l => l.InteropWith(new CustomSqsMapper()))

            .ConfigureSenders(s => s.InteropWith(new CustomSqsMapper()));

    }).StartAsync();

Interop with Azure Service Bus

You can interoperate with non-Wolverine applications over Azure Service Bus by writing a custom `IAzureServiceBusEnvelopeMapper`, as shown in the following sample:

public class CustomAzureServiceBusMapper : IAzureServiceBusEnvelopeMapper
{
    public void MapEnvelopeToOutgoing(Envelope envelope, ServiceBusMessage outgoing)
    {
        outgoing.Body = new BinaryData(envelope.Data);
        if (envelope.DeliverWithin != null)
        {
            outgoing.TimeToLive = envelope.DeliverWithin.Value;
        }
    }

    public void MapIncomingToEnvelope(Envelope envelope, ServiceBusReceivedMessage incoming)
    {
        envelope.Data = incoming.Body.ToArray();
        
        // You will have to help Wolverine out by either telling Wolverine
        // what the message type is, or by reading the actual message object,
        // or by telling Wolverine separately what the default message type
        // is for a listening endpoint
        envelope.MessageType = typeof(Message1).ToMessageTypeName();
    }

    public IEnumerable<string> AllHeaders()
    {
        yield break;
    }
}

And apply that to your endpoints like this:

using var host = await Host.CreateDefaultBuilder()
    .UseWolverine(opts =>
    {
        opts.UseAzureServiceBus("some connection string")
            .UseConventionalRouting()

            .ConfigureListeners(l => l.InteropWith(new CustomAzureServiceBusMapper()))

            .ConfigureSenders(s => s.InteropWith(new CustomAzureServiceBusMapper()));
    }).StartAsync();

Interop with NServiceBus via Rabbit MQ

The original production usage of Wolverine was replacing NServiceBus for one service within a large constellation of services that all communicated asynchronously through Rabbit MQ. Unsurprisingly, Wolverine launched with strong, fully functional interoperability between NServiceBus systems and Wolverine systems through Rabbit MQ. This usage is taken from a test project within the Wolverine codebase:

Wolverine = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
    opts.UseRabbitMq()
        .AutoProvision().AutoPurgeOnStartup()
        .BindExchange("wolverine").ToQueue("wolverine")
        .BindExchange("nsb").ToQueue("nsb")
        .BindExchange("NServiceBusRabbitMqService:ResponseMessage").ToQueue("wolverine");

    opts.PublishAllMessages().ToRabbitExchange("nsb")

        // Tell Wolverine to make this endpoint send messages out in a format
        // for NServiceBus
        .UseNServiceBusInterop();

    opts.ListenToRabbitQueue("wolverine")
        .UseNServiceBusInterop()
        .UseForReplies();
    
    // This facilitates messaging from NServiceBus (or MassTransit) sending as interface
    // types, whereas Wolverine only wants to deal with concrete types
    opts.Policies.RegisterInteropMessageAssembly(typeof(IInterfaceMessage).Assembly);
}).StartAsync();

Interop with MassTransit via Rabbit MQ

A little less battle tested, and much weirder under the covers, is a similar interoperability recipe for talking to MassTransit applications via Rabbit MQ:

Wolverine = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
    opts.ApplicationAssembly = GetType().Assembly;

    opts.UseRabbitMq()
        .CustomizeDeadLetterQueueing(new DeadLetterQueue("errors", DeadLetterQueueMode.InteropFriendly))
        .AutoProvision().AutoPurgeOnStartup()
        .BindExchange("wolverine").ToQueue("wolverine")
        .BindExchange("masstransit").ToQueue("masstransit");

    opts.PublishAllMessages().ToRabbitExchange("masstransit")

        // Tell Wolverine to make this endpoint send messages out in a format
        // for MassTransit
        .UseMassTransitInterop();

    opts.ListenToRabbitQueue("wolverine")

        // Tell Wolverine to make this endpoint interoperable with MassTransit
        .UseMassTransitInterop(mt =>
        {
            // optionally customize the inner JSON serialization
        })
        .DefaultIncomingMessage<ResponseMessage>().UseForReplies();
}).StartAsync();
