Source code for swergio.server

import socket
import threading
import json
from uuid import uuid4

from .messageType import MESSAGE_TYPE

reserved_rooms = ['_command','_logging']

[docs]class Server: def __init__(self, ip: str, port: int, format: str, header_length: int, enable_logging=True) -> None: """ Initializes the server instance with given ip, port, format and header length :param ip: IP address of the server :type ip: str :param port: Port number of the server :type port: int :param format: Format of the message data :type format: str :param header_length: Length of the message header :type header_length: int :param enable_logging: Flag to enable logging of the messages :type enable_logging: bool """ self.ip = ip self.port = port self.format = format self.header_length = header_length self.enable_logging = enable_logging self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind((self.ip, self.port)) self.clients = set() self.rooms = dict() for room in reserved_rooms: self.rooms[room] = set() self.names = dict() self.clients_lock = threading.Lock()
[docs] def handle_client(self, client, addr): """ Handles the incoming client connection and processes the messages :param client: Client connection instance :type client: socket.socket :param addr: IP address of the client :type addr: tuple """ print(f"[NEW CONNECTION] {addr} connected.") try: connected = True name = "" while connected: message_header = client.recv(self.header_length) if not message_header: break message_length = int(message_header.decode(self.format)) full_message_received = False message = b'' message_length_left = message_length while not full_message_received: message += client.recv(message_length_left) message_length_left = message_length - len(message) if message_length_left <= 0: full_message_received = True message = message.decode(self.format) try: msg_content = json.loads(message) except: print(message_length) print(message) msg_type = MESSAGE_TYPE.by_id(msg_content["TYPE"]) if msg_type == MESSAGE_TYPE.COMMAND.DISCONNECT: connected = self.disconnect_client(client) break if msg_type == MESSAGE_TYPE.COMMAND.REGISTER: name = self.register_client(client, msg_content) if msg_type == MESSAGE_TYPE.COMMAND.JOINROOM: self.join_room(client, msg_content) if msg_type == MESSAGE_TYPE.COMMAND.LEAVEROOM: self.leave_room(client, msg_content) self.broadcast_message(client,message_header,message, msg_content) finally: client.close() with self.clients_lock: self.clients.remove(client) for room in self.rooms: if client in self.rooms[room]: self.rooms[room].remove(client) self.names.pop(client) print(f"[{addr}] Disconnected.")
[docs] def register_client(self,client,msg_content): """ Registers the client with the given name :param client: Client connection instance :type client: socket.socket :param msg_content: Message content containing the name of the client :type msg_content: dict :return: Name of the registered client :rtype: str """ print(f"[REGISTER] {msg_content['NAME']} is registering...") name = msg_content["NAME"] with self.clients_lock: self.names[client] = name return name
[docs] def disconnect_client(self, client): """ Disconnects the given client from the server :param client: Client connection instance :type client: socket.socket :return: False to indicate disconnection :rtype: bool """ return False
[docs] def join_room(self, client, msg_content): """ Adds the given client to the specified room :param client: Client connection instance :type client: socket.socket :param msg_content: Message content containing the name of the room :type msg_content: dict """ room = msg_content["ROOM"] with self.clients_lock: if room not in self.rooms: self.rooms[room] = set() print(f"[ROOM CREATED] {room}") if client not in self.rooms[room]: self.rooms[room].add(client) print(f"[CLIENT ADDED] {self.names[client]} to {room}")
[docs] def leave_room(self, client, msg_content): """ Removes the given client from the specified room :param client: Client connection instance :type client: socket.socket :param msg_content: Message content containing the name of the room :type msg_content: dict """ room = msg_content["ROOM"] with self.clients_lock: if client in self.rooms[room]: self.rooms[room].remove(client) if len(self.rooms[room]) == 0: del self.rooms[room]
[docs] def broadcast_message(self, client,message_header,message, msg_content): """ Broadcasts the given message to the intended recipients :param client: Client connection instance who sent the message :type client: socket.socket :param message_header: Header of the message :type message_header: bytes :param message: Message data :type message: str :param msg_content: Message content containing the recipient details :type msg_content: dict """ print(f"[BROADCAST] {self.names[client]} sent a message.") if 'SENT_BY' not in msg_content: msg_content['SENT_BY'] = self.names[client] message = json.dumps(msg_content) message_encoded = message.encode(self.format) message_header = f"{len(message_encoded):<{self.header_length}}".encode(self.format) with self.clients_lock: if "TO_ROOM" in msg_content: room = msg_content["TO_ROOM"] for other in self.rooms[room]: if other != client: other.sendall(message_header) other.sendall(message.encode(self.format)) if "TO" in msg_content: name = msg_content["TO"] for other in self.clients: if self.names[other] == name: other.sendall(message_header) other.sendall(message.encode(self.format)) if self.enable_logging: if MESSAGE_TYPE.by_id(msg_content["TYPE"]) in [MESSAGE_TYPE.DATA.FORWARD, MESSAGE_TYPE.DATA.GRADIENT]: self.send_message_log(msg_content)
[docs] def send_message_log(self, msg_content): """ Sends the given message to the logging room :param msg_content: Message content to be logged :type msg_content: dict """ print(f"[LOGGING] {msg_content['SENT_BY']} sent a message.") msg = {'ID': uuid4().hex, 'TO_ROOM': '_logging', 'TYPE': MESSAGE_TYPE.LOG.MESSAGE.id, 'MESSAGE': msg_content} msg = json.dumps(msg) message_encoded = msg.encode(self.format) message_header = f"{len(message_encoded):<{self.header_length}}".encode(self.format) for client in self.rooms['_logging']: client.sendall(message_header + message_encoded)
[docs] def start(self): """ Starts the server and listens for incoming client connections """ print(f"[STARTING] Server is starting...") self.server.listen() while True: client, addr = self.server.accept() with self.clients_lock: self.clients.add(client) thread = threading.Thread(target=self.handle_client, args=(client,addr)) thread.start()