Прослушать websocket сервера, находящегося в другой подсети
Есть некий сервер, расположенный в другой подсети по следующему адресу ws://192.168.48.33:9090/ws/uplink/app
,
который посылает данные телеметрии по websocket. Как реализовать прослушку сервера на FastAPI?
Написал следующий код, но прослушка только на локальной машине ws://127.0.0.1:8000/ws
.
main.py
import ...
@app.websocket("/ws")
async def websocket(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
print(data)
Ответы (1 шт):
Автор решения: Almaz
→ Ссылка
import uvicorn
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket
import asyncio
import websockets
def create_websocket_client_task(app_state):
"""
Establish a WebSocket client connection and handle incoming messages in a loop.
"""
async def websocket_client():
uri = "ws://localhost:5000/ws" # Replace with your WebSocket server URI
try:
async with websockets.connect(uri) as websocket:
print(f"Connected to WebSocket server at {uri}")
while app_state["running"]:
try:
message = await websocket.recv() # Receive a message
print(f"Received message: {message}")
# Process the message as needed
except websockets.ConnectionClosed:
print("WebSocket connection closed")
break
except Exception as e:
print(f"Error in WebSocket connection: {e}")
return websocket_client
@asynccontextmanager
async def lifespan(app: FastAPI):
app_state = {"running": True}
# Start WebSocket client in a background asyncio task
websocket_task = asyncio.create_task(create_websocket_client_task(app_state)())
yield # Yield control to FastAPI for its lifespan
# On shutdown, clean up
app_state["running"] = False
await websocket_task # Wait for the WebSocket task to complete
print("WebSocket client shutdown")
# Create the FastAPI app with lifespan
app = FastAPI(lifespan=lifespan)