Compare commits
8 Commits
main
...
a1985741f8
| Author | SHA1 | Date | |
|---|---|---|---|
| a1985741f8 | |||
| 9ff3595a17 | |||
| 13e794eb34 | |||
| e58f867bd0 | |||
| 57540d5159 | |||
| d6943faf59 | |||
| 8562c45f05 | |||
| 1cc2c754b7 |
BIN
config/Endobest_Dashboard_Config-new.xlsx
Normal file
BIN
config/Endobest_Dashboard_Config-new.xlsx
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
config/eb_dashboard_extended_template-new.xlsx
Normal file
BIN
config/eb_dashboard_extended_template-new.xlsx
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
253
eb_dashboard.py
253
eb_dashboard.py
@@ -21,7 +21,7 @@
|
||||
# identification, and support for complex data extraction using JSON path expressions.
|
||||
import json
|
||||
import logging
|
||||
import msvcrt
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
@@ -118,10 +118,6 @@ access_token = ""
|
||||
refresh_token = ""
|
||||
threads_list = []
|
||||
_token_refresh_lock = threading.Lock()
|
||||
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
|
||||
fetch_six_month_visit = False # Whether to fetch 6-month visit data (slow, ~5s per patient)
|
||||
_stored_username = "" # Credentials stored at login for automatic re-login
|
||||
_stored_password = ""
|
||||
_threads_list_lock = threading.Lock()
|
||||
global_pbar = None
|
||||
_global_pbar_lock = threading.Lock()
|
||||
@@ -190,10 +186,8 @@ def new_token():
|
||||
finally:
|
||||
if attempt < ERROR_MAX_RETRY - 1:
|
||||
sleep(WAIT_BEFORE_RETRY)
|
||||
# Refresh token exhausted — attempt full re-login with stored credentials
|
||||
logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
|
||||
_do_login(_stored_username, _stored_password)
|
||||
logging.info("Re-login successful. New tokens acquired.")
|
||||
logging.critical("Persistent error in refresh_token")
|
||||
raise httpx.RequestError(message="Persistent error in refresh_token")
|
||||
|
||||
|
||||
def api_call_with_retry(func):
|
||||
@@ -218,10 +212,7 @@ def api_call_with_retry(func):
|
||||
|
||||
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
|
||||
logging.info(f"Token expired for {func_name}. Refreshing token.")
|
||||
try:
|
||||
new_token()
|
||||
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
|
||||
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
|
||||
new_token()
|
||||
|
||||
if attempt < ERROR_MAX_RETRY - 1:
|
||||
sleep(WAIT_BEFORE_RETRY)
|
||||
@@ -234,41 +225,32 @@ def api_call_with_retry(func):
|
||||
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
|
||||
break # Exit for loop to restart batch in while True
|
||||
else:
|
||||
# All automatic batches exhausted — apply on_retry_exhausted policy
|
||||
# All automatic batches exhausted, ask the user
|
||||
with _user_interaction_lock:
|
||||
if on_retry_exhausted == "ignore":
|
||||
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
|
||||
console.print(f"[red]Exception: {exc}[/red]")
|
||||
|
||||
choice = questionary.select(
|
||||
f"What would you like to do for {func_name}?",
|
||||
choices=[
|
||||
"Retry (try another batch of retries)",
|
||||
"Ignore (return None and continue)",
|
||||
"Stop script (critical error)"
|
||||
]
|
||||
).ask()
|
||||
|
||||
if choice == "Retry (try another batch of retries)":
|
||||
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
|
||||
batch_count = 1 # Reset batch counter for the next interactive round
|
||||
break # Exit for loop to restart batch in while True
|
||||
elif choice == "Ignore (return None and continue)":
|
||||
# Retrieve context if available
|
||||
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
||||
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
||||
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
||||
return None
|
||||
|
||||
elif on_retry_exhausted == "abort":
|
||||
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
|
||||
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
|
||||
|
||||
else: # "ask" — display error then interactive prompt
|
||||
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
|
||||
console.print(f"[red]Exception: {exc}[/red]")
|
||||
|
||||
choice = questionary.select(
|
||||
f"What would you like to do for {func_name}?",
|
||||
choices=[
|
||||
"Retry (try another batch of retries)",
|
||||
"Ignore (return None and continue)",
|
||||
"Stop script (critical error)"
|
||||
]
|
||||
).ask()
|
||||
|
||||
if choice == "Retry (try another batch of retries)":
|
||||
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
|
||||
batch_count = 1 # Reset batch counter for the next interactive round
|
||||
break # Exit for loop to restart batch in while True
|
||||
elif choice == "Ignore (return None and continue)":
|
||||
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
||||
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
||||
return None
|
||||
else:
|
||||
logging.critical(f"User chose to stop script after persistent error in {func_name}.")
|
||||
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
|
||||
else:
|
||||
logging.critical(f"User chose to stop script after persistent error in {func_name}.")
|
||||
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
|
||||
|
||||
return wrapper
|
||||
|
||||
@@ -277,37 +259,8 @@ def api_call_with_retry(func):
|
||||
# BLOCK 3: AUTHENTICATION
|
||||
# ============================================================================
|
||||
|
||||
def _do_login(username, password):
|
||||
"""Performs the two-step authentication (IAM → RC) with the given credentials.
|
||||
Updates global access_token and refresh_token on success.
|
||||
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
|
||||
Must NOT acquire _token_refresh_lock (caller's responsibility).
|
||||
"""
|
||||
global access_token, refresh_token
|
||||
|
||||
client = get_httpx_client()
|
||||
client.base_url = IAM_URL
|
||||
response = client.post(API_AUTH_LOGIN_ENDPOINT,
|
||||
json={"username": username, "password": password},
|
||||
timeout=20)
|
||||
response.raise_for_status()
|
||||
master_token = response.json()["access_token"]
|
||||
user_id = response.json()["userId"]
|
||||
|
||||
client = get_httpx_client()
|
||||
client.base_url = RC_URL
|
||||
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT,
|
||||
headers={"Authorization": f"Bearer {master_token}"},
|
||||
json={"userId": user_id, "clientId": RC_APP_ID,
|
||||
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
|
||||
timeout=20)
|
||||
response.raise_for_status()
|
||||
access_token = response.json()["access_token"]
|
||||
refresh_token = response.json()["refresh_token"]
|
||||
|
||||
|
||||
def login():
|
||||
global _stored_username, _stored_password
|
||||
global access_token, refresh_token
|
||||
|
||||
user_name = (questionary.text("login :", default=DEFAULT_USER_NAME).ask())
|
||||
password = (questionary.password("password :", default=DEFAULT_PASSWORD).ask())
|
||||
@@ -315,18 +268,42 @@ def login():
|
||||
return "Exit"
|
||||
|
||||
try:
|
||||
_do_login(user_name, password)
|
||||
client = get_httpx_client()
|
||||
client.base_url = IAM_URL
|
||||
response = client.post(API_AUTH_LOGIN_ENDPOINT, json={"username": user_name, "password": password},
|
||||
timeout=20)
|
||||
response.raise_for_status()
|
||||
master_token = response.json()["access_token"]
|
||||
user_id = response.json()["userId"]
|
||||
except httpx.RequestError as exc:
|
||||
print(f"Login Error : {exc}")
|
||||
logging.warning(f"Login Error : {exc}")
|
||||
return "Error"
|
||||
except httpx.HTTPStatusError as exc:
|
||||
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||
logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||
logging.warning(
|
||||
f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||
return "Error"
|
||||
|
||||
try:
|
||||
client = get_httpx_client()
|
||||
client.base_url = RC_URL
|
||||
response = client.post(API_AUTH_CONFIG_TOKEN_ENDPOINT, headers={"Authorization": f"Bearer {master_token}"},
|
||||
json={"userId": user_id, "clientId": RC_APP_ID,
|
||||
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
|
||||
timeout=20)
|
||||
response.raise_for_status()
|
||||
access_token = response.json()["access_token"]
|
||||
refresh_token = response.json()["refresh_token"]
|
||||
except httpx.RequestError as exc:
|
||||
print(f"Login Error : {exc}")
|
||||
logging.warning(f"Login Error : {exc}")
|
||||
return "Error"
|
||||
except httpx.HTTPStatusError as exc:
|
||||
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||
logging.warning(f"Login Error : {exc}")
|
||||
return "Error"
|
||||
|
||||
_stored_username = user_name
|
||||
_stored_password = password
|
||||
print()
|
||||
print("Login Success")
|
||||
return "Success"
|
||||
@@ -336,100 +313,6 @@ def login():
|
||||
# BLOCK 3B: FILE UTILITIES
|
||||
# ============================================================================
|
||||
|
||||
def ask_on_retry_exhausted():
|
||||
"""Asks the user what to do when all API retry batches are exhausted."""
|
||||
global on_retry_exhausted
|
||||
choice = questionary.select(
|
||||
"On retry exhausted :",
|
||||
choices=[
|
||||
"Ask (interactive prompt)",
|
||||
"Ignore (return None and continue)",
|
||||
"Abort (stop script)"
|
||||
]
|
||||
).ask()
|
||||
if choice is None or choice == "Ask (interactive prompt)":
|
||||
on_retry_exhausted = "ask"
|
||||
elif choice == "Ignore (return None and continue)":
|
||||
on_retry_exhausted = "ignore"
|
||||
else:
|
||||
on_retry_exhausted = "abort"
|
||||
|
||||
|
||||
def ask_fetch_six_month_visit():
|
||||
"""Asks the user whether to fetch 6-month visit data (slow API call, ~5s per patient)."""
|
||||
global fetch_six_month_visit
|
||||
choice = questionary.select(
|
||||
"Fetch 6-month visit progress data? (slow, ~5s per patient) :",
|
||||
choices=[
|
||||
"No (skip, faster execution)",
|
||||
"Yes (fetch 6-month visit data)"
|
||||
]
|
||||
).ask()
|
||||
fetch_six_month_visit = (choice == "Yes (fetch 6-month visit data)")
|
||||
|
||||
|
||||
def wait_for_scheduled_launch():
|
||||
"""Asks the user when to start the processing and waits if needed.
|
||||
Options: Immediately / In X minutes / At HH:MM
|
||||
"""
|
||||
choice = questionary.select(
|
||||
"When to start processing ?",
|
||||
choices=["Immediately", "In X minutes", "At HH:MM"]
|
||||
).ask()
|
||||
|
||||
if choice is None or choice == "Immediately":
|
||||
return
|
||||
|
||||
if choice == "In X minutes":
|
||||
minutes_str = questionary.text(
|
||||
"Number of minutes :",
|
||||
validate=lambda x: x.isdigit() and int(x) > 0
|
||||
).ask()
|
||||
if not minutes_str:
|
||||
return
|
||||
target_time = datetime.now() + timedelta(minutes=int(minutes_str))
|
||||
|
||||
else: # "At HH:MM"
|
||||
time_str = questionary.text(
|
||||
"Start time (HH:MM) :",
|
||||
validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and
|
||||
0 <= int(x.split(':')[0]) <= 23 and
|
||||
0 <= int(x.split(':')[1]) <= 59
|
||||
).ask()
|
||||
if not time_str:
|
||||
return
|
||||
now = datetime.now()
|
||||
h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
|
||||
target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
|
||||
if target_time <= now:
|
||||
console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
|
||||
return
|
||||
|
||||
print()
|
||||
try:
|
||||
while True:
|
||||
remaining = target_time - datetime.now()
|
||||
if remaining.total_seconds() <= 0:
|
||||
break
|
||||
total_secs = int(remaining.total_seconds())
|
||||
h = total_secs // 3600
|
||||
m = (total_secs % 3600) // 60
|
||||
s = total_secs % 60
|
||||
target_str = target_time.strftime('%H:%M:%S')
|
||||
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
|
||||
end="", flush=True)
|
||||
sleep(1)
|
||||
# Flush keyboard buffer to prevent stray keystrokes from polluting subsequent prompts
|
||||
while msvcrt.kbhit():
|
||||
msvcrt.getwch()
|
||||
print()
|
||||
console.print("[green]✓ Starting processing.[/green]")
|
||||
except KeyboardInterrupt:
|
||||
print()
|
||||
console.print("[bold red]Launch cancelled by user.[/bold red]")
|
||||
raise SystemExit(0)
|
||||
|
||||
|
||||
def load_json_file(filename):
|
||||
"""
|
||||
Load a JSON file from disk.
|
||||
@@ -1252,12 +1135,8 @@ def _process_inclusion_data(inclusion, organization):
|
||||
output_inclusion = {}
|
||||
|
||||
# --- Prepare all data sources ---
|
||||
# 1. Launch Visit Search asynchronously (it's slow, ~5s) — only if enabled by user
|
||||
# We use run_with_context to pass the patient identity to the new thread
|
||||
if fetch_six_month_visit:
|
||||
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
|
||||
else:
|
||||
visit_future = None
|
||||
# 1. 6-month visit loading disabled on this branch (No-6-Month-Visit)
|
||||
# visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
|
||||
|
||||
# 2. Prepare inclusion_data: enrich inclusion with organization info
|
||||
inclusion_data = dict(inclusion)
|
||||
@@ -1281,11 +1160,8 @@ def _process_inclusion_data(inclusion, organization):
|
||||
logging.error(f"Error fetching request data for patient {patient_id}: {e}")
|
||||
request_data = None
|
||||
|
||||
try:
|
||||
six_month_visit_data = visit_future.result() if visit_future is not None else {}
|
||||
except Exception as e:
|
||||
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
|
||||
six_month_visit_data = None
|
||||
# 6-month visit loading disabled on this branch (No-6-Month-Visit)
|
||||
six_month_visit_data = None
|
||||
|
||||
# --- Process all fields from configuration ---
|
||||
process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data)
|
||||
@@ -1352,19 +1228,10 @@ def main():
|
||||
if login_status == "Exit":
|
||||
return
|
||||
|
||||
print()
|
||||
ask_fetch_six_month_visit()
|
||||
|
||||
print()
|
||||
number_of_threads = int((questionary.text("Number of threads :", default="12",
|
||||
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
|
||||
|
||||
print()
|
||||
ask_on_retry_exhausted()
|
||||
|
||||
print()
|
||||
wait_for_scheduled_launch()
|
||||
|
||||
print()
|
||||
load_inclusions_mapping_config()
|
||||
load_organizations_mapping_config()
|
||||
|
||||
Binary file not shown.
Reference in New Issue
Block a user