From bbb8f1ca775fc6c10617847495b38bfa870f669d Mon Sep 17 00:00:00 2001 From: peterakande Date: Sun, 5 Nov 2023 06:06:08 +0100 Subject: [PATCH 1/3] add: add redis pub/sub feature --- .idea/workspace.xml | 29 ++++++----- connection_manager.py | 87 ++++++++++++++++++++++++++++----- main.py | 24 ++++++++- models/message_to_room_model.py | 7 +++ requirements.txt | 2 + static/index.html | 5 +- 6 files changed, 124 insertions(+), 30 deletions(-) create mode 100644 models/message_to_room_model.py diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 2426e46..b5beb27 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -5,9 +5,12 @@ + + + - { - "keyToString": { - "ASKED_ADD_EXTERNAL_FILES": "true", - "ASKED_MARK_IGNORED_FILES_AS_EXCLUDED": "true", - "ASKED_SHARE_PROJECT_CONFIGURATION_FILES": "true", - "DefaultHtmlFileTemplate": "HTML File", - "RunOnceActivity.OpenProjectViewOnStart": "true", - "RunOnceActivity.ShowReadmeOnStart": "true", - "SHARE_PROJECT_CONFIGURATION_FILES": "true", - "git-widget-placeholder": "main", - "last_opened_file_path": "/Users/akandepeter/DevProjects/Backend/personal/FastApiWebSocket", - "settings.editor.selected.configurable": "preferences.pluginManager" + +}]]> diff --git a/connection_manager.py b/connection_manager.py index 354e3cb..03baf52 100644 --- a/connection_manager.py +++ b/connection_manager.py @@ -1,9 +1,13 @@ +import asyncio import traceback from typing import Dict, Set, List, Tuple from operator import itemgetter from fastapi import WebSocket from starlette.websockets import WebSocketState +from broadcaster import Broadcast + +from models.message_to_room_model import MessageToRoomModel class ConnectionManager: @@ -11,6 +15,8 @@ class ConnectionManager: This would handle all Message broadcast and all connections to the Service. """ + broadcaster = Broadcast("redis://localhost:6379") + def __init__(self): """ self.connections would be an in memory db of the users connected to a DB. @@ -25,13 +31,19 @@ def __init__(self): self.connections: Dict[str, Set[str]] = {} # The Room and the set of users connected self.user_connections: Dict[str, WebSocket] = {} # the user connections + async def connect_broadcaster(self): + await self.broadcaster.connect() + + async def disconnect_broadcaster(self): + await self.broadcaster.disconnect() + async def save_user_connection_record(self, ws_connection: WebSocket, user_id: str): """ This would save a user record to the Websocket connections that the connection manager is currently keeping """ self.user_connections[user_id] = ws_connection - await self.send_message_to_ws_connection(message="Connection successful", ws_connection=ws_connection) + await self._send_message_to_ws_connection(message="Connection successful", ws_connection=ws_connection) async def add_user_connection_to_room(self, room_id: str, user_id: str) -> (bool, str): """ @@ -49,21 +61,32 @@ async def add_user_connection_to_room(self, room_id: str, user_id: str) -> (bool # The user connection was gotten. - is_connection_active = await self.check_if_ws_connection_is_still_active(user_ws_connection) + is_connection_active = await self._check_if_ws_connection_is_still_active(user_ws_connection) if not is_connection_active: self.user_connections.pop(user_id) return False, "Connection not Active" - # Check if the room ID exists. If it does not, create one. + # Check if the room ID exists. If it does not, create one and listen to it if room_id not in self.connections.keys(): self.connections[room_id] = {user_id} + + # Now subscribe to the room + # An error occurs here, the line to subscribe and listen always hold the connection. + # So the fist request to subscribe to the room fails + + subscribe_n_listen_task = asyncio.create_task(self._subscribe_and_listen_to_channel(room_id=room_id)) + wait_for_subscribe_task = asyncio.create_task(asyncio.sleep(1)) # 1 Second delay + + # This coroutine would be exited when the time elaps, that should be anough time for the + # Subscription task to be done + await asyncio.wait([subscribe_n_listen_task, wait_for_subscribe_task], return_when=asyncio.FIRST_COMPLETED) else: self.connections[room_id].add(user_id) return True, "Connection Successful" - async def check_if_ws_connection_is_still_active(self, ws_connection: WebSocket, message=".") -> bool: + async def _check_if_ws_connection_is_still_active(self, ws_connection: WebSocket, message=".") -> bool: """ This function would check if the connection is still active. It tries to send a message """ @@ -84,27 +107,61 @@ async def check_if_ws_connection_is_still_active(self, ws_connection: WebSocket, return True - async def send_message_to_room(self, room_id: str, message: str): + async def _consume_events(self, message: MessageToRoomModel): """ - Messages in this Program are Texts. + Function to consume a message and send to all connect clients """ - room_connections = self.connections.get(room_id, {}) + room_connections = self.connections.get(message.room_id, {}) if len(room_connections) == 0: + # No user has connected to this room. + # The user has to connect to this room first, before sending message to the room return + users_ws_connections = itemgetter(*room_connections)(self.user_connections) - print(users_ws_connections) if type(users_ws_connections) is not tuple: + # It wouldn't be a tuple if only a user is connected to the room users_ws_connections = [users_ws_connections] for connection in users_ws_connections: - is_sent, sent_message_response_info = await self.send_message_to_ws_connection( - message=f"Room {room_id} --> {message}", ws_connection=connection) + is_sent, sent_message_response_info = await self._send_message_to_ws_connection( + message=f"Room {message.room_id} --> {message.message}", ws_connection=connection) + + async def _subscribe_and_listen_to_channel(self, room_id: str): + """ + This function subscribes to a channel and listens to event in the channel + """ + + async with self.broadcaster.subscribe(channel=room_id) as subscriber: + """ + Listen to every event from here + """ - # It can be chosen to remove the connection from the self.user_connections if is_sent is False. + async for event in subscriber: + print(event.message) + message = MessageToRoomModel.model_validate_json(event.message) - async def send_message_to_ws_connection(self, message: str, ws_connection: WebSocket) -> (bool, str): + await self._consume_events(message=message) + + async def send_message_to_room(self, message: MessageToRoomModel): + """ + Messages in this Program are Texts. + the room id is the channel of the room + # You just publish to the room + """ + + room_connections = self.connections.get(message.room_id, {}) + + if len(room_connections) == 0: + # No user has connected to this room. + # The user has to connect to this room first, before sending message to the room + return + + # Send events to the room + await self.broadcaster.publish(channel=message.room_id, message=message.model_dump_json()) + + async def _send_message_to_ws_connection(self, message: str, ws_connection: WebSocket) -> (bool, str): try: await ws_connection.send_text(message) return True, "Message sent!" @@ -117,4 +174,8 @@ async def send_message_to_ws_connection(self, message: str, ws_connection: WebSo return False, "Error Sending Message" def remove_user_connection(self, user_id): - self.user_connections.pop(user_id) + + try: + self.user_connections.pop(user_id) + except KeyError: + pass diff --git a/main.py b/main.py index e16f689..47861f2 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ from connection_manager import ConnectionManager from models.register_to_room_model import RegisterToRoom +from models.message_to_room_model import MessageToRoomModel number_of_socket_connections = 0 connection_manager = ConnectionManager() @@ -14,6 +15,21 @@ app = FastAPI() +@app.on_event("startup") +async def startup(): + print("Conneting to redis") + await connection_manager.connect_broadcaster() + print("Connected to redis") + + +@app.on_event("shutdown") +async def shutdown(): + print("Disconnecting from redis") + await connection_manager.disconnect_broadcaster() + print("Disconnected from redis") + + + @app.get("/") async def get(): return FileResponse("static/index.html") @@ -52,14 +68,18 @@ async def websocket_endpoint(user_id: str, websocket: WebSocket): data = await websocket.receive_json() room_id = data["room_id"] - message = data["message"] + message_sent = data["message"] + user_id = data["user_id"] + + message = MessageToRoomModel(message=message_sent, room_id=room_id, user_id=user_id) - await connection_manager.send_message_to_room(message=message, room_id=room_id) + await connection_manager.send_message_to_room(message=message) except WebSocketDisconnect: # Remove the user from the connection stack connection_manager.remove_user_connection(user_id=user_id) except WebSocketException as e: traceback.print_exc() + # print("An error occurred and i dont know the details") diff --git a/models/message_to_room_model.py b/models/message_to_room_model.py new file mode 100644 index 0000000..41d920b --- /dev/null +++ b/models/message_to_room_model.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel + + +class MessageToRoomModel(BaseModel): + user_id: str + message: str + room_id: str diff --git a/requirements.txt b/requirements.txt index 674addb..b3c856c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ annotated-types==0.6.0 anyio==3.7.1 +asyncio-redis==0.16.0 +broadcaster==0.2.0 click==8.1.7 exceptiongroup==1.1.3 fastapi==0.103.2 diff --git a/static/index.html b/static/index.html index 3923f59..fef6e7b 100644 --- a/static/index.html +++ b/static/index.html @@ -23,7 +23,7 @@

WebSocket Chat


- +

@@ -88,7 +88,7 @@

WebSocket Chat

//The whole data has now been gotten, Make the request now - response = await fetch("/register_to_room", { + response = await fetch("/register_to_room/", { method: "POST", body: JSON.stringify({ user_id: userId, @@ -142,6 +142,7 @@

WebSocket Chat

var message = { room_id: roomId.value, message: input.value, + user_id: userId, } ws.send(JSON.stringify(message)) From 278a01bbcafc96ee8350dcff9ed3c39274e6ac08 Mon Sep 17 00:00:00 2001 From: peterakande Date: Sun, 5 Nov 2023 06:12:04 +0100 Subject: [PATCH 2/3] add: add comment corrections --- .idea/workspace.xml | 20 ++++++++++++-------- connection_manager.py | 5 ----- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index b5beb27..901ab53 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -4,13 +4,8 @@
- - - + - - - @@ -122,6 +125,7 @@ \ No newline at end of file diff --git a/connection_manager.py b/connection_manager.py index 03baf52..ab32bbb 100644 --- a/connection_manager.py +++ b/connection_manager.py @@ -145,11 +145,6 @@ async def _subscribe_and_listen_to_channel(self, room_id: str): await self._consume_events(message=message) async def send_message_to_room(self, message: MessageToRoomModel): - """ - Messages in this Program are Texts. - the room id is the channel of the room - # You just publish to the room - """ room_connections = self.connections.get(message.room_id, {}) From bf41c6227ac0044c1cd390b29a8030ebcec098ed Mon Sep 17 00:00:00 2001 From: peterakande Date: Sun, 5 Nov 2023 06:33:01 +0100 Subject: [PATCH 3/3] add: update ReadMe.md --- ReadMe.md | 1 + 1 file changed, 1 insertion(+) diff --git a/ReadMe.md b/ReadMe.md index 3296a9a..9b87f89 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -3,6 +3,7 @@ A FastAPI Backend Service that simulates the Websocket needs of a chat app that involves: - Users being able to Exchange messages with one another - The concept of rooms or group chats +- Redis Pub Sub