AI Log Analysis with Ollama on a VPS: Detect Anomalies with a Local LLM

13 min read·Matthieu·SlackDiscordAIOpsLog AnalysisPythonLokiOllama|

Build a production-ready AI log analysis pipeline on your VPS. Query Loki for logs, classify anomalies with a local LLM via Ollama, and route alerts to Discord or Slack using a Python script and systemd timer.

Your Loki pipeline collects logs. Grafana dashboards let you search them. But nobody is watching those dashboards at 3 AM when an SSH brute-force starts or a disk fills up. You need something that reads your logs continuously and tells you when something looks wrong.

This tutorial builds that system. A Python script queries Loki every 5 minutes, feeds log batches to a local LLM running on Ollama, gets structured JSON classifications back, and sends anomaly alerts to Discord or Slack. No cloud AI APIs, no per-token billing, and your log data never leaves your server.

By the end you will have a working systemd timer that runs unattended, deduplicates alerts, and handles failures gracefully.

Prerequisites:

This article is part of the AIOps series AIOps on a VPS: AI-Driven Server Management with Open-Source Tools.

How do you install Ollama on a VPS for log analysis?

Ollama runs LLMs locally with a single binary and exposes an HTTP API on port 11434. Install it with the official script, pull a model, and verify the API responds.

The installer requires zstd for extraction. Install it first:

apt-get update && apt-get install -y zstd

Download and run the install script:

curl -fsSL https://ollama.com/install.sh -o ollama-install.sh
sha256sum ollama-install.sh

Inspect the script before running it. Read through the shell code and compare the checksum to a known-good copy if you have one. You can also review the source on GitHub. Then run it:

sh ollama-install.sh

The installer creates a systemd service called ollama. Verify it is running:

systemctl status ollama

You should see active (running) in the output. Sharp eyes: notice the Loaded: loaded (/etc/systemd/system/ollama.service; enabled; preset: enabled) line. The installer already set it to start on boot.

Verify the API is listening:

curl -s http://localhost:11434/api/tags | python3 -m json.tool

This returns a JSON object with a models array. It will be empty until you pull a model.

Bind Ollama to localhost only

By default, Ollama listens on 127.0.0.1:11434. Confirm this:

ss -tlnp | grep 11434

If the output shows 0.0.0.0:11434, Ollama is exposed to the internet. Fix this by setting the environment variable in the systemd unit:

sudo mkdir -p /etc/systemd/system/ollama.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/ollama.service.d/override.conf
[Service]
Environment="OLLAMA_HOST=127.0.0.1:11434"
EOF
sudo systemctl daemon-reload
sudo systemctl restart ollama
ss -tlnp | grep 11434

Confirm the output now shows 127.0.0.1:11434. Exposing an LLM API to the internet lets anyone run inference on your server.

Which LLM model works best for server log anomaly detection?

For log analysis on an 8 GB VPS, you need a model that fits in memory alongside Loki and Promtail. Two models work well: Gemma 2 9B for general log classification and Llama 3.1 8B for security-focused analysis. Both run in Q4 quantization at around 5-6 GB RAM.

Pull both models:

ollama pull gemma2:9b
ollama pull llama3.1:8b

Each download is roughly 5-6 GB. After pulling, verify:

ollama list

Test a quick inference to confirm the model loads:

curl -s http://localhost:11434/api/generate \
  -d '{"model": "gemma2:9b", "prompt": "Classify this log line: Failed password for root from 203.0.113.5 port 22", "stream": false}' \
  | python3 -m json.tool

Sharp eyes: look at eval_duration in the response. This is the inference time in nanoseconds. Divide by 1,000,000 to get milliseconds.

Model comparison on a 4 vCPU / 8 GB VPS

The following numbers were measured on a Virtua Cloud VCS-8 (4 vCPU Ryzen, 8 GB RAM, NVMe) processing a batch of 100 syslog lines with the model already loaded in memory:

Metric Gemma 2 9B (Q4_K_M) Llama 3.1 8B (Q4_K_M)
Model size on disk 5.4 GB 4.9 GB
RAM usage (loaded) ~5.8 GB ~5.2 GB
Time per 100-line batch ~12-18s ~14-22s
Tokens/sec ~18-25 ~15-20
Security log accuracy Good Better
General anomaly detection Better Good

Cold starts are slower. The first inference after Ollama loads the model from disk adds 5-10 seconds. Subsequent calls within the keep-alive window run at the speeds above.

Recommendation: Start with gemma2:9b for general-purpose log analysis. Switch to llama3.1:8b if you primarily analyze auth/security logs.

RAM budget on 8 GB

Component RAM usage
OS + system processes ~400 MB
Loki ~300-500 MB
Promtail ~50 MB
Ollama (idle, no model loaded) ~30 MB
Ollama (gemma2:9b loaded) ~5.8 GB
Python script ~50 MB
Total ~6.7-6.9 GB

This fits in 8 GB with ~1 GB of headroom. Ollama automatically unloads models after 5 minutes of inactivity (configurable with OLLAMA_KEEP_ALIVE), freeing the RAM back. The systemd timer triggers every 5 minutes, so the model stays loaded during active analysis windows and unloads between them.

If memory is tight, use gemma2:9b with OLLAMA_KEEP_ALIVE=1m so the model unloads sooner after each batch.

How do you query Loki logs from a Python script?

Query Loki's HTTP API at /loki/api/v1/query_range with a LogQL expression and a time window. The API returns JSON with log streams. Use Python's requests library to fetch the last 5 minutes of logs for a given job label.

First, set up the project:

mkdir -p /opt/log-analyzer
cd /opt/log-analyzer

Create the requirements file:

cat <<'EOF' > /opt/log-analyzer/requirements.txt
requests>=2.31.0
pydantic>=2.5.0
ollama>=0.4.0
EOF

Install dependencies in a virtual environment. On Ubuntu 24.04, you need the python3-venv package first:

apt-get install -y python3.12-venv
python3 -m venv /opt/log-analyzer/venv
/opt/log-analyzer/venv/bin/pip install -r /opt/log-analyzer/requirements.txt

Verify the install:

/opt/log-analyzer/venv/bin/python -c "import requests, pydantic, ollama; print('OK')"

Loki query function

The function below queries Loki for recent logs:

import requests
from datetime import datetime, timedelta, timezone


def query_loki(
    loki_url: str,
    logql: str,
    minutes: int = 5,
    limit: int = 500,
) -> list[str]:
    """Query Loki for log lines from the last N minutes."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=minutes)

    params = {
        "query": logql,
        "start": str(int(start.timestamp() * 1e9)),  # nanosecond epoch
        "end": str(int(now.timestamp() * 1e9)),
        "limit": limit,
    }

    resp = requests.get(
        f"{loki_url}/loki/api/v1/query_range",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()

    data = resp.json()
    lines = []
    for stream in data.get("data", {}).get("result", []):
        for _ts, line in stream.get("values", []):
            lines.append(line)

    return lines

The start and end parameters use nanosecond Unix timestamps. Loki's response nests log lines inside data.result[].values[], where each value is a [timestamp, line] pair.

Example LogQL queries you will use:

# All syslog entries
SYSLOG_QUERY = '{job="syslog"}'

# Nginx error logs
NGINX_QUERY = '{job="nginx"} |= "error"'

# SSH authentication events
AUTH_QUERY = '{job="syslog"} |~ "(sshd|pam_unix)"'

Test the query against your running Loki instance:

curl -s 'http://localhost:3100/loki/api/v1/query_range' \
  --data-urlencode 'query={job="syslog"}' \
  --data-urlencode "start=$(date -d '5 minutes ago' +%s)000000000" \
  --data-urlencode "end=$(date +%s)000000000" \
  --data-urlencode 'limit=10' \
  | python3 -m json.tool | head -30

You should see log lines in the result array. If the array is empty, check that Promtail is shipping logs to Loki and that the job label matches your Promtail configuration Centralized Log Management with Grafana Loki on a VPS.

How do you write prompts that classify log entries as anomalies?

The prompt is the brain of this system. A good prompt tells the LLM exactly what to look for, defines classification categories, and demands structured output. Bad prompts produce vague summaries. Good prompts produce actionable JSON.

Three prompt templates cover most server log analysis needs: general anomaly detection, security event detection, and performance issue detection. Each prompt includes the classification schema inline so the model knows the expected output format.

Prompt 1: General anomaly detection

PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.

Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of the anomaly",
      "severity": "low|medium|high|critical",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means and potential impact"
    }}
  ],
  "summary": "one sentence summary of overall log health"
}}

If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""

Prompt 2: Security event detection

PROMPT_SECURITY = """You are a security analyst reviewing server logs. Identify security-relevant events.

Focus on:
- Brute-force attempts (repeated failed logins from same IP)
- Successful logins from unusual IPs or at unusual times
- Privilege escalation attempts (sudo failures, su attempts)
- Port scanning patterns
- Unauthorized access attempts to files or services

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of security event",
      "severity": "low|medium|high|critical",
      "source_ips": ["IP addresses involved"],
      "log_lines": ["the relevant log lines"],
      "recommendation": "suggested response action"
    }}
  ],
  "summary": "one sentence security posture assessment"
}}

If no security events are found, return {{"findings": [], "summary": "No security events detected."}}.
"""

Prompt 3: Performance issue detection

PROMPT_PERFORMANCE = """You are a performance engineer reviewing server logs. Identify performance-related issues.

Focus on:
- High response times or timeouts
- Resource exhaustion (OOM kills, disk full, connection limits)
- Service restarts or crashes
- Queue backlogs or processing delays
- Error rate spikes

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of performance issue",
      "severity": "low|medium|high|critical",
      "affected_service": "service name if identifiable",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means for system performance"
    }}
  ],
  "summary": "one sentence performance assessment"
}}

If no performance issues are found, return {{"findings": [], "summary": "No performance issues detected."}}.
"""

Embedding the JSON schema directly in the prompt is intentional. It gives the model two signals: the format parameter enforces valid JSON, and the schema in the prompt guides the structure. This combination produces reliable output from small models.

How do you get structured JSON output from Ollama?

Ollama supports structured output through the format parameter in its API. Pass a JSON schema and the model will only generate tokens that conform to it. Combined with a Pydantic model on the Python side, you get validated, typed data back from every inference call.

Define the Pydantic models:

from pydantic import BaseModel


class Finding(BaseModel):
    title: str
    severity: str  # low, medium, high, critical
    log_lines: list[str]
    explanation: str = ""
    recommendation: str = ""
    source_ips: list[str] = []
    affected_service: str = ""


class AnalysisResult(BaseModel):
    findings: list[Finding]
    summary: str

Call Ollama with the schema enforced:

from ollama import chat


def analyze_logs(
    logs: list[str],
    model: str = "gemma2:9b",
    prompt_template: str = PROMPT_GENERAL,
) -> AnalysisResult:
    """Send logs to Ollama and get structured analysis back."""
    if not logs:
        return AnalysisResult(findings=[], summary="No logs to analyze.")

    # Truncate to avoid context window issues
    log_block = "\n".join(logs[:200])
    prompt = prompt_template.format(logs=log_block)

    response = chat(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        format=AnalysisResult.model_json_schema(),
        options={"temperature": 0.1},
    )

    return AnalysisResult.model_validate_json(response.message.content)

Key details:

  • format=AnalysisResult.model_json_schema() tells Ollama to enforce the JSON schema at the token generation level. The model cannot produce output that violates the schema.
  • temperature: 0.1 keeps the output deterministic. Log classification should not be creative.
  • Truncating to 200 lines prevents context window overflows. Gemma 2 9B has an 8192 token context window. 200 log lines at ~20 tokens each uses roughly half the context.
  • model_validate_json() parses the string into a typed Pydantic object. If parsing fails (rare with schema enforcement), it raises a ValidationError you can catch.

Test the function from the Python shell:

/opt/log-analyzer/venv/bin/python3 -c "
from ollama import chat
import json

response = chat(
    model='gemma2:9b',
    messages=[{'role': 'user', 'content': 'Analyze this log: Failed password for root from 203.0.113.5 port 44322 ssh2'}],
    format={
        'type': 'object',
        'properties': {
            'findings': {'type': 'array', 'items': {'type': 'object'}},
            'summary': {'type': 'string'}
        },
        'required': ['findings', 'summary']
    },
    options={'temperature': 0.1},
)
print(json.dumps(json.loads(response.message.content), indent=2))
"

You should see a clean JSON object with findings and summary keys. No markdown fences, no preamble, just JSON.

How do you send anomaly alerts to Discord and Slack?

Send a POST request with a JSON payload to a webhook URL. Discord uses an embeds array with color-coded fields. Slack uses Block Kit with blocks and text fields. Both accept a single HTTPS POST.

Discord webhook

Create a webhook in your Discord server: Server Settings > Integrations > Webhooks > New Webhook. Copy the URL.

Store the webhook URL securely:

cat <<'EOF' > /opt/log-analyzer/.env
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_ID/YOUR_TOKEN
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
LOKI_URL=http://localhost:3100
OLLAMA_MODEL=gemma2:9b
EOF
chmod 600 /opt/log-analyzer/.env

The alert sender function:

import os
import requests

# Severity to Discord embed color (decimal)
SEVERITY_COLORS = {
    "critical": 15158332,  # red
    "high": 15105570,      # orange
    "medium": 16776960,    # yellow
    "low": 3447003,        # blue
}


def send_discord_alert(webhook_url: str, result: AnalysisResult) -> None:
    """Send findings to Discord as an embed."""
    if not result.findings:
        return

    for finding in result.findings:
        embed = {
            "title": f"[{finding.severity.upper()}] {finding.title}",
            "color": SEVERITY_COLORS.get(finding.severity, 3447003),
            "fields": [
                {
                    "name": "Explanation",
                    "value": finding.explanation or finding.recommendation or "N/A",
                    "inline": False,
                },
                {
                    "name": "Sample log lines",
                    "value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
                    "inline": False,
                },
            ],
        }
        if finding.source_ips:
            embed["fields"].append({
                "name": "Source IPs",
                "value": ", ".join(finding.source_ips),
                "inline": True,
            })

        payload = {"embeds": [embed]}
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()

Test the Discord webhook with curl:

curl -s -X POST "$DISCORD_WEBHOOK_URL" \
  -H "Content-Type: application/json" \
  -d '{
    "embeds": [{
      "title": "[HIGH] Test Alert - SSH Brute Force",
      "color": 15105570,
      "fields": [
        {"name": "Explanation", "value": "Multiple failed SSH login attempts from 203.0.113.5", "inline": false},
        {"name": "Sample log lines", "value": "```\nFailed password for root from 203.0.113.5\n```", "inline": false}
      ]
    }]
  }'

Check your Discord channel. You should see a color-coded embed message.

Slack webhook

Create a Slack app at api.slack.com/apps, enable Incoming Webhooks, and copy the webhook URL.

def send_slack_alert(webhook_url: str, result: AnalysisResult) -> None:
    """Send findings to Slack using Block Kit."""
    if not result.findings:
        return

    for finding in result.findings:
        severity_emoji = {
            "critical": ":rotating_light:",
            "high": ":warning:",
            "medium": ":large_yellow_circle:",
            "low": ":information_source:",
        }
        emoji = severity_emoji.get(finding.severity, ":grey_question:")

        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": finding.explanation or finding.recommendation or "N/A",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "```" + "\n".join(finding.log_lines[:5]) + "```",
                },
            },
        ]

        payload = {
            "text": f"[{finding.severity.upper()}] {finding.title}",
            "blocks": blocks,
        }
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()

Test the Slack webhook:

curl -s -X POST "$SLACK_WEBHOOK_URL" \
  -H "Content-Type: application/json" \
  -d '{"text": "[HIGH] Test Alert - SSH Brute Force", "blocks": [{"type": "header", "text": {"type": "plain_text", "text": ":warning: [HIGH] Test Alert"}}]}'

How do you avoid sending duplicate alerts?

Without deduplication, the same SSH brute-force from the same IP triggers an alert every 5 minutes for hours. Use a simple file-based cache that stores a hash of each finding's title and source. Skip the alert if the same hash appeared in the last hour.

import hashlib
import json
import time
from pathlib import Path

DEDUP_FILE = Path("/opt/log-analyzer/dedup_cache.json")
DEDUP_WINDOW = 3600  # seconds (1 hour)


def load_dedup_cache() -> dict:
    if DEDUP_FILE.exists():
        try:
            return json.loads(DEDUP_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}


def save_dedup_cache(cache: dict) -> None:
    # Prune expired entries
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
    DEDUP_FILE.write_text(json.dumps(cache))


def is_duplicate(finding: Finding) -> bool:
    """Check if this finding was already alerted recently."""
    cache = load_dedup_cache()
    now = time.time()

    # Hash on title + sorted source IPs + severity
    key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
    key = hashlib.sha256(key_material.encode()).hexdigest()[:16]

    if key in cache and now - cache[key] < DEDUP_WINDOW:
        return True

    cache[key] = now
    save_dedup_cache(cache)
    return False

This keeps the dedup cache as a JSON file with short hash keys mapping to timestamps. Old entries get pruned on every save. The 1-hour window is a good default: long enough to suppress repeat alerts, short enough to re-alert if the same issue reoccurs after a break.

Set proper permissions on the cache file:

touch /opt/log-analyzer/dedup_cache.json
chmod 600 /opt/log-analyzer/dedup_cache.json

The complete script

The full log_analyzer.py ties everything together:

#!/usr/bin/env python3
"""AI Log Analyzer - Query Loki, classify with Ollama, alert to Discord/Slack."""

import hashlib
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests as http_requests
from ollama import chat
from pydantic import BaseModel

# --- Configuration ---

LOKI_URL = os.environ.get("LOKI_URL", "http://localhost:3100")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "gemma2:9b")
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL", "")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
LOG_QUERIES = os.environ.get(
    "LOG_QUERIES",
    '{job="syslog"};{job="nginx"} |= "error"',
).split(";")
QUERY_WINDOW_MINUTES = int(os.environ.get("QUERY_WINDOW_MINUTES", "5"))
QUERY_LIMIT = int(os.environ.get("QUERY_LIMIT", "500"))
DEDUP_FILE = Path(os.environ.get("DEDUP_FILE", "/opt/log-analyzer/dedup_cache.json"))
DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW", "3600"))

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("log-analyzer")


# --- Models ---

class Finding(BaseModel):
    title: str
    severity: str
    log_lines: list[str]
    explanation: str = ""
    recommendation: str = ""
    source_ips: list[str] = []
    affected_service: str = ""


class AnalysisResult(BaseModel):
    findings: list[Finding]
    summary: str


# --- Prompts ---

PROMPT_GENERAL = """You are a server log analyzer. Analyze the following log lines and classify each anomaly found.

Rules:
- Only report anomalies. Normal operational logs should be ignored.
- An anomaly is anything unexpected: errors, warnings, unusual patterns, failed operations.
- Group related log lines into a single finding.
- Assign a severity: "low", "medium", "high", or "critical".

Log lines:
{logs}

Respond with a JSON object matching this schema:
{{
  "findings": [
    {{
      "title": "short description of the anomaly",
      "severity": "low|medium|high|critical",
      "log_lines": ["the relevant log lines"],
      "explanation": "what this means and potential impact"
    }}
  ],
  "summary": "one sentence summary of overall log health"
}}

If no anomalies are found, return {{"findings": [], "summary": "No anomalies detected."}}.
"""


# --- Loki ---

def query_loki(logql: str) -> list[str]:
    """Query Loki for log lines from the last N minutes."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=QUERY_WINDOW_MINUTES)

    params = {
        "query": logql,
        "start": str(int(start.timestamp() * 1e9)),
        "end": str(int(now.timestamp() * 1e9)),
        "limit": QUERY_LIMIT,
    }

    resp = http_requests.get(
        f"{LOKI_URL}/loki/api/v1/query_range",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()

    data = resp.json()
    lines = []
    for stream in data.get("data", {}).get("result", []):
        for _ts, line in stream.get("values", []):
            lines.append(line)

    return lines


# --- Ollama ---

def analyze_logs(logs: list[str]) -> AnalysisResult:
    """Send logs to Ollama and get structured analysis back."""
    if not logs:
        return AnalysisResult(findings=[], summary="No logs to analyze.")

    log_block = "\n".join(logs[:200])
    prompt = PROMPT_GENERAL.format(logs=log_block)

    response = chat(
        model=OLLAMA_MODEL,
        messages=[{"role": "user", "content": prompt}],
        format=AnalysisResult.model_json_schema(),
        options={"temperature": 0.1},
    )

    return AnalysisResult.model_validate_json(response.message.content)


# --- Deduplication ---

def load_dedup_cache() -> dict:
    if DEDUP_FILE.exists():
        try:
            return json.loads(DEDUP_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            return {}
    return {}


def save_dedup_cache(cache: dict) -> None:
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v < DEDUP_WINDOW}
    DEDUP_FILE.write_text(json.dumps(cache))


def is_duplicate(finding: Finding) -> bool:
    cache = load_dedup_cache()
    now = time.time()
    key_material = f"{finding.title}|{finding.severity}|{'|'.join(sorted(finding.source_ips))}"
    key = hashlib.sha256(key_material.encode()).hexdigest()[:16]

    if key in cache and now - cache[key] < DEDUP_WINDOW:
        return True

    cache[key] = now
    save_dedup_cache(cache)
    return False


# --- Alerting ---

SEVERITY_COLORS = {
    "critical": 15158332,
    "high": 15105570,
    "medium": 16776960,
    "low": 3447003,
}


def send_discord_alert(finding: Finding) -> None:
    if not DISCORD_WEBHOOK_URL:
        return

    embed = {
        "title": f"[{finding.severity.upper()}] {finding.title}",
        "color": SEVERITY_COLORS.get(finding.severity, 3447003),
        "fields": [
            {
                "name": "Explanation",
                "value": finding.explanation or finding.recommendation or "N/A",
                "inline": False,
            },
            {
                "name": "Sample log lines",
                "value": "```\n" + "\n".join(finding.log_lines[:5]) + "\n```",
                "inline": False,
            },
        ],
    }
    if finding.source_ips:
        embed["fields"].append({
            "name": "Source IPs",
            "value": ", ".join(finding.source_ips),
            "inline": True,
        })

    resp = http_requests.post(
        DISCORD_WEBHOOK_URL, json={"embeds": [embed]}, timeout=10
    )
    resp.raise_for_status()


def send_slack_alert(finding: Finding) -> None:
    if not SLACK_WEBHOOK_URL:
        return

    severity_emoji = {
        "critical": ":rotating_light:",
        "high": ":warning:",
        "medium": ":large_yellow_circle:",
        "low": ":information_source:",
    }
    emoji = severity_emoji.get(finding.severity, ":grey_question:")

    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": f"{emoji} [{finding.severity.upper()}] {finding.title}",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": finding.explanation or finding.recommendation or "N/A",
            },
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "```" + "\n".join(finding.log_lines[:5]) + "```",
            },
        },
    ]

    resp = http_requests.post(
        SLACK_WEBHOOK_URL,
        json={
            "text": f"[{finding.severity.upper()}] {finding.title}",
            "blocks": blocks,
        },
        timeout=10,
    )
    resp.raise_for_status()


def send_alerts(finding: Finding) -> None:
    send_discord_alert(finding)
    send_slack_alert(finding)


# --- Main ---

def main() -> int:
    log.info("Starting log analysis run")
    all_lines = []

    for logql in LOG_QUERIES:
        logql = logql.strip()
        if not logql:
            continue
        try:
            lines = query_loki(logql)
            log.info("Query '%s' returned %d lines", logql, len(lines))
            all_lines.extend(lines)
        except Exception as e:
            log.error("Loki query failed for '%s': %s", logql, e)

    if not all_lines:
        log.info("No log lines to analyze")
        return 0

    log.info("Analyzing %d total log lines with %s", len(all_lines), OLLAMA_MODEL)

    try:
        result = analyze_logs(all_lines)
    except Exception as e:
        log.error("Ollama analysis failed: %s", e)
        return 1

    log.info("Analysis complete: %d findings. %s", len(result.findings), result.summary)

    alerted = 0
    for finding in result.findings:
        if is_duplicate(finding):
            log.info("Skipping duplicate: %s", finding.title)
            continue
        try:
            send_alerts(finding)
            alerted += 1
            log.info("Alerted: [%s] %s", finding.severity, finding.title)
        except Exception as e:
            log.error("Alert failed for '%s': %s", finding.title, e)

    log.info("Run complete. %d new alerts sent.", alerted)
    return 0


if __name__ == "__main__":
    sys.exit(main())

Set permissions:

chmod 750 /opt/log-analyzer/log_analyzer.py
chown root:root /opt/log-analyzer/log_analyzer.py
ls -la /opt/log-analyzer/

Verify the complete output shows rwxr-x--- for the script and rw------- for the .env file.

Test a manual run:

cd /opt/log-analyzer
set -a && source .env && set +a
/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py

Check the output. You should see log lines fetched, analysis results, and alerts sent (or skipped if no anomalies).

How do you run AI log analysis automatically with a systemd timer?

Create a systemd service and timer pair. The service runs the Python script with environment variables from the .env file. The timer triggers it every 5 minutes. If the script fails, systemd logs the failure and the next run proceeds normally.

Create the service unit:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service
[Unit]
Description=AI Log Analyzer - Ollama anomaly detection
After=network-online.target ollama.service loki.service
Wants=network-online.target

[Service]
Type=oneshot
EnvironmentFile=/opt/log-analyzer/.env
ExecStart=/opt/log-analyzer/venv/bin/python3 /opt/log-analyzer/log_analyzer.py
WorkingDirectory=/opt/log-analyzer
User=root
StandardOutput=journal
StandardError=journal
TimeoutStartSec=120
EOF

The TimeoutStartSec=120 gives the LLM up to 2 minutes to complete inference. On an 8 GB VPS with 100-200 log lines, inference typically finishes in 15-25 seconds. The 2-minute timeout handles cases where Ollama needs to load the model from disk first.

The service runs as User=root for simplicity. In production, consider creating a dedicated log-analyzer user, granting it read access to the .env file, and updating the User= directive. The script only needs HTTP access to Loki and Ollama on localhost, so it does not require elevated privileges.

Create the timer:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.timer
[Unit]
Description=Run AI Log Analyzer every 5 minutes

[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
AccuracySec=30s

[Install]
WantedBy=timers.target
EOF

Enable and start the timer:

sudo systemctl daemon-reload
sudo systemctl enable --now log-analyzer.timer

The enable --now makes the timer survive reboots and starts it immediately.

Verify the timer is active:

systemctl status log-analyzer.timer

You should see active (waiting) and the next trigger time. Check when it last ran:

systemctl list-timers log-analyzer.timer

After the first trigger, check the service logs:

journalctl -u log-analyzer.service -n 30 --no-pager

Sharp eyes: look for the Starting log analysis run and Run complete messages. If you see Ollama analysis failed, the model might not be pulled or Ollama might not be running.

Failure notifications

If the analyzer fails, you want to know. Add an OnFailure handler to the service unit:

cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer-failure@.service
[Unit]
Description=Log Analyzer failure notification for %i

[Service]
Type=oneshot
ExecStart=/usr/bin/curl -s -X POST ${DISCORD_WEBHOOK_URL} \
  -H "Content-Type: application/json" \
  -d '{"content": ":x: **Log Analyzer Failed**\nUnit: %i\nTime: %H\nCheck: journalctl -u %i"}'
EnvironmentFile=/opt/log-analyzer/.env
EOF

Add the OnFailure directive to the main service:

sudo mkdir -p /etc/systemd/system/log-analyzer.service.d
cat <<'EOF' | sudo tee /etc/systemd/system/log-analyzer.service.d/failure.conf
[Unit]
OnFailure=log-analyzer-failure@%n.service
EOF
sudo systemctl daemon-reload

Now if the Python script exits with a non-zero code, systemd sends a notification to your Discord channel.

What are the limitations of LLM-based log analysis?

LLM log analysis is a complement to rule-based alerting, not a replacement. It catches patterns that are hard to express as static rules. But it has real weaknesses you need to understand.

Hallucinations. The model can flag normal log lines as anomalies or invent explanations for benign events. Low-severity findings from the LLM should be treated as suggestions, not facts. Always verify high-severity alerts manually.

Context window limits. Gemma 2 9B has an 8192 token context window. At ~20 tokens per log line, that is roughly 400 lines maximum (with room for the prompt and output). The script truncates at 200 lines to stay safe. If your server generates more than 200 lines in 5 minutes, you either need to filter with more specific LogQL queries or accept that some lines get skipped.

No learning over time. The model has no memory between runs. It cannot learn that a specific log pattern is normal for your environment. Every batch is analyzed from scratch. If you have a recurring log message that is benign but looks suspicious, add it to a LogQL exclusion filter: {job="syslog"} != "expected noisy message".

Inference latency. On a 4 vCPU / 8 GB VPS, inference takes 12-22 seconds per batch. This is fine for a 5-minute timer but too slow for real-time alerting. For time-sensitive events (disk full, OOM), keep traditional Prometheus alerts Build a Self-Healing VPS with Prometheus and Ollama.

False negatives. Small models miss subtle patterns. A slow memory leak that produces slightly elevated swap usage over days will not show up in a 5-minute log window. Use Prometheus metrics and Grafana alerts for trend-based detection.

Cost of running. While there are no per-token API costs, the model uses ~5.8 GB of RAM when loaded. On an 8 GB VPS, that is most of your memory. If your application server needs that RAM, run Ollama on a separate VPS or use the smaller gemma2:2b model (1.6 GB RAM, lower accuracy).

When to use this vs traditional alerting

Use case LLM log analysis Traditional alerting (Prometheus)
"Something looks wrong but I can't write a rule for it" Yes No
SSH brute-force detection Yes (good at pattern recognition) Yes (fail2ban is faster)
Disk full / OOM No (too slow) Yes
Unknown error patterns Yes No
Metric threshold crossing No Yes
Log format changes Yes (adapts automatically) No (rules break)

The best setup runs both. Prometheus handles the known failure modes with fast alerting. The LLM catches the unknown unknowns by reading the actual log text.

Troubleshooting

Ollama returns "model not found": Run ollama list to check available models. Pull the model with ollama pull gemma2:9b.

Loki query returns empty results: Check that Promtail is running (systemctl status promtail) and that the job label in your LogQL query matches the Promtail config. Test with curl directly against the Loki API.

Out of memory: Check RAM with free -h. If Ollama's model is consuming too much, set OLLAMA_KEEP_ALIVE=1m in the Ollama service override. Consider switching to gemma2:2b for lower RAM usage.

Discord/Slack alerts not arriving: Test the webhook URL with the curl commands from the alerting section. Check for HTTP errors in the analyzer logs: journalctl -u log-analyzer.service -n 50.

Slow inference: Verify your VPS has the expected CPU cores with nproc. Ollama uses all available cores for inference. If another process is consuming CPU, inference slows down. Check with top during a run.

JSON parsing errors: If model_validate_json fails, the model produced invalid JSON despite schema enforcement. This is rare but happens with certain edge cases. The script logs the error and continues to the next run. If it happens repeatedly, try switching models.

Check the analyzer logs:

journalctl -u log-analyzer.service -f

For Ollama-specific issues:

journalctl -u ollama.service -f

For the next step in the AIOps pipeline, see how to build automated remediation that acts on these alerts Build a Self-Healing VPS with Prometheus and Ollama. For alternative observability approaches, see Self-Host SigNoz or OpenObserve on a VPS: Datadog Alternatives Compared.


Copyright 2026 Virtua.Cloud. All rights reserved. This content is original work by the Virtua.Cloud team. Reproduction, republication, or redistribution without written permission is prohibited.