Не получается сделать параллельную обработку пользователей
Если кратко, то у меня такая проблема: Пользователь А вводит сложный запрос, начинается обработка, через 5 секунд Пользователь Б вводит другой запрос. Бот заканчивает обработку для Пользователя А и принимается за Пользователя Б. А мне нужно, чтобы для пользователя Б обработка началась сразу и при этом не мешала Пользователю А
async def generate_response(user_id, user_message, bot):
response_message = None
try:
await start_typing(bot, user_id)
model_name = await get_user_model(user_id)
context = await get_context(user_id)
context.insert(0, {"role": "system", "content": system_prompt})
context.append({"role": "user", "content": user_message})
# Получаем текущее количество запросов
current_requests_count = await execute_query_single('bot_data.db',
"SELECT requests_count FROM user_profile WHERE user_id = ?", (user_id,)
)
if current_requests_count is not None:
current_requests_count = current_requests_count[0]
if ENGEENER_LOGS:
logging.info(f"Текущее количество запросов для пользователя {user_id} перед обновлением: {current_requests_count}")
# Увеличиваем счетчик запросов
new_requests_count = current_requests_count + 1
update_successful = await execute_update('bot_data.db',
"UPDATE user_profile SET requests_count = ? WHERE user_id = ?",
(new_requests_count, user_id)
)
if update_successful:
if ENGEENER_LOGS:
logging.info(f"Количество запросов обновлено для пользователя {user_id}: {new_requests_count}")
else:
if ENGEENER_LOGS:
logging.error(f"Не удалось обновить количество запросов для пользователя {user_id}.")
else:
if ENGEENER_LOGS:
logging.error(f"Пользователь с ID {user_id} отсутствует в базе данных.")
response_message = await try_api_providers(model_name, context)
short_response = response_message
if response_message:
response_message = response_message.strip()
if ENGEENER_LOGS:
logging.info(f"Ответ модели (перед обработкой): {response_message}")
try:
html_content = markdown2.markdown(response_message, extras=["fenced-code-blocks", "tables", "header-ids", "code-friendly"])
cleaned_html = bleach.clean(html_content, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES, strip=True)
soup = BeautifulSoup(cleaned_html, "html.parser")
if soup.find(lambda tag: tag.name is None):
cleaned_html = "Ошибка в ответе модели. Сгенерирован неверный HTML."
if len(cleaned_html) <= TELEGRAPH_THRESHOLD:
await send_message_with_retry(bot, user_id, short_response, parse_mode=ParseMode.MARKDOWN)
if ENGEENER_LOGS:
logging.info(f"Отправлен короткий ответ пользователю {user_id}: {response_message}")
else:
async with aiohttp.ClientSession() as session:
response_id = await add_response_to_server(session, "Ответ от бота", cleaned_html)
if response_id:
response_url = f"{URL}:{WEB_PORT}/response/{response_id}"
full_url = f"{URL}:{WEB_PORT}/response/{response_id}"
mobile_url = f"{URL}:{WEB_PORT}/response/{response_id}?mobile=true"
# Создание разметки для кнопок
response_keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Мобильная версия?", url=mobile_url),
InlineKeyboardButton(text="Полная версия?️", url=full_url)]
])
# Отправка сообщения с кнопками
await bot.send_message(
chat_id=user_id,
text="Ответ доступен по кнопке снизу",
parse_mode=ParseMode.HTML,
reply_markup=response_keyboard
)
if ENGEENER_LOGS:
logging.info(f"Отправлен длинный ответ пользователю {user_id} на сервер. Ссылка: {response_url}")
else:
await send_message_with_retry(bot, user_id, "Ошибка при отправке ответа на сервер. Попробуйте позже.", parse_mode=ParseMode.HTML)
if ENGEENER_LOGS:
logging.error(f"Ошибка при отправке ответа на сервер для пользователя {user_id}.")
except Exception as e:
logging.exception(f"Ошибка обработки ответа модели: {e}")
await send_message_with_retry(bot, user_id, f"Ошибка обработки ответа модели: {e}", parse_mode=ParseMode.HTML)
else:
await send_message_with_retry(bot, user_id, "Ошибка: Не удалось получить ответ от модели. Попробуйте позже.", parse_mode=ParseMode.HTML)
if ENGEENER_LOGS:
logging.error(f"Ошибка получения ответа от модели для пользователя {user_id}.")
except TelegramAPIError as e:
await stop_typing(bot, user_id)
logging.exception(f"Ошибка Telegram API для пользователя {user_id}: {e}")
await send_message_with_retry(bot, user_id, f"Ошибка Telegram API: {e}", parse_mode=ParseMode.HTML)
except Exception as e:
await stop_typing(bot, user_id)
logging.exception(f"Непредвиденная ошибка для пользователя {user_id}: {e}")
await send_message_with_retry(bot, user_id, "Произошла неизвестная ошибка. Попробуйте позже.", parse_mode=ParseMode.HTML)
finally:
await stop_typing(bot, user_id)
await save_message(user_id, "user", user_message)
await save_message(user_id, "assistant", response_message or "Ошибка получения ответа")
return None
async def process_queue(user_id: int, bot: Bot):
"""Асинхронно обрабатываем очередь сообщений для пользователя."""
queue = user_queues[user_id]
while True:
message = await queue.get() # Асинхронно получаем сообщение из очереди
try:
await process_message(message, user_id, bot) # Асинхронно обрабатываем сообщение
except Exception as e:
logging.exception(f"Ошибка при обработке сообщения из очереди для пользователя {user_id}: {e}")
await bot.send_message(user_id, "Произошла ошибка при обработке вашего запроса.")
finally:
queue.task_done() # Подтверждаем завершение обработки задачи
async def process_message(message: types.Message, user_id: int, bot: Bot):
"""Асинхронно обрабатываем сообщение."""
try:
await execute_update('bot_data.db', "UPDATE user_profile SET last_seen = ? WHERE user_id = ?", (int(time.time()), user_id))
if message.text:
response_message = await generate_response(user_id, message.text, bot)
if response_message:
await bot.send_message(user_id, response_message)
except Exception as e:
logging.exception(f"Ошибка при обработке сообщения для пользователя {user_id}: {e}")
await bot.send_message(user_id, "Произошла ошибка при обработке вашего запроса.")
# В main:
@dp.message()
async def handle_all_messages(message: types.Message):
"""Обработчик всех входящих сообщений."""
user_id = message.from_user.id
if await is_banned(user_id):
return await message.reply("Вы забанены!")
is_subscribed = await check_channel_subscription(user_id, pyrogram_client)
if not is_subscribed:
await message.answer(f"Для использования бота, подпишитесь на канал {CHANNEL}")
return
if user_id not in user_queues:
user_queues[user_id] = asyncio.Queue()
if user_id not in user_tasks:
task = asyncio.create_task(process_queue(user_id, bot))
user_tasks[user_id] = task
logging.info(f"Запущена задача process_queue для пользователя {user_id}")
await user_queues[user_id].put(message)
Но пользователи у меня обрабаотываются последовательно:
2025-02-19 17:32:00,767 - INFO - main.py:2865 - Запущена задача process_queue для пользователя 7244036268
2025-02-19 17:32:00,770 - INFO - dispatcher.py:172 - Update id=185448895 is handled. Duration 331 ms by bot id=7774981871
2025-02-19 17:32:04,666 - INFO - _client.py:1025 - HTTP Request: POST https://helixmind.online/v1/chat/completions "HTTP/1.1 200 OK"
2025-02-19 17:32:20,561 - INFO - main.py:2865 - Запущена задача process_queue для пользователя 7627710555
2025-02-19 17:32:20,562 - INFO - dispatcher.py:172 - Update id=185448896 is handled. Duration 153 ms by bot id=7774981871
2025-02-19 17:32:21,448 - INFO - _client.py:1025 - HTTP Request: POST https://helixmind.online/v1/chat/completions "HTTP/1.1 200 OK"```