diff options
Diffstat (limited to 'client.py')
| -rw-r--r-- | client.py | 412 |
1 files changed, 341 insertions, 71 deletions
@@ -6,28 +6,51 @@ A module containing client for streaming video to a zmq router. __author__ = "Franoosh Corporation" +import sys from os import path from threading import Thread, Event -from queue import Queue +from queue import Queue, Empty +from collections import deque import time import datetime import signal import logging from configparser import ConfigParser -import traceback import zmq +import cv2 +from collections import defaultdict -from helpers import CustomLoggingFormatter +from helpers import ( + CustomLoggingFormatter, + compute_contours, + detect_movement, + draw_contours, + ) -ROUTER_ADDRESS = "tcp://localhost" -PORT = "5569" -ADDRESS = f"{ROUTER_ADDRESS}:{PORT}" +################################################### +# Configuration, defaults and logging setup +################################################### CONFIG_FILE = "client.cfg" -LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG -CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'} -SLEEP_TIME = 5 +# Those will be updated from config file if present: +LOGDIR = '.' +LOGFILE = path.join(LOGDIR, 'client.log') +LOGLEVEL = logging.INFO + +ROUTER_ADDRESS = "localhost" +PORT = "5569" +ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" +# This is a default: +CAMERAS = { + 'default camera': { + 'address': '/dev/video0', + }, +} +KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period') +CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider +MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops +TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' +START_STOP_MESSAGE_FMT = "{action}:{data}" stop_event = Event() @@ -40,57 +63,115 @@ logging.root.setLevel(LOGLEVEL) logger = logging.getLogger(__name__) logging.basicConfig( filename=LOGFILE, - datefmt='%Y-%m-%d %I:%M:%S', + datefmt=TIME_FORMAT, level=LOGLEVEL, ) - -def read_config(conf_file): - """Read config file and return as dictionary.""" - cfg = ConfigParser() - cfg.read(conf_file) - - return {key: dict(cfg.items(key)) for key in cfg.sections()} +################################################### +# End configuration, defaults and logging setup +################################################### class ClientVideo(Thread): """Class for sending video stream""" - def __init__(self, _id, device, queue): + def __init__(self, _id, device_dict, queue): super().__init__(daemon=True) self.identity = _id - self.device = device + self.device_dict = device_dict + self.device_addr = device_dict.get('address', '/dev/video0') + try: + # Could move parameter validation to config reading function: + self.device_threshold = int(device_dict['contour_size_threshold']) + except (ValueError, KeyError) as exc: + logger.error("ClientVideo '%s': Invalid contour size threshold: %r, using default: %d", self.identity, exc, CONTOUR_SIZE_THRESHOLD) + self.device_threshold = CONTOUR_SIZE_THRESHOLD + try: + self.device_grace_period = int(device_dict['movement_grace_period']) + except (ValueError, KeyError) as exc: + logger.error("ClientVideo '%s': Invalid movement grace period: %r, using default: %d", self.identity, exc, MOVEMENT_GRACE_PERIOD) + self.device_grace_period = MOVEMENT_GRACE_PERIOD self.queue = queue + self.frame_deque = deque(maxlen=2) # Store last 2 frames self.live = True + def put_metadata_message(self, action, data): + """Put metadata message to queue.""" + metadata = START_STOP_MESSAGE_FMT.format( + action=action, + data=data, + ) + logger.info("ClientVideo '%s': %s at: %r", self.identity, action, data) + message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] + self.queue.put(message) + + def put_video_message(self, frame): + # Encode frame as JPEG: + _, buffer = cv2.imencode('.jpg', frame) + # Add identity to message: + message = [self.identity.encode('utf-8'), buffer.tobytes()] + # Put message to queue: + self.queue.put(message) + def run(self): - """Replace with actual video streaming logic.""" - ping_no = 0 + """Video streaming logic.""" + logger.debug("ClientVideo '%s' starting ...", self.identity) + cap = cv2.VideoCapture(self.device_addr) + if not cap.isOpened(): + logger.error("ClientVideo '%s': Unable to open video device '%s'", self.identity, self.device_addr) + return + + movement_detected = False + movement_detected_start = None while self.live and not stop_event.is_set(): - try: - # Four parts required to start/stop video stream, three parts for ping: - if ping_no == 0: # Start video stream - timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") - metadata = f"start:{timestamp}" - logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata) - message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] - self.queue.put(message) - text = f"ping-{ping_no}" - message = [self.identity.encode('utf-8'), text.encode('utf-8')] - elif ping_no >= 5: # Stop video stream - logger.debug("ClientVideo '%s' sending stop signal", self.identity) - timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") - metadata = f"stop:{timestamp}" - logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'') - message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] - self.live = False - else: # Send ping - text = f"ping-{ping_no}" - message = [self.identity.encode('utf-8'), text.encode('utf-8')] - logger.debug("ClientVideo '%s' sending message: %r", self.identity, message) - self.queue.put(message) - ping_no += 1 - except Exception as exc: - logger.error("ClientVideo: socket error: %r", exc) - time.sleep(SLEEP_TIME) + ret, frame = cap.read() + if not ret: + logger.error("ClientVideo '%s': Failed to read frame from device '%s'", self.identity, self.device_addr) + break + self.frame_deque.append(frame) + # Skip on the first frame: + if len(self.frame_deque) >= 2: + contours = compute_contours(self.frame_deque) + logger.debug("ClientVideo '%s': Found %d contours.", self.identity, len(contours)) + logger.debug("ClientVideo '%s': Contours: %r", self.identity, contours) + movement_now = detect_movement(contours, min_area=self.device_threshold) + if movement_now: + logger.debug("ClientVideo '%s': Movement detected in frame.", self.identity) + # Only update movement start time if movement was not detected before: + if not movement_detected: + # Update movement detected start time: + movement_detected_start = datetime.datetime.now() + # Only send start message if movement was not detected before: + timestamp = movement_detected_start.strftime(TIME_FORMAT) + self.put_metadata_message( + action='start', + data=timestamp, + ) + # and set movement detected flag: + movement_detected = True + else: + logger.debug("ClientVideo '%s': No movement detected in frame.", self.identity) + + if movement_detected: + # Draw contours on frame: + frame = draw_contours(frame, contours, min_contour_area=self.device_threshold) + # Send video message with contour and frame: + self.put_video_message(frame) + + # Check if movement has stopped, taking into account grace period: + now = datetime.datetime.now() + delta_seconds = (now - movement_detected_start).total_seconds() + logger.debug("ClientVideo '%s': delta seconds since movement detected: %d, grace remaining: %d.", self.identity, delta_seconds, self.device_grace_period - delta_seconds) + # Wait seconds before deciding movement has stopped: + if delta_seconds > self.device_grace_period: + timestamp = now.strftime(TIME_FORMAT) + self.put_metadata_message( + action='stop', + data=timestamp, + ) + movement_detected = False + movement_detected_start = None + + else: + logger.debug("ClientVideo '%s': Frame deque length: %d", self.identity, len(self.frame_deque)) logger.info("ClientVideo '%s' closing socket ...", self.identity) @@ -98,7 +179,6 @@ class ClientVideo(Thread): logger.info("Client video '%s' exiting ...", self.identity) self.live = False - class ClientTask(Thread): """Main Client Task with logic, message handling and setup.""" @@ -111,35 +191,152 @@ class ClientTask(Thread): self.socket.identity = self.identity.encode("utf-8") self.poll = zmq.Poller() self.poll.register(self.socket, zmq.POLLIN) - self.video_threads = [] + # self.video_threads = [] + self.video_threads = {} self.running = True self.connected = False + self.queue = Queue() - def run(self): - self.socket.connect(ADDRESS) - logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) - logger.debug("starting video threads ...") - q = Queue() - for _id, device in CAMERAS.items(): - vid_cl = ClientVideo(_id, device, q) - vid_cl.start() - self.video_threads.append(vid_cl) + def receive_messages(self): + """Receive messages from the server.""" while self.running: try: sockets = dict(self.poll.poll(1000)) if self.socket in sockets: try: message = self.socket.recv_multipart() - logger.debug("Client '%s' received message: %r", self.identity, message) - except Exception: - logger.error("Failed to receive message: %r", traceback.format_exc()) - if q.qsize() > 0: - frame = q.get() - if frame is not None: - logger.debug("Processing frame: %r", frame) - self.socket.send_multipart(frame) + logger.info("Client '%s' received message: %r.", self.identity, message) + self.update_config(message) + except Exception as exc: + logger.error("Failed to receive message: %r", exc) except zmq.ZMQError as exc: logger.error("Client '%s': socket error: %r", self.identity, exc) + time.sleep(0.01) # Sleep to avoid busy waiting + + def send_messages(self): + """Send messages to the server.""" + while self.running: + try: + frame = self.queue.get(timeout=0.1) + logger.debug("Sending message of length %r ...", len(frame)) + try: + self.socket.send_multipart(frame) + except zmq.ZMQError as exc: + logger.error("Client '%s': socket error: %r", self.identity, exc) + except Empty: + continue # No message to send, continue waiting + + def update_config(self, message): + """ + Update configuration based on message from server. + Only 'cameras' section can be updated. + directives = ( + 'modify_camera_name', + 'modify_camera_threshold', + 'modify_camera_grace_pd', + 'modify_camera_address', + 'add_camera', + 'remove_camera', + ) + """ + try: + msg_list = [part.decode('utf-8') for part in message] + logger.debug("Client received config message: %r", msg_list) + camera_id, directive, value = msg_list[0], msg_list[1], msg_list[2] + if directive in ( + 'modify_camera_name', + 'modify_camera_threshold', + 'modify_camera_grace_pd', + 'modify_camera_address', + ): + if camera_id not in CAMERAS: + logger.warning("Cannot rename unknown camera: %r", camera_id) + logger.warning("Known cameras: %r", list(CAMERAS.keys())) + else: + if directive == 'modify_camera_name': + old_name, new_name = camera_id, value + if new_name in CAMERAS: + raise ValueError("New camera name already exists.") + CAMERAS[new_name] = CAMERAS.pop(old_name) + self.video_threads[new_name] = self.video_threads.pop(old_name) + # Send message to server to update routing: + timestamp = datetime.datetime.now().strftime(TIME_FORMAT) + self.video_threads[new_name].put_metadata_message( + action='rename', + data=':'.join([old_name, new_name, timestamp]) + ) + self.send_messages + for item in cfg.items('cameras'): + if item[0].startswith(old_name + '.'): + param = item[0].split('.', 1)[1] + cfg.set('cameras', f"{new_name}.{param}", item[1]) + cfg.remove_option('cameras', item[0]) + logger.info("Renamed and restarted thread for camera '%s' to '%s'.", old_name, new_name) + elif directive == 'modify_camera_threshold': + new_threshold = int(value) + if new_threshold <= 0: + raise ValueError("Threshold must be positive.") + CAMERAS[camera_id]['contour_size_threshold'] = new_threshold + self.video_threads[camera_id].device_threshold = new_threshold + cfg.set('cameras', f"{camera_id}.contour_size_threshold", str(new_threshold)) + logger.info("Modified contour size threshold of camera '%s' to %d.", camera_id, new_threshold) + elif directive == 'modify_camera_grace_pd': + new_grace_pd = int(value) + if new_grace_pd < 0: + raise ValueError("Grace period cannot be negative.") + CAMERAS[camera_id]['movement_grace_period'] = new_grace_pd + self.video_threads[camera_id].device_grace_period = new_grace_pd + cfg.set('cameras', f"{camera_id}.movement_grace_period", str(new_grace_pd)) + logger.info("Modified movement grace period of camera '%s' to %d.", camera_id, new_grace_pd) + elif directive == 'modify_camera_address': + new_address = value + CAMERAS[camera_id]['address'] = new_address + self.video_threads[camera_id].device_addr = new_address + # Changing address on the fly requires restarting the video thread: + self.video_threads[camera_id].stop() + self.video_threads[camera_id].join() + vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) + vid_cl.start() + self.video_threads[camera_id] = vid_cl + logger.info("Modified address of camera '%s' to '%s'.", camera_id, new_address) + cfg.set('cameras', f"{camera_id}.address", new_address) + elif directive == 'add_camera': + cam_address = value + if camera_id in CAMERAS: + raise ValueError("Camera name already exists.") + CAMERAS[camera_id] = {'address': cam_address} + vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) + vid_cl.start() + self.video_threads[camera_id] = vid_cl + cfg.set('cameras', f"{camera_id}.address", cam_address) + logger.info("Added and started a thread for new camera '%s' with address '%s'.", camera_id, cam_address) + else: + logger.warning("Unknown config directive: %r", directive) + + except (ValueError, IndexError) as exc: + logger.error("Failed to handle config message: %r", exc) + cfg.write(open(CONFIG_FILE, 'w')) + + def run(self): + """Main client logic.""" + self.socket.connect(ADDRESS) + logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) + logger.debug("Starting video threads ...") + for _id, device_dict in CAMERAS.items(): + vid_cl = ClientVideo(_id, device_dict, self.queue) + vid_cl.start() + self.video_threads[_id] = (vid_cl) + + recv_thread = Thread(target=self.receive_messages, daemon=True) + send_thread = Thread(target=self.send_messages, daemon=True) + + recv_thread.start() + send_thread.start() + logger.debug("Client '%s' started receiving and sending threads.", self.identity) + + logger.debug("Client '%s' waiting for threads to finish ...", self.identity) + recv_thread.join() + send_thread.join() logger.info("Closing socket ...") self.socket.close() @@ -149,18 +346,86 @@ class ClientTask(Thread): def stop(self): """Stop task""" logger.info("ClientTask cleaning up ...") - for thread in self.video_threads: - logger.info("Cleaning u video thread ...") + for _id, thread in self.video_threads.items(): + logger.info("Cleaning video thread %s ...", _id) thread.stop() thread.join() - self.video_threads = [] + # self.video_threads = [] + self.video_threads = {} logger.info("Client task exiting ...") self.running = False +def read_config(conf_file): + """ + Read config file and return as dictionary. + The only required section is 'cameras' with at least one camera. + Do basic sanity checks and update global variables. + Log warnings if config file or sections/options are missing. + """ + cfg = ConfigParser() + cfg.read(conf_file) + # Need to update logging info: + if 'logging' in cfg: + global LOGDIR, LOGFILE, LOGLEVEL + + LOGDIR = cfg.get('logging', 'logdir', fallback='.') + LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE) + LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL) + logger.setLevel(LOGLEVEL) + for _handler in logger.handlers: + _handler.setLevel(LOGLEVEL) + if 'network' in cfg: + global ROUTER_ADDRESS, PORT, ADDRESS + if cfg.has_option('network', 'router_address'): + ROUTER_ADDRESS = cfg.get('network', 'router_address') + else: + logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS) + if cfg.has_option('network', 'router_port'): + PORT = cfg.get('network', 'router_port') + else: + logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT) + ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" + else: + logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS) + if 'cameras' in cfg: + global CAMERAS + # These shenanigans below allow for a nested config simulation with configparser + # and so per camera settings: + cameras_dict = defaultdict(dict) + for key, val in cfg.items('cameras'): + try: + cam, param = key.split('.', 1) + if param in KNOWN_CAMERA_PARAMETERS: + cameras_dict[cam][param] = val + else: + logger.warning("Unknown camera %r parameter: %r", cam, param) + except ValueError as exc: + logger.error("Invalid camera configuration entry: %r, error: %r, using defaults: %r.", key, exc, CAMERAS) + # Check that each camera has at least an address and remove if not: + for key in list(cameras_dict.keys()): + if 'address' not in cameras_dict[key].keys(): + logger.error("DEBUG: cameras_dict: %r", cameras_dict) + logger.error("Camera %r has no address configured, removing from configuration.", key) + del cameras_dict[key] + if not cameras_dict: + logger.warning("No valid camera configurations found in config file, using defaults: %r.", CAMERAS) + else: + # Only update global CAMERAS if we have at least one valid camera with an address: + CAMERAS = cameras_dict + logger.info("Using camera configuration: %r", CAMERAS) + else: + logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS) + logger.info("CAMERAS configuration: %r", CAMERAS) + with open(conf_file, 'w') as configfile: + cfg.write(configfile) + + return cfg + def signal_handler(sig, frame): logger.info("Received signal handler '%r', stopping client ...", sig) client.stop() + sys.exit(0) if __name__ == "__main__": @@ -168,9 +433,14 @@ if __name__ == "__main__": signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) + # Read configuration file before starting logging as it may contain logging settings: + cfg = read_config(CONFIG_FILE) + logger.info("Starting up ...") + client = ClientTask("client_task") client.start() - client.join() + + logger.info("Terminating ...")
\ No newline at end of file |
