Не срабатывает on_message по mqtt python

Мне задали задание по mqtt. Задача прослушать несколько каналов, получить значения и провести с ними манипуляцию. Возник вопрос, во время получения информации от клиента, функция on_message не срабатывает. Есть функция, которая срабатывает после отправки, но в ней передается только Client, я ее не исполдьзую. Подскажите что не так. Код который передает данные:

# -*- coding: utf-8 -*-
"""
Файл служит для определения точности вашего алгоритма

Для получения оценки точности, запустите файл на исполнение
"""
import json
from threading import Thread
import time

import paho.mqtt.client as paho

import eval as submission


# import solution as submission


def message_handler(client, userdata, msg):
    topic = msg.topic
    msg_data = msg.payload.decode()
    # print(f"Received {topic}: {msg_data}")

    if len(user_test_results) <= current_test_id:
        user_test_results.append([])

    user_test_results[current_test_id].append([topic, msg_data])


user_test_results = []
current_test_id = 0


def main():
    global current_test_id
    annot_file = "annotations.json"

    with open(annot_file, 'r') as f:
        data = json.load(f)

    test_cases = data['test_cases']
    test_results = data['test_results']

    args = submission.setup()
    Thread(
        target=submission.main_loop,
        args=(args,),
        daemon=True
    ).start()

    client = paho.Client()
    client.on_message = message_handler

    if client.connect("localhost", 1883, 60) != 0:
        print("Couldn't connect to the mqtt broker")
        exit(1)

    client.subscribe("odd")
    client.subscribe("test_multiply")
    client.subscribe("test_addend")

    correct = 0
    for current_test_id, test_case in enumerate(test_cases):
        for topic, msg in test_case:
            client.publish(topic, msg, 0)
            client.loop_write()
            client.loop_read()

        for _ in range(10):
            client.loop_read()
            time.sleep(0.1)

        if test_results[current_test_id] == user_test_results[current_test_id]:
            correct += 1

    total_object = len(test_cases)
    print(f"Из {total_object} тестов верны {correct}")

    score = correct / total_object
    print(f"Точность: {score:.2f}")


if __name__ == '__main__':
    main()

Файл, который нужно мне написать:

# -*- coding: utf-8 -*-
import paho.mqtt.client as paho

sum = 0
addend_numbers = 0
def setup():
    """
        Функция для инициализации клиента MQTT, подписки на каналы,
            создание callback-функций.

        Выходные данные: что угодно, всё что вы захотите передать в функцию
            main_loop после инициализации MQTT-клиента.
    """
    client = paho.Client()
    client.on_message = on_message
    client.connect("127.0.0.1", 1883, 60)
    client.subscribe([("*2", 0), ("*3", 0), ("addend", 0), ("command", 0), ("numbers", 0)])

def main_loop(paho_client):
    """
        Функция с циклом для обработки входящих сообщений и отправки исходящих сообщений.
    """
    # TODO: Отредактируйте эту функцию по своему усмотрению.
    # Код проверки один раз вызовет функцию setup.
    # Затем, для каждого теста будет вызывать функцию main_loop.
    # Все пользовательские функции должны вызываться из вышеперечисленных.
    pass

def on_message(client, userdata, msg):
    print("Message received-> " + msg.topic + " " + str(msg.payload))
    topic = msg.topic
    msg_data = msg.payload.decode("utf-8")

    if topic.endswith("2"):
        number, channel_name = msg_data.split(";")
        result = int(number) * 2
        client.publish(channel_name, str(result))

    elif topic.endswith("3"):
        number, channel_name = msg_data.split(";")
        result = int(number) * 3
        client.publish(channel_name, str(result))

    elif topic == "addend":
        addend_numbers.append(int(msg_data))

    elif topic == "command":
        if addend_numbers:
            total_sum = sum(addend_numbers)
            client.publish(msg_data, str(total_sum))
            addend_numbers.clear()  # Очистка после отправки

    elif topic == "numbers":
        number = int(msg_data)
        if number % 2 != 0:  # Проверка на нечётность
            client.publish("odd", str(number))

Ответы (1 шт):

Автор решения: Dmitry

вы же не вызываете ни одну из loop функций. Их там как минимум 3 под разные задачи. Например для клиента можно использовать

# ваш код

    client.on_message = on_message
    client.connect("127.0.0.1", 1883, 60)
    client.loop_start()

# код дальше

loop_start() создает отдельный поток в котором будет слушать сообщения. Он перезапускается автоматически. Использовать в паре с loop_stop().

Есть метод, который запускает прослушивание, но при этом блокирует исполнение в этом потоке, loop_forever().

Можно использовать метод loop() для клиента с указанием времени блокировки.

Думаю это то, что вы ищете

UPD

Вот переделанный ваш код, протестированный

import paho.mqtt.client as paho

sum = 0
addend_numbers = 0
def setup():
    """
        Функция для инициализации клиента MQTT, подписки на каналы,
            создание callback-функций.

        Выходные данные: что угодно, всё что вы захотите передать в функцию
            main_loop после инициализации MQTT-клиента.
    """
    client = paho.Client()
    client.on_message = on_message
    client.connect("localhost", 1883, 60)
    client.subscribe([("*2", 0), ("*3", 0), ("addend", 0), ("command", 0), ("numbers", 0), ("#", 0)])
    return client

def main_loop(paho_client):
    """
        Функция с циклом для обработки входящих сообщений и отправки исходящих сообщений.
    """
    # TODO: Отредактируйте эту функцию по своему усмотрению.
    # Код проверки один раз вызовет функцию setup.
    # Затем, для каждого теста будет вызывать функцию main_loop.
    # Все пользовательские функции должны вызываться из вышеперечисленных.
    pass

def on_message(client, userdata, msg):
    print("Message received-> " + msg.topic + " " + str(msg.payload))
    topic = msg.topic
    msg_data = msg.payload.decode("utf-8")

    if topic.endswith("2"):
        number, channel_name = msg_data.split(";")
        result = int(number) * 2
        client.publish(channel_name, str(result))

    elif topic.endswith("3"):
        number, channel_name = msg_data.split(";")
        result = int(number) * 3
        client.publish(channel_name, str(result))

    elif topic == "addend":
        addend_numbers.append(int(msg_data))

    elif topic == "command":
        if addend_numbers:
            total_sum = sum(addend_numbers)
            client.publish(msg_data, str(total_sum))
            addend_numbers.clear()  # Очистка после отправки

    elif topic == "numbers":
        number = int(msg_data)
        if number % 2 != 0:  # Проверка на нечётность
            client.publish("odd", str(number))

# здесь я добавил то чего вам не хватало и то, что я пытался вам донести,
# что должен быть хоть один метод loop
try:
    client = setup() 
    client.loop_forever() # вот он
except Exception as e:
    print(f"Ошибка: {e}")

В функции setup() я добавил топик для прослушивания всех топиков - #. Из функции setup() я возвращаю клиента, даю ему метод loop_forever(). Больше я ничего не менял и не добавлял, остальной код оригинальный ТСа

И добавлю скрином результат работы. Я не проверял ваши обработки в on_message() я просто показываю, что данный метод вызывается и работает как и должен. То, на что я вам и указывал

введите сюда описание изображения

PS: раздел "гадание"

меняю эту строку

addend_numbers = []

и получаю

введите сюда описание изображения

→ Ссылка