Source code for nyxmon.adapters.runner.executors.dns_executor

"""DNS check executor implementation."""

import time
from dataclasses import dataclass
from typing import Any, List, Protocol

import dns.asyncresolver
import dns.rdatatype
import dns.rcode
import dns.resolver

from ....domain import Check, Result, ResultStatus
from ....domain.dns_config import DnsCheckConfig


[docs] @dataclass class DnsResolverResult: """Structured result returned by DNS resolvers.""" records: List[str] metadata: dict[str, Any]
[docs] class DnsResolver(Protocol): """Protocol describing the resolver interface expected by the executor."""
[docs] async def query(self, domain: str, config: DnsCheckConfig) -> DnsResolverResult: """Resolve ``domain`` according to ``config`` and return structured data."""
[docs] class DnspythonResolver: """Resolver implementation backed by dnspython."""
[docs] async def query(self, domain: str, config: DnsCheckConfig) -> DnsResolverResult: resolver = dns.asyncresolver.Resolver() if config.dns_server: resolver.nameservers = [config.dns_server] resolver.timeout = config.timeout resolver.lifetime = config.timeout if config.source_ip: answer = await resolver.resolve( domain, config.query_type, source=config.source_ip ) else: answer = await resolver.resolve(domain, config.query_type) resolved_data: list[str] = [] for rdata in answer: if config.query_type in {"A", "AAAA"}: resolved_data.append(rdata.to_text()) elif config.query_type == "MX": resolved_data.append(rdata.exchange.to_text()) elif config.query_type == "TXT": resolved_data.append( " ".join( [ s.decode() if isinstance(s, bytes) else s for s in rdata.strings ] ) ) else: resolved_data.append(rdata.to_text()) metadata: dict[str, Any] = { "response_code": dns.rcode.to_text(answer.response.rcode()), "questions": [ f"{answer.qname.to_text()} IN {dns.rdatatype.to_text(answer.rdtype)}" ], "rrset": resolved_data.copy(), "dns_server": config.dns_server or "system", } if config.source_ip: metadata["source_address"] = config.source_ip return DnsResolverResult(records=resolved_data, metadata=metadata)
[docs] class DnsCheckExecutor: """Executor for DNS checks. Performs DNS queries and validates results against expected IPs. """ def __init__(self, resolver: DnsResolver | None = None) -> None: self._resolver: DnsResolver = resolver or DnspythonResolver()
[docs] async def execute(self, check: Check) -> Result: """Execute a DNS check and return a Result.""" start_time = time.time() try: config = DnsCheckConfig.from_dict(check.data) config.validate() resolver_result = await self._resolver.query(check.url, config) resolved_ips = resolver_result.records dns_metadata = resolver_result.metadata query_time_ms = int((time.time() - start_time) * 1000) if self._check_ip_match(resolved_ips, config.expected_ips): return self._create_success_result( check.check_id, resolved_ips, query_time_ms, dns_metadata, ) return self._create_mismatch_result( check.check_id, config.expected_ips, resolved_ips, query_time_ms, dns_metadata, ) except dns.resolver.NXDOMAIN: return self._create_error_result( check.check_id, "nxdomain", f"Domain {check.url} does not exist", ) except dns.resolver.Timeout: return self._create_error_result( check.check_id, "timeout", f"DNS query timed out for {check.url}", ) except dns.resolver.NoAnswer: return self._create_error_result( check.check_id, "no_answer", f"No answer received for {check.url}", ) except ValueError as err: return self._create_error_result( check.check_id, "configuration_error", str(err), ) except Exception as err: # noqa: BLE001 - bubble unexpected errors to result return self._create_error_result( check.check_id, "unexpected_error", str(err), )
def _check_ip_match(self, resolved: List[str], expected: List[str]) -> bool: """Check if any resolved IP matches any expected IP. Args: resolved: List of resolved IPs expected: List of expected IPs Returns: True if there's at least one match """ resolved_set = set(resolved) expected_set = set(expected) return bool(resolved_set & expected_set) def _create_success_result( self, check_id: int, resolved_ips: List[str], query_time_ms: int, metadata: dict[str, Any], ) -> Result: """Create a successful result. Args: check_id: The check ID resolved_ips: List of resolved IPs query_time_ms: Query time in milliseconds metadata: DNS metadata Returns: Success Result """ data = { "resolved_ips": resolved_ips, "query_time_ms": query_time_ms, **metadata, } return Result(check_id=check_id, status=ResultStatus.OK, data=data) def _create_mismatch_result( self, check_id: int, expected: List[str], actual: List[str], query_time_ms: int, metadata: dict[str, Any], ) -> Result: """Create a resolution mismatch result. Args: check_id: The check ID expected: Expected IPs actual: Actual resolved IPs query_time_ms: Query time in milliseconds metadata: DNS metadata Returns: Error Result for mismatch """ data = { "error_type": "resolution_mismatch", "expected": expected, "actual": actual, "query_time_ms": query_time_ms, **metadata, } return Result(check_id=check_id, status=ResultStatus.ERROR, data=data) def _create_error_result( self, check_id: int, error_type: str, error_msg: str ) -> Result: """Create an error result. Args: check_id: The check ID error_type: Type of error error_msg: Error message Returns: Error Result """ return Result( check_id=check_id, status=ResultStatus.ERROR, data={"error_type": error_type, "error_msg": error_msg}, )
[docs] async def aclose(self) -> None: """Close the executor. DNS executor doesn't manage any resources, so this is a no-op. """ pass