Skip to main content

For each element in an enumerable, run a function that returns a Task<TResult> to represent the completion of processing that element. All of these functions may run asynchronously and concurrently. As each task completes, run a second processing action over the results. All of these actions must be run sequentially, but order doesn't matter.

// ===========================================================================
// Implementing a simple ForEachAsync
//
// For each element in an enumerable, run a function that returns a
// Task<TResult> to represent the completion of processing that element. All of
// these functions may run asynchronously and concurrently.
//
// As each task completes, run a second processing action over the results. All
// of these actions must be run sequentially, but order doesn't matter.
//
// We instantiate a semaphore initialized with a count of 1, and we use this to
// throttle the follow-up actions to ensure that only one at a time runs. We
// then call the ProcessAsync method for each element, passing in the element,
// the semaphore, and the delegates to invoke.  Inside ProcessAsync, we first
// run the function and await its completion.  Once it's complete, we acquire
// the semaphore to ensure that we run the result processing function
// sequentially with regard to all the other result-processing actions.
//
// Part 1: http://blogs.msdn.com/b/pfxteam/archive/2012/03/04/10277325.aspx
// Part 2: http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx
// ===========================================================================

//
// Part 1:
/// <summary>
///     Extension methods for running asynchronous work over the elements of a sequence.
/// </summary>
public static class IEnumerableExtensions
{
    /// <summary>
    ///     Starts <paramref name="taskSelector"/> for every element of <paramref name="source"/>
    ///     and, as each returned task completes, passes the element and its result to
    ///     <paramref name="resultProcessor"/>. The result processors are serialized by a
    ///     semaphore so that only one runs at a time; the order in which they run is not fixed.
    /// </summary>
    /// <typeparam name="TSource">Type of the elements in <paramref name="source"/>.</typeparam>
    /// <typeparam name="TResult">Type of the value produced for each element.</typeparam>
    /// <param name="source">The elements to process.</param>
    /// <param name="taskSelector">Function that produces the task for one element.</param>
    /// <param name="resultProcessor">Action invoked with each element and its awaited result.</param>
    /// <returns>A task that completes once every element has been selected and processed.</returns>
    /// <exception cref="ArgumentNullException">
    ///     <paramref name="source"/>, <paramref name="taskSelector"/> or
    ///     <paramref name="resultProcessor"/> is null.
    /// </exception>
    public static Task ForEachAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
    {
        // Validate eagerly: without these checks a null delegate only surfaces later as a
        // NullReferenceException inside a faulted task, once per element, and (for
        // resultProcessor) only after the element's work has already been carried out.
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (taskSelector == null)
        {
            throw new ArgumentNullException("taskSelector");
        }

        if (resultProcessor == null)
        {
            throw new ArgumentNullException("resultProcessor");
        }

        // A count of one turns the semaphore into an async-friendly mutex that is shared
        // by every per-element operation started below.
        var oneAtATime = new SemaphoreSlim(initialCount: 1, maxCount: 1);

        return Task.WhenAll(
                   from item in source
                   select ProcessAsync(item, taskSelector, resultProcessor, oneAtATime));
    }

    /// <summary>
    ///     Awaits the task produced for a single element, then runs the result processor
    ///     while holding the shared semaphore.
    /// </summary>
    private static async Task ProcessAsync<TSource, TResult>(
        TSource item,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor,
        SemaphoreSlim oneAtATime)
    {
        // The selector is awaited before the semaphore is taken, so the per-element
        // work is not serialized; only the follow-up action is.
        TResult result = await taskSelector(item);
        await oneAtATime.WaitAsync();

        try
        {
            resultProcessor(item, result);
        }
        finally
        {
            // Always hand the semaphore back, even when the processor throws, so the
            // remaining elements are not blocked forever.
            oneAtATime.Release();
        }
    }
}

//
// ---------------------------------------------------------------------------
// EXAMPLE USAGE
// ---------------------------------------------------------------------------
//

//
// NOTE: Example implementation is from here:
// http://stackoverflow.com/a/19201419
//

// Example Loaded event handler: starts a download for every URL in the
// sequence through DownloadFileTaskAsync (null local path, time-out argument
// of 1000) and collects each returned tuple in a list.
private async void MainWindow_Loaded(object sender, RoutedEventArgs e)
{
    // Placeholder, not valid C#: substitute the sequence of URLs to download.
    IEnumerable<string> enumerable = your urls here;
    var results = new List<Tuple<string, string, Exception>>();
    // The second delegate only appends the tuple to the list. ForEachAsync
    // runs these result callbacks one at a time behind its semaphore, so the
    // list is never appended to by two callbacks at once.
    await enumerable.ForEachAsync(s => DownloadFileTaskAsync(s, null, 1000), (url, t) => results.Add(t));
}

/// <summary>
///     Downloads a file from a specified Internet address.
/// </summary>
/// <remarks>
///     Failures are reported through the returned tuple rather than thrown: a null
///     <paramref name="remotePath"/> and any exception raised while setting up or awaiting
///     the download are caught and handed back as the third tuple item. When the local
///     path was generated by this method and the operation fails, the generated temporary
///     file is deleted on a best-effort basis, because the caller never learns its name.
/// </remarks>
/// <param name="remotePath">Internet address of the file to download.</param>
/// <param name="localPath">
///     Local file name where to store the content of the download, if null a temporary file name will
///     be generated.
/// </param>
/// <param name="timeOut">Duration in milliseconds before cancelling the operation.</param>
/// <returns>
///     A tuple containing the remote path, the local path and an exception if one occurred.
///     On failure the local path item is null.
/// </returns>
private static async Task<Tuple<string, string, Exception>> DownloadFileTaskAsync(string remotePath, string localPath = null, int timeOut = 3000)
{
    // Set only when this method picks the destination itself, so that the failure
    // path knows there is a temporary file nobody else can clean up.
    string generatedTempPath = null;

    try
    {
        if (remotePath == null)
        {
            Debug.WriteLine("DownloadFileTaskAsync (null remote path): skipping");
            // Thrown inside the try on purpose: the catch below converts it into the
            // failure tuple like every other error.
            throw new ArgumentNullException("remotePath");
        }

        if (localPath == null)
        {
            Debug.WriteLine(
                string.Format(
                    "DownloadFileTaskAsync (null local path): generating a temporary file name for {0}",
                    remotePath));
            generatedTempPath = Path.GetTempFileName();
            localPath = generatedTempPath;
        }

        using (var client = new WebClient())
        {
            // One-shot time-out: if the client is still busy when the timer fires,
            // cancel the pending download.
            TimerCallback timerCallback = c =>
            {
                var webClient = (WebClient) c;
                if (!webClient.IsBusy) { return; }
                webClient.CancelAsync();
                Debug.WriteLine(string.Format("DownloadFileTaskAsync (time out due): {0}", remotePath));
            };

            // Timeout.Infinite as the period means the callback fires at most once.
            using (var timer = new Timer(timerCallback, client, timeOut, Timeout.Infinite))
            {
                await client.DownloadFileTaskAsync(remotePath, localPath);
            }

            Debug.WriteLine(string.Format("DownloadFileTaskAsync (downloaded): {0}", remotePath));

            return new Tuple<string, string, Exception>(remotePath, localPath, null);
        }
    }
    catch (Exception ex)
    {
        if (generatedTempPath != null)
        {
            try
            {
                File.Delete(generatedTempPath);
            }
            catch (IOException)
            {
                // Best-effort cleanup: the original failure is what the caller needs to see.
            }
            catch (UnauthorizedAccessException)
            {
                // Best-effort cleanup: the original failure is what the caller needs to see.
            }
        }

        return new Tuple<string, string, Exception>(remotePath, null, ex);
    }
}