8 Commits

Author SHA1 Message Date
a1985741f8 clean 2026-02-09 22:52:17 +01:00
9ff3595a17 . 2026-02-09 22:31:02 +01:00
13e794eb34 ignore logs 2026-02-09 18:43:34 +01:00
e58f867bd0 ignore exe 2026-02-09 18:42:39 +01:00
57540d5159 ignore all jsons 2026-02-09 18:34:51 +01:00
d6943faf59 exclude log from git 2026-02-09 17:16:53 +01:00
8562c45f05 executable 2026-02-09 17:08:59 +01:00
1cc2c754b7 No-6-Month-Visit 2026-02-09 17:01:55 +01:00
8 changed files with 60 additions and 193 deletions

Binary file not shown.

Binary file not shown.

View File

@@ -21,7 +21,7 @@
# identification, and support for complex data extraction using JSON path expressions. # identification, and support for complex data extraction using JSON path expressions.
import json import json
import logging import logging
import msvcrt
import os import os
import re import re
import sys import sys
@@ -118,10 +118,6 @@ access_token = ""
refresh_token = "" refresh_token = ""
threads_list = [] threads_list = []
_token_refresh_lock = threading.Lock() _token_refresh_lock = threading.Lock()
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
fetch_six_month_visit = False # Whether to fetch 6-month visit data (slow, ~5s per patient)
_stored_username = "" # Credentials stored at login for automatic re-login
_stored_password = ""
_threads_list_lock = threading.Lock() _threads_list_lock = threading.Lock()
global_pbar = None global_pbar = None
_global_pbar_lock = threading.Lock() _global_pbar_lock = threading.Lock()
@@ -190,10 +186,8 @@ def new_token():
finally: finally:
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
# Refresh token exhausted — attempt full re-login with stored credentials logging.critical("Persistent error in refresh_token")
logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.") raise httpx.RequestError(message="Persistent error in refresh_token")
_do_login(_stored_username, _stored_password)
logging.info("Re-login successful. New tokens acquired.")
def api_call_with_retry(func): def api_call_with_retry(func):
@@ -218,10 +212,7 @@ def api_call_with_retry(func):
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token.") logging.info(f"Token expired for {func_name}. Refreshing token.")
try:
new_token() new_token()
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
if attempt < ERROR_MAX_RETRY - 1: if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY) sleep(WAIT_BEFORE_RETRY)
@@ -234,18 +225,8 @@ def api_call_with_retry(func):
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES) sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
break # Exit for loop to restart batch in while True break # Exit for loop to restart batch in while True
else: else:
# All automatic batches exhausted — apply on_retry_exhausted policy # All automatic batches exhausted, ask the user
with _user_interaction_lock: with _user_interaction_lock:
if on_retry_exhausted == "ignore":
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None
elif on_retry_exhausted == "abort":
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
else: # "ask" — display error then interactive prompt
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]") console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]") console.print(f"[red]Exception: {exc}[/red]")
@@ -263,6 +244,7 @@ def api_call_with_retry(func):
batch_count = 1 # Reset batch counter for the next interactive round batch_count = 1 # Reset batch counter for the next interactive round
break # Exit for loop to restart batch in while True break # Exit for loop to restart batch in while True
elif choice == "Ignore (return None and continue)": elif choice == "Ignore (return None and continue)":
# Retrieve context if available
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"}) ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}") logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None return None
@@ -277,37 +259,8 @@ def api_call_with_retry(func):
# BLOCK 3: AUTHENTICATION # BLOCK 3: AUTHENTICATION
# ============================================================================ # ============================================================================
def _do_login(username, password):
"""Performs the two-step authentication (IAM → RC) with the given credentials.
Updates global access_token and refresh_token on success.
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
Must NOT acquire _token_refresh_lock (caller's responsibility).
"""
global access_token, refresh_token
client = get_httpx_client()
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT,
json={"username": username, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT,
headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
def login(): def login():
global _stored_username, _stored_password global access_token, refresh_token
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask()) user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask()) password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
@@ -315,18 +268,42 @@ def login():
return "Exit" return "Exit"
try: try:
_do_login(user_name, password) client = get_httpx_client()
client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password},
timeout=20)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
except httpx.RequestError as exc: except httpx.RequestError as exc:
print(f"Login Error : {exc}") print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}") logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
except httpx.HTTPStatusError as exc: except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") logging.warning(
f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
return "Error"
try:
client = get_httpx_client()
client.base_url = RC_URL
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"},
json={"userId": user_id, "clientId": RC_APP_ID,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
timeout=20)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
except httpx.RequestError as exc:
print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}")
return "Error"
except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning(f"Login Error : {exc}")
return "Error" return "Error"
_stored_username = user_name
_stored_password = password
print() print()
print("Login Success") print("Login Success")
return "Success" return "Success"
@@ -336,100 +313,6 @@ def login():
# BLOCK 3B: FILE UTILITIES # BLOCK 3B: FILE UTILITIES
# ============================================================================ # ============================================================================
def ask_on_retry_exhausted():
"""Asks the user what to do when all API retry batches are exhausted."""
global on_retry_exhausted
choice = questionary.select(
"On retry exhausted :",
choices=[
"Ask (interactive prompt)",
"Ignore (return None and continue)",
"Abort (stop script)"
]
).ask()
if choice is None or choice == "Ask (interactive prompt)":
on_retry_exhausted = "ask"
elif choice == "Ignore (return None and continue)":
on_retry_exhausted = "ignore"
else:
on_retry_exhausted = "abort"
def ask_fetch_six_month_visit():
"""Asks the user whether to fetch 6-month visit data (slow API call, ~5s per patient)."""
global fetch_six_month_visit
choice = questionary.select(
"Fetch 6-month visit progress data? (slow, ~5s per patient) :",
choices=[
"No (skip, faster execution)",
"Yes (fetch 6-month visit data)"
]
).ask()
fetch_six_month_visit = (choice == "Yes (fetch 6-month visit data)")
def wait_for_scheduled_launch():
"""Asks the user when to start the processing and waits if needed.
Options: Immediately / In X minutes / At HH:MM
"""
choice = questionary.select(
"When to start processing ?",
choices=["Immediately", "In X minutes", "At HH:MM"]
).ask()
if choice is None or choice == "Immediately":
return
if choice == "In X minutes":
minutes_str = questionary.text(
"Number of minutes :",
validate=lambda x: x.isdigit() and int(x) > 0
).ask()
if not minutes_str:
return
target_time = datetime.now() + timedelta(minutes=int(minutes_str))
else: # "At HH:MM"
time_str = questionary.text(
"Start time (HH:MM) :",
validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and
0 <= int(x.split(':')[0]) <= 23 and
0 <= int(x.split(':')[1]) <= 59
).ask()
if not time_str:
return
now = datetime.now()
h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
if target_time <= now:
console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
return
print()
try:
while True:
remaining = target_time - datetime.now()
if remaining.total_seconds() <= 0:
break
total_secs = int(remaining.total_seconds())
h = total_secs // 3600
m = (total_secs % 3600) // 60
s = total_secs % 60
target_str = target_time.strftime('%H:%M:%S')
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
end="", flush=True)
sleep(1)
# Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts
while msvcrt.kbhit():
msvcrt.getwch()
print()
console.print("[green]✓ Starting processing.[/green]")
except KeyboardInterrupt:
print()
console.print("[bold red]Launch cancelled by user.[/bold red]")
raise SystemExit(0)
def load_json_file(filename): def load_json_file(filename):
""" """
Load a JSON file from disk. Load a JSON file from disk.
@@ -1252,12 +1135,8 @@ def _process_inclusion_data(inclusion, organization):
output_inclusion = {} output_inclusion = {}
# --- Prepare all data sources --- # --- Prepare all data sources ---
# 1. Launch Visit Search asynchronously (it's slow, ~5s) — only if enabled by user # 1. 6-month visit loading disabled on this branch (No-6-Month-Visit)
# We use run_with_context to pass the patient identity to the new thread # visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
if fetch_six_month_visit:
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
else:
visit_future = None
# 2. Prepare inclusion_data: enrich inclusion with organization info # 2. Prepare inclusion_data: enrich inclusion with organization info
inclusion_data = dict(inclusion) inclusion_data = dict(inclusion)
@@ -1281,10 +1160,7 @@ def _process_inclusion_data(inclusion, organization):
logging.error(f"Error fetching request data for patient {patient_id}: {e}") logging.error(f"Error fetching request data for patient {patient_id}: {e}")
request_data = None request_data = None
try: # 6-month visit loading disabled on this branch (No-6-Month-Visit)
six_month_visit_data = visit_future.result() if visit_future is not None else {}
except Exception as e:
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
six_month_visit_data = None six_month_visit_data = None
# --- Process all fields from configuration --- # --- Process all fields from configuration ---
@@ -1352,19 +1228,10 @@ def main():
if login_status == "Exit": if login_status == "Exit":
return return
print()
ask_fetch_six_month_visit()
print() print()
number_of_threads = int((questionary.text("Number of threads :", default="12", number_of_threads = int((questionary.text("Number of threads :", default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask())) validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
print()
ask_on_retry_exhausted()
print()
wait_for_scheduled_launch()
print() print()
load_inclusions_mapping_config() load_inclusions_mapping_config()
load_organizations_mapping_config() load_organizations_mapping_config()

Binary file not shown.