A TaskScheduler that provides control over the underlying threads utilized.

//--------------------------------------------------------------------------
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File: QueuedTaskScheduler.cs
//--------------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Quartz.Util
{
    // ReSharper disable All

    /// <summary>
    /// Provides a TaskScheduler that gives control over the underlying threads utilized.
    /// </summary>
    [DebuggerTypeProxy(typeof (QueuedTaskSchedulerDebugView))]
    [DebuggerDisplay("Id={Id}, ScheduledTasks = {DebugTaskCount}")]
    internal sealed class QueuedTaskScheduler : TaskScheduler, IDisposable
    {
        /// <summary>Debug view for the QueuedTaskScheduler.</summary>
        private class QueuedTaskSchedulerDebugView
        {
            /// <summary>The scheduler.</summary>
            private readonly QueuedTaskScheduler _scheduler;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="scheduler">The scheduler.</param>
            public QueuedTaskSchedulerDebugView(QueuedTaskScheduler scheduler)
            {
                _scheduler = scheduler ?? throw new ArgumentNullException("scheduler");
            }

            /// <summary>Gets all of the Tasks queued to the scheduler directly.</summary>
            public IEnumerable<Task> ScheduledTasks
            {
                get
                {
                    return _scheduler._blockingTaskQueue.ToList();
                }
            }
        }

        /// <summary>Cancellation token used for disposal.</summary>
        private readonly CancellationTokenSource _disposeCancellation = new CancellationTokenSource();

        /// <summary>
        /// The maximum allowed concurrency level of this scheduler.  If custom threads are
        /// used, this represents the number of created threads.
        /// </summary>
        private readonly int _concurrencyLevel;

        /// <summary>Whether we're processing tasks on the current thread.</summary>
        private static readonly ThreadLocal<bool> _taskProcessingThread = new ThreadLocal<bool>();

        /// <summary>The threads used by the scheduler to process work.</summary>
        private readonly Thread[] _threads;

        /// <summary>The collection of tasks to be executed on our custom threads.</summary>
        private readonly BlockingCollection<Task> _blockingTaskQueue;

        /// <summary>Initializes the scheduler.</summary>
        /// <param name="threadCount">The number of threads to create and use for processing work items.</param>
        public QueuedTaskScheduler(int threadCount)
            : this(
                threadCount,
                string.Empty,
                false,
                ThreadPriority.Normal)
        {
        }

        /// <summary>Initializes the scheduler.</summary>
        /// <param name="threadCount">The number of threads to create and use for processing work items.</param>
        /// <param name="threadName">The name to use for each of the created threads.</param>
        /// <param name="useForegroundThreads">A Boolean value that indicates whether to use foreground threads instead of background.</param>
        /// <param name="threadPriority">The priority to assign to each thread.</param>
        public QueuedTaskScheduler(
            int threadCount,
            string threadName = "",
            bool useForegroundThreads = false,
            ThreadPriority threadPriority = ThreadPriority.Normal)
        {
            // Validates arguments (some validation is left up to the Thread type itself).
            // If the thread count is 0, default to the number of logical processors.
            if (threadCount < 0)
            {
                throw new ArgumentOutOfRangeException("concurrencyLevel");
            }
            else if (threadCount == 0)
            {
                _concurrencyLevel = Environment.ProcessorCount;
            }
            else
            {
                _concurrencyLevel = threadCount;
            }

            // Initialize the queue used for storing tasks
            _blockingTaskQueue = new BlockingCollection<Task>();

            // Create all of the threads
            _threads = new Thread[_concurrencyLevel];
            for (int i = 0; i < _concurrencyLevel; i++)
            {
                _threads[i] = new Thread(() => ThreadBasedDispatchLoop())
                {
                    Priority = threadPriority,
                    IsBackground = !useForegroundThreads,
                };
                if (threadName != null)
                {
                    _threads[i].Name = $"{threadName} ({i})";
                }
            }

            // Start all of the threads
            foreach (var thread in _threads)
            {
                thread.Start();
            }
        }

        /// <summary>The dispatch loop run by all threads in this scheduler.</summary>
        private void ThreadBasedDispatchLoop()
        {
            _taskProcessingThread.Value = true;
            try
            {
                // If a thread abort occurs, we'll try to reset it and continue running.
                while (true)
                {
                    try
                    {
                        // For each task queued to the scheduler, try to execute it.
                        foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token))
                        {
                            TryExecuteTask(task);
                        }
                    }
                    catch (ThreadAbortException)
                    {
                        // If we received a thread abort, and that thread abort was due to shutting down
                        // or unloading, let it pass through.  Otherwise, reset the abort so we can
                        // continue processing work items.
                        if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload())
                        {
                            Thread.ResetAbort();
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // If the scheduler is disposed, the cancellation token will be set and
                // we'll receive an OperationCanceledException.  That OCE should not crash the process.
            }
            finally
            {
                _taskProcessingThread.Value = false;
            }
        }

        /// <summary>Gets the number of tasks currently scheduled.</summary>
        private int DebugTaskCount
        {
            get { return _blockingTaskQueue.Count; }
        }

        /// <summary>Queues a task to the scheduler.</summary>
        /// <param name="task">The task to be queued.</param>
        protected override void QueueTask(Task task)
        {
            // If we've been disposed, no one should be queueing
            if (_disposeCancellation.IsCancellationRequested)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            // add the task to the blocking queue
            _blockingTaskQueue.Add(task);
        }

        /// <summary>Tries to execute a task synchronously on the current thread.</summary>
        /// <param name="task">The task to execute.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
        /// <returns>true if the task was executed; otherwise, false.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If we're already running tasks on this thread, allow inlining.
            return _taskProcessingThread.Value && TryExecuteTask(task);
        }

        /// <summary>Gets the tasks scheduled to this scheduler.</summary>
        /// <returns>An enumerable of all tasks queued to this scheduler.</returns>
        /// <remarks>This does not include the tasks on sub-schedulers.  Those will be retrieved by the debugger separately.</remarks>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            // Get the tasks from the blocking queue.
            return _blockingTaskQueue.ToList();
        }

        /// <summary>Gets the maximum concurrency level to use when processing tasks.</summary>
        public override int MaximumConcurrencyLevel
        {
            get { return _concurrencyLevel; }
        }

        /// <summary>Initiates shutdown of the scheduler.</summary>
        public void Dispose()
        {
            _disposeCancellation.Cancel();
        }
    }
}

// ----------------------------------------------------------------------------
// from "ParallelExtensionsExtras Tour -- #7 Additional TaskSchedulers: QueuedTaskScheduler" (Blogs MSDN)
// by April 9, 2010 by Stephen Toub - MSFT
// <https://blogs.msdn.microsoft.com/pfxteam/2010/04/09/parallelextensionsextras-tour-7-additional-taskschedulers/>
//
// The full set of ParallelExtensionsExtras Tour posts is available here:
// <https://blogs.msdn.microsoft.com/pfxteam/2010/04/04/a-tour-of-parallelextensionsextras/>
//
// Samples for Parallel Programming with the .NET Framework:
// <https://code.msdn.microsoft.com/ParExtSamples>
// ----------------------------------------------------------------------------
//
// Found in the "QueuedTaskScheduler.cs" file, QueuedTaskScheduler provides a
// wealth of functionality all wrapped up into a single scheduler type. It
// supports:
//
// ## Priorities
//
// You create a queue off of the scheduler with a particular priority, and that
// queue is itself a TaskScheduler. Any tasks you then schedule to that queue
// are tagged with that priority, and the scheduler services tasks in
// priority order. e.g.
//
//     QueuedTaskScheduler qts = new QueuedTaskScheduler();
//     TaskScheduler pri0 = qts.CreateQueue(priority:0);
//     TaskScheduler pri1 = qts.CreateQueue(priority:1);
//
// Any tasks scheduled to pri0 will get priority over tasks scheduled to pri1,
// even if they are scheduled later.
//
// ## Fairness
//
// It's often the case that you have batches of work, and you want each of these
// batches to be treated fairly with each other, such that if a large batch
// arrives and then a small batch arrives, the small batch doesn't have to wait
// until the entire large batch is completed; instead, tasks from both the large
// and small batches will be scheduled fairly and round-robin'd between. The
// queues created on a QueuedTaskScheduler are scheduled in just such a manner,
// also taking into account priorities. e.g.
//
//     QueuedTaskScheduler qts = new QueuedTaskScheduler();
//     TaskScheduler pri0a = qts.CreateQueue(priority:0);
//     TaskScheduler pri0b = qts.CreateQueue(priority:0);
//     TaskScheduler pri1 = qts.CreateQueue(priority:1);
//
// Tasks scheduled to both pri0a and pri0b will take priority over tasks
// scheduled to pri1. However, tasks scheduled to pri0a and pri0b will be
// round-robin'd between, as they exist at the same priority level.
//
// ## Concurrency Levels
//
// In a large system, you may want to control how much parallelism is afforded
// to different parts of the system. With parallel loops and PLINQ queries, you
// can control this on a per-loop or per-query basis, but out-of-the-box there's
// no way to control it across loops, and there's no built-in way to control it
// for tasks. Scheduling all related work to a TaskScheduler that enforces a
// maximum concurrency level provides that control.
//
//     var qts = new QueuedTaskScheduler(TaskScheduler.Default, maxConcurrencyLevel:4);
//     var options = new ParallelOptions { TaskScheduler = qts };
//
//     Task.Factory.StartNew(() =>
//     {
//         Parallel.For(0, 100, options, i=> { ... });
//     }, CancellationToken.None, TaskCreationOptions.None, qts);
//
//     Task.Factory.StartNew(() =>
//     {
//         Parallel.For(0, 100, options, i=> { ... });
//     }, CancellationToken.None, TaskCreationOptions.None, qts);
//
// Both tasks and the parallel loops they contain will be limited to a maximum
// concurrency level of four.
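//
// The example above uses the constructor from the full ParallelExtensionsExtras
// type, which layers the scheduler on top of another TaskScheduler. As a rough
// sketch with the trimmed, thread-based variant defined in this file, the
// concurrency cap is simply the number of dedicated threads requested (the
// variable names below are illustrative):
//
//     var limited = new QueuedTaskScheduler(threadCount: 4);
//     var limitedOptions = new ParallelOptions { TaskScheduler = limited };
//
//     Parallel.For(0, 100, limitedOptions, i =>
//     {
//         // At most four iterations run at once, because only four dedicated
//         // threads service the scheduler's queue.
//     });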
//
// ## Thread control
//
// The priorities, fairness, and concurrency level control all apply when
// QueuedTaskScheduler is used on top of another TaskScheduler as well as when
// used with dedicated threads for the scheduler. However, QueuedTaskScheduler
// also provides very low-level control over the threads utilized by the
// scheduler when dedicated threads are requested. Here's the relevant
// constructor, which shows the various knobs available:
//
//     public QueuedTaskScheduler(
//         int threadCount,
//         string threadName = "",
//         bool useForegroundThreads = false,
//         ThreadPriority threadPriority = ThreadPriority.Normal,
//         ApartmentState threadApartmentState = ApartmentState.MTA,
//         int threadMaxStackSize = 0,
//         Action threadInit = null,
//         Action threadFinally = null
//     );
//
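// Note that the class included in this file is a trimmed-down variant of that
// full type: it exposes only the first four of those parameters (threadCount,
// threadName, useForegroundThreads, threadPriority) and does not include the
// CreateQueue/priority APIs shown earlier. A minimal usage sketch of the
// trimmed variant (the thread name and task bodies are illustrative):
//
//     using (var scheduler = new QueuedTaskScheduler(
//         threadCount: 2,
//         threadName: "Worker",
//         useForegroundThreads: false,
//         threadPriority: ThreadPriority.BelowNormal))
//     {
//         var factory = new TaskFactory(scheduler);
//         Task first = factory.StartNew(() => Console.WriteLine("Runs on a dedicated scheduler thread"));
//         Task second = factory.StartNew(() => Console.WriteLine("Limited to the two dedicated threads"));
//         Task.WaitAll(first, second);
//     }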