Fixes after first test

This commit is contained in:
2026-03-28 02:29:24 +01:00
parent 61ce661332
commit a2dea95c89
8 changed files with 44046 additions and 49 deletions

View File

@@ -1169,10 +1169,6 @@ def _process_single_request(worklist_request, mapping_dict):
else:
request_detail["Center_Name"] = None
# Also inject organization and labeledOrganization for mapping access
request_detail["organization"] = worklist_request.get("organization")
request_detail["labeledOrganization"] = labeled_org
# --- 6. Apply requests mapping to produce output object ---
output_request = {}
process_requests_mapping(output_request, request_detail)
@@ -1324,10 +1320,12 @@ def main():
print()
console.print("[bold cyan]Loading Excel export configuration...[/bold cyan]")
excel_export_config, has_config_critical, _ = \
excel_export_config, has_config_critical, config_error_messages = \
prepare_excel_export(requests_mapping_config, organizations_mapping_config)
if has_config_critical:
for err in config_error_messages:
console.print(f"[bold red] • {err}[/bold red]")
print()
answer = questionary.confirm(
"⚠ Critical configuration errors detected. Continue anyway?",
@@ -1361,61 +1359,87 @@ def main():
print()
# === SUBMIT ALL REQUESTS TO THREAD POOL AS PAGES ARRIVE ===
# Both progress bars are shown simultaneously:
# - fetching_pbar (position=0): advances by the number of requests in each fetched page
# - processing_pbar (position=1): advances as each request finishes processing
# Completed futures are drained inline after each page so the processing bar
# starts moving immediately, without waiting for all pages to be fetched first.
all_futures = []
all_results = []
completed_set = set()
with ThreadPoolExecutor(max_workers=number_of_threads) as thread_pool:
# Progress bar 1: page fetching
with tqdm(total=total_pages, unit="page",
desc=f"{'Fetching pages':<52}",
with tqdm(total=total_requests, unit="req.",
desc=f"{'Fetching requests':<52}",
position=0, leave=True,
bar_format=custom_bar_format) as pages_pbar:
bar_format=custom_bar_format) as fetching_pbar:
# Submit first page requests
for worklist_request in first_page.get("data", []):
f = thread_pool.submit(run_with_context, _process_single_request,
{"id": worklist_request.get("id")},
worklist_request, mapping_dict)
all_futures.append(f)
pages_pbar.update(1)
with tqdm(total=total_requests, unit="req.",
desc=f"{'Processing requests':<52}",
position=1, leave=True,
bar_format=custom_bar_format) as processing_pbar:
# Fetch and submit remaining pages
for page_num in range(2, total_pages + 1):
page_data = get_worklist_page(do_filters, page_num, DO_WORKLIST_PAGE_SIZE)
for worklist_request in page_data.get("data", []):
global_pbar = processing_pbar
def _drain_completed():
    """Harvest every submitted future that has finished but was not yet collected.

    Walks a snapshot of ``all_futures`` and, for each future that is done and
    not already recorded in ``completed_set``, appends its result to
    ``all_results``. Futures that are still running are left untouched, so
    this call never waits on a worker.

    Raises:
        Exception: whatever the worker raised. Before re-raising, the error
            is logged at critical level, echoed to stdout with a traceback,
            and the thread pool is shut down with pending futures cancelled.
    """
    for fut in list(all_futures):
        # Skip anything already harvested, and anything still in flight.
        if fut in completed_set or not fut.done():
            continue
        try:
            all_results.append(fut.result())
        except Exception as exc:
            logging.critical(f"Critical exception in request worker: {exc}", exc_info=True)
            print(f"\nCRITICAL ERROR in request processing thread:")
            print(f"Exception: {exc}")
            traceback.print_exc()
            # Stop accepting work and cancel whatever has not started yet.
            thread_pool.shutdown(wait=False, cancel_futures=True)
            raise
        finally:
            # Runs on success and on failure alike: the future is marked as
            # collected and the progress bar advances by one either way.
            completed_set.add(fut)
            with _global_pbar_lock:
                if global_pbar:
                    global_pbar.update(1)
# Submit first page requests
first_page_data = first_page.get("data", [])
for worklist_request in first_page_data:
f = thread_pool.submit(run_with_context, _process_single_request,
{"id": worklist_request.get("id")},
worklist_request, mapping_dict)
all_futures.append(f)
pages_pbar.update(1)
fetching_pbar.update(len(first_page_data))
print()
# Fetch and submit remaining pages; drain completed futures after each page
for page_num in range(2, total_pages + 1):
page_data = get_worklist_page(do_filters, page_num, DO_WORKLIST_PAGE_SIZE)
page_requests = page_data.get("data", [])
for worklist_request in page_requests:
f = thread_pool.submit(run_with_context, _process_single_request,
{"id": worklist_request.get("id")},
worklist_request, mapping_dict)
all_futures.append(f)
fetching_pbar.update(len(page_requests))
_drain_completed()
# Progress bar 2: request processing
all_results = [] # list of (output_request, request_meta)
with tqdm(total=total_requests, unit="req.",
desc=f"{'Processing requests':<52}",
position=0, leave=True,
bar_format=custom_bar_format) as processing_pbar:
global_pbar = processing_pbar
for future in as_completed(all_futures):
try:
result = future.result()
all_results.append(result)
except Exception as exc:
logging.critical(f"Critical exception in request worker: {exc}", exc_info=True)
print(f"\nCRITICAL ERROR in request processing thread:")
print(f"Exception: {exc}")
traceback.print_exc()
thread_pool.shutdown(wait=False, cancel_futures=True)
raise
finally:
with _global_pbar_lock:
if global_pbar:
global_pbar.update(1)
# Drain remaining futures not yet collected
remaining = [f for f in all_futures if f not in completed_set]
for future in as_completed(remaining):
try:
result = future.result()
all_results.append(result)
except Exception as exc:
logging.critical(f"Critical exception in request worker: {exc}", exc_info=True)
print(f"\nCRITICAL ERROR in request processing thread:")
print(f"Exception: {exc}")
traceback.print_exc()
thread_pool.shutdown(wait=False, cancel_futures=True)
raise
finally:
completed_set.add(future)
with _global_pbar_lock:
if global_pbar:
global_pbar.update(1)
# === SORT RESULTS ===
print()