"Tutorial: Smart Building Energy Monitor"

June 2, 2026 | 30 min read

Introduction

This tutorial walks you through building a complete smart building energy monitoring system using a Raspberry Pi 4, off-the-shelf temperature and humidity sensors, and a Modbus energy meter. By the end, you will have a running Pyvorin Edge pipeline that aggregates sensor data in rolling windows, evaluates safety rules locally, and uploads summaries to the cloud.

Hardware Setup

Raspberry Pi 4

Any Raspberry Pi 4 with 2 GB RAM or more is sufficient. Install Raspberry Pi OS Lite (64-bit) and enable SPI and I2C interfaces via raspi-config.

Sensors

  • Temperature / Humidity: DHT22 or BME280 connected via GPIO or I2C.
  • Energy Meter: SDM120-Modbus connected via USB-to-RS485 adapter.

Configuration (config.toml)

The Edge Agent reads its runtime configuration from config.toml. We define three sensors, a 5-minute rolling window, and two rules: overtemperature and high humidity.


[sensors]
poll_interval_seconds = 5

[[sensors.devices]]
name = "lobby_temp"
ingest_type = "simulator"
sensor_type = "temperature"
unit = "C"
min_value = 18.0
max_value = 32.0
baseline = 24.0
noise_std = 0.5

[[sensors.devices]]
name = "lobby_hum"
ingest_type = "simulator"
sensor_type = "humidity"
unit = "%"
min_value = 30.0
max_value = 80.0
baseline = 55.0
noise_std = 2.0

[[sensors.devices]]
name = "main_meter"
ingest_type = "simulator"
sensor_type = "current"
unit = "A"
min_value = 0.0
max_value = 100.0
baseline = 15.0
noise_std = 1.0

[windows]
default_size = 100
default_dtype = "float"

[cloud]
enabled = true
endpoint = "https://api.pyvorin.com/v1/ingest"
api_key = "${PYVORIN_API_KEY}"
timeout = 30.0

[health]
enabled = true
port = 8080

[logging]
level = "INFO"
format = "json"
  

Pipeline Script

The pipeline is declared in Python using the Pipeline class from edge_sdk/pyvorin_edge/pipeline.py. We add sensors, a RollingWindow (defined in edge_sdk/pyvorin_edge/windows.py), and RuleConfig objects for local rule evaluation.

Imports and Setup


#!/usr/bin/env python3
"""Smart Building Energy Monitor — Pyvorin Edge Pipeline"""

import time
from pyvorin_edge.pipeline import Pipeline, WindowConfig, RuleConfig
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading
from pyvorin_edge.policies import CloudPolicy
from pyvorin_edge.reports import ReportBuilder

pipeline = Pipeline(name="smart_building")
  

Sensor Registration


# Register sensors with normal operating ranges
pipeline.add_sensor(
    Sensor(
        name="lobby_temp",
        sensor_type=SensorType.TEMPERATURE,
        unit="C",
        normal_range=(18.0, 32.0),
        location="Lobby",
    )
)
pipeline.add_sensor(
    Sensor(
        name="lobby_hum",
        sensor_type=SensorType.HUMIDITY,
        unit="%",
        normal_range=(30.0, 80.0),
        location="Lobby",
    )
)
pipeline.add_sensor(
    Sensor(
        name="main_meter",
        sensor_type=SensorType.CURRENT,
        unit="A",
        normal_range=(0.0, 100.0),
        location="Main_Distribution",
    )
)
  

Windows and Rules


# 5-minute rolling windows for trend analysis
pipeline.add_window(
    WindowConfig(
        duration_seconds=300.0,
        sensor_name="lobby_temp",
        window_type="rolling",
    )
)
pipeline.add_window(
    WindowConfig(
        duration_seconds=300.0,
        sensor_name="lobby_hum",
        window_type="rolling",
    )
)

# Local rules with cooldown to prevent alert spam
pipeline.add_rule(
    RuleConfig(
        name="overtemp",
        condition_expr="ctx.value > 28.0",
        severity="warning",
        cooldown_seconds=300.0,
    )
)
pipeline.add_rule(
    RuleConfig(
        name="high_humidity",
        condition_expr="ctx.value > 65.0",
        severity="warning",
        cooldown_seconds=300.0,
    )
)
  

Cloud Policy


# Upload aggregated summaries every 5 minutes
pipeline.add_policy(
    CloudPolicy(
        endpoint_url="https://api.pyvorin.com/v1/ingest",
        protocol="mqtt",
        max_messages_per_minute=10,
        summary_interval_seconds=300.0,
    )
)
  

Running the Pipeline


def main():
    # Simulate a burst of readings (in production these come from adapters)
    now = time.time()
    readings = [
        SensorReading(sensor_name="lobby_temp", timestamp=now, value=26.5, unit="C"),
        SensorReading(sensor_name="lobby_hum", timestamp=now, value=68.0, unit="%"),
        SensorReading(sensor_name="main_meter", timestamp=now, value=22.0, unit="A"),
    ]

    result = pipeline.run(readings)
    print(f"Processed {result.readings_processed} readings")
    print(f"Events: {len(result.events)}")
    for ev in result.events:
        print(f"  [{ev.severity}] {ev.rule_name}: {ev.message}")

    # Generate a health report
    # Direct RollingWindow usage for custom aggregation
    from pyvorin_edge.windows import RollingWindow
    temp_window = RollingWindow(duration_seconds=300.0, sensor_name="lobby_temp")
    for r in readings:
        if r.sensor_name == "lobby_temp":
            temp_window.add_reading(r)
    window_result = temp_window.compute()
    print(f"Window mean temp: {window_result.mean:.2f} °C")

    report = ReportBuilder(pipeline_name="smart_building")
    report.add_health_data(pipeline.sensors, result.events)
    print(report.to_json())


if __name__ == "__main__":
    main()
  

Cloud Sync and Upload

The Edge Agent uses CloudSyncQueue (defined in edge_runtime/pyv_edge_agent/cloud_sync/queue.py) to persist summaries locally before uploading. If connectivity is lost, items remain in the SQLite-backed queue until the link recovers.


from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority

queue = CloudSyncQueue(db_path="sync_queue.db")
summary = {
    "pipeline": "smart_building",
    "timestamp": time.time(),
    "window_mean_temp": 24.3,
    "window_mean_hum": 56.1,
}

queue.enqueue(summary, priority=Priority.TELEMETRY, ttl_seconds=86400)
print(f"Queue depth: {queue.pending_count()}")
  

Dashboard Walkthrough

With the health server enabled (port 8080), two endpoints are available on the device:

  • /health — Agent status, metrics, buffer counts, and cloud queue depth (populated by EdgeAgent in edge_runtime/pyv_edge_agent/main.py).
  • /metrics — System metrics (CPU, RAM, disk, thermal) collected by SystemMetrics in edge_runtime/pyv_edge_agent/health_monitor/metrics.py.

curl -s http://localhost:8080/health | python3 -m json.tool
curl -s http://localhost:8080/metrics | python3 -m json.tool
  

Point Grafana or any JSON-capable dashboard at these endpoints to visualise real-time building health. The /health response includes nested objects for agent, cloud, privacy, and ingest state.

Full Working Example

Save the following as pipeline.py alongside the config.toml shown earlier. Running python3 pipeline.py executes the complete pipeline end-to-end.


#!/usr/bin/env python3
"""Smart Building Energy Monitor — complete working example."""

import time
from pyvorin_edge.pipeline import Pipeline, WindowConfig, RuleConfig
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading
from pyvorin_edge.policies import CloudPolicy
from pyvorin_edge.reports import ReportBuilder

pipeline = Pipeline(name="smart_building")

# Sensors
pipeline.add_sensor(
    Sensor(name="lobby_temp", sensor_type=SensorType.TEMPERATURE, unit="C",
           normal_range=(18.0, 32.0), location="Lobby")
)
pipeline.add_sensor(
    Sensor(name="lobby_hum", sensor_type=SensorType.HUMIDITY, unit="%",
           normal_range=(30.0, 80.0), location="Lobby")
)
pipeline.add_sensor(
    Sensor(name="main_meter", sensor_type=SensorType.CURRENT, unit="A",
           normal_range=(0.0, 100.0), location="Main_Distribution")
)

# Windows
pipeline.add_window(
    WindowConfig(duration_seconds=300.0, sensor_name="lobby_temp", window_type="rolling")
)
pipeline.add_window(
    WindowConfig(duration_seconds=300.0, sensor_name="lobby_hum", window_type="rolling")
)

# Rules
pipeline.add_rule(
    RuleConfig(name="overtemp", condition_expr="ctx.value > 28.0",
               severity="warning", cooldown_seconds=300.0)
)
pipeline.add_rule(
    RuleConfig(name="high_humidity", condition_expr="ctx.value > 65.0",
               severity="warning", cooldown_seconds=300.0)
)

# Cloud policy
pipeline.add_policy(
    CloudPolicy(
        endpoint_url="https://api.pyvorin.com/v1/ingest",
        protocol="mqtt",
        max_messages_per_minute=10,
        summary_interval_seconds=300.0,
    )
)


def main():
    now = time.time()
    readings = [
        SensorReading(sensor_name="lobby_temp", timestamp=now, value=26.5, unit="C"),
        SensorReading(sensor_name="lobby_hum", timestamp=now, value=68.0, unit="%"),
        SensorReading(sensor_name="main_meter", timestamp=now, value=22.0, unit="A"),
    ]
    result = pipeline.run(readings)
    print(f"Processed {result.readings_processed} readings, {len(result.events)} events")
    for ev in result.events:
        print(f"  [{ev.severity}] {ev.rule_name}")

    # Direct RollingWindow usage for custom aggregation
    from pyvorin_edge.windows import RollingWindow
    temp_window = RollingWindow(duration_seconds=300.0, sensor_name="lobby_temp")
    for r in readings:
        if r.sensor_name == "lobby_temp":
            temp_window.add_reading(r)
    window_result = temp_window.compute()
    print(f"Window mean temp: {window_result.mean:.2f} °C")

    report = ReportBuilder(pipeline_name="smart_building")
    report.add_health_data(pipeline.sensors, result.events)
    print(report.to_json())


if __name__ == "__main__":
    main()
  

Next Steps

Now that you have a working pipeline, try:

  • Replacing the simulator adapters with real I2C or Modbus adapters.
  • Adding a TumblingWindow for hourly energy summaries.
  • Compiling rule conditions to native code with CompilerBridge (see the next tutorial in this series).