Ограничение количества запросов при использовании потоков
Нужно спарсить сайт в 3 потока, но строго не более 10 запросов за 60 секунд. Сделал очередь, в которую записывается отметка времени после каждого запроса, но вместо 10 запросов он делает 12. Не понимаю в чём проблема, прошу помощи. Может есть более изящный способ ограничения запросов в потоках?
import threading
import time
import queue
# Очередь для хранения времени запросов
timestamps_queue = queue.Queue(maxsize=10)
def main():
while True:
if timestamps_queue.qsize() == 10:
timestamp = timestamps_queue.get() # Получаем самую первую отметку времени
# Если прошло менее 60 секунд с момента первого запроса, то ждём необходимое время
if time.time() - timestamp <= 60:
time.sleep(60 - (time.time() - timestamp)) # Перерыв на оставшееся количество секунд до 60 между запросами
send()
def send():
time.sleep(1)
print("Сделал запрос")
timestamps_queue.put(time.time()) # Добавляем отметку времени в очередь после успешного запроса
th1 = threading.Thread(target=main).start()
th2 = threading.Thread(target=main).start()
th3 = threading.Thread(target=main).start()
Ответы (1 шт):
Учебный пример
send_now
- та функция, которую нельзя вызывать часто. Она печатает переданное сообщение и время, когда была вызвана.
interval
и messages
задают ограничения. В любой интервал времени interval
send_now
будет вызывана не более messages
раз.
send
- реализует задержки. Очередь send_queue
хранит времена, когда запись из очереди можно удалить. После отправки сообщения туда помещается текущее время плюс интервал. Если очередь хранит слишком много сообщений, send
засыпает до момента, когда очередь можно вычистить, просыпается и чистит очередь. Когда очередь стала достаточно короткой, send
вызывает send_now
и добавляет время в очередь.
Всё это делается под блокировкой. Все обращения к send
выполняются последовательно.
Остальной код для демонстрации. Ограничения - пять сообщений в течении десяти секунд.
import collections
import time
import threading
start_time = time.perf_counter()
def send_now(message):
et = time.perf_counter() - start_time
print('send_now', f'{et:6.2f}', message)
interval = 10
messages = 5
send_lock = threading.Lock()
send_queue = collections.deque()
def send(message):
with send_lock:
while len(send_queue) >= messages:
time.sleep(max(0, time.perf_counter() - send_queue[-messages]))
while send_queue and send_queue[0] <= time.perf_counter():
send_queue.popleft()
send_now(message)
send_queue.append(time.perf_counter() + interval)
def producer():
for c in range(100):
send(f'message {c} from {threading.current_thread().name}')
time.sleep(1)
def main():
threads = [threading.Thread(target=producer) for _ in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
main()
$ python temp.py send_now 0.00 message 0 from Thread-1 (producer) send_now 0.00 message 0 from Thread-2 (producer) send_now 0.00 message 0 from Thread-3 (producer) send_now 1.00 message 1 from Thread-1 (producer) send_now 1.00 message 1 from Thread-3 (producer) send_now 10.00 message 1 from Thread-2 (producer) send_now 10.00 message 2 from Thread-3 (producer) send_now 10.00 message 2 from Thread-1 (producer) send_now 11.00 message 2 from Thread-2 (producer) send_now 11.00 message 3 from Thread-3 (producer) send_now 20.00 message 3 from Thread-1 (producer) send_now 20.00 message 3 from Thread-2 (producer) send_now 20.00 message 4 from Thread-3 (producer) send_now 21.00 message 4 from Thread-1 (producer) ...
Рабочий код
В учебном коде много глобальных переменных. Так не делают, пишут декоратор, который умеет обрабатывать любые функции. Например, функцию send
нельзя вызывать чаще чем пять раз каждые десять секунд. Декоратор throttle
добавляет эту функциональность:
@throttle(interval=10, calls=5)
def send(message):
et = time.perf_counter() - start_time
print('send_now', f'{et:6.2f}', message)
Полный код:
import functools
import collections
import time
import threading
def throttle(interval, calls):
lock = threading.Lock()
queue = collections.deque()
def decorator(f):
@functools.wraps(f)
def wrapper(*args, **kwds):
with lock:
while len(queue) >= calls:
time.sleep(max(0, time.perf_counter() - queue[-calls]))
while queue and queue[0] <= time.perf_counter():
queue.popleft()
try:
r = f(*args, **kwds)
finally:
queue.append(time.perf_counter() + interval)
return r
return wrapper
return decorator
start_time = time.perf_counter()
@throttle(interval=10, calls=5)
def send(message):
et = time.perf_counter() - start_time
print('send_now', f'{et:6.2f}', message)
def producer():
for c in range(100):
send(f'message {c} from {threading.current_thread().name}')
time.sleep(1)
def main():
threads = [threading.Thread(target=producer) for _ in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
main()