aboutsummaryrefslogtreecommitdiff
path: root/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'client.py')
-rw-r--r--client.py412
1 files changed, 341 insertions, 71 deletions
diff --git a/client.py b/client.py
index e9dce4f..00e97e4 100644
--- a/client.py
+++ b/client.py
@@ -6,28 +6,51 @@ A module containing client for streaming video to a zmq router.
__author__ = "Franoosh Corporation"
+import sys
from os import path
from threading import Thread, Event
-from queue import Queue
+from queue import Queue, Empty
+from collections import deque
import time
import datetime
import signal
import logging
from configparser import ConfigParser
-import traceback
import zmq
+import cv2
+from collections import defaultdict
-from helpers import CustomLoggingFormatter
+from helpers import (
+ CustomLoggingFormatter,
+ compute_contours,
+ detect_movement,
+ draw_contours,
+ )
-ROUTER_ADDRESS = "tcp://localhost"
-PORT = "5569"
-ADDRESS = f"{ROUTER_ADDRESS}:{PORT}"
+###################################################
+# Configuration, defaults and logging setup
+###################################################
CONFIG_FILE = "client.cfg"
-LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log"
-LOGLEVEL = logging.DEBUG
-CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'}
-SLEEP_TIME = 5
+# Those will be updated from config file if present:
+LOGDIR = '.'
+LOGFILE = path.join(LOGDIR, 'client.log')
+LOGLEVEL = logging.INFO
+
+ROUTER_ADDRESS = "localhost"
+PORT = "5569"
+ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}"
+# This is a default:
+CAMERAS = {
+ 'default camera': {
+ 'address': '/dev/video0',
+ },
+}
+KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period')
+CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider
+MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops
+TIME_FORMAT = '%Y_%m_%d-%H_%M_%S'
+START_STOP_MESSAGE_FMT = "{action}:{data}"
stop_event = Event()
@@ -40,57 +63,115 @@ logging.root.setLevel(LOGLEVEL)
logger = logging.getLogger(__name__)
logging.basicConfig(
filename=LOGFILE,
- datefmt='%Y-%m-%d %I:%M:%S',
+ datefmt=TIME_FORMAT,
level=LOGLEVEL,
)
-
-def read_config(conf_file):
- """Read config file and return as dictionary."""
- cfg = ConfigParser()
- cfg.read(conf_file)
-
- return {key: dict(cfg.items(key)) for key in cfg.sections()}
+###################################################
+# End configuration, defaults and logging setup
+###################################################
class ClientVideo(Thread):
"""Class for sending video stream"""
- def __init__(self, _id, device, queue):
+ def __init__(self, _id, device_dict, queue):
super().__init__(daemon=True)
self.identity = _id
- self.device = device
+ self.device_dict = device_dict
+ self.device_addr = device_dict.get('address', '/dev/video0')
+ try:
+ # Could move parameter validation to config reading function:
+ self.device_threshold = int(device_dict['contour_size_threshold'])
+ except (ValueError, KeyError) as exc:
+ logger.error("ClientVideo '%s': Invalid contour size threshold: %r, using default: %d", self.identity, exc, CONTOUR_SIZE_THRESHOLD)
+ self.device_threshold = CONTOUR_SIZE_THRESHOLD
+ try:
+ self.device_grace_period = int(device_dict['movement_grace_period'])
+ except (ValueError, KeyError) as exc:
+ logger.error("ClientVideo '%s': Invalid movement grace period: %r, using default: %d", self.identity, exc, MOVEMENT_GRACE_PERIOD)
+ self.device_grace_period = MOVEMENT_GRACE_PERIOD
self.queue = queue
+ self.frame_deque = deque(maxlen=2) # Store last 2 frames
self.live = True
+ def put_metadata_message(self, action, data):
+ """Put metadata message to queue."""
+ metadata = START_STOP_MESSAGE_FMT.format(
+ action=action,
+ data=data,
+ )
+ logger.info("ClientVideo '%s': %s at: %r", self.identity, action, data)
+ message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
+ self.queue.put(message)
+
+ def put_video_message(self, frame):
+ # Encode frame as JPEG:
+ _, buffer = cv2.imencode('.jpg', frame)
+ # Add identity to message:
+ message = [self.identity.encode('utf-8'), buffer.tobytes()]
+ # Put message to queue:
+ self.queue.put(message)
+
def run(self):
- """Replace with actual video streaming logic."""
- ping_no = 0
+ """Video streaming logic."""
+ logger.debug("ClientVideo '%s' starting ...", self.identity)
+ cap = cv2.VideoCapture(self.device_addr)
+ if not cap.isOpened():
+ logger.error("ClientVideo '%s': Unable to open video device '%s'", self.identity, self.device_addr)
+ return
+
+ movement_detected = False
+ movement_detected_start = None
while self.live and not stop_event.is_set():
- try:
- # Four parts required to start/stop video stream, three parts for ping:
- if ping_no == 0: # Start video stream
- timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
- metadata = f"start:{timestamp}"
- logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata)
- message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
- self.queue.put(message)
- text = f"ping-{ping_no}"
- message = [self.identity.encode('utf-8'), text.encode('utf-8')]
- elif ping_no >= 5: # Stop video stream
- logger.debug("ClientVideo '%s' sending stop signal", self.identity)
- timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
- metadata = f"stop:{timestamp}"
- logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'')
- message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
- self.live = False
- else: # Send ping
- text = f"ping-{ping_no}"
- message = [self.identity.encode('utf-8'), text.encode('utf-8')]
- logger.debug("ClientVideo '%s' sending message: %r", self.identity, message)
- self.queue.put(message)
- ping_no += 1
- except Exception as exc:
- logger.error("ClientVideo: socket error: %r", exc)
- time.sleep(SLEEP_TIME)
+ ret, frame = cap.read()
+ if not ret:
+ logger.error("ClientVideo '%s': Failed to read frame from device '%s'", self.identity, self.device_addr)
+ break
+ self.frame_deque.append(frame)
+ # Skip on the first frame:
+ if len(self.frame_deque) >= 2:
+ contours = compute_contours(self.frame_deque)
+ logger.debug("ClientVideo '%s': Found %d contours.", self.identity, len(contours))
+ logger.debug("ClientVideo '%s': Contours: %r", self.identity, contours)
+ movement_now = detect_movement(contours, min_area=self.device_threshold)
+ if movement_now:
+ logger.debug("ClientVideo '%s': Movement detected in frame.", self.identity)
+ # Only update movement start time if movement was not detected before:
+ if not movement_detected:
+ # Update movement detected start time:
+ movement_detected_start = datetime.datetime.now()
+ # Only send start message if movement was not detected before:
+ timestamp = movement_detected_start.strftime(TIME_FORMAT)
+ self.put_metadata_message(
+ action='start',
+ data=timestamp,
+ )
+ # and set movement detected flag:
+ movement_detected = True
+ else:
+ logger.debug("ClientVideo '%s': No movement detected in frame.", self.identity)
+
+ if movement_detected:
+ # Draw contours on frame:
+ frame = draw_contours(frame, contours, min_contour_area=self.device_threshold)
+ # Send video message with contour and frame:
+ self.put_video_message(frame)
+
+ # Check if movement has stopped, taking into account grace period:
+ now = datetime.datetime.now()
+ delta_seconds = (now - movement_detected_start).total_seconds()
+ logger.debug("ClientVideo '%s': delta seconds since movement detected: %d, grace remaining: %d.", self.identity, delta_seconds, self.device_grace_period - delta_seconds)
+ # Wait seconds before deciding movement has stopped:
+ if delta_seconds > self.device_grace_period:
+ timestamp = now.strftime(TIME_FORMAT)
+ self.put_metadata_message(
+ action='stop',
+ data=timestamp,
+ )
+ movement_detected = False
+ movement_detected_start = None
+
+ else:
+ logger.debug("ClientVideo '%s': Frame deque length: %d", self.identity, len(self.frame_deque))
logger.info("ClientVideo '%s' closing socket ...", self.identity)
@@ -98,7 +179,6 @@ class ClientVideo(Thread):
logger.info("Client video '%s' exiting ...", self.identity)
self.live = False
-
class ClientTask(Thread):
"""Main Client Task with logic,
message handling and setup."""
@@ -111,35 +191,152 @@ class ClientTask(Thread):
self.socket.identity = self.identity.encode("utf-8")
self.poll = zmq.Poller()
self.poll.register(self.socket, zmq.POLLIN)
- self.video_threads = []
+ # self.video_threads = []
+ self.video_threads = {}
self.running = True
self.connected = False
+ self.queue = Queue()
- def run(self):
- self.socket.connect(ADDRESS)
- logger.debug("Client '%s' connected to %s", self.identity, ADDRESS)
- logger.debug("starting video threads ...")
- q = Queue()
- for _id, device in CAMERAS.items():
- vid_cl = ClientVideo(_id, device, q)
- vid_cl.start()
- self.video_threads.append(vid_cl)
+ def receive_messages(self):
+ """Receive messages from the server."""
while self.running:
try:
sockets = dict(self.poll.poll(1000))
if self.socket in sockets:
try:
message = self.socket.recv_multipart()
- logger.debug("Client '%s' received message: %r", self.identity, message)
- except Exception:
- logger.error("Failed to receive message: %r", traceback.format_exc())
- if q.qsize() > 0:
- frame = q.get()
- if frame is not None:
- logger.debug("Processing frame: %r", frame)
- self.socket.send_multipart(frame)
+ logger.info("Client '%s' received message: %r.", self.identity, message)
+ self.update_config(message)
+ except Exception as exc:
+ logger.error("Failed to receive message: %r", exc)
except zmq.ZMQError as exc:
logger.error("Client '%s': socket error: %r", self.identity, exc)
+ time.sleep(0.01) # Sleep to avoid busy waiting
+
+ def send_messages(self):
+ """Send messages to the server."""
+ while self.running:
+ try:
+ frame = self.queue.get(timeout=0.1)
+ logger.debug("Sending message of length %r ...", len(frame))
+ try:
+ self.socket.send_multipart(frame)
+ except zmq.ZMQError as exc:
+ logger.error("Client '%s': socket error: %r", self.identity, exc)
+ except Empty:
+ continue # No message to send, continue waiting
+
+ def update_config(self, message):
+ """
+ Update configuration based on message from server.
+ Only 'cameras' section can be updated.
+ directives = (
+ 'modify_camera_name',
+ 'modify_camera_threshold',
+ 'modify_camera_grace_pd',
+ 'modify_camera_address',
+ 'add_camera',
+ 'remove_camera',
+ )
+ """
+ try:
+ msg_list = [part.decode('utf-8') for part in message]
+ logger.debug("Client received config message: %r", msg_list)
+ camera_id, directive, value = msg_list[0], msg_list[1], msg_list[2]
+ if directive in (
+ 'modify_camera_name',
+ 'modify_camera_threshold',
+ 'modify_camera_grace_pd',
+ 'modify_camera_address',
+ ):
+ if camera_id not in CAMERAS:
+ logger.warning("Cannot rename unknown camera: %r", camera_id)
+ logger.warning("Known cameras: %r", list(CAMERAS.keys()))
+ else:
+ if directive == 'modify_camera_name':
+ old_name, new_name = camera_id, value
+ if new_name in CAMERAS:
+ raise ValueError("New camera name already exists.")
+ CAMERAS[new_name] = CAMERAS.pop(old_name)
+ self.video_threads[new_name] = self.video_threads.pop(old_name)
+ # Send message to server to update routing:
+ timestamp = datetime.datetime.now().strftime(TIME_FORMAT)
+ self.video_threads[new_name].put_metadata_message(
+ action='rename',
+ data=':'.join([old_name, new_name, timestamp])
+ )
+ self.send_messages
+ for item in cfg.items('cameras'):
+ if item[0].startswith(old_name + '.'):
+ param = item[0].split('.', 1)[1]
+ cfg.set('cameras', f"{new_name}.{param}", item[1])
+ cfg.remove_option('cameras', item[0])
+ logger.info("Renamed and restarted thread for camera '%s' to '%s'.", old_name, new_name)
+ elif directive == 'modify_camera_threshold':
+ new_threshold = int(value)
+ if new_threshold <= 0:
+ raise ValueError("Threshold must be positive.")
+ CAMERAS[camera_id]['contour_size_threshold'] = new_threshold
+ self.video_threads[camera_id].device_threshold = new_threshold
+ cfg.set('cameras', f"{camera_id}.contour_size_threshold", str(new_threshold))
+ logger.info("Modified contour size threshold of camera '%s' to %d.", camera_id, new_threshold)
+ elif directive == 'modify_camera_grace_pd':
+ new_grace_pd = int(value)
+ if new_grace_pd < 0:
+ raise ValueError("Grace period cannot be negative.")
+ CAMERAS[camera_id]['movement_grace_period'] = new_grace_pd
+ self.video_threads[camera_id].device_grace_period = new_grace_pd
+ cfg.set('cameras', f"{camera_id}.movement_grace_period", str(new_grace_pd))
+ logger.info("Modified movement grace period of camera '%s' to %d.", camera_id, new_grace_pd)
+ elif directive == 'modify_camera_address':
+ new_address = value
+ CAMERAS[camera_id]['address'] = new_address
+ self.video_threads[camera_id].device_addr = new_address
+ # Changing address on the fly requires restarting the video thread:
+ self.video_threads[camera_id].stop()
+ self.video_threads[camera_id].join()
+ vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue)
+ vid_cl.start()
+ self.video_threads[camera_id] = vid_cl
+ logger.info("Modified address of camera '%s' to '%s'.", camera_id, new_address)
+ cfg.set('cameras', f"{camera_id}.address", new_address)
+ elif directive == 'add_camera':
+ cam_address = value
+ if camera_id in CAMERAS:
+ raise ValueError("Camera name already exists.")
+ CAMERAS[camera_id] = {'address': cam_address}
+ vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue)
+ vid_cl.start()
+ self.video_threads[camera_id] = vid_cl
+ cfg.set('cameras', f"{camera_id}.address", cam_address)
+ logger.info("Added and started a thread for new camera '%s' with address '%s'.", camera_id, cam_address)
+ else:
+ logger.warning("Unknown config directive: %r", directive)
+
+ except (ValueError, IndexError) as exc:
+ logger.error("Failed to handle config message: %r", exc)
+ cfg.write(open(CONFIG_FILE, 'w'))
+
+ def run(self):
+ """Main client logic."""
+ self.socket.connect(ADDRESS)
+ logger.debug("Client '%s' connected to %s", self.identity, ADDRESS)
+ logger.debug("Starting video threads ...")
+ for _id, device_dict in CAMERAS.items():
+ vid_cl = ClientVideo(_id, device_dict, self.queue)
+ vid_cl.start()
+ self.video_threads[_id] = (vid_cl)
+
+ recv_thread = Thread(target=self.receive_messages, daemon=True)
+ send_thread = Thread(target=self.send_messages, daemon=True)
+
+ recv_thread.start()
+ send_thread.start()
+ logger.debug("Client '%s' started receiving and sending threads.", self.identity)
+
+ logger.debug("Client '%s' waiting for threads to finish ...", self.identity)
+ recv_thread.join()
+ send_thread.join()
logger.info("Closing socket ...")
self.socket.close()
@@ -149,18 +346,86 @@ class ClientTask(Thread):
def stop(self):
"""Stop task"""
logger.info("ClientTask cleaning up ...")
- for thread in self.video_threads:
- logger.info("Cleaning u video thread ...")
+ for _id, thread in self.video_threads.items():
+ logger.info("Cleaning video thread %s ...", _id)
thread.stop()
thread.join()
- self.video_threads = []
+ # self.video_threads = []
+ self.video_threads = {}
logger.info("Client task exiting ...")
self.running = False
+def read_config(conf_file):
+ """
+ Read config file and return as dictionary.
+ The only required section is 'cameras' with at least one camera.
+ Do basic sanity checks and update global variables.
+ Log warnings if config file or sections/options are missing.
+ """
+ cfg = ConfigParser()
+ cfg.read(conf_file)
+ # Need to update logging info:
+ if 'logging' in cfg:
+ global LOGDIR, LOGFILE, LOGLEVEL
+
+ LOGDIR = cfg.get('logging', 'logdir', fallback='.')
+ LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE)
+ LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL)
+ logger.setLevel(LOGLEVEL)
+ for _handler in logger.handlers:
+ _handler.setLevel(LOGLEVEL)
+ if 'network' in cfg:
+ global ROUTER_ADDRESS, PORT, ADDRESS
+ if cfg.has_option('network', 'router_address'):
+ ROUTER_ADDRESS = cfg.get('network', 'router_address')
+ else:
+ logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS)
+ if cfg.has_option('network', 'router_port'):
+ PORT = cfg.get('network', 'router_port')
+ else:
+ logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT)
+ ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}"
+ else:
+ logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS)
+ if 'cameras' in cfg:
+ global CAMERAS
+ # These shenanigans below allow for a nested config simulation with configparser
+ # and so per camera settings:
+ cameras_dict = defaultdict(dict)
+ for key, val in cfg.items('cameras'):
+ try:
+ cam, param = key.split('.', 1)
+ if param in KNOWN_CAMERA_PARAMETERS:
+ cameras_dict[cam][param] = val
+ else:
+ logger.warning("Unknown camera %r parameter: %r", cam, param)
+ except ValueError as exc:
+ logger.error("Invalid camera configuration entry: %r, error: %r, using defaults: %r.", key, exc, CAMERAS)
+ # Check that each camera has at least an address and remove if not:
+ for key in list(cameras_dict.keys()):
+ if 'address' not in cameras_dict[key].keys():
+ logger.error("DEBUG: cameras_dict: %r", cameras_dict)
+ logger.error("Camera %r has no address configured, removing from configuration.", key)
+ del cameras_dict[key]
+ if not cameras_dict:
+ logger.warning("No valid camera configurations found in config file, using defaults: %r.", CAMERAS)
+ else:
+ # Only update global CAMERAS if we have at least one valid camera with an address:
+ CAMERAS = cameras_dict
+ logger.info("Using camera configuration: %r", CAMERAS)
+ else:
+ logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS)
+ logger.info("CAMERAS configuration: %r", CAMERAS)
+ with open(conf_file, 'w') as configfile:
+ cfg.write(configfile)
+
+ return cfg
+
def signal_handler(sig, frame):
logger.info("Received signal handler '%r', stopping client ...", sig)
client.stop()
+ sys.exit(0)
if __name__ == "__main__":
@@ -168,9 +433,14 @@ if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
+ # Read configuration file before starting logging as it may contain logging settings:
+ cfg = read_config(CONFIG_FILE)
+
logger.info("Starting up ...")
+
client = ClientTask("client_task")
client.start()
-
client.join()
+
+ logger.info("Terminating ...") \ No newline at end of file