diff options
Diffstat (limited to 'worker.py')
| -rw-r--r-- | worker.py | 247 |
1 files changed, 183 insertions, 64 deletions
@@ -1,29 +1,51 @@ #!/usr/bin/env python -__author__ = "Franoosh Corporation" """ A module consisting of a zeromq worker receiving video stream via router from clients. Able to send control messages to clients. +Needs libzmq and pyzmq with 'drafts' support. """ import os +import sys from threading import Thread, Event import time import signal import logging from queue import Queue from collections import defaultdict +import tempfile +import json import zmq +import cv2 +import numpy from helpers import CustomLoggingFormatter +__author__ = "Franoosh Corporation" + +# Various constants: # TODO: put them in a config + +HOST = "127.0.0.1" +ZMQPORT = "9979" +# WSPORT = "8000" +WSPORT = "8008" +ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" +WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" +# LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO + +# File paths: -ADDR = "tcp://localhost" -PORT = "9979" -INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}" LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG +CWD = os.getcwd() +TMP_DIR = os.path.join(CWD, "tmp") +CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json') + +# Other: + +CLIENTS_DICT = {} stop_event = Event() @@ -39,6 +61,14 @@ logging.basicConfig( level=LOGLEVEL, ) + +if not os.path.exists(TMP_DIR): + try: + os.makedirs(TMP_DIR) + except Exception as exc: + logger.error("Could not create temporary directory: %r", exc) + sys.exit() + class MonitorTask(Thread): """Monitor task""" def __init__(self, socket): @@ -92,14 +122,45 @@ class ServerWorker(Thread): self.socket.identity = self.id.encode("utf-8") self.monitor = MonitorTask(self.socket) self.video_threads = defaultdict(lambda: defaultdict(dict)) + self.poller = zmq.Poller() + self.web_sock = self.context.socket(zmq.DEALER) self.connected = False self.running = True + def start_client(self, client_id, camera_id, filename): + """ + Start a video thread for new client_id and camera_id. + """ + if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera + q = Queue() + logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) + video_worker = VideoWorker(client_id, camera_id, filename, q) + video_worker.start() + self.video_threads[client_id][camera_id] = video_worker + + def stop_client(self, client_id, camera_id): + """ + Stop video thread for a client_id and camera_id. + """ + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) + self.video_threads[client_id][camera_id].stop() # Stop the thread + del self.video_threads[client_id][camera_id] + logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) + def run(self): + """ + Main loop of the worker. + Full of wonders. + """ logger.debug("ServerWorker '%s' starting ...", self.id) - self.socket.connect(INITIAL_BACKEND_ADDR) - self.poller = zmq.Poller() + self.socket.connect(ZMQ_BACKEND_ADDR) + try: + self.web_sock.bind(WEB_BACKEND_ADDR) + except Exception as exc: + logger.error("Connection to zmq websocket failed: %r", exc) self.poller.register(self.socket, zmq.POLLIN) + self.poller.register(self.web_sock, zmq.POLLIN) self.monitor.start() while self.running: @@ -112,67 +173,93 @@ class ServerWorker(Thread): if self.socket in sockets: self.connected = True msg = self.socket.recv_multipart() - logger.debug("ServerWorker '%s' received message: %r", self.id, msg) + logger.debug("ServerWorker '%s' received message of length: %d.", self.id, len(msg)) filename = None - # At the moment we don't expect any other message than a video message: + # At the moment we don't expect any other message than a start/stop message (of length 4) and a video message: if len(msg) == 4: # This is a message with start/stop directive and no video data. - logger.debug("Received start/stop directive: %r", msg) + logger.debug("Received start/stop directive: (?)") client_id, camera_id, metadata = msg[0], msg[1], msg[2] + # Convert bytes to str once and for all: + client_id = client_id.decode('utf-8') + camera_id = camera_id.decode('utf-8') + update_clients(client_id, camera_id) try: - directive, timestamp = metadata.decode("utf-8").split(":") - filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4" - logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) + # Directive and data are fron now on converted to strings: + directive, data = metadata.decode("utf-8").split(":") + logger.info( + "Received directive '%s' with data: %r for client '%s', camera '%s'", + directive, + data, + client_id, + camera_id, + ) except ValueError: logger.error("Invalid metadata format: %r", metadata) + directive = None continue - if directive == "start": - if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera - q = Queue() - logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) - video_worker = VideoWorker(filename, q) - video_worker.start() - self.video_threads[client_id][camera_id] = video_worker - elif directive == "stop": - if client_id in self.video_threads and camera_id in self.video_threads[client_id]: - logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) - self.video_threads[client_id][camera_id].queue.put(None) # Sentinel to stop the thread - del self.video_threads[client_id][camera_id] - logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) + if directive == 'rename': + old_name, new_name, timestamp = None, None, None + try: + old_name, new_name, timestamp = data.split(":") + logger.info("Renamed video thread from '%s' to '%s'.", old_name, new_name) + except ValueError: + logger.error("Invalid rename data format: %r", data) + continue + if old_name and new_name and timestamp: + # I think it's better to stop the old thread and start a new one, + # rather than reuse the old one as it's less mucking about. + self.stop_client(old_name, camera_id) + self.start_client(new_name, camera_id, f"{new_name}_{camera_id}-{timestamp}.mkv") + to_remove= b':'.join([client_id, camera_id]) + try: + self.web_sock.send_multipart([to_remove, b'', b'']) # Notify webserver of rename + except Exception as exc: + logger.error("Sending rename notification to websocket failed: %r", exc) else: - logger.error("Unknown directive: '%s'", directive) - elif len(msg) == 3: # This is a video message with data only - logger.debug("Received video message with data only: %r", msg) - client_id, camera_id, msg = msg[0], msg[1], msg[2] + timestamp = data + filename = f"{client_id}_{camera_id}-{timestamp}.mkv" + logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) + if directive == "start": + self.start_client(client_id, camera_id, filename) + elif directive == "stop": + self.stop_client(client_id, camera_id) + else: + logger.error("Unknown directive: %r", directive) + + elif len(msg) == 3: # This is a video message with data + logger.debug("Received video message with data only.") + client_id, camera_id, content = msg[0], msg[1], msg[2] + client_id = client_id.decode('utf-8') + camera_id = camera_id.decode('utf-8') if client_id in self.video_threads and camera_id in self.video_threads[client_id]: - self.video_threads[client_id][camera_id].queue.put(msg) + self.video_threads[client_id][camera_id].queue.put(content) + # Send only [client_id, camera_id, jpeg_bytes] to the webserver: + # zmq subsciber can subscribe to a topic defined by the first + # part of the multipart message, so in order to allow for a + # per camera subscription, we need to join client_id and camera_id + topic = ':'.join([client_id, camera_id]).encode('utf-8') + try: + self.web_sock.send_multipart([topic, content], flags=zmq.NOBLOCK) + except Exception as exc: + logger.error("Sending message to websocket failed: %r", exc) else: logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) logger.error("Available video threads keys: %r", self.video_threads.keys()) logger.error("Available video threads values: %r", self.video_threads.values()) - elif len(msg) == 2: # This is a ping message from client - logger.debug("Received ping message: %r", msg) - client_id, camera_id = msg[0], msg[1] - if client_id in self.video_threads and camera_id in self.video_threads[client_id]: - # Respond with a pong message - try: - self.socket.send_multipart([client_id, camera_id, b'pong']) - logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) - except zmq.ZMQError as exc: - logger.error("Failed to send pong: %r", exc) - else: - logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg) - try: - self.socket.send_multipart([client_id, camera_id, b'pong']) - logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) - except zmq.ZMQError as exc: - logger.error("Failed to send pong: %r", exc) + logger.warning("Received a message of unexpected length from client. Message length: %d", len(msg)) else: logger.debug("No message received, polling again ...") time.sleep(5) + if self.web_sock in sockets: + frontend_msg = self.web_sock.recv_multipart() + logger.info("Received message from frontend: %r", frontend_msg) + self.socket.send_multipart(frontend_msg) + logger.info("Forwarded message to backend: %r", frontend_msg) self.monitor.stop() self.monitor.join() + for camera_thread in self.video_threads.values(): for thread in camera_thread.values(): thread.queue.put(None) # Sentinel to unblock queue.get() @@ -183,7 +270,7 @@ class ServerWorker(Thread): """Send control message to a specific client and camera.""" if client_id in self.video_threads and camera_id in self.video_threads[client_id]: self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message]) - logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message) + logger.info("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message) else: logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) @@ -195,34 +282,62 @@ class ServerWorker(Thread): class VideoWorker(Thread): """Class for video threads.""" - def __init__(self, filename, queue): + def __init__(self, client_id, camera_id, filename, queue): super().__init__(daemon=True) + self.context = zmq.Context() + self.context.setsockopt(zmq.LINGER, 0) + self.client_id = client_id + self.camera_id = camera_id self.filename = filename self.queue = queue self.live = True + def stop(self): + logger.info("VideoWorker %r exiting ...", self.camera_id) + self.live = False + def run(self): if os.path.exists(self.filename): logger.warning("File '%s' already exists, overwriting ...", self.filename) - with open(self.filename, 'wb') as f: - logger.info("VideoWorker started writing to file: %s", self.filename) - while self.live: - # Simulate video processing - frame = self.queue.get() # Get frame from queue - logger.debug("Processing ('writing to a file') frame for camera: %r", frame) - if frame is None: - logger.info("VideoWorker received stop signal, exiting ...") - break - f.write(frame + b'\n') # Write frame to file - - def stop(self): - logger.info("VideoWorker '' exiting ...") - self.live = False + fourcc = cv2.VideoWriter_fourcc(*'VP80') + out = cv2.VideoWriter(self.filename, fourcc, 30.0, (640, 480)) # Assuming 640x480 resolution + logger.info("This is the first run, binding websocket ...") + while self.live: + logger.debug("VideoWorker writing to file: %s", self.filename) + frame_bytes = self.queue.get() + if frame_bytes is None: + logger.debug("Received None, stopping video worker for camera: '%s'", self.camera_id) + break + frame = cv2.imdecode(numpy.frombuffer(frame_bytes, dtype=numpy.uint8), cv2.IMREAD_COLOR) + logger.debug("Processing ('writing to a file') frame for camera: '%s'", self.camera_id) + # Write frame to file: + out.write(frame) + + # Release video writer + out.release() + logger.info("VideoWorker finished writing to file: %s", self.filename) def signal_handler(sig, frame): worker.stop() +def update_clients(client_id, camera_id): + """Update client and camera dictionary and write to a shared file.""" + global CLIENTS_DICT + if client_id not in CLIENTS_DICT: + logger.debug("Client_id not in CLIENTS_DICT, adding an empty list for it.") + CLIENTS_DICT[client_id] = [] + if camera_id not in CLIENTS_DICT[client_id]: + logger.debug("Camera_id not in CLIENTS_DICT[%s] list, adding", client_id) + CLIENTS_DICT[client_id].append(camera_id) + # Atomic write using tempfile. Works only when both files on the same filesystem + with tempfile.NamedTemporaryFile('w', dir=TMP_DIR, delete=False) as tf: + logger.debug("Dumping to file CLIENTS_DICT: %r", CLIENTS_DICT) + json.dump(CLIENTS_DICT, tf) + tempname = tf.name + os.replace(tempname, CLIENTS_JSON_FILE) + + if __name__ == "__main__": logger.info("Starting up ...") @@ -233,3 +348,7 @@ if __name__ == "__main__": worker.start() worker.join() + try: + os.remove(CLIENTS_JSON_FILE) + except FileNotFoundError: + pass
\ No newline at end of file |
