runtimepy.net.websocket package#

Submodules#

runtimepy.net.websocket.connection module#

A module implementing a WebSocket connection interface.

class runtimepy.net.websocket.connection.EchoWebsocketConnection(protocol: ClientConnection | ServerConnection, **kwargs)[source]#

Bases: WebsocketConnection, EchoConnection

An echo connection for WebSocket.

class runtimepy.net.websocket.connection.NullWebsocketConnection(protocol: ClientConnection | ServerConnection, **kwargs)[source]#

Bases: WebsocketConnection, NullConnection

A null WebSocket connection.

class runtimepy.net.websocket.connection.WebsocketConnection(protocol: ClientConnection | ServerConnection, **kwargs)[source]#

Bases: Connection

A simple websocket connection interface.

async classmethod app(stop_sig: Event, init: Callable[[T], Awaitable[bool]] = None, manager: ConnectionManager = None, serving_callback: Callable[[Server], None] = None, **kwargs) None[source]#

Run a WebSocket-server application.

classmethod client(uri: str, markdown: str = None, **kwargs) AsyncIterator[T][source]#

A wrapper for connecting a client.

async close() None[source]#

Close this connection.

async classmethod create_connection(uri: str, markdown: str = None, **kwargs) T[source]#

Connect a client to an endpoint.

classmethod create_pair(serve_kwargs: dict[str, Any] = None) AsyncIterator[tuple[T, T]][source]#

Obtain a connected pair of WebsocketConnection objects.

classmethod serve(init: Callable[[T], Awaitable[bool]] = None, stop_sig: Event = None, manager: ConnectionManager = None, **kwargs) AsyncIterator[Server][source]#

Serve a WebSocket server.

classmethod server_handler(init: Callable[[T], Awaitable[bool]] = None, stop_sig: Event = None, manager: ConnectionManager = None) Callable[[ServerConnection], Awaitable[None]][source]#

A wrapper for passing in a websocket handler and initializing a connection.

Module contents#

A module aggregating all WebSocket-related interfaces.

class runtimepy.net.websocket.EchoWebsocketConnection(protocol: ClientConnection | ServerConnection, **kwargs)[source]#

Bases: WebsocketConnection, EchoConnection

An echo connection for WebSocket.

class runtimepy.net.websocket.NullWebsocketConnection(protocol: ClientConnection | ServerConnection, **kwargs)[source]#

Bases: WebsocketConnection, NullConnection

A null WebSocket connection.

class runtimepy.net.websocket.WebsocketConnection(protocol: ClientConnection | ServerConnection, **kwargs)[source]#

Bases: Connection

A simple websocket connection interface.

async classmethod app(stop_sig: Event, init: Callable[[T], Awaitable[bool]] = None, manager: ConnectionManager = None, serving_callback: Callable[[Server], None] = None, **kwargs) None[source]#

Run a WebSocket-server application.

classmethod client(uri: str, markdown: str = None, **kwargs) AsyncIterator[T][source]#

A wrapper for connecting a client.

async close() None[source]#

Close this connection.

async classmethod create_connection(uri: str, markdown: str = None, **kwargs) T[source]#

Connect a client to an endpoint.

classmethod create_pair(serve_kwargs: dict[str, Any] = None) AsyncIterator[tuple[T, T]][source]#

Obtain a connected pair of WebsocketConnection objects.

classmethod serve(init: Callable[[T], Awaitable[bool]] = None, stop_sig: Event = None, manager: ConnectionManager = None, **kwargs) AsyncIterator[Server][source]#

Serve a WebSocket server.

classmethod server_handler(init: Callable[[T], Awaitable[bool]] = None, stop_sig: Event = None, manager: ConnectionManager = None) Callable[[ServerConnection], Awaitable[None]][source]#

A wrapper for passing in a websocket handler and initializing a connection.