# Endobest Clinical Research Dashboard Generator # This script automates the collection and processing of patient inclusion data from the Endobest clinical research protocol. # It authenticates with Ziwig's IAM, Research Clinic (RC), and GDD APIs to gather patient records, questionnaire responses, # and diagnostic test results across multiple healthcare organizations. The script generates a comprehensive JSON report # containing all patient data, with 100% configurable fields defined in an Excel configuration file. All fields (patient # identification, inclusion data, clinical records, test requests, and questionnaire responses) are externalized and can be # configured without any code modification. The configuration supports multiple data sources (questionnaires, records, # inclusions, requests, visits), custom functions for business logic (conditional fields, data transformations, pattern matching), # field dependencies and conditions, value transformations (labels, templates), and multi-line field definitions for complex # calculations. Organization names are enriched with center identifiers using a configurable mapping table, enabling seamless # integration with center-based reporting workflows. It employs multithreading with configurable worker pools to parallelize # API calls across organizations and patients, significantly reducing execution time. A single optimized API call retrieves all # questionnaires per patient, improving performance by 4-5x compared to multiple filtered calls. Results are exported as # structured JSON files with nested field groups for easy integration with downstream analytics tools. Built-in quality assurance # includes coherence checks between statistics and detailed data, comprehensive non-regression testing with configurable # Warning/Critical thresholds, and user confirmation prompts when critical issues are detected. Excel export functionality # enables generation of configurable Excel workbooks with data filtering, sorting, value replacement, and formula recalculation # (see eb_dashboard_excel_export.py and DOCUMENTATION_04_EXCEL_EXPORT.md). Key features include automatic token refresh handling, # retry mechanisms for transient API failures, progress tracking with real-time visual feedback, flexible data source # identification, and support for complex data extraction using JSON path expressions. import json import logging import msvcrt import os import re import sys import threading import traceback from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import timedelta, datetime from time import perf_counter, sleep import functools import httpx import openpyxl import questionary from tqdm import tqdm from rich.console import Console # Import centralized constants (SINGLE SOURCE OF TRUTH) from eb_dashboard_constants import ( INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME, OLD_FILE_SUFFIX, DASHBOARD_CONFIG_FILE_NAME, INCLUSIONS_MAPPING_TABLE_NAME, ORGANIZATIONS_MAPPING_TABLE_NAME, ORG_CENTER_MAPPING_FILE_NAME, ORG_CENTER_MAPPING_TABLE_NAME, DEFAULT_USER_NAME, DEFAULT_PASSWORD, IAM_URL, RC_URL, RC_APP_ID, GDD_URL, ERROR_MAX_RETRY, WAIT_BEFORE_RETRY, WAIT_BEFORE_NEW_BATCH_OF_RETRIES, MAX_BATCHS_OF_RETRIES, MAX_THREADS, RC_ENDOBEST_PROTOCOL_ID, RC_ENDOBEST_EXCLUDED_CENTERS, BAR_N_FMT_WIDTH, BAR_TOTAL_FMT_WIDTH, BAR_TIME_WIDTH, BAR_RATE_WIDTH, LOG_FILE_NAME, API_TIMEOUT, API_AUTH_LOGIN_ENDPOINT, API_AUTH_CONFIG_TOKEN_ENDPOINT, API_AUTH_REFRESH_TOKEN_ENDPOINT, API_RC_GET_ALL_ORGANIZATIONS_ENDPOINT, API_RC_INCLUSION_STATISTICS_ENDPOINT, API_RC_SEARCH_INCLUSIONS_ENDPOINT, API_RC_GET_RECORD_BY_PATIENT_ENDPOINT, API_RC_GET_SURVEYS_ENDPOINT, API_RC_SEARCH_VISITS_ENDPOINT, API_GDD_GET_REQUEST_BY_TUBE_ID_ENDPOINT ) # Import refactored modules from eb_dashboard_utils import ( get_nested_value, get_httpx_client, clear_httpx_client, get_thread_position, get_config_path, thread_local_storage, run_with_context ) from eb_dashboard_quality_checks import ( backup_output_files, run_quality_checks, run_check_only_mode, set_dependencies as quality_set_dependencies, enable_debug_mode ) from eb_dashboard_excel_export import ( prepare_excel_export, export_excel_only, run_normal_mode_export, set_dependencies as excel_set_dependencies ) logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s', filename=LOG_FILE_NAME, filemode='w') # ============================================================================ # BLOCK 1: CONFIGURATION & BASE INFRASTRUCTURE # ============================================================================ # NOTE: All constants are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH) # --- Global Variables --- access_token = "" refresh_token = "" threads_list = [] _token_refresh_lock = threading.Lock() on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup _threads_list_lock = threading.Lock() global_pbar = None _global_pbar_lock = threading.Lock() _user_interaction_lock = threading.Lock() # Global variables (mutable, set at runtime - not constants) inclusions_mapping_config = [] organizations_mapping_config = [] excel_export_config = None excel_export_enabled = False subtasks_thread_pool = ThreadPoolExecutor(40) httpx_clients = {} console = Console() # Share global variables with utility modules (required for thread-safe operations) import eb_dashboard_utils eb_dashboard_utils.httpx_clients = httpx_clients eb_dashboard_utils.threads_list = threads_list eb_dashboard_utils._threads_list_lock = _threads_list_lock # Inject console instance to modules quality_set_dependencies(console) excel_set_dependencies(console) # Detect and enable debug mode if --debug flag is present (and remove it from argv) if "--debug" in sys.argv: sys.argv.remove("--debug") enable_debug_mode() # --- Progress Bar Configuration --- # NOTE: BAR_N_FMT_WIDTH, BAR_TOTAL_FMT_WIDTH, BAR_TIME_WIDTH, BAR_RATE_WIDTH # are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH) custom_bar_format = ("{l_bar}{bar}" f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}") # ============================================================================ # BLOCK 2: DECORATORS & RESILIENCE # ============================================================================ def new_token(): """Refresh access token using the refresh token""" global access_token, refresh_token with _token_refresh_lock: for attempt in range(ERROR_MAX_RETRY): try: client = get_httpx_client() client.base_url = RC_URL response = client.post(API_AUTH_REFRESH_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={"refresh_token": refresh_token}, timeout=20) response.raise_for_status() access_token = response.json()["access_token"] refresh_token = response.json()["refresh_token"] return except httpx.RequestError as exc: logging.warning(f"Refresh Token Error (Attempt {attempt + 1}) : {exc}") clear_httpx_client() except httpx.HTTPStatusError as exc: logging.warning( f"Refresh Token Error (Attempt {attempt + 1}) : {exc.response.status_code} for Url {exc.request.url}") clear_httpx_client() finally: if attempt < ERROR_MAX_RETRY - 1: sleep(WAIT_BEFORE_RETRY) logging.critical("Persistent error in refresh_token") raise httpx.RequestError(message="Persistent error in refresh_token") def api_call_with_retry(func): """Decorator for API calls with automatic retry and token refresh on 401 errors""" @functools.wraps(func) def wrapper(*args, **kwargs): func_name = func.__name__ total_attempts = 0 batch_count = 1 while True: for attempt in range(ERROR_MAX_RETRY): total_attempts += 1 try: return func(*args, **kwargs) except (httpx.RequestError, httpx.HTTPStatusError) as exc: logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}") # Refresh the thread-local client if an error occurs # to avoid potential pool corruption or stale connections clear_httpx_client() if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: logging.info(f"Token expired for {func_name}. Refreshing token.") new_token() if attempt < ERROR_MAX_RETRY - 1: sleep(WAIT_BEFORE_RETRY) else: # Max retries reached for this batch if batch_count < MAX_BATCHS_OF_RETRIES: logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. " f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.") batch_count += 1 sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES) break # Exit for loop to restart batch in while True else: # All automatic batches exhausted — apply on_retry_exhausted policy with _user_interaction_lock: console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]") console.print(f"[red]Exception: {exc}[/red]") if on_retry_exhausted == "ignore": ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}") console.print(f"[yellow]⚠ Auto-ignore: skipping {func_name}.[/yellow]") return None elif on_retry_exhausted == "abort": logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}") console.print(f"[bold red]Auto-abort: stopping script.[/bold red]") raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)") else: # "ask" — interactive prompt (original behaviour) choice = questionary.select( f"What would you like to do for {func_name}?", choices=[ "Retry (try another batch of retries)", "Ignore (return None and continue)", "Stop script (critical error)" ] ).ask() if choice == "Retry (try another batch of retries)": logging.info(f"User chose to retry {func_name}. Restarting batch sequence.") batch_count = 1 # Reset batch counter for the next interactive round break # Exit for loop to restart batch in while True elif choice == "Ignore (return None and continue)": ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}") return None else: logging.critical(f"User chose to stop script after persistent error in {func_name}.") raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)") return wrapper # ============================================================================ # BLOCK 3: AUTHENTICATION # ============================================================================ def login(): global access_token, refresh_token user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask()) password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask()) if not (user_name and password): return "Exit" try: client = get_httpx_client() client.base_url = IAM_URL response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password}, timeout=20) response.raise_for_status() master_token = response.json()["access_token"] user_id = response.json()["userId"] except httpx.RequestError as exc: print(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}") return "Error" except httpx.HTTPStatusError as exc: print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") logging.warning( f"Login Error : {exc.response.status_code} for Url {exc.request.url}") return "Error" try: client = get_httpx_client() client.base_url = RC_URL response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"}, json={"userId": user_id, "clientId": RC_APP_ID, "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}, timeout=20) response.raise_for_status() access_token = response.json()["access_token"] refresh_token = response.json()["refresh_token"] except httpx.RequestError as exc: print(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}") return "Error" except httpx.HTTPStatusError as exc: print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") logging.warning(f"Login Error : {exc}") return "Error" print() print("Login Success") return "Success" # ============================================================================ # BLOCK 3B: FILE UTILITIES # ============================================================================ def ask_on_retry_exhausted(): """Asks the user what to do when all API retry batches are exhausted.""" global on_retry_exhausted choice = questionary.select( "On retry exhausted :", choices=[ "Ask (interactive prompt)", "Ignore (return None and continue)", "Abort (stop script)" ] ).ask() if choice is None or choice == "Ask (interactive prompt)": on_retry_exhausted = "ask" elif choice == "Ignore (return None and continue)": on_retry_exhausted = "ignore" else: on_retry_exhausted = "abort" def wait_for_scheduled_launch(): """Asks the user when to start the processing and waits if needed. Options: Immediately / In X minutes / At HH:MM """ choice = questionary.select( "When to start processing ?", choices=["Immediately", "In X minutes", "At HH:MM"] ).ask() if choice is None or choice == "Immediately": return if choice == "In X minutes": minutes_str = questionary.text( "Number of minutes :", validate=lambda x: x.isdigit() and int(x) > 0 ).ask() if not minutes_str: return target_time = datetime.now() + timedelta(minutes=int(minutes_str)) else: # "At HH:MM" time_str = questionary.text( "Start time (HH:MM) :", validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and 0 <= int(x.split(':')[0]) <= 23 and 0 <= int(x.split(':')[1]) <= 59 ).ask() if not time_str: return now = datetime.now() h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1]) target_time = now.replace(hour=h, minute=m, second=0, microsecond=0) if target_time <= now: console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]") return print() try: while True: remaining = target_time - datetime.now() if remaining.total_seconds() <= 0: break total_secs = int(remaining.total_seconds()) h = total_secs // 3600 m = (total_secs % 3600) // 60 s = total_secs % 60 target_str = target_time.strftime('%H:%M:%S') print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ", end="", flush=True) sleep(1) # Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts while msvcrt.kbhit(): msvcrt.getwch() print() console.print("[green]✓ Starting processing.[/green]") except KeyboardInterrupt: print() console.print("[bold red]Launch cancelled by user.[/bold red]") raise SystemExit(0) def load_json_file(filename): """ Load a JSON file from disk. Args: filename: Path to JSON file Returns: Parsed JSON data or None if file not found or error occurred """ if os.path.exists(filename): try: with open(filename, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logging.warning(f"Could not load JSON file '{filename}': {e}") console.print(f"[yellow]⚠ Warning: Could not load JSON file '{filename}': {e}[/yellow]") return None # ============================================================================ # BLOCK 4: INCLUSIONS MAPPING CONFIGURATION # ============================================================================ def load_inclusions_mapping_config(): """Loads and validates the inclusions mapping configuration from the Excel file.""" global inclusions_mapping_config config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) try: # Load with data_only=True to read calculated values instead of formulas # (e.g., if mapping columns use formulas like =+1, we get 1, not the formula text) workbook = openpyxl.load_workbook(config_path, data_only=True) except FileNotFoundError: error_msg = f"Error: Configuration file not found at: {config_path}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) if INCLUSIONS_MAPPING_TABLE_NAME not in workbook.sheetnames: error_msg = f"Error: Sheet ''{INCLUSIONS_MAPPING_TABLE_NAME}'' not found in the configuration file." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) sheet = workbook[INCLUSIONS_MAPPING_TABLE_NAME] headers = [cell.value for cell in sheet[1]] temp_config = [] for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2): field_config = dict(zip(headers, row)) # --- Validation and Parsing --- if field_config.get("source_name") == "Not Specified": continue field_name = field_config.get("field_name") if not field_name or not isinstance(field_name, str): error_msg = f"Error in config file, row {row_index}: 'field_name' is mandatory." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) field_config["field_name"] = re.sub(r'\s*\([^)]*\)$', '', field_name).strip() # Parse source_id prefix (q_id=, q_name=, q_category=, record, inclusion, request) source_id_raw = field_config.get("source_id", "") if source_id_raw and isinstance(source_id_raw, str): if source_id_raw.startswith("q_id="): field_config["source_type"] = "q_id" field_config["source_value"] = source_id_raw[5:] elif source_id_raw.startswith("q_name="): field_config["source_type"] = "q_name" field_config["source_value"] = source_id_raw[7:] elif source_id_raw.startswith("q_category="): field_config["source_type"] = "q_category" field_config["source_value"] = source_id_raw[11:] elif source_id_raw == "record": field_config["source_type"] = "record" field_config["source_value"] = None elif source_id_raw == "inclusion": field_config["source_type"] = "inclusion" field_config["source_value"] = None elif source_id_raw == "request": field_config["source_type"] = "request" field_config["source_value"] = None elif source_id_raw == "6_month_visit": field_config["source_type"] = "6_month_visit" field_config["source_value"] = None else: field_config["source_type"] = None field_config["source_value"] = source_id_raw else: field_config["source_type"] = None field_config["source_value"] = None for json_field in ["field_path", "field_condition", "true_if_any", "value_labels"]: value = field_config.get(json_field) if value: if not isinstance(value, str): error_msg = f"Error in config file, row {row_index}, field '{json_field}': Invalid value, must be a JSON string." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) try: field_config[json_field] = json.loads(value) except json.JSONDecodeError: error_msg = f"Error in config file, row {row_index}, field '{json_field}': Invalid JSON format." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) else: field_config[json_field] = None if not field_config.get("field_path"): error_msg = f"Error in config file, row {row_index}: 'field_path' is mandatory when a field is specified." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) temp_config.append(field_config) inclusions_mapping_config = temp_config console.print(f"Loaded {len(inclusions_mapping_config)} fields from inclusions mapping configuration.", style="green") def load_organizations_mapping_config(): """Loads and validates the organizations mapping configuration from the Excel file.""" global organizations_mapping_config config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) try: # Load with data_only=True to read calculated values instead of formulas # (e.g., if mapping columns use formulas like =+1, we get 1, not the formula text) workbook = openpyxl.load_workbook(config_path, data_only=True) except FileNotFoundError: error_msg = f"Error: Configuration file not found at: {config_path}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) if ORGANIZATIONS_MAPPING_TABLE_NAME not in workbook.sheetnames: # Organizations mapping is optional, so return empty config if sheet not found logging.info(f"Sheet '{ORGANIZATIONS_MAPPING_TABLE_NAME}' not found in configuration file. Organizations mapping is optional.") organizations_mapping_config = [] return sheet = workbook[ORGANIZATIONS_MAPPING_TABLE_NAME] headers = [cell.value for cell in sheet[1]] # Filter out None headers (empty columns) headers_filtered = [h for h in headers if h is not None] mapping_config = [] try: for row in sheet.iter_rows(min_row=2, values_only=True): if all(cell is None for cell in row): break # Trim row to match filtered headers length row_filtered = row[:len(headers_filtered)] config_dict = dict(zip(headers_filtered, row_filtered)) mapping_config.append(config_dict) except Exception as e: error_msg = f"Error parsing organizations mapping: {e}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) finally: workbook.close() organizations_mapping_config = mapping_config if mapping_config: console.print(f"Loaded {len(organizations_mapping_config)} organizations from organizations mapping configuration.", style="green") else: console.print("No organizations mapping found (this is optional).", style="yellow") # ============================================================================ # BLOCK 5: DATA SEARCH & EXTRACTION # ============================================================================ def _find_questionnaire_by_id(qcm_dict, qcm_id): """Finds a questionnaire by ID (direct dictionary lookup).""" if not isinstance(qcm_dict, dict): return None qcm_data = qcm_dict.get(qcm_id) return qcm_data.get("answers") if qcm_data else None def _find_questionnaire_by_name(qcm_dict, name): """Finds a questionnaire by name (sequential search, returns first match).""" if not isinstance(qcm_dict, dict): return None for qcm in qcm_dict.values(): if get_nested_value(qcm, ["questionnaire", "name"]) == name: return qcm.get("answers") return None def _find_questionnaire_by_category(qcm_dict, category): """Finds a questionnaire by category (sequential search, returns first match).""" if not isinstance(qcm_dict, dict): return None for qcm in qcm_dict.values(): if get_nested_value(qcm, ["questionnaire", "category"]) == category: return qcm.get("answers") return None def _get_field_value_from_questionnaire(all_questionnaires, field_config): """ Gets the raw value for a field from questionnaires (without post-processing). """ # Find questionnaire based on type source_type = field_config.get("source_type") source_value = field_config.get("source_value") if source_type == "q_id": answers = _find_questionnaire_by_id(all_questionnaires, source_value) elif source_type == "q_name": answers = _find_questionnaire_by_name(all_questionnaires, source_value) elif source_type == "q_category": answers = _find_questionnaire_by_category(all_questionnaires, source_value) else: answers = None return get_nested_value(answers, field_config["field_path"], default="undefined") def get_value_from_inclusion(inclusion_dict, key): """Helper to find a key in the new nested inclusion structure.""" for group in inclusion_dict.values(): if isinstance(group, dict) and key in group: return group[key] return None # ============================================================================ # BLOCK 6: CUSTOM FUNCTIONS & FIELD PROCESSING # ============================================================================ def _execute_custom_function(function_name, args, output_inclusion): """Executes a custom function for a calculated field.""" if function_name == "search_in_fields_using_regex": if not args or len(args) < 2: return "$$$$ Argument Error: search_in_fields_using_regex requires at least 2 arguments" regex_pattern = args[0] field_names = args[1:] field_values = [] all_undefined = True for field_name in field_names: value = get_value_from_inclusion(output_inclusion, field_name) field_values.append(value) if value is not None and value != "undefined": all_undefined = False if all_undefined: return "undefined" try: for value in field_values: # We only try to match on string values. if isinstance(value, str) and re.search(regex_pattern, value, re.IGNORECASE): return True except re.error as e: return f"$$$$ Regex Error: {e}" return False elif function_name == "extract_parentheses_content": if not args or len(args) != 1: return "$$$$ Argument Error: extract_parentheses_content requires 1 argument" field_name = args[0] value = get_value_from_inclusion(output_inclusion, field_name) if value is None or value == "undefined": return "undefined" match = re.search(r'\((.*?)\)', str(value)) return match.group(1) if match else "undefined" elif function_name == "append_terminated_suffix": if not args or len(args) != 2: return "$$$$ Argument Error: append_terminated_suffix requires 2 arguments" status = get_value_from_inclusion(output_inclusion, args[0]) is_terminated = get_value_from_inclusion(output_inclusion, args[1]) if status is None or status == "undefined": return "undefined" if not isinstance(is_terminated, bool) or not is_terminated: return status return f"{status} - AP" elif function_name == "if_then_else": # Unified conditional function # Syntax: ["operator", arg1, arg2_optional, result_if_true, result_if_false] # Operators: "is_true", "is_false", "all_true", "is_defined", "is_undefined", "all_defined", "==", "!=" if not args or len(args) < 4: return "$$$$ Argument Error: if_then_else requires at least 4 arguments" operator = args[0] # Helper function to resolve value (literal or field name) def resolve_value(arg): # If boolean literal if isinstance(arg, bool): return arg # If numeric literal if isinstance(arg, (int, float)): return arg # If string literal (starts with $) if isinstance(arg, str) and arg.startswith("$"): return arg[1:] # Remove the $ prefix # Otherwise, treat as field name return get_value_from_inclusion(output_inclusion, arg) # Determine condition based on operator if operator == "is_true": if len(args) != 4: return "$$$$ Argument Error: is_true requires 4 arguments" value = resolve_value(args[1]) if value is None or value == "undefined": return "undefined" condition = (value is True) result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "is_false": if len(args) != 4: return "$$$$ Argument Error: is_false requires 4 arguments" value = resolve_value(args[1]) if value is None or value == "undefined": return "undefined" condition = (value is False) result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "all_true": if len(args) != 4: return "$$$$ Argument Error: all_true requires 4 arguments" fields_arg = args[1] if not isinstance(fields_arg, list): return "$$$$ Argument Error: all_true requires arg1 to be a list of field names" conditions = [] for field_name in fields_arg: field_value = get_value_from_inclusion(output_inclusion, field_name) if field_value is None or field_value == "undefined": return "undefined" conditions.append(field_value) condition = all(conditions) result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "is_defined": if len(args) != 4: return "$$$$ Argument Error: is_defined requires 4 arguments" value = resolve_value(args[1]) condition = (value is not None and value != "undefined") result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "is_undefined": if len(args) != 4: return "$$$$ Argument Error: is_undefined requires 4 arguments" value = resolve_value(args[1]) condition = (value is None or value == "undefined") result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "all_defined": if len(args) != 4: return "$$$$ Argument Error: all_defined requires 4 arguments" fields_arg = args[1] if not isinstance(fields_arg, list): return "$$$$ Argument Error: all_defined requires arg1 to be a list of field names" for field_name in fields_arg: field_value = get_value_from_inclusion(output_inclusion, field_name) if field_value is None or field_value == "undefined": condition = False break else: condition = True result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "==": if len(args) != 5: return "$$$$ Argument Error: == requires 5 arguments" value1 = resolve_value(args[1]) value2 = resolve_value(args[2]) if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined": return "undefined" condition = (value1 == value2) result_if_true = resolve_value(args[3]) result_if_false = resolve_value(args[4]) elif operator == "!=": if len(args) != 5: return "$$$$ Argument Error: != requires 5 arguments" value1 = resolve_value(args[1]) value2 = resolve_value(args[2]) if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined": return "undefined" condition = (value1 != value2) result_if_true = resolve_value(args[3]) result_if_false = resolve_value(args[4]) else: return f"$$$$ Unknown Operator: {operator}" return result_if_true if condition else result_if_false return f"$$$$ Unknown Custom Function: {function_name}" def process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data): """Processes and adds the inclusions mapping fields to the inclusion dictionary.""" for field in inclusions_mapping_config: field_name = field["field_name"] field_group = field.get("field_group", "Extended_Fields") # Default group if not specified final_value = "undefined" # Default value # Check condition condition_field_name = field.get("field_condition") if condition_field_name: condition_value = get_value_from_inclusion(output_inclusion, condition_field_name) if condition_value is None or condition_value == "undefined": final_value = "undefined" elif not isinstance(condition_value, bool): final_value = "$$$$ Condition Field Error" elif not condition_value: final_value = "N/A" # If condition allows, process the field if final_value == "undefined": source_name = field.get("source_name") source_type = field.get("source_type") field_path = field.get("field_path") # Get raw value from appropriate source if source_name == "Calculated": function_name = field.get("source_id") args = field_path final_value = _execute_custom_function(function_name, args, output_inclusion) elif source_type in ["q_id", "q_name", "q_category"]: final_value = _get_field_value_from_questionnaire(all_questionnaires, field) elif source_type == "record": final_value = get_nested_value(record_data, field_path, default="undefined") elif source_type == "inclusion": final_value = get_nested_value(inclusion_data, field_path, default="undefined") elif source_type == "request": final_value = get_nested_value(request_data, field_path, default="undefined") elif source_type == "6_month_visit": final_value = get_nested_value(six_month_visit_data, field_path, default="undefined") else: final_value = f"$$$$ Unknown Source Type: {source_type}" # If the source data itself is missing (e.g., 6-month visit not created), log a warning but continue if final_value == "$$$$ No Data": patient_id = inclusion_data.get("id", "Unknown") pseudo = inclusion_data.get("pseudo", "Unknown") logging.warning(f"No '{source_type}' data source found for Patient {patient_id} / {pseudo} (Field: {field_name})") final_value = "undefined" # Post-processing: Apply true_if_any and value_labels transformations (for all sources) if final_value not in ["undefined", "$$$$ No Data"]: # Check if any value matches check_values = field.get("true_if_any") if check_values: raw_value_set = set(final_value if isinstance(final_value, list) else [final_value]) check_values_set = set(check_values if isinstance(check_values, list) else [check_values]) final_value = not raw_value_set.isdisjoint(check_values_set) # Map value to label value_labels = field.get("value_labels") if value_labels and final_value not in ["$$$$ Format Error : Array expected"]: found = False for label_map in value_labels: if label_map.get("value") == final_value: final_value = get_nested_value(label_map, ["text", "fr"], default=f"$$$$ Value Error : {final_value}") found = True break if not found: final_value = f"$$$$ Value Error : {final_value}" # Post-processing: If the value is a list (e.g. from a wildcard), join it with a pipe. if isinstance(final_value, list): final_value = "|".join(map(str, final_value)) # Post-processing: Format score dictionaries if isinstance(final_value, dict) and 'total' in final_value and 'max' in final_value: final_value = f"{final_value['total']}/{final_value['max']}" # Post-processing: Apply field template field_template = field.get("field_template") if field_template and final_value not in ["undefined", "N/A"] and isinstance(final_value, (str, int, float, bool)): final_value = field_template.replace("$value", str(final_value)) # Ensure the group sub-dictionary exists if field_group not in output_inclusion: output_inclusion[field_group] = {} output_inclusion[field_group][field_name] = final_value # ============================================================================ # BLOCK 7: BUSINESS API CALLS # ============================================================================ @api_call_with_retry def get_all_organizations(): start_time = perf_counter() with console.status("[bold green]Getting Organizations...", spinner="dots"): client = get_httpx_client() client.base_url = RC_URL response = client.get(API_RC_GET_ALL_ORGANIZATIONS_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, timeout=1200) response.raise_for_status() duration = perf_counter() - start_time console.print(f"Organizations loaded. ({duration:.2f}s)", style="green") return response.json() @api_call_with_retry def _get_organization_statistics(organization_id): client = get_httpx_client() client.base_url = RC_URL response = client.post(API_RC_INCLUSION_STATISTICS_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={"protocolId": RC_ENDOBEST_PROTOCOL_ID, "center": organization_id, "excludedCenters": RC_ENDOBEST_EXCLUDED_CENTERS}, timeout=API_TIMEOUT) response.raise_for_status() return response.json()["statistic"] def get_organization_counters(organization): organization_id = organization['id'] stats = _get_organization_statistics(organization_id) organization["patients_count"] = stats.get("totalInclusions", 0) organization["preincluded_count"] = stats.get("preIncluded", 0) organization["included_count"] = stats.get("included", 0) organization["prematurely_terminated_count"] = stats.get("prematurelyTerminated", 0) return organization @api_call_with_retry def get_organization_inclusions(organization_id, limit, page): client = get_httpx_client() client.base_url = RC_URL response = client.post(f"{API_RC_SEARCH_INCLUSIONS_ENDPOINT}?limit={limit}&page={page}", headers={"Authorization": f"Bearer {access_token}"}, json={"protocolId": RC_ENDOBEST_PROTOCOL_ID, "center": organization_id, "keywords": ""}, timeout=API_TIMEOUT) response.raise_for_status() return response.json()["data"] @api_call_with_retry def get_record_by_patient_id(patient_id, organization_id): client = get_httpx_client() client.base_url = RC_URL response = client.post(API_RC_GET_RECORD_BY_PATIENT_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={"center": organization_id, "patientId": patient_id, "mode": "exchange", "state": "ongoing", "includeEndoParcour": False, "sourceClient": "pro_prm"}, timeout=API_TIMEOUT) response.raise_for_status() return response.json() @api_call_with_retry def get_request_by_tube_id(tube_id): client = get_httpx_client() client.base_url = GDD_URL response = client.get(f"{API_GDD_GET_REQUEST_BY_TUBE_ID_ENDPOINT}/{tube_id}?isAdmin=true&organization=undefined", headers={"Authorization": f"Bearer {access_token}"}, timeout=API_TIMEOUT) response.raise_for_status() return response.json() @api_call_with_retry def search_visit_by_pseudo_and_order(pseudo, order): """Searches for a visit by patient pseudo and visit order.""" client = get_httpx_client() client.base_url = RC_URL response = client.post(API_RC_SEARCH_VISITS_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={"visitOrder": order, "keywords": pseudo}, timeout=API_TIMEOUT) response.raise_for_status() resp_json = response.json() if isinstance(resp_json, dict): data = resp_json.get("data") if isinstance(data, list) and len(data) > 0: return data[0] return None @api_call_with_retry def get_all_questionnaires_by_patient(patient_id, record_data): """Fetches all questionnaires for a patient with a single API call.""" client = get_httpx_client() client.base_url = RC_URL payload = { "context": "clinic_research", "subject": patient_id } # Extract blockedQcmVersions from record (same logic as get_questionnaire_answers) if record_data is None: all_blocked_versions = [] else: all_blocked_versions = get_nested_value(record_data, path=["record", "protocol_inclusions", 0, "blockedQcmVersions"], default=[]) # Ensure it's a list even if get_nested_value returns "$$$$ No Data" if all_blocked_versions == "$$$$ No Data": all_blocked_versions = [] if all_blocked_versions: payload["blockedQcmVersions"] = all_blocked_versions response = client.post(API_RC_GET_SURVEYS_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json=payload, timeout=API_TIMEOUT) response.raise_for_status() response_data = response.json() # Build dictionary with questionnaire metadata for searching results = {} for item in response_data: q_id = get_nested_value(item, path=["questionnaire", "id"]) q_name = get_nested_value(item, path=["questionnaire", "name"]) q_category = get_nested_value(item, path=["questionnaire", "category"]) answers = get_nested_value(item, path=["answers"], default={}) if q_id: results[q_id] = { "questionnaire": { "id": q_id, "name": q_name, "category": q_category }, "answers": answers } return results # ============================================================================ # BLOCK 7b: ORGANIZATION CENTER MAPPING # ============================================================================ def load_organization_center_mapping(): """ Loads organization ↔ center mapping from Excel file in script directory. Returns: dict: {organization_name_normalized: center_name} or {} if error/skip """ mapping_file = ORG_CENTER_MAPPING_FILE_NAME if not os.path.exists(mapping_file): console.print(f"[yellow]⚠ Mapping file not found at: {mapping_file}. Skipping center mapping.[/yellow]") return {} try: workbook = openpyxl.load_workbook(mapping_file) except Exception as e: console.print(f"[yellow]⚠ Error loading mapping file: {e}. Skipping center mapping.[/yellow]") logging.warning(f"Error loading mapping file: {e}") return {} if ORG_CENTER_MAPPING_TABLE_NAME not in workbook.sheetnames: console.print(f"[yellow]⚠ Sheet '{ORG_CENTER_MAPPING_TABLE_NAME}' not found in mapping file. Skipping center mapping.[/yellow]") return {} sheet = workbook[ORG_CENTER_MAPPING_TABLE_NAME] headers = [cell.value for cell in sheet[1]] # Validate required columns if "Organization_Name" not in headers or "Center_Name" not in headers: console.print(f"[yellow]⚠ Required columns 'Organization_Name' or 'Center_Name' not found in mapping file. Skipping center mapping.[/yellow]") return {} # Load mapping rows mapping_rows = [] try: for row in sheet.iter_rows(min_row=2, values_only=True): # Skip empty rows if all(cell is None for cell in row): continue row_dict = dict(zip(headers, row)) org_name = row_dict.get("Organization_Name") center_name = row_dict.get("Center_Name") if org_name and center_name: mapping_rows.append({ "Organization_Name": org_name, "Center_Name": center_name }) except Exception as e: console.print(f"[yellow]⚠ Error reading mapping file rows: {e}. Skipping center mapping.[/yellow]") logging.warning(f"Error reading mapping file rows: {e}") return {} # === VALIDATE: Check for duplicates on NORMALIZED versions === org_names_normalized = {} # {normalized: original} center_names_normalized = {} # {normalized: original} for row in mapping_rows: org_name_raw = row["Organization_Name"] center_name_raw = row["Center_Name"] # Normalize org_normalized = org_name_raw.strip().lower() if isinstance(org_name_raw, str) else str(org_name_raw).strip().lower() center_normalized = center_name_raw.strip().lower() if isinstance(center_name_raw, str) else str(center_name_raw).strip().lower() # Check for duplicates if org_normalized in org_names_normalized: console.print(f"[yellow]⚠ Duplicate found in Organization_Name: '{org_name_raw}'. Skipping center mapping.[/yellow]") logging.warning(f"Duplicate in Organization_Name: '{org_name_raw}'") return {} if center_normalized in center_names_normalized: console.print(f"[yellow]⚠ Duplicate found in Center_Name: '{center_name_raw}'. Skipping center mapping.[/yellow]") logging.warning(f"Duplicate in Center_Name: '{center_name_raw}'") return {} # Store normalized version org_names_normalized[org_normalized] = org_name_raw center_names_normalized[center_normalized] = center_name_raw # === BUILD MAPPING DICT === mapping_dict = {} for row in mapping_rows: org_name_raw = row["Organization_Name"] center_name_raw = row["Center_Name"] # Normalize key, keep center_name clean (strip but not lower) org_normalized = org_name_raw.strip().lower() if isinstance(org_name_raw, str) else str(org_name_raw).strip().lower() center_clean = center_name_raw.strip() if isinstance(center_name_raw, str) else str(center_name_raw).strip() mapping_dict[org_normalized] = center_clean return mapping_dict def apply_center_mapping(organizations_list, mapping_dict): """ Applies organization → center mapping to organizations list. Adds 'Center_Name' field to each organization only if mapping succeeded. Args: organizations_list: List of organization dicts mapping_dict: {organization_name_normalized: center_name} """ if not mapping_dict: # Mapping dict is empty due to error → skip mapping return unmapped = [] for org in organizations_list: org_name = org.get("name", "") org_name_normalized = org_name.strip().lower() # Try to find match in mapping dict if org_name_normalized in mapping_dict: org["Center_Name"] = mapping_dict[org_name_normalized] else: # Fallback to organization name org["Center_Name"] = org_name unmapped.append(org_name) # Display results if not unmapped: console.print(f"[green]✓ All {len(organizations_list)} organizations mapped successfully.[/green]") else: console.print(f"[yellow]⚠ {len(unmapped)} organization(s) not mapped:[/yellow]") for org_name in sorted(unmapped): console.print(f"[yellow] - {org_name}[/yellow]") # ============================================================================ # BLOCK 8: PROCESSING ORCHESTRATION # ============================================================================ def _process_inclusion_data(inclusion, organization): """Processes a single inclusion record and returns a dictionary.""" organization_id = organization["id"] patient_id = get_nested_value(inclusion, path=["id"]) pseudo = get_nested_value(inclusion, path=["pseudo"], default="Unknown") # Set thread-local context for detailed error logging in decorators ctx = {"id": patient_id, "pseudo": pseudo} thread_local_storage.current_patient_context = ctx # Initialize empty output structure output_inclusion = {} # --- Prepare all data sources --- # 1. Launch Visit Search asynchronously (it's slow, ~5s) # We use run_with_context to pass the patient identity to the new thread visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2) # 2. Prepare inclusion_data: enrich inclusion with organization info inclusion_data = dict(inclusion) inclusion_data["organization_id"] = organization_id inclusion_data["organization_name"] = organization["name"] if "Center_Name" in organization: inclusion_data["center_name"] = organization["Center_Name"] # 3. Prepare record_data (sequential as it's often needed for questionnaires) record_data = get_record_by_patient_id(patient_id, organization_id) # 4. Get tube_id for request and launch in parallel with questionnaires tube_id = get_nested_value(record_data, path=["record", "clinicResearchData", 0, "requestMetaData", "tubeId"], default="undefined") request_future = subtasks_thread_pool.submit(run_with_context, get_request_by_tube_id, ctx, tube_id) all_questionnaires = get_all_questionnaires_by_patient(patient_id, record_data) # --- Synchronize all asynchronous tasks --- try: request_data = request_future.result() except Exception as e: logging.error(f"Error fetching request data for patient {patient_id}: {e}") request_data = None try: six_month_visit_data = visit_future.result() except Exception as e: logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}") six_month_visit_data = None # --- Process all fields from configuration --- process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data) return output_inclusion def process_organization(organization, index, total_organizations): global threads_list, global_pbar position = get_thread_position() + 2 organization_id = organization["id"] output_inclusions = [] inclusions = get_organization_inclusions(organization_id, 1000, 1) with tqdm(total=len(inclusions), unit='Incl.', desc=f"{str(index) + "/" + str(total_organizations):<9} - {organization['name'][:40]:<40}", position=position, leave=False, bar_format=custom_bar_format) as incl_pbar: for inclusion in inclusions: output_inclusion = _process_inclusion_data(inclusion, organization) output_inclusions.append(output_inclusion) incl_pbar.update(1) with _global_pbar_lock: if global_pbar: global_pbar.update(1) return output_inclusions # ============================================================================ # BLOCK 9: MAIN EXECUTION # ============================================================================ def main(): global global_pbar, excel_export_config, excel_export_enabled # --- Check for CLI Check_Only mode --- check_only_mode = "--check-only" in sys.argv if check_only_mode: run_check_only_mode(sys.argv) return # --- Check for CLI Excel_Only mode --- excel_only_mode = "--excel-only" in sys.argv if excel_only_mode: # Load mapping configs for Excel export (same as normal workflow) print() load_inclusions_mapping_config() load_organizations_mapping_config() # Completely externalized Excel-only workflow export_excel_only(sys.argv, INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME, inclusions_mapping_config, organizations_mapping_config) return # === NORMAL MODE: Full data collection === print() login_status = login() while login_status == "Error": login_status = login() if login_status == "Exit": return print() number_of_threads = int((questionary.text("Number of threads :", default="12", validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask())) print() ask_on_retry_exhausted() print() wait_for_scheduled_launch() print() load_inclusions_mapping_config() load_organizations_mapping_config() # === LOAD AND VALIDATE EXCEL EXPORT CONFIGURATION === print() console.print("[bold cyan]Loading Excel export configuration...[/bold cyan]") # Validate Excel config (no data loading - JSONs don't exist yet in NORMAL MODE) # prepare_excel_export() displays error messages directly to console excel_export_config, has_config_critical, _ = \ prepare_excel_export(inclusions_mapping_config, organizations_mapping_config) # Ask user confirmation if critical errors found if has_config_critical: print() answer = questionary.confirm( "⚠ Critical configuration errors detected. Continue anyway?", default=False ).ask() if not answer: console.print("[bold red]Aborted by user[/bold red]") return else: excel_export_enabled = False # Skip Excel export if user continues despite errors else: excel_export_enabled = True if excel_export_config else False # Config is valid, Excel export can proceed print() start_time = perf_counter() organizations_list = get_all_organizations() organizations_list = [org for org in organizations_list if org["id"] not in RC_ENDOBEST_EXCLUDED_CENTERS] # === APPLY ORGANIZATION CENTER MAPPING === print() print("Mapping organizations to centers...") mapping_dict = load_organization_center_mapping() apply_center_mapping(organizations_list, mapping_dict) print() with ThreadPoolExecutor(max_workers=number_of_threads) as counter_pool: futures = [counter_pool.submit(get_organization_counters, org) for org in organizations_list] organizations_list_with_counters = [] for future in tqdm(as_completed(futures), total=len(futures), desc=f"{'Fetching Organizations Counters':<52}", unit="orgs.", bar_format=custom_bar_format): try: updated_org = future.result() organizations_list_with_counters.append(updated_org) except Exception as exc: print(f"\nCRITICAL ERROR while fetching counters: {exc}") counter_pool.shutdown(wait=False, cancel_futures=True) raise organizations_list = organizations_list_with_counters print() inclusions_total_count = sum(org.get('patients_count', 0) for org in organizations_list) organizations_list.sort(key=lambda org: (-org.get('patients_count', 0), org.get('name', ''))) number_of_organizations = len(organizations_list) print(f"{inclusions_total_count} Inclusions in {number_of_organizations} Organizations...") print() output_inclusions = [] with tqdm(total=inclusions_total_count, unit="incl.", desc=f"{'Overall Progress':<52}", position=0, leave=True, bar_format=custom_bar_format) as overall_progress_pbar: global_pbar = overall_progress_pbar with ThreadPoolExecutor(max_workers=number_of_threads) as thread_pool: futures = [thread_pool.submit(process_organization, organization, index + 1, number_of_organizations) for index, organization in enumerate(organizations_list)] for future in as_completed(futures): try: result = future.result() output_inclusions.extend(result) except Exception as exc: logging.critical(f"Arrêt dû à une exception critique dans un worker: {exc}", exc_info=True) print(f"\nERREUR CRITIQUE dans un thread de traitement, arrêt du processus:") print(f"Exception: {exc}") print("Traceback original du worker:") traceback.print_exc() print( "Signal d'arrêt envoyé au pool principal avec tentative d'annulation des tâches en attente...") thread_pool.shutdown(wait=False, cancel_futures=True) raise def get_sort_key(item): org_name = get_nested_value(item, ["Patient_Identification", "Organisation_Name"], default='') pseudo = get_nested_value(item, ["Patient_Identification", "Pseudo"], default='') date_str = get_nested_value(item, ["Inclusion", "Inclusion_Date"], default='') sort_date = datetime.max if date_str and date_str != "undefined": try: sort_date = datetime.strptime(date_str, '%d/%m/%Y') except (ValueError, TypeError): pass return org_name, sort_date, pseudo try: print() print() print("Sorting results...") output_inclusions.sort(key=get_sort_key) # === QUALITY CHECKS (before backup to avoid losing history on crash) === print() has_coherence_critical, has_regression_critical = run_quality_checks( current_inclusions=output_inclusions, # list: données en mémoire (nouvellement collectées) organizations_list=organizations_list, # list: données en mémoire avec compteurs old_inclusions_filename=INCLUSIONS_FILE_NAME # str: "endobest_inclusions.json" (version courante sur disque) ) # === CHECK FOR CRITICAL ISSUES AND ASK USER CONFIRMATION === if has_coherence_critical or has_regression_critical: print() console.print("[bold red]⚠ CRITICAL issues detected in quality checks![/bold red]") confirm_write = questionary.confirm( "Do you want to write the results anyway?", default=True ).ask() if not confirm_write: console.print("[yellow]✗ Output writing cancelled by user. Files were not modified.[/yellow]") console.print("[yellow] You can re-run the script to try again.[/yellow]") print() print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}") return # === BACKUP OLD FILES (only after checks pass and user confirmation) === backup_output_files() # === WRITE NEW FILES === print("Writing files...") with open(INCLUSIONS_FILE_NAME, 'w', encoding='utf-8') as f_json: json.dump(output_inclusions, f_json, indent=4, ensure_ascii=False) with open(ORGANIZATIONS_FILE_NAME, 'w', encoding='utf-8') as f_json: json.dump(organizations_list, f_json, indent=4, ensure_ascii=False) console.print("[green]✓ Data saved to JSON files[/green]") print() # === EXCEL EXPORT === # Completely externalized Excel export workflow run_normal_mode_export(excel_export_enabled, excel_export_config, inclusions_mapping_config, organizations_mapping_config) except IOError as io_err: logging.critical(f"Error while writing json file : {io_err}") print(f"Error while writing json file : {io_err}") except Exception as exc: logging.critical(f"Error while writing json file : {exc}") print(f"Error while writing json file : {exc}") print() print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}") if __name__ == '__main__': try: main() except Exception as e: logging.critical(f"Le script principal s'est terminé prématurément à cause d'une exception: {e}", exc_info=True) print(f"Le script s'est arrêté à cause d'une erreur : {e}") finally: if 'subtasks_thread_pool' in globals() and subtasks_thread_pool: subtasks_thread_pool.shutdown(wait=False, cancel_futures=True) print('\n') input("Press Enter to exit...")