Source code for runtimepy.net.arbiter.task

"""
A module implement a base class for arbiter periodic tasks.
"""

# built-in
from typing import Generic as _Generic
from typing import TypeVar as _TypeVar

# internal
from runtimepy.net.arbiter.info import AppInfo
from runtimepy.task import PeriodicTask, PeriodicTaskManager


[docs] class ArbiterTask(PeriodicTask): """A base class for arbiter periodic tasks.""" app: AppInfo auto_finalize = False
[docs] async def init(self, app: AppInfo) -> None: """Initialize this task with application information.""" self.app = app
[docs] class ArbiterTaskManager(PeriodicTaskManager[ArbiterTask]): """A task-manger class for the connection arbiter."""
T = _TypeVar("T", bound=ArbiterTask)
[docs] class TaskFactory(_Generic[T]): """A task-factory base class.""" kind: type[T]