Код работает чуть больше суток и перестает работать и пропадают подключения к базе данных
Есть код на python в котором я запускаю сервер Flask для прослушивания вебхуков, которые я добавляю в очередь используя библиотеку Queue. Сначала я запускаю фоновый поток который собирает данные с сервиса, обрабатываю их и вставляю в базу данных. Так же во время обработки я добавляю данные в очередь обработки аудитов (истории изменений сущности), и запускаю поток который обрабатывает данные из этой очереди. После того когда добавились все данные я запускаю поток для обработки данных которые в очереди вебхуков. Почему то код работал больше суток и просто перестал обрабатывать все приходящие вебхуки. При этом в базе данных пропали подключения к ней(бывало что подключения оставались, но просто ничего не выполняли), так же создается много потоков у это кода хотя при обычной его работе было только 2 потока.
Создание подключения к бд
from mysql.connector import pooling
connection_pool = pooling.MySQLConnectionPool(
pool_name="webhook",
pool_size=5,
pool_reset_session=True,
host='host',
user='user',
password='password',
database='database'
)
Часть кода:
def sql_insert(array_value):
try:
with connection_pool.get_connection() as connection:
cursor = connection.cursor()
for entity_key, entity_item in array_value.items():
# какая то обработка
query = insert_query(entity_key, arr_item)
cursor.executemany(query, array_items)
if audit_flag:
for one_id in id_item:
audit_queue.put((one_id, entity_key))
# Добавляем элементы в очередь
# Обработка
cursor.executemany(query_link, link_item_one_entity)
connection.commit()
except Exception as e:
send_error_to_telegram(e, "webhook")
finally:
pass
# По завершении блока with соединение будет возвращено в пул автоматически
webhook_queue = queue.Queue()
audit_queue = queue.Queue()
def webhook_handler():
while True:
webhook_item = webhook_queue.get()
# Получить элемент из очереди, блокируется до получения элемента
try:
# Обрабатываем элемент здесь
webhook_body(webhook_item)
except Exception as ex:
send_error_to_telegram(ex, "webhook")
finally:
# Сообщить очереди, что обработка элемента завершена
webhook_queue.task_done()
def audit_handler():
while True:
# Получить элемент из очереди, блокируется до получения элемента
audit = audit_queue.get()
try:
# Обрабатываем элемент здесь
audit_insert(audit)
except Exception as ex:
send_error_to_telegram(ex, "webhook")
finally:
# Сообщить очереди, что обработка элемента завершена
audit_queue.task_done()
if __name__ == '__main__':
thread_add = threading.Thread(target=data_processing)
worker_thread = threading.Thread(target=webhook_handler)
worker_thread.daemon = True
# worker_thread.start()
thread_add.start()
audit_thread = threading.Thread(target=audit_handler)
audit_thread.daemon = True
audit_thread.start()
worker_thread.start()
app.run(host='0.0.0.0', port=80)