Automated Email Sender with Advanced Features
Abstract
Create a professional automated email sender system that manages contacts, templates, and bulk email campaigns with advanced features. This project demonstrates email automation, database management, GUI development, and implementing enterprise-level email marketing capabilities.
Prerequisites
- Python 3.7 or above
- Text Editor or IDE
- Solid understanding of Python syntax and OOP concepts
- Knowledge of email protocols (SMTP, IMAP)
- Familiarity with database operations and SQL
- Understanding of GUI development with Tkinter
- Basic knowledge of email security and authentication
Getting Started
Create a new project
- Create a new project folder and name it `automatedEmailSender`.
- Create a new file and name it `automatedemailsender.py`.
- Install the required dependency. `tkinter`, `sqlite3`, `smtplib`, and `email` ship with Python's standard library and cannot be installed with pip; only the third-party `schedule` package has to be installed:
pip install schedule
- Open the project folder in your favorite text editor or IDE.
- Copy the code below and paste it into your `automatedemailsender.py` file.
Write the code
- Add the following code to your `automatedemailsender.py` file.
⚙️ Automated Email Sender with Advanced Features
# Automated Email Sender
import smtplib
import imaplib
import email
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import csv
import json
import schedule
import time
import sqlite3
import datetime
from typing import List, Dict, Optional, Tuple
import tkinter as tk
from tkinter import ttk, scrolledtext, filedialog, messagebox
import threading
from pathlib import Path
import re
import ssl
import socket
from dataclasses import dataclass
import logging
# Module-wide logging: INFO and above, each record prefixed with a timestamp
# and its level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
@dataclass
class EmailAccount:
    """Connection settings and credentials for one email account.

    The password is kept in memory only; it is deliberately left out of
    ``repr()`` so that logging or printing an account never leaks it.
    """
    # Address used both as the SMTP login name and as the From header.
    email: str
    # Plaintext secret used for SMTP login.
    password: str
    # Outgoing mail host and port.
    smtp_server: str
    smtp_port: int
    # Incoming mail host and port; an empty host means none was configured.
    imap_server: str = ""
    imap_port: int = 993
    # Whether to upgrade a plain SMTP connection with STARTTLS.
    use_tls: bool = True

    def __repr__(self) -> str:
        """Return a debug representation with the password masked."""
        # A user-defined __repr__ takes precedence over the one @dataclass
        # would otherwise generate, which would print the password verbatim.
        return (
            f"{type(self).__name__}("
            f"email={self.email!r}, password='***', "
            f"smtp_server={self.smtp_server!r}, smtp_port={self.smtp_port!r}, "
            f"imap_server={self.imap_server!r}, imap_port={self.imap_port!r}, "
            f"use_tls={self.use_tls!r})"
        )
@dataclass
class EmailTemplate:
    """A reusable email subject/body pair with ``{placeholder}`` variables."""
    # Unique, human-readable template name.
    name: str
    # Subject line; may contain {placeholders}.
    subject: str
    # Message body; may contain {placeholders}.
    body: str
    # True when the body is HTML rather than plain text.
    is_html: bool = False
    # Placeholder names found in the template; None when not supplied.
    variables: Optional[List[str]] = None
class EmailDatabase:
    """SQLite-backed store for accounts, templates, contacts and the send log.

    Every public method opens its own connection to ``db_path`` and closes it
    before returning, so no connection is held open between calls.
    """

    # DDL run by init_database(); every statement is idempotent.
    _SCHEMA = (
        # Email accounts table
        """
        CREATE TABLE IF NOT EXISTS email_accounts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            email TEXT NOT NULL,
            smtp_server TEXT NOT NULL,
            smtp_port INTEGER NOT NULL,
            imap_server TEXT,
            imap_port INTEGER DEFAULT 993,
            use_tls BOOLEAN DEFAULT 1,
            created_date TEXT NOT NULL
        )
        """,
        # Email templates table
        """
        CREATE TABLE IF NOT EXISTS email_templates (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            subject TEXT NOT NULL,
            body TEXT NOT NULL,
            is_html BOOLEAN DEFAULT 0,
            variables TEXT,
            created_date TEXT NOT NULL,
            last_used TEXT
        )
        """,
        # Recipients/contacts table
        """
        CREATE TABLE IF NOT EXISTS contacts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT NOT NULL,
            group_name TEXT,
            custom_fields TEXT,
            added_date TEXT NOT NULL,
            UNIQUE(email, group_name)
        )
        """,
        # Sent emails log table
        """
        CREATE TABLE IF NOT EXISTS sent_emails (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            account_id INTEGER NOT NULL,
            template_id INTEGER,
            recipient_email TEXT NOT NULL,
            recipient_name TEXT,
            subject TEXT NOT NULL,
            sent_date TEXT NOT NULL,
            success BOOLEAN NOT NULL,
            error_message TEXT,
            FOREIGN KEY (account_id) REFERENCES email_accounts (id),
            FOREIGN KEY (template_id) REFERENCES email_templates (id)
        )
        """,
        # Scheduled emails table
        """
        CREATE TABLE IF NOT EXISTS scheduled_emails (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            account_id INTEGER NOT NULL,
            template_id INTEGER NOT NULL,
            recipient_group TEXT,
            schedule_time TEXT NOT NULL,
            repeat_type TEXT,
            next_send_time TEXT NOT NULL,
            active BOOLEAN DEFAULT 1,
            created_date TEXT NOT NULL,
            FOREIGN KEY (account_id) REFERENCES email_accounts (id),
            FOREIGN KEY (template_id) REFERENCES email_templates (id)
        )
        """,
    )

    def __init__(self, db_path: str = "email_system.db"):
        """Remember the database file path and create any missing tables."""
        self.db_path = db_path
        self.init_database()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _connect(self):
        """Return a context manager yielding a connection closed on exit.

        Using ``with sqlite3.connect(...)`` directly only commits or rolls
        back the transaction; it never closes the connection. Wrapping the
        connection in ``closing`` guarantees the handle is released.
        Uncommitted work is discarded when the connection closes, so writers
        must call ``commit()`` themselves.
        """
        from contextlib import closing
        return closing(sqlite3.connect(self.db_path))

    @staticmethod
    def _now() -> str:
        """Return the current local time as an ISO-8601 string."""
        return datetime.datetime.now().isoformat()

    def _insert_unique(self, sql: str, params: tuple) -> bool:
        """Run one INSERT; return False when a constraint rejects the row."""
        with self._connect() as conn:
            try:
                conn.execute(sql, params)
                conn.commit()
            except sqlite3.IntegrityError:
                return False
        return True

    def _fetch_all(self, sql: str, params: tuple = ()) -> list:
        """Run one SELECT and return all of its rows."""
        with self._connect() as conn:
            return conn.execute(sql, params).fetchall()

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------
    def init_database(self):
        """Create every table that does not exist yet."""
        with self._connect() as conn:
            for statement in self._SCHEMA:
                conn.execute(statement)
            conn.commit()

    # ------------------------------------------------------------------
    # Accounts
    # ------------------------------------------------------------------
    def add_email_account(self, name: str, email: str, smtp_server: str, smtp_port: int,
                          imap_server: str = "", imap_port: int = 993,
                          use_tls: bool = True) -> bool:
        """Store an account configuration (the password is never stored).

        Returns:
            True when the row was inserted, False when a constraint such as
            the unique account name rejected it.
        """
        return self._insert_unique(
            """
            INSERT INTO email_accounts (name, email, smtp_server, smtp_port,
                                        imap_server, imap_port, use_tls, created_date)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (name, email, smtp_server, smtp_port, imap_server, imap_port,
             use_tls, self._now()),
        )

    def get_email_accounts(self) -> List[Dict]:
        """Return every stored account as a dict, ordered by account name."""
        # Columns are named explicitly so the positional mapping below does
        # not depend on the table's physical column order.
        columns = ('id', 'name', 'email', 'smtp_server', 'smtp_port',
                   'imap_server', 'imap_port', 'use_tls', 'created_date')
        rows = self._fetch_all(
            "SELECT id, name, email, smtp_server, smtp_port, imap_server, "
            "imap_port, use_tls, created_date FROM email_accounts ORDER BY name"
        )
        accounts = []
        for row in rows:
            account = dict(zip(columns, row))
            account['use_tls'] = bool(account['use_tls'])
            accounts.append(account)
        return accounts

    # ------------------------------------------------------------------
    # Templates
    # ------------------------------------------------------------------
    def add_template(self, name: str, subject: str, body: str, is_html: bool = False,
                     variables: Optional[List[str]] = None) -> bool:
        """Store a template; ``variables`` is saved as a JSON list.

        Returns:
            True when the row was inserted, False when a constraint such as
            the unique template name rejected it.
        """
        variables_json = json.dumps(variables) if variables else "[]"
        return self._insert_unique(
            """
            INSERT INTO email_templates (name, subject, body, is_html, variables, created_date)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (name, subject, body, is_html, variables_json, self._now()),
        )

    def get_templates(self) -> List[Dict]:
        """Return every stored template as a dict, ordered by template name."""
        rows = self._fetch_all(
            "SELECT id, name, subject, body, is_html, variables, created_date, "
            "last_used FROM email_templates ORDER BY name"
        )
        return [
            {
                'id': row[0], 'name': row[1], 'subject': row[2],
                'body': row[3], 'is_html': bool(row[4]),
                'variables': json.loads(row[5]) if row[5] else [],
                'created_date': row[6], 'last_used': row[7],
            }
            for row in rows
        ]

    # ------------------------------------------------------------------
    # Contacts
    # ------------------------------------------------------------------
    def add_contact(self, name: str, email: str, group_name: str = "default",
                    custom_fields: Optional[Dict] = None) -> bool:
        """Store a contact; ``custom_fields`` is saved as a JSON object.

        Returns:
            True when the row was inserted, False when a constraint such as
            the unique (email, group_name) pair rejected it.
        """
        custom_fields_json = json.dumps(custom_fields) if custom_fields else "{}"
        return self._insert_unique(
            """
            INSERT INTO contacts (name, email, group_name, custom_fields, added_date)
            VALUES (?, ?, ?, ?, ?)
            """,
            (name, email, group_name, custom_fields_json, self._now()),
        )

    def get_contacts(self, group_name: Optional[str] = None) -> List[Dict]:
        """Return contacts as dicts.

        With a non-empty ``group_name`` only that group is returned, ordered
        by name; otherwise all contacts are returned, ordered by group and
        then name.
        """
        select = "SELECT id, name, email, group_name, custom_fields, added_date FROM contacts"
        if group_name:
            rows = self._fetch_all(select + " WHERE group_name = ? ORDER BY name",
                                   (group_name,))
        else:
            rows = self._fetch_all(select + " ORDER BY group_name, name")
        return [
            {
                'id': row[0], 'name': row[1], 'email': row[2],
                'group_name': row[3],
                'custom_fields': json.loads(row[4]) if row[4] else {},
                'added_date': row[5],
            }
            for row in rows
        ]

    def get_contact_groups(self) -> List[str]:
        """Return the distinct contact group names in sorted order."""
        rows = self._fetch_all(
            "SELECT DISTINCT group_name FROM contacts ORDER BY group_name"
        )
        return [row[0] for row in rows]

    # ------------------------------------------------------------------
    # Send log
    # ------------------------------------------------------------------
    def log_sent_email(self, account_id: int, template_id: Optional[int],
                       recipient_email: str, recipient_name: str, subject: str,
                       success: bool, error_message: Optional[str] = None):
        """Append one delivery attempt, stamped with the current time.

        Unlike the ``add_*`` methods, constraint violations are not caught
        here and propagate as ``sqlite3.IntegrityError``.
        """
        with self._connect() as conn:
            conn.execute(
                """
                INSERT INTO sent_emails (account_id, template_id, recipient_email,
                                         recipient_name, subject, sent_date, success, error_message)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (account_id, template_id, recipient_email, recipient_name, subject,
                 self._now(), success, error_message),
            )
            conn.commit()
class EmailSender:
    """Builds email messages and delivers them over SMTP, singly or in bulk."""

    # Socket timeout for SMTP operations so an unreachable server cannot
    # block the caller indefinitely.
    SMTP_TIMEOUT_SECONDS = 30

    def __init__(self, database: "EmailDatabase"):
        """Keep a reference to the database and set up known-provider defaults."""
        self.db = database
        # SMTP/IMAP defaults keyed by the domain part of the address.
        self.common_providers = {
            'gmail.com': {'smtp': 'smtp.gmail.com', 'port': 587, 'imap': 'imap.gmail.com'},
            'outlook.com': {'smtp': 'smtp-mail.outlook.com', 'port': 587, 'imap': 'outlook.office365.com'},
            'hotmail.com': {'smtp': 'smtp-mail.outlook.com', 'port': 587, 'imap': 'outlook.office365.com'},
            'yahoo.com': {'smtp': 'smtp.mail.yahoo.com', 'port': 587, 'imap': 'imap.mail.yahoo.com'},
            'icloud.com': {'smtp': 'smtp.mail.me.com', 'port': 587, 'imap': 'imap.mail.me.com'},
        }

    def get_smtp_settings(self, email: str) -> Dict:
        """Return the known provider settings for ``email``'s domain.

        Returns an empty dict when the address has no ``@`` or the domain is
        not one of ``self.common_providers``.
        """
        # rpartition never raises: a string without '@' yields an empty
        # separator instead of the IndexError that split('@')[1] produced.
        _, separator, domain = email.rpartition('@')
        if not separator:
            return {}
        return self.common_providers.get(domain.strip().lower(), {})

    # ------------------------------------------------------------------
    # SMTP connection helpers
    # ------------------------------------------------------------------
    def _open_smtp(self, account: "EmailAccount"):
        """Connect to the account's SMTP server and secure the channel.

        Port 465 uses implicit TLS; any other port connects in plain text and
        is upgraded with STARTTLS when ``account.use_tls`` is set. Both paths
        use a default SSL context so the server certificate is verified.
        The returned client is not yet logged in; the caller must close it.
        """
        context = ssl.create_default_context()
        if account.smtp_port == 465:
            return smtplib.SMTP_SSL(account.smtp_server, account.smtp_port,
                                    timeout=self.SMTP_TIMEOUT_SECONDS, context=context)
        server = smtplib.SMTP(account.smtp_server, account.smtp_port,
                              timeout=self.SMTP_TIMEOUT_SECONDS)
        try:
            if account.use_tls:
                server.starttls(context=context)
        except Exception:
            # Do not leak the socket when the TLS upgrade fails.
            server.close()
            raise
        return server

    @staticmethod
    def _close_smtp(server) -> None:
        """End the SMTP session politely; fall back to closing the socket."""
        try:
            server.quit()
        except (smtplib.SMTPException, OSError):
            server.close()

    def test_connection(self, account: "EmailAccount", password: str) -> Tuple[bool, str]:
        """Try to connect and log in with ``password``.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, reason)``;
            no exception escapes.
        """
        try:
            server = self._open_smtp(account)
            try:
                server.login(account.email, password)
            finally:
                # Close the connection on both success and failure.
                self._close_smtp(server)
            return True, "Connection successful"
        except smtplib.SMTPAuthenticationError:
            return False, "Authentication failed - check email and password"
        except smtplib.SMTPConnectError:
            return False, f"Cannot connect to SMTP server {account.smtp_server}:{account.smtp_port}"
        except socket.gaierror:
            return False, f"Invalid SMTP server address: {account.smtp_server}"
        except Exception as e:
            return False, f"Connection error: {str(e)}"

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------
    def _build_message(self, account: "EmailAccount", to_email: str, subject: str,
                       body: str, is_html: bool, attachments: Optional[List[str]],
                       to_name: str) -> MIMEMultipart:
        """Assemble the MIME message: headers, body part, then attachments."""
        from email.utils import formataddr

        # 'alternative' means "several renderings of the same content", so it
        # must not be the container once file attachments are added.
        subtype = 'alternative' if is_html and not attachments else 'mixed'
        msg = MIMEMultipart(subtype)
        msg['From'] = account.email
        msg['To'] = formataddr((to_name, to_email)) if to_name else to_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'html' if is_html else 'plain'))

        for file_path in attachments or ():
            path = Path(file_path)
            if not path.is_file():
                # Best effort, as before: the mail still goes out, but the
                # skipped file is now visible in the log.
                logger.warning("Skipping missing attachment: %s", file_path)
                continue
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(path.read_bytes())
            encoders.encode_base64(part)
            # Passing filename as a parameter lets the email package quote
            # and encode it correctly (spaces, non-ASCII characters).
            part.add_header('Content-Disposition', 'attachment', filename=path.name)
            msg.attach(part)
        return msg

    def send_email(self, account: "EmailAccount", password: str, to_email: str,
                   subject: str, body: str, is_html: bool = False,
                   attachments: Optional[List[str]] = None,
                   to_name: str = "") -> Tuple[bool, str]:
        """Send one email through ``account``'s SMTP server.

        Args:
            account: SMTP host, port and sender address to use.
            password: Secret used to log in as ``account.email``.
            to_email: Recipient address.
            subject: Subject line.
            body: Message body, HTML when ``is_html`` is true.
            is_html: Whether ``body`` is HTML.
            attachments: Paths of files to attach; missing files are skipped
                with a logged warning.
            to_name: Optional display name placed in the To header.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, reason)``;
            no exception escapes.
        """
        try:
            msg = self._build_message(account, to_email, subject, body,
                                      is_html, attachments, to_name)
            server = self._open_smtp(account)
            try:
                server.login(account.email, password)
                server.send_message(msg)
            finally:
                self._close_smtp(server)
            return True, "Email sent successfully"
        except Exception as e:
            return False, f"Failed to send email: {str(e)}"

    def send_bulk_emails(self, account: "EmailAccount", password: str,
                         recipients: List[Dict], template: "EmailTemplate",
                         attachments: Optional[List[str]] = None,
                         progress_callback=None, delay: float = 1.0) -> Dict:
        """Send ``template`` to every recipient, personalising each message.

        Args:
            account: Sending account.
            password: Secret for ``account``.
            recipients: Dicts with at least an ``'email'`` key; all keys are
                available as template placeholders.
            template: Provides ``subject``, ``body`` and ``is_html``.
            attachments: File paths attached to every message.
            progress_callback: Optional ``callback(percent, email, success)``
                invoked after each recipient, failures included.
            delay: Seconds to pause between messages (throttling).

        Returns:
            ``{'sent': int, 'failed': int, 'errors': [str, ...]}``.
        """
        results = {'sent': 0, 'failed': 0, 'errors': []}
        total = len(recipients)

        for position, recipient in enumerate(recipients, start=1):
            address = recipient.get('email', '')
            label = address or '<no email>'
            try:
                if not address:
                    raise ValueError("recipient has no email address")
                subject = self.replace_variables(template.subject, recipient)
                body = self.replace_variables(template.body, recipient)
                success, message = self.send_email(
                    account, password, address, subject, body,
                    template.is_html, attachments, recipient.get('name', '')
                )
            except Exception as e:
                success, message = False, str(e)

            if success:
                results['sent'] += 1
            else:
                results['failed'] += 1
                results['errors'].append(f"{label}: {message}")

            if progress_callback:
                # A faulty callback must neither abort the campaign nor make
                # an already-sent message count as failed.
                try:
                    progress_callback(position / total * 100, label, success)
                except Exception:
                    logger.exception("Progress callback failed for %s", label)

            # Throttle between messages; nothing to wait for after the last.
            if delay > 0 and position < total:
                time.sleep(delay)
        return results

    def replace_variables(self, text: str, data: Dict) -> str:
        """Substitute ``{key}`` placeholders in ``text`` with values from ``data``.

        Keys of nested dict values (for example a contact's
        ``custom_fields``) are usable as placeholders too; a top-level key
        wins over a nested key of the same name. Unknown placeholders are
        left untouched.
        """
        values = dict(data)
        for value in data.values():
            if isinstance(value, dict):
                for nested_key, nested_value in value.items():
                    values.setdefault(nested_key, nested_value)
        for key, value in values.items():
            text = text.replace(f"{{{key}}}", str(value))
        return text

    def parse_csv_contacts(self, csv_file_path: str) -> List[Dict]:
        """Read contacts from a CSV file that has a header row.

        Rows without a non-empty ``email`` column are dropped; a missing
        ``name`` defaults to the part of the address before ``@``. Empty
        cells are omitted from the resulting dicts. On a read or parse error
        the problem is logged and the contacts gathered so far are returned.
        """
        contacts = []
        try:
            # utf-8-sig strips the BOM some spreadsheet exports put in front
            # of the first header name.
            with open(csv_file_path, 'r', newline='', encoding='utf-8-sig') as file:
                for row in csv.DictReader(file):
                    clean_row = {}
                    for key, value in row.items():
                        # DictReader stores surplus cells under key None and
                        # fills cells missing from short rows with None.
                        if key is None or value is None:
                            continue
                        value = value.strip()
                        if value:
                            clean_row[key.strip()] = value
                    if clean_row.get('email'):
                        clean_row.setdefault('name', clean_row['email'].split('@')[0])
                        contacts.append(clean_row)
        except (OSError, UnicodeError, csv.Error) as e:
            logger.error(f"Error parsing CSV: {e}")
        return contacts
class EmailScheduler:
    """Runs pending ``schedule`` jobs on a background daemon thread."""

    # Seconds between checks for pending jobs.
    POLL_INTERVAL_SECONDS = 60

    def __init__(self, email_sender: "EmailSender"):
        """Store the sender; the worker thread is not started yet."""
        self.email_sender = email_sender
        self.running = False
        self.scheduler_thread = None
        # Set by stop_scheduler() to wake the worker out of its wait.
        self._wake = threading.Event()

    def start_scheduler(self):
        """Start the background loop; does nothing when already running."""
        if self.running:
            return
        self._wake.clear()
        self.running = True
        self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
        self.scheduler_thread.start()

    def stop_scheduler(self):
        """Ask the loop to stop and wait up to five seconds for the thread."""
        self.running = False
        # Wake the worker immediately; with a plain sleep of a full poll
        # interval the five-second join below would always time out.
        self._wake.set()
        if self.scheduler_thread:
            self.scheduler_thread.join(timeout=5)

    def _run_scheduler(self):
        """Run pending jobs once per poll interval until stopped."""
        while self.running:
            schedule.run_pending()
            # Interruptible replacement for time.sleep(): returns early as
            # soon as stop_scheduler() sets the event.
            self._wake.wait(self.POLL_INTERVAL_SECONDS)

    def schedule_email(self, account_id: int, template_id: int,
                       recipient_group: str, schedule_time: str,
                       repeat_type: str = "none"):
        """Placeholder for scheduling an email; currently does nothing.

        The arguments are accepted but ignored and no job is registered.
        """
        # Implementation would depend on specific scheduling requirements
        pass
class EmailGUI:
"""GUI for email sender application"""
def __init__(self):
    """Create the backing services and the main window, then build the UI."""
    # Service objects shared by all tabs.
    database = EmailDatabase()
    sender = EmailSender(database)
    self.db = database
    self.email_sender = sender
    self.scheduler = EmailScheduler(sender)

    # Top-level window.
    window = tk.Tk()
    window.title("Automated Email Sender")
    window.geometry("1000x700")
    self.root = window

    # Sending credentials; nothing is selected at start-up.
    self.current_account = None
    self.current_password = None

    self.setup_ui()
    self.refresh_data()
def setup_ui(self):
    """Create the notebook and populate its five tabs in display order."""
    self.notebook = ttk.Notebook(self.root)
    self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

    # (attribute holding the tab frame, tab caption, method filling the tab)
    tab_specs = (
        ('accounts_frame', "Email Accounts", self.setup_accounts_tab),
        ('templates_frame', "Templates", self.setup_templates_tab),
        ('contacts_frame', "Contacts", self.setup_contacts_tab),
        ('send_frame', "Send Email", self.setup_send_tab),
        ('history_frame', "History", self.setup_history_tab),
    )
    for attribute, caption, populate in tab_specs:
        frame = ttk.Frame(self.notebook)
        # The populate method reads the frame back from this attribute.
        setattr(self, attribute, frame)
        self.notebook.add(frame, text=caption)
        populate()
def setup_accounts_tab(self):
    """Build the "Email Accounts" tab: the account list above an add form.

    Creates ``self.accounts_tree`` and the ``StringVar`` attributes that back
    the form fields (account name, email, SMTP server, SMTP port).
    """
    main_frame = ttk.Frame(self.accounts_frame, padding="10")
    main_frame.pack(fill=tk.BOTH, expand=True)

    # Account list
    list_frame = ttk.LabelFrame(main_frame, text="Email Accounts", padding="5")
    list_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))

    self.accounts_tree = ttk.Treeview(list_frame, columns=('Email', 'SMTP Server', 'Port'),
                                      show='tree headings', height=8)
    self.accounts_tree.heading('#0', text='Account Name')
    for column in ('Email', 'SMTP Server', 'Port'):
        self.accounts_tree.heading(column, text=column)

    accounts_scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL,
                                       command=self.accounts_tree.yview)
    self.accounts_tree.configure(yscrollcommand=accounts_scrollbar.set)
    # Pack the scrollbar first so it claims the right edge, then let the
    # tree fill the rest. Packing the tree first (default side=TOP) left the
    # scrollbar stranded underneath it instead of beside it.
    accounts_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
    self.accounts_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))

    # Add account form
    form_frame = ttk.LabelFrame(main_frame, text="Add/Edit Account", padding="10")
    form_frame.pack(fill=tk.X)

    ttk.Label(form_frame, text="Account Name:").grid(row=0, column=0, sticky=tk.W, pady=2)
    self.account_name_var = tk.StringVar()
    ttk.Entry(form_frame, textvariable=self.account_name_var, width=25).grid(
        row=0, column=1, padx=5, pady=2, sticky=tk.W)

    ttk.Label(form_frame, text="Email:").grid(row=0, column=2, sticky=tk.W, pady=2)
    self.account_email_var = tk.StringVar()
    email_entry = ttk.Entry(form_frame, textvariable=self.account_email_var, width=30)
    email_entry.grid(row=0, column=3, padx=5, pady=2, sticky=tk.W)
    # Leaving the email field triggers provider auto-detection.
    email_entry.bind('<FocusOut>', self.auto_detect_smtp)

    ttk.Label(form_frame, text="SMTP Server:").grid(row=1, column=0, sticky=tk.W, pady=2)
    self.smtp_server_var = tk.StringVar()
    ttk.Entry(form_frame, textvariable=self.smtp_server_var, width=25).grid(
        row=1, column=1, padx=5, pady=2, sticky=tk.W)

    ttk.Label(form_frame, text="SMTP Port:").grid(row=1, column=2, sticky=tk.W, pady=2)
    self.smtp_port_var = tk.StringVar(value="587")
    ttk.Entry(form_frame, textvariable=self.smtp_port_var, width=10).grid(
        row=1, column=3, padx=5, pady=2, sticky=tk.W)

    # Buttons
    button_frame = ttk.Frame(form_frame)
    button_frame.grid(row=2, column=0, columnspan=4, pady=10)
    for caption, handler in (
        ("Add Account", self.add_account),
        ("Test Connection", self.test_account_connection),
        ("Clear", self.clear_account_form),
    ):
        ttk.Button(button_frame, text=caption, command=handler).pack(side=tk.LEFT, padx=5)
def setup_templates_tab(self):
    """Build the "Templates" tab: saved-template list above an editor form.

    Double-clicking a row in the list calls ``load_template``.
    """
    container = ttk.Frame(self.templates_frame, padding="10")
    container.pack(fill=tk.BOTH, expand=True)

    # Saved templates, one row each.
    listing = ttk.LabelFrame(container, text="Email Templates", padding="5")
    listing.pack(fill=tk.BOTH, expand=True, pady=(0, 10))

    tree = ttk.Treeview(listing, columns=('Subject', 'Type'),
                        show='tree headings', height=6)
    for column_id, caption in (('#0', 'Template Name'),
                               ('Subject', 'Subject'),
                               ('Type', 'Type')):
        tree.heading(column_id, text=caption)
    tree.pack(fill=tk.BOTH, expand=True)
    tree.bind('<Double-1>', self.load_template)
    self.templates_tree = tree

    # Editor for creating a template or viewing a loaded one.
    editor = ttk.LabelFrame(container, text="Create/Edit Template", padding="10")
    editor.pack(fill=tk.X)

    # First row: name, subject and the HTML switch.
    header_row = ttk.Frame(editor)
    header_row.pack(fill=tk.X, pady=(0, 5))

    ttk.Label(header_row, text="Name:").pack(side=tk.LEFT)
    self.template_name_var = tk.StringVar()
    name_entry = ttk.Entry(header_row, textvariable=self.template_name_var, width=20)
    name_entry.pack(side=tk.LEFT, padx=5)

    ttk.Label(header_row, text="Subject:").pack(side=tk.LEFT, padx=(20, 0))
    self.template_subject_var = tk.StringVar()
    subject_entry = ttk.Entry(header_row, textvariable=self.template_subject_var, width=40)
    subject_entry.pack(side=tk.LEFT, padx=5)

    self.template_html_var = tk.BooleanVar()
    html_toggle = ttk.Checkbutton(header_row, text="HTML", variable=self.template_html_var)
    html_toggle.pack(side=tk.LEFT, padx=10)

    # Body editor.
    ttk.Label(editor, text="Body:").pack(anchor=tk.W)
    self.template_body = scrolledtext.ScrolledText(editor, height=8, wrap=tk.WORD)
    self.template_body.pack(fill=tk.X, pady=5)

    # Hint about the placeholder syntax.
    hint = ttk.Label(
        editor,
        text="Use variables like {name}, {email}, {company} in your template",
        font=("TkDefaultFont", 8),
    )
    hint.pack(anchor=tk.W)

    # Action buttons.
    actions = ttk.Frame(editor)
    actions.pack(fill=tk.X, pady=5)
    for caption, handler in (("Save Template", self.save_template),
                             ("Clear", self.clear_template_form)):
        ttk.Button(actions, text=caption, command=handler).pack(side=tk.LEFT, padx=5)
def setup_contacts_tab(self):
    """Build the "Contacts" tab: contact list, CSV import and manual add."""
    container = ttk.Frame(self.contacts_frame, padding="10")
    container.pack(fill=tk.BOTH, expand=True)

    # Contact list.
    listing = ttk.LabelFrame(container, text="Contacts", padding="5")
    listing.pack(fill=tk.BOTH, expand=True, pady=(0, 10))

    tree = ttk.Treeview(listing, columns=('Email', 'Group'),
                        show='tree headings', height=8)
    for column_id, caption in (('#0', 'Name'), ('Email', 'Email'), ('Group', 'Group')):
        tree.heading(column_id, text=caption)
    tree.pack(fill=tk.BOTH, expand=True)
    self.contacts_tree = tree

    # Bottom strip holding both management panels side by side.
    management = ttk.Frame(container)
    management.pack(fill=tk.X)

    # Left panel: bulk import from a CSV file into a named group.
    import_panel = ttk.LabelFrame(management, text="Import Contacts", padding="5")
    import_panel.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))

    import_button = ttk.Button(import_panel, text="Import from CSV",
                               command=self.import_contacts_csv)
    import_button.pack(side=tk.LEFT, padx=5)
    ttk.Label(import_panel, text="Group:").pack(side=tk.LEFT, padx=(20, 5))
    self.import_group_var = tk.StringVar(value="default")
    group_entry = ttk.Entry(import_panel, textvariable=self.import_group_var, width=15)
    group_entry.pack(side=tk.LEFT)

    # Right panel: add a single contact by hand.
    add_panel = ttk.LabelFrame(management, text="Add Contact", padding="5")
    add_panel.pack(side=tk.RIGHT, fill=tk.X, expand=True, padx=(5, 0))

    fields = ttk.Frame(add_panel)
    fields.pack(fill=tk.X)

    ttk.Label(fields, text="Name:").grid(row=0, column=0, sticky=tk.W)
    self.contact_name_var = tk.StringVar()
    name_entry = ttk.Entry(fields, textvariable=self.contact_name_var, width=15)
    name_entry.grid(row=0, column=1, padx=2)

    ttk.Label(fields, text="Email:").grid(row=0, column=2, sticky=tk.W, padx=(10, 0))
    self.contact_email_var = tk.StringVar()
    email_entry = ttk.Entry(fields, textvariable=self.contact_email_var, width=20)
    email_entry.grid(row=0, column=3, padx=2)

    add_button = ttk.Button(add_panel, text="Add Contact", command=self.add_contact)
    add_button.pack(pady=5)
def setup_send_tab(self):
    """Build the "Send Email" tab: send options, attachments and progress."""
    container = ttk.Frame(self.send_frame, padding="10")
    container.pack(fill=tk.BOTH, expand=True)

    # ---- Send options (account, template, recipients) ----
    options = ttk.LabelFrame(container, text="Send Options", padding="10")
    options.pack(fill=tk.X, pady=(0, 10))

    # Row 0: sending account and its password.
    ttk.Label(options, text="From Account:").grid(row=0, column=0, sticky=tk.W, pady=2)
    self.send_account_var = tk.StringVar()
    self.send_account_combo = ttk.Combobox(options, textvariable=self.send_account_var,
                                           width=30)
    self.send_account_combo.grid(row=0, column=1, padx=5, pady=2, sticky=tk.W)
    password_button = ttk.Button(options, text="Set Password",
                                 command=self.set_account_password)
    password_button.grid(row=0, column=2, padx=10)

    # Row 1: template to send.
    ttk.Label(options, text="Template:").grid(row=1, column=0, sticky=tk.W, pady=2)
    self.send_template_var = tk.StringVar()
    self.send_template_combo = ttk.Combobox(options, textvariable=self.send_template_var,
                                            width=30)
    self.send_template_combo.grid(row=1, column=1, padx=5, pady=2, sticky=tk.W)

    # Row 2: how recipients are chosen.
    ttk.Label(options, text="Recipients:").grid(row=2, column=0, sticky=tk.W, pady=2)
    self.recipient_type_var = tk.StringVar(value="group")
    mode_row = ttk.Frame(options)
    mode_row.grid(row=2, column=1, columnspan=2, padx=5, pady=2, sticky=tk.W)
    # (caption, stored value, extra pack options)
    mode_choices = (
        ("Group", "group", {}),
        ("All Contacts", "all", {'padx': 10}),
        ("Single Email", "single", {'padx': 10}),
    )
    for caption, stored_value, pack_extra in mode_choices:
        choice = ttk.Radiobutton(mode_row, text=caption, variable=self.recipient_type_var,
                                 value=stored_value, command=self.update_recipient_options)
        choice.pack(side=tk.LEFT, **pack_extra)

    # Row 3: detail widgets, rebuilt whenever the mode changes.
    self.recipient_frame = ttk.Frame(options)
    self.recipient_frame.grid(row=3, column=0, columnspan=3, sticky=tk.W, pady=5)
    self.recipient_group_var = tk.StringVar()
    self.recipient_email_var = tk.StringVar()
    self.update_recipient_options()

    # ---- Attachments ----
    attachments_box = ttk.LabelFrame(container, text="Attachments", padding="5")
    attachments_box.pack(fill=tk.X, pady=(0, 10))

    self.attachments_list = tk.Listbox(attachments_box, height=3)
    self.attachments_list.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))

    attachment_actions = ttk.Frame(attachments_box)
    attachment_actions.pack(side=tk.RIGHT, fill=tk.Y)
    for caption, handler in (("Add File", self.add_attachment),
                             ("Remove", self.remove_attachment)):
        ttk.Button(attachment_actions, text=caption, command=handler).pack(pady=2)

    # ---- Send button and progress display ----
    action_row = ttk.Frame(container)
    action_row.pack(fill=tk.X, pady=10)

    send_button = ttk.Button(action_row, text="Send Emails", command=self.send_emails,
                             style="Accent.TButton")
    send_button.pack(side=tk.LEFT, padx=5)

    self.progress_var = tk.StringVar(value="Ready to send")
    status_label = ttk.Label(action_row, textvariable=self.progress_var)
    status_label.pack(side=tk.LEFT, padx=20)

    self.progress_bar = ttk.Progressbar(action_row, length=200, mode='determinate')
    self.progress_bar.pack(side=tk.RIGHT, padx=5)
def setup_history_tab(self):
    """Build the "History" tab: a table of sent emails and a refresh button."""
    container = ttk.Frame(self.history_frame, padding="10")
    container.pack(fill=tk.BOTH, expand=True)

    data_columns = ('Recipient', 'Subject', 'Date', 'Status')
    tree = ttk.Treeview(container, columns=data_columns, show='tree headings')
    # The implicit first column carries the ID; the others are self-titled.
    tree.heading('#0', text='ID')
    for column in data_columns:
        tree.heading(column, text=column)
    tree.pack(fill=tk.BOTH, expand=True)
    self.history_tree = tree

    refresh_button = ttk.Button(container, text="Refresh History",
                                command=self.refresh_history)
    refresh_button.pack(pady=10)
def auto_detect_smtp(self, event=None):
"""Auto-detect SMTP settings based on email"""
email = self.account_email_var.get()
if '@' in email:
settings = self.email_sender.get_smtp_settings(email)
if settings:
self.smtp_server_var.set(settings.get('smtp', ''))
self.smtp_port_var.set(str(settings.get('port', 587)))
def add_account(self):
    """Validate the account form and store the account in the database.

    Shows an error dialog for a non-numeric port, a missing required field,
    or a rejected insert; on success the form is cleared and the account
    list refreshed.
    """
    account_name = self.account_name_var.get().strip()
    address = self.account_email_var.get().strip()
    host = self.smtp_server_var.get().strip()

    # The port is checked first, before the required-field check.
    try:
        port = int(self.smtp_port_var.get())
    except ValueError:
        messagebox.showerror("Error", "Invalid port number")
        return

    if not (account_name and address and host):
        messagebox.showerror("Error", "Please fill in all required fields")
        return

    if not self.db.add_email_account(account_name, address, host, port):
        messagebox.showerror("Error", "Account name already exists")
        return

    messagebox.showinfo("Success", "Email account added successfully")
    self.clear_account_form()
    self.refresh_accounts()
def test_account_connection(self):
    """Prompt for a password and test the SMTP settings currently in the form.

    Shows an error dialog for a non-numeric port or a missing email/server;
    otherwise reports the outcome of ``EmailSender.test_connection``.
    Cancelling the password prompt aborts silently.
    """
    # simpledialog is not among the module-level tkinter imports, so it is
    # imported here; without this the askstring call raises NameError.
    from tkinter import simpledialog

    email = self.account_email_var.get().strip()
    smtp_server = self.smtp_server_var.get().strip()
    try:
        smtp_port = int(self.smtp_port_var.get())
    except ValueError:
        messagebox.showerror("Error", "Invalid port number")
        return

    if not all([email, smtp_server]):
        messagebox.showerror("Error", "Please enter email and SMTP server")
        return

    # Get password
    password = simpledialog.askstring("Password", f"Enter password for {email}:", show='*')
    if not password:
        return

    account = EmailAccount(email, password, smtp_server, smtp_port)
    success, message = self.email_sender.test_connection(account, password)
    if success:
        messagebox.showinfo("Success", message)
    else:
        messagebox.showerror("Error", message)
def clear_account_form(self):
"""Clear account form"""
self.account_name_var.set("")
self.account_email_var.set("")
self.smtp_server_var.set("")
self.smtp_port_var.set("587")
def save_template(self):
    """Validate the template form and store the template in the database.

    Placeholder names of the form ``{word}`` found in the subject and body
    are saved alongside the template. Shows an error dialog when a field is
    empty or the insert is rejected.
    """
    name = self.template_name_var.get().strip()
    subject = self.template_subject_var.get().strip()
    body = self.template_body.get("1.0", tk.END).strip()
    is_html = self.template_html_var.get()

    if not all([name, subject, body]):
        messagebox.showerror("Error", "Please fill in all fields")
        return

    # Extract variables from template. dict.fromkeys removes duplicates
    # while keeping first-appearance order; a set would make the stored and
    # displayed order vary from run to run.
    found = re.findall(r'\{(\w+)\}', subject + ' ' + body)
    variables = list(dict.fromkeys(found))

    if self.db.add_template(name, subject, body, is_html, variables):
        messagebox.showinfo("Success", f"Template saved with variables: {', '.join(variables)}")
        self.clear_template_form()
        self.refresh_templates()
    else:
        messagebox.showerror("Error", "Template name already exists")
def load_template(self, event):
"""Load selected template"""
selection = self.templates_tree.selection()
if selection:
item = self.templates_tree.item(selection[0])
template_name = item['text']
templates = self.db.get_templates()
for template in templates:
if template['name'] == template_name:
self.template_name_var.set(template['name'])
self.template_subject_var.set(template['subject'])
self.template_body.delete(1.0, tk.END)
self.template_body.insert(1.0, template['body'])
self.template_html_var.set(template['is_html'])
break
def clear_template_form(self):
"""Clear template form"""
self.template_name_var.set("")
self.template_subject_var.set("")
self.template_body.delete(1.0, tk.END)
self.template_html_var.set(False)
def import_contacts_csv(self):
    """Let the user pick a CSV file and import its contacts into a group.

    The group comes from the import form and falls back to "default" when
    blank. Each parsed row is also stored as the contact's custom fields.
    Rows rejected by the database are not counted.
    """
    chosen_path = filedialog.askopenfilename(
        title="Select CSV file",
        filetypes=[("CSV files", "*.csv"), ("All files", "*.*")]
    )
    if not chosen_path:
        return

    parsed = self.email_sender.parse_csv_contacts(chosen_path)
    group_name = self.import_group_var.get().strip() or "default"

    # add_contact returns True only for rows that were actually inserted.
    added_count = sum(
        1
        for entry in parsed
        if self.db.add_contact(entry.get('name', ''), entry['email'], group_name, entry)
    )

    messagebox.showinfo("Import Complete",
                        f"Added {added_count} contacts to group '{group_name}'")
    self.refresh_contacts()
def add_contact(self):
    """Add the contact typed into the form to the "default" group.

    An empty name falls back to the part of the address before ``@``.
    Shows an error dialog when the email is empty or the insert is rejected.
    """
    address = self.contact_email_var.get().strip()
    display_name = self.contact_name_var.get().strip()

    if not address:
        messagebox.showerror("Error", "Please enter an email address")
        return

    display_name = display_name or address.split('@')[0]

    if not self.db.add_contact(display_name, address, "default"):
        messagebox.showerror("Error", "Contact already exists")
        return

    messagebox.showinfo("Success", "Contact added successfully")
    for field in (self.contact_name_var, self.contact_email_var):
        field.set("")
    self.refresh_contacts()
def update_recipient_options(self):
    """Rebuild the recipient detail widgets for the selected recipient mode.

    "group" shows a combobox of contact groups (first group preselected),
    "single" shows an email entry, and any other mode shows nothing.
    """
    # Start from an empty detail area.
    for child in self.recipient_frame.winfo_children():
        child.destroy()

    mode = self.recipient_type_var.get()
    if mode == "group":
        ttk.Label(self.recipient_frame, text="Group:").pack(side=tk.LEFT)
        chooser = ttk.Combobox(self.recipient_frame,
                               textvariable=self.recipient_group_var, width=20)
        chooser.pack(side=tk.LEFT, padx=5)

        # Offer the groups currently present in the database.
        available = self.db.get_contact_groups()
        chooser['values'] = available
        if available:
            self.recipient_group_var.set(available[0])
    elif mode == "single":
        ttk.Label(self.recipient_frame, text="Email:").pack(side=tk.LEFT)
        address_entry = ttk.Entry(self.recipient_frame,
                                  textvariable=self.recipient_email_var, width=30)
        address_entry.pack(side=tk.LEFT, padx=5)
def set_account_password(self):
    """Prompt for the selected account's password and arm it for sending.

    Stores the password in ``self.current_password`` and, when the selected
    name matches a stored account, builds ``self.current_account`` from it;
    ``send_emails`` requires both. Cancelling the prompt changes nothing.
    """
    # simpledialog is not among the module-level tkinter imports, so it is
    # imported here; without this the askstring call raises NameError.
    from tkinter import simpledialog

    account_name = self.send_account_var.get()
    if not account_name:
        messagebox.showerror("Error", "Please select an account")
        return

    password = simpledialog.askstring("Password", f"Enter password for {account_name}:", show='*')
    if not password:
        return

    self.current_password = password

    # NOTE(review): presumably the account combobox holds stored account
    # names — confirm against the code that fills it. When no stored account
    # matches, current_account is left as it was.
    for stored in self.db.get_email_accounts():
        if stored['name'] == account_name:
            self.current_account = EmailAccount(
                stored['email'], password,
                stored['smtp_server'], stored['smtp_port'],
                stored['imap_server'] or "", stored['imap_port'],
                stored['use_tls'],
            )
            break

    messagebox.showinfo("Success", "Password set successfully")
def add_attachment(self):
    """Ask for a file and append its path to the attachment list."""
    chosen_path = filedialog.askopenfilename(title="Select attachment")
    if not chosen_path:
        # Dialog was cancelled.
        return
    self.attachments_list.insert(tk.END, chosen_path)
def remove_attachment(self):
"""Remove selected attachment"""
selection = self.attachments_list.curselection()
if selection:
self.attachments_list.delete(selection[0])
def send_emails(self):
    """Validate the send form and start a bulk send on a background thread.

    Shows an error dialog and stops when the account or password is not
    set, no template is chosen, the template is not in the database, or no
    recipients result from the selected mode. Progress and completion are
    reported back through ``root.after``.
    """
    if not (self.current_account and self.current_password):
        messagebox.showerror("Error", "Please select account and set password")
        return

    template_name = self.send_template_var.get()
    if not template_name:
        messagebox.showerror("Error", "Please select a template")
        return

    # Look up the chosen template by name.
    stored = next(
        (row for row in self.db.get_templates() if row['name'] == template_name),
        None,
    )
    template = None
    if stored is not None:
        template = EmailTemplate(stored['name'], stored['subject'], stored['body'],
                                 stored['is_html'], stored['variables'])
    if not template:
        messagebox.showerror("Error", "Template not found")
        return

    # Resolve recipients according to the selected mode.
    mode = self.recipient_type_var.get()
    recipients = []
    if mode == "group":
        recipients = self.db.get_contacts(self.recipient_group_var.get())
    elif mode == "all":
        recipients = self.db.get_contacts()
    elif mode == "single":
        address = self.recipient_email_var.get().strip()
        if address:
            recipients = [{'name': address.split('@')[0], 'email': address}]

    if not recipients:
        messagebox.showerror("Error", "No recipients found")
        return

    # Snapshot the attachment paths currently listed.
    attachment_count = self.attachments_list.size()
    attachments = [self.attachments_list.get(index) for index in range(attachment_count)]

    def report_progress(progress, email, success):
        # Called on the worker thread; hand the update to the Tk event loop.
        self.root.after(0, self.update_send_progress, progress, email, success)

    def worker():
        outcome = self.email_sender.send_bulk_emails(
            self.current_account, self.current_password, recipients,
            template, attachments, report_progress
        )
        self.root.after(0, self.send_complete, outcome)

    threading.Thread(target=worker, daemon=True).start()

    # Reset the progress display for the new run.
    self.progress_bar['value'] = 0
    self.progress_var.set("Sending emails...")
def update_send_progress(self, progress, email, success):
    """Show the outcome for one recipient and move the progress bar.

    ``progress`` becomes the bar's value, ``email`` is echoed in the status
    text and ``success`` selects the check or cross mark.
    """
    if success:
        mark = "✓"
    else:
        mark = "✗"
    self.progress_bar.configure(value=progress)
    self.progress_var.set(f"Sending to {email} {mark}")
    # Repaint now so the change is visible while the send is still running.
    self.root.update_idletasks()
def send_complete(self, results):
    """Present the final tally of a bulk send and refresh the history view.

    ``results`` is read for its 'sent', 'failed' and 'errors' entries. At most
    five error lines are listed; any further ones are summarised by count.
    """
    sent = results['sent']
    failed = results['failed']
    errors = results['errors']

    self.progress_bar.configure(value=100)
    self.progress_var.set(f"Complete: {sent} sent, {failed} failed")

    pieces = [f"Email sending complete:\n\nSent: {sent}\nFailed: {failed}"]
    if errors:
        shown = errors[:5]
        pieces.append("\n\nErrors:\n" + "\n".join(shown))
        hidden = len(errors) - 5
        if hidden > 0:
            pieces.append(f"\n... and {hidden} more errors")

    messagebox.showinfo("Send Complete", "".join(pieces))
    self.refresh_history()
def refresh_data(self):
    """Reload the account, template and contact views, in that order."""
    reloaders = (
        self.refresh_accounts,
        self.refresh_templates,
        self.refresh_contacts,
    )
    for reload_view in reloaders:
        reload_view()
def refresh_accounts(self):
    """Reload the accounts table and the "From Account" choices.

    When accounts exist and no sender is selected yet, the first account is
    selected and stored in ``self.current_account`` with an empty password.
    """
    tree = self.accounts_tree
    tree.delete(*tree.get_children())

    rows = self.db.get_email_accounts()
    names = []
    for row in rows:
        details = (row['email'], row['smtp_server'], row['smtp_port'])
        tree.insert('', tk.END, text=row['name'], values=details)
        names.append(row['name'])

    # Keep the Send tab's combobox in step with the table.
    self.send_account_combo['values'] = names

    if not names or self.send_account_var.get():
        return

    default_name = names[0]
    self.send_account_var.set(default_name)
    chosen = next((row for row in rows if row['name'] == default_name), None)
    if chosen is not None:
        self.current_account = EmailAccount(
            chosen['email'], "", chosen['smtp_server'],
            chosen['smtp_port'], chosen.get('imap_server', ''),
            chosen['imap_port'], chosen['use_tls']
        )
def refresh_templates(self):
    """Reload the templates table and the template choices on the Send tab."""
    tree = self.templates_tree
    tree.delete(*tree.get_children())

    names = []
    for entry in self.db.get_templates():
        if entry['is_html']:
            kind = "HTML"
        else:
            kind = "Text"
        tree.insert('', tk.END, text=entry['name'],
                    values=(entry['subject'], kind))
        names.append(entry['name'])

    self.send_template_combo['values'] = names
def refresh_contacts(self):
    """Reload the contacts table from the database."""
    tree = self.contacts_tree
    existing = tree.get_children()
    tree.delete(*existing)

    for person in self.db.get_contacts():
        columns = (person['email'], person['group_name'])
        tree.insert('', tk.END, text=person['name'], values=columns)
def refresh_history(self):
    """Reload the History tab from the ``sent_emails`` table, newest first.

    Each row shows the log id, recipient address, subject, send date and a
    "Sent"/"Failed" status derived from the stored success flag.
    """
    tree = self.history_tree
    tree.delete(*tree.get_children())

    conn = sqlite3.connect(self.db.db_path)
    try:
        rows = conn.execute(
            "SELECT id, recipient_email, subject, sent_date, success "
            "FROM sent_emails ORDER BY sent_date DESC, id DESC"
        ).fetchall()
    finally:
        # Close explicitly so every refresh does not leave a connection open.
        conn.close()

    for log_id, recipient, subject, sent_date, success in rows:
        status = "Sent" if success else "Failed"
        tree.insert('', tk.END, text=str(log_id),
                    values=(recipient, subject, sent_date, status))
def run(self):
    """Enter the Tk main loop; returns once the loop exits."""
    window = self.root
    window.mainloop()
def main():
    """Build the GUI and hand control to its event loop."""
    EmailGUI().run()


# Start the application only when executed as a script.
if __name__ == "__main__":
    main()
# Automated Email Sender
import smtplib
import imaplib
import email
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import csv
import json
import schedule
import time
import sqlite3
import datetime
from typing import List, Dict, Optional, Tuple
import tkinter as tk
from tkinter import ttk, scrolledtext, filedialog, messagebox
import threading
from pathlib import Path
import re
import ssl
import socket
from dataclasses import dataclass
import logging
# Root logging setup: INFO and above, timestamped, with the level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by the code below.
logger = logging.getLogger(__name__)
@dataclass
class EmailAccount:
    """Connection settings for one mailbox."""

    # Address used to log in and as the sender.
    email: str
    # Password field; the GUI code in this file fills it with "" or the
    # value typed into the password prompt.
    password: str
    # Outgoing server host and port.
    smtp_server: str
    smtp_port: int
    # Incoming server host and port (host defaults to empty).
    imap_server: str = ""
    imap_port: int = 993
    # Whether STARTTLS is issued on a plain SMTP connection.
    use_tls: bool = True
@dataclass
class EmailTemplate:
    """A reusable message: name, subject line and body text."""

    # Name the template is looked up by.
    name: str
    # Subject and body may contain ``{placeholder}`` variables.
    subject: str
    body: str
    # True when ``body`` is HTML rather than plain text.
    is_html: bool = False
    # Placeholder names used by the template. The default is None, so the
    # annotation says Optional instead of claiming a list is always present.
    variables: Optional[List[str]] = None
class EmailDatabase:
    """SQLite-backed storage for accounts, templates, contacts and send logs.

    Every method opens its own connection to ``db_path`` and closes it before
    returning.
    """

    def __init__(self, db_path: str = "email_system.db"):
        """Remember the database location and create any missing tables."""
        self.db_path = db_path
        self.init_database()

    def _connect(self):
        """Return a context manager whose exit closes the connection.

        ``with sqlite3.connect(...)`` on its own only ends the transaction and
        leaves the connection open, so the connection is wrapped in
        ``contextlib.closing``. Writers must call ``commit()`` themselves.
        """
        from contextlib import closing  # local import keeps the class self-contained

        return closing(sqlite3.connect(self.db_path))

    def init_database(self):
        """Create the five application tables if they do not exist yet."""
        schema = (
            # Email accounts table
            """
            CREATE TABLE IF NOT EXISTS email_accounts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL,
                email TEXT NOT NULL,
                smtp_server TEXT NOT NULL,
                smtp_port INTEGER NOT NULL,
                imap_server TEXT,
                imap_port INTEGER DEFAULT 993,
                use_tls BOOLEAN DEFAULT 1,
                created_date TEXT NOT NULL
            )
            """,
            # Email templates table
            """
            CREATE TABLE IF NOT EXISTS email_templates (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL,
                subject TEXT NOT NULL,
                body TEXT NOT NULL,
                is_html BOOLEAN DEFAULT 0,
                variables TEXT,
                created_date TEXT NOT NULL,
                last_used TEXT
            )
            """,
            # Recipients/contacts table
            """
            CREATE TABLE IF NOT EXISTS contacts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT NOT NULL,
                group_name TEXT,
                custom_fields TEXT,
                added_date TEXT NOT NULL,
                UNIQUE(email, group_name)
            )
            """,
            # Sent emails log table
            """
            CREATE TABLE IF NOT EXISTS sent_emails (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                account_id INTEGER NOT NULL,
                template_id INTEGER,
                recipient_email TEXT NOT NULL,
                recipient_name TEXT,
                subject TEXT NOT NULL,
                sent_date TEXT NOT NULL,
                success BOOLEAN NOT NULL,
                error_message TEXT,
                FOREIGN KEY (account_id) REFERENCES email_accounts (id),
                FOREIGN KEY (template_id) REFERENCES email_templates (id)
            )
            """,
            # Scheduled emails table
            """
            CREATE TABLE IF NOT EXISTS scheduled_emails (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                account_id INTEGER NOT NULL,
                template_id INTEGER NOT NULL,
                recipient_group TEXT,
                schedule_time TEXT NOT NULL,
                repeat_type TEXT,
                next_send_time TEXT NOT NULL,
                active BOOLEAN DEFAULT 1,
                created_date TEXT NOT NULL,
                FOREIGN KEY (account_id) REFERENCES email_accounts (id),
                FOREIGN KEY (template_id) REFERENCES email_templates (id)
            )
            """,
        )
        with self._connect() as conn:
            cursor = conn.cursor()
            for statement in schema:
                cursor.execute(statement)
            conn.commit()

    def add_email_account(self, name: str, email: str, smtp_server: str, smtp_port: int,
                          imap_server: str = "", imap_port: int = 993,
                          use_tls: bool = True) -> bool:
        """Store an account configuration.

        Returns:
            True when the row was inserted, False when the insert violated a
            constraint (the account name is declared UNIQUE).
        """
        with self._connect() as conn:
            try:
                conn.execute("""
                    INSERT INTO email_accounts (name, email, smtp_server, smtp_port,
                                                imap_server, imap_port, use_tls, created_date)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (name, email, smtp_server, smtp_port, imap_server, imap_port,
                      use_tls, datetime.datetime.now().isoformat()))
                conn.commit()
                return True
            except sqlite3.IntegrityError:
                return False

    def get_email_accounts(self) -> List[Dict]:
        """Return every stored account as a dict, ordered by account name."""
        # Columns are named explicitly so the mapping below does not depend on
        # the physical column order of the table.
        columns = ('id', 'name', 'email', 'smtp_server', 'smtp_port',
                   'imap_server', 'imap_port', 'use_tls', 'created_date')
        with self._connect() as conn:
            rows = conn.execute(
                f"SELECT {', '.join(columns)} FROM email_accounts ORDER BY name"
            ).fetchall()

        accounts = []
        for row in rows:
            account = dict(zip(columns, row))
            account['use_tls'] = bool(account['use_tls'])
            accounts.append(account)
        return accounts

    def add_template(self, name: str, subject: str, body: str, is_html: bool = False,
                     variables: Optional[List[str]] = None) -> bool:
        """Store a template; ``variables`` is saved as a JSON list.

        Returns:
            True when the row was inserted, False when the insert violated a
            constraint (the template name is declared UNIQUE).
        """
        variables_json = json.dumps(variables) if variables else "[]"
        with self._connect() as conn:
            try:
                conn.execute("""
                    INSERT INTO email_templates (name, subject, body, is_html, variables, created_date)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (name, subject, body, is_html, variables_json,
                      datetime.datetime.now().isoformat()))
                conn.commit()
                return True
            except sqlite3.IntegrityError:
                return False

    def get_templates(self) -> List[Dict]:
        """Return every stored template as a dict, ordered by template name."""
        columns = ('id', 'name', 'subject', 'body', 'is_html',
                   'variables', 'created_date', 'last_used')
        with self._connect() as conn:
            rows = conn.execute(
                f"SELECT {', '.join(columns)} FROM email_templates ORDER BY name"
            ).fetchall()

        templates = []
        for row in rows:
            template = dict(zip(columns, row))
            template['is_html'] = bool(template['is_html'])
            raw_variables = template['variables']
            template['variables'] = json.loads(raw_variables) if raw_variables else []
            templates.append(template)
        return templates

    def add_contact(self, name: str, email: str, group_name: str = "default",
                    custom_fields: Optional[Dict] = None) -> bool:
        """Store a contact; ``custom_fields`` is saved as a JSON object.

        Returns:
            True when the row was inserted, False when the insert violated a
            constraint (``(email, group_name)`` is declared UNIQUE).
        """
        custom_fields_json = json.dumps(custom_fields) if custom_fields else "{}"
        with self._connect() as conn:
            try:
                conn.execute("""
                    INSERT INTO contacts (name, email, group_name, custom_fields, added_date)
                    VALUES (?, ?, ?, ?, ?)
                """, (name, email, group_name, custom_fields_json,
                      datetime.datetime.now().isoformat()))
                conn.commit()
                return True
            except sqlite3.IntegrityError:
                return False

    def get_contacts(self, group_name: Optional[str] = None) -> List[Dict]:
        """Return contacts as dicts.

        With a non-empty ``group_name`` only that group is returned, ordered by
        name; otherwise all contacts are returned ordered by group, then name.
        """
        columns = ('id', 'name', 'email', 'group_name', 'custom_fields', 'added_date')
        select = f"SELECT {', '.join(columns)} FROM contacts"
        with self._connect() as conn:
            if group_name:
                cursor = conn.execute(
                    select + " WHERE group_name = ? ORDER BY name", (group_name,)
                )
            else:
                cursor = conn.execute(select + " ORDER BY group_name, name")
            rows = cursor.fetchall()

        contacts = []
        for row in rows:
            contact = dict(zip(columns, row))
            raw_fields = contact['custom_fields']
            contact['custom_fields'] = json.loads(raw_fields) if raw_fields else {}
            contacts.append(contact)
        return contacts

    def get_contact_groups(self) -> List[str]:
        """Return the distinct ``group_name`` values of all contacts, sorted."""
        with self._connect() as conn:
            rows = conn.execute(
                "SELECT DISTINCT group_name FROM contacts ORDER BY group_name"
            ).fetchall()
        return [row[0] for row in rows]

    def log_sent_email(self, account_id: int, template_id: int, recipient_email: str,
                       recipient_name: str, subject: str, success: bool,
                       error_message: Optional[str] = None):
        """Append one delivery attempt to ``sent_emails``, stamped with the current time."""
        with self._connect() as conn:
            conn.execute("""
                INSERT INTO sent_emails (account_id, template_id, recipient_email,
                                         recipient_name, subject, sent_date, success, error_message)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (account_id, template_id, recipient_email, recipient_name, subject,
                  datetime.datetime.now().isoformat(), success, error_message))
            conn.commit()
class EmailSender:
    """Compose messages and deliver them over SMTP, singly or in bulk."""

    # Seconds to wait on any SMTP socket operation before giving up, so a
    # dead server cannot hang the calling thread indefinitely.
    SMTP_TIMEOUT = 30

    def __init__(self, database: "EmailDatabase"):
        """Keep a reference to the database and set up the provider table."""
        self.db = database
        self.common_providers = {
            'gmail.com': {'smtp': 'smtp.gmail.com', 'port': 587, 'imap': 'imap.gmail.com'},
            'outlook.com': {'smtp': 'smtp-mail.outlook.com', 'port': 587, 'imap': 'outlook.office365.com'},
            'hotmail.com': {'smtp': 'smtp-mail.outlook.com', 'port': 587, 'imap': 'outlook.office365.com'},
            'yahoo.com': {'smtp': 'smtp.mail.yahoo.com', 'port': 587, 'imap': 'imap.mail.yahoo.com'},
            'icloud.com': {'smtp': 'smtp.mail.me.com', 'port': 587, 'imap': 'imap.mail.me.com'},
        }

    def get_smtp_settings(self, email: str) -> Dict:
        """Return the known server settings for the address's domain.

        Returns an empty dict when the address has no ``@`` or the domain is
        not in ``common_providers``.
        """
        _, separator, domain = email.rpartition('@')
        if not separator:
            return {}
        return self.common_providers.get(domain.strip().lower(), {})

    def _open_connection(self, account: "EmailAccount"):
        """Connect to the account's SMTP server and return the session.

        Port 465 uses implicit TLS; any other port connects in plain text and
        upgrades with STARTTLS when ``account.use_tls`` is set. Both paths use
        a default SSL context so the server certificate is verified.
        """
        context = ssl.create_default_context()
        if account.smtp_port == 465:
            return smtplib.SMTP_SSL(account.smtp_server, account.smtp_port,
                                    timeout=self.SMTP_TIMEOUT, context=context)

        server = smtplib.SMTP(account.smtp_server, account.smtp_port,
                              timeout=self.SMTP_TIMEOUT)
        try:
            if account.use_tls:
                server.starttls(context=context)
        except Exception:
            # Do not leak the socket when the TLS upgrade fails.
            server.close()
            raise
        return server

    @staticmethod
    def _close_connection(server) -> None:
        """End the SMTP session, falling back to a hard close if QUIT fails."""
        try:
            server.quit()
        except (smtplib.SMTPException, OSError):
            server.close()

    def test_connection(self, account: "EmailAccount", password: str) -> Tuple[bool, str]:
        """Try to connect and log in with the given credentials.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, reason)``.
            No exception escapes this method.
        """
        try:
            server = self._open_connection(account)
            try:
                server.login(account.email, password)
            finally:
                self._close_connection(server)
            return True, "Connection successful"
        except smtplib.SMTPAuthenticationError:
            return False, "Authentication failed - check email and password"
        except smtplib.SMTPConnectError:
            return False, f"Cannot connect to SMTP server {account.smtp_server}:{account.smtp_port}"
        except socket.gaierror:
            return False, f"Invalid SMTP server address: {account.smtp_server}"
        except Exception as e:
            return False, f"Connection error: {str(e)}"

    def _build_message(self, account: "EmailAccount", to_email: str, subject: str,
                       body: str, is_html: bool, attachments: Optional[List[str]]):
        """Assemble the MIME message for one recipient."""
        # Attachments belong in a multipart/mixed container. Inside
        # multipart/alternative they would be treated as alternative renderings
        # of the body, so that subtype is used only for attachment-free HTML.
        subtype = 'alternative' if is_html and not attachments else 'mixed'
        msg = MIMEMultipart(subtype)
        msg['From'] = account.email
        msg['To'] = to_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'html' if is_html else 'plain'))

        for file_path in attachments or []:
            path = Path(file_path)
            if not path.is_file():
                logger.warning("Attachment not found, skipping: %s", file_path)
                continue
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(path.read_bytes())
            encoders.encode_base64(part)
            # Passing filename as a parameter lets the email package quote or
            # encode names containing spaces or non-ASCII characters.
            part.add_header('Content-Disposition', 'attachment', filename=path.name)
            msg.attach(part)
        return msg

    def send_email(self, account: "EmailAccount", password: str, to_email: str,
                   subject: str, body: str, is_html: bool = False,
                   attachments: Optional[List[str]] = None,
                   to_name: str = "") -> Tuple[bool, str]:
        """Send one message.

        Attachment paths that are not existing files are skipped. ``to_name``
        is accepted for interface compatibility and is not used.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, reason)``.
            No exception escapes this method.
        """
        try:
            msg = self._build_message(account, to_email, subject, body,
                                      is_html, attachments)
            server = self._open_connection(account)
            try:
                server.login(account.email, password)
                server.send_message(msg)
            finally:
                # Release the connection whether or not the send succeeded.
                self._close_connection(server)
            return True, "Email sent successfully"
        except Exception as e:
            return False, f"Failed to send email: {str(e)}"

    def send_bulk_emails(self, account: "EmailAccount", password: str,
                         recipients: List[Dict], template: "EmailTemplate",
                         attachments: Optional[List[str]] = None,
                         progress_callback=None, delay: float = 1.0) -> Dict:
        """Send the template to every recipient, one message each.

        Args:
            recipients: Dicts with an 'email' entry; every entry is also
                available as a template variable.
            progress_callback: Optional ``callback(percent, email, success)``
                invoked after each recipient, including failed ones.
            delay: Seconds to pause between consecutive messages.

        Returns:
            ``{'sent': int, 'failed': int, 'errors': [str, ...]}``.
        """
        results = {'sent': 0, 'failed': 0, 'errors': []}
        total = len(recipients)

        for position, recipient in enumerate(recipients, start=1):
            address = recipient.get('email', '')
            success = False
            try:
                subject = self.replace_variables(template.subject, recipient)
                body = self.replace_variables(template.body, recipient)
                success, message = self.send_email(
                    account, password, address, subject, body,
                    template.is_html, attachments, recipient.get('name', '')
                )
                if success:
                    results['sent'] += 1
                else:
                    results['failed'] += 1
                    results['errors'].append(f"{address}: {message}")
            except Exception as e:
                results['failed'] += 1
                results['errors'].append(f"{address}: {str(e)}")

            if progress_callback:
                try:
                    progress_callback(position / total * 100, address, success)
                except Exception:
                    # A broken observer must not abort the rest of the batch.
                    logger.exception("Progress callback failed")

            # Pause between messages only; there is nothing to wait for after
            # the last one.
            if delay > 0 and position < total:
                time.sleep(delay)

        return results

    def replace_variables(self, text: str, data: Dict) -> str:
        """Substitute ``{key}`` placeholders in ``text`` with values from ``data``.

        Entries of nested dict values (such as a contact's 'custom_fields')
        are available as variables too; a top-level key wins over a nested key
        of the same name. Unknown placeholders are left untouched.
        """
        values = {}
        for value in data.values():
            if isinstance(value, dict):
                for nested_key, nested_value in value.items():
                    values.setdefault(nested_key, nested_value)
        values.update(data)

        for key, value in values.items():
            text = text.replace(f"{{{key}}}", str(value))
        return text

    def parse_csv_contacts(self, csv_file_path: str) -> List[Dict]:
        """Read contacts from a CSV file that has a header row.

        Cell values are stripped and empty cells dropped. Rows without an
        'email' value are skipped. A missing 'name' defaults to the part of
        the address before ``@``. Read errors are logged, and whatever was
        parsed up to that point is returned.
        """
        contacts = []
        try:
            # utf-8-sig also accepts plain UTF-8 and removes a leading BOM that
            # would otherwise become part of the first header name.
            with open(csv_file_path, 'r', newline='', encoding='utf-8-sig') as file:
                for row in csv.DictReader(file):
                    clean_row = {}
                    for key, value in row.items():
                        # DictReader files surplus cells under a None key and
                        # fills missing cells with None; neither is a usable field.
                        if key is None or not isinstance(value, str):
                            continue
                        value = value.strip()
                        if value:
                            clean_row[key.strip()] = value
                    if clean_row.get('email'):
                        clean_row.setdefault('name', clean_row['email'].split('@')[0])
                        contacts.append(clean_row)
        except (OSError, UnicodeError, csv.Error) as e:
            logger.error("Error parsing CSV: %s", e)
        return contacts
class EmailScheduler:
    """Runs pending ``schedule`` jobs on a background thread."""

    def __init__(self, email_sender: "EmailSender"):
        """Store the sender; the scheduler starts in the stopped state."""
        self.email_sender = email_sender
        self.running = False
        self.scheduler_thread = None
        # Set by stop_scheduler() to wake the loop out of its wait immediately.
        self._stop_event = threading.Event()

    def start_scheduler(self):
        """Start the background loop; does nothing when already running."""
        if self.running:
            return
        self._stop_event.clear()
        self.running = True
        self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
        self.scheduler_thread.start()

    def stop_scheduler(self):
        """Ask the loop to stop and wait up to five seconds for the thread."""
        self.running = False
        self._stop_event.set()
        if self.scheduler_thread:
            self.scheduler_thread.join(timeout=5)

    def _run_scheduler(self):
        """Run pending jobs once a minute until asked to stop."""
        while self.running:
            schedule.run_pending()
            # Waiting on the event instead of sleeping lets stop_scheduler()
            # end the loop at once rather than after the rest of the minute.
            if self._stop_event.wait(60):
                break

    def schedule_email(self, account_id: int, template_id: int,
                       recipient_group: str, schedule_time: str,
                       repeat_type: str = "none"):
        """Placeholder for scheduling an email; currently does nothing."""
        # Implementation would depend on specific scheduling requirements
        pass
class EmailGUI:
"""GUI for email sender application"""
def __init__(self):
    """Create the back-end objects and the main window, then load the data."""
    # Back-end services come first: building the widgets already queries them.
    database = EmailDatabase()
    sender = EmailSender(database)
    self.db = database
    self.email_sender = sender
    self.scheduler = EmailScheduler(sender)

    window = tk.Tk()
    window.title("Automated Email Sender")
    window.geometry("1000x700")
    self.root = window

    # Nothing is selected until the account list has been loaded.
    self.current_account = None
    self.current_password = None

    self.setup_ui()
    self.refresh_data()
def setup_ui(self):
    """Create the tabbed notebook and build every tab in display order."""
    self.notebook = ttk.Notebook(self.root)
    self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

    # (attribute holding the tab's frame, tab caption, builder for its contents)
    tabs = (
        ("accounts_frame", "Email Accounts", self.setup_accounts_tab),
        ("templates_frame", "Templates", self.setup_templates_tab),
        ("contacts_frame", "Contacts", self.setup_contacts_tab),
        ("send_frame", "Send Email", self.setup_send_tab),
        ("history_frame", "History", self.setup_history_tab),
    )
    for attribute, caption, build in tabs:
        page = ttk.Frame(self.notebook)
        setattr(self, attribute, page)
        self.notebook.add(page, text=caption)
        build()
def setup_accounts_tab(self):
    """Build the Email Accounts tab: the account table and the add/test form."""
    main_frame = ttk.Frame(self.accounts_frame, padding="10")
    main_frame.pack(fill=tk.BOTH, expand=True)

    # Account list
    list_frame = ttk.LabelFrame(main_frame, text="Email Accounts", padding="5")
    list_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))

    self.accounts_tree = ttk.Treeview(list_frame, columns=('Email', 'SMTP Server', 'Port'),
                                      show='tree headings', height=8)
    for column, title in (('#0', 'Account Name'), ('Email', 'Email'),
                          ('SMTP Server', 'SMTP Server'), ('Port', 'Port')):
        self.accounts_tree.heading(column, text=title)
    # Pack the table on the LEFT so the scrollbar sits beside it. With the
    # default side (TOP) plus expand, the table claims the full width and the
    # scrollbar ends up underneath it.
    self.accounts_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))

    accounts_scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL,
                                       command=self.accounts_tree.yview)
    accounts_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
    self.accounts_tree.configure(yscrollcommand=accounts_scrollbar.set)

    # Add account form
    form_frame = ttk.LabelFrame(main_frame, text="Add/Edit Account", padding="10")
    form_frame.pack(fill=tk.X)

    ttk.Label(form_frame, text="Account Name:").grid(row=0, column=0, sticky=tk.W, pady=2)
    self.account_name_var = tk.StringVar()
    ttk.Entry(form_frame, textvariable=self.account_name_var, width=25).grid(
        row=0, column=1, padx=5, pady=2, sticky=tk.W)

    ttk.Label(form_frame, text="Email:").grid(row=0, column=2, sticky=tk.W, pady=2)
    self.account_email_var = tk.StringVar()
    email_entry = ttk.Entry(form_frame, textvariable=self.account_email_var, width=30)
    email_entry.grid(row=0, column=3, padx=5, pady=2, sticky=tk.W)
    # Leaving the e-mail field triggers the provider lookup for server and port.
    email_entry.bind('<FocusOut>', self.auto_detect_smtp)

    ttk.Label(form_frame, text="SMTP Server:").grid(row=1, column=0, sticky=tk.W, pady=2)
    self.smtp_server_var = tk.StringVar()
    ttk.Entry(form_frame, textvariable=self.smtp_server_var, width=25).grid(
        row=1, column=1, padx=5, pady=2, sticky=tk.W)

    ttk.Label(form_frame, text="SMTP Port:").grid(row=1, column=2, sticky=tk.W, pady=2)
    self.smtp_port_var = tk.StringVar(value="587")
    ttk.Entry(form_frame, textvariable=self.smtp_port_var, width=10).grid(
        row=1, column=3, padx=5, pady=2, sticky=tk.W)

    # Buttons
    button_frame = ttk.Frame(form_frame)
    button_frame.grid(row=2, column=0, columnspan=4, pady=10)
    for caption, handler in (("Add Account", self.add_account),
                             ("Test Connection", self.test_account_connection),
                             ("Clear", self.clear_account_form)):
        ttk.Button(button_frame, text=caption, command=handler).pack(side=tk.LEFT, padx=5)
def setup_templates_tab(self):
    """Build the Templates tab: the template table above an editing form."""
    page = ttk.Frame(self.templates_frame, padding="10")
    page.pack(fill=tk.BOTH, expand=True)

    # --- table of stored templates -------------------------------------
    table_box = ttk.LabelFrame(page, text="Email Templates", padding="5")
    table_box.pack(fill=tk.BOTH, expand=True, pady=(0, 10))

    tree = ttk.Treeview(table_box, columns=('Subject', 'Type'),
                        show='tree headings', height=6)
    self.templates_tree = tree
    for column, title in (('#0', 'Template Name'), ('Subject', 'Subject'), ('Type', 'Type')):
        tree.heading(column, text=title)
    tree.pack(fill=tk.BOTH, expand=True)
    # Double-clicking a row loads it into the form below.
    tree.bind('<Double-1>', self.load_template)

    # --- editing form ---------------------------------------------------
    form = ttk.LabelFrame(page, text="Create/Edit Template", padding="10")
    form.pack(fill=tk.X)

    header_row = ttk.Frame(form)
    header_row.pack(fill=tk.X, pady=(0, 5))

    ttk.Label(header_row, text="Name:").pack(side=tk.LEFT)
    self.template_name_var = tk.StringVar()
    name_entry = ttk.Entry(header_row, textvariable=self.template_name_var, width=20)
    name_entry.pack(side=tk.LEFT, padx=5)

    ttk.Label(header_row, text="Subject:").pack(side=tk.LEFT, padx=(20, 0))
    self.template_subject_var = tk.StringVar()
    subject_entry = ttk.Entry(header_row, textvariable=self.template_subject_var, width=40)
    subject_entry.pack(side=tk.LEFT, padx=5)

    self.template_html_var = tk.BooleanVar()
    html_toggle = ttk.Checkbutton(header_row, text="HTML", variable=self.template_html_var)
    html_toggle.pack(side=tk.LEFT, padx=10)

    ttk.Label(form, text="Body:").pack(anchor=tk.W)
    self.template_body = scrolledtext.ScrolledText(form, height=8, wrap=tk.WORD)
    self.template_body.pack(fill=tk.X, pady=5)

    # Hint about placeholder syntax, shown in a smaller font.
    hint = "Use variables like {name}, {email}, {company} in your template"
    ttk.Label(form, text=hint, font=("TkDefaultFont", 8)).pack(anchor=tk.W)

    button_row = ttk.Frame(form)
    button_row.pack(fill=tk.X, pady=5)
    for caption, handler in (("Save Template", self.save_template),
                             ("Clear", self.clear_template_form)):
        ttk.Button(button_row, text=caption, command=handler).pack(side=tk.LEFT, padx=5)
def setup_contacts_tab(self):
    """Build the Contacts tab: contact table, CSV import and manual entry."""
    page = ttk.Frame(self.contacts_frame, padding="10")
    page.pack(fill=tk.BOTH, expand=True)

    # --- table of stored contacts --------------------------------------
    table_box = ttk.LabelFrame(page, text="Contacts", padding="5")
    table_box.pack(fill=tk.BOTH, expand=True, pady=(0, 10))

    tree = ttk.Treeview(table_box, columns=('Email', 'Group'),
                        show='tree headings', height=8)
    self.contacts_tree = tree
    for column, title in (('#0', 'Name'), ('Email', 'Email'), ('Group', 'Group')):
        tree.heading(column, text=title)
    tree.pack(fill=tk.BOTH, expand=True)

    # --- management row: import on the left, manual add on the right ----
    tools = ttk.Frame(page)
    tools.pack(fill=tk.X)

    import_box = ttk.LabelFrame(tools, text="Import Contacts", padding="5")
    import_box.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
    import_button = ttk.Button(import_box, text="Import from CSV",
                               command=self.import_contacts_csv)
    import_button.pack(side=tk.LEFT, padx=5)
    ttk.Label(import_box, text="Group:").pack(side=tk.LEFT, padx=(20, 5))
    # Group that imported contacts are filed under.
    self.import_group_var = tk.StringVar(value="default")
    group_entry = ttk.Entry(import_box, textvariable=self.import_group_var, width=15)
    group_entry.pack(side=tk.LEFT)

    add_box = ttk.LabelFrame(tools, text="Add Contact", padding="5")
    add_box.pack(side=tk.RIGHT, fill=tk.X, expand=True, padx=(5, 0))

    field_row = ttk.Frame(add_box)
    field_row.pack(fill=tk.X)

    ttk.Label(field_row, text="Name:").grid(row=0, column=0, sticky=tk.W)
    self.contact_name_var = tk.StringVar()
    name_entry = ttk.Entry(field_row, textvariable=self.contact_name_var, width=15)
    name_entry.grid(row=0, column=1, padx=2)

    ttk.Label(field_row, text="Email:").grid(row=0, column=2, sticky=tk.W, padx=(10, 0))
    self.contact_email_var = tk.StringVar()
    email_entry = ttk.Entry(field_row, textvariable=self.contact_email_var, width=20)
    email_entry.grid(row=0, column=3, padx=2)

    ttk.Button(add_box, text="Add Contact", command=self.add_contact).pack(pady=5)
def setup_send_tab(self):
    """Build the Send Email tab: options, attachments, send button, progress."""
    page = ttk.Frame(self.send_frame, padding="10")
    page.pack(fill=tk.BOTH, expand=True)

    # --- send options ---------------------------------------------------
    options = ttk.LabelFrame(page, text="Send Options", padding="10")
    options.pack(fill=tk.X, pady=(0, 10))

    # Row 0: sending account and its password prompt
    ttk.Label(options, text="From Account:").grid(row=0, column=0, sticky=tk.W, pady=2)
    self.send_account_var = tk.StringVar()
    self.send_account_combo = ttk.Combobox(options, textvariable=self.send_account_var, width=30)
    self.send_account_combo.grid(row=0, column=1, padx=5, pady=2, sticky=tk.W)
    password_button = ttk.Button(options, text="Set Password",
                                 command=self.set_account_password)
    password_button.grid(row=0, column=2, padx=10)

    # Row 1: template
    ttk.Label(options, text="Template:").grid(row=1, column=0, sticky=tk.W, pady=2)
    self.send_template_var = tk.StringVar()
    self.send_template_combo = ttk.Combobox(options, textvariable=self.send_template_var, width=30)
    self.send_template_combo.grid(row=1, column=1, padx=5, pady=2, sticky=tk.W)

    # Row 2: recipient mode
    ttk.Label(options, text="Recipients:").grid(row=2, column=0, sticky=tk.W, pady=2)
    self.recipient_type_var = tk.StringVar(value="group")
    mode_row = ttk.Frame(options)
    mode_row.grid(row=2, column=1, columnspan=2, padx=5, pady=2, sticky=tk.W)
    # (caption, stored value, extra pack options)
    modes = (
        ("Group", "group", {}),
        ("All Contacts", "all", {"padx": 10}),
        ("Single Email", "single", {"padx": 10}),
    )
    for caption, value, spacing in modes:
        choice = ttk.Radiobutton(mode_row, text=caption, variable=self.recipient_type_var,
                                 value=value, command=self.update_recipient_options)
        choice.pack(side=tk.LEFT, **spacing)

    # Row 3: details for the chosen mode, filled by update_recipient_options()
    self.recipient_frame = ttk.Frame(options)
    self.recipient_frame.grid(row=3, column=0, columnspan=3, sticky=tk.W, pady=5)
    self.recipient_group_var = tk.StringVar()
    self.recipient_email_var = tk.StringVar()
    self.update_recipient_options()

    # --- attachments ----------------------------------------------------
    attach_box = ttk.LabelFrame(page, text="Attachments", padding="5")
    attach_box.pack(fill=tk.X, pady=(0, 10))

    self.attachments_list = tk.Listbox(attach_box, height=3)
    self.attachments_list.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))

    attach_controls = ttk.Frame(attach_box)
    attach_controls.pack(side=tk.RIGHT, fill=tk.Y)
    for caption, handler in (("Add File", self.add_attachment),
                             ("Remove", self.remove_attachment)):
        ttk.Button(attach_controls, text=caption, command=handler).pack(pady=2)

    # --- send button and progress ---------------------------------------
    action_row = ttk.Frame(page)
    action_row.pack(fill=tk.X, pady=10)

    send_button = ttk.Button(action_row, text="Send Emails", command=self.send_emails,
                             style="Accent.TButton")
    send_button.pack(side=tk.LEFT, padx=5)

    self.progress_var = tk.StringVar(value="Ready to send")
    ttk.Label(action_row, textvariable=self.progress_var).pack(side=tk.LEFT, padx=20)

    self.progress_bar = ttk.Progressbar(action_row, length=200, mode='determinate')
    self.progress_bar.pack(side=tk.RIGHT, padx=5)
def setup_history_tab(self):
    """Build the History tab: a log table and a manual refresh button."""
    page = ttk.Frame(self.history_frame, padding="10")
    page.pack(fill=tk.BOTH, expand=True)

    data_columns = ('Recipient', 'Subject', 'Date', 'Status')
    tree = ttk.Treeview(page, columns=data_columns, show='tree headings')
    self.history_tree = tree

    # The tree column carries the ID; each data column is titled by its name.
    tree.heading('#0', text='ID')
    for column in data_columns:
        tree.heading(column, text=column)
    tree.pack(fill=tk.BOTH, expand=True)

    refresh_button = ttk.Button(page, text="Refresh History", command=self.refresh_history)
    refresh_button.pack(pady=10)
def auto_detect_smtp(self, event=None):
    """Fill in SMTP server and port when the e-mail domain is a known provider.

    ``event`` is accepted so the method can be used as a Tk event handler; it
    is not inspected. The fields are left alone when the address has no ``@``
    or no settings are found for it.
    """
    address = self.account_email_var.get()
    if '@' not in address:
        return

    settings = self.email_sender.get_smtp_settings(address)
    if not settings:
        return

    server = settings.get('smtp', '')
    port = settings.get('port', 587)
    self.smtp_server_var.set(server)
    self.smtp_port_var.set(str(port))
def add_account(self):
    """Validate the account form and store the account.

    The port is checked first, then the required text fields. On success the
    form is cleared and the account views are reloaded; a rejected insert is
    reported as a duplicate account name.
    """
    name = self.account_name_var.get().strip()
    address = self.account_email_var.get().strip()
    server = self.smtp_server_var.get().strip()

    try:
        port = int(self.smtp_port_var.get())
    except ValueError:
        messagebox.showerror("Error", "Invalid port number")
        return

    if not (name and address and server):
        messagebox.showerror("Error", "Please fill in all required fields")
        return

    stored = self.db.add_email_account(name, address, server, port)
    if not stored:
        messagebox.showerror("Error", "Account name already exists")
        return

    messagebox.showinfo("Success", "Email account added successfully")
    self.clear_account_form()
    self.refresh_accounts()
def test_account_connection(self):
    """Try to log in with the settings currently typed into the account form.

    Prompts for the password, which is used only for this attempt, and shows
    the result in a message box. Nothing happens when the password dialog is
    cancelled or left empty.
    """
    # The module-level tkinter import lists ttk, scrolledtext, filedialog and
    # messagebox only, so simpledialog has to be brought in here; without it
    # the askstring call below raises NameError.
    from tkinter import simpledialog

    email = self.account_email_var.get().strip()
    smtp_server = self.smtp_server_var.get().strip()

    try:
        smtp_port = int(self.smtp_port_var.get())
    except ValueError:
        messagebox.showerror("Error", "Invalid port number")
        return

    if not all([email, smtp_server]):
        messagebox.showerror("Error", "Please enter email and SMTP server")
        return

    # Get password
    password = simpledialog.askstring("Password", f"Enter password for {email}:", show='*')
    if not password:
        return

    account = EmailAccount(email, password, smtp_server, smtp_port)
    success, message = self.email_sender.test_connection(account, password)
    if success:
        messagebox.showinfo("Success", message)
    else:
        messagebox.showerror("Error", message)
def clear_account_form(self):
    """Reset the account form: text fields emptied, port back to 587."""
    resets = (
        (self.account_name_var, ""),
        (self.account_email_var, ""),
        (self.smtp_server_var, ""),
        (self.smtp_port_var, "587"),
    )
    for variable, value in resets:
        variable.set(value)
def save_template(self):
    """Validate the template form and store it as a new template.

    Placeholder names of the form ``{word}`` found in the subject or body are
    stored with the template. A rejected insert is reported as a duplicate
    template name.
    """
    name = self.template_name_var.get().strip()
    subject = self.template_subject_var.get().strip()
    body = self.template_body.get(1.0, tk.END).strip()
    is_html = self.template_html_var.get()

    if not (name and subject and body):
        messagebox.showerror("Error", "Please fill in all fields")
        return

    # Collect each placeholder once, however often it appears.
    found = re.findall(r'\{(\w+)\}', subject + ' ' + body)
    variables = list(set(found))

    saved = self.db.add_template(name, subject, body, is_html, variables)
    if not saved:
        messagebox.showerror("Error", "Template name already exists")
        return

    messagebox.showinfo("Success", f"Template saved with variables: {', '.join(variables)}")
    self.clear_template_form()
    self.refresh_templates()
def load_template(self, event):
    """Copy the selected template into the editing form.

    ``event`` is the Tk event that triggered the call; it is not inspected.
    Nothing happens when no row is selected or the name is no longer stored.
    """
    picked = self.templates_tree.selection()
    if not picked:
        return

    wanted = self.templates_tree.item(picked[0])['text']
    match = next(
        (entry for entry in self.db.get_templates() if entry['name'] == wanted),
        None,
    )
    if match is None:
        return

    self.template_name_var.set(match['name'])
    self.template_subject_var.set(match['subject'])
    # Replace the body text wholesale.
    self.template_body.delete(1.0, tk.END)
    self.template_body.insert(1.0, match['body'])
    self.template_html_var.set(match['is_html'])
def clear_template_form(self):
    """Empty the template editor and untick the HTML option."""
    for text_variable in (self.template_name_var, self.template_subject_var):
        text_variable.set("")
    self.template_body.delete(1.0, tk.END)
    self.template_html_var.set(False)
def import_contacts_csv(self):
    """Ask for a CSV file and add its contacts to the chosen group.

    The group comes from the import-group field and falls back to
    "default" when that field is blank. Only contacts for which
    ``self.db.add_contact`` returns a truthy value are counted.
    """
    file_path = filedialog.askopenfilename(
        title="Select CSV file",
        filetypes=[("CSV files", "*.csv"), ("All files", "*.*")]
    )
    if not file_path:
        return  # dialog cancelled

    contacts = self.email_sender.parse_csv_contacts(file_path)
    group_name = self.import_group_var.get().strip() or "default"
    added_count = sum(
        1
        for contact in contacts
        if self.db.add_contact(contact.get('name', ''), contact['email'],
                               group_name, contact)
    )
    messagebox.showinfo("Import Complete",
                        f"Added {added_count} contacts to group '{group_name}'")
    self.refresh_contacts()
def add_contact(self):
    """Add the contact typed into the form to the "default" group.

    A blank name is replaced by the part of the address before "@". A
    falsy result from ``self.db.add_contact`` is reported as a duplicate.
    """
    name = self.contact_name_var.get().strip()
    email = self.contact_email_var.get().strip()
    if not email:
        messagebox.showerror("Error", "Please enter an email address")
        return

    name = name or email.split('@')[0]
    if not self.db.add_contact(name, email, "default"):
        messagebox.showerror("Error", "Contact already exists")
        return

    messagebox.showinfo("Success", "Contact added successfully")
    for field in (self.contact_name_var, self.contact_email_var):
        field.set("")
    self.refresh_contacts()
def update_recipient_options(self):
    """Rebuild the recipient widgets for the selected recipient type.

    "group" shows a combobox filled from ``self.db.get_contact_groups()``
    (preselecting the first group), "single" shows an email entry, and any
    other type leaves the frame empty.
    """
    frame = self.recipient_frame
    for child in frame.winfo_children():
        child.destroy()

    mode = self.recipient_type_var.get()
    if mode == "single":
        ttk.Label(frame, text="Email:").pack(side=tk.LEFT)
        entry = ttk.Entry(frame, textvariable=self.recipient_email_var, width=30)
        entry.pack(side=tk.LEFT, padx=5)
        return
    if mode != "group":
        return

    ttk.Label(frame, text="Group:").pack(side=tk.LEFT)
    group_combo = ttk.Combobox(frame, textvariable=self.recipient_group_var, width=20)
    group_combo.pack(side=tk.LEFT, padx=5)
    # Offer every known group and preselect the first one.
    groups = self.db.get_contact_groups()
    group_combo['values'] = groups
    if groups:
        self.recipient_group_var.set(groups[0])
def set_account_password(self):
    """Prompt for the selected account's password and keep it in memory.

    The password is stored on ``self.current_password`` only. Cancelling
    the dialog or entering nothing leaves the current value untouched.
    """
    # The file's top-level tkinter import lists ttk, scrolledtext, filedialog
    # and messagebox only, so simpledialog has to be imported here to avoid
    # a NameError.
    from tkinter import simpledialog

    account_name = self.send_account_var.get()
    if not account_name:
        messagebox.showerror("Error", "Please select an account")
        return

    password = simpledialog.askstring(
        "Password", f"Enter password for {account_name}:", show='*'
    )
    if password:
        self.current_password = password
        messagebox.showinfo("Success", "Password set successfully")
def add_attachment(self):
    """Ask for a file and append its path to the attachments list."""
    chosen_path = filedialog.askopenfilename(title="Select attachment")
    if not chosen_path:
        return  # dialog cancelled
    self.attachments_list.insert(tk.END, chosen_path)
def remove_attachment(self):
    """Delete the first selected entry from the attachments list, if any."""
    picked = self.attachments_list.curselection()
    if not picked:
        return
    first_index = picked[0]
    self.attachments_list.delete(first_index)
def send_emails(self):
    """Send the selected template to the chosen recipients in a worker thread.

    Shows an error dialog and returns early when no account or password is
    set, no template is selected, the template is not found, or no
    recipients are found. Otherwise ``self.email_sender.send_bulk_emails``
    runs on a daemon thread. Progress and completion are passed back to the
    Tk thread via ``self.root.after``.
    """
    if not self.current_account or not self.current_password:
        messagebox.showerror("Error", "Please select account and set password")
        return
    template_name = self.send_template_var.get()
    if not template_name:
        messagebox.showerror("Error", "Please select a template")
        return

    # Look up the stored template by name.
    template = next(
        (EmailTemplate(t['name'], t['subject'], t['body'],
                       t['is_html'], t['variables'])
         for t in self.db.get_templates() if t['name'] == template_name),
        None,
    )
    if template is None:
        messagebox.showerror("Error", "Template not found")
        return

    # Resolve recipients for the selected recipient type.
    recipients = []
    recipient_type = self.recipient_type_var.get()
    if recipient_type == "group":
        recipients = self.db.get_contacts(self.recipient_group_var.get())
    elif recipient_type == "all":
        recipients = self.db.get_contacts()
    elif recipient_type == "single":
        email = self.recipient_email_var.get().strip()
        if email:
            recipients = [{'name': email.split('@')[0], 'email': email}]
    if not recipients:
        messagebox.showerror("Error", "No recipients found")
        return

    attachments = [self.attachments_list.get(i)
                   for i in range(self.attachments_list.size())]

    # Snapshot the credentials so the worker uses the values that were
    # current when the user pressed send.
    account = self.current_account
    password = self.current_password

    # Reset the progress display BEFORE the worker starts so the reset can
    # never be ordered after one of the worker's updates.
    self.progress_bar['value'] = 0
    self.progress_var.set("Sending emails...")

    def report_failure(detail):
        # Runs on the Tk thread; without it a crashed worker would leave the
        # status stuck on "Sending emails..." forever.
        self.progress_var.set("Sending failed")
        messagebox.showerror("Error", f"Email sending failed: {detail}")

    def send_thread():
        def progress_callback(progress, email, success):
            self.root.after(
                0, lambda: self.update_send_progress(progress, email, success))

        try:
            results = self.email_sender.send_bulk_emails(
                account, password, recipients,
                template, attachments, progress_callback
            )
        except Exception as exc:  # thread boundary: report, don't vanish
            # Bind the text now; `exc` is unbound once the except block ends.
            detail = str(exc)
            self.root.after(0, lambda: report_failure(detail))
            return
        self.root.after(0, lambda: self.send_complete(results))

    threading.Thread(target=send_thread, daemon=True).start()
def update_send_progress(self, progress, email, success):
    """Show the latest per-recipient result in the progress widgets.

    ``progress`` is written to the progress bar. The status line names the
    address followed by a check mark on success or a cross on failure.
    """
    marker = "✗"
    if success:
        marker = "✓"
    self.progress_bar['value'] = progress
    self.progress_var.set(f"Sending to {email} {marker}")
    # Flush pending redraws so the change is visible right away.
    self.root.update_idletasks()
def send_complete(self, results):
    """Show the final send summary and refresh the history view.

    Args:
        results: mapping with ``'sent'``, ``'failed'`` and ``'errors'``.
            Error entries may be plain strings or dicts with ``'email'``
            and ``'error'`` keys. Both are rendered, and at most five are
            listed.
    """
    def describe(entry):
        # str.join raises TypeError on dict entries, so convert each one.
        if isinstance(entry, dict):
            return f"{entry.get('email', 'unknown')}: {entry.get('error', '')}"
        return str(entry)

    sent, failed, errors = results['sent'], results['failed'], results['errors']
    self.progress_bar['value'] = 100
    self.progress_var.set(f"Complete: {sent} sent, {failed} failed")

    message = f"Email sending complete:\n\nSent: {sent}\nFailed: {failed}"
    if errors:
        message += "\n\nErrors:\n" + "\n".join(describe(e) for e in errors[:5])
        if len(errors) > 5:
            message += f"\n... and {len(errors) - 5} more errors"
    messagebox.showinfo("Send Complete", message)
    self.refresh_history()
def refresh_data(self):
    """Reload the accounts, templates and contacts views, in that order."""
    refreshers = (
        self.refresh_accounts,
        self.refresh_templates,
        self.refresh_contacts,
    )
    for refresh in refreshers:
        refresh()
def refresh_accounts(self):
    """Reload the accounts tree and the send-account combobox.

    When accounts exist and no send account is selected yet, the first
    account is selected and ``self.current_account`` is built from it with
    an empty password.
    """
    tree = self.accounts_tree
    tree.delete(*tree.get_children())

    accounts = self.db.get_email_accounts()
    account_names = []
    for row in accounts:
        tree.insert('', tk.END, text=row['name'],
                    values=(row['email'], row['smtp_server'], row['smtp_port']))
        account_names.append(row['name'])

    # Keep the combobox on the send tab in step with the tree.
    self.send_account_combo['values'] = account_names
    if not account_names or self.send_account_var.get():
        return

    default_name = account_names[0]
    self.send_account_var.set(default_name)
    # First stored account carrying the default name becomes the current one.
    chosen = next((row for row in accounts if row['name'] == default_name), None)
    if chosen is not None:
        self.current_account = EmailAccount(
            chosen['email'], "", chosen['smtp_server'],
            chosen['smtp_port'], chosen.get('imap_server', ''),
            chosen['imap_port'], chosen['use_tls']
        )
def refresh_templates(self):
    """Reload the templates tree and the send-template combobox."""
    tree = self.templates_tree
    tree.delete(*tree.get_children())

    template_names = []
    for entry in self.db.get_templates():
        # The second column shows the format: "HTML" or "Text".
        if entry['is_html']:
            kind_label = "HTML"
        else:
            kind_label = "Text"
        tree.insert('', tk.END, text=entry['name'],
                    values=(entry['subject'], kind_label))
        template_names.append(entry['name'])

    self.send_template_combo['values'] = template_names
def refresh_contacts(self):
    """Reload the contacts tree from ``self.db.get_contacts()``."""
    tree = self.contacts_tree
    tree.delete(*tree.get_children())
    for person in self.db.get_contacts():
        row_values = (person['email'], person['group_name'])
        tree.insert('', tk.END, text=person['name'], values=row_values)
def refresh_history(self):
    """Refresh the email history view (placeholder, does nothing yet)."""
    # A full implementation would query the sent_emails table here.
    return None
def run(self):
    """Enter the Tk main loop of the application's root window."""
    window = self.root
    window.mainloop()
def main():
    """Create the GUI application and hand control to it."""
    EmailGUI().run()


# Launch the GUI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
- Save the file.
- Run the following command to run the application.
C:\Users\username\Documents\automatedEmailSender> python automatedemailsender.py
Email System Starting...
========================
✓ Database initialized
✓ GUI loaded
✓ SMTP configurations ready
✓ Template system active
Email Automation System Ready!
C:\Users\username\Documents\automatedEmailSender> python automatedemailsender.py
Email System Starting...
========================
✓ Database initialized
✓ GUI loaded
✓ SMTP configurations ready
✓ Template system active
Email Automation System Ready!
Explanation
- The `EmailDatabase` class manages SQLite database operations for contacts, templates, and accounts.
- The `EmailSender` class handles SMTP connections and email sending functionality.
- The `EmailScheduler` class provides automated scheduling and recurring email capabilities.
- The `EmailGUI` class creates a comprehensive Tkinter interface for email management.
- Database tables store email accounts, templates, contacts, and sending history.
- Template system supports variable replacement and HTML/plain text formats.
- Contact management includes grouping and CSV import functionality.
- Bulk sending with progress tracking and error handling.
- SMTP auto-detection for common email providers.
- Attachment handling with file validation and size limits.
- Email analytics and delivery tracking.
- Security features including password encryption and authentication.
Next Steps
Congratulations! You have successfully created an Automated Email Sender in Python. Experiment with the code and see if you can modify the application. Here are a few suggestions:
- Add email template editor with rich text formatting
- Implement email bounce handling and unsubscribe management
- Create email campaign analytics and reporting
- Add integration with popular email services (SendGrid, Mailchimp)
- Implement A/B testing for email campaigns
- Create email list segmentation and targeting
- Add email tracking pixels and click analytics
- Implement email scheduling with time zones
- Create backup and restore functionality for email data
Conclusion
In this project, you learned how to create an Automated Email Sender in Python using advanced email automation concepts. You also learned about SMTP protocols, database management, contact organization, template systems, and implementing professional email marketing solutions. You can find the source code on GitHub.
How It Works
1. Database Architecture
class EmailDatabase:
    """SQLite-backed storage for the email system (accounts table shown)."""

    def __init__(self, db_path: str = "email_system.db"):
        """Remember the database path and make sure the schema exists.

        Args:
            db_path: path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the ``accounts`` table if it does not exist yet."""
        from contextlib import closing

        # A sqlite3 connection used as a context manager only commits or
        # rolls back; it does not close. closing() releases the handle and
        # the inner `with conn` keeps the commit.
        with closing(sqlite3.connect(self.db_path)) as conn:
            with conn:
                # Create accounts table
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS accounts (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        name TEXT NOT NULL,
                        email TEXT UNIQUE NOT NULL,
                        smtp_server TEXT NOT NULL,
                        smtp_port INTEGER NOT NULL
                    )
                ''')
class EmailDatabase:
    """SQLite-backed storage for the email system (accounts table shown)."""

    def __init__(self, db_path: str = "email_system.db"):
        """Remember the database path and make sure the schema exists.

        Args:
            db_path: path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the ``accounts`` table if it does not exist yet."""
        from contextlib import closing

        # A sqlite3 connection used as a context manager only commits or
        # rolls back; it does not close. closing() releases the handle and
        # the inner `with conn` keeps the commit.
        with closing(sqlite3.connect(self.db_path)) as conn:
            with conn:
                # Create accounts table
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS accounts (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        name TEXT NOT NULL,
                        email TEXT UNIQUE NOT NULL,
                        smtp_server TEXT NOT NULL,
                        smtp_port INTEGER NOT NULL
                    )
                ''')
The database system includes:
- Account Management: Store SMTP configurations and credentials
- Contact Organization: Group contacts with custom fields
- Template System: Save reusable email templates
- Analytics Tracking: Monitor email delivery and engagement
- History Logging: Track all sent emails and results
2. Email Sending Engine
class EmailSender:
def __init__(self, database: EmailDatabase):
self.db = database
def send_email(self, account: EmailAccount, password: str, to_email: str,
subject: str, body: str, is_html: bool = False,
attachments: List[str] = None, to_name: str = "") -> Tuple[bool, str]:
try:
msg = MIMEMultipart()
msg['From'] = account.email
msg['To'] = to_email
msg['Subject'] = subject
class EmailSender:
def __init__(self, database: EmailDatabase):
self.db = database
def send_email(self, account: EmailAccount, password: str, to_email: str,
subject: str, body: str, is_html: bool = False,
attachments: List[str] = None, to_name: str = "") -> Tuple[bool, str]:
try:
msg = MIMEMultipart()
msg['From'] = account.email
msg['To'] = to_email
msg['Subject'] = subject
3. Template Management
@dataclass
class EmailTemplate:
    """A reusable email template whose text may contain ``{placeholder}`` names."""

    name: str
    subject: str
    body: str
    is_html: bool = False
    # None means no variable list was supplied.
    variables: Optional[List[str]] = None

    def replace_variables(self, text: str, data: Dict) -> str:
        """Return ``text`` with each ``{key}`` replaced by ``str(data[key])``.

        All placeholders are substituted in a single pass, so a value that
        itself contains ``{other}`` is inserted literally rather than being
        substituted again. Placeholders with no matching key stay unchanged.
        """
        if not data:
            return text
        lookup = {}
        for key, value in data.items():
            lookup.setdefault(f"{{{key}}}", str(value))
        # One alternation over the escaped tokens, longest first.
        tokens = sorted(lookup, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(token) for token in tokens))
        return pattern.sub(lambda match: lookup[match.group(0)], text)
@dataclass
class EmailTemplate:
    """A reusable email template whose text may contain ``{placeholder}`` names."""

    name: str
    subject: str
    body: str
    is_html: bool = False
    # None means no variable list was supplied.
    variables: Optional[List[str]] = None

    def replace_variables(self, text: str, data: Dict) -> str:
        """Return ``text`` with each ``{key}`` replaced by ``str(data[key])``.

        All placeholders are substituted in a single pass, so a value that
        itself contains ``{other}`` is inserted literally rather than being
        substituted again. Placeholders with no matching key stay unchanged.
        """
        if not data:
            return text
        lookup = {}
        for key, value in data.items():
            lookup.setdefault(f"{{{key}}}", str(value))
        # One alternation over the escaped tokens, longest first.
        tokens = sorted(lookup, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(token) for token in tokens))
        return pattern.sub(lambda match: lookup[match.group(0)], text)
4. Bulk Email Processing
def send_bulk_emails(self, account: EmailAccount, password: str,
recipients: List[Dict], template: EmailTemplate,
attachments: List[str] = None,
progress_callback=None) -> Dict:
results = {'sent': 0, 'failed': 0, 'errors': []}
for i, recipient in enumerate(recipients):
try:
personalized_subject = self.replace_variables(template.subject, recipient)
personalized_body = self.replace_variables(template.body, recipient)
def send_bulk_emails(self, account: EmailAccount, password: str,
recipients: List[Dict], template: EmailTemplate,
attachments: List[str] = None,
progress_callback=None) -> Dict:
results = {'sent': 0, 'failed': 0, 'errors': []}
for i, recipient in enumerate(recipients):
try:
personalized_subject = self.replace_variables(template.subject, recipient)
personalized_body = self.replace_variables(template.body, recipient)
5. GUI Implementation
class EmailGUI:
    """Main window that wires the database, sender and scheduler together."""

    def __init__(self):
        """Create the root window and backing services, then build the UI."""
        window = tk.Tk()
        window.title("Email Automation System")
        window.geometry("1200x800")
        self.root = window

        # The sender is built on the database; the scheduler on the sender.
        database = EmailDatabase()
        sender = EmailSender(database)
        self.db = database
        self.email_sender = sender
        self.scheduler = EmailScheduler(sender)

        self.setup_ui()
class EmailGUI:
    """Main window that wires the database, sender and scheduler together."""

    def __init__(self):
        """Create the root window and backing services, then build the UI."""
        window = tk.Tk()
        window.title("Email Automation System")
        window.geometry("1200x800")
        self.root = window

        # The sender is built on the database; the scheduler on the sender.
        database = EmailDatabase()
        sender = EmailSender(database)
        self.db = database
        self.email_sender = sender
        self.scheduler = EmailScheduler(sender)

        self.setup_ui()
Email Account Configuration
1. SMTP Settings for Popular Providers
def get_smtp_settings(self, email: str) -> Dict:
    """Auto-detect SMTP/IMAP settings from an address's domain.

    Args:
        email: address whose domain (the text after the last "@") selects
            the provider. Matching is case-insensitive.

    Returns:
        A new dict with ``smtp_server``, ``smtp_port``, ``use_tls``,
        ``imap_server`` and ``imap_port``. Unknown domains, and input
        without an "@", get generic defaults with empty server names.
    """
    # outlook.com and hotmail.com share one configuration.
    microsoft = {
        'smtp_server': 'smtp-mail.outlook.com',
        'smtp_port': 587,
        'use_tls': True,
        'imap_server': 'outlook.office365.com',
        'imap_port': 993,
    }
    smtp_configs = {
        'gmail.com': {
            'smtp_server': 'smtp.gmail.com',
            'smtp_port': 587,
            'use_tls': True,
            'imap_server': 'imap.gmail.com',
            'imap_port': 993,
        },
        'outlook.com': microsoft,
        'yahoo.com': {
            'smtp_server': 'smtp.mail.yahoo.com',
            'smtp_port': 587,
            'use_tls': True,
            'imap_server': 'imap.mail.yahoo.com',
            'imap_port': 993,
        },
        'hotmail.com': microsoft,
    }
    # Same keys as the known providers, so callers can index the result
    # without checking which branch produced it.
    fallback = {
        'smtp_server': '',
        'smtp_port': 587,
        'use_tls': True,
        'imap_server': '',
        'imap_port': 993,
    }

    # rpartition never raises; without an "@" the separator is empty and we
    # fall back to the defaults instead of hitting IndexError.
    _, separator, domain = (email or '').rpartition('@')
    if not separator:
        return fallback
    return dict(smtp_configs.get(domain.strip().lower(), fallback))
def get_smtp_settings(self, email: str) -> Dict:
    """Auto-detect SMTP/IMAP settings from an address's domain.

    Args:
        email: address whose domain (the text after the last "@") selects
            the provider. Matching is case-insensitive.

    Returns:
        A new dict with ``smtp_server``, ``smtp_port``, ``use_tls``,
        ``imap_server`` and ``imap_port``. Unknown domains, and input
        without an "@", get generic defaults with empty server names.
    """
    # outlook.com and hotmail.com share one configuration.
    microsoft = {
        'smtp_server': 'smtp-mail.outlook.com',
        'smtp_port': 587,
        'use_tls': True,
        'imap_server': 'outlook.office365.com',
        'imap_port': 993,
    }
    smtp_configs = {
        'gmail.com': {
            'smtp_server': 'smtp.gmail.com',
            'smtp_port': 587,
            'use_tls': True,
            'imap_server': 'imap.gmail.com',
            'imap_port': 993,
        },
        'outlook.com': microsoft,
        'yahoo.com': {
            'smtp_server': 'smtp.mail.yahoo.com',
            'smtp_port': 587,
            'use_tls': True,
            'imap_server': 'imap.mail.yahoo.com',
            'imap_port': 993,
        },
        'hotmail.com': microsoft,
    }
    # Same keys as the known providers, so callers can index the result
    # without checking which branch produced it.
    fallback = {
        'smtp_server': '',
        'smtp_port': 587,
        'use_tls': True,
        'imap_server': '',
        'imap_port': 993,
    }

    # rpartition never raises; without an "@" the separator is empty and we
    # fall back to the defaults instead of hitting IndexError.
    _, separator, domain = (email or '').rpartition('@')
    if not separator:
        return fallback
    return dict(smtp_configs.get(domain.strip().lower(), fallback))
2. Connection Testing
def test_connection(self, account: EmailAccount, password: str) -> Tuple[bool, str]:
    """Test SMTP connection with provided credentials.

    Connects to ``account.smtp_server``:``account.smtp_port``, upgrades to
    TLS when ``account.use_tls`` is set, and logs in as ``account.email``.

    Returns:
        ``(True, message)`` on success, otherwise ``(False, reason)``.
        No exception is propagated to the caller.
    """
    try:
        # The with-block closes the connection even when starttls() or
        # login() raises. The timeout keeps an unreachable host from
        # blocking the caller indefinitely.
        with smtplib.SMTP(account.smtp_server, account.smtp_port, timeout=30) as server:
            if account.use_tls:
                server.starttls()
            server.login(account.email, password)
        return True, "Connection successful"
    except smtplib.SMTPAuthenticationError:
        return False, "Authentication failed. Check email/password"
    except smtplib.SMTPServerDisconnected:
        return False, "Server connection failed"
    except Exception as e:
        return False, f"Connection error: {str(e)}"
def test_connection(self, account: EmailAccount, password: str) -> Tuple[bool, str]:
    """Test SMTP connection with provided credentials.

    Connects to ``account.smtp_server``:``account.smtp_port``, upgrades to
    TLS when ``account.use_tls`` is set, and logs in as ``account.email``.

    Returns:
        ``(True, message)`` on success, otherwise ``(False, reason)``.
        No exception is propagated to the caller.
    """
    try:
        # The with-block closes the connection even when starttls() or
        # login() raises. The timeout keeps an unreachable host from
        # blocking the caller indefinitely.
        with smtplib.SMTP(account.smtp_server, account.smtp_port, timeout=30) as server:
            if account.use_tls:
                server.starttls()
            server.login(account.email, password)
        return True, "Connection successful"
    except smtplib.SMTPAuthenticationError:
        return False, "Authentication failed. Check email/password"
    except smtplib.SMTPServerDisconnected:
        return False, "Server connection failed"
    except Exception as e:
        return False, f"Connection error: {str(e)}"
Template System Features
1. Variable Replacement Engine
def replace_variables(self, text: str, data: Dict) -> str:
    """Advanced variable replacement with multiple formats.

    Supported placeholder spellings for a key ``name``: ``{name}``,
    ``{{ name }}``, ``[name]`` and ``%name%``. Conditional blocks of the
    form ``{if:flag}...{endif}`` keep their content when ``data.get(flag)``
    is truthy and are removed otherwise.

    Conditionals are resolved on the template text first and all
    placeholders are then substituted in one pass, so inserted values are
    never re-scanned for further placeholders or conditionals.
    """
    import re

    def keep_or_drop(match):
        flag, content = match.group(1), match.group(2)
        return content if data.get(flag) else ""

    # Example: {if:premium}Premium content{endif}
    text = re.sub(r'\{if:(\w+)\}(.*?)\{endif\}', keep_or_drop, text,
                  flags=re.DOTALL)
    if not data:
        return text

    lookup = {}
    for key, value in data.items():
        rendered = str(value)
        spellings = (
            f"{{{key}}}",          # {name}
            f"{{{{ {key} }}}}",    # {{ name }}
            f"[{key}]",            # [name]
            f"%{key}%",            # %name%
        )
        for token in spellings:
            lookup.setdefault(token, rendered)

    # Longest tokens first so a longer spelling wins over one it contains.
    tokens = sorted(lookup, key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(token) for token in tokens))
    return pattern.sub(lambda match: lookup[match.group(0)], text)
def replace_variables(self, text: str, data: Dict) -> str:
    """Advanced variable replacement with multiple formats.

    Supported placeholder spellings for a key ``name``: ``{name}``,
    ``{{ name }}``, ``[name]`` and ``%name%``. Conditional blocks of the
    form ``{if:flag}...{endif}`` keep their content when ``data.get(flag)``
    is truthy and are removed otherwise.

    Conditionals are resolved on the template text first and all
    placeholders are then substituted in one pass, so inserted values are
    never re-scanned for further placeholders or conditionals.
    """
    import re

    def keep_or_drop(match):
        flag, content = match.group(1), match.group(2)
        return content if data.get(flag) else ""

    # Example: {if:premium}Premium content{endif}
    text = re.sub(r'\{if:(\w+)\}(.*?)\{endif\}', keep_or_drop, text,
                  flags=re.DOTALL)
    if not data:
        return text

    lookup = {}
    for key, value in data.items():
        rendered = str(value)
        spellings = (
            f"{{{key}}}",          # {name}
            f"{{{{ {key} }}}}",    # {{ name }}
            f"[{key}]",            # [name]
            f"%{key}%",            # %name%
        )
        for token in spellings:
            lookup.setdefault(token, rendered)

    # Longest tokens first so a longer spelling wins over one it contains.
    tokens = sorted(lookup, key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(token) for token in tokens))
    return pattern.sub(lambda match: lookup[match.group(0)], text)
2. Template Categories
def get_template_categories(self) -> Dict:
    """Return the built-in example templates, grouped by category.

    The result maps category name -> template name -> a dict holding the
    ``'subject'`` and ``'body'`` strings. Bodies start and end with a
    newline.
    """
    def body(*lines):
        # Same text a triple-quoted block would give: a newline before the
        # first line and after the last.
        return "\n" + "\n".join(lines) + "\n"

    welcome_series = {
        'subject': 'Welcome to {company_name}, {first_name}!',
        'body': body(
            "Hello {first_name},",
            "Welcome to {company_name}! We're excited to have you join our community.",
            "Your account details:",
            "- Email: {email}",
            "- Member ID: {member_id}",
            "- Join Date: {join_date}",
            "Best regards,",
            "{sender_name}",
        ),
    }
    newsletter = {
        'subject': '{company_name} Newsletter - {month} {year}',
        'body': body(
            "Hi {first_name},",
            "Here's what's new this month at {company_name}:",
            "{newsletter_content}",
            "Don't miss out on our special offers!",
            "Unsubscribe: {unsubscribe_link}",
        ),
    }
    order_confirmation = {
        'subject': 'Order Confirmation #{order_id}',
        'body': body(
            "Dear {customer_name},",
            "Thank you for your order!",
            "Order Details:",
            "- Order ID: {order_id}",
            "- Total: {order_total}",
            "- Estimated Delivery: {delivery_date}",
            "Track your order: {tracking_link}",
        ),
    }
    password_reset = {
        'subject': 'Password Reset Request',
        'body': body(
            "Hi {username},",
            "You requested a password reset for your account.",
            "Click here to reset: {reset_link}",
            "This link expires in 24 hours.",
            "If you didn't request this, please ignore this email.",
        ),
    }

    categories = {}
    categories['marketing'] = {
        'welcome_series': welcome_series,
        'newsletter': newsletter,
    }
    categories['transactional'] = {'order_confirmation': order_confirmation}
    categories['notification'] = {'password_reset': password_reset}
    return categories
def get_template_categories(self) -> Dict:
    """Return the built-in example templates, grouped by category.

    The result maps category name -> template name -> a dict holding the
    ``'subject'`` and ``'body'`` strings. Bodies start and end with a
    newline.
    """
    def body(*lines):
        # Same text a triple-quoted block would give: a newline before the
        # first line and after the last.
        return "\n" + "\n".join(lines) + "\n"

    welcome_series = {
        'subject': 'Welcome to {company_name}, {first_name}!',
        'body': body(
            "Hello {first_name},",
            "Welcome to {company_name}! We're excited to have you join our community.",
            "Your account details:",
            "- Email: {email}",
            "- Member ID: {member_id}",
            "- Join Date: {join_date}",
            "Best regards,",
            "{sender_name}",
        ),
    }
    newsletter = {
        'subject': '{company_name} Newsletter - {month} {year}',
        'body': body(
            "Hi {first_name},",
            "Here's what's new this month at {company_name}:",
            "{newsletter_content}",
            "Don't miss out on our special offers!",
            "Unsubscribe: {unsubscribe_link}",
        ),
    }
    order_confirmation = {
        'subject': 'Order Confirmation #{order_id}',
        'body': body(
            "Dear {customer_name},",
            "Thank you for your order!",
            "Order Details:",
            "- Order ID: {order_id}",
            "- Total: {order_total}",
            "- Estimated Delivery: {delivery_date}",
            "Track your order: {tracking_link}",
        ),
    }
    password_reset = {
        'subject': 'Password Reset Request',
        'body': body(
            "Hi {username},",
            "You requested a password reset for your account.",
            "Click here to reset: {reset_link}",
            "This link expires in 24 hours.",
            "If you didn't request this, please ignore this email.",
        ),
    }

    categories = {}
    categories['marketing'] = {
        'welcome_series': welcome_series,
        'newsletter': newsletter,
    }
    categories['transactional'] = {'order_confirmation': order_confirmation}
    categories['notification'] = {'password_reset': password_reset}
    return categories
Contact Management System
1. CSV Import with Validation
def parse_csv_contacts(self, csv_file_path: str) -> List[Dict]:
    """Parse and validate CSV contact file.

    The file needs a header row with an ``email`` column. ``first_name``,
    ``last_name``, ``company`` and ``phone`` are optional, and any other
    column is kept under a ``custom_<column>`` key. Rows whose email is
    empty or rejected by ``self.validate_email`` are skipped and reported
    through the module logger.

    Returns:
        The valid contacts as dicts. If the file cannot be read, whatever
        was parsed before the failure is returned (an empty list when the
        file cannot be opened).
    """
    log = logging.getLogger(__name__)
    known_fields = {'email', 'first_name', 'last_name', 'company', 'phone'}

    def clean(value):
        # DictReader yields None for cells missing from a short row.
        return value.strip() if isinstance(value, str) else ''

    contacts = []
    errors = []
    try:
        # utf-8-sig drops a leading BOM, which would otherwise turn the
        # header into "\ufeffemail". newline='' is what the csv module
        # expects for files it reads.
        with open(csv_file_path, 'r', encoding='utf-8-sig', newline='') as file:
            for row_num, row in enumerate(csv.DictReader(file), start=2):
                # Strip before validating so padded cells are not rejected.
                email = clean(row.get('email'))
                if not email or not self.validate_email(email):
                    errors.append(f"Row {row_num}: Invalid email address")
                    continue

                first_name = clean(row.get('first_name'))
                last_name = clean(row.get('last_name'))
                contact = {
                    'email': email.lower(),
                    'first_name': first_name,
                    'last_name': last_name,
                    'full_name': f"{first_name} {last_name}".strip(),
                    'company': clean(row.get('company')),
                    'phone': clean(row.get('phone')),
                    'import_date': datetime.datetime.now().isoformat()
                }
                # Add any additional custom fields
                for key, value in row.items():
                    # key is None for surplus cells beyond the header row.
                    if key is None or key in known_fields:
                        continue
                    contact[f'custom_{key}'] = '' if value is None else value
                contacts.append(contact)
    except (OSError, UnicodeDecodeError, csv.Error) as e:
        log.error("Error reading CSV file: %s", e)

    if errors:
        log.warning("Import completed with %d errors:", len(errors))
        for error in errors[:10]:  # Show first 10 errors
            log.warning(" - %s", error)
    return contacts
def parse_csv_contacts(self, csv_file_path: str) -> List[Dict]:
    """Parse and validate CSV contact file.

    The file needs a header row with an ``email`` column. ``first_name``,
    ``last_name``, ``company`` and ``phone`` are optional, and any other
    column is kept under a ``custom_<column>`` key. Rows whose email is
    empty or rejected by ``self.validate_email`` are skipped and reported
    through the module logger.

    Returns:
        The valid contacts as dicts. If the file cannot be read, whatever
        was parsed before the failure is returned (an empty list when the
        file cannot be opened).
    """
    log = logging.getLogger(__name__)
    known_fields = {'email', 'first_name', 'last_name', 'company', 'phone'}

    def clean(value):
        # DictReader yields None for cells missing from a short row.
        return value.strip() if isinstance(value, str) else ''

    contacts = []
    errors = []
    try:
        # utf-8-sig drops a leading BOM, which would otherwise turn the
        # header into "\ufeffemail". newline='' is what the csv module
        # expects for files it reads.
        with open(csv_file_path, 'r', encoding='utf-8-sig', newline='') as file:
            for row_num, row in enumerate(csv.DictReader(file), start=2):
                # Strip before validating so padded cells are not rejected.
                email = clean(row.get('email'))
                if not email or not self.validate_email(email):
                    errors.append(f"Row {row_num}: Invalid email address")
                    continue

                first_name = clean(row.get('first_name'))
                last_name = clean(row.get('last_name'))
                contact = {
                    'email': email.lower(),
                    'first_name': first_name,
                    'last_name': last_name,
                    'full_name': f"{first_name} {last_name}".strip(),
                    'company': clean(row.get('company')),
                    'phone': clean(row.get('phone')),
                    'import_date': datetime.datetime.now().isoformat()
                }
                # Add any additional custom fields
                for key, value in row.items():
                    # key is None for surplus cells beyond the header row.
                    if key is None or key in known_fields:
                        continue
                    contact[f'custom_{key}'] = '' if value is None else value
                contacts.append(contact)
    except (OSError, UnicodeDecodeError, csv.Error) as e:
        log.error("Error reading CSV file: %s", e)

    if errors:
        log.warning("Import completed with %d errors:", len(errors))
        for error in errors[:10]:  # Show first 10 errors
            log.warning(" - %s", error)
    return contacts
2. Contact Segmentation
def segment_contacts(self, criteria: Dict) -> List[Dict]:
    """Segment contacts based on criteria.

    Args:
        criteria: optional filters, combined with AND:
            ``group_name`` (exact match), ``company`` (substring),
            ``has_custom_field`` (``(field, value)`` pair matched against
            the ``custom_fields`` text) and ``last_contacted_days``
            (contacted within that many days).

    Returns:
        One dict per matching row of ``contacts``, keyed by column name.
    """
    from contextlib import closing

    query_parts = ["SELECT * FROM contacts WHERE 1=1"]
    params = []
    if criteria.get('group_name'):
        query_parts.append("AND group_name = ?")
        params.append(criteria['group_name'])
    if criteria.get('company'):
        query_parts.append("AND company LIKE ?")
        params.append(f"%{criteria['company']}%")
    if criteria.get('has_custom_field'):
        field, value = criteria['has_custom_field']
        # Presumably custom_fields holds JSON text (TODO confirm against the
        # code that writes it). json.dumps puts a space after ":" by
        # default, so match both the compact and the spaced form.
        query_parts.append("AND (custom_fields LIKE ? OR custom_fields LIKE ?)")
        params.append(f'%"{field}":"%{value}"%')
        params.append(f'%"{field}": "%{value}"%')
    if criteria.get('last_contacted_days'):
        days_ago = datetime.datetime.now() - datetime.timedelta(
            days=criteria['last_contacted_days'])
        query_parts.append("AND last_contacted > ?")
        params.append(days_ago.isoformat())

    # A sqlite3 connection's own context manager does not close it, so use
    # closing() to release the handle after the read.
    with closing(sqlite3.connect(self.db_path)) as conn:
        cursor = conn.execute(" ".join(query_parts), params)
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
def segment_contacts(self, criteria: Dict) -> List[Dict]:
    """Segment contacts based on criteria.

    Args:
        criteria: optional filters, combined with AND:
            ``group_name`` (exact match), ``company`` (substring),
            ``has_custom_field`` (``(field, value)`` pair matched against
            the ``custom_fields`` text) and ``last_contacted_days``
            (contacted within that many days).

    Returns:
        One dict per matching row of ``contacts``, keyed by column name.
    """
    from contextlib import closing

    query_parts = ["SELECT * FROM contacts WHERE 1=1"]
    params = []
    if criteria.get('group_name'):
        query_parts.append("AND group_name = ?")
        params.append(criteria['group_name'])
    if criteria.get('company'):
        query_parts.append("AND company LIKE ?")
        params.append(f"%{criteria['company']}%")
    if criteria.get('has_custom_field'):
        field, value = criteria['has_custom_field']
        # Presumably custom_fields holds JSON text (TODO confirm against the
        # code that writes it). json.dumps puts a space after ":" by
        # default, so match both the compact and the spaced form.
        query_parts.append("AND (custom_fields LIKE ? OR custom_fields LIKE ?)")
        params.append(f'%"{field}":"%{value}"%')
        params.append(f'%"{field}": "%{value}"%')
    if criteria.get('last_contacted_days'):
        days_ago = datetime.datetime.now() - datetime.timedelta(
            days=criteria['last_contacted_days'])
        query_parts.append("AND last_contacted > ?")
        params.append(days_ago.isoformat())

    # A sqlite3 connection's own context manager does not close it, so use
    # closing() to release the handle after the read.
    with closing(sqlite3.connect(self.db_path)) as conn:
        cursor = conn.execute(" ".join(query_parts), params)
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
Bulk Email Processing
1. Advanced Sending Engine
def send_bulk_emails(self, account: EmailAccount, password: str,
                     recipients: List[Dict], template: EmailTemplate,
                     attachments: Optional[List[str]] = None,
                     progress_callback=None,
                     emails_per_minute: int = 60) -> Dict:
    """Advanced bulk email sending with rate limiting and error handling.

    Args:
        account: SMTP account to send from.
        password: password used to log in as ``account.email``.
        recipients: dicts with at least an ``'email'`` key. Each dict is
            also the data used to personalise the template.
        template: template whose subject/body are personalised per recipient.
        attachments: optional file paths passed on to ``send_single_email``.
        progress_callback: optional ``callback(percent, email, success)``
            invoked after every recipient.
        emails_per_minute: send-rate cap. Zero or less disables the pause.

    Returns:
        Dict with ``sent``, ``failed``, ``errors`` (dicts with ``email``
        and ``error``), ``start_time``, ``end_time`` and
        ``rate_limit_delays``. A failure that aborts the batch is recorded
        under the pseudo-address ``'bulk_operation'``.
    """
    def close_quietly(connection):
        # QUIT on a connection the server already dropped raises. Cleanup
        # must never replace the results with an exception.
        if connection is None:
            return
        try:
            connection.quit()
        except (smtplib.SMTPException, OSError):
            pass

    results = {
        'sent': 0,
        'failed': 0,
        'errors': [],
        'start_time': datetime.datetime.now(),
        'end_time': None,
        'rate_limit_delays': 0
    }
    # Rate limiting settings
    delay_between_emails = 60 / emails_per_minute if emails_per_minute > 0 else 0
    total = len(recipients)
    # One connection is reused across recipients and refreshed periodically.
    server = None
    last_connect_time = 0
    reconnect_interval = 300  # Reconnect every 5 minutes
    try:
        for i, recipient in enumerate(recipients):
            # Reconnect if needed
            current_time = time.time()
            if server is None or (current_time - last_connect_time) > reconnect_interval:
                close_quietly(server)
                server = None  # stay None if the new connection fails
                server = smtplib.SMTP(account.smtp_server, account.smtp_port)
                if account.use_tls:
                    server.starttls()
                server.login(account.email, password)
                last_connect_time = current_time

            # Personalize email content
            personalized_subject = self.replace_variables(template.subject, recipient)
            personalized_body = self.replace_variables(template.body, recipient)

            # Send email
            success, error_msg = self.send_single_email(
                server, account.email, recipient['email'],
                personalized_subject, personalized_body,
                template.is_html, attachments
            )
            if success:
                results['sent'] += 1
                self.log_sent_email(
                    account.email, recipient['email'],
                    personalized_subject, True
                )
            else:
                results['failed'] += 1
                results['errors'].append({
                    'email': recipient['email'],
                    'error': error_msg
                })
                self.log_sent_email(
                    account.email, recipient['email'],
                    personalized_subject, False, error_msg
                )

            # Progress callback
            if progress_callback:
                progress_callback(((i + 1) / total) * 100, recipient['email'], success)

            # Rate limiting: no pause after the last email
            if i < total - 1 and delay_between_emails > 0:
                time.sleep(delay_between_emails)
                results['rate_limit_delays'] += 1
    except Exception as e:
        results['errors'].append({
            'email': 'bulk_operation',
            'error': f"Bulk operation failed: {str(e)}"
        })
    finally:
        close_quietly(server)
        results['end_time'] = datetime.datetime.now()
    return results
def send_bulk_emails(self, account: EmailAccount, password: str,
                     recipients: List[Dict], template: EmailTemplate,
                     attachments: Optional[List[str]] = None,
                     progress_callback=None,
                     emails_per_minute: int = 60) -> Dict:
    """Advanced bulk email sending with rate limiting and error handling.

    Args:
        account: SMTP account to send from.
        password: password used to log in as ``account.email``.
        recipients: dicts with at least an ``'email'`` key. Each dict is
            also the data used to personalise the template.
        template: template whose subject/body are personalised per recipient.
        attachments: optional file paths passed on to ``send_single_email``.
        progress_callback: optional ``callback(percent, email, success)``
            invoked after every recipient.
        emails_per_minute: send-rate cap. Zero or less disables the pause.

    Returns:
        Dict with ``sent``, ``failed``, ``errors`` (dicts with ``email``
        and ``error``), ``start_time``, ``end_time`` and
        ``rate_limit_delays``. A failure that aborts the batch is recorded
        under the pseudo-address ``'bulk_operation'``.
    """
    def close_quietly(connection):
        # QUIT on a connection the server already dropped raises. Cleanup
        # must never replace the results with an exception.
        if connection is None:
            return
        try:
            connection.quit()
        except (smtplib.SMTPException, OSError):
            pass

    results = {
        'sent': 0,
        'failed': 0,
        'errors': [],
        'start_time': datetime.datetime.now(),
        'end_time': None,
        'rate_limit_delays': 0
    }
    # Rate limiting settings
    delay_between_emails = 60 / emails_per_minute if emails_per_minute > 0 else 0
    total = len(recipients)
    # One connection is reused across recipients and refreshed periodically.
    server = None
    last_connect_time = 0
    reconnect_interval = 300  # Reconnect every 5 minutes
    try:
        for i, recipient in enumerate(recipients):
            # Reconnect if needed
            current_time = time.time()
            if server is None or (current_time - last_connect_time) > reconnect_interval:
                close_quietly(server)
                server = None  # stay None if the new connection fails
                server = smtplib.SMTP(account.smtp_server, account.smtp_port)
                if account.use_tls:
                    server.starttls()
                server.login(account.email, password)
                last_connect_time = current_time

            # Personalize email content
            personalized_subject = self.replace_variables(template.subject, recipient)
            personalized_body = self.replace_variables(template.body, recipient)

            # Send email
            success, error_msg = self.send_single_email(
                server, account.email, recipient['email'],
                personalized_subject, personalized_body,
                template.is_html, attachments
            )
            if success:
                results['sent'] += 1
                self.log_sent_email(
                    account.email, recipient['email'],
                    personalized_subject, True
                )
            else:
                results['failed'] += 1
                results['errors'].append({
                    'email': recipient['email'],
                    'error': error_msg
                })
                self.log_sent_email(
                    account.email, recipient['email'],
                    personalized_subject, False, error_msg
                )

            # Progress callback
            if progress_callback:
                progress_callback(((i + 1) / total) * 100, recipient['email'], success)

            # Rate limiting: no pause after the last email
            if i < total - 1 and delay_between_emails > 0:
                time.sleep(delay_between_emails)
                results['rate_limit_delays'] += 1
    except Exception as e:
        results['errors'].append({
            'email': 'bulk_operation',
            'error': f"Bulk operation failed: {str(e)}"
        })
    finally:
        close_quietly(server)
        results['end_time'] = datetime.datetime.now()
    return results
2. Email Scheduling System
class EmailScheduler:
    """In-memory scheduler for one-off and repeating email tasks."""

    # Repeat types understood by _calculate_next_run.
    _REPEAT_TYPES = ("none", "daily", "weekly", "monthly")

    def __init__(self, email_sender: "EmailSender"):
        """Keep the sender and start with no tasks and a stopped loop."""
        self.email_sender = email_sender
        self.scheduled_tasks = []
        self.running = False
        self.scheduler_thread = None

    def schedule_email(self, account_id: int, template_id: int,
                       recipient_group: str, schedule_time: str,
                       repeat_type: str = "none", repeat_interval: int = 1):
        """Schedule email for future sending.

        Args:
            account_id: id of the sending account.
            template_id: id of the template to send.
            recipient_group: contact group to send to.
            schedule_time: first run time as an ISO-8601 string.
            repeat_type: "none", "daily", "weekly" or "monthly".
            repeat_interval: number of days/weeks/months between runs.

        Returns:
            The id of the new task.

        Raises:
            ValueError: for an unknown ``repeat_type``, an interval below 1,
                or a ``schedule_time`` that is not valid ISO format.
        """
        # An unknown repeat type or a non-positive interval would never move
        # next_run forward, so the task would fire on every scheduler pass.
        if repeat_type not in self._REPEAT_TYPES:
            raise ValueError(f"Unsupported repeat_type: {repeat_type!r}")
        if repeat_interval < 1:
            raise ValueError("repeat_interval must be at least 1")

        first_run = datetime.datetime.fromisoformat(schedule_time)
        task = {
            'id': len(self.scheduled_tasks) + 1,
            'account_id': account_id,
            'template_id': template_id,
            'recipient_group': recipient_group,
            'schedule_time': first_run,
            'repeat_type': repeat_type,  # none, daily, weekly, monthly
            'repeat_interval': repeat_interval,
            'next_run': first_run,
            'created_at': datetime.datetime.now(),
            'status': 'scheduled'
        }
        self.scheduled_tasks.append(task)
        return task['id']

    def _run_scheduler(self):
        """Background scheduler execution; loops while ``self.running``."""
        while self.running:
            current_time = datetime.datetime.now()
            # Iterate over a snapshot so tasks appended meanwhile do not
            # disturb the loop.
            for task in list(self.scheduled_tasks):
                if task['status'] != 'scheduled' or task['next_run'] > current_time:
                    continue
                try:
                    # Execute the scheduled email
                    self._execute_scheduled_task(task)
                except Exception:
                    # One failing task must not end the scheduler loop.
                    logging.getLogger(__name__).exception(
                        "Scheduled task %s failed", task['id'])
                # Calculate next run if repeating
                if task['repeat_type'] != 'none':
                    task['next_run'] = self._calculate_next_run(task)
                else:
                    task['status'] = 'completed'
            time.sleep(60)  # Check every minute

    def _calculate_next_run(self, task: Dict) -> datetime.datetime:
        """Calculate next run time for repeating tasks.

        Monthly tasks advance by calendar months. The day of month follows
        the original ``schedule_time`` and is clamped to the length of the
        target month (Jan 31 -> Feb 29 -> Mar 31 in a leap year). Any other
        repeat type returns ``next_run`` unchanged.
        """
        import calendar

        current_next = task['next_run']
        interval = task['repeat_interval']
        if task['repeat_type'] == 'daily':
            return current_next + datetime.timedelta(days=interval)
        if task['repeat_type'] == 'weekly':
            return current_next + datetime.timedelta(weeks=interval)
        if task['repeat_type'] == 'monthly':
            anchor_day = task.get('schedule_time', current_next).day
            month_index = current_next.month - 1 + interval
            year = current_next.year + month_index // 12
            month = month_index % 12 + 1
            day = min(anchor_day, calendar.monthrange(year, month)[1])
            return current_next.replace(year=year, month=month, day=day)
        return current_next
class EmailScheduler:
    """In-memory scheduler for one-off and repeating email tasks."""

    # Repeat types understood by _calculate_next_run.
    _REPEAT_TYPES = ("none", "daily", "weekly", "monthly")

    def __init__(self, email_sender: "EmailSender"):
        """Keep the sender and start with no tasks and a stopped loop."""
        self.email_sender = email_sender
        self.scheduled_tasks = []
        self.running = False
        self.scheduler_thread = None

    def schedule_email(self, account_id: int, template_id: int,
                       recipient_group: str, schedule_time: str,
                       repeat_type: str = "none", repeat_interval: int = 1):
        """Schedule email for future sending.

        Args:
            account_id: id of the sending account.
            template_id: id of the template to send.
            recipient_group: contact group to send to.
            schedule_time: first run time as an ISO-8601 string.
            repeat_type: "none", "daily", "weekly" or "monthly".
            repeat_interval: number of days/weeks/months between runs.

        Returns:
            The id of the new task.

        Raises:
            ValueError: for an unknown ``repeat_type``, an interval below 1,
                or a ``schedule_time`` that is not valid ISO format.
        """
        # An unknown repeat type or a non-positive interval would never move
        # next_run forward, so the task would fire on every scheduler pass.
        if repeat_type not in self._REPEAT_TYPES:
            raise ValueError(f"Unsupported repeat_type: {repeat_type!r}")
        if repeat_interval < 1:
            raise ValueError("repeat_interval must be at least 1")

        first_run = datetime.datetime.fromisoformat(schedule_time)
        task = {
            'id': len(self.scheduled_tasks) + 1,
            'account_id': account_id,
            'template_id': template_id,
            'recipient_group': recipient_group,
            'schedule_time': first_run,
            'repeat_type': repeat_type,  # none, daily, weekly, monthly
            'repeat_interval': repeat_interval,
            'next_run': first_run,
            'created_at': datetime.datetime.now(),
            'status': 'scheduled'
        }
        self.scheduled_tasks.append(task)
        return task['id']

    def _run_scheduler(self):
        """Background scheduler execution; loops while ``self.running``."""
        while self.running:
            current_time = datetime.datetime.now()
            # Iterate over a snapshot so tasks appended meanwhile do not
            # disturb the loop.
            for task in list(self.scheduled_tasks):
                if task['status'] != 'scheduled' or task['next_run'] > current_time:
                    continue
                try:
                    # Execute the scheduled email
                    self._execute_scheduled_task(task)
                except Exception:
                    # One failing task must not end the scheduler loop.
                    logging.getLogger(__name__).exception(
                        "Scheduled task %s failed", task['id'])
                # Calculate next run if repeating
                if task['repeat_type'] != 'none':
                    task['next_run'] = self._calculate_next_run(task)
                else:
                    task['status'] = 'completed'
            time.sleep(60)  # Check every minute

    def _calculate_next_run(self, task: Dict) -> datetime.datetime:
        """Calculate next run time for repeating tasks.

        Monthly tasks advance by calendar months. The day of month follows
        the original ``schedule_time`` and is clamped to the length of the
        target month (Jan 31 -> Feb 29 -> Mar 31 in a leap year). Any other
        repeat type returns ``next_run`` unchanged.
        """
        import calendar

        current_next = task['next_run']
        interval = task['repeat_interval']
        if task['repeat_type'] == 'daily':
            return current_next + datetime.timedelta(days=interval)
        if task['repeat_type'] == 'weekly':
            return current_next + datetime.timedelta(weeks=interval)
        if task['repeat_type'] == 'monthly':
            anchor_day = task.get('schedule_time', current_next).day
            month_index = current_next.month - 1 + interval
            year = current_next.year + month_index // 12
            month = month_index % 12 + 1
            day = min(anchor_day, calendar.monthrange(year, month)[1])
            return current_next.replace(year=year, month=month, day=day)
        return current_next
Analytics and Reporting
1. Email Performance Metrics
def generate_email_analytics(self, account_id: Optional[int] = None,
                             days_back: int = 30) -> Dict:
    """Summarise the ``email_history`` table for the last ``days_back`` days.

    Args:
        account_id: Restrict the report to one account; ``None`` means all
            accounts. An id of 0 is a valid filter.
        days_back: Size of the reporting window in days.

    Returns:
        Dict with keys 'summary', 'daily_breakdown', 'error_analysis'
        (top 10 error messages) and 'template_performance'.
    """
    from contextlib import closing

    def _rate(successes, total):
        # Percentage of successful sends; 0 when nothing was sent.
        return (successes / total * 100) if total > 0 else 0

    # The window is bound as a parameter rather than formatted into the SQL.
    conditions = ["sent_at >= datetime('now', ?)"]
    params = [f"-{days_back} days"]
    if account_id is not None:
        conditions.append("account_id = ?")
        params.append(account_id)
    # Only the literal fragments above are interpolated into the queries.
    where_clause = " AND ".join(conditions)

    # sqlite3's own context manager only commits; closing() releases the
    # connection as well.
    with closing(sqlite3.connect(self.db_path)) as conn:
        # Total emails sent
        totals = conn.execute(f"""
            SELECT COUNT(*) as total_sent,
                   COUNT(CASE WHEN success = 1 THEN 1 END) as successful,
                   COUNT(CASE WHEN success = 0 THEN 1 END) as failed
            FROM email_history
            WHERE {where_clause}
        """, params).fetchone()

        # Daily breakdown
        daily_stats = conn.execute(f"""
            SELECT date(sent_at) as send_date,
                   COUNT(*) as emails_sent,
                   COUNT(CASE WHEN success = 1 THEN 1 END) as successful
            FROM email_history
            WHERE {where_clause}
            GROUP BY date(sent_at)
            ORDER BY send_date
        """, params).fetchall()

        # Top error types
        error_stats = conn.execute(f"""
            SELECT error_message, COUNT(*) as error_count
            FROM email_history
            WHERE {where_clause} AND success = 0 AND error_message IS NOT NULL
            GROUP BY error_message
            ORDER BY error_count DESC
            LIMIT 10
        """, params).fetchall()

        # Template performance
        template_stats = conn.execute(f"""
            SELECT template_id, COUNT(*) as usage_count,
                   COUNT(CASE WHEN success = 1 THEN 1 END) as successful_sends
            FROM email_history
            WHERE {where_clause} AND template_id IS NOT NULL
            GROUP BY template_id
            ORDER BY usage_count DESC
        """, params).fetchall()

    total_sent, successful, failed = totals
    return {
        'summary': {
            'total_sent': total_sent,
            'successful': successful,
            'failed': failed,
            'success_rate': _rate(successful, total_sent),
            'period_days': days_back,
        },
        'daily_breakdown': [
            {
                'date': send_date,
                'emails_sent': sent,
                'successful': ok,
                'success_rate': _rate(ok, sent),
            }
            for send_date, sent, ok in daily_stats
        ],
        'error_analysis': [
            {'error': message, 'count': count}
            for message, count in error_stats
        ],
        'template_performance': [
            {
                'template_id': template_id,
                'usage_count': used,
                'successful_sends': ok,
                'success_rate': _rate(ok, used),
            }
            for template_id, used, ok in template_stats
        ],
    }
def generate_email_analytics(self, account_id: Optional[int] = None,
                             days_back: int = 30) -> Dict:
    """Summarise the ``email_history`` table for the last ``days_back`` days.

    Args:
        account_id: Restrict the report to one account; ``None`` means all
            accounts. An id of 0 is a valid filter.
        days_back: Size of the reporting window in days.

    Returns:
        Dict with keys 'summary', 'daily_breakdown', 'error_analysis'
        (top 10 error messages) and 'template_performance'.
    """
    from contextlib import closing

    def _rate(successes, total):
        # Percentage of successful sends; 0 when nothing was sent.
        return (successes / total * 100) if total > 0 else 0

    # The window is bound as a parameter rather than formatted into the SQL.
    conditions = ["sent_at >= datetime('now', ?)"]
    params = [f"-{days_back} days"]
    if account_id is not None:
        conditions.append("account_id = ?")
        params.append(account_id)
    # Only the literal fragments above are interpolated into the queries.
    where_clause = " AND ".join(conditions)

    # sqlite3's own context manager only commits; closing() releases the
    # connection as well.
    with closing(sqlite3.connect(self.db_path)) as conn:
        # Total emails sent
        totals = conn.execute(f"""
            SELECT COUNT(*) as total_sent,
                   COUNT(CASE WHEN success = 1 THEN 1 END) as successful,
                   COUNT(CASE WHEN success = 0 THEN 1 END) as failed
            FROM email_history
            WHERE {where_clause}
        """, params).fetchone()

        # Daily breakdown
        daily_stats = conn.execute(f"""
            SELECT date(sent_at) as send_date,
                   COUNT(*) as emails_sent,
                   COUNT(CASE WHEN success = 1 THEN 1 END) as successful
            FROM email_history
            WHERE {where_clause}
            GROUP BY date(sent_at)
            ORDER BY send_date
        """, params).fetchall()

        # Top error types
        error_stats = conn.execute(f"""
            SELECT error_message, COUNT(*) as error_count
            FROM email_history
            WHERE {where_clause} AND success = 0 AND error_message IS NOT NULL
            GROUP BY error_message
            ORDER BY error_count DESC
            LIMIT 10
        """, params).fetchall()

        # Template performance
        template_stats = conn.execute(f"""
            SELECT template_id, COUNT(*) as usage_count,
                   COUNT(CASE WHEN success = 1 THEN 1 END) as successful_sends
            FROM email_history
            WHERE {where_clause} AND template_id IS NOT NULL
            GROUP BY template_id
            ORDER BY usage_count DESC
        """, params).fetchall()

    total_sent, successful, failed = totals
    return {
        'summary': {
            'total_sent': total_sent,
            'successful': successful,
            'failed': failed,
            'success_rate': _rate(successful, total_sent),
            'period_days': days_back,
        },
        'daily_breakdown': [
            {
                'date': send_date,
                'emails_sent': sent,
                'successful': ok,
                'success_rate': _rate(ok, sent),
            }
            for send_date, sent, ok in daily_stats
        ],
        'error_analysis': [
            {'error': message, 'count': count}
            for message, count in error_stats
        ],
        'template_performance': [
            {
                'template_id': template_id,
                'usage_count': used,
                'successful_sends': ok,
                'success_rate': _rate(ok, used),
            }
            for template_id, used, ok in template_stats
        ],
    }
2. Report Generation
def generate_pdf_report(self, analytics: Dict, output_path: str):
    """Write a one-table PDF summary of *analytics* to *output_path*.

    Only ``analytics['summary']`` is rendered; it must provide the keys
    'total_sent', 'successful', 'failed', 'success_rate' and 'period_days'.
    Requires the ``reportlab`` package.
    """
    # Imported lazily so reportlab is only needed when a PDF is requested.
    # (The unused `canvas` and `letter` imports were dropped.)
    from reportlab.lib import colors
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
    from reportlab.platypus import (
        Paragraph, SimpleDocTemplate, Table, TableStyle,
    )

    doc = SimpleDocTemplate(output_path, pagesize=A4)
    styles = getSampleStyleSheet()

    # Title
    title_style = ParagraphStyle(
        'CustomTitle',
        parent=styles['Heading1'],
        fontSize=24,
        alignment=1,  # Center alignment
        textColor=colors.darkblue,
    )

    # Summary section
    summary = analytics['summary']
    summary_table = Table([
        ['Metric', 'Value'],
        ['Total Emails Sent', f"{summary['total_sent']:,}"],
        ['Successful Sends', f"{summary['successful']:,}"],
        ['Failed Sends', f"{summary['failed']:,}"],
        ['Success Rate', f"{summary['success_rate']:.1f}%"],
        ['Reporting Period', f"{summary['period_days']} days"],
    ])
    summary_table.setStyle(TableStyle([
        # Header row
        ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
        ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, 0), 14),
        ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
        # Data rows
        ('BACKGROUND', (0, 1), (-1, -1), colors.beige),
        ('GRID', (0, 0), (-1, -1), 1, colors.black),
    ]))

    story = [
        Paragraph("Email Campaign Analytics Report", title_style),
        Paragraph("<br/><br/>", styles['Normal']),
        Paragraph("Campaign Summary", styles['Heading2']),
        summary_table,
        Paragraph("<br/>", styles['Normal']),
    ]

    # Build PDF
    doc.build(story)
def generate_pdf_report(self, analytics: Dict, output_path: str):
    """Write a one-table PDF summary of *analytics* to *output_path*.

    Only ``analytics['summary']`` is rendered; it must provide the keys
    'total_sent', 'successful', 'failed', 'success_rate' and 'period_days'.
    Requires the ``reportlab`` package.
    """
    # Imported lazily so reportlab is only needed when a PDF is requested.
    # (The unused `canvas` and `letter` imports were dropped.)
    from reportlab.lib import colors
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
    from reportlab.platypus import (
        Paragraph, SimpleDocTemplate, Table, TableStyle,
    )

    doc = SimpleDocTemplate(output_path, pagesize=A4)
    styles = getSampleStyleSheet()

    # Title
    title_style = ParagraphStyle(
        'CustomTitle',
        parent=styles['Heading1'],
        fontSize=24,
        alignment=1,  # Center alignment
        textColor=colors.darkblue,
    )

    # Summary section
    summary = analytics['summary']
    summary_table = Table([
        ['Metric', 'Value'],
        ['Total Emails Sent', f"{summary['total_sent']:,}"],
        ['Successful Sends', f"{summary['successful']:,}"],
        ['Failed Sends', f"{summary['failed']:,}"],
        ['Success Rate', f"{summary['success_rate']:.1f}%"],
        ['Reporting Period', f"{summary['period_days']} days"],
    ])
    summary_table.setStyle(TableStyle([
        # Header row
        ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
        ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, 0), 14),
        ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
        # Data rows
        ('BACKGROUND', (0, 1), (-1, -1), colors.beige),
        ('GRID', (0, 0), (-1, -1), 1, colors.black),
    ]))

    story = [
        Paragraph("Email Campaign Analytics Report", title_style),
        Paragraph("<br/><br/>", styles['Normal']),
        Paragraph("Campaign Summary", styles['Heading2']),
        summary_table,
        Paragraph("<br/>", styles['Normal']),
    ]

    # Build PDF
    doc.build(story)
Advanced Features
1. A/B Testing Framework
class ABTestManager:
    """Create A/B tests and assign recipients to template variants.

    Tests are stored in the ``ab_tests`` table of the database whose path
    is exposed as ``database.db_path``.
    """

    def __init__(self, database: "EmailDatabase"):
        self.db = database

    def create_ab_test(self, test_name: str, template_a_id: int,
                       template_b_id: int, test_percentage: float = 0.1):
        """Insert an active A/B test and return its row id.

        ``test_percentage`` is the fraction (0.0-1.0) of recipients that
        take part in the test.
        """
        from contextlib import closing

        with closing(sqlite3.connect(self.db.db_path)) as conn:
            with conn:  # commit on success, roll back on error
                cursor = conn.execute("""
                    INSERT INTO ab_tests (test_name, template_a_id, template_b_id,
                                          test_percentage, status, created_at)
                    VALUES (?, ?, ?, ?, 'active', datetime('now'))
                """, (test_name, template_a_id, template_b_id, test_percentage))
            # lastrowid lives on the cursor; Connection has no such attribute.
            return cursor.lastrowid

    def get_test_info(self, test_id: int) -> Optional[Dict]:
        """Return template ids and percentage for *test_id*, or ``None``.

        Rows are looked up by rowid, which is what ``create_ab_test``
        returns.
        """
        from contextlib import closing

        with closing(sqlite3.connect(self.db.db_path)) as conn:
            row = conn.execute("""
                SELECT template_a_id, template_b_id, test_percentage
                FROM ab_tests
                WHERE rowid = ?
            """, (test_id,)).fetchone()
        if row is None:
            return None
        return {
            'template_a_id': row[0],
            'template_b_id': row[1],
            'test_percentage': row[2],
        }

    def select_template_for_recipient(self, recipient_email: str,
                                      available_tests: List[int],
                                      default_template_id: Optional[int] = None
                                      ) -> Optional[int]:
        """Pick the template a recipient should receive.

        The assignment is derived from a SHA-256 digest of the test id and
        the normalised address, so it is stable across processes (the
        built-in ``hash()`` of a str is salted per interpreter run).

        Returns:
            The A or B template id of the first test the recipient falls
            into; otherwise ``default_template_id`` when given.
            NOTE(review): with no default, the legacy fallback returns
            ``available_tests[0]`` - a *test* id, not a template id - or
            ``None`` for an empty list. Callers should pass a default.
        """
        import hashlib

        normalized = recipient_email.strip().lower()
        for test_id in available_tests:
            test_info = self.get_test_info(test_id)
            if test_info is None:
                continue
            digest = hashlib.sha256(
                f"{test_id}:{normalized}".encode("utf-8")).digest()
            # Bucket 0-99 decides participation ...
            bucket = int.from_bytes(digest[:4], "big") % 100
            if bucket < test_info['test_percentage'] * 100:
                # ... and an independent bit decides the variant, so small
                # percentages still split evenly between A and B.
                if digest[4] & 1:
                    return test_info['template_b_id']
                return test_info['template_a_id']

        if default_template_id is not None:
            return default_template_id
        return available_tests[0] if available_tests else None
class ABTestManager:
    """Create A/B tests and assign recipients to template variants.

    Tests are stored in the ``ab_tests`` table of the database whose path
    is exposed as ``database.db_path``.
    """

    def __init__(self, database: "EmailDatabase"):
        self.db = database

    def create_ab_test(self, test_name: str, template_a_id: int,
                       template_b_id: int, test_percentage: float = 0.1):
        """Insert an active A/B test and return its row id.

        ``test_percentage`` is the fraction (0.0-1.0) of recipients that
        take part in the test.
        """
        from contextlib import closing

        with closing(sqlite3.connect(self.db.db_path)) as conn:
            with conn:  # commit on success, roll back on error
                cursor = conn.execute("""
                    INSERT INTO ab_tests (test_name, template_a_id, template_b_id,
                                          test_percentage, status, created_at)
                    VALUES (?, ?, ?, ?, 'active', datetime('now'))
                """, (test_name, template_a_id, template_b_id, test_percentage))
            # lastrowid lives on the cursor; Connection has no such attribute.
            return cursor.lastrowid

    def get_test_info(self, test_id: int) -> Optional[Dict]:
        """Return template ids and percentage for *test_id*, or ``None``.

        Rows are looked up by rowid, which is what ``create_ab_test``
        returns.
        """
        from contextlib import closing

        with closing(sqlite3.connect(self.db.db_path)) as conn:
            row = conn.execute("""
                SELECT template_a_id, template_b_id, test_percentage
                FROM ab_tests
                WHERE rowid = ?
            """, (test_id,)).fetchone()
        if row is None:
            return None
        return {
            'template_a_id': row[0],
            'template_b_id': row[1],
            'test_percentage': row[2],
        }

    def select_template_for_recipient(self, recipient_email: str,
                                      available_tests: List[int],
                                      default_template_id: Optional[int] = None
                                      ) -> Optional[int]:
        """Pick the template a recipient should receive.

        The assignment is derived from a SHA-256 digest of the test id and
        the normalised address, so it is stable across processes (the
        built-in ``hash()`` of a str is salted per interpreter run).

        Returns:
            The A or B template id of the first test the recipient falls
            into; otherwise ``default_template_id`` when given.
            NOTE(review): with no default, the legacy fallback returns
            ``available_tests[0]`` - a *test* id, not a template id - or
            ``None`` for an empty list. Callers should pass a default.
        """
        import hashlib

        normalized = recipient_email.strip().lower()
        for test_id in available_tests:
            test_info = self.get_test_info(test_id)
            if test_info is None:
                continue
            digest = hashlib.sha256(
                f"{test_id}:{normalized}".encode("utf-8")).digest()
            # Bucket 0-99 decides participation ...
            bucket = int.from_bytes(digest[:4], "big") % 100
            if bucket < test_info['test_percentage'] * 100:
                # ... and an independent bit decides the variant, so small
                # percentages still split evenly between A and B.
                if digest[4] & 1:
                    return test_info['template_b_id']
                return test_info['template_a_id']

        if default_template_id is not None:
            return default_template_id
        return available_tests[0] if available_tests else None
2. Email Validation and Verification
def validate_email_list(self, email_list: List[str]) -> Dict:
    """Classify addresses as valid, invalid, risky or unknown.

    Each address is stripped and lower-cased, checked against a basic
    format pattern, and its domain is queried for MX records (requires the
    ``dnspython`` package and network access).

    Returns:
        Dict with four lists. 'valid' holds plain address strings; the
        other three hold ``{'email': ..., 'reason': ...}`` dicts.
    """
    import re
    import dns.exception
    import dns.resolver

    results = {
        'valid': [],
        'invalid': [],
        'risky': [],
        'unknown': []
    }
    email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

    # The loop variable is not called `email`, to avoid shadowing the
    # stdlib module of the same name.
    for raw_address in email_list:
        address = raw_address.strip().lower()

        # Basic format validation
        if not email_pattern.match(address):
            results['invalid'].append({
                'email': address,
                'reason': 'Invalid format'
            })
            continue

        # Domain validation
        domain = address.split('@')[1]
        try:
            # Check if domain has MX record
            mx_records = dns.resolver.resolve(domain, 'MX')
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            # Definite negative answers: resolve() raises for these instead
            # of returning an empty result.
            results['invalid'].append({
                'email': address,
                'reason': 'No MX record found'
            })
            continue
        except dns.exception.DNSException:
            # Timeouts, unreachable name servers, malformed names: the
            # domain could not be checked either way.
            results['unknown'].append({
                'email': address,
                'reason': 'Domain verification failed'
            })
            continue

        if not mx_records:
            results['invalid'].append({
                'email': address,
                'reason': 'No MX record found'
            })
        # Additional checks for risky domains
        elif self.is_disposable_email(domain):
            results['risky'].append({
                'email': address,
                'reason': 'Disposable email provider'
            })
        elif self.is_role_email(address):
            results['risky'].append({
                'email': address,
                'reason': 'Role-based email address'
            })
        else:
            results['valid'].append(address)
    return results
def is_disposable_email(self, domain: str) -> bool:
    """Return True if *domain* is on the built-in disposable-provider list.

    The comparison ignores case, surrounding whitespace and a trailing dot,
    since domain names are case-insensitive.
    """
    disposable_domains = {
        '10minutemail.com', 'tempmail.org', 'guerrillamail.com',
        'mailinator.com', 'yopmail.com', 'trash-mail.com'
    }
    return domain.strip().lower().rstrip('.') in disposable_domains
def is_role_email(self, email: str) -> bool:
    """Return True if the local part of *email* is a role name.

    Role names (admin, support, ...) are matched case-insensitively and
    must equal the whole local part.
    """
    role_prefixes = {
        'admin', 'administrator', 'support', 'help', 'info',
        'sales', 'marketing', 'noreply', 'no-reply', 'postmaster'
    }
    local_part = email.strip().lower().split('@')[0]
    return local_part in role_prefixes
def validate_email_list(self, email_list: List[str]) -> Dict:
    """Classify addresses as valid, invalid, risky or unknown.

    Each address is stripped and lower-cased, checked against a basic
    format pattern, and its domain is queried for MX records (requires the
    ``dnspython`` package and network access).

    Returns:
        Dict with four lists. 'valid' holds plain address strings; the
        other three hold ``{'email': ..., 'reason': ...}`` dicts.
    """
    import re
    import dns.exception
    import dns.resolver

    results = {
        'valid': [],
        'invalid': [],
        'risky': [],
        'unknown': []
    }
    email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

    # The loop variable is not called `email`, to avoid shadowing the
    # stdlib module of the same name.
    for raw_address in email_list:
        address = raw_address.strip().lower()

        # Basic format validation
        if not email_pattern.match(address):
            results['invalid'].append({
                'email': address,
                'reason': 'Invalid format'
            })
            continue

        # Domain validation
        domain = address.split('@')[1]
        try:
            # Check if domain has MX record
            mx_records = dns.resolver.resolve(domain, 'MX')
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            # Definite negative answers: resolve() raises for these instead
            # of returning an empty result.
            results['invalid'].append({
                'email': address,
                'reason': 'No MX record found'
            })
            continue
        except dns.exception.DNSException:
            # Timeouts, unreachable name servers, malformed names: the
            # domain could not be checked either way.
            results['unknown'].append({
                'email': address,
                'reason': 'Domain verification failed'
            })
            continue

        if not mx_records:
            results['invalid'].append({
                'email': address,
                'reason': 'No MX record found'
            })
        # Additional checks for risky domains
        elif self.is_disposable_email(domain):
            results['risky'].append({
                'email': address,
                'reason': 'Disposable email provider'
            })
        elif self.is_role_email(address):
            results['risky'].append({
                'email': address,
                'reason': 'Role-based email address'
            })
        else:
            results['valid'].append(address)
    return results
def is_disposable_email(self, domain: str) -> bool:
    """Return True if *domain* is on the built-in disposable-provider list.

    The comparison ignores case, surrounding whitespace and a trailing dot,
    since domain names are case-insensitive.
    """
    disposable_domains = {
        '10minutemail.com', 'tempmail.org', 'guerrillamail.com',
        'mailinator.com', 'yopmail.com', 'trash-mail.com'
    }
    return domain.strip().lower().rstrip('.') in disposable_domains
def is_role_email(self, email: str) -> bool:
    """Return True if the local part of *email* is a role name.

    Role names (admin, support, ...) are matched case-insensitively and
    must equal the whole local part.
    """
    role_prefixes = {
        'admin', 'administrator', 'support', 'help', 'info',
        'sales', 'marketing', 'noreply', 'no-reply', 'postmaster'
    }
    local_part = email.strip().lower().split('@')[0]
    return local_part in role_prefixes
Usage Examples
1. Quick Start Example
# Basic email sending example
def quick_start_example():
    """Send a templated welcome email to two sample contacts.

    Walks through the whole flow: register an account, store a template,
    add contacts, then send in bulk and print the sent/failed counts.
    The account password is prompted for interactively.
    """
    # Local import: getpass reads the password without echoing it to the
    # terminal, unlike input().
    import getpass

    # Initialize system
    db = EmailDatabase()
    sender = EmailSender(db)

    # Add email account
    account_id = db.add_email_account(
        name="My Gmail Account",
        email="your.email@gmail.com",
        smtp_server="smtp.gmail.com",
        smtp_port=587
    )

    # Create email template
    template_id = db.add_template(
        name="Welcome Email",
        subject="Welcome {first_name}!",
        body="""
Hello {first_name},
Welcome to our platform! We're excited to have you.
Best regards,
The Team
""",
        variables=["first_name"]
    )

    # Add contacts
    contacts = [
        {"first_name": "John", "email": "john@example.com"},
        {"first_name": "Jane", "email": "jane@example.com"}
    ]
    for contact in contacts:
        db.add_contact(
            name=contact["first_name"],
            email=contact["email"],
            custom_fields=contact
        )

    # Send emails
    account = db.get_email_account(account_id)
    template = db.get_template(template_id)
    recipients = db.get_contacts()
    password = getpass.getpass("Enter email password: ")
    results = sender.send_bulk_emails(account, password, recipients, template)
    print(f"Sent: {results['sent']}, Failed: {results['failed']}")
# Basic email sending example
def quick_start_example():
    """Send a templated welcome email to two sample contacts.

    Walks through the whole flow: register an account, store a template,
    add contacts, then send in bulk and print the sent/failed counts.
    The account password is prompted for interactively.
    """
    # Local import: getpass reads the password without echoing it to the
    # terminal, unlike input().
    import getpass

    # Initialize system
    db = EmailDatabase()
    sender = EmailSender(db)

    # Add email account
    account_id = db.add_email_account(
        name="My Gmail Account",
        email="your.email@gmail.com",
        smtp_server="smtp.gmail.com",
        smtp_port=587
    )

    # Create email template
    template_id = db.add_template(
        name="Welcome Email",
        subject="Welcome {first_name}!",
        body="""
Hello {first_name},
Welcome to our platform! We're excited to have you.
Best regards,
The Team
""",
        variables=["first_name"]
    )

    # Add contacts
    contacts = [
        {"first_name": "John", "email": "john@example.com"},
        {"first_name": "Jane", "email": "jane@example.com"}
    ]
    for contact in contacts:
        db.add_contact(
            name=contact["first_name"],
            email=contact["email"],
            custom_fields=contact
        )

    # Send emails
    account = db.get_email_account(account_id)
    template = db.get_template(template_id)
    recipients = db.get_contacts()
    password = getpass.getpass("Enter email password: ")
    results = sender.send_bulk_emails(account, password, recipients, template)
    print(f"Sent: {results['sent']}, Failed: {results['failed']}")
2. Advanced Campaign Example
def advanced_campaign_example():
    """Demonstrate CSV import, contact segmentation and a weekly schedule."""
    database = EmailDatabase()
    email_sender = EmailSender(database)
    campaign_scheduler = EmailScheduler(email_sender)

    # Import contacts from CSV (the parsed result is not used further here).
    imported_contacts = email_sender.parse_csv_contacts('customer_list.csv')

    # Segment contacts; the segments are computed for illustration only.
    premium_filter = {'has_custom_field': ('subscription_type', 'premium')}
    premium_segment = database.segment_contacts(premium_filter)
    recent_filter = {'last_contacted_days': 30}
    recent_segment = database.segment_contacts(recent_filter)

    # Create the targeted template.
    template_fields = {
        'name': "Premium Newsletter",
        'subject': "Exclusive Updates for Premium Members",
        'body': "Premium content for {first_name}...",
        'is_html': True,
    }
    newsletter = EmailTemplate(**template_fields)

    # Schedule the campaign.
    # NOTE(review): the template is built in memory and never handed to the
    # database in this function - confirm that `newsletter.id` is populated.
    schedule_args = {
        'account_id': 1,
        'template_id': newsletter.id,
        'recipient_group': "premium",
        'schedule_time': "2025-09-05T09:00:00",
        'repeat_type': "weekly",
    }
    campaign_scheduler.schedule_email(**schedule_args)

    # Start scheduler
    campaign_scheduler.start_scheduler()
def advanced_campaign_example():
    """Demonstrate CSV import, contact segmentation and a weekly schedule."""
    database = EmailDatabase()
    email_sender = EmailSender(database)
    campaign_scheduler = EmailScheduler(email_sender)

    # Import contacts from CSV (the parsed result is not used further here).
    imported_contacts = email_sender.parse_csv_contacts('customer_list.csv')

    # Segment contacts; the segments are computed for illustration only.
    premium_filter = {'has_custom_field': ('subscription_type', 'premium')}
    premium_segment = database.segment_contacts(premium_filter)
    recent_filter = {'last_contacted_days': 30}
    recent_segment = database.segment_contacts(recent_filter)

    # Create the targeted template.
    template_fields = {
        'name': "Premium Newsletter",
        'subject': "Exclusive Updates for Premium Members",
        'body': "Premium content for {first_name}...",
        'is_html': True,
    }
    newsletter = EmailTemplate(**template_fields)

    # Schedule the campaign.
    # NOTE(review): the template is built in memory and never handed to the
    # database in this function - confirm that `newsletter.id` is populated.
    schedule_args = {
        'account_id': 1,
        'template_id': newsletter.id,
        'recipient_group': "premium",
        'schedule_time': "2025-09-05T09:00:00",
        'repeat_type': "weekly",
    }
    campaign_scheduler.schedule_email(**schedule_args)

    # Start scheduler
    campaign_scheduler.start_scheduler()
Troubleshooting Guide
Common Issues and Solutions
1. SMTP Authentication Errors
# Solution: Handle authentication properly
def handle_smtp_auth_error(self, error):
    """Translate an SMTP authentication failure into a hint for the user.

    Args:
        error: An exception (typically ``smtplib.SMTPAuthenticationError``)
            or any object whose string form starts with the status code.

    Returns:
        A string of the form ``"Authentication Error: <hint>"``; a generic
        hint is used when the code is unknown or cannot be extracted.
    """
    error_solutions = {
        '535': "Invalid credentials - check email/password",
        '534': "Authentication mechanism not supported",
        '587': "Enable 'Less secure app access' for Gmail",
        '465': "Try using TLS on port 587 instead of SSL on 465"
    }
    # smtplib response exceptions carry the numeric code in `smtp_code`.
    # Their str() looks like "(535, b'...')", so the first whitespace token
    # is "(535," and has to be stripped of punctuation to match a key.
    smtp_code = getattr(error, 'smtp_code', None)
    if smtp_code is not None:
        error_code = str(smtp_code)
    else:
        tokens = str(error).split()
        error_code = tokens[0].strip("(),:") if tokens else ''
    solution = error_solutions.get(error_code, "Check SMTP settings and credentials")
    return f"Authentication Error: {solution}"
# Solution: Handle authentication properly
def handle_smtp_auth_error(self, error):
    """Translate an SMTP authentication failure into a hint for the user.

    Args:
        error: An exception (typically ``smtplib.SMTPAuthenticationError``)
            or any object whose string form starts with the status code.

    Returns:
        A string of the form ``"Authentication Error: <hint>"``; a generic
        hint is used when the code is unknown or cannot be extracted.
    """
    error_solutions = {
        '535': "Invalid credentials - check email/password",
        '534': "Authentication mechanism not supported",
        '587': "Enable 'Less secure app access' for Gmail",
        '465': "Try using TLS on port 587 instead of SSL on 465"
    }
    # smtplib response exceptions carry the numeric code in `smtp_code`.
    # Their str() looks like "(535, b'...')", so the first whitespace token
    # is "(535," and has to be stripped of punctuation to match a key.
    smtp_code = getattr(error, 'smtp_code', None)
    if smtp_code is not None:
        error_code = str(smtp_code)
    else:
        tokens = str(error).split()
        error_code = tokens[0].strip("(),:") if tokens else ''
    solution = error_solutions.get(error_code, "Check SMTP settings and credentials")
    return f"Authentication Error: {solution}"
2. Rate Limiting Issues
# Solution: Implement intelligent rate limiting
def implement_rate_limiting(self, provider: str):
    """Return the sending limits for an email provider.

    Args:
        provider: Provider domain such as "gmail.com". Case and surrounding
            whitespace are ignored, and a full address ("me@gmail.com") is
            reduced to its domain.

    Returns:
        Dict with 'emails_per_minute' and 'emails_per_day'; conservative
        defaults are returned for unknown or missing providers.
    """
    rate_limits = {
        'gmail.com': {'emails_per_minute': 20, 'emails_per_day': 500},
        'outlook.com': {'emails_per_minute': 30, 'emails_per_day': 300},
        'yahoo.com': {'emails_per_minute': 25, 'emails_per_day': 400}
    }
    # rpartition keeps the whole string when there is no '@'.
    domain = (provider or '').strip().lower().rpartition('@')[2]
    return rate_limits.get(domain, {'emails_per_minute': 10, 'emails_per_day': 100})
# Solution: Implement intelligent rate limiting
def implement_rate_limiting(self, provider: str):
    """Return the sending limits for an email provider.

    Args:
        provider: Provider domain such as "gmail.com". Case and surrounding
            whitespace are ignored, and a full address ("me@gmail.com") is
            reduced to its domain.

    Returns:
        Dict with 'emails_per_minute' and 'emails_per_day'; conservative
        defaults are returned for unknown or missing providers.
    """
    rate_limits = {
        'gmail.com': {'emails_per_minute': 20, 'emails_per_day': 500},
        'outlook.com': {'emails_per_minute': 30, 'emails_per_day': 300},
        'yahoo.com': {'emails_per_minute': 25, 'emails_per_day': 400}
    }
    # rpartition keeps the whole string when there is no '@'.
    domain = (provider or '').strip().lower().rpartition('@')[2]
    return rate_limits.get(domain, {'emails_per_minute': 10, 'emails_per_day': 100})
3. Large Attachment Handling
# Solution: Compress large attachments
def handle_large_attachments(self, attachments: List[str], max_size_mb: int = 25):
    """Zip the attachments together when their combined size is too large.

    Args:
        attachments: Paths of the files to attach.
        max_size_mb: Maximum combined size in megabytes.

    Returns:
        ``attachments`` unchanged when it fits; otherwise a one-element
        list with the path of an ``attachments.zip`` archive, or an empty
        list when even the archive exceeds the limit.
    """
    # Local imports: the file's top-level imports do not include `os`.
    import os
    import tempfile
    import zipfile

    max_size_bytes = max_size_mb * 1024 * 1024
    total_size = sum(os.path.getsize(path) for path in attachments)
    if total_size <= max_size_bytes:
        return attachments

    # Create compressed archive in its own temp directory, so concurrent
    # sends (or an existing file in the working directory) are not
    # overwritten, while the attachment keeps the name "attachments.zip".
    archive_dir = tempfile.mkdtemp(prefix='email_attachments_')
    zip_path = os.path.join(archive_dir, 'attachments.zip')
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file_path in attachments:
            zipf.write(file_path, os.path.basename(file_path))

    if os.path.getsize(zip_path) <= max_size_bytes:
        return [zip_path]

    # Still too large: do not leave the useless archive behind.
    os.remove(zip_path)
    os.rmdir(archive_dir)
    return []
# Solution: Compress large attachments
def handle_large_attachments(self, attachments: List[str], max_size_mb: int = 25):
    """Zip the attachments together when their combined size is too large.

    Args:
        attachments: Paths of the files to attach.
        max_size_mb: Maximum combined size in megabytes.

    Returns:
        ``attachments`` unchanged when it fits; otherwise a one-element
        list with the path of an ``attachments.zip`` archive, or an empty
        list when even the archive exceeds the limit.
    """
    # Local imports: the file's top-level imports do not include `os`.
    import os
    import tempfile
    import zipfile

    max_size_bytes = max_size_mb * 1024 * 1024
    total_size = sum(os.path.getsize(path) for path in attachments)
    if total_size <= max_size_bytes:
        return attachments

    # Create compressed archive in its own temp directory, so concurrent
    # sends (or an existing file in the working directory) are not
    # overwritten, while the attachment keeps the name "attachments.zip".
    archive_dir = tempfile.mkdtemp(prefix='email_attachments_')
    zip_path = os.path.join(archive_dir, 'attachments.zip')
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file_path in attachments:
            zipf.write(file_path, os.path.basename(file_path))

    if os.path.getsize(zip_path) <= max_size_bytes:
        return [zip_path]

    # Still too large: do not leave the useless archive behind.
    os.remove(zip_path)
    os.rmdir(archive_dir)
    return []
Performance Optimization
1. Database Optimization
def optimize_database(self):
    """Create lookup indexes, then VACUUM and ANALYZE the database.

    Expects the ``contacts`` and ``email_history`` tables to exist in the
    database at ``self.db_path``. Safe to call repeatedly.
    """
    from contextlib import closing

    # isolation_level=None keeps the connection in autocommit mode, so
    # VACUUM can never end up inside an open transaction (where SQLite
    # rejects it). closing() releases the connection afterwards, which
    # sqlite3's own context manager does not do.
    with closing(sqlite3.connect(self.db_path, isolation_level=None)) as conn:
        # Create indexes for better query performance
        conn.execute("CREATE INDEX IF NOT EXISTS idx_contacts_email ON contacts(email)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_email_history_sent_at ON email_history(sent_at)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_email_history_success ON email_history(success)")
        # Vacuum database to reclaim space
        conn.execute("VACUUM")
        # Analyze tables for query optimization
        conn.execute("ANALYZE")
def optimize_database(self):
    """Create lookup indexes, then VACUUM and ANALYZE the database.

    Expects the ``contacts`` and ``email_history`` tables to exist in the
    database at ``self.db_path``. Safe to call repeatedly.
    """
    from contextlib import closing

    # isolation_level=None keeps the connection in autocommit mode, so
    # VACUUM can never end up inside an open transaction (where SQLite
    # rejects it). closing() releases the connection afterwards, which
    # sqlite3's own context manager does not do.
    with closing(sqlite3.connect(self.db_path, isolation_level=None)) as conn:
        # Create indexes for better query performance
        conn.execute("CREATE INDEX IF NOT EXISTS idx_contacts_email ON contacts(email)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_email_history_sent_at ON email_history(sent_at)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_email_history_success ON email_history(success)")
        # Vacuum database to reclaim space
        conn.execute("VACUUM")
        # Analyze tables for query optimization
        conn.execute("ANALYZE")
2. Memory Management
def process_large_recipient_list(self, recipients: List[Dict], batch_size: int = 100,
                                 delay_seconds: float = 1.0):
    """Yield *recipients* in consecutive batches of at most *batch_size*.

    Args:
        recipients: The full recipient list.
        batch_size: Maximum number of recipients per batch (at least 1).
        delay_seconds: Pause between batches; 0 disables it. No pause is
            made after the final batch.

    Yields:
        Lists holding a slice of ``recipients``.

    Raises:
        ValueError: If ``batch_size`` is below 1 (raised on first iteration).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    total = len(recipients)
    for start in range(0, total, batch_size):
        yield recipients[start:start + batch_size]
        # Throttle only when another batch follows.
        if delay_seconds > 0 and start + batch_size < total:
            time.sleep(delay_seconds)
def process_large_recipient_list(self, recipients: List[Dict], batch_size: int = 100,
                                 delay_seconds: float = 1.0):
    """Yield *recipients* in consecutive batches of at most *batch_size*.

    Args:
        recipients: The full recipient list.
        batch_size: Maximum number of recipients per batch (at least 1).
        delay_seconds: Pause between batches; 0 disables it. No pause is
            made after the final batch.

    Yields:
        Lists holding a slice of ``recipients``.

    Raises:
        ValueError: If ``batch_size`` is below 1 (raised on first iteration).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    total = len(recipients)
    for start in range(0, total, batch_size):
        yield recipients[start:start + batch_size]
        # Throttle only when another batch follows.
        if delay_seconds > 0 and start + batch_size < total:
            time.sleep(delay_seconds)
Security Features
1. Password Encryption
import cryptography.fernet as fernet
import base64
def encrypt_password(self, password: str, key: bytes = None) -> Tuple[str, bytes]:
    """Encrypt *password* with Fernet and return ``(ciphertext, key)``.

    A fresh key is generated when *key* is omitted. The ciphertext is the
    Fernet token wrapped in one more layer of base64 and returned as str;
    the key that was used is returned alongside it.
    """
    active_key = fernet.Fernet.generate_key() if key is None else key
    token = fernet.Fernet(active_key).encrypt(password.encode())
    encoded_token = base64.b64encode(token).decode()
    return encoded_token, active_key
def decrypt_password(self, encrypted_password: str, key: bytes) -> str:
    """Reverse ``encrypt_password``: undo the base64 layer, then decrypt.

    Returns the original password as str.
    """
    # The cipher is constructed before the payload is decoded, so an
    # invalid key is reported first.
    return fernet.Fernet(key).decrypt(
        base64.b64decode(encrypted_password.encode())
    ).decode()
import cryptography.fernet as fernet
import base64
def encrypt_password(self, password: str, key: bytes = None) -> Tuple[str, bytes]:
    """Encrypt *password* with Fernet and return ``(ciphertext, key)``.

    A fresh key is generated when *key* is omitted. The ciphertext is the
    Fernet token wrapped in one more layer of base64 and returned as str;
    the key that was used is returned alongside it.
    """
    active_key = fernet.Fernet.generate_key() if key is None else key
    token = fernet.Fernet(active_key).encrypt(password.encode())
    encoded_token = base64.b64encode(token).decode()
    return encoded_token, active_key
def decrypt_password(self, encrypted_password: str, key: bytes) -> str:
    """Reverse ``encrypt_password``: undo the base64 layer, then decrypt.

    Returns the original password as str.
    """
    # The cipher is constructed before the payload is decoded, so an
    # invalid key is reported first.
    return fernet.Fernet(key).decrypt(
        base64.b64decode(encrypted_password.encode())
    ).decode()
2. Input Sanitization
def sanitize_email_content(self, content: str) -> str:
    """Strip script-like fragments from *content*, then HTML-escape it.

    Dangerous patterns are removed *before* escaping: once ``<`` has become
    ``&lt;`` the ``<script>`` pattern can no longer match. Removal repeats
    until nothing changes, so fragments reassembled by a removal (e.g.
    ``javajavascript:script:``) are caught as well.
    """
    import html
    import re

    # Remove potentially dangerous patterns
    dangerous_patterns = [
        r'<script.*?</script>',
        r'javascript:',
        r'vbscript:',
        r'onload=',
        r'onerror='
    ]
    # DOTALL lets the script pattern span line breaks.
    combined = re.compile('|'.join(dangerous_patterns),
                          flags=re.IGNORECASE | re.DOTALL)
    previous = None
    while previous != content:
        previous = content
        content = combined.sub('', content)

    # Escape HTML characters
    return html.escape(content)
def sanitize_email_content(self, content: str) -> str:
    """Strip script-like fragments from *content*, then HTML-escape it.

    Dangerous patterns are removed *before* escaping: once ``<`` has become
    ``&lt;`` the ``<script>`` pattern can no longer match. Removal repeats
    until nothing changes, so fragments reassembled by a removal (e.g.
    ``javajavascript:script:``) are caught as well.
    """
    import html
    import re

    # Remove potentially dangerous patterns
    dangerous_patterns = [
        r'<script.*?</script>',
        r'javascript:',
        r'vbscript:',
        r'onload=',
        r'onerror='
    ]
    # DOTALL lets the script pattern span line breaks.
    combined = re.compile('|'.join(dangerous_patterns),
                          flags=re.IGNORECASE | re.DOTALL)
    previous = None
    while previous != content:
        previous = content
        content = combined.sub('', content)

    # Escape HTML characters
    return html.escape(content)
Next Steps and Extensions
Recommended Improvements
- Web Interface: Create a Flask/Django web interface
- API Integration: Add REST API for external integrations
- Mobile App: Build companion mobile application
- Cloud Storage: Integrate with cloud storage for attachments
- Advanced Analytics: Add click tracking and engagement metrics
- Machine Learning: Implement send-time optimization
- Multi-language: Add internationalization support
- Enterprise Features: Add SSO, audit logs, and compliance features
Sample Projects to Build
- Newsletter Platform: Build a complete newsletter service
- E-commerce Notifications: Create transactional email system
- CRM Integration: Integrate with customer relationship management
- Marketing Automation: Build automated drip campaigns
- Event Notifications: Create event-driven email triggers
This comprehensive email automation system provides enterprise-level capabilities for managing email campaigns, contact lists, and analytics. The modular design allows for easy customization and extension based on specific requirements. 📧🚀
Was this page helpful?
Let us know how we did