Fleet Management at Scale

June 2, 2026 | 26 min read

Introduction

One edge device is a project. One hundred edge devices is an operations problem. This article describes how to provision, monitor, configure, and roll back an entire fleet of Pyvorin Edge agents. All techniques are built on real SDK primitives: Config from /var/www/pyvorin/edge_runtime/pyv_edge_agent/config.py, CloudSyncQueue from /var/www/pyvorin/edge_runtime/pyv_edge_agent/cloud_sync/queue.py, and the /health endpoint served by _HealthHandler in /var/www/pyvorin/edge_runtime/pyv_edge_agent/main.py.

Device Provisioning

Every device needs a unique identity, an initial configuration, and a trust anchor before it can join the fleet. The provisioning process must be repeatable and automatable.

Device ID Generation

Use a collision-resistant identifier derived from hardware characteristics. The CPU serial number on Raspberry Pi is available via /proc/cpuinfo or vcgencmd. Combine it with a fleet prefix to create a human-readable ID:

import hashlib
import re
from pathlib import Path

def get_device_id(fleet_prefix: str = "FLEET") -> str:
    """Generate a stable device ID from the hardware serial."""
    cpuinfo = Path("/proc/cpuinfo").read_text(encoding="utf-8")
    match = re.search(r"^Serial\s*:\s*([0-9a-fA-F]+)", cpuinfo, re.MULTILINE)
    if match is None:
        # A shared "unknown" fallback would give every such device the same ID.
        raise RuntimeError("No CPU serial found in /proc/cpuinfo")
    serial = match.group(1).lower()
    # Short hash for readability
    short = hashlib.sha256(serial.encode()).hexdigest()[:12]
    return f"{fleet_prefix}-{short}"
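
The drift-check script later in this article reads the ID from /opt/pyvorin-edge/device_id, so provisioning has to write that file at some point. The following is a minimal sketch of one way to do it idempotently; the helper name persist_device_id and the write-once behaviour are assumptions for illustration, not part of the SDK.

```python
from pathlib import Path


def persist_device_id(device_id: str, base_dir: Path) -> str:
    """Write device_id to base_dir/device_id once; return the stored value."""
    id_file = base_dir / "device_id"
    if id_file.exists():
        # Re-running provisioning must not change an identity the fleet already knows.
        return id_file.read_text(encoding="utf-8").strip()
    base_dir.mkdir(parents=True, exist_ok=True)
    id_file.write_text(device_id + "\n", encoding="utf-8")
    return device_id
```

Returning the stored value rather than the freshly computed one keeps the identity stable even if the derivation logic changes between provisioning runs.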
  

Initial Config Template

The Config class loads TOML or JSON and substitutes environment variables. A provisioning script should render a template, substituting device-specific values:

from pathlib import Path
from string import Template

CONFIG_TEMPLATE = """
[sensors]
poll_interval_seconds = 5.0
devices = [
    { name = "temp-01", ingest_type = "simulator", sensor_type = "temperature", unit = "C" },
]

[cloud]
enabled = true
endpoint = "https://api.pyvorin.com/v1/ingest"
api_key = "${API_KEY}"
batch_size = 100
flush_interval_seconds = 60.0

[privacy]
enabled = true
rules = [
    { sensor_pattern = "*password*", action = "drop" },
    { sensor_pattern = "*email*", action = "hash" },
]

[logging]
level = "INFO"
format = "json"

[health]
port = 8080
enabled = true
"""

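def render_config(device_id: str, api_key: str, output_path: Path) -> None:
    t = Template(CONFIG_TEMPLATE)
    # substitute() ignores unused keys, so DEVICE_ID is accepted even though
    # the template above has no ${DEVICE_ID} placeholder yet.
    rendered = t.substitute(DEVICE_ID=device_id, API_KEY=api_key)
    output_path.write_text(rendered, encoding="utf-8")
    # The rendered file contains the API key; restrict it to the owner.
    output_path.chmod(0o600)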
  

Trust Anchor Installation

During provisioning, write the fleet's Ed25519 public key to /opt/pyvorin-edge/trust_anchor.json so that BundleVerifier.load_trust_anchor() can validate OTA updates:

import json
from pathlib import Path

def install_trust_anchor(public_key_hex: str, base_dir: Path) -> None:
    anchor = {"public_key": public_key_hex}
    (base_dir / "trust_anchor.json").write_text(
        json.dumps(anchor, indent=2), encoding="utf-8"
    )
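
A malformed key written at provisioning time would only surface later, when the first OTA bundle fails verification. As a hedged sketch, the key can be sanity-checked before it is written. This assumes the anchor stores the raw 32-byte Ed25519 public key as hex; the helper name validate_public_key_hex is hypothetical.

```python
def validate_public_key_hex(public_key_hex: str) -> str:
    """Return the normalised hex key, or raise ValueError if it is malformed."""
    try:
        raw = bytes.fromhex(public_key_hex.strip())
    except ValueError as exc:
        raise ValueError("public key is not valid hexadecimal") from exc
    # Assumption: the anchor holds a raw Ed25519 public key, which is 32 bytes.
    if len(raw) != 32:
        raise ValueError(f"expected a 32-byte Ed25519 public key, got {len(raw)} bytes")
    return raw.hex()
```

Calling it as install_trust_anchor(validate_public_key_hex(key), base_dir) makes a bad key fail at provisioning time instead of in the field.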
  

Configuration Drift Detection

Fleet devices diverge over time: manual edits, failed updates, and environment variable overrides all cause drift. Detect it before it becomes an incident.

Hash-Based Drift Detection

Compute a canonical hash of the active config file and compare it against the fleet's declared golden hash. The Config class can export its internal dictionary as JSON for hashing:

import hashlib
import json
from pathlib import Path
from pyv_edge_agent.config import Config

def config_hash(config_path: Path) -> str:
    """Return a canonical SHA-256 hash of the loaded config."""
    cfg = Config.from_file(config_path)
    # Sort keys for stability
    canonical = json.dumps(cfg.to_dict(), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def check_drift(config_path: Path, golden_hash: str) -> bool:
    """Return True if the config on disk has drifted from the golden hash."""
    actual = config_hash(config_path)
    return actual != golden_hash
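
A hash mismatch says that a config drifted, but not where. Assuming both the golden and the active config are available as plain dictionaries (for example from the to_dict() export used above), a sketch like the following could list the dotted key paths that differ. The function name diff_config is illustrative and not part of the SDK.

```python
from typing import Any, Dict, List

_MISSING = object()  # sentinel for "key absent on this side"


def diff_config(golden: Dict[str, Any], actual: Dict[str, Any], prefix: str = "") -> List[str]:
    """Return dotted key paths whose values differ between two config dicts."""
    changed: List[str] = []
    for key in sorted(set(golden) | set(actual)):
        path = f"{prefix}{key}"
        g = golden.get(key, _MISSING)
        a = actual.get(key, _MISSING)
        if isinstance(g, dict) and isinstance(a, dict):
            # Recurse into sections present on both sides.
            changed.extend(diff_config(g, a, prefix=f"{path}."))
        elif g != a:
            changed.append(path)
    return changed


if __name__ == "__main__":
    golden = {"cloud": {"batch_size": 100, "enabled": True}, "logging": {"level": "INFO"}}
    actual = {"cloud": {"batch_size": 500, "enabled": True}, "logging": {"level": "INFO"},
              "health": {"port": 9090}}
    print(diff_config(golden, actual))  # ['cloud.batch_size', 'health']
```

Including the changed paths in the drift alert lets an operator tell a harmless local override from a failed update without logging in to the device.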
  

Automated Drift Alert

Run a cron job every hour that reports the config hash to the fleet controller. If the hash does not match the channel's golden config, trigger an alert:

#!/bin/bash
# /usr/local/bin/drift-check.sh
set -euo pipefail

DEVICE_ID=$(cat /opt/pyvorin-edge/device_id)
CONFIG_HASH=$(python3 -c "from drift_lib import config_hash; print(config_hash('/home/pi/pyvorin-edge/config.toml'))")
# -f makes curl fail on HTTP errors, so a controller outage is not mistaken for drift
GOLDEN_HASH=$(curl -fsS "https://fleet.pyvorin.com/v1/golden-hash?device_id=${DEVICE_ID}")

if [ "$CONFIG_HASH" != "$GOLDEN_HASH" ]; then
    logger -t pyvorin-drift "CONFIG DRIFT DETECTED on ${DEVICE_ID}"
    curl -fsS -X POST "https://fleet.pyvorin.com/v1/alerts" \
        -H "Content-Type: application/json" \
        -d "{\"device_id\":\"${DEVICE_ID}\",\"alert\":\"config_drift\",\"hash\":\"${CONFIG_HASH}\"}"
fi
  

Bulk Config Push

When a policy or endpoint changes, updating every device by hand is not feasible. Two approaches automate the push: a scripted SSH/rsync loop for small fleets, and MQTT broadcast for large fleets.

SSH + Rsync (Small Fleets, < 50 Devices)

For fleets where you control the network and have SSH key access, rsync is simple and reliable:

#!/bin/bash
# push-config.sh — run from your workstation

CONFIG_SOURCE="./golden-config.toml"
DEVICE_LIST="./devices.txt"
REMOTE_PATH="/home/pi/pyvorin-edge/config.toml"

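while read -r device; do
    echo "Pushing to ${device}..."
    # ssh -n: without it, ssh reads the rest of devices.txt from stdin
    # and the loop stops after the first device
    rsync -avz --checksum "${CONFIG_SOURCE}" "pi@${device}:${REMOTE_PATH}" \
        && ssh -n "pi@${device}" "sudo systemctl restart pyvorin-edge" \
        || echo "FAILED: ${device}" >&2
done < "${DEVICE_LIST}"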
  

MQTT Broadcast (Large Fleets, 50+ Devices)

Each device subscribes to a fleet config topic. The controller publishes the new config payload once, and every connected subscriber receives it:

import base64
import hashlib
import json
import time
from pathlib import Path

import paho.mqtt.publish as publish

FLEET_TOPIC = "pyvorin/fleet/config/update"

def broadcast_config(controller, new_config_path: str, version: str) -> None:
    config_bytes = Path(new_config_path).read_bytes()
    payload = {
        "version": version,
        "timestamp": time.time(),
        "config_b64": base64.b64encode(config_bytes).decode(),
        # Hash the exact bytes that are sent so the device can verify them
        "checksum_sha256": hashlib.sha256(config_bytes).hexdigest(),
    }
    publish.single(
        topic=FLEET_TOPIC,
        payload=json.dumps(payload),
        hostname=controller.mqtt_host,
        port=8883,
        tls={"ca_certs": controller.ca_path},
    )
  

On the device side, the MQTT adapter (or a dedicated fleet listener) receives the message, verifies the checksum, writes the config, and restarts the agent:

import base64
import hashlib
import json
import logging
import subprocess
from pathlib import Path

logger = logging.getLogger(__name__)

class FleetConfigListener:
    def __init__(self, base_dir: Path) -> None:
        self.base_dir = base_dir
        self.config_path = base_dir / "config.toml"

    def on_message(self, topic: str, payload: bytes) -> None:
        data = json.loads(payload)
        config_bytes = base64.b64decode(data["config_b64"])
        expected_hash = data["checksum_sha256"]
        actual_hash = hashlib.sha256(config_bytes).hexdigest()

        if actual_hash != expected_hash:
            logger.error("Config broadcast hash mismatch. Ignoring.")
            return

        # Atomic write: the temp file sits next to the target, so replace() is a rename
        temp = self.config_path.with_suffix(".tmp")
        temp.write_bytes(config_bytes)
        temp.replace(self.config_path)

        logger.info("Config updated to version %s. Restarting agent.", data["version"])
        subprocess.run(["sudo", "systemctl", "restart", "pyvorin-edge"], check=False)
  

Fleet-Wide Rollback

When a bad update reaches the fleet, speed matters. The rollback command must reach every device within seconds, not minutes.

MQTT Rollback Command

Publish a retained message to a dedicated rollback topic. Connected devices act immediately, and devices that reconnect later receive the retained command when they subscribe:

import json
import secrets
import time

import paho.mqtt.publish as publish

ROLLBACK_TOPIC = "pyvorin/fleet/command/rollback"

def fleet_rollback(controller, target_version: str) -> None:
    command = {
        "command": "rollback",
        "target_version": target_version,
        "issued_at": time.time(),
        # Random per-command ID so devices can ignore duplicates
        "nonce": secrets.token_hex(8),
    }
    publish.single(
        topic=ROLLBACK_TOPIC,
        payload=json.dumps(command),
        hostname=controller.mqtt_host,
        port=8883,
        retain=True,
        tls={"ca_certs": controller.ca_path},
    )
  

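On the device, the listener checks the nonce to avoid duplicate actions and invokes the OTA rollback logic from the previous article. Because the broker redelivers the retained command each time the agent reconnects, seen nonces are persisted to disk; an in-memory set would be empty after the restart, and the device would roll back in a loop:

import json
import logging
import subprocess
from pathlib import Path

logger = logging.getLogger(__name__)

class FleetCommandHandler:
    # OTAUpdater is the updater class from the previous article.
    def __init__(self, updater: "OTAUpdater", nonce_file: Path) -> None:
        self.updater = updater
        self.nonce_file = nonce_file
        seen = nonce_file.read_text(encoding="utf-8") if nonce_file.exists() else ""
        self._seen_nonces: set[str] = set(seen.split())

    def on_command(self, payload: bytes) -> None:
        cmd = json.loads(payload)
        nonce = cmd.get("nonce", "")
        if nonce in self._seen_nonces:
            return
        self._seen_nonces.add(nonce)
        # Record the nonce before acting so a restart cannot replay the command
        self.nonce_file.write_text("\n".join(self._seen_nonces), encoding="utf-8")

        if cmd["command"] == "rollback":
            target = cmd["target_version"]
            logger.warning("Fleet rollback command received: target %s", target)
            self.updater.rollback_to(target)
            subprocess.run(["sudo", "systemctl", "restart", "pyvorin-edge"], check=False)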
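
A retained command stays on the broker until it is cleared, so a device that connects days later still receives it. One possible guard is to ignore commands whose issued_at field falls outside a freshness window. The sketch below is illustrative only: the helper name is_fresh, the 15-minute window, and the 60-second clock-skew allowance are assumptions, not values from the SDK.

```python
import time
from typing import Any, Mapping, Optional


def is_fresh(cmd: Mapping[str, Any], max_age_seconds: float = 900.0,
             now: Optional[float] = None) -> bool:
    """Return True if cmd['issued_at'] lies within the freshness window."""
    issued_at = cmd.get("issued_at")
    if isinstance(issued_at, bool) or not isinstance(issued_at, (int, float)):
        return False  # missing or malformed timestamp: treat as stale
    current = time.time() if now is None else now
    age = current - issued_at
    # Tolerate a small amount of clock skew between controller and device.
    return -60.0 <= age <= max_age_seconds
```

A handler could call is_fresh(cmd) before acting and log any stale command it skips. Devices without a battery-backed clock may report the wrong time until NTP has synchronised, so the check is best applied after time sync.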
  

Fleet Health Aggregation

A fleet is only healthy when every device is healthy. The controller collects /health responses and computes fleet-wide statistics.

Device Health Client

Each device exposes GET /health, returning a JSON object built in main.py that includes agent state, system metrics, cloud queue depth, and privacy rule counts. A simple client polls this endpoint:

import requests
from typing import Any, Dict

def fetch_device_health(ip: str, port: int = 8080, timeout: float = 5.0) -> Dict[str, Any]:
    try:
        resp = requests.get(f"http://{ip}:{port}/health", timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except Exception as exc:
        return {"status": "unreachable", "error": str(exc)}
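
The dictionary returned by this client falls into one of three states, which the fleet-level code later in this article also distinguishes. As a small sketch, the classification can be written as a pure function that is easy to unit-test. The status and agent.running fields are the ones this article reads from the payload; the function name classify_health is an assumption.

```python
from typing import Any, Mapping


def classify_health(health: Mapping[str, Any]) -> str:
    """Map a /health payload to 'healthy', 'unhealthy', or 'unreachable'."""
    status = health.get("status")
    if status == "unreachable":
        return "unreachable"
    agent = health.get("agent")
    # A payload without an "agent" section counts as unhealthy, not unreachable.
    running = isinstance(agent, Mapping) and bool(agent.get("running"))
    return "healthy" if status == "healthy" and running else "unhealthy"
```

Keeping "unreachable" separate from "unhealthy" matters operationally: the first usually points at the network, the second at the agent itself.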
  

Fleet Manager Class

The following Python class tracks N devices, polls health in parallel, and exposes aggregate statistics:

from __future__ import annotations

import concurrent.futures
import logging
import statistics
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

import requests

logger = logging.getLogger(__name__)


@dataclass
class Device:
    device_id: str
    ip: str
    port: int = 8080
    last_seen: float = field(default_factory=time.time)
    health: Dict[str, Any] = field(default_factory=dict)
    healthy: bool = False


@dataclass
class FleetSnapshot:
    timestamp: float
    total_devices: int
    healthy_count: int
    unhealthy_count: int
    unreachable_count: int
    avg_cpu: Optional[float]
    avg_disk: Optional[float]
    max_queue_depth: int
    devices: List[Device]


class FleetManager:
    """Track health and configuration for a fleet of Edge devices."""

    def __init__(self, devices: List[Device], poll_timeout: float = 5.0) -> None:
        self.devices = {d.device_id: d for d in devices}
        self.poll_timeout = poll_timeout

    def _poll_one(self, device: Device) -> None:
        url = f"http://{device.ip}:{device.port}/health"
        try:
            resp = requests.get(url, timeout=self.poll_timeout)
            resp.raise_for_status()
            data = resp.json()
            device.health = data
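            # .get() keeps a payload without an "agent" section "unhealthy", not "unreachable"
            agent = data.get("agent") or {}
            device.healthy = data.get("status") == "healthy" and bool(agent.get("running"))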
            device.last_seen = time.time()
        except Exception as exc:
            logger.warning("Health poll failed for %s: %s", device.device_id, exc)
            device.healthy = False
            device.health = {"status": "unreachable", "error": str(exc)}

    def poll_all(self, max_workers: int = 20) -> FleetSnapshot:
        """Poll every device in parallel and return a fleet snapshot."""
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            pool.map(self._poll_one, self.devices.values())

        healthy = [d for d in self.devices.values() if d.healthy]
        unhealthy = [d for d in self.devices.values() if not d.healthy and d.health.get("status") != "unreachable"]
        unreachable = [d for d in self.devices.values() if d.health.get("status") == "unreachable"]

        cpu_values = [
            d.health["metrics"]["cpu_percent"]
            for d in self.devices.values()
            if "metrics" in d.health and "cpu_percent" in d.health["metrics"]
        ]
        disk_values = [
            d.health["metrics"]["disk_percent"]
            for d in self.devices.values()
            if "metrics" in d.health and "disk_percent" in d.health["metrics"]
        ]
        queue_values = [
            d.health["cloud"]["queue_depth"]
            for d in self.devices.values()
            if "cloud" in d.health and "queue_depth" in d.health["cloud"]
        ]

        return FleetSnapshot(
            timestamp=time.time(),
            total_devices=len(self.devices),
            healthy_count=len(healthy),
            unhealthy_count=len(unhealthy),
            unreachable_count=len(unreachable),
            avg_cpu=statistics.mean(cpu_values) if cpu_values else None,
            avg_disk=statistics.mean(disk_values) if disk_values else None,
            max_queue_depth=max(queue_values) if queue_values else 0,
            devices=list(self.devices.values()),
        )

    def devices_needing_attention(self, max_queue_depth: int = 5000, max_cpu: float = 90.0) -> List[Device]:
        """Return devices that exceed operational thresholds."""
        result: List[Device] = []
        for d in self.devices.values():
            if not d.healthy:
                result.append(d)
                continue
            metrics = d.health.get("metrics", {})
            cloud = d.health.get("cloud", {})
            if metrics.get("cpu_percent", 0) > max_cpu:
                result.append(d)
            elif cloud.get("queue_depth", 0) > max_queue_depth:
                result.append(d)
        return result

    def push_config(self, config_path: str, target_devices: Optional[List[str]] = None) -> Dict[str, str]:
        """Push a config file to devices via HTTP POST (if custom endpoint enabled)."""
        results: Dict[str, str] = {}
        ids = target_devices or list(self.devices.keys())
        for device_id in ids:
            device = self.devices.get(device_id)
            if device is None:
                results[device_id] = "unknown_device"
                continue
            url = f"http://{device.ip}:{device.port}/config/reload"
            try:
                with open(config_path, "rb") as f:
                    resp = requests.post(url, files={"config": f}, timeout=10)
                results[device_id] = "ok" if resp.status_code == 200 else f"http_{resp.status_code}"
            except Exception as exc:
                results[device_id] = f"error: {exc}"
        return results


# Example usage
if __name__ == "__main__":
    fleet = FleetManager(
        devices=[
            Device(device_id="FLEET-a1b2c3d4e5f6", ip="10.0.1.11"),
            Device(device_id="FLEET-0f1e2d3c4b5a", ip="10.0.1.12"),
            Device(device_id="FLEET-9a8b7c6d5e4f", ip="10.0.1.13"),
        ]
    )

    snapshot = fleet.poll_all()
    print(f"Fleet: {snapshot.healthy_count}/{snapshot.total_devices} healthy")
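    # avg_cpu is None when no device reported metrics (for example, all unreachable)
    avg_cpu = f"{snapshot.avg_cpu:.1f}%" if snapshot.avg_cpu is not None else "n/a"
    print(f"Avg CPU: {avg_cpu}  Max queue: {snapshot.max_queue_depth}")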

    for d in fleet.devices_needing_attention():
        print(f"Needs attention: {d.device_id} — {d.health.get('status')}")
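
Because push_config accepts a target_devices list, a staged rollout can start with a small subset of the fleet before going wide. The following is a minimal sketch of deterministic canary selection. The function name select_canaries, the percentage parameter, and the salted-hash ranking are illustrative choices rather than an SDK feature.

```python
import hashlib
import math
from typing import Iterable, List


def select_canaries(device_ids: Iterable[str], percent: float, salt: str = "") -> List[str]:
    """Pick a stable subset of roughly `percent` % of devices (at least one)."""
    ids = sorted(set(device_ids))
    if not ids or percent <= 0:
        return []
    count = min(len(ids), max(1, math.ceil(len(ids) * percent / 100.0)))
    # Rank by a salted hash so the choice is repeatable but not alphabetical.
    ranked = sorted(ids, key=lambda d: hashlib.sha256(f"{salt}:{d}".encode()).hexdigest())
    return ranked[:count]
```

With the class above, fleet.push_config(path, target_devices=select_canaries(fleet.devices, 5)) would update about 5% of devices first. Changing the salt per release rotates which devices act as canaries.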
  

Fleet Health Dashboard Metrics

Export FleetSnapshot to Prometheus for Grafana visualisation:

from prometheus_client import Gauge, CollectorRegistry, generate_latest

registry = CollectorRegistry()
fleet_healthy = Gauge("pyvorin_fleet_healthy", "Number of healthy devices", registry=registry)
fleet_unhealthy = Gauge("pyvorin_fleet_unhealthy", "Number of unhealthy devices", registry=registry)
fleet_avg_cpu = Gauge("pyvorin_fleet_avg_cpu", "Average CPU across fleet", registry=registry)

def export_metrics(snapshot: FleetSnapshot) -> bytes:
    fleet_healthy.set(snapshot.healthy_count)
    fleet_unhealthy.set(snapshot.unhealthy_count)
    if snapshot.avg_cpu is not None:
        fleet_avg_cpu.set(snapshot.avg_cpu)
    return generate_latest(registry)
  

Integration with CloudSyncQueue

The CloudSyncQueue in /var/www/pyvorin/edge_runtime/pyv_edge_agent/cloud_sync/queue.py persists outbound messages in SQLite. When the fleet manager detects a device with high queue depth, the cause is often a network partition or an expired API key. Start with the cloud fields in the health response, and call queue.get_stats() on the device itself for more detail:

# Inside FleetManager._poll_one, the health JSON already contains:
#   cloud.queue_depth      -> pending_count()
#   cloud.last_flush_time  -> last_flush_time
#   cloud.messages_sent_today -> messages_sent_today

# You can also query queue stats directly if you have filesystem access:
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue

queue = CloudSyncQueue(db_path="/home/pi/pyvorin-edge/sync_queue.db")
stats = queue.get_stats()
print(f"Depth: {stats['depth']}  Retries: {stats['total_retries']}  Oldest: {stats['oldest_item_timestamp']}")
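
The cloud fields in the health payload are often enough for a first triage from the controller, without filesystem access. The sketch below separates a queue that is still flushing but cannot keep up from one that has stopped flushing altogether. It assumes last_flush_time is a Unix timestamp in seconds, which this article does not confirm; the function name diagnose_queue and the 10-minute staleness threshold are also illustrative.

```python
import time
from typing import Any, Mapping, Optional


def diagnose_queue(cloud: Mapping[str, Any], max_depth: int = 5000,
                   stale_after_seconds: float = 600.0,
                   now: Optional[float] = None) -> str:
    """Classify the 'cloud' section of a /health payload: ok, backlogged, or stalled."""
    depth = cloud.get("queue_depth", 0)
    last_flush = cloud.get("last_flush_time")  # assumed: epoch seconds
    current = time.time() if now is None else now
    flushed_recently = (
        isinstance(last_flush, (int, float))
        and (current - last_flush) <= stale_after_seconds
    )
    if depth <= max_depth:
        return "ok"
    # Deep queue: recent flushes mean slow drain, none mean nothing is being sent.
    return "backlogged" if flushed_recently else "stalled"
```

A "stalled" result fits the network partition or expired API key cases described above, while "backlogged" suggests that the batch size or flush interval is too conservative for the data rate.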
  

Summary

Fleet management for Pyvorin Edge rests on five capabilities: deterministic device provisioning, continuous configuration drift detection, scalable config distribution via MQTT or SSH, one-command fleet-wide rollback, and centralised health aggregation. The FleetManager class shown above provides a foundation to build on. Integrate it with your existing monitoring stack, enforce golden configs per channel, and never deploy to 100% of the fleet without a canary phase.