Analisi dei log con IA tramite Ollama su un VPS: rilevare anomalie con un LLM locale

14 min di lettura·Matthieu·slackdiscordaiopslog-analysispythonlokiollama|

Costruisci una pipeline di analisi log con IA pronta per la produzione sul tuo VPS. Interroga Loki, classifica le anomalie con un LLM locale tramite Ollama e invia avvisi a Discord o Slack con uno script Python e un timer systemd.

La tua pipeline Loki raccoglie i log. Le dashboard di Grafana permettono di cercarli. Ma nessuno guarda quelle dashboard alle 3 di notte quando inizia un attacco brute-force SSH o un disco si riempie. Ti serve qualcosa che legga i tuoi log continuamente e ti avvisi quando qualcosa non va.

Questo tutorial costruisce quel sistema. Uno script Python interroga Loki ogni 5 minuti, invia batch di log a un LLM locale in esecuzione su Ollama, riceve classificazioni JSON strutturate e manda avvisi di anomalia a Discord o Slack. Nessuna API IA cloud, nessuna fatturazione per token, e i tuoi dati di log non lasciano mai il server.

Alla fine avrai un timer systemd funzionante che gira senza supervisione, deduplica gli avvisi e gestisce i fallimenti in modo pulito.

Prerequisiti:

  • Un VPS con almeno 8 GB di RAM (4 vCPU consigliati). Ollama e Loki funzioneranno fianco a fianco.
  • Una pipeline di log Loki + Promtail funzionante
  • Python 3.10+ installato
  • Familiarità di base con systemd e Python

Questo articolo fa parte della serie AIOps.

Come si installa Ollama su un VPS per l'analisi dei log?

Ollama esegue LLM localmente con un singolo binario ed espone un'API HTTP sulla porta 11434. Installalo con lo script ufficiale, scarica un modello e verifica che l'API risponda. L'intero processo richiede meno di 5 minuti con una connessione decente.

L'installer necessita di zstd per l'estrazione. Installalo prima:

apt-get update && apt-get install -y zstd

Scarica ed esegui lo script di installazione:

curl -fsSL https://ollama.com/install.sh -o ollama-install.sh
sha256sum ollama-install.sh

Esamina lo script prima di eseguirlo. Leggi il codice shell e confronta il checksum con una copia conosciuta se ne hai una. Puoi anche consultare il codice sorgente su GitHub. Poi eseguilo:

sh ollama-install.sh

L'installer crea un servizio systemd chiamato ollama. Verifica che sia in esecuzione:

systemctl status ollama

Dovresti vedere active (running) nell'output. Nota la riga Loaded: loaded (/etc/systemd/system/ollama.service; enabled; preset: enabled). L'installer lo ha già configurato per avviarsi al boot.

Verifica che l'API sia in ascolto:

curl -s http://localhost:11434/api/tags | python3 -m json.tool

Questo restituisce un oggetto JSON con un array models. Sarà vuoto finché non scarichi un modello.

Vincolare Ollama solo a localhost

Per impostazione predefinita, Ollama ascolta su 127.0.0.1:11434. Confermalo:

ss -tlnp | grep 11434

Se l'output mostra 0.0.0.0:11434, Ollama è esposto a Internet. Correggi impostando la variabile d'ambiente nell'unità systemd:

sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/override.conf
[Service]
Environment="OLLAMA_HOST=127.0.0.1:11434"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
ss -tlnp | grep 11434

Conferma che l'output ora mostra 127.0.0.1:11434. Esporre un'API LLM su Internet permette a chiunque di eseguire inferenza sul tuo server.

Quale modello LLM funziona meglio per il rilevamento di anomalie nei log del server?

Per l'analisi dei log su un VPS da 8 GB, serve un modello che stia in memoria insieme a Loki e Promtail. Due modelli funzionano bene: Gemma 2 9B per la classificazione generale dei log e Llama 3.1 8B per l'analisi orientata alla sicurezza. Entrambi girano in quantizzazione Q4 con circa 5-6 GB di RAM.

Scarica entrambi i modelli:

ollama pull gemma2:9b
ollama pull llama3.1:8b

Ogni download è di circa 5-6 GB. Dopo il download, verifica:

ollama list

Testa un'inferenza rapida per confermare che il modello si carica:

curl -s http://localhost:11434/api/generate \
  -d '{"model": "gemma2:9b", "prompt": "Classify this log line: Failed password for root from 203.0.113.5 port 22", "stream": false}' \
  | python3 -m json.tool

Osserva eval_duration nella risposta. È il tempo di inferenza in nanosecondi. Dividi per 1.000.000 per ottenere millisecondi.

Confronto modelli su un VPS 4 vCPU / 8 GB

I numeri seguenti sono stati misurati su un Virtua Cloud VCS-8 (4 vCPU Ryzen, 8 GB RAM, NVMe) elaborando un batch di 100 righe syslog con il modello già caricato in memoria:

Metrica Gemma 2 9B (Q4_K_M) Llama 3.1 8B (Q4_K_M)
Dimensione modello su disco 5,4 GB 4,9 GB
Uso RAM (caricato) ~5,8 GB ~5,2 GB
Tempo per batch da 100 righe ~12-18 s ~14-22 s
Token/s ~18-25 ~15-20
Precisione su log di sicurezza Buona Migliore
Rilevamento anomalie generali Migliore Buona

Gli avvii a freddo sono più lenti. La prima inferenza dopo che Ollama carica il modello dal disco aggiunge 5-10 secondi. Le chiamate successive nella finestra di keep-alive girano alle velocità indicate sopra.

Raccomandazione: inizia con gemma2:9b per l'analisi dei log generica. Passa a llama3.1:8b se analizzi principalmente log di autenticazione e sicurezza.

Budget RAM su 8 GB

Componente Uso RAM
OS + processi di sistema ~400 MB
Loki ~300-500 MB
Promtail ~50 MB
Ollama (inattivo, nessun modello caricato) ~30 MB
Ollama (gemma2:9b caricato) ~5,8 GB
Script Python ~50 MB
Totale ~6,7-6,9 GB

Ci sta in 8 GB con ~1 GB di margine. Ollama scarica automaticamente i modelli dopo 5 minuti di inattività (configurabile con OLLAMA_KEEP_ALIVE), liberando la RAM. Il timer systemd si attiva ogni 5 minuti, quindi il modello resta caricato durante le finestre di analisi attive e si scarica tra una e l'altra.

Se la memoria è scarsa, usa gemma2:9b con OLLAMA_KEEP_ALIVE=1m per far scaricare il modello più rapidamente dopo ogni batch.

Come si interrogano i log di Loki da uno script Python?

Interroga l'API HTTP di Loki a /loki/api/v1/query_range con un'espressione LogQL e una finestra temporale. L'API restituisce JSON con flussi di log. Usa la libreria requests di Python per recuperare gli ultimi 5 minuti di log per un'etichetta job data.

Prima, prepara il progetto:

mkdir -p /opt/log-analyzer
cd /opt/log-analyzer

Crea il file dei requisiti:

cat <<'EOF' > /opt/log-analyzer/requirements.txt
requests>=2.31.0
pydantic>=2.5.0
ollama>=0.4.0
EOF

Installa le dipendenze in un ambiente virtuale. Su Ubuntu 24.04 serve prima il pacchetto python3-venv:

apt-get install -y python3.12-venv
python3 -m venv /opt/log-analyzer/venv
/opt/log-analyzer/venv/bin/pip install -r /opt/log-analyzer/requirements.txt

Verifica l'installazione:

/opt/log-analyzer/venv/bin/python -c "import requests, pydantic, ollama; print('OK')"

Funzione di query Loki

La funzione seguente interroga Loki per i log recenti:

import requests
from datetime import datetime, timedelta, timezone


def query_loki(
    loki_url: str,
    logql: str,
    minutes: int = 5,
    limit: int = 500,
) -> list[str]:
    """Query Loki for log lines from the last N minutes."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=minutes)

    params = {
        "query": logql,
        "start": str(int(start.timestamp() * 1e9)),  # nanosecond epoch
        "end": str(int(now.timestamp() * 1e9)),
        "limit": limit,
    }

    resp = requests.get(
        f"{loki_url}/loki/api/v1/query_range",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()

    data = resp.json()
    lines = []
    for stream in data.get("data", {}).get("result", []):
        for _ts, line in stream.get("values", []):
            lines.append(line)

    return lines

I parametri start e end usano timestamp Unix in nanosecondi. La risposta di Loki annida le righe di log dentro data.result[].values[], dove ogni valore è una coppia [timestamp, line].

Esempi di query LogQL che userai:

# All syslog entries
SYSLOG_QUERY = '{job="syslog"}'

# Nginx error logs
NGINX_QUERY = '{job="nginx"} |= "error"'

# SSH authentication events
AUTH_QUERY = '{job="syslog"} |~ "(sshd|pam_unix)"'

Testa la query sulla tua istanza Loki in esecuzione:

curl -s 'http://localhost:3100/loki/api/v1/query_range' \
  --data-urlencode 'query={job="syslog"}' \
  --data-urlencode "start=$(date -d '5 minutes ago' +%s)000000000" \
  --data-urlencode "end=$(date +%s)000000000" \
  --data-urlencode 'limit=10' \
  | python3 -m json.tool | head -30

Dovresti vedere righe di log nell'array result. Se l'array è vuoto, verifica che Promtail stia inviando log a Loki e che l'etichetta job corrisponda alla tua configurazione Promtail.

Come si scrivono prompt che classificano le voci di log come anomalie?

Il prompt è il cervello di questo sistema. Un buon prompt dice al LLM cosa cercare, definisce le categorie di classificazione e richiede output strutturato. I prompt scarsi producono riassunti vaghi. I buoni prompt producono JSON utilizzabile.

Tre template di prompt coprono la maggior parte delle esigenze di analisi dei log del server: rilevamento anomalie generali, rilevamento eventi di sicurezza e rilevamento problemi di prestazioni. Ogni prompt include lo schema di classificazione inline perché il modello conosca il formato di output atteso.

Prompt 1: Rilevamento anomalie generali

PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.

Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of the anomaly",
      "severity": "low|medium|high|critical",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means and potential impact"
    }}
  ],
  "summary": "one sentence summary of overall log health"
}}

If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""

Prompt 2: Rilevamento eventi di sicurezza

PROMPT_SECURITY = """You are a security analyst reviewing server logs. Identify security-relevant events.

Focus on:
- Brute-force attempts (repeated failed logins from same IP)
- Successful logins from unusual IPs or at unusual times
- Privilege escalation attempts (sudo failures, su attempts)
- Port scanning patterns
- Unauthorized access attempts to files or services

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of security event",
      "severity": "low|medium|high|critical",
      "source_ips": ["IP addresses involved"],
      "log_lines": ["the relevant log lines"],
      "recommendation": "suggested response action"
    }}
  ],
  "summary": "one sentence security posture assessment"
}}

If no security events are found, return {{"findings": [], "summary": "No security events detected."}}.
"""

Prompt 3: Rilevamento problemi di prestazioni

PROMPT_PERFORMANCE = """You are a performance engineer reviewing server logs. Identify performance-related issues.

Focus on:
- High response times or timeouts
- Resource exhaustion (OOM kills, disk full, connection limits)
- Service restarts or crashes
- Queue backlogs or processing delays
- Error rate spikes

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of performance issue",
      "severity": "low|medium|high|critical",
      "affected_service": "service name if identifiable",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means for system performance"
    }}
  ],
  "summary": "one sentence performance assessment"
}}

If no performance issues are found, return {{"findings": [], "summary": "No performance issues detected."}}.
"""

Incorporare lo schema JSON direttamente nel prompt è intenzionale. Dà al modello due segnali: il parametro format impone JSON valido, e lo schema nel prompt guida la struttura. Questa combinazione produce output affidabile anche con modelli piccoli.

Come si ottiene output JSON strutturato da Ollama?

Ollama supporta l'output strutturato tramite il parametro format nella sua API. Passa uno schema JSON e il modello genererà solo token conformi. Combinato con un modello Pydantic lato Python, ottieni dati validati e tipizzati da ogni chiamata di inferenza.

Definisci i modelli Pydantic:

from pydantic import BaseModel


class Finding(BaseModel):
    title: str
    severity: str  # low, medium, high, critical
    log_lines: list[str]
    explanation: str = ""
    recommendation: str = ""
    source_ips: list[str] = []
    affected_service: str = ""


class AnalysisResult(BaseModel):
    findings: list[Finding]
    summary: str

Chiama Ollama con lo schema imposto:

from ollama import chat


def analyze_logs(
    logs: list[str],
    model: str = "gemma2:9b",
    prompt_template: str = PROMPT_GENERAL,
) -> AnalysisResult:
    """Send logs to Ollama and get structured analysis back."""
    if not logs:
        return AnalysisResult(findings=[], summary="No logs to analyze.")

    # Truncate to avoid context window issues
    log_block = "\n".join(logs[:200])
    prompt = prompt_template.format(logs=log_block)

    response = chat(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        format=AnalysisResult.model_json_schema(),
        options={"temperature": 0.1},
    )

    return AnalysisResult.model_validate_json(response.message.content)

Dettagli chiave:

  • format=AnalysisResult.model_json_schema() dice a Ollama di imporre lo schema JSON a livello di generazione dei token. Il modello non può produrre output che violi lo schema.
  • temperature: 0.1 mantiene l'output deterministico. La classificazione dei log non dovrebbe essere creativa.
  • Il troncamento a 200 righe previene gli overflow della finestra di contesto. Gemma 2 9B ha una finestra di contesto di 8192 token. 200 righe di log a ~20 token ciascuna usano circa metà del contesto.
  • model_validate_json() effettua il parsing della stringa in un oggetto Pydantic tipizzato. Se il parsing fallisce (raro con l'imposizione dello schema), viene lanciato un ValidationError che puoi intercettare.

Testa la funzione dalla shell Python:

/opt/log-analyzer/venv/bin/python3 -c "
from ollama import chat
import json

response = chat(
    model='gemma2:9b',
    messages=[{'role': 'user', 'content': 'Analyze this log: Failed password for root from 203.0.113.5 port 44322 ssh2'}],
    format={
        'type': 'object',
        'properties': {
            'findings': {'type': 'array', 'items': {'type': 'object'}},
            'summary': {'type': 'string'}
        },
        'required': ['findings', 'summary']
    },
    options={'temperature': 0.1},
)
print(json.dumps(json.loads(response.message.content), indent=2))
"

Dovresti vedere un oggetto JSON pulito con le chiavi findings e summary. Nessun blocco markdown, nessun preambolo, solo JSON.

Come si inviano avvisi di anomalia a Discord e Slack?

Invia una richiesta POST con un payload JSON a un URL webhook. Discord usa un array embeds con campi con codice colore. Slack usa Block Kit con blocks e campi text. Entrambi accettano un singolo POST HTTPS.

Webhook Discord

Crea un webhook nel tuo server Discord: Impostazioni server > Integrazioni > Webhook > Nuovo webhook. Copia l'URL.

Salva l'URL del webhook in modo sicuro:

cat <<'EOF' > /opt/log-analyzer/.env
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_ID/YOUR_TOKEN
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
LOKI_URL=http://localhost:3100
OLLAMA_MODEL=gemma2:9b
EOF
chmod 600 /opt/log-analyzer/.env

La funzione di invio avvisi:

import os
import requests

# Severity to Discord embed color (decimal)
SEVERITY_COLORS = {
    "critical": 15158332,  # red
    "high": 15105570,      # orange
    "medium": 16776960,    # yellow
    "low": 3447003,        # blue
}


def send_discord_alert(webhook_url: str, result: AnalysisResult) -> None:
    """Send findings to Discord as an embed."""
    if not result.findings:
        return

    for finding in result.findings:
        embed = {
            "title": f"[{finding.severity.upper()}] {finding.title}",
            "color": SEVERITY_COLORS.get(finding.severity, 3447003),
            "fields": [
                {
                    "name": "Explanation",
                    "value": finding.explanation or finding.recommendation or "N/A",
                    "inline": False,
                },
                {
                    "name": "Sample log lines",
                    "value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
                    "inline": False,
                },
            ],
        }
        if finding.source_ips:
            embed["fields"].append({
                "name": "Source IPs",
                "value": ", ".join(finding.source_ips),
                "inline": True,
            })

        payload = {"embeds": [embed]}
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()

Testa il webhook Discord con curl:

curl -s -X POST "$DISCORD_WEBHOOK_URL" \
  -H "Content-Type: application/json" \
  -d '{
    "embeds": [{
      "title": "[HIGH] Test Alert - SSH Brute Force",
      "color": 15105570,
      "fields": [
        {"name": "Explanation", "value": "Multiple failed SSH login attempts from 203.0.113.5", "inline": false},
        {"name": "Sample log lines", "value": "```\nFailed password for root from 203.0.113.5\n```", "inline": false}
      ]
    }]
  }'

Controlla il tuo canale Discord. Dovresti vedere un messaggio embed con codice colore.

Webhook Slack

Crea un'app Slack su api.slack.com/apps, abilita gli Incoming Webhooks e copia l'URL del webhook.

def send_slack_alert(webhook_url: str, result: AnalysisResult) -> None:
    """Send findings to Slack using Block Kit."""
    if not result.findings:
        return

    for finding in result.findings:
        severity_emoji = {
            "critical": ":rotating_light:",
            "high": ":warning:",
            "medium": ":large_yellow_circle:",
            "low": ":information_source:",
        }
        emoji = severity_emoji.get(finding.severity, ":grey_question:")

        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": finding.explanation or finding.recommendation or "N/A",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "```" + "\n".join(finding.log_lines[:5]) + "```",
                },
            },
        ]

        payload = {
            "text": f"[{finding.severity.upper()}] {finding.title}",
            "blocks": blocks,
        }
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()

Testa il webhook Slack:

curl -s -X POST "$SLACK_WEBHOOK_URL" \
  -H "Content-Type: application/json" \
  -d '{"text": "[HIGH] Test Alert - SSH Brute Force", "blocks": [{"type": "header", "text": {"type": "plain_text", "text": ":warning: [HIGH] Test Alert"}}]}'

Come si evita l'invio di avvisi duplicati?

Senza deduplicazione, lo stesso brute-force SSH dallo stesso IP genera un avviso ogni 5 minuti per ore. Usa una cache basata su file che memorizza un hash del titolo e della fonte di ogni finding. L'avviso viene saltato se lo stesso hash è apparso nell'ultima ora.

import hashlib
import json
import time
from pathlib import Path

DEDUP_FILE = Path("/opt/log-analyzer/dedup_cache.json")
DEDUP_WINDOW = 3600  # seconds (1 hour)


def load_dedup_cache() -> dict:
    if DEDUP_FILE.exists():
        try:
            return json.loads(DEDUP_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}


def save_dedup_cache(cache: dict) -> None:
    # Prune expired entries
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
    DEDUP_FILE.write_text(json.dumps(cache))


def is_duplicate(finding: Finding) -> bool:
    """Check if this finding was already alerted recently."""
    cache = load_dedup_cache()
    now = time.time()

    # Hash on title + sorted source IPs + severity
    key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
    key = hashlib.sha256(key_material.encode()).hexdigest()[:16]

    if key in cache and now - cache[key] < DEDUP_WINDOW:
        return True

    cache[key] = now
    save_dedup_cache(cache)
    return False

La cache di deduplicazione è un file JSON con chiavi hash corte mappate a timestamp. Le voci vecchie vengono eliminate ad ogni salvataggio. La finestra di un'ora è un buon valore predefinito: abbastanza lunga per sopprimere avvisi ripetuti, abbastanza corta per avvisare di nuovo se lo stesso problema riappare dopo una pausa.

Imposta i permessi corretti sul file cache:

touch /opt/log-analyzer/dedup_cache.json
chmod 600 /opt/log-analyzer/dedup_cache.json

Lo script completo

Il file log_analyzer.py completo assembla il tutto:

#!/usr/bin/env python3
"""AI Log Analyzer - Query Loki, classify with Ollama, alert to Discord/Slack."""

import hashlib
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests as http_requests
from ollama import chat
from pydantic import BaseModel

# --- Configuration ---

LOKI_URL = os.environ.get("LOKI_URL", "http://localhost:3100")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "gemma2:9b")
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL", "")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
LOG_QUERIES = os.environ.get(
    "LOG_QUERIES",
    '{job="syslog"};{job="nginx"} |= "error"',
).split(";")
QUERY_WINDOW_MINUTES = int(os.environ.get("QUERY_WINDOW_MINUTES", "5"))
QUERY_LIMIT = int(os.environ.get("QUERY_LIMIT", "500"))
DEDUP_FILE = Path(os.environ.get("DEDUP_FILE", "/opt/log-analyzer/dedup_cache.json"))
DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW", "3600"))

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("log-analyzer")


# --- Models ---

class Finding(BaseModel):
    title: str
    severity: str
    log_lines: list[str]
    explanation: str = ""
    recommendation: str = ""
    source_ips: list[str] = []
    affected_service: str = ""


class AnalysisResult(BaseModel):
    findings: list[Finding]
    summary: str


# --- Prompts ---

PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.

Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of the anomaly",
      "severity": "low|medium|high|critical",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means and potential impact"
    }}
  ],
  "summary": "one sentence summary of overall log health"
}}

If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""


# --- Loki ---

def query_loki(logql: str) -> list[str]:
    """Query Loki for log lines from the last N minutes."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=QUERY_WINDOW_MINUTES)

    params = {
        "query": logql,
        "start": str(int(start.timestamp() * 1e9)),
        "end": str(int(now.timestamp() * 1e9)),
        "limit": QUERY_LIMIT,
    }

    resp = http_requests.get(
        f"{LOKI_URL}/loki/api/v1/query_range",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()

    data = resp.json()
    lines = []
    for stream in data.get("data", {}).get("result", []):
        for _ts, line in stream.get("values", []):
            lines.append(line)

    return lines


# --- Ollama ---

def analyze_logs(logs: list[str]) -> AnalysisResult:
    """Send logs to Ollama and get structured analysis back."""
    if not logs:
        return AnalysisResult(findings=[], summary="No logs to analyze.")

    log_block = "\n".join(logs[:200])
    prompt = PROMPT_GENERAL.format(logs=log_block)

    response = chat(
        model=OLLAMA_MODEL,
        messages=[{"role": "user", "content": prompt}],
        format=AnalysisResult.model_json_schema(),
        options={"temperature": 0.1},
    )

    return AnalysisResult.model_validate_json(response.message.content)


# --- Deduplication ---

def load_dedup_cache() -> dict:
    if DEDUP_FILE.exists():
        try:
            return json.loads(DEDUP_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}


def save_dedup_cache(cache: dict) -> None:
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
    DEDUP_FILE.write_text(json.dumps(cache))


def is_duplicate(finding: Finding) -> bool:
    cache = load_dedup_cache()
    now = time.time()
    key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
    key = hashlib.sha256(key_material.encode()).hexdigest()[:16]

    if key in cache and now - cache[key] < DEDUP_WINDOW:
        return True

    cache[key] = now
    save_dedup_cache(cache)
    return False


# --- Alerting ---

SEVERITY_COLORS = {
    "critical": 15158332,
    "high": 15105570,
    "medium": 16776960,
    "low": 3447003,
}


def send_discord_alert(finding: Finding) -> None:
    if not DISCORD_WEBHOOK_URL:
        return

    embed = {
        "title": f"[{finding.severity.upper()}] {finding.title}",
        "color": SEVERITY_COLORS.get(finding.severity, 3447003),
        "fields": [
            {
                "name": "Explanation",
                "value": finding.explanation or finding.recommendation or "N/A",
                "inline": False,
            },
            {
                "name": "Sample log lines",
                "value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
                "inline": False,
            },
        ],
    }
    if finding.source_ips:
        embed["fields"].append({
            "name": "Source IPs",
            "value": ", ".join(finding.source_ips),
            "inline": True,
        })

    resp = http_requests.post(
        DISCORD_WEBHOOK_URL, json={"embeds": [embed]}, timeout=10
    )
    resp.raise_for_status()


def send_slack_alert(finding: Finding) -> None:
    if not SLACK_WEBHOOK_URL:
        return

    severity_emoji = {
        "critical": ":rotating_light:",
        "high": ":warning:",
        "medium": ":large_yellow_circle:",
        "low": ":information_source:",
    }
    emoji = severity_emoji.get(finding.severity, ":grey_question:")

    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": finding.explanation or finding.recommendation or "N/A",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "```" + "\n".join(finding.log_lines[:5]) + "```",
            },
        },
    ]

    resp = http_requests.post(
        SLACK_WEBHOOK_URL,
        json={
            "text": f"[{finding.severity.upper()}] {finding.title}",
            "blocks": blocks,
        },
        timeout=10,
    )
    resp.raise_for_status()


def send_alerts(finding: Finding) -> None:
    send_discord_alert(finding)
    send_slack_alert(finding)


# --- Main ---

def main() -> int:
    log.info("Starting log analysis run")
    all_lines = []

    for logql in LOG_QUERIES:
        logql = logql.strip()
        if not logql:
            continue
        try:
            lines = query_loki(logql)
            log.info("Query '%s' returned %d lines", logql, len(lines))
            all_lines.extend(lines)
        except Exception as e:
            log.error("Loki query failed for '%s': %s", logql, e)

    if not all_lines:
        log.info("No log lines to analyze")
        return 0

    log.info("Analyzing %d total log lines with %s", len(all_lines), OLLAMA_MODEL)

    try:
        result = analyze_logs(all_lines)
    except Exception as e:
        log.error("Ollama analysis failed: %s", e)
        return 1

    log.info("Analysis complete: %d findings. %s", len(result.findings), result.summary)

    alerted = 0
    for finding in result.findings:
        if is_duplicate(finding):
            log.info("Skipping duplicate: %s", finding.title)
            continue
        try:
            send_alerts(finding)
            alerted += 1
            log.info("Alerted: [%s] %s", finding.severity, finding.title)
        except Exception as e:
            log.error("Alert failed for '%s': %s", finding.title, e)

    log.info("Run complete. %d new alerts sent.", alerted)
    return 0


if __name__ == "__main__":
    sys.exit(main())

Imposta i permessi:

chmod 750 /opt/log-analyzer/log_analyzer.py
chown root:root /opt/log-analyzer/log_analyzer.py
ls -la /opt/log-analyzer/

Verifica che l'output mostri rwxr-x--- per lo script e rw------- per il file .env.

Esegui un test manuale:

cd /opt/log-analyzer
set -a && source .env && set +a
/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py

Controlla l'output. Dovresti vedere le righe di log recuperate, i risultati dell'analisi e gli avvisi inviati (o saltati se non ci sono anomalie).

Come si esegue l'analisi dei log IA automaticamente con un timer systemd?

Crea una coppia service + timer systemd. Il service esegue lo script Python con le variabili d'ambiente dal file .env. Il timer lo attiva ogni 5 minuti. Se lo script fallisce, systemd registra il fallimento e l'esecuzione successiva procede normalmente.

Crea l'unità service:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service
[Unit]
Description=AI Log Analyzer - Ollama anomaly detection
After=network-online.target ollama.service loki.service
Wants=network-online.target

[Service]
Type=oneshot
EnvironmentFile=/opt/log-analyzer/.env
ExecStart=/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
WorkingDirectory=/opt/log-analyzer
User=root
StandardOutput=journal
StandardError=journal
TimeoutStartSec=120
EOF

Il TimeoutStartSec=120 dà al LLM fino a 2 minuti per completare l'inferenza. Su un VPS da 8 GB con 100-200 righe di log, l'inferenza finisce tipicamente in 15-25 secondi. Il timeout di 2 minuti copre i casi in cui Ollama deve caricare il modello dal disco.

Il service gira come User=root per semplicità. In produzione, crea un utente dedicato log-analyzer, concedigli accesso in lettura al file .env e aggiorna la direttiva User=. Lo script ha bisogno solo dell'accesso HTTP a Loki e Ollama su localhost, quindi non richiede privilegi elevati.

Crea il timer:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.timer
[Unit]
Description=Run AI Log Analyzer every 5 minutes

[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
AccuracySec=30s

[Install]
WantedBy=timers.target
EOF

Abilita e avvia il timer:

sudo systemctl daemon-reload
sudo systemctl enable --now log-analyzer.timer

Il enable --now fa sopravvivere il timer ai riavvii e lo avvia immediatamente.

Verifica che il timer sia attivo:

systemctl status log-analyzer.timer

Dovresti vedere active (waiting) e il prossimo orario di attivazione. Controlla quando è stato eseguito l'ultima volta:

systemctl list-timers log-analyzer.timer

Dopo la prima attivazione, controlla i log del service:

journalctl -u log-analyzer.service -n 30 --no-pager

Cerca i messaggi Starting log analysis run e Run complete. Se vedi Ollama analysis failed, il modello potrebbe non essere scaricato o Ollama potrebbe non essere in esecuzione.

Notifiche di fallimento

Se l'analizzatore fallisce, vuoi saperlo. Aggiungi un handler OnFailure all'unità service:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer-failure@.service
[Unit]
Description=Log Analyzer failure notification for %i

[Service]
Type=oneshot
ExecStart=/usr/bin/curl -s -X POST ${DISCORD_WEBHOOK_URL} \
  -H "Content-Type: application/json" \
  -d '{"content": ":x: **Log Analyzer Failed**\nUnit: %i\nTime: %H\nCheck: journalctl -u %i"}'
EnvironmentFile=/opt/log-analyzer/.env
EOF

Aggiungi la direttiva OnFailure al service principale:

sudo mkdir -p /etc/systemd/system/log-analyzer.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service.d/failure.conf
[Unit]
OnFailure=log-analyzer-failure@%n.service
EOF
sudo systemctl daemon-reload

Se lo script Python termina con un codice diverso da zero, systemd invia una notifica al tuo canale Discord.

Quali sono i limiti dell'analisi dei log basata su LLM?

L'analisi dei log con LLM è un complemento all'alerting basato su regole, non un sostituto. Cattura pattern difficili da esprimere come regole statiche. Ma ha debolezze reali che devi conoscere.

Allucinazioni. Il modello può segnalare righe di log normali come anomalie o inventare spiegazioni per eventi benigni. I finding a bassa severità del LLM vanno trattati come suggerimenti, non come fatti. Verifica sempre manualmente gli avvisi ad alta severità.

Limiti della finestra di contesto. Gemma 2 9B ha una finestra di contesto di 8192 token. A ~20 token per riga di log, sono circa 400 righe al massimo (con spazio per il prompt e l'output). Lo script tronca a 200 righe per sicurezza. Se il tuo server genera più di 200 righe in 5 minuti, devi filtrare con query LogQL più specifiche o accettare che alcune righe vengano saltate.

Nessun apprendimento nel tempo. Il modello non ha memoria tra le esecuzioni. Non può imparare che un pattern di log specifico è normale per il tuo ambiente. Ogni batch viene analizzato da zero. Se hai un messaggio di log ricorrente che è benigno ma sembra sospetto, aggiungilo a un filtro di esclusione LogQL: {job="syslog"} != "expected noisy message".

Latenza di inferenza. Su un VPS 4 vCPU / 8 GB, l'inferenza richiede 12-22 secondi per batch. Va bene per un timer da 5 minuti ma è troppo lento per alerting in tempo reale. Per eventi time-critical (disco pieno, OOM), mantieni gli alert Prometheus tradizionali .

Falsi negativi. I modelli piccoli mancano pattern sottili. Un memory leak lento che produce un uso di swap leggermente elevato per giorni non apparirà in una finestra di log di 5 minuti. Usa metriche Prometheus e alert Grafana per il rilevamento basato sui trend.

Costo di esecuzione. Anche senza costi per token API, il modello usa ~5,8 GB di RAM quando caricato. Su un VPS da 8 GB, è la maggior parte della memoria. Se il tuo application server ha bisogno di quella RAM, esegui Ollama su un VPS separato o usa il modello più piccolo gemma2:2b (1,6 GB di RAM, precisione inferiore).

Quando usare questo vs alerting tradizionale

Caso d'uso Analisi log LLM Alerting tradizionale (Prometheus)
«Qualcosa sembra strano ma non riesco a scrivere una regola» No
Rilevamento brute-force SSH Sì (buono nel riconoscimento di pattern) Sì (fail2ban è più veloce)
Disco pieno / OOM No (troppo lento)
Pattern di errore sconosciuti No
Superamento soglia metrica No
Cambio formato log Sì (si adatta automaticamente) No (le regole si rompono)

La configurazione migliore li usa entrambi. Prometheus gestisce le modalità di guasto conosciute con alert veloci. L'LLM cattura le incognite sconosciute leggendo il testo effettivo dei log.

Risoluzione dei problemi

Ollama restituisce "model not found": esegui ollama list per controllare i modelli disponibili. Scarica il modello con ollama pull gemma2:9b.

La query Loki restituisce risultati vuoti: verifica che Promtail sia in esecuzione (systemctl status promtail) e che l'etichetta job nella tua query LogQL corrisponda alla configurazione Promtail. Testa con curl direttamente sull'API Loki.

Memoria insufficiente: controlla la RAM con free -h. Se il modello Ollama consuma troppo, imposta OLLAMA_KEEP_ALIVE=1m nell'override del servizio Ollama. Passa a gemma2:2b per un uso di RAM ridotto.

Gli avvisi Discord/Slack non arrivano: testa l'URL del webhook con i comandi curl dalla sezione alerting. Controlla gli errori HTTP nei log dell'analizzatore: journalctl -u log-analyzer.service -n 50.

Inferenza lenta: verifica che il tuo VPS abbia i core CPU attesi con nproc. Ollama usa tutti i core disponibili per l'inferenza. Se un altro processo consuma CPU, l'inferenza rallenta. Controlla con top durante un'esecuzione.

Errori di parsing JSON: se model_validate_json fallisce, il modello ha prodotto JSON non valido nonostante l'imposizione dello schema. È raro ma succede con certi casi limite. Lo script registra l'errore e continua all'esecuzione successiva. Se succede ripetutamente, prova a cambiare modello.

Controlla i log dell'analizzatore:

journalctl -u log-analyzer.service -f

Per problemi specifici di Ollama:

journalctl -u ollama.service -f

Per il prossimo passo nella pipeline AIOps, vedi come costruire una remediation automatizzata che agisce su questi avvisi . Per approcci di osservabilità alternativi, consulta la guida SigNoz e OpenObserve.


Copyright 2026 Virtua.Cloud. Tutti i diritti riservati. Questo contenuto è un'opera originale del team Virtua.Cloud. La riproduzione, ripubblicazione o redistribuzione senza autorizzazione scritta è vietata.

Pronto a provare?

Distribuisci il tuo server in pochi secondi. Linux, Windows o FreeBSD.

Vedi piani VPS