What Is File Replay?
The FileReplayAdapter turns historical log files into live sensor streams. It reads JSON Lines (.jsonl) or comma-separated values (.csv), parses timestamps, normalises rows into SensorReading dicts, and optionally paces output according to the original inter-arrival times. This makes it well suited to regression testing, demos, and pipeline development when hardware is offline.
Supported Formats
| Format | Extension | Requirements |
|---|---|---|
| JSON Lines | .jsonl | Each line is a JSON object with sensor_name, timestamp, and value. |
| CSV | .csv | Header row containing at least sensor_name, timestamp, and value. |
Timestamp Parsing
The adapter accepts three timestamp representations and normalises them all to epoch seconds (float):
- ISO 8601 strings, e.g. 2024-05-31T12:00:00Z or 2024-05-31T12:00:00+00:00
- Epoch seconds, e.g. 1717152000.0
- Epoch milliseconds: values greater than 1e11 are automatically divided by 1000
from pyv_edge_agent.ingest import FileReplayAdapter
# Direct parsing helper
print(FileReplayAdapter._parse_timestamp("2024-05-31T12:00:00Z")) # 1717156800.0
print(FileReplayAdapter._parse_timestamp(1717152000)) # 1717152000.0
print(FileReplayAdapter._parse_timestamp(1717152000000)) # 1717152000.0
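The three rules above can be approximated with the standard library alone. The following is a minimal sketch, not the adapter's actual implementation: the function name `normalise_timestamp`, the `MS_THRESHOLD` constant, and the choice to treat naive ISO strings as UTC are all assumptions made for illustration.

```python
from datetime import datetime, timezone

MS_THRESHOLD = 1e11  # values above this are treated as epoch milliseconds


def normalise_timestamp(raw):
    """Return epoch seconds (float) for an ISO 8601 string or a numeric epoch value."""
    if isinstance(raw, str):
        try:
            # Numeric strings (as found in CSV cells) continue to the numeric branch.
            raw = float(raw)
        except ValueError:
            text = raw
            if text.endswith("Z"):
                # fromisoformat() only accepts a trailing "Z" from Python 3.11 onwards.
                text = text[:-1] + "+00:00"
            parsed = datetime.fromisoformat(text)
            if parsed.tzinfo is None:
                # Assumption: naive timestamps are interpreted as UTC.
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed.timestamp()
    value = float(raw)
    return value / 1000.0 if value > MS_THRESHOLD else value


print(normalise_timestamp("2024-05-31T12:00:00Z"))  # 1717156800.0
print(normalise_timestamp(1717152000000))           # 1717152000.0
```

Checking the magnitude rather than the type keeps millisecond detection working for both integers and floats, at the cost of misreading any epoch-seconds value beyond roughly the year 5138.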
Reading JSONL
read_jsonl() yields one SensorReading dict per line. It is a generator, so memory usage stays constant regardless of file size.
# /data/readings.jsonl
# {"sensor_name": "t1", "timestamp": "2024-05-31T12:00:00Z", "value": 22.5, "unit": "°C"}
# {"sensor_name": "t1", "timestamp": "2024-05-31T12:01:00Z", "value": 23.0, "unit": "°C"}
adapter = FileReplayAdapter()
for reading in adapter.read_jsonl("/data/readings.jsonl"):
    print(reading["timestamp"], reading["value"])
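To illustrate why a generator keeps memory flat, here is a rough sketch of a line-by-line JSONL reader. It is not the adapter's code: `iter_jsonl` is a hypothetical helper, and skipping blank lines and raising `ValueError` on bad JSON are assumptions.

```python
import json
import os
import tempfile


def iter_jsonl(path):
    """Yield one dict per non-blank line without loading the whole file."""
    with open(path, encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            line = line.strip()
            if not line:
                continue  # Assumption: blank lines are skipped rather than rejected.
            try:
                yield json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"{path}:{line_number}: invalid JSON ({exc.msg})") from exc


# Usage with a throwaway two-line file.
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False, encoding="utf-8") as tmp:
    tmp.write('{"sensor_name": "t1", "timestamp": 1717152000, "value": 22.5}\n')
    tmp.write('{"sensor_name": "t1", "timestamp": 1717152060, "value": 23.0}\n')

rows = list(iter_jsonl(tmp.name))
os.remove(tmp.name)
print([row["value"] for row in rows])  # [22.5, 23.0]
```

Only one line is held in memory at a time while iterating; wrapping the generator in `list()`, as the usage lines do, gives that benefit up.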
Reading CSV
read_csv() uses the standard library csv.DictReader, which returns every field as a string; the adapter then parses each field to its proper type.
# /data/readings.csv
# sensor_name,timestamp,value,unit
# t1,2024-05-31T12:00:00Z,22.5,°C
# t1,2024-05-31T12:01:00Z,23.0,°C
for reading in adapter.read_csv("/data/readings.csv"):
    pipeline.run([reading])
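The string-then-parse step can be sketched as follows. This is an illustrative stand-in rather than the adapter's implementation: `iter_csv_rows` and `SAMPLE` are hypothetical names, and the sketch only handles epoch-second timestamps.

```python
import csv
import io

SAMPLE = (
    "sensor_name,timestamp,value,unit\n"
    "t1,1717152000,22.5,°C\n"
    "t1,1717152060,23.0,°C\n"
)


def iter_csv_rows(stream):
    """Yield typed dicts from a CSV stream whose cells all arrive as strings."""
    for row in csv.DictReader(stream):
        yield {
            "sensor_name": row["sensor_name"],
            "timestamp": float(row["timestamp"]),  # epoch seconds only in this sketch
            "value": float(row["value"]),
            "unit": row.get("unit") or "",  # assumption: a missing unit becomes ""
        }


csv_rows = list(iter_csv_rows(io.StringIO(SAMPLE)))
print(csv_rows[0]["value"], type(csv_rows[0]["timestamp"]).__name__)  # 22.5 float
```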
Speed Control
The replay() method paces output according to the delta between consecutive timestamps. A speed_multiplier of 1.0 means real-time; 10.0 replays ten times faster. Values between 0 and 1 slow playback down.
import time
adapter = FileReplayAdapter()
# Real-time replay
start = time.monotonic()
for reading in adapter.replay("/data/readings.jsonl", speed_multiplier=1.0):
    pipeline.run([reading])
real_duration = time.monotonic() - start
# 10× accelerated — useful for CI
for reading in adapter.replay("/data/readings.jsonl", speed_multiplier=10.0):
    pipeline.run([reading])
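The pacing rule described above amounts to sleeping for the timestamp delta divided by the multiplier. The sketch below shows that arithmetic in isolation; `paced` is a hypothetical helper, not the adapter's replay() method, and the injectable `sleep` parameter is an assumption added so the delays can be inspected without waiting.

```python
import time


def paced(readings, speed_multiplier=1.0, sleep=time.sleep):
    """Yield readings, pausing (timestamp delta / speed_multiplier) between them."""
    if speed_multiplier <= 0:
        raise ValueError("speed_multiplier must be positive")
    previous = None
    for reading in readings:
        current = reading["timestamp"]
        if previous is not None:
            delay = (current - previous) / speed_multiplier
            if delay > 0:  # out-of-order or duplicate timestamps are emitted immediately
                sleep(delay)
        previous = current
        yield reading


# Record the requested delays instead of actually sleeping.
recorded = []
sample = [{"timestamp": 0.0}, {"timestamp": 60.0}, {"timestamp": 90.0}]
replayed = list(paced(sample, speed_multiplier=10.0, sleep=recorded.append))
print(recorded)  # [6.0, 3.0]
```

At 10× speed the 60-second and 30-second gaps shrink to 6 and 3 seconds; a multiplier of 0.5 would double them instead.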
Loop Mode
The adapter does not have a built-in loop flag, but you can wrap the generator in an infinite loop for kiosk demos or soak testing.
from pyv_edge_agent.ingest import FileReplayAdapter
def loop_replay(adapter: FileReplayAdapter, path: str, speed_multiplier: float = 1.0):
    """Replay a file forever."""
    while True:
        yield from adapter.replay(path, speed_multiplier=speed_multiplier)
# Demo kiosk: replay one hour of data in a 6-minute loop
for reading in loop_replay(adapter, "/data/one_hour.jsonl", speed_multiplier=10.0):
    update_dashboard(reading)
Validation
Before replaying, call validate_format() to catch missing columns, invalid JSON, or empty files. This is especially useful in CI pipelines where test data is generated by earlier build stages.
from pyv_edge_agent.ingest import FileReplayAdapter
try:
    FileReplayAdapter.validate_format("/data/readings.jsonl")
    print("Format OK")
except (FileNotFoundError, ValueError) as exc:
    print(f"Validation failed: {exc}")
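For readers curious what such a check involves, the sketch below validates a JSONL file against the three required keys from the format table. It is a hypothetical stand-in, not validate_format() itself: `check_jsonl`, `REQUIRED_KEYS`, and the exact error messages are assumptions, though the exception types mirror those caught above.

```python
import json
import os
import tempfile

REQUIRED_KEYS = {"sensor_name", "timestamp", "value"}


def check_jsonl(path):
    """Return the record count, or raise if the file is missing, empty, or malformed."""
    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    seen = 0
    with open(path, encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"line {line_number}: invalid JSON") from exc
            if not isinstance(record, dict):
                raise ValueError(f"line {line_number}: expected a JSON object")
            missing = REQUIRED_KEYS - record.keys()
            if missing:
                raise ValueError(f"line {line_number}: missing {sorted(missing)}")
            seen += 1
    if seen == 0:
        raise ValueError("file contains no records")
    return seen


# Usage: a record without "value" is rejected.
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False, encoding="utf-8") as tmp:
    tmp.write('{"sensor_name": "t1", "timestamp": 1717152000}\n')

try:
    check_jsonl(tmp.name)
    outcome = "ok"
except ValueError as exc:
    outcome = str(exc)
finally:
    os.remove(tmp.name)
print(outcome)  # line 1: missing ['value']
```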
Use Cases
Regression Testing
Capture a known-bad event sequence from production, save it as JSONL, and assert that your pipeline emits the expected events.
import pytest
from pyvorin_edge.pipeline import Pipeline
from pyv_edge_agent.ingest import FileReplayAdapter
def test_known_anomaly_pipeline():
    pipeline = build_production_pipeline()
    adapter = FileReplayAdapter()
    events = []
    for reading in adapter.replay("tests/fixtures/anomaly_2024-05-31.jsonl", speed_multiplier=100.0):
        result = pipeline.run([reading])
        events.extend(result.events)
    assert any(e.rule_name == "vibration_anomaly" for e in events)
Demos
Replay a rich dataset at 50× speed while a stakeholder watches the dashboard update live.
for reading in adapter.replay("demo_dataset.jsonl", speed_multiplier=50.0):
    websocket.broadcast(reading)
Load Testing
Disable pacing entirely by using read_jsonl() or read_csv() directly, then fire readings into the pipeline as fast as the CPU allows.
import time
adapter = FileReplayAdapter()
readings = list(adapter.read_jsonl("/data/large_file.jsonl"))
start = time.perf_counter()
result = pipeline.run(readings)
elapsed = time.perf_counter() - start
throughput = len(readings) / elapsed
print(f"Throughput: {throughput:.0f} readings/sec")
Metadata Handling
If the metadata column in a CSV is a JSON-encoded string, the adapter automatically decodes it. If decoding fails, the metadata falls back to an empty dict so the pipeline never crashes on malformed annotations.
# CSV row with JSON metadata string
# sensor_name,timestamp,value,unit,metadata
# t1,1717152000,22.5,°C,"{""calibrated"": true}"
reading = next(adapter.read_csv("/data/meta.csv"))
print(reading["metadata"]) # {'calibrated': True}
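The decode-with-fallback behaviour can be sketched in a few lines. `decode_metadata` is a hypothetical helper, not the adapter's internal function, and returning an empty dict for valid JSON that is not an object (a list, say) is an assumption.

```python
import json


def decode_metadata(raw):
    """Decode a JSON metadata cell, falling back to {} when it cannot be used."""
    if not raw:
        return {}  # empty or missing cell
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError):  # json.JSONDecodeError subclasses ValueError
        return {}
    # Assumption: only JSON objects count as metadata.
    return decoded if isinstance(decoded, dict) else {}


print(decode_metadata('{"calibrated": true}'))  # {'calibrated': True}
print(decode_metadata("{not json"))             # {}
```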
Best Practices
- Sort files by timestamp before replaying; the pacing logic assumes monotonic order.
- Keep files under 1 GB for real-time replay. For larger datasets, preprocess with read_jsonl() and batch into the pipeline.
- Commit fixture files to version control alongside regression tests so that pipeline changes are always tested against the same ground truth.
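Two of the practices above lend themselves to small helpers. The sketch below shows one way to confirm monotonic order and to split a stream of readings into fixed-size batches; `is_time_ordered` and `batches` are hypothetical names, and the batch size used in the usage lines is arbitrary.

```python
from itertools import islice


def is_time_ordered(readings):
    """Return True if timestamps never decrease from one reading to the next."""
    previous = float("-inf")
    for reading in readings:
        if reading["timestamp"] < previous:
            return False
        previous = reading["timestamp"]
    return True


def batches(iterable, size):
    """Yield lists of at most `size` items, consuming the iterable lazily."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


sample = [{"timestamp": 1.0}, {"timestamp": 3.0}, {"timestamp": 2.0}]
print(is_time_ordered(sample))                                         # False
print(is_time_ordered(sorted(sample, key=lambda r: r["timestamp"])))   # True
print([len(chunk) for chunk in batches(range(5), 2)])                  # [2, 2, 1]
```

Because `batches` pulls from the iterator on demand, it can be fed a generator such as the one returned by read_jsonl() without materialising the whole file; sorting with `sorted()`, by contrast, does load every reading into memory.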