| field | value | date |
|---|---|---|
| author | Franoosh <uinarf@autistici.org> | 2025-10-15 14:42:29 +0200 |
| committer | Franoosh <uinarf@autistici.org> | 2025-10-15 14:42:29 +0200 |
| commit | 70beed73465ab27449a59c62043d94c16efe00c5 (patch) | |
| tree | 9f4b5d6a72711af4641b01169da5d20a2228ef64 | |
| parent | 68bd1bd052a7cd6438b92cb1059ef5e58b8d022c (diff) | |
| download | ZeroMQ_Video_Streaming-70beed73465ab27449a59c62043d94c16efe00c5.tar.gz, .tar.bz2, .zip | |
Added camera support: movement recognition and video streaming, plus a web server and frontend. Work in progress. To be fixed: the frontend does not reload client information and the page after major changes such as a camera name or address change, or camera removal. Still to do: add certificates before testing on actual distributed hardware, and add user login logic. (ref: added_video)
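The message lists certificates as a prerequisite for running on distributed hardware; with ZeroMQ that would typically mean CURVE keypairs. Below is a minimal sketch using pyzmq's zmq.auth helpers. The key directory, certificate names, and the choice to secure the client-router link are assumptions, not something this commit implements, and CURVE requires libzmq built with libsodium.

```python
import os
import zmq
import zmq.auth
from zmq.auth.thread import ThreadAuthenticator

KEYS_DIR = "certificates"  # assumed location, not part of this commit

# One-time key generation (run once per peer; writes <name>.key and <name>.key_secret):
os.makedirs(KEYS_DIR, exist_ok=True)
zmq.auth.create_certificates(KEYS_DIR, "router")
zmq.auth.create_certificates(KEYS_DIR, "client")

context = zmq.Context.instance()

# Router side: start an authenticator and accept any public cert found in KEYS_DIR.
auth = ThreadAuthenticator(context)
auth.start()
auth.configure_curve(domain="*", location=KEYS_DIR)

server_public, server_secret = zmq.auth.load_certificate(
    os.path.join(KEYS_DIR, "router.key_secret"))
router = context.socket(zmq.ROUTER)
router.curve_secretkey = server_secret
router.curve_publickey = server_public
router.curve_server = True           # this socket acts as the CURVE server
router.bind("tcp://*:5569")

# Client side: load its own keypair plus the router's public key.
client_public, client_secret = zmq.auth.load_certificate(
    os.path.join(KEYS_DIR, "client.key_secret"))
dealer = context.socket(zmq.DEALER)
dealer.curve_secretkey = client_secret
dealer.curve_publickey = client_public
dealer.curve_serverkey = server_public  # normally read from the distributed router.key
dealer.connect("tcp://localhost:5569")
```

The router's public certificate (router.key) would be copied to each client machine, and configure_curve(location=KEYS_DIR) accepts any client whose public certificate is present in that directory.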
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | client.py | 412 |
| -rw-r--r-- | helpers.py | 63 |
| -rw-r--r-- | router.py | 160 |
| -rw-r--r-- | templates/client.html | 98 |
| -rw-r--r-- | webserver.py | 214 |
| -rw-r--r-- | worker.py | 247 |
6 files changed, 927 insertions, 267 deletions
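The reworked client.py below drops the hard-coded CAMERAS and ADDRESS constants in favour of an INI-style client.cfg read with ConfigParser, with per-camera options flattened into '<camera name>.<parameter>' keys. A hypothetical config that read_config() would accept follows; the camera names, device paths, and values are illustrative only:

```ini
[logging]
logdir = .
logfile = client.log
loglevel = INFO

[network]
router_address = localhost
router_port = 5569

[cameras]
front camera.address = /dev/video0
front camera.contour_size_threshold = 4000
front camera.movement_grace_period = 10
back camera.address = /dev/video1
```

Unknown per-camera parameters are logged and ignored, and a camera entry without an address is dropped, so the file can be edited incrementally.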
@@ -6,28 +6,51 @@ A module containing client for streaming video to a zmq router. __author__ = "Franoosh Corporation" +import sys from os import path from threading import Thread, Event -from queue import Queue +from queue import Queue, Empty +from collections import deque import time import datetime import signal import logging from configparser import ConfigParser -import traceback import zmq +import cv2 +from collections import defaultdict -from helpers import CustomLoggingFormatter +from helpers import ( + CustomLoggingFormatter, + compute_contours, + detect_movement, + draw_contours, + ) -ROUTER_ADDRESS = "tcp://localhost" -PORT = "5569" -ADDRESS = f"{ROUTER_ADDRESS}:{PORT}" +################################################### +# Configuration, defaults and logging setup +################################################### CONFIG_FILE = "client.cfg" -LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG -CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'} -SLEEP_TIME = 5 +# Those will be updated from config file if present: +LOGDIR = '.' +LOGFILE = path.join(LOGDIR, 'client.log') +LOGLEVEL = logging.INFO + +ROUTER_ADDRESS = "localhost" +PORT = "5569" +ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" +# This is a default: +CAMERAS = { + 'default camera': { + 'address': '/dev/video0', + }, +} +KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period') +CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider +MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops +TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' +START_STOP_MESSAGE_FMT = "{action}:{data}" stop_event = Event() @@ -40,57 +63,115 @@ logging.root.setLevel(LOGLEVEL) logger = logging.getLogger(__name__) logging.basicConfig( filename=LOGFILE, - datefmt='%Y-%m-%d %I:%M:%S', + datefmt=TIME_FORMAT, level=LOGLEVEL, ) - -def read_config(conf_file): - """Read config file and return as dictionary.""" - cfg = ConfigParser() - cfg.read(conf_file) - - return {key: dict(cfg.items(key)) for key in cfg.sections()} +################################################### +# End configuration, defaults and logging setup +################################################### class ClientVideo(Thread): """Class for sending video stream""" - def __init__(self, _id, device, queue): + def __init__(self, _id, device_dict, queue): super().__init__(daemon=True) self.identity = _id - self.device = device + self.device_dict = device_dict + self.device_addr = device_dict.get('address', '/dev/video0') + try: + # Could move parameter validation to config reading function: + self.device_threshold = int(device_dict['contour_size_threshold']) + except (ValueError, KeyError) as exc: + logger.error("ClientVideo '%s': Invalid contour size threshold: %r, using default: %d", self.identity, exc, CONTOUR_SIZE_THRESHOLD) + self.device_threshold = CONTOUR_SIZE_THRESHOLD + try: + self.device_grace_period = int(device_dict['movement_grace_period']) + except (ValueError, KeyError) as exc: + logger.error("ClientVideo '%s': Invalid movement grace period: %r, using default: %d", self.identity, exc, MOVEMENT_GRACE_PERIOD) + self.device_grace_period = MOVEMENT_GRACE_PERIOD self.queue = queue + self.frame_deque = deque(maxlen=2) # Store last 2 frames self.live = True + def put_metadata_message(self, action, data): + """Put metadata message to queue.""" + metadata = START_STOP_MESSAGE_FMT.format( + action=action, + data=data, + ) + logger.info("ClientVideo '%s': %s at: %r", self.identity, action, data) 
+ message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] + self.queue.put(message) + + def put_video_message(self, frame): + # Encode frame as JPEG: + _, buffer = cv2.imencode('.jpg', frame) + # Add identity to message: + message = [self.identity.encode('utf-8'), buffer.tobytes()] + # Put message to queue: + self.queue.put(message) + def run(self): - """Replace with actual video streaming logic.""" - ping_no = 0 + """Video streaming logic.""" + logger.debug("ClientVideo '%s' starting ...", self.identity) + cap = cv2.VideoCapture(self.device_addr) + if not cap.isOpened(): + logger.error("ClientVideo '%s': Unable to open video device '%s'", self.identity, self.device_addr) + return + + movement_detected = False + movement_detected_start = None while self.live and not stop_event.is_set(): - try: - # Four parts required to start/stop video stream, three parts for ping: - if ping_no == 0: # Start video stream - timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") - metadata = f"start:{timestamp}" - logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata) - message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] - self.queue.put(message) - text = f"ping-{ping_no}" - message = [self.identity.encode('utf-8'), text.encode('utf-8')] - elif ping_no >= 5: # Stop video stream - logger.debug("ClientVideo '%s' sending stop signal", self.identity) - timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") - metadata = f"stop:{timestamp}" - logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'') - message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] - self.live = False - else: # Send ping - text = f"ping-{ping_no}" - message = [self.identity.encode('utf-8'), text.encode('utf-8')] - logger.debug("ClientVideo '%s' sending message: %r", self.identity, message) - self.queue.put(message) - ping_no += 1 - except Exception as exc: - logger.error("ClientVideo: socket error: %r", exc) - time.sleep(SLEEP_TIME) + ret, frame = cap.read() + if not ret: + logger.error("ClientVideo '%s': Failed to read frame from device '%s'", self.identity, self.device_addr) + break + self.frame_deque.append(frame) + # Skip on the first frame: + if len(self.frame_deque) >= 2: + contours = compute_contours(self.frame_deque) + logger.debug("ClientVideo '%s': Found %d contours.", self.identity, len(contours)) + logger.debug("ClientVideo '%s': Contours: %r", self.identity, contours) + movement_now = detect_movement(contours, min_area=self.device_threshold) + if movement_now: + logger.debug("ClientVideo '%s': Movement detected in frame.", self.identity) + # Only update movement start time if movement was not detected before: + if not movement_detected: + # Update movement detected start time: + movement_detected_start = datetime.datetime.now() + # Only send start message if movement was not detected before: + timestamp = movement_detected_start.strftime(TIME_FORMAT) + self.put_metadata_message( + action='start', + data=timestamp, + ) + # and set movement detected flag: + movement_detected = True + else: + logger.debug("ClientVideo '%s': No movement detected in frame.", self.identity) + + if movement_detected: + # Draw contours on frame: + frame = draw_contours(frame, contours, min_contour_area=self.device_threshold) + # Send video message with contour and frame: + self.put_video_message(frame) + + # Check if movement has stopped, taking into account grace period: + now = datetime.datetime.now() + 
delta_seconds = (now - movement_detected_start).total_seconds() + logger.debug("ClientVideo '%s': delta seconds since movement detected: %d, grace remaining: %d.", self.identity, delta_seconds, self.device_grace_period - delta_seconds) + # Wait seconds before deciding movement has stopped: + if delta_seconds > self.device_grace_period: + timestamp = now.strftime(TIME_FORMAT) + self.put_metadata_message( + action='stop', + data=timestamp, + ) + movement_detected = False + movement_detected_start = None + + else: + logger.debug("ClientVideo '%s': Frame deque length: %d", self.identity, len(self.frame_deque)) logger.info("ClientVideo '%s' closing socket ...", self.identity) @@ -98,7 +179,6 @@ class ClientVideo(Thread): logger.info("Client video '%s' exiting ...", self.identity) self.live = False - class ClientTask(Thread): """Main Client Task with logic, message handling and setup.""" @@ -111,35 +191,152 @@ class ClientTask(Thread): self.socket.identity = self.identity.encode("utf-8") self.poll = zmq.Poller() self.poll.register(self.socket, zmq.POLLIN) - self.video_threads = [] + # self.video_threads = [] + self.video_threads = {} self.running = True self.connected = False + self.queue = Queue() - def run(self): - self.socket.connect(ADDRESS) - logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) - logger.debug("starting video threads ...") - q = Queue() - for _id, device in CAMERAS.items(): - vid_cl = ClientVideo(_id, device, q) - vid_cl.start() - self.video_threads.append(vid_cl) + def receive_messages(self): + """Receive messages from the server.""" while self.running: try: sockets = dict(self.poll.poll(1000)) if self.socket in sockets: try: message = self.socket.recv_multipart() - logger.debug("Client '%s' received message: %r", self.identity, message) - except Exception: - logger.error("Failed to receive message: %r", traceback.format_exc()) - if q.qsize() > 0: - frame = q.get() - if frame is not None: - logger.debug("Processing frame: %r", frame) - self.socket.send_multipart(frame) + logger.info("Client '%s' received message: %r.", self.identity, message) + self.update_config(message) + except Exception as exc: + logger.error("Failed to receive message: %r", exc) except zmq.ZMQError as exc: logger.error("Client '%s': socket error: %r", self.identity, exc) + time.sleep(0.01) # Sleep to avoid busy waiting + + def send_messages(self): + """Send messages to the server.""" + while self.running: + try: + frame = self.queue.get(timeout=0.1) + logger.debug("Sending message of length %r ...", len(frame)) + try: + self.socket.send_multipart(frame) + except zmq.ZMQError as exc: + logger.error("Client '%s': socket error: %r", self.identity, exc) + except Empty: + continue # No message to send, continue waiting + + def update_config(self, message): + """ + Update configuration based on message from server. + Only 'cameras' section can be updated. 
+ directives = ( + 'modify_camera_name', + 'modify_camera_threshold', + 'modify_camera_grace_pd', + 'modify_camera_address', + 'add_camera', + 'remove_camera', + ) + """ + try: + msg_list = [part.decode('utf-8') for part in message] + logger.debug("Client received config message: %r", msg_list) + camera_id, directive, value = msg_list[0], msg_list[1], msg_list[2] + if directive in ( + 'modify_camera_name', + 'modify_camera_threshold', + 'modify_camera_grace_pd', + 'modify_camera_address', + ): + if camera_id not in CAMERAS: + logger.warning("Cannot rename unknown camera: %r", camera_id) + logger.warning("Known cameras: %r", list(CAMERAS.keys())) + else: + if directive == 'modify_camera_name': + old_name, new_name = camera_id, value + if new_name in CAMERAS: + raise ValueError("New camera name already exists.") + CAMERAS[new_name] = CAMERAS.pop(old_name) + self.video_threads[new_name] = self.video_threads.pop(old_name) + # Send message to server to update routing: + timestamp = datetime.datetime.now().strftime(TIME_FORMAT) + self.video_threads[new_name].put_metadata_message( + action='rename', + data=':'.join([old_name, new_name, timestamp]) + ) + self.send_messages + for item in cfg.items('cameras'): + if item[0].startswith(old_name + '.'): + param = item[0].split('.', 1)[1] + cfg.set('cameras', f"{new_name}.{param}", item[1]) + cfg.remove_option('cameras', item[0]) + logger.info("Renamed and restarted thread for camera '%s' to '%s'.", old_name, new_name) + elif directive == 'modify_camera_threshold': + new_threshold = int(value) + if new_threshold <= 0: + raise ValueError("Threshold must be positive.") + CAMERAS[camera_id]['contour_size_threshold'] = new_threshold + self.video_threads[camera_id].device_threshold = new_threshold + cfg.set('cameras', f"{camera_id}.contour_size_threshold", str(new_threshold)) + logger.info("Modified contour size threshold of camera '%s' to %d.", camera_id, new_threshold) + elif directive == 'modify_camera_grace_pd': + new_grace_pd = int(value) + if new_grace_pd < 0: + raise ValueError("Grace period cannot be negative.") + CAMERAS[camera_id]['movement_grace_period'] = new_grace_pd + self.video_threads[camera_id].device_grace_period = new_grace_pd + cfg.set('cameras', f"{camera_id}.movement_grace_period", str(new_grace_pd)) + logger.info("Modified movement grace period of camera '%s' to %d.", camera_id, new_grace_pd) + elif directive == 'modify_camera_address': + new_address = value + CAMERAS[camera_id]['address'] = new_address + self.video_threads[camera_id].device_addr = new_address + # Changing address on the fly requires restarting the video thread: + self.video_threads[camera_id].stop() + self.video_threads[camera_id].join() + vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) + vid_cl.start() + self.video_threads[camera_id] = vid_cl + logger.info("Modified address of camera '%s' to '%s'.", camera_id, new_address) + cfg.set('cameras', f"{camera_id}.address", new_address) + elif directive == 'add_camera': + cam_address = value + if camera_id in CAMERAS: + raise ValueError("Camera name already exists.") + CAMERAS[camera_id] = {'address': cam_address} + vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) + vid_cl.start() + self.video_threads[camera_id] = vid_cl + cfg.set('cameras', f"{camera_id}.address", cam_address) + logger.info("Added and started a thread for new camera '%s' with address '%s'.", camera_id, cam_address) + else: + logger.warning("Unknown config directive: %r", directive) + + except (ValueError, IndexError) as exc: 
+ logger.error("Failed to handle config message: %r", exc) + cfg.write(open(CONFIG_FILE, 'w')) + + def run(self): + """Main client logic.""" + self.socket.connect(ADDRESS) + logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) + logger.debug("Starting video threads ...") + for _id, device_dict in CAMERAS.items(): + vid_cl = ClientVideo(_id, device_dict, self.queue) + vid_cl.start() + self.video_threads[_id] = (vid_cl) + + recv_thread = Thread(target=self.receive_messages, daemon=True) + send_thread = Thread(target=self.send_messages, daemon=True) + + recv_thread.start() + send_thread.start() + logger.debug("Client '%s' started receiving and sending threads.", self.identity) + + logger.debug("Client '%s' waiting for threads to finish ...", self.identity) + recv_thread.join() + send_thread.join() logger.info("Closing socket ...") self.socket.close() @@ -149,18 +346,86 @@ class ClientTask(Thread): def stop(self): """Stop task""" logger.info("ClientTask cleaning up ...") - for thread in self.video_threads: - logger.info("Cleaning u video thread ...") + for _id, thread in self.video_threads.items(): + logger.info("Cleaning video thread %s ...", _id) thread.stop() thread.join() - self.video_threads = [] + # self.video_threads = [] + self.video_threads = {} logger.info("Client task exiting ...") self.running = False +def read_config(conf_file): + """ + Read config file and return as dictionary. + The only required section is 'cameras' with at least one camera. + Do basic sanity checks and update global variables. + Log warnings if config file or sections/options are missing. + """ + cfg = ConfigParser() + cfg.read(conf_file) + # Need to update logging info: + if 'logging' in cfg: + global LOGDIR, LOGFILE, LOGLEVEL + + LOGDIR = cfg.get('logging', 'logdir', fallback='.') + LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE) + LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL) + logger.setLevel(LOGLEVEL) + for _handler in logger.handlers: + _handler.setLevel(LOGLEVEL) + if 'network' in cfg: + global ROUTER_ADDRESS, PORT, ADDRESS + if cfg.has_option('network', 'router_address'): + ROUTER_ADDRESS = cfg.get('network', 'router_address') + else: + logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS) + if cfg.has_option('network', 'router_port'): + PORT = cfg.get('network', 'router_port') + else: + logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT) + ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" + else: + logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS) + if 'cameras' in cfg: + global CAMERAS + # These shenanigans below allow for a nested config simulation with configparser + # and so per camera settings: + cameras_dict = defaultdict(dict) + for key, val in cfg.items('cameras'): + try: + cam, param = key.split('.', 1) + if param in KNOWN_CAMERA_PARAMETERS: + cameras_dict[cam][param] = val + else: + logger.warning("Unknown camera %r parameter: %r", cam, param) + except ValueError as exc: + logger.error("Invalid camera configuration entry: %r, error: %r, using defaults: %r.", key, exc, CAMERAS) + # Check that each camera has at least an address and remove if not: + for key in list(cameras_dict.keys()): + if 'address' not in cameras_dict[key].keys(): + logger.error("DEBUG: cameras_dict: %r", cameras_dict) + logger.error("Camera %r has no address configured, removing from configuration.", key) + del cameras_dict[key] + if not cameras_dict: + logger.warning("No 
valid camera configurations found in config file, using defaults: %r.", CAMERAS) + else: + # Only update global CAMERAS if we have at least one valid camera with an address: + CAMERAS = cameras_dict + logger.info("Using camera configuration: %r", CAMERAS) + else: + logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS) + logger.info("CAMERAS configuration: %r", CAMERAS) + with open(conf_file, 'w') as configfile: + cfg.write(configfile) + + return cfg + def signal_handler(sig, frame): logger.info("Received signal handler '%r', stopping client ...", sig) client.stop() + sys.exit(0) if __name__ == "__main__": @@ -168,9 +433,14 @@ if __name__ == "__main__": signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) + # Read configuration file before starting logging as it may contain logging settings: + cfg = read_config(CONFIG_FILE) + logger.info("Starting up ...") + client = ClientTask("client_task") client.start() - client.join() + + logger.info("Terminating ...")
\ No newline at end of file @@ -8,6 +8,8 @@ __author__ = "Franoosh Corporation" import logging +import subprocess +import cv2 class CustomLoggingFormatter(logging.Formatter): @@ -42,3 +44,64 @@ class CustomLoggingFormatter(logging.Formatter): return result +def process_frame(frame): + """Process frame for contour detection.""" + # Convert to grayscale: + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + # Apply Gaussian blur: + blurred = cv2.GaussianBlur(gray, (21, 21), 0) + + return blurred + +def compute_contours(frame_deque): + """Compute contours from a deque of frames.""" + contours = [] + if len(frame_deque) < 2: + return contours + all_contours = [] + + for idx, frame in enumerate(frame_deque): + frame_0 = process_frame(frame) + try: + frame_1 = process_frame(frame_deque[idx+1]) + except IndexError: + break + frame_delta = cv2.absdiff(frame_0, frame_1) + threshold = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1] + threshold = cv2.dilate(threshold, None, iterations=2) + contours, _ = cv2.findContours(threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + all_contours.extend(contours) + + return all_contours + +def draw_contours(frame, contours, min_contour_area=500): + """Draw contours on the frame.""" + for contour in contours: + if cv2.contourArea(contour) > min_contour_area: + (x, y, w, h) = cv2.boundingRect(contour) + cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2) + + return frame + +def detect_movement(contours, min_area=500): + """Detect movement based on contours found from frame diff.""" + for contour in contours: + if cv2.contourArea(contour) >= min_area: + return True + return False + +def get_available_cameras(): + """ + Get list of available camera devices. + At the moment it does not work. At all. It is useless. + """ + proc = subprocess.Popen(['v4l2-ctl', '--list-devices'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = proc.communicate() + candidate_devices = [i.strip() for i in stdout.decode('utf-8').strip().splitlines()[1:]] + verified_devices = [] + for device in candidate_devices: + cap = cv2.VideoCapture(device) + if cap.isOpened(): + verified_devices.append(device) + cap.release() + return verified_devices
\ No newline at end of file @@ -18,17 +18,17 @@ from helpers import CustomLoggingFormatter # TODO: add configparser -ADDR_FRONTEND = 'tcp://localhost' -ADDR_BACKEND = 'tcp://localhost' +IP_FRONTEND = '127.0.0.1' +IP_BACKEND = '127.0.0.1' PORT_FRONTEND = "5569" PORT_BACKEND = "9979" -INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}" -INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}" +FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}" +BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}" LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO log_formatter = CustomLoggingFormatter() handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') @@ -52,13 +52,13 @@ def custom_proxy(): # Set up frontend: frontend_socket = context.socket(zmq.ROUTER) frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) - frontend_socket.bind(INITIAL_FRONTEND_ADDR) + frontend_socket.bind(FRONTEND_ADDR) poller.register(frontend_socket, zmq.POLLIN) # Set up backend: backend_socket = context.socket(zmq.ROUTER) backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) - backend_socket.bind(INITIAL_BACKEND_ADDR) + backend_socket.bind(BACKEND_ADDR) poller.register(backend_socket, zmq.POLLIN) awailable_workers = [] @@ -78,101 +78,137 @@ def custom_proxy(): pending_messages_worker_client = defaultdict(lambda: defaultdict(deque)) while RUNNING: - events = dict(poller.poll(1000)) # Poll both frontend and backend sockets: - if frontend_socket in events: # If message from client: - msg = frontend_socket.recv_multipart() # Receive message from client: - logger.debug("Received message: %r", msg) + # Poll both frontend and backend sockets: + events = dict(poller.poll(1000)) + # If message from client: + if frontend_socket in events: + # Receive message from client: + msg = frontend_socket.recv_multipart() + logger.debug("Received message.") client_id, content = msg[0], msg[1:] - if not client_id in client_worker_dict: # If client is not in client-worker dictionary: - logger.info("Received ID from client: %r.", client_id) # Check if client is already connected to worker: try: - worker_id = client_worker_dict[client_id] # Get worker ID for this client from the client-worker dictionary - while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker and send them first: - logger.debug("There are pending messages from client %r for worker %r", client_id, worker_id) + # Get worker ID for this client from the client-worker dictionary: + worker_id = client_worker_dict[client_id] + # Check if there are any pending messages for this worker and send them first: + while pending_messages_client_worker[client_id][worker_id]: + # In order to take a peek at the size of all messages, perhaps check out: + # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/ + logger.debug( + "There are '%d' pending messages from client %r for worker %r", + len(pending_messages_client_worker[client_id][worker_id]), + client_id, + worker_id, + ) pending_msg = pending_messages_client_worker[client_id][worker_id].pop() try: - logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) + logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id) backend_socket.send_multipart([worker_id, client_id, *pending_msg]) except Exception as e: - logger.error("Failed to send pending message: %r to worker: %r, error: %r", pending_msg, 
worker_id, e) - pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message - break # Break to avoid infinite loop - if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker: - pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery + logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e) + # Re-queue the message: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) + # Break to avoid infinite loop: + break + # If there are still pending messages for this worker: + if pending_messages_client_worker[client_id][worker_id]: + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(content) + # At last send new message to worker: else: try: - logger.debug("No more pending messages for client %r, sending new message to worker %r: %r", client_id, worker_id, msg) - backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: + logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) + backend_socket.send_multipart([worker_id, client_id, *content]) except Exception as e: - logger.error("Failed to send message to backend: %r, error: %s", msg, e) - pending_messages_client_worker[client_id][worker_id].appendleft(msg) # Store message for later delivery - except KeyError: # If client is not connected to any worker: - logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id) - if awailable_workers: # If there are available workers: - worker_id = random.choice(awailable_workers) # Assign random worker to client - client_worker_dict[client_id] = worker_id # Update client-worker dictionary - if pending_messages_no_worker[client_id]: # Check if there are any pending messages for this client: - pending_messages_client_worker[client_id][worker_id].extendleft(msg) # Move messages to pending messages for this worker + logger.error("Failed to send message to backend, error: %s", e) + # Store message for later delivery: + # pending_messages_client_worker[client_id][worker_id].appendleft(msg) <- FIXME: this is the bug, no? + pending_messages_client_worker[client_id][worker_id].appendleft(content) + # If client is not connected to any worker: + except KeyError: + logger.info("Received ID from client: %r.", client_id) + # Check if there are available workers. 
+ if awailable_workers: + # Assign random worker to client: + worker_id = random.choice(awailable_workers) + # Update client-worker dictionary: + client_worker_dict[client_id] = worker_id + # Check if there are any pending messages for this client: + if pending_messages_no_worker[client_id]: + # Move messages to pending messages for this worker: + pending_messages_client_worker[client_id][worker_id].extendleft(msg) logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id) - while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker: + # Check if there are any pending messages for this worker: + while pending_messages_client_worker[client_id][worker_id]: pending_msg = pending_messages_client_worker[client_id][worker_id].pop() try: - logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) + logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id) backend_socket.send_multipart([worker_id, client_id, *pending_msg]) except Exception as e: - logger.error("Failed to send pending message: %r to worker: %r. Error: %s", pending_msg, worker_id, e) - pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message + logger.error("Failed to send pending message to worker: %r. Error: %s", worker_id, e) + # Re-queue the message: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) break - if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker: - logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery: %r", client_id, worker_id, content) - pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery + # If there are still pending messages for this worker: + if pending_messages_client_worker[client_id][worker_id]: + logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery.", client_id, worker_id) + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(content) else: - logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) + logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id) try: - logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) - backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: + logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id) + # At last, send the message to worker: + backend_socket.send_multipart([worker_id, client_id, *content]) except Exception as e: - logger.error("Failed to send message %r for client %r to backend, requeing. Error: %s", msg, client_id, e) - pending_messages_client_worker[client_id][worker_id].appendleft(msg) # Store message for later delivery + logger.error("Failed to send message for client %r to backend, requeing. 
Error: %s", client_id, e) + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(msg) - else: # If no workers are available, assign a new worker: - pending_messages_no_worker[client_id].append(content) # Store message for later delivery + else: + # Store message for later delivery: + pending_messages_no_worker[client_id].append(content) logger.debug("No available workers, storing client '%s' with no worker assigned", client_id) if backend_socket in events: _msg = backend_socket.recv_multipart() + # These messages should be safe to log as they are short strings or bytes. logger.debug("Received from worker: %r", _msg) if len(_msg) == 2: # Worker is sending its identity: worker_id, msg = _msg[0], _msg[1:] logger.debug("Received ID from worker: %r, message: %r", worker_id, msg) logger.info("Received ID from worker: %r.", worker_id) - awailable_workers.append(worker_id) # Add worker to available workers list + # Add worker to available workers list: + awailable_workers.append(worker_id) for client, worker in client_worker_dict.items(): - if worker is None: # If client has no worker assigned: + # If client has no worker assigned, assign it a new worker: + if worker is None: client_worker_dict[client] = worker logger.debug("Assigning worker %r to client %r", worker, client) if pending_messages_no_worker: # If there are pending messages for clients with no worker assigned: for client_id, messages in pending_messages_no_worker.items(): if messages: - pending_messages_client_worker[client_id][worker_id].extendleft(messages) # Move messages to pending messages for this worker - pending_messages_no_worker[client_id].clear() # Clear pending messages for this client + # Move messages to pending messages for this worker: + pending_messages_client_worker[client_id][worker_id].extendleft(messages) + # Clear pending messages for this client: + pending_messages_no_worker[client_id].clear() logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id) try: - pending_msg = pending_messages_client_worker[client_id][worker_id].pop() # Get the first message for this client and worker - logger.debug( - "Sending pending message %r to worker %r for client '%r'", pending_msg, worker_id, client_id) - backend_socket.send_multipart([worker_id, client_id, *pending_msg]) # Send the message to worker + # Get the first message for this client and worker: + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() + logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id) + # Send the message to worker: + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) except Exception as e: pending_messages_client_worker[client_id][worker_id].append(pending_msg) - logger.error("Failed to send pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) + logger.error("Failed to send pending message to worker: %r. 
Error: %r", worker_id, e) else: # Worker is sending a message to client: worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:] logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg) # First check if there are any pending messages for this worker: - while pending_messages_worker_client[worker_id][client_id]: # Check if there are any pending messages for this worker: + while pending_messages_worker_client[worker_id][client_id]: pending_msg = pending_messages_worker_client[worker_id][client_id].pop() try: logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, client_id, worker_id) @@ -180,10 +216,12 @@ def custom_proxy(): except Exception as e: # It is safe to log the message content as it is a short string or bytes. logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) - pending_messages_worker_client[worker_id][client_id].append(pending_msg) # Re-queue the message - break # Break to avoid infinite loop - # Now send the message to the client if no pending messages for this client: - if not pending_messages_worker_client[worker_id][client_id]: # If there are no pending messages for this worker: + # Re-queue the message: + pending_messages_worker_client[worker_id][client_id].append(pending_msg) + # Break to avoid infinite loop: + break + # If no pending messages for this client, send new message to the client : + if not pending_messages_worker_client[worker_id][client_id]: logger.debug("Sending message %r to client '%s'", msg, client_id) try: frontend_socket.send_multipart([client_id, *msg]) diff --git a/templates/client.html b/templates/client.html index e03394a..5a52f43 100644 --- a/templates/client.html +++ b/templates/client.html @@ -12,19 +12,52 @@ <div class="streams-container"> {% for camera_id in camera_ids %} <div class="camera-stream"> - <h2>Camera {{ camera_id }}</h2> + <h2>Camera: {{ camera_id }}</h2> <img id="video-{{ camera_id }}" width="640" height="480" /> + <div> + <h3>Modify Camera Name</h3> + <input type="text" id="new-name-{{ camera_id }}" placeholder="New Name"> + <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_name')">Send</button> + </div> + <div> + <h3>Modify Camera Threshold</h3> + <input type="number" id="threshold-value-{{ camera_id }}" placeholder="Threshold"> + <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_threshold')">Send</button> + </div> + <div> + <h3>Modify Camera Grace Period</h3> + <input type="number" id="grace-value-{{ camera_id }}" placeholder="Grace Period"> + <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_grace_pd')">Send</button> + </div> + <div> + <h3>Modify Camera Address</h3> + <input type="text" id="address-value-{{ camera_id }}" placeholder="New Address"> + <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_address')">Send</button> + </div> </div> - {% endfor %} - </div> + {% endfor %} + <div> + <h3>Add Camera</h3> + <input type="text" id="add-name" placeholder="Camera Name"> + <input type="text" id="add-address" placeholder="Address"> + <button onclick="sendConfig('add_camera')">Send</button> + </div> + <div> + <h3>Remove Camera</h3> + <input type="text" id="remove-name" placeholder="Camera Name"> + <button onclick="sendConfig('remove_camera')">Send</button> + </div> + </div> <script> - // For each camera, open a WebSocket and update the corresponding <img> + const wsMap = {}; {% for camera_id in camera_ids %} + // For each camera, open a WebSocket and update the 
corresponding <img> (function() { - let ws = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/{{ camera_id }}'); - let image = document.getElementById('video-{{ camera_id }}'); + const cameraId = '{{ camera_id }}'; + const ws = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/' + cameraId); let currentUrl = null; ws.onmessage = function(event) { + let image = document.getElementById('video-' + cameraId); if (currentUrl) { URL.revokeObjectURL(currentUrl); } @@ -32,16 +65,65 @@ image.src = currentUrl; }; ws.onclose = function(event) { - console.log('WebSocket closed for camera {{ camera_id }}:', event); + console.log('WebSocket closed for camera ' + cameraId + ':', event); }; ws.onerror = function(event) { - console.log('WebSocket error for camera {{ camera_id }}:', event); + console.log('WebSocket error for camera ' + cameraId + ':', event); }; window.addEventListener('beforeunload', function() { ws.close(); }); + wsMap[cameraId] = ws; })(); {% endfor %} + + function sendConfig(cameraId, type) { + let msg = {}; + switch(type) { + case 'modify_camera_name': + msg[type] = [ + // cameraId, + document.getElementById('new-name-' + cameraId).value + ]; + break; + case 'modify_camera_threshold': + msg[type] = [ + // cameraId, + document.getElementById('threshold-value-' + cameraId).value + ]; + break; + case 'modify_camera_grace_pd': + msg[type] = [ + // cameraId, + parseInt(document.getElementById('grace-value-' + cameraId).value) + ]; + break; + case 'modify_camera_address': + msg[type] = [ + // cameraId, + document.getElementById('address-value-' + cameraId).value + ]; + break; + case 'add_camera': + msg[type] = [ + document.getElementById('add-name').value, + document.getElementById('add-address').value + ]; + break; + case 'remove_camera': + msg[type] = [ + document.getElementById('remove-name').value + ]; + break; + } + const ws = wsMap[cameraId] && wsMap[cameraId] ? wsMap[cameraId] : Object.values(wsMap)[0]; + if (ws && ws.readyState === WebSocket.OPEN) { + console.log("Sending message:", msg, "on ws:", ws); + ws.send(JSON.stringify(msg)); + } else { + alert('WebSocket is not open for camera ' + cameraId); + } + } </script> </body> </html> diff --git a/webserver.py b/webserver.py index d1c0a1e..162badc 100644 --- a/webserver.py +++ b/webserver.py @@ -1,42 +1,43 @@ #!/usr/bin/env python - """ Module serving video from zmq to a webserver. """ - __author__ = "Franoosh Corporation" - import os -from collections import defaultdict +from collections import defaultdict, deque import json import logging +import zmq.asyncio import asyncio -from threading import Thread +from collections import defaultdict +from contextlib import asynccontextmanager import uvicorn +import zmq from fastapi import ( FastAPI, Request, HTTPException, WebSocket, + WebSocketDisconnect, templating, ) from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from helpers import CustomLoggingFormatter -import zmq CLIENTS_JSON_FILE = os.path.join(os.getcwd(), 'clients.json') +CLIENTS_DICT = defaultdict(list) # CLIENTS_DICT[client_id] = [camera_id1, camera_id2, ...] 
LOGFILE = 'webserver.log' -LOGLEVEL = logging.INFO +LOGLEVEL = logging.DEBUG HOST = "127.0.0.1" -ZMQPORT = "9979" -WSPORT = "8008" -ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" -WS_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" +ZMQ_PORT = "9979" +WEB_PORT = "8008" +CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}" +WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}" log_formatter = CustomLoggingFormatter() handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') @@ -50,88 +51,175 @@ logging.basicConfig( level=LOGLEVEL, ) - - -app = FastAPI() +# Track websocket connections by (client_id, camera_id): +ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket +ws_queues = defaultdict(dict) +ctrl_msg_que = asyncio.Queue() +# Create ZMQ context and socket: +zmq_context = zmq.asyncio.Context() +zmq_socket = zmq_context.socket(zmq.DEALER) +# Connect to ZMQ backend: +zmq_socket.connect(WEB_BACKEND_ADDR) + + +async def zmq_bridge(): + """Bridge between ZMQ backend and websocket clients.""" + while True: + try: + data = await zmq_socket.recv_multipart() + topic, frame_data = None, None + if len(data) == 2: + topic, frame_data = data + elif len(data) == 3: + topic, _, _ = data + else: + logger.warning("Received invalid ZMQ message: %r", data) + continue + if topic: + client_id, camera_id = topic.decode('utf-8').split(':', 1) + if frame_data: + # Add client and camera to CLIENTS_DICT if new: + if not camera_id in CLIENTS_DICT[client_id]: + CLIENTS_DICT[client_id].append(camera_id) + else: + # No frame data means a notification to remove camera: + try: + CLIENTS_DICT[client_id].remove(camera_id) + except ValueError: + pass + + queue = ws_queues.get(client_id, {}).get(camera_id) + if queue and frame_data: + if queue.full(): + _ = queue.get_nowait() # Discard oldest frame + await queue.put(frame_data) + if not ctrl_msg_que.empty(): + client_id, camera_id, command, args_list = await ctrl_msg_que.get() + zmq_socket.send_multipart([ + client_id.encode('utf-8'), + camera_id.encode('utf-8'), + command.encode('utf-8'), + ] + args_list) + logger.info("Sent control command '%s' with args: %r for '/ws/%s/%s' to backend.", command, args_list, client_id, camera_id) + except Exception as e: + logger.error("Error in ZMQ bridge: %r", e) + # TODO: Check if this loop can be optimized to avoid busy waiting. 
+ # Alternative implementation using zmq Poller: + # poll = zmq.asyncio.Poller() + # poll.register(zmq_socket, zmq.POLLIN) + # while True: + # try: + # sockets = dict(await poll.poll()) + # if zmq_socket in sockets: + # topic, frame_data = await zmq_socket.recv_multipart() + # client_id, camera_id = topic.decode('utf-8').split(':', 1) + # set_clients(client_id, camera_id) + # queue = ws_queues.get(client_id, {}).get(camera_id) + # if queue: + # if queue.full(): + # _ = queue.get_nowait() # Discard oldest frame + # await queue.put(frame_data) + # if not ctrl_msg_que.empty(): + # client_id, camera_id, command, args_list = await ctrl_msg_que.get() + # zmq_socket.send_multipart([ + # client_id.encode('utf-8'), + # camera_id.encode('utf-8'), + # command.encode('utf-8'), + # ] + args_list) + # logger.info("Sent control command '%s' with args: %r for '/ws/%s/%s' to backend.", command, args_list, client_id, camera_id) + # except Exception as e: + # logger.error("Error in ZMQ bridge: %r", e) + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Create lifespan context for FastAPI app.""" + asyncio.create_task(zmq_bridge()) + yield + +app = FastAPI(lifespan=lifespan) app.mount("/static", StaticFiles(directory="static"), name="static") templates = templating.Jinja2Templates(directory='templates') -# Track websocket connections by (client_id, camera_id) -ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket - -# Set up a single ZMQ SUB socket for all websocket connections -zmq_context = zmq.Context() -zmq_socket = zmq_context.socket(zmq.SUB) -zmq_socket.bind(WS_BACKEND_ADDR) -zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all topics -poller = zmq.Poller() -poller.register(zmq_socket, zmq.POLLIN) - -def load_clients(): - try: - with open(CLIENTS_JSON_FILE) as f: - clients_dict = json.load(f) - except FileNotFoundError: - clients_dict = {} - return clients_dict @app.get("/") async def main_route(request: Request): - logger.error("DEBUG: main route visited") - clients = load_clients() + """Serve main page.""" + logger.debug("Main route visited") return templates.TemplateResponse( "main.html", { "request": request, - "clients": clients, + "clients": CLIENTS_DICT, } ) @app.get("/clients/{client_id}", response_class=HTMLResponse) async def client_route(request: Request, client_id: str): """Serve client page.""" - clients_dict = load_clients() - logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, clients_dict) - if not client_id in clients_dict: - return HTTPException(status_code=404, detail="No such client ID.") + logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, CLIENTS_DICT) + if not client_id in CLIENTS_DICT: + raise HTTPException(status_code=404, detail="No such client ID.") return templates.TemplateResponse( "client.html", { "request": request, "client_id": client_id, - "camera_ids": clients_dict[client_id], + "camera_ids": CLIENTS_DICT[client_id], }, ) - @app.websocket("/ws/{client_id}/{camera_id}") async def camera_route(websocket: WebSocket, client_id: str, camera_id: str): """Serve a particular camera page.""" logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id) await websocket.accept() - if client_id not in ws_connections: - ws_connections[client_id] = {} - ws_connections[client_id][camera_id] = websocket - try: + ws_connections[client_id][camera_id] = {'ws': websocket} + ws_queues[client_id][camera_id] = asyncio.Queue(maxsize=10) + queue = ws_queues[client_id][camera_id] 
+ + async def send_frames(): + while True: + frame_data = await queue.get() + await websocket.send_bytes(frame_data) + + async def receive_control(): while True: - # Wait for a frame for this client/camera - sockets = dict(poller.poll(1000)) - if zmq_socket in sockets: - msg = zmq_socket.recv_multipart() - if len(msg) == 3: - recv_client_id, recv_camera_id, content = msg - recv_client_id = recv_client_id.decode("utf-8") - recv_camera_id = recv_camera_id.decode("utf-8") - # Only send to the websocket for this client/camera - if recv_client_id == client_id and recv_camera_id == camera_id: - await websocket.send_bytes(content) - except Exception as exc: - logger.warning("Connection closed: %r", exc) + try: + data = await websocket.receive_text() + logger.info("Received control message from '/ws/%s/%s': %s", client_id, camera_id, data) + # Handle control messages from the client: + frontend_message = json.loads(data) + for command, args in frontend_message.items(): + args_list = [str(arg).encode('utf-8') for arg in args] + ctrl_msg_que.put_nowait((client_id, camera_id, command, args_list)) + # zmq_control_socket.send_multipart([ + # client_id.encode('utf-8'), + # camera_id.encode('utf-8'), + # command.encode('utf-8'), + # b'', + # ] + args_list) + # logger.info("Sent control command '%s' with args: %r for '/ws/%s/%s' to backend.", command, args_list, client_id, camera_id) + logger.info("Put control command '%s' with args: %r for '/ws/%s/%s' on queue to backend.", command, args_list, client_id, camera_id) + except json.JSONDecodeError: + logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data) + except WebSocketDisconnect: + logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id) + break + except Exception as exc: + logger.warning("Error receiving control message: %r", exc) + send_task = asyncio.create_task(send_frames()) + receive_task = asyncio.create_task(receive_control()) + try: + # await asyncio.gather(send_task, receive_task) + await asyncio.wait( + [send_task, receive_task], + return_when=asyncio.FIRST_COMPLETED, + ) finally: - if client_id in ws_connections and camera_id in ws_connections[client_id]: - del ws_connections[client_id][camera_id] - await websocket.close() - + send_task.cancel() + receive_task.cancel() + ws_connections[client_id].pop(camera_id, None) + ws_queues[client_id].pop(camera_id, None) if __name__ == "__main__": uvicorn.run( @@ -139,4 +227,4 @@ if __name__ == "__main__": port=8007, host='127.0.0.1', log_level='info', - )
\ No newline at end of file + ) @@ -1,29 +1,51 @@ #!/usr/bin/env python -__author__ = "Franoosh Corporation" """ A module consisting of a zeromq worker receiving video stream via router from clients. Able to send control messages to clients. +Needs libzmq and pyzmq with 'drafts' support. """ import os +import sys from threading import Thread, Event import time import signal import logging from queue import Queue from collections import defaultdict +import tempfile +import json import zmq +import cv2 +import numpy from helpers import CustomLoggingFormatter +__author__ = "Franoosh Corporation" + +# Various constants: # TODO: put them in a config + +HOST = "127.0.0.1" +ZMQPORT = "9979" +# WSPORT = "8000" +WSPORT = "8008" +ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" +WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" +# LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO + +# File paths: -ADDR = "tcp://localhost" -PORT = "9979" -INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}" LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG +CWD = os.getcwd() +TMP_DIR = os.path.join(CWD, "tmp") +CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json') + +# Other: + +CLIENTS_DICT = {} stop_event = Event() @@ -39,6 +61,14 @@ logging.basicConfig( level=LOGLEVEL, ) + +if not os.path.exists(TMP_DIR): + try: + os.makedirs(TMP_DIR) + except Exception as exc: + logger.error("Could not create temporary directory: %r", exc) + sys.exit() + class MonitorTask(Thread): """Monitor task""" def __init__(self, socket): @@ -92,14 +122,45 @@ class ServerWorker(Thread): self.socket.identity = self.id.encode("utf-8") self.monitor = MonitorTask(self.socket) self.video_threads = defaultdict(lambda: defaultdict(dict)) + self.poller = zmq.Poller() + self.web_sock = self.context.socket(zmq.DEALER) self.connected = False self.running = True + def start_client(self, client_id, camera_id, filename): + """ + Start a video thread for new client_id and camera_id. + """ + if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera + q = Queue() + logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) + video_worker = VideoWorker(client_id, camera_id, filename, q) + video_worker.start() + self.video_threads[client_id][camera_id] = video_worker + + def stop_client(self, client_id, camera_id): + """ + Stop video thread for a client_id and camera_id. + """ + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) + self.video_threads[client_id][camera_id].stop() # Stop the thread + del self.video_threads[client_id][camera_id] + logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) + def run(self): + """ + Main loop of the worker. + Full of wonders. 
+ """ logger.debug("ServerWorker '%s' starting ...", self.id) - self.socket.connect(INITIAL_BACKEND_ADDR) - self.poller = zmq.Poller() + self.socket.connect(ZMQ_BACKEND_ADDR) + try: + self.web_sock.bind(WEB_BACKEND_ADDR) + except Exception as exc: + logger.error("Connection to zmq websocket failed: %r", exc) self.poller.register(self.socket, zmq.POLLIN) + self.poller.register(self.web_sock, zmq.POLLIN) self.monitor.start() while self.running: @@ -112,67 +173,93 @@ class ServerWorker(Thread): if self.socket in sockets: self.connected = True msg = self.socket.recv_multipart() - logger.debug("ServerWorker '%s' received message: %r", self.id, msg) + logger.debug("ServerWorker '%s' received message of length: %d.", self.id, len(msg)) filename = None - # At the moment we don't expect any other message than a video message: + # At the moment we don't expect any other message than a start/stop message (of length 4) and a video message: if len(msg) == 4: # This is a message with start/stop directive and no video data. - logger.debug("Received start/stop directive: %r", msg) + logger.debug("Received start/stop directive: (?)") client_id, camera_id, metadata = msg[0], msg[1], msg[2] + # Convert bytes to str once and for all: + client_id = client_id.decode('utf-8') + camera_id = camera_id.decode('utf-8') + update_clients(client_id, camera_id) try: - directive, timestamp = metadata.decode("utf-8").split(":") - filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4" - logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) + # Directive and data are fron now on converted to strings: + directive, data = metadata.decode("utf-8").split(":") + logger.info( + "Received directive '%s' with data: %r for client '%s', camera '%s'", + directive, + data, + client_id, + camera_id, + ) except ValueError: logger.error("Invalid metadata format: %r", metadata) + directive = None continue - if directive == "start": - if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera - q = Queue() - logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) - video_worker = VideoWorker(filename, q) - video_worker.start() - self.video_threads[client_id][camera_id] = video_worker - elif directive == "stop": - if client_id in self.video_threads and camera_id in self.video_threads[client_id]: - logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) - self.video_threads[client_id][camera_id].queue.put(None) # Sentinel to stop the thread - del self.video_threads[client_id][camera_id] - logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) + if directive == 'rename': + old_name, new_name, timestamp = None, None, None + try: + old_name, new_name, timestamp = data.split(":") + logger.info("Renamed video thread from '%s' to '%s'.", old_name, new_name) + except ValueError: + logger.error("Invalid rename data format: %r", data) + continue + if old_name and new_name and timestamp: + # I think it's better to stop the old thread and start a new one, + # rather than reuse the old one as it's less mucking about. 
+ self.stop_client(old_name, camera_id) + self.start_client(new_name, camera_id, f"{new_name}_{camera_id}-{timestamp}.mkv") + to_remove= b':'.join([client_id, camera_id]) + try: + self.web_sock.send_multipart([to_remove, b'', b'']) # Notify webserver of rename + except Exception as exc: + logger.error("Sending rename notification to websocket failed: %r", exc) else: - logger.error("Unknown directive: '%s'", directive) - elif len(msg) == 3: # This is a video message with data only - logger.debug("Received video message with data only: %r", msg) - client_id, camera_id, msg = msg[0], msg[1], msg[2] + timestamp = data + filename = f"{client_id}_{camera_id}-{timestamp}.mkv" + logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) + if directive == "start": + self.start_client(client_id, camera_id, filename) + elif directive == "stop": + self.stop_client(client_id, camera_id) + else: + logger.error("Unknown directive: %r", directive) + + elif len(msg) == 3: # This is a video message with data + logger.debug("Received video message with data only.") + client_id, camera_id, content = msg[0], msg[1], msg[2] + client_id = client_id.decode('utf-8') + camera_id = camera_id.decode('utf-8') if client_id in self.video_threads and camera_id in self.video_threads[client_id]: - self.video_threads[client_id][camera_id].queue.put(msg) + self.video_threads[client_id][camera_id].queue.put(content) + # Send only [client_id, camera_id, jpeg_bytes] to the webserver: + # zmq subsciber can subscribe to a topic defined by the first + # part of the multipart message, so in order to allow for a + # per camera subscription, we need to join client_id and camera_id + topic = ':'.join([client_id, camera_id]).encode('utf-8') + try: + self.web_sock.send_multipart([topic, content], flags=zmq.NOBLOCK) + except Exception as exc: + logger.error("Sending message to websocket failed: %r", exc) else: logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) logger.error("Available video threads keys: %r", self.video_threads.keys()) logger.error("Available video threads values: %r", self.video_threads.values()) - elif len(msg) == 2: # This is a ping message from client - logger.debug("Received ping message: %r", msg) - client_id, camera_id = msg[0], msg[1] - if client_id in self.video_threads and camera_id in self.video_threads[client_id]: - # Respond with a pong message - try: - self.socket.send_multipart([client_id, camera_id, b'pong']) - logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) - except zmq.ZMQError as exc: - logger.error("Failed to send pong: %r", exc) - else: - logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg) - try: - self.socket.send_multipart([client_id, camera_id, b'pong']) - logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) - except zmq.ZMQError as exc: - logger.error("Failed to send pong: %r", exc) + logger.warning("Received a message of unexpected length from client. 
Message length: %d", len(msg)) else: logger.debug("No message received, polling again ...") time.sleep(5) + if self.web_sock in sockets: + frontend_msg = self.web_sock.recv_multipart() + logger.info("Received message from frontend: %r", frontend_msg) + self.socket.send_multipart(frontend_msg) + logger.info("Forwarded message to backend: %r", frontend_msg) self.monitor.stop() self.monitor.join() + for camera_thread in self.video_threads.values(): for thread in camera_thread.values(): thread.queue.put(None) # Sentinel to unblock queue.get() @@ -183,7 +270,7 @@ class ServerWorker(Thread): """Send control message to a specific client and camera.""" if client_id in self.video_threads and camera_id in self.video_threads[client_id]: self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message]) - logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message) + logger.info("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message) else: logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) @@ -195,34 +282,62 @@ class ServerWorker(Thread): class VideoWorker(Thread): """Class for video threads.""" - def __init__(self, filename, queue): + def __init__(self, client_id, camera_id, filename, queue): super().__init__(daemon=True) + self.context = zmq.Context() + self.context.setsockopt(zmq.LINGER, 0) + self.client_id = client_id + self.camera_id = camera_id self.filename = filename self.queue = queue self.live = True + def stop(self): + logger.info("VideoWorker %r exiting ...", self.camera_id) + self.live = False + def run(self): if os.path.exists(self.filename): logger.warning("File '%s' already exists, overwriting ...", self.filename) - with open(self.filename, 'wb') as f: - logger.info("VideoWorker started writing to file: %s", self.filename) - while self.live: - # Simulate video processing - frame = self.queue.get() # Get frame from queue - logger.debug("Processing ('writing to a file') frame for camera: %r", frame) - if frame is None: - logger.info("VideoWorker received stop signal, exiting ...") - break - f.write(frame + b'\n') # Write frame to file - - def stop(self): - logger.info("VideoWorker '' exiting ...") - self.live = False + fourcc = cv2.VideoWriter_fourcc(*'VP80') + out = cv2.VideoWriter(self.filename, fourcc, 30.0, (640, 480)) # Assuming 640x480 resolution + logger.info("This is the first run, binding websocket ...") + while self.live: + logger.debug("VideoWorker writing to file: %s", self.filename) + frame_bytes = self.queue.get() + if frame_bytes is None: + logger.debug("Received None, stopping video worker for camera: '%s'", self.camera_id) + break + frame = cv2.imdecode(numpy.frombuffer(frame_bytes, dtype=numpy.uint8), cv2.IMREAD_COLOR) + logger.debug("Processing ('writing to a file') frame for camera: '%s'", self.camera_id) + # Write frame to file: + out.write(frame) + + # Release video writer + out.release() + logger.info("VideoWorker finished writing to file: %s", self.filename) def signal_handler(sig, frame): worker.stop() +def update_clients(client_id, camera_id): + """Update client and camera dictionary and write to a shared file.""" + global CLIENTS_DICT + if client_id not in CLIENTS_DICT: + logger.debug("Client_id not in CLIENTS_DICT, adding an empty list for it.") + CLIENTS_DICT[client_id] = [] + if camera_id not in CLIENTS_DICT[client_id]: + logger.debug("Camera_id not in CLIENTS_DICT[%s] list, adding", client_id) + 
CLIENTS_DICT[client_id].append(camera_id) + # Atomic write using tempfile. Works only when both files on the same filesystem + with tempfile.NamedTemporaryFile('w', dir=TMP_DIR, delete=False) as tf: + logger.debug("Dumping to file CLIENTS_DICT: %r", CLIENTS_DICT) + json.dump(CLIENTS_DICT, tf) + tempname = tf.name + os.replace(tempname, CLIENTS_JSON_FILE) + + if __name__ == "__main__": logger.info("Starting up ...") @@ -233,3 +348,7 @@ if __name__ == "__main__": worker.start() worker.join() + try: + os.remove(CLIENTS_JSON_FILE) + except FileNotFoundError: + pass
\ No newline at end of file |
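For reference, the motion-detection helpers added in helpers.py (compute_contours, detect_movement, draw_contours) can be exercised outside the ZMQ pipeline. A standalone sketch, assuming a local OpenCV window instead of streaming, a placeholder device index, and the client's default contour threshold of 4000:

```python
import cv2
from collections import deque

from helpers import compute_contours, detect_movement, draw_contours

cap = cv2.VideoCapture(0)      # placeholder device index
frames = deque(maxlen=2)       # client.py keeps only the last two frames

while True:
    ok, frame = cap.read()
    if not ok:
        break
    frames.append(frame)
    if len(frames) < 2:
        continue               # need two frames before diffing
    contours = compute_contours(frames)
    if detect_movement(contours, min_area=4000):
        frame = draw_contours(frame, contours, min_contour_area=4000)
        cv2.imshow("movement", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()
```

client.py runs the same steps per camera thread, but JPEG-encodes the annotated frame and puts it on a queue for the DEALER socket instead of displaying it.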
