Why Windows Matter
Raw sensor readings are noisy. A single outlier should not wake an on-call engineer at 3 AM. Windows aggregate readings over time, smoothing transient spikes and exposing genuine trends. The Pyvorin Edge SDK provides two built-in window types—RollingWindow and TumblingWindow—plus an extension point for custom aggregation functions.
Window Types at a Glance
| Type | Overlap | Use Case |
|---|---|---|
| Rolling (Sliding) | Yes | Real-time anomaly detection where recent context matters. |
| Tumbling | No | Periodic reporting, billing intervals, or discrete snapshots. |
RollingWindow
A RollingWindow is meant to hold the readings whose timestamps fall within duration_seconds of the newest sample. The SDK does not expire stale readings automatically, so either call slide(cutoff_timestamp) yourself or let the Pipeline hot loop manage it.
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading
window = RollingWindow(duration_seconds=300.0, sensor_name="turbine_vibration")
# Ingest a batch
for i in range(20):
    window.add_reading(SensorReading(
        sensor_name="turbine_vibration",
        timestamp=1717152000.0 + i * 30.0,
        value=2.5 + (i % 3),
        unit="mm/s",
    ))
print(f"Readings in window: {len(window)}")
print(f"Window full? {window.is_full()}") # True once span >= 300s
result = window.compute()
print(f"Mean: {result.mean:.2f}")
print(f"Max: {result.max:.2f}")
print(f"Min: {result.min:.2f}")
print(f"Std: {result.std:.2f}")
print(f"Sum: {result.sum:.2f}")
print(f"Count: {result.count}")
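To make the manual expiry step concrete, here is a minimal sketch of what a slide(cutoff_timestamp) call is assumed to do: discard every reading older than the cutoff. MiniRollingWindow is a hypothetical stand-in written for this illustration, not the SDK's RollingWindow, and the strict less-than comparison at the cutoff is an assumption.

```python
from collections import deque


class MiniRollingWindow:
    """Hypothetical stand-in for illustration; not the SDK class."""

    def __init__(self, duration_seconds: float):
        self.duration_seconds = duration_seconds
        self._readings = deque()  # (timestamp, value) pairs, oldest first

    def add_reading(self, timestamp: float, value: float) -> None:
        self._readings.append((timestamp, value))

    def slide(self, cutoff_timestamp: float) -> int:
        """Drop readings older than the cutoff; return how many were dropped."""
        dropped = 0
        while self._readings and self._readings[0][0] < cutoff_timestamp:
            self._readings.popleft()
            dropped += 1
        return dropped

    def __len__(self) -> int:
        return len(self._readings)


window = MiniRollingWindow(duration_seconds=300.0)
for i in range(20):
    window.add_reading(1717152000.0 + i * 30.0, 2.5 + (i % 3))

# Keep only the readings within duration_seconds of the newest sample.
newest = 1717152000.0 + 19 * 30.0
dropped = window.slide(newest - window.duration_seconds)
print(f"Dropped {dropped}, kept {len(window)}")  # Dropped 9, kept 11
```

Without the slide call, all 20 readings would stay in memory even though the first nine fall outside the 300-second span.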
TumblingWindow
A TumblingWindow is non-overlapping. Its start time is anchored to the first reading it receives. Once the time span from that anchor exceeds duration_seconds, the window is considered full. After computing aggregates, call reset() to begin the next tumbling bucket.
from pyvorin_edge.windows import TumblingWindow
from pyvorin_edge.sensors import SensorReading
window = TumblingWindow(duration_seconds=60.0, sensor_name="flow_rate")
readings = [
    SensorReading("flow_rate", 1717152000.0 + i * 10, 45.0 + i, "L/min")
    for i in range(12)
]
for r in readings:
    window.add_reading(r)
    if window.is_full():
        result = window.compute()
        print(f"Bucket complete: mean={result.mean:.1f}, max={result.max:.1f}")
        window.reset()
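The loop above leaves any trailing readings in an unfinished bucket. The sketch below replays the same add, check, reset sequence in plain Python to show where the boundary falls. tumbling_buckets is a hypothetical helper, and it assumes the window counts as full once the span from the anchor strictly exceeds duration_seconds, as the description states; the SDK may treat the boundary reading differently.

```python
def tumbling_buckets(readings, duration_seconds):
    """Group (timestamp, value) pairs into buckets anchored on their first reading.

    Returns (completed_buckets, trailing_partial_bucket).
    """
    buckets, current, anchor = [], [], None
    for ts, value in readings:
        if anchor is None:
            anchor = ts  # the first reading of a bucket sets its start time
        current.append((ts, value))
        if ts - anchor > duration_seconds:  # assumed "full" rule
            buckets.append(current)
            current, anchor = [], None  # equivalent of reset()
    return buckets, current


readings = [(1717152000.0 + i * 10, 45.0 + i) for i in range(12)]
complete, partial = tumbling_buckets(readings, 60.0)

for bucket in complete:
    values = [v for _, v in bucket]
    print(f"Bucket: n={len(values)}, mean={sum(values) / len(values):.1f}")
print(f"Unflushed readings: {len(partial)}")
# Bucket: n=8, mean=48.5
# Unflushed readings: 4
```

If the trailing readings matter, for example at shutdown, compute the window once more after the loop instead of waiting for it to fill.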
WindowResult
Both window types return a WindowResult dataclass with the following fields:
| Field | Type | Description |
|---|---|---|
| sensor_name | str \| None | Filter sensor, or None for all. |
| window_type | str | "rolling" or "tumbling". |
| duration_seconds | float | Configured window duration. |
| count | int | Number of readings aggregated. |
| mean | float \| None | Arithmetic mean. |
| max | float \| None | Maximum value. |
| min | float \| None | Minimum value. |
| std | float \| None | Sample standard deviation (0.0 if count == 1). |
| sum | float \| None | Sum of all values. |
| custom | dict | User-defined aggregation outputs. |
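Several fields are typed as optional, so code that formats a result should not assume they are populated. The sketch below mirrors the documented fields in a hypothetical WindowResultSketch dataclass (not the SDK class) and shows one defensive way to format it. That the optional fields are None for an empty window is an assumption; the table only states that they may be None.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class WindowResultSketch:
    """Hypothetical mirror of the documented fields; not the SDK class."""
    sensor_name: Optional[str]
    window_type: str
    duration_seconds: float
    count: int
    mean: Optional[float] = None
    max: Optional[float] = None
    min: Optional[float] = None
    std: Optional[float] = None
    sum: Optional[float] = None
    custom: dict = field(default_factory=dict)


def fmt(value: Optional[float]) -> str:
    """Render an optional metric without raising on None."""
    return "n/a" if value is None else f"{value:.2f}"


def describe(result: WindowResultSketch) -> str:
    return (f"{result.window_type} n={result.count} "
            f"mean={fmt(result.mean)} std={fmt(result.std)}")


empty = WindowResultSketch(None, "rolling", 300.0, 0)
full = WindowResultSketch("demo", "tumbling", 60.0, 5,
                          mean=14.0, max=18.0, min=11.0, std=2.7386, sum=70.0)
print(describe(empty))  # rolling n=0 mean=n/a std=n/a
print(describe(full))   # tumbling n=5 mean=14.00 std=2.74
```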
Aggregation Functions
Built-in aggregates are computed automatically by compute(). There is no function name to pass as a string; the result object always contains every built-in metric. If you need only one metric, ignore the rest: the overhead is negligible for windows under 10,000 readings.
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading
window = RollingWindow(duration_seconds=120.0)
values = [12.0, 15.0, 14.0, 18.0, 11.0]
for i, v in enumerate(values):
    window.add_reading(SensorReading("demo", 1717152000.0 + i, v, "V"))
res = window.compute()
print(f"mean={res.mean}, max={res.max}, min={res.min}, "
f"sum={res.sum}, count={res.count}, std={res.std}")
# mean=14.0, max=18.0, min=11.0, sum=70.0, count=5, std=2.7386...
Custom Aggregation
For domain-specific metrics—such as vibration RMS, kurtosis, or energy cost estimates—supply a dictionary of callables to the window constructor. Each function receives a list[float] of windowed values and can return any JSON-serialisable type. Errors are caught and stored as strings in the custom dict so that one failing aggregation does not crash the entire window.
import math
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading
def rms(values: list[float]) -> float:
    """Root-mean-square of a vibration signal."""
    if not values:
        return 0.0
    return math.sqrt(sum(v * v for v in values) / len(values))

def cost_estimate(values: list[float]) -> float:
    """Rough energy cost at £0.15 per kWh assuming values are kW."""
    if not values:
        return 0.0
    avg_kw = sum(values) / len(values)
    hours = 1.0  # 1-hour window assumed
    kwh = avg_kw * hours
    return round(kwh * 0.15, 2)

window = RollingWindow(
    duration_seconds=3600.0,
    sensor_name="mains_power",
    custom_aggregation={"rms": rms, "cost_gbp": cost_estimate},
)
for i in range(60):
    window.add_reading(SensorReading(
        sensor_name="mains_power",
        timestamp=1717152000.0 + i * 60.0,
        value=4.5 + math.sin(i / 10.0),
        unit="kW",
    ))
result = window.compute()
print(result.custom)
# {'rms': 4.566..., 'cost_gbp': 0.68}
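The error isolation described in this section can be sketched as follows. This is not the SDK's code: run_custom_aggregations is a hypothetical helper, and the format of the stored error string is an assumption.

```python
import math


def run_custom_aggregations(values, aggregations):
    """Run each callable on the values; store the error text if one fails."""
    results = {}
    for name, func in aggregations.items():
        try:
            results[name] = func(values)
        except Exception as exc:  # isolate the failure to this one metric
            results[name] = f"error: {exc}"
    return results


def rms(values):
    return math.sqrt(sum(v * v for v in values) / len(values))


def peak_ratio(values):
    # Raises ZeroDivisionError when the smallest value is 0.
    return max(values) / min(values)


results = run_custom_aggregations(
    [3.0, 4.0, 0.0],
    {"rms": rms, "peak_ratio": peak_ratio},
)
print(results["rms"])         # about 2.887
print(results["peak_ratio"])  # a string beginning with "error:"
```

Because a failed metric is stored as a string, downstream code should check the type of each custom value before treating it as a number.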
Session Windows
The SDK does not provide a dedicated SessionWindow class, but you can emulate session semantics with a RollingWindow and a gap-detection wrapper. A session ends when the gap between consecutive readings exceeds a threshold.
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading
class SessionWindow:
    def __init__(self, gap_seconds: float, sensor_name: str | None = None):
        self.gap_seconds = gap_seconds
        self.sensor_name = sensor_name
        self.window = RollingWindow(duration_seconds=gap_seconds, sensor_name=sensor_name)
        self._last_ts: float | None = None

    def add_reading(self, reading: SensorReading) -> bool:
        """Returns True if a new session was started."""
        if self._last_ts is not None and (reading.timestamp - self._last_ts) > self.gap_seconds:
            self.window.reset()
            new_session = True
        else:
            new_session = False
        self.window.add_reading(reading)
        self._last_ts = reading.timestamp
        return new_session

    def compute(self):
        return self.window.compute()

session = SessionWindow(gap_seconds=300.0, sensor_name="pir_motion")
for i, ts in enumerate([0, 60, 120, 600, 660]):
    reading = SensorReading("pir_motion", 1717152000.0 + ts, 1.0, "count")
    if session.add_reading(reading):
        print(f"New session started at reading {i}")
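The gap rule used by the wrapper can be checked without the SDK. The hypothetical helper split_sessions applies the same comparison (a new session starts when the gap strictly exceeds the threshold) to bare timestamps. Unlike the wrapper, which resets its window and so presumably discards the previous session's readings, this sketch keeps every completed session.

```python
def split_sessions(timestamps, gap_seconds):
    """Split ordered timestamps into sessions separated by gaps > gap_seconds."""
    sessions = []
    for ts in timestamps:
        if sessions and ts - sessions[-1][-1] <= gap_seconds:
            sessions[-1].append(ts)  # still within the current session
        else:
            sessions.append([ts])    # first reading, or gap exceeded
    return sessions


sessions = split_sessions([0, 60, 120, 600, 660], gap_seconds=300.0)
print(sessions)  # [[0, 60, 120], [600, 660]]
```

The second session begins at offset 600 because the 480-second gap after offset 120 exceeds the 300-second threshold. To keep per-session aggregates with the wrapper, call compute() before a reading that would trigger the reset.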
Window Lifecycle in a Pipeline
When you call pipeline.add_window(), the pipeline instantiates the concrete window object and stores it in _window_objects. During run(), every reading is appended to every window whose sensor_name matches (or is None, meaning "all sensors"). You do not need to manage slide() or reset() manually unless you are using windows outside of a pipeline.
from pyvorin_edge.pipeline import Pipeline, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType
pipeline = Pipeline("factory_floor")
pipeline.add_sensor(Sensor("press_force", SensorType.PRESSURE, "kN"))
pipeline.add_window(WindowConfig(duration_seconds=60.0, window_type="tumbling", sensor_name="press_force"))
pipeline.add_window(WindowConfig(duration_seconds=300.0, window_type="rolling", sensor_name="press_force"))
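The matching rule described above (a window receives a reading when its sensor_name equals the reading's sensor, or when its sensor_name is None) can be sketched without the SDK. windows_for is a hypothetical helper and does not reflect the Pipeline's internals; the sensor names other than press_force are invented for the example.

```python
from typing import List, Optional


def windows_for(reading_sensor: str, window_filters: List[Optional[str]]) -> List[int]:
    """Return the indexes of windows that should receive this sensor's reading."""
    return [
        index
        for index, sensor_filter in enumerate(window_filters)
        if sensor_filter is None or sensor_filter == reading_sensor
    ]


# Two press_force windows, one catch-all window, one window for another sensor.
filters = ["press_force", "press_force", None, "coolant_temp"]
print(windows_for("press_force", filters))   # [0, 1, 2]
print(windows_for("coolant_temp", filters))  # [2, 3]
print(windows_for("spindle_rpm", filters))   # [2]
```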
Performance Considerations
- Window memory grows unbounded until you call reset() or slide(). In long-running edge runtimes, slide windows periodically.
- Standard deviation uses statistics.stdev (sample stddev, N-1 denominator). For populations, supply a custom aggregation.
- Custom aggregations are executed synchronously inside compute(). Keep them O(N) or offload heavy math to a background thread.
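As an example of the population case, a custom aggregation can wrap statistics.pstdev from the standard library. population_std is a hypothetical name; the function would be passed through custom_aggregation like the callables in the Custom Aggregation section.

```python
import statistics


def population_std(values: list) -> float:
    """Population standard deviation (N denominator); 0.0 for an empty window."""
    if not values:
        return 0.0
    return statistics.pstdev(values)


values = [12.0, 15.0, 14.0, 18.0, 11.0]
sample = statistics.stdev(values)    # sqrt(30 / 4), about 2.7386
population = population_std(values)  # sqrt(30 / 5), about 2.4495
print(f"sample={sample:.4f}, population={population:.4f}")
# sample=2.7386, population=2.4495
```

The difference shrinks as the window grows, so the choice matters most for short windows with few readings.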