Skip to main content

This recipe is a slight modification of the previous one. We will once again create a set of tasks to be processed asynchronously by multiple workers, but this time we implement it with ConcurrentStack and see the differences.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

//
// Changing Asynchronous Processing Order ConcurrentStack
//
// This recipe is a slight modification of the previous one. We will once again
// create a set of tasks to be processed asynchronously by multiple workers, but
// this time we implement it with ConcurrentStack and see the differences.
//
// How it works...
//
// When the program runs, we now create an instance of the ConcurrentStack
// collection. The rest is almost like in the previous recipe, except that
// instead of using the Enqueue and TryDequeue methods on a concurrent queue, we
// use Push and TryPop on the concurrent stack.
//
// We now see that the task processing order has been changed. The stack is a
// LIFO collection, so workers process the most recently added tasks first. In
// the case of a concurrent queue, tasks were processed in almost the same order
// in which they were added. This means that, depending on the number of
// workers, the task that was created first is sure to be processed within a
// given time frame. In the case of a stack, the tasks that were created earlier
// have lower priority and may not be processed until the producer stops
// putting more tasks onto the stack. This behavior is very specific, and it is
// much better to use a queue in this scenario.
//

namespace Chapter6.Recipe3
{
    /// <summary>
    /// Producer/consumer demo built on <see cref="ConcurrentStack{T}"/>: one
    /// producer pushes work items while several processors pop them, so the
    /// most recently pushed items tend to be handled first.
    /// </summary>
    class Program
    {
        // Number of concurrent processors popping work items off the stack.
        private const int ProcessorCount = 4;

        // Number of work items the producer pushes before it completes.
        private const int TaskCount = 20;

        // One generator shared by every caller. Creating a new Random seeded
        // with the current millisecond on each call gives callers that run in
        // the same millisecond identical "random" delays.
        private static readonly Random DelayGenerator = new Random();

        // System.Random instances are not thread-safe, so access is serialized.
        private static readonly object DelayGeneratorLock = new object();

        /// <summary>
        /// Entry point: runs the demo and blocks until it has finished.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // GetAwaiter().GetResult() rethrows the original exception rather
            // than wrapping it in an AggregateException as Wait() does.
            RunProgram().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Starts the producer and the processors, waits for the producer to
        /// finish, then schedules cancellation of the processors two seconds
        /// later and waits for all of them to exit.
        /// </summary>
        /// <returns>A task that completes when every processor has stopped.</returns>
        static async Task RunProgram()
        {
            var taskStack = new ConcurrentStack<CustomTask>();

            // CancelAfter allocates a timer inside the source, so the source
            // is disposed once all processors are done with its token.
            using (var cts = new CancellationTokenSource())
            {
                // Capture the token up front so the worker lambdas never touch
                // the source itself.
                CancellationToken token = cts.Token;

                Task taskSource = Task.Run(() => TaskProducer(taskStack));

                var processors = new Task[ProcessorCount];
                for (int i = 1; i <= ProcessorCount; i++)
                {
                    // Copy the id into a per-iteration local so each lambda
                    // captures its own value rather than the loop variable.
                    string processorId = i.ToString();
                    processors[i - 1] = Task.Run(() =>
                        TaskProcessor(taskStack, "Processor " + processorId, token)
                    );
                }

                await taskSource;
                cts.CancelAfter(TimeSpan.FromSeconds(2));

                await Task.WhenAll(processors);
            }
        }

        /// <summary>
        /// Pushes <see cref="TaskCount"/> work items onto the stack, waiting
        /// 50 ms before each one, and reports each item on the console.
        /// </summary>
        /// <param name="stack">The shared stack that receives the work items.</param>
        /// <returns>A task that completes after the last item has been pushed.</returns>
        static async Task TaskProducer(ConcurrentStack<CustomTask> stack)
        {
            for (int i = 1; i <= TaskCount; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomTask { Id = i };
                stack.Push(workItem);
                Console.WriteLine("Task {0} has been posted", workItem.Id);
            }
        }

        /// <summary>
        /// Repeatedly tries to pop a work item and reports it on the console,
        /// pausing for a random delay between attempts, until cancellation is
        /// requested.
        /// </summary>
        /// <param name="stack">The shared stack to pop work items from.</param>
        /// <param name="name">Display name of this processor for the console output.</param>
        /// <param name="token">Token that signals the processor to stop.</param>
        /// <returns>A task that completes once cancellation has been observed.</returns>
        static async Task TaskProcessor(
            ConcurrentStack<CustomTask> stack,
            string name,
            CancellationToken token)
        {
            await GetRandomDelay();
            do
            {
                CustomTask workItem;
                bool popSuccessful = stack.TryPop(out workItem);
                if (popSuccessful)
                {
                    Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name);
                }

                // The delay runs even when the stack was empty, so an idle
                // processor polls instead of spinning.
                await GetRandomDelay();
            }
            while (!token.IsCancellationRequested);
        }

        /// <summary>
        /// Creates a delay of a random length between 1 and 499 milliseconds.
        /// </summary>
        /// <returns>A task that completes when the delay has elapsed.</returns>
        static Task GetRandomDelay()
        {
            int delay;
            lock (DelayGeneratorLock)
            {
                // Upper bound of Next is exclusive: yields 1..499.
                delay = DelayGenerator.Next(1, 500);
            }

            return Task.Delay(delay);
        }

        /// <summary>
        /// A unit of work identified by a sequential number.
        /// </summary>
        class CustomTask
        {
            public int Id { get; set; }
        }
    }
}