runtimepy.net.arbiter.struct package
Module contents
A module implementing runtime struct interfaces.
- class runtimepy.net.arbiter.struct.TimestampedStruct(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)
Bases: RuntimeStruct
A base struct with a timestamp field.
- log_level_channel: bool = False
- sequence: Uint16Primitive
- time_keeper() -> int
Get a timestamp value using a default method.
- timestamp: Uint64Primitive
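As a rough illustration of the two fields above, the following standard-library sketch models a header with a 64-bit timestamp and a 16-bit sequence counter. It does not use runtimepy: `TimestampedHeader`, `poll`, the big-endian layout, and the use of `time.time_ns()` as the default time source are all assumptions made for the example.

```python
import struct
import time
from dataclasses import dataclass

# Hypothetical wire layout: big-endian uint64 timestamp, then uint16 sequence.
HEADER = struct.Struct("!QH")


@dataclass
class TimestampedHeader:
    """Illustrative stand-in for a struct with timestamp and sequence fields."""

    timestamp: int = 0
    sequence: int = 0

    def time_keeper(self) -> int:
        """Return a timestamp from a default source (assumed: nanoseconds)."""
        return time.time_ns()

    def poll(self) -> None:
        """Refresh the timestamp and advance the sequence, wrapping at 16 bits."""
        self.timestamp = self.time_keeper()
        self.sequence = (self.sequence + 1) & 0xFFFF

    def pack(self) -> bytes:
        """Serialize both fields into a fixed-size byte string."""
        return HEADER.pack(self.timestamp, self.sequence)

    @classmethod
    def unpack(cls, data: bytes) -> "TimestampedHeader":
        """Rebuild a header from bytes produced by pack()."""
        return cls(*HEADER.unpack(data))
```

A subclass could override `time_keeper()` to substitute a different clock without touching the serialization code.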
- class runtimepy.net.arbiter.struct.UdpStructTransceiver(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)
Bases: UdpConnection, Generic[T]
A connection that can send and receive arrays of structs.
- assign_app_rx(pattern: str, app: AppInfo) -> T | None
Attempt to assign a receive struct to this instance.
- assign_app_tx(pattern: str, app: AppInfo) -> T | None
Attempt to assign a transmit struct to this instance.
- capture(sample: bool = True, flush: bool = False) -> None
Sample this struct and possibly send telemetry.
- framer_tx: SerializableFramer
- handle_update(timestamp_ns: int, instance: T, addr: tuple[str, int]) -> None
Handle individual struct updates.
- async process_datagram(data: bytes, addr: tuple[str, int]) -> bool
Process an array of struct instances.
- struct_kind: type[T]
- struct_rx: T | None = None
- struct_tx: T | None = None
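The idea of processing a datagram as an array of struct instances can be sketched with the standard library alone. The example below is not runtimepy's implementation: the record layout (`RECORD`), the synchronous signature, the callback shape, and the choice to return `False` for a malformed payload are all assumptions made for illustration.

```python
import struct
from typing import Callable

# Hypothetical fixed-size record: big-endian uint64 timestamp, uint16 sequence.
RECORD = struct.Struct("!QH")

Address = tuple[str, int]
UpdateHandler = Callable[[int, int, Address], None]


def process_datagram(
    data: bytes, addr: Address, handle_update: UpdateHandler
) -> bool:
    """Split a payload into fixed-size records and dispatch each one.

    Returns False (an assumed convention) when the payload length is not a
    whole number of records, in which case no handler is called.
    """
    if len(data) % RECORD.size != 0:
        return False

    # iter_unpack yields one (timestamp_ns, sequence) tuple per record.
    for timestamp_ns, sequence in RECORD.iter_unpack(data):
        handle_update(timestamp_ns, sequence, addr)
    return True
```

Checking the length before unpacking keeps a truncated datagram from being partially applied, so each payload is handled either completely or not at all.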