#!/usr/bin/env python """ A module containing client for streaming video to a zmq router. """ __author__ = "Franoosh Corporation" import sys from os import path from threading import Thread, Event from queue import Queue, Empty from collections import deque import time import datetime import signal import logging from configparser import ConfigParser import zmq import cv2 from collections import defaultdict from helpers import ( CustomLoggingFormatter, compute_contours, detect_movement, draw_contours, ) ################################################### # Configuration, defaults and logging setup ################################################### CONFIG_FILE = "client.cfg" # Those will be updated from config file if present: LOGDIR = '.' LOGFILE = path.join(LOGDIR, 'client.log') LOGLEVEL = logging.INFO ROUTER_ADDRESS = "localhost" PORT = "5569" ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" # This is a default: CAMERAS = { 'default camera': { 'address': '/dev/video0', }, } KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period') CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' START_STOP_MESSAGE_FMT = "{action}:{data}" stop_event = Event() log_formatter = CustomLoggingFormatter() handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') handler.setFormatter(log_formatter) logging.root.addHandler(handler) logging.root.setLevel(LOGLEVEL) logger = logging.getLogger(__name__) logging.basicConfig( filename=LOGFILE, datefmt=TIME_FORMAT, level=LOGLEVEL, ) ################################################### # End configuration, defaults and logging setup ################################################### class ClientVideo(Thread): """Class for sending video stream""" def __init__(self, _id, device_dict, queue): super().__init__(daemon=True) self.identity = _id self.device_dict = device_dict self.device_addr = device_dict.get('address', '/dev/video0') try: # Could move parameter validation to config reading function: self.device_threshold = int(device_dict['contour_size_threshold']) except (ValueError, KeyError) as exc: logger.error("ClientVideo '%s': Invalid contour size threshold: %r, using default: %d", self.identity, exc, CONTOUR_SIZE_THRESHOLD) self.device_threshold = CONTOUR_SIZE_THRESHOLD try: self.device_grace_period = int(device_dict['movement_grace_period']) except (ValueError, KeyError) as exc: logger.error("ClientVideo '%s': Invalid movement grace period: %r, using default: %d", self.identity, exc, MOVEMENT_GRACE_PERIOD) self.device_grace_period = MOVEMENT_GRACE_PERIOD self.queue = queue self.frame_deque = deque(maxlen=2) # Store last 2 frames self.live = True def put_metadata_message(self, action, data): """Put metadata message to queue.""" metadata = START_STOP_MESSAGE_FMT.format( action=action, data=data, ) logger.info("ClientVideo '%s': %s at: %r", self.identity, action, data) message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] self.queue.put(message) def put_video_message(self, frame): # Encode frame as JPEG: _, buffer = cv2.imencode('.jpg', frame) # Add identity to message: message = [self.identity.encode('utf-8'), buffer.tobytes()] # Put message to queue: self.queue.put(message) def run(self): """Video streaming logic.""" logger.debug("ClientVideo '%s' starting ...", self.identity) cap = cv2.VideoCapture(self.device_addr) if not cap.isOpened(): logger.error("ClientVideo '%s': Unable to open video device '%s'", self.identity, self.device_addr) return movement_detected = False movement_detected_start = None while self.live and not stop_event.is_set(): ret, frame = cap.read() if not ret: logger.error("ClientVideo '%s': Failed to read frame from device '%s'", self.identity, self.device_addr) break self.frame_deque.append(frame) # Skip on the first frame: if len(self.frame_deque) >= 2: contours = compute_contours(self.frame_deque) logger.debug("ClientVideo '%s': Found %d contours.", self.identity, len(contours)) logger.debug("ClientVideo '%s': Contours: %r", self.identity, contours) movement_now = detect_movement(contours, min_area=self.device_threshold) if movement_now: logger.debug("ClientVideo '%s': Movement detected in frame.", self.identity) # Only update movement start time if movement was not detected before: if not movement_detected: # Update movement detected start time: movement_detected_start = datetime.datetime.now() # Only send start message if movement was not detected before: timestamp = movement_detected_start.strftime(TIME_FORMAT) self.put_metadata_message( action='start', data=timestamp, ) # and set movement detected flag: movement_detected = True else: logger.debug("ClientVideo '%s': No movement detected in frame.", self.identity) if movement_detected: # Draw contours on frame: frame = draw_contours(frame, contours, min_contour_area=self.device_threshold) # Send video message with contour and frame: self.put_video_message(frame) # Check if movement has stopped, taking into account grace period: now = datetime.datetime.now() delta_seconds = (now - movement_detected_start).total_seconds() logger.debug("ClientVideo '%s': delta seconds since movement detected: %d, grace remaining: %d.", self.identity, delta_seconds, self.device_grace_period - delta_seconds) # Wait seconds before deciding movement has stopped: if delta_seconds > self.device_grace_period: timestamp = now.strftime(TIME_FORMAT) self.put_metadata_message( action='stop', data=timestamp, ) movement_detected = False movement_detected_start = None else: logger.debug("ClientVideo '%s': Frame deque length: %d", self.identity, len(self.frame_deque)) logger.info("ClientVideo '%s' closing socket ...", self.identity) def stop(self): logger.info("Client video '%s' exiting ...", self.identity) self.live = False class ClientTask(Thread): """Main Client Task with logic, message handling and setup.""" def __init__(self, _id, context=None): super().__init__(daemon=True) self.identity = _id self.context = context or zmq.Context.instance() self.socket = self.context.socket(zmq.DEALER) self.socket.identity = self.identity.encode("utf-8") self.poll = zmq.Poller() self.poll.register(self.socket, zmq.POLLIN) # self.video_threads = [] self.video_threads = {} self.running = True self.connected = False self.queue = Queue() def receive_messages(self): """Receive messages from the server.""" while self.running: try: sockets = dict(self.poll.poll(1000)) if self.socket in sockets: try: message = self.socket.recv_multipart() logger.info("Client '%s' received message: %r.", self.identity, message) self.update_config(message) except Exception as exc: logger.error("Failed to receive message: %r", exc) except zmq.ZMQError as exc: logger.error("Client '%s': socket error: %r", self.identity, exc) time.sleep(0.01) # Sleep to avoid busy waiting def send_messages(self): """Send messages to the server.""" while self.running: try: frame = self.queue.get(timeout=0.1) logger.debug("Sending message of length %r ...", len(frame)) try: self.socket.send_multipart(frame) except zmq.ZMQError as exc: logger.error("Client '%s': socket error: %r", self.identity, exc) except Empty: continue # No message to send, continue waiting def update_config(self, message): """ Update configuration based on message from server. Only 'cameras' section can be updated. directives = ( 'modify_camera_name', 'modify_camera_threshold', 'modify_camera_grace_pd', 'modify_camera_address', 'add_camera', 'remove_camera', ) """ try: msg_list = [part.decode('utf-8') for part in message] logger.debug("Client received config message: %r", msg_list) camera_id, directive, value = msg_list[0], msg_list[1], msg_list[2] if directive in ( 'modify_camera_name', 'modify_camera_threshold', 'modify_camera_grace_pd', 'modify_camera_address', ): if camera_id not in CAMERAS: logger.warning("Cannot rename unknown camera: %r", camera_id) logger.warning("Known cameras: %r", list(CAMERAS.keys())) else: if directive == 'modify_camera_name': old_name, new_name = camera_id, value if new_name in CAMERAS: raise ValueError("New camera name already exists.") CAMERAS[new_name] = CAMERAS.pop(old_name) self.video_threads[new_name] = self.video_threads.pop(old_name) # Send message to server to update routing: timestamp = datetime.datetime.now().strftime(TIME_FORMAT) self.video_threads[new_name].put_metadata_message( action='rename', data=':'.join([old_name, new_name, timestamp]) ) self.send_messages for item in cfg.items('cameras'): if item[0].startswith(old_name + '.'): param = item[0].split('.', 1)[1] cfg.set('cameras', f"{new_name}.{param}", item[1]) cfg.remove_option('cameras', item[0]) logger.info("Renamed and restarted thread for camera '%s' to '%s'.", old_name, new_name) elif directive == 'modify_camera_threshold': new_threshold = int(value) if new_threshold <= 0: raise ValueError("Threshold must be positive.") CAMERAS[camera_id]['contour_size_threshold'] = new_threshold self.video_threads[camera_id].device_threshold = new_threshold cfg.set('cameras', f"{camera_id}.contour_size_threshold", str(new_threshold)) logger.info("Modified contour size threshold of camera '%s' to %d.", camera_id, new_threshold) elif directive == 'modify_camera_grace_pd': new_grace_pd = int(value) if new_grace_pd < 0: raise ValueError("Grace period cannot be negative.") CAMERAS[camera_id]['movement_grace_period'] = new_grace_pd self.video_threads[camera_id].device_grace_period = new_grace_pd cfg.set('cameras', f"{camera_id}.movement_grace_period", str(new_grace_pd)) logger.info("Modified movement grace period of camera '%s' to %d.", camera_id, new_grace_pd) elif directive == 'modify_camera_address': new_address = value CAMERAS[camera_id]['address'] = new_address self.video_threads[camera_id].device_addr = new_address # Changing address on the fly requires restarting the video thread: self.video_threads[camera_id].stop() self.video_threads[camera_id].join() vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) vid_cl.start() self.video_threads[camera_id] = vid_cl logger.info("Modified address of camera '%s' to '%s'.", camera_id, new_address) cfg.set('cameras', f"{camera_id}.address", new_address) elif directive == 'add_camera': cam_address = value if camera_id in CAMERAS: raise ValueError("Camera name already exists.") CAMERAS[camera_id] = {'address': cam_address} vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) vid_cl.start() self.video_threads[camera_id] = vid_cl cfg.set('cameras', f"{camera_id}.address", cam_address) logger.info("Added and started a thread for new camera '%s' with address '%s'.", camera_id, cam_address) else: logger.warning("Unknown config directive: %r", directive) except (ValueError, IndexError) as exc: logger.error("Failed to handle config message: %r", exc) cfg.write(open(CONFIG_FILE, 'w')) def run(self): """Main client logic.""" self.socket.connect(ADDRESS) logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) logger.debug("Starting video threads ...") for _id, device_dict in CAMERAS.items(): vid_cl = ClientVideo(_id, device_dict, self.queue) vid_cl.start() self.video_threads[_id] = (vid_cl) recv_thread = Thread(target=self.receive_messages, daemon=True) send_thread = Thread(target=self.send_messages, daemon=True) recv_thread.start() send_thread.start() logger.debug("Client '%s' started receiving and sending threads.", self.identity) logger.debug("Client '%s' waiting for threads to finish ...", self.identity) recv_thread.join() send_thread.join() logger.info("Closing socket ...") self.socket.close() logger.debug("Terminating context ...") self.context.term() def stop(self): """Stop task""" logger.info("ClientTask cleaning up ...") for _id, thread in self.video_threads.items(): logger.info("Cleaning video thread %s ...", _id) thread.stop() thread.join() # self.video_threads = [] self.video_threads = {} logger.info("Client task exiting ...") self.running = False def read_config(conf_file): """ Read config file and return as dictionary. The only required section is 'cameras' with at least one camera. Do basic sanity checks and update global variables. Log warnings if config file or sections/options are missing. """ cfg = ConfigParser() cfg.read(conf_file) # Need to update logging info: if 'logging' in cfg: global LOGDIR, LOGFILE, LOGLEVEL LOGDIR = cfg.get('logging', 'logdir', fallback='.') LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE) LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL) logger.setLevel(LOGLEVEL) for _handler in logger.handlers: _handler.setLevel(LOGLEVEL) if 'network' in cfg: global ROUTER_ADDRESS, PORT, ADDRESS if cfg.has_option('network', 'router_address'): ROUTER_ADDRESS = cfg.get('network', 'router_address') else: logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS) if cfg.has_option('network', 'router_port'): PORT = cfg.get('network', 'router_port') else: logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT) ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" else: logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS) if 'cameras' in cfg: global CAMERAS # These shenanigans below allow for a nested config simulation with configparser # and so per camera settings: cameras_dict = defaultdict(dict) for key, val in cfg.items('cameras'): try: cam, param = key.split('.', 1) if param in KNOWN_CAMERA_PARAMETERS: cameras_dict[cam][param] = val else: logger.warning("Unknown camera %r parameter: %r", cam, param) except ValueError as exc: logger.error("Invalid camera configuration entry: %r, error: %r, using defaults: %r.", key, exc, CAMERAS) # Check that each camera has at least an address and remove if not: for key in list(cameras_dict.keys()): if 'address' not in cameras_dict[key].keys(): logger.error("DEBUG: cameras_dict: %r", cameras_dict) logger.error("Camera %r has no address configured, removing from configuration.", key) del cameras_dict[key] if not cameras_dict: logger.warning("No valid camera configurations found in config file, using defaults: %r.", CAMERAS) else: # Only update global CAMERAS if we have at least one valid camera with an address: CAMERAS = cameras_dict logger.info("Using camera configuration: %r", CAMERAS) else: logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS) logger.info("CAMERAS configuration: %r", CAMERAS) with open(conf_file, 'w') as configfile: cfg.write(configfile) return cfg def signal_handler(sig, frame): logger.info("Received signal handler '%r', stopping client ...", sig) client.stop() sys.exit(0) if __name__ == "__main__": signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # Read configuration file before starting logging as it may contain logging settings: cfg = read_config(CONFIG_FILE) logger.info("Starting up ...") client = ClientTask("client_task") client.start() client.join() logger.info("Terminating ...")