runtimepy.net.server.websocket package

Submodules

runtimepy.net.server.websocket.state module

A module implementing a stateful data interface for per-connection tabs.

class runtimepy.net.server.websocket.state.TabState(shown: bool, tab_logger: ListLogger, points: dict[str, list[tuple[str | int | float | bool, int]]], primitives: dict[str, Int8Primitive | Int16Primitive | Int32Primitive | Int64Primitive | Uint8Primitive | Uint16Primitive | Uint32Primitive | Uint64Primitive | HalfPrimitive | FloatPrimitive | DoublePrimitive | BooleanPrimitive], callbacks: dict[str, int], latest_ui_values: dict[str | int, bool | int | float | str], _loggers: list[Logger])

Bases: object

Stateful information relevant to individual tabs.

add_logger(logger: Logger) -> None

Add a logger.

callbacks: dict[str, int]
clear_loggers() -> None

Clear all logging handlers.

clear_telemetry() -> None

Clear all telemetry interactions.

static create() -> TabState

Create a new instance.

frame(time: float) -> dict[str, Any]

Handle a new UI frame.

latest_ui_values: dict[str | int, bool | int | float | str]
points: dict[str, list[tuple[str | int | float | bool, int]]]
primitives: dict[str, Int8Primitive | Int16Primitive | Int32Primitive | Int64Primitive | Uint8Primitive | Uint16Primitive | Uint32Primitive | Uint64Primitive | HalfPrimitive | FloatPrimitive | DoublePrimitive | BooleanPrimitive]
shown: bool
tab_logger: ListLogger
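
The field types above suggest how per-tab state might be buffered and drained once per UI frame. The following is a minimal sketch only, not the package's implementation: `SketchTabState`, its `point` helper, and the shape of the returned frame message are hypothetical, and treating the integer in each `points` tuple as a timestamp is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Union

Value = Union[str, int, float, bool]


@dataclass
class SketchTabState:
    """Hypothetical, simplified stand-in for TabState (not the real class)."""

    shown: bool = False
    # Channel name -> list of (value, integer) samples. Treating the integer
    # as a timestamp is an assumption made for this sketch.
    points: dict[str, list[tuple[Value, int]]] = field(default_factory=dict)
    latest_ui_values: dict[Union[str, int], Value] = field(
        default_factory=dict
    )

    @staticmethod
    def create() -> "SketchTabState":
        """Create a new instance with empty containers."""
        return SketchTabState()

    def point(self, name: str, value: Value, timestamp: int) -> None:
        """Buffer one sample (hypothetical helper, not in the real API)."""
        self.points.setdefault(name, []).append((value, timestamp))

    def clear_telemetry(self) -> None:
        """Drop all buffered samples."""
        self.points.clear()

    def frame(self, time: float) -> dict[str, Any]:
        """Drain buffered points into a message while the tab is shown."""
        result: dict[str, Any] = {}
        if self.shown and self.points:
            result["time"] = time
            result["points"] = dict(self.points)
            self.points.clear()
        return result
```

In this sketch a hidden tab keeps accumulating samples and returns an empty frame; whether the real class buffers or discards data for hidden tabs is not stated in this reference.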

Module contents

A module implementing a simple WebSocket server for the package.

class runtimepy.net.server.websocket.RuntimepyDataWebsocketConnection(protocol: ClientConnection | ServerConnection, **kwargs)

Bases: WebsocketConnection

A class implementing a WebSocket connection for streaming raw data.

class runtimepy.net.server.websocket.RuntimepyWebsocketConnection(protocol: ClientConnection | ServerConnection, **kwargs)

Bases: WebsocketJsonMessageConnection

A class implementing a package-specific WebSocket connection.

async async_init() -> bool

A runtime initialization routine (executes during ‘process’).

command: ChannelCommandProcessor
disable_extra() -> None

Additional tasks to perform when disabling.

env: ChannelEnvironment
id_responses: dict[int, JsonMessage]
ids_waiting: dict[int, asyncio.Event]
init() -> None

Initialize this instance.

list_handler: ListLogger
logger: LoggerType
markdown: str
outgoing_commands: asyncio.Queue[ChannelCommandParams]
poll_connection_metrics: bool
poll_governor: RateLimiter
processor: MessageProcessor
remote_environments: dict[str, ChannelEnvironment]
remote_meta: JsonMessage | None
send_interfaces: dict[str, Callable[[dict[str, Any]], None]]
tab_sender(name: str) -> Callable[[dict[str, Any]], None]

Get a tab message-sending interface.
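
A per-tab sender of this type can be pictured as a closure that wraps each payload with the tab name before handing it to the connection. The sketch below assumes that pattern. `SketchConnection`, its `send_json` method, its `sent` list, and the `{"ui": {name: data}}` envelope are hypothetical and are not confirmed by this reference.

```python
from typing import Any, Callable

TabMessage = dict[str, Any]


class SketchConnection:
    """Hypothetical stand-in; a real connection would write to a socket."""

    def __init__(self) -> None:
        # Records outgoing messages instead of sending them anywhere.
        self.sent: list[dict[str, Any]] = []
        self.send_interfaces: dict[str, Callable[[TabMessage], None]] = {}

    def send_json(self, data: dict[str, Any]) -> None:
        """Pretend to send a JSON message (hypothetical transport)."""
        self.sent.append(data)

    def tab_sender(self, name: str) -> Callable[[TabMessage], None]:
        """Get (and cache) a sender bound to one tab name."""
        if name not in self.send_interfaces:

            def sender(data: TabMessage) -> None:
                # The envelope layout is an assumption for illustration.
                self.send_json({"ui": {name: data}})

            self.send_interfaces[name] = sender
        return self.send_interfaces[name]
```

Caching the closure in `send_interfaces` means repeated calls with the same tab name return the same callable, so callers can hold on to a sender without rebuilding it.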

tabs: dict[str, TabState]
ui_time: float
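
The `ids_waiting` and `id_responses` attribute types are consistent with a common request/response correlation pattern: a sender registers an `asyncio.Event` under a message identifier, and the receive path stores the reply and sets the event. The following sketch illustrates that general pattern under that assumption. `SketchCorrelator` and all of its methods are hypothetical, and nothing here is taken from the package's source.

```python
import asyncio
from typing import Any

JsonMessage = dict[str, Any]


class SketchCorrelator:
    """Hypothetical helper pairing replies with pending request IDs."""

    def __init__(self) -> None:
        self.ids_waiting: dict[int, asyncio.Event] = {}
        self.id_responses: dict[int, JsonMessage] = {}
        self._next_id = 1

    def register(self) -> int:
        """Allocate an identifier and an event to wait on."""
        ident = self._next_id
        self._next_id += 1
        self.ids_waiting[ident] = asyncio.Event()
        return ident

    def handle_response(self, ident: int, message: JsonMessage) -> None:
        """Store a reply and wake the waiter, if one is registered."""
        event = self.ids_waiting.get(ident)
        if event is not None:
            self.id_responses[ident] = message
            event.set()

    async def wait_response(
        self, ident: int, timeout: float = 1.0
    ) -> JsonMessage:
        """Wait for the reply to 'ident', cleaning up either way."""
        event = self.ids_waiting[ident]
        try:
            await asyncio.wait_for(event.wait(), timeout)
            return self.id_responses.pop(ident)
        finally:
            del self.ids_waiting[ident]


async def demo() -> JsonMessage:
    """Simulate a reply arriving shortly after the request is registered."""
    corr = SketchCorrelator()
    ident = corr.register()
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, corr.handle_response, ident, {"ok": True})
    return await corr.wait_response(ident)
```

Running `asyncio.run(demo())` returns the simulated reply. A reply whose identifier has no registered waiter is ignored in this sketch.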