Объединение задач обращения к API через asyncio и параллельной загрузки файлов

Есть две задачи: первая отправляет запросы к api, чтобы получить посты и затем получает список url изображений\видео из постов. Вторая сохраняет на диск изображения\видео в папку, полученные из запросов первой задачи.

Первая реализована с помощью asyncio и aiohttp. Вторая реализована с помощью aiofiles.

import asyncio
import aiohttp
import multi, api

semaphoreNozomi = asyncio.Semaphore(16)

async def runner():
...
from_r34 = False
full_multilist = multi.get_multi(from_r34)
...
internal_pos, internal_ext, internal_neg = full_multilist[i]
...
url_list = api.get_urls_list(internal_pos, internal_ext)#(positive_tags, extra_tags)
url_list = list(url_list)
url_list.sort()
# go to dir
if not len(url_list) == 0:
    string_tag = ''.join(internal_pos)
    folder_tag = re.sub(r'[<>/;,:\s]', ' ', string_tag)
    if not os.path.exists(save_dir + folder_tag):
        os.makedirs(save_dir + folder_tag)
        os.chdir(save_dir + folder_tag)
        print("Текущая директория изменилась на ", os.getcwd())
        # загрузка файлов                   
        if not os.path.exists(filename):
            print('ids File not exists:', filename)
            async with aiohttp.ClientSession(connector=aiohttp.TCPConnector()) as session:
                async with semaphoreNozomi:
                    tasks= []
                    for post_url in url_list:
                        tasks.append(asyncio.create_task(  
api.async_nozomi_download_file(session, semaphoreNozomi, post_url, internal_neg, relevant_date)))
                    await asyncio.gather(*tasks) # ожидает результаты выполнения всех задач'''

Но, насколько я знаю, асинхронность использует лишь 1 поток. Потому не получается сохранять сразу по 10-25 файлов одновременно без ошибок.

Функция:

async def async_nozomi_download_file(session, semaphoreNozomi, url: str, blacklist: list[str], relevant_post_date = None):
    
    filepath = Path.cwd()
    if relevant_post_date is None:
        relevant_post_date = datetime.strptime("1900-01-01", '%Y-%m-%d')
    filepath.mkdir(parents=True, exist_ok=True)
    try:
        async with semaphoreNozomi:
            async with session.get(url) as response:
                post_data = await response.json()
                current_post = from_dict(data_class=Post, data=post_data)
                post_date = datetime.strptime(current_post.date, '%Y-%m-%d %H:%M:%S-%f')
                if post_date > relevant_post_date:
                    current_post_tag_list = []
                    for i in range(len(current_post.artist)):
                        current_post_tag_list.append(current_post.artist[i].tag)
                    for i in range(len(current_post.character)):
                        current_post_tag_list.append(current_post.character[i].tag)
                    for i in range(len(current_post.copyright)):
                        current_post_tag_list.append(current_post.copyright[i].tag)
                    for i in range(len(current_post.general)):
                        current_post_tag_list.append(current_post.general[i].tag)

                    for tag in current_post_tag_list:
                        if not tag == '':
                            tag_counts[tag] += 1 
                    #print(tag_counts.items())
                    if not len(set(current_post_tag_list).intersection(blacklist)) > 0:
                        nozomi_img_counter = 1
                        img_tasks = []
                        for media_meta_data in current_post.imageurls:
                            filename = f'{current_post.date}-{nozomi_img_counter}-{media_meta_data.dataid}.{media_meta_data.type}'
                            filename = re.sub('[<>/:#%]', '', filename)
                            image_filepath = filepath.joinpath(filename)
                            if os.path.exists(image_filepath):
                                print('File already exists', image_filepath)
                            else:
                                print('File not exists', image_filepath)
                                img_tasks.append(asyncio.create_task(nozomi_file_saver(session, semaphoreNozomi,  
media_meta_data.imageurl, image_filepath)))
                                nozomi_img_counter += 1
                        await asyncio.gather(*img_tasks)
                    else:
                        print('Post in blacklist', current_post.postid)
                    
    except aiohttp.ClientError as e:
        return e
    except Exception as ex:
        return ex

async def nozomi_file_saver(session, url, image_filepath):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Referer': 'https://nozomi.la/',
        'Upgrade-Insecure-Requests': '1'
    }
    async with session.get(url, headers=headers) as r:
        async with aiofiles.open(image_filepath, 'wb') as f:
            while True:
                chunk = await r.content.read(1024)
                if not chunk:
                    break
                await f.write(chunk)
    print('File downloaded', image_filepath)

Я бы хотел оптимизировать это, чтобы файлы могли параллельно записываться на диск, и чтобы использовались все возможности сети. Но никак не удается воткнуть корутину async def async_nozomi_download_file в ProcessPoolExecutor В текущей реализации записывается примерно по 2-4 файла в секунду(исправлено, 8-10 файлов), хотя файлы маленькие 3.6к картинок на 285мб загружаются слишком долго. В планах загрузить около 200к файлов на 90гб и не ждать несколько часов. Не удалось найти в интернете хорошие и рабочие примеры для подобной задачи. Нужна помощь, чтобы это исправить.

Updated: Обнаружена и исправлена проблема с неправильным использованием семафора, также разделил на отдельные подзадачи сохранение файлов. На данный момент загружает по 8-10 файлов в секунду. Наверное можно считать, что проблема устранена. Подойдет ли для этой задачи использование пула процессов?


Ответы (0 шт):