Advanced Web Scraping with BeautifulSoup
Abstract
Create an advanced web scraping application with BeautifulSoup that extracts data from various website types including news sites, e-commerce platforms, and social media. This project demonstrates professional web scraping techniques, ethical scraping practices, rate limiting, and comprehensive data extraction methods.
Prerequisites
- Python 3.7 or above
- Text Editor or IDE
- Solid understanding of Python syntax and web technologies
- Knowledge of HTML, CSS selectors, and DOM structure
- Familiarity with HTTP requests and web protocols
- Understanding of ethical web scraping principles
- Basic knowledge of data processing and export formats
Getting Started
Create a new project
- Create a new project folder and name it advancedWebScraper.
- Create a new file and name it webscrapingbeautifulsoup.py.
- Install the required dependencies: pip install beautifulsoup4 requests lxml
- Open the project folder in your favorite text editor or IDE.
- Copy the code below and paste it into your webscrapingbeautifulsoup.py file.
Write the code
- Add the following code to your webscrapingbeautifulsoup.py file.
# Advanced Web Scraping with BeautifulSoup
import requests
from bs4 import BeautifulSoup
import csv
import json
import time
import os
import re
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Optional
import logging
class WebScraper:
def __init__(self, base_url: str = "", delay: float = 1.0):
self.base_url = base_url
self.delay = delay
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
})
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger(__name__)
def fetch_page(self, url: str) -> Optional[BeautifulSoup]:
"""Fetch a web page and return BeautifulSoup object"""
try:
self.logger.info(f"Fetching: {url}")
response = self.session.get(url, timeout=10)
response.raise_for_status()
# Add delay to be respectful
time.sleep(self.delay)
return BeautifulSoup(response.content, 'html.parser')
except requests.exceptions.RequestException as e:
self.logger.error(f"Error fetching {url}: {e}")
return None
def scrape_articles_from_website(self, base_url: str, max_pages: int = 5) -> List[Dict]:
"""Scrape articles from a news website"""
articles = []
# Example for scraping a blog/news site
for page in range(1, max_pages + 1):
url = f"{base_url}?page={page}"
soup = self.fetch_page(url)
if not soup:
continue
# Find article containers (adjust selectors based on target site)
article_containers = soup.find_all('article') or soup.find_all('div', class_=['post', 'article', 'entry'])
for container in article_containers:
article_data = self.extract_article_data(container, base_url)
if article_data:
articles.append(article_data)
return articles
def extract_article_data(self, container, base_url: str) -> Optional[Dict]:
"""Extract article data from HTML container"""
try:
# Extract title
title_elem = container.find('h1') or container.find('h2') or container.find('h3')
title = title_elem.get_text(strip=True) if title_elem else "No title"
# Extract link
link_elem = container.find('a')
link = urljoin(base_url, link_elem.get('href')) if link_elem else ""
# Extract excerpt/description
desc_elem = container.find('p') or container.find('div', class_=['excerpt', 'summary'])
description = desc_elem.get_text(strip=True) if desc_elem else ""
# Extract author
author_elem = container.find('span', class_=['author', 'by']) or container.find('div', class_='author')
author = author_elem.get_text(strip=True) if author_elem else "Unknown"
# Extract date
date_elem = container.find('time') or container.find('span', class_=['date', 'published'])
date = date_elem.get_text(strip=True) if date_elem else ""
# Extract tags/categories
tag_container = container.find('div', class_=['tags', 'categories'])
tags = []
if tag_container:
tag_links = tag_container.find_all('a')
tags = [tag.get_text(strip=True) for tag in tag_links]
return {
'title': title,
'link': link,
'description': description[:200] + "..." if len(description) > 200 else description,
'author': author,
'date': date,
'tags': tags
}
except Exception as e:
self.logger.error(f"Error extracting article data: {e}")
return None
def scrape_product_listings(self, base_url: str, max_pages: int = 3) -> List[Dict]:
"""Scrape product listings from an e-commerce site"""
products = []
for page in range(1, max_pages + 1):
url = f"{base_url}?page={page}"
soup = self.fetch_page(url)
if not soup:
continue
# Find product containers
product_containers = soup.find_all('div', class_=['product', 'item', 'listing'])
for container in product_containers:
product_data = self.extract_product_data(container, base_url)
if product_data:
products.append(product_data)
return products
def extract_product_data(self, container, base_url: str) -> Optional[Dict]:
"""Extract product data from HTML container"""
try:
# Extract product name
name_elem = container.find('h2') or container.find('h3') or container.find('a')
name = name_elem.get_text(strip=True) if name_elem else "No name"
# Extract price
price_elem = container.find('span', class_=['price', 'cost']) or container.find('div', class_='price')
price = price_elem.get_text(strip=True) if price_elem else "No price"
# Clean price (remove currency symbols, etc.)
price_match = re.search(r'[\d,]+\.?\d*', price)
clean_price = price_match.group() if price_match else "0"
# Extract image URL
img_elem = container.find('img')
image_url = urljoin(base_url, img_elem.get('src')) if img_elem else ""
# Extract product link
link_elem = container.find('a')
product_link = urljoin(base_url, link_elem.get('href')) if link_elem else ""
# Extract rating
rating_elem = container.find('div', class_=['rating', 'stars'])
rating = rating_elem.get_text(strip=True) if rating_elem else "No rating"
return {
'name': name,
'price': clean_price,
'original_price': price,
'image_url': image_url,
'product_link': product_link,
'rating': rating
}
except Exception as e:
self.logger.error(f"Error extracting product data: {e}")
return None
def scrape_social_media_posts(self, username: str, platform: str = "twitter") -> List[Dict]:
"""Scrape social media posts (demo - be mindful of ToS)"""
posts = []
# This is a simplified example - real implementation would need
# proper authentication and API usage
if platform.lower() == "twitter":
# Example URL structure (adjust based on actual requirements)
url = f"https://twitter.com/{username}"
soup = self.fetch_page(url)
if soup:
# Find tweet containers (adjust selectors based on current Twitter structure)
tweet_containers = soup.find_all('div', {'data-testid': 'tweet'})
for container in tweet_containers:
post_data = self.extract_social_post_data(container)
if post_data:
posts.append(post_data)
return posts
def extract_social_post_data(self, container) -> Optional[Dict]:
"""Extract social media post data"""
try:
# Extract post text
text_elem = container.find('div', {'data-testid': 'tweetText'})
text = text_elem.get_text(strip=True) if text_elem else ""
# Extract timestamp
time_elem = container.find('time')
timestamp = time_elem.get('datetime') if time_elem else ""
# Extract engagement metrics
likes_elem = container.find('div', {'data-testid': 'like'})
likes = likes_elem.get_text(strip=True) if likes_elem else "0"
retweets_elem = container.find('div', {'data-testid': 'retweet'})
retweets = retweets_elem.get_text(strip=True) if retweets_elem else "0"
return {
'text': text,
'timestamp': timestamp,
'likes': likes,
'retweets': retweets
}
except Exception as e:
self.logger.error(f"Error extracting social post data: {e}")
return None
def save_to_csv(self, data: List[Dict], filename: str):
"""Save scraped data to CSV file"""
if not data:
self.logger.warning("No data to save")
return
try:
with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = data[0].keys()
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for row in data:
writer.writerow(row)
self.logger.info(f"Data saved to {filename}")
except Exception as e:
self.logger.error(f"Error saving to CSV: {e}")
def save_to_json(self, data: List[Dict], filename: str):
"""Save scraped data to JSON file"""
try:
with open(filename, 'w', encoding='utf-8') as jsonfile:
json.dump(data, jsonfile, indent=2, ensure_ascii=False)
self.logger.info(f"Data saved to {filename}")
except Exception as e:
self.logger.error(f"Error saving to JSON: {e}")
def scrape_quotes(self) -> List[Dict]:
"""Scrape quotes from quotes.toscrape.com (practice site)"""
base_url = "http://quotes.toscrape.com"
quotes = []
page = 1
while True:
url = f"{base_url}/page/{page}/"
soup = self.fetch_page(url)
if not soup:
break
quote_containers = soup.find_all('div', class_='quote')
if not quote_containers:
break
for quote_div in quote_containers:
try:
text = quote_div.find('span', class_='text').get_text()
author = quote_div.find('small', class_='author').get_text()
tags = [tag.get_text() for tag in quote_div.find_all('a', class_='tag')]
quotes.append({
'text': text,
'author': author,
'tags': tags
})
except Exception as e:
self.logger.error(f"Error extracting quote: {e}")
page += 1
return quotes
def scrape_books_info(self) -> List[Dict]:
"""Scrape book information from books.toscrape.com (practice site)"""
base_url = "http://books.toscrape.com"
books = []
page = 1
while True:
url = f"{base_url}/catalogue/page-{page}.html"
soup = self.fetch_page(url)
if not soup:
break
book_containers = soup.find_all('article', class_='product_pod')
if not book_containers:
break
for book_article in book_containers:
try:
title_elem = book_article.find('h3').find('a')
title = title_elem.get('title')
price_elem = book_article.find('p', class_='price_color')
price = price_elem.get_text()
availability_elem = book_article.find('p', class_='instock availability')
availability = availability_elem.get_text(strip=True)
rating_elem = book_article.find('p', class_='star-rating')
rating = rating_elem.get('class')[1] if rating_elem else 'No rating'
books.append({
'title': title,
'price': price,
'availability': availability,
'rating': rating
})
except Exception as e:
self.logger.error(f"Error extracting book info: {e}")
page += 1
# Limit to prevent infinite loop
if page > 50:
break
return books
def main():
"""Main function to demonstrate web scraping"""
scraper = WebScraper(delay=1.0)
while True:
print("\n=== Advanced Web Scraper ===")
print("1. Scrape quotes (practice site)")
print("2. Scrape books (practice site)")
print("3. Custom article scraping")
print("4. Custom product scraping")
print("5. View scraped data files")
print("0. Exit")
try:
choice = input("\nEnter your choice: ").strip()
if choice == '1':
print("Scraping quotes from quotes.toscrape.com...")
quotes = scraper.scrape_quotes()
if quotes:
print(f"Scraped {len(quotes)} quotes!")
# Show first few quotes
for i, quote in enumerate(quotes[:3], 1):
print(f"\nQuote {i}:")
print(f"Text: {quote['text'][:100]}...")
print(f"Author: {quote['author']}")
print(f"Tags: {', '.join(quote['tags'])}")
# Save options
save_format = input("\nSave as (csv/json/both): ").strip().lower()
if save_format in ['csv', 'both']:
scraper.save_to_csv(quotes, 'quotes.csv')
if save_format in ['json', 'both']:
scraper.save_to_json(quotes, 'quotes.json')
else:
print("No quotes found!")
elif choice == '2':
print("Scraping books from books.toscrape.com...")
books = scraper.scrape_books_info()
if books:
print(f"Scraped {len(books)} books!")
# Show first few books
for i, book in enumerate(books[:5], 1):
print(f"\nBook {i}:")
print(f"Title: {book['title'][:50]}...")
print(f"Price: {book['price']}")
print(f"Rating: {book['rating']}")
print(f"Availability: {book['availability']}")
# Save options
save_format = input("\nSave as (csv/json/both): ").strip().lower()
if save_format in ['csv', 'both']:
scraper.save_to_csv(books, 'books.csv')
if save_format in ['json', 'both']:
scraper.save_to_json(books, 'books.json')
else:
print("No books found!")
elif choice == '3':
base_url = input("Enter base URL for article scraping: ").strip()
if base_url:
max_pages = int(input("Enter max pages to scrape (default 3): ").strip() or "3")
print(f"Scraping articles from {base_url}...")
articles = scraper.scrape_articles_from_website(base_url, max_pages)
if articles:
print(f"Scraped {len(articles)} articles!")
# Show first few articles
for i, article in enumerate(articles[:3], 1):
print(f"\nArticle {i}:")
print(f"Title: {article['title'][:50]}...")
print(f"Author: {article['author']}")
print(f"Date: {article['date']}")
# Save options
save_format = input("\nSave as (csv/json/both): ").strip().lower()
if save_format in ['csv', 'both']:
scraper.save_to_csv(articles, 'articles.csv')
if save_format in ['json', 'both']:
scraper.save_to_json(articles, 'articles.json')
else:
print("No articles found!")
elif choice == '4':
base_url = input("Enter base URL for product scraping: ").strip()
if base_url:
max_pages = int(input("Enter max pages to scrape (default 3): ").strip() or "3")
print(f"Scraping products from {base_url}...")
products = scraper.scrape_product_listings(base_url, max_pages)
if products:
print(f"Scraped {len(products)} products!")
# Show first few products
for i, product in enumerate(products[:3], 1):
print(f"\nProduct {i}:")
print(f"Name: {product['name'][:50]}...")
print(f"Price: {product['original_price']}")
print(f"Rating: {product['rating']}")
# Save options
save_format = input("\nSave as (csv/json/both): ").strip().lower()
if save_format in ['csv', 'both']:
scraper.save_to_csv(products, 'products.csv')
if save_format in ['json', 'both']:
scraper.save_to_json(products, 'products.json')
else:
print("No products found!")
elif choice == '5':
print("\nScraped data files:")
data_files = ['quotes.csv', 'quotes.json', 'books.csv', 'books.json',
'articles.csv', 'articles.json', 'products.csv', 'products.json']
for filename in data_files:
if os.path.exists(filename):
size = os.path.getsize(filename)
print(f" {filename} ({size} bytes)")
else:
print(f" {filename} (not found)")
elif choice == '0':
print("Thank you for using the Advanced Web Scraper!")
break
else:
print("Invalid choice. Please try again.")
except KeyboardInterrupt:
print("\n\nGoodbye!")
break
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == "__main__":
main()
- Save the file.
- Run the following command to start the scraper.
C:\Users\username\Documents\advancedWebScraper> python webscrapingbeautifulsoup.py

=== Advanced Web Scraper ===
1. Scrape quotes (practice site)
2. Scrape books (practice site)
3. Custom article scraping
4. Custom product scraping
5. View scraped data files
0. Exit

Enter your choice: 1
Scraping quotes from quotes.toscrape.com...
Scraped 100 quotes!
...
Save as (csv/json/both): both
Data saved to quotes.csv
Data saved to quotes.json
Explanation
- The from bs4 import BeautifulSoup statement imports the BeautifulSoup library used for HTML parsing and data extraction.
- The import requests statement provides the HTTP functionality for making requests to target sites.
- The WebScraper class manages all scraping operations and configuration settings.
- Rate limiting, via the delay passed to the constructor, prevents overwhelming target servers and reduces the risk of being blocked.
- A realistic browser User-Agent header is set on the session so requests look like ordinary browser traffic; rotating it is discussed later as an extension.
- The fetch_page() method downloads a page, applies the delay, and returns a BeautifulSoup object ready for parsing.
- Separate methods handle the different content types: news articles, product listings, social media posts, quotes, and books, each extracting fields such as titles, prices, or post text using tag names and CSS classes.
- Error handling manages network issues, parsing errors, and missing elements gracefully.
- Export functionality saves scraped data to CSV or JSON files.
- The logging system records every fetch and export, giving real-time feedback during long runs and a trail for debugging and monitoring.
- Retry mechanisms are not built into this version; a failed request is logged and skipped, and adding retries is covered under How It Works below.
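The class is not tied to the interactive menu. As a minimal sketch (assuming the module sits on your import path under the file name used above), it can be driven from another script:

# Sketch: drive the scraper from another script instead of the menu
from webscrapingbeautifulsoup import WebScraper

scraper = WebScraper(delay=1.0)
quotes = scraper.scrape_quotes()              # list of dicts: text, author, tags
print(f"Collected {len(quotes)} quotes")
scraper.save_to_json(quotes, "quotes.json")   # or scraper.save_to_csv(quotes, "quotes.csv")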
Next Steps
Congratulations! You have successfully created an Advanced Web Scraper in Python. Experiment with the code and see if you can modify the application. Here are a few suggestions:
- Add proxy support for anonymous scraping
- Implement JavaScript rendering with Selenium
- Create scheduled scraping with cron jobs
- Add data validation and cleaning features
- Implement distributed scraping across multiple servers
- Create real-time monitoring dashboards
- Add machine learning for content classification
- Implement captcha solving capabilities
- Create API endpoints for scraping services
Conclusion
In this project, you learned how to create an Advanced Web Scraper in Python using BeautifulSoup. You also learned about ethical web scraping, HTML parsing, data extraction techniques, and implementing professional scraping solutions. You can find the source code on GitHub.
How It Works
1. WebScraper Class Architecture
class WebScraper:
    def __init__(self, delay=1, max_retries=3):
        self.delay = delay
        self.max_retries = max_retries
        self.session = requests.Session()
        self.scraped_data = []
The main class manages:
- Session Management: Persistent connections for efficiency
- Rate Limiting: Configurable delays between requests
- Data Storage: In-memory storage before export
- Error Tracking: Retry mechanisms and failure logging
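Retry handling is not wired into the main listing above, where a failed request is simply logged and skipped. A minimal sketch of how max_retries could be honoured inside a fetch method (it reuses the imports from the main listing; the linear back-off is an assumption):

def fetch_page_with_retries(self, url):
    """Sketch: retry a request up to self.max_retries times before giving up."""
    for attempt in range(1, self.max_retries + 1):
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            time.sleep(self.delay)
            return BeautifulSoup(response.content, 'html.parser')
        except requests.exceptions.RequestException as e:
            self.logger.warning(f"Attempt {attempt}/{self.max_retries} failed for {url}: {e}")
            time.sleep(self.delay * attempt)   # wait a little longer after each failure
    return None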
2. User Agent Rotation
self.user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
]
Multiple user agents help avoid detection by:
- Simulating different browsers and operating systems
- Reducing the chance of being blocked
- Appearing as organic traffic
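The main listing sets a single static User-Agent on the session. A minimal rotation sketch (the method name is chosen here for illustration) swaps the header before each request:

import random

def rotate_user_agent(self):
    """Sketch: pick a random User-Agent from self.user_agents for the next request."""
    self.session.headers['User-Agent'] = random.choice(self.user_agents)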
3. Content Type Detection
The scraper automatically detects website types:
- News Sites: Extracts headlines, authors, dates, content
- E-commerce: Gets product names, prices, descriptions, reviews
- Social Media: Collects posts, usernames, timestamps, engagement
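Automatic detection is not implemented in the listing above, which instead exposes a separate method per content type. A simple heuristic sketch could inspect the parsed page for telltale elements:

def detect_content_type(soup):
    """Sketch: guess the page type from common structural hints."""
    if soup.find('article') or soup.find(class_='post'):
        return 'article'
    if soup.find(class_='product') or soup.find(class_='price'):
        return 'product'
    if soup.find(attrs={'data-testid': 'tweet'}):
        return 'social'
    return 'unknown'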
4. Data Export Options
def save_to_csv(self, filename):
    """Export to CSV format"""

def save_to_json(self, filename):
    """Export to JSON format"""

def save_to_database(self, db_name):
    """Save to SQLite database"""
Usage Examples
Basic Usage
# Create a scraper instance (about one request every 2 seconds)
scraper = WebScraper(delay=2.0)

# Scrape the practice quotes site
quotes = scraper.scrape_quotes()

# Export the data
scraper.save_to_csv(quotes, "scraped_data.csv")
Advanced Usage
# Scrape articles from several sites in one run
article_sites = [
    "https://news-site.com/blog",
    "https://another-site.com/articles"
]

all_articles = []
for base_url in article_sites:
    all_articles.extend(scraper.scrape_articles_from_website(base_url, max_pages=2))

# Save in multiple formats
scraper.save_to_json(all_articles, "data.json")
scraper.save_to_csv(all_articles, "data.csv")
Run the Application
python webscrapingbeautifulsoup.py
Sample Output
Console Output
=== Advanced Web Scraper ===
Starting scraper with 2 second delay...
Scraping: https://example-news.com
✓ Successfully scraped 15 articles
Rate limiting: waiting 2 seconds...
Scraping: https://example-shop.com
✓ Successfully scraped 24 products
Rate limiting: waiting 2 seconds...
=== Scraping Complete ===
Total items scraped: 39
Exported to: scraped_data.csv
Exported to: scraped_data.json
CSV Output Example
type,title,url,content,price,author,date
article,"Tech News Today","https://example.com/tech","Latest technology updates...",,"John Doe","2025-09-02"
product,"Smartphone XY","https://shop.com/phone","High-end smartphone","$699",,
article,"Market Update","https://example.com/market","Stock market analysis...",,"Jane Smith","2025-09-02"
Advanced Features
1. Error Handling
try:
    response = self.session.get(url, timeout=10)
    response.raise_for_status()
except requests.exceptions.RequestException as e:
    self.logger.error(f"Error fetching {url}: {e}")
    return None
2. Content Validation
def validate_content(self, soup, content_type):
    """Validate scraped content quality"""
    if content_type == "article":
        return len(soup.get_text()) > 100
    elif content_type == "product":
        return soup.find(class_="price") is not None
    return True   # other content types are accepted as-is
3. Rate Limiting
def apply_rate_limit(self):
    """Apply configurable rate limiting"""
    time.sleep(self.delay)
    self.logger.info(f"Rate limiting: waited {self.delay} seconds")
Configuration Options
Scraper Settings
# Basic configuration
scraper = WebScraper(
    delay=3,          # Seconds between requests
    max_retries=5,    # Max retry attempts
    timeout=15        # Request timeout
)

# Advanced configuration
scraper.configure(
    user_agent_rotation=True,
    content_validation=True,
    export_format="both"    # csv, json, or both
)
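Note that max_retries, timeout, and configure() are not part of the main listing; they are shown here as options you might add. A minimal sketch of such a configure() method could simply store the flags on the instance:

def configure(self, **options):
    """Sketch: store optional feature flags on the scraper instance."""
    self.user_agent_rotation = options.get("user_agent_rotation", False)
    self.content_validation = options.get("content_validation", False)
    self.export_format = options.get("export_format", "csv")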
Customizing for Specific Sites
# Site-specific configurations
configs = {
    "news_sites": {
        "selectors": {
            "title": "h1.headline",
            "content": "div.article-body",
            "author": "span.author-name"
        }
    },
    "ecommerce_sites": {
        "selectors": {
            "title": "h1.product-title",
            "price": "span.price",
            "description": "div.description"
        }
    }
}
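A configuration like this only pays off if something reads it. A small sketch (the function name is assumed) that applies the selectors with BeautifulSoup's select_one():

def extract_with_config(soup, site_config):
    """Sketch: pull named fields from a page using the CSS selectors in a site config."""
    result = {}
    for field, selector in site_config["selectors"].items():
        elem = soup.select_one(selector)
        result[field] = elem.get_text(strip=True) if elem else None
    return result

# Example: extract_with_config(soup, configs["news_sites"])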
Best Practices
1. Respect robots.txt
def check_robots_txt(self, url):
    """Check if scraping is allowed"""
    robots_url = urljoin(url, '/robots.txt')
    # Implementation to parse robots.txt
2. Handle Dynamic Content
def handle_javascript(self, url):
    """Handle JavaScript-rendered content"""
    # Use Selenium for dynamic content
    from selenium import webdriver
    driver = webdriver.Chrome()
    try:
        driver.get(url)
        html = driver.page_source
    finally:
        driver.quit()   # always release the browser
    return BeautifulSoup(html, 'html.parser')
3. Data Cleaning
def clean_text(self, text):
    """Clean and normalize scraped text"""
    import re
    # Collapse runs of whitespace into single spaces
    text = re.sub(r'\s+', ' ', text)
    # Keep only word characters, whitespace, periods and hyphens
    text = re.sub(r'[^\w\s.-]', '', text)
    return text.strip()
Troubleshooting
Common Issues
1. Getting Blocked
# Solutions:
- Increase delay between requests
- Rotate user agents more frequently
- Use proxy rotation
- Implement session management
2. Dynamic Content Not Loading
# Solutions:
- Use Selenium WebDriver
- Wait for JavaScript to load
- Look for API endpoints
- Check network requests in browser
3. Data Quality Issues
# Solutions:
- Implement content validation
- Use multiple CSS selectors
- Add data cleaning functions
- Verify extracted data
Legal and Ethical Considerations
Important Guidelines
- Check robots.txt: Always respect site policies
- Rate Limiting: Don't overwhelm servers
- Terms of Service: Read and comply with site terms
- Copyright: Respect intellectual property rights
- Personal Data: Handle personal information carefully
Sample robots.txt Check
def is_scraping_allowed(self, url, user_agent='*'):
    """Check if scraping is allowed by robots.txt"""
    from urllib.robotparser import RobotFileParser
    robots_url = urljoin(url, '/robots.txt')
    rp = RobotFileParser()
    rp.set_url(robots_url)
    rp.read()
    return rp.can_fetch(user_agent, url)
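Wired into the scraper, the check can gate every request. A minimal sketch (the method name is assumed) that skips disallowed URLs:

def polite_fetch(self, url):
    """Sketch: fetch a URL only when robots.txt permits it."""
    if not self.is_scraping_allowed(url):
        self.logger.warning(f"robots.txt disallows scraping {url}; skipping")
        return None
    return self.fetch_page(url)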
Extensions and Improvements
1. Add Database Support
import sqlite3

def save_to_database(self, db_name="scraped_data.db"):
    """Save data to SQLite database"""
    conn = sqlite3.connect(db_name)
    # Create tables and insert data
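A fuller sketch of that idea (the table name and schema are assumptions) could store each record as a JSON blob in a single table:

import json
import sqlite3

def save_to_database(data, db_name="scraped_data.db"):
    """Sketch: persist scraped records in one SQLite table as JSON payloads."""
    conn = sqlite3.connect(db_name)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS scraped_items (id INTEGER PRIMARY KEY, payload TEXT)"
        )
        conn.executemany(
            "INSERT INTO scraped_items (payload) VALUES (?)",
            [(json.dumps(item, ensure_ascii=False),) for item in data]
        )
        conn.commit()
    finally:
        conn.close()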
2. Add Proxy Support
def setup_proxies(self):
    """Configure proxy rotation"""
    self.proxies = [
        {'http': 'http://proxy1:port'},
        {'http': 'http://proxy2:port'}
    ]
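To actually route traffic through them, each request passes one entry via the proxies argument of requests (a sketch; the proxy addresses above are placeholders):

import random

def get_with_proxy(self, url):
    """Sketch: send a request through a randomly chosen proxy."""
    proxy = random.choice(self.proxies)
    return self.session.get(url, proxies=proxy, timeout=10)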
3. Add Monitoring Dashboard
def create_dashboard(self):
    """Create real-time scraping dashboard"""
    # Use Flask or Streamlit for web interface
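As a very small sketch of that idea with Flask (the route and counter names are assumptions), the scraper could update a shared dictionary that a status endpoint exposes:

from flask import Flask, jsonify

app = Flask(__name__)
stats = {"pages_fetched": 0, "items_scraped": 0}   # updated by the scraper as it runs

@app.route("/status")
def status():
    """Return the current scraping counters as JSON."""
    return jsonify(stats)

# app.run(port=5000)  # run alongside the scraper to watch progress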
Next Steps
After mastering this advanced web scraper, consider:
- Learn Selenium: For JavaScript-heavy sites
- Explore Scrapy: Professional scraping framework
- API Integration: Combine with API data sources
- Machine Learning: Automatic content classification
- Cloud Deployment: Scale with cloud platforms
Resources
- BeautifulSoup Documentation
- Requests Documentation
- Web Scraping Ethics
- robots.txt Specification
- Selenium Documentation
Conclusion
This advanced web scraper demonstrates professional-grade data extraction techniques. It includes essential features like rate limiting, error handling, and multiple export formats that are crucial for real-world applications. The modular design makes it easy to extend and customize for specific scraping needs.
Remember to always scrape responsibly, respect website policies, and consider the legal implications of your scraping activities. Happy scraping!