runtimepy.net.udp.tftp package#
Submodules#
runtimepy.net.udp.tftp.base module#
A module implementing a base tftp (RFC 1350) connection interface.
- class runtimepy.net.udp.tftp.base.BaseTftpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#
Bases:
UdpConnection
A class implementing a basic tftp interface.
- default_auto_restart = True#
- endpoint(addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int] = None) TftpEndpoint [source]#
Lookup an endpoint instance from an address.
- log_alias = 'TFTP'#
- property path: Path#
Get this connection’s root path.
- send_ack(block: int = 0, addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int] = None) None [source]#
Send a data message.
- send_data(block: int, data: bytes, addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int] = None) None [source]#
Send a data message.
- send_error(error_code: TftpErrorCode, message: str, addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int] = None) None [source]#
Send a data message.
- send_rrq(filename: str, mode: str = 'octet', addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int] = None) None [source]#
Send a read request.
- send_wrq(filename: str, mode: str = 'octet', addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int] = None) None [source]#
Send a write request.
- should_connect: bool = False#
runtimepy.net.udp.tftp.endpoint module#
A module implementing an interface for individual tftp endpoints.
- class runtimepy.net.udp.tftp.endpoint.TftpEndpoint(root: Path, logger: Logger | LoggerAdapter[Any], addr: IPv4Host | IPv6Host, data_sender: Callable[[int, bytes, IPv4Host | IPv6Host | tuple[str, int]], None], ack_sender: Callable[[int, IPv4Host | IPv6Host | tuple[str, int]], None], error_sender: Callable[[TftpErrorCode, str, IPv4Host | IPv6Host | tuple[str, int]], None], period: DoublePrimitive, timeout: DoublePrimitive)[source]#
Bases:
LoggerMixin
A data structure for endpoint-related runtime storage.
- chunk_sender(block: int, data: bytes) Callable[[], None] [source]#
Create a method that sends a specific block of data.
- handle_error(error_code: TftpErrorCode, message: str) None [source]#
Handle a tftp error message.
- handle_read_request(filename: str, mode: str) Task[None] | None [source]#
Handle a read-request message.
- update_from_other(other: TftpEndpoint) TftpEndpoint [source]#
Update this endpoint’s attributes with attributes of another’s.
runtimepy.net.udp.tftp.enums module#
A module implementing tftp enums and other protocol minutia interfaces.
- class runtimepy.net.udp.tftp.enums.TftpErrorCode(*values)[source]#
Bases:
RuntimeIntEnum
A runtime enumeration for tftp error codes.
- ACCESS_VIOLATION = 2#
- DISK_FULL = 3#
- FILE_EXISTS = 6#
- FILE_NOT_FOUND = 1#
- ILLEGAL_OPERATION = 4#
- NO_USER = 7#
- OPTION_NEGOTIATION = 8#
- UNKNOWN = 0#
- UNKNOWN_ID = 5#
- class runtimepy.net.udp.tftp.enums.TftpOpCode(*values)[source]#
Bases:
RuntimeIntEnum
A runtime enumeration for tftp op codes.
- ACK = 4#
- DATA = 3#
- ERROR = 5#
- INVALID = 0#
- RRQ = 1#
- WRQ = 2#
runtimepy.net.udp.tftp.io module#
A module implementing I/O utilities related to tftp transactions.
Module contents#
A module implementing a tftp (RFC 1350) interface.
- class runtimepy.net.udp.tftp.TftpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#
Bases:
BaseTftpConnection
A class implementing a basic tftp interface.
- runtimepy.net.udp.tftp.tftp(addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int], process_kwargs: dict[str, Any] = None, connection_kwargs: dict[str, Any] = None, timeout_s: float = 1.0, reemit_period_s: float = 0.2, error_handler: Callable[[TftpErrorCode, str, tuple[str, int]], None] = None) AsyncIterator[TftpConnection] [source]#
Use a tftp connection as a managed context.
- async runtimepy.net.udp.tftp.tftp_read(addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int], destination: Path, filename: str, mode: str = 'octet', process_kwargs: dict[str, Any] = None, connection_kwargs: dict[str, Any] = None, timeout_s: float = 1.0, reemit_period_s: float = 0.2, error_handler: Callable[[TftpErrorCode, str, tuple[str, int]], None] = None) bool [source]#
Attempt to perform a tftp read.
- async runtimepy.net.udp.tftp.tftp_write(addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int], source: Path | bytes | str, filename: str, mode: str = 'octet', verify: bool = True, process_kwargs: dict[str, Any] = None, connection_kwargs: dict[str, Any] = None, timeout_s: float = 1.0, reemit_period_s: float = 0.2, error_handler: Callable[[TftpErrorCode, str, tuple[str, int]], None] = None) bool [source]#
Attempt to perform a tftp write.