FileReplayAdapter — Time-Travel for Sensor Data

June 2, 2026 | 12 min read

What Is File Replay?

The FileReplayAdapter turns historical log files into live sensor streams. It reads JSON Lines (.jsonl) or comma-separated values (.csv), parses timestamps, normalises rows into SensorReading dicts, and optionally paces output according to the original inter-arrival times. This makes it the perfect tool for regression testing, demos, and pipeline development when hardware is offline.

Supported Formats

FormatExtensionRequirements
JSON Lines.jsonlEach line is a JSON object with sensor_name, timestamp, and value.
CSV.csvHeader row containing at least sensor_name, timestamp, and value.

Timestamp Parsing

The adapter accepts three timestamp representations and normalises them all to epoch seconds (float):

  • ISO 8601 strings — e.g., 2024-05-31T12:00:00Z or 2024-05-31T12:00:00+00:00
  • Epoch seconds — e.g., 1717152000.0
  • Epoch milliseconds — values greater than 1e11 are automatically divided by 1000

from pyv_edge_agent.ingest import FileReplayAdapter

# Direct parsing helper
print(FileReplayAdapter._parse_timestamp("2024-05-31T12:00:00Z"))       # 1717156800.0
print(FileReplayAdapter._parse_timestamp(1717152000))                   # 1717152000.0
print(FileReplayAdapter._parse_timestamp(1717152000000))                # 1717152000.0
  

Reading JSONL

read_jsonl() yields one SensorReading dict per line. It is a generator, so memory usage stays constant regardless of file size.


# /data/readings.jsonl
# {"sensor_name": "t1", "timestamp": "2024-05-31T12:00:00Z", "value": 22.5, "unit": "°C"}
# {"sensor_name": "t1", "timestamp": "2024-05-31T12:01:00Z", "value": 23.0, "unit": "°C"}

adapter = FileReplayAdapter()
for reading in adapter.read_jsonl("/data/readings.jsonl"):
    print(reading["timestamp"], reading["value"])
  

Reading CSV

read_csv() uses the standard library csv.DictReader. All values are coerced to strings first, then parsed to the correct types.


# /data/readings.csv
# sensor_name,timestamp,value,unit
# t1,2024-05-31T12:00:00Z,22.5,°C
# t1,2024-05-31T12:01:00Z,23.0,°C

for reading in adapter.read_csv("/data/readings.csv"):
    pipeline.run([reading])
  

Speed Control

The replay() method paces output according to the delta between consecutive timestamps. A speed_multiplier of 1.0 means real-time; 10.0 replays ten times faster. Values between 0 and 1 slow playback down.


import time

adapter = FileReplayAdapter()

# Real-time replay
start = time.monotonic()
for reading in adapter.replay("/data/readings.jsonl", speed_multiplier=1.0):
    pipeline.run([reading])
real_duration = time.monotonic() - start

# 10× accelerated — useful for CI
for reading in adapter.replay("/data/readings.jsonl", speed_multiplier=10.0):
    pipeline.run([reading])
  

Loop Mode

The adapter does not have a built-in loop flag, but you can wrap the generator in an infinite loop for kiosk demos or soak testing.


import itertools

def loop_replay(adapter: FileReplayAdapter, path: str, speed_multiplier: float = 1.0):
    """Replay a file forever."""
    while True:
        yield from adapter.replay(path, speed_multiplier)

# Demo kiosk: replay one hour of data in a 6-minute loop
for reading in loop_replay(adapter, "/data/one_hour.jsonl", speed_multiplier=10.0):
    update_dashboard(reading)
  

Validation

Before replaying, call validate_format() to catch missing columns, invalid JSON, or empty files. This is especially useful in CI pipelines where test data is generated by earlier build stages.


from pyv_edge_agent.ingest import FileReplayAdapter

try:
    FileReplayAdapter.validate_format("/data/readings.jsonl")
    print("Format OK")
except (FileNotFoundError, ValueError) as exc:
    print(f"Validation failed: {exc}")
  

Use Cases

Regression Testing

Capture a known-bad event sequence from production, save it as JSONL, and assert that your pipeline emits the expected events.


import pytest
from pyvorin_edge.pipeline import Pipeline
from pyv_edge_agent.ingest import FileReplayAdapter

def test_known_anomaly_pipeline():
    pipeline = build_production_pipeline()
    adapter = FileReplayAdapter()
    events = []
    for reading in adapter.replay("tests/fixtures/anomaly_2024-05-31.jsonl", speed_multiplier=100.0):
        result = pipeline.run([reading])
        events.extend(result.events)
    assert any(e.rule_name == "vibration_anomaly" for e in events)
  

Demos

Replay a rich dataset at 50× speed while a stakeholder watches the dashboard update in real time.


for reading in adapter.replay("demo_dataset.jsonl", speed_multiplier=50.0):
    websocket.broadcast(reading)
  

Load Testing

Disable pacing entirely by using read_jsonl() or read_csv() directly, then fire readings into the pipeline as fast as the CPU allows.


import time

adapter = FileReplayAdapter()
readings = list(adapter.read_jsonl("/data/large_file.jsonl"))

start = time.perf_counter()
result = pipeline.run(readings)
elapsed = time.perf_counter() - start

throughput = len(readings) / elapsed
print(f"Throughput: {throughput:.0f} readings/sec")
  

Metadata Handling

If the metadata column in a CSV is a JSON-encoded string, the adapter automatically decodes it. If decoding fails, the metadata falls back to an empty dict so the pipeline never crashes on malformed annotations.


# CSV row with JSON metadata string
# sensor_name,timestamp,value,unit,metadata
# t1,1717152000,22.5,°C,"{""calibrated"": true}"

reading = next(adapter.read_csv("/data/meta.csv"))
print(reading["metadata"])  # {'calibrated': True}
  

Best Practices

  • Sort files by timestamp before replaying; the pacing logic assumes monotonic order.
  • Keep files under 1 GB for real-time replay. For larger datasets, preprocess with read_jsonl() and batch into the pipeline.
  • Commit fixture files to version control alongside regression tests so that pipeline changes are always tested against the same ground truth.