#!/usr/bin/env python
"""
Helper functions for zmq video messaging.
"""

__author__ = "Franoosh Corporation"

import logging
import subprocess

import cv2

_LOGGER = logging.getLogger(__name__)


class CustomLoggingFormatter(logging.Formatter):
    """Logging formatter that selects a format string per log level.

    Records at one of the five standard levels are rendered with the
    matching ``*_fmt`` class attribute. Records at any other level fall
    back to the format string passed to ``logging.Formatter.__init__``.
    """

    debug_fmt = 'DEBUG: %(filename)s:%(lineno)d %(asctime)s %(message)s'
    info_fmt = 'INFO: %(asctime)s %(message)s'
    warning_fmt = 'WARNING: %(asctime)s %(message)s'
    error_fmt = 'ERROR: %(asctime)s %(message)s'
    critical_fmt = 'CRITICAL: %(asctime)s %(message)s'

    # Maps a level number to the name of the class attribute holding its
    # format. The attribute is looked up at format time so that reassigning
    # e.g. ``CustomLoggingFormatter.debug_fmt`` keeps taking effect.
    _LEVEL_FORMAT_ATTRS = {
        logging.DEBUG: 'debug_fmt',
        logging.INFO: 'info_fmt',
        logging.WARNING: 'warning_fmt',
        logging.ERROR: 'error_fmt',
        logging.CRITICAL: 'critical_fmt',
    }

    def __init__(self):
        # Fallback format for non-standard levels. The original string had
        # "%s(asctime)s", which is not a valid mapping key and made
        # formatting of such records fail.
        super().__init__(
            fmt="%(levelno)d: %(asctime)s %(message)s",
            datefmt=None,
        )

    def format(self, record):
        """Return ``record`` rendered with the format for its level.

        Args:
            record: The ``logging.LogRecord`` to format.

        Returns:
            The formatted log line as a string.
        """
        attr_name = self._LEVEL_FORMAT_ATTRS.get(record.levelno)
        if attr_name is None:
            # Non-standard level: use the fallback format untouched.
            return logging.Formatter.format(self, record)

        orig_fmt = self._style._fmt
        self._style._fmt = getattr(CustomLoggingFormatter, attr_name)
        try:
            return logging.Formatter.format(self, record)
        finally:
            # Always restore, even if formatting raised, so one bad record
            # cannot leave the formatter stuck on a per-level format.
            self._style._fmt = orig_fmt


def process_frame(frame):
    """Prepare a frame for frame-difference contour detection.

    Args:
        frame: Image converted with ``cv2.COLOR_BGR2GRAY``, i.e. expected
            to be a BGR colour image.

    Returns:
        The grayscale image after a 21x21 Gaussian blur.
    """
    # Convert to grayscale:
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Apply Gaussian blur:
    blurred = cv2.GaussianBlur(gray, (21, 21), 0)
    return blurred


def compute_contours(frame_deque):
    """Compute contours of the differences between consecutive frames.

    Each frame is preprocessed with ``process_frame`` exactly once, then
    every adjacent pair is diffed, thresholded, dilated and searched for
    external contours.

    Args:
        frame_deque: Sized iterable of frames (e.g. a ``collections.deque``)
            in chronological order.

    Returns:
        A list with the contours found for all consecutive frame pairs,
        in pair order. Empty when fewer than two frames are supplied.
    """
    if len(frame_deque) < 2:
        return []

    all_contours = []
    frames = iter(frame_deque)
    previous = process_frame(next(frames))
    for frame in frames:
        current = process_frame(frame)
        frame_delta = cv2.absdiff(previous, current)
        threshold = cv2.threshold(
            frame_delta, 25, 255, cv2.THRESH_BINARY
        )[1]
        threshold = cv2.dilate(threshold, None, iterations=2)
        # findContours returns (contours, hierarchy) on OpenCV 2.x/4.x and
        # (image, contours, hierarchy) on 3.x; [-2] is the contour list in
        # every case.
        found = cv2.findContours(
            threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )[-2]
        all_contours.extend(found)
        previous = current
    return all_contours


def draw_contours(frame, contours, min_contour_area=500):
    """Draw bounding rectangles of sufficiently large contours on a frame.

    Args:
        frame: Image to draw on; it is modified in place.
        contours: Iterable of contours as returned by ``compute_contours``.
        min_contour_area: Minimum contour area for a rectangle to be drawn.
            The comparison is inclusive, matching ``detect_movement``, so
            any contour that counts as movement is also outlined.

    Returns:
        The same ``frame`` object, with green 2-pixel rectangles drawn.
    """
    for contour in contours:
        if cv2.contourArea(contour) >= min_contour_area:
            (x, y, w, h) = cv2.boundingRect(contour)
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
    return frame


def detect_movement(contours, min_area=500):
    """Detect movement based on contours found from frame diff.

    Args:
        contours: Iterable of contours as returned by ``compute_contours``.
        min_area: Minimum contour area (inclusive) that counts as movement.

    Returns:
        True if at least one contour has an area of ``min_area`` or more,
        False otherwise (including when ``contours`` is empty).
    """
    return any(cv2.contourArea(contour) >= min_area for contour in contours)


def get_available_cameras():
    """Get the list of video devices that OpenCV can open.

    Runs ``v4l2-ctl --list-devices``, keeps the ``/dev/video*`` entries of
    its output, and tries to open each one with ``cv2.VideoCapture``.

    NOTE(review): the previous version of this function was documented as
    not working at all. This revision fixes the output parsing (device-name
    headers, blank lines and non-video nodes are no longer probed) but has
    not been validated against real hardware -- confirm before relying on it.

    Returns:
        A list of device paths that could be opened. Empty when none can
        be opened or when ``v4l2-ctl`` cannot be executed.
    """
    try:
        proc = subprocess.run(
            ['v4l2-ctl', '--list-devices'],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except OSError as exc:
        # v4l2-ctl missing or not executable: no way to enumerate devices.
        _LOGGER.warning("Could not run v4l2-ctl to list cameras: %s", exc)
        return []

    listing = proc.stdout.decode('utf-8', errors='replace')
    stripped_lines = (line.strip() for line in listing.splitlines())
    candidate_devices = [
        line for line in stripped_lines if line.startswith('/dev/video')
    ]

    verified_devices = []
    for device in candidate_devices:
        cap = cv2.VideoCapture(device)
        try:
            if cap.isOpened():
                verified_devices.append(device)
        finally:
            # Release unconditionally so a failed probe never leaks a handle.
            cap.release()
    return verified_devices