Объединение задач обращения к API через asyncio и параллельной загрузки файлов
Есть две задачи: первая отправляет запросы к api, чтобы получить посты и затем получает список url изображений\видео из постов. Вторая сохраняет на диск изображения\видео в папку, полученные из запросов первой задачи.
Первая реализована с помощью asyncio и aiohttp. Вторая реализована с помощью aiofiles.
import asyncio
import aiohttp
import multi, api
semaphoreNozomi = asyncio.Semaphore(16)
async def runner():
...
from_r34 = False
full_multilist = multi.get_multi(from_r34)
...
internal_pos, internal_ext, internal_neg = full_multilist[i]
...
url_list = api.get_urls_list(internal_pos, internal_ext)#(positive_tags, extra_tags)
url_list = list(url_list)
url_list.sort()
# go to dir
if not len(url_list) == 0:
string_tag = ''.join(internal_pos)
folder_tag = re.sub(r'[<>/;,:\s]', ' ', string_tag)
if not os.path.exists(save_dir + folder_tag):
os.makedirs(save_dir + folder_tag)
os.chdir(save_dir + folder_tag)
print("Текущая директория изменилась на ", os.getcwd())
# загрузка файлов
if not os.path.exists(filename):
print('ids File not exists:', filename)
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector()) as session:
async with semaphoreNozomi:
tasks= []
for post_url in url_list:
tasks.append(asyncio.create_task(
api.async_nozomi_download_file(session, semaphoreNozomi, post_url, internal_neg, relevant_date)))
await asyncio.gather(*tasks) # ожидает результаты выполнения всех задач'''
Но, насколько я знаю, асинхронность использует лишь 1 поток. Потому не получается сохранять сразу по 10-25 файлов одновременно без ошибок.
Функция:
async def async_nozomi_download_file(session, semaphoreNozomi, url: str, blacklist: list[str], relevant_post_date = None):
filepath = Path.cwd()
if relevant_post_date is None:
relevant_post_date = datetime.strptime("1900-01-01", '%Y-%m-%d')
filepath.mkdir(parents=True, exist_ok=True)
try:
async with semaphoreNozomi:
async with session.get(url) as response:
post_data = await response.json()
current_post = from_dict(data_class=Post, data=post_data)
post_date = datetime.strptime(current_post.date, '%Y-%m-%d %H:%M:%S-%f')
if post_date > relevant_post_date:
current_post_tag_list = []
for i in range(len(current_post.artist)):
current_post_tag_list.append(current_post.artist[i].tag)
for i in range(len(current_post.character)):
current_post_tag_list.append(current_post.character[i].tag)
for i in range(len(current_post.copyright)):
current_post_tag_list.append(current_post.copyright[i].tag)
for i in range(len(current_post.general)):
current_post_tag_list.append(current_post.general[i].tag)
for tag in current_post_tag_list:
if not tag == '':
tag_counts[tag] += 1
#print(tag_counts.items())
if not len(set(current_post_tag_list).intersection(blacklist)) > 0:
nozomi_img_counter = 1
img_tasks = []
for media_meta_data in current_post.imageurls:
filename = f'{current_post.date}-{nozomi_img_counter}-{media_meta_data.dataid}.{media_meta_data.type}'
filename = re.sub('[<>/:#%]', '', filename)
image_filepath = filepath.joinpath(filename)
if os.path.exists(image_filepath):
print('File already exists', image_filepath)
else:
print('File not exists', image_filepath)
img_tasks.append(asyncio.create_task(nozomi_file_saver(session, semaphoreNozomi,
media_meta_data.imageurl, image_filepath)))
nozomi_img_counter += 1
await asyncio.gather(*img_tasks)
else:
print('Post in blacklist', current_post.postid)
except aiohttp.ClientError as e:
return e
except Exception as ex:
return ex
async def nozomi_file_saver(session, url, image_filepath):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Referer': 'https://nozomi.la/',
'Upgrade-Insecure-Requests': '1'
}
async with session.get(url, headers=headers) as r:
async with aiofiles.open(image_filepath, 'wb') as f:
while True:
chunk = await r.content.read(1024)
if not chunk:
break
await f.write(chunk)
print('File downloaded', image_filepath)
Я бы хотел оптимизировать это, чтобы файлы могли параллельно записываться на диск, и чтобы использовались все возможности сети. Но никак не удается воткнуть корутину async def async_nozomi_download_file в ProcessPoolExecutor
В текущей реализации записывается примерно по 2-4 файла в секунду(исправлено, 8-10 файлов), хотя файлы маленькие 3.6к картинок на 285мб загружаются слишком долго. В планах загрузить около 200к файлов на 90гб и не ждать несколько часов.
Не удалось найти в интернете хорошие и рабочие примеры для подобной задачи.
Нужна помощь, чтобы это исправить.
Updated: Обнаружена и исправлена проблема с неправильным использованием семафора, также разделил на отдельные подзадачи сохранение файлов. На данный момент загружает по 8-10 файлов в секунду. Наверное можно считать, что проблема устранена. Подойдет ли для этой задачи использование пула процессов?