Skip to main content

C# console application that has a producer Task and a consumer Task. The producer adds items to a BlockingCollection using TryAdd, and the consumer removes items using TryTake. After the producer and consumer have started, we call the Cancel method on a token source to see how the TryAdd and TryTake overloads can be used to handle cancellation of the operation.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

//
// .NET 4.5 Parallel Extensions Cookbook
// Chapter 5: Cancelling an operation in a concurrent collection
//
// When working with BlockingCollection, most Add and Take operations are
// performed in a loop. The TryAdd and TryTake methods of BlockingCollection can
// accept a CancellationToken parameter so that we can respond to cancellation
// requests and break out of a loop.
//
// In this recipe, we are going to create a Console application that has
// a producer Task and a consumer Task. The producer will be adding items to
// a BlockingCollection using TryAdd, and the consumer will be removing items
// using TryTake.
//
// After the producer and consumer get started, we will call the Cancel method
// on a token source to see how we can use the TryAdd and TryTake overloads to
// handle cancellation of our operation.
//
// NOTE: For this recipe, we need to turn off the Visual Studio 2012 Exception
// Assistant. The Exception Assistant appears whenever a runtime Exception is
// thrown, and intercepts the Exception before it gets to our handler.
//
//  1. To turn off the Exception Assistant, go to the Debug menu and select Exceptions.
//  2. Uncheck the User-unhandled checkbox next to Common Language Runtime Exceptions.
//
// https://www.packtpub.com/application-development/net-45-parallel-extensions-cookbook
//

namespace CancelOperation
{
    /// <summary>
    /// Demonstrates cooperative cancellation of producer/consumer operations
    /// on a bounded <see cref="BlockingCollection{T}"/> using the
    /// cancellable TryAdd and TryTake overloads.
    /// </summary>
    class Program
    {
        // Milliseconds each TryAdd/TryTake call waits before giving up.
        private const int OperationTimeoutMs = 5;

        /// <summary>
        /// Starts a producer and a consumer task, cancels them after a short
        /// spin, waits for both to finish and then waits for the user to
        /// press Enter.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var data = Enumerable.Range(0, 100);

            // Both objects are IDisposable; they are released only after the
            // tasks that use them have finished (see Task.WaitAll below).
            using (var tokenSource = new CancellationTokenSource())
            using (var numbers = new BlockingCollection<int>(100))
            {
                var token = tokenSource.Token;

                var producer = Task.Factory.StartNew(() => Produce(data, numbers, token), token);
                var consumer = Task.Factory.StartNew(() => Consume(numbers, token), token);

                Thread.SpinWait(2000000);
                tokenSource.Cancel();

                try
                {
                    // Make sure neither task is still touching the collection
                    // or the token source when they get disposed.
                    Task.WaitAll(producer, consumer);
                }
                catch (AggregateException ex)
                {
                    // A task that was cancelled before it began running
                    // surfaces as a cancellation exception; that is expected
                    // here. Anything else is rethrown by Handle.
                    ex.Handle(inner => inner is OperationCanceledException);
                }

                Console.ReadLine();
            }
        }

        /// <summary>
        /// A simple blocking producer: tries to add every item to the
        /// collection and always marks the collection as complete for adding
        /// when it stops, whatever the reason.
        /// </summary>
        /// <param name="items">The values to add.</param>
        /// <param name="numbers">The collection that receives the values.</param>
        /// <param name="token">Token that cancels a pending TryAdd.</param>
        private static void Produce(
            System.Collections.Generic.IEnumerable<int> items,
            BlockingCollection<int> numbers,
            CancellationToken token)
        {
            try
            {
                foreach (var item in items)
                {
                    // TryAdd returns false when the timeout elapses before
                    // space becomes available; only report real additions.
                    if (numbers.TryAdd(item, OperationTimeoutMs, token))
                    {
                        Console.WriteLine("Adding:{0} Item Count={1}", item, numbers.Count);
                    }
                    else
                    {
                        Console.WriteLine("Could not add:{0} (timed out)", item);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Adding operation has been cancelled");
            }
            finally
            {
                // Called exactly once on every exit path so the consumer's
                // IsCompleted check can eventually become true.
                numbers.CompleteAdding();
            }
        }

        /// <summary>
        /// A simple blocking consumer: takes items until the collection is
        /// completed and empty, or until the token is cancelled.
        /// </summary>
        /// <param name="numbers">The collection to take values from.</param>
        /// <param name="token">Token that cancels a pending TryTake.</param>
        private static void Consume(BlockingCollection<int> numbers, CancellationToken token)
        {
            while (!numbers.IsCompleted)
            {
                try
                {
                    int item;

                    // TryTake returns false when nothing arrived within the
                    // timeout; in that case there is no item to report.
                    if (numbers.TryTake(out item, OperationTimeoutMs, token))
                    {
                        Console.WriteLine("Taking:{0} ", item);
                        // wait for a bit
                        Thread.SpinWait(10000);
                    }
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Take operation has been cancelled");
                    break;
                }
            }

            Console.WriteLine("\rNo more items to take.");
        }
    }
}