From 3380b73ec774ded89a8a6a8419251a67fe1b9b01 Mon Sep 17 00:00:00 2001
From: Abdelkouddous LHACHIMI
Date: Fri, 13 Feb 2026 02:53:46 +0100
Subject: [PATCH] Extend parallelism with two progress bars and interruption
 handling

---
 extract_endoconnect_medical_records.py | 169 +++++++++++++++++++------
 1 file changed, 127 insertions(+), 42 deletions(-)

diff --git a/extract_endoconnect_medical_records.py b/extract_endoconnect_medical_records.py
index bba8ed3..c55ac15 100644
--- a/extract_endoconnect_medical_records.py
+++ b/extract_endoconnect_medical_records.py
@@ -68,7 +68,7 @@ DEFAULT_PROFESSIONAL_ID = "99990000005"
 # CONFIGURATION - PAGINATION
 # ============================================================================
 
-MAX_PAGE_SIZE = 1000
+PAGE_SIZE = 10
 
 
 # ============================================================================
@@ -552,15 +552,16 @@ def login():
 # ============================================================================
 
 @api_call_with_retry
-def get_patients(professional_id):
-    """Get all patients for a given professional ID (RPPS)."""
+def get_patients_page(professional_id, page=1):
+    """Get a page of patients for a given professional ID (RPPS).
+    Returns (patients_list, total_count)."""
     client = get_httpx_client()
     response = client.post(
         f"{ENDOCONNECT_BASE_URL}{PATIENTS_LIST_PATH}",
         headers={"Authorization": f"Bearer {token}"},
         json={
-            "page": 1,
-            "pageSize": MAX_PAGE_SIZE,
+            "page": page,
+            "pageSize": PAGE_SIZE,
             "RPPS": professional_id,
             "search": ""
         },
@@ -568,7 +569,7 @@
     )
     response.raise_for_status()
     data = response.json()
-    return data.get("patients", [])
+    return data.get("patients", []), data.get("count", 0)
 
 
 @api_call_with_retry
@@ -832,52 +833,136 @@ def main():
     start_time = perf_counter()
     main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads)
 
-    # ========== FETCH PATIENTS ==========
+    # ========== FETCH & PROCESS PATIENTS (paginated) ==========
     print()
-    console.print("[bold cyan]Fetching patients list...[/bold cyan]")
-    all_patients = get_patients(professional_id)
+    console.print("[bold cyan]Fetching and processing patients...[/bold cyan]")
 
-    if all_patients is None:
+    # Fetch first page to get total count
+    thread_local_storage.current_patient_context = {"id": "N/A", "name": "page_fetcher"}
+    first_page_patients, total_count = get_patients_page(professional_id, page=1)
+    if first_page_patients is None:
         logging.critical(f"Failed to fetch patients list for professional_id={professional_id}. Aborting.")
         console.print("[bold red]Failed to fetch patients list. Aborting.[/bold red]")
         return
 
-    # Filter: keep only patients with finished medical record
-    patients = [p for p in all_patients if p.get("isFinishMedicalRecord") is True]
+    total_pages = (total_count + PAGE_SIZE - 1) // PAGE_SIZE
+    console.print(f"[green]Total patients reported by API: {total_count} (across {total_pages} pages)[/green]")
 
-    console.print(f"[green]Total patients: {len(all_patients)} | With finished medical record: {len(patients)}[/green]")
-
-    if not patients:
-        console.print("[yellow]No patients with finished medical records found. Nothing to process.[/yellow]")
-        return
-
-    # ========== PROCESS PATIENTS ==========
-    print()
-    console.print("[bold cyan]Processing patients...[/bold cyan]")
-
-    output = []
+    # Shared state between producer and consumer
+    no_more_patients = threading.Event()
+    stop_fetch = threading.Event()  # Signal from main thread to stop producer
     futures = []
+    futures_lock = threading.Lock()
+    output = []
+    ignored_count = 0
+    ignored_count_lock = threading.Lock()
 
-    with tqdm(total=len(patients), desc="Extracting records",
-              bar_format=custom_bar_format) as pbar:
-        with main_thread_pool as executor:
-            for patient in patients:
-                ctx = {"id": patient["_id"], "name": patient.get("fullName", "Unknown")}
-                futures.append(
-                    executor.submit(run_with_context, process_patient, ctx, patient)
-                )
+    # Progress bars
+    pbar_fetch = tqdm(total=total_count, desc="Fetching patients  ",
+                      bar_format=custom_bar_format, position=0)
+    pbar_process = tqdm(total=total_count, desc="Processing patients",
+                        bar_format=custom_bar_format, position=1)
 
-            for future in as_completed(futures):
-                try:
-                    result = future.result()
-                    if result is not None:
-                        output.append(result)
-                    pbar.update(1)
-                except Exception as exc:
-                    logging.critical(f"Error in worker: {exc}", exc_info=True)
-                    print(f"\nCRITICAL ERROR: {exc}")
-                    executor.shutdown(wait=False, cancel_futures=True)
-                    raise
+    def submit_eligible_patients(page_patients, executor):
+        """Filter eligible patients, submit them to the executor, update counters."""
+        nonlocal ignored_count
+        eligible = [p for p in page_patients if p.get("isFinishMedicalRecord") is True]
+        skipped = len(page_patients) - len(eligible)
+
+        if skipped > 0:
+            with ignored_count_lock:
+                ignored_count += skipped
+                pbar_process.total -= skipped
+                pbar_process.refresh()
+
+        for patient in eligible:
+            ctx = {"id": patient["_id"], "name": patient.get("fullName", "Unknown")}
+            future = executor.submit(run_with_context, process_patient, ctx, patient)
+            with futures_lock:
+                futures.append(future)
+
+        pbar_fetch.update(len(page_patients))
+
+    def producer(executor):
+        """Fetch pages sequentially, submit eligible patients to the executor.
+        Checks the stop_fetch event to allow graceful interruption from the main thread."""
+        thread_local_storage.current_patient_context = {"id": "N/A", "name": "page_fetcher"}
+        try:
+            # Process first page (already fetched)
+            submit_eligible_patients(first_page_patients, executor)
+
+            # Fetch remaining pages
+            for page_num in range(2, total_pages + 1):
+                if stop_fetch.is_set():
+                    logging.info(f"Producer stopped at page {page_num}/{total_pages} (CTRL-C).")
+                    break
+                page_patients, _ = get_patients_page(professional_id, page=page_num)
+                if page_patients is None:
+                    logging.warning(f"Failed to fetch page {page_num}/{total_pages}, skipping")
+                    continue
+                submit_eligible_patients(page_patients, executor)
+        except Exception as exc:
+            logging.error(f"Producer error: {exc}", exc_info=True)
+        finally:
+            no_more_patients.set()
+            pbar_fetch.close()
+
+    with main_thread_pool as executor:
+        # Start producer in a separate thread
+        producer_thread = threading.Thread(target=producer, args=(executor,), daemon=True)
+        producer_thread.start()
+
+        # Consumer loop: collect results as they complete
+        processed_futures = set()
+        interrupted = False
+        while True:
+            try:
+                # Snapshot current futures not yet processed
+                with futures_lock:
+                    current_futures = [f for f in futures if f not in processed_futures]
+                    all_done = no_more_patients.is_set() and not current_futures
+
+                if all_done:
+                    break
+
+                if not current_futures:
+                    sleep(0.5)
+                    continue
+
+                # Collect completed futures (timeout lets the loop re-check for new futures)
+                for future in as_completed(current_futures, timeout=1):
+                    try:
+                        result = future.result()
+                        if result is not None:
+                            output.append(result)
+                        pbar_process.update(1)
+                    except Exception as exc:
+                        logging.critical(f"Error in worker: {exc}", exc_info=True)
+                        print(f"\nCRITICAL ERROR: {exc}")
+                        executor.shutdown(wait=False, cancel_futures=True)
+                        raise
+                    finally:
+                        processed_futures.add(future)
+
+            except TimeoutError:  # alias of concurrent.futures.TimeoutError on Python 3.11+
+                pass  # as_completed timed out; loop around to pick up newly submitted futures
+            except KeyboardInterrupt:
+                if not interrupted:
+                    interrupted = True
+                    stop_fetch.set()
+                    console.print("\n[bold yellow]CTRL-C: Stopping fetch. Waiting for pending patients to finish...[/bold yellow]")
+                    # Shrink pbar_process total to the submitted futures only
+                    # (ignored patients were never submitted, so do not subtract them again)
+                    with futures_lock:
+                        pbar_process.total = len(futures)
+                        pbar_process.refresh()
+
+    producer_thread.join()
+
+    pbar_process.close()
+
+    console.print(f"\n[green]Fetched: {pbar_fetch.n} patients | "
+                  f"Processed: {len(output)} | "
+                  f"Ignored (unfinished): {ignored_count}[/green]")
 
     # ========== SORT RESULTS ==========
     output.sort(key=lambda x: (
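
Reviewer note (not part of the patch): the main() hunk implements a
producer/consumer pattern in which a producer thread paginates the API and
feeds a ThreadPoolExecutor, while the main thread repeatedly polls
as_completed() with a short timeout so it can pick up futures submitted after
the poll began. The sketch below shows that shape in isolation, under these
assumptions: fetch_page() and handle() are hypothetical stand-ins for
get_patients_page() and process_patient(), and the constants are made up.

    import threading
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from concurrent.futures import TimeoutError as FuturesTimeoutError
    from time import sleep

    PAGE_SIZE, TOTAL = 10, 35  # made-up pagination constants

    def fetch_page(page):
        """Hypothetical paginated fetch: returns up to PAGE_SIZE items."""
        start = (page - 1) * PAGE_SIZE
        return list(range(start, min(start + PAGE_SIZE, TOTAL)))

    def handle(item):
        """Hypothetical per-item work."""
        sleep(0.01)
        return item * 2

    futures, futures_lock = [], threading.Lock()
    no_more_items = threading.Event()

    def producer(executor):
        # Fetch pages sequentially; submit each item to the pool as it arrives.
        pages = (TOTAL + PAGE_SIZE - 1) // PAGE_SIZE
        try:
            for page in range(1, pages + 1):
                for item in fetch_page(page):
                    with futures_lock:
                        futures.append(executor.submit(handle, item))
        finally:
            no_more_items.set()  # tell the consumer no new futures will appear

    results, processed = [], set()
    with ThreadPoolExecutor(max_workers=4) as executor:
        threading.Thread(target=producer, args=(executor,), daemon=True).start()
        while True:
            with futures_lock:  # snapshot the futures not yet collected
                pending = [f for f in futures if f not in processed]
            if no_more_items.is_set() and not pending:
                break
            if not pending:
                sleep(0.1)
                continue
            try:
                # Short timeout so the loop re-snapshots and sees new futures.
                for f in as_completed(pending, timeout=1):
                    results.append(f.result())
                    processed.add(f)
            except FuturesTimeoutError:
                pass

    assert len(results) == TOTAL

The short as_completed() timeout is the load-bearing choice here: without it
the consumer would block on a fixed snapshot of futures and never notice work
the producer submitted after the snapshot was taken.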