runtimepy.subprocess package

Contents

runtimepy.subprocess package#

Submodules#

runtimepy.subprocess.interface module#

A module implementing a runtimepy peer interface.

class runtimepy.subprocess.interface.RuntimepyPeerInterface(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)[source]#

Bases: JsonMessageInterface, AsyncCommandProcessingMixin, MarkdownMixin

A class implementing an interface for messaging peer subprocesses.

async handle_command(args: Namespace, channel: BitField | Channel[Int8Primitive] | Channel[Int16Primitive] | Channel[Int32Primitive] | Channel[Int64Primitive] | Channel[Uint8Primitive] | Channel[Uint16Primitive] | Channel[Uint32Primitive] | Channel[Uint64Primitive] | Channel[FloatPrimitive] | Channel[DoublePrimitive] | Channel[BooleanPrimitive] | None) None[source]#

Handle a command.

handle_log_message(message: dict[str, Any]) None[source]#

Handle a log message.

handle_stderr(data: bytes) None[source]#

Forward stderr.

async handle_stdout(data: bytes) None[source]#

Handle messages from stdout.

markdown: str#
property peer_name: str#

Get the name of the peer’s environment.

async poll_handler() None[source]#

Handle a ‘poll’ message.

poll_metrics(time_ns: int = None) None[source]#

Poll channels.

poll_period_s: float = 0.01#
async share_config(data: dict[str, Any]) None[source]#

Exchange configuration data.

async share_environment() None[source]#

Exchange channel environments.

struct_pre_finalize() None[source]#

Configure struct before finalization.

struct_type#

alias of SampleStruct

runtimepy.subprocess.peer module#

A module implementing a runtimepy peer interface.

class runtimepy.subprocess.peer.RuntimepyPeer(protocol: RuntimepySubprocessProtocol, name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)[source]#

Bases: RuntimepyPeerInterface

A class implementing an interface for messaging peer subprocesses.

classmethod exec(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], *args, markdown: str = None, **kwargs) AsyncIterator[T][source]#

Create an instance from command-line arguments.

async main() None[source]#

Program entry.

classmethod running_program(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], import_str: str, *args, **kwargs) AsyncIterator[T][source]#

Run a peer subprocess.

async service_queues() bool[source]#

Service data from peer.

classmethod shell(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], cmd: str, markdown: str = None) AsyncIterator[T][source]#

Create an instance from a shell command.

write(data: bytes, addr: tuple[str, int] = None) None[source]#

Write bytes via this interface.

runtimepy.subprocess.program module#

A module implementing a peer program communication interface.

class runtimepy.subprocess.program.PeerProgram(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)[source]#

Bases: RuntimepyPeerInterface, PsutilMixin

A communication interface for peer programs.

async cleanup() None[source]#

Runs when program ‘running’ context exits.

got_eof: Event#
async heartbeat_task() None[source]#

Send a message heartbeat back and forth.

async io_task(buffer: BinaryIO) None[source]#

Run this peer program’s main loop.

json_output: BinaryIO#
async main(argv: list[str]) None[source]#

Program entry.

async poll_handler() None[source]#

Handle a ‘poll’ message.

pre_environment_exchange() None[source]#

Perform early initialization tasks.

async classmethod run(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], argv: list[str]) None[source]#

Run the program.

classmethod run_standard(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]]) tuple[Task[None], T][source]#

Run this program using standard input and output.

classmethod running(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], argv: list[str]) AsyncIterator[tuple[Task[None], Task[None], T]][source]#

Provide an interface for managed-context cleanup of the peer process.

classmethod singleton() T[source]#

Get a shared single instance of a protocol for this class.

stream_metrics: ChannelMetrics#
stream_output: BinaryIO#
streaming_events() Iterator[None][source]#

Stream events to the stream output.

struct_pre_finalize() None[source]#

Configure struct before finalization.

struct_type#

alias of RuntimeStruct

write(data: bytes, addr: tuple[str, int] = None) None[source]#

Write data.

runtimepy.subprocess.protocol module#

A module implementing a subprocess protocol.

class runtimepy.subprocess.protocol.RuntimepySubprocessProtocol[source]#

Bases: SubprocessProtocol

A simple subprocess protocol implementation.

connection_made(transport) None[source]#

Initialize this protocol.

elapsed_time: int#
exited: Event#
property pid: int#

Get this subprocess’s process identifier.

pipe_connection_lost(fd: int, exc) None[source]#

Handle a pipe connection closing.

pipe_data_received(fd: int, data: bytes) None[source]#

Handle incoming pipe data.

process_exited() None[source]#

Handle process exit.

start_time: int#
property stderr: Queue[bytes]#

Get this instance’s standard error queue.

stderr_queue: Queue[bytes] | None#
stderr_transport: ReadTransport#
stdin: WriteTransport#
property stdout: Queue[bytes]#

Get this instance’s standard output queue.

stdout_queue: Queue[bytes] | None#
stdout_transport: ReadTransport#
subproc: Popen[bytes]#
transport: SubprocessTransport#

Module contents#

A module implementing a subprocess management interface.

async runtimepy.subprocess.close_protocol(protocol: ~runtimepy.subprocess.protocol.RuntimepySubprocessProtocol, poll_rate: float = 2.0, stdout: ~asyncio.queues.Queue[bytes] = None, stderr: ~asyncio.queues.Queue[bytes] = None, logger: ~logging.Logger | ~logging.LoggerAdapter[~typing.Any] = <Logger runtimepy.subprocess (WARNING)>) None[source]#

Shut down a protocol instance.

async runtimepy.subprocess.shutdown_protocol(protocol: ~runtimepy.subprocess.protocol.RuntimepySubprocessProtocol, poll_rate: float = 2.0, logger: ~logging.Logger | ~logging.LoggerAdapter[~typing.Any] = <Logger runtimepy.subprocess (WARNING)>) None[source]#

Shut down a subprocess protocol instance.

runtimepy.subprocess.spawn_exec(*args: str, stdout: ~asyncio.queues.Queue[bytes] = None, stderr: ~asyncio.queues.Queue[bytes] = None, logger: ~logging.Logger | ~logging.LoggerAdapter[~typing.Any] = <Logger runtimepy.subprocess (WARNING)>, **kwargs) AsyncIterator[RuntimepySubprocessProtocol][source]#

Create a subprocess.

runtimepy.subprocess.spawn_shell(cmd: str, stdout: ~asyncio.queues.Queue[bytes] = None, stderr: ~asyncio.queues.Queue[bytes] = None, logger: ~logging.Logger | ~logging.LoggerAdapter[~typing.Any] = <Logger runtimepy.subprocess (WARNING)>, **kwargs) AsyncIterator[RuntimepySubprocessProtocol][source]#

Create a shell subprocess.