runtimepy.net.arbiter.config package#

Submodules#

runtimepy.net.arbiter.config.codec module#

A module implementing the configuration-object codec interface.

class runtimepy.net.arbiter.config.codec.ConnectionArbiterConfig(data: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, schemas: SchemaMap = None, dest_attr: str = 'data', verify: bool = True)[source]#

Bases: RuntimepyDictCodec

A class for encoding and decoding connection-arbiter configuration data.

asdict() dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]][source]#

Obtain a dictionary representing this instance.

directory: Path#
init(data: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]]) None[source]#

Perform implementation-specific initialization.

runtimepy.net.arbiter.config.util module#

A module implementing utilites for arbiter configuration data interfaces.

runtimepy.net.arbiter.config.util.fix_args(data: list[Any], ports: dict[str, int]) list[Any][source]#

Fix positional arguments.

runtimepy.net.arbiter.config.util.fix_kwargs(data: dict[str, Any]) dict[str, Any][source]#

Fix data depending on nuances of what some Python interfaces require.

runtimepy.net.arbiter.config.util.list_adder(dest: list[T], data: T, front: bool = True) None[source]#

Handle adding to either the front or back of a list.

Module contents#

A module implementing a configuration-file interface for registering client connections or servers.

class runtimepy.net.arbiter.config.ConfigConnectionArbiter(manager: ConnectionManager = None, stop_sig: Event = None, namespace: Namespace = None, logger: Logger | LoggerAdapter[Any] = None, app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, window: Any | None = None, metrics_poller_task: bool = True)[source]#

Bases: ImportConnectionArbiter

A class implementing a configuration loading interface for the connection arbiter.

classmethod add_search_package(name: str, front: bool = True) None[source]#

Add a package to the search path.

async load_configs(paths: Iterable[Path | str | None], wait_for_stop: bool = False) None[source]#

Load a client and server configuration to the arbiter.

async process_config(config: ConnectionArbiterConfig, wait_for_stop: bool = False) None[source]#

Register clients and servers from a configuration object.

search_packages: list[str] = ['runtimepy']#
runtimepy.net.arbiter.config.handle_config_builders(data: dict[str, Any], logger: LoggerMixin) None[source]#

Run any configured configuration-data building methods.