Compare commits

...

3 Commits

2 changed files with 75 additions and 42 deletions

View File

@@ -21,7 +21,7 @@
# identification, and support for complex data extraction using JSON path expressions. # identification, and support for complex data extraction using JSON path expressions.
import json import json
import logging import logging
import msvcrt
import os import os
import re import re
import sys import sys
@@ -119,6 +119,9 @@ refresh_token = ""
threads_list = [] threads_list = []
_token_refresh_lock = threading.Lock() _token_refresh_lock = threading.Lock()
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
fetch_six_month_visit = False # Whether to fetch 6-month visit data (slow, ~5s per patient)
_stored_username = "" # Credentials stored at login for automatic re-login
_stored_password = ""
_threads_list_lock = threading.Lock() _threads_list_lock = threading.Lock()
global_pbar = None global_pbar = None
_global_pbar_lock = threading.Lock() _global_pbar_lock = threading.Lock()
@@ -187,8 +190,10 @@ def new_token():
finally: finally:
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
logging.critical("Persistent error in refresh_token") # Refresh token exhausted — attempt full re-login with stored credentials
raise httpx.RequestError(message="Persistent error in refresh_token") logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
_do_login(_stored_username, _stored_password)
logging.info("Re-login successful. New tokens acquired.")
def api_call_with_retry(func): def api_call_with_retry(func):
@@ -213,7 +218,10 @@ def api_call_with_retry(func):
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token.") logging.info(f"Token expired for {func_name}. Refreshing token.")
new_token() try:
new_token()
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
@@ -228,21 +236,19 @@ def api_call_with_retry(func):
else: else:
# All automatic batches exhausted — apply on_retry_exhausted policy # All automatic batches exhausted — apply on_retry_exhausted policy
with _user_interaction_lock: with _user_interaction_lock:
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]")
if on_retry_exhausted == "ignore": if on_retry_exhausted == "ignore":
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}") logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
console.print(f"[yellow]⚠ Auto-ignore: skipping {func_name}.[/yellow]")
return None return None
elif on_retry_exhausted == "abort": elif on_retry_exhausted == "abort":
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}") logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
console.print(f"[bold red]Auto-abort: stopping script.[/bold red]")
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)") raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
else: # "ask" — interactive prompt (original behaviour) else: # "ask" — display error then interactive prompt
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]")
choice = questionary.select( choice = questionary.select(
f"What would you like to do for {func_name}?", f"What would you like to do for {func_name}?",
choices=[ choices=[
@@ -271,51 +277,56 @@ def api_call_with_retry(func):
# BLOCK 3: AUTHENTICATION # BLOCK 3: AUTHENTICATION
# ============================================================================ # ============================================================================
def login(): def _do_login(username, password):
"""Performs the two-step authentication (IAM → RC) with the given credentials.
Updates global access_token and refresh_token on success.
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
Must NOT acquire _token_refresh_lock (caller's responsibility).
"""
global access_token, refresh_token global access_token, refresh_token
client = get_httpx_client()
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT,
json={"username": username, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT,
headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
def login():
global _stored_username, _stored_password
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask()) user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask()) password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
if not (user_name and password): if not (user_name and password):
return "Exit" return "Exit"
try: try:
client = get_httpx_client() _do_login(user_name, password)
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
except httpx.RequestError as exc: except httpx.RequestError as exc:
print(f"Login Error : {exc}") print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
except httpx.HTTPStatusError as exc: except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning( logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
return "Error"
try:
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
except httpx.RequestError as exc:
print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}")
return "Error"
except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
_stored_username = user_name
_stored_password = password
print() print()
print("Login Success") print("Login Success")
return "Success" return "Success"
@@ -344,6 +355,19 @@ def ask_on_retry_exhausted():
on_retry_exhausted = "abort" on_retry_exhausted = "abort"
def ask_fetch_six_month_visit():
"""Asks the user whether to fetch 6-month visit data (slow API call, ~5s per patient)."""
global fetch_six_month_visit
choice = questionary.select(
"Fetch 6-month visit progress data? (slow, ~5s per patient) :",
choices=[
"No (skip, faster execution)",
"Yes (fetch 6-month visit data)"
]
).ask()
fetch_six_month_visit = (choice == "Yes (fetch 6-month visit data)")
def wait_for_scheduled_launch(): def wait_for_scheduled_launch():
"""Asks the user when to start the processing and waits if needed. """Asks the user when to start the processing and waits if needed.
Options: Immediately / In X minutes / At HH:MM Options: Immediately / In X minutes / At HH:MM
@@ -395,6 +419,9 @@ def wait_for_scheduled_launch():
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ", print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
end="", flush=True) end="", flush=True)
sleep(1) sleep(1)
# Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts
while msvcrt.kbhit():
msvcrt.getwch()
print() print()
console.print("[green]✓ Starting processing.[/green]") console.print("[green]✓ Starting processing.[/green]")
except KeyboardInterrupt: except KeyboardInterrupt:
@@ -1225,9 +1252,12 @@ def _process_inclusion_data(inclusion, organization):
output_inclusion = {} output_inclusion = {}
# --- Prepare all data sources --- # --- Prepare all data sources ---
# 1. Launch Visit Search asynchronously (it's slow, ~5s) # 1. Launch Visit Search asynchronously (it's slow, ~5s) — only if enabled by user
# We use run_with_context to pass the patient identity to the new thread # We use run_with_context to pass the patient identity to the new thread
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2) if fetch_six_month_visit:
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
else:
visit_future = None
# 2. Prepare inclusion_data: enrich inclusion with organization info # 2. Prepare inclusion_data: enrich inclusion with organization info
inclusion_data = dict(inclusion) inclusion_data = dict(inclusion)
@@ -1252,7 +1282,7 @@ def _process_inclusion_data(inclusion, organization):
request_data = None request_data = None
try: try:
six_month_visit_data = visit_future.result() six_month_visit_data = visit_future.result() if visit_future is not None else {}
except Exception as e: except Exception as e:
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}") logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
six_month_visit_data = None six_month_visit_data = None
@@ -1322,6 +1352,9 @@ def main():
if login_status == "Exit": if login_status == "Exit":
return return
print()
ask_fetch_six_month_visit()
print() print()
number_of_threads = int((questionary.text("Number of threads :", default="12", number_of_threads = int((questionary.text("Number of threads :", default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask())) validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))

Binary file not shown.