Может ли вызов функции библиотеки в многопотоке держать остальные потоки на python?

Есть вот такой код многопоточного получения текстов со страниц:

import html2text
from readability import Document
from html2text import HTML2Text

def get_article(files): #на входе список ссылок
        with open(files) as file:
               links = [row.rstrip() for row in file]
        for link in sitemap: #для каждой ссылки
                response = requests.get(link, timeout = 10) #получаем код странцы
                doc = Document(response.text) #преобразуем полученное в DOM
                h = HTML2Text() #вызываем html2text для преобразования кода статьи в текст
                h.ignore_links = True
                h.ignore_images = True
                title = doc.title() #получаем заголовок статьи
                article = h.handle(doc.summary()) #получаем текст статьи
    
 with ThreadPoolExecutor(max_workers=num_threads) as pool:
    pool.map(lambda sitemap: get_article(sitemap), files)

Вопрос может ли вызов функций для обработки кода страницы, получения заголовка и текста держать остальные потоки или в каждом потоке выполняется независимо? Если да, то как можно это исправить?


Ответы (1 шт):

Автор решения: eri

Ваш код может падать от таймаута. таймаут стоит, а обработки нет.

В старом экзекуторе можно сделать так. Новые треды будут запускаться по мере освобождения старых. Ответы будут сформированы в итератор, который надо пройти.

from multiprocessing.dummy import Pool

with Pool(num_threads) as pool:
    for result in pool.imap_unordered(get_article, files):
        pass

В этом варианте я бы в тредах оставил только requests, а парсинг уже делал под этим фором (всеравно gil)

Или если ответы не важны совсем, как у вас в коде нет return.

pool = Pool(num_threads)
ar = pool.map_async(get_article, files)
pool.join()

Убрал ненужную лямбду.

И ещё одно замечание - файлы обычно читаются быстро, но так как имеют разный размер, то один файл может обрабатываться когда есть свободные воркеры. я бы смешал содержимое файлов через генератор.

def link_gen(files):
    for name in files:
        with open(files) as file:
            for row in file:
                yield row.rstrip() 

def get_article(link):
            response = requests.get(link, timeout = 10) #получаем код странцы
            doc = Document(response.text) #преобразуем полученное в DOM
            h = HTML2Text() #вызываем html2text для преобразования кода статьи в текст
            h.ignore_links = True
            h.ignore_images = True
            title = doc.title() #получаем заголовок статьи
            article = h.handle(doc.summary()) #получаем текст статьи        


pool.map_async(get_article, link_gen(files))

И о том что писал выше - экономим память не особо теряя в производительности:

def  get_article(link):
    try:
        response = requests.get(link, timeout = 10)
        return response 
    except:
        return


with Pool(num_threads) as pool:
    for response in pool.imap_unordered(get_article, link_gen(files)):
        if response:
            doc = Document(response.text) #преобразуем полученное в DOM
            h = HTML2Text() #вызываем html2text для преобразования кода статьи в текст
            h.ignore_links = True
            h.ignore_images = True
            title = doc.title() #получаем заголовок статьи
            article = h.handle(doc.summary()) #получаем текст статьи 

Чтоб загрузить все ядра нужны не треды, а процессы. Стоит ли их нагружать зависит от разницы во времени скачивания и обработки. если скачивается быстрее - то убери слово .dummy и модуль начнет работать на процессах. Конечно обработку верни обратно в функцию.

Стоит добавить chunksize. Значение выбирай примерно 5% от количества ссылок деленное на количество процессов.

→ Ссылка