aboutsummaryrefslogtreecommitdiff
path: root/router.py
diff options
context:
space:
mode:
Diffstat (limited to 'router.py')
-rw-r--r--router.py160
1 files changed, 99 insertions, 61 deletions
diff --git a/router.py b/router.py
index 61b2f72..12d18c3 100644
--- a/router.py
+++ b/router.py
@@ -18,17 +18,17 @@ from helpers import CustomLoggingFormatter
# TODO: add configparser
-ADDR_FRONTEND = 'tcp://localhost'
-ADDR_BACKEND = 'tcp://localhost'
+IP_FRONTEND = '127.0.0.1'
+IP_BACKEND = '127.0.0.1'
PORT_FRONTEND = "5569"
PORT_BACKEND = "9979"
-INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}"
-INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}"
+FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}"
+BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}"
LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log"
-LOGLEVEL = logging.DEBUG
+LOGLEVEL = logging.INFO
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
@@ -52,13 +52,13 @@ def custom_proxy():
# Set up frontend:
frontend_socket = context.socket(zmq.ROUTER)
frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
- frontend_socket.bind(INITIAL_FRONTEND_ADDR)
+ frontend_socket.bind(FRONTEND_ADDR)
poller.register(frontend_socket, zmq.POLLIN)
# Set up backend:
backend_socket = context.socket(zmq.ROUTER)
backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
- backend_socket.bind(INITIAL_BACKEND_ADDR)
+ backend_socket.bind(BACKEND_ADDR)
poller.register(backend_socket, zmq.POLLIN)
awailable_workers = []
@@ -78,101 +78,137 @@ def custom_proxy():
pending_messages_worker_client = defaultdict(lambda: defaultdict(deque))
while RUNNING:
- events = dict(poller.poll(1000)) # Poll both frontend and backend sockets:
- if frontend_socket in events: # If message from client:
- msg = frontend_socket.recv_multipart() # Receive message from client:
- logger.debug("Received message: %r", msg)
+ # Poll both frontend and backend sockets:
+ events = dict(poller.poll(1000))
+ # If message from client:
+ if frontend_socket in events:
+ # Receive message from client:
+ msg = frontend_socket.recv_multipart()
+ logger.debug("Received message.")
client_id, content = msg[0], msg[1:]
- if not client_id in client_worker_dict: # If client is not in client-worker dictionary:
- logger.info("Received ID from client: %r.", client_id)
# Check if client is already connected to worker:
try:
- worker_id = client_worker_dict[client_id] # Get worker ID for this client from the client-worker dictionary
- while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker and send them first:
- logger.debug("There are pending messages from client %r for worker %r", client_id, worker_id)
+ # Get worker ID for this client from the client-worker dictionary:
+ worker_id = client_worker_dict[client_id]
+ # Check if there are any pending messages for this worker and send them first:
+ while pending_messages_client_worker[client_id][worker_id]:
+ # In order to take a peek at the size of all messages, perhaps check out:
+ # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/
+ logger.debug(
+ "There are '%d' pending messages from client %r for worker %r",
+ len(pending_messages_client_worker[client_id][worker_id]),
+ client_id,
+ worker_id,
+ )
pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
try:
- logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id)
+ logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)
backend_socket.send_multipart([worker_id, client_id, *pending_msg])
except Exception as e:
- logger.error("Failed to send pending message: %r to worker: %r, error: %r", pending_msg, worker_id, e)
- pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message
- break # Break to avoid infinite loop
- if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker:
- pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery
+ logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e)
+ # Re-queue the message:
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg)
+ # Break to avoid infinite loop:
+ break
+ # If there are still pending messages for this worker:
+ if pending_messages_client_worker[client_id][worker_id]:
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
+ # At last send new message to worker:
else:
try:
- logger.debug("No more pending messages for client %r, sending new message to worker %r: %r", client_id, worker_id, msg)
- backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker:
+ logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id)
+ backend_socket.send_multipart([worker_id, client_id, *content])
except Exception as e:
- logger.error("Failed to send message to backend: %r, error: %s", msg, e)
- pending_messages_client_worker[client_id][worker_id].appendleft(msg) # Store message for later delivery
- except KeyError: # If client is not connected to any worker:
- logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id)
- if awailable_workers: # If there are available workers:
- worker_id = random.choice(awailable_workers) # Assign random worker to client
- client_worker_dict[client_id] = worker_id # Update client-worker dictionary
- if pending_messages_no_worker[client_id]: # Check if there are any pending messages for this client:
- pending_messages_client_worker[client_id][worker_id].extendleft(msg) # Move messages to pending messages for this worker
+ logger.error("Failed to send message to backend, error: %s", e)
+ # Store message for later delivery:
+ # pending_messages_client_worker[client_id][worker_id].appendleft(msg) <- FIXME: this is the bug, no?
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
+ # If client is not connected to any worker:
+ except KeyError:
+ logger.info("Received ID from client: %r.", client_id)
+ # Check if there are available workers.
+ if awailable_workers:
+ # Assign random worker to client:
+ worker_id = random.choice(awailable_workers)
+ # Update client-worker dictionary:
+ client_worker_dict[client_id] = worker_id
+ # Check if there are any pending messages for this client:
+ if pending_messages_no_worker[client_id]:
+ # Move messages to pending messages for this worker:
+ pending_messages_client_worker[client_id][worker_id].extendleft(msg)
logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id)
- while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker:
+ # Check if there are any pending messages for this worker:
+ while pending_messages_client_worker[client_id][worker_id]:
pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
try:
- logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id)
+ logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)
backend_socket.send_multipart([worker_id, client_id, *pending_msg])
except Exception as e:
- logger.error("Failed to send pending message: %r to worker: %r. Error: %s", pending_msg, worker_id, e)
- pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message
+ logger.error("Failed to send pending message to worker: %r. Error: %s", worker_id, e)
+ # Re-queue the message:
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg)
break
- if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker:
- logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery: %r", client_id, worker_id, content)
- pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery
+ # If there are still pending messages for this worker:
+ if pending_messages_client_worker[client_id][worker_id]:
+ logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery.", client_id, worker_id)
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
else:
- logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg)
+ logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id)
try:
- logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg)
- backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker:
+ logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id)
+ # At last, send the message to worker:
+ backend_socket.send_multipart([worker_id, client_id, *content])
except Exception as e:
- logger.error("Failed to send message %r for client %r to backend, requeing. Error: %s", msg, client_id, e)
- pending_messages_client_worker[client_id][worker_id].appendleft(msg) # Store message for later delivery
+ logger.error("Failed to send message for client %r to backend, requeing. Error: %s", client_id, e)
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(msg)
- else: # If no workers are available, assign a new worker:
- pending_messages_no_worker[client_id].append(content) # Store message for later delivery
+ else:
+ # Store message for later delivery:
+ pending_messages_no_worker[client_id].append(content)
logger.debug("No available workers, storing client '%s' with no worker assigned", client_id)
if backend_socket in events:
_msg = backend_socket.recv_multipart()
+ # These messages should be safe to log as they are short strings or bytes.
logger.debug("Received from worker: %r", _msg)
if len(_msg) == 2: # Worker is sending its identity:
worker_id, msg = _msg[0], _msg[1:]
logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)
logger.info("Received ID from worker: %r.", worker_id)
- awailable_workers.append(worker_id) # Add worker to available workers list
+ # Add worker to available workers list:
+ awailable_workers.append(worker_id)
for client, worker in client_worker_dict.items():
- if worker is None: # If client has no worker assigned:
+ # If client has no worker assigned, assign it a new worker:
+ if worker is None:
client_worker_dict[client] = worker
logger.debug("Assigning worker %r to client %r", worker, client)
if pending_messages_no_worker:
# If there are pending messages for clients with no worker assigned:
for client_id, messages in pending_messages_no_worker.items():
if messages:
- pending_messages_client_worker[client_id][worker_id].extendleft(messages) # Move messages to pending messages for this worker
- pending_messages_no_worker[client_id].clear() # Clear pending messages for this client
+ # Move messages to pending messages for this worker:
+ pending_messages_client_worker[client_id][worker_id].extendleft(messages)
+ # Clear pending messages for this client:
+ pending_messages_no_worker[client_id].clear()
logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id)
try:
- pending_msg = pending_messages_client_worker[client_id][worker_id].pop() # Get the first message for this client and worker
- logger.debug(
- "Sending pending message %r to worker %r for client '%r'", pending_msg, worker_id, client_id)
- backend_socket.send_multipart([worker_id, client_id, *pending_msg]) # Send the message to worker
+ # Get the first message for this client and worker:
+ pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
+ logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id)
+ # Send the message to worker:
+ backend_socket.send_multipart([worker_id, client_id, *pending_msg])
except Exception as e:
pending_messages_client_worker[client_id][worker_id].append(pending_msg)
- logger.error("Failed to send pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e)
+ logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e)
else: # Worker is sending a message to client:
worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:]
logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg)
# First check if there are any pending messages for this worker:
- while pending_messages_worker_client[worker_id][client_id]: # Check if there are any pending messages for this worker:
+ while pending_messages_worker_client[worker_id][client_id]:
pending_msg = pending_messages_worker_client[worker_id][client_id].pop()
try:
logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, client_id, worker_id)
@@ -180,10 +216,12 @@ def custom_proxy():
except Exception as e:
# It is safe to log the message content as it is a short string or bytes.
logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e)
- pending_messages_worker_client[worker_id][client_id].append(pending_msg) # Re-queue the message
- break # Break to avoid infinite loop
- # Now send the message to the client if no pending messages for this client:
- if not pending_messages_worker_client[worker_id][client_id]: # If there are no pending messages for this worker:
+ # Re-queue the message:
+ pending_messages_worker_client[worker_id][client_id].append(pending_msg)
+ # Break to avoid infinite loop:
+ break
+ # If no pending messages for this client, send new message to the client :
+ if not pending_messages_worker_client[worker_id][client_id]:
logger.debug("Sending message %r to client '%s'", msg, client_id)
try:
frontend_socket.send_multipart([client_id, *msg])