INITIALISATION SÉCURISÉE...

CyberScan Live 2.0 : Surveillance Réseau Temps Réel avec IA

Construire un NIDS intelligent avec Python et IA

🎯 Pourquoi un système de surveillance réseau temps réel ?

Le problème

Les réseaux font face à deux types de menaces simultanées : menaces externes (scans, brute-force, exploits) et vulnérabilités internes (services obsolètes, configurations faibles). Le vrai danger survient lors de la corrélation : quand une menace externe cible précisément une vulnérabilité interne.

CyberScan Live 2.0 unifie la détection dans une interface unique avec analyse IA en temps réel.

Les trois piliers

  1. Détection active : Scanner le LAN pour trouver les équipements et leurs vulnérabilités
  2. Surveillance passive : Capturer le trafic entrant pour détecter les attaques
  3. Corrélation intelligente : Croiser les données pour identifier les risques critiques

🏗️ Architecture du système

Architecture événementielle distribuée avec 5 modules communicant via WebSocket.

Interface Streamlit → WebSocket Client → Serveur WebSocket
                    ↓                    ↓
            File Watcher ← Sniffer → Security Analyzer → IA Pédagogique

Flux de données

ÉtapeModuleAction
1SnifferCapture paquets TCP/UDP entrants
2File WatcherDétecte nouvelles lignes du log
3Network ScannerScan subnet toutes les 30s
4Security AnalyzerScan ports + Banner grabbing
5IA PédagogiqueGénère explications structurées
6DashboardCorrélation trafic × vulnérabilités

🔧 Composants clés

1. Banner Grabbing

Connexion aux ports ouverts pour lire les bannières de service et identifier les versions vulnérables.

Exemple : Bannière SSH

SSH-2.0-OpenSSH_7.4

Révèle OpenSSH 7.4 avec CVE-2018-15473

def _scan_ports(self, ip, ports_to_scan):
    open_ports_map = {}
    for port in ports_to_scan:
        banner = ""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(0.5)  # Timeout connexion
                if sock.connect_ex((ip, port)) == 0:
                    sock.settimeout(1.0)  # Timeout réception
                    banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
                    open_ports_map[port] = banner
        except Exception:
            pass
    return open_ports_map

💡 Pourquoi deux timeouts ?

  • 0.5s connexion : Évite de bloquer sur ports fermés
  • 1s réception : Laisse le temps au service de répondre

2. Enrichissement IP avec AbuseIPDB

Chaque IP est enrichie avec score de réputation (0-100) et code pays.

def get_abuseipdb_info(ip):
    if ip in ip_cache:
        return ip_cache[ip]
    
    url = "https://api.abuseipdb.com/api/v2/check"
    params = {'ipAddress': ip, 'maxAgeInDays': '90'}
    headers = {'Key': ABUSEIPDB_API_KEY}
    
    response = requests.get(url, headers=headers, params=params, timeout=5)
    if response.status_code == 200:
        data = response.json()['data']
        score = data.get('abuseConfidenceScore', 0)
        country = data.get('countryCode', 'N/A')
        ip_cache[ip] = (score, country)
        return (score, country)
    return (-1, "N/A")

3. IA Pédagogique : Redondance Ollama → Groq

Stratégie de "double timeout" pour garantir la disponibilité.

INIT: Test Cold Start (Ollama - 90s)
  ✅ Succès → Client strict (timeout 20s)
  ❌ Échec → Basculement vers Groq Cloud
def __init__(self):
    # 1. Test Cold Start (90s)
    try:
        init_client = ollama.Client(timeout=90.0)
        init_client.chat(model='mistral', messages=[{'role':'user','content':'Test'}])
        
        # 2. Client strict pour runtime (20s)
        self.llm_local = ollama.Client(timeout=20.0)
    except TimeoutError:
        self.llm_local = None
        self.local_llm_failing_reason = "TIMEOUT_INIT"
    
    # 3. Fallback Groq
    self.llm_cloud = ChatGroq(model="llama-3.3-70b-versatile")

💡 Pourquoi cette stratégie ?

Ollama charge le modèle en RAM au 1er appel (30-90s). Les appels suivants sont rapides (<2s).

4. Corrélation de menaces

Croise trafic externe (IP attaquante, port ciblé) avec vulnérabilités internes (port ouvert).

def _run_threat_correlation(self, traffic_df, internal_alerts_map):
    external_threats = traffic_df[traffic_df['score'] > 50]
    
    vulnerable_endpoints = set()
    for ip, alerts in internal_alerts_map.items():
        for alert in alerts:
            vulnerable_endpoints.add((ip, int(alert['port'])))
    
    correlated = []
    for _, threat in external_threats.iterrows():
        if (threat['dst_ip'], threat['dst_port']) in vulnerable_endpoints:
            correlated.append({
                'type': 'Corrélation',
                'risk_level': 'Critique',
                'external_ip': threat['src_ip'],
                'target_ip': threat['dst_ip'],
                'target_port': threat['dst_port']
            })
    return correlated

Exemple concret

T+0 : Scanner détecte RDP ouvert (192.168.1.50:3389)

T+30min : Sniffer capture 50 tentatives RDP depuis 45.x.x.x (score 87)

Résultat : Alerte critique "ATTAQUE ACTIVE DÉTECTÉE"

⚡ WebSocket temps réel

Pourquoi WebSocket vs HTTP polling ?

CritèreHTTP PollingWebSocket
Latence1-5 secondes<50ms
OverheadHeaders répétésConnexion persistante
ScalabilitéLimitéeExcellente

Implémentation serveur

from fastapi import FastAPI, WebSocket

app = FastAPI()
active_connections = []

@app.websocket("/ws/events")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        active_connections.remove(websocket)

async def broadcast_event(event: dict):
    for conn in active_connections:
        await conn.send_json(event)

🚀 Optimisations

Cache intelligent

from functools import lru_cache

@lru_cache(maxsize=1000)
def get_abuseipdb_cached(ip: str):
    return get_abuseipdb_info(ip)

Performance scan réseau

MéthodeTemps (254 IPs)
Nmap (Python)~15 secondes
Socket raw~45 secondes
Async (asyncio)~8 secondes

🛡️ Sécurité et cadre légal

⚠️ Privilèges élevés nécessaires

  • Windows : PowerShell en Administrateur
  • Linux : sudo setcap cap_net_raw+ep /usr/bin/python3

💡 Cadre légal

En France, le scan de ports est légal sur votre propre infrastructure. Scanner des tiers sans autorisation = infraction (Code pénal art. 323-1).

🎯 Cas d'usage

Scénario 1 : Détection botnet IoT

Contexte : Caméra IP avec Telnet ouvert

T+0 : Alerte "Port 23 ouvert"

T+2h : 300 tentatives depuis Chine (score 92)

Résultat : Isolation VLAN avant compromission ✅

Scénario 2 : Audit ANSSI

non_compliant = []
for device, alerts in alerts_map.items():
    for alert in alerts:
        if alert['port'] in [21, 23, 80]:  # FTP, Telnet, HTTP
            non_compliant.append(device)

🚢 Déploiement Docker

FROM python:3.11-slim
RUN apt-get update && apt-get install -y tcpdump libpcap-dev nmap
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "websocket_server:app", "--host", "0.0.0.0"]
version: '3.8'
services:
  websocket:
    build: .
    ports: ["8000:8000"]
  sniffer:
    build: .
    command: python sniffer.py
    network_mode: host
    cap_add: [NET_ADMIN, NET_RAW]
  streamlit:
    build: .
    command: streamlit run app.py
    ports: ["8501:8501"]

🔮 Évolutions futures

ML pour anomalies

from sklearn.ensemble import IsolationForest

model = IsolationForest(contamination=0.1)
model.fit(traffic_features)
prediction = model.predict([current_traffic])  # -1 = anomalie

Intégration MISP

from pymisp import PyMISP

misp = PyMISP('https://misp.local', 'API_KEY')
result = misp.search(value=ip, type_attribute='ip-dst')

🎓 Conclusion

Ce que vous avez appris :

  • ✅ Architecture événementielle avec WebSocket
  • ✅ Banner grabbing pour détection de versions
  • ✅ Enrichissement API (AbuseIPDB)
  • ✅ Corrélation de données hétérogènes
  • ✅ Redondance IA (Ollama/Groq)
  • ✅ Optimisations (cache, async)

Compétences développées :

  • 🐍 Python avancé (asyncio, threading, sockets)
  • 🌐 Protocoles réseau (TCP/IP, WebSocket)
  • 🤖 IA/ML (LLM, prompts structurés)
  • 📊 Data Science (Pandas, Plotly)
  • 🐳 DevOps (Docker, orchestration)
> [SYSTEM] Accès niveau root détecté.
> [MESSAGE] Excellence en infrastructure & sécurité confirmée.
> [STATUS] Portfolio opérationnel, non régressif, visuellement optimisé.
> [COMMAND] Respect +++