"""Interactive Endoconnect patient PDF extractor.

Logs in to the Endoconnect API, reads an Excel table of patient records,
and downloads one PDF per record whose "finished" flag is set.
"""

import os
import sys
import time
from datetime import datetime

import openpyxl
import pandas as pd
import questionary
import requests
from rich import print as rprint
from rich.console import Console
from rich.logging import RichHandler
from rich.markup import escape
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn

# -----------------------------------------------------------------------------
# CONSTANTS & CONFIGURATION
# -----------------------------------------------------------------------------

# API Configuration
API_URL = "https://api-endo.ziwig.com"
LOGIN_ENDPOINT = "/api/auth/login"
PDF_ENDPOINT_TEMPLATE = "/api/records/pdf/{}"

# Network tuning (seconds / bytes)
LOGIN_TIMEOUT = 10
DOWNLOAD_TIMEOUT = 30
DOWNLOAD_CHUNK_SIZE = 8192

# Defaults (User Configurable via Prompt)
# NOTE(security): real credentials are hard-coded below as prompt defaults.
# They should be rotated and removed from source control. Until then, the
# ENDOCONNECT_EMAIL / ENDOCONNECT_PASSWORD environment variables, when set,
# take precedence over the literals.
DEFAULT_USER_EMAIL = os.environ.get("ENDOCONNECT_EMAIL", "abdel.lhachimi@gmail.com")
DEFAULT_USER_PASSWORD = os.environ.get("ENDOCONNECT_PASSWORD", "GU$y#C#Cv73XFKyT3j6^")
DEFAULT_EXCEL_PATH = r"E:\Ziwig Drive\Ziwig Health\Data\Patients\Records_Status.xlsm"
DEFAULT_OUTPUT_ROOT = r"Temp PDF"

# Constants (Code Configurable Only)
SHEET_TABLE_NAME = "Records_Status"  # Name of the Excel Table (ListObject)
COL_PATIENT_ID = "id"  # Column name for Patient ID
COL_PATIENT_NAME = "fullName"  # Column name for Patient Name
COL_RECORD_FINISHED = "isFinished"  # Column name for boolean flag

# Characters replaced by "_" when a patient name is turned into a filename
_INVALID_FILENAME_CHARS = str.maketrans({char: "_" for char in '<>:"/\\|?*'})

# -----------------------------------------------------------------------------
# SETUP CONSOLE
# -----------------------------------------------------------------------------
console = Console()


# -----------------------------------------------------------------------------
# FUNCTIONS
# -----------------------------------------------------------------------------
def get_credentials():
    """Prompt for credentials, repeating until a login succeeds.

    Returns:
        tuple: ``(token, email)`` for the first successful login.

    Exits the process with status 1 when the email is empty or when either
    prompt is cancelled.
    """
    while True:
        email = questionary.text(
            "Enter Endoconnect Email:", default=DEFAULT_USER_EMAIL
        ).ask()
        if not email:
            console.print("[red]User email cannot be empty. Exiting.[/red]")
            sys.exit(1)

        password = questionary.password(
            "Enter Endoconnect Password:", default=DEFAULT_USER_PASSWORD
        ).ask()
        if password is None:
            # .ask() yields None when the prompt is cancelled; do not send
            # a null password to the API.
            console.print("[red]Password prompt cancelled. Exiting.[/red]")
            sys.exit(1)

        # Verify credentials
        with console.status("[bold green]Verifying credentials..."):
            token = login(email, password)

        if token:
            console.print("[bold green]Login successful![/bold green]")
            return token, email
        console.print("[bold red]Login failed. Please try again.[/bold red]")


def login(email, password):
    """Authenticate with the API.

    Args:
        email: Account email sent in the login payload.
        password: Account password sent in the login payload.

    Returns:
        The value of the ``token`` field of the JSON response, or ``None``
        when the request fails, the body is not JSON, or the JSON is not an
        object.
    """
    url = f"{API_URL}{LOGIN_ENDPOINT}"
    payload = {"email": email, "password": password, "rememberMe": None}

    try:
        response = requests.post(url, json=payload, timeout=LOGIN_TIMEOUT)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError):
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        return None

    if not isinstance(data, dict):
        return None
    return data.get("token")


def get_excel_table_data(file_path, table_name):
    """Locate an Excel Table by name in any worksheet and return it as a DataFrame.

    Args:
        file_path: Path of the workbook to open.
        table_name: Name of the Excel Table (ListObject) to read.

    Returns:
        pandas.DataFrame: Table contents, using the table's first row as
        column names. An empty DataFrame when the table range has no rows.

    Exits the process with status 1 when the workbook cannot be loaded or
    the table is not found.
    """
    try:
        wb = openpyxl.load_workbook(file_path, data_only=True)
    except Exception as e:
        console.print(f"[bold red]Error loading Excel file: {escape(str(e))}[/bold red]")
        sys.exit(1)

    try:
        # wb.worksheets skips chartsheets, which have no ``tables`` attribute.
        target_sheet = next(
            (ws for ws in wb.worksheets if table_name in ws.tables), None
        )
        if target_sheet is None:
            console.print(
                f"[bold red]Table '{table_name}' not found in workbook.[/bold red]"
            )
            sys.exit(1)

        target_range = target_sheet.tables[table_name].ref

        # ws[target_range] returns a tuple of rows
        rows = list(target_sheet[target_range])
        if not rows:
            return pd.DataFrame()

        # First row is header
        header_cells, *body_rows = rows
        headers = [cell.value for cell in header_cells]
        data_rows = [[cell.value for cell in row] for row in body_rows]
        return pd.DataFrame(data_rows, columns=headers)
    finally:
        wb.close()


def download_pdf(token, patient_id, output_path, patient_name):
    """Download the PDF for a patient and write it to ``output_path``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        patient_id: Identifier substituted into the PDF endpoint URL.
        output_path: Destination file path.
        patient_name: Not used by this function; kept so existing callers
            keep working.

    Returns:
        tuple: ``(True, seconds_elapsed, None)`` on success, or
        ``(False, 0, error_message)`` on failure.
    """
    url = f"{API_URL}{PDF_ENDPOINT_TEMPLATE.format(patient_id)}"
    headers = {"Authorization": f"Bearer {token}"}

    start_time = time.perf_counter()
    file_started = False
    try:
        # The context manager releases the streamed connection even when
        # the download fails midway.
        with requests.get(
            url, headers=headers, stream=True, timeout=DOWNLOAD_TIMEOUT
        ) as response:
            response.raise_for_status()
            with open(output_path, "wb") as f:
                file_started = True
                for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                    if chunk:
                        f.write(chunk)
    except Exception as e:
        # Deliberately broad: one failed download must not abort the batch.
        if file_started:
            # Do not leave a truncated PDF behind. Only remove the file if
            # this call opened (and therefore already truncated) it.
            try:
                os.remove(output_path)
            except OSError:
                pass
        return False, 0, str(e)

    return True, time.perf_counter() - start_time, None


def sanitize_filename(name):
    """Replace each of ``<>:"/\\|?*`` with ``_`` and strip surrounding whitespace."""
    return name.translate(_INVALID_FILENAME_CHARS).strip()


def _normalize_patient_name(raw_name):
    """Collapse whitespace and title-case a name; 'Unknown_Patient' when empty."""
    name = " ".join(str(raw_name).split()).title() if raw_name else ""
    return name or "Unknown_Patient"


def _unique_pdf_filename(stem, patient_id, used_names):
    """Return ``<stem>.pdf``, adding the patient id if that name was already used.

    ``used_names`` is updated in place. Names are compared case-insensitively
    so the result is also safe on case-insensitive filesystems.
    """
    filename = f"{stem}.pdf"
    if filename.casefold() in used_names:
        filename = f"{stem}_{sanitize_filename(str(patient_id))}.pdf"
    used_names.add(filename.casefold())
    return filename


# -----------------------------------------------------------------------------
# MAIN
# -----------------------------------------------------------------------------
def main():
    """Run the interactive extraction: login, read the table, download PDFs."""
    console.print(Panel.fit("[bold blue]Endoconnect Patient PDF Extractor[/bold blue]"))

    # 1. Credentials
    token, _ = get_credentials()

    # 2. Configuration (Excel & Output)
    excel_path = questionary.path(
        "Path to Excel file:", default=DEFAULT_EXCEL_PATH
    ).ask()
    # A cancelled prompt yields None, which os.path.exists() rejects.
    if not excel_path or not os.path.exists(excel_path):
        console.print(f"[bold red]File not found: {escape(str(excel_path))}[/bold red]")
        sys.exit(1)

    # Output Directory
    today_str = datetime.now().strftime("PDFs-%Y%m%d")
    default_output_dir = os.path.join(DEFAULT_OUTPUT_ROOT, today_str)
    output_dir = questionary.path(
        "Output Directory:", default=default_output_dir
    ).ask()
    if not output_dir:
        console.print("[bold red]No output directory given. Exiting.[/bold red]")
        sys.exit(1)

    if not os.path.isdir(output_dir):
        try:
            # Also fails (FileExistsError) when the path exists as a file.
            os.makedirs(output_dir, exist_ok=True)
            console.print(
                f"[green]Created output directory: {escape(output_dir)}[/green]"
            )
        except OSError as e:
            console.print(
                f"[bold red]Could not create directory: {escape(str(e))}[/bold red]"
            )
            sys.exit(1)

    console.print()  # Spacing

    # 3. Read Data
    console.print(f"Reading table '{SHEET_TABLE_NAME}' from Excel...")
    df = get_excel_table_data(excel_path, SHEET_TABLE_NAME)
    console.print()  # Spacing

    # Validation
    required_cols = [COL_PATIENT_ID, COL_PATIENT_NAME, COL_RECORD_FINISHED]
    missing_cols = [c for c in required_cols if c not in df.columns]
    if missing_cols:
        console.print(
            f"[bold red]Missing columns in table: {', '.join(missing_cols)}[/bold red]"
        )
        sys.exit(1)

    # Filter
    # Element-wise equality with True keeps boolean True and numeric 1, and
    # drops nulls; a plain astype(bool) would turn nulls into True.
    patients_to_process = df[df[COL_RECORD_FINISHED].eq(True)]

    total_patients = len(patients_to_process)
    console.print(f"[bold]Found {total_patients} patients to process.[/bold]")
    console.print()  # Add spacing

    if total_patients == 0:
        console.print(
            "[yellow]No patients found with record_finished=True. Exiting.[/yellow]"
        )
        return

    # 4. Processing Loop
    records = patients_to_process.to_dict("records")
    used_filenames = set()

    for i, record in enumerate(records, start=1):
        p_id = record[COL_PATIENT_ID]

        # Normalize Name: Remove extra spaces and Title Case
        p_name = _normalize_patient_name(record[COL_PATIENT_NAME])

        # Two patients with the same name must not overwrite each other.
        filename = _unique_pdf_filename(
            sanitize_filename(p_name), p_id, used_filenames
        )
        file_path = os.path.join(output_dir, filename)

        # Rich status and print both interpret markup, so the filename and
        # error text are escaped before being embedded.
        prefix = f"{i}/{total_patients} -"
        msg_colored = f"{prefix} Downloading [bold cyan]{escape(filename)}[/bold cyan]"

        with console.status(msg_colored, spinner="dots"):
            success, duration, error = download_pdf(token, p_id, file_path, p_name)

        if success:
            console.print(
                f"[bold green]✓[/bold green] {msg_colored} ({duration:.2f}s)"
            )
        else:
            console.print(
                f"[bold red]✗[/bold red] {msg_colored} "
                f"[red]ERROR: {escape(str(error))}[/red]"
            )

    console.print(Panel("[bold green]Extraction Finished![/bold green]"))


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        console.print("\n[yellow]Script interrupted by user.[/yellow]")
        sys.exit(0)
    except Exception as e:
        console.print(
            f"\n[bold red]An unexpected error occurred: {escape(str(e))}[/bold red]"
        )
        sys.exit(1)
    finally:
        console.print()
        try:
            input("Press Enter to close...")
        except EOFError:
            # No interactive stdin; do not mask the real exit status.
            pass