Análisis de logs con IA usando Ollama en un VPS: detectar anomalías con un LLM local
Construye un pipeline de análisis de logs con IA listo para producción en tu VPS. Consulta Loki, clasifica anomalías con un LLM local vía Ollama y envía alertas a Discord o Slack usando un script Python y un timer de systemd.
Tu pipeline de Loki recopila logs. Los dashboards de Grafana permiten buscar en ellos. Pero nadie está mirando esos dashboards a las 3 de la mañana cuando empieza un ataque de fuerza bruta SSH o un disco se llena. Necesitas algo que lea tus logs continuamente y te avise cuando algo va mal.
Este tutorial construye ese sistema. Un script Python consulta Loki cada 5 minutos, envía lotes de logs a un LLM local ejecutándose en Ollama, recibe clasificaciones JSON estructuradas de vuelta y envía alertas de anomalías a Discord o Slack. Sin APIs de IA en la nube, sin facturación por token, y tus datos de logs nunca salen de tu servidor.
Al final tendrás un timer de systemd funcional que se ejecuta sin intervención, deduplica alertas y maneja fallos de forma limpia.
Requisitos previos:
- Un VPS con al menos 8 GB de RAM (4 vCPU recomendados). Ollama y Loki funcionarán en paralelo.
- Un pipeline de logs Loki + Promtail funcionando
- Python 3.10+ instalado
- Familiaridad básica con systemd y Python
Este artículo forma parte de la serie AIOps.
¿Cómo se instala Ollama en un VPS para análisis de logs?
Ollama ejecuta LLMs localmente con un único binario y expone una API HTTP en el puerto 11434. Instálalo con el script oficial, descarga un modelo y verifica que la API responde. Todo el proceso toma menos de 5 minutos con una conexión decente.
El instalador necesita zstd para la extracción. Instálalo primero:
apt-get update && apt-get install -y zstd
Descarga y ejecuta el script de instalación:
curl -fsSL https://ollama.com/install.sh -o ollama-install.sh
sha256sum ollama-install.sh
Inspecciona el script antes de ejecutarlo. Lee el código shell y compara la suma de verificación con una copia conocida si tienes una. También puedes revisar el código fuente en GitHub. Luego ejecútalo:
sh ollama-install.sh
El instalador crea un servicio systemd llamado ollama. Verifica que está funcionando:
systemctl status ollama
Deberías ver active (running) en la salida. Fíjate en la línea Loaded: loaded (/etc/systemd/system/ollama.service; enabled; preset: enabled). El instalador ya lo configuró para iniciar en el arranque.
Verifica que la API esté escuchando:
curl -s http://localhost:11434/api/tags | python3 -m json.tool
Esto devuelve un objeto JSON con un array models. Estará vacío hasta que descargues un modelo.
Vincular Ollama solo a localhost
Por defecto, Ollama escucha en 127.0.0.1:11434. Confírmalo:
ss -tlnp | grep 11434
Si la salida muestra 0.0.0.0:11434, Ollama está expuesto a Internet. Corrige esto configurando la variable de entorno en la unidad systemd:
sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/override.conf
[Service]
Environment="OLLAMA_HOST=127.0.0.1:11434"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
ss -tlnp | grep 11434
Confirma que la salida ahora muestra 127.0.0.1:11434. Exponer una API de LLM a Internet permite que cualquiera ejecute inferencia en tu servidor.
¿Qué modelo LLM funciona mejor para la detección de anomalías en logs de servidor?
Para análisis de logs en un VPS de 8 GB, necesitas un modelo que quepa en memoria junto con Loki y Promtail. Dos modelos funcionan bien: Gemma 2 9B para clasificación general de logs y Llama 3.1 8B para análisis orientado a seguridad. Ambos corren en cuantización Q4 con aproximadamente 5-6 GB de RAM.
Descarga ambos modelos:
ollama pull gemma2:9b
ollama pull llama3.1:8b
Cada descarga es de aproximadamente 5-6 GB. Después de descargar, verifica:
ollama list
Prueba una inferencia rápida para confirmar que el modelo se carga:
curl -s http://localhost:11434/api/generate \
-d '{"model": "gemma2:9b", "prompt": "Classify this log line: Failed password for root from 203.0.113.5 port 22", "stream": false}' \
| python3 -m json.tool
Fíjate en eval_duration en la respuesta. Es el tiempo de inferencia en nanosegundos. Divide por 1.000.000 para obtener milisegundos.
Comparación de modelos en un VPS de 4 vCPU / 8 GB
Los siguientes números se midieron en un Virtua Cloud VCS-8 (4 vCPU Ryzen, 8 GB RAM, NVMe) procesando un lote de 100 líneas syslog con el modelo ya cargado en memoria:
| Métrica | Gemma 2 9B (Q4_K_M) | Llama 3.1 8B (Q4_K_M) |
|---|---|---|
| Tamaño del modelo en disco | 5,4 GB | 4,9 GB |
| Uso de RAM (cargado) | ~5,8 GB | ~5,2 GB |
| Tiempo por lote de 100 líneas | ~12-18 s | ~14-22 s |
| Tokens/s | ~18-25 | ~15-20 |
| Precisión en logs de seguridad | Buena | Mejor |
| Detección general de anomalías | Mejor | Buena |
Los arranques en frío son más lentos. La primera inferencia después de que Ollama carga el modelo desde disco añade 5-10 segundos. Las llamadas posteriores dentro de la ventana de keep-alive se ejecutan a las velocidades indicadas arriba.
Recomendación: empieza con gemma2:9b para análisis general de logs. Cambia a llama3.1:8b si analizas principalmente logs de autenticación y seguridad.
Presupuesto de RAM en 8 GB
| Componente | Uso de RAM |
|---|---|
| SO + procesos del sistema | ~400 MB |
| Loki | ~300-500 MB |
| Promtail | ~50 MB |
| Ollama (inactivo, sin modelo cargado) | ~30 MB |
| Ollama (gemma2:9b cargado) | ~5,8 GB |
| Script Python | ~50 MB |
| Total | ~6,7-6,9 GB |
Esto cabe en 8 GB con ~1 GB de margen. Ollama descarga automáticamente los modelos después de 5 minutos de inactividad (configurable con OLLAMA_KEEP_ALIVE), liberando la RAM. El timer de systemd se activa cada 5 minutos, así que el modelo permanece cargado durante las ventanas de análisis activas y se descarga entre ellas.
Si la memoria es escasa, usa gemma2:9b con OLLAMA_KEEP_ALIVE=1m para que el modelo se descargue más rápido después de cada lote.
¿Cómo se consultan los logs de Loki desde un script Python?
Consulta la API HTTP de Loki en /loki/api/v1/query_range con una expresión LogQL y una ventana de tiempo. La API devuelve JSON con flujos de logs. Usa la biblioteca requests de Python para obtener los últimos 5 minutos de logs para una etiqueta de job dada.
Primero, configura el proyecto:
mkdir -p /opt/log-analyzer
cd /opt/log-analyzer
Crea el archivo de requisitos:
cat <<'EOF' > /opt/log-analyzer/requirements.txt
requests>=2.31.0
pydantic>=2.5.0
ollama>=0.4.0
EOF
Instala las dependencias en un entorno virtual. En Ubuntu 24.04 necesitas el paquete python3-venv primero:
apt-get install -y python3.12-venv
python3 -m venv /opt/log-analyzer/venv
/opt/log-analyzer/venv/bin/pip install -r /opt/log-analyzer/requirements.txt
Verifica la instalación:
/opt/log-analyzer/venv/bin/python -c "import requests, pydantic, ollama; print('OK')"
Función de consulta a Loki
La siguiente función consulta Loki para logs recientes:
import requests
from datetime import datetime, timedelta, timezone
def query_loki(
loki_url: str,
logql: str,
minutes: int = 5,
limit: int = 500,
) -> list[str]:
"""Query Loki for log lines from the last N minutes."""
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=minutes)
params = {
"query": logql,
"start": str(int(start.timestamp() * 1e9)), # nanosecond epoch
"end": str(int(now.timestamp() * 1e9)),
"limit": limit,
}
resp = requests.get(
f"{loki_url}/loki/api/v1/query_range",
params=params,
timeout=10,
)
resp.raise_for_status()
data = resp.json()
lines = []
for stream in data.get("data", {}).get("result", []):
for _ts, line in stream.get("values", []):
lines.append(line)
return lines
Los parámetros start y end usan marcas de tiempo Unix en nanosegundos. La respuesta de Loki anida las líneas de logs dentro de data.result[].values[], donde cada valor es un par [timestamp, line].
Ejemplos de consultas LogQL que usarás:
# All syslog entries
SYSLOG_QUERY = '{job="syslog"}'
# Nginx error logs
NGINX_QUERY = '{job="nginx"} |= "error"'
# SSH authentication events
AUTH_QUERY = '{job="syslog"} |~ "(sshd|pam_unix)"'
Prueba la consulta contra tu instancia de Loki:
curl -s 'http://localhost:3100/loki/api/v1/query_range' \
--data-urlencode 'query={job="syslog"}' \
--data-urlencode "start=$(date -d '5 minutes ago' +%s)000000000" \
--data-urlencode "end=$(date +%s)000000000" \
--data-urlencode 'limit=10' \
| python3 -m json.tool | head -30
Deberías ver líneas de logs en el array result. Si el array está vacío, verifica que Promtail está enviando logs a Loki y que la etiqueta de job coincide con tu configuración de Promtail.
¿Cómo se escriben prompts que clasifiquen entradas de logs como anomalías?
El prompt es el cerebro de este sistema. Un buen prompt le dice al LLM exactamente qué buscar, define categorías de clasificación y exige una salida estructurada. Los prompts malos producen resúmenes vagos. Los buenos producen JSON accionable.
Tres plantillas de prompts cubren la mayoría de las necesidades de análisis de logs de servidor: detección general de anomalías, detección de eventos de seguridad y detección de problemas de rendimiento. Cada prompt incluye el esquema de clasificación en línea para que el modelo conozca el formato de salida esperado.
Prompt 1: Detección general de anomalías
PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.
Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of the anomaly",
"severity": "low|medium|high|critical",
"log_lines": ["the relevant log lines"],
"explanation": "what this means and potential impact"
}}
],
"summary": "one sentence summary of overall log health"
}}
If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""
Prompt 2: Detección de eventos de seguridad
PROMPT_SECURITY = """You are a security analyst reviewing server logs. Identify security-relevant events.
Focus on:
- Brute-force attempts (repeated failed logins from same IP)
- Successful logins from unusual IPs or at unusual times
- Privilege escalation attempts (sudo failures, su attempts)
- Port scanning patterns
- Unauthorized access attempts to files or services
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of security event",
"severity": "low|medium|high|critical",
"source_ips": ["IP addresses involved"],
"log_lines": ["the relevant log lines"],
"recommendation": "suggested response action"
}}
],
"summary": "one sentence security posture assessment"
}}
If no security events are found, return {{"findings": [], "summary": "No security events detected."}}.
"""
Prompt 3: Detección de problemas de rendimiento
PROMPT_PERFORMANCE = """You are a performance engineer reviewing server logs. Identify performance-related issues.
Focus on:
- High response times or timeouts
- Resource exhaustion (OOM kills, disk full, connection limits)
- Service restarts or crashes
- Queue backlogs or processing delays
- Error rate spikes
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of performance issue",
"severity": "low|medium|high|critical",
"affected_service": "service name if identifiable",
"log_lines": ["the relevant log lines"],
"explanation": "what this means for system performance"
}}
],
"summary": "one sentence performance assessment"
}}
If no performance issues are found, return {{"findings": [], "summary": "No performance issues detected."}}.
"""
Incluir el esquema JSON directamente en el prompt es intencional. Le da al modelo dos señales: el parámetro format impone JSON válido, y el esquema en el prompt guía la estructura. Esta combinación produce salidas fiables con modelos pequeños.
¿Cómo se obtiene salida JSON estructurada de Ollama?
Ollama soporta salida estructurada mediante el parámetro format en su API. Pasa un esquema JSON y el modelo solo generará tokens que lo cumplan. Combinado con un modelo Pydantic en el lado Python, obtienes datos validados y tipados de cada llamada de inferencia.
Define los modelos Pydantic:
from pydantic import BaseModel
class Finding(BaseModel):
title: str
severity: str # low, medium, high, critical
log_lines: list[str]
explanation: str = ""
recommendation: str = ""
source_ips: list[str] = []
affected_service: str = ""
class AnalysisResult(BaseModel):
findings: list[Finding]
summary: str
Llama a Ollama con el esquema forzado:
from ollama import chat
def analyze_logs(
logs: list[str],
model: str = "gemma2:9b",
prompt_template: str = PROMPT_GENERAL,
) -> AnalysisResult:
"""Send logs to Ollama and get structured analysis back."""
if not logs:
return AnalysisResult(findings=[], summary="No logs to analyze.")
# Truncate to avoid context window issues
log_block = "\n".join(logs[:200])
prompt = prompt_template.format(logs=log_block)
response = chat(
model=model,
messages=[{"role": "user", "content": prompt}],
format=AnalysisResult.model_json_schema(),
options={"temperature": 0.1},
)
return AnalysisResult.model_validate_json(response.message.content)
Detalles clave:
format=AnalysisResult.model_json_schema()le indica a Ollama que imponga el esquema JSON a nivel de generación de tokens. El modelo no puede producir una salida que viole el esquema.temperature: 0.1mantiene la salida determinista. La clasificación de logs no debería ser creativa.- Truncar a 200 líneas evita desbordamientos de la ventana de contexto. Gemma 2 9B tiene una ventana de contexto de 8192 tokens. 200 líneas de log a ~20 tokens cada una usan aproximadamente la mitad del contexto.
model_validate_json()parsea la cadena en un objeto Pydantic tipado. Si el parseo falla (raro con la imposición de esquema), lanza unValidationErrorque puedes capturar.
Prueba la función desde la shell de Python:
/opt/log-analyzer/venv/bin/python3 -c "
from ollama import chat
import json
response = chat(
model='gemma2:9b',
messages=[{'role': 'user', 'content': 'Analyze this log: Failed password for root from 203.0.113.5 port 44322 ssh2'}],
format={
'type': 'object',
'properties': {
'findings': {'type': 'array', 'items': {'type': 'object'}},
'summary': {'type': 'string'}
},
'required': ['findings', 'summary']
},
options={'temperature': 0.1},
)
print(json.dumps(json.loads(response.message.content), indent=2))
"
Deberías ver un objeto JSON limpio con las claves findings y summary. Sin bloques markdown, sin preámbulo, solo JSON.
¿Cómo se envían alertas de anomalías a Discord y Slack?
Envía una petición POST con un payload JSON a una URL de webhook. Discord usa un array embeds con campos codificados por color. Slack usa Block Kit con blocks y campos text. Ambos aceptan un único POST HTTPS.
Webhook de Discord
Crea un webhook en tu servidor de Discord: Ajustes del servidor > Integraciones > Webhooks > Nuevo webhook. Copia la URL.
Almacena la URL del webhook de forma segura:
cat <<'EOF' > /opt/log-analyzer/.env
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_ID/YOUR_TOKEN
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
LOKI_URL=http://localhost:3100
OLLAMA_MODEL=gemma2:9b
EOF
chmod 600 /opt/log-analyzer/.env
La función de envío de alertas:
import os
import requests
# Severity to Discord embed color (decimal)
SEVERITY_COLORS = {
"critical": 15158332, # red
"high": 15105570, # orange
"medium": 16776960, # yellow
"low": 3447003, # blue
}
def send_discord_alert(webhook_url: str, result: AnalysisResult) -> None:
"""Send findings to Discord as an embed."""
if not result.findings:
return
for finding in result.findings:
embed = {
"title": f"[{finding.severity.upper()}] {finding.title}",
"color": SEVERITY_COLORS.get(finding.severity, 3447003),
"fields": [
{
"name": "Explanation",
"value": finding.explanation or finding.recommendation or "N/A",
"inline": False,
},
{
"name": "Sample log lines",
"value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
"inline": False,
},
],
}
if finding.source_ips:
embed["fields"].append({
"name": "Source IPs",
"value": ", ".join(finding.source_ips),
"inline": True,
})
payload = {"embeds": [embed]}
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
Prueba el webhook de Discord con curl:
curl -s -X POST "$DISCORD_WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d '{
"embeds": [{
"title": "[HIGH] Test Alert - SSH Brute Force",
"color": 15105570,
"fields": [
{"name": "Explanation", "value": "Multiple failed SSH login attempts from 203.0.113.5", "inline": false},
{"name": "Sample log lines", "value": "```\nFailed password for root from 203.0.113.5\n```", "inline": false}
]
}]
}'
Revisa tu canal de Discord. Deberías ver un mensaje embed con código de color.
Webhook de Slack
Crea una app de Slack en api.slack.com/apps, activa Incoming Webhooks y copia la URL del webhook.
def send_slack_alert(webhook_url: str, result: AnalysisResult) -> None:
"""Send findings to Slack using Block Kit."""
if not result.findings:
return
for finding in result.findings:
severity_emoji = {
"critical": ":rotating_light:",
"high": ":warning:",
"medium": ":large_yellow_circle:",
"low": ":information_source:",
}
emoji = severity_emoji.get(finding.severity, ":grey_question:")
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": finding.explanation or finding.recommendation or "N/A",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "```" + "\n".join(finding.log_lines[:5]) + "```",
},
},
]
payload = {
"text": f"[{finding.severity.upper()}] {finding.title}",
"blocks": blocks,
}
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
Prueba el webhook de Slack:
curl -s -X POST "$SLACK_WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d '{"text": "[HIGH] Test Alert - SSH Brute Force", "blocks": [{"type": "header", "text": {"type": "plain_text", "text": ":warning: [HIGH] Test Alert"}}]}'
¿Cómo se evita el envío de alertas duplicadas?
Sin deduplicación, el mismo ataque SSH de fuerza bruta desde la misma IP dispara una alerta cada 5 minutos durante horas. Usa una caché basada en archivo que almacena un hash del título y la fuente de cada hallazgo. La alerta se omite si el mismo hash apareció en la última hora.
import hashlib
import json
import time
from pathlib import Path
DEDUP_FILE = Path("/opt/log-analyzer/dedup_cache.json")
DEDUP_WINDOW = 3600 # seconds (1 hour)
def load_dedup_cache() -> dict:
if DEDUP_FILE.exists():
try:
return json.loads(DEDUP_FILE.read_text())
except (json.JSONDecodeError, OSError):
return {}
return {}
def save_dedup_cache(cache: dict) -> None:
# Prune expired entries
now = time.time()
cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
DEDUP_FILE.write_text(json.dumps(cache))
def is_duplicate(finding: Finding) -> bool:
"""Check if this finding was already alerted recently."""
cache = load_dedup_cache()
now = time.time()
# Hash on title + sorted source IPs + severity
key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
key = hashlib.sha256(key_material.encode()).hexdigest()[:16]
if key in cache and now - cache[key] < DEDUP_WINDOW:
return True
cache[key] = now
save_dedup_cache(cache)
return False
La caché de deduplicación es un archivo JSON con claves hash cortas mapeadas a timestamps. Las entradas antiguas se purgan en cada guardado. La ventana de una hora es un buen valor por defecto: lo bastante larga para suprimir alertas repetidas, lo bastante corta para volver a alertar si el mismo problema reaparece tras una pausa.
Establece los permisos adecuados en el archivo de caché:
touch /opt/log-analyzer/dedup_cache.json
chmod 600 /opt/log-analyzer/dedup_cache.json
El script completo
El archivo log_analyzer.py completo une todo:
#!/usr/bin/env python3
"""AI Log Analyzer - Query Loki, classify with Ollama, alert to Discord/Slack."""
import hashlib
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests as http_requests
from ollama import chat
from pydantic import BaseModel
# --- Configuration ---
LOKI_URL = os.environ.get("LOKI_URL", "http://localhost:3100")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "gemma2:9b")
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL", "")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
LOG_QUERIES = os.environ.get(
"LOG_QUERIES",
'{job="syslog"};{job="nginx"} |= "error"',
).split(";")
QUERY_WINDOW_MINUTES = int(os.environ.get("QUERY_WINDOW_MINUTES", "5"))
QUERY_LIMIT = int(os.environ.get("QUERY_LIMIT", "500"))
DEDUP_FILE = Path(os.environ.get("DEDUP_FILE", "/opt/log-analyzer/dedup_cache.json"))
DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW", "3600"))
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("log-analyzer")
# --- Models ---
class Finding(BaseModel):
title: str
severity: str
log_lines: list[str]
explanation: str = ""
recommendation: str = ""
source_ips: list[str] = []
affected_service: str = ""
class AnalysisResult(BaseModel):
findings: list[Finding]
summary: str
# --- Prompts ---
PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.
Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of the anomaly",
"severity": "low|medium|high|critical",
"log_lines": ["the relevant log lines"],
"explanation": "what this means and potential impact"
}}
],
"summary": "one sentence summary of overall log health"
}}
If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""
# --- Loki ---
def query_loki(logql: str) -> list[str]:
"""Query Loki for log lines from the last N minutes."""
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=QUERY_WINDOW_MINUTES)
params = {
"query": logql,
"start": str(int(start.timestamp() * 1e9)),
"end": str(int(now.timestamp() * 1e9)),
"limit": QUERY_LIMIT,
}
resp = http_requests.get(
f"{LOKI_URL}/loki/api/v1/query_range",
params=params,
timeout=10,
)
resp.raise_for_status()
data = resp.json()
lines = []
for stream in data.get("data", {}).get("result", []):
for _ts, line in stream.get("values", []):
lines.append(line)
return lines
# --- Ollama ---
def analyze_logs(logs: list[str]) -> AnalysisResult:
"""Send logs to Ollama and get structured analysis back."""
if not logs:
return AnalysisResult(findings=[], summary="No logs to analyze.")
log_block = "\n".join(logs[:200])
prompt = PROMPT_GENERAL.format(logs=log_block)
response = chat(
model=OLLAMA_MODEL,
messages=[{"role": "user", "content": prompt}],
format=AnalysisResult.model_json_schema(),
options={"temperature": 0.1},
)
return AnalysisResult.model_validate_json(response.message.content)
# --- Deduplication ---
def load_dedup_cache() -> dict:
if DEDUP_FILE.exists():
try:
return json.loads(DEDUP_FILE.read_text())
except (json.JSONDecodeError, OSError):
return {}
return {}
def save_dedup_cache(cache: dict) -> None:
now = time.time()
cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
DEDUP_FILE.write_text(json.dumps(cache))
def is_duplicate(finding: Finding) -> bool:
cache = load_dedup_cache()
now = time.time()
key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
key = hashlib.sha256(key_material.encode()).hexdigest()[:16]
if key in cache and now - cache[key] < DEDUP_WINDOW:
return True
cache[key] = now
save_dedup_cache(cache)
return False
# --- Alerting ---
SEVERITY_COLORS = {
"critical": 15158332,
"high": 15105570,
"medium": 16776960,
"low": 3447003,
}
def send_discord_alert(finding: Finding) -> None:
if not DISCORD_WEBHOOK_URL:
return
embed = {
"title": f"[{finding.severity.upper()}] {finding.title}",
"color": SEVERITY_COLORS.get(finding.severity, 3447003),
"fields": [
{
"name": "Explanation",
"value": finding.explanation or finding.recommendation or "N/A",
"inline": False,
},
{
"name": "Sample log lines",
"value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
"inline": False,
},
],
}
if finding.source_ips:
embed["fields"].append({
"name": "Source IPs",
"value": ", ".join(finding.source_ips),
"inline": True,
})
resp = http_requests.post(
DISCORD_WEBHOOK_URL, json={"embeds": [embed]}, timeout=10
)
resp.raise_for_status()
def send_slack_alert(finding: Finding) -> None:
if not SLACK_WEBHOOK_URL:
return
severity_emoji = {
"critical": ":rotating_light:",
"high": ":warning:",
"medium": ":large_yellow_circle:",
"low": ":information_source:",
}
emoji = severity_emoji.get(finding.severity, ":grey_question:")
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": finding.explanation or finding.recommendation or "N/A",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "```" + "\n".join(finding.log_lines[:5]) + "```",
},
},
]
resp = http_requests.post(
SLACK_WEBHOOK_URL,
json={
"text": f"[{finding.severity.upper()}] {finding.title}",
"blocks": blocks,
},
timeout=10,
)
resp.raise_for_status()
def send_alerts(finding: Finding) -> None:
send_discord_alert(finding)
send_slack_alert(finding)
# --- Main ---
def main() -> int:
log.info("Starting log analysis run")
all_lines = []
for logql in LOG_QUERIES:
logql = logql.strip()
if not logql:
continue
try:
lines = query_loki(logql)
log.info("Query '%s' returned %d lines", logql, len(lines))
all_lines.extend(lines)
except Exception as e:
log.error("Loki query failed for '%s': %s", logql, e)
if not all_lines:
log.info("No log lines to analyze")
return 0
log.info("Analyzing %d total log lines with %s", len(all_lines), OLLAMA_MODEL)
try:
result = analyze_logs(all_lines)
except Exception as e:
log.error("Ollama analysis failed: %s", e)
return 1
log.info("Analysis complete: %d findings. %s", len(result.findings), result.summary)
alerted = 0
for finding in result.findings:
if is_duplicate(finding):
log.info("Skipping duplicate: %s", finding.title)
continue
try:
send_alerts(finding)
alerted += 1
log.info("Alerted: [%s] %s", finding.severity, finding.title)
except Exception as e:
log.error("Alert failed for '%s': %s", finding.title, e)
log.info("Run complete. %d new alerts sent.", alerted)
return 0
if __name__ == "__main__":
sys.exit(main())
Establece los permisos:
chmod 750 /opt/log-analyzer/log_analyzer.py
chown root:root /opt/log-analyzer/log_analyzer.py
ls -la /opt/log-analyzer/
Verifica que la salida muestra rwxr-x--- para el script y rw------- para el archivo .env.
Ejecuta una prueba manual:
cd /opt/log-analyzer
set -a && source .env && set +a
/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
Revisa la salida. Deberías ver líneas de logs recuperadas, resultados del análisis y alertas enviadas (u omitidas si no hay anomalías).
¿Cómo se ejecuta el análisis de logs con IA automáticamente con un timer de systemd?
Crea un par servicio + timer de systemd. El servicio ejecuta el script Python con variables de entorno del archivo .env. El timer lo activa cada 5 minutos. Si el script falla, systemd registra el fallo y la siguiente ejecución procede normalmente.
Crea la unidad de servicio:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service
[Unit]
Description=AI Log Analyzer - Ollama anomaly detection
After=network-online.target ollama.service loki.service
Wants=network-online.target
[Service]
Type=oneshot
EnvironmentFile=/opt/log-analyzer/.env
ExecStart=/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
WorkingDirectory=/opt/log-analyzer
User=root
StandardOutput=journal
StandardError=journal
TimeoutStartSec=120
EOF
El TimeoutStartSec=120 da al LLM hasta 2 minutos para completar la inferencia. En un VPS de 8 GB con 100-200 líneas de logs, la inferencia típicamente termina en 15-25 segundos. El timeout de 2 minutos cubre los casos donde Ollama necesita cargar el modelo desde disco.
El servicio se ejecuta como User=root por simplicidad. En producción, crea un usuario dedicado log-analyzer, otórgale acceso de lectura al archivo .env y actualiza la directiva User=. El script solo necesita acceso HTTP a Loki y Ollama en localhost, por lo que no requiere privilegios elevados.
Crea el timer:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.timer
[Unit]
Description=Run AI Log Analyzer every 5 minutes
[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
AccuracySec=30s
[Install]
WantedBy=timers.target
EOF
Activa e inicia el timer:
sudo systemctl daemon-reload
sudo systemctl enable --now log-analyzer.timer
El enable --now hace que el timer sobreviva a reinicios y lo inicia inmediatamente.
Verifica que el timer está activo:
systemctl status log-analyzer.timer
Deberías ver active (waiting) y la siguiente hora de activación. Comprueba cuándo se ejecutó por última vez:
systemctl list-timers log-analyzer.timer
Después de la primera activación, revisa los logs del servicio:
journalctl -u log-analyzer.service -n 30 --no-pager
Busca los mensajes Starting log analysis run y Run complete. Si ves Ollama analysis failed, puede que el modelo no esté descargado o que Ollama no esté funcionando.
Notificaciones de fallo
Si el analizador falla, quieres saberlo. Añade un handler OnFailure a la unidad de servicio:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer-failure@.service
[Unit]
Description=Log Analyzer failure notification for %i
[Service]
Type=oneshot
ExecStart=/usr/bin/curl -s -X POST ${DISCORD_WEBHOOK_URL} \
-H "Content-Type: application/json" \
-d '{"content": ":x: **Log Analyzer Failed**\nUnit: %i\nTime: %H\nCheck: journalctl -u %i"}'
EnvironmentFile=/opt/log-analyzer/.env
EOF
Añade la directiva OnFailure al servicio principal:
sudo mkdir -p /etc/systemd/system/log-analyzer.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service.d/failure.conf
[Unit]
OnFailure=log-analyzer-failure@%n.service
EOF
sudo systemctl daemon-reload
Si el script Python termina con un código distinto de cero, systemd envía una notificación a tu canal de Discord.
¿Cuáles son las limitaciones del análisis de logs basado en LLM?
El análisis de logs por LLM complementa las alertas basadas en reglas, no las reemplaza. Detecta patrones difíciles de expresar como reglas estáticas. Pero tiene debilidades reales que debes conocer.
Alucinaciones. El modelo puede marcar líneas de logs normales como anomalías o inventar explicaciones para eventos benignos. Los hallazgos de baja severidad del LLM deben tratarse como sugerencias, no como hechos. Verifica siempre manualmente las alertas de alta severidad.
Límites de la ventana de contexto. Gemma 2 9B tiene una ventana de contexto de 8192 tokens. A ~20 tokens por línea de log, son unas 400 líneas como máximo (con espacio para el prompt y la salida). El script trunca a 200 líneas por seguridad. Si tu servidor genera más de 200 líneas en 5 minutos, necesitas filtrar con consultas LogQL más específicas o aceptar que algunas líneas se omitan.
Sin aprendizaje a lo largo del tiempo. El modelo no tiene memoria entre ejecuciones. No puede aprender que un patrón de log específico es normal en tu entorno. Cada lote se analiza desde cero. Si tienes un mensaje de log recurrente que es benigno pero parece sospechoso, añádelo a un filtro de exclusión LogQL: {job="syslog"} != "expected noisy message".
Latencia de inferencia. En un VPS de 4 vCPU / 8 GB, la inferencia toma 12-22 segundos por lote. Está bien para un timer de 5 minutos pero es demasiado lento para alertas en tiempo real. Para eventos críticos en el tiempo (disco lleno, OOM), mantén las alertas tradicionales de Prometheus .
Falsos negativos. Los modelos pequeños pasan por alto patrones sutiles. Una fuga de memoria lenta que produce un uso de swap ligeramente elevado durante días no aparecerá en una ventana de logs de 5 minutos. Usa métricas de Prometheus y alertas de Grafana para detección basada en tendencias.
Coste de ejecución. Aunque no hay costes por token de API, el modelo usa ~5,8 GB de RAM cuando está cargado. En un VPS de 8 GB, eso es la mayor parte de tu memoria. Si tu servidor de aplicaciones necesita esa RAM, ejecuta Ollama en un VPS separado o usa el modelo más pequeño gemma2:2b (1,6 GB de RAM, menor precisión).
Cuándo usar esto vs alertas tradicionales
| Caso de uso | Análisis de logs por LLM | Alertas tradicionales (Prometheus) |
|---|---|---|
| «Algo parece mal pero no puedo escribir una regla» | Sí | No |
| Detección de fuerza bruta SSH | Sí (bueno en reconocimiento de patrones) | Sí (fail2ban es más rápido) |
| Disco lleno / OOM | No (demasiado lento) | Sí |
| Patrones de error desconocidos | Sí | No |
| Cruce de umbral de métricas | No | Sí |
| Cambio de formato de logs | Sí (se adapta automáticamente) | No (las reglas se rompen) |
La mejor configuración ejecuta ambos. Prometheus gestiona los modos de fallo conocidos con alertas rápidas. El LLM detecta las incógnitas desconocidas leyendo el texto real de los logs.
Solución de problemas
Ollama devuelve «model not found»: ejecuta ollama list para comprobar los modelos disponibles. Descarga el modelo con ollama pull gemma2:9b.
La consulta a Loki devuelve resultados vacíos: verifica que Promtail está funcionando (systemctl status promtail) y que la etiqueta de job en tu consulta LogQL coincide con la configuración de Promtail. Prueba con curl directamente contra la API de Loki.
Sin memoria suficiente: comprueba la RAM con free -h. Si el modelo de Ollama consume demasiado, configura OLLAMA_KEEP_ALIVE=1m en el override del servicio Ollama. Cambia a gemma2:2b para menor uso de RAM.
Las alertas de Discord/Slack no llegan: prueba la URL del webhook con los comandos curl de la sección de alertas. Revisa los errores HTTP en los logs del analizador: journalctl -u log-analyzer.service -n 50.
Inferencia lenta: verifica que tu VPS tiene los cores de CPU esperados con nproc. Ollama usa todos los cores disponibles para la inferencia. Si otro proceso consume CPU, la inferencia se ralentiza. Comprueba con top durante una ejecución.
Errores de parseo JSON: si model_validate_json falla, el modelo produjo JSON inválido a pesar de la imposición de esquema. Es raro pero ocurre con ciertos casos límite. El script registra el error y continúa en la siguiente ejecución. Si ocurre repetidamente, prueba a cambiar de modelo.
Revisa los logs del analizador:
journalctl -u log-analyzer.service -f
Para problemas específicos de Ollama:
journalctl -u ollama.service -f
Para el siguiente paso en el pipeline AIOps, consulta cómo construir remediación automatizada que actúe sobre estas alertas . Para enfoques alternativos de observabilidad, consulta la guía de SigNoz y OpenObserve.
Copyright 2026 Virtua.Cloud. Todos los derechos reservados. Este contenido es una obra original del equipo de Virtua.Cloud. La reproducción, republicación o redistribución sin permiso escrito está prohibida.
¿Listo para probarlo?
Despliega tu propio servidor en segundos. Linux, Windows o FreeBSD.
Ver planes VPS