runtimepy.channel package#
Subpackages#
- runtimepy.channel.environment package
- Subpackages
- Submodules
- runtimepy.channel.environment.array module
- runtimepy.channel.environment.base module
BaseChannelEnvironment
BaseChannelEnvironment.add_field()
BaseChannelEnvironment.add_int()
BaseChannelEnvironment.age_ns()
BaseChannelEnvironment.apply()
BaseChannelEnvironment.exists()
BaseChannelEnvironment.field_or_channel()
BaseChannelEnvironment.finalize()
BaseChannelEnvironment.get()
BaseChannelEnvironment.get_bool()
BaseChannelEnvironment.get_float()
BaseChannelEnvironment.get_int()
BaseChannelEnvironment.sample_bool_for()
BaseChannelEnvironment.sample_enum_for()
BaseChannelEnvironment.sample_float_for()
BaseChannelEnvironment.sample_int_for()
BaseChannelEnvironment.set()
BaseChannelEnvironment.value()
BaseChannelEnvironment.values()
BaseChannelEnvironment.wait_for_bool()
BaseChannelEnvironment.wait_for_enum()
BaseChannelEnvironment.wait_for_numeric()
BaseChannelEnvironment.wait_for_numeric_isclose()
- runtimepy.channel.environment.create module
- runtimepy.channel.environment.file module
- runtimepy.channel.environment.sample module
- runtimepy.channel.environment.telemetry module
- Module contents
- runtimepy.channel.event package
Submodules#
runtimepy.channel.registry module#
A module implementing a channel registry.
- class runtimepy.channel.registry.ChannelNameRegistry(mapping: MutableMapping[int, str] = None, reverse: MutableMapping[str, int] = None)[source]#
Bases:
NameRegistry
A name registry with a name-matching pattern for channel names.
- name_regex: _Pattern | None = re.compile('^[a-zA-Z0-9_.-]+$')#
- class runtimepy.channel.registry.ChannelRegistry(data: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, schemas: SchemaMap = None, dest_attr: str = 'data', verify: bool = True)[source]#
-
A runtime enumeration registry.
- channel(name: str, kind: Primitive[Any] | type[Int8Primitive | Int16Primitive | Int32Primitive | Int64Primitive | Uint8Primitive | Uint16Primitive | Uint32Primitive | Uint64Primitive | HalfPrimitive | FloatPrimitive | DoublePrimitive | BooleanPrimitive] | str, commandable: bool = False, enum: str | int = None, scaling: list[float | int] = None, description: str = None, default: int | float | bool | None = None, controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = None, **kwargs) Channel[Int8Primitive] | Channel[Int16Primitive] | Channel[Int32Primitive] | Channel[Int64Primitive] | Channel[Uint8Primitive] | Channel[Uint16Primitive] | Channel[Uint32Primitive] | Channel[Uint64Primitive] | Channel[FloatPrimitive] | Channel[DoublePrimitive] | Channel[BooleanPrimitive] | None [source]#
Create a new channel.
- event_fifo: ByteFifo#
- header_ready: bool#
- init(data: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]]) None [source]#
Perform implementation-specific initialization.
- name_registry#
alias of
ChannelNameRegistry
- registered(stream: BinaryIO, pattern: str = '.*', exact: bool = False, flush: bool = False, channel: ChannelMetrics = None) Iterator[list[str]] [source]#
Register a stream as a managed context. Returns a list of all channels registered.
- class runtimepy.channel.registry.ParsedEvent(name: str, timestamp: int, value: bool | int | float)[source]#
Bases:
NamedTuple
A raw channel event.
- static by_channel(event_stream: Iterable[ParsedEvent]) dict[str, list[ParsedEvent]] [source]#
Get a dictionary of channel events broken down by individual channels.
- name: str#
Alias for field number 0
- timestamp: int#
Alias for field number 1
- value: bool | int | float#
Alias for field number 2
Module contents#
A module implementing a basic channel interface.
- class runtimepy.channel.Channel(data: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None, schemas: SchemaMap = None, dest_attr: str = 'data', verify: bool = True)[source]#
Bases:
RegistryItem
,EnumMixin
,Generic
[T
]An interface for an individual channel.
- asdict() dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] [source]#
Obtain a dictionary representing this instance.
- default: int | float | bool | None#
- property has_default: bool#
Determine if this channel has a default value configured.
- init(data: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]]) None [source]#
Perform implementation-specific initialization.
- raw: T#
- set_default(default: int | float | bool | None = None) None [source]#
Set a new default value for this channel.
- property type: Int8Type | Int16Type | Int32Type | Int64Type | Uint8Type | Uint16Type | Uint32Type | Uint64Type | HalfType | FloatType | DoubleType | BooleanType#
Get the underlying primitive type of this channel.