Compare commits
3 Commits
cc709200a0
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 3904948c32 | |||
| 0db52e6492 | |||
| 9fbca92f37 |
117
eb_dashboard.py
117
eb_dashboard.py
@@ -21,7 +21,7 @@
|
|||||||
# identification, and support for complex data extraction using JSON path expressions.
|
# identification, and support for complex data extraction using JSON path expressions.
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import msvcrt
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
@@ -119,6 +119,9 @@ refresh_token = ""
|
|||||||
threads_list = []
|
threads_list = []
|
||||||
_token_refresh_lock = threading.Lock()
|
_token_refresh_lock = threading.Lock()
|
||||||
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
|
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
|
||||||
|
fetch_six_month_visit = False # Whether to fetch 6-month visit data (slow, ~5s per patient)
|
||||||
|
_stored_username = "" # Credentials stored at login for automatic re-login
|
||||||
|
_stored_password = ""
|
||||||
_threads_list_lock = threading.Lock()
|
_threads_list_lock = threading.Lock()
|
||||||
global_pbar = None
|
global_pbar = None
|
||||||
_global_pbar_lock = threading.Lock()
|
_global_pbar_lock = threading.Lock()
|
||||||
@@ -187,8 +190,10 @@ def new_token():
|
|||||||
finally:
|
finally:
|
||||||
if attempt < ERROR_MAX_RETRY - 1:
|
if attempt < ERROR_MAX_RETRY - 1:
|
||||||
sleep(WAIT_BEFORE_RETRY)
|
sleep(WAIT_BEFORE_RETRY)
|
||||||
logging.critical("Persistent error in refresh_token")
|
# Refresh token exhausted — attempt full re-login with stored credentials
|
||||||
raise httpx.RequestError(message="Persistent error in refresh_token")
|
logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
|
||||||
|
_do_login(_stored_username, _stored_password)
|
||||||
|
logging.info("Re-login successful. New tokens acquired.")
|
||||||
|
|
||||||
|
|
||||||
def api_call_with_retry(func):
|
def api_call_with_retry(func):
|
||||||
@@ -213,7 +218,10 @@ def api_call_with_retry(func):
|
|||||||
|
|
||||||
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
|
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
|
||||||
logging.info(f"Token expired for {func_name}. Refreshing token.")
|
logging.info(f"Token expired for {func_name}. Refreshing token.")
|
||||||
new_token()
|
try:
|
||||||
|
new_token()
|
||||||
|
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
|
||||||
|
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
|
||||||
|
|
||||||
if attempt < ERROR_MAX_RETRY - 1:
|
if attempt < ERROR_MAX_RETRY - 1:
|
||||||
sleep(WAIT_BEFORE_RETRY)
|
sleep(WAIT_BEFORE_RETRY)
|
||||||
@@ -228,21 +236,19 @@ def api_call_with_retry(func):
|
|||||||
else:
|
else:
|
||||||
# All automatic batches exhausted — apply on_retry_exhausted policy
|
# All automatic batches exhausted — apply on_retry_exhausted policy
|
||||||
with _user_interaction_lock:
|
with _user_interaction_lock:
|
||||||
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
|
|
||||||
console.print(f"[red]Exception: {exc}[/red]")
|
|
||||||
|
|
||||||
if on_retry_exhausted == "ignore":
|
if on_retry_exhausted == "ignore":
|
||||||
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
||||||
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
||||||
console.print(f"[yellow]⚠ Auto-ignore: skipping {func_name}.[/yellow]")
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
elif on_retry_exhausted == "abort":
|
elif on_retry_exhausted == "abort":
|
||||||
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
|
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
|
||||||
console.print(f"[bold red]Auto-abort: stopping script.[/bold red]")
|
|
||||||
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
|
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
|
||||||
|
|
||||||
else: # "ask" — interactive prompt (original behaviour)
|
else: # "ask" — display error then interactive prompt
|
||||||
|
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
|
||||||
|
console.print(f"[red]Exception: {exc}[/red]")
|
||||||
|
|
||||||
choice = questionary.select(
|
choice = questionary.select(
|
||||||
f"What would you like to do for {func_name}?",
|
f"What would you like to do for {func_name}?",
|
||||||
choices=[
|
choices=[
|
||||||
@@ -271,51 +277,56 @@ def api_call_with_retry(func):
|
|||||||
# BLOCK 3: AUTHENTICATION
|
# BLOCK 3: AUTHENTICATION
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
|
|
||||||
def login():
|
def _do_login(username, password):
|
||||||
|
"""Performs the two-step authentication (IAM → RC) with the given credentials.
|
||||||
|
Updates global access_token and refresh_token on success.
|
||||||
|
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
|
||||||
|
Must NOT acquire _token_refresh_lock (caller's responsibility).
|
||||||
|
"""
|
||||||
global access_token, refresh_token
|
global access_token, refresh_token
|
||||||
|
|
||||||
|
client = get_httpx_client()
|
||||||
|
client.base_url = IAM_URL
|
||||||
|
response = client.post(API_AUTH_LOGIN_ENDPOINT,
|
||||||
|
json={"username": username, "password": password},
|
||||||
|
timeout=20)
|
||||||
|
response.raise_for_status()
|
||||||
|
master_token = response.json()["access_token"]
|
||||||
|
user_id = response.json()["userId"]
|
||||||
|
|
||||||
|
client = get_httpx_client()
|
||||||
|
client.base_url = RC_URL
|
||||||
|
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT,
|
||||||
|
headers={"Authorization": f"Bearer {master_token}"},
|
||||||
|
json={"userId": user_id, "clientId": RC_APP_ID,
|
||||||
|
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
|
||||||
|
timeout=20)
|
||||||
|
response.raise_for_status()
|
||||||
|
access_token = response.json()["access_token"]
|
||||||
|
refresh_token = response.json()["refresh_token"]
|
||||||
|
|
||||||
|
|
||||||
|
def login():
|
||||||
|
global _stored_username, _stored_password
|
||||||
|
|
||||||
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
|
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
|
||||||
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
|
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
|
||||||
if not (user_name and password):
|
if not (user_name and password):
|
||||||
return "Exit"
|
return "Exit"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
client = get_httpx_client()
|
_do_login(user_name, password)
|
||||||
client.base_url = IAM_URL
|
|
||||||
response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password},
|
|
||||||
timeout=20)
|
|
||||||
response.raise_for_status()
|
|
||||||
master_token = response.json()["access_token"]
|
|
||||||
user_id = response.json()["userId"]
|
|
||||||
except httpx.RequestError as exc:
|
except httpx.RequestError as exc:
|
||||||
print(f"Login Error : {exc}")
|
print(f"Login Error : {exc}")
|
||||||
logging.warning(f"Login Error : {exc}")
|
logging.warning(f"Login Error : {exc}")
|
||||||
return "Error"
|
return "Error"
|
||||||
except httpx.HTTPStatusError as exc:
|
except httpx.HTTPStatusError as exc:
|
||||||
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||||
logging.warning(
|
logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||||
f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
|
||||||
return "Error"
|
|
||||||
|
|
||||||
try:
|
|
||||||
client = get_httpx_client()
|
|
||||||
client.base_url = RC_URL
|
|
||||||
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"},
|
|
||||||
json={"userId": user_id, "clientId": RC_APP_ID,
|
|
||||||
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
|
|
||||||
timeout=20)
|
|
||||||
response.raise_for_status()
|
|
||||||
access_token = response.json()["access_token"]
|
|
||||||
refresh_token = response.json()["refresh_token"]
|
|
||||||
except httpx.RequestError as exc:
|
|
||||||
print(f"Login Error : {exc}")
|
|
||||||
logging.warning(f"Login Error : {exc}")
|
|
||||||
return "Error"
|
|
||||||
except httpx.HTTPStatusError as exc:
|
|
||||||
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
|
||||||
logging.warning(f"Login Error : {exc}")
|
|
||||||
return "Error"
|
return "Error"
|
||||||
|
|
||||||
|
_stored_username = user_name
|
||||||
|
_stored_password = password
|
||||||
print()
|
print()
|
||||||
print("Login Success")
|
print("Login Success")
|
||||||
return "Success"
|
return "Success"
|
||||||
@@ -344,6 +355,19 @@ def ask_on_retry_exhausted():
|
|||||||
on_retry_exhausted = "abort"
|
on_retry_exhausted = "abort"
|
||||||
|
|
||||||
|
|
||||||
|
def ask_fetch_six_month_visit():
|
||||||
|
"""Asks the user whether to fetch 6-month visit data (slow API call, ~5s per patient)."""
|
||||||
|
global fetch_six_month_visit
|
||||||
|
choice = questionary.select(
|
||||||
|
"Fetch 6-month visit progress data? (slow, ~5s per patient) :",
|
||||||
|
choices=[
|
||||||
|
"No (skip, faster execution)",
|
||||||
|
"Yes (fetch 6-month visit data)"
|
||||||
|
]
|
||||||
|
).ask()
|
||||||
|
fetch_six_month_visit = (choice == "Yes (fetch 6-month visit data)")
|
||||||
|
|
||||||
|
|
||||||
def wait_for_scheduled_launch():
|
def wait_for_scheduled_launch():
|
||||||
"""Asks the user when to start the processing and waits if needed.
|
"""Asks the user when to start the processing and waits if needed.
|
||||||
Options: Immediately / In X minutes / At HH:MM
|
Options: Immediately / In X minutes / At HH:MM
|
||||||
@@ -395,6 +419,9 @@ def wait_for_scheduled_launch():
|
|||||||
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
|
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
|
||||||
end="", flush=True)
|
end="", flush=True)
|
||||||
sleep(1)
|
sleep(1)
|
||||||
|
# Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts
|
||||||
|
while msvcrt.kbhit():
|
||||||
|
msvcrt.getwch()
|
||||||
print()
|
print()
|
||||||
console.print("[green]✓ Starting processing.[/green]")
|
console.print("[green]✓ Starting processing.[/green]")
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
@@ -1225,9 +1252,12 @@ def _process_inclusion_data(inclusion, organization):
|
|||||||
output_inclusion = {}
|
output_inclusion = {}
|
||||||
|
|
||||||
# --- Prepare all data sources ---
|
# --- Prepare all data sources ---
|
||||||
# 1. Launch Visit Search asynchronously (it's slow, ~5s)
|
# 1. Launch Visit Search asynchronously (it's slow, ~5s) — only if enabled by user
|
||||||
# We use run_with_context to pass the patient identity to the new thread
|
# We use run_with_context to pass the patient identity to the new thread
|
||||||
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
|
if fetch_six_month_visit:
|
||||||
|
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
|
||||||
|
else:
|
||||||
|
visit_future = None
|
||||||
|
|
||||||
# 2. Prepare inclusion_data: enrich inclusion with organization info
|
# 2. Prepare inclusion_data: enrich inclusion with organization info
|
||||||
inclusion_data = dict(inclusion)
|
inclusion_data = dict(inclusion)
|
||||||
@@ -1252,7 +1282,7 @@ def _process_inclusion_data(inclusion, organization):
|
|||||||
request_data = None
|
request_data = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
six_month_visit_data = visit_future.result()
|
six_month_visit_data = visit_future.result() if visit_future is not None else {}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
|
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
|
||||||
six_month_visit_data = None
|
six_month_visit_data = None
|
||||||
@@ -1322,6 +1352,9 @@ def main():
|
|||||||
if login_status == "Exit":
|
if login_status == "Exit":
|
||||||
return
|
return
|
||||||
|
|
||||||
|
print()
|
||||||
|
ask_fetch_six_month_visit()
|
||||||
|
|
||||||
print()
|
print()
|
||||||
number_of_threads = int((questionary.text("Number of threads :", default="12",
|
number_of_threads = int((questionary.text("Number of threads :", default="12",
|
||||||
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
|
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
|
||||||
|
|||||||
Binary file not shown.
Reference in New Issue
Block a user