#!/usr/bin/env python """ A module containing zeromq router for video streaming from a client and sending control messages from server. """ __author__ = "Franoosh Corporation" import os import signal import logging import random from collections import defaultdict, deque import zmq from helpers import CustomLoggingFormatter # TODO: add configparser IP_FRONTEND = '127.0.0.1' IP_BACKEND = '127.0.0.1' PORT_FRONTEND = "5569" PORT_BACKEND = "9979" FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}" BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}" LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" LOGLEVEL = logging.INFO log_formatter = CustomLoggingFormatter() handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') handler.setFormatter(log_formatter) logging.root.addHandler(handler) logging.root.setLevel(LOGLEVEL) logger = logging.getLogger(__name__) logging.basicConfig( filename=LOGFILE, datefmt='%Y-%m-%d %I:%M:%S', level=LOGLEVEL, ) RUNNING = True def custom_proxy(): context = zmq.Context.instance() poller = zmq.Poller() # Set up frontend: frontend_socket = context.socket(zmq.ROUTER) frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) frontend_socket.bind(FRONTEND_ADDR) poller.register(frontend_socket, zmq.POLLIN) # Set up backend: backend_socket = context.socket(zmq.ROUTER) backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) backend_socket.bind(BACKEND_ADDR) poller.register(backend_socket, zmq.POLLIN) awailable_workers = [] client_worker_dict = {} # There are three pending messages dictionaries: # 1. pending_messages_no_worker[client_id] - messages that are pending for a client # 2. pending_messages_client_worker[client_id][worker_id] - messages that are pending for a client with a specific worker # 3. pending_messages_worker_client[worker_id][client_id] - messages that are pending for a worker # To assure delivey of the entire video we need to store messages from start to end until they are delivered # to the client. # If a worker is not available for certain amount of time, we will reassign it to another worker. # For now, there is only one worker. Remember to implement this later. # Complex case: half delivered video to the worker, worker becomes unavailable and we need to reassign it to another worker. pending_messages_no_worker = defaultdict(deque) # Messages that are pending for a client with no worker assigned pending_messages_client_worker = defaultdict(lambda: defaultdict(deque)) pending_messages_worker_client = defaultdict(lambda: defaultdict(deque)) while RUNNING: # Poll both frontend and backend sockets: events = dict(poller.poll(1000)) # If message from client: if frontend_socket in events: # Receive message from client: msg = frontend_socket.recv_multipart() logger.debug("Received message.") client_id, content = msg[0], msg[1:] # Check if client is already connected to worker: try: # Get worker ID for this client from the client-worker dictionary: worker_id = client_worker_dict[client_id] # Check if there are any pending messages for this worker and send them first: while pending_messages_client_worker[client_id][worker_id]: # In order to take a peek at the size of all messages, perhaps check out: # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/ logger.debug( "There are '%d' pending messages from client %r for worker %r", len(pending_messages_client_worker[client_id][worker_id]), client_id, worker_id, ) pending_msg = pending_messages_client_worker[client_id][worker_id].pop() try: logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id) backend_socket.send_multipart([worker_id, client_id, *pending_msg]) except Exception as e: logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e) # Re-queue the message: pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Break to avoid infinite loop: break # If there are still pending messages for this worker: if pending_messages_client_worker[client_id][worker_id]: # Store message for later delivery: pending_messages_client_worker[client_id][worker_id].appendleft(content) # At last send new message to worker: else: try: logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) backend_socket.send_multipart([worker_id, client_id, *content]) except Exception as e: logger.error("Failed to send message to backend, error: %s", e) # Store message for later delivery: # pending_messages_client_worker[client_id][worker_id].appendleft(msg) <- FIXME: this is the bug, no? pending_messages_client_worker[client_id][worker_id].appendleft(content) # If client is not connected to any worker: except KeyError: logger.info("Received ID from client: %r.", client_id) # Check if there are available workers. if awailable_workers: # Assign random worker to client: worker_id = random.choice(awailable_workers) # Update client-worker dictionary: client_worker_dict[client_id] = worker_id # Check if there are any pending messages for this client: if pending_messages_no_worker[client_id]: # Move messages to pending messages for this worker: pending_messages_client_worker[client_id][worker_id].extendleft(msg) logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id) # Check if there are any pending messages for this worker: while pending_messages_client_worker[client_id][worker_id]: pending_msg = pending_messages_client_worker[client_id][worker_id].pop() try: logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id) backend_socket.send_multipart([worker_id, client_id, *pending_msg]) except Exception as e: logger.error("Failed to send pending message to worker: %r. Error: %s", worker_id, e) # Re-queue the message: pending_messages_client_worker[client_id][worker_id].append(pending_msg) break # If there are still pending messages for this worker: if pending_messages_client_worker[client_id][worker_id]: logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery.", client_id, worker_id) # Store message for later delivery: pending_messages_client_worker[client_id][worker_id].appendleft(content) else: logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id) try: logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id) # At last, send the message to worker: backend_socket.send_multipart([worker_id, client_id, *content]) except Exception as e: logger.error("Failed to send message for client %r to backend, requeing. Error: %s", client_id, e) # Store message for later delivery: pending_messages_client_worker[client_id][worker_id].appendleft(msg) else: # Store message for later delivery: pending_messages_no_worker[client_id].append(content) logger.debug("No available workers, storing client '%s' with no worker assigned", client_id) if backend_socket in events: _msg = backend_socket.recv_multipart() # These messages should be safe to log as they are short strings or bytes. logger.debug("Received from worker: %r", _msg) if len(_msg) == 2: # Worker is sending its identity: worker_id, msg = _msg[0], _msg[1:] logger.debug("Received ID from worker: %r, message: %r", worker_id, msg) logger.info("Received ID from worker: %r.", worker_id) # Add worker to available workers list: awailable_workers.append(worker_id) for client, worker in client_worker_dict.items(): # If client has no worker assigned, assign it a new worker: if worker is None: client_worker_dict[client] = worker logger.debug("Assigning worker %r to client %r", worker, client) if pending_messages_no_worker: # If there are pending messages for clients with no worker assigned: for client_id, messages in pending_messages_no_worker.items(): if messages: # Move messages to pending messages for this worker: pending_messages_client_worker[client_id][worker_id].extendleft(messages) # Clear pending messages for this client: pending_messages_no_worker[client_id].clear() logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id) try: # Get the first message for this client and worker: pending_msg = pending_messages_client_worker[client_id][worker_id].pop() logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id) # Send the message to worker: backend_socket.send_multipart([worker_id, client_id, *pending_msg]) except Exception as e: pending_messages_client_worker[client_id][worker_id].append(pending_msg) logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e) else: # Worker is sending a message to client: worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:] logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg) # First check if there are any pending messages for this worker: while pending_messages_worker_client[worker_id][client_id]: pending_msg = pending_messages_worker_client[worker_id][client_id].pop() try: logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, client_id, worker_id) backend_socket.send_multipart([client_id, *pending_msg]) except Exception as e: # It is safe to log the message content as it is a short string or bytes. logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) # Re-queue the message: pending_messages_worker_client[worker_id][client_id].append(pending_msg) # Break to avoid infinite loop: break # If no pending messages for this client, send new message to the client : if not pending_messages_worker_client[worker_id][client_id]: logger.debug("Sending message %r to client '%s'", msg, client_id) try: frontend_socket.send_multipart([client_id, *msg]) logger.debug("Sent message %r to client '%r'", msg, client_id) except Exception as e: pending_messages_worker_client[worker_id][client_id].appendleft(msg) logger.error("Could not deliver message: %r from worker: %r to client: '%r'", msg, worker_id, client_id) def signal_handler(sig, frame): global RUNNING logger.info("Received signal handler %r, stopping ...", signal.Signals(sig).name) RUNNING = False if __name__ == "__main__": signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) logger.info("Starting up ...") custom_proxy()