Source code for nyxmon.adapters.runner.executors.http_executor

"""HTTP check executor implementation."""

import asyncio
import time
from typing import Any, Optional

import anyio
import httpx

from ....domain import Check, Result, ResultStatus
from ....domain.http_config import HttpCheckConfig


[docs] class HttpCheckExecutor: """Executor for HTTP checks. Performs HTTP requests and validates responses. Can accept an external client or create its own. """ def __init__(self, client: Optional[httpx.AsyncClient] = None): """Initialize HTTP executor. Args: client: Optional shared httpx client. If None, creates its own. """ self._client = client self._owns_client = client is None self._created_client: Optional[httpx.AsyncClient] = None self._client_lock = asyncio.Lock() async def _get_client(self) -> httpx.AsyncClient: """Get or create the HTTP client. Thread-safe creation of HTTP client for concurrent check execution. Returns: HTTP client instance """ if self._client is not None: return self._client # Protect client creation from concurrent access async with self._client_lock: if self._created_client is None: self._created_client = httpx.AsyncClient(follow_redirects=True) return self._created_client
[docs] async def execute(self, check: Check) -> Result: """Execute an HTTP check and return a Result. Args: check: The HTTP check to execute Returns: Result with HTTP response information """ try: config = HttpCheckConfig.from_dict(check.data) config.validate() except ValueError as exc: return self._error( check.check_id, "configuration_error", str(exc), {"attempt": 0, "attempts": 0}, ) client = await self._get_client() attempts = config.retries + 1 for attempt in range(1, attempts + 1): start = time.time() try: response = await client.get(check.url, timeout=config.timeout) status_code = self._response_status_code(response) if status_code is not None and not 200 <= status_code < 300: result = await self._http_status_error_result( check.check_id, status_code, attempt, attempts, config, ) if result is None: continue return result if status_code is None: response.raise_for_status() duration_ms = int((time.time() - start) * 1000) data: dict[str, Any] = {} if attempt > 1 or attempts > 1: data.update( { "attempt": attempt, "attempts": attempts, "duration_ms": duration_ms, } ) return Result( check_id=check.check_id, status=ResultStatus.OK, data=data, ) except httpx.HTTPStatusError as exc: status_code = exc.response.status_code result = await self._http_status_error_result( check.check_id, status_code, attempt, attempts, config, ) if result is None: continue return result except httpx.TimeoutException as exc: error_data = { "error_type": "timeout", "error_msg": str(exc), "attempt": attempt, "attempts": attempts, } if attempt < attempts: await anyio.sleep(config.retry_delay) continue return Result( check_id=check.check_id, status=ResultStatus.ERROR, data=error_data, ) except httpx.ConnectError as exc: error_data = { "error_type": "connection_error", "error_msg": str(exc), "attempt": attempt, "attempts": attempts, } if attempt < attempts: await anyio.sleep(config.retry_delay) continue return Result( check_id=check.check_id, status=ResultStatus.ERROR, data=error_data, ) except httpx.RequestError as exc: error_data = { "error_type": "request_error", "error_msg": str(exc), "attempt": attempt, "attempts": attempts, } if attempt < attempts: await anyio.sleep(config.retry_delay) continue return Result( check_id=check.check_id, status=ResultStatus.ERROR, data=error_data, ) except Exception as exc: # noqa: BLE001 return self._error( check.check_id, "unexpected_error", str(exc), {"attempt": attempt, "attempts": attempts}, ) return self._error( check.check_id, "unexpected_error", "HTTP check failed", {"attempt": attempts, "attempts": attempts}, )
def _response_status_code(self, response: httpx.Response) -> int | None: status_code = getattr(response, "status_code", None) if isinstance(status_code, int): return status_code return None async def _http_status_error_result( self, check_id: int, status_code: int, attempt: int, attempts: int, config: HttpCheckConfig, ) -> Result | None: error_data = { "error_type": "http_error", "error_msg": f"HTTP {status_code}", "status_code": status_code, "attempt": attempt, "attempts": attempts, } if status_code in config.retry_status_codes and attempt < attempts: await anyio.sleep(config.retry_delay) return None return Result( check_id=check_id, status=ResultStatus.ERROR, data=error_data, ) def _error( self, check_id: int, error_type: str, msg: str, data: dict[str, Any] ) -> Result: payload: dict[str, Any] = {"error_type": error_type, "error_msg": msg} payload.update(data) return Result(check_id=check_id, status=ResultStatus.ERROR, data=payload)
[docs] async def aclose(self) -> None: """Close the HTTP client if we created it. Only closes the client if this executor created it. Externally provided clients are not closed. """ if self._owns_client and self._created_client is not None: await self._created_client.aclose() self._created_client = None