runtimepy.net.stream.json package

Module contents

A module implementing a JSON message connection interface.

class runtimepy.net.stream.json.JsonMessageConnection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True, markdown: str = None)

Bases: StringMessageConnection, AsyncCommandProcessingMixin, JsonMessageInterface

A connection interface for JSON messaging.

async async_init() → bool

A runtime initialization routine (executes during ‘process’).

async handle_command(args: Namespace, channel: BitField | Channel[Int8Primitive] | Channel[Int16Primitive] | Channel[Int32Primitive] | Channel[Int64Primitive] | Channel[Uint8Primitive] | Channel[Uint16Primitive] | Channel[Uint32Primitive] | Channel[Uint64Primitive] | Channel[FloatPrimitive] | Channel[DoublePrimitive] | Channel[BooleanPrimitive] | None) → None

Handle a remote command asynchronously.

init() → None

Initialize this instance.

async process_message(data: str, addr: tuple[str, int] = None) → bool

Process a string message.
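
The signature above takes a string plus an optional address and returns a bool. As a rough illustration of that shape only, here is a standalone sketch built on the standard library. The function `process_json_text`, its `handlers` argument, and the key-to-handler dispatch rule are all hypothetical and are not taken from runtimepy, whose actual message handling may differ.

```python
import json
from typing import Any, Callable, Optional

# Hypothetical handler type: receives the payload stored under a message key.
Handler = Callable[[Any], None]


def process_json_text(
    data: str,
    handlers: dict[str, Handler],
    addr: Optional[tuple[str, int]] = None,
) -> bool:
    """Decode a string as JSON and dispatch each top-level key.

    Hypothetical stand-in, not runtimepy's implementation. 'addr' is
    accepted only to mirror the documented signature and is unused here.
    """
    try:
        message = json.loads(data)
    except json.JSONDecodeError:
        # Not valid JSON: report failure instead of raising.
        return False

    # Assumption for this sketch: only JSON objects are valid messages.
    if not isinstance(message, dict):
        return False

    for key, payload in message.items():
        handler = handlers.get(key)
        if handler is not None:
            handler(payload)
    return True
```

In this sketch a False result means the text could not be treated as a message. Keys with no registered handler are silently ignored.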

write(data: bytes, addr: tuple[str, int] = None) → None

Write data.
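
Because write accepts bytes, a JSON message has to be serialized and encoded before it reaches a method of this shape. The helper below is a minimal sketch of that step using only the standard library. The name `encode_json_message` is hypothetical, and the choice of compact separators and UTF-8 is an assumption, not something the runtimepy documentation states.

```python
import json
from typing import Any


def encode_json_message(message: dict[str, Any]) -> bytes:
    """Serialize a dictionary to compact, UTF-8 encoded JSON bytes.

    Hypothetical helper for illustration; the wire format actually used
    by runtimepy (framing, delimiters, encoding) may differ.
    """
    text = json.dumps(message, separators=(",", ":"))
    return text.encode("utf-8")
```

The resulting bytes could then be passed as the data argument of a write-style method such as the one documented above.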