diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b7e4351 --- /dev/null +++ b/.gitignore @@ -0,0 +1,25 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +.env +venv/ +ENV/ +NUL diff --git a/config/config.xlsx b/config/config.xlsx new file mode 100644 index 0000000..823b556 Binary files /dev/null and b/config/config.xlsx differ diff --git a/eb_script_template.py b/eb_script_template.py deleted file mode 100644 index c4af497..0000000 --- a/eb_script_template.py +++ /dev/null @@ -1,699 +0,0 @@ -""" -Endobest Script Template - -Template for creating scripts to access Endobest clinical research platform data. - -FEATURES: -- Multi-microservice authentication (IAM, RC, GDD) -- Thread-safe HTTP client pool -- Multithreading with main pool + subtasks pool -- Automatic retry with token refresh on 401 -- Progress bars and logging -- Utilities for JSON navigation - -HOW TO USE: -1. Configure MICROSERVICES dict (comment unused services) -2. Implement your processing logic in main() function -3. Use API templates as examples for your own endpoints -4. Customize constants as needed (timeouts, threads, etc.) - -QUICK START: -- Run script: python eb_script_template.py -- Login with credentials (defaults provided) -- Choose number of threads -- Processing happens in main() TODO block - -For detailed documentation, see Script_template_spec.md -""" - -import json -import logging -import os -import sys -import threading -import traceback -from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import timedelta -from time import perf_counter, sleep -import functools - -import httpx -import questionary -from tqdm import tqdm -from rich.console import Console - - -# ============================================================================ -# CONFIGURATION - CREDENTIALS -# ============================================================================ - -DEFAULT_USER_NAME = "ziwig-invest2@yopmail.com" -DEFAULT_PASSWORD = "pbrrA765$bP3beiuyuiyhiuy!agx" -REALME = "ziwig-pro" - - -# ============================================================================ -# CONFIGURATION - MICROSERVICES -# ============================================================================ -# Comment out unused microservices to skip their token configuration - -MICROSERVICES = { - "IAM": { - "app_id": None, # IAM doesn't use app_id - "base_url": "https://api-auth.ziwig-connect.com", - "endpoints": { - "login": "/api/auth/{REALME}/login", # POST : Body = {"username": "{user_name}", "password": "{pass}"} - "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} - "get_roles": "/api/profiles/paginate", # POST : Body = {"limit": 100, "currentPage": 1, "sort": [], "filters": {}} - "get_user_by_id": "/api/users/find/{user_id}?domaine={REALME}", # GET - "get_applications": "/api/applications", # GET - "get_profiles_by_app_id": "/api/identity-profiles/paginate", # POST : Body = {"page":null,"limit":100,"search":{},"clientId":"{app_id}","type":"user"} - "get_users_by_profile_id": "/api/identity-profiles/{profile_id}/users", # GET - } - }, - "RC": { - "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393", - "base_url": "https://api-hcp.ziwig-connect.com", - "endpoints": { - "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; 
Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}
-            "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
-            "organizations": "/api/inclusions/getAllOrganizations", # GET
-            "statistics": "/api/inclusions/inclusion-statistics", # POST : Body = {"center": "{rc_endobest_current_center}", "protocolId": "{rc_endobest_prot_id}", "excludedCenters": {rc_endobest_excl_centers}}
-            "search_inclusions": "/api/inclusions/search?limit={limit}&page={page}", # POST : Body = {"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", "center": organization_id, "keywords": ""}
-            "record_by_patient": "/api/records/byPatient", # POST : Body = {"center": "{rc_endobest_current_center}", "patientId": "{patient_id}", "mode": "exchange", "state": "ongoing", "includeEndoParcour": false, "sourceClient": "pro_prm"}
-            "surveys": "/api/surveys/filter/with-answers", # POST : Body = {"context": "clinic_research", "subject": "{patient_id}", "blockedQcmVersions": {blocked_qcm_versions}}
-        }
-    },
-    "GDD": {
-        "app_id": "4f5ac063-6a22-4e2c-bda5-b50c0dddab79",
-        "base_url": "https://api-lab.ziwig-connect.com",
-        "endpoints": {
-            "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}
-            "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
-            "request_by_tube": "/api/requests/by-tube-id/{tube_id}", # GET
-        }
-    },
-    "HRD": {
-        "app_id": "93bc44fd-c64b-4fff-a450-f3cba956e934",
-        "base_url": "https://api-resources.ziwig-connect.com",
-        "endpoints": {
-            "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}
-            "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
-            "pro_by_id": "api/entity-manager/meta/{model}/data/nodes/pro/{pro_id}?relationships=2", # GET
-            "get_pros_by_endobest_center": "/api/entity-manager/meta/modele_fr/data/orga/{organization_id}/centers/pros?limit=1000", # GET
-        }
-    },
-}
-
-
-# ============================================================================
-# CONFIGURATION - THREADING
-# ============================================================================
-
-MAX_THREADS = 20  # Maximum threads for main pool
-SUBTASKS_POOL_SIZE = 40  # Fixed size for subtasks pool
-
-
-# ============================================================================
-# CONFIGURATION - RETRY & TIMEOUTS
-# ============================================================================
-
-ERROR_MAX_RETRY = 10  # Max retry attempts for API calls
-WAIT_BEFORE_RETRY = 0.5  # Delay in seconds between retries (fixed)
-API_TIMEOUT = 60  # Default timeout for API calls (seconds)
-MAX_BATCHS_OF_RETRIES = 3  # Max batches of retries for API calls
-WAIT_BEFORE_NEW_BATCH_OF_RETRIES = 5  # Delay in seconds between retry batches
-
-
-# ============================================================================
-# CONFIGURATION - LOGGING
-# ============================================================================
-
-LOG_LEVEL = logging.INFO  # Change to DEBUG for detailed logs
-LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
-# LOG_FILE_NAME auto-generated based on script name in __main__
-
-
-#
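The endpoint strings above are `str.format` templates: placeholders such as `{REALME}`, `{tube_id}` or `{limit}` are expanded at call time (the template does this with `.format(**{**globals(),**locals()})`). A minimal sketch of that resolution, assuming only the IAM entry of the MICROSERVICES dict; the explicit keyword arguments here stand in for the globals/locals merge:

```python
# Minimal sketch of endpoint-template resolution, assuming the MICROSERVICES
# dict defined above (only the IAM entry is reproduced here).
MICROSERVICES = {
    "IAM": {
        "base_url": "https://api-auth.ziwig-connect.com",
        "endpoints": {"login": "/api/auth/{REALME}/login"},
    },
}
REALME = "ziwig-pro"

def resolve(app: str, endpoint: str, **params) -> str:
    """Expand an endpoint template into a full URL."""
    cfg = MICROSERVICES[app]
    return cfg["base_url"] + cfg["endpoints"][endpoint].format(REALME=REALME, **params)

print(resolve("IAM", "login"))
# https://api-auth.ziwig-connect.com/api/auth/ziwig-pro/login
```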
============================================================================ -# CONFIGURATION - PROGRESS BARS -# ============================================================================ - -BAR_N_FMT_WIDTH = 4 -BAR_TOTAL_FMT_WIDTH = 4 -BAR_TIME_WIDTH = 8 -BAR_RATE_WIDTH = 10 - -custom_bar_format = ("{l_bar}{bar}" - f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " - f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " - f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}") - - -# ============================================================================ -# GLOBAL VARIABLES -# ============================================================================ - -# Tokens storage: {app_name: {"access_token": ..., "refresh_token": ...}} -tokens = {} - -# Thread-safe HTTP client pool (one client per thread) -httpx_clients = {} - -# Thread management -threads_list = [] -_threads_list_lock = threading.Lock() -_token_refresh_lock = threading.Lock() - -# Thread pools (initialized in main()) -main_thread_pool = None -subtasks_thread_pool = None - -# User interaction lock -_user_interaction_lock = threading.Lock() - -# Thread-local storage for context -thread_local_storage = threading.local() - -# Rich console for formatted output -console = Console() - - -# ============================================================================ -# UTILITIES -# ============================================================================ - -def get_nested_value(data_structure, path, default=None): - """ - Extract value from nested dict/list structures with wildcard support. - - Args: - data_structure: Nested dict/list to navigate - path: List of keys/indices. Use '*' for list wildcard - default: Value to return if path not found - - Returns: - Value at path, or default if not found - - Examples: - get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1 - get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2] - """ - if data_structure is None: - return "$$$$ No Data" - if not path: - return default - - # Handle wildcard in path - if "*" in path: - wildcard_index = path.index("*") - path_before = path[:wildcard_index] - path_after = path[wildcard_index+1:] - - # Helper for non-wildcard path resolution - def _get_simple_nested_value(ds, p, d): - cl = ds - for k in p: - if isinstance(cl, dict): - cl = cl.get(k) - elif isinstance(cl, list): - try: - if isinstance(k, int) and -len(cl) <= k < len(cl): - cl = cl[k] - else: - return d - except (IndexError, TypeError): - return d - else: - return d - if cl is None: - return d - return cl - - base_level = _get_simple_nested_value(data_structure, path_before, default) - - if not isinstance(base_level, list): - return default - - results = [] - for item in base_level: - value = get_nested_value(item, path_after, default) - if value is not default and value != "$$$$ No Data": - results.append(value) - - # Flatten one level for multiple wildcards - final_results = [] - for res in results: - if isinstance(res, list): - final_results.extend(res) - else: - final_results.append(res) - - return final_results - - # No wildcard - standard traversal - current_level = data_structure - for key_or_index in path: - if isinstance(current_level, dict): - current_level = current_level.get(key_or_index) - if current_level is None: - return default - elif isinstance(current_level, list): - try: - if isinstance(key_or_index, int) and -len(current_level) <= key_or_index < len(current_level): - current_level = current_level[key_or_index] - else: - 
return default - except (IndexError, TypeError): - return default - else: - return default - return current_level - - -def get_httpx_client() -> httpx.Client: - """ - Get or create thread-local HTTP client with keep-alive enabled. - Each thread gets its own client to avoid connection conflicts. - - Returns: - httpx.Client instance for current thread - """ - global httpx_clients - thread_id = threading.get_ident() - if thread_id not in httpx_clients: - httpx_clients[thread_id] = httpx.Client( - headers={"Connection": "keep-alive"}, - limits=httpx.Limits(max_keepalive_connections=20, max_connections=100) - ) - return httpx_clients[thread_id] - - -def get_thread_position(): - """ - Get position of current thread in threads list. - Used for managing progress bar positions in multithreaded environment. - - Returns: - Zero-based index of current thread - """ - global threads_list - thread_id = threading.get_ident() - with _threads_list_lock: - if thread_id not in threads_list: - threads_list.append(thread_id) - return len(threads_list) - 1 - else: - return threads_list.index(thread_id) - - -def clear_httpx_client(): - """ - Clear the thread-local HTTP client to force creation of a new one. - Useful for resetting connections after errors. - """ - global httpx_clients - thread_id = threading.get_ident() - if thread_id in httpx_clients: - try: - httpx_clients[thread_id].close() - except Exception: - pass - del httpx_clients[thread_id] - - -def run_with_context(func, context, *args, **kwargs): - """ - Wrapper to set thread-local context before running a function in a new thread. - Useful for ThreadPoolExecutor where context is lost. - """ - thread_local_storage.current_patient_context = context - return func(*args, **kwargs) - - -# ============================================================================ -# AUTHENTICATION -# ============================================================================ - -def login(): - """ - Authenticate with IAM and configure tokens for all microservices. - - Process: - 1. Prompt for credentials (with defaults) - 2. Login to IAM -> get master_token and user_id - 3. For each microservice (except IAM): call config-token API - 4. 
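`get_nested_value` is the workhorse for pulling fields out of API responses; the wildcard branch fans out over a list and flattens one level per `'*'`. A few illustrative calls (sample data only):

```python
# Illustrative calls to get_nested_value; the payload is sample data only.
payload = {
    "data": [
        {"patient": {"id": "p1", "tubes": [{"id": "t1"}, {"id": "t2"}]}},
        {"patient": {"id": "p2", "tubes": [{"id": "t3"}]}},
    ]
}

get_nested_value(payload, ["data", "*", "patient", "id"])
# -> ['p1', 'p2']
get_nested_value(payload, ["data", "*", "patient", "tubes", "*", "id"])
# -> ['t1', 't2', 't3']  (flattened one level per wildcard)
get_nested_value(payload, ["data", 0, "patient", "dob"], default="n/a")
# -> 'n/a'
```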
Store access_token and refresh_token for each service - - Returns: - "Success": Authentication succeeded for all services - "Error": Authentication failed (can retry) - "Exit": User cancelled login - """ - global tokens - - # Prompt for credentials - user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask() - password = questionary.password("password:", default=DEFAULT_PASSWORD).ask() - - if not (user_name and password): - return "Exit" - - # Step 1: Login to IAM - try: - client = get_httpx_client() - client.base_url = MICROSERVICES["IAM"]["base_url"] - response = client.post( - MICROSERVICES["IAM"]["endpoints"]["login"].format(**{**globals(),**locals()}), - json={"username": user_name, "password": password}, - timeout=20 - ) - response.raise_for_status() - master_token = response.json()["access_token"] - user_id = response.json()["userId"] - tokens["IAM"] = { - "access_token": master_token, - "refresh_token": response.json()["refresh_token"] - } - except (httpx.RequestError, httpx.HTTPStatusError) as exc: - print(f"Login Error: {exc}") - logging.warning(f"Login Error: {exc}") - return "Error" - - # Step 2: Configure tokens for each microservice - for app_name, app_config in MICROSERVICES.items(): - if app_name == "IAM": - continue # IAM doesn't need config-token - - try: - client = get_httpx_client() - client.base_url = app_config["base_url"] - response = client.post( - app_config["endpoints"]["config_token"].format(**{**globals(),**locals()}), - headers={"Authorization": f"Bearer {master_token}"}, - json={ - "userId": user_id, - "clientId": app_config["app_id"], - "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36" - }, - timeout=20 - ) - response.raise_for_status() - tokens[app_name] = { - "access_token": response.json()["access_token"], - "refresh_token": response.json()["refresh_token"] - } - except (httpx.RequestError, httpx.HTTPStatusError) as exc: - print(f"Config-token Error for {app_name}: {exc}") - logging.warning(f"Config-token Error for {app_name}: {exc}") - return "Error" - - print("\nLogin Success") - return "Success" - - -def new_token(app): - """ - Refresh access token for a specific microservice. - - Uses refresh_token to obtain new access_token and refresh_token. - Thread-safe with lock to prevent concurrent refresh attempts. 
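Condensed, the two-step flow that `login()` implements is: one IAM login yielding the master token and user id, then one config-token exchange per remaining microservice. A standalone sketch, assuming the MICROSERVICES dict from this file (response field names as used by this script; credentials are placeholders):

```python
# Standalone sketch of the two-step authentication flow implemented by
# login() above. Assumes the MICROSERVICES dict from this file.
import httpx

def authenticate(user: str, password: str) -> dict:
    tokens = {}
    with httpx.Client(timeout=20) as client:
        resp = client.post(
            MICROSERVICES["IAM"]["base_url"] + "/api/auth/ziwig-pro/login",
            json={"username": user, "password": password},
        )
        resp.raise_for_status()
        master = resp.json()
        tokens["IAM"] = {"access_token": master["access_token"],
                         "refresh_token": master["refresh_token"]}
        for app, cfg in MICROSERVICES.items():
            if app == "IAM":
                continue  # IAM issued the master token itself
            resp = client.post(
                cfg["base_url"] + cfg["endpoints"]["config_token"],
                headers={"Authorization": f"Bearer {master['access_token']}"},
                json={"userId": master["userId"], "clientId": cfg["app_id"],
                      "userAgent": "Mozilla/5.0"},
            )
            resp.raise_for_status()
            tokens[app] = {k: resp.json()[k]
                           for k in ("access_token", "refresh_token")}
    return tokens
```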
- - Args: - app: Microservice name (e.g., "RC", "GDD") - - Raises: - httpx.RequestError: If refresh fails after all retries - """ - global tokens - - with _token_refresh_lock: - for attempt in range(ERROR_MAX_RETRY): - try: - client = get_httpx_client() - client.base_url = MICROSERVICES[app]["base_url"] - response = client.post( - MICROSERVICES[app]["endpoints"]["refresh"].format(**{**globals(),**locals()}), - headers={"Authorization": f"Bearer {tokens[app]['access_token']}"}, - json={"refresh_token": tokens[app]["refresh_token"]}, - timeout=20 - ) - response.raise_for_status() - tokens[app]["access_token"] = response.json()["access_token"] - tokens[app]["refresh_token"] = response.json()["refresh_token"] - return - except (httpx.RequestError, httpx.HTTPStatusError) as exc: - logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}") - if attempt < ERROR_MAX_RETRY - 1: - sleep(WAIT_BEFORE_RETRY) - - logging.critical(f"Persistent error in refresh_token for {app}") - raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}") - - -# ============================================================================ -# DECORATORS -# ============================================================================ - -def api_call_with_retry(func): - """Decorator for API calls with automatic retry and token refresh on 401 errors""" - @functools.wraps(func) - def wrapper(*args, **kwargs): - func_name = func.__name__ - total_attempts = 0 - batch_count = 1 - - while True: - for attempt in range(ERROR_MAX_RETRY): - total_attempts += 1 - try: - return func(*args, **kwargs) - except (httpx.RequestError, httpx.HTTPStatusError) as exc: - logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}") - - # Refresh the thread-local client if an error occurs - # to avoid potential pool corruption or stale connections - clear_httpx_client() - - if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: - logging.info(f"Token expired for {func_name}. Refreshing token.") - new_token() - - if attempt < ERROR_MAX_RETRY - 1: - sleep(WAIT_BEFORE_RETRY) - else: - # Max retries reached for this batch - if batch_count < MAX_BATCHS_OF_RETRIES: - logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. " - f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.") - batch_count += 1 - sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES) - break # Exit for loop to restart batch in while True - else: - # All automatic batches exhausted, ask the user - with _user_interaction_lock: - console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]") - console.print(f"[red]Exception: {exc}[/red]") - - choice = questionary.select( - f"What would you like to do for {func_name}?", - choices=[ - "Retry (try another batch of retries)", - "Ignore (return None and continue)", - "Stop script (critical error)" - ] - ).ask() - - if choice == "Retry (try another batch of retries)": - logging.info(f"User chose to retry {func_name}. Restarting batch sequence.") - batch_count = 1 # Reset batch counter for the next interactive round - break # Exit for loop to restart batch in while True - elif choice == "Ignore (return None and continue)": - # Retrieve context if available - ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) - logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). 
Error: {exc}") - return None - else: - logging.critical(f"User chose to stop script after persistent error in {func_name}.") - raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)") - - return wrapper - - -# ============================================================================ -# API TEMPLATES -# ============================================================================ -# Templates for common API patterns. Duplicate and modify for your needs. -# Remember to: -# - Choose appropriate HTTP method (GET/POST/PUT/DELETE) -# - Update endpoint from MICROSERVICES dict -# - Adjust timeout if needed (global API_TIMEOUT or specific value) -# - Always return response.json() - -@api_call_with_retry("RC") -def get_all_organizations(): - """ - Example API call using GET method. - - Returns: - List of organization dictionaries - """ - client = get_httpx_client() - client.base_url = MICROSERVICES["RC"]["base_url"].format(**{**globals(),**locals()}) - response = client.get( - MICROSERVICES["RC"]["endpoints"]["organizations"], - headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"}, - timeout=API_TIMEOUT - ) - response.raise_for_status() - return response.json() - - -@api_call_with_retry("RC") -def search_inclusions(organization_id, limit, page): - """ - Example API call using POST method with query params and JSON body. - - Args: - organization_id: Organization UUID - limit: Max results per page - page: Page number (1-based) - - Returns: - Dict with "data" key containing list of inclusions - """ - client = get_httpx_client() - client.base_url = MICROSERVICES["RC"]["base_url"].format(**{**globals(),**locals()}) - response = client.post( - f"{MICROSERVICES['RC']['endpoints']['search_inclusions']}?limit={limit}&page={page}", - headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"}, - json={ - "protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", # TODO: Configure if needed - "center": organization_id, - "keywords": "" - }, - timeout=API_TIMEOUT - ) - response.raise_for_status() - return response.json() - - -# ============================================================================ -# MAIN PROCESSING -# ============================================================================ - -def main(): - """ - Main processing function. - - Structure: - 1. Authentication - 2. Configuration (thread count) - 3. Initialization (thread pools, timing) - 4. Main processing block (TODO: implement your logic here) - 5. Finalization (elapsed time) - """ - global main_thread_pool, subtasks_thread_pool, thread_local_storage - - # ========== AUTHENTICATION ========== - print() - login_status = login() - while login_status == "Error": - login_status = login() - if login_status == "Exit": - return - - # ========== CONFIGURATION ========== - print() - number_of_threads = int( - questionary.text( - "Number of threads:", - default="12", - validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS - ).ask() - ) - - # ========== INITIALIZATION ========== - start_time = perf_counter() - - # Initialize thread pools - main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads) - subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE) - - # ========== MAIN PROCESSING BLOCK ========== - print() - console.print("[bold cyan]Starting main processing...[/bold cyan]") - - # TODO: IMPLEMENT YOUR PROCESSING LOGIC HERE - # - # Example pattern with progress bar and multithreading: - # - # items = [...] 
# Your data to process - # futures = [] - # - # with tqdm(total=len(items), desc="Processing items", - # bar_format=custom_bar_format) as pbar: - # with main_thread_pool as executor: - # - # for item in items: - # - # # Set thread-local context for detailed error logging in decorators - # ctx = {"id": patient_id, "pseudo": pseudo} - # thread_local_storage.current_patient_context = ctx - # - # futures.append(executor.submit(run_with_context, process_item, ctx, item)) - # - # for future in as_completed(futures): - # try: - # result = future.result() - # # Process result here - # pbar.update(1) - # except Exception as exc: - # logging.critical(f"Error in worker: {exc}", exc_info=True) - # print(f"\nCRITICAL ERROR: {exc}") - # executor.shutdown(wait=False, cancel_futures=True) - # raise - # - # Example: Simple test to verify authentication works - # organizations = get_all_organizations() - # console.print(f"[green]Retrieved {len(organizations)} organizations[/green]") - - # ========== FINALIZATION ========== - print() - print(f"Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}") - - -# ============================================================================ -# ENTRY POINT -# ============================================================================ - -if __name__ == '__main__': - # ========== LOGGING CONFIGURATION ========== - # Auto-generate log filename based on script name - script_name = os.path.splitext(os.path.basename(__file__))[0] - log_file_name = f"{script_name}.log" - - logging.basicConfig( - level=LOG_LEVEL, - format=LOG_FORMAT, - filename=log_file_name, - filemode='w' - ) - - # ========== MAIN EXECUTION ========== - try: - main() - except Exception as e: - logging.critical(f"Script terminated with exception: {e}", exc_info=True) - print(f"\nScript stopped due to error: {e}") - print(traceback.format_exc()) - finally: - # ========== CLEANUP ========== - # Shutdown thread pools gracefully - if 'main_thread_pool' in globals() and main_thread_pool: - main_thread_pool.shutdown(wait=False, cancel_futures=True) - if 'subtasks_thread_pool' in globals() and subtasks_thread_pool: - subtasks_thread_pool.shutdown(wait=False, cancel_futures=True) - - # Pause before exit (prevents console from closing immediately when launched from Windows Explorer) - print('\n') - input("Press Enter to exit...") diff --git a/eb_script_template.bat b/extract_endoconnect_medical_records.bat similarity index 55% rename from eb_script_template.bat rename to extract_endoconnect_medical_records.bat index eb36bb4..bab9356 100644 --- a/eb_script_template.bat +++ b/extract_endoconnect_medical_records.bat @@ -1,4 +1,3 @@ @echo off call C:\PythonProjects\.rcvenv\Scripts\activate.bat -python eb_script_template.py %* - +python extract_endoconnect_medical_records.py %* diff --git a/extract_endoconnect_medical_records.py b/extract_endoconnect_medical_records.py new file mode 100644 index 0000000..8a5ca68 --- /dev/null +++ b/extract_endoconnect_medical_records.py @@ -0,0 +1,922 @@ +""" +Extract Endoconnect Medical Records + +Automated extraction of patient medical records from the Endoconnect platform. 
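The commented pattern in the template's `main()` above, distilled into a runnable shape; `process_item` and `items` are placeholders for your own worker and data:

```python
# Runnable distillation of the main() example pattern above; process_item
# and items are placeholders.
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

def process_item(item):
    return item * 2  # placeholder worker

items = list(range(100))
results = []

with tqdm(total=len(items), desc="Processing items") as pbar:
    with ThreadPoolExecutor(max_workers=12) as executor:
        futures = [executor.submit(process_item, it) for it in items]
        for future in as_completed(futures):
            results.append(future.result())
            pbar.update(1)
```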
+ +FEATURES: +- Single-service authentication (Endoconnect) +- Thread-safe HTTP client pool +- Multithreading with progress bars +- Automatic retry on API errors +- Criteria/values configuration from Excel +- JSON export + +QUICK START: +- Run script: python extract_endoconnect_medical_records.py +- Login with credentials +- Confirm/edit professional ID and thread count +- Wait for processing to complete +- Output: JSON file in current directory +""" + +import json +import logging +import os +import sys +import threading +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timedelta +from time import perf_counter, sleep +import functools + +import httpx +import openpyxl +import questionary +from tqdm import tqdm +from rich.console import Console + + +# ============================================================================ +# CONFIGURATION - CREDENTIALS +# ============================================================================ + +DEFAULT_USER_NAME = "abdel.lhachimi@gmail.com" +DEFAULT_PASSWORD = "GU$y#C#Cv73XFKyT3j6^" + + +# ============================================================================ +# CONFIGURATION - ENDOCONNECT API +# ============================================================================ + +ENDOCONNECT_BASE_URL = "https://api-endo.ziwig.com/api/" + +LOGIN_PATH = "auth/login" +PATIENTS_LIST_PATH = "patients/list" +MEDICAL_RECORD_PATH = "records" +MEDICAL_EVENTS_PATH = "events" + + +# ============================================================================ +# CONFIGURATION - PROFESSIONAL +# ============================================================================ + +DEFAULT_PROFESSIONAL_ID = "99990000005" + + +# ============================================================================ +# CONFIGURATION - PAGINATION +# ============================================================================ + +MAX_PAGE_SIZE = 1000 + + +# ============================================================================ +# CONFIGURATION - THREADING +# ============================================================================ + +MAX_THREADS = 20 + + +# ============================================================================ +# CONFIGURATION - EXCEL CONFIG +# ============================================================================ + +CONFIG_DIR = "config" +CONFIG_WORKBOOK_NAME = "config.xlsx" +CONFIG_CRITERIA_SHEET_NAME = "Criteria" +CONFIG_VALUES_SHEET_NAME = "Criteria_values" + +# Column names - criteria sheet +COL_CRITERIA_ID = "criteria_id" +COL_CRITERIA_LABEL = "criteria_name" +COL_CRITERIA_TYPE = "criteria_type" +COL_CRITERIA_LEVEL1_LABEL = "domaine_name" +COL_CRITERIA_LEVEL2_LABEL = "subdomaine_name" +COL_CRITERIA_ORDER = "criteria_order" + +# Column names - values sheet +COL_VALUE_CRITERIA_ID = "criteria_id" +COL_VALUE_ID = "criteria_value_id" +COL_VALUE_LABEL = "criteria_value" + + +# ============================================================================ +# CONFIGURATION - OUTPUT +# ============================================================================ + +OUTPUT_FILE_NAME = "endoconnect_medical_records" + + +# ============================================================================ +# CONFIGURATION - RETRY & TIMEOUTS +# ============================================================================ + +ERROR_MAX_RETRY = 10 +WAIT_BEFORE_RETRY = 1 +API_TIMEOUT = 600 +MAX_BATCHS_OF_RETRIES = 3 +WAIT_BEFORE_NEW_BATCH_OF_RETRIES = 20 + + +# 
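For reference, the worksheet layout these column constants imply; the rows are illustrative examples, not real configuration data:

```python
# Worksheet layout implied by the column constants above; rows are
# illustrative examples only.
#
# Sheet "Criteria":
#   criteria_id | criteria_name | criteria_type | domaine_name | subdomaine_name | criteria_order
#   c_pain      | Pelvic pain   | CHECKLIST     | Symptoms     | Pain            | 10
#   c_age       | Age           | NUMERIC       | Profile      |                 | 20
#
# Sheet "Criteria_values":
#   criteria_id | criteria_value_id | criteria_value
#   c_pain      | v_mild            | Mild
#   c_pain      | v_severe          | Severe
```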
============================================================================ +# CONFIGURATION - LOGGING +# ============================================================================ + +LOG_LEVEL = logging.INFO +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' + + +# ============================================================================ +# CONFIGURATION - PROGRESS BARS +# ============================================================================ + +BAR_N_FMT_WIDTH = 4 +BAR_TOTAL_FMT_WIDTH = 4 +BAR_TIME_WIDTH = 8 +BAR_RATE_WIDTH = 10 + +custom_bar_format = ("{l_bar}{bar}" + f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " + f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " + f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}") + + +# ============================================================================ +# GLOBAL VARIABLES +# ============================================================================ + +token = None + +# Thread-safe HTTP client pool (one client per thread) +httpx_clients = {} + +# Thread management +threads_list = [] +_threads_list_lock = threading.Lock() + +# Thread pool (initialized in main()) +main_thread_pool = None + +# User interaction lock +_user_interaction_lock = threading.Lock() + +# Thread-local storage for context +thread_local_storage = threading.local() + +# Rich console for formatted output +console = Console() + +# Criteria and values configuration (loaded from Excel) +criteria_config = {} +values_config = {} + + +# ============================================================================ +# UTILITIES +# ============================================================================ + +def get_nested_value(data_structure, path, default=None): + """ + Extract value from nested dict/list structures with wildcard support. + + Args: + data_structure: Nested dict/list to navigate + path: List of keys/indices. 
Use '*' for list wildcard + default: Value to return if path not found + + Returns: + Value at path, or default if not found + + Examples: + get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1 + get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2] + """ + if data_structure is None: + return "$$$$ No Data" + if not path: + return default + + # Handle wildcard in path + if "*" in path: + wildcard_index = path.index("*") + path_before = path[:wildcard_index] + path_after = path[wildcard_index+1:] + + # Helper for non-wildcard path resolution + def _get_simple_nested_value(ds, p, d): + cl = ds + for k in p: + if isinstance(cl, dict): + cl = cl.get(k) + elif isinstance(cl, list): + try: + if isinstance(k, int) and -len(cl) <= k < len(cl): + cl = cl[k] + else: + return d + except (IndexError, TypeError): + return d + else: + return d + if cl is None: + return d + return cl + + base_level = _get_simple_nested_value(data_structure, path_before, default) + + if not isinstance(base_level, list): + return default + + results = [] + for item in base_level: + value = get_nested_value(item, path_after, default) + if value is not default and value != "$$$$ No Data": + results.append(value) + + # Flatten one level for multiple wildcards + final_results = [] + for res in results: + if isinstance(res, list): + final_results.extend(res) + else: + final_results.append(res) + + return final_results + + # No wildcard - standard traversal + current_level = data_structure + for key_or_index in path: + if isinstance(current_level, dict): + current_level = current_level.get(key_or_index) + if current_level is None: + return default + elif isinstance(current_level, list): + try: + if isinstance(key_or_index, int) and -len(current_level) <= key_or_index < len(current_level): + current_level = current_level[key_or_index] + else: + return default + except (IndexError, TypeError): + return default + else: + return default + return current_level + + +def get_httpx_client() -> httpx.Client: + """ + Get or create thread-local HTTP client with keep-alive enabled. + Each thread gets its own client to avoid connection conflicts. + + Returns: + httpx.Client instance for current thread + """ + global httpx_clients + thread_id = threading.get_ident() + if thread_id not in httpx_clients: + httpx_clients[thread_id] = httpx.Client( + headers={"Connection": "keep-alive"}, + limits=httpx.Limits(max_keepalive_connections=20, max_connections=100) + ) + return httpx_clients[thread_id] + + +def get_thread_position(): + """ + Get position of current thread in threads list. + Used for managing progress bar positions in multithreaded environment. + + Returns: + Zero-based index of current thread + """ + global threads_list + thread_id = threading.get_ident() + with _threads_list_lock: + if thread_id not in threads_list: + threads_list.append(thread_id) + return len(threads_list) - 1 + else: + return threads_list.index(thread_id) + + +def clear_httpx_client(): + """ + Clear the thread-local HTTP client to force creation of a new one. + Useful for resetting connections after errors. + """ + global httpx_clients + thread_id = threading.get_ident() + if thread_id in httpx_clients: + try: + httpx_clients[thread_id].close() + except Exception: + pass + del httpx_clients[thread_id] + + +def run_with_context(func, context, *args, **kwargs): + """ + Wrapper to set thread-local context before running a function in a new thread. + Useful for ThreadPoolExecutor where context is lost. 
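A short sketch of how `run_with_context` carries per-patient context into a worker thread, so the retry decorator can name the affected patient in its logs; `fetch_record` is a placeholder worker, not a function from this script:

```python
# Sketch of per-patient context propagation via run_with_context.
# fetch_record is a placeholder worker.
from concurrent.futures import ThreadPoolExecutor

def fetch_record(patient_id):
    ctx = thread_local_storage.current_patient_context
    return f"{patient_id} processed for {ctx['name']}"

ctx = {"id": "p1", "name": "DOE Jane"}
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(run_with_context, fetch_record, ctx, "p1")
    print(future.result())  # p1 processed for DOE Jane
```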
+ """ + thread_local_storage.current_patient_context = context + return func(*args, **kwargs) + + +# ============================================================================ +# CONFIG PATH RESOLUTION (PyInstaller compatible) +# ============================================================================ + +def get_config_path(): + """Resolve path to the config directory, compatible with PyInstaller packaging.""" + if getattr(sys, '_MEIPASS', None): + return os.path.join(sys._MEIPASS, CONFIG_DIR) + return os.path.join(os.path.dirname(os.path.abspath(__file__)), CONFIG_DIR) + + +# ============================================================================ +# EXCEL CONFIGURATION LOADING +# ============================================================================ + +def load_criteria_config(): + """ + Load criteria and values configuration from the Excel workbook. + + Populates global dicts: + - criteria_config: {criteria_id: {label, type, level1, level2, order}} + - values_config: {criteria_id: {value_id: value_label}} + """ + global criteria_config, values_config + + config_path = os.path.join(get_config_path(), CONFIG_WORKBOOK_NAME) + + if not os.path.exists(config_path): + logging.critical(f"Configuration file not found: {config_path}") + raise FileNotFoundError(f"Configuration file not found: {config_path}") + + wb = openpyxl.load_workbook(config_path, read_only=True, data_only=True) + + # --- Load criteria sheet --- + if CONFIG_CRITERIA_SHEET_NAME not in wb.sheetnames: + logging.critical(f"Sheet '{CONFIG_CRITERIA_SHEET_NAME}' not found in {CONFIG_WORKBOOK_NAME}") + raise ValueError(f"Sheet '{CONFIG_CRITERIA_SHEET_NAME}' not found in {CONFIG_WORKBOOK_NAME}") + + ws_criteria = wb[CONFIG_CRITERIA_SHEET_NAME] + rows = list(ws_criteria.iter_rows(values_only=True)) + + if not rows: + logging.critical(f"Sheet '{CONFIG_CRITERIA_SHEET_NAME}' is empty") + raise ValueError(f"Sheet '{CONFIG_CRITERIA_SHEET_NAME}' is empty") + + header = [str(cell).strip() if cell else "" for cell in rows[0]] + + # Find column indices + required_columns = { + COL_CRITERIA_ID: None, COL_CRITERIA_LABEL: None, COL_CRITERIA_TYPE: None, + COL_CRITERIA_LEVEL1_LABEL: None, COL_CRITERIA_LEVEL2_LABEL: None, COL_CRITERIA_ORDER: None, + } + for col_name in required_columns: + if col_name not in header: + logging.critical(f"Missing column '{col_name}' in '{CONFIG_CRITERIA_SHEET_NAME}' sheet. 
Available columns: {header}") + raise ValueError(f"Missing column '{col_name}' in '{CONFIG_CRITERIA_SHEET_NAME}' sheet") + required_columns[col_name] = header.index(col_name) + + idx_id = required_columns[COL_CRITERIA_ID] + idx_label = required_columns[COL_CRITERIA_LABEL] + idx_type = required_columns[COL_CRITERIA_TYPE] + idx_level1 = required_columns[COL_CRITERIA_LEVEL1_LABEL] + idx_level2 = required_columns[COL_CRITERIA_LEVEL2_LABEL] + idx_order = required_columns[COL_CRITERIA_ORDER] + + criteria_config = {} + for row in rows[1:]: + crit_id = str(row[idx_id]).strip() if row[idx_id] is not None else None + if not crit_id: + continue + criteria_config[crit_id] = { + "label": str(row[idx_label]).strip() if row[idx_label] else crit_id, + "type": str(row[idx_type]).strip().upper() if row[idx_type] else "TEXT", + "level1": str(row[idx_level1]).strip() if row[idx_level1] else None, + "level2": str(row[idx_level2]).strip() if row[idx_level2] else None, + "order": int(row[idx_order]) if row[idx_order] is not None else 9999, + } + + # --- Load values sheet --- + if CONFIG_VALUES_SHEET_NAME not in wb.sheetnames: + logging.critical(f"Sheet '{CONFIG_VALUES_SHEET_NAME}' not found in {CONFIG_WORKBOOK_NAME}") + raise ValueError(f"Sheet '{CONFIG_VALUES_SHEET_NAME}' not found in {CONFIG_WORKBOOK_NAME}") + + ws_values = wb[CONFIG_VALUES_SHEET_NAME] + rows_v = list(ws_values.iter_rows(values_only=True)) + + if not rows_v: + logging.critical(f"Sheet '{CONFIG_VALUES_SHEET_NAME}' is empty") + raise ValueError(f"Sheet '{CONFIG_VALUES_SHEET_NAME}' is empty") + + header_v = [str(cell).strip() if cell else "" for cell in rows_v[0]] + + required_columns_v = {COL_VALUE_CRITERIA_ID: None, COL_VALUE_ID: None, COL_VALUE_LABEL: None} + for col_name in required_columns_v: + if col_name not in header_v: + logging.critical(f"Missing column '{col_name}' in '{CONFIG_VALUES_SHEET_NAME}' sheet. 
Available columns: {header_v}") + raise ValueError(f"Missing column '{col_name}' in '{CONFIG_VALUES_SHEET_NAME}' sheet") + required_columns_v[col_name] = header_v.index(col_name) + + idx_v_crit_id = required_columns_v[COL_VALUE_CRITERIA_ID] + idx_v_id = required_columns_v[COL_VALUE_ID] + idx_v_label = required_columns_v[COL_VALUE_LABEL] + + values_config = {} + for row in rows_v[1:]: + crit_id = str(row[idx_v_crit_id]).strip() if row[idx_v_crit_id] is not None else None + val_id = str(row[idx_v_id]).strip() if row[idx_v_id] is not None else None + val_label = str(row[idx_v_label]).strip() if row[idx_v_label] is not None else val_id + if not crit_id or not val_id: + continue + if crit_id not in values_config: + values_config[crit_id] = {} + values_config[crit_id][val_id] = val_label + + wb.close() + + logging.info(f"Loaded {len(criteria_config)} criteria and {sum(len(v) for v in values_config.values())} values from config") + console.print(f"[green]Config loaded: {len(criteria_config)} criteria, " + f"{sum(len(v) for v in values_config.values())} values[/green]") + + +# ============================================================================ +# DECORATOR - API CALL WITH RETRY +# ============================================================================ + +def api_call_with_retry(func): + """Decorator for API calls with automatic retry on errors.""" + @functools.wraps(func) + def wrapper(*args, **kwargs): + func_name = func.__name__ + total_attempts = 0 + batch_count = 1 + + while True: + for attempt in range(ERROR_MAX_RETRY): + total_attempts += 1 + try: + return func(*args, **kwargs) + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "name": "Unknown"}) + logging.warning(f"Error in {func_name} [Patient {ctx['id']}] ({ctx['name']}) (Attempt {total_attempts}): {exc}") + + clear_httpx_client() + + if attempt < ERROR_MAX_RETRY - 1: + sleep(WAIT_BEFORE_RETRY) + else: + if batch_count < MAX_BATCHS_OF_RETRIES: + logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name} " + f"[Patient {ctx['id']}] ({ctx['name']}). " + f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.") + batch_count += 1 + sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES) + break + else: + with _user_interaction_lock: + console.print(f"\n[bold red]Persistent error in {func_name} [Patient {ctx['id']}] ({ctx['name']}) " + f"after {batch_count} batches ({total_attempts} attempts).[/bold red]") + console.print(f"[red]Exception: {exc}[/red]") + + choice = questionary.select( + f"What would you like to do for {func_name}?", + choices=[ + "Retry (try another batch of retries)", + "Ignore (return None and continue)", + "Stop script (critical error)" + ] + ).ask() + + if choice == "Retry (try another batch of retries)": + logging.info(f"User chose to retry {func_name}. Restarting batch sequence.") + batch_count = 1 + break + elif choice == "Ignore (return None and continue)": + ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "name": "Unknown"}) + logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['name']}). 
Error: {exc}") + return None + else: + logging.critical(f"User chose to stop script after persistent error in {func_name}.") + raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)") + + return wrapper + + +# ============================================================================ +# AUTHENTICATION +# ============================================================================ + +def login(): + """ + Authenticate with Endoconnect. + + Returns: + "Success": Authentication succeeded + "Error": Authentication failed (can retry) + "Exit": User cancelled login + """ + global token + + user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask() + password = questionary.password("password:", default=DEFAULT_PASSWORD).ask() + + if not (user_name and password): + return "Exit" + + try: + client = get_httpx_client() + response = client.post( + f"{ENDOCONNECT_BASE_URL}{LOGIN_PATH}", + json={"email": user_name, "password": password}, + timeout=20 + ) + response.raise_for_status() + token = response.json()["token"] + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + print(f"Login Error: {exc}") + logging.warning(f"Login Error: {exc}") + return "Error" + + print("\nLogin Success") + return "Success" + + +# ============================================================================ +# API FUNCTIONS +# ============================================================================ + +@api_call_with_retry +def get_patients(professional_id): + """Get all patients for a given professional ID (RPPS).""" + client = get_httpx_client() + response = client.post( + f"{ENDOCONNECT_BASE_URL}{PATIENTS_LIST_PATH}", + headers={"Authorization": f"Bearer {token}"}, + json={ + "page": 1, + "pageSize": MAX_PAGE_SIZE, + "RPPS": professional_id, + "search": "" + }, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() + + +@api_call_with_retry +def get_medical_record(patient_id): + """Get medical record for a patient. Returns docs[0] or None.""" + client = get_httpx_client() + response = client.get( + f"{ENDOCONNECT_BASE_URL}{MEDICAL_RECORD_PATH}?profile={patient_id}", + headers={"Authorization": f"Bearer {token}"}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + data = response.json() + + docs = data.get("docs", []) + if len(docs) != 1: + ctx = getattr(thread_local_storage, "current_patient_context", {"id": patient_id, "name": "Unknown"}) + logging.error(f"Expected exactly 1 doc in medical record for patient {ctx['id']} ({ctx['name']}), got {len(docs)}") + if len(docs) == 0: + return None + return docs[0] + + +@api_call_with_retry +def get_medical_events(patient_id): + """Get medical events for a patient. Returns docs array (may be empty).""" + client = get_httpx_client() + response = client.get( + f"{ENDOCONNECT_BASE_URL}{MEDICAL_EVENTS_PATH}?profile={patient_id}", + headers={"Authorization": f"Bearer {token}"}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + data = response.json() + return data.get("docs", []) + + +# ============================================================================ +# RESULT BUILDING +# ============================================================================ + +def resolve_criteria_value(criteria_id, raw_value, patient_id="Unknown"): + """ + Resolve the display value for a criteria answer. + + For TEXT/NUMERIC/DATE: use raw value directly (join with " | " if array). + For MULTIBOOLEAN/CHECKLIST: lookup value labels from values_config. 
+ """ + config = criteria_config.get(criteria_id) + if not config: + logging.warning(f"[Patient {patient_id}] Unknown criteria_id: {criteria_id}, raw_value: {raw_value}") + if isinstance(raw_value, list): + return " | ".join(str(v) for v in raw_value) + return raw_value + + crit_type = config["type"] + + if crit_type in ("TEXT", "NUMERIC", "DATE"): + if isinstance(raw_value, list): + return " | ".join(str(v) for v in raw_value) + return raw_value + + elif crit_type in ("MULTIBOOLEAN", "CHECKLIST"): + val_lookup = values_config.get(criteria_id, {}) + + if not val_lookup: + logging.warning(f"[Patient {patient_id}] No values configured for MULTIBOOLEAN/CHECKLIST criteria '{criteria_id}' (label: {config['label']})") + + if isinstance(raw_value, list): + labels = [] + for v in raw_value: + v_str = str(v).strip() + label = val_lookup.get(v_str) + if label is None: + logging.warning(f"[Patient {patient_id}] Unknown value_id '{v_str}' for criteria '{criteria_id}' (label: {config['label']})") + labels.append(v_str) + else: + labels.append(label) + return " | ".join(labels) + else: + v_str = str(raw_value).strip() + label = val_lookup.get(v_str) + if label is None: + logging.warning(f"[Patient {patient_id}] Unknown value_id '{v_str}' for criteria '{criteria_id}' (label: {config['label']})") + return v_str + return label + + else: + logging.warning(f"[Patient {patient_id}] Unknown criteria type '{crit_type}' for criteria '{criteria_id}' (label: {config['label']})") + if isinstance(raw_value, list): + return " | ".join(str(v) for v in raw_value) + return raw_value + + +def build_detail(answers, patient_id="Unknown", use_nesting=True): + """ + Build an ordered detail object from answers. + + Attributes are inserted in criteria_order. + If use_nesting=True (record_detail): applies level1/level2 nesting. + If use_nesting=False (event_detail): flat structure, no nesting. + """ + if not answers: + return {} + + # Collect answers with their config, filter unknown criteria + enriched = [] + for answer in answers: + crit_id = str(answer.get("criteria", "")).strip() + raw_value = answer.get("value") + config = criteria_config.get(crit_id) + if not config: + logging.warning(f"[Patient {patient_id}] Skipping unknown criteria_id in answers: {crit_id}, raw_value: {raw_value}") + continue + enriched.append({ + "criteria_id": crit_id, + "label": config["label"], + "type": config["type"], + "level1": config["level1"], + "level2": config["level2"], + "order": config["order"], + "raw_value": raw_value, + }) + + # Sort by criteria_order + enriched.sort(key=lambda x: x["order"]) + + # Build the detail dict + detail = {} + for item in enriched: + label = item["label"] + value = resolve_criteria_value(item["criteria_id"], item["raw_value"], patient_id=patient_id) + level1 = item["level1"] + level2 = item["level2"] + + if use_nesting and level1: + if level1 not in detail: + detail[level1] = {} + if level2: + if level2 not in detail[level1]: + detail[level1][level2] = {} + detail[level1][level2][label] = value + else: + detail[level1][label] = value + else: + detail[label] = value + + return detail + + +def process_patient(patient): + """ + Process a single patient: fetch medical record and events, build result object. 
+ + Args: + patient: Patient dict from get_patients response + + Returns: + Dict with patient_ident, record_metadata, record_detail, events + """ + patient_id = patient["_id"] + patient_name = patient.get("fullName", "Unknown") + + # Fetch data (sequential) + record = get_medical_record(patient_id) + if record is None: + logging.warning(f"[Patient {patient_id}] ({patient_name}) No medical record returned, record_detail will be empty") + + events_data = get_medical_events(patient_id) + if events_data is None: + logging.warning(f"[Patient {patient_id}] ({patient_name}) No medical events returned, events will be empty") + + # Build result + result = { + "patient_ident": { + "_id": patient_id, + "fullName": patient_name, + "birthday": patient.get("birthday", ""), + "email": patient.get("email", ""), + }, + "record_metadata": { + "createdAt": patient.get("createdAt", ""), + "isFinishMedicalRecord": patient.get("isFinishMedicalRecord", False), + "lastUpdate": patient.get("lasUpdate", ""), + "finishOn": patient.get("finishOn", ""), + "confirmedEndo": patient.get("confirmedEndo", False), + }, + "record_detail": build_detail(record.get("answers", []), patient_id=patient_id, use_nesting=True) if record else {}, + "events": [ + { + "event_date": evt.get("date", ""), + "event_type": evt.get("type", ""), + "event_detail": build_detail(evt.get("answers", []), patient_id=patient_id, use_nesting=False), + } + for evt in (events_data or []) + ], + } + + return result + + +# ============================================================================ +# MAIN PROCESSING +# ============================================================================ + +def main(): + """ + Main processing function. + + Flow: + 1. Load Excel config + 2. Login + 3. Confirm professional ID + 4. Confirm thread count + 5. Fetch and filter patients + 6. Process patients in thread pool + 7. Sort and export results to JSON + """ + global main_thread_pool + + # ========== LOAD CONFIG ========== + console.print("[bold cyan]Loading criteria configuration...[/bold cyan]") + load_criteria_config() + + # ========== AUTHENTICATION ========== + print() + login_status = login() + while login_status == "Error": + login_status = login() + if login_status == "Exit": + return + + # ========== PROFESSIONAL ID ========== + print() + professional_id = questionary.text( + "Professional ID (RPPS):", + default=DEFAULT_PROFESSIONAL_ID + ).ask() + if not professional_id: + return + + # ========== THREAD COUNT ========== + print() + number_of_threads = int( + questionary.text( + "Number of threads:", + default="12", + validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS + ).ask() + ) + + # ========== INITIALIZATION ========== + start_time = perf_counter() + main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads) + + # ========== FETCH PATIENTS ========== + print() + console.print("[bold cyan]Fetching patients list...[/bold cyan]") + all_patients = get_patients(professional_id) + + if all_patients is None: + logging.critical(f"Failed to fetch patients list for professional_id={professional_id}. Aborting.") + console.print("[bold red]Failed to fetch patients list. 
Aborting.[/bold red]") + return + + # Filter: keep only patients with finished medical record + patients = [p for p in all_patients if p.get("isFinishMedicalRecord") is True] + + console.print(f"[green]Total patients: {len(all_patients)} | With finished medical record: {len(patients)}[/green]") + + if not patients: + console.print("[yellow]No patients with finished medical records found. Nothing to process.[/yellow]") + return + + # ========== PROCESS PATIENTS ========== + print() + console.print("[bold cyan]Processing patients...[/bold cyan]") + + output = [] + futures = [] + + with tqdm(total=len(patients), desc="Extracting records", + bar_format=custom_bar_format) as pbar: + with main_thread_pool as executor: + for patient in patients: + ctx = {"id": patient["_id"], "name": patient.get("fullName", "Unknown")} + futures.append( + executor.submit(run_with_context, process_patient, ctx, patient) + ) + + for future in as_completed(futures): + try: + result = future.result() + if result is not None: + output.append(result) + pbar.update(1) + except Exception as exc: + logging.critical(f"Error in worker: {exc}", exc_info=True) + print(f"\nCRITICAL ERROR: {exc}") + executor.shutdown(wait=False, cancel_futures=True) + raise + + # ========== SORT RESULTS ========== + output.sort(key=lambda x: ( + x.get("patient_ident", {}).get("fullName", ""), + x.get("patient_ident", {}).get("email", "") + )) + + # ========== EXPORT JSON ========== + timestamp = datetime.now().strftime("%Y%m%d-%H%M") + output_filename = f"{OUTPUT_FILE_NAME}-{timestamp}.json" + + with open(output_filename, "w", encoding="utf-8") as f: + json.dump(output, f, indent=2, ensure_ascii=False) + + # ========== FINALIZATION ========== + print() + console.print(f"[bold green]Export complete: {len(output)} patients -> {output_filename}[/bold green]") + print(f"Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}") + + +# ============================================================================ +# ENTRY POINT +# ============================================================================ + +if __name__ == '__main__': + # ========== LOGGING CONFIGURATION ========== + script_name = os.path.splitext(os.path.basename(__file__))[0] + log_file_name = f"{script_name}.log" + + logging.basicConfig( + level=LOG_LEVEL, + format=LOG_FORMAT, + filename=log_file_name, + filemode='w' + ) + + # ========== MAIN EXECUTION ========== + try: + main() + except Exception as e: + logging.critical(f"Script terminated with exception: {e}", exc_info=True) + print(f"\nScript stopped due to error: {e}") + print(traceback.format_exc()) + finally: + # ========== CLEANUP ========== + if 'main_thread_pool' in globals() and main_thread_pool: + main_thread_pool.shutdown(wait=False, cancel_futures=True) + + # Pause before exit + print('\n') + input("Press Enter to exit...")
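A quick way to sanity-check an export, assuming a file produced by a previous run (the timestamped name below is an example, following the script's `<OUTPUT_FILE_NAME>-<YYYYmmdd-HHMM>.json` pattern):

```python
# Quick sanity check of an exported file; substitute the filename from your run.
import json

with open("endoconnect_medical_records-20250101-1200.json", encoding="utf-8") as f:
    records = json.load(f)

print(f"{len(records)} patients exported")
for rec in records[:3]:
    ident = rec["patient_ident"]
    print(ident["fullName"], "| events:", len(rec["events"]))
```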