runtimepy.net.server.websocket package#
Submodules#
runtimepy.net.server.websocket.state module#
A module implementing a stateful data interface for per-connection tabs.
- class runtimepy.net.server.websocket.state.TabState(shown: bool, tab_logger: ListLogger, points: dict[str, list[tuple[str | int | float | bool, int]]], primitives: dict[str, Int8Primitive | Int16Primitive | Int32Primitive | Int64Primitive | Uint8Primitive | Uint16Primitive | Uint32Primitive | Uint64Primitive | HalfPrimitive | FloatPrimitive | DoublePrimitive | BooleanPrimitive], callbacks: dict[str, int], latest_ui_values: dict[str | int, bool | int | float | str], _loggers: list[Logger])[source]#
Bases:
object
Stateful information relevant to individual tabs.
- callbacks: dict[str, int]#
- latest_ui_values: dict[str | int, bool | int | float | str]#
- points: dict[str, list[tuple[str | int | float | bool, int]]]#
- primitives: dict[str, Int8Primitive | Int16Primitive | Int32Primitive | Int64Primitive | Uint8Primitive | Uint16Primitive | Uint32Primitive | Uint64Primitive | HalfPrimitive | FloatPrimitive | DoublePrimitive | BooleanPrimitive]#
- shown: bool#
- tab_logger: ListLogger#
Module contents#
A module implementing a simple WebSocket server for the package.
- class runtimepy.net.server.websocket.RuntimepyDataWebsocketConnection(protocol: ClientConnection | ServerConnection, **kwargs)[source]#
Bases:
WebsocketConnection
A class implementing a WebSocket connection for streaming raw data.
- class runtimepy.net.server.websocket.RuntimepyWebsocketConnection(protocol: ClientConnection | ServerConnection, **kwargs)[source]#
Bases:
WebsocketJsonMessageConnection
A class implementing a package-specific WebSocket connection.
- command: ChannelCommandProcessor#
- env: ChannelEnvironment#
- id_responses: dict[int, JsonMessage]#
- ids_waiting: dict[int, asyncio.Event]#
- list_handler: ListLogger#
- logger: LoggerType#
- markdown: str#
- outgoing_commands: asyncio.Queue[ChannelCommandParams]#
- poll_connection_metrics: bool#
- poll_governor: RateLimiter#
- processor: MessageProcessor#
- remote_environments: dict[str, ChannelEnvironment]#
- remote_meta: JsonMessage | None#
- send_interfaces: dict[str, Callable[[dict[str, Any]], None]]#
- tab_sender(name: str) Callable[[dict[str, Any]], None] [source]#
Get a tab message-sending interface.
- ui_time: float#