Skip to main content

C# task scheduler that queues tasks and runs them in one dedicated thread.

namespace TomsToolbox.Desktop
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    using JetBrains.Annotations;

    /// <summary>
    /// A <see cref="System.Threading.Tasks.TaskScheduler" /> that queues the tasks an runs them in one dedicated thread.
    /// </summary>
    public sealed class ThreadBoundTaskScheduler : TaskScheduler, IDisposable
    {
        [NotNull]
        private readonly BlockingCollection<Task> _tasksCollection = new BlockingCollection<Task>();
        [NotNull]
        private readonly Thread _thread;

        /// <summary>
        /// Initializes a new instance of the <see cref="ThreadBoundTaskScheduler"/> class.
        /// </summary>
        public ThreadBoundTaskScheduler()
        {
            _thread = new Thread(Execute);
            _thread.Start();

            TaskFactory = new TaskFactory(this);
        }

        /// <summary>
        /// Gets the thread identifier of the underlying thread.
        /// </summary>
        public int ThreadId => _thread.ManagedThreadId;

        /// <summary>
        /// Gets the task factory that can be used to enqueue a new task.
        /// </summary>
        [NotNull]
        public TaskFactory TaskFactory { get; }

        private void Execute()
        {
            foreach (var task in _tasksCollection.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        }

        /// <summary>
        /// For debugger support only, generates an enumerable of <see cref="T:System.Threading.Tasks.Task" /> instances currently queued to the scheduler waiting to be executed.
        /// </summary>
        /// <returns>
        /// An enumerable that allows a debugger to traverse the tasks currently queued to this scheduler.
        /// </returns>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasksCollection.ToArray();
        }

        /// <summary>
        /// Queues a <see cref="T:System.Threading.Tasks.Task" /> to the scheduler.
        /// </summary>
        /// <param name="task">The <see cref="T:System.Threading.Tasks.Task" /> to be queued.</param>
        protected override void QueueTask(Task task)
        {
            _tasksCollection.Add(task);
        }

        /// <summary>
        /// Determines whether the provided <see cref="T:System.Threading.Tasks.Task" /> can be executed synchronously in this call, and if it can, executes it.
        /// </summary>
        /// <param name="task">The <see cref="T:System.Threading.Tasks.Task" /> to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">A Boolean denoting whether or not task has previously been queued. If this parameter is True, then the task may have been previously queued (scheduled); if False, then the task is known not to have been queued, and this call is being made in order to execute the task inline without queuing it.</param>
        /// <returns>
        /// A Boolean value indicating whether the task was executed inline.
        /// </returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            _tasksCollection.CompleteAdding();
            _thread.Join();
            _tasksCollection.Dispose();
        }
    }
}

// -----------------------------------------------------------------
// Tests
// -----------------------------------------------------------------

namespace TomsToolbox.Desktop.Tests
{
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading;

    using Microsoft.VisualStudio.TestTools.UnitTesting;

    [TestClass]
    public class TaskSchedulerTests
    {
        [TestMethod]
        public void ThreadBoundTaskSchedulerTest()
        {
            var stack = new ConcurrentStack<string>();
            var thisId = Thread.CurrentThread.ManagedThreadId;
            int schedulerId;

            using (var taskScheduler = new ThreadBoundTaskScheduler())
            {
                var factory = taskScheduler.TaskFactory;
                schedulerId = taskScheduler.ThreadId;

                factory.StartNew(() => { stack.Push("1: " + Thread.CurrentThread.ManagedThreadId); Thread.Sleep(100); });
                factory.StartNew(() => { stack.Push("2: " + Thread.CurrentThread.ManagedThreadId); Thread.Sleep(100); });
                factory.StartNew(() => { stack.Push("3: " + Thread.CurrentThread.ManagedThreadId); Thread.Sleep(100); });
                factory.StartNew(() => { stack.Push("4: " + Thread.CurrentThread.ManagedThreadId); Thread.Sleep(100); });
            }

            Assert.AreEqual(4, stack.Count);
            Assert.AreNotEqual(thisId, schedulerId);
            Assert.IsTrue(stack.Reverse().SequenceEqual(Enumerable.Range(1, 4).Select(i => i + ": " + schedulerId)));
        }
    }
}