Compare commits
12 Commits
No-6-Month
...
cc709200a0
| Author | SHA1 | Date | |
|---|---|---|---|
| cc709200a0 | |||
| af57a1e187 | |||
| bff3c6b947 | |||
| 739aae0357 | |||
| fd37fac283 | |||
| 2df128a4d7 | |||
| 9321a33c1b | |||
| c80f19c537 | |||
| f1761dce8c | |||
| 8ef3392e4d | |||
| c7b1e348a2 | |||
| aac259da42 |
Binary file not shown.
BIN
config/Endobest_Dashboard_Config-sav-260130-1615.xlsx
Normal file
BIN
config/Endobest_Dashboard_Config-sav-260130-1615.xlsx
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
config/eb_dashboard_extended_template-sav-260130-1615.xlsx
Normal file
BIN
config/eb_dashboard_extended_template-sav-260130-1615.xlsx
Normal file
Binary file not shown.
Binary file not shown.
150
eb_dashboard.py
150
eb_dashboard.py
@@ -118,6 +118,7 @@ access_token = ""
|
|||||||
refresh_token = ""
|
refresh_token = ""
|
||||||
threads_list = []
|
threads_list = []
|
||||||
_token_refresh_lock = threading.Lock()
|
_token_refresh_lock = threading.Lock()
|
||||||
|
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
|
||||||
_threads_list_lock = threading.Lock()
|
_threads_list_lock = threading.Lock()
|
||||||
global_pbar = None
|
global_pbar = None
|
||||||
_global_pbar_lock = threading.Lock()
|
_global_pbar_lock = threading.Lock()
|
||||||
@@ -225,32 +226,43 @@ def api_call_with_retry(func):
|
|||||||
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
|
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
|
||||||
break # Exit for loop to restart batch in while True
|
break # Exit for loop to restart batch in while True
|
||||||
else:
|
else:
|
||||||
# All automatic batches exhausted, ask the user
|
# All automatic batches exhausted — apply on_retry_exhausted policy
|
||||||
with _user_interaction_lock:
|
with _user_interaction_lock:
|
||||||
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
|
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
|
||||||
console.print(f"[red]Exception: {exc}[/red]")
|
console.print(f"[red]Exception: {exc}[/red]")
|
||||||
|
|
||||||
choice = questionary.select(
|
if on_retry_exhausted == "ignore":
|
||||||
f"What would you like to do for {func_name}?",
|
|
||||||
choices=[
|
|
||||||
"Retry (try another batch of retries)",
|
|
||||||
"Ignore (return None and continue)",
|
|
||||||
"Stop script (critical error)"
|
|
||||||
]
|
|
||||||
).ask()
|
|
||||||
|
|
||||||
if choice == "Retry (try another batch of retries)":
|
|
||||||
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
|
|
||||||
batch_count = 1 # Reset batch counter for the next interactive round
|
|
||||||
break # Exit for loop to restart batch in while True
|
|
||||||
elif choice == "Ignore (return None and continue)":
|
|
||||||
# Retrieve context if available
|
|
||||||
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
||||||
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
||||||
|
console.print(f"[yellow]⚠ Auto-ignore: skipping {func_name}.[/yellow]")
|
||||||
return None
|
return None
|
||||||
else:
|
|
||||||
logging.critical(f"User chose to stop script after persistent error in {func_name}.")
|
elif on_retry_exhausted == "abort":
|
||||||
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
|
logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
|
||||||
|
console.print(f"[bold red]Auto-abort: stopping script.[/bold red]")
|
||||||
|
raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")
|
||||||
|
|
||||||
|
else: # "ask" — interactive prompt (original behaviour)
|
||||||
|
choice = questionary.select(
|
||||||
|
f"What would you like to do for {func_name}?",
|
||||||
|
choices=[
|
||||||
|
"Retry (try another batch of retries)",
|
||||||
|
"Ignore (return None and continue)",
|
||||||
|
"Stop script (critical error)"
|
||||||
|
]
|
||||||
|
).ask()
|
||||||
|
|
||||||
|
if choice == "Retry (try another batch of retries)":
|
||||||
|
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
|
||||||
|
batch_count = 1 # Reset batch counter for the next interactive round
|
||||||
|
break # Exit for loop to restart batch in while True
|
||||||
|
elif choice == "Ignore (return None and continue)":
|
||||||
|
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
|
||||||
|
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
logging.critical(f"User chose to stop script after persistent error in {func_name}.")
|
||||||
|
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
@@ -313,6 +325,84 @@ def login():
|
|||||||
# BLOCK 3B: FILE UTILITIES
|
# BLOCK 3B: FILE UTILITIES
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
|
|
||||||
|
def ask_on_retry_exhausted():
|
||||||
|
"""Asks the user what to do when all API retry batches are exhausted."""
|
||||||
|
global on_retry_exhausted
|
||||||
|
choice = questionary.select(
|
||||||
|
"On retry exhausted :",
|
||||||
|
choices=[
|
||||||
|
"Ask (interactive prompt)",
|
||||||
|
"Ignore (return None and continue)",
|
||||||
|
"Abort (stop script)"
|
||||||
|
]
|
||||||
|
).ask()
|
||||||
|
if choice is None or choice == "Ask (interactive prompt)":
|
||||||
|
on_retry_exhausted = "ask"
|
||||||
|
elif choice == "Ignore (return None and continue)":
|
||||||
|
on_retry_exhausted = "ignore"
|
||||||
|
else:
|
||||||
|
on_retry_exhausted = "abort"
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_scheduled_launch():
|
||||||
|
"""Asks the user when to start the processing and waits if needed.
|
||||||
|
Options: Immediately / In X minutes / At HH:MM
|
||||||
|
"""
|
||||||
|
choice = questionary.select(
|
||||||
|
"When to start processing ?",
|
||||||
|
choices=["Immediately", "In X minutes", "At HH:MM"]
|
||||||
|
).ask()
|
||||||
|
|
||||||
|
if choice is None or choice == "Immediately":
|
||||||
|
return
|
||||||
|
|
||||||
|
if choice == "In X minutes":
|
||||||
|
minutes_str = questionary.text(
|
||||||
|
"Number of minutes :",
|
||||||
|
validate=lambda x: x.isdigit() and int(x) > 0
|
||||||
|
).ask()
|
||||||
|
if not minutes_str:
|
||||||
|
return
|
||||||
|
target_time = datetime.now() + timedelta(minutes=int(minutes_str))
|
||||||
|
|
||||||
|
else: # "At HH:MM"
|
||||||
|
time_str = questionary.text(
|
||||||
|
"Start time (HH:MM) :",
|
||||||
|
validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and
|
||||||
|
0 <= int(x.split(':')[0]) <= 23 and
|
||||||
|
0 <= int(x.split(':')[1]) <= 59
|
||||||
|
).ask()
|
||||||
|
if not time_str:
|
||||||
|
return
|
||||||
|
now = datetime.now()
|
||||||
|
h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
|
||||||
|
target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
|
||||||
|
if target_time <= now:
|
||||||
|
console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
|
||||||
|
return
|
||||||
|
|
||||||
|
print()
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
remaining = target_time - datetime.now()
|
||||||
|
if remaining.total_seconds() <= 0:
|
||||||
|
break
|
||||||
|
total_secs = int(remaining.total_seconds())
|
||||||
|
h = total_secs // 3600
|
||||||
|
m = (total_secs % 3600) // 60
|
||||||
|
s = total_secs % 60
|
||||||
|
target_str = target_time.strftime('%H:%M:%S')
|
||||||
|
print(f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
|
||||||
|
end="", flush=True)
|
||||||
|
sleep(1)
|
||||||
|
print()
|
||||||
|
console.print("[green]✓ Starting processing.[/green]")
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print()
|
||||||
|
console.print("[bold red]Launch cancelled by user.[/bold red]")
|
||||||
|
raise SystemExit(0)
|
||||||
|
|
||||||
|
|
||||||
def load_json_file(filename):
|
def load_json_file(filename):
|
||||||
"""
|
"""
|
||||||
Load a JSON file from disk.
|
Load a JSON file from disk.
|
||||||
@@ -1135,8 +1225,9 @@ def _process_inclusion_data(inclusion, organization):
|
|||||||
output_inclusion = {}
|
output_inclusion = {}
|
||||||
|
|
||||||
# --- Prepare all data sources ---
|
# --- Prepare all data sources ---
|
||||||
# 1. 6-month visit loading disabled on this branch (No-6-Month-Visit)
|
# 1. Launch Visit Search asynchronously (it's slow, ~5s)
|
||||||
# visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
|
# We use run_with_context to pass the patient identity to the new thread
|
||||||
|
visit_future = subtasks_thread_pool.submit(run_with_context, search_visit_by_pseudo_and_order, ctx, pseudo, 2)
|
||||||
|
|
||||||
# 2. Prepare inclusion_data: enrich inclusion with organization info
|
# 2. Prepare inclusion_data: enrich inclusion with organization info
|
||||||
inclusion_data = dict(inclusion)
|
inclusion_data = dict(inclusion)
|
||||||
@@ -1160,8 +1251,11 @@ def _process_inclusion_data(inclusion, organization):
|
|||||||
logging.error(f"Error fetching request data for patient {patient_id}: {e}")
|
logging.error(f"Error fetching request data for patient {patient_id}: {e}")
|
||||||
request_data = None
|
request_data = None
|
||||||
|
|
||||||
# 6-month visit loading disabled on this branch (No-6-Month-Visit)
|
try:
|
||||||
six_month_visit_data = None
|
six_month_visit_data = visit_future.result()
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error searching 6-month visit for patient {pseudo}: {e}")
|
||||||
|
six_month_visit_data = None
|
||||||
|
|
||||||
# --- Process all fields from configuration ---
|
# --- Process all fields from configuration ---
|
||||||
process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data)
|
process_inclusions_mapping(output_inclusion, inclusion_data, record_data, request_data, all_questionnaires, six_month_visit_data)
|
||||||
@@ -1232,6 +1326,12 @@ def main():
|
|||||||
number_of_threads = int((questionary.text("Number of threads :", default="12",
|
number_of_threads = int((questionary.text("Number of threads :", default="12",
|
||||||
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
|
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))
|
||||||
|
|
||||||
|
print()
|
||||||
|
ask_on_retry_exhausted()
|
||||||
|
|
||||||
|
print()
|
||||||
|
wait_for_scheduled_launch()
|
||||||
|
|
||||||
print()
|
print()
|
||||||
load_inclusions_mapping_config()
|
load_inclusions_mapping_config()
|
||||||
load_organizations_mapping_config()
|
load_organizations_mapping_config()
|
||||||
|
|||||||
Reference in New Issue
Block a user