runtimepy.net package

Contents

runtimepy.net package#

Subpackages#

Submodules#

runtimepy.net.backoff module#

A module implementing a simple exponential-backoff interface.

class runtimepy.net.backoff.ExponentialBackoff(interval: float = 0.1, max_sleep: float = 10.0, max_tries: int = 20)[source]#

Bases: object

A class implementing a simple exponential-backoff handler.

attempt: int#
property give_up: bool#

Determine whether or not to give up based on the number of attempts.

reset() None[source]#

Reset this instance’s state.

async sleep() None[source]#

Sleep for the correct amount of time.

wait: float#

runtimepy.net.connection module#

A module implementing a network-connection interface.

class runtimepy.net.connection.Connection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True, markdown: str = None)[source]#

Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, MarkdownMixin, ABC

A connection interface.

async async_init() bool[source]#

A runtime initialization routine (executes during ‘process’).

property auto_restart: bool#

Determine if this connection should be automatically restarted.

byte_order: ByteOrder = 4#
async close() None[source]#

Close this connection.

connected = True#
default_auto_restart = False#
disable(reason: str) None[source]#

Disable this connection.

disable_extra() None[source]#

Additional tasks to perform when disabling.

async disable_in(time: float) None[source]#

A method for disabling a connection after some delay.

property disabled: bool#

Determine if this connection is disabled.

init() None[source]#

Initialize this instance.

log_metrics(label: str = 'conn') None[source]#

Log connection metrics.

async process(stop_sig: Event = None, disable_time: float = None, backoff: ExponentialBackoff = None) None[source]#

Process tasks for this connection while the connection is active.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

process_then_disable(**kwargs) AsyncIterator[None][source]#

Process this connection, then disable and wait for completion.

async restart() bool[source]#

Reset necessary underlying state for this connection to ‘process’ again.

send_binary(data: bytes | bytearray | memoryview) None[source]#

Enqueue a binary message to send.

send_text(data: str) None[source]#

Enqueue a text message to send.

property tasks: Iterator[Task[None]]#

Get active connection tasks. Instance uses this opportunity to release references to any completed tasks.

uses_binary_tx_queue = True#
uses_text_tx_queue = True#
class runtimepy.net.connection.EchoConnection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True, markdown: str = None)[source]#

Bases: Connection

A connection that just echoes what it was sent.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

class runtimepy.net.connection.NullConnection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True, markdown: str = None)[source]#

Bases: Connection

A connection that doesn’t do anything with incoming data.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

runtimepy.net.manager module#

A module implementing a connection manager.

class runtimepy.net.manager.ConnectionManager[source]#

Bases: LoggerMixin

A class for managing connection processing at runtime.

by_type(kind: type[T]) Iterator[T][source]#

Iterate over connections of a specific type.

property connection_tasks: Iterator[Task[None]]#

Iterate over connection tasks.

async manage(stop_sig: Event) None[source]#

Handle incoming connections until the stop signal is set.

property num_connections: int#

Return the number of managed connections.

poll_metrics(time_ns: int = None) None[source]#

Poll connection metrics.

reset_metrics() None[source]#

Reset connection metrics.

runtimepy.net.mixin module#

Various networking-related class utilities.

class runtimepy.net.mixin.BinaryMessageQueueMixin[source]#

Bases: object

A mixin for adding a ‘queue’ attribute.

class runtimepy.net.mixin.TransportMixin(transport: BaseTransport)[source]#

Bases: object

A class simplifying evaluation of local and remote addresses.

logger_name(prefix: str = '') str[source]#

Get a logger name for this connection.

mtu(probe_size: int = 1432, fallback: int = 1500, probe_create: ~typing.Callable[[int], bytes] = <class 'bytes'>) int[source]#

Get a maximum transmission unit for this connection. Underlying sockets must be connected for this to work.

remote_address: IPv4Host | IPv6Host | None#
set_transport(transport: BaseTransport) None[source]#

Set the transport for this instance.

property socket: socket#

Get this instance’s underlying socket.

runtimepy.net.mtu module#

A module implementing utilities for calculating maximum transmission-unit sizes.

class runtimepy.net.mtu.SocketConstants(*values)[source]#

Bases: IntEnum

Some platform definitions necessary for MTU discovery.

IP_MTU = 14#
IP_MTU_DISCOVER = 10#
IP_PMTUDISC_DO = 2#
runtimepy.net.mtu.discover_mtu(*destination: str | int | ~runtimepy.net.util.IPv4Host | ~runtimepy.net.util.IPv6Host | None, local: ~runtimepy.net.util.IPv4Host | ~runtimepy.net.util.IPv6Host = None, probe_size: int = 1432, fallback: int = 1500, kind: int = SocketKind.SOCK_DGRAM, probe_create: ~typing.Callable[[int], bytes] = <class 'bytes'>) int[source]#

Determine the maximum transmission unit for an IPv4 payload to a provided host.

runtimepy.net.mtu.host_discover_mtu(local: ~runtimepy.net.util.IPv4Host | ~runtimepy.net.util.IPv6Host, destination: ~runtimepy.net.util.IPv4Host | ~runtimepy.net.util.IPv6Host, probe_size: int, fallback: int, kind: int = SocketKind.SOCK_DGRAM, probe_create: ~typing.Callable[[int], bytes] = <class 'bytes'>) int[source]#

Perform MTU discovery given a local and remote host plus probe size.

runtimepy.net.mtu.socket_discover_mtu(sock: ~_socket.socket, probe_size: int, fallback: int, probe_create: ~typing.Callable[[int], bytes] = <class 'bytes'>) int[source]#

Send a large frame and indicate that we want to perform MTU discovery and not fragment any frames.

runtimepy.net.ssl module#

A module implementing SSL-related interfaces.

runtimepy.net.ssl.handle_possible_ssl(client: bool = True, **kwargs) dict[str, Any][source]#

Handle creating an SSL context based on keyword arguments.

runtimepy.net.util module#

A module implementing various networking utilities.

class runtimepy.net.util.IPv4Host(name: str = '', port: int = 0)[source]#

Bases: NamedTuple

See: https://docs.python.org/3/library/socket.html#socket-families.

property address: IPv4Address#

Get an address object for this hostname.

property address_str: str#

Get an address string for this host.

property address_str_tuple: tuple[str, int]#

Get a string-address tuple for this instance.

property family: int#

Address family constant.

property hostname: str#

Get a hostname for this instance.

name: str#

Alias for field number 0

port: int#

Alias for field number 1

zero_port() IPv4Host[source]#

Get a zeroed-out port instance.

class runtimepy.net.util.IPv6Host(name: str = '', port: int = 0, flowinfo: int = 0, scope_id: int = 0)[source]#

Bases: NamedTuple

See: https://docs.python.org/3/library/socket.html#socket-families.

property address: IPv6Address#

Get an address object for this hostname.

property address_str: str#

Get an address string for this host.

property address_str_tuple: tuple[str, int, int, int]#

Get a string-address tuple for this instance.

property family: int#

Address family constant.

flowinfo: int#

Alias for field number 2

property hostname: str#

Get a hostname for this instance.

name: str#

Alias for field number 0

port: int#

Alias for field number 1

scope_id: int#

Alias for field number 3

zero_port() IPv6Host[source]#

Get a zeroed-out port instance.

runtimepy.net.util.address_str(name: str, fallback_host: str = 'localhost', **kwargs) str[source]#

Get an IP address string for a given name.

runtimepy.net.util.get_free_socket(local: IPv4Host | IPv6Host = None, kind: int = SocketKind.SOCK_STREAM, reuse: bool = False) socket[source]#

Attempt to get an available socket.

runtimepy.net.util.get_free_socket_name(local: IPv4Host | IPv6Host = None, kind: int = SocketKind.SOCK_STREAM) IPv4Host | IPv6Host[source]#

Create a socket to determine an arbitrary port number that’s available. There is an inherent race condition using this strategy.

runtimepy.net.util.hostname(ip_address: str) str[source]#

Attempt to get a string hostname for a string IP address argument that ‘gethostbyaddr’ accepts. Otherwise return the original string.

runtimepy.net.util.hostname_port(ip_address: str, port: int) str[source]#

Get a hostname string with a port appended.

runtimepy.net.util.normalize_host(*args: str | int | ~runtimepy.net.util.IPv4Host | ~runtimepy.net.util.IPv6Host | None, default: type[~runtimepy.net.util.IPv4Host | ~runtimepy.net.util.IPv6Host] = <class 'runtimepy.net.util.IPv4Host'>) IPv4Host | IPv6Host[source]#

Get a host object from caller parameters.

runtimepy.net.util.sockname(sock: socket) IPv4Host | IPv6Host[source]#

Get address information from a socket.

async runtimepy.net.util.try_log_connection_error(future: Awaitable[T], message: str, logger: Logger | LoggerAdapter[Any] = None) T | None[source]#

Attempt to resolve a future but log any connection-related exceptions.

Module contents#

A module aggregating commonly-used networking interfaces.

class runtimepy.net.Connection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True, markdown: str = None)[source]#

Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, MarkdownMixin, ABC

A connection interface.

async async_init() bool[source]#

A runtime initialization routine (executes during ‘process’).

property auto_restart: bool#

Determine if this connection should be automatically restarted.

byte_order: ByteOrder = 4#
async close() None[source]#

Close this connection.

connected = True#
default_auto_restart = False#
disable(reason: str) None[source]#

Disable this connection.

disable_extra() None[source]#

Additional tasks to perform when disabling.

async disable_in(time: float) None[source]#

A method for disabling a connection after some delay.

property disabled: bool#

Determine if this connection is disabled.

env: ChannelEnvironment#
init() None[source]#

Initialize this instance.

log_metrics(label: str = 'conn') None[source]#

Log connection metrics.

logger: LoggerType#
markdown: str#
async process(stop_sig: Event = None, disable_time: float = None, backoff: ExponentialBackoff = None) None[source]#

Process tasks for this connection while the connection is active.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

process_then_disable(**kwargs) AsyncIterator[None][source]#

Process this connection, then disable and wait for completion.

async restart() bool[source]#

Reset necessary underlying state for this connection to ‘process’ again.

send_binary(data: bytes | bytearray | memoryview) None[source]#

Enqueue a binary message to send.

send_text(data: str) None[source]#

Enqueue a text message to send.

property tasks: Iterator[Task[None]]#

Get active connection tasks. Instance uses this opportunity to release references to any completed tasks.

uses_binary_tx_queue = True#
uses_text_tx_queue = True#
class runtimepy.net.ConnectionManager[source]#

Bases: LoggerMixin

A class for managing connection processing at runtime.

by_type(kind: type[T]) Iterator[T][source]#

Iterate over connections of a specific type.

property connection_tasks: Iterator[Task[None]]#

Iterate over connection tasks.

async manage(stop_sig: Event) None[source]#

Handle incoming connections until the stop signal is set.

property num_connections: int#

Return the number of managed connections.

poll_metrics(time_ns: int = None) None[source]#

Poll connection metrics.

reset_metrics() None[source]#

Reset connection metrics.

class runtimepy.net.EchoConnection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True, markdown: str = None)[source]#

Bases: Connection

A connection that just echoes what it was sent.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

class runtimepy.net.ExponentialBackoff(interval: float = 0.1, max_sleep: float = 10.0, max_tries: int = 20)[source]#

Bases: object

A class implementing a simple exponential-backoff handler.

attempt: int#
property give_up: bool#

Determine whether or not to give up based on the number of attempts.

reset() None[source]#

Reset this instance’s state.

async sleep() None[source]#

Sleep for the correct amount of time.

wait: float#
class runtimepy.net.IPv4Host(name: str = '', port: int = 0)[source]#

Bases: NamedTuple

See: https://docs.python.org/3/library/socket.html#socket-families.

property address: IPv4Address#

Get an address object for this hostname.

property address_str: str#

Get an address string for this host.

property address_str_tuple: tuple[str, int]#

Get a string-address tuple for this instance.

property family: int#

Address family constant.

property hostname: str#

Get a hostname for this instance.

name: str#

Alias for field number 0

port: int#

Alias for field number 1

zero_port() IPv4Host[source]#

Get a zeroed-out port instance.

class runtimepy.net.IPv6Host(name: str = '', port: int = 0, flowinfo: int = 0, scope_id: int = 0)[source]#

Bases: NamedTuple

See: https://docs.python.org/3/library/socket.html#socket-families.

property address: IPv6Address#

Get an address object for this hostname.

property address_str: str#

Get an address string for this host.

property address_str_tuple: tuple[str, int, int, int]#

Get a string-address tuple for this instance.

property family: int#

Address family constant.

flowinfo: int#

Alias for field number 2

property hostname: str#

Get a hostname for this instance.

name: str#

Alias for field number 0

port: int#

Alias for field number 1

scope_id: int#

Alias for field number 3

zero_port() IPv6Host[source]#

Get a zeroed-out port instance.

class runtimepy.net.NullConnection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True, markdown: str = None)[source]#

Bases: Connection

A connection that doesn’t do anything with incoming data.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

runtimepy.net.get_free_socket(local: IPv4Host | IPv6Host = None, kind: int = SocketKind.SOCK_STREAM, reuse: bool = False) socket[source]#

Attempt to get an available socket.

runtimepy.net.get_free_socket_name(local: IPv4Host | IPv6Host = None, kind: int = SocketKind.SOCK_STREAM) IPv4Host | IPv6Host[source]#

Create a socket to determine an arbitrary port number that’s available. There is an inherent race condition using this strategy.

runtimepy.net.hostname(ip_address: str) str[source]#

Attempt to get a string hostname for a string IP address argument that ‘gethostbyaddr’ accepts. Otherwise return the original string.

runtimepy.net.normalize_host(*args: str | int | ~runtimepy.net.util.IPv4Host | ~runtimepy.net.util.IPv6Host | None, default: type[~runtimepy.net.util.IPv4Host | ~runtimepy.net.util.IPv6Host] = <class 'runtimepy.net.util.IPv4Host'>) IPv4Host | IPv6Host[source]#

Get a host object from caller parameters.

runtimepy.net.sockname(sock: socket) IPv4Host | IPv6Host[source]#

Get address information from a socket.

async runtimepy.net.try_log_connection_error(future: Awaitable[T], message: str, logger: Logger | LoggerAdapter[Any] = None) T | None[source]#

Attempt to resolve a future but log any connection-related exceptions.