runtimepy.net.arbiter.factory package

Submodules

runtimepy.net.arbiter.factory.connection module

A module that extends the base connection arbiter so that methods that create connections can be registered by name.

class runtimepy.net.arbiter.factory.connection.ConnectionFactory

Bases: object

An interface for creating client connections and connection-server tasks.

async client(name: str, *args, **kwargs) → Connection

Create a client connection.

async server_task(stop_sig: Event, manager: ConnectionManager, started_sem: Semaphore, *args, **kwargs) → Awaitable[None]

Create a task that will run a connection server.
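
The two methods above can be illustrated with a minimal, self-contained sketch. It does not import runtimepy: `DemoConnection`, `DemoFactory`, and `demo` are hypothetical stand-ins that only mirror the documented method shapes. It assumes that awaiting `server_task` yields an awaitable to be awaited later (one reading of the `Awaitable[None]` return annotation), and that `started_sem` is released once the server is ready; neither behavior is confirmed by this page.

```python
import asyncio
from typing import Any, Awaitable


class DemoConnection:
    """Hypothetical stand-in for a runtimepy Connection."""

    def __init__(self, name: str) -> None:
        self.name = name


class DemoFactory:
    """Hypothetical class with the same method shapes as ConnectionFactory."""

    async def client(self, name: str, *args: Any, **kwargs: Any) -> DemoConnection:
        # A real factory would open a socket here; this only builds an object.
        return DemoConnection(name)

    async def server_task(
        self,
        stop_sig: asyncio.Event,
        manager: Any,
        started_sem: asyncio.Semaphore,
        *args: Any,
        **kwargs: Any,
    ) -> Awaitable[None]:
        async def serve() -> None:
            # Assumption: report readiness, then idle until asked to stop.
            started_sem.release()
            await stop_sig.wait()

        # Return something the caller can await later.
        return asyncio.create_task(serve())


async def demo() -> str:
    factory = DemoFactory()
    stop_sig = asyncio.Event()
    started_sem = asyncio.Semaphore(0)

    # The manager argument is unused in this sketch, so None is passed.
    server = await factory.server_task(stop_sig, None, started_sem)
    await started_sem.acquire()  # Blocks until the server reports readiness.

    conn = await factory.client("demo_client")

    stop_sig.set()  # Ask the server to shut down.
    await server
    return conn.name
```

Running `asyncio.run(demo())` starts the stand-in server, creates one client, and shuts the server down again.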

class runtimepy.net.arbiter.factory.connection.FactoryConnectionArbiter(manager: ConnectionManager = None, stop_sig: Event = None, namespace: Namespace = None, logger: Logger | LoggerAdapter[Any] = None, app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, window: Any | None = None, metrics_poller_task: bool = True)

Bases: BaseConnectionArbiter

A connection arbiter that lets registered connection factories create new connections, which can then be registered.

async factory_client(factory: str, name: str, *args, defer: bool = False, **kwargs) → bool

Attempt to register a client connection using a registered factory.

async factory_server(factory: str, *args, **kwargs) → bool

Attempt to create a server task using a registered factory.

register_connection_factory(factory: ConnectionFactory, *namespaces: str) → bool

Attempt to register a connection factory.
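
The register-by-name flow described above can be sketched without runtimepy. Everything in this block is hypothetical: `StubFactory` and `StubArbiter` are illustrative names, the registry is assumed to be keyed by the factory's class name, and the `namespaces` and `defer` arguments are accepted or omitted only to keep the sketch short. The real arbiter may behave differently on each of these points.

```python
import asyncio
from typing import Any


class StubFactory:
    """Hypothetical factory; only the client() shape matters here."""

    async def client(self, name: str, *args: Any, **kwargs: Any) -> dict:
        return {"name": name, "args": args}


class StubArbiter:
    """Hypothetical name-keyed registry, not runtimepy's implementation."""

    def __init__(self) -> None:
        self.factories: dict[str, StubFactory] = {}
        self.connections: dict[str, dict] = {}

    def register_connection_factory(
        self, factory: StubFactory, *namespaces: str
    ) -> bool:
        # Assumption: the registry key is the factory's class name.
        # namespaces mirrors the documented signature but is unused here.
        key = type(factory).__name__
        if key in self.factories:
            return False
        self.factories[key] = factory
        return True

    async def factory_client(
        self, factory: str, name: str, *args: Any, **kwargs: Any
    ) -> bool:
        # Fail if the factory is unknown or the connection name is taken.
        if factory not in self.factories or name in self.connections:
            return False
        inst = self.factories[factory]
        self.connections[name] = await inst.client(name, *args, **kwargs)
        return True


async def run_registry_demo() -> list[bool]:
    arbiter = StubArbiter()
    return [
        arbiter.register_connection_factory(StubFactory()),
        arbiter.register_connection_factory(StubFactory()),  # duplicate key
        await arbiter.factory_client("StubFactory", "client_a", "localhost"),
        await arbiter.factory_client("Missing", "client_b"),
    ]
```

Each call returns a `bool`, matching the documented return types: registration and creation are attempts that report success rather than raise.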

runtimepy.net.arbiter.factory.task module

A module implementing task-factory registration.

class runtimepy.net.arbiter.factory.task.TaskConnectionArbiter(manager: ConnectionManager = None, stop_sig: Event = None, namespace: Namespace = None, logger: Logger | LoggerAdapter[Any] = None, app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, window: Any | None = None, metrics_poller_task: bool = True)

Bases: BaseConnectionArbiter

A class for managing task factories.

factory_task(factory: str, name: str, period_s: float = None, **kwargs) → bool

Register a periodic task from one of the registered task factories.

register_task_factory(factory: TaskFactory[ArbiterTask], *namespaces: str) → bool

Attempt to register a periodic task factory.
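
A rough, self-contained sketch of the same pattern for periodic tasks follows. `PeriodicTask`, `TickFactory`, and `TaskRegistry` are hypothetical names, and the handling of `period_s` (fall back to a factory default when it is `None`) is an assumption suggested by the `period_s: float = None` default, not documented behavior.

```python
import asyncio
from typing import Any, Callable, Optional


class PeriodicTask:
    """Hypothetical periodic task (stand-in for an ArbiterTask)."""

    def __init__(
        self, name: str, period_s: float, action: Callable[[], None]
    ) -> None:
        self.name = name
        self.period_s = period_s
        self.action = action

    async def run(self, iterations: int) -> None:
        # Call the action a fixed number of times, sleeping between calls.
        for _ in range(iterations):
            self.action()
            await asyncio.sleep(self.period_s)


class TickFactory:
    """Hypothetical task factory that supplies a default period."""

    default_period_s = 0.01

    def create(
        self, name: str, period_s: float, action: Callable[[], None]
    ) -> PeriodicTask:
        return PeriodicTask(name, period_s, action)


class TaskRegistry:
    """Hypothetical name-keyed task-factory registry."""

    def __init__(self) -> None:
        self.factories: dict[str, TickFactory] = {}
        self.tasks: dict[str, PeriodicTask] = {}

    def register_task_factory(self, factory: TickFactory, *namespaces: str) -> bool:
        # Assumption: keyed by class name; namespaces is unused in this sketch.
        key = type(factory).__name__
        if key in self.factories:
            return False
        self.factories[key] = factory
        return True

    def factory_task(
        self, factory: str, name: str, period_s: Optional[float] = None, **kwargs: Any
    ) -> bool:
        inst = self.factories.get(factory)
        if inst is None or name in self.tasks:
            return False
        # Assumption: an explicit period overrides the factory default.
        period = inst.default_period_s if period_s is None else period_s
        self.tasks[name] = inst.create(name, period, **kwargs)
        return True
```

A caller would register `TickFactory()` once, then create any number of named tasks from it, for example `registry.factory_task("TickFactory", "fast", period_s=0.001, action=callback)` where `callback` is any zero-argument function.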

Module contents

A module that extends the base connection arbiter so that methods that create connections can be registered by name.

class runtimepy.net.arbiter.factory.ConnectionFactory

Bases: object

An interface for creating client connections and connection-server tasks.

async client(name: str, *args, **kwargs) → Connection

Create a client connection.

async server_task(stop_sig: Event, manager: ConnectionManager, started_sem: Semaphore, *args, **kwargs) → Awaitable[None]

Create a task that will run a connection server.

class runtimepy.net.arbiter.factory.FactoryConnectionArbiter(manager: ConnectionManager = None, stop_sig: Event = None, namespace: Namespace = None, logger: Logger | LoggerAdapter[Any] = None, app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, window: Any | None = None, metrics_poller_task: bool = True)

Bases: BaseConnectionArbiter

A connection arbiter that lets registered connection factories create new connections, which can then be registered.

async factory_client(factory: str, name: str, *args, defer: bool = False, **kwargs) → bool

Attempt to register a client connection using a registered factory.

async factory_server(factory: str, *args, **kwargs) → bool

Attempt to create a server task using a registered factory.

register_connection_factory(factory: ConnectionFactory, *namespaces: str) → bool

Attempt to register a connection factory.