AI Log Analysis on a VPS with Ollama: Anomaly Detection with a Local LLM
Build a production-grade AI log analysis pipeline on your VPS. Query Loki for logs, classify anomalies with a local LLM running on Ollama, and send alerts to Discord or Slack using a Python script and a systemd timer.
Your Loki pipeline collects logs. Grafana dashboards can search them. But nobody is watching those dashboards when an SSH brute-force attack starts at 3 a.m. or a disk is nearly full. You need something that reads the logs continuously and notifies you when things go wrong.
That is what this tutorial builds. A Python script queries Loki every 5 minutes, sends batches of logs to a local LLM running on Ollama, gets structured JSON classifications back, and pushes anomaly alerts to Discord or Slack. No cloud AI APIs, no per-token billing, and your log data never leaves the server.
By the end, you will have a systemd timer that runs unattended, deduplicates alerts automatically, and handles failures gracefully.
Prerequisites:
- A VPS with at least 8 GB RAM (4 vCPUs recommended). Ollama and Loki will run side by side.
- A working Loki + Promtail log pipeline
- Python 3.10+ installed
- Basic familiarity with systemd and Python
This article is part of our AIOps series.
How do I install Ollama on a VPS for log analysis?
Ollama runs LLMs locally as a single binary and exposes an HTTP API on port 11434. Install it with the official script, pull a model, and verify the API responds. The whole process takes under 5 minutes on a decent connection.
The installer needs zstd for decompression. Install it first:
apt-get update && apt-get install -y zstd
Download and run the install script:
curl -fsSL https://ollama.com/install.sh -o ollama-install.sh
sha256sum ollama-install.sh
Inspect the script before executing it. Read through the shell code, and compare the checksum if a known value is available. You can also review the source on GitHub. Then run it:
sh ollama-install.sh
The installer creates a systemd service named ollama. Verify it is running:
systemctl status ollama
You should see active (running) in the output. Note the line Loaded: loaded (/etc/systemd/system/ollama.service; enabled; preset: enabled). The installer has already enabled the service to start at boot.
Verify the API is listening:
curl -s http://localhost:11434/api/tags | python3 -m json.tool
This returns a JSON object containing a models array. It is empty until you pull a model.
Bind Ollama to localhost
By default, Ollama listens on 127.0.0.1:11434. Confirm this:
ss -tlnp | grep 11434
If the output shows 0.0.0.0:11434, Ollama is exposed to the internet. Fix it by setting an environment variable in a systemd override:
sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/override.conf
[Service]
Environment="OLLAMA_HOST=127.0.0.1:11434"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
ss -tlnp | grep 11434
Confirm the output now shows 127.0.0.1:11434. Exposing an LLM API to the internet means anyone can run inference on your server.
Which LLM model is best for server log anomaly detection?
For log analysis on an 8 GB VPS, you need a model that fits in memory alongside Loki and Promtail. Two models work well: Gemma 2 9B for general log classification and Llama 3.1 8B for security analysis. Both run at Q4 quantization and use roughly 5-6 GB of RAM.
Pull both models:
ollama pull gemma2:9b
ollama pull llama3.1:8b
Each download is roughly 5-6 GB. Verify after pulling:
ollama list
Test a quick inference to confirm the model loads:
curl -s http://localhost:11434/api/generate \
-d '{"model": "gemma2:9b", "prompt": "Classify this log line: Failed password for root from 203.0.113.5 port 22", "stream": false}' \
| python3 -m json.tool
Note the eval_duration field in the response. That is the inference time in nanoseconds; divide by 1,000,000 to get milliseconds.
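To turn those timing fields into readable numbers, a few lines of Python suffice. The values below are illustrative stand-ins, not measurements; a real /api/generate response also carries an eval_count field (tokens generated), which gives you tokens per second:

```python
# Convert Ollama's /api/generate timing fields into human-readable numbers.
# The values below are illustrative; substitute the fields from a real response.
resp = {"eval_count": 310, "eval_duration": 14_200_000_000}  # eval_duration is in ns

ms = resp["eval_duration"] / 1_000_000                        # nanoseconds -> milliseconds
tokens_per_sec = resp["eval_count"] / (resp["eval_duration"] / 1_000_000_000)
print(f"inference: {ms:.0f} ms, {tokens_per_sec:.1f} tokens/s")
```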
Model comparison on a 4 vCPU / 8 GB VPS
The numbers below were measured on a Virtua Cloud VCS-8 (4 vCPU Ryzen, 8 GB RAM, NVMe), processing batches of 100 syslog lines with the model already loaded in memory:
| Metric | Gemma 2 9B (Q4_K_M) | Llama 3.1 8B (Q4_K_M) |
|---|---|---|
| Model size on disk | 5.4 GB | 4.9 GB |
| RAM usage (loaded) | ~5.8 GB | ~5.2 GB |
| Time per 100-line batch | ~12-18 s | ~14-22 s |
| Tokens/second | ~18-25 | ~15-20 |
| Security log accuracy | Good | Better |
| General anomaly detection | Better | Good |
Cold starts are slower. The first inference after Ollama loads a model from disk takes an extra 5-10 seconds. Subsequent calls within the keep-alive window run at the speeds above.
Recommendation: start with gemma2:9b for general log analysis. If you mostly analyze authentication/security logs, switch to llama3.1:8b.
The 8 GB RAM budget
| Component | RAM usage |
|---|---|
| OS + system processes | ~400 MB |
| Loki | ~300-500 MB |
| Promtail | ~50 MB |
| Ollama (idle, no model loaded) | ~30 MB |
| Ollama (gemma2:9b loaded) | ~5.8 GB |
| Python script | ~50 MB |
| Total | ~6.7-6.9 GB |
This fits in 8 GB with roughly 1 GB of headroom. Ollama unloads the model automatically after 5 minutes of inactivity (configurable via OLLAMA_KEEP_ALIVE), freeing the RAM. The systemd timer fires every 5 minutes, so the model stays loaded during active analysis windows and unloads in between.
If memory is tight, use gemma2:9b with OLLAMA_KEEP_ALIVE=1m so the model unloads faster after each batch.
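One way to set this, sketched here as a second systemd drop-in alongside the OLLAMA_HOST override shown earlier (the keepalive.conf filename and the 1m value are suggestions; tune the value to your timer interval):

```shell
sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/keepalive.conf
[Service]
Environment="OLLAMA_KEEP_ALIVE=1m"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
```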
How do I query Loki logs from a Python script?
Query Loki's HTTP API at /loki/api/v1/query_range with a LogQL expression and a time window. The API returns JSON containing log streams. Use Python's requests library to fetch the last 5 minutes of logs for a given job label.
First, set up the project:
mkdir -p /opt/log-analyzer
cd /opt/log-analyzer
Create the dependency file:
cat <<'EOF' > /opt/log-analyzer/requirements.txt
requests>=2.31.0
pydantic>=2.5.0
ollama>=0.4.0
EOF
Install the dependencies in a virtual environment. On Ubuntu 24.04, install the python3-venv package first:
apt-get install -y python3.12-venv
python3 -m venv /opt/log-analyzer/venv
/opt/log-analyzer/venv/bin/pip install -r /opt/log-analyzer/requirements.txt
Verify the installation:
/opt/log-analyzer/venv/bin/python -c "import requests, pydantic, ollama; print('OK')"
The Loki query function
The following function queries Loki for recent logs:
import requests
from datetime import datetime, timedelta, timezone
def query_loki(
loki_url: str,
logql: str,
minutes: int = 5,
limit: int = 500,
) -> list[str]:
"""Query Loki for log lines from the last N minutes."""
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=minutes)
params = {
"query": logql,
"start": str(int(start.timestamp() * 1e9)), # nanosecond epoch
"end": str(int(now.timestamp() * 1e9)),
"limit": limit,
}
resp = requests.get(
f"{loki_url}/loki/api/v1/query_range",
params=params,
timeout=10,
)
resp.raise_for_status()
data = resp.json()
lines = []
for stream in data.get("data", {}).get("result", []):
for _ts, line in stream.get("values", []):
lines.append(line)
return lines
The start and end parameters use nanosecond Unix timestamps. Loki's response nests log lines under data.result[].values[], where each value is a [timestamp, line] pair.
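The nesting is easier to see with a concrete response. The sample below is a trimmed, hand-written illustration of the query_range JSON shape (the log lines and timestamps are made up), flattened the same way query_loki does:

```python
import json

# Trimmed, hand-written example of Loki's query_range response shape
sample = json.loads("""
{
  "status": "success",
  "data": {
    "resultType": "streams",
    "result": [
      {
        "stream": {"job": "syslog"},
        "values": [
          ["1700000000000000000", "Failed password for root from 203.0.113.5"],
          ["1700000001000000000", "Accepted publickey for deploy from 198.51.100.7"]
        ]
      }
    ]
  }
}
""")

# Flatten data.result[].values[] into plain log lines
lines = [line for stream in sample["data"]["result"] for _ts, line in stream["values"]]
print(lines)
```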
Example LogQL queries you will use:
# All syslog entries
SYSLOG_QUERY = '{job="syslog"}'
# Nginx error logs
NGINX_QUERY = '{job="nginx"} |= "error"'
# SSH authentication events
AUTH_QUERY = '{job="syslog"} |~ "(sshd|pam_unix)"'
Test a query against your running Loki instance:
curl -s 'http://localhost:3100/loki/api/v1/query_range' \
--data-urlencode 'query={job="syslog"}' \
--data-urlencode "start=$(date -d '5 minutes ago' +%s)000000000" \
--data-urlencode "end=$(date +%s)000000000" \
--data-urlencode 'limit=10' \
| python3 -m json.tool | head -30
You should see log lines in the result array. If it is empty, check that Promtail is shipping logs to Loki and that the job label matches your Promtail configuration.
How do I write prompts that classify log entries as anomalies?
The prompt is the heart of this system. A good prompt tells the LLM what to look for, defines classification categories, and demands structured output. A bad prompt produces vague summaries. A good prompt produces actionable JSON.
Three prompt templates cover most server log analysis needs: general anomaly detection, security event detection, and performance issue detection. Each prompt includes the classification schema inline so the model knows the expected output format.
Prompt 1: general anomaly detection
PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.
Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of the anomaly",
"severity": "low|medium|high|critical",
"log_lines": ["the relevant log lines"],
"explanation": "what this means and potential impact"
}}
],
"summary": "one sentence summary of overall log health"
}}
If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""
Prompt 2: security event detection
PROMPT_SECURITY = """You are a security analyst reviewing server logs. Identify security-relevant events.
Focus on:
- Brute-force attempts (repeated failed logins from same IP)
- Successful logins from unusual IPs or at unusual times
- Privilege escalation attempts (sudo failures, su attempts)
- Port scanning patterns
- Unauthorized access attempts to files or services
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of security event",
"severity": "low|medium|high|critical",
"source_ips": ["IP addresses involved"],
"log_lines": ["the relevant log lines"],
"recommendation": "suggested response action"
}}
],
"summary": "one sentence security posture assessment"
}}
If no security events are found, return {{"findings": [], "summary": "No security events detected."}}.
"""
Prompt 3: performance issue detection
PROMPT_PERFORMANCE = """You are a performance engineer reviewing server logs. Identify performance-related issues.
Focus on:
- High response times or timeouts
- Resource exhaustion (OOM kills, disk full, connection limits)
- Service restarts or crashes
- Queue backlogs or processing delays
- Error rate spikes
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of performance issue",
"severity": "low|medium|high|critical",
"affected_service": "service name if identifiable",
"log_lines": ["the relevant log lines"],
"explanation": "what this means for system performance"
}}
],
"summary": "one sentence performance assessment"
}}
If no performance issues are found, return {{"findings": [], "summary": "No performance issues detected."}}.
"""
Embedding the JSON schema directly in the prompt is deliberate. It gives the model two signals: the format parameter forces valid JSON output, and the schema in the prompt guides the structure. The combination makes even small models produce reliable output.
How do I get structured JSON output from Ollama?
Ollama supports structured output via the format parameter in its API. Pass a JSON schema and the model will only generate tokens that conform to it. Combined with Pydantic models on the Python side, you get validated, typed data from every inference call.
Define the Pydantic models:
from pydantic import BaseModel
class Finding(BaseModel):
title: str
severity: str # low, medium, high, critical
log_lines: list[str]
explanation: str = ""
recommendation: str = ""
source_ips: list[str] = []
affected_service: str = ""
class AnalysisResult(BaseModel):
findings: list[Finding]
summary: str
Call Ollama with the schema enforced:
from ollama import chat
def analyze_logs(
logs: list[str],
model: str = "gemma2:9b",
prompt_template: str = PROMPT_GENERAL,
) -> AnalysisResult:
"""Send logs to Ollama and get structured analysis back."""
if not logs:
return AnalysisResult(findings=[], summary="No logs to analyze.")
# Truncate to avoid context window issues
log_block = "\n".join(logs[:200])
prompt = prompt_template.format(logs=log_block)
response = chat(
model=model,
messages=[{"role": "user", "content": prompt}],
format=AnalysisResult.model_json_schema(),
options={"temperature": 0.1},
)
return AnalysisResult.model_validate_json(response.message.content)
Key details:
- format=AnalysisResult.model_json_schema() tells Ollama to enforce the JSON schema at the token-generation level. The model cannot produce output that violates the schema.
- temperature: 0.1 keeps the output deterministic. Log classification should not be creative.
- Truncating to 200 lines prevents context-window overflow. Gemma 2 9B has an 8192-token context window; 200 log lines at roughly 20 tokens each use about half of it.
- model_validate_json() parses the string into a typed Pydantic object. If parsing fails (rare under schema enforcement), it raises a ValidationError you can catch.
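A quick back-of-the-envelope check shows why the 200-line cap is conservative. The 20 tokens/line average and the 1000 reserved tokens are rough assumptions, not measured values:

```python
# Rough context-budget estimate for the 200-line truncation
CONTEXT_WINDOW = 8192     # gemma2:9b context size in tokens
TOKENS_PER_LINE = 20      # rough average for syslog-style lines (assumption)
RESERVED = 1000           # headroom for the prompt template and JSON output (assumption)

max_lines = (CONTEXT_WINDOW - RESERVED) // TOKENS_PER_LINE
print(max_lines)  # well above the 200-line cap used by the script
```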
Test the function from a Python shell:
/opt/log-analyzer/venv/bin/python3 -c "
from ollama import chat
import json
response = chat(
model='gemma2:9b',
messages=[{'role': 'user', 'content': 'Analyze this log: Failed password for root from 203.0.113.5 port 44322 ssh2'}],
format={
'type': 'object',
'properties': {
'findings': {'type': 'array', 'items': {'type': 'object'}},
'summary': {'type': 'string'}
},
'required': ['findings', 'summary']
},
options={'temperature': 0.1},
)
print(json.dumps(json.loads(response.message.content), indent=2))
"
You should see a clean JSON object with findings and summary keys. No markdown code fences, no preamble, just JSON.
How do I send anomaly alerts to Discord and Slack?
Send a POST request with a JSON payload to a webhook URL. Discord uses an embeds array with color-coded fields. Slack uses Block Kit with blocks and text fields. Both accept a single HTTPS POST.
Discord webhook
Create a webhook in your Discord server: Server Settings > Integrations > Webhooks > New Webhook. Copy the URL.
Store the webhook URLs securely:
cat <<'EOF' > /opt/log-analyzer/.env
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_ID/YOUR_TOKEN
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
LOKI_URL=http://localhost:3100
OLLAMA_MODEL=gemma2:9b
EOF
chmod 600 /opt/log-analyzer/.env
The alert-sending function:
import os
import requests
# Severity to Discord embed color (decimal)
SEVERITY_COLORS = {
"critical": 15158332, # red
"high": 15105570, # orange
"medium": 16776960, # yellow
"low": 3447003, # blue
}
def send_discord_alert(webhook_url: str, result: AnalysisResult) -> None:
"""Send findings to Discord as an embed."""
if not result.findings:
return
for finding in result.findings:
embed = {
"title": f"[{finding.severity.upper()}] {finding.title}",
"color": SEVERITY_COLORS.get(finding.severity, 3447003),
"fields": [
{
"name": "Explanation",
"value": finding.explanation or finding.recommendation or "N/A",
"inline": False,
},
{
"name": "Sample log lines",
"value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
"inline": False,
},
],
}
if finding.source_ips:
embed["fields"].append({
"name": "Source IPs",
"value": ", ".join(finding.source_ips),
"inline": True,
})
payload = {"embeds": [embed]}
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
Test the Discord webhook with curl:
curl -s -X POST "$DISCORD_WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d '{
"embeds": [{
"title": "[HIGH] Test Alert - SSH Brute Force",
"color": 15105570,
"fields": [
{"name": "Explanation", "value": "Multiple failed SSH login attempts from 203.0.113.5", "inline": false},
{"name": "Sample log lines", "value": "```\nFailed password for root from 203.0.113.5\n```", "inline": false}
]
}]
}'
Check your Discord channel. You should see a color-coded embed message.
Slack webhook
Create a Slack app at api.slack.com/apps, enable Incoming Webhooks, and copy the webhook URL.
def send_slack_alert(webhook_url: str, result: AnalysisResult) -> None:
"""Send findings to Slack using Block Kit."""
if not result.findings:
return
for finding in result.findings:
severity_emoji = {
"critical": ":rotating_light:",
"high": ":warning:",
"medium": ":large_yellow_circle:",
"low": ":information_source:",
}
emoji = severity_emoji.get(finding.severity, ":grey_question:")
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": finding.explanation or finding.recommendation or "N/A",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "```" + "\n".join(finding.log_lines[:5]) + "```",
},
},
]
payload = {
"text": f"[{finding.severity.upper()}] {finding.title}",
"blocks": blocks,
}
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
Test the Slack webhook:
curl -s -X POST "$SLACK_WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d '{"text": "[HIGH] Test Alert - SSH Brute Force", "blocks": [{"type": "header", "text": {"type": "plain_text", "text": ":warning: [HIGH] Test Alert"}}]}'
How do I avoid sending duplicate alerts?
Without deduplication, an SSH brute-force attack from the same IP triggers an alert every 5 minutes for hours. Use a file-based cache that stores a hash of each finding's title and source. If the same hash appeared within the last hour, skip the alert.
import hashlib
import json
import time
from pathlib import Path
DEDUP_FILE = Path("/opt/log-analyzer/dedup_cache.json")
DEDUP_WINDOW = 3600 # seconds (1 hour)
def load_dedup_cache() -> dict:
if DEDUP_FILE.exists():
try:
return json.loads(DEDUP_FILE.read_text())
except (json.JSONDecodeError, OSError):
return {}
return {}
def save_dedup_cache(cache: dict) -> None:
# Prune expired entries
now = time.time()
cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
DEDUP_FILE.write_text(json.dumps(cache))
def is_duplicate(finding: Finding) -> bool:
"""Check if this finding was already alerted recently."""
cache = load_dedup_cache()
now = time.time()
# Hash on title + sorted source IPs + severity
key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
key = hashlib.sha256(key_material.encode()).hexdigest()[:16]
if key in cache and now - cache[key] < DEDUP_WINDOW:
return True
cache[key] = now
save_dedup_cache(cache)
return False
The dedup cache is a JSON file mapping short hash keys to timestamps. Stale entries are pruned on every save. A 1-hour window is a good default: long enough to suppress repeats, short enough to re-alert if the same problem resurfaces after a gap.
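One property worth noting: the key depends only on the title, severity, and the sorted source IPs, so the same event hashes identically across runs even if the LLM lists the IPs in a different order. A minimal standalone sketch of the key construction (the dedup_key helper is extracted here for illustration):

```python
import hashlib

def dedup_key(title: str, severity: str, source_ips: list[str]) -> str:
    """Build the same 16-hex-char key that is_duplicate uses."""
    material = f"{title}|{severity}|{'|'.join(sorted(source_ips))}"
    return hashlib.sha256(material.encode()).hexdigest()[:16]

a = dedup_key("SSH brute force", "high", ["203.0.113.5", "203.0.113.9"])
b = dedup_key("SSH brute force", "high", ["203.0.113.9", "203.0.113.5"])  # IPs reordered
print(a == b)  # identical keys, so the second finding is deduplicated
```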
Set permissions on the cache file:
touch /opt/log-analyzer/dedup_cache.json
chmod 600 /opt/log-analyzer/dedup_cache.json
The complete script
The full log_analyzer.py ties everything together:
#!/usr/bin/env python3
"""AI Log Analyzer - Query Loki, classify with Ollama, alert to Discord/Slack."""
import hashlib
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests as http_requests
from ollama import chat
from pydantic import BaseModel
# --- Configuration ---
LOKI_URL = os.environ.get("LOKI_URL", "http://localhost:3100")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "gemma2:9b")
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL", "")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
LOG_QUERIES = os.environ.get(
"LOG_QUERIES",
'{job="syslog"};{job="nginx"} |= "error"',
).split(";")
QUERY_WINDOW_MINUTES = int(os.environ.get("QUERY_WINDOW_MINUTES", "5"))
QUERY_LIMIT = int(os.environ.get("QUERY_LIMIT", "500"))
DEDUP_FILE = Path(os.environ.get("DEDUP_FILE", "/opt/log-analyzer/dedup_cache.json"))
DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW", "3600"))
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("log-analyzer")
# --- Models ---
class Finding(BaseModel):
title: str
severity: str
log_lines: list[str]
explanation: str = ""
recommendation: str = ""
source_ips: list[str] = []
affected_service: str = ""
class AnalysisResult(BaseModel):
findings: list[Finding]
summary: str
# --- Prompts ---
PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.
Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".
Log lines:
{logs}
Respond with a JSON object matching this schema:
{{
"findings": [
{{
"title": "short description of the anomaly",
"severity": "low|medium|high|critical",
"log_lines": ["the relevant log lines"],
"explanation": "what this means and potential impact"
}}
],
"summary": "one sentence summary of overall log health"
}}
If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""
# --- Loki ---
def query_loki(logql: str) -> list[str]:
"""Query Loki for log lines from the last N minutes."""
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=QUERY_WINDOW_MINUTES)
params = {
"query": logql,
"start": str(int(start.timestamp() * 1e9)),
"end": str(int(now.timestamp() * 1e9)),
"limit": QUERY_LIMIT,
}
resp = http_requests.get(
f"{LOKI_URL}/loki/api/v1/query_range",
params=params,
timeout=10,
)
resp.raise_for_status()
data = resp.json()
lines = []
for stream in data.get("data", {}).get("result", []):
for _ts, line in stream.get("values", []):
lines.append(line)
return lines
# --- Ollama ---
def analyze_logs(logs: list[str]) -> AnalysisResult:
"""Send logs to Ollama and get structured analysis back."""
if not logs:
return AnalysisResult(findings=[], summary="No logs to analyze.")
log_block = "\n".join(logs[:200])
prompt = PROMPT_GENERAL.format(logs=log_block)
response = chat(
model=OLLAMA_MODEL,
messages=[{"role": "user", "content": prompt}],
format=AnalysisResult.model_json_schema(),
options={"temperature": 0.1},
)
return AnalysisResult.model_validate_json(response.message.content)
# --- Deduplication ---
def load_dedup_cache() -> dict:
if DEDUP_FILE.exists():
try:
return json.loads(DEDUP_FILE.read_text())
except (json.JSONDecodeError, OSError):
return {}
return {}
def save_dedup_cache(cache: dict) -> None:
now = time.time()
cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
DEDUP_FILE.write_text(json.dumps(cache))
def is_duplicate(finding: Finding) -> bool:
cache = load_dedup_cache()
now = time.time()
key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
key = hashlib.sha256(key_material.encode()).hexdigest()[:16]
if key in cache and now - cache[key] < DEDUP_WINDOW:
return True
cache[key] = now
save_dedup_cache(cache)
return False
# --- Alerting ---
SEVERITY_COLORS = {
"critical": 15158332,
"high": 15105570,
"medium": 16776960,
"low": 3447003,
}
def send_discord_alert(finding: Finding) -> None:
if not DISCORD_WEBHOOK_URL:
return
embed = {
"title": f"[{finding.severity.upper()}] {finding.title}",
"color": SEVERITY_COLORS.get(finding.severity, 3447003),
"fields": [
{
"name": "Explanation",
"value": finding.explanation or finding.recommendation or "N/A",
"inline": False,
},
{
"name": "Sample log lines",
"value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
"inline": False,
},
],
}
if finding.source_ips:
embed["fields"].append({
"name": "Source IPs",
"value": ", ".join(finding.source_ips),
"inline": True,
})
resp = http_requests.post(
DISCORD_WEBHOOK_URL, json={"embeds": [embed]}, timeout=10
)
resp.raise_for_status()
def send_slack_alert(finding: Finding) -> None:
if not SLACK_WEBHOOK_URL:
return
severity_emoji = {
"critical": ":rotating_light:",
"high": ":warning:",
"medium": ":large_yellow_circle:",
"low": ":information_source:",
}
emoji = severity_emoji.get(finding.severity, ":grey_question:")
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": finding.explanation or finding.recommendation or "N/A",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "```" + "\n".join(finding.log_lines[:5]) + "```",
},
},
]
resp = http_requests.post(
SLACK_WEBHOOK_URL,
json={
"text": f"[{finding.severity.upper()}] {finding.title}",
"blocks": blocks,
},
timeout=10,
)
resp.raise_for_status()
def send_alerts(finding: Finding) -> None:
send_discord_alert(finding)
send_slack_alert(finding)
# --- Main ---
def main() -> int:
log.info("Starting log analysis run")
all_lines = []
for logql in LOG_QUERIES:
logql = logql.strip()
if not logql:
continue
try:
lines = query_loki(logql)
log.info("Query '%s' returned %d lines", logql, len(lines))
all_lines.extend(lines)
except Exception as e:
log.error("Loki query failed for '%s': %s", logql, e)
if not all_lines:
log.info("No log lines to analyze")
return 0
log.info("Analyzing %d total log lines with %s", len(all_lines), OLLAMA_MODEL)
try:
result = analyze_logs(all_lines)
except Exception as e:
log.error("Ollama analysis failed: %s", e)
return 1
log.info("Analysis complete: %d findings. %s", len(result.findings), result.summary)
alerted = 0
for finding in result.findings:
if is_duplicate(finding):
log.info("Skipping duplicate: %s", finding.title)
continue
try:
send_alerts(finding)
alerted += 1
log.info("Alerted: [%s] %s", finding.severity, finding.title)
except Exception as e:
log.error("Alert failed for '%s': %s", finding.title, e)
log.info("Run complete. %d new alerts sent.", alerted)
return 0
if __name__ == "__main__":
sys.exit(main())
Set permissions:
chmod 750 /opt/log-analyzer/log_analyzer.py
chown root:root /opt/log-analyzer/log_analyzer.py
ls -la /opt/log-analyzer/
Verify the output shows rwxr-x--- for the script and rw------- for the .env file.
Run a manual test:
cd /opt/log-analyzer
set -a && source .env && set +a
/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
Check the output. You should see fetched log lines, the analysis result, and alerts sent (or skips if no anomalies were found).
How do I automate AI log analysis with a systemd timer?
Create a systemd service and timer pair. The service runs the Python script with environment variables from the .env file. The timer fires every 5 minutes. If the script fails, systemd records the failure and the next run proceeds normally.
Create the service unit:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service
[Unit]
Description=AI Log Analyzer - Ollama anomaly detection
After=network-online.target ollama.service loki.service
Wants=network-online.target
[Service]
Type=oneshot
EnvironmentFile=/opt/log-analyzer/.env
ExecStart=/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
WorkingDirectory=/opt/log-analyzer
User=root
StandardOutput=journal
StandardError=journal
TimeoutStartSec=120
EOF
TimeoutStartSec=120 gives the LLM up to 2 minutes to finish inference. On an 8 GB VPS processing 100-200 log lines, inference usually completes in 15-25 seconds. The 2-minute timeout covers the case where Ollama has to load the model from disk first.
The service runs as User=root for simplicity. In production, consider creating a dedicated log-analyzer user, granting it read access to the .env file, and updating the User= directive. The script only needs HTTP access to Loki and Ollama on localhost, not elevated privileges.
Create the timer:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.timer
[Unit]
Description=Run AI Log Analyzer every 5 minutes
[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
AccuracySec=30s
[Install]
WantedBy=timers.target
EOF
Enable and start the timer:
sudo systemctl daemon-reload
sudo systemctl enable --now log-analyzer.timer
enable --now makes the timer persist across reboots and starts it immediately.
Verify the timer is active:
systemctl status log-analyzer.timer
You should see active (waiting) and the next trigger time. Check the last run time with:
systemctl list-timers log-analyzer.timer
After the first trigger, check the service logs:
journalctl -u log-analyzer.service -n 30 --no-pager
Look for the Starting log analysis run and Run complete messages. If you see Ollama analysis failed, the model is probably not pulled or Ollama is not running.
Failure notifications
If the analyzer fails, you want to know about it. Add an OnFailure handler to the service unit:
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer-failure@.service
[Unit]
Description=Log Analyzer failure notification for %i
[Service]
Type=oneshot
ExecStart=/usr/bin/curl -s -X POST ${DISCORD_WEBHOOK_URL} \
-H "Content-Type: application/json" \
-d '{"content": ":x: **Log Analyzer Failed**\nUnit: %i\nHost: %H\nCheck: journalctl -u %i"}'
EnvironmentFile=/opt/log-analyzer/.env
EOF
Add the OnFailure directive to the main service:
sudo mkdir -p /etc/systemd/system/log-analyzer.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service.d/failure.conf
[Unit]
OnFailure=log-analyzer-failure@%n.service
EOF
sudo systemctl daemon-reload
If the Python script exits with a non-zero code, systemd sends a notification to your Discord channel.
What are the limitations of LLM-based log analysis?
LLM log analysis complements rule-based alerting; it does not replace it. It catches patterns that are hard to express as static rules, but it has real weaknesses you should understand.
Hallucination. The model can flag normal log lines as anomalies or invent explanations for benign events. Treat low-severity LLM findings as suggestions, not facts. Always verify high-severity alerts manually.
Context window limits. Gemma 2 9B has an 8192-token context window. At roughly 20 tokens per log line, that is about 400 lines maximum (leaving room for the prompt and output). The script truncates to 200 lines to be safe. If your server generates more than 200 lines in 5 minutes, filter with more specific LogQL queries or accept that some lines are skipped.
No learning over time. The model has no memory between runs. It cannot learn that a particular log pattern is normal in your environment; every batch is analyzed from scratch. If you have a recurring benign-but-suspicious-looking log message, add it to a LogQL exclusion filter: {job="syslog"} != "expected noisy message".
Inference latency. On a 4 vCPU / 8 GB VPS, each batch takes 12-22 seconds of inference. That is fine for a 5-minute timer but too slow for real-time alerting. Keep traditional Prometheus alerts for time-critical events (disk full, OOM).
False negatives. Small models miss subtle patterns. A slow memory leak producing slightly elevated swap usage over days will not show up in a 5-minute log window. Use Prometheus metrics and Grafana alerts for trend-based detection.
Running cost. There are no per-token API fees, but the loaded model uses about 5.8 GB of RAM. On an 8 GB VPS, that is most of your memory. If your application server needs that RAM, run Ollama on a separate VPS or use the smaller gemma2:2b model (1.6 GB RAM, lower accuracy).
When to use this vs. traditional alerting
| Use case | LLM log analysis | Traditional alerting (Prometheus) |
|---|---|---|
| "Something looks wrong but I can't write a rule for it" | Yes | No |
| SSH brute-force detection | Yes (good at patterns) | Yes (fail2ban is faster) |
| Disk full / OOM | No (too slow) | Yes |
| Unknown error patterns | Yes | No |
| Metric threshold breaches | No | Yes |
| Log format changes | Yes (adapts automatically) | No (rules break) |
The best setup runs both. Prometheus handles known failure modes with fast alerts. The LLM catches unknown unknowns by reading the actual log text.
Troubleshooting
Ollama returns "model not found": run ollama list to see the available models. Pull the model with ollama pull gemma2:9b.
Loki queries return empty results: check that Promtail is running (systemctl status promtail) and that the job label in your LogQL query matches the Promtail configuration. Test against the Loki API directly with curl.
Out of memory: check RAM with free -h. If Ollama's model is consuming too much, set OLLAMA_KEEP_ALIVE=1m in the Ollama service override. Consider switching to gemma2:2b for lower RAM usage.
Discord/Slack alerts not arriving: test the webhook URLs with the curl commands from the alerting section. Check the analyzer logs for HTTP errors: journalctl -u log-analyzer.service -n 50.
Slow inference: verify your VPS has the expected number of CPU cores with nproc. Ollama uses all available cores for inference; if another process is consuming CPU, inference slows down. Check with top during a run.
JSON parsing errors: if model_validate_json fails, the model produced invalid JSON despite schema enforcement. This is rare but happens in some edge cases. The script logs the error and continues with the next run. If it happens repeatedly, try a different model.
Check the analyzer logs:
journalctl -u log-analyzer.service -f
For Ollama-related problems:
journalctl -u ollama.service -f
For the next step in your AIOps pipeline, see how to build an auto-remediation system that acts on these alerts. For alternative observability stacks, see our SigNoz and OpenObserve guides.
Copyright 2026 Virtua.Cloud. All rights reserved. This content is original work by the Virtua.Cloud team. Reproduction, republication, or redistribution without written permission is prohibited.