270 lines
9.5 KiB
Python
270 lines
9.5 KiB
Python
|
|
import os
|
|
import sys
|
|
import time
|
|
import requests
|
|
import pandas as pd
|
|
import openpyxl
|
|
import questionary
|
|
from rich.console import Console
|
|
from rich.logging import RichHandler
|
|
from rich.progress import Progress, SpinnerColumn, TextColumn
|
|
from rich import print as rprint
|
|
from rich.panel import Panel
|
|
from datetime import datetime
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# CONSTANTS & CONFIGURATION
|
|
# -----------------------------------------------------------------------------
|
|
|
|
# API Configuration
API_URL = "https://api-endo.ziwig.com"
LOGIN_ENDPOINT = "/api/auth/login"
PDF_ENDPOINT_TEMPLATE = "/api/records/pdf/{}"  # formatted with the patient ID

# Defaults (User Configurable via Prompt)
# SECURITY: credentials must never live in source control. The prompt defaults
# are read from the environment instead, so the prompts can still be pre-filled
# without storing a secret in this file. The password that used to be
# hard-coded here must be considered leaked and should be rotated.
DEFAULT_USER_EMAIL = os.environ.get("ENDOCONNECT_EMAIL", "abdel.lhachimi@gmail.com")
DEFAULT_USER_PASSWORD = os.environ.get("ENDOCONNECT_PASSWORD", "")
DEFAULT_EXCEL_PATH = r"E:\Ziwig Drive\Ziwig Health\Data\Patients\Records_Status.xlsm"
DEFAULT_OUTPUT_ROOT = r"Temp PDF"

# Constants (Code Configurable Only)
SHEET_TABLE_NAME = "Records_Status"  # Name of the Excel Table (ListObject)
COL_PATIENT_ID = "id"  # Column name for Patient ID
COL_PATIENT_NAME = "fullName"  # Column name for Patient Name
COL_RECORD_FINISHED = "isFinished"  # Column name for boolean flag
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# SETUP CONSOLE
|
|
# -----------------------------------------------------------------------------
|
|
# Shared rich Console through which the functions below print their output.
console = Console()
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# FUNCTIONS
|
|
# -----------------------------------------------------------------------------
|
|
|
|
def resolve_path(path):
    """Anchor a relative path to the directory that contains this script.

    Empty values (``None`` or ``""``) and paths that are already absolute are
    handed back untouched; any other path is joined onto the script's
    directory and normalized.
    """
    if path and not os.path.isabs(path):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        return os.path.normpath(os.path.join(base_dir, path))
    return path
|
|
|
|
def get_credentials():
    """Prompt for Endoconnect credentials until a login succeeds.

    The email and password prompts are pre-filled with DEFAULT_USER_EMAIL and
    DEFAULT_USER_PASSWORD. Each attempt is verified through ``login``; a
    failed attempt prints a message and prompts again.

    Returns:
        tuple: ``(token, email)`` for the first successful login.

    Exits the process with status 1 when the email is empty or when either
    prompt is cancelled.
    """
    while True:
        email = questionary.text("Enter Endoconnect Email:", default=DEFAULT_USER_EMAIL).ask()
        if not email:
            console.print("[red]User email cannot be empty. Exiting.[/red]")
            sys.exit(1)

        password = questionary.password("Enter Endoconnect Password:", default=DEFAULT_USER_PASSWORD).ask()
        # .ask() yields None when the prompt is cancelled (e.g. Ctrl+C); treat
        # that as a request to quit instead of sending a null password.
        if password is None:
            console.print("[red]Password prompt cancelled. Exiting.[/red]")
            sys.exit(1)

        # Verify credentials
        with console.status("[bold green]Verifying credentials..."):
            token = login(email, password)

        if token:
            console.print("[bold green]Login successful![/bold green]")
            return token, email

        console.print("[bold red]Login failed. Please try again.[/bold red]")
|
|
|
|
def login(email, password):
    """Authenticate with the API and return the token, or None on failure.

    Args:
        email: Account email sent as the ``email`` field of the JSON payload.
        password: Account password sent as the ``password`` field.

    Returns:
        The value of the ``token`` key of the JSON response, or ``None`` when
        the request fails, the server answers with an error status, the body
        is not valid JSON, the JSON is not an object, or no token is present.
    """
    url = f"{API_URL}{LOGIN_ENDPOINT}"
    payload = {
        "email": email,
        "password": password,
        "rememberMe": None,
    }
    try:
        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError):
        # ValueError covers a non-JSON body on requests versions whose
        # decoding error is not a RequestException subclass. Failures are
        # deliberately reported as None: the caller re-prompts the user.
        return None

    # A JSON body that is not an object (list, string, null) has no token.
    if not isinstance(data, dict):
        return None
    return data.get("token")
|
|
|
|
def get_excel_table_data(file_path, table_name):
    """
    Find the Excel Table called ``table_name`` on any sheet of the workbook at
    ``file_path`` and return its contents as a DataFrame.

    The first row of the table range supplies the column names; every
    following row becomes one DataFrame row. An empty range yields an empty
    DataFrame. The process exits with status 1 when the workbook cannot be
    loaded or when no sheet contains the table.
    """
    try:
        workbook = openpyxl.load_workbook(file_path, data_only=True)
    except Exception as exc:
        console.print(f"[bold red]Error loading Excel file: {exc}[/bold red]")
        sys.exit(1)

    # Walk the sheets in workbook order and stop at the first one that owns
    # the table; the else branch runs only when no sheet matched.
    for title in workbook.sheetnames:
        worksheet = workbook[title]
        if table_name in worksheet.tables:
            cell_range = worksheet.tables[table_name].ref
            break
    else:
        console.print(f"[bold red]Table '{table_name}' not found in workbook.[/bold red]")
        sys.exit(1)

    # Indexing a worksheet with a range reference gives a tuple of cell rows.
    table_rows = list(worksheet[cell_range])
    if not table_rows:
        return pd.DataFrame()

    header_row, *body_rows = table_rows
    column_names = [cell.value for cell in header_row]
    records = [[cell.value for cell in row] for row in body_rows]
    return pd.DataFrame(records, columns=column_names)
|
|
|
|
def download_pdf(token, patient_id, output_path, patient_name):
    """Download the PDF for a patient and store it at ``output_path``.

    Args:
        token: Bearer token placed in the ``Authorization`` header.
        patient_id: Identifier inserted into PDF_ENDPOINT_TEMPLATE.
        output_path: Destination file path of the PDF.
        patient_name: Not used by the download itself; kept so existing
            callers keep working.

    Returns:
        tuple: ``(True, duration_seconds, None)`` on success, or
        ``(False, 0, error_message)`` on any failure.
    """
    url = f"{API_URL}{PDF_ENDPOINT_TEMPLATE.format(patient_id)}"
    headers = {"Authorization": f"Bearer {token}"}

    # Stream into a sibling temporary file so that a failed or interrupted
    # transfer never leaves a truncated PDF under the final name.
    partial_path = f"{output_path}.part"

    start_time = time.perf_counter()
    try:
        # With stream=True the connection is only released once the response
        # is closed; the context manager guarantees that on every path.
        with requests.get(url, headers=headers, stream=True, timeout=30) as response:
            response.raise_for_status()

            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        # Publish the finished file in one step.
        os.replace(partial_path, output_path)
    except Exception as e:
        # Best-effort cleanup of the partial file; the download error is the
        # one worth reporting, so a failed removal is ignored on purpose.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        return False, 0, str(e)

    duration = time.perf_counter() - start_time
    return True, duration, None
|
|
|
|
def sanitize_filename(name):
    """Make a patient name usable as a file name.

    Each character listed in ``forbidden`` is swapped for an underscore in a
    single translation pass, then leading and trailing whitespace is removed.
    """
    forbidden = '<>:"/\\|?*'
    substitutions = str.maketrans(forbidden, "_" * len(forbidden))
    return name.translate(substitutions).strip()
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# MAIN
|
|
# -----------------------------------------------------------------------------
|
|
|
|
def main():
    """Run the interactive PDF extraction workflow.

    Steps:
        1. Ask for credentials and log in.
        2. Ask for the Excel file and the output directory (created when
           missing).
        3. Read the SHEET_TABLE_NAME table and check the required columns.
        4. Download one PDF per row whose COL_RECORD_FINISHED value equals
           True, reporting the outcome of every download.

    Exits the process with status 1 on invalid input (missing file, cancelled
    prompt, directory that cannot be created, missing columns).
    """
    console.print(Panel.fit("[bold blue]Endoconnect Patient PDF Extractor[/bold blue]"))

    # 1. Credentials (the returned email is not needed afterwards)
    token, _ = get_credentials()

    # 2. Configuration (Excel & Output)
    excel_path = questionary.path("Path to Excel file:", default=resolve_path(DEFAULT_EXCEL_PATH)).ask()
    if not excel_path or not os.path.exists(excel_path):
        console.print(f"[bold red]File not found: {excel_path}[/bold red]")
        sys.exit(1)

    # Output Directory
    today_str = datetime.now().strftime("PDFs-%Y%m%d")
    default_output_dir = os.path.join(resolve_path(DEFAULT_OUTPUT_ROOT), today_str)

    output_dir = questionary.path("Output Directory:", default=default_output_dir).ask()
    # A cancelled prompt yields None, which os.path.exists() cannot handle.
    if not output_dir:
        console.print("[bold red]No output directory provided. Exiting.[/bold red]")
        sys.exit(1)

    if not os.path.exists(output_dir):
        try:
            os.makedirs(output_dir)
            console.print(f"[green]Created output directory: {output_dir}[/green]")
        except Exception as e:
            console.print(f"[bold red]Could not create directory: {e}[/bold red]")
            sys.exit(1)

    console.print()  # Spacing

    # 3. Read Data
    console.print(f"Reading table '{SHEET_TABLE_NAME}' from Excel...")
    df = get_excel_table_data(excel_path, SHEET_TABLE_NAME)
    console.print()  # Spacing

    # Validation
    required_cols = [COL_PATIENT_ID, COL_PATIENT_NAME, COL_RECORD_FINISHED]
    missing_cols = [c for c in required_cols if c not in df.columns]
    if missing_cols:
        console.print(f"[bold red]Missing columns in table: {', '.join(missing_cols)}[/bold red]")
        sys.exit(1)

    # Filter: an element-wise comparison keeps rows holding True (or 1) and
    # drops empty cells, which a blanket astype(bool) would not do safely.
    patients_to_process = df[df[COL_RECORD_FINISHED] == True]  # noqa: E712

    total_patients = len(patients_to_process)
    console.print(f"[bold]Found {total_patients} patients to process.[/bold]")
    console.print()  # Add spacing

    if total_patients == 0:
        console.print("[yellow]No patients found with record_finished=True. Exiting.[/yellow]")
        return

    # 4. Processing Loop
    records = patients_to_process.to_dict('records')
    used_filenames = set()  # names already written during this run
    failures = 0

    for i, record in enumerate(records, start=1):
        p_id = record[COL_PATIENT_ID]
        p_name_raw = record[COL_PATIENT_NAME]

        # Normalize Name: collapse whitespace and Title Case. A missing or
        # whitespace-only name falls back to a placeholder instead of
        # producing a file called ".pdf".
        name_text = " ".join(str(p_name_raw).split()) if p_name_raw else ""
        p_name = name_text.title() if name_text else "Unknown_Patient"

        safe_name = sanitize_filename(p_name)
        filename = f"{safe_name}.pdf"
        # Two patients can share a name; append the patient ID to later
        # duplicates so an earlier PDF is never silently overwritten.
        if filename.casefold() in used_filenames:
            filename = f"{safe_name}_{sanitize_filename(str(p_id))}.pdf"
        used_filenames.add(filename.casefold())
        file_path = os.path.join(output_dir, filename)

        # Progress message with colored filename (rich status supports markup)
        prefix = f"{i}/{total_patients} -"
        msg_colored = f"{prefix} Downloading [bold cyan]{filename}[/bold cyan]"

        with console.status(msg_colored, spinner="dots"):
            success, duration, error = download_pdf(token, p_id, file_path, p_name)

        if success:
            console.print(f"[bold green]✓[/bold green] {msg_colored} ({duration:.2f}s)")
        else:
            failures += 1
            console.print(f"[bold red]✗[/bold red] {msg_colored} [red]ERROR: {error}[/red]")

    if failures:
        console.print(f"[yellow]{failures} of {total_patients} downloads failed.[/yellow]")

    console.print(Panel("[bold green]Extraction Finished![/bold green]"))
|
|
|
|
if __name__ == "__main__":
    # Exit status requested by an error handler; None means "finish normally".
    exit_status = None
    try:
        main()
    except KeyboardInterrupt:
        console.print("\n[yellow]Script interrupted by user.[/yellow]")
        exit_status = 0
    except Exception as exc:
        console.print(f"\n[bold red]An unexpected error occurred: {exc}[/bold red]")
        exit_status = 1
    finally:
        # Always pause before the window closes, whatever the outcome.
        console.print()
        input("Press Enter to close...")

    if exit_status is not None:
        sys.exit(exit_status)
|