Source code for nyxmon.adapters.repositories.interface
from typing import List, Protocol, TypeAlias
from ...domain import Result, Check, Service
[docs]
class ResultRepository(Protocol):
    """Structural interface for objects that store and retrieve results.

    Every method here is a bare ``...`` stub; classes that conform to the
    protocol provide the actual behaviour.
    """

    # Set of results that conforming classes must expose. The protocol only
    # declares the attribute; it never assigns to it.
    seen: set[Result]

    def add(self, result: Result) -> None:
        """Store ``result`` in the repository."""
        ...

    def get(self, result_id: int) -> Result:
        """Return the result identified by ``result_id``."""
        ...

    # NOTE: this method rebinds the name ``list`` inside the class body, so
    # annotations evaluated after it must use ``typing.List`` rather than the
    # builtin ``list[...]``.
    def list(self) -> List[Result]:
        """Return every result held by the repository."""
        ...

    def list_for_check(self, check_id: int, limit: int) -> List[Result]:
        """Return recent results for a single check, newest first.

        Args:
            check_id: ID of the check whose results are requested.
            limit: presumably the maximum number of results returned --
                TODO confirm against the concrete implementations.
        """
        ...
[docs]
class CheckRepository(Protocol):
    """A repository interface for storing and retrieving checks.

    Every method here is a bare ``...`` stub; classes that conform to the
    protocol provide the actual behaviour. The signatures mirror
    ``ResultRepository``, with ``Check`` as the element type.
    """

    # Set of checks that conforming classes must expose. The protocol only
    # declares the attribute; it never assigns to it.
    seen: set[Check]

    def add(self, check: Check) -> None:
        """Add a check to the repository."""
        ...

    def get(self, check_id: int) -> Check:
        """Get a check from the repository by ID."""
        ...

    # NOTE: this method rebinds the name ``list`` inside the class body, so
    # annotations evaluated after it must use ``typing.List`` rather than the
    # builtin ``list[...]``.
    def list(self) -> List[Check]:
        """Get a list of all checks."""
        ...

    async def list_async(self) -> List[Check]:
        """Get a list of all checks asynchronously.

        Coroutine counterpart of :meth:`list`; callers must ``await`` it.
        """
        ...
[docs]
class ServiceRepository(Protocol):
    """A repository interface for storing and retrieving services.

    Every method here is a bare ``...`` stub; classes that conform to the
    protocol provide the actual behaviour. The signatures mirror
    ``ResultRepository``, with ``Service`` as the element type.
    """

    # Set of services that conforming classes must expose. The protocol only
    # declares the attribute; it never assigns to it.
    seen: set[Service]

    def add(self, service: Service) -> None:
        """Add a service to the repository."""
        ...

    def get(self, service_id: int) -> Service:
        """Get a service from the repository by ID."""
        ...

    # NOTE: this method rebinds the name ``list`` inside the class body, so
    # any annotation evaluated after it must use ``typing.List`` rather than
    # the builtin ``list[...]``.
    def list(self) -> List[Service]:
        """Get a list of all services."""
        ...
# Type alias meaning "any one of the three repository protocols defined above".
Repository: TypeAlias = ResultRepository | CheckRepository | ServiceRepository
[docs]
class RepositoryStore(Protocol):
    """Structural interface for an object that groups the repositories.

    Conforming classes expose one repository per kind as an attribute and
    can also hand the repositories back as a list.
    """

    # One attribute per repository protocol. They are only declared here;
    # conforming classes are responsible for providing the values.
    results: ResultRepository
    checks: CheckRepository
    services: ServiceRepository

    def list(self) -> List[Repository]:
        """Return the repositories as a list."""
        ...