runtimepy.task package#

Subpackages#

Submodules#

runtimepy.task.asynchronous module#

A module implementing an asynchronous task interface.

class runtimepy.task.asynchronous.AsyncTask(name: str, period_s: float, env: ChannelEnvironment, average_depth: int = 10, max_iterations: int = 0)[source]#

Bases: LoggerMixin

A basic implementation of a periodic task.

disable() None[source]#

Disable this task.

async dispatch(*_, **__) bool[source]#

Dispatch this task.

enable() None[source]#

Enable this task.

async init(*_, **__) bool[source]#

Initialize this task.

init_channels(env: ChannelEnvironment) None[source]#

Initialize task-specific channels.

log_metrics() None[source]#

Log information related to metrics channels.

property rate_str: str#

Get this periodic task’s rate as a string.

reset_metrics() None[source]#

Reset metrics channel values.

async run(*args, stop_sig: Event = None, **kwargs) None[source]#

Run this task while it’s enabled.

runtimepy.task.sample module#

A sample task interface.

class runtimepy.task.sample.Sample[source]#

Bases: TaskFactory[SampleTask]

A sample-task application factory.

kind#

alias of SampleTask

class runtimepy.task.sample.SampleApp[source]#

Bases: TaskFactory[SampleAppTask]

A TUI application factory.

kind#

alias of SampleAppTask

class runtimepy.task.sample.SampleAppTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#

Bases: ArbiterTask

A base TUI application.

app: AppInfo#
async dispatch() bool[source]#

Dispatch an iteration of this task.

async init(app: AppInfo) None[source]#

Initialize this task with application information.

class runtimepy.task.sample.SampleTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#

Bases: ArbiterTask, TrigMixin, AsyncCommandProcessingMixin

A sample application.

async dispatch() bool[source]#

Dispatch an iteration of this task.

async init(app: AppInfo) None[source]#

Initialize this task with application information.

Module contents#

A module implementing an asynchronous task interface.

class runtimepy.task.AsyncTask(name: str, period_s: float, env: ChannelEnvironment, average_depth: int = 10, max_iterations: int = 0)[source]#

Bases: LoggerMixin

A basic implementation of a periodic task.

disable() None[source]#

Disable this task.

async dispatch(*_, **__) bool[source]#

Dispatch this task.

enable() None[source]#

Enable this task.

async init(*_, **__) bool[source]#

Initialize this task.

init_channels(env: ChannelEnvironment) None[source]#

Initialize task-specific channels.

log_metrics() None[source]#

Log information related to metrics channels.

property rate_str: str#

Get this periodic task’s rate as a string.

reset_metrics() None[source]#

Reset metrics channel values.

async run(*args, stop_sig: Event = None, **kwargs) None[source]#

Run this task while it’s enabled.

class runtimepy.task.PeriodicTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#

Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, MarkdownMixin, ABC

A class implementing a simple periodic-task interface.

auto_finalize = True#
disable() bool[source]#

Disable this task, return whether or not any action was taken.

abstractmethod async dispatch() bool[source]#

Dispatch an iteration of this task.

async run(period_s: float = None, stop_sig: Event = None) None[source]#

Run this task by executing the dispatch method at the specified period until a dispatch iteration fails or the task is otherwise disabled.

set_period(period_s: float = None, update_default: bool = True) bool[source]#

Attempt to set a new period for this task.

async stop() bool[source]#

Wait for this task to stop running (if it is).

async stop_extra() None[source]#

Extra actions to perform when this task is stopping.

async task(period_s: float = None, stop_sig: Event = None) Task[None][source]#

Create an event-loop task for this periodic task.

async wait_for_disable(timeout: float, value: bool = False) EvalResult[source]#

Wait for a task to become disabled.

async wait_iterations(timeout: float, count: int = 1) bool[source]#

Wait for a task to complete a certain number of iterations.

class runtimepy.task.PeriodicTaskManager[source]#

Bases: Generic[T]

A class for managing periodic tasks as a single group.

register(task: T, period_s: float = None) bool[source]#

Register a periodic task.

running(stop_sig: Event = None) AsyncIterator[None][source]#

Run tasks as an async context.

async start(stop_sig: Event = None) None[source]#

Ensure tasks are started.

async stop() None[source]#

Ensure tasks are stopped.

property tasks: Iterator[T]#

Iterate over tasks.

class runtimepy.task.PeriodicTaskMetrics(dispatches: Uint32Primitive, rate_hz: FloatPrimitive, average_s: FloatPrimitive, max_s: FloatPrimitive, min_s: FloatPrimitive, overruns: Uint16Primitive)[source]#

Bases: NamedTuple

Metrics for a periodic task.

average_s: FloatPrimitive#

Alias for field number 2

static create(time_source: ~typing.Callable[[], int] = <function metrics_time_ns>) PeriodicTaskMetrics[source]#

Create a new metrics instance.

dispatches: Uint32Primitive#

Alias for field number 0

max_s: FloatPrimitive#

Alias for field number 3

measure(rate: RateTracker, dispatch: MovingAverage, iter_time: DoublePrimitive, period_s: float) Iterator[None][source]#

Measure the time spent yielding and update data.

min_s: FloatPrimitive#

Alias for field number 4

overruns: Uint16Primitive#

Alias for field number 5

rate_hz: FloatPrimitive#

Alias for field number 1