runtimepy.message package#

Submodules#

runtimepy.message.handlers module#

A module defining some useful JSON message handlers.

class runtimepy.message.handlers.ChannelCommand(data: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, schemas: SchemaMap = None, dest_attr: str = 'data', verify: bool = True)#

Bases: RuntimepyDictCodec, BasicDictCodec

A schema-validated channel command.

data: dict[str, Any]#

class runtimepy.message.handlers.FindFile(data: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, schemas: SchemaMap = None, dest_attr: str = 'data', verify: bool = True)#

Bases: RuntimepyDictCodec, BasicDictCodec

A schema-validated find-file request.

data: dict[str, Any]#

runtimepy.message.handlers.channel_env_handler(envs: dict[str, ChannelCommandProcessor], default: ChannelCommandProcessor) -> Callable[[dict[str, Any], ChannelCommand], Awaitable[None]]#

Create a channel-environment map command handler.

async runtimepy.message.handlers.event_wait(event: Event, timeout: float) -> bool#

Wait for an event to be set within a timeout.
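A helper with this signature can be approximated with `asyncio.wait_for`. The sketch below is not the library's implementation; it assumes `Event` is `asyncio.Event`, and `event_wait_sketch` and `demo` are hypothetical names used only for illustration.

```python
import asyncio


async def event_wait_sketch(event: asyncio.Event, timeout: float) -> bool:
    """Return True if the event is set before the timeout elapses."""
    try:
        await asyncio.wait_for(event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        # The timeout elapsed before anything set the event.
        return False


async def demo() -> tuple[bool, bool]:
    event = asyncio.Event()
    # Nothing sets the event here, so this wait times out.
    missed = await event_wait_sketch(event, 0.01)
    event.set()
    # The event is already set, so this returns without waiting.
    hit = await event_wait_sketch(event, 0.01)
    return missed, hit


RESULT = asyncio.run(demo())
```

Returning a boolean instead of raising lets callers branch on the outcome without their own `try`/`except`.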

async runtimepy.message.handlers.find_file_request_handler(outbox: dict[str, Any], request: FindFile) -> None#

Attempt to find a file path based on the request.

async runtimepy.message.handlers.loopback_handler(outbox: dict[str, Any], inbox: dict[str, Any]) -> None#

A simple loopback handler.
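The handlers in this module share one shape: an async callable that receives an `outbox` to fill with the response and the inbound payload. As a rough illustration, a loopback handler of that shape might simply copy the inbound data into the response. The name `loopback_sketch` is hypothetical, and the copy-via-`update` behavior is an assumption, not something confirmed by the documentation above.

```python
import asyncio
from typing import Any


async def loopback_sketch(outbox: dict[str, Any], inbox: dict[str, Any]) -> None:
    """Echo the inbound payload by copying it into the outbox (assumed behavior)."""
    outbox.update(inbox)


# The caller owns the outbox; the handler only fills it in.
outbox: dict[str, Any] = {}
asyncio.run(loopback_sketch(outbox, {"a": 1}))
```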

runtimepy.message.interface module#

A module implementing a JSON messaging interface.

class runtimepy.message.interface.JsonMessageInterface#

Bases: object

A JSON messaging interface class.

basic_handler(key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[None]] = loopback_handler) -> None#

Register a basic handler.

async channel_command(command: str, environment: str = 'default', addr: tuple[str, int] = None) -> CommandResult#

Send a channel command to an endpoint.

command: ChannelCommandProcessor#

handle_log_message(message: dict[str, Any]) -> None#

Handle a log message.

list_handler: ListLogger#

logger: Logger | LoggerAdapter[Any]#

async loopback(data: dict[str, Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool#

Perform a simple loopback test on this connection.

async poll_handler() -> None#

Poll this instance.

async process_json(data: dict[str, Any], addr: tuple[str, int] = None) -> bool#

Process a JSON message.

processor: MessageProcessor#

remote_environments: dict[str, ChannelEnvironment]#

send_json(data: dict[str, Any] | JsonCodec, addr: tuple[str, int] = None) -> None#

Send a JSON message.

send_poll(loopback: int = 1) -> None#

Send a poll message with a default loopback of 1, so that this instance will also be polled.

stage_remote_log(msg: str, *args, level: int = 20) -> None#

Log a message on the remote.

typed_handler(key: str, kind: type[T], handler: Callable[[dict[str, Any], T], Awaitable[None]]) -> None#

Register a typed handler.
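The signatures of `basic_handler` and `typed_handler` suggest key-based dispatch: a basic handler receives the raw payload, while a typed handler receives an instance of `kind`. The sketch below illustrates that idea only. `RegistrySketch`, `dispatch`, `Greeting`, and `greet` are hypothetical, and constructing the typed object with `kind(**inbox)` is an assumption rather than the library's documented behavior.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")
Handler = Callable[[dict[str, Any], dict[str, Any]], Awaitable[None]]


class RegistrySketch:
    """Hypothetical key-based dispatcher; not the library's implementation."""

    def __init__(self) -> None:
        self.handlers: dict[str, Handler] = {}

    def basic_handler(self, key: str, handler: Handler) -> None:
        """Register a handler that receives the raw payload."""
        self.handlers[key] = handler

    def typed_handler(
        self,
        key: str,
        kind: type[T],
        handler: Callable[[dict[str, Any], T], Awaitable[None]],
    ) -> None:
        """Register a handler that receives an instance of 'kind'."""

        async def wrapper(outbox: dict[str, Any], inbox: dict[str, Any]) -> None:
            # Assumption: the typed object is built from the raw payload.
            await handler(outbox, kind(**inbox))

        self.handlers[key] = wrapper

    async def dispatch(self, message: dict[str, dict[str, Any]]) -> dict[str, Any]:
        """Run the handler for each known key and collect the responses."""
        response: dict[str, Any] = {}
        for key, payload in message.items():
            if key in self.handlers:
                outbox: dict[str, Any] = {}
                await self.handlers[key](outbox, payload)
                response[key] = outbox
        return response


@dataclass
class Greeting:
    name: str


async def greet(outbox: dict[str, Any], request: Greeting) -> None:
    outbox["text"] = f"hello, {request.name}"


registry = RegistrySketch()
registry.typed_handler("greet", Greeting, greet)
REPLY = asyncio.run(registry.dispatch({"greet": {"name": "world"}}))
```

Wrapping the typed handler in a basic one keeps a single handler table, so dispatch does not need to know which kind of handler it is calling.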

async wait_json(data: dict[str, Any] | JsonCodec = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, Any]#

Send a JSON message and wait for a response.

write(data: bytes, addr: tuple[str, int] = None) -> None#

Write data.

runtimepy.message.types module#

A module containing useful type definitions for JSON messaging.

Module contents#

A module implementing a message-stream processing interface.

class runtimepy.message.MessageProcessor(byte_order: ByteOrder = ByteOrder.NETWORK)#

Bases: object

A class for parsing size-delimited messages.

encode(stream: BytesIO, data: bytes | str) -> None#

Encode a message to a stream.

encode_json(stream: BytesIO, data: dict[str, Any]) -> None#

Encode a message as JSON.

message_length_kind#

alias of Uint32Primitive

messages(data: bytes) -> Iterator[dict[str, Any]]#

Iterate over incoming messages.

process(data: bytes) -> Iterator[bytes]#

Process an incoming message.
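Size-delimited framing of this kind can be illustrated with the standard library alone. The sketch below assumes each message is prefixed with a 4-byte unsigned length in network (big-endian) byte order, matching the `Uint32Primitive` alias and the `ByteOrder.NETWORK` default above. `FramingSketch` is a hypothetical stand-in, not the library's `MessageProcessor`, and its buffering strategy is an assumption.

```python
import json
import struct
from io import BytesIO
from typing import Any, Iterator

# Assumption: a 4-byte unsigned length in network (big-endian) byte order.
LENGTH = struct.Struct("!I")


class FramingSketch:
    """Hypothetical length-prefixed framer; not the library's class."""

    def __init__(self) -> None:
        self.buffer = b""

    def encode(self, stream: BytesIO, data: bytes) -> None:
        """Write a length prefix followed by the payload."""
        stream.write(LENGTH.pack(len(data)))
        stream.write(data)

    def process(self, data: bytes) -> Iterator[bytes]:
        """Yield each complete payload, keeping any partial tail buffered."""
        self.buffer += data
        while len(self.buffer) >= LENGTH.size:
            (size,) = LENGTH.unpack_from(self.buffer)
            end = LENGTH.size + size
            if len(self.buffer) < end:
                break  # The rest of this message has not arrived yet.
            yield self.buffer[LENGTH.size:end]
            self.buffer = self.buffer[end:]

    def messages(self, data: bytes) -> Iterator[dict[str, Any]]:
        """Decode each complete payload as JSON."""
        for payload in self.process(data):
            yield json.loads(payload)


stream = BytesIO()
framer = FramingSketch()
framer.encode(stream, json.dumps({"a": 1}).encode())
framer.encode(stream, json.dumps({"b": 2}).encode())
raw = stream.getvalue()

# Feed the bytes in two arbitrary chunks to mimic a fragmented stream.
decoded = list(framer.messages(raw[:5])) + list(framer.messages(raw[5:]))
```

The first chunk contains a length prefix and only one payload byte, so nothing is yielded until the second chunk completes both messages.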

class runtimepy.message.StrFallbackJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)#

Bases: JSONEncoder

Custom JSON encoder.

default(o)#

Use a string conversion if necessary.
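The standard `json.JSONEncoder` calls `default` only for objects it cannot serialize natively, so a string fallback can be sketched in a few lines. `StrFallbackSketch` is a hypothetical name, and falling back to `str(o)` unconditionally is an assumption about what "if necessary" means here.

```python
import json
from pathlib import PurePosixPath
from typing import Any


class StrFallbackSketch(json.JSONEncoder):
    """Hypothetical encoder that stringifies otherwise unsupported objects."""

    def default(self, o: Any) -> Any:
        # Only reached for objects the base encoder cannot handle.
        return str(o)


# A path object is not JSON-serializable by default, so it is stringified.
ENCODED = json.dumps({"path": PurePosixPath("a/b"), "n": 1}, cls=StrFallbackSketch)
```

Natively supported values such as the integer above never reach `default` and are encoded as usual.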