runtimepy.net.tcp package#
Subpackages#
- runtimepy.net.tcp.http package
- Module contents
HttpConnection
HttpConnection.env
HttpConnection.expecting_response
HttpConnection.get_handler()
HttpConnection.get_log_prefix()
HttpConnection.handlers
HttpConnection.identity
HttpConnection.init()
HttpConnection.log_alias
HttpConnection.logger
HttpConnection.markdown
HttpConnection.post_handler()
HttpConnection.process_binary()
HttpConnection.remote_address
HttpConnection.request()
HttpConnection.request_json()
to_json()
- Module contents
- runtimepy.net.tcp.scpi package
- runtimepy.net.tcp.telnet package
- Submodules
- runtimepy.net.tcp.telnet.codes module
- runtimepy.net.tcp.telnet.np_05b module
Np05bConnection
Np05bConnection.async_init()
Np05bConnection.chan_to_idx
Np05bConnection.default_auto_restart
Np05bConnection.handle_command()
Np05bConnection.init()
Np05bConnection.lock
Np05bConnection.np05b_command()
Np05bConnection.num_ports
Np05bConnection.prev_cmd
Np05bConnection.process_response()
Np05bConnection.process_text()
Np05bConnection.prompt
Np05bConnection.request_outlet_status()
Np05bConnection.set_outlet_state()
Np05bStrings
- Module contents
Submodules#
runtimepy.net.tcp.connection module#
A module implementing a TCP connection interface.
- class runtimepy.net.tcp.connection.EchoTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
TcpConnection
,EchoConnection
An echo connection for TCP.
- class runtimepy.net.tcp.connection.NullTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
TcpConnection
,NullConnection
A null TCP connection.
- class runtimepy.net.tcp.connection.TcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
Connection
,TransportMixin
A TCP connection interface.
- async classmethod app(stop_sig: Event, callback: Callable[[T], None] = None, serving_callback: Callable[[Any], None] = None, manager: ConnectionManager = None, **kwargs) None [source]#
Run an application that serves new connections.
- async classmethod create_connection(backoff: ExponentialBackoff = None, markdown: str = None, **kwargs) T [source]#
Create a TCP connection.
- classmethod create_pair(peer: type[V] = None, serve_kwargs: dict[str, Any] = None, connect_kwargs: dict[str, Any] = None, host: str = '127.0.0.1') AsyncIterator[tuple[V, T]] [source]#
Create a connection pair.
- env: ChannelEnvironment#
- classmethod get_log_prefix(is_ssl: bool = False) str [source]#
Get a logging prefix for this instance.
- property is_ssl: bool#
Determine if this connection uses SSL.
- log_alias = 'TCP'#
- log_prefix = ''#
- logger: LoggerType#
- markdown: str#
- remote_address: _Optional[_IpHost]#
- async restart() bool [source]#
Reset necessary underlying state for this connection to ‘process’ again.
- classmethod serve(callback: Callable[[T], None] = None, **kwargs) AsyncIterator[Any] [source]#
Serve incoming connections.
- uses_binary_tx_queue = False#
- uses_text_tx_queue = False#
runtimepy.net.tcp.create module#
A module for instantiating the underlying networking resources for TcpConnection.
- async runtimepy.net.tcp.create.tcp_transport_protocol(**kwargs) tuple[Transport, QueueProtocol] [source]#
Create a transport and protocol pair relevant for this class’s implementation.
- async runtimepy.net.tcp.create.tcp_transport_protocol_backoff(backoff: ExponentialBackoff = None, **kwargs) tuple[Transport, QueueProtocol] [source]#
Create a transport and protocol pair relevant for this class’s implementation.
- async runtimepy.net.tcp.create.try_tcp_transport_protocol(callback: Callable[[tuple[Transport, QueueProtocol]], None] = None, **kwargs) tuple[Transport, QueueProtocol] | None [source]#
Attempt to create a transport and protocol pair.
runtimepy.net.tcp.protocol module#
A module implementing a Protocol for TcpConnection.
- class runtimepy.net.tcp.protocol.QueueProtocol[source]#
Bases:
BinaryMessageQueueMixin
,Protocol
A simple streaming protocol that populates a message queue.
- conn: Connection#
- logger: Logger | LoggerAdapter[Any]#
Module contents#
A module aggregating all TCP-related interfaces.
- class runtimepy.net.tcp.EchoTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
TcpConnection
,EchoConnection
An echo connection for TCP.
- class runtimepy.net.tcp.NullTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
TcpConnection
,NullConnection
A null TCP connection.
- class runtimepy.net.tcp.TcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
Connection
,TransportMixin
A TCP connection interface.
- async classmethod app(stop_sig: Event, callback: Callable[[T], None] = None, serving_callback: Callable[[Any], None] = None, manager: ConnectionManager = None, **kwargs) None [source]#
Run an application that serves new connections.
- async classmethod create_connection(backoff: ExponentialBackoff = None, markdown: str = None, **kwargs) T [source]#
Create a TCP connection.
- classmethod create_pair(peer: type[V] = None, serve_kwargs: dict[str, Any] = None, connect_kwargs: dict[str, Any] = None, host: str = '127.0.0.1') AsyncIterator[tuple[V, T]] [source]#
Create a connection pair.
- env: ChannelEnvironment#
- classmethod get_log_prefix(is_ssl: bool = False) str [source]#
Get a logging prefix for this instance.
- property is_ssl: bool#
Determine if this connection uses SSL.
- log_alias = 'TCP'#
- log_prefix = ''#
- logger: LoggerType#
- markdown: str#
- remote_address: _Optional[_IpHost]#
- async restart() bool [source]#
Reset necessary underlying state for this connection to ‘process’ again.
- classmethod serve(callback: Callable[[T], None] = None, **kwargs) AsyncIterator[Any] [source]#
Serve incoming connections.
- uses_binary_tx_queue = False#
- uses_text_tx_queue = False#