8 Commits

Author SHA1 Message Date
a1985741f8 clean 2026-02-09 22:52:17 +01:00
9ff3595a17 . 2026-02-09 22:31:02 +01:00
13e794eb34 ignore logs 2026-02-09 18:43:34 +01:00
e58f867bd0 ignore exe 2026-02-09 18:42:39 +01:00
57540d5159 ignore all jsons 2026-02-09 18:34:51 +01:00
d6943faf59 exclude log from git 2026-02-09 17:16:53 +01:00
8562c45f05 executable 2026-02-09 17:08:59 +01:00
1cc2c754b7 No-6-Month-Visit 2026-02-09 17:01:55 +01:00
9 changed files with 70 additions and 230 deletions

3
.gitignore vendored
View File

@@ -195,10 +195,9 @@ Endobest Reporting/
jsons history/ jsons history/
nul nul
# Ignore all json, exe, log, txt and xlsx files # Ignore all json, exe, log and xlsx files
*.json *.json
*.exe *.exe
*.log *.log
*.txt
/*.xlsx /*.xlsx
!eb_org_center_mapping.xlsx !eb_org_center_mapping.xlsx

Binary file not shown.

Binary file not shown.

View File

@@ -21,7 +21,7 @@
# identification, and support for complex data extraction using JSON path expressions. # identification, and support for complex data extraction using JSON path expressions.
import json import json
import logging import logging
import msvcrt
import os import os
import re import re
import sys import sys
@@ -118,10 +118,6 @@ access_token = ""
refresh_token = "" refresh_token = ""
threads_list = [] threads_list = []
_token_refresh_lock = threading.Lock() _token_refresh_lock = threading.Lock()
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
fetch_six_month_visit = False # Whether to fetch 6-month visit data (slow, ~5s per patient)
_stored_username = "" # Credentials stored at login for automatic re-login
_stored_password = ""
_threads_list_lock = threading.Lock() _threads_list_lock = threading.Lock()
global_pbar = None global_pbar = None
_global_pbar_lock = threading.Lock() _global_pbar_lock = threading.Lock()
@@ -190,10 +186,8 @@ def new_token():
finally: finally:
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
# Refresh token exhausted — attempt full re-login with stored credentials logging.critical("Persistent error in refresh_token")
logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.") raise httpx.RequestError(message="Persistent error in refresh_token")
_do_login(_stored_username, _stored_password)
logging.info("Re-login successful. New tokens acquired.")
def api_call_with_retry(func): def api_call_with_retry(func):
@@ -218,10 +212,7 @@ def api_call_with_retry(func):
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token.") logging.info(f"Token expired for {func_name}. Refreshing token.")
try:
new_token() new_token()
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
@@ -234,18 +225,8 @@ def api_call_with_retry(func):
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES) sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
break # Exit for loop to restart batch in while True break # Exit for loop to restart batch in while True
else: else:
# All automatic batches exhausted — apply on_retry_exhausted policy # All automatic batches exhausted, ask the user
with _user_interaction_lock: with _user_interaction_lock:
if on_retry_exhausted == "ignore":
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None
elif on_retry_exhausted == "abort":
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
else: # "ask" — display error then interactive prompt
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]") console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]") console.print(f"[red]Exception: {exc}[/red]")
@@ -263,6 +244,7 @@ def api_call_with_retry(func):
batch_count = 1 # Reset batch counter for the next interactive round batch_count = 1 # Reset batch counter for the next interactive round
break # Exit for loop to restart batch in while True break # Exit for loop to restart batch in while True
elif choice == "Ignore (return None and continue)": elif choice == "Ignore (return None and continue)":
# Retrieve context if available
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}") logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None return None
@@ -277,37 +259,8 @@ def api_call_with_retry(func):
# BLOCK 3: AUTHENTICATION # BLOCK 3: AUTHENTICATION
# ============================================================================ # ============================================================================
def _do_login(username, password):
"""Performs the two-step authentication (IAM → RC) with the given credentials.
Updates global access_token and refresh_token on success.
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
Must NOT acquire _token_refresh_lock (caller's responsibility).
"""
global access_token, refresh_token
client = get_httpx_client()
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT,
json={"username": username, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT,
headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
def login(): def login():
global _stored_username, _stored_password global access_token, refresh_token
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask()) user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask()) password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
@@ -315,18 +268,42 @@ def login():
return "Exit" return "Exit"
try: try:
_do_login(user_name, password) client = get_httpx_client()
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
except httpx.RequestError as exc: except httpx.RequestError as exc:
print(f"Login Error : {exc}") print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
except httpx.HTTPStatusError as exc: except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") logging.warning(
f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
return "Error"
try:
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
except httpx.RequestError as exc:
print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}")
return "Error"
except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
_stored_username = user_name
_stored_password = password
print() print()
print("Login Success") print("Login Success")
return "Success" return "Success"
@@ -336,100 +313,6 @@ def login():
# BLOCK 3B: FILE UTILITIES # BLOCK 3B: FILE UTILITIES
# ============================================================================ # ============================================================================
def ask_on_retry_exhausted():
"""Asks the user what to do when all API retry batches are exhausted."""
global on_retry_exhausted
choice = questionary.select(
"On retry exhausted :",
choices=[
"Ask (interactive prompt)",
"Ignore (return None and continue)",
"Abort (stop script)"
]
).ask()
if choice is None or choice == "Ask (interactive prompt)":
on_retry_exhausted = "ask"
elif choice == "Ignore (return None and continue)":
on_retry_exhausted = "ignore"
else:
on_retry_exhausted = "abort"
def ask_fetch_six_month_visit():
"""Asks the user whether to fetch 6-month visit data (slow API call, ~5s per patient)."""
global fetch_six_month_visit
choice = questionary.select(
"Fetch 6-month visit progress data? (slow, ~5s per patient) :",
choices=[
"No (skip, faster execution)",
"Yes (fetch 6-month visit data)"
]
).ask()
fetch_six_month_visit = (choice == "Yes (fetch 6-month visit data)")
def wait_for_scheduled_launch():
"""Asks the user when to start the processing and waits if needed.
Options: Immediately / In X minutes / At HH:MM
"""
choice = questionary.select(
"When to start processing ?",
choices=["Immediately", "In X minutes", "At HH:MM"]
).ask()
if choice is None or choice == "Immediately":
return
if choice == "In X minutes":
minutes_str = questionary.text(
"Number of minutes :",
validate=lambda x: x.isdigit() and int(x) > 0
).ask()
if not minutes_str:
return
target_time = datetime.now() + timedelta(minutes=int(minutes_str))
else: # "At HH:MM"
time_str = questionary.text(
"Start time (HH:MM) :",
validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and
0 <= int(x.split(':')[0]) <= 23 and
0 <= int(x.split(':')[1]) <= 59
).ask()
if not time_str:
return
now = datetime.now()
h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
if target_time <= now:
console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
return
print()
try:
while True:
remaining = target_time - datetime.now()
if remaining.total_seconds() <= 0:
break
total_secs = int(remaining.total_seconds())
h = total_secs // 3600
m = (total_secs % 3600) // 60
s = total_secs % 60
target_str = target_time.strftime('%H:%M:%S')
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
end="", flush=True)
sleep(1)
# Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts
while msvcrt.kbhit():
msvcrt.getwch()
print()
console.print("[green]✓ Starting processing.[/green]")
except KeyboardInterrupt:
print()
console.print("[bold red]Launch cancelled by user.[/bold red]")
raise SystemExit(0)
def load_json_file(filename): def load_json_file(filename):
""" """
Load a JSON file from disk. Load a JSON file from disk.
@@ -617,12 +500,6 @@ def _find_questionnaire_by_id(qcm_dict, qcm_id):
if not isinstance(qcm_dict, dict): if not isinstance(qcm_dict, dict):
return None return None
qcm_data = qcm_dict.get(qcm_id) qcm_data = qcm_dict.get(qcm_id)
if qcm_data and qcm_data.get("_count", 1) > 1:
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.error(
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
f"Questionnaire id='{qcm_id}' appeared {qcm_data['_count']} times in API response — using last received copy"
)
return qcm_data.get("answers") if qcm_data else None return qcm_data.get("answers") if qcm_data else None
@@ -630,32 +507,20 @@ def _find_questionnaire_by_name(qcm_dict, name):
"""Finds a questionnaire by name (sequential search, returns first match).""" """Finds a questionnaire by name (sequential search, returns first match)."""
if not isinstance(qcm_dict, dict): if not isinstance(qcm_dict, dict):
return None return None
matches = [qcm for qcm in qcm_dict.values() for qcm in qcm_dict.values():
if get_nested_value(qcm, ["questionnaire", "name"]) == name] if get_nested_value(qcm, ["questionnaire", "name"]) == name:
if len(matches) > 1: return qcm.get("answers")
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) return None
ids = [get_nested_value(q, ["questionnaire", "id"]) for q in matches]
logging.error(
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
f"Questionnaire name='{name}' matches {len(matches)} entries (ids: {ids}) — returning first match"
)
return matches[0].get("answers") if matches else None
def _find_questionnaire_by_category(qcm_dict, category): def _find_questionnaire_by_category(qcm_dict, category):
"""Finds a questionnaire by category (sequential search, returns first match).""" """Finds a questionnaire by category (sequential search, returns first match)."""
if not isinstance(qcm_dict, dict): if not isinstance(qcm_dict, dict):
return None return None
matches = [qcm for qcm in qcm_dict.values() for qcm in qcm_dict.values():
if get_nested_value(qcm, ["questionnaire", "category"]) == category] if get_nested_value(qcm, ["questionnaire", "category"]) == category:
if len(matches) > 1: return qcm.get("answers")
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) return None
ids = [get_nested_value(q, ["questionnaire", "id"]) for q in matches]
logging.error(
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
f"Questionnaire category='{category}' matches {len(matches)} entries (ids: {ids}) — returning first match"
)
return matches[0].get("answers") if matches else None
def _get_field_value_from_questionnaire(all_questionnaires, field_config): def _get_field_value_from_questionnaire(all_questionnaires, field_config):
@@ -1097,13 +962,6 @@ def get_all_questionnaires_by_patient(patient_id, record_data):
response.raise_for_status() response.raise_for_status()
response_data = response.json() response_data = response.json()
# First pass: count occurrences of each q_id to detect duplicates at lookup time
q_id_counts = {}
for item in response_data:
q_id = get_nested_value(item, path=["questionnaire", "id"])
if q_id:
q_id_counts[q_id] = q_id_counts.get(q_id, 0) + 1
# Build dictionary with questionnaire metadata for searching # Build dictionary with questionnaire metadata for searching
results = {} results = {}
for item in response_data: for item in response_data:
@@ -1118,8 +976,7 @@ def get_all_questionnaires_by_patient(patient_id, record_data):
"name": q_name, "name": q_name,
"category": q_category "category": q_category
}, },
"answers": answers, "answers": answers
"_count": q_id_counts[q_id]
} }
return results return results
@@ -1278,12 +1135,8 @@ def _process_inclusion_data(inclusion, organization):
output_inclusion = {} output_inclusion = {}
# --- Prepare all data sources --- # --- Prepare all data sources ---
# 1. Launch Visit Search asynchronously (it's slow, ~5s) — only if enabled by user # 1. 6-month visit loading disabled on this branch (No-6-Month-Visit)
# We use run_with_context to pass the patient identity to the new thread # visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
if fetch_six_month_visit:
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
else:
visit_future = None
# 2. Prepare inclusion_data: enrich inclusion with organization info # 2. Prepare inclusion_data: enrich inclusion with organization info
inclusion_data = dict(inclusion) inclusion_data = dict(inclusion)
@@ -1307,10 +1160,7 @@ def _process_inclusion_data(inclusion, organization):
logging.error(f"Error fetching request data for patient {patient_id}: {e}") logging.error(f"Error fetching request data for patient {patient_id}: {e}")
request_data = None request_data = None
try: # 6-month visit loading disabled on this branch (No-6-Month-Visit)
six_month_visit_data = visit_future.result() if visit_future is not None else {}
except Exception as e:
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
six_month_visit_data = None six_month_visit_data = None
# --- Process all fields from configuration --- # --- Process all fields from configuration ---
@@ -1378,19 +1228,10 @@ def main():
if login_status == "Exit": if login_status == "Exit":
return return
print()
ask_fetch_six_month_visit()
print() print()
number_of_threads = int((questionary.text("Number of threads :", default="12", number_of_threads = int((questionary.text("Number of threads :", default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask())) validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
print()
ask_on_retry_exhausted()
print()
wait_for_scheduled_launch()
print() print()
load_inclusions_mapping_config() load_inclusions_mapping_config()
load_organizations_mapping_config() load_organizations_mapping_config()

Binary file not shown.