# DO (Diagnostic Order) Dashboard Generator # This script automates the collection and processing of diagnostic order requests from the Ziwig GDD platform. # It authenticates with Ziwig's IAM and GDD APIs to gather all worklist requests, their details, and # associated professional (prescriber/requester) information. The script generates a comprehensive JSON # report containing all request data, with 100% configurable fields defined in an Excel configuration file. # All fields are externalized and can be configured without any code modification. The configuration # supports multiple data sources, custom functions for business logic, field dependencies and conditions, # value transformations, and multi-line field definitions for complex calculations. Organization counters # are computed directly from request details (no separate statistics API). It employs multithreading with # configurable worker pools to parallelize request processing across the thread pool, significantly reducing # execution time. Results are exported as structured JSON files for easy integration with downstream analytics # tools. Built-in quality assurance includes comprehensive non-regression testing with configurable # Warning/Critical thresholds, and user confirmation prompts when critical issues are detected. Excel export # functionality enables generation of configurable Excel workbooks with data filtering, sorting, value # replacement, and formula recalculation. Key features include automatic token refresh handling, retry # mechanisms for transient API failures, progress tracking with real-time visual feedback, and support for # complex data extraction using JSON path expressions. import json import logging import msvcrt import os import re import sys import threading import traceback from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import timedelta, datetime from time import perf_counter, sleep import functools import httpx import openpyxl import questionary from tqdm import tqdm from rich.console import Console # Import centralized constants (SINGLE SOURCE OF TRUTH) from do_dashboard_constants import ( REQUESTS_FILE_NAME, ORGANIZATIONS_FILE_NAME, OLD_FILE_SUFFIX, DASHBOARD_CONFIG_FILE_NAME, REQUESTS_MAPPING_TABLE_NAME, ORGANIZATIONS_MAPPING_TABLE_NAME, ORG_CENTER_MAPPING_FILE_NAME, ORG_CENTER_MAPPING_TABLE_NAME, DEFAULT_USER_NAME, DEFAULT_PASSWORD, IAM_URL, GDD_URL, GDD_APP_ID, ERROR_MAX_RETRY, WAIT_BEFORE_RETRY, WAIT_BEFORE_NEW_BATCH_OF_RETRIES, MAX_BATCHS_OF_RETRIES, MAX_THREADS, DO_FILTERS, DO_WORKLIST_PAGE_SIZE, BAR_N_FMT_WIDTH, BAR_TOTAL_FMT_WIDTH, BAR_TIME_WIDTH, BAR_RATE_WIDTH, LOG_FILE_NAME, API_TIMEOUT, API_AUTH_LOGIN_ENDPOINT, API_AUTH_CONFIG_TOKEN_ENDPOINT, API_AUTH_REFRESH_TOKEN_ENDPOINT, API_DO_WORKLIST_ENDPOINT, API_DO_REQUEST_DETAIL_ENDPOINT, API_DO_PROFESSIONALS_ENDPOINT ) # Import refactored modules from do_dashboard_utils import ( get_nested_value, get_httpx_client, clear_httpx_client, get_thread_position, get_config_path, thread_local_storage, run_with_context ) from do_dashboard_quality_checks import ( backup_output_files, run_quality_checks, run_check_only_mode, set_dependencies as quality_set_dependencies, enable_debug_mode ) from do_dashboard_excel_export import ( prepare_excel_export, export_excel_only, run_normal_mode_export, set_dependencies as excel_set_dependencies ) logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s', filename=LOG_FILE_NAME, filemode='w') # ============================================================================ # BLOCK 1: CONFIGURATION & BASE INFRASTRUCTURE # ============================================================================ # NOTE: All constants are imported from do_dashboard_constants.py (SINGLE SOURCE OF TRUTH) # --- Global Variables --- access_token = "" refresh_token = "" threads_list = [] _token_refresh_lock = threading.Lock() on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup _stored_username = "" # Credentials stored at login for automatic re-login _stored_password = "" _threads_list_lock = threading.Lock() global_pbar = None _global_pbar_lock = threading.Lock() _user_interaction_lock = threading.Lock() # Global variables (mutable, set at runtime - not constants) requests_mapping_config = [] organizations_mapping_config = [] excel_export_config = None excel_export_enabled = False subtasks_thread_pool = ThreadPoolExecutor(40) httpx_clients = {} console = Console() # Share global variables with utility modules (required for thread-safe operations) import do_dashboard_utils do_dashboard_utils.httpx_clients = httpx_clients do_dashboard_utils.threads_list = threads_list do_dashboard_utils._threads_list_lock = _threads_list_lock # Inject console instance to modules quality_set_dependencies(console) excel_set_dependencies(console) # Detect and enable debug mode if --debug flag is present (and remove it from argv) if "--debug" in sys.argv: sys.argv.remove("--debug") enable_debug_mode() # --- Progress Bar Configuration --- custom_bar_format = ("{l_bar}{bar}" f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}") # ============================================================================ # BLOCK 2: DECORATORS & RESILIENCE # ============================================================================ def new_token(): """Refresh access token using the refresh token""" global access_token, refresh_token with _token_refresh_lock: for attempt in range(ERROR_MAX_RETRY): try: client = get_httpx_client() client.base_url = GDD_URL response = client.post(API_AUTH_REFRESH_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={"refresh_token": refresh_token}, timeout=20) response.raise_for_status() access_token = response.json()["access_token"] refresh_token = response.json()["refresh_token"] return except httpx.RequestError as exc: logging.warning(f"Refresh Token Error (Attempt {attempt + 1}) : {exc}") clear_httpx_client() except httpx.HTTPStatusError as exc: logging.warning( f"Refresh Token Error (Attempt {attempt + 1}) : {exc.response.status_code} for Url {exc.request.url}") clear_httpx_client() finally: if attempt < ERROR_MAX_RETRY - 1: sleep(WAIT_BEFORE_RETRY) # Refresh token exhausted — attempt full re-login with stored credentials logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.") _do_login(_stored_username, _stored_password) logging.info("Re-login successful. New tokens acquired.") def api_call_with_retry(func): """Decorator for API calls with automatic retry and token refresh on 401 errors""" @functools.wraps(func) def wrapper(*args, **kwargs): func_name = func.__name__ total_attempts = 0 batch_count = 1 while True: for attempt in range(ERROR_MAX_RETRY): total_attempts += 1 try: return func(*args, **kwargs) except (httpx.RequestError, httpx.HTTPStatusError) as exc: logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}") clear_httpx_client() if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: logging.info(f"Token expired for {func_name}. Refreshing token.") try: new_token() except (httpx.RequestError, httpx.HTTPStatusError) as token_exc: logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}") if attempt < ERROR_MAX_RETRY - 1: sleep(WAIT_BEFORE_RETRY) else: if batch_count < MAX_BATCHS_OF_RETRIES: logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. " f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.") batch_count += 1 sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES) break else: with _user_interaction_lock: if on_retry_exhausted == "ignore": ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"}) logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Request {ctx['id']}. Error: {exc}") return None elif on_retry_exhausted == "abort": logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}") raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)") else: # "ask" — display error then interactive prompt console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]") console.print(f"[red]Exception: {exc}[/red]") choice = questionary.select( f"What would you like to do for {func_name}?", choices=[ "Retry (try another batch of retries)", "Ignore (return None and continue)", "Stop script (critical error)" ] ).ask() if choice == "Retry (try another batch of retries)": logging.info(f"User chose to retry {func_name}. Restarting batch sequence.") batch_count = 1 break elif choice == "Ignore (return None and continue)": ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"}) logging.warning(f"[IGNORE] User opted to skip {func_name} for Request {ctx['id']}. Error: {exc}") return None else: logging.critical(f"User chose to stop script after persistent error in {func_name}.") raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)") return wrapper # ============================================================================ # BLOCK 3: AUTHENTICATION # ============================================================================ def _do_login(username, password): """Performs the two-step authentication (IAM → GDD) with the given credentials. Updates global access_token and refresh_token on success. Raises httpx.RequestError or httpx.HTTPStatusError on failure. Must NOT acquire _token_refresh_lock (caller's responsibility). """ global access_token, refresh_token # Step 1: IAM login client = get_httpx_client() client.base_url = IAM_URL response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": username, "password": password}, timeout=20) response.raise_for_status() master_token = response.json()["access_token"] user_id = response.json()["userId"] # Step 2: GDD config-token client = get_httpx_client() client.base_url = GDD_URL response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"}, json={"userId": user_id, "clientId": GDD_APP_ID, "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}, timeout=20) response.raise_for_status() access_token = response.json()["access_token"] refresh_token = response.json()["refresh_token"] def login(): global _stored_username, _stored_password user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask()) password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask()) if not (user_name and password): return "Exit" try: _do_login(user_name, password) except httpx.RequestError as exc: print(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}") return "Error" except httpx.HTTPStatusError as exc: print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") return "Error" _stored_username = user_name _stored_password = password print() print("Login Success") return "Success" # ============================================================================ # BLOCK 3B: STARTUP PARAMETERS & FILE UTILITIES # ============================================================================ def ask_on_retry_exhausted(): """Asks the user what to do when all API retry batches are exhausted.""" global on_retry_exhausted choice = questionary.select( "On retry exhausted :", choices=[ "Ask (interactive prompt)", "Ignore (return None and continue)", "Abort (stop script)" ] ).ask() if choice is None or choice == "Ask (interactive prompt)": on_retry_exhausted = "ask" elif choice == "Ignore (return None and continue)": on_retry_exhausted = "ignore" else: on_retry_exhausted = "abort" def wait_for_scheduled_launch(): """Asks the user when to start the processing and waits if needed. Options: Immediately / In X minutes / At HH:MM """ choice = questionary.select( "When to start processing ?", choices=["Immediately", "In X minutes", "At HH:MM"] ).ask() if choice is None or choice == "Immediately": return if choice == "In X minutes": minutes_str = questionary.text( "Number of minutes :", validate=lambda x: x.isdigit() and int(x) > 0 ).ask() if not minutes_str: return target_time = datetime.now() + timedelta(minutes=int(minutes_str)) else: # "At HH:MM" time_str = questionary.text( "Start time (HH:MM) :", validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and 0 <= int(x.split(':')[0]) <= 23 and 0 <= int(x.split(':')[1]) <= 59 ).ask() if not time_str: return now = datetime.now() h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1]) target_time = now.replace(hour=h, minute=m, second=0, microsecond=0) if target_time <= now: console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]") return print() try: while True: remaining = target_time - datetime.now() if remaining.total_seconds() <= 0: break total_secs = int(remaining.total_seconds()) h = total_secs // 3600 m = (total_secs % 3600) // 60 s = total_secs % 60 target_str = target_time.strftime('%H:%M:%S') print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ", end="", flush=True) sleep(1) while msvcrt.kbhit(): msvcrt.getwch() print() console.print("[green]✓ Starting processing.[/green]") except KeyboardInterrupt: print() console.print("[bold red]Launch cancelled by user.[/bold red]") raise SystemExit(0) def load_json_file(filename): """Load a JSON file from disk. Returns parsed data or None on error.""" if os.path.exists(filename): try: with open(filename, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logging.warning(f"Could not load JSON file '{filename}': {e}") console.print(f"[yellow]⚠ Warning: Could not load JSON file '{filename}': {e}[/yellow]") return None # ============================================================================ # BLOCK 4: CONFIGURATION LOADING # ============================================================================ def load_requests_mapping_config(): """Loads and validates the requests mapping configuration from the Excel file.""" global requests_mapping_config config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) try: workbook = openpyxl.load_workbook(config_path, data_only=True) except FileNotFoundError: error_msg = f"Error: Configuration file not found at: {config_path}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) if REQUESTS_MAPPING_TABLE_NAME not in workbook.sheetnames: error_msg = f"Error: Sheet '{REQUESTS_MAPPING_TABLE_NAME}' not found in the configuration file." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) sheet = workbook[REQUESTS_MAPPING_TABLE_NAME] headers = [cell.value for cell in sheet[1]] temp_config = [] for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2): field_config = dict(zip(headers, row)) if field_config.get("source_name") == "Not Specified": continue field_name = field_config.get("field_name") if not field_name or not isinstance(field_name, str): error_msg = f"Error in config file, row {row_index}: 'field_name' is mandatory." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) field_config["field_name"] = re.sub(r'\s*\([^)]*\)$', '', field_name).strip() # Parse source_id prefix source_id_raw = field_config.get("source_id", "") if source_id_raw and isinstance(source_id_raw, str): if source_id_raw.startswith("q_id="): field_config["source_type"] = "q_id" field_config["source_value"] = source_id_raw[5:] elif source_id_raw.startswith("q_name="): field_config["source_type"] = "q_name" field_config["source_value"] = source_id_raw[7:] elif source_id_raw.startswith("q_category="): field_config["source_type"] = "q_category" field_config["source_value"] = source_id_raw[11:] elif source_id_raw == "record": field_config["source_type"] = "record" field_config["source_value"] = None elif source_id_raw == "request": field_config["source_type"] = "request" field_config["source_value"] = None else: field_config["source_type"] = None field_config["source_value"] = source_id_raw else: field_config["source_type"] = None field_config["source_value"] = None for json_field in ["field_path", "field_condition", "true_if_any", "value_labels"]: value = field_config.get(json_field) if value: if not isinstance(value, str): error_msg = f"Error in config file, row {row_index}, field '{json_field}': Invalid value, must be a JSON string." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) try: field_config[json_field] = json.loads(value) except json.JSONDecodeError: error_msg = f"Error in config file, row {row_index}, field '{json_field}': Invalid JSON format." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) else: field_config[json_field] = None if not field_config.get("field_path"): error_msg = f"Error in config file, row {row_index}: 'field_path' is mandatory when a field is specified." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) temp_config.append(field_config) requests_mapping_config = temp_config console.print(f"Loaded {len(requests_mapping_config)} fields from requests mapping configuration.", style="green") def load_organizations_mapping_config(): """Loads and validates the organizations mapping configuration from the Excel file.""" global organizations_mapping_config config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) try: workbook = openpyxl.load_workbook(config_path, data_only=True) except FileNotFoundError: error_msg = f"Error: Configuration file not found at: {config_path}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) if ORGANIZATIONS_MAPPING_TABLE_NAME not in workbook.sheetnames: logging.info(f"Sheet '{ORGANIZATIONS_MAPPING_TABLE_NAME}' not found. Organizations mapping is optional.") organizations_mapping_config = [] return sheet = workbook[ORGANIZATIONS_MAPPING_TABLE_NAME] headers = [cell.value for cell in sheet[1]] headers_filtered = [h for h in headers if h is not None] mapping_config = [] try: for row in sheet.iter_rows(min_row=2, values_only=True): if all(cell is None for cell in row): break row_filtered = row[:len(headers_filtered)] config_dict = dict(zip(headers_filtered, row_filtered)) mapping_config.append(config_dict) except Exception as e: error_msg = f"Error parsing organizations mapping: {e}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) finally: workbook.close() organizations_mapping_config = mapping_config if mapping_config: console.print(f"Loaded {len(organizations_mapping_config)} organizations from organizations mapping configuration.", style="green") else: console.print("No organizations mapping found (this is optional).", style="yellow") def load_do_filters_config(): """ Loads the DO filters from the Named Range 'DO_Filters' in the config Excel file. The Named Range contains a JSON string representing the filters object for the worklist API. Returns: dict: Filters object (e.g. {"status": "all-admin", "study": "ENDOLIFE"}) Raises: Exception: If the Named Range is not found or the value is not valid JSON. """ config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) try: workbook = openpyxl.load_workbook(config_path, data_only=True) except FileNotFoundError: error_msg = f"Error: Configuration file not found at: {config_path}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) if DO_FILTERS not in workbook.defined_names: error_msg = f"Error: Named range '{DO_FILTERS}' not found in configuration file." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) try: named_range = workbook.defined_names[DO_FILTERS] destinations = list(named_range.destinations) if not destinations: raise ValueError("Named range has no destinations") sheet_name, cell_ref = destinations[0] # Remove absolute reference markers ($) for cell access cell_ref_clean = cell_ref.replace('$', '') sheet = workbook[sheet_name] cell_value = sheet[cell_ref_clean].value except Exception as e: error_msg = f"Error reading Named Range '{DO_FILTERS}': {e}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) finally: workbook.close() if not cell_value or not isinstance(cell_value, str): error_msg = f"Error: Named range '{DO_FILTERS}' is empty or not a string." logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) try: filters = json.loads(cell_value) except json.JSONDecodeError as e: error_msg = f"Error: Named range '{DO_FILTERS}' does not contain valid JSON: {e}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) console.print(f"Loaded DO filters: {filters}", style="green") return filters # ============================================================================ # BLOCK 5: DATA SEARCH & EXTRACTION # ============================================================================ def get_value_from_request(output_request, key): """Helper to find a key in the nested output_request structure (groups → fields).""" for group in output_request.values(): if isinstance(group, dict) and key in group: return group[key] return None # ============================================================================ # BLOCK 6: CUSTOM FUNCTIONS & FIELD PROCESSING # ============================================================================ def _execute_custom_function(function_name, args, output_request): """Executes a custom function for a calculated field.""" if function_name == "search_in_fields_using_regex": if not args or len(args) < 2: return "$$$$ Argument Error: search_in_fields_using_regex requires at least 2 arguments" regex_pattern = args[0] field_names = args[1:] field_values = [] all_undefined = True for field_name in field_names: value = get_value_from_request(output_request, field_name) field_values.append(value) if value is not None and value != "undefined": all_undefined = False if all_undefined: return "undefined" try: for value in field_values: if isinstance(value, str) and re.search(regex_pattern, value, re.IGNORECASE): return True except re.error as e: return f"$$$$ Regex Error: {e}" return False elif function_name == "extract_parentheses_content": if not args or len(args) != 1: return "$$$$ Argument Error: extract_parentheses_content requires 1 argument" field_name = args[0] value = get_value_from_request(output_request, field_name) if value is None or value == "undefined": return "undefined" match = re.search(r'\((.*?)\)', str(value)) return match.group(1) if match else "undefined" elif function_name == "append_terminated_suffix": if not args or len(args) != 2: return "$$$$ Argument Error: append_terminated_suffix requires 2 arguments" status = get_value_from_request(output_request, args[0]) is_terminated = get_value_from_request(output_request, args[1]) if status is None or status == "undefined": return "undefined" if not isinstance(is_terminated, bool) or not is_terminated: return status return f"{status} - AP" elif function_name == "if_then_else": if not args or len(args) < 4: return "$$$$ Argument Error: if_then_else requires at least 4 arguments" operator = args[0] def resolve_value(arg): if isinstance(arg, bool): return arg if isinstance(arg, (int, float)): return arg if isinstance(arg, str) and arg.startswith("$"): return arg[1:] return get_value_from_request(output_request, arg) if operator == "is_true": if len(args) != 4: return "$$$$ Argument Error: is_true requires 4 arguments" value = resolve_value(args[1]) if value is None or value == "undefined": return "undefined" condition = (value is True) result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "is_false": if len(args) != 4: return "$$$$ Argument Error: is_false requires 4 arguments" value = resolve_value(args[1]) if value is None or value == "undefined": return "undefined" condition = (value is False) result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "all_true": if len(args) != 4: return "$$$$ Argument Error: all_true requires 4 arguments" fields_arg = args[1] if not isinstance(fields_arg, list): return "$$$$ Argument Error: all_true requires arg1 to be a list of field names" conditions = [] for field_name in fields_arg: field_value = get_value_from_request(output_request, field_name) if field_value is None or field_value == "undefined": return "undefined" conditions.append(field_value) condition = all(conditions) result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "is_defined": if len(args) != 4: return "$$$$ Argument Error: is_defined requires 4 arguments" value = resolve_value(args[1]) condition = (value is not None and value != "undefined") result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "is_undefined": if len(args) != 4: return "$$$$ Argument Error: is_undefined requires 4 arguments" value = resolve_value(args[1]) condition = (value is None or value == "undefined") result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "all_defined": if len(args) != 4: return "$$$$ Argument Error: all_defined requires 4 arguments" fields_arg = args[1] if not isinstance(fields_arg, list): return "$$$$ Argument Error: all_defined requires arg1 to be a list of field names" for field_name in fields_arg: field_value = get_value_from_request(output_request, field_name) if field_value is None or field_value == "undefined": condition = False break else: condition = True result_if_true = resolve_value(args[2]) result_if_false = resolve_value(args[3]) elif operator == "==": if len(args) != 5: return "$$$$ Argument Error: == requires 5 arguments" value1 = resolve_value(args[1]) value2 = resolve_value(args[2]) if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined": return "undefined" condition = (value1 == value2) result_if_true = resolve_value(args[3]) result_if_false = resolve_value(args[4]) elif operator == "!=": if len(args) != 5: return "$$$$ Argument Error: != requires 5 arguments" value1 = resolve_value(args[1]) value2 = resolve_value(args[2]) if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined": return "undefined" condition = (value1 != value2) result_if_true = resolve_value(args[3]) result_if_false = resolve_value(args[4]) else: return f"$$$$ Unknown Operator: {operator}" return result_if_true if condition else result_if_false elif function_name == "extract_value_from_array": # Args: [array_field_name, key_path, search_value, value_path] # array_field_name : name of an already-computed field in output_request containing the array # key_path : relative path (list) to the key attribute within each array item # search_value : JSON value to match against (string, number, bool, object...) # value_path : relative path (list) to the value to extract from the matched item if not args or len(args) != 4: return "$$$$ Argument Error: extract_value_from_array requires 4 arguments" array_field_name, key_path, search_value, value_path = args if not isinstance(key_path, list): return "$$$$ Argument Error: extract_value_from_array key_path (arg2) must be a list" if not isinstance(value_path, list): return "$$$$ Argument Error: extract_value_from_array value_path (arg4) must be a list" array = get_value_from_request(output_request, array_field_name) if array is None or array == "undefined": return "undefined" if not isinstance(array, list): return "$$$$ Format Error : Array expected" for item in array: if get_nested_value(item, key_path) == search_value: return get_nested_value(item, value_path, default="undefined") return "undefined" elif function_name == "concat": # Args: [separator, field_name_1, field_name_2, ..., field_name_n] # separator : string to insert between concatenated values # field_name_* : names of already-computed fields in output_request to concatenate # undefined/None values are silently skipped if not args or len(args) < 2: return "$$$$ Argument Error: concat requires at least 2 arguments (separator + 1 field)" separator = args[0] if not isinstance(separator, str): return "$$$$ Argument Error: concat separator (arg1) must be a string" field_names = args[1:] parts = [] all_undefined = True for field_name in field_names: value = get_value_from_request(output_request, field_name) if value is not None and value != "undefined": all_undefined = False parts.append(str(value)) if all_undefined: return "undefined" return separator.join(parts) return f"$$$$ Unknown Custom Function: {function_name}" def _apply_date_format(strftime_template, iso_value): """ Parses an ISO 8601 date string and formats it using a strftime template. Returns the formatted string, or None if the source value cannot be parsed. """ iso_formats = [ "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M", "%Y-%m-%d", ] for fmt in iso_formats: try: return datetime.strptime(iso_value, fmt).strftime(strftime_template) except (ValueError, TypeError): continue return None def process_requests_mapping(output_request, request_data): """Processes and adds the requests mapping fields to the output request dictionary.""" for field in requests_mapping_config: field_name = field["field_name"] field_group = field.get("field_group", "Extended_Fields") final_value = "undefined" # Check condition condition_field_name = field.get("field_condition") if condition_field_name: condition_value = get_value_from_request(output_request, condition_field_name) if condition_value is None or condition_value == "undefined": final_value = "undefined" elif not isinstance(condition_value, bool): final_value = "$$$$ Condition Field Error" elif not condition_value: final_value = "N/A" # If condition allows, process the field if final_value == "undefined": source_name = field.get("source_name") source_type = field.get("source_type") field_path = field.get("field_path") # Get raw value from appropriate source if source_name == "Calculated": function_name = field.get("source_id") args = field_path final_value = _execute_custom_function(function_name, args, output_request) elif source_type == "request": final_value = get_nested_value(request_data, field_path, default="undefined") else: # source types not used in DO (q_id, q_name, q_category, record, etc.) # return undefined to allow future extensibility final_value = "undefined" # If the source data itself is missing, log a warning but continue if final_value == "$$$$ No Data": request_id = request_data.get("id", "Unknown") if isinstance(request_data, dict) else "Unknown" logging.warning(f"No '{source_type}' data source found for Request {request_id} (Field: {field_name})") final_value = "undefined" # Post-processing: Apply true_if_any and value_labels transformations if final_value not in ["undefined", "$$$$ No Data"]: check_values = field.get("true_if_any") if check_values: raw_value_set = set(final_value if isinstance(final_value, list) else [final_value]) check_values_set = set(check_values if isinstance(check_values, list) else [check_values]) final_value = not raw_value_set.isdisjoint(check_values_set) value_labels = field.get("value_labels") if value_labels and final_value not in ["$$$$ Format Error : Array expected"]: found = False for label_map in value_labels: if label_map.get("value") == final_value: final_value = get_nested_value(label_map, ["text", "fr"], default=f"$$$$ Value Error : {final_value}") found = True break if not found: final_value = f"$$$$ Value Error : {final_value}" field_template = field.get("field_template") # Post-processing: If the value is a list, join it with a pipe (unless raw_array template) if isinstance(final_value, list) and field_template != "raw_array": final_value = "|".join(map(str, final_value)) # Post-processing: Format score dictionaries if isinstance(final_value, dict) and 'total' in final_value and 'max' in final_value: final_value = f"{final_value['total']}/{final_value['max']}" # Post-processing: Apply field template if field_template and field_template != "raw_array" and final_value not in ["undefined", "N/A"] and isinstance(final_value, (str, int, float, bool)): if "%" in field_template: formatted = _apply_date_format(field_template, str(final_value)) final_value = formatted if formatted is not None else f"$$$$ Date Format Error: {final_value}" else: final_value = field_template.replace("$value", str(final_value)) if field_group not in output_request: output_request[field_group] = {} output_request[field_group][field_name] = final_value # ============================================================================ # BLOCK 7: BUSINESS API CALLS # ============================================================================ @api_call_with_retry def get_worklist_page(filters, page, page_size): """Fetches one page of the diagnostic order worklist.""" client = get_httpx_client() client.base_url = GDD_URL response = client.post( API_DO_WORKLIST_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={ "lang": "fr-FR", "filters": filters, "limit": page_size, "page": page, "sort": [] }, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() @api_call_with_retry def get_request_detail_by_id(request_id): """Fetches the full validation detail for a single request.""" client = get_httpx_client() client.base_url = GDD_URL response = client.get( f"{API_DO_REQUEST_DETAIL_ENDPOINT}/{request_id}/validation", headers={"Authorization": f"Bearer {access_token}"}, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() @api_call_with_retry def get_professionals(ids): """ Fetches professional display names for a list of IDs (prescriber / requester). IDs are deduplicated before the call. Results are matched by ID for robustness. Returns: dict: {professional_id: display_name} """ if not ids: return {} client = get_httpx_client() client.base_url = GDD_URL response = client.post( API_DO_PROFESSIONALS_ENDPOINT, headers={"Authorization": f"Bearer {access_token}"}, json={"ids": ids}, timeout=API_TIMEOUT ) response.raise_for_status() data = response.json().get("data", []) result = {} for pro in data: pro_id = get_nested_value(pro, ["metadata", "id"]) display = pro.get("display") if pro_id: result[pro_id] = display return result # ============================================================================ # BLOCK 7b: ORGANIZATION CENTER MAPPING # ============================================================================ def load_organization_center_mapping(): """ Loads organization ↔ center mapping from Excel file in script directory. Returns: dict: {organization_name_normalized: center_name} or {} if error/skip """ mapping_file = ORG_CENTER_MAPPING_FILE_NAME if not os.path.exists(mapping_file): console.print(f"[yellow]⚠ Mapping file not found at: {mapping_file}. Skipping center mapping.[/yellow]") return {} try: workbook = openpyxl.load_workbook(mapping_file) except Exception as e: console.print(f"[yellow]⚠ Error loading mapping file: {e}. Skipping center mapping.[/yellow]") logging.warning(f"Error loading mapping file: {e}") return {} if ORG_CENTER_MAPPING_TABLE_NAME not in workbook.sheetnames: console.print(f"[yellow]⚠ Sheet '{ORG_CENTER_MAPPING_TABLE_NAME}' not found in mapping file. Skipping center mapping.[/yellow]") return {} sheet = workbook[ORG_CENTER_MAPPING_TABLE_NAME] headers = [cell.value for cell in sheet[1]] if "Organization_Name" not in headers or "Center_Name" not in headers: console.print(f"[yellow]⚠ Required columns 'Organization_Name' or 'Center_Name' not found in mapping file. Skipping center mapping.[/yellow]") return {} mapping_rows = [] try: for row in sheet.iter_rows(min_row=2, values_only=True): if all(cell is None for cell in row): continue row_dict = dict(zip(headers, row)) org_name = row_dict.get("Organization_Name") center_name = row_dict.get("Center_Name") if org_name and center_name: mapping_rows.append({"Organization_Name": org_name, "Center_Name": center_name}) except Exception as e: console.print(f"[yellow]⚠ Error reading mapping file rows: {e}. Skipping center mapping.[/yellow]") logging.warning(f"Error reading mapping file rows: {e}") return {} # Validate: check for duplicates on normalized versions org_names_normalized = {} center_names_normalized = {} for row in mapping_rows: org_name_raw = row["Organization_Name"] center_name_raw = row["Center_Name"] org_normalized = org_name_raw.strip().lower() if isinstance(org_name_raw, str) else str(org_name_raw).strip().lower() center_normalized = center_name_raw.strip().lower() if isinstance(center_name_raw, str) else str(center_name_raw).strip().lower() if org_normalized in org_names_normalized: console.print(f"[yellow]⚠ Duplicate found in Organization_Name: '{org_name_raw}'. Skipping center mapping.[/yellow]") return {} if center_normalized in center_names_normalized: console.print(f"[yellow]⚠ Duplicate found in Center_Name: '{center_name_raw}'. Skipping center mapping.[/yellow]") return {} org_names_normalized[org_normalized] = org_name_raw center_names_normalized[center_normalized] = center_name_raw # Build mapping dict mapping_dict = {} for row in mapping_rows: org_name_raw = row["Organization_Name"] center_name_raw = row["Center_Name"] org_normalized = org_name_raw.strip().lower() if isinstance(org_name_raw, str) else str(org_name_raw).strip().lower() center_clean = center_name_raw.strip() if isinstance(center_name_raw, str) else str(center_name_raw).strip() mapping_dict[org_normalized] = center_clean return mapping_dict # ============================================================================ # BLOCK 8: REQUEST PROCESSING # ============================================================================ def _process_single_request(worklist_request, mapping_dict): """ Processes a single request from the worklist: 1. Fetches full request detail 2. Fetches prescriber and requester names (deduplicated single API call) 3. Injects enrichment data (names, identity fields, center name, status override) 4. Applies requests mapping to produce the output object Args: worklist_request: Request object from the worklist (root-level fields available) mapping_dict: Organization → center name mapping dict Returns: Tuple of (output_request, request_meta) where request_meta contains raw fields needed for organization building and sorting. """ request_id = worklist_request.get("id") # Set thread-local context for detailed error logging in decorators ctx = {"id": request_id} thread_local_storage.current_request_context = ctx # --- 1. Fetch request detail --- request_detail = get_request_detail_by_id(request_id) if request_detail is None: request_detail = {} # --- 2. Fetch professional names (prescriber + requester, deduplicated) --- prescriber_id = request_detail.get("prescriber") requester_id = request_detail.get("requester") # Deduplicate IDs before API call unique_ids = list({pid for pid in [prescriber_id, requester_id] if pid}) professionals = get_professionals(unique_ids) if unique_ids else {} # Inject professional names (None if not found) request_detail["prescriberName"] = professionals.get(prescriber_id) if prescriber_id else None request_detail["requesterName"] = professionals.get(requester_id) if requester_id else None # --- 3. Inject patient identity fields from worklist (only source for these fields) --- identity = worklist_request.get("identity") or {} request_detail["lastname"] = identity.get("lastname") request_detail["firstname"] = identity.get("firstname") request_detail["birthday"] = identity.get("birthday") # --- 4. Status override: diagnostic_status takes precedence if defined --- diagnostic_status = request_detail.get("diagnostic_status") if diagnostic_status is not None and diagnostic_status != "": request_detail["status"] = diagnostic_status # --- 5. Center mapping: inject Center_Name from labeledOrganization --- labeled_org = request_detail.get("labeledOrganization") if labeled_org: org_normalized = labeled_org.strip().lower() request_detail["Center_Name"] = mapping_dict.get(org_normalized, labeled_org) else: request_detail["Center_Name"] = None # --- 6. Apply requests mapping to produce output object --- output_request = {} process_requests_mapping(output_request, request_detail) # --- 7. Build meta for organization building and sorting --- request_meta = { "org_id": request_detail.get("organization"), "org_name": labeled_org, "center_name": request_detail.get("Center_Name"), "status": request_detail.get("status"), "diagnostic_result": request_detail.get("diagnostic_result"), "lastname": request_detail.get("lastname"), "firstname": request_detail.get("firstname"), "id": request_id } return output_request, request_meta # ============================================================================ # BLOCK 9: ORGANIZATIONS BUILDING # ============================================================================ def build_organizations(request_metas): """ Builds the organizations summary list from collected request metadata. Each organization entry contains the organization identity and counters derived from the status and diagnostic_result of its requests. Args: request_metas: List of request_meta dicts (from _process_single_request) Returns: List of organization dicts, sorted by center_name then id """ org_map = {} for meta in request_metas: org_id = meta.get("org_id") if not org_id: continue if org_id not in org_map: org_map[org_id] = { "id": org_id, "name": meta.get("org_name"), "center_name": meta.get("center_name"), "total_count": 0, "sent_count": 0, # status == "active" "accepted_count": 0, # status == "accepted" "rejected_count": 0, # status == "rejected" "sequencing_count": 0, # status == "waiting" "ai_count": 0, # status == "in progress" "result_available_count": 0, # status == "finished" "report_available_count": 0, # status == "signed" "positive_count": 0, # diagnostic_result == "POSITIVE" "negative_count": 0, # diagnostic_result == "NEGATIVE" "uninterpretable_count": 0 # diagnostic_result == "UNINTERPRETABLE" } org = org_map[org_id] org["total_count"] += 1 status = meta.get("status") if status == "active": org["sent_count"] += 1 elif status == "accepted": org["accepted_count"] += 1 elif status == "rejected": org["rejected_count"] += 1 elif status == "waiting": org["sequencing_count"] += 1 elif status == "in progress": org["ai_count"] += 1 elif status == "finished": org["result_available_count"] += 1 elif status == "signed": org["report_available_count"] += 1 diagnostic_result = meta.get("diagnostic_result") if diagnostic_result == "POSITIVE": org["positive_count"] += 1 elif diagnostic_result == "NEGATIVE": org["negative_count"] += 1 elif diagnostic_result == "UNINTERPRETABLE": org["uninterpretable_count"] += 1 organizations = list(org_map.values()) organizations.sort(key=lambda o: (o.get("center_name") or "", o.get("id") or "")) return organizations # ============================================================================ # BLOCK 10: MAIN EXECUTION # ============================================================================ def main(): global global_pbar, excel_export_config, excel_export_enabled # --- Check for CLI Check_Only mode --- check_only_mode = "--check-only" in sys.argv if check_only_mode: run_check_only_mode(sys.argv) return # --- Check for CLI Excel_Only mode --- excel_only_mode = "--excel-only" in sys.argv if excel_only_mode: print() load_requests_mapping_config() load_organizations_mapping_config() export_excel_only(sys.argv, REQUESTS_FILE_NAME, ORGANIZATIONS_FILE_NAME, requests_mapping_config, organizations_mapping_config) return # === NORMAL MODE: Full data collection === print() login_status = login() while login_status == "Error": login_status = login() if login_status == "Exit": return print() number_of_threads = int((questionary.text("Number of threads :", default="12", validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask())) print() ask_on_retry_exhausted() print() wait_for_scheduled_launch() print() load_requests_mapping_config() load_organizations_mapping_config() # Load DO filters from config print() do_filters = load_do_filters_config() # Load and validate Excel export configuration print() console.print("[bold cyan]Loading Excel export configuration...[/bold cyan]") excel_export_config, has_config_critical, config_error_messages = \ prepare_excel_export(requests_mapping_config, organizations_mapping_config) if has_config_critical: for err in config_error_messages: console.print(f"[bold red] • {err}[/bold red]") print() answer = questionary.confirm( "⚠ Critical configuration errors detected. Continue anyway?", default=False ).ask() if not answer: console.print("[bold red]Aborted by user[/bold red]") return else: excel_export_enabled = False else: excel_export_enabled = True if excel_export_config else False # Load center mapping print() print("Loading organization center mapping...") mapping_dict = load_organization_center_mapping() # === FETCH WORKLIST (paginated) === print() start_time = perf_counter() with console.status("[bold green]Fetching worklist (page 1)...", spinner="dots"): first_page = get_worklist_page(do_filters, 1, DO_WORKLIST_PAGE_SIZE) metadata = first_page.get("metadata", {}) total_requests = metadata.get("total", 0) total_pages = metadata.get("pages", 1) print(f"{total_requests} requests across {total_pages} pages...") print() # === SUBMIT ALL REQUESTS TO THREAD POOL AS PAGES ARRIVE === # Both progress bars are shown simultaneously: # - fetching_pbar (position=0): advances by the number of requests in each fetched page # - processing_pbar (position=1): advances as each request finishes processing # Completed futures are drained inline after each page so the processing bar # starts moving immediately, without waiting for all pages to be fetched first. all_futures = [] all_results = [] completed_set = set() with ThreadPoolExecutor(max_workers=number_of_threads) as thread_pool: with tqdm(total=total_requests, unit="req.", desc=f"{'Fetching requests':<52}", position=0, leave=True, bar_format=custom_bar_format) as fetching_pbar: with tqdm(total=total_requests, unit="req.", desc=f"{'Processing requests':<52}", position=1, leave=True, bar_format=custom_bar_format) as processing_pbar: global_pbar = processing_pbar def _drain_completed(): for f in list(all_futures): if f not in completed_set and f.done(): try: result = f.result() all_results.append(result) except Exception as exc: logging.critical(f"Critical exception in request worker: {exc}", exc_info=True) print(f"\nCRITICAL ERROR in request processing thread:") print(f"Exception: {exc}") traceback.print_exc() thread_pool.shutdown(wait=False, cancel_futures=True) raise finally: completed_set.add(f) with _global_pbar_lock: if global_pbar: global_pbar.update(1) # Submit first page requests first_page_data = first_page.get("data", []) for worklist_request in first_page_data: f = thread_pool.submit(run_with_context, _process_single_request, {"id": worklist_request.get("id")}, worklist_request, mapping_dict) all_futures.append(f) fetching_pbar.update(len(first_page_data)) # Fetch and submit remaining pages; drain completed futures after each page for page_num in range(2, total_pages + 1): page_data = get_worklist_page(do_filters, page_num, DO_WORKLIST_PAGE_SIZE) page_requests = page_data.get("data", []) for worklist_request in page_requests: f = thread_pool.submit(run_with_context, _process_single_request, {"id": worklist_request.get("id")}, worklist_request, mapping_dict) all_futures.append(f) fetching_pbar.update(len(page_requests)) _drain_completed() # Drain remaining futures not yet collected remaining = [f for f in all_futures if f not in completed_set] for future in as_completed(remaining): try: result = future.result() all_results.append(result) except Exception as exc: logging.critical(f"Critical exception in request worker: {exc}", exc_info=True) print(f"\nCRITICAL ERROR in request processing thread:") print(f"Exception: {exc}") traceback.print_exc() thread_pool.shutdown(wait=False, cancel_futures=True) raise finally: completed_set.add(future) with _global_pbar_lock: if global_pbar: global_pbar.update(1) # === SORT RESULTS === print() print() print("Sorting results...") all_results.sort(key=lambda x: ( x[1].get("center_name") or "", x[1].get("lastname") or "", x[1].get("firstname") or "", x[1].get("id") or "" )) output_requests = [r[0] for r in all_results] request_metas = [r[1] for r in all_results] # === BUILD ORGANIZATIONS === print("Building organizations summary...") organizations_list = build_organizations(request_metas) try: # === QUALITY CHECKS === print() has_regression_critical = run_quality_checks( current_requests=output_requests, old_requests_filename=REQUESTS_FILE_NAME ) # === CHECK FOR CRITICAL ISSUES AND ASK USER CONFIRMATION === if has_regression_critical: print() console.print("[bold red]⚠ CRITICAL issues detected in quality checks![/bold red]") confirm_write = questionary.confirm( "Do you want to write the results anyway?", default=True ).ask() if not confirm_write: console.print("[yellow]✗ Output writing cancelled by user. Files were not modified.[/yellow]") console.print("[yellow] You can re-run the script to try again.[/yellow]") print() print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}") return # === BACKUP OLD FILES === backup_output_files() # === WRITE NEW FILES === print("Writing files...") with open(REQUESTS_FILE_NAME, 'w', encoding='utf-8') as f_json: json.dump(output_requests, f_json, indent=4, ensure_ascii=False) with open(ORGANIZATIONS_FILE_NAME, 'w', encoding='utf-8') as f_json: json.dump(organizations_list, f_json, indent=4, ensure_ascii=False) console.print("[green]✓ Data saved to JSON files[/green]") print() # === EXCEL EXPORT === run_normal_mode_export(excel_export_enabled, excel_export_config, requests_mapping_config, organizations_mapping_config) except IOError as io_err: logging.critical(f"Error while writing JSON file : {io_err}") print(f"Error while writing JSON file : {io_err}") except Exception as exc: logging.critical(f"Error during final processing : {exc}") print(f"Error during final processing : {exc}") print() print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}") if __name__ == '__main__': try: main() except Exception as e: logging.critical(f"Script terminated prematurely due to an exception: {e}", exc_info=True) print(f"Script stopped due to an error : {e}") finally: if 'subtasks_thread_pool' in globals() and subtasks_thread_pool: subtasks_thread_pool.shutdown(wait=False, cancel_futures=True) print('\n') input("Press Enter to exit...")