#!/usr/bin/env python
"""
Module serving video from zmq to a webserver.

Frames arrive from a ZMQ backend as multipart messages keyed by a
``client_id:camera_id`` topic and are fanned out to browser websockets.
Control messages received from the browsers are queued and forwarded to
the same ZMQ backend.
"""
__author__ = "Franoosh Corporation"

import asyncio
import json
import logging
import os
from collections import defaultdict, deque
from contextlib import asynccontextmanager

import uvicorn
import zmq
import zmq.asyncio
from fastapi import (
    FastAPI,
    Request,
    HTTPException,
    WebSocket,
    WebSocketDisconnect,
    templating,
)
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles

from helpers import CustomLoggingFormatter

CLIENTS_JSON_FILE = os.path.join(os.getcwd(), 'clients.json')
CLIENTS_DICT = defaultdict(list)  # CLIENTS_DICT[client_id] = [camera_id1, camera_id2, ...]
LOGFILE = 'webserver.log'
LOGLEVEL = logging.DEBUG

HOST = "127.0.0.1"
ZMQ_PORT = "9979"
# NOTE(review): despite its name, WEB_PORT is the port of the ZMQ endpoint this
# module connects to (see WEB_BACKEND_ADDR); uvicorn itself is started on 8007
# at the bottom of the file - confirm this is intended.
WEB_PORT = "8008"
CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}"
WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}"

# Seconds to pause after a socket-level failure so that a persistent error
# cannot turn a bridge loop into a hot spin.
_BRIDGE_ERROR_BACKOFF_S = 0.5

# Logging: a single file handler on the root logger.
# A logging.basicConfig(filename=LOGFILE, ...) call used to follow this setup;
# basicConfig() does nothing once the root logger has a handler, so it never
# had any effect and has been dropped.
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
handler.setFormatter(log_formatter)
logging.root.addHandler(handler)
logging.root.setLevel(LOGLEVEL)
logger = logging.getLogger(__name__)

# Track websocket connections by (client_id, camera_id):
ws_connections = defaultdict(dict)  # ws_connections[client_id][camera_id] = {'ws': websocket}
ws_queues = defaultdict(dict)  # ws_queues[client_id][camera_id] = asyncio.Queue of frames
# Control commands waiting to be forwarded to the backend, as
# (client_id, camera_id, command, args_list) tuples.
# NOTE: this queue is created at import time and awaited while empty; that
# relies on Python >= 3.10, where asyncio.Queue no longer binds to an event
# loop at construction.
ctrl_msg_que = asyncio.Queue()

# Create ZMQ context and socket:
zmq_context = zmq.asyncio.Context()
zmq_socket = zmq_context.socket(zmq.DEALER)
# Connect to ZMQ backend:
zmq_socket.connect(WEB_BACKEND_ADDR)


def _handle_backend_message(parts) -> None:
    """Apply one multipart backend message to the registry and frame queues.

    A two-part message is ``[topic, frame_data]``. A three-part message
    carries no frame and is treated as a notification that the camera named
    by the topic went away (as is a two-part message with an empty frame).
    Anything else is logged and ignored.

    Raises:
        ValueError: if the topic is not of the form ``client_id:camera_id``.
        UnicodeDecodeError: if the topic is not valid UTF-8.
    """
    if len(parts) == 2:
        topic, frame_data = parts
    elif len(parts) == 3:
        topic, frame_data = parts[0], None
    else:
        logger.warning("Received invalid ZMQ message: %r", parts)
        return

    if not topic:
        logger.warning("Received ZMQ message with an empty topic; ignoring it.")
        return

    client_id, camera_id = topic.decode('utf-8').split(':', 1)

    if not frame_data:
        # No frame data means a notification to remove the camera. Use .get()
        # so an unknown client is not created as a side effect of the lookup.
        cameras = CLIENTS_DICT.get(client_id)
        if cameras is not None and camera_id in cameras:
            cameras.remove(camera_id)
        return

    # Add client and camera to CLIENTS_DICT if new:
    if camera_id not in CLIENTS_DICT[client_id]:
        CLIENTS_DICT[client_id].append(camera_id)

    queue = ws_queues.get(client_id, {}).get(camera_id)
    if queue is None:
        # Nobody is watching this camera right now.
        return
    if queue.full():
        queue.get_nowait()  # Discard oldest frame
    # There is no await between making room and inserting, so this cannot fail.
    queue.put_nowait(frame_data)


async def _receive_from_backend() -> None:
    """Receive backend messages forever and dispatch each one."""
    while True:
        try:
            parts = await zmq_socket.recv_multipart()
        except Exception as exc:
            logger.error("Error in ZMQ bridge: %r", exc)
            await asyncio.sleep(_BRIDGE_ERROR_BACKOFF_S)
            continue
        try:
            _handle_backend_message(parts)
        except Exception as exc:
            # A single malformed message must not stop the bridge.
            logger.error("Error in ZMQ bridge: %r", exc)


async def _send_control_to_backend() -> None:
    """Forward queued control commands to the backend as soon as they arrive."""
    while True:
        try:
            client_id, camera_id, command, args_list = await ctrl_msg_que.get()
            await zmq_socket.send_multipart([
                client_id.encode('utf-8'),
                camera_id.encode('utf-8'),
                command.encode('utf-8'),
            ] + args_list)
            logger.info(
                "Sent control command '%s' with args: %r for '/ws/%s/%s' to backend.",
                command, args_list, client_id, camera_id,
            )
        except Exception as exc:
            logger.error("Error in ZMQ bridge: %r", exc)
            await asyncio.sleep(_BRIDGE_ERROR_BACKOFF_S)


async def zmq_bridge():
    """Bridge between ZMQ backend and websocket clients.

    Runs the receive and send directions concurrently, so a control command
    is forwarded immediately instead of waiting for the next incoming frame.
    Runs until cancelled.
    """
    await asyncio.gather(_receive_from_backend(), _send_control_to_backend())


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create lifespan context for FastAPI app.

    Starts the ZMQ bridge on startup and cancels it on shutdown.
    """
    # Keep a reference: the event loop only holds weak references to tasks.
    bridge_task = asyncio.create_task(zmq_bridge())
    try:
        yield
    finally:
        bridge_task.cancel()
        # Wait for the cancellation to finish; return_exceptions keeps the
        # resulting CancelledError from propagating out of shutdown.
        await asyncio.gather(bridge_task, return_exceptions=True)


app = FastAPI(lifespan=lifespan)
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = templating.Jinja2Templates(directory='templates')


@app.get("/")
async def main_route(request: Request):
    """Serve main page."""
    logger.debug("Main route visited")
    return templates.TemplateResponse(
        "main.html",
        {
            "request": request,
            "clients": CLIENTS_DICT,
        }
    )


@app.get("/clients/{client_id}", response_class=HTMLResponse)
async def client_route(request: Request, client_id: str):
    """Serve client page.

    Raises:
        HTTPException: 404 if ``client_id`` is not in ``CLIENTS_DICT``.
    """
    logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, CLIENTS_DICT)
    if client_id not in CLIENTS_DICT:
        raise HTTPException(status_code=404, detail="No such client ID.")
    return templates.TemplateResponse(
        "client.html",
        {
            "request": request,
            "client_id": client_id,
            "camera_ids": CLIENTS_DICT[client_id],
        },
    )


@app.websocket("/ws/{client_id}/{camera_id}")
async def camera_route(websocket: WebSocket, client_id: str, camera_id: str):
    """Serve a particular camera page.

    Streams queued frames for (client_id, camera_id) to the websocket and
    queues control messages received from it for the backend. Returns when
    either direction ends.
    """
    logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id)
    await websocket.accept()
    connection = {'ws': websocket}
    queue = asyncio.Queue(maxsize=10)
    ws_connections[client_id][camera_id] = connection
    ws_queues[client_id][camera_id] = queue

    async def send_frames():
        """Push every queued frame to the browser."""
        while True:
            frame_data = await queue.get()
            await websocket.send_bytes(frame_data)

    async def receive_control():
        """Read JSON control messages and queue them for the backend."""
        while True:
            try:
                data = await websocket.receive_text()
            except WebSocketDisconnect:
                logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id)
                break
            except RuntimeError as exc:
                # Starlette raises RuntimeError when receiving on a socket that
                # is no longer connected; retrying would loop forever.
                logger.warning("Error receiving control message: %r", exc)
                break
            except Exception as exc:
                logger.warning("Error receiving control message: %r", exc)
                continue

            logger.info(
                "Received control message from '/ws/%s/%s': %s",
                client_id, camera_id, data,
            )
            # Handle control messages from the client:
            try:
                frontend_message = json.loads(data)
                for command, args in frontend_message.items():
                    args_list = [str(arg).encode('utf-8') for arg in args]
                    ctrl_msg_que.put_nowait((client_id, camera_id, command, args_list))
                    logger.info(
                        "Put control command '%s' with args: %r for '/ws/%s/%s' on queue to backend.",
                        command, args_list, client_id, camera_id,
                    )
            except json.JSONDecodeError:
                logger.warning(
                    "Received invalid JSON from '/ws/%s/%s': %s",
                    client_id, camera_id, data,
                )
            except Exception as exc:
                # E.g. valid JSON that is not a mapping of command -> args.
                logger.warning("Error receiving control message: %r", exc)

    send_task = asyncio.create_task(send_frames())
    receive_task = asyncio.create_task(receive_control())
    try:
        await asyncio.wait(
            [send_task, receive_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
    finally:
        send_task.cancel()
        receive_task.cancel()
        # Only drop our own registrations: a newer connection for the same
        # camera may have replaced them and must keep its queue.
        if ws_connections[client_id].get(camera_id) is connection:
            ws_connections[client_id].pop(camera_id, None)
        if ws_queues[client_id].get(camera_id) is queue:
            ws_queues[client_id].pop(camera_id, None)
        # Await both tasks so their outcomes are retrieved rather than
        # reported later as "Task exception was never retrieved".
        outcomes = await asyncio.gather(send_task, receive_task, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                logger.info(
                    "WebSocket task for '/ws/%s/%s' ended with: %r",
                    client_id, camera_id, outcome,
                )


if __name__ == "__main__":
    uvicorn.run(
        app,
        port=8007,
        host='127.0.0.1',
        log_level='info',
    )