Skip to content

registry

Classes:

ChannelManifest

Bases: BaseModel

SkillRegistry

SkillRegistry(registry_dir: Path | None = None)

Methods:

Attributes:

Source code in src/hiperhealth/pipeline/registry.py
def __init__(self, registry_dir: Path | None = None) -> None:
    """
    title: Initialize the channel-aware skill registry.
    parameters:
      registry_dir:
        type: Path | None
    """
    self._registry_dir = (
        registry_dir
        or Path.home() / '.hiperhealth' / 'artifacts' / 'skills'
    )
    if (
        self._registry_dir.name == 'skills'
        and self._registry_dir.parent.name == 'artifacts'
    ):
        self._root_dir = self._registry_dir.parent.parent
    else:
        self._root_dir = self._registry_dir.parent
    self._state_dir = self._root_dir / 'registry'
    self._channels_dir = self._root_dir / 'channels'
    self._builtin_dir = Path(__file__).resolve().parent.parent / 'skills'

registry_dir property

registry_dir: Path

root_dir property

root_dir: Path

add_channel

add_channel(
    source: str,
    local_name: str | None = None,
    ref: str | None = None,
) -> str
Source code in src/hiperhealth/pipeline/registry.py
def add_channel(
    self,
    source: str,
    local_name: str | None = None,
    ref: str | None = None,
) -> str:
    """
    title: Register a new local-folder or Git-backed skill channel.
    parameters:
      source:
        type: str
      local_name:
        type: str | None
      ref:
        type: str | None
    returns:
      type: str
    """
    self._ensure_storage_dirs()
    state = self._load_state()
    with tempfile.TemporaryDirectory() as tmp_dir:
        temp_repo = Path(tmp_dir) / 'repo'
        normalized_source, is_local = self._materialize_channel_source(
            source,
            temp_repo,
            ref=ref,
        )
        self._detect_source_kind(temp_repo)
        channel_manifest = self._read_channel_manifest(temp_repo)
        resolved_name = self._resolve_local_name(
            channel_manifest, local_name, state
        )
        target_repo = self._channel_repo_dir(resolved_name)
        if target_repo.exists():
            msg = f'Channel alias {resolved_name!r} is already registered.'
            raise ValueError(msg)
        target_repo.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(temp_repo), str(target_repo))

    record = self._channel_record_from_repo(
        resolved_name,
        normalized_source,
        target_repo,
        ref=None if is_local else ref,
    )
    state.channels[resolved_name] = record
    self._save_state(state)
    self._save_channel_metadata(resolved_name, record, target_repo)
    return resolved_name

install_channel

install_channel(
    local_name: str, include_disabled: bool = False
) -> list[str]
Source code in src/hiperhealth/pipeline/registry.py
def install_channel(
    self, local_name: str, include_disabled: bool = False
) -> list[str]:
    """
    title: Install all eligible skills from a registered channel.
    parameters:
      local_name:
        type: str
      include_disabled:
        type: bool
    returns:
      type: list[str]
    """
    state = self._load_state()
    if local_name not in state.channels:
        msg = f'Channel {local_name!r} is not registered.'
        raise KeyError(msg)

    installed: list[str] = []
    for entry in self._iter_channel_skill_entries(local_name):
        if not include_disabled and not entry.available.enabled:
            continue
        installed.append(self.install_skill(entry.available.canonical_id))
    return sorted(installed)

install_skill

install_skill(skill_id: str) -> str
Source code in src/hiperhealth/pipeline/registry.py
def install_skill(self, skill_id: str) -> str:
    """
    title: Install one skill from a registered channel.
    parameters:
      skill_id:
        type: str
    returns:
      type: str
    """
    if skill_id.startswith(f'{BUILTIN_CHANNEL}.'):
        msg = 'Built-in hiperhealth skills do not need installation.'
        raise ValueError(msg)

    state = self._load_state()
    available = self._find_available_channel_skill(skill_id)
    if available is None:
        msg = (
            f'Skill {skill_id!r} is not available from any registered '
            'channel.'
        )
        raise KeyError(msg)

    existing = state.skills.get(skill_id)
    channel = state.channels[available.available.channel]
    installed_at = existing.installed_at if existing else _utcnow()
    state.skills[skill_id] = InstalledSkillRecord(
        id=skill_id,
        channel=available.available.channel,
        skill_name=available.available.name,
        manifest_path=str(available.manifest_path),
        installed_at=installed_at,
        updated_at=_utcnow(),
        version=available.manifest.version,
        source_commit=channel.commit,
        enabled=available.available.enabled,
    )
    self._save_state(state)
    self._install_dependencies(available.manifest.dependencies)
    return skill_id

list_channel_skills

list_channel_skills(
    local_name: str,
) -> list[AvailableSkillRecord]
Source code in src/hiperhealth/pipeline/registry.py
def list_channel_skills(
    self, local_name: str
) -> list[AvailableSkillRecord]:
    """
    title: List the skills declared by one registered channel.
    parameters:
      local_name:
        type: str
    returns:
      type: list[AvailableSkillRecord]
    """
    state = self._load_state()
    if local_name not in state.channels:
        msg = f'Channel {local_name!r} is not registered.'
        raise KeyError(msg)
    entries = self._iter_channel_skill_entries(local_name)
    return [
        entry.available
        for entry in sorted(
            entries,
            key=lambda entry: entry.available.canonical_id,
        )
    ]

list_channels

list_channels() -> list[ChannelRecord]
Source code in src/hiperhealth/pipeline/registry.py
def list_channels(self) -> list[ChannelRecord]:
    """
    title: List all registered channels.
    returns:
      type: list[ChannelRecord]
    """
    state = self._load_state()
    return [state.channels[name] for name in sorted(state.channels.keys())]

list_skills

list_skills(
    channel: str | None = None, installed_only: bool = False
) -> list[SkillSummary]
Source code in src/hiperhealth/pipeline/registry.py
def list_skills(
    self,
    channel: str | None = None,
    installed_only: bool = False,
) -> list[SkillSummary]:
    """
    title: List built-in and channel skills known to the registry.
    parameters:
      channel:
        type: str | None
      installed_only:
        type: bool
    returns:
      type: list[SkillSummary]
    """
    state = self._load_state()
    summaries: list[SkillSummary] = []

    if channel in (None, BUILTIN_CHANNEL):
        for skill_dir, manifest in self._iter_builtin_skill_entries():
            skill_name = self._builtin_skill_name(manifest, skill_dir)
            canonical_id = _canonical_skill_id(BUILTIN_CHANNEL, skill_name)
            summaries.append(
                SkillSummary(
                    **manifest.model_dump(),
                    channel=BUILTIN_CHANNEL,
                    skill_name=skill_name,
                    canonical_id=canonical_id,
                    manifest_path=str(skill_dir / 'skill.yaml'),
                    installed=True,
                    enabled=True,
                    builtin=True,
                )
            )

    if channel is None:
        channel_names = sorted(state.channels.keys())
    elif channel in state.channels:
        channel_names = [channel]
    else:
        channel_names = []

    for local_name in channel_names:
        installed_ids = {
            skill_id
            for skill_id, record in state.skills.items()
            if record.channel == local_name
        }
        for entry in self._iter_channel_skill_entries(local_name):
            if (
                installed_only
                and entry.available.canonical_id not in installed_ids
            ):
                continue
            summaries.append(
                SkillSummary(
                    **entry.manifest.model_dump(),
                    channel=local_name,
                    skill_name=entry.available.name,
                    canonical_id=entry.available.canonical_id,
                    manifest_path=str(entry.manifest_path),
                    installed=entry.available.canonical_id
                    in installed_ids,
                    enabled=entry.available.enabled,
                    tags=list(entry.available.tags),
                )
            )

    unique: dict[str, SkillSummary] = {
        summary.canonical_id: summary for summary in summaries
    }
    return [unique[key] for key in sorted(unique.keys())]

load

load(name: str) -> BaseSkill
Source code in src/hiperhealth/pipeline/registry.py
def load(self, name: str) -> BaseSkill:
    """
    title: Load a built-in or installed channel skill by name.
    parameters:
      name:
        type: str
    returns:
      type: BaseSkill
    """
    for skill_dir, manifest in self._iter_builtin_skill_entries():
        if manifest.name != name:
            continue
        package_base = f'hiperhealth.skills.{skill_dir.name}'
        cls = _load_class_from_package(package_base, manifest.entry_point)
        skill = cls()
        return self._normalize_loaded_skill(skill, manifest.name)

    state = self._load_state()
    record = state.skills.get(name)
    if record is not None:
        return self._load_channel_skill(record)

    if self._find_available_channel_skill(name) is not None:
        msg = (
            f'Skill {name!r} is available but not installed. Use '
            f'install_skill({name!r}) first.'
        )
        raise KeyError(msg)

    msg = (
        f'Skill {name!r} not found. Use list_skills() to inspect '
        'available skills and install_skill() or install_channel() to add '
        'them.'
    )
    raise KeyError(msg)

remove_channel

remove_channel(local_name: str) -> None
Source code in src/hiperhealth/pipeline/registry.py
def remove_channel(self, local_name: str) -> None:
    """
    title: Remove a registered channel and its installed skills.
    parameters:
      local_name:
        type: str
    """
    state = self._load_state()
    if local_name not in state.channels:
        msg = f'Channel {local_name!r} is not registered.'
        raise KeyError(msg)

    for skill_id, record in list(state.skills.items()):
        if record.channel != local_name:
            continue
        state.skills.pop(skill_id, None)

    shutil.rmtree(self._channel_dir(local_name), ignore_errors=True)
    state.channels.pop(local_name, None)
    self._save_state(state)

remove_skill

remove_skill(skill_id: str) -> None
Source code in src/hiperhealth/pipeline/registry.py
def remove_skill(self, skill_id: str) -> None:
    """
    title: Remove one installed channel skill from the registry.
    parameters:
      skill_id:
        type: str
    """
    if skill_id.startswith(f'{BUILTIN_CHANNEL}.'):
        msg = 'Built-in hiperhealth skills cannot be removed.'
        raise ValueError(msg)

    state = self._load_state()
    record = state.skills.get(skill_id)
    if record is None:
        msg = f'Skill {skill_id!r} is not installed.'
        raise KeyError(msg)

    state.skills.pop(skill_id, None)
    self._save_state(state)

update_channel

update_channel(
    local_name: str, ref: str | None = None
) -> list[str]
Source code in src/hiperhealth/pipeline/registry.py
def update_channel(
    self, local_name: str, ref: str | None = None
) -> list[str]:
    """
    title: Refresh a channel checkout and its installed skills.
    parameters:
      local_name:
        type: str
      ref:
        type: str | None
    returns:
      type: list[str]
    """
    state = self._load_state()
    channel = state.channels.get(local_name)
    if channel is None:
        msg = f'Channel {local_name!r} is not registered.'
        raise KeyError(msg)

    repo_dir = self._channel_repo_dir(local_name)
    target_ref: str | None = None
    if channel.provider == 'local':
        if ref is not None:
            msg = 'ref is only supported for remote git sources.'
            raise ValueError(msg)
        source_dir = Path(channel.source)
        if not source_dir.is_dir():
            msg = (
                f'Local channel source {channel.source!r} no longer '
                'exists.'
            )
            raise FileNotFoundError(msg)
        self._copy_source_tree(source_dir, repo_dir)
    else:
        target_ref = ref if ref is not None else channel.ref
        self._update_repo(repo_dir, ref=target_ref)
    refreshed_channel = self._channel_record_from_repo(
        local_name,
        channel.source,
        repo_dir,
        registered_at=channel.registered_at,
        ref=target_ref,
    )
    state.channels[local_name] = refreshed_channel

    available_map = {
        entry.available.canonical_id: entry
        for entry in self._iter_channel_skill_entries(local_name)
    }
    updated: list[str] = []
    for skill_id, record in list(state.skills.items()):
        if record.channel != local_name:
            continue
        available = available_map.get(skill_id)
        if available is None:
            state.skills.pop(skill_id, None)
            continue

        self._install_dependencies(available.manifest.dependencies)
        state.skills[skill_id] = InstalledSkillRecord(
            id=skill_id,
            channel=local_name,
            skill_name=available.available.name,
            manifest_path=str(available.manifest_path),
            installed_at=record.installed_at,
            updated_at=_utcnow(),
            version=available.manifest.version,
            source_commit=refreshed_channel.commit,
            enabled=available.available.enabled,
        )
        updated.append(skill_id)

    self._save_state(state)
    self._save_channel_metadata(local_name, refreshed_channel, repo_dir)
    return sorted(updated)

update_skill

update_skill(
    skill_id: str, pull_channel: bool = False
) -> str
Source code in src/hiperhealth/pipeline/registry.py
def update_skill(self, skill_id: str, pull_channel: bool = False) -> str:
    """
    title: Refresh one installed skill, optionally pulling its channel.
    parameters:
      skill_id:
        type: str
      pull_channel:
        type: bool
    returns:
      type: str
    """
    state = self._load_state()
    record = state.skills.get(skill_id)
    if record is None:
        msg = f'Skill {skill_id!r} is not installed.'
        raise KeyError(msg)

    if record.channel is None:
        msg = f'Skill {skill_id!r} has no owning channel.'
        raise ValueError(msg)

    if pull_channel:
        self.update_channel(record.channel)
        return skill_id

    available = self._find_available_channel_skill(skill_id)
    if available is None:
        msg = (
            f'Skill {skill_id!r} is no longer declared by channel '
            f'{record.channel!r}.'
        )
        raise KeyError(msg)

    channel = state.channels[record.channel]
    state.skills[skill_id] = InstalledSkillRecord(
        id=skill_id,
        channel=record.channel,
        skill_name=available.available.name,
        manifest_path=str(available.manifest_path),
        installed_at=record.installed_at,
        updated_at=_utcnow(),
        version=available.manifest.version,
        source_commit=channel.commit,
        enabled=available.available.enabled,
    )
    self._save_state(state)
    self._install_dependencies(available.manifest.dependencies)
    return skill_id