// Example of building a producer/consumer queue in C#.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
/// <summary>
/// A unit of work: producers create instances and put them on the
/// work queue, consumers take them off the queue and act on them.
/// </summary>
/// <remarks>
/// https://dotnetcodr.com/2015/09/08/using-the-blockingcollection-for-thread-safe-producer-consumer-scenarios-in-net-part-2/
/// </remarks>
public class WorkTask
{
    /// <summary>Free-form text describing the work to perform.</summary>
    public string Description { get; set; }

    /// <summary>Timestamp supplied by the creator of the task.</summary>
    public DateTime InsertedUtc { get; set; }

    /// <summary>
    /// Creates a work task from a description and a timestamp.
    /// </summary>
    /// <param name="description">Value stored in <see cref="Description"/>.</param>
    /// <param name="insertedUtc">Value stored in <see cref="InsertedUtc"/>.</param>
    public WorkTask(string description, DateTime insertedUtc)
    {
        this.InsertedUtc = insertedUtc;
        this.Description = description;
    }
}
// -------------------------------------------------------------------------
// WorkQueue.cs
// -------------------------------------------------------------------------
/// <summary>
/// Encapsulates the shared work queue. Producers add items through
/// <see cref="AddTask"/> and signal the end of input with
/// <see cref="AllItemsAdded"/>; consumers drain the queue by running
/// <see cref="MonitorWorkQueue"/>.
/// </summary>
/// <remarks>https://dotnetcodr.com/2015/09/08/using-the-blockingcollection-for-thread-safe-producer-consumer-scenarios-in-net-part-2/</remarks>
public class WorkQueue : IDisposable
{
    private readonly BlockingCollection<WorkTask> _workQueue;

    /// <summary>
    /// Wraps the supplied collection in a <see cref="BlockingCollection{T}"/>
    /// that serves as the work queue.
    /// </summary>
    /// <param name="workTaskCollection">
    /// The underlying store for queued work, e.g. a concurrent queue.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="workTaskCollection"/> is null.
    /// </exception>
    public WorkQueue(IProducerConsumerCollection<WorkTask> workTaskCollection)
    {
        if (workTaskCollection == null)
        {
            throw new ArgumentNullException(nameof(workTaskCollection));
        }

        _workQueue = new BlockingCollection<WorkTask>(workTaskCollection);
    }

    /// <summary>
    /// Adds a work item to the queue.
    /// </summary>
    /// <param name="workTask">The item to enqueue.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="workTask"/> is null.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// Thrown by the underlying collection once <see cref="AllItemsAdded"/>
    /// has been called.
    /// </exception>
    public void AddTask(WorkTask workTask)
    {
        // Reject null here, at the producer: a queued null would otherwise
        // surface as a NullReferenceException inside a consumer.
        if (workTask == null)
        {
            throw new ArgumentNullException(nameof(workTask));
        }

        _workQueue.Add(workTask);
    }

    /// <summary>
    /// Signals the consumers that all producers have finished adding
    /// items. After this call the queue accepts no more items, and
    /// <see cref="MonitorWorkQueue"/> returns once the remaining items
    /// have been processed.
    /// </summary>
    public void AllItemsAdded()
    {
        _workQueue.CompleteAdding();
    }

    /// <summary>
    /// Consumes work items until the queue has been closed with
    /// <see cref="AllItemsAdded"/> and is empty. Blocks the calling thread
    /// while waiting for the next item.
    /// </summary>
    public void MonitorWorkQueue()
    {
        // GetConsumingEnumerable blocks for the next item and simply ends
        // when adding is complete and the queue is drained, so no exception
        // is needed to detect shutdown.
        foreach (WorkTask wt in _workQueue.GetConsumingEnumerable())
        {
            System.Console.WriteLine(
                "Thread '{0}' processing Work Task '{1}', entered on '{2}'.",
                Thread.CurrentThread.ManagedThreadId,
                wt.Description, wt.InsertedUtc);
        }

        System.Console.WriteLine(
            "The Work Queue on Thread '{0}' has been Closed.",
            Thread.CurrentThread.ManagedThreadId);
    }

    /// <summary>
    /// Releases the underlying blocking collection. Call only after every
    /// producer and consumer has stopped using the queue.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Disposes the owned blocking collection when called from
    /// <see cref="Dispose()"/>.
    /// </summary>
    /// <param name="disposing">
    /// True when invoked from <see cref="Dispose()"/>.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            _workQueue.Dispose();
        }
    }
}
// -------------------------------------------------------------------------
// WorkItemProducer.cs
// -------------------------------------------------------------------------
/// <summary>
/// Produces work items and adds them to a shared <see cref="WorkQueue"/>.
/// Each call to <see cref="ProduceWorkItems"/> adds a random, bounded
/// number of items, pausing for two seconds after each one.
/// </summary>
/// <remarks>https://dotnetcodr.com/2015/09/09/using-the-blockingcollection-for-thread-safe-producer-consumer-scenarios-in-net-part-3/</remarks>
public class WorkItemProducer
{
    /// <summary>Smallest number of items one run may produce.</summary>
    private const int MinWorkItems = 5;

    /// <summary>Exclusive upper bound on the number of items per run.</summary>
    private const int MaxWorkItemsExclusive = 11;

    /// <summary>Pause after each queued item, in milliseconds.</summary>
    private const int DelayAfterItemMilliseconds = 2000;

    private readonly WorkQueue _workQueue;

    /// <summary>
    /// Creates a producer that feeds the given queue.
    /// </summary>
    /// <param name="workQueue">The queue that receives the produced items.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="workQueue"/> is null.
    /// </exception>
    public WorkItemProducer(WorkQueue workQueue)
    {
        if (workQueue == null)
        {
            throw new ArgumentNullException(nameof(workQueue));
        }

        _workQueue = workQueue;
    }

    /// <summary>
    /// Picks a random item count between 5 and 10 (inclusive) and adds
    /// exactly that many work items to the queue, sleeping two seconds
    /// after each one.
    /// </summary>
    public void ProduceWorkItems()
    {
        // Seed explicitly: on .NET Framework the parameterless constructor
        // seeds from the system clock, so producers started back to back
        // could all draw the same item count.
        Random random = new Random(Guid.NewGuid().GetHashCode());
        int itemCount = random.Next(MinWorkItems, MaxWorkItemsExclusive);

        // "<" rather than "<=" so the drawn value really is the number of
        // items produced.
        for (int i = 0; i < itemCount; i++)
        {
            Guid jobId = Guid.NewGuid();
            WorkTask wt = new WorkTask(
                string.Concat("Work with Job ID ", jobId), DateTime.UtcNow);

            // Logged before the item is queued so this line cannot appear
            // after a consumer's "processing" line for the same item.
            System.Console.WriteLine(
                "Thread '{0}' added work '{1}' at '{2}' to the Work Queue in iteration '{3}'.",
                Thread.CurrentThread.ManagedThreadId, wt.Description,
                wt.InsertedUtc, i + 1);

            _workQueue.AddTask(wt);
            Thread.Sleep(DelayAfterItemMilliseconds);
        }
    }
}
// -------------------------------------------------------------------------
// BlockingCollectionSampleService.cs
// -------------------------------------------------------------------------
/// <summary>
/// Wires three producers and two consumers to one shared work queue and
/// runs them to completion.
/// </summary>
public class BlockingCollectionSampleService
{
    /// <summary>
    /// Runs the sample: starts the producer and consumer tasks, waits for
    /// all producers to finish, closes the queue so the consumers can
    /// stop, then waits for the consumers.
    /// </summary>
    /// <remarks>https://dotnetcodr.com/2015/09/11/using-the-blockingcollection-for-thread-safe-producer-consumer-scenarios-in-net-part-4/</remarks>
    /// <exception cref="AggregateException">
    /// A producer or consumer task failed; thrown by <c>Task.WaitAll</c>.
    /// </exception>
    public void RunBlockingCollectionCodeSample()
    {
        WorkQueue workQueue = new WorkQueue(new ConcurrentQueue<WorkTask>());

        WorkItemProducer producerOne = new WorkItemProducer(workQueue);
        WorkItemProducer producerTwo = new WorkItemProducer(workQueue);
        WorkItemProducer producerThree = new WorkItemProducer(workQueue);

        Task producerTaskOne = Task.Run(() => producerOne.ProduceWorkItems());
        Task producerTaskTwo = Task.Run(() => producerTwo.ProduceWorkItems());
        Task producerTaskThree = Task.Run(() => producerThree.ProduceWorkItems());

        Task consumerTaskOne = Task.Run(() => workQueue.MonitorWorkQueue());
        Task consumerTaskTwo = Task.Run(() => workQueue.MonitorWorkQueue());

        System.Console.WriteLine("> Waiting for Producers to finish...");
        System.Console.WriteLine();

        try
        {
            Task.WaitAll(
                producerTaskOne,
                producerTaskThree,
                producerTaskTwo
            );

            System.Console.WriteLine();
            System.Console.WriteLine("Producers finished.");
            System.Console.WriteLine();
        }
        finally
        {
            // Close the queue even when a producer failed; otherwise the
            // consumers would stay blocked waiting for items forever.
            workQueue.AllItemsAdded();
        }

        System.Console.WriteLine("> Waiting for Consumers to finish...");
        System.Console.WriteLine();

        Task.WaitAll(
            consumerTaskOne,
            consumerTaskTwo
        );

        System.Console.WriteLine();
        System.Console.WriteLine("Consumers finished.");
        System.Console.WriteLine();
        System.Console.WriteLine("Done.");
    }
}
// -------------------------------------------------------------------------
// Program.cs (Example Usage)
// -------------------------------------------------------------------------
/// <summary>
/// Console entry point for the producer/consumer sample.
/// </summary>
class Program
{
    /// <summary>
    /// Creates the sample service and runs it once.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        new BlockingCollectionSampleService().RunBlockingCollectionCodeSample();
    }
}