Не срабатывает on_message по mqtt python
Мне задали задание по mqtt. Задача прослушать несколько каналов, получить значения и провести с ними манипуляцию. Возник вопрос, во время получения информации от клиента, функция on_message не срабатывает. Есть функция, которая срабатывает после отправки, но в ней передается только Client, я ее не исполдьзую. Подскажите что не так. Код который передает данные:
# -*- coding: utf-8 -*-
"""
Файл служит для определения точности вашего алгоритма
Для получения оценки точности, запустите файл на исполнение
"""
import json
from threading import Thread
import time
import paho.mqtt.client as paho
import eval as submission
# import solution as submission
def message_handler(client, userdata, msg):
topic = msg.topic
msg_data = msg.payload.decode()
# print(f"Received {topic}: {msg_data}")
if len(user_test_results) <= current_test_id:
user_test_results.append([])
user_test_results[current_test_id].append([topic, msg_data])
user_test_results = []
current_test_id = 0
def main():
global current_test_id
annot_file = "annotations.json"
with open(annot_file, 'r') as f:
data = json.load(f)
test_cases = data['test_cases']
test_results = data['test_results']
args = submission.setup()
Thread(
target=submission.main_loop,
args=(args,),
daemon=True
).start()
client = paho.Client()
client.on_message = message_handler
if client.connect("localhost", 1883, 60) != 0:
print("Couldn't connect to the mqtt broker")
exit(1)
client.subscribe("odd")
client.subscribe("test_multiply")
client.subscribe("test_addend")
correct = 0
for current_test_id, test_case in enumerate(test_cases):
for topic, msg in test_case:
client.publish(topic, msg, 0)
client.loop_write()
client.loop_read()
for _ in range(10):
client.loop_read()
time.sleep(0.1)
if test_results[current_test_id] == user_test_results[current_test_id]:
correct += 1
total_object = len(test_cases)
print(f"Из {total_object} тестов верны {correct}")
score = correct / total_object
print(f"Точность: {score:.2f}")
if __name__ == '__main__':
main()
Файл, который нужно мне написать:
# -*- coding: utf-8 -*-
import paho.mqtt.client as paho
sum = 0
addend_numbers = 0
def setup():
"""
Функция для инициализации клиента MQTT, подписки на каналы,
создание callback-функций.
Выходные данные: что угодно, всё что вы захотите передать в функцию
main_loop после инициализации MQTT-клиента.
"""
client = paho.Client()
client.on_message = on_message
client.connect("127.0.0.1", 1883, 60)
client.subscribe([("*2", 0), ("*3", 0), ("addend", 0), ("command", 0), ("numbers", 0)])
def main_loop(paho_client):
"""
Функция с циклом для обработки входящих сообщений и отправки исходящих сообщений.
"""
# TODO: Отредактируйте эту функцию по своему усмотрению.
# Код проверки один раз вызовет функцию setup.
# Затем, для каждого теста будет вызывать функцию main_loop.
# Все пользовательские функции должны вызываться из вышеперечисленных.
pass
def on_message(client, userdata, msg):
print("Message received-> " + msg.topic + " " + str(msg.payload))
topic = msg.topic
msg_data = msg.payload.decode("utf-8")
if topic.endswith("2"):
number, channel_name = msg_data.split(";")
result = int(number) * 2
client.publish(channel_name, str(result))
elif topic.endswith("3"):
number, channel_name = msg_data.split(";")
result = int(number) * 3
client.publish(channel_name, str(result))
elif topic == "addend":
addend_numbers.append(int(msg_data))
elif topic == "command":
if addend_numbers:
total_sum = sum(addend_numbers)
client.publish(msg_data, str(total_sum))
addend_numbers.clear() # Очистка после отправки
elif topic == "numbers":
number = int(msg_data)
if number % 2 != 0: # Проверка на нечётность
client.publish("odd", str(number))
Ответы (1 шт):
вы же не вызываете ни одну из loop
функций. Их там как минимум 3 под разные задачи. Например для клиента можно использовать
# ваш код
client.on_message = on_message
client.connect("127.0.0.1", 1883, 60)
client.loop_start()
# код дальше
loop_start()
создает отдельный поток в котором будет слушать сообщения. Он перезапускается автоматически. Использовать в паре с loop_stop()
.
Есть метод, который запускает прослушивание, но при этом блокирует исполнение в этом потоке, loop_forever()
.
Можно использовать метод loop()
для клиента с указанием времени блокировки.
Думаю это то, что вы ищете
UPD
Вот переделанный ваш код, протестированный
import paho.mqtt.client as paho
sum = 0
addend_numbers = 0
def setup():
"""
Функция для инициализации клиента MQTT, подписки на каналы,
создание callback-функций.
Выходные данные: что угодно, всё что вы захотите передать в функцию
main_loop после инициализации MQTT-клиента.
"""
client = paho.Client()
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.subscribe([("*2", 0), ("*3", 0), ("addend", 0), ("command", 0), ("numbers", 0), ("#", 0)])
return client
def main_loop(paho_client):
"""
Функция с циклом для обработки входящих сообщений и отправки исходящих сообщений.
"""
# TODO: Отредактируйте эту функцию по своему усмотрению.
# Код проверки один раз вызовет функцию setup.
# Затем, для каждого теста будет вызывать функцию main_loop.
# Все пользовательские функции должны вызываться из вышеперечисленных.
pass
def on_message(client, userdata, msg):
print("Message received-> " + msg.topic + " " + str(msg.payload))
topic = msg.topic
msg_data = msg.payload.decode("utf-8")
if topic.endswith("2"):
number, channel_name = msg_data.split(";")
result = int(number) * 2
client.publish(channel_name, str(result))
elif topic.endswith("3"):
number, channel_name = msg_data.split(";")
result = int(number) * 3
client.publish(channel_name, str(result))
elif topic == "addend":
addend_numbers.append(int(msg_data))
elif topic == "command":
if addend_numbers:
total_sum = sum(addend_numbers)
client.publish(msg_data, str(total_sum))
addend_numbers.clear() # Очистка после отправки
elif topic == "numbers":
number = int(msg_data)
if number % 2 != 0: # Проверка на нечётность
client.publish("odd", str(number))
# здесь я добавил то чего вам не хватало и то, что я пытался вам донести,
# что должен быть хоть один метод loop
try:
client = setup()
client.loop_forever() # вот он
except Exception as e:
print(f"Ошибка: {e}")
В функции setup()
я добавил топик для прослушивания всех топиков - #
.
Из функции setup()
я возвращаю клиента, даю ему метод loop_forever()
.
Больше я ничего не менял и не добавлял, остальной код оригинальный ТСа
И добавлю скрином результат работы. Я не проверял ваши обработки в on_message()
я просто показываю, что данный метод вызывается и работает как и должен. То, на что я вам и указывал
PS: раздел "гадание"
меняю эту строку
addend_numbers = []
и получаю