Compare commits

..

29 Commits

Author SHA1 Message Date
c679182707 Config Debug Fix 2026-05-06 03:55:27 +01:00
0d1b362217 --config CLI Param fix 2026-05-06 03:24:56 +01:00
c812091ce3 add -config CLI param 2026-05-06 02:54:36 +01:00
04b4298296 Config clear 2026-05-06 02:07:13 +01:00
5dc5285073 gitignore *.csv 2026-05-06 01:59:51 +01:00
e3a1ef78cf Qustionnaires Unliked Patient filter 2026-05-06 01:58:01 +01:00
51864267d7 Strategy change new rule (Part2 - Fix2) 2026-05-05 23:04:16 +01:00
90a92b8184 Strategy change new rule (Part2 - Fix) 2026-05-05 18:26:21 +01:00
6eab4f367f Strategy change new rule (Part2) 2026-05-05 17:58:21 +01:00
d990307456 Strategy change new rule (Part1) 2026-05-05 02:36:03 +01:00
7174ed44f0 Counters adjustment 2026-04-22 22:17:05 +01:00
dae2114fe2 Counters adjustment and minor template update 2026-04-22 22:03:43 +01:00
6a9dbf1d95 Exclude Non Consent from counters 2026-04-22 20:56:23 +01:00
00dd82290c Detecting duplicate Questionnaires 2026-04-16 18:43:17 +01:00
3904948c32 Optional 6 month visit completeness 2026-03-12 16:16:22 +00:00
0db52e6492 re-login if refresh-token exhausted 2026-02-21 01:12:00 +00:00
9fbca92f37 Clear keyboard queue while waiting schedule 2026-02-20 15:17:07 +00:00
cc709200a0 On Retry Exhausted Policy 2026-02-20 13:54:34 +00:00
af57a1e187 Adding planned execution 2026-02-20 13:42:25 +00:00
bff3c6b947 clean 2026-02-09 22:40:12 +01:00
739aae0357 . 2026-02-09 22:34:08 +01:00
fd37fac283 . 2026-02-09 22:22:14 +01:00
2df128a4d7 . 2026-02-09 22:15:00 +01:00
9321a33c1b .ignore 2026-02-09 22:08:27 +01:00
c80f19c537 .ignore 2026-02-09 21:58:47 +01:00
f1761dce8c . 2026-02-09 21:56:33 +01:00
8ef3392e4d igonre log 2026-02-09 18:44:54 +01:00
c7b1e348a2 ingnore exe 2026-02-09 18:41:38 +01:00
aac259da42 Just other executable file for dashboard, which is not related to the 6-month visit. It is used for testing the dashboard without the 6-month visit data. 2026-02-09 17:14:40 +01:00
12 changed files with 417 additions and 91 deletions

4
.gitignore vendored
View File

@@ -195,9 +195,11 @@ Endobest Reporting/
jsons history/ jsons history/
nul nul
# Ignore all json, exe, log and xlsx files # Ignore all json, exe, log, txt, csv and xlsx files
*.json *.json
*.exe *.exe
*.log *.log
*.txt
*.csv
/*.xlsx /*.xlsx
!eb_org_center_mapping.xlsx !eb_org_center_mapping.xlsx

Binary file not shown.

Binary file not shown.

View File

@@ -21,7 +21,7 @@
# identification, and support for complex data extraction using JSON path expressions. # identification, and support for complex data extraction using JSON path expressions.
import json import json
import logging import logging
import msvcrt
import os import os
import re import re
import sys import sys
@@ -86,6 +86,8 @@ from eb_dashboard_utils import (
clear_httpx_client, clear_httpx_client,
get_thread_position, get_thread_position,
get_config_path, get_config_path,
set_dashboard_config_path_override,
get_dashboard_config_path,
thread_local_storage, thread_local_storage,
run_with_context run_with_context
) )
@@ -118,6 +120,10 @@ access_token = ""
refresh_token = "" refresh_token = ""
threads_list = [] threads_list = []
_token_refresh_lock = threading.Lock() _token_refresh_lock = threading.Lock()
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
fetch_six_month_visit = False # Whether to fetch 6-month visit data (slow, ~5s per patient)
_stored_username = "" # Credentials stored at login for automatic re-login
_stored_password = ""
_threads_list_lock = threading.Lock() _threads_list_lock = threading.Lock()
global_pbar = None global_pbar = None
_global_pbar_lock = threading.Lock() _global_pbar_lock = threading.Lock()
@@ -148,6 +154,19 @@ if "--debug" in sys.argv:
sys.argv.remove("--debug") sys.argv.remove("--debug")
enable_debug_mode() enable_debug_mode()
# Handle --config <path> override (remove from sys.argv to preserve positional args)
if "--config" in sys.argv:
_idx = sys.argv.index("--config")
if _idx + 1 >= len(sys.argv):
print("Error: --config requires a file path argument")
sys.exit(1)
_raw_config_path = sys.argv[_idx + 1]
del sys.argv[_idx:_idx + 2]
if os.path.isabs(_raw_config_path):
set_dashboard_config_path_override(_raw_config_path)
else:
set_dashboard_config_path_override(os.path.join(get_config_path(), _raw_config_path))
# --- Progress Bar Configuration --- # --- Progress Bar Configuration ---
# NOTE: BAR_N_FMT_WIDTH, BAR_TOTAL_FMT_WIDTH, BAR_TIME_WIDTH, BAR_RATE_WIDTH # NOTE: BAR_N_FMT_WIDTH, BAR_TOTAL_FMT_WIDTH, BAR_TIME_WIDTH, BAR_RATE_WIDTH
# are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH) # are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH)
@@ -186,8 +205,10 @@ def new_token():
finally: finally:
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
logging.critical("Persistent error in refresh_token") # Refresh token exhausted — attempt full re-login with stored credentials
raise httpx.RequestError(message="Persistent error in refresh_token") logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
_do_login(_stored_username, _stored_password)
logging.info("Re-login successful. New tokens acquired.")
def api_call_with_retry(func): def api_call_with_retry(func):
@@ -212,7 +233,10 @@ def api_call_with_retry(func):
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token.") logging.info(f"Token expired for {func_name}. Refreshing token.")
try:
new_token() new_token()
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
@@ -225,8 +249,18 @@ def api_call_with_retry(func):
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES) sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
break # Exit for loop to restart batch in while True break # Exit for loop to restart batch in while True
else: else:
# All automatic batches exhausted, ask the user # All automatic batches exhausted — apply on_retry_exhausted policy
with _user_interaction_lock: with _user_interaction_lock:
if on_retry_exhausted == "ignore":
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None
elif on_retry_exhausted == "abort":
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
else: # "ask" — display error then interactive prompt
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]") console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]") console.print(f"[red]Exception: {exc}[/red]")
@@ -244,7 +278,6 @@ def api_call_with_retry(func):
batch_count = 1 # Reset batch counter for the next interactive round batch_count = 1 # Reset batch counter for the next interactive round
break # Exit for loop to restart batch in while True break # Exit for loop to restart batch in while True
elif choice == "Ignore (return None and continue)": elif choice == "Ignore (return None and continue)":
# Retrieve context if available
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}") logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None return None
@@ -259,51 +292,56 @@ def api_call_with_retry(func):
# BLOCK 3: AUTHENTICATION # BLOCK 3: AUTHENTICATION
# ============================================================================ # ============================================================================
def login(): def _do_login(username, password):
"""Performs the two-step authentication (IAM → RC) with the given credentials.
Updates global access_token and refresh_token on success.
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
Must NOT acquire _token_refresh_lock (caller's responsibility).
"""
global access_token, refresh_token global access_token, refresh_token
client = get_httpx_client()
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT,
json={"username": username, "password": password},
timeout=60)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT,
headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
def login():
global _stored_username, _stored_password
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask()) user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask()) password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
if not (user_name and password): if not (user_name and password):
return "Exit" return "Exit"
try: try:
client = get_httpx_client() _do_login(user_name, password)
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
except httpx.RequestError as exc: except httpx.RequestError as exc:
print(f"Login Error : {exc}") print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
except httpx.HTTPStatusError as exc: except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning( logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
return "Error"
try:
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
except httpx.RequestError as exc:
print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}")
return "Error"
except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
_stored_username = user_name
_stored_password = password
print() print()
print("Login Success") print("Login Success")
return "Success" return "Success"
@@ -313,6 +351,100 @@ def login():
# BLOCK 3B: FILE UTILITIES # BLOCK 3B: FILE UTILITIES
# ============================================================================ # ============================================================================
def ask_on_retry_exhausted():
"""Asks the user what to do when all API retry batches are exhausted."""
global on_retry_exhausted
choice = questionary.select(
"On retry exhausted :",
choices=[
"Ask (interactive prompt)",
"Ignore (return None and continue)",
"Abort (stop script)"
]
).ask()
if choice is None or choice == "Ask (interactive prompt)":
on_retry_exhausted = "ask"
elif choice == "Ignore (return None and continue)":
on_retry_exhausted = "ignore"
else:
on_retry_exhausted = "abort"
def ask_fetch_six_month_visit():
"""Asks the user whether to fetch 6-month visit data (slow API call, ~5s per patient)."""
global fetch_six_month_visit
choice = questionary.select(
"Fetch 6-month visit progress data? (slow, ~5s per patient) :",
choices=[
"No (skip, faster execution)",
"Yes (fetch 6-month visit data)"
]
).ask()
fetch_six_month_visit = (choice == "Yes (fetch 6-month visit data)")
def wait_for_scheduled_launch():
"""Asks the user when to start the processing and waits if needed.
Options: Immediately / In X minutes / At HH:MM
"""
choice = questionary.select(
"When to start processing ?",
choices=["Immediately", "In X minutes", "At HH:MM"]
).ask()
if choice is None or choice == "Immediately":
return
if choice == "In X minutes":
minutes_str = questionary.text(
"Number of minutes :",
validate=lambda x: x.isdigit() and int(x) > 0
).ask()
if not minutes_str:
return
target_time = datetime.now() + timedelta(minutes=int(minutes_str))
else: # "At HH:MM"
time_str = questionary.text(
"Start time (HH:MM) :",
validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and
0 <= int(x.split(':')[0]) <= 23 and
0 <= int(x.split(':')[1]) <= 59
).ask()
if not time_str:
return
now = datetime.now()
h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
if target_time <= now:
console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
return
print()
try:
while True:
remaining = target_time - datetime.now()
if remaining.total_seconds() <= 0:
break
total_secs = int(remaining.total_seconds())
h = total_secs // 3600
m = (total_secs % 3600) // 60
s = total_secs % 60
target_str = target_time.strftime('%H:%M:%S')
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
end="", flush=True)
sleep(1)
# Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts
while msvcrt.kbhit():
msvcrt.getwch()
print()
console.print("[green]✓ Starting processing.[/green]")
except KeyboardInterrupt:
print()
console.print("[bold red]Launch cancelled by user.[/bold red]")
raise SystemExit(0)
def load_json_file(filename): def load_json_file(filename):
""" """
Load a JSON file from disk. Load a JSON file from disk.
@@ -340,7 +472,7 @@ def load_json_file(filename):
def load_inclusions_mapping_config(): def load_inclusions_mapping_config():
"""Loads and validates the inclusions mapping configuration from the Excel file.""" """Loads and validates the inclusions mapping configuration from the Excel file."""
global inclusions_mapping_config global inclusions_mapping_config
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) config_path = get_dashboard_config_path(DASHBOARD_CONFIG_FILE_NAME)
try: try:
# Load with data_only=True to read calculated values instead of formulas # Load with data_only=True to read calculated values instead of formulas
@@ -442,7 +574,7 @@ def load_inclusions_mapping_config():
def load_organizations_mapping_config(): def load_organizations_mapping_config():
"""Loads and validates the organizations mapping configuration from the Excel file.""" """Loads and validates the organizations mapping configuration from the Excel file."""
global organizations_mapping_config global organizations_mapping_config
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) config_path = get_dashboard_config_path(DASHBOARD_CONFIG_FILE_NAME)
try: try:
# Load with data_only=True to read calculated values instead of formulas # Load with data_only=True to read calculated values instead of formulas
@@ -500,6 +632,12 @@ def _find_questionnaire_by_id(qcm_dict, qcm_id):
if not isinstance(qcm_dict, dict): if not isinstance(qcm_dict, dict):
return None return None
qcm_data = qcm_dict.get(qcm_id) qcm_data = qcm_dict.get(qcm_id)
if qcm_data and qcm_data.get("_count", 1) > 1:
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.error(
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
f"Questionnaire id='{qcm_id}' appeared {qcm_data['_count']} times in API response — using last received copy"
)
return qcm_data.get("answers") if qcm_data else None return qcm_data.get("answers") if qcm_data else None
@@ -507,20 +645,32 @@ def _find_questionnaire_by_name(qcm_dict, name):
"""Finds a questionnaire by name (sequential search, returns first match).""" """Finds a questionnaire by name (sequential search, returns first match)."""
if not isinstance(qcm_dict, dict): if not isinstance(qcm_dict, dict):
return None return None
for qcm in qcm_dict.values(): matches = [qcm for qcm in qcm_dict.values()
if get_nested_value(qcm, ["questionnaire", "name"]) == name: if get_nested_value(qcm, ["questionnaire", "name"]) == name]
return qcm.get("answers") if len(matches) > 1:
return None ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
ids = [get_nested_value(q, ["questionnaire", "id"]) for q in matches]
logging.error(
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
f"Questionnaire name='{name}' matches {len(matches)} entries (ids: {ids}) — returning first match"
)
return matches[0].get("answers") if matches else None
def _find_questionnaire_by_category(qcm_dict, category): def _find_questionnaire_by_category(qcm_dict, category):
"""Finds a questionnaire by category (sequential search, returns first match).""" """Finds a questionnaire by category (sequential search, returns first match)."""
if not isinstance(qcm_dict, dict): if not isinstance(qcm_dict, dict):
return None return None
for qcm in qcm_dict.values(): matches = [qcm for qcm in qcm_dict.values()
if get_nested_value(qcm, ["questionnaire", "category"]) == category: if get_nested_value(qcm, ["questionnaire", "category"]) == category]
return qcm.get("answers") if len(matches) > 1:
return None ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
ids = [get_nested_value(q, ["questionnaire", "id"]) for q in matches]
logging.error(
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
f"Questionnaire category='{category}' matches {len(matches)} entries (ids: {ids}) — returning first match"
)
return matches[0].get("answers") if matches else None
def _get_field_value_from_questionnaire(all_questionnaires, field_config): def _get_field_value_from_questionnaire(all_questionnaires, field_config):
@@ -557,6 +707,13 @@ def get_value_from_inclusion(inclusion_dict, key):
def _execute_custom_function(function_name, args, output_inclusion): def _execute_custom_function(function_name, args, output_inclusion):
"""Executes a custom function for a calculated field.""" """Executes a custom function for a calculated field."""
def dominant_no_value(has_undef, has_na):
"""Returns the dominant sentinel: 'undefined' > 'N/A' > None (real value present)."""
if has_undef: return "undefined"
if has_na: return "N/A"
return None
if function_name == "search_in_fields_using_regex": if function_name == "search_in_fields_using_regex":
if not args or len(args) < 2: if not args or len(args) < 2:
return "$$$$ Argument Error: search_in_fields_using_regex requires at least 2 arguments" return "$$$$ Argument Error: search_in_fields_using_regex requires at least 2 arguments"
@@ -565,16 +722,19 @@ def _execute_custom_function(function_name, args, output_inclusion):
field_names = args[1:] field_names = args[1:]
field_values = [] field_values = []
all_undefined = True has_undefined = False
has_na = False
for field_name in field_names: for field_name in field_names:
value = get_value_from_inclusion(output_inclusion, field_name) value = get_value_from_inclusion(output_inclusion, field_name)
field_values.append(value) field_values.append(value)
if value is not None and value != "undefined": if value is None or value == "undefined":
all_undefined = False has_undefined = True
elif value == "N/A":
has_na = True
if all_undefined: if not any(v not in (None, "undefined", "N/A") for v in field_values):
return "undefined" return dominant_no_value(has_undefined, has_na)
try: try:
for value in field_values: for value in field_values:
@@ -595,6 +755,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
if value is None or value == "undefined": if value is None or value == "undefined":
return "undefined" return "undefined"
if value == "N/A":
return "N/A"
match = re.search(r'\((.*?)\)', str(value)) match = re.search(r'\((.*?)\)', str(value))
return match.group(1) if match else "undefined" return match.group(1) if match else "undefined"
@@ -608,6 +770,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
if status is None or status == "undefined": if status is None or status == "undefined":
return "undefined" return "undefined"
if status == "N/A":
return "N/A"
if not isinstance(is_terminated, bool) or not is_terminated: if not isinstance(is_terminated, bool) or not is_terminated:
return status return status
@@ -617,7 +781,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
elif function_name == "if_then_else": elif function_name == "if_then_else":
# Unified conditional function # Unified conditional function
# Syntax: ["operator", arg1, arg2_optional, result_if_true, result_if_false] # Syntax: ["operator", arg1, arg2_optional, result_if_true, result_if_false]
# Operators: "is_true", "is_false", "all_true", "is_defined", "is_undefined", "all_defined", "==", "!=" # Operators: "is_true", "is_false", "all_true", "any_true", "is_defined", "is_undefined", "all_defined", "==", "!=", ">", ">=", "<", "<="
# Sentinel propagation: "undefined" (unknown) > "N/A" (not applicable) > real value.
if not args or len(args) < 4: if not args or len(args) < 4:
return "$$$$ Argument Error: if_then_else requires at least 4 arguments" return "$$$$ Argument Error: if_then_else requires at least 4 arguments"
@@ -645,6 +810,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
value = resolve_value(args[1]) value = resolve_value(args[1])
if value is None or value == "undefined": if value is None or value == "undefined":
return "undefined" return "undefined"
if value == "N/A":
return "N/A"
condition = (value is True) condition = (value is True)
result_if_true = resolve_value(args[2]) result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3]) result_if_false = resolve_value(args[3])
@@ -655,6 +822,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
value = resolve_value(args[1]) value = resolve_value(args[1])
if value is None or value == "undefined": if value is None or value == "undefined":
return "undefined" return "undefined"
if value == "N/A":
return "N/A"
condition = (value is False) condition = (value is False)
result_if_true = resolve_value(args[2]) result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3]) result_if_false = resolve_value(args[3])
@@ -666,22 +835,58 @@ def _execute_custom_function(function_name, args, output_inclusion):
if not isinstance(fields_arg, list): if not isinstance(fields_arg, list):
return "$$$$ Argument Error: all_true requires arg1 to be a list of field names" return "$$$$ Argument Error: all_true requires arg1 to be a list of field names"
has_undefined = False
has_na = False
conditions = [] conditions = []
for field_name in fields_arg: for field_name in fields_arg:
field_value = get_value_from_inclusion(output_inclusion, field_name) field_value = get_value_from_inclusion(output_inclusion, field_name)
if field_value is None or field_value == "undefined": if field_value is None or field_value == "undefined":
return "undefined" has_undefined = True
elif field_value == "N/A":
has_na = True
else:
conditions.append(field_value) conditions.append(field_value)
status = dominant_no_value(has_undefined, has_na)
if status:
return status
condition = all(conditions) condition = all(conditions)
result_if_true = resolve_value(args[2]) result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3]) result_if_false = resolve_value(args[3])
elif operator == "any_true":
# OR semantics: returns "undefined" only if ALL operands are undefined
if len(args) != 4:
return "$$$$ Argument Error: any_true requires 4 arguments"
fields_arg = args[1]
if not isinstance(fields_arg, list):
return "$$$$ Argument Error: any_true requires arg1 to be a list of field names"
has_undefined = False
has_na = False
resolved = []
for field_name in fields_arg:
field_value = get_value_from_inclusion(output_inclusion, field_name)
if field_value is None or field_value == "undefined":
has_undefined = True
elif field_value == "N/A":
has_na = True
else:
resolved.append(field_value)
if not resolved:
return dominant_no_value(has_undefined, has_na)
condition = any(resolved)
result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3])
elif operator == "is_defined": elif operator == "is_defined":
if len(args) != 4: if len(args) != 4:
return "$$$$ Argument Error: is_defined requires 4 arguments" return "$$$$ Argument Error: is_defined requires 4 arguments"
value = resolve_value(args[1]) value = resolve_value(args[1])
condition = (value is not None and value != "undefined") condition = (value is not None and value != "undefined" and value != "N/A")
result_if_true = resolve_value(args[2]) result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3]) result_if_false = resolve_value(args[3])
@@ -702,7 +907,7 @@ def _execute_custom_function(function_name, args, output_inclusion):
for field_name in fields_arg: for field_name in fields_arg:
field_value = get_value_from_inclusion(output_inclusion, field_name) field_value = get_value_from_inclusion(output_inclusion, field_name)
if field_value is None or field_value == "undefined": if field_value is None or field_value == "undefined" or field_value == "N/A":
condition = False condition = False
break break
else: else:
@@ -717,8 +922,12 @@ def _execute_custom_function(function_name, args, output_inclusion):
value1 = resolve_value(args[1]) value1 = resolve_value(args[1])
value2 = resolve_value(args[2]) value2 = resolve_value(args[2])
if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined": v1_undef = value1 is None or value1 == "undefined"
return "undefined" v2_undef = value2 is None or value2 == "undefined"
status = dominant_no_value(v1_undef or v2_undef,
value1 == "N/A" or value2 == "N/A")
if status:
return status
condition = (value1 == value2) condition = (value1 == value2)
result_if_true = resolve_value(args[3]) result_if_true = resolve_value(args[3])
@@ -730,13 +939,58 @@ def _execute_custom_function(function_name, args, output_inclusion):
value1 = resolve_value(args[1]) value1 = resolve_value(args[1])
value2 = resolve_value(args[2]) value2 = resolve_value(args[2])
if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined": v1_undef = value1 is None or value1 == "undefined"
return "undefined" v2_undef = value2 is None or value2 == "undefined"
status = dominant_no_value(v1_undef or v2_undef,
value1 == "N/A" or value2 == "N/A")
if status:
return status
condition = (value1 != value2) condition = (value1 != value2)
result_if_true = resolve_value(args[3]) result_if_true = resolve_value(args[3])
result_if_false = resolve_value(args[4]) result_if_false = resolve_value(args[4])
elif operator in (">", ">=", "<", "<="):
if len(args) != 5:
return f"$$$$ Argument Error: {operator} requires 5 arguments"
value1 = resolve_value(args[1])
value2 = resolve_value(args[2])
v1_undef = value1 is None or value1 == "undefined"
v2_undef = value2 is None or value2 == "undefined"
status = dominant_no_value(v1_undef or v2_undef,
value1 == "N/A" or value2 == "N/A")
if status:
return status
# Si l'un est numérique, tenter de convertir l'autre ; sinon comparer en string
cmp1, cmp2 = value1, value2
if isinstance(value1, (int, float)) and not isinstance(value2, (int, float)):
try:
cmp2 = float(value2)
except (ValueError, TypeError):
cmp1 = str(value1)
elif isinstance(value2, (int, float)) and not isinstance(value1, (int, float)):
try:
cmp1 = float(value1)
except (ValueError, TypeError):
cmp2 = str(value2)
try:
if operator == ">":
condition = (cmp1 > cmp2)
elif operator == ">=":
condition = (cmp1 >= cmp2)
elif operator == "<":
condition = (cmp1 < cmp2)
else:
condition = (cmp1 <= cmp2)
except TypeError:
return f"$$$$ Comparison Error: cannot compare {type(cmp1).__name__} and {type(cmp2).__name__}"
result_if_true = resolve_value(args[3])
result_if_false = resolve_value(args[4])
else: else:
return f"$$$$ Unknown Operator: {operator}" return f"$$$$ Unknown Operator: {operator}"
@@ -759,6 +1013,8 @@ def process_inclusions_mapping(output_inclusion, inclusion_data, record_data, re
if condition_value is None or condition_value == "undefined": if condition_value is None or condition_value == "undefined":
final_value = "undefined" final_value = "undefined"
elif condition_value == "N/A":
final_value = "N/A"
elif not isinstance(condition_value, bool): elif not isinstance(condition_value, bool):
final_value = "$$$$ Condition Field Error" final_value = "$$$$ Condition Field Error"
elif not condition_value: elif not condition_value:
@@ -962,6 +1218,13 @@ def get_all_questionnaires_by_patient(patient_id, record_data):
response.raise_for_status() response.raise_for_status()
response_data = response.json() response_data = response.json()
# First pass: count occurrences of each q_id to detect duplicates at lookup time
q_id_counts = {}
for item in response_data:
q_id = get_nested_value(item, path=["questionnaire", "id"])
if q_id:
q_id_counts[q_id] = q_id_counts.get(q_id, 0) + 1
# Build dictionary with questionnaire metadata for searching # Build dictionary with questionnaire metadata for searching
results = {} results = {}
for item in response_data: for item in response_data:
@@ -969,6 +1232,17 @@ def get_all_questionnaires_by_patient(patient_id, record_data):
q_name = get_nested_value(item, path=["questionnaire", "name"]) q_name = get_nested_value(item, path=["questionnaire", "name"])
q_category = get_nested_value(item, path=["questionnaire", "category"]) q_category = get_nested_value(item, path=["questionnaire", "category"])
answers = get_nested_value(item, path=["answers"], default={}) answers = get_nested_value(item, path=["answers"], default={})
# ── UNLINKED PATIENT FILTER ─────────────────────────────────────────────
# If answers.patient is missing, null, or does not match the requested
# patient_id, the questionnaire is considered unlinked → clear answers
# to prevent use of orphaned data.
# (disable: comment out the block below)
answers_patient = answers.get("subject") if isinstance(answers, dict) else None
if not answers_patient or answers_patient != patient_id:
answers = {}
# ────────────────────────────────────────────────────────────────────────
if q_id: if q_id:
results[q_id] = { results[q_id] = {
"questionnaire": { "questionnaire": {
@@ -976,7 +1250,8 @@ def get_all_questionnaires_by_patient(patient_id, record_data):
"name": q_name, "name": q_name,
"category": q_category "category": q_category
}, },
"answers": answers "answers": answers,
"_count": q_id_counts[q_id]
} }
return results return results
@@ -1135,8 +1410,12 @@ def _process_inclusion_data(inclusion, organization):
output_inclusion = {} output_inclusion = {}
# --- Prepare all data sources --- # --- Prepare all data sources ---
# 1. 6-month visit loading disabled on this branch (No-6-Month-Visit) # 1. Launch Visit Search asynchronously (it's slow, ~5s) — only if enabled by user
# visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2) # We use run_with_context to pass the patient identity to the new thread
if fetch_six_month_visit:
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
else:
visit_future = None
# 2. Prepare inclusion_data: enrich inclusion with organization info # 2. Prepare inclusion_data: enrich inclusion with organization info
inclusion_data = dict(inclusion) inclusion_data = dict(inclusion)
@@ -1160,7 +1439,10 @@ def _process_inclusion_data(inclusion, organization):
logging.error(f"Error fetching request data for patient {patient_id}: {e}") logging.error(f"Error fetching request data for patient {patient_id}: {e}")
request_data = None request_data = None
# 6-month visit loading disabled on this branch (No-6-Month-Visit) try:
six_month_visit_data = visit_future.result() if visit_future is not None else {}
except Exception as e:
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
six_month_visit_data = None six_month_visit_data = None
# --- Process all fields from configuration --- # --- Process all fields from configuration ---
@@ -1228,10 +1510,19 @@ def main():
if login_status == "Exit": if login_status == "Exit":
return return
print()
ask_fetch_six_month_visit()
print() print()
number_of_threads = int((questionary.text("Number of threads :", default="12", number_of_threads = int((questionary.text("Number of threads :", default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask())) validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
print()
ask_on_retry_exhausted()
print()
wait_for_scheduled_launch()
print() print()
load_inclusions_mapping_config() load_inclusions_mapping_config()
load_organizations_mapping_config() load_organizations_mapping_config()
@@ -1290,6 +1581,14 @@ def main():
inclusions_total_count = sum(org.get('patients_count', 0) for org in organizations_list) inclusions_total_count = sum(org.get('patients_count', 0) for org in organizations_list)
organizations_list.sort(key=lambda org: (-org.get('patients_count', 0), org.get('name', ''))) organizations_list.sort(key=lambda org: (-org.get('patients_count', 0), org.get('name', '')))
# ╔══════════════════════════════════════════════════════════════════╗
# ║ TEST INSTRUMENTATION — REMOVE BEFORE PRODUCTION ║
# ║ Limits the table to organizations at ranks 33 and 34 ║
# ║ (indices 32 and 33, after descending sort by patients_count) ║
# ╚══════════════════════════════════════════════════════════════════╝
# organizations_list = [org for i, org in enumerate(organizations_list) if i in (32, 33)]
# ══════════════════════════════════════════════════════════════════
number_of_organizations = len(organizations_list) number_of_organizations = len(organizations_list)
print(f"{inclusions_total_count} Inclusions in {number_of_organizations} Organizations...") print(f"{inclusions_total_count} Inclusions in {number_of_organizations} Organizations...")
print() print()

View File

@@ -34,7 +34,7 @@ try:
except ImportError: except ImportError:
xw = None xw = None
from eb_dashboard_utils import get_nested_value, get_config_path from eb_dashboard_utils import get_nested_value, get_config_path, get_dashboard_config_path
from eb_dashboard_constants import ( from eb_dashboard_constants import (
INCLUSIONS_FILE_NAME, INCLUSIONS_FILE_NAME,
ORGANIZATIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME,
@@ -117,7 +117,7 @@ def load_excel_export_config(console_instance=None):
if console_instance: if console_instance:
console = console_instance console = console_instance
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) config_path = get_dashboard_config_path(DASHBOARD_CONFIG_FILE_NAME)
error_messages = [] error_messages = []
try: try:
@@ -754,7 +754,11 @@ def _apply_value_replacement(value, replacements):
return value return value
for value_before, value_after in replacements: for value_before, value_after in replacements:
if value == value_before: # Strict equality # bool is a subclass of int in Python (True == 1, False == 0):
# reject the match if one side is bool and the other is not.
if isinstance(value, bool) != isinstance(value_before, bool):
continue
if value == value_before:
return value_after return value_after
return value # No match, return original return value # No match, return original

View File

@@ -17,7 +17,7 @@ import shutil
import openpyxl import openpyxl
from rich.console import Console from rich.console import Console
from eb_dashboard_utils import get_nested_value, get_old_filename as _get_old_filename, get_config_path from eb_dashboard_utils import get_nested_value, get_old_filename as _get_old_filename, get_config_path, get_dashboard_config_path
from eb_dashboard_constants import ( from eb_dashboard_constants import (
INCLUSIONS_FILE_NAME, INCLUSIONS_FILE_NAME,
ORGANIZATIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME,
@@ -93,7 +93,7 @@ def load_regression_check_config(console_instance=None):
if console_instance: if console_instance:
console = console_instance console = console_instance
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) config_path = get_dashboard_config_path(DASHBOARD_CONFIG_FILE_NAME)
try: try:
workbook = openpyxl.load_workbook(config_path) workbook = openpyxl.load_workbook(config_path)
@@ -431,6 +431,11 @@ def coherence_check(output_inclusions, organizations_list):
continue continue
patients += 1 patients += 1
# API statistics exclude non-consented from status sub-counters only
if get_nested_value(inclusion, ["Inclusion", "Consent_Signed"]) is not True:
continue
status = get_nested_value(inclusion, ["Inclusion", "Inclusion_Status"], default="") status = get_nested_value(inclusion, ["Inclusion", "Inclusion_Status"], default="")
if isinstance(status, str): if isinstance(status, str):

View File

@@ -204,6 +204,22 @@ def get_config_path():
return CONFIG_FOLDER_NAME return CONFIG_FOLDER_NAME
_dashboard_config_path_override = None
def set_dashboard_config_path_override(path):
"""Sets a global override for the dashboard config file path (used by --config CLI arg)."""
global _dashboard_config_path_override
_dashboard_config_path_override = path
def get_dashboard_config_path(config_file_name):
"""Returns the dashboard config file path, respecting any --config CLI override."""
if _dashboard_config_path_override:
return _dashboard_config_path_override
return os.path.join(get_config_path(), config_file_name)
def get_old_filename(current_filename, old_suffix="_old"): def get_old_filename(current_filename, old_suffix="_old"):
"""Generate old backup filename from current filename. """Generate old backup filename from current filename.

Binary file not shown.