Source code for nyxmon.adapters.repositories.interface

from typing import List, Protocol, TypeAlias

from ...domain import Result, Check, Service


class ResultRepository(Protocol):
    """Structural interface for persisting and looking up ``Result`` objects.

    Being a ``typing.Protocol``, any class that provides these members
    satisfies the interface; explicit inheritance is not required.
    """

    # NOTE(review): presumably the results handled through this repository
    # instance -- confirm against the concrete implementations.
    seen: set[Result]

    def add(self, result: Result) -> None:
        """Store ``result`` in the repository."""
        ...

    def get(self, result_id: int) -> Result:
        """Return the result identified by ``result_id``."""
        ...

    # Keep ``typing.List`` in the annotations from here on: once ``list`` is
    # defined as a method, the bare name ``list`` inside this class body
    # refers to that method rather than to the builtin type.
    def list(self) -> List[Result]:
        """Return every result held by the repository."""
        ...

    def list_for_check(self, check_id: int, limit: int) -> List[Result]:
        """Return recent results for the check ``check_id``, newest first.

        ``limit`` is presumably the maximum number of results to return --
        confirm against the concrete implementations.
        """
        ...
class CheckRepository(Protocol):
    """A repository interface for storing and retrieving checks.

    Being a ``typing.Protocol``, any class that provides these members
    satisfies the interface; explicit inheritance is not required.
    """

    # NOTE(review): presumably the checks handled through this repository
    # instance -- confirm against the concrete implementations.
    seen: set[Check]

    def add(self, check: Check) -> None:
        """Add a check to the repository."""
        ...

    def get(self, check_id: int) -> Check:
        """Get a check from the repository by ID."""
        ...

    # Keep ``typing.List`` in the annotations from here on: once ``list`` is
    # defined as a method, the bare name ``list`` inside this class body
    # refers to that method rather than to the builtin type.
    def list(self) -> List[Check]:
        """Get a list of all checks."""
        ...

    async def list_async(self) -> List[Check]:
        """Get a list of all checks asynchronously.

        Declared with ``async def``, so callers must ``await`` the result.
        """
        ...
class ServiceRepository(Protocol):
    """A repository interface for storing and retrieving services.

    Being a ``typing.Protocol``, any class that provides these members
    satisfies the interface; explicit inheritance is not required.
    """

    # NOTE(review): presumably the services handled through this repository
    # instance -- confirm against the concrete implementations.
    seen: set[Service]

    def add(self, service: Service) -> None:
        """Add a service to the repository."""
        ...

    def get(self, service_id: int) -> Service:
        """Get a service from the repository by ID."""
        ...

    # Keep ``typing.List`` here: after this definition the bare name ``list``
    # inside the class body refers to this method, not to the builtin type.
    def list(self) -> List[Service]:
        """Get a list of all services."""
        ...
# Union of the three repository protocols; used wherever any one of them is
# acceptable.
Repository: TypeAlias = (
    ResultRepository | CheckRepository | ServiceRepository
)
class RepositoryStore(Protocol):
    """Structural interface for an object that bundles the repositories.

    Being a ``typing.Protocol``, any class that provides these members
    satisfies the interface; explicit inheritance is not required.
    """

    # One attribute per repository protocol.
    results: ResultRepository
    checks: CheckRepository
    services: ServiceRepository

    def list(self) -> List[Repository]:
        """Return the store's repositories as a list."""
        ...