Compare commits

..

14 Commits

7 changed files with 173 additions and 60 deletions

Binary file not shown.

View File

@@ -21,7 +21,7 @@
# identification, and support for complex data extraction using JSON path expressions. # identification, and support for complex data extraction using JSON path expressions.
import json import json
import logging import logging
import msvcrt
import os import os
import re import re
import sys import sys
@@ -118,6 +118,9 @@ access_token = ""
refresh_token = "" refresh_token = ""
threads_list = [] threads_list = []
_token_refresh_lock = threading.Lock() _token_refresh_lock = threading.Lock()
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
_stored_username = "" # Credentials stored at login for automatic re-login
_stored_password = ""
_threads_list_lock = threading.Lock() _threads_list_lock = threading.Lock()
global_pbar = None global_pbar = None
_global_pbar_lock = threading.Lock() _global_pbar_lock = threading.Lock()
@@ -186,8 +189,10 @@ def new_token():
finally: finally:
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
logging.critical("Persistent error in refresh_token") # Refresh token exhausted — attempt full re-login with stored credentials
raise httpx.RequestError(message="Persistent error in refresh_token") logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
_do_login(_stored_username, _stored_password)
logging.info("Re-login successful. New tokens acquired.")
def api_call_with_retry(func): def api_call_with_retry(func):
@@ -212,7 +217,10 @@ def api_call_with_retry(func):
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token.") logging.info(f"Token expired for {func_name}. Refreshing token.")
new_token() try:
new_token()
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
@@ -225,32 +233,41 @@ def api_call_with_retry(func):
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES) sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
break # Exit for loop to restart batch in while True break # Exit for loop to restart batch in while True
else: else:
# All automatic batches exhausted, ask the user # All automatic batches exhausted — apply on_retry_exhausted policy
with _user_interaction_lock: with _user_interaction_lock:
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]") if on_retry_exhausted == "ignore":
console.print(f"[red]Exception: {exc}[/red]")
choice = questionary.select(
f"What would you like to do for {func_name}?",
choices=[
"Retry (try another batch of retries)",
"Ignore (return None and continue)",
"Stop script (critical error)"
]
).ask()
if choice == "Retry (try another batch of retries)":
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
batch_count = 1 # Reset batch counter for the next interactive round
break # Exit for loop to restart batch in while True
elif choice == "Ignore (return None and continue)":
# Retrieve context if available
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}") logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None return None
else:
logging.critical(f"User chose to stop script after persistent error in {func_name}.") elif on_retry_exhausted == "abort":
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)") logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
else: # "ask" — display error then interactive prompt
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]")
choice = questionary.select(
f"What would you like to do for {func_name}?",
choices=[
"Retry (try another batch of retries)",
"Ignore (return None and continue)",
"Stop script (critical error)"
]
).ask()
if choice == "Retry (try another batch of retries)":
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
batch_count = 1 # Reset batch counter for the next interactive round
break # Exit for loop to restart batch in while True
elif choice == "Ignore (return None and continue)":
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None
else:
logging.critical(f"User chose to stop script after persistent error in {func_name}.")
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
return wrapper return wrapper
@@ -259,51 +276,56 @@ def api_call_with_retry(func):
# BLOCK 3: AUTHENTICATION # BLOCK 3: AUTHENTICATION
# ============================================================================ # ============================================================================
def login(): def _do_login(username, password):
"""Performs the two-step authentication (IAM → RC) with the given credentials.
Updates global access_token and refresh_token on success.
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
Must NOT acquire _token_refresh_lock (caller's responsibility).
"""
global access_token, refresh_token global access_token, refresh_token
client = get_httpx_client()
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT,
json={"username": username, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT,
headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
def login():
global _stored_username, _stored_password
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask()) user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask()) password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
if not (user_name and password): if not (user_name and password):
return "Exit" return "Exit"
try: try:
client = get_httpx_client() _do_login(user_name, password)
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
except httpx.RequestError as exc: except httpx.RequestError as exc:
print(f"Login Error : {exc}") print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
except httpx.HTTPStatusError as exc: except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning( logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
return "Error"
try:
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
except httpx.RequestError as exc:
print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}")
return "Error"
except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
_stored_username = user_name
_stored_password = password
print() print()
print("Login Success") print("Login Success")
return "Success" return "Success"
@@ -313,6 +335,87 @@ def login():
# BLOCK 3B: FILE UTILITIES # BLOCK 3B: FILE UTILITIES
# ============================================================================ # ============================================================================
def ask_on_retry_exhausted():
"""Asks the user what to do when all API retry batches are exhausted."""
global on_retry_exhausted
choice = questionary.select(
"On retry exhausted :",
choices=[
"Ask (interactive prompt)",
"Ignore (return None and continue)",
"Abort (stop script)"
]
).ask()
if choice is None or choice == "Ask (interactive prompt)":
on_retry_exhausted = "ask"
elif choice == "Ignore (return None and continue)":
on_retry_exhausted = "ignore"
else:
on_retry_exhausted = "abort"
def wait_for_scheduled_launch():
"""Asks the user when to start the processing and waits if needed.
Options: Immediately / In X minutes / At HH:MM
"""
choice = questionary.select(
"When to start processing ?",
choices=["Immediately", "In X minutes", "At HH:MM"]
).ask()
if choice is None or choice == "Immediately":
return
if choice == "In X minutes":
minutes_str = questionary.text(
"Number of minutes :",
validate=lambda x: x.isdigit() and int(x) > 0
).ask()
if not minutes_str:
return
target_time = datetime.now() + timedelta(minutes=int(minutes_str))
else: # "At HH:MM"
time_str = questionary.text(
"Start time (HH:MM) :",
validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and
0 <= int(x.split(':')[0]) <= 23 and
0 <= int(x.split(':')[1]) <= 59
).ask()
if not time_str:
return
now = datetime.now()
h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
if target_time <= now:
console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
return
print()
try:
while True:
remaining = target_time - datetime.now()
if remaining.total_seconds() <= 0:
break
total_secs = int(remaining.total_seconds())
h = total_secs // 3600
m = (total_secs % 3600) // 60
s = total_secs % 60
target_str = target_time.strftime('%H:%M:%S')
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
end="", flush=True)
sleep(1)
# Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts
while msvcrt.kbhit():
msvcrt.getwch()
print()
console.print("[green]✓ Starting processing.[/green]")
except KeyboardInterrupt:
print()
console.print("[bold red]Launch cancelled by user.[/bold red]")
raise SystemExit(0)
def load_json_file(filename): def load_json_file(filename):
""" """
Load a JSON file from disk. Load a JSON file from disk.
@@ -1135,8 +1238,9 @@ def _process_inclusion_data(inclusion, organization):
output_inclusion = {} output_inclusion = {}
# --- Prepare all data sources --- # --- Prepare all data sources ---
# 1. 6-month visit loading disabled on this branch (No-6-Month-Visit) # 1. Launch Visit Search asynchronously (it's slow, ~5s)
# visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2) # We use run_with_context to pass the patient identity to the new thread
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
# 2. Prepare inclusion_data: enrich inclusion with organization info # 2. Prepare inclusion_data: enrich inclusion with organization info
inclusion_data = dict(inclusion) inclusion_data = dict(inclusion)
@@ -1160,8 +1264,11 @@ def _process_inclusion_data(inclusion, organization):
logging.error(f"Error fetching request data for patient {patient_id}: {e}") logging.error(f"Error fetching request data for patient {patient_id}: {e}")
request_data = None request_data = None
# 6-month visit loading disabled on this branch (No-6-Month-Visit) try:
six_month_visit_data = None six_month_visit_data = visit_future.result()
except Exception as e:
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
six_month_visit_data = None
# --- Process all fields from configuration --- # --- Process all fields from configuration ---
process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data) process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data)
@@ -1232,6 +1339,12 @@ def main():
number_of_threads = int((questionary.text("Number of threads :", default="12", number_of_threads = int((questionary.text("Number of threads :", default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask())) validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
print()
ask_on_retry_exhausted()
print()
wait_for_scheduled_launch()
print() print()
load_inclusions_mapping_config() load_inclusions_mapping_config()
load_organizations_mapping_config() load_organizations_mapping_config()