Как реализовать Producer/consumer паттерн для web-scraper'а с корректным завершением
Хочу сделать свой web-scraper многопоточным.
В работе scraper'а используется две коллекции - посещенных ссылок и очередь ссылок для обработки.
По ссылкам хожу через Selenium ChromeDriver, а чтобы распараллелить ChromeDriver - нужно работать через потоки, с Task'ами рабочих подходов не встретил.
По сути имеем producer/consumer, где несколько producer/consumer'ов в одном лице со следующими ограничениями:
- Нельзя плодить экземпляры браузера в безлимитном режиме (память)
- Нужно переиспользовать экземпляры, выделенные потоку повторно (долгая инициализация)
И казалось бы, тут очевидное решение - сделать несколько Thread'ов c общими коллекциями посещенных и не посещенных ссылок и «дело в шляпе», и пусть каждый поток работает до тех пор, пока в очереди что-то есть.
Но может возникнуть следующая неприятная ситуация. К примеру, есть 8 экземпляров браузера, в 8 потоках «гуляющие по сети».
И вот, в какой-то момент, очередь ссылок стала пуста, 1 поток все еще работает над текущей страницей, а остальные 7 поглядели, что обрабатывать им нечего и завершились.
И тут первый поток добавляет в очередь 10000 ссылок (всякое бывает), с которыми он вынужден разгребаться до скончания времен.
В теории возникает 4 ситуации:
| Очередь ссылок | Другие потоки | Ожидаемое поведение текущего потока |
|---|---|---|
| Есть ссылки | Есть работающие | Взять следующую ссылку из очереди |
| Есть ссылки | Есть ожидающие ссылок | Взять следующую ссылку из очереди |
| Пустая | Есть работающие | Перейти к ожиданию результатов других потоков |
| Пустая | Есть ожидающие ссылок | Завершить работу |
И вопрос мой заключается в том, как реализовать описанное в третьем пункте ожидание, чтобы, грубо говоря:
- Если очередь ссылок пуста, но есть потоки «в работе» - текущий поток должен ожидать изменения «статуса» любого из потоков.
- Как только какой-либо из соседей закончил текущую итерацию работы, повторить проверку.
- И только в случае, если все потоки «ждут», они должны завершиться.
В таком случае, потоки завершатся все разом и именно в тот момент, когда гарантированно не будет новых ссылок.
Вопрос - как?
Однопоточный код у меня примерно такого плана:
var unvisitedLinks = new ConcurrentQueue<IUrl>(startUrls);
var visitedLinks = new BlockingCollection<IUrl>();
while (unvisitedLinks.TryDequeue(out var currentUrl))
{
// Проверяем, что не посещали ссылку
if (visitedLinks.Any(link => link == currentUrl)) continue;
Navigate(currentUrl);
// Извлечение информации
var scrapedObjects = ScrapeObjects(currentUrl, otherParams);
foreach (var scrapedObject in scrapedObjects)
{
if (scrapedObject is IUrl url) unvisitedLinks.Enqueue(url);
if (scrapedObject is IScrapingResult item) yield return item;
}
// Добавляем ссылку в посещенные
visitedLinks.Add(currentUrl);
// Делаем паузу
await Task.Delay(delay);
}