diff options
Diffstat (limited to 'router.py')
| -rw-r--r-- | router.py | 160 |
1 files changed, 99 insertions, 61 deletions
@@ -18,17 +18,17 @@ from helpers import CustomLoggingFormatter # TODO: add configparser -ADDR_FRONTEND = 'tcp://localhost' -ADDR_BACKEND = 'tcp://localhost' +IP_FRONTEND = '127.0.0.1' +IP_BACKEND = '127.0.0.1' PORT_FRONTEND = "5569" PORT_BACKEND = "9979" -INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}" -INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}" +FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}" +BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}" LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO log_formatter = CustomLoggingFormatter() handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') @@ -52,13 +52,13 @@ def custom_proxy(): # Set up frontend: frontend_socket = context.socket(zmq.ROUTER) frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) - frontend_socket.bind(INITIAL_FRONTEND_ADDR) + frontend_socket.bind(FRONTEND_ADDR) poller.register(frontend_socket, zmq.POLLIN) # Set up backend: backend_socket = context.socket(zmq.ROUTER) backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) - backend_socket.bind(INITIAL_BACKEND_ADDR) + backend_socket.bind(BACKEND_ADDR) poller.register(backend_socket, zmq.POLLIN) awailable_workers = [] @@ -78,101 +78,137 @@ def custom_proxy(): pending_messages_worker_client = defaultdict(lambda: defaultdict(deque)) while RUNNING: - events = dict(poller.poll(1000)) # Poll both frontend and backend sockets: - if frontend_socket in events: # If message from client: - msg = frontend_socket.recv_multipart() # Receive message from client: - logger.debug("Received message: %r", msg) + # Poll both frontend and backend sockets: + events = dict(poller.poll(1000)) + # If message from client: + if frontend_socket in events: + # Receive message from client: + msg = frontend_socket.recv_multipart() + logger.debug("Received message.") client_id, content = msg[0], msg[1:] - if not client_id in client_worker_dict: # If client is not in client-worker dictionary: - logger.info("Received ID from client: %r.", client_id) # Check if client is already connected to worker: try: - worker_id = client_worker_dict[client_id] # Get worker ID for this client from the client-worker dictionary - while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker and send them first: - logger.debug("There are pending messages from client %r for worker %r", client_id, worker_id) + # Get worker ID for this client from the client-worker dictionary: + worker_id = client_worker_dict[client_id] + # Check if there are any pending messages for this worker and send them first: + while pending_messages_client_worker[client_id][worker_id]: + # In order to take a peek at the size of all messages, perhaps check out: + # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/ + logger.debug( + "There are '%d' pending messages from client %r for worker %r", + len(pending_messages_client_worker[client_id][worker_id]), + client_id, + worker_id, + ) pending_msg = pending_messages_client_worker[client_id][worker_id].pop() try: - logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) + logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id) backend_socket.send_multipart([worker_id, client_id, *pending_msg]) except Exception as e: - logger.error("Failed to send pending message: %r to worker: %r, error: %r", pending_msg, worker_id, e) - pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message - break # Break to avoid infinite loop - if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker: - pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery + logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e) + # Re-queue the message: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) + # Break to avoid infinite loop: + break + # If there are still pending messages for this worker: + if pending_messages_client_worker[client_id][worker_id]: + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(content) + # At last send new message to worker: else: try: - logger.debug("No more pending messages for client %r, sending new message to worker %r: %r", client_id, worker_id, msg) - backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: + logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) + backend_socket.send_multipart([worker_id, client_id, *content]) except Exception as e: - logger.error("Failed to send message to backend: %r, error: %s", msg, e) - pending_messages_client_worker[client_id][worker_id].appendleft(msg) # Store message for later delivery - except KeyError: # If client is not connected to any worker: - logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id) - if awailable_workers: # If there are available workers: - worker_id = random.choice(awailable_workers) # Assign random worker to client - client_worker_dict[client_id] = worker_id # Update client-worker dictionary - if pending_messages_no_worker[client_id]: # Check if there are any pending messages for this client: - pending_messages_client_worker[client_id][worker_id].extendleft(msg) # Move messages to pending messages for this worker + logger.error("Failed to send message to backend, error: %s", e) + # Store message for later delivery: + # pending_messages_client_worker[client_id][worker_id].appendleft(msg) <- FIXME: this is the bug, no? + pending_messages_client_worker[client_id][worker_id].appendleft(content) + # If client is not connected to any worker: + except KeyError: + logger.info("Received ID from client: %r.", client_id) + # Check if there are available workers. + if awailable_workers: + # Assign random worker to client: + worker_id = random.choice(awailable_workers) + # Update client-worker dictionary: + client_worker_dict[client_id] = worker_id + # Check if there are any pending messages for this client: + if pending_messages_no_worker[client_id]: + # Move messages to pending messages for this worker: + pending_messages_client_worker[client_id][worker_id].extendleft(msg) logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id) - while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker: + # Check if there are any pending messages for this worker: + while pending_messages_client_worker[client_id][worker_id]: pending_msg = pending_messages_client_worker[client_id][worker_id].pop() try: - logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) + logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id) backend_socket.send_multipart([worker_id, client_id, *pending_msg]) except Exception as e: - logger.error("Failed to send pending message: %r to worker: %r. Error: %s", pending_msg, worker_id, e) - pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message + logger.error("Failed to send pending message to worker: %r. Error: %s", worker_id, e) + # Re-queue the message: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) break - if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker: - logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery: %r", client_id, worker_id, content) - pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery + # If there are still pending messages for this worker: + if pending_messages_client_worker[client_id][worker_id]: + logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery.", client_id, worker_id) + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(content) else: - logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) + logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id) try: - logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) - backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: + logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id) + # At last, send the message to worker: + backend_socket.send_multipart([worker_id, client_id, *content]) except Exception as e: - logger.error("Failed to send message %r for client %r to backend, requeing. Error: %s", msg, client_id, e) - pending_messages_client_worker[client_id][worker_id].appendleft(msg) # Store message for later delivery + logger.error("Failed to send message for client %r to backend, requeing. Error: %s", client_id, e) + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(msg) - else: # If no workers are available, assign a new worker: - pending_messages_no_worker[client_id].append(content) # Store message for later delivery + else: + # Store message for later delivery: + pending_messages_no_worker[client_id].append(content) logger.debug("No available workers, storing client '%s' with no worker assigned", client_id) if backend_socket in events: _msg = backend_socket.recv_multipart() + # These messages should be safe to log as they are short strings or bytes. logger.debug("Received from worker: %r", _msg) if len(_msg) == 2: # Worker is sending its identity: worker_id, msg = _msg[0], _msg[1:] logger.debug("Received ID from worker: %r, message: %r", worker_id, msg) logger.info("Received ID from worker: %r.", worker_id) - awailable_workers.append(worker_id) # Add worker to available workers list + # Add worker to available workers list: + awailable_workers.append(worker_id) for client, worker in client_worker_dict.items(): - if worker is None: # If client has no worker assigned: + # If client has no worker assigned, assign it a new worker: + if worker is None: client_worker_dict[client] = worker logger.debug("Assigning worker %r to client %r", worker, client) if pending_messages_no_worker: # If there are pending messages for clients with no worker assigned: for client_id, messages in pending_messages_no_worker.items(): if messages: - pending_messages_client_worker[client_id][worker_id].extendleft(messages) # Move messages to pending messages for this worker - pending_messages_no_worker[client_id].clear() # Clear pending messages for this client + # Move messages to pending messages for this worker: + pending_messages_client_worker[client_id][worker_id].extendleft(messages) + # Clear pending messages for this client: + pending_messages_no_worker[client_id].clear() logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id) try: - pending_msg = pending_messages_client_worker[client_id][worker_id].pop() # Get the first message for this client and worker - logger.debug( - "Sending pending message %r to worker %r for client '%r'", pending_msg, worker_id, client_id) - backend_socket.send_multipart([worker_id, client_id, *pending_msg]) # Send the message to worker + # Get the first message for this client and worker: + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() + logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id) + # Send the message to worker: + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) except Exception as e: pending_messages_client_worker[client_id][worker_id].append(pending_msg) - logger.error("Failed to send pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) + logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e) else: # Worker is sending a message to client: worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:] logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg) # First check if there are any pending messages for this worker: - while pending_messages_worker_client[worker_id][client_id]: # Check if there are any pending messages for this worker: + while pending_messages_worker_client[worker_id][client_id]: pending_msg = pending_messages_worker_client[worker_id][client_id].pop() try: logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, client_id, worker_id) @@ -180,10 +216,12 @@ def custom_proxy(): except Exception as e: # It is safe to log the message content as it is a short string or bytes. logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) - pending_messages_worker_client[worker_id][client_id].append(pending_msg) # Re-queue the message - break # Break to avoid infinite loop - # Now send the message to the client if no pending messages for this client: - if not pending_messages_worker_client[worker_id][client_id]: # If there are no pending messages for this worker: + # Re-queue the message: + pending_messages_worker_client[worker_id][client_id].append(pending_msg) + # Break to avoid infinite loop: + break + # If no pending messages for this client, send new message to the client : + if not pending_messages_worker_client[worker_id][client_id]: logger.debug("Sending message %r to client '%s'", msg, client_id) try: frontend_socket.send_multipart([client_id, *msg]) |
