Now it's time to improve the look of the dashboard page. I want to organise the dashboard more professionally instead of leaving it so congested. All the functionality is already done, so no changes to the functionality; I just want it to look clean, immersive, professional, and attractive so that users have a good experience using it.
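For reference, here is a minimal sketch of the kind of layout cleanup I have in mind, assuming the dash-bootstrap-components package is available (the CYBORG theme, the demo_app name, and the card arrangement are only illustrative placeholders, not part of the current app):

import dash
import dash_bootstrap_components as dbc
from dash import dcc, html

# Illustrative only: the same graph and alert components, but arranged in
# Bootstrap cards on a themed page instead of a single congested column.
demo_app = dash.Dash(__name__, external_stylesheets=[dbc.themes.CYBORG])

demo_app.layout = dbc.Container([
    html.H2("Real-Time Network Traffic Monitoring", className="my-3 text-center"),
    dbc.Row([
        dbc.Col(dbc.Card(dbc.CardBody(dcc.Graph(id="live-graph"))), md=8),
        dbc.Col(dbc.Card(dbc.CardBody(html.Div(id="anomaly-alerts"))), md=4),
    ], className="g-3"),
    dcc.Interval(id="interval-update", interval=60000, n_intervals=0),
], fluid=True)

if __name__ == "__main__":
    demo_app.run(debug=True)

Something along those lines, but keeping the existing callbacks, routes, and functionality exactly as they are. Here is my current code: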
import pandas as pd
import joblib
import time
import csv
import numpy as np
from flask import Flask, render_template, request, jsonify
import plotly.express as px
from dash import dcc, html
import matplotlib.pyplot as plt
from fpdf import FPDF
import os
import dash
from dash.dependencies import Input, Output
from scapy.all import sniff, IP
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import IsolationForest
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication  # needed to attach the PDF report as binary data
import threading

app = Flask(__name__)
file_name = "network_data.csv"
subscribed_emails = set()
def load_data():
    try:
        df = pd.read_csv(file_name, header=None,
                         names=["Timestamp", "Source IP", "Destination IP", "Protocol", "Packet Size", "Anomaly"],
                         on_bad_lines='skip', engine='python', dtype=str)
        # Explicitly define the timestamp format used in the CSV data
        df['Timestamp'] = pd.to_datetime(df['Timestamp'], format="%Y-%m-%d %H:%M:%S", errors='coerce')
        # Convert 'Packet Size' to numeric and remove non-numeric characters
        df['Packet Size'] = pd.to_numeric(df['Packet Size'].str.replace(r'\D', '', regex=True), errors='coerce')
        df.dropna(inplace=True)
        # Convert 'Anomaly' column to boolean values
        df['Anomaly'] = df['Anomaly'].map({'True': True, 'False': False})
        df.dropna(inplace=True)
        return df
    except Exception as e:
        print(f"❌ Error loading CSV: {e}")
        return pd.DataFrame()
def train_arima():
    df = load_data()
    if df.empty:
        return
    df = df[['Timestamp', 'Packet Size']].set_index('Timestamp').resample('T').mean().fillna(0)
    df['Packet Size'] = df['Packet Size'].diff().fillna(0)
    model = ARIMA(df['Packet Size'], order=(1, 1, 1))
    arima_model = model.fit()
    joblib.dump(arima_model, "arima_model.pkl")

def train_isolation_forest():
    df = load_data()
    if df.empty:
        return
    if len(df) > 1000:
        df = df[-1000:]
    model = IsolationForest(contamination=0.01, random_state=42)
    model.fit(df[['Packet Size']])
    joblib.dump(model, "isolation_forest.pkl")
def load_arima():
    return joblib.load("arima_model.pkl")

def load_isolation_forest():
    return joblib.load("isolation_forest.pkl")

arima_model = None
isolation_forest_model = None

def load_models():
    global arima_model, isolation_forest_model
    try:
        arima_model = joblib.load("arima_model.pkl")
        isolation_forest_model = joblib.load("isolation_forest.pkl")
        print("✅ Models loaded successfully!")
    except Exception as e:
        print(f"⚠️ Model loading failed: {e}")

def detect_anomalies(packet_size):
    try:
        arima_model = load_arima()
        isolation_model = load_isolation_forest()
        X_new = pd.DataFrame([[packet_size]], columns=['Packet Size'])
        isolation_prediction = isolation_model.predict(X_new)[0] == -1
        forecast = arima_model.forecast(steps=1).iloc[0]  # positional access; forecast() returns a Series
        arima_anomaly = packet_size > forecast * 2
        return arima_anomaly and isolation_prediction
    except Exception as e:
        print(f"⚠️ Error in anomaly detection: {e}")
        return False
def send_email_alert(packet_size, src_ip, dst_ip):
    # NOTE: this function is redefined further down; the later version (with the PDF report) is the one used at runtime.
    try:
        sender_email = "nuhayd.m.k@gmail.com"
        password = "unsq dtzd vlnd eylz"
        subject = "🚨 Network Anomaly Detected"
        message = f"Alert! Anomalous packet detected.\nPacket Size: {packet_size}\nSource IP: {src_ip}\nDestination IP: {dst_ip}"
        server = smtplib.SMTP('smtp.gmail.com', 587)
        server.starttls()
        server.login(sender_email, password)
        for email in subscribed_emails:
            msg = MIMEMultipart()
            msg['From'] = sender_email
            msg['To'] = email
            msg['Subject'] = subject
            msg.attach(MIMEText(message, 'plain'))
            server.send_message(msg)
        server.quit()
        print(f"📩 Email alert sent to {len(subscribed_emails)} users!")
    except Exception as e:
        print(f"⚠️ Failed to send email: {e}")
def packet_callback(packet):
    try:
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        src_ip = packet[IP].src if packet.haslayer(IP) else "N/A"
        dst_ip = packet[IP].dst if packet.haslayer(IP) else "N/A"
        protocol = packet.summary()
        packet_size = len(packet)
        anomaly = detect_anomalies(packet_size)
        with open(file_name, mode='a', newline='') as file:
            writer = csv.writer(file)
            writer.writerow([timestamp, src_ip, dst_ip, protocol, packet_size, anomaly])
    except Exception as e:
        print(f"⚠️ Error processing packet: {e}")

def start_sniffing():
    sniff(prn=packet_callback, store=False)
dash_app = dash.Dash(__name__, server=app, routes_pathname_prefix='/dashboard/')

dash_app.layout = html.Div([
    html.H1("🚨 Real-Time Network Traffic Monitoring"),
    dcc.Graph(id="live-graph"),
    html.Div(id="anomaly-alerts", style={'color': 'red', 'font-weight': 'bold', 'font-size': '18px'}),
    dcc.Interval(id="interval-update", interval=60000, n_intervals=0)
])

@dash_app.callback(
    [Output("live-graph", "figure"), Output("anomaly-alerts", "children")],
    [Input("interval-update", "n_intervals")]
)
def update_graph(n):
    df = load_data()
    if df.empty:
        return px.scatter(title="No Data Available"), "✅ No anomalies detected"
    # Mark anomalies in red
    df['Color'] = df['Anomaly'].apply(lambda x: 'red' if x else 'blue')
    # Possible reason mapping (customize as per your network knowledge)
    df['Reason'] = df['Packet Size'].apply(lambda x:
        "Unusual High Traffic" if x > 15000 else
        "Suspicious Small Packets" if x < 500 else
        "Potential Data Exfiltration"
    )
    # Plotly graph with Source IP & Reason in the hover tooltip
    fig = px.scatter(
        df, x="Timestamp", y="Packet Size", color='Color',
        title="Live Network Bandwidth (Anomalies in Red)",
        labels={'Color': 'Anomaly Status'},
        hover_data=["Source IP", "Reason"]
    )
    # Anomaly alert message
    anomalies = df[df['Anomaly']]
    if anomalies.empty:
        alert_message = "✅ No anomalies detected"
    else:
        alert_message = "🚨 Anomalies detected!\n"
        for index, row in anomalies.iterrows():
            alert_message += f"🔴 {row['Timestamp']} | Source: {row['Source IP']} | {row['Reason']}\n"
    return fig, alert_message
@app.route('/get_anomalies')
def get_anomalies():
    df = load_data()
    if df.empty:
        return jsonify({"anomalies": [], "totalPackets": 0, "totalAnomalies": 0})
    anomalies = df[df['Anomaly'] == True].tail(5).to_dict('records')
    anomaly_messages = [f"{a['Timestamp']} | {a['Source IP']} ➝ {a['Destination IP']} | {a['Packet Size']} bytes"
                        for a in anomalies]
    return jsonify({
        "anomalies": anomaly_messages,
        "totalPackets": len(df),
        "totalAnomalies": len(df[df['Anomaly'] == True])
    })
# File to store subscribed emails
SUBSCRIPTION_FILE = "subscribed_emails.txt"

# Load previously subscribed emails from file
def load_subscribed_emails():
    if os.path.exists(SUBSCRIPTION_FILE):
        with open(SUBSCRIPTION_FILE, "r") as file:
            return set(file.read().splitlines())
    return set()

# Save the updated subscribed emails to file
def save_subscribed_email(email):
    with open(SUBSCRIPTION_FILE, "a") as file:
        file.write(email + "\n")

# Load emails when the app starts
subscribed_emails = load_subscribed_emails()
@app.route('/subscribe_email', methods=['POST'])
def subscribe_email():
    email = request.form.get("email")
    if not email:
        return jsonify({"message": "❌ Invalid email!"})
    if email in subscribed_emails:
        return jsonify({"message": "⚠️ You are already subscribed!"})  # prevent duplicate subscription
    # Add email to the subscription list
    subscribed_emails.add(email)
    save_subscribed_email(email)  # save it to file
    # Send confirmation email
    try:
        sender_email = "nuhayd.m.k@gmail.com"
        password = "unsq dtzd vlnd eylz"
        subject = "✅ Subscription Confirmed: Network Alerts"
        message = ("Hello,\n\nYou have successfully subscribed to network anomaly alerts. "
                   "You will receive an email if any unusual high traffic is detected.\n\n"
                   "Regards,\nNetwork Monitoring System")
        server = smtplib.SMTP('smtp.gmail.com', 587)
        server.starttls()
        server.login(sender_email, password)
        msg = MIMEMultipart()
        msg['From'] = sender_email
        msg['To'] = email
        msg['Subject'] = subject
        msg.attach(MIMEText(message, 'plain'))
        server.send_message(msg)
        server.quit()
        return jsonify({"message": "✅ Email subscribed successfully! Confirmation email sent."})
    except Exception as e:
        print(f"⚠️ Failed to send confirmation email: {e}")
        return jsonify({"message": "⚠️ Subscription successful, but failed to send confirmation email."})
def wait_for_models():
    while not os.path.exists("arima_model.pkl") or not os.path.exists("isolation_forest.pkl"):
        print("⏳ Waiting for models to be trained...")
        time.sleep(5)
def generate_anomaly_report():
    df = load_data()
    if df.empty:
        return None
    # Generate traffic trend graph
    fig, ax = plt.subplots(figsize=(8, 4))
    df.plot(x="Timestamp", y="Packet Size", ax=ax, title="Network Traffic Trend", legend=False)
    ax.set_xlabel("Time")
    ax.set_ylabel("Packet Size")
    plt.xticks(rotation=45)
    plt.tight_layout()
    traffic_graph = "traffic_graph.png"
    plt.savefig(traffic_graph)
    plt.close()
    # Generate pie chart (legit vs. anomalous)
    anomaly_counts = df["Anomaly"].value_counts()
    labels = ["Legit Packets", "Anomalous Packets"]
    sizes = [anomaly_counts.get(False, 0), anomaly_counts.get(True, 0)]
    colors = ["green", "red"]
    fig, ax = plt.subplots(figsize=(5, 5))
    ax.pie(sizes, labels=labels, colors=colors, autopct="%1.1f%%", startangle=90,
           wedgeprops={"edgecolor": "black"})
    ax.set_title("Packet Distribution")
    pie_chart = "pie_chart.png"
    plt.savefig(pie_chart)
    plt.close()
    suspicious_packet = df[df["Anomaly"] == True].nlargest(1, "Packet Size")
    if not suspicious_packet.empty:
        most_suspicious = suspicious_packet.iloc[0]
        source_ip = most_suspicious["Source IP"]
        destination_ip = most_suspicious["Destination IP"]
        packet_size = most_suspicious["Packet Size"]
        reason = "Unusually high traffic detected."
    else:
        source_ip = "N/A"
        destination_ip = "N/A"
        packet_size = "N/A"
        reason = "No major anomalies detected."
    solution = ("Investigate the source IP, check for large data exfiltration, and "
                "monitor for unusual access patterns.")
    # Create PDF report (plain text only: FPDF's built-in Arial font cannot encode emoji)
    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()
    pdf.set_font("Arial", "B", 16)
    pdf.cell(200, 10, "Network Anomaly Report", ln=True, align="C")
    # Add suspicious packet details
    pdf.ln(10)
    pdf.set_font("Arial", "B", 12)
    pdf.cell(200, 10, "Most Suspicious Packet Detected:", ln=True)
    pdf.set_font("Arial", "", 12)
    pdf.multi_cell(0, 8, f"Source IP: {source_ip}\nDestination IP: {destination_ip}\n"
                         f"Packet Size: {packet_size} bytes\nReason: {reason}")
    # Add suggested solution
    pdf.ln(5)
    pdf.set_font("Arial", "B", 12)
    pdf.cell(200, 10, "Suggested Solution:", ln=True)
    pdf.set_font("Arial", "", 12)
    pdf.multi_cell(0, 8, f"{solution}")
    # Insert traffic graph
    pdf.ln(10)
    pdf.image(traffic_graph, x=10, w=180)
    # Insert pie chart
    pdf.ln(10)
    pdf.image(pie_chart, x=40, w=120)
    # Save PDF report
    pdf_filename = "anomaly_report.pdf"
    pdf.output(pdf_filename)
    # Cleanup images
    os.remove(traffic_graph)
    os.remove(pie_chart)
    return pdf_filename
def send_email_alert(packet_size, src_ip, dst_ip):
    # This definition replaces the simpler send_email_alert above and adds the PDF report attachment.
    try:
        if packet_size <= 15000:  # alerts for high traffic only
            return
        sender_email = "nuhayd.m.k@gmail.com"
        password = "unsq dtzd vlnd eylz"
        subject = "🚨 High Traffic Alert: Network Anomaly Detected"
        message = (f"⚠️ Unusual High Traffic detected!\nPacket Size: {packet_size} bytes\n"
                   f"Source IP: {src_ip}\nDestination IP: {dst_ip}\n\n"
                   f"📎 Attached: Detailed Network Report (PDF).")
        server = smtplib.SMTP('smtp.gmail.com', 587)
        server.starttls()
        server.login(sender_email, password)
        # Generate the PDF report
        pdf_filename = generate_anomaly_report()
        for email in subscribed_emails:
            msg = MIMEMultipart()
            msg['From'] = sender_email
            msg['To'] = email
            msg['Subject'] = subject
            msg.attach(MIMEText(message, 'plain'))
            if pdf_filename:
                with open(pdf_filename, "rb") as attachment:
                    # Attach the PDF as binary application data (MIMEText cannot carry raw bytes)
                    pdf_attachment = MIMEApplication(attachment.read(), _subtype="pdf")
                    pdf_attachment.add_header("Content-Disposition", f"attachment; filename={pdf_filename}")
                    msg.attach(pdf_attachment)
            server.send_message(msg)
        server.quit()
        if pdf_filename:
            os.remove(pdf_filename)  # cleanup
        print(f"📩 High traffic alert with PDF report sent to {len(subscribed_emails)} users!")
    except Exception as e:
        print(f"⚠️ Failed to send high traffic alert: {e}")
@app.route('/')
def index():
    return render_template("index.html")

if __name__ == "__main__":
    threading.Thread(target=start_sniffing, daemon=True).start()
    train_arima()
    train_isolation_forest()
    wait_for_models()  # ensure models exist before starting Flask
    load_models()
    app.run(debug=True, port=5000)
That's my app.py, and here is my index.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Network Monitoring Dashboard</title>
    <link rel="stylesheet" href="{{ url_for('static', filename='styles.css') }}">
    <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/twitter-bootstrap/5.3.0/css/bootstrap.min.css">
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.6.0/jquery.min.js"></script>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark">
<div class="container-fluid">
<a class="navbar-brand" href="#">Network Monitoring System</a>
</div>
</nav>
<div class="container mt-4">
<div class="row">
<div class="col-md-6">
<div class="card p-3">
<h4>Live Network Traffic</h4>
                    <iframe src="/dashboard/" width="100%" height="400px" style="border: none;"></iframe>
</div>
</div>
<div class="col-md-6">
<div class="card p-3">
<h4>Real-Time Anomaly Alerts</h4>
<ul id="anomalyList" class="list-group"></ul>
</div>
</div>
</div>
<div class="row mt-4">
<div class="col-md-6">
<div class="card p-3">
<h4>Email Alerts</h4>
<form id="emailForm">
<label for="email">Enter Email for Alerts:</label>
                        <input type="email" id="email" class="form-control mb-2" required>
                        <button type="submit" class="btn btn-primary">Subscribe</button>
</form>
<div id="responseMessage" class="mt-3"></div>
</div>
</div>
<div class="col-md-6">
<div class="card p-3">
<h4>Network Statistics</h4>
                    <p><strong>Total Packets Monitored:</strong> <span id="totalPackets">0</span></p>
                    <p><strong>Total Anomalies Detected:</strong> <span id="totalAnomalies">0</span></p>
</div>
</div>
</div>
</div>
<script>
$(document).ready(function(){
$('#emailForm').submit(function(event){
event.preventDefault();
let email = $('#email').val();
$.post('/subscribe_email', {email: email},
function(response){
$('#responseMessage').text(response.message);
}, 'json');
});
function fetchAnomalies() {
$.getJSON('/get_anomalies', function(data) {
$('#anomalyList').empty();
$('#totalPackets').text(data.totalPackets);
$('#totalAnomalies').text(data.totalAnomalies);
data.anomalies.forEach(function(anomaly) {
                            $('#anomalyList').append('<li class="list-group-item list-group-item-danger">' + anomaly + '</li>');
});
});
}
$('#sendAlert').click(function(){
$.post('/send_email_alert', function(response){
alert(response.message);
}, 'json');
});
setInterval(fetchAnomalies, 5000);
});
</script>
</body>
</html>