Compare commits
11 Commits
6711bce827
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 428c88c268 | |||
| 415d003ea7 | |||
| 5c261bfc42 | |||
| 11287d3319 | |||
| d3c8916109 | |||
| a702865cda | |||
| 29cf2e3772 | |||
| ec5bb82c55 | |||
| 93ded6e385 | |||
| 96ddc1fb2f | |||
| 4960f17ccf |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -201,3 +201,4 @@ nul
|
||||
|
||||
/*.json
|
||||
/*.exe
|
||||
/pyproject.toml
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -62,8 +62,8 @@ IAM_URL = "https://api-auth.ziwig-connect.com"
|
||||
GDD_URL = "https://api-lab.ziwig-connect.com"
|
||||
GDD_APP_ID = "4f5ac063-6a22-4e2c-bda5-b50c0dddab79"
|
||||
|
||||
DEFAULT_USER_NAME = "paul.renaud"
|
||||
DEFAULT_PASSWORD = "Abdel#@#@#@123"
|
||||
DEFAULT_USER_NAME = "habib.benmoussa"
|
||||
DEFAULT_PASSWORD = "3DAqE5w&JCJx!RVKUd16#d^$u"
|
||||
|
||||
# ============================================================================
|
||||
# API ENDPOINTS
|
||||
@@ -133,3 +133,16 @@ BAR_N_FMT_WIDTH = 4
|
||||
BAR_TOTAL_FMT_WIDTH = 4
|
||||
BAR_TIME_WIDTH = 8
|
||||
BAR_RATE_WIDTH = 10
|
||||
|
||||
# ============================================================================
|
||||
# DO HISTORY — SPECIFIC CONSTANTS
|
||||
# ============================================================================
|
||||
|
||||
HISTORY_FILE_NAME = "do_requests_history.json"
|
||||
HISTORY_LOG_FILE_NAME = "history.log"
|
||||
|
||||
# Default filters proposed to the user at startup (editable as JSON before processing)
|
||||
DO_HISTORY_FILTERS_DEFAULT = {"hideArchivedRequests": False, "excludeTest": True, "status": "all-admin"}
|
||||
|
||||
# History API endpoint (GET + ?sortOrder=DESC&requestId={id})
|
||||
API_REQUEST_HISTORY_ENDPOINT = "/api/request-history"
|
||||
|
||||
3
do_history.bat
Normal file
3
do_history.bat
Normal file
@@ -0,0 +1,3 @@
|
||||
@echo off
|
||||
call C:\PythonProjects\.rcvenv\Scripts\activate.bat
|
||||
python do_history.py %*
|
||||
675
do_history.py
Normal file
675
do_history.py
Normal file
@@ -0,0 +1,675 @@
|
||||
|
||||
# DO (Diagnostic Order) Request History Exporter
|
||||
# This script exports the audit history of all diagnostic order requests matching
|
||||
# a configurable filter. It reuses the authentication, retry, and parallel processing
|
||||
# infrastructure of do_dashboard.py (token refresh, api_call_with_retry decorator,
|
||||
# thread-local HTTP clients, dual progress bars with ThreadPoolExecutor).
|
||||
# The DO_Filters constant is presented as an editable JSON prompt at startup.
|
||||
# Output: a single JSON dict {request_id: history_data} saved to do_requests_history.json.
|
||||
# Each value is the raw response from GET /api/request-history, or null on persistent error.
|
||||
import json
|
||||
import logging
|
||||
import msvcrt
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
import threading
|
||||
import traceback
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from datetime import timedelta, datetime
|
||||
from time import perf_counter, sleep
|
||||
import functools
|
||||
|
||||
import httpx
|
||||
import questionary
|
||||
from tqdm import tqdm
|
||||
from rich.console import Console
|
||||
|
||||
from do_dashboard_constants import (
|
||||
HISTORY_FILE_NAME,
|
||||
HISTORY_LOG_FILE_NAME,
|
||||
OLD_FILE_SUFFIX,
|
||||
DO_HISTORY_FILTERS_DEFAULT,
|
||||
DEFAULT_USER_NAME,
|
||||
DEFAULT_PASSWORD,
|
||||
IAM_URL,
|
||||
GDD_URL,
|
||||
GDD_APP_ID,
|
||||
ERROR_MAX_RETRY,
|
||||
WAIT_BEFORE_RETRY,
|
||||
WAIT_BEFORE_NEW_BATCH_OF_RETRIES,
|
||||
MAX_BATCHS_OF_RETRIES,
|
||||
MAX_THREADS,
|
||||
DO_WORKLIST_PAGE_SIZE,
|
||||
BAR_N_FMT_WIDTH,
|
||||
BAR_TOTAL_FMT_WIDTH,
|
||||
BAR_TIME_WIDTH,
|
||||
BAR_RATE_WIDTH,
|
||||
API_TIMEOUT,
|
||||
API_AUTH_LOGIN_ENDPOINT,
|
||||
API_AUTH_CONFIG_TOKEN_ENDPOINT,
|
||||
API_AUTH_REFRESH_TOKEN_ENDPOINT,
|
||||
API_DO_WORKLIST_ENDPOINT,
|
||||
API_REQUEST_HISTORY_ENDPOINT,
|
||||
)
|
||||
from do_dashboard_utils import (
|
||||
get_httpx_client,
|
||||
clear_httpx_client,
|
||||
thread_local_storage,
|
||||
run_with_context,
|
||||
get_old_filename,
|
||||
)
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.WARNING,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
filename=HISTORY_LOG_FILE_NAME,
|
||||
filemode='w',
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# BLOCK 1: CONFIGURATION & BASE INFRASTRUCTURE
|
||||
# ============================================================================
|
||||
|
||||
access_token = ""
|
||||
refresh_token = ""
|
||||
threads_list = []
|
||||
_token_refresh_lock = threading.Lock()
|
||||
on_retry_exhausted = "ask"
|
||||
_stored_username = ""
|
||||
_stored_password = ""
|
||||
_threads_list_lock = threading.Lock()
|
||||
global_pbar = None
|
||||
_global_pbar_lock = threading.Lock()
|
||||
_user_interaction_lock = threading.Lock()
|
||||
|
||||
httpx_clients = {}
|
||||
console = Console()
|
||||
|
||||
import do_dashboard_utils
|
||||
do_dashboard_utils.httpx_clients = httpx_clients
|
||||
do_dashboard_utils.threads_list = threads_list
|
||||
do_dashboard_utils._threads_list_lock = _threads_list_lock
|
||||
|
||||
custom_bar_format = (
|
||||
"{l_bar}{bar}"
|
||||
f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
|
||||
f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
|
||||
f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}"
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# BLOCK 2: DECORATORS & RESILIENCE
|
||||
# ============================================================================
|
||||
|
||||
def new_token():
|
||||
"""Refresh access token using the refresh token, with re-login fallback."""
|
||||
global access_token, refresh_token
|
||||
with _token_refresh_lock:
|
||||
for attempt in range(ERROR_MAX_RETRY):
|
||||
try:
|
||||
client = get_httpx_client()
|
||||
client.base_url = GDD_URL
|
||||
response = client.post(
|
||||
API_AUTH_REFRESH_TOKEN_ENDPOINT,
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={"refresh_token": refresh_token},
|
||||
timeout=20,
|
||||
)
|
||||
response.raise_for_status()
|
||||
access_token = response.json()["access_token"]
|
||||
refresh_token = response.json()["refresh_token"]
|
||||
return
|
||||
except httpx.RequestError as exc:
|
||||
logging.warning(f"Refresh Token Error (Attempt {attempt + 1}) : {exc}")
|
||||
clear_httpx_client()
|
||||
except httpx.HTTPStatusError as exc:
|
||||
logging.warning(
|
||||
f"Refresh Token Error (Attempt {attempt + 1}) : "
|
||||
f"{exc.response.status_code} for Url {exc.request.url}"
|
||||
)
|
||||
clear_httpx_client()
|
||||
finally:
|
||||
if attempt < ERROR_MAX_RETRY - 1:
|
||||
sleep(WAIT_BEFORE_RETRY)
|
||||
logging.warning("Refresh token exhausted. Attempting re-login with stored credentials.")
|
||||
_do_login(_stored_username, _stored_password)
|
||||
logging.info("Re-login successful. New tokens acquired.")
|
||||
|
||||
|
||||
def api_call_with_retry(func):
|
||||
"""Decorator for API calls with automatic retry and token refresh on 401 errors."""
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
func_name = func.__name__
|
||||
total_attempts = 0
|
||||
batch_count = 1
|
||||
|
||||
while True:
|
||||
for attempt in range(ERROR_MAX_RETRY):
|
||||
total_attempts += 1
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
|
||||
logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}")
|
||||
|
||||
clear_httpx_client()
|
||||
|
||||
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
|
||||
logging.info(f"Token expired for {func_name}. Refreshing token.")
|
||||
try:
|
||||
new_token()
|
||||
except (httpx.RequestError, httpx.HTTPStatusError) as token_exc:
|
||||
logging.warning(f"Token refresh/re-login failed for {func_name}: {token_exc}")
|
||||
|
||||
if attempt < ERROR_MAX_RETRY - 1:
|
||||
sleep(WAIT_BEFORE_RETRY)
|
||||
else:
|
||||
if batch_count < MAX_BATCHS_OF_RETRIES:
|
||||
logging.warning(
|
||||
f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. "
|
||||
f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch."
|
||||
)
|
||||
batch_count += 1
|
||||
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
|
||||
break
|
||||
else:
|
||||
with _user_interaction_lock:
|
||||
if on_retry_exhausted == "ignore":
|
||||
ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"})
|
||||
logging.warning(
|
||||
f"[AUTO-IGNORE] Skipping {func_name} for Request {ctx['id']}. Error: {exc}"
|
||||
)
|
||||
return None
|
||||
|
||||
elif on_retry_exhausted == "abort":
|
||||
logging.critical(
|
||||
f"[AUTO-ABORT] Stopping script after persistent error in {func_name}. Error: {exc}"
|
||||
)
|
||||
raise httpx.RequestError(
|
||||
message=f"Persistent error in {func_name} (auto-aborted)"
|
||||
)
|
||||
|
||||
else:
|
||||
console.print(
|
||||
f"\n[bold red]Persistent error in {func_name} after "
|
||||
f"{batch_count} batches ({total_attempts} attempts).[/bold red]"
|
||||
)
|
||||
console.print(f"[red]Exception: {exc}[/red]")
|
||||
|
||||
choice = questionary.select(
|
||||
f"What would you like to do for {func_name}?",
|
||||
choices=[
|
||||
"Retry (try another batch of retries)",
|
||||
"Ignore (return None and continue)",
|
||||
"Stop script (critical error)",
|
||||
],
|
||||
).ask()
|
||||
|
||||
if choice == "Retry (try another batch of retries)":
|
||||
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
|
||||
batch_count = 1
|
||||
break
|
||||
elif choice == "Ignore (return None and continue)":
|
||||
ctx = getattr(thread_local_storage, "current_request_context", {"id": "Unknown"})
|
||||
logging.warning(
|
||||
f"[IGNORE] User opted to skip {func_name} for Request {ctx['id']}. Error: {exc}"
|
||||
)
|
||||
return None
|
||||
else:
|
||||
logging.critical(
|
||||
f"User chose to stop script after persistent error in {func_name}."
|
||||
)
|
||||
raise httpx.RequestError(
|
||||
message=f"Persistent error in {func_name} (stopped by user)"
|
||||
)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# BLOCK 3: AUTHENTICATION
|
||||
# ============================================================================
|
||||
|
||||
def _do_login(username, password):
|
||||
"""Performs the two-step authentication (IAM → GDD) with the given credentials.
|
||||
Updates global access_token and refresh_token on success.
|
||||
Raises httpx.RequestError or httpx.HTTPStatusError on failure.
|
||||
Must NOT acquire _token_refresh_lock (caller's responsibility).
|
||||
"""
|
||||
global access_token, refresh_token
|
||||
|
||||
client = get_httpx_client()
|
||||
client.base_url = IAM_URL
|
||||
response = client.post(
|
||||
API_AUTH_LOGIN_ENDPOINT,
|
||||
json={"username": username, "password": password},
|
||||
timeout=20,
|
||||
)
|
||||
response.raise_for_status()
|
||||
master_token = response.json()["access_token"]
|
||||
user_id = response.json()["userId"]
|
||||
|
||||
client = get_httpx_client()
|
||||
client.base_url = GDD_URL
|
||||
response = client.post(
|
||||
API_AUTH_CONFIG_TOKEN_ENDPOINT,
|
||||
headers={"Authorization": f"Bearer {master_token}"},
|
||||
json={
|
||||
"userId": user_id,
|
||||
"clientId": GDD_APP_ID,
|
||||
"userAgent": (
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
||||
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||
"Chrome/137.0.0.0 Safari/537.36"
|
||||
),
|
||||
},
|
||||
timeout=20,
|
||||
)
|
||||
response.raise_for_status()
|
||||
access_token = response.json()["access_token"]
|
||||
refresh_token = response.json()["refresh_token"]
|
||||
|
||||
|
||||
def login():
|
||||
global _stored_username, _stored_password
|
||||
|
||||
user_name = questionary.text("login :", default=DEFAULT_USER_NAME).ask()
|
||||
password = questionary.password("password :", default=DEFAULT_PASSWORD).ask()
|
||||
if not (user_name and password):
|
||||
return "Exit"
|
||||
|
||||
try:
|
||||
_do_login(user_name, password)
|
||||
except httpx.RequestError as exc:
|
||||
print(f"Login Error : {exc}")
|
||||
logging.warning(f"Login Error : {exc}")
|
||||
return "Error"
|
||||
except httpx.HTTPStatusError as exc:
|
||||
print(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||
logging.warning(f"Login Error : {exc.response.status_code} for Url {exc.request.url}")
|
||||
return "Error"
|
||||
|
||||
_stored_username = user_name
|
||||
_stored_password = password
|
||||
print()
|
||||
print("Login Success")
|
||||
return "Success"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# BLOCK 3B: STARTUP PARAMETERS
|
||||
# ============================================================================
|
||||
|
||||
def ask_do_filters():
|
||||
"""Presents DO_HISTORY_FILTERS_DEFAULT as an editable JSON with validation.
|
||||
Called immediately after login so the user can adjust filters before processing.
|
||||
"""
|
||||
default_value = json.dumps(DO_HISTORY_FILTERS_DEFAULT, ensure_ascii=False)
|
||||
|
||||
def validate_json(text):
|
||||
if not text:
|
||||
return "Les filtres ne peuvent pas être vides"
|
||||
try:
|
||||
json.loads(text)
|
||||
return True
|
||||
except json.JSONDecodeError as e:
|
||||
return f"JSON invalide : {e}"
|
||||
|
||||
filters_str = questionary.text(
|
||||
"DO Filters (JSON) :",
|
||||
default=default_value,
|
||||
validate=validate_json,
|
||||
).ask()
|
||||
|
||||
if filters_str is None:
|
||||
return DO_HISTORY_FILTERS_DEFAULT
|
||||
return json.loads(filters_str)
|
||||
|
||||
|
||||
def ask_on_retry_exhausted():
|
||||
"""Asks the user what to do when all API retry batches are exhausted."""
|
||||
global on_retry_exhausted
|
||||
choice = questionary.select(
|
||||
"On retry exhausted :",
|
||||
choices=[
|
||||
"Ask (interactive prompt)",
|
||||
"Ignore (return None and continue)",
|
||||
"Abort (stop script)",
|
||||
],
|
||||
).ask()
|
||||
|
||||
if choice is None or choice == "Ask (interactive prompt)":
|
||||
on_retry_exhausted = "ask"
|
||||
elif choice == "Ignore (return None and continue)":
|
||||
on_retry_exhausted = "ignore"
|
||||
else:
|
||||
on_retry_exhausted = "abort"
|
||||
|
||||
|
||||
def wait_for_scheduled_launch():
|
||||
"""Asks the user when to start the processing and waits if needed.
|
||||
Options: Immediately / In X minutes / At HH:MM
|
||||
"""
|
||||
choice = questionary.select(
|
||||
"When to start processing ?",
|
||||
choices=["Immediately", "In X minutes", "At HH:MM"],
|
||||
).ask()
|
||||
|
||||
if choice is None or choice == "Immediately":
|
||||
return
|
||||
|
||||
if choice == "In X minutes":
|
||||
minutes_str = questionary.text(
|
||||
"Number of minutes :",
|
||||
validate=lambda x: x.isdigit() and int(x) > 0,
|
||||
).ask()
|
||||
if not minutes_str:
|
||||
return
|
||||
target_time = datetime.now() + timedelta(minutes=int(minutes_str))
|
||||
|
||||
else: # "At HH:MM"
|
||||
time_str = questionary.text(
|
||||
"Start time (HH:MM) :",
|
||||
validate=lambda x: bool(re.match(r'^\d{2}:\d{2}$', x))
|
||||
and 0 <= int(x.split(':')[0]) <= 23
|
||||
and 0 <= int(x.split(':')[1]) <= 59,
|
||||
).ask()
|
||||
if not time_str:
|
||||
return
|
||||
now = datetime.now()
|
||||
h, m = int(time_str.split(':')[0]), int(time_str.split(':')[1])
|
||||
target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
|
||||
if target_time <= now:
|
||||
console.print("[yellow]⚠ Specified time is already past. Starting immediately.[/yellow]")
|
||||
return
|
||||
|
||||
print()
|
||||
try:
|
||||
while True:
|
||||
remaining = target_time - datetime.now()
|
||||
if remaining.total_seconds() <= 0:
|
||||
break
|
||||
total_secs = int(remaining.total_seconds())
|
||||
h = total_secs // 3600
|
||||
m = (total_secs % 3600) // 60
|
||||
s = total_secs % 60
|
||||
target_str = target_time.strftime('%H:%M:%S')
|
||||
print(
|
||||
f"\r Starting in {h:02d}:{m:02d}:{s:02d}... (at {target_str}) — Ctrl+C to cancel ",
|
||||
end="",
|
||||
flush=True,
|
||||
)
|
||||
sleep(1)
|
||||
while msvcrt.kbhit():
|
||||
msvcrt.getwch()
|
||||
print()
|
||||
console.print("[green]✓ Starting processing.[/green]")
|
||||
except KeyboardInterrupt:
|
||||
print()
|
||||
console.print("[bold red]Launch cancelled by user.[/bold red]")
|
||||
raise SystemExit(0)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# BLOCK 4: BUSINESS API CALLS
|
||||
# ============================================================================
|
||||
|
||||
@api_call_with_retry
|
||||
def get_worklist_page(filters, page, page_size):
|
||||
"""Fetches one page of the diagnostic order worklist."""
|
||||
client = get_httpx_client()
|
||||
client.base_url = GDD_URL
|
||||
response = client.post(
|
||||
API_DO_WORKLIST_ENDPOINT,
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={
|
||||
"lang": "fr-FR",
|
||||
"filters": filters,
|
||||
"limit": page_size,
|
||||
"page": page,
|
||||
"sort": [],
|
||||
},
|
||||
timeout=API_TIMEOUT,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
|
||||
@api_call_with_retry
|
||||
def get_request_history(request_id):
|
||||
"""Fetches the full audit history for a single request (descending chronological order)."""
|
||||
client = get_httpx_client()
|
||||
client.base_url = GDD_URL
|
||||
response = client.get(
|
||||
API_REQUEST_HISTORY_ENDPOINT,
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
params={"sortOrder": "DESC", "requestId": request_id},
|
||||
timeout=API_TIMEOUT,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# BLOCK 5: REQUEST PROCESSING
|
||||
# ============================================================================
|
||||
|
||||
def _process_single_request(worklist_request):
|
||||
"""Fetches the audit history for one worklist request.
|
||||
|
||||
Returns:
|
||||
Tuple of (request_id, history_data) where history_data is None on
|
||||
persistent API failure (stored as JSON null in the output).
|
||||
"""
|
||||
request_id = worklist_request.get("id")
|
||||
thread_local_storage.current_request_context = {"id": request_id}
|
||||
history_data = get_request_history(request_id)
|
||||
return request_id, history_data
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# BLOCK 6: FILE UTILITIES
|
||||
# ============================================================================
|
||||
|
||||
def backup_history_file():
|
||||
"""Backs up the existing history JSON file before writing a new version."""
|
||||
if os.path.exists(HISTORY_FILE_NAME):
|
||||
old_path = get_old_filename(HISTORY_FILE_NAME, OLD_FILE_SUFFIX)
|
||||
try:
|
||||
shutil.copy2(HISTORY_FILE_NAME, old_path)
|
||||
except Exception as e:
|
||||
logging.warning(f"Could not backup {HISTORY_FILE_NAME}: {e}")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# BLOCK 7: MAIN EXECUTION
|
||||
# ============================================================================
|
||||
|
||||
def main():
|
||||
global global_pbar
|
||||
|
||||
print()
|
||||
login_status = login()
|
||||
|
||||
while login_status == "Error":
|
||||
login_status = login()
|
||||
if login_status == "Exit":
|
||||
return
|
||||
|
||||
print()
|
||||
do_filters = ask_do_filters()
|
||||
|
||||
print()
|
||||
number_of_threads = int(
|
||||
questionary.text(
|
||||
"Number of threads :",
|
||||
default="12",
|
||||
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS,
|
||||
).ask()
|
||||
)
|
||||
|
||||
print()
|
||||
ask_on_retry_exhausted()
|
||||
|
||||
print()
|
||||
wait_for_scheduled_launch()
|
||||
|
||||
# === FETCH WORKLIST (paginated) ===
|
||||
print()
|
||||
start_time = perf_counter()
|
||||
|
||||
with console.status("[bold green]Fetching worklist (page 1)...", spinner="dots"):
|
||||
first_page = get_worklist_page(do_filters, 1, DO_WORKLIST_PAGE_SIZE)
|
||||
|
||||
metadata = first_page.get("metadata", {})
|
||||
total_requests = metadata.get("total", 0)
|
||||
total_pages = metadata.get("pages", 1)
|
||||
|
||||
print(f"{total_requests} requests across {total_pages} pages...")
|
||||
print()
|
||||
|
||||
# === SUBMIT ALL REQUESTS TO THREAD POOL AS PAGES ARRIVE ===
|
||||
# fetching_pbar (position=0): advances as each worklist page is fetched
|
||||
# processing_pbar (position=1): advances as each request history is retrieved
|
||||
all_futures = []
|
||||
all_results = []
|
||||
completed_set = set()
|
||||
|
||||
with ThreadPoolExecutor(max_workers=number_of_threads) as thread_pool:
|
||||
|
||||
with tqdm(
|
||||
total=total_requests,
|
||||
unit="req.",
|
||||
desc=f"{'Fetching requests':<52}",
|
||||
position=0,
|
||||
leave=True,
|
||||
bar_format=custom_bar_format,
|
||||
) as fetching_pbar:
|
||||
|
||||
with tqdm(
|
||||
total=total_requests,
|
||||
unit="req.",
|
||||
desc=f"{'Fetching history':<52}",
|
||||
position=1,
|
||||
leave=True,
|
||||
bar_format=custom_bar_format,
|
||||
) as processing_pbar:
|
||||
|
||||
global_pbar = processing_pbar
|
||||
|
||||
def _drain_completed():
|
||||
for f in list(all_futures):
|
||||
if f not in completed_set and f.done():
|
||||
try:
|
||||
result = f.result()
|
||||
all_results.append(result)
|
||||
except Exception as exc:
|
||||
logging.critical(
|
||||
f"Critical exception in history worker: {exc}", exc_info=True
|
||||
)
|
||||
print(f"\nCRITICAL ERROR in history processing thread:")
|
||||
print(f"Exception: {exc}")
|
||||
traceback.print_exc()
|
||||
thread_pool.shutdown(wait=False, cancel_futures=True)
|
||||
raise
|
||||
finally:
|
||||
completed_set.add(f)
|
||||
with _global_pbar_lock:
|
||||
if global_pbar:
|
||||
global_pbar.update(1)
|
||||
|
||||
# Submit first page requests
|
||||
first_page_data = first_page.get("data", [])
|
||||
for worklist_request in first_page_data:
|
||||
f = thread_pool.submit(
|
||||
run_with_context,
|
||||
_process_single_request,
|
||||
{"id": worklist_request.get("id")},
|
||||
worklist_request,
|
||||
)
|
||||
all_futures.append(f)
|
||||
fetching_pbar.update(len(first_page_data))
|
||||
|
||||
# Fetch and submit remaining pages; drain completed futures after each page
|
||||
for page_num in range(2, total_pages + 1):
|
||||
page_data = get_worklist_page(do_filters, page_num, DO_WORKLIST_PAGE_SIZE)
|
||||
page_requests = page_data.get("data", [])
|
||||
for worklist_request in page_requests:
|
||||
f = thread_pool.submit(
|
||||
run_with_context,
|
||||
_process_single_request,
|
||||
{"id": worklist_request.get("id")},
|
||||
worklist_request,
|
||||
)
|
||||
all_futures.append(f)
|
||||
fetching_pbar.update(len(page_requests))
|
||||
_drain_completed()
|
||||
|
||||
# Drain remaining futures not yet collected
|
||||
remaining = [f for f in all_futures if f not in completed_set]
|
||||
for future in as_completed(remaining):
|
||||
try:
|
||||
result = future.result()
|
||||
all_results.append(result)
|
||||
except Exception as exc:
|
||||
logging.critical(
|
||||
f"Critical exception in history worker: {exc}", exc_info=True
|
||||
)
|
||||
print(f"\nCRITICAL ERROR in history processing thread:")
|
||||
print(f"Exception: {exc}")
|
||||
traceback.print_exc()
|
||||
thread_pool.shutdown(wait=False, cancel_futures=True)
|
||||
raise
|
||||
finally:
|
||||
completed_set.add(future)
|
||||
with _global_pbar_lock:
|
||||
if global_pbar:
|
||||
global_pbar.update(1)
|
||||
|
||||
# === BUILD OUTPUT DICT ===
|
||||
print()
|
||||
print()
|
||||
print("Building history output...")
|
||||
|
||||
output_dict = dict(
|
||||
sorted(
|
||||
((request_id, history_data) for request_id, history_data in all_results),
|
||||
key=lambda x: x[0],
|
||||
)
|
||||
)
|
||||
|
||||
# === BACKUP & WRITE ===
|
||||
try:
|
||||
backup_history_file()
|
||||
|
||||
print("Writing file...")
|
||||
with open(HISTORY_FILE_NAME, 'w', encoding='utf-8') as f_json:
|
||||
json.dump(output_dict, f_json, indent=4, ensure_ascii=False)
|
||||
|
||||
console.print("[green]✓ History saved to JSON file[/green]")
|
||||
|
||||
except IOError as io_err:
|
||||
logging.critical(f"Error while writing JSON file : {io_err}")
|
||||
print(f"Error while writing JSON file : {io_err}")
|
||||
except Exception as exc:
|
||||
logging.critical(f"Error during final processing : {exc}")
|
||||
print(f"Error during final processing : {exc}")
|
||||
|
||||
print()
|
||||
print(f"Elapsed time : {str(timedelta(seconds=perf_counter() - start_time))}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
try:
|
||||
main()
|
||||
except Exception as e:
|
||||
logging.critical(f"Script terminated prematurely due to an exception: {e}", exc_info=True)
|
||||
print(f"Script stopped due to an error : {e}")
|
||||
finally:
|
||||
print('\n')
|
||||
input("Press Enter to exit...")
|
||||
Binary file not shown.
Reference in New Issue
Block a user