// BlockingCollection
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
//
// .NET 4.5 Parallel Extensions Cookbook
// Chapter 5: Adding and removing items to BlockingCollection
//
// BlockingCollection<T> is a thread safe collection class that provides
// blocking and bounding functionality. Bounding means that you can set the
// maximum capacity of a collection, which enables you to control the maximum
// size of the collection in memory.
//
// Multiple threads can add items to a collection concurrently, but if the
// collection reaches capacity, the producing threads will block until items are
// removed. Multiple consumers can remove items from the collection
// concurrently. If the collection becomes empty, consumption will block until
// more items are produced and added to the collection.
//
// In this recipe, we will take a look at the basics of adding items to, and
// removing items from BlockingCollection.
//
// We are going to create a Console application that initializes a range of
// integers and creates a parallel task to add the numbers to a blocking
// collection. Another parallel task will be created to remove items from the
// collection.
//
// https://www.packtpub.com/application-development/net-45-parallel-extensions-cookbook
//
namespace BlockingCollection
{
/// <summary>
/// Console demo of a bounded <see cref="BlockingCollection{T}"/> shared by one
/// producer task and one consumer task.
/// </summary>
class Program
{
    /// <summary>Maximum number of items the collection buffers before Add blocks.</summary>
    private const int Capacity = 100;

    /// <summary>How many integers (0 .. ItemCount - 1) the producer publishes.</summary>
    private const int ItemCount = 100;

    /// <summary>
    /// Starts the producer and the consumer, waits for both to finish, and
    /// prints the message of every exception that escaped either task.
    /// </summary>
    static void Main()
    {
        var data = Enumerable.Range(0, ItemCount);

        // BlockingCollection<T> is IDisposable. The using block ends only after
        // Task.WaitAll has returned or thrown, i.e. after both tasks have
        // completed, so the collection is never disposed while still in use.
        using (var numbers = new BlockingCollection<int>(Capacity))
        {
            var producer = Task.Factory.StartNew(() => Produce(data, numbers));
            var consumer = Task.Factory.StartNew(() => Consume(numbers));

            try
            {
                Task.WaitAll(producer, consumer);
            }
            catch (AggregateException ae)
            {
                foreach (var inner in ae.InnerExceptions)
                {
                    Console.WriteLine(inner.Message);
                }
            }
        }
    }

    /// <summary>
    /// A simple blocking producer: adds every element of <paramref name="data"/>
    /// to <paramref name="numbers"/> and then marks the collection as complete.
    /// </summary>
    /// <param name="data">The items to publish.</param>
    /// <param name="numbers">The shared collection the items are added to.</param>
    private static void Produce(IEnumerable<int> data, BlockingCollection<int> numbers)
    {
        try
        {
            foreach (var item in data)
            {
                numbers.Add(item);
                // Count is read after the Add while the consumer runs
                // concurrently, so the printed value is only a snapshot.
                Console.WriteLine("Adding:{0} Item Count={1}", item, numbers.Count);
            }
        }
        finally
        {
            // Always signal completion, even if the loop above throws;
            // otherwise the consumer would stay blocked in Take() forever
            // and Main would never get past Task.WaitAll.
            numbers.CompleteAdding();
        }
    }

    /// <summary>
    /// A simple blocking consumer: takes items from <paramref name="numbers"/>
    /// until the collection is marked complete and has been emptied.
    /// </summary>
    /// <param name="numbers">The shared collection the items are taken from.</param>
    private static void Consume(BlockingCollection<int> numbers)
    {
        while (!numbers.IsCompleted)
        {
            int item;
            try
            {
                item = numbers.Take();
            }
            catch (InvalidOperationException)
            {
                // The producer can call CompleteAdding() between the
                // IsCompleted check and Take(); Take() then throws instead
                // of blocking on a collection that will never be refilled.
                Console.WriteLine("Nothing to take");
                break;
            }

            Console.WriteLine("Taking:{0} ", item);

            // wait for a bit
            Thread.SpinWait(1000);
        }

        Console.WriteLine("\rNo more items to take.");
    }
}
}