GDPR and NIS2 Compliance

June 2, 2026 | 16 min read

Compliance at the Edge

Edge computing changes the compliance landscape. When personal data is collected, processed, and stored on a device inside the European Union, GDPR applies in full. When that device is part of critical infrastructure — a hospital, a water treatment plant, an energy grid — the NIS2 Directive imposes additional security and incident-reporting obligations. The Pyvorin Edge Runtime provides the building blocks you need to satisfy both frameworks: tamper-evident audit chains, configurable data retention, pseudonymisation, and automated breach detection.

This article shows you how to implement GDPR Article 30 records of processing, enforce data retention policies, handle right-to-erasure requests, set up automated breach notification, and map your controls to NIS2 Article 21 technical measures.

GDPR Article 30 — Records of Processing Activities

Article 30 requires controllers and processors to maintain a record of every processing activity under their responsibility. The record must include: the purpose, categories of data subjects and data, recipients, retention periods, and security measures. The PrivacyAudit class in pyv_edge_agent/privacy_firewall/audit.py provides export_for_gdpr(), which transforms the tamper-evident audit chain into exactly this format.


from pyv_edge_agent.privacy_firewall.audit import PrivacyAudit
import json

audit = PrivacyAudit(db_path="/var/lib/pyvorin/edge_store.db")

# Log a processing activity with full GDPR metadata
audit.log_action(
    action="sensor_batch_processed",
    details={
        "purpose": "Environmental monitoring for HVAC optimisation",
        "data_subjects": ["building_occupants"],
        "data_categories": ["temperature_reading", "humidity_reading", "occupancy_count"],
        "recipients": ["cloud_analytics", "facilities_dashboard"],
        "retention_period": "90_days",
        "security_measures": [
            "tls_1_3_mtls",
            "sha256_audit_chain",
            "field_redaction_for_pii",
            "encrypted_local_storage",
        ],
    },
)

# Export all records in Article 30 format
gdpr_records = audit.export_for_gdpr()

with open("/tmp/gdpr_article_30_export.json", "w", encoding="utf-8") as f:
    json.dump(gdpr_records, f, indent=2, default=str)

print(f"Exported {len(gdpr_records)} processing records.")
  

Structure of the GDPR Export

Each record returned by export_for_gdpr() contains:

  • record_id — The primary key from the privacy_audit table.
  • processing_activity — The action string (e.g., sensor_batch_processed).
  • timestamp_iso — UTC timestamp in ISO 8601 format.
  • purpose — Extracted from details["purpose"].
  • data_subjects — List of subject categories.
  • data_categories — List of data category strings.
  • recipients — Internal or external recipients of the data.
  • retention_period — Human-readable retention description.
  • security_measures — List of controls applied during processing.
  • additional_details — The full raw details dict for audit depth.

[
  {
    "record_id": 42,
    "processing_activity": "sensor_batch_processed",
    "timestamp_iso": "2024-05-30T14:22:10Z",
    "purpose": "Environmental monitoring for HVAC optimisation",
    "data_subjects": ["building_occupants"],
    "data_categories": ["temperature_reading", "humidity_reading", "occupancy_count"],
    "recipients": ["cloud_analytics", "facilities_dashboard"],
    "retention_period": "90_days",
    "security_measures": [
      "tls_1_3_mtls",
      "sha256_audit_chain",
      "field_redaction_for_pii",
      "encrypted_local_storage"
    ],
    "additional_details": { ... }
  }
]
  

Data Retention Policies

GDPR Article 5(1)(e) requires that personal data be kept no longer than necessary for the purposes for which it is processed. The Pyvorin Edge Runtime implements this through two mechanisms: the PrivacyPolicyEngine in pyv_edge_agent/privacy_firewall/policy.py, which classifies and filters data at ingestion time; and the SQLiteStore.purge_old() method in pyv_edge_agent/local_store/sqlite_store.py, which physically deletes expired records.

Configuring Retention


from pyv_edge_agent.local_store.sqlite_store import SQLiteStore
from pyv_edge_agent.privacy_firewall.policy import PrivacyPolicyEngine, PrivacyRuleset

# Define which fields are sensitive and subject to strict retention
ruleset = PrivacyRuleset(
    redact_fields=["patient_id", "ssn", "email"],
    mask_fields=["phone_number"],
    hash_fields=["device_mac"],
    drop_fields=["raw_camera_frame"],
    local_only=["biometric_score"],
)
engine = PrivacyPolicyEngine(ruleset=ruleset)

# Open the store and purge records older than 90 days
store = SQLiteStore(db_path="/var/lib/pyvorin/edge_store.db")
deleted = store.purge_old(hours=90 * 24)
print(f"Purged {deleted} expired records from all tables.")
  

The purge_old() method iterates over readings, events, summaries, and audit_log, deleting rows whose timestamp is older than the cutoff. It returns the total number of rows removed. Schedule this via cron or systemd timer every night at 02:00.


# /etc/cron.d/pyvorin-retention
0 2 * * * pyvorin /usr/bin/python3 -c \
  "from pyv_edge_agent.local_store.sqlite_store import SQLiteStore; \
   store = SQLiteStore('/var/lib/pyvorin/edge_store.db'); \
   store.purge_old(hours=2160)" \
  >> /var/log/pyvorin/retention.log 2>&1
  

Per-Category Retention

For stricter compliance, you may want different retention periods for different data categories. Query the readings table with a sensor_name filter before purging.


import time
import sqlite3

def purge_by_sensor(db_path: str, sensor_prefix: str, hours: int) -> int:
    """Delete readings from sensors matching prefix older than N hours."""
    cutoff = time.time() - (hours * 3600)
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.execute(
            "DELETE FROM readings WHERE sensor_name LIKE ? AND timestamp < ?",
            (f"{sensor_prefix}%", cutoff),
        )
        conn.commit()
        return cur.rowcount
    finally:
        conn.close()

# Purge camera data after 7 days, temperature after 90 days
deleted_cam = purge_by_sensor("/var/lib/pyvorin/edge_store.db", "camera_", 7 * 24)
deleted_temp = purge_by_sensor("/var/lib/pyvorin/edge_store.db", "temp_", 90 * 24)
print(f"Purged {deleted_cam} camera records, {deleted_temp} temperature records.")
  

Right to Erasure (Article 17)

When a data subject requests deletion, you must erase their personal data without undue delay. At the edge, this is complicated by the fact that data may have already been synced upstream. The correct procedure is: erase locally, log the erasure event, and enqueue a deletion request for the cloud.


import time
import sqlite3
from pyv_edge_agent.privacy_firewall.audit import PrivacyAudit
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority

DB_PATH = "/var/lib/pyvorin/edge_store.db"


def delete_subject_locally(subject_id: str) -> dict:
    """Erase all local records matching subject_id and log the action."""
    conn = sqlite3.connect(DB_PATH)
    deleted_counts = {}
    try:
        for table in ("readings", "events", "summaries", "audit_log"):
            # Assumes metadata_json contains the subject_id
            cur = conn.execute(
                f"DELETE FROM {table} WHERE metadata_json LIKE ?",
                (f'%"subject_id": "{subject_id}"%',),
            )
            deleted_counts[table] = cur.rowcount
        conn.commit()
    finally:
        conn.close()

    # Log the erasure
    audit = PrivacyAudit(db_path=DB_PATH)
    audit.log_action(
        action="right_to_erasure",
        details={
            "subject_id": subject_id,
            "deleted_counts": deleted_counts,
            "legal_basis": "GDPR Article 17",
        },
    )

    return deleted_counts


def enqueue_cloud_deletion(subject_id: str) -> int:
    """Notify upstream systems to delete the subject's data."""
    queue = CloudSyncQueue(db_path="/var/lib/pyvorin/sync_queue.db")
    return queue.enqueue(
        payload={
            "type": "deletion_request",
            "subject_id": subject_id,
            "requested_at": time.time(),
            "legal_basis": "GDPR Article 17",
        },
        priority=Priority.CRITICAL,
        ttl_seconds=86400 * 7,
    )


# Execute a deletion request
deleted = delete_subject_locally(subject_id="user-8842")
print(f"Local deletion complete: {deleted}")

queue_id = enqueue_cloud_deletion(subject_id="user-8842")
print(f"Cloud deletion request enqueued as item {queue_id}")
  

Breach Notification

GDPR Article 33 requires controllers to notify the supervisory authority within 72 hours of becoming aware of a personal data breach. NIS2 tightens this for critical infrastructure, requiring notification within 24 hours for significant incidents. Automated breach detection at the edge is essential because you may not have human operators on site.

Automated Alert Triggers


import time
import logging
from pyv_edge_agent.privacy_firewall.audit import PrivacyAudit
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority

logger = logging.getLogger("breach_detection")


def check_breach_indicators(audit: PrivacyAudit, queue: CloudSyncQueue) -> list:
    """Scan for conditions that may indicate a data breach."""
    alerts = []

    # Indicator 1: Audit chain tampering
    if not audit.verify_chain():
        alerts.append({
            "severity": "critical",
            "type": "audit_chain_tampered",
            "message": "Privacy audit chain integrity check failed.",
        })

    # Indicator 2: Sudden spike in outbound data volume
    stats = queue.get_stats()
    if stats["depth"] > 100_000:
        alerts.append({
            "severity": "high",
            "type": "queue_depth_spike",
            "message": f"Sync queue depth {stats['depth']} exceeds threshold.",
        })

    # Indicator 3: Unusual retry patterns (possible exfiltration)
    if stats["retrying_items"] > 1000:
        alerts.append({
            "severity": "high",
            "type": "excessive_retries",
            "message": f"{stats['retrying_items']} items in retry loop.",
        })

    return alerts


def send_breach_alert(alert: dict) -> int:
    """Enqueue a critical alert for immediate upstream delivery."""
    queue = CloudSyncQueue(db_path="/var/lib/pyvorin/sync_queue.db")
    return queue.enqueue(
        payload={
            "type": "breach_alert",
            "severity": alert["severity"],
            "alert_type": alert["type"],
            "message": alert["message"],
            "timestamp": time.time(),
            "device_serial": "pi5-warehouse-a7f3",
        },
        priority=Priority.CRITICAL,
        ttl_seconds=86400,
    )


# Run every 5 minutes via systemd timer or cron
audit = PrivacyAudit(db_path="/var/lib/pyvorin/edge_store.db")
queue = CloudSyncQueue(db_path="/var/lib/pyvorin/sync_queue.db")

for alert in check_breach_indicators(audit, queue):
    logger.critical("BREACH INDICATOR: %s — %s", alert["type"], alert["message"])
    send_breach_alert(alert)
  

Breach Notification Template


from datetime import datetime, timezone
import json

def generate_breach_report(
    breach_type: str,
    affected_subjects: list,
    data_categories: list,
    likely_consequences: str,
    measures_taken: list,
) -> dict:
    """Generate a GDPR Article 33 compliant breach notification dict."""
    return {
        "notification_type": "gdpr_article_33",
        "timestamp_utc": datetime.now(timezone.utc).isoformat(),
        "breach_type": breach_type,
        "affected_data_subjects_count": len(affected_subjects),
        "affected_data_categories": data_categories,
        "likely_consequences": likely_consequences,
        "measures_taken": measures_taken,
        "contact_dpo": "dpo@pyvorin.com",
        "supervisory_authority": "German DPA (Berlin)",
    }


report = generate_breach_report(
    breach_type="unauthorised_access",
    affected_subjects=["building_occupants"],
    data_categories=["temperature_reading", "occupancy_count"],
    likely_consequences="Potential inference of building occupancy patterns.",
    measures_taken=[
        "device_isolated_from_network",
        "audit_chain_verified",
        "certificates_rotated",
        "law_enforcement_notified",
    ],
)

print(json.dumps(report, indent=2))
  

NIS2 Compliance for Critical Infrastructure

The NIS2 Directive (EU 2022/2555) applies to essential and important entities in sectors such as energy, transport, banking, health, drinking water, and digital infrastructure. Article 21 requires measures for risk analysis, incident handling, business continuity, supply chain security, encryption, and authentication. Pyvorin Edge provides direct support for most of these.

Mapping Pyvorin Edge Controls to NIS2 Article 21

NIS2 Article 21 Measure Pyvorin Edge Implementation
Risk analysis and information system security BundleVerifier.verify_at_runtime(), tamper detection, hardware attestation
Incident handling Automated breach alerts via CloudSyncQueue with Priority.CRITICAL
Business continuity and crisis management SQLite WAL recovery, queue rebuild scripts, automated backups
Supply chain security pyv-edge-sign for signed bundles, Ed25519 trust anchors
Security in acquisition and development Signed OTA updates, SimulatorAdapter for pre-deployment testing
Policies and procedures for cryptography Ed25519 signing, SHA-256 hash chains, mTLS with certificate pinning
Authentication and access control Hardware serial binding, per-device client certificates

NIS2 Incident Timeline

  • T+0 hours: Automated breach detection triggers. Alert enqueued with Priority.CRITICAL.
  • T+1 hour: Alert reaches cloud SIEM. Incident response team acknowledges.
  • T+12 hours: Preliminary assessment completed using PrivacyAudit export and hardware attestation report.
  • T+24 hours: NIS2 significant incident notification sent to national CSIRT and sector regulator.
  • T+72 hours: GDPR breach notification sent to supervisory authority (if personal data involved).

Complete Python Example for GDPR Export

The following script is a complete, copy-paste-ready tool that connects to an edge device, verifies the audit chain, exports GDPR Article 30 records, generates a retention report, and produces an integrity attestation.


#!/usr/bin/env python3
"""Complete GDPR compliance export and verification tool for Pyvorin Edge."""

import json
import sys
from datetime import datetime, timezone
from pathlib import Path

from pyv_edge_agent.privacy_firewall.audit import PrivacyAudit
from pyv_edge_agent.local_store.sqlite_store import SQLiteStore
from pyvorin_edge.attestation import collect_hardware_info

DB_PATH = Path("/var/lib/pyvorin/edge_store.db")
OUTPUT_DIR = Path("/tmp/gdpr_export")


def main() -> int:
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    audit = PrivacyAudit(db_path=str(DB_PATH))
    store = SQLiteStore(db_path=str(DB_PATH))

    # 1. Verify audit chain integrity
    if not audit.verify_chain():
        print("CRITICAL: Audit chain tampered. Export aborted.", file=sys.stderr)
        return 1

    # 2. Export Article 30 records
    gdpr_records = audit.export_for_gdpr()
    article_30_path = OUTPUT_DIR / "article_30_records.json"
    with open(article_30_path, "w", encoding="utf-8") as f:
        json.dump(gdpr_records, f, indent=2, default=str)
    print(f"Exported {len(gdpr_records)} Article 30 records to {article_30_path}")

    # 3. Retention report
    stats = store.get_stats()
    retention_report = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "db_size_bytes": stats["db_size_bytes"],
        "wal_size_bytes": stats["wal_size_bytes"],
        "total_rows": stats["total_rows"],
        "oldest_record_timestamp": stats["oldest_record_timestamp"],
        "row_counts": stats["row_counts"],
    }
    retention_path = OUTPUT_DIR / "retention_report.json"
    with open(retention_path, "w", encoding="utf-8") as f:
        json.dump(retention_report, f, indent=2, default=str)
    print(f"Retention report written to {retention_path}")

    # 4. Hardware attestation
    hw = collect_hardware_info()
    attestation = {
        "device_serial": hw.serial_number,
        "hardware_revision": hw.hardware_revision,
        "cpu_model": hw.cpu_model,
        "exported_at": datetime.now(timezone.utc).isoformat(),
    }
    attestation_path = OUTPUT_DIR / "device_attestation.json"
    with open(attestation_path, "w", encoding="utf-8") as f:
        json.dump(attestation, f, indent=2, default=str)
    print(f"Device attestation written to {attestation_path}")

    # 5. Summary manifest
    manifest = {
        "export_timestamp": datetime.now(timezone.utc).isoformat(),
        "files": {
            "article_30_records": str(article_30_path),
            "retention_report": str(retention_path),
            "device_attestation": str(attestation_path),
        },
        "audit_chain_integrity": "verified",
        "record_count": len(gdpr_records),
    }
    with open(OUTPUT_DIR / "manifest.json", "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)

    print("GDPR export complete.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
  

Operational Best Practices

  • Run export_for_gdpr() monthly. Even if no audit is scheduled, regular exports prove ongoing compliance and detect schema drift early.
  • Sign the export. Use pyv-edge-sign to sign the JSON export before transferring it to legal. This preserves non-repudiation.
  • Store exports off-device. rsync or scp the /tmp/gdpr_export directory to an object-store bucket with object-lock and legal-hold enabled.
  • Document your retention rationale. GDPR requires you to justify retention periods. Add a retention_rationale field to your PrivacyAudit details for every processing activity.
  • Test deletion requests quarterly. Simulate a right-to-erasure request on a staging device, verify that local and upstream data are removed, and confirm that the PrivacyAudit chain records the deletion event.

Summary

GDPR and NIS2 compliance at the edge is achievable with the tools built into Pyvorin Edge. PrivacyAudit.export_for_gdpr() gives you Article 30 records in a standard format. SQLiteStore.purge_old() enforces retention limits. The deletion-request workflow handles right-to-erasure with full audit logging. Automated breach detection and alerting satisfy both GDPR Article 33 and NIS2 incident-reporting timelines. By mapping every control to a specific runtime API or CLI tool, Pyvorin Edge turns regulatory requirements from paperwork into executable, testable, and verifiable code.