#!/usr/bin/env python """ Module serving video from zmq to a webserver. """ __author__ = "Franoosh Corporation" import os from collections import defaultdict import json import logging import asyncio from threading import Thread import uvicorn from fastapi import ( FastAPI, Request, HTTPException, WebSocket, templating, ) from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from helpers import CustomLoggingFormatter import zmq CLIENTS_JSON_FILE = os.path.join(os.getcwd(), 'clients.json') LOGFILE = 'webserver.log' LOGLEVEL = logging.INFO HOST = "127.0.0.1" ZMQPORT = "9979" WSPORT = "8008" ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" WS_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" log_formatter = CustomLoggingFormatter() handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') handler.setFormatter(log_formatter) logging.root.addHandler(handler) logging.root.setLevel(LOGLEVEL) logger = logging.getLogger(__name__) logging.basicConfig( filename=LOGFILE, datefmt='%Y-%m-%d %I:%M:%S', level=LOGLEVEL, ) app = FastAPI() app.mount("/static", StaticFiles(directory="static"), name="static") templates = templating.Jinja2Templates(directory='templates') # Track websocket connections by (client_id, camera_id) ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket # Set up a single ZMQ SUB socket for all websocket connections zmq_context = zmq.Context() zmq_socket = zmq_context.socket(zmq.SUB) zmq_socket.bind(WS_BACKEND_ADDR) zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all topics poller = zmq.Poller() poller.register(zmq_socket, zmq.POLLIN) def load_clients(): try: with open(CLIENTS_JSON_FILE) as f: clients_dict = json.load(f) except FileNotFoundError: clients_dict = {} return clients_dict @app.get("/") async def main_route(request: Request): logger.error("DEBUG: main route visited") clients = load_clients() return templates.TemplateResponse( "main.html", { "request": request, "clients": clients, } ) @app.get("/clients/{client_id}", response_class=HTMLResponse) async def client_route(request: Request, client_id: str): """Serve client page.""" clients_dict = load_clients() logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, clients_dict) if not client_id in clients_dict: return HTTPException(status_code=404, detail="No such client ID.") return templates.TemplateResponse( "client.html", { "request": request, "client_id": client_id, "camera_ids": clients_dict[client_id], }, ) @app.websocket("/ws/{client_id}/{camera_id}") async def camera_route(websocket: WebSocket, client_id: str, camera_id: str): """Serve a particular camera page.""" logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id) await websocket.accept() if client_id not in ws_connections: ws_connections[client_id] = {} ws_connections[client_id][camera_id] = websocket try: while True: # Wait for a frame for this client/camera sockets = dict(poller.poll(1000)) if zmq_socket in sockets: msg = zmq_socket.recv_multipart() if len(msg) == 3: recv_client_id, recv_camera_id, content = msg recv_client_id = recv_client_id.decode("utf-8") recv_camera_id = recv_camera_id.decode("utf-8") # Only send to the websocket for this client/camera if recv_client_id == client_id and recv_camera_id == camera_id: await websocket.send_bytes(content) except Exception as exc: logger.warning("Connection closed: %r", exc) finally: if client_id in ws_connections and camera_id in ws_connections[client_id]: del ws_connections[client_id][camera_id] await websocket.close() if __name__ == "__main__": uvicorn.run( app, port=8007, host='127.0.0.1', log_level='info', )