""" Endobest Script Template Template for creating scripts to access Endobest clinical research platform data. FEATURES: - Multi-microservice authentication (IAM, RC, GDD) - Thread-safe HTTP client pool - Multithreading with main pool + subtasks pool - Automatic retry with token refresh on 401 - Progress bars and logging - Utilities for JSON navigation HOW TO USE: 1. Configure MICROSERVICES dict (comment unused services) 2. Implement your processing logic in main() function 3. Use API templates as examples for your own endpoints 4. Customize constants as needed (timeouts, threads, etc.) QUICK START: - Run script: python eb_script_template.py - Login with credentials (defaults provided) - Choose number of threads - Processing happens in main() TODO block For detailed documentation, see Script_template_spec.md """ import json import logging import os import sys import threading import traceback from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import timedelta from time import perf_counter, sleep import functools import httpx import questionary from tqdm import tqdm from rich.console import Console # ============================================================================ # CONFIGURATION - CREDENTIALS # ============================================================================ DEFAULT_USER_NAME = "ziwig-invest2@yopmail.com" DEFAULT_PASSWORD = "pbrrA765$bP3beiuyuiyhiuy!agx" REALME = "ziwig-pro" # ============================================================================ # CONFIGURATION - MICROSERVICES # ============================================================================ # Comment out unused microservices to skip their token configuration MICROSERVICES = { "IAM": { "app_id": None, # IAM doesn't use app_id "base_url": "https://api-auth.ziwig-connect.com", "endpoints": { "login": "/api/auth/{REALME}/login", # POST : Body = {"username": "{user_name}", "password": "{pass}"} "refresh": "/api/auth/refreshToken", # POST : Body = 
{"refresh_token": "{refresh_token}"} "get_roles": "/api/profiles/paginate", # POST : Body = {"limit": 100, "currentPage": 1, "sort": [], "filters": {}} "get_user_by_id": "/api/users/find/{user_id}?domaine={REALME}", # GET "get_applications": "/api/applications", # GET "get_profiles_by_app_id": "/api/identity-profiles/paginate", # POST : Body = {"page":null,"limit":100,"search":{},"clientId":"{app_id}","type":"user"} "get_users_by_profile_id": "/api/identity-profiles/{profile_id}/users", # GET } }, "RC": { "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393", "base_url": "https://api-hcp.ziwig-connect.com", "endpoints": { "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} "organizations": "/api/inclusions/getAllOrganizations", # GET "statistics": "/api/inclusions/inclusion-statistics", # POST : Body = {"center": "rc_endobest_current_center}}", "protocolId": "{rc_endobest_prot_id}", "excludedCenters": {rc_endobest_excl_centers}} "search_inclusions": "/api/inclusions/search?limit={limit}&page={page}", # POST : Body = {"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", "center": organization_id, "keywords": ""} "record_by_patient": "/api/records/byPatient", # POST : Body = {"center": "{rc_endobest_current_center}", "patientId": "{patient_id}", "mode": "exchange", "state": "ongoing", "includeEndoParcour": false, "sourceClient": "pro_prm"}, "surveys": "/api/surveys/filter/with-answers", #POST : Body = {"context": "clinic_research", "subject": "{patient_id}", "blockedQcmVersions": {blocked_qcm_versions}} } }, "GDD": { "app_id": "4f5ac063-6a22-4e2c-bda5-b50c0dddab79", "base_url": "https://api-lab.ziwig-connect.com", "endpoints": { "config_token": "/api/auth/config-token", # POST : Body = {"userId": 
"{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} "request_by_tube": "/api/requests/by-tube-id/{tube_id}", # GET } }, "HRD": { "app_id": "93bc44fd-c64b-4fff-a450-f3cba956e934", "base_url": "https://api-resources.ziwig-connect.com", "endpoints": { "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} "pro_by_id": "api/entity-manager/meta/{model}/data/nodes/pro/{pro_id}?relationships=2", # GET "get_pros_by_endobest_center": "/api/entity-manager/meta/modele_fr/data/orga/{organization_id}/centers/pros?limit=1000", # GET } }, } # ============================================================================ # CONFIGURATION - THREADING # ============================================================================ MAX_THREADS = 20 # Maximum threads for main pool SUBTASKS_POOL_SIZE = 40 # Fixed size for subtasks pool # ============================================================================ # CONFIGURATION - RETRY & TIMEOUTS # ============================================================================ ERROR_MAX_RETRY = 10 # Max retry attempts for API calls WAIT_BEFORE_RETRY = 0.5 # Delay in seconds between retries (fixed) API_TIMEOUT = 60 # Default timeout for API calls (seconds) MAX_BATCHS_OF_RETRIES = 3 # Max batches of retries for API calls WAIT_BEFORE_NEW_BATCH_OF_RETRIES = 5 # Delay in seconds between retry batches # ============================================================================ # CONFIGURATION - LOGGING # ============================================================================ 
LOG_LEVEL = logging.INFO  # Change to DEBUG for detailed logs
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# LOG_FILE_NAME auto-generated based on script name in __main__

# ============================================================================
# CONFIGURATION - PROGRESS BARS
# ============================================================================
# Fixed-width fields keep multiple concurrent tqdm bars visually aligned.
BAR_N_FMT_WIDTH = 4
BAR_TOTAL_FMT_WIDTH = 4
BAR_TIME_WIDTH = 8
BAR_RATE_WIDTH = 10
custom_bar_format = ("{l_bar}{bar}"
                     f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
                     f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
                     f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}")

# ============================================================================
# GLOBAL VARIABLES
# ============================================================================
# Tokens storage: {app_name: {"access_token": ..., "refresh_token": ...}}
tokens = {}

# Thread-safe HTTP client pool (one httpx.Client per thread id)
httpx_clients = {}

# Thread management: ordered list of thread ids, used for progress-bar rows
threads_list = []
_threads_list_lock = threading.Lock()
_token_refresh_lock = threading.Lock()

# Thread pools (initialized in main())
main_thread_pool = None
subtasks_thread_pool = None

# Serializes interactive prompts so concurrent workers don't interleave questions
_user_interaction_lock = threading.Lock()

# Thread-local storage for per-worker context (e.g. current patient)
thread_local_storage = threading.local()

# Rich console for formatted output
console = Console()

# ============================================================================
# UTILITIES
# ============================================================================

def get_nested_value(data_structure, path, default=None):
    """
    Extract value from nested dict/list structures with wildcard support.

    Args:
        data_structure: Nested dict/list to navigate
        path: List of keys/indices. Use '*' for list wildcard
        default: Value to return if path not found

    Returns:
        Value at path, or default if not found

    Examples:
        get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1
        get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2]
    """
    # Sentinel string (deliberately NOT `default`): signals "no input data"
    # so the wildcard branch below can filter such results out.
    if data_structure is None:
        return "$$$$ No Data"
    if not path:
        return default

    # Handle wildcard in path: resolve the prefix, fan out over the list,
    # recurse on the suffix for each element.
    if "*" in path:
        wildcard_index = path.index("*")
        path_before = path[:wildcard_index]
        path_after = path[wildcard_index+1:]

        # Helper for non-wildcard path resolution
        def _get_simple_nested_value(ds, p, d):
            cl = ds
            for k in p:
                if isinstance(cl, dict):
                    cl = cl.get(k)
                elif isinstance(cl, list):
                    try:
                        # Bounds check allows negative (from-the-end) indices
                        if isinstance(k, int) and -len(cl) <= k < len(cl):
                            cl = cl[k]
                        else:
                            return d
                    except (IndexError, TypeError):
                        return d
                else:
                    return d
                if cl is None:
                    return d
            return cl

        base_level = _get_simple_nested_value(data_structure, path_before, default)
        # Wildcard only makes sense over a list
        if not isinstance(base_level, list):
            return default

        results = []
        for item in base_level:
            value = get_nested_value(item, path_after, default)
            # Identity check on `default` so a legitimate value equal to the
            # default is still collected; sentinel results are skipped.
            if value is not default and value != "$$$$ No Data":
                results.append(value)

        # Flatten one level for multiple wildcards
        final_results = []
        for res in results:
            if isinstance(res, list):
                final_results.extend(res)
            else:
                final_results.append(res)
        return final_results

    # No wildcard - standard traversal
    current_level = data_structure
    for key_or_index in path:
        if isinstance(current_level, dict):
            current_level = current_level.get(key_or_index)
            if current_level is None:
                return default
        elif isinstance(current_level, list):
            try:
                if isinstance(key_or_index, int) and -len(current_level) <= key_or_index < len(current_level):
                    current_level = current_level[key_or_index]
                else:
                    return default
            except (IndexError, TypeError):
                return default
        else:
            return default
    return current_level
def get_httpx_client() -> httpx.Client:
    """
    Get or create thread-local HTTP client with keep-alive enabled.

    Each thread gets its own client to avoid connection conflicts.

    Returns:
        httpx.Client instance for current thread
    """
    global httpx_clients
    thread_id = threading.get_ident()
    # Each thread only ever writes its own key, so no lock is needed here.
    if thread_id not in httpx_clients:
        httpx_clients[thread_id] = httpx.Client(
            headers={"Connection": "keep-alive"},
            limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
        )
    return httpx_clients[thread_id]


def get_thread_position():
    """
    Get position of current thread in threads list.

    Used for managing progress bar positions in multithreaded environment.

    Returns:
        Zero-based index of current thread
    """
    global threads_list
    thread_id = threading.get_ident()
    with _threads_list_lock:
        if thread_id not in threads_list:
            threads_list.append(thread_id)
            return len(threads_list) - 1
        else:
            return threads_list.index(thread_id)


def clear_httpx_client():
    """
    Clear the thread-local HTTP client to force creation of a new one.

    Useful for resetting connections after errors.
    """
    global httpx_clients
    thread_id = threading.get_ident()
    if thread_id in httpx_clients:
        try:
            httpx_clients[thread_id].close()
        except Exception:
            pass  # best-effort close; the client is discarded either way
        del httpx_clients[thread_id]


def run_with_context(func, context, *args, **kwargs):
    """
    Wrapper to set thread-local context before running a function in a new thread.

    Useful for ThreadPoolExecutor where thread-local context is lost.

    Args:
        func: Callable to execute
        context: Context object stored in thread_local_storage.current_patient_context
        *args, **kwargs: Forwarded to func

    Returns:
        Whatever func returns
    """
    thread_local_storage.current_patient_context = context
    return func(*args, **kwargs)


# ============================================================================
# AUTHENTICATION
# ============================================================================

def login():
    """
    Authenticate with IAM and configure tokens for all microservices.

    Process:
    1. Prompt for credentials (with defaults)
    2. Login to IAM -> get master_token and user_id
    3. For each microservice (except IAM): call config-token API
    4. Store access_token and refresh_token for each service

    Returns:
        "Success": Authentication succeeded for all services
        "Error": Authentication failed (can retry)
        "Exit": User cancelled login
    """
    global tokens

    # Prompt for credentials
    user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask()
    password = questionary.password("password:", default=DEFAULT_PASSWORD).ask()
    if not (user_name and password):
        return "Exit"

    # Step 1: Login to IAM
    try:
        client = get_httpx_client()
        client.base_url = MICROSERVICES["IAM"]["base_url"]
        response = client.post(
            # Only {REALME} appears in the login endpoint template
            MICROSERVICES["IAM"]["endpoints"]["login"].format(REALME=REALME),
            json={"username": user_name, "password": password},
            timeout=20
        )
        response.raise_for_status()
        payload = response.json()
        master_token = payload["access_token"]
        user_id = payload["userId"]
        tokens["IAM"] = {
            "access_token": master_token,
            "refresh_token": payload["refresh_token"]
        }
    except (httpx.RequestError, httpx.HTTPStatusError) as exc:
        print(f"Login Error: {exc}")
        logging.warning(f"Login Error: {exc}")
        return "Error"

    # Step 2: Exchange the master token for per-microservice tokens
    for app_name, app_config in MICROSERVICES.items():
        if app_name == "IAM":
            continue  # IAM doesn't need config-token

        try:
            client = get_httpx_client()
            client.base_url = app_config["base_url"]
            response = client.post(
                app_config["endpoints"]["config_token"],
                headers={"Authorization": f"Bearer {master_token}"},
                json={
                    "userId": user_id,
                    "clientId": app_config["app_id"],
                    "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
                },
                timeout=20
            )
            response.raise_for_status()
            tokens[app_name] = {
                "access_token": response.json()["access_token"],
                "refresh_token": response.json()["refresh_token"]
            }
        except (httpx.RequestError, httpx.HTTPStatusError) as exc:
            print(f"Config-token Error for {app_name}: {exc}")
            logging.warning(f"Config-token Error for {app_name}: {exc}")
            return "Error"

    print("\nLogin Success")
    return "Success"


def new_token(app):
    """
    Refresh access token for a specific microservice.

    Uses refresh_token to obtain new access_token and refresh_token.
    Thread-safe with lock to prevent concurrent refresh attempts.

    Args:
        app: Microservice name (e.g., "RC", "GDD")

    Raises:
        httpx.RequestError: If refresh fails after all retries
    """
    global tokens
    with _token_refresh_lock:
        for attempt in range(ERROR_MAX_RETRY):
            try:
                client = get_httpx_client()
                client.base_url = MICROSERVICES[app]["base_url"]
                response = client.post(
                    MICROSERVICES[app]["endpoints"]["refresh"],
                    headers={"Authorization": f"Bearer {tokens[app]['access_token']}"},
                    json={"refresh_token": tokens[app]["refresh_token"]},
                    timeout=20
                )
                response.raise_for_status()
                tokens[app]["access_token"] = response.json()["access_token"]
                tokens[app]["refresh_token"] = response.json()["refresh_token"]
                return
            except (httpx.RequestError, httpx.HTTPStatusError) as exc:
                logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}")
                if attempt < ERROR_MAX_RETRY - 1:
                    sleep(WAIT_BEFORE_RETRY)

    logging.critical(f"Persistent error in refresh_token for {app}")
    raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}")


# ============================================================================
# DECORATORS
# ============================================================================

def api_call_with_retry(app):
    """
    Decorator factory for API calls with automatic retry and token refresh on 401.

    FIX: this was previously a plain decorator (``def api_call_with_retry(func)``)
    while every call site uses it parameterized (``@api_call_with_retry("RC")``),
    which made decoration raise TypeError; it also called ``new_token()`` without
    the required ``app`` argument. It is now a decorator factory bound to ``app``.

    Args:
        app: Microservice name whose token is refreshed on 401 (e.g., "RC")

    Behavior:
        Retries up to ERROR_MAX_RETRY times per batch, runs up to
        MAX_BATCHS_OF_RETRIES automatic batches, then interactively asks the
        user to retry again, ignore (return None) or stop the script.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            func_name = func.__name__
            total_attempts = 0
            batch_count = 1
            while True:
                for attempt in range(ERROR_MAX_RETRY):
                    total_attempts += 1
                    try:
                        return func(*args, **kwargs)
                    except (httpx.RequestError, httpx.HTTPStatusError) as exc:
                        logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}")
                        # Refresh the thread-local client if an error occurs
                        # to avoid potential pool corruption or stale connections
                        clear_httpx_client()
                        if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
                            logging.info(f"Token expired for {func_name}. Refreshing token.")
                            new_token(app)
                        if attempt < ERROR_MAX_RETRY - 1:
                            sleep(WAIT_BEFORE_RETRY)
                        else:
                            # Max retries reached for this batch
                            if batch_count < MAX_BATCHS_OF_RETRIES:
                                logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. "
                                                f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.")
                                batch_count += 1
                                sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
                                break  # Exit for loop to restart batch in while True
                            else:
                                # All automatic batches exhausted, ask the user
                                with _user_interaction_lock:
                                    console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
                                    console.print(f"[red]Exception: {exc}[/red]")
                                    choice = questionary.select(
                                        f"What would you like to do for {func_name}?",
                                        choices=[
                                            "Retry (try another batch of retries)",
                                            "Ignore (return None and continue)",
                                            "Stop script (critical error)"
                                        ]
                                    ).ask()
                                if choice == "Retry (try another batch of retries)":
                                    logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
                                    batch_count = 1  # Reset batch counter for the next interactive round
                                    break  # Exit for loop to restart batch in while True
                                elif choice == "Ignore (return None and continue)":
                                    # Retrieve context if available
                                    ctx = getattr(thread_local_storage, "current_patient_context",
                                                  {"id": "Unknown", "pseudo": "Unknown"})
                                    logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
                                    return None
                                else:
                                    logging.critical(f"User chose to stop script after persistent error in {func_name}.")
                                    raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
        return wrapper
    return decorator
Error: {exc}") return None else: logging.critical(f"User chose to stop script after persistent error in {func_name}.") raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)") return wrapper # ============================================================================ # API TEMPLATES # ============================================================================ # Templates for common API patterns. Duplicate and modify for your needs. # Remember to: # - Choose appropriate HTTP method (GET/POST/PUT/DELETE) # - Update endpoint from MICROSERVICES dict # - Adjust timeout if needed (global API_TIMEOUT or specific value) # - Always return response.json() @api_call_with_retry("RC") def get_all_organizations(): """ Example API call using GET method. Returns: List of organization dictionaries """ client = get_httpx_client() client.base_url = MICROSERVICES["RC"]["base_url"].format(**{**globals(),**locals()}) response = client.get( MICROSERVICES["RC"]["endpoints"]["organizations"], headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"}, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() @api_call_with_retry("RC") def search_inclusions(organization_id, limit, page): """ Example API call using POST method with query params and JSON body. 
Args: organization_id: Organization UUID limit: Max results per page page: Page number (1-based) Returns: Dict with "data" key containing list of inclusions """ client = get_httpx_client() client.base_url = MICROSERVICES["RC"]["base_url"].format(**{**globals(),**locals()}) response = client.post( f"{MICROSERVICES['RC']['endpoints']['search_inclusions']}?limit={limit}&page={page}", headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"}, json={ "protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", # TODO: Configure if needed "center": organization_id, "keywords": "" }, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() # ============================================================================ # MAIN PROCESSING # ============================================================================ def main(): """ Main processing function. Structure: 1. Authentication 2. Configuration (thread count) 3. Initialization (thread pools, timing) 4. Main processing block (TODO: implement your logic here) 5. Finalization (elapsed time) """ global main_thread_pool, subtasks_thread_pool, thread_local_storage # ========== AUTHENTICATION ========== print() login_status = login() while login_status == "Error": login_status = login() if login_status == "Exit": return # ========== CONFIGURATION ========== print() number_of_threads = int( questionary.text( "Number of threads:", default="12", validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS ).ask() ) # ========== INITIALIZATION ========== start_time = perf_counter() # Initialize thread pools main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads) subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE) # ========== MAIN PROCESSING BLOCK ========== print() console.print("[bold cyan]Starting main processing...[/bold cyan]") # TODO: IMPLEMENT YOUR PROCESSING LOGIC HERE # # Example pattern with progress bar and multithreading: # # items = [...] 
# Your data to process # futures = [] # # with tqdm(total=len(items), desc="Processing items", # bar_format=custom_bar_format) as pbar: # with main_thread_pool as executor: # # for item in items: # # # Set thread-local context for detailed error logging in decorators # ctx = {"id": patient_id, "pseudo": pseudo} # thread_local_storage.current_patient_context = ctx # # futures.append(executor.submit(run_with_context, process_item, ctx, item)) # # for future in as_completed(futures): # try: # result = future.result() # # Process result here # pbar.update(1) # except Exception as exc: # logging.critical(f"Error in worker: {exc}", exc_info=True) # print(f"\nCRITICAL ERROR: {exc}") # executor.shutdown(wait=False, cancel_futures=True) # raise # # Example: Simple test to verify authentication works # organizations = get_all_organizations() # console.print(f"[green]Retrieved {len(organizations)} organizations[/green]") # ========== FINALIZATION ========== print() print(f"Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}") # ============================================================================ # ENTRY POINT # ============================================================================ if __name__ == '__main__': # ========== LOGGING CONFIGURATION ========== # Auto-generate log filename based on script name script_name = os.path.splitext(os.path.basename(__file__))[0] log_file_name = f"{script_name}.log" logging.basicConfig( level=LOG_LEVEL, format=LOG_FORMAT, filename=log_file_name, filemode='w' ) # ========== MAIN EXECUTION ========== try: main() except Exception as e: logging.critical(f"Script terminated with exception: {e}", exc_info=True) print(f"\nScript stopped due to error: {e}") print(traceback.format_exc()) finally: # ========== CLEANUP ========== # Shutdown thread pools gracefully if 'main_thread_pool' in globals() and main_thread_pool: main_thread_pool.shutdown(wait=False, cancel_futures=True) if 'subtasks_thread_pool' in globals() and 
subtasks_thread_pool: subtasks_thread_pool.shutdown(wait=False, cancel_futures=True) # Pause before exit (prevents console from closing immediately when launched from Windows Explorer) print('\n') input("Press Enter to exit...")