| author | Franoosh <uinarf@autistici.org> | 2025-10-15 14:42:29 +0200 |
|---|---|---|
| committer | Franoosh <uinarf@autistici.org> | 2025-10-15 14:42:29 +0200 |
| commit | 70beed73465ab27449a59c62043d94c16efe00c5 (patch) | |
| tree | 9f4b5d6a72711af4641b01169da5d20a2228ef64 /helpers.py | |
| parent | 68bd1bd052a7cd6438b92cb1059ef5e58b8d022c (diff) | |
| download | ZeroMQ_Video_Streaming-added_video.tar.gz ZeroMQ_Video_Streaming-added_video.tar.bz2 ZeroMQ_Video_Streaming-added_video.zip | |
Added camera support: movement recognition and video streaming, plus a web server and frontend. Work in progress. To be fixed: the frontend should reload client information and the page after major changes (camera name or address change, camera removal). Add certificates before testing on actual distributed hardware. Add user login logic.
Diffstat (limited to 'helpers.py')
| -rw-r--r-- | helpers.py | 63 |
1 file changed, 63 insertions, 0 deletions
```diff
@@ -8,6 +8,8 @@ __author__ = "Franoosh Corporation"
 import logging
+import subprocess
+import cv2
 class CustomLoggingFormatter(logging.Formatter):
@@ -42,3 +44,64 @@ class CustomLoggingFormatter(logging.Formatter):
         return result
+
+def process_frame(frame):
+    """Process a frame for contour detection."""
+    # Convert to grayscale:
+    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+    # Apply Gaussian blur to suppress noise:
+    blurred = cv2.GaussianBlur(gray, (21, 21), 0)
+
+    return blurred
+
+def compute_contours(frame_deque):
+    """Compute contours from a deque of frames."""
+    contours = []
+    if len(frame_deque) < 2:
+        return contours
+    all_contours = []
+
+    for idx, frame in enumerate(frame_deque):
+        frame_0 = process_frame(frame)
+        try:
+            frame_1 = process_frame(frame_deque[idx + 1])
+        except IndexError:
+            break
+        # Difference between consecutive frames, thresholded and dilated:
+        frame_delta = cv2.absdiff(frame_0, frame_1)
+        threshold = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1]
+        threshold = cv2.dilate(threshold, None, iterations=2)
+        contours, _ = cv2.findContours(threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+        all_contours.extend(contours)
+
+    return all_contours
+
+def draw_contours(frame, contours, min_contour_area=500):
+    """Draw bounding boxes around contours larger than min_contour_area."""
+    for contour in contours:
+        if cv2.contourArea(contour) > min_contour_area:
+            (x, y, w, h) = cv2.boundingRect(contour)
+            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
+
+    return frame
+
+def detect_movement(contours, min_area=500):
+    """Detect movement based on contours found from the frame diff."""
+    for contour in contours:
+        if cv2.contourArea(contour) >= min_area:
+            return True
+    return False
+
+def get_available_cameras():
+    """
+    Get a list of available camera devices.
+    At the moment it does not work. At all. It is useless.
+    """
+    proc = subprocess.Popen(['v4l2-ctl', '--list-devices'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    stdout, stderr = proc.communicate()
+    candidate_devices = [i.strip() for i in stdout.decode('utf-8').strip().splitlines()[1:]]
+    verified_devices = []
+    for device in candidate_devices:
+        cap = cv2.VideoCapture(device)
+        if cap.isOpened():
+            verified_devices.append(device)
+        cap.release()
+    return verified_devices
\ No newline at end of file
```
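For context, here is a minimal sketch of how the motion-detection helpers added in this commit could be wired together on the capture side. It is illustrative only and not part of the diff: the camera index `0`, the two-frame deque, the `q`-to-quit preview loop, and the local `cv2.imshow` window are assumptions, and the real system streams frames over ZeroMQ rather than displaying them locally.

```python
# Hypothetical usage of the helpers from this commit; not part of the diff.
from collections import deque

import cv2

from helpers import compute_contours, detect_movement, draw_contours

cap = cv2.VideoCapture(0)    # assumed local camera at index 0
frames = deque(maxlen=2)     # two frames are enough for a frame diff

try:
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        frames.append(frame)
        # compute_contours returns [] until the deque holds two frames.
        contours = compute_contours(frames)
        if detect_movement(contours):
            frame = draw_contours(frame, contours)
        cv2.imshow("preview", frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()
```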
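The note that `get_available_cameras` does not yet work matches its parsing: `v4l2-ctl --list-devices` prints device names followed by indented `/dev/video*` paths, so taking every line after the first yields name headers and blank lines as well as paths. Below is a hedged sketch of one way the parsing could be tightened, keeping only `/dev/video*` entries before probing them with OpenCV; the function name and behaviour are assumptions, not the fix the author intends.

```python
# Illustrative alternative, not the author's fix: keep only /dev/video* lines
# from `v4l2-ctl --list-devices`, then verify each one with OpenCV.
import subprocess

import cv2

def list_video_devices():
    proc = subprocess.run(
        ["v4l2-ctl", "--list-devices"],
        capture_output=True, text=True, check=False,
    )
    candidates = [
        line.strip()
        for line in proc.stdout.splitlines()
        if line.strip().startswith("/dev/video")
    ]
    verified = []
    for device in candidates:
        cap = cv2.VideoCapture(device)
        if cap.isOpened():
            verified.append(device)
        cap.release()
    return verified
```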
