Реализация паттерна производитель-потребитель с помощью асинхронных функций

Всем доброго времени суток! Есть Websocket-сервер на Arduino, который шлёт данные с датчиков, есть сервер на FastAPI, который подключается к серверу на Arduino, получает с него данные и эти данные уже с сервера FastAPI отсылаются клиентам FastAPI в виде html-страницы. Задумано всё это в целях убрать нагрузку с платы Arduino. Пытаюсь реализовать паттерн производитель-потребитель, ибо график нужно обновлять с минимальной задержкой. Минимально-рабочая реализация паттерна производитель-потребитель:

import asyncio
import random
#import websockets


buffer = asyncio.Queue(1000)


async def send_response():
    while True:
        if not buffer.empty():
            print(buffer.get_nowait())

        await asyncio.sleep(0.001)



async def get_response():
    #async with websockets.connect("ws://192.168.1.10") as websocket:
    while True:
        #response = await websocket.recv()
        response = random.randint(1, 100)
        if buffer.full():
            buffer.get_nowait()
        buffer.put_nowait(response)
        await asyncio.sleep(0.001)


async def main():
    task_1 = asyncio.create_task(get_response())
    task_2 = asyncio.create_task(send_response())
    await task_1
    await task_2


asyncio.run(main())

Эта реализация, в теории, вполне должна справлять с поставленной задачей - "Очередь должна получать данные от Arduino и отдавать их FastAPI. Данные крутятся - графики мутятся".

Но на деле всё совершенно не так.

Код сервера на FastAPI:

import os
import time
import websockets
import asyncio
import json
import logging
import sys
from datetime import datetime
from typing import Iterator
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from starlette.responses import Response
from websockets.client import connect
from write_to_scv import write_to_scv
import threading


recording_period = 0.001 # период записи данных в .csv
buffer = asyncio.Queue(1000)


logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
application = FastAPI()
templates = Jinja2Templates(directory="templates")


async def connect_to_arduino():
    print("Connection to server...")
    async with websockets.connect("ws://192.168.1.10") as websocket:
        print("Connected")
        while True:
            response = await websocket.recv()
            print(response)
            if buffer.full():
                buffer.get_nowait()
                buffer.put_nowait(response)
            else:
                buffer.get_nowait()


def connect_to_server():
    asyncio.run(connect_to_arduino())


async def start_uvicorn():
    os.system("uvicorn test:application --port 80")


def start_webserver():
    asyncio.run(start_uvicorn())


@application.get("/", response_class=HTMLResponse)
def index(request: Request) -> Response:
    return templates.TemplateResponse("index.html", {"request": request})


def generate_random_data(request: Request) -> Iterator[str]:
    client_ip = request.client.host
    logger.info("Client %s connected", client_ip)
    fixed_time = time.time()
    while True:
        if buffer.empty():
            #print("buffer is empty")
            continue

        value = buffer.get()
        date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        shock_sensor = value[0]
        voice_sensor = value[1:]

        json_data = json.dumps(
            {
                "time": date,
                "shock_sensor": shock_sensor,
                "voice_sensor": voice_sensor,
            }
        )
        yield f"data:{json_data}\n\n"

        current_time = time.time()
        if current_time - fixed_time > recording_period:
            #write_to_scv(date, voice_sensor)
            fixed_time = time.time()


@application.get("/chart-data")
def chart_data(request: Request) -> StreamingResponse:
    response = StreamingResponse(generate_random_data(request), media_type="text/event-stream")
    response.headers["Cache-Control"] = "no-cache"
    response.headers["X-Accel-Buffering"] = "no"
    return response


if __name__=="__main__":
    server = threading.Thread(target=connect_to_server, daemon=True)
    server.start()
    client = threading.Thread(target=start_webserver)
    client.start()

FastAPI подключается к Arduino и на этом всё. Данные даже не выводятся в терминале. Осознание того, что глобальный буфер - не самая лучшая идея по ряду причин, но на данном этапе разработки всё таки хочется детально понять, в чем проблема.

Подскажите, пожалуйста, почему 2 разных потока видят глобальную переменную "по-своему"? Почему данные, полученные с сервера, не выводятся в терминале? Заранее спасибо за выделенное время!

Оставлю ссылочку на предыдущую реализацию, где клиент FastAPI каждый раз создавал новое подключение к Arduino : https://github.com/Whynot46/Arduino_Websocket_Server


Ответы (0 шт):