Source code for runtimepy.net.tcp.create
"""
A module for instantiating the underlying networking resources for
TcpConnection.
"""
# built-in
import asyncio as _asyncio
from asyncio import Transport as _Transport
from logging import getLogger as _getLogger
from typing import Callable, Optional
# internal
from runtimepy.net.backoff import ExponentialBackoff
from runtimepy.net.ssl import handle_possible_ssl
from runtimepy.net.tcp.protocol import QueueProtocol
from runtimepy.net.util import try_log_connection_error
TcpTransportProtocol = tuple[_Transport, QueueProtocol]
LOG = _getLogger(__name__)
[docs]
async def tcp_transport_protocol(**kwargs) -> TcpTransportProtocol:
"""
Create a transport and protocol pair relevant for this class's
implementation.
"""
transport: _Transport
transport, protocol = await _asyncio.get_event_loop().create_connection(
QueueProtocol, **handle_possible_ssl(**kwargs)
)
return transport, protocol
TcpTransportProtocolCallback = Callable[[TcpTransportProtocol], None]
[docs]
async def try_tcp_transport_protocol(
callback: TcpTransportProtocolCallback = None,
**kwargs,
) -> Optional[TcpTransportProtocol]:
"""Attempt to create a transport and protocol pair."""
result = await try_log_connection_error(
tcp_transport_protocol(**kwargs),
f"Error creating TCP connection ({kwargs}):",
logger=LOG,
)
if callback is not None and result is not None:
callback(result)
return result
[docs]
async def tcp_transport_protocol_backoff(
backoff: ExponentialBackoff = None, **kwargs
) -> TcpTransportProtocol:
"""
Create a transport and protocol pair relevant for this class's
implementation.
"""
if backoff is None:
backoff = ExponentialBackoff()
result = None
while result is None and not backoff.give_up:
await backoff.sleep()
result = await try_tcp_transport_protocol(**kwargs)
assert result is not None, f"Couldn't create TCP connection '{kwargs}'."
return result