aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFranoosh <uinarf@autistici.org>2025-10-15 14:42:29 +0200
committerFranoosh <uinarf@autistici.org>2025-10-15 14:42:29 +0200
commit70beed73465ab27449a59c62043d94c16efe00c5 (patch)
tree9f4b5d6a72711af4641b01169da5d20a2228ef64
parent68bd1bd052a7cd6438b92cb1059ef5e58b8d022c (diff)
downloadZeroMQ_Video_Streaming-70beed73465ab27449a59c62043d94c16efe00c5.tar.gz
ZeroMQ_Video_Streaming-70beed73465ab27449a59c62043d94c16efe00c5.tar.bz2
ZeroMQ_Video_Streaming-70beed73465ab27449a59c62043d94c16efe00c5.zip
Added camera support. Movement recognition and video streaming. Web server and frontend. Work in progress. To be fixed: frontend reloading information about client and a page after major changes like camera name or address change, camera removal. Add certificates before testing on actual distributed hardware. Add user login logic.added_video
-rw-r--r--client.py412
-rw-r--r--helpers.py63
-rw-r--r--router.py160
-rw-r--r--templates/client.html98
-rw-r--r--webserver.py214
-rw-r--r--worker.py247
6 files changed, 927 insertions, 267 deletions
diff --git a/client.py b/client.py
index e9dce4f..00e97e4 100644
--- a/client.py
+++ b/client.py
@@ -6,28 +6,51 @@ A module containing client for streaming video to a zmq router.
__author__ = "Franoosh Corporation"
+import sys
from os import path
from threading import Thread, Event
-from queue import Queue
+from queue import Queue, Empty
+from collections import deque
import time
import datetime
import signal
import logging
from configparser import ConfigParser
-import traceback
import zmq
+import cv2
+from collections import defaultdict
-from helpers import CustomLoggingFormatter
+from helpers import (
+ CustomLoggingFormatter,
+ compute_contours,
+ detect_movement,
+ draw_contours,
+ )
-ROUTER_ADDRESS = "tcp://localhost"
-PORT = "5569"
-ADDRESS = f"{ROUTER_ADDRESS}:{PORT}"
+###################################################
+# Configuration, defaults and logging setup
+###################################################
CONFIG_FILE = "client.cfg"
-LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log"
-LOGLEVEL = logging.DEBUG
-CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'}
-SLEEP_TIME = 5
+# Those will be updated from config file if present:
+LOGDIR = '.'
+LOGFILE = path.join(LOGDIR, 'client.log')
+LOGLEVEL = logging.INFO
+
+ROUTER_ADDRESS = "localhost"
+PORT = "5569"
+ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}"
+# This is a default:
+CAMERAS = {
+ 'default camera': {
+ 'address': '/dev/video0',
+ },
+}
+KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period')
+CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider
+MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops
+TIME_FORMAT = '%Y_%m_%d-%H_%M_%S'
+START_STOP_MESSAGE_FMT = "{action}:{data}"
stop_event = Event()
@@ -40,57 +63,115 @@ logging.root.setLevel(LOGLEVEL)
logger = logging.getLogger(__name__)
logging.basicConfig(
filename=LOGFILE,
- datefmt='%Y-%m-%d %I:%M:%S',
+ datefmt=TIME_FORMAT,
level=LOGLEVEL,
)
-
-def read_config(conf_file):
- """Read config file and return as dictionary."""
- cfg = ConfigParser()
- cfg.read(conf_file)
-
- return {key: dict(cfg.items(key)) for key in cfg.sections()}
+###################################################
+# End configuration, defaults and logging setup
+###################################################
class ClientVideo(Thread):
"""Class for sending video stream"""
- def __init__(self, _id, device, queue):
+ def __init__(self, _id, device_dict, queue):
super().__init__(daemon=True)
self.identity = _id
- self.device = device
+ self.device_dict = device_dict
+ self.device_addr = device_dict.get('address', '/dev/video0')
+ try:
+ # Could move parameter validation to config reading function:
+ self.device_threshold = int(device_dict['contour_size_threshold'])
+ except (ValueError, KeyError) as exc:
+ logger.error("ClientVideo '%s': Invalid contour size threshold: %r, using default: %d", self.identity, exc, CONTOUR_SIZE_THRESHOLD)
+ self.device_threshold = CONTOUR_SIZE_THRESHOLD
+ try:
+ self.device_grace_period = int(device_dict['movement_grace_period'])
+ except (ValueError, KeyError) as exc:
+ logger.error("ClientVideo '%s': Invalid movement grace period: %r, using default: %d", self.identity, exc, MOVEMENT_GRACE_PERIOD)
+ self.device_grace_period = MOVEMENT_GRACE_PERIOD
self.queue = queue
+ self.frame_deque = deque(maxlen=2) # Store last 2 frames
self.live = True
+ def put_metadata_message(self, action, data):
+ """Put metadata message to queue."""
+ metadata = START_STOP_MESSAGE_FMT.format(
+ action=action,
+ data=data,
+ )
+ logger.info("ClientVideo '%s': %s at: %r", self.identity, action, data)
+ message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
+ self.queue.put(message)
+
+ def put_video_message(self, frame):
+ # Encode frame as JPEG:
+ _, buffer = cv2.imencode('.jpg', frame)
+ # Add identity to message:
+ message = [self.identity.encode('utf-8'), buffer.tobytes()]
+ # Put message to queue:
+ self.queue.put(message)
+
def run(self):
- """Replace with actual video streaming logic."""
- ping_no = 0
+ """Video streaming logic."""
+ logger.debug("ClientVideo '%s' starting ...", self.identity)
+ cap = cv2.VideoCapture(self.device_addr)
+ if not cap.isOpened():
+ logger.error("ClientVideo '%s': Unable to open video device '%s'", self.identity, self.device_addr)
+ return
+
+ movement_detected = False
+ movement_detected_start = None
while self.live and not stop_event.is_set():
- try:
- # Four parts required to start/stop video stream, three parts for ping:
- if ping_no == 0: # Start video stream
- timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
- metadata = f"start:{timestamp}"
- logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata)
- message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
- self.queue.put(message)
- text = f"ping-{ping_no}"
- message = [self.identity.encode('utf-8'), text.encode('utf-8')]
- elif ping_no >= 5: # Stop video stream
- logger.debug("ClientVideo '%s' sending stop signal", self.identity)
- timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
- metadata = f"stop:{timestamp}"
- logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'')
- message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
- self.live = False
- else: # Send ping
- text = f"ping-{ping_no}"
- message = [self.identity.encode('utf-8'), text.encode('utf-8')]
- logger.debug("ClientVideo '%s' sending message: %r", self.identity, message)
- self.queue.put(message)
- ping_no += 1
- except Exception as exc:
- logger.error("ClientVideo: socket error: %r", exc)
- time.sleep(SLEEP_TIME)
+ ret, frame = cap.read()
+ if not ret:
+ logger.error("ClientVideo '%s': Failed to read frame from device '%s'", self.identity, self.device_addr)
+ break
+ self.frame_deque.append(frame)
+ # Skip on the first frame:
+ if len(self.frame_deque) >= 2:
+ contours = compute_contours(self.frame_deque)
+ logger.debug("ClientVideo '%s': Found %d contours.", self.identity, len(contours))
+ logger.debug("ClientVideo '%s': Contours: %r", self.identity, contours)
+ movement_now = detect_movement(contours, min_area=self.device_threshold)
+ if movement_now:
+ logger.debug("ClientVideo '%s': Movement detected in frame.", self.identity)
+ # Only update movement start time if movement was not detected before:
+ if not movement_detected:
+ # Update movement detected start time:
+ movement_detected_start = datetime.datetime.now()
+ # Only send start message if movement was not detected before:
+ timestamp = movement_detected_start.strftime(TIME_FORMAT)
+ self.put_metadata_message(
+ action='start',
+ data=timestamp,
+ )
+ # and set movement detected flag:
+ movement_detected = True
+ else:
+ logger.debug("ClientVideo '%s': No movement detected in frame.", self.identity)
+
+ if movement_detected:
+ # Draw contours on frame:
+ frame = draw_contours(frame, contours, min_contour_area=self.device_threshold)
+ # Send video message with contour and frame:
+ self.put_video_message(frame)
+
+ # Check if movement has stopped, taking into account grace period:
+ now = datetime.datetime.now()
+ delta_seconds = (now - movement_detected_start).total_seconds()
+ logger.debug("ClientVideo '%s': delta seconds since movement detected: %d, grace remaining: %d.", self.identity, delta_seconds, self.device_grace_period - delta_seconds)
+ # Wait seconds before deciding movement has stopped:
+ if delta_seconds > self.device_grace_period:
+ timestamp = now.strftime(TIME_FORMAT)
+ self.put_metadata_message(
+ action='stop',
+ data=timestamp,
+ )
+ movement_detected = False
+ movement_detected_start = None
+
+ else:
+ logger.debug("ClientVideo '%s': Frame deque length: %d", self.identity, len(self.frame_deque))
logger.info("ClientVideo '%s' closing socket ...", self.identity)
@@ -98,7 +179,6 @@ class ClientVideo(Thread):
logger.info("Client video '%s' exiting ...", self.identity)
self.live = False
-
class ClientTask(Thread):
"""Main Client Task with logic,
message handling and setup."""
@@ -111,35 +191,152 @@ class ClientTask(Thread):
self.socket.identity = self.identity.encode("utf-8")
self.poll = zmq.Poller()
self.poll.register(self.socket, zmq.POLLIN)
- self.video_threads = []
+ # self.video_threads = []
+ self.video_threads = {}
self.running = True
self.connected = False
+ self.queue = Queue()
- def run(self):
- self.socket.connect(ADDRESS)
- logger.debug("Client '%s' connected to %s", self.identity, ADDRESS)
- logger.debug("starting video threads ...")
- q = Queue()
- for _id, device in CAMERAS.items():
- vid_cl = ClientVideo(_id, device, q)
- vid_cl.start()
- self.video_threads.append(vid_cl)
+ def receive_messages(self):
+ """Receive messages from the server."""
while self.running:
try:
sockets = dict(self.poll.poll(1000))
if self.socket in sockets:
try:
message = self.socket.recv_multipart()
- logger.debug("Client '%s' received message: %r", self.identity, message)
- except Exception:
- logger.error("Failed to receive message: %r", traceback.format_exc())
- if q.qsize() > 0:
- frame = q.get()
- if frame is not None:
- logger.debug("Processing frame: %r", frame)
- self.socket.send_multipart(frame)
+ logger.info("Client '%s' received message: %r.", self.identity, message)
+ self.update_config(message)
+ except Exception as exc:
+ logger.error("Failed to receive message: %r", exc)
except zmq.ZMQError as exc:
logger.error("Client '%s': socket error: %r", self.identity, exc)
+ time.sleep(0.01) # Sleep to avoid busy waiting
+
+ def send_messages(self):
+ """Send messages to the server."""
+ while self.running:
+ try:
+ frame = self.queue.get(timeout=0.1)
+ logger.debug("Sending message of length %r ...", len(frame))
+ try:
+ self.socket.send_multipart(frame)
+ except zmq.ZMQError as exc:
+ logger.error("Client '%s': socket error: %r", self.identity, exc)
+ except Empty:
+ continue # No message to send, continue waiting
+
+ def update_config(self, message):
+ """
+ Update configuration based on message from server.
+ Only 'cameras' section can be updated.
+ directives = (
+ 'modify_camera_name',
+ 'modify_camera_threshold',
+ 'modify_camera_grace_pd',
+ 'modify_camera_address',
+ 'add_camera',
+ 'remove_camera',
+ )
+ """
+ try:
+ msg_list = [part.decode('utf-8') for part in message]
+ logger.debug("Client received config message: %r", msg_list)
+ camera_id, directive, value = msg_list[0], msg_list[1], msg_list[2]
+ if directive in (
+ 'modify_camera_name',
+ 'modify_camera_threshold',
+ 'modify_camera_grace_pd',
+ 'modify_camera_address',
+ ):
+ if camera_id not in CAMERAS:
+ logger.warning("Cannot rename unknown camera: %r", camera_id)
+ logger.warning("Known cameras: %r", list(CAMERAS.keys()))
+ else:
+ if directive == 'modify_camera_name':
+ old_name, new_name = camera_id, value
+ if new_name in CAMERAS:
+ raise ValueError("New camera name already exists.")
+ CAMERAS[new_name] = CAMERAS.pop(old_name)
+ self.video_threads[new_name] = self.video_threads.pop(old_name)
+ # Send message to server to update routing:
+ timestamp = datetime.datetime.now().strftime(TIME_FORMAT)
+ self.video_threads[new_name].put_metadata_message(
+ action='rename',
+ data=':'.join([old_name, new_name, timestamp])
+ )
+ self.send_messages
+ for item in cfg.items('cameras'):
+ if item[0].startswith(old_name + '.'):
+ param = item[0].split('.', 1)[1]
+ cfg.set('cameras', f"{new_name}.{param}", item[1])
+ cfg.remove_option('cameras', item[0])
+ logger.info("Renamed and restarted thread for camera '%s' to '%s'.", old_name, new_name)
+ elif directive == 'modify_camera_threshold':
+ new_threshold = int(value)
+ if new_threshold <= 0:
+ raise ValueError("Threshold must be positive.")
+ CAMERAS[camera_id]['contour_size_threshold'] = new_threshold
+ self.video_threads[camera_id].device_threshold = new_threshold
+ cfg.set('cameras', f"{camera_id}.contour_size_threshold", str(new_threshold))
+ logger.info("Modified contour size threshold of camera '%s' to %d.", camera_id, new_threshold)
+ elif directive == 'modify_camera_grace_pd':
+ new_grace_pd = int(value)
+ if new_grace_pd < 0:
+ raise ValueError("Grace period cannot be negative.")
+ CAMERAS[camera_id]['movement_grace_period'] = new_grace_pd
+ self.video_threads[camera_id].device_grace_period = new_grace_pd
+ cfg.set('cameras', f"{camera_id}.movement_grace_period", str(new_grace_pd))
+ logger.info("Modified movement grace period of camera '%s' to %d.", camera_id, new_grace_pd)
+ elif directive == 'modify_camera_address':
+ new_address = value
+ CAMERAS[camera_id]['address'] = new_address
+ self.video_threads[camera_id].device_addr = new_address
+ # Changing address on the fly requires restarting the video thread:
+ self.video_threads[camera_id].stop()
+ self.video_threads[camera_id].join()
+ vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue)
+ vid_cl.start()
+ self.video_threads[camera_id] = vid_cl
+ logger.info("Modified address of camera '%s' to '%s'.", camera_id, new_address)
+ cfg.set('cameras', f"{camera_id}.address", new_address)
+ elif directive == 'add_camera':
+ cam_address = value
+ if camera_id in CAMERAS:
+ raise ValueError("Camera name already exists.")
+ CAMERAS[camera_id] = {'address': cam_address}
+ vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue)
+ vid_cl.start()
+ self.video_threads[camera_id] = vid_cl
+ cfg.set('cameras', f"{camera_id}.address", cam_address)
+ logger.info("Added and started a thread for new camera '%s' with address '%s'.", camera_id, cam_address)
+ else:
+ logger.warning("Unknown config directive: %r", directive)
+
+ except (ValueError, IndexError) as exc:
+ logger.error("Failed to handle config message: %r", exc)
+ cfg.write(open(CONFIG_FILE, 'w'))
+
+ def run(self):
+ """Main client logic."""
+ self.socket.connect(ADDRESS)
+ logger.debug("Client '%s' connected to %s", self.identity, ADDRESS)
+ logger.debug("Starting video threads ...")
+ for _id, device_dict in CAMERAS.items():
+ vid_cl = ClientVideo(_id, device_dict, self.queue)
+ vid_cl.start()
+ self.video_threads[_id] = (vid_cl)
+
+ recv_thread = Thread(target=self.receive_messages, daemon=True)
+ send_thread = Thread(target=self.send_messages, daemon=True)
+
+ recv_thread.start()
+ send_thread.start()
+ logger.debug("Client '%s' started receiving and sending threads.", self.identity)
+
+ logger.debug("Client '%s' waiting for threads to finish ...", self.identity)
+ recv_thread.join()
+ send_thread.join()
logger.info("Closing socket ...")
self.socket.close()
@@ -149,18 +346,86 @@ class ClientTask(Thread):
def stop(self):
"""Stop task"""
logger.info("ClientTask cleaning up ...")
- for thread in self.video_threads:
- logger.info("Cleaning u video thread ...")
+ for _id, thread in self.video_threads.items():
+ logger.info("Cleaning video thread %s ...", _id)
thread.stop()
thread.join()
- self.video_threads = []
+ # self.video_threads = []
+ self.video_threads = {}
logger.info("Client task exiting ...")
self.running = False
+def read_config(conf_file):
+ """
+ Read config file and return as dictionary.
+ The only required section is 'cameras' with at least one camera.
+ Do basic sanity checks and update global variables.
+ Log warnings if config file or sections/options are missing.
+ """
+ cfg = ConfigParser()
+ cfg.read(conf_file)
+ # Need to update logging info:
+ if 'logging' in cfg:
+ global LOGDIR, LOGFILE, LOGLEVEL
+
+ LOGDIR = cfg.get('logging', 'logdir', fallback='.')
+ LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE)
+ LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL)
+ logger.setLevel(LOGLEVEL)
+ for _handler in logger.handlers:
+ _handler.setLevel(LOGLEVEL)
+ if 'network' in cfg:
+ global ROUTER_ADDRESS, PORT, ADDRESS
+ if cfg.has_option('network', 'router_address'):
+ ROUTER_ADDRESS = cfg.get('network', 'router_address')
+ else:
+ logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS)
+ if cfg.has_option('network', 'router_port'):
+ PORT = cfg.get('network', 'router_port')
+ else:
+ logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT)
+ ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}"
+ else:
+ logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS)
+ if 'cameras' in cfg:
+ global CAMERAS
+ # These shenanigans below allow for a nested config simulation with configparser
+ # and so per camera settings:
+ cameras_dict = defaultdict(dict)
+ for key, val in cfg.items('cameras'):
+ try:
+ cam, param = key.split('.', 1)
+ if param in KNOWN_CAMERA_PARAMETERS:
+ cameras_dict[cam][param] = val
+ else:
+ logger.warning("Unknown camera %r parameter: %r", cam, param)
+ except ValueError as exc:
+ logger.error("Invalid camera configuration entry: %r, error: %r, using defaults: %r.", key, exc, CAMERAS)
+ # Check that each camera has at least an address and remove if not:
+ for key in list(cameras_dict.keys()):
+ if 'address' not in cameras_dict[key].keys():
+ logger.error("DEBUG: cameras_dict: %r", cameras_dict)
+ logger.error("Camera %r has no address configured, removing from configuration.", key)
+ del cameras_dict[key]
+ if not cameras_dict:
+ logger.warning("No valid camera configurations found in config file, using defaults: %r.", CAMERAS)
+ else:
+ # Only update global CAMERAS if we have at least one valid camera with an address:
+ CAMERAS = cameras_dict
+ logger.info("Using camera configuration: %r", CAMERAS)
+ else:
+ logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS)
+ logger.info("CAMERAS configuration: %r", CAMERAS)
+ with open(conf_file, 'w') as configfile:
+ cfg.write(configfile)
+
+ return cfg
+
def signal_handler(sig, frame):
logger.info("Received signal handler '%r', stopping client ...", sig)
client.stop()
+ sys.exit(0)
if __name__ == "__main__":
@@ -168,9 +433,14 @@ if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
+ # Read configuration file before starting logging as it may contain logging settings:
+ cfg = read_config(CONFIG_FILE)
+
logger.info("Starting up ...")
+
client = ClientTask("client_task")
client.start()
-
client.join()
+
+ logger.info("Terminating ...") \ No newline at end of file
diff --git a/helpers.py b/helpers.py
index 409a253..8190782 100644
--- a/helpers.py
+++ b/helpers.py
@@ -8,6 +8,8 @@ __author__ = "Franoosh Corporation"
import logging
+import subprocess
+import cv2
class CustomLoggingFormatter(logging.Formatter):
@@ -42,3 +44,64 @@ class CustomLoggingFormatter(logging.Formatter):
return result
+def process_frame(frame):
+ """Process frame for contour detection."""
+ # Convert to grayscale:
+ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+ # Apply Gaussian blur:
+ blurred = cv2.GaussianBlur(gray, (21, 21), 0)
+
+ return blurred
+
+def compute_contours(frame_deque):
+ """Compute contours from a deque of frames."""
+ contours = []
+ if len(frame_deque) < 2:
+ return contours
+ all_contours = []
+
+ for idx, frame in enumerate(frame_deque):
+ frame_0 = process_frame(frame)
+ try:
+ frame_1 = process_frame(frame_deque[idx+1])
+ except IndexError:
+ break
+ frame_delta = cv2.absdiff(frame_0, frame_1)
+ threshold = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1]
+ threshold = cv2.dilate(threshold, None, iterations=2)
+ contours, _ = cv2.findContours(threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+ all_contours.extend(contours)
+
+ return all_contours
+
+def draw_contours(frame, contours, min_contour_area=500):
+ """Draw contours on the frame."""
+ for contour in contours:
+ if cv2.contourArea(contour) > min_contour_area:
+ (x, y, w, h) = cv2.boundingRect(contour)
+ cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
+
+ return frame
+
+def detect_movement(contours, min_area=500):
+ """Detect movement based on contours found from frame diff."""
+ for contour in contours:
+ if cv2.contourArea(contour) >= min_area:
+ return True
+ return False
+
+def get_available_cameras():
+ """
+ Get list of available camera devices.
+ At the moment it does not work. At all. It is useless.
+ """
+ proc = subprocess.Popen(['v4l2-ctl', '--list-devices'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = proc.communicate()
+ candidate_devices = [i.strip() for i in stdout.decode('utf-8').strip().splitlines()[1:]]
+ verified_devices = []
+ for device in candidate_devices:
+ cap = cv2.VideoCapture(device)
+ if cap.isOpened():
+ verified_devices.append(device)
+ cap.release()
+ return verified_devices \ No newline at end of file
diff --git a/router.py b/router.py
index 61b2f72..12d18c3 100644
--- a/router.py
+++ b/router.py
@@ -18,17 +18,17 @@ from helpers import CustomLoggingFormatter
# TODO: add configparser
-ADDR_FRONTEND = 'tcp://localhost'
-ADDR_BACKEND = 'tcp://localhost'
+IP_FRONTEND = '127.0.0.1'
+IP_BACKEND = '127.0.0.1'
PORT_FRONTEND = "5569"
PORT_BACKEND = "9979"
-INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}"
-INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}"
+FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}"
+BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}"
LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log"
-LOGLEVEL = logging.DEBUG
+LOGLEVEL = logging.INFO
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
@@ -52,13 +52,13 @@ def custom_proxy():
# Set up frontend:
frontend_socket = context.socket(zmq.ROUTER)
frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
- frontend_socket.bind(INITIAL_FRONTEND_ADDR)
+ frontend_socket.bind(FRONTEND_ADDR)
poller.register(frontend_socket, zmq.POLLIN)
# Set up backend:
backend_socket = context.socket(zmq.ROUTER)
backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
- backend_socket.bind(INITIAL_BACKEND_ADDR)
+ backend_socket.bind(BACKEND_ADDR)
poller.register(backend_socket, zmq.POLLIN)
awailable_workers = []
@@ -78,101 +78,137 @@ def custom_proxy():
pending_messages_worker_client = defaultdict(lambda: defaultdict(deque))
while RUNNING:
- events = dict(poller.poll(1000)) # Poll both frontend and backend sockets:
- if frontend_socket in events: # If message from client:
- msg = frontend_socket.recv_multipart() # Receive message from client:
- logger.debug("Received message: %r", msg)
+ # Poll both frontend and backend sockets:
+ events = dict(poller.poll(1000))
+ # If message from client:
+ if frontend_socket in events:
+ # Receive message from client:
+ msg = frontend_socket.recv_multipart()
+ logger.debug("Received message.")
client_id, content = msg[0], msg[1:]
- if not client_id in client_worker_dict: # If client is not in client-worker dictionary:
- logger.info("Received ID from client: %r.", client_id)
# Check if client is already connected to worker:
try:
- worker_id = client_worker_dict[client_id] # Get worker ID for this client from the client-worker dictionary
- while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker and send them first:
- logger.debug("There are pending messages from client %r for worker %r", client_id, worker_id)
+ # Get worker ID for this client from the client-worker dictionary:
+ worker_id = client_worker_dict[client_id]
+ # Check if there are any pending messages for this worker and send them first:
+ while pending_messages_client_worker[client_id][worker_id]:
+ # In order to take a peek at the size of all messages, perhaps check out:
+ # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/
+ logger.debug(
+ "There are '%d' pending messages from client %r for worker %r",
+ len(pending_messages_client_worker[client_id][worker_id]),
+ client_id,
+ worker_id,
+ )
pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
try:
- logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id)
+ logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)
backend_socket.send_multipart([worker_id, client_id, *pending_msg])
except Exception as e:
- logger.error("Failed to send pending message: %r to worker: %r, error: %r", pending_msg, worker_id, e)
- pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message
- break # Break to avoid infinite loop
- if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker:
- pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery
+ logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e)
+ # Re-queue the message:
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg)
+ # Break to avoid infinite loop:
+ break
+ # If there are still pending messages for this worker:
+ if pending_messages_client_worker[client_id][worker_id]:
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
+ # At last send new message to worker:
else:
try:
- logger.debug("No more pending messages for client %r, sending new message to worker %r: %r", client_id, worker_id, msg)
- backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker:
+ logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id)
+ backend_socket.send_multipart([worker_id, client_id, *content])
except Exception as e:
- logger.error("Failed to send message to backend: %r, error: %s", msg, e)
- pending_messages_client_worker[client_id][worker_id].appendleft(msg) # Store message for later delivery
- except KeyError: # If client is not connected to any worker:
- logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id)
- if awailable_workers: # If there are available workers:
- worker_id = random.choice(awailable_workers) # Assign random worker to client
- client_worker_dict[client_id] = worker_id # Update client-worker dictionary
- if pending_messages_no_worker[client_id]: # Check if there are any pending messages for this client:
- pending_messages_client_worker[client_id][worker_id].extendleft(msg) # Move messages to pending messages for this worker
+ logger.error("Failed to send message to backend, error: %s", e)
+ # Store message for later delivery:
+ # pending_messages_client_worker[client_id][worker_id].appendleft(msg) <- FIXME: this is the bug, no?
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
+ # If client is not connected to any worker:
+ except KeyError:
+ logger.info("Received ID from client: %r.", client_id)
+ # Check if there are available workers.
+ if awailable_workers:
+ # Assign random worker to client:
+ worker_id = random.choice(awailable_workers)
+ # Update client-worker dictionary:
+ client_worker_dict[client_id] = worker_id
+ # Check if there are any pending messages for this client:
+ if pending_messages_no_worker[client_id]:
+ # Move messages to pending messages for this worker:
+ pending_messages_client_worker[client_id][worker_id].extendleft(msg)
logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id)
- while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker:
+ # Check if there are any pending messages for this worker:
+ while pending_messages_client_worker[client_id][worker_id]:
pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
try:
- logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id)
+ logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)
backend_socket.send_multipart([worker_id, client_id, *pending_msg])
except Exception as e:
- logger.error("Failed to send pending message: %r to worker: %r. Error: %s", pending_msg, worker_id, e)
- pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message
+ logger.error("Failed to send pending message to worker: %r. Error: %s", worker_id, e)
+ # Re-queue the message:
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg)
break
- if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker:
- logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery: %r", client_id, worker_id, content)
- pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery
+ # If there are still pending messages for this worker:
+ if pending_messages_client_worker[client_id][worker_id]:
+ logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery.", client_id, worker_id)
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
else:
- logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg)
+ logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id)
try:
- logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg)
- backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker:
+ logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id)
+ # At last, send the message to worker:
+ backend_socket.send_multipart([worker_id, client_id, *content])
except Exception as e:
- logger.error("Failed to send message %r for client %r to backend, requeing. Error: %s", msg, client_id, e)
- pending_messages_client_worker[client_id][worker_id].appendleft(msg) # Store message for later delivery
+ logger.error("Failed to send message for client %r to backend, requeing. Error: %s", client_id, e)
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(msg)
- else: # If no workers are available, assign a new worker:
- pending_messages_no_worker[client_id].append(content) # Store message for later delivery
+ else:
+ # Store message for later delivery:
+ pending_messages_no_worker[client_id].append(content)
logger.debug("No available workers, storing client '%s' with no worker assigned", client_id)
if backend_socket in events:
_msg = backend_socket.recv_multipart()
+ # These messages should be safe to log as they are short strings or bytes.
logger.debug("Received from worker: %r", _msg)
if len(_msg) == 2: # Worker is sending its identity:
worker_id, msg = _msg[0], _msg[1:]
logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)
logger.info("Received ID from worker: %r.", worker_id)
- awailable_workers.append(worker_id) # Add worker to available workers list
+ # Add worker to available workers list:
+ awailable_workers.append(worker_id)
for client, worker in client_worker_dict.items():
- if worker is None: # If client has no worker assigned:
+ # If client has no worker assigned, assign it a new worker:
+ if worker is None:
client_worker_dict[client] = worker
logger.debug("Assigning worker %r to client %r", worker, client)
if pending_messages_no_worker:
# If there are pending messages for clients with no worker assigned:
for client_id, messages in pending_messages_no_worker.items():
if messages:
- pending_messages_client_worker[client_id][worker_id].extendleft(messages) # Move messages to pending messages for this worker
- pending_messages_no_worker[client_id].clear() # Clear pending messages for this client
+ # Move messages to pending messages for this worker:
+ pending_messages_client_worker[client_id][worker_id].extendleft(messages)
+ # Clear pending messages for this client:
+ pending_messages_no_worker[client_id].clear()
logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id)
try:
- pending_msg = pending_messages_client_worker[client_id][worker_id].pop() # Get the first message for this client and worker
- logger.debug(
- "Sending pending message %r to worker %r for client '%r'", pending_msg, worker_id, client_id)
- backend_socket.send_multipart([worker_id, client_id, *pending_msg]) # Send the message to worker
+ # Get the first message for this client and worker:
+ pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
+ logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id)
+ # Send the message to worker:
+ backend_socket.send_multipart([worker_id, client_id, *pending_msg])
except Exception as e:
pending_messages_client_worker[client_id][worker_id].append(pending_msg)
- logger.error("Failed to send pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e)
+ logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e)
else: # Worker is sending a message to client:
worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:]
logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg)
# First check if there are any pending messages for this worker:
- while pending_messages_worker_client[worker_id][client_id]: # Check if there are any pending messages for this worker:
+ while pending_messages_worker_client[worker_id][client_id]:
pending_msg = pending_messages_worker_client[worker_id][client_id].pop()
try:
logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, client_id, worker_id)
@@ -180,10 +216,12 @@ def custom_proxy():
except Exception as e:
# It is safe to log the message content as it is a short string or bytes.
logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e)
- pending_messages_worker_client[worker_id][client_id].append(pending_msg) # Re-queue the message
- break # Break to avoid infinite loop
- # Now send the message to the client if no pending messages for this client:
- if not pending_messages_worker_client[worker_id][client_id]: # If there are no pending messages for this worker:
+ # Re-queue the message:
+ pending_messages_worker_client[worker_id][client_id].append(pending_msg)
+ # Break to avoid infinite loop:
+ break
+ # If no pending messages for this client, send new message to the client :
+ if not pending_messages_worker_client[worker_id][client_id]:
logger.debug("Sending message %r to client '%s'", msg, client_id)
try:
frontend_socket.send_multipart([client_id, *msg])
diff --git a/templates/client.html b/templates/client.html
index e03394a..5a52f43 100644
--- a/templates/client.html
+++ b/templates/client.html
@@ -12,19 +12,52 @@
<div class="streams-container">
{% for camera_id in camera_ids %}
<div class="camera-stream">
- <h2>Camera {{ camera_id }}</h2>
+ <h2>Camera: {{ camera_id }}</h2>
<img id="video-{{ camera_id }}" width="640" height="480" />
+ <div>
+ <h3>Modify Camera Name</h3>
+ <input type="text" id="new-name-{{ camera_id }}" placeholder="New Name">
+ <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_name')">Send</button>
+ </div>
+ <div>
+ <h3>Modify Camera Threshold</h3>
+ <input type="number" id="threshold-value-{{ camera_id }}" placeholder="Threshold">
+ <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_threshold')">Send</button>
+ </div>
+ <div>
+ <h3>Modify Camera Grace Period</h3>
+ <input type="number" id="grace-value-{{ camera_id }}" placeholder="Grace Period">
+ <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_grace_pd')">Send</button>
+ </div>
+ <div>
+ <h3>Modify Camera Address</h3>
+ <input type="text" id="address-value-{{ camera_id }}" placeholder="New Address">
+ <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_address')">Send</button>
+ </div>
</div>
- {% endfor %}
- </div>
+ {% endfor %}
+ <div>
+ <h3>Add Camera</h3>
+ <input type="text" id="add-name" placeholder="Camera Name">
+ <input type="text" id="add-address" placeholder="Address">
+ <button onclick="sendConfig('add_camera')">Send</button>
+ </div>
+ <div>
+ <h3>Remove Camera</h3>
+ <input type="text" id="remove-name" placeholder="Camera Name">
+ <button onclick="sendConfig('remove_camera')">Send</button>
+ </div>
+ </div>
<script>
- // For each camera, open a WebSocket and update the corresponding <img>
+ const wsMap = {};
{% for camera_id in camera_ids %}
+ // For each camera, open a WebSocket and update the corresponding <img>
(function() {
- let ws = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/{{ camera_id }}');
- let image = document.getElementById('video-{{ camera_id }}');
+ const cameraId = '{{ camera_id }}';
+ const ws = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/' + cameraId);
let currentUrl = null;
ws.onmessage = function(event) {
+ let image = document.getElementById('video-' + cameraId);
if (currentUrl) {
URL.revokeObjectURL(currentUrl);
}
@@ -32,16 +65,65 @@
image.src = currentUrl;
};
ws.onclose = function(event) {
- console.log('WebSocket closed for camera {{ camera_id }}:', event);
+ console.log('WebSocket closed for camera ' + cameraId + ':', event);
};
ws.onerror = function(event) {
- console.log('WebSocket error for camera {{ camera_id }}:', event);
+ console.log('WebSocket error for camera ' + cameraId + ':', event);
};
window.addEventListener('beforeunload', function() {
ws.close();
});
+ wsMap[cameraId] = ws;
})();
{% endfor %}
+
+ function sendConfig(cameraId, type) {
+ let msg = {};
+ switch(type) {
+ case 'modify_camera_name':
+ msg[type] = [
+ // cameraId,
+ document.getElementById('new-name-' + cameraId).value
+ ];
+ break;
+ case 'modify_camera_threshold':
+ msg[type] = [
+ // cameraId,
+ document.getElementById('threshold-value-' + cameraId).value
+ ];
+ break;
+ case 'modify_camera_grace_pd':
+ msg[type] = [
+ // cameraId,
+ parseInt(document.getElementById('grace-value-' + cameraId).value)
+ ];
+ break;
+ case 'modify_camera_address':
+ msg[type] = [
+ // cameraId,
+ document.getElementById('address-value-' + cameraId).value
+ ];
+ break;
+ case 'add_camera':
+ msg[type] = [
+ document.getElementById('add-name').value,
+ document.getElementById('add-address').value
+ ];
+ break;
+ case 'remove_camera':
+ msg[type] = [
+ document.getElementById('remove-name').value
+ ];
+ break;
+ }
+ const ws = wsMap[cameraId] && wsMap[cameraId] ? wsMap[cameraId] : Object.values(wsMap)[0];
+ if (ws && ws.readyState === WebSocket.OPEN) {
+ console.log("Sending message:", msg, "on ws:", ws);
+ ws.send(JSON.stringify(msg));
+ } else {
+ alert('WebSocket is not open for camera ' + cameraId);
+ }
+ }
</script>
</body>
</html>
diff --git a/webserver.py b/webserver.py
index d1c0a1e..162badc 100644
--- a/webserver.py
+++ b/webserver.py
@@ -1,42 +1,43 @@
#!/usr/bin/env python
-
"""
Module serving video from zmq to a webserver.
"""
-
__author__ = "Franoosh Corporation"
-
import os
-from collections import defaultdict
+from collections import defaultdict, deque
import json
import logging
+import zmq.asyncio
import asyncio
-from threading import Thread
+from collections import defaultdict
+from contextlib import asynccontextmanager
import uvicorn
+import zmq
from fastapi import (
FastAPI,
Request,
HTTPException,
WebSocket,
+ WebSocketDisconnect,
templating,
)
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from helpers import CustomLoggingFormatter
-import zmq
CLIENTS_JSON_FILE = os.path.join(os.getcwd(), 'clients.json')
+CLIENTS_DICT = defaultdict(list) # CLIENTS_DICT[client_id] = [camera_id1, camera_id2, ...]
LOGFILE = 'webserver.log'
-LOGLEVEL = logging.INFO
+LOGLEVEL = logging.DEBUG
HOST = "127.0.0.1"
-ZMQPORT = "9979"
-WSPORT = "8008"
-ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}"
-WS_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}"
+ZMQ_PORT = "9979"
+WEB_PORT = "8008"
+CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}"
+WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}"
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
@@ -50,88 +51,175 @@ logging.basicConfig(
level=LOGLEVEL,
)
-
-
-app = FastAPI()
+# Track websocket connections by (client_id, camera_id):
+ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket
+ws_queues = defaultdict(dict)
+ctrl_msg_que = asyncio.Queue()
+# Create ZMQ context and socket:
+zmq_context = zmq.asyncio.Context()
+zmq_socket = zmq_context.socket(zmq.DEALER)
+# Connect to ZMQ backend:
+zmq_socket.connect(WEB_BACKEND_ADDR)
+
+
+async def zmq_bridge():
+ """Bridge between ZMQ backend and websocket clients."""
+ while True:
+ try:
+ data = await zmq_socket.recv_multipart()
+ topic, frame_data = None, None
+ if len(data) == 2:
+ topic, frame_data = data
+ elif len(data) == 3:
+ topic, _, _ = data
+ else:
+ logger.warning("Received invalid ZMQ message: %r", data)
+ continue
+ if topic:
+ client_id, camera_id = topic.decode('utf-8').split(':', 1)
+ if frame_data:
+ # Add client and camera to CLIENTS_DICT if new:
+ if not camera_id in CLIENTS_DICT[client_id]:
+ CLIENTS_DICT[client_id].append(camera_id)
+ else:
+ # No frame data means a notification to remove camera:
+ try:
+ CLIENTS_DICT[client_id].remove(camera_id)
+ except ValueError:
+ pass
+
+ queue = ws_queues.get(client_id, {}).get(camera_id)
+ if queue and frame_data:
+ if queue.full():
+ _ = queue.get_nowait() # Discard oldest frame
+ await queue.put(frame_data)
+ if not ctrl_msg_que.empty():
+ client_id, camera_id, command, args_list = await ctrl_msg_que.get()
+ zmq_socket.send_multipart([
+ client_id.encode('utf-8'),
+ camera_id.encode('utf-8'),
+ command.encode('utf-8'),
+ ] + args_list)
+ logger.info("Sent control command '%s' with args: %r for '/ws/%s/%s' to backend.", command, args_list, client_id, camera_id)
+ except Exception as e:
+ logger.error("Error in ZMQ bridge: %r", e)
+ # TODO: Check if this loop can be optimized to avoid busy waiting.
+ # Alternative implementation using zmq Poller:
+ # poll = zmq.asyncio.Poller()
+ # poll.register(zmq_socket, zmq.POLLIN)
+ # while True:
+ # try:
+ # sockets = dict(await poll.poll())
+ # if zmq_socket in sockets:
+ # topic, frame_data = await zmq_socket.recv_multipart()
+ # client_id, camera_id = topic.decode('utf-8').split(':', 1)
+ # set_clients(client_id, camera_id)
+ # queue = ws_queues.get(client_id, {}).get(camera_id)
+ # if queue:
+ # if queue.full():
+ # _ = queue.get_nowait() # Discard oldest frame
+ # await queue.put(frame_data)
+ # if not ctrl_msg_que.empty():
+ # client_id, camera_id, command, args_list = await ctrl_msg_que.get()
+ # zmq_socket.send_multipart([
+ # client_id.encode('utf-8'),
+ # camera_id.encode('utf-8'),
+ # command.encode('utf-8'),
+ # ] + args_list)
+ # logger.info("Sent control command '%s' with args: %r for '/ws/%s/%s' to backend.", command, args_list, client_id, camera_id)
+ # except Exception as e:
+ # logger.error("Error in ZMQ bridge: %r", e)
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ """Create lifespan context for FastAPI app."""
+ asyncio.create_task(zmq_bridge())
+ yield
+
+app = FastAPI(lifespan=lifespan)
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = templating.Jinja2Templates(directory='templates')
-# Track websocket connections by (client_id, camera_id)
-ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket
-
-# Set up a single ZMQ SUB socket for all websocket connections
-zmq_context = zmq.Context()
-zmq_socket = zmq_context.socket(zmq.SUB)
-zmq_socket.bind(WS_BACKEND_ADDR)
-zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all topics
-poller = zmq.Poller()
-poller.register(zmq_socket, zmq.POLLIN)
-
-def load_clients():
- try:
- with open(CLIENTS_JSON_FILE) as f:
- clients_dict = json.load(f)
- except FileNotFoundError:
- clients_dict = {}
- return clients_dict
@app.get("/")
async def main_route(request: Request):
- logger.error("DEBUG: main route visited")
- clients = load_clients()
+ """Serve main page."""
+ logger.debug("Main route visited")
return templates.TemplateResponse(
"main.html",
{
"request": request,
- "clients": clients,
+ "clients": CLIENTS_DICT,
}
)
@app.get("/clients/{client_id}", response_class=HTMLResponse)
async def client_route(request: Request, client_id: str):
"""Serve client page."""
- clients_dict = load_clients()
- logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, clients_dict)
- if not client_id in clients_dict:
- return HTTPException(status_code=404, detail="No such client ID.")
+ logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, CLIENTS_DICT)
+ if not client_id in CLIENTS_DICT:
+ raise HTTPException(status_code=404, detail="No such client ID.")
return templates.TemplateResponse(
"client.html",
{
"request": request,
"client_id": client_id,
- "camera_ids": clients_dict[client_id],
+ "camera_ids": CLIENTS_DICT[client_id],
},
)
-
@app.websocket("/ws/{client_id}/{camera_id}")
async def camera_route(websocket: WebSocket, client_id: str, camera_id: str):
"""Serve a particular camera page."""
logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id)
await websocket.accept()
- if client_id not in ws_connections:
- ws_connections[client_id] = {}
- ws_connections[client_id][camera_id] = websocket
- try:
+ ws_connections[client_id][camera_id] = {'ws': websocket}
+ ws_queues[client_id][camera_id] = asyncio.Queue(maxsize=10)
+ queue = ws_queues[client_id][camera_id]
+
+ async def send_frames():
+ while True:
+ frame_data = await queue.get()
+ await websocket.send_bytes(frame_data)
+
+ async def receive_control():
while True:
- # Wait for a frame for this client/camera
- sockets = dict(poller.poll(1000))
- if zmq_socket in sockets:
- msg = zmq_socket.recv_multipart()
- if len(msg) == 3:
- recv_client_id, recv_camera_id, content = msg
- recv_client_id = recv_client_id.decode("utf-8")
- recv_camera_id = recv_camera_id.decode("utf-8")
- # Only send to the websocket for this client/camera
- if recv_client_id == client_id and recv_camera_id == camera_id:
- await websocket.send_bytes(content)
- except Exception as exc:
- logger.warning("Connection closed: %r", exc)
+ try:
+ data = await websocket.receive_text()
+ logger.info("Received control message from '/ws/%s/%s': %s", client_id, camera_id, data)
+ # Handle control messages from the client:
+ frontend_message = json.loads(data)
+ for command, args in frontend_message.items():
+ args_list = [str(arg).encode('utf-8') for arg in args]
+ ctrl_msg_que.put_nowait((client_id, camera_id, command, args_list))
+ # zmq_control_socket.send_multipart([
+ # client_id.encode('utf-8'),
+ # camera_id.encode('utf-8'),
+ # command.encode('utf-8'),
+ # b'',
+ # ] + args_list)
+ # logger.info("Sent control command '%s' with args: %r for '/ws/%s/%s' to backend.", command, args_list, client_id, camera_id)
+ logger.info("Put control command '%s' with args: %r for '/ws/%s/%s' on queue to backend.", command, args_list, client_id, camera_id)
+ except json.JSONDecodeError:
+ logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data)
+ except WebSocketDisconnect:
+ logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id)
+ break
+ except Exception as exc:
+ logger.warning("Error receiving control message: %r", exc)
+ send_task = asyncio.create_task(send_frames())
+ receive_task = asyncio.create_task(receive_control())
+ try:
+ # await asyncio.gather(send_task, receive_task)
+ await asyncio.wait(
+ [send_task, receive_task],
+ return_when=asyncio.FIRST_COMPLETED,
+ )
finally:
- if client_id in ws_connections and camera_id in ws_connections[client_id]:
- del ws_connections[client_id][camera_id]
- await websocket.close()
-
+ send_task.cancel()
+ receive_task.cancel()
+ ws_connections[client_id].pop(camera_id, None)
+ ws_queues[client_id].pop(camera_id, None)
if __name__ == "__main__":
uvicorn.run(
@@ -139,4 +227,4 @@ if __name__ == "__main__":
port=8007,
host='127.0.0.1',
log_level='info',
- ) \ No newline at end of file
+ )
diff --git a/worker.py b/worker.py
index ad5fe1e..54d1696 100644
--- a/worker.py
+++ b/worker.py
@@ -1,29 +1,51 @@
#!/usr/bin/env python
-__author__ = "Franoosh Corporation"
"""
A module consisting of a zeromq worker receiving video stream
via router from clients. Able to send control messages to clients.
+Needs libzmq and pyzmq with 'drafts' support.
"""
import os
+import sys
from threading import Thread, Event
import time
import signal
import logging
from queue import Queue
from collections import defaultdict
+import tempfile
+import json
import zmq
+import cv2
+import numpy
from helpers import CustomLoggingFormatter
+__author__ = "Franoosh Corporation"
+
+# Various constants: # TODO: put them in a config
+
+HOST = "127.0.0.1"
+ZMQPORT = "9979"
+# WSPORT = "8000"
+WSPORT = "8008"
+ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}"
+WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}"
+# LOGLEVEL = logging.DEBUG
+LOGLEVEL = logging.INFO
+
+# File paths:
-ADDR = "tcp://localhost"
-PORT = "9979"
-INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}"
LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log"
-LOGLEVEL = logging.DEBUG
+CWD = os.getcwd()
+TMP_DIR = os.path.join(CWD, "tmp")
+CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json')
+
+# Other:
+
+CLIENTS_DICT = {}
stop_event = Event()
@@ -39,6 +61,14 @@ logging.basicConfig(
level=LOGLEVEL,
)
+
+if not os.path.exists(TMP_DIR):
+ try:
+ os.makedirs(TMP_DIR)
+ except Exception as exc:
+ logger.error("Could not create temporary directory: %r", exc)
+ sys.exit()
+
class MonitorTask(Thread):
"""Monitor task"""
def __init__(self, socket):
@@ -92,14 +122,45 @@ class ServerWorker(Thread):
self.socket.identity = self.id.encode("utf-8")
self.monitor = MonitorTask(self.socket)
self.video_threads = defaultdict(lambda: defaultdict(dict))
+ self.poller = zmq.Poller()
+ self.web_sock = self.context.socket(zmq.DEALER)
self.connected = False
self.running = True
+ def start_client(self, client_id, camera_id, filename):
+ """
+ Start a video thread for new client_id and camera_id.
+ """
+ if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera
+ q = Queue()
+ logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id)
+ video_worker = VideoWorker(client_id, camera_id, filename, q)
+ video_worker.start()
+ self.video_threads[client_id][camera_id] = video_worker
+
+ def stop_client(self, client_id, camera_id):
+ """
+ Stop video thread for a client_id and camera_id.
+ """
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id)
+ self.video_threads[client_id][camera_id].stop() # Stop the thread
+ del self.video_threads[client_id][camera_id]
+ logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id)
+
def run(self):
+ """
+ Main loop of the worker.
+ Full of wonders.
+ """
logger.debug("ServerWorker '%s' starting ...", self.id)
- self.socket.connect(INITIAL_BACKEND_ADDR)
- self.poller = zmq.Poller()
+ self.socket.connect(ZMQ_BACKEND_ADDR)
+ try:
+ self.web_sock.bind(WEB_BACKEND_ADDR)
+ except Exception as exc:
+ logger.error("Connection to zmq websocket failed: %r", exc)
self.poller.register(self.socket, zmq.POLLIN)
+ self.poller.register(self.web_sock, zmq.POLLIN)
self.monitor.start()
while self.running:
@@ -112,67 +173,93 @@ class ServerWorker(Thread):
if self.socket in sockets:
self.connected = True
msg = self.socket.recv_multipart()
- logger.debug("ServerWorker '%s' received message: %r", self.id, msg)
+ logger.debug("ServerWorker '%s' received message of length: %d.", self.id, len(msg))
filename = None
- # At the moment we don't expect any other message than a video message:
+ # At the moment we don't expect any other message than a start/stop message (of length 4) and a video message:
if len(msg) == 4: # This is a message with start/stop directive and no video data.
- logger.debug("Received start/stop directive: %r", msg)
+ logger.debug("Received start/stop directive: (?)")
client_id, camera_id, metadata = msg[0], msg[1], msg[2]
+ # Convert bytes to str once and for all:
+ client_id = client_id.decode('utf-8')
+ camera_id = camera_id.decode('utf-8')
+ update_clients(client_id, camera_id)
try:
- directive, timestamp = metadata.decode("utf-8").split(":")
- filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4"
- logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename)
+ # Directive and data are fron now on converted to strings:
+ directive, data = metadata.decode("utf-8").split(":")
+ logger.info(
+ "Received directive '%s' with data: %r for client '%s', camera '%s'",
+ directive,
+ data,
+ client_id,
+ camera_id,
+ )
except ValueError:
logger.error("Invalid metadata format: %r", metadata)
+ directive = None
continue
- if directive == "start":
- if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera
- q = Queue()
- logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id)
- video_worker = VideoWorker(filename, q)
- video_worker.start()
- self.video_threads[client_id][camera_id] = video_worker
- elif directive == "stop":
- if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
- logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id)
- self.video_threads[client_id][camera_id].queue.put(None) # Sentinel to stop the thread
- del self.video_threads[client_id][camera_id]
- logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id)
+ if directive == 'rename':
+ old_name, new_name, timestamp = None, None, None
+ try:
+ old_name, new_name, timestamp = data.split(":")
+ logger.info("Renamed video thread from '%s' to '%s'.", old_name, new_name)
+ except ValueError:
+ logger.error("Invalid rename data format: %r", data)
+ continue
+ if old_name and new_name and timestamp:
+ # I think it's better to stop the old thread and start a new one,
+ # rather than reuse the old one as it's less mucking about.
+ self.stop_client(old_name, camera_id)
+ self.start_client(new_name, camera_id, f"{new_name}_{camera_id}-{timestamp}.mkv")
+ to_remove= b':'.join([client_id, camera_id])
+ try:
+ self.web_sock.send_multipart([to_remove, b'', b'']) # Notify webserver of rename
+ except Exception as exc:
+ logger.error("Sending rename notification to websocket failed: %r", exc)
else:
- logger.error("Unknown directive: '%s'", directive)
- elif len(msg) == 3: # This is a video message with data only
- logger.debug("Received video message with data only: %r", msg)
- client_id, camera_id, msg = msg[0], msg[1], msg[2]
+ timestamp = data
+ filename = f"{client_id}_{camera_id}-{timestamp}.mkv"
+ logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename)
+ if directive == "start":
+ self.start_client(client_id, camera_id, filename)
+ elif directive == "stop":
+ self.stop_client(client_id, camera_id)
+ else:
+ logger.error("Unknown directive: %r", directive)
+
+ elif len(msg) == 3: # This is a video message with data
+ logger.debug("Received video message with data only.")
+ client_id, camera_id, content = msg[0], msg[1], msg[2]
+ client_id = client_id.decode('utf-8')
+ camera_id = camera_id.decode('utf-8')
if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
- self.video_threads[client_id][camera_id].queue.put(msg)
+ self.video_threads[client_id][camera_id].queue.put(content)
+ # Send only [client_id, camera_id, jpeg_bytes] to the webserver:
+ # zmq subsciber can subscribe to a topic defined by the first
+ # part of the multipart message, so in order to allow for a
+ # per camera subscription, we need to join client_id and camera_id
+ topic = ':'.join([client_id, camera_id]).encode('utf-8')
+ try:
+ self.web_sock.send_multipart([topic, content], flags=zmq.NOBLOCK)
+ except Exception as exc:
+ logger.error("Sending message to websocket failed: %r", exc)
else:
logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id)
logger.error("Available video threads keys: %r", self.video_threads.keys())
logger.error("Available video threads values: %r", self.video_threads.values())
- elif len(msg) == 2: # This is a ping message from client
- logger.debug("Received ping message: %r", msg)
- client_id, camera_id = msg[0], msg[1]
- if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
- # Respond with a pong message
- try:
- self.socket.send_multipart([client_id, camera_id, b'pong'])
- logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id)
- except zmq.ZMQError as exc:
- logger.error("Failed to send pong: %r", exc)
-
else:
- logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg)
- try:
- self.socket.send_multipart([client_id, camera_id, b'pong'])
- logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id)
- except zmq.ZMQError as exc:
- logger.error("Failed to send pong: %r", exc)
+ logger.warning("Received a message of unexpected length from client. Message length: %d", len(msg))
else:
logger.debug("No message received, polling again ...")
time.sleep(5)
+ if self.web_sock in sockets:
+ frontend_msg = self.web_sock.recv_multipart()
+ logger.info("Received message from frontend: %r", frontend_msg)
+ self.socket.send_multipart(frontend_msg)
+ logger.info("Forwarded message to backend: %r", frontend_msg)
self.monitor.stop()
self.monitor.join()
+
for camera_thread in self.video_threads.values():
for thread in camera_thread.values():
thread.queue.put(None) # Sentinel to unblock queue.get()
@@ -183,7 +270,7 @@ class ServerWorker(Thread):
"""Send control message to a specific client and camera."""
if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message])
- logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message)
+ logger.info("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message)
else:
logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id)
@@ -195,34 +282,62 @@ class ServerWorker(Thread):
class VideoWorker(Thread):
"""Class for video threads."""
- def __init__(self, filename, queue):
+ def __init__(self, client_id, camera_id, filename, queue):
super().__init__(daemon=True)
+ self.context = zmq.Context()
+ self.context.setsockopt(zmq.LINGER, 0)
+ self.client_id = client_id
+ self.camera_id = camera_id
self.filename = filename
self.queue = queue
self.live = True
+ def stop(self):
+ logger.info("VideoWorker %r exiting ...", self.camera_id)
+ self.live = False
+
def run(self):
if os.path.exists(self.filename):
logger.warning("File '%s' already exists, overwriting ...", self.filename)
- with open(self.filename, 'wb') as f:
- logger.info("VideoWorker started writing to file: %s", self.filename)
- while self.live:
- # Simulate video processing
- frame = self.queue.get() # Get frame from queue
- logger.debug("Processing ('writing to a file') frame for camera: %r", frame)
- if frame is None:
- logger.info("VideoWorker received stop signal, exiting ...")
- break
- f.write(frame + b'\n') # Write frame to file
-
- def stop(self):
- logger.info("VideoWorker '' exiting ...")
- self.live = False
+ fourcc = cv2.VideoWriter_fourcc(*'VP80')
+ out = cv2.VideoWriter(self.filename, fourcc, 30.0, (640, 480)) # Assuming 640x480 resolution
+ logger.info("This is the first run, binding websocket ...")
+ while self.live:
+ logger.debug("VideoWorker writing to file: %s", self.filename)
+ frame_bytes = self.queue.get()
+ if frame_bytes is None:
+ logger.debug("Received None, stopping video worker for camera: '%s'", self.camera_id)
+ break
+ frame = cv2.imdecode(numpy.frombuffer(frame_bytes, dtype=numpy.uint8), cv2.IMREAD_COLOR)
+ logger.debug("Processing ('writing to a file') frame for camera: '%s'", self.camera_id)
+ # Write frame to file:
+ out.write(frame)
+
+ # Release video writer
+ out.release()
+ logger.info("VideoWorker finished writing to file: %s", self.filename)
def signal_handler(sig, frame):
worker.stop()
+def update_clients(client_id, camera_id):
+ """Update client and camera dictionary and write to a shared file."""
+ global CLIENTS_DICT
+ if client_id not in CLIENTS_DICT:
+ logger.debug("Client_id not in CLIENTS_DICT, adding an empty list for it.")
+ CLIENTS_DICT[client_id] = []
+ if camera_id not in CLIENTS_DICT[client_id]:
+ logger.debug("Camera_id not in CLIENTS_DICT[%s] list, adding", client_id)
+ CLIENTS_DICT[client_id].append(camera_id)
+ # Atomic write using tempfile. Works only when both files on the same filesystem
+ with tempfile.NamedTemporaryFile('w', dir=TMP_DIR, delete=False) as tf:
+ logger.debug("Dumping to file CLIENTS_DICT: %r", CLIENTS_DICT)
+ json.dump(CLIENTS_DICT, tf)
+ tempname = tf.name
+ os.replace(tempname, CLIENTS_JSON_FILE)
+
+
if __name__ == "__main__":
logger.info("Starting up ...")
@@ -233,3 +348,7 @@ if __name__ == "__main__":
worker.start()
worker.join()
+ try:
+ os.remove(CLIENTS_JSON_FILE)
+ except FileNotFoundError:
+ pass \ No newline at end of file