runtimepy.task.basic package
Submodules
runtimepy.task.basic.manager module
A module implementing a periodic-task manager.
runtimepy.task.basic.periodic module
A module implementing a basic periodic task.
- class runtimepy.task.basic.periodic.PeriodicTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)
Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, MarkdownMixin, ABC
A class implementing a simple periodic-task interface.
- auto_finalize = True
- config: _JsonObject
- env: ChannelEnvironment
- logger: LoggerType
- markdown: str
- async run(period_s: float = None, stop_sig: Event = None) → None
Run this task by executing the dispatch method at the specified period until a dispatch iteration fails or the task is otherwise disabled.
- set_period(period_s: float = None, update_default: bool = True) → bool
Attempt to set a new period for this task.
- async task(period_s: float = None, stop_sig: Event = None) → Task[None]
Create an event-loop task for this periodic.
- async wait_for_disable(timeout: float, value: bool = False) → EvalResult
Wait for a task to become disabled.
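The run behaviour described above (call a dispatch method once per period until an iteration fails or the task is disabled) can be illustrated with a small standalone sketch. This is not runtimepy's implementation: `SketchPeriodic`, `Counter`, the `enabled` flag and `demo` are hypothetical names invented for the illustration, and it assumes that a dispatch iteration signals failure by returning `False`. Only the standard-library `asyncio` and `abc` modules are used.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Optional


class SketchPeriodic(ABC):
    """Hypothetical stand-in for a periodic task (not runtimepy's class)."""

    def __init__(self, name: str, period_s: float = 1.0) -> None:
        self.name = name
        self.period_s = period_s
        self.enabled = False

    @abstractmethod
    async def dispatch(self) -> bool:
        """Run one iteration; returning False stops the loop (assumption)."""

    async def run(
        self,
        period_s: Optional[float] = None,
        stop_sig: Optional[asyncio.Event] = None,
    ) -> None:
        """Call dispatch once per period until it fails or a stop is requested."""
        if period_s is not None:
            self.period_s = period_s
        self.enabled = True
        while self.enabled:
            # An external stop signal ends the loop before the next dispatch.
            if stop_sig is not None and stop_sig.is_set():
                break
            if not await self.dispatch():
                break
            await asyncio.sleep(self.period_s)
        self.enabled = False


class Counter(SketchPeriodic):
    """Counts iterations and reports failure once the limit is reached."""

    def __init__(self, name: str, limit: int, period_s: float = 1.0) -> None:
        super().__init__(name, period_s=period_s)
        self.count = 0
        self.limit = limit

    async def dispatch(self) -> bool:
        self.count += 1
        return self.count < self.limit


def demo() -> int:
    """Run the counter to completion and return how many times it dispatched."""
    task = Counter("counter", limit=3, period_s=0.01)
    asyncio.run(task.run())
    return task.count
```

In this sketch, `demo()` dispatches three times before the loop ends. Scheduling the same coroutine with `asyncio.create_task(task.run())` from inside a running loop is one way to obtain an event-loop task for it, which is roughly the role the `task` method above describes.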
Module contents
A module implementing a simple periodic-task interface.
- class runtimepy.task.basic.PeriodicTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)
Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, MarkdownMixin, ABC
A class implementing a simple periodic-task interface.
- auto_finalize = True
- config: _JsonObject
- env: ChannelEnvironment
- logger: LoggerType
- markdown: str
- async run(period_s: float = None, stop_sig: Event = None) → None
Run this task by executing the dispatch method at the specified period until a dispatch iteration fails or the task is otherwise disabled.
- set_period(period_s: float = None, update_default: bool = True) → bool
Attempt to set a new period for this task.
- async task(period_s: float = None, stop_sig: Event = None) → Task[None]
Create an event-loop task for this periodic.
- async wait_for_disable(timeout: float, value: bool = False) → EvalResult
Wait for a task to become disabled.
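The idea of waiting, with a timeout, for a task to become disabled can be sketched with plain `asyncio` primitives. This is an illustration only: `DisableFlag`, `disable` and `demo` are hypothetical names, the sketch returns a plain `bool` where the documented method returns an `EvalResult`, and it does not model the `value` parameter, whose semantics are not described on this page.

```python
import asyncio


class DisableFlag:
    """Hypothetical helper tracking a 'disabled' state (not part of runtimepy)."""

    def __init__(self) -> None:
        self._disabled = asyncio.Event()

    def disable(self) -> None:
        """Mark the task as disabled and wake any waiters."""
        self._disabled.set()

    async def wait_for_disable(self, timeout: float) -> bool:
        """Return True if disabled within `timeout` seconds, else False."""
        try:
            await asyncio.wait_for(self._disabled.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False


async def demo() -> tuple:
    """First wait times out; second succeeds after a scheduled disable."""
    flag = DisableFlag()
    timed_out_result = await flag.wait_for_disable(0.01)
    # Schedule the disable shortly after the second wait begins.
    asyncio.get_running_loop().call_later(0.01, flag.disable)
    disabled_result = await flag.wait_for_disable(1.0)
    return timed_out_result, disabled_result
```

Running `asyncio.run(demo())` exercises both outcomes: a wait that expires before the flag is set, and a wait that completes once the flag is set.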