diff --git a/Endolife Reporting/Endolife Reporting.zip b/Endolife Reporting/Endolife Reporting.zip index 391c389..a5d0430 100644 Binary files a/Endolife Reporting/Endolife Reporting.zip and b/Endolife Reporting/Endolife Reporting.zip differ diff --git a/Endolife Reporting/do_dashboard.exe b/Endolife Reporting/do_dashboard.exe index e049c09..5d2531f 100644 Binary files a/Endolife Reporting/do_dashboard.exe and b/Endolife Reporting/do_dashboard.exe differ diff --git a/do_dashboard_constants.py b/do_dashboard_constants.py index 48f37d9..7b470cc 100644 --- a/do_dashboard_constants.py +++ b/do_dashboard_constants.py @@ -133,3 +133,16 @@ BAR_N_FMT_WIDTH = 4 BAR_TOTAL_FMT_WIDTH = 4 BAR_TIME_WIDTH = 8 BAR_RATE_WIDTH = 10 + +# ============================================================================ +# DO HISTORY — SPECIFIC CONSTANTS +# ============================================================================ + +HISTORY_FILE_NAME = "do_requests_history.json" +HISTORY_LOG_FILE_NAME = "history.log" + +# Default filters proposed to the user at startup (editable as JSON before processing) +DO_HISTORY_FILTERS_DEFAULT = {"hideArchivedRequests": False, "excludeTest": True, "status": "all-admin"} + +# History API endpoint (GET + ?sortOrder=DESC&requestId={id}) +API_REQUEST_HISTORY_ENDPOINT = "/api/request-history" diff --git a/do_history.bat b/do_history.bat new file mode 100644 index 0000000..cde9952 --- /dev/null +++ b/do_history.bat @@ -0,0 +1,3 @@ +@echo off +call C:\PythonProjects\.rcvenv\Scripts\activate.bat +python do_history.py %* diff --git a/do_history.py b/do_history.py new file mode 100644 index 0000000..5c64aa2 --- /dev/null +++ b/do_history.py @@ -0,0 +1,672 @@ + +# DO (Diagnostic Order) Request History Exporter +# This script exports the audit history of all diagnostic order requests matching +# a configurable filter. It reuses the authentication, retry, and parallel processing +# infrastructure of do_dashboard.py (token refresh, api_call_with_retry decorator, +# thread-local HTTP clients, dual progress bars with ThreadPoolExecutor). +# The DO_Filters constant is presented as an editable JSON prompt at startup. +# Output: a single JSON dict {request_id: history_data} saved to do_requests_history.json. +# Each value is the raw response from GET /api/request-history, or null on persistent error. +import json +import logging +import msvcrt +import os +import re +import shutil +import sys +import threading +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import timedelta, datetime +from time import perf_counter, sleep +import functools + +import httpx +import questionary +from tqdm import tqdm +from rich.console import Console + +from do_dashboard_constants import ( + HISTORY_FILE_NAME, + HISTORY_LOG_FILE_NAME, + OLD_FILE_SUFFIX, + DO_HISTORY_FILTERS_DEFAULT, + DEFAULT_USER_NAME, + DEFAULT_PASSWORD, + IAM_URL, + GDD_URL, + GDD_APP_ID, + ERROR_MAX_RETRY, + WAIT_BEFORE_RETRY, + WAIT_BEFORE_NEW_BATCH_OF_RETRIES, + MAX_BATCHS_OF_RETRIES, + MAX_THREADS, + DO_WORKLIST_PAGE_SIZE, + BAR_N_FMT_WIDTH, + BAR_TOTAL_FMT_WIDTH, + BAR_TIME_WIDTH, + BAR_RATE_WIDTH, + API_TIMEOUT, + API_AUTH_LOGIN_ENDPOINT, + API_AUTH_CONFIG_TOKEN_ENDPOINT, + API_AUTH_REFRESH_TOKEN_ENDPOINT, + API_DO_WORKLIST_ENDPOINT, + API_REQUEST_HISTORY_ENDPOINT, +) +from do_dashboard_utils import ( + get_httpx_client, + clear_httpx_client, + thread_local_storage, + run_with_context, + get_old_filename, +) + +logging.basicConfig( + level=logging.WARNING, + format='%(asctime)s - %(levelname)s - %(message)s', + filename=HISTORY_LOG_FILE_NAME, + filemode='w', +) + + +# ============================================================================ +# BLOCK 1: CONFIGURATION & BASE INFRASTRUCTURE +# ============================================================================ + +access_token = "" +refresh_token = "" +threads_list = [] +_token_refresh_lock = threading.Lock() +on_retry_exhausted = "ask" +_stored_username = "" +_stored_password = "" +_threads_list_lock = threading.Lock() +global_pbar = None +_global_pbar_lock = threading.Lock() +_user_interaction_lock = threading.Lock() + +httpx_clients = {} +console = Console() + +import do_dashboard_utils +do_dashboard_utils.httpx_clients = httpx_clients +do_dashboard_utils.threads_list = threads_list +do_dashboard_utils._threads_list_lock = _threads_list_lock + +custom_bar_format = ( + "{l_bar}{bar}" + f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " + f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " + f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}" +) + + +# ============================================================================ +# BLOCK 2: DECORATORS & RESILIENCE +# ============================================================================ + +def new_token(): + """Refresh access token using the refresh token, with re-login fallback.""" + global access_token, refresh_token + with _token_refresh_lock: + for attempt in range(ERROR_MAX_RETRY): + try: + client = get_httpx_client() + client.base_url = GDD_URL + response = client.post( + API_AUTH_REFRESH_TOKEN_ENDPOINT, + headers={"Authorization": f"Bearer {access_token}"}, + json={"refresh_token": refresh_token}, + timeout=20, + ) + response.raise_for_status() + access_token = response.json()["access_token"] + refresh_token = response.json()["refresh_token"] + return + except httpx.RequestError as exc: + logging.warning(f"Refresh Token Error (Attempt {attempt + 1}) : {exc}") + clear_httpx_client() + except httpx.HTTPStatusError as exc: + logging.warning( + f"Refresh Token Error (Attempt {attempt + 1}) : " + f"{exc.response.status_code} for Url {exc.request.url}" + ) + clear_httpx_client() + finally: + if attempt < ERROR_MAX_RETRY - 1: + sleep(WAIT_BEFORE_RETRY) + logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.") + _do_login(_stored_username, _stored_password) + logging.info("Re-login successful. New tokens acquired.") + + +def api_call_with_retry(func): + """Decorator for API calls with automatic retry and token refresh on 401 errors.""" + @functools.wraps(func) + def wrapper(*args, **kwargs): + func_name = func.__name__ + total_attempts = 0 + batch_count = 1 + + while True: + for attempt in range(ERROR_MAX_RETRY): + total_attempts += 1 + try: + return func(*args, **kwargs) + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}") + + clear_httpx_client() + + if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: + logging.info(f"Token expired for {func_name}. Refreshing token.") + try: + new_token() + except (httpx.RequestError, httpx.HTTPStatusError) as token_exc: + logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}") + + if attempt < ERROR_MAX_RETRY - 1: + sleep(WAIT_BEFORE_RETRY) + else: + if batch_count < MAX_BATCHS_OF_RETRIES: + logging.warning( + f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. " + f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch." + ) + batch_count += 1 + sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES) + break + else: + with _user_interaction_lock: + if on_retry_exhausted == "ignore": + ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"}) + logging.warning( + f"[AUTO-IGNORE] Skipping {func_name} for Request {ctx['id']}. Error: {exc}" + ) + return None + + elif on_retry_exhausted == "abort": + logging.critical( + f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}" + ) + raise httpx.RequestError( + message=f"Persistent error in {func_name} (auto-aborted)" + ) + + else: + console.print( + f"\n[bold red]Persistent error in {func_name} after " + f"{batch_count} batches ({total_attempts} attempts).[/bold red]" + ) + console.print(f"[red]Exception: {exc}[/red]") + + choice = questionary.select( + f"What would you like to do for {func_name}?", + choices=[ + "Retry (try another batch of retries)", + "Ignore (return None and continue)", + "Stop script (critical error)", + ], + ).ask() + + if choice == "Retry (try another batch of retries)": + logging.info(f"User chose to retry {func_name}. Restarting batch sequence.") + batch_count = 1 + break + elif choice == "Ignore (return None and continue)": + ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"}) + logging.warning( + f"[IGNORE] User opted to skip {func_name} for Request {ctx['id']}. Error: {exc}" + ) + return None + else: + logging.critical( + f"User chose to stop script after persistent error in {func_name}." + ) + raise httpx.RequestError( + message=f"Persistent error in {func_name} (stopped by user)" + ) + + return wrapper + + +# ============================================================================ +# BLOCK 3: AUTHENTICATION +# ============================================================================ + +def _do_login(username, password): + """Performs the two-step authentication (IAM → GDD) with the given credentials. + Updates global access_token and refresh_token on success. + Raises httpx.RequestError or httpx.HTTPStatusError on failure. + Must NOT acquire _token_refresh_lock (caller's responsibility). + """ + global access_token, refresh_token + + client = get_httpx_client() + client.base_url = IAM_URL + response = client.post( + API_AUTH_LOGIN_ENDPOINT, + json={"username": username, "password": password}, + timeout=20, + ) + response.raise_for_status() + master_token = response.json()["access_token"] + user_id = response.json()["userId"] + + client = get_httpx_client() + client.base_url = GDD_URL + response = client.post( + API_AUTH_CONFIG_TOKEN_ENDPOINT, + headers={"Authorization": f"Bearer {master_token}"}, + json={ + "userId": user_id, + "clientId": GDD_APP_ID, + "userAgent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/137.0.0.0 Safari/537.36" + ), + }, + timeout=20, + ) + response.raise_for_status() + access_token = response.json()["access_token"] + refresh_token = response.json()["refresh_token"] + + +def login(): + global _stored_username, _stored_password + + user_name = questionary.text("login :", default=DEFAULT_USER_NAME).ask() + password = questionary.password("password :", default=DEFAULT_PASSWORD).ask() + if not (user_name and password): + return "Exit" + + try: + _do_login(user_name, password) + except httpx.RequestError as exc: + print(f"Login Error : {exc}") + logging.warning(f"Login Error : {exc}") + return "Error" + except httpx.HTTPStatusError as exc: + print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") + logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}") + return "Error" + + _stored_username = user_name + _stored_password = password + print() + print("Login Success") + return "Success" + + +# ============================================================================ +# BLOCK 3B: STARTUP PARAMETERS +# ============================================================================ + +def ask_do_filters(): + """Presents DO_HISTORY_FILTERS_DEFAULT as an editable JSON with validation. + Called immediately after login so the user can adjust filters before processing. + """ + default_value = json.dumps(DO_HISTORY_FILTERS_DEFAULT, ensure_ascii=False) + + def validate_json(text): + if not text: + return "Les filtres ne peuvent pas être vides" + try: + json.loads(text) + return True + except json.JSONDecodeError as e: + return f"JSON invalide : {e}" + + filters_str = questionary.text( + "DO Filters (JSON) :", + default=default_value, + validate=validate_json, + ).ask() + + if filters_str is None: + return DO_HISTORY_FILTERS_DEFAULT + return json.loads(filters_str) + + +def ask_on_retry_exhausted(): + """Asks the user what to do when all API retry batches are exhausted.""" + global on_retry_exhausted + choice = questionary.select( + "On retry exhausted :", + choices=[ + "Ask (interactive prompt)", + "Ignore (return None and continue)", + "Abort (stop script)", + ], + ).ask() + + if choice is None or choice == "Ask (interactive prompt)": + on_retry_exhausted = "ask" + elif choice == "Ignore (return None and continue)": + on_retry_exhausted = "ignore" + else: + on_retry_exhausted = "abort" + + +def wait_for_scheduled_launch(): + """Asks the user when to start the processing and waits if needed. + Options: Immediately / In X minutes / At HH:MM + """ + choice = questionary.select( + "When to start processing ?", + choices=["Immediately", "In X minutes", "At HH:MM"], + ).ask() + + if choice is None or choice == "Immediately": + return + + if choice == "In X minutes": + minutes_str = questionary.text( + "Number of minutes :", + validate=lambda x: x.isdigit() and int(x) > 0, + ).ask() + if not minutes_str: + return + target_time = datetime.now() + timedelta(minutes=int(minutes_str)) + + else: # "At HH:MM" + time_str = questionary.text( + "Start time (HH:MM) :", + validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x)) + and 0 <= int(x.split(':')[0]) <= 23 + and 0 <= int(x.split(':')[1]) <= 59, + ).ask() + if not time_str: + return + now = datetime.now() + h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1]) + target_time = now.replace(hour=h, minute=m, second=0, microsecond=0) + if target_time <= now: + console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]") + return + + print() + try: + while True: + remaining = target_time - datetime.now() + if remaining.total_seconds() <= 0: + break + total_secs = int(remaining.total_seconds()) + h = total_secs // 3600 + m = (total_secs % 3600) // 60 + s = total_secs % 60 + target_str = target_time.strftime('%H:%M:%S') + print( + f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ", + end="", + flush=True, + ) + sleep(1) + while msvcrt.kbhit(): + msvcrt.getwch() + print() + console.print("[green]✓ Starting processing.[/green]") + except KeyboardInterrupt: + print() + console.print("[bold red]Launch cancelled by user.[/bold red]") + raise SystemExit(0) + + +# ============================================================================ +# BLOCK 4: BUSINESS API CALLS +# ============================================================================ + +@api_call_with_retry +def get_worklist_page(filters, page, page_size): + """Fetches one page of the diagnostic order worklist.""" + client = get_httpx_client() + client.base_url = GDD_URL + response = client.post( + API_DO_WORKLIST_ENDPOINT, + headers={"Authorization": f"Bearer {access_token}"}, + json={ + "lang": "fr-FR", + "filters": filters, + "limit": page_size, + "page": page, + "sort": [], + }, + timeout=API_TIMEOUT, + ) + response.raise_for_status() + return response.json() + + +@api_call_with_retry +def get_request_history(request_id): + """Fetches the full audit history for a single request (descending chronological order).""" + client = get_httpx_client() + client.base_url = GDD_URL + response = client.get( + API_REQUEST_HISTORY_ENDPOINT, + headers={"Authorization": f"Bearer {access_token}"}, + params={"sortOrder": "DESC", "requestId": request_id}, + timeout=API_TIMEOUT, + ) + response.raise_for_status() + return response.json() + + +# ============================================================================ +# BLOCK 5: REQUEST PROCESSING +# ============================================================================ + +def _process_single_request(worklist_request): + """Fetches the audit history for one worklist request. + + Returns: + Tuple of (request_id, history_data) where history_data is None on + persistent API failure (stored as JSON null in the output). + """ + request_id = worklist_request.get("id") + thread_local_storage.current_request_context = {"id": request_id} + history_data = get_request_history(request_id) + return request_id, history_data + + +# ============================================================================ +# BLOCK 6: FILE UTILITIES +# ============================================================================ + +def backup_history_file(): + """Backs up the existing history JSON file before writing a new version.""" + if os.path.exists(HISTORY_FILE_NAME): + old_path = get_old_filename(HISTORY_FILE_NAME, OLD_FILE_SUFFIX) + try: + shutil.copy2(HISTORY_FILE_NAME, old_path) + except Exception as e: + logging.warning(f"Could not backup {HISTORY_FILE_NAME}: {e}") + + +# ============================================================================ +# BLOCK 7: MAIN EXECUTION +# ============================================================================ + +def main(): + global global_pbar + + print() + login_status = login() + + while login_status == "Error": + login_status = login() + if login_status == "Exit": + return + + print() + do_filters = ask_do_filters() + + print() + number_of_threads = int( + questionary.text( + "Number of threads :", + default="12", + validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS, + ).ask() + ) + + print() + ask_on_retry_exhausted() + + print() + wait_for_scheduled_launch() + + # === FETCH WORKLIST (paginated) === + print() + start_time = perf_counter() + + with console.status("[bold green]Fetching worklist (page 1)...", spinner="dots"): + first_page = get_worklist_page(do_filters, 1, DO_WORKLIST_PAGE_SIZE) + + metadata = first_page.get("metadata", {}) + total_requests = metadata.get("total", 0) + total_pages = metadata.get("pages", 1) + + print(f"{total_requests} requests across {total_pages} pages...") + print() + + # === SUBMIT ALL REQUESTS TO THREAD POOL AS PAGES ARRIVE === + # fetching_pbar (position=0): advances as each worklist page is fetched + # processing_pbar (position=1): advances as each request history is retrieved + all_futures = [] + all_results = [] + completed_set = set() + + with ThreadPoolExecutor(max_workers=number_of_threads) as thread_pool: + + with tqdm( + total=total_requests, + unit="req.", + desc=f"{'Fetching requests':<52}", + position=0, + leave=True, + bar_format=custom_bar_format, + ) as fetching_pbar: + + with tqdm( + total=total_requests, + unit="req.", + desc=f"{'Fetching history':<52}", + position=1, + leave=True, + bar_format=custom_bar_format, + ) as processing_pbar: + + global_pbar = processing_pbar + + def _drain_completed(): + for f in list(all_futures): + if f not in completed_set and f.done(): + try: + result = f.result() + all_results.append(result) + except Exception as exc: + logging.critical( + f"Critical exception in history worker: {exc}", exc_info=True + ) + print(f"\nCRITICAL ERROR in history processing thread:") + print(f"Exception: {exc}") + traceback.print_exc() + thread_pool.shutdown(wait=False, cancel_futures=True) + raise + finally: + completed_set.add(f) + with _global_pbar_lock: + if global_pbar: + global_pbar.update(1) + + # Submit first page requests + first_page_data = first_page.get("data", []) + for worklist_request in first_page_data: + f = thread_pool.submit( + run_with_context, + _process_single_request, + {"id": worklist_request.get("id")}, + worklist_request, + ) + all_futures.append(f) + fetching_pbar.update(len(first_page_data)) + + # Fetch and submit remaining pages; drain completed futures after each page + for page_num in range(2, total_pages + 1): + page_data = get_worklist_page(do_filters, page_num, DO_WORKLIST_PAGE_SIZE) + page_requests = page_data.get("data", []) + for worklist_request in page_requests: + f = thread_pool.submit( + run_with_context, + _process_single_request, + {"id": worklist_request.get("id")}, + worklist_request, + ) + all_futures.append(f) + fetching_pbar.update(len(page_requests)) + _drain_completed() + + # Drain remaining futures not yet collected + remaining = [f for f in all_futures if f not in completed_set] + for future in as_completed(remaining): + try: + result = future.result() + all_results.append(result) + except Exception as exc: + logging.critical( + f"Critical exception in history worker: {exc}", exc_info=True + ) + print(f"\nCRITICAL ERROR in history processing thread:") + print(f"Exception: {exc}") + traceback.print_exc() + thread_pool.shutdown(wait=False, cancel_futures=True) + raise + finally: + completed_set.add(future) + with _global_pbar_lock: + if global_pbar: + global_pbar.update(1) + + # === BUILD OUTPUT DICT === + print() + print() + print("Building history output...") + + output_dict = {} + for request_id, history_data in all_results: + output_dict[request_id] = history_data # None → JSON null for failed requests + + # === BACKUP & WRITE === + try: + backup_history_file() + + print("Writing file...") + with open(HISTORY_FILE_NAME, 'w', encoding='utf-8') as f_json: + json.dump(output_dict, f_json, indent=4, ensure_ascii=False) + + console.print("[green]✓ History saved to JSON file[/green]") + + except IOError as io_err: + logging.critical(f"Error while writing JSON file : {io_err}") + print(f"Error while writing JSON file : {io_err}") + except Exception as exc: + logging.critical(f"Error during final processing : {exc}") + print(f"Error during final processing : {exc}") + + print() + print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}") + + +if __name__ == '__main__': + + try: + main() + except Exception as e: + logging.critical(f"Script terminated prematurely due to an exception: {e}", exc_info=True) + print(f"Script stopped due to an error : {e}") + finally: + print('\n') + input("Press Enter to exit...")