runtimepy.net.arbiter.factory package#
Submodules#
runtimepy.net.arbiter.factory.connection module#
A module implementing an interface extension to the base connection-arbiter so that methods that create connections can be registered by name.
- class runtimepy.net.arbiter.factory.connection.ConnectionFactory[source]#
Bases:
object
An interface for creating client connections.
- async client(name: str, *args, **kwargs) Connection [source]#
Create a client connection.
- async server_task(stop_sig: Event, manager: ConnectionManager, started_sem: Semaphore, *args, **kwargs) Awaitable[None] [source]#
Create a task that will run a connection server.
- class runtimepy.net.arbiter.factory.connection.FactoryConnectionArbiter(manager: ConnectionManager = None, stop_sig: Event = None, namespace: Namespace = None, logger: Logger | LoggerAdapter[Any] = None, app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, window: Any | None = None, metrics_poller_task: bool = True)[source]#
Bases:
BaseConnectionArbiter
A class implementing an interface to allow connection factories to be used for creating new connections (that can then be registered).
- async factory_client(factory: str, name: str, *args, defer: bool = False, **kwargs) bool [source]#
Attempt to register a client connection using a registered factory.
- async factory_server(factory: str, *args, **kwargs) bool [source]#
Attempt to create a server task using a registered factory.
- register_connection_factory(factory: ConnectionFactory, *namespaces: str) bool [source]#
Attempt to register a connection factory.
runtimepy.net.arbiter.factory.task module#
A module implementing task-factory registration.
- class runtimepy.net.arbiter.factory.task.TaskConnectionArbiter(manager: ConnectionManager = None, stop_sig: Event = None, namespace: Namespace = None, logger: Logger | LoggerAdapter[Any] = None, app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, window: Any | None = None, metrics_poller_task: bool = True)[source]#
Bases:
BaseConnectionArbiter
A class for managing task factories.
- factory_task(factory: str, name: str, period_s: float = None, **kwargs) bool [source]#
Register a periodic task from one of the registered task factories.
- register_task_factory(factory: TaskFactory[ArbiterTask], *namespaces: str) bool [source]#
Attempt to register a periodic task factory.
Module contents#
A module implementing an interface extension to the base connection-arbiter so that methods that create connections can be registered by name.
- class runtimepy.net.arbiter.factory.ConnectionFactory[source]#
Bases:
object
An interface for creating client connections.
- async client(name: str, *args, **kwargs) Connection [source]#
Create a client connection.
- async server_task(stop_sig: Event, manager: ConnectionManager, started_sem: Semaphore, *args, **kwargs) Awaitable[None] [source]#
Create a task that will run a connection server.
- class runtimepy.net.arbiter.factory.FactoryConnectionArbiter(manager: ConnectionManager = None, stop_sig: Event = None, namespace: Namespace = None, logger: Logger | LoggerAdapter[Any] = None, app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, window: Any | None = None, metrics_poller_task: bool = True)[source]#
Bases:
BaseConnectionArbiter
A class implementing an interface to allow connection factories to be used for creating new connections (that can then be registered).
- async factory_client(factory: str, name: str, *args, defer: bool = False, **kwargs) bool [source]#
Attempt to register a client connection using a registered factory.
- async factory_server(factory: str, *args, **kwargs) bool [source]#
Attempt to create a server task using a registered factory.
- register_connection_factory(factory: ConnectionFactory, *namespaces: str) bool [source]#
Attempt to register a connection factory.