AI-loganalyse met Ollama op een VPS: anomalieën detecteren met een lokaal LLM

13 min leestijd·Matthieu·slackdiscordaiopslog-analysispythonlokiollama|

Bouw een productieklare AI-loganalyse-pipeline op je VPS. Bevraag Loki voor logs, classificeer anomalieën met een lokaal LLM via Ollama en stuur waarschuwingen naar Discord of Slack met een Python-script en systemd-timer.

Je Loki-pipeline verzamelt logs. Grafana-dashboards laten je erdoorheen zoeken. Maar niemand kijkt naar die dashboards om 3 uur 's nachts als een SSH brute-force-aanval begint of een schijf volloopt. Je hebt iets nodig dat je logs continu leest en je waarschuwt als er iets misgaat.

Deze tutorial bouwt dat systeem. Een Python-script bevraagt Loki elke 5 minuten, stuurt logbatches naar een lokaal LLM dat draait op Ollama, krijgt gestructureerde JSON-classificaties terug en stuurt anomaliewaarschuwingen naar Discord of Slack. Geen cloud-AI-API's, geen facturering per token, en je loggegevens verlaten nooit je server.

Aan het eind heb je een werkende systemd-timer die onbeheerd draait, waarschuwingen dedupliceert en fouten netjes afhandelt.

Vereisten:

  • Een VPS met minimaal 8 GB RAM (4 vCPU aanbevolen). Ollama en Loki draaien naast elkaar.
  • Een werkende Loki + Promtail log-pipeline
  • Python 3.10+ geïnstalleerd
  • Basiskennis van systemd en Python

Dit artikel maakt deel uit van de AIOps-serie.

Hoe installeer je Ollama op een VPS voor loganalyse?

Ollama draait LLM's lokaal met een enkel binary en biedt een HTTP-API aan op poort 11434. Installeer het met het officiële script, download een model en controleer of de API reageert. Het hele proces duurt minder dan 5 minuten met een goede verbinding.

De installer heeft zstd nodig voor extractie. Installeer het eerst:

apt-get update && apt-get install -y zstd

Download en voer het installatiescript uit:

curl -fsSL https://ollama.com/install.sh -o ollama-install.sh
sha256sum ollama-install.sh

Bekijk het script voordat je het uitvoert. Lees de shellcode door en vergelijk de checksum met een bekende kopie als je die hebt. Je kunt ook de broncode op GitHub bekijken. Voer het dan uit:

sh ollama-install.sh

De installer maakt een systemd-service aan genaamd ollama. Controleer of deze draait:

systemctl status ollama

Je zou active (running) in de output moeten zien. Let op de regel Loaded: loaded (/etc/systemd/system/ollama.service; enabled; preset: enabled). De installer heeft het al ingesteld om te starten bij het booten.

Controleer of de API luistert:

curl -s http://localhost:11434/api/tags | python3 -m json.tool

Dit retourneert een JSON-object met een models-array. Deze is leeg totdat je een model downloadt.

Ollama alleen aan localhost binden

Standaard luistert Ollama op 127.0.0.1:11434. Bevestig dit:

ss -tlnp | grep 11434

Als de output 0.0.0.0:11434 toont, is Ollama blootgesteld aan het internet. Corrigeer dit door de omgevingsvariabele in de systemd-unit in te stellen:

sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/override.conf
[Service]
Environment="OLLAMA_HOST=127.0.0.1:11434"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
ss -tlnp | grep 11434

Bevestig dat de output nu 127.0.0.1:11434 toont. Een LLM-API blootstellen aan het internet laat iedereen inferentie draaien op je server.

Welk LLM-model werkt het best voor anomaliedetectie in serverlogs?

Voor loganalyse op een 8 GB VPS heb je een model nodig dat in het geheugen past naast Loki en Promtail. Twee modellen werken goed: Gemma 2 9B voor algemene logclassificatie en Llama 3.1 8B voor beveiligingsgerichte analyse. Beide draaien in Q4-kwantisatie met ongeveer 5-6 GB RAM.

Download beide modellen:

ollama pull gemma2:9b
ollama pull llama3.1:8b

Elke download is ongeveer 5-6 GB. Na het downloaden, verifieer:

ollama list

Test een snelle inferentie om te bevestigen dat het model laadt:

curl -s http://localhost:11434/api/generate \
  -d '{"model": "gemma2:9b", "prompt": "Classify this log line: Failed password for root from 203.0.113.5 port 22", "stream": false}' \
  | python3 -m json.tool

Let op eval_duration in het antwoord. Dit is de inferentietijd in nanoseconden. Deel door 1.000.000 voor milliseconden.

Modelvergelijking op een 4 vCPU / 8 GB VPS

De volgende cijfers zijn gemeten op een Virtua Cloud VCS-8 (4 vCPU Ryzen, 8 GB RAM, NVMe) bij het verwerken van een batch van 100 syslog-regels met het model al geladen in het geheugen:

Metriek Gemma 2 9B (Q4_K_M) Llama 3.1 8B (Q4_K_M)
Modelgrootte op schijf 5,4 GB 4,9 GB
RAM-gebruik (geladen) ~5,8 GB ~5,2 GB
Tijd per 100-regelbatch ~12-18 s ~14-22 s
Tokens/s ~18-25 ~15-20
Nauwkeurigheid beveiligingslogs Goed Beter
Algemene anomaliedetectie Beter Goed

Koude starts zijn langzamer. De eerste inferentie nadat Ollama het model van schijf laadt, voegt 5-10 seconden toe. Opvolgende aanroepen binnen het keep-alive-venster draaien op de hierboven genoemde snelheden.

Aanbeveling: begin met gemma2:9b voor algemene loganalyse. Schakel over naar llama3.1:8b als je voornamelijk auth-/beveiligingslogs analyseert.

RAM-budget bij 8 GB

Component RAM-gebruik
OS + systeemprocessen ~400 MB
Loki ~300-500 MB
Promtail ~50 MB
Ollama (inactief, geen model geladen) ~30 MB
Ollama (gemma2:9b geladen) ~5,8 GB
Python-script ~50 MB
Totaal ~6,7-6,9 GB

Dit past in 8 GB met ~1 GB marge. Ollama ontlaadt modellen automatisch na 5 minuten inactiviteit (configureerbaar met OLLAMA_KEEP_ALIVE), waardoor het RAM vrijkomt. De systemd-timer triggert elke 5 minuten, dus het model blijft geladen tijdens actieve analysevensters en wordt ontladen ertussen.

Als het geheugen krap is, gebruik gemma2:9b met OLLAMA_KEEP_ALIVE=1m zodat het model sneller ontlaadt na elke batch.

Hoe bevraag je Loki-logs vanuit een Python-script?

Bevraag Loki's HTTP-API op /loki/api/v1/query_range met een LogQL-expressie en een tijdvenster. De API retourneert JSON met logstreams. Gebruik Python's requests-bibliotheek om de laatste 5 minuten logs op te halen voor een gegeven job-label.

Stel eerst het project op:

mkdir -p /opt/log-analyzer
cd /opt/log-analyzer

Maak het requirements-bestand:

cat <<'EOF' > /opt/log-analyzer/requirements.txt
requests>=2.31.0
pydantic>=2.5.0
ollama>=0.4.0
EOF

Installeer de afhankelijkheden in een virtuele omgeving. Op Ubuntu 24.04 heb je eerst het python3-venv-pakket nodig:

apt-get install -y python3.12-venv
python3 -m venv /opt/log-analyzer/venv
/opt/log-analyzer/venv/bin/pip install -r /opt/log-analyzer/requirements.txt

Controleer de installatie:

/opt/log-analyzer/venv/bin/python -c "import requests, pydantic, ollama; print('OK')"

Loki-queryfunctie

De volgende functie bevraagt Loki voor recente logs:

import requests
from datetime import datetime, timedelta, timezone


def query_loki(
    loki_url: str,
    logql: str,
    minutes: int = 5,
    limit: int = 500,
) -> list[str]:
    """Query Loki for log lines from the last N minutes."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=minutes)

    params = {
        "query": logql,
        "start": str(int(start.timestamp() * 1e9)),  # nanosecond epoch
        "end": str(int(now.timestamp() * 1e9)),
        "limit": limit,
    }

    resp = requests.get(
        f"{loki_url}/loki/api/v1/query_range",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()

    data = resp.json()
    lines = []
    for stream in data.get("data", {}).get("result", []):
        for _ts, line in stream.get("values", []):
            lines.append(line)

    return lines

De start- en end-parameters gebruiken Unix-timestamps in nanoseconden. Loki's antwoord nestelt logregels in data.result[].values[], waarbij elke waarde een [timestamp, line]-paar is.

Voorbeeld LogQL-queries die je gaat gebruiken:

# All syslog entries
SYSLOG_QUERY = '{job="syslog"}'

# Nginx error logs
NGINX_QUERY = '{job="nginx"} |= "error"'

# SSH authentication events
AUTH_QUERY = '{job="syslog"} |~ "(sshd|pam_unix)"'

Test de query tegen je draaiende Loki-instantie:

curl -s 'http://localhost:3100/loki/api/v1/query_range' \
  --data-urlencode 'query={job="syslog"}' \
  --data-urlencode "start=$(date -d '5 minutes ago' +%s)000000000" \
  --data-urlencode "end=$(date +%s)000000000" \
  --data-urlencode 'limit=10' \
  | python3 -m json.tool | head -30

Je zou logregels moeten zien in de result-array. Als de array leeg is, controleer of Promtail logs naar Loki stuurt en of het job-label overeenkomt met je Promtail-configuratie.

Hoe schrijf je prompts die logvermeldingen als anomalieën classificeren?

De prompt is het brein van dit systeem. Een goede prompt vertelt het LLM precies wat het moet zoeken, definieert classificatiecategorieën en eist gestructureerde output. Slechte prompts produceren vage samenvattingen. Goede prompts produceren bruikbare JSON.

Drie prompt-templates dekken de meeste behoeften voor serverloganalyse: algemene anomaliedetectie, beveiligingseventdetectie en prestatieprobleemdetectie. Elke prompt bevat het classificatieschema inline zodat het model het verwachte outputformaat kent.

Prompt 1: Algemene anomaliedetectie

PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.

Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of the anomaly",
      "severity": "low|medium|high|critical",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means and potential impact"
    }}
  ],
  "summary": "one sentence summary of overall log health"
}}

If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""

Prompt 2: Beveiligingseventdetectie

PROMPT_SECURITY = """You are a security analyst reviewing server logs. Identify security-relevant events.

Focus on:
- Brute-force attempts (repeated failed logins from same IP)
- Successful logins from unusual IPs or at unusual times
- Privilege escalation attempts (sudo failures, su attempts)
- Port scanning patterns
- Unauthorized access attempts to files or services

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of security event",
      "severity": "low|medium|high|critical",
      "source_ips": ["IP addresses involved"],
      "log_lines": ["the relevant log lines"],
      "recommendation": "suggested response action"
    }}
  ],
  "summary": "one sentence security posture assessment"
}}

If no security events are found, return {{"findings": [], "summary": "No security events detected."}}.
"""

Prompt 3: Prestatieprobleemdetectie

PROMPT_PERFORMANCE = """You are a performance engineer reviewing server logs. Identify performance-related issues.

Focus on:
- High response times or timeouts
- Resource exhaustion (OOM kills, disk full, connection limits)
- Service restarts or crashes
- Queue backlogs or processing delays
- Error rate spikes

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of performance issue",
      "severity": "low|medium|high|critical",
      "affected_service": "service name if identifiable",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means for system performance"
    }}
  ],
  "summary": "one sentence performance assessment"
}}

If no performance issues are found, return {{"findings": [], "summary": "No performance issues detected."}}.
"""

Het JSON-schema direct in de prompt opnemen is opzettelijk. Het geeft het model twee signalen: de format-parameter dwingt valide JSON af, en het schema in de prompt stuurt de structuur. Deze combinatie produceert betrouwbare output met kleine modellen.

Hoe krijg je gestructureerde JSON-output van Ollama?

Ollama ondersteunt gestructureerde output via de format-parameter in zijn API. Geef een JSON-schema mee en het model genereert alleen tokens die eraan voldoen. Gecombineerd met een Pydantic-model aan de Python-kant krijg je gevalideerde, getypeerde data terug bij elke inferentie-aanroep.

Definieer de Pydantic-modellen:

from pydantic import BaseModel


class Finding(BaseModel):
    title: str
    severity: str  # low, medium, high, critical
    log_lines: list[str]
    explanation: str = ""
    recommendation: str = ""
    source_ips: list[str] = []
    affected_service: str = ""


class AnalysisResult(BaseModel):
    findings: list[Finding]
    summary: str

Roep Ollama aan met het afgedwongen schema:

from ollama import chat


def analyze_logs(
    logs: list[str],
    model: str = "gemma2:9b",
    prompt_template: str = PROMPT_GENERAL,
) -> AnalysisResult:
    """Send logs to Ollama and get structured analysis back."""
    if not logs:
        return AnalysisResult(findings=[], summary="No logs to analyze.")

    # Truncate to avoid context window issues
    log_block = "\n".join(logs[:200])
    prompt = prompt_template.format(logs=log_block)

    response = chat(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        format=AnalysisResult.model_json_schema(),
        options={"temperature": 0.1},
    )

    return AnalysisResult.model_validate_json(response.message.content)

Belangrijke details:

  • format=AnalysisResult.model_json_schema() vertelt Ollama het JSON-schema af te dwingen op tokengeneratieniveau. Het model kan geen output produceren die het schema schendt.
  • temperature: 0.1 houdt de output deterministisch. Logclassificatie moet niet creatief zijn.
  • Afkappen op 200 regels voorkomt contextvenster-overflows. Gemma 2 9B heeft een contextvenster van 8192 tokens. 200 logregels van ~20 tokens per stuk gebruiken ongeveer de helft van de context.
  • model_validate_json() parst de string naar een getypeerd Pydantic-object. Als het parsen mislukt (zeldzaam bij schema-afdwinging), wordt een ValidationError gegooid die je kunt opvangen.

Test de functie vanuit de Python-shell:

/opt/log-analyzer/venv/bin/python3 -c "
from ollama import chat
import json

response = chat(
    model='gemma2:9b',
    messages=[{'role': 'user', 'content': 'Analyze this log: Failed password for root from 203.0.113.5 port 44322 ssh2'}],
    format={
        'type': 'object',
        'properties': {
            'findings': {'type': 'array', 'items': {'type': 'object'}},
            'summary': {'type': 'string'}
        },
        'required': ['findings', 'summary']
    },
    options={'temperature': 0.1},
)
print(json.dumps(json.loads(response.message.content), indent=2))
"

Je zou een schoon JSON-object moeten zien met findings- en summary-sleutels. Geen markdown-blokken, geen inleiding, alleen JSON.

Hoe stuur je anomaliewaarschuwingen naar Discord en Slack?

Stuur een POST-verzoek met een JSON-payload naar een webhook-URL. Discord gebruikt een embeds-array met kleurgecodeerde velden. Slack gebruikt Block Kit met blocks- en text-velden. Beide accepteren een enkel HTTPS POST-verzoek.

Discord-webhook

Maak een webhook in je Discord-server: Serverinstellingen > Integraties > Webhooks > Nieuwe webhook. Kopieer de URL.

Sla de webhook-URL veilig op:

cat <<'EOF' > /opt/log-analyzer/.env
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_ID/YOUR_TOKEN
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
LOKI_URL=http://localhost:3100
OLLAMA_MODEL=gemma2:9b
EOF
chmod 600 /opt/log-analyzer/.env

De waarschuwingsverzenderfunctie:

import os
import requests

# Severity to Discord embed color (decimal)
SEVERITY_COLORS = {
    "critical": 15158332,  # red
    "high": 15105570,      # orange
    "medium": 16776960,    # yellow
    "low": 3447003,        # blue
}


def send_discord_alert(webhook_url: str, result: AnalysisResult) -> None:
    """Send findings to Discord as an embed."""
    if not result.findings:
        return

    for finding in result.findings:
        embed = {
            "title": f"[{finding.severity.upper()}] {finding.title}",
            "color": SEVERITY_COLORS.get(finding.severity, 3447003),
            "fields": [
                {
                    "name": "Explanation",
                    "value": finding.explanation or finding.recommendation or "N/A",
                    "inline": False,
                },
                {
                    "name": "Sample log lines",
                    "value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
                    "inline": False,
                },
            ],
        }
        if finding.source_ips:
            embed["fields"].append({
                "name": "Source IPs",
                "value": ", ".join(finding.source_ips),
                "inline": True,
            })

        payload = {"embeds": [embed]}
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()

Test de Discord-webhook met curl:

curl -s -X POST "$DISCORD_WEBHOOK_URL" \
  -H "Content-Type: application/json" \
  -d '{
    "embeds": [{
      "title": "[HIGH] Test Alert - SSH Brute Force",
      "color": 15105570,
      "fields": [
        {"name": "Explanation", "value": "Multiple failed SSH login attempts from 203.0.113.5", "inline": false},
        {"name": "Sample log lines", "value": "```\nFailed password for root from 203.0.113.5\n```", "inline": false}
      ]
    }]
  }'

Controleer je Discord-kanaal. Je zou een kleurgecodeerd embed-bericht moeten zien.

Slack-webhook

Maak een Slack-app op api.slack.com/apps, schakel Incoming Webhooks in en kopieer de webhook-URL.

def send_slack_alert(webhook_url: str, result: AnalysisResult) -> None:
    """Send findings to Slack using Block Kit."""
    if not result.findings:
        return

    for finding in result.findings:
        severity_emoji = {
            "critical": ":rotating_light:",
            "high": ":warning:",
            "medium": ":large_yellow_circle:",
            "low": ":information_source:",
        }
        emoji = severity_emoji.get(finding.severity, ":grey_question:")

        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": finding.explanation or finding.recommendation or "N/A",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "```" + "\n".join(finding.log_lines[:5]) + "```",
                },
            },
        ]

        payload = {
            "text": f"[{finding.severity.upper()}] {finding.title}",
            "blocks": blocks,
        }
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()

Test de Slack-webhook:

curl -s -X POST "$SLACK_WEBHOOK_URL" \
  -H "Content-Type: application/json" \
  -d '{"text": "[HIGH] Test Alert - SSH Brute Force", "blocks": [{"type": "header", "text": {"type": "plain_text", "text": ":warning: [HIGH] Test Alert"}}]}'

Hoe voorkom je het verzenden van dubbele waarschuwingen?

Zonder deduplicatie triggert dezelfde SSH brute-force van hetzelfde IP elke 5 minuten een waarschuwing, urenlang. Gebruik een bestandsgebaseerde cache die een hash opslaat van de titel en bron van elke finding. De waarschuwing wordt overgeslagen als dezelfde hash in het afgelopen uur is verschenen.

import hashlib
import json
import time
from pathlib import Path

DEDUP_FILE = Path("/opt/log-analyzer/dedup_cache.json")
DEDUP_WINDOW = 3600  # seconds (1 hour)


def load_dedup_cache() -> dict:
    if DEDUP_FILE.exists():
        try:
            return json.loads(DEDUP_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}


def save_dedup_cache(cache: dict) -> None:
    # Prune expired entries
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
    DEDUP_FILE.write_text(json.dumps(cache))


def is_duplicate(finding: Finding) -> bool:
    """Check if this finding was already alerted recently."""
    cache = load_dedup_cache()
    now = time.time()

    # Hash on title + sorted source IPs + severity
    key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
    key = hashlib.sha256(key_material.encode()).hexdigest()[:16]

    if key in cache and now - cache[key] < DEDUP_WINDOW:
        return True

    cache[key] = now
    save_dedup_cache(cache)
    return False

De deduplicatiecache is een JSON-bestand met korte hash-sleutels gekoppeld aan timestamps. Oude entries worden bij elke opslag opgeruimd. Het 1-uur-venster is een goede standaard: lang genoeg om herhaalde waarschuwingen te onderdrukken, kort genoeg om opnieuw te waarschuwen als hetzelfde probleem na een pauze terugkeert.

Stel de juiste permissies in op het cachebestand:

touch /opt/log-analyzer/dedup_cache.json
chmod 600 /opt/log-analyzer/dedup_cache.json

Het volledige script

Het complete log_analyzer.py brengt alles samen:

#!/usr/bin/env python3
"""AI Log Analyzer - Query Loki, classify with Ollama, alert to Discord/Slack."""

import hashlib
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests as http_requests
from ollama import chat
from pydantic import BaseModel

# --- Configuration ---

LOKI_URL = os.environ.get("LOKI_URL", "http://localhost:3100")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "gemma2:9b")
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL", "")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
LOG_QUERIES = os.environ.get(
    "LOG_QUERIES",
    '{job="syslog"};{job="nginx"} |= "error"',
).split(";")
QUERY_WINDOW_MINUTES = int(os.environ.get("QUERY_WINDOW_MINUTES", "5"))
QUERY_LIMIT = int(os.environ.get("QUERY_LIMIT", "500"))
DEDUP_FILE = Path(os.environ.get("DEDUP_FILE", "/opt/log-analyzer/dedup_cache.json"))
DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW", "3600"))

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("log-analyzer")


# --- Models ---

class Finding(BaseModel):
    title: str
    severity: str
    log_lines: list[str]
    explanation: str = ""
    recommendation: str = ""
    source_ips: list[str] = []
    affected_service: str = ""


class AnalysisResult(BaseModel):
    findings: list[Finding]
    summary: str


# --- Prompts ---

PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.

Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of the anomaly",
      "severity": "low|medium|high|critical",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means and potential impact"
    }}
  ],
  "summary": "one sentence summary of overall log health"
}}

If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""


# --- Loki ---

def query_loki(logql: str) -> list[str]:
    """Query Loki for log lines from the last N minutes."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=QUERY_WINDOW_MINUTES)

    params = {
        "query": logql,
        "start": str(int(start.timestamp() * 1e9)),
        "end": str(int(now.timestamp() * 1e9)),
        "limit": QUERY_LIMIT,
    }

    resp = http_requests.get(
        f"{LOKI_URL}/loki/api/v1/query_range",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()

    data = resp.json()
    lines = []
    for stream in data.get("data", {}).get("result", []):
        for _ts, line in stream.get("values", []):
            lines.append(line)

    return lines


# --- Ollama ---

def analyze_logs(logs: list[str]) -> AnalysisResult:
    """Send logs to Ollama and get structured analysis back."""
    if not logs:
        return AnalysisResult(findings=[], summary="No logs to analyze.")

    log_block = "\n".join(logs[:200])
    prompt = PROMPT_GENERAL.format(logs=log_block)

    response = chat(
        model=OLLAMA_MODEL,
        messages=[{"role": "user", "content": prompt}],
        format=AnalysisResult.model_json_schema(),
        options={"temperature": 0.1},
    )

    return AnalysisResult.model_validate_json(response.message.content)


# --- Deduplication ---

def load_dedup_cache() -> dict:
    if DEDUP_FILE.exists():
        try:
            return json.loads(DEDUP_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}


def save_dedup_cache(cache: dict) -> None:
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
    DEDUP_FILE.write_text(json.dumps(cache))


def is_duplicate(finding: Finding) -> bool:
    cache = load_dedup_cache()
    now = time.time()
    key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
    key = hashlib.sha256(key_material.encode()).hexdigest()[:16]

    if key in cache and now - cache[key] < DEDUP_WINDOW:
        return True

    cache[key] = now
    save_dedup_cache(cache)
    return False


# --- Alerting ---

SEVERITY_COLORS = {
    "critical": 15158332,
    "high": 15105570,
    "medium": 16776960,
    "low": 3447003,
}


def send_discord_alert(finding: Finding) -> None:
    if not DISCORD_WEBHOOK_URL:
        return

    embed = {
        "title": f"[{finding.severity.upper()}] {finding.title}",
        "color": SEVERITY_COLORS.get(finding.severity, 3447003),
        "fields": [
            {
                "name": "Explanation",
                "value": finding.explanation or finding.recommendation or "N/A",
                "inline": False,
            },
            {
                "name": "Sample log lines",
                "value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
                "inline": False,
            },
        ],
    }
    if finding.source_ips:
        embed["fields"].append({
            "name": "Source IPs",
            "value": ", ".join(finding.source_ips),
            "inline": True,
        })

    resp = http_requests.post(
        DISCORD_WEBHOOK_URL, json={"embeds": [embed]}, timeout=10
    )
    resp.raise_for_status()


def send_slack_alert(finding: Finding) -> None:
    if not SLACK_WEBHOOK_URL:
        return

    severity_emoji = {
        "critical": ":rotating_light:",
        "high": ":warning:",
        "medium": ":large_yellow_circle:",
        "low": ":information_source:",
    }
    emoji = severity_emoji.get(finding.severity, ":grey_question:")

    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": finding.explanation or finding.recommendation or "N/A",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "```" + "\n".join(finding.log_lines[:5]) + "```",
            },
        },
    ]

    resp = http_requests.post(
        SLACK_WEBHOOK_URL,
        json={
            "text": f"[{finding.severity.upper()}] {finding.title}",
            "blocks": blocks,
        },
        timeout=10,
    )
    resp.raise_for_status()


def send_alerts(finding: Finding) -> None:
    send_discord_alert(finding)
    send_slack_alert(finding)


# --- Main ---

def main() -> int:
    log.info("Starting log analysis run")
    all_lines = []

    for logql in LOG_QUERIES:
        logql = logql.strip()
        if not logql:
            continue
        try:
            lines = query_loki(logql)
            log.info("Query '%s' returned %d lines", logql, len(lines))
            all_lines.extend(lines)
        except Exception as e:
            log.error("Loki query failed for '%s': %s", logql, e)

    if not all_lines:
        log.info("No log lines to analyze")
        return 0

    log.info("Analyzing %d total log lines with %s", len(all_lines), OLLAMA_MODEL)

    try:
        result = analyze_logs(all_lines)
    except Exception as e:
        log.error("Ollama analysis failed: %s", e)
        return 1

    log.info("Analysis complete: %d findings. %s", len(result.findings), result.summary)

    alerted = 0
    for finding in result.findings:
        if is_duplicate(finding):
            log.info("Skipping duplicate: %s", finding.title)
            continue
        try:
            send_alerts(finding)
            alerted += 1
            log.info("Alerted: [%s] %s", finding.severity, finding.title)
        except Exception as e:
            log.error("Alert failed for '%s': %s", finding.title, e)

    log.info("Run complete. %d new alerts sent.", alerted)
    return 0


if __name__ == "__main__":
    sys.exit(main())

Stel de permissies in:

chmod 750 /opt/log-analyzer/log_analyzer.py
chown root:root /opt/log-analyzer/log_analyzer.py
ls -la /opt/log-analyzer/

Controleer dat de output rwxr-x--- toont voor het script en rw------- voor het .env-bestand.

Voer een handmatige test uit:

cd /opt/log-analyzer
set -a && source .env && set +a
/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py

Controleer de output. Je zou opgehaalde logregels, analyseresultaten en verzonden waarschuwingen moeten zien (of overgeslagen als er geen anomalieën zijn).

Hoe voer je AI-loganalyse automatisch uit met een systemd-timer?

Maak een systemd service-en-timer-paar. De service voert het Python-script uit met omgevingsvariabelen uit het .env-bestand. De timer triggert het elke 5 minuten. Als het script faalt, logt systemd de fout en de volgende run gaat normaal door.

Maak de service-unit:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service
[Unit]
Description=AI Log Analyzer - Ollama anomaly detection
After=network-online.target ollama.service loki.service
Wants=network-online.target

[Service]
Type=oneshot
EnvironmentFile=/opt/log-analyzer/.env
ExecStart=/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
WorkingDirectory=/opt/log-analyzer
User=root
StandardOutput=journal
StandardError=journal
TimeoutStartSec=120
EOF

De TimeoutStartSec=120 geeft het LLM tot 2 minuten voor de inferentie. Op een 8 GB VPS met 100-200 logregels duurt inferentie doorgaans 15-25 seconden. De 2-minuten-timeout dekt gevallen waarin Ollama het model eerst van schijf moet laden.

De service draait als User=root voor de eenvoud. In productie maak je een speciale log-analyzer-gebruiker aan, geef je deze leestoegang tot het .env-bestand en werk je de User=-directive bij. Het script heeft alleen HTTP-toegang nodig tot Loki en Ollama op localhost en heeft daarom geen verhoogde rechten nodig.

Maak de timer:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.timer
[Unit]
Description=Run AI Log Analyzer every 5 minutes

[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
AccuracySec=30s

[Install]
WantedBy=timers.target
EOF

Activeer en start de timer:

sudo systemctl daemon-reload
sudo systemctl enable --now log-analyzer.timer

Het enable --now zorgt ervoor dat de timer herstart overleeft en onmiddellijk start.

Controleer of de timer actief is:

systemctl status log-analyzer.timer

Je zou active (waiting) en de volgende triggertijd moeten zien. Controleer wanneer het voor het laatst is gedraaid:

systemctl list-timers log-analyzer.timer

Na de eerste trigger, controleer de service-logs:

journalctl -u log-analyzer.service -n 30 --no-pager

Let op de berichten Starting log analysis run en Run complete. Als je Ollama analysis failed ziet, is het model mogelijk niet gedownload of draait Ollama niet.

Foutmeldingen

Als de analyzer faalt, wil je dat weten. Voeg een OnFailure-handler toe aan de service-unit:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer-failure@.service
[Unit]
Description=Log Analyzer failure notification for %i

[Service]
Type=oneshot
ExecStart=/usr/bin/curl -s -X POST ${DISCORD_WEBHOOK_URL} \
  -H "Content-Type: application/json" \
  -d '{"content": ":x: **Log Analyzer Failed**\nUnit: %i\nTime: %H\nCheck: journalctl -u %i"}'
EnvironmentFile=/opt/log-analyzer/.env
EOF

Voeg de OnFailure-directive toe aan de hoofdservice:

sudo mkdir -p /etc/systemd/system/log-analyzer.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service.d/failure.conf
[Unit]
OnFailure=log-analyzer-failure@%n.service
EOF
sudo systemctl daemon-reload

Als het Python-script eindigt met een niet-nul exitcode, stuurt systemd een melding naar je Discord-kanaal.

Wat zijn de beperkingen van LLM-gebaseerde loganalyse?

LLM-loganalyse is een aanvulling op regelgebaseerde alerting, geen vervanging. Het vangt patronen die moeilijk als statische regels te formuleren zijn. Maar het heeft echte zwaktes die je moet kennen.

Hallucinaties. Het model kan normale logregels als anomalieën markeren of verklaringen verzinnen voor onschuldige gebeurtenissen. Findings met lage ernst van het LLM moeten als suggesties worden behandeld, niet als feiten. Controleer waarschuwingen met hoge ernst altijd handmatig.

Contextvenster-limieten. Gemma 2 9B heeft een contextvenster van 8192 tokens. Bij ~20 tokens per logregel zijn dat maximaal ongeveer 400 regels (met ruimte voor de prompt en output). Het script kapt af op 200 regels voor de veiligheid. Als je server meer dan 200 regels genereert in 5 minuten, moet je filteren met specifiekere LogQL-queries of accepteren dat sommige regels worden overgeslagen.

Geen leren over tijd. Het model heeft geen geheugen tussen runs. Het kan niet leren dat een specifiek logpatroon normaal is voor jouw omgeving. Elke batch wordt van nul geanalyseerd. Als je een terugkerend logbericht hebt dat onschuldig is maar verdacht lijkt, voeg het toe aan een LogQL-uitsluitingsfilter: {job="syslog"} != "expected noisy message".

Inferentielatentie. Op een 4 vCPU / 8 GB VPS duurt inferentie 12-22 seconden per batch. Prima voor een 5-minuten-timer maar te traag voor real-time alerting. Voor tijdgevoelige gebeurtenissen (schijf vol, OOM), behoud traditionele Prometheus-alerts Een zelfherstellende VPS bouwen met Prometheus en Ollama.

Valse negatieven. Kleine modellen missen subtiele patronen. Een langzaam geheugenlek dat dagenlang licht verhoogd swap-gebruik produceert, verschijnt niet in een 5-minuten-logvenster. Gebruik Prometheus-metrics en Grafana-alerts voor trendgebaseerde detectie.

Kosten van draaien. Hoewel er geen per-token API-kosten zijn, gebruikt het model ~5,8 GB RAM wanneer geladen. Op een 8 GB VPS is dat het meeste van je geheugen. Als je applicatieserver dat RAM nodig heeft, draai Ollama op een aparte VPS of gebruik het kleinere gemma2:2b-model (1,6 GB RAM, lagere nauwkeurigheid).

Wanneer dit te gebruiken vs traditionele alerting

Gebruik LLM-loganalyse Traditionele alerting (Prometheus)
"Er lijkt iets mis maar ik kan er geen regel voor schrijven" Ja Nee
SSH brute-force-detectie Ja (goed in patroonherkenning) Ja (fail2ban is sneller)
Schijf vol / OOM Nee (te traag) Ja
Onbekende foutpatronen Ja Nee
Metriekdrempel-overschrijding Nee Ja
Logformaat wijziging Ja (past zich automatisch aan) Nee (regels breken)

De beste setup draait beide. Prometheus behandelt de bekende faalwijzen met snelle alerts. Het LLM vangt de onbekende onbekenden door de daadwerkelijke logtekst te lezen.

Probleemoplossing

Ollama retourneert "model not found": draai ollama list om beschikbare modellen te controleren. Download het model met ollama pull gemma2:9b.

Loki-query retourneert lege resultaten: controleer of Promtail draait (systemctl status promtail) en of het job-label in je LogQL-query overeenkomt met de Promtail-configuratie. Test met curl direct tegen de Loki-API.

Onvoldoende geheugen: controleer RAM met free -h. Als het Ollama-model te veel verbruikt, stel OLLAMA_KEEP_ALIVE=1m in in de Ollama-service-override. Schakel over naar gemma2:2b voor lager RAM-gebruik.

Discord-/Slack-waarschuwingen komen niet aan: test de webhook-URL met de curl-commando's uit de alerting-sectie. Controleer op HTTP-fouten in de analyzer-logs: journalctl -u log-analyzer.service -n 50.

Trage inferentie: controleer of je VPS de verwachte CPU-cores heeft met nproc. Ollama gebruikt alle beschikbare cores voor inferentie. Als een ander proces CPU verbruikt, vertraagt de inferentie. Controleer met top tijdens een run.

JSON-parseerfouten: als model_validate_json faalt, heeft het model ongeldige JSON geproduceerd ondanks schema-afdwinging. Dit is zeldzaam maar komt voor bij bepaalde randgevallen. Het script logt de fout en gaat door bij de volgende run. Als het herhaaldelijk voorkomt, probeer een ander model.

Controleer de analyzer-logs:

journalctl -u log-analyzer.service -f

Voor Ollama-specifieke problemen:

journalctl -u ollama.service -f

Voor de volgende stap in de AIOps-pipeline, zie hoe je geautomatiseerde remediatie bouwt die op deze waarschuwingen reageert Een zelfherstellende VPS bouwen met Prometheus en Ollama. Voor alternatieve observability-benaderingen, zie de SigNoz en OpenObserve gids.


Copyright 2026 Virtua.Cloud. Alle rechten voorbehouden. Deze inhoud is een origineel werk van het Virtua.Cloud-team. Reproductie, herpublicatie of herdistributie zonder schriftelijke toestemming is verboden.

Klaar om het zelf te proberen?

Deploy uw eigen server in seconden. Linux, Windows of FreeBSD.

Bekijk VPS-aanbod