Compare commits

..

9 Commits

11 changed files with 695 additions and 3 deletions

3
.gitignore vendored
View File

@@ -200,4 +200,5 @@ nul
!do_org_center_mapping.xlsx !do_org_center_mapping.xlsx
/*.json /*.json
/*.exe /*.exe
/pyproject.toml

Binary file not shown.

Binary file not shown.

View File

@@ -62,8 +62,8 @@ IAM_URL = "https://api-auth.ziwig-connect.com"
GDD_URL = "https://api-lab.ziwig-connect.com" GDD_URL = "https://api-lab.ziwig-connect.com"
GDD_APP_ID = "4f5ac063-6a22-4e2c-bda5-b50c0dddab79" GDD_APP_ID = "4f5ac063-6a22-4e2c-bda5-b50c0dddab79"
DEFAULT_USER_NAME = "paul.renaud" DEFAULT_USER_NAME = "habib.benmoussa"
DEFAULT_PASSWORD = "Abdel#@#@#@123" DEFAULT_PASSWORD = "3DAqE5w&JCJx!RVKUd16#d^$u"
# ============================================================================ # ============================================================================
# API ENDPOINTS # API ENDPOINTS
@@ -133,3 +133,16 @@ BAR_N_FMT_WIDTH = 4
BAR_TOTAL_FMT_WIDTH = 4 BAR_TOTAL_FMT_WIDTH = 4
BAR_TIME_WIDTH = 8 BAR_TIME_WIDTH = 8
BAR_RATE_WIDTH = 10 BAR_RATE_WIDTH = 10
# ============================================================================
# DO HISTORY — SPECIFIC CONSTANTS
# ============================================================================
HISTORY_FILE_NAME = "do_requests_history.json"
HISTORY_LOG_FILE_NAME = "history.log"
# Default filters proposed to the user at startup (editable as JSON before processing)
DO_HISTORY_FILTERS_DEFAULT = {"hideArchivedRequests": False, "excludeTest": True, "status": "all-admin"}
# History API endpoint (GET + ?sortOrder=DESC&requestId={id})
API_REQUEST_HISTORY_ENDPOINT = "/api/request-history"

3
do_history.bat Normal file
View File

@@ -0,0 +1,3 @@
@echo off
call C:\PythonProjects\.rcvenv\Scripts\activate.bat
python do_history.py %*

675
do_history.py Normal file
View File

@@ -0,0 +1,675 @@
# DO (Diagnostic Order) Request History Exporter
# This script exports the audit history of all diagnostic order requests matching
# a configurable filter. It reuses the authentication, retry, and parallel processing
# infrastructure of do_dashboard.py (token refresh, api_call_with_retry decorator,
# thread-local HTTP clients, dual progress bars with ThreadPoolExecutor).
# The DO_Filters constant is presented as an editable JSON prompt at startup.
# Output: a single JSON dict {request_id: history_data} saved to do_requests_history.json.
# Each value is the raw response from GET /api/request-history, or null on persistent error.
import json
import logging
import msvcrt
import os
import re
import shutil
import sys
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta, datetime
from time import perf_counter, sleep
import functools
import httpx
import questionary
from tqdm import tqdm
from rich.console import Console
from do_dashboard_constants import (
HISTORY_FILE_NAME,
HISTORY_LOG_FILE_NAME,
OLD_FILE_SUFFIX,
DO_HISTORY_FILTERS_DEFAULT,
DEFAULT_USER_NAME,
DEFAULT_PASSWORD,
IAM_URL,
GDD_URL,
GDD_APP_ID,
ERROR_MAX_RETRY,
WAIT_BEFORE_RETRY,
WAIT_BEFORE_NEW_BATCH_OF_RETRIES,
MAX_BATCHS_OF_RETRIES,
MAX_THREADS,
DO_WORKLIST_PAGE_SIZE,
BAR_N_FMT_WIDTH,
BAR_TOTAL_FMT_WIDTH,
BAR_TIME_WIDTH,
BAR_RATE_WIDTH,
API_TIMEOUT,
API_AUTH_LOGIN_ENDPOINT,
API_AUTH_CONFIG_TOKEN_ENDPOINT,
API_AUTH_REFRESH_TOKEN_ENDPOINT,
API_DO_WORKLIST_ENDPOINT,
API_REQUEST_HISTORY_ENDPOINT,
)
from do_dashboard_utils import (
get_httpx_client,
clear_httpx_client,
thread_local_storage,
run_with_context,
get_old_filename,
)
logging.basicConfig(
level=logging.WARNING,
format='%(asctime)s - %(levelname)s - %(message)s',
filename=HISTORY_LOG_FILE_NAME,
filemode='w',
)
# ============================================================================
# BLOCK 1: CONFIGURATION & BASE INFRASTRUCTURE
# ============================================================================
access_token = ""
refresh_token = ""
threads_list = []
_token_refresh_lock = threading.Lock()
on_retry_exhausted = "ask"
_stored_username = ""
_stored_password = ""
_threads_list_lock = threading.Lock()
global_pbar = None
_global_pbar_lock = threading.Lock()
_user_interaction_lock = threading.Lock()
httpx_clients = {}
console = Console()
import do_dashboard_utils
do_dashboard_utils.httpx_clients = httpx_clients
do_dashboard_utils.threads_list = threads_list
do_dashboard_utils._threads_list_lock = _threads_list_lock
custom_bar_format = (
"{l_bar}{bar}"
f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}"
)
# ============================================================================
# BLOCK 2: DECORATORS & RESILIENCE
# ============================================================================
def new_token():
"""Refresh access token using the refresh token, with re-login fallback."""
global access_token, refresh_token
with _token_refresh_lock:
for attempt in range(ERROR_MAX_RETRY):
try:
client = get_httpx_client()
client.base_url = GDD_URL
response = client.post(
API_AUTH_REFRESH_TOKEN_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"},
json={"refresh_token": refresh_token},
timeout=20,
)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
return
except httpx.RequestError as exc:
logging.warning(f"Refresh Token Error (Attempt {attempt + 1}) : {exc}")
clear_httpx_client()
except httpx.HTTPStatusError as exc:
logging.warning(
f"Refresh Token Error (Attempt {attempt + 1}) : "
f"{exc.response.status_code} for Url {exc.request.url}"
)
clear_httpx_client()
finally:
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
_do_login(_stored_username, _stored_password)
logging.info("Re-login successful. New tokens acquired.")
def api_call_with_retry(func):
"""Decorator for API calls with automatic retry and token refresh on 401 errors."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
total_attempts = 0
batch_count = 1
while True:
for attempt in range(ERROR_MAX_RETRY):
total_attempts += 1
try:
return func(*args, **kwargs)
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}")
clear_httpx_client()
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token.")
try:
new_token()
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
else:
if batch_count < MAX_BATCHS_OF_RETRIES:
logging.warning(
f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. "
f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch."
)
batch_count += 1
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
break
else:
with _user_interaction_lock:
if on_retry_exhausted == "ignore":
ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"})
logging.warning(
f"[AUTO-IGNORE] Skipping {func_name} for Request {ctx['id']}. Error: {exc}"
)
return None
elif on_retry_exhausted == "abort":
logging.critical(
f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}"
)
raise httpx.RequestError(
message=f"Persistent error in {func_name} (auto-aborted)"
)
else:
console.print(
f"\n[bold red]Persistent error in {func_name} after "
f"{batch_count} batches ({total_attempts} attempts).[/bold red]"
)
console.print(f"[red]Exception: {exc}[/red]")
choice = questionary.select(
f"What would you like to do for {func_name}?",
choices=[
"Retry (try another batch of retries)",
"Ignore (return None and continue)",
"Stop script (critical error)",
],
).ask()
if choice == "Retry (try another batch of retries)":
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
batch_count = 1
break
elif choice == "Ignore (return None and continue)":
ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"})
logging.warning(
f"[IGNORE] User opted to skip {func_name} for Request {ctx['id']}. Error: {exc}"
)
return None
else:
logging.critical(
f"User chose to stop script after persistent error in {func_name}."
)
raise httpx.RequestError(
message=f"Persistent error in {func_name} (stopped by user)"
)
return wrapper
# ============================================================================
# BLOCK 3: AUTHENTICATION
# ============================================================================
def _do_login(username, password):
"""Performs the two-step authentication (IAM → GDD) with the given credentials.
Updates global access_token and refresh_token on success.
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
Must NOT acquire _token_refresh_lock (caller's responsibility).
"""
global access_token, refresh_token
client = get_httpx_client()
client.base_url = IAM_URL
response = client.post(
API_AUTH_LOGIN_ENDPOINT,
json={"username": username, "password": password},
timeout=20,
)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
client = get_httpx_client()
client.base_url = GDD_URL
response = client.post(
API_AUTH_CONFIG_TOKEN_ENDPOINT,
headers={"Authorization": f"Bearer {master_token}"},
json={
"userId": user_id,
"clientId": GDD_APP_ID,
"userAgent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/137.0.0.0 Safari/537.36"
),
},
timeout=20,
)
response.raise_for_status()
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
def login():
global _stored_username, _stored_password
user_name = questionary.text("login :", default=DEFAULT_USER_NAME).ask()
password = questionary.password("password :", default=DEFAULT_PASSWORD).ask()
if not (user_name and password):
return "Exit"
try:
_do_login(user_name, password)
except httpx.RequestError as exc:
print(f"Login Error : {exc}")
logging.warning(f"Login Error : {exc}")
return "Error"
except httpx.HTTPStatusError as exc:
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
return "Error"
_stored_username = user_name
_stored_password = password
print()
print("Login Success")
return "Success"
# ============================================================================
# BLOCK 3B: STARTUP PARAMETERS
# ============================================================================
def ask_do_filters():
"""Presents DO_HISTORY_FILTERS_DEFAULT as an editable JSON with validation.
Called immediately after login so the user can adjust filters before processing.
"""
default_value = json.dumps(DO_HISTORY_FILTERS_DEFAULT, ensure_ascii=False)
def validate_json(text):
if not text:
return "Les filtres ne peuvent pas être vides"
try:
json.loads(text)
return True
except json.JSONDecodeError as e:
return f"JSON invalide : {e}"
filters_str = questionary.text(
"DO Filters (JSON) :",
default=default_value,
validate=validate_json,
).ask()
if filters_str is None:
return DO_HISTORY_FILTERS_DEFAULT
return json.loads(filters_str)
def ask_on_retry_exhausted():
"""Asks the user what to do when all API retry batches are exhausted."""
global on_retry_exhausted
choice = questionary.select(
"On retry exhausted :",
choices=[
"Ask (interactive prompt)",
"Ignore (return None and continue)",
"Abort (stop script)",
],
).ask()
if choice is None or choice == "Ask (interactive prompt)":
on_retry_exhausted = "ask"
elif choice == "Ignore (return None and continue)":
on_retry_exhausted = "ignore"
else:
on_retry_exhausted = "abort"
def wait_for_scheduled_launch():
"""Asks the user when to start the processing and waits if needed.
Options: Immediately / In X minutes / At HH:MM
"""
choice = questionary.select(
"When to start processing ?",
choices=["Immediately", "In X minutes", "At HH:MM"],
).ask()
if choice is None or choice == "Immediately":
return
if choice == "In X minutes":
minutes_str = questionary.text(
"Number of minutes :",
validate=lambda x: x.isdigit() and int(x) > 0,
).ask()
if not minutes_str:
return
target_time = datetime.now() + timedelta(minutes=int(minutes_str))
else: # "At HH:MM"
time_str = questionary.text(
"Start time (HH:MM) :",
validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x))
and 0 <= int(x.split(':')[0]) <= 23
and 0 <= int(x.split(':')[1]) <= 59,
).ask()
if not time_str:
return
now = datetime.now()
h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
if target_time <= now:
console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
return
print()
try:
while True:
remaining = target_time - datetime.now()
if remaining.total_seconds() <= 0:
break
total_secs = int(remaining.total_seconds())
h = total_secs // 3600
m = (total_secs % 3600) // 60
s = total_secs % 60
target_str = target_time.strftime('%H:%M:%S')
print(
f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
end="",
flush=True,
)
sleep(1)
while msvcrt.kbhit():
msvcrt.getwch()
print()
console.print("[green]✓ Starting processing.[/green]")
except KeyboardInterrupt:
print()
console.print("[bold red]Launch cancelled by user.[/bold red]")
raise SystemExit(0)
# ============================================================================
# BLOCK 4: BUSINESS API CALLS
# ============================================================================
@api_call_with_retry
def get_worklist_page(filters, page, page_size):
"""Fetches one page of the diagnostic order worklist."""
client = get_httpx_client()
client.base_url = GDD_URL
response = client.post(
API_DO_WORKLIST_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"},
json={
"lang": "fr-FR",
"filters": filters,
"limit": page_size,
"page": page,
"sort": [],
},
timeout=API_TIMEOUT,
)
response.raise_for_status()
return response.json()
@api_call_with_retry
def get_request_history(request_id):
"""Fetches the full audit history for a single request (descending chronological order)."""
client = get_httpx_client()
client.base_url = GDD_URL
response = client.get(
API_REQUEST_HISTORY_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"},
params={"sortOrder": "DESC", "requestId": request_id},
timeout=API_TIMEOUT,
)
response.raise_for_status()
return response.json()
# ============================================================================
# BLOCK 5: REQUEST PROCESSING
# ============================================================================
def _process_single_request(worklist_request):
"""Fetches the audit history for one worklist request.
Returns:
Tuple of (request_id, history_data) where history_data is None on
persistent API failure (stored as JSON null in the output).
"""
request_id = worklist_request.get("id")
thread_local_storage.current_request_context = {"id": request_id}
history_data = get_request_history(request_id)
return request_id, history_data
# ============================================================================
# BLOCK 6: FILE UTILITIES
# ============================================================================
def backup_history_file():
"""Backs up the existing history JSON file before writing a new version."""
if os.path.exists(HISTORY_FILE_NAME):
old_path = get_old_filename(HISTORY_FILE_NAME, OLD_FILE_SUFFIX)
try:
shutil.copy2(HISTORY_FILE_NAME, old_path)
except Exception as e:
logging.warning(f"Could not backup {HISTORY_FILE_NAME}: {e}")
# ============================================================================
# BLOCK 7: MAIN EXECUTION
# ============================================================================
def main():
global global_pbar
print()
login_status = login()
while login_status == "Error":
login_status = login()
if login_status == "Exit":
return
print()
do_filters = ask_do_filters()
print()
number_of_threads = int(
questionary.text(
"Number of threads :",
default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS,
).ask()
)
print()
ask_on_retry_exhausted()
print()
wait_for_scheduled_launch()
# === FETCH WORKLIST (paginated) ===
print()
start_time = perf_counter()
with console.status("[bold green]Fetching worklist (page 1)...", spinner="dots"):
first_page = get_worklist_page(do_filters, 1, DO_WORKLIST_PAGE_SIZE)
metadata = first_page.get("metadata", {})
total_requests = metadata.get("total", 0)
total_pages = metadata.get("pages", 1)
print(f"{total_requests} requests across {total_pages} pages...")
print()
# === SUBMIT ALL REQUESTS TO THREAD POOL AS PAGES ARRIVE ===
# fetching_pbar (position=0): advances as each worklist page is fetched
# processing_pbar (position=1): advances as each request history is retrieved
all_futures = []
all_results = []
completed_set = set()
with ThreadPoolExecutor(max_workers=number_of_threads) as thread_pool:
with tqdm(
total=total_requests,
unit="req.",
desc=f"{'Fetching requests':<52}",
position=0,
leave=True,
bar_format=custom_bar_format,
) as fetching_pbar:
with tqdm(
total=total_requests,
unit="req.",
desc=f"{'Fetching history':<52}",
position=1,
leave=True,
bar_format=custom_bar_format,
) as processing_pbar:
global_pbar = processing_pbar
def _drain_completed():
for f in list(all_futures):
if f not in completed_set and f.done():
try:
result = f.result()
all_results.append(result)
except Exception as exc:
logging.critical(
f"Critical exception in history worker: {exc}", exc_info=True
)
print(f"\nCRITICAL ERROR in history processing thread:")
print(f"Exception: {exc}")
traceback.print_exc()
thread_pool.shutdown(wait=False, cancel_futures=True)
raise
finally:
completed_set.add(f)
with _global_pbar_lock:
if global_pbar:
global_pbar.update(1)
# Submit first page requests
first_page_data = first_page.get("data", [])
for worklist_request in first_page_data:
f = thread_pool.submit(
run_with_context,
_process_single_request,
{"id": worklist_request.get("id")},
worklist_request,
)
all_futures.append(f)
fetching_pbar.update(len(first_page_data))
# Fetch and submit remaining pages; drain completed futures after each page
for page_num in range(2, total_pages + 1):
page_data = get_worklist_page(do_filters, page_num, DO_WORKLIST_PAGE_SIZE)
page_requests = page_data.get("data", [])
for worklist_request in page_requests:
f = thread_pool.submit(
run_with_context,
_process_single_request,
{"id": worklist_request.get("id")},
worklist_request,
)
all_futures.append(f)
fetching_pbar.update(len(page_requests))
_drain_completed()
# Drain remaining futures not yet collected
remaining = [f for f in all_futures if f not in completed_set]
for future in as_completed(remaining):
try:
result = future.result()
all_results.append(result)
except Exception as exc:
logging.critical(
f"Critical exception in history worker: {exc}", exc_info=True
)
print(f"\nCRITICAL ERROR in history processing thread:")
print(f"Exception: {exc}")
traceback.print_exc()
thread_pool.shutdown(wait=False, cancel_futures=True)
raise
finally:
completed_set.add(future)
with _global_pbar_lock:
if global_pbar:
global_pbar.update(1)
# === BUILD OUTPUT DICT ===
print()
print()
print("Building history output...")
output_dict = dict(
sorted(
((request_id, history_data) for request_id, history_data in all_results),
key=lambda x: x[0],
)
)
# === BACKUP & WRITE ===
try:
backup_history_file()
print("Writing file...")
with open(HISTORY_FILE_NAME, 'w', encoding='utf-8') as f_json:
json.dump(output_dict, f_json, indent=4, ensure_ascii=False)
console.print("[green]✓ History saved to JSON file[/green]")
except IOError as io_err:
logging.critical(f"Error while writing JSON file : {io_err}")
print(f"Error while writing JSON file : {io_err}")
except Exception as exc:
logging.critical(f"Error during final processing : {exc}")
print(f"Error during final processing : {exc}")
print()
print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}")
if __name__ == '__main__':
try:
main()
except Exception as e:
logging.critical(f"Script terminated prematurely due to an exception: {e}", exc_info=True)
print(f"Script stopped due to an error : {e}")
finally:
print('\n')
input("Press Enter to exit...")

Binary file not shown.