Gut Leak Patient: End-to-End Pipeline Example

This example walks through a complete clinical workflow for a patient presenting with gut leak symptoms. It demonstrates all six pipeline stages, the session-based workflow with requirement checking, and the multi-visit pattern where deferred data (lab results) arrives later.

Setup

from pathlib import Path
from dotenv import load_dotenv

# Load env vars — try both project root and docs/ as possible working dirs
load_dotenv(Path("tests/.env")) or load_dotenv(Path("../tests/.env"))

from hiperhealth.pipeline import (
    Session,
    Stage,
    create_default_runner,
)

Create a session and runner

The session file is the single source of truth for all interactions. It is a parquet-backed event log — every action is recorded as a row.

import tempfile
from pathlib import Path

session_dir = Path(tempfile.mkdtemp())
session_path = session_dir / "gut-leak-visit.parquet"

session = Session.create(session_path, language="en")
runner = create_default_runner()

This walkthrough intentionally uses only the built-in hiperhealth skills. Once you register a custom channel skill, it participates in the same session-and-runner workflow. An optional appendix near the end shows the channel-based setup and how to compare a run with and without a specific skill.
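
The idea of an event log as the single source of truth can be pictured with a small, library-independent model. This is only a sketch under stated assumptions: the `Event` class, the `fold_state` helper, and the `clinical_data_set` and `answers_provided` event type names are hypothetical and are not taken from hiperhealth; only `stage_completed` appears in the session queries later in this example.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Event:
    """One row of a toy event log (hypothetical schema, not hiperhealth's)."""

    event_type: str
    stage: Optional[str] = None
    data: dict = field(default_factory=dict)


def fold_state(events: list) -> dict:
    """Rebuild the current state by replaying events in order."""
    clinical: dict[str, Any] = {}
    stages: list[str] = []
    for ev in events:
        if ev.event_type in ("clinical_data_set", "answers_provided"):
            clinical.update(ev.data)  # later events override earlier keys
        elif ev.event_type == "stage_completed" and ev.stage is not None:
            stages.append(ev.stage)
    return {"clinical_data": clinical, "stages_completed": stages}


log = [
    Event("clinical_data_set", data={"age": 38, "allergies": "None known"}),
    Event("stage_completed", stage="screening"),
    Event("answers_provided", data={"vital_signs": "BP 118/72, HR 74"}),
]
state = fold_state(log)
print(state["stages_completed"])       # ['screening']
print(sorted(state["clinical_data"]))  # ['age', 'allergies', 'vital_signs']
```

Because state is derived by replaying rows, reloading a log days later and appending new events yields the same view as an uninterrupted session would.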

Stage 1 — Screening

The screening stage performs initial triage and de-identifies any PII in the patient record. The clinical system provides data without personally identifiable information — only clinical content.

session.set_clinical_data({
    "symptoms": (
        "Patient reports chronic bloating, abdominal discomfort after meals, "
        "fatigue, brain fog, and intermittent diarrhea for the past 6 months. "
        "Symptoms worsen with gluten and dairy intake."
    ),
    "age": 38,
    "biological_sex": "female",
    "medical_history": "Irritable bowel syndrome diagnosed 3 years ago",
    "medications": "Omeprazole 20mg daily",
    "allergies": "None known",
})

Run screening to de-identify any PII that may have been inadvertently included:

runner.run_session(Stage.SCREENING, session)
print("Screening complete.")
print("Stages completed:", session.stages_completed)

Output:

Screening complete.
Stages completed: [<Stage.SCREENING: 'screening'>]

Stage 2 — Intake

The intake stage extracts structured data from medical reports and wearable files. In this example we simulate providing a previous lab report.

# In a real workflow, you would provide file paths:
# session.provide_answers({"lab_report": "/path/to/previous_labs.pdf"})

# For this example, we provide the extracted data directly
session.provide_answers({
    "previous_labs": {
        "CBC": "within normal limits",
        "CRP": "slightly elevated (5.2 mg/L)",
        "vitamin_D": "low (18 ng/mL)",
        "iron": "low-normal (45 mcg/dL)",
        "ferritin": "low (15 ng/mL)",
    },
})

runner.run_session(Stage.INTAKE, session)
print("Intake complete.")

Output:

Intake complete.

Stage 3 — Diagnosis (first pass)

Before running diagnosis, check what information the skills need. This is the requirement checking cycle: assess → provide → execute.

Check requirements

inquiries = runner.check_requirements(Stage.DIAGNOSIS, session)

print(f"Total inquiries: {len(inquiries)}\n")
for inq in inquiries:
    print(f"  [{inq.priority}] {inq.field}: {inq.label}")
    if inq.description:
        print(f"    → {inq.description}")

Output:

Total inquiries: 9

  [required] vital_signs: Vital Signs
    → Current temperature, blood pressure, heart rate, respiratory rate, weight, and height to assess baseline status.
  [required] recent_weight_change: Recent Weight Change
    → Amount and time frame of weight loss or gain to evaluate for malabsorption or nutritional deficiency.
  [required] stool_characteristics: Stool Characteristics
    → Frequency, consistency, and presence of blood or mucus in stools to characterize diarrhea.
  [supplementary] family_history: Family Medical History
    → Family history of celiac disease, inflammatory bowel disease, colorectal cancer, or other gastrointestinal disorders.
  [supplementary] social_history: Social History
    → Tobacco and alcohol use, dietary habits beyond gluten and dairy, occupational exposures, and recent travel history.
  [supplementary] additional_past_medical_history: Additional Past Medical History
    → Other chronic conditions, surgeries, or hospitalizations not yet described.
  [supplementary] current_medications_and_supplements: Current Medications and Supplements
    → All prescription, over-the-counter medications, and supplements to assess potential contributors.
  [supplementary] systemic_symptoms: Systemic Symptoms
    → Presence of fevers, night sweats, joint pains, rashes, or other systemic manifestations.
  [deferred] sexual_history: Sexual History
    → Information on sexual activity and practices relevant to gastrointestinal infections.
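
A caller may want to group the returned inquiries by priority and see which required fields still lack an answer before deciding how to proceed. The sketch below assumes only the `priority`, `field`, and `label` attributes printed above; the `Inquiry` dataclass is a hypothetical stand-in for whatever object the library returns, and both helpers are illustrative, not part of hiperhealth.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Inquiry:
    """Hypothetical stand-in exposing the attributes used in this example."""

    priority: str
    field: str
    label: str


def group_by_priority(inquiries):
    """Map each priority level to the list of field names raised at that level."""
    grouped = defaultdict(list)
    for inq in inquiries:
        grouped[inq.priority].append(inq.field)
    return dict(grouped)


def unanswered_required(inquiries, clinical_data):
    """Required fields that have no entry in the clinical data yet."""
    return [
        inq.field
        for inq in inquiries
        if inq.priority == "required" and inq.field not in clinical_data
    ]


sample = [
    Inquiry("required", "vital_signs", "Vital Signs"),
    Inquiry("required", "recent_weight_change", "Recent Weight Change"),
    Inquiry("supplementary", "family_history", "Family Medical History"),
    Inquiry("deferred", "sexual_history", "Sexual History"),
]
grouped = group_by_priority(sample)
missing = unanswered_required(sample, {"vital_signs": "BP 118/72, HR 74"})
print(grouped)
print(missing)  # ['recent_weight_change']
```

Whether an unanswered required field should stop the stage is a policy decision for the calling system; this helper only reports the gap.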

Provide answers to the inquiries

The clinical system collects answers from the patient or provider and feeds them back into the session. We define an answer bank (a comprehensive dictionary of all clinical information available for this patient) and provide only the fields that were actually requested. Inquiry field names are generated at run time, so the lookup below answers only the fields whose names match an answer-bank key exactly and reports the rest as skipped. Deferred inquiries (like lab results not yet ordered) are skipped for now.

# All clinical data available for this patient at this point.
# In a real system this comes from the EHR or clinician input.
ANSWER_BANK = {
    "chief_complaint": "Chronic bloating and abdominal discomfort for 6 months",
    "dietary_history": (
        "High carbohydrate diet, frequent processed foods, low fiber intake. "
        "Symptoms worsen significantly after gluten-containing meals and dairy. "
        "Patient has tried eliminating gluten for 2 weeks with partial improvement."
    ),
    "bowel_habits": (
        "2-4 loose stools daily, occasional mucus. Bristol stool scale type 5-6. "
        "Urgency after meals, especially breakfast."
    ),
    "family_history": (
        "Mother: celiac disease, hypothyroidism. "
        "Father: type 2 diabetes, hypertension. "
        "Sibling: no significant conditions."
    ),
    "social_history": (
        "Non-smoker, occasional alcohol (1-2 drinks/week). "
        "Office worker, sedentary lifestyle."
    ),
    "stress_level": "High — work-related stress, poor sleep quality (5-6 hours/night)",
    "supplement_history": "Probiotics (generic, 1 month, no improvement), multivitamin",
    "vital_signs": "BP 118/72, HR 74, Temp 36.8°C, BMI 24.1",
    "review_of_systems": (
        "Fatigue, brain fog, occasional joint pain (hands). "
        "No fever, no blood in stool."
    ),
    "sleep_quality": "5-6 hours/night, difficulty falling asleep, non-restorative",
    "exercise_habits": "Minimal — walks 2x/week, 20 minutes",
    "onset_timeline": "Gradual onset ~6 months ago, progressively worsening",
    "weight_changes": "Unintentional loss of 3 kg over 6 months",
    "travel_history": "No recent travel",
    "occupation": "Software engineer, desk-based",
}

# Provide answers only for the fields the LLM identified as missing
answers = {
    inq.field: ANSWER_BANK[inq.field]
    for inq in inquiries
    if inq.priority != "deferred" and inq.field in ANSWER_BANK
}
unanswered = [
    inq.field for inq in inquiries
    if inq.priority != "deferred" and inq.field not in ANSWER_BANK
]

session.provide_answers(answers)
print(f"Answered {len(answers)} inquiries: {list(answers.keys())}")
if unanswered:
    print(f"Not in answer bank (skipped): {unanswered}")

Output:

Answered 3 inquiries: ['vital_signs', 'family_history', 'social_history']
Not in answer bank (skipped): ['recent_weight_change', 'stool_characteristics', 'additional_past_medical_history', 'current_medications_and_supplements', 'systemic_symptoms']
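
Several skipped fields have close counterparts in the answer bank under different names (for example `recent_weight_change` versus `weight_changes`). One way to reduce such misses is a small alias table maintained by the clinical system. The sketch below is an assumption-laden illustration: `FIELD_ALIASES` and `resolve_answers` are hypothetical names, and the pairings shown are editorial guesses that a clinician would need to confirm.

```python
# Hypothetical alias table: requested field name -> answer-bank key.
FIELD_ALIASES = {
    "recent_weight_change": "weight_changes",
    "stool_characteristics": "bowel_habits",
    "systemic_symptoms": "review_of_systems",
    "current_medications_and_supplements": "supplement_history",
}


def resolve_answers(requested_fields, answer_bank, aliases):
    """Return (answers, unresolved), with answers keyed by the requested name."""
    answers, unresolved = {}, []
    for name in requested_fields:
        # Prefer an exact match, then fall back to the alias table.
        key = name if name in answer_bank else aliases.get(name)
        if key in answer_bank:
            answers[name] = answer_bank[key]
        else:
            unresolved.append(name)
    return answers, unresolved


bank = {
    "vital_signs": "BP 118/72, HR 74, Temp 36.8°C, BMI 24.1",
    "weight_changes": "Unintentional loss of 3 kg over 6 months",
    "bowel_habits": "2-4 loose stools daily, occasional mucus.",
}
requested = [
    "vital_signs",
    "recent_weight_change",
    "stool_characteristics",
    "additional_past_medical_history",
]
answers, unresolved = resolve_answers(requested, bank, FIELD_ALIASES)
print(sorted(answers))
print(unresolved)  # ['additional_past_medical_history']
```

Answers stay keyed by the requested field name, so they can be passed to the session in the same shape as the exact-match dictionary built above.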

Run preliminary diagnosis

This is a first-pass diagnosis with available data. Deferred information (like stool analysis or zonulin levels) is not yet available.

runner.run_session(Stage.DIAGNOSIS, session)

diagnosis = session.results.get(Stage.DIAGNOSIS, {})
print("Preliminary diagnosis complete.")
if hasattr(diagnosis, "summary"):
    print(f"\nSummary: {diagnosis.summary}")
    print(f"\nDifferential:\n{diagnosis.options}")
else:
    print(f"\nResults: {diagnosis}")

Output:

Preliminary diagnosis complete.

Results: {'summary': 'A 38-year-old female with a history of IBS presents with six months of postprandial bloating, abdominal discomfort, intermittent diarrhea, fatigue, and brain fog, exacerbated by gluten and dairy. Labs reveal mild inflammation, low vitamin D, and ferritin, with a family history of celiac disease, suggesting a malabsorptive or inflammatory etiology.', 'options': ['Celiac disease', 'Non-celiac gluten sensitivity', 'Lactose intolerance', 'Small intestinal bacterial overgrowth', 'Irritable bowel syndrome with diarrhea', 'Inflammatory bowel disease', 'Pancreatic exocrine insufficiency']}

Stage 4 — Exam

The exam stage suggests laboratory tests and procedures based on the preliminary diagnosis. This is where the system requests the deferred data.

runner.run_session(Stage.EXAM, session)

exam_results = session.results.get(Stage.EXAM, {})
print("Exam suggestions complete.")
if hasattr(exam_results, "summary"):
    print(f"\nSuggested exams: {exam_results.summary}")
    print(f"\nDetails:\n{exam_results.options}")
else:
    print(f"\nResults: {exam_results}")

Output:

Exam suggestions complete.

Results: {'summary': 'To distinguish among celiac disease, non-celiac gluten sensitivity, lactose intolerance, small intestinal bacterial overgrowth, IBS-D, inflammatory bowel disease, and pancreatic exocrine insufficiency, a combination of serologic, genetic, endoscopic, breath, and stool tests is recommended.', 'options': ['Serum IgA tissue transglutaminase antibody', 'Serum IgA anti‐endomysial antibody', 'Total serum IgA level', 'HLA-DQ2/DQ8 genotyping', 'Upper endoscopy with duodenal biopsy', 'Lactose hydrogen breath test', 'Lactulose breath test for SIBO', 'Colonoscopy with biopsies', 'Fecal calprotectin', 'Fecal elastase']}

Multi-visit gap — Lab results arrive

In a real clinical workflow, days or weeks may pass while lab work is performed. The session file persists on disk, and the system reloads it when new data arrives.

# Simulate reloading the session (as would happen days later)
session = Session.load(session_path)

# Lab results arrive from the laboratory
session.provide_answers({
    "stool_analysis": {
        "zonulin": "elevated (78 ng/mL, ref: <30)",
        "calprotectin": "mildly elevated (95 mcg/g, ref: <50)",
        "secretory_IgA": "low (42 mg/dL, ref: 70-400)",
        "parasitology": "negative",
        "occult_blood": "negative",
    },
    "food_sensitivity_panel": {
        "IgG_gluten": "highly reactive",
        "IgG_casein": "moderately reactive",
        "IgG_soy": "mildly reactive",
        "IgG_eggs": "non-reactive",
    },
    "intestinal_permeability_test": {
        "lactulose_mannitol_ratio": "elevated (0.09, ref: <0.03)",
        "interpretation": "consistent with increased intestinal permeability",
    },
})

print("Lab results recorded in session.")
print(f"Total events in session: {len(session.events)}")

Output:

Lab results recorded in session.
Total events in session: 15

Stage 3 (re-run) — Enriched diagnosis

With lab results now available, re-run the diagnosis stage for a complete clinical picture. The runner uses all accumulated data.

# Check if any requirements are still pending
inquiries = runner.check_requirements(Stage.DIAGNOSIS, session)
deferred = [i for i in inquiries if i.priority == "deferred"]
print(f"Remaining deferred inquiries: {len(deferred)}")

# Run enriched diagnosis
runner.run_session(Stage.DIAGNOSIS, session)

diagnosis = session.results.get(Stage.DIAGNOSIS, {})
print("\nEnriched diagnosis complete (with lab results).")
if hasattr(diagnosis, "summary"):
    print(f"\nSummary: {diagnosis.summary}")
    print(f"\nDifferential:\n{diagnosis.options}")
else:
    print(f"\nResults: {diagnosis}")

Output:

Remaining deferred inquiries: 3

Enriched diagnosis complete (with lab results).

Results: {'summary': 'A 38-year-old female with a history of IBS presents with chronic postprandial bloating, abdominal discomfort, intermittent diarrhea, fatigue, and brain fog worsened by gluten and dairy, alongside mild inflammatory markers and increased intestinal permeability. Elevated zonulin, positive IgG reactivity to gluten and casein, low secretory IgA, and family history of celiac disease suggest a gluten-mediated enteropathy or related gut dysfunction.', 'options': ['Celiac disease', 'Non-celiac gluten sensitivity', 'Irritable bowel syndrome (diarrhea-predominant)', 'Small intestinal bacterial overgrowth (SIBO)', 'Lactose intolerance', 'Microscopic colitis']}
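
To see what the lab results changed, the two differential lists can be compared directly. The following is a minimal sketch using the `options` lists printed in this example; `canonical` and `compare_differentials` are hypothetical helpers, and the normalization (lowercasing and dropping parenthetical qualifiers) is a deliberately simple assumption that will not catch every rewording.

```python
import re


def canonical(label):
    """Lowercase a diagnosis label and drop parenthetical qualifiers."""
    return re.sub(r"\s*\([^)]*\)", "", label).strip().lower()


def compare_differentials(before, after):
    """Report which diagnoses were kept, added, or dropped, preserving order."""
    before_keys = {canonical(d) for d in before}
    after_keys = {canonical(d) for d in after}
    return {
        "kept": [d for d in after if canonical(d) in before_keys],
        "added": [d for d in after if canonical(d) not in before_keys],
        "dropped": [d for d in before if canonical(d) not in after_keys],
    }


preliminary = [
    "Celiac disease",
    "Non-celiac gluten sensitivity",
    "Lactose intolerance",
    "Small intestinal bacterial overgrowth",
    "Irritable bowel syndrome with diarrhea",
    "Inflammatory bowel disease",
    "Pancreatic exocrine insufficiency",
]
enriched = [
    "Celiac disease",
    "Non-celiac gluten sensitivity",
    "Irritable bowel syndrome (diarrhea-predominant)",
    "Small intestinal bacterial overgrowth (SIBO)",
    "Lactose intolerance",
    "Microscopic colitis",
]
result = compare_differentials(preliminary, enriched)
for bucket, items in result.items():
    print(f"{bucket}: {items}")
```

With this simple normalization the two IBS wordings are reported as one dropped and one added entry, which shows why free-text labels usually need a curated mapping before automated comparison.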

Stage 5 — Treatment

The treatment stage generates a plan based on the enriched diagnosis.

# Check treatment requirements
inquiries = runner.check_requirements(Stage.TREATMENT, session)
for inq in inquiries:
    print(f"  [{inq.priority}] {inq.field}: {inq.label}")

# Treatment-specific answer bank
TREATMENT_ANSWERS = {
    "treatment_preferences": (
        "Patient prefers integrative approach. Open to dietary changes "
        "and supplements. Wants to minimize pharmaceutical interventions."
    ),
    "budget_constraints": "Standard insurance coverage, willing to pay OOP for supplements",
    "lifestyle_goals": "Improve energy, reduce bloating, better sleep",
    "dietary_restrictions": "Willing to eliminate gluten and dairy long-term",
    "exercise_tolerance": "Can tolerate light to moderate exercise",
    "compliance_history": "Good compliance with previous IBS dietary recommendations",
}

answers = {
    inq.field: TREATMENT_ANSWERS[inq.field]
    for inq in inquiries
    if inq.priority != "deferred" and inq.field in TREATMENT_ANSWERS
}
session.provide_answers(answers)
print(f"\nAnswered {len(answers)} treatment inquiries: {list(answers.keys())}")

runner.run_session(Stage.TREATMENT, session)

treatment = session.results.get(Stage.TREATMENT, {})
print("Treatment plan complete.")
if isinstance(treatment, dict):
    print(f"\nResults: {treatment}")

Output:

Answered 0 treatment inquiries: []
Treatment plan complete.

Results: {}

Stage 6 — Prescription

The prescription stage generates specific prescriptions or supplement recommendations based on the treatment plan.

runner.run_session(Stage.PRESCRIPTION, session)

prescription = session.results.get(Stage.PRESCRIPTION, {})
print("Prescription complete.")
if isinstance(prescription, dict):
    print(f"\nResults: {prescription}")

Output:

Prescription complete.

Results: {}

Inspect the full session

The session file is a standard Parquet file, so any tool that reads Parquet can analyze it.

Session summary

print(f"Session file: {session.path}")
print(f"Language: {session.language}")
print(f"Stages completed: {session.stages_completed}")
print(f"Total events: {len(session.events)}")
print(f"Pending inquiries: {len(session.pending_inquiries)}")

Output:

Session file: /tmp/tmp1e2iz9tw/gut-leak-visit.parquet
Language: en
Stages completed: ['screening', 'intake', 'diagnosis', 'exam', <Stage.DIAGNOSIS: 'diagnosis'>, <Stage.TREATMENT: 'treatment'>, <Stage.PRESCRIPTION: 'prescription'>]
Total events: 27
Pending inquiries: 18
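
The `Stages completed` list above mixes plain strings with `Stage` members and repeats `diagnosis` because that stage ran twice. For reporting, a caller might normalize the list to unique string names. The sketch below uses a local stand-in enum; the member values for `INTAKE` and `EXAM` are assumptions inferred from the strings in the output, and `stage_names` is a hypothetical helper.

```python
from enum import Enum


class Stage(Enum):
    """Local stand-in for hiperhealth.pipeline.Stage (values partly assumed)."""

    SCREENING = "screening"
    INTAKE = "intake"
    DIAGNOSIS = "diagnosis"
    EXAM = "exam"
    TREATMENT = "treatment"
    PRESCRIPTION = "prescription"


def stage_names(stages):
    """Return unique stage names as plain strings, in first-seen order."""
    names = [s.value if isinstance(s, Enum) else str(s) for s in stages]
    return list(dict.fromkeys(names))  # dict keys keep insertion order


mixed = [
    "screening", "intake", "diagnosis", "exam",
    Stage.DIAGNOSIS, Stage.TREATMENT, Stage.PRESCRIPTION,
]
print(stage_names(mixed))
# ['screening', 'intake', 'diagnosis', 'exam', 'treatment', 'prescription']
```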

Clinical data accumulated

import json

clinical = session.clinical_data
print(json.dumps({k: v for k, v in clinical.items()
                   if k not in ("previous_labs", "stool_analysis",
                                "food_sensitivity_panel",
                                "intestinal_permeability_test")},
                  indent=2))

Output:

{
  "symptoms": "Patient reports chronic bloating, abdominal discomfort after meals, fatigue, brain fog, and intermittent diarrhea for the past 6 months. Symptoms worsen with gluten and dairy intake.",
  "age": 38,
  "biological_sex": "female",
  "medical_history": "Irritable bowel syndrome diagnosed 3 years ago",
  "medications": "Omeprazole 20mg daily",
  "allergies": "None known",
  "vital_signs": "BP 118/72, HR 74, Temp 36.8\u00b0C, BMI 24.1",
  "family_history": "Mother: celiac disease, hypothyroidism. Father: type 2 diabetes, hypertension. Sibling: no significant conditions.",
  "social_history": "Non-smoker, occasional alcohol (1-2 drinks/week). Office worker, sedentary lifestyle."
}

Event log with polars

import polars as pl

df = pl.read_parquet(session_path)

# All events
print(df.select("event_id", "event_type", "stage", "skill_name"))

# Filter to stage completions
completed = df.filter(pl.col("event_type") == "stage_completed")
print(completed.select("stage", "timestamp"))

# Filter to inquiries
raised = df.filter(pl.col("event_type") == "inquiries_raised")
print(raised.select("stage", "skill_name", "data"))

Event log with pandas

import pandas as pd

df = pd.read_parquet(session_path)
df[["event_id", "event_type", "stage", "timestamp"]]

Optional: Add a custom channel skill

Custom skills are now distributed through channels. After registering a channel and installing one of its skills, you can register it on the same runner with its canonical id.

from hiperhealth.pipeline import SkillRegistry

registry = SkillRegistry()
registry.add_channel(
    "https://github.com/my-org/traditional-medicine.git",
    local_name="tm",
)
registry.install_skill("tm.ayurveda")

runner = create_default_runner()
runner.register("tm.ayurveda", index=0)

Optional: Compare a run with and without a skill

For notebook experiments, the runner can temporarily disable one or more registered skills without uninstalling them. This is useful for side-by-side comparisons.

treatment_ctx = session.to_context()

with_skill = runner.run(Stage.TREATMENT, treatment_ctx)

with runner.disabled({"tm.ayurveda"}):
    without_skill = runner.run(Stage.TREATMENT, session.to_context())

# One-off calls can also use: disabled_skills={"tm.ayurveda"}
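
The temporary-disable pattern can be understood as a context manager that records the current state, applies the change, and restores the state on exit. The toy class below illustrates that pattern only; `ToyRunner`, its `run` method, and the `builtin.treatment` skill name are hypothetical and say nothing about how hiperhealth implements `runner.disabled`.

```python
from contextlib import contextmanager


class ToyRunner:
    """Hypothetical runner used to illustrate temporary skill disabling."""

    def __init__(self, skills):
        self.skills = list(skills)
        self._disabled = set()

    @contextmanager
    def disabled(self, names):
        previous = set(self._disabled)
        self._disabled |= set(names)
        try:
            yield self
        finally:
            self._disabled = previous  # restore even if the body raises

    def run(self):
        """Return the names of the skills that would execute."""
        return [name for name in self.skills if name not in self._disabled]


toy = ToyRunner(["builtin.treatment", "tm.ayurveda"])
with_skill = toy.run()
with toy.disabled({"tm.ayurveda"}):
    without_skill = toy.run()
after = toy.run()

print(with_skill)     # ['builtin.treatment', 'tm.ayurveda']
print(without_skill)  # ['builtin.treatment']
print(after)          # ['builtin.treatment', 'tm.ayurveda']
```

Restoring the previous set in `finally` is what makes the comparison safe in a notebook: an exception inside the `with` block does not leave the skill disabled for later cells.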

Cleanup

import shutil
shutil.rmtree(session_dir)

Summary

This example demonstrated the full hiperhealth pipeline for a gut leak patient:

  1. Screening — PII de-identification on intake data
  2. Intake — structured data extraction from reports
  3. Diagnosis (preliminary) — first-pass differential with available data
  4. Exam — suggested lab tests and procedures
  5. Diagnosis (enriched) — re-run with lab results for a complete picture
  6. Treatment — integrative treatment plan
  7. Prescription — specific supplement and medication recommendations

Key patterns shown:

  • Requirement checking — check_requirements() before execution, with three priority levels (required, supplementary, deferred)
  • Multi-visit workflow — session persists across days; deferred data (lab results) arrives later and triggers a re-run
  • Channel skills — custom skills can be registered with canonical ids such as tm.ayurveda and used in the same runner
  • A/B comparisons — runner.disabled({...}) temporarily skips selected skills without uninstalling them
  • Session persistence — parquet event log as single source of truth, queryable with polars/pandas/DuckDB
  • No PII — only clinical data in the session file; the external system maps sessions to patients