Producer/Consumer Pattern with Wolverine

This is going to be the start of a series of short blog posts to show off some Wolverine capabilities. You may very well point out that there are some architectural anti-patterns in the system I’m describing, and I would actually agree. I’ll try to address that as we go along, but just know this is based on a very real system and reflects real world compromises that I was unhappy about at that time. Onward.

Wolverine has a strong in-process queueing mechanism based on the TPL Dataflow library — a frequently overlooked gem in the .NET ecosystem. Today I want to show how that capability allows you to reliably implement the producer/consumer pattern for parallelizing work against some sort of constrained resource. Consider this system I helped design and build several years ago:
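To make the underlying idea concrete before we get to Wolverine itself, here's a minimal TPL Dataflow sketch of a consumer constrained to one-at-a-time execution. This is not Wolverine code, just an illustration of the mechanism Wolverine builds on (it needs the System.Threading.Tasks.Dataflow NuGet package):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// An ActionBlock is an in-process queue plus its consumer.
// MaxDegreeOfParallelism = 1 constrains the consumer to one
// message at a time -- exactly what a fragile, single-threaded
// 3rd party service requires.
var results = new List<string>();

var constrainedConsumer = new ActionBlock<string>(async loanId =>
{
    // Stand-in for the one-at-a-time call to the 3rd party service
    await Task.Delay(10);
    results.Add(loanId);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

// Producers can post work to the block from any number of threads
constrainedConsumer.Post("loan-1");
constrainedConsumer.Post("loan-2");

constrainedConsumer.Complete();
await constrainedConsumer.Completion;
```

With `MaxDegreeOfParallelism` left at 1 (and `EnsureOrdered` defaulting to true), the block drains its queue strictly in the order items were posted, no matter how many producers are feeding it.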

It’s a somewhat typical “ingestion” service that reads flat files representing payments against a series of client loans ultimately managed and tracked in what I called the “Very Unreliable 3rd Party Web Service” up above. Our application is responsible for reading, parsing, and validating the payments from a flat file that’s inevitably dropped on a secured FTP site (my least favorite mechanism for large scale integrations, but sadly a common one). Once the data is parsed, we need to implement this workflow for each payment:

  1. Validate the required data elements and reject any that are missing necessary information
  2. Retrieve information about the related accounts and loans from the 3rd party service into our service
  3. Do further validations and determine how the payment is to be applied to the loan (how much goes toward interest, fees, or principal)
  4. Persist an audit log of all the results
  5. Post the application of the payment to the 3rd party system
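The handlers later in this post pass a handful of message types between stages, but their definitions never appear. Here is a hypothetical sketch of what they might look like as C# records — the type names and the members the handlers actually touch (like `Payment.LoanId`) come from the code shown; everything else is my assumption:

```csharp
// Hypothetical domain types; exact shapes are assumptions
public record Payment(string LoanId, decimal Amount);
public record LoanInformation(string LoanId, decimal Balance);
public record PaymentSchedule(Payment Payment, decimal ToInterest, decimal ToFees, decimal ToPrincipal);

// The command/event messages referenced by the handlers in this post
public record FileDetected(string FileName);
public record PaymentValidated(Payment Payment);
public record PaymentRejected(Payment Payment, string[] Reasons);
public record SchedulePayment(Payment Payment, LoanInformation LoanInformation);
public record PostPaymentSchedule(PaymentSchedule Schedule);
```

Using records here buys value-based equality and immutability for free, which is a nice fit for messages that flow through queues.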

Now, let’s say that our application is receiving lots of these flat files throughout the day, and it’s actually important for the business that these files be processed quickly. But there’s a catch in our potential throughput: the 3rd party web service can only handle one read request and one transactional request at a time without becoming unstable because of its oddball threading model (this sounds contrived, but it was unfortunately true).

I’m going to say this is a perfect place to use the producer/consumer model, where we’ll:

  • Constrain the calls to the 3rd party web service to a dedicated activity thread that executes one at a time
  • Do the other work (parsing, validating, determining the payment schedule) in other activity threads where parallelization is possible

Admittedly starting in the middle of the process, let’s say that our Wolverine application detects a new file on the FTP site and publishes a new event to a message handler that will be the “Producer” creating individual payment workflows within our system:

    // This is a "producer" that is creating payment messages
    public static IEnumerable<object> Handle(
        FileDetected command, 
        IFileParser parser, 
        IPaymentValidator validator)
    {
        foreach (var payment in parser.ReadFile(command.FileName))
        {
            if (validator.TryValidate(payment, out var reasons))
            {
                yield return new PaymentValidated(payment);
            }
            else
            {
                yield return new PaymentRejected(payment, reasons);
            }
        }
    }

For each file, we’ll publish an individual message per payment so the downstream work can be parallelized. That leads to the next stage of work, looking up information from the 3rd party service:

    public static async Task<SchedulePayment> Handle(
        PaymentValidated @event, 
        IThirdPartyServiceGateway gateway,
        CancellationToken cancellation)
    {
        var information = await gateway.FindLoanInformationAsync(@event.Payment.LoanId, cancellation);
        return new SchedulePayment(@event.Payment, information);
    }

Note that the only work that particular handler is doing is calling the 3rd party service and handing off the results to yet another handler I’ll show next. Because the access to the 3rd party service is so constrained — and likely a performance bottleneck — we don’t want to be doing anything else in this handler but looking up data and passing it along.

On to the next handler, where we’ll do a bit of pure business logic to “schedule” the payment: determining how it will be applied to the loan in terms of what goes toward interest, fees, or principal on the loan balance.

    public static PostPaymentSchedule Handle(SchedulePayment command, PaymentSchedulerRules rules)
    {
        // Carry out a ton of business rules about how the payment
        // is scheduled against interest, fees, principal, and early payment penalties
        var schedule = rules.CreateSchedule(command.Payment, command.LoanInformation);
        return new PostPaymentSchedule(schedule);
    }

Finally, we’ll have one last handler to actually post the payment schedule to the 3rd party service:

    public static async Task Handle(
        PostPaymentSchedule command, 
        IThirdPartyServiceGateway gateway,
        CancellationToken cancellation)
    {
        // I'm leaving it async here because I'm going to add a lot more to this
        // in later posts
        await gateway.PostPaymentScheduleAsync(command.Schedule, cancellation);
    }

To rewind, we have these four command or event messages:

  1. FileDetected – can be processed in parallel
  2. PaymentValidated – has to be processed serially, one at a time
  3. SchedulePayment – can be processed in parallel
  4. PostPaymentSchedule – has to be processed serially, one at a time

Moving on to Wolverine setup: by default, each locally handled message type is published to its own local queue, with the parallelization set to the detected number of processors on the hosting server. We can happily override that to handle any number of message types in the same local queue, or to change the processing of an individual queue. In this case we’re going to change the message handling for PaymentValidated and PostPaymentSchedule to be processed sequentially:

    using Microsoft.Extensions.Hosting;
    using Oakton;
    using Wolverine;
    using WolverineIngestionService;

    return await Host.CreateDefaultBuilder()
        .UseWolverine(opts =>
        {
            // There'd obviously be a LOT more set up and service registrations
            // to be a real application

            // Force these two message types to be processed
            // sequentially, one message at a time
            opts.LocalQueueFor<PaymentValidated>()
                .Sequential();

            opts.LocalQueueFor<PostPaymentSchedule>()
                .Sequential();
        })
        .RunOaktonCommands(args);
And just like that, we’ve got a producer/consumer system that communicates through local queues with Wolverine. From the dotnet run -- describe diagnostics for this application, you can see the listeners:

│ Uri                                                    │ Name                                          │ Mode             │ Execution                                        │ Serializers                             │
│ local://default/                                       │ default                                       │ BufferedInMemory │ MaxDegreeOfParallelism: 16, EnsureOrdered: False │ NewtonsoftSerializer (application/json) │
│ local://durable/                                       │ durable                                       │ Durable          │ MaxDegreeOfParallelism: 16, EnsureOrdered: False │ NewtonsoftSerializer (application/json) │
│ local://replies/                                       │ replies                                       │ BufferedInMemory │ MaxDegreeOfParallelism: 16, EnsureOrdered: False │ NewtonsoftSerializer (application/json) │
│ local://wolverineingestionservice.filedetected/        │ wolverineingestionservice.filedetected        │ BufferedInMemory │ MaxDegreeOfParallelism: 16, EnsureOrdered: False │ NewtonsoftSerializer (application/json) │
│ local://wolverineingestionservice.paymentvalidated/    │ wolverineingestionservice.paymentvalidated    │ BufferedInMemory │ MaxDegreeOfParallelism: 1, EnsureOrdered: True   │ NewtonsoftSerializer (application/json) │
│ local://wolverineingestionservice.postpaymentschedule/ │ wolverineingestionservice.postpaymentschedule │ BufferedInMemory │ MaxDegreeOfParallelism: 1, EnsureOrdered: True   │ NewtonsoftSerializer (application/json) │
│ local://wolverineingestionservice.schedulepayment/     │ wolverineingestionservice.schedulepayment     │ BufferedInMemory │ MaxDegreeOfParallelism: 16, EnsureOrdered: False │ NewtonsoftSerializer (application/json) │

and the message routing:

│ Message Type                                  │ Destination                                            │ Content Type     │
│ WolverineIngestionService.FileDetected        │ local://wolverineingestionservice.filedetected/        │ application/json │
│ WolverineIngestionService.PaymentValidated    │ local://wolverineingestionservice.paymentvalidated/    │ application/json │
│ WolverineIngestionService.PostPaymentSchedule │ local://wolverineingestionservice.postpaymentschedule/ │ application/json │
│ WolverineIngestionService.SchedulePayment     │ local://wolverineingestionservice.schedulepayment/     │ application/json │

Summary and next time…

So far, most folks seem to be considering Wolverine as an in-process mediator tool, or less frequently as an asynchronous messaging tool between processes. However, Wolverine also has a strong model for asynchronous processing through its local queueing model. It’s not shown in this post, but the command/event/message handling is fully instrumented with logging, OpenTelemetry tracing, and all of Wolverine’s error handling capabilities.

Next time out, I’m going to use this same system to demonstrate some of Wolverine’s resiliency features that were absolutely inspired by my experiences building and supporting the system this post is based on.

