|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import json |
|
|
import shutil |
|
|
import html |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, List, Optional |
|
|
from datetime import datetime |
|
|
|
|
|
import streamlit as st |
|
|
|
|
|
from src import store, billing, modal_templates, config |
|
|
from src.styles import inject_base_css |
|
|
from src.paths import faiss_index_dir, initialize_environment |
|
|
from src.prompt_builder import build_referral_summary |
|
|
from src.explainability import text_hash, normalize_text, is_stale |
|
|
from src.guideline_annotator import generate_guideline_rationale |
|
|
from src.ai_core import generate_soap_draft |
|
|
from src.model_loader import active_model_status |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.set_page_config(page_title="Step 2 β Workflow", page_icon="π©Ί", layout="wide") |
|
|
inject_base_css() |
|
|
initialize_environment() |
|
|
st.session_state.setdefault("_show_exports", True) |
|
|
|
|
|
st.title("Step 2 β Workflow") |
|
|
st.caption("PCP Referral β Specialist Review β Documentation & Billing. Demo only β de-identified data; not for clinical use.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _model_banner() -> None: |
|
|
status = active_model_status() |
|
|
sel = status.get("selected_id", "") |
|
|
device = status.get("device", "CPU") |
|
|
if device == "CPU": |
|
|
text = f"βοΈ Using {sel} on CPU (fallback mode)" |
|
|
else: |
|
|
short = "27B" if "27" in str(sel) else ("4B" if "4" in str(sel) else "GPU") |
|
|
text = f"βοΈ Using {sel} on GPU ({short})" |
|
|
hint = f"Primary={status.get('primary_id','')} | Fallback={status.get('fallback_id','')} | forced_cpu={status.get('forced_cpu', False)}" |
|
|
st.markdown( |
|
|
f"<div style='text-align:right'><span class='badge accent' title='{hint}'>{text}</span></div>", |
|
|
unsafe_allow_html=True, |
|
|
) |
|
|
|
|
|
def _faiss_index_present() -> bool: |
|
|
idx = faiss_index_dir() |
|
|
return (idx / "faiss.index").exists() and (idx / "chunks.jsonl").exists() and (idx / "index_info.json").exists() |
|
|
|
|
|
def _as_text(x: Any) -> str: |
|
|
return x if isinstance(x, str) else ("\n".join(x) if isinstance(x, list) else str(x or "")) |
|
|
|
|
|
def _status_badge(status: str) -> str: |
|
|
label = {"draft": "Draft", "submitted": "In Review", "completed": "Completed"}.get(status, status.title()) |
|
|
klass = "badge" + (" accent" if status in {"submitted"} else "") |
|
|
return f"<span class='{klass}'>{label}</span>" |
|
|
|
|
|
def _case_summary_header(case: Dict[str, Any]) -> None: |
|
|
p = case.get("patient", {}) or {} |
|
|
c = case.get("consult", {}) or {} |
|
|
left, right = st.columns([3, 1]) |
|
|
with left: |
|
|
demo = " β’ ".join([s for s in [p.get("name") or "Unknown", str(p.get("age") or ""), p.get("sex") or ""] if s]) |
|
|
specialty = c.get("specialty") or "β" |
|
|
st.subheader(demo) |
|
|
st.caption(f"Specialty: {specialty}") |
|
|
with right: |
|
|
st.markdown(_status_badge(case.get("status", "draft")), unsafe_allow_html=True) |
|
|
|
|
|
def _green_info_box(title: str, html_body: str) -> None: |
|
|
st.markdown( |
|
|
f""" |
|
|
<div style=" |
|
|
background:#e8f5e9; |
|
|
border-left:4px solid #2e7d32; |
|
|
padding:0.75rem 1rem; |
|
|
border-radius:6px; |
|
|
margin-top:0.25rem;"> |
|
|
<div style="font-weight:600;">{title}</div> |
|
|
<div style="margin-top:0.25rem;">{html_body}</div> |
|
|
</div> |
|
|
""", |
|
|
unsafe_allow_html=True, |
|
|
) |
|
|
|
|
|
def _render_guideline_bullets(items: List[str]) -> None: |
|
|
if not items: |
|
|
return |
|
|
lis = "".join(f"<li>{html.escape(i)}</li>" for i in items) |
|
|
_green_info_box("π Guideline Rationale", f"<ul style='margin:0 0 0 1.1rem'>{lis}</ul>") |
|
|
|
|
|
def _exports_dir() -> Path: |
|
|
|
|
|
try: |
|
|
pd = getattr(config, "paths_dict", None) |
|
|
if callable(pd): |
|
|
return Path(pd()["exports_dir"]) |
|
|
except Exception: |
|
|
pass |
|
|
try: |
|
|
ed = getattr(config, "EXPORTS_DIR", None) or getattr(config, "exports_dir", None) |
|
|
if ed: |
|
|
return Path(ed) |
|
|
except Exception: |
|
|
pass |
|
|
return Path("exports") |
|
|
|
|
|
def _latest_export(case_id: str, pattern_suffix: str) -> Optional[Path]: |
|
|
|
|
|
|
|
|
|
|
|
exp = _exports_dir() |
|
|
if not exp.exists(): |
|
|
return None |
|
|
matches = sorted(exp.glob(f"EC-{case_id}_*{pattern_suffix}"), key=lambda p: p.stat().st_mtime) |
|
|
return matches[-1] if matches else None |
|
|
|
|
|
def _note_markdown(case: Dict[str, Any], summary: str, soap: Dict[str, str], |
|
|
guideline_points: List[str], endnotes: List[Dict[str, Any]]) -> str: |
|
|
"""Generate formatted EHR-style consultation report (matches sample-like format).""" |
|
|
p = case.get("patient", {}) or {} |
|
|
c = case.get("consult", {}) or {} |
|
|
consult_id = case.get("case_id", "") |
|
|
today = datetime.utcnow().strftime("%B %d, %Y") |
|
|
|
|
|
patient_line = f"**Patient:** {p.get('name','Unknown')} ({p.get('sex','')}, {p.get('age','')})" |
|
|
specialty = c.get("specialty") or "Cardiology" |
|
|
question = c.get("question") or "" |
|
|
referring = c.get("referrer") or "Referring provider not specified" |
|
|
|
|
|
header = ( |
|
|
f"# {specialty} E-Consult Report\n\n" |
|
|
f"{patient_line} \n" |
|
|
f"**Consult ID:** {consult_id} \n" |
|
|
f"**Date:** {today} \n" |
|
|
f"**Specialty:** {specialty} \n" |
|
|
f"**Referring Provider:** {referring}\n\n" |
|
|
"---\n\n" |
|
|
) |
|
|
|
|
|
referral_section = "" |
|
|
if question: |
|
|
referral_section += f"### Referral Question\n> {question.strip()}\n\n" |
|
|
if summary.strip(): |
|
|
referral_section += f"### Clinical Summary\n{summary.strip()}\n\n---\n\n" |
|
|
|
|
|
soap_section = ( |
|
|
f"### Subjective\n{(soap.get('subjective') or '').strip()}\n\n" |
|
|
f"### Objective\n{(soap.get('objective') or '').strip()}\n\n" |
|
|
f"### Assessment\n{(soap.get('assessment') or '').strip()}\n\n" |
|
|
f"### Plan\n{(soap.get('plan') or '').strip()}\n\n---\n\n" |
|
|
) |
|
|
|
|
|
guideline_section = "" |
|
|
if guideline_points: |
|
|
bullets = "\n".join([f"- {pt}" for pt in guideline_points]) |
|
|
guideline_section = f"### Guideline Alignment\n{bullets}\n\n---\n\n" |
|
|
|
|
|
endnote_section = "" |
|
|
if endnotes: |
|
|
cites = "\n".join( |
|
|
[f"- [{e.get('n', i+1)}] {e.get('doc','Guideline')}" |
|
|
+ (f" p.{e.get('page','')}" if e.get('page') else "") for i, e in enumerate(endnotes)] |
|
|
) |
|
|
endnote_section = f"### References\n{cites}\n\n---\n\n" |
|
|
|
|
|
attestation = ( |
|
|
"### Consulting Clinician Attestation\n" |
|
|
"I personally reviewed the provided information, generated this report, " |
|
|
"and confirm that I spent the documented time reviewing and documenting this e-consult.\n" |
|
|
) |
|
|
|
|
|
return header + referral_section + soap_section + guideline_section + endnote_section + attestation |
|
|
|
|
|
def _soap_is_empty(soap: Dict[str, str]) -> bool: |
|
|
return not any((soap or {}).get(k, "").strip() for k in ("subjective", "objective", "assessment", "plan")) |
|
|
|
|
|
def _seed_once() -> None: |
|
|
if not st.session_state.get("_seeded", False): |
|
|
store.seed_cases(reset=True) |
|
|
st.session_state["_seeded"] = True |
|
|
|
|
|
def _case_options() -> List[Dict[str, Any]]: |
|
|
items = store.list_cases() |
|
|
if not items: |
|
|
_seed_once() |
|
|
items = store.list_cases() |
|
|
return items |
|
|
|
|
|
def _get_case(case_id: str) -> Dict[str, Any]: |
|
|
return store.read_case(case_id) or {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_model_banner() |
|
|
|
|
|
if not _faiss_index_present(): |
|
|
with st.container(): |
|
|
st.info("Guideline rationales need a FAISS index. No index detected β build it in **Step 1 β RAG Corpus Prep**.", icon="βΉοΈ") |
|
|
try: |
|
|
st.page_link("pages/01_RAG_Corpus_Prep.py", label="Open RAG Prep") |
|
|
except Exception: |
|
|
st.caption("Open the βStep 1 β RAG Corpus Prepβ page from the sidebar.") |
|
|
|
|
|
st.divider() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not store.list_cases(): |
|
|
_seed_once() |
|
|
|
|
|
items = _case_options() |
|
|
if not items: |
|
|
st.error("No cases available.") |
|
|
st.stop() |
|
|
|
|
|
|
|
|
if st.button("β New Case", use_container_width=True): |
|
|
new_id = store.new_case_id() |
|
|
store.create_case({ |
|
|
"case_id": new_id, |
|
|
"status": "draft", |
|
|
"patient": {"name": "New Patient", "age": None, "sex": ""}, |
|
|
"consult": { |
|
|
"specialty": "Cardiology", |
|
|
"question": "", |
|
|
"summary": "", |
|
|
"medications": "", |
|
|
"labs": "", |
|
|
"consent_obtained": False, |
|
|
}, |
|
|
"soap_draft": {"subjective": "", "objective": "", "assessment": "", "plan": ""}, |
|
|
"billing": {"minutes": 5, "spoke": False, "cpt_code": None, "attested": False}, |
|
|
"explainability": { |
|
|
"baseline": {"assessment_hash": "", "plan_hash": ""}, |
|
|
"guideline_rationale": [], |
|
|
}, |
|
|
}) |
|
|
st.success(f"Created new case {new_id}. Refreshing listβ¦") |
|
|
st.rerun() |
|
|
|
|
|
|
|
|
if st.button("ποΈ Reset Demo (clear all cases)", use_container_width=True): |
|
|
try: |
|
|
store.seed_cases(reset=True) |
|
|
st.success("β
All demo cases cleared and re-seeded.") |
|
|
st.session_state["_seeded"] = True |
|
|
st.rerun() |
|
|
except Exception as e: |
|
|
st.error(f"Failed to reset cases: {e}") |
|
|
|
|
|
|
|
|
default_case_id = st.session_state.get("_current_case_id") or items[0]["case_id"] |
|
|
opt_by_id = {i["case_id"]: i for i in items} |
|
|
case_ids = list(opt_by_id.keys()) |
|
|
selected = st.selectbox( |
|
|
"Select case", |
|
|
options=case_ids, |
|
|
index=case_ids.index(default_case_id) if default_case_id in case_ids else 0, |
|
|
format_func=lambda cid: f"{cid} β {opt_by_id[cid].get('patient_name') or ''} [{opt_by_id[cid]['status']}]", |
|
|
) |
|
|
st.session_state["_current_case_id"] = selected |
|
|
case = _get_case(selected) |
|
|
_case_summary_header(case) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tab_pcp, tab_spec, tab_bill = st.tabs(["PCP Referral", "Specialist Review", "Documentation & Billing"]) |
|
|
|
|
|
|
|
|
|
|
|
with tab_pcp: |
|
|
is_draft = case.get("status") == "draft" |
|
|
p = case.get("patient", {}) or {} |
|
|
c = case.get("consult", {}) or {} |
|
|
|
|
|
c1, c2, c3 = st.columns([2, 1, 1]) |
|
|
with c1: |
|
|
patient_name = st.text_input("Patient Name", value=p.get("name", ""), disabled=not is_draft, key=f"pt_name_{selected}") |
|
|
consult_q = st.text_area("Consult Question", value=c.get("question", ""), height=80, disabled=not is_draft, key=f"q_{selected}") |
|
|
clinical_summary = st.text_area( |
|
|
"Clinical Summary", |
|
|
value=c.get("summary", "") or c.get("history", ""), |
|
|
height=160, |
|
|
placeholder=("E.g., NYHA III HFrEF (EF 30%). BP 110/70 HR 62. Echo EF 30%, mild MR. Stable; med optimization."), |
|
|
disabled=not is_draft, |
|
|
key=f"summary_{selected}", |
|
|
) |
|
|
with c2: |
|
|
age = st.number_input("Age", min_value=0, max_value=120, value=int(p.get("age") or 0), disabled=not is_draft, key=f"age_{selected}") |
|
|
sex = st.selectbox("Sex", ["F", "M", "Other", ""], index=["F","M","Other",""].index(p.get("sex") or ""), disabled=not is_draft, key=f"sex_{selected}") |
|
|
specialty = st.text_input("Specialty", value=c.get("specialty", "") or "", disabled=not is_draft, key=f"spec_{selected}") |
|
|
with c3: |
|
|
medications = st.text_area("Medications", value=c.get("medications", ""), height=100, disabled=not is_draft, key=f"meds_{selected}") |
|
|
labs = st.text_area("Labs / Key Values", value=c.get("labs", ""), height=100, disabled=not is_draft, key=f"labs_{selected}") |
|
|
|
|
|
consent = bool(c.get("consent_obtained", False)) |
|
|
consent = st.checkbox("Patient consent obtained", value=consent, disabled=not is_draft, key=f"consent_{selected}") |
|
|
|
|
|
col = st.columns([1,1,6])[0] |
|
|
with col: |
|
|
if is_draft: |
|
|
if st.button("Submit Referral", type="primary", use_container_width=True, key=f"submit_{selected}"): |
|
|
if not consent: |
|
|
st.error("Consent is required to submit.") |
|
|
else: |
|
|
patch = { |
|
|
"patient": {"name": patient_name.strip(), "age": int(age) if age else None, "sex": sex}, |
|
|
"consult": { |
|
|
"specialty": specialty.strip(), |
|
|
"question": consult_q.strip(), |
|
|
"summary": clinical_summary.strip(), |
|
|
"medications": medications.strip(), |
|
|
"labs": labs.strip(), |
|
|
"consent_obtained": True, |
|
|
}, |
|
|
} |
|
|
store.update_case(selected, patch) |
|
|
store.set_status(selected, "submitted") |
|
|
st.success("Referral submitted. Activate the Specialist Review tab to continue.") |
|
|
st.rerun() |
|
|
else: |
|
|
st.info("Referral is not editable (status is In Review or Completed).") |
|
|
|
|
|
|
|
|
|
|
|
with tab_spec: |
|
|
status = case.get("status") |
|
|
if status == "draft": |
|
|
st.info("Submit the referral on the PCP tab to activate Specialist Review.") |
|
|
st.stop() |
|
|
|
|
|
read_only = status == "completed" |
|
|
|
|
|
st.subheader("SOAP Draft") |
|
|
soap = case.get("soap_draft", {}) or {"subjective": "", "objective": "", "assessment": "", "plan": ""} |
|
|
|
|
|
|
|
|
gen_needed = (status == "submitted") and _soap_is_empty(soap) |
|
|
if gen_needed and not read_only: |
|
|
if st.button("Generate SOAP Draft (LLM)", key=f"gen_{selected}"): |
|
|
try: |
|
|
result = generate_soap_draft( |
|
|
intake=case, |
|
|
max_new_tokens=550, |
|
|
temperature=0.2, |
|
|
top_p=0.95, |
|
|
) |
|
|
s = result.get("soap", {}) or {} |
|
|
subj = _as_text(s.get("subjective", "")) |
|
|
obj = _as_text(s.get("objective", "")) |
|
|
assess = _as_text(s.get("assessment", "")) |
|
|
plan = _as_text(s.get("plan", "")) |
|
|
|
|
|
explain_patch = { |
|
|
"baseline": {"assessment_hash": text_hash(assess), "plan_hash": text_hash(plan)}, |
|
|
"guideline_rationale": [], |
|
|
} |
|
|
except Exception as e: |
|
|
subj, obj, assess, plan = build_referral_summary(case), "", "β", "β" |
|
|
explain_patch = {"baseline": {"assessment_hash": text_hash(assess), "plan_hash": text_hash(plan)}, "guideline_rationale": []} |
|
|
st.warning(f"LLM generation unavailable; seeded a minimal draft. ({type(e).__name__})") |
|
|
result = {"error": str(e)} |
|
|
|
|
|
store.update_case(selected, { |
|
|
"soap_draft": {"subjective": subj.strip(), "objective": obj.strip(), "assessment": assess.strip(), "plan": plan.strip()}, |
|
|
"explainability": explain_patch, |
|
|
}) |
|
|
|
|
|
st.success("SOAP draft generated. If fields appear empty, toggle tabs once to refresh the view.") |
|
|
|
|
|
|
|
|
c1, c2 = st.columns(2) |
|
|
with c1: |
|
|
subj_new = st.text_area("Subjective", value=soap.get("subjective",""), height=140, disabled=read_only, key=f"subj_{selected}") |
|
|
with c2: |
|
|
obj_new = st.text_area("Objective", value=soap.get("objective",""), height=140, disabled=read_only, key=f"obj_{selected}") |
|
|
assess_new = st.text_area("Assessment", value=soap.get("assessment",""), height=140, disabled=read_only, key=f"assess_{selected}") |
|
|
plan_new = st.text_area("Plan", value=soap.get("plan",""), height=180, disabled=read_only, key=f"plan_{selected}") |
|
|
|
|
|
|
|
|
if (not read_only) and (status == "submitted"): |
|
|
changed = any([ |
|
|
normalize_text(subj_new) != normalize_text(soap.get("subjective") or ""), |
|
|
normalize_text(obj_new) != normalize_text(soap.get("objective") or ""), |
|
|
normalize_text(assess_new) != normalize_text(soap.get("assessment") or ""), |
|
|
normalize_text(plan_new) != normalize_text(soap.get("plan") or ""), |
|
|
]) |
|
|
if changed: |
|
|
store.update_case(selected, { |
|
|
"soap_draft": {"subjective": subj_new, "objective": obj_new, "assessment": assess_new, "plan": plan_new}, |
|
|
}) |
|
|
|
|
|
|
|
|
exp = (case.get("explainability", {}) or {}) |
|
|
baseline = (exp.get("baseline") or {}) |
|
|
assess_hash0 = str(baseline.get("assessment_hash") or "") |
|
|
plan_hash0 = str(baseline.get("plan_hash") or "") |
|
|
|
|
|
stale = is_stale(assess_new, assess_hash0) or is_stale(plan_new, plan_hash0) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if _soap_is_empty(soap): |
|
|
|
|
|
|
|
|
pass |
|
|
else: |
|
|
|
|
|
if not _soap_is_empty(soap): |
|
|
if not stale and (normalize_text(assess_new + plan_new)): |
|
|
|
|
|
stored_points = (exp.get("guideline_rationale") or []) |
|
|
if stored_points: |
|
|
_render_guideline_bullets(stored_points) |
|
|
else: |
|
|
g = generate_guideline_rationale(assessment_text=assess_new, plan_text=plan_new) |
|
|
warn = (g.get("warning") or "").strip() |
|
|
if warn: |
|
|
st.info(warn) |
|
|
try: |
|
|
st.page_link("pages/01_RAG_Corpus_Prep.py", label="Open RAG Prep") |
|
|
except Exception: |
|
|
pass |
|
|
points = g.get("rationale", []) or [] |
|
|
_render_guideline_bullets(points) |
|
|
if points: |
|
|
store.update_case(selected, {"explainability": {"guideline_rationale": points}}) |
|
|
else: |
|
|
if (not read_only) and (status == "submitted"): |
|
|
if st.button("β» Re-run Guidelines", key=f"rerun_{selected}"): |
|
|
g = generate_guideline_rationale(assessment_text=assess_new, plan_text=plan_new) |
|
|
points = g.get("rationale", []) or [] |
|
|
store.update_case(selected, { |
|
|
"explainability": { |
|
|
"baseline": {"assessment_hash": text_hash(assess_new), "plan_hash": text_hash(plan_new)}, |
|
|
"guideline_rationale": points, |
|
|
} |
|
|
}) |
|
|
st.rerun() |
|
|
else: |
|
|
st.caption("Guideline rationale hidden after edits. Click **Re-run Guidelines** to refresh.") |
|
|
|
|
|
st.divider() |
|
|
|
|
|
|
|
|
st.subheader("Specialist Review") |
|
|
b = case.get("billing", {}) or {} |
|
|
minutes_val = int(b.get("minutes", 5)) |
|
|
minutes = st.slider("Time spent (minutes)", min_value=5, max_value=30, value=minutes_val, step=1, disabled=read_only, key=f"mins_{selected}") |
|
|
spoke = bool(b.get("spoke", False)) |
|
|
spoke = st.checkbox("Spoke to Referring Physician", value=spoke, disabled=read_only, key=f"spoke_{selected}") |
|
|
attested = bool(b.get("attested", False)) if read_only else False |
|
|
attested = st.checkbox("I attest that the consult note is complete.", value=attested, disabled=read_only, key=f"att_{selected}") |
|
|
|
|
|
|
|
|
if (not read_only) and (status == "submitted"): |
|
|
if (minutes != minutes_val) or (spoke != bool(b.get("spoke", False))): |
|
|
store.update_case(selected, {"billing": {"minutes": minutes, "spoke": bool(spoke)}}) |
|
|
|
|
|
|
|
|
can_finalize = (not read_only) and (minutes >= 5) and bool(st.session_state.get(f"att_{selected}", False)) |
|
|
finalize_btn = st.button("Finalize Consult", type="primary", disabled=not can_finalize, key=f"finalize_{selected}") |
|
|
|
|
|
if finalize_btn: |
|
|
|
|
|
points = (case.get("explainability", {}) or {}).get("guideline_rationale") or [] |
|
|
endnotes: List[Dict[str, Any]] = [] |
|
|
|
|
|
soap_now = case.get("soap_draft", {}) or {} |
|
|
note_md = _note_markdown( |
|
|
case, |
|
|
build_referral_summary(case), |
|
|
{ |
|
|
"subjective": subj_new or soap_now.get("subjective",""), |
|
|
"objective": obj_new or soap_now.get("objective",""), |
|
|
"assessment": assess_new or soap_now.get("assessment",""), |
|
|
"plan": plan_new or soap_now.get("plan",""), |
|
|
}, |
|
|
points, endnotes=endnotes |
|
|
) |
|
|
|
|
|
|
|
|
note_path = config.make_export_path(selected, "Sample Consultation Report.md") |
|
|
Path(note_path).write_text(note_md, encoding="utf-8") |
|
|
|
|
|
|
|
|
store.update_case(selected, { |
|
|
"billing": {"minutes": minutes, "spoke": bool(spoke), "attested": True}, |
|
|
"status": "completed", |
|
|
}) |
|
|
|
|
|
st.session_state["_show_exports"] = True |
|
|
st.success("β
Finalized. Consult note exported; proceed to the **Documentation & Billing** tab to submit a claim.") |
|
|
st.caption(f"Note: {note_path}") |
|
|
|
|
|
|
|
|
|
|
|
with tab_bill: |
|
|
status = case.get("status") |
|
|
if status != "completed": |
|
|
st.info("π Complete the consult first in the **Specialist Review** tab to access Documentation & Billing.") |
|
|
st.stop() |
|
|
|
|
|
|
|
|
st.subheader("EHR Consultation Note") |
|
|
note_path = _latest_export(selected, "Sample Consultation Report.md") |
|
|
if note_path and note_path.exists(): |
|
|
with st.expander("π View EHR Consultation Note", expanded=True): |
|
|
md = note_path.read_text(encoding="utf-8") |
|
|
st.markdown(md) |
|
|
st.caption(f"File: {note_path.name}") |
|
|
else: |
|
|
st.warning("No consultation note found for this case. Finalize the consult to generate one.") |
|
|
|
|
|
st.divider() |
|
|
|
|
|
|
|
|
st.subheader("CPT Suggestions & Claim") |
|
|
b = case.get("billing", {}) or {} |
|
|
minutes = int(b.get("minutes", 5)) |
|
|
spoke = bool(b.get("spoke", False)) |
|
|
|
|
|
suggestions = billing.autosuggest_cpt(minutes=minutes, spoke=spoke) |
|
|
elig_codes = [s.code for s in suggestions if s.eligible] |
|
|
|
|
|
st.markdown("**CPT Autosuggest**") |
|
|
for s in suggestions: |
|
|
with st.container(): |
|
|
icon = "β
" if s.eligible else "β οΈ" |
|
|
st.write(f"{icon} **{s.code}** β {s.descriptor} β ${s.rate:.2f}") |
|
|
st.caption(s.why) |
|
|
|
|
|
|
|
|
soap_now = case.get("soap_draft", {}) or {} |
|
|
icd_suggestions = billing.autosuggest_icd( |
|
|
assessment_text=soap_now.get("assessment", ""), |
|
|
plan_text=soap_now.get("plan", "") |
|
|
) |
|
|
st.markdown("**ICD-10 Diagnosis Suggestions**") |
|
|
if icd_suggestions: |
|
|
for s in icd_suggestions: |
|
|
st.write(f"β
**{s.code}** β {s.description} (confidence {int(s.confidence*100)}%)") |
|
|
st.caption(s.why) |
|
|
icd_options = [f"{s.code} β {s.description}" for s in icd_suggestions] |
|
|
picked_icd = st.multiselect( |
|
|
"Select diagnosis code(s) to include on the claim", |
|
|
options=icd_options, |
|
|
default=icd_options[:1], |
|
|
key=f"icd_sel_{selected}" |
|
|
) |
|
|
picked_icd_codes = [o.split(" β ")[0] for o in picked_icd] |
|
|
else: |
|
|
st.info("No ICD-10 suggestions available for this case.") |
|
|
picked_icd_codes: List[str] = [] |
|
|
|
|
|
chosen_default = b.get("cpt_code") or (elig_codes[0] if elig_codes else None) |
|
|
if elig_codes: |
|
|
chosen = st.selectbox( |
|
|
"Choose CPT (eligible)", |
|
|
options=elig_codes, |
|
|
index=elig_codes.index(chosen_default) if (chosen_default in elig_codes) else 0, |
|
|
key=f"cpt_{selected}" |
|
|
) |
|
|
else: |
|
|
st.warning("No eligible CPT for the current minutes/spoke combination.", icon="β οΈ") |
|
|
chosen = None |
|
|
|
|
|
att_claim = st.checkbox("I attest the claim details are accurate and ready to submit.", key=f"claim_att_{selected}") |
|
|
|
|
|
submit_claim = st.button("Submit Claim", type="primary", disabled=not (chosen and att_claim), key=f"submit_claim_{selected}") |
|
|
|
|
|
if submit_claim and chosen: |
|
|
|
|
|
store.update_case(selected, { |
|
|
"billing": { |
|
|
"cpt_code": chosen, |
|
|
"icd_codes": picked_icd_codes, |
|
|
} |
|
|
}) |
|
|
|
|
|
|
|
|
pick = next((s for s in suggestions if s.code == chosen), None) |
|
|
rate = float(getattr(pick, "rate", 0.0)) if pick else 0.0 |
|
|
claim = billing.build_837_claim( |
|
|
case, |
|
|
code=str(chosen), |
|
|
rate=rate, |
|
|
minutes=minutes, |
|
|
spoke=bool(spoke), |
|
|
attested=True, |
|
|
icd_codes=picked_icd_codes, |
|
|
) |
|
|
claim_path = config.make_export_path(selected, "Sample 837 Json.json") |
|
|
Path(claim_path).write_text(json.dumps(claim, ensure_ascii=False, indent=2), encoding="utf-8") |
|
|
|
|
|
st.success("Claim submitted. 837 JSON generated (see preview below).") |
|
|
st.caption(f"Claim: {claim_path.name}") |
|
|
|
|
|
|
|
|
claim_path = _latest_export(selected, "Sample 837 Json.json") |
|
|
if claim_path and claim_path.exists(): |
|
|
with st.expander("π§Ύ View 837 JSON Message", expanded=True): |
|
|
try: |
|
|
claim_data = json.loads(claim_path.read_text(encoding="utf-8")) |
|
|
st.json(claim_data) |
|
|
except Exception: |
|
|
st.code(claim_path.read_text(encoding="utf-8"), language="json") |
|
|
st.caption(f"File: {claim_path.name}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|