Skip to main content

A .NET Core console application that demonstrates some of the ways PLINQ query execution may be controlled and configured. The sample source code is a .NET Core Console application written in C#, that demonstrates some of the ways PLINQ query execution may be controlled and configured. This sample is written in C# and targets .NET Core 3.1. It requires the .NET Core 3.1 SDK.

//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // Uncomment each of the below four lines one-by-one
        // to test the relevant PLINQ operation

        await AsOrdered();
        // await WithMergeOptions();
        // await WithCancellation();
        // WithDegreeOfParallelism();

        await Task.CompletedTask;
    }

    static async Task AsOrdered()
    {
        #region Sequential
        var items = Enumerable.Range(1, 100);
        var q = from e in items
                where e % 2 == 0 // is even
                select DoWork(e);

        foreach (var e in q)
        {
            Console.WriteLine(await e);
        }
        Console.Write("Complete: Sequential");
        Console.ReadLine();
        #endregion

        #region Parallel
        var items2 = ParallelEnumerable.Range(1, 100);
        q = from e in items2
            where e % 2 == 0 // is even
            select DoWork(e);

        foreach (var e in q)
        {
            Console.WriteLine(await e);
        }
        Console.Write("Complete: Parallel");
        Console.ReadLine();
        #endregion

        #region Parallel with Ordering
        q = from e in items2.AsOrdered()
            where e % 2 == 0 // is even
            select DoWork(e);

        foreach (var e in q)
        {
            Console.WriteLine(await e);
        }

        Console.Write("Complete: Parallel with Ordering");
        Console.ReadLine();
        #endregion
    }

    static async Task WithMergeOptions()
    {
        #region Define the query
        var items = ParallelEnumerable.Range(1, 1_000);
        var q = from e in items
                select DoWork(e);
        #endregion

        #region Auto Buffered
        foreach (var e in q)
        {
            Console.WriteLine(await e);
        }

        Console.Write("Complete: Auto buffered");
        Console.ReadLine();
        #endregion

        #region Fully Buffered
        foreach (var e in q.WithMergeOptions(ParallelMergeOptions.FullyBuffered))
        {
            Console.WriteLine(await e);
        }

        Console.Write("Complete: Fully buffered");
        Console.ReadLine();
        #endregion

        #region Not buffered
        foreach (var e in q.WithMergeOptions(ParallelMergeOptions.NotBuffered))
        {
            Console.WriteLine(await e);
        }

        Console.Write("Complete: Not buffered");
        Console.ReadLine();
        #endregion
    }

    static async Task WithCancellation()
    {
        #region Define the query
        var items = ParallelEnumerable.Range(1, 1_000);
        var q = from e in items.WithMergeOptions(ParallelMergeOptions.NotBuffered)
                select DoWork(e);

        var cts = new CancellationTokenSource();

        #endregion

        #region Kick off the asynchronous cancellation
        _ = Task.Run(async () =>
        {
            await Task.Delay(300);
            cts.Cancel();
        });
        #endregion

        #region Enumerate the query
        try
        {
            foreach (var e in q.WithCancellation(cts.Token))
            {
                Console.WriteLine(await e);
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Cancelled!");
        }
        Console.ReadLine();
        #endregion
    }

    static void WithDegreeOfParallelism()
    {
        var items = ParallelEnumerable.Range(1, 200);

        #region Default DOP
        var sw = new Stopwatch();
        sw.Start();
        Console.Write("Default DOP: ");

        items.Average(e => DoWork2(e));

        sw.Stop();
        Console.WriteLine(sw.ElapsedMilliseconds);
        Console.WriteLine("Complete: Default DOP");
        #endregion

        #region DOP = 2
        sw.Reset();
        sw.Start();
        Console.Write("DOP = 2: ");

        items.WithDegreeOfParallelism(2)
             .Average(e => DoWork2(e));

        sw.Stop();
        Console.WriteLine(sw.ElapsedMilliseconds);
        Console.Write("Complete: DOP = 2");
        Console.ReadLine();
        #endregion
    }

    #region Helper functions
    static async Task<int> DoWork(int input)
    {
        await Task.Delay(20);
        return input * 2;
    }

    static int DoWork2(int input)
    {
        Thread.SpinWait(5_000_000);
        return input * 2;
    }
    #endregion
}