"Structured Log Management and Shipping"

June 2, 2026 | 18 min read

Introduction

Logs at the edge are not an afterthought—they are evidence. When a temperature excursion triggers a critical alert, the log trail must be complete, structured, and tamper-evident. This article shows you how to integrate structlog with the Pyvorin Edge SDK, rotate logs safely on flash storage, inject correlation IDs across pipeline stages, and ship logs to Loki, Splunk, and CloudWatch without blocking the ingestion loop.

structlog Integration

The Edge Agent in edge_runtime/pyv_edge_agent/main.py uses the standard-library logging module with a custom JSON formatter. For richer structured logging, we recommend layering structlog on top. It preserves compatibility with the standard library while adding type-safe key-value rendering and processor pipelines.


pip install structlog
  

import logging
import structlog
from structlog.stdlib import LoggerFactory
from structlog.processors import TimeStamper, JSONRenderer, add_log_level

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        TimeStamper(fmt="iso"),
        JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger("pyvorin.edge")
logger.info("pipeline_started", pipeline="precision_agriculture", version="2.1.0")
  

Output:


{"event": "pipeline_started", "pipeline": "precision_agriculture", "version": "2.1.0", "logger": "pyvorin.edge", "level": "info", "timestamp": "2024-05-30T08:37:37.951919Z"}
  

Log Levels

Pyvorin Edge uses the standard five log levels. In production, set the level to INFO; during commissioning, use DEBUG.

LevelWhen to Use
DEBUGSensor raw values, adapter state transitions, window buffer dumps.
INFOPipeline start/stop, rule firings, cloud flush completions.
WARNINGAdapter read failures, retry exhaustion, thermal throttling.
ERRORRule action exceptions, SQLite write errors, compiler bridge failures.
CRITICALUnrecoverable crashes, privacy chain tampering, disk full.

Log Rotation with RotatingFileHandler

Flash storage wears out with excessive writes. Never log continuously to the same file on an SD card. Use RotatingFileHandler with a modest maximum size and a small backup count. This caps total log disk usage and distributes writes across inodes.


import logging
import logging.handlers
from pathlib import Path

LOG_DIR = Path("/var/log/pyvorin")
LOG_DIR.mkdir(parents=True, exist_ok=True)

handler = logging.handlers.RotatingFileHandler(
    LOG_DIR / "edge.log",
    maxBytes=1_048_576,   # 1 MB per file
    backupCount=3,        # Keep edge.log, edge.log.1, edge.log.2, edge.log.3
)

formatter = logging.Formatter(
    "%(asctime)s %(levelname)s %(name)s: %(message)s"
)
handler.setFormatter(formatter)

root = logging.getLogger()
root.handlers = []
root.addHandler(handler)
root.setLevel(logging.INFO)
  

Shipping to Loki

Grafana Loki accepts logs via a JSON POST to /loki/api/v1/push. The snippet below reads the current log file and streams new lines. Run it as a systemd service or a sidecar container.


import json
import urllib.request
from pathlib import Path

LOKI_URL = "http://loki.local:3100/loki/api/v1/push"
LOG_FILE = Path("/var/log/pyvorin/edge.log")


def ship_to_loki(lines: list[str]):
    streams = [
        {
            "stream": {"job": "pyvorin-edge", "host": "rpi4-field-01"},
            "values": [
                [str(int(time.time() * 1e9)), line]
                for line in lines
            ],
        }
    ]
    payload = json.dumps({"streams": streams}).encode("utf-8")
    req = urllib.request.Request(
        LOKI_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        if resp.status != 204:
            raise RuntimeError(f"Loki returned {resp.status}")


# In production, tail the file with a library like `pytail` or `watchdog`
if LOG_FILE.exists():
    with open(LOG_FILE, "r", encoding="utf-8") as f:
        ship_to_loki(f.readlines()[-100:])  # Ship last 100 lines
  

Shipping to Splunk

Splunk's HTTP Event Collector (HEC) expects a simple JSON envelope with an event field and an optional sourcetype.


import json
import urllib.request

SPLUNK_HEC_URL = "https://splunk.local:8088/services/collector/event"
SPLUNK_TOKEN = "Splunk hec-token-here"


def ship_to_splunk(records: list[dict]):
    data = "\n".join(
        json.dumps({"event": rec, "sourcetype": "pyvorin:edge"})
        for rec in records
    ).encode("utf-8")

    req = urllib.request.Request(
        SPLUNK_HEC_URL,
        data=data,
        headers={
            "Authorization": SPLUNK_TOKEN,
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        if resp.status != 200:
            raise RuntimeError(f"Splunk returned {resp.status}")
  

Shipping to CloudWatch

AWS CloudWatch Logs requires the boto3 SDK. Because it is a heavy dependency, only install it if you are already running in an AWS environment.


import boto3
from botocore.exceptions import ClientError

LOG_GROUP = "/pyvorin/edge"
LOG_STREAM = "rpi4-field-01"

client = boto3.client("logs", region_name="eu-west-2")


def ship_to_cloudwatch(events: list[dict]):
    try:
        client.put_log_events(
            logGroupName=LOG_GROUP,
            logStreamName=LOG_STREAM,
            logEvents=[
                {
                    "timestamp": int(rec["timestamp"] * 1000),
                    "message": json.dumps(rec),
                }
                for rec in events
            ],
        )
    except ClientError as exc:
        print(f"CloudWatch error: {exc}")
  

Correlation IDs Across Pipeline Stages

When a reading flows from ingestion through windows, rules, and cloud sync, it is processed by multiple threads and potentially multiple processes. A correlation ID ties the entire journey together for debugging.


import uuid
import structlog
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import SensorReading

logger = structlog.get_logger("pyvorin.edge")


def run_pipeline_with_cid(pipeline: Pipeline, readings: list[SensorReading]):
    cid = str(uuid.uuid4())
    structlog.contextvars.bind_contextvars(correlation_id=cid)
    logger.info("pipeline_run_start", readings_count=len(readings))

    result = pipeline.run(readings)

    for ev in result.events:
        logger.warning(
            "rule_fired",
            rule=ev.rule_name,
            severity=ev.severity,
            correlation_id=cid,
        )

    logger.info("pipeline_run_end", events=len(result.events), correlation_id=cid)
    structlog.contextvars.clear_contextvars()
    return result
  

Complete Logging Configuration

Save the following as logging_config.py and import it at the top of your agent entrypoint. It sets up JSON rotation, structlog, and correlation-ID support in one place.


#!/usr/bin/env python3
"""Complete logging configuration for Pyvorin Edge."""

import logging
import logging.handlers
import structlog
from pathlib import Path

LOG_DIR = Path("/var/log/pyvorin")
LOG_DIR.mkdir(parents=True, exist_ok=True)


def configure_logging(level: str = "INFO") -> None:
    # Standard-library handler
    handler = logging.handlers.RotatingFileHandler(
        LOG_DIR / "edge.log",
        maxBytes=1_048_576,
        backupCount=3,
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )

    root = logging.getLogger()
    root.handlers = []
    root.addHandler(handler)
    root.setLevel(getattr(logging, level.upper(), logging.INFO))

    # structlog processors
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )


if __name__ == "__main__":
    configure_logging("DEBUG")
    logger = structlog.get_logger("pyvorin.edge")
    logger.info("logging_configured", log_dir=str(LOG_DIR))
  

Summary

You now have a production-grade logging stack: structured JSON via structlog, flash-safe rotation via RotatingFileHandler, correlation IDs across pipeline stages, and sidecar shippers for Loki, Splunk, and CloudWatch. Keep the log volume low at the edge—ship summaries upstream and retain only DEBUG logs locally for 24 hours.