Extended parallelism with two progress bars and interruption handling

2026-02-13 02:53:46 +01:00
parent 24e8f1bf54
commit 3380b73ec7


@@ -68,7 +68,7 @@ DEFAULT_PROFESSIONAL_ID = "99990000005"
 # CONFIGURATION - PAGINATION
 # ============================================================================
-MAX_PAGE_SIZE = 1000
+PAGE_SIZE = 10
 # ============================================================================
@@ -552,15 +552,16 @@ def login():
 # ============================================================================
 @api_call_with_retry
-def get_patients(professional_id):
-    """Get all patients for a given professional ID (RPPS)."""
+def get_patients_page(professional_id, page=1):
+    """Get a page of patients for a given professional ID (RPPS).
+    Returns (patients_list, total_count)."""
     client = get_httpx_client()
     response = client.post(
         f"{ENDOCONNECT_BASE_URL}{PATIENTS_LIST_PATH}",
         headers={"Authorization": f"Bearer {token}"},
         json={
-            "page": 1,
-            "pageSize": MAX_PAGE_SIZE,
+            "page": page,
+            "pageSize": PAGE_SIZE,
             "RPPS": professional_id,
             "search": ""
         },
@@ -568,7 +569,7 @@ def get_patients(professional_id):
     )
     response.raise_for_status()
     data = response.json()
-    return data.get("patients", [])
+    return data.get("patients", []), data.get("count", 0)
 @api_call_with_retry
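
For reference, here is how a caller can walk every page with the new helper; the ceiling division matches the total_pages computation in main() below. This is a minimal sketch, not part of the commit: iter_all_patients is a hypothetical name, and it assumes api_call_with_retry returns None once its retries are exhausted.

def iter_all_patients(professional_id):
    """Hypothetical helper (illustration only): yield every patient, page by page."""
    result = get_patients_page(professional_id, page=1)
    if result is None:  # assumption: api_call_with_retry returns None after exhausting retries
        return
    first_page, total_count = result
    yield from first_page
    total_pages = (total_count + PAGE_SIZE - 1) // PAGE_SIZE  # ceiling division
    for page_num in range(2, total_pages + 1):
        result = get_patients_page(professional_id, page=page_num)
        if result is None:
            continue  # skip pages that failed even after retries
        page_patients, _ = result
        yield from page_patients
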
@@ -832,52 +833,136 @@ def main():
     start_time = perf_counter()
     main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads)
-    # ========== FETCH PATIENTS ==========
+    # ========== FETCH & PROCESS PATIENTS (paginated) ==========
     print()
-    console.print("[bold cyan]Fetching patients list...[/bold cyan]")
-    all_patients = get_patients(professional_id)
+    console.print("[bold cyan]Fetching and processing patients...[/bold cyan]")
-    if all_patients is None:
+    # Fetch the first page to get the total count
+    thread_local_storage.current_patient_context = {"id": "N/A", "name": "page_fetcher"}
+    first_page_result = get_patients_page(professional_id, page=1)
+    if first_page_result is None:  # api_call_with_retry is assumed to return None once retries are exhausted
         logging.critical(f"Failed to fetch patients list for professional_id={professional_id}. Aborting.")
         console.print("[bold red]Failed to fetch patients list. Aborting.[/bold red]")
         return
-    # Filter: keep only patients with finished medical record
-    patients = [p for p in all_patients if p.get("isFinishMedicalRecord") is True]
+    first_page_patients, total_count = first_page_result  # unpack only after the None check
+    total_pages = (total_count + PAGE_SIZE - 1) // PAGE_SIZE  # ceiling division
+    console.print(f"[green]Total patients reported by API: {total_count} (across {total_pages} pages)[/green]")
console.print(f"[green]Total patients: {len(all_patients)} | With finished medical record: {len(patients)}[/green]")
if not patients:
console.print("[yellow]No patients with finished medical records found. Nothing to process.[/yellow]")
return
# ========== PROCESS PATIENTS ==========
print()
console.print("[bold cyan]Processing patients...[/bold cyan]")
output = []
+    # Shared state between producer and consumer
+    no_more_patients = threading.Event()
+    stop_fetch = threading.Event()  # signal from the main thread to stop the producer
+    futures = []
+    futures_lock = threading.Lock()
+    output = []
+    ignored_count = 0
+    ignored_count_lock = threading.Lock()
-    with tqdm(total=len(patients), desc="Extracting records",
-              bar_format=custom_bar_format) as pbar:
-        with main_thread_pool as executor:
-            for patient in patients:
-                ctx = {"id": patient["_id"], "name": patient.get("fullName", "Unknown")}
-                futures.append(
-                    executor.submit(run_with_context, process_patient, ctx, patient)
-                )
+    # Progress bars
+    pbar_fetch = tqdm(total=total_count, desc="Fetching patients  ",
+                      bar_format=custom_bar_format, position=0)
+    pbar_process = tqdm(total=total_count, desc="Processing patients",
+                        bar_format=custom_bar_format, position=1)
-            for future in as_completed(futures):
-                try:
-                    result = future.result()
-                    if result is not None:
-                        output.append(result)
-                    pbar.update(1)
-                except Exception as exc:
-                    logging.critical(f"Error in worker: {exc}", exc_info=True)
-                    print(f"\nCRITICAL ERROR: {exc}")
-                    executor.shutdown(wait=False, cancel_futures=True)
-                    raise
+    def submit_eligible_patients(page_patients, executor):
+        """Filter eligible patients, submit them to the executor, update counters."""
+        nonlocal ignored_count
+        eligible = [p for p in page_patients if p.get("isFinishMedicalRecord") is True]
+        skipped = len(page_patients) - len(eligible)
+        if skipped > 0:
+            with ignored_count_lock:
+                ignored_count += skipped
+            pbar_process.total -= skipped
+            pbar_process.refresh()
+        for patient in eligible:
+            ctx = {"id": patient["_id"], "name": patient.get("fullName", "Unknown")}
+            future = executor.submit(run_with_context, process_patient, ctx, patient)
+            with futures_lock:
+                futures.append(future)
+        pbar_fetch.update(len(page_patients))
+    def producer(executor):
+        """Fetch pages sequentially and submit eligible patients to the executor.
+        Checks the stop_fetch event to allow graceful interruption from the main thread."""
+        thread_local_storage.current_patient_context = {"id": "N/A", "name": "page_fetcher"}
+        try:
+            # Process the first page (already fetched)
+            submit_eligible_patients(first_page_patients, executor)
+            # Fetch the remaining pages
+            for page_num in range(2, total_pages + 1):
+                if stop_fetch.is_set():
+                    logging.info(f"Producer stopped at page {page_num}/{total_pages} (CTRL-C).")
+                    break
+                page_result = get_patients_page(professional_id, page=page_num)
+                if page_result is None:
+                    logging.warning(f"Failed to fetch page {page_num}/{total_pages}, skipping")
+                    continue
+                page_patients, _ = page_result  # unpack only after the None check
+                submit_eligible_patients(page_patients, executor)
+        except Exception as exc:
+            logging.error(f"Producer error: {exc}", exc_info=True)
+        finally:
+            no_more_patients.set()
+            pbar_fetch.close()
+    with main_thread_pool as executor:
+        # Start the producer in a separate thread
+        producer_thread = threading.Thread(target=producer, args=(executor,), daemon=True)
+        producer_thread.start()
+        # Consumer loop: collect results as they complete
+        processed_futures = set()
+        interrupted = False
+        while True:
+            try:
+                # Snapshot the futures not yet processed
+                with futures_lock:
+                    current_futures = [f for f in futures if f not in processed_futures]
+                all_done = no_more_patients.is_set() and not current_futures
+                if all_done:
+                    break
+                if not current_futures:
+                    sleep(0.5)
+                    continue
+                # Collect completed futures (the timeout lets the loop re-check for new futures)
+                for future in as_completed(current_futures, timeout=1):
+                    try:
+                        result = future.result()
+                        if result is not None:
+                            output.append(result)
+                        pbar_process.update(1)
+                    except Exception as exc:
+                        logging.critical(f"Error in worker: {exc}", exc_info=True)
+                        print(f"\nCRITICAL ERROR: {exc}")
+                        executor.shutdown(wait=False, cancel_futures=True)
+                        raise
+                    finally:
+                        processed_futures.add(future)
+            except TimeoutError:  # raised by as_completed; catch concurrent.futures.TimeoutError on Python < 3.11
+                pass
+            except KeyboardInterrupt:
+                if not interrupted:
+                    interrupted = True
+                    stop_fetch.set()
+                    console.print("\n[bold yellow]CTRL-C: Stopping fetch. Waiting for pending patients to finish...[/bold yellow]")
+                    # Shrink pbar_process.total to the futures actually submitted;
+                    # ineligible patients were never submitted, so no further subtraction is needed
+                    with futures_lock:
+                        pbar_process.total = len(futures)
+                    pbar_process.refresh()
+        producer_thread.join()
+        pbar_process.close()
+    console.print(f"\n[green]Fetched: {pbar_fetch.n} patients | "
+                  f"Processed: {len(output)} | "
+                  f"Ignored (unfinished): {ignored_count}[/green]")
     # ========== SORT RESULTS ==========
     output.sort(key=lambda x: (
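
To see the whole shape of this hunk in isolation: below is a self-contained sketch of the pattern it implements, with a producer thread feeding a ThreadPoolExecutor, two stacked tqdm bars, and a CTRL-C that stops fetching while already-submitted work drains. All names, sizes, and sleeps are illustrative stand-ins, not the script's own code.

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeout
from time import sleep
from tqdm import tqdm

TOTAL, PAGE = 100, 10

def fetch_page(n):
    """Stand-in for get_patients_page: return one page of fake items."""
    sleep(0.1)
    return list(range(n * PAGE, n * PAGE + PAGE))

def process(item):
    """Stand-in for process_patient."""
    sleep(0.05)
    return item * 2

def main():
    stop_fetch, done_fetching = threading.Event(), threading.Event()
    futures, lock, results = [], threading.Lock(), []
    bar_f = tqdm(total=TOTAL, desc="Fetching  ", position=0)
    bar_p = tqdm(total=TOTAL, desc="Processing", position=1)

    def producer(executor):
        try:
            for n in range(TOTAL // PAGE):
                if stop_fetch.is_set():
                    break
                for item in fetch_page(n):
                    with lock:
                        futures.append(executor.submit(process, item))
                bar_f.update(PAGE)
        finally:
            done_fetching.set()
            bar_f.close()

    with ThreadPoolExecutor(max_workers=4) as executor:
        threading.Thread(target=producer, args=(executor,), daemon=True).start()
        seen = set()
        while True:
            try:
                with lock:
                    pending = [f for f in futures if f not in seen]
                if done_fetching.is_set() and not pending:
                    break
                if not pending:
                    sleep(0.2)
                    continue
                # The 1 s timeout bounces the loop back so it can pick up
                # futures submitted after this snapshot was taken.
                for f in as_completed(pending, timeout=1):
                    seen.add(f)
                    results.append(f.result())
                    bar_p.update(1)
            except FuturesTimeout:
                pass
            except KeyboardInterrupt:
                stop_fetch.set()  # stop fetching; submitted work still drains
                with lock:
                    bar_p.total = len(futures)
                bar_p.refresh()
    bar_p.close()
    print(f"\nprocessed {len(results)} items")

if __name__ == "__main__":
    main()

The timeout on as_completed is the key design choice: without it the consumer would block on a fixed snapshot of futures and never notice the ones the producer submits afterwards.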