Как реализовать Producer/consumer паттерн для web-scraper'а с корректным завершением

Хочу сделать свой web-scraper многопоточным.

В работе scraper'а используется две коллекции - посещенных ссылок и очередь ссылок для обработки.
По ссылкам хожу через Selenium ChromeDriver, а чтобы распараллелить ChromeDriver - нужно работать через потоки, с Task'ами рабочих подходов не встретил.

По сути имеем producer/consumer, где несколько producer/consumer'ов в одном лице со следующими ограничениями:

  • Нельзя плодить экземпляры браузера в безлимитном режиме (память)
  • Нужно переиспользовать экземпляры, выделенные потоку повторно (долгая инициализация)

И казалось бы, тут очевидное решение - сделать несколько Thread'ов c общими коллекциями посещенных и не посещенных ссылок и «дело в шляпе»‎, и пусть каждый поток работает до тех пор, пока в очереди что-то есть.

Но может возникнуть следующая неприятная ситуация. К примеру, есть 8 экземпляров браузера, в 8 потоках «гуляющие по сети».
И вот, в какой-то момент, очередь ссылок стала пуста, 1 поток все еще работает над текущей страницей, а остальные 7 поглядели, что обрабатывать им нечего и завершились.
И тут первый поток добавляет в очередь 10000 ссылок (всякое бывает), с которыми он вынужден разгребаться до скончания времен.

В теории возникает 4 ситуации:

Очередь ссылок Другие потоки Ожидаемое поведение текущего потока
Есть ссылки Есть работающие Взять следующую ссылку из очереди
Есть ссылки Есть ожидающие ссылок Взять следующую ссылку из очереди
Пустая Есть работающие Перейти к ожиданию результатов других потоков
Пустая Есть ожидающие ссылок Завершить работу

И вопрос мой заключается в том, как реализовать описанное в третьем пункте ожидание, чтобы, грубо говоря:

  • Если очередь ссылок пуста, но есть потоки «в работе» - текущий поток должен ожидать изменения «статуса» любого из потоков.
  • Как только какой-либо из соседей закончил текущую итерацию работы, повторить проверку.
  • И только в случае, если все потоки «ждут», они должны завершиться.

В таком случае, потоки завершатся все разом и именно в тот момент, когда гарантированно не будет новых ссылок.

Вопрос - как?

Однопоточный код у меня примерно такого плана:

var unvisitedLinks = new ConcurrentQueue<IUrl>(startUrls);
var visitedLinks = new BlockingCollection<IUrl>();

while (unvisitedLinks.TryDequeue(out var currentUrl))
{
    // Проверяем, что не посещали ссылку
    if (visitedLinks.Any(link => link == currentUrl)) continue;

    Navigate(currentUrl);

    // Извлечение информации
    var scrapedObjects = ScrapeObjects(currentUrl, otherParams);

    foreach (var scrapedObject in scrapedObjects)
    {
        if (scrapedObject is IUrl url) unvisitedLinks.Enqueue(url);

        if (scrapedObject is IScrapingResult item) yield return item;
    }

    // Добавляем ссылку в посещенные
    visitedLinks.Add(currentUrl);

    // Делаем паузу
    await Task.Delay(delay);
}

Ответы (0 шт):