author    Franoosh <uinarf@autistici.org>    2025-07-25 17:13:38 +0200
committer Franoosh <uinarf@autistici.org>    2025-10-15 14:33:59 +0200
commit    68bd1bd052a7cd6438b92cb1059ef5e58b8d022c (patch)
tree      5a7eab3022a7593bd3d9dbdcc99a1590ab0fc3bc
Initial commit. Proof of concept message passing between client <-> router <-> worker with rudimentary caching
-rw-r--r--  client.py                  176
-rw-r--r--  helpers.py                  44
-rw-r--r--  router.py                  207
-rw-r--r--  static/css/main.css         21
-rw-r--r--  templates/client.html       47
-rw-r--r--  templates/client.html.bak   41
-rw-r--r--  templates/main.html         16
-rw-r--r--  templates/main.html.bak     35
-rw-r--r--  webserver.py               142
-rw-r--r--  worker.py                  235
10 files changed, 964 insertions, 0 deletions
diff --git a/client.py b/client.py
new file mode 100644
index 0000000..e9dce4f
--- /dev/null
+++ b/client.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+
+"""
+A module containing a client for streaming video to a zmq router.
+"""
+
+__author__ = "Franoosh Corporation"
+
+from os import path
+from threading import Thread, Event
+from queue import Queue
+import time
+import datetime
+import signal
+import logging
+from configparser import ConfigParser
+import traceback
+import zmq
+
+from helpers import CustomLoggingFormatter
+
+ROUTER_ADDRESS = "tcp://localhost"
+PORT = "5569"
+ADDRESS = f"{ROUTER_ADDRESS}:{PORT}"
+
+CONFIG_FILE = "client.cfg"
+LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log"
+LOGLEVEL = logging.DEBUG
+CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'}
+SLEEP_TIME = 5
+
+
+stop_event = Event()
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+
+def read_config(conf_file):
+ """Read config file and return as dictionary."""
+ cfg = ConfigParser()
+ cfg.read(conf_file)
+
+ return {key: dict(cfg.items(key)) for key in cfg.sections()}
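+
+# A hypothetical client.cfg layout this parser would accept (read_config is
+# not wired up yet in this proof of concept):
+#   [cameras]
+#   front_camera = /dev/video0
+#   back_camera = /dev/video1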
+
+
+class ClientVideo(Thread):
+ """Class for sending video stream"""
+ def __init__(self, _id, device, queue):
+ super().__init__(daemon=True)
+ self.identity = _id
+ self.device = device
+ self.queue = queue
+ self.live = True
+
+ def run(self):
+ """Replace with actual video streaming logic."""
+ ping_no = 0
+ while self.live and not stop_event.is_set():
+ try:
+                # The client queues three frames for start/stop and two for a ping;
+                # the DEALER envelope prepends the client identity, so the router
+                # sees four and three frames respectively:
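+                # Illustrative frames (assumed IDs and timestamp):
+                #   start/stop: [b'front_camera', b'start:20250725-171338', b'']
+                #   ping:       [b'front_camera', b'ping-3']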
+ if ping_no == 0: # Start video stream
+ timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
+ metadata = f"start:{timestamp}"
+ logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata)
+ message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
+ self.queue.put(message)
+ text = f"ping-{ping_no}"
+ message = [self.identity.encode('utf-8'), text.encode('utf-8')]
+ elif ping_no >= 5: # Stop video stream
+ logger.debug("ClientVideo '%s' sending stop signal", self.identity)
+ timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
+ metadata = f"stop:{timestamp}"
+                    logger.debug("ClientVideo '%s' sending metadata: '%s' with empty payload frame", self.identity, metadata)
+ message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
+ self.live = False
+ else: # Send ping
+ text = f"ping-{ping_no}"
+ message = [self.identity.encode('utf-8'), text.encode('utf-8')]
+ logger.debug("ClientVideo '%s' sending message: %r", self.identity, message)
+ self.queue.put(message)
+ ping_no += 1
+ except Exception as exc:
+ logger.error("ClientVideo: socket error: %r", exc)
+ time.sleep(SLEEP_TIME)
+
+ logger.info("ClientVideo '%s' closing socket ...", self.identity)
+
+ def stop(self):
+ logger.info("Client video '%s' exiting ...", self.identity)
+ self.live = False
+
+
+class ClientTask(Thread):
+ """Main Client Task with logic,
+ message handling and setup."""
+
+ def __init__(self, _id, context=None):
+ super().__init__(daemon=True)
+ self.identity = _id
+ self.context = context or zmq.Context.instance()
+ self.socket = self.context.socket(zmq.DEALER)
+ self.socket.identity = self.identity.encode("utf-8")
+ self.poll = zmq.Poller()
+ self.poll.register(self.socket, zmq.POLLIN)
+ self.video_threads = []
+ self.running = True
+ self.connected = False
+
+ def run(self):
+ self.socket.connect(ADDRESS)
+ logger.debug("Client '%s' connected to %s", self.identity, ADDRESS)
+ logger.debug("starting video threads ...")
+ q = Queue()
+ for _id, device in CAMERAS.items():
+ vid_cl = ClientVideo(_id, device, q)
+ vid_cl.start()
+ self.video_threads.append(vid_cl)
+ while self.running:
+ try:
+ sockets = dict(self.poll.poll(1000))
+ if self.socket in sockets:
+ try:
+ message = self.socket.recv_multipart()
+ logger.debug("Client '%s' received message: %r", self.identity, message)
+ except Exception:
+ logger.error("Failed to receive message: %r", traceback.format_exc())
+ if q.qsize() > 0:
+ frame = q.get()
+ if frame is not None:
+ logger.debug("Processing frame: %r", frame)
+ self.socket.send_multipart(frame)
+ except zmq.ZMQError as exc:
+ logger.error("Client '%s': socket error: %r", self.identity, exc)
+
+ logger.info("Closing socket ...")
+ self.socket.close()
+ logger.debug("Terminating context ...")
+ self.context.term()
+
+ def stop(self):
+ """Stop task"""
+ logger.info("ClientTask cleaning up ...")
+ for thread in self.video_threads:
+            logger.info("Cleaning up video thread ...")
+ thread.stop()
+ thread.join()
+ self.video_threads = []
+ logger.info("Client task exiting ...")
+ self.running = False
+
+
+def signal_handler(sig, frame):
+ logger.info("Received signal handler '%r', stopping client ...", sig)
+ client.stop()
+
+
+if __name__ == "__main__":
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ logger.info("Starting up ...")
+ client = ClientTask("client_task")
+
+ client.start()
+
+ client.join()
diff --git a/helpers.py b/helpers.py
new file mode 100644
index 0000000..409a253
--- /dev/null
+++ b/helpers.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+
+"""
+Helper functions for zmq video messaging.
+"""
+
+__author__ = "Franoosh Corporation"
+
+
+import logging
+
+
+class CustomLoggingFormatter(logging.Formatter):
+ """Custom logging formatter"""
+ debug_fmt = 'DEBUG: %(filename)s:%(lineno)d %(asctime)s %(message)s'
+ info_fmt = 'INFO: %(asctime)s %(message)s'
+ warning_fmt = 'WARNING: %(asctime)s %(message)s'
+ error_fmt = 'ERROR: %(asctime)s %(message)s'
+ critical_fmt = 'CRITICAL: %(asctime)s %(message)s'
+
+ def __init__(self):
+ super().__init__(
+            fmt="%(levelno)d: %(asctime)s %(message)s",
+ datefmt=None,
+ )
+
+ def format(self, record):
+ orig_fmt = self._style._fmt
+ if record.levelno == logging.DEBUG:
+ self._style._fmt = CustomLoggingFormatter.debug_fmt
+ elif record.levelno == logging.INFO:
+ self._style._fmt = CustomLoggingFormatter.info_fmt
+ elif record.levelno == logging.WARNING:
+ self._style._fmt = CustomLoggingFormatter.warning_fmt
+ elif record.levelno == logging.ERROR:
+ self._style._fmt = CustomLoggingFormatter.error_fmt
+ elif record.levelno == logging.CRITICAL:
+ self._style._fmt = CustomLoggingFormatter.critical_fmt
+
+ result = logging.Formatter.format(self, record)
+ self._style._fmt = orig_fmt
+
+ return result
+
diff --git a/router.py b/router.py
new file mode 100644
index 0000000..61b2f72
--- /dev/null
+++ b/router.py
@@ -0,0 +1,207 @@
+#!/usr/bin/env python
+
+"""
+A module containing zeromq router for video streaming
+from a client and sending control messages from server.
+"""
+
+__author__ = "Franoosh Corporation"
+
+import os
+import signal
+import logging
+import random
+from collections import defaultdict, deque
+import zmq
+
+from helpers import CustomLoggingFormatter
+
+# TODO: add configparser
+
+ADDR_FRONTEND = 'tcp://localhost'
+ADDR_BACKEND = 'tcp://localhost'
+
+PORT_FRONTEND = "5569"
+PORT_BACKEND = "9979"
+
+INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}"
+INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}"
+
+LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log"
+LOGLEVEL = logging.DEBUG
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+
+
+RUNNING = True
+
+def custom_proxy():
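+    """Forward messages between client DEALERs (frontend) and worker DEALERs
+    (backend), queueing them while the other side is unavailable."""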
+ context = zmq.Context.instance()
+ poller = zmq.Poller()
+
+ # Set up frontend:
+ frontend_socket = context.socket(zmq.ROUTER)
+ frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
+ frontend_socket.bind(INITIAL_FRONTEND_ADDR)
+ poller.register(frontend_socket, zmq.POLLIN)
+
+ # Set up backend:
+ backend_socket = context.socket(zmq.ROUTER)
+ backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
+ backend_socket.bind(INITIAL_BACKEND_ADDR)
+ poller.register(backend_socket, zmq.POLLIN)
+
+    available_workers = []
+ client_worker_dict = {}
+    # There are three pending-message dictionaries:
+    # 1. pending_messages_no_worker[client_id] - messages waiting for any worker to register
+    # 2. pending_messages_client_worker[client_id][worker_id] - client messages waiting for a specific worker
+    # 3. pending_messages_worker_client[worker_id][client_id] - worker messages waiting for a client
+    # To ensure delivery of the entire video, messages are stored from start to stop
+    # until they are delivered to their destination.
+    # If a worker is unavailable for a certain amount of time, its clients should be
+    # reassigned to another worker. For now there is only one worker; implement this later.
+    # Complex case: a video half-delivered to a worker that becomes unavailable needs to
+    # be reassigned to another worker.
+
+ pending_messages_no_worker = defaultdict(deque) # Messages that are pending for a client with no worker assigned
+ pending_messages_client_worker = defaultdict(lambda: defaultdict(deque))
+ pending_messages_worker_client = defaultdict(lambda: defaultdict(deque))
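+    # Illustrative shapes (hypothetical IDs):
+    #   pending_messages_no_worker[b'client_task'] -> deque([content, ...])
+    #   pending_messages_client_worker[b'client_task'][b'worker-task'] -> deque([content, ...])
+    #   pending_messages_worker_client[b'worker-task'][b'client_task'] -> deque([msg, ...])
+    # The per-worker deques hold the newest message on the left; the oldest is
+    # pop()'ed from the right for delivery.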
+
+ while RUNNING:
+ events = dict(poller.poll(1000)) # Poll both frontend and backend sockets:
+ if frontend_socket in events: # If message from client:
+ msg = frontend_socket.recv_multipart() # Receive message from client:
+ logger.debug("Received message: %r", msg)
+ client_id, content = msg[0], msg[1:]
+            if client_id not in client_worker_dict: # If client is not yet in the client-worker dictionary:
+ logger.info("Received ID from client: %r.", client_id)
+ # Check if client is already connected to worker:
+ try:
+ worker_id = client_worker_dict[client_id] # Get worker ID for this client from the client-worker dictionary
+ while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker and send them first:
+ logger.debug("There are pending messages from client %r for worker %r", client_id, worker_id)
+ pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
+ try:
+ logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id)
+ backend_socket.send_multipart([worker_id, client_id, *pending_msg])
+ except Exception as e:
+ logger.error("Failed to send pending message: %r to worker: %r, error: %r", pending_msg, worker_id, e)
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message
+ break # Break to avoid infinite loop
+ if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker:
+ pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery
+ else:
+ try:
+ logger.debug("No more pending messages for client %r, sending new message to worker %r: %r", client_id, worker_id, msg)
+ backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker:
+ except Exception as e:
+ logger.error("Failed to send message to backend: %r, error: %s", msg, e)
+                            pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store the content frames for later delivery
+ except KeyError: # If client is not connected to any worker:
+ logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id)
+                if available_workers: # If there are available workers:
+                    worker_id = random.choice(available_workers) # Assign a random worker to the client
+                    client_worker_dict[client_id] = worker_id # Update client-worker dictionary
+                    if pending_messages_no_worker[client_id]: # Move any messages queued while no worker was available:
+                        pending_messages_client_worker[client_id][worker_id].extendleft(pending_messages_no_worker[client_id])
+                        pending_messages_no_worker[client_id].clear()
+                        logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id)
+ while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker:
+ pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
+ try:
+ logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id)
+ backend_socket.send_multipart([worker_id, client_id, *pending_msg])
+ except Exception as e:
+ logger.error("Failed to send pending message: %r to worker: %r. Error: %s", pending_msg, worker_id, e)
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message
+ break
+ if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker:
+ logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery: %r", client_id, worker_id, content)
+ pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery
+ else:
+                        try:
+                            logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg)
+                            backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker
+                        except Exception as e:
+                            logger.error("Failed to send message %r for client %r to backend, requeueing. Error: %s", msg, client_id, e)
+                            pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store the content frames for later delivery
+
+                else: # If no workers are available, queue the message until one registers:
+                    pending_messages_no_worker[client_id].append(content) # Store message for later delivery
+                    logger.debug("No available workers, storing message for client '%s' with no worker assigned", client_id)
+
+        if backend_socket in events:
+            _msg = backend_socket.recv_multipart()
+            logger.debug("Received from worker: %r", _msg)
+            if len(_msg) == 2: # Worker is sending its identity:
+                worker_id, msg = _msg[0], _msg[1:]
+                logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)
+                logger.info("Received ID from worker: %r.", worker_id)
+                available_workers.append(worker_id) # Add worker to available workers list
+                for client, worker in client_worker_dict.items():
+                    if worker is None: # If client has no worker assigned:
+                        client_worker_dict[client] = worker_id
+                        logger.debug("Assigning worker %r to client %r", worker_id, client)
+                if pending_messages_no_worker:
+                    # Flush messages queued for clients while no worker was available:
+                    for client_id, messages in pending_messages_no_worker.items():
+                        if not messages:
+                            continue
+                        pending_messages_client_worker[client_id][worker_id].extendleft(messages) # Move messages to the queue for this worker
+                        pending_messages_no_worker[client_id].clear() # Clear pending messages for this client
+                        logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id)
+                        while pending_messages_client_worker[client_id][worker_id]:
+                            pending_msg = pending_messages_client_worker[client_id][worker_id].pop() # Oldest message first
+                            try:
+                                logger.debug(
+                                    "Sending pending message %r to worker %r for client %r", pending_msg, worker_id, client_id)
+                                backend_socket.send_multipart([worker_id, client_id, *pending_msg])
+                            except Exception as e:
+                                pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message
+                                logger.error("Failed to send pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e)
+                                break # Retry on the next worker READY or client message
+
+            else: # Worker is sending a message to a client:
+                worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:]
+                logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg)
+                # First deliver any pending messages for this worker/client pair:
+                while pending_messages_worker_client[worker_id][client_id]:
+                    pending_msg = pending_messages_worker_client[worker_id][client_id].pop()
+                    try:
+                        logger.debug("Sending pending message %r from worker %r to client %r", pending_msg, worker_id, client_id)
+                        frontend_socket.send_multipart([client_id, *pending_msg])
+                    except Exception as e:
+                        # Safe to log the content here; these are short control messages.
+                        logger.error("Could not deliver pending message: %r to client: %r. Error: %r", pending_msg, client_id, e)
+                        pending_messages_worker_client[worker_id][client_id].append(pending_msg) # Re-queue the message
+                        break # Break to avoid an infinite loop
+                # Now send the new message, unless older ones are still pending:
+                if not pending_messages_worker_client[worker_id][client_id]:
+                    logger.debug("Sending message %r to client '%s'", msg, client_id)
+                    try:
+                        frontend_socket.send_multipart([client_id, *msg])
+                        logger.debug("Sent message %r to client '%r'", msg, client_id)
+                    except Exception as e:
+                        pending_messages_worker_client[worker_id][client_id].appendleft(msg)
+                        logger.error("Could not deliver message: %r from worker: %r to client: %r. Error: %r", msg, worker_id, client_id, e)
+
+    frontend_socket.close()
+    backend_socket.close()
+    context.term()
+
+def signal_handler(sig, frame):
+ global RUNNING
+ logger.info("Received signal handler %r, stopping ...", signal.Signals(sig).name)
+ RUNNING = False
+
+
+if __name__ == "__main__":
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ logger.info("Starting up ...")
+
+ custom_proxy()
diff --git a/static/css/main.css b/static/css/main.css
new file mode 100644
index 0000000..4bb18b2
--- /dev/null
+++ b/static/css/main.css
@@ -0,0 +1,21 @@
+.streams-container {
+ display: flex;
+ flex-wrap: wrap;
+ gap: 16px;
+ justify-content: center;
+}
+.camera-stream {
+ flex: 0 1 320px;
+ margin: 10px;
+ text-align: center;
+}
+@media (max-width: 600px) {
+    .streams-container {
+        flex-direction: column;
+        align-items: center;
+    }
+    .camera-stream {
+        width: 100%;
+        max-width: 100vw;
+    }
+}
diff --git a/templates/client.html b/templates/client.html
new file mode 100644
index 0000000..e03394a
--- /dev/null
+++ b/templates/client.html
@@ -0,0 +1,47 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Client {{ client_id }} - Camera Streams</title>
+ <style>
+ .camera-stream { display: inline-block; margin: 10px; }
+ video { border: 1px solid #333; }
+ </style>
+</head>
+<body>
+ <h1>Camera Streams for client: {{ client_id }}</h1>
+ <div class="streams-container">
+ {% for camera_id in camera_ids %}
+ <div class="camera-stream">
+ <h2>Camera {{ camera_id }}</h2>
+ <img id="video-{{ camera_id }}" width="640" height="480" />
+ </div>
+ {% endfor %}
+ </div>
+ <script>
+ // For each camera, open a WebSocket and update the corresponding <img>
+ {% for camera_id in camera_ids %}
+ (function() {
+ let ws = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/{{ camera_id }}');
+ let image = document.getElementById('video-{{ camera_id }}');
+ let currentUrl = null;
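+            // Revoke the previous frame's object URL before replacing it so
+            // discarded blobs do not accumulate in memory: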
+ ws.onmessage = function(event) {
+ if (currentUrl) {
+ URL.revokeObjectURL(currentUrl);
+ }
+ currentUrl = URL.createObjectURL(event.data);
+ image.src = currentUrl;
+ };
+ ws.onclose = function(event) {
+ console.log('WebSocket closed for camera {{ camera_id }}:', event);
+ };
+ ws.onerror = function(event) {
+ console.log('WebSocket error for camera {{ camera_id }}:', event);
+ };
+ window.addEventListener('beforeunload', function() {
+ ws.close();
+ });
+ })();
+ {% endfor %}
+ </script>
+</body>
+</html>
diff --git a/templates/client.html.bak b/templates/client.html.bak
new file mode 100644
index 0000000..9a4ff08
--- /dev/null
+++ b/templates/client.html.bak
@@ -0,0 +1,41 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Client {{ client_id }} - Camera Streams</title>
+ <style>
+ .camera-stream { display: inline-block; margin: 10px; }
+ video { border: 1px solid #333; }
+ </style>
+</head>
+<body>
+ <h1>Camera Streams for Client {{ client_id }}</h1>
+ {% for camera_id in camera_ids %}
+ <div class="camera-stream">
+ <h2>Camera {{ camera_id }}</h2>
+ <img id="video-{{ camera_id }}" width="640" height="480" />
+ </div>
+ {% endfor %}
+ <script>
+ {% for camera_id in camera_ids %}
+ (function() {
+ // const ws = new WebSocket("ws://127.0.0.1:8008/ws/client_task/front_camera");
+        const ws = new WebSocket(`ws://${location.host}/ws/{{ client_id }}/{{ camera_id }}`);
+ const img = document.getElementById("video-{{ camera_id }}");
+ ws.binaryType = "arraybuffer";
+ ws.onopen = function(event) {
+ console.log("Connection on websocket open");
+ }
+ ws.onmessage = function(event) {
+ console.log("Received frame data size: ", event.data.byteLength);
+ const blob = new Blob([event.data], {type: "image/jpeg"});
+ // document.getElementById("video").src = URL.createObjectURL(blob);
+ img.src = URL.createObjectURL(blob);
+ };
+ ws.onerror = function(event) {
+ console.error("Websocket error: ", event);
+ };
+ })();
+ {% endfor %}
+ </script>
+</body>
+</html>
diff --git a/templates/main.html b/templates/main.html
new file mode 100644
index 0000000..e0eaa52
--- /dev/null
+++ b/templates/main.html
@@ -0,0 +1,16 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Live Streaming</title>
+</head>
+<body>
+ <h1>Streaming live.</h1>
+ {% for client_id in clients.keys() %}
+ <h2>Client {{ client_id }}</h2>
+  <ul>
+    <li>
+      <a href="/clients/{{ client_id }}">Client {{ client_id }} streams</a>
+    </li>
+  </ul>
+ {% endfor %}
+</body>
+</html>
diff --git a/templates/main.html.bak b/templates/main.html.bak
new file mode 100644
index 0000000..7bb2ee5
--- /dev/null
+++ b/templates/main.html.bak
@@ -0,0 +1,35 @@
+<!DOCTYPE html>
+<html>
+ <head>
+ <title>Live Streaming</title>
+ </head>
+ <body>
+ <img id="frame" src="">
+ <h1>Streaming. Live.</h1>
+ <script>
+ let ws = new WebSocket("ws://127.0.0.1:8880/{{client_id}}/{{camera_id}}");
+ let image = document.getElementById("frame");
+ let currentUrl = null;
+
+ ws.onmessage = function(event) {
+ if (currentUrl) {
+ URL.revokeObjectURL(currentUrl);
+ }
+ currentUrl = URL.createObjectURL(event.data);
+ image.src = currentUrl;
+ };
+
+ ws.onclose = function(event) {
+ console.log("WebSocket closed:", event);
+ };
+
+ ws.onerror = function(event) {
+ console.log("WebSocket error:", event);
+ };
+
+ window.addEventListener('beforeunload', function() {
+ ws.close();
+ });
+ </script>
+ </body>
+</html>
diff --git a/webserver.py b/webserver.py
new file mode 100644
index 0000000..d1c0a1e
--- /dev/null
+++ b/webserver.py
@@ -0,0 +1,142 @@
+#!/usr/bin/env python
+
+"""
+Module serving video received over zmq to web clients.
+"""
+
+__author__ = "Franoosh Corporation"
+
+
+import os
+from collections import defaultdict
+import json
+import logging
+import asyncio
+import uvicorn
+from fastapi import (
+ FastAPI,
+ Request,
+ HTTPException,
+ WebSocket,
+ templating,
+)
+from fastapi.responses import HTMLResponse
+from fastapi.staticfiles import StaticFiles
+
+from helpers import CustomLoggingFormatter
+import zmq
+
+
+CLIENTS_JSON_FILE = os.path.join(os.getcwd(), 'clients.json')
+LOGFILE = 'webserver.log'
+LOGLEVEL = logging.INFO
+
+HOST = "127.0.0.1"
+ZMQPORT = "9979"
+WSPORT = "8008"
+ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}"
+WS_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}"
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+
+
+
+app = FastAPI()
+app.mount("/static", StaticFiles(directory="static"), name="static")
+templates = templating.Jinja2Templates(directory='templates')
+
+# Track websocket connections by (client_id, camera_id)
+ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket
+
+# Set up a single ZMQ SUB socket for all websocket connections
+zmq_context = zmq.Context()
+zmq_socket = zmq_context.socket(zmq.SUB)
+zmq_socket.bind(WS_BACKEND_ADDR)
+zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all topics
+poller = zmq.Poller()
+poller.register(zmq_socket, zmq.POLLIN)
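+
+# Assumed wiring for this proof of concept: some publisher (e.g. a worker
+# process) connects a PUB socket to WS_BACKEND_ADDR and sends
+# [client_id, camera_id, jpeg_bytes] multipart frames.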
+
+def load_clients():
+ try:
+ with open(CLIENTS_JSON_FILE) as f:
+ clients_dict = json.load(f)
+ except FileNotFoundError:
+ clients_dict = {}
+ return clients_dict
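+
+# Hypothetical clients.json shape these routes expect:
+#   {"client_task": ["front_camera", "back_camera"]}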
+
+@app.get("/")
+async def main_route(request: Request):
+    logger.debug("Main route visited")
+ clients = load_clients()
+ return templates.TemplateResponse(
+ "main.html",
+ {
+ "request": request,
+ "clients": clients,
+ }
+ )
+
+@app.get("/clients/{client_id}", response_class=HTMLResponse)
+async def client_route(request: Request, client_id: str):
+ """Serve client page."""
+ clients_dict = load_clients()
+ logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, clients_dict)
+    if client_id not in clients_dict:
+        raise HTTPException(status_code=404, detail="No such client ID.")
+ return templates.TemplateResponse(
+ "client.html",
+ {
+ "request": request,
+ "client_id": client_id,
+ "camera_ids": clients_dict[client_id],
+ },
+ )
+
+
+@app.websocket("/ws/{client_id}/{camera_id}")
+async def camera_route(websocket: WebSocket, client_id: str, camera_id: str):
+ """Serve a particular camera page."""
+ logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id)
+ await websocket.accept()
+ if client_id not in ws_connections:
+ ws_connections[client_id] = {}
+ ws_connections[client_id][camera_id] = websocket
+ try:
+ while True:
+            # Wait for a frame for this client/camera; poll in a worker thread
+            # so the asyncio event loop is not blocked:
+            sockets = dict(await asyncio.get_running_loop().run_in_executor(
+                None, poller.poll, 1000))
+ if zmq_socket in sockets:
+ msg = zmq_socket.recv_multipart()
+ if len(msg) == 3:
+ recv_client_id, recv_camera_id, content = msg
+ recv_client_id = recv_client_id.decode("utf-8")
+ recv_camera_id = recv_camera_id.decode("utf-8")
+ # Only send to the websocket for this client/camera
+ if recv_client_id == client_id and recv_camera_id == camera_id:
+ await websocket.send_bytes(content)
+ except Exception as exc:
+ logger.warning("Connection closed: %r", exc)
+ finally:
+ if client_id in ws_connections and camera_id in ws_connections[client_id]:
+ del ws_connections[client_id][camera_id]
+        try:
+            await websocket.close()
+        except RuntimeError:
+            pass # Connection already closed by the peer
+
+
+if __name__ == "__main__":
+ uvicorn.run(
+ app,
+ port=8007,
+ host='127.0.0.1',
+ log_level='info',
+    )
\ No newline at end of file
diff --git a/worker.py b/worker.py
new file mode 100644
index 0000000..ad5fe1e
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,235 @@
+#!/usr/bin/env python
+
+"""
+A module containing a zeromq worker that receives video streams
+from clients via the router. Able to send control messages to clients.
+"""
+
+__author__ = "Franoosh Corporation"
+
+import os
+from threading import Thread, Event
+import time
+import signal
+import logging
+from queue import Queue
+from collections import defaultdict
+import zmq
+
+from helpers import CustomLoggingFormatter
+
+
+ADDR = "tcp://localhost"
+PORT = "9979"
+INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}"
+LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log"
+LOGLEVEL = logging.DEBUG
+
+stop_event = Event()
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+
+class MonitorTask(Thread):
+ """Monitor task"""
+ def __init__(self, socket):
+ super().__init__(daemon=True)
+ self.socket = socket
+ self.running = True
+
+ def run(self):
+        """Monitor the connection on the initial socket.
+        This serves as heartbeat monitoring."""
+ monitor = self.socket.get_monitor_socket()
+ monitor.setsockopt(zmq.RCVTIMEO, 5000)
+ logger.debug("Monitor socket started.")
+ while self.running:
+ try:
+ event, _ = monitor.recv_multipart()
+ # Resolve the received event type:
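+                # A monitor frame packs a 16-bit event id followed by a 32-bit
+                # value; pyzmq also ships zmq.utils.monitor.recv_monitor_message
+                # for the same decoding, if preferred.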
+ event_type = int.from_bytes(event[:2], "little")
+ if event_type in (zmq.EVENT_CLOSED, zmq.EVENT_DISCONNECTED):
+ logger.warning("Monitor socket: closed | disconnected")
+ stop_event.set()
+ elif event_type == zmq.EVENT_CONNECT_DELAYED:
+ logger.debug("Monitor socket: event connect delayed")
+ elif event_type == zmq.EVENT_CONNECT_RETRIED:
+ logger.debug("Monitor socket: event connect retried")
+ elif event_type in (zmq.EVENT_CONNECTED, zmq.EVENT_HANDSHAKE_SUCCEEDED):
+ logger.debug("Monitor socket: client connected to router, handshake OK.")
+ stop_event.clear()
+ else:
+ logger.warning("Monitor socket: other event: '%r'", event_type)
+ except zmq.Again:
+ logger.debug("Timeout on monitoring socket.")
+            except Exception as exc:
+ logger.error("Other exception on monitoring socket: %r", exc)
+
+ # monitor.close()
+
+ def stop(self):
+ """Stop thread"""
+ logger.info("Stopping monitor thread ...")
+ self.running = False
+
+
+class ServerWorker(Thread):
+ """ServerWorker"""
+ def __init__(self, identity, context=None):
+ super().__init__(daemon=True)
+ self.id = identity
+ self.context = context or zmq.Context.instance()
+ self.socket = self.context.socket(zmq.DEALER)
+ self.socket.identity = self.id.encode("utf-8")
+ self.monitor = MonitorTask(self.socket)
+        self.video_threads = defaultdict(dict) # video_threads[client_id][camera_id] -> VideoWorker
+ self.connected = False
+ self.running = True
+
+ def run(self):
+ logger.debug("ServerWorker '%s' starting ...", self.id)
+ self.socket.connect(INITIAL_BACKEND_ADDR)
+ self.poller = zmq.Poller()
+ self.poller.register(self.socket, zmq.POLLIN)
+ self.monitor.start()
+
+ while self.running:
+ logger.debug("ServerWorker '%s' waiting for a message ...", self.id)
+ if not self.connected or stop_event.is_set():
+                self.socket.send_multipart([b"READY"]) # Registers this worker's identity with the router
+ time.sleep(1) # Wait a bit before trying to connect again
+
+ sockets = dict(self.poller.poll(1000))
+ if self.socket in sockets:
+ self.connected = True
+ msg = self.socket.recv_multipart()
+ logger.debug("ServerWorker '%s' received message: %r", self.id, msg)
+ filename = None
+ # At the moment we don't expect any other message than a video message:
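+                # Frame layouts as received here, after the router prepends the
+                # client identity (illustrative):
+                #   4 frames: [client_id, camera_id, b'start:<ts>' or b'stop:<ts>', b'']
+                #   3 frames: [client_id, camera_id, payload]
+                #   2 frames: [client_id, camera_id] (treated as a ping)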
+ if len(msg) == 4: # This is a message with start/stop directive and no video data.
+ logger.debug("Received start/stop directive: %r", msg)
+ client_id, camera_id, metadata = msg[0], msg[1], msg[2]
+ try:
+ directive, timestamp = metadata.decode("utf-8").split(":")
+ filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4"
+ logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename)
+ except ValueError:
+ logger.error("Invalid metadata format: %r", metadata)
+ continue
+ if directive == "start":
+ if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera
+ q = Queue()
+ logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id)
+ video_worker = VideoWorker(filename, q)
+ video_worker.start()
+ self.video_threads[client_id][camera_id] = video_worker
+ elif directive == "stop":
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id)
+ self.video_threads[client_id][camera_id].queue.put(None) # Sentinel to stop the thread
+ del self.video_threads[client_id][camera_id]
+ logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id)
+ else:
+ logger.error("Unknown directive: '%s'", directive)
+ elif len(msg) == 3: # This is a video message with data only
+ logger.debug("Received video message with data only: %r", msg)
+ client_id, camera_id, msg = msg[0], msg[1], msg[2]
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ self.video_threads[client_id][camera_id].queue.put(msg)
+ else:
+ logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id)
+ logger.error("Available video threads keys: %r", self.video_threads.keys())
+ logger.error("Available video threads values: %r", self.video_threads.values())
+ elif len(msg) == 2: # This is a ping message from client
+ logger.debug("Received ping message: %r", msg)
+ client_id, camera_id = msg[0], msg[1]
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ # Respond with a pong message
+ try:
+ self.socket.send_multipart([client_id, camera_id, b'pong'])
+ logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id)
+ except zmq.ZMQError as exc:
+ logger.error("Failed to send pong: %r", exc)
+
+ else:
+                    # client_id/camera_id may be unbound in this branch, so only log the raw frames:
+                    logger.warning("Received a message of unexpected length: %r", msg)
+ else:
+ logger.debug("No message received, polling again ...")
+ time.sleep(5)
+
+ self.monitor.stop()
+ self.monitor.join()
+ for camera_thread in self.video_threads.values():
+ for thread in camera_thread.values():
+ thread.queue.put(None) # Sentinel to unblock queue.get()
+ thread.stop()
+ thread.join()
+
+ def send_control_message(self, client_id, camera_id, message):
+ """Send control message to a specific client and camera."""
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message])
+ logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message)
+ else:
+ logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id)
+
+ def stop(self):
+ logger.info("ServerWorker '%s' exiting ...", self.id)
+ self.running = False
+
+
+class VideoWorker(Thread):
+ """Class for video threads."""
+
+ def __init__(self, filename, queue):
+ super().__init__(daemon=True)
+ self.filename = filename
+ self.queue = queue
+ self.live = True
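+        # A None item on the queue acts as a stop sentinel (see ServerWorker.run
+        # and the check below).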
+
+ def run(self):
+ if os.path.exists(self.filename):
+ logger.warning("File '%s' already exists, overwriting ...", self.filename)
+ with open(self.filename, 'wb') as f:
+ logger.info("VideoWorker started writing to file: %s", self.filename)
+ while self.live:
+ # Simulate video processing
+ frame = self.queue.get() # Get frame from queue
+ logger.debug("Processing ('writing to a file') frame for camera: %r", frame)
+ if frame is None:
+ logger.info("VideoWorker received stop signal, exiting ...")
+ break
+ f.write(frame + b'\n') # Write frame to file
+
+
+ def stop(self):
+        logger.info("VideoWorker for file '%s' exiting ...", self.filename)
+ self.live = False
+
+def signal_handler(sig, frame):
+    logger.info("Received signal %r, stopping worker ...", sig)
+    worker.stop()
+
+if __name__ == "__main__":
+ logger.info("Starting up ...")
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ worker = ServerWorker("worker-task")
+ worker.start()
+
+ worker.join()