# (file metadata: 1582 lines, 64 KiB, Python)
|
|
# DO (Diagnostic Order) Dashboard Generator
|
|
# This script automates the collection and processing of diagnostic order requests from the Ziwig GDD platform.
|
|
# It authenticates with Ziwig's IAM and GDD APIs to gather all worklist requests, their details, and
|
|
# associated professional (prescriber/requester) information. The script generates a comprehensive JSON
|
|
# report containing all request data, with 100% configurable fields defined in an Excel configuration file.
|
|
# All fields are externalized and can be configured without any code modification. The configuration
|
|
# supports multiple data sources, custom functions for business logic, field dependencies and conditions,
|
|
# value transformations, and multi-line field definitions for complex calculations. Organization counters
|
|
# are computed directly from request details (no separate statistics API). It employs multithreading with
|
|
# configurable worker pools to parallelize request processing across the thread pool, significantly reducing
|
|
# execution time. Results are exported as structured JSON files for easy integration with downstream analytics
|
|
# tools. Built-in quality assurance includes comprehensive non-regression testing with configurable
|
|
# Warning/Critical thresholds, and user confirmation prompts when critical issues are detected. Excel export
|
|
# functionality enables generation of configurable Excel workbooks with data filtering, sorting, value
|
|
# replacement, and formula recalculation. Key features include automatic token refresh handling, retry
|
|
# mechanisms for transient API failures, progress tracking with real-time visual feedback, and support for
|
|
# complex data extraction using JSON path expressions.
|
|
import json
|
|
import logging
|
|
import msvcrt
|
|
import os
|
|
import re
|
|
import sys
|
|
import threading
|
|
import traceback
|
|
from collections import defaultdict
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import timedelta, datetime
|
|
from time import perf_counter, sleep
|
|
import functools
|
|
|
|
import httpx
|
|
import openpyxl
|
|
import questionary
|
|
from tqdm import tqdm
|
|
from rich.console import Console
|
|
|
|
# Import centralized constants (SINGLE SOURCE OF TRUTH)
|
|
from do_dashboard_constants import (
|
|
REQUESTS_FILE_NAME,
|
|
ORGANIZATIONS_FILE_NAME,
|
|
OLD_FILE_SUFFIX,
|
|
DASHBOARD_CONFIG_FILE_NAME,
|
|
REQUESTS_MAPPING_TABLE_NAME,
|
|
ORGANIZATIONS_MAPPING_TABLE_NAME,
|
|
ORG_CENTER_MAPPING_FILE_NAME,
|
|
ORG_CENTER_MAPPING_TABLE_NAME,
|
|
DEFAULT_USER_NAME,
|
|
DEFAULT_PASSWORD,
|
|
IAM_URL,
|
|
GDD_URL,
|
|
GDD_APP_ID,
|
|
ERROR_MAX_RETRY,
|
|
WAIT_BEFORE_RETRY,
|
|
WAIT_BEFORE_NEW_BATCH_OF_RETRIES,
|
|
MAX_BATCHS_OF_RETRIES,
|
|
MAX_THREADS,
|
|
DO_FILTERS,
|
|
DO_WORKLIST_PAGE_SIZE,
|
|
BAR_N_FMT_WIDTH,
|
|
BAR_TOTAL_FMT_WIDTH,
|
|
BAR_TIME_WIDTH,
|
|
BAR_RATE_WIDTH,
|
|
LOG_FILE_NAME,
|
|
API_TIMEOUT,
|
|
API_AUTH_LOGIN_ENDPOINT,
|
|
API_AUTH_CONFIG_TOKEN_ENDPOINT,
|
|
API_AUTH_REFRESH_TOKEN_ENDPOINT,
|
|
API_DO_WORKLIST_ENDPOINT,
|
|
API_DO_REQUEST_DETAIL_ENDPOINT,
|
|
API_DO_PROFESSIONALS_ENDPOINT
|
|
)
|
|
|
|
# Import refactored modules
|
|
from do_dashboard_utils import (
|
|
get_nested_value,
|
|
get_httpx_client,
|
|
clear_httpx_client,
|
|
get_thread_position,
|
|
get_config_path,
|
|
thread_local_storage,
|
|
run_with_context
|
|
)
|
|
from do_dashboard_quality_checks import (
|
|
backup_output_files,
|
|
run_quality_checks,
|
|
run_check_only_mode,
|
|
set_dependencies as quality_set_dependencies,
|
|
enable_debug_mode
|
|
)
|
|
from do_dashboard_excel_export import (
|
|
prepare_excel_export,
|
|
export_excel_only,
|
|
run_normal_mode_export,
|
|
set_dependencies as excel_set_dependencies
|
|
)
|
|
|
|
# Log to file only (WARNING and above); stdout is kept clean for
# rich console output and tqdm progress bars. filemode='w' truncates
# the log at every run.
logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s', filename=LOG_FILE_NAME,
                    filemode='w')


# ============================================================================
# BLOCK 1: CONFIGURATION & BASE INFRASTRUCTURE
# ============================================================================

# NOTE: All constants are imported from do_dashboard_constants.py (SINGLE SOURCE OF TRUTH)

# --- Global Variables ---
access_token = ""   # current GDD access token (set by _do_login / new_token)
refresh_token = ""  # current GDD refresh token (set by _do_login / new_token)
threads_list = []
_token_refresh_lock = threading.Lock()  # serializes token refresh across worker threads
on_retry_exhausted = "ask" # "ask" | "ignore" | "abort" — set at startup
_stored_username = "" # Credentials stored at login for automatic re-login
_stored_password = ""
_threads_list_lock = threading.Lock()
global_pbar = None  # shared tqdm progress bar, guarded by _global_pbar_lock
_global_pbar_lock = threading.Lock()
_user_interaction_lock = threading.Lock()  # one interactive prompt at a time across threads

# Global variables (mutable, set at runtime - not constants)
requests_mapping_config = []
organizations_mapping_config = []
excel_export_config = None
excel_export_enabled = False

# Secondary pool for per-request sub-calls, separate from the main worker pool.
subtasks_thread_pool = ThreadPoolExecutor(40)
httpx_clients = {}  # per-thread httpx clients, managed via do_dashboard_utils helpers
console = Console()

# Share global variables with utility modules (required for thread-safe operations)
import do_dashboard_utils
do_dashboard_utils.httpx_clients = httpx_clients
do_dashboard_utils.threads_list = threads_list
do_dashboard_utils._threads_list_lock = _threads_list_lock

# Inject console instance to modules
quality_set_dependencies(console)
excel_set_dependencies(console)

# Detect and enable debug mode if --debug flag is present (and remove it from argv)
if "--debug" in sys.argv:
    sys.argv.remove("--debug")
    enable_debug_mode()

# --- Progress Bar Configuration ---
# Fixed-width counters/timers keep the tqdm bar layout stable as values grow.
custom_bar_format = ("{l_bar}{bar}"
                     f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
                     f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
                     f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}")
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 2: DECORATORS & RESILIENCE
|
|
# ============================================================================
|
|
|
|
def new_token():
    """Refresh the access token using the refresh token.

    Thread-safe: serialized by _token_refresh_lock so concurrent workers
    cannot trigger duplicate refreshes. Retries up to ERROR_MAX_RETRY times
    on transient errors; if every attempt fails, falls back to a full
    re-login with the credentials stored at startup (which raises
    httpx.RequestError / httpx.HTTPStatusError on failure).

    Updates the module-level access_token / refresh_token globals.
    """
    global access_token, refresh_token
    with _token_refresh_lock:
        for attempt in range(ERROR_MAX_RETRY):
            try:
                client = get_httpx_client()
                client.base_url = GDD_URL
                response = client.post(API_AUTH_REFRESH_TOKEN_ENDPOINT,
                                       headers={"Authorization": f"Bearer {access_token}"},
                                       json={"refresh_token": refresh_token}, timeout=20)
                response.raise_for_status()
                tokens = response.json()
                access_token = tokens["access_token"]
                refresh_token = tokens["refresh_token"]
                return
            except httpx.RequestError as exc:
                logging.warning(f"Refresh Token Error (Attempt {attempt + 1}) : {exc}")
                clear_httpx_client()
            except httpx.HTTPStatusError as exc:
                logging.warning(
                    f"Refresh Token Error (Attempt {attempt + 1}) : {exc.response.status_code} for Url {exc.request.url}")
                clear_httpx_client()
            # Back off only after a FAILED attempt. (The previous version
            # slept inside a `finally`, which also ran on the successful
            # `return` above and delayed every successful refresh.)
            if attempt < ERROR_MAX_RETRY - 1:
                sleep(WAIT_BEFORE_RETRY)
        # Refresh token exhausted — attempt full re-login with stored credentials
        logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
        _do_login(_stored_username, _stored_password)
        logging.info("Re-login successful. New tokens acquired.")
|
|
|
|
|
|
def api_call_with_retry(func):
    """Decorator for API calls with automatic retry and token refresh on 401 errors.

    Retry model: attempts are grouped in batches of ERROR_MAX_RETRY. Between
    attempts in a batch the wrapper sleeps WAIT_BEFORE_RETRY; between batches
    it sleeps WAIT_BEFORE_NEW_BATCH_OF_RETRIES, up to MAX_BATCHS_OF_RETRIES
    batches. When every batch is exhausted, the global on_retry_exhausted
    policy decides: "ignore" returns None, "abort" raises, and "ask" prompts
    the user interactively (serialized by _user_interaction_lock).
    A 401 response triggers a token refresh via new_token() before retrying.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        func_name = func.__name__
        total_attempts = 0   # cumulative across all batches (for log messages)
        batch_count = 1

        # Outer loop restarts a fresh batch of attempts after each `break`
        # from the inner for-loop.
        while True:
            for attempt in range(ERROR_MAX_RETRY):
                total_attempts += 1
                try:
                    return func(*args, **kwargs)
                except (httpx.RequestError, httpx.HTTPStatusError) as exc:
                    logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}")

                    # Drop the (possibly broken) thread-local client so the
                    # next attempt starts on a fresh connection.
                    clear_httpx_client()

                    if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
                        logging.info(f"Token expired for {func_name}. Refreshing token.")
                        try:
                            new_token()
                        except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
                            # Refresh failure is not fatal here: the normal
                            # retry flow below will retry (and fail) anyway.
                            logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")

                    if attempt < ERROR_MAX_RETRY - 1:
                        # More attempts remain in this batch: short wait.
                        sleep(WAIT_BEFORE_RETRY)
                    else:
                        # Batch exhausted.
                        if batch_count < MAX_BATCHS_OF_RETRIES:
                            logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. "
                                            f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.")
                            batch_count += 1
                            sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
                            break  # restart the for-loop: next automatic batch
                        else:
                            # All automatic batches exhausted — apply policy.
                            # The lock keeps concurrent threads from prompting
                            # (or logging their resolution) simultaneously.
                            with _user_interaction_lock:
                                if on_retry_exhausted == "ignore":
                                    ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"})
                                    logging.warning(f"[AUTO-IGNORE] Skipping {func_name} for Request {ctx['id']}. Error: {exc}")
                                    return None

                                elif on_retry_exhausted == "abort":
                                    logging.critical(f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}")
                                    raise httpx.RequestError(message=f"Persistent error in {func_name} (auto-aborted)")

                                else:  # "ask" — display error then interactive prompt
                                    console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
                                    console.print(f"[red]Exception: {exc}[/red]")

                                    choice = questionary.select(
                                        f"What would you like to do for {func_name}?",
                                        choices=[
                                            "Retry (try another batch of retries)",
                                            "Ignore (return None and continue)",
                                            "Stop script (critical error)"
                                        ]
                                    ).ask()

                                    if choice == "Retry (try another batch of retries)":
                                        logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
                                        batch_count = 1  # full batch sequence starts over
                                        break
                                    elif choice == "Ignore (return None and continue)":
                                        ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"})
                                        logging.warning(f"[IGNORE] User opted to skip {func_name} for Request {ctx['id']}. Error: {exc}")
                                        return None
                                    else:
                                        logging.critical(f"User chose to stop script after persistent error in {func_name}.")
                                        raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")

    return wrapper
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 3: AUTHENTICATION
|
|
# ============================================================================
|
|
|
|
def _do_login(username, password):
    """Two-step authentication (IAM → GDD) with the given credentials.

    Step 1 logs into IAM to obtain a master token; step 2 exchanges it
    for GDD application tokens. On success the module-level access_token
    and refresh_token globals are updated.
    Raises httpx.RequestError or httpx.HTTPStatusError on failure.
    Must NOT acquire _token_refresh_lock (caller's responsibility).
    """
    global access_token, refresh_token

    # Step 1: IAM login → master token + user id
    iam_client = get_httpx_client()
    iam_client.base_url = IAM_URL
    iam_response = iam_client.post(
        API_AUTH_LOGIN_ENDPOINT,
        json={"username": username, "password": password},
        timeout=20)
    iam_response.raise_for_status()
    iam_payload = iam_response.json()
    master_token = iam_payload["access_token"]
    user_id = iam_payload["userId"]

    # Step 2: GDD config-token exchange → application tokens
    gdd_client = get_httpx_client()
    gdd_client.base_url = GDD_URL
    gdd_response = gdd_client.post(
        API_AUTH_CONFIG_TOKEN_ENDPOINT,
        headers={"Authorization": f"Bearer {master_token}"},
        json={"userId": user_id, "clientId": GDD_APP_ID,
              "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"},
        timeout=20)
    gdd_response.raise_for_status()
    gdd_payload = gdd_response.json()
    access_token = gdd_payload["access_token"]
    refresh_token = gdd_payload["refresh_token"]
|
|
|
|
|
|
def login():
    """Prompt for credentials and authenticate against IAM/GDD.

    Returns "Success" on login, "Error" when authentication fails,
    or "Exit" when the user cancels / leaves a field empty. On success
    the credentials are kept in module globals so new_token() can
    re-login automatically later.
    """
    global _stored_username, _stored_password

    user_name = questionary.text("login :", default=DEFAULT_USER_NAME).ask()
    password = questionary.password("password :", default=DEFAULT_PASSWORD).ask()
    if not user_name or not password:
        return "Exit"

    try:
        _do_login(user_name, password)
    except httpx.RequestError as exc:
        message = f"Login Error : {exc}"
        print(message)
        logging.warning(message)
        return "Error"
    except httpx.HTTPStatusError as exc:
        message = f"Login Error : {exc.response.status_code} for Url {exc.request.url}"
        print(message)
        logging.warning(message)
        return "Error"

    # Keep credentials for automatic re-login when the refresh token expires.
    _stored_username = user_name
    _stored_password = password
    print()
    print("Login Success")
    return "Success"
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 3B: STARTUP PARAMETERS & FILE UTILITIES
|
|
# ============================================================================
|
|
|
|
def ask_on_retry_exhausted():
    """Asks the user which policy to apply when all API retry batches fail.

    Stores the result in the module-level on_retry_exhausted global
    ("ask" / "ignore" / "abort"). A cancelled prompt defaults to "ask".
    """
    global on_retry_exhausted

    answer = questionary.select(
        "On retry exhausted :",
        choices=[
            "Ask (interactive prompt)",
            "Ignore (return None and continue)",
            "Abort (stop script)"
        ]
    ).ask()

    # Map the selected label to the internal policy keyword.
    if answer == "Ignore (return None and continue)":
        on_retry_exhausted = "ignore"
    elif answer == "Abort (stop script)":
        on_retry_exhausted = "abort"
    else:
        # Covers both the explicit "Ask..." choice and a cancelled prompt (None).
        on_retry_exhausted = "ask"
|
|
|
|
|
|
def wait_for_scheduled_launch():
    """Asks the user when to start the processing and waits if needed.

    Offers three options: start immediately, start after a delay in
    minutes, or start at a given HH:MM time. While waiting, displays a
    live countdown, drains stray keystrokes (Windows, via msvcrt) so
    they do not leak into the next prompt, and exits on Ctrl+C.
    """
    start_choice = questionary.select(
        "When to start processing ?",
        choices=["Immediately", "In X minutes", "At HH:MM"]
    ).ask()

    if start_choice is None or start_choice == "Immediately":
        return

    if start_choice == "In X minutes":
        minutes_answer = questionary.text(
            "Number of minutes :",
            validate=lambda x: x.isdigit() and int(x) > 0
        ).ask()
        if not minutes_answer:
            return
        target_time = datetime.now() + timedelta(minutes=int(minutes_answer))
    else:  # "At HH:MM"
        hhmm_answer = questionary.text(
            "Start time (HH:MM) :",
            validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) and
                               0 <= int(x.split(':')[0]) <= 23 and
                               0 <= int(x.split(':')[1]) <= 59
        ).ask()
        if not hhmm_answer:
            return
        now = datetime.now()
        hour, minute = int(hhmm_answer.split(':')[0]), int(hhmm_answer.split(':')[1])
        target_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if target_time <= now:
            console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
            return

    print()
    try:
        while True:
            time_left = target_time - datetime.now()
            if time_left.total_seconds() <= 0:
                break
            seconds_left = int(time_left.total_seconds())
            hours, rem = divmod(seconds_left, 3600)
            minutes, seconds = divmod(rem, 60)
            target_str = target_time.strftime('%H:%M:%S')
            print(f"\r Starting in {hours:02d}:{minutes:02d}:{seconds:02d}... (at {target_str}) — Ctrl+C to cancel ",
                  end="", flush=True)
            sleep(1)
            # Drain any keystrokes typed during the wait (Windows only).
            while msvcrt.kbhit():
                msvcrt.getwch()
        print()
        console.print("[green]✓ Starting processing.[/green]")
    except KeyboardInterrupt:
        print()
        console.print("[bold red]Launch cancelled by user.[/bold red]")
        raise SystemExit(0)
|
|
|
|
|
|
def load_json_file(filename):
    """Load a JSON file from disk.

    Args:
        filename: Path to the JSON file.

    Returns:
        The parsed JSON data, or None when the file does not exist or
        cannot be read/parsed (a warning is logged and printed then).
    """
    if os.path.exists(filename):
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            # Report the actual file name (the previous version printed a
            # hard-coded '(unknown)' placeholder instead).
            logging.warning(f"Could not load JSON file '{filename}': {e}")
            console.print(f"[yellow]⚠ Warning: Could not load JSON file '{filename}': {e}[/yellow]")
    return None
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 4: CONFIGURATION LOADING
|
|
# ============================================================================
|
|
|
|
def load_requests_mapping_config():
    """Loads and validates the requests mapping configuration from the Excel file.

    Reads the REQUESTS_MAPPING_TABLE_NAME sheet of the dashboard config
    workbook, validates each row (mandatory field_name / field_path,
    JSON-typed columns, source_id prefixes) and stores the parsed rows in
    the module-level requests_mapping_config list.

    Raises:
        Exception: If the config file or sheet is missing, or a row is invalid.
    """
    global requests_mapping_config
    config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)

    try:
        workbook = openpyxl.load_workbook(config_path, data_only=True)
    except FileNotFoundError:
        error_msg = f"Error: Configuration file not found at: {config_path}"
        logging.critical(error_msg)
        console.print(f"[bold red]{error_msg}[/bold red]")
        raise Exception(error_msg)

    # try/finally guarantees the workbook handle is released on every path
    # (success and validation errors alike), consistent with the
    # organizations loader which already closes it.
    try:
        if REQUESTS_MAPPING_TABLE_NAME not in workbook.sheetnames:
            error_msg = f"Error: Sheet '{REQUESTS_MAPPING_TABLE_NAME}' not found in the configuration file."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        sheet = workbook[REQUESTS_MAPPING_TABLE_NAME]
        headers = [cell.value for cell in sheet[1]]

        temp_config = []

        for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2):
            field_config = dict(zip(headers, row))

            # Rows explicitly marked "Not Specified" are skipped entirely.
            if field_config.get("source_name") == "Not Specified":
                continue

            field_name = field_config.get("field_name")
            if not field_name or not isinstance(field_name, str):
                error_msg = f"Error in config file, row {row_index}: 'field_name' is mandatory."
                logging.critical(error_msg)
                console.print(f"[bold red]{error_msg}[/bold red]")
                raise Exception(error_msg)
            # Strip a trailing parenthesized annotation from the field name.
            field_config["field_name"] = re.sub(r'\s*\([^)]*\)$', '', field_name).strip()

            # Parse source_id prefix (q_id= / q_name= / q_category= / record / request)
            source_id_raw = field_config.get("source_id", "")
            if source_id_raw and isinstance(source_id_raw, str):
                if source_id_raw.startswith("q_id="):
                    field_config["source_type"] = "q_id"
                    field_config["source_value"] = source_id_raw[5:]
                elif source_id_raw.startswith("q_name="):
                    field_config["source_type"] = "q_name"
                    field_config["source_value"] = source_id_raw[7:]
                elif source_id_raw.startswith("q_category="):
                    field_config["source_type"] = "q_category"
                    field_config["source_value"] = source_id_raw[11:]
                elif source_id_raw == "record":
                    field_config["source_type"] = "record"
                    field_config["source_value"] = None
                elif source_id_raw == "request":
                    field_config["source_type"] = "request"
                    field_config["source_value"] = None
                else:
                    # Unprefixed value: kept as-is (e.g. custom function name).
                    field_config["source_type"] = None
                    field_config["source_value"] = source_id_raw
            else:
                field_config["source_type"] = None
                field_config["source_value"] = None

            # Columns that hold JSON strings are parsed in place.
            for json_field in ["field_path", "field_condition", "true_if_any", "value_labels"]:
                value = field_config.get(json_field)
                if value:
                    if not isinstance(value, str):
                        error_msg = f"Error in config file, row {row_index}, field '{json_field}': Invalid value, must be a JSON string."
                        logging.critical(error_msg)
                        console.print(f"[bold red]{error_msg}[/bold red]")
                        raise Exception(error_msg)
                    try:
                        field_config[json_field] = json.loads(value)
                    except json.JSONDecodeError:
                        error_msg = f"Error in config file, row {row_index}, field '{json_field}': Invalid JSON format."
                        logging.critical(error_msg)
                        console.print(f"[bold red]{error_msg}[/bold red]")
                        raise Exception(error_msg)
                else:
                    field_config[json_field] = None

            if not field_config.get("field_path"):
                error_msg = f"Error in config file, row {row_index}: 'field_path' is mandatory when a field is specified."
                logging.critical(error_msg)
                console.print(f"[bold red]{error_msg}[/bold red]")
                raise Exception(error_msg)

            temp_config.append(field_config)
    finally:
        # Always release the workbook handle (the previous version leaked it).
        workbook.close()

    requests_mapping_config = temp_config
    console.print(f"Loaded {len(requests_mapping_config)} fields from requests mapping configuration.", style="green")
|
|
|
|
|
|
def load_organizations_mapping_config():
    """Loads and validates the organizations mapping configuration from the Excel file.

    The ORGANIZATIONS_MAPPING_TABLE_NAME sheet is optional: when absent, an
    empty mapping is stored and the function returns silently.

    Raises:
        Exception: If the config file is missing or a row cannot be parsed.
    """
    global organizations_mapping_config
    config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)

    try:
        workbook = openpyxl.load_workbook(config_path, data_only=True)
    except FileNotFoundError:
        error_msg = f"Error: Configuration file not found at: {config_path}"
        logging.critical(error_msg)
        console.print(f"[bold red]{error_msg}[/bold red]")
        raise Exception(error_msg)

    mapping_config = []
    # Outer try/finally closes the workbook on every path — the previous
    # version leaked the handle when the optional sheet was absent.
    try:
        if ORGANIZATIONS_MAPPING_TABLE_NAME not in workbook.sheetnames:
            logging.info(f"Sheet '{ORGANIZATIONS_MAPPING_TABLE_NAME}' not found. Organizations mapping is optional.")
            organizations_mapping_config = []
            return

        sheet = workbook[ORGANIZATIONS_MAPPING_TABLE_NAME]
        headers = [cell.value for cell in sheet[1]]
        headers_filtered = [h for h in headers if h is not None]

        try:
            for row in sheet.iter_rows(min_row=2, values_only=True):
                # A fully empty row marks the end of the table.
                if all(cell is None for cell in row):
                    break
                row_filtered = row[:len(headers_filtered)]
                config_dict = dict(zip(headers_filtered, row_filtered))
                mapping_config.append(config_dict)
        except Exception as e:
            error_msg = f"Error parsing organizations mapping: {e}"
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)
    finally:
        workbook.close()

    organizations_mapping_config = mapping_config
    if mapping_config:
        console.print(f"Loaded {len(organizations_mapping_config)} organizations from organizations mapping configuration.", style="green")
    else:
        console.print("No organizations mapping found (this is optional).", style="yellow")
|
|
|
|
|
|
def load_do_filters_config():
    """
    Loads the DO filters from the Named Range 'DO_Filters' in the config Excel file.
    The Named Range contains a JSON string representing the filters object for the worklist API.

    Returns:
        dict: Filters object (e.g. {"status": "all-admin", "study": "ENDOLIFE"})

    Raises:
        Exception: If the Named Range is not found or the value is not valid JSON.
    """
    config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)

    try:
        workbook = openpyxl.load_workbook(config_path, data_only=True)
    except FileNotFoundError:
        error_msg = f"Error: Configuration file not found at: {config_path}"
        logging.critical(error_msg)
        console.print(f"[bold red]{error_msg}[/bold red]")
        raise Exception(error_msg)

    # The finally below now also covers the "named range missing" branch —
    # the previous version raised before the guarded block and leaked the
    # workbook handle on that path.
    try:
        if DO_FILTERS not in workbook.defined_names:
            error_msg = f"Error: Named range '{DO_FILTERS}' not found in configuration file."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        try:
            named_range = workbook.defined_names[DO_FILTERS]
            destinations = list(named_range.destinations)
            if not destinations:
                raise ValueError("Named range has no destinations")
            sheet_name, cell_ref = destinations[0]
            # Remove absolute reference markers ($) for cell access
            cell_ref_clean = cell_ref.replace('$', '')
            sheet = workbook[sheet_name]
            cell_value = sheet[cell_ref_clean].value
        except Exception as e:
            error_msg = f"Error reading Named Range '{DO_FILTERS}': {e}"
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)
    finally:
        workbook.close()

    if not cell_value or not isinstance(cell_value, str):
        error_msg = f"Error: Named range '{DO_FILTERS}' is empty or not a string."
        logging.critical(error_msg)
        console.print(f"[bold red]{error_msg}[/bold red]")
        raise Exception(error_msg)

    try:
        filters = json.loads(cell_value)
    except json.JSONDecodeError as e:
        error_msg = f"Error: Named range '{DO_FILTERS}' does not contain valid JSON: {e}"
        logging.critical(error_msg)
        console.print(f"[bold red]{error_msg}[/bold red]")
        raise Exception(error_msg)

    console.print(f"Loaded DO filters: {filters}", style="green")
    return filters
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 5: DATA SEARCH & EXTRACTION
|
|
# ============================================================================
|
|
|
|
def get_value_from_request(output_request, key):
    """Look up *key* inside the nested output_request structure.

    output_request maps group names to dicts of field values; the first
    group (in insertion order) containing *key* wins. Non-dict group
    values are skipped. Returns None when no group holds the key.
    """
    return next(
        (fields[key] for fields in output_request.values()
         if isinstance(fields, dict) and key in fields),
        None,
    )
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 6: CUSTOM FUNCTIONS & FIELD PROCESSING
|
|
# ============================================================================
|
|
|
|
def _execute_custom_function(function_name, args, output_request):
    """Executes a custom function for a calculated field.

    Dispatches on function_name; args comes from the field's configured
    field_path and field references resolve against already-computed values
    in output_request (via get_value_from_request). Supported functions:
    search_in_fields_using_regex, extract_parentheses_content,
    append_terminated_suffix, if_then_else, extract_value_from_array,
    concat. Validation/usage problems return a "$$$$ ..." marker string;
    missing inputs propagate as the string "undefined".
    """
    if function_name == "search_in_fields_using_regex":
        # args: [regex_pattern, field_name_1, ..., field_name_n]
        # True if the pattern matches any of the (string) field values.
        if not args or len(args) < 2:
            return "$$$$ Argument Error: search_in_fields_using_regex requires at least 2 arguments"

        regex_pattern = args[0]
        field_names = args[1:]

        field_values = []
        all_undefined = True

        for field_name in field_names:
            value = get_value_from_request(output_request, field_name)
            field_values.append(value)
            if value is not None and value != "undefined":
                all_undefined = False

        # If every referenced field is missing, the result is undefined
        # rather than False.
        if all_undefined:
            return "undefined"

        try:
            for value in field_values:
                # Only string values are searched; case-insensitive match.
                if isinstance(value, str) and re.search(regex_pattern, value, re.IGNORECASE):
                    return True
        except re.error as e:
            # Invalid pattern coming from the config file.
            return f"$$$$ Regex Error: {e}"

        return False

    elif function_name == "extract_parentheses_content":
        # args: [field_name] — returns the content of the first "(...)"
        # in the field's string value.
        if not args or len(args) != 1:
            return "$$$$ Argument Error: extract_parentheses_content requires 1 argument"

        field_name = args[0]
        value = get_value_from_request(output_request, field_name)

        if value is None or value == "undefined":
            return "undefined"

        match = re.search(r'\((.*?)\)', str(value))
        return match.group(1) if match else "undefined"

    elif function_name == "append_terminated_suffix":
        # args: [status_field_name, terminated_flag_field_name]
        # Appends " - AP" to status when the flag is boolean True.
        if not args or len(args) != 2:
            return "$$$$ Argument Error: append_terminated_suffix requires 2 arguments"

        status = get_value_from_request(output_request, args[0])
        is_terminated = get_value_from_request(output_request, args[1])

        if status is None or status == "undefined":
            return "undefined"

        # Non-boolean or False flag: status is returned unchanged.
        if not isinstance(is_terminated, bool) or not is_terminated:
            return status

        return f"{status} - AP"

    elif function_name == "if_then_else":
        # args: [operator, operand(s)..., result_if_true, result_if_false]
        # Operands: literals (bool/number), "$literal" strings, or field names.
        if not args or len(args) < 4:
            return "$$$$ Argument Error: if_then_else requires at least 4 arguments"

        operator = args[0]

        def resolve_value(arg):
            # Booleans and numbers are used as-is; a leading "$" marks a
            # literal string (the "$" is stripped); anything else is
            # treated as a field name looked up in output_request.
            if isinstance(arg, bool):
                return arg
            if isinstance(arg, (int, float)):
                return arg
            if isinstance(arg, str) and arg.startswith("$"):
                return arg[1:]
            return get_value_from_request(output_request, arg)

        if operator == "is_true":
            if len(args) != 4:
                return "$$$$ Argument Error: is_true requires 4 arguments"
            value = resolve_value(args[1])
            if value is None or value == "undefined":
                return "undefined"
            condition = (value is True)
            result_if_true = resolve_value(args[2])
            result_if_false = resolve_value(args[3])

        elif operator == "is_false":
            if len(args) != 4:
                return "$$$$ Argument Error: is_false requires 4 arguments"
            value = resolve_value(args[1])
            if value is None or value == "undefined":
                return "undefined"
            condition = (value is False)
            result_if_true = resolve_value(args[2])
            result_if_false = resolve_value(args[3])

        elif operator == "all_true":
            # args[1] must be a list of field names; any missing field
            # makes the whole expression undefined.
            if len(args) != 4:
                return "$$$$ Argument Error: all_true requires 4 arguments"
            fields_arg = args[1]
            if not isinstance(fields_arg, list):
                return "$$$$ Argument Error: all_true requires arg1 to be a list of field names"

            conditions = []
            for field_name in fields_arg:
                field_value = get_value_from_request(output_request, field_name)
                if field_value is None or field_value == "undefined":
                    return "undefined"
                conditions.append(field_value)

            condition = all(conditions)
            result_if_true = resolve_value(args[2])
            result_if_false = resolve_value(args[3])

        elif operator == "is_defined":
            if len(args) != 4:
                return "$$$$ Argument Error: is_defined requires 4 arguments"
            value = resolve_value(args[1])
            condition = (value is not None and value != "undefined")
            result_if_true = resolve_value(args[2])
            result_if_false = resolve_value(args[3])

        elif operator == "is_undefined":
            if len(args) != 4:
                return "$$$$ Argument Error: is_undefined requires 4 arguments"
            value = resolve_value(args[1])
            condition = (value is None or value == "undefined")
            result_if_true = resolve_value(args[2])
            result_if_false = resolve_value(args[3])

        elif operator == "all_defined":
            # Unlike all_true, a missing field here just makes the
            # condition False instead of returning "undefined".
            if len(args) != 4:
                return "$$$$ Argument Error: all_defined requires 4 arguments"
            fields_arg = args[1]
            if not isinstance(fields_arg, list):
                return "$$$$ Argument Error: all_defined requires arg1 to be a list of field names"

            for field_name in fields_arg:
                field_value = get_value_from_request(output_request, field_name)
                if field_value is None or field_value == "undefined":
                    condition = False
                    break
            else:
                # for/else: reached only when the loop never breaks,
                # i.e. every field was defined.
                condition = True

            result_if_true = resolve_value(args[2])
            result_if_false = resolve_value(args[3])

        elif operator == "==":
            if len(args) != 5:
                return "$$$$ Argument Error: == requires 5 arguments"
            value1 = resolve_value(args[1])
            value2 = resolve_value(args[2])

            # Comparison with a missing operand is undefined, not False.
            if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined":
                return "undefined"

            condition = (value1 == value2)
            result_if_true = resolve_value(args[3])
            result_if_false = resolve_value(args[4])

        elif operator == "!=":
            if len(args) != 5:
                return "$$$$ Argument Error: != requires 5 arguments"
            value1 = resolve_value(args[1])
            value2 = resolve_value(args[2])

            if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined":
                return "undefined"

            condition = (value1 != value2)
            result_if_true = resolve_value(args[3])
            result_if_false = resolve_value(args[4])

        else:
            return f"$$$$ Unknown Operator: {operator}"

        return result_if_true if condition else result_if_false

    elif function_name == "extract_value_from_array":
        # Args: [array_field_name, key_path, search_value, value_path]
        # array_field_name : name of an already-computed field in output_request containing the array
        # key_path : relative path (list) to the key attribute within each array item
        # search_value : JSON value to match against (string, number, bool, object...)
        # value_path : relative path (list) to the value to extract from the matched item
        if not args or len(args) != 4:
            return "$$$$ Argument Error: extract_value_from_array requires 4 arguments"

        array_field_name, key_path, search_value, value_path = args

        if not isinstance(key_path, list):
            return "$$$$ Argument Error: extract_value_from_array key_path (arg2) must be a list"
        if not isinstance(value_path, list):
            return "$$$$ Argument Error: extract_value_from_array value_path (arg4) must be a list"

        array = get_value_from_request(output_request, array_field_name)

        if array is None or array == "undefined":
            return "undefined"
        if not isinstance(array, list):
            return "$$$$ Format Error : Array expected"

        # First item whose key_path value equals search_value wins.
        for item in array:
            if get_nested_value(item, key_path) == search_value:
                return get_nested_value(item, value_path, default="undefined")

        return "undefined"

    elif function_name == "concat":
        # Args: [separator, field_name_1, field_name_2, ..., field_name_n]
        # separator : string to insert between concatenated values
        # field_name_* : names of already-computed fields in output_request to concatenate
        # undefined/None values are silently skipped
        if not args or len(args) < 2:
            return "$$$$ Argument Error: concat requires at least 2 arguments (separator + 1 field)"

        separator = args[0]
        if not isinstance(separator, str):
            return "$$$$ Argument Error: concat separator (arg1) must be a string"

        field_names = args[1:]
        parts = []
        all_undefined = True

        for field_name in field_names:
            value = get_value_from_request(output_request, field_name)
            if value is not None and value != "undefined":
                all_undefined = False
                parts.append(str(value))

        if all_undefined:
            return "undefined"

        return separator.join(parts)

    # No branch matched: unknown function name from the config file.
    return f"$$$$ Unknown Custom Function: {function_name}"
|
|
|
|
|
|
def _apply_date_format(strftime_template, iso_value):
|
|
"""
|
|
Parses an ISO 8601 date string and formats it using a strftime template.
|
|
|
|
Returns the formatted string, or None if the source value cannot be parsed.
|
|
"""
|
|
iso_formats = [
|
|
"%Y-%m-%dT%H:%M:%S.%fZ",
|
|
"%Y-%m-%dT%H:%M:%SZ",
|
|
"%Y-%m-%dT%H:%M:%S.%f",
|
|
"%Y-%m-%dT%H:%M:%S",
|
|
"%Y-%m-%dT%H:%M",
|
|
"%Y-%m-%d",
|
|
]
|
|
for fmt in iso_formats:
|
|
try:
|
|
return datetime.strptime(iso_value, fmt).strftime(strftime_template)
|
|
except (ValueError, TypeError):
|
|
continue
|
|
return None
|
|
|
|
|
|
def process_requests_mapping(output_request, request_data):
    """Processes and adds the requests mapping fields to the output request dictionary.

    For each field defined in the global requests_mapping_config, resolves the
    field's value from its configured source (a "Calculated" custom function or
    a path into *request_data*), applies the configured post-processing steps
    (condition gating, true_if_any, value_labels, list joining, score
    formatting, field templates), and stores the result under
    output_request[field_group][field_name].

    Args:
        output_request: Output dict being built; grouped sub-dicts are created
            on demand. Already-computed fields are readable by conditions and
            custom functions.
        request_data: Enriched request detail dict used as the "request" source.
    """
    for field in requests_mapping_config:
        field_name = field["field_name"]
        field_group = field.get("field_group", "Extended_Fields")
        final_value = "undefined"

        # Check condition: an optional boolean field (already computed) gates
        # this field. "undefined" means "keep processing"; any other sentinel
        # short-circuits the source lookup below.
        condition_field_name = field.get("field_condition")
        if condition_field_name:
            condition_value = get_value_from_request(output_request, condition_field_name)

            if condition_value is None or condition_value == "undefined":
                final_value = "undefined"
            elif not isinstance(condition_value, bool):
                # Condition fields must be booleans; flag misconfiguration.
                final_value = "$$$$ Condition Field Error"
            elif not condition_value:
                # Condition is False -> field is not applicable for this request.
                final_value = "N/A"

        # If condition allows, process the field
        if final_value == "undefined":
            source_name = field.get("source_name")
            source_type = field.get("source_type")
            field_path = field.get("field_path")

            # Get raw value from appropriate source
            if source_name == "Calculated":
                # For calculated fields, source_id names the custom function
                # and field_path carries its argument list.
                function_name = field.get("source_id")
                args = field_path
                final_value = _execute_custom_function(function_name, args, output_request)
            elif source_type == "request":
                final_value = get_nested_value(request_data, field_path, default="undefined")
            else:
                # source types not used in DO (q_id, q_name, q_category, record, etc.)
                # return undefined to allow future extensibility
                final_value = "undefined"

            # If the source data itself is missing, log a warning but continue
            if final_value == "$$$$ No Data":
                request_id = request_data.get("id", "Unknown") if isinstance(request_data, dict) else "Unknown"
                logging.warning(f"No '{source_type}' data source found for Request {request_id} (Field: {field_name})")
                final_value = "undefined"

        # Post-processing: Apply true_if_any and value_labels transformations
        if final_value not in ["undefined", "$$$$ No Data"]:
            # true_if_any: value becomes True if ANY raw value intersects the
            # configured check list (both sides are treated as sets).
            check_values = field.get("true_if_any")
            if check_values:
                raw_value_set = set(final_value if isinstance(final_value, list) else [final_value])
                check_values_set = set(check_values if isinstance(check_values, list) else [check_values])
                final_value = not raw_value_set.isdisjoint(check_values_set)

            # value_labels: translate a raw value to its French display label;
            # unmatched values are flagged as a Value Error sentinel.
            value_labels = field.get("value_labels")
            if value_labels and final_value not in ["$$$$ Format Error : Array expected"]:
                found = False
                for label_map in value_labels:
                    if label_map.get("value") == final_value:
                        final_value = get_nested_value(label_map, ["text", "fr"], default=f"$$$$ Value Error : {final_value}")
                        found = True
                        break
                if not found:
                    final_value = f"$$$$ Value Error : {final_value}"

        field_template = field.get("field_template")

        # Post-processing: If the value is a list, join it with a pipe (unless raw_array template)
        if isinstance(final_value, list) and field_template != "raw_array":
            final_value = "|".join(map(str, final_value))

        # Post-processing: Format score dictionaries
        if isinstance(final_value, dict) and 'total' in final_value and 'max' in final_value:
            final_value = f"{final_value['total']}/{final_value['max']}"

        # Post-processing: Apply field template. A template containing '%' is
        # treated as a strftime date format; otherwise '$value' is substituted.
        if field_template and field_template != "raw_array" and final_value not in ["undefined", "N/A"] and isinstance(final_value, (str, int, float, bool)):
            if "%" in field_template:
                formatted = _apply_date_format(field_template, str(final_value))
                final_value = formatted if formatted is not None else f"$$$$ Date Format Error: {final_value}"
            else:
                final_value = field_template.replace("$value", str(final_value))

        if field_group not in output_request:
            output_request[field_group] = {}
        output_request[field_group][field_name] = final_value
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 7: BUSINESS API CALLS
|
|
# ============================================================================
|
|
|
|
@api_call_with_retry
def get_worklist_page(filters, page, page_size):
    """Fetches one page of the diagnostic order worklist."""
    client = get_httpx_client()
    client.base_url = GDD_URL

    # Build the worklist query payload (French locale, no explicit sort).
    payload = {
        "lang": "fr-FR",
        "filters": filters,
        "limit": page_size,
        "page": page,
        "sort": []
    }
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    response = client.post(
        API_DO_WORKLIST_ENDPOINT,
        headers=auth_headers,
        json=payload,
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
@api_call_with_retry
def get_request_detail_by_id(request_id):
    """Fetches the full validation detail for a single request."""
    client = get_httpx_client()
    client.base_url = GDD_URL

    detail_url = f"{API_DO_REQUEST_DETAIL_ENDPOINT}/{request_id}/validation"
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    response = client.get(
        detail_url,
        headers=auth_headers,
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
@api_call_with_retry
def get_professionals(ids):
    """
    Fetches professional display names for a list of IDs (prescriber / requester).
    IDs are deduplicated before the call. Results are matched by ID for robustness.

    Returns:
        dict: {professional_id: display_name}
    """
    # Nothing to resolve -> skip the API round-trip entirely.
    if not ids:
        return {}

    client = get_httpx_client()
    client.base_url = GDD_URL
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    response = client.post(
        API_DO_PROFESSIONALS_ENDPOINT,
        headers=auth_headers,
        json={"ids": ids},
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    professionals = response.json().get("data", [])

    # Match results back by their metadata id rather than by list position,
    # so reordering or missing entries in the response cannot mis-attribute names.
    names_by_id = {}
    for professional in professionals:
        professional_id = get_nested_value(professional, ["metadata", "id"])
        if professional_id:
            names_by_id[professional_id] = professional.get("display")
    return names_by_id
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 7b: ORGANIZATION CENTER MAPPING
|
|
# ============================================================================
|
|
|
|
def load_organization_center_mapping():
    """
    Loads organization ↔ center mapping from Excel file in script directory.

    The mapping sheet must contain 'Organization_Name' and 'Center_Name'
    columns. Both columns are validated for duplicates (case/whitespace
    insensitive); any problem (missing file/sheet/columns, read error,
    duplicates) downgrades gracefully to an empty mapping so the run can
    continue without center names.

    Returns:
        dict: {organization_name_normalized: center_name} or {} if error/skip
    """

    def _norm(value):
        # Case/whitespace-insensitive key used for duplicate detection and lookup.
        text = value if isinstance(value, str) else str(value)
        return text.strip().lower()

    def _clean(value):
        # Whitespace-trimmed display value (original casing preserved).
        text = value if isinstance(value, str) else str(value)
        return text.strip()

    mapping_file = ORG_CENTER_MAPPING_FILE_NAME

    if not os.path.exists(mapping_file):
        console.print(f"[yellow]⚠ Mapping file not found at: {mapping_file}. Skipping center mapping.[/yellow]")
        return {}

    try:
        workbook = openpyxl.load_workbook(mapping_file)
    except Exception as e:
        console.print(f"[yellow]⚠ Error loading mapping file: {e}. Skipping center mapping.[/yellow]")
        logging.warning(f"Error loading mapping file: {e}")
        return {}

    if ORG_CENTER_MAPPING_TABLE_NAME not in workbook.sheetnames:
        console.print(f"[yellow]⚠ Sheet '{ORG_CENTER_MAPPING_TABLE_NAME}' not found in mapping file. Skipping center mapping.[/yellow]")
        return {}

    sheet = workbook[ORG_CENTER_MAPPING_TABLE_NAME]
    headers = [cell.value for cell in sheet[1]]

    if "Organization_Name" not in headers or "Center_Name" not in headers:
        console.print(f"[yellow]⚠ Required columns 'Organization_Name' or 'Center_Name' not found in mapping file. Skipping center mapping.[/yellow]")
        return {}

    # Collect non-empty rows; a row is kept only when both columns are filled.
    mapping_rows = []
    try:
        for row in sheet.iter_rows(min_row=2, values_only=True):
            if all(cell is None for cell in row):
                continue
            row_dict = dict(zip(headers, row))
            org_name = row_dict.get("Organization_Name")
            center_name = row_dict.get("Center_Name")
            if org_name and center_name:
                mapping_rows.append({"Organization_Name": org_name, "Center_Name": center_name})
    except Exception as e:
        console.print(f"[yellow]⚠ Error reading mapping file rows: {e}. Skipping center mapping.[/yellow]")
        logging.warning(f"Error reading mapping file rows: {e}")
        return {}

    # Validate: check for duplicates on normalized versions (either column aborts)
    seen_org_names = set()
    seen_center_names = set()

    for row in mapping_rows:
        org_name_raw = row["Organization_Name"]
        center_name_raw = row["Center_Name"]

        org_normalized = _norm(org_name_raw)
        center_normalized = _norm(center_name_raw)

        if org_normalized in seen_org_names:
            console.print(f"[yellow]⚠ Duplicate found in Organization_Name: '{org_name_raw}'. Skipping center mapping.[/yellow]")
            return {}

        if center_normalized in seen_center_names:
            console.print(f"[yellow]⚠ Duplicate found in Center_Name: '{center_name_raw}'. Skipping center mapping.[/yellow]")
            return {}

        seen_org_names.add(org_normalized)
        seen_center_names.add(center_normalized)

    # Build mapping dict: normalized organization name -> cleaned center name
    return {
        _norm(row["Organization_Name"]): _clean(row["Center_Name"])
        for row in mapping_rows
    }
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 8: REQUEST PROCESSING
|
|
# ============================================================================
|
|
|
|
def _process_single_request(worklist_request, mapping_dict):
    """
    Processes a single request from the worklist:
    1. Fetches full request detail
    2. Fetches prescriber and requester names (deduplicated single API call)
    3. Injects enrichment data (names, identity fields, center name, status override)
    4. Applies requests mapping to produce the output object

    Args:
        worklist_request: Request object from the worklist (root-level fields available)
        mapping_dict: Organization → center name mapping dict

    Returns:
        Tuple of (output_request, request_meta) where request_meta contains
        raw fields needed for organization building and sorting.
    """
    request_id = worklist_request.get("id")

    # Set thread-local context for detailed error logging in decorators
    ctx = {"id": request_id}
    thread_local_storage.current_request_context = ctx

    # --- 1. Fetch request detail ---
    # A failed fetch yields an empty dict so enrichment/mapping can still run
    # and produce "undefined" values instead of crashing the worker.
    request_detail = get_request_detail_by_id(request_id)
    if request_detail is None:
        request_detail = {}

    # --- 2. Fetch professional names (prescriber + requester, deduplicated) ---
    prescriber_id = request_detail.get("prescriber")
    requester_id = request_detail.get("requester")

    # Deduplicate IDs before API call
    unique_ids = list({pid for pid in [prescriber_id, requester_id] if pid})
    professionals = get_professionals(unique_ids) if unique_ids else {}

    # Inject professional names (None if not found)
    request_detail["prescriberName"] = professionals.get(prescriber_id) if prescriber_id else None
    request_detail["requesterName"] = professionals.get(requester_id) if requester_id else None

    # --- 3. Inject patient identity fields from worklist (only source for these fields) ---
    identity = worklist_request.get("identity") or {}
    request_detail["lastname"] = identity.get("lastname")
    request_detail["firstname"] = identity.get("firstname")
    request_detail["birthday"] = identity.get("birthday")

    # --- 4. Status override: diagnostic_status takes precedence if defined ---
    diagnostic_status = request_detail.get("diagnostic_status")
    if diagnostic_status is not None and diagnostic_status != "":
        request_detail["status"] = diagnostic_status

    # --- 5. Center mapping: inject Center_Name from labeledOrganization ---
    # Falls back to the raw organization label when the org is not mapped.
    labeled_org = request_detail.get("labeledOrganization")
    if labeled_org:
        org_normalized = labeled_org.strip().lower()
        request_detail["Center_Name"] = mapping_dict.get(org_normalized, labeled_org)
    else:
        request_detail["Center_Name"] = None

    # --- 6. Apply requests mapping to produce output object ---
    output_request = {}
    process_requests_mapping(output_request, request_detail)

    # --- 7. Build meta for organization building and sorting ---
    # "status" here reflects the diagnostic_status override applied in step 4.
    request_meta = {
        "org_id": request_detail.get("organization"),
        "org_name": labeled_org,
        "center_name": request_detail.get("Center_Name"),
        "status": request_detail.get("status"),
        "diagnostic_result": request_detail.get("diagnostic_result"),
        "lastname": request_detail.get("lastname"),
        "firstname": request_detail.get("firstname"),
        "id": request_id
    }

    return output_request, request_meta
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 9: ORGANIZATIONS BUILDING
|
|
# ============================================================================
|
|
|
|
def build_organizations(request_metas):
    """
    Builds the organizations summary list from collected request metadata.

    Each organization entry contains the organization identity and counters
    derived from the status and diagnostic_result of its requests.

    Args:
        request_metas: List of request_meta dicts (from _process_single_request)

    Returns:
        List of organization dicts, sorted by center_name then id
    """
    # Counter field for each recognized request status, in output key order.
    status_counters = {
        "active": "sent_count",
        "accepted": "accepted_count",
        "rejected": "rejected_count",
        "waiting": "sequencing_count",
        "in progress": "ai_count",
        "finished": "result_available_count",
        "signed": "report_available_count",
    }
    # Counter field for each recognized diagnostic result.
    result_counters = {
        "POSITIVE": "positive_count",
        "NEGATIVE": "negative_count",
        "UNINTERPRETABLE": "uninterpretable_count",
    }

    organizations_by_id = {}

    for meta in request_metas:
        org_id = meta.get("org_id")
        if not org_id:
            # Requests without an organization are excluded from the summary.
            continue

        entry = organizations_by_id.get(org_id)
        if entry is None:
            # First request seen for this organization: create its entry with
            # identity fields and every counter zeroed.
            entry = {
                "id": org_id,
                "name": meta.get("org_name"),
                "center_name": meta.get("center_name"),
                "total_count": 0,
            }
            for counter_field in status_counters.values():
                entry[counter_field] = 0
            for counter_field in result_counters.values():
                entry[counter_field] = 0
            organizations_by_id[org_id] = entry

        entry["total_count"] += 1

        # Unrecognized statuses/results only contribute to total_count.
        status_field = status_counters.get(meta.get("status"))
        if status_field:
            entry[status_field] += 1

        result_field = result_counters.get(meta.get("diagnostic_result"))
        if result_field:
            entry[result_field] += 1

    return sorted(
        organizations_by_id.values(),
        key=lambda org: (org.get("center_name") or "", org.get("id") or "")
    )
|
|
|
|
|
|
# ============================================================================
|
|
# BLOCK 10: MAIN EXECUTION
|
|
# ============================================================================
|
|
|
|
def main():
    """Entry point: collects, enriches, checks and exports diagnostic order data.

    Supports three modes selected via CLI flags:
      --check-only : run quality checks only (delegated to run_check_only_mode)
      --excel-only : regenerate Excel exports from existing JSON files
      (default)    : full collection — login, fetch worklist pages, process each
                     request in a thread pool, build the organizations summary,
                     run quality checks, write JSON files, then export Excel.
    """
    global global_pbar, excel_export_config, excel_export_enabled

    # --- Check for CLI Check_Only mode ---
    check_only_mode = "--check-only" in sys.argv

    if check_only_mode:
        run_check_only_mode(sys.argv)
        return

    # --- Check for CLI Excel_Only mode ---
    excel_only_mode = "--excel-only" in sys.argv

    if excel_only_mode:
        print()
        # Mapping configs are still needed to drive the Excel column layout.
        load_requests_mapping_config()
        load_organizations_mapping_config()

        export_excel_only(sys.argv, REQUESTS_FILE_NAME, ORGANIZATIONS_FILE_NAME,
                          requests_mapping_config, organizations_mapping_config)
        return

    # === NORMAL MODE: Full data collection ===

    print()
    login_status = login()

    # Retry login while it reports "Error"; "Exit" means the user gave up.
    while login_status == "Error":
        login_status = login()
    if login_status == "Exit":
        return

    print()
    number_of_threads = int((questionary.text("Number of threads :", default="12",
                                              validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS).ask()))

    print()
    ask_on_retry_exhausted()

    print()
    wait_for_scheduled_launch()

    print()
    load_requests_mapping_config()
    load_organizations_mapping_config()

    # Load DO filters from config
    print()
    do_filters = load_do_filters_config()

    # Load and validate Excel export configuration
    print()
    console.print("[bold cyan]Loading Excel export configuration...[/bold cyan]")

    excel_export_config, has_config_critical, config_error_messages = \
        prepare_excel_export(requests_mapping_config, organizations_mapping_config)

    if has_config_critical:
        # Show every configuration error, then let the user decide whether to
        # continue with Excel export disabled or abort the whole run.
        for err in config_error_messages:
            console.print(f"[bold red] • {err}[/bold red]")
        print()
        answer = questionary.confirm(
            "⚠ Critical configuration errors detected. Continue anyway?",
            default=False
        ).ask()
        if not answer:
            console.print("[bold red]Aborted by user[/bold red]")
            return
        else:
            excel_export_enabled = False
    else:
        excel_export_enabled = True if excel_export_config else False

    # Load center mapping
    print()
    print("Loading organization center mapping...")
    mapping_dict = load_organization_center_mapping()

    # === FETCH WORKLIST (paginated) ===
    print()
    start_time = perf_counter()

    # First page also carries the pagination metadata (total count / pages).
    with console.status("[bold green]Fetching worklist (page 1)...", spinner="dots"):
        first_page = get_worklist_page(do_filters, 1, DO_WORKLIST_PAGE_SIZE)

    metadata = first_page.get("metadata", {})
    total_requests = metadata.get("total", 0)
    total_pages = metadata.get("pages", 1)

    print(f"{total_requests} requests across {total_pages} pages...")
    print()

    # === SUBMIT ALL REQUESTS TO THREAD POOL AS PAGES ARRIVE ===
    # Both progress bars are shown simultaneously:
    # - fetching_pbar (position=0): advances by the number of requests in each fetched page
    # - processing_pbar (position=1): advances as each request finishes processing
    # Completed futures are drained inline after each page so the processing bar
    # starts moving immediately, without waiting for all pages to be fetched first.
    all_futures = []
    all_results = []
    completed_set = set()

    with ThreadPoolExecutor(max_workers=number_of_threads) as thread_pool:

        with tqdm(total=total_requests, unit="req.",
                  desc=f"{'Fetching requests':<52}",
                  position=0, leave=True,
                  bar_format=custom_bar_format) as fetching_pbar:

            with tqdm(total=total_requests, unit="req.",
                      desc=f"{'Processing requests':<52}",
                      position=1, leave=True,
                      bar_format=custom_bar_format) as processing_pbar:

                # Expose the processing bar to worker-side decorators/helpers.
                global_pbar = processing_pbar

                def _drain_completed():
                    # Collect results of finished futures without blocking.
                    # Any worker exception is fatal: log, cancel, re-raise.
                    for f in list(all_futures):
                        if f not in completed_set and f.done():
                            try:
                                result = f.result()
                                all_results.append(result)
                            except Exception as exc:
                                logging.critical(f"Critical exception in request worker: {exc}", exc_info=True)
                                print(f"\nCRITICAL ERROR in request processing thread:")
                                print(f"Exception: {exc}")
                                traceback.print_exc()
                                thread_pool.shutdown(wait=False, cancel_futures=True)
                                raise
                            finally:
                                # Always count the future as done and tick the bar,
                                # even on failure, so progress stays consistent.
                                completed_set.add(f)
                                with _global_pbar_lock:
                                    if global_pbar:
                                        global_pbar.update(1)

                # Submit first page requests
                first_page_data = first_page.get("data", [])
                for worklist_request in first_page_data:
                    f = thread_pool.submit(run_with_context, _process_single_request,
                                           {"id": worklist_request.get("id")},
                                           worklist_request, mapping_dict)
                    all_futures.append(f)
                fetching_pbar.update(len(first_page_data))

                # Fetch and submit remaining pages; drain completed futures after each page
                for page_num in range(2, total_pages + 1):
                    page_data = get_worklist_page(do_filters, page_num, DO_WORKLIST_PAGE_SIZE)
                    page_requests = page_data.get("data", [])
                    for worklist_request in page_requests:
                        f = thread_pool.submit(run_with_context, _process_single_request,
                                               {"id": worklist_request.get("id")},
                                               worklist_request, mapping_dict)
                        all_futures.append(f)
                    fetching_pbar.update(len(page_requests))
                    _drain_completed()

                # Drain remaining futures not yet collected
                remaining = [f for f in all_futures if f not in completed_set]
                for future in as_completed(remaining):
                    try:
                        result = future.result()
                        all_results.append(result)
                    except Exception as exc:
                        logging.critical(f"Critical exception in request worker: {exc}", exc_info=True)
                        print(f"\nCRITICAL ERROR in request processing thread:")
                        print(f"Exception: {exc}")
                        traceback.print_exc()
                        thread_pool.shutdown(wait=False, cancel_futures=True)
                        raise
                    finally:
                        completed_set.add(future)
                        with _global_pbar_lock:
                            if global_pbar:
                                global_pbar.update(1)

    # === SORT RESULTS ===
    print()
    print()
    print("Sorting results...")

    # Sort by request_meta (element [1] of each tuple): center, then patient
    # name, then id — matching the Excel/report display order.
    all_results.sort(key=lambda x: (
        x[1].get("center_name") or "",
        x[1].get("lastname") or "",
        x[1].get("firstname") or "",
        x[1].get("id") or ""
    ))

    output_requests = [r[0] for r in all_results]
    request_metas = [r[1] for r in all_results]

    # === BUILD ORGANIZATIONS ===
    print("Building organizations summary...")
    organizations_list = build_organizations(request_metas)

    # === REPORT ORGANIZATION CENTER MAPPING ===
    if mapping_dict:
        unique_orgs = {}  # org_normalized -> original org_name
        for meta in request_metas:
            org_name = meta.get("org_name")
            if org_name:
                unique_orgs[org_name.strip().lower()] = org_name

        mapped_count = sum(1 for norm in unique_orgs if norm in mapping_dict)
        unmapped_orgs = sorted(
            name for norm, name in unique_orgs.items() if norm not in mapping_dict
        )

        mapping_summary = (
            f"Organization center mapping: {mapped_count}/{len(unique_orgs)} organization(s) mapped."
        )
        console.print(f"[green]{mapping_summary}[/green]")
        logging.info(mapping_summary)

        if unmapped_orgs:
            unmapped_header = f"{len(unmapped_orgs)} unmapped organization(s):"
            console.print(f"[yellow]⚠ {unmapped_header}[/yellow]")
            logging.warning(unmapped_header)
            for org_name in unmapped_orgs:
                console.print(f"[yellow] - {org_name}[/yellow]")
                logging.warning(" - %s", org_name)

    try:
        # === QUALITY CHECKS ===
        print()
        has_regression_critical = run_quality_checks(
            current_requests=output_requests,
            old_requests_filename=REQUESTS_FILE_NAME
        )

        # === CHECK FOR CRITICAL ISSUES AND ASK USER CONFIRMATION ===
        if has_regression_critical:
            print()
            console.print("[bold red]⚠ CRITICAL issues detected in quality checks![/bold red]")
            confirm_write = questionary.confirm(
                "Do you want to write the results anyway?",
                default=True
            ).ask()

            if not confirm_write:
                console.print("[yellow]✗ Output writing cancelled by user. Files were not modified.[/yellow]")
                console.print("[yellow]  You can re-run the script to try again.[/yellow]")
                print()
                print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}")
                return

        # === BACKUP OLD FILES ===
        backup_output_files()

        # === WRITE NEW FILES ===
        print("Writing files...")

        with open(REQUESTS_FILE_NAME, 'w', encoding='utf-8') as f_json:
            json.dump(output_requests, f_json, indent=4, ensure_ascii=False)
        with open(ORGANIZATIONS_FILE_NAME, 'w', encoding='utf-8') as f_json:
            json.dump(organizations_list, f_json, indent=4, ensure_ascii=False)

        console.print("[green]✓ Data saved to JSON files[/green]")
        print()

        # === EXCEL EXPORT ===
        run_normal_mode_export(excel_export_enabled, excel_export_config,
                               requests_mapping_config, organizations_mapping_config)

    except IOError as io_err:
        logging.critical(f"Error while writing JSON file : {io_err}")
        print(f"Error while writing JSON file : {io_err}")
    except Exception as exc:
        logging.critical(f"Error during final processing : {exc}")
        print(f"Error during final processing : {exc}")

    print()
    print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}")
|
|
|
|
|
|
if __name__ == '__main__':

    try:
        main()
    except Exception as e:
        # Top-level safety net: record the full traceback before the console closes.
        logging.critical(f"Script terminated prematurely due to an exception: {e}", exc_info=True)
        print(f"Script stopped due to an error : {e}")
    finally:
        # Cancel any leftover subtask workers so the interpreter can exit cleanly,
        # then hold the console window open for the user.
        if 'subtasks_thread_pool' in globals() and subtasks_thread_pool:
            subtasks_thread_pool.shutdown(wait=False, cancel_futures=True)
        print('\n')
        input("Press Enter to exit...")
|