aboutsummaryrefslogtreecommitdiff
path: root/router.py
blob: 12d18c3e8bdfbe7ed9158efc24ae484e4032880e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
#!/usr/bin/env python

"""
A module containing a ZeroMQ router that forwards video streams
from clients and relays control messages from the server.
"""

__author__ = "Franoosh Corporation"

import os
import signal
import logging
import random
from collections import defaultdict, deque
import zmq

from helpers import CustomLoggingFormatter

# TODO: add configparser

# Addresses the two ROUTER sockets bind to: clients connect to the frontend,
# workers connect to the backend.
IP_FRONTEND = '127.0.0.1'
IP_BACKEND = '127.0.0.1'

PORT_FRONTEND = "5569"
PORT_BACKEND = "9979"

FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}"
BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}"

# The log file is named after this script ("<script name>.log"); the path is
# relative, so it is created in the current working directory.
LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log"
LOGLEVEL = logging.INFO

# Attach a file handler with the project's formatter to the root logger.
# NOTE: a logging.basicConfig(filename=..., datefmt=..., level=...) call used
# to follow this setup. basicConfig() does nothing once the root logger has a
# handler, so that call (and its datefmt) never took effect and was removed.
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
handler.setFormatter(log_formatter)
logging.root.addHandler(handler)
logging.root.setLevel(LOGLEVEL)
logger = logging.getLogger(__name__)


# Main-loop flag: custom_proxy() keeps polling while this is True;
# signal_handler() sets it to False to request shutdown.
RUNNING = True

def custom_proxy():
    """
    Route messages between clients (frontend) and workers (backend).

    Two ROUTER sockets are bound, one to FRONTEND_ADDR and one to
    BACKEND_ADDR, and polled until the module-level RUNNING flag is cleared.

    Message layout as handled by this function:
      * From a client: [client_id, *content]. Forwarded to the client's
        worker as [worker_id, client_id, *content].
      * From a worker, exactly two frames: [worker_id, payload]. Treated as
        the worker announcing itself; the worker is added to the pool.
      * From a worker, any other length: [worker_id, client_id, *payload].
        Forwarded to the client as [client_id, *payload].

    Messages that cannot be delivered are kept in deques and retried the next
    time traffic for the same peer arrives. All delivery queues follow one
    convention: new messages are pushed on the left, the oldest message is
    popped from the right, and a message whose send failed is pushed back on
    the right so ordering is preserved.

    Returns:
        None. Both sockets are closed when the loop ends or an error escapes.

    Raises:
        zmq.ZMQError: if binding, polling or receiving fails. Send failures
            are caught, logged and the message is re-queued.
    """
    context = zmq.Context.instance()
    poller = zmq.Poller()

    frontend_socket = context.socket(zmq.ROUTER)
    backend_socket = context.socket(zmq.ROUTER)

    try:
        # Set up frontend.
        # ROUTER_MANDATORY makes a send to an unroutable identity raise
        # instead of being dropped silently, which the re-queue logic needs.
        frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
        frontend_socket.bind(FRONTEND_ADDR)
        poller.register(frontend_socket, zmq.POLLIN)

        # Set up backend:
        backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
        backend_socket.bind(BACKEND_ADDR)
        poller.register(backend_socket, zmq.POLLIN)

        available_workers = []
        client_worker_dict = {}
        # There are three pending-message stores:
        # 1. pending_messages_no_worker[client_id] - messages from a client
        #    that has no worker assigned yet.
        # 2. pending_messages_client_worker[client_id][worker_id] - messages
        #    from a client waiting to be delivered to its worker.
        # 3. pending_messages_worker_client[worker_id][client_id] - messages
        #    from a worker waiting to be delivered to a client.
        # To ensure delivery of the entire video, messages are stored from
        # start to end until they are delivered.
        # TODO: if a worker is unavailable for a certain amount of time,
        # reassign its clients to another worker. The hard case is a video
        # that was only half delivered to the worker that went away.
        pending_messages_no_worker = defaultdict(deque)
        pending_messages_client_worker = defaultdict(lambda: defaultdict(deque))
        pending_messages_worker_client = defaultdict(lambda: defaultdict(deque))

        while RUNNING:
            # Poll both sockets; the timeout lets the loop notice RUNNING
            # being cleared even when there is no traffic.
            events = dict(poller.poll(1000))

            # ---- Message from a client -------------------------------------
            if frontend_socket in events:
                msg = frontend_socket.recv_multipart()
                logger.debug("Received message.")
                client_id, content = msg[0], msg[1:]

                worker_id = client_worker_dict.get(client_id)
                if worker_id is None:
                    # Client is not connected to any worker yet.
                    logger.info("Received ID from client: %r.", client_id)
                    if available_workers:
                        # Assign random worker to client:
                        worker_id = random.choice(available_workers)
                        client_worker_dict[client_id] = worker_id
                        backlog = pending_messages_no_worker[client_id]
                        if backlog:
                            # Move the stored backlog (oldest is leftmost) into
                            # the delivery queue; extendleft keeps the oldest
                            # message rightmost, i.e. first to be popped.
                            pending_messages_client_worker[client_id][worker_id].extendleft(backlog)
                            backlog.clear()
                            logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id)

                if worker_id is None:
                    # Still no worker: store message for later delivery.
                    pending_messages_no_worker[client_id].append(content)
                    logger.debug("No available workers, storing client '%s' with no worker assigned", client_id)
                else:
                    queue = pending_messages_client_worker[client_id][worker_id]
                    # Deliver older pending messages first:
                    while queue:
                        # In order to take a peek at the size of all messages, perhaps check out:
                        # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/
                        logger.debug(
                            "There are '%d' pending messages from client %r for worker %r",
                            len(queue),
                            client_id,
                            worker_id,
                        )
                        pending_msg = queue.pop()
                        try:
                            logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)
                            backend_socket.send_multipart([worker_id, client_id, *pending_msg])
                        except zmq.ZMQError as e:
                            logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e)
                            # Re-queue the message and stop to avoid an infinite loop:
                            queue.append(pending_msg)
                            break
                    if queue:
                        # Older messages are still waiting; keep ordering by
                        # queueing the new one behind them.
                        queue.appendleft(content)
                    else:
                        try:
                            logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id)
                            backend_socket.send_multipart([worker_id, client_id, *content])
                        except zmq.ZMQError as e:
                            logger.error("Failed to send message to backend, error: %s", e)
                            # Store only the payload; the client identity frame
                            # is re-added when the message is sent.
                            queue.appendleft(content)

            # ---- Message from a worker -------------------------------------
            if backend_socket in events:
                _msg = backend_socket.recv_multipart()
                # These messages should be safe to log as they are short strings or bytes.
                logger.debug("Received from worker: %r", _msg)
                if len(_msg) == 2:  # Worker is sending its identity:
                    worker_id, msg = _msg[0], _msg[1:]
                    logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)
                    logger.info("Received ID from worker: %r.", worker_id)
                    # Register each worker once so that a repeated
                    # announcement does not bias random.choice().
                    if worker_id not in available_workers:
                        available_workers.append(worker_id)
                    for client, worker in client_worker_dict.items():
                        # If client has no worker assigned, assign it this worker:
                        if worker is None:
                            client_worker_dict[client] = worker_id
                            logger.debug("Assigning worker %r to client %r", worker_id, client)
                    # Hand over messages of clients that were waiting for a worker:
                    for client_id, messages in pending_messages_no_worker.items():
                        if not messages:
                            continue
                        # Remember the assignment so later messages from this
                        # client go to the worker that holds its queue.
                        client_worker_dict[client_id] = worker_id
                        queue = pending_messages_client_worker[client_id][worker_id]
                        queue.extendleft(messages)
                        messages.clear()
                        logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id)
                        # Deliver the whole backlog, oldest first:
                        while queue:
                            pending_msg = queue.pop()
                            try:
                                logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id)
                                backend_socket.send_multipart([worker_id, client_id, *pending_msg])
                            except zmq.ZMQError as e:
                                queue.append(pending_msg)
                                logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e)
                                break

                else:  # Worker is sending a message to client:
                    worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:]
                    logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg)
                    queue = pending_messages_worker_client[worker_id][client_id]
                    # First deliver any older pending messages for this client:
                    while queue:
                        pending_msg = queue.pop()
                        try:
                            logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, worker_id, client_id)
                            # Clients are reachable through the frontend socket only.
                            frontend_socket.send_multipart([client_id, *pending_msg])
                        except zmq.ZMQError as e:
                            # It is safe to log the message content as it is a short string or bytes.
                            logger.error("Could not deliver pending message: %r to client: %r. Error: %r", pending_msg, client_id, e)
                            # Re-queue the message and stop to avoid an infinite loop:
                            queue.append(pending_msg)
                            break
                    if queue:
                        # Older messages are still waiting; queue the new one
                        # behind them instead of dropping it.
                        queue.appendleft(msg)
                    else:
                        logger.debug("Sending message %r to client '%s'", msg, client_id)
                        try:
                            frontend_socket.send_multipart([client_id, *msg])
                            logger.debug("Sent message %r to client '%r'", msg, client_id)
                        except zmq.ZMQError:
                            queue.appendleft(msg)
                            logger.error("Could not deliver message: %r from worker: %r to client: '%r'", msg, worker_id, client_id)
    finally:
        # Release the bound ports whether the loop ended normally or not.
        frontend_socket.close()
        backend_socket.close()

def signal_handler(sig, frame):
    """
    Request shutdown of the router loop.

    Logs the name of the received signal and clears the module-level
    RUNNING flag. The `frame` argument is not used.
    """
    global RUNNING
    signal_name = signal.Signals(sig).name
    logger.info("Received signal handler %r, stopping ...", signal_name)
    RUNNING = False


if __name__ == "__main__":
    # Install the same handler for Ctrl-C (SIGINT) and SIGTERM.
    for _signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(_signum, signal_handler)

    logger.info("Starting up ...")

    # Runs until signal_handler() clears RUNNING.
    custom_proxy()