runtimepy.task package#
Subpackages#
- runtimepy.task.basic package
- Submodules
- runtimepy.task.basic.manager module
- runtimepy.task.basic.periodic module
PeriodicTask
PeriodicTask.auto_finalize
PeriodicTask.config
PeriodicTask.disable()
PeriodicTask.dispatch()
PeriodicTask.env
PeriodicTask.logger
PeriodicTask.markdown
PeriodicTask.run()
PeriodicTask.set_period()
PeriodicTask.stop()
PeriodicTask.stop_extra()
PeriodicTask.task()
PeriodicTask.wait_for_disable()
PeriodicTask.wait_iterations()
- Module contents
PeriodicTask
PeriodicTask.auto_finalize
PeriodicTask.config
PeriodicTask.disable()
PeriodicTask.dispatch()
PeriodicTask.env
PeriodicTask.logger
PeriodicTask.markdown
PeriodicTask.run()
PeriodicTask.set_period()
PeriodicTask.stop()
PeriodicTask.stop_extra()
PeriodicTask.task()
PeriodicTask.wait_for_disable()
PeriodicTask.wait_iterations()
PeriodicTaskManager
- runtimepy.task.trig package
Submodules#
runtimepy.task.asynchronous module#
A module implementing an asynchronous task interface.
- class runtimepy.task.asynchronous.AsyncTask(name: str, period_s: float, env: ChannelEnvironment, average_depth: int = 10, max_iterations: int = 0)[source]#
Bases:
LoggerMixin
A basic implementation of a periodic task.
- init_channels(env: ChannelEnvironment) None [source]#
Initialize task-specific channels.
- property rate_str: str#
Get this periodic’s rate as a string.
runtimepy.task.sample module#
A sample task interface.
- class runtimepy.task.sample.Sample[source]#
Bases:
TaskFactory
[SampleTask
]A sample-task application factory.
- kind#
alias of
SampleTask
- class runtimepy.task.sample.SampleApp[source]#
Bases:
TaskFactory
[SampleAppTask
]A TUI application factory.
- kind#
alias of
SampleAppTask
- class runtimepy.task.sample.SampleAppTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#
Bases:
ArbiterTask
A base TUI application.
- class runtimepy.task.sample.SampleTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#
Bases:
ArbiterTask
,TrigMixin
,AsyncCommandProcessingMixin
A sample application.
Module contents#
A module implementing an asynchronous task interface.
- class runtimepy.task.AsyncTask(name: str, period_s: float, env: ChannelEnvironment, average_depth: int = 10, max_iterations: int = 0)[source]#
Bases:
LoggerMixin
A basic implementation of a periodic task.
- init_channels(env: ChannelEnvironment) None [source]#
Initialize task-specific channels.
- property rate_str: str#
Get this periodic’s rate as a string.
- class runtimepy.task.PeriodicTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#
Bases:
LoggerMixinLevelControl
,ChannelEnvironmentMixin
,MarkdownMixin
,ABC
A class implementing a simple periodic-task interface.
- auto_finalize = True#
- async run(period_s: float = None, stop_sig: Event = None) None [source]#
Run this task by executing the dispatch method at the specified period until a dispatch iteration fails or the task is otherwise disabled.
- set_period(period_s: float = None, update_default: bool = True) bool [source]#
Attempt to set a new period for this task.
- async task(period_s: float = None, stop_sig: Event = None) Task[None] [source]#
Create an event-loop task for this periodic.
- async wait_for_disable(timeout: float, value: bool = False) EvalResult [source]#
Wait for a task to become disabled.
- class runtimepy.task.PeriodicTaskManager[source]#
Bases:
Generic
[T
]A class for managing periodic tasks as a single group.
- property tasks: Iterator[T]#
Iterate over tasks.
- class runtimepy.task.PeriodicTaskMetrics(dispatches: Uint32Primitive, rate_hz: FloatPrimitive, average_s: FloatPrimitive, max_s: FloatPrimitive, min_s: FloatPrimitive, overruns: Uint16Primitive)[source]#
Bases:
NamedTuple
Metrics for a periodic tasks.
- average_s: FloatPrimitive#
Alias for field number 2
- static create(time_source: ~typing.Callable[[], int] = <function metrics_time_ns>) PeriodicTaskMetrics [source]#
Create a new metrics instance.
- dispatches: Uint32Primitive#
Alias for field number 0
- max_s: FloatPrimitive#
Alias for field number 3
- measure(rate: RateTracker, dispatch: MovingAverage, iter_time: DoublePrimitive, period_s: float) Iterator[None] [source]#
Measure the time spent yielding and update data.
- min_s: FloatPrimitive#
Alias for field number 4
- overruns: Uint16Primitive#
Alias for field number 5
- rate_hz: FloatPrimitive#
Alias for field number 1