runtimepy.subprocess package
Submodules
runtimepy.subprocess.interface module
A module implementing a runtimepy peer interface.
- class runtimepy.subprocess.interface.RuntimepyPeerInterface(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)
Bases: JsonMessageInterface, AsyncCommandProcessingMixin, MarkdownMixin
A class implementing an interface for messaging peer subprocesses.
- async handle_command(args: Namespace, channel: BitField | Channel[Int8Primitive] | Channel[Int16Primitive] | Channel[Int32Primitive] | Channel[Int64Primitive] | Channel[Uint8Primitive] | Channel[Uint16Primitive] | Channel[Uint32Primitive] | Channel[Uint64Primitive] | Channel[FloatPrimitive] | Channel[DoublePrimitive] | Channel[BooleanPrimitive] | None) -> None
Handle a command.
- markdown: str
- property peer_name: str
Get the name of the peer’s environment.
- poll_period_s: float = 0.01
Exchange configuration data.
Exchange channel environments.
- struct_type: alias of SampleStruct
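The config parameter that recurs in these signatures is a string-keyed dictionary whose values are JSON-style primitives, or a dictionary or list of such primitives, one level deep. The sketch below checks a value against that shape as it is written in the signatures on this page. The helper names is_primitive and fits_config_shape are hypothetical and are not part of runtimepy.

```python
def is_primitive(value) -> bool:
    """Check for one of: str, int, float, bool or None."""
    return value is None or isinstance(value, (str, int, float, bool))


def fits_config_shape(config) -> bool:
    """Check a value against the 'config' annotation shown above.

    Hypothetical helper: top-level keys must be strings and each value
    must be a primitive, a dict of primitives or a list of primitives.
    """
    if not isinstance(config, dict):
        return False
    for key, value in config.items():
        if not isinstance(key, str):
            return False
        if isinstance(value, dict):
            ok = all(
                isinstance(k, str) and is_primitive(v)
                for k, v in value.items()
            )
        elif isinstance(value, list):
            ok = all(is_primitive(v) for v in value)
        else:
            ok = is_primitive(value)
        if not ok:
            return False
    return True


if __name__ == "__main__":
    example = {"rate": 10, "tags": ["a", 1], "nested": {"x": None}}
    print(fits_config_shape(example))
```

A value nested two levels deep, such as {"a": {"b": {"c": 1}}}, does not fit the annotation as printed and is rejected by this check.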
runtimepy.subprocess.peer module
A module implementing a runtimepy peer interface.
- class runtimepy.subprocess.peer.RuntimepyPeer(protocol: RuntimepySubprocessProtocol, name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)
Bases: RuntimepyPeerInterface
A class implementing an interface for messaging peer subprocesses.
- classmethod exec(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], *args, markdown: str = None, **kwargs) -> AsyncIterator[T]
Create an instance from command-line arguments.
- classmethod running_program(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], import_str: str, *args, **kwargs) -> AsyncIterator[T]
Run a peer subprocess.
runtimepy.subprocess.program module
A module implementing a peer program communication interface.
- class runtimepy.subprocess.program.PeerProgram(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)
Bases: RuntimepyPeerInterface, PsutilMixin
A communication interface for peer programs.
- got_eof: Event
- json_output: BinaryIO
- async classmethod run(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], argv: list[str]) -> None
Run the program.
- classmethod run_standard(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]]) -> tuple[Task[None], T]
Run this program using standard input and output.
- classmethod running(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], argv: list[str]) -> AsyncIterator[tuple[Task[None], Task[None], T]]
Provide an interface for managed-context cleanup of the peer process.
- stream_metrics: ChannelMetrics
- stream_output: BinaryIO
- struct_type: alias of RuntimeStruct
runtimepy.subprocess.protocol module
A module implementing a subprocess protocol.
- class runtimepy.subprocess.protocol.RuntimepySubprocessProtocol
Bases: SubprocessProtocol
A simple subprocess protocol implementation.
- elapsed_time: int
- exited: Event
- property pid: int
Get this subprocess’s process identifier (PID).
- start_time: int
- property stderr: Queue[bytes]
Get this instance’s standard error queue.
- stderr_queue: Queue[bytes] | None
- stderr_transport: ReadTransport
- stdin: WriteTransport
- property stdout: Queue[bytes]
Get this instance’s standard output queue.
- stdout_queue: Queue[bytes] | None
- stdout_transport: ReadTransport
- subproc: Popen[bytes]
- transport: SubprocessTransport
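The attributes listed for this class (output queues, an exit event, a pid property) correspond closely to the callbacks of asyncio's SubprocessProtocol. The following is a minimal standard-library sketch of that pattern, offered as an illustration under stated assumptions rather than as runtimepy's implementation. The names QueueingProtocol and demo are hypothetical.

```python
import asyncio
import sys


class QueueingProtocol(asyncio.SubprocessProtocol):
    """Hypothetical stand-in that buffers child output in asyncio queues."""

    def __init__(self) -> None:
        self.stdout = asyncio.Queue()  # chunks of bytes from fd 1
        self.stderr = asyncio.Queue()  # chunks of bytes from fd 2
        self.exited = asyncio.Event()
        self.transport = None

    def connection_made(self, transport) -> None:
        # Called once with the SubprocessTransport for the new child.
        self.transport = transport

    @property
    def pid(self) -> int:
        """Get the child's process identifier."""
        return self.transport.get_pid()

    def pipe_data_received(self, fd: int, data: bytes) -> None:
        # Route each chunk to the queue matching its file descriptor.
        (self.stdout if fd == 1 else self.stderr).put_nowait(data)

    def process_exited(self) -> None:
        self.exited.set()


async def demo():
    """Run a short child process and collect its first output chunk."""
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.subprocess_exec(
        QueueingProtocol, sys.executable, "-c", "print('hello')"
    )
    # Read output first: exit notification may arrive before pipe data.
    first_chunk = await protocol.stdout.get()
    await protocol.exited.wait()
    code = transport.get_returncode()
    transport.close()
    return first_chunk, code


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Queues decouple the event-loop callbacks from whatever task consumes the output, so a slow reader does not block delivery of the exit notification.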
Module contents
A module implementing a subprocess management interface.
- async runtimepy.subprocess.close_protocol(protocol: RuntimepySubprocessProtocol, poll_rate: float = 2.0, stdout: Queue[bytes] = None, stderr: Queue[bytes] = None, logger: Logger | LoggerAdapter[Any] = <Logger runtimepy.subprocess (WARNING)>) -> None
Shut down a protocol instance.
- async runtimepy.subprocess.shutdown_protocol(protocol: RuntimepySubprocessProtocol, poll_rate: float = 2.0, logger: Logger | LoggerAdapter[Any] = <Logger runtimepy.subprocess (WARNING)>) -> None
Shut down a subprocess protocol instance.
- runtimepy.subprocess.spawn_exec(*args: str, stdout: Queue[bytes] = None, stderr: Queue[bytes] = None, logger: Logger | LoggerAdapter[Any] = <Logger runtimepy.subprocess (WARNING)>, **kwargs) -> AsyncIterator[RuntimepySubprocessProtocol]
Create a subprocess.
- runtimepy.subprocess.spawn_shell(cmd: str, stdout: Queue[bytes] = None, stderr: Queue[bytes] = None, logger: Logger | LoggerAdapter[Any] = <Logger runtimepy.subprocess (WARNING)>, **kwargs) -> AsyncIterator[RuntimepySubprocessProtocol]
Create a shell subprocess.
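The AsyncIterator return type on the spawn functions suggests they are intended for use as async context managers that shut the child down on exit. That reading is an assumption, not something this page states. The sketch below shows the general spawn-then-clean-up pattern using only asyncio from the standard library. The names spawned, grace_s and demo are hypothetical and do not reflect runtimepy's API or its poll_rate semantics.

```python
import asyncio
import sys
from contextlib import asynccontextmanager


@asynccontextmanager
async def spawned(*args, grace_s=2.0):
    """Hypothetical helper: spawn a child and always clean it up."""
    proc = await asyncio.create_subprocess_exec(*args)
    try:
        yield proc
    finally:
        if proc.returncode is None:
            # Ask politely first, then force the issue after a grace period.
            proc.terminate()
            try:
                await asyncio.wait_for(proc.wait(), grace_s)
            except asyncio.TimeoutError:
                proc.kill()
                await proc.wait()


async def demo():
    """Start a long-running child and leave the context immediately."""
    child_code = "import time; time.sleep(60)"
    async with spawned(sys.executable, "-c", child_code) as proc:
        still_running = proc.returncode is None
    # By this point the context manager has reaped the child.
    return still_running, proc.returncode


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Placing the cleanup in a finally clause means the child is terminated even when the body of the async with block raises.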