runtimepy.task.basic package#

Submodules#

runtimepy.task.basic.manager module#

A module implementing a periodic-task manager.

class runtimepy.task.basic.manager.PeriodicTaskManager[source]#

Bases: Generic[T]

A class for managing periodic tasks as a single group.

register(task: T, period_s: float = None) bool[source]#

Register a periodic task.

running(stop_sig: Event = None) AsyncIterator[None][source]#

Run tasks as an async context.

async start(stop_sig: Event = None) None[source]#

Ensure tasks are started.

async stop() None[source]#

Ensure tasks are stopped.

property tasks: Iterator[T]#

Iterate over tasks.

runtimepy.task.basic.periodic module#

A module implementing a basic periodic task.

class runtimepy.task.basic.periodic.PeriodicTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#

Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, MarkdownMixin, ABC

A class implementing a simple periodic-task interface.

auto_finalize = True#
config: _JsonObject#
disable() bool[source]#

Disable this task, return whether or not any action was taken.

abstractmethod async dispatch() bool[source]#

Dispatch an iteration of this task.

env: ChannelEnvironment#
logger: LoggerType#
markdown: str#
async run(period_s: float = None, stop_sig: Event = None) None[source]#

Run this task by executing the dispatch method at the specified period until a dispatch iteration fails or the task is otherwise disabled.

set_period(period_s: float = None, update_default: bool = True) bool[source]#

Attempt to set a new period for this task.

async stop() bool[source]#

Wait for this task to stop running (if it is).

async stop_extra() None[source]#

Extra actions to perform when this task is stopping.

async task(period_s: float = None, stop_sig: Event = None) Task[None][source]#

Create an event-loop task for this periodic.

async wait_for_disable(timeout: float, value: bool = False) EvalResult[source]#

Wait for a task to become disabled.

async wait_iterations(timeout: float, count: int = 1) bool[source]#

Wait for a task to complete a certain number of iterations.

Module contents#

A module implementing a simple periodic-task interface.

class runtimepy.task.basic.PeriodicTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#

Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, MarkdownMixin, ABC

A class implementing a simple periodic-task interface.

auto_finalize = True#
config: _JsonObject#
disable() bool[source]#

Disable this task, return whether or not any action was taken.

abstractmethod async dispatch() bool[source]#

Dispatch an iteration of this task.

env: ChannelEnvironment#
logger: LoggerType#
markdown: str#
async run(period_s: float = None, stop_sig: Event = None) None[source]#

Run this task by executing the dispatch method at the specified period until a dispatch iteration fails or the task is otherwise disabled.

set_period(period_s: float = None, update_default: bool = True) bool[source]#

Attempt to set a new period for this task.

async stop() bool[source]#

Wait for this task to stop running (if it is).

async stop_extra() None[source]#

Extra actions to perform when this task is stopping.

async task(period_s: float = None, stop_sig: Event = None) Task[None][source]#

Create an event-loop task for this periodic.

async wait_for_disable(timeout: float, value: bool = False) EvalResult[source]#

Wait for a task to become disabled.

async wait_iterations(timeout: float, count: int = 1) bool[source]#

Wait for a task to complete a certain number of iterations.

class runtimepy.task.basic.PeriodicTaskManager[source]#

Bases: Generic[T]

A class for managing periodic tasks as a single group.

register(task: T, period_s: float = None) bool[source]#

Register a periodic task.

running(stop_sig: Event = None) AsyncIterator[None][source]#

Run tasks as an async context.

async start(stop_sig: Event = None) None[source]#

Ensure tasks are started.

async stop() None[source]#

Ensure tasks are stopped.

property tasks: Iterator[T]#

Iterate over tasks.