🎯 Pourquoi un système de surveillance réseau temps réel ?
Le problème
Les réseaux font face à deux types de menaces simultanées : menaces externes (scans, brute-force, exploits) et vulnérabilités internes (services obsolètes, configurations faibles). Le vrai danger survient lors de la corrélation : quand une menace externe cible précisément une vulnérabilité interne.
CyberScan Live 2.0 unifie la détection dans une interface unique avec analyse IA en temps réel.
Les trois piliers
- Détection active : Scanner le LAN pour trouver les équipements et leurs vulnérabilités
- Surveillance passive : Capturer le trafic entrant pour détecter les attaques
- Corrélation intelligente : Croiser les données pour identifier les risques critiques
🏗️ Architecture du système
Architecture événementielle distribuée avec 5 modules communicant via WebSocket.
Interface Streamlit → WebSocket Client → Serveur WebSocket
↓ ↓
File Watcher ← Sniffer → Security Analyzer → IA Pédagogique
Flux de données
| Étape | Module | Action |
|---|---|---|
| 1 | Sniffer | Capture paquets TCP/UDP entrants |
| 2 | File Watcher | Détecte nouvelles lignes du log |
| 3 | Network Scanner | Scan subnet toutes les 30s |
| 4 | Security Analyzer | Scan ports + Banner grabbing |
| 5 | IA Pédagogique | Génère explications structurées |
| 6 | Dashboard | Corrélation trafic × vulnérabilités |
🔧 Composants clés
1. Banner Grabbing
Connexion aux ports ouverts pour lire les bannières de service et identifier les versions vulnérables.
Exemple : Bannière SSH
SSH-2.0-OpenSSH_7.4
Révèle OpenSSH 7.4 avec CVE-2018-15473
def _scan_ports(self, ip, ports_to_scan):
open_ports_map = {}
for port in ports_to_scan:
banner = ""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(0.5) # Timeout connexion
if sock.connect_ex((ip, port)) == 0:
sock.settimeout(1.0) # Timeout réception
banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
open_ports_map[port] = banner
except Exception:
pass
return open_ports_map
💡 Pourquoi deux timeouts ?
- 0.5s connexion : Évite de bloquer sur ports fermés
- 1s réception : Laisse le temps au service de répondre
2. Enrichissement IP avec AbuseIPDB
Chaque IP est enrichie avec score de réputation (0-100) et code pays.
def get_abuseipdb_info(ip):
if ip in ip_cache:
return ip_cache[ip]
url = "https://api.abuseipdb.com/api/v2/check"
params = {'ipAddress': ip, 'maxAgeInDays': '90'}
headers = {'Key': ABUSEIPDB_API_KEY}
response = requests.get(url, headers=headers, params=params, timeout=5)
if response.status_code == 200:
data = response.json()['data']
score = data.get('abuseConfidenceScore', 0)
country = data.get('countryCode', 'N/A')
ip_cache[ip] = (score, country)
return (score, country)
return (-1, "N/A")
3. IA Pédagogique : Redondance Ollama → Groq
Stratégie de "double timeout" pour garantir la disponibilité.
INIT: Test Cold Start (Ollama - 90s) ✅ Succès → Client strict (timeout 20s) ❌ Échec → Basculement vers Groq Cloud
def __init__(self):
# 1. Test Cold Start (90s)
try:
init_client = ollama.Client(timeout=90.0)
init_client.chat(model='mistral', messages=[{'role':'user','content':'Test'}])
# 2. Client strict pour runtime (20s)
self.llm_local = ollama.Client(timeout=20.0)
except TimeoutError:
self.llm_local = None
self.local_llm_failing_reason = "TIMEOUT_INIT"
# 3. Fallback Groq
self.llm_cloud = ChatGroq(model="llama-3.3-70b-versatile")
💡 Pourquoi cette stratégie ?
Ollama charge le modèle en RAM au 1er appel (30-90s). Les appels suivants sont rapides (<2s).
4. Corrélation de menaces
Croise trafic externe (IP attaquante, port ciblé) avec vulnérabilités internes (port ouvert).
def _run_threat_correlation(self, traffic_df, internal_alerts_map):
external_threats = traffic_df[traffic_df['score'] > 50]
vulnerable_endpoints = set()
for ip, alerts in internal_alerts_map.items():
for alert in alerts:
vulnerable_endpoints.add((ip, int(alert['port'])))
correlated = []
for _, threat in external_threats.iterrows():
if (threat['dst_ip'], threat['dst_port']) in vulnerable_endpoints:
correlated.append({
'type': 'Corrélation',
'risk_level': 'Critique',
'external_ip': threat['src_ip'],
'target_ip': threat['dst_ip'],
'target_port': threat['dst_port']
})
return correlated
Exemple concret
T+0 : Scanner détecte RDP ouvert (192.168.1.50:3389)
T+30min : Sniffer capture 50 tentatives RDP depuis 45.x.x.x (score 87)
Résultat : Alerte critique "ATTAQUE ACTIVE DÉTECTÉE"
⚡ WebSocket temps réel
Pourquoi WebSocket vs HTTP polling ?
| Critère | HTTP Polling | WebSocket |
|---|---|---|
| Latence | 1-5 secondes | <50ms |
| Overhead | Headers répétés | Connexion persistante |
| Scalabilité | Limitée | Excellente |
Implémentation serveur
from fastapi import FastAPI, WebSocket
app = FastAPI()
active_connections = []
@app.websocket("/ws/events")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
active_connections.append(websocket)
try:
while True:
await asyncio.sleep(1)
except WebSocketDisconnect:
active_connections.remove(websocket)
async def broadcast_event(event: dict):
for conn in active_connections:
await conn.send_json(event)
🚀 Optimisations
Cache intelligent
from functools import lru_cache
@lru_cache(maxsize=1000)
def get_abuseipdb_cached(ip: str):
return get_abuseipdb_info(ip)
Performance scan réseau
| Méthode | Temps (254 IPs) |
|---|---|
| Nmap (Python) | ~15 secondes |
| Socket raw | ~45 secondes |
| Async (asyncio) | ~8 secondes |
🛡️ Sécurité et cadre légal
⚠️ Privilèges élevés nécessaires
- Windows : PowerShell en Administrateur
- Linux :
sudo setcap cap_net_raw+ep /usr/bin/python3
💡 Cadre légal
En France, le scan de ports est légal sur votre propre infrastructure. Scanner des tiers sans autorisation = infraction (Code pénal art. 323-1).
🎯 Cas d'usage
Scénario 1 : Détection botnet IoT
Contexte : Caméra IP avec Telnet ouvert
T+0 : Alerte "Port 23 ouvert"
T+2h : 300 tentatives depuis Chine (score 92)
Résultat : Isolation VLAN avant compromission ✅
Scénario 2 : Audit ANSSI
non_compliant = []
for device, alerts in alerts_map.items():
for alert in alerts:
if alert['port'] in [21, 23, 80]: # FTP, Telnet, HTTP
non_compliant.append(device)
🚢 Déploiement Docker
FROM python:3.11-slim
RUN apt-get update && apt-get install -y tcpdump libpcap-dev nmap
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "websocket_server:app", "--host", "0.0.0.0"]
version: '3.8'
services:
websocket:
build: .
ports: ["8000:8000"]
sniffer:
build: .
command: python sniffer.py
network_mode: host
cap_add: [NET_ADMIN, NET_RAW]
streamlit:
build: .
command: streamlit run app.py
ports: ["8501:8501"]
🔮 Évolutions futures
ML pour anomalies
from sklearn.ensemble import IsolationForest
model = IsolationForest(contamination=0.1)
model.fit(traffic_features)
prediction = model.predict([current_traffic]) # -1 = anomalie
Intégration MISP
from pymisp import PyMISP
misp = PyMISP('https://misp.local', 'API_KEY')
result = misp.search(value=ip, type_attribute='ip-dst')
🎓 Conclusion
Ce que vous avez appris :
- ✅ Architecture événementielle avec WebSocket
- ✅ Banner grabbing pour détection de versions
- ✅ Enrichissement API (AbuseIPDB)
- ✅ Corrélation de données hétérogènes
- ✅ Redondance IA (Ollama/Groq)
- ✅ Optimisations (cache, async)
Compétences développées :
- 🐍 Python avancé (asyncio, threading, sockets)
- 🌐 Protocoles réseau (TCP/IP, WebSocket)
- 🤖 IA/ML (LLM, prompts structurés)
- 📊 Data Science (Pandas, Plotly)
- 🐳 DevOps (Docker, orchestration)