AI-анализ логов с Ollama на VPS: обнаружение аномалий с помощью локальной LLM
Собери production-ready пайплайн AI-анализа логов на своём VPS. Запрашивай логи из Loki, классифицируй аномалии локальной LLM через Ollama и отправляй алерты в Discord или Slack с помощью Python-скрипта и systemd-таймера.
Твой пайплайн Loki собирает логи. Дашборды Grafana позволяют искать по ним. Но никто не смотрит на эти дашборды в 3 часа ночи, когда начинается SSH brute-force или заканчивается место на диске. Нужно что-то, что непрерывно читает логи и сообщает, когда что-то пошло не так.
Этот туториал строит такую систему. Python-скрипт каждые 5 минут запрашивает Loki, отправляет батчи логов в локальную LLM на Ollama, получает структурированные JSON-классификации и отправляет алерты об аномалиях в Discord или Slack. Никаких облачных AI API, никакой оплаты за токены, и данные логов никогда не покидают сервер.
В итоге у тебя будет рабочий systemd-таймер, который работает без присмотра, дедуплицирует алерты и корректно обрабатывает ошибки.
Предварительные требования:
- VPS с минимум 8 ГБ RAM (рекомендуется 4 vCPU). Ollama и Loki будут работать бок о бок.
- Рабочий пайплайн логов Loki + Promtail
- Установлен Python 3.10+
- Базовое знакомство с systemd и Python
Эта статья — часть серии AIOps.
Как установить Ollama на VPS для анализа логов?
Ollama запускает LLM локально одним бинарником и предоставляет HTTP API на порту 11434. Установи его официальным скриптом, скачай модель и проверь, что API отвечает. Весь процесс занимает меньше 5 минут при нормальном соединении.
Инсталлятору нужен zstd для распаковки. Установи сначала его:
apt-get update && apt-get install -y zstd
Скачай и запусти скрипт установки:
curl -fsSL https://ollama.com/install.sh -o ollama-install.sh
sha256sum ollama-install.sh
Проверь скрипт перед запуском. Прочитай shell-код и сверь контрольную сумму с известной копией, если есть. Также можно посмотреть исходник на GitHub. Затем запусти:
sh ollama-install.sh
Инсталлятор создаёт systemd-сервис ollama. Проверь, что он запущен:
systemctl status ollama
В выводе должно быть active (running). Обрати внимание на строку Loaded: loaded (/etc/systemd/system/ollama.service; enabled; preset: enabled). Инсталлятор уже настроил автозапуск.
Проверь, что API слушает:
curl -s http://localhost:11434/api/tags | python3 -m json.tool
Вернётся JSON-объект с массивом models. Он будет пустым, пока не скачаешь модель.
Привязка Ollama только к localhost
По умолчанию Ollama слушает на 127.0.0.1:11434. Подтверди это:
ss -tlnp | grep 11434
Если в выводе 0.0.0.0:11434, Ollama доступен из интернета. Исправь, задав переменную окружения в systemd-юните:
sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/override.conf
[Service]
Environment="OLLAMA_HOST=127.0.0.1:11434"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
ss -tlnp | grep 11434
Убедись, что теперь в выводе 127.0.0.1:11434. Открытый LLM API в интернете позволяет кому угодно запускать инференс на твоём сервере.
Какая LLM-модель лучше подходит для обнаружения аномалий в серверных логах?
Для анализа логов на VPS с 8 ГБ нужна модель, которая помещается в память рядом с Loki и Promtail. Два варианта работают хорошо: Gemma 2 9B для общей классификации логов и Llama 3.1 8B для анализа, ориентированного на безопасность. Обе работают в Q4-квантизации и занимают около 5-6 ГБ RAM.
Скачай обе модели:
ollama pull gemma2:9b
ollama pull llama3.1:8b
Каждая загрузка — примерно 5-6 ГБ. После скачивания проверь:
ollama list
Протестируй быстрый инференс, чтобы убедиться, что модель загружается:
curl -s http://localhost:11434/api/generate \
-d '{"model": "gemma2:9b", "prompt": "Classify this log line: Failed password for root from 203.0.113.5 port 22", "stream": false}' \
| python3 -m json.tool
Обрати внимание на eval_duration в ответе. Это время инференса в наносекундах. Раздели на 1 000 000, чтобы получить миллисекунды.
Сравнение моделей на VPS с 4 vCPU / 8 ГБ
Следующие цифры получены на Virtua Cloud VCS-8 (4 vCPU Ryzen, 8 ГБ RAM, NVMe) при обработке батча из 100 строк syslog с уже загруженной моделью:
| Метрика | Gemma 2 9B (Q4_K_M) | Llama 3.1 8B (Q4_K_M) |
|---|---|---|
| Размер модели на диске | 5,4 ГБ | 4,9 ГБ |
| Использование RAM (загружена) | ~5,8 ГБ | ~5,2 ГБ |
| Время на батч из 100 строк | ~12-18 с | ~14-22 с |
| Токенов/с | ~18-25 | ~15-20 |
| Точность на логах безопасности | Хорошая | Лучше |
| Общее обнаружение аномалий | Лучше | Хорошая |
Холодный старт медленнее. Первый инференс после загрузки модели с диска добавляет 5-10 секунд. Последующие вызовы в пределах keep-alive окна работают на указанных выше скоростях.
Рекомендация: начни с gemma2:9b для общего анализа логов. Переключись на llama3.1:8b, если в основном анализируешь логи аутентификации и безопасности.
Бюджет RAM на 8 ГБ
| Компонент | Использование RAM |
|---|---|
| ОС + системные процессы | ~400 МБ |
| Loki | ~300-500 МБ |
| Promtail | ~50 МБ |
| Ollama (простаивает, модель не загружена) | ~30 МБ |
| Ollama (gemma2:9b загружена) | ~5,8 ГБ |
| Python-скрипт | ~50 МБ |
| Итого | ~6,7-6,9 ГБ |
Это помещается в 8 ГБ с запасом ~1 ГБ. Ollama автоматически выгружает модели после 5 минут неактивности (настраивается через OLLAMA_KEEP_ALIVE), освобождая RAM. Systemd-таймер срабатывает каждые 5 минут, поэтому модель остаётся загруженной во время активного анализа и выгружается между запусками.
Если с памятью туго, используй gemma2:9b с OLLAMA_KEEP_ALIVE=1m, чтобы модель выгружалась быстрее после каждого батча.
Как запрашивать логи Loki из Python-скрипта?
Запрашивай HTTP API Loki на /loki/api/v1/query_range с LogQL-выражением и временным окном. API возвращает JSON с потоками логов. Используй библиотеку requests Python для получения последних 5 минут логов по заданному job-лейблу.
Сначала подготовь проект:
mkdir -p /opt/log-analyzer
cd /opt/log-analyzer
Создай файл зависимостей:
cat <<'EOF' > /opt/log-analyzer/requirements.txt
requests>=2.31.0
pydantic>=2.5.0
ollama>=0.4.0
EOF
Установи зависимости в виртуальном окружении. На Ubuntu 24.04 сначала нужен пакет python3-venv:
apt-get install -y python3.12-venv
python3 -m venv /opt/log-analyzer/venv
/opt/log-analyzer/venv/bin/pip install -r /opt/log-analyzer/requirements.txt
Проверь установку:
/opt/log-analyzer/venv/bin/python -c "import requests, pydantic, ollama; print('OK')"
Функция запроса к Loki
Следующая функция запрашивает Loki для получения свежих логов:
import requests
from datetime import datetime, timedelta, timezone
def query_loki(
loki_url: str,
logql: str,
minutes: int = 5,
limit: int = 500,
) -> list[str]:
"""Query Loki for log lines from the last N minutes."""
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=minutes)
params = {
"query": logql,
"start": str(int(start.timestamp() * 1e9)), # nanosecond epoch
"end": str(int(now.timestamp() * 1e9)),
"limit": limit,
}
resp = requests.get(
f"{loki_url}/loki/api/v1/query_range",
params=params,
timeout=10,
)
resp.raise_for_status()
data = resp.json()
lines = []
for stream in data.get("data", {}).get("result", []):
for _ts, line in stream.get("values", []):
lines.append(line)
return lines
Параметры start и end используют Unix-таймстемпы в наносекундах. Ответ Loki вкладывает строки логов в data.result[].values[], где каждое значение — пара [timestamp, line].
Примеры LogQL-запросов, которые понадобятся:
# All syslog entries
SYSLOG_QUERY = '{job="syslog"}'
# Nginx error logs
NGINX_QUERY = '{job="nginx"} |= "error"'
# SSH authentication events
AUTH_QUERY = '{job="syslog"} |~ "(sshd|pam_unix)"'
Протестируй запрос на работающем инстансе Loki:
curl -s 'http://localhost:3100/loki/api/v1/query_range' \
--data-urlencode 'query={job="syslog"}' \
--data-urlencode "start=$(date -d '5 minutes ago' +%s)000000000" \
--data-urlencode "end=$(date +%s)000000000" \
--data-urlencode 'limit=10' \
| python3 -m json.tool | head -30
В массиве result должны быть строки логов. Если массив пустой, проверь, что Promtail отправляет логи в Loki и что job-лейбл соответствует конфигурации Promtail.
Как писать промпты, классифицирующие записи логов как аномалии?
Промпт — это мозг системы. Хороший промпт говорит LLM, что именно искать, определяет категории классификации и требует структурированный вывод. Плохие промпты дают размытые сводки. Хорошие промпты дают actionable JSON.
Три шаблона промптов покрывают большинство задач анализа серверных логов: общее обнаружение аномалий, обнаружение событий безопасности и обнаружение проблем с производительностью. Каждый промпт включает схему классификации inline, чтобы модель знала ожидаемый формат вывода.
Промпт 1: Общее обнаружение аномалий
PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.
Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of the anomaly",
"severity": "low|medium|high|critical",
"log_lines": ["the relevant log lines"],
"explanation": "what this means and potential impact"
}}
],
"summary": "one sentence summary of overall log health"
}}
If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""
Промпт 2: Обнаружение событий безопасности
PROMPT_SECURITY = """You are a security analyst reviewing server logs. Identify security-relevant events.
Focus on:
- Brute-force attempts (repeated failed logins from same IP)
- Successful logins from unusual IPs or at unusual times
- Privilege escalation attempts (sudo failures, su attempts)
- Port scanning patterns
- Unauthorized access attempts to files or services
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of security event",
"severity": "low|medium|high|critical",
"source_ips": ["IP addresses involved"],
"log_lines": ["the relevant log lines"],
"recommendation": "suggested response action"
}}
],
"summary": "one sentence security posture assessment"
}}
If no security events are found, return {{"findings": [], "summary": "No security events detected."}}.
"""
Промпт 3: Обнаружение проблем с производительностью
PROMPT_PERFORMANCE = """You are a performance engineer reviewing server logs. Identify performance-related issues.
Focus on:
- High response times or timeouts
- Resource exhaustion (OOM kills, disk full, connection limits)
- Service restarts or crashes
- Queue backlogs or processing delays
- Error rate spikes
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of performance issue",
"severity": "low|medium|high|critical",
"affected_service": "service name if identifiable",
"log_lines": ["the relevant log lines"],
"explanation": "what this means for system performance"
}}
],
"summary": "one sentence performance assessment"
}}
If no performance issues are found, return {{"findings": [], "summary": "No performance issues detected."}}.
"""
Встраивание JSON-схемы прямо в промпт — осознанный выбор. Модель получает два сигнала: параметр format обеспечивает валидный JSON, а схема в промпте задаёт структуру. Такая комбинация даёт надёжный вывод даже с маленькими моделями.
Как получить структурированный JSON от Ollama?
Ollama поддерживает структурированный вывод через параметр format в API. Передай JSON-схему, и модель будет генерировать только токены, соответствующие ей. В сочетании с Pydantic-моделью на стороне Python получаешь валидированные типизированные данные при каждом вызове инференса.
Определи Pydantic-модели:
from pydantic import BaseModel
class Finding(BaseModel):
title: str
severity: str # low, medium, high, critical
log_lines: list[str]
explanation: str = ""
recommendation: str = ""
source_ips: list[str] = []
affected_service: str = ""
class AnalysisResult(BaseModel):
findings: list[Finding]
summary: str
Вызови Ollama с принудительной схемой:
from ollama import chat
def analyze_logs(
logs: list[str],
model: str = "gemma2:9b",
prompt_template: str = PROMPT_GENERAL,
) -> AnalysisResult:
"""Send logs to Ollama and get structured analysis back."""
if not logs:
return AnalysisResult(findings=[], summary="No logs to analyze.")
# Truncate to avoid context window issues
log_block = "\n".join(logs[:200])
prompt = prompt_template.format(logs=log_block)
response = chat(
model=model,
messages=[{"role": "user", "content": prompt}],
format=AnalysisResult.model_json_schema(),
options={"temperature": 0.1},
)
return AnalysisResult.model_validate_json(response.message.content)
Ключевые детали:
format=AnalysisResult.model_json_schema()говорит Ollama принудительно применять JSON-схему на уровне генерации токенов. Модель не может выдать вывод, нарушающий схему.temperature: 0.1делает вывод детерминированным. Классификация логов не должна быть творческой.- Обрезка до 200 строк предотвращает переполнение контекстного окна. У Gemma 2 9B окно 8192 токена. 200 строк логов по ~20 токенов каждая — это примерно половина контекста.
model_validate_json()парсит строку в типизированный Pydantic-объект. Если парсинг не удаётся (редко при принудительной схеме), возникаетValidationError, который можно перехватить.
Протестируй функцию из Python-шелла:
/opt/log-analyzer/venv/bin/python3 -c "
from ollama import chat
import json
response = chat(
model='gemma2:9b',
messages=[{'role': 'user', 'content': 'Analyze this log: Failed password for root from 203.0.113.5 port 44322 ssh2'}],
format={
'type': 'object',
'properties': {
'findings': {'type': 'array', 'items': {'type': 'object'}},
'summary': {'type': 'string'}
},
'required': ['findings', 'summary']
},
options={'temperature': 0.1},
)
print(json.dumps(json.loads(response.message.content), indent=2))
"
Должен получиться чистый JSON-объект с ключами findings и summary. Без markdown-блоков, без преамбулы, только JSON.
Как отправлять алерты об аномалиях в Discord и Slack?
Отправь POST-запрос с JSON-payload на URL вебхука. Discord использует массив embeds с полями, кодированными цветом. Slack использует Block Kit с blocks и полями text. Оба принимают один HTTPS POST.
Вебхук Discord
Создай вебхук в своём Discord-сервере: Настройки сервера > Интеграции > Вебхуки > Новый вебхук. Скопируй URL.
Сохрани URL вебхука безопасно:
cat <<'EOF' > /opt/log-analyzer/.env
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_ID/YOUR_TOKEN
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
LOKI_URL=http://localhost:3100
OLLAMA_MODEL=gemma2:9b
EOF
chmod 600 /opt/log-analyzer/.env
Функция отправки алертов:
import os
import requests
# Severity to Discord embed color (decimal)
SEVERITY_COLORS = {
"critical": 15158332, # red
"high": 15105570, # orange
"medium": 16776960, # yellow
"low": 3447003, # blue
}
def send_discord_alert(webhook_url: str, result: AnalysisResult) -> None:
"""Send findings to Discord as an embed."""
if not result.findings:
return
for finding in result.findings:
embed = {
"title": f"[{finding.severity.upper()}] {finding.title}",
"color": SEVERITY_COLORS.get(finding.severity, 3447003),
"fields": [
{
"name": "Explanation",
"value": finding.explanation or finding.recommendation or "N/A",
"inline": False,
},
{
"name": "Sample log lines",
"value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
"inline": False,
},
],
}
if finding.source_ips:
embed["fields"].append({
"name": "Source IPs",
"value": ", ".join(finding.source_ips),
"inline": True,
})
payload = {"embeds": [embed]}
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
Протестируй Discord-вебхук через curl:
curl -s -X POST "$DISCORD_WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d '{
"embeds": [{
"title": "[HIGH] Test Alert - SSH Brute Force",
"color": 15105570,
"fields": [
{"name": "Explanation", "value": "Multiple failed SSH login attempts from 203.0.113.5", "inline": false},
{"name": "Sample log lines", "value": "```\nFailed password for root from 203.0.113.5\n```", "inline": false}
]
}]
}'
Проверь свой Discord-канал. Должно появиться embed-сообщение с цветовым кодом.
Вебхук Slack
Создай Slack-приложение на api.slack.com/apps, включи Incoming Webhooks и скопируй URL вебхука.
def send_slack_alert(webhook_url: str, result: AnalysisResult) -> None:
"""Send findings to Slack using Block Kit."""
if not result.findings:
return
for finding in result.findings:
severity_emoji = {
"critical": ":rotating_light:",
"high": ":warning:",
"medium": ":large_yellow_circle:",
"low": ":information_source:",
}
emoji = severity_emoji.get(finding.severity, ":grey_question:")
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": finding.explanation or finding.recommendation or "N/A",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "```" + "\n".join(finding.log_lines[:5]) + "```",
},
},
]
payload = {
"text": f"[{finding.severity.upper()}] {finding.title}",
"blocks": blocks,
}
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
Протестируй Slack-вебхук:
curl -s -X POST "$SLACK_WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d '{"text": "[HIGH] Test Alert - SSH Brute Force", "blocks": [{"type": "header", "text": {"type": "plain_text", "text": ":warning: [HIGH] Test Alert"}}]}'
Как избежать отправки дублирующихся алертов?
Без дедупликации один и тот же SSH brute-force с одного IP триггерит алерт каждые 5 минут часами. Используй файловый кеш, хранящий хеш заголовка и источника каждого finding. Алерт пропускается, если тот же хеш появлялся в последний час.
import hashlib
import json
import time
from pathlib import Path
DEDUP_FILE = Path("/opt/log-analyzer/dedup_cache.json")
DEDUP_WINDOW = 3600 # seconds (1 hour)
def load_dedup_cache() -> dict:
if DEDUP_FILE.exists():
try:
return json.loads(DEDUP_FILE.read_text())
except (json.JSONDecodeError, OSError):
return {}
return {}
def save_dedup_cache(cache: dict) -> None:
# Prune expired entries
now = time.time()
cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
DEDUP_FILE.write_text(json.dumps(cache))
def is_duplicate(finding: Finding) -> bool:
"""Check if this finding was already alerted recently."""
cache = load_dedup_cache()
now = time.time()
# Hash on title + sorted source IPs + severity
key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
key = hashlib.sha256(key_material.encode()).hexdigest()[:16]
if key in cache and now - cache[key] < DEDUP_WINDOW:
return True
cache[key] = now
save_dedup_cache(cache)
return False
Кеш дедупликации — JSON-файл с короткими хеш-ключами, привязанными к таймстемпам. Старые записи чистятся при каждом сохранении. Окно в 1 час — хороший дефолт: достаточно длинный для подавления повторных алертов, достаточно короткий для повторного алерта, если та же проблема вернётся после паузы.
Установи права на файл кеша:
touch /opt/log-analyzer/dedup_cache.json
chmod 600 /opt/log-analyzer/dedup_cache.json
Полный скрипт
Полный log_analyzer.py собирает всё вместе:
#!/usr/bin/env python3
"""AI Log Analyzer - Query Loki, classify with Ollama, alert to Discord/Slack."""
import hashlib
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests as http_requests
from ollama import chat
from pydantic import BaseModel
# --- Configuration ---
LOKI_URL = os.environ.get("LOKI_URL", "http://localhost:3100")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "gemma2:9b")
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL", "")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
LOG_QUERIES = os.environ.get(
"LOG_QUERIES",
'{job="syslog"};{job="nginx"} |= "error"',
).split(";")
QUERY_WINDOW_MINUTES = int(os.environ.get("QUERY_WINDOW_MINUTES", "5"))
QUERY_LIMIT = int(os.environ.get("QUERY_LIMIT", "500"))
DEDUP_FILE = Path(os.environ.get("DEDUP_FILE", "/opt/log-analyzer/dedup_cache.json"))
DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW", "3600"))
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("log-analyzer")
# --- Models ---
class Finding(BaseModel):
title: str
severity: str
log_lines: list[str]
explanation: str = ""
recommendation: str = ""
source_ips: list[str] = []
affected_service: str = ""
class AnalysisResult(BaseModel):
findings: list[Finding]
summary: str
# --- Prompts ---
PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.
Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of the anomaly",
"severity": "low|medium|high|critical",
"log_lines": ["the relevant log lines"],
"explanation": "what this means and potential impact"
}}
],
"summary": "one sentence summary of overall log health"
}}
If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""
# --- Loki ---
def query_loki(logql: str) -> list[str]:
"""Query Loki for log lines from the last N minutes."""
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=QUERY_WINDOW_MINUTES)
params = {
"query": logql,
"start": str(int(start.timestamp() * 1e9)),
"end": str(int(now.timestamp() * 1e9)),
"limit": QUERY_LIMIT,
}
resp = http_requests.get(
f"{LOKI_URL}/loki/api/v1/query_range",
params=params,
timeout=10,
)
resp.raise_for_status()
data = resp.json()
lines = []
for stream in data.get("data", {}).get("result", []):
for _ts, line in stream.get("values", []):
lines.append(line)
return lines
# --- Ollama ---
def analyze_logs(logs: list[str]) -> AnalysisResult:
"""Send logs to Ollama and get structured analysis back."""
if not logs:
return AnalysisResult(findings=[], summary="No logs to analyze.")
log_block = "\n".join(logs[:200])
prompt = PROMPT_GENERAL.format(logs=log_block)
response = chat(
model=OLLAMA_MODEL,
messages=[{"role": "user", "content": prompt}],
format=AnalysisResult.model_json_schema(),
options={"temperature": 0.1},
)
return AnalysisResult.model_validate_json(response.message.content)
# --- Deduplication ---
def load_dedup_cache() -> dict:
if DEDUP_FILE.exists():
try:
return json.loads(DEDUP_FILE.read_text())
except (json.JSONDecodeError, OSError):
return {}
return {}
def save_dedup_cache(cache: dict) -> None:
now = time.time()
cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
DEDUP_FILE.write_text(json.dumps(cache))
def is_duplicate(finding: Finding) -> bool:
cache = load_dedup_cache()
now = time.time()
key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
key = hashlib.sha256(key_material.encode()).hexdigest()[:16]
if key in cache and now - cache[key] < DEDUP_WINDOW:
return True
cache[key] = now
save_dedup_cache(cache)
return False
# --- Alerting ---
SEVERITY_COLORS = {
"critical": 15158332,
"high": 15105570,
"medium": 16776960,
"low": 3447003,
}
def send_discord_alert(finding: Finding) -> None:
if not DISCORD_WEBHOOK_URL:
return
embed = {
"title": f"[{finding.severity.upper()}] {finding.title}",
"color": SEVERITY_COLORS.get(finding.severity, 3447003),
"fields": [
{
"name": "Explanation",
"value": finding.explanation or finding.recommendation or "N/A",
"inline": False,
},
{
"name": "Sample log lines",
"value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
"inline": False,
},
],
}
if finding.source_ips:
embed["fields"].append({
"name": "Source IPs",
"value": ", ".join(finding.source_ips),
"inline": True,
})
resp = http_requests.post(
DISCORD_WEBHOOK_URL, json={"embeds": [embed]}, timeout=10
)
resp.raise_for_status()
def send_slack_alert(finding: Finding) -> None:
if not SLACK_WEBHOOK_URL:
return
severity_emoji = {
"critical": ":rotating_light:",
"high": ":warning:",
"medium": ":large_yellow_circle:",
"low": ":information_source:",
}
emoji = severity_emoji.get(finding.severity, ":grey_question:")
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": finding.explanation or finding.recommendation or "N/A",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "```" + "\n".join(finding.log_lines[:5]) + "```",
},
},
]
resp = http_requests.post(
SLACK_WEBHOOK_URL,
json={
"text": f"[{finding.severity.upper()}] {finding.title}",
"blocks": blocks,
},
timeout=10,
)
resp.raise_for_status()
def send_alerts(finding: Finding) -> None:
send_discord_alert(finding)
send_slack_alert(finding)
# --- Main ---
def main() -> int:
log.info("Starting log analysis run")
all_lines = []
for logql in LOG_QUERIES:
logql = logql.strip()
if not logql:
continue
try:
lines = query_loki(logql)
log.info("Query '%s' returned %d lines", logql, len(lines))
all_lines.extend(lines)
except Exception as e:
log.error("Loki query failed for '%s': %s", logql, e)
if not all_lines:
log.info("No log lines to analyze")
return 0
log.info("Analyzing %d total log lines with %s", len(all_lines), OLLAMA_MODEL)
try:
result = analyze_logs(all_lines)
except Exception as e:
log.error("Ollama analysis failed: %s", e)
return 1
log.info("Analysis complete: %d findings. %s", len(result.findings), result.summary)
alerted = 0
for finding in result.findings:
if is_duplicate(finding):
log.info("Skipping duplicate: %s", finding.title)
continue
try:
send_alerts(finding)
alerted += 1
log.info("Alerted: [%s] %s", finding.severity, finding.title)
except Exception as e:
log.error("Alert failed for '%s': %s", finding.title, e)
log.info("Run complete. %d new alerts sent.", alerted)
return 0
if __name__ == "__main__":
sys.exit(main())
Установи права:
chmod 750 /opt/log-analyzer/log_analyzer.py
chown root:root /opt/log-analyzer/log_analyzer.py
ls -la /opt/log-analyzer/
Проверь, что вывод показывает rwxr-x--- для скрипта и rw------- для файла .env.
Запусти ручной тест:
cd /opt/log-analyzer
set -a && source .env && set +a
/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
Проверь вывод. Должны быть видны полученные строки логов, результаты анализа и отправленные алерты (или пропущенные, если аномалий нет).
Как автоматически запускать AI-анализ логов с помощью systemd-таймера?
Создай пару systemd service + timer. Service запускает Python-скрипт с переменными окружения из файла .env. Timer срабатывает каждые 5 минут. Если скрипт падает, systemd логирует ошибку, и следующий запуск проходит нормально.
Создай service-юнит:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service
[Unit]
Description=AI Log Analyzer - Ollama anomaly detection
After=network-online.target ollama.service loki.service
Wants=network-online.target
[Service]
Type=oneshot
EnvironmentFile=/opt/log-analyzer/.env
ExecStart=/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
WorkingDirectory=/opt/log-analyzer
User=root
StandardOutput=journal
StandardError=journal
TimeoutStartSec=120
EOF
TimeoutStartSec=120 даёт LLM до 2 минут на инференс. На VPS с 8 ГБ и 100-200 строками логов инференс обычно занимает 15-25 секунд. 2-минутный таймаут покрывает случаи, когда Ollama загружает модель с диска.
Service работает от User=root для простоты. В продакшене создай выделенного пользователя log-analyzer, дай ему доступ на чтение файла .env и обнови директиву User=. Скрипту нужен только HTTP-доступ к Loki и Ollama на localhost, повышенные привилегии не требуются.
Создай таймер:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.timer
[Unit]
Description=Run AI Log Analyzer every 5 minutes
[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
AccuracySec=30s
[Install]
WantedBy=timers.target
EOF
Включи и запусти таймер:
sudo systemctl daemon-reload
sudo systemctl enable --now log-analyzer.timer
enable --now делает таймер переживающим перезагрузки и запускает его немедленно.
Проверь, что таймер активен:
systemctl status log-analyzer.timer
Должно быть active (waiting) и время следующего срабатывания. Посмотри, когда он последний раз запускался:
systemctl list-timers log-analyzer.timer
После первого срабатывания проверь логи сервиса:
journalctl -u log-analyzer.service -n 30 --no-pager
Ищи сообщения Starting log analysis run и Run complete. Если видишь Ollama analysis failed, возможно, модель не скачана или Ollama не запущен.
Уведомления об ошибках
Если анализатор падает, ты хочешь об этом знать. Добавь обработчик OnFailure к service-юниту:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer-failure@.service
[Unit]
Description=Log Analyzer failure notification for %i
[Service]
Type=oneshot
ExecStart=/usr/bin/curl -s -X POST ${DISCORD_WEBHOOK_URL} \
-H "Content-Type: application/json" \
-d '{"content": ":x: **Log Analyzer Failed**\nUnit: %i\nTime: %H\nCheck: journalctl -u %i"}'
EnvironmentFile=/opt/log-analyzer/.env
EOF
Добавь директиву OnFailure к основному сервису:
sudo mkdir -p /etc/systemd/system/log-analyzer.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service.d/failure.conf
[Unit]
OnFailure=log-analyzer-failure@%n.service
EOF
sudo systemctl daemon-reload
Если Python-скрипт завершается с ненулевым кодом, systemd отправит уведомление в твой Discord-канал.
Каковы ограничения LLM-анализа логов?
LLM-анализ логов дополняет rule-based алертинг, а не заменяет его. Он ловит паттерны, которые сложно выразить статическими правилами. Но у него есть реальные слабости, которые нужно знать.
Галлюцинации. Модель может пометить нормальные строки логов как аномалии или придумать объяснения для безобидных событий. Finding'и с низкой серьёзностью от LLM стоит воспринимать как подсказки, а не факты. Всегда проверяй алерты высокой серьёзности вручную.
Ограничения контекстного окна. У Gemma 2 9B окно 8192 токена. При ~20 токенах на строку лога это максимум примерно 400 строк (с учётом места для промпта и вывода). Скрипт обрезает до 200 строк для надёжности. Если сервер генерирует более 200 строк за 5 минут, нужно либо фильтровать более точными LogQL-запросами, либо принять, что часть строк пропускается.
Нет обучения со временем. У модели нет памяти между запусками. Она не может научиться тому, что определённый паттерн логов нормален для твоего окружения. Каждый батч анализируется с нуля. Если есть повторяющееся безвредное, но подозрительно выглядящее сообщение, добавь его в LogQL-фильтр исключения: {job="syslog"} != "expected noisy message".
Задержка инференса. На VPS с 4 vCPU / 8 ГБ инференс занимает 12-22 секунды на батч. Для 5-минутного таймера нормально, но для real-time алертинга слишком медленно. Для критичных по времени событий (диск полон, OOM) оставь классические Prometheus-алерты .
Ложноотрицательные. Маленькие модели пропускают тонкие паттерны. Медленная утечка памяти, порождающая слегка повышенное использование swap'а в течение дней, не проявится в 5-минутном окне логов. Используй метрики Prometheus и алерты Grafana для обнаружения на основе трендов.
Стоимость работы. Хотя нет платы за токены API, модель использует ~5,8 ГБ RAM в загруженном состоянии. На VPS с 8 ГБ это большая часть памяти. Если твоему серверу приложений нужна эта память, запусти Ollama на отдельном VPS или используй модель поменьше gemma2:2b (1,6 ГБ RAM, ниже точность).
Когда использовать это vs классический алертинг
| Сценарий | LLM-анализ логов | Классический алертинг (Prometheus) |
|---|---|---|
| «Что-то не так, но правило не написать» | Да | Нет |
| Обнаружение SSH brute-force | Да (хорош в распознавании паттернов) | Да (fail2ban быстрее) |
| Диск полон / OOM | Нет (слишком медленно) | Да |
| Неизвестные паттерны ошибок | Да | Нет |
| Превышение порога метрики | Нет | Да |
| Изменение формата логов | Да (адаптируется автоматически) | Нет (правила ломаются) |
Лучшая конфигурация запускает оба. Prometheus обрабатывает известные режимы отказа быстрыми алертами. LLM ловит неизвестные неизвестные, читая текст логов.
Устранение неполадок
Ollama возвращает "model not found": запусти ollama list для проверки доступных моделей. Скачай модель командой ollama pull gemma2:9b.
Запрос Loki возвращает пустые результаты: проверь, что Promtail работает (systemctl status promtail) и что job-лейбл в LogQL-запросе соответствует конфигурации Promtail. Протестируй curl-ом напрямую к API Loki.
Не хватает памяти: проверь RAM командой free -h. Если модель Ollama потребляет слишком много, установи OLLAMA_KEEP_ALIVE=1m в override сервиса Ollama. Переключись на gemma2:2b для меньшего потребления RAM.
Алерты Discord/Slack не приходят: протестируй URL вебхука curl-командами из раздела алертинга. Проверь HTTP-ошибки в логах анализатора: journalctl -u log-analyzer.service -n 50.
Медленный инференс: проверь, что VPS имеет ожидаемое число ядер CPU командой nproc. Ollama использует все доступные ядра для инференса. Если другой процесс потребляет CPU, инференс замедляется. Проверь top во время запуска.
Ошибки парсинга JSON: если model_validate_json падает, модель выдала невалидный JSON несмотря на принудительную схему. Это редкость, но бывает в граничных случаях. Скрипт логирует ошибку и продолжает при следующем запуске. Если повторяется, попробуй другую модель.
Смотри логи анализатора:
journalctl -u log-analyzer.service -f
Для проблем, специфичных для Ollama:
journalctl -u ollama.service -f
О следующем шаге в пайплайне AIOps — как построить автоматическое исправление, реагирующее на эти алерты . Об альтернативных подходах к наблюдаемости смотри руководство по SigNoz и OpenObserve.
Авторское право 2026 Virtua.Cloud. Все права защищены. Данный контент является оригинальным произведением команды Virtua.Cloud. Воспроизведение, повторная публикация или распространение без письменного разрешения запрещены.
Готовы попробовать?
Разверните свой сервер за секунды. Linux, Windows или FreeBSD.
Смотреть тарифы VPS