Overview
The HTTPAdapter serves two roles in the Pyvorin Edge SDK: it is an ingress receiver (a lightweight threaded HTTP server that accepts POSTed sensor data) and an egress client (a helper for posting readings upstream to REST endpoints). This makes it a good fit for integrations where sensors push to the edge device and the edge device forwards data upstream. Where the edge device must instead poll a legacy SCADA system, you write a small polling loop yourself, since the adapter has no built-in poller.
Starting the Webhook Server
start_server() launches a background HTTPServer on the specified host, port, and endpoint path. It accepts only POST requests with JSON payloads. Valid payloads are normalised into SensorReading dicts and forwarded to your callback.
from pyv_edge_agent.ingest import HTTPAdapter
adapter = HTTPAdapter()
adapter.on_post(lambda reading: print(f"Received: {reading}"))
adapter.start_server(host="0.0.0.0", port=8080, endpoint="/ingest")
# Server is now running in a daemon thread
import time
time.sleep(3600)
adapter.stop_server()
Expected Payload Format
The adapter expects a JSON object with the following keys. Any key missing from the payload falls back to the default shown.
| Key | Type | Default |
|---|---|---|
| sensor_name | string | "" |
| timestamp | number | 0 |
| value | number | 0 |
| unit | string | "" |
| metadata | object | {} |
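To illustrate how the defaults in the table apply, here is a minimal sketch of a normalisation step. The helper `normalise_payload` and the `DEFAULTS` mapping are hypothetical names made up for this example; the adapter's internal logic may differ.

```python
import json

# Defaults copied from the table above; the names here are illustrative only.
DEFAULTS = {
    "sensor_name": "",
    "timestamp": 0,
    "value": 0,
    "unit": "",
    "metadata": {},
}


def normalise_payload(raw: bytes) -> dict:
    """Parse a JSON request body and fill in any missing keys."""
    payload = json.loads(raw.decode("utf-8"))
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")
    reading = {}
    for key, default in DEFAULTS.items():
        # Copy the mutable default so readings never share one dict.
        fallback = dict(default) if isinstance(default, dict) else default
        reading[key] = payload.get(key, fallback)
    return reading


partial = normalise_payload(b'{"sensor_name": "TT-101", "value": 21.5}')
print(partial)
```

A payload that omits `timestamp`, `unit`, and `metadata` still yields a complete reading, with those keys set to their defaults.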
Polling REST Endpoints
When sensors do not push data, the edge device must pull. Use Python's urllib (or requests if installed) inside a polling loop. The adapter does not include a built-in poller, but the pattern is trivial to implement.
import json
import time
import urllib.request

from pyvorin_edge.pipeline import Pipeline

pipeline = Pipeline("scada_poll")


def poll_sensor(url: str, headers: dict | None = None):
    """Fetch one tag value and map it onto the SensorReading keys."""
    req = urllib.request.Request(url, headers=headers or {})
    with urllib.request.urlopen(req, timeout=10) as resp:
        data = json.loads(resp.read().decode("utf-8"))
    return {
        "sensor_name": data["tag"],
        "timestamp": time.time(),  # time of the poll, not of the measurement
        "value": float(data["value"]),
        "unit": data.get("unit", ""),
        "metadata": {"source": url},
    }


# Poll every five seconds and feed each reading into the pipeline.
while True:
    reading = poll_sensor("http://scada.local/api/v1/tags/TT-101")
    pipeline.run([reading])
    time.sleep(5.0)
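The loop above stops at the first network error or malformed response. A minimal sketch of a more tolerant loop follows, assuming exponential backoff is acceptable for your endpoint. `next_delay`, `poll_forever`, `fetch`, and `handle` are hypothetical names: `fetch` stands in for a call to `poll_sensor` and `handle` for a call to `pipeline.run`.

```python
import time
import urllib.error


def next_delay(failures: int, base: float = 5.0, cap: float = 60.0) -> float:
    """Return `base` when healthy, doubling per consecutive failure up to `cap`."""
    return min(cap, base * (2 ** failures))


def poll_forever(fetch, handle, max_iterations=None, sleep=time.sleep):
    """Call `fetch` repeatedly, pass each reading to `handle`, back off on errors.

    `max_iterations` and `sleep` exist only to make the loop easy to test.
    """
    failures = 0
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        try:
            handle(fetch())
            failures = 0
        except (urllib.error.URLError, TimeoutError, ValueError, KeyError) as exc:
            # URLError/TimeoutError: network trouble.
            # ValueError/KeyError: response was not the JSON shape we expected.
            failures += 1
            print(f"poll failed ({exc!r}); backing off")
        sleep(next_delay(failures))
```

With the polling example, this could be started as `poll_forever(lambda: poll_sensor(url), lambda r: pipeline.run([r]))`.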
Authentication Headers
Production APIs usually require authentication, whether you are polling or receiving. Bearer tokens and API keys are passed in request headers; the example below attaches them to an outbound polling request.
import urllib.request

# Placeholder credentials; do not commit real secrets to source control.
headers = {
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
    "X-API-Key": "pk_live_abc123",
}
req = urllib.request.Request(
    "https://api.factory.io/sensors/latest",
    headers=headers,
)
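Hard-coding credentials as in the snippet above is convenient for a demo but risky on a deployed device. One common alternative, sketched here, reads them from environment variables. The variable names `EDGE_BEARER_TOKEN` and `EDGE_API_KEY` and the helper `build_auth_headers` are assumptions made for this example, not SDK conventions.

```python
import os
import urllib.request


def build_auth_headers(env=os.environ) -> dict:
    """Build request headers from environment variables, skipping unset ones."""
    headers = {}
    token = env.get("EDGE_BEARER_TOKEN")  # hypothetical variable name
    api_key = env.get("EDGE_API_KEY")     # hypothetical variable name
    if token:
        headers["Authorization"] = f"Bearer {token}"
    if api_key:
        headers["X-API-Key"] = api_key
    return headers


# Constructing the Request does not open a connection.
req = urllib.request.Request(
    "https://api.factory.io/sensors/latest",
    headers=build_auth_headers(),
)
```

Passing the environment as a parameter keeps the helper easy to exercise with a plain dict in tests.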
Rate Limiting
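The adapter server does not enforce rate limits. If you expose the webhook to the public internet, wrap the callback with a token-bucket limiter or place nginx in front. A callback-level limiter only drops readings after the request has already been accepted and parsed, so a reverse proxy is the stronger defence against request floods.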
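import threading
import time


class RateLimiter:
    """Token-bucket limiter that silently drops calls once the bucket is empty."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.tokens = float(max_calls)
        self.last = time.monotonic()
        # Guard the bucket in case callbacks run on several server threads.
        self.lock = threading.Lock()

    def __call__(self, fn):
        def wrapper(reading):
            with self.lock:
                now = time.monotonic()
                # Refill in proportion to elapsed time, capped at the bucket size.
                self.tokens = min(self.max_calls, self.tokens + (now - self.last) * self.max_calls / self.period)
                self.last = now
                if self.tokens < 1:
                    return None  # over the limit: drop this reading
                self.tokens -= 1
            return fn(reading)
        return wrapper


@RateLimiter(max_calls=10, period=1.0)
def limited_callback(reading):
    pipeline.run([reading])


adapter.on_post(limited_callback)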
JSON and CSV Parsing
The server accepts only JSON. For CSV ingest, use the FileReplayAdapter or write a thin translation layer that POSTs CSV rows as JSON to the local adapter.
import csv
import json
import urllib.request


def stream_csv_to_adapter(csv_path: str, adapter_url: str):
    """POST each CSV row to the local adapter as a JSON reading."""
    with open(csv_path, newline="", encoding="utf-8") as fh:
        reader = csv.DictReader(fh)
        for row in reader:
            payload = json.dumps({
                "sensor_name": row["sensor_name"],
                "timestamp": float(row["timestamp"]),
                "value": float(row["value"]),
                "unit": row.get("unit", ""),
                "metadata": {},
            }).encode("utf-8")
            req = urllib.request.Request(
                adapter_url,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            # Close each response so connections are not leaked.
            with urllib.request.urlopen(req, timeout=5):
                pass


stream_csv_to_adapter("/data/log.csv", "http://localhost:8080/ingest")
Posting Upstream
post_to_upstream() sends a JSON payload to any URL and returns the parsed JSON response. It wraps urllib.request with a 30-second timeout and raises URLError on failure.
from pyv_edge_agent.ingest import HTTPAdapter

client = HTTPAdapter()
# `batch` is a collection of readings gathered elsewhere (not shown here).
response = client.post_to_upstream(
    "https://cloud.pyvorin.com/v1/ingest",
    {
        "device_id": "edge-001",
        "readings": [reading.to_dict() for reading in batch],
    },
)
print(response)
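For readers who want to see roughly what such a helper involves, here is a minimal standalone sketch built on `urllib.request`. It is not the SDK's implementation; `build_json_request` and `post_json` are hypothetical names, and only the 30-second timeout and the `URLError` behaviour are taken from the description above.

```python
import json
import urllib.request


def build_json_request(url: str, payload: dict) -> urllib.request.Request:
    """Encode `payload` as JSON and wrap it in a POST request."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_json(url: str, payload: dict, timeout: float = 30.0):
    """POST `payload` and return the decoded JSON response.

    urllib raises urllib.error.URLError on connection failures and
    urllib.error.HTTPError (a URLError subclass) on 4xx/5xx responses.
    """
    req = build_json_request(url, payload)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Splitting request construction from the network call lets you inspect the method, headers, and body without contacting a server.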
Full Example with FastAPI Sensor Mock
Below is a complete two-process example. One process runs a FastAPI mock sensor. The other runs the Pyvorin edge pipeline with HTTPAdapter.
# mock_sensor.py (run with: uvicorn mock_sensor:app --port 9000)
import random
import time

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()


@app.post("/reading")
def get_reading():
    return JSONResponse({
        "sensor_name": "mock_temp",
        "timestamp": time.time(),
        "value": 20.0 + random.uniform(-2.0, 5.0),
        "unit": "°C",
        "metadata": {"mock": True},
    })
# edge_client.py
import json
import time
import urllib.request

from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType
from pyv_edge_agent.ingest import HTTPAdapter

pipeline = Pipeline("fastapi_demo")
pipeline.add_sensor(Sensor("mock_temp", SensorType.TEMPERATURE, "°C"))
pipeline.add_window(WindowConfig(duration_seconds=60.0, window_type="rolling", sensor_name="mock_temp"))
pipeline.add_rule(RuleConfig(
    name="warm",
    condition=lambda r: r["value"] > 23.0,
    severity="info",
    cooldown_seconds=10.0,
))


def poll_fastapi():
    req = urllib.request.Request("http://localhost:9000/reading", method="POST")
    with urllib.request.urlopen(req, timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Also start local webhook server for external push
adapter = HTTPAdapter()
adapter.on_post(lambda r: pipeline.run([r]))
adapter.start_server(port=8080, endpoint="/ingest")

try:
    while True:
        reading = poll_fastapi()
        result = pipeline.run([reading])
        if result.events:
            print(f"Event: {result.events[0].to_dict()}")
        time.sleep(2.0)
except KeyboardInterrupt:
    adapter.stop_server()
Security Considerations
- Bind the webhook server to 127.0.0.1 if it only accepts data from a local co-process.
- Run nginx or Traefik in front for TLS termination, client-cert validation, and IP allow-listing.
- Validate the Content-Length header to avoid memory exhaustion from malicious clients.
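The Content-Length check in the last point can be sketched as a small pure function that a request handler calls before reading the body. `MAX_BODY_BYTES`, `check_content_length`, and the 64 KiB limit are assumptions chosen for illustration; the adapter's own handler may or may not perform a similar check.

```python
from http import HTTPStatus

MAX_BODY_BYTES = 64 * 1024  # illustrative limit; tune for your payloads


def check_content_length(header_value, max_bytes: int = MAX_BODY_BYTES):
    """Return (status, length) for a Content-Length header value.

    status is HTTPStatus.OK when the body may be read, and length is then
    the number of bytes to read. Otherwise length is None.
    """
    if header_value is None:
        return HTTPStatus.LENGTH_REQUIRED, None           # 411: header missing
    try:
        length = int(header_value)
    except ValueError:
        return HTTPStatus.BAD_REQUEST, None               # 400: not a number
    if length < 0:
        return HTTPStatus.BAD_REQUEST, None               # 400: negative length
    if length > max_bytes:
        return HTTPStatus.REQUEST_ENTITY_TOO_LARGE, None  # 413: body too big
    return HTTPStatus.OK, length
```

Inside a `BaseHTTPRequestHandler.do_POST` method, this would be called with `self.headers.get("Content-Length")`, and `self.rfile.read(length)` would run only when the status is `HTTPStatus.OK`.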