Why Edge Code Demands Rigorous Testing
A bug in a cloud service can be patched in seconds and redeployed globally in minutes. A bug in an edge pipeline can brick a thousand devices, corrupt months of audit logs, or leak patient data from a hospital gateway that has no on-site IT staff. Testing is not optional; it is the price of admission for production edge deployments.
This article covers the full testing pyramid for Pyvorin Edge: fast unit tests with pytest,
adapter mocking with SimulatorAdapter, property-based testing with Hypothesis,
end-to-end integration tests, and a complete GitHub Actions CI/CD pipeline with coverage
gates.
Unit Testing Pipelines with pytest
The pipeline engine in pyv_edge_agent is built from pure Python functions that transform
SensorReading dicts. This makes those functions ideal for unit testing: no hardware, no
network, no database required.
Project Test Layout
edge_project/
├── pyproject.toml
├── src/
│   └── my_pipeline/
│       ├── __init__.py
│       ├── filters.py
│       └── aggregates.py
├── tests/
│   ├── unit/
│   │   ├── test_filters.py
│   │   └── test_aggregates.py
│   ├── integration/
│   │   └── test_full_pipeline.py
│   └── conftest.py
└── .github/
    └── workflows/
        └── ci.yml
Testing a Privacy Filter
# tests/unit/test_filters.py
import pytest
from my_pipeline.filters import redact_phi, mask_email
class TestRedactPHI:
    """Example-based checks for ``redact_phi`` on SensorReading-style dicts."""

    def test_redacts_patient_id(self):
        """Expect ``patient_id`` to be dropped while ``room`` is kept."""
        phi_metadata = {"patient_id": "P-8842", "room": "ICU-3"}
        sample = {
            "sensor_name": "bp_cuff_01",
            "timestamp": 1700000000.0,
            "value": 120.0,
            "unit": "mmHg",
            "metadata": phi_metadata,
        }

        cleaned = redact_phi(sample)

        assert "patient_id" not in cleaned["metadata"]
        assert cleaned["metadata"]["room"] == "ICU-3"

    def test_leaves_non_phi_intact(self):
        """Expect metadata without PHI keys to come back unchanged."""
        sample = {
            "sensor_name": "temp_lobby",
            "timestamp": 1700000000.0,
            "value": 22.5,
            "unit": "°C",
            "metadata": {"floor": "2", "zone": "north"},
        }

        cleaned = redact_phi(sample)

        expected_metadata = {"floor": "2", "zone": "north"}
        assert cleaned["metadata"] == expected_metadata
class TestMaskEmail:
    """Pins the exact string ``mask_email`` returns for two sample addresses."""

    def test_masks_long_email(self):
        """A long address is expected to mask to ``al****org``."""
        masked = mask_email("alice.smith@hospital.org")
        assert masked == "al****org"

    def test_masks_short_email(self):
        """A very short address is expected to mask to ``****``."""
        masked = mask_email("a@b.co")
        assert masked == "****"
Testing an Aggregate Function
# tests/unit/test_aggregates.py
import pytest
from my_pipeline.aggregates import windowed_average
class TestWindowedAverage:
    """Example-based checks for ``windowed_average`` over ``{"value": ...}`` dicts."""

    def test_basic_average(self):
        """One window covering all three readings yields a single mean."""
        samples = [{"value": v} for v in (10.0, 20.0, 30.0)]
        averages = windowed_average(samples, window_size=3)
        assert averages == [20.0]

    def test_sliding_window(self):
        """A window of two over four readings yields three overlapping means."""
        samples = [{"value": v} for v in (10.0, 20.0, 30.0, 40.0)]
        averages = windowed_average(samples, window_size=2)
        assert averages == [15.0, 25.0, 35.0]

    def test_empty_input(self):
        """No readings in, no averages out."""
        averages = windowed_average([], window_size=3)
        assert averages == []

    def test_window_larger_than_input(self):
        """A window that cannot be filled is expected to produce nothing."""
        lone_sample = [{"value": 5.0}]
        averages = windowed_average(lone_sample, window_size=5)
        assert averages == []
Mocking Adapters with SimulatorAdapter
Real sensors require hardware. For CI and local development, use the SimulatorAdapter
in pyv_edge_agent/ingest/simulator_adapter.py to generate realistic readings with
configurable noise, drift, and anomaly injection.
# tests/conftest.py
import pytest
from pyv_edge_agent.ingest.simulator_adapter import SimulatorAdapter, SensorConfig, SensorType
@pytest.fixture
def temp_adapter() -> SimulatorAdapter:
    """Provide a seeded simulator with one temperature and one humidity sensor.

    The adapter is built with ``seed=42`` and ``anomaly_probability=0.0``.
    """
    kitchen_temperature = SensorConfig(
        name="temp_kitchen",
        sensor_type=SensorType.TEMPERATURE,
        unit="°C",
        min_value=-10.0,
        max_value=50.0,
        noise_std=0.3,
        baseline=21.0,
    )
    kitchen_humidity = SensorConfig(
        name="humidity_kitchen",
        sensor_type=SensorType.HUMIDITY,
        unit="%RH",
        min_value=0.0,
        max_value=100.0,
        noise_std=1.0,
        baseline=45.0,
    )
    return SimulatorAdapter(
        sensors=[kitchen_temperature, kitchen_humidity],
        seed=42,
        anomaly_probability=0.0,
    )
@pytest.fixture
def leaky_adapter() -> SimulatorAdapter:
    """Provide a seeded simulator with a single leak sensor.

    The adapter is built with ``seed=99`` and the ``"leak_event"`` scenario.
    """
    leak_config = SensorConfig(
        name="leak_sensor",
        sensor_type=SensorType.LEAK,
        unit="boolean",
        min_value=0.0,
        max_value=1.0,
        baseline=0.0,
    )
    return SimulatorAdapter(
        sensors=[leak_config],
        seed=99,
        scenario="leak_event",
    )
# tests/unit/test_pipeline_with_simulator.py
import pytest
from my_pipeline.pipeline import process_batch
class TestPipelineWithSimulator:
    """Runs ``process_batch`` against readings produced by the simulator fixtures."""

    @staticmethod
    def _flatten(frames):
        """Collapse a list of frames (each an iterable of readings) into one list."""
        return [reading for frame in frames for reading in frame]

    def test_process_batch_returns_expected_shape(self, temp_adapter):
        """The summary exposes both averages and counts every reading passed in."""
        frames = temp_adapter.generate_batch(
            duration_seconds=10.0,
            sample_rate_hz=1.0,
        )
        # generate_batch returns a list of frames; the pipeline takes a flat list.
        flat = self._flatten(frames)
        result = process_batch(flat)
        assert "avg_temperature" in result
        assert "avg_humidity" in result
        assert result["count"] == len(flat)

    def test_anomaly_detection_triggers_event(self, temp_adapter):
        """With anomalies forced on, the pipeline must report at least one."""
        temp_adapter.anomaly_probability = 1.0  # Force anomalies
        frames = temp_adapter.generate_batch(
            duration_seconds=5.0,
            sample_rate_hz=1.0,
        )
        flat = self._flatten(frames)
        result = process_batch(flat)
        assert result["anomaly_count"] > 0

    def test_leak_scenario_detected(self, leaky_adapter):
        """An hour of the leak scenario must contain at least one leak reading."""
        frames = leaky_adapter.generate_batch(
            duration_seconds=3600.0,
            sample_rate_hz=1.0 / 60.0,  # one per minute
            sensor_names=["leak_sensor"],
        )
        flat = self._flatten(frames)
        # Truthiness, not `is True`: the sensor is configured with a float
        # range (0.0-1.0), and an identity check rejects 1.0, 1 and NumPy bools.
        # NOTE(review): assumes non-leak readings are exactly 0/False - confirm
        # against SimulatorAdapter's leak scenario.
        leak_frames = [r for r in flat if r["value"]]
        assert len(leak_frames) > 0
Property-Based Testing with Hypothesis
Unit tests verify specific examples. Property-based tests verify invariants across thousands of randomly generated inputs. The Hypothesis library is excellent for finding edge cases in pipeline logic.
# tests/unit/test_properties.py
import pytest
from hypothesis import given, strategies as st
from my_pipeline.filters import clamp_value
from my_pipeline.aggregates import windowed_average
class TestClampValueProperties:
    """Property-based invariants for ``clamp_value``."""

    @given(
        st.floats(allow_nan=False, allow_infinity=False),
        st.floats(allow_nan=False, allow_infinity=False),
        st.floats(allow_nan=False, allow_infinity=False),
    )
    def test_clamp_always_within_bounds(self, value, lo, hi):
        """For any finite value and bounds, the result lies in ``[lo, hi]``."""
        if lo > hi:
            lo, hi = hi, lo  # Normalise bounds
        result = clamp_value(value, lo, hi)
        assert lo <= result <= hi

    # NaN is excluded: NaN != NaN, so `first == second` could never hold for
    # it, and min()/max() give order-dependent bounds when one argument is NaN.
    @given(st.floats(allow_nan=False), st.floats(allow_nan=False))
    def test_clamp_is_idempotent(self, value, bound):
        """Clamping an already-clamped value with the same bounds changes nothing.

        The bounds are derived from ``value`` and ``bound``, so ``value`` is
        always one of the two endpoints.
        """
        lo = min(value, bound)
        hi = max(value, bound)
        first = clamp_value(value, lo, hi)
        second = clamp_value(first, lo, hi)
        assert first == second
class TestWindowedAverageProperties:
    """Property-based invariants for ``windowed_average``."""

    @given(st.lists(st.floats(min_value=-1e6, max_value=1e6), min_size=1))
    def test_average_within_min_max(self, values):
        """A single full-width window averages to a value between min and max."""
        readings = [{"value": v} for v in values]
        result = windowed_average(readings, window_size=len(values))
        assert len(result) == 1
        lo, hi = min(values), max(values)
        # Float summation can land marginally outside [lo, hi]: the mean of
        # three 0.1s is 0.10000000000000002. Allow a small scaled slack so the
        # property checks the logic rather than IEEE-754 rounding.
        slack = 1e-9 * max(abs(lo), abs(hi), 1.0)
        assert lo - slack <= result[0] <= hi + slack

    # NaN is excluded: pytest.approx treats NaN as unequal to everything
    # (including NaN) unless nan_ok=True, so the comparison could never pass.
    @given(st.lists(st.floats(allow_nan=False), min_size=0, max_size=100))
    def test_window_size_one_preserves_values(self, values):
        """A window of one returns the input values unchanged (within tolerance)."""
        readings = [{"value": v} for v in values]
        result = windowed_average(readings, window_size=1)
        assert result == pytest.approx(values)
Integration Tests — Full Pipeline End-to-End
Integration tests verify that the pipeline, store, queue, and privacy engine work together correctly. They run against a real SQLite database (in a temporary directory) but still avoid hardware and cloud dependencies.
# tests/integration/test_full_pipeline.py
import json
import tempfile
from pathlib import Path
import pytest
from pyv_edge_agent.local_store.sqlite_store import SQLiteStore
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority
from pyv_edge_agent.privacy_firewall.policy import PrivacyPolicyEngine, PrivacyRuleset
from pyv_edge_agent.ingest.simulator_adapter import SimulatorAdapter, SensorConfig, SensorType
from my_pipeline.pipeline import run_pipeline
class TestFullPipeline:
    """End-to-end run of ``run_pipeline`` against real on-disk SQLite files."""

    def test_end_to_end(self):
        """Simulated readings flow through the privacy engine, store and sync queue.

        Both databases live in a temporary directory that is removed when the
        test finishes.
        """
        with tempfile.TemporaryDirectory() as tmp:
            db_path = Path(tmp) / "edge_store.db"
            queue_path = Path(tmp) / "sync_queue.db"
            store = SQLiteStore(db_path=str(db_path))
            # try/finally so the store is closed even when an assertion fails;
            # otherwise the handle is still open while the temp directory is
            # being deleted.
            try:
                queue = CloudSyncQueue(db_path=str(queue_path))
                ruleset = PrivacyRuleset(
                    redact_fields=["patient_id"],
                    hash_fields=["device_mac"],
                )
                privacy = PrivacyPolicyEngine(ruleset=ruleset)
                adapter = SimulatorAdapter(
                    sensors=[
                        SensorConfig(
                            name="temp_icu",
                            sensor_type=SensorType.TEMPERATURE,
                            unit="°C",
                            min_value=15.0,
                            max_value=30.0,
                            noise_std=0.2,
                            baseline=22.0,
                        ),
                    ],
                    seed=123,
                )

                # Generate readings and run the full pipeline
                readings = adapter.generate_batch(
                    duration_seconds=60.0,
                    sample_rate_hz=1.0,
                )
                flat = [r for frame in readings for r in frame]
                result = run_pipeline(
                    readings=flat,
                    store=store,
                    queue=queue,
                    privacy=privacy,
                )

                # Assertions
                assert result["processed"] == len(flat)
                assert store.get_stats()["total_rows"] > 0
                assert queue.pending_count() > 0

                # Verify privacy was applied
                sample_event = json.loads(queue.peek(n=1)[0].payload)
                if "metadata" in sample_event:
                    assert "patient_id" not in sample_event["metadata"]
            finally:
                store.close()
CI/CD Pipeline with GitHub Actions
Edge code should be tested on every commit. The following GitHub Actions workflow runs unit tests, property-based tests, integration tests, coverage analysis, and linting.
# .github/workflows/ci.yml
name: Pyvorin Edge CI
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12"]
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -e ".[dev]"
      - name: Run unit tests
        run: pytest tests/unit -v --cov=my_pipeline --cov-report=xml
      - name: Run property-based tests
        run: pytest tests/unit/test_properties.py -v --hypothesis-seed=0
      - name: Run integration tests
        run: pytest tests/integration -v
      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          files: ./coverage.xml
          fail_ci_if_error: true
      - name: Lint with ruff
        run: ruff check src tests
      - name: Type check with mypy
        run: mypy src
      - name: Check formatting
        run: ruff format --check src tests
Code Coverage and Quality Gates
Coverage alone is a weak metric — 100% coverage with no assertions is worthless. Use coverage as a gate, not a goal, and pair it with mutation testing for critical paths.
Coverage Configuration (pyproject.toml)
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-ra -q --strict-markers"
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"integration: marks tests as integration tests",
]
[tool.coverage.run]
source = ["src/my_pipeline"]
branch = true
omit = [
"*/tests/*",
"*/__main__.py",
]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"raise AssertionError",
"raise NotImplementedError",
]
fail_under = 85
Mutation Testing with mutmut
# Install
pip install mutmut
# Run mutation testing on the pipeline module
mutmut run --paths-to-mutate=src/my_pipeline
# Show surviving mutants (these indicate weak tests)
mutmut results
# Generate HTML report
mutmut html
Complete Test File Examples
Here is a single consolidated test file that exercises the privacy engine, SQLite store, and queue together. Use it as a template for your own integration tests.
# tests/integration/test_privacy_store_queue.py
import tempfile
from pathlib import Path
import pytest
from pyv_edge_agent.local_store.sqlite_store import SQLiteStore
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority
from pyv_edge_agent.privacy_firewall.policy import PrivacyPolicyEngine, PrivacyRuleset
from pyv_edge_agent.types import SensorReading
class TestPrivacyStoreQueueIntegration:
    """Exercises the privacy engine, SQLite store and sync queue in one flow."""

    @pytest.fixture
    def store(self):
        """Yield a SQLiteStore backed by a throwaway directory, then close it."""
        with tempfile.TemporaryDirectory() as workdir:
            store_file = Path(workdir) / "store.db"
            sqlite_store = SQLiteStore(db_path=str(store_file))
            yield sqlite_store
            sqlite_store.close()

    @pytest.fixture
    def queue(self):
        """Yield a CloudSyncQueue backed by a throwaway directory."""
        with tempfile.TemporaryDirectory() as workdir:
            queue_file = Path(workdir) / "queue.db"
            sync_queue = CloudSyncQueue(db_path=str(queue_file))
            yield sync_queue

    @pytest.fixture
    def privacy(self):
        """Build an engine with one field per rule type: redact, mask, hash, drop."""
        rules = PrivacyRuleset(
            redact_fields=["ssn"],
            mask_fields=["phone"],
            hash_fields=["email"],
            drop_fields=["raw_image"],
        )
        return PrivacyPolicyEngine(ruleset=rules)

    def test_full_flow(self, store, queue, privacy):
        """Apply privacy rules, persist the reading, then enqueue, dequeue and ack it."""
        raw_metadata = {
            "ssn": "123-45-6789",
            "phone": "+1-555-0199",
            "email": "alice@example.com",
            "raw_image": "",
            "confidence": 0.97,
        }
        reading = SensorReading(
            sensor_name="camera_entrance",
            timestamp=1700000000.0,
            value=1,
            unit="frame",
            metadata=raw_metadata,
        )

        # Apply privacy rules
        processed = privacy.apply(reading)
        assert processed is not None
        assert "ssn" not in processed.metadata
        assert processed.metadata["phone"] == "+1****99"
        assert len(processed.metadata["email"]) == 64  # SHA-256 hex
        assert "raw_image" not in processed.metadata
        assert processed.metadata["confidence"] == 0.97

        # Store locally
        row_id = store.store_reading(processed)
        assert row_id > 0

        # Enqueue for cloud sync
        event_payload = {
            "sensor_name": processed.sensor_name,
            "timestamp": processed.timestamp,
            "value": processed.value,
            "unit": processed.unit,
            "metadata": processed.metadata,
        }
        queue_id = queue.enqueue(payload=event_payload, priority=Priority.TELEMETRY)
        assert queue_id > 0
        assert queue.pending_count() == 1

        # Dequeue and ack
        batch = queue.dequeue(batch_size=10)
        assert len(batch) == 1
        first_item = batch[0]
        queue.ack([first_item.id])
        assert queue.pending_count() == 0
Operational Best Practices
- Keep unit tests under 100 ms each. Slow tests tend not to get run. Use SimulatorAdapter with a small duration_seconds and a high sample_rate_hz for fast data generation.
- Tag slow tests. Use @pytest.mark.slow and skip them locally with pytest -m "not slow".
- Test privacy rules exhaustively. Every field in PrivacyRuleset should have at least one test for redaction, masking, hashing, dropping, and local-only behaviour.
- Use tmp_path for databases. Never point tests at /var/lib/pyvorin. Always use tempfile or pytest's tmp_path fixture.
- Run CI on ARM64. If your production target is Raspberry Pi, use GitHub's ubuntu-latest with QEMU or a self-hosted ARM64 runner to catch architecture-specific bugs (e.g., endianness, floating-point differences).
Summary
Testing Pyvorin Edge pipelines requires a layered approach: fast unit tests for logic, mocked
sensor adapters for data generation, property-based tests for invariant discovery, and
integration tests for end-to-end correctness. The SimulatorAdapter removes the
hardware dependency, while Hypothesis finds edge cases you would never think to write manually.
A robust CI/CD pipeline with coverage gates, mutation testing, and multi-version Python support
ensures that every commit is production-ready before it reaches your fleet.