# Endobest Clinical Research Dashboard Generator
#
# This script automates the collection and processing of patient inclusion data from the Endobest
# clinical research protocol. It authenticates with Ziwig's IAM, Research Clinic (RC), and GDD APIs
# to gather patient records, questionnaire responses, and diagnostic test results across multiple
# healthcare organizations.
#
# The script generates a comprehensive JSON report containing all patient data, with 100% of the
# fields defined in an Excel configuration file. All fields (patient identification, inclusion data,
# clinical records, test requests, and questionnaire responses) are externalized and can be
# configured without any code modification. The configuration supports multiple data sources
# (questionnaires, records, inclusions, requests), custom functions for business logic (conditional
# fields, data transformations, pattern matching), field dependencies and conditions, value
# transformations (labels, templates), and multi-line field definitions for complex calculations.
# Organization names are enriched with center identifiers using a configurable mapping table,
# enabling seamless integration with center-based reporting workflows.
#
# It employs multithreading with configurable worker pools to parallelize API calls across
# organizations and patients, significantly reducing execution time. A single optimized API call
# retrieves all questionnaires per patient, improving performance by 4-5x compared to multiple
# filtered calls. Results are exported as structured JSON files with nested field groups for easy
# integration with downstream analytics tools.
#
# Built-in quality assurance includes coherence checks between statistics and detailed data,
# comprehensive non-regression testing with configurable Warning/Critical thresholds, and user
# confirmation prompts when critical issues are detected. Excel export functionality enables
# generation of configurable Excel workbooks with data filtering, sorting, value replacement, and
# formula recalculation (see eb_dashboard_excel_export.py and DOCUMENTATION_04_EXCEL_EXPORT.md).
#
# Key features include automatic token refresh handling, retry mechanisms for transient API
# failures, progress tracking with real-time visual feedback, flexible data source identification,
# and support for complex data extraction using JSON path expressions.
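# ----------------------------------------------------------------------------
# Illustrative configuration sketch (not read by the script). Real rows live in
# the INCLUSIONS_MAPPING_TABLE_NAME sheet of DASHBOARD_CONFIG_FILE_NAME; the
# field, questionnaire and group names below (Pain_Score, EVA, Clinical_Data)
# are hypothetical placeholders used only to show how a row is interpreted.
#
#   field_name     : Pain_Score (0-10 scale)    -> stored as "Pain_Score"
#                                                  (a trailing "(...)" suffix is stripped)
#   field_group    : Clinical_Data               -> defaults to "Extended_Fields" if empty
#   source_id      : q_name=EVA                  -> source_type "q_name", source_value "EVA"
#                                                  (other prefixes: q_id=, q_category=; or the
#                                                   literals record / inclusion / request /
#                                                   6_month_visit)
#   field_path     : ["sections", 0, "answer"]   -> JSON list of keys/indexes passed to
#                                                  get_nested_value()
#   value_labels   : [{"value": 1, "text": {"fr": "Faible"}}]   (optional value-to-label map)
#   field_template : Score: $value               -> "$value" replaced by the final value
#
# A source_name of "Calculated" makes source_id a custom function name
# (e.g. if_then_else, search_in_fields_using_regex) with field_path as its
# argument list, and a source_name of "Not Specified" skips the row entirely.
# ----------------------------------------------------------------------------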
import json
import logging
import os
import re
import sys
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta, datetime
from time import perf_counter, sleep
import functools

import httpx
import openpyxl
import questionary
from tqdm import tqdm
from rich.console import Console

# Import centralized constants (SINGLE SOURCE OF TRUTH)
from eb_dashboard_constants import (
    INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME, OLD_FILE_SUFFIX,
    DASHBOARD_CONFIG_FILE_NAME, INCLUSIONS_MAPPING_TABLE_NAME, ORGANIZATIONS_MAPPING_TABLE_NAME,
    ORG_CENTER_MAPPING_FILE_NAME, ORG_CENTER_MAPPING_TABLE_NAME,
    DEFAULT_USER_NAME, DEFAULT_PASSWORD,
    IAM_URL, RC_URL, RC_APP_ID, GDD_URL,
    ERROR_MAX_RETRY, WAIT_BEFORE_RETRY, WAIT_BEFORE_NEW_BATCH_OF_RETRIES, MAX_BATCHS_OF_RETRIES,
    MAX_THREADS, RC_ENDOBEST_PROTOCOL_ID, RC_ENDOBEST_EXCLUDED_CENTERS,
    BAR_N_FMT_WIDTH, BAR_TOTAL_FMT_WIDTH, BAR_TIME_WIDTH, BAR_RATE_WIDTH,
    LOG_FILE_NAME, API_TIMEOUT,
    API_AUTH_LOGIN_ENDPOINT, API_AUTH_CONFIG_TOKEN_ENDPOINT, API_AUTH_REFRESH_TOKEN_ENDPOINT,
    API_RC_GET_ALL_ORGANIZATIONS_ENDPOINT, API_RC_INCLUSION_STATISTICS_ENDPOINT,
    API_RC_SEARCH_INCLUSIONS_ENDPOINT, API_RC_GET_RECORD_BY_PATIENT_ENDPOINT,
    API_RC_GET_SURVEYS_ENDPOINT, API_RC_SEARCH_VISITS_ENDPOINT,
    API_GDD_GET_REQUEST_BY_TUBE_ID_ENDPOINT
)

# Import refactored modules
from eb_dashboard_utils import (
    get_nested_value, get_httpx_client, clear_httpx_client,
    get_thread_position, get_config_path, thread_local_storage
)
from eb_dashboard_quality_checks import (
    backup_output_files, run_quality_checks, run_check_only_mode,
    set_dependencies as quality_set_dependencies, enable_debug_mode
)
from eb_dashboard_excel_export import (
    prepare_excel_export, export_excel_only, run_normal_mode_export,
    set_dependencies as excel_set_dependencies
)

logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s',
                    filename=LOG_FILE_NAME, filemode='w')

# ============================================================================
# BLOCK 1: CONFIGURATION & BASE INFRASTRUCTURE
# ============================================================================
# NOTE: All constants are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH)

# --- Global Variables ---
access_token = ""
refresh_token = ""
threads_list = []
_token_refresh_lock = threading.Lock()
_threads_list_lock = threading.Lock()
global_pbar = None
_global_pbar_lock = threading.Lock()
_user_interaction_lock = threading.Lock()

# Global variables (mutable, set at runtime - not constants)
inclusions_mapping_config = []
organizations_mapping_config = []
excel_export_config = None
excel_export_enabled = False
subtasks_thread_pool = ThreadPoolExecutor(40)
httpx_clients = {}
console = Console()

# Share global variables with utility modules (required for thread-safe operations)
import eb_dashboard_utils
eb_dashboard_utils.httpx_clients = httpx_clients
eb_dashboard_utils.threads_list = threads_list
eb_dashboard_utils._threads_list_lock = _threads_list_lock

# Inject console instance to modules
quality_set_dependencies(console)
excel_set_dependencies(console)

# Detect and enable debug mode if --debug flag is present (and remove it from argv)
if "--debug" in sys.argv:
    sys.argv.remove("--debug")
    enable_debug_mode()

# --- Progress Bar Configuration ---
# NOTE: BAR_N_FMT_WIDTH, BAR_TOTAL_FMT_WIDTH, BAR_TIME_WIDTH, BAR_RATE_WIDTH
# are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH)
custom_bar_format = ("{l_bar}{bar}" f" 
{{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}") # ============================================================================ # BLOCK 2: DECORATORS & RESILIENCE # ============================================================================ def new_token(): """Refresh access token using the refresh token""" global access_token, refresh_token with _token_refresh_lock: for attempt in range(ERROR_MAX_RETRY): try: client = get_httpx_client() client.base_url = RC_URL response = client.post(API_AUTH_REFRESH_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={"refresh_token": refresh_token}, timeout=20) response.raise_for_status() access_token = response.json()["access_token"] refresh_token = response.json()["refresh_token"] return except httpx.RequestError as exc: logging.warning(f"Refresh Token Error (Attempt {attempt + 1}) : {exc}") clear_httpx_client() except httpx.HTTPStatusError as exc: logging.warning( f"Refresh Token Error (Attempt {attempt + 1}) : {exc.response.status_code} for Url {exc.request.url}") clear_httpx_client() finally: if attempt < ERROR_MAX_RETRY - 1: sleep(WAIT_BEFORE_RETRY) logging.critical("Persistent error in refresh_token") raise httpx.RequestError(message="Persistent error in refresh_token") def api_call_with_retry(func): """Decorator for API calls with automatic retry and token refresh on 401 errors""" @functools.wraps(func) def wrapper(*args, **kwargs): func_name = func.__name__ total_attempts = 0 batch_count = 1 while True: for attempt in range(ERROR_MAX_RETRY): total_attempts += 1 try: return func(*args, **kwargs) except (httpx.RequestError, httpx.HTTPStatusError) as exc: logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}") # Refresh the thread-local client if an error occurs # to avoid potential pool corruption or stale connections clear_httpx_client() if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: logging.info(f"Token expired for {func_name}. Refreshing token.") new_token() if attempt < ERROR_MAX_RETRY - 1: sleep(WAIT_BEFORE_RETRY) else: # Max retries reached for this batch if batch_count < MAX_BATCHS_OF_RETRIES: logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. " f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.") batch_count += 1 sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES) break # Exit for loop to restart batch in while True else: # All automatic batches exhausted, ask the user with _user_interaction_lock: console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]") console.print(f"[red]Exception: {exc}[/red]") choice = questionary.select( f"What would you like to do for {func_name}?", choices=[ "Retry (try another batch of retries)", "Ignore (return None and continue)", "Stop script (critical error)" ] ).ask() if choice == "Retry (try another batch of retries)": logging.info(f"User chose to retry {func_name}. 
Restarting batch sequence.") batch_count = 1 # Reset batch counter for the next interactive round break # Exit for loop to restart batch in while True elif choice == "Ignore (return None and continue)": # Retrieve context if available ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}") return None else: logging.critical(f"User chose to stop script after persistent error in {func_name}.") raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)") return wrapper # ============================================================================ # BLOCK 3: AUTHENTICATION # ============================================================================ def login(): global access_token, refresh_token user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask()) password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask()) if not (user_name and password): return "Exit" try: client = get_httpx_client() client.base_url = IAM_URL response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password}, timeout=20) response.raise_for_status() master_token = response.json()["access_token"] user_id = response.json()["userId"] except httpx.RequestError as exc: print(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}") return "Error" except httpx.HTTPStatusError as exc: print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") logging.warning( f"Login Error : {exc.response.status_code} for Url {exc.request.url}") return "Error" try: client = get_httpx_client() client.base_url = RC_URL response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"}, json={"userId": user_id, "clientId": RC_APP_ID, "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}, timeout=20) response.raise_for_status() access_token = response.json()["access_token"] refresh_token = response.json()["refresh_token"] except httpx.RequestError as exc: print(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}") return "Error" except httpx.HTTPStatusError as exc: print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") logging.warning(f"Login Error : {exc}") return "Error" print() print("Login Success") return "Success" # ============================================================================ # BLOCK 3B: FILE UTILITIES # ============================================================================ def load_json_file(filename): """ Load a JSON file from disk. 
Args: filename: Path to JSON file Returns: Parsed JSON data or None if file not found or error occurred """ if os.path.exists(filename): try: with open(filename, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logging.warning(f"Could not load JSON file '{filename}': {e}") console.print(f"[yellow]⚠ Warning: Could not load JSON file '{filename}': {e}[/yellow]") return None # ============================================================================ # BLOCK 4: INCLUSIONS MAPPING CONFIGURATION # ============================================================================ def load_inclusions_mapping_config(): """Loads and validates the inclusions mapping configuration from the Excel file.""" global inclusions_mapping_config config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) try: # Load with data_only=True to read calculated values instead of formulas # (e.g., if mapping columns use formulas like =+1, we get 1, not the formula text) workbook = openpyxl.load_workbook(config_path, data_only=True) except FileNotFoundError: error_msg = f"Error: Configuration file not found at: {config_path}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) if INCLUSIONS_MAPPING_TABLE_NAME not in workbook.sheetnames: error_msg = f"Error: Sheet ''{INCLUSIONS_MAPPING_TABLE_NAME}'' not found in the configuration file." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) sheet = workbook[INCLUSIONS_MAPPING_TABLE_NAME] headers = [cell.value for cell in sheet[1]] temp_config = [] for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2): field_config = dict(zip(headers, row)) # --- Validation and Parsing --- if field_config.get("source_name") == "Not Specified": continue field_name = field_config.get("field_name") if not field_name or not isinstance(field_name, str): error_msg = f"Error in config file, row {row_index}: 'field_name' is mandatory." 
logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) field_config["field_name"] = re.sub(r'\s*\([^)]*\)$', '', field_name).strip() # Parse source_id prefix (q_id=, q_name=, q_category=, record, inclusion, request) source_id_raw = field_config.get("source_id", "") if source_id_raw and isinstance(source_id_raw, str): if source_id_raw.startswith("q_id="): field_config["source_type"] = "q_id" field_config["source_value"] = source_id_raw[5:] elif source_id_raw.startswith("q_name="): field_config["source_type"] = "q_name" field_config["source_value"] = source_id_raw[7:] elif source_id_raw.startswith("q_category="): field_config["source_type"] = "q_category" field_config["source_value"] = source_id_raw[11:] elif source_id_raw == "record": field_config["source_type"] = "record" field_config["source_value"] = None elif source_id_raw == "inclusion": field_config["source_type"] = "inclusion" field_config["source_value"] = None elif source_id_raw == "request": field_config["source_type"] = "request" field_config["source_value"] = None elif source_id_raw == "6_month_visit": field_config["source_type"] = "6_month_visit" field_config["source_value"] = None else: field_config["source_type"] = None field_config["source_value"] = source_id_raw else: field_config["source_type"] = None field_config["source_value"] = None for json_field in ["field_path", "field_condition", "true_if_any", "value_labels"]: value = field_config.get(json_field) if value: if not isinstance(value, str): error_msg = f"Error in config file, row {row_index}, field '{json_field}': Invalid value, must be a JSON string." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) try: field_config[json_field] = json.loads(value) except json.JSONDecodeError: error_msg = f"Error in config file, row {row_index}, field '{json_field}': Invalid JSON format." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) else: field_config[json_field] = None if not field_config.get("field_path"): error_msg = f"Error in config file, row {row_index}: 'field_path' is mandatory when a field is specified." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) temp_config.append(field_config) inclusions_mapping_config = temp_config console.print(f"Loaded {len(inclusions_mapping_config)} fields from inclusions mapping configuration.", style="green") def load_organizations_mapping_config(): """Loads and validates the organizations mapping configuration from the Excel file.""" global organizations_mapping_config config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) try: # Load with data_only=True to read calculated values instead of formulas # (e.g., if mapping columns use formulas like =+1, we get 1, not the formula text) workbook = openpyxl.load_workbook(config_path, data_only=True) except FileNotFoundError: error_msg = f"Error: Configuration file not found at: {config_path}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) if ORGANIZATIONS_MAPPING_TABLE_NAME not in workbook.sheetnames: # Organizations mapping is optional, so return empty config if sheet not found logging.info(f"Sheet '{ORGANIZATIONS_MAPPING_TABLE_NAME}' not found in configuration file. 
Organizations mapping is optional.") organizations_mapping_config = [] return sheet = workbook[ORGANIZATIONS_MAPPING_TABLE_NAME] headers = [cell.value for cell in sheet[1]] # Filter out None headers (empty columns) headers_filtered = [h for h in headers if h is not None] mapping_config = [] try: for row in sheet.iter_rows(min_row=2, values_only=True): if all(cell is None for cell in row): break # Trim row to match filtered headers length row_filtered = row[:len(headers_filtered)] config_dict = dict(zip(headers_filtered, row_filtered)) mapping_config.append(config_dict) except Exception as e: error_msg = f"Error parsing organizations mapping: {e}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) finally: workbook.close() organizations_mapping_config = mapping_config if mapping_config: console.print(f"Loaded {len(organizations_mapping_config)} organizations from organizations mapping configuration.", style="green") else: console.print("No organizations mapping found (this is optional).", style="yellow") # ============================================================================ # BLOCK 5: DATA SEARCH & EXTRACTION # ============================================================================ def _find_questionnaire_by_id(qcm_dict, qcm_id): """Finds a questionnaire by ID (direct dictionary lookup).""" if not isinstance(qcm_dict, dict): return None qcm_data = qcm_dict.get(qcm_id) return qcm_data.get("answers") if qcm_data else None def _find_questionnaire_by_name(qcm_dict, name): """Finds a questionnaire by name (sequential search, returns first match).""" if not isinstance(qcm_dict, dict): return None for qcm in qcm_dict.values(): if get_nested_value(qcm, ["questionnaire", "name"]) == name: return qcm.get("answers") return None def _find_questionnaire_by_category(qcm_dict, category): """Finds a questionnaire by category (sequential search, returns first match).""" if not isinstance(qcm_dict, dict): return None for qcm in qcm_dict.values(): if get_nested_value(qcm, ["questionnaire", "category"]) == category: return qcm.get("answers") return None def _get_field_value_from_questionnaire(all_questionnaires, field_config): """ Gets the raw value for a field from questionnaires (without post-processing). 
""" # Find questionnaire based on type source_type = field_config.get("source_type") source_value = field_config.get("source_value") if source_type == "q_id": answers = _find_questionnaire_by_id(all_questionnaires, source_value) elif source_type == "q_name": answers = _find_questionnaire_by_name(all_questionnaires, source_value) elif source_type == "q_category": answers = _find_questionnaire_by_category(all_questionnaires, source_value) else: answers = None return get_nested_value(answers, field_config["field_path"], default="undefined") def get_value_from_inclusion(inclusion_dict, key): """Helper to find a key in the new nested inclusion structure.""" for group in inclusion_dict.values(): if isinstance(group, dict) and key in group: return group[key] return None # ============================================================================ # BLOCK 6: CUSTOM FUNCTIONS & FIELD PROCESSING # ============================================================================ def _execute_custom_function(function_name, args, output_inclusion): """Executes a custom function for a calculated field.""" if function_name == "search_in_fields_using_regex": if not args or len(args) < 2: return "$$$$ Argument Error: search_in_fields_using_regex requires at least 2 arguments" regex_pattern = args[0] field_names = args[1:] field_values = [] all_undefined = True for field_name in field_names: value = get_value_from_inclusion(output_inclusion, field_name) field_values.append(value) if value is not None and value != "undefined": all_undefined = False if all_undefined: return "undefined" try: for value in field_values: # We only try to match on string values. if isinstance(value, str) and re.search(regex_pattern, value, re.IGNORECASE): return True except re.error as e: return f"$$$$ Regex Error: {e}" return False elif function_name == "extract_parentheses_content": if not args or len(args) != 1: return "$$$$ Argument Error: extract_parentheses_content requires 1 argument" field_name = args[0] value = get_value_from_inclusion(output_inclusion, field_name) if value is None or value == "undefined": return "undefined" match = re.search(r'\((.*?)\)', str(value)) return match.group(1) if match else "undefined" elif function_name == "append_terminated_suffix": if not args or len(args) != 2: return "$$$$ Argument Error: append_terminated_suffix requires 2 arguments" status = get_value_from_inclusion(output_inclusion, args[0]) is_terminated = get_value_from_inclusion(output_inclusion, args[1]) if status is None or status == "undefined": return "undefined" if not isinstance(is_terminated, bool) or not is_terminated: return status return f"{status} - AP" elif function_name == "if_then_else": # Unified conditional function # Syntax: ["operator", arg1, arg2_optional, result_if_true, result_if_false] # Operators: "is_true", "is_false", "all_true", "is_defined", "is_undefined", "all_defined", "==", "!=" if not args or len(args) < 4: return "$$$$ Argument Error: if_then_else requires at least 4 arguments" operator = args[0] # Helper function to resolve value (literal or field name) def resolve_value(arg): # If boolean literal if isinstance(arg, bool): return arg # If numeric literal if isinstance(arg, (int, float)): return arg # If string literal (starts with $) if isinstance(arg, str) and arg.startswith("$"): return arg[1:] # Remove the $ prefix # Otherwise, treat as field name return get_value_from_inclusion(output_inclusion, arg) # Determine condition based on operator if operator == "is_true": if len(args) != 4: return "$$$$ 
Argument Error: is_true requires 4 arguments" value = resolve_value(args[1]) if value is None or value == "undefined": return "undefined" condition = (value is True) result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "is_false": if len(args) != 4: return "$$$$ Argument Error: is_false requires 4 arguments" value = resolve_value(args[1]) if value is None or value == "undefined": return "undefined" condition = (value is False) result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "all_true": if len(args) != 4: return "$$$$ Argument Error: all_true requires 4 arguments" fields_arg = args[1] if not isinstance(fields_arg, list): return "$$$$ Argument Error: all_true requires arg1 to be a list of field names" conditions = [] for field_name in fields_arg: field_value = get_value_from_inclusion(output_inclusion, field_name) if field_value is None or field_value == "undefined": return "undefined" conditions.append(field_value) condition = all(conditions) result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "is_defined": if len(args) != 4: return "$$$$ Argument Error: is_defined requires 4 arguments" value = resolve_value(args[1]) condition = (value is not None and value != "undefined") result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "is_undefined": if len(args) != 4: return "$$$$ Argument Error: is_undefined requires 4 arguments" value = resolve_value(args[1]) condition = (value is None or value == "undefined") result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "all_defined": if len(args) != 4: return "$$$$ Argument Error: all_defined requires 4 arguments" fields_arg = args[1] if not isinstance(fields_arg, list): return "$$$$ Argument Error: all_defined requires arg1 to be a list of field names" for field_name in fields_arg: field_value = get_value_from_inclusion(output_inclusion, field_name) if field_value is None or field_value == "undefined": condition = False break else: condition = True result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "==": if len(args) != 5: return "$$$$ Argument Error: == requires 5 arguments" value1 = resolve_value(args[1]) value2 = resolve_value(args[2]) if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined": return "undefined" condition = (value1 == value2) result_if_true = resolve_value(args[3]) result_if_false = resolve_value(args[4]) elif operator == "!=": if len(args) != 5: return "$$$$ Argument Error: != requires 5 arguments" value1 = resolve_value(args[1]) value2 = resolve_value(args[2]) if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined": return "undefined" condition = (value1 != value2) result_if_true = resolve_value(args[3]) result_if_false = resolve_value(args[4]) else: return f"$$$$ Unknown Operator: {operator}" return result_if_true if condition else result_if_false return f"$$$$ Unknown Custom Function: {function_name}" def process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data): """Processes and adds the inclusions mapping fields to the inclusion dictionary.""" for field in inclusions_mapping_config: field_name = field["field_name"] field_group = field.get("field_group", "Extended_Fields") # Default group if not specified final_value = 
"undefined" # Default value # Check condition condition_field_name = field.get("field_condition") if condition_field_name: condition_value = get_value_from_inclusion(output_inclusion, condition_field_name) if condition_value is None or condition_value == "undefined": final_value = "undefined" elif not isinstance(condition_value, bool): final_value = "$$$$ Condition Field Error" elif not condition_value: final_value = "N/A" # If condition allows, process the field if final_value == "undefined": source_name = field.get("source_name") source_type = field.get("source_type") field_path = field.get("field_path") # Get raw value from appropriate source if source_name == "Calculated": function_name = field.get("source_id") args = field_path final_value = _execute_custom_function(function_name, args, output_inclusion) elif source_type in ["q_id", "q_name", "q_category"]: final_value = _get_field_value_from_questionnaire(all_questionnaires, field) elif source_type == "record": final_value = get_nested_value(record_data, field_path, default="undefined") elif source_type == "inclusion": final_value = get_nested_value(inclusion_data, field_path, default="undefined") elif source_type == "request": final_value = get_nested_value(request_data, field_path, default="undefined") elif source_type == "6_month_visit": final_value = get_nested_value(six_month_visit_data, field_path, default="undefined") else: final_value = f"$$$$ Unknown Source Type: {source_type}" # If the source data itself is missing (e.g., 6-month visit not created), log a warning but continue if final_value == "$$$$ No Data": patient_id = inclusion_data.get("id", "Unknown") pseudo = inclusion_data.get("pseudo", "Unknown") logging.warning(f"No '{source_type}' data source found for Patient {patient_id} / {pseudo} (Field: {field_name})") final_value = "undefined" # Post-processing: Apply true_if_any and value_labels transformations (for all sources) if final_value not in ["undefined", "$$$$ No Data"]: # Check if any value matches check_values = field.get("true_if_any") if check_values: raw_value_set = set(final_value if isinstance(final_value, list) else [final_value]) check_values_set = set(check_values if isinstance(check_values, list) else [check_values]) final_value = not raw_value_set.isdisjoint(check_values_set) # Map value to label value_labels = field.get("value_labels") if value_labels and final_value not in ["$$$$ Format Error : Array expected"]: found = False for label_map in value_labels: if label_map.get("value") == final_value: final_value = get_nested_value(label_map, ["text", "fr"], default=f"$$$$ Value Error : {final_value}") found = True break if not found: final_value = f"$$$$ Value Error : {final_value}" # Post-processing: If the value is a list (e.g. from a wildcard), join it with a pipe. 
if isinstance(final_value, list): final_value = "|".join(map(str, final_value)) # Post-processing: Format score dictionaries if isinstance(final_value, dict) and 'total' in final_value and 'max' in final_value: final_value = f"{final_value['total']}/{final_value['max']}" # Post-processing: Apply field template field_template = field.get("field_template") if field_template and final_value not in ["undefined", "N/A"] and isinstance(final_value, (str, int, float, bool)): final_value = field_template.replace("$value", str(final_value)) # Ensure the group sub-dictionary exists if field_group not in output_inclusion: output_inclusion[field_group] = {} output_inclusion[field_group][field_name] = final_value # ============================================================================ # BLOCK 7: BUSINESS API CALLS # ============================================================================ @api_call_with_retry def get_all_organizations(): start_time = perf_counter() with console.status("[bold green]Getting Organizations...", spinner="dots"): client = get_httpx_client() client.base_url = RC_URL response = client.get(API_RC_GET_ALL_ORGANIZATIONS_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, timeout=1200) response.raise_for_status() duration = perf_counter() - start_time console.print(f"Organizations loaded. ({duration:.2f}s)", style="green") return response.json() @api_call_with_retry def _get_organization_statistics(organization_id): client = get_httpx_client() client.base_url = RC_URL response = client.post(API_RC_INCLUSION_STATISTICS_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={"protocolId": RC_ENDOBEST_PROTOCOL_ID, "center": organization_id, "excludedCenters": RC_ENDOBEST_EXCLUDED_CENTERS}, timeout=API_TIMEOUT) response.raise_for_status() return response.json()["statistic"] def get_organization_counters(organization): organization_id = organization['id'] stats = _get_organization_statistics(organization_id) organization["patients_count"] = stats.get("totalInclusions", 0) organization["preincluded_count"] = stats.get("preIncluded", 0) organization["included_count"] = stats.get("included", 0) organization["prematurely_terminated_count"] = stats.get("prematurelyTerminated", 0) return organization @api_call_with_retry def get_organization_inclusions(organization_id, limit, page): client = get_httpx_client() client.base_url = RC_URL response = client.post(f"{API_RC_SEARCH_INCLUSIONS_ENDPOINT}?limit={limit}&page={page}", headers={"Authorization": f"Bearer {access_token}"}, json={"protocolId": RC_ENDOBEST_PROTOCOL_ID, "center": organization_id, "keywords": ""}, timeout=API_TIMEOUT) response.raise_for_status() return response.json()["data"] @api_call_with_retry def get_record_by_patient_id(patient_id, organization_id): client = get_httpx_client() client.base_url = RC_URL response = client.post(API_RC_GET_RECORD_BY_PATIENT_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={"center": organization_id, "patientId": patient_id, "mode": "exchange", "state": "ongoing", "includeEndoParcour": False, "sourceClient": "pro_prm"}, timeout=API_TIMEOUT) response.raise_for_status() return response.json() @api_call_with_retry def get_request_by_tube_id(tube_id): client = get_httpx_client() client.base_url = GDD_URL response = client.get(f"{API_GDD_GET_REQUEST_BY_TUBE_ID_ENDPOINT}/{tube_id}?isAdmin=true&organization=undefined", headers={"Authorization": f"Bearer {access_token}"}, timeout=API_TIMEOUT) response.raise_for_status() return response.json() 
@api_call_with_retry def search_visit_by_pseudo_and_order(pseudo, order): """Searches for a visit by patient pseudo and visit order.""" client = get_httpx_client() client.base_url = RC_URL response = client.post(API_RC_SEARCH_VISITS_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={"visitOrder": order, "keywords": pseudo}, timeout=API_TIMEOUT) response.raise_for_status() resp_json = response.json() if isinstance(resp_json, dict): data = resp_json.get("data") if isinstance(data, list) and len(data) > 0: return data[0] return None @api_call_with_retry def get_all_questionnaires_by_patient(patient_id, record_data): """Fetches all questionnaires for a patient with a single API call.""" client = get_httpx_client() client.base_url = RC_URL payload = { "context": "clinic_research", "subject": patient_id } # Extract blockedQcmVersions from record (same logic as get_questionnaire_answers) if record_data is None: all_blocked_versions = [] else: all_blocked_versions = get_nested_value(record_data, path=["record", "protocol_inclusions", 0, "blockedQcmVersions"], default=[]) # Ensure it's a list even if get_nested_value returns "$$$$ No Data" if all_blocked_versions == "$$$$ No Data": all_blocked_versions = [] if all_blocked_versions: payload["blockedQcmVersions"] = all_blocked_versions response = client.post(API_RC_GET_SURVEYS_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json=payload, timeout=API_TIMEOUT) response.raise_for_status() response_data = response.json() # Build dictionary with questionnaire metadata for searching results = {} for item in response_data: q_id = get_nested_value(item, path=["questionnaire", "id"]) q_name = get_nested_value(item, path=["questionnaire", "name"]) q_category = get_nested_value(item, path=["questionnaire", "category"]) answers = get_nested_value(item, path=["answers"], default={}) if q_id: results[q_id] = { "questionnaire": { "id": q_id, "name": q_name, "category": q_category }, "answers": answers } return results # ============================================================================ # BLOCK 7b: ORGANIZATION CENTER MAPPING # ============================================================================ def load_organization_center_mapping(): """ Loads organization ↔ center mapping from Excel file in script directory. Returns: dict: {organization_name_normalized: center_name} or {} if error/skip """ mapping_file = ORG_CENTER_MAPPING_FILE_NAME if not os.path.exists(mapping_file): console.print(f"[yellow]⚠ Mapping file not found at: {mapping_file}. Skipping center mapping.[/yellow]") return {} try: workbook = openpyxl.load_workbook(mapping_file) except Exception as e: console.print(f"[yellow]⚠ Error loading mapping file: {e}. Skipping center mapping.[/yellow]") logging.warning(f"Error loading mapping file: {e}") return {} if ORG_CENTER_MAPPING_TABLE_NAME not in workbook.sheetnames: console.print(f"[yellow]⚠ Sheet '{ORG_CENTER_MAPPING_TABLE_NAME}' not found in mapping file. Skipping center mapping.[/yellow]") return {} sheet = workbook[ORG_CENTER_MAPPING_TABLE_NAME] headers = [cell.value for cell in sheet[1]] # Validate required columns if "Organization_Name" not in headers or "Center_Name" not in headers: console.print(f"[yellow]⚠ Required columns 'Organization_Name' or 'Center_Name' not found in mapping file. 
Skipping center mapping.[/yellow]") return {} # Load mapping rows mapping_rows = [] try: for row in sheet.iter_rows(min_row=2, values_only=True): # Skip empty rows if all(cell is None for cell in row): continue row_dict = dict(zip(headers, row)) org_name = row_dict.get("Organization_Name") center_name = row_dict.get("Center_Name") if org_name and center_name: mapping_rows.append({ "Organization_Name": org_name, "Center_Name": center_name }) except Exception as e: console.print(f"[yellow]⚠ Error reading mapping file rows: {e}. Skipping center mapping.[/yellow]") logging.warning(f"Error reading mapping file rows: {e}") return {} # === VALIDATE: Check for duplicates on NORMALIZED versions === org_names_normalized = {} # {normalized: original} center_names_normalized = {} # {normalized: original} for row in mapping_rows: org_name_raw = row["Organization_Name"] center_name_raw = row["Center_Name"] # Normalize org_normalized = org_name_raw.strip().lower() if isinstance(org_name_raw, str) else str(org_name_raw).strip().lower() center_normalized = center_name_raw.strip().lower() if isinstance(center_name_raw, str) else str(center_name_raw).strip().lower() # Check for duplicates if org_normalized in org_names_normalized: console.print(f"[yellow]⚠ Duplicate found in Organization_Name: '{org_name_raw}'. Skipping center mapping.[/yellow]") logging.warning(f"Duplicate in Organization_Name: '{org_name_raw}'") return {} if center_normalized in center_names_normalized: console.print(f"[yellow]⚠ Duplicate found in Center_Name: '{center_name_raw}'. Skipping center mapping.[/yellow]") logging.warning(f"Duplicate in Center_Name: '{center_name_raw}'") return {} # Store normalized version org_names_normalized[org_normalized] = org_name_raw center_names_normalized[center_normalized] = center_name_raw # === BUILD MAPPING DICT === mapping_dict = {} for row in mapping_rows: org_name_raw = row["Organization_Name"] center_name_raw = row["Center_Name"] # Normalize key, keep center_name clean (strip but not lower) org_normalized = org_name_raw.strip().lower() if isinstance(org_name_raw, str) else str(org_name_raw).strip().lower() center_clean = center_name_raw.strip() if isinstance(center_name_raw, str) else str(center_name_raw).strip() mapping_dict[org_normalized] = center_clean return mapping_dict def apply_center_mapping(organizations_list, mapping_dict): """ Applies organization → center mapping to organizations list. Adds 'Center_Name' field to each organization only if mapping succeeded. 
Args: organizations_list: List of organization dicts mapping_dict: {organization_name_normalized: center_name} """ if not mapping_dict: # Mapping dict is empty due to error → skip mapping return unmapped = [] for org in organizations_list: org_name = org.get("name", "") org_name_normalized = org_name.strip().lower() # Try to find match in mapping dict if org_name_normalized in mapping_dict: org["Center_Name"] = mapping_dict[org_name_normalized] else: # Fallback to organization name org["Center_Name"] = org_name unmapped.append(org_name) # Display results if not unmapped: console.print(f"[green]✓ All {len(organizations_list)} organizations mapped successfully.[/green]") else: console.print(f"[yellow]⚠ {len(unmapped)} organization(s) not mapped:[/yellow]") for org_name in sorted(unmapped): console.print(f"[yellow] - {org_name}[/yellow]") # ============================================================================ # BLOCK 8: PROCESSING ORCHESTRATION # ============================================================================ def _process_inclusion_data(inclusion, organization): """Processes a single inclusion record and returns a dictionary.""" organization_id = organization["id"] patient_id = get_nested_value(inclusion, path=["id"]) pseudo = get_nested_value(inclusion, path=["pseudo"], default="Unknown") # Set thread-local context for detailed error logging in decorators thread_local_storage.current_patient_context = {"id": patient_id, "pseudo": pseudo} # Initialize empty output structure output_inclusion = {} # --- Prepare all data sources --- # 1. Launch Visit Search asynchronously (it's slow, ~5s) visit_future = subtasks_thread_pool.submit(search_visit_by_pseudo_and_order, pseudo, 2) # 2. Prepare inclusion_data: enrich inclusion with organization info inclusion_data = dict(inclusion) inclusion_data["organization_id"] = organization_id inclusion_data["organization_name"] = organization["name"] if "Center_Name" in organization: inclusion_data["center_name"] = organization["Center_Name"] # 3. Prepare record_data (sequential as it's often needed for questionnaires) record_data = get_record_by_patient_id(patient_id, organization_id) # 4. 
Get tube_id for request and launch in parallel with questionnaires tube_id = get_nested_value(record_data, path=["record", "clinicResearchData", 0, "requestMetaData", "tubeId"], default="undefined") request_future = subtasks_thread_pool.submit(get_request_by_tube_id, tube_id) all_questionnaires = get_all_questionnaires_by_patient(patient_id, record_data) # --- Synchronize all asynchronous tasks --- try: request_data = request_future.result() except Exception as e: logging.error(f"Error fetching request data for patient {patient_id}: {e}") request_data = None try: six_month_visit_data = visit_future.result() except Exception as e: logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}") six_month_visit_data = None # --- Process all fields from configuration --- process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data) return output_inclusion def process_organization(organization, index, total_organizations): global threads_list, global_pbar position = get_thread_position() + 2 organization_id = organization["id"] output_inclusions = [] inclusions = get_organization_inclusions(organization_id, 1000, 1) with tqdm(total=len(inclusions), unit='Incl.', desc=f"{str(index) + "/" + str(total_organizations):<9} - {organization['name'][:40]:<40}", position=position, leave=False, bar_format=custom_bar_format) as incl_pbar: for inclusion in inclusions: output_inclusion = _process_inclusion_data(inclusion, organization) output_inclusions.append(output_inclusion) incl_pbar.update(1) with _global_pbar_lock: if global_pbar: global_pbar.update(1) return output_inclusions # ============================================================================ # BLOCK 9: MAIN EXECUTION # ============================================================================ def main(): global global_pbar, excel_export_config, excel_export_enabled # --- Check for CLI Check_Only mode --- check_only_mode = "--check-only" in sys.argv if check_only_mode: run_check_only_mode(sys.argv) return # --- Check for CLI Excel_Only mode --- excel_only_mode = "--excel-only" in sys.argv if excel_only_mode: # Load mapping configs for Excel export (same as normal workflow) print() load_inclusions_mapping_config() load_organizations_mapping_config() # Completely externalized Excel-only workflow export_excel_only(sys.argv, INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME, inclusions_mapping_config, organizations_mapping_config) return # === NORMAL MODE: Full data collection === print() login_status = login() while login_status == "Error": login_status = login() if login_status == "Exit": return print() number_of_threads = int((questionary.text("Number of threads :", default="12", validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask())) print() load_inclusions_mapping_config() load_organizations_mapping_config() # === LOAD AND VALIDATE EXCEL EXPORT CONFIGURATION === print() console.print("[bold cyan]Loading Excel export configuration...[/bold cyan]") # Validate Excel config (no data loading - JSONs don't exist yet in NORMAL MODE) # prepare_excel_export() displays error messages directly to console excel_export_config, has_config_critical, _ = \ prepare_excel_export(inclusions_mapping_config, organizations_mapping_config) # Ask user confirmation if critical errors found if has_config_critical: print() answer = questionary.confirm( "⚠ Critical configuration errors detected. 
Continue anyway?", default=False ).ask() if not answer: console.print("[bold red]Aborted by user[/bold red]") return else: excel_export_enabled = False # Skip Excel export if user continues despite errors else: excel_export_enabled = True if excel_export_config else False # Config is valid, Excel export can proceed print() start_time = perf_counter() organizations_list = get_all_organizations() organizations_list = [org for org in organizations_list if org["id"] not in RC_ENDOBEST_EXCLUDED_CENTERS] # === APPLY ORGANIZATION CENTER MAPPING === print() print("Mapping organizations to centers...") mapping_dict = load_organization_center_mapping() apply_center_mapping(organizations_list, mapping_dict) print() with ThreadPoolExecutor(max_workers=number_of_threads) as counter_pool: futures = [counter_pool.submit(get_organization_counters, org) for org in organizations_list] organizations_list_with_counters = [] for future in tqdm(as_completed(futures), total=len(futures), desc=f"{'Fetching Organizations Counters':<52}", unit="orgs.", bar_format=custom_bar_format): try: updated_org = future.result() organizations_list_with_counters.append(updated_org) except Exception as exc: print(f"\nCRITICAL ERROR while fetching counters: {exc}") counter_pool.shutdown(wait=False, cancel_futures=True) raise organizations_list = organizations_list_with_counters print() inclusions_total_count = sum(org.get('patients_count', 0) for org in organizations_list) organizations_list.sort(key=lambda org: (-org.get('patients_count', 0), org.get('name', ''))) number_of_organizations = len(organizations_list) print(f"{inclusions_total_count} Inclusions in {number_of_organizations} Organizations...") print() output_inclusions = [] with tqdm(total=inclusions_total_count, unit="incl.", desc=f"{'Overall Progress':<52}", position=0, leave=True, bar_format=custom_bar_format) as overall_progress_pbar: global_pbar = overall_progress_pbar with ThreadPoolExecutor(max_workers=number_of_threads) as thread_pool: futures = [thread_pool.submit(process_organization, organization, index + 1, number_of_organizations) for index, organization in enumerate(organizations_list)] for future in as_completed(futures): try: result = future.result() output_inclusions.extend(result) except Exception as exc: logging.critical(f"Arrêt dû à une exception critique dans un worker: {exc}", exc_info=True) print(f"\nERREUR CRITIQUE dans un thread de traitement, arrêt du processus:") print(f"Exception: {exc}") print("Traceback original du worker:") traceback.print_exc() print( "Signal d'arrêt envoyé au pool principal avec tentative d'annulation des tâches en attente...") thread_pool.shutdown(wait=False, cancel_futures=True) raise def get_sort_key(item): org_name = get_nested_value(item, ["Patient_Identification", "Organisation_Name"], default='') pseudo = get_nested_value(item, ["Patient_Identification", "Pseudo"], default='') date_str = get_nested_value(item, ["Inclusion", "Inclusion_Date"], default='') sort_date = datetime.max if date_str and date_str != "undefined": try: sort_date = datetime.strptime(date_str, '%d/%m/%Y') except (ValueError, TypeError): pass return org_name, sort_date, pseudo try: print() print() print("Sorting results...") output_inclusions.sort(key=get_sort_key) # === QUALITY CHECKS (before backup to avoid losing history on crash) === print() has_coherence_critical, has_regression_critical = run_quality_checks( current_inclusions=output_inclusions, # list: données en mémoire (nouvellement collectées) organizations_list=organizations_list, # 
list: données en mémoire avec compteurs old_inclusions_filename=INCLUSIONS_FILE_NAME # str: "endobest_inclusions.json" (version courante sur disque) ) # === CHECK FOR CRITICAL ISSUES AND ASK USER CONFIRMATION === if has_coherence_critical or has_regression_critical: print() console.print("[bold red]⚠ CRITICAL issues detected in quality checks![/bold red]") confirm_write = questionary.confirm( "Do you want to write the results anyway?", default=True ).ask() if not confirm_write: console.print("[yellow]✗ Output writing cancelled by user. Files were not modified.[/yellow]") console.print("[yellow] You can re-run the script to try again.[/yellow]") print() print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}") return # === BACKUP OLD FILES (only after checks pass and user confirmation) === backup_output_files() # === WRITE NEW FILES === print("Writing files...") with open(INCLUSIONS_FILE_NAME, 'w', encoding='utf-8') as f_json: json.dump(output_inclusions, f_json, indent=4, ensure_ascii=False) with open(ORGANIZATIONS_FILE_NAME, 'w', encoding='utf-8') as f_json: json.dump(organizations_list, f_json, indent=4, ensure_ascii=False) console.print("[green]✓ Data saved to JSON files[/green]") print() # === EXCEL EXPORT === # Completely externalized Excel export workflow run_normal_mode_export(excel_export_enabled, excel_export_config, inclusions_mapping_config, organizations_mapping_config) except IOError as io_err: logging.critical(f"Error while writing json file : {io_err}") print(f"Error while writing json file : {io_err}") except Exception as exc: logging.critical(f"Error while writing json file : {exc}") print(f"Error while writing json file : {exc}") print() print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}") if __name__ == '__main__': try: main() except Exception as e: logging.critical(f"Le script principal s'est terminé prématurément à cause d'une exception: {e}", exc_info=True) print(f"Le script s'est arrêté à cause d'une erreur : {e}") finally: if 'subtasks_thread_pool' in globals() and subtasks_thread_pool: subtasks_thread_pool.shutdown(wait=False, cancel_futures=True) print('\n') input("Press Enter to exit...")
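# ----------------------------------------------------------------------------
# Usage sketch. The script file name below is assumed (adjust to the actual
# file name); the flags are the ones handled above in main() and at import time.
#
#   python eb_dashboard_generator.py                 # normal mode: full data collection
#   python eb_dashboard_generator.py --check-only    # quality checks only (run_check_only_mode)
#   python eb_dashboard_generator.py --excel-only    # Excel export from the existing JSON files
#   python eb_dashboard_generator.py --debug         # enables debug mode, combinable with any mode
#
# Normal mode prompts for credentials and a worker-thread count, then writes
# INCLUSIONS_FILE_NAME and ORGANIZATIONS_FILE_NAME before the optional Excel export.
# ----------------------------------------------------------------------------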