Introduction
Pyvorin Edge runs on CPython, which means the Global Interpreter Lock (GIL) is always present. Even when your code is compiled to a native .so, the GIL can become a bottleneck if you do not explicitly release it inside C extensions. This article covers:
- How the GIL affects compiled kernels loaded through ModuleLoader.
- Why Pipeline.run() is single-threaded and should stay that way.
- Releasing the GIL inside native kernels with Py_BEGIN_ALLOW_THREADS.
- Using ThreadPoolExecutor for I/O-bound adapters.
- Using ProcessPoolExecutor for CPU-bound work.
The GIL and Compiled Code
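When ModuleLoader.call() invokes a symbol from a .so through ctypes, whether the GIL stays held depends on how the library was loaded: a library opened with ctypes.PyDLL keeps the GIL held for the whole call, whereas one opened with ctypes.CDLL has the GIL released before entering C and reacquired on return. This article assumes the former, i.e. that the calling thread holds the GIL for as long as the kernel runs. For short scalar calls this is fine. For long-running vectorized kernels it serializes otherwise parallel-ready code.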
The Problem
import ctypes
import numpy as np
from pyv_edge_agent.module_host.abi import ABIContract
from pyv_edge_agent.module_host.loader import ModuleLoader
# Describe the C entry point `void heavy_compute(double *data, size_t n)`.
abi = ABIContract(
    function_name="heavy_compute",
    arg_types=[ctypes.POINTER(ctypes.c_double), ctypes.c_size_t],
    return_type=None,
)

loader = ModuleLoader()
loader.load("./libheavy.so", abi)

# Example input: a contiguous float64 buffer that the kernel updates in place.
# The element type must match the `double *` parameter declared in the ABI.
arr = np.zeros(1_000_000, dtype=np.float64)
n = arr.size

# This call holds the GIL for the entire duration
loader.call("heavy_compute", arr.ctypes.data_as(ctypes.POINTER(ctypes.c_double)), n)
If another Python thread wants to process MQTT messages or HTTP callbacks while the kernel runs, it must wait.
Releasing the GIL in Native Kernels
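To allow true parallelism, wrap the compute-intensive section of your C kernel with the Python C-API macros Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS. This tells the interpreter that your thread may safely drop the GIL. The function must be entered with the GIL held, and the code between the two macros must not call any Python C-API function or touch Python objects.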
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <stddef.h>
void heavy_compute(double *restrict data, size_t n) {
Py_BEGIN_ALLOW_THREADS
for (size_t i = 0; i < n; i++) {
data[i] = data[i] * data[i] + 1.0;
}
Py_END_ALLOW_THREADS
}
Build Command with Python Headers
# Build heavy_compute.c into the position-independent shared library
# libheavy.so, optimized (-O3) for ARMv8-A with FP and SIMD enabled.
# python3-config supplies the include paths needed for <Python.h> and the
# interpreter's linker flags; the linker flags stay after the source file.
gcc -shared -fPIC -O3 -march=armv8-a+fp+simd \
$(python3-config --includes) \
-o libheavy.so heavy_compute.c \
$(python3-config --ldflags)
Thread Safety in Pipeline
Pipeline.run() in edge_sdk/pyvorin_edge/pipeline.py is not thread-safe. It mutates internal lists (_window_objects, _events, _last_rule_fires) without locks. Calling run() from multiple threads simultaneously will corrupt state.
from concurrent.futures import ThreadPoolExecutor

from pyvorin_edge.pipeline import Pipeline

pipeline = Pipeline("unsafe")

# WRONG: two threads calling run() concurrently
# readings_batch_a and readings_batch_b stand for two batches of readings
# prepared elsewhere; they are placeholders in this anti-example.
with ThreadPoolExecutor(max_workers=2) as ex:
    ex.submit(pipeline.run, readings_batch_a)
    ex.submit(pipeline.run, readings_batch_b)  # race condition!
The correct pattern is a single event loop that serializes pipeline execution, while external threads handle I/O.
ThreadPoolExecutor for I/O-Bound Adapters
HTTP polling, MQTT subscriptions, and file tailing are I/O-bound. Run them in a ThreadPoolExecutor and feed results into a thread-safe queue consumed by the single pipeline thread.
import queue
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List
import requests
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorReading, SensorType
class HTTPAdapter:
    """Poll an HTTP endpoint at a fixed interval and enqueue the results.

    Each successful poll is converted into one ``SensorReading`` named
    ``"http_metric"`` and pushed onto the queue handed to :meth:`poll`.
    """

    def __init__(self, url: str, interval: float = 5.0):
        """Store the endpoint URL and the delay, in seconds, between polls."""
        self.url = url
        self.interval = interval

    # The annotation is quoted so that it is not evaluated when the class is
    # defined: subscripting queue.Queue at runtime needs Python 3.9+.
    def poll(self, q: "queue.Queue[SensorReading]") -> None:
        """Fetch the endpoint forever, putting one reading per poll on ``q``.

        The JSON response must contain a ``"value"`` entry convertible to
        float; ``"unit"`` is optional and defaults to an empty string. This
        method never returns, so run it on a worker thread. ``q.put`` uses
        its default blocking mode, so a full bounded queue pauses polling
        until the consumer catches up.
        """
        while True:
            try:
                resp = requests.get(self.url, timeout=10)
                resp.raise_for_status()
                data = resp.json()
                q.put(
                    SensorReading(
                        sensor_name="http_metric",
                        timestamp=time.time(),
                        value=float(data["value"]),
                        unit=data.get("unit", ""),
                    )
                )
            except Exception as exc:
                # Deliberate best-effort: any failure (network error, bad
                # status, malformed payload) is reported and the loop keeps
                # going, so one bad poll cannot kill the worker thread.
                print(f"Poll error: {exc}")
            time.sleep(self.interval)
def main() -> None:
    """Run the HTTP adapter on a worker thread and the pipeline on this thread.

    The adapter pushes readings onto a bounded queue. This thread drains the
    queue into batches of at most 100 readings, each collected for at most
    one second, and is the only caller of ``pipeline.run()``.
    """
    q: queue.Queue[SensorReading] = queue.Queue(maxsize=1000)
    adapter = HTTPAdapter("https://api.example.com/metric")

    pipeline = Pipeline("http_pipeline")
    pipeline.add_sensor(Sensor("http_metric", SensorType.GENERIC, "count"))
    pipeline.add_rule(
        RuleConfig(
            name="spike",
            condition_expr="ctx.value > 1000.0",
            severity="warning",
        )
    )

    # NOTE: adapter.poll() loops forever, so leaving this ``with`` block
    # (for example because of an exception) waits on the worker indefinitely.
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(adapter.poll, q)

        # Single pipeline thread
        while True:
            batch: List[SensorReading] = []
            # Use the monotonic clock so that wall-clock adjustments cannot
            # stretch or shrink the batching window.
            deadline = time.monotonic() + 1.0
            while time.monotonic() < deadline and len(batch) < 100:
                try:
                    batch.append(q.get(timeout=0.05))
                except queue.Empty:
                    # Queue went idle: flush what was collected right away.
                    break
            if batch:
                result = pipeline.run(batch)
                for ev in result.events:
                    print(f"Event: {ev.rule_name} @ {ev.timestamp}")


if __name__ == "__main__":
    main()
ProcessPoolExecutor for CPU-Bound Work
If you need true CPU parallelism for compiled kernels, spawn a separate process. The GIL is per-process, so a ProcessPoolExecutor lets multiple cores work on different chunks simultaneously.
import ctypes
import numpy as np
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from pyv_edge_agent.module_host.abi import ABIContract
from pyv_edge_agent.module_host.loader import ModuleLoader
# Location of the compiled kernel; every worker process loads it from here.
SO_PATH = Path("./libvecadd.so")

# Contract for vec_add_f32: three float pointers followed by a size_t, with
# no return value.
abi = ABIContract(
    function_name="vec_add_f32",
    arg_types=[ctypes.POINTER(ctypes.c_float)] * 3 + [ctypes.c_size_t],
    return_type=None,
)
def worker_chunk(args):
    """Add one chunk of two float32 vectors inside a child process.

    Each process loads its own copy of the .so.

    Args:
        args: Tuple ``(a_bytes, b_bytes, offset, length)`` where the two
            byte strings hold ``length`` float32 values each and ``offset``
            is the chunk's start index in the full vectors.

    Returns:
        ``(offset, out_bytes)``: the unchanged offset and the raw float32
        bytes of the output buffer filled by ``vec_add_f32``.
    """
    a_bytes, b_bytes, offset, length = args
    a = np.frombuffer(a_bytes, dtype=np.float32, count=length)
    b = np.frombuffer(b_bytes, dtype=np.float32, count=length)
    out = np.empty(length, dtype=np.float32)

    loader = ModuleLoader()
    loader.load(str(SO_PATH), abi)
    try:
        loader.call(
            "vec_add_f32",
            a.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
            b.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
            out.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
            length,
        )
    finally:
        # Unload the library even when the kernel call raises.
        loader.unload()
    return offset, out.tobytes()
def parallel_vec_add(a: np.ndarray, b: np.ndarray, workers: int = 4) -> np.ndarray:
    """Add two 1-D vectors element-wise using a pool of worker processes.

    The inputs are split into contiguous chunks, one per worker; each chunk
    is shipped to ``worker_chunk`` as raw float32 bytes and the partial
    results are copied back into a single output array.

    Args:
        a: First operand; converted to contiguous float32 if necessary.
        b: Second operand; must have the same length as ``a``.
        workers: Maximum number of worker processes (at least 1).

    Returns:
        A new float32 array of the same length as the inputs.

    Raises:
        ValueError: If ``workers`` is less than 1, or the inputs are not
            1-D arrays of equal length.
    """
    if workers < 1:
        raise ValueError("workers must be at least 1")

    # The workers reinterpret raw bytes as float32, so normalise dtype and
    # memory layout here instead of silently misreading other dtypes.
    a32 = np.ascontiguousarray(a, dtype=np.float32)
    b32 = np.ascontiguousarray(b, dtype=np.float32)
    if a32.ndim != 1 or a32.shape != b32.shape:
        raise ValueError("a and b must be 1-D arrays of equal length")

    n = a32.size
    out = np.empty(n, dtype=np.float32)
    if n == 0:
        # Nothing to compute; avoid spawning processes for empty input.
        return out

    # Never start more workers than elements, so no chunk is empty.
    workers = min(workers, n)
    chunk = n // workers

    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = []
        for i in range(workers):
            start = i * chunk
            # The last worker also takes the remainder of the division.
            end = start + chunk if i < workers - 1 else n
            args = (
                a32[start:end].tobytes(),
                b32[start:end].tobytes(),
                start,
                end - start,
            )
            futures.append(executor.submit(worker_chunk, args))

        for fut in futures:
            offset, data = fut.result()
            piece = np.frombuffer(data, dtype=np.float32)
            out[offset : offset + piece.size] = piece
    return out
# Example: add two random float32 vectors across four processes and check
# the result against NumPy's own element-wise sum.
if __name__ == "__main__":
    n = 1_000_000
    a = np.random.rand(n).astype(np.float32)
    b = np.random.rand(n).astype(np.float32)
    result = parallel_vec_add(a, b, workers=4)
    np.testing.assert_allclose(result, a + b, rtol=1e-5)
    print("Process-parallel kernel passed.")
Summary
Keep Pipeline.run() on a single thread. Release the GIL inside C kernels with Py_BEGIN_ALLOW_THREADS so other Python threads can run. Use ThreadPoolExecutor for I/O-bound adapters and a queue to serialize data into the pipeline. Use ProcessPoolExecutor only when you need multi-core parallelism for CPU-bound kernels, and load the .so inside each worker.