A light fixed size queue. If Enqueue is called and queue's limit has reached the last item will be removed. This data structure is thread safe.
// ----------------------------------------------------------------------------
// FixedSizeQueue.cs
// https://github.com/Azure/azure-functions-core-tools/blob/dev/src/Azure.Functions.Cli/Telemetry/PersistenceChannel/FixedSizeQueue.cs
//
// Copyright (c) .NET Foundation and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// ----------------------------------------------------------------------------
using System.Collections.Generic;
namespace Azure.Functions.Cli.Telemetry.PersistenceChannel
{
/// <summary>
/// A light fixed size queue. If Enqueue is called and queue's limit has
/// reached the last item will be removed. This data structure is thread safe.
/// </summary>
internal class FixedSizeQueue<T>
{
private readonly int _maxSize;
private readonly Queue<T> _queue = new Queue<T>();
private readonly object _lock = new object();
internal FixedSizeQueue(int maxSize)
{
_maxSize = maxSize;
}
internal void Enqueue(T item)
{
lock (_lock)
{
if (_queue.Count == _maxSize)
{
_queue.Dequeue();
}
_queue.Enqueue(item);
}
}
internal bool Contains(T item)
{
lock (_lock)
{
return _queue.Contains(item);
}
}
}
}
//
// Example
// ----------------------------------------------------------------------------
// StorageService.cs
// https://github.com/Azure/azure-functions-core-tools/blob/dev/src/Azure.Functions.Cli/Telemetry/PersistenceChannel/StorageService.cs
//
// Copyright (c) .NET Foundation and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// ----------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Functions.Cli.Common;
using Microsoft.ApplicationInsights.Channel;
namespace Azure.Functions.Cli.Telemetry.PersistenceChannel
{
internal sealed class StorageService : BaseStorageService
{
private const string DefaultStorageFolderName = "TelemetryStorageService";
private readonly FixedSizeQueue<string> _deletedFilesQueue = new FixedSizeQueue<string>(10);
private readonly object _peekLockObj = new object();
private readonly object _storageFolderLock = new object();
private string _storageDirectoryPath;
private string _storageDirectoryPathUsed;
private long _storageCountFiles;
private bool _storageFolderInitialized;
private long _storageSize;
private uint _transmissionsDropped;
/// <summary>
/// Gets the storage's folder name.
/// </summary>
internal override string StorageDirectoryPath => _storageDirectoryPath;
/// <summary>
/// Gets the storage folder. If storage folder couldn't be created, null will be returned.
/// </summary>
private string StorageFolder
{
get
{
if (!_storageFolderInitialized)
{
lock (_storageFolderLock)
{
if (!_storageFolderInitialized)
{
try
{
_storageDirectoryPathUsed = _storageDirectoryPath;
if (!Directory.Exists(_storageDirectoryPathUsed))
{
Directory.CreateDirectory(_storageDirectoryPathUsed);
}
}
catch (Exception e)
{
_storageDirectoryPathUsed = null;
PersistenceChannelDebugLog.WriteException(e, "Failed to create storage folder");
}
_storageFolderInitialized = true;
}
}
}
return _storageDirectoryPathUsed;
}
}
internal override void Init(string storageDirectoryPath)
{
PeekedTransmissions = new SnapshottingDictionary<string, string>();
VerifyOrSetDefaultStorageDirectoryPath(storageDirectoryPath);
CapacityInBytes = 10 * 1024 * 1024; // 10 MB
MaxFiles = 100;
Task.Run((Action) DeleteObsoleteFiles)
.ContinueWith(
task =>
{
PersistenceChannelDebugLog.WriteException(
task.Exception,
"Storage: Unhandled exception in DeleteObsoleteFiles");
},
TaskContinuationOptions.OnlyOnFaulted);
}
private void VerifyOrSetDefaultStorageDirectoryPath(string desireStorageDirectoryPath)
{
if (string.IsNullOrEmpty(desireStorageDirectoryPath))
{
string storageDir = Path.Combine(Utilities.EnsureCoreToolsLocalData(), ".telemetry");
FileSystemHelpers.EnsureDirectory(storageDir);
_storageDirectoryPath = Path.GetFullPath(storageDir);
}
else
{
if (!Path.IsPathRooted(desireStorageDirectoryPath))
{
throw new ArgumentException($"{nameof(desireStorageDirectoryPath)} need to be rooted (full path)");
}
_storageDirectoryPath = desireStorageDirectoryPath;
}
}
/// <summary>
/// Reads an item from the storage. Order is Last-In-First-Out.
/// When the Transmission is no longer needed (it was either sent or failed with a non-retryable error) it should be
/// disposed.
/// </summary>
internal override StorageTransmission Peek()
{
IEnumerable<string> files = GetFiles("*.trn", 50);
lock (_peekLockObj)
{
foreach (string file in files)
{
try
{
// if a file was peeked before, skip it (wait until it is disposed).
if (PeekedTransmissions.ContainsKey(file) == false &&
_deletedFilesQueue.Contains(file) == false)
{
// Load the transmission from disk.
StorageTransmission storageTransmissionItem = LoadTransmissionFromFileAsync(file)
.ConfigureAwait(false).GetAwaiter().GetResult();
// when item is disposed it should be removed from the peeked list.
storageTransmissionItem.Disposing = item => OnPeekedItemDisposed(file);
// add the transmission to the list.
PeekedTransmissions.Add(file, storageTransmissionItem.FullFilePath);
return storageTransmissionItem;
}
}
catch (Exception e)
{
PersistenceChannelDebugLog.WriteException(
e,
"Failed to load an item from the storage. file: {0}",
file);
}
}
}
return null;
}
internal override void Delete(StorageTransmission item)
{
try
{
if (StorageFolder == null)
{
return;
}
// Initial storage size calculation.
CalculateSize();
long fileSize = GetSize(item.FileName);
File.Delete(Path.Combine(StorageFolder, item.FileName));
_deletedFilesQueue.Enqueue(item.FileName);
// calculate size
Interlocked.Add(ref _storageSize, -fileSize);
Interlocked.Decrement(ref _storageCountFiles);
}
catch (IOException e)
{
PersistenceChannelDebugLog.WriteException(e, "Failed to delete a file. file: {0}", item == null ? "null" : item.FullFilePath);
}
}
internal override async Task EnqueueAsync(Transmission transmission)
{
try
{
if (transmission == null || StorageFolder == null)
{
return;
}
// Initial storage size calculation.
CalculateSize();
if ((ulong)_storageSize >= CapacityInBytes || _storageCountFiles >= MaxFiles)
{
// if max storage capacity has reached, drop the transmission (but log every 100 lost transmissions).
if (_transmissionsDropped++ % 100 == 0)
{
PersistenceChannelDebugLog.WriteLine("Total transmissions dropped: " + _transmissionsDropped);
}
return;
}
// Writes content to a temporary file and only then rename to avoid the Peek from reading the file before it is being written.
// Creates the temp file name
string tempFileName = Guid.NewGuid().ToString("N");
// Now that the file got created we can increase the files count
Interlocked.Increment(ref _storageCountFiles);
// Saves transmission to the temp file
await SaveTransmissionToFileAsync(transmission, tempFileName).ConfigureAwait(false);
// Now that the file is written increase storage size.
long temporaryFileSize = GetSize(tempFileName);
Interlocked.Add(ref _storageSize, temporaryFileSize);
// Creates a new file name
string now = DateTime.UtcNow.ToString("yyyyMMddHHmmss");
string newFileName = string.Format(CultureInfo.InvariantCulture, "{0}_{1}.trn", now, tempFileName);
// Renames the file
File.Move(Path.Combine(StorageFolder, tempFileName), Path.Combine(StorageFolder, newFileName));
}
catch (Exception e)
{
PersistenceChannelDebugLog.WriteException(e, "EnqueueAsync");
}
}
private async Task SaveTransmissionToFileAsync(Transmission transmission, string file)
{
try
{
using (Stream stream = File.OpenWrite(Path.Combine(StorageFolder, file)))
{
await StorageTransmission.SaveAsync(transmission, stream).ConfigureAwait(false);
}
}
catch (UnauthorizedAccessException)
{
string message =
string.Format(
"Failed to save transmission to file. UnauthorizedAccessException. File path: {0}, FileName: {1}",
StorageFolder, file);
PersistenceChannelDebugLog.WriteLine(message);
throw;
}
}
private async Task<StorageTransmission> LoadTransmissionFromFileAsync(string file)
{
try
{
using (Stream stream = File.OpenRead(Path.Combine(StorageFolder, file)))
{
StorageTransmission storageTransmissionItem =
await StorageTransmission.CreateFromStreamAsync(stream, file).ConfigureAwait(false);
return storageTransmissionItem;
}
}
catch (Exception e)
{
string message =
string.Format(
"Failed to load transmission from file. File path: {0}, FileName: {1}, Exception: {2}",
"storageFolderName", file, e);
PersistenceChannelDebugLog.WriteLine(message);
throw;
}
}
/// <summary>
/// Get files from <see cref="storageFolder" />.
/// </summary>
/// <param name="fileQuery">Define the logic for sorting the files.</param>
/// <param name="filterByExtension">Defines a file extension. This method will return only files with this extension.</param>
/// <param name="top">
/// Define how many files to return. This can be useful when the directory has a lot of files, in that case
/// GetFilesAsync will have a performance hit.
/// </param>
private IEnumerable<string> GetFiles(string filterByExtension, int top)
{
try
{
if (StorageFolder != null)
{
return Directory.GetFiles(StorageFolder, filterByExtension).Take(top);
}
}
catch (Exception e)
{
PersistenceChannelDebugLog.WriteException(e, "Peek failed while get files from storage.");
}
return Enumerable.Empty<string>();
}
/// <summary>
/// Gets a file's size.
/// </summary>
private long GetSize(string file)
{
using (FileStream stream = File.OpenRead(Path.Combine(StorageFolder, file)))
{
return stream.Length;
}
}
/// <summary>
/// Check the storage limits and return true if they reached.
/// Storage limits are defined by the number of files and the total size on disk.
/// </summary>
private void CalculateSize()
{
string[] storageFiles = Directory.GetFiles(StorageFolder, "*.*");
_storageCountFiles = storageFiles.Count();
long storageSizeInBytes = 0;
foreach (string file in storageFiles)
{
storageSizeInBytes += GetSize(file);
}
_storageSize = storageSizeInBytes;
}
/// <summary>
/// Enqueue is saving a transmission to a <c>tmp</c> file and after a successful write operation it renames it to a
/// <c>trn</c> file.
/// A file without a <c>trn</c> extension is ignored by Storage.Peek(), so if a process is taken down before rename
/// happens
/// it will stay on the disk forever.
/// This thread deletes files with the <c>tmp</c> extension that exists on disk for more than 5 minutes.
/// </summary>
private void DeleteObsoleteFiles()
{
try
{
IEnumerable<string> files = GetFiles("*.tmp", 50);
foreach (string file in files)
{
DateTime creationTime = File.GetCreationTimeUtc(Path.Combine(StorageFolder, file));
// if the file is older then 5 minutes - delete it.
if (DateTime.UtcNow - creationTime >= TimeSpan.FromMinutes(5))
{
File.Delete(Path.Combine(StorageFolder, file));
}
}
}
catch (Exception e)
{
PersistenceChannelDebugLog.WriteException(e, "Failed to delete tmp files.");
}
}
}
}