Building a Producer Consumer Queue with TPL Dataflow

I had never used the TPL Dataflow library until this summer and I was very pleasantly surprised at how easy and effective it was. 

In my last post I introduced the new “Async Daemon” feature in Marten that allows you to continuously update projected views over the event store as new events are captured in the system. In essence, the async daemon has to do two things:

  1. Fetch event data from the underlying Postgresql database and put it into the form that the projections and event processors expect
  2. Run the event data previously fetched through each projection or event processor and commit any projected document views back to the database.

Looking at it that way, the async daemon looks like a good fit for a producer/consumer queue. In this case, the event fetching “produces” batches of events for the projection “consumer” to process downstream. The goal of this approach is to improve overall throughput by allowing the fetching and processing to happen in parallel.

I had originally assumed that I would use Reactive Extensions for the async daemon, but after way too much research and dithering back and forth on my part, I decided that the TPL Dataflow library was a better fit in this particular case.

The producer/consumer queue inside of the async daemon consists of a couple main players:

  • The Fetcher class is the “producer” that continuously polls the database for the new events. It’s smart enough to pause the polling if there are no new events in the database, but otherwise it’s pretty dumb.
  • An instance of the IProjection interface that does the actual work of processing events or updating projected documents from the events.
  • The ProjectionTrack class acts as a logical controller to both Fetcher and IProjection
  • A pair of ActionBlock‘s from the TPL Dataflow library used as the consumer queue for processing events and a second queue for coordinating the activities within ProjectionTrack.

 

In the pure happy path workflow of the async daemon, it functions like this sequence diagram below:

AsyncDaemonSequence

The Fetcher object runs continuously fetching a new “page” of events and queues each page where it will be consumed by ProjectionTrack in its ExecutePage() method in a different thread.

The usage of the ActionBlock objects to connect the workflow together turned out to be pretty simple. In the following code taken from the ProjectionTrack class, I’m setting up the ActionBlock for the execution queue with a lambda to call the ExecutePage() method. One thing to notice is that I had to configure a couple options to ensure that each item enqueued to that ActionBlock is executed serially in the same order that it was received.

_executionTrack 
    = new ActionBlock<EventPage>(page => ExecutePage(page, _cancellation.Token),
	new ExecutionDataflowBlockOptions
	{
		MaxDegreeOfParallelism = 1,
		EnsureOrdered = true
	});

The value of the ActionBlock class usage is that it does all the heavy lifting for me in regards to the threading. The ActionBlock will trigger the ExecutePage() method in a different thread and ensure that every page is executed sequentially.

Incorporating Backpressure

I also wanted to incorporate the idea of “back pressure” so that if the event fetching producer is getting too far ahead of the event processing consumer, the async daemon would stop fetching new events to prevent spikes in memory usage and possibly reserve more system resources for the consumer until the consumer could catch up.

To do that, there’s a little bit of logic in ProjectionTrack that checks how many events are queued up in the execution track shown above and pauses the Fetcher if the configured threshold is exceeded:

public async Task CachePage(EventPage page)
{
	// Accumulator is just a little helper used to
	// track how many events are in flight
	Accumulator.Store(page);

	// If the consumer is backed up, stop fetching
	if (Accumulator.CachedEventCount > _projection.AsyncOptions.MaximumStagedEventCount)
	{
		_logger.ProjectionBackedUp(this, Accumulator.CachedEventCount, page);
		await _fetcher.Pause().ConfigureAwait(false);
	}


	_executionTrack?.Post(page);
}

When the consumer works through enough of the staged events, ProjectionTrack knows to restart the Fetcher to begin producing new pages of events:

// This method is called after every EventPage is successfully
// executed
public Task StoreProgress(Type viewType, EventPage page)
{
	Accumulator.Prune(page.To);

	if (shouldRestartFetcher())
	{
		_fetcher.Start(this, Lifecycle);
	}

	return Task.CompletedTask;
}

The actual “cooldown” logic inside of ProjectionTrack is implemented in this method:

private bool shouldRestartFetcher()
{
	if (_fetcher.State == FetcherState.Active) return false;

	if (Lifecycle == DaemonLifecycle.StopAtEndOfEventData && _atEndOfEventLog) return false;

	if (Accumulator.CachedEventCount <= _projection.AsyncOptions.CooldownStagedEventCount &&
		_fetcher.State == FetcherState.Paused)
	{
		return true;
	}

	return false;
}

To make this more concrete, by default Marten will pause a Fetcher if the consuming queue has over 1,000 events and won’t restart the Fetcher until the queue goes below 500. Both thresholds are configurable.

 

As I said in my last post, I thought that the async daemon overall was very challenging, but I felt that the usage of TPL Dataflow went very smoothly.

Doing it the Old Way with BlockingCollection

In the past, I’ve used the BlockingCollection to build producer/consumer queues in .Net. In the Storyteller project, I used producer/consumer queues to parallelize executing batches of specifications by dividing the work in stages that all do some kind of work on a “SpecExecutionRequest” object (read in the specification file, do some preparation work to build a “plan”, and finally to actually execute the specification). At the heart of that is a the ConsumingQueue class that allows you to queue up tasks for one of these SpecExecutionRequest stages:

    public class ConsumingQueue : IDisposable, IConsumingQueue
    {
        private readonly BlockingCollection<SpecExecutionRequest> _collection =
            new BlockingCollection<SpecExecutionRequest>(new ConcurrentBag<SpecExecutionRequest>());

        private Task _readingTask;
        private readonly Action<SpecExecutionRequest> _handler;

        public ConsumingQueue(Action<SpecExecutionRequest> handler)
        {
            _handler = handler;
        }

        public void Dispose()
        {
            _collection.CompleteAdding();
            _collection.Dispose();
        }

        // This does not block the caller
        public void Enqueue(SpecExecutionRequest plan)
        {
            _collection.Add(plan);
        }

        private void runSpecs()
        {
            // This loop runs continuously and calls _handler() for
            // each plan added to the queue in the method above
            foreach (var request in _collection.GetConsumingEnumerable())
            {
                if (request.IsCancelled) continue;

                _handler(request);
            }
        }

        public void Start()
        {
            _readingTask = Task.Factory.StartNew(runSpecs);
        }
    }

For more context, you can see how these ConsumingQueue objects are assembled and used in the SpecificationEngine class in the Storyteller codebase.

After doing it both ways, I think I prefer the TPL Dataflow approach over the older BlockingCollection mechanism.

 

 

 

 

Advertisement

8 thoughts on “Building a Producer Consumer Queue with TPL Dataflow

    1. I would, but it’s working as is right now and I’m not excited about changing anything. There is some extra complexity coming later when we start introducing projections that have to depend on other projections.

      1. Do you mean a dependency on their persistent state or in memory computation result (held by the deamon)?

  1. I haven’t read the code yet but from your article, I have the following questions:

    1) TPL Dataflow seems like overkill when you disable all parallelism and have such a simple data flow graph. What advantages do you feel it is still providing your solution?
    2) Why did you implement back-flow yourself versus using Dataflow’s instrinic throttling (BoundedCapacity) ?

    1. @Matt,

      1.) It was less code than writing it all by hand;) I was expecting to need a lot more than I did from TPL Dataflow when I was starting out as well.
      2.) Going to have to admit that I didn’t pick up on the built in bounded capacity, but I think I still would have opted to do it the way that I did. I wanted the backpressure
      to be based on how many outstanding events were in the queue rather than just how many pages of events were queued.

      – Jeremy

  2. “One thing to notice is that I had to configure a couple options to ensure that each item enqueued to that ActionBlock is executed serially in the same order that it was received.”

    You don’t actually have to configure MaxDegreeOfParallelism and EnsureOrdered. MaxDegreeOfParallelism defaults to 1 and EnsureOrdered defaults to true.

    Also, EnsureOrdered isn’t really relevant here as it ensures the order of items you take out of a block and an ActionBlock doesn’t output items (it’s an ITargetBlock but not an ISourceBlock). It’s only has an effect on TransformBlock and TransformManyBlock

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s