Source code for experimental_lowqa.tasks.conntextual
"""
A module capable of registering workflow tasks for invoking project-specific
conntextual tasks.
"""
# built-in
from pathlib import Path
from typing import Any, Dict, List
# third-party
from vcorelib.io import ARBITER
from vcorelib.io.types import FileExtension
from vcorelib.paths.context import tempfile
from vcorelib.task import Inbox, Outbox
from vcorelib.task.manager import TaskManager
from vcorelib.task.subprocess.run import SubprocessLogMixin
# internal
from experimental_lowqa.prompts import manual_select
[docs]
class ConntextualTask(SubprocessLogMixin):
"""A task for running conntextual."""
default_requirements = {
"vmklib.init",
"venv",
"python-install-conntextual",
}
[docs]
async def run(self, inbox: Inbox, outbox: Outbox, *args, **kwargs) -> bool:
"""Generate ninja configuration files."""
# check for data files in 'configs'
configs = {}
root: Path = args[0].joinpath("tasks")
for path in root.iterdir():
if path.is_file():
ext = FileExtension.from_path(path)
if ext is not None and ext.is_data():
configs[path.with_suffix("").name] = path
app = manual_select(
"app", configs, default=kwargs.get("app", "default")
)
result = False
if app is not None:
config_args = [str(configs[app])]
# Add extra data.
extra_data: Dict[str, Any] = args[1]
with tempfile(suffix=".yaml") as temp_config:
ARBITER.encode(temp_config, {"config": extra_data})
config_args.append(str(temp_config))
result = await self.exec(
str(
inbox["venv"]["venv{python_version}"]["bin"].joinpath(
"conntextual"
)
),
"ui",
*args[2],
*config_args,
)
return result
[docs]
def register(
manager: TaskManager,
project: str,
cwd: Path,
substitutions: Dict[str, str],
prefix: str = "r",
) -> bool:
"""Register project tasks to the manager."""
# May as well forward everything to the 'config' data.
extra_data = {
"project": project,
"cwd": str(cwd),
"substitutions": substitutions,
}
standard: List[str] = ["-n", "--no-poller"]
headless: List[str] = standard + ["-v", "headless"]
manager.register(ConntextualTask(prefix, cwd, extra_data, standard))
manager.register(ConntextualTask(prefix + "h", cwd, extra_data, headless))
manager.register(
ConntextualTask(prefix + "-{app}", cwd, extra_data, standard)
)
manager.register(
ConntextualTask(prefix + "h-{app}", cwd, extra_data, headless)
)
return True