# Endobest Clinical Research Dashboard Generator
#
# This script automates the collection and processing of patient inclusion data for the Endobest
# clinical research protocol. It authenticates with Ziwig's IAM, Research Clinic (RC), and GDD
# APIs to gather patient records, questionnaire responses, and diagnostic test results across
# multiple healthcare organizations, and generates a comprehensive JSON report containing all
# patient data.
#
# Configuration: every field (patient identification, inclusion data, clinical records, test
# requests, and questionnaire responses) is defined in an Excel configuration file and can be
# changed without any code modification. The configuration supports multiple data sources
# (questionnaires, records, inclusions, requests, visits), custom functions for business logic
# (conditional fields, data transformations, pattern matching), field dependencies and
# conditions, value transformations (labels, templates), and multi-line field definitions for
# complex calculations. Organization names are enriched with center identifiers through a
# configurable mapping table, enabling integration with center-based reporting workflows.
#
# Performance: multithreading with configurable worker pools parallelizes API calls across
# organizations and patients, significantly reducing execution time. A single optimized API call
# retrieves all questionnaires per patient, improving performance 4-5x compared to multiple
# filtered calls. Results are exported as structured JSON files with nested field groups for
# easy integration with downstream analytics tools.
#
# Quality assurance: coherence checks between statistics and detailed data, non-regression
# testing with configurable Warning/Critical thresholds, and user confirmation prompts when
# critical issues are detected. Excel export generates configurable workbooks with data
# filtering, sorting, value replacement, and formula recalculation (see
# eb_dashboard_excel_export.py and DOCUMENTATION_04_EXCEL_EXPORT.md).
#
# Other key features: automatic token refresh handling, retry mechanisms for transient API
# failures, progress tracking with real-time visual feedback, flexible data source
# identification, and support for complex data extraction using JSON path expressions.
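#
# Illustrative configuration row (hypothetical field names, values and paths, shown here only
# to make the mechanism concrete):
#   field_name="EVA_Score", field_group="Clinical", source_id="q_name=EVA Baseline",
#   field_path='["pain", "score"]'
# For each patient this would yield a nested entry in the output JSON such as:
#   {"Clinical": {"EVA_Score": 7}}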
import json
import logging
import msvcrt
import os
import re
import sys
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta, datetime
from time import perf_counter, sleep
import functools
import httpx
import openpyxl
import questionary
from tqdm import tqdm
from rich.console import Console
# Import centralized constants (SINGLE SOURCE OF TRUTH)
from eb_dashboard_constants import (
INCLUSIONS_FILE_NAME,
ORGANIZATIONS_FILE_NAME,
OLD_FILE_SUFFIX,
DASHBOARD_CONFIG_FILE_NAME,
INCLUSIONS_MAPPING_TABLE_NAME,
ORGANIZATIONS_MAPPING_TABLE_NAME,
ORG_CENTER_MAPPING_FILE_NAME,
ORG_CENTER_MAPPING_TABLE_NAME,
DEFAULT_USER_NAME,
DEFAULT_PASSWORD,
IAM_URL,
RC_URL,
RC_APP_ID,
GDD_URL,
ERROR_MAX_RETRY,
WAIT_BEFORE_RETRY,
WAIT_BEFORE_NEW_BATCH_OF_RETRIES,
MAX_BATCHS_OF_RETRIES,
MAX_THREADS,
RC_ENDOBEST_PROTOCOL_ID,
RC_ENDOBEST_EXCLUDED_CENTERS,
BAR_N_FMT_WIDTH,
BAR_TOTAL_FMT_WIDTH,
BAR_TIME_WIDTH,
BAR_RATE_WIDTH,
LOG_FILE_NAME,
API_TIMEOUT,
API_AUTH_LOGIN_ENDPOINT,
API_AUTH_CONFIG_TOKEN_ENDPOINT,
API_AUTH_REFRESH_TOKEN_ENDPOINT,
API_RC_GET_ALL_ORGANIZATIONS_ENDPOINT,
API_RC_INCLUSION_STATISTICS_ENDPOINT,
API_RC_SEARCH_INCLUSIONS_ENDPOINT,
API_RC_GET_RECORD_BY_PATIENT_ENDPOINT,
API_RC_GET_SURVEYS_ENDPOINT,
API_RC_SEARCH_VISITS_ENDPOINT,
API_GDD_GET_REQUEST_BY_TUBE_ID_ENDPOINT
)
# Import refactored modules
from eb_dashboard_utils import (
get_nested_value,
get_httpx_client,
clear_httpx_client,
get_thread_position,
get_config_path,
thread_local_storage,
run_with_context
)
from eb_dashboard_quality_checks import (
backup_output_files,
run_quality_checks,
run_check_only_mode,
set_dependencies as quality_set_dependencies,
enable_debug_mode
)
from eb_dashboard_excel_export import (
prepare_excel_export,
export_excel_only,
run_normal_mode_export,
set_dependencies as excel_set_dependencies
)
logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s', filename=LOG_FILE_NAME,
filemode='w')
# ============================================================================
# BLOCK 1: CONFIGURATION & BASE INFRASTRUCTURE
# ============================================================================
# NOTE: All constants are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH)
# --- Global Variables ---
access_token = ""
refresh_token = ""
threads_list = []
_token_refresh_lock = threading.Lock()
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
_stored_username = "" # Credentials stored at login for automatic re-login
_stored_password = ""
_threads_list_lock = threading.Lock()
global_pbar = None
_global_pbar_lock = threading.Lock()
_user_interaction_lock = threading.Lock()
# Global variables (mutable, set at runtime - not constants)
inclusions_mapping_config = []
organizations_mapping_config = []
excel_export_config = None
excel_export_enabled = False
subtasks_thread_pool = ThreadPoolExecutor(40)
httpx_clients = {}
console = Console()
# Share global variables with utility modules (required for thread-safe operations)
import eb_dashboard_utils
eb_dashboard_utils.httpx_clients = httpx_clients
eb_dashboard_utils.threads_list = threads_list
eb_dashboard_utils._threads_list_lock = _threads_list_lock
# Inject console instance to modules
quality_set_dependencies(console)
excel_set_dependencies(console)
# Detect and enable debug mode if --debug flag is present (and remove it from argv)
if "--debug" in sys.argv:
sys.argv.remove("--debug")
enable_debug_mode()
# --- Progress Bar Configuration ---
# NOTE: BAR_N_FMT_WIDTH, BAR_TOTAL_FMT_WIDTH, BAR_TIME_WIDTH, BAR_RATE_WIDTH
# are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH)
custom_bar_format = ("{l_bar}{bar}"
f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}")
# ============================================================================
# BLOCK 2: DECORATORS & RESILIENCE
# ============================================================================
def new_token():
"""Refresh access token using the refresh token"""
global access_token, refresh_token
with _token_refresh_lock:
for attempt in range(ERROR_MAX_RETRY):
try:
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_REFRESH_TOKEN_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"},
json={"refresh_token": refresh_token}, timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
return
except httpx.RequestError as exc:
logging.warning(f"Refresh Token Error (Attempt {attempt + 1}) : {exc}")
clear_httpx_client()
# Wait only after a failed attempt (a finally block here would also delay successful refreshes)
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
except httpx.HTTPStatusError as exc:
logging.warning(
f"Refresh Token Error (Attempt {attempt + 1}) : {exc.response.status_code} for Url {exc.request.url}")
clear_httpx_client()
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
# Refresh token exhausted — attempt full re-login with stored credentials
logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
_do_login(_stored_username, _stored_password)
logging.info("Re-login successful. New tokens acquired.")
def api_call_with_retry(func):
"""Decorator for API calls with automatic retry and token refresh on 401 errors"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
total_attempts = 0
batch_count = 1
while True:
for attempt in range(ERROR_MAX_RETRY):
total_attempts += 1
try:
return func(*args, **kwargs)
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}")
# Refresh the thread-local client if an error occurs
# to avoid potential pool corruption or stale connections
clear_httpx_client()
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token.")
try:
new_token()
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
else:
# Max retries reached for this batch
if batch_count < MAX_BATCHS_OF_RETRIES:
logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. "
f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.")
batch_count += 1
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
break # Exit for loop to restart batch in while True
else:
# All automatic batches exhausted — apply on_retry_exhausted policy
with _user_interaction_lock:
if on_retry_exhausted == "ignore":
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None
elif on_retry_exhausted == "abort":
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
else: # "ask" — display error then interactive prompt
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]")
choice = questionary.select(
f"What would you like to do for {func_name}?",
choices=[
"Retry (try another batch of retries)",
"Ignore (return None and continue)",
"Stop script (critical error)"
]
).ask()
if choice == "Retry (try another batch of retries)":
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
batch_count = 1 # Reset batch counter for the next interactive round
break # Exit for loop to restart batch in while True
elif choice == "Ignore (return None and continue)":
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None
else:
logging.critical(f"User chose to stop script after persistent error in {func_name}.")
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
return wrapper
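# Retry budget, as implemented above: each automatic round makes ERROR_MAX_RETRY attempts
# spaced by WAIT_BEFORE_RETRY seconds, and up to MAX_BATCHS_OF_RETRIES rounds run with
# WAIT_BEFORE_NEW_BATCH_OF_RETRIES seconds between them. With hypothetical values
# ERROR_MAX_RETRY=3 and MAX_BATCHS_OF_RETRIES=2, a persistently failing call is attempted
# 6 times before the on_retry_exhausted policy ("ask" | "ignore" | "abort") applies.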
# ============================================================================
# BLOCK 3: AUTHENTICATION
# ============================================================================
def _do_login(username, password):
"""Performs the two-step authentication (IAM → RC) with the given credentials.
Updates global access_token and refresh_token on success.
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
Must NOT acquire _token_refresh_lock (caller's responsibility).
"""
global access_token, refresh_token
client = get_httpx_client()
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT,
json={"username": username, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT,
headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
def login():
global _stored_username, _stored_password
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
if not (user_name and password):
return "Exit"
try:
_do_login(user_name, password)
except httpx.RequestError as exc:
print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}")
return "Error"
except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
return "Error"
_stored_username = user_name
_stored_password = password
print()
print("Login Success")
return "Success"
# ============================================================================
# BLOCK 3B: FILE UTILITIES
# ============================================================================
def ask_on_retry_exhausted():
"""Asks the user what to do when all API retry batches are exhausted."""
global on_retry_exhausted
choice = questionary.select(
"On retry exhausted :",
choices=[
"Ask (interactive prompt)",
"Ignore (return None and continue)",
"Abort (stop script)"
]
).ask()
if choice is None or choice == "Ask (interactive prompt)":
on_retry_exhausted = "ask"
elif choice == "Ignore (return None and continue)":
on_retry_exhausted = "ignore"
else:
on_retry_exhausted = "abort"
def wait_for_scheduled_launch():
"""Asks the user when to start the processing and waits if needed.
Options: Immediately / In X minutes / At HH:MM
"""
choice = questionary.select(
"When to start processing ?",
choices=["Immediately", "In X minutes", "At HH:MM"]
).ask()
if choice is None or choice == "Immediately":
return
if choice == "In X minutes":
minutes_str = questionary.text(
"Number of minutes :",
validate=lambda x: x.isdigit() and int(x) > 0
).ask()
if not minutes_str:
return
target_time = datetime.now() + timedelta(minutes=int(minutes_str))
else: # "At HH:MM"
time_str = questionary.text(
"Start time (HH:MM) :",
validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and
0 <= int(x.split(':')[0]) <= 23 and
0 <= int(x.split(':')[1]) <= 59
).ask()
if not time_str:
return
now = datetime.now()
h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
if target_time <= now:
console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
return
print()
try:
while True:
remaining = target_time - datetime.now()
if remaining.total_seconds() <= 0:
break
total_secs = int(remaining.total_seconds())
h = total_secs // 3600
m = (total_secs % 3600) // 60
s = total_secs % 60
target_str = target_time.strftime('%H:%M:%S')
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
end="", flush=True)
sleep(1)
# Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts
while msvcrt.kbhit():
msvcrt.getwch()
print()
console.print("[green]✓ Starting processing.[/green]")
except KeyboardInterrupt:
print()
console.print("[bold red]Launch cancelled by user.[/bold red]")
raise SystemExit(0)
def load_json_file(filename):
"""
Load a JSON file from disk.
Args:
filename: Path to JSON file
Returns:
Parsed JSON data or None if file not found or error occurred
"""
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logging.warning(f"Could not load JSON file '{filename}': {e}")
console.print(f"[yellow]⚠ Warning: Could not load JSON file '{filename}': {e}[/yellow]")
return None
# ============================================================================
# BLOCK 4: INCLUSIONS MAPPING CONFIGURATION
# ============================================================================
def load_inclusions_mapping_config():
"""Loads and validates the inclusions mapping configuration from the Excel file."""
global inclusions_mapping_config
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
try:
# Load with data_only=True to read calculated values instead of formulas
# (e.g., if mapping columns use formulas like =+1, we get 1, not the formula text)
workbook = openpyxl.load_workbook(config_path, data_only=True)
except FileNotFoundError:
error_msg = f"Error: Configuration file not found at: {config_path}"
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
if INCLUSIONS_MAPPING_TABLE_NAME not in workbook.sheetnames:
error_msg = f"Error: Sheet ''{INCLUSIONS_MAPPING_TABLE_NAME}'' not found in the configuration file."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
sheet = workbook[INCLUSIONS_MAPPING_TABLE_NAME]
headers = [cell.value for cell in sheet[1]]
temp_config = []
for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2):
field_config = dict(zip(headers, row))
# --- Validation and Parsing ---
if field_config.get("source_name") == "Not Specified":
continue
field_name = field_config.get("field_name")
if not field_name or not isinstance(field_name, str):
error_msg = f"Error in config file, row {row_index}: 'field_name' is mandatory."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
field_config["field_name"] = re.sub(r'\s*\([^)]*\)$', '', field_name).strip()
# Parse source_id prefix (q_id=, q_name=, q_category=, record, inclusion, request)
source_id_raw = field_config.get("source_id", "")
if source_id_raw and isinstance(source_id_raw, str):
if source_id_raw.startswith("q_id="):
field_config["source_type"] = "q_id"
field_config["source_value"] = source_id_raw[5:]
elif source_id_raw.startswith("q_name="):
field_config["source_type"] = "q_name"
field_config["source_value"] = source_id_raw[7:]
elif source_id_raw.startswith("q_category="):
field_config["source_type"] = "q_category"
field_config["source_value"] = source_id_raw[11:]
elif source_id_raw == "record":
field_config["source_type"] = "record"
field_config["source_value"] = None
elif source_id_raw == "inclusion":
field_config["source_type"] = "inclusion"
field_config["source_value"] = None
elif source_id_raw == "request":
field_config["source_type"] = "request"
field_config["source_value"] = None
elif source_id_raw == "6_month_visit":
field_config["source_type"] = "6_month_visit"
field_config["source_value"] = None
else:
field_config["source_type"] = None
field_config["source_value"] = source_id_raw
else:
field_config["source_type"] = None
field_config["source_value"] = None
for json_field in ["field_path", "field_condition", "true_if_any", "value_labels"]:
value = field_config.get(json_field)
if value:
if not isinstance(value, str):
error_msg = f"Error in config file, row {row_index}, field '{json_field}': Invalid value, must be a JSON string."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
try:
field_config[json_field] = json.loads(value)
except json.JSONDecodeError:
error_msg = f"Error in config file, row {row_index}, field '{json_field}': Invalid JSON format."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
else:
field_config[json_field] = None
if not field_config.get("field_path"):
error_msg = f"Error in config file, row {row_index}: 'field_path' is mandatory when a field is specified."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
temp_config.append(field_config)
inclusions_mapping_config = temp_config
console.print(f"Loaded {len(inclusions_mapping_config)} fields from inclusions mapping configuration.", style="green")
def load_organizations_mapping_config():
"""Loads and validates the organizations mapping configuration from the Excel file."""
global organizations_mapping_config
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
try:
# Load with data_only=True to read calculated values instead of formulas
# (e.g., if mapping columns use formulas like =+1, we get 1, not the formula text)
workbook = openpyxl.load_workbook(config_path, data_only=True)
except FileNotFoundError:
error_msg = f"Error: Configuration file not found at: {config_path}"
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
if ORGANIZATIONS_MAPPING_TABLE_NAME not in workbook.sheetnames:
# Organizations mapping is optional, so return empty config if sheet not found
logging.info(f"Sheet '{ORGANIZATIONS_MAPPING_TABLE_NAME}' not found in configuration file. Organizations mapping is optional.")
organizations_mapping_config = []
return
sheet = workbook[ORGANIZATIONS_MAPPING_TABLE_NAME]
headers = [cell.value for cell in sheet[1]]
# Filter out None headers (empty columns)
headers_filtered = [h for h in headers if h is not None]
mapping_config = []
try:
for row in sheet.iter_rows(min_row=2, values_only=True):
if all(cell is None for cell in row):
break
# Trim row to match filtered headers length
row_filtered = row[:len(headers_filtered)]
config_dict = dict(zip(headers_filtered, row_filtered))
mapping_config.append(config_dict)
except Exception as e:
error_msg = f"Error parsing organizations mapping: {e}"
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
finally:
workbook.close()
organizations_mapping_config = mapping_config
if mapping_config:
console.print(f"Loaded {len(organizations_mapping_config)} organizations from organizations mapping configuration.", style="green")
else:
console.print("No organizations mapping found (this is optional).", style="yellow")
# ============================================================================
# BLOCK 5: DATA SEARCH & EXTRACTION
# ============================================================================
def _find_questionnaire_by_id(qcm_dict, qcm_id):
"""Finds a questionnaire by ID (direct dictionary lookup)."""
if not isinstance(qcm_dict, dict):
return None
qcm_data = qcm_dict.get(qcm_id)
return qcm_data.get("answers") if qcm_data else None
def _find_questionnaire_by_name(qcm_dict, name):
"""Finds a questionnaire by name (sequential search, returns first match)."""
if not isinstance(qcm_dict, dict):
return None
for qcm in qcm_dict.values():
if get_nested_value(qcm, ["questionnaire", "name"]) == name:
return qcm.get("answers")
return None
def _find_questionnaire_by_category(qcm_dict, category):
"""Finds a questionnaire by category (sequential search, returns first match)."""
if not isinstance(qcm_dict, dict):
return None
for qcm in qcm_dict.values():
if get_nested_value(qcm, ["questionnaire", "category"]) == category:
return qcm.get("answers")
return None
def _get_field_value_from_questionnaire(all_questionnaires, field_config):
"""
Gets the raw value for a field from questionnaires (without post-processing).
"""
# Find questionnaire based on type
source_type = field_config.get("source_type")
source_value = field_config.get("source_value")
if source_type == "q_id":
answers = _find_questionnaire_by_id(all_questionnaires, source_value)
elif source_type == "q_name":
answers = _find_questionnaire_by_name(all_questionnaires, source_value)
elif source_type == "q_category":
answers = _find_questionnaire_by_category(all_questionnaires, source_value)
else:
answers = None
return get_nested_value(answers, field_config["field_path"], default="undefined")
def get_value_from_inclusion(inclusion_dict, key):
"""Helper to find a key in the new nested inclusion structure."""
for group in inclusion_dict.values():
if isinstance(group, dict) and key in group:
return group[key]
return None
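# The structure searched above groups fields under the configurable field_group names, e.g.
# (group and field names come from the Excel configuration):
#   {"Patient_Identification": {"Pseudo": "P-001"}, "Inclusion": {"Inclusion_Date": "01/02/2024"}}
# so get_value_from_inclusion(inclusion, "Pseudo") returns "P-001" whatever its group.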
# ============================================================================
# BLOCK 6: CUSTOM FUNCTIONS & FIELD PROCESSING
# ============================================================================
def _execute_custom_function(function_name, args, output_inclusion):
"""Executes a custom function for a calculated field."""
if function_name == "search_in_fields_using_regex":
if not args or len(args) < 2:
return "$$$$ Argument Error: search_in_fields_using_regex requires at least 2 arguments"
regex_pattern = args[0]
field_names = args[1:]
field_values = []
all_undefined = True
for field_name in field_names:
value = get_value_from_inclusion(output_inclusion, field_name)
field_values.append(value)
if value is not None and value != "undefined":
all_undefined = False
if all_undefined:
return "undefined"
try:
for value in field_values:
# We only try to match on string values.
if isinstance(value, str) and re.search(regex_pattern, value, re.IGNORECASE):
return True
except re.error as e:
return f"$$$$ Regex Error: {e}"
return False
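# Illustrative call (hypothetical field names):
#   _execute_custom_function("search_in_fields_using_regex",
#                            ["endometriosis", "Diagnosis_Text", "Surgery_Report"], inclusion)
# returns True on a case-insensitive match in any listed field, False otherwise,
# and "undefined" when every listed field is undefined.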
elif function_name == "extract_parentheses_content":
if not args or len(args) != 1:
return "$$$$ Argument Error: extract_parentheses_content requires 1 argument"
field_name = args[0]
value = get_value_from_inclusion(output_inclusion, field_name)
if value is None or value == "undefined":
return "undefined"
match = re.search(r'\((.*?)\)', str(value))
return match.group(1) if match else "undefined"
elif function_name == "append_terminated_suffix":
if not args or len(args) != 2:
return "$$$$ Argument Error: append_terminated_suffix requires 2 arguments"
status = get_value_from_inclusion(output_inclusion, args[0])
is_terminated = get_value_from_inclusion(output_inclusion, args[1])
if status is None or status == "undefined":
return "undefined"
if not isinstance(is_terminated, bool) or not is_terminated:
return status
return f"{status} - AP"
elif function_name == "if_then_else":
# Unified conditional function
# Syntax: ["operator", arg1, arg2_optional, result_if_true, result_if_false]
# Operators: "is_true", "is_false", "all_true", "is_defined", "is_undefined", "all_defined", "==", "!="
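# Illustrative configurations (hypothetical field names; "$" marks string literals):
#   ["==", "Inclusion_Status", "$Included", "$Yes", "$No"]    -> "Yes" or "No"
#   ["is_true", "Has_Surgery", "$Operated", "$Not operated"]  -> maps a boolean field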
if not args or len(args) < 4:
return "$$$$ Argument Error: if_then_else requires at least 4 arguments"
operator = args[0]
# Helper function to resolve value (literal or field name)
def resolve_value(arg):
# If boolean literal
if isinstance(arg, bool):
return arg
# If numeric literal
if isinstance(arg, (int, float)):
return arg
# If string literal (starts with $)
if isinstance(arg, str) and arg.startswith("$"):
return arg[1:] # Remove the $ prefix
# Otherwise, treat as field name
return get_value_from_inclusion(output_inclusion, arg)
# Determine condition based on operator
if operator == "is_true":
if len(args) != 4:
return "$$$$ Argument Error: is_true requires 4 arguments"
value = resolve_value(args[1])
if value is None or value == "undefined":
return "undefined"
condition = (value is True)
result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3])
elif operator == "is_false":
if len(args) != 4:
return "$$$$ Argument Error: is_false requires 4 arguments"
value = resolve_value(args[1])
if value is None or value == "undefined":
return "undefined"
condition = (value is False)
result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3])
elif operator == "all_true":
if len(args) != 4:
return "$$$$ Argument Error: all_true requires 4 arguments"
fields_arg = args[1]
if not isinstance(fields_arg, list):
return "$$$$ Argument Error: all_true requires arg1 to be a list of field names"
conditions = []
for field_name in fields_arg:
field_value = get_value_from_inclusion(output_inclusion, field_name)
if field_value is None or field_value == "undefined":
return "undefined"
conditions.append(field_value)
condition = all(conditions)
result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3])
elif operator == "is_defined":
if len(args) != 4:
return "$$$$ Argument Error: is_defined requires 4 arguments"
value = resolve_value(args[1])
condition = (value is not None and value != "undefined")
result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3])
elif operator == "is_undefined":
if len(args) != 4:
return "$$$$ Argument Error: is_undefined requires 4 arguments"
value = resolve_value(args[1])
condition = (value is None or value == "undefined")
result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3])
elif operator == "all_defined":
if len(args) != 4:
return "$$$$ Argument Error: all_defined requires 4 arguments"
fields_arg = args[1]
if not isinstance(fields_arg, list):
return "$$$$ Argument Error: all_defined requires arg1 to be a list of field names"
for field_name in fields_arg:
field_value = get_value_from_inclusion(output_inclusion, field_name)
if field_value is None or field_value == "undefined":
condition = False
break
else:
condition = True
result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3])
elif operator == "==":
if len(args) != 5:
return "$$$$ Argument Error: == requires 5 arguments"
value1 = resolve_value(args[1])
value2 = resolve_value(args[2])
if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined":
return "undefined"
condition = (value1 == value2)
result_if_true = resolve_value(args[3])
result_if_false = resolve_value(args[4])
elif operator == "!=":
if len(args) != 5:
return "$$$$ Argument Error: != requires 5 arguments"
value1 = resolve_value(args[1])
value2 = resolve_value(args[2])
if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined":
return "undefined"
condition = (value1 != value2)
result_if_true = resolve_value(args[3])
result_if_false = resolve_value(args[4])
else:
return f"$$$$ Unknown Operator: {operator}"
return result_if_true if condition else result_if_false
return f"$$$$ Unknown Custom Function: {function_name}"
def process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data):
"""Processes and adds the inclusions mapping fields to the inclusion dictionary."""
for field in inclusions_mapping_config:
field_name = field["field_name"]
field_group = field.get("field_group", "Extended_Fields") # Default group if not specified
final_value = "undefined" # Default value
# Check condition
condition_field_name = field.get("field_condition")
if condition_field_name:
condition_value = get_value_from_inclusion(output_inclusion, condition_field_name)
if condition_value is None or condition_value == "undefined":
final_value = "undefined"
elif not isinstance(condition_value, bool):
final_value = "$$$$ Condition Field Error"
elif not condition_value:
final_value = "N/A"
# If condition allows, process the field
if final_value == "undefined":
source_name = field.get("source_name")
source_type = field.get("source_type")
field_path = field.get("field_path")
# Get raw value from appropriate source
if source_name == "Calculated":
function_name = field.get("source_id")
args = field_path
final_value = _execute_custom_function(function_name, args, output_inclusion)
elif source_type in ["q_id", "q_name", "q_category"]:
final_value = _get_field_value_from_questionnaire(all_questionnaires, field)
elif source_type == "record":
final_value = get_nested_value(record_data, field_path, default="undefined")
elif source_type == "inclusion":
final_value = get_nested_value(inclusion_data, field_path, default="undefined")
elif source_type == "request":
final_value = get_nested_value(request_data, field_path, default="undefined")
elif source_type == "6_month_visit":
final_value = get_nested_value(six_month_visit_data, field_path, default="undefined")
else:
final_value = f"$$$$ Unknown Source Type: {source_type}"
# If the source data itself is missing (e.g., 6-month visit not created), log a warning but continue
if final_value == "$$$$ No Data":
patient_id = inclusion_data.get("id", "Unknown")
pseudo = inclusion_data.get("pseudo", "Unknown")
logging.warning(f"No '{source_type}' data source found for Patient {patient_id} / {pseudo} (Field: {field_name})")
final_value = "undefined"
# Post-processing: Apply true_if_any and value_labels transformations (for all sources)
if final_value not in ["undefined", "$$$$ No Data"]:
# Check if any value matches
check_values = field.get("true_if_any")
if check_values:
raw_value_set = set(final_value if isinstance(final_value, list) else [final_value])
check_values_set = set(check_values if isinstance(check_values, list) else [check_values])
final_value = not raw_value_set.isdisjoint(check_values_set)
# Map value to label
value_labels = field.get("value_labels")
if value_labels and final_value not in ["$$$$ Format Error : Array expected"]:
found = False
for label_map in value_labels:
if label_map.get("value") == final_value:
final_value = get_nested_value(label_map, ["text", "fr"], default=f"$$$$ Value Error : {final_value}")
found = True
break
if not found:
final_value = f"$$$$ Value Error : {final_value}"
# Post-processing: If the value is a list (e.g. from a wildcard), join it with a pipe.
if isinstance(final_value, list):
final_value = "|".join(map(str, final_value))
# Post-processing: Format score dictionaries
if isinstance(final_value, dict) and 'total' in final_value and 'max' in final_value:
final_value = f"{final_value['total']}/{final_value['max']}"
# Post-processing: Apply field template
field_template = field.get("field_template")
if field_template and final_value not in ["undefined", "N/A"] and isinstance(final_value, (str, int, float, bool)):
final_value = field_template.replace("$value", str(final_value))
# Ensure the group sub-dictionary exists
if field_group not in output_inclusion:
output_inclusion[field_group] = {}
output_inclusion[field_group][field_name] = final_value
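# Post-processing walkthrough (hypothetical configuration): with true_if_any=["A", "B"] a raw
# answer ["B", "C"] becomes True; with value_labels=[{"value": true, "text": {"fr": "Oui"}}]
# that True becomes "Oui"; with field_template="Score: $value" the final string is "Score: Oui".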
# ============================================================================
# BLOCK 7: BUSINESS API CALLS
# ============================================================================
@api_call_with_retry
def get_all_organizations():
start_time = perf_counter()
with console.status("[bold green]Getting Organizations...", spinner="dots"):
client = get_httpx_client()
client.base_url = RC_URL
response = client.get(API_RC_GET_ALL_ORGANIZATIONS_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"}, timeout=1200)
response.raise_for_status()
duration = perf_counter() - start_time
console.print(f"Organizations loaded. ({duration:.2f}s)", style="green")
return response.json()
@api_call_with_retry
def _get_organization_statistics(organization_id):
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_RC_INCLUSION_STATISTICS_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"},
json={"protocolId": RC_ENDOBEST_PROTOCOL_ID, "center": organization_id,
"excludedCenters": RC_ENDOBEST_EXCLUDED_CENTERS},
timeout=API_TIMEOUT)
response.raise_for_status()
return response.json()["statistic"]
def get_organization_counters(organization):
organization_id = organization['id']
stats = _get_organization_statistics(organization_id)
organization["patients_count"] = stats.get("totalInclusions", 0)
organization["preincluded_count"] = stats.get("preIncluded", 0)
organization["included_count"] = stats.get("included", 0)
organization["prematurely_terminated_count"] = stats.get("prematurelyTerminated", 0)
return organization
@api_call_with_retry
def get_organization_inclusions(organization_id, limit, page):
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(f"{API_RC_SEARCH_INCLUSIONS_ENDPOINT}?limit={limit}&page={page}",
headers={"Authorization": f"Bearer {access_token}"},
json={"protocolId": RC_ENDOBEST_PROTOCOL_ID, "center": organization_id,
"keywords": ""}, timeout=API_TIMEOUT)
response.raise_for_status()
return response.json()["data"]
@api_call_with_retry
def get_record_by_patient_id(patient_id, organization_id):
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_RC_GET_RECORD_BY_PATIENT_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"},
json={"center": organization_id, "patientId": patient_id,
"mode": "exchange", "state": "ongoing", "includeEndoParcour": False,
"sourceClient": "pro_prm"},
timeout=API_TIMEOUT)
response.raise_for_status()
return response.json()
@api_call_with_retry
def get_request_by_tube_id(tube_id):
client = get_httpx_client()
client.base_url = GDD_URL
response = client.get(f"{API_GDD_GET_REQUEST_BY_TUBE_ID_ENDPOINT}/{tube_id}?isAdmin=true&organization=undefined",
headers={"Authorization": f"Bearer {access_token}"}, timeout=API_TIMEOUT)
response.raise_for_status()
return response.json()
@api_call_with_retry
def search_visit_by_pseudo_and_order(pseudo, order):
"""Searches for a visit by patient pseudo and visit order."""
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_RC_SEARCH_VISITS_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"},
json={"visitOrder": order, "keywords": pseudo},
timeout=API_TIMEOUT)
response.raise_for_status()
resp_json = response.json()
if isinstance(resp_json, dict):
data = resp_json.get("data")
if isinstance(data, list) and len(data) > 0:
return data[0]
return None
@api_call_with_retry
def get_all_questionnaires_by_patient(patient_id, record_data):
"""Fetches all questionnaires for a patient with a single API call."""
client = get_httpx_client()
client.base_url = RC_URL
payload = {
"context": "clinic_research",
"subject": patient_id
}
# Extract blockedQcmVersions from record (same logic as get_questionnaire_answers)
if record_data is None:
all_blocked_versions = []
else:
all_blocked_versions = get_nested_value(record_data, path=["record", "protocol_inclusions", 0, "blockedQcmVersions"],
default=[])
# Ensure it's a list even if get_nested_value returns "$$$$ No Data"
if all_blocked_versions == "$$$$ No Data":
all_blocked_versions = []
if all_blocked_versions:
payload["blockedQcmVersions"] = all_blocked_versions
response = client.post(API_RC_GET_SURVEYS_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"},
json=payload,
timeout=API_TIMEOUT)
response.raise_for_status()
response_data = response.json()
# Build dictionary with questionnaire metadata for searching
results = {}
for item in response_data:
q_id = get_nested_value(item, path=["questionnaire", "id"])
q_name = get_nested_value(item, path=["questionnaire", "name"])
q_category = get_nested_value(item, path=["questionnaire", "category"])
answers = get_nested_value(item, path=["answers"], default={})
if q_id:
results[q_id] = {
"questionnaire": {
"id": q_id,
"name": q_name,
"category": q_category
},
"answers": answers
}
return results
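# Returned shape (identifiers and names are illustrative):
#   {"<q_id>": {"questionnaire": {"id": "<q_id>", "name": "EVA", "category": "pain"},
#               "answers": {...}}}
# which is exactly the structure the _find_questionnaire_by_* helpers in BLOCK 5 iterate over.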
# ============================================================================
# BLOCK 7b: ORGANIZATION CENTER MAPPING
# ============================================================================
def load_organization_center_mapping():
"""
Loads organization ↔ center mapping from Excel file in script directory.
Returns:
dict: {organization_name_normalized: center_name} or {} if error/skip
"""
mapping_file = ORG_CENTER_MAPPING_FILE_NAME
if not os.path.exists(mapping_file):
console.print(f"[yellow]⚠ Mapping file not found at: {mapping_file}. Skipping center mapping.[/yellow]")
return {}
try:
workbook = openpyxl.load_workbook(mapping_file)
except Exception as e:
console.print(f"[yellow]⚠ Error loading mapping file: {e}. Skipping center mapping.[/yellow]")
logging.warning(f"Error loading mapping file: {e}")
return {}
if ORG_CENTER_MAPPING_TABLE_NAME not in workbook.sheetnames:
console.print(f"[yellow]⚠ Sheet '{ORG_CENTER_MAPPING_TABLE_NAME}' not found in mapping file. Skipping center mapping.[/yellow]")
return {}
sheet = workbook[ORG_CENTER_MAPPING_TABLE_NAME]
headers = [cell.value for cell in sheet[1]]
# Validate required columns
if "Organization_Name" not in headers or "Center_Name" not in headers:
console.print(f"[yellow]⚠ Required columns 'Organization_Name' or 'Center_Name' not found in mapping file. Skipping center mapping.[/yellow]")
return {}
# Load mapping rows
mapping_rows = []
try:
for row in sheet.iter_rows(min_row=2, values_only=True):
# Skip empty rows
if all(cell is None for cell in row):
continue
row_dict = dict(zip(headers, row))
org_name = row_dict.get("Organization_Name")
center_name = row_dict.get("Center_Name")
if org_name and center_name:
mapping_rows.append({
"Organization_Name": org_name,
"Center_Name": center_name
})
except Exception as e:
console.print(f"[yellow]⚠ Error reading mapping file rows: {e}. Skipping center mapping.[/yellow]")
logging.warning(f"Error reading mapping file rows: {e}")
return {}
# === VALIDATE: Check for duplicates on NORMALIZED versions ===
org_names_normalized = {} # {normalized: original}
center_names_normalized = {} # {normalized: original}
for row in mapping_rows:
org_name_raw = row["Organization_Name"]
center_name_raw = row["Center_Name"]
# Normalize
org_normalized = org_name_raw.strip().lower() if isinstance(org_name_raw, str) else str(org_name_raw).strip().lower()
center_normalized = center_name_raw.strip().lower() if isinstance(center_name_raw, str) else str(center_name_raw).strip().lower()
# Check for duplicates
if org_normalized in org_names_normalized:
console.print(f"[yellow]⚠ Duplicate found in Organization_Name: '{org_name_raw}'. Skipping center mapping.[/yellow]")
logging.warning(f"Duplicate in Organization_Name: '{org_name_raw}'")
return {}
if center_normalized in center_names_normalized:
console.print(f"[yellow]⚠ Duplicate found in Center_Name: '{center_name_raw}'. Skipping center mapping.[/yellow]")
logging.warning(f"Duplicate in Center_Name: '{center_name_raw}'")
return {}
# Store normalized version
org_names_normalized[org_normalized] = org_name_raw
center_names_normalized[center_normalized] = center_name_raw
# === BUILD MAPPING DICT ===
mapping_dict = {}
for row in mapping_rows:
org_name_raw = row["Organization_Name"]
center_name_raw = row["Center_Name"]
# Normalize key, keep center_name clean (strip but not lower)
org_normalized = org_name_raw.strip().lower() if isinstance(org_name_raw, str) else str(org_name_raw).strip().lower()
center_clean = center_name_raw.strip() if isinstance(center_name_raw, str) else str(center_name_raw).strip()
mapping_dict[org_normalized] = center_clean
return mapping_dict
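# Matching is case- and whitespace-insensitive on keys but preserves center names, e.g. a
# hypothetical row ("  CHU Example ", "CTR-01") is stored as {"chu example": "CTR-01"}, so
# "CHU EXAMPLE" coming from the API still maps to "CTR-01".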
def apply_center_mapping(organizations_list, mapping_dict):
"""
Applies organization → center mapping to organizations list.
Adds 'Center_Name' field to each organization only if mapping succeeded.
Args:
organizations_list: List of organization dicts
mapping_dict: {organization_name_normalized: center_name}
"""
if not mapping_dict:
# Mapping dict is empty due to error → skip mapping
return
unmapped = []
for org in organizations_list:
org_name = org.get("name", "")
org_name_normalized = org_name.strip().lower()
# Try to find match in mapping dict
if org_name_normalized in mapping_dict:
org["Center_Name"] = mapping_dict[org_name_normalized]
else:
# Fallback to organization name
org["Center_Name"] = org_name
unmapped.append(org_name)
# Display results
if not unmapped:
console.print(f"[green]✓ All {len(organizations_list)} organizations mapped successfully.[/green]")
else:
console.print(f"[yellow]⚠ {len(unmapped)} organization(s) not mapped:[/yellow]")
for org_name in sorted(unmapped):
console.print(f"[yellow] - {org_name}[/yellow]")
# ============================================================================
# BLOCK 8: PROCESSING ORCHESTRATION
# ============================================================================
def _process_inclusion_data(inclusion, organization):
"""Processes a single inclusion record and returns a dictionary."""
organization_id = organization["id"]
patient_id = get_nested_value(inclusion, path=["id"])
pseudo = get_nested_value(inclusion, path=["pseudo"], default="Unknown")
# Set thread-local context for detailed error logging in decorators
ctx = {"id": patient_id, "pseudo": pseudo}
thread_local_storage.current_patient_context = ctx
# Initialize empty output structure
output_inclusion = {}
# --- Prepare all data sources ---
# 1. Launch Visit Search asynchronously (it's slow, ~5s)
# We use run_with_context to pass the patient identity to the new thread
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
# 2. Prepare inclusion_data: enrich inclusion with organization info
inclusion_data = dict(inclusion)
inclusion_data["organization_id"] = organization_id
inclusion_data["organization_name"] = organization["name"]
if "Center_Name" in organization:
inclusion_data["center_name"] = organization["Center_Name"]
# 3. Prepare record_data (sequential as it's often needed for questionnaires)
record_data = get_record_by_patient_id(patient_id, organization_id)
# 4. Get tube_id for request and launch in parallel with questionnaires
tube_id = get_nested_value(record_data, path=["record", "clinicResearchData", 0, "requestMetaData", "tubeId"], default="undefined")
request_future = subtasks_thread_pool.submit(run_with_context, get_request_by_tube_id, ctx, tube_id)
all_questionnaires = get_all_questionnaires_by_patient(patient_id, record_data)
# --- Synchronize all asynchronous tasks ---
try:
request_data = request_future.result()
except Exception as e:
logging.error(f"Error fetching request data for patient {patient_id}: {e}")
request_data = None
try:
six_month_visit_data = visit_future.result()
except Exception as e:
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
six_month_visit_data = None
# --- Process all fields from configuration ---
process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data)
return output_inclusion
def process_organization(organization, index, total_organizations):
global threads_list, global_pbar
position = get_thread_position() + 2
organization_id = organization["id"]
output_inclusions = []
inclusions = get_organization_inclusions(organization_id, 1000, 1)
with tqdm(total=len(inclusions), unit='Incl.',
desc=f"{str(index) + "/" + str(total_organizations):<9} - {organization['name'][:40]:<40}",
position=position, leave=False, bar_format=custom_bar_format) as incl_pbar:
for inclusion in inclusions:
output_inclusion = _process_inclusion_data(inclusion, organization)
output_inclusions.append(output_inclusion)
incl_pbar.update(1)
with _global_pbar_lock:
if global_pbar:
global_pbar.update(1)
return output_inclusions
# ============================================================================
# BLOCK 9: MAIN EXECUTION
# ============================================================================
def main():
global global_pbar, excel_export_config, excel_export_enabled
# --- Check for CLI Check_Only mode ---
check_only_mode = "--check-only" in sys.argv
if check_only_mode:
run_check_only_mode(sys.argv)
return
# --- Check for CLI Excel_Only mode ---
excel_only_mode = "--excel-only" in sys.argv
if excel_only_mode:
# Load mapping configs for Excel export (same as normal workflow)
print()
load_inclusions_mapping_config()
load_organizations_mapping_config()
# Completely externalized Excel-only workflow
export_excel_only(sys.argv, INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME,
inclusions_mapping_config, organizations_mapping_config)
return
# === NORMAL MODE: Full data collection ===
print()
login_status = login()
while login_status == "Error":
login_status = login()
if login_status == "Exit":
return
print()
number_of_threads = int((questionary.text("Number of threads :", default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
print()
ask_on_retry_exhausted()
print()
wait_for_scheduled_launch()
print()
load_inclusions_mapping_config()
load_organizations_mapping_config()
# === LOAD AND VALIDATE EXCEL EXPORT CONFIGURATION ===
print()
console.print("[bold cyan]Loading Excel export configuration...[/bold cyan]")
# Validate Excel config (no data loading - JSONs don't exist yet in NORMAL MODE)
# prepare_excel_export() displays error messages directly to console
excel_export_config, has_config_critical, _ = \
prepare_excel_export(inclusions_mapping_config, organizations_mapping_config)
# Ask user confirmation if critical errors found
if has_config_critical:
print()
answer = questionary.confirm(
"⚠ Critical configuration errors detected. Continue anyway?",
default=False
).ask()
if not answer:
console.print("[bold red]Aborted by user[/bold red]")
return
else:
excel_export_enabled = False # Skip Excel export if user continues despite errors
else:
excel_export_enabled = bool(excel_export_config) # Config is valid, Excel export can proceed
print()
start_time = perf_counter()
organizations_list = get_all_organizations()
organizations_list = [org for org in organizations_list if org["id"] not in RC_ENDOBEST_EXCLUDED_CENTERS]
# === APPLY ORGANIZATION CENTER MAPPING ===
print()
print("Mapping organizations to centers...")
mapping_dict = load_organization_center_mapping()
apply_center_mapping(organizations_list, mapping_dict)
print()
with ThreadPoolExecutor(max_workers=number_of_threads) as counter_pool:
futures = [counter_pool.submit(get_organization_counters, org) for org in organizations_list]
organizations_list_with_counters = []
for future in tqdm(as_completed(futures), total=len(futures), desc=f"{'Fetching Organizations Counters':<52}",
unit="orgs.", bar_format=custom_bar_format):
try:
updated_org = future.result()
organizations_list_with_counters.append(updated_org)
except Exception as exc:
print(f"\nCRITICAL ERROR while fetching counters: {exc}")
counter_pool.shutdown(wait=False, cancel_futures=True)
raise
organizations_list = organizations_list_with_counters
print()
inclusions_total_count = sum(org.get('patients_count', 0) for org in organizations_list)
organizations_list.sort(key=lambda org: (-org.get('patients_count', 0), org.get('name', '')))
number_of_organizations = len(organizations_list)
print(f"{inclusions_total_count} Inclusions in {number_of_organizations} Organizations...")
print()
output_inclusions = []
with tqdm(total=inclusions_total_count, unit="incl.", desc=f"{'Overall Progress':<52}", position=0, leave=True,
bar_format=custom_bar_format) as overall_progress_pbar:
global_pbar = overall_progress_pbar
with ThreadPoolExecutor(max_workers=number_of_threads) as thread_pool:
futures = [thread_pool.submit(process_organization, organization, index + 1, number_of_organizations)
for index, organization in enumerate(organizations_list)]
for future in as_completed(futures):
try:
result = future.result()
output_inclusions.extend(result)
except Exception as exc:
logging.critical(f"Arrêt dû à une exception critique dans un worker: {exc}", exc_info=True)
print(f"\nERREUR CRITIQUE dans un thread de traitement, arrêt du processus:")
print(f"Exception: {exc}")
print("Traceback original du worker:")
traceback.print_exc()
print(
"Signal d'arrêt envoyé au pool principal avec tentative d'annulation des tâches en attente...")
thread_pool.shutdown(wait=False, cancel_futures=True)
raise
def get_sort_key(item):
org_name = get_nested_value(item, ["Patient_Identification", "Organisation_Name"], default='')
pseudo = get_nested_value(item, ["Patient_Identification", "Pseudo"], default='')
date_str = get_nested_value(item, ["Inclusion", "Inclusion_Date"], default='')
sort_date = datetime.max
if date_str and date_str != "undefined":
try:
sort_date = datetime.strptime(date_str, '%d/%m/%Y')
except (ValueError, TypeError):
pass
return org_name, sort_date, pseudo
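# Sort order: organization name, then inclusion date (chronological; undefined or
# unparseable dates sort last via datetime.max), then pseudo.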
try:
print()
print()
print("Sorting results...")
output_inclusions.sort(key=get_sort_key)
# === QUALITY CHECKS (before backup to avoid losing history on crash) ===
print()
has_coherence_critical, has_regression_critical = run_quality_checks(
current_inclusions=output_inclusions, # list: newly collected data held in memory
organizations_list=organizations_list, # list: in-memory data with counters
old_inclusions_filename=INCLUSIONS_FILE_NAME # str: "endobest_inclusions.json" (current version on disk)
)
# === CHECK FOR CRITICAL ISSUES AND ASK USER CONFIRMATION ===
if has_coherence_critical or has_regression_critical:
print()
console.print("[bold red]⚠ CRITICAL issues detected in quality checks![/bold red]")
confirm_write = questionary.confirm(
"Do you want to write the results anyway?",
default=True
).ask()
if not confirm_write:
console.print("[yellow]✗ Output writing cancelled by user. Files were not modified.[/yellow]")
console.print("[yellow] You can re-run the script to try again.[/yellow]")
print()
print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}")
return
# === BACKUP OLD FILES (only after checks pass and user confirmation) ===
backup_output_files()
# === WRITE NEW FILES ===
print("Writing files...")
with open(INCLUSIONS_FILE_NAME, 'w', encoding='utf-8') as f_json:
json.dump(output_inclusions, f_json, indent=4, ensure_ascii=False)
with open(ORGANIZATIONS_FILE_NAME, 'w', encoding='utf-8') as f_json:
json.dump(organizations_list, f_json, indent=4, ensure_ascii=False)
console.print("[green]✓ Data saved to JSON files[/green]")
print()
# === EXCEL EXPORT ===
# Completely externalized Excel export workflow
run_normal_mode_export(excel_export_enabled, excel_export_config,
inclusions_mapping_config, organizations_mapping_config)
except IOError as io_err:
logging.critical(f"I/O error while writing output files: {io_err}")
print(f"I/O error while writing output files: {io_err}")
except Exception as exc:
logging.critical(f"Error during result processing or export: {exc}")
print(f"Error during result processing or export: {exc}")
print()
print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}")
if __name__ == '__main__':
try:
main()
except Exception as e:
logging.critical(f"Le script principal s'est terminé prématurément à cause d'une exception: {e}", exc_info=True)
print(f"Le script s'est arrêté à cause d'une erreur : {e}")
finally:
if 'subtasks_thread_pool' in globals() and subtasks_thread_pool:
subtasks_thread_pool.shutdown(wait=False, cancel_futures=True)
print('\n')
input("Press Enter to exit...")