Advanced Network Traffic Monitor
Abstract
Advanced Network Traffic Monitor is a Python project that enables real-time monitoring and analysis of network traffic. The application captures packets, analyzes protocols, and visualizes traffic statistics. It demonstrates network programming, packet parsing, and data visualization, making it suitable for security analysis and network diagnostics.
Prerequisites
- Python 3.8 or above
- A code editor or IDE
- Basic understanding of networking concepts
- Required libraries:
scapy
scapy
,matplotlib
matplotlib
- Administrator/root privileges for packet capture
Before you Start
Install Python and the required libraries:
Install dependencies
pip install scapy matplotlib
Install dependencies
pip install scapy matplotlib
Getting Started
Create a Project
- Create a folder named
advanced-network-traffic-monitor
advanced-network-traffic-monitor
. - Open the folder in your code editor or IDE.
- Create a file named
advanced_network_traffic_monitor.py
advanced_network_traffic_monitor.py
. - Copy the code below into your file.
Write the Code
⚙️ Advanced Network Traffic Monitor
Advanced Network Traffic Monitor
"""
Advanced Network Traffic Monitor
Features:
- Network traffic analysis
- Visualization
- Anomaly detection
- Modular design
- CLI interface
- Error handling
"""
import psutil
import time
import sys
import matplotlib.pyplot as plt
import numpy as np
class TrafficMonitor:
def __init__(self):
self.data = []
self.timestamps = []
def collect(self, duration=60):
print("Collecting network traffic data...")
for _ in range(duration):
stats = psutil.net_io_counters()
self.data.append(stats.bytes_sent + stats.bytes_recv)
self.timestamps.append(time.time())
time.sleep(1)
def detect_anomaly(self):
arr = np.array(self.data)
mean = arr.mean()
std = arr.std()
anomalies = [(i, v) for i, v in enumerate(arr) if abs(v - mean) > 2*std]
return anomalies
def visualize(self):
plt.plot(self.timestamps, self.data, label='Traffic')
anomalies = self.detect_anomaly()
for idx, val in anomalies:
plt.scatter(self.timestamps[idx], val, color='r', label='Anomaly' if idx==anomalies[0][0] else "")
plt.xlabel('Time')
plt.ylabel('Bytes')
plt.title('Network Traffic Over Time')
plt.legend()
plt.show()
class CLI:
@staticmethod
def run():
monitor = TrafficMonitor()
monitor.collect(60)
print("Visualizing...")
monitor.visualize()
anomalies = monitor.detect_anomaly()
print(f"Anomalies detected: {anomalies}")
if __name__ == "__main__":
try:
CLI.run()
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
Advanced Network Traffic Monitor
"""
Advanced Network Traffic Monitor
Features:
- Network traffic analysis
- Visualization
- Anomaly detection
- Modular design
- CLI interface
- Error handling
"""
import psutil
import time
import sys
import matplotlib.pyplot as plt
import numpy as np
class TrafficMonitor:
def __init__(self):
self.data = []
self.timestamps = []
def collect(self, duration=60):
print("Collecting network traffic data...")
for _ in range(duration):
stats = psutil.net_io_counters()
self.data.append(stats.bytes_sent + stats.bytes_recv)
self.timestamps.append(time.time())
time.sleep(1)
def detect_anomaly(self):
arr = np.array(self.data)
mean = arr.mean()
std = arr.std()
anomalies = [(i, v) for i, v in enumerate(arr) if abs(v - mean) > 2*std]
return anomalies
def visualize(self):
plt.plot(self.timestamps, self.data, label='Traffic')
anomalies = self.detect_anomaly()
for idx, val in anomalies:
plt.scatter(self.timestamps[idx], val, color='r', label='Anomaly' if idx==anomalies[0][0] else "")
plt.xlabel('Time')
plt.ylabel('Bytes')
plt.title('Network Traffic Over Time')
plt.legend()
plt.show()
class CLI:
@staticmethod
def run():
monitor = TrafficMonitor()
monitor.collect(60)
print("Visualizing...")
monitor.visualize()
anomalies = monitor.detect_anomaly()
print(f"Anomalies detected: {anomalies}")
if __name__ == "__main__":
try:
CLI.run()
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
Example Usage
Run the network monitor
sudo python advanced_network_traffic_monitor.py
Run the network monitor
sudo python advanced_network_traffic_monitor.py
Explanation
Key Features
- Packet Capture: Uses Scapy to capture live network packets.
- Protocol Analysis: Identifies and counts protocols (TCP, UDP, ICMP).
- Traffic Visualization: Plots protocol distribution using matplotlib.
- Error Handling: Manages permissions and invalid inputs.
- CLI Interface: Interactive command-line usage.
Code Breakdown
- Import Libraries and Setup Capture
advanced_network_traffic_monitor.py
from scapy.all import sniff
import matplotlib.pyplot as plt
advanced_network_traffic_monitor.py
from scapy.all import sniff
import matplotlib.pyplot as plt
- Packet Analysis Function
advanced_network_traffic_monitor.py
protocol_counts = {'TCP': 0, 'UDP': 0, 'ICMP': 0, 'Other': 0}
def analyze_packet(packet):
if packet.haslayer('TCP'):
protocol_counts['TCP'] += 1
elif packet.haslayer('UDP'):
protocol_counts['UDP'] += 1
elif packet.haslayer('ICMP'):
protocol_counts['ICMP'] += 1
else:
protocol_counts['Other'] += 1
advanced_network_traffic_monitor.py
protocol_counts = {'TCP': 0, 'UDP': 0, 'ICMP': 0, 'Other': 0}
def analyze_packet(packet):
if packet.haslayer('TCP'):
protocol_counts['TCP'] += 1
elif packet.haslayer('UDP'):
protocol_counts['UDP'] += 1
elif packet.haslayer('ICMP'):
protocol_counts['ICMP'] += 1
else:
protocol_counts['Other'] += 1
- Start Packet Capture
advanced_network_traffic_monitor.py
def start_capture():
print("Capturing packets... Press Ctrl+C to stop.")
try:
sniff(prn=analyze_packet, store=0)
except PermissionError:
print("Error: Run as administrator/root.")
advanced_network_traffic_monitor.py
def start_capture():
print("Capturing packets... Press Ctrl+C to stop.")
try:
sniff(prn=analyze_packet, store=0)
except PermissionError:
print("Error: Run as administrator/root.")
- Visualize Protocol Distribution
advanced_network_traffic_monitor.py
def visualize():
labels = list(protocol_counts.keys())
sizes = list(protocol_counts.values())
plt.pie(sizes, labels=labels, autopct='%1.1f%%')
plt.title('Protocol Distribution')
plt.show()
advanced_network_traffic_monitor.py
def visualize():
labels = list(protocol_counts.keys())
sizes = list(protocol_counts.values())
plt.pie(sizes, labels=labels, autopct='%1.1f%%')
plt.title('Protocol Distribution')
plt.show()
- CLI Interface and Error Handling
advanced_network_traffic_monitor.py
def main():
print("Advanced Network Traffic Monitor")
while True:
cmd = input('> ')
if cmd == 'capture':
start_capture()
elif cmd == 'visualize':
visualize()
elif cmd == 'exit':
break
else:
print("Unknown command. Type 'capture', 'visualize', or 'exit'.")
if __name__ == "__main__":
main()
advanced_network_traffic_monitor.py
def main():
print("Advanced Network Traffic Monitor")
while True:
cmd = input('> ')
if cmd == 'capture':
start_capture()
elif cmd == 'visualize':
visualize()
elif cmd == 'exit':
break
else:
print("Unknown command. Type 'capture', 'visualize', or 'exit'.")
if __name__ == "__main__":
main()
Features
- Real-Time Monitoring: Captures and analyzes live traffic
- Protocol Analysis: Identifies and counts protocols
- Visualization: Plots protocol distribution
- Error Handling: Manages permissions and exceptions
- Production-Ready: Modular and maintainable code
Next Steps
Enhance the project by:
- Adding support for more protocols
- Logging captured packets to a file
- Creating a GUI for visualization
- Integrating with intrusion detection systems
- Adding batch analysis of pcap files
- Unit testing for reliability
Educational Value
This project teaches:
- Network Programming: Packet capture and analysis
- Data Visualization: Plotting statistics
- Software Design: Modular, maintainable code
- Error Handling: Writing robust Python code
Real-World Applications
- Network Diagnostics
- Security Analysis
- Traffic Engineering
- Educational Tools
Conclusion
Advanced Network Traffic Monitor provides a robust framework for real-time network analysis and visualization. With extensible design and error handling, it is suitable for diagnostics, security, and educational use. For more advanced projects, visit Python Central Hub.
Was this page helpful?
Let us know how we did