// Example of building a producer/consumer queue in C# using BlockingCollection<T>.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;

/// <summary>
/// A unit of work that travels through the work queue.
///
/// Producers create instances and add them to the queue;
/// consumers take them off the queue and act on them.
/// </summary>
/// <remarks>
/// https://dotnetcodr.com/2015/09/08/using-the-blockingcollection-for-thread-safe-producer-consumer-scenarios-in-net-part-2/
/// </remarks>
public class WorkTask
{
    /// <summary>
    /// Free-form text describing the work to perform.
    /// </summary>
    public string Description { get; set; }

    /// <summary>
    /// Timestamp recorded for when the task was created. The name
    /// indicates UTC is expected; the value is stored as given and
    /// not validated or converted.
    /// </summary>
    public DateTime InsertedUtc { get; set; }

    /// <summary>
    /// Creates a work task with the given description and timestamp.
    /// </summary>
    /// <param name="description">Text describing the work.</param>
    /// <param name="insertedUtc">Creation timestamp of the task.</param>
    public WorkTask(string description, DateTime insertedUtc)
    {
        this.Description = description;
        this.InsertedUtc = insertedUtc;
    }
}

// -------------------------------------------------------------------------
// WorkQueue.cs
// -------------------------------------------------------------------------

/// <summary>
/// Encapsulates the shared work queue.
///
/// Producers add items through <see cref="AddTask"/>; consumers drain
/// them by running <see cref="MonitorWorkQueue"/>. The queue is closed
/// for further additions with <see cref="AllItemsAdded"/>.
/// </summary>
/// <remarks>https://dotnetcodr.com/2015/09/08/using-the-blockingcollection-for-thread-safe-producer-consumer-scenarios-in-net-part-2/</remarks>
public class WorkQueue
{
    private readonly BlockingCollection<WorkTask> _workQueue;

    /// <summary>
    /// Constructs a blocking collection with the supplied thread-safe
    /// collection as its underlying store (for example a concurrent
    /// queue for first-in, first-out ordering).
    /// </summary>
    /// <param name="workTaskCollection">
    /// The underlying collection that holds the queued work tasks.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// Thrown by the BlockingCollection constructor when
    /// <paramref name="workTaskCollection"/> is null.
    /// </exception>
    public WorkQueue(IProducerConsumerCollection<WorkTask> workTaskCollection)
    {
        _workQueue = new BlockingCollection<WorkTask>(workTaskCollection);
    }

    /// <summary>
    /// Adds a work task to the queue.
    /// </summary>
    /// <param name="workTask">The task to enqueue; must not be null.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="workTask"/> is null.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// The queue has already been closed via <see cref="AllItemsAdded"/>.
    /// </exception>
    public void AddTask(WorkTask workTask)
    {
        // Fail fast at the producer: a null item would otherwise be queued
        // and only blow up later, on whichever consumer dequeues it.
        if (workTask == null)
        {
            throw new ArgumentNullException("workTask");
        }

        _workQueue.Add(workTask);
    }

    /// <summary>
    /// Send a signal to the Consumer that all Producers have
    /// finished adding new items to the Work Queue.
    ///
    /// This is to simulate a scenario where you can determine
    /// in advance, when all expected items have been added to
    /// the work queue.
    ///
    /// The "CompleteAdding()" method marks the collection
    /// as closed and won't accept any more items. Consumers
    /// running <see cref="MonitorWorkQueue"/> finish the items
    /// still queued and then return.
    /// </summary>
    public void AllItemsAdded()
    {
        _workQueue.CompleteAdding();
    }

    /// <summary>
    /// Monitors the queue, processing items until it is closed and empty.
    ///
    /// The calling thread blocks while the queue is empty and resumes
    /// when an item becomes available. Once <see cref="AllItemsAdded"/>
    /// has been called and the remaining items have been taken, the
    /// method writes a "closed" message and returns.
    /// </summary>
    public void MonitorWorkQueue()
    {
        // GetConsumingEnumerable() removes items as it yields them and ends
        // normally when the collection is marked complete and drained, so no
        // exception is needed to signal the regular shutdown path. Any other
        // InvalidOperationException now propagates instead of being reported
        // as "closed".
        foreach (WorkTask wt in _workQueue.GetConsumingEnumerable())
        {
            System.Console.WriteLine(
                "Thread '{0}' processing Work Task '{1}', entered on '{2}'.",
                Thread.CurrentThread.ManagedThreadId,
                wt.Description, wt.InsertedUtc);
        }

        System.Console.WriteLine(
            "The Work Queue on Thread '{0}' has been Closed.",
            Thread.CurrentThread.ManagedThreadId);
    }

}

// -------------------------------------------------------------------------
// WorkItemProducer.cs
// -------------------------------------------------------------------------

/// <summary>
/// Produces work items and adds them to a shared <see cref="WorkQueue"/>.
///
/// Each call to <see cref="ProduceWorkItems"/> adds a randomly chosen,
/// finite number of work items, sleeping for 2 seconds after each one.
/// </summary>
/// <remarks>https://dotnetcodr.com/2015/09/09/using-the-blockingcollection-for-thread-safe-producer-consumer-scenarios-in-net-part-3/</remarks>
public class WorkItemProducer
{
    // Smallest number of items a single run may produce (inclusive).
    private const int MinItemCount = 5;

    // Exclusive upper bound for the random item count, i.e. at most 10 items.
    private const int MaxItemCountExclusive = 11;

    // Pause after each added item, in milliseconds.
    private const int DelayAfterItemMilliseconds = 2000;

    // One generator shared by all producers. Creating a new Random per call
    // can hand several producers that start at the same moment an identical
    // clock-based seed (on .NET Framework), so they would all pick the same
    // item count. Random is not thread-safe, hence the lock.
    private static readonly Random s_random = new Random();
    private static readonly object s_randomGate = new object();

    private readonly WorkQueue _workQueue;

    /// <summary>
    /// Creates a producer that adds its work items to the given queue.
    /// </summary>
    /// <param name="workQueue">The queue to add work items to.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="workQueue"/> is null.
    /// </exception>
    public WorkItemProducer(WorkQueue workQueue)
    {
        if (workQueue == null)
        {
            throw new ArgumentNullException("workQueue");
        }

        _workQueue = workQueue;
    }

    /// <summary>
    /// The Producer will generate a random upper limit for
    /// a for-loop.
    ///
    /// The upper limit is the number of work items added to the
    /// queue by this call (between 5 and 10, inclusive). The calling
    /// thread sleeps for 2 seconds after each item is added.
    /// </summary>
    public void ProduceWorkItems()
    {
        int upperLimit;
        lock (s_randomGate)
        {
            upperLimit = s_random.Next(MinItemCount, MaxItemCountExclusive);
        }

        // 1-based and inclusive so that exactly 'upperLimit' items are added
        // and the logged iteration number needs no adjustment.
        for (int iteration = 1; iteration <= upperLimit; iteration++)
        {
            Guid jobId = Guid.NewGuid();

            WorkTask wt = new WorkTask(
                string.Concat("Work with Job ID ", jobId), DateTime.UtcNow);

            // Written before the item is queued, so this line is emitted
            // before any consumer can have taken the item.
            System.Console.WriteLine(
                "Thread '{0}' added work '{1}' at '{2}' to the Work Queue in iteration '{3}'.",
                Thread.CurrentThread.ManagedThreadId, wt.Description,
                wt.InsertedUtc, iteration);

            _workQueue.AddTask(wt);

            Thread.Sleep(DelayAfterItemMilliseconds);
        }
    }
}

// -------------------------------------------------------------------------
// BlockingCollectionSampleService.cs
// -------------------------------------------------------------------------

/// <summary>
/// Wires up the sample: three producers and two consumers sharing one
/// work queue, each running on its own task.
/// </summary>
public class BlockingCollectionSampleService
{
    /// <summary>
    /// The code that runs the threads.
    ///
    /// First, wait for all Producer threads to finish.
    /// Once the Producer threads complete, tell the Consumers
    /// to close, then wait for the Consumers to drain the queue.
    /// </summary>
    /// <remarks>https://dotnetcodr.com/2015/09/11/using-the-blockingcollection-for-thread-safe-producer-consumer-scenarios-in-net-part-4/</remarks>
    /// <exception cref="AggregateException">
    /// One or more producer or consumer tasks failed. The queue is still
    /// closed in that case so that consumers are not left blocked.
    /// </exception>
    public void RunBlockingCollectionCodeSample()
    {
        WorkQueue workQueue = new WorkQueue(new ConcurrentQueue<WorkTask>());
        WorkItemProducer producerOne = new WorkItemProducer(workQueue);
        WorkItemProducer producerTwo = new WorkItemProducer(workQueue);
        WorkItemProducer producerThree = new WorkItemProducer(workQueue);

        Task producerTaskOne = Task.Run(() => producerOne.ProduceWorkItems());
        Task producerTaskTwo = Task.Run(() => producerTwo.ProduceWorkItems());
        Task producerTaskThree = Task.Run(() => producerThree.ProduceWorkItems());

        Task consumerTaskOne = Task.Run(() => workQueue.MonitorWorkQueue());
        Task consumerTaskTwo = Task.Run(() => workQueue.MonitorWorkQueue());

        System.Console.WriteLine("> Waiting for Producers to finish...");
        System.Console.WriteLine();

        try
        {
            Task.WaitAll(
                producerTaskOne,
                producerTaskTwo,
                producerTaskThree
            );
            System.Console.WriteLine();
            System.Console.WriteLine("Producers finished.");
            System.Console.WriteLine();
        }
        finally
        {
            // Close the queue even when a producer faulted; otherwise the
            // consumer tasks would stay blocked waiting for items forever.
            workQueue.AllItemsAdded();
        }

        System.Console.WriteLine("> Waiting for Consumers to finish...");
        System.Console.WriteLine();
        Task.WaitAll(
            consumerTaskOne,
            consumerTaskTwo
        );
        System.Console.WriteLine();
        System.Console.WriteLine("Consumers finished.");
        System.Console.WriteLine();

        System.Console.WriteLine("Done.");
    }
}

// -------------------------------------------------------------------------
// Program.cs (Example Usage)
// -------------------------------------------------------------------------

/// <summary>
/// Console host for the producer/consumer sample.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point: creates the sample service and runs it to completion.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        new BlockingCollectionSampleService().RunBlockingCollectionCodeSample();
    }
}