Introduction
The EdgeAgent class is the heart of the Pyvorin Edge Runtime. It is not merely a script that polls sensors; it is a multi-threaded daemon that manages the entire data lifecycle from ingestion to storage to cloud synchronisation. This article explains every attribute, every thread, every lifecycle phase, and every design decision in plain English. By the end, you will know exactly what happens when you call agent.start() and why it matters.
Where EdgeAgent Lives
The class is defined in /var/www/pyvorin/edge_runtime/pyv_edge_agent/main.py. This is the entry point for both the pyv-edge CLI and programmatic usage. When you run pyv-edge --config config.toml, the main() function instantiates an EdgeAgent, registers signal handlers for graceful shutdown, and calls agent.start().
The Constructor
When you create an agent, it starts in a dormant state. No threads are spawned. No files are opened. No network connections are made. This lazy initialisation is deliberate: it lets you construct and configure the agent safely before committing resources.
from pyv_edge_agent.main import EdgeAgent
# Dormant state: no threads, no files, no sockets
agent = EdgeAgent(config_path="/home/pi/pyvorin-edge/config.toml")
# Inspect initial state
print(agent.is_running) # False
print(agent.health())
# {
# "running": False,
# "config_path": "/home/pi/pyvorin-edge/config.toml",
# "buffer_count": 0,
# "cloud_pending": 0,
# "privacy_enabled": True
# }
Key Attributes at Construction
| Attribute | Type | Purpose |
|---|---|---|
| _config_path | Optional[str] | Path to the TOML/JSON config file |
| _config | Config | Configuration object (initially empty defaults) |
| _store | Optional[SQLiteStore] | SQLite evidence store (initially None) |
| _privacy | PrivacyPolicy | Privacy firewall rules |
| _cloud | CloudSyncQueue | Persistent upload queue |
| _uploader | Optional[HTTPCloudUploader] | HTTP client for cloud batches |
| _buffers | Dict[str, RingBuffer] | One ring buffer per configured device |
| _adapters | Dict[str, Any] | Ingest adapter instances per device |
| _readings_processed | int | Counter incremented per reading |
| _events_triggered | int | Counter incremented when a reading's value exceeds 30.0 |
| _running | bool | Main loop control flag |
| _shutdown_event | threading.Event | Thread-safe signal for graceful shutdown |
| _thread | Optional[threading.Thread] | The main agent loop thread |
| _health_server | Optional[HTTPServer] | Built-in HTTP health server |
| _health_thread | Optional[threading.Thread] | Daemon thread serving health HTTP |
| _system_metrics | SystemMetrics | Collector for CPU, RAM, disk, thermal |
Lifecycle Diagram
[Constructed] --start()--> [Starting]
                               |
                               v
                        [Config Loaded]
                               |
                               v
                        [Logging Setup]
                               |
                               v
                       [Components Init]
                               |
                   +-----------+-----------+
                   |                       |
                   v                       v
            [Health Server]         [Agent Thread]
                   |                       |
                   +-----------+-----------+
                               |
                               v
                           [Running]
                               |
                               v
         run_loop(): call run_once() (single cycle),
         then wait poll_interval_seconds; repeat
                               |
                +--------------+--------------+
                |                             |
             stop()                    SIGINT/SIGTERM
                |                             |
                +--------------+--------------+
                               |
                               v
                       [Shutdown Signal]
                               |
                               v
                          [Stopping]
                               |
                               v
                        [Health Stop]
                               |
                               v
                        [Thread Join]
                               |
                               v
                        [Store Close]
                               |
                               v
                          [Stopped]
Phase 1: Starting
Calling agent.start() triggers a strict sequence. If the agent is already running, it logs a warning and returns immediately to prevent double initialisation.
1.1 Load Configuration
if self._config_path:
    self._config = Config.from_file(self._config_path)
The Config.from_file() method reads TOML or JSON, substitutes environment variables of the form ${VAR} or ${VAR:-default}, and merges user settings over built-in defaults. If the file is missing, a FileNotFoundError is raised immediately so you know the problem is configuration, not runtime.
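The substitution and merge steps can be sketched in a few lines. This is a minimal, hypothetical illustration, not the Config.from_file() source: substitute_env() and deep_merge() are names invented here, and the behaviour for an unset variable with no default (raising KeyError) is an assumption. The ${VAR:-default} handling follows shell semantics, where the default is used when the variable is unset or empty.

```python
import os
import re
from typing import Any, Dict, Mapping, Optional

# Matches ${VAR} or ${VAR:-default}; group 2 is None when no default is given.
_ENV_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def substitute_env(text: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical helper: expand ${VAR} and ${VAR:-default} in raw config text."""
    env = os.environ if environ is None else environ

    def _replace(match: "re.Match[str]") -> str:
        name, default = match.group(1), match.group(2)
        value = env.get(name)
        if value:                    # set and non-empty
            return value
        if default is not None:      # ${VAR:-default}: unset or empty -> default
            return default
        if value is not None:        # ${VAR} set but empty -> empty string
            return value
        # Assumption: an unset variable without a default is a config error.
        raise KeyError(f"environment variable {name} is not set and has no default")

    return _ENV_PATTERN.sub(_replace, text)


def deep_merge(defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively overlay user settings on built-in defaults without mutating either."""
    merged = dict(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged
```

Merging recursively means a user file that sets only [sensors] poll_interval_seconds keeps every other default in that section, rather than replacing the whole table.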
1.2 Setup Logging
The agent supports two log formats: JSON (default, for structured log aggregation) and plain text (for human readability during development). The root logger is reconfigured with a single StreamHandler writing to stdout. Any existing handlers are cleared to prevent duplicate output from third-party libraries.
log_cfg = self._config.get("logging", default={})
level = log_cfg.get("level", "INFO")
fmt = log_cfg.get("format", "json")
JSON format includes timestamp, level, logger name, message, and exception tracebacks if present. This makes it trivial to ship logs to Elasticsearch, Loki, or CloudWatch.
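A JSON formatter of this kind can be built on the standard logging module. The sketch below is an assumption about the shape of the output, not the agent's actual formatter: the field names ("timestamp", "level", "logger", "message", "exception") and the plain-text format string are illustrative, and setup_logging() is a hypothetical name. It does follow the steps described above: clear existing root handlers, attach one StreamHandler, pick the formatter from the format setting.

```python
import json
import logging
import sys
from datetime import datetime, timezone
from typing import IO, Optional


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def setup_logging(level: str = "INFO", fmt: str = "json",
                  stream: Optional[IO[str]] = None) -> logging.Handler:
    """Hypothetical helper: reset the root logger to a single stdout handler."""
    root = logging.getLogger()
    for handler in list(root.handlers):   # clear handlers to avoid duplicate output
        root.removeHandler(handler)
    handler = logging.StreamHandler(stream if stream is not None else sys.stdout)
    if fmt == "json":
        handler.setFormatter(JsonFormatter())
    else:
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    root.addHandler(handler)
    root.setLevel(level.upper())
    return handler
```

Because each line is a self-contained JSON object, log shippers can parse records without multi-line stitching, even when a traceback is included.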
1.3 Initialise Components
_init_components() is where the heavy lifting happens. It creates, in order:
- SQLiteStore: a connection-pooled, WAL-mode SQLite database for readings, events, summaries, and audit logs.
- PrivacyPolicy: loaded from the [privacy] config section.
- CloudSyncQueue: loaded from the [cloud] config section.
- RingBuffers: one per device, configured with capacity and dtype from [windows].
- Ingest Adapters: created based on each device's ingest_type (simulator, mqtt, http, file).
- HTTPCloudUploader: created only if cloud.enabled is true and an endpoint is provided.
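The adapter and uploader steps amount to a dispatch on configuration values. Below is a hypothetical sketch of that idea: SimulatorAdapter, ADAPTER_FACTORIES, build_adapters() and should_create_uploader() are illustrative names, only the simulator type is filled in, and raising ValueError on an unknown ingest_type is an assumption (the real agent may log and skip instead).

```python
import random
from typing import Any, Callable, Dict, List, Optional


class SimulatorAdapter:
    """Hypothetical simulator: returns a pseudo-random temperature-like value."""

    def __init__(self, device: str, seed: Optional[int] = None) -> None:
        self.device = device
        self._rng = random.Random(seed)

    def read(self) -> float:
        return round(self._rng.uniform(15.0, 35.0), 2)


# Map each ingest_type to a factory taking the device's config dict.
# "mqtt", "http" and "file" would register their own adapter classes here.
ADAPTER_FACTORIES: Dict[str, Callable[[Dict[str, Any]], Any]] = {
    "simulator": lambda dev: SimulatorAdapter(dev["name"], dev.get("seed")),
}


def build_adapters(devices: List[Dict[str, Any]]) -> Dict[str, Any]:
    adapters: Dict[str, Any] = {}
    for dev in devices:
        kind = dev.get("ingest_type", "simulator")
        factory = ADAPTER_FACTORIES.get(kind)
        if factory is None:
            # Assumption: fail fast on a misconfigured device.
            raise ValueError(f"unknown ingest_type {kind!r} for device {dev['name']!r}")
        adapters[dev["name"]] = factory(dev)
    return adapters


def should_create_uploader(cloud_cfg: Dict[str, Any]) -> bool:
    """Mirror the rule above: cloud.enabled must be true AND an endpoint must be set."""
    return bool(cloud_cfg.get("enabled")) and bool(cloud_cfg.get("endpoint"))
```

A registry dictionary keeps the agent's initialisation code unchanged when a new ingest type is added; only a new factory entry is needed.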
1.4 Start the Health Server
The health server is a minimal HTTP server using Python's built-in http.server module. It runs in a daemon thread so it does not block agent shutdown. It responds to:
- GET /health: comprehensive status JSON including agent state, system metrics, cloud queue depth, privacy rules, and ingest adapters.
- GET /metrics: raw system metrics only (CPU, RAM, disk, thermal, uptime).
- OPTIONS *: CORS preflight responses.
The handler is implemented as _HealthHandler, a subclass of BaseHTTPRequestHandler. It holds a class-level reference to the agent instance so it can inspect live state without global variables.
class _HealthHandler(BaseHTTPRequestHandler):
    agent_ref: Optional["EdgeAgent"] = None

    def do_GET(self):
        if self.path == "/health":
            status = "healthy" if (self.agent_ref and self.agent_ref.is_running) else "unhealthy"
            # ... builds comprehensive JSON ...
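For experimentation, the same pattern (class-level agent reference, http.server, daemon serving thread) can be reproduced end to end with the standard library. This is a hypothetical, trimmed-down sketch: DemoAgent, HealthHandler and start_health_server() are invented names, the payload has only two fields, /metrics is omitted, and returning HTTP 503 for an unhealthy agent is an assumption rather than documented behaviour.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Dict, Optional, Tuple


class DemoAgent:
    """Hypothetical stand-in exposing only what the handler reads."""

    def __init__(self) -> None:
        self.is_running = True
        self.readings_processed = 0


class HealthHandler(BaseHTTPRequestHandler):
    agent_ref: Optional[DemoAgent] = None  # set once before the server starts

    def _send_json(self, code: int, body: Dict[str, Any]) -> None:
        data = json.dumps(body).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(data)

    def do_GET(self) -> None:
        agent = type(self).agent_ref
        if self.path == "/health":
            running = bool(agent and agent.is_running)
            # Assumption: 503 lets load balancers treat "unhealthy" as a failed check.
            self._send_json(200 if running else 503, {
                "status": "healthy" if running else "unhealthy",
                "readings_processed": agent.readings_processed if agent else 0,
            })
        else:
            self._send_json(404, {"error": "not found"})

    def do_OPTIONS(self) -> None:  # CORS preflight
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, OPTIONS")
        self.end_headers()

    def log_message(self, format: str, *args: Any) -> None:
        pass  # silence the default per-request stderr log


def start_health_server(agent: DemoAgent, host: str = "127.0.0.1",
                        port: int = 0) -> Tuple[HTTPServer, threading.Thread]:
    HealthHandler.agent_ref = agent
    server = HTTPServer((host, port), HealthHandler)  # port 0 picks a free port
    thread = threading.Thread(target=server.serve_forever, daemon=True, name="health-http")
    thread.start()
    return server, thread
```

Shutting down is the mirror image: server.shutdown() stops serve_forever(), server.server_close() releases the socket, and the thread can then be joined with a short timeout.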
1.5 Start the Main Thread
Finally, the agent spawns its main loop thread as a daemon:
self._thread = threading.Thread(target=self.run_loop, daemon=True, name="edge-agent")
self._thread.start()
Daemon threads are chosen deliberately: a daemon thread does not keep the Python process alive, so if the main thread exits unexpectedly, the process still terminates instead of hanging on a stuck worker thread.
Phase 2: Running
2.1 The Main Loop
run_loop() is deceptively simple:
def run_loop(self) -> None:
    interval = float(self._config.get("sensors", "poll_interval_seconds", default=5.0))
    while self._running and not self._shutdown_event.is_set():
        try:
            self.run_once()
        except Exception:
            logger.exception("Error in run_loop iteration")
        self._shutdown_event.wait(timeout=interval)
It reads the configured poll interval (default 5 seconds), then repeats until shutdown. The wait uses threading.Event.wait() rather than time.sleep() so that stop() can wake it immediately instead of waiting for the sleep to finish.
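The difference between Event.wait() and time.sleep() is easy to demonstrate. In this minimal sketch (run_loop() here is a stand-alone illustration, not the agent's method), the loop uses a 5-second interval, yet setting the event from another thread ends it after roughly 0.1 seconds.

```python
import threading
import time
from typing import Callable, Dict


def run_loop(shutdown: threading.Event, interval: float, work: Callable[[], None]) -> int:
    """Call work() every `interval` seconds until `shutdown` is set; return cycle count."""
    cycles = 0
    while not shutdown.is_set():
        work()
        cycles += 1
        # Unlike time.sleep(), this returns as soon as shutdown.set() is called.
        shutdown.wait(timeout=interval)
    return cycles


shutdown = threading.Event()
result: Dict[str, int] = {}


def _worker() -> None:
    result["cycles"] = run_loop(shutdown, 5.0, lambda: None)


t0 = time.monotonic()
worker = threading.Thread(target=_worker, daemon=True)
worker.start()
time.sleep(0.1)       # let the loop enter its 5-second wait
shutdown.set()        # wakes the wait immediately
worker.join(timeout=2)
elapsed = time.monotonic() - t0
print(f"stopped after {elapsed:.2f}s, cycles={result.get('cycles')}")
```

With time.sleep(5.0) in place of the wait, the same shutdown would take up to five seconds.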
2.2 A Single Iteration
run_once() performs the core data lifecycle:
- Collect. For each buffer (and therefore each device), the agent asks the corresponding adapter for a value. It tries multiple adapter methods in order: read(), generate_batch(), and generate_reading().
- Wrap. The raw value is wrapped in a SensorReading dataclass with the current timestamp.
- Filter. The reading is passed through PrivacyPolicy.evaluate(). If the result is None, the reading is dropped and never stored.
- Buffer. If allowed, the reading is appended to the device's ring buffer.
- Store. The reading is inserted into the SQLite readings table.
- Enqueue. The reading is serialised and added to the cloud sync queue.
- Event Detection. If the value exceeds 30.0, the _events_triggered counter is incremented. (In production, you would configure explicit rules in [rules] instead of this hard-coded threshold.)
- Flush. CloudSyncQueue.maybe_flush() checks whether the uploader should send a batch. If cloud sync is disabled, items remain in the queue indefinitely until polled manually or the database is purged.
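The pipeline above can be modelled in memory to make the ordering concrete. This is a hypothetical sketch, not run_once() itself: MiniAgent and its fields are invented names, Python lists stand in for the ring buffer, SQLite table and sync queue, and counting only readings that pass the privacy filter in readings_processed is an assumption.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional

EVENT_THRESHOLD = 30.0  # the hard-coded threshold described above


@dataclass
class SensorReading:
    device: str
    value: float
    timestamp: float = field(default_factory=time.time)


@dataclass
class MiniAgent:
    """Hypothetical in-memory stand-in for one run_once() pass."""
    read_value: Callable[[str], float]
    privacy_filter: Callable[[SensorReading], Optional[SensorReading]]
    buffers: Dict[str, List[SensorReading]]
    stored: List[SensorReading] = field(default_factory=list)   # stands in for SQLite
    queue: List[dict] = field(default_factory=list)             # stands in for the sync queue
    readings_processed: int = 0
    events_triggered: int = 0

    def run_once(self) -> None:
        for name, buffer in self.buffers.items():
            reading = SensorReading(name, self.read_value(name))  # collect + wrap
            filtered = self.privacy_filter(reading)               # filter
            if filtered is None:
                continue                                          # dropped, never stored
            buffer.append(filtered)                               # buffer
            self.stored.append(filtered)                          # store
            self.queue.append({"device": filtered.device,         # enqueue (serialised)
                               "value": filtered.value,
                               "ts": filtered.timestamp})
            self.readings_processed += 1                          # assumption: counts kept readings
            if filtered.value > EVENT_THRESHOLD:                  # event detection
                self.events_triggered += 1
        # The real agent would call CloudSyncQueue.maybe_flush() here.
```

Filtering before buffering is the important design choice: a reading rejected by the privacy policy never reaches memory, disk, or the upload queue.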
2.3 Adapter Resilience
Adapters are allowed to fail. If an adapter call inside _get_reading_value() raises an exception, the method catches it, logs a warning, and returns 0.0; it also returns 0.0 when no adapter is registered for the device. This means a flaky MQTT connection or a missing sensor does not crash the entire agent; it simply records a zero for that cycle and tries again next interval.
def _get_reading_value(self, name: str) -> float:
    adapter = self._adapters.get(name)
    if adapter is None:
        return 0.0
    try:
        # Try read(), generate_batch(), generate_reading() in order
        ...
    except Exception as exc:
        logger.warning("Adapter read failed for %s: %s", name, exc)
        return 0.0
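The elided method-selection step could look something like the following. This is a hypothetical reconstruction, not the project's code: it assumes "in order" means "use the first of these methods the adapter defines", and that generate_batch() returns a sequence whose last element is the newest value. Both assumptions are labelled in the comments.

```python
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)

# Method order taken from the description above.
_READ_METHODS = ("read", "generate_batch", "generate_reading")


def get_reading_value(adapters: Dict[str, Any], name: str) -> float:
    """Hypothetical stand-alone version of the adapter fallback chain."""
    adapter = adapters.get(name)
    if adapter is None:
        return 0.0
    try:
        for method_name in _READ_METHODS:
            method = getattr(adapter, method_name, None)
            if callable(method):            # assumption: first defined method wins
                result = method()
                if isinstance(result, (list, tuple)):
                    result = result[-1]     # assumption: newest value is last in a batch
                return float(result)
        return 0.0                          # adapter exposes none of the methods
    except Exception as exc:
        logger.warning("Adapter read failed for %s: %s", name, exc)
        return 0.0
```

Wrapping the whole lookup in one try block means an empty batch (IndexError) or a non-numeric value (ValueError) is treated the same way as a dropped connection.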
Phase 3: Stopping
Calling agent.stop() (or sending SIGINT/SIGTERM) triggers a graceful shutdown:
- Set _running = False and signal _shutdown_event to wake the main loop immediately.
- Shut down the health HTTP server and join its thread with a 2-second timeout.
- Join the main agent thread with a 10-second timeout.
- Close the SQLite store, which closes all pooled connections.
- Log "EdgeAgent stopped".
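The 10-second timeout on the main thread is important. If run_once() is blocked on a slow adapter or database write, stop() waits up to 10 seconds for the thread to finish and then continues shutdown regardless. Thread.join() cannot kill a thread: a thread still busy after the timeout keeps running until its current call returns or the process exits, and because it is a daemon thread it does not keep the process alive. This prevents indefinite hangs while still giving in-flight operations a chance to complete.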
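The bounded-join behaviour can be observed with a small stand-in. StoppableWorker below is a hypothetical class, not EdgeAgent: time.sleep() simulates a slow, non-interruptible adapter call, and stop() returning a bool to report whether the thread actually finished is an addition for illustration.

```python
import threading
import time
from typing import Optional


class StoppableWorker:
    """Hypothetical sketch of the stop() ordering: signal first, then a bounded join."""

    def __init__(self, step_seconds: float) -> None:
        self._running = False
        self._shutdown_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._step_seconds = step_seconds

    def _loop(self) -> None:
        while self._running and not self._shutdown_event.is_set():
            time.sleep(self._step_seconds)        # simulated slow adapter call
            self._shutdown_event.wait(timeout=5.0)

    def start(self) -> None:
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True, name="edge-agent")
        self._thread.start()

    def stop(self, join_timeout: float = 10.0) -> bool:
        self._running = False
        self._shutdown_event.set()                # wakes wait() immediately
        if self._thread is not None:
            self._thread.join(timeout=join_timeout)
            if self._thread.is_alive():
                # join() gave up; the daemon thread keeps running until its
                # current call returns or the process exits.
                return False
        return True
```

Checking is_alive() after join() is the standard way to tell a clean exit from a timeout, since join() itself returns None in both cases.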
Context Manager Support
EdgeAgent implements __enter__ and __exit__, so you can use it with Python's with statement for automatic lifecycle management:
import time

from pyv_edge_agent.main import EdgeAgent

with EdgeAgent(config_path="/home/pi/pyvorin-edge/config.toml") as agent:
    # Agent is running. Do work here.
    time.sleep(30)
# On exit, agent.stop() is called automatically.
This pattern is excellent for unit tests and short-lived batch jobs because it guarantees cleanup even if an exception is raised.
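The protocol behind this is just two methods. In the hypothetical sketch below, ManagedAgent stands in for EdgeAgent and records the calls it receives; whether the real __exit__ returns False (letting exceptions propagate after cleanup) is an assumption, though it is the conventional choice.

```python
from types import TracebackType
from typing import List, Optional, Type


class ManagedAgent:
    """Hypothetical stand-in showing how __enter__/__exit__ map onto start()/stop()."""

    def __init__(self) -> None:
        self.is_running = False
        self.calls: List[str] = []

    def start(self) -> None:
        self.is_running = True
        self.calls.append("start")

    def stop(self) -> None:
        self.is_running = False
        self.calls.append("stop")

    def __enter__(self) -> "ManagedAgent":
        self.start()
        return self

    def __exit__(self, exc_type: Optional[Type[BaseException]],
                 exc: Optional[BaseException],
                 tb: Optional[TracebackType]) -> bool:
        self.stop()     # runs on normal exit AND when the block raises
        return False    # do not swallow the exception
```

Returning False keeps failures visible to the caller while still guaranteeing that stop() ran.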
Signal Handling
The CLI entry point registers handlers for both SIGINT (Ctrl+C) and SIGTERM (systemd stop, Docker stop):
signal.signal(signal.SIGTERM, lambda s, f: _signal_handler(agent, s, f))
signal.signal(signal.SIGINT, lambda s, f: _signal_handler(agent, s, f))
The handler logs the signal, calls agent.stop(), and exits the process with code 0. This makes Pyvorin Edge friendly to process supervisors like systemd and container orchestrators.
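A handler with this behaviour can be written as a closure over the agent instead of a lambda. This is a hypothetical sketch: DemoAgent, make_handler() and install_handlers() are invented names, and the exit_process flag exists only so the behaviour can be exercised without terminating the interpreter. Note that signal.signal() must be called from the main thread.

```python
import logging
import signal
import sys
import threading
from types import FrameType
from typing import Callable, Optional

logger = logging.getLogger(__name__)


class DemoAgent:
    """Hypothetical stand-in: only records that stop() was called."""

    def __init__(self) -> None:
        self.stopped = threading.Event()

    def stop(self) -> None:
        self.stopped.set()


def make_handler(agent: DemoAgent, exit_process: bool = True
                 ) -> Callable[[int, Optional[FrameType]], None]:
    def _handler(signum: int, frame: Optional[FrameType]) -> None:
        logger.info("Received %s, shutting down", signal.Signals(signum).name)
        agent.stop()
        if exit_process:
            sys.exit(0)   # exit code 0 tells supervisors the shutdown was clean
    return _handler


def install_handlers(agent: DemoAgent, exit_process: bool = True) -> None:
    handler = make_handler(agent, exit_process)
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, handler)
```

Python runs signal handlers in the main thread between bytecode instructions, so calling agent.stop() there is safe even though the agent's own loop runs in a separate daemon thread.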
Thread Safety Summary
| Component | Thread Safety | Mechanism |
|---|---|---|
| RingBuffer | Thread-safe | threading.Lock around append/peek/pop |
| SQLiteStore | Thread-safe | Connection pool + per-thread connections |
| CloudSyncQueue | Thread-safe | threading.RLock around DB operations |
| PrivacyPolicy | Read-safe | Immutable rules list; no locks needed for reads |
| SystemMetrics | Thread-safe | threading.Lock around CPU delta state |
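As an illustration of the RingBuffer row, here is a hypothetical lock-protected ring buffer. It is not the project's class: the dtype option is omitted, pop() returning the oldest item (FIFO) and peek() returning a snapshot list are assumptions, and collections.deque with maxlen supplies the "drop the oldest when full" behaviour.

```python
import threading
from collections import deque
from typing import Any, Deque, List, Optional


class RingBuffer:
    """Hypothetical fixed-capacity buffer; the oldest item is dropped when full."""

    def __init__(self, capacity: int) -> None:
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._items: Deque[Any] = deque(maxlen=capacity)
        self._lock = threading.Lock()

    def append(self, item: Any) -> None:
        with self._lock:
            self._items.append(item)

    def peek(self) -> List[Any]:
        """Return a snapshot copy, oldest first, without removing anything."""
        with self._lock:
            return list(self._items)

    def pop(self) -> Optional[Any]:
        """Remove and return the oldest item, or None when empty."""
        with self._lock:
            return self._items.popleft() if self._items else None

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)
```

Individual deque operations are already atomic in CPython; the lock matters for compound operations such as pop()'s check-then-remove and for taking a consistent snapshot in peek() while another thread appends.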
Summary
The EdgeAgent is a carefully orchestrated multi-threaded daemon. It loads configuration, sets up logging, initialises a SQLite store, creates ring buffers and ingest adapters, starts a health HTTP server, and enters a poll-filter-store-sync loop. It handles adapter failures gracefully, supports clean shutdown via signals or context managers, and protects shared state either with locks or, in the case of PrivacyPolicy, through immutability. Understanding this lifecycle is essential for debugging, extending, and operating Pyvorin Edge in production.