事件驱动平台上的两个并行轮询任务

我目前正在基于事件驱动架构的服务器平台上工作。事件应通过Websocket连接进入系统,经过一些处理后,对该事件的响应也应通过相同的Websocket连接离开系统。这个想法背后的实现逻辑是,如果与服务器建立了连接,则我将其放置一段时间,然后等待它向我发送数据,直到断开连接为止。传入的数据被放入队列中,工作线程将从中将其拉出并进行处理。另一方面,我创建了一个任务,该任务正在轮询传出事件队列,如果队列中有事件,它将发送给相应的接收者。不幸的是,我当前的异步逻辑存在缺陷,即轮询传出事件队列会阻塞接收任务,而且我无法解决问题。以下是一些代码片段,它们应该代表上述问题:

启动websocket服务器

def run(self, address: str, port: int, ssl_context: ssl.SSLContext = None):
    start_server = websockets.serve(
        self.websocket_connection_handler, address, port, ssl=ssl_context)

    event_loop = asyncio.get_event_loop()
    event_loop.create_task(self.send_heartbeat())
    event_loop.create_task(self.dispatch_outgoing_events())

    print(f'Running on {"wss" if ssl_context else "ws"}://{address}:{port}')
    event_loop.run_until_complete(start_server)
    event_loop.run_forever()

分派器功能,可无限轮询传出队列中的数据

async def dispatch_outgoing_events(self):
    while not self.exit_state.should_exit:
        if len(self.outgoing_event_queue) == 0:
            await asyncio.sleep(0)
        else:
            event = self.outgoing_event_queue.get_event()
            destination = event.destination
            client_id = re.findall(
                r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', destination)[0]
            client = self.client_store.get(client_id)
            await client.websocket.send(serializer.serialize(event))

websocket的连接处理函数

async def websocket_connection_handler(self, websocket, path):
    client_id = await self.register(websocket)

    try:
        while not self.exit_state.should_exit:
            correlation_id = str(uuid4())

            message = await websocket.recv()
            else:
                try:
                    event = serializer.deserialize(
                        message, correlation_id, client_id)
                    event.return_address = f'remote://websocket/{client_id}'
                    self.incoming_event_queue.add_event(event)
                except Exception as e:
                    event = type('evt', (object,), dict(system_entry=str(
                        datetime.datetime.utcnow()), destination=f'remote://websocket/{client_id}'))()

                    self.exception_handler.handle_exception(
                        e, event)

    except Exception as exception:
        print(
            f'client {client_id} suddenly disconnected. Reason: {type(exception).__name__} -> {exception}')
        self.client_store.remove(client_id)
        self.topic_factory.remove_client(client_id)
        self.topic_factory.get_topic('server_notifications').publish(ClientDisconnectedNotification(client_id),
                                                                        str(uuid4()))
评论