Многопоточное асинхронное скачивание файлов с проверкой существования файла
Есть программа, которая по тегам находит посты. Затем по ним осуществляет скачивание изображений с полученных filepath внутри списка объектов Posts.
Проблема со скачиванием файлов в функциях download_media и _download_media. В том числе с проверкой существования файлов в папке, чтобы не качать файл повторно.
Программа скачивает их по одному, по очереди, не смотря на то, что должна разделить обработку массива for post in Posts и скачать файлы одновременно, ну или хотя бы в пределах количества ядер процессора. Когда постов мало, 90 изображений качаются минуту, не очень долго.
...
File already exists D:\ghd\img\artist imbi\40843394cbc7dde31b82dbbb6d817c651013475112a351dec7a07cb5bbe0e19d.jpg
File already exists D:\ghd\img\artist imbi\d3f1044c5879e688ef4d6d5b2356dae2058e3209b7b0261f017671c50a9ba8a6.jpg
it took: 50.14s
Если постов много, тогда очень долго. Не знаю как исправить эту ситуацию, улучшить программу в производительности в этой части.
Нужно решить 2 вопроса: Как добиться параллельного скачивания файлов в многопоточном режиме и Проверка на существование файлов происходит медленно в if os.path.exists(filepath)
В терминал выводит по 2 записи в секунду. Перебор папки с большим количеством файлов занимает ну слишком долгое время, чтобы проверить, что есть, а что отсутствует и нужно скачать.
def main():
start = time.perf_counter()
print("Запущено: ", start)
save_dir = 'D:/ghd/img/'
positive_tags = ['artist:imbi']
extra_tags = ['sarah_(the_last_of_us)']
negative_tags = ['dragon_ball']
# go to dir
string_tag = ''.join(positive_tags)
folder_tag = re.sub(r'[;,:\s]', ' ', string_tag)
if not os.path.exists(save_dir + folder_tag):
os.makedirs(save_dir + folder_tag)
os.chdir(save_dir + folder_tag)
print("Текущая директория изменилась на ", os.getcwd())
# Gets all posts with the tags
Posts = api.get_posts_with_tags(positive_tags, negative_tags, extra_tags)
print("Мы получили посты")
print("Начата загрузка тега ", positive_tags)
threads = []
for post in Posts:
t = Thread(target=api.download_media, args=(post, Path.cwd(),))
threads.append(t)
t.start()
for thread in threads:
thread.join()
end = time.perf_counter()
print(f"it took: {end - start:.2f}s")
if __name__ == '__main__':
main()
Соответственно в модуле api.py есть вызываемые для этого функции
def download_media(post: Post, filepath: Path) -> List[str]:
"""Download all media on a post and save it.
Args:
post: The post to download.
filepath: The file directory to save the media. The directory will be created if it doesn't
already exist.
Returns:
The names of the images downloaded.
"""
images_downloaded = []
filepath.mkdir(parents=True, exist_ok=True)
#print("Мы внутри downloadmedia")
for media_meta_data in post.imageurls:
filename = f'{media_meta_data.dataid}.{media_meta_data.type}'
image_filepath = filepath.joinpath(filename)
_download_media(media_meta_data.imageurl, image_filepath)
images_downloaded.append(filename)
#print("Мы закончили downloadmedia")
return images_downloaded
def _download_media(image_url: str, filepath: Path):
"""Download an image and save it.
Args:
image_url: The image URL.
filepath: The file directory to save the media. The directory will be created if it doesn't
already exist.
"""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Referer': 'https://nozomi.la/',
'Upgrade-Insecure-Requests': '1'
}
if os.path.exists(filepath):
print('File already exists', filepath)
else:
print('File not exists %s', filepath)
with requests.get(image_url, stream=True, headers=headers) as r:
with open(filepath, 'wb') as f:
shutil.copyfileobj(r.raw, f)
_LOGGER.debug('Image downloaded %s', filepath)
print('File downloaded ', filepath)
Полный код программы GitHub
Ответы (1 шт):
По рекомендации полностью переписать код с нуля и исправить функцию, получающую список url постов, т.к. на выходе получается генератор, а не список.
Главная функция:
def runner():
save_dir = 'D:/ghd/img/'
positive_tags = ['artist:imbi']
extra_tags = ['sarah_(the_last_of_us)']#['artist:Xentho','sherry']#, 'lesdias','artist:IncredibleChris']['artist:imbi']
negative_tags = ['skateboard', 'petite']
url_list = get_urls_list(positive_tags)#, negative_tags)#, extra_tags)
url_list = list(url_list)
# go to dir
if not len(url_list) == 0:
string_tag = ''.join(positive_tags)
folder_tag = re.sub(r'[;,:\s]', ' ', string_tag)
if not os.path.exists(save_dir + folder_tag):
os.makedirs(save_dir + folder_tag)
os.chdir(save_dir + folder_tag)
print("Текущая директория изменилась на ", os.getcwd())
threads= []
with ThreadPoolExecutor(max_workers=20) as executor:
for post_url in url_list:
threads.append(executor.submit(download_file, post_url, Path.cwd()))
for task in as_completed(threads):
pass
if __name__ == '__main__':
runner()
Функция получения списка url:
url_list = []
def get_urls_list(positive_tags: list[str], negative_tags: list[str] = None, extra_tags: list[str] = None) -> list[str]:
if negative_tags is None:
negative_tags = list()
if extra_tags is None:
extra_tags = list()
try:
positive_post_urls = _get_post_urls(positive_tags)
negative_post_urls = _get_post_urls(negative_tags)
extra_post_urls = _get_post_urls(extra_tags)
relevant_post_urls = set(positive_post_urls + list(set(extra_post_urls) - set(positive_post_urls))) #- set(negative_post_urls)
#relevant_post_urls = [x for x in pos_extra_post_urls if x not in negative_post_urls]
return relevant_post_urls
except InvalidTagFormat as tf:
raise tf
except Exception as ex:
raise ex
Загрузка файлов объединена в 1 функцию:
def download_file(url: str, filepath: Path):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Referer': 'https://nozomi.la/',
'Upgrade-Insecure-Requests': '1'
}
filepath.mkdir(parents=True, exist_ok=True)
try:
post_data = requests.get(url).json()
current_post = from_dict(data_class=Post, data=post_data)
for media_meta_data in current_post.imageurls:
filename = f'{media_meta_data.dataid}.{media_meta_data.type}'
image_filepath = filepath.joinpath(filename)
if os.path.exists(image_filepath):
print('File already exists', image_filepath)
else:
print('File not exists', image_filepath)
with requests.get(media_meta_data.imageurl, stream=True, headers=headers) as r:
with open(image_filepath, 'wb') as f:
shutil.copyfileobj(r.raw, f)
print('File downloaded ', image_filepath)
except requests.exceptions.RequestException as e:
return e
except Exception as ex:
return ex
Скорость работы увеличилась примерно в 2-4 раза. Остальные небольшие изменения отражены в полном коде программы Git