aboutsummaryrefslogtreecommitdiff
path: root/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'worker.py')
-rw-r--r--worker.py247
1 files changed, 183 insertions, 64 deletions
diff --git a/worker.py b/worker.py
index ad5fe1e..54d1696 100644
--- a/worker.py
+++ b/worker.py
@@ -1,29 +1,51 @@
#!/usr/bin/env python
-__author__ = "Franoosh Corporation"
"""
A module consisting of a zeromq worker receiving video stream
via router from clients. Able to send control messages to clients.
+Needs libzmq and pyzmq with 'drafts' support.
"""
import os
+import sys
from threading import Thread, Event
import time
import signal
import logging
from queue import Queue
from collections import defaultdict
+import tempfile
+import json
import zmq
+import cv2
+import numpy
from helpers import CustomLoggingFormatter
+__author__ = "Franoosh Corporation"
+
+# Various constants: # TODO: put them in a config
+
+HOST = "127.0.0.1"
+ZMQPORT = "9979"
+# WSPORT = "8000"
+WSPORT = "8008"
+ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}"
+WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}"
+# LOGLEVEL = logging.DEBUG
+LOGLEVEL = logging.INFO
+
+# File paths:
-ADDR = "tcp://localhost"
-PORT = "9979"
-INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}"
LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log"
-LOGLEVEL = logging.DEBUG
+CWD = os.getcwd()
+TMP_DIR = os.path.join(CWD, "tmp")
+CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json')
+
+# Other:
+
+CLIENTS_DICT = {}
stop_event = Event()
@@ -39,6 +61,14 @@ logging.basicConfig(
level=LOGLEVEL,
)
+
+if not os.path.exists(TMP_DIR):
+ try:
+ os.makedirs(TMP_DIR)
+ except Exception as exc:
+ logger.error("Could not create temporary directory: %r", exc)
+ sys.exit()
+
class MonitorTask(Thread):
"""Monitor task"""
def __init__(self, socket):
@@ -92,14 +122,45 @@ class ServerWorker(Thread):
self.socket.identity = self.id.encode("utf-8")
self.monitor = MonitorTask(self.socket)
self.video_threads = defaultdict(lambda: defaultdict(dict))
+ self.poller = zmq.Poller()
+ self.web_sock = self.context.socket(zmq.DEALER)
self.connected = False
self.running = True
+ def start_client(self, client_id, camera_id, filename):
+ """
+ Start a video thread for new client_id and camera_id.
+ """
+ if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera
+ q = Queue()
+ logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id)
+ video_worker = VideoWorker(client_id, camera_id, filename, q)
+ video_worker.start()
+ self.video_threads[client_id][camera_id] = video_worker
+
+ def stop_client(self, client_id, camera_id):
+ """
+ Stop video thread for a client_id and camera_id.
+ """
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id)
+ self.video_threads[client_id][camera_id].stop() # Stop the thread
+ del self.video_threads[client_id][camera_id]
+ logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id)
+
def run(self):
+ """
+ Main loop of the worker.
+ Full of wonders.
+ """
logger.debug("ServerWorker '%s' starting ...", self.id)
- self.socket.connect(INITIAL_BACKEND_ADDR)
- self.poller = zmq.Poller()
+ self.socket.connect(ZMQ_BACKEND_ADDR)
+ try:
+ self.web_sock.bind(WEB_BACKEND_ADDR)
+ except Exception as exc:
+ logger.error("Connection to zmq websocket failed: %r", exc)
self.poller.register(self.socket, zmq.POLLIN)
+ self.poller.register(self.web_sock, zmq.POLLIN)
self.monitor.start()
while self.running:
@@ -112,67 +173,93 @@ class ServerWorker(Thread):
if self.socket in sockets:
self.connected = True
msg = self.socket.recv_multipart()
- logger.debug("ServerWorker '%s' received message: %r", self.id, msg)
+ logger.debug("ServerWorker '%s' received message of length: %d.", self.id, len(msg))
filename = None
- # At the moment we don't expect any other message than a video message:
+ # At the moment we don't expect any other message than a start/stop message (of length 4) and a video message:
if len(msg) == 4: # This is a message with start/stop directive and no video data.
- logger.debug("Received start/stop directive: %r", msg)
+ logger.debug("Received start/stop directive: (?)")
client_id, camera_id, metadata = msg[0], msg[1], msg[2]
+ # Convert bytes to str once and for all:
+ client_id = client_id.decode('utf-8')
+ camera_id = camera_id.decode('utf-8')
+ update_clients(client_id, camera_id)
try:
- directive, timestamp = metadata.decode("utf-8").split(":")
- filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4"
- logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename)
+ # Directive and data are fron now on converted to strings:
+ directive, data = metadata.decode("utf-8").split(":")
+ logger.info(
+ "Received directive '%s' with data: %r for client '%s', camera '%s'",
+ directive,
+ data,
+ client_id,
+ camera_id,
+ )
except ValueError:
logger.error("Invalid metadata format: %r", metadata)
+ directive = None
continue
- if directive == "start":
- if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera
- q = Queue()
- logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id)
- video_worker = VideoWorker(filename, q)
- video_worker.start()
- self.video_threads[client_id][camera_id] = video_worker
- elif directive == "stop":
- if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
- logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id)
- self.video_threads[client_id][camera_id].queue.put(None) # Sentinel to stop the thread
- del self.video_threads[client_id][camera_id]
- logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id)
+ if directive == 'rename':
+ old_name, new_name, timestamp = None, None, None
+ try:
+ old_name, new_name, timestamp = data.split(":")
+ logger.info("Renamed video thread from '%s' to '%s'.", old_name, new_name)
+ except ValueError:
+ logger.error("Invalid rename data format: %r", data)
+ continue
+ if old_name and new_name and timestamp:
+ # I think it's better to stop the old thread and start a new one,
+ # rather than reuse the old one as it's less mucking about.
+ self.stop_client(old_name, camera_id)
+ self.start_client(new_name, camera_id, f"{new_name}_{camera_id}-{timestamp}.mkv")
+ to_remove= b':'.join([client_id, camera_id])
+ try:
+ self.web_sock.send_multipart([to_remove, b'', b'']) # Notify webserver of rename
+ except Exception as exc:
+ logger.error("Sending rename notification to websocket failed: %r", exc)
else:
- logger.error("Unknown directive: '%s'", directive)
- elif len(msg) == 3: # This is a video message with data only
- logger.debug("Received video message with data only: %r", msg)
- client_id, camera_id, msg = msg[0], msg[1], msg[2]
+ timestamp = data
+ filename = f"{client_id}_{camera_id}-{timestamp}.mkv"
+ logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename)
+ if directive == "start":
+ self.start_client(client_id, camera_id, filename)
+ elif directive == "stop":
+ self.stop_client(client_id, camera_id)
+ else:
+ logger.error("Unknown directive: %r", directive)
+
+ elif len(msg) == 3: # This is a video message with data
+ logger.debug("Received video message with data only.")
+ client_id, camera_id, content = msg[0], msg[1], msg[2]
+ client_id = client_id.decode('utf-8')
+ camera_id = camera_id.decode('utf-8')
if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
- self.video_threads[client_id][camera_id].queue.put(msg)
+ self.video_threads[client_id][camera_id].queue.put(content)
+ # Send only [client_id, camera_id, jpeg_bytes] to the webserver:
+ # zmq subsciber can subscribe to a topic defined by the first
+ # part of the multipart message, so in order to allow for a
+ # per camera subscription, we need to join client_id and camera_id
+ topic = ':'.join([client_id, camera_id]).encode('utf-8')
+ try:
+ self.web_sock.send_multipart([topic, content], flags=zmq.NOBLOCK)
+ except Exception as exc:
+ logger.error("Sending message to websocket failed: %r", exc)
else:
logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id)
logger.error("Available video threads keys: %r", self.video_threads.keys())
logger.error("Available video threads values: %r", self.video_threads.values())
- elif len(msg) == 2: # This is a ping message from client
- logger.debug("Received ping message: %r", msg)
- client_id, camera_id = msg[0], msg[1]
- if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
- # Respond with a pong message
- try:
- self.socket.send_multipart([client_id, camera_id, b'pong'])
- logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id)
- except zmq.ZMQError as exc:
- logger.error("Failed to send pong: %r", exc)
-
else:
- logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg)
- try:
- self.socket.send_multipart([client_id, camera_id, b'pong'])
- logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id)
- except zmq.ZMQError as exc:
- logger.error("Failed to send pong: %r", exc)
+ logger.warning("Received a message of unexpected length from client. Message length: %d", len(msg))
else:
logger.debug("No message received, polling again ...")
time.sleep(5)
+ if self.web_sock in sockets:
+ frontend_msg = self.web_sock.recv_multipart()
+ logger.info("Received message from frontend: %r", frontend_msg)
+ self.socket.send_multipart(frontend_msg)
+ logger.info("Forwarded message to backend: %r", frontend_msg)
self.monitor.stop()
self.monitor.join()
+
for camera_thread in self.video_threads.values():
for thread in camera_thread.values():
thread.queue.put(None) # Sentinel to unblock queue.get()
@@ -183,7 +270,7 @@ class ServerWorker(Thread):
"""Send control message to a specific client and camera."""
if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message])
- logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message)
+ logger.info("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message)
else:
logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id)
@@ -195,34 +282,62 @@ class ServerWorker(Thread):
class VideoWorker(Thread):
"""Class for video threads."""
- def __init__(self, filename, queue):
+ def __init__(self, client_id, camera_id, filename, queue):
super().__init__(daemon=True)
+ self.context = zmq.Context()
+ self.context.setsockopt(zmq.LINGER, 0)
+ self.client_id = client_id
+ self.camera_id = camera_id
self.filename = filename
self.queue = queue
self.live = True
+ def stop(self):
+ logger.info("VideoWorker %r exiting ...", self.camera_id)
+ self.live = False
+
def run(self):
if os.path.exists(self.filename):
logger.warning("File '%s' already exists, overwriting ...", self.filename)
- with open(self.filename, 'wb') as f:
- logger.info("VideoWorker started writing to file: %s", self.filename)
- while self.live:
- # Simulate video processing
- frame = self.queue.get() # Get frame from queue
- logger.debug("Processing ('writing to a file') frame for camera: %r", frame)
- if frame is None:
- logger.info("VideoWorker received stop signal, exiting ...")
- break
- f.write(frame + b'\n') # Write frame to file
-
- def stop(self):
- logger.info("VideoWorker '' exiting ...")
- self.live = False
+ fourcc = cv2.VideoWriter_fourcc(*'VP80')
+ out = cv2.VideoWriter(self.filename, fourcc, 30.0, (640, 480)) # Assuming 640x480 resolution
+ logger.info("This is the first run, binding websocket ...")
+ while self.live:
+ logger.debug("VideoWorker writing to file: %s", self.filename)
+ frame_bytes = self.queue.get()
+ if frame_bytes is None:
+ logger.debug("Received None, stopping video worker for camera: '%s'", self.camera_id)
+ break
+ frame = cv2.imdecode(numpy.frombuffer(frame_bytes, dtype=numpy.uint8), cv2.IMREAD_COLOR)
+ logger.debug("Processing ('writing to a file') frame for camera: '%s'", self.camera_id)
+ # Write frame to file:
+ out.write(frame)
+
+ # Release video writer
+ out.release()
+ logger.info("VideoWorker finished writing to file: %s", self.filename)
def signal_handler(sig, frame):
worker.stop()
+def update_clients(client_id, camera_id):
+ """Update client and camera dictionary and write to a shared file."""
+ global CLIENTS_DICT
+ if client_id not in CLIENTS_DICT:
+ logger.debug("Client_id not in CLIENTS_DICT, adding an empty list for it.")
+ CLIENTS_DICT[client_id] = []
+ if camera_id not in CLIENTS_DICT[client_id]:
+ logger.debug("Camera_id not in CLIENTS_DICT[%s] list, adding", client_id)
+ CLIENTS_DICT[client_id].append(camera_id)
+ # Atomic write using tempfile. Works only when both files on the same filesystem
+ with tempfile.NamedTemporaryFile('w', dir=TMP_DIR, delete=False) as tf:
+ logger.debug("Dumping to file CLIENTS_DICT: %r", CLIENTS_DICT)
+ json.dump(CLIENTS_DICT, tf)
+ tempname = tf.name
+ os.replace(tempname, CLIENTS_JSON_FILE)
+
+
if __name__ == "__main__":
logger.info("Starting up ...")
@@ -233,3 +348,7 @@ if __name__ == "__main__":
worker.start()
worker.join()
+ try:
+ os.remove(CLIENTS_JSON_FILE)
+ except FileNotFoundError:
+ pass \ No newline at end of file