KI-Log-Analyse mit Ollama auf einem VPS: Anomalien mit einem lokalen LLM erkennen
Bauen Sie eine produktionsreife KI-Log-Analyse-Pipeline auf Ihrem VPS. Fragen Sie Loki nach Logs ab, klassifizieren Sie Anomalien mit einem lokalen LLM über Ollama und leiten Sie Alarme an Discord oder Slack weiter – mit einem Python-Skript und systemd-Timer.
Ihre Loki-Pipeline sammelt Logs. Grafana-Dashboards ermöglichen die Suche. Aber niemand schaut um 3 Uhr morgens auf diese Dashboards, wenn ein SSH-Brute-Force-Angriff beginnt oder eine Festplatte vollläuft. Sie brauchen etwas, das Ihre Logs kontinuierlich liest und Sie warnt, wenn etwas nicht stimmt.
Dieses Tutorial baut genau dieses System. Ein Python-Skript fragt Loki alle 5 Minuten ab, schickt Log-Batches an ein lokales LLM auf Ollama, erhält strukturierte JSON-Klassifizierungen zurück und sendet Anomalie-Alarme an Discord oder Slack. Keine Cloud-KI-APIs, keine Token-basierte Abrechnung, und Ihre Log-Daten verlassen niemals Ihren Server.
Am Ende haben Sie einen funktionierenden systemd-Timer, der unbeaufsichtigt läuft, Alarme dedupliziert und Fehler zuverlässig behandelt.
Voraussetzungen:
- Ein VPS mit mindestens 8 GB RAM (4 vCPU empfohlen). Ollama und Loki laufen nebeneinander.
- Eine funktionierende Loki + Promtail Log-Pipeline
- Python 3.10+ installiert
- Grundkenntnisse in systemd und Python
Dieser Artikel ist Teil der AIOps-Serie.
Wie installieren Sie Ollama auf einem VPS für die Log-Analyse?
Ollama führt LLMs lokal mit einem einzigen Binary aus und stellt eine HTTP-API auf Port 11434 bereit. Installieren Sie es mit dem offiziellen Skript, laden Sie ein Modell herunter und prüfen Sie, ob die API antwortet. Der gesamte Vorgang dauert bei einer guten Verbindung weniger als 5 Minuten.
Der Installer benötigt zstd zum Entpacken. Installieren Sie es zuerst:
apt-get update && apt-get install -y zstd
Laden Sie das Installations-Skript herunter und führen Sie es aus:
curl -fsSL https://ollama.com/install.sh -o ollama-install.sh
sha256sum ollama-install.sh
Prüfen Sie das Skript, bevor Sie es ausführen. Lesen Sie den Shell-Code durch und vergleichen Sie die Prüfsumme mit einer bekannten Kopie, falls vorhanden. Sie können auch den Quellcode auf GitHub einsehen. Dann führen Sie es aus:
sh ollama-install.sh
Der Installer erstellt einen systemd-Dienst namens ollama. Prüfen Sie, ob er läuft:
systemctl status ollama
Sie sollten active (running) in der Ausgabe sehen. Beachten Sie die Zeile Loaded: loaded (/etc/systemd/system/ollama.service; enabled; preset: enabled). Der Installer hat den Dienst bereits für den Autostart konfiguriert.
Prüfen Sie, ob die API erreichbar ist:
curl -s http://localhost:11434/api/tags | python3 -m json.tool
Das gibt ein JSON-Objekt mit einem models-Array zurück. Es ist leer, bis Sie ein Modell herunterladen.
Ollama nur auf localhost binden
Standardmäßig lauscht Ollama auf 127.0.0.1:11434. Bestätigen Sie das:
ss -tlnp | grep 11434
Wenn die Ausgabe 0.0.0.0:11434 zeigt, ist Ollama aus dem Internet erreichbar. Beheben Sie das, indem Sie die Umgebungsvariable in der systemd-Unit setzen:
sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/override.conf
[Service]
Environment="OLLAMA_HOST=127.0.0.1:11434"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
ss -tlnp | grep 11434
Bestätigen Sie, dass die Ausgabe jetzt 127.0.0.1:11434 zeigt. Eine LLM-API im Internet freizugeben erlaubt es jedem, Inferenz auf Ihrem Server auszuführen.
Welches LLM-Modell eignet sich am besten für die Anomalie-Erkennung in Server-Logs?
Für die Log-Analyse auf einem 8-GB-VPS brauchen Sie ein Modell, das neben Loki und Promtail in den Speicher passt. Zwei Modelle funktionieren gut: Gemma 2 9B für allgemeine Log-Klassifizierung und Llama 3.1 8B für sicherheitsorientierte Analyse. Beide laufen in Q4-Quantisierung mit etwa 5-6 GB RAM.
Laden Sie beide Modelle herunter:
ollama pull gemma2:9b
ollama pull llama3.1:8b
Jeder Download ist etwa 5-6 GB groß. Nach dem Download prüfen Sie:
ollama list
Testen Sie eine kurze Inferenz, um zu bestätigen, dass das Modell geladen wird:
curl -s http://localhost:11434/api/generate \
-d '{"model": "gemma2:9b", "prompt": "Classify this log line: Failed password for root from 203.0.113.5 port 22", "stream": false}' \
| python3 -m json.tool
Achten Sie auf eval_duration in der Antwort. Das ist die Inferenzzeit in Nanosekunden. Teilen Sie durch 1.000.000, um Millisekunden zu erhalten.
Modellvergleich auf einem 4-vCPU-/8-GB-VPS
Die folgenden Zahlen wurden auf einem Virtua Cloud VCS-8 (4 vCPU Ryzen, 8 GB RAM, NVMe) gemessen, bei der Verarbeitung eines Batches von 100 Syslog-Zeilen mit bereits geladenem Modell:
| Metrik | Gemma 2 9B (Q4_K_M) | Llama 3.1 8B (Q4_K_M) |
|---|---|---|
| Modellgröße auf der Festplatte | 5,4 GB | 4,9 GB |
| RAM-Nutzung (geladen) | ~5,8 GB | ~5,2 GB |
| Zeit pro 100-Zeilen-Batch | ~12-18 s | ~14-22 s |
| Tokens/s | ~18-25 | ~15-20 |
| Genauigkeit bei Sicherheits-Logs | Gut | Besser |
| Allgemeine Anomalie-Erkennung | Besser | Gut |
Kaltstarts sind langsamer. Die erste Inferenz nach dem Laden des Modells von der Festplatte durch Ollama benötigt zusätzlich 5-10 Sekunden. Nachfolgende Aufrufe innerhalb des Keep-Alive-Fensters laufen mit den oben genannten Geschwindigkeiten.
Empfehlung: Beginnen Sie mit gemma2:9b für allgemeine Log-Analyse. Wechseln Sie zu llama3.1:8b, wenn Sie hauptsächlich Auth-/Sicherheits-Logs analysieren.
RAM-Budget bei 8 GB
| Komponente | RAM-Nutzung |
|---|---|
| OS + Systemprozesse | ~400 MB |
| Loki | ~300-500 MB |
| Promtail | ~50 MB |
| Ollama (Leerlauf, kein Modell geladen) | ~30 MB |
| Ollama (gemma2:9b geladen) | ~5,8 GB |
| Python-Skript | ~50 MB |
| Gesamt | ~6,7-6,9 GB |
Das passt in 8 GB mit ~1 GB Reserve. Ollama entlädt Modelle automatisch nach 5 Minuten Inaktivität (konfigurierbar mit OLLAMA_KEEP_ALIVE) und gibt den RAM wieder frei. Der systemd-Timer löst alle 5 Minuten aus, sodass das Modell während aktiver Analysefenster geladen bleibt und sich dazwischen entlädt.
Wenn der Speicher knapp ist, verwenden Sie gemma2:9b mit OLLAMA_KEEP_ALIVE=1m, damit das Modell nach jedem Batch schneller entladen wird.
Wie fragen Sie Loki-Logs aus einem Python-Skript ab?
Fragen Sie Lokis HTTP-API unter /loki/api/v1/query_range mit einem LogQL-Ausdruck und einem Zeitfenster ab. Die API gibt JSON mit Log-Streams zurück. Verwenden Sie Pythons requests-Bibliothek, um die letzten 5 Minuten an Logs für ein bestimmtes Job-Label abzurufen.
Zuerst richten Sie das Projekt ein:
mkdir -p /opt/log-analyzer
cd /opt/log-analyzer
Erstellen Sie die Requirements-Datei:
cat <<'EOF' > /opt/log-analyzer/requirements.txt
requests>=2.31.0
pydantic>=2.5.0
ollama>=0.4.0
EOF
Installieren Sie die Abhängigkeiten in einer virtuellen Umgebung. Unter Ubuntu 24.04 brauchen Sie zuerst das Paket python3-venv:
apt-get install -y python3.12-venv
python3 -m venv /opt/log-analyzer/venv
/opt/log-analyzer/venv/bin/pip install -r /opt/log-analyzer/requirements.txt
Prüfen Sie die Installation:
/opt/log-analyzer/venv/bin/python -c "import requests, pydantic, ollama; print('OK')"
Loki-Abfrage-Funktion
Die folgende Funktion fragt Loki nach aktuellen Logs ab:
import requests
from datetime import datetime, timedelta, timezone
def query_loki(
loki_url: str,
logql: str,
minutes: int = 5,
limit: int = 500,
) -> list[str]:
"""Query Loki for log lines from the last N minutes."""
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=minutes)
params = {
"query": logql,
"start": str(int(start.timestamp() * 1e9)), # nanosecond epoch
"end": str(int(now.timestamp() * 1e9)),
"limit": limit,
}
resp = requests.get(
f"{loki_url}/loki/api/v1/query_range",
params=params,
timeout=10,
)
resp.raise_for_status()
data = resp.json()
lines = []
for stream in data.get("data", {}).get("result", []):
for _ts, line in stream.get("values", []):
lines.append(line)
return lines
Die Parameter start und end verwenden Unix-Timestamps in Nanosekunden. Lokis Antwort verschachtelt Log-Zeilen in data.result[].values[], wobei jeder Wert ein [timestamp, line]-Paar ist.
Beispiel-LogQL-Abfragen, die Sie verwenden werden:
# All syslog entries
SYSLOG_QUERY = '{job="syslog"}'
# Nginx error logs
NGINX_QUERY = '{job="nginx"} |= "error"'
# SSH authentication events
AUTH_QUERY = '{job="syslog"} |~ "(sshd|pam_unix)"'
Testen Sie die Abfrage gegen Ihre laufende Loki-Instanz:
curl -s 'http://localhost:3100/loki/api/v1/query_range' \
--data-urlencode 'query={job="syslog"}' \
--data-urlencode "start=$(date -d '5 minutes ago' +%s)000000000" \
--data-urlencode "end=$(date +%s)000000000" \
--data-urlencode 'limit=10' \
| python3 -m json.tool | head -30
Sie sollten Log-Zeilen im result-Array sehen. Wenn das Array leer ist, prüfen Sie, ob Promtail Logs an Loki sendet und ob das Job-Label mit Ihrer Promtail-Konfiguration übereinstimmt.
Wie schreiben Sie Prompts, die Log-Einträge als Anomalien klassifizieren?
Der Prompt ist das Gehirn dieses Systems. Ein guter Prompt sagt dem LLM genau, wonach es suchen soll, definiert Klassifizierungskategorien und verlangt strukturierte Ausgabe. Schlechte Prompts produzieren vage Zusammenfassungen. Gute Prompts produzieren verwertbares JSON.
Drei Prompt-Vorlagen decken die meisten Anforderungen der Server-Log-Analyse ab: allgemeine Anomalie-Erkennung, Sicherheitsereignis-Erkennung und Performance-Problem-Erkennung. Jeder Prompt enthält das Klassifizierungsschema inline, damit das Modell das erwartete Ausgabeformat kennt.
Prompt 1: Allgemeine Anomalie-Erkennung
PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.
Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of the anomaly",
"severity": "low|medium|high|critical",
"log_lines": ["the relevant log lines"],
"explanation": "what this means and potential impact"
}}
],
"summary": "one sentence summary of overall log health"
}}
If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""
Prompt 2: Sicherheitsereignis-Erkennung
PROMPT_SECURITY = """You are a security analyst reviewing server logs. Identify security-relevant events.
Focus on:
- Brute-force attempts (repeated failed logins from same IP)
- Successful logins from unusual IPs or at unusual times
- Privilege escalation attempts (sudo failures, su attempts)
- Port scanning patterns
- Unauthorized access attempts to files or services
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of security event",
"severity": "low|medium|high|critical",
"source_ips": ["IP addresses involved"],
"log_lines": ["the relevant log lines"],
"recommendation": "suggested response action"
}}
],
"summary": "one sentence security posture assessment"
}}
If no security events are found, return {{"findings": [], "summary": "No security events detected."}}.
"""
Prompt 3: Performance-Problem-Erkennung
PROMPT_PERFORMANCE = """You are a performance engineer reviewing server logs. Identify performance-related issues.
Focus on:
- High response times or timeouts
- Resource exhaustion (OOM kills, disk full, connection limits)
- Service restarts or crashes
- Queue backlogs or processing delays
- Error rate spikes
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of performance issue",
"severity": "low|medium|high|critical",
"affected_service": "service name if identifiable",
"log_lines": ["the relevant log lines"],
"explanation": "what this means for system performance"
}}
],
"summary": "one sentence performance assessment"
}}
If no performance issues are found, return {{"findings": [], "summary": "No performance issues detected."}}.
"""
Das JSON-Schema direkt im Prompt einzubetten ist beabsichtigt. Es gibt dem Modell zwei Signale: der format-Parameter erzwingt valides JSON, und das Schema im Prompt leitet die Struktur. Diese Kombination liefert zuverlässige Ausgaben auch bei kleinen Modellen.
Wie erhalten Sie strukturierte JSON-Ausgabe von Ollama?
Ollama unterstützt strukturierte Ausgabe über den format-Parameter in seiner API. Übergeben Sie ein JSON-Schema, und das Modell generiert nur Tokens, die dem Schema entsprechen. Kombiniert mit einem Pydantic-Modell auf der Python-Seite erhalten Sie bei jedem Inferenzaufruf validierte, typisierte Daten zurück.
Definieren Sie die Pydantic-Modelle:
from pydantic import BaseModel
class Finding(BaseModel):
title: str
severity: str # low, medium, high, critical
log_lines: list[str]
explanation: str = ""
recommendation: str = ""
source_ips: list[str] = []
affected_service: str = ""
class AnalysisResult(BaseModel):
findings: list[Finding]
summary: str
Rufen Sie Ollama mit erzwungenem Schema auf:
from ollama import chat
def analyze_logs(
logs: list[str],
model: str = "gemma2:9b",
prompt_template: str = PROMPT_GENERAL,
) -> AnalysisResult:
"""Send logs to Ollama and get structured analysis back."""
if not logs:
return AnalysisResult(findings=[], summary="No logs to analyze.")
# Truncate to avoid context window issues
log_block = "\n".join(logs[:200])
prompt = prompt_template.format(logs=log_block)
response = chat(
model=model,
messages=[{"role": "user", "content": prompt}],
format=AnalysisResult.model_json_schema(),
options={"temperature": 0.1},
)
return AnalysisResult.model_validate_json(response.message.content)
Wichtige Details:
format=AnalysisResult.model_json_schema()weist Ollama an, das JSON-Schema auf Token-Generierungsebene zu erzwingen. Das Modell kann keine Ausgabe erzeugen, die das Schema verletzt.temperature: 0.1hält die Ausgabe deterministisch. Log-Klassifizierung sollte nicht kreativ sein.- Die Begrenzung auf 200 Zeilen verhindert Überläufe des Kontextfensters. Gemma 2 9B hat ein Kontextfenster von 8192 Tokens. 200 Log-Zeilen mit ~20 Tokens pro Zeile nutzen etwa die Hälfte des Kontexts.
model_validate_json()parst den String in ein typisiertes Pydantic-Objekt. Wenn das Parsen fehlschlägt (selten bei Schema-Erzwingung), wird einValidationErrorausgelöst, den Sie abfangen können.
Testen Sie die Funktion in der Python-Shell:
/opt/log-analyzer/venv/bin/python3 -c "
from ollama import chat
import json
response = chat(
model='gemma2:9b',
messages=[{'role': 'user', 'content': 'Analyze this log: Failed password for root from 203.0.113.5 port 44322 ssh2'}],
format={
'type': 'object',
'properties': {
'findings': {'type': 'array', 'items': {'type': 'object'}},
'summary': {'type': 'string'}
},
'required': ['findings', 'summary']
},
options={'temperature': 0.1},
)
print(json.dumps(json.loads(response.message.content), indent=2))
"
Sie sollten ein sauberes JSON-Objekt mit findings- und summary-Schlüsseln sehen. Keine Markdown-Blöcke, keine Einleitung, nur JSON.
Wie senden Sie Anomalie-Alarme an Discord und Slack?
Senden Sie einen POST-Request mit einem JSON-Payload an eine Webhook-URL. Discord verwendet ein embeds-Array mit farbcodierten Feldern. Slack verwendet Block Kit mit blocks- und text-Feldern. Beide akzeptieren einen einzelnen HTTPS-POST.
Discord-Webhook
Erstellen Sie einen Webhook in Ihrem Discord-Server: Servereinstellungen > Integrationen > Webhooks > Neuer Webhook. Kopieren Sie die URL.
Speichern Sie die Webhook-URL sicher:
cat <<'EOF' > /opt/log-analyzer/.env
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_ID/YOUR_TOKEN
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
LOKI_URL=http://localhost:3100
OLLAMA_MODEL=gemma2:9b
EOF
chmod 600 /opt/log-analyzer/.env
Die Alert-Sender-Funktion:
import os
import requests
# Severity to Discord embed color (decimal)
SEVERITY_COLORS = {
"critical": 15158332, # red
"high": 15105570, # orange
"medium": 16776960, # yellow
"low": 3447003, # blue
}
def send_discord_alert(webhook_url: str, result: AnalysisResult) -> None:
"""Send findings to Discord as an embed."""
if not result.findings:
return
for finding in result.findings:
embed = {
"title": f"[{finding.severity.upper()}] {finding.title}",
"color": SEVERITY_COLORS.get(finding.severity, 3447003),
"fields": [
{
"name": "Explanation",
"value": finding.explanation or finding.recommendation or "N/A",
"inline": False,
},
{
"name": "Sample log lines",
"value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
"inline": False,
},
],
}
if finding.source_ips:
embed["fields"].append({
"name": "Source IPs",
"value": ", ".join(finding.source_ips),
"inline": True,
})
payload = {"embeds": [embed]}
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
Testen Sie den Discord-Webhook mit curl:
curl -s -X POST "$DISCORD_WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d '{
"embeds": [{
"title": "[HIGH] Test Alert - SSH Brute Force",
"color": 15105570,
"fields": [
{"name": "Explanation", "value": "Multiple failed SSH login attempts from 203.0.113.5", "inline": false},
{"name": "Sample log lines", "value": "```\nFailed password for root from 203.0.113.5\n```", "inline": false}
]
}]
}'
Prüfen Sie Ihren Discord-Kanal. Sie sollten eine farbcodierte Embed-Nachricht sehen.
Slack-Webhook
Erstellen Sie eine Slack-App unter api.slack.com/apps, aktivieren Sie Incoming Webhooks und kopieren Sie die Webhook-URL.
def send_slack_alert(webhook_url: str, result: AnalysisResult) -> None:
"""Send findings to Slack using Block Kit."""
if not result.findings:
return
for finding in result.findings:
severity_emoji = {
"critical": ":rotating_light:",
"high": ":warning:",
"medium": ":large_yellow_circle:",
"low": ":information_source:",
}
emoji = severity_emoji.get(finding.severity, ":grey_question:")
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": finding.explanation or finding.recommendation or "N/A",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "```" + "\n".join(finding.log_lines[:5]) + "```",
},
},
]
payload = {
"text": f"[{finding.severity.upper()}] {finding.title}",
"blocks": blocks,
}
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
Testen Sie den Slack-Webhook:
curl -s -X POST "$SLACK_WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d '{"text": "[HIGH] Test Alert - SSH Brute Force", "blocks": [{"type": "header", "text": {"type": "plain_text", "text": ":warning: [HIGH] Test Alert"}}]}'
Wie vermeiden Sie doppelte Alarme?
Ohne Deduplizierung löst derselbe SSH-Brute-Force-Angriff von derselben IP alle 5 Minuten stundenlang einen Alarm aus. Verwenden Sie einen dateibasierten Cache, der einen Hash des Titels und der Quelle jedes Findings speichert. Der Alarm wird übersprungen, wenn derselbe Hash in der letzten Stunde aufgetreten ist.
import hashlib
import json
import time
from pathlib import Path
DEDUP_FILE = Path("/opt/log-analyzer/dedup_cache.json")
DEDUP_WINDOW = 3600 # seconds (1 hour)
def load_dedup_cache() -> dict:
if DEDUP_FILE.exists():
try:
return json.loads(DEDUP_FILE.read_text())
except (json.JSONDecodeError, OSError):
return {}
return {}
def save_dedup_cache(cache: dict) -> None:
# Prune expired entries
now = time.time()
cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
DEDUP_FILE.write_text(json.dumps(cache))
def is_duplicate(finding: Finding) -> bool:
"""Check if this finding was already alerted recently."""
cache = load_dedup_cache()
now = time.time()
# Hash on title + sorted source IPs + severity
key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
key = hashlib.sha256(key_material.encode()).hexdigest()[:16]
if key in cache and now - cache[key] < DEDUP_WINDOW:
return True
cache[key] = now
save_dedup_cache(cache)
return False
Der Deduplizierungs-Cache ist eine JSON-Datei mit kurzen Hash-Schlüsseln, die auf Timestamps verweisen. Alte Einträge werden bei jedem Speichern bereinigt. Das 1-Stunden-Fenster ist ein guter Standard: lang genug, um wiederholte Alarme zu unterdrücken, kurz genug, um erneut zu alarmieren, wenn dasselbe Problem nach einer Pause wieder auftritt.
Setzen Sie die richtigen Berechtigungen für die Cache-Datei:
touch /opt/log-analyzer/dedup_cache.json
chmod 600 /opt/log-analyzer/dedup_cache.json
Das vollständige Skript
Die komplette log_analyzer.py fügt alles zusammen:
#!/usr/bin/env python3
"""AI Log Analyzer - Query Loki, classify with Ollama, alert to Discord/Slack."""
import hashlib
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests as http_requests
from ollama import chat
from pydantic import BaseModel
# --- Configuration ---
LOKI_URL = os.environ.get("LOKI_URL", "http://localhost:3100")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "gemma2:9b")
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL", "")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
LOG_QUERIES = os.environ.get(
"LOG_QUERIES",
'{job="syslog"};{job="nginx"} |= "error"',
).split(";")
QUERY_WINDOW_MINUTES = int(os.environ.get("QUERY_WINDOW_MINUTES", "5"))
QUERY_LIMIT = int(os.environ.get("QUERY_LIMIT", "500"))
DEDUP_FILE = Path(os.environ.get("DEDUP_FILE", "/opt/log-analyzer/dedup_cache.json"))
DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW", "3600"))
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("log-analyzer")
# --- Models ---
class Finding(BaseModel):
title: str
severity: str
log_lines: list[str]
explanation: str = ""
recommendation: str = ""
source_ips: list[str] = []
affected_service: str = ""
class AnalysisResult(BaseModel):
findings: list[Finding]
summary: str
# --- Prompts ---
PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.
Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of the anomaly",
"severity": "low|medium|high|critical",
"log_lines": ["the relevant log lines"],
"explanation": "what this means and potential impact"
}}
],
"summary": "one sentence summary of overall log health"
}}
If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""
# --- Loki ---
def query_loki(logql: str) -> list[str]:
"""Query Loki for log lines from the last N minutes."""
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=QUERY_WINDOW_MINUTES)
params = {
"query": logql,
"start": str(int(start.timestamp() * 1e9)),
"end": str(int(now.timestamp() * 1e9)),
"limit": QUERY_LIMIT,
}
resp = http_requests.get(
f"{LOKI_URL}/loki/api/v1/query_range",
params=params,
timeout=10,
)
resp.raise_for_status()
data = resp.json()
lines = []
for stream in data.get("data", {}).get("result", []):
for _ts, line in stream.get("values", []):
lines.append(line)
return lines
# --- Ollama ---
def analyze_logs(logs: list[str]) -> AnalysisResult:
"""Send logs to Ollama and get structured analysis back."""
if not logs:
return AnalysisResult(findings=[], summary="No logs to analyze.")
log_block = "\n".join(logs[:200])
prompt = PROMPT_GENERAL.format(logs=log_block)
response = chat(
model=OLLAMA_MODEL,
messages=[{"role": "user", "content": prompt}],
format=AnalysisResult.model_json_schema(),
options={"temperature": 0.1},
)
return AnalysisResult.model_validate_json(response.message.content)
# --- Deduplication ---
def load_dedup_cache() -> dict:
if DEDUP_FILE.exists():
try:
return json.loads(DEDUP_FILE.read_text())
except (json.JSONDecodeError, OSError):
return {}
return {}
def save_dedup_cache(cache: dict) -> None:
now = time.time()
cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
DEDUP_FILE.write_text(json.dumps(cache))
def is_duplicate(finding: Finding) -> bool:
cache = load_dedup_cache()
now = time.time()
key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
key = hashlib.sha256(key_material.encode()).hexdigest()[:16]
if key in cache and now - cache[key] < DEDUP_WINDOW:
return True
cache[key] = now
save_dedup_cache(cache)
return False
# --- Alerting ---
SEVERITY_COLORS = {
"critical": 15158332,
"high": 15105570,
"medium": 16776960,
"low": 3447003,
}
def send_discord_alert(finding: Finding) -> None:
if not DISCORD_WEBHOOK_URL:
return
embed = {
"title": f"[{finding.severity.upper()}] {finding.title}",
"color": SEVERITY_COLORS.get(finding.severity, 3447003),
"fields": [
{
"name": "Explanation",
"value": finding.explanation or finding.recommendation or "N/A",
"inline": False,
},
{
"name": "Sample log lines",
"value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
"inline": False,
},
],
}
if finding.source_ips:
embed["fields"].append({
"name": "Source IPs",
"value": ", ".join(finding.source_ips),
"inline": True,
})
resp = http_requests.post(
DISCORD_WEBHOOK_URL, json={"embeds": [embed]}, timeout=10
)
resp.raise_for_status()
def send_slack_alert(finding: Finding) -> None:
if not SLACK_WEBHOOK_URL:
return
severity_emoji = {
"critical": ":rotating_light:",
"high": ":warning:",
"medium": ":large_yellow_circle:",
"low": ":information_source:",
}
emoji = severity_emoji.get(finding.severity, ":grey_question:")
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": finding.explanation or finding.recommendation or "N/A",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "```" + "\n".join(finding.log_lines[:5]) + "```",
},
},
]
resp = http_requests.post(
SLACK_WEBHOOK_URL,
json={
"text": f"[{finding.severity.upper()}] {finding.title}",
"blocks": blocks,
},
timeout=10,
)
resp.raise_for_status()
def send_alerts(finding: Finding) -> None:
send_discord_alert(finding)
send_slack_alert(finding)
# --- Main ---
def main() -> int:
log.info("Starting log analysis run")
all_lines = []
for logql in LOG_QUERIES:
logql = logql.strip()
if not logql:
continue
try:
lines = query_loki(logql)
log.info("Query '%s' returned %d lines", logql, len(lines))
all_lines.extend(lines)
except Exception as e:
log.error("Loki query failed for '%s': %s", logql, e)
if not all_lines:
log.info("No log lines to analyze")
return 0
log.info("Analyzing %d total log lines with %s", len(all_lines), OLLAMA_MODEL)
try:
result = analyze_logs(all_lines)
except Exception as e:
log.error("Ollama analysis failed: %s", e)
return 1
log.info("Analysis complete: %d findings. %s", len(result.findings), result.summary)
alerted = 0
for finding in result.findings:
if is_duplicate(finding):
log.info("Skipping duplicate: %s", finding.title)
continue
try:
send_alerts(finding)
alerted += 1
log.info("Alerted: [%s] %s", finding.severity, finding.title)
except Exception as e:
log.error("Alert failed for '%s': %s", finding.title, e)
log.info("Run complete. %d new alerts sent.", alerted)
return 0
if __name__ == "__main__":
sys.exit(main())
Setzen Sie die Berechtigungen:
chmod 750 /opt/log-analyzer/log_analyzer.py
chown root:root /opt/log-analyzer/log_analyzer.py
ls -la /opt/log-analyzer/
Prüfen Sie, dass die Ausgabe rwxr-x--- für das Skript und rw------- für die .env-Datei zeigt.
Führen Sie einen manuellen Test durch:
cd /opt/log-analyzer
set -a && source .env && set +a
/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
Prüfen Sie die Ausgabe. Sie sollten abgerufene Log-Zeilen, Analyseergebnisse und gesendete Alarme sehen (oder übersprungene, wenn keine Anomalien vorliegen).
Wie führen Sie die KI-Log-Analyse automatisch mit einem systemd-Timer aus?
Erstellen Sie ein systemd-Service-und-Timer-Paar. Der Service führt das Python-Skript mit Umgebungsvariablen aus der .env-Datei aus. Der Timer löst ihn alle 5 Minuten aus. Wenn das Skript fehlschlägt, protokolliert systemd den Fehler und der nächste Lauf wird normal ausgeführt.
Erstellen Sie die Service-Unit:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service
[Unit]
Description=AI Log Analyzer - Ollama anomaly detection
After=network-online.target ollama.service loki.service
Wants=network-online.target
[Service]
Type=oneshot
EnvironmentFile=/opt/log-analyzer/.env
ExecStart=/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
WorkingDirectory=/opt/log-analyzer
User=root
StandardOutput=journal
StandardError=journal
TimeoutStartSec=120
EOF
Der TimeoutStartSec=120 gibt dem LLM bis zu 2 Minuten für die Inferenz. Auf einem 8-GB-VPS mit 100-200 Log-Zeilen dauert die Inferenz typischerweise 15-25 Sekunden. Das 2-Minuten-Timeout deckt Fälle ab, in denen Ollama das Modell erst von der Festplatte laden muss.
Der Service läuft als User=root der Einfachheit halber. In einer Produktionsumgebung erstellen Sie einen dedizierten log-analyzer-Benutzer, gewähren ihm Lesezugriff auf die .env-Datei und aktualisieren die User=-Direktive. Das Skript benötigt nur HTTP-Zugriff auf Loki und Ollama auf localhost und braucht daher keine erhöhten Berechtigungen.
Erstellen Sie den Timer:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.timer
[Unit]
Description=Run AI Log Analyzer every 5 minutes
[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
AccuracySec=30s
[Install]
WantedBy=timers.target
EOF
Aktivieren und starten Sie den Timer:
sudo systemctl daemon-reload
sudo systemctl enable --now log-analyzer.timer
Das enable --now lässt den Timer Neustarts überleben und startet ihn sofort.
Prüfen Sie, ob der Timer aktiv ist:
systemctl status log-analyzer.timer
Sie sollten active (waiting) und die nächste Auslösezeit sehen. Prüfen Sie, wann er zuletzt gelaufen ist:
systemctl list-timers log-analyzer.timer
Nach der ersten Auslösung prüfen Sie die Service-Logs:
journalctl -u log-analyzer.service -n 30 --no-pager
Achten Sie auf die Meldungen Starting log analysis run und Run complete. Wenn Sie Ollama analysis failed sehen, ist das Modell möglicherweise nicht heruntergeladen oder Ollama läuft nicht.
Fehlerbenachrichtigungen
Wenn der Analysator fehlschlägt, möchten Sie das wissen. Fügen Sie einen OnFailure-Handler zur Service-Unit hinzu:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer-failure@.service
[Unit]
Description=Log Analyzer failure notification for %i
[Service]
Type=oneshot
ExecStart=/usr/bin/curl -s -X POST ${DISCORD_WEBHOOK_URL} \
-H "Content-Type: application/json" \
-d '{"content": ":x: **Log Analyzer Failed**\nUnit: %i\nTime: %H\nCheck: journalctl -u %i"}'
EnvironmentFile=/opt/log-analyzer/.env
EOF
Fügen Sie die OnFailure-Direktive zum Haupt-Service hinzu:
sudo mkdir -p /etc/systemd/system/log-analyzer.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service.d/failure.conf
[Unit]
OnFailure=log-analyzer-failure@%n.service
EOF
sudo systemctl daemon-reload
Wenn das Python-Skript mit einem Nicht-Null-Code beendet wird, sendet systemd eine Benachrichtigung an Ihren Discord-Kanal.
Was sind die Grenzen der LLM-basierten Log-Analyse?
LLM-Log-Analyse ergänzt regelbasiertes Alerting, sie ersetzt es nicht. Sie erkennt Muster, die sich als statische Regeln nur schwer formulieren lassen. Aber sie hat echte Schwächen, die Sie kennen sollten.
Halluzinationen. Das Modell kann normale Log-Zeilen als Anomalien markieren oder Erklärungen für harmlose Ereignisse erfinden. Findings mit niedriger Schwere sollten als Hinweise behandelt werden, nicht als Fakten. Prüfen Sie Alarme hoher Schwere immer manuell.
Kontextfenster-Grenzen. Gemma 2 9B hat ein Kontextfenster von 8192 Tokens. Bei ~20 Tokens pro Log-Zeile sind das etwa 400 Zeilen maximal (mit Platz für Prompt und Ausgabe). Das Skript kürzt auf 200 Zeilen als Sicherheitsmarge. Wenn Ihr Server in 5 Minuten mehr als 200 Zeilen erzeugt, müssen Sie entweder mit spezifischeren LogQL-Abfragen filtern oder akzeptieren, dass einige Zeilen übersprungen werden.
Kein Lernen über die Zeit. Das Modell hat kein Gedächtnis zwischen den Läufen. Es kann nicht lernen, dass ein bestimmtes Log-Muster in Ihrer Umgebung normal ist. Jeder Batch wird von Grund auf analysiert. Wenn Sie eine wiederkehrende Log-Meldung haben, die harmlos ist aber verdächtig aussieht, fügen Sie sie einem LogQL-Ausschlussfilter hinzu: {job="syslog"} != "expected noisy message".
Inferenzlatenz. Auf einem 4-vCPU-/8-GB-VPS dauert die Inferenz 12-22 Sekunden pro Batch. Das ist für einen 5-Minuten-Timer in Ordnung, aber zu langsam für Echtzeit-Alerting. Für zeitkritische Ereignisse (Festplatte voll, OOM) behalten Sie traditionelle Prometheus-Alarme Einen selbstheilenden VPS mit Prometheus und Ollama aufbauen.
False Negatives. Kleine Modelle übersehen subtile Muster. Ein langsames Speicherleck, das über Tage leicht erhöhte Swap-Nutzung erzeugt, taucht in einem 5-Minuten-Log-Fenster nicht auf. Verwenden Sie Prometheus-Metriken und Grafana-Alarme für trendbasierte Erkennung.
Betriebskosten. Auch ohne Token-basierte API-Kosten belegt das Modell ~5,8 GB RAM im geladenen Zustand. Auf einem 8-GB-VPS ist das der Großteil Ihres Speichers. Wenn Ihr Anwendungsserver diesen RAM benötigt, führen Sie Ollama auf einem separaten VPS aus oder verwenden Sie das kleinere Modell gemma2:2b (1,6 GB RAM, geringere Genauigkeit).
Wann Sie dies vs. traditionelles Alerting verwenden
| Anwendungsfall | LLM-Log-Analyse | Traditionelles Alerting (Prometheus) |
|---|---|---|
| „Etwas stimmt nicht, aber ich kann keine Regel dafür schreiben" | Ja | Nein |
| SSH-Brute-Force-Erkennung | Ja (gut bei Mustererkennung) | Ja (fail2ban ist schneller) |
| Festplatte voll / OOM | Nein (zu langsam) | Ja |
| Unbekannte Fehlermuster | Ja | Nein |
| Metrik-Schwellenwert-Überschreitung | Nein | Ja |
| Änderung des Log-Formats | Ja (passt sich automatisch an) | Nein (Regeln brechen) |
Das beste Setup betreibt beides. Prometheus behandelt die bekannten Fehlermodi mit schnellen Alarmen. Das LLM fängt die unbekannten Unbekannten auf, indem es den tatsächlichen Log-Text liest.
Fehlerbehebung
Ollama gibt „model not found" zurück: Führen Sie ollama list aus, um verfügbare Modelle zu prüfen. Laden Sie das Modell mit ollama pull gemma2:9b herunter.
Loki-Abfrage liefert leere Ergebnisse: Prüfen Sie, ob Promtail läuft (systemctl status promtail) und ob das Job-Label in Ihrer LogQL-Abfrage mit der Promtail-Konfiguration übereinstimmt. Testen Sie mit curl direkt gegen die Loki-API.
Nicht genügend Arbeitsspeicher: Prüfen Sie den RAM mit free -h. Wenn das Ollama-Modell zu viel verbraucht, setzen Sie OLLAMA_KEEP_ALIVE=1m im Ollama-Service-Override. Wechseln Sie zu gemma2:2b für geringeren RAM-Verbrauch.
Discord-/Slack-Alarme kommen nicht an: Testen Sie die Webhook-URL mit den curl-Befehlen aus dem Alerting-Abschnitt. Prüfen Sie die HTTP-Fehler in den Analysator-Logs: journalctl -u log-analyzer.service -n 50.
Langsame Inferenz: Prüfen Sie, ob Ihr VPS die erwarteten CPU-Kerne hat, mit nproc. Ollama verwendet alle verfügbaren Kerne für die Inferenz. Wenn ein anderer Prozess CPU verbraucht, verlangsamt sich die Inferenz. Prüfen Sie mit top während eines Laufs.
JSON-Parsing-Fehler: Wenn model_validate_json fehlschlägt, hat das Modell trotz Schema-Erzwingung ungültiges JSON erzeugt. Das ist selten, tritt aber bei bestimmten Grenzfällen auf. Das Skript protokolliert den Fehler und setzt beim nächsten Lauf fort. Wenn es wiederholt auftritt, versuchen Sie ein anderes Modell.
Prüfen Sie die Analysator-Logs:
journalctl -u log-analyzer.service -f
Für Ollama-spezifische Probleme:
journalctl -u ollama.service -f
Für den nächsten Schritt in der AIOps-Pipeline erfahren Sie, wie Sie eine automatisierte Behebung aufbauen, die auf diese Alarme reagiert Einen selbstheilenden VPS mit Prometheus und Ollama aufbauen. Für alternative Observability-Ansätze siehe den SigNoz- und OpenObserve-Leitfaden.
Bereit, es selbst auszuprobieren?
Stellen Sie Ihren eigenen Server in Sekunden bereit. Linux, Windows oder FreeBSD. →