runtimepy.net.udp package#
Subpackages#
- runtimepy.net.udp.tftp package
- Submodules
- runtimepy.net.udp.tftp.base module
BaseTftpConnection
BaseTftpConnection.default_auto_restart
BaseTftpConnection.endpoint()
BaseTftpConnection.init()
BaseTftpConnection.log_alias
BaseTftpConnection.path
BaseTftpConnection.process_datagram()
BaseTftpConnection.send_ack()
BaseTftpConnection.send_data()
BaseTftpConnection.send_error()
BaseTftpConnection.send_rrq()
BaseTftpConnection.send_wrq()
BaseTftpConnection.set_root()
BaseTftpConnection.should_connect
- runtimepy.net.udp.tftp.endpoint module
TftpEndpoint
TftpEndpoint.chunk_sender()
TftpEndpoint.get_path()
TftpEndpoint.handle_ack()
TftpEndpoint.handle_data()
TftpEndpoint.handle_error()
TftpEndpoint.handle_read_request()
TftpEndpoint.handle_write_request()
TftpEndpoint.ingest_file()
TftpEndpoint.serve_file()
TftpEndpoint.set_root()
TftpEndpoint.update_from_other()
- runtimepy.net.udp.tftp.enums module
- runtimepy.net.udp.tftp.io module
- Module contents
Submodules#
runtimepy.net.udp.connection module#
A module implementing a UDP connection interface.
- class runtimepy.net.udp.connection.EchoUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#
Bases:
UdpConnection
,EchoConnection
An echo connection for UDP.
- class runtimepy.net.udp.connection.NullUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#
Bases:
UdpConnection
,NullConnection
A null UDP connection.
- class runtimepy.net.udp.connection.UdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#
Bases:
Connection
,TransportMixin
A UDP connection interface.
- async classmethod create_connection(markdown: str = None, **kwargs) T [source]#
Create a UDP connection.
- env: ChannelEnvironment#
- latest_rx_address: tuple[str, int] | None#
- log_alias = 'UDP'#
- logger: LoggerType#
- markdown: str#
- abstractmethod async process_datagram(data: bytes, addr: tuple[str, int]) bool [source]#
Process a datagram.
- remote_address: _Optional[_IpHost]#
- async restart() bool [source]#
Reset necessary underlying state for this connection to ‘process’ again.
- sendto(data: bytes, addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int] = None) None [source]#
Send to a specific address.
- set_remote_address(addr: IPv4Host | IPv6Host) None [source]#
Set a new remote address. Note that this doesn’t interact with or attempt to ‘connect’ the underlying socket. That should be done at creation time.
- should_connect: bool = True#
- uses_binary_tx_queue = False#
- uses_text_tx_queue = False#
runtimepy.net.udp.create module#
A module for instantiating the underlying networking resources for UdpConnection.
- async runtimepy.net.udp.create.try_udp_transport_protocol(callback: Callable[[tuple[DatagramTransport, UdpQueueProtocol]], None] = None, **kwargs) tuple[DatagramTransport, UdpQueueProtocol] | None [source]#
Attempt to create a transport and protocol pair.
- async runtimepy.net.udp.create.udp_transport_protocol(**kwargs) tuple[DatagramTransport, UdpQueueProtocol] [source]#
Create a transport and protocol pair relevant for this class’s implementation.
- async runtimepy.net.udp.create.udp_transport_protocol_backoff(backoff: ExponentialBackoff = None, **kwargs) tuple[DatagramTransport, UdpQueueProtocol] [source]#
Create a transport and protocol pair relevant for this class’s implementation.
runtimepy.net.udp.protocol module#
A module implementing a DatagramProtocol for UdpConnection.
- class runtimepy.net.udp.protocol.UdpQueueProtocol[source]#
Bases:
DatagramProtocol
A simple UDP protocol that populates a message queue.
- conn: Connection#
- logger: Logger | LoggerAdapter[Any]#
runtimepy.net.udp.queue module#
A module implementing a simple queue-based UDP interface.
- class runtimepy.net.udp.queue.QueueUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#
Bases:
UdpConnection
An echo connection for UDP.
- datagrams: Queue[tuple[bytes, tuple[str, int]]]#
Module contents#
A module implementing networking utilities relevant to UDP.
- class runtimepy.net.udp.EchoUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#
Bases:
UdpConnection
,EchoConnection
An echo connection for UDP.
- class runtimepy.net.udp.NullUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#
Bases:
UdpConnection
,NullConnection
A null UDP connection.
- class runtimepy.net.udp.QueueUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#
Bases:
UdpConnection
An echo connection for UDP.
- datagrams: Queue[tuple[bytes, tuple[str, int]]]#
- env: ChannelEnvironment#
- latest_rx_address: _Optional[tuple[str, int]]#
- logger: LoggerType#
- markdown: str#
- remote_address: _Optional[_IpHost]#
- class runtimepy.net.udp.UdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#
Bases:
Connection
,TransportMixin
A UDP connection interface.
- async classmethod create_connection(markdown: str = None, **kwargs) T [source]#
Create a UDP connection.
- env: ChannelEnvironment#
- latest_rx_address: tuple[str, int] | None#
- log_alias = 'UDP'#
- logger: LoggerType#
- markdown: str#
- abstractmethod async process_datagram(data: bytes, addr: tuple[str, int]) bool [source]#
Process a datagram.
- remote_address: _Optional[_IpHost]#
- async restart() bool [source]#
Reset necessary underlying state for this connection to ‘process’ again.
- sendto(data: bytes, addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int] = None) None [source]#
Send to a specific address.
- set_remote_address(addr: IPv4Host | IPv6Host) None [source]#
Set a new remote address. Note that this doesn’t interact with or attempt to ‘connect’ the underlying socket. That should be done at creation time.
- should_connect: bool = True#
- uses_binary_tx_queue = False#
- uses_text_tx_queue = False#