Skip to content

Chat Application with Socket Programming

Abstract

Create a comprehensive real-time chat application using socket programming that supports multiple clients, private messaging, chat rooms, and advanced features. This project demonstrates network programming, concurrent connections, message broadcasting, and building distributed communication systems.

Prerequisites

  • Python 3.7 or above
  • Text Editor or IDE
  • Solid understanding of Python syntax and networking concepts
  • Knowledge of socket programming and TCP/UDP protocols
  • Familiarity with threading and concurrent programming
  • Understanding of client-server architecture
  • Basic knowledge of GUI development with Tkinter

Getting Started

Create a new project

  1. Create a new project folder and name it chatApplicationchatApplication.
  2. Create a new file and name it chatapplication.pychatapplication.py.
  3. Install required dependencies: pip install tkinter threading socketpip install tkinter threading socket
  4. Open the project folder in your favorite text editor or IDE.
  5. Copy the code below and paste it into your chatapplication.pychatapplication.py file.

Write the code

  1. Add the following code to your chatapplication.pychatapplication.py file.
⚙️ Chat Application with Socket Programming
Chat Application with Socket Programming
# Chat Application with Socket Programming
 
import socket
import threading
import json
import time
import hashlib
import datetime
from typing import Dict, List, Optional, Tuple
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, simpledialog
import queue
import logging
 
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
 
class ChatMessage:
    """Represents a chat message"""
    def __init__(self, username: str, content: str, message_type: str = "message", 
                 timestamp: Optional[datetime.datetime] = None):
        self.username = username
        self.content = content
        self.message_type = message_type  # message, join, leave, system
        self.timestamp = timestamp or datetime.datetime.now()
        self.id = hashlib.md5(f"{username}{content}{self.timestamp}".encode()).hexdigest()[:8]
    
    def to_dict(self) -> dict:
        """Convert message to dictionary"""
        return {
            'id': self.id,
            'username': self.username,
            'content': self.content,
            'type': self.message_type,
            'timestamp': self.timestamp.isoformat()
        }
    
    @classmethod
    def from_dict(cls, data: dict) -> 'ChatMessage':
        """Create message from dictionary"""
        timestamp = datetime.datetime.fromisoformat(data['timestamp'])
        msg = cls(data['username'], data['content'], data['type'], timestamp)
        msg.id = data['id']
        return msg
 
class ChatServer:
    """Chat server handling multiple clients"""
    
    def __init__(self, host: str = 'localhost', port: int = 12345):
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        
        self.clients: Dict[socket.socket, dict] = {}
        self.message_history: List[ChatMessage] = []
        self.running = False
        self.max_history = 100
        
        # Room management
        self.rooms: Dict[str, List[socket.socket]] = {'general': []}
        self.default_room = 'general'
    
    def start(self):
        """Start the chat server"""
        try:
            self.socket.bind((self.host, self.port))
            self.socket.listen(5)
            self.running = True
            
            logger.info(f"Chat server started on {self.host}:{self.port}")
            
            while self.running:
                try:
                    client_socket, address = self.socket.accept()
                    logger.info(f"New connection from {address}")
                    
                    # Start client handler thread
                    client_thread = threading.Thread(
                        target=self.handle_client,
                        args=(client_socket, address),
                        daemon=True
                    )
                    client_thread.start()
                    
                except OSError:
                    if self.running:
                        logger.error("Error accepting connections")
                    break
                    
        except Exception as e:
            logger.error(f"Server error: {e}")
        finally:
            self.stop()
    
    def handle_client(self, client_socket: socket.socket, address: Tuple[str, int]):
        """Handle individual client connection"""
        username = None
        
        try:
            # Initial handshake - get username
            client_socket.send(json.dumps({
                'type': 'handshake',
                'message': 'Please provide username'
            }).encode('utf-8'))
            
            # Receive username
            data = client_socket.recv(1024).decode('utf-8')
            handshake_data = json.loads(data)
            username = handshake_data.get('username', f'User_{address[1]}')
            
            # Check if username is already taken
            while any(client['username'] == username for client in self.clients.values()):
                username = f"{username}_{len(self.clients)}"
            
            # Add client to clients list
            self.clients[client_socket] = {
                'username': username,
                'address': address,
                'room': self.default_room,
                'joined_at': datetime.datetime.now()
            }
            
            # Add to default room
            self.rooms[self.default_room].append(client_socket)
            
            # Send welcome message and recent history
            welcome_data = {
                'type': 'welcome',
                'username': username,
                'room': self.default_room,
                'history': [msg.to_dict() for msg in self.message_history[-20:]]
            }
            client_socket.send(json.dumps(welcome_data).encode('utf-8'))
            
            # Broadcast join message
            join_message = ChatMessage(username, f"{username} joined the chat", "join")
            self.broadcast_message(join_message, exclude=client_socket)
            self.add_to_history(join_message)
            
            # Handle client messages
            while self.running:
                try:
                    data = client_socket.recv(1024).decode('utf-8')
                    if not data:
                        break
                    
                    message_data = json.loads(data)
                    self.process_message(client_socket, message_data)
                    
                except json.JSONDecodeError:
                    logger.warning(f"Invalid JSON from {username}")
                except Exception as e:
                    logger.error(f"Error handling message from {username}: {e}")
                    break
        
        except Exception as e:
            logger.error(f"Error in client handler for {address}: {e}")
        
        finally:
            self.disconnect_client(client_socket, username)
    
    def process_message(self, client_socket: socket.socket, message_data: dict):
        """Process incoming message from client"""
        client_info = self.clients.get(client_socket)
        if not client_info:
            return
        
        username = client_info['username']
        message_type = message_data.get('type', 'message')
        content = message_data.get('content', '')
        
        if message_type == 'message':
            # Regular chat message
            if content.strip():
                chat_message = ChatMessage(username, content, "message")
                self.broadcast_message(chat_message, room=client_info['room'])
                self.add_to_history(chat_message)
        
        elif message_type == 'command':
            # Handle chat commands
            self.handle_command(client_socket, content)
        
        elif message_type == 'private':
            # Private message
            target_username = message_data.get('target')
            self.send_private_message(client_socket, target_username, content)
    
    def handle_command(self, client_socket: socket.socket, command: str):
        """Handle chat commands"""
        client_info = self.clients[client_socket]
        username = client_info['username']
        
        parts = command.strip().split()
        if not parts:
            return
        
        cmd = parts[0].lower()
        
        if cmd == '/help':
            help_text = """
Available commands:
/help - Show this help
/users - List online users
/rooms - List available rooms
/join <room> - Join a room
/create <room> - Create a new room
/pm <username> <message> - Send private message
/time - Show current server time
"""
            self.send_system_message(client_socket, help_text)
        
        elif cmd == '/users':
            users = [info['username'] for info in self.clients.values()]
            user_list = f"Online users ({len(users)}): {', '.join(users)}"
            self.send_system_message(client_socket, user_list)
        
        elif cmd == '/rooms':
            room_info = []
            for room, clients in self.rooms.items():
                room_info.append(f"{room} ({len(clients)} users)")
            room_list = f"Available rooms: {', '.join(room_info)}"
            self.send_system_message(client_socket, room_list)
        
        elif cmd == '/join' and len(parts) > 1:
            new_room = parts[1]
            self.move_client_to_room(client_socket, new_room)
        
        elif cmd == '/create' and len(parts) > 1:
            new_room = parts[1]
            if new_room not in self.rooms:
                self.rooms[new_room] = []
                self.send_system_message(client_socket, f"Room '{new_room}' created")
                self.move_client_to_room(client_socket, new_room)
            else:
                self.send_system_message(client_socket, f"Room '{new_room}' already exists")
        
        elif cmd == '/pm' and len(parts) > 2:
            target_username = parts[1]
            message_content = ' '.join(parts[2:])
            self.send_private_message(client_socket, target_username, message_content)
        
        elif cmd == '/time':
            current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self.send_system_message(client_socket, f"Server time: {current_time}")
        
        else:
            self.send_system_message(client_socket, "Unknown command. Type /help for available commands.")
    
    def move_client_to_room(self, client_socket: socket.socket, room_name: str):
        """Move client to a different room"""
        client_info = self.clients[client_socket]
        username = client_info['username']
        old_room = client_info['room']
        
        # Remove from old room
        if client_socket in self.rooms[old_room]:
            self.rooms[old_room].remove(client_socket)
        
        # Add to new room
        if room_name not in self.rooms:
            self.rooms[room_name] = []
        
        self.rooms[room_name].append(client_socket)
        client_info['room'] = room_name
        
        # Notify client
        self.send_system_message(client_socket, f"Moved to room '{room_name}'")
        
        # Notify rooms
        leave_msg = ChatMessage(username, f"{username} left the room", "leave")
        self.broadcast_message(leave_msg, room=old_room, exclude=client_socket)
        
        join_msg = ChatMessage(username, f"{username} joined the room", "join")
        self.broadcast_message(join_msg, room=room_name, exclude=client_socket)
    
    def send_private_message(self, sender_socket: socket.socket, target_username: str, content: str):
        """Send private message between users"""
        sender_info = self.clients[sender_socket]
        sender_username = sender_info['username']
        
        # Find target client
        target_socket = None
        for client_socket, info in self.clients.items():
            if info['username'] == target_username:
                target_socket = client_socket
                break
        
        if target_socket:
            # Send to target
            pm_data = {
                'type': 'private',
                'from': sender_username,
                'content': content,
                'timestamp': datetime.datetime.now().isoformat()
            }
            target_socket.send(json.dumps(pm_data).encode('utf-8'))
            
            # Confirm to sender
            self.send_system_message(sender_socket, f"Private message sent to {target_username}")
        else:
            self.send_system_message(sender_socket, f"User '{target_username}' not found")
    
    def send_system_message(self, client_socket: socket.socket, content: str):
        """Send system message to specific client"""
        system_data = {
            'type': 'system',
            'content': content,
            'timestamp': datetime.datetime.now().isoformat()
        }
        try:
            client_socket.send(json.dumps(system_data).encode('utf-8'))
        except:
            pass
    
    def broadcast_message(self, message: ChatMessage, room: str = None, exclude: socket.socket = None):
        """Broadcast message to clients"""
        message_data = json.dumps(message.to_dict()).encode('utf-8')
        
        if room:
            # Broadcast to specific room
            clients_to_send = self.rooms.get(room, [])
        else:
            # Broadcast to all clients
            clients_to_send = list(self.clients.keys())
        
        for client_socket in clients_to_send[:]:  # Copy list to avoid modification during iteration
            if client_socket != exclude:
                try:
                    client_socket.send(message_data)
                except:
                    # Client disconnected, remove from lists
                    self.disconnect_client(client_socket)
    
    def add_to_history(self, message: ChatMessage):
        """Add message to history"""
        self.message_history.append(message)
        if len(self.message_history) > self.max_history:
            self.message_history = self.message_history[-self.max_history:]
    
    def disconnect_client(self, client_socket: socket.socket, username: str = None):
        """Handle client disconnection"""
        if client_socket in self.clients:
            client_info = self.clients[client_socket]
            username = username or client_info['username']
            room = client_info['room']
            
            # Remove from room
            if room in self.rooms and client_socket in self.rooms[room]:
                self.rooms[room].remove(client_socket)
            
            # Remove from clients
            del self.clients[client_socket]
            
            # Broadcast leave message
            leave_message = ChatMessage(username, f"{username} left the chat", "leave")
            self.broadcast_message(leave_message, room=room, exclude=client_socket)
            self.add_to_history(leave_message)
            
            logger.info(f"Client {username} disconnected")
        
        try:
            client_socket.close()
        except:
            pass
    
    def stop(self):
        """Stop the server"""
        self.running = False
        
        # Close all client connections
        for client_socket in list(self.clients.keys()):
            try:
                client_socket.close()
            except:
                pass
        
        # Close server socket
        try:
            self.socket.close()
        except:
            pass
        
        logger.info("Chat server stopped")
 
class ChatClient:
    """Chat client for connecting to server"""
    
    def __init__(self, username: str, host: str = 'localhost', port: int = 12345):
        self.username = username
        self.host = host
        self.port = port
        self.socket = None
        self.connected = False
        self.message_queue = queue.Queue()
        
    def connect(self) -> bool:
        """Connect to chat server"""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((self.host, self.port))
            
            # Wait for handshake
            data = self.socket.recv(1024).decode('utf-8')
            handshake = json.loads(data)
            
            if handshake['type'] == 'handshake':
                # Send username
                response = {'username': self.username}
                self.socket.send(json.dumps(response).encode('utf-8'))
                
                # Wait for welcome
                data = self.socket.recv(4096).decode('utf-8')
                welcome = json.loads(data)
                
                if welcome['type'] == 'welcome':
                    self.connected = True
                    self.username = welcome['username']  # May have been modified
                    
                    # Start message listener thread
                    listener_thread = threading.Thread(target=self.listen_for_messages, daemon=True)
                    listener_thread.start()
                    
                    return True
            
            return False
            
        except Exception as e:
            logger.error(f"Connection error: {e}")
            return False
    
    def listen_for_messages(self):
        """Listen for incoming messages"""
        while self.connected:
            try:
                data = self.socket.recv(4096).decode('utf-8')
                if not data:
                    break
                
                message_data = json.loads(data)
                self.message_queue.put(message_data)
                
            except Exception as e:
                if self.connected:
                    logger.error(f"Error receiving message: {e}")
                break
        
        self.connected = False
    
    def send_message(self, content: str):
        """Send chat message"""
        if not self.connected:
            return False
        
        try:
            message_data = {
                'type': 'message',
                'content': content
            }
            self.socket.send(json.dumps(message_data).encode('utf-8'))
            return True
        except Exception as e:
            logger.error(f"Error sending message: {e}")
            return False
    
    def send_command(self, command: str):
        """Send chat command"""
        if not self.connected:
            return False
        
        try:
            command_data = {
                'type': 'command',
                'content': command
            }
            self.socket.send(json.dumps(command_data).encode('utf-8'))
            return True
        except Exception as e:
            logger.error(f"Error sending command: {e}")
            return False
    
    def disconnect(self):
        """Disconnect from server"""
        self.connected = False
        if self.socket:
            try:
                self.socket.close()
            except:
                pass
 
class ChatGUI:
    """GUI for chat client"""
    
    def __init__(self):
        self.client = None
        self.root = tk.Tk()
        self.root.title("Chat Application")
        self.root.geometry("800x600")
        
        self.setup_ui()
        self.update_messages_thread = None
        
    def setup_ui(self):
        """Setup the user interface"""
        # Main frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        
        # Configure grid weights
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(1, weight=1)
        
        # Connection frame
        conn_frame = ttk.LabelFrame(main_frame, text="Connection", padding="5")
        conn_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
        conn_frame.columnconfigure(1, weight=1)
        
        # Connection controls
        ttk.Label(conn_frame, text="Username:").grid(row=0, column=0, sticky=tk.W)
        self.username_var = tk.StringVar(value="User")
        ttk.Entry(conn_frame, textvariable=self.username_var, width=15).grid(row=0, column=1, padx=(5, 10), sticky=tk.W)
        
        ttk.Label(conn_frame, text="Host:").grid(row=0, column=2, sticky=tk.W)
        self.host_var = tk.StringVar(value="localhost")
        ttk.Entry(conn_frame, textvariable=self.host_var, width=15).grid(row=0, column=3, padx=(5, 10), sticky=tk.W)
        
        ttk.Label(conn_frame, text="Port:").grid(row=0, column=4, sticky=tk.W)
        self.port_var = tk.StringVar(value="12345")
        ttk.Entry(conn_frame, textvariable=self.port_var, width=8).grid(row=0, column=5, padx=(5, 10), sticky=tk.W)
        
        self.connect_button = ttk.Button(conn_frame, text="Connect", command=self.connect_to_server)
        self.connect_button.grid(row=0, column=6, padx=(5, 0))
        
        self.disconnect_button = ttk.Button(conn_frame, text="Disconnect", command=self.disconnect_from_server, state=tk.DISABLED)
        self.disconnect_button.grid(row=0, column=7, padx=(5, 0))
        
        # Status
        self.status_var = tk.StringVar(value="Not connected")
        status_label = ttk.Label(conn_frame, textvariable=self.status_var, foreground="red")
        status_label.grid(row=1, column=0, columnspan=8, sticky=tk.W, pady=(5, 0))
        
        # Chat frame
        chat_frame = ttk.LabelFrame(main_frame, text="Chat", padding="5")
        chat_frame.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        chat_frame.columnconfigure(0, weight=1)
        chat_frame.rowconfigure(0, weight=1)
        
        # Messages display
        self.messages_display = scrolledtext.ScrolledText(chat_frame, state=tk.DISABLED, height=20)
        self.messages_display.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
        
        # Message input frame
        input_frame = ttk.Frame(chat_frame)
        input_frame.grid(row=1, column=0, sticky=(tk.W, tk.E))
        input_frame.columnconfigure(0, weight=1)
        
        # Message input
        self.message_var = tk.StringVar()
        self.message_entry = ttk.Entry(input_frame, textvariable=self.message_var)
        self.message_entry.grid(row=0, column=0, sticky=(tk.W, tk.E), padx=(0, 10))
        self.message_entry.bind('<Return>', self.send_message_event)
        
        # Send button
        self.send_button = ttk.Button(input_frame, text="Send", command=self.send_message, state=tk.DISABLED)
        self.send_button.grid(row=0, column=1)
        
        # Help text
        help_frame = ttk.Frame(main_frame)
        help_frame.grid(row=2, column=0, sticky=(tk.W, tk.E), pady=(10, 0))
        
        help_text = "Commands: /help, /users, /rooms, /join <room>, /create <room>, /pm <user> <msg>, /time"
        ttk.Label(help_frame, text=help_text, font=("TkDefaultFont", 8)).pack()
    
    def connect_to_server(self):
        """Connect to chat server"""
        username = self.username_var.get().strip()
        host = self.host_var.get().strip()
        
        try:
            port = int(self.port_var.get())
        except ValueError:
            messagebox.showerror("Error", "Invalid port number")
            return
        
        if not username:
            messagebox.showerror("Error", "Please enter a username")
            return
        
        # Create client and connect
        self.client = ChatClient(username, host, port)
        
        if self.client.connect():
            self.status_var.set(f"Connected as {self.client.username}")
            self.status_var.set(f"Connected as {self.client.username}")
            
            # Update UI state
            self.connect_button.config(state=tk.DISABLED)
            self.disconnect_button.config(state=tk.NORMAL)
            self.send_button.config(state=tk.NORMAL)
            self.message_entry.config(state=tk.NORMAL)
            
            # Start message update thread
            self.update_messages_thread = threading.Thread(target=self.update_messages, daemon=True)
            self.update_messages_thread.start()
            
            # Clear and focus message entry
            self.message_entry.focus()
            
        else:
            messagebox.showerror("Error", "Failed to connect to server")
            self.client = None
    
    def disconnect_from_server(self):
        """Disconnect from chat server"""
        if self.client:
            self.client.disconnect()
            self.client = None
        
        # Update UI state
        self.status_var.set("Not connected")
        self.connect_button.config(state=tk.NORMAL)
        self.disconnect_button.config(state=tk.DISABLED)
        self.send_button.config(state=tk.DISABLED)
        self.message_entry.config(state=tk.DISABLED)
    
    def send_message_event(self, event):
        """Handle Enter key press"""
        self.send_message()
    
    def send_message(self):
        """Send message or command"""
        if not self.client or not self.client.connected:
            return
        
        content = self.message_var.get().strip()
        if not content:
            return
        
        if content.startswith('/'):
            # Send as command
            self.client.send_command(content)
        else:
            # Send as message
            self.client.send_message(content)
        
        # Clear input
        self.message_var.set("")
    
    def update_messages(self):
        """Update messages from server"""
        while self.client and self.client.connected:
            try:
                # Get message from queue (blocking with timeout)
                message_data = self.client.message_queue.get(timeout=1.0)
                
                # Update UI in main thread
                self.root.after(0, self.display_message, message_data)
                
            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"Error updating messages: {e}")
                break
    
    def display_message(self, message_data: dict):
        """Display message in chat window"""
        self.messages_display.config(state=tk.NORMAL)
        
        message_type = message_data.get('type', 'message')
        
        if message_type == 'welcome':
            # Display welcome message and history
            username = message_data['username']
            room = message_data['room']
            self.messages_display.insert(tk.END, f"=== Welcome {username} to room '{room}' ===\n", 'system')
            
            # Display history
            history = message_data.get('history', [])
            for msg_dict in history:
                self.format_and_insert_message(msg_dict)
        
        elif message_type == 'system':
            # System message
            content = message_data['content']
            timestamp = datetime.datetime.now().strftime("%H:%M:%S")
            self.messages_display.insert(tk.END, f"[{timestamp}] SYSTEM: {content}\n", 'system')
        
        elif message_type == 'private':
            # Private message
            from_user = message_data['from']
            content = message_data['content']
            timestamp = datetime.datetime.fromisoformat(message_data['timestamp']).strftime("%H:%M:%S")
            self.messages_display.insert(tk.END, f"[{timestamp}] PRIVATE from {from_user}: {content}\n", 'private')
        
        else:
            # Regular message
            self.format_and_insert_message(message_data)
        
        # Configure tags for styling
        self.messages_display.tag_config('system', foreground='blue')
        self.messages_display.tag_config('private', foreground='purple')
        self.messages_display.tag_config('join', foreground='green')
        self.messages_display.tag_config('leave', foreground='red')
        
        # Scroll to bottom
        self.messages_display.see(tk.END)
        self.messages_display.config(state=tk.DISABLED)
    
    def format_and_insert_message(self, message_data: dict):
        """Format and insert a chat message"""
        username = message_data['username']
        content = message_data['content']
        msg_type = message_data.get('type', 'message')
        timestamp = datetime.datetime.fromisoformat(message_data['timestamp']).strftime("%H:%M:%S")
        
        if msg_type == 'join':
            self.messages_display.insert(tk.END, f"[{timestamp}] {content}\n", 'join')
        elif msg_type == 'leave':
            self.messages_display.insert(tk.END, f"[{timestamp}] {content}\n", 'leave')
        else:
            self.messages_display.insert(tk.END, f"[{timestamp}] {username}: {content}\n")
    
    def run(self):
        """Run the GUI application"""
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
        self.root.mainloop()
    
    def on_closing(self):
        """Handle window closing"""
        if self.client:
            self.client.disconnect()
        self.root.destroy()
 
def start_server():
    """Start chat server"""
    print("Starting chat server...")
    server = ChatServer()
    try:
        server.start()
    except KeyboardInterrupt:
        print("\nShutting down server...")
        server.stop()
 
def start_client():
    """Start chat client GUI"""
    app = ChatGUI()
    app.run()
 
def main():
    """Main function"""
    import sys
    
    if len(sys.argv) > 1:
        if sys.argv[1] == 'server':
            start_server()
        elif sys.argv[1] == 'client':
            start_client()
        else:
            print("Usage: python chatapp.py [server|client]")
    else:
        # Default to client
        start_client()
 
if __name__ == "__main__":
    main()
 
Chat Application with Socket Programming
# Chat Application with Socket Programming
 
import socket
import threading
import json
import time
import hashlib
import datetime
from typing import Dict, List, Optional, Tuple
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, simpledialog
import queue
import logging
 
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
 
class ChatMessage:
    """Represents a chat message"""
    def __init__(self, username: str, content: str, message_type: str = "message", 
                 timestamp: Optional[datetime.datetime] = None):
        self.username = username
        self.content = content
        self.message_type = message_type  # message, join, leave, system
        self.timestamp = timestamp or datetime.datetime.now()
        self.id = hashlib.md5(f"{username}{content}{self.timestamp}".encode()).hexdigest()[:8]
    
    def to_dict(self) -> dict:
        """Convert message to dictionary"""
        return {
            'id': self.id,
            'username': self.username,
            'content': self.content,
            'type': self.message_type,
            'timestamp': self.timestamp.isoformat()
        }
    
    @classmethod
    def from_dict(cls, data: dict) -> 'ChatMessage':
        """Create message from dictionary"""
        timestamp = datetime.datetime.fromisoformat(data['timestamp'])
        msg = cls(data['username'], data['content'], data['type'], timestamp)
        msg.id = data['id']
        return msg
 
class ChatServer:
    """Chat server handling multiple clients"""
    
    def __init__(self, host: str = 'localhost', port: int = 12345):
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        
        self.clients: Dict[socket.socket, dict] = {}
        self.message_history: List[ChatMessage] = []
        self.running = False
        self.max_history = 100
        
        # Room management
        self.rooms: Dict[str, List[socket.socket]] = {'general': []}
        self.default_room = 'general'
    
    def start(self):
        """Start the chat server"""
        try:
            self.socket.bind((self.host, self.port))
            self.socket.listen(5)
            self.running = True
            
            logger.info(f"Chat server started on {self.host}:{self.port}")
            
            while self.running:
                try:
                    client_socket, address = self.socket.accept()
                    logger.info(f"New connection from {address}")
                    
                    # Start client handler thread
                    client_thread = threading.Thread(
                        target=self.handle_client,
                        args=(client_socket, address),
                        daemon=True
                    )
                    client_thread.start()
                    
                except OSError:
                    if self.running:
                        logger.error("Error accepting connections")
                    break
                    
        except Exception as e:
            logger.error(f"Server error: {e}")
        finally:
            self.stop()
    
    def handle_client(self, client_socket: socket.socket, address: Tuple[str, int]):
        """Handle individual client connection"""
        username = None
        
        try:
            # Initial handshake - get username
            client_socket.send(json.dumps({
                'type': 'handshake',
                'message': 'Please provide username'
            }).encode('utf-8'))
            
            # Receive username
            data = client_socket.recv(1024).decode('utf-8')
            handshake_data = json.loads(data)
            username = handshake_data.get('username', f'User_{address[1]}')
            
            # Check if username is already taken
            while any(client['username'] == username for client in self.clients.values()):
                username = f"{username}_{len(self.clients)}"
            
            # Add client to clients list
            self.clients[client_socket] = {
                'username': username,
                'address': address,
                'room': self.default_room,
                'joined_at': datetime.datetime.now()
            }
            
            # Add to default room
            self.rooms[self.default_room].append(client_socket)
            
            # Send welcome message and recent history
            welcome_data = {
                'type': 'welcome',
                'username': username,
                'room': self.default_room,
                'history': [msg.to_dict() for msg in self.message_history[-20:]]
            }
            client_socket.send(json.dumps(welcome_data).encode('utf-8'))
            
            # Broadcast join message
            join_message = ChatMessage(username, f"{username} joined the chat", "join")
            self.broadcast_message(join_message, exclude=client_socket)
            self.add_to_history(join_message)
            
            # Handle client messages
            while self.running:
                try:
                    data = client_socket.recv(1024).decode('utf-8')
                    if not data:
                        break
                    
                    message_data = json.loads(data)
                    self.process_message(client_socket, message_data)
                    
                except json.JSONDecodeError:
                    logger.warning(f"Invalid JSON from {username}")
                except Exception as e:
                    logger.error(f"Error handling message from {username}: {e}")
                    break
        
        except Exception as e:
            logger.error(f"Error in client handler for {address}: {e}")
        
        finally:
            self.disconnect_client(client_socket, username)
    
    def process_message(self, client_socket: socket.socket, message_data: dict):
        """Process incoming message from client"""
        client_info = self.clients.get(client_socket)
        if not client_info:
            return
        
        username = client_info['username']
        message_type = message_data.get('type', 'message')
        content = message_data.get('content', '')
        
        if message_type == 'message':
            # Regular chat message
            if content.strip():
                chat_message = ChatMessage(username, content, "message")
                self.broadcast_message(chat_message, room=client_info['room'])
                self.add_to_history(chat_message)
        
        elif message_type == 'command':
            # Handle chat commands
            self.handle_command(client_socket, content)
        
        elif message_type == 'private':
            # Private message
            target_username = message_data.get('target')
            self.send_private_message(client_socket, target_username, content)
    
    def handle_command(self, client_socket: socket.socket, command: str):
        """Handle chat commands"""
        client_info = self.clients[client_socket]
        username = client_info['username']
        
        parts = command.strip().split()
        if not parts:
            return
        
        cmd = parts[0].lower()
        
        if cmd == '/help':
            help_text = """
Available commands:
/help - Show this help
/users - List online users
/rooms - List available rooms
/join <room> - Join a room
/create <room> - Create a new room
/pm <username> <message> - Send private message
/time - Show current server time
"""
            self.send_system_message(client_socket, help_text)
        
        elif cmd == '/users':
            users = [info['username'] for info in self.clients.values()]
            user_list = f"Online users ({len(users)}): {', '.join(users)}"
            self.send_system_message(client_socket, user_list)
        
        elif cmd == '/rooms':
            room_info = []
            for room, clients in self.rooms.items():
                room_info.append(f"{room} ({len(clients)} users)")
            room_list = f"Available rooms: {', '.join(room_info)}"
            self.send_system_message(client_socket, room_list)
        
        elif cmd == '/join' and len(parts) > 1:
            new_room = parts[1]
            self.move_client_to_room(client_socket, new_room)
        
        elif cmd == '/create' and len(parts) > 1:
            new_room = parts[1]
            if new_room not in self.rooms:
                self.rooms[new_room] = []
                self.send_system_message(client_socket, f"Room '{new_room}' created")
                self.move_client_to_room(client_socket, new_room)
            else:
                self.send_system_message(client_socket, f"Room '{new_room}' already exists")
        
        elif cmd == '/pm' and len(parts) > 2:
            target_username = parts[1]
            message_content = ' '.join(parts[2:])
            self.send_private_message(client_socket, target_username, message_content)
        
        elif cmd == '/time':
            current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self.send_system_message(client_socket, f"Server time: {current_time}")
        
        else:
            self.send_system_message(client_socket, "Unknown command. Type /help for available commands.")
    
    def move_client_to_room(self, client_socket: socket.socket, room_name: str):
        """Move client to a different room"""
        client_info = self.clients[client_socket]
        username = client_info['username']
        old_room = client_info['room']
        
        # Remove from old room
        if client_socket in self.rooms[old_room]:
            self.rooms[old_room].remove(client_socket)
        
        # Add to new room
        if room_name not in self.rooms:
            self.rooms[room_name] = []
        
        self.rooms[room_name].append(client_socket)
        client_info['room'] = room_name
        
        # Notify client
        self.send_system_message(client_socket, f"Moved to room '{room_name}'")
        
        # Notify rooms
        leave_msg = ChatMessage(username, f"{username} left the room", "leave")
        self.broadcast_message(leave_msg, room=old_room, exclude=client_socket)
        
        join_msg = ChatMessage(username, f"{username} joined the room", "join")
        self.broadcast_message(join_msg, room=room_name, exclude=client_socket)
    
    def send_private_message(self, sender_socket: socket.socket, target_username: str, content: str):
        """Send private message between users"""
        sender_info = self.clients[sender_socket]
        sender_username = sender_info['username']
        
        # Find target client
        target_socket = None
        for client_socket, info in self.clients.items():
            if info['username'] == target_username:
                target_socket = client_socket
                break
        
        if target_socket:
            # Send to target
            pm_data = {
                'type': 'private',
                'from': sender_username,
                'content': content,
                'timestamp': datetime.datetime.now().isoformat()
            }
            target_socket.send(json.dumps(pm_data).encode('utf-8'))
            
            # Confirm to sender
            self.send_system_message(sender_socket, f"Private message sent to {target_username}")
        else:
            self.send_system_message(sender_socket, f"User '{target_username}' not found")
    
    def send_system_message(self, client_socket: socket.socket, content: str):
        """Send system message to specific client"""
        system_data = {
            'type': 'system',
            'content': content,
            'timestamp': datetime.datetime.now().isoformat()
        }
        try:
            client_socket.send(json.dumps(system_data).encode('utf-8'))
        except:
            pass
    
    def broadcast_message(self, message: ChatMessage, room: str = None, exclude: socket.socket = None):
        """Broadcast message to clients"""
        message_data = json.dumps(message.to_dict()).encode('utf-8')
        
        if room:
            # Broadcast to specific room
            clients_to_send = self.rooms.get(room, [])
        else:
            # Broadcast to all clients
            clients_to_send = list(self.clients.keys())
        
        for client_socket in clients_to_send[:]:  # Copy list to avoid modification during iteration
            if client_socket != exclude:
                try:
                    client_socket.send(message_data)
                except:
                    # Client disconnected, remove from lists
                    self.disconnect_client(client_socket)
    
    def add_to_history(self, message: ChatMessage):
        """Add message to history"""
        self.message_history.append(message)
        if len(self.message_history) > self.max_history:
            self.message_history = self.message_history[-self.max_history:]
    
    def disconnect_client(self, client_socket: socket.socket, username: str = None):
        """Handle client disconnection"""
        if client_socket in self.clients:
            client_info = self.clients[client_socket]
            username = username or client_info['username']
            room = client_info['room']
            
            # Remove from room
            if room in self.rooms and client_socket in self.rooms[room]:
                self.rooms[room].remove(client_socket)
            
            # Remove from clients
            del self.clients[client_socket]
            
            # Broadcast leave message
            leave_message = ChatMessage(username, f"{username} left the chat", "leave")
            self.broadcast_message(leave_message, room=room, exclude=client_socket)
            self.add_to_history(leave_message)
            
            logger.info(f"Client {username} disconnected")
        
        try:
            client_socket.close()
        except:
            pass
    
    def stop(self):
        """Stop the server"""
        self.running = False
        
        # Close all client connections
        for client_socket in list(self.clients.keys()):
            try:
                client_socket.close()
            except:
                pass
        
        # Close server socket
        try:
            self.socket.close()
        except:
            pass
        
        logger.info("Chat server stopped")
 
class ChatClient:
    """Chat client for connecting to server"""
    
    def __init__(self, username: str, host: str = 'localhost', port: int = 12345):
        self.username = username
        self.host = host
        self.port = port
        self.socket = None
        self.connected = False
        self.message_queue = queue.Queue()
        
    def connect(self) -> bool:
        """Connect to chat server"""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((self.host, self.port))
            
            # Wait for handshake
            data = self.socket.recv(1024).decode('utf-8')
            handshake = json.loads(data)
            
            if handshake['type'] == 'handshake':
                # Send username
                response = {'username': self.username}
                self.socket.send(json.dumps(response).encode('utf-8'))
                
                # Wait for welcome
                data = self.socket.recv(4096).decode('utf-8')
                welcome = json.loads(data)
                
                if welcome['type'] == 'welcome':
                    self.connected = True
                    self.username = welcome['username']  # May have been modified
                    
                    # Start message listener thread
                    listener_thread = threading.Thread(target=self.listen_for_messages, daemon=True)
                    listener_thread.start()
                    
                    return True
            
            return False
            
        except Exception as e:
            logger.error(f"Connection error: {e}")
            return False
    
    def listen_for_messages(self):
        """Listen for incoming messages"""
        while self.connected:
            try:
                data = self.socket.recv(4096).decode('utf-8')
                if not data:
                    break
                
                message_data = json.loads(data)
                self.message_queue.put(message_data)
                
            except Exception as e:
                if self.connected:
                    logger.error(f"Error receiving message: {e}")
                break
        
        self.connected = False
    
    def send_message(self, content: str):
        """Send chat message"""
        if not self.connected:
            return False
        
        try:
            message_data = {
                'type': 'message',
                'content': content
            }
            self.socket.send(json.dumps(message_data).encode('utf-8'))
            return True
        except Exception as e:
            logger.error(f"Error sending message: {e}")
            return False
    
    def send_command(self, command: str):
        """Send chat command"""
        if not self.connected:
            return False
        
        try:
            command_data = {
                'type': 'command',
                'content': command
            }
            self.socket.send(json.dumps(command_data).encode('utf-8'))
            return True
        except Exception as e:
            logger.error(f"Error sending command: {e}")
            return False
    
    def disconnect(self):
        """Disconnect from server"""
        self.connected = False
        if self.socket:
            try:
                self.socket.close()
            except:
                pass
 
class ChatGUI:
    """GUI for chat client"""
    
    def __init__(self):
        self.client = None
        self.root = tk.Tk()
        self.root.title("Chat Application")
        self.root.geometry("800x600")
        
        self.setup_ui()
        self.update_messages_thread = None
        
    def setup_ui(self):
        """Setup the user interface"""
        # Main frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        
        # Configure grid weights
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(1, weight=1)
        
        # Connection frame
        conn_frame = ttk.LabelFrame(main_frame, text="Connection", padding="5")
        conn_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
        conn_frame.columnconfigure(1, weight=1)
        
        # Connection controls
        ttk.Label(conn_frame, text="Username:").grid(row=0, column=0, sticky=tk.W)
        self.username_var = tk.StringVar(value="User")
        ttk.Entry(conn_frame, textvariable=self.username_var, width=15).grid(row=0, column=1, padx=(5, 10), sticky=tk.W)
        
        ttk.Label(conn_frame, text="Host:").grid(row=0, column=2, sticky=tk.W)
        self.host_var = tk.StringVar(value="localhost")
        ttk.Entry(conn_frame, textvariable=self.host_var, width=15).grid(row=0, column=3, padx=(5, 10), sticky=tk.W)
        
        ttk.Label(conn_frame, text="Port:").grid(row=0, column=4, sticky=tk.W)
        self.port_var = tk.StringVar(value="12345")
        ttk.Entry(conn_frame, textvariable=self.port_var, width=8).grid(row=0, column=5, padx=(5, 10), sticky=tk.W)
        
        self.connect_button = ttk.Button(conn_frame, text="Connect", command=self.connect_to_server)
        self.connect_button.grid(row=0, column=6, padx=(5, 0))
        
        self.disconnect_button = ttk.Button(conn_frame, text="Disconnect", command=self.disconnect_from_server, state=tk.DISABLED)
        self.disconnect_button.grid(row=0, column=7, padx=(5, 0))
        
        # Status
        self.status_var = tk.StringVar(value="Not connected")
        status_label = ttk.Label(conn_frame, textvariable=self.status_var, foreground="red")
        status_label.grid(row=1, column=0, columnspan=8, sticky=tk.W, pady=(5, 0))
        
        # Chat frame
        chat_frame = ttk.LabelFrame(main_frame, text="Chat", padding="5")
        chat_frame.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        chat_frame.columnconfigure(0, weight=1)
        chat_frame.rowconfigure(0, weight=1)
        
        # Messages display
        self.messages_display = scrolledtext.ScrolledText(chat_frame, state=tk.DISABLED, height=20)
        self.messages_display.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
        
        # Message input frame
        input_frame = ttk.Frame(chat_frame)
        input_frame.grid(row=1, column=0, sticky=(tk.W, tk.E))
        input_frame.columnconfigure(0, weight=1)
        
        # Message input
        self.message_var = tk.StringVar()
        self.message_entry = ttk.Entry(input_frame, textvariable=self.message_var)
        self.message_entry.grid(row=0, column=0, sticky=(tk.W, tk.E), padx=(0, 10))
        self.message_entry.bind('<Return>', self.send_message_event)
        
        # Send button
        self.send_button = ttk.Button(input_frame, text="Send", command=self.send_message, state=tk.DISABLED)
        self.send_button.grid(row=0, column=1)
        
        # Help text
        help_frame = ttk.Frame(main_frame)
        help_frame.grid(row=2, column=0, sticky=(tk.W, tk.E), pady=(10, 0))
        
        help_text = "Commands: /help, /users, /rooms, /join <room>, /create <room>, /pm <user> <msg>, /time"
        ttk.Label(help_frame, text=help_text, font=("TkDefaultFont", 8)).pack()
    
    def connect_to_server(self):
        """Connect to chat server"""
        username = self.username_var.get().strip()
        host = self.host_var.get().strip()
        
        try:
            port = int(self.port_var.get())
        except ValueError:
            messagebox.showerror("Error", "Invalid port number")
            return
        
        if not username:
            messagebox.showerror("Error", "Please enter a username")
            return
        
        # Create client and connect
        self.client = ChatClient(username, host, port)
        
        if self.client.connect():
            self.status_var.set(f"Connected as {self.client.username}")
            self.status_var.set(f"Connected as {self.client.username}")
            
            # Update UI state
            self.connect_button.config(state=tk.DISABLED)
            self.disconnect_button.config(state=tk.NORMAL)
            self.send_button.config(state=tk.NORMAL)
            self.message_entry.config(state=tk.NORMAL)
            
            # Start message update thread
            self.update_messages_thread = threading.Thread(target=self.update_messages, daemon=True)
            self.update_messages_thread.start()
            
            # Clear and focus message entry
            self.message_entry.focus()
            
        else:
            messagebox.showerror("Error", "Failed to connect to server")
            self.client = None
    
    def disconnect_from_server(self):
        """Disconnect from chat server"""
        if self.client:
            self.client.disconnect()
            self.client = None
        
        # Update UI state
        self.status_var.set("Not connected")
        self.connect_button.config(state=tk.NORMAL)
        self.disconnect_button.config(state=tk.DISABLED)
        self.send_button.config(state=tk.DISABLED)
        self.message_entry.config(state=tk.DISABLED)
    
    def send_message_event(self, event):
        """Handle Enter key press"""
        self.send_message()
    
    def send_message(self):
        """Send message or command"""
        if not self.client or not self.client.connected:
            return
        
        content = self.message_var.get().strip()
        if not content:
            return
        
        if content.startswith('/'):
            # Send as command
            self.client.send_command(content)
        else:
            # Send as message
            self.client.send_message(content)
        
        # Clear input
        self.message_var.set("")
    
    def update_messages(self):
        """Update messages from server"""
        while self.client and self.client.connected:
            try:
                # Get message from queue (blocking with timeout)
                message_data = self.client.message_queue.get(timeout=1.0)
                
                # Update UI in main thread
                self.root.after(0, self.display_message, message_data)
                
            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"Error updating messages: {e}")
                break
    
    def display_message(self, message_data: dict):
        """Display message in chat window"""
        self.messages_display.config(state=tk.NORMAL)
        
        message_type = message_data.get('type', 'message')
        
        if message_type == 'welcome':
            # Display welcome message and history
            username = message_data['username']
            room = message_data['room']
            self.messages_display.insert(tk.END, f"=== Welcome {username} to room '{room}' ===\n", 'system')
            
            # Display history
            history = message_data.get('history', [])
            for msg_dict in history:
                self.format_and_insert_message(msg_dict)
        
        elif message_type == 'system':
            # System message
            content = message_data['content']
            timestamp = datetime.datetime.now().strftime("%H:%M:%S")
            self.messages_display.insert(tk.END, f"[{timestamp}] SYSTEM: {content}\n", 'system')
        
        elif message_type == 'private':
            # Private message
            from_user = message_data['from']
            content = message_data['content']
            timestamp = datetime.datetime.fromisoformat(message_data['timestamp']).strftime("%H:%M:%S")
            self.messages_display.insert(tk.END, f"[{timestamp}] PRIVATE from {from_user}: {content}\n", 'private')
        
        else:
            # Regular message
            self.format_and_insert_message(message_data)
        
        # Configure tags for styling
        self.messages_display.tag_config('system', foreground='blue')
        self.messages_display.tag_config('private', foreground='purple')
        self.messages_display.tag_config('join', foreground='green')
        self.messages_display.tag_config('leave', foreground='red')
        
        # Scroll to bottom
        self.messages_display.see(tk.END)
        self.messages_display.config(state=tk.DISABLED)
    
    def format_and_insert_message(self, message_data: dict):
        """Format and insert a chat message"""
        username = message_data['username']
        content = message_data['content']
        msg_type = message_data.get('type', 'message')
        timestamp = datetime.datetime.fromisoformat(message_data['timestamp']).strftime("%H:%M:%S")
        
        if msg_type == 'join':
            self.messages_display.insert(tk.END, f"[{timestamp}] {content}\n", 'join')
        elif msg_type == 'leave':
            self.messages_display.insert(tk.END, f"[{timestamp}] {content}\n", 'leave')
        else:
            self.messages_display.insert(tk.END, f"[{timestamp}] {username}: {content}\n")
    
    def run(self):
        """Run the GUI application"""
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
        self.root.mainloop()
    
    def on_closing(self):
        """Handle window closing"""
        if self.client:
            self.client.disconnect()
        self.root.destroy()
 
def start_server():
    """Start chat server"""
    print("Starting chat server...")
    server = ChatServer()
    try:
        server.start()
    except KeyboardInterrupt:
        print("\nShutting down server...")
        server.stop()
 
def start_client():
    """Start chat client GUI"""
    app = ChatGUI()
    app.run()
 
def main():
    """Main function"""
    import sys
    
    if len(sys.argv) > 1:
        if sys.argv[1] == 'server':
            start_server()
        elif sys.argv[1] == 'client':
            start_client()
        else:
            print("Usage: python chatapp.py [server|client]")
    else:
        # Default to client
        start_client()
 
if __name__ == "__main__":
    main()
 
  1. Save the file.
  2. Run the following command to start the server first:
command
C:\Users\username\Documents\chatApplication> python chatapplication.py server
Starting Chat Server...
======================
Server listening on localhost:12345
✓ Waiting for client connections...
✓ Chat rooms initialized
✓ Message history enabled
Server ready for connections!
command
C:\Users\username\Documents\chatApplication> python chatapplication.py server
Starting Chat Server...
======================
Server listening on localhost:12345
✓ Waiting for client connections...
✓ Chat rooms initialized
✓ Message history enabled
Server ready for connections!
  1. Then start the client application:
command
C:\Users\username\Documents\chatApplication> python chatapplication.py client
Chat Application
================
Enter username: Alice
✓ Connected to server
✓ Joined room: general
✓ Ready to chat!
Type '/help' for commands
command
C:\Users\username\Documents\chatApplication> python chatapplication.py client
Chat Application
================
Enter username: Alice
✓ Connected to server
✓ Joined room: general
✓ Ready to chat!
Type '/help' for commands

Explanation

  1. The ChatMessageChatMessage class represents individual messages with metadata and serialization.
  2. The ChatServerChatServer class manages multiple client connections and message routing.
  3. The ChatClientChatClient class handles connection to server and message handling.
  4. The ChatGUIChatGUI class provides a user-friendly interface for the chat application.
  5. Socket programming enables real-time communication between multiple clients.
  6. Threading allows concurrent handling of multiple client connections.
  7. Message broadcasting distributes messages to all connected clients in a room.
  8. Command system provides chat features like private messaging and room management.
  9. Room management allows users to create and join different chat channels.
  10. Message history maintains conversation logs for persistence.
  11. User authentication and session management for secure connections.
  12. Error handling ensures robust network communication.

Next Steps

Congratulations! You have successfully created a Chat Application in Python. Experiment with the code and see if you can modify the application. Here are a few suggestions:

  • Add user registration and authentication system
  • Implement file sharing and image uploads
  • Create voice and video chat capabilities
  • Add message encryption for secure communication
  • Implement chat bots and automated responses
  • Create admin panel for chat moderation
  • Add emoji support and rich text formatting
  • Implement push notifications for mobile clients
  • Create chat analytics and usage statistics

Conclusion

In this project, you learned how to create a Chat Application in Python using socket programming and networking concepts. You also learned about client-server architecture, concurrent programming, message broadcasting, and implementing real-time communication systems. You can find the source code on GitHub

How It Works

1. Server Architecture

chatapplication.py
class ChatServer:
    def __init__(self, host: str = 'localhost', port: int = 12345):
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        
        self.clients: Dict[socket.socket, dict] = {}
        self.message_history: List[ChatMessage] = []
        self.running = False
        self.max_history = 100
        
        # Room management
        self.rooms: Dict[str, List[socket.socket]] = {'general': []}
        self.default_room = 'general'
chatapplication.py
class ChatServer:
    def __init__(self, host: str = 'localhost', port: int = 12345):
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        
        self.clients: Dict[socket.socket, dict] = {}
        self.message_history: List[ChatMessage] = []
        self.running = False
        self.max_history = 100
        
        # Room management
        self.rooms: Dict[str, List[socket.socket]] = {'general': []}
        self.default_room = 'general'

The server architecture includes:

  • Socket Management: TCP socket for reliable communication
  • Client Tracking: Dictionary to store client connections and metadata
  • Room System: Multiple chat rooms with user management
  • Message History: Persistent storage of chat messages
  • Threading: Concurrent handling of multiple clients

2. Message Broadcasting System

chatapplication.py
def broadcast_message(self, message: ChatMessage, room: str = None, exclude: socket.socket = None):
    if room is None:
        # Broadcast to all clients
        target_clients = list(self.clients.keys())
    else:
        # Broadcast to specific room
        target_clients = self.rooms.get(room, [])
    
    message_data = json.dumps(message.to_dict())
    
    for client_socket in target_clients:
        if client_socket != exclude:
            try:
                client_socket.send(message_data.encode('utf-8'))
            except:
                self.disconnect_client(client_socket)
chatapplication.py
def broadcast_message(self, message: ChatMessage, room: str = None, exclude: socket.socket = None):
    if room is None:
        # Broadcast to all clients
        target_clients = list(self.clients.keys())
    else:
        # Broadcast to specific room
        target_clients = self.rooms.get(room, [])
    
    message_data = json.dumps(message.to_dict())
    
    for client_socket in target_clients:
        if client_socket != exclude:
            try:
                client_socket.send(message_data.encode('utf-8'))
            except:
                self.disconnect_client(client_socket)

3. Client Connection Handling

chatapplication.py
def handle_client(self, client_socket: socket.socket, address: Tuple[str, int]):
    username = None
    
    try:
        # Request username
        client_socket.send(b"USERNAME_REQUEST")
        username = client_socket.recv(1024).decode('utf-8').strip()
        
        # Add client to tracking
        self.clients[client_socket] = {
            'username': username,
            'address': address,
            'room': self.default_room,
            'connected_at': datetime.datetime.now()
        }
chatapplication.py
def handle_client(self, client_socket: socket.socket, address: Tuple[str, int]):
    username = None
    
    try:
        # Request username
        client_socket.send(b"USERNAME_REQUEST")
        username = client_socket.recv(1024).decode('utf-8').strip()
        
        # Add client to tracking
        self.clients[client_socket] = {
            'username': username,
            'address': address,
            'room': self.default_room,
            'connected_at': datetime.datetime.now()
        }

4. Command Processing

chatapplication.py
def handle_command(self, client_socket: socket.socket, command: str):
    client_info = self.clients[client_socket]
    username = client_info['username']
    
    parts = command.strip().split()
    if not parts:
        return
    
    cmd = parts[0].lower()
    
    if cmd == '/help':
        help_text = """
        Available Commands:
        /help - Show this help message
        /users - List online users
        /rooms - List available rooms
        /join <room> - Join a room
        /create <room> - Create new room
        /pm <user> <message> - Send private message
        /time - Show current time
        """
        self.send_system_message(client_socket, help_text)
chatapplication.py
def handle_command(self, client_socket: socket.socket, command: str):
    client_info = self.clients[client_socket]
    username = client_info['username']
    
    parts = command.strip().split()
    if not parts:
        return
    
    cmd = parts[0].lower()
    
    if cmd == '/help':
        help_text = """
        Available Commands:
        /help - Show this help message
        /users - List online users
        /rooms - List available rooms
        /join <room> - Join a room
        /create <room> - Create new room
        /pm <user> <message> - Send private message
        /time - Show current time
        """
        self.send_system_message(client_socket, help_text)

5. GUI Implementation

chatapplication.py
class ChatGUI:
    def __init__(self):
        self.root = tk.Tk()
        self.root.title("Chat Application")
        self.root.geometry("800x600")
        
        self.client = None
        self.username = ""
        self.connected = False
        self.message_queue = queue.Queue()
        
        self.setup_ui()
        self.update_messages()
    
    def setup_ui(self):
        # Create main frame
        main_frame = ttk.Frame(self.root)
        main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
chatapplication.py
class ChatGUI:
    def __init__(self):
        self.root = tk.Tk()
        self.root.title("Chat Application")
        self.root.geometry("800x600")
        
        self.client = None
        self.username = ""
        self.connected = False
        self.message_queue = queue.Queue()
        
        self.setup_ui()
        self.update_messages()
    
    def setup_ui(self):
        # Create main frame
        main_frame = ttk.Frame(self.root)
        main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

6. Message Queue System

chatapplication.py
def listen_for_messages(self):
    while self.connected:
        try:
            message_data = self.socket.recv(1024).decode('utf-8')
            if message_data:
                message = json.loads(message_data)
                self.message_queue.put(message)
        except:
            break
 
def update_messages(self):
    try:
        while True:
            message_data = self.message_queue.get_nowait()
            self.display_message(message_data)
    except queue.Empty:
        pass
    
    self.root.after(100, self.update_messages)
chatapplication.py
def listen_for_messages(self):
    while self.connected:
        try:
            message_data = self.socket.recv(1024).decode('utf-8')
            if message_data:
                message = json.loads(message_data)
                self.message_queue.put(message)
        except:
            break
 
def update_messages(self):
    try:
        while True:
            message_data = self.message_queue.get_nowait()
            self.display_message(message_data)
    except queue.Empty:
        pass
    
    self.root.after(100, self.update_messages)

Advanced Chat Features

1. Private Messaging System

chatapplication.py
def send_private_message(self, sender_socket: socket.socket, target_username: str, content: str):
    """Send private message between users"""
    sender_info = self.clients.get(sender_socket)
    if not sender_info:
        return
    
    sender_username = sender_info['username']
    target_socket = None
    
    # Find target user's socket
    for client_socket, client_info in self.clients.items():
        if client_info['username'] == target_username:
            target_socket = client_socket
            break
    
    if target_socket:
        # Send to target user
        private_message = ChatMessage(
            username=sender_username,
            content=content,
            message_type="private"
        )
        
        try:
            message_data = json.dumps(private_message.to_dict())
            target_socket.send(message_data.encode('utf-8'))
            
            # Confirm to sender
            confirmation = ChatMessage(
                username="System",
                content=f"Private message sent to {target_username}",
                message_type="system"
            )
            sender_socket.send(json.dumps(confirmation.to_dict()).encode('utf-8'))
            
        except Exception as e:
            error_message = ChatMessage(
                username="System",
                content=f"Failed to send private message: {str(e)}",
                message_type="system"
            )
            sender_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
    else:
        # User not found
        error_message = ChatMessage(
            username="System",
            content=f"User '{target_username}' not found or offline",
            message_type="system"
        )
        sender_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
chatapplication.py
def send_private_message(self, sender_socket: socket.socket, target_username: str, content: str):
    """Send private message between users"""
    sender_info = self.clients.get(sender_socket)
    if not sender_info:
        return
    
    sender_username = sender_info['username']
    target_socket = None
    
    # Find target user's socket
    for client_socket, client_info in self.clients.items():
        if client_info['username'] == target_username:
            target_socket = client_socket
            break
    
    if target_socket:
        # Send to target user
        private_message = ChatMessage(
            username=sender_username,
            content=content,
            message_type="private"
        )
        
        try:
            message_data = json.dumps(private_message.to_dict())
            target_socket.send(message_data.encode('utf-8'))
            
            # Confirm to sender
            confirmation = ChatMessage(
                username="System",
                content=f"Private message sent to {target_username}",
                message_type="system"
            )
            sender_socket.send(json.dumps(confirmation.to_dict()).encode('utf-8'))
            
        except Exception as e:
            error_message = ChatMessage(
                username="System",
                content=f"Failed to send private message: {str(e)}",
                message_type="system"
            )
            sender_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
    else:
        # User not found
        error_message = ChatMessage(
            username="System",
            content=f"User '{target_username}' not found or offline",
            message_type="system"
        )
        sender_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))

2. Room Management System

chatapplication.py
def create_room(self, room_name: str, creator_socket: socket.socket, password: str = None):
    """Create a new chat room"""
    if room_name in self.rooms:
        error_message = ChatMessage(
            username="System",
            content=f"Room '{room_name}' already exists",
            message_type="system"
        )
        creator_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
        return False
    
    # Create new room
    self.rooms[room_name] = {
        'clients': [],
        'creator': self.clients[creator_socket]['username'],
        'created_at': datetime.datetime.now(),
        'password': password,
        'topic': f"Welcome to {room_name}!",
        'max_users': 50
    }
    
    # Move creator to new room
    self.move_client_to_room(creator_socket, room_name)
    
    success_message = ChatMessage(
        username="System",
        content=f"Room '{room_name}' created successfully. You are now in {room_name}.",
        message_type="system"
    )
    creator_socket.send(json.dumps(success_message.to_dict()).encode('utf-8'))
    
    return True
 
def join_room(self, client_socket: socket.socket, room_name: str, password: str = None):
    """Join an existing room"""
    if room_name not in self.rooms:
        error_message = ChatMessage(
            username="System",
            content=f"Room '{room_name}' does not exist",
            message_type="system"
        )
        client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
        return False
    
    room = self.rooms[room_name]
    
    # Check password if required
    if room.get('password') and room['password'] != password:
        error_message = ChatMessage(
            username="System",
            content=f"Incorrect password for room '{room_name}'",
            message_type="system"
        )
        client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
        return False
    
    # Check room capacity
    if len(room['clients']) >= room['max_users']:
        error_message = ChatMessage(
            username="System",
            content=f"Room '{room_name}' is full",
            message_type="system"
        )
        client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
        return False
    
    # Move client to room
    self.move_client_to_room(client_socket, room_name)
    return True
 
def list_rooms(self, client_socket: socket.socket):
    """List all available rooms"""
    room_list = []
    for room_name, room_info in self.rooms.items():
        room_list.append({
            'name': room_name,
            'users': len(room_info['clients']),
            'max_users': room_info['max_users'],
            'has_password': bool(room_info.get('password')),
            'topic': room_info.get('topic', 'No topic set')
        })
    
    room_info_message = ChatMessage(
        username="System",
        content=f"Available rooms:\n" + 
                "\n".join([
                    f"• {room['name']}: {room['users']}/{room['max_users']} users"
                    f"{' 🔒' if room['has_password'] else ''} - {room['topic']}"
                    for room in room_list
                ]),
        message_type="system"
    )
    
    client_socket.send(json.dumps(room_info_message.to_dict()).encode('utf-8'))
chatapplication.py
def create_room(self, room_name: str, creator_socket: socket.socket, password: str = None):
    """Create a new chat room"""
    if room_name in self.rooms:
        error_message = ChatMessage(
            username="System",
            content=f"Room '{room_name}' already exists",
            message_type="system"
        )
        creator_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
        return False
    
    # Create new room
    self.rooms[room_name] = {
        'clients': [],
        'creator': self.clients[creator_socket]['username'],
        'created_at': datetime.datetime.now(),
        'password': password,
        'topic': f"Welcome to {room_name}!",
        'max_users': 50
    }
    
    # Move creator to new room
    self.move_client_to_room(creator_socket, room_name)
    
    success_message = ChatMessage(
        username="System",
        content=f"Room '{room_name}' created successfully. You are now in {room_name}.",
        message_type="system"
    )
    creator_socket.send(json.dumps(success_message.to_dict()).encode('utf-8'))
    
    return True
 
def join_room(self, client_socket: socket.socket, room_name: str, password: str = None):
    """Join an existing room"""
    if room_name not in self.rooms:
        error_message = ChatMessage(
            username="System",
            content=f"Room '{room_name}' does not exist",
            message_type="system"
        )
        client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
        return False
    
    room = self.rooms[room_name]
    
    # Check password if required
    if room.get('password') and room['password'] != password:
        error_message = ChatMessage(
            username="System",
            content=f"Incorrect password for room '{room_name}'",
            message_type="system"
        )
        client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
        return False
    
    # Check room capacity
    if len(room['clients']) >= room['max_users']:
        error_message = ChatMessage(
            username="System",
            content=f"Room '{room_name}' is full",
            message_type="system"
        )
        client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
        return False
    
    # Move client to room
    self.move_client_to_room(client_socket, room_name)
    return True
 
def list_rooms(self, client_socket: socket.socket):
    """List all available rooms"""
    room_list = []
    for room_name, room_info in self.rooms.items():
        room_list.append({
            'name': room_name,
            'users': len(room_info['clients']),
            'max_users': room_info['max_users'],
            'has_password': bool(room_info.get('password')),
            'topic': room_info.get('topic', 'No topic set')
        })
    
    room_info_message = ChatMessage(
        username="System",
        content=f"Available rooms:\n" + 
                "\n".join([
                    f"• {room['name']}: {room['users']}/{room['max_users']} users"
                    f"{' 🔒' if room['has_password'] else ''} - {room['topic']}"
                    for room in room_list
                ]),
        message_type="system"
    )
    
    client_socket.send(json.dumps(room_info_message.to_dict()).encode('utf-8'))

3. User Status and Presence

chatapplication.py
def update_user_status(self, client_socket: socket.socket, status: str):
    """Update user's online status"""
    valid_statuses = ['online', 'away', 'busy', 'invisible']
    
    if status not in valid_statuses:
        error_message = ChatMessage(
            username="System",
            content=f"Invalid status. Valid options: {', '.join(valid_statuses)}",
            message_type="system"
        )
        client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
        return
    
    client_info = self.clients.get(client_socket)
    if client_info:
        old_status = client_info.get('status', 'online')
        client_info['status'] = status
        client_info['last_activity'] = datetime.datetime.now()
        
        # Notify room members of status change
        username = client_info['username']
        current_room = client_info.get('room', self.default_room)
        
        status_message = ChatMessage(
            username="System",
            content=f"{username} is now {status}",
            message_type="status_change"
        )
        
        self.broadcast_message(status_message, current_room, exclude=client_socket)
 
def get_online_users(self, room_name: str = None) -> List[Dict]:
    """Get list of online users in room or globally"""
    users = []
    
    for client_socket, client_info in self.clients.items():
        if room_name and client_info.get('room') != room_name:
            continue
        
        # Check if user is truly online (recent activity)
        last_activity = client_info.get('last_activity', datetime.datetime.now())
        time_diff = datetime.datetime.now() - last_activity
        
        if time_diff.total_seconds() < 300:  # 5 minutes timeout
            users.append({
                'username': client_info['username'],
                'status': client_info.get('status', 'online'),
                'room': client_info.get('room', self.default_room),
                'joined_at': client_info.get('connected_at', ''),
                'last_activity': last_activity.isoformat()
            })
    
    return users
chatapplication.py
def update_user_status(self, client_socket: socket.socket, status: str):
    """Update user's online status"""
    valid_statuses = ['online', 'away', 'busy', 'invisible']
    
    if status not in valid_statuses:
        error_message = ChatMessage(
            username="System",
            content=f"Invalid status. Valid options: {', '.join(valid_statuses)}",
            message_type="system"
        )
        client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
        return
    
    client_info = self.clients.get(client_socket)
    if client_info:
        old_status = client_info.get('status', 'online')
        client_info['status'] = status
        client_info['last_activity'] = datetime.datetime.now()
        
        # Notify room members of status change
        username = client_info['username']
        current_room = client_info.get('room', self.default_room)
        
        status_message = ChatMessage(
            username="System",
            content=f"{username} is now {status}",
            message_type="status_change"
        )
        
        self.broadcast_message(status_message, current_room, exclude=client_socket)
 
def get_online_users(self, room_name: str = None) -> List[Dict]:
    """Get list of online users in room or globally"""
    users = []
    
    for client_socket, client_info in self.clients.items():
        if room_name and client_info.get('room') != room_name:
            continue
        
        # Check if user is truly online (recent activity)
        last_activity = client_info.get('last_activity', datetime.datetime.now())
        time_diff = datetime.datetime.now() - last_activity
        
        if time_diff.total_seconds() < 300:  # 5 minutes timeout
            users.append({
                'username': client_info['username'],
                'status': client_info.get('status', 'online'),
                'room': client_info.get('room', self.default_room),
                'joined_at': client_info.get('connected_at', ''),
                'last_activity': last_activity.isoformat()
            })
    
    return users

4. File Sharing Capability

chatapplication.py
def handle_file_share(self, client_socket: socket.socket, file_data: Dict):
    """Handle file sharing between users"""
    import base64
    import mimetypes
    
    client_info = self.clients.get(client_socket)
    if not client_info:
        return
    
    try:
        # Decode file data
        filename = file_data['filename']
        file_content = base64.b64decode(file_data['content'])
        file_size = len(file_content)
        
        # Validate file
        max_file_size = 10 * 1024 * 1024  # 10MB limit
        if file_size > max_file_size:
            error_message = ChatMessage(
                username="System",
                content="File too large. Maximum size is 10MB.",
                message_type="system"
            )
            client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
            return
        
        # Save file temporarily
        upload_dir = "uploads"
        os.makedirs(upload_dir, exist_ok=True)
        
        safe_filename = f"{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}_{filename}"
        file_path = os.path.join(upload_dir, safe_filename)
        
        with open(file_path, 'wb') as f:
            f.write(file_content)
        
        # Get file type
        mime_type, _ = mimetypes.guess_type(filename)
        file_type = 'image' if mime_type and mime_type.startswith('image/') else 'file'
        
        # Create file share message
        file_message = ChatMessage(
            username=client_info['username'],
            content=f"shared a file: {filename} ({file_size} bytes)",
            message_type="file_share"
        )
        
        # Add file information to message
        file_message.file_info = {
            'filename': filename,
            'size': file_size,
            'type': file_type,
            'path': file_path,
            'mime_type': mime_type
        }
        
        # Broadcast to room
        current_room = client_info.get('room', self.default_room)
        self.broadcast_message(file_message, current_room)
        
    except Exception as e:
        error_message = ChatMessage(
            username="System",
            content=f"File share failed: {str(e)}",
            message_type="system"
        )
        client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
chatapplication.py
def handle_file_share(self, client_socket: socket.socket, file_data: Dict):
    """Handle file sharing between users"""
    import base64
    import mimetypes
    
    client_info = self.clients.get(client_socket)
    if not client_info:
        return
    
    try:
        # Decode file data
        filename = file_data['filename']
        file_content = base64.b64decode(file_data['content'])
        file_size = len(file_content)
        
        # Validate file
        max_file_size = 10 * 1024 * 1024  # 10MB limit
        if file_size > max_file_size:
            error_message = ChatMessage(
                username="System",
                content="File too large. Maximum size is 10MB.",
                message_type="system"
            )
            client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
            return
        
        # Save file temporarily
        upload_dir = "uploads"
        os.makedirs(upload_dir, exist_ok=True)
        
        safe_filename = f"{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}_{filename}"
        file_path = os.path.join(upload_dir, safe_filename)
        
        with open(file_path, 'wb') as f:
            f.write(file_content)
        
        # Get file type
        mime_type, _ = mimetypes.guess_type(filename)
        file_type = 'image' if mime_type and mime_type.startswith('image/') else 'file'
        
        # Create file share message
        file_message = ChatMessage(
            username=client_info['username'],
            content=f"shared a file: {filename} ({file_size} bytes)",
            message_type="file_share"
        )
        
        # Add file information to message
        file_message.file_info = {
            'filename': filename,
            'size': file_size,
            'type': file_type,
            'path': file_path,
            'mime_type': mime_type
        }
        
        # Broadcast to room
        current_room = client_info.get('room', self.default_room)
        self.broadcast_message(file_message, current_room)
        
    except Exception as e:
        error_message = ChatMessage(
            username="System",
            content=f"File share failed: {str(e)}",
            message_type="system"
        )
        client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))

5. Message History and Persistence

chatapplication.py
def save_message_to_database(self, message: ChatMessage, room: str):
    """Save message to persistent storage"""
    import sqlite3
    
    db_path = "chat_history.db"
    
    # Initialize database if needed
    with sqlite3.connect(db_path) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL,
                content TEXT NOT NULL,
                message_type TEXT NOT NULL,
                room TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                message_id TEXT UNIQUE
            )
        ''')
        
        # Insert message
        conn.execute('''
            INSERT OR IGNORE INTO messages 
            (username, content, message_type, room, timestamp, message_id)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (message.username, message.content, message.message_type, 
              room, message.timestamp.isoformat(), message.id))
 
def get_message_history(self, room: str, limit: int = 50) -> List[ChatMessage]:
    """Retrieve message history for a room"""
    import sqlite3
    
    db_path = "chat_history.db"
    messages = []
    
    try:
        with sqlite3.connect(db_path) as conn:
            cursor = conn.execute('''
                SELECT username, content, message_type, timestamp, message_id
                FROM messages 
                WHERE room = ?
                ORDER BY timestamp DESC
                LIMIT ?
            ''', (room, limit))
            
            for row in cursor.fetchall():
                message = ChatMessage(
                    username=row[0],
                    content=row[1],
                    message_type=row[2],
                    timestamp=datetime.datetime.fromisoformat(row[3])
                )
                message.id = row[4]
                messages.append(message)
        
        return list(reversed(messages))  # Return in chronological order
    
    except Exception as e:
        logger.error(f"Error retrieving message history: {e}")
        return []
 
def send_message_history(self, client_socket: socket.socket, room: str):
    """Send recent message history to newly joined client"""
    history = self.get_message_history(room, limit=20)
    
    for message in history:
        try:
            message_data = json.dumps(message.to_dict())
            client_socket.send(message_data.encode('utf-8'))
            time.sleep(0.1)  # Small delay to ensure order
        except:
            break
chatapplication.py
def save_message_to_database(self, message: ChatMessage, room: str):
    """Save message to persistent storage"""
    import sqlite3
    
    db_path = "chat_history.db"
    
    # Initialize database if needed
    with sqlite3.connect(db_path) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL,
                content TEXT NOT NULL,
                message_type TEXT NOT NULL,
                room TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                message_id TEXT UNIQUE
            )
        ''')
        
        # Insert message
        conn.execute('''
            INSERT OR IGNORE INTO messages 
            (username, content, message_type, room, timestamp, message_id)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (message.username, message.content, message.message_type, 
              room, message.timestamp.isoformat(), message.id))
 
def get_message_history(self, room: str, limit: int = 50) -> List[ChatMessage]:
    """Retrieve message history for a room"""
    import sqlite3
    
    db_path = "chat_history.db"
    messages = []
    
    try:
        with sqlite3.connect(db_path) as conn:
            cursor = conn.execute('''
                SELECT username, content, message_type, timestamp, message_id
                FROM messages 
                WHERE room = ?
                ORDER BY timestamp DESC
                LIMIT ?
            ''', (room, limit))
            
            for row in cursor.fetchall():
                message = ChatMessage(
                    username=row[0],
                    content=row[1],
                    message_type=row[2],
                    timestamp=datetime.datetime.fromisoformat(row[3])
                )
                message.id = row[4]
                messages.append(message)
        
        return list(reversed(messages))  # Return in chronological order
    
    except Exception as e:
        logger.error(f"Error retrieving message history: {e}")
        return []
 
def send_message_history(self, client_socket: socket.socket, room: str):
    """Send recent message history to newly joined client"""
    history = self.get_message_history(room, limit=20)
    
    for message in history:
        try:
            message_data = json.dumps(message.to_dict())
            client_socket.send(message_data.encode('utf-8'))
            time.sleep(0.1)  # Small delay to ensure order
        except:
            break

Enhanced GUI Features

1. Modern Chat Interface

chatapplication.py
def setup_modern_ui(self):
    """Create modern chat interface with enhanced features"""
    # Main container
    self.main_frame = ttk.Frame(self.root)
    self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
    
    # Create paned window for resizable layout
    self.paned_window = ttk.PanedWindow(self.main_frame, orient=tk.HORIZONTAL)
    self.paned_window.pack(fill=tk.BOTH, expand=True)
    
    # Left sidebar for rooms and users
    self.sidebar_frame = ttk.Frame(self.paned_window, width=250)
    self.paned_window.add(self.sidebar_frame, weight=1)
    
    # Main chat area
    self.chat_frame = ttk.Frame(self.paned_window)
    self.paned_window.add(self.chat_frame, weight=3)
    
    self.setup_sidebar()
    self.setup_chat_area()
    self.setup_input_area()
 
def setup_sidebar(self):
    """Setup sidebar with rooms and user list"""
    # Room list
    ttk.Label(self.sidebar_frame, text="Rooms", font=('Arial', 12, 'bold')).pack(pady=5)
    
    self.room_listbox = tk.Listbox(self.sidebar_frame, height=8)
    self.room_listbox.pack(fill=tk.X, padx=5, pady=5)
    self.room_listbox.bind('<Double-Button-1>', self.on_room_select)
    
    # Room controls
    room_controls = ttk.Frame(self.sidebar_frame)
    room_controls.pack(fill=tk.X, padx=5)
    
    ttk.Button(room_controls, text="Join Room", command=self.join_room_dialog).pack(side=tk.LEFT, padx=2)
    ttk.Button(room_controls, text="Create Room", command=self.create_room_dialog).pack(side=tk.LEFT, padx=2)
    
    # User list
    ttk.Separator(self.sidebar_frame, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=10)
    ttk.Label(self.sidebar_frame, text="Online Users", font=('Arial', 12, 'bold')).pack(pady=5)
    
    self.user_listbox = tk.Listbox(self.sidebar_frame, height=10)
    self.user_listbox.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    self.user_listbox.bind('<Double-Button-1>', self.on_user_select)
 
def setup_chat_area(self):
    """Setup main chat display area"""
    # Chat header
    self.chat_header = ttk.Frame(self.chat_frame)
    self.chat_header.pack(fill=tk.X, pady=(0, 5))
    
    self.room_label = ttk.Label(self.chat_header, text="General", font=('Arial', 14, 'bold'))
    self.room_label.pack(side=tk.LEFT)
    
    self.user_count_label = ttk.Label(self.chat_header, text="0 users")
    self.user_count_label.pack(side=tk.RIGHT)
    
    # Chat messages area with scrollbar
    chat_container = ttk.Frame(self.chat_frame)
    chat_container.pack(fill=tk.BOTH, expand=True)
    
    self.chat_display = scrolledtext.ScrolledText(
        chat_container, 
        state=tk.DISABLED,
        wrap=tk.WORD,
        font=('Consolas', 10),
        bg='#f8f9fa',
        fg='#333333'
    )
    self.chat_display.pack(fill=tk.BOTH, expand=True)
    
    # Configure text tags for message styling
    self.chat_display.tag_configure('username', foreground='#007bff', font=('Consolas', 10, 'bold'))
    self.chat_display.tag_configure('timestamp', foreground='#6c757d', font=('Consolas', 8))
    self.chat_display.tag_configure('private', foreground='#dc3545', font=('Consolas', 10, 'italic'))
    self.chat_display.tag_configure('system', foreground='#28a745', font=('Consolas', 10, 'italic'))
 
def setup_input_area(self):
    """Setup message input area with enhanced features"""
    input_frame = ttk.Frame(self.chat_frame)
    input_frame.pack(fill=tk.X, pady=(5, 0))
    
    # Status indicator
    self.status_frame = ttk.Frame(input_frame)
    self.status_frame.pack(fill=tk.X, pady=(0, 5))
    
    self.typing_label = ttk.Label(self.status_frame, text="", foreground='#6c757d')
    self.typing_label.pack(side=tk.LEFT)
    
    self.connection_status = ttk.Label(self.status_frame, text="●", foreground='red')
    self.connection_status.pack(side=tk.RIGHT)
    
    # Message input
    message_frame = ttk.Frame(input_frame)
    message_frame.pack(fill=tk.X)
    
    self.message_entry = tk.Text(message_frame, height=3, wrap=tk.WORD)
    self.message_entry.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))
    self.message_entry.bind('<Return>', self.send_message_event)
    self.message_entry.bind('<KeyPress>', self.on_typing)
    
    # Button frame
    button_frame = ttk.Frame(message_frame)
    button_frame.pack(side=tk.RIGHT, fill=tk.Y)
    
    ttk.Button(button_frame, text="📎", command=self.attach_file, width=3).pack(pady=1)
    ttk.Button(button_frame, text="😊", command=self.show_emoji_picker, width=3).pack(pady=1)
    ttk.Button(button_frame, text="Send", command=self.send_message).pack(pady=1)
chatapplication.py
def setup_modern_ui(self):
    """Create modern chat interface with enhanced features"""
    # Main container
    self.main_frame = ttk.Frame(self.root)
    self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
    
    # Create paned window for resizable layout
    self.paned_window = ttk.PanedWindow(self.main_frame, orient=tk.HORIZONTAL)
    self.paned_window.pack(fill=tk.BOTH, expand=True)
    
    # Left sidebar for rooms and users
    self.sidebar_frame = ttk.Frame(self.paned_window, width=250)
    self.paned_window.add(self.sidebar_frame, weight=1)
    
    # Main chat area
    self.chat_frame = ttk.Frame(self.paned_window)
    self.paned_window.add(self.chat_frame, weight=3)
    
    self.setup_sidebar()
    self.setup_chat_area()
    self.setup_input_area()
 
def setup_sidebar(self):
    """Setup sidebar with rooms and user list"""
    # Room list
    ttk.Label(self.sidebar_frame, text="Rooms", font=('Arial', 12, 'bold')).pack(pady=5)
    
    self.room_listbox = tk.Listbox(self.sidebar_frame, height=8)
    self.room_listbox.pack(fill=tk.X, padx=5, pady=5)
    self.room_listbox.bind('<Double-Button-1>', self.on_room_select)
    
    # Room controls
    room_controls = ttk.Frame(self.sidebar_frame)
    room_controls.pack(fill=tk.X, padx=5)
    
    ttk.Button(room_controls, text="Join Room", command=self.join_room_dialog).pack(side=tk.LEFT, padx=2)
    ttk.Button(room_controls, text="Create Room", command=self.create_room_dialog).pack(side=tk.LEFT, padx=2)
    
    # User list
    ttk.Separator(self.sidebar_frame, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=10)
    ttk.Label(self.sidebar_frame, text="Online Users", font=('Arial', 12, 'bold')).pack(pady=5)
    
    self.user_listbox = tk.Listbox(self.sidebar_frame, height=10)
    self.user_listbox.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    self.user_listbox.bind('<Double-Button-1>', self.on_user_select)
 
def setup_chat_area(self):
    """Setup main chat display area"""
    # Chat header
    self.chat_header = ttk.Frame(self.chat_frame)
    self.chat_header.pack(fill=tk.X, pady=(0, 5))
    
    self.room_label = ttk.Label(self.chat_header, text="General", font=('Arial', 14, 'bold'))
    self.room_label.pack(side=tk.LEFT)
    
    self.user_count_label = ttk.Label(self.chat_header, text="0 users")
    self.user_count_label.pack(side=tk.RIGHT)
    
    # Chat messages area with scrollbar
    chat_container = ttk.Frame(self.chat_frame)
    chat_container.pack(fill=tk.BOTH, expand=True)
    
    self.chat_display = scrolledtext.ScrolledText(
        chat_container, 
        state=tk.DISABLED,
        wrap=tk.WORD,
        font=('Consolas', 10),
        bg='#f8f9fa',
        fg='#333333'
    )
    self.chat_display.pack(fill=tk.BOTH, expand=True)
    
    # Configure text tags for message styling
    self.chat_display.tag_configure('username', foreground='#007bff', font=('Consolas', 10, 'bold'))
    self.chat_display.tag_configure('timestamp', foreground='#6c757d', font=('Consolas', 8))
    self.chat_display.tag_configure('private', foreground='#dc3545', font=('Consolas', 10, 'italic'))
    self.chat_display.tag_configure('system', foreground='#28a745', font=('Consolas', 10, 'italic'))
 
def setup_input_area(self):
    """Setup message input area with enhanced features"""
    input_frame = ttk.Frame(self.chat_frame)
    input_frame.pack(fill=tk.X, pady=(5, 0))
    
    # Status indicator
    self.status_frame = ttk.Frame(input_frame)
    self.status_frame.pack(fill=tk.X, pady=(0, 5))
    
    self.typing_label = ttk.Label(self.status_frame, text="", foreground='#6c757d')
    self.typing_label.pack(side=tk.LEFT)
    
    self.connection_status = ttk.Label(self.status_frame, text="●", foreground='red')
    self.connection_status.pack(side=tk.RIGHT)
    
    # Message input
    message_frame = ttk.Frame(input_frame)
    message_frame.pack(fill=tk.X)
    
    self.message_entry = tk.Text(message_frame, height=3, wrap=tk.WORD)
    self.message_entry.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))
    self.message_entry.bind('<Return>', self.send_message_event)
    self.message_entry.bind('<KeyPress>', self.on_typing)
    
    # Button frame
    button_frame = ttk.Frame(message_frame)
    button_frame.pack(side=tk.RIGHT, fill=tk.Y)
    
    ttk.Button(button_frame, text="📎", command=self.attach_file, width=3).pack(pady=1)
    ttk.Button(button_frame, text="😊", command=self.show_emoji_picker, width=3).pack(pady=1)
    ttk.Button(button_frame, text="Send", command=self.send_message).pack(pady=1)

2. Emoji and File Support

chatapplication.py
def show_emoji_picker(self):
    """Show emoji picker dialog"""
    emoji_window = tk.Toplevel(self.root)
    emoji_window.title("Select Emoji")
    emoji_window.geometry("300x200")
    emoji_window.resizable(False, False)
    
    # Common emojis
    emojis = [
        '😀', '😂', '😊', '😍', '😢', '😡', '👍', '👎',
        '❤️', '💔', '🔥', '💯', '✨', '🎉', '👀', '🤔',
        '😎', '🤣', '😇', '😈', '🤗', '🙄', '😴', '🤐'
    ]
    
    # Create emoji buttons
    for i, emoji in enumerate(emojis):
        row = i // 8
        col = i % 8
        
        btn = tk.Button(
            emoji_window,
            text=emoji,
            font=('Arial', 16),
            command=lambda e=emoji: self.insert_emoji(e, emoji_window),
            relief=tk.FLAT,
            bd=1
        )
        btn.grid(row=row, column=col, padx=2, pady=2, sticky='nsew')
 
def insert_emoji(self, emoji: str, window):
    """Insert selected emoji into message"""
    self.message_entry.insert(tk.INSERT, emoji)
    window.destroy()
    self.message_entry.focus()
 
def attach_file(self):
    """Open file attachment dialog"""
    from tkinter import filedialog
    
    file_path = filedialog.askopenfilename(
        title="Select File to Share",
        filetypes=[
            ("Images", "*.png *.jpg *.jpeg *.gif *.bmp"),
            ("Documents", "*.pdf *.doc *.docx *.txt"),
            ("All Files", "*.*")
        ]
    )
    
    if file_path:
        self.send_file(file_path)
 
def send_file(self, file_path: str):
    """Send file to chat"""
    try:
        file_size = os.path.getsize(file_path)
        max_size = 10 * 1024 * 1024  # 10MB
        
        if file_size > max_size:
            messagebox.showerror("File Too Large", "File must be smaller than 10MB")
            return
        
        filename = os.path.basename(file_path)
        
        # Read and encode file
        with open(file_path, 'rb') as f:
            file_content = f.read()
        
        import base64
        encoded_content = base64.b64encode(file_content).decode('utf-8')
        
        # Send file data
        file_data = {
            'type': 'file_share',
            'filename': filename,
            'content': encoded_content,
            'size': file_size
        }
        
        if self.client:
            self.client.send_command(json.dumps(file_data))
            
            # Show file in chat
            self.display_message({
                'username': self.username,
                'content': f"📎 Uploading {filename}...",
                'type': 'file_share',
                'timestamp': datetime.datetime.now().isoformat()
            })
    
    except Exception as e:
        messagebox.showerror("File Error", f"Failed to send file: {str(e)}")
chatapplication.py
def show_emoji_picker(self):
    """Show emoji picker dialog"""
    emoji_window = tk.Toplevel(self.root)
    emoji_window.title("Select Emoji")
    emoji_window.geometry("300x200")
    emoji_window.resizable(False, False)
    
    # Common emojis
    emojis = [
        '😀', '😂', '😊', '😍', '😢', '😡', '👍', '👎',
        '❤️', '💔', '🔥', '💯', '✨', '🎉', '👀', '🤔',
        '😎', '🤣', '😇', '😈', '🤗', '🙄', '😴', '🤐'
    ]
    
    # Create emoji buttons
    for i, emoji in enumerate(emojis):
        row = i // 8
        col = i % 8
        
        btn = tk.Button(
            emoji_window,
            text=emoji,
            font=('Arial', 16),
            command=lambda e=emoji: self.insert_emoji(e, emoji_window),
            relief=tk.FLAT,
            bd=1
        )
        btn.grid(row=row, column=col, padx=2, pady=2, sticky='nsew')
 
def insert_emoji(self, emoji: str, window):
    """Insert selected emoji into message"""
    self.message_entry.insert(tk.INSERT, emoji)
    window.destroy()
    self.message_entry.focus()
 
def attach_file(self):
    """Open file attachment dialog"""
    from tkinter import filedialog
    
    file_path = filedialog.askopenfilename(
        title="Select File to Share",
        filetypes=[
            ("Images", "*.png *.jpg *.jpeg *.gif *.bmp"),
            ("Documents", "*.pdf *.doc *.docx *.txt"),
            ("All Files", "*.*")
        ]
    )
    
    if file_path:
        self.send_file(file_path)
 
def send_file(self, file_path: str):
    """Send file to chat"""
    try:
        file_size = os.path.getsize(file_path)
        max_size = 10 * 1024 * 1024  # 10MB
        
        if file_size > max_size:
            messagebox.showerror("File Too Large", "File must be smaller than 10MB")
            return
        
        filename = os.path.basename(file_path)
        
        # Read and encode file
        with open(file_path, 'rb') as f:
            file_content = f.read()
        
        import base64
        encoded_content = base64.b64encode(file_content).decode('utf-8')
        
        # Send file data
        file_data = {
            'type': 'file_share',
            'filename': filename,
            'content': encoded_content,
            'size': file_size
        }
        
        if self.client:
            self.client.send_command(json.dumps(file_data))
            
            # Show file in chat
            self.display_message({
                'username': self.username,
                'content': f"📎 Uploading {filename}...",
                'type': 'file_share',
                'timestamp': datetime.datetime.now().isoformat()
            })
    
    except Exception as e:
        messagebox.showerror("File Error", f"Failed to send file: {str(e)}")

3. Typing Indicators

chatapplication.py
def on_typing(self, event):
    """Handle typing indicator"""
    if not self.connected:
        return
    
    current_time = time.time()
    
    # Send typing indicator if haven't sent one recently
    if current_time - getattr(self, 'last_typing_time', 0) > 2:
        self.last_typing_time = current_time
        
        typing_data = {
            'type': 'typing_indicator',
            'username': self.username,
            'typing': True
        }
        
        if self.client:
            self.client.send_command(json.dumps(typing_data))
    
    # Schedule stop typing indicator
    if hasattr(self, 'typing_timer'):
        self.root.after_cancel(self.typing_timer)
    
    self.typing_timer = self.root.after(3000, self.stop_typing_indicator)
 
def stop_typing_indicator(self):
    """Stop typing indicator"""
    if self.connected and self.client:
        typing_data = {
            'type': 'typing_indicator',
            'username': self.username,
            'typing': False
        }
        self.client.send_command(json.dumps(typing_data))
 
def handle_typing_indicator(self, data):
    """Handle incoming typing indicators"""
    username = data.get('username')
    is_typing = data.get('typing', False)
    
    if username == self.username:
        return  # Ignore own typing indicators
    
    if is_typing:
        if not hasattr(self, 'typing_users'):
            self.typing_users = set()
        
        self.typing_users.add(username)
        
        # Update typing display
        if len(self.typing_users) == 1:
            typing_text = f"{username} is typing..."
        elif len(self.typing_users) == 2:
            typing_text = f"{', '.join(self.typing_users)} are typing..."
        else:
            typing_text = f"{len(self.typing_users)} people are typing..."
        
        self.typing_label.config(text=typing_text)
    else:
        if hasattr(self, 'typing_users'):
            self.typing_users.discard(username)
            
            if not self.typing_users:
                self.typing_label.config(text="")
chatapplication.py
def on_typing(self, event):
    """Handle typing indicator"""
    if not self.connected:
        return
    
    current_time = time.time()
    
    # Send typing indicator if haven't sent one recently
    if current_time - getattr(self, 'last_typing_time', 0) > 2:
        self.last_typing_time = current_time
        
        typing_data = {
            'type': 'typing_indicator',
            'username': self.username,
            'typing': True
        }
        
        if self.client:
            self.client.send_command(json.dumps(typing_data))
    
    # Schedule stop typing indicator
    if hasattr(self, 'typing_timer'):
        self.root.after_cancel(self.typing_timer)
    
    self.typing_timer = self.root.after(3000, self.stop_typing_indicator)
 
def stop_typing_indicator(self):
    """Stop typing indicator"""
    if self.connected and self.client:
        typing_data = {
            'type': 'typing_indicator',
            'username': self.username,
            'typing': False
        }
        self.client.send_command(json.dumps(typing_data))
 
def handle_typing_indicator(self, data):
    """Handle incoming typing indicators"""
    username = data.get('username')
    is_typing = data.get('typing', False)
    
    if username == self.username:
        return  # Ignore own typing indicators
    
    if is_typing:
        if not hasattr(self, 'typing_users'):
            self.typing_users = set()
        
        self.typing_users.add(username)
        
        # Update typing display
        if len(self.typing_users) == 1:
            typing_text = f"{username} is typing..."
        elif len(self.typing_users) == 2:
            typing_text = f"{', '.join(self.typing_users)} are typing..."
        else:
            typing_text = f"{len(self.typing_users)} people are typing..."
        
        self.typing_label.config(text=typing_text)
    else:
        if hasattr(self, 'typing_users'):
            self.typing_users.discard(username)
            
            if not self.typing_users:
                self.typing_label.config(text="")

Performance and Scalability

1. Connection Pool Management

chatapplication.py
class ConnectionPool:
    def __init__(self, max_connections: int = 100):
        self.max_connections = max_connections
        self.active_connections = {}
        self.connection_count = 0
        self.lock = threading.Lock()
    
    def add_connection(self, client_socket: socket.socket, client_info: Dict) -> bool:
        """Add new connection to pool"""
        with self.lock:
            if self.connection_count >= self.max_connections:
                return False
            
            connection_id = f"conn_{self.connection_count + 1}"
            self.active_connections[connection_id] = {
                'socket': client_socket,
                'info': client_info,
                'connected_at': datetime.datetime.now(),
                'last_activity': datetime.datetime.now()
            }
            
            self.connection_count += 1
            return True
    
    def remove_connection(self, client_socket: socket.socket):
        """Remove connection from pool"""
        with self.lock:
            for conn_id, conn_data in list(self.active_connections.items()):
                if conn_data['socket'] == client_socket:
                    del self.active_connections[conn_id]
                    self.connection_count -= 1
                    break
    
    def cleanup_inactive_connections(self, timeout_minutes: int = 30):
        """Remove inactive connections"""
        with self.lock:
            current_time = datetime.datetime.now()
            inactive_connections = []
            
            for conn_id, conn_data in self.active_connections.items():
                last_activity = conn_data['last_activity']
                if (current_time - last_activity).total_seconds() > timeout_minutes * 60:
                    inactive_connections.append(conn_id)
            
            for conn_id in inactive_connections:
                try:
                    self.active_connections[conn_id]['socket'].close()
                    del self.active_connections[conn_id]
                    self.connection_count -= 1
                except:
                    pass
chatapplication.py
class ConnectionPool:
    def __init__(self, max_connections: int = 100):
        self.max_connections = max_connections
        self.active_connections = {}
        self.connection_count = 0
        self.lock = threading.Lock()
    
    def add_connection(self, client_socket: socket.socket, client_info: Dict) -> bool:
        """Add new connection to pool"""
        with self.lock:
            if self.connection_count >= self.max_connections:
                return False
            
            connection_id = f"conn_{self.connection_count + 1}"
            self.active_connections[connection_id] = {
                'socket': client_socket,
                'info': client_info,
                'connected_at': datetime.datetime.now(),
                'last_activity': datetime.datetime.now()
            }
            
            self.connection_count += 1
            return True
    
    def remove_connection(self, client_socket: socket.socket):
        """Remove connection from pool"""
        with self.lock:
            for conn_id, conn_data in list(self.active_connections.items()):
                if conn_data['socket'] == client_socket:
                    del self.active_connections[conn_id]
                    self.connection_count -= 1
                    break
    
    def cleanup_inactive_connections(self, timeout_minutes: int = 30):
        """Remove inactive connections"""
        with self.lock:
            current_time = datetime.datetime.now()
            inactive_connections = []
            
            for conn_id, conn_data in self.active_connections.items():
                last_activity = conn_data['last_activity']
                if (current_time - last_activity).total_seconds() > timeout_minutes * 60:
                    inactive_connections.append(conn_id)
            
            for conn_id in inactive_connections:
                try:
                    self.active_connections[conn_id]['socket'].close()
                    del self.active_connections[conn_id]
                    self.connection_count -= 1
                except:
                    pass

2. Message Queuing System

chatapplication.py
import queue
import threading
 
class MessageQueue:
    def __init__(self, max_size: int = 1000):
        self.message_queue = queue.Queue(maxsize=max_size)
        self.processing_thread = None
        self.running = False
    
    def start_processing(self):
        """Start message processing thread"""
        self.running = True
        self.processing_thread = threading.Thread(target=self._process_messages, daemon=True)
        self.processing_thread.start()
    
    def stop_processing(self):
        """Stop message processing thread"""
        self.running = False
        if self.processing_thread:
            self.processing_thread.join(timeout=5)
    
    def add_message(self, message: ChatMessage, recipients: List[socket.socket]):
        """Add message to processing queue"""
        try:
            self.message_queue.put({
                'message': message,
                'recipients': recipients,
                'timestamp': datetime.datetime.now()
            }, timeout=1)
        except queue.Full:
            logger.warning("Message queue is full, dropping message")
    
    def _process_messages(self):
        """Process messages from queue"""
        while self.running:
            try:
                queue_item = self.message_queue.get(timeout=1)
                message = queue_item['message']
                recipients = queue_item['recipients']
                
                # Send message to all recipients
                message_data = json.dumps(message.to_dict())
                failed_recipients = []
                
                for recipient_socket in recipients:
                    try:
                        recipient_socket.send(message_data.encode('utf-8'))
                    except:
                        failed_recipients.append(recipient_socket)
                
                # Remove failed recipients
                for failed_socket in failed_recipients:
                    recipients.remove(failed_socket)
                
                self.message_queue.task_done()
                
            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"Error processing message: {e}")
chatapplication.py
import queue
import threading
 
class MessageQueue:
    def __init__(self, max_size: int = 1000):
        self.message_queue = queue.Queue(maxsize=max_size)
        self.processing_thread = None
        self.running = False
    
    def start_processing(self):
        """Start message processing thread"""
        self.running = True
        self.processing_thread = threading.Thread(target=self._process_messages, daemon=True)
        self.processing_thread.start()
    
    def stop_processing(self):
        """Stop message processing thread"""
        self.running = False
        if self.processing_thread:
            self.processing_thread.join(timeout=5)
    
    def add_message(self, message: ChatMessage, recipients: List[socket.socket]):
        """Add message to processing queue"""
        try:
            self.message_queue.put({
                'message': message,
                'recipients': recipients,
                'timestamp': datetime.datetime.now()
            }, timeout=1)
        except queue.Full:
            logger.warning("Message queue is full, dropping message")
    
    def _process_messages(self):
        """Process messages from queue"""
        while self.running:
            try:
                queue_item = self.message_queue.get(timeout=1)
                message = queue_item['message']
                recipients = queue_item['recipients']
                
                # Send message to all recipients
                message_data = json.dumps(message.to_dict())
                failed_recipients = []
                
                for recipient_socket in recipients:
                    try:
                        recipient_socket.send(message_data.encode('utf-8'))
                    except:
                        failed_recipients.append(recipient_socket)
                
                # Remove failed recipients
                for failed_socket in failed_recipients:
                    recipients.remove(failed_socket)
                
                self.message_queue.task_done()
                
            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"Error processing message: {e}")

3. Load Balancing and Clustering

chatapplication.py
class ChatServerCluster:
    def __init__(self, server_id: str, cluster_nodes: List[str]):
        self.server_id = server_id
        self.cluster_nodes = cluster_nodes
        self.local_clients = {}
        self.remote_clients = {}
        self.cluster_connections = {}
    
    def setup_cluster_connections(self):
        """Setup connections to other cluster nodes"""
        for node in self.cluster_nodes:
            if node != self.server_id:
                try:
                    host, port = node.split(':')
                    cluster_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    cluster_socket.connect((host, int(port)))
                    self.cluster_connections[node] = cluster_socket
                except Exception as e:
                    logger.error(f"Failed to connect to cluster node {node}: {e}")
    
    def broadcast_to_cluster(self, message: ChatMessage, room: str):
        """Broadcast message to other cluster nodes"""
        cluster_message = {
            'type': 'cluster_broadcast',
            'message': message.to_dict(),
            'room': room,
            'origin_server': self.server_id
        }
        
        message_data = json.dumps(cluster_message)
        
        for node, connection in self.cluster_connections.items():
            try:
                connection.send(message_data.encode('utf-8'))
            except Exception as e:
                logger.error(f"Failed to send to cluster node {node}: {e}")
    
    def handle_cluster_message(self, data: Dict):
        """Handle message from another cluster node"""
        if data['origin_server'] == self.server_id:
            return  # Ignore messages from self
        
        message_data = data['message']
        room = data['room']
        
        # Reconstruct message
        message = ChatMessage.from_dict(message_data)
        
        # Broadcast to local clients in the room
        if room in self.rooms:
            for client_socket in self.rooms[room]['clients']:
                try:
                    client_socket.send(json.dumps(message.to_dict()).encode('utf-8'))
                except:
                    pass
chatapplication.py
class ChatServerCluster:
    def __init__(self, server_id: str, cluster_nodes: List[str]):
        self.server_id = server_id
        self.cluster_nodes = cluster_nodes
        self.local_clients = {}
        self.remote_clients = {}
        self.cluster_connections = {}
    
    def setup_cluster_connections(self):
        """Setup connections to other cluster nodes"""
        for node in self.cluster_nodes:
            if node != self.server_id:
                try:
                    host, port = node.split(':')
                    cluster_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    cluster_socket.connect((host, int(port)))
                    self.cluster_connections[node] = cluster_socket
                except Exception as e:
                    logger.error(f"Failed to connect to cluster node {node}: {e}")
    
    def broadcast_to_cluster(self, message: ChatMessage, room: str):
        """Broadcast message to other cluster nodes"""
        cluster_message = {
            'type': 'cluster_broadcast',
            'message': message.to_dict(),
            'room': room,
            'origin_server': self.server_id
        }
        
        message_data = json.dumps(cluster_message)
        
        for node, connection in self.cluster_connections.items():
            try:
                connection.send(message_data.encode('utf-8'))
            except Exception as e:
                logger.error(f"Failed to send to cluster node {node}: {e}")
    
    def handle_cluster_message(self, data: Dict):
        """Handle message from another cluster node"""
        if data['origin_server'] == self.server_id:
            return  # Ignore messages from self
        
        message_data = data['message']
        room = data['room']
        
        # Reconstruct message
        message = ChatMessage.from_dict(message_data)
        
        # Broadcast to local clients in the room
        if room in self.rooms:
            for client_socket in self.rooms[room]['clients']:
                try:
                    client_socket.send(json.dumps(message.to_dict()).encode('utf-8'))
                except:
                    pass

Security Enhancements

1. Message Encryption

chatapplication.py
from cryptography.fernet import Fernet
import base64
 
class MessageEncryption:
    def __init__(self):
        self.key = Fernet.generate_key()
        self.cipher = Fernet(self.key)
    
    def encrypt_message(self, message: str) -> str:
        """Encrypt message content"""
        encrypted = self.cipher.encrypt(message.encode())
        return base64.b64encode(encrypted).decode()
    
    def decrypt_message(self, encrypted_message: str) -> str:
        """Decrypt message content"""
        encrypted_bytes = base64.b64decode(encrypted_message.encode())
        decrypted = self.cipher.decrypt(encrypted_bytes)
        return decrypted.decode()
    
    def get_key_for_client(self) -> str:
        """Get encryption key for client"""
        return base64.b64encode(self.key).decode()
 
def send_encrypted_message(self, content: str):
    """Send encrypted message"""
    if hasattr(self, 'encryption') and self.encryption:
        encrypted_content = self.encryption.encrypt_message(content)
        
        message_data = {
            'type': 'encrypted_message',
            'content': encrypted_content,
            'encrypted': True
        }
    else:
        message_data = {
            'type': 'message',
            'content': content,
            'encrypted': False
        }
    
    if self.client:
        self.client.send_command(json.dumps(message_data))
chatapplication.py
from cryptography.fernet import Fernet
import base64
 
class MessageEncryption:
    def __init__(self):
        self.key = Fernet.generate_key()
        self.cipher = Fernet(self.key)
    
    def encrypt_message(self, message: str) -> str:
        """Encrypt message content"""
        encrypted = self.cipher.encrypt(message.encode())
        return base64.b64encode(encrypted).decode()
    
    def decrypt_message(self, encrypted_message: str) -> str:
        """Decrypt message content"""
        encrypted_bytes = base64.b64decode(encrypted_message.encode())
        decrypted = self.cipher.decrypt(encrypted_bytes)
        return decrypted.decode()
    
    def get_key_for_client(self) -> str:
        """Get encryption key for client"""
        return base64.b64encode(self.key).decode()
 
def send_encrypted_message(self, content: str):
    """Send encrypted message"""
    if hasattr(self, 'encryption') and self.encryption:
        encrypted_content = self.encryption.encrypt_message(content)
        
        message_data = {
            'type': 'encrypted_message',
            'content': encrypted_content,
            'encrypted': True
        }
    else:
        message_data = {
            'type': 'message',
            'content': content,
            'encrypted': False
        }
    
    if self.client:
        self.client.send_command(json.dumps(message_data))

2. Rate Limiting

chatapplication.py
class RateLimiter:
    def __init__(self, max_messages: int = 10, time_window: int = 60):
        self.max_messages = max_messages
        self.time_window = time_window
        self.client_message_counts = {}
        self.lock = threading.Lock()
    
    def check_rate_limit(self, client_id: str) -> bool:
        """Check if client is within rate limits"""
        with self.lock:
            current_time = time.time()
            
            if client_id not in self.client_message_counts:
                self.client_message_counts[client_id] = []
            
            # Remove old timestamps
            client_messages = self.client_message_counts[client_id]
            client_messages[:] = [
                timestamp for timestamp in client_messages 
                if current_time - timestamp < self.time_window
            ]
            
            # Check if under limit
            if len(client_messages) < self.max_messages:
                client_messages.append(current_time)
                return True
            
            return False
    
    def cleanup_old_entries(self):
        """Cleanup old rate limit entries"""
        with self.lock:
            current_time = time.time()
            
            for client_id in list(self.client_message_counts.keys()):
                messages = self.client_message_counts[client_id]
                messages[:] = [
                    timestamp for timestamp in messages 
                    if current_time - timestamp < self.time_window
                ]
                
                if not messages:
                    del self.client_message_counts[client_id]
chatapplication.py
class RateLimiter:
    def __init__(self, max_messages: int = 10, time_window: int = 60):
        self.max_messages = max_messages
        self.time_window = time_window
        self.client_message_counts = {}
        self.lock = threading.Lock()
    
    def check_rate_limit(self, client_id: str) -> bool:
        """Check if client is within rate limits"""
        with self.lock:
            current_time = time.time()
            
            if client_id not in self.client_message_counts:
                self.client_message_counts[client_id] = []
            
            # Remove old timestamps
            client_messages = self.client_message_counts[client_id]
            client_messages[:] = [
                timestamp for timestamp in client_messages 
                if current_time - timestamp < self.time_window
            ]
            
            # Check if under limit
            if len(client_messages) < self.max_messages:
                client_messages.append(current_time)
                return True
            
            return False
    
    def cleanup_old_entries(self):
        """Cleanup old rate limit entries"""
        with self.lock:
            current_time = time.time()
            
            for client_id in list(self.client_message_counts.keys()):
                messages = self.client_message_counts[client_id]
                messages[:] = [
                    timestamp for timestamp in messages 
                    if current_time - timestamp < self.time_window
                ]
                
                if not messages:
                    del self.client_message_counts[client_id]

Usage Examples and Testing

1. Server Setup Example

chatapplication.py
def setup_production_server():
    """Setup chat server for production use"""
    
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('chat_server.log'),
            logging.StreamHandler()
        ]
    )
    
    # Create server with enhanced features
    server = ChatServer(host='0.0.0.0', port=8080)
    
    # Setup security features
    server.rate_limiter = RateLimiter(max_messages=30, time_window=60)
    server.encryption = MessageEncryption()
    
    # Setup database
    server.init_database()
    
    # Create default rooms
    server.rooms.update({
        'general': {'clients': [], 'topic': 'General discussion'},
        'help': {'clients': [], 'topic': 'Get help and support'},
        'random': {'clients': [], 'topic': 'Random conversations'}
    })
    
    # Start server
    print("Starting production chat server...")
    server.start()
 
if __name__ == "__main__":
    setup_production_server()
chatapplication.py
def setup_production_server():
    """Setup chat server for production use"""
    
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('chat_server.log'),
            logging.StreamHandler()
        ]
    )
    
    # Create server with enhanced features
    server = ChatServer(host='0.0.0.0', port=8080)
    
    # Setup security features
    server.rate_limiter = RateLimiter(max_messages=30, time_window=60)
    server.encryption = MessageEncryption()
    
    # Setup database
    server.init_database()
    
    # Create default rooms
    server.rooms.update({
        'general': {'clients': [], 'topic': 'General discussion'},
        'help': {'clients': [], 'topic': 'Get help and support'},
        'random': {'clients': [], 'topic': 'Random conversations'}
    })
    
    # Start server
    print("Starting production chat server...")
    server.start()
 
if __name__ == "__main__":
    setup_production_server()

2. Client Testing Example

chatapplication.py
def test_chat_functionality():
    """Test chat application functionality"""
    import unittest
    
    class ChatTestCase(unittest.TestCase):
        def setUp(self):
            self.server = ChatServer(host='localhost', port=12346)
            self.server_thread = threading.Thread(target=self.server.start, daemon=True)
            self.server_thread.start()
            time.sleep(1)  # Wait for server to start
        
        def test_client_connection(self):
            """Test client can connect to server"""
            client = ChatClient("test_user", port=12346)
            self.assertTrue(client.connect())
            client.disconnect()
        
        def test_message_sending(self):
            """Test message sending between clients"""
            client1 = ChatClient("user1", port=12346)
            client2 = ChatClient("user2", port=12346)
            
            self.assertTrue(client1.connect())
            self.assertTrue(client2.connect())
            
            # Send message from client1
            client1.send_message("Hello from user1!")
            
            # Check if client2 receives message
            time.sleep(0.5)
            # Add assertion logic here
            
            client1.disconnect()
            client2.disconnect()
        
        def tearDown(self):
            self.server.stop()
    
    # Run tests
    unittest.main()
chatapplication.py
def test_chat_functionality():
    """Test chat application functionality"""
    import unittest
    
    class ChatTestCase(unittest.TestCase):
        def setUp(self):
            self.server = ChatServer(host='localhost', port=12346)
            self.server_thread = threading.Thread(target=self.server.start, daemon=True)
            self.server_thread.start()
            time.sleep(1)  # Wait for server to start
        
        def test_client_connection(self):
            """Test client can connect to server"""
            client = ChatClient("test_user", port=12346)
            self.assertTrue(client.connect())
            client.disconnect()
        
        def test_message_sending(self):
            """Test message sending between clients"""
            client1 = ChatClient("user1", port=12346)
            client2 = ChatClient("user2", port=12346)
            
            self.assertTrue(client1.connect())
            self.assertTrue(client2.connect())
            
            # Send message from client1
            client1.send_message("Hello from user1!")
            
            # Check if client2 receives message
            time.sleep(0.5)
            # Add assertion logic here
            
            client1.disconnect()
            client2.disconnect()
        
        def tearDown(self):
            self.server.stop()
    
    # Run tests
    unittest.main()

Deployment and Production Considerations

1. Docker Configuration

# Dockerfile
FROM python:3.9-slim
 
WORKDIR /app
 
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
COPY chatapplication.py .
COPY config.json .
 
EXPOSE 8080
 
CMD ["python", "chatapplication.py", "server"]
# Dockerfile
FROM python:3.9-slim
 
WORKDIR /app
 
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
COPY chatapplication.py .
COPY config.json .
 
EXPOSE 8080
 
CMD ["python", "chatapplication.py", "server"]

2. Configuration Management

chatapplication.py
import json
 
class ConfigManager:
    def __init__(self, config_file: str = "config.json"):
        self.config_file = config_file
        self.config = self.load_config()
    
    def load_config(self) -> Dict:
        """Load configuration from file"""
        try:
            with open(self.config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return self.get_default_config()
    
    def get_default_config(self) -> Dict:
        """Get default configuration"""
        return {
            "server": {
                "host": "0.0.0.0",
                "port": 8080,
                "max_connections": 100
            },
            "security": {
                "rate_limit_messages": 30,
                "rate_limit_window": 60,
                "enable_encryption": True,
                "max_file_size": 10485760
            },
            "features": {
                "enable_file_sharing": True,
                "enable_private_messages": True,
                "enable_rooms": True,
                "max_message_history": 1000
            },
            "database": {
                "path": "chat_history.db",
                "backup_interval": 3600
            }
        }
    
    def save_config(self):
        """Save current configuration to file"""
        with open(self.config_file, 'w') as f:
            json.dump(self.config, f, indent=2)
chatapplication.py
import json
 
class ConfigManager:
    def __init__(self, config_file: str = "config.json"):
        self.config_file = config_file
        self.config = self.load_config()
    
    def load_config(self) -> Dict:
        """Load configuration from file"""
        try:
            with open(self.config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return self.get_default_config()
    
    def get_default_config(self) -> Dict:
        """Get default configuration"""
        return {
            "server": {
                "host": "0.0.0.0",
                "port": 8080,
                "max_connections": 100
            },
            "security": {
                "rate_limit_messages": 30,
                "rate_limit_window": 60,
                "enable_encryption": True,
                "max_file_size": 10485760
            },
            "features": {
                "enable_file_sharing": True,
                "enable_private_messages": True,
                "enable_rooms": True,
                "max_message_history": 1000
            },
            "database": {
                "path": "chat_history.db",
                "backup_interval": 3600
            }
        }
    
    def save_config(self):
        """Save current configuration to file"""
        with open(self.config_file, 'w') as f:
            json.dump(self.config, f, indent=2)

This comprehensive chat application demonstrates advanced socket programming concepts, real-time communication, and modern chat features. The modular design allows for easy customization and extension based on specific requirements. 💬🚀

Was this page helpful?

Let us know how we did