1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
#!/usr/bin/env python
"""
Web server that relays video frames received over ZeroMQ to browser clients via websockets.
"""
__author__ = "Franoosh Corporation"
import os
from collections import defaultdict
import json
import logging
import asyncio
from threading import Thread
import uvicorn
from fastapi import (
FastAPI,
Request,
HTTPException,
WebSocket,
templating,
)
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from helpers import CustomLoggingFormatter
import zmq
# Network endpoints. Ports are kept as strings because they are only ever
# interpolated into address strings below.
HOST = "127.0.0.1"
ZMQPORT = "9979"
WSPORT = "8008"
ZMQ_BACKEND_ADDR = "tcp://{}:{}".format(HOST, ZMQPORT)
# Address the module-level ZMQ SUB socket binds to.
WS_BACKEND_ADDR = "tcp://{}:{}".format(HOST, WSPORT)

# Logging destination and verbosity.
LOGFILE = 'webserver.log'
LOGLEVEL = logging.INFO

# JSON file read by load_clients(); resolved against the working directory
# at import time.
CLIENTS_JSON_FILE = os.path.join(os.getcwd(), 'clients.json')
# Send every log record to LOGFILE (appending, UTF-8) through the project's
# custom formatter, by attaching a single file handler to the root logger.
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
handler.setFormatter(log_formatter)
logging.root.addHandler(handler)
logging.root.setLevel(LOGLEVEL)
logger = logging.getLogger(__name__)
# NOTE: a logging.basicConfig(filename=..., datefmt=..., level=...) call used
# to follow here. basicConfig() does nothing once the root logger already has
# a handler, so it never took effect and has been removed. Date formatting is
# therefore whatever CustomLoggingFormatter produces.
# The ASGI application; routes below are registered on it via decorators.
app = FastAPI()
# Serve files from the relative "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 templates are loaded from the relative "templates" directory.
templates = templating.Jinja2Templates(directory='templates')
# Track websocket connections by (client_id, camera_id).
# defaultdict(dict) creates the per-client inner dict on first access.
ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket
# Set up a single ZMQ SUB socket for all websocket connections.
# This runs at import time, so importing the module binds the TCP address.
zmq_context = zmq.Context()
zmq_socket = zmq_context.socket(zmq.SUB)
# The SUB side binds here, so publishers presumably connect to
# WS_BACKEND_ADDR -- TODO confirm against the sending process.
zmq_socket.bind(WS_BACKEND_ADDR)
zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all topics
# Poller used by the websocket handlers to check for pending messages.
poller = zmq.Poller()
poller.register(zmq_socket, zmq.POLLIN)
def load_clients():
    """Return the parsed contents of CLIENTS_JSON_FILE.

    The file is read as UTF-8. An empty dict is returned when the file does
    not exist, and also when it cannot be parsed (for example because it is
    being rewritten while we read it); the parse failure is logged as a
    warning rather than propagated into the request handlers.
    """
    try:
        with open(CLIENTS_JSON_FILE, encoding="utf-8") as clients_file:
            return json.load(clients_file)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as exc:
        logger.warning("Could not parse '%s': %s", CLIENTS_JSON_FILE, exc)
        return {}
@app.get("/", response_class=HTMLResponse)
async def main_route(request: Request):
    """Serve the landing page.

    Renders ``main.html`` with the current contents of the clients file
    (as returned by ``load_clients()``) under the ``clients`` template key.
    """
    # A page visit is routine; it was previously logged at ERROR level.
    logger.debug("Main route visited.")
    return templates.TemplateResponse(
        "main.html",
        {
            "request": request,
            "clients": load_clients(),
        },
    )
@app.get("/clients/{client_id}", response_class=HTMLResponse)
async def client_route(request: Request, client_id: str):
    """Serve the page for a single client.

    Renders ``client.html`` with the client id and the value stored for it
    in the clients file (passed to the template as ``camera_ids``).

    Raises:
        HTTPException: 404 when ``client_id`` is not in the clients file.
    """
    clients_dict = load_clients()
    logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, clients_dict)
    if client_id not in clients_dict:
        # Must be raised, not returned: FastAPI only turns a *raised*
        # HTTPException into an error response.
        raise HTTPException(status_code=404, detail="No such client ID.")
    return templates.TemplateResponse(
        "client.html",
        {
            "request": request,
            "client_id": client_id,
            "camera_ids": clients_dict[client_id],
        },
    )
# Pause between polls while no ZMQ message is pending, so idle handlers hand
# control back to the event loop instead of blocking it.
_IDLE_POLL_SECONDS = 0.01


async def _forward_to_peer(peer, peer_client_id, peer_camera_id, content):
    """Send a frame to another handler's websocket; unregister it on failure."""
    try:
        await peer.send_bytes(content)
    except Exception as exc:
        logger.warning(
            "Dropping websocket for '/ws/%s/%s': %r",
            peer_client_id, peer_camera_id, exc,
        )
        # Removing the entry makes the owning handler's loop terminate.
        if ws_connections.get(peer_client_id, {}).get(peer_camera_id) is peer:
            del ws_connections[peer_client_id][peer_camera_id]


@app.websocket("/ws/{client_id}/{camera_id}")
async def camera_route(websocket: WebSocket, client_id: str, camera_id: str):
    """Stream frames for one client/camera pair over a websocket.

    The websocket is registered in ``ws_connections`` and the handler then
    pumps the shared ZMQ SUB socket. Messages are expected to have three
    parts: client id, camera id (both UTF-8) and the frame payload.

    All handlers read from the same socket, and each message is delivered
    only once. A received frame is therefore sent to whichever registered
    websocket it is addressed to, not only to this handler's own one.
    Messages with a different shape, or with no registered viewer, are
    dropped.

    The socket is polled with a zero timeout and the handler awaits on every
    iteration, so it never blocks or starves the event loop. The cost is up
    to ``_IDLE_POLL_SECONDS`` of extra latency on the first frame after an
    idle period.
    """
    logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id)
    await websocket.accept()
    # ws_connections is a defaultdict(dict): the inner dict appears on demand.
    ws_connections[client_id][camera_id] = websocket
    try:
        # Run only while this websocket is still the registered one; a peer
        # handler unregisters it when forwarding a frame to it fails.
        while ws_connections.get(client_id, {}).get(camera_id) is websocket:
            if zmq_socket not in dict(poller.poll(0)):
                await asyncio.sleep(_IDLE_POLL_SECONDS)
                continue
            # No await between poll and recv, so the message is still there.
            msg = zmq_socket.recv_multipart()
            target = None
            if len(msg) == 3:
                recv_client_id, recv_camera_id, content = msg
                recv_client_id = recv_client_id.decode("utf-8")
                recv_camera_id = recv_camera_id.decode("utf-8")
                # .get() avoids creating defaultdict entries for unknown ids.
                target = ws_connections.get(recv_client_id, {}).get(recv_camera_id)
            if target is None:
                # Nothing to send, but still yield so a busy stream for
                # unwatched cameras cannot monopolise the event loop.
                await asyncio.sleep(0)
            elif target is websocket:
                await websocket.send_bytes(content)
            else:
                await _forward_to_peer(target, recv_client_id, recv_camera_id, content)
    except Exception as exc:
        logger.warning("Connection closed: %r", exc)
    finally:
        # Only remove our own registration, never a newer connection's.
        if ws_connections.get(client_id, {}).get(camera_id) is websocket:
            del ws_connections[client_id][camera_id]
        try:
            await websocket.close()
        except RuntimeError as exc:
            # Raised when the connection has already been closed.
            logger.debug(
                "Websocket for '/ws/%s/%s' was already closed: %r",
                client_id, camera_id, exc,
            )
if __name__ == "__main__":
    # Run the app with uvicorn on the loopback interface when executed as a script.
    uvicorn.run(app, host='127.0.0.1', port=8007, log_level='info')
|