commit f513ce798ee82d78e1d5c7799b9c383250d14281 Author: Abdelkouddous LHACHIMI Date: Thu Feb 12 18:34:46 2026 +0100 Initial diff --git a/eb_script_template.bat b/eb_script_template.bat new file mode 100644 index 0000000..eb36bb4 --- /dev/null +++ b/eb_script_template.bat @@ -0,0 +1,4 @@ +@echo off +call C:\PythonProjects\.rcvenv\Scripts\activate.bat +python eb_script_template.py %* + diff --git a/eb_script_template.py b/eb_script_template.py new file mode 100644 index 0000000..c4af497 --- /dev/null +++ b/eb_script_template.py @@ -0,0 +1,699 @@ +""" +Endobest Script Template + +Template for creating scripts to access Endobest clinical research platform data. + +FEATURES: +- Multi-microservice authentication (IAM, RC, GDD) +- Thread-safe HTTP client pool +- Multithreading with main pool + subtasks pool +- Automatic retry with token refresh on 401 +- Progress bars and logging +- Utilities for JSON navigation + +HOW TO USE: +1. Configure MICROSERVICES dict (comment unused services) +2. Implement your processing logic in main() function +3. Use API templates as examples for your own endpoints +4. Customize constants as needed (timeouts, threads, etc.) 
# ============================================================================
# CONFIGURATION - CREDENTIALS
# ============================================================================
# SECURITY: no real password may live in source control. The defaults offered
# by the login prompt are read from the environment instead:
#   ENDOBEST_USER_NAME  (optional) pre-filled login
#   ENDOBEST_PASSWORD   (optional) pre-filled password; empty when unset
# NOTE(review): a password was previously committed in this file - it should
# be considered leaked and rotated.

DEFAULT_USER_NAME = os.environ.get("ENDOBEST_USER_NAME", "ziwig-invest2@yopmail.com")
DEFAULT_PASSWORD = os.environ.get("ENDOBEST_PASSWORD", "")
REALME = "ziwig-pro"  # Realm name substituted into the IAM endpoints


# ============================================================================
# CONFIGURATION - MICROSERVICES
# ============================================================================
# Comment out unused microservices to skip their token configuration.
# Every endpoint is a path starting with "/" that is resolved against the
# service "base_url". Placeholders in braces are filled with str.format().

MICROSERVICES = {
    "IAM": {
        "app_id": None,  # IAM doesn't use app_id
        "base_url": "https://api-auth.ziwig-connect.com",
        "endpoints": {
            "login": "/api/auth/{REALME}/login",  # POST : Body = {"username": "{user_name}", "password": "{pass}"}
            "refresh": "/api/auth/refreshToken",  # POST : Body = {"refresh_token": "{refresh_token}"}
            "get_roles": "/api/profiles/paginate",  # POST : Body = {"limit": 100, "currentPage": 1, "sort": [], "filters": {}}
            "get_user_by_id": "/api/users/find/{user_id}?domaine={REALME}",  # GET
            "get_applications": "/api/applications",  # GET
            "get_profiles_by_app_id": "/api/identity-profiles/paginate",  # POST : Body = {"page": null, "limit": 100, "search": {}, "clientId": "{app_id}", "type": "user"}
            "get_users_by_profile_id": "/api/identity-profiles/{profile_id}/users",  # GET
        },
    },
    "RC": {
        "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393",
        "base_url": "https://api-hcp.ziwig-connect.com",
        "endpoints": {
            "config_token": "/api/auth/config-token",  # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "<browser user agent>"}
            "refresh": "/api/auth/refreshToken",  # POST : Body = {"refresh_token": "{refresh_token}"}
            "organizations": "/api/inclusions/getAllOrganizations",  # GET
            "statistics": "/api/inclusions/inclusion-statistics",  # POST : Body = {"center": "{rc_endobest_current_center}", "protocolId": "{rc_endobest_prot_id}", "excludedCenters": {rc_endobest_excl_centers}}
            "search_inclusions": "/api/inclusions/search?limit={limit}&page={page}",  # POST : Body = {"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", "center": organization_id, "keywords": ""}
            "record_by_patient": "/api/records/byPatient",  # POST : Body = {"center": "{rc_endobest_current_center}", "patientId": "{patient_id}", "mode": "exchange", "state": "ongoing", "includeEndoParcour": false, "sourceClient": "pro_prm"}
            "surveys": "/api/surveys/filter/with-answers",  # POST : Body = {"context": "clinic_research", "subject": "{patient_id}", "blockedQcmVersions": {blocked_qcm_versions}}
        },
    },
    "GDD": {
        "app_id": "4f5ac063-6a22-4e2c-bda5-b50c0dddab79",
        "base_url": "https://api-lab.ziwig-connect.com",
        "endpoints": {
            "config_token": "/api/auth/config-token",  # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "<browser user agent>"}
            "refresh": "/api/auth/refreshToken",  # POST : Body = {"refresh_token": "{refresh_token}"}
            "request_by_tube": "/api/requests/by-tube-id/{tube_id}",  # GET
        },
    },
    "HRD": {
        "app_id": "93bc44fd-c64b-4fff-a450-f3cba956e934",
        "base_url": "https://api-resources.ziwig-connect.com",
        "endpoints": {
            "config_token": "/api/auth/config-token",  # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "<browser user agent>"}
            "refresh": "/api/auth/refreshToken",  # POST : Body = {"refresh_token": "{refresh_token}"}
            # Leading "/" added so this path follows the same convention as every other endpoint.
            "pro_by_id": "/api/entity-manager/meta/{model}/data/nodes/pro/{pro_id}?relationships=2",  # GET
            "get_pros_by_endobest_center": "/api/entity-manager/meta/modele_fr/data/orga/{organization_id}/centers/pros?limit=1000",  # GET
        },
    },
}


# ============================================================================
# CONFIGURATION - THREADING
# ============================================================================

MAX_THREADS = 20  # Maximum threads for main pool
SUBTASKS_POOL_SIZE = 40  # Fixed size for subtasks pool


# ============================================================================
# CONFIGURATION - RETRY & TIMEOUTS
# ============================================================================

ERROR_MAX_RETRY = 10  # Max retry attempts for API calls (per batch)
WAIT_BEFORE_RETRY = 0.5  # Delay in seconds between retries (fixed)
API_TIMEOUT = 60  # Default timeout for API calls (seconds)
MAX_BATCHS_OF_RETRIES = 3  # Max batches of retries for API calls
WAIT_BEFORE_NEW_BATCH_OF_RETRIES = 5  # Delay in seconds between retry batches


# ============================================================================
# CONFIGURATION - LOGGING
# ============================================================================

LOG_LEVEL = logging.INFO  # Change to DEBUG for detailed logs
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# LOG_FILE_NAME auto-generated based on script name in __main__


# ============================================================================
# CONFIGURATION - PROGRESS BARS
# ============================================================================

BAR_N_FMT_WIDTH = 4
# Sentinel returned when the structure to navigate is itself None.
# Kept byte-for-byte for backward compatibility with existing callers.
_NO_DATA = "$$$$ No Data"


def _walk_plain_path(node, keys, default):
    """Follow a wildcard-free path of dict keys / list indices.

    Returns ``default`` as soon as a step cannot be resolved: missing key,
    non-integer or out-of-range list index, a scalar in the middle of the
    path, or a ``None`` value at any step.
    """
    for key in keys:
        if isinstance(node, dict):
            node = node.get(key)
        elif isinstance(node, list):
            # Only real integer indices inside the bounds are accepted
            # (negative indices count from the end, as usual).
            if isinstance(key, int) and -len(node) <= key < len(node):
                node = node[key]
            else:
                return default
        else:
            return default
        if node is None:
            return default
    return node


def get_nested_value(data_structure, path, default=None):
    """
    Extract a value from nested dict/list structures with wildcard support.

    Args:
        data_structure: Nested dict/list to navigate
        path: List of keys/indices. Use '*' to iterate over every element
            of a list; several wildcards may appear in the same path
        default: Value to return if path not found

    Returns:
        - "$$$$ No Data" when data_structure is None (historical sentinel)
        - default when path is empty or cannot be resolved
        - without wildcard: the value found at path (a None value counts
          as "not found" and yields default)
        - with wildcard: a list of the values found for each element;
          unresolved elements are skipped and list results are flattened
          by one level. A trailing wildcard returns the (non-None)
          elements themselves

    Examples:
        get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1
        get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2]
        get_nested_value({"items": [1, 2]}, ["items", "*"]) -> [1, 2]
    """
    if data_structure is None:
        return _NO_DATA
    if not path:
        return default

    if "*" not in path:
        return _walk_plain_path(data_structure, path, default)

    # Split around the first wildcard; the remainder is resolved recursively
    # so that any further wildcard is handled the same way.
    wildcard_index = path.index("*")
    path_before = path[:wildcard_index]
    path_after = path[wildcard_index + 1:]

    base_level = _walk_plain_path(data_structure, path_before, default)
    if not isinstance(base_level, list):
        return default

    collected = []
    for item in base_level:
        if path_after:
            value = get_nested_value(item, path_after, default)
            # Skip elements for which the remaining path is unresolved.
            if value is default or value == _NO_DATA:
                continue
        else:
            # Trailing wildcard: the element itself is the result. (The
            # previous implementation resolved the empty remaining path to
            # `default` and therefore always returned an empty list.)
            if item is None:
                continue
            value = item

        # Flatten one level so nested wildcards yield a flat list.
        if isinstance(value, list):
            collected.extend(value)
        else:
            collected.append(value)

    return collected
def get_httpx_client() -> httpx.Client:
    """
    Get or create the HTTP client of the current thread (keep-alive enabled).
    Each thread gets its own client to avoid connection conflicts.

    Returns:
        httpx.Client instance for current thread
    """
    thread_id = threading.get_ident()
    client = httpx_clients.get(thread_id)
    if client is None:
        client = httpx.Client(
            headers={"Connection": "keep-alive"},
            limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
        )
        httpx_clients[thread_id] = client
    return client


def get_thread_position():
    """
    Get position of current thread in threads list.
    Used for managing progress bar positions in multithreaded environment.

    Returns:
        Zero-based index of current thread (registered on first call)
    """
    thread_id = threading.get_ident()
    with _threads_list_lock:
        if thread_id not in threads_list:
            threads_list.append(thread_id)
        return threads_list.index(thread_id)


def clear_httpx_client():
    """
    Drop the HTTP client of the current thread to force creation of a new one.
    Useful for resetting connections after errors.
    """
    client = httpx_clients.pop(threading.get_ident(), None)
    if client is None:
        return
    try:
        client.close()
    except Exception as exc:
        # Closing is best-effort: the client is discarded anyway, but keep a
        # trace instead of silently swallowing the error.
        logging.debug(f"Ignored error while closing HTTP client: {exc}")


def run_with_context(func, context, *args, **kwargs):
    """
    Set the thread-local context, then run func(*args, **kwargs).
    Useful with ThreadPoolExecutor, where the submitting thread's
    thread-local context is not visible from the worker thread.

    Returns:
        Whatever func returns
    """
    thread_local_storage.current_patient_context = context
    return func(*args, **kwargs)


# ============================================================================
# AUTHENTICATION
# ============================================================================

def login():
    """
    Authenticate with IAM and configure tokens for all microservices.

    Process:
    1. Prompt for credentials (with defaults)
    2. Login to IAM -> get master_token and user_id
    3. For each microservice (except IAM): call config-token API
    4. Store access_token and refresh_token for each service

    Returns:
        "Success": Authentication succeeded for all services
        "Error": Authentication failed (can retry)
        "Exit": User cancelled login
    """
    # Prompt for credentials; ask() returns None when the prompt is cancelled
    user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask()
    if not user_name:
        return "Exit"
    password = questionary.password("password:", default=DEFAULT_PASSWORD).ask()
    if not password:
        return "Exit"

    # Step 1: Login to IAM
    try:
        client = get_httpx_client()
        client.base_url = MICROSERVICES["IAM"]["base_url"]
        response = client.post(
            # Only the realm is substituted; the previous code expanded every
            # global and local variable into str.format().
            MICROSERVICES["IAM"]["endpoints"]["login"].format(REALME=REALME),
            json={"username": user_name, "password": password},
            timeout=20
        )
        response.raise_for_status()
        payload = response.json()
        master_token = payload["access_token"]
        user_id = payload["userId"]
        tokens["IAM"] = {
            "access_token": master_token,
            "refresh_token": payload["refresh_token"]
        }
    except (httpx.RequestError, httpx.HTTPStatusError, KeyError, ValueError) as exc:
        # KeyError / ValueError: the response is not the expected JSON
        # document. Report it as a retryable login error instead of crashing.
        print(f"Login Error: {exc}")
        logging.warning(f"Login Error: {exc}")
        return "Error"

    # Step 2: Configure tokens for each microservice
    for app_name, app_config in MICROSERVICES.items():
        if app_name == "IAM":
            continue  # IAM doesn't need config-token

        try:
            client = get_httpx_client()
            client.base_url = app_config["base_url"]
            response = client.post(
                app_config["endpoints"]["config_token"].format(REALME=REALME),
                headers={"Authorization": f"Bearer {master_token}"},
                json={
                    "userId": user_id,
                    "clientId": app_config["app_id"],
                    "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
                },
                timeout=20
            )
            response.raise_for_status()
            payload = response.json()
            tokens[app_name] = {
                "access_token": payload["access_token"],
                "refresh_token": payload["refresh_token"]
            }
        except (httpx.RequestError, httpx.HTTPStatusError, KeyError, ValueError) as exc:
            print(f"Config-token Error for {app_name}: {exc}")
            logging.warning(f"Config-token Error for {app_name}: {exc}")
            return "Error"

    print("\nLogin Success")
    return "Success"


def new_token(app):
    """
    Refresh access token for a specific microservice.

    Uses refresh_token to obtain new access_token and refresh_token.
    A lock serializes refreshes. A thread that waited for the lock while
    another thread refreshed the same tokens returns immediately instead of
    refreshing a second time.

    Args:
        app: Microservice name (e.g., "RC", "GDD")

    Raises:
        httpx.RequestError: If refresh fails after all retries
    """
    # Token the caller's failed request was (presumably) sent with.
    stale_access_token = tokens[app]["access_token"]

    with _token_refresh_lock:
        if tokens[app]["access_token"] != stale_access_token:
            # Another thread already rotated the tokens while this one was
            # waiting for the lock; the fresh token can simply be reused.
            return

        for attempt in range(ERROR_MAX_RETRY):
            try:
                client = get_httpx_client()
                client.base_url = MICROSERVICES[app]["base_url"]
                response = client.post(
                    MICROSERVICES[app]["endpoints"]["refresh"],
                    headers={"Authorization": f"Bearer {tokens[app]['access_token']}"},
                    json={"refresh_token": tokens[app]["refresh_token"]},
                    timeout=20
                )
                response.raise_for_status()
                payload = response.json()
                # Read both values before storing anything so a malformed
                # response cannot leave a half-updated token pair.
                access_token = payload["access_token"]
                refresh_token = payload["refresh_token"]
                tokens[app]["access_token"] = access_token
                tokens[app]["refresh_token"] = refresh_token
                return
            except (httpx.RequestError, httpx.HTTPStatusError, KeyError, ValueError) as exc:
                logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}")
                if attempt < ERROR_MAX_RETRY - 1:
                    sleep(WAIT_BEFORE_RETRY)

    logging.critical(f"Persistent error in refresh_token for {app}")
    raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}")
# ============================================================================
# DECORATORS
# ============================================================================

def _refresh_tokens_for(app):
    """Refresh the token of `app`, or of every logged-in service when app is None."""
    if app is not None:
        new_token(app)
        return
    # Bare decorator usage: the target microservice is unknown.
    for app_name in list(tokens):
        new_token(app_name)


def _ask_user_after_persistent_failure(func_name, batch_count, total_attempts, exc):
    """Ask the user how to proceed once all automatic retry batches failed; returns the choice."""
    with _user_interaction_lock:
        console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
        console.print(f"[red]Exception: {exc}[/red]")

        return questionary.select(
            f"What would you like to do for {func_name}?",
            choices=[
                "Retry (try another batch of retries)",
                "Ignore (return None and continue)",
                "Stop script (critical error)"
            ]
        ).ask()


def _wrap_with_retry(func, app):
    """Build the retrying wrapper around `func` for the microservice `app`."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        func_name = getattr(func, "__name__", repr(func))
        total_attempts = 0
        attempts_in_batch = 0
        batch_count = 1

        while True:
            total_attempts += 1
            attempts_in_batch += 1
            try:
                return func(*args, **kwargs)
            except (httpx.RequestError, httpx.HTTPStatusError) as exc:
                logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}")

                # Refresh the thread-local client if an error occurs
                # to avoid potential pool corruption or stale connections
                clear_httpx_client()

                if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
                    logging.info(f"Token expired for {func_name}. Refreshing token.")
                    _refresh_tokens_for(app)

                if attempts_in_batch < ERROR_MAX_RETRY:
                    sleep(WAIT_BEFORE_RETRY)
                    continue

                # Max retries reached for this batch
                attempts_in_batch = 0
                if batch_count < MAX_BATCHS_OF_RETRIES:
                    logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. "
                                    f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.")
                    batch_count += 1
                    sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
                    continue

                # All automatic batches exhausted, ask the user
                choice = _ask_user_after_persistent_failure(func_name, batch_count, total_attempts, exc)

                if choice == "Retry (try another batch of retries)":
                    logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
                    batch_count = 1  # Reset batch counter for the next interactive round
                    continue

                if choice == "Ignore (return None and continue)":
                    # Retrieve context if available
                    ctx = getattr(thread_local_storage, "current_patient_context", None) or {}
                    logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient "
                                    f"{ctx.get('id', 'Unknown')} ({ctx.get('pseudo', 'Unknown')}). Error: {exc}")
                    return None

                # "Stop script" or a cancelled prompt (ask() returned None)
                logging.critical(f"User chose to stop script after persistent error in {func_name}.")
                raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)") from exc

    return wrapper


def api_call_with_retry(func=None, app=None):
    """
    Decorator for API calls with automatic retry and token refresh on 401 errors.

    Supported forms:
        @api_call_with_retry("RC")        # microservice name (as used in this file)
        @api_call_with_retry(app="RC")
        @api_call_with_retry              # bare: on 401 every known token is refreshed

    Args:
        func: The function to decorate (bare form) or, for convenience, the
            microservice name given positionally
        app: Microservice name whose token is refreshed on HTTP 401

    Behavior of the decorated function:
        - retried up to ERROR_MAX_RETRY times per batch, for up to
          MAX_BATCHS_OF_RETRIES batches, on httpx request/status errors
        - afterwards the user chooses between retrying, ignoring (the call
          returns None) or stopping (httpx.RequestError is raised)
    """
    if callable(func):
        return _wrap_with_retry(func, app)

    # Factory form: the first positional argument, if any, is the app name.
    app_name = func if func is not None else app

    def decorator(target):
        return _wrap_with_retry(target, app_name)

    return decorator


# ============================================================================
# API TEMPLATES
# ============================================================================
# Templates for common API patterns. Duplicate and modify for your needs.
# Remember to:
# - Choose appropriate HTTP method (GET/POST/PUT/DELETE)
# - Update endpoint from MICROSERVICES dict
# - Adjust timeout if needed (global API_TIMEOUT or specific value)
# - Always return response.json()

@api_call_with_retry("RC")
def get_all_organizations():
    """
    Example API call using GET method.

    Returns:
        Decoded JSON response (expected: list of organization dictionaries)
    """
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.get(
        MICROSERVICES["RC"]["endpoints"]["organizations"],
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    return response.json()


@api_call_with_retry("RC")
def search_inclusions(organization_id, limit, page):
    """
    Example API call using POST method with query params and JSON body.

    Args:
        organization_id: Organization UUID
        limit: Max results per page
        page: Page number (1-based)

    Returns:
        Decoded JSON response (expected: dict with "data" key containing list of inclusions)
    """
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.post(
        # The endpoint template already carries "?limit={limit}&page={page}":
        # fill the placeholders instead of appending a second query string.
        MICROSERVICES["RC"]["endpoints"]["search_inclusions"].format(limit=limit, page=page),
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        json={
            "protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e",  # TODO: Configure if needed
            "center": organization_id,
            "keywords": ""
        },
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    return response.json()
# ============================================================================
# MAIN PROCESSING
# ============================================================================

def main():
    """
    Main processing function.

    Structure:
    1. Authentication
    2. Configuration (thread count)
    3. Initialization (thread pools, timing)
    4. Main processing block (TODO: implement your logic here)
    5. Finalization (elapsed time)

    Returns early (without processing) when the user cancels the login or
    the thread-count prompt.
    """
    global main_thread_pool, subtasks_thread_pool

    # ========== AUTHENTICATION ==========
    print()
    login_status = login()
    while login_status == "Error":
        login_status = login()
    if login_status == "Exit":
        return

    # ========== CONFIGURATION ==========
    print()
    threads_answer = questionary.text(
        "Number of threads:",
        default="12",
        validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS
    ).ask()
    if threads_answer is None:
        # ask() returns None when the prompt is cancelled (e.g. Ctrl+C);
        # int(None) would otherwise raise TypeError.
        return
    number_of_threads = int(threads_answer)

    # ========== INITIALIZATION ==========
    start_time = perf_counter()

    # Initialize thread pools
    main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads)
    subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE)

    # ========== MAIN PROCESSING BLOCK ==========
    print()
    console.print("[bold cyan]Starting main processing...[/bold cyan]")

    # TODO: IMPLEMENT YOUR PROCESSING LOGIC HERE
    #
    # Example pattern with progress bar and multithreading:
    #
    # items = [...]  # Your data to process
    # futures = []
    #
    # with tqdm(total=len(items), desc="Processing items",
    #           bar_format=custom_bar_format) as pbar:
    #     with main_thread_pool as executor:
    #
    #         for item in items:
    #
    #             # Context used by the retry decorator for detailed error
    #             # logging; run_with_context installs it in the worker thread
    #             ctx = {"id": patient_id, "pseudo": pseudo}
    #
    #             futures.append(executor.submit(run_with_context, process_item, ctx, item))
    #
    #         for future in as_completed(futures):
    #             try:
    #                 result = future.result()
    #                 # Process result here
    #                 pbar.update(1)
    #             except Exception as exc:
    #                 logging.critical(f"Error in worker: {exc}", exc_info=True)
    #                 print(f"\nCRITICAL ERROR: {exc}")
    #                 executor.shutdown(wait=False, cancel_futures=True)
    #                 raise
    #
    # Example: Simple test to verify authentication works
    # organizations = get_all_organizations()
    # console.print(f"[green]Retrieved {len(organizations)} organizations[/green]")

    # ========== FINALIZATION ==========
    print()
    print(f"Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}")


# ============================================================================
# ENTRY POINT
# ============================================================================

if __name__ == '__main__':
    # ========== LOGGING CONFIGURATION ==========
    # Auto-generate log filename based on script name
    script_name = os.path.splitext(os.path.basename(__file__))[0]
    log_file_name = f"{script_name}.log"

    logging.basicConfig(
        level=LOG_LEVEL,
        format=LOG_FORMAT,
        filename=log_file_name,
        filemode='w'
    )

    # ========== MAIN EXECUTION ==========
    try:
        main()
    except Exception as e:
        logging.critical(f"Script terminated with exception: {e}", exc_info=True)
        print(f"\nScript stopped due to error: {e}")
        print(traceback.format_exc())
    finally:
        # ========== CLEANUP ==========
        # Shutdown thread pools gracefully. Both names are module-level
        # globals initialised to None, so a plain None check is enough.
        for pool in (main_thread_pool, subtasks_thread_pool):
            if pool is not None:
                pool.shutdown(wait=False, cancel_futures=True)

        # Pause before exit (prevents console from closing immediately when launched from Windows Explorer)
        print('\n')
        try:
            input("Press Enter to exit...")
        except EOFError:
            # No interactive stdin (e.g. piped/closed input): nothing to wait for.
            pass