Skip to main content

A lock-free C# implementation of a single-reader, multi-writer circular buffer.

// -------------------------------------------------------------------
// CircularBuffer
// https://github.com/open-telemetry/opentelemetry-dotnet/blob/master/src/OpenTelemetry/Internal/CircularBuffer.cs
// -------------------------------------------------------------------

// <copyright file="CircularBuffer.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Runtime.CompilerServices;
using System.Threading;

namespace OpenTelemetry.Internal
{
    /// <summary>
    /// Lock-free implementation of a single-reader, multi-writer circular buffer.
    /// </summary>
    /// <remarks>
    /// Writers reserve a slot by advancing <c>head</c> with a compare-and-swap and then store the
    /// item into that slot; the single reader consumes the slot addressed by <c>tail</c>. Both
    /// counters only ever increase, and a slot index is the counter value modulo <see cref="Capacity"/>.
    /// A slot holding <c>null</c> is either empty or reserved by a writer that has not stored its item
    /// yet, which is why <c>null</c> items are rejected.
    /// All accesses to the shared counters and slots go through <see cref="Volatile"/> or
    /// <see cref="Interlocked"/> so that 64-bit counter reads cannot tear on 32-bit runtimes and the
    /// spin loop in <see cref="Read"/> always re-reads memory.
    /// </remarks>
    /// <typeparam name="T">The type of the underlying value.</typeparam>
    internal class CircularBuffer<T>
        where T : class
    {
        private readonly T[] trait;
        private long head;
        private long tail;

        /// <summary>
        /// Initializes a new instance of the <see cref="CircularBuffer{T}"/> class.
        /// </summary>
        /// <param name="capacity">The capacity of the circular buffer, must be a positive integer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is zero or negative.</exception>
        public CircularBuffer(int capacity)
        {
            if (capacity <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(capacity), capacity, "capacity should be greater than zero.");
            }

            this.Capacity = capacity;
            this.trait = new T[capacity];
        }

        /// <summary>
        /// Gets the capacity of the <see cref="CircularBuffer{T}"/>.
        /// </summary>
        public int Capacity { get; }

        /// <summary>
        /// Gets the number of items contained in the <see cref="CircularBuffer{T}"/>.
        /// </summary>
        public int Count
        {
            get
            {
                // tail is read before head: head never falls below tail and only grows,
                // so the difference computed from these two snapshots cannot be negative.
                var tailSnapshot = Volatile.Read(ref this.tail);
                return (int)(Volatile.Read(ref this.head) - tailSnapshot);
            }
        }

        /// <summary>
        /// Gets the number of items added to the <see cref="CircularBuffer{T}"/>.
        /// </summary>
        public long AddedCount => Volatile.Read(ref this.head);

        /// <summary>
        /// Gets the number of items removed from the <see cref="CircularBuffer{T}"/>.
        /// </summary>
        public long RemovedCount => Volatile.Read(ref this.tail);

        /// <summary>
        /// Adds the specified item to the buffer.
        /// </summary>
        /// <param name="value">The value to add.</param>
        /// <returns>
        /// Returns <c>true</c> if the item was added to the buffer successfully;
        /// <c>false</c> if the buffer is full.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="value"/> is <c>null</c>.</exception>
        public bool Add(T value)
        {
            if (value == null)
            {
                throw new ArgumentNullException(nameof(value));
            }

            while (true)
            {
                var tailSnapshot = Volatile.Read(ref this.tail);
                var headSnapshot = Volatile.Read(ref this.head);

                if (headSnapshot - tailSnapshot >= this.Capacity)
                {
                    return false; // buffer is full
                }

                // Reserve slot 'headSnapshot'; a mismatch means another writer won the race.
                if (Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot) != headSnapshot)
                {
                    continue;
                }

                // Publish the item; the reader spins on this slot until it becomes non-null.
                var index = (int)(headSnapshot % this.Capacity);
                Volatile.Write(ref this.trait[index], value);
                return true;
            }
        }

        /// <summary>
        /// Attempts to add the specified item to the buffer.
        /// </summary>
        /// <param name="value">The value to add.</param>
        /// <param name="maxSpinCount">The maximum allowed spin count, when set to a negative number or zero, will spin indefinitely.</param>
        /// <returns>
        /// Returns <c>true</c> if the item was added to the buffer successfully;
        /// <c>false</c> if the buffer is full or the spin count exceeded <paramref name="maxSpinCount"/>.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="value"/> is <c>null</c>.</exception>
        public bool TryAdd(T value, int maxSpinCount)
        {
            if (maxSpinCount <= 0)
            {
                // No spin limit requested: fall back to the unbounded retry loop.
                return this.Add(value);
            }

            if (value == null)
            {
                throw new ArgumentNullException(nameof(value));
            }

            var spinCountDown = maxSpinCount;

            while (true)
            {
                var tailSnapshot = Volatile.Read(ref this.tail);
                var headSnapshot = Volatile.Read(ref this.head);

                if (headSnapshot - tailSnapshot >= this.Capacity)
                {
                    return false; // buffer is full
                }

                // Reserve slot 'headSnapshot'; a mismatch means another writer won the race.
                if (Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot) != headSnapshot)
                {
                    if (spinCountDown-- == 0)
                    {
                        return false; // exceeded maximum spin count
                    }

                    continue;
                }

                // Publish the item; the reader spins on this slot until it becomes non-null.
                var index = (int)(headSnapshot % this.Capacity);
                Volatile.Write(ref this.trait[index], value);
                return true;
            }
        }

        /// <summary>
        /// Reads an item from the <see cref="CircularBuffer{T}"/>.
        /// </summary>
        /// <remarks>
        /// This function is not reentrant-safe, only one reader is allowed at any given time.
        /// Warning: There is no bounds check in this method. Do not call unless you have verified Count > 0.
        /// If the buffer is empty this method spins until a writer stores an item.
        /// </remarks>
        /// <returns>Item read.</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public T Read()
        {
            var tailSnapshot = Volatile.Read(ref this.tail);
            var index = (int)(tailSnapshot % this.Capacity);
            while (true)
            {
                // Volatile read: forces a fresh load on every iteration so the loop
                // observes the writer's store instead of spinning on a cached value.
                var value = Volatile.Read(ref this.trait[index]);
                if (value == null)
                {
                    // If we got here it means a writer isn't done.
                    continue;
                }

                this.trait[index] = null;

                // Volatile write: the slot is cleared before the new tail becomes visible,
                // so a writer that sees the freed capacity cannot have its item overwritten
                // by the clear above.
                Volatile.Write(ref this.tail, tailSnapshot + 1);
                return value;
            }
        }
    }
}

// -------------------------------------------------------------------
// Example Usage
// https://github.com/open-telemetry/opentelemetry-dotnet/blob/master/src/OpenTelemetry/Batch.cs
// -------------------------------------------------------------------

// <copyright file="Batch.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Internal;

namespace OpenTelemetry
{
    /// <summary>
    /// Stores a batch of completed <typeparamref name="T"/> objects to be exported.
    /// </summary>
    /// <remarks>
    /// A batch wraps either a single item or a circular buffer from which at most a fixed
    /// number of items are read during enumeration. A default-initialized batch wraps
    /// neither and enumerates as empty.
    /// </remarks>
    /// <typeparam name="T">The type of object in the <see cref="Batch{T}"/>.</typeparam>
    public readonly struct Batch<T>
        where T : class
    {
        private readonly T item;
        private readonly CircularBuffer<T> circularBuffer;
        private readonly int maxSize;

        /// <summary>
        /// Initializes a batch that contains exactly one item.
        /// </summary>
        /// <param name="item">The single item of the batch.</param>
        /// <exception cref="ArgumentNullException"><paramref name="item"/> is <c>null</c>.</exception>
        internal Batch(T item)
        {
            this.item = item ?? throw new ArgumentNullException(nameof(item));
            this.circularBuffer = null;
            this.maxSize = 1;
        }

        /// <summary>
        /// Initializes a batch whose items are read from a circular buffer.
        /// </summary>
        /// <param name="circularBuffer">The buffer the enumerator reads items from.</param>
        /// <param name="maxSize">The maximum number of items a single enumeration reads; expected to be positive.</param>
        /// <exception cref="ArgumentNullException"><paramref name="circularBuffer"/> is <c>null</c>.</exception>
        internal Batch(CircularBuffer<T> circularBuffer, int maxSize)
        {
            Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number.");

            this.item = null;
            this.circularBuffer = circularBuffer ?? throw new ArgumentNullException(nameof(circularBuffer));
            this.maxSize = maxSize;
        }

        /// <summary>
        /// Returns an enumerator that iterates through the <see cref="Batch{T}"/>.
        /// </summary>
        /// <returns><see cref="Enumerator"/>.</returns>
        public Enumerator GetEnumerator()
        {
            // When both fields are null (default(Batch<T>)), the single-item enumerator
            // receives a null item and reports an empty sequence.
            return this.circularBuffer != null
                ? new Enumerator(this.circularBuffer, this.maxSize)
                : new Enumerator(this.item);
        }

        /// <summary>
        /// Enumerates the elements of a <see cref="Batch{T}"/>.
        /// </summary>
        public struct Enumerator : IEnumerator<T>
        {
            private readonly CircularBuffer<T> circularBuffer;

            // Buffer mode: number of items still to be read.
            // Single-item mode: -1 while the item has not been yielded yet, >= 0 once exhausted.
            private int count;

            internal Enumerator(T item)
            {
                this.Current = item;
                this.circularBuffer = null;

                // A null item can only come from a default-initialized batch; start exhausted
                // so that such a batch yields nothing instead of a single null element.
                this.count = item == null ? 0 : -1;
            }

            internal Enumerator(CircularBuffer<T> circularBuffer, int maxSize)
            {
                this.Current = null;
                this.circularBuffer = circularBuffer;

                // Snapshot how many items to read: whatever is buffered now, capped at maxSize.
                this.count = Math.Min(maxSize, circularBuffer.Count);
            }

            /// <inheritdoc/>
            public T Current { get; private set; }

            /// <inheritdoc/>
            object IEnumerator.Current => this.Current;

            /// <inheritdoc/>
            public void Dispose()
            {
            }

            /// <inheritdoc/>
            public bool MoveNext()
            {
                var circularBuffer = this.circularBuffer;

                if (circularBuffer == null)
                {
                    // Single-item mode.
                    if (this.count >= 0)
                    {
                        this.Current = null;
                        return false;
                    }

                    // Current was already set by the constructor; just mark the item as yielded.
                    this.count++;
                    return true;
                }

                if (this.count > 0)
                {
                    this.Current = circularBuffer.Read();
                    this.count--;
                    return true;
                }

                this.Current = null;
                return false;
            }

            /// <inheritdoc/>
            public void Reset()
                => throw new NotSupportedException();
        }
    }
}

// -------------------------------------------------------------------
// Unit Tests
// https://github.com/open-telemetry/opentelemetry-dotnet/blob/1f04397c26d64de7d10289baf21084486f3fcd23/test/OpenTelemetry.Tests/Internal/CircularBufferTest.cs
// -------------------------------------------------------------------

// <copyright file="CircularBufferTest.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using OpenTelemetry.Internal;
using Xunit;

namespace OpenTelemetry.Tests.Internal
{
    /// <summary>
    /// Single-threaded unit tests covering construction, adding, the full-buffer case,
    /// reading, and the counters of <see cref="CircularBuffer{T}"/>.
    /// </summary>
    public class CircularBufferTest
    {
        /// <summary>A capacity of zero is rejected by the constructor.</summary>
        [Fact]
        public void CheckInvalidArgument()
        {
            Assert.Throws<ArgumentOutOfRangeException>(() => new CircularBuffer<string>(0));
        }

        /// <summary>The capacity passed to the constructor is reported back unchanged.</summary>
        [Fact]
        public void CheckCapacity()
        {
            const int expectedCapacity = 1;

            var buffer = new CircularBuffer<string>(expectedCapacity);

            Assert.Equal(expectedCapacity, buffer.Capacity);
        }

        /// <summary>Adding a null item throws.</summary>
        [Fact]
        public void CheckNullValueWhenAdding()
        {
            var buffer = new CircularBuffer<string>(1);

            Assert.Throws<ArgumentNullException>(() => buffer.Add(null));
        }

        /// <summary>A successful add is reflected in both counters.</summary>
        [Fact]
        public void CheckValueWhenAdding()
        {
            var buffer = new CircularBuffer<string>(1);

            Assert.True(buffer.Add("a"));
            Assert.Equal(1, buffer.AddedCount);
            Assert.Equal(1, buffer.Count);
        }

        /// <summary>Adding to a full buffer fails and leaves the counters untouched.</summary>
        [Fact]
        public void CheckBufferFull()
        {
            var buffer = new CircularBuffer<string>(1);

            var firstAdd = buffer.Add("a");
            Assert.True(firstAdd);
            Assert.Equal(1, buffer.AddedCount);
            Assert.Equal(1, buffer.Count);

            var secondAdd = buffer.Add("b");
            Assert.False(secondAdd);
            Assert.Equal(1, buffer.AddedCount);
            Assert.Equal(1, buffer.Count);
        }

        /// <summary>Reading returns the added item and updates the removed count.</summary>
        [Fact]
        public void CheckRead()
        {
            const string expected = "a";
            var buffer = new CircularBuffer<string>(1);

            Assert.True(buffer.Add(expected));
            Assert.Equal(1, buffer.AddedCount);
            Assert.Equal(1, buffer.Count);

            string actual = buffer.Read();

            Assert.Equal(expected, actual);
            Assert.Equal(1, buffer.AddedCount);
            Assert.Equal(1, buffer.RemovedCount);
            Assert.Equal(0, buffer.Count);
        }

        /// <summary>Counters track two adds followed by one read.</summary>
        [Fact]
        public void CheckAddedCountAndCount()
        {
            var buffer = new CircularBuffer<string>(2);

            Assert.True(buffer.Add("a"));
            Assert.Equal(1, buffer.AddedCount);
            Assert.Equal(1, buffer.Count);

            Assert.True(buffer.Add("a"));
            Assert.Equal(2, buffer.AddedCount);
            Assert.Equal(2, buffer.Count);

            _ = buffer.Read();
            Assert.Equal(2, buffer.AddedCount);
            Assert.Equal(1, buffer.RemovedCount);
            Assert.Equal(1, buffer.Count);
        }
    }
}