Skip to main content

BlockingCollection is a thread-safe collection class that provides blocking and bounding functionality. Bounding means that you can set a maximum capacity for the collection, which lets you control how large it can grow in memory. Multiple threads can add items to the collection concurrently, but if the collection reaches capacity, the producing threads will block until items are removed.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

//
// .NET 4.5 Parallel Extensions Cookbook
// Chapter 5: Adding and removing items to BlockingCollection
//
// BlockingCollection<T> is a thread-safe collection class that provides
// blocking and bounding functionality. Bounding means that you can set the
// maximum capacity of a collection, which enables you to control the maximum
// size of the collection in memory.
//
// Multiple threads can add items to a collection concurrently, but if the
// collection reaches capacity, the producing threads will block until items are
// removed. Multiple consumers can remove items from the collection
// concurrently. If the collection becomes empty, consumption will block until
// more items are produced and added to the collection.
//
// In this recipe, we will take a look at the basics of adding items to, and
// removing items from BlockingCollection.
//
// We are going to create a Console application that initializes a range of
// integers and creates a parallel task to add the numbers to a blocking
// collection. Another parallel task will be created to remove items from the
// collection.
//
// https://www.packtpub.com/application-development/net-45-parallel-extensions-cookbook
//

namespace BlockingCollection
{
    /// <summary>
    /// Console sample showing one producer task and one consumer task sharing a
    /// bounded <see cref="BlockingCollection{T}"/>.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Starts a producer task that adds the integers 0..99 to a collection
        /// bounded at 100 items and a consumer task that takes them out again,
        /// then waits for both tasks and prints the message of every exception
        /// they raised.
        /// </summary>
        static void Main()
        {
            var data = Enumerable.Range(0, 100);

            // BlockingCollection<T> is IDisposable. The using block releases it
            // only after Task.WaitAll below has returned, i.e. once neither
            // task can touch it any more.
            using (var numbers = new BlockingCollection<int>(100))
            {
                // A simple blocking producer.
                var producer = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        foreach (var item in data)
                        {
                            numbers.Add(item);
                            Console.WriteLine("Adding:{0} Item Count={1}", item, numbers.Count);
                        }
                    }
                    finally
                    {
                        // Always signal completion, even when producing fails
                        // part way through. Without this the consumer would
                        // stay blocked in Take() and Task.WaitAll would never
                        // return.
                        numbers.CompleteAdding();
                    }
                });

                // A simple blocking consumer.
                var consumer = Task.Factory.StartNew(() =>
                {
                    int item = -1;
                    while (!numbers.IsCompleted)
                    {
                        try
                        {
                            item = numbers.Take();
                        }
                        catch (InvalidOperationException)
                        {
                            // Reached when the collection is marked complete
                            // and drained between the IsCompleted check above
                            // and this Take() call.
                            Console.WriteLine("Nothing to take");
                            break;
                        }
                        Console.WriteLine("Taking:{0} ", item);
                        // wait for a bit
                        Thread.SpinWait(1000);
                    }

                    Console.WriteLine("\rNo more items to take.");
                });

                try
                {
                    Task.WaitAll(producer, consumer);
                }
                catch (AggregateException ae)
                {
                    // WaitAll wraps the failures of all faulted tasks; report
                    // each one individually.
                    foreach (var v in ae.InnerExceptions)
                    {
                        Console.WriteLine(v.Message);
                    }
                }
            }

            // Console.ReadLine();
        }
    }
}