Window Aggregation — Rolling, Tumbling, and Custom Functions

June 2, 2026 | 14 min read

Why Windows Matter

Raw sensor readings are noisy. A single outlier should not wake an on-call engineer at 3 AM. Windows aggregate readings over time, smoothing transient spikes and exposing genuine trends. The Pyvorin Edge SDK provides two built-in window types—RollingWindow and TumblingWindow—plus an extension point for custom aggregation functions.

Window Types at a Glance

Type               Overlap  Use Case
Rolling (Sliding)  Yes      Real-time anomaly detection where recent context matters.
Tumbling           No       Periodic reporting, billing intervals, or discrete snapshots.

RollingWindow

A RollingWindow is meant to hold the readings whose timestamps fall within a fixed duration of the newest sample. The SDK does not expire stale readings automatically, however: every reading you add stays in the window until you call slide(cutoff_timestamp) yourself or let the Pipeline hot loop manage it.


from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading

window = RollingWindow(duration_seconds=300.0, sensor_name="turbine_vibration")

# Ingest a batch
for i in range(20):
    window.add_reading(SensorReading(
        sensor_name="turbine_vibration",
        timestamp=1717152000.0 + i * 30.0,
        value=2.5 + (i % 3),
        unit="mm/s",
    ))

print(f"Readings in window: {len(window)}")
print(f"Window full? {window.is_full()}")  # True once span >= 300s

result = window.compute()
print(f"Mean: {result.mean:.2f}")
print(f"Max:  {result.max:.2f}")
print(f"Min:  {result.min:.2f}")
print(f"Std:  {result.std:.2f}")
print(f"Sum:  {result.sum:.2f}")
print(f"Count: {result.count}")
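
The post does not show how slide() works internally, so the following is only a minimal standard-library sketch of what manual expiry against a cutoff timestamp might look like. MiniRollingWindow and its methods are hypothetical stand-ins, not the Pyvorin classes, and the sketch assumes readings arrive in timestamp order.

```python
from collections import deque


class MiniRollingWindow:
    """Hypothetical stand-in for a rolling window (not the SDK class)."""

    def __init__(self, duration_seconds: float):
        self.duration_seconds = duration_seconds
        self._readings = deque()  # (timestamp, value) pairs, oldest first

    def add_reading(self, timestamp: float, value: float) -> None:
        # Assumes readings arrive in non-decreasing timestamp order.
        self._readings.append((timestamp, value))

    def slide(self, cutoff_timestamp: float) -> int:
        """Drop readings older than the cutoff; return how many were dropped."""
        dropped = 0
        while self._readings and self._readings[0][0] < cutoff_timestamp:
            self._readings.popleft()
            dropped += 1
        return dropped

    def __len__(self) -> int:
        return len(self._readings)


window = MiniRollingWindow(duration_seconds=300.0)
for i in range(20):
    window.add_reading(1717152000.0 + i * 30.0, 2.5 + (i % 3))

# Keep only the readings within 300 s of the newest sample.
newest = 1717152000.0 + 19 * 30.0
dropped = window.slide(newest - window.duration_seconds)
print(f"dropped={dropped}, kept={len(window)}")
```

With the same 20 readings as above, this drops the 9 oldest and keeps the 11 that fall within the last 300 seconds. A deque keeps the expiry cost proportional to the number of readings dropped rather than the window size.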
  

TumblingWindow

A TumblingWindow is non-overlapping. Its start time is anchored to the first reading it receives. Once the time span from that anchor exceeds duration_seconds, the window is considered full. After computing aggregates, call reset() to begin the next tumbling bucket.


from pyvorin_edge.windows import TumblingWindow
from pyvorin_edge.sensors import SensorReading

window = TumblingWindow(duration_seconds=60.0, sensor_name="flow_rate")

readings = [
    SensorReading("flow_rate", 1717152000.0 + i * 10, 45.0 + i, "L/min")
    for i in range(12)
]

for r in readings:
    window.add_reading(r)
    if window.is_full():
        result = window.compute()
        print(f"Bucket complete: mean={result.mean:.1f}, max={result.max:.1f}")
        window.reset()
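
To make the bucket boundaries concrete, here is a small standard-library sketch of anchor-based tumbling. The tumble helper is hypothetical, not part of the SDK. It assumes that "full" means the span from the anchor strictly exceeds the duration, as described above, and that the reading which crosses the boundary is counted in the bucket it closes, mirroring the add-then-check loop.

```python
from statistics import fmean


def tumble(readings, duration_seconds):
    """Yield (anchor_ts, values) for each completed bucket.

    `readings` is an iterable of (timestamp, value) pairs in time order.
    """
    anchor = None
    bucket = []
    for ts, value in readings:
        if anchor is None:
            anchor = ts  # the first reading anchors the bucket
        bucket.append(value)
        if ts - anchor > duration_seconds:  # assumed "full" condition
            yield anchor, bucket
            anchor, bucket = None, []  # equivalent of reset()
    # A trailing partial bucket is intentionally not emitted.


readings = [(1717152000.0 + i * 10, 45.0 + i) for i in range(12)]
buckets = list(tumble(readings, 60.0))
for anchor, values in buckets:
    print(f"anchor={anchor:.0f} count={len(values)} mean={fmean(values):.1f}")
```

Under these assumptions only one bucket completes for the twelve readings: it holds the first eight values, and the last four stay in a partial bucket that never fills. If the SDK treats a span equal to the duration as full, the split point moves one reading earlier.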
  

WindowResult

Both window types return a WindowResult dataclass with the following fields:

Field             Type          Description
sensor_name       str | None    Filter sensor, or None for all.
window_type       str           "rolling" or "tumbling".
duration_seconds  float         Configured window duration.
count             int           Number of readings aggregated.
mean              float | None  Arithmetic mean.
max               float | None  Maximum value.
min               float | None  Minimum value.
std               float | None  Sample standard deviation (0.0 if count == 1).
sum               float | None  Sum of all values.
custom            dict          User-defined aggregation outputs.
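
As a rough illustration of how these fields fit together, the sketch below mirrors the table with a plain dataclass and fills it from a list of values. WindowResultSketch and summarise are hypothetical names, not the SDK's own code, and the choice to leave the numeric fields as None for an empty window is an assumption based on the float | None types.

```python
import statistics
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class WindowResultSketch:
    """Hypothetical mirror of the fields listed above (not the SDK class)."""
    sensor_name: Optional[str]
    window_type: str
    duration_seconds: float
    count: int
    mean: Optional[float] = None
    max: Optional[float] = None
    min: Optional[float] = None
    std: Optional[float] = None
    sum: Optional[float] = None
    custom: Dict[str, Any] = field(default_factory=dict)


def summarise(values, sensor_name=None, window_type="rolling", duration_seconds=60.0):
    """Build a result; numeric fields stay None when there are no readings."""
    result = WindowResultSketch(sensor_name, window_type, duration_seconds, len(values))
    if values:
        result.mean = statistics.fmean(values)
        result.max = max(values)
        result.min = min(values)
        result.sum = sum(values)
        # statistics.stdev needs at least two points; use 0.0 for a single reading.
        result.std = statistics.stdev(values) if len(values) > 1 else 0.0
    return result


full = summarise([1.0, 2.0, 3.0])
single = summarise([3.0])
empty = summarise([])
print(full.mean, full.std, single.std, empty.mean)
```

Callers that read mean, max, min, std, or sum should therefore be prepared for None, for example before formatting a value with a float format specifier.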

Aggregation Functions

Built-in aggregates are computed automatically by compute(). You do not pass the function name as a string; instead, the result object simply contains every built-in metric. If you only need one metric, ignore the rest—overhead is negligible for window sizes under 10,000 readings.


from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading

window = RollingWindow(duration_seconds=120.0)
values = [12.0, 15.0, 14.0, 18.0, 11.0]
for i, v in enumerate(values):
    window.add_reading(SensorReading("demo", 1717152000.0 + i, v, "V"))

res = window.compute()
print(f"mean={res.mean}, max={res.max}, min={res.min}, "
      f"sum={res.sum}, count={res.count}, std={res.std}")
# mean=14.0, max=18.0, min=11.0, sum=70.0, count=5, std=2.7386...
  

Custom Aggregation

For domain-specific metrics—such as vibration RMS, kurtosis, or energy cost estimates—supply a dictionary of callables to the window constructor. Each function receives a list[float] of windowed values and can return any JSON-serialisable type. Errors are caught and stored as strings in the custom dict so that one failing aggregation does not crash the entire window.


import math
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading

def rms(values: list[float]) -> float:
    """Root-mean-square of a vibration signal."""
    if not values:
        return 0.0
    return math.sqrt(sum(v * v for v in values) / len(values))

def cost_estimate(values: list[float]) -> float:
    """Rough energy cost at £0.15 per kWh assuming values are kW."""
    if not values:
        return 0.0
    avg_kw = sum(values) / len(values)
    hours = 1.0  # 1-hour window assumed
    kwh = avg_kw * hours
    return round(kwh * 0.15, 2)

window = RollingWindow(
    duration_seconds=3600.0,
    sensor_name="mains_power",
    custom_aggregation={"rms": rms, "cost_gbp": cost_estimate},
)

for i in range(60):
    window.add_reading(SensorReading(
        sensor_name="mains_power",
        timestamp=1717152000.0 + i * 60.0,
        value=4.5 + math.sin(i / 10.0),
        unit="kW",
    ))

result = window.compute()
print(result.custom)
# {'rms': 4.566..., 'cost_gbp': 0.68}
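
The error handling described above can be pictured with a short standard-library sketch. The run_custom helper is hypothetical, and the exact format of the stored error string is an assumption; the SDK may record errors differently.

```python
def run_custom(aggregations, values):
    """Run each callable; store its result, or the error text if it raises."""
    out = {}
    for name, func in aggregations.items():
        try:
            # Pass a copy so a callable cannot mutate the windowed values.
            out[name] = func(list(values))
        except Exception as exc:  # isolate failures per aggregation
            out[name] = f"{type(exc).__name__}: {exc}"
    return out


def peak_to_peak(values):
    return max(values) - min(values)


def broken(values):
    return values[0] / 0  # deliberately raises ZeroDivisionError


custom = run_custom({"p2p": peak_to_peak, "broken": broken}, [4.0, 6.5, 5.0])
print(custom)
```

Because a failed aggregation leaves a string where a number was expected, downstream code should check the type of each custom value before doing arithmetic on it.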
  

Session Windows

The SDK does not provide a dedicated SessionWindow class, but you can emulate session semantics with a RollingWindow and a gap-detection wrapper. A session ends when the gap between consecutive readings exceeds a threshold.


from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading

class SessionWindow:
    def __init__(self, gap_seconds: float, sensor_name: str | None = None):
        self.gap_seconds = gap_seconds
        self.sensor_name = sensor_name
        self.window = RollingWindow(duration_seconds=gap_seconds, sensor_name=sensor_name)
        self._last_ts: float | None = None

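    def add_reading(self, reading: SensorReading) -> bool:
        """Add a reading; return True if the gap since the previous one started a new session.

        The very first reading returns False. Readings from the previous
        session are discarded when a new session starts.
        """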
        if self._last_ts is not None and (reading.timestamp - self._last_ts) > self.gap_seconds:
            self.window.reset()
            new_session = True
        else:
            new_session = False
        self.window.add_reading(reading)
        self._last_ts = reading.timestamp
        return new_session

    def compute(self):
        return self.window.compute()

session = SessionWindow(gap_seconds=300.0, sensor_name="pir_motion")
for i, ts in enumerate([0, 60, 120, 600, 660]):
    reading = SensorReading("pir_motion", 1717152000.0 + ts, 1.0, "count")
    if session.add_reading(reading):
        print(f"New session started at reading {i}")
  

Window Lifecycle in a Pipeline

When you call pipeline.add_window(), the pipeline instantiates the concrete window object and stores it in _window_objects. During run(), every reading is appended to every window whose sensor_name matches (or is None, meaning "all sensors"). You do not need to manage slide() or reset() manually unless you are using windows outside of a pipeline.


from pyvorin_edge.pipeline import Pipeline, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType

pipeline = Pipeline("factory_floor")
pipeline.add_sensor(Sensor("press_force", SensorType.PRESSURE, "kN"))
pipeline.add_window(WindowConfig(duration_seconds=60.0,  window_type="tumbling", sensor_name="press_force"))
pipeline.add_window(WindowConfig(duration_seconds=300.0, window_type="rolling",  sensor_name="press_force"))
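
The matching rule described above can be sketched without the SDK. The route helper and the "oil_temp" sensor are hypothetical and exist only to illustrate the rule; the real pipeline's internals are not shown in this post.

```python
from collections import defaultdict


def route(readings, window_filters):
    """Map each window index to the values it would receive.

    `readings` are (sensor_name, value) pairs; `window_filters` holds each
    window's sensor_name filter, where None means "all sensors".
    """
    received = defaultdict(list)
    for sensor_name, value in readings:
        for idx, wanted in enumerate(window_filters):
            if wanted is None or wanted == sensor_name:
                received[idx].append(value)
    return dict(received)


readings = [("press_force", 12.0), ("oil_temp", 71.5), ("press_force", 12.4)]
routed = route(readings, ["press_force", None])
print(routed)
# {0: [12.0, 12.4], 1: [12.0, 71.5, 12.4]}
```

A window with sensor_name=None mixes values from every sensor, which is rarely meaningful when the sensors report in different units.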
  

Performance Considerations

  • Window memory grows without bound until you call reset() or slide(). In long-running edge runtimes that use windows outside a pipeline, slide them periodically.
  • Standard deviation uses statistics.stdev (sample stddev, N-1 denominator). For populations, supply a custom aggregation.
  • Custom aggregations are executed synchronously inside compute(). Keep them O(N) or offload heavy math to a background thread.
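
For the population case mentioned above, a custom aggregation can lean on the standard library's statistics.pstdev. This is a minimal sketch; the function name population_std and the "pstd" key are illustrative choices, not SDK names.

```python
import statistics


def population_std(values: list[float]) -> float:
    """Population standard deviation (N denominator)."""
    if not values:
        return 0.0
    return statistics.pstdev(values)


values = [12.0, 15.0, 14.0, 18.0, 11.0]
sample = statistics.stdev(values)      # N-1 denominator, as the built-in std uses
population = population_std(values)    # N denominator
print(f"sample={sample:.4f}, population={population:.4f}")
# sample=2.7386, population=2.4495
```

Passing custom_aggregation={"pstd": population_std} to a window constructor, as in the Custom Aggregation section, would then expose the population figure alongside the built-in sample value.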