Source code for nyxmon.adapters.notification

import logging
import os
import json
from datetime import UTC, datetime, timedelta
from typing import Any, Protocol

import httpx
from anyio.from_thread import BlockingPortalProvider

from ..domain.models import Check, Result, Service

logger = logging.getLogger(__name__)


class Notifier(Protocol):
    """Structural interface every notification backend must satisfy."""

    def notify_check_failed(self, check: Check, result: Result) -> None:
        """Report that ``check`` failed, producing ``result``."""
        ...

    def notify_service_status_changed(self, service: Service, status: str) -> None:
        """Report that ``service`` moved to the given ``status``."""
        ...
[docs] class AsyncTelegramNotifier(Notifier): def __init__(self, token: str | None = None, chat_id: str | None = None): self.token = token or os.environ.get("TELEGRAM_BOT_TOKEN") self.chat_id = chat_id or os.environ.get("TELEGRAM_CHAT_ID") if not self.token or not self.chat_id: logger.warning("Telegram notifier initialized without token or chat_id") self.url = ( f"https://api.telegram.org/bot{self.token}/sendMessage" if self.token else "" ) self.portal_provider: BlockingPortalProvider | None = None self.opsgate_submit_base_url = ( os.environ.get("OPSGATE_SUBMIT_BASE_URL", "").strip().rstrip("/") ) self.opsgate_submit_token = os.environ.get("OPSGATE_SUBMIT_TOKEN", "").strip() self.opsgate_approval_base_url = ( os.environ.get("OPSGATE_APPROVAL_BASE_URL", "").strip().rstrip("/") or self.opsgate_submit_base_url ) self.opsgate_ticket_expires_seconds = self._parse_positive_int( os.environ.get("OPSGATE_TICKET_EXPIRES_SECONDS"), default=14400, env_name="OPSGATE_TICKET_EXPIRES_SECONDS", ) self.opsgate_submit_timeout_seconds = float( self._parse_positive_int( os.environ.get("OPSGATE_SUBMIT_TIMEOUT_SECONDS"), default=10, env_name="OPSGATE_SUBMIT_TIMEOUT_SECONDS", ) ) self.opsgate_include_warnings = self._parse_bool( os.environ.get("OPSGATE_SUBMIT_INCLUDE_WARNINGS"), default=False, )
[docs] def set_portal_provider(self, portal_provider: BlockingPortalProvider) -> None: """Set the portal provider for async operations.""" self.portal_provider = portal_provider
@staticmethod def _parse_positive_int(value: str | None, *, default: int, env_name: str) -> int: if value is None or value.strip() == "": return default try: parsed = int(value.strip()) except ValueError: logger.warning("%s is not an integer, using default %s", env_name, default) return default if parsed <= 0: logger.warning("%s must be > 0, using default %s", env_name, default) return default return parsed @staticmethod def _parse_bool(value: str | None, *, default: bool) -> bool: if value is None: return default normalized = value.strip().lower() if normalized == "": return default return normalized in {"1", "true", "yes", "on"} def _opsgate_enabled(self) -> bool: return bool(self.opsgate_submit_base_url and self.opsgate_submit_token) @staticmethod def _isoformat_z(value: datetime) -> str: return value.astimezone(UTC).isoformat().replace("+00:00", "Z") @staticmethod def _safe_json_payload(value: Any) -> dict[str, Any]: if isinstance(value, dict): return value return {"repr": repr(value)} @staticmethod def _json_text(value: Any) -> str: try: return json.dumps(value, indent=2, sort_keys=True, ensure_ascii=False) except TypeError: return json.dumps( {"repr": repr(value)}, indent=2, sort_keys=True, ensure_ascii=False ) def _build_opsgate_prompt(self, check: Check, result: Result) -> str: summary_lines = [ "# Objective", "Investigate and remediate this Nyxmon alert.", "", "## Alert Context", f"- Check ID: {check.check_id}", f"- Check Name: {check.name or 'Unnamed Check'}", f"- Check Type: {check.check_type}", f"- Check URL: {check.url}", f"- Result Status: {result.status}", "", "## Latest Result Data", "```json", self._json_text(self._safe_json_payload(result.data)), "```", "", "## Required Output", "- Identify root cause.", "- Propose/execute the smallest safe remediation.", "- Summarize what changed and what remains risky.", ] return "\n".join(summary_lines) def _build_opsgate_ticket_payload( self, check: Check, result: Result ) -> dict[str, Any]: expires_at = 
self._isoformat_z( datetime.now(tz=UTC) + timedelta(seconds=self.opsgate_ticket_expires_seconds) ) check_name = check.name or f"Check {check.check_id}" severity = "critical" if result.status == "error" else "warning" return { "title": f"Nyxmon {severity} alert: {check_name}", "summary": f"Nyxmon detected a {severity} state for check {check_name} ({check.url}).", "task_ref": f"nyxmon-check-{check.check_id}", "execution_plan": [ { "role": "investigator", "agent": "codex", "prompt_markdown": self._build_opsgate_prompt(check, result), } ], "context": { "producer": "nyxmon", "check": { "check_id": check.check_id, "name": check.name, "check_type": check.check_type, "url": check.url, "service_id": check.service_id, }, "result": { "status": result.status, "data": self._safe_json_payload(result.data), }, }, "expires_at": expires_at, } def _build_approval_url(self, ticket_id: str) -> str: return f"{self.opsgate_approval_base_url}/tickets/{ticket_id}" @staticmethod def _escape_markdown_link_url(url: str) -> str: return url.replace("\\", "\\\\").replace(")", "\\)") async def _create_opsgate_ticket( self, check: Check, result: Result ) -> dict[str, str] | None: if not self._opsgate_enabled(): return None payload = self._build_opsgate_ticket_payload(check, result) endpoint = f"{self.opsgate_submit_base_url}/api/v1/tickets" headers = {"Authorization": f"Bearer {self.opsgate_submit_token}"} try: async with httpx.AsyncClient() as client: response = await client.post( endpoint, headers=headers, json=payload, timeout=self.opsgate_submit_timeout_seconds, ) except Exception as exc: logger.error( "OpsGate ticket submit failed for check_id=%s: %s", check.check_id, exc ) return {"status": "error"} body: dict[str, Any] = {} try: parsed = response.json() if isinstance(parsed, dict): body = parsed except Exception: body = {} if response.status_code == 201: ticket_id = str(body.get("id", "")).strip() if not ticket_id: logger.error( "OpsGate ticket submit succeeded without ticket id for 
check_id=%s", check.check_id, ) return {"status": "error"} return {"status": "created", "ticket_id": ticket_id} if response.status_code == 409 and body.get("error") == "duplicate_open_ticket": logger.info( "OpsGate duplicate open ticket ignored for check_id=%s task_ref=%s", check.check_id, f"nyxmon-check-{check.check_id}", ) return {"status": "duplicate"} logger.error( "OpsGate ticket submit returned status=%s for check_id=%s body=%s", response.status_code, check.check_id, body, ) return {"status": "error"}
[docs] async def async_send(self, text: str, high_priority: bool = False) -> None: """Send a message via Telegram asynchronously.""" if not self.token or not self.chat_id or not self.url: logger.warning("Cannot send Telegram notification: missing credentials") return try: payload = { "chat_id": self.chat_id, "text": text, "parse_mode": "MarkdownV2", "disable_notification": not high_priority, } async with httpx.AsyncClient() as client: resp = await client.post(self.url, data=payload, timeout=10.0) resp.raise_for_status() except Exception as e: logger.error(f"Failed to send Telegram notification: {e}")
[docs] @staticmethod def escape_markdown_v2(text: str) -> str: """Escape special characters for Telegram's MarkdownV2 format.""" # Characters that need escaping in MarkdownV2: _ * [ ] ( ) ~ ` > # + - = | { } . ! special_chars = [ "_", "*", "[", "]", "(", ")", "~", "`", ">", "#", "+", "-", "=", "|", "{", "}", ".", "!", ] escaped_text = text for char in special_chars: escaped_text = escaped_text.replace(char, f"\\{char}") return escaped_text
[docs] async def async_notify_check_failed(self, check: Check, result: Result) -> None: """Notify about a failed check asynchronously.""" error_msg = result.data.get("error_msg", "Unknown error") error_type = result.data.get("error_type", "") status_code = result.data.get("status_code", "") # Determine severity from result status (ERROR=critical, WARNING=warning) is_critical = result.status == "error" # Escape all text for MarkdownV2 escaped_name = ( self.escape_markdown_v2(check.name) if check.name else "Unnamed Check" ) escaped_url = self.escape_markdown_v2(check.url) escaped_error_msg = self.escape_markdown_v2(str(error_msg)) escaped_error_type = self.escape_markdown_v2(str(error_type)) # Use different emoji and title based on severity if is_critical: message = "🔴 *Check Failed \\(Critical\\)*\n" else: message = "⚠️ *Check Warning*\n" message += f"Name: {escaped_name}\n" message += f"URL: {escaped_url}\n" if status_code: message += f"Status: {status_code}\n" if error_type: message += f"Error Type: {escaped_error_type}\n" message += f"Error: {escaped_error_msg}" ticket_info: dict[str, str] | None = None if is_critical or self.opsgate_include_warnings: ticket_info = await self._create_opsgate_ticket(check, result) if ticket_info and ticket_info.get("status") == "created": ticket_id = ticket_info["ticket_id"] escaped_ticket_id = self.escape_markdown_v2(ticket_id) approval_url = self._build_approval_url(ticket_id) escaped_approval_url = self._escape_markdown_link_url(approval_url) message += "\n\n🛠 *OpsGate Approval Needed*" message += f"\nTicket: `{escaped_ticket_id}`" message += f"\n[Open approval page]({escaped_approval_url})" elif ticket_info and ticket_info.get("status") == "duplicate": message += "\n\n🛠 *OpsGate*" message += ( "\nAn open remediation ticket already exists for this check\\." ) elif ticket_info and ticket_info.get("status") == "error": message += "\n\n🛠 *OpsGate*" message += "\nTicket creation failed; please create one manually\\." 
# Only use high priority (with sound) for critical failures await self.async_send(message, high_priority=is_critical)
[docs] async def async_notify_service_status_changed( self, service: Service, status: str ) -> None: """Notify about a service status change asynchronously.""" service_name = service.data.get("name", f"Service {service.service_id}") escaped_service_name = self.escape_markdown_v2(service_name) escaped_status = self.escape_markdown_v2(status) emoji = ( "🔴" if status.lower() == "down" else "🟢" if status.lower() == "up" else "⚠️" ) message = f"{emoji} *Service Status Changed*\n" message += f"Service: {escaped_service_name}\n" message += f"Status: {escaped_status}" await self.async_send(message, high_priority=True)
# Sync methods that call async methods through the portal
[docs] def notify_check_failed(self, check: Check, result: Result) -> None: """Notify about a failed check.""" if self.portal_provider is None: logger.warning("Cannot send notification: portal provider not set") return with self.portal_provider as portal: portal.call(self.async_notify_check_failed, check, result)
[docs] def notify_service_status_changed(self, service: Service, status: str) -> None: """Notify about a service status change.""" if self.portal_provider is None: logger.warning("Cannot send notification: portal provider not set") return with self.portal_provider as portal: portal.call(self.async_notify_service_status_changed, service, status)
class LoggingNotifier(Notifier):
    """Notifier backend that only writes notifications to the module logger."""

    def notify_check_failed(self, check: Check, result: Result) -> None:
        """Emit an error-level log line describing the failed check."""
        # Fall back to a label built from the id when the check has no name.
        label = check.name or f"Check {check.check_id}"
        logger.error(
            f"Check failed: {label} (ID: {check.check_id}), Result: {result}"
        )

    def notify_service_status_changed(self, service: Service, status: str) -> None:
        """Emit an info-level log line describing the new service status."""
        logger.info(f"Service status changed: {service.service_id}, Status: {status}")

    def set_portal_provider(self, portal_provider: BlockingPortalProvider) -> None:
        """Store the portal provider on the instance as ``_portal_provider``."""
        self._portal_provider = portal_provider