runtimepy.net.tcp package#

Subpackages#

Submodules#

runtimepy.net.tcp.connection module#

A module implementing a TCP connection interface.

class runtimepy.net.tcp.connection.EchoTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#

Bases: TcpConnection, EchoConnection

An echo connection for TCP.

class runtimepy.net.tcp.connection.NullTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#

Bases: TcpConnection, NullConnection

A null TCP connection.

class runtimepy.net.tcp.connection.TcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#

Bases: Connection, TransportMixin

A TCP connection interface.

async classmethod app(stop_sig: Event, callback: Callable[[T], None] = None, serving_callback: Callable[[Any], None] = None, manager: ConnectionManager = None, **kwargs) None[source]#

Run an application that serves new connections.

async close() None[source]#

Close this connection.

async classmethod create_connection(backoff: ExponentialBackoff = None, markdown: str = None, **kwargs) T[source]#

Create a TCP connection.

classmethod create_pair(peer: type[V] = None, serve_kwargs: dict[str, Any] = None, connect_kwargs: dict[str, Any] = None, host: str = '127.0.0.1') AsyncIterator[tuple[V, T]][source]#

Create a connection pair.

env: ChannelEnvironment#
classmethod get_log_prefix(is_ssl: bool = False) str[source]#

Get a logging prefix for this instance.

property is_ssl: bool#

Determine if this connection uses SSL.

log_alias = 'TCP'#
log_prefix = ''#
logger: LoggerType#
markdown: str#
remote_address: _Optional[_IpHost]#
async restart() bool[source]#

Reset necessary underlying state for this connection to ‘process’ again.

send_binary(data: bytes | bytearray | memoryview) None[source]#

Enqueue a binary message to send.

send_text(data: str) None[source]#

Enqueue a text message to send.

classmethod serve(callback: Callable[[T], None] = None, **kwargs) AsyncIterator[Any][source]#

Serve incoming connections.

uses_binary_tx_queue = False#
uses_text_tx_queue = False#

runtimepy.net.tcp.create module#

A module for instantiating the underlying networking resources for TcpConnection.

async runtimepy.net.tcp.create.tcp_transport_protocol(**kwargs) tuple[Transport, QueueProtocol][source]#

Create a transport and protocol pair relevant for the TcpConnection implementation.

async runtimepy.net.tcp.create.tcp_transport_protocol_backoff(backoff: ExponentialBackoff = None, **kwargs) tuple[Transport, QueueProtocol][source]#

Create a transport and protocol pair relevant for the TcpConnection implementation.

async runtimepy.net.tcp.create.try_tcp_transport_protocol(callback: Callable[[tuple[Transport, QueueProtocol]], None] = None, **kwargs) tuple[Transport, QueueProtocol] | None[source]#

Attempt to create a transport and protocol pair.

runtimepy.net.tcp.protocol module#

A module implementing a Protocol for TcpConnection.

class runtimepy.net.tcp.protocol.QueueProtocol[source]#

Bases: BinaryMessageQueueMixin, Protocol

A simple streaming protocol that populates a message queue.

conn: Connection#
connection_lost(exc: Exception | None) None[source]#

Log the disconnection.

connection_made(transport: BaseTransport) None[source]#

Log the connection establishment.

data_received(data: bytes | bytearray | memoryview) None[source]#

Handle incoming data.

logger: Logger | LoggerAdapter[Any]#

Module contents#

A module aggregating all TCP-related interfaces.

class runtimepy.net.tcp.EchoTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#

Bases: TcpConnection, EchoConnection

An echo connection for TCP.

class runtimepy.net.tcp.NullTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#

Bases: TcpConnection, NullConnection

A null TCP connection.

class runtimepy.net.tcp.TcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#

Bases: Connection, TransportMixin

A TCP connection interface.

async classmethod app(stop_sig: Event, callback: Callable[[T], None] = None, serving_callback: Callable[[Any], None] = None, manager: ConnectionManager = None, **kwargs) None[source]#

Run an application that serves new connections.

async close() None[source]#

Close this connection.

async classmethod create_connection(backoff: ExponentialBackoff = None, markdown: str = None, **kwargs) T[source]#

Create a TCP connection.

classmethod create_pair(peer: type[V] = None, serve_kwargs: dict[str, Any] = None, connect_kwargs: dict[str, Any] = None, host: str = '127.0.0.1') AsyncIterator[tuple[V, T]][source]#

Create a connection pair.

env: ChannelEnvironment#
classmethod get_log_prefix(is_ssl: bool = False) str[source]#

Get a logging prefix for this instance.

property is_ssl: bool#

Determine if this connection uses SSL.

log_alias = 'TCP'#
log_prefix = ''#
logger: LoggerType#
markdown: str#
remote_address: _Optional[_IpHost]#
async restart() bool[source]#

Reset necessary underlying state for this connection to ‘process’ again.

send_binary(data: bytes | bytearray | memoryview) None[source]#

Enqueue a binary message to send.

send_text(data: str) None[source]#

Enqueue a text message to send.

classmethod serve(callback: Callable[[T], None] = None, **kwargs) AsyncIterator[Any][source]#

Serve incoming connections.

uses_binary_tx_queue = False#
uses_text_tx_queue = False#