Testing Pyvorin Edge Pipelines

June 2, 2026 | 18 min read

Why Edge Code Demands Rigorous Testing

A bug in a cloud service can be patched in seconds and redeployed globally in minutes. A bug in an edge pipeline can brick a thousand devices, corrupt months of audit logs, or leak patient data from a hospital gateway that has no on-site IT staff. Testing is not optional; it is the price of admission for production edge deployments.

This article covers the full testing pyramid for Pyvorin Edge: fast unit tests with pytest, adapter mocking with SimulatorAdapter, property-based testing with Hypothesis, end-to-end integration tests, and a complete GitHub Actions CI/CD pipeline with coverage gates.

Unit Testing Pipelines with pytest

The pipeline engine in pyv_edge_agent is pure Python functions that transform SensorReading dicts. This makes them ideal for unit testing: no hardware, no network, no database required.

Project Test Layout


edge_project/
├── pyproject.toml
├── src/
│   └── my_pipeline/
│       ├── __init__.py
│       ├── filters.py
│       └── aggregates.py
├── tests/
│   ├── unit/
│   │   ├── test_filters.py
│   │   └── test_aggregates.py
│   ├── integration/
│   │   └── test_full_pipeline.py
│   └── conftest.py
└── .github/
    └── workflows/
        └── ci.yml
  

Testing a Privacy Filter


# tests/unit/test_filters.py
import pytest
from my_pipeline.filters import redact_phi, mask_email


class TestRedactPHI:
    def test_redacts_patient_id(self):
        reading = {
            "sensor_name": "bp_cuff_01",
            "timestamp": 1700000000.0,
            "value": 120.0,
            "unit": "mmHg",
            "metadata": {"patient_id": "P-8842", "room": "ICU-3"},
        }
        result = redact_phi(reading)
        assert "patient_id" not in result["metadata"]
        assert result["metadata"]["room"] == "ICU-3"

    def test_leaves_non_phi_intact(self):
        reading = {
            "sensor_name": "temp_lobby",
            "timestamp": 1700000000.0,
            "value": 22.5,
            "unit": "°C",
            "metadata": {"floor": "2", "zone": "north"},
        }
        result = redact_phi(reading)
        assert result["metadata"] == {"floor": "2", "zone": "north"}


class TestMaskEmail:
    def test_masks_long_email(self):
        assert mask_email("alice.smith@hospital.org") == "al****org"

    def test_masks_short_email(self):
        assert mask_email("a@b.co") == "****"
  

Testing an Aggregate Function


# tests/unit/test_aggregates.py
import pytest
from my_pipeline.aggregates import windowed_average


class TestWindowedAverage:
    def test_basic_average(self):
        readings = [
            {"value": 10.0},
            {"value": 20.0},
            {"value": 30.0},
        ]
        assert windowed_average(readings, window_size=3) == [20.0]

    def test_sliding_window(self):
        readings = [
            {"value": 10.0},
            {"value": 20.0},
            {"value": 30.0},
            {"value": 40.0},
        ]
        result = windowed_average(readings, window_size=2)
        assert result == [15.0, 25.0, 35.0]

    def test_empty_input(self):
        assert windowed_average([], window_size=3) == []

    def test_window_larger_than_input(self):
        readings = [{"value": 5.0}]
        assert windowed_average(readings, window_size=5) == []
  

Mocking Adapters with SimulatorAdapter

Real sensors require hardware. For CI and local development, use the SimulatorAdapter in pyv_edge_agent/ingest/simulator_adapter.py to generate realistic readings with configurable noise, drift, and anomaly injection.


# tests/conftest.py
import pytest
from pyv_edge_agent.ingest.simulator_adapter import SimulatorAdapter, SensorConfig, SensorType


@pytest.fixture
def temp_adapter() -> SimulatorAdapter:
    sensors = [
        SensorConfig(
            name="temp_kitchen",
            sensor_type=SensorType.TEMPERATURE,
            unit="°C",
            min_value=-10.0,
            max_value=50.0,
            noise_std=0.3,
            baseline=21.0,
        ),
        SensorConfig(
            name="humidity_kitchen",
            sensor_type=SensorType.HUMIDITY,
            unit="%RH",
            min_value=0.0,
            max_value=100.0,
            noise_std=1.0,
            baseline=45.0,
        ),
    ]
    return SimulatorAdapter(sensors=sensors, seed=42, anomaly_probability=0.0)


@pytest.fixture
def leaky_adapter() -> SimulatorAdapter:
    sensors = [
        SensorConfig(
            name="leak_sensor",
            sensor_type=SensorType.LEAK,
            unit="boolean",
            min_value=0.0,
            max_value=1.0,
            baseline=0.0,
        ),
    ]
    return SimulatorAdapter(sensors=sensors, seed=99, scenario="leak_event")
  

# tests/unit/test_pipeline_with_simulator.py
import pytest
from my_pipeline.pipeline import process_batch


class TestPipelineWithSimulator:
    def test_process_batch_returns_expected_shape(self, temp_adapter):
        readings = temp_adapter.generate_batch(
            duration_seconds=10.0,
            sample_rate_hz=1.0,
        )
        # readings is a list of frames; flatten for the pipeline
        flat = [r for frame in readings for r in frame]
        result = process_batch(flat)
        assert "avg_temperature" in result
        assert "avg_humidity" in result
        assert result["count"] == len(flat)

    def test_anomaly_detection_triggers_event(self, temp_adapter):
        temp_adapter.anomaly_probability = 1.0  # Force anomalies
        readings = temp_adapter.generate_batch(
            duration_seconds=5.0,
            sample_rate_hz=1.0,
        )
        flat = [r for frame in readings for r in frame]
        result = process_batch(flat)
        assert result["anomaly_count"] > 0

    def test_leak_scenario_detected(self, leaky_adapter):
        readings = leaky_adapter.generate_batch(
            duration_seconds=3600.0,
            sample_rate_hz=1.0 / 60.0,  # one per minute
            sensor_names=["leak_sensor"],
        )
        flat = [r for frame in readings for r in frame]
        leak_frames = [r for r in flat if r["value"] is True]
        assert len(leak_frames) > 0
  

Property-Based Testing with Hypothesis

Unit tests verify specific examples. Property-based tests verify invariants across thousands of randomly generated inputs. The Hypothesis library is excellent for finding edge cases in pipeline logic.


# tests/unit/test_properties.py
import pytest
from hypothesis import given, strategies as st
from my_pipeline.filters import clamp_value
from my_pipeline.aggregates import windowed_average


class TestClampValueProperties:
    @given(
        st.floats(allow_nan=False, allow_infinity=False),
        st.floats(allow_nan=False, allow_infinity=False),
        st.floats(allow_nan=False, allow_infinity=False),
    )
    def test_clamp_always_within_bounds(self, value, lo, hi):
        if lo > hi:
            lo, hi = hi, lo  # Normalise bounds
        result = clamp_value(value, lo, hi)
        assert lo <= result <= hi

    @given(st.floats(), st.floats())
    def test_clamp_is_idempotent(self, value, bound):
        lo = min(value, bound)
        hi = max(value, bound)
        first = clamp_value(value, lo, hi)
        second = clamp_value(first, lo, hi)
        assert first == second


class TestWindowedAverageProperties:
    @given(st.lists(st.floats(min_value=-1e6, max_value=1e6), min_size=1))
    def test_average_within_min_max(self, values):
        readings = [{"value": v} for v in values]
        result = windowed_average(readings, window_size=len(values))
        assert len(result) == 1
        assert min(values) <= result[0] <= max(values)

    @given(st.lists(st.floats(), min_size=0, max_size=100))
    def test_window_size_one_preserves_values(self, values):
        readings = [{"value": v} for v in values]
        result = windowed_average(readings, window_size=1)
        assert result == pytest.approx(values)
  

Integration Tests — Full Pipeline End-to-End

Integration tests verify that the pipeline, store, queue, and privacy engine work together correctly. They run against a real SQLite database (in a temporary directory) but still avoid hardware and cloud dependencies.


# tests/integration/test_full_pipeline.py
import json
import tempfile
from pathlib import Path

import pytest
from pyv_edge_agent.local_store.sqlite_store import SQLiteStore
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority
from pyv_edge_agent.privacy_firewall.policy import PrivacyPolicyEngine, PrivacyRuleset
from pyv_edge_agent.ingest.simulator_adapter import SimulatorAdapter, SensorConfig, SensorType
from my_pipeline.pipeline import run_pipeline


class TestFullPipeline:
    def test_end_to_end(self):
        with tempfile.TemporaryDirectory() as tmp:
            db_path = Path(tmp) / "edge_store.db"
            queue_path = Path(tmp) / "sync_queue.db"

            store = SQLiteStore(db_path=str(db_path))
            queue = CloudSyncQueue(db_path=str(queue_path))
            ruleset = PrivacyRuleset(
                redact_fields=["patient_id"],
                hash_fields=["device_mac"],
            )
            privacy = PrivacyPolicyEngine(ruleset=ruleset)

            adapter = SimulatorAdapter(
                sensors=[
                    SensorConfig(
                        name="temp_icu",
                        sensor_type=SensorType.TEMPERATURE,
                        unit="°C",
                        min_value=15.0,
                        max_value=30.0,
                        noise_std=0.2,
                        baseline=22.0,
                    ),
                ],
                seed=123,
            )

            # Generate readings and run the full pipeline
            readings = adapter.generate_batch(
                duration_seconds=60.0,
                sample_rate_hz=1.0,
            )
            flat = [r for frame in readings for r in frame]

            result = run_pipeline(
                readings=flat,
                store=store,
                queue=queue,
                privacy=privacy,
            )

            # Assertions
            assert result["processed"] == len(flat)
            assert store.get_stats()["total_rows"] > 0
            assert queue.pending_count() > 0

            # Verify privacy was applied
            sample_event = json.loads(queue.peek(n=1)[0].payload)
            if "metadata" in sample_event:
                assert "patient_id" not in sample_event["metadata"]

            store.close()
  

CI/CD Pipeline with GitHub Actions

Edge code should be tested on every commit. The following GitHub Actions workflow runs unit tests, property-based tests, integration tests, coverage analysis, and linting.


# .github/workflows/ci.yml
name: Pyvorin Edge CI

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12"]

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -e ".[dev]"

      - name: Run unit tests
        run: pytest tests/unit -v --cov=my_pipeline --cov-report=xml

      - name: Run property-based tests
        run: pytest tests/unit/test_properties.py -v --hypothesis-seed=0

      - name: Run integration tests
        run: pytest tests/integration -v

      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          files: ./coverage.xml
          fail_ci_if_error: true

      - name: Lint with ruff
        run: ruff check src tests

      - name: Type check with mypy
        run: mypy src

      - name: Check formatting
        run: ruff format --check src tests
  

Code Coverage and Quality Gates

Coverage alone is a weak metric — 100% coverage with no assertions is worthless. Use coverage as a gate, not a goal, and pair it with mutation testing for critical paths.

Coverage Configuration (pyproject.toml)


[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-ra -q --strict-markers"
markers = [
    "slow: marks tests as slow (deselect with '-m \"not slow\"')",
    "integration: marks tests as integration tests",
]

[tool.coverage.run]
source = ["src/my_pipeline"]
branch = true
omit = [
    "*/tests/*",
    "*/__main__.py",
]

[tool.coverage.report]
exclude_lines = [
    "pragma: no cover",
    "def __repr__",
    "raise AssertionError",
    "raise NotImplementedError",
]
fail_under = 85
  

Mutation Testing with mutmut


# Install
pip install mutmut

# Run mutation testing on the pipeline module
mutmut run --paths-to-mutate=src/my_pipeline

# Show surviving mutants (these indicate weak tests)
mutmut results

# Generate HTML report
mutmut html
  

Complete Test File Examples

Here is a single consolidated test file that exercises the privacy engine, SQLite store, and queue together. Use it as a template for your own integration tests.


# tests/integration/test_privacy_store_queue.py
import tempfile
from pathlib import Path

import pytest
from pyv_edge_agent.local_store.sqlite_store import SQLiteStore
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority
from pyv_edge_agent.privacy_firewall.policy import PrivacyPolicyEngine, PrivacyRuleset
from pyv_edge_agent.types import SensorReading


class TestPrivacyStoreQueueIntegration:
    @pytest.fixture
    def store(self):
        with tempfile.TemporaryDirectory() as tmp:
            db = Path(tmp) / "store.db"
            s = SQLiteStore(db_path=str(db))
            yield s
            s.close()

    @pytest.fixture
    def queue(self):
        with tempfile.TemporaryDirectory() as tmp:
            db = Path(tmp) / "queue.db"
            q = CloudSyncQueue(db_path=str(db))
            yield q

    @pytest.fixture
    def privacy(self):
        ruleset = PrivacyRuleset(
            redact_fields=["ssn"],
            mask_fields=["phone"],
            hash_fields=["email"],
            drop_fields=["raw_image"],
        )
        return PrivacyPolicyEngine(ruleset=ruleset)

    def test_full_flow(self, store, queue, privacy):
        reading = SensorReading(
            sensor_name="camera_entrance",
            timestamp=1700000000.0,
            value=1,
            unit="frame",
            metadata={
                "ssn": "123-45-6789",
                "phone": "+1-555-0199",
                "email": "alice@example.com",
                "raw_image": "",
                "confidence": 0.97,
            },
        )

        # Apply privacy rules
        processed = privacy.apply(reading)
        assert processed is not None
        assert "ssn" not in processed.metadata
        assert processed.metadata["phone"] == "+1****99"
        assert len(processed.metadata["email"]) == 64  # SHA-256 hex
        assert "raw_image" not in processed.metadata
        assert processed.metadata["confidence"] == 0.97

        # Store locally
        row_id = store.store_reading(processed)
        assert row_id > 0

        # Enqueue for cloud sync
        queue_id = queue.enqueue(
            payload={
                "sensor_name": processed.sensor_name,
                "timestamp": processed.timestamp,
                "value": processed.value,
                "unit": processed.unit,
                "metadata": processed.metadata,
            },
            priority=Priority.TELEMETRY,
        )
        assert queue_id > 0
        assert queue.pending_count() == 1

        # Dequeue and ack
        items = queue.dequeue(batch_size=10)
        assert len(items) == 1
        queue.ack([items[0].id])
        assert queue.pending_count() == 0
  

Operational Best Practices

  • Keep unit tests under 100 ms each. Slow tests are not run. Use SimulatorAdapter with small duration_seconds and high sample_rate_hz for fast data generation.
  • Tag slow tests. Use @pytest.mark.slow and skip them locally with pytest -m "not slow".
  • Test privacy rules exhaustively. Every field in PrivacyRuleset should have at least one test for redaction, masking, hashing, dropping, and local-only behaviour.
  • Use tmp_path for databases. Never point tests at /var/lib/pyvorin. Always use tempfile or pytest's tmp_path fixture.
  • Run CI on ARM64. If your production target is Raspberry Pi, use GitHub's ubuntu-latest with QEMU or a self-hosted ARM64 runner to catch architecture-specific bugs (e.g., endianness, floating-point differences).

Summary

Testing Pyvorin Edge pipelines requires a layered approach: fast unit tests for logic, mocked sensor adapters for data generation, property-based tests for invariant discovery, and integration tests for end-to-end correctness. The SimulatorAdapter removes the hardware dependency, while Hypothesis finds edge cases you would never think to write manually. A robust CI/CD pipeline with coverage gates, mutation testing, and multi-version Python support ensures that every commit is production-ready before it reaches your fleet.