runtimepy.net.arbiter package

Contents

runtimepy.net.arbiter package#

Subpackages#

Submodules#

runtimepy.net.arbiter.base module#

A module implementing a base connection-arbiter interface.

class runtimepy.net.arbiter.base.BaseConnectionArbiter(manager: ConnectionManager = None, stop_sig: Event = None, namespace: Namespace = None, logger: Logger | LoggerAdapter[Any] = None, app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, window: Any | None = None, metrics_poller_task: bool = True)[source]#

Bases: NamespaceMixin, LoggerMixin, TuiMixin

A class implementing a base connection-manager for a broader application.

async app(app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, check_connections: bool = True, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None) int[source]#

Run the application alongside the connection manager and server tasks.

register_connection(connection: Connection | Awaitable[Connection], *names: str, delim: str = None) bool[source]#

Attempt to register a connection object.

run(app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, eloop: AbstractEventLoop = None, signals: Iterable[int] = None, check_connections: bool = True, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, enable_uvloop: bool = True) int[source]#

Run the application until the stop signal is set.

async runtimepy.net.arbiter.base.init_only(app: AppInfo) int[source]#

A network application that doesn’t do anything.

runtimepy.net.arbiter.base.normalize_app(app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None) list[list[Callable[[AppInfo], Awaitable[int]]]][source]#

Normalize some application parameter into a list of network applications.

runtimepy.net.arbiter.info module#

A module implementing an application information interface.

class runtimepy.net.arbiter.info.AppInfo(logger: Logger | LoggerAdapter[Any], stack: AsyncExitStack, connections: MutableMapping[str, Connection], conn_manager: ConnectionManager, names: Namespace, stop: Event, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], ports: dict[str, int], tui: TuiMixin, tasks: dict[str, PeriodicTask], task_manager: PeriodicTaskManager[Any], results: list[list[AppResult]], structs: dict[str, RuntimeStructBase], peers: dict[str, _RuntimepyPeer])[source]#

Bases: LoggerMixin

References provided to network applications.

async all_finalized() None[source]#

Wait for all tasks and connections to be finalized.

config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]]#
config_param(key: str, default: Z, strict: bool = False) Z[source]#

Attempt to get a configuration parameter.

conn_manager: ConnectionManager#
connections: MutableMapping[str, Connection]#
exceptions() Iterator[Exception][source]#

Iterate over exceptions raised by the application.

logger: Logger | LoggerAdapter[Any]#
names: Namespace#
original_config() dict[str, Any][source]#

Re-assemble a dictionary that is closer to the original configuration data than the .config attribute is.

peers: dict[str, _RuntimepyPeer]#
ports: dict[str, int]#
property raised_exception: bool#

Determine if the application raised any exception.

result(logger: Logger | LoggerAdapter[Any] = None) bool[source]#

Get the overall boolean result for the application.

results: list[list[AppResult]]#
search(*names: str, pattern: str = '.*', kind: type[~runtimepy.net.arbiter.info.T] = <class 'runtimepy.net.connection.Connection'>) Iterator[T][source]#

Get all connections that match a naming convention, are of a specific kind, or both.

search_structs(kind: type[W], pattern: str = '.*') Iterator[W][source]#

Search for structs by type or name.

search_tasks(kind: type[V], pattern: str = '.*') Iterator[V][source]#

Search for tasks by type or pattern.

single(*names: str, pattern: str = '.*', kind: type[~runtimepy.net.arbiter.info.T] = <class 'runtimepy.net.connection.Connection'>) T[source]#

Search for a single node.

stack: AsyncExitStack#
stop: Event#
structs: dict[str, RuntimeStructBase]#
task_manager: PeriodicTaskManager[Any]#
tasks: dict[str, PeriodicTask]#
tui: TuiMixin#
with_new_logger(name: str) AppInfo[source]#

Get a copy of this AppInfo instance, but with a new logger.

class runtimepy.net.arbiter.info.RuntimeStruct(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)[source]#

Bases: RuntimeStructBase, ABC

A class implementing a base runtime structure.

app: AppInfo#
array: PrimitiveArray#
async build(app: AppInfo, **kwargs) None[source]#

Build a struct instance’s channel environment.

byte_order: ByteOrder = 4#
final_poll = False#
init_env() None[source]#

Initialize this sample environment.

update_byte_order(byte_order: ByteOrder, **kwargs) None[source]#

Update the over-the-wire byte order for this struct.

class runtimepy.net.arbiter.info.SampleStruct(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)[source]#

Bases: TrigStruct

A sample runtime structure.

final_poll = True#
init_env() None[source]#

Initialize this sample environment.

poll() None[source]#

A method that other runtime entities can call to perform canonical updates to this struct’s environment.

class runtimepy.net.arbiter.info.TrigStruct(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)[source]#

Bases: RuntimeStruct, TrigMixin

A simple trig struct.

init_env() None[source]#

Initialize this sample environment.

iterations: Uint32Primitive#
poll() None[source]#

A method that other runtime entities can call to perform canonical updates to this struct’s environment.

runtimepy.net.arbiter.result module#

A module implementing a simple, application-result interface.

class runtimepy.net.arbiter.result.AppResult(method: str, state: ResultState = ResultState.NOT_RUN, code: int | None = None, exception: Exception | None = None, duration_ns: int | None = None)[source]#

Bases: NamedTuple

A container for application-result information.

code: int | None#

Alias for field number 2

duration_ns: int | None#

Alias for field number 4

exception: Exception | None#

Alias for field number 3

log(overall_idx: int, stage_idx: int, logger: Logger | LoggerAdapter[Any]) None[source]#

Log information about this result.

method: str#

Alias for field number 0

state: ResultState#

Alias for field number 1

class runtimepy.net.arbiter.result.ResultState(*values)[source]#

Bases: StrEnum

Possible outcomes of an application method.

EXCEPTION = 'exception'#
FAIL = 'fail'#
NOT_RUN = 'not run'#
PASS = 'pass'#
static from_int(data: int) ResultState[source]#

Create result state from an integer.

property log_level: int#

Get a log level for this result state.

runtimepy.net.arbiter.result.log_stage(overall_idx: int, result: list[AppResult], logger: Logger | LoggerAdapter[Any] = None) bool[source]#

Log the results of a stage.

runtimepy.net.arbiter.result.results(result: list[list[AppResult]], logger: Logger | LoggerAdapter[Any] = None) bool[source]#

Log overall results.

runtimepy.net.arbiter.task module#

A module implementing a base class for arbiter periodic tasks.

class runtimepy.net.arbiter.task.ArbiterTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#

Bases: PeriodicTask

A base class for arbiter periodic tasks.

app: AppInfo#
auto_finalize = False#
async init(app: AppInfo) None[source]#

Initialize this task with application information.

class runtimepy.net.arbiter.task.ArbiterTaskManager[source]#

Bases: PeriodicTaskManager[ArbiterTask]

A task-manager class for the connection arbiter.

class runtimepy.net.arbiter.task.TaskFactory[source]#

Bases: Generic[T]

A task-factory base class.

kind: type[T]#

runtimepy.net.arbiter.udp module#

A module implementing a basic UDP connection factory that can be extended.

class runtimepy.net.arbiter.udp.UdpConnectionFactory[source]#

Bases: ConnectionFactory, Generic[T]

A class implementing a basic UDP connection factory.

async client(name: str, *args, **kwargs) Connection[source]#

Create a client connection.

kind: type[T]#

runtimepy.net.arbiter.websocket module#

A module implementing a basic WebSocket connection factory that can be extended.

class runtimepy.net.arbiter.websocket.WebsocketConnectionFactory[source]#

Bases: ConnectionFactory, Generic[T]

A class implementing a basic WebSocket connection factory.

async client(name: str, *args, **kwargs) Connection[source]#

Create a client connection.

kind: type[T]#
async server_task(stop_sig: Event, manager: ConnectionManager, started_sem: Semaphore, *args, **kwargs) Awaitable[None][source]#

Create a task that will run a connection server.

Module contents#

A module implementing a connection arbiter interface.

class runtimepy.net.arbiter.AppInfo(logger: Logger | LoggerAdapter[Any], stack: AsyncExitStack, connections: MutableMapping[str, Connection], conn_manager: ConnectionManager, names: Namespace, stop: Event, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], ports: dict[str, int], tui: TuiMixin, tasks: dict[str, PeriodicTask], task_manager: PeriodicTaskManager[Any], results: list[list[AppResult]], structs: dict[str, RuntimeStructBase], peers: dict[str, _RuntimepyPeer])[source]#

Bases: LoggerMixin

References provided to network applications.

async all_finalized() None[source]#

Wait for all tasks and connections to be finalized.

config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]]#
config_param(key: str, default: Z, strict: bool = False) Z[source]#

Attempt to get a configuration parameter.

conn_manager: ConnectionManager#
connections: MutableMapping[str, Connection]#
exceptions() Iterator[Exception][source]#

Iterate over exceptions raised by the application.

logger: Logger | LoggerAdapter[Any]#
names: Namespace#
original_config() dict[str, Any][source]#

Re-assemble a dictionary that is closer to the original configuration data than the .config attribute is.

peers: dict[str, _RuntimepyPeer]#
ports: dict[str, int]#
property raised_exception: bool#

Determine if the application raised any exception.

result(logger: Logger | LoggerAdapter[Any] = None) bool[source]#

Get the overall boolean result for the application.

results: list[list[AppResult]]#
search(*names: str, pattern: str = '.*', kind: type[~runtimepy.net.arbiter.info.T] = <class 'runtimepy.net.connection.Connection'>) Iterator[T][source]#

Get all connections that match a naming convention, are of a specific kind, or both.

search_structs(kind: type[W], pattern: str = '.*') Iterator[W][source]#

Search for structs by type or name.

search_tasks(kind: type[V], pattern: str = '.*') Iterator[V][source]#

Search for tasks by type or pattern.

single(*names: str, pattern: str = '.*', kind: type[~runtimepy.net.arbiter.info.T] = <class 'runtimepy.net.connection.Connection'>) T[source]#

Search for a single node.

stack: AsyncExitStack#
stop: Event#
structs: dict[str, RuntimeStructBase]#
task_manager: PeriodicTaskManager[Any]#
tasks: dict[str, PeriodicTask]#
tui: TuiMixin#
with_new_logger(name: str) AppInfo[source]#

Get a copy of this AppInfo instance, but with a new logger.

class runtimepy.net.arbiter.ArbiterTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#

Bases: PeriodicTask

A base class for arbiter periodic tasks.

app: AppInfo#
auto_finalize = False#
config: _JsonObject#
env: ChannelEnvironment#
async init(app: AppInfo) None[source]#

Initialize this task with application information.

logger: LoggerType#
markdown: str#
class runtimepy.net.arbiter.ArbiterTaskManager[source]#

Bases: PeriodicTaskManager[ArbiterTask]

A task-manager class for the connection arbiter.

class runtimepy.net.arbiter.ConnectionArbiter(manager: ConnectionManager = None, stop_sig: Event = None, namespace: Namespace = None, logger: Logger | LoggerAdapter[Any] = None, app: Callable[[AppInfo], Awaitable[int]] | list[Callable[[AppInfo], Awaitable[int]]] = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, window: Any | None = None, metrics_poller_task: bool = True)[source]#

Bases: ConfigConnectionArbiter

A class implementing a connection manager for a broader application.

class runtimepy.net.arbiter.TaskFactory[source]#

Bases: Generic[T]

A task-factory base class.

kind: type[T]#
async runtimepy.net.arbiter.init_only(app: AppInfo) int[source]#

A network application that doesn’t do anything.