Рассылка в телеграм с многопоточностью и асинхронностью, как контролировать скорость?
Необходимо сделать рассылку Ботом Телеграм по людям Используются два разных метода. У каждого метода лимит 30 сообщений в секунду
Моя реализация:
import time
import asyncio
import aiohttp
import threading
async def send_message(user_id, send_type):
session = aiohttp.ClientSession()
# await session.post запрос
# send_type влияет на то, какой метод использован
return
def handler_send_message(user_id,send_type):
result = asyncio.run(send_message(user_id,send_type))
def main():
speed = 60
send_type = True
for user_id in range(100000):
send_type = False if send_type else True
time.sleep(1/speed)
threading.Thread(target=handler_send_message, args=(user_id,)).start()
# ожидание конца потоков
while threading.active_count() != 1:
time.sleep(1)
main()
Код работает, но его скорость плохо контролируется, ибо запросы могут застаиваться и потом разом обработаться всей кучей, что бьёт по лимитам.
Так же можно вычеркнуть асинхронность, если она мешает / из-за неё всё-таки медленней
Моя задача: Сделать быструю и контролируемую рассылку. Желательно, если для каждого метода можно будет контролировать скорость.
Ответы (1 шт):
Возможный вариант асинхронного решения:
- храним список всех запросов, которые мы запустили
- для каждого запроса запускаем асинхронную параллельную задачу, которая первым шагом ждет когда ей уже можно будет выполнятся
Логика, когда можно задаче разрешать делать запрос спрятана в функции throttle. Ее конкретная реализация зависит от того, как именно реализован лимит. Это может быть количество одновременно выполняемых запросов, тогда достаточно ждать пока это количество не станет меньше чем нужно (именно этот вариант реализован в примере ниже). Но так же возможен вариант, что проверяется количество запущенных запросов за последнюю секунду, тогда логику нужно подправить (для этого у нас есть вся информация, а именно время старта всех запросов и их типы).
Собственно код:
import asyncio
from time import time as gtime
from random import random
start = gtime()
def time():
return gtime() - start
class Request:
def __init__(self, user_id):
self.user_id = user_id
self.time_started = time()
def finish(self):
self.time_finished = time()
def log(msg):
print(f"{time():.3f}: {msg}")
async def send_message(user_id, send_type):
log(f"send: {send_type} {user_id:03d}: starting")
start = time()
# имитация вызова по сети
await asyncio.sleep(0.5 + 0.8 * random())
log(f"send: {send_type} {user_id:03d}: done in {time() - start:.3f}")
async def throttle(running_requests, send_type):
while len(running_requests[send_type]) > 3:
oldest = None
for r in running_requests[send_type]:
if oldest is None or oldest.time_started > r.time_started:
oldest = r
if time() - oldest.time_started > 1:
running_requests[send_type].remove(oldest)
await asyncio.sleep(0.01)
async def send_with_throttling(running_requests, user_id, send_type):
await throttle(running_requests, send_type)
request = Request(user_id)
running_requests[send_type].append(request)
await send_message(user_id, send_type)
request.finish()
async def main():
print("main: started")
send_type = True
running_requests = {True: [], False: []}
tasks = []
for user_id in range(30):
send_type = not send_type
tasks.append(
asyncio.create_task(
send_with_throttling(running_requests, user_id, send_type)
)
)
for task in tasks:
await task
print("main: finished")
asyncio.run(main())