This recipe shows how to scale the workload among a number of independent workers that both produce work and process it.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

//
// Creating a Scalable Crawler with ConcurrentBag
//
// This recipe shows how to scale the workload among a number of independent
// workers that both produce work and process it.
//
// How it works...
//
// The program simulates web-page indexing with multiple web crawlers. A web
// crawler is a program that opens a web page by its address, indexes the
// content, and tries to visit all the links that this page contains and index
// these linked pages as well. At the beginning, we define a dictionary
// containing different web-page URLs. This dictionary simulates web pages
// containing links to other pages. The implementation is very naive; it does
// not care about indexing already visited pages, but it is simple and allows us
// to focus on the concurrent workload.
//
// Then we create a concurrent bag containing crawling tasks. We create four
// crawlers and provide a different site root URL to each of them. Then we wait
// for all the crawlers to complete. Now, each crawler starts to index the site
// URL it was given. We simulate the network I/O process by waiting for a random
// amount of time; then, if the page contains more URLs, the crawler posts more
// crawling tasks to the bag. It then checks whether there are any tasks left
// to crawl in the bag; if there are none, the crawler completes.
//
// If we check the output, we will see that in the first few lines after the
// four root URLs, a task posted by crawler number N is usually processed by
// that same crawler. However, the later lines will be different. This happens
// because internally ConcurrentBag is optimized for exactly this scenario,
// where multiple threads both add items and remove them. This is achieved by
// letting each thread work with its own local queue of items, so no locks are
// needed while that local queue still has work in it. Only when there are no
// items left in the local queue does the thread take a lock and try to "steal"
// work from another thread's local queue. This behavior helps to distribute
// the work among all the workers and keeps locking to a minimum (a minimal
// sketch that isolates this local-queue behavior follows the listing).
//

namespace Chapter6.Recipe4
{
    class Program
    {
        static void Main(string[] args)
        {
            CreateLinks();
            Task t = RunProgram();
            t.Wait();
        }

        // Simulated web content: each key is a page URL and the value is the list of URLs that page links to.
        static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>();

        static async Task RunProgram()
        {
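            // A single shared bag holds the crawling tasks; every crawler both adds tasks to it and takes tasks from it.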
            var bag = new ConcurrentBag<CrawlingTask>();

            string[] urls = new[] {"http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/"};

            var crawlers = new Task[4];
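            // Seed the bag with one root crawling task per crawler and start the four crawler workers.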
            for (int i = 1; i <= 4; i++)
            {
                string crawlerName = "Crawler " + i.ToString();
                bag.Add(new CrawlingTask { UrlToCrawl = urls[i-1], ProducerName = "root"});
                crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName));
            }

            await Task.WhenAll(crawlers);
        }

        static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName)
        {
            CrawlingTask task;
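                // Keep taking tasks until the bag is empty at the moment of the check. Note that a
                // crawler exits as soon as TryTake fails, even if other crawlers might still post
                // more work to the bag afterwards.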
            while (bag.TryTake(out task))
            {
                IEnumerable<string> urls = await GetLinksFromContent(task);
                if (urls != null)
                {
                    foreach (var url in urls)
                    {
                        var t = new CrawlingTask
                        {
                            UrlToCrawl = url,
                            ProducerName = crawlerName
                        };

                        bag.Add(t);
                    }
                }
                Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!",
                    task.UrlToCrawl, task.ProducerName, crawlerName);
            }
        }

        static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task)
        {
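            // Simulate the time it takes to download the page before extracting its links.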
            await GetRandomDelay();

            // Pages that are not in the simulated content have no outgoing links.
            string[] links;
            return _contentEmulation.TryGetValue(task.UrlToCrawl, out links) ? links : null;
        }

        static void CreateLinks()
        {
            _contentEmulation["http://microsoft.com/"]       = new[] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" };
            _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" };
            _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html"                                };

            _contentEmulation["http://google.com/"]          = new[] { "http://google.com/a.html",    "http://google.com/b.html"    };
            _contentEmulation["http://google.com/a.html"]    = new[] { "http://google.com/c.html",    "http://google.com/d.html"    };
            _contentEmulation["http://google.com/b.html"]    = new[] { "http://google.com/e.html",    "http://google.com/f.html"    };
            _contentEmulation["http://google.com/c.html"]    = new[] { "http://google.com/h.html",    "http://google.com/i.html"    };

            _contentEmulation["http://facebook.com/"]        = new[] { "http://facebook.com/a.html",  "http://facebook.com/b.html"  };
            _contentEmulation["http://facebook.com/a.html"]  = new[] { "http://facebook.com/c.html",  "http://facebook.com/d.html"  };
            _contentEmulation["http://facebook.com/b.html"]  = new[] { "http://facebook.com/e.html"                                 };

            _contentEmulation["http://twitter.com/"]         = new[] { "http://twitter.com/a.html",    "http://twitter.com/b.html"  };
            _contentEmulation["http://twitter.com/a.html"]   = new[] { "http://twitter.com/c.html",    "http://twitter.com/d.html"  };
            _contentEmulation["http://twitter.com/b.html"]   = new[] { "http://twitter.com/e.html"                                  };
            _contentEmulation["http://twitter.com/c.html"]   = new[] { "http://twitter.com/f.html",    "http://twitter.com/g.html"  };
            _contentEmulation["http://twitter.com/d.html"]   = new[] { "http://twitter.com/h.html"                                  };
            _contentEmulation["http://twitter.com/e.html"]   = new[] { "http://twitter.com/i.html"                                  };
        }

        // Simulates network I/O by delaying for a short random interval. Note that creating a new
        // Random seeded with the current millisecond can yield identical delays for calls made
        // within the same millisecond; this only affects how varied the simulation looks.
        static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(150, 200);
            return Task.Delay(delay);
        }

        class CrawlingTask
        {
            public string UrlToCrawl { get; set; }
            public string ProducerName { get; set; }
        }
    }
}
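
To see the local-queue behavior described above in isolation, here is a minimal standalone sketch. It is not part of the original recipe; the class name LocalQueueDemo and the item count of 1000 are illustrative, and the exact "own" versus "stolen" counts are an implementation detail of ConcurrentBag that can vary between runs and .NET versions. Two workers each add items tagged with their own name to a shared bag, wait until both have finished producing, and then count how many of their own items they take back versus how many they steal from the other worker.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class LocalQueueDemo
{
    static void Main()
    {
        var bag = new ConcurrentBag<string>();
        // The barrier makes sure both workers finish producing before either starts consuming.
        var barrier = new Barrier(2);

        // LongRunning gives each worker a dedicated thread, so blocking on the barrier
        // does not starve the thread pool.
        Task a = Task.Factory.StartNew(() => Work(bag, barrier, "A"), TaskCreationOptions.LongRunning);
        Task b = Task.Factory.StartNew(() => Work(bag, barrier, "B"), TaskCreationOptions.LongRunning);
        Task.WaitAll(a, b);
    }

    static void Work(ConcurrentBag<string> bag, Barrier barrier, string name)
    {
        for (int i = 0; i < 1000; i++)
            bag.Add(name);                 // items are tagged with the producing worker's name

        barrier.SignalAndWait();           // wait until both workers have finished producing

        int own = 0, stolen = 0;
        string item;
        while (bag.TryTake(out item))
        {
            if (item == name) own++;       // taken from this worker's own local queue
            else stolen++;                 // "stolen" from the other worker's local queue
        }
        Console.WriteLine("{0}: own = {1}, stolen = {2}", name, own, stolen);
    }
}

On a typical run each worker reports mostly its own items, which mirrors the crawler output where a task posted by crawler N is usually completed by crawler N.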