AsyncCheckRunner Execution Flow
This note documents how src/nyxmon/adapters/runner/async_runner.py bridges NyxMon’s synchronous message bus with the asynchronous check executors. It complements the high-level architecture description and is intended as an implementation-level guide.
Last Updated: 2025-09-30 - Refactored to remove legacy fallback, add conditional HTTP client creation, and implement executor cleanup.
1. Entry Point: CheckRunner.run_all
The service-layer handler (src/nyxmon/service_layer/handlers.py) calls CheckRunner.run_all(checks, result_received) inside the synchronous message bus loop. AsyncCheckRunner implements that interface. Its constructor receives a shared BlockingPortalProvider (created in src/nyxmon/bootstrap.py) so it can spin up an AnyIO event loop when required.
2. Using a Portal to Enter the Async World
run_all defines an async helper run_checks that iterates over the results produced by _async_run_all. To execute this coroutine from synchronous code, it:
- enters the portal context (with self.portal_provider as portal:);
- calls portal.call(run_checks, result_received), which blocks until the coroutine finishes.
The portal ensures all asynchronous work happens on a dedicated AnyIO loop while the caller remains synchronous.
3. _async_run_all: Fan Out Work and Gather Results
_async_run_all
│
├─ create memory object stream → (send_channel, receive_channel)
├─ pre-scan checks to determine required resources
│ └─ conditionally create httpx.AsyncClient (only if HTTP/JSON-HTTP checks present)
├─ register per-check-type executors
├─ start an AnyIO task group
│ └─ for each check → `tg.start_soon(self._run_one, check, send_channel)`
├─ clean up executors and HTTP client (in finally block)
└─ read from `receive_channel` and yield results up the stack
Key details:
- Resource Scoping (NEW): The HTTP client is only created when the batch contains HTTP or JSON-HTTP checks. DNS-only batches avoid the overhead entirely.
- The memory channels decouple producers (_run_one) from the consumer (run_checks).
- Executors are registered per batch and can optionally receive shared resources (e.g. the HTTP client).
- Cleanup Guarantee (NEW): The finally block ensures executors and the HTTP client are properly closed, even if errors occur during execution.
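The resource scoping and cleanup guarantee might look roughly like the sketch below. FakeHttpClient, needs_http_client, and run_batch are hypothetical names used only for illustration; the fake client stands in for httpx.AsyncClient so the example has no third-party dependency.

```python
import asyncio

# Check type names taken from the executor registry section.
HTTP_CHECK_TYPES = {"http", "json-http"}


class FakeHttpClient:
    """Hypothetical stand-in for httpx.AsyncClient; only tracks closing."""

    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


def needs_http_client(check_types):
    """Pre-scan: does any check in the batch require an HTTP client?"""
    return any(check_type in HTTP_CHECK_TYPES for check_type in check_types)


async def run_batch(check_types):
    # Create the shared client only when the batch needs it.
    client = FakeHttpClient() if needs_http_client(check_types) else None
    try:
        # Executors would run here, sharing `client` when it exists.
        return client
    finally:
        # Runs on success and on error alike.
        if client is not None:
            await client.aclose()
```

Calling asyncio.run(run_batch(["dns"])) never creates a client, while a batch containing "http" gets a client that is closed by the time run_batch returns.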
4. Executor Registry
AsyncCheckRunner owns an ExecutorRegistry (src/nyxmon/adapters/runner/executors/__init__.py). The registry now supports:
- Executor Factory Pattern: Executors can be registered as factories that create instances with per-batch context.
- Strict Type Checking: Missing check types raise UnknownCheckTypeError instead of silently falling back.
- Cleanup Protocol: Executors can implement aclose() for resource cleanup.
_register_executors wires up:
- HttpCheckExecutor for http and json-http check types:
  - Accepts an optional httpx.AsyncClient (shared if the batch contains HTTP checks)
  - Can create its own client if needed
  - Implements aclose() to clean up self-created clients
- DnsCheckExecutor for dns checks:
  - Performs DNS lookups via dnspython
  - Supports optional source-IP binding for split-horizon DNS
  - Implements aclose() as a no-op (no resources to clean up)
Additional executors can be added without modifying _run_one; they only need to be registered against a new check type.
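A registry with these three properties (factories, strict lookup, cleanup) could be sketched as below. Only UnknownCheckTypeError, aclose(), and aclose_all() are names taken from this note; RegistrySketch, its register/get methods, its internals, and DnsExecutorStub are assumptions made for the example and may differ from the real ExecutorRegistry.

```python
import asyncio


class UnknownCheckTypeError(LookupError):
    """Raised when no executor factory is registered for a check type."""


class DnsExecutorStub:
    """Hypothetical executor; records whether aclose() was called."""

    def __init__(self):
        self.closed = False

    async def execute(self, check):
        return f"{check}: ok"

    async def aclose(self):
        self.closed = True


class RegistrySketch:
    def __init__(self):
        self._factories = {}  # check type -> zero-argument factory
        self._instances = {}  # check type -> lazily created executor

    def register(self, check_type, factory):
        self._factories[check_type] = factory

    def get(self, check_type):
        if check_type not in self._factories:
            # Strict lookup: no silent fallback.
            raise UnknownCheckTypeError(check_type)
        if check_type not in self._instances:
            self._instances[check_type] = self._factories[check_type]()
        return self._instances[check_type]

    async def aclose_all(self):
        # Close only executors that were actually instantiated.
        for executor in self._instances.values():
            await executor.aclose()
        self._instances.clear()


registry = RegistrySketch()
registry.register("dns", DnsExecutorStub)
dns_executor = registry.get("dns")
asyncio.run(registry.aclose_all())
```

Registering factories rather than instances means an executor that is never requested in a batch is never created, so there is nothing to close for it.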
5. _run_one: Execute a Single Check
Each task group member executes _run_one(check, send_channel):
- Look up the executor for check.check_type in the registry.
  - Raises UnknownCheckTypeError if no executor is registered (no fallback).
- Await executor.execute(check).
- On success or failure, push the resulting Result onto the send_channel.
Breaking Change: The legacy _run_http_check fallback has been removed. All check types must have registered executors.
6. Returning to the Synchronous Callback
Back in run_checks, the async generator from _async_run_all yields each Result. For every item, run_checks uses anyio.to_thread.run_sync(result_received_callback, result) to invoke the synchronous result_received function without blocking the event loop. That callback usually:
- stores the result via the unit of work,
- schedules the next check execution time,
- enqueues follow-up commands (e.g. AddCheckResult).
7. Resource Cleanup and Ordering Guarantees
- Executor Cleanup (NEW): The finally block in _async_run_all calls executor_registry.aclose_all() to close all instantiated executors, ensuring resources are released even if execution fails.
- HTTP Client Cleanup (NEW): The HTTP client (if created) is explicitly closed in the finally block.
- Task-group cancellation: if one _run_one raises, AnyIO cancels the others. Each executor must convert domain errors into ResultStatus.ERROR so failures are reported rather than propagated. Unexpected exceptions are wrapped in BaseExceptionGroup by AnyIO.
- Channel closing: after all tasks complete, _async_run_all closes the send channel to terminate the consumer loop cleanly.
- The outer portal context ensures the event loop is closed before run_all returns to the message bus.
8. Opportunities for Future Refinement
- Executor registration: Consider moving registration into __init__ if executors don’t need per-batch resources.
- Back-pressure tuning: max_buffer_size=100 is arbitrary; extremely large batches may need a different value or a semaphore.
- Batch-level metrics: Track HTTP client reuse, executor instantiation, and cleanup timing.
Understanding this flow should make it easier to extend the runner (additional check types, instrumentation, etc.) while keeping the synchronous message bus intact.