Beyond Managed Ingestion
Not every deployment sends data to Pyvorin's managed cloud. Enterprises with existing data
lakes, regulated industries with air-gapped analysis clusters, and OEMs white-labelling the
Edge Runtime all need to receive batches on their own infrastructure. The good news is that
the HTTPCloudUploader speaks plain HTTPS + JSON. Any server that can accept a
POST request and parse JSON can be a Pyvorin Edge receiver.
This article documents the exact payload format, authentication scheme, and response codes that the uploader expects. It then provides complete, working webhook handlers in Flask and FastAPI that you can deploy behind nginx or a cloud load balancer within minutes and harden for production traffic as described under Scaling Considerations.
Expected JSON Payload Format
When HTTPCloudUploader.post_batch(items) is called, it serialises the following
envelope:
{
  "batch_id": "pyv-1717000000000",
  "timestamp": 1717000000.123,
  "count": 3,
  "items": [
    {
      "sensor_name": "temp.warehouse.a",
      "timestamp": 1717000000.0,
      "value": 18.5,
      "unit": "celsius",
      "metadata": {"zone": "north", "floor": "2"}
    },
    {
      "sensor_name": "pressure.hydraulic",
      "timestamp": 1717000001.0,
      "value": 101325.0,
      "unit": "pascal",
      "metadata": {"safety_zone": "A"}
    },
    {
      "sensor_name": "motion.lobby",
      "timestamp": 1717000002.0,
      "value": 1.0,
      "unit": "count",
      "metadata": {}
    }
  ]
}
Field semantics:
- batch_id: Unique identifier for idempotency. Derived from millisecond timestamp. Format: pyv-{milliseconds_since_epoch}. Because the value is derived only from a timestamp, two devices can in principle emit the same ID, so a receiver serving many devices may want to scope deduplication by token or device as well.
- timestamp: Unix epoch seconds (with fractional precision) when the batch was constructed on the edge device.
- count: Redundant count of items for quick validation.
- items: Array of dictionaries. Each dictionary is typically the output of SensorReading.to_dict() or a raw event dict that has passed through the privacy engine.
The payload is sent with Content-Type: application/json and a
User-Agent: Pyvorin-Edge/1.0 header. The body is UTF-8 encoded.
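To make the envelope concrete, the sketch below builds a payload of the same shape. It is not the uploader's actual code: build_envelope and encode_envelope are hypothetical helper names, and the batch_id derivation simply follows the pyv-{milliseconds_since_epoch} format described above.

```python
import json
import time


def build_envelope(items, now=None):
    """Build a batch envelope in the documented shape (hypothetical helper)."""
    now = time.time() if now is None else now
    return {
        "batch_id": f"pyv-{int(now * 1000)}",  # pyv-{milliseconds_since_epoch}
        "timestamp": now,                      # epoch seconds, fractional
        "count": len(items),                   # redundant count for validation
        "items": list(items),
    }


def encode_envelope(envelope):
    """Serialise to the UTF-8 JSON body sent with Content-Type: application/json."""
    return json.dumps(envelope).encode("utf-8")
```

A helper like this is handy for generating test batches to replay against your own receiver before pointing real devices at it.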
Authentication: Bearer Token
The uploader supports API key authentication via the Authorization: Bearer {token}
header. If api_key is non-empty when the HTTPCloudUploader is
constructed, the header is added automatically. Your receiver must validate this token before
processing the payload.
# Excerpt from uploader.py
headers = {
    "Content-Type": "application/json",
    "User-Agent": "Pyvorin-Edge/1.0",
}
if self.api_key:
    headers["Authorization"] = f"Bearer {self.api_key}"
Token management recommendations:
- Use long random strings (256 bits / 43 base64url characters minimum).
- Store tokens in a database or secret manager, not in source code.
- Support multiple active tokens so that fleet rotation does not cause a global outage.
- Return 401 Unauthorized for missing or invalid tokens, and 403 Forbidden if the token is valid but does not have permission for the requested endpoint.
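The first and third recommendations can be sketched with the standard library alone. The function names are hypothetical, and the pyv_live_ prefix is borrowed from the example tokens in this article rather than being a required format. Comparing with hmac.compare_digest avoids leaking token contents through timing differences.

```python
import hmac
import secrets


def generate_token(prefix="pyv_live_"):
    """Return a new token carrying 256 bits of randomness (43 base64url characters)."""
    return prefix + secrets.token_urlsafe(32)


def is_valid_token(presented, active_tokens):
    """Check the presented token against every active token in constant time."""
    presented_bytes = presented.encode("utf-8")
    matches = [
        hmac.compare_digest(presented_bytes, candidate.encode("utf-8"))
        for candidate in active_tokens
    ]
    return any(matches)
```

During a rotation, the new token is added to the active set first, devices are updated, and the old token is removed only once no device presents it any more.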
Response Codes and Retry Behaviour
The uploader's retry logic is driven entirely by HTTP status:
| Status | Uploader Behaviour | Receiver Semantics |
|---|---|---|
| 2xx | Return True; items are acknowledged. | Batch accepted and persisted. Safe to deduplicate by batch_id. |
| 400 Bad Request | Return False after 3 attempts. Items are nacked. | Payload malformed or schema violation. Client will retry with same payload, so do not partially process. |
| 401 / 403 | Return False after 3 attempts. Items are nacked. | Authentication or authorisation failure. Client may never succeed without operator intervention. |
| 429 Too Many Requests | Return False after 3 attempts. Items are nacked. | Rate limit exceeded. Include Retry-After header if possible; the uploader does not parse it yet, but future versions may. |
| 5xx Server Error | Return False after 3 attempts. Items are nacked. | Transient server failure. Client will retry with exponential backoff. |
| Network failure (0) | Return False after 3 attempts. Items are nacked. | DNS, TCP, or TLS failure. Client treats as transient. |
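For testing a receiver, it can help to model the client side of this table. The sketch below is an approximation, not the uploader's implementation: the three-attempt limit and exponential backoff come from the table, while the function name, the injected send callable, and the one-second base delay are assumptions made for illustration.

```python
import time


def post_with_retry(send, payload, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Return True on the first 2xx response, False once all attempts have failed.

    send(payload) is a caller-supplied function that returns the HTTP status
    code, or 0 for a network failure (DNS, TCP, or TLS).
    """
    for attempt in range(max_attempts):
        status = send(payload)
        if 200 <= status < 300:
            return True  # acknowledged: items may leave the local queue
        if attempt < max_attempts - 1:
            # Wait base_delay, then twice that, before the next attempt
            sleep(base_delay * (2 ** attempt))
    return False  # nacked: items stay queued on the device
```

Note that every non-2xx outcome takes the same path, which is why a receiver must never answer 2xx for a batch it has not durably accepted.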
Flask Webhook Handler
Below is a minimal but production-ready Flask receiver. It validates the Bearer token,
checks the payload shape, deduplicates by batch_id, and writes items to a local
SQLite database for downstream processing.
import json
import sqlite3
import time
from functools import wraps

from flask import Flask, request, jsonify

app = Flask(__name__)

# In production, load from environment or secret vault
VALID_TOKENS = {"pyv_live_abc123xyz", "pyv_live_backup_456"}
DB_PATH = "edge_ingest.db"


def init_db():
    """Create the batches and items tables if they do not exist yet."""
    conn = sqlite3.connect(DB_PATH)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS batches (
            batch_id TEXT PRIMARY KEY,
            received_at REAL NOT NULL,
            payload_json TEXT NOT NULL
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS items (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            batch_id TEXT NOT NULL,
            sensor_name TEXT NOT NULL,
            timestamp REAL NOT NULL,
            value REAL,
            unit TEXT,
            metadata_json TEXT
        )
    """)
    conn.commit()
    conn.close()


# Run at import time so WSGI servers, which never execute __main__, get the tables too
init_db()


def require_auth(f):
    """Reject requests that do not carry a known bearer token."""
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.headers.get("Authorization", "")
        if not auth.startswith("Bearer "):
            return jsonify({"error": "Missing bearer token"}), 401
        # Slice rather than split so that an empty token cannot raise IndexError
        token = auth[len("Bearer "):].strip()
        if token not in VALID_TOKENS:
            # Unknown token: 401, in line with the token recommendations above
            return jsonify({"error": "Invalid token"}), 401
        return f(*args, **kwargs)
    return decorated


@app.route("/v1/ingest", methods=["POST"])
@require_auth
def ingest():
    # Malformed JSON produces a 400 response automatically (silent=False)
    data = request.get_json(force=True, silent=False)
    if not isinstance(data, dict):
        return jsonify({"error": "Expected JSON object"}), 400
    batch_id = data.get("batch_id")
    if not batch_id:
        return jsonify({"error": "Missing batch_id"}), 400
    items = data.get("items")
    if not isinstance(items, list) or not all(isinstance(i, dict) for i in items):
        return jsonify({"error": "items must be a list of objects"}), 400
    if data.get("count") != len(items):
        return jsonify({"error": "count does not match number of items"}), 400

    conn = sqlite3.connect(DB_PATH)
    try:
        # Idempotency check
        row = conn.execute(
            "SELECT 1 FROM batches WHERE batch_id = ?", (batch_id,)
        ).fetchone()
        if row:
            return jsonify({"status": "duplicate", "batch_id": batch_id}), 200

        # Persist batch envelope; received_at is the server's receipt time
        conn.execute(
            "INSERT INTO batches (batch_id, received_at, payload_json) VALUES (?, ?, ?)",
            (batch_id, time.time(), json.dumps(data)),
        )

        # Persist individual items
        for item in items:
            conn.execute(
                """
                INSERT INTO items (batch_id, sensor_name, timestamp, value, unit, metadata_json)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    batch_id,
                    item.get("sensor_name", ""),
                    item.get("timestamp", 0.0),
                    item.get("value"),
                    item.get("unit", ""),
                    json.dumps(item.get("metadata", {})),
                ),
            )
        conn.commit()
        return jsonify({"status": "ok", "batch_id": batch_id, "count": len(items)}), 201
    finally:
        conn.close()


@app.route("/v1/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy"}), 200


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
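The handler above checks for an existing batch_id with a SELECT before inserting, which leaves a small window in which two concurrent retries of the same batch can both pass the check. One way to close that window, sketched here against the batches table created by init_db(), is to let the PRIMARY KEY constraint do the deduplication. record_batch is a hypothetical helper name, not part of any Pyvorin API.

```python
import json
import sqlite3
import time


def record_batch(conn, batch):
    """Insert the batch envelope once; return False if batch_id was already stored."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO batches (batch_id, received_at, payload_json) "
        "VALUES (?, ?, ?)",
        (batch["batch_id"], time.time(), json.dumps(batch)),
    )
    # Zero changed rows means the PRIMARY KEY already existed
    return cur.rowcount == 1
```

The item rows would then be inserted, in the same transaction, only when record_batch returns True; a False result maps to the duplicate response.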
FastAPI Webhook Handler
FastAPI offers automatic request validation, async handling, and OpenAPI documentation. The following example uses Pydantic models to enforce the expected schema at the HTTP boundary.
import json
import sqlite3
import time
from typing import Any, Dict, List

from fastapi import Depends, FastAPI, Header, HTTPException, status
from pydantic import BaseModel

app = FastAPI(title="Pyvorin Edge Receiver")

VALID_TOKENS = {"pyv_live_abc123xyz", "pyv_live_backup_456"}
# The tables are assumed to exist already, e.g. created by init_db() in the Flask example
DB_PATH = "edge_ingest.db"


class SensorItem(BaseModel):
    sensor_name: str
    timestamp: float
    value: float
    unit: str = ""
    metadata: Dict[str, Any] = {}


class BatchEnvelope(BaseModel):
    batch_id: str
    timestamp: float
    count: int
    items: List[SensorItem]


def get_db():
    # check_same_thread=False: FastAPI may run this dependency and the endpoint
    # in different worker threads. Each request still gets its own connection.
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    try:
        yield conn
    finally:
        conn.close()


def verify_token(authorization: str = Header(default="")):
    # The empty default lets a missing header reach the 401 branch instead of
    # failing request validation with 422
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing bearer token")
    token = authorization[len("Bearer "):].strip()
    if token not in VALID_TOKENS:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
    return token


@app.post("/v1/ingest", status_code=201)
def ingest_batch(
    batch: BatchEnvelope,
    token: str = Depends(verify_token),
    conn: sqlite3.Connection = Depends(get_db),
):
    # Schema violations never reach this point: FastAPI rejects them with 422 by default
    if batch.count != len(batch.items):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="count does not match number of items")

    # Idempotency
    row = conn.execute("SELECT 1 FROM batches WHERE batch_id = ?", (batch.batch_id,)).fetchone()
    if row:
        return {"status": "duplicate", "batch_id": batch.batch_id}

    conn.execute(
        "INSERT INTO batches (batch_id, received_at, payload_json) VALUES (?, ?, ?)",
        # batch.json() is the Pydantic v1 spelling; on Pydantic v2 use batch.model_dump_json()
        (batch.batch_id, time.time(), batch.json()),
    )
    for item in batch.items:
        conn.execute(
            """
            INSERT INTO items (batch_id, sensor_name, timestamp, value, unit, metadata_json)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (batch.batch_id, item.sensor_name, item.timestamp, item.value, item.unit, json.dumps(item.metadata)),
        )
    conn.commit()
    return {"status": "ok", "batch_id": batch.batch_id, "count": len(batch.items)}


@app.get("/v1/health")
def health():
    return {"status": "healthy"}
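Either receiver can be smoke-tested without an edge device by replaying the uploader's headers by hand. The following sketch uses only the standard library; build_ingest_request and send_request are hypothetical helper names, and the URL and token you pass in are whatever your own deployment uses.

```python
import json
import urllib.request


def build_ingest_request(url, token, envelope):
    """Build (but do not send) a POST that mimics the uploader's headers."""
    return urllib.request.Request(
        url,
        data=json.dumps(envelope).encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "User-Agent": "Pyvorin-Edge/1.0",
            "Authorization": f"Bearer {token}",
        },
    )


def send_request(req, timeout=10):
    """Send the request and return (status, body); non-2xx raises urllib.error.HTTPError."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status, resp.read().decode("utf-8")
```

With the Flask example running locally, building a request for http://localhost:5000/v1/ingest with one of the configured tokens and passing it to send_request should yield a 201 on the first call and a duplicate response when the same batch_id is sent again.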
Batch Schema Validation
Regardless of framework, your receiver should validate the following invariants before persisting data:
- batch_id is present and is a string.
- count equals len(items). Mismatch suggests truncation or corruption.
- Every item has a sensor_name and timestamp. These are the minimum fields required for downstream time-series indexing.
- value is numeric (int or float). Non-numeric values may indicate a schema version mismatch.
If validation fails, return 400 Bad Request with a descriptive error message.
Do not return 2xx; the uploader will treat 2xx as success and drop the items from the local
queue, causing permanent data loss if your receiver rejected them silently.
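These invariants can be collected into one framework-independent function, sketched below. validate_batch is a hypothetical name, and treating an absent or null value as acceptable is an assumption made here because the example items table allows NULL in that column; tighten it if your pipeline requires a value on every reading.

```python
from numbers import Real


def validate_batch(data):
    """Return a list of human-readable problems; an empty list means the batch is valid."""
    if not isinstance(data, dict):
        return ["payload must be a JSON object"]
    errors = []
    if not isinstance(data.get("batch_id"), str) or not data["batch_id"]:
        errors.append("batch_id must be a non-empty string")
    items = data.get("items")
    if not isinstance(items, list):
        return errors + ["items must be an array"]
    if data.get("count") != len(items):
        errors.append(f"count is {data.get('count')!r} but items has {len(items)} entries")
    for i, item in enumerate(items):
        if not isinstance(item, dict):
            errors.append(f"items[{i}] must be an object")
            continue
        for field in ("sensor_name", "timestamp"):
            if field not in item:
                errors.append(f"items[{i}] is missing {field}")
        value = item.get("value")
        # bool is a subclass of int in Python, so reject it explicitly
        if value is not None and (isinstance(value, bool) or not isinstance(value, Real)):
            errors.append(f"items[{i}].value must be numeric")
    return errors
```

A handler would call this before touching the database and, if the list is non-empty, return 400 with the messages in the response body.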
Scaling Considerations
- Use connection pooling. Both Flask and FastAPI examples above open a new SQLite connection per request. For production traffic, switch to PostgreSQL or MySQL with SQLAlchemy connection pooling.
- Accept and queue. Do not perform heavy synchronous processing inside the HTTP handler. Write the batch to a message queue (Redis Streams, RabbitMQ, AWS SQS) and return 201 immediately. This keeps the uploader's timeout window short and prevents queue backpressure on the edge device.
- Implement backpressure signalling. If your downstream pipeline is saturated, return 503 Service Unavailable or 429 Too Many Requests. The edge device will retry with exponential backoff, giving your pipeline time to recover.
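The accept-and-queue and backpressure points combine naturally. The sketch below uses an in-process queue.Queue purely as a stand-in for Redis Streams, RabbitMQ, or SQS; the function names and the queue size are illustrative assumptions.

```python
import queue

# Stand-in for an external message broker; maxsize is an illustrative limit
PENDING = queue.Queue(maxsize=1000)


def accept_batch(batch, pending=PENDING):
    """Enqueue the batch and return (status_code, body) without doing heavy work."""
    try:
        pending.put_nowait(batch)
    except queue.Full:
        # Downstream is saturated: ask the device to retry later instead of blocking
        return 503, {"error": "ingest queue full, retry later"}
    return 201, {"status": "ok", "batch_id": batch.get("batch_id")}


def drain(pending, process):
    """Process everything currently queued and return the number of batches handled."""
    handled = 0
    while True:
        try:
            batch = pending.get_nowait()
        except queue.Empty:
            return handled
        process(batch)
        handled += 1
```

An in-memory queue loses its contents if the process dies, and the uploader drops items as soon as it sees a 2xx. A real deployment should therefore return 201 only after a durable broker has confirmed the write.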
Summary
Building a custom cloud receiver for Pyvorin Edge is straightforward because the wire protocol
is intentionally simple: HTTPS POST, JSON envelope, Bearer token auth, and standard HTTP
status codes. By validating the schema, deduplicating by batch_id, and returning
precise status codes, you create a reliable contract that lets the edge device make correct
decisions about retries, acknowledgements, and failure handling. The Flask and FastAPI examples
in this article are drop-in starting points that you can extend with your own persistence,
analytics, and alerting layers.