Recently I was working with a data processing pipeline in which work items progressed through a number of different stages. The pipeline ran synchronously, so each work item was fully processed before the next one was picked up.
The work items were not related in any way, so processing them in parallel was an option. As the different pipeline stages took varying amounts of time, I decided to parallelize each stage separately, give each stage its own number of worker threads, and separate the stages with queues. The pipeline ran on a single machine with all the worker threads in the same process, and the queues were just FIFO data structures sitting in RAM - a relatively simple setup.
The issue I encountered pretty quickly was that the stages of the pipeline processed work items at different rates, and in a couple of cases not in a predictable way that I could fix by tweaking the number of worker threads for each stage. Wherever the stage consuming from a queue ran slower than the stage producing into it, the backlog of pending items built up and used up all the available memory pretty quickly.
I needed to be able to limit the number of pending items in each queue and block the publishers to that queue until the consumers caught up.
One way of achieving this is to use semaphores to keep track of the number of "slots" used and have the producer threads block on a semaphore until a slot is available.
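As a rough illustration of that approach, here is a minimal sketch of a bounded queue built from two SemaphoreSlim instances around a ConcurrentQueue. The type name BoundedQueue and the capacities used are my own for illustration, not from the original pipeline:

```csharp
using System.Collections.Concurrent;
using System.Threading;

class BoundedQueue<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _freeSlots;   // counts empty slots
    private readonly SemaphoreSlim _usedSlots;   // counts queued items

    public BoundedQueue(int capacity)
    {
        _freeSlots = new SemaphoreSlim(capacity, capacity);
        _usedSlots = new SemaphoreSlim(0, capacity);
    }

    public void Add(T item)
    {
        _freeSlots.Wait();        // blocks the producer when the queue is full
        _queue.Enqueue(item);
        _usedSlots.Release();
    }

    public T Take()
    {
        _usedSlots.Wait();        // blocks the consumer when the queue is empty
        _queue.TryDequeue(out T item);
        _freeSlots.Release();     // frees a slot, unblocking a waiting producer
        return item;
    }
}
```

A producer calling Add on a full queue simply parks on the semaphore until a consumer's Take releases a slot, which is exactly the back-pressure behaviour described above.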
Another option is the underutilised TPL Dataflow library; solutions built this way are relatively simple, and examples are out there on the web, such as this one on Stephen Cleary's blog where a BoundedCapacity is applied.
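For a flavour of the Dataflow approach, the sketch below (using the System.Threading.Tasks.Dataflow package) caps a block's input buffer with BoundedCapacity; SendAsync then completes only once the block has room, so a fast producer is throttled to the consumer's pace. The delay, capacity and item count are illustrative:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        var slowStage = new ActionBlock<int>(
            async item =>
            {
                await Task.Delay(100);            // simulate a slow consumer
                Console.WriteLine($"Processed {item}");
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });

        for (int i = 0; i < 20; i++)
        {
            // SendAsync waits until the block has space for the item
            await slowStage.SendAsync(i);
        }

        slowStage.Complete();                     // signal no more items
        await slowStage.Completion;               // wait for the queue to drain
    }
}
```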
The option I went with was to wrap my ConcurrentQueue in a BlockingCollection with a boundedCapacity specified. This causes any Add operation on the collection to block until space is available. Below is an example from MSDN, slightly tweaked to introduce throttling of the producer Task.
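The exact tweaked MSDN listing isn't reproduced here, but a minimal sketch of the same pattern looks like the following; the capacity of 5, the item count and the delay are illustrative. Passing boundedCapacity to the constructor is what makes Add block once the collection holds that many items:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // Wrap a ConcurrentQueue in a BlockingCollection with a bounded capacity
        var collection = new BlockingCollection<int>(
            new ConcurrentQueue<int>(), boundedCapacity: 5);

        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 20; i++)
            {
                collection.Add(i);               // blocks while the collection is full
                Console.WriteLine($"Added {i}");
            }
            collection.CompleteAdding();         // no more items will be added
        });

        var consumer = Task.Run(() =>
        {
            // Yields items until CompleteAdding is called and the collection empties
            foreach (var item in collection.GetConsumingEnumerable())
            {
                Task.Delay(100).Wait();          // simulate slow processing
                Console.WriteLine($"Took {item}");
            }
        });

        Task.WaitAll(producer, consumer);
    }
}
```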
You can see from the example output that, once the collection is at capacity, the producer is forced to wait for the consumer to free up space in the collection before it can add more items.