Skip to content

pipeline

Modules:

Classes:

Functions:

AuditEntry

Bases: BaseModel

BaseSkill

BaseSkill(metadata: SkillMetadata)

Methods:

Source code in src/hiperhealth/pipeline/skill.py
def __init__(self, metadata: SkillMetadata) -> None:
    """
    title: Initialize a base skill with immutable metadata.
    parameters:
      metadata:
        type: SkillMetadata
    """
    self.metadata = metadata

check_requirements

check_requirements(
    stage: str, ctx: PipelineContext
) -> list[Inquiry]

Override to return a list of Inquiry objects describing what additional data the skill needs. The default implementation returns an empty list (no extra data needed). Inquiries use three priority levels: - required: must have before this stage can run - supplementary: improves results, available now - deferred: only available after a future pipeline step parameters: stage: type: str ctx: type: PipelineContext returns: type: list[Inquiry]

Source code in src/hiperhealth/pipeline/skill.py
def check_requirements(
    self, stage: str, ctx: PipelineContext
) -> list[Inquiry]:
    """
    title: Determine what information is needed before execution.
    summary: |-
      Override to return a list of Inquiry objects describing
      what additional data the skill needs.  The default
      implementation returns an empty list (no extra data needed).
      Inquiries use three priority levels:
      - required: must have before this stage can run
      - supplementary: improves results, available now
      - deferred: only available after a future pipeline step
    parameters:
      stage:
        type: str
      ctx:
        type: PipelineContext
    returns:
      type: list[Inquiry]
    """
    return []

execute

execute(
    stage: str, ctx: PipelineContext
) -> PipelineContext
Source code in src/hiperhealth/pipeline/skill.py
def execute(self, stage: str, ctx: PipelineContext) -> PipelineContext:
    """
    title: The main execution hook for a stage.
    parameters:
      stage:
        type: str
      ctx:
        type: PipelineContext
    returns:
      type: PipelineContext
    """
    return ctx

post

post(stage: str, ctx: PipelineContext) -> PipelineContext
Source code in src/hiperhealth/pipeline/skill.py
def post(self, stage: str, ctx: PipelineContext) -> PipelineContext:
    """
    title: Called after the stage's main execution.
    parameters:
      stage:
        type: str
      ctx:
        type: PipelineContext
    returns:
      type: PipelineContext
    """
    return ctx

pre

pre(stage: str, ctx: PipelineContext) -> PipelineContext
Source code in src/hiperhealth/pipeline/skill.py
def pre(self, stage: str, ctx: PipelineContext) -> PipelineContext:
    """
    title: Called before the stage's main execution.
    parameters:
      stage:
        type: str
      ctx:
        type: PipelineContext
    returns:
      type: PipelineContext
    """
    return ctx

ChannelManifest

Bases: BaseModel

Inquiry

Bases: BaseModel

PipelineContext

Bases: BaseModel

Serializable context that flows between independently executed stages. summary: |- Callers can serialize this to JSON between invocations that may happen hours or days apart, by different actors. attributes: patient: type: dict[str, Any] language: type: str session_id: type: str | None results: type: dict[str, Any] audit: type: list[AuditEntry] extras: type: dict[str, Any]

Session

Session(path: Path, language: str = 'en')

System X creates or loads a session, provides clinical data, and uses the runner to assess / execute stages. The parquet file is the single source of truth. attributes: path: type: Path _language: type: str _events: type: list[dict[str, Any]]

Methods:

Attributes:

Source code in src/hiperhealth/pipeline/session.py
def __init__(self, path: Path, language: str = 'en') -> None:
    """
    title: Initialize an in-memory session wrapper for a parquet file.
    parameters:
      path:
        type: Path
      language:
        type: str
    """
    self.path: Path = path
    self._language: str = language
    self._events: list[dict[str, Any]] = []

clinical_data property

clinical_data: dict[str, Any]

events property

events: list[dict[str, Any]]

language property

language: str

pending_inquiries property

pending_inquiries: list[Inquiry]

results property

results: dict[str, Any]

stages_completed property

stages_completed: list[str]

create classmethod

create(path: str | Path, language: str = 'en') -> Session
Source code in src/hiperhealth/pipeline/session.py
@classmethod
def create(
    cls,
    path: str | Path,
    language: str = 'en',
) -> Session:
    """
    title: Create a new session file.
    parameters:
      path:
        type: str | Path
      language:
        type: str
    returns:
      type: Session
    """
    path = Path(path)
    if path.exists():
        msg = f'Session file already exists: {path}'
        raise FileExistsError(msg)
    session = cls(path, language=language)
    session._save()
    return session

load classmethod

load(path: str | Path) -> Session
Source code in src/hiperhealth/pipeline/session.py
@classmethod
def load(cls, path: str | Path) -> Session:
    """
    title: Load an existing session from a parquet file.
    parameters:
      path:
        type: str | Path
    returns:
      type: Session
    """
    path = Path(path)
    if not path.exists():
        msg = f'Session file not found: {path}'
        raise FileNotFoundError(msg)
    session = cls(path)
    session._load()
    return session

provide_answers

provide_answers(answers: dict[str, Any]) -> None
Source code in src/hiperhealth/pipeline/session.py
def provide_answers(self, answers: dict[str, Any]) -> None:
    """
    title: Provide answers to inquiries.
    parameters:
      answers:
        type: dict[str, Any]
    """
    self._append_event(
        'answers_provided',
        data={'fields': answers},
    )

record_event

record_event(
    event_type: str,
    stage: str | None = None,
    skill_name: str | None = None,
    data: dict[str, Any] | None = None,
) -> None
Source code in src/hiperhealth/pipeline/session.py
def record_event(
    self,
    event_type: str,
    stage: str | None = None,
    skill_name: str | None = None,
    data: dict[str, Any] | None = None,
) -> None:
    """
    title: Record an arbitrary event (used by the runner).
    parameters:
      event_type:
        type: str
      stage:
        type: str | None
      skill_name:
        type: str | None
      data:
        type: dict[str, Any] | None
    """
    self._append_event(
        event_type,
        stage=stage,
        skill_name=skill_name,
        data=data,
    )

set_clinical_data

set_clinical_data(fields: dict[str, Any]) -> None
Source code in src/hiperhealth/pipeline/session.py
def set_clinical_data(self, fields: dict[str, Any]) -> None:
    """
    title: Provide clinical information (no PII).
    parameters:
      fields:
        type: dict[str, Any]
    """
    self._append_event(
        'clinical_data_set',
        data={'fields': fields},
    )

to_context

to_context() -> PipelineContext
Source code in src/hiperhealth/pipeline/session.py
def to_context(self) -> PipelineContext:
    """
    title: Build a PipelineContext from current session state.
    returns:
      type: PipelineContext
    """
    return PipelineContext(
        patient=self.clinical_data,
        language=self._language,
        session_id=self.path.stem,
        results=self.results,
        extras={},
    )

update_from_context

update_from_context(
    stage: str, ctx: PipelineContext
) -> None
Source code in src/hiperhealth/pipeline/session.py
def update_from_context(
    self,
    stage: str,
    ctx: PipelineContext,
) -> None:
    """
    title: Capture results after a stage runs.
    parameters:
      stage:
        type: str
      ctx:
        type: PipelineContext
    """
    stage_result = ctx.results.get(stage)
    result_data: Any
    if stage_result is not None:
        if hasattr(stage_result, 'model_dump'):
            result_data = stage_result.model_dump()
        else:
            result_data = stage_result
    else:
        result_data = {}
    self._append_event(
        'stage_completed',
        stage=stage,
        data={'results': result_data},
    )

Skill

Bases: Protocol

Methods:

check_requirements

check_requirements(
    stage: str, ctx: PipelineContext
) -> list[Inquiry]
Source code in src/hiperhealth/pipeline/skill.py
def check_requirements(
    self, stage: str, ctx: PipelineContext
) -> list[Inquiry]:
    """
    title: Describe what information the skill still needs.
    parameters:
      stage:
        type: str
      ctx:
        type: PipelineContext
    returns:
      type: list[Inquiry]
    """
    ...

execute

execute(
    stage: str, ctx: PipelineContext
) -> PipelineContext
Source code in src/hiperhealth/pipeline/skill.py
def execute(self, stage: str, ctx: PipelineContext) -> PipelineContext:
    """
    title: Run the main execution hook for a stage.
    parameters:
      stage:
        type: str
      ctx:
        type: PipelineContext
    returns:
      type: PipelineContext
    """
    ...

post

post(stage: str, ctx: PipelineContext) -> PipelineContext
Source code in src/hiperhealth/pipeline/skill.py
def post(self, stage: str, ctx: PipelineContext) -> PipelineContext:
    """
    title: Run the post-execution hook for a stage.
    parameters:
      stage:
        type: str
      ctx:
        type: PipelineContext
    returns:
      type: PipelineContext
    """
    ...

pre

pre(stage: str, ctx: PipelineContext) -> PipelineContext
Source code in src/hiperhealth/pipeline/skill.py
def pre(self, stage: str, ctx: PipelineContext) -> PipelineContext:
    """
    title: Run the pre-execution hook for a stage.
    parameters:
      stage:
        type: str
      ctx:
        type: PipelineContext
    returns:
      type: PipelineContext
    """
    ...

SkillMetadata dataclass

SkillMetadata(
    name: str,
    version: str = '0.1.0',
    stages: tuple[str, ...] = (),
    description: str = '',
)

SkillRegistry

SkillRegistry(registry_dir: Path | None = None)

Methods:

Attributes:

Source code in src/hiperhealth/pipeline/registry.py
def __init__(self, registry_dir: Path | None = None) -> None:
    """
    title: Initialize the channel-aware skill registry.
    parameters:
      registry_dir:
        type: Path | None
    """
    self._registry_dir = (
        registry_dir
        or Path.home() / '.hiperhealth' / 'artifacts' / 'skills'
    )
    if (
        self._registry_dir.name == 'skills'
        and self._registry_dir.parent.name == 'artifacts'
    ):
        self._root_dir = self._registry_dir.parent.parent
    else:
        self._root_dir = self._registry_dir.parent
    self._state_dir = self._root_dir / 'registry'
    self._channels_dir = self._root_dir / 'channels'
    self._builtin_dir = Path(__file__).resolve().parent.parent / 'skills'

registry_dir property

registry_dir: Path

root_dir property

root_dir: Path

add_channel

add_channel(
    source: str,
    local_name: str | None = None,
    ref: str | None = None,
) -> str
Source code in src/hiperhealth/pipeline/registry.py
def add_channel(
    self,
    source: str,
    local_name: str | None = None,
    ref: str | None = None,
) -> str:
    """
    title: Register a new local-folder or Git-backed skill channel.
    parameters:
      source:
        type: str
      local_name:
        type: str | None
      ref:
        type: str | None
    returns:
      type: str
    """
    self._ensure_storage_dirs()
    state = self._load_state()
    with tempfile.TemporaryDirectory() as tmp_dir:
        temp_repo = Path(tmp_dir) / 'repo'
        normalized_source, is_local = self._materialize_channel_source(
            source,
            temp_repo,
            ref=ref,
        )
        self._detect_source_kind(temp_repo)
        channel_manifest = self._read_channel_manifest(temp_repo)
        resolved_name = self._resolve_local_name(
            channel_manifest, local_name, state
        )
        target_repo = self._channel_repo_dir(resolved_name)
        if target_repo.exists():
            msg = f'Channel alias {resolved_name!r} is already registered.'
            raise ValueError(msg)
        target_repo.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(temp_repo), str(target_repo))

    record = self._channel_record_from_repo(
        resolved_name,
        normalized_source,
        target_repo,
        ref=None if is_local else ref,
    )
    state.channels[resolved_name] = record
    self._save_state(state)
    self._save_channel_metadata(resolved_name, record, target_repo)
    return resolved_name

install_channel

install_channel(
    local_name: str, include_disabled: bool = False
) -> list[str]
Source code in src/hiperhealth/pipeline/registry.py
def install_channel(
    self, local_name: str, include_disabled: bool = False
) -> list[str]:
    """
    title: Install all eligible skills from a registered channel.
    parameters:
      local_name:
        type: str
      include_disabled:
        type: bool
    returns:
      type: list[str]
    """
    state = self._load_state()
    if local_name not in state.channels:
        msg = f'Channel {local_name!r} is not registered.'
        raise KeyError(msg)

    installed: list[str] = []
    for entry in self._iter_channel_skill_entries(local_name):
        if not include_disabled and not entry.available.enabled:
            continue
        installed.append(self.install_skill(entry.available.canonical_id))
    return sorted(installed)

install_skill

install_skill(skill_id: str) -> str
Source code in src/hiperhealth/pipeline/registry.py
def install_skill(self, skill_id: str) -> str:
    """
    title: Install one skill from a registered channel.
    parameters:
      skill_id:
        type: str
    returns:
      type: str
    """
    if skill_id.startswith(f'{BUILTIN_CHANNEL}.'):
        msg = 'Built-in hiperhealth skills do not need installation.'
        raise ValueError(msg)

    state = self._load_state()
    available = self._find_available_channel_skill(skill_id)
    if available is None:
        msg = (
            f'Skill {skill_id!r} is not available from any registered '
            'channel.'
        )
        raise KeyError(msg)

    existing = state.skills.get(skill_id)
    channel = state.channels[available.available.channel]
    installed_at = existing.installed_at if existing else _utcnow()
    state.skills[skill_id] = InstalledSkillRecord(
        id=skill_id,
        channel=available.available.channel,
        skill_name=available.available.name,
        manifest_path=str(available.manifest_path),
        installed_at=installed_at,
        updated_at=_utcnow(),
        version=available.manifest.version,
        source_commit=channel.commit,
        enabled=available.available.enabled,
    )
    self._save_state(state)
    self._install_dependencies(available.manifest.dependencies)
    return skill_id

list_channel_skills

list_channel_skills(
    local_name: str,
) -> list[AvailableSkillRecord]
Source code in src/hiperhealth/pipeline/registry.py
def list_channel_skills(
    self, local_name: str
) -> list[AvailableSkillRecord]:
    """
    title: List the skills declared by one registered channel.
    parameters:
      local_name:
        type: str
    returns:
      type: list[AvailableSkillRecord]
    """
    state = self._load_state()
    if local_name not in state.channels:
        msg = f'Channel {local_name!r} is not registered.'
        raise KeyError(msg)
    entries = self._iter_channel_skill_entries(local_name)
    return [
        entry.available
        for entry in sorted(
            entries,
            key=lambda entry: entry.available.canonical_id,
        )
    ]

list_channels

list_channels() -> list[ChannelRecord]
Source code in src/hiperhealth/pipeline/registry.py
def list_channels(self) -> list[ChannelRecord]:
    """
    title: List all registered channels.
    returns:
      type: list[ChannelRecord]
    """
    state = self._load_state()
    return [state.channels[name] for name in sorted(state.channels.keys())]

list_skills

list_skills(
    channel: str | None = None, installed_only: bool = False
) -> list[SkillSummary]
Source code in src/hiperhealth/pipeline/registry.py
def list_skills(
    self,
    channel: str | None = None,
    installed_only: bool = False,
) -> list[SkillSummary]:
    """
    title: List built-in and channel skills known to the registry.
    parameters:
      channel:
        type: str | None
      installed_only:
        type: bool
    returns:
      type: list[SkillSummary]
    """
    state = self._load_state()
    summaries: list[SkillSummary] = []

    if channel in (None, BUILTIN_CHANNEL):
        for skill_dir, manifest in self._iter_builtin_skill_entries():
            skill_name = self._builtin_skill_name(manifest, skill_dir)
            canonical_id = _canonical_skill_id(BUILTIN_CHANNEL, skill_name)
            summaries.append(
                SkillSummary(
                    **manifest.model_dump(),
                    channel=BUILTIN_CHANNEL,
                    skill_name=skill_name,
                    canonical_id=canonical_id,
                    manifest_path=str(skill_dir / 'skill.yaml'),
                    installed=True,
                    enabled=True,
                    builtin=True,
                )
            )

    if channel is None:
        channel_names = sorted(state.channels.keys())
    elif channel in state.channels:
        channel_names = [channel]
    else:
        channel_names = []

    for local_name in channel_names:
        installed_ids = {
            skill_id
            for skill_id, record in state.skills.items()
            if record.channel == local_name
        }
        for entry in self._iter_channel_skill_entries(local_name):
            if (
                installed_only
                and entry.available.canonical_id not in installed_ids
            ):
                continue
            summaries.append(
                SkillSummary(
                    **entry.manifest.model_dump(),
                    channel=local_name,
                    skill_name=entry.available.name,
                    canonical_id=entry.available.canonical_id,
                    manifest_path=str(entry.manifest_path),
                    installed=entry.available.canonical_id
                    in installed_ids,
                    enabled=entry.available.enabled,
                    tags=list(entry.available.tags),
                )
            )

    unique: dict[str, SkillSummary] = {
        summary.canonical_id: summary for summary in summaries
    }
    return [unique[key] for key in sorted(unique.keys())]

load

load(name: str) -> BaseSkill
Source code in src/hiperhealth/pipeline/registry.py
def load(self, name: str) -> BaseSkill:
    """
    title: Load a built-in or installed channel skill by name.
    parameters:
      name:
        type: str
    returns:
      type: BaseSkill
    """
    for skill_dir, manifest in self._iter_builtin_skill_entries():
        if manifest.name != name:
            continue
        package_base = f'hiperhealth.skills.{skill_dir.name}'
        cls = _load_class_from_package(package_base, manifest.entry_point)
        skill = cls()
        return self._normalize_loaded_skill(skill, manifest.name)

    state = self._load_state()
    record = state.skills.get(name)
    if record is not None:
        return self._load_channel_skill(record)

    if self._find_available_channel_skill(name) is not None:
        msg = (
            f'Skill {name!r} is available but not installed. Use '
            f'install_skill({name!r}) first.'
        )
        raise KeyError(msg)

    msg = (
        f'Skill {name!r} not found. Use list_skills() to inspect '
        'available skills and install_skill() or install_channel() to add '
        'them.'
    )
    raise KeyError(msg)

remove_channel

remove_channel(local_name: str) -> None
Source code in src/hiperhealth/pipeline/registry.py
def remove_channel(self, local_name: str) -> None:
    """
    title: Remove a registered channel and its installed skills.
    parameters:
      local_name:
        type: str
    """
    state = self._load_state()
    if local_name not in state.channels:
        msg = f'Channel {local_name!r} is not registered.'
        raise KeyError(msg)

    for skill_id, record in list(state.skills.items()):
        if record.channel != local_name:
            continue
        state.skills.pop(skill_id, None)

    shutil.rmtree(self._channel_dir(local_name), ignore_errors=True)
    state.channels.pop(local_name, None)
    self._save_state(state)

remove_skill

remove_skill(skill_id: str) -> None
Source code in src/hiperhealth/pipeline/registry.py
def remove_skill(self, skill_id: str) -> None:
    """
    title: Remove one installed channel skill from the registry.
    parameters:
      skill_id:
        type: str
    """
    if skill_id.startswith(f'{BUILTIN_CHANNEL}.'):
        msg = 'Built-in hiperhealth skills cannot be removed.'
        raise ValueError(msg)

    state = self._load_state()
    record = state.skills.get(skill_id)
    if record is None:
        msg = f'Skill {skill_id!r} is not installed.'
        raise KeyError(msg)

    state.skills.pop(skill_id, None)
    self._save_state(state)

update_channel

update_channel(
    local_name: str, ref: str | None = None
) -> list[str]
Source code in src/hiperhealth/pipeline/registry.py
def update_channel(
    self, local_name: str, ref: str | None = None
) -> list[str]:
    """
    title: Refresh a channel checkout and its installed skills.
    parameters:
      local_name:
        type: str
      ref:
        type: str | None
    returns:
      type: list[str]
    """
    state = self._load_state()
    channel = state.channels.get(local_name)
    if channel is None:
        msg = f'Channel {local_name!r} is not registered.'
        raise KeyError(msg)

    repo_dir = self._channel_repo_dir(local_name)
    target_ref: str | None = None
    if channel.provider == 'local':
        if ref is not None:
            msg = 'ref is only supported for remote git sources.'
            raise ValueError(msg)
        source_dir = Path(channel.source)
        if not source_dir.is_dir():
            msg = (
                f'Local channel source {channel.source!r} no longer '
                'exists.'
            )
            raise FileNotFoundError(msg)
        self._copy_source_tree(source_dir, repo_dir)
    else:
        target_ref = ref if ref is not None else channel.ref
        self._update_repo(repo_dir, ref=target_ref)
    refreshed_channel = self._channel_record_from_repo(
        local_name,
        channel.source,
        repo_dir,
        registered_at=channel.registered_at,
        ref=target_ref,
    )
    state.channels[local_name] = refreshed_channel

    available_map = {
        entry.available.canonical_id: entry
        for entry in self._iter_channel_skill_entries(local_name)
    }
    updated: list[str] = []
    for skill_id, record in list(state.skills.items()):
        if record.channel != local_name:
            continue
        available = available_map.get(skill_id)
        if available is None:
            state.skills.pop(skill_id, None)
            continue

        self._install_dependencies(available.manifest.dependencies)
        state.skills[skill_id] = InstalledSkillRecord(
            id=skill_id,
            channel=local_name,
            skill_name=available.available.name,
            manifest_path=str(available.manifest_path),
            installed_at=record.installed_at,
            updated_at=_utcnow(),
            version=available.manifest.version,
            source_commit=refreshed_channel.commit,
            enabled=available.available.enabled,
        )
        updated.append(skill_id)

    self._save_state(state)
    self._save_channel_metadata(local_name, refreshed_channel, repo_dir)
    return sorted(updated)

update_skill

update_skill(
    skill_id: str, pull_channel: bool = False
) -> str
Source code in src/hiperhealth/pipeline/registry.py
def update_skill(self, skill_id: str, pull_channel: bool = False) -> str:
    """
    title: Refresh one installed skill, optionally pulling its channel.
    parameters:
      skill_id:
        type: str
      pull_channel:
        type: bool
    returns:
      type: str
    """
    state = self._load_state()
    record = state.skills.get(skill_id)
    if record is None:
        msg = f'Skill {skill_id!r} is not installed.'
        raise KeyError(msg)

    if record.channel is None:
        msg = f'Skill {skill_id!r} has no owning channel.'
        raise ValueError(msg)

    if pull_channel:
        self.update_channel(record.channel)
        return skill_id

    available = self._find_available_channel_skill(skill_id)
    if available is None:
        msg = (
            f'Skill {skill_id!r} is no longer declared by channel '
            f'{record.channel!r}.'
        )
        raise KeyError(msg)

    channel = state.channels[record.channel]
    state.skills[skill_id] = InstalledSkillRecord(
        id=skill_id,
        channel=record.channel,
        skill_name=available.available.name,
        manifest_path=str(available.manifest_path),
        installed_at=record.installed_at,
        updated_at=_utcnow(),
        version=available.manifest.version,
        source_commit=channel.commit,
        enabled=available.available.enabled,
    )
    self._save_state(state)
    self._install_dependencies(available.manifest.dependencies)
    return skill_id

Stage

Bases: str, Enum

Custom string stage names are also accepted by the StageRunner; this enum provides the standard built-in stages.

StageRunner

StageRunner(
    skills: list[Skill] | None = None,
    registry: SkillRegistry | None = None,
)

Each stage can be run independently, at any time, by any actor. The primary API is run() for single-stage execution. run_many() is a convenience for sequential batch execution. attributes: _skills: type: list[Skill] _registry: description: Value for _registry. _disabled_skill_names: type: set[str]

Methods:

Attributes:

Source code in src/hiperhealth/pipeline/runner.py
def __init__(
    self,
    skills: list[Skill] | None = None,
    registry: SkillRegistry | None = None,
) -> None:
    """
    title: Initialize the stage runner with optional skills.
    parameters:
      skills:
        type: list[Skill] | None
      registry:
        type: SkillRegistry | None
    """
    self._skills: list[Skill] = list(skills or [])
    self._registry = registry
    self._disabled_skill_names: set[str] = set()

skills property

skills: list[Skill]

check_requirements

check_requirements(
    stage: str,
    session: Session,
    *,
    disabled_skills: str | Collection[str] | None = None,
    **kwargs: Any,
) -> list[Inquiry]

Builds a PipelineContext from the session, calls skill.check_requirements() for every skill registered on the given stage, and records events in the session file. Inquiries are returned with three priority levels: - required: must have before this stage can run - supplementary: improves results, available now - deferred: only available after a future pipeline step parameters: stage: type: str session: type: Session disabled_skills: type: str | Collection[str] | None kwargs: type: Any variadic: keyword returns: type: list[Inquiry]

Source code in src/hiperhealth/pipeline/runner.py
def check_requirements(
    self,
    stage: str,
    session: Session,
    *,
    disabled_skills: str | Collection[str] | None = None,
    **kwargs: Any,
) -> list[Inquiry]:
    """
    title: Ask relevant skills what information they need.
    summary: |-
      Builds a PipelineContext from the session, calls
      ``skill.check_requirements()`` for every skill registered
      on the given stage, and records events in the session file.
      Inquiries are returned with three priority levels:
      - required: must have before this stage can run
      - supplementary: improves results, available now
      - deferred: only available after a future pipeline step
    parameters:
      stage:
        type: str
      session:
        type: Session
      disabled_skills:
        type: str | Collection[str] | None
      kwargs:
        type: Any
        variadic: keyword
    returns:
      type: list[Inquiry]
    """
    ctx = session.to_context()
    ctx.extras['_run_kwargs'] = kwargs
    session.record_event('check_requirements_started', stage=stage)

    relevant = self._relevant_skills(
        stage,
        disabled_skills=disabled_skills,
    )
    all_inquiries: list[Inquiry] = []

    for skill in relevant:
        inquiries = skill.check_requirements(stage, ctx)
        if inquiries:
            session.record_event(
                'inquiries_raised',
                stage=stage,
                skill_name=skill.metadata.name,
                data={
                    'inquiries': [i.model_dump() for i in inquiries],
                },
            )
            all_inquiries.extend(inquiries)

    session.record_event(
        'check_requirements_completed',
        stage=stage,
        data={'total_inquiries': len(all_inquiries)},
    )
    return all_inquiries

disabled

disabled(
    skill_names: str | Collection[str],
) -> Iterator[None]

Disabled skills stay registered and installed, but are skipped during runner operations while the context is active. parameters: skill_names: type: str | Collection[str] returns: type: Iterator[None]

Source code in src/hiperhealth/pipeline/runner.py
@contextmanager
def disabled(
    self,
    skill_names: str | Collection[str],
) -> Iterator[None]:
    """
    title: Temporarily disable one or more registered skills.
    summary: |-
      Disabled skills stay registered and installed, but are skipped
      during runner operations while the context is active.
    parameters:
      skill_names:
        type: str | Collection[str]
    returns:
      type: Iterator[None]
    """
    previous = set(self._disabled_skill_names)
    self._disabled_skill_names.update(
        self._normalize_skill_names(skill_names)
    )
    try:
        yield
    finally:
        self._disabled_skill_names = previous

register

register(name: str, index: int | None = None) -> None

Looks up the skill in the attached SkillRegistry using either a built-in name, a canonical channel skill id such as tm.ayurveda, or a legacy installed skill name; then it instantiates the skill and adds it to the execution list. Pass index to control execution order. parameters: name: type: str index: type: int | None

Source code in src/hiperhealth/pipeline/runner.py
def register(self, name: str, index: int | None = None) -> None:
    """
    title: Load a skill from the registry by name and activate it.
    summary: |-
      Looks up the skill in the attached SkillRegistry using either a
      built-in name, a canonical channel skill id such as
      ``tm.ayurveda``, or a legacy installed skill name; then it
      instantiates the skill and adds it to the execution list.
      Pass ``index`` to control execution order.
    parameters:
      name:
        type: str
      index:
        type: int | None
    """
    if self._registry is None:
        from hiperhealth.pipeline.registry import SkillRegistry

        self._registry = SkillRegistry()
    skill = self._registry.load(name)
    self._add_skill(skill, index=index)

run

run(
    stage: str,
    ctx: PipelineContext,
    *,
    disabled_skills: str | Collection[str] | None = None,
    **kwargs: Any,
) -> PipelineContext

Extra keyword arguments (e.g. llm, llm_settings) are stored in ctx.extras['_run_kwargs'] so skills can access them. parameters: stage: type: str ctx: type: PipelineContext disabled_skills: type: str | Collection[str] | None kwargs: type: Any variadic: keyword returns: type: PipelineContext

Source code in src/hiperhealth/pipeline/runner.py
def run(
    self,
    stage: str,
    ctx: PipelineContext,
    *,
    disabled_skills: str | Collection[str] | None = None,
    **kwargs: Any,
) -> PipelineContext:
    """
    title: Run a single stage. This is the primary API.
    summary: |-
      Extra keyword arguments (e.g. ``llm``, ``llm_settings``)
      are stored in ``ctx.extras['_run_kwargs']`` so skills can
      access them.
    parameters:
      stage:
        type: str
      ctx:
        type: PipelineContext
      disabled_skills:
        type: str | Collection[str] | None
      kwargs:
        type: Any
        variadic: keyword
    returns:
      type: PipelineContext
    """
    ctx.extras['_run_kwargs'] = kwargs
    return self._run_stage(stage, ctx, disabled_skills=disabled_skills)

run_many

run_many(
    stages: list[str],
    ctx: PipelineContext,
    *,
    disabled_skills: str | Collection[str] | None = None,
    **kwargs: Any,
) -> PipelineContext
Source code in src/hiperhealth/pipeline/runner.py
def run_many(
    self,
    stages: list[str],
    ctx: PipelineContext,
    *,
    disabled_skills: str | Collection[str] | None = None,
    **kwargs: Any,
) -> PipelineContext:
    """
    title: Run multiple stages sequentially.
    parameters:
      stages:
        type: list[str]
      ctx:
        type: PipelineContext
      disabled_skills:
        type: str | Collection[str] | None
      kwargs:
        type: Any
        variadic: keyword
    returns:
      type: PipelineContext
    """
    for stage in stages:
        ctx = self.run(
            stage,
            ctx,
            disabled_skills=disabled_skills,
            **kwargs,
        )
    return ctx

run_session

run_session(
    stage: str,
    session: Session,
    *,
    disabled_skills: str | Collection[str] | None = None,
    **kwargs: Any,
) -> Session

Builds a PipelineContext from the session, runs the stage with the existing run() method, then writes results back to the session parquet. parameters: stage: type: str session: type: Session disabled_skills: type: str | Collection[str] | None kwargs: type: Any variadic: keyword returns: type: Session

Source code in src/hiperhealth/pipeline/runner.py
def run_session(
    self,
    stage: str,
    session: Session,
    *,
    disabled_skills: str | Collection[str] | None = None,
    **kwargs: Any,
) -> Session:
    """
    title: Execute a stage using the session file.
    summary: |-
      Builds a PipelineContext from the session, runs the stage
      with the existing ``run()`` method, then writes results
      back to the session parquet.
    parameters:
      stage:
        type: str
      session:
        type: Session
      disabled_skills:
        type: str | Collection[str] | None
      kwargs:
        type: Any
        variadic: keyword
    returns:
      type: Session
    """
    ctx = session.to_context()
    session.record_event('stage_started', stage=stage)
    ctx = self.run(
        stage,
        ctx,
        disabled_skills=disabled_skills,
        **kwargs,
    )
    session.update_from_context(stage, ctx)
    return session

create_default_runner

create_default_runner() -> StageRunner

Uses the SkillRegistry to load and register built-in skills in the standard order: privacy, extraction, diagnostics. returns: type: StageRunner

Source code in src/hiperhealth/pipeline/__init__.py
def create_default_runner() -> StageRunner:
    """
    title: Create a StageRunner with all built-in skills pre-configured.
    summary: |-
      Uses the SkillRegistry to load and register built-in skills
      in the standard order: privacy, extraction, diagnostics.
    returns:
      type: StageRunner
    """
    registry = SkillRegistry()
    runner = StageRunner(registry=registry)
    runner.register('hiperhealth.privacy')
    runner.register('hiperhealth.extraction')
    runner.register('hiperhealth.diagnostics')
    return runner

discover_skills

discover_skills(
    group: str = 'hiperhealth.skills',
) -> list[BaseSkill]

Scans the hiperhealth.skills entry-point group for pip-installed skill packages. parameters: group: type: str returns: type: list[BaseSkill]

Source code in src/hiperhealth/pipeline/discovery.py
def discover_skills(
    group: str = 'hiperhealth.skills',
) -> list[BaseSkill]:
    """
    title: Load all installed skill classes and instantiate them.
    summary: |-
      Scans the ``hiperhealth.skills`` entry-point group for
      pip-installed skill packages.
    parameters:
      group:
        type: str
    returns:
      type: list[BaseSkill]
    """
    skills: list[BaseSkill] = []
    eps = entry_points(group=group)
    for ep in eps:
        skill_cls = ep.load()
        skills.append(skill_cls())
    return skills