Compare commits

...

2 Commits

View File

@@ -21,7 +21,7 @@
# identification, and support for complex data extraction using JSON path expressions. # identification, and support for complex data extraction using JSON path expressions.
import json import json
import logging import logging
import msvcrt
import os import os
import re import re
import sys import sys
@@ -119,6 +119,8 @@ refresh_token = ""
threads_list = [] threads_list = []
_token_refresh_lock = threading.Lock() _token_refresh_lock = threading.Lock()
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
_stored_username = "" # Credentials stored at login for automatic re-login
_stored_password = ""
_threads_list_lock = threading.Lock() _threads_list_lock = threading.Lock()
global_pbar = None global_pbar = None
_global_pbar_lock = threading.Lock() _global_pbar_lock = threading.Lock()
@@ -187,8 +189,10 @@ def new_token():
finally: finally:
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
logging.critical("Persistent error in refresh_token") # Refresh token exhausted — attempt full re-login with stored credentials
raise httpx.RequestError(message="Persistent error in refresh_token") logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
_do_login(_stored_username, _stored_password)
logging.info("Re-login successful. New tokens acquired.")
def api_call_with_retry(func): def api_call_with_retry(func):
@@ -213,7 +217,10 @@ def api_call_with_retry(func):
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token.") logging.info(f"Token expired for {func_name}. Refreshing token.")
try:
new_token() new_token()
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
@@ -228,21 +235,19 @@ def api_call_with_retry(func):
else: else:
# All automatic batches exhausted — apply on_retry_exhausted policy # All automatic batches exhausted — apply on_retry_exhausted policy
with _user_interaction_lock: with _user_interaction_lock:
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]")
if on_retry_exhausted == "ignore": if on_retry_exhausted == "ignore":
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}") logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
console.print(f"[yellow]⚠ Auto-ignore: skipping {func_name}.[/yellow]")
return None return None
elif on_retry_exhausted == "abort": elif on_retry_exhausted == "abort":
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}") logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
console.print(f"[bold red]Auto-abort: stopping script.[/bold red]")
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)") raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
else: # "ask" — interactive prompt (original behaviour) else: # "ask" — display error then interactive prompt
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]")
choice = questionary.select( choice = questionary.select(
f"What would you like to do for {func_name}?", f"What would you like to do for {func_name}?",
choices=[ choices=[
@@ -271,51 +276,56 @@ def api_call_with_retry(func):
# BLOCK 3: AUTHENTICATION # BLOCK 3: AUTHENTICATION
# ============================================================================ # ============================================================================
def login(): def _do_login(username, password):
"""Performs the two-step authentication (IAM → RC) with the given credentials.
Updates global access_token and refresh_token on success.
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
Must NOT acquire _token_refresh_lock (caller's responsibility).
"""
global access_token, refresh_token global access_token, refresh_token
client = get_httpx_client()
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT,
json={"username": username, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT,
headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
def login():
global _stored_username, _stored_password
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask()) user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask()) password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
if not (user_name and password): if not (user_name and password):
return "Exit" return "Exit"
try: try:
client = get_httpx_client() _do_login(user_name, password)
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
except httpx.RequestError as exc: except httpx.RequestError as exc:
print(f"Login Error : {exc}") print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
except httpx.HTTPStatusError as exc: except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning( logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
return "Error"
try:
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
except httpx.RequestError as exc:
print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}")
return "Error"
except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
_stored_username = user_name
_stored_password = password
print() print()
print("Login Success") print("Login Success")
return "Success" return "Success"
@@ -395,6 +405,9 @@ def wait_for_scheduled_launch():
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ", print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
end="", flush=True) end="", flush=True)
sleep(1) sleep(1)
# Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts
while msvcrt.kbhit():
msvcrt.getwch()
print() print()
console.print("[green]✓ Starting processing.[/green]") console.print("[green]✓ Starting processing.[/green]")
except KeyboardInterrupt: except KeyboardInterrupt: