runtimepy.net.arbiter.config package#
Submodules#
runtimepy.net.arbiter.config.codec module#
A module implementing the configuration-object codec interface.
- class runtimepy.net.arbiter.config.codec.ConnectionArbiterConfig(data: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, schemas: SchemaMap = None, dest_attr: str = 'data', verify: bool = True)[source]#
Bases:
RuntimepyDictCodec
A class for encoding and decoding connection-arbiter configuration data.
- asdict() dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] [source]#
Obtain a dictionary representing this instance.
- directory: Path#
runtimepy.net.arbiter.config.util module#
A module implementing utilites for arbiter configuration data interfaces.
- runtimepy.net.arbiter.config.util.fix_args(data: list[Any], ports: dict[str, int]) list[Any] [source]#
Fix positional arguments.
Module contents#
A module implementing a configuration-file interface for registering client connections or servers.
- class runtimepy.net.arbiter.config.ConfigConnectionArbiter(manager: ConnectionManager = None, stop_sig: Event = None, namespace: Namespace = None, logger: Logger | LoggerAdapter[Any] = None, app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, window: Any | None = None, metrics_poller_task: bool = True)[source]#
Bases:
ImportConnectionArbiter
A class implementing a configuration loading interface for the connection arbiter.
- classmethod add_search_package(name: str, front: bool = True) None [source]#
Add a package to the search path.
- async load_configs(paths: Iterable[Path | str | None], wait_for_stop: bool = False) None [source]#
Load a client and server configuration to the arbiter.
- async process_config(config: ConnectionArbiterConfig, wait_for_stop: bool = False) None [source]#
Register clients and servers from a configuration object.
- search_packages: list[str] = ['runtimepy']#