Why Mutual TLS Matters at the Edge
In a typical cloud deployment, TLS protects data in transit from the client to the server. At the edge, this is not enough. Your Raspberry Pi 5 gateway sits in a warehouse, a hospital closet, or a factory floor where physical access is possible and network segmentation is often weak. If an attacker plugs a rogue device into the same VLAN, server-only TLS will happily accept its connection. Mutual TLS (mTLS) fixes this by requiring both parties to present valid certificates. The gateway proves its identity to the cloud, and the cloud proves its identity to the gateway.
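What "requiring both parties to present valid certificates" means on the accepting side can be sketched with Python's standard `ssl` module. This is only an illustration of the policy, not the configuration of any particular broker; the function name and its parameters are hypothetical, and the file paths are left optional so the policy can be inspected without real key material.

```python
import ssl
from typing import Optional


def build_mtls_server_context(
    server_cert: Optional[str] = None,
    server_key: Optional[str] = None,
    device_ca: Optional[str] = None,
) -> ssl.SSLContext:
    """Return a server-side context that rejects clients without a valid certificate."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    # A server context defaults to CERT_NONE (server-only TLS). CERT_REQUIRED is
    # what makes the handshake mutual: it fails unless the client presents a
    # certificate that chains to a CA loaded below.
    ctx.verify_mode = ssl.CERT_REQUIRED
    if device_ca:
        ctx.load_verify_locations(cafile=device_ca)
    if server_cert and server_key:
        ctx.load_cert_chain(server_cert, server_key)
    return ctx


# No files are loaded here; the call only demonstrates the resulting policy.
ctx = build_mtls_server_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED)
```

With server-only TLS the same context would keep its default `CERT_NONE`, which is why a rogue device on the VLAN can complete the handshake.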
This article covers the complete lifecycle: generating a private CA, issuing client certificates for each
edge device, configuring the MQTTAdapter and HTTPAdapter in
pyv_edge_agent/ingest/ for mTLS, rotating certificates without downtime, and revoking
compromised certificates using CRL and OCSP.
Generating a Private CA with OpenSSL
Do not use public CAs for device authentication. You need a private PKI that you control completely. The following commands create a 4096-bit RSA root CA with a 20-year validity period. Store the root private key in an offline HSM or encrypted vault; it should never reside on an edge device.
# Create directory structure
mkdir -p ~/pyvorin-ca/{private,certs,newcerts,crl,csr}
chmod 700 ~/pyvorin-ca/private
touch ~/pyvorin-ca/index.txt
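echo 1000 > ~/pyvorin-ca/serial
# The crlnumber file named in openssl.cnf must exist before the first CRL is generated
echo 1000 > ~/pyvorin-ca/crlnumber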
# Create the OpenSSL CA configuration file
cat > ~/pyvorin-ca/openssl.cnf << 'EOF'
[ ca ]
default_ca = CA_default
[ CA_default ]
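# OpenSSL does not expand "~" in config files; $ENV::HOME reads the HOME environment variable
dir = $ENV::HOME/pyvorin-ca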
certs = $dir/certs
crl_dir = $dir/crl
new_certs_dir = $dir/newcerts
database = $dir/index.txt
serial = $dir/serial
private_key = $dir/private/ca.key.pem
certificate = $dir/certs/ca.cert.pem
crlnumber = $dir/crlnumber
crl = $dir/crl/ca.crl.pem
default_md = sha256
name_opt = ca_default
cert_opt = ca_default
default_days = 375
preserve = no
policy = policy_strict
[ policy_strict ]
countryName = match
stateOrProvinceName = match
organizationName = match
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
[ req ]
default_bits = 4096
distinguished_name = req_distinguished_name
string_mask = utf8only
default_md = sha256
[ req_distinguished_name ]
countryName = Country Name (2 letter code)
stateOrProvinceName = State or Province Name
localityName = Locality Name
organizationName = Organization Name
organizationalUnitName = Organizational Unit Name
commonName = Common Name
[ v3_ca ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid:always,issuer
basicConstraints = critical, CA:true
keyUsage = critical, digitalSignature, cRLSign, keyCertSign
[ usr_cert ]
basicConstraints = CA:FALSE
nsCertType = client, email
nsComment = "OpenSSL Generated Client Certificate"
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
keyUsage = critical, nonRepudiation, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth, emailProtection
EOF
# Generate the root CA private key (encrypt with AES-256)
openssl genrsa -aes256 -out ~/pyvorin-ca/private/ca.key.pem 4096
chmod 400 ~/pyvorin-ca/private/ca.key.pem
# Generate the self-signed root CA certificate
openssl req -config ~/pyvorin-ca/openssl.cnf \
-key ~/pyvorin-ca/private/ca.key.pem \
-new -x509 -days 7300 -sha256 -extensions v3_ca \
-out ~/pyvorin-ca/certs/ca.cert.pem \
-subj "/C=DE/ST=Berlin/O=Pyvorin Edge/CN=Pyvorin Root CA"
chmod 444 ~/pyvorin-ca/certs/ca.cert.pem
Creating Client Certificates for Each Edge Device
Every edge device must have a unique client certificate. Do not share certificates across devices; if one device is compromised, you need the ability to revoke only that certificate. The common name should uniquely identify the device, ideally using its hardware serial number.
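One way to derive such a common name is to read the board serial from `/proc/cpuinfo`, which Raspberry Pi OS typically exposes as a `Serial` line. The sketch below assumes that format; the function name, the `site` label, and the `edge-pi5-<site>-<serial>` naming scheme are illustrative choices, not something the agent requires.

```python
import re


def device_cn_from_cpuinfo(cpuinfo_text: str, site: str) -> str:
    """Build a certificate common name from the hardware serial in cpuinfo text."""
    # Restrict the site label so it cannot inject "/" into an OpenSSL -subj string.
    if not re.fullmatch(r"[a-z0-9-]+", site):
        raise ValueError("site must contain only lowercase letters, digits, and hyphens")
    match = re.search(r"^Serial\s*:\s*([0-9a-fA-F]+)\s*$", cpuinfo_text, re.MULTILINE)
    if match is None:
        raise ValueError("no Serial line found in cpuinfo text")
    return f"edge-pi5-{site}-{match.group(1).lower()}"


# Sample text standing in for the contents of /proc/cpuinfo (made-up serial).
SAMPLE_CPUINFO = "Model\t\t: Raspberry Pi 5 Model B Rev 1.0\nSerial\t\t: 10000000A1B2C3D4\n"
print(device_cn_from_cpuinfo(SAMPLE_CPUINFO, "warehouse"))
```

On a real device the text would come from `Path("/proc/cpuinfo").read_text()`. Using the full serial rather than a short suffix avoids collisions across a large fleet.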
# Device-specific variables
DEVICE_SERIAL="pi5-warehouse-a7f3"
DEVICE_CN="edge-${DEVICE_SERIAL}"
DEVICE_DAYS=365
# Generate a private key for the device
openssl genrsa -out ~/pyvorin-ca/csr/${DEVICE_CN}.key.pem 2048
chmod 400 ~/pyvorin-ca/csr/${DEVICE_CN}.key.pem
# Create a certificate signing request (CSR)
openssl req -new \
-key ~/pyvorin-ca/csr/${DEVICE_CN}.key.pem \
-out ~/pyvorin-ca/csr/${DEVICE_CN}.csr.pem \
-subj "/C=DE/ST=Berlin/O=Pyvorin Edge/CN=${DEVICE_CN}"
# Sign the CSR with the CA to produce the client certificate
openssl ca -config ~/pyvorin-ca/openssl.cnf \
-extensions usr_cert -days ${DEVICE_DAYS} -notext -md sha256 \
-in ~/pyvorin-ca/csr/${DEVICE_CN}.csr.pem \
-out ~/pyvorin-ca/certs/${DEVICE_CN}.cert.pem
chmod 444 ~/pyvorin-ca/certs/${DEVICE_CN}.cert.pem
# Verify the certificate chain
openssl verify -CAfile ~/pyvorin-ca/certs/ca.cert.pem \
~/pyvorin-ca/certs/${DEVICE_CN}.cert.pem
To automate this for a fleet of thousands, wrap the above in a Python script that reads device inventory from your asset database and writes the resulting certificates to a secure distribution bucket.
import subprocess
from pathlib import Path

CA_DIR = Path.home() / "pyvorin-ca"
DEVICES = ["pi5-warehouse-a7f3", "pi5-factory-b2e9", "pi5-clinic-c4d1"]

for device in DEVICES:
    cn = f"edge-{device}"
    key_path = CA_DIR / "csr" / f"{cn}.key.pem"
    csr_path = CA_DIR / "csr" / f"{cn}.csr.pem"
    cert_path = CA_DIR / "certs" / f"{cn}.cert.pem"

    # Generate the device private key and make it read-only for the owner
    subprocess.run(
        ["openssl", "genrsa", "-out", str(key_path), "2048"],
        check=True,
    )
    key_path.chmod(0o400)

    # Create the certificate signing request
    subprocess.run(
        ["openssl", "req", "-new", "-key", str(key_path),
         "-out", str(csr_path), "-subj", f"/C=DE/ST=Berlin/O=Pyvorin Edge/CN={cn}"],
        check=True,
    )

    # Sign the CSR. -batch skips the interactive y/n confirmations;
    # OpenSSL still prompts for the CA key passphrase.
    subprocess.run(
        ["openssl", "ca", "-batch", "-config", str(CA_DIR / "openssl.cnf"),
         "-extensions", "usr_cert", "-days", "365", "-notext", "-md", "sha256",
         "-in", str(csr_path), "-out", str(cert_path)],
        check=True,
    )
    print(f"Issued certificate for {cn}")
Configuring MQTT Adapter for mTLS
The MQTTAdapter in pyv_edge_agent/ingest/mqtt_adapter.py uses
paho-mqtt under the hood. To enable mTLS, you must subclass MQTTAdapter
and configure the underlying paho client with a TLS context that includes the device
certificate, private key, and CA bundle.
import hashlib
import hmac
import ssl

import paho.mqtt.client as mqtt

from pyv_edge_agent.ingest.mqtt_adapter import MQTTAdapter


class MQTTSAdapter(MQTTAdapter):
    """MQTTAdapter with mutual TLS authentication and optional certificate pinning."""

    def __init__(
        self,
        ca_certs: str,
        certfile: str,
        keyfile: str,
        pin_server_cert: bool = False,
        pinned_hash: str = "",
    ) -> None:
        super().__init__()
        if pin_server_cert and not pinned_hash:
            raise ValueError("pinned_hash is required when pin_server_cert is True")
        self._ca_certs = ca_certs
        self._certfile = certfile
        self._keyfile = keyfile
        self._pin_server_cert = pin_server_cert
        self._pinned_hash = pinned_hash

    def connect(
        self, broker: str, port: int = 8883, topic: str = "#", qos: int = 1
    ) -> None:
        self._broker = broker
        self._port = port
        self._topic = topic
        self._qos = qos
        self._stop_reconnect.clear()
        try:
            # paho-mqtt 2.x requires an explicit callback API version
            client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)  # type: ignore[attr-defined]
        except (AttributeError, TypeError):
            # paho-mqtt 1.x has no CallbackAPIVersion
            client = mqtt.Client()  # type: ignore[misc]

        # Build TLS context with client certificate
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        tls_context.minimum_version = ssl.TLSVersion.TLSv1_2
        tls_context.load_verify_locations(self._ca_certs)
        tls_context.load_cert_chain(self._certfile, self._keyfile)
        tls_context.verify_mode = ssl.CERT_REQUIRED
        client.tls_set_context(tls_context)

        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        client.on_message = self._on_message
        client.connect(broker, port)

        # Optional certificate pinning: compare the SHA-256 fingerprint of the
        # broker's DER-encoded certificate with the pinned hex string.
        # This relies on paho completing the TLS handshake inside connect().
        if self._pin_server_cert:
            der_cert = client.socket().getpeercert(binary_form=True)
            fingerprint = hashlib.sha256(der_cert).hexdigest()
            if not hmac.compare_digest(fingerprint, self._pinned_hash.lower()):
                client.disconnect()
                raise ssl.SSLError("Broker certificate does not match the pinned fingerprint")

        client.loop_start()
        self._client = client


# Usage on the edge device
adapter = MQTTSAdapter(
    ca_certs="/etc/pyvorin/certs/ca.cert.pem",
    certfile="/etc/pyvorin/certs/edge-pi5-warehouse-a7f3.cert.pem",
    keyfile="/etc/pyvorin/certs/edge-pi5-warehouse-a7f3.key.pem",
)
adapter.connect(broker="mqtt.pyvorin.io", port=8883, topic="sensors/+/telemetry")
adapter.on_message(lambda reading: print(f"Received: {reading['sensor_name']}={reading['value']}"))
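The `pinned_hash` argument needs a fingerprint of the broker certificate, and the article does not state which format it expects. A minimal sketch, assuming a lowercase hex SHA-256 digest of the DER-encoded certificate, can be built from the standard library alone; `pem_sha256_fingerprint` is a hypothetical helper name.

```python
import base64
import hashlib
import ssl


def pem_sha256_fingerprint(pem_text: str) -> str:
    """Return the hex SHA-256 digest of a single PEM certificate's DER bytes."""
    # PEM_cert_to_DER_cert expects exactly one certificate with no leading whitespace.
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text.strip())
    return hashlib.sha256(der_bytes).hexdigest()


# Placeholder payload wrapped in PEM armor: NOT a real certificate, only here
# so the example runs without key material on disk.
PLACEHOLDER_PEM = (
    "-----BEGIN CERTIFICATE-----\n"
    + base64.encodebytes(b"placeholder-der-bytes").decode("ascii")
    + "-----END CERTIFICATE-----\n"
)
print(pem_sha256_fingerprint(PLACEHOLDER_PEM))
```

In practice the input would be the text of the broker's certificate file, and the resulting string would be distributed to devices alongside the CA bundle.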
Configuring HTTP Adapter for mTLS
The HTTPAdapter in pyv_edge_agent/ingest/http_adapter.py uses
urllib.request for upstream POSTs. The built-in post_to_upstream() method
does not accept TLS parameters, so you must subclass or wrap it to inject a custom
SSLContext with client certificates.
import json
import ssl
from urllib.request import Request, urlopen

from pyv_edge_agent.ingest.http_adapter import HTTPAdapter


class HTTPSMTLSAdapter(HTTPAdapter):
    """HTTPAdapter with mutual TLS for upstream POST requests."""

    def __init__(
        self,
        ca_certs: str,
        certfile: str,
        keyfile: str,
    ) -> None:
        super().__init__()
        self._ca_certs = ca_certs
        self._certfile = certfile
        self._keyfile = keyfile
        self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self._ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
        self._ssl_context.load_verify_locations(ca_certs)
        self._ssl_context.load_cert_chain(certfile, keyfile)
        self._ssl_context.verify_mode = ssl.CERT_REQUIRED

    def post_to_upstream(self, url: str, payload: dict) -> dict:
        data = json.dumps(payload).encode("utf-8")
        request = Request(
            url,
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urlopen(request, timeout=30, context=self._ssl_context) as response:
            response_body = response.read()
            if response_body:
                return json.loads(response_body.decode("utf-8"))
            return {"status": "ok"}


# Usage
https = HTTPSMTLSAdapter(
    ca_certs="/etc/pyvorin/certs/ca.cert.pem",
    certfile="/etc/pyvorin/certs/edge-pi5-warehouse-a7f3.cert.pem",
    keyfile="/etc/pyvorin/certs/edge-pi5-warehouse-a7f3.key.pem",
)
result = https.post_to_upstream(
    "https://api.pyvorin.com/v1/ingest",
    {"sensor_name": "temp_01", "value": 22.5, "unit": "°C"},
)
print(result)
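When an mTLS POST fails, it helps to tell a certificate problem (which retrying will not fix) from a transient network fault. The sketch below is one possible classification, assuming the error comes from `urlopen`, which wraps lower-level failures in `URLError`; the function name and the three labels are hypothetical.

```python
import ssl
from urllib.error import URLError


def classify_upstream_error(exc: BaseException) -> str:
    """Label a failed upstream request as a certificate, TLS, or network problem."""
    # urlopen wraps the underlying OSError in URLError.reason; unwrap it first.
    cause = exc.reason if isinstance(exc, URLError) else exc
    if isinstance(cause, ssl.SSLCertVerificationError):
        # The device rejected the server's certificate (untrusted CA, expiry, hostname).
        return "server-cert-rejected"
    if isinstance(cause, ssl.SSLError):
        # Other TLS failures, which may include the server refusing the client certificate.
        return "tls-failure"
    return "network"


print(classify_upstream_error(URLError(ssl.SSLCertVerificationError("untrusted issuer"))))
print(classify_upstream_error(URLError(ConnectionRefusedError("refused"))))
```

A caller might retry with backoff only for the `network` label and raise an alert for the other two, since those usually point to an expired, revoked, or mismatched certificate.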
Certificate Rotation Strategy
Certificates expire. A device whose certificate expires in the field will be unable to connect, and an unmonitored expiry can cause a fleet-wide outage. The rotation strategy must be automated, atomic, and safe to retry.
The Shadow Directory Pattern
Never overwrite a live certificate in place. Instead, write the new certificate and key to a shadow directory, verify that the files are valid, then swap the shadow directory into place with renames, each of which either completes fully or not at all.
#!/bin/bash
set -euo pipefail

LIVE_DIR="/etc/pyvorin/certs"
SHADOW_DIR="/etc/pyvorin/certs.new"
DEVICE_CN="edge-pi5-warehouse-a7f3"

# 1. Download new certificate bundle from your secure distribution server
rm -rf "${SHADOW_DIR}"   # clear leftovers from an interrupted run
mkdir -p "${SHADOW_DIR}"
curl -fsSL \
  -H "Authorization: Bearer ${ROTATION_TOKEN}" \
  "https://certs.internal/v1/rotate/${DEVICE_CN}" \
  | tar -xz -C "${SHADOW_DIR}"

# 2. Validate the new certificate chain and check that the private key matches
openssl verify -CAfile "${SHADOW_DIR}/ca.cert.pem" \
  "${SHADOW_DIR}/${DEVICE_CN}.cert.pem"
cert_mod="$(openssl x509 -noout -modulus -in "${SHADOW_DIR}/${DEVICE_CN}.cert.pem" | openssl sha256)"
key_mod="$(openssl rsa -noout -modulus -in "${SHADOW_DIR}/${DEVICE_CN}.key.pem" | openssl sha256)"
if [ "${cert_mod}" != "${key_mod}" ]; then
  echo "Certificate and private key do not match" >&2
  exit 1
fi

# 3. Swap the directories (two renames; the live path is briefly absent in between)
mv "${LIVE_DIR}" "${LIVE_DIR}.old"
mv "${SHADOW_DIR}" "${LIVE_DIR}"

# 4. Signal the Edge Agent to reload (SIGHUP or API call)
#    pkill handles the case where several processes match the pattern
pkill -HUP -f 'pyv_edge_agent'

# 5. Remove the previous directory once the agent has been signalled
rm -rf "${LIVE_DIR}.old"
echo "Certificate rotated successfully"
Python Rotation Helper
import os
import shutil
import signal
import subprocess
from pathlib import Path

CERT_DIR = Path("/etc/pyvorin/certs")
SHADOW_DIR = Path("/etc/pyvorin/certs.new")
BACKUP_DIR = Path("/etc/pyvorin/certs.old")


def rotate_certificates(new_ca: Path, new_cert: Path, new_key: Path) -> None:
    """Rotate mTLS certificates, validating them before they go live."""
    # Start from an empty shadow directory so stale files cannot leak in
    if SHADOW_DIR.exists():
        shutil.rmtree(SHADOW_DIR)
    SHADOW_DIR.mkdir(parents=True)

    # Copy and restrict permissions
    shutil.copy2(new_ca, SHADOW_DIR / "ca.cert.pem")
    shutil.copy2(new_cert, SHADOW_DIR / "edge.cert.pem")
    shutil.copy2(new_key, SHADOW_DIR / "edge.key.pem")
    (SHADOW_DIR / "edge.key.pem").chmod(0o400)

    # Validate chain
    subprocess.run(
        ["openssl", "verify", "-CAfile", str(SHADOW_DIR / "ca.cert.pem"),
         str(SHADOW_DIR / "edge.cert.pem")],
        check=True,
    )

    # Swap via renames. os.replace cannot overwrite a non-empty directory,
    # so remove a backup left by an earlier run before moving the live one.
    if CERT_DIR.exists():
        if BACKUP_DIR.exists():
            shutil.rmtree(BACKUP_DIR)
        os.replace(CERT_DIR, BACKUP_DIR)
    os.replace(SHADOW_DIR, CERT_DIR)

    # Trigger agent reload
    # (Assumes your agent exposes a Unix socket or PID file)
    pid_file = Path("/run/pyvorin-edge/agent.pid")
    if pid_file.exists():
        os.kill(int(pid_file.read_text().strip()), signal.SIGHUP)

    # Clean up backup on success
    if BACKUP_DIR.exists():
        shutil.rmtree(BACKUP_DIR)
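Moving the live directory aside and then moving the shadow directory in takes two renames, so there is a brief moment in which the live path does not exist. A common variant, shown here as a sketch rather than as the agent's required layout, keeps each certificate set in its own versioned directory and makes the live path a symlink that is replaced with a single rename. The function name and the `certs-v1`/`certs-v2` directory names are hypothetical, and the approach assumes the live path is a symlink or does not exist yet.

```python
import os
import tempfile
from pathlib import Path


def activate_release(link: Path, release_dir: Path) -> None:
    """Point `link` at `release_dir` using one atomic rename."""
    tmp_link = link.with_name(link.name + ".tmp")
    # Remove a temporary link left behind by an interrupted run.
    if tmp_link.is_symlink() or tmp_link.exists():
        tmp_link.unlink()
    os.symlink(release_dir, tmp_link, target_is_directory=True)
    # rename(2) swaps the symlink itself, so readers see either the old
    # target or the new one, never a missing path.
    os.replace(tmp_link, link)


# Demonstration in a throwaway directory instead of /etc/pyvorin.
with tempfile.TemporaryDirectory() as root:
    base = Path(root)
    (base / "certs-v1").mkdir()
    (base / "certs-v2").mkdir()
    live = base / "certs"
    activate_release(live, base / "certs-v1")
    activate_release(live, base / "certs-v2")
    print(os.readlink(live))
```

Rollback is then another call to `activate_release` with the previous directory, which stays on disk until it is deliberately removed.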
Certificate Revocation (CRL and OCSP)
When a device is stolen or a private key is leaked, you must revoke its certificate before the attacker can impersonate it. The two standard mechanisms are Certificate Revocation Lists (CRL) and Online Certificate Status Protocol (OCSP).
Maintaining a CRL
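A CRL is a signed, timestamped list of revoked serial numbers. Whichever side verifies a certificate must consult it: the broker and upstream API load the CRL so they can reject revoked device certificates, and edge devices download it periodically and reject any peer certificate whose serial appears on the list.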
# Revoke a compromised certificate
openssl ca -config ~/pyvorin-ca/openssl.cnf \
-revoke ~/pyvorin-ca/certs/edge-pi5-warehouse-a7f3.cert.pem
# Generate the CRL (valid for 30 days; the config sets no default_crl_days, so pass -crldays)
openssl ca -config ~/pyvorin-ca/openssl.cnf \
  -gencrl -crldays 30 -out ~/pyvorin-ca/crl/ca.crl.pem
# Verify a certificate against the CRL
openssl verify -crl_check -CAfile ~/pyvorin-ca/certs/ca.cert.pem \
-CRLfile ~/pyvorin-ca/crl/ca.crl.pem \
~/pyvorin-ca/certs/edge-pi5-warehouse-a7f3.cert.pem
OCSP Responder
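OCSP allows real-time revocation checks without downloading a full CRL. Run an OCSP responder on your CA server and configure edge devices to query it before accepting a server certificate. The example below signs responses with the root key for brevity; because that key should stay offline, a production responder would normally use a dedicated OCSP signing certificate issued by the CA.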
# Start an OCSP responder (run on the CA server)
openssl ocsp -index ~/pyvorin-ca/index.txt \
-port 2560 \
-rsigner ~/pyvorin-ca/certs/ca.cert.pem \
-rkey ~/pyvorin-ca/private/ca.key.pem \
-CA ~/pyvorin-ca/certs/ca.cert.pem \
-text
# Query the responder from an edge device
openssl ocsp -CAfile ~/pyvorin-ca/certs/ca.cert.pem \
-issuer ~/pyvorin-ca/certs/ca.cert.pem \
-cert ~/pyvorin-ca/certs/edge-pi5-warehouse-a7f3.cert.pem \
-url http://ca.internal:2560 \
-text
Python CRL Check Before Connection
import ssl
from pathlib import Path

from cryptography import x509
from cryptography.hazmat.backends import default_backend

CRL_PATH = Path("/etc/pyvorin/crl/ca.crl.pem")


def is_revoked(cert_path: Path, crl_path: Path) -> bool:
    """Return True if the certificate is present in the CRL."""
    # Note: this trusts the CRL file as-is; it does not verify the
    # CRL signature or check that the CRL is still fresh.
    with open(crl_path, "rb") as f:
        crl = x509.load_pem_x509_crl(f.read(), default_backend())
    with open(cert_path, "rb") as f:
        cert = x509.load_pem_x509_certificate(f.read(), default_backend())
    # Direct lookup by serial number instead of scanning every entry
    return crl.get_revoked_certificate_by_serial_number(cert.serial_number) is not None


# Check before establishing mTLS
if is_revoked(Path("/etc/pyvorin/certs/edge.cert.pem"), CRL_PATH):
    raise ssl.SSLError("Local certificate has been revoked; halting.")
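The check above inspects the device's own certificate. To make the TLS handshake itself reject a revoked peer, Python's `ssl` module can load a PEM CRL into the context and enforce it through `verify_flags`. The sketch below assumes the CRL is issued by the same CA that signs the peer certificate; `enable_crl_checking` is a hypothetical helper, and the path argument is optional only so the flag handling can be shown without files.

```python
import ssl
from typing import Optional


def enable_crl_checking(ctx: ssl.SSLContext, crl_file: Optional[str] = None) -> ssl.SSLContext:
    """Make `ctx` fail the handshake when the peer's leaf certificate is revoked."""
    if crl_file:
        # load_verify_locations accepts PEM CRLs as well as CA certificates.
        ctx.load_verify_locations(cafile=crl_file)
    # With this flag set, verification also fails if no CRL for the issuing CA
    # has been loaded, so a real deployment must always pass crl_file.
    ctx.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
    return ctx


# Flag handling only; no CRL is loaded in this demonstration.
ctx = enable_crl_checking(ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT))
print(bool(ctx.verify_flags & ssl.VERIFY_CRL_CHECK_LEAF))
```

Because the CRL is read when the context is built, a long-running process has to rebuild its context (or restart) after each CRL download for new revocations to take effect.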
Operational Best Practices
- Pin the CA, not the leaf. Pinning the CA certificate in your trust store allows you to rotate leaf certificates without updating device firmware.
- Set short leaf lifetimes. Issue 90-day certificates and rotate them automatically every 60 days. Short lifetimes limit the window of exposure if a key is leaked.
- Monitor expiry with the agent health monitor. Add a health check that reads /etc/pyvorin/certs/edge.cert.pem and alerts when fewer than 14 days remain.
- Use filesystem immutability for CA certs. After provisioning, run chattr +i /etc/pyvorin/certs/ca.cert.pem to prevent accidental or malicious replacement. Clear the flag with chattr -i before any rotation that replaces or deletes that file, because an immutable file cannot be removed.
- Log every rotation. Use PrivacyAudit.log_action() from pyv_edge_agent/privacy_firewall/audit.py to record rotation events with the certificate serial number and timestamp.
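The expiry check from the list above can be sketched with the standard library plus the `openssl` CLI already used in this article. The helper names and the way the result would be wired into the agent health monitor are assumptions; only the 14-day threshold comes from the text.

```python
import ssl
import subprocess
import time
from typing import Optional

SECONDS_PER_DAY = 86_400


def read_enddate(cert_path: str) -> str:
    """Return the `notAfter=...` line printed by `openssl x509 -enddate`."""
    result = subprocess.run(
        ["openssl", "x509", "-enddate", "-noout", "-in", cert_path],
        check=True, capture_output=True, text=True,
    )
    return result.stdout


def days_until_expiry(enddate_line: str, now: Optional[float] = None) -> float:
    """Convert a `notAfter=Mon DD HH:MM:SS YYYY GMT` line into days remaining."""
    not_after = enddate_line.strip().split("=", 1)[1]
    expires_at = ssl.cert_time_to_seconds(not_after)
    current = time.time() if now is None else now
    return (expires_at - current) / SECONDS_PER_DAY


def needs_alert(enddate_line: str, threshold_days: float = 14, now: Optional[float] = None) -> bool:
    """True when fewer than `threshold_days` remain before expiry."""
    return days_until_expiry(enddate_line, now) < threshold_days


# Fixed timestamps keep the example deterministic; a health check would call
# needs_alert(read_enddate("/etc/pyvorin/certs/edge.cert.pem")) instead.
SAMPLE_LINE = "notAfter=Jan  1 00:00:00 2030 GMT"
SAMPLE_NOW = ssl.cert_time_to_seconds("Dec 22 00:00:00 2029 GMT")
print(days_until_expiry(SAMPLE_LINE, SAMPLE_NOW), needs_alert(SAMPLE_LINE, now=SAMPLE_NOW))
```

With 90-day certificates rotated at day 60, a healthy device should never drop below roughly 30 days remaining, so a 14-day alert indicates that at least one rotation attempt has already failed.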
Summary
Mutual TLS transforms your edge fleet from a trust-everything model into a cryptographically
authenticated mesh where every device has a unique identity. By generating a private CA, issuing
per-device certificates, subclassing MQTTAdapter and HTTPAdapter with
TLS contexts, and implementing automated rotation with CRL or OCSP revocation, you create a
transport layer that is resilient to device theft, certificate leakage, and man-in-the-middle
attacks. The shadow directory pattern ensures that rotation is atomic and rollback-safe, while
short certificate lifetimes keep your blast radius small.