Ограничение количества запросов при использовании потоков

Нужно спарсить сайт в 3 потока, но строго не более 10 запросов за 60 секунд. Сделал очередь, в которую записывается отметка времени после каждого запроса, но вместо 10 запросов он делает 12. Не понимаю в чём проблема, прошу помощи. Может есть более изящный способ ограничения запросов в потоках?

import threading
import time
import queue

# Очередь для хранения времени запросов
timestamps_queue = queue.Queue(maxsize=10)

def main():

    while True:

        if timestamps_queue.qsize() == 10:
            timestamp = timestamps_queue.get()  # Получаем самую первую отметку времени

            # Если прошло менее 60 секунд с момента первого запроса, то ждём необходимое время
            if time.time() - timestamp <= 60:
                time.sleep(60 - (time.time() - timestamp))  # Перерыв на оставшееся количество секунд до 60 между запросами

        send()

def send():
    time.sleep(1)
    print("Сделал запрос")
    timestamps_queue.put(time.time())  # Добавляем отметку времени в очередь после успешного запроса

th1 = threading.Thread(target=main).start()
th2 = threading.Thread(target=main).start()
th3 = threading.Thread(target=main).start()

Ответы (1 шт):

Автор решения: Stanislav Volodarskiy

Учебный пример

send_now - та функция, которую нельзя вызывать часто. Она печатает переданное сообщение и время, когда была вызвана.

interval и messages задают ограничения. В любой интервал времени interval send_now будет вызывана не более messages раз.

send - реализует задержки. Очередь send_queue хранит времена, когда запись из очереди можно удалить. После отправки сообщения туда помещается текущее время плюс интервал. Если очередь хранит слишком много сообщений, send засыпает до момента, когда очередь можно вычистить, просыпается и чистит очередь. Когда очередь стала достаточно короткой, send вызывает send_now и добавляет время в очередь.

Всё это делается под блокировкой. Все обращения к send выполняются последовательно.

Остальной код для демонстрации. Ограничения - пять сообщений в течении десяти секунд.

import collections
import time
import threading


start_time = time.perf_counter()


def send_now(message):
    et = time.perf_counter() - start_time
    print('send_now', f'{et:6.2f}', message)


interval = 10
messages = 5


send_lock = threading.Lock()
send_queue = collections.deque()


def send(message):
    with send_lock:
        while len(send_queue) >= messages:
            time.sleep(max(0, time.perf_counter() - send_queue[-messages]))
            while send_queue and send_queue[0] <= time.perf_counter():
                send_queue.popleft()

        send_now(message)
        send_queue.append(time.perf_counter() + interval)


def producer():
    for c in range(100):
        send(f'message {c} from {threading.current_thread().name}')
        time.sleep(1)


def main():
    threads = [threading.Thread(target=producer) for _ in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


main()
$ python temp.py
send_now   0.00 message 0 from Thread-1 (producer)
send_now   0.00 message 0 from Thread-2 (producer)
send_now   0.00 message 0 from Thread-3 (producer)
send_now   1.00 message 1 from Thread-1 (producer)
send_now   1.00 message 1 from Thread-3 (producer)
send_now  10.00 message 1 from Thread-2 (producer)
send_now  10.00 message 2 from Thread-3 (producer)
send_now  10.00 message 2 from Thread-1 (producer)
send_now  11.00 message 2 from Thread-2 (producer)
send_now  11.00 message 3 from Thread-3 (producer)
send_now  20.00 message 3 from Thread-1 (producer)
send_now  20.00 message 3 from Thread-2 (producer)
send_now  20.00 message 4 from Thread-3 (producer)
send_now  21.00 message 4 from Thread-1 (producer)
...

Рабочий код

В учебном коде много глобальных переменных. Так не делают, пишут декоратор, который умеет обрабатывать любые функции. Например, функцию send нельзя вызывать чаще чем пять раз каждые десять секунд. Декоратор throttle добавляет эту функциональность:

@throttle(interval=10, calls=5)
def send(message):
    et = time.perf_counter() - start_time
    print('send_now', f'{et:6.2f}', message)

Полный код:

import functools
import collections
import time
import threading


def throttle(interval, calls):

    lock = threading.Lock()
    queue = collections.deque()

    def decorator(f):

        @functools.wraps(f)
        def wrapper(*args, **kwds):
            with lock:
                while len(queue) >= calls:
                    time.sleep(max(0, time.perf_counter() - queue[-calls]))
                    while queue and queue[0] <= time.perf_counter():
                        queue.popleft()

                try:
                    r = f(*args, **kwds)
                finally:
                    queue.append(time.perf_counter() + interval)
                return r

        return wrapper

    return decorator


start_time = time.perf_counter()


@throttle(interval=10, calls=5)
def send(message):
    et = time.perf_counter() - start_time
    print('send_now', f'{et:6.2f}', message)


def producer():
    for c in range(100):
        send(f'message {c} from {threading.current_thread().name}')
        time.sleep(1)


def main():
    threads = [threading.Thread(target=producer) for _ in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


main()
→ Ссылка