From e9dc8015deea290ec7fe2f69c8fce579b541ff06 Mon Sep 17 00:00:00 2001
From: Abdelkouddous LHACHIMI
Date: Thu, 12 Feb 2026 18:28:20 +0100
Subject: [PATCH] Advanced Retry Management

---
 eb_script_template.py | 136 ++++++++++++++++++++++++++++++------------
 1 file changed, 97 insertions(+), 39 deletions(-)

diff --git a/eb_script_template.py b/eb_script_template.py
index 05786bb..c4af497 100644
--- a/eb_script_template.py
+++ b/eb_script_template.py
@@ -121,6 +121,8 @@ SUBTASKS_POOL_SIZE = 40        # Fixed size for subtasks pool
 ERROR_MAX_RETRY = 10           # Max retry attempts for API calls
 WAIT_BEFORE_RETRY = 0.5        # Delay in seconds between retries (fixed)
 API_TIMEOUT = 60               # Default timeout for API calls (seconds)
+MAX_BATCHS_OF_RETRIES = 3      # Max batches of retries for API calls
+WAIT_BEFORE_NEW_BATCH_OF_RETRIES = 5   # Delay in seconds between retry batches
 
 
 # ============================================================================
@@ -166,6 +168,12 @@ _token_refresh_lock = threading.Lock()
 main_thread_pool = None
 subtasks_thread_pool = None
 
+# User interaction lock
+_user_interaction_lock = threading.Lock()
+
+# Thread-local storage for context
+thread_local_storage = threading.local()
+
 # Rich console for formatted output
 console = Console()
 
@@ -298,6 +306,30 @@ def get_thread_position():
     return threads_list.index(thread_id)
 
 
+def clear_httpx_client():
+    """
+    Clear the thread-local HTTP client to force creation of a new one.
+    Useful for resetting connections after errors.
+    """
+    global httpx_clients
+    thread_id = threading.get_ident()
+    if thread_id in httpx_clients:
+        try:
+            httpx_clients[thread_id].close()
+        except Exception:
+            pass
+        del httpx_clients[thread_id]
+
+
+def run_with_context(func, context, *args, **kwargs):
+    """
+    Wrapper to set thread-local context before running a function in a new thread.
+    Useful for ThreadPoolExecutor where context is lost.
+    """
+    thread_local_storage.current_patient_context = context
+    return func(*args, **kwargs)
+
+
 # ============================================================================
 # AUTHENTICATION
 # ============================================================================
@@ -422,51 +454,69 @@ def new_token(app):
 # DECORATORS
 # ============================================================================
 
-def api_call_with_retry(app):
-    """
-    Decorator for API calls with automatic retry and token refresh on 401.
-
-    Features:
-    - Retries on network errors (httpx.RequestError)
-    - Retries on HTTP errors (httpx.HTTPStatusError)
-    - Automatically refreshes token on 401 Unauthorized
-    - Configurable retry count and delay
-
-    Args:
-        app: Microservice name for token refresh (e.g., "RC", "GDD")
-
-    Usage:
-        @api_call_with_retry("RC")
-        def get_organizations():
-            # API call implementation
-            ...
-
-    Raises:
-        httpx.RequestError: If all retries exhausted
-    """
-    def decorator(func):
-        @functools.wraps(func)
-        def wrapper(*args, **kwargs):
-            func_name = func.__name__
+def api_call_with_retry(func):
+    """Decorator for API calls with automatic retry and token refresh on 401 errors"""
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        func_name = func.__name__
+        total_attempts = 0
+        batch_count = 1
+
+        while True:
             for attempt in range(ERROR_MAX_RETRY):
+                total_attempts += 1
                 try:
                     return func(*args, **kwargs)
                 except (httpx.RequestError, httpx.HTTPStatusError) as exc:
-                    logging.warning(f"Error in {func_name} (Attempt {attempt + 1}/{ERROR_MAX_RETRY}): {exc}")
-
-                    # Auto-refresh token on 401
+                    logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}")
+
+                    # Refresh the thread-local client if an error occurs
+                    # to avoid potential pool corruption or stale connections
+                    clear_httpx_client()
+
                     if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
-                        logging.info(f"Token expired for {func_name}. Refreshing token for {app}.")
-                        new_token(app)
-
+                        logging.info(f"Token expired for {func_name}. Refreshing token.")
+                        new_token()
+
                     if attempt < ERROR_MAX_RETRY - 1:
                         sleep(WAIT_BEFORE_RETRY)
+                    else:
+                        # Max retries reached for this batch
+                        if batch_count < MAX_BATCHS_OF_RETRIES:
+                            logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. "
+                                            f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.")
+                            batch_count += 1
+                            sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
+                            break  # Exit for loop to restart batch in while True
+                        else:
+                            # All automatic batches exhausted, ask the user
+                            with _user_interaction_lock:
+                                console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
+                                console.print(f"[red]Exception: {exc}[/red]")
+
+                                choice = questionary.select(
+                                    f"What would you like to do for {func_name}?",
+                                    choices=[
+                                        "Retry (try another batch of retries)",
+                                        "Ignore (return None and continue)",
+                                        "Stop script (critical error)"
+                                    ]
+                                ).ask()
+
+                                if choice == "Retry (try another batch of retries)":
+                                    logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
+                                    batch_count = 1  # Reset batch counter for the next interactive round
+                                    break  # Exit for loop to restart batch in while True
+                                elif choice == "Ignore (return None and continue)":
+                                    # Retrieve context if available
+                                    ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
+                                    logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
+                                    return None
+                                else:
+                                    logging.critical(f"User chose to stop script after persistent error in {func_name}.")
+                                    raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
 
-            logging.critical(f"Persistent error in {func_name} after {ERROR_MAX_RETRY} attempts.")
-            raise httpx.RequestError(message=f"Persistent error in {func_name}")
-
-        return wrapper
-    return decorator
+    return wrapper
 
 
 # ============================================================================
@@ -542,7 +592,7 @@ def main():
     4. Main processing block (TODO: implement your logic here)
     5. Finalization (elapsed time)
     """
-    global main_thread_pool, subtasks_thread_pool
+    global main_thread_pool, subtasks_thread_pool, thread_local_storage
 
     # ========== AUTHENTICATION ==========
     print()
@@ -578,11 +628,19 @@ def main():
     # Example pattern with progress bar and multithreading:
     #
     # items = [...]  # Your data to process
+    # futures = []
     #
     # with tqdm(total=len(items), desc="Processing items",
     #           bar_format=custom_bar_format) as pbar:
     #     with main_thread_pool as executor:
-    #         futures = [executor.submit(process_item, item) for item in items]
+    #
+    #         for item in items:
+    #
+    #             # Set thread-local context for detailed error logging in decorators
+    #             ctx = {"id": patient_id, "pseudo": pseudo}
+    #             thread_local_storage.current_patient_context = ctx
+    #
+    #             futures.append(executor.submit(run_with_context, process_item, ctx, item))
     #
     #         for future in as_completed(futures):
     #             try:
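
For reference, a minimal usage sketch of how the reworked decorator and the thread-local context helpers introduced by this patch fit together. It is illustrative only and assumes it sits in eb_script_template.py next to the code added above; get_patient_record, process_patient, process_all, the placeholder URL, the worker count, and the shape of the patients list (beyond the "id"/"pseudo" keys the decorator reads) are hypothetical, while api_call_with_retry, run_with_context, thread_local_storage, and API_TIMEOUT come from the patch.

# --- Usage sketch (illustrative, not part of the patch) ---
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

import httpx
from tqdm import tqdm


@api_call_with_retry
def get_patient_record(patient_id):
    # Hypothetical API call; raise_for_status() surfaces HTTP errors (including
    # 401) to the decorator, which retries in batches, refreshes the token and,
    # if everything fails, shows the interactive Retry / Ignore / Stop prompt.
    with httpx.Client(timeout=API_TIMEOUT) as client:
        response = client.get(f"https://example.invalid/patients/{patient_id}")
        response.raise_for_status()
        return response.json()


def process_patient(patient):
    # Returns None for this patient if the user picked "Ignore" in the prompt.
    return get_patient_record(patient["id"])


def process_all(patients):
    results = []
    futures = []
    with tqdm(total=len(patients), desc="Processing patients") as pbar:
        # The real template reuses the global main_thread_pool; a local pool
        # keeps this sketch self-contained.
        with ThreadPoolExecutor(max_workers=8) as executor:
            for patient in patients:
                # The context dict travels with the task via run_with_context, so the
                # decorator can log which patient was skipped when "Ignore" is chosen.
                ctx = {"id": patient["id"], "pseudo": patient["pseudo"]}
                futures.append(
                    executor.submit(run_with_context, process_patient, ctx, patient)
                )
            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except httpx.RequestError as exc:
                    # Propagated when the user picks "Stop script (critical error)".
                    logging.critical(f"Stopping run: {exc}")
                    raise
                finally:
                    pbar.update(1)
    return results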