Chat Application with Socket Programming
Abstract
Create a comprehensive real-time chat application using socket programming that supports multiple clients, private messaging, chat rooms, and advanced features. This project demonstrates network programming, concurrent connections, message broadcasting, and building distributed communication systems.
Prerequisites
- Python 3.7 or above
- Text Editor or IDE
- Solid understanding of Python syntax and networking concepts
- Knowledge of socket programming and TCP/UDP protocols
- Familiarity with threading and concurrent programming
- Understanding of client-server architecture
- Basic knowledge of GUI development with Tkinter
Getting Started
Create a new project
- Create a new project folder and name it `chatApplication`.
- Create a new file inside it and name it `chatapplication.py`.
- No dependencies need to be installed with pip: `socket`, `threading` and `tkinter` are part of the Python standard library. (On some Linux distributions Tkinter is a separate system package, for example `python3-tk`.)
- Open the project folder in your favorite text editor or IDE.
- Copy the code below and paste it into your `chatapplication.py` file.
Write the code
- Add the following code to your `chatapplication.py` file.
⚙️ Chat Application with Socket Programming
# Chat Application with Socket Programming
import socket
import threading
import json
import time
import hashlib
import datetime
from typing import Dict, List, Optional, Tuple
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, simpledialog
import queue
import logging
# Configure logging: INFO level, each record formatted as "time - level - message"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger shared by the classes in this file
logger = logging.getLogger(__name__)
class ChatMessage:
    """A single chat event exchanged between the server and its clients.

    Attributes:
        username: Name of the user the message is attributed to.
        content: Message text.
        message_type: One of "message", "join", "leave" or "system".
        timestamp: Creation time; the current local time when none is given.
        id: Eight hex digits derived from the user, text and timestamp.
    """

    def __init__(self, username: str, content: str, message_type: str = "message",
                 timestamp: Optional[datetime.datetime] = None):
        """Build a message, stamping it with the current time if needed.

        Args:
            username: Author of the message.
            content: Message text.
            message_type: Kind of event (message, join, leave, system).
            timestamp: Explicit creation time, or None for "now".
        """
        self.username = username
        self.content = content
        self.message_type = message_type  # message, join, leave, system
        self.timestamp = timestamp if timestamp is not None else datetime.datetime.now()
        # MD5 serves only as a short fingerprint here, not as a security measure.
        fingerprint = f"{username}{content}{self.timestamp}".encode()
        self.id = hashlib.md5(fingerprint).hexdigest()[:8]

    def __repr__(self) -> str:
        return (f"{type(self).__name__}(username={self.username!r}, "
                f"content={self.content!r}, message_type={self.message_type!r})")

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dictionary describing this message."""
        return {
            'id': self.id,
            'username': self.username,
            'content': self.content,
            'type': self.message_type,
            'timestamp': self.timestamp.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'ChatMessage':
        """Rebuild a message from the dictionary produced by to_dict().

        Args:
            data: Mapping with 'username', 'content' and an ISO-format
                'timestamp'. 'type' defaults to "message" and 'id' to the
                freshly computed fingerprint when they are absent.

        Raises:
            KeyError: If 'username', 'content' or 'timestamp' is missing.
            ValueError: If 'timestamp' is not a valid ISO-format string.
        """
        timestamp = datetime.datetime.fromisoformat(data['timestamp'])
        msg = cls(data['username'], data['content'], data.get('type', 'message'), timestamp)
        # Keep the sender's id when supplied so both ends agree on it.
        msg.id = data.get('id', msg.id)
        return msg
class ChatServer:
    """Multi-client TCP chat server with rooms, commands and private messages.

    Every accepted connection is served by its own daemon thread. Peers
    exchange UTF-8 encoded JSON objects written back-to-back on the stream,
    so received text is buffered and split into complete objects before it
    is processed.
    """

    # Most buffered text (characters) kept while waiting for the rest of an
    # incomplete JSON object; a larger backlog is discarded.
    MAX_PENDING_CHARS = 65536

    def __init__(self, host: str = 'localhost', port: int = 12345):
        """Create the (still unbound) listening socket and empty state.

        Args:
            host: Interface to bind to when start() is called.
            port: TCP port to bind to when start() is called.
        """
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.clients: Dict[socket.socket, dict] = {}
        self.message_history: List["ChatMessage"] = []
        self.running = False
        self.max_history = 100
        # Room management
        self.rooms: Dict[str, List[socket.socket]] = {'general': []}
        self.default_room = 'general'
        # Serialises username allocation across the per-client threads.
        self._register_lock = threading.Lock()

    def start(self):
        """Bind, listen and accept clients until the server is stopped.

        Blocks the calling thread. Each connection is handed to
        handle_client() on a new daemon thread. stop() always runs on the
        way out, including when binding fails.
        """
        try:
            self.socket.bind((self.host, self.port))
            self.socket.listen(5)
            self.running = True
            logger.info(f"Chat server started on {self.host}:{self.port}")
            while self.running:
                try:
                    client_socket, address = self.socket.accept()
                except OSError:
                    # accept() fails once stop() has closed the listening socket.
                    if self.running:
                        logger.error("Error accepting connections")
                    break
                logger.info(f"New connection from {address}")
                # Start client handler thread
                threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, address),
                    daemon=True,
                ).start()
        except Exception as e:
            logger.error(f"Server error: {e}")
        finally:
            self.stop()

    @staticmethod
    def _send_json(client_socket: socket.socket, payload: dict) -> None:
        """Serialise payload as JSON and write all of it to client_socket."""
        # sendall() keeps writing until every byte is out; send() may stop early.
        client_socket.sendall(json.dumps(payload).encode('utf-8'))

    @staticmethod
    def _split_json(buffer: str) -> Tuple[list, str]:
        """Split buffered text into complete JSON objects and a remainder.

        Text in front of an opening brace is dropped because peers only send
        JSON objects. The remainder is the start of an object that is not
        complete (or not parseable) yet.

        Returns:
            (objects, remainder)
        """
        decoder = json.JSONDecoder()
        objects = []
        position = 0
        while True:
            start = buffer.find('{', position)
            if start == -1:
                return objects, ''
            try:
                value, position = decoder.raw_decode(buffer, start)
            except json.JSONDecodeError:
                return objects, buffer[start:]
            objects.append(value)

    def _iter_messages(self, client_socket: socket.socket):
        """Yield every JSON object received from client_socket until EOF."""
        pending = ''
        while self.running:
            chunk = client_socket.recv(4096)
            if not chunk:
                return
            pending += chunk.decode('utf-8')
            objects, pending = self._split_json(pending)
            yield from objects
            if len(pending) > self.MAX_PENDING_CHARS:
                # Never-terminated input must not grow the buffer without bound.
                logger.warning("Discarding oversized or invalid JSON input")
                pending = ''

    def _register_client(self, client_socket: socket.socket,
                         address: Tuple[str, int], requested) -> str:
        """Record the client under a unique username in the default room."""
        if isinstance(requested, str) and requested.strip():
            username = requested
        else:
            username = f'User_{address[1]}'
        with self._register_lock:
            taken = {info['username'] for info in list(self.clients.values())}
            # Check if username is already taken
            while username in taken:
                username = f"{username}_{len(taken)}"
            self.clients[client_socket] = {
                'username': username,
                'address': address,
                'room': self.default_room,
                'joined_at': datetime.datetime.now(),
            }
            self.rooms.setdefault(self.default_room, []).append(client_socket)
        return username

    def handle_client(self, client_socket: socket.socket, address: Tuple[str, int]):
        """Serve one client: handshake, registration, then the message loop.

        Args:
            client_socket: Connected socket returned by accept().
            address: Remote (host, port); used for logging and for the
                fallback username.
        """
        username = None
        try:
            # Initial handshake - ask for a username
            self._send_json(client_socket, {
                'type': 'handshake',
                'message': 'Please provide username',
            })
            incoming = self._iter_messages(client_socket)
            handshake_data = next(incoming, None)
            if handshake_data is None:
                return  # peer closed the connection before answering
            username = self._register_client(
                client_socket, address, handshake_data.get('username'))

            # Send welcome message and recent history
            self._send_json(client_socket, {
                'type': 'welcome',
                'username': username,
                'room': self.default_room,
                'history': [msg.to_dict() for msg in self.message_history[-20:]],
            })

            # Broadcast join message
            join_message = ChatMessage(username, f"{username} joined the chat", "join")
            self.broadcast_message(join_message, exclude=client_socket)
            self.add_to_history(join_message)

            # Handle client messages
            try:
                for message_data in incoming:
                    self.process_message(client_socket, message_data)
            except Exception as e:
                logger.error(f"Error handling message from {username}: {e}")
        except Exception as e:
            logger.error(f"Error in client handler for {address}: {e}")
        finally:
            self.disconnect_client(client_socket, username)

    def process_message(self, client_socket: socket.socket, message_data: dict):
        """Dispatch one decoded client payload by its 'type' field.

        Payloads from unregistered sockets, payloads that are not
        dictionaries and payloads whose 'content' is not text are ignored.
        """
        client_info = self.clients.get(client_socket)
        if not client_info or not isinstance(message_data, dict):
            return
        username = client_info['username']
        message_type = message_data.get('type', 'message')
        content = message_data.get('content', '')
        if not isinstance(content, str):
            logger.warning(f"Ignoring non-text content from {username}")
            return

        if message_type == 'message':
            # Regular chat message
            if content.strip():
                chat_message = ChatMessage(username, content, "message")
                self.broadcast_message(chat_message, room=client_info['room'])
                self.add_to_history(chat_message)
        elif message_type == 'command':
            # Handle chat commands
            self.handle_command(client_socket, content)
        elif message_type == 'private':
            # Private message
            target_username = message_data.get('target')
            self.send_private_message(client_socket, target_username, content)

    def handle_command(self, client_socket: socket.socket, command: str):
        """Execute a slash command and answer the issuing client.

        Args:
            client_socket: Socket of the client that sent the command.
            command: Full command line, e.g. "/join lobby".
        """
        parts = command.strip().split()
        if not parts:
            return
        cmd = parts[0].lower()

        if cmd == '/help':
            help_text = (
                "\n"
                "Available commands:\n"
                "/help - Show this help\n"
                "/users - List online users\n"
                "/rooms - List available rooms\n"
                "/join <room> - Join a room\n"
                "/create <room> - Create a new room\n"
                "/pm <username> <message> - Send private message\n"
                "/time - Show current server time\n"
            )
            self.send_system_message(client_socket, help_text)
        elif cmd == '/users':
            users = [info['username'] for info in list(self.clients.values())]
            user_list = f"Online users ({len(users)}): {', '.join(users)}"
            self.send_system_message(client_socket, user_list)
        elif cmd == '/rooms':
            room_info = [f"{room} ({len(members)} users)"
                         for room, members in list(self.rooms.items())]
            room_list = f"Available rooms: {', '.join(room_info)}"
            self.send_system_message(client_socket, room_list)
        elif cmd == '/join' and len(parts) > 1:
            self.move_client_to_room(client_socket, parts[1])
        elif cmd == '/create' and len(parts) > 1:
            new_room = parts[1]
            if new_room not in self.rooms:
                self.rooms[new_room] = []
                self.send_system_message(client_socket, f"Room '{new_room}' created")
                self.move_client_to_room(client_socket, new_room)
            else:
                self.send_system_message(client_socket, f"Room '{new_room}' already exists")
        elif cmd == '/pm' and len(parts) > 2:
            self.send_private_message(client_socket, parts[1], ' '.join(parts[2:]))
        elif cmd == '/time':
            current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self.send_system_message(client_socket, f"Server time: {current_time}")
        else:
            self.send_system_message(client_socket, "Unknown command. Type /help for available commands.")

    def move_client_to_room(self, client_socket: socket.socket, room_name: str):
        """Move a client to room_name, creating the room if necessary.

        Both rooms are told about the move; a request for the room the
        client is already in only produces a notice to that client.
        """
        client_info = self.clients[client_socket]
        username = client_info['username']
        old_room = client_info['room']
        if room_name == old_room:
            # Avoid announcing a leave and a join to the same audience.
            self.send_system_message(client_socket, f"Already in room '{room_name}'")
            return

        # Remove from old room
        old_members = self.rooms.get(old_room, [])
        if client_socket in old_members:
            old_members.remove(client_socket)
        # Add to new room
        self.rooms.setdefault(room_name, []).append(client_socket)
        client_info['room'] = room_name

        # Notify client
        self.send_system_message(client_socket, f"Moved to room '{room_name}'")
        # Notify rooms
        leave_msg = ChatMessage(username, f"{username} left the room", "leave")
        self.broadcast_message(leave_msg, room=old_room, exclude=client_socket)
        join_msg = ChatMessage(username, f"{username} joined the room", "join")
        self.broadcast_message(join_msg, room=room_name, exclude=client_socket)

    def send_private_message(self, sender_socket: socket.socket, target_username: str, content: str):
        """Deliver content to one named user and report the result to the sender.

        A target whose socket can no longer be written to is disconnected;
        the sender is informed instead of being dropped.
        """
        sender_username = self.clients[sender_socket]['username']
        # Find target client
        target_socket = None
        for candidate, info in list(self.clients.items()):
            if info['username'] == target_username:
                target_socket = candidate
                break

        if target_socket is None:
            self.send_system_message(sender_socket, f"User '{target_username}' not found")
            return

        pm_data = {
            'type': 'private',
            'from': sender_username,
            'content': content,
            'timestamp': datetime.datetime.now().isoformat(),
        }
        try:
            self._send_json(target_socket, pm_data)
        except OSError:
            # The failure belongs to the target's connection, not the sender's.
            self.disconnect_client(target_socket)
            self.send_system_message(sender_socket, f"User '{target_username}' is no longer connected")
            return
        # Confirm to sender
        self.send_system_message(sender_socket, f"Private message sent to {target_username}")

    def send_system_message(self, client_socket: socket.socket, content: str):
        """Send a 'system' notice to one client; delivery is best effort."""
        system_data = {
            'type': 'system',
            'content': content,
            'timestamp': datetime.datetime.now().isoformat(),
        }
        try:
            self._send_json(client_socket, system_data)
        except OSError:
            # The client's handler thread notices the broken socket and cleans up.
            pass

    def broadcast_message(self, message: "ChatMessage", room: str = None, exclude: socket.socket = None):
        """Send message to every client in room, or to everyone if room is falsy.

        Args:
            message: Message to deliver.
            room: Name of the target room, or None for all clients.
            exclude: Socket that must not receive the message.
        """
        message_data = json.dumps(message.to_dict()).encode('utf-8')
        if room:
            # Broadcast to specific room
            recipients = list(self.rooms.get(room, []))
        else:
            # Broadcast to all clients
            recipients = list(self.clients)
        # Iterating a snapshot lets disconnect_client() edit the live lists.
        for client_socket in recipients:
            if client_socket == exclude:
                continue
            try:
                client_socket.sendall(message_data)
            except OSError:
                # Client disconnected, remove from lists
                self.disconnect_client(client_socket)

    def add_to_history(self, message: "ChatMessage"):
        """Append message to the history, keeping at most max_history entries."""
        self.message_history.append(message)
        if len(self.message_history) > self.max_history:
            self.message_history = self.message_history[-self.max_history:]

    def disconnect_client(self, client_socket: socket.socket, username: str = None):
        """Forget a client, announce its departure and close its socket.

        Safe to call more than once for the same socket; only the first call
        finds the client registered and broadcasts the leave message.
        """
        # pop() makes "look up and remove" a single step across threads.
        client_info = self.clients.pop(client_socket, None)
        if client_info is not None:
            username = username or client_info['username']
            room = client_info['room']
            # Remove from room
            members = self.rooms.get(room, [])
            if client_socket in members:
                members.remove(client_socket)
            # Broadcast leave message
            leave_message = ChatMessage(username, f"{username} left the chat", "leave")
            self.broadcast_message(leave_message, room=room, exclude=client_socket)
            self.add_to_history(leave_message)
            logger.info(f"Client {username} disconnected")
        try:
            client_socket.close()
        except OSError:
            pass

    def stop(self):
        """Stop accepting connections and close every socket."""
        self.running = False
        # Close all client connections
        for client_socket in list(self.clients):
            try:
                # shutdown() wakes a handler thread blocked in recv().
                client_socket.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            try:
                client_socket.close()
            except OSError:
                pass
        # Close server socket
        try:
            self.socket.close()
        except OSError:
            pass
        logger.info("Chat server stopped")
class ChatClient:
    """Socket client that talks to the chat server and queues what it receives.

    The server writes JSON objects back-to-back on the stream, so received
    text is buffered and split into complete objects. Each decoded message
    (a dictionary) is placed on message_queue for the UI to consume.
    """

    # Most buffered text (characters) kept while waiting for the rest of an
    # incomplete JSON object; a larger backlog is discarded.
    MAX_PENDING_CHARS = 1 << 20

    def __init__(self, username: str, host: str = 'localhost', port: int = 12345):
        """Store connection settings; no network activity happens here.

        Args:
            username: Name requested from the server (the server may alter it).
            host: Server host name or address.
            port: Server TCP port.
        """
        self.username = username
        self.host = host
        self.port = port
        self.socket = None
        self.connected = False
        self.message_queue = queue.Queue()
        self._buffer = ''                 # received text not yet parsed
        self._pending: List[dict] = []    # parsed messages not yet handed out

    @staticmethod
    def _split_json(buffer: str) -> Tuple[list, str]:
        """Split buffered text into complete JSON objects and a remainder.

        Text in front of an opening brace is dropped because the server only
        sends JSON objects. The remainder is the start of an object that is
        not complete (or not parseable) yet.
        """
        decoder = json.JSONDecoder()
        objects = []
        position = 0
        while True:
            start = buffer.find('{', position)
            if start == -1:
                return objects, ''
            try:
                value, position = decoder.raw_decode(buffer, start)
            except json.JSONDecodeError:
                return objects, buffer[start:]
            objects.append(value)

    def _next_message(self) -> Optional[dict]:
        """Block until one complete message is available; None means EOF."""
        while not self._pending:
            chunk = self.socket.recv(4096)
            if not chunk:
                return None
            self._buffer += chunk.decode('utf-8')
            objects, self._buffer = self._split_json(self._buffer)
            self._pending.extend(objects)
            if len(self._buffer) > self.MAX_PENDING_CHARS:
                logger.warning("Discarding oversized or invalid JSON input")
                self._buffer = ''
        return self._pending.pop(0)

    def _send_json(self, payload: dict) -> None:
        """Serialise payload as JSON and write all of it to the server."""
        # sendall() keeps writing until every byte is out; send() may stop early.
        self.socket.sendall(json.dumps(payload).encode('utf-8'))

    def _send_typed(self, message_type: str, content: str, what: str) -> bool:
        """Send a {'type', 'content'} payload; False when offline or on error."""
        if not self.connected:
            return False
        try:
            self._send_json({'type': message_type, 'content': content})
            return True
        except Exception as e:
            logger.error(f"Error sending {what}: {e}")
            return False

    def _close_socket(self) -> None:
        """Shut down and close the socket, ignoring errors from a dead peer."""
        if self.socket:
            try:
                # shutdown() wakes the listener thread if it is blocked in recv().
                self.socket.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            try:
                self.socket.close()
            except OSError:
                pass

    def connect(self) -> bool:
        """Connect, perform the username handshake and start the listener.

        The server's welcome payload (including recent history) is placed on
        message_queue so the UI can display it.

        Returns:
            True when the server accepted the client, False otherwise. On
            failure the socket is closed again.
        """
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((self.host, self.port))
            self._buffer = ''
            self._pending = []
            # Wait for handshake
            handshake = self._next_message()
            if handshake is not None and handshake.get('type') == 'handshake':
                # Send username
                self._send_json({'username': self.username})
                # Wait for welcome
                welcome = self._next_message()
                if welcome is not None and welcome.get('type') == 'welcome':
                    self.connected = True
                    self.username = welcome.get('username', self.username)  # May have been modified
                    self.message_queue.put(welcome)
                    # Start message listener thread
                    listener_thread = threading.Thread(target=self.listen_for_messages, daemon=True)
                    listener_thread.start()
                    return True
        except Exception as e:
            logger.error(f"Connection error: {e}")
        self._close_socket()
        return False

    def listen_for_messages(self):
        """Queue incoming messages until the connection ends.

        Runs on the listener thread; sets connected to False on exit.
        """
        while self.connected:
            try:
                message_data = self._next_message()
            except Exception as e:
                # Errors after a deliberate disconnect() are expected noise.
                if self.connected:
                    logger.error(f"Error receiving message: {e}")
                break
            if message_data is None:
                break
            self.message_queue.put(message_data)
        self.connected = False

    def send_message(self, content: str):
        """Send a chat message; returns True on success, False otherwise."""
        return self._send_typed('message', content, 'message')

    def send_command(self, command: str):
        """Send a slash command; returns True on success, False otherwise."""
        return self._send_typed('command', command, 'command')

    def disconnect(self):
        """Disconnect from server"""
        self.connected = False
        self._close_socket()
class ChatGUI:
    """Tkinter front end for the chat client.

    The window has a connection bar, a read-only message log and an input
    line. A worker thread moves messages from the client's queue to the
    Tk event loop, where display_message() renders them.
    """

    def __init__(self):
        """Create the main window and build all widgets."""
        self.client = None
        self.root = tk.Tk()
        self.root.title("Chat Application")
        self.root.geometry("800x600")
        self.setup_ui()
        self.update_messages_thread = None

    def setup_ui(self):
        """Setup the user interface"""
        full = (tk.W, tk.E, tk.N, tk.S)
        wide = (tk.W, tk.E)

        # Main frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=full)
        # Configure grid weights
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(1, weight=1)

        # Connection frame
        conn_frame = ttk.LabelFrame(main_frame, text="Connection", padding="5")
        conn_frame.grid(row=0, column=0, sticky=wide, pady=(0, 10))
        conn_frame.columnconfigure(1, weight=1)

        # Connection controls
        ttk.Label(conn_frame, text="Username:").grid(row=0, column=0, sticky=tk.W)
        self.username_var = tk.StringVar(value="User")
        ttk.Entry(conn_frame, textvariable=self.username_var, width=15).grid(row=0, column=1, padx=(5, 10), sticky=tk.W)
        ttk.Label(conn_frame, text="Host:").grid(row=0, column=2, sticky=tk.W)
        self.host_var = tk.StringVar(value="localhost")
        ttk.Entry(conn_frame, textvariable=self.host_var, width=15).grid(row=0, column=3, padx=(5, 10), sticky=tk.W)
        ttk.Label(conn_frame, text="Port:").grid(row=0, column=4, sticky=tk.W)
        self.port_var = tk.StringVar(value="12345")
        ttk.Entry(conn_frame, textvariable=self.port_var, width=8).grid(row=0, column=5, padx=(5, 10), sticky=tk.W)
        self.connect_button = ttk.Button(conn_frame, text="Connect", command=self.connect_to_server)
        self.connect_button.grid(row=0, column=6, padx=(5, 0))
        self.disconnect_button = ttk.Button(conn_frame, text="Disconnect", command=self.disconnect_from_server, state=tk.DISABLED)
        self.disconnect_button.grid(row=0, column=7, padx=(5, 0))

        # Status
        self.status_var = tk.StringVar(value="Not connected")
        status_label = ttk.Label(conn_frame, textvariable=self.status_var, foreground="red")
        status_label.grid(row=1, column=0, columnspan=8, sticky=tk.W, pady=(5, 0))

        # Chat frame
        chat_frame = ttk.LabelFrame(main_frame, text="Chat", padding="5")
        chat_frame.grid(row=1, column=0, sticky=full)
        chat_frame.columnconfigure(0, weight=1)
        chat_frame.rowconfigure(0, weight=1)

        # Messages display
        self.messages_display = scrolledtext.ScrolledText(chat_frame, state=tk.DISABLED, height=20)
        self.messages_display.grid(row=0, column=0, sticky=full, pady=(0, 10))
        # Configure tags for styling once, instead of on every message
        self.messages_display.tag_config('system', foreground='blue')
        self.messages_display.tag_config('private', foreground='purple')
        self.messages_display.tag_config('join', foreground='green')
        self.messages_display.tag_config('leave', foreground='red')

        # Message input frame
        input_frame = ttk.Frame(chat_frame)
        input_frame.grid(row=1, column=0, sticky=wide)
        input_frame.columnconfigure(0, weight=1)

        # Message input (disabled until connected, like the Send button)
        self.message_var = tk.StringVar()
        self.message_entry = ttk.Entry(input_frame, textvariable=self.message_var, state=tk.DISABLED)
        self.message_entry.grid(row=0, column=0, sticky=wide, padx=(0, 10))
        self.message_entry.bind('<Return>', self.send_message_event)
        # Send button
        self.send_button = ttk.Button(input_frame, text="Send", command=self.send_message, state=tk.DISABLED)
        self.send_button.grid(row=0, column=1)

        # Help text
        help_frame = ttk.Frame(main_frame)
        help_frame.grid(row=2, column=0, sticky=wide, pady=(10, 0))
        help_text = "Commands: /help, /users, /rooms, /join <room>, /create <room>, /pm <user> <msg>, /time"
        ttk.Label(help_frame, text=help_text, font=("TkDefaultFont", 8)).pack()

    def connect_to_server(self):
        """Validate the connection fields, connect and enable the chat controls."""
        username = self.username_var.get().strip()
        host = self.host_var.get().strip()
        try:
            port = int(self.port_var.get())
        except ValueError:
            messagebox.showerror("Error", "Invalid port number")
            return
        if not username:
            messagebox.showerror("Error", "Please enter a username")
            return

        # Create client and connect
        self.client = ChatClient(username, host, port)
        if not self.client.connect():
            messagebox.showerror("Error", "Failed to connect to server")
            self.client = None
            return

        self.status_var.set(f"Connected as {self.client.username}")
        # Update UI state
        self.connect_button.config(state=tk.DISABLED)
        self.disconnect_button.config(state=tk.NORMAL)
        self.send_button.config(state=tk.NORMAL)
        self.message_entry.config(state=tk.NORMAL)
        # Start message update thread
        self.update_messages_thread = threading.Thread(target=self.update_messages, daemon=True)
        self.update_messages_thread.start()
        # Focus message entry
        self.message_entry.focus()

    def disconnect_from_server(self):
        """Disconnect from chat server and return the controls to the idle state."""
        if self.client:
            self.client.disconnect()
            self.client = None
        # Update UI state
        self.status_var.set("Not connected")
        self.connect_button.config(state=tk.NORMAL)
        self.disconnect_button.config(state=tk.DISABLED)
        self.send_button.config(state=tk.DISABLED)
        self.message_entry.config(state=tk.DISABLED)

    def send_message_event(self, event):
        """Handle Enter key press"""
        self.send_message()

    def send_message(self):
        """Send the input line as a command (leading '/') or a chat message."""
        if not self.client or not self.client.connected:
            return
        content = self.message_var.get().strip()
        if not content:
            return
        if content.startswith('/'):
            # Send as command
            self.client.send_command(content)
        else:
            # Send as message
            self.client.send_message(content)
        # Clear input
        self.message_var.set("")

    def update_messages(self):
        """Forward queued messages to the Tk thread while the client is connected.

        Runs on the worker thread started by connect_to_server(). When the
        connection ends without the user asking for it, the UI is reset.
        """
        # Bind once: disconnect_from_server() sets self.client to None on the Tk thread.
        client = self.client
        while client is not None and client.connected:
            try:
                # Get message from queue (blocking with timeout)
                message_data = client.message_queue.get(timeout=1.0)
                # Update UI in main thread
                self.root.after(0, self.display_message, message_data)
            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"Error updating messages: {e}")
                return
        if client is not None and self.client is client:
            # The server went away; let the Tk thread restore the idle state.
            try:
                self.root.after(0, self._connection_lost, client)
            except Exception as e:
                logger.error(f"Error updating messages: {e}")

    def _connection_lost(self, client):
        """Reset the UI if client is still the active connection."""
        if self.client is client:
            self.disconnect_from_server()

    def display_message(self, message_data: dict):
        """Render one server payload in the message log.

        Malformed payloads are logged and skipped; the log is always
        returned to its read-only state.
        """
        self.messages_display.config(state=tk.NORMAL)
        try:
            message_type = message_data.get('type', 'message')
            if message_type == 'welcome':
                # Display welcome message and history
                username = message_data['username']
                room = message_data['room']
                self.messages_display.insert(tk.END, f"=== Welcome {username} to room '{room}' ===\n", 'system')
                for msg_dict in message_data.get('history', []):
                    self.format_and_insert_message(msg_dict)
            elif message_type == 'system':
                # System message
                content = message_data['content']
                timestamp = datetime.datetime.now().strftime("%H:%M:%S")
                self.messages_display.insert(tk.END, f"[{timestamp}] SYSTEM: {content}\n", 'system')
            elif message_type == 'private':
                # Private message
                from_user = message_data['from']
                content = message_data['content']
                timestamp = datetime.datetime.fromisoformat(message_data['timestamp']).strftime("%H:%M:%S")
                self.messages_display.insert(tk.END, f"[{timestamp}] PRIVATE from {from_user}: {content}\n", 'private')
            else:
                # Regular message
                self.format_and_insert_message(message_data)
            # Scroll to bottom
            self.messages_display.see(tk.END)
        except (KeyError, TypeError, ValueError) as e:
            logger.warning(f"Skipping malformed message: {e}")
        finally:
            self.messages_display.config(state=tk.DISABLED)

    def format_and_insert_message(self, message_data: dict):
        """Insert a chat, join or leave message with its HH:MM:SS timestamp.

        Raises:
            KeyError: If 'username', 'content' or 'timestamp' is missing.
            ValueError: If 'timestamp' is not a valid ISO-format string.
        """
        username = message_data['username']
        content = message_data['content']
        msg_type = message_data.get('type', 'message')
        timestamp = datetime.datetime.fromisoformat(message_data['timestamp']).strftime("%H:%M:%S")
        if msg_type in ('join', 'leave'):
            # Join/leave lines carry their own text and a tag of the same name.
            self.messages_display.insert(tk.END, f"[{timestamp}] {content}\n", msg_type)
        else:
            self.messages_display.insert(tk.END, f"[{timestamp}] {username}: {content}\n")

    def run(self):
        """Run the GUI application"""
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
        self.root.mainloop()

    def on_closing(self):
        """Handle window closing"""
        if self.client:
            self.client.disconnect()
        self.root.destroy()
def start_server(host: str = 'localhost', port: int = 12345):
    """Run a chat server in the foreground until it stops or Ctrl+C is pressed.

    Args:
        host: Interface the server binds to.
        port: TCP port the server listens on.
    """
    print("Starting chat server...")
    server = ChatServer(host, port)
    try:
        server.start()
    except KeyboardInterrupt:
        print("\nShutting down server...")
        server.stop()
def start_client():
    """Create the chat window and run its event loop until it is closed."""
    app = ChatGUI()
    app.run()
def main(argv: Optional[List[str]] = None):
    """Start the server or the client depending on the first argument.

    Args:
        argv: Command-line arguments without the program name. Defaults to
            sys.argv[1:]. "server" starts the server, "client" (or no
            argument at all) starts the GUI client; anything else prints
            the usage line.
    """
    import sys

    args = sys.argv[1:] if argv is None else list(argv)
    if not args:
        # Default to client
        start_client()
        return

    mode = args[0]
    if mode == 'server':
        start_server()
    elif mode == 'client':
        start_client()
    else:
        print("Usage: python chatapplication.py [server|client]")
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
# Chat Application with Socket Programming
import socket
import threading
import json
import time
import hashlib
import datetime
from typing import Dict, List, Optional, Tuple
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, simpledialog
import queue
import logging
# Configure logging: INFO level, each record formatted as "time - level - message"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger shared by the classes in this file
logger = logging.getLogger(__name__)
class ChatMessage:
    """A single chat event exchanged between the server and its clients.

    Attributes:
        username: Name of the user the message is attributed to.
        content: Message text.
        message_type: One of "message", "join", "leave" or "system".
        timestamp: Creation time; the current local time when none is given.
        id: Eight hex digits derived from the user, text and timestamp.
    """

    def __init__(self, username: str, content: str, message_type: str = "message",
                 timestamp: Optional[datetime.datetime] = None):
        """Build a message, stamping it with the current time if needed.

        Args:
            username: Author of the message.
            content: Message text.
            message_type: Kind of event (message, join, leave, system).
            timestamp: Explicit creation time, or None for "now".
        """
        self.username = username
        self.content = content
        self.message_type = message_type  # message, join, leave, system
        self.timestamp = timestamp if timestamp is not None else datetime.datetime.now()
        # MD5 serves only as a short fingerprint here, not as a security measure.
        fingerprint = f"{username}{content}{self.timestamp}".encode()
        self.id = hashlib.md5(fingerprint).hexdigest()[:8]

    def __repr__(self) -> str:
        return (f"{type(self).__name__}(username={self.username!r}, "
                f"content={self.content!r}, message_type={self.message_type!r})")

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dictionary describing this message."""
        return {
            'id': self.id,
            'username': self.username,
            'content': self.content,
            'type': self.message_type,
            'timestamp': self.timestamp.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'ChatMessage':
        """Rebuild a message from the dictionary produced by to_dict().

        Args:
            data: Mapping with 'username', 'content' and an ISO-format
                'timestamp'. 'type' defaults to "message" and 'id' to the
                freshly computed fingerprint when they are absent.

        Raises:
            KeyError: If 'username', 'content' or 'timestamp' is missing.
            ValueError: If 'timestamp' is not a valid ISO-format string.
        """
        timestamp = datetime.datetime.fromisoformat(data['timestamp'])
        msg = cls(data['username'], data['content'], data.get('type', 'message'), timestamp)
        # Keep the sender's id when supplied so both ends agree on it.
        msg.id = data.get('id', msg.id)
        return msg
class ChatServer:
"""Chat server handling multiple clients"""
def __init__(self, host: str = 'localhost', port: int = 12345):
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.clients: Dict[socket.socket, dict] = {}
self.message_history: List[ChatMessage] = []
self.running = False
self.max_history = 100
# Room management
self.rooms: Dict[str, List[socket.socket]] = {'general': []}
self.default_room = 'general'
def start(self):
"""Start the chat server"""
try:
self.socket.bind((self.host, self.port))
self.socket.listen(5)
self.running = True
logger.info(f"Chat server started on {self.host}:{self.port}")
while self.running:
try:
client_socket, address = self.socket.accept()
logger.info(f"New connection from {address}")
# Start client handler thread
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address),
daemon=True
)
client_thread.start()
except OSError:
if self.running:
logger.error("Error accepting connections")
break
except Exception as e:
logger.error(f"Server error: {e}")
finally:
self.stop()
def handle_client(self, client_socket: socket.socket, address: Tuple[str, int]):
"""Handle individual client connection"""
username = None
try:
# Initial handshake - get username
client_socket.send(json.dumps({
'type': 'handshake',
'message': 'Please provide username'
}).encode('utf-8'))
# Receive username
data = client_socket.recv(1024).decode('utf-8')
handshake_data = json.loads(data)
username = handshake_data.get('username', f'User_{address[1]}')
# Check if username is already taken
while any(client['username'] == username for client in self.clients.values()):
username = f"{username}_{len(self.clients)}"
# Add client to clients list
self.clients[client_socket] = {
'username': username,
'address': address,
'room': self.default_room,
'joined_at': datetime.datetime.now()
}
# Add to default room
self.rooms[self.default_room].append(client_socket)
# Send welcome message and recent history
welcome_data = {
'type': 'welcome',
'username': username,
'room': self.default_room,
'history': [msg.to_dict() for msg in self.message_history[-20:]]
}
client_socket.send(json.dumps(welcome_data).encode('utf-8'))
# Broadcast join message
join_message = ChatMessage(username, f"{username} joined the chat", "join")
self.broadcast_message(join_message, exclude=client_socket)
self.add_to_history(join_message)
# Handle client messages
while self.running:
try:
data = client_socket.recv(1024).decode('utf-8')
if not data:
break
message_data = json.loads(data)
self.process_message(client_socket, message_data)
except json.JSONDecodeError:
logger.warning(f"Invalid JSON from {username}")
except Exception as e:
logger.error(f"Error handling message from {username}: {e}")
break
except Exception as e:
logger.error(f"Error in client handler for {address}: {e}")
finally:
self.disconnect_client(client_socket, username)
def process_message(self, client_socket: socket.socket, message_data: dict):
"""Process incoming message from client"""
client_info = self.clients.get(client_socket)
if not client_info:
return
username = client_info['username']
message_type = message_data.get('type', 'message')
content = message_data.get('content', '')
if message_type == 'message':
# Regular chat message
if content.strip():
chat_message = ChatMessage(username, content, "message")
self.broadcast_message(chat_message, room=client_info['room'])
self.add_to_history(chat_message)
elif message_type == 'command':
# Handle chat commands
self.handle_command(client_socket, content)
elif message_type == 'private':
# Private message
target_username = message_data.get('target')
self.send_private_message(client_socket, target_username, content)
def handle_command(self, client_socket: socket.socket, command: str):
"""Handle chat commands"""
client_info = self.clients[client_socket]
username = client_info['username']
parts = command.strip().split()
if not parts:
return
cmd = parts[0].lower()
if cmd == '/help':
help_text = """
Available commands:
/help - Show this help
/users - List online users
/rooms - List available rooms
/join <room> - Join a room
/create <room> - Create a new room
/pm <username> <message> - Send private message
/time - Show current server time
"""
self.send_system_message(client_socket, help_text)
elif cmd == '/users':
users = [info['username'] for info in self.clients.values()]
user_list = f"Online users ({len(users)}): {', '.join(users)}"
self.send_system_message(client_socket, user_list)
elif cmd == '/rooms':
room_info = []
for room, clients in self.rooms.items():
room_info.append(f"{room} ({len(clients)} users)")
room_list = f"Available rooms: {', '.join(room_info)}"
self.send_system_message(client_socket, room_list)
elif cmd == '/join' and len(parts) > 1:
new_room = parts[1]
self.move_client_to_room(client_socket, new_room)
elif cmd == '/create' and len(parts) > 1:
new_room = parts[1]
if new_room not in self.rooms:
self.rooms[new_room] = []
self.send_system_message(client_socket, f"Room '{new_room}' created")
self.move_client_to_room(client_socket, new_room)
else:
self.send_system_message(client_socket, f"Room '{new_room}' already exists")
elif cmd == '/pm' and len(parts) > 2:
target_username = parts[1]
message_content = ' '.join(parts[2:])
self.send_private_message(client_socket, target_username, message_content)
elif cmd == '/time':
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.send_system_message(client_socket, f"Server time: {current_time}")
else:
self.send_system_message(client_socket, "Unknown command. Type /help for available commands.")
def move_client_to_room(self, client_socket: socket.socket, room_name: str):
"""Move client to a different room"""
client_info = self.clients[client_socket]
username = client_info['username']
old_room = client_info['room']
# Remove from old room
if client_socket in self.rooms[old_room]:
self.rooms[old_room].remove(client_socket)
# Add to new room
if room_name not in self.rooms:
self.rooms[room_name] = []
self.rooms[room_name].append(client_socket)
client_info['room'] = room_name
# Notify client
self.send_system_message(client_socket, f"Moved to room '{room_name}'")
# Notify rooms
leave_msg = ChatMessage(username, f"{username} left the room", "leave")
self.broadcast_message(leave_msg, room=old_room, exclude=client_socket)
join_msg = ChatMessage(username, f"{username} joined the room", "join")
self.broadcast_message(join_msg, room=room_name, exclude=client_socket)
def send_private_message(self, sender_socket: socket.socket, target_username: str, content: str):
"""Send private message between users"""
sender_info = self.clients[sender_socket]
sender_username = sender_info['username']
# Find target client
target_socket = None
for client_socket, info in self.clients.items():
if info['username'] == target_username:
target_socket = client_socket
break
if target_socket:
# Send to target
pm_data = {
'type': 'private',
'from': sender_username,
'content': content,
'timestamp': datetime.datetime.now().isoformat()
}
target_socket.send(json.dumps(pm_data).encode('utf-8'))
# Confirm to sender
self.send_system_message(sender_socket, f"Private message sent to {target_username}")
else:
self.send_system_message(sender_socket, f"User '{target_username}' not found")
def send_system_message(self, client_socket: socket.socket, content: str):
"""Send system message to specific client"""
system_data = {
'type': 'system',
'content': content,
'timestamp': datetime.datetime.now().isoformat()
}
try:
client_socket.send(json.dumps(system_data).encode('utf-8'))
except:
pass
def broadcast_message(self, message: "ChatMessage", room: str = None, exclude: socket.socket = None):
    """Send ``message`` to every client of a room, or to all clients.

    Args:
        message: Object whose ``to_dict()`` result is JSON-serialised and sent.
        room: Room whose members receive the message; a falsy value
            broadcasts to every connected client.
        exclude: Optional socket that must not receive the message
            (typically the originator).

    A recipient whose socket raises ``OSError`` is passed to
    ``disconnect_client``.
    """
    message_data = json.dumps(message.to_dict()).encode('utf-8')

    if room:
        # Broadcast to specific room
        recipients = list(self.rooms.get(room, []))
    else:
        # Broadcast to all clients
        recipients = list(self.clients)

    # `recipients` is a snapshot: disconnect_client() edits the live
    # room/client collections while this loop is running.
    for client_socket in recipients:
        if client_socket == exclude:
            continue
        try:
            # sendall() writes the whole payload; send() may write only part.
            client_socket.sendall(message_data)
        except OSError:
            # Client disconnected, remove from lists
            self.disconnect_client(client_socket)
def add_to_history(self, message: "ChatMessage"):
    """Append ``message`` to the history, keeping at most ``max_history`` entries.

    The oldest entries are dropped first. The list is trimmed in place,
    so other references to ``self.message_history`` see the same list.
    """
    self.message_history.append(message)
    # Count the surplus explicitly: a `[-max_history:]` slice keeps the
    # whole list when max_history is 0, which would never trim anything.
    overflow = len(self.message_history) - self.max_history
    if overflow > 0:
        del self.message_history[:overflow]
def disconnect_client(self, client_socket: socket.socket, username: str = None):
    """Unregister a client, announce its departure and close its socket.

    Args:
        client_socket: Socket of the client to drop.
        username: Name to use in the leave announcement; defaults to the
            username stored for the client.

    Calling this for a socket that is not (or no longer) registered only
    closes the socket.
    """
    # pop() checks and removes in one step, so a second call for the same
    # socket (e.g. from a failed broadcast and from the client's own
    # handler) cannot hit a KeyError between the check and the delete.
    client_info = self.clients.pop(client_socket, None)
    if client_info is not None:
        username = username or client_info['username']
        room = client_info['room']

        # Remove from room
        members = self.rooms.get(room)
        if members is not None and client_socket in members:
            members.remove(client_socket)

        # Broadcast leave message
        leave_message = ChatMessage(username, f"{username} left the chat", "leave")
        self.broadcast_message(leave_message, room=room, exclude=client_socket)
        self.add_to_history(leave_message)
        logger.info(f"Client {username} disconnected")

    try:
        client_socket.close()
    except OSError:
        # Nothing useful can be done about a failed close.
        pass
def stop(self):
    """Stop the server: clear the running flag and close every socket.

    All client sockets are closed first, then the listening socket.
    Errors raised while closing are ignored so that one bad socket does
    not prevent the others from being closed.
    """
    self.running = False

    # Close all client connections. Iterate over a snapshot because
    # other code removes entries from self.clients as clients drop.
    for client_socket in list(self.clients):
        try:
            client_socket.close()
        except OSError:
            # Only socket errors are ignored; a bare `except:` would also
            # swallow KeyboardInterrupt/SystemExit during shutdown.
            pass

    # Close server socket
    try:
        self.socket.close()
    except OSError:
        pass

    logger.info("Chat server stopped")
class ChatClient:
    """Chat client for connecting to server.

    The client performs the handshake (``handshake`` -> username ->
    ``welcome``), then runs a daemon thread that parses incoming JSON
    documents and puts them on ``message_queue`` for the caller to consume.

    TCP is a byte stream, so one ``recv()`` may hold several JSON documents
    or only part of one. Incoming data is therefore buffered and documents
    are extracted one at a time.
    """

    # Marker returned by _next_message() when the peer closed the connection.
    _EOF = object()

    # Upper bound on buffered text that still does not parse as JSON;
    # beyond it the stream is treated as corrupt instead of growing forever.
    MAX_BUFFERED_CHARS = 1 << 20

    def __init__(self, username: str, host: str = 'localhost', port: int = 12345):
        """Store connection settings; no network activity happens here."""
        import codecs  # function-scope import: only needed for the decoder below

        self.username = username
        self.host = host
        self.port = port
        self.socket = None
        self.connected = False
        self.message_queue = queue.Queue()

        # Receive-side state: text received but not yet parsed, an
        # incremental decoder that copes with multi-byte characters split
        # across recv() calls, and a reusable JSON decoder.
        self._recv_text = ""
        self._utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        self._json_decoder = json.JSONDecoder()

    def _next_message(self):
        """Return the next complete JSON value from the server.

        Blocks until one document is available. Returns ``self._EOF`` when
        the server closes the connection.

        Raises:
            OSError: on socket failure.
            ValueError: on undecodable bytes, or when more than
                ``MAX_BUFFERED_CHARS`` characters are buffered without
                forming a valid JSON document.
        """
        while True:
            pending = self._recv_text.lstrip()
            if pending:
                try:
                    message, end = self._json_decoder.raw_decode(pending)
                except json.JSONDecodeError:
                    # Most likely an incomplete document: read more, unless
                    # the buffer has grown past any plausible message size.
                    if len(pending) > self.MAX_BUFFERED_CHARS:
                        raise
                else:
                    self._recv_text = pending[end:]
                    return message

            chunk = self.socket.recv(4096)
            if not chunk:
                return self._EOF
            self._recv_text = pending + self._utf8_decoder.decode(chunk)

    def _close_socket(self):
        """Shut down and close the socket, ignoring socket errors."""
        sock = self.socket
        if not sock:
            return
        try:
            # shutdown() first so a thread blocked in recv() is woken up
            # promptly instead of waiting on a half-closed descriptor.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # not connected (any more)
        try:
            sock.close()
        except OSError:
            pass

    def connect(self) -> bool:
        """Connect to chat server and perform the username handshake.

        Returns:
            True when the server answered with a ``welcome`` message and the
            listener thread was started; False otherwise (the error is
            logged and the socket is closed).
        """
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((self.host, self.port))

            # Start from a clean receive state in case of a reconnect.
            self._recv_text = ""
            self._utf8_decoder.reset()

            # Wait for handshake
            handshake = self._next_message()
            if isinstance(handshake, dict) and handshake.get('type') == 'handshake':
                # Send username
                response = {'username': self.username}
                self.socket.sendall(json.dumps(response).encode('utf-8'))

                # Wait for welcome; it may be larger than a single recv()
                # when it carries message history.
                welcome = self._next_message()
                if isinstance(welcome, dict) and welcome.get('type') == 'welcome':
                    self.connected = True
                    self.username = welcome['username']  # May have been modified

                    # Hand the welcome (and any history it carries) to the
                    # consumer of message_queue so it can be displayed.
                    self.message_queue.put(welcome)

                    # Start message listener thread
                    listener_thread = threading.Thread(target=self.listen_for_messages, daemon=True)
                    listener_thread.start()
                    return True
        except Exception as e:
            logger.error(f"Connection error: {e}")

        # Any failure path: do not leak the half-open socket.
        self.connected = False
        self._close_socket()
        self.socket = None
        return False

    def listen_for_messages(self):
        """Listen for incoming messages until disconnected.

        Every parsed JSON document is put on ``message_queue``. The loop
        ends when the server closes the connection, on a socket or parse
        error, or after ``disconnect()``; ``connected`` is False afterwards.
        """
        while self.connected:
            try:
                message_data = self._next_message()
            except (OSError, ValueError) as e:
                # After disconnect() the error is expected and not logged.
                if self.connected:
                    logger.error(f"Error receiving message: {e}")
                break
            if message_data is self._EOF:
                break
            self.message_queue.put(message_data)

        self.connected = False

    def _send_payload(self, message_type: str, content: str, label: str) -> bool:
        """Send ``{'type': message_type, 'content': content}``; True on success."""
        if not self.connected:
            return False
        payload = {
            'type': message_type,
            'content': content
        }
        try:
            # sendall() writes the whole payload; send() may write only part.
            self.socket.sendall(json.dumps(payload).encode('utf-8'))
            return True
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"Error sending {label}: {e}")
            return False

    def send_message(self, content: str):
        """Send chat message. Returns True on success, False otherwise."""
        return self._send_payload('message', content, 'message')

    def send_command(self, command: str):
        """Send chat command. Returns True on success, False otherwise."""
        return self._send_payload('command', command, 'command')

    def disconnect(self):
        """Disconnect from server; safe to call when not connected."""
        self.connected = False
        self._close_socket()
class ChatGUI:
    """GUI for chat client.

    Builds a Tkinter window with connection controls, a read-only message
    display and an input line, and drives a ``ChatClient``. Incoming
    messages are taken from the client's ``message_queue`` by a callback
    scheduled on the Tk main loop, so every widget is touched from the
    main thread only.
    """

    # Interval (milliseconds) between two drains of the incoming queue.
    POLL_INTERVAL_MS = 100

    def __init__(self):
        self.client = None
        # Kept for backward compatibility; polling now runs on the Tk main
        # loop, so no worker thread is created.
        self.update_messages_thread = None
        # Identifier of the pending root.after() poll, or None.
        self._poll_job = None

        self.root = tk.Tk()
        self.root.title("Chat Application")
        self.root.geometry("800x600")
        self.setup_ui()

    def setup_ui(self):
        """Setup the user interface"""
        # Main frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        # Configure grid weights
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(1, weight=1)

        # Connection frame
        conn_frame = ttk.LabelFrame(main_frame, text="Connection", padding="5")
        conn_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
        conn_frame.columnconfigure(1, weight=1)

        # Connection controls
        ttk.Label(conn_frame, text="Username:").grid(row=0, column=0, sticky=tk.W)
        self.username_var = tk.StringVar(value="User")
        ttk.Entry(conn_frame, textvariable=self.username_var, width=15).grid(
            row=0, column=1, padx=(5, 10), sticky=tk.W)

        ttk.Label(conn_frame, text="Host:").grid(row=0, column=2, sticky=tk.W)
        self.host_var = tk.StringVar(value="localhost")
        ttk.Entry(conn_frame, textvariable=self.host_var, width=15).grid(
            row=0, column=3, padx=(5, 10), sticky=tk.W)

        ttk.Label(conn_frame, text="Port:").grid(row=0, column=4, sticky=tk.W)
        self.port_var = tk.StringVar(value="12345")
        ttk.Entry(conn_frame, textvariable=self.port_var, width=8).grid(
            row=0, column=5, padx=(5, 10), sticky=tk.W)

        self.connect_button = ttk.Button(conn_frame, text="Connect", command=self.connect_to_server)
        self.connect_button.grid(row=0, column=6, padx=(5, 0))
        self.disconnect_button = ttk.Button(
            conn_frame, text="Disconnect", command=self.disconnect_from_server, state=tk.DISABLED)
        self.disconnect_button.grid(row=0, column=7, padx=(5, 0))

        # Status
        self.status_var = tk.StringVar(value="Not connected")
        status_label = ttk.Label(conn_frame, textvariable=self.status_var, foreground="red")
        status_label.grid(row=1, column=0, columnspan=8, sticky=tk.W, pady=(5, 0))

        # Chat frame
        chat_frame = ttk.LabelFrame(main_frame, text="Chat", padding="5")
        chat_frame.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        chat_frame.columnconfigure(0, weight=1)
        chat_frame.rowconfigure(0, weight=1)

        # Messages display
        self.messages_display = scrolledtext.ScrolledText(chat_frame, state=tk.DISABLED, height=20)
        self.messages_display.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))

        # Configure tags for styling once, instead of on every message
        self.messages_display.tag_config('system', foreground='blue')
        self.messages_display.tag_config('private', foreground='purple')
        self.messages_display.tag_config('join', foreground='green')
        self.messages_display.tag_config('leave', foreground='red')

        # Message input frame
        input_frame = ttk.Frame(chat_frame)
        input_frame.grid(row=1, column=0, sticky=(tk.W, tk.E))
        input_frame.columnconfigure(0, weight=1)

        # Message input; disabled until connected, like the Send button
        self.message_var = tk.StringVar()
        self.message_entry = ttk.Entry(input_frame, textvariable=self.message_var, state=tk.DISABLED)
        self.message_entry.grid(row=0, column=0, sticky=(tk.W, tk.E), padx=(0, 10))
        self.message_entry.bind('<Return>', self.send_message_event)

        # Send button
        self.send_button = ttk.Button(input_frame, text="Send", command=self.send_message, state=tk.DISABLED)
        self.send_button.grid(row=0, column=1)

        # Help text
        help_frame = ttk.Frame(main_frame)
        help_frame.grid(row=2, column=0, sticky=(tk.W, tk.E), pady=(10, 0))
        help_text = "Commands: /help, /users, /rooms, /join <room>, /create <room>, /pm <user> <msg>, /time"
        ttk.Label(help_frame, text=help_text, font=("TkDefaultFont", 8)).pack()

    def connect_to_server(self):
        """Connect to chat server using the values of the connection fields."""
        username = self.username_var.get().strip()
        host = self.host_var.get().strip()
        try:
            port = int(self.port_var.get())
        except ValueError:
            messagebox.showerror("Error", "Invalid port number")
            return
        if not username:
            messagebox.showerror("Error", "Please enter a username")
            return

        # Create client and connect
        self.client = ChatClient(username, host, port)
        if not self.client.connect():
            messagebox.showerror("Error", "Failed to connect to server")
            self.client = None
            return

        self.status_var.set(f"Connected as {self.client.username}")

        # Update UI state
        self.connect_button.config(state=tk.DISABLED)
        self.disconnect_button.config(state=tk.NORMAL)
        self.send_button.config(state=tk.NORMAL)
        self.message_entry.config(state=tk.NORMAL)

        # Start polling the incoming queue on the Tk main loop
        self._cancel_polling()
        self._poll_job = self.root.after(0, self.update_messages)

        # Focus message entry
        self.message_entry.focus()

    def disconnect_from_server(self):
        """Disconnect from chat server and reset the controls."""
        self._cancel_polling()
        if self.client:
            self.client.disconnect()
            self.client = None

        # Update UI state
        self.status_var.set("Not connected")
        self.connect_button.config(state=tk.NORMAL)
        self.disconnect_button.config(state=tk.DISABLED)
        self.send_button.config(state=tk.DISABLED)
        self.message_entry.config(state=tk.DISABLED)

    def send_message_event(self, event):
        """Handle Enter key press"""
        self.send_message()

    def send_message(self):
        """Send the input line as a command (leading '/') or a chat message."""
        if not self.client or not self.client.connected:
            return
        content = self.message_var.get().strip()
        if not content:
            return

        if content.startswith('/'):
            # Send as command
            self.client.send_command(content)
        else:
            # Send as message
            self.client.send_message(content)

        # Clear input
        self.message_var.set("")

    def _cancel_polling(self):
        """Cancel the pending queue poll, if any."""
        if self._poll_job is not None:
            self.root.after_cancel(self._poll_job)
            self._poll_job = None

    def update_messages(self):
        """Display all queued server messages, then schedule the next poll.

        Runs on the Tk main loop. When the client reports that it is no
        longer connected (the server closed the connection), the UI is
        reset to the disconnected state instead of rescheduling.
        """
        self._poll_job = None
        client = self.client
        if client is None:
            return

        while True:
            try:
                message_data = client.message_queue.get_nowait()
            except queue.Empty:
                break
            try:
                self.display_message(message_data)
            except (AttributeError, KeyError, TypeError, ValueError) as e:
                # One malformed message must not stop the polling.
                logger.error(f"Error updating messages: {e}")

        if client.connected:
            self._poll_job = self.root.after(self.POLL_INTERVAL_MS, self.update_messages)
        elif self.client is client:
            self.disconnect_from_server()

    def display_message(self, message_data: dict):
        """Display message in chat window"""
        self.messages_display.config(state=tk.NORMAL)
        try:
            message_type = message_data.get('type', 'message')

            if message_type == 'welcome':
                # Display welcome message and history
                username = message_data['username']
                room = message_data['room']
                self.messages_display.insert(
                    tk.END, f"=== Welcome {username} to room '{room}' ===\n", 'system')
                # Display history
                for msg_dict in message_data.get('history', []):
                    self.format_and_insert_message(msg_dict)
            elif message_type == 'system':
                # System message
                content = message_data['content']
                timestamp = datetime.datetime.now().strftime("%H:%M:%S")
                self.messages_display.insert(tk.END, f"[{timestamp}] SYSTEM: {content}\n", 'system')
            elif message_type == 'private':
                # Private message
                from_user = message_data['from']
                content = message_data['content']
                timestamp = datetime.datetime.fromisoformat(message_data['timestamp']).strftime("%H:%M:%S")
                self.messages_display.insert(
                    tk.END, f"[{timestamp}] PRIVATE from {from_user}: {content}\n", 'private')
            else:
                # Regular message
                self.format_and_insert_message(message_data)

            # Scroll to bottom
            self.messages_display.see(tk.END)
        finally:
            # Always return the widget to read-only, even if a message
            # was missing an expected field.
            self.messages_display.config(state=tk.DISABLED)

    def format_and_insert_message(self, message_data: dict):
        """Format and insert a chat message"""
        username = message_data['username']
        content = message_data['content']
        msg_type = message_data.get('type', 'message')
        timestamp = datetime.datetime.fromisoformat(message_data['timestamp']).strftime("%H:%M:%S")

        if msg_type == 'join':
            self.messages_display.insert(tk.END, f"[{timestamp}] {content}\n", 'join')
        elif msg_type == 'leave':
            self.messages_display.insert(tk.END, f"[{timestamp}] {content}\n", 'leave')
        else:
            self.messages_display.insert(tk.END, f"[{timestamp}] {username}: {content}\n")

    def run(self):
        """Run the GUI application"""
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
        self.root.mainloop()

    def on_closing(self):
        """Handle window closing"""
        self._cancel_polling()
        if self.client:
            self.client.disconnect()
        self.root.destroy()
def start_server():
    """Create a ChatServer and run it until interrupted with Ctrl-C."""
    print("Starting chat server...")
    chat_server = ChatServer()
    try:
        chat_server.start()
    except KeyboardInterrupt:
        # Ctrl-C: announce the shutdown, then stop the server.
        print("\nShutting down server...")
        chat_server.stop()
def start_client():
    """Build the chat GUI and hand control to it."""
    ChatGUI().run()
def main():
    """Entry point: start the server or the client GUI.

    The first command-line argument selects the mode: ``server`` or
    ``client``. With no argument the client is started; any other
    argument prints a usage line.
    """
    import sys

    if len(sys.argv) > 1:
        mode = sys.argv[1]
        if mode == 'server':
            start_server()
        elif mode == 'client':
            start_client()
        else:
            # Show the name the script was actually invoked with rather
            # than a hard-coded file name that may not exist.
            print(f"Usage: python {sys.argv[0]} [server|client]")
    else:
        # Default to client
        start_client()


if __name__ == "__main__":
    main()
- Save the file.
- Run the following command to start the server first:
C:\Users\username\Documents\chatApplication> python chatapplication.py server
Starting Chat Server...
======================
✓ Server listening on localhost:12345
✓ Waiting for client connections...
✓ Chat rooms initialized
✓ Message history enabled
Server ready for connections!
C:\Users\username\Documents\chatApplication> python chatapplication.py server
Starting Chat Server...
======================
✓ Server listening on localhost:12345
✓ Waiting for client connections...
✓ Chat rooms initialized
✓ Message history enabled
Server ready for connections!
- Then start the client application:
C:\Users\username\Documents\chatApplication> python chatapplication.py client
Chat Application
================
Enter username: Alice
✓ Connected to server
✓ Joined room: general
✓ Ready to chat!
Type '/help' for commands
C:\Users\username\Documents\chatApplication> python chatapplication.py client
Chat Application
================
Enter username: Alice
✓ Connected to server
✓ Joined room: general
✓ Ready to chat!
Type '/help' for commands
Explanation
- The ChatMessage class represents individual messages with metadata and serialization.
- The ChatServer class manages multiple client connections and message routing.
- The ChatClient class handles the connection to the server and message handling.
- The ChatGUI class provides a user-friendly interface for the chat application.
- Socket programming enables real-time communication between multiple clients.
- Threading allows concurrent handling of multiple client connections.
- Message broadcasting distributes messages to all connected clients in a room.
- Command system provides chat features like private messaging and room management.
- Room management allows users to create and join different chat channels.
- Message history keeps the most recent messages (up to max_history, 100 by default) in an in-memory list.
- A username handshake identifies each client when it connects (password-based authentication is left as a next step).
- Error handling ensures robust network communication.
Next Steps
Congratulations! You have successfully created a Chat Application in Python. Experiment with the code and see if you can modify the application. Here are a few suggestions:
- Add user registration and authentication system
- Implement file sharing and image uploads
- Create voice and video chat capabilities
- Add message encryption for secure communication
- Implement chat bots and automated responses
- Create admin panel for chat moderation
- Add emoji support and rich text formatting
- Implement push notifications for mobile clients
- Create chat analytics and usage statistics
Conclusion
In this project, you learned how to create a Chat Application in Python using socket programming and networking concepts. You also learned about client-server architecture, concurrent programming, message broadcasting, and implementing real-time communication systems. You can find the source code on GitHub.
How It Works
1. Server Architecture
class ChatServer:
def __init__(self, host: str = 'localhost', port: int = 12345):
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.clients: Dict[socket.socket, dict] = {}
self.message_history: List[ChatMessage] = []
self.running = False
self.max_history = 100
# Room management
self.rooms: Dict[str, List[socket.socket]] = {'general': []}
self.default_room = 'general'
class ChatServer:
def __init__(self, host: str = 'localhost', port: int = 12345):
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.clients: Dict[socket.socket, dict] = {}
self.message_history: List[ChatMessage] = []
self.running = False
self.max_history = 100
# Room management
self.rooms: Dict[str, List[socket.socket]] = {'general': []}
self.default_room = 'general'
The server architecture includes:
- Socket Management: TCP socket for reliable communication
- Client Tracking: Dictionary to store client connections and metadata
- Room System: Multiple chat rooms with user management
- Message History: In-memory list of recent chat messages, capped at max_history
- Threading: Concurrent handling of multiple clients
2. Message Broadcasting System
def broadcast_message(self, message: "ChatMessage", room: str = None, exclude: socket.socket = None):
    """Send ``message`` to all clients, or only to the members of ``room``.

    ``exclude`` (usually the sender) is skipped. A client whose socket
    raises ``OSError`` is passed to ``disconnect_client``.
    """
    if room is None:
        # Broadcast to all clients
        target_clients = list(self.clients.keys())
    else:
        # Broadcast to specific room. Copy the list: disconnect_client()
        # may remove entries from the room while we are iterating.
        target_clients = list(self.rooms.get(room, []))

    # Encode once instead of once per recipient
    payload = json.dumps(message.to_dict()).encode('utf-8')
    for client_socket in target_clients:
        if client_socket != exclude:
            try:
                client_socket.send(payload)
            except OSError:
                self.disconnect_client(client_socket)
def broadcast_message(self, message: "ChatMessage", room: str = None, exclude: socket.socket = None):
    """Send ``message`` to all clients, or only to the members of ``room``.

    ``exclude`` (usually the sender) is skipped. A client whose socket
    raises ``OSError`` is passed to ``disconnect_client``.
    """
    if room is None:
        # Broadcast to all clients
        target_clients = list(self.clients.keys())
    else:
        # Broadcast to specific room. Copy the list: disconnect_client()
        # may remove entries from the room while we are iterating.
        target_clients = list(self.rooms.get(room, []))

    # Encode once instead of once per recipient
    payload = json.dumps(message.to_dict()).encode('utf-8')
    for client_socket in target_clients:
        if client_socket != exclude:
            try:
                client_socket.send(payload)
            except OSError:
                self.disconnect_client(client_socket)
3. Client Connection Handling
def handle_client(self, client_socket: socket.socket, address: Tuple[str, int]):
username = None
try:
# Request username
client_socket.send(b"USERNAME_REQUEST")
username = client_socket.recv(1024).decode('utf-8').strip()
# Add client to tracking
self.clients[client_socket] = {
'username': username,
'address': address,
'room': self.default_room,
'connected_at': datetime.datetime.now()
}
def handle_client(self, client_socket: socket.socket, address: Tuple[str, int]):
username = None
try:
# Request username
client_socket.send(b"USERNAME_REQUEST")
username = client_socket.recv(1024).decode('utf-8').strip()
# Add client to tracking
self.clients[client_socket] = {
'username': username,
'address': address,
'room': self.default_room,
'connected_at': datetime.datetime.now()
}
4. Command Processing
def handle_command(self, client_socket: socket.socket, command: str):
client_info = self.clients[client_socket]
username = client_info['username']
parts = command.strip().split()
if not parts:
return
cmd = parts[0].lower()
if cmd == '/help':
help_text = """
Available Commands:
/help - Show this help message
/users - List online users
/rooms - List available rooms
/join <room> - Join a room
/create <room> - Create new room
/pm <user> <message> - Send private message
/time - Show current time
"""
self.send_system_message(client_socket, help_text)
def handle_command(self, client_socket: socket.socket, command: str):
client_info = self.clients[client_socket]
username = client_info['username']
parts = command.strip().split()
if not parts:
return
cmd = parts[0].lower()
if cmd == '/help':
help_text = """
Available Commands:
/help - Show this help message
/users - List online users
/rooms - List available rooms
/join <room> - Join a room
/create <room> - Create new room
/pm <user> <message> - Send private message
/time - Show current time
"""
self.send_system_message(client_socket, help_text)
5. GUI Implementation
class ChatGUI:
def __init__(self):
self.root = tk.Tk()
self.root.title("Chat Application")
self.root.geometry("800x600")
self.client = None
self.username = ""
self.connected = False
self.message_queue = queue.Queue()
self.setup_ui()
self.update_messages()
def setup_ui(self):
# Create main frame
main_frame = ttk.Frame(self.root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
class ChatGUI:
def __init__(self):
self.root = tk.Tk()
self.root.title("Chat Application")
self.root.geometry("800x600")
self.client = None
self.username = ""
self.connected = False
self.message_queue = queue.Queue()
self.setup_ui()
self.update_messages()
def setup_ui(self):
# Create main frame
main_frame = ttk.Frame(self.root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
6. Message Queue System
def listen_for_messages(self):
    """Receive JSON messages from the server and queue them for display.

    The loop ends when ``self.connected`` becomes false, when the server
    closes the connection, or on a socket or parse error.
    """
    while self.connected:
        try:
            message_data = self.socket.recv(1024).decode('utf-8')
        except (OSError, UnicodeDecodeError):
            break
        if not message_data:
            # An empty read means the peer closed the connection; without
            # this check the loop would spin on recv() forever.
            break
        try:
            message = json.loads(message_data)
        except json.JSONDecodeError:
            break
        self.message_queue.put(message)
def update_messages(self):
    """Show every queued message, then run again in 100 ms via ``root.after``."""
    try:
        # iter(callable, sentinel) keeps calling get_nowait(); the loop is
        # ended by queue.Empty, never by the throw-away sentinel.
        for queued in iter(self.message_queue.get_nowait, object()):
            self.display_message(queued)
    except queue.Empty:
        pass
    self.root.after(100, self.update_messages)
def listen_for_messages(self):
    """Receive JSON messages from the server and queue them for display.

    The loop ends when ``self.connected`` becomes false, when the server
    closes the connection, or on a socket or parse error.
    """
    while self.connected:
        try:
            message_data = self.socket.recv(1024).decode('utf-8')
        except (OSError, UnicodeDecodeError):
            break
        if not message_data:
            # An empty read means the peer closed the connection; without
            # this check the loop would spin on recv() forever.
            break
        try:
            message = json.loads(message_data)
        except json.JSONDecodeError:
            break
        self.message_queue.put(message)
def update_messages(self):
    """Show every queued message, then run again in 100 ms via ``root.after``."""
    try:
        # iter(callable, sentinel) keeps calling get_nowait(); the loop is
        # ended by queue.Empty, never by the throw-away sentinel.
        for queued in iter(self.message_queue.get_nowait, object()):
            self.display_message(queued)
    except queue.Empty:
        pass
    self.root.after(100, self.update_messages)
Advanced Chat Features
1. Private Messaging System
def send_private_message(self, sender_socket: socket.socket, target_username: str, content: str):
"""Send private message between users"""
sender_info = self.clients.get(sender_socket)
if not sender_info:
return
sender_username = sender_info['username']
target_socket = None
# Find target user's socket
for client_socket, client_info in self.clients.items():
if client_info['username'] == target_username:
target_socket = client_socket
break
if target_socket:
# Send to target user
private_message = ChatMessage(
username=sender_username,
content=content,
message_type="private"
)
try:
message_data = json.dumps(private_message.to_dict())
target_socket.send(message_data.encode('utf-8'))
# Confirm to sender
confirmation = ChatMessage(
username="System",
content=f"Private message sent to {target_username}",
message_type="system"
)
sender_socket.send(json.dumps(confirmation.to_dict()).encode('utf-8'))
except Exception as e:
error_message = ChatMessage(
username="System",
content=f"Failed to send private message: {str(e)}",
message_type="system"
)
sender_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
else:
# User not found
error_message = ChatMessage(
username="System",
content=f"User '{target_username}' not found or offline",
message_type="system"
)
sender_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
def send_private_message(self, sender_socket: socket.socket, target_username: str, content: str):
"""Send private message between users"""
sender_info = self.clients.get(sender_socket)
if not sender_info:
return
sender_username = sender_info['username']
target_socket = None
# Find target user's socket
for client_socket, client_info in self.clients.items():
if client_info['username'] == target_username:
target_socket = client_socket
break
if target_socket:
# Send to target user
private_message = ChatMessage(
username=sender_username,
content=content,
message_type="private"
)
try:
message_data = json.dumps(private_message.to_dict())
target_socket.send(message_data.encode('utf-8'))
# Confirm to sender
confirmation = ChatMessage(
username="System",
content=f"Private message sent to {target_username}",
message_type="system"
)
sender_socket.send(json.dumps(confirmation.to_dict()).encode('utf-8'))
except Exception as e:
error_message = ChatMessage(
username="System",
content=f"Failed to send private message: {str(e)}",
message_type="system"
)
sender_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
else:
# User not found
error_message = ChatMessage(
username="System",
content=f"User '{target_username}' not found or offline",
message_type="system"
)
sender_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
2. Room Management System
def create_room(self, room_name: str, creator_socket: socket.socket, password: str = None):
"""Create a new chat room"""
if room_name in self.rooms:
error_message = ChatMessage(
username="System",
content=f"Room '{room_name}' already exists",
message_type="system"
)
creator_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
return False
# Create new room
self.rooms[room_name] = {
'clients': [],
'creator': self.clients[creator_socket]['username'],
'created_at': datetime.datetime.now(),
'password': password,
'topic': f"Welcome to {room_name}!",
'max_users': 50
}
# Move creator to new room
self.move_client_to_room(creator_socket, room_name)
success_message = ChatMessage(
username="System",
content=f"Room '{room_name}' created successfully. You are now in {room_name}.",
message_type="system"
)
creator_socket.send(json.dumps(success_message.to_dict()).encode('utf-8'))
return True
def join_room(self, client_socket: socket.socket, room_name: str, password: str = None):
"""Join an existing room"""
if room_name not in self.rooms:
error_message = ChatMessage(
username="System",
content=f"Room '{room_name}' does not exist",
message_type="system"
)
client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
return False
room = self.rooms[room_name]
# Check password if required
if room.get('password') and room['password'] != password:
error_message = ChatMessage(
username="System",
content=f"Incorrect password for room '{room_name}'",
message_type="system"
)
client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
return False
# Check room capacity
if len(room['clients']) >= room['max_users']:
error_message = ChatMessage(
username="System",
content=f"Room '{room_name}' is full",
message_type="system"
)
client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
return False
# Move client to room
self.move_client_to_room(client_socket, room_name)
return True
def list_rooms(self, client_socket: socket.socket):
"""List all available rooms"""
room_list = []
for room_name, room_info in self.rooms.items():
room_list.append({
'name': room_name,
'users': len(room_info['clients']),
'max_users': room_info['max_users'],
'has_password': bool(room_info.get('password')),
'topic': room_info.get('topic', 'No topic set')
})
room_info_message = ChatMessage(
username="System",
content=f"Available rooms:\n" +
"\n".join([
f"• {room['name']}: {room['users']}/{room['max_users']} users"
f"{' 🔒' if room['has_password'] else ''} - {room['topic']}"
for room in room_list
]),
message_type="system"
)
client_socket.send(json.dumps(room_info_message.to_dict()).encode('utf-8'))
def create_room(self, room_name: str, creator_socket: socket.socket, password: str = None):
"""Create a new chat room"""
if room_name in self.rooms:
error_message = ChatMessage(
username="System",
content=f"Room '{room_name}' already exists",
message_type="system"
)
creator_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
return False
# Create new room
self.rooms[room_name] = {
'clients': [],
'creator': self.clients[creator_socket]['username'],
'created_at': datetime.datetime.now(),
'password': password,
'topic': f"Welcome to {room_name}!",
'max_users': 50
}
# Move creator to new room
self.move_client_to_room(creator_socket, room_name)
success_message = ChatMessage(
username="System",
content=f"Room '{room_name}' created successfully. You are now in {room_name}.",
message_type="system"
)
creator_socket.send(json.dumps(success_message.to_dict()).encode('utf-8'))
return True
def join_room(self, client_socket: socket.socket, room_name: str, password: str = None):
"""Join an existing room"""
if room_name not in self.rooms:
error_message = ChatMessage(
username="System",
content=f"Room '{room_name}' does not exist",
message_type="system"
)
client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
return False
room = self.rooms[room_name]
# Check password if required
if room.get('password') and room['password'] != password:
error_message = ChatMessage(
username="System",
content=f"Incorrect password for room '{room_name}'",
message_type="system"
)
client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
return False
# Check room capacity
if len(room['clients']) >= room['max_users']:
error_message = ChatMessage(
username="System",
content=f"Room '{room_name}' is full",
message_type="system"
)
client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
return False
# Move client to room
self.move_client_to_room(client_socket, room_name)
return True
def list_rooms(self, client_socket: socket.socket):
"""List all available rooms"""
room_list = []
for room_name, room_info in self.rooms.items():
room_list.append({
'name': room_name,
'users': len(room_info['clients']),
'max_users': room_info['max_users'],
'has_password': bool(room_info.get('password')),
'topic': room_info.get('topic', 'No topic set')
})
room_info_message = ChatMessage(
username="System",
content=f"Available rooms:\n" +
"\n".join([
f"• {room['name']}: {room['users']}/{room['max_users']} users"
f"{' 🔒' if room['has_password'] else ''} - {room['topic']}"
for room in room_list
]),
message_type="system"
)
client_socket.send(json.dumps(room_info_message.to_dict()).encode('utf-8'))
3. User Status and Presence
def update_user_status(self, client_socket: socket.socket, status: str):
"""Update user's online status"""
valid_statuses = ['online', 'away', 'busy', 'invisible']
if status not in valid_statuses:
error_message = ChatMessage(
username="System",
content=f"Invalid status. Valid options: {', '.join(valid_statuses)}",
message_type="system"
)
client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
return
client_info = self.clients.get(client_socket)
if client_info:
old_status = client_info.get('status', 'online')
client_info['status'] = status
client_info['last_activity'] = datetime.datetime.now()
# Notify room members of status change
username = client_info['username']
current_room = client_info.get('room', self.default_room)
status_message = ChatMessage(
username="System",
content=f"{username} is now {status}",
message_type="status_change"
)
self.broadcast_message(status_message, current_room, exclude=client_socket)
def get_online_users(self, room_name: str = None) -> List[Dict]:
"""Get list of online users in room or globally"""
users = []
for client_socket, client_info in self.clients.items():
if room_name and client_info.get('room') != room_name:
continue
# Check if user is truly online (recent activity)
last_activity = client_info.get('last_activity', datetime.datetime.now())
time_diff = datetime.datetime.now() - last_activity
if time_diff.total_seconds() < 300: # 5 minutes timeout
users.append({
'username': client_info['username'],
'status': client_info.get('status', 'online'),
'room': client_info.get('room', self.default_room),
'joined_at': client_info.get('connected_at', ''),
'last_activity': last_activity.isoformat()
})
return users
def update_user_status(self, client_socket: socket.socket, status: str):
"""Update user's online status"""
valid_statuses = ['online', 'away', 'busy', 'invisible']
if status not in valid_statuses:
error_message = ChatMessage(
username="System",
content=f"Invalid status. Valid options: {', '.join(valid_statuses)}",
message_type="system"
)
client_socket.send(json.dumps(error_message.to_dict()).encode('utf-8'))
return
client_info = self.clients.get(client_socket)
if client_info:
old_status = client_info.get('status', 'online')
client_info['status'] = status
client_info['last_activity'] = datetime.datetime.now()
# Notify room members of status change
username = client_info['username']
current_room = client_info.get('room', self.default_room)
status_message = ChatMessage(
username="System",
content=f"{username} is now {status}",
message_type="status_change"
)
self.broadcast_message(status_message, current_room, exclude=client_socket)
def get_online_users(self, room_name: Optional[str] = None) -> List[Dict]:
    """Return the users considered online, optionally limited to one room.

    A user counts as online when their ``last_activity`` is less than five
    minutes old. Clients with no recorded activity are treated as active now.

    Args:
        room_name: Room to filter by. ``None`` (or an empty string) lists
            users from every room.

    Returns:
        One dict per online user with the keys ``username``, ``status``,
        ``room``, ``joined_at`` and ``last_activity`` (ISO-8601 string).
    """
    now = datetime.datetime.now()
    users = []
    # Iterate over a snapshot so the loop cannot fail with "dictionary changed
    # size during iteration" if a client is added or removed meanwhile.
    for client_info in list(self.clients.values()):
        # Resolve the room exactly as it is reported below, so a client with
        # no explicit room is matched by a query for the default room.
        room = client_info.get('room', self.default_room)
        if room_name and room != room_name:
            continue
        # Check if user is truly online (recent activity)
        last_activity = client_info.get('last_activity', now)
        if (now - last_activity).total_seconds() < 300:  # 5 minutes timeout
            users.append({
                'username': client_info['username'],
                'status': client_info.get('status', 'online'),
                'room': room,
                'joined_at': client_info.get('connected_at', ''),
                'last_activity': last_activity.isoformat(),
            })
    return users
4. File Sharing Capability
def handle_file_share(self, client_socket: socket.socket, file_data: Dict):
    """Store a file uploaded by a client and announce it to the client's room.

    The upload is base64-decoded, limited to 10MB, written below ``uploads/``
    under a timestamp-prefixed name and then broadcast as a ``file_share``
    message. Any failure is reported back to the sender as a system message.

    Args:
        client_socket: Socket of the uploading client. Sockets that are not
            registered in ``self.clients`` are ignored.
        file_data: Mapping with at least ``filename`` and ``content``
            (base64 text).
    """
    import base64
    import mimetypes
    import os  # used below but not among the imports at the top of this file

    client_info = self.clients.get(client_socket)
    if not client_info:
        return

    try:
        # The name comes from the client: keep only its final path component
        # (for both separator styles) so it cannot escape the upload folder.
        filename = os.path.basename(str(file_data['filename']).replace('\\', '/'))
        if not filename or filename in ('.', '..'):
            raise ValueError("Invalid filename")

        # Decode file data
        file_content = base64.b64decode(file_data['content'])
        file_size = len(file_content)

        # Validate file
        max_file_size = 10 * 1024 * 1024  # 10MB limit
        if file_size > max_file_size:
            error_message = ChatMessage(
                username="System",
                content="File too large. Maximum size is 10MB.",
                message_type="system"
            )
            client_socket.sendall(json.dumps(error_message.to_dict()).encode('utf-8'))
            return

        # Save file temporarily
        upload_dir = "uploads"
        os.makedirs(upload_dir, exist_ok=True)
        safe_filename = f"{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}_{filename}"
        file_path = os.path.join(upload_dir, safe_filename)
        with open(file_path, 'wb') as f:
            f.write(file_content)

        # Get file type
        mime_type, _ = mimetypes.guess_type(filename)
        file_type = 'image' if mime_type and mime_type.startswith('image/') else 'file'

        # Create file share message
        file_message = ChatMessage(
            username=client_info['username'],
            content=f"shared a file: {filename} ({file_size} bytes)",
            message_type="file_share"
        )
        # Add file information to message
        file_message.file_info = {
            'filename': filename,
            'size': file_size,
            'type': file_type,
            'path': file_path,
            'mime_type': mime_type
        }

        # Broadcast to room
        current_room = client_info.get('room', self.default_room)
        self.broadcast_message(file_message, current_room)
    except Exception as e:
        # Best effort: tell the uploader what went wrong (bad base64, missing
        # keys, disk errors, ...) instead of dropping the request silently.
        error_message = ChatMessage(
            username="System",
            content=f"File share failed: {str(e)}",
            message_type="system"
        )
        client_socket.sendall(json.dumps(error_message.to_dict()).encode('utf-8'))
def handle_file_share(self, client_socket: socket.socket, file_data: Dict):
    """Store a file uploaded by a client and announce it to the client's room.

    The upload is base64-decoded, limited to 10MB, written below ``uploads/``
    under a timestamp-prefixed name and then broadcast as a ``file_share``
    message. Any failure is reported back to the sender as a system message.

    Args:
        client_socket: Socket of the uploading client. Sockets that are not
            registered in ``self.clients`` are ignored.
        file_data: Mapping with at least ``filename`` and ``content``
            (base64 text).
    """
    import base64
    import mimetypes
    import os  # used below but not among the imports at the top of this file

    client_info = self.clients.get(client_socket)
    if not client_info:
        return

    try:
        # The name comes from the client: keep only its final path component
        # (for both separator styles) so it cannot escape the upload folder.
        filename = os.path.basename(str(file_data['filename']).replace('\\', '/'))
        if not filename or filename in ('.', '..'):
            raise ValueError("Invalid filename")

        # Decode file data
        file_content = base64.b64decode(file_data['content'])
        file_size = len(file_content)

        # Validate file
        max_file_size = 10 * 1024 * 1024  # 10MB limit
        if file_size > max_file_size:
            error_message = ChatMessage(
                username="System",
                content="File too large. Maximum size is 10MB.",
                message_type="system"
            )
            client_socket.sendall(json.dumps(error_message.to_dict()).encode('utf-8'))
            return

        # Save file temporarily
        upload_dir = "uploads"
        os.makedirs(upload_dir, exist_ok=True)
        safe_filename = f"{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}_{filename}"
        file_path = os.path.join(upload_dir, safe_filename)
        with open(file_path, 'wb') as f:
            f.write(file_content)

        # Get file type
        mime_type, _ = mimetypes.guess_type(filename)
        file_type = 'image' if mime_type and mime_type.startswith('image/') else 'file'

        # Create file share message
        file_message = ChatMessage(
            username=client_info['username'],
            content=f"shared a file: {filename} ({file_size} bytes)",
            message_type="file_share"
        )
        # Add file information to message
        file_message.file_info = {
            'filename': filename,
            'size': file_size,
            'type': file_type,
            'path': file_path,
            'mime_type': mime_type
        }

        # Broadcast to room
        current_room = client_info.get('room', self.default_room)
        self.broadcast_message(file_message, current_room)
    except Exception as e:
        # Best effort: tell the uploader what went wrong (bad base64, missing
        # keys, disk errors, ...) instead of dropping the request silently.
        error_message = ChatMessage(
            username="System",
            content=f"File share failed: {str(e)}",
            message_type="system"
        )
        client_socket.sendall(json.dumps(error_message.to_dict()).encode('utf-8'))
5. Message History and Persistence
def save_message_to_database(self, message: "ChatMessage", room: str):
    """Persist one chat message to the SQLite file ``chat_history.db``.

    The ``messages`` table is created on first use. A message whose
    ``message_id`` is already stored is skipped (``INSERT OR IGNORE``).

    Args:
        message: Message to store; its ``username``, ``content``,
            ``message_type``, ``timestamp`` and ``id`` attributes are read.
        room: Name of the room the message was posted in.
    """
    import sqlite3
    from contextlib import closing

    db_path = "chat_history.db"
    # sqlite3's own context manager only commits or rolls back the
    # transaction; closing() is what actually releases the connection.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:
            # Initialize database if needed
            conn.execute('''
                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT NOT NULL,
                    content TEXT NOT NULL,
                    message_type TEXT NOT NULL,
                    room TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    message_id TEXT UNIQUE
                )
            ''')
            # Insert message
            conn.execute('''
                INSERT OR IGNORE INTO messages
                (username, content, message_type, room, timestamp, message_id)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (message.username, message.content, message.message_type,
                  room, message.timestamp.isoformat(), message.id))
def get_message_history(self, room: str, limit: int = 50) -> "List[ChatMessage]":
    """Load the most recent stored messages of a room, oldest first.

    Args:
        room: Room whose history is requested.
        limit: Maximum number of messages to return.

    Returns:
        Up to ``limit`` messages in chronological order, or an empty list
        when the history cannot be read (the error is logged).
    """
    import sqlite3
    from contextlib import closing

    db_path = "chat_history.db"
    messages = []
    try:
        # closing(): the sqlite3 connection context manager does not close
        # the connection by itself.
        with closing(sqlite3.connect(db_path)) as conn:
            # "id DESC" breaks ties between identical timestamps so the
            # result order is deterministic.
            cursor = conn.execute('''
                SELECT username, content, message_type, timestamp, message_id
                FROM messages
                WHERE room = ?
                ORDER BY timestamp DESC, id DESC
                LIMIT ?
            ''', (room, limit))
            rows = cursor.fetchall()

        for row in rows:
            message = ChatMessage(
                username=row[0],
                content=row[1],
                message_type=row[2],
                timestamp=datetime.datetime.fromisoformat(row[3])
            )
            message.id = row[4]
            messages.append(message)

        # The query returns newest first; callers get chronological order.
        messages.reverse()
        return messages
    except Exception as e:
        logger.error(f"Error retrieving message history: {e}")
        return []
def send_message_history(self, client_socket: socket.socket, room: str):
    """Replay the last 20 stored messages of ``room`` to one client.

    Replay is best effort: the first message that cannot be serialized or
    sent ends it, on the assumption that the client has gone away.

    Args:
        client_socket: Socket of the newly joined client.
        room: Room whose history is replayed.
    """
    history = self.get_message_history(room, limit=20)
    for message in history:
        try:
            payload = json.dumps(message.to_dict()).encode('utf-8')
            # sendall(): send() may write only part of the payload.
            client_socket.sendall(payload)
        except Exception:
            # Was a bare "except:", which also swallowed KeyboardInterrupt
            # and SystemExit.
            break
        # Messages carry no delimiter, so the pause keeps consecutive JSON
        # documents from arriving glued together on the client side.
        time.sleep(0.1)
def save_message_to_database(self, message: "ChatMessage", room: str):
    """Persist one chat message to the SQLite file ``chat_history.db``.

    The ``messages`` table is created on first use. A message whose
    ``message_id`` is already stored is skipped (``INSERT OR IGNORE``).

    Args:
        message: Message to store; its ``username``, ``content``,
            ``message_type``, ``timestamp`` and ``id`` attributes are read.
        room: Name of the room the message was posted in.
    """
    import sqlite3
    from contextlib import closing

    db_path = "chat_history.db"
    # sqlite3's own context manager only commits or rolls back the
    # transaction; closing() is what actually releases the connection.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:
            # Initialize database if needed
            conn.execute('''
                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT NOT NULL,
                    content TEXT NOT NULL,
                    message_type TEXT NOT NULL,
                    room TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    message_id TEXT UNIQUE
                )
            ''')
            # Insert message
            conn.execute('''
                INSERT OR IGNORE INTO messages
                (username, content, message_type, room, timestamp, message_id)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (message.username, message.content, message.message_type,
                  room, message.timestamp.isoformat(), message.id))
def get_message_history(self, room: str, limit: int = 50) -> "List[ChatMessage]":
    """Load the most recent stored messages of a room, oldest first.

    Args:
        room: Room whose history is requested.
        limit: Maximum number of messages to return.

    Returns:
        Up to ``limit`` messages in chronological order, or an empty list
        when the history cannot be read (the error is logged).
    """
    import sqlite3
    from contextlib import closing

    db_path = "chat_history.db"
    messages = []
    try:
        # closing(): the sqlite3 connection context manager does not close
        # the connection by itself.
        with closing(sqlite3.connect(db_path)) as conn:
            # "id DESC" breaks ties between identical timestamps so the
            # result order is deterministic.
            cursor = conn.execute('''
                SELECT username, content, message_type, timestamp, message_id
                FROM messages
                WHERE room = ?
                ORDER BY timestamp DESC, id DESC
                LIMIT ?
            ''', (room, limit))
            rows = cursor.fetchall()

        for row in rows:
            message = ChatMessage(
                username=row[0],
                content=row[1],
                message_type=row[2],
                timestamp=datetime.datetime.fromisoformat(row[3])
            )
            message.id = row[4]
            messages.append(message)

        # The query returns newest first; callers get chronological order.
        messages.reverse()
        return messages
    except Exception as e:
        logger.error(f"Error retrieving message history: {e}")
        return []
def send_message_history(self, client_socket: socket.socket, room: str):
    """Replay the last 20 stored messages of ``room`` to one client.

    Replay is best effort: the first message that cannot be serialized or
    sent ends it, on the assumption that the client has gone away.

    Args:
        client_socket: Socket of the newly joined client.
        room: Room whose history is replayed.
    """
    history = self.get_message_history(room, limit=20)
    for message in history:
        try:
            payload = json.dumps(message.to_dict()).encode('utf-8')
            # sendall(): send() may write only part of the payload.
            client_socket.sendall(payload)
        except Exception:
            # Was a bare "except:", which also swallowed KeyboardInterrupt
            # and SystemExit.
            break
        # Messages carry no delimiter, so the pause keeps consecutive JSON
        # documents from arriving glued together on the client side.
        time.sleep(0.1)
Enhanced GUI Features
1. Modern Chat Interface
def setup_modern_ui(self):
    """Build the window layout: a sidebar and a chat pane in a resizable split."""
    fill_all = {'fill': tk.BOTH, 'expand': True}

    # Outer container holding everything
    self.main_frame = ttk.Frame(self.root)
    self.main_frame.pack(padx=10, pady=10, **fill_all)

    # Horizontal split the user can drag to resize
    self.paned_window = ttk.PanedWindow(self.main_frame, orient=tk.HORIZONTAL)
    self.paned_window.pack(**fill_all)

    # Left pane: rooms and users (narrower share of the width)
    self.sidebar_frame = ttk.Frame(self.paned_window, width=250)
    self.paned_window.add(self.sidebar_frame, weight=1)

    # Right pane: the conversation itself
    self.chat_frame = ttk.Frame(self.paned_window)
    self.paned_window.add(self.chat_frame, weight=3)

    # Populate the panes; the input area is packed below the chat area
    for build in (self.setup_sidebar, self.setup_chat_area, self.setup_input_area):
        build()
def setup_sidebar(self):
    """Fill the sidebar with the room list, room buttons and the user list."""
    sidebar = self.sidebar_frame
    heading_font = ('Arial', 12, 'bold')

    # Rooms: heading plus a listbox that reacts to double clicks
    ttk.Label(sidebar, text="Rooms", font=heading_font).pack(pady=5)
    self.room_listbox = tk.Listbox(sidebar, height=8)
    self.room_listbox.pack(fill=tk.X, padx=5, pady=5)
    self.room_listbox.bind('<Double-Button-1>', self.on_room_select)

    # Join / create buttons side by side
    room_controls = ttk.Frame(sidebar)
    room_controls.pack(fill=tk.X, padx=5)
    room_actions = (
        ("Join Room", self.join_room_dialog),
        ("Create Room", self.create_room_dialog),
    )
    for caption, handler in room_actions:
        ttk.Button(room_controls, text=caption, command=handler).pack(side=tk.LEFT, padx=2)

    # Users: separator, heading and a listbox taking the remaining height
    ttk.Separator(sidebar, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=10)
    ttk.Label(sidebar, text="Online Users", font=heading_font).pack(pady=5)
    self.user_listbox = tk.Listbox(sidebar, height=10)
    self.user_listbox.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    self.user_listbox.bind('<Double-Button-1>', self.on_user_select)
def setup_chat_area(self):
    """Create the chat header and the read-only, scrollable message view."""
    # Header: room name on the left, user count on the right
    self.chat_header = ttk.Frame(self.chat_frame)
    self.chat_header.pack(fill=tk.X, pady=(0, 5))
    self.room_label = ttk.Label(self.chat_header, text="General", font=('Arial', 14, 'bold'))
    self.room_label.pack(side=tk.LEFT)
    self.user_count_label = ttk.Label(self.chat_header, text="0 users")
    self.user_count_label.pack(side=tk.RIGHT)

    # Message view inside its own container
    chat_container = ttk.Frame(self.chat_frame)
    chat_container.pack(fill=tk.BOTH, expand=True)
    display_options = {
        'state': tk.DISABLED,
        'wrap': tk.WORD,
        'font': ('Consolas', 10),
        'bg': '#f8f9fa',
        'fg': '#333333',
    }
    self.chat_display = scrolledtext.ScrolledText(chat_container, **display_options)
    self.chat_display.pack(fill=tk.BOTH, expand=True)

    # Text tags used to style the parts of a message
    tag_styles = (
        ('username', '#007bff', ('Consolas', 10, 'bold')),
        ('timestamp', '#6c757d', ('Consolas', 8)),
        ('private', '#dc3545', ('Consolas', 10, 'italic')),
        ('system', '#28a745', ('Consolas', 10, 'italic')),
    )
    for tag_name, colour, tag_font in tag_styles:
        self.chat_display.tag_configure(tag_name, foreground=colour, font=tag_font)
def setup_input_area(self):
    """Create the status line, the message entry and the action buttons."""
    input_frame = ttk.Frame(self.chat_frame)
    input_frame.pack(fill=tk.X, pady=(5, 0))

    # Status line: typing notice on the left, connection dot on the right
    self.status_frame = ttk.Frame(input_frame)
    self.status_frame.pack(fill=tk.X, pady=(0, 5))
    self.typing_label = ttk.Label(self.status_frame, text="", foreground='#6c757d')
    self.typing_label.pack(side=tk.LEFT)
    self.connection_status = ttk.Label(self.status_frame, text="●", foreground='red')
    self.connection_status.pack(side=tk.RIGHT)

    # Multi-line entry; Return sends, any key press feeds the typing indicator
    message_frame = ttk.Frame(input_frame)
    message_frame.pack(fill=tk.X)
    self.message_entry = tk.Text(message_frame, height=3, wrap=tk.WORD)
    self.message_entry.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))
    key_bindings = (
        ('<Return>', self.send_message_event),
        ('<KeyPress>', self.on_typing),
    )
    for sequence, handler in key_bindings:
        self.message_entry.bind(sequence, handler)

    # Buttons stacked to the right of the entry
    button_frame = ttk.Frame(message_frame)
    button_frame.pack(side=tk.RIGHT, fill=tk.Y)
    button_specs = (
        ("📎", self.attach_file, {'width': 3}),
        ("😊", self.show_emoji_picker, {'width': 3}),
        ("Send", self.send_message, {}),
    )
    for caption, handler, extra in button_specs:
        ttk.Button(button_frame, text=caption, command=handler, **extra).pack(pady=1)
def setup_modern_ui(self):
    """Build the window layout: a sidebar and a chat pane in a resizable split."""
    fill_all = {'fill': tk.BOTH, 'expand': True}

    # Outer container holding everything
    self.main_frame = ttk.Frame(self.root)
    self.main_frame.pack(padx=10, pady=10, **fill_all)

    # Horizontal split the user can drag to resize
    self.paned_window = ttk.PanedWindow(self.main_frame, orient=tk.HORIZONTAL)
    self.paned_window.pack(**fill_all)

    # Left pane: rooms and users (narrower share of the width)
    self.sidebar_frame = ttk.Frame(self.paned_window, width=250)
    self.paned_window.add(self.sidebar_frame, weight=1)

    # Right pane: the conversation itself
    self.chat_frame = ttk.Frame(self.paned_window)
    self.paned_window.add(self.chat_frame, weight=3)

    # Populate the panes; the input area is packed below the chat area
    for build in (self.setup_sidebar, self.setup_chat_area, self.setup_input_area):
        build()
def setup_sidebar(self):
    """Fill the sidebar with the room list, room buttons and the user list."""
    sidebar = self.sidebar_frame
    heading_font = ('Arial', 12, 'bold')

    # Rooms: heading plus a listbox that reacts to double clicks
    ttk.Label(sidebar, text="Rooms", font=heading_font).pack(pady=5)
    self.room_listbox = tk.Listbox(sidebar, height=8)
    self.room_listbox.pack(fill=tk.X, padx=5, pady=5)
    self.room_listbox.bind('<Double-Button-1>', self.on_room_select)

    # Join / create buttons side by side
    room_controls = ttk.Frame(sidebar)
    room_controls.pack(fill=tk.X, padx=5)
    room_actions = (
        ("Join Room", self.join_room_dialog),
        ("Create Room", self.create_room_dialog),
    )
    for caption, handler in room_actions:
        ttk.Button(room_controls, text=caption, command=handler).pack(side=tk.LEFT, padx=2)

    # Users: separator, heading and a listbox taking the remaining height
    ttk.Separator(sidebar, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=10)
    ttk.Label(sidebar, text="Online Users", font=heading_font).pack(pady=5)
    self.user_listbox = tk.Listbox(sidebar, height=10)
    self.user_listbox.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    self.user_listbox.bind('<Double-Button-1>', self.on_user_select)
def setup_chat_area(self):
    """Create the chat header and the read-only, scrollable message view."""
    # Header: room name on the left, user count on the right
    self.chat_header = ttk.Frame(self.chat_frame)
    self.chat_header.pack(fill=tk.X, pady=(0, 5))
    self.room_label = ttk.Label(self.chat_header, text="General", font=('Arial', 14, 'bold'))
    self.room_label.pack(side=tk.LEFT)
    self.user_count_label = ttk.Label(self.chat_header, text="0 users")
    self.user_count_label.pack(side=tk.RIGHT)

    # Message view inside its own container
    chat_container = ttk.Frame(self.chat_frame)
    chat_container.pack(fill=tk.BOTH, expand=True)
    display_options = {
        'state': tk.DISABLED,
        'wrap': tk.WORD,
        'font': ('Consolas', 10),
        'bg': '#f8f9fa',
        'fg': '#333333',
    }
    self.chat_display = scrolledtext.ScrolledText(chat_container, **display_options)
    self.chat_display.pack(fill=tk.BOTH, expand=True)

    # Text tags used to style the parts of a message
    tag_styles = (
        ('username', '#007bff', ('Consolas', 10, 'bold')),
        ('timestamp', '#6c757d', ('Consolas', 8)),
        ('private', '#dc3545', ('Consolas', 10, 'italic')),
        ('system', '#28a745', ('Consolas', 10, 'italic')),
    )
    for tag_name, colour, tag_font in tag_styles:
        self.chat_display.tag_configure(tag_name, foreground=colour, font=tag_font)
def setup_input_area(self):
    """Create the status line, the message entry and the action buttons."""
    input_frame = ttk.Frame(self.chat_frame)
    input_frame.pack(fill=tk.X, pady=(5, 0))

    # Status line: typing notice on the left, connection dot on the right
    self.status_frame = ttk.Frame(input_frame)
    self.status_frame.pack(fill=tk.X, pady=(0, 5))
    self.typing_label = ttk.Label(self.status_frame, text="", foreground='#6c757d')
    self.typing_label.pack(side=tk.LEFT)
    self.connection_status = ttk.Label(self.status_frame, text="●", foreground='red')
    self.connection_status.pack(side=tk.RIGHT)

    # Multi-line entry; Return sends, any key press feeds the typing indicator
    message_frame = ttk.Frame(input_frame)
    message_frame.pack(fill=tk.X)
    self.message_entry = tk.Text(message_frame, height=3, wrap=tk.WORD)
    self.message_entry.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))
    key_bindings = (
        ('<Return>', self.send_message_event),
        ('<KeyPress>', self.on_typing),
    )
    for sequence, handler in key_bindings:
        self.message_entry.bind(sequence, handler)

    # Buttons stacked to the right of the entry
    button_frame = ttk.Frame(message_frame)
    button_frame.pack(side=tk.RIGHT, fill=tk.Y)
    button_specs = (
        ("📎", self.attach_file, {'width': 3}),
        ("😊", self.show_emoji_picker, {'width': 3}),
        ("Send", self.send_message, {}),
    )
    for caption, handler, extra in button_specs:
        ttk.Button(button_frame, text=caption, command=handler, **extra).pack(pady=1)
2. Emoji and File Support
def show_emoji_picker(self):
    """Open a small fixed-size window offering a grid of clickable emojis."""
    picker = tk.Toplevel(self.root)
    picker.title("Select Emoji")
    picker.geometry("300x200")
    picker.resizable(False, False)

    # Common emojis, laid out eight per row
    choices = [
        '😀', '😂', '😊', '😍', '😢', '😡', '👍', '👎',
        '❤️', '💔', '🔥', '💯', '✨', '🎉', '👀', '🤔',
        '😎', '🤣', '😇', '😈', '🤗', '🙄', '😴', '🤐'
    ]

    for index, symbol in enumerate(choices):
        grid_row, grid_col = divmod(index, 8)
        button = tk.Button(
            picker,
            text=symbol,
            font=('Arial', 16),
            # Bind the current symbol as a default argument; a plain closure
            # would see only the last value of the loop variable.
            command=lambda chosen=symbol: self.insert_emoji(chosen, picker),
            relief=tk.FLAT,
            bd=1
        )
        button.grid(row=grid_row, column=grid_col, padx=2, pady=2, sticky='nsew')
def insert_emoji(self, emoji: str, window):
    """Put ``emoji`` at the cursor of the message entry and close the picker.

    Args:
        emoji: Text to insert.
        window: Picker window to destroy once the emoji is inserted.
    """
    entry = self.message_entry
    entry.insert(tk.INSERT, emoji)
    window.destroy()
    # Hand keyboard focus back so the user can keep typing
    entry.focus()
def attach_file(self):
    """Ask the user for a file and pass the chosen path to ``send_file``."""
    from tkinter import filedialog

    selectable_types = [
        ("Images", "*.png *.jpg *.jpeg *.gif *.bmp"),
        ("Documents", "*.pdf *.doc *.docx *.txt"),
        ("All Files", "*.*")
    ]
    file_path = filedialog.askopenfilename(
        title="Select File to Share",
        filetypes=selectable_types
    )
    # A falsy result means nothing was chosen
    if not file_path:
        return
    self.send_file(file_path)
def send_file(self, file_path: str):
    """Read a local file and send it to the server as a ``file_share`` command.

    Files larger than 10MB are refused with an error dialog. Any other
    failure (unreadable file, send error, ...) is shown in an error dialog
    as well.

    Args:
        file_path: Path of the file to share.
    """
    # Local imports: ``os`` is used here but is not among the imports at the
    # top of this file, so every call used to end in the generic error dialog.
    import base64
    import os

    max_size = 10 * 1024 * 1024  # 10MB
    try:
        file_size = os.path.getsize(file_path)
        if file_size > max_size:
            messagebox.showerror("File Too Large", "File must be smaller than 10MB")
            return

        filename = os.path.basename(file_path)

        # Read and encode file
        with open(file_path, 'rb') as f:
            file_content = f.read()
        encoded_content = base64.b64encode(file_content).decode('utf-8')

        # Send file data
        file_data = {
            'type': 'file_share',
            'filename': filename,
            'content': encoded_content,
            'size': file_size
        }
        if self.client:
            self.client.send_command(json.dumps(file_data))
            # Show file in chat, but only when something was actually sent
            self.display_message({
                'username': self.username,
                'content': f"📎 Uploading {filename}...",
                'type': 'file_share',
                'timestamp': datetime.datetime.now().isoformat()
            })
    except Exception as e:
        messagebox.showerror("File Error", f"Failed to send file: {str(e)}")
def show_emoji_picker(self):
    """Open a small fixed-size window offering a grid of clickable emojis."""
    picker = tk.Toplevel(self.root)
    picker.title("Select Emoji")
    picker.geometry("300x200")
    picker.resizable(False, False)

    # Common emojis, laid out eight per row
    choices = [
        '😀', '😂', '😊', '😍', '😢', '😡', '👍', '👎',
        '❤️', '💔', '🔥', '💯', '✨', '🎉', '👀', '🤔',
        '😎', '🤣', '😇', '😈', '🤗', '🙄', '😴', '🤐'
    ]

    for index, symbol in enumerate(choices):
        grid_row, grid_col = divmod(index, 8)
        button = tk.Button(
            picker,
            text=symbol,
            font=('Arial', 16),
            # Bind the current symbol as a default argument; a plain closure
            # would see only the last value of the loop variable.
            command=lambda chosen=symbol: self.insert_emoji(chosen, picker),
            relief=tk.FLAT,
            bd=1
        )
        button.grid(row=grid_row, column=grid_col, padx=2, pady=2, sticky='nsew')
def insert_emoji(self, emoji: str, window):
    """Put ``emoji`` at the cursor of the message entry and close the picker.

    Args:
        emoji: Text to insert.
        window: Picker window to destroy once the emoji is inserted.
    """
    entry = self.message_entry
    entry.insert(tk.INSERT, emoji)
    window.destroy()
    # Hand keyboard focus back so the user can keep typing
    entry.focus()
def attach_file(self):
    """Ask the user for a file and pass the chosen path to ``send_file``."""
    from tkinter import filedialog

    selectable_types = [
        ("Images", "*.png *.jpg *.jpeg *.gif *.bmp"),
        ("Documents", "*.pdf *.doc *.docx *.txt"),
        ("All Files", "*.*")
    ]
    file_path = filedialog.askopenfilename(
        title="Select File to Share",
        filetypes=selectable_types
    )
    # A falsy result means nothing was chosen
    if not file_path:
        return
    self.send_file(file_path)
def send_file(self, file_path: str):
    """Read a local file and send it to the server as a ``file_share`` command.

    Files larger than 10MB are refused with an error dialog. Any other
    failure (unreadable file, send error, ...) is shown in an error dialog
    as well.

    Args:
        file_path: Path of the file to share.
    """
    # Local imports: ``os`` is used here but is not among the imports at the
    # top of this file, so every call used to end in the generic error dialog.
    import base64
    import os

    max_size = 10 * 1024 * 1024  # 10MB
    try:
        file_size = os.path.getsize(file_path)
        if file_size > max_size:
            messagebox.showerror("File Too Large", "File must be smaller than 10MB")
            return

        filename = os.path.basename(file_path)

        # Read and encode file
        with open(file_path, 'rb') as f:
            file_content = f.read()
        encoded_content = base64.b64encode(file_content).decode('utf-8')

        # Send file data
        file_data = {
            'type': 'file_share',
            'filename': filename,
            'content': encoded_content,
            'size': file_size
        }
        if self.client:
            self.client.send_command(json.dumps(file_data))
            # Show file in chat, but only when something was actually sent
            self.display_message({
                'username': self.username,
                'content': f"📎 Uploading {filename}...",
                'type': 'file_share',
                'timestamp': datetime.datetime.now().isoformat()
            })
    except Exception as e:
        messagebox.showerror("File Error", f"Failed to send file: {str(e)}")
3. Typing Indicators
def on_typing(self, event):
    """Key-press handler: announce typing (throttled) and arm the stop timer.

    A ``typing: True`` indicator is sent at most once every two seconds.
    Every key press restarts a three-second timer that calls
    ``stop_typing_indicator``.

    Args:
        event: Tk key event (unused).
    """
    if not self.connected:
        return

    now = time.time()
    last_sent = getattr(self, 'last_typing_time', 0)
    # Throttle: only announce when the previous announcement is older than 2s
    if now - last_sent > 2:
        self.last_typing_time = now
        typing_data = {
            'type': 'typing_indicator',
            'username': self.username,
            'typing': True
        }
        if self.client:
            self.client.send_command(json.dumps(typing_data))

    # Restart the countdown to the "stopped typing" notification
    try:
        pending_timer = self.typing_timer
    except AttributeError:
        pass  # first key press, nothing scheduled yet
    else:
        self.root.after_cancel(pending_timer)
    self.typing_timer = self.root.after(3000, self.stop_typing_indicator)
def stop_typing_indicator(self):
    """Tell the server that this user is no longer typing (if connected)."""
    if not (self.connected and self.client):
        return
    payload = json.dumps({
        'type': 'typing_indicator',
        'username': self.username,
        'typing': False
    })
    self.client.send_command(payload)
def handle_typing_indicator(self, data):
    """Update the "... is typing" label from an incoming typing indicator.

    Args:
        data: Mapping with ``username`` and ``typing`` (missing ``typing``
            is treated as ``False``). Indicators for ``self.username`` are
            ignored.
    """
    username = data.get('username')
    if username == self.username:
        return  # Ignore own typing indicators

    if not hasattr(self, 'typing_users'):
        self.typing_users = set()

    if data.get('typing', False):
        self.typing_users.add(username)
    else:
        self.typing_users.discard(username)

    # Rebuild the label from the current set on every event. Previously it
    # was only rewritten on "start typing", so when one of several typists
    # stopped the label kept naming them. Sorting makes the order stable.
    typists = sorted(str(name) for name in self.typing_users)
    if not typists:
        typing_text = ""
    elif len(typists) == 1:
        typing_text = f"{typists[0]} is typing..."
    elif len(typists) == 2:
        typing_text = f"{', '.join(typists)} are typing..."
    else:
        typing_text = f"{len(typists)} people are typing..."
    self.typing_label.config(text=typing_text)
def on_typing(self, event):
    """Key-press handler: announce typing (throttled) and arm the stop timer.

    A ``typing: True`` indicator is sent at most once every two seconds.
    Every key press restarts a three-second timer that calls
    ``stop_typing_indicator``.

    Args:
        event: Tk key event (unused).
    """
    if not self.connected:
        return

    now = time.time()
    last_sent = getattr(self, 'last_typing_time', 0)
    # Throttle: only announce when the previous announcement is older than 2s
    if now - last_sent > 2:
        self.last_typing_time = now
        typing_data = {
            'type': 'typing_indicator',
            'username': self.username,
            'typing': True
        }
        if self.client:
            self.client.send_command(json.dumps(typing_data))

    # Restart the countdown to the "stopped typing" notification
    try:
        pending_timer = self.typing_timer
    except AttributeError:
        pass  # first key press, nothing scheduled yet
    else:
        self.root.after_cancel(pending_timer)
    self.typing_timer = self.root.after(3000, self.stop_typing_indicator)
def stop_typing_indicator(self):
    """Tell the server that this user is no longer typing (if connected)."""
    if not (self.connected and self.client):
        return
    payload = json.dumps({
        'type': 'typing_indicator',
        'username': self.username,
        'typing': False
    })
    self.client.send_command(payload)
def handle_typing_indicator(self, data):
    """Update the "... is typing" label from an incoming typing indicator.

    Args:
        data: Mapping with ``username`` and ``typing`` (missing ``typing``
            is treated as ``False``). Indicators for ``self.username`` are
            ignored.
    """
    username = data.get('username')
    if username == self.username:
        return  # Ignore own typing indicators

    if not hasattr(self, 'typing_users'):
        self.typing_users = set()

    if data.get('typing', False):
        self.typing_users.add(username)
    else:
        self.typing_users.discard(username)

    # Rebuild the label from the current set on every event. Previously it
    # was only rewritten on "start typing", so when one of several typists
    # stopped the label kept naming them. Sorting makes the order stable.
    typists = sorted(str(name) for name in self.typing_users)
    if not typists:
        typing_text = ""
    elif len(typists) == 1:
        typing_text = f"{typists[0]} is typing..."
    elif len(typists) == 2:
        typing_text = f"{', '.join(typists)} are typing..."
    else:
        typing_text = f"{len(typists)} people are typing..."
    self.typing_label.config(text=typing_text)
Performance and Scalability
1. Connection Pool Management
class ConnectionPool:
    """Lock-protected registry of client connections with a fixed capacity.

    ``active_connections`` maps a generated connection id to a record with
    the keys ``socket``, ``info``, ``connected_at`` and ``last_activity``.
    ``connection_count`` always equals the number of records.
    """

    def __init__(self, max_connections: int = 100):
        self.max_connections = max_connections
        self.active_connections = {}
        self.connection_count = 0
        self.lock = threading.Lock()
        # Ids come from a counter that never goes down. Deriving them from
        # connection_count reused an id after a removal and silently
        # overwrote a connection that was still active.
        self._next_id = 1

    def add_connection(self, client_socket: socket.socket, client_info: Dict) -> bool:
        """Register a connection; return ``False`` when the pool is full."""
        with self.lock:
            if self.connection_count >= self.max_connections:
                return False

            connection_id = f"conn_{self._next_id}"
            self._next_id += 1
            now = datetime.datetime.now()
            self.active_connections[connection_id] = {
                'socket': client_socket,
                'info': client_info,
                'connected_at': now,
                'last_activity': now
            }
            self.connection_count += 1
            return True

    def remove_connection(self, client_socket: socket.socket):
        """Forget the record holding ``client_socket``; unknown sockets are ignored."""
        with self.lock:
            for conn_id, conn_data in list(self.active_connections.items()):
                if conn_data['socket'] == client_socket:
                    del self.active_connections[conn_id]
                    self.connection_count -= 1
                    break

    def cleanup_inactive_connections(self, timeout_minutes: int = 30):
        """Close and drop every connection idle for more than ``timeout_minutes``."""
        with self.lock:
            current_time = datetime.datetime.now()
            timeout_seconds = timeout_minutes * 60
            inactive_connections = [
                conn_id
                for conn_id, conn_data in self.active_connections.items()
                if (current_time - conn_data['last_activity']).total_seconds() > timeout_seconds
            ]

            for conn_id in inactive_connections:
                # Drop the record first so a failing close() cannot leave a
                # dead entry (and a wrong count) behind.
                conn_data = self.active_connections.pop(conn_id)
                self.connection_count -= 1
                try:
                    conn_data['socket'].close()
                except OSError:
                    pass  # already closed or broken; nothing left to release
class ConnectionPool:
    """Lock-protected registry of client connections with a fixed capacity.

    ``active_connections`` maps a generated connection id to a record with
    the keys ``socket``, ``info``, ``connected_at`` and ``last_activity``.
    ``connection_count`` always equals the number of records.
    """

    def __init__(self, max_connections: int = 100):
        self.max_connections = max_connections
        self.active_connections = {}
        self.connection_count = 0
        self.lock = threading.Lock()
        # Ids come from a counter that never goes down. Deriving them from
        # connection_count reused an id after a removal and silently
        # overwrote a connection that was still active.
        self._next_id = 1

    def add_connection(self, client_socket: socket.socket, client_info: Dict) -> bool:
        """Register a connection; return ``False`` when the pool is full."""
        with self.lock:
            if self.connection_count >= self.max_connections:
                return False

            connection_id = f"conn_{self._next_id}"
            self._next_id += 1
            now = datetime.datetime.now()
            self.active_connections[connection_id] = {
                'socket': client_socket,
                'info': client_info,
                'connected_at': now,
                'last_activity': now
            }
            self.connection_count += 1
            return True

    def remove_connection(self, client_socket: socket.socket):
        """Forget the record holding ``client_socket``; unknown sockets are ignored."""
        with self.lock:
            for conn_id, conn_data in list(self.active_connections.items()):
                if conn_data['socket'] == client_socket:
                    del self.active_connections[conn_id]
                    self.connection_count -= 1
                    break

    def cleanup_inactive_connections(self, timeout_minutes: int = 30):
        """Close and drop every connection idle for more than ``timeout_minutes``."""
        with self.lock:
            current_time = datetime.datetime.now()
            timeout_seconds = timeout_minutes * 60
            inactive_connections = [
                conn_id
                for conn_id, conn_data in self.active_connections.items()
                if (current_time - conn_data['last_activity']).total_seconds() > timeout_seconds
            ]

            for conn_id in inactive_connections:
                # Drop the record first so a failing close() cannot leave a
                # dead entry (and a wrong count) behind.
                conn_data = self.active_connections.pop(conn_id)
                self.connection_count -= 1
                try:
                    conn_data['socket'].close()
                except OSError:
                    pass  # already closed or broken; nothing left to release
2. Message Queuing System
import queue
import threading
class MessageQueue:
    """Bounded queue whose worker thread delivers messages to recipient sockets."""

    def __init__(self, max_size: int = 1000):
        self.message_queue = queue.Queue(maxsize=max_size)
        self.processing_thread = None
        self.running = False

    def start_processing(self):
        """Start the daemon worker thread that drains the queue."""
        self.running = True
        self.processing_thread = threading.Thread(target=self._process_messages, daemon=True)
        self.processing_thread.start()

    def stop_processing(self):
        """Ask the worker to stop and wait up to five seconds for it."""
        self.running = False
        if self.processing_thread:
            self.processing_thread.join(timeout=5)

    def add_message(self, message: "ChatMessage", recipients: List[socket.socket]):
        """Queue ``message`` for delivery to ``recipients``.

        Waits up to one second for free space; if the queue is still full the
        message is dropped and a warning is logged. Note that the worker
        removes sockets it could not send to from the ``recipients`` list
        passed in here.
        """
        try:
            self.message_queue.put({
                'message': message,
                'recipients': recipients,
                'timestamp': datetime.datetime.now()
            }, timeout=1)
        except queue.Full:
            logger.warning("Message queue is full, dropping message")

    def _process_messages(self):
        """Worker loop: deliver queued messages until ``running`` is cleared."""
        while self.running:
            try:
                # The timeout lets the loop re-check ``running`` regularly.
                queue_item = self.message_queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                recipients = queue_item['recipients']
                # Serialize once, not once per recipient.
                payload = json.dumps(queue_item['message'].to_dict()).encode('utf-8')

                failed_recipients = []
                for recipient_socket in recipients:
                    try:
                        # sendall(): send() may write only part of the payload.
                        recipient_socket.sendall(payload)
                    except Exception:
                        failed_recipients.append(recipient_socket)

                # Remove failed recipients
                for failed_socket in failed_recipients:
                    recipients.remove(failed_socket)
            except Exception as e:
                logger.error(f"Error processing message: {e}")
            finally:
                # Always balance the get(): otherwise one bad item would make
                # message_queue.join() block forever.
                self.message_queue.task_done()
import queue
import threading
class MessageQueue:
    """Bounded queue whose worker thread delivers messages to recipient sockets."""

    def __init__(self, max_size: int = 1000):
        self.message_queue = queue.Queue(maxsize=max_size)
        self.processing_thread = None
        self.running = False

    def start_processing(self):
        """Start the daemon worker thread that drains the queue."""
        self.running = True
        self.processing_thread = threading.Thread(target=self._process_messages, daemon=True)
        self.processing_thread.start()

    def stop_processing(self):
        """Ask the worker to stop and wait up to five seconds for it."""
        self.running = False
        if self.processing_thread:
            self.processing_thread.join(timeout=5)

    def add_message(self, message: "ChatMessage", recipients: List[socket.socket]):
        """Queue ``message`` for delivery to ``recipients``.

        Waits up to one second for free space; if the queue is still full the
        message is dropped and a warning is logged. Note that the worker
        removes sockets it could not send to from the ``recipients`` list
        passed in here.
        """
        try:
            self.message_queue.put({
                'message': message,
                'recipients': recipients,
                'timestamp': datetime.datetime.now()
            }, timeout=1)
        except queue.Full:
            logger.warning("Message queue is full, dropping message")

    def _process_messages(self):
        """Worker loop: deliver queued messages until ``running`` is cleared."""
        while self.running:
            try:
                # The timeout lets the loop re-check ``running`` regularly.
                queue_item = self.message_queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                recipients = queue_item['recipients']
                # Serialize once, not once per recipient.
                payload = json.dumps(queue_item['message'].to_dict()).encode('utf-8')

                failed_recipients = []
                for recipient_socket in recipients:
                    try:
                        # sendall(): send() may write only part of the payload.
                        recipient_socket.sendall(payload)
                    except Exception:
                        failed_recipients.append(recipient_socket)

                # Remove failed recipients
                for failed_socket in failed_recipients:
                    recipients.remove(failed_socket)
            except Exception as e:
                logger.error(f"Error processing message: {e}")
            finally:
                # Always balance the get(): otherwise one bad item would make
                # message_queue.join() block forever.
                self.message_queue.task_done()
3. Load Balancing and Clustering
class ChatServerCluster:
    """Relays chat messages between this server and its peer cluster nodes.

    Nodes are identified by ``"host:port"`` strings; ``server_id`` is this
    node's own entry in ``cluster_nodes``.
    """

    def __init__(self, server_id: str, cluster_nodes: List[str]):
        self.server_id = server_id
        self.cluster_nodes = cluster_nodes
        self.local_clients = {}
        self.remote_clients = {}
        self.cluster_connections = {}
        # handle_cluster_message() reads self.rooms, which was never set up
        # here. Only create it when the object does not already provide one
        # (for example when this class is combined with a server class).
        if not hasattr(self, 'rooms'):
            self.rooms = {}

    def setup_cluster_connections(self):
        """Open a TCP connection to every other node; failures are logged."""
        for node in self.cluster_nodes:
            if node == self.server_id:
                continue
            cluster_socket = None
            try:
                host, port = node.split(':')
                cluster_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                cluster_socket.connect((host, int(port)))
            except Exception as e:
                # Do not leak the socket of a node we could not reach.
                if cluster_socket is not None:
                    cluster_socket.close()
                logger.error(f"Failed to connect to cluster node {node}: {e}")
            else:
                self.cluster_connections[node] = cluster_socket

    def broadcast_to_cluster(self, message: "ChatMessage", room: str):
        """Send ``message`` for ``room`` to every connected cluster node."""
        cluster_message = {
            'type': 'cluster_broadcast',
            'message': message.to_dict(),
            'room': room,
            'origin_server': self.server_id
        }
        payload = json.dumps(cluster_message).encode('utf-8')

        for node, connection in self.cluster_connections.items():
            try:
                # sendall(): send() may write only part of the payload.
                connection.sendall(payload)
            except Exception as e:
                logger.error(f"Failed to send to cluster node {node}: {e}")

    def handle_cluster_message(self, data: Dict):
        """Forward a message received from another node to local room members.

        Messages that originated on this server, or that target a room this
        server does not know, are ignored.
        """
        if data['origin_server'] == self.server_id:
            return  # Ignore messages from self

        room_state = self.rooms.get(data['room'])
        if room_state is None:
            return

        # Reconstruct message and serialize it once for all recipients
        message = ChatMessage.from_dict(data['message'])
        payload = json.dumps(message.to_dict()).encode('utf-8')

        # Broadcast to local clients in the room
        for client_socket in list(room_state['clients']):
            try:
                client_socket.sendall(payload)
            except Exception as e:
                # One unreachable client must not stop delivery to the rest.
                logger.warning(f"Failed to relay cluster message to a client: {e}")
class ChatServerCluster:
    """Relays chat messages between this server and its peer cluster nodes.

    Nodes are identified by ``"host:port"`` strings; ``server_id`` is this
    node's own entry in ``cluster_nodes``.
    """

    def __init__(self, server_id: str, cluster_nodes: List[str]):
        self.server_id = server_id
        self.cluster_nodes = cluster_nodes
        self.local_clients = {}
        self.remote_clients = {}
        self.cluster_connections = {}
        # handle_cluster_message() reads self.rooms, which was never set up
        # here. Only create it when the object does not already provide one
        # (for example when this class is combined with a server class).
        if not hasattr(self, 'rooms'):
            self.rooms = {}

    def setup_cluster_connections(self):
        """Open a TCP connection to every other node; failures are logged."""
        for node in self.cluster_nodes:
            if node == self.server_id:
                continue
            cluster_socket = None
            try:
                host, port = node.split(':')
                cluster_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                cluster_socket.connect((host, int(port)))
            except Exception as e:
                # Do not leak the socket of a node we could not reach.
                if cluster_socket is not None:
                    cluster_socket.close()
                logger.error(f"Failed to connect to cluster node {node}: {e}")
            else:
                self.cluster_connections[node] = cluster_socket

    def broadcast_to_cluster(self, message: "ChatMessage", room: str):
        """Send ``message`` for ``room`` to every connected cluster node."""
        cluster_message = {
            'type': 'cluster_broadcast',
            'message': message.to_dict(),
            'room': room,
            'origin_server': self.server_id
        }
        payload = json.dumps(cluster_message).encode('utf-8')

        for node, connection in self.cluster_connections.items():
            try:
                # sendall(): send() may write only part of the payload.
                connection.sendall(payload)
            except Exception as e:
                logger.error(f"Failed to send to cluster node {node}: {e}")

    def handle_cluster_message(self, data: Dict):
        """Forward a message received from another node to local room members.

        Messages that originated on this server, or that target a room this
        server does not know, are ignored.
        """
        if data['origin_server'] == self.server_id:
            return  # Ignore messages from self

        room_state = self.rooms.get(data['room'])
        if room_state is None:
            return

        # Reconstruct message and serialize it once for all recipients
        message = ChatMessage.from_dict(data['message'])
        payload = json.dumps(message.to_dict()).encode('utf-8')

        # Broadcast to local clients in the room
        for client_socket in list(room_state['clients']):
            try:
                client_socket.sendall(payload)
            except Exception as e:
                # One unreachable client must not stop delivery to the rest.
                logger.warning(f"Failed to relay cluster message to a client: {e}")
Security Enhancements
1. Message Encryption
from cryptography.fernet import Fernet
import base64
class MessageEncryption:
    """Symmetric encryption of message text using a Fernet cipher.

    Encrypted messages travel as text: the Fernet token is base64-encoded
    again and decoded to ``str``, and ``decrypt_message`` reverses exactly
    that.
    """

    def __init__(self, key: Optional[bytes] = None):
        """Create the cipher.

        Args:
            key: Existing Fernet key to reuse. When omitted a new key is
                generated, which is the original behaviour. Passing a key
                lets a second party build a cipher that can read messages
                encrypted by the first.
        """
        self.key = key if key is not None else Fernet.generate_key()
        self.cipher = Fernet(self.key)

    @classmethod
    def from_client_key(cls, encoded_key: str) -> "MessageEncryption":
        """Build an instance from the string returned by ``get_key_for_client``."""
        return cls(base64.b64decode(encoded_key.encode()))

    def encrypt_message(self, message: str) -> str:
        """Encrypt ``message`` and return the base64 text form of the token."""
        encrypted = self.cipher.encrypt(message.encode())
        return base64.b64encode(encrypted).decode()

    def decrypt_message(self, encrypted_message: str) -> str:
        """Decrypt text produced by ``encrypt_message`` with the same key.

        Errors raised by ``base64.b64decode`` or ``Fernet.decrypt`` for
        malformed or foreign input propagate to the caller.
        """
        encrypted_bytes = base64.b64decode(encrypted_message.encode())
        decrypted = self.cipher.decrypt(encrypted_bytes)
        return decrypted.decode()

    def get_key_for_client(self) -> str:
        """Return the key base64-encoded as ``str`` for handing to a client."""
        return base64.b64encode(self.key).decode()
def send_encrypted_message(self, content: str):
    """Send ``content`` via ``self.client``, encrypted when a cipher is set.

    If ``self.encryption`` exists and is truthy, the content is passed
    through its ``encrypt_message`` and sent as an ``encrypted_message``.
    Otherwise it is sent unchanged as a plain ``message``. Nothing is sent
    when ``self.client`` is falsy.
    """
    cipher = getattr(self, 'encryption', None)
    if cipher:
        payload = {
            'type': 'encrypted_message',
            'content': cipher.encrypt_message(content),
            'encrypted': True,
        }
    else:
        payload = {
            'type': 'message',
            'content': content,
            'encrypted': False,
        }
    if not self.client:
        return
    self.client.send_command(json.dumps(payload))
from cryptography.fernet import Fernet
import base64
class MessageEncryption:
    """Symmetric encryption of message text using a Fernet cipher.

    Encrypted messages travel as text: the Fernet token is base64-encoded
    again and decoded to ``str``, and ``decrypt_message`` reverses exactly
    that.
    """

    def __init__(self, key: Optional[bytes] = None):
        """Create the cipher.

        Args:
            key: Existing Fernet key to reuse. When omitted a new key is
                generated, which is the original behaviour. Passing a key
                lets a second party build a cipher that can read messages
                encrypted by the first.
        """
        self.key = key if key is not None else Fernet.generate_key()
        self.cipher = Fernet(self.key)

    @classmethod
    def from_client_key(cls, encoded_key: str) -> "MessageEncryption":
        """Build an instance from the string returned by ``get_key_for_client``."""
        return cls(base64.b64decode(encoded_key.encode()))

    def encrypt_message(self, message: str) -> str:
        """Encrypt ``message`` and return the base64 text form of the token."""
        encrypted = self.cipher.encrypt(message.encode())
        return base64.b64encode(encrypted).decode()

    def decrypt_message(self, encrypted_message: str) -> str:
        """Decrypt text produced by ``encrypt_message`` with the same key.

        Errors raised by ``base64.b64decode`` or ``Fernet.decrypt`` for
        malformed or foreign input propagate to the caller.
        """
        encrypted_bytes = base64.b64decode(encrypted_message.encode())
        decrypted = self.cipher.decrypt(encrypted_bytes)
        return decrypted.decode()

    def get_key_for_client(self) -> str:
        """Return the key base64-encoded as ``str`` for handing to a client."""
        return base64.b64encode(self.key).decode()
def send_encrypted_message(self, content: str):
    """Send ``content`` via ``self.client``, encrypted when a cipher is set.

    If ``self.encryption`` exists and is truthy, the content is passed
    through its ``encrypt_message`` and sent as an ``encrypted_message``.
    Otherwise it is sent unchanged as a plain ``message``. Nothing is sent
    when ``self.client`` is falsy.
    """
    cipher = getattr(self, 'encryption', None)
    if cipher:
        payload = {
            'type': 'encrypted_message',
            'content': cipher.encrypt_message(content),
            'encrypted': True,
        }
    else:
        payload = {
            'type': 'message',
            'content': content,
            'encrypted': False,
        }
    if not self.client:
        return
    self.client.send_command(json.dumps(payload))
2. Rate Limiting
class RateLimiter:
    """Per-client sliding-window message limiter.

    A client may have at most ``max_messages`` accepted messages within any
    ``time_window`` seconds. All state changes happen under ``self.lock``.
    """

    def __init__(self, max_messages: int = 10, time_window: int = 60):
        """
        Args:
            max_messages: Accepted messages allowed per window and client.
            time_window: Window length in seconds.
        """
        self.max_messages = max_messages
        self.time_window = time_window
        # client_id -> list of time.monotonic() stamps of accepted messages
        self.client_message_counts = {}
        self.lock = threading.Lock()

    def _prune(self, timestamps, now):
        """Drop, in place, the timestamps that have left the window."""
        timestamps[:] = [
            stamp for stamp in timestamps
            if now - stamp < self.time_window
        ]

    def check_rate_limit(self, client_id: str) -> bool:
        """Record a message attempt and report whether it is allowed.

        Returns:
            True if the client is under its limit (the attempt is counted),
            False if the limit is reached (the attempt is not counted).
        """
        with self.lock:
            # Monotonic clock: wall-clock adjustments must not stretch or
            # shrink the window.
            now = time.monotonic()
            timestamps = self.client_message_counts.setdefault(client_id, [])
            self._prune(timestamps, now)
            if len(timestamps) < self.max_messages:
                timestamps.append(now)
                return True
            return False

    def cleanup_old_entries(self):
        """Prune expired timestamps and forget clients that have none left."""
        with self.lock:
            now = time.monotonic()
            # Iterate over a snapshot of the keys because entries are deleted.
            for client_id in list(self.client_message_counts):
                timestamps = self.client_message_counts[client_id]
                self._prune(timestamps, now)
                if not timestamps:
                    del self.client_message_counts[client_id]
class RateLimiter:
    """Per-client sliding-window message limiter.

    A client may have at most ``max_messages`` accepted messages within any
    ``time_window`` seconds. All state changes happen under ``self.lock``.
    """

    def __init__(self, max_messages: int = 10, time_window: int = 60):
        """
        Args:
            max_messages: Accepted messages allowed per window and client.
            time_window: Window length in seconds.
        """
        self.max_messages = max_messages
        self.time_window = time_window
        # client_id -> list of time.monotonic() stamps of accepted messages
        self.client_message_counts = {}
        self.lock = threading.Lock()

    def _prune(self, timestamps, now):
        """Drop, in place, the timestamps that have left the window."""
        timestamps[:] = [
            stamp for stamp in timestamps
            if now - stamp < self.time_window
        ]

    def check_rate_limit(self, client_id: str) -> bool:
        """Record a message attempt and report whether it is allowed.

        Returns:
            True if the client is under its limit (the attempt is counted),
            False if the limit is reached (the attempt is not counted).
        """
        with self.lock:
            # Monotonic clock: wall-clock adjustments must not stretch or
            # shrink the window.
            now = time.monotonic()
            timestamps = self.client_message_counts.setdefault(client_id, [])
            self._prune(timestamps, now)
            if len(timestamps) < self.max_messages:
                timestamps.append(now)
                return True
            return False

    def cleanup_old_entries(self):
        """Prune expired timestamps and forget clients that have none left."""
        with self.lock:
            now = time.monotonic()
            # Iterate over a snapshot of the keys because entries are deleted.
            for client_id in list(self.client_message_counts):
                timestamps = self.client_message_counts[client_id]
                self._prune(timestamps, now)
                if not timestamps:
                    del self.client_message_counts[client_id]
Usage Examples and Testing
1. Server Setup Example
def setup_production_server():
    """Configure logging, build a ``ChatServer`` on 0.0.0.0:8080 and start it.

    Steps, in order: install file and console log handlers, create the
    server, attach a rate limiter (30 messages per 60 seconds) and a
    message cipher, initialise the database, register the default rooms,
    then call ``server.start()``.
    """
    # This module already calls logging.basicConfig() at import time, and
    # basicConfig() does nothing once the root logger has handlers. Remove
    # the existing handlers first so the file handler below is installed.
    root_logger = logging.getLogger()
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)
        handler.close()
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('chat_server.log'),
            logging.StreamHandler()
        ]
    )
    # Create server with enhanced features
    server = ChatServer(host='0.0.0.0', port=8080)
    # Setup security features
    server.rate_limiter = RateLimiter(max_messages=30, time_window=60)
    server.encryption = MessageEncryption()
    # Setup database
    server.init_database()
    # Create default rooms
    server.rooms.update({
        'general': {'clients': [], 'topic': 'General discussion'},
        'help': {'clients': [], 'topic': 'Get help and support'},
        'random': {'clients': [], 'topic': 'Random conversations'}
    })
    # Start server
    print("Starting production chat server...")
    server.start()


# Start the production server only when this file is run as a script.
if __name__ == "__main__":
    setup_production_server()
def setup_production_server():
    """Configure logging, build a ``ChatServer`` on 0.0.0.0:8080 and start it.

    Steps, in order: install file and console log handlers, create the
    server, attach a rate limiter (30 messages per 60 seconds) and a
    message cipher, initialise the database, register the default rooms,
    then call ``server.start()``.
    """
    # This module already calls logging.basicConfig() at import time, and
    # basicConfig() does nothing once the root logger has handlers. Remove
    # the existing handlers first so the file handler below is installed.
    root_logger = logging.getLogger()
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)
        handler.close()
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('chat_server.log'),
            logging.StreamHandler()
        ]
    )
    # Create server with enhanced features
    server = ChatServer(host='0.0.0.0', port=8080)
    # Setup security features
    server.rate_limiter = RateLimiter(max_messages=30, time_window=60)
    server.encryption = MessageEncryption()
    # Setup database
    server.init_database()
    # Create default rooms
    server.rooms.update({
        'general': {'clients': [], 'topic': 'General discussion'},
        'help': {'clients': [], 'topic': 'Get help and support'},
        'random': {'clients': [], 'topic': 'Random conversations'}
    })
    # Start server
    print("Starting production chat server...")
    server.start()


# Start the production server only when this file is run as a script.
if __name__ == "__main__":
    setup_production_server()
2. Client Testing Example
def test_chat_functionality():
    """Run the chat integration tests against a local server on port 12346.

    Returns:
        The ``unittest.TestResult`` of the run, so callers can inspect
        ``wasSuccessful()``.
    """
    import unittest

    class ChatTestCase(unittest.TestCase):
        def setUp(self):
            self.server = ChatServer(host='localhost', port=12346)
            self.server_thread = threading.Thread(target=self.server.start, daemon=True)
            self.server_thread.start()
            time.sleep(1)  # Wait for server to start

        def test_client_connection(self):
            """Test client can connect to server"""
            client = ChatClient("test_user", port=12346)
            self.assertTrue(client.connect())
            client.disconnect()

        def test_message_sending(self):
            """Test message sending between clients"""
            client1 = ChatClient("user1", port=12346)
            client2 = ChatClient("user2", port=12346)
            self.assertTrue(client1.connect())
            self.assertTrue(client2.connect())
            # Send message from client1
            client1.send_message("Hello from user1!")
            # Check if client2 receives message
            time.sleep(0.5)
            # Add assertion logic here
            client1.disconnect()
            client2.disconnect()

        def tearDown(self):
            self.server.stop()

    # Run tests
    # unittest.main() only scans the __main__ module, so it cannot find a
    # test case defined inside this function; it also parses sys.argv and
    # calls sys.exit(). Load and run the local class explicitly instead.
    suite = unittest.TestLoader().loadTestsFromTestCase(ChatTestCase)
    return unittest.TextTestRunner().run(suite)
def test_chat_functionality():
    """Run the chat integration tests against a local server on port 12346.

    Returns:
        The ``unittest.TestResult`` of the run, so callers can inspect
        ``wasSuccessful()``.
    """
    import unittest

    class ChatTestCase(unittest.TestCase):
        def setUp(self):
            self.server = ChatServer(host='localhost', port=12346)
            self.server_thread = threading.Thread(target=self.server.start, daemon=True)
            self.server_thread.start()
            time.sleep(1)  # Wait for server to start

        def test_client_connection(self):
            """Test client can connect to server"""
            client = ChatClient("test_user", port=12346)
            self.assertTrue(client.connect())
            client.disconnect()

        def test_message_sending(self):
            """Test message sending between clients"""
            client1 = ChatClient("user1", port=12346)
            client2 = ChatClient("user2", port=12346)
            self.assertTrue(client1.connect())
            self.assertTrue(client2.connect())
            # Send message from client1
            client1.send_message("Hello from user1!")
            # Check if client2 receives message
            time.sleep(0.5)
            # Add assertion logic here
            client1.disconnect()
            client2.disconnect()

        def tearDown(self):
            self.server.stop()

    # Run tests
    # unittest.main() only scans the __main__ module, so it cannot find a
    # test case defined inside this function; it also parses sys.argv and
    # calls sys.exit(). Load and run the local class explicitly instead.
    suite = unittest.TestLoader().loadTestsFromTestCase(ChatTestCase)
    return unittest.TextTestRunner().run(suite)
Deployment and Production Considerations
1. Docker Configuration
# Dockerfile
# Builds an image that runs chatapplication.py with the "server" argument.
# Base image: slim variant of the official Python 3.9 image.
FROM python:3.9-slim
# Relative paths in the following instructions resolve against /app.
WORKDIR /app
# Dependencies are installed before the application file is copied.
COPY requirements.txt .
# --no-cache-dir disables pip's cache for this install.
RUN pip install --no-cache-dir -r requirements.txt
COPY chatapplication.py .
# config.json must exist in the build context, otherwise this COPY fails.
COPY config.json .
# Declares port 8080; EXPOSE alone does not publish the port to the host.
EXPOSE 8080
# Exec-form CMD: runs the script directly with "server" as its argument.
CMD ["python", "chatapplication.py", "server"]
# Dockerfile
# Builds an image that runs chatapplication.py with the "server" argument.
# Base image: slim variant of the official Python 3.9 image.
FROM python:3.9-slim
# Relative paths in the following instructions resolve against /app.
WORKDIR /app
# Dependencies are installed before the application file is copied.
COPY requirements.txt .
# --no-cache-dir disables pip's cache for this install.
RUN pip install --no-cache-dir -r requirements.txt
COPY chatapplication.py .
# config.json must exist in the build context, otherwise this COPY fails.
COPY config.json .
# Declares port 8080; EXPOSE alone does not publish the port to the host.
EXPOSE 8080
# Exec-form CMD: runs the script directly with "server" as its argument.
CMD ["python", "chatapplication.py", "server"]
2. Configuration Management
import json
class ConfigManager:
    """Load, hold and persist the application's JSON configuration.

    The parsed configuration is available as ``self.config``.
    """

    def __init__(self, config_file: str = "config.json"):
        """
        Args:
            config_file: Path of the JSON configuration file.
        """
        self.config_file = config_file
        self.config = self.load_config()

    def load_config(self) -> Dict:
        """Return the parsed config file, or the defaults if it is missing.

        Only a missing file falls back to the defaults; a file that exists
        but holds invalid JSON still raises ``json.JSONDecodeError``.
        """
        try:
            # Read as UTF-8 explicitly; the platform default encoding would
            # mis-decode non-ASCII values on non-UTF-8 locales.
            with open(self.config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return self.get_default_config()

    def get_default_config(self) -> Dict:
        """Return a fresh dictionary holding the built-in default settings."""
        return {
            "server": {
                "host": "0.0.0.0",
                "port": 8080,
                "max_connections": 100
            },
            "security": {
                "rate_limit_messages": 30,
                "rate_limit_window": 60,
                "enable_encryption": True,
                "max_file_size": 10485760  # 10 * 1024 * 1024
            },
            "features": {
                "enable_file_sharing": True,
                "enable_private_messages": True,
                "enable_rooms": True,
                "max_message_history": 1000
            },
            "database": {
                "path": "chat_history.db",
                "backup_interval": 3600
            }
        }

    def save_config(self):
        """Write ``self.config`` to ``self.config_file`` as indented JSON."""
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=2)
import json
class ConfigManager:
    """Load, hold and persist the application's JSON configuration.

    The parsed configuration is available as ``self.config``.
    """

    def __init__(self, config_file: str = "config.json"):
        """
        Args:
            config_file: Path of the JSON configuration file.
        """
        self.config_file = config_file
        self.config = self.load_config()

    def load_config(self) -> Dict:
        """Return the parsed config file, or the defaults if it is missing.

        Only a missing file falls back to the defaults; a file that exists
        but holds invalid JSON still raises ``json.JSONDecodeError``.
        """
        try:
            # Read as UTF-8 explicitly; the platform default encoding would
            # mis-decode non-ASCII values on non-UTF-8 locales.
            with open(self.config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return self.get_default_config()

    def get_default_config(self) -> Dict:
        """Return a fresh dictionary holding the built-in default settings."""
        return {
            "server": {
                "host": "0.0.0.0",
                "port": 8080,
                "max_connections": 100
            },
            "security": {
                "rate_limit_messages": 30,
                "rate_limit_window": 60,
                "enable_encryption": True,
                "max_file_size": 10485760  # 10 * 1024 * 1024
            },
            "features": {
                "enable_file_sharing": True,
                "enable_private_messages": True,
                "enable_rooms": True,
                "max_message_history": 1000
            },
            "database": {
                "path": "chat_history.db",
                "backup_interval": 3600
            }
        }

    def save_config(self):
        """Write ``self.config`` to ``self.config_file`` as indented JSON."""
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=2)
This comprehensive chat application demonstrates advanced socket programming concepts, real-time communication, and modern chat features. The modular design allows for easy customization and extension based on specific requirements. 💬🚀
Was this page helpful?
Let us know how we did