// C# class that provides an asynchronous producer/consumer collection.

//--------------------------------------------------------------------------
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File: AsyncProducerConsumerCollection.cs
//
//--------------------------------------------------------------------------

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

namespace System.Threading.Async
{
    /// <summary>
    /// Provides an asynchronous producer/consumer collection.
    /// </summary>
    /// <remarks>
    /// Items are stored in an underlying <see cref="IProducerConsumerCollection{T}"/>.
    /// Every successful <see cref="Add"/> releases the internal asynchronous semaphore once,
    /// and every <see cref="Take"/> waits on that semaphore before removing an item.
    /// </remarks>
    /// <typeparam name="T">The type of the elements stored in the collection.</typeparam>
    [DebuggerDisplay("Count={Count}")]
    public sealed class AsyncProducerConsumerCollection<T> : IDisposable
    {
        /// <summary>
        /// Asynchronous semaphore used to keep track of asynchronous work.
        /// Set to <c>null</c> once the collection has been disposed.
        /// </summary>
        private AsyncSemaphore _semaphore = new AsyncSemaphore();

        /// <summary>The data stored in the collection.</summary>
        private readonly IProducerConsumerCollection<T> _collection;

        /// <summary>Initializes the asynchronous producer/consumer collection to store data in a first-in-first-out (FIFO) order.</summary>
        public AsyncProducerConsumerCollection() : this(new ConcurrentQueue<T>())
        {
        }

        /// <summary>Initializes the asynchronous producer/consumer collection.</summary>
        /// <param name="collection">The underlying collection to use to store data.</param>
        /// <exception cref="ArgumentNullException"><paramref name="collection"/> is <c>null</c>.</exception>
        public AsyncProducerConsumerCollection(IProducerConsumerCollection<T> collection)
        {
            _collection = collection ?? throw new ArgumentNullException(nameof(collection));
        }

        /// <summary>Adds an element to the collection.</summary>
        /// <param name="item">The item to be added.</param>
        /// <exception cref="ObjectDisposedException">The collection has been disposed.</exception>
        /// <exception cref="InvalidOperationException">The underlying collection refused the item.</exception>
        public void Add(T item)
        {
            // Resolve the semaphore first so that a disposed instance is rejected
            // before the item is stored in the underlying collection.
            AsyncSemaphore semaphore = GetSemaphoreOrThrow();

            if (!_collection.TryAdd(item))
            {
                throw new InvalidOperationException("Invalid collection");
            }

            // One release per stored item lets exactly one Take proceed.
            semaphore.Release();
        }

        /// <summary>Takes an element from the collection asynchronously.</summary>
        /// <returns>A Task that represents the element removed from the collection.</returns>
        /// <exception cref="ObjectDisposedException">The collection has been disposed.</exception>
        public Task<T> Take()
        {
            return GetSemaphoreOrThrow().WaitAsync().ContinueWith(
                _ =>
                {
                    if (!_collection.TryTake(out T result))
                    {
                        throw new InvalidOperationException("Invalid collection");
                    }

                    return result;
                },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
                // Explicit scheduler: do not implicitly bind to TaskScheduler.Current.
                TaskScheduler.Default);
        }

        /// <summary>Gets the number of elements in the collection.</summary>
        public int Count => _collection.Count;

        /// <summary>Disposes of the collection. Calling this method more than once has no further effect.</summary>
        public void Dispose()
        {
            // Atomically detach the semaphore so it is disposed at most once.
            Interlocked.Exchange(ref _semaphore, null)?.Dispose();
        }

        /// <summary>Returns the live semaphore, or throws if the collection has been disposed.</summary>
        /// <exception cref="ObjectDisposedException">The collection has been disposed.</exception>
        private AsyncSemaphore GetSemaphoreOrThrow()
        {
            return _semaphore ?? throw new ObjectDisposedException(nameof(AsyncProducerConsumerCollection<T>));
        }
    }
}