This recipe is a slight modification of the previous one. We will once again create a set of tasks to be processed asynchronously by multiple workers, but this time we implement it with ConcurrentStack and see the differences.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
//
// Changing Asynchronous Processing Order ConcurrentStack
//
// This recipe is a slight modification of the previous one. We will once again
// create a set of tasks to be processed asynchronously by multiple workers, but
// this time we implement it with ConcurrentStack and see the differences.
//
// How it works...
//
// When the program runs, we now create an instance of the ConcurrentStack
// collection. The rest is almost like in the previous recipe, except instead of
// using the Push and TryPop methods on the concurrent stack, we use Enqueue and
// TryDequeue on a concurrent queue.
//
// We now see that the task processing order has been changed. The stack is a
// LIFO collection and workers process the later tasks first. In case of a
// concurrent queue, tasks were processed in almost the same order in which they
// were added. This means that by depending on the number of workers, we will
// surely process the task that was created first in a given time frame. In case
// of a stack, the tasks that were created earlier will have lower priority and
// may be not processed until a producer stops putting more tasks to the stack.
// This behavior is very specific and it is much better to use a queue in this
// scenario.
//
namespace Chapter6.Recipe3
{
class Program
{
static void Main(string[] args)
{
Task t = RunProgram();
t.Wait();
}
static async Task RunProgram()
{
var taskStack = new ConcurrentStack<CustomTask>();
var cts = new CancellationTokenSource();
var taskSource = Task.Run(() => TaskProducer(taskStack));
Task[] processors = new Task[4];
for (int i = 1; i <= 4; i++)
{
string processorId = i.ToString();
processors[i - 1] = Task.Run(() =>
TaskProcessor(taskStack, "Processor " + processorId, cts.Token)
);
}
await taskSource;
cts.CancelAfter(TimeSpan.FromSeconds(2));
await Task.WhenAll(processors);
}
static async Task TaskProducer(ConcurrentStack<CustomTask> stack)
{
for (int i = 1; i <= 20; i++)
{
await Task.Delay(50);
var workItem = new CustomTask { Id = i };
stack.Push(workItem);
Console.WriteLine("Task {0} has been posted", workItem.Id);
}
}
static async Task TaskProcessor(
ConcurrentStack<CustomTask> stack,
string name,
CancellationToken token)
{
await GetRandomDelay();
do
{
CustomTask workItem;
bool popSuccesful = stack.TryPop(out workItem);
if (popSuccesful)
{
Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name);
}
await GetRandomDelay();
}
while (!token.IsCancellationRequested);
}
static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}
class CustomTask
{
public int Id { get; set; }
}
}
}