using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
//
// Creating a Scalable Crawler with ConcurrentBag
//
// This recipe shows how to scale workload between a number of independent
// workers that both produce work and process it.
//
// How it works...
//
// The program simulates web-page indexing with multiple web crawlers. A web
// crawler is a program that opens a web page by its address, indexes the
// content, and tries to visit all the links that this page contains and index
// these linked pages as well. At the beginning, we define a dictionary
// containing different web-page URLs. This dictionary simulates web pages
// containing links to other pages. The implementation is very naive; it does
// not track already visited pages (a sketch of one way to do that is shown
// after the Program class below), but it is simple and lets us focus on the
// concurrent workload.
//
// Then we create a concurrent bag containing crawling tasks. We create four
// crawlers, provide a different site root URL to each of them, and wait for
// all the crawlers to complete. Each crawler starts indexing the site URL it
// was given. We simulate the network I/O process by waiting for a random
// amount of time; then, if the page contains more URLs, the crawler posts more
// crawling tasks to the bag. After that, it checks whether there are any tasks
// left in the bag. If not, the crawler completes.
//
// If we check the output, the first four lines correspond to the root URLs; in
// the lines that immediately follow, a task posted by crawler number N is
// usually processed by the same crawler. However, the later lines will be
// different. This happens because internally ConcurrentBag is optimized for
// exactly this scenario, where multiple threads both add items and remove
// them. Each thread works with its own local queue of items, so no locks are
// needed as long as that local queue has work in it. Only when the local queue
// is empty does the thread take a lock and try to "steal" work from another
// thread's local queue. This behavior helps to distribute the work between all
// workers and minimize locking.
//
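// The following small class is not part of the original recipe; it is a
// minimal, hypothetical sketch (the name BagBasicsSketch is made up) of the two
// ConcurrentBag operations the crawler relies on: Add posts an item, and
// TryTake removes one if any is available. TryTake returns false when the bag
// is momentarily empty, which is exactly the condition the crawlers below use
// to decide that they are done.
static class BagBasicsSketch
{
    internal static void Run()
    {
        var bag = new ConcurrentBag<string>();

        // Producer side: any thread may add items at any time.
        bag.Add("http://example.com/");
        bag.Add("http://example.com/a.html");

        // Consumer side: drain the bag until TryTake reports it is empty.
        string url;
        while (bag.TryTake(out url))
        {
            Console.WriteLine("Took {0}", url);
        }
    }
}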
namespace Chapter6.Recipe4
{
    class Program
    {
        static void Main(string[] args)
        {
            CreateLinks();
            Task t = RunProgram();
            t.Wait();
        }

        // Simulated site content: maps a page URL to the URLs that page links to.
        static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>();

        static async Task RunProgram()
        {
            var bag = new ConcurrentBag<CrawlingTask>();
            string[] urls = new[] { "http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/" };

            // Seed the bag with one root crawling task per site, then start
            // four crawlers and wait until all of them finish.
            var crawlers = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string crawlerName = "Crawler " + i.ToString();
                bag.Add(new CrawlingTask { UrlToCrawl = urls[i - 1], ProducerName = "root" });
                crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName));
            }

            await Task.WhenAll(crawlers);
        }
        static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName)
        {
            CrawlingTask task;
            // Keep taking tasks until the bag is momentarily empty. A crawler
            // finishes as soon as it sees an empty bag, even if another crawler
            // is still awaiting a page and about to post more work; this matches
            // the completion check described in the comment above.
            while (bag.TryTake(out task))
            {
                IEnumerable<string> urls = await GetLinksFromContent(task);
                if (urls != null)
                {
                    // Post a new crawling task for every link found on the page.
                    foreach (var url in urls)
                    {
                        var t = new CrawlingTask
                        {
                            UrlToCrawl = url,
                            ProducerName = crawlerName
                        };
                        bag.Add(t);
                    }
                }

                Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!",
                    task.UrlToCrawl, task.ProducerName, crawlerName);
            }
        }
        static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task)
        {
            // Simulate network I/O, then return the page's links (or null if none).
            await GetRandomDelay();

            string[] links;
            if (_contentEmulation.TryGetValue(task.UrlToCrawl, out links))
                return links;

            return null;
        }
        static void CreateLinks()
        {
            _contentEmulation["http://microsoft.com/"] = new[] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" };
            _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" };
            _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" };
            _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" };
            _contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" };
            _contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" };
            _contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" };
            _contentEmulation["http://facebook.com/"] = new[] { "http://facebook.com/a.html", "http://facebook.com/b.html" };
            _contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" };
            _contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" };
            _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" };
            _contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" };
            _contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" };
            _contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" };
            _contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" };
            _contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" };
        }
        // Shared Random guarded by a lock: Random is not thread-safe, and creating
        // one per call seeded with the current millisecond can repeat values.
        static readonly Random _random = new Random();

        static Task GetRandomDelay()
        {
            int delay;
            lock (_random) { delay = _random.Next(150, 200); }
            return Task.Delay(delay);
        }
        // A unit of work: the URL to index and the name of the crawler
        // (or "root") that posted it.
        class CrawlingTask
        {
            public string UrlToCrawl { get; set; }
            public string ProducerName { get; set; }
        }
    }
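
    // Not part of the original recipe: a minimal sketch of how visited-page
    // tracking could be added, assuming we are willing to keep a shared set of
    // URLs. ConcurrentDictionary.TryAdd returns true only for the first caller
    // that registers a given key, so a crawler could call
    // VisitedUrlRegistry.TryMarkVisited(task.UrlToCrawl) before indexing and
    // skip the task when it returns false. The class and member names are made
    // up for illustration.
    static class VisitedUrlRegistry
    {
        static readonly ConcurrentDictionary<string, bool> _visited =
            new ConcurrentDictionary<string, bool>();

        // Returns true if this URL has not been seen before; false otherwise.
        internal static bool TryMarkVisited(string url)
        {
            return _visited.TryAdd(url, true);
        }
    }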
}