Compare commits
8 Commits
c679182707
...
No-6-Month
| Author | SHA1 | Date | |
|---|---|---|---|
| a1985741f8 | |||
| 9ff3595a17 | |||
| 13e794eb34 | |||
| e58f867bd0 | |||
| 57540d5159 | |||
| d6943faf59 | |||
| 8562c45f05 | |||
| 1cc2c754b7 |
4
.gitignore
vendored
4
.gitignore
vendored
@@ -195,11 +195,9 @@ Endobest Reporting/
|
||||
jsons history/
|
||||
nul
|
||||
|
||||
# Ignore all json, exe, log, txt, csv and xlsx files
|
||||
# Ignore all json, exe, log and xlsx files
|
||||
*.json
|
||||
*.exe
|
||||
*.log
|
||||
*.txt
|
||||
*.csv
|
||||
/*.xlsx
|
||||
!eb_org_center_mapping.xlsx
|
||||
Binary file not shown.
BIN
config/Endobest_Dashboard_Config-new.xlsx
Normal file
BIN
config/Endobest_Dashboard_Config-new.xlsx
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
config/eb_dashboard_extended_template-new.xlsx
Normal file
BIN
config/eb_dashboard_extended_template-new.xlsx
Normal file
Binary file not shown.
Binary file not shown.
469
eb_dashboard.py
469
eb_dashboard.py
@@ -21,7 +21,7 @@
|
||||
# identification, and support for complex data extraction using JSON path expressions.
|
||||
import json
|
||||
import logging
|
||||
import msvcrt
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
@@ -86,8 +86,6 @@ from eb_dashboard_utils import (
|
||||
clear_httpx_client,
|
||||
get_thread_position,
|
||||
get_config_path,
|
||||
set_dashboard_config_path_override,
|
||||
get_dashboard_config_path,
|
||||
thread_local_storage,
|
||||
run_with_context
|
||||
)
|
||||
@@ -120,10 +118,6 @@ access_token = ""
|
||||
refresh_token = ""
|
||||
threads_list = []
|
||||
_token_refresh_lock = threading.Lock()
|
||||
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
|
||||
fetch_six_month_visit = False # Whether to fetch 6-month visit data (slow, ~5s per patient)
|
||||
_stored_username = "" # Credentials stored at login for automatic re-login
|
||||
_stored_password = ""
|
||||
_threads_list_lock = threading.Lock()
|
||||
global_pbar = None
|
||||
_global_pbar_lock = threading.Lock()
|
||||
@@ -154,19 +148,6 @@ if "--debug" in sys.argv:
|
||||
sys.argv.remove("--debug")
|
||||
enable_debug_mode()
|
||||
|
||||
# Handle --config <path> override (remove from sys.argv to preserve positional args)
|
||||
if "--config" in sys.argv:
|
||||
_idx = sys.argv.index("--config")
|
||||
if _idx + 1 >= len(sys.argv):
|
||||
print("Error: --config requires a file path argument")
|
||||
sys.exit(1)
|
||||
_raw_config_path = sys.argv[_idx + 1]
|
||||
del sys.argv[_idx:_idx + 2]
|
||||
if os.path.isabs(_raw_config_path):
|
||||
set_dashboard_config_path_override(_raw_config_path)
|
||||
else:
|
||||
set_dashboard_config_path_override(os.path.join(get_config_path(), _raw_config_path))
|
||||
|
||||
# --- Progress Bar Configuration ---
|
||||
# NOTE: BAR_N_FMT_WIDTH, BAR_TOTAL_FMT_WIDTH, BAR_TIME_WIDTH, BAR_RATE_WIDTH
|
||||
# are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH)
|
||||
@@ -205,10 +186,8 @@ def new_token():
|
||||
finally:
|
||||
if attempt < ERROR_MAX_RETRY - 1:
|
||||
sleep(WAIT_BEFORE_RETRY)
|
||||
# Refresh token exhausted — attempt full re-login with stored credentials
|
||||
logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
|
||||
_do_login(_stored_username, _stored_password)
|
||||
logging.info("Re-login successful. New tokens acquired.")
|
||||
logging.critical("Persistent error in refresh_token")
|
||||
raise httpx.RequestError(message="Persistent error in refresh_token")
|
||||
|
||||
|
||||
def api_call_with_retry(func):
|
||||
@@ -233,10 +212,7 @@ def api_call_with_retry(func):
|
||||
|
||||
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
|
||||
logging.info(f"Token expired for {func_name}. Refreshing token.")
|
||||
try:
|
||||
new_token()
|
||||
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
|
||||
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
|
||||
new_token()
|
||||
|
||||
if attempt < ERROR_MAX_RETRY - 1:
|
||||
sleep(WAIT_BEFORE_RETRY)
|
||||
@@ -249,41 +225,32 @@ def api_call_with_retry(func):
|
||||
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
|
||||
break # Exit for loop to restart batch in while True
|
||||
else:
|
||||
# All automatic batches exhausted — apply on_retry_exhausted policy
|
||||
# All automatic batches exhausted, ask the user
|
||||
with _user_interaction_lock:
|
||||
if on_retry_exhausted == "ignore":
|
||||
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
|
||||
console.print(f"[red]Exception: {exc}[/red]")
|
||||
|
||||
choice = questionary.select(
|
||||
f"What would you like to do for {func_name}?",
|
||||
choices=[
|
||||
"Retry (try another batch of retries)",
|
||||
"Ignore (return None and continue)",
|
||||
"Stop script (critical error)"
|
||||
]
|
||||
).ask()
|
||||
|
||||
if choice == "Retry (try another batch of retries)":
|
||||
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
|
||||
batch_count = 1 # Reset batch counter for the next interactive round
|
||||
break # Exit for loop to restart batch in while True
|
||||
elif choice == "Ignore (return None and continue)":
|
||||
# Retrieve context if available
|
||||
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
||||
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
||||
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
||||
return None
|
||||
|
||||
elif on_retry_exhausted == "abort":
|
||||
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
|
||||
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
|
||||
|
||||
else: # "ask" — display error then interactive prompt
|
||||
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
|
||||
console.print(f"[red]Exception: {exc}[/red]")
|
||||
|
||||
choice = questionary.select(
|
||||
f"What would you like to do for {func_name}?",
|
||||
choices=[
|
||||
"Retry (try another batch of retries)",
|
||||
"Ignore (return None and continue)",
|
||||
"Stop script (critical error)"
|
||||
]
|
||||
).ask()
|
||||
|
||||
if choice == "Retry (try another batch of retries)":
|
||||
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
|
||||
batch_count = 1 # Reset batch counter for the next interactive round
|
||||
break # Exit for loop to restart batch in while True
|
||||
elif choice == "Ignore (return None and continue)":
|
||||
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
||||
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
||||
return None
|
||||
else:
|
||||
logging.critical(f"User chose to stop script after persistent error in {func_name}.")
|
||||
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
|
||||
else:
|
||||
logging.critical(f"User chose to stop script after persistent error in {func_name}.")
|
||||
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
|
||||
|
||||
return wrapper
|
||||
|
||||
@@ -292,37 +259,8 @@ def api_call_with_retry(func):
|
||||
# BLOCK 3: AUTHENTICATION
|
||||
# ============================================================================
|
||||
|
||||
def _do_login(username, password):
|
||||
"""Performs the two-step authentication (IAM → RC) with the given credentials.
|
||||
Updates global access_token and refresh_token on success.
|
||||
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
|
||||
Must NOT acquire _token_refresh_lock (caller's responsibility).
|
||||
"""
|
||||
global access_token, refresh_token
|
||||
|
||||
client = get_httpx_client()
|
||||
client.base_url = IAM_URL
|
||||
response = client.post(API_AUTH_LOGIN_ENDPOINT,
|
||||
json={"username": username, "password": password},
|
||||
timeout=60)
|
||||
response.raise_for_status()
|
||||
master_token = response.json()["access_token"]
|
||||
user_id = response.json()["userId"]
|
||||
|
||||
client = get_httpx_client()
|
||||
client.base_url = RC_URL
|
||||
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT,
|
||||
headers={"Authorization": f"Bearer {master_token}"},
|
||||
json={"userId": user_id, "clientId": RC_APP_ID,
|
||||
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
|
||||
timeout=20)
|
||||
response.raise_for_status()
|
||||
access_token = response.json()["access_token"]
|
||||
refresh_token = response.json()["refresh_token"]
|
||||
|
||||
|
||||
def login():
|
||||
global _stored_username, _stored_password
|
||||
global access_token, refresh_token
|
||||
|
||||
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
|
||||
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
|
||||
@@ -330,18 +268,42 @@ def login():
|
||||
return "Exit"
|
||||
|
||||
try:
|
||||
_do_login(user_name, password)
|
||||
client = get_httpx_client()
|
||||
client.base_url = IAM_URL
|
||||
response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password},
|
||||
timeout=20)
|
||||
response.raise_for_status()
|
||||
master_token = response.json()["access_token"]
|
||||
user_id = response.json()["userId"]
|
||||
except httpx.RequestError as exc:
|
||||
print(f"Login Error : {exc}")
|
||||
logging.warning(f"Login Error : {exc}")
|
||||
return "Error"
|
||||
except httpx.HTTPStatusError as exc:
|
||||
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||
logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||
logging.warning(
|
||||
f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||
return "Error"
|
||||
|
||||
try:
|
||||
client = get_httpx_client()
|
||||
client.base_url = RC_URL
|
||||
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"},
|
||||
json={"userId": user_id, "clientId": RC_APP_ID,
|
||||
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
|
||||
timeout=20)
|
||||
response.raise_for_status()
|
||||
access_token = response.json()["access_token"]
|
||||
refresh_token = response.json()["refresh_token"]
|
||||
except httpx.RequestError as exc:
|
||||
print(f"Login Error : {exc}")
|
||||
logging.warning(f"Login Error : {exc}")
|
||||
return "Error"
|
||||
except httpx.HTTPStatusError as exc:
|
||||
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||
logging.warning(f"Login Error : {exc}")
|
||||
return "Error"
|
||||
|
||||
_stored_username = user_name
|
||||
_stored_password = password
|
||||
print()
|
||||
print("Login Success")
|
||||
return "Success"
|
||||
@@ -351,100 +313,6 @@ def login():
|
||||
# BLOCK 3B: FILE UTILITIES
|
||||
# ============================================================================
|
||||
|
||||
def ask_on_retry_exhausted():
|
||||
"""Asks the user what to do when all API retry batches are exhausted."""
|
||||
global on_retry_exhausted
|
||||
choice = questionary.select(
|
||||
"On retry exhausted :",
|
||||
choices=[
|
||||
"Ask (interactive prompt)",
|
||||
"Ignore (return None and continue)",
|
||||
"Abort (stop script)"
|
||||
]
|
||||
).ask()
|
||||
if choice is None or choice == "Ask (interactive prompt)":
|
||||
on_retry_exhausted = "ask"
|
||||
elif choice == "Ignore (return None and continue)":
|
||||
on_retry_exhausted = "ignore"
|
||||
else:
|
||||
on_retry_exhausted = "abort"
|
||||
|
||||
|
||||
def ask_fetch_six_month_visit():
|
||||
"""Asks the user whether to fetch 6-month visit data (slow API call, ~5s per patient)."""
|
||||
global fetch_six_month_visit
|
||||
choice = questionary.select(
|
||||
"Fetch 6-month visit progress data? (slow, ~5s per patient) :",
|
||||
choices=[
|
||||
"No (skip, faster execution)",
|
||||
"Yes (fetch 6-month visit data)"
|
||||
]
|
||||
).ask()
|
||||
fetch_six_month_visit = (choice == "Yes (fetch 6-month visit data)")
|
||||
|
||||
|
||||
def wait_for_scheduled_launch():
|
||||
"""Asks the user when to start the processing and waits if needed.
|
||||
Options: Immediately / In X minutes / At HH:MM
|
||||
"""
|
||||
choice = questionary.select(
|
||||
"When to start processing ?",
|
||||
choices=["Immediately", "In X minutes", "At HH:MM"]
|
||||
).ask()
|
||||
|
||||
if choice is None or choice == "Immediately":
|
||||
return
|
||||
|
||||
if choice == "In X minutes":
|
||||
minutes_str = questionary.text(
|
||||
"Number of minutes :",
|
||||
validate=lambda x: x.isdigit() and int(x) > 0
|
||||
).ask()
|
||||
if not minutes_str:
|
||||
return
|
||||
target_time = datetime.now() + timedelta(minutes=int(minutes_str))
|
||||
|
||||
else: # "At HH:MM"
|
||||
time_str = questionary.text(
|
||||
"Start time (HH:MM) :",
|
||||
validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and
|
||||
0 <= int(x.split(':')[0]) <= 23 and
|
||||
0 <= int(x.split(':')[1]) <= 59
|
||||
).ask()
|
||||
if not time_str:
|
||||
return
|
||||
now = datetime.now()
|
||||
h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
|
||||
target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
|
||||
if target_time <= now:
|
||||
console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
|
||||
return
|
||||
|
||||
print()
|
||||
try:
|
||||
while True:
|
||||
remaining = target_time - datetime.now()
|
||||
if remaining.total_seconds() <= 0:
|
||||
break
|
||||
total_secs = int(remaining.total_seconds())
|
||||
h = total_secs // 3600
|
||||
m = (total_secs % 3600) // 60
|
||||
s = total_secs % 60
|
||||
target_str = target_time.strftime('%H:%M:%S')
|
||||
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
|
||||
end="", flush=True)
|
||||
sleep(1)
|
||||
# Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts
|
||||
while msvcrt.kbhit():
|
||||
msvcrt.getwch()
|
||||
print()
|
||||
console.print("[green]✓ Starting processing.[/green]")
|
||||
except KeyboardInterrupt:
|
||||
print()
|
||||
console.print("[bold red]Launch cancelled by user.[/bold red]")
|
||||
raise SystemExit(0)
|
||||
|
||||
|
||||
def load_json_file(filename):
|
||||
"""
|
||||
Load a JSON file from disk.
|
||||
@@ -472,7 +340,7 @@ def load_json_file(filename):
|
||||
def load_inclusions_mapping_config():
|
||||
"""Loads and validates the inclusions mapping configuration from the Excel file."""
|
||||
global inclusions_mapping_config
|
||||
config_path = get_dashboard_config_path(DASHBOARD_CONFIG_FILE_NAME)
|
||||
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
|
||||
|
||||
try:
|
||||
# Load with data_only=True to read calculated values instead of formulas
|
||||
@@ -574,7 +442,7 @@ def load_inclusions_mapping_config():
|
||||
def load_organizations_mapping_config():
|
||||
"""Loads and validates the organizations mapping configuration from the Excel file."""
|
||||
global organizations_mapping_config
|
||||
config_path = get_dashboard_config_path(DASHBOARD_CONFIG_FILE_NAME)
|
||||
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
|
||||
|
||||
try:
|
||||
# Load with data_only=True to read calculated values instead of formulas
|
||||
@@ -632,12 +500,6 @@ def _find_questionnaire_by_id(qcm_dict, qcm_id):
|
||||
if not isinstance(qcm_dict, dict):
|
||||
return None
|
||||
qcm_data = qcm_dict.get(qcm_id)
|
||||
if qcm_data and qcm_data.get("_count", 1) > 1:
|
||||
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
||||
logging.error(
|
||||
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
|
||||
f"Questionnaire id='{qcm_id}' appeared {qcm_data['_count']} times in API response — using last received copy"
|
||||
)
|
||||
return qcm_data.get("answers") if qcm_data else None
|
||||
|
||||
|
||||
@@ -645,32 +507,20 @@ def _find_questionnaire_by_name(qcm_dict, name):
|
||||
"""Finds a questionnaire by name (sequential search, returns first match)."""
|
||||
if not isinstance(qcm_dict, dict):
|
||||
return None
|
||||
matches = [qcm for qcm in qcm_dict.values()
|
||||
if get_nested_value(qcm, ["questionnaire", "name"]) == name]
|
||||
if len(matches) > 1:
|
||||
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
||||
ids = [get_nested_value(q, ["questionnaire", "id"]) for q in matches]
|
||||
logging.error(
|
||||
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
|
||||
f"Questionnaire name='{name}' matches {len(matches)} entries (ids: {ids}) — returning first match"
|
||||
)
|
||||
return matches[0].get("answers") if matches else None
|
||||
for qcm in qcm_dict.values():
|
||||
if get_nested_value(qcm, ["questionnaire", "name"]) == name:
|
||||
return qcm.get("answers")
|
||||
return None
|
||||
|
||||
|
||||
def _find_questionnaire_by_category(qcm_dict, category):
|
||||
"""Finds a questionnaire by category (sequential search, returns first match)."""
|
||||
if not isinstance(qcm_dict, dict):
|
||||
return None
|
||||
matches = [qcm for qcm in qcm_dict.values()
|
||||
if get_nested_value(qcm, ["questionnaire", "category"]) == category]
|
||||
if len(matches) > 1:
|
||||
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
||||
ids = [get_nested_value(q, ["questionnaire", "id"]) for q in matches]
|
||||
logging.error(
|
||||
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
|
||||
f"Questionnaire category='{category}' matches {len(matches)} entries (ids: {ids}) — returning first match"
|
||||
)
|
||||
return matches[0].get("answers") if matches else None
|
||||
for qcm in qcm_dict.values():
|
||||
if get_nested_value(qcm, ["questionnaire", "category"]) == category:
|
||||
return qcm.get("answers")
|
||||
return None
|
||||
|
||||
|
||||
def _get_field_value_from_questionnaire(all_questionnaires, field_config):
|
||||
@@ -707,13 +557,6 @@ def get_value_from_inclusion(inclusion_dict, key):
|
||||
|
||||
def _execute_custom_function(function_name, args, output_inclusion):
|
||||
"""Executes a custom function for a calculated field."""
|
||||
|
||||
def dominant_no_value(has_undef, has_na):
|
||||
"""Returns the dominant sentinel: 'undefined' > 'N/A' > None (real value present)."""
|
||||
if has_undef: return "undefined"
|
||||
if has_na: return "N/A"
|
||||
return None
|
||||
|
||||
if function_name == "search_in_fields_using_regex":
|
||||
if not args or len(args) < 2:
|
||||
return "$$$$ Argument Error: search_in_fields_using_regex requires at least 2 arguments"
|
||||
@@ -722,19 +565,16 @@ def _execute_custom_function(function_name, args, output_inclusion):
|
||||
field_names = args[1:]
|
||||
|
||||
field_values = []
|
||||
has_undefined = False
|
||||
has_na = False
|
||||
all_undefined = True
|
||||
|
||||
for field_name in field_names:
|
||||
value = get_value_from_inclusion(output_inclusion, field_name)
|
||||
field_values.append(value)
|
||||
if value is None or value == "undefined":
|
||||
has_undefined = True
|
||||
elif value == "N/A":
|
||||
has_na = True
|
||||
if value is not None and value != "undefined":
|
||||
all_undefined = False
|
||||
|
||||
if not any(v not in (None, "undefined", "N/A") for v in field_values):
|
||||
return dominant_no_value(has_undefined, has_na)
|
||||
if all_undefined:
|
||||
return "undefined"
|
||||
|
||||
try:
|
||||
for value in field_values:
|
||||
@@ -755,8 +595,6 @@ def _execute_custom_function(function_name, args, output_inclusion):
|
||||
|
||||
if value is None or value == "undefined":
|
||||
return "undefined"
|
||||
if value == "N/A":
|
||||
return "N/A"
|
||||
|
||||
match = re.search(r'\((.*?)\)', str(value))
|
||||
return match.group(1) if match else "undefined"
|
||||
@@ -770,8 +608,6 @@ def _execute_custom_function(function_name, args, output_inclusion):
|
||||
|
||||
if status is None or status == "undefined":
|
||||
return "undefined"
|
||||
if status == "N/A":
|
||||
return "N/A"
|
||||
|
||||
if not isinstance(is_terminated, bool) or not is_terminated:
|
||||
return status
|
||||
@@ -781,8 +617,7 @@ def _execute_custom_function(function_name, args, output_inclusion):
|
||||
elif function_name == "if_then_else":
|
||||
# Unified conditional function
|
||||
# Syntax: ["operator", arg1, arg2_optional, result_if_true, result_if_false]
|
||||
# Operators: "is_true", "is_false", "all_true", "any_true", "is_defined", "is_undefined", "all_defined", "==", "!=", ">", ">=", "<", "<="
|
||||
# Sentinel propagation: "undefined" (unknown) > "N/A" (not applicable) > real value.
|
||||
# Operators: "is_true", "is_false", "all_true", "is_defined", "is_undefined", "all_defined", "==", "!="
|
||||
|
||||
if not args or len(args) < 4:
|
||||
return "$$$$ Argument Error: if_then_else requires at least 4 arguments"
|
||||
@@ -810,8 +645,6 @@ def _execute_custom_function(function_name, args, output_inclusion):
|
||||
value = resolve_value(args[1])
|
||||
if value is None or value == "undefined":
|
||||
return "undefined"
|
||||
if value == "N/A":
|
||||
return "N/A"
|
||||
condition = (value is True)
|
||||
result_if_true = resolve_value(args[2])
|
||||
result_if_false = resolve_value(args[3])
|
||||
@@ -822,8 +655,6 @@ def _execute_custom_function(function_name, args, output_inclusion):
|
||||
value = resolve_value(args[1])
|
||||
if value is None or value == "undefined":
|
||||
return "undefined"
|
||||
if value == "N/A":
|
||||
return "N/A"
|
||||
condition = (value is False)
|
||||
result_if_true = resolve_value(args[2])
|
||||
result_if_false = resolve_value(args[3])
|
||||
@@ -835,58 +666,22 @@ def _execute_custom_function(function_name, args, output_inclusion):
|
||||
if not isinstance(fields_arg, list):
|
||||
return "$$$$ Argument Error: all_true requires arg1 to be a list of field names"
|
||||
|
||||
has_undefined = False
|
||||
has_na = False
|
||||
conditions = []
|
||||
for field_name in fields_arg:
|
||||
field_value = get_value_from_inclusion(output_inclusion, field_name)
|
||||
if field_value is None or field_value == "undefined":
|
||||
has_undefined = True
|
||||
elif field_value == "N/A":
|
||||
has_na = True
|
||||
else:
|
||||
conditions.append(field_value)
|
||||
|
||||
status = dominant_no_value(has_undefined, has_na)
|
||||
if status:
|
||||
return status
|
||||
return "undefined"
|
||||
conditions.append(field_value)
|
||||
|
||||
condition = all(conditions)
|
||||
result_if_true = resolve_value(args[2])
|
||||
result_if_false = resolve_value(args[3])
|
||||
|
||||
elif operator == "any_true":
|
||||
# OR semantics: returns "undefined" only if ALL operands are undefined
|
||||
if len(args) != 4:
|
||||
return "$$$$ Argument Error: any_true requires 4 arguments"
|
||||
fields_arg = args[1]
|
||||
if not isinstance(fields_arg, list):
|
||||
return "$$$$ Argument Error: any_true requires arg1 to be a list of field names"
|
||||
|
||||
has_undefined = False
|
||||
has_na = False
|
||||
resolved = []
|
||||
for field_name in fields_arg:
|
||||
field_value = get_value_from_inclusion(output_inclusion, field_name)
|
||||
if field_value is None or field_value == "undefined":
|
||||
has_undefined = True
|
||||
elif field_value == "N/A":
|
||||
has_na = True
|
||||
else:
|
||||
resolved.append(field_value)
|
||||
|
||||
if not resolved:
|
||||
return dominant_no_value(has_undefined, has_na)
|
||||
|
||||
condition = any(resolved)
|
||||
result_if_true = resolve_value(args[2])
|
||||
result_if_false = resolve_value(args[3])
|
||||
|
||||
elif operator == "is_defined":
|
||||
if len(args) != 4:
|
||||
return "$$$$ Argument Error: is_defined requires 4 arguments"
|
||||
value = resolve_value(args[1])
|
||||
condition = (value is not None and value != "undefined" and value != "N/A")
|
||||
condition = (value is not None and value != "undefined")
|
||||
result_if_true = resolve_value(args[2])
|
||||
result_if_false = resolve_value(args[3])
|
||||
|
||||
@@ -907,7 +702,7 @@ def _execute_custom_function(function_name, args, output_inclusion):
|
||||
|
||||
for field_name in fields_arg:
|
||||
field_value = get_value_from_inclusion(output_inclusion, field_name)
|
||||
if field_value is None or field_value == "undefined" or field_value == "N/A":
|
||||
if field_value is None or field_value == "undefined":
|
||||
condition = False
|
||||
break
|
||||
else:
|
||||
@@ -922,12 +717,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
|
||||
value1 = resolve_value(args[1])
|
||||
value2 = resolve_value(args[2])
|
||||
|
||||
v1_undef = value1 is None or value1 == "undefined"
|
||||
v2_undef = value2 is None or value2 == "undefined"
|
||||
status = dominant_no_value(v1_undef or v2_undef,
|
||||
value1 == "N/A" or value2 == "N/A")
|
||||
if status:
|
||||
return status
|
||||
if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined":
|
||||
return "undefined"
|
||||
|
||||
condition = (value1 == value2)
|
||||
result_if_true = resolve_value(args[3])
|
||||
@@ -939,58 +730,13 @@ def _execute_custom_function(function_name, args, output_inclusion):
|
||||
value1 = resolve_value(args[1])
|
||||
value2 = resolve_value(args[2])
|
||||
|
||||
v1_undef = value1 is None or value1 == "undefined"
|
||||
v2_undef = value2 is None or value2 == "undefined"
|
||||
status = dominant_no_value(v1_undef or v2_undef,
|
||||
value1 == "N/A" or value2 == "N/A")
|
||||
if status:
|
||||
return status
|
||||
if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined":
|
||||
return "undefined"
|
||||
|
||||
condition = (value1 != value2)
|
||||
result_if_true = resolve_value(args[3])
|
||||
result_if_false = resolve_value(args[4])
|
||||
|
||||
elif operator in (">", ">=", "<", "<="):
|
||||
if len(args) != 5:
|
||||
return f"$$$$ Argument Error: {operator} requires 5 arguments"
|
||||
value1 = resolve_value(args[1])
|
||||
value2 = resolve_value(args[2])
|
||||
|
||||
v1_undef = value1 is None or value1 == "undefined"
|
||||
v2_undef = value2 is None or value2 == "undefined"
|
||||
status = dominant_no_value(v1_undef or v2_undef,
|
||||
value1 == "N/A" or value2 == "N/A")
|
||||
if status:
|
||||
return status
|
||||
|
||||
# Si l'un est numérique, tenter de convertir l'autre ; sinon comparer en string
|
||||
cmp1, cmp2 = value1, value2
|
||||
if isinstance(value1, (int, float)) and not isinstance(value2, (int, float)):
|
||||
try:
|
||||
cmp2 = float(value2)
|
||||
except (ValueError, TypeError):
|
||||
cmp1 = str(value1)
|
||||
elif isinstance(value2, (int, float)) and not isinstance(value1, (int, float)):
|
||||
try:
|
||||
cmp1 = float(value1)
|
||||
except (ValueError, TypeError):
|
||||
cmp2 = str(value2)
|
||||
|
||||
try:
|
||||
if operator == ">":
|
||||
condition = (cmp1 > cmp2)
|
||||
elif operator == ">=":
|
||||
condition = (cmp1 >= cmp2)
|
||||
elif operator == "<":
|
||||
condition = (cmp1 < cmp2)
|
||||
else:
|
||||
condition = (cmp1 <= cmp2)
|
||||
except TypeError:
|
||||
return f"$$$$ Comparison Error: cannot compare {type(cmp1).__name__} and {type(cmp2).__name__}"
|
||||
|
||||
result_if_true = resolve_value(args[3])
|
||||
result_if_false = resolve_value(args[4])
|
||||
|
||||
else:
|
||||
return f"$$$$ Unknown Operator: {operator}"
|
||||
|
||||
@@ -1013,8 +759,6 @@ def process_inclusions_mapping(output_inclusion, inclusion_data, record_data, re
|
||||
|
||||
if condition_value is None or condition_value == "undefined":
|
||||
final_value = "undefined"
|
||||
elif condition_value == "N/A":
|
||||
final_value = "N/A"
|
||||
elif not isinstance(condition_value, bool):
|
||||
final_value = "$$$$ Condition Field Error"
|
||||
elif not condition_value:
|
||||
@@ -1218,13 +962,6 @@ def get_all_questionnaires_by_patient(patient_id, record_data):
|
||||
response.raise_for_status()
|
||||
response_data = response.json()
|
||||
|
||||
# First pass: count occurrences of each q_id to detect duplicates at lookup time
|
||||
q_id_counts = {}
|
||||
for item in response_data:
|
||||
q_id = get_nested_value(item, path=["questionnaire", "id"])
|
||||
if q_id:
|
||||
q_id_counts[q_id] = q_id_counts.get(q_id, 0) + 1
|
||||
|
||||
# Build dictionary with questionnaire metadata for searching
|
||||
results = {}
|
||||
for item in response_data:
|
||||
@@ -1232,17 +969,6 @@ def get_all_questionnaires_by_patient(patient_id, record_data):
|
||||
q_name = get_nested_value(item, path=["questionnaire", "name"])
|
||||
q_category = get_nested_value(item, path=["questionnaire", "category"])
|
||||
answers = get_nested_value(item, path=["answers"], default={})
|
||||
|
||||
# ── UNLINKED PATIENT FILTER ─────────────────────────────────────────────
|
||||
# If answers.patient is missing, null, or does not match the requested
|
||||
# patient_id, the questionnaire is considered unlinked → clear answers
|
||||
# to prevent use of orphaned data.
|
||||
# (disable: comment out the block below)
|
||||
answers_patient = answers.get("subject") if isinstance(answers, dict) else None
|
||||
if not answers_patient or answers_patient != patient_id:
|
||||
answers = {}
|
||||
# ────────────────────────────────────────────────────────────────────────
|
||||
|
||||
if q_id:
|
||||
results[q_id] = {
|
||||
"questionnaire": {
|
||||
@@ -1250,8 +976,7 @@ def get_all_questionnaires_by_patient(patient_id, record_data):
|
||||
"name": q_name,
|
||||
"category": q_category
|
||||
},
|
||||
"answers": answers,
|
||||
"_count": q_id_counts[q_id]
|
||||
"answers": answers
|
||||
}
|
||||
return results
|
||||
|
||||
@@ -1410,12 +1135,8 @@ def _process_inclusion_data(inclusion, organization):
|
||||
output_inclusion = {}
|
||||
|
||||
# --- Prepare all data sources ---
|
||||
# 1. Launch Visit Search asynchronously (it's slow, ~5s) — only if enabled by user
|
||||
# We use run_with_context to pass the patient identity to the new thread
|
||||
if fetch_six_month_visit:
|
||||
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
|
||||
else:
|
||||
visit_future = None
|
||||
# 1. 6-month visit loading disabled on this branch (No-6-Month-Visit)
|
||||
# visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
|
||||
|
||||
# 2. Prepare inclusion_data: enrich inclusion with organization info
|
||||
inclusion_data = dict(inclusion)
|
||||
@@ -1439,11 +1160,8 @@ def _process_inclusion_data(inclusion, organization):
|
||||
logging.error(f"Error fetching request data for patient {patient_id}: {e}")
|
||||
request_data = None
|
||||
|
||||
try:
|
||||
six_month_visit_data = visit_future.result() if visit_future is not None else {}
|
||||
except Exception as e:
|
||||
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
|
||||
six_month_visit_data = None
|
||||
# 6-month visit loading disabled on this branch (No-6-Month-Visit)
|
||||
six_month_visit_data = None
|
||||
|
||||
# --- Process all fields from configuration ---
|
||||
process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data)
|
||||
@@ -1510,19 +1228,10 @@ def main():
|
||||
if login_status == "Exit":
|
||||
return
|
||||
|
||||
print()
|
||||
ask_fetch_six_month_visit()
|
||||
|
||||
print()
|
||||
number_of_threads = int((questionary.text("Number of threads :", default="12",
|
||||
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
|
||||
|
||||
print()
|
||||
ask_on_retry_exhausted()
|
||||
|
||||
print()
|
||||
wait_for_scheduled_launch()
|
||||
|
||||
print()
|
||||
load_inclusions_mapping_config()
|
||||
load_organizations_mapping_config()
|
||||
@@ -1581,14 +1290,6 @@ def main():
|
||||
inclusions_total_count = sum(org.get('patients_count', 0) for org in organizations_list)
|
||||
organizations_list.sort(key=lambda org: (-org.get('patients_count', 0), org.get('name', '')))
|
||||
|
||||
# ╔══════════════════════════════════════════════════════════════════╗
|
||||
# ║ TEST INSTRUMENTATION — REMOVE BEFORE PRODUCTION ║
|
||||
# ║ Limits the table to organizations at ranks 33 and 34 ║
|
||||
# ║ (indices 32 and 33, after descending sort by patients_count) ║
|
||||
# ╚══════════════════════════════════════════════════════════════════╝
|
||||
# organizations_list = [org for i, org in enumerate(organizations_list) if i in (32, 33)]
|
||||
# ══════════════════════════════════════════════════════════════════
|
||||
|
||||
number_of_organizations = len(organizations_list)
|
||||
print(f"{inclusions_total_count} Inclusions in {number_of_organizations} Organizations...")
|
||||
print()
|
||||
|
||||
@@ -34,7 +34,7 @@ try:
|
||||
except ImportError:
|
||||
xw = None
|
||||
|
||||
from eb_dashboard_utils import get_nested_value, get_config_path, get_dashboard_config_path
|
||||
from eb_dashboard_utils import get_nested_value, get_config_path
|
||||
from eb_dashboard_constants import (
|
||||
INCLUSIONS_FILE_NAME,
|
||||
ORGANIZATIONS_FILE_NAME,
|
||||
@@ -117,7 +117,7 @@ def load_excel_export_config(console_instance=None):
|
||||
if console_instance:
|
||||
console = console_instance
|
||||
|
||||
config_path = get_dashboard_config_path(DASHBOARD_CONFIG_FILE_NAME)
|
||||
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
|
||||
error_messages = []
|
||||
|
||||
try:
|
||||
@@ -754,11 +754,7 @@ def _apply_value_replacement(value, replacements):
|
||||
return value
|
||||
|
||||
for value_before, value_after in replacements:
|
||||
# bool is a subclass of int in Python (True == 1, False == 0):
|
||||
# reject the match if one side is bool and the other is not.
|
||||
if isinstance(value, bool) != isinstance(value_before, bool):
|
||||
continue
|
||||
if value == value_before:
|
||||
if value == value_before: # Strict equality
|
||||
return value_after
|
||||
|
||||
return value # No match, return original
|
||||
|
||||
@@ -17,7 +17,7 @@ import shutil
|
||||
|
||||
import openpyxl
|
||||
from rich.console import Console
|
||||
from eb_dashboard_utils import get_nested_value, get_old_filename as _get_old_filename, get_config_path, get_dashboard_config_path
|
||||
from eb_dashboard_utils import get_nested_value, get_old_filename as _get_old_filename, get_config_path
|
||||
from eb_dashboard_constants import (
|
||||
INCLUSIONS_FILE_NAME,
|
||||
ORGANIZATIONS_FILE_NAME,
|
||||
@@ -93,7 +93,7 @@ def load_regression_check_config(console_instance=None):
|
||||
if console_instance:
|
||||
console = console_instance
|
||||
|
||||
config_path = get_dashboard_config_path(DASHBOARD_CONFIG_FILE_NAME)
|
||||
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
|
||||
|
||||
try:
|
||||
workbook = openpyxl.load_workbook(config_path)
|
||||
@@ -431,11 +431,6 @@ def coherence_check(output_inclusions, organizations_list):
|
||||
continue
|
||||
|
||||
patients += 1
|
||||
|
||||
# API statistics exclude non-consented from status sub-counters only
|
||||
if get_nested_value(inclusion, ["Inclusion", "Consent_Signed"]) is not True:
|
||||
continue
|
||||
|
||||
status = get_nested_value(inclusion, ["Inclusion", "Inclusion_Status"], default="")
|
||||
|
||||
if isinstance(status, str):
|
||||
|
||||
@@ -204,22 +204,6 @@ def get_config_path():
|
||||
return CONFIG_FOLDER_NAME
|
||||
|
||||
|
||||
_dashboard_config_path_override = None
|
||||
|
||||
|
||||
def set_dashboard_config_path_override(path):
|
||||
"""Sets a global override for the dashboard config file path (used by --config CLI arg)."""
|
||||
global _dashboard_config_path_override
|
||||
_dashboard_config_path_override = path
|
||||
|
||||
|
||||
def get_dashboard_config_path(config_file_name):
|
||||
"""Returns the dashboard config file path, respecting any --config CLI override."""
|
||||
if _dashboard_config_path_override:
|
||||
return _dashboard_config_path_override
|
||||
return os.path.join(get_config_path(), config_file_name)
|
||||
|
||||
|
||||
def get_old_filename(current_filename, old_suffix="_old"):
|
||||
"""Generate old backup filename from current filename.
|
||||
|
||||
|
||||
Binary file not shown.
Reference in New Issue
Block a user