Analyse de logs par IA avec Ollama sur un VPS : détecter les anomalies avec un LLM local

15 min de lecture·Matthieu·slackdiscordaiopslog-analysispythonlokiollama|

Construisez un pipeline d'analyse de logs par IA sur votre VPS. Interrogez Loki, classifiez les anomalies avec un LLM local via Ollama et envoyez des alertes vers Discord ou Slack avec un script Python et un timer systemd.

Votre pipeline Loki collecte les logs. Les tableaux de bord Grafana permettent de les parcourir. Mais personne ne surveille ces tableaux de bord à 3 h du matin quand une attaque par force brute SSH démarre ou qu'un disque se remplit. Il vous faut quelque chose qui lit vos logs en continu et vous prévient quand quelque chose ne va pas.

Ce tutoriel construit ce système. Un script Python interroge Loki toutes les 5 minutes, envoie les lots de logs à un LLM local tournant sur Ollama, récupère des classifications JSON structurées et envoie des alertes d'anomalie vers Discord ou Slack. Pas d'API IA cloud, pas de facturation au token, et vos données de logs ne quittent jamais votre serveur.

À la fin, vous disposerez d'un timer systemd fonctionnel qui tourne sans intervention, déduplique les alertes et gère les erreurs proprement.

Prérequis :

  • Un VPS avec au moins 8 Go de RAM (4 vCPU recommandés). Ollama et Loki tourneront côte à côte.
  • Un pipeline de logs Loki + Promtail fonctionnel
  • Python 3.10+ installé
  • Connaissance de base de systemd et Python

Cet article fait partie de la série AIOps.

Comment installer Ollama sur un VPS pour l'analyse de logs ?

Ollama exécute des LLM localement avec un seul binaire et expose une API HTTP sur le port 11434. Installez-le avec le script officiel, téléchargez un modèle et vérifiez que l'API répond. L'ensemble prend moins de 5 minutes avec une connexion correcte.

L'installeur a besoin de zstd pour l'extraction. Installez-le d'abord :

apt-get update && apt-get install -y zstd

Téléchargez et lancez le script d'installation :

curl -fsSL https://ollama.com/install.sh -o ollama-install.sh
sha256sum ollama-install.sh

Inspectez le script avant de l'exécuter. Lisez le code shell et comparez la somme de contrôle à une copie de référence si vous en avez une. Vous pouvez aussi consulter le code source sur GitHub. Puis lancez-le :

sh ollama-install.sh

L'installeur crée un service systemd appelé ollama. Vérifiez qu'il tourne :

systemctl status ollama

Vous devriez voir active (running) dans la sortie. Notez la ligne Loaded: loaded (/etc/systemd/system/ollama.service; enabled; preset: enabled). L'installeur l'a déjà configuré pour démarrer au boot.

Vérifiez que l'API écoute :

curl -s http://localhost:11434/api/tags | python3 -m json.tool

Cela renvoie un objet JSON avec un tableau models. Il sera vide tant que vous n'aurez pas téléchargé de modèle.

Lier Ollama à localhost uniquement

Par défaut, Ollama écoute sur 127.0.0.1:11434. Confirmez-le :

ss -tlnp | grep 11434

Si la sortie affiche 0.0.0.0:11434, Ollama est exposé à Internet. Corrigez cela en définissant la variable d'environnement dans l'unité systemd :

sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/override.conf
[Service]
Environment="OLLAMA_HOST=127.0.0.1:11434"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
ss -tlnp | grep 11434

Confirmez que la sortie affiche maintenant 127.0.0.1:11434. Exposer une API LLM sur Internet permet à n'importe qui de lancer de l'inférence sur votre serveur.

Quel modèle LLM fonctionne le mieux pour la détection d'anomalies dans les logs serveur ?

Pour l'analyse de logs sur un VPS de 8 Go, il faut un modèle qui tient en mémoire aux côtés de Loki et Promtail. Deux modèles fonctionnent bien : Gemma 2 9B pour la classification générale de logs et Llama 3.1 8B pour l'analyse orientée sécurité. Les deux tournent en quantification Q4 avec environ 5-6 Go de RAM.

Téléchargez les deux modèles :

ollama pull gemma2:9b
ollama pull llama3.1:8b

Chaque téléchargement fait environ 5-6 Go. Après le téléchargement, vérifiez :

ollama list

Testez une inférence rapide pour confirmer que le modèle se charge :

curl -s http://localhost:11434/api/generate \
  -d '{"model": "gemma2:9b", "prompt": "Classify this log line: Failed password for root from 203.0.113.5 port 22", "stream": false}' \
  | python3 -m json.tool

Observez eval_duration dans la réponse. C'est le temps d'inférence en nanosecondes. Divisez par 1 000 000 pour obtenir des millisecondes.

Comparaison des modèles sur un VPS 4 vCPU / 8 Go

Les chiffres suivants ont été mesurés sur un Virtua Cloud VCS-8 (4 vCPU Ryzen, 8 Go RAM, NVMe) en traitant un lot de 100 lignes syslog avec le modèle déjà chargé en mémoire :

Métrique Gemma 2 9B (Q4_K_M) Llama 3.1 8B (Q4_K_M)
Taille du modèle sur disque 5,4 Go 4,9 Go
Utilisation RAM (chargé) ~5,8 Go ~5,2 Go
Temps par lot de 100 lignes ~12-18 s ~14-22 s
Tokens/s ~18-25 ~15-20
Précision sur logs de sécurité Bonne Meilleure
Détection d'anomalies générales Meilleure Bonne

Les démarrages à froid sont plus lents. La première inférence après qu'Ollama charge le modèle depuis le disque ajoute 5 à 10 secondes. Les appels suivants dans la fenêtre de keep-alive tournent aux vitesses indiquées ci-dessus.

Recommandation : commencez avec gemma2:9b pour l'analyse de logs généraliste. Passez à llama3.1:8b si vous analysez principalement des logs d'authentification et de sécurité.

Budget RAM sur 8 Go

Composant Utilisation RAM
OS + processus système ~400 Mo
Loki ~300-500 Mo
Promtail ~50 Mo
Ollama (inactif, aucun modèle chargé) ~30 Mo
Ollama (gemma2:9b chargé) ~5,8 Go
Script Python ~50 Mo
Total ~6,7-6,9 Go

Cela tient dans 8 Go avec ~1 Go de marge. Ollama décharge automatiquement les modèles après 5 minutes d'inactivité (configurable avec OLLAMA_KEEP_ALIVE), libérant la RAM. Le timer systemd se déclenche toutes les 5 minutes, donc le modèle reste chargé pendant les fenêtres d'analyse actives et se décharge entre elles.

Si la mémoire est limitée, utilisez gemma2:9b avec OLLAMA_KEEP_ALIVE=1m pour que le modèle se décharge plus vite après chaque lot.

Comment interroger les logs Loki depuis un script Python ?

Interrogez l'API HTTP de Loki à /loki/api/v1/query_range avec une expression LogQL et une fenêtre temporelle. L'API renvoie du JSON avec les flux de logs. Utilisez la bibliothèque requests de Python pour récupérer les 5 dernières minutes de logs pour un label de job donné.

D'abord, préparez le projet :

mkdir -p /opt/log-analyzer
cd /opt/log-analyzer

Créez le fichier de dépendances :

cat <<'EOF' > /opt/log-analyzer/requirements.txt
requests>=2.31.0
pydantic>=2.5.0
ollama>=0.4.0
EOF

Installez les dépendances dans un environnement virtuel. Sur Ubuntu 24.04, le paquet python3-venv est nécessaire :

apt-get install -y python3.12-venv
python3 -m venv /opt/log-analyzer/venv
/opt/log-analyzer/venv/bin/pip install -r /opt/log-analyzer/requirements.txt

Vérifiez l'installation :

/opt/log-analyzer/venv/bin/python -c "import requests, pydantic, ollama; print('OK')"

Fonction de requête Loki

La fonction suivante interroge Loki pour les logs récents :

import requests
from datetime import datetime, timedelta, timezone


def query_loki(
    loki_url: str,
    logql: str,
    minutes: int = 5,
    limit: int = 500,
) -> list[str]:
    """Query Loki for log lines from the last N minutes."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=minutes)

    params = {
        "query": logql,
        "start": str(int(start.timestamp() * 1e9)),  # nanosecond epoch
        "end": str(int(now.timestamp() * 1e9)),
        "limit": limit,
    }

    resp = requests.get(
        f"{loki_url}/loki/api/v1/query_range",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()

    data = resp.json()
    lines = []
    for stream in data.get("data", {}).get("result", []):
        for _ts, line in stream.get("values", []):
            lines.append(line)

    return lines

Les paramètres start et end utilisent des timestamps Unix en nanosecondes. La réponse de Loki imbrique les lignes de logs dans data.result[].values[], où chaque valeur est une paire [timestamp, line].

Exemples de requêtes LogQL que vous utiliserez :

# All syslog entries
SYSLOG_QUERY = '{job="syslog"}'

# Nginx error logs
NGINX_QUERY = '{job="nginx"} |= "error"'

# SSH authentication events
AUTH_QUERY = '{job="syslog"} |~ "(sshd|pam_unix)"'

Testez la requête sur votre instance Loki en cours d'exécution :

curl -s 'http://localhost:3100/loki/api/v1/query_range' \
  --data-urlencode 'query={job="syslog"}' \
  --data-urlencode "start=$(date -d '5 minutes ago' +%s)000000000" \
  --data-urlencode "end=$(date +%s)000000000" \
  --data-urlencode 'limit=10' \
  | python3 -m json.tool | head -30

Vous devriez voir des lignes de logs dans le tableau result. Si le tableau est vide, vérifiez que Promtail envoie bien les logs à Loki et que le label de job correspond à votre configuration Promtail.

Comment écrire des prompts qui classifient les entrées de logs comme anomalies ?

Le prompt est le cerveau de ce système. Un bon prompt dit au LLM exactement quoi chercher, définit les catégories de classification et exige une sortie structurée. Les mauvais prompts produisent des résumés vagues. Les bons prompts produisent du JSON actionnable.

Trois modèles de prompts couvrent la plupart des besoins d'analyse de logs serveur : détection d'anomalies générales, détection d'événements de sécurité et détection de problèmes de performance. Chaque prompt inclut le schéma de classification en ligne pour que le modèle connaisse le format de sortie attendu.

Prompt 1 : détection d'anomalies générales

PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.

Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of the anomaly",
      "severity": "low|medium|high|critical",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means and potential impact"
    }}
  ],
  "summary": "one sentence summary of overall log health"
}}

If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""

Prompt 2 : détection d'événements de sécurité

PROMPT_SECURITY = """You are a security analyst reviewing server logs. Identify security-relevant events.

Focus on:
- Brute-force attempts (repeated failed logins from same IP)
- Successful logins from unusual IPs or at unusual times
- Privilege escalation attempts (sudo failures, su attempts)
- Port scanning patterns
- Unauthorized access attempts to files or services

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of security event",
      "severity": "low|medium|high|critical",
      "source_ips": ["IP addresses involved"],
      "log_lines": ["the relevant log lines"],
      "recommendation": "suggested response action"
    }}
  ],
  "summary": "one sentence security posture assessment"
}}

If no security events are found, return {{"findings": [], "summary": "No security events detected."}}.
"""

Prompt 3 : détection de problèmes de performance

PROMPT_PERFORMANCE = """You are a performance engineer reviewing server logs. Identify performance-related issues.

Focus on:
- High response times or timeouts
- Resource exhaustion (OOM kills, disk full, connection limits)
- Service restarts or crashes
- Queue backlogs or processing delays
- Error rate spikes

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of performance issue",
      "severity": "low|medium|high|critical",
      "affected_service": "service name if identifiable",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means for system performance"
    }}
  ],
  "summary": "one sentence performance assessment"
}}

If no performance issues are found, return {{"findings": [], "summary": "No performance issues detected."}}.
"""

Inclure le schéma JSON directement dans le prompt est intentionnel. Cela donne au modèle deux signaux : le paramètre format impose du JSON valide, et le schéma dans le prompt guide la structure. Cette combinaison produit une sortie fiable avec de petits modèles.

Comment obtenir une sortie JSON structurée depuis Ollama ?

Ollama prend en charge la sortie structurée via le paramètre format de son API. Passez un schéma JSON et le modèle ne générera que des tokens conformes à ce schéma. Combiné avec un modèle Pydantic côté Python, vous récupérez des données validées et typées à chaque appel d'inférence.

Définissez les modèles Pydantic :

from pydantic import BaseModel


class Finding(BaseModel):
    title: str
    severity: str  # low, medium, high, critical
    log_lines: list[str]
    explanation: str = ""
    recommendation: str = ""
    source_ips: list[str] = []
    affected_service: str = ""


class AnalysisResult(BaseModel):
    findings: list[Finding]
    summary: str

Appelez Ollama avec le schéma imposé :

from ollama import chat


def analyze_logs(
    logs: list[str],
    model: str = "gemma2:9b",
    prompt_template: str = PROMPT_GENERAL,
) -> AnalysisResult:
    """Send logs to Ollama and get structured analysis back."""
    if not logs:
        return AnalysisResult(findings=[], summary="No logs to analyze.")

    # Truncate to avoid context window issues
    log_block = "\n".join(logs[:200])
    prompt = prompt_template.format(logs=log_block)

    response = chat(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        format=AnalysisResult.model_json_schema(),
        options={"temperature": 0.1},
    )

    return AnalysisResult.model_validate_json(response.message.content)

Points importants :

  • format=AnalysisResult.model_json_schema() indique à Ollama d'imposer le schéma JSON au niveau de la génération de tokens. Le modèle ne peut pas produire une sortie qui viole le schéma.
  • temperature: 0.1 maintient la sortie déterministe. La classification de logs ne doit pas être créative.
  • La troncature à 200 lignes évite les dépassements de fenêtre contextuelle. Gemma 2 9B a une fenêtre contextuelle de 8192 tokens. 200 lignes de logs à ~20 tokens chacune utilisent environ la moitié du contexte.
  • model_validate_json() parse la chaîne en un objet Pydantic typé. Si le parsing échoue (rare avec l'imposition de schéma), une ValidationError est levée que vous pouvez intercepter.

Testez la fonction depuis le shell Python :

/opt/log-analyzer/venv/bin/python3 -c "
from ollama import chat
import json

response = chat(
    model='gemma2:9b',
    messages=[{'role': 'user', 'content': 'Analyze this log: Failed password for root from 203.0.113.5 port 44322 ssh2'}],
    format={
        'type': 'object',
        'properties': {
            'findings': {'type': 'array', 'items': {'type': 'object'}},
            'summary': {'type': 'string'}
        },
        'required': ['findings', 'summary']
    },
    options={'temperature': 0.1},
)
print(json.dumps(json.loads(response.message.content), indent=2))
"

Vous devriez obtenir un objet JSON propre avec les clés findings et summary. Pas de blocs markdown, pas de préambule, juste du JSON.

Comment envoyer des alertes d'anomalie vers Discord et Slack ?

Envoyez une requête POST avec un payload JSON à l'URL d'un webhook. Discord utilise un tableau embeds avec des champs à code couleur. Slack utilise Block Kit avec des blocks et des champs text. Les deux acceptent un seul POST HTTPS.

Webhook Discord

Créez un webhook dans votre serveur Discord : Paramètres du serveur > Intégrations > Webhooks > Nouveau webhook. Copiez l'URL.

Stockez l'URL du webhook de manière sécurisée :

cat <<'EOF' > /opt/log-analyzer/.env
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_ID/YOUR_TOKEN
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
LOKI_URL=http://localhost:3100
OLLAMA_MODEL=gemma2:9b
EOF
chmod 600 /opt/log-analyzer/.env

La fonction d'envoi d'alertes :

import os
import requests

# Severity to Discord embed color (decimal)
SEVERITY_COLORS = {
    "critical": 15158332,  # red
    "high": 15105570,      # orange
    "medium": 16776960,    # yellow
    "low": 3447003,        # blue
}


def send_discord_alert(webhook_url: str, result: AnalysisResult) -> None:
    """Send findings to Discord as an embed."""
    if not result.findings:
        return

    for finding in result.findings:
        embed = {
            "title": f"[{finding.severity.upper()}] {finding.title}",
            "color": SEVERITY_COLORS.get(finding.severity, 3447003),
            "fields": [
                {
                    "name": "Explanation",
                    "value": finding.explanation or finding.recommendation or "N/A",
                    "inline": False,
                },
                {
                    "name": "Sample log lines",
                    "value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
                    "inline": False,
                },
            ],
        }
        if finding.source_ips:
            embed["fields"].append({
                "name": "Source IPs",
                "value": ", ".join(finding.source_ips),
                "inline": True,
            })

        payload = {"embeds": [embed]}
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()

Testez le webhook Discord avec curl :

curl -s -X POST "$DISCORD_WEBHOOK_URL" \
  -H "Content-Type: application/json" \
  -d '{
    "embeds": [{
      "title": "[HIGH] Test Alert - SSH Brute Force",
      "color": 15105570,
      "fields": [
        {"name": "Explanation", "value": "Multiple failed SSH login attempts from 203.0.113.5", "inline": false},
        {"name": "Sample log lines", "value": "```\nFailed password for root from 203.0.113.5\n```", "inline": false}
      ]
    }]
  }'

Vérifiez votre canal Discord. Vous devriez voir un message embed avec code couleur.

Webhook Slack

Créez une application Slack sur api.slack.com/apps, activez les Incoming Webhooks et copiez l'URL du webhook.

def send_slack_alert(webhook_url: str, result: AnalysisResult) -> None:
    """Send findings to Slack using Block Kit."""
    if not result.findings:
        return

    for finding in result.findings:
        severity_emoji = {
            "critical": ":rotating_light:",
            "high": ":warning:",
            "medium": ":large_yellow_circle:",
            "low": ":information_source:",
        }
        emoji = severity_emoji.get(finding.severity, ":grey_question:")

        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": finding.explanation or finding.recommendation or "N/A",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "```" + "\n".join(finding.log_lines[:5]) + "```",
                },
            },
        ]

        payload = {
            "text": f"[{finding.severity.upper()}] {finding.title}",
            "blocks": blocks,
        }
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()

Testez le webhook Slack :

curl -s -X POST "$SLACK_WEBHOOK_URL" \
  -H "Content-Type: application/json" \
  -d '{"text": "[HIGH] Test Alert - SSH Brute Force", "blocks": [{"type": "header", "text": {"type": "plain_text", "text": ":warning: [HIGH] Test Alert"}}]}'

Comment éviter l'envoi d'alertes en double ?

Sans déduplication, la même attaque SSH par force brute depuis la même IP déclenche une alerte toutes les 5 minutes pendant des heures. Utilisez un cache fichier qui stocke un hash du titre et de la source de chaque finding. L'alerte est ignorée si le même hash est apparu dans la dernière heure.

import hashlib
import json
import time
from pathlib import Path

DEDUP_FILE = Path("/opt/log-analyzer/dedup_cache.json")
DEDUP_WINDOW = 3600  # seconds (1 hour)


def load_dedup_cache() -> dict:
    if DEDUP_FILE.exists():
        try:
            return json.loads(DEDUP_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}


def save_dedup_cache(cache: dict) -> None:
    # Prune expired entries
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
    DEDUP_FILE.write_text(json.dumps(cache))


def is_duplicate(finding: Finding) -> bool:
    """Check if this finding was already alerted recently."""
    cache = load_dedup_cache()
    now = time.time()

    # Hash on title + sorted source IPs + severity
    key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
    key = hashlib.sha256(key_material.encode()).hexdigest()[:16]

    if key in cache and now - cache[key] < DEDUP_WINDOW:
        return True

    cache[key] = now
    save_dedup_cache(cache)
    return False

Le cache de déduplication est un fichier JSON avec des clés de hash courtes associées à des timestamps. Les anciennes entrées sont purgées à chaque sauvegarde. La fenêtre d'une heure est un bon défaut : assez longue pour supprimer les alertes répétées, assez courte pour relancer l'alerte si le même problème réapparaît après une pause.

Définissez les permissions du fichier cache :

touch /opt/log-analyzer/dedup_cache.json
chmod 600 /opt/log-analyzer/dedup_cache.json

Le script complet

Le fichier log_analyzer.py complet assemble le tout :

#!/usr/bin/env python3
"""AI Log Analyzer - Query Loki, classify with Ollama, alert to Discord/Slack."""

import hashlib
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests as http_requests
from ollama import chat
from pydantic import BaseModel

# --- Configuration ---

LOKI_URL = os.environ.get("LOKI_URL", "http://localhost:3100")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "gemma2:9b")
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL", "")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
LOG_QUERIES = os.environ.get(
    "LOG_QUERIES",
    '{job="syslog"};{job="nginx"} |= "error"',
).split(";")
QUERY_WINDOW_MINUTES = int(os.environ.get("QUERY_WINDOW_MINUTES", "5"))
QUERY_LIMIT = int(os.environ.get("QUERY_LIMIT", "500"))
DEDUP_FILE = Path(os.environ.get("DEDUP_FILE", "/opt/log-analyzer/dedup_cache.json"))
DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW", "3600"))

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("log-analyzer")


# --- Models ---

class Finding(BaseModel):
    title: str
    severity: str
    log_lines: list[str]
    explanation: str = ""
    recommendation: str = ""
    source_ips: list[str] = []
    affected_service: str = ""


class AnalysisResult(BaseModel):
    findings: list[Finding]
    summary: str


# --- Prompts ---

PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.

Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of the anomaly",
      "severity": "low|medium|high|critical",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means and potential impact"
    }}
  ],
  "summary": "one sentence summary of overall log health"
}}

If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""


# --- Loki ---

def query_loki(logql: str) -> list[str]:
    """Query Loki for log lines from the last N minutes."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=QUERY_WINDOW_MINUTES)

    params = {
        "query": logql,
        "start": str(int(start.timestamp() * 1e9)),
        "end": str(int(now.timestamp() * 1e9)),
        "limit": QUERY_LIMIT,
    }

    resp = http_requests.get(
        f"{LOKI_URL}/loki/api/v1/query_range",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()

    data = resp.json()
    lines = []
    for stream in data.get("data", {}).get("result", []):
        for _ts, line in stream.get("values", []):
            lines.append(line)

    return lines


# --- Ollama ---

def analyze_logs(logs: list[str]) -> AnalysisResult:
    """Send logs to Ollama and get structured analysis back."""
    if not logs:
        return AnalysisResult(findings=[], summary="No logs to analyze.")

    log_block = "\n".join(logs[:200])
    prompt = PROMPT_GENERAL.format(logs=log_block)

    response = chat(
        model=OLLAMA_MODEL,
        messages=[{"role": "user", "content": prompt}],
        format=AnalysisResult.model_json_schema(),
        options={"temperature": 0.1},
    )

    return AnalysisResult.model_validate_json(response.message.content)


# --- Deduplication ---

def load_dedup_cache() -> dict:
    if DEDUP_FILE.exists():
        try:
            return json.loads(DEDUP_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}


def save_dedup_cache(cache: dict) -> None:
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
    DEDUP_FILE.write_text(json.dumps(cache))


def is_duplicate(finding: Finding) -> bool:
    cache = load_dedup_cache()
    now = time.time()
    key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
    key = hashlib.sha256(key_material.encode()).hexdigest()[:16]

    if key in cache and now - cache[key] < DEDUP_WINDOW:
        return True

    cache[key] = now
    save_dedup_cache(cache)
    return False


# --- Alerting ---

SEVERITY_COLORS = {
    "critical": 15158332,
    "high": 15105570,
    "medium": 16776960,
    "low": 3447003,
}


def send_discord_alert(finding: Finding) -> None:
    if not DISCORD_WEBHOOK_URL:
        return

    embed = {
        "title": f"[{finding.severity.upper()}] {finding.title}",
        "color": SEVERITY_COLORS.get(finding.severity, 3447003),
        "fields": [
            {
                "name": "Explanation",
                "value": finding.explanation or finding.recommendation or "N/A",
                "inline": False,
            },
            {
                "name": "Sample log lines",
                "value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
                "inline": False,
            },
        ],
    }
    if finding.source_ips:
        embed["fields"].append({
            "name": "Source IPs",
            "value": ", ".join(finding.source_ips),
            "inline": True,
        })

    resp = http_requests.post(
        DISCORD_WEBHOOK_URL, json={"embeds": [embed]}, timeout=10
    )
    resp.raise_for_status()


def send_slack_alert(finding: Finding) -> None:
    if not SLACK_WEBHOOK_URL:
        return

    severity_emoji = {
        "critical": ":rotating_light:",
        "high": ":warning:",
        "medium": ":large_yellow_circle:",
        "low": ":information_source:",
    }
    emoji = severity_emoji.get(finding.severity, ":grey_question:")

    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": finding.explanation or finding.recommendation or "N/A",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "```" + "\n".join(finding.log_lines[:5]) + "```",
            },
        },
    ]

    resp = http_requests.post(
        SLACK_WEBHOOK_URL,
        json={
            "text": f"[{finding.severity.upper()}] {finding.title}",
            "blocks": blocks,
        },
        timeout=10,
    )
    resp.raise_for_status()


def send_alerts(finding: Finding) -> None:
    send_discord_alert(finding)
    send_slack_alert(finding)


# --- Main ---

def main() -> int:
    log.info("Starting log analysis run")
    all_lines = []

    for logql in LOG_QUERIES:
        logql = logql.strip()
        if not logql:
            continue
        try:
            lines = query_loki(logql)
            log.info("Query '%s' returned %d lines", logql, len(lines))
            all_lines.extend(lines)
        except Exception as e:
            log.error("Loki query failed for '%s': %s", logql, e)

    if not all_lines:
        log.info("No log lines to analyze")
        return 0

    log.info("Analyzing %d total log lines with %s", len(all_lines), OLLAMA_MODEL)

    try:
        result = analyze_logs(all_lines)
    except Exception as e:
        log.error("Ollama analysis failed: %s", e)
        return 1

    log.info("Analysis complete: %d findings. %s", len(result.findings), result.summary)

    alerted = 0
    for finding in result.findings:
        if is_duplicate(finding):
            log.info("Skipping duplicate: %s", finding.title)
            continue
        try:
            send_alerts(finding)
            alerted += 1
            log.info("Alerted: [%s] %s", finding.severity, finding.title)
        except Exception as e:
            log.error("Alert failed for '%s': %s", finding.title, e)

    log.info("Run complete. %d new alerts sent.", alerted)
    return 0


if __name__ == "__main__":
    sys.exit(main())

Définissez les permissions :

chmod 750 /opt/log-analyzer/log_analyzer.py
chown root:root /opt/log-analyzer/log_analyzer.py
ls -la /opt/log-analyzer/

Vérifiez que la sortie complète affiche rwxr-x--- pour le script et rw------- pour le fichier .env.

Lancez un test manuel :

cd /opt/log-analyzer
set -a && source .env && set +a
/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py

Vérifiez la sortie. Vous devriez voir les lignes de logs récupérées, les résultats d'analyse et les alertes envoyées (ou ignorées s'il n'y a pas d'anomalies).

Comment exécuter l'analyse de logs IA automatiquement avec un timer systemd ?

Créez une paire service + timer systemd. Le service exécute le script Python avec les variables d'environnement du fichier .env. Le timer le déclenche toutes les 5 minutes. Si le script échoue, systemd enregistre l'échec et l'exécution suivante se déroule normalement.

Créez l'unité de service :

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service
[Unit]
Description=AI Log Analyzer - Ollama anomaly detection
After=network-online.target ollama.service loki.service
Wants=network-online.target

[Service]
Type=oneshot
EnvironmentFile=/opt/log-analyzer/.env
ExecStart=/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
WorkingDirectory=/opt/log-analyzer
User=root
StandardOutput=journal
StandardError=journal
TimeoutStartSec=120
EOF

Le TimeoutStartSec=120 donne au LLM jusqu'à 2 minutes pour terminer l'inférence. Sur un VPS de 8 Go avec 100-200 lignes de logs, l'inférence finit généralement en 15-25 secondes. Le timeout de 2 minutes gère les cas où Ollama doit charger le modèle depuis le disque.

Le service tourne en tant que User=root par simplicité. En production, créez un utilisateur dédié log-analyzer, accordez-lui l'accès en lecture au fichier .env et mettez à jour la directive User=. Le script n'a besoin que d'un accès HTTP à Loki et Ollama sur localhost, il ne nécessite donc pas de privilèges élevés.

Créez le timer :

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.timer
[Unit]
Description=Run AI Log Analyzer every 5 minutes

[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
AccuracySec=30s

[Install]
WantedBy=timers.target
EOF

Activez et démarrez le timer :

sudo systemctl daemon-reload
sudo systemctl enable --now log-analyzer.timer

Le enable --now fait survivre le timer aux redémarrages et le démarre immédiatement.

Vérifiez que le timer est actif :

systemctl status log-analyzer.timer

Vous devriez voir active (waiting) et la prochaine heure de déclenchement. Vérifiez quand il a tourné pour la dernière fois :

systemctl list-timers log-analyzer.timer

Après le premier déclenchement, consultez les logs du service :

journalctl -u log-analyzer.service -n 30 --no-pager

Repérez les messages Starting log analysis run et Run complete. Si vous voyez Ollama analysis failed, le modèle n'est peut-être pas téléchargé ou Ollama ne tourne pas.

Notifications d'échec

Si l'analyseur échoue, vous voulez le savoir. Ajoutez un handler OnFailure à l'unité de service :

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer-failure@.service
[Unit]
Description=Log Analyzer failure notification for %i

[Service]
Type=oneshot
ExecStart=/usr/bin/curl -s -X POST ${DISCORD_WEBHOOK_URL} \
  -H "Content-Type: application/json" \
  -d '{"content": ":x: **Log Analyzer Failed**\nUnit: %i\nTime: %H\nCheck: journalctl -u %i"}'
EnvironmentFile=/opt/log-analyzer/.env
EOF

Ajoutez la directive OnFailure au service principal :

sudo mkdir -p /etc/systemd/system/log-analyzer.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service.d/failure.conf
[Unit]
OnFailure=log-analyzer-failure@%n.service
EOF
sudo systemctl daemon-reload

Si le script Python se termine avec un code non nul, systemd envoie une notification sur votre canal Discord.

Quelles sont les limites de l'analyse de logs par LLM ?

L'analyse de logs par LLM complète l'alerte basée sur des règles, elle ne la remplace pas. Elle repère des patterns difficiles à exprimer sous forme de règles statiques. Mais elle a des faiblesses réelles qu'il faut connaître.

Hallucinations. Le modèle peut signaler des lignes de logs normales comme anomalies ou inventer des explications pour des événements bénins. Les findings de faible sévérité du LLM doivent être traités comme des suggestions, pas des faits. Vérifiez toujours manuellement les alertes de haute sévérité.

Limites de la fenêtre contextuelle. Gemma 2 9B a une fenêtre contextuelle de 8192 tokens. À ~20 tokens par ligne de log, cela fait environ 400 lignes maximum (avec de la place pour le prompt et la sortie). Le script tronque à 200 lignes par sécurité. Si votre serveur génère plus de 200 lignes en 5 minutes, vous devez soit filtrer avec des requêtes LogQL plus spécifiques, soit accepter que certaines lignes soient ignorées.

Pas d'apprentissage dans le temps. Le modèle n'a aucune mémoire entre les exécutions. Il ne peut pas apprendre qu'un pattern de log spécifique est normal pour votre environnement. Chaque lot est analysé de zéro. Si vous avez un message de log récurrent qui est bénin mais semble suspect, ajoutez-le à un filtre d'exclusion LogQL : {job="syslog"} != "expected noisy message".

Latence d'inférence. Sur un VPS 4 vCPU / 8 Go, l'inférence prend 12-22 secondes par lot. C'est acceptable pour un timer de 5 minutes mais trop lent pour de l'alerte temps réel. Pour les événements critiques en temps (disque plein, OOM), gardez les alertes Prometheus traditionnelles .

Faux négatifs. Les petits modèles ratent les patterns subtils. Une fuite mémoire lente qui produit une utilisation de swap légèrement élevée sur plusieurs jours n'apparaîtra pas dans une fenêtre de logs de 5 minutes. Utilisez les métriques Prometheus et les alertes Grafana pour la détection basée sur les tendances.

Coût d'exécution. Même sans frais par token, le modèle utilise ~5,8 Go de RAM une fois chargé. Sur un VPS de 8 Go, c'est la majeure partie de votre mémoire. Si votre serveur applicatif a besoin de cette RAM, exécutez Ollama sur un VPS séparé ou utilisez le modèle plus petit gemma2:2b (1,6 Go de RAM, précision moindre).

Quand utiliser ceci vs l'alerte traditionnelle

Cas d'usage Analyse de logs par LLM Alerte traditionnelle (Prometheus)
« Quelque chose semble anormal mais je ne peux pas écrire de règle » Oui Non
Détection de force brute SSH Oui (bon en reconnaissance de patterns) Oui (fail2ban est plus rapide)
Disque plein / OOM Non (trop lent) Oui
Patterns d'erreur inconnus Oui Non
Franchissement de seuil métrique Non Oui
Changement de format de logs Oui (s'adapte automatiquement) Non (les règles cassent)

La meilleure configuration fait tourner les deux. Prometheus gère les modes de défaillance connus avec des alertes rapides. Le LLM repère les inconnues inconnues en lisant le texte des logs.

Dépannage

Ollama renvoie « model not found » : lancez ollama list pour vérifier les modèles disponibles. Téléchargez le modèle avec ollama pull gemma2:9b.

La requête Loki renvoie des résultats vides : vérifiez que Promtail tourne (systemctl status promtail) et que le label de job dans votre requête LogQL correspond à la configuration Promtail. Testez avec curl directement sur l'API Loki.

Mémoire insuffisante : vérifiez la RAM avec free -h. Si le modèle Ollama consomme trop, définissez OLLAMA_KEEP_ALIVE=1m dans l'override du service Ollama. Passez à gemma2:2b pour une utilisation RAM réduite.

Les alertes Discord/Slack n'arrivent pas : testez l'URL du webhook avec les commandes curl de la section alertes. Vérifiez les erreurs HTTP dans les logs de l'analyseur : journalctl -u log-analyzer.service -n 50.

Inférence lente : vérifiez que votre VPS a le nombre de cœurs CPU attendu avec nproc. Ollama utilise tous les cœurs disponibles pour l'inférence. Si un autre processus consomme du CPU, l'inférence ralentit. Vérifiez avec top pendant une exécution.

Erreurs de parsing JSON : si model_validate_json échoue, le modèle a produit du JSON invalide malgré l'imposition de schéma. C'est rare mais cela arrive avec certains cas limites. Le script enregistre l'erreur et continue à l'exécution suivante. Si cela se répète, essayez de changer de modèle.

Consultez les logs de l'analyseur :

journalctl -u log-analyzer.service -f

Pour les problèmes spécifiques à Ollama :

journalctl -u ollama.service -f

Pour l'étape suivante du pipeline AIOps, voyez comment construire une remédiation automatisée qui agit sur ces alertes . Pour des approches d'observabilité alternatives, voir le guide SigNoz et OpenObserve.


Copyright 2026 Virtua.Cloud. Tous droits réservés. Ce contenu est une création originale de l'équipe Virtua.Cloud. Toute reproduction, republication ou redistribution sans autorisation écrite est interdite.

Prêt à essayer ?

Déployez votre serveur en quelques secondes. Linux, Windows ou FreeBSD.

Voir les offres VPS