Многопоточное асинхронное скачивание файлов с проверкой существования файла

Есть программа, которая по тегам находит посты. Затем по ним осуществляет скачивание изображений с полученных filepath внутри списка объектов Posts.

Проблема со скачиванием файлов в функциях download_media и _download_media. В том числе с проверкой существования файлов в папке, чтобы не качать файл повторно.

Программа скачивает их по одному, по очереди, не смотря на то, что должна разделить обработку массива for post in Posts и скачать файлы одновременно, ну или хотя бы в пределах количества ядер процессора. Когда постов мало, 90 изображений качаются минуту, не очень долго.

...
File already exists D:\ghd\img\artist imbi\40843394cbc7dde31b82dbbb6d817c651013475112a351dec7a07cb5bbe0e19d.jpg
File already exists D:\ghd\img\artist imbi\d3f1044c5879e688ef4d6d5b2356dae2058e3209b7b0261f017671c50a9ba8a6.jpg
it took: 50.14s

Если постов много, тогда очень долго. Не знаю как исправить эту ситуацию, улучшить программу в производительности в этой части. Нужно решить 2 вопроса: Как добиться параллельного скачивания файлов в многопоточном режиме и Проверка на существование файлов происходит медленно в if os.path.exists(filepath) В терминал выводит по 2 записи в секунду. Перебор папки с большим количеством файлов занимает ну слишком долгое время, чтобы проверить, что есть, а что отсутствует и нужно скачать.

def main():
    start = time.perf_counter()
    print("Запущено: ", start)
    
    save_dir = 'D:/ghd/img/'
    positive_tags = ['artist:imbi']
    extra_tags = ['sarah_(the_last_of_us)']
    negative_tags = ['dragon_ball']
    # go to dir
    string_tag = ''.join(positive_tags)
    folder_tag = re.sub(r'[;,:\s]', ' ', string_tag)
    if not os.path.exists(save_dir + folder_tag):
        os.makedirs(save_dir + folder_tag)
    os.chdir(save_dir + folder_tag)
    print("Текущая директория изменилась на ", os.getcwd())
    # Gets all posts with the tags
    Posts = api.get_posts_with_tags(positive_tags, negative_tags, extra_tags)
    print("Мы получили посты")
    print("Начата загрузка тега ", positive_tags)
    threads = []
    for post in Posts:
        t = Thread(target=api.download_media, args=(post, Path.cwd(),))
        threads.append(t)
        t.start()
    for thread in threads:
        thread.join()
    end = time.perf_counter()
    print(f"it took: {end - start:.2f}s")

if __name__ == '__main__':
    main()

Соответственно в модуле api.py есть вызываемые для этого функции

def download_media(post: Post, filepath: Path) -> List[str]:
    """Download all media on a post and save it.

    Args:
        post: The post to download.
        filepath: The file directory to save the media. The directory will be created if it doesn't
            already exist.

    Returns:
        The names of the images downloaded.

    """
    images_downloaded = []
    filepath.mkdir(parents=True, exist_ok=True)
    #print("Мы внутри downloadmedia")
    for media_meta_data in post.imageurls:
        filename = f'{media_meta_data.dataid}.{media_meta_data.type}'
        image_filepath = filepath.joinpath(filename)
        _download_media(media_meta_data.imageurl, image_filepath)
        images_downloaded.append(filename)
    #print("Мы закончили downloadmedia")
    return images_downloaded


def _download_media(image_url: str, filepath: Path):
    """Download an image and save it.

    Args:
        image_url: The image URL.
        filepath: The file directory to save the media. The directory will be created if it doesn't
            already exist.

    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Referer': 'https://nozomi.la/',
        'Upgrade-Insecure-Requests': '1'
    }
    if os.path.exists(filepath):
        print('File already exists', filepath)
    else:
        print('File not exists %s', filepath)
        with requests.get(image_url, stream=True, headers=headers) as r:
            with open(filepath, 'wb') as f:
                shutil.copyfileobj(r.raw, f)
        _LOGGER.debug('Image downloaded %s', filepath)
        print('File downloaded ', filepath)

Полный код программы GitHub


Ответы (1 шт):

Автор решения: Алексей Кочетков

По рекомендации полностью переписать код с нуля и исправить функцию, получающую список url постов, т.к. на выходе получается генератор, а не список.

Главная функция:

def runner():
    save_dir = 'D:/ghd/img/'
    positive_tags = ['artist:imbi']
    extra_tags = ['sarah_(the_last_of_us)']#['artist:Xentho','sherry']#, 'lesdias','artist:IncredibleChris']['artist:imbi']
    negative_tags = ['skateboard', 'petite']
    url_list = get_urls_list(positive_tags)#, negative_tags)#, extra_tags)
    url_list = list(url_list)
    # go to dir
    if not len(url_list) == 0:
        string_tag = ''.join(positive_tags)
        folder_tag = re.sub(r'[;,:\s]', ' ', string_tag)
        if not os.path.exists(save_dir + folder_tag):
            os.makedirs(save_dir + folder_tag)
        os.chdir(save_dir + folder_tag)
        print("Текущая директория изменилась на ", os.getcwd())
        threads= []
        with ThreadPoolExecutor(max_workers=20) as executor:
            for post_url in url_list:
                threads.append(executor.submit(download_file, post_url, Path.cwd()))
                
            for task in as_completed(threads):
                pass

if __name__ == '__main__':
    runner()

Функция получения списка url:

url_list = []

def get_urls_list(positive_tags: list[str], negative_tags: list[str] = None, extra_tags: list[str] = None) -> list[str]:
    if negative_tags is None:
        negative_tags = list()
    if extra_tags is None:
        extra_tags = list()
    try:
        positive_post_urls = _get_post_urls(positive_tags)
        negative_post_urls = _get_post_urls(negative_tags)
        extra_post_urls = _get_post_urls(extra_tags)
        relevant_post_urls = set(positive_post_urls + list(set(extra_post_urls) - set(positive_post_urls))) #- set(negative_post_urls)
        #relevant_post_urls = [x for x in pos_extra_post_urls if x not in negative_post_urls]
        return relevant_post_urls
    except InvalidTagFormat as tf:
        raise tf
    except Exception as ex:
        raise ex

Загрузка файлов объединена в 1 функцию:

def download_file(url: str, filepath: Path):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Referer': 'https://nozomi.la/',
        'Upgrade-Insecure-Requests': '1'
    }
    filepath.mkdir(parents=True, exist_ok=True)
    try:
        post_data = requests.get(url).json()
        current_post = from_dict(data_class=Post, data=post_data)
        for media_meta_data in current_post.imageurls:
            filename = f'{media_meta_data.dataid}.{media_meta_data.type}'
            image_filepath = filepath.joinpath(filename)
            if os.path.exists(image_filepath):
                print('File already exists', image_filepath)
            else:
                print('File not exists', image_filepath)
                with requests.get(media_meta_data.imageurl, stream=True, headers=headers) as r:
                    with open(image_filepath, 'wb') as f:
                        shutil.copyfileobj(r.raw, f)
                print('File downloaded ', image_filepath)
    except requests.exceptions.RequestException as e:
        return e
    except Exception as ex:
        return ex

Скорость работы увеличилась примерно в 2-4 раза. Остальные небольшие изменения отражены в полном коде программы Git

→ Ссылка