Skip to main content

Provides for asynchronous exclusive and concurrent execution support.

//--------------------------------------------------------------------------
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File: AsyncReaderWriter.cs
//
//--------------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading.Tasks;
using System.Diagnostics;

namespace System.Threading.Async
{
    /// <summary>
    /// Provides for asynchronous exclusive and concurrent execution support.
    /// </summary>
    [DebuggerDisplay("WaitingConcurrent={WaitingConcurrent}, WaitingExclusive={WaitingExclusive}, CurrentReaders={CurrentConcurrent}, Exclusive={CurrentlyExclusive}")]
    public sealed class AsyncReaderWriter
    {
        /// <summary>The lock that protects all shared state in this instance.</summary>
        private readonly object _lock = new object();

        /// <summary>The queue of concurrent readers waiting to execute.</summary>
        private readonly Queue<Task> _waitingConcurrent = new Queue<Task>();

        /// <summary>The queue of exclusive writers waiting to execute.</summary>
        private readonly Queue<Task> _waitingExclusive = new Queue<Task>();

        /// <summary>The number of concurrent readers currently executing.</summary>
        private int _currentConcurrent;

        /// <summary>The number of exclusive writers currently executing.</summary>
        private bool _currentlyExclusive;

        /// <summary>The non-generic factory to use for task creation.</summary>
        private readonly TaskFactory _factory;

        /// <summary>Initializes the ReaderWriterAsync.</summary>
        public AsyncReaderWriter()
        {
            _factory = Task.Factory;
        }

        /// <summary>
        /// Initializes the ReaderWriterAsync with the specified TaskFactory
        /// for us in creating all tasks.
        /// </summary>
        /// <param name="factory">The TaskFactory to use to create all tasks.</param>
        public AsyncReaderWriter(TaskFactory factory)
        {
            _factory = factory ?? throw new ArgumentNullException(nameof(factory));
        }

        /// <summary>Gets the number of exclusive operations currently queued.</summary>
        public int WaitingExclusive
        {
            get
            {
                lock (_lock)
                {
                    return _waitingExclusive.Count;
                }
            }
        }

        /// <summary>Gets the number of concurrent operations currently queued.</summary>
        public int WaitingConcurrent
        {
            get
            {
                lock (_lock)
                {
                    return _waitingConcurrent.Count;
                }
            }
        }

        /// <summary>Gets the number of concurrent operations currently executing.</summary>
        public int CurrentConcurrent
        {
            get
            {
                lock (_lock)
                {
                    return _currentConcurrent;
                }
            }
        }

        /// <summary>Gets whether an exclusive operation is currently executing.</summary>
        public bool CurrentlyExclusive
        {
            get
            {
                lock (_lock)
                {
                    return _currentlyExclusive;
                }
            }
        }

        /// <summary>Queues an exclusive writer action to the ReaderWriterAsync.</summary>
        /// <param name="action">The action to be executed exclusively.</param>
        /// <returns>A Task that represents the execution of the provided action.</returns>
        public Task QueueExclusiveWriter(Action action)
        {
            // Create the task.  This Task will be started by the coordination primitive
            // when it's safe to do so, e.g. when there are no other tasks associated
            // with this async primitive executing.
            var task = _factory.Create(state =>
            {
                // Run the user-provided action
                try
                {
                    ((Action) state)();
                }
                // Ensure that we clean up when we're done
                finally
                {
                    FinishExclusiveWriter();
                }
            }, action);

            // Now that we've created the task, we need to do something with it, either queueing it or scheduling it immediately
            lock (_lock)
            {
                // If there's already a task running, or if there are any other exclusive tasks that need to run,
                // queue it.  Otherwise, no one else is running or wants to run, so schedule it now.
                if (_currentlyExclusive || _currentConcurrent > 0 || _waitingExclusive.Count > 0)
                {
                    _waitingExclusive.Enqueue(task);
                }
                else
                {
                    RunExclusive_RequiresLock(task);
                }
            }

            // Return the created task for the caller to track.
            return task;
        }

        /// <summary>Queues an exclusive writer function to the ReaderWriterAsync.</summary>
        /// <param name="function">The function to be executed exclusively.</param>
        /// <returns>A Task that represents the execution of the provided function.</returns>
        public Task<TResult> QueueExclusiveWriter<TResult>(Func<TResult> function)
        {
            // Create the task.  This Task will be started by the coordination primitive
            // when it's safe to do so, e.g. when there are no other tasks associated
            // with this async primitive executing.
            var task = _factory.Create(state =>
            {
                // Run the user-provided function
                try
                {
                    return ((Func<TResult>) state)();
                }
                // Ensure that we clean up when we're done
                finally
                {
                    FinishExclusiveWriter();
                }
            }, function);

            // Now that we've created the task, we need to do something with it, either queueing it or scheduling it immediately
            lock (_lock)
            {
                // If there's already a task running, or if there are any other exclusive tasks that need to run,
                // queue it.  Otherwise, no one else is running or wants to run, so schedule it now.
                if (_currentlyExclusive || _currentConcurrent > 0 || _waitingExclusive.Count > 0)
                {
                    _waitingExclusive.Enqueue(task);
                }
                else
                {
                    RunExclusive_RequiresLock(task);
                }
            }

            // Return the created task for the caller to track.
            return task;
        }

        /// <summary>Queues a concurrent reader action to the ReaderWriterAsync.</summary>
        /// <param name="action">The action to be executed concurrently.</param>
        /// <returns>A Task that represents the execution of the provided action.</returns>
        public Task QueueConcurrentReader(Action action)
        {
            // Create the task.  This Task will be started by the coordination primitive
            // when it's safe to do so, e.g. when there are no exclusive tasks running
            // or waiting to run.
            var task = _factory.Create(state =>
            {
                // Run the user-provided action
                try
                {
                    ((Action) state)();
                }
                // Ensure that we clean up when we're done
                finally
                {
                    FinishConcurrentReader();
                }
            }, action);

            // Now that we've created the task, we need to do something with it, either queueing it or scheduling it immediately
            lock (_lock)
            {
                // If there are any exclusive tasks running or waiting, queue the concurrent task
                if (_currentlyExclusive || _waitingExclusive.Count > 0)
                {
                    _waitingConcurrent.Enqueue(task);
                }
                // Otherwise schedule it immediately
                else
                {
                    RunConcurrent_RequiresLock(task);
                }
            }

            // Return the task to the caller.
            return task;
        }

        /// <summary>Queues a concurrent reader function to the ReaderWriterAsync.</summary>
        /// <param name="function">The function to be executed concurrently.</param>
        /// <returns>A Task that represents the execution of the provided function.</returns>
        public Task<TResult> QueueConcurrentReader<TResult>(Func<TResult> function)
        {
            // Create the task.  This Task will be started by the coordination primitive
            // when it's safe to do so, e.g. when there are no exclusive tasks running
            // or waiting to run.
            var task = _factory.Create(state =>
            {
                // Run the user-provided function
                try
                {
                    return ((Func<TResult>) state)();
                }
                // Ensure that we clean up when we're done
                finally
                {
                    FinishConcurrentReader();
                }
            }, function);

            // Now that we've created the task, we need to do something with it, either queueing it or scheduling it immediately
            lock (_lock)
            {
                // If there are any exclusive tasks running or waiting, queue the concurrent task
                if (_currentlyExclusive || _waitingExclusive.Count > 0)
                {
                    _waitingConcurrent.Enqueue(task);
                }
                // Otherwise schedule it immediately
                else
                {
                    RunConcurrent_RequiresLock(task);
                }
            }

            // Return the task to the caller.
            return task;
        }

        /// <summary>Starts the specified exclusive task.</summary>
        /// <param name="exclusive">The exclusive task to be started.</param>
        /// <remarks>This must only be executed while holding the instance's lock.</remarks>
        private void RunExclusive_RequiresLock(Task exclusive)
        {
            _currentlyExclusive = true;
            exclusive.Start(_factory.GetTargetScheduler());
        }

        /// <summary>Starts the specified concurrent task.</summary>
        /// <param name="concurrent">The exclusive task to be started.</param>
        /// <remarks>This must only be executed while holding the instance's lock.</remarks>
        private void RunConcurrent_RequiresLock(Task concurrent)
        {
            _currentConcurrent++;
            concurrent.Start(_factory.GetTargetScheduler());
        }

        /// <summary>Starts all queued concurrent tasks.</summary>
        /// <remarks>This must only be executed while holding the instance's lock.</remarks>
        private void RunConcurrent_RequiresLock()
        {
            while (_waitingConcurrent.Count > 0)
            {
                RunConcurrent_RequiresLock(_waitingConcurrent.Dequeue());
            }
        }

        /// <summary>Completes the processing of a concurrent reader.</summary>
        private void FinishConcurrentReader()
        {
            lock (_lock)
            {
                // Update the tracking count of the number of concurrently executing tasks
                _currentConcurrent--;

                // If we've now hit zero tasks running concurrently and there are any waiting writers, run one of them
                if (_currentConcurrent == 0 && _waitingExclusive.Count > 0)
                {
                    RunExclusive_RequiresLock(_waitingExclusive.Dequeue());
                }

                // Otherwise, if there are no waiting writers but there are waiting readers for some reason (they should
                // have started when they were added by the user), run all concurrent tasks waiting.
                else if (_waitingExclusive.Count == 0 && _waitingConcurrent.Count > 0)
                {
                    RunConcurrent_RequiresLock();
                }
            }
        }

        /// <summary>Completes the processing of an exclusive writer.</summary>
        private void FinishExclusiveWriter()
        {
            lock (_lock)
            {
                // We're no longer executing exclusively, though this might get reversed shortly
                _currentlyExclusive = false;

                // If there are any more waiting exclusive tasks, run the next one in line
                if (_waitingExclusive.Count > 0)
                {
                    RunExclusive_RequiresLock(_waitingExclusive.Dequeue());
                }

                // Otherwise, if there are any waiting concurrent tasks, run them all
                else if (_waitingConcurrent.Count > 0)
                {
                    RunConcurrent_RequiresLock();
                }
            }
        }
    }
}