AI Log Analysis on a VPS with Ollama: Detecting Anomalies with a Local LLM

3 min read · Matthieu · slack, discord, aiops, log-analysis, python, loki, ollama

Build a production-grade AI log analysis pipeline on your VPS. Query Loki for logs, classify anomalies with a local LLM via Ollama, and send alerts to Discord or Slack using a Python script and a systemd timer.

Your Loki pipeline collects logs. Your Grafana dashboards can search them. But nobody is watching those dashboards at 3 a.m. when an SSH brute-force attack starts or a disk is about to fill up. You need something that reads the logs continuously and notifies you when something goes wrong.

That is what this tutorial builds: a Python script that queries Loki every 5 minutes, sends the log batch to a local LLM running on Ollama, gets structured JSON classifications back, and pushes anomaly alerts to Discord or Slack. No cloud AI APIs, no per-token billing, and your log data never leaves the server.

By the end, you will have a systemd timer that runs unattended, deduplicates alerts automatically, and handles failures gracefully.

Prerequisites:

  • A VPS with at least 8 GB RAM (4 vCPUs recommended). Ollama and Loki will run side by side.
  • A working Loki + Promtail log pipeline
  • Python 3.10+ installed
  • Basic familiarity with systemd and Python

This article is part of the AIOps series.

How do I install Ollama on a VPS for log analysis?

Ollama runs LLMs locally as a single binary and exposes an HTTP API on port 11434. Install it with the official script, pull a model, and verify that the API responds. The whole process takes under 5 minutes on a decent connection.

The installer needs zstd for decompression. Install it first:

apt-get update && apt-get install -y zstd

Download the install script:

curl -fsSL https://ollama.com/install.sh -o ollama-install.sh
sha256sum ollama-install.sh

Inspect the script before executing it. Read through the shell code, compare the checksum against a known value if you have one, and review the source on GitHub if you prefer. Then run it:

sh ollama-install.sh

The installer creates a systemd service named ollama. Verify that it is running:

systemctl status ollama

You should see active (running) in the output. Note the line Loaded: loaded (/etc/systemd/system/ollama.service; enabled; preset: enabled): the installer already configured it to start on boot.

Verify that the API is listening:

curl -s http://localhost:11434/api/tags | python3 -m json.tool

This returns a JSON object with a models array. It is empty until you pull a model.
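For scripting, the same check can be done in Python. This sketch parses a sample payload offline rather than calling the live API; the response shape matches Ollama's /api/tags, but the size values here are illustrative:

```python
import json

# Sample /api/tags payload; shape as returned by Ollama, sizes illustrative.
sample = json.loads("""
{
  "models": [
    {"name": "gemma2:9b", "size": 5443152417},
    {"name": "llama3.1:8b", "size": 4920753328}
  ]
}
""")

names = [m["name"] for m in sample.get("models", [])]
print(names)  # ['gemma2:9b', 'llama3.1:8b']
```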

Bind Ollama to localhost

By default, Ollama listens on 127.0.0.1:11434. Confirm it:

ss -tlnp | grep 11434

If the output shows 0.0.0.0:11434, Ollama is exposed to the internet. Fix it by setting an environment variable in a systemd drop-in:

sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/override.conf
[Service]
Environment="OLLAMA_HOST=127.0.0.1:11434"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
ss -tlnp | grep 11434

Confirm that the output now shows 127.0.0.1:11434. Exposing an LLM API to the internet means anyone can run inference on your server.

Which LLM model is best for server log anomaly detection?

For log analysis on an 8 GB VPS, you need a model that fits in memory alongside Loki and Promtail. Two models work well: Gemma 2 9B for general log classification and Llama 3.1 8B for security analysis. Both run at Q4 quantization and use roughly 5-6 GB of RAM.

Pull both models:

ollama pull gemma2:9b
ollama pull llama3.1:8b

Each download is about 5-6 GB. Verify after pulling:

ollama list

Test a quick inference to confirm the model loads:

curl -s http://localhost:11434/api/generate \
  -d '{"model": "gemma2:9b", "prompt": "Classify this log line: Failed password for root from 203.0.113.5 port 22", "stream": false}' \
  | python3 -m json.tool

Note the eval_duration in the response. That is the inference time in nanoseconds; divide by 1,000,000 to get milliseconds.
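The response also includes eval_count (tokens generated), so tokens per second can be derived from the same two fields. A quick sketch with illustrative values, not measured ones:

```python
# eval_count and eval_duration are fields of the /api/generate response;
# the values below are illustrative.
response = {"eval_count": 180, "eval_duration": 9_000_000_000}

ms = response["eval_duration"] / 1_000_000  # nanoseconds -> milliseconds
tokens_per_s = response["eval_count"] / (response["eval_duration"] / 1_000_000_000)

print(f"{ms:.0f} ms, {tokens_per_s:.1f} tokens/s")  # 9000 ms, 20.0 tokens/s
```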

Model comparison on a 4 vCPU / 8 GB VPS

The following numbers were measured on a Virtua Cloud VCS-8 (4 vCPU Ryzen, 8 GB RAM, NVMe), processing batches of 100 syslog lines with the model already loaded in memory:

Metric | Gemma 2 9B (Q4_K_M) | Llama 3.1 8B (Q4_K_M)
Model size on disk | 5.4 GB | 4.9 GB
RAM usage (loaded) | ~5.8 GB | ~5.2 GB
Time per 100-line batch | ~12-18 s | ~14-22 s
Tokens/second | ~18-25 | ~15-20
Security-log accuracy | Good | Better
General anomaly detection | Better | Good

Cold starts are slower. The first inference after Ollama loads a model from disk takes an extra 5-10 seconds; subsequent calls within the keep-alive window run at the speeds above.

Recommendation: start with gemma2:9b for general log analysis. If you mostly analyze authentication/security logs, switch to llama3.1:8b.

The 8 GB RAM budget

Component | RAM usage
OS + system processes | ~400 MB
Loki | ~300-500 MB
Promtail | ~50 MB
Ollama (idle, no model loaded) | ~30 MB
Ollama (gemma2:9b loaded) | ~5.8 GB
Python script | ~50 MB
Total | ~6.7-6.9 GB

This fits within 8 GB with roughly 1 GB of headroom. Ollama automatically unloads the model after 5 minutes of inactivity (configurable via OLLAMA_KEEP_ALIVE), freeing the RAM. The systemd timer fires every 5 minutes, so the model stays loaded during active analysis windows and unloads in between.

If memory is tight, use gemma2:9b with OLLAMA_KEEP_ALIVE=1m so the model unloads faster after each batch.
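One way to set that is through the same drop-in created earlier for the localhost binding. Note that this rewrites override.conf, so the file must carry both settings (paths match the earlier step in this guide):

```shell
sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/override.conf
[Service]
Environment="OLLAMA_HOST=127.0.0.1:11434"
Environment="OLLAMA_KEEP_ALIVE=1m"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
```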

How do I query Loki logs from a Python script?

Query Loki's HTTP API at /loki/api/v1/query_range with a LogQL expression and a time window. The API returns JSON containing log streams. Use Python's requests library to fetch the last 5 minutes of logs for a given job label.

First, set up the project:

mkdir -p /opt/log-analyzer
cd /opt/log-analyzer

Create the dependencies file:

cat <<'EOF' > /opt/log-analyzer/requirements.txt
requests>=2.31.0
pydantic>=2.5.0
ollama>=0.4.0
EOF

Install the dependencies in a virtual environment. On Ubuntu 24.04, install the python3-venv package first:

apt-get install -y python3.12-venv
python3 -m venv /opt/log-analyzer/venv
/opt/log-analyzer/venv/bin/pip install -r /opt/log-analyzer/requirements.txt

Verify the installation:

/opt/log-analyzer/venv/bin/python -c "import requests, pydantic, ollama; print('OK')"

The Loki query function

The following function queries Loki for recent logs:

import requests
from datetime import datetime, timedelta, timezone


def query_loki(
    loki_url: str,
    logql: str,
    minutes: int = 5,
    limit: int = 500,
) -> list[str]:
    """Query Loki for log lines from the last N minutes."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=minutes)

    params = {
        "query": logql,
        "start": str(int(start.timestamp() * 1e9)),  # nanosecond epoch
        "end": str(int(now.timestamp() * 1e9)),
        "limit": limit,
    }

    resp = requests.get(
        f"{loki_url}/loki/api/v1/query_range",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()

    data = resp.json()
    lines = []
    for stream in data.get("data", {}).get("result", []):
        for _ts, line in stream.get("values", []):
            lines.append(line)

    return lines

The start and end parameters take nanosecond Unix timestamps. Loki's response nests log lines under data.result[].values[], where each value is a [timestamp, line] pair.
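The timestamp conversion can be verified in isolation with the standard library; this mirrors the arithmetic in query_loki():

```python
from datetime import datetime, timedelta, timezone

# Round-trip: datetime -> nanosecond epoch string -> datetime.
now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
start = now - timedelta(minutes=5)

start_ns = str(int(start.timestamp() * 1e9))
end_ns = str(int(now.timestamp() * 1e9))
print(start_ns, end_ns)  # 19-digit nanosecond strings

back = datetime.fromtimestamp(int(end_ns) / 1e9, tz=timezone.utc)
assert back == now
```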

Example LogQL queries you will use:

# All syslog entries
SYSLOG_QUERY = '{job="syslog"}'

# Nginx error logs
NGINX_QUERY = '{job="nginx"} |= "error"'

# SSH authentication events
AUTH_QUERY = '{job="syslog"} |~ "(sshd|pam_unix)"'

Test a query against your running Loki instance:

curl -s 'http://localhost:3100/loki/api/v1/query_range' \
  --data-urlencode 'query={job="syslog"}' \
  --data-urlencode "start=$(date -d '5 minutes ago' +%s)000000000" \
  --data-urlencode "end=$(date +%s)000000000" \
  --data-urlencode 'limit=10' \
  | python3 -m json.tool | head -30

You should see log lines in the result array. If it is empty, check that Promtail is shipping logs to Loki and that the job label matches your Promtail configuration.

How do I write prompts that classify log entries as anomalies?

The prompt is the core of this system. A good prompt tells the LLM what to look for, defines classification categories, and demands structured output. A bad prompt produces vague summaries; a good one produces actionable JSON.

Three prompt templates cover most server log analysis needs: general anomaly detection, security event detection, and performance issue detection. Each embeds the classification schema inline so the model knows the expected output format.
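One formatting detail worth noting: the templates below double their literal braces ({{ and }}) so that Python's str.format substitutes only the {logs} placeholder and leaves the embedded JSON schema intact. A minimal demonstration:

```python
# Doubled braces in the template are literal { } characters after .format();
# only the {logs} placeholder is substituted.
template = 'Schema: {{"findings": []}}\nLog lines:\n{logs}'
prompt = template.format(logs="line one\nline two")
print(prompt)
```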

Prompt 1: general anomaly detection

PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.

Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of the anomaly",
      "severity": "low|medium|high|critical",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means and potential impact"
    }}
  ],
  "summary": "one sentence summary of overall log health"
}}

If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""

Prompt 2: security event detection

PROMPT_SECURITY = """You are a security analyst reviewing server logs. Identify security-relevant events.

Focus on:
- Brute-force attempts (repeated failed logins from same IP)
- Successful logins from unusual IPs or at unusual times
- Privilege escalation attempts (sudo failures, su attempts)
- Port scanning patterns
- Unauthorized access attempts to files or services

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of security event",
      "severity": "low|medium|high|critical",
      "source_ips": ["IP addresses involved"],
      "log_lines": ["the relevant log lines"],
      "recommendation": "suggested response action"
    }}
  ],
  "summary": "one sentence security posture assessment"
}}

If no security events are found, return {{"findings": [], "summary": "No security events detected."}}.
"""

Prompt 3: performance issue detection

PROMPT_PERFORMANCE = """You are a performance engineer reviewing server logs. Identify performance-related issues.

Focus on:
- High response times or timeouts
- Resource exhaustion (OOM kills, disk full, connection limits)
- Service restarts or crashes
- Queue backlogs or processing delays
- Error rate spikes

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of performance issue",
      "severity": "low|medium|high|critical",
      "affected_service": "service name if identifiable",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means for system performance"
    }}
  ],
  "summary": "one sentence performance assessment"
}}

If no performance issues are found, return {{"findings": [], "summary": "No performance issues detected."}}.
"""

Embedding the JSON schema directly in the prompt is deliberate. It gives the model two signals: the format parameter enforces valid JSON, and the schema in the prompt guides the structure. The combination makes even small models produce reliable output.

How do I get structured JSON output from Ollama?

Ollama supports structured outputs through the format parameter in its API. Pass a JSON schema and the model will only generate tokens that conform to it. Combined with Pydantic models on the Python side, you get validated, typed data from every inference call.

Define the Pydantic models:

from pydantic import BaseModel


class Finding(BaseModel):
    title: str
    severity: str  # low, medium, high, critical
    log_lines: list[str]
    explanation: str = ""
    recommendation: str = ""
    source_ips: list[str] = []
    affected_service: str = ""


class AnalysisResult(BaseModel):
    findings: list[Finding]
    summary: str

Call Ollama with schema enforcement:

from ollama import chat


def analyze_logs(
    logs: list[str],
    model: str = "gemma2:9b",
    prompt_template: str = PROMPT_GENERAL,
) -> AnalysisResult:
    """Send logs to Ollama and get structured analysis back."""
    if not logs:
        return AnalysisResult(findings=[], summary="No logs to analyze.")

    # Truncate to avoid context window issues
    log_block = "\n".join(logs[:200])
    prompt = prompt_template.format(logs=log_block)

    response = chat(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        format=AnalysisResult.model_json_schema(),
        options={"temperature": 0.1},
    )

    return AnalysisResult.model_validate_json(response.message.content)

Key details:

  • format=AnalysisResult.model_json_schema() tells Ollama to enforce the JSON schema at the token generation level. The model cannot produce output that violates the schema.
  • temperature: 0.1 keeps the output deterministic. Log classification should not be creative.
  • Truncating to 200 lines prevents context window overflow. Gemma 2 9B has an 8192-token context window; 200 log lines at roughly 20 tokens each use about half of it.
  • model_validate_json() parses the string into a typed Pydantic object. If parsing fails (rare with schema enforcement), it raises a ValidationError you can catch.
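The ~20 tokens-per-line figure can be sanity-checked with a character-based heuristic. The ~4 characters per token ratio used here is a common rule of thumb for English text, not an Ollama guarantee:

```python
def estimate_tokens(lines: list[str], chars_per_token: int = 4) -> int:
    """Rough token estimate for a batch of log lines (~4 chars/token heuristic)."""
    return len("\n".join(lines)) // chars_per_token

# 200 syslog-style lines of ~73 characters each
batch = ["Jan  1 00:00:00 host sshd[123]: Failed password for root from 203.0.113.5"] * 200
est = estimate_tokens(batch)
print(est)  # roughly 3700 tokens, comfortably inside an 8192-token window
```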

Test the call from a Python one-liner:

/opt/log-analyzer/venv/bin/python3 -c "
from ollama import chat
import json

response = chat(
    model='gemma2:9b',
    messages=[{'role': 'user', 'content': 'Analyze this log: Failed password for root from 203.0.113.5 port 44322 ssh2'}],
    format={
        'type': 'object',
        'properties': {
            'findings': {'type': 'array', 'items': {'type': 'object'}},
            'summary': {'type': 'string'}
        },
        'required': ['findings', 'summary']
    },
    options={'temperature': 0.1},
)
print(json.dumps(json.loads(response.message.content), indent=2))
"

You should see a clean JSON object with findings and summary keys. No markdown code fences, no preamble, just JSON.

How do I send anomaly alerts to Discord and Slack?

Send a POST request with a JSON payload to a webhook URL. Discord uses an embeds array with color-coded fields; Slack uses Block Kit with blocks and text fields. Both accept a single HTTPS POST.

Discord webhook

Create a webhook in your Discord server: Server Settings > Integrations > Webhooks > New Webhook. Copy the URL.

Store the webhook URLs securely:

cat <<'EOF' > /opt/log-analyzer/.env
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_ID/YOUR_TOKEN
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
LOKI_URL=http://localhost:3100
OLLAMA_MODEL=gemma2:9b
EOF
chmod 600 /opt/log-analyzer/.env

The alert-sending function:

import os
import requests

# Severity to Discord embed color (decimal)
SEVERITY_COLORS = {
    "critical": 15158332,  # red
    "high": 15105570,      # orange
    "medium": 16776960,    # yellow
    "low": 3447003,        # blue
}


def send_discord_alert(webhook_url: str, result: AnalysisResult) -> None:
    """Send findings to Discord as an embed."""
    if not result.findings:
        return

    for finding in result.findings:
        embed = {
            "title": f"[{finding.severity.upper()}] {finding.title}",
            "color": SEVERITY_COLORS.get(finding.severity, 3447003),
            "fields": [
                {
                    "name": "Explanation",
                    "value": finding.explanation or finding.recommendation or "N/A",
                    "inline": False,
                },
                {
                    "name": "Sample log lines",
                    "value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
                    "inline": False,
                },
            ],
        }
        if finding.source_ips:
            embed["fields"].append({
                "name": "Source IPs",
                "value": ", ".join(finding.source_ips),
                "inline": True,
            })

        payload = {"embeds": [embed]}
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()

Test the Discord webhook with curl:

curl -s -X POST "$DISCORD_WEBHOOK_URL" \
  -H "Content-Type: application/json" \
  -d '{
    "embeds": [{
      "title": "[HIGH] Test Alert - SSH Brute Force",
      "color": 15105570,
      "fields": [
        {"name": "Explanation", "value": "Multiple failed SSH login attempts from 203.0.113.5", "inline": false},
        {"name": "Sample log lines", "value": "```\nFailed password for root from 203.0.113.5\n```", "inline": false}
      ]
    }]
  }'

Check your Discord channel. You should see a color-coded embed message.
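The decimal values in SEVERITY_COLORS are ordinary hex RGB colors written in base 10, which is the format Discord expects for embed colors. A quick check:

```python
# Each decimal color constant is a hex RGB color converted to base 10.
assert int("E74C3C", 16) == 15158332  # red (critical)
assert int("E67E22", 16) == 15105570  # orange (high)
assert int("FFFF00", 16) == 16776960  # yellow (medium)
assert int("3498DB", 16) == 3447003   # blue (low)
print("colors check out")
```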

Slack webhook

Create a Slack app at api.slack.com/apps, enable Incoming Webhooks, and copy the webhook URL.

def send_slack_alert(webhook_url: str, result: AnalysisResult) -> None:
    """Send findings to Slack using Block Kit."""
    if not result.findings:
        return

    for finding in result.findings:
        severity_emoji = {
            "critical": ":rotating_light:",
            "high": ":warning:",
            "medium": ":large_yellow_circle:",
            "low": ":information_source:",
        }
        emoji = severity_emoji.get(finding.severity, ":grey_question:")

        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": finding.explanation or finding.recommendation or "N/A",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "```" + "\n".join(finding.log_lines[:5]) + "```",
                },
            },
        ]

        payload = {
            "text": f"[{finding.severity.upper()}] {finding.title}",
            "blocks": blocks,
        }
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()

Test the Slack webhook:

curl -s -X POST "$SLACK_WEBHOOK_URL" \
  -H "Content-Type: application/json" \
  -d '{"text": "[HIGH] Test Alert - SSH Brute Force", "blocks": [{"type": "header", "text": {"type": "plain_text", "text": ":warning: [HIGH] Test Alert"}}]}'

How do I avoid sending duplicate alerts?

Without deduplication, an SSH brute-force attack from one IP triggers an alert every 5 minutes for hours. Use a file-based cache that stores a hash of each finding's title and source. If the same hash appeared within the last hour, skip the alert.

import hashlib
import json
import time
from pathlib import Path

DEDUP_FILE = Path("/opt/log-analyzer/dedup_cache.json")
DEDUP_WINDOW = 3600  # seconds (1 hour)


def load_dedup_cache() -> dict:
    if DEDUP_FILE.exists():
        try:
            return json.loads(DEDUP_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}


def save_dedup_cache(cache: dict) -> None:
    # Prune expired entries
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
    DEDUP_FILE.write_text(json.dumps(cache))


def is_duplicate(finding: Finding) -> bool:
    """Check if this finding was already alerted recently."""
    cache = load_dedup_cache()
    now = time.time()

    # Hash on title + sorted source IPs + severity
    key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
    key = hashlib.sha256(key_material.encode()).hexdigest()[:16]

    if key in cache and now - cache[key] < DEDUP_WINDOW:
        return True

    cache[key] = now
    save_dedup_cache(cache)
    return False

The dedup cache is a JSON file mapping short hash keys to timestamps; stale entries are pruned on every save. The 1-hour window is a good default: long enough to suppress repeats, short enough to re-alert if the same issue resurfaces after a gap.
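A quick sanity check of the hashing scheme, using the same key construction as is_duplicate() (pure stdlib):

```python
import hashlib

def dedup_key(title: str, severity: str, source_ips: list[str]) -> str:
    """Same key construction as is_duplicate(): title + severity + sorted IPs."""
    key_material = f"{title}|{severity}|{'|'.join(sorted(source_ips))}"
    return hashlib.sha256(key_material.encode()).hexdigest()[:16]

# Same finding, different IP order -> same key (sorted() makes it stable):
a = dedup_key("SSH brute force", "high", ["203.0.113.5", "198.51.100.7"])
b = dedup_key("SSH brute force", "high", ["198.51.100.7", "203.0.113.5"])
assert a == b

# An escalated severity produces a new key, so the escalation alerts again:
c = dedup_key("SSH brute force", "critical", ["203.0.113.5", "198.51.100.7"])
assert c != a
```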

Set permissions on the cache file:

touch /opt/log-analyzer/dedup_cache.json
chmod 600 /opt/log-analyzer/dedup_cache.json

The complete script

The full log_analyzer.py ties everything together:

#!/usr/bin/env python3
"""AI Log Analyzer - Query Loki, classify with Ollama, alert to Discord/Slack."""

import hashlib
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests as http_requests
from ollama import chat
from pydantic import BaseModel

# --- Configuration ---

LOKI_URL = os.environ.get("LOKI_URL", "http://localhost:3100")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "gemma2:9b")
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL", "")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
LOG_QUERIES = os.environ.get(
    "LOG_QUERIES",
    '{job="syslog"};{job="nginx"} |= "error"',
).split(";")
QUERY_WINDOW_MINUTES = int(os.environ.get("QUERY_WINDOW_MINUTES", "5"))
QUERY_LIMIT = int(os.environ.get("QUERY_LIMIT", "500"))
DEDUP_FILE = Path(os.environ.get("DEDUP_FILE", "/opt/log-analyzer/dedup_cache.json"))
DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW", "3600"))

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("log-analyzer")


# --- Models ---

class Finding(BaseModel):
    title: str
    severity: str
    log_lines: list[str]
    explanation: str = ""
    recommendation: str = ""
    source_ips: list[str] = []
    affected_service: str = ""


class AnalysisResult(BaseModel):
    findings: list[Finding]
    summary: str


# --- Prompts ---

PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.

Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of the anomaly",
      "severity": "low|medium|high|critical",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means and potential impact"
    }}
  ],
  "summary": "one sentence summary of overall log health"
}}

If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""


# --- Loki ---

def query_loki(logql: str) -> list[str]:
    """Query Loki for log lines from the last N minutes."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=QUERY_WINDOW_MINUTES)

    params = {
        "query": logql,
        "start": str(int(start.timestamp() * 1e9)),
        "end": str(int(now.timestamp() * 1e9)),
        "limit": QUERY_LIMIT,
    }

    resp = http_requests.get(
        f"{LOKI_URL}/loki/api/v1/query_range",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()

    data = resp.json()
    lines = []
    for stream in data.get("data", {}).get("result", []):
        for _ts, line in stream.get("values", []):
            lines.append(line)

    return lines


# --- Ollama ---

def analyze_logs(logs: list[str]) -> AnalysisResult:
    """Send logs to Ollama and get structured analysis back."""
    if not logs:
        return AnalysisResult(findings=[], summary="No logs to analyze.")

    log_block = "\n".join(logs[:200])
    prompt = PROMPT_GENERAL.format(logs=log_block)

    response = chat(
        model=OLLAMA_MODEL,
        messages=[{"role": "user", "content": prompt}],
        format=AnalysisResult.model_json_schema(),
        options={"temperature": 0.1},
    )

    return AnalysisResult.model_validate_json(response.message.content)


# --- Deduplication ---

def load_dedup_cache() -> dict:
    if DEDUP_FILE.exists():
        try:
            return json.loads(DEDUP_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}


def save_dedup_cache(cache: dict) -> None:
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
    DEDUP_FILE.write_text(json.dumps(cache))


def is_duplicate(finding: Finding) -> bool:
    cache = load_dedup_cache()
    now = time.time()
    key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
    key = hashlib.sha256(key_material.encode()).hexdigest()[:16]

    if key in cache and now - cache[key] < DEDUP_WINDOW:
        return True

    cache[key] = now
    save_dedup_cache(cache)
    return False


# --- Alerting ---

SEVERITY_COLORS = {
    "critical": 15158332,
    "high": 15105570,
    "medium": 16776960,
    "low": 3447003,
}


def send_discord_alert(finding: Finding) -> None:
    if not DISCORD_WEBHOOK_URL:
        return

    embed = {
        "title": f"[{finding.severity.upper()}] {finding.title}",
        "color": SEVERITY_COLORS.get(finding.severity, 3447003),
        "fields": [
            {
                "name": "Explanation",
                "value": finding.explanation or finding.recommendation or "N/A",
                "inline": False,
            },
            {
                "name": "Sample log lines",
                "value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
                "inline": False,
            },
        ],
    }
    if finding.source_ips:
        embed["fields"].append({
            "name": "Source IPs",
            "value": ", ".join(finding.source_ips),
            "inline": True,
        })

    resp = http_requests.post(
        DISCORD_WEBHOOK_URL, json={"embeds": [embed]}, timeout=10
    )
    resp.raise_for_status()


def send_slack_alert(finding: Finding) -> None:
    if not SLACK_WEBHOOK_URL:
        return

    severity_emoji = {
        "critical": ":rotating_light:",
        "high": ":warning:",
        "medium": ":large_yellow_circle:",
        "low": ":information_source:",
    }
    emoji = severity_emoji.get(finding.severity, ":grey_question:")

    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": finding.explanation or finding.recommendation or "N/A",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "```" + "\n".join(finding.log_lines[:5]) + "```",
            },
        },
    ]

    resp = http_requests.post(
        SLACK_WEBHOOK_URL,
        json={
            "text": f"[{finding.severity.upper()}] {finding.title}",
            "blocks": blocks,
        },
        timeout=10,
    )
    resp.raise_for_status()


def send_alerts(finding: Finding) -> None:
    send_discord_alert(finding)
    send_slack_alert(finding)


# --- Main ---

def main() -> int:
    log.info("Starting log analysis run")
    all_lines = []

    for logql in LOG_QUERIES:
        logql = logql.strip()
        if not logql:
            continue
        try:
            lines = query_loki(logql)
            log.info("Query '%s' returned %d lines", logql, len(lines))
            all_lines.extend(lines)
        except Exception as e:
            log.error("Loki query failed for '%s': %s", logql, e)

    if not all_lines:
        log.info("No log lines to analyze")
        return 0

    log.info("Analyzing %d total log lines with %s", len(all_lines), OLLAMA_MODEL)

    try:
        result = analyze_logs(all_lines)
    except Exception as e:
        log.error("Ollama analysis failed: %s", e)
        return 1

    log.info("Analysis complete: %d findings. %s", len(result.findings), result.summary)

    alerted = 0
    for finding in result.findings:
        if is_duplicate(finding):
            log.info("Skipping duplicate: %s", finding.title)
            continue
        try:
            send_alerts(finding)
            alerted += 1
            log.info("Alerted: [%s] %s", finding.severity, finding.title)
        except Exception as e:
            log.error("Alert failed for '%s': %s", finding.title, e)

    log.info("Run complete. %d new alerts sent.", alerted)
    return 0


if __name__ == "__main__":
    sys.exit(main())

Set permissions:

chmod 750 /opt/log-analyzer/log_analyzer.py
chown root:root /opt/log-analyzer/log_analyzer.py
ls -la /opt/log-analyzer/

Verify that the output shows rwxr-x--- for the script and rw------- for the .env file.

Run a manual test:

cd /opt/log-analyzer
set -a && source .env && set +a
/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py

Check the output. You should see log lines fetched, the analysis result, and alerts sent (or skipped if no anomalies were found).

How do I run AI log analysis automatically with a systemd timer?

Create a systemd service and timer pair. The service runs the Python script with the environment variables from the .env file; the timer fires every 5 minutes. If the script fails, systemd records the failure and the next run proceeds normally.

Create the service unit:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service
[Unit]
Description=AI Log Analyzer - Ollama anomaly detection
After=network-online.target ollama.service loki.service
Wants=network-online.target

[Service]
Type=oneshot
EnvironmentFile=/opt/log-analyzer/.env
ExecStart=/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
WorkingDirectory=/opt/log-analyzer
User=root
StandardOutput=journal
StandardError=journal
TimeoutStartSec=120
EOF

TimeoutStartSec=120 gives the LLM up to 2 minutes to finish inference. On an 8 GB VPS with 100-200 log lines, inference typically completes in 15-25 seconds; the 2-minute timeout covers the case where Ollama has to load the model from disk first.

The service runs as User=root for simplicity. In production, consider creating a dedicated log-analyzer user, granting it read access to the .env file, and updating the User= directive. The script only needs HTTP access to Loki and Ollama on localhost; it requires no elevated privileges.
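That hardening can be sketched as follows. This is a sketch under the assumptions of this guide (project under /opt/log-analyzer, a system user named log-analyzer); adjust to your layout:

```shell
# Create a locked-down system user with no shell or home directory
sudo useradd --system --no-create-home --shell /usr/sbin/nologin log-analyzer

# Give it ownership of the project directory, including .env and dedup_cache.json
sudo chown -R log-analyzer:log-analyzer /opt/log-analyzer
sudo chmod 600 /opt/log-analyzer/.env

# Override only the User/Group of the existing service
sudo mkdir -p /etc/systemd/system/log-analyzer.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service.d/user.conf
[Service]
User=log-analyzer
Group=log-analyzer
EOF
sudo systemctl daemon-reload
```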

Create the timer:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.timer
[Unit]
Description=Run AI Log Analyzer every 5 minutes

[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
AccuracySec=30s

[Install]
WantedBy=timers.target
EOF

Enable and start the timer:

sudo systemctl daemon-reload
sudo systemctl enable --now log-analyzer.timer

enable --now makes the timer persist across reboots and starts it immediately.

Verify that the timer is active:

systemctl status log-analyzer.timer

You should see active (waiting) and the next trigger time. Check the last run with:

systemctl list-timers log-analyzer.timer

After the first trigger, inspect the service logs:

journalctl -u log-analyzer.service -n 30 --no-pager

Look for the Starting log analysis run and Run complete messages. If you see Ollama analysis failed, the model may not be pulled or Ollama may not be running.

Failure notifications

If the analyzer fails, you want to know about it. Add an OnFailure handler to the service unit:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer-failure@.service
[Unit]
Description=Log Analyzer failure notification for %i

[Service]
Type=oneshot
ExecStart=/usr/bin/curl -s -X POST ${DISCORD_WEBHOOK_URL} \
  -H "Content-Type: application/json" \
  -d '{"content": ":x: **Log Analyzer Failed**\nUnit: %i\nHost: %H\nCheck: journalctl -u %i"}'
EnvironmentFile=/opt/log-analyzer/.env
EOF

Add the OnFailure directive to the main service:

sudo mkdir -p /etc/systemd/system/log-analyzer.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service.d/failure.conf
[Unit]
OnFailure=log-analyzer-failure@%n.service
EOF
sudo systemctl daemon-reload

If the Python script exits with a non-zero code, systemd sends a notification to your Discord channel.

What are the limitations of LLM-based log analysis?

LLM log analysis complements rule-based alerting; it does not replace it. It catches patterns that are hard to express as static rules, but it has real weaknesses you should understand.

Hallucinations. The model may flag normal log lines as anomalies or invent explanations for benign events. Treat low-severity LLM findings as suggestions rather than facts, and always verify high-severity alerts manually.

Context window limits. Gemma 2 9B has an 8192-token context window. At roughly 20 tokens per log line, that is a maximum of about 400 lines (leaving room for the prompt and the output). The script truncates to 200 lines to be safe. If your server generates more than 200 lines in 5 minutes, filter with more specific LogQL queries or accept that some lines are skipped.
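If you would rather analyze every line than truncate, one option is to split the window into fixed-size batches and call analyze_logs once per batch, trading extra inference time for coverage. A sketch (batch_lines is a hypothetical helper, not part of the script above):

```python
def batch_lines(lines: list[str], batch_size: int = 200) -> list[list[str]]:
    """Split a log window into fixed-size batches so no lines are dropped."""
    return [lines[i:i + batch_size] for i in range(0, len(lines), batch_size)]

batches = batch_lines([f"line {i}" for i in range(450)])
print([len(b) for b in batches])  # [200, 200, 50]
```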

No learning over time. The model has no memory between runs. It cannot learn that a particular log pattern is normal in your environment; every batch is analyzed from scratch. If a recurring log message is benign but looks suspicious, add it to a LogQL exclusion filter: {job="syslog"} != "expected noisy message".

Inference latency. On a 4 vCPU / 8 GB VPS, each batch takes 12-22 seconds of inference. That is fine for a 5-minute timer but too slow for real-time alerting. Keep traditional Prometheus alerts for time-sensitive events (disk full, OOM).

False negatives. Small models miss subtle patterns. A slow memory leak producing slightly elevated swap usage over days will not show up in a 5-minute log window. Use Prometheus metrics and Grafana alerts for trend-based detection.

Operating cost. There is no per-token API fee, but the loaded model uses ~5.8 GB of RAM, which is most of the memory on an 8 GB VPS. If your application server needs that RAM, run Ollama on a separate VPS or use the smaller gemma2:2b model (1.6 GB RAM, lower accuracy).

When to use this vs. traditional alerting

Use case | LLM log analysis | Traditional alerting (Prometheus)
"Something looks wrong but I can't write a rule for it" | Yes | No
SSH brute-force detection | Yes (good at pattern recognition) | Yes (fail2ban is faster)
Disk full / OOM | No (too slow) | Yes
Unknown error patterns | Yes | No
Metric threshold breaches | No | Yes
Log format changes | Yes (adapts automatically) | No (rules break)

The best setup runs both. Prometheus handles known failure modes with fast alerts; the LLM catches the unknown unknowns by reading the actual log text.

Troubleshooting

Ollama returns "model not found": run ollama list to see the available models, and pull the missing one with ollama pull gemma2:9b.

Loki queries return empty results: check that Promtail is running (systemctl status promtail) and that the job label in your LogQL query matches the Promtail configuration. Test the Loki API directly with curl.

Out of memory: check RAM with free -h. If Ollama's loaded model is consuming too much, set OLLAMA_KEEP_ALIVE=1m in the Ollama service override. Consider switching to gemma2:2b to reduce RAM usage.

Discord/Slack alerts not arriving: test the webhook URLs with the curl commands from the alerting section. Check the analyzer logs for HTTP errors: journalctl -u log-analyzer.service -n 50.

Slow inference: verify that the VPS has the expected number of CPU cores with nproc. Ollama uses every available core for inference; if another process is hogging the CPU, inference slows down. Check with top during a run.

JSON parse errors: if model_validate_json fails, the model produced invalid JSON despite schema enforcement. This is rare but can happen in edge cases. The script logs the error and continues with the next run; if it happens repeatedly, try a different model.

Watch the analyzer logs:

journalctl -u log-analyzer.service -f

For Ollama-related issues:

journalctl -u ollama.service -f

For the next step in the AIOps pipeline, see how to build an auto-remediation system that acts on these alerts. For alternative observability stacks, see the SigNoz and OpenObserve guides.


Copyright 2026 Virtua.Cloud. All rights reserved. This content is original work by the Virtua.Cloud team. Reproduction, republication, or redistribution without written permission is prohibited.
