# DO (Diagnostic Order) Request History Exporter
#
# This script exports the audit history of all diagnostic order requests matching
# a configurable filter. It reuses the authentication, retry, and parallel processing
# infrastructure of do_dashboard.py (token refresh, api_call_with_retry decorator,
# thread-local HTTP clients, dual progress bars with ThreadPoolExecutor).
# The DO_Filters constant is presented as an editable JSON prompt at startup.
# Output: a single JSON dict {request_id: history_data} saved to do_requests_history.json.
# Each value is the raw response from GET /api/request-history, or null on persistent error.

import json
import logging
import msvcrt
import os
import re
import shutil
import sys
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta, datetime
from time import perf_counter, sleep
import functools

import httpx
import questionary
from tqdm import tqdm
from rich.console import Console

import do_dashboard_utils
from do_dashboard_constants import (
    HISTORY_FILE_NAME,
    HISTORY_LOG_FILE_NAME,
    OLD_FILE_SUFFIX,
    DO_HISTORY_FILTERS_DEFAULT,
    DEFAULT_USER_NAME,
    DEFAULT_PASSWORD,
    IAM_URL,
    GDD_URL,
    GDD_APP_ID,
    ERROR_MAX_RETRY,
    WAIT_BEFORE_RETRY,
    WAIT_BEFORE_NEW_BATCH_OF_RETRIES,
    MAX_BATCHS_OF_RETRIES,
    MAX_THREADS,
    DO_WORKLIST_PAGE_SIZE,
    BAR_N_FMT_WIDTH,
    BAR_TOTAL_FMT_WIDTH,
    BAR_TIME_WIDTH,
    BAR_RATE_WIDTH,
    API_TIMEOUT,
    API_AUTH_LOGIN_ENDPOINT,
    API_AUTH_CONFIG_TOKEN_ENDPOINT,
    API_AUTH_REFRESH_TOKEN_ENDPOINT,
    API_DO_WORKLIST_ENDPOINT,
    API_REQUEST_HISTORY_ENDPOINT,
)
from do_dashboard_utils import (
    get_httpx_client,
    clear_httpx_client,
    thread_local_storage,
    run_with_context,
    get_old_filename,
)

logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename=HISTORY_LOG_FILE_NAME,
    filemode='w',
)

# ============================================================================
# BLOCK 1: CONFIGURATION & BASE INFRASTRUCTURE
# ============================================================================

# Bearer tokens shared by every worker thread; rewritten by _do_login / new_token.
access_token = ""
refresh_token = ""
threads_list = []
_token_refresh_lock = threading.Lock()
# Policy applied once every retry batch has failed: "ask", "ignore" or "abort".
on_retry_exhausted = "ask"
# Credentials kept after a successful login so new_token() can re-login.
_stored_username = ""
_stored_password = ""
_threads_list_lock = threading.Lock()
# Progress bar updated by the result collectors (set while main() is processing).
global_pbar = None
_global_pbar_lock = threading.Lock()
# Serialises the end-of-retries handling so only one thread prompts at a time.
_user_interaction_lock = threading.Lock()
httpx_clients = {}
console = Console()

# Share this module's registries with the helper module.
do_dashboard_utils.httpx_clients = httpx_clients
do_dashboard_utils.threads_list = threads_list
do_dashboard_utils._threads_list_lock = _threads_list_lock

custom_bar_format = (
    "{l_bar}{bar}"
    f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
    f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
    f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}"
)


# ============================================================================
# BLOCK 2: DECORATORS & RESILIENCE
# ============================================================================

def new_token():
    """Refresh access token using the refresh token, with re-login fallback.

    Makes up to ERROR_MAX_RETRY refresh attempts while holding
    _token_refresh_lock. When all of them fail, falls back to a full re-login
    with the stored credentials.

    Raises:
        httpx.RequestError, httpx.HTTPStatusError: if the fallback re-login fails.
    """
    global access_token, refresh_token
    with _token_refresh_lock:
        for attempt in range(ERROR_MAX_RETRY):
            try:
                client = get_httpx_client()
                client.base_url = GDD_URL
                response = client.post(
                    API_AUTH_REFRESH_TOKEN_ENDPOINT,
                    headers={"Authorization": f"Bearer {access_token}"},
                    json={"refresh_token": refresh_token},
                    timeout=20,
                )
                response.raise_for_status()
                tokens = response.json()
                access_token = tokens["access_token"]
                refresh_token = tokens["refresh_token"]
                return
            except httpx.RequestError as exc:
                logging.warning(f"Refresh Token Error (Attempt {attempt + 1}) : {exc}")
                clear_httpx_client()
            except httpx.HTTPStatusError as exc:
                logging.warning(
                    f"Refresh Token Error (Attempt {attempt + 1}) : "
                    f"{exc.response.status_code} for Url {exc.request.url}"
                )
                clear_httpx_client()
            # Only reached after a failed attempt: a `finally` here would also
            # delay the successful `return` above while the lock is held.
            if attempt < ERROR_MAX_RETRY - 1:
                sleep(WAIT_BEFORE_RETRY)

        logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
        _do_login(_stored_username, _stored_password)
        logging.info("Re-login successful. New tokens acquired.")


def api_call_with_retry(func):
    """Decorator for API calls with automatic retry and token refresh on 401 errors.

    The wrapped call is tried in batches of ERROR_MAX_RETRY attempts. After
    MAX_BATCHS_OF_RETRIES failed batches the `on_retry_exhausted` policy
    decides: return None ("ignore"), raise ("abort") or prompt the user ("ask").

    Returns:
        The wrapped function's result, or None when the failure is ignored.

    Raises:
        httpx.RequestError: when the policy or the user chooses to stop.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        func_name = func.__name__
        total_attempts = 0
        batch_count = 1

        while True:
            for attempt in range(ERROR_MAX_RETRY):
                total_attempts += 1
                try:
                    return func(*args, **kwargs)
                except (httpx.RequestError, httpx.HTTPStatusError) as exc:
                    logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}")
                    clear_httpx_client()

                    if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
                        logging.info(f"Token expired for {func_name}. Refreshing token.")
                        try:
                            new_token()
                        except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
                            logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")

                    if attempt < ERROR_MAX_RETRY - 1:
                        sleep(WAIT_BEFORE_RETRY)
                        continue

                    # Last attempt of the current batch has failed.
                    if batch_count < MAX_BATCHS_OF_RETRIES:
                        logging.warning(
                            f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. "
                            f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch."
                        )
                        batch_count += 1
                        sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
                        break  # leave the for loop; `while True` starts the next batch

                    with _user_interaction_lock:
                        if on_retry_exhausted == "ignore":
                            ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"})
                            logging.warning(
                                f"[AUTO-IGNORE] Skipping {func_name} for Request {ctx['id']}. Error: {exc}"
                            )
                            return None
                        elif on_retry_exhausted == "abort":
                            logging.critical(
                                f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}"
                            )
                            raise httpx.RequestError(
                                message=f"Persistent error in {func_name} (auto-aborted)"
                            ) from exc
                        else:
                            console.print(
                                f"\n[bold red]Persistent error in {func_name} after "
                                f"{batch_count} batches ({total_attempts} attempts).[/bold red]"
                            )
                            console.print(f"[red]Exception: {exc}[/red]")

                            choice = questionary.select(
                                f"What would you like to do for {func_name}?",
                                choices=[
                                    "Retry (try another batch of retries)",
                                    "Ignore (return None and continue)",
                                    "Stop script (critical error)",
                                ],
                            ).ask()

                            if choice == "Retry (try another batch of retries)":
                                logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
                                batch_count = 1
                                break
                            elif choice == "Ignore (return None and continue)":
                                ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"})
                                logging.warning(
                                    f"[IGNORE] User opted to skip {func_name} for Request {ctx['id']}. Error: {exc}"
                                )
                                return None
                            else:
                                # Also reached when the prompt is cancelled (choice is None).
                                logging.critical(
                                    f"User chose to stop script after persistent error in {func_name}."
                                )
                                raise httpx.RequestError(
                                    message=f"Persistent error in {func_name} (stopped by user)"
                                ) from exc

    return wrapper


# ============================================================================
# BLOCK 3: AUTHENTICATION
# ============================================================================

def _do_login(username, password):
    """Performs the two-step authentication (IAM → GDD) with the given credentials.

    Updates global access_token and refresh_token on success.
    Raises httpx.RequestError or httpx.HTTPStatusError on failure.
    Must NOT acquire _token_refresh_lock (caller's responsibility).
    """
    global access_token, refresh_token

    # Step 1: IAM login yields the master token and the user id.
    client = get_httpx_client()
    client.base_url = IAM_URL
    response = client.post(
        API_AUTH_LOGIN_ENDPOINT,
        json={"username": username, "password": password},
        timeout=20,
    )
    response.raise_for_status()
    iam_payload = response.json()
    master_token = iam_payload["access_token"]
    user_id = iam_payload["userId"]

    # Step 2: exchange the master token for the GDD access/refresh tokens.
    client = get_httpx_client()
    client.base_url = GDD_URL
    response = client.post(
        API_AUTH_CONFIG_TOKEN_ENDPOINT,
        headers={"Authorization": f"Bearer {master_token}"},
        json={
            "userId": user_id,
            "clientId": GDD_APP_ID,
            "userAgent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/137.0.0.0 Safari/537.36"
            ),
        },
        timeout=20,
    )
    response.raise_for_status()
    gdd_payload = response.json()
    access_token = gdd_payload["access_token"]
    refresh_token = gdd_payload["refresh_token"]


def login():
    """Prompt for credentials and authenticate.

    Returns:
        "Success" when authenticated (credentials are stored for re-login),
        "Error" when the login request failed,
        "Exit" when either prompt was left empty or cancelled.
    """
    global _stored_username, _stored_password

    user_name = questionary.text("login :", default=DEFAULT_USER_NAME).ask()
    password = questionary.password("password :", default=DEFAULT_PASSWORD).ask()
    if not (user_name and password):
        return "Exit"

    try:
        _do_login(user_name, password)
    except httpx.RequestError as exc:
        print(f"Login Error : {exc}")
        logging.warning(f"Login Error : {exc}")
        return "Error"
    except httpx.HTTPStatusError as exc:
        print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
        logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
        return "Error"

    _stored_username = user_name
    _stored_password = password
    print()
    print("Login Success")
    return "Success"


# ============================================================================
# BLOCK 3B: STARTUP PARAMETERS
# ============================================================================

def ask_do_filters():
    """Presents DO_HISTORY_FILTERS_DEFAULT as an editable JSON with validation.

    Called immediately after login so the user can adjust filters before processing.

    Returns:
        The parsed filters, or DO_HISTORY_FILTERS_DEFAULT when the prompt is cancelled.
    """
    default_value = json.dumps(DO_HISTORY_FILTERS_DEFAULT, ensure_ascii=False)

    def validate_json(text):
        # questionary shows the returned string as the validation error.
        if not text:
            return "Les filtres ne peuvent pas être vides"
        try:
            json.loads(text)
            return True
        except json.JSONDecodeError as e:
            return f"JSON invalide : {e}"

    filters_str = questionary.text(
        "DO Filters (JSON) :",
        default=default_value,
        validate=validate_json,
    ).ask()

    if filters_str is None:
        return DO_HISTORY_FILTERS_DEFAULT
    return json.loads(filters_str)


def ask_on_retry_exhausted():
    """Asks the user what to do when all API retry batches are exhausted.

    Stores "ask", "ignore" or "abort" in the global `on_retry_exhausted`;
    a cancelled prompt selects "ask".
    """
    global on_retry_exhausted

    choice = questionary.select(
        "On retry exhausted :",
        choices=[
            "Ask (interactive prompt)",
            "Ignore (return None and continue)",
            "Abort (stop script)",
        ],
    ).ask()

    if choice is None or choice == "Ask (interactive prompt)":
        on_retry_exhausted = "ask"
    elif choice == "Ignore (return None and continue)":
        on_retry_exhausted = "ignore"
    else:
        on_retry_exhausted = "abort"


def wait_for_scheduled_launch():
    """Asks the user when to start the processing and waits if needed.

    Options: Immediately / In X minutes / At HH:MM

    Raises:
        SystemExit: when the countdown is interrupted with Ctrl+C.
    """
    choice = questionary.select(
        "When to start processing ?",
        choices=["Immediately", "In X minutes", "At HH:MM"],
    ).ask()

    if choice is None or choice == "Immediately":
        return

    if choice == "In X minutes":
        minutes_str = questionary.text(
            "Number of minutes :",
            validate=lambda x: x.isdigit() and int(x) > 0,
        ).ask()
        if not minutes_str:
            return
        target_time = datetime.now() + timedelta(minutes=int(minutes_str))
    else:  # "At HH:MM"
        time_str = questionary.text(
            "Start time (HH:MM) :",
            validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x))
            and 0 <= int(x.split(':')[0]) <= 23
            and 0 <= int(x.split(':')[1]) <= 59,
        ).ask()
        if not time_str:
            return
        now = datetime.now()
        h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
        target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
        if target_time <= now:
            console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
            return

    print()
    try:
        while True:
            remaining = target_time - datetime.now()
            if remaining.total_seconds() <= 0:
                break
            total_secs = int(remaining.total_seconds())
            h = total_secs // 3600
            m = (total_secs % 3600) // 60
            s = total_secs % 60
            target_str = target_time.strftime('%H:%M:%S')
            print(
                f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
                end="",
                flush=True,
            )
            sleep(1)
        # Discard any keys typed during the countdown.
        while msvcrt.kbhit():
            msvcrt.getwch()
        print()
        console.print("[green]✓ Starting processing.[/green]")
    except KeyboardInterrupt:
        print()
        console.print("[bold red]Launch cancelled by user.[/bold red]")
        raise SystemExit(0)


def _ask_number_of_threads():
    """Prompt for the worker thread count; returns an int, or None if cancelled."""
    answer = questionary.text(
        "Number of threads :",
        # Clamp the default so it always satisfies the validator below.
        default=str(min(12, MAX_THREADS)),
        validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS,
    ).ask()
    return int(answer) if answer else None


# ============================================================================
# BLOCK 4: BUSINESS API CALLS
# ============================================================================

@api_call_with_retry
def get_worklist_page(filters, page, page_size):
    """Fetches one page of the diagnostic order worklist.

    Returns the decoded JSON response, or None when the retry decorator
    gives up and the failure is ignored.
    """
    client = get_httpx_client()
    client.base_url = GDD_URL
    response = client.post(
        API_DO_WORKLIST_ENDPOINT,
        headers={"Authorization": f"Bearer {access_token}"},
        json={
            "lang": "fr-FR",
            "filters": filters,
            "limit": page_size,
            "page": page,
            "sort": [],
        },
        timeout=API_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


@api_call_with_retry
def get_request_history(request_id):
    """Fetches the full audit history for a single request (descending chronological order).

    Returns the decoded JSON response, or None when the retry decorator
    gives up and the failure is ignored.
    """
    client = get_httpx_client()
    client.base_url = GDD_URL
    response = client.get(
        API_REQUEST_HISTORY_ENDPOINT,
        headers={"Authorization": f"Bearer {access_token}"},
        params={"sortOrder": "DESC", "requestId": request_id},
        timeout=API_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


# ============================================================================
# BLOCK 5: REQUEST PROCESSING
# ============================================================================

def _process_single_request(worklist_request):
    """Fetches the audit history for one worklist request.

    Returns:
        Tuple of (request_id, history_data) where history_data is None on
        persistent API failure (stored as JSON null in the output).
    """
    request_id = worklist_request.get("id")
    # Read back by api_call_with_retry to name the request in its log messages.
    thread_local_storage.current_request_context = {"id": request_id}
    history_data = get_request_history(request_id)
    return request_id, history_data


def _submit_requests(thread_pool, page_requests):
    """Submit one history job per worklist request; returns the new futures."""
    return [
        thread_pool.submit(
            run_with_context,
            _process_single_request,
            {"id": worklist_request.get("id")},
            worklist_request,
        )
        for worklist_request in page_requests
    ]


def _collect_future(future, results, thread_pool):
    """Append a finished future's result to `results`, or abort the run if it raised."""
    try:
        results.append(future.result())
    except Exception as exc:
        logging.critical(f"Critical exception in history worker: {exc}", exc_info=True)
        print("\nCRITICAL ERROR in history processing thread:")
        print(f"Exception: {exc}")
        traceback.print_exc()
        # Drop the queued jobs so the pool exits as soon as running ones finish.
        thread_pool.shutdown(wait=False, cancel_futures=True)
        raise
    finally:
        with _global_pbar_lock:
            # `is not None`: a tqdm bar with total == 0 is falsy.
            if global_pbar is not None:
                global_pbar.update(1)


def _drain_done(pending, results, thread_pool):
    """Collect every finished future in `pending`; returns those still running."""
    still_running = []
    for future in pending:
        if future.done():
            _collect_future(future, results, thread_pool)
        else:
            still_running.append(future)
    return still_running


# ============================================================================
# BLOCK 6: FILE UTILITIES
# ============================================================================

def backup_history_file():
    """Backs up the existing history JSON file before writing a new version.

    Best effort: a failed copy is logged and does not stop the export.
    """
    if os.path.exists(HISTORY_FILE_NAME):
        old_path = get_old_filename(HISTORY_FILE_NAME, OLD_FILE_SUFFIX)
        try:
            shutil.copy2(HISTORY_FILE_NAME, old_path)
        except Exception as e:
            logging.warning(f"Could not backup {HISTORY_FILE_NAME}: {e}")


# ============================================================================
# BLOCK 7: MAIN EXECUTION
# ============================================================================

def main():
    """Run the export: login, startup prompts, worklist fetch, history fetch, JSON write.

    Returns early, without touching the existing export file, when the login
    or thread-count prompt is cancelled or when the first worklist page cannot
    be fetched.
    """
    global global_pbar

    print()
    login_status = login()
    while login_status == "Error":
        login_status = login()
    if login_status == "Exit":
        return

    print()
    do_filters = ask_do_filters()

    print()
    number_of_threads = _ask_number_of_threads()
    if number_of_threads is None:
        # Prompt cancelled: int(None) used to raise TypeError here.
        return

    print()
    ask_on_retry_exhausted()

    print()
    wait_for_scheduled_launch()

    # === FETCH WORKLIST (paginated) ===
    print()
    start_time = perf_counter()

    with console.status("[bold green]Fetching worklist (page 1)...", spinner="dots"):
        first_page = get_worklist_page(do_filters, 1, DO_WORKLIST_PAGE_SIZE)

    if first_page is None:
        # The retry decorator returns None when the failure is ignored.
        logging.critical("Worklist page 1 could not be fetched; nothing to export.")
        print("Worklist page 1 could not be fetched; nothing to export.")
        return

    metadata = first_page.get("metadata") or {}
    total_requests = metadata.get("total", 0)
    total_pages = metadata.get("pages", 1)

    print(f"{total_requests} requests across {total_pages} pages...")
    print()

    # === SUBMIT ALL REQUESTS TO THREAD POOL AS PAGES ARRIVE ===
    # fetching_pbar (position=0): advances as each worklist page is fetched
    # processing_pbar (position=1): advances as each request history is retrieved
    pending = []
    all_results = []

    with ThreadPoolExecutor(max_workers=number_of_threads) as thread_pool:
        with tqdm(
            total=total_requests,
            unit="req.",
            desc=f"{'Fetching requests':<52}",
            position=0,
            leave=True,
            bar_format=custom_bar_format,
        ) as fetching_pbar:
            with tqdm(
                total=total_requests,
                unit="req.",
                desc=f"{'Fetching history':<52}",
                position=1,
                leave=True,
                bar_format=custom_bar_format,
            ) as processing_pbar:
                global_pbar = processing_pbar
                try:
                    # Submit first page requests
                    first_page_data = first_page.get("data") or []
                    pending.extend(_submit_requests(thread_pool, first_page_data))
                    fetching_pbar.update(len(first_page_data))

                    # Fetch and submit remaining pages; drain completed futures after each page
                    for page_num in range(2, total_pages + 1):
                        page_data = get_worklist_page(do_filters, page_num, DO_WORKLIST_PAGE_SIZE)
                        if page_data is None:
                            logging.warning(
                                f"Worklist page {page_num} could not be fetched; "
                                f"its requests are missing from the export."
                            )
                            page_requests = []
                        else:
                            page_requests = page_data.get("data") or []

                        pending.extend(_submit_requests(thread_pool, page_requests))
                        fetching_pbar.update(len(page_requests))
                        pending = _drain_done(pending, all_results, thread_pool)

                    # Drain remaining futures not yet collected
                    for future in as_completed(pending):
                        _collect_future(future, all_results, thread_pool)
                finally:
                    # Do not leave a closed progress bar reachable through the global.
                    global_pbar = None

    # === BUILD OUTPUT DICT ===
    print()
    print()
    print("Building history output...")

    # None → JSON null for failed requests
    output_dict = {request_id: history_data for request_id, history_data in all_results}

    # === BACKUP & WRITE ===
    try:
        backup_history_file()
        print("Writing file...")
        with open(HISTORY_FILE_NAME, 'w', encoding='utf-8') as f_json:
            json.dump(output_dict, f_json, indent=4, ensure_ascii=False)
        console.print("[green]✓ History saved to JSON file[/green]")
    except IOError as io_err:
        logging.critical(f"Error while writing JSON file : {io_err}")
        print(f"Error while writing JSON file : {io_err}")
    except Exception as exc:
        logging.critical(f"Error during final processing : {exc}")
        print(f"Error during final processing : {exc}")

    print()
    print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}")


if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        logging.critical(f"Script terminated prematurely due to an exception: {e}", exc_info=True)
        print(f"Script stopped due to an error : {e}")
    finally:
        # Keep the console window open until the user has read the output.
        print('\n')
        input("Press Enter to exit...")