Skip to main content

This recipe will show an example of creating a set of tasks to be processed asynchronously by multiple workers.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

//
// Implementing Asynchronous Processing Using ConcurrentQueue
//
// This recipe will show an example of creating a set of tasks to be processed
// asynchronously by multiple workers.
//
// How it works...
//
// When the program runs, we create a queue of tasks with an instance of the
// ConcurrentQueue collection. Then we create a cancellation token, which will
// be used to stop work after we are done posting tasks to the queue. Next, we
// start a separate worker thread that will be posting tasks to the tasks queue.
// This part produces a workload for our asynchronous processing.
//
// Now let us define a task-consuming part of the program. We create four
// workers that will wait a random time, then get a task from the task queue,
// process it, and repeat the whole process until we signal the cancellation
// token. Finally, we wait for the task-producing task to complete and then
// schedule the cancellation token to be signaled two seconds later, so the
// consumers keep running for two more seconds before they stop. The last
// step is to wait for all our consumers to complete.
//
// We see that we have tasks processing from start to end, but it is possible
// that a later task will be processed before an earlier one because we have
// four workers running independently and the task processing time is not
// constant. We see that the access to the queue is thread-safe; no work item
// was taken twice.
//

namespace Chapter6.Recipe2
{
    /// <summary>
    /// Producer/consumer demo: one producer posts work items to a
    /// <see cref="ConcurrentQueue{T}"/> while several processors dequeue and
    /// report them until a cancellation token is signaled.
    /// </summary>
    class Program
    {
        // Number of concurrent consumers draining the queue.
        private const int ProcessorCount = 4;

        // Number of work items the producer posts.
        private const int WorkItemCount = 20;

        // Pause between two consecutive posts, in milliseconds.
        private const int PostIntervalMilliseconds = 50;

        // Exclusive upper bound for the random consumer delay, in milliseconds.
        private const int MaxRandomDelayMilliseconds = 500;

        // One generator shared by every caller. Creating a new Random seeded
        // with DateTime.Now.Millisecond on each call allows only 1000 distinct
        // seeds and hands identical values to callers that run within the same
        // millisecond. Random is not thread-safe, hence the lock object.
        private static readonly Random DelayRandom = new Random();
        private static readonly object DelayRandomLock = new object();

        /// <summary>
        /// Entry point: runs the demo and blocks until it has finished.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // GetAwaiter().GetResult() rethrows the original exception rather
            // than wrapping it in an AggregateException as Task.Wait() does.
            RunProgram().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Starts the producer and the processors, waits for the producer to
        /// finish, schedules cancellation two seconds later, and then waits
        /// for every processor to complete.
        /// </summary>
        /// <returns>A task that completes when all processors have stopped.</returns>
        static async Task RunProgram()
        {
            var taskQueue = new ConcurrentQueue<CustomTask>();

            // CancelAfter allocates a timer, so the source is disposed once
            // every processor that observes the token has completed.
            using (var cts = new CancellationTokenSource())
            {
                // Capture the token once; the processors only need the token,
                // not the source itself.
                CancellationToken token = cts.Token;

                Task taskSource = Task.Run(() => TaskProducer(taskQueue));

                var processors = new Task[ProcessorCount];
                for (int i = 1; i <= ProcessorCount; i++)
                {
                    string processorId = i.ToString();
                    processors[i - 1] = Task.Run(() =>
                        TaskProcessor(taskQueue, "Processor " + processorId, token)
                    );
                }

                await taskSource;

                // Let the processors keep running for two more seconds after
                // the last item has been posted.
                cts.CancelAfter(TimeSpan.FromSeconds(2));

                await Task.WhenAll(processors);
            }
        }

        /// <summary>
        /// Posts work items with ids 1 through <see cref="WorkItemCount"/> to
        /// the queue, pausing before each post.
        /// </summary>
        /// <param name="queue">Queue that receives the work items.</param>
        /// <returns>A task that completes after the last item is posted.</returns>
        static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
        {
            for (int i = 1; i <= WorkItemCount; i++)
            {
                await Task.Delay(PostIntervalMilliseconds);
                var workItem = new CustomTask {Id = i};
                queue.Enqueue(workItem);
                Console.WriteLine("Task {0} has been posted", workItem.Id);
            }
        }

        /// <summary>
        /// Repeatedly tries to dequeue a work item and reports it, waiting a
        /// random delay between attempts, until cancellation is requested.
        /// </summary>
        /// <param name="queue">Queue to take work items from.</param>
        /// <param name="name">Display name used in the console output.</param>
        /// <param name="token">Token that ends the processing loop.</param>
        /// <returns>A task that completes once cancellation has been observed.</returns>
        static async Task TaskProcessor(
            ConcurrentQueue<CustomTask> queue,
            string name,
            CancellationToken token)
        {
            await GetRandomDelay();

            // The token is checked only after each attempt, so at least one
            // dequeue attempt is always made.
            do
            {
                CustomTask workItem;
                if (queue.TryDequeue(out workItem))
                {
                    Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name);
                }

                await GetRandomDelay();
            } while (!token.IsCancellationRequested);
        }

        /// <summary>
        /// Creates a delay of a random length between 1 and 499 milliseconds.
        /// </summary>
        /// <returns>A task that completes after the chosen delay.</returns>
        static Task GetRandomDelay()
        {
            int delay;
            lock (DelayRandomLock)
            {
                delay = DelayRandom.Next(1, MaxRandomDelayMilliseconds);
            }

            return Task.Delay(delay);
        }

        /// <summary>
        /// Work item passed from the producer to the processors.
        /// </summary>
        class CustomTask
        {
            /// <summary>Sequence number assigned by the producer.</summary>
            public int Id { get; set; }
        }
    }
}