""" Get All Users Script Retrieves all users from Ziwig Connect (IAM) and their associated Professional details (HRD). Output: JSON file with user and professional details. Based on Endobest Script Template. """ import json import logging import os import sys import threading import traceback from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import timedelta from time import perf_counter, sleep import functools import httpx import questionary from tqdm import tqdm from rich.console import Console # ============================================================================ # CONFIGURATION - CREDENTIALS # ============================================================================ ENV = "pre-prod" # "prod" or "pre-prod" if "prod" in sys.argv: ENV = "prod" sys.argv.remove("prod") elif "pre-prod" in sys.argv: ENV = "pre-prod" sys.argv.remove("pre-prod") DEFAULT_USER_NAME = "admin" if ENV == "prod" else "admin" DEFAULT_PASSWORD = "+J3/rw..'ynxXDHwt?bAvn_>" if ENV == "prod" else "123@Admin" REALME = "ziwig-pro" if ENV == "prod" else "ziwig-pro" # ============================================================================ # CONFIGURATION - MICROSERVICES # ============================================================================ # Comment out unused microservices to skip their token configuration MICROSERVICES = { "IAM": { "app_id": None, # IAM doesn't use app_id "base_url": "https://api-auth.ziwig-connect.com" if ENV == "prod" else "https://api-iam.pp.ziwig-platform.com", # PRE-PROD "endpoints": { "login": "/api/auth/{REALME}/login", # POST : Body = {"username": "{user_name}", "password": "{pass}"} "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} "get_roles": "/api/profiles/paginate", # POST : Body = {"limit": 100, "currentPage": 1, "sort": [], "filters": {}} "get_user_by_id": "/api/users/find/{user_id}?domaine={REALME}", # GET "get_applications": "/api/applications", # GET "get_profiles_by_app_id": "/api/identity-profiles/paginate", # POST : Body = {"page":null,"limit":100,"search":{},"clientId":"{app_id}","type":"user"} "get_users_by_profile_id": "/api/identity-profiles/{profile_id}/users", # GET } }, # "RC": { # "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393", # "base_url": "https://api-hcp.ziwig-connect.com", # "endpoints": { # "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} # "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} # "organizations": "/api/inclusions/getAllOrganizations", # GET # "statistics": "/api/inclusions/inclusion-statistics", # POST : Body = {"center": "rc_endobest_current_center}}", "protocolId": "{rc_endobest_prot_id}", "excludedCenters": {rc_endobest_excl_centers}} # "search_inclusions": "/api/inclusions/search?limit={limit}&page={page}", # POST : Body = {"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", "center": organization_id, "keywords": ""} # "record_by_patient": "/api/records/byPatient", # POST : Body = {"center": "{rc_endobest_current_center}", "patientId": "{patient_id}", "mode": "exchange", "state": "ongoing", "includeEndoParcour": false, "sourceClient": "pro_prm"}, # "surveys": "/api/surveys/filter/with-answers", #POST : Body = {"context": "clinic_research", "subject": "{patient_id}", "blockedQcmVersions": {blocked_qcm_versions}} # } # }, # "GDD": { # "app_id": "4f5ac063-6a22-4e2c-bda5-b50c0dddab79", # "base_url": "https://api-lab.ziwig-connect.com", # "endpoints": { # "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} # "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} # "request_by_tube": "/api/requests/by-tube-id/{tube_id}", # GET # } # }, "HRD": { "app_id": "93bc44fd-c64b-4fff-a450-f3cba956e934" if ENV == "prod" else "234d72f1-509a-4e99-b2b9-ee12da1e336d", "base_url": "https://api-resources.ziwig-connect.com" if ENV == "prod" else "https://api-hrd.pp.ziwig-platform.com", "endpoints": { "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} "pro_by_id": "/api/entity-manager/meta/{model}/data/nodes/pro/{pro_id}?relationships=2", # GET - Note: added leading slash "get_pros_by_endobest_center": "/api/entity-manager/meta/modele_fr/data/orga/{organization_id}/centers/pros?limit=1000", # GET } }, } # ============================================================================ # CONFIGURATION - THREADING # ============================================================================ MAX_THREADS = 20 # Maximum threads for main pool SUBTASKS_POOL_SIZE = 40 # Fixed size for subtasks pool # ============================================================================ # CONFIGURATION - RETRY & TIMEOUTS # ============================================================================ ERROR_MAX_RETRY = 10 # Max retry attempts for API calls WAIT_BEFORE_RETRY = 0.5 # Delay in seconds between retries (fixed) API_TIMEOUT = 30 # Default timeout for API calls (seconds) # ============================================================================ # CONFIGURATION - LOGGING # ============================================================================ LOG_LEVEL = logging.WARNING # Change to DEBUG for detailed logs LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' # LOG_FILE_NAME auto-generated based on script name in __main__ # ============================================================================ # CONFIGURATION - PROGRESS BARS # ============================================================================ BAR_N_FMT_WIDTH = 4 BAR_TOTAL_FMT_WIDTH = 4 BAR_TIME_WIDTH = 8 BAR_RATE_WIDTH = 10 custom_bar_format = ("{l_bar}{bar}" f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}") # ============================================================================ # CONFIGURATION - OUTPUT # ============================================================================ OUTPUT_FILENAME = "all_users_data.json" FILENAME_ENDOBEST_CENTERS_INPUT = "endobest_organizations.json" FILENAME_ENDOBEST_CENTERS_OUTPUT = "professionals_by_endobest_center.json" # ============================================================================ # GLOBAL VARIABLES # ============================================================================ # Tokens storage: {app_name: {"access_token": ..., "refresh_token": ...}} tokens = {} # Thread-safe HTTP client pool (one client per thread) httpx_clients = {} # Thread management threads_list = [] _threads_list_lock = threading.Lock() _token_refresh_lock = threading.Lock() # Thread pools (initialized in main()) main_thread_pool = None subtasks_thread_pool = None # Rich console for formatted output console = Console() # ============================================================================ # UTILITIES # ============================================================================ def get_nested_value(data_structure, path, default=None): """ Extract value from nested dict/list structures with wildcard support. """ if data_structure is None: return "$$$$ No Data" if not path: return default # Handle wildcard in path if "*" in path: wildcard_index = path.index("*") path_before = path[:wildcard_index] path_after = path[wildcard_index+1:] # Helper for non-wildcard path resolution def _get_simple_nested_value(ds, p, d): cl = ds for k in p: if isinstance(cl, dict): cl = cl.get(k) elif isinstance(cl, list): try: if isinstance(k, int) and -len(cl) <= k < len(cl): cl = cl[k] else: return d except (IndexError, TypeError): return d else: return d if cl is None: return d return cl base_level = _get_simple_nested_value(data_structure, path_before, default) if not isinstance(base_level, list): return default results = [] for item in base_level: value = get_nested_value(item, path_after, default) if value is not default and value != "$$$$ No Data": results.append(value) # Flatten one level for multiple wildcards final_results = [] for res in results: if isinstance(res, list): final_results.extend(res) else: final_results.append(res) return final_results # No wildcard - standard traversal current_level = data_structure for key_or_index in path: if isinstance(current_level, dict): current_level = current_level.get(key_or_index) if current_level is None: return default elif isinstance(current_level, list): try: if isinstance(key_or_index, int) and -len(current_level) <= key_or_index < len(current_level): current_level = current_level[key_or_index] else: return default except (IndexError, TypeError): return default else: return default return current_level def get_httpx_client() -> httpx.Client: """ Get or create thread-local HTTP client with keep-alive enabled. """ global httpx_clients thread_id = threading.get_ident() if thread_id not in httpx_clients: httpx_clients[thread_id] = httpx.Client( headers={"Connection": "keep-alive"}, limits=httpx.Limits(max_keepalive_connections=20, max_connections=100) ) return httpx_clients[thread_id] def get_thread_position(): """ Get position of current thread in threads list. """ global threads_list thread_id = threading.get_ident() with _threads_list_lock: if thread_id not in threads_list: threads_list.append(thread_id) return len(threads_list) - 1 else: return threads_list.index(thread_id) # ============================================================================ # AUTHENTICATION # ============================================================================ def login(): """ Authenticate with IAM and configure tokens for all microservices. """ global tokens # Prompt for credentials user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask() password = questionary.password("password:", default=DEFAULT_PASSWORD).ask() if not (user_name and password): return "Exit" # Step 1: Login to IAM try: client = get_httpx_client() client.base_url = MICROSERVICES["IAM"]["base_url"] response = client.post( MICROSERVICES["IAM"]["endpoints"]["login"].format(**{**globals(),**locals()}), json={"username": user_name, "password": password}, timeout=20 ) response.raise_for_status() master_token = response.json()["access_token"] user_id = response.json()["userId"] tokens["IAM"] = {"access_token": master_token, "refresh_token": response.json()["refresh_token"]} except (httpx.RequestError, httpx.HTTPStatusError) as exc: print(f"Login Error: {exc}") logging.warning(f"Login Error: {exc}") return "Error" # Step 2: Configure tokens for each microservice for app_name, app_config in MICROSERVICES.items(): if app_name == "IAM": continue # IAM doesn't need config-token try: client = get_httpx_client() client.base_url = app_config["base_url"] response = client.post( app_config["endpoints"]["config_token"].format(**{**globals(),**locals()}), headers={"Authorization": f"Bearer {master_token}"}, json={ "userId": user_id, "clientId": app_config["app_id"], "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36" }, timeout=20 ) response.raise_for_status() tokens[app_name] = { "access_token": response.json()["access_token"], "refresh_token": response.json()["refresh_token"] } except (httpx.RequestError, httpx.HTTPStatusError) as exc: print(f"Config-token Error for {app_name}: {exc}") logging.warning(f"Config-token Error for {app_name}: {exc}") return "Error" print("\nLogin Success") return "Success" def new_token(app): """ Refresh access token for a specific microservice. """ global tokens with _token_refresh_lock: for attempt in range(ERROR_MAX_RETRY): try: client = get_httpx_client() client.base_url = MICROSERVICES[app]["base_url"] response = client.post( MICROSERVICES[app]["endpoints"]["refresh"].format(**{**globals(),**locals()}), headers={"Authorization": f"Bearer {tokens[app]['access_token']}"}, json={"refresh_token": tokens[app]["refresh_token"]}, timeout=20 ) response.raise_for_status() tokens[app]["access_token"] = response.json()["access_token"] tokens[app]["refresh_token"] = response.json()["refresh_token"] return except (httpx.RequestError, httpx.HTTPStatusError) as exc: logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}") if attempt < ERROR_MAX_RETRY - 1: sleep(WAIT_BEFORE_RETRY) logging.critical(f"Persistent error in refresh_token for {app}") raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}") # ============================================================================ # DECORATORS # ============================================================================ def api_call_with_retry(app): """ Decorator for API calls with automatic retry and token refresh on 401. """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): func_name = func.__name__ for attempt in range(ERROR_MAX_RETRY): try: return func(*args, **kwargs) except (httpx.RequestError, httpx.HTTPStatusError) as exc: logging.warning(f"Error in {func_name} (Attempt {attempt + 1}/{ERROR_MAX_RETRY}): {exc}") # Auto-refresh token on 401 if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: logging.info(f"Token expired for {func_name}. Refreshing token for {app}.") new_token(app) if attempt < ERROR_MAX_RETRY - 1: sleep(WAIT_BEFORE_RETRY) logging.critical(f"Persistent error in {func_name} after {ERROR_MAX_RETRY} attempts.") raise httpx.RequestError(message=f"Persistent error in {func_name}") return wrapper return decorator # ============================================================================ # API CALLS # ============================================================================ @api_call_with_retry("IAM") def get_roles(): """ Get all roles from IAM. """ client = get_httpx_client() client.base_url = MICROSERVICES["IAM"]["base_url"] response = client.post( MICROSERVICES["IAM"]["endpoints"]["get_roles"], headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"}, json={"limit": 100, "currentPage": 1, "sort": [], "filters": {}}, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() @api_call_with_retry("IAM") def get_user_by_id(user_id): """ Get user details by ID from IAM. """ client = get_httpx_client() client.base_url = MICROSERVICES["IAM"]["base_url"] response = client.get( MICROSERVICES["IAM"]["endpoints"]["get_user_by_id"].format(user_id=user_id, REALME=REALME), headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"}, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() @api_call_with_retry("HRD") def get_professional_by_id(model, pro_id): """ Get professional details by ID from HRD. """ client = get_httpx_client() client.base_url = MICROSERVICES["HRD"]["base_url"] response = client.get( MICROSERVICES["HRD"]["endpoints"]["pro_by_id"].format(model=model, pro_id=pro_id), headers={"Authorization": f"Bearer {tokens['HRD']['access_token']}"}, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() @api_call_with_retry("IAM") def get_applications(): """ Get all applications from IAM. """ client = get_httpx_client() client.base_url = MICROSERVICES["IAM"]["base_url"] response = client.get( MICROSERVICES["IAM"]["endpoints"]["get_applications"], headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"}, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() @api_call_with_retry("IAM") def get_profiles_by_app_id(app_id): """ Get profiles for a specific application ID. """ client = get_httpx_client() client.base_url = MICROSERVICES["IAM"]["base_url"] # Body payload as per specs payload = { "page": None, "limit": 100, "search": {}, "clientId": app_id, "type": "user" } response = client.post( MICROSERVICES["IAM"]["endpoints"]["get_profiles_by_app_id"], headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"}, json=payload, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() @api_call_with_retry("IAM") def get_users_by_profile_id(profile_id): """ Get users associated with a specific profile ID. """ client = get_httpx_client() client.base_url = MICROSERVICES["IAM"]["base_url"] response = client.get( MICROSERVICES["IAM"]["endpoints"]["get_users_by_profile_id"].format(profile_id=profile_id), headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"}, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() @api_call_with_retry("HRD") def get_pros_by_endobest_center(organization_id): """ Get professionals for a specific Endobest center. """ client = get_httpx_client() client.base_url = MICROSERVICES["HRD"]["base_url"] response = client.get( MICROSERVICES["HRD"]["endpoints"]["get_pros_by_endobest_center"].format(organization_id=organization_id), headers={"Authorization": f"Bearer {tokens['HRD']['access_token']}"}, timeout=API_TIMEOUT ) response.raise_for_status() return response.json() # ============================================================================ # WORKER FUNCTIONS # ============================================================================ def process_user(user_id, output_data, output_lock, pbar_pros, pbar_lock): """ Process a single user: fetch details, update output, and trigger pro fetch if needed. """ try: # Fetch user details user_data = get_user_by_id(user_id) # Update output with user data with output_lock: if user_id in output_data: output_data[user_id]["user"] = user_data else: # Should not happen if initialized correctly, but safe fallback output_data[user_id] = {"roles": [], "user": user_data, "professional": {}} # Extract professional info # Path: professional.data.graph -> model # Path: hrdProId -> pro_id model = get_nested_value(user_data, ["professional", "data", "graph"], "modele_fr") pro_id = get_nested_value(user_data, ["hrdProId"]) if pro_id and pro_id != "$$$$ No Data" and model and model != "$$$$ No Data": # Submit professional task subtasks_thread_pool.submit( process_professional, user_id, model, pro_id, output_data, output_lock, pbar_pros, pbar_lock ) else: # No professional data to fetch, update pbar_pros immediately with pbar_lock: pbar_pros.update(1) except Exception as e: logging.error(f"Error processing user {user_id}: {e}") # Ensure pbar_pros is updated even on error to avoid hanging with pbar_lock: pbar_pros.update(1) def process_professional(user_id, model, pro_id, output_data, output_lock, pbar_pros, pbar_lock): """ Process a professional: fetch details and update output. """ try: pro_data = get_professional_by_id(model, pro_id) with output_lock: if user_id in output_data: output_data[user_id]["professional"] = pro_data except Exception as e: logging.error(f"Error processing professional {pro_id} for user {user_id}: {e}") finally: with pbar_lock: pbar_pros.update(1) # ============================================================================ # MAIN PROCESSING # ============================================================================ def process_user_list(output_data, context_name, output_filename_suffix=""): """ Execute the multithreaded processing for a given dictionary of users. """ global main_thread_pool, subtasks_thread_pool total_users = len(output_data) if total_users == 0: console.print(f"[yellow]No users found for {context_name}. Skipping.[/yellow]") return console.print(f"[bold]Processing {total_users} users for {context_name}...[/bold]") # Create progress bars # Index 0 for users, Index 1 for professionals # We must ensure pbar_pros is managed correctly pbar_lock = threading.Lock() output_lock = threading.Lock() # Note: We create new bars for each run to avoid state issues pbar_users = tqdm(total=total_users, unit="users", desc="Users ", position=0, bar_format=custom_bar_format) pbar_pros = tqdm(total=total_users, unit="pros.", desc="Professionals", position=1, bar_format=custom_bar_format) futures = [] # Submit main user tasks for user_id in output_data.keys(): futures.append( main_thread_pool.submit( process_user, user_id, output_data, output_lock, pbar_pros, pbar_lock ) ) # Wait for all user tasks for future in as_completed(futures): try: future.result() pbar_users.update(1) except Exception as e: logging.error(f"Task error in {context_name}: {e}") pbar_users.update(1) pbar_users.close() # Move pbar_pros up with pbar_lock: pbar_pros.clear() pbar_pros.pos = 0 pbar_pros.refresh() subtasks_thread_pool.shutdown(wait=True) # Re-initialize for next run # Note: Global variable update init_subtasks_pool() pbar_pros.close() # Sort and Save console.print(f"Exporting data to {OUTPUT_FILENAME.replace('.json', output_filename_suffix + '.json')}...") final_output = [{"user_id": k, **v} for k, v in output_data.items()] final_output.sort(key=lambda x: ( str(x.get("user", {}).get("lastname", "")).lower(), str(x.get("user", {}).get("firstname", "")).lower(), str(x.get("user_id", "")) )) filename = OUTPUT_FILENAME if output_filename_suffix: filename = filename.replace(".json", f"{output_filename_suffix}.json") with open(filename, 'w', encoding='utf-8') as f: json.dump(final_output, f, indent=4, ensure_ascii=False) console.print(f"[green]Export complete. {len(final_output)} records saved to {filename}.[/green]") print() def process_endobest_centers(): """ Phase 3: Process Endobest Centers from input JSON file. Sequential processing (no thread pool). """ print() console.print("==================================================") console.print("[bold cyan]PHASE 3: Processing Endobest Centers[/bold cyan]") console.print("==================================================") # 1. Load Input File try: with open(FILENAME_ENDOBEST_CENTERS_INPUT, 'r', encoding='utf-8') as f: centers_data = json.load(f) except FileNotFoundError: console.print(f"[yellow]Input file '{FILENAME_ENDOBEST_CENTERS_INPUT}' not found. Skipping Phase 3.[/yellow]") return except json.JSONDecodeError as e: console.print(f"[red]Error decoding '{FILENAME_ENDOBEST_CENTERS_INPUT}': {e}. Skipping Phase 3.[/red]") return # Filter out entries that might not be objects or missing basic data if necessary, # but spec implies trusting the array. We'll iterate what we have. if not isinstance(centers_data, list): console.print(f"[red]Input file content is not a list. Skipping Phase 3.[/red]") return total_centers = len(centers_data) console.print(f"Processing {total_centers} centers...") # 2. Progress Bar pbar = tqdm(total=total_centers, unit="centers", desc="Centers ", bar_format=custom_bar_format) # 3. Iterate & Process for center in centers_data: center_id = center.get("id") if not center_id: pbar.update(1) continue try: response_json = get_pros_by_endobest_center(center_id) pros_list = response_json.get("data", []) if not isinstance(pros_list, list): pros_list = [] # Sort Pros: nom_exercice, prenom_exercice, id # Using get_nested_value safely pros_list.sort(key=lambda x: ( str(get_nested_value(x, ["properties", "nom_exercice"], default="")).lower(), str(get_nested_value(x, ["properties", "prenom_exercice"], default="")).lower(), str(get_nested_value(x, ["metadata", "id"], default="")) )) center["pros"] = pros_list except Exception as e: logging.error(f"Error processing center {center_id}: {e}") finally: pbar.update(1) pbar.close() # 4. Sort Centers # Center_Name, name, id centers_data.sort(key=lambda x: ( str(x.get("Center_Name", "")).lower(), str(x.get("name", "")).lower(), str(x.get("id", "")) )) # 5. Save Output console.print(f"Exporting data to {FILENAME_ENDOBEST_CENTERS_OUTPUT}...") try: with open(FILENAME_ENDOBEST_CENTERS_OUTPUT, 'w', encoding='utf-8') as f: json.dump(centers_data, f, indent=4, ensure_ascii=False) console.print(f"[green]Export complete. {len(centers_data)} centers saved to {FILENAME_ENDOBEST_CENTERS_OUTPUT}.[/green]") except Exception as e: console.print(f"[red]Error saving output: {e}[/red]") print() def init_subtasks_pool(): global subtasks_thread_pool, number_of_threads # Ensure we have a thread count, fallback to 10 if not set (should not happen) count = globals().get('number_of_threads', 10) subtasks_thread_pool = ThreadPoolExecutor(max_workers=count) def main(): """ Main processing function. """ global main_thread_pool, subtasks_thread_pool, number_of_threads phase1 = True phase2 = True phase3 = True if len(sys.argv) > 1: if "1" not in sys.argv: phase1 = False if "2" not in sys.argv: phase2 = False if "3" not in sys.argv: phase3 = False # ========== AUTHENTICATION ========== print() login_status = login() while login_status == "Error": login_status = login() if login_status == "Exit": return # ========== CONFIGURATION ========== print() number_of_threads = int( questionary.text( "Number of threads:", default="5", validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS ).ask() ) # ========== INITIALIZATION ========== start_time = perf_counter() # Initialize thread pools main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads) init_subtasks_pool() # ========== PHASE 1: ROLES ========== if phase1: print() console.print("==================================================") console.print("[bold cyan]PHASE 1: Processing Roles[/bold cyan]") console.print("==================================================") console.print("Fetching roles...") roles_response = get_roles() roles_data = roles_response.get("data", []) output_data_roles = {} console.print("Initializing user list from roles...") for role in roles_data: role_info = {"id": role.get("id"), "name": role.get("name")} users = role.get("users", []) for user_id in users: if user_id not in output_data_roles: output_data_roles[user_id] = { "roles": [role_info], "user": {}, "professional": {} } else: existing_role_ids = [r["id"] for r in output_data_roles[user_id]["roles"]] if role_info["id"] not in existing_role_ids: output_data_roles[user_id]["roles"].append(role_info) process_user_list(output_data_roles, "Roles") # ========== PHASE 2: APPLICATIONS ========== if phase2: print() console.print("==================================================") console.print("[bold cyan]PHASE 2: Processing Applications[/bold cyan]") console.print("==================================================") console.print("Fetching applications...") apps_response = get_applications() # apps_response is a list of dicts for app in apps_response: app_name = app.get("label") or app.get("name") or "Unknown" client_id = app.get("clientId") if not client_id: continue print() console.print(f"[bold magenta]--- Application: {app_name} ---[/bold magenta]") console.print(f"Fetching profiles for {app_name}...") profiles_response = get_profiles_by_app_id(client_id) profiles_data = profiles_response.get("data", []) output_data_app = {} console.print(f"Initializing user list for {app_name}...") for profile in profiles_data: profile_info = {"id": profile.get("id"), "name": profile.get("name")} profile_id = profile.get("id") # Fetch users for this profile # Note: The API returns a list of user IDs directly? # Spec check: "get_users_by_profile_id : la réponse est un simple array de user_id" try: users_list = get_users_by_profile_id(profile_id) # Ensure it's a list if not isinstance(users_list, list): users_list = [] except Exception as e: logging.error(f"Error fetching users for profile {profile_id}: {e}") users_list = [] for user_id in users_list: if user_id not in output_data_app: output_data_app[user_id] = { "profiles": [profile_info], "user": {}, "professional": {} } else: existing_profile_ids = [p["id"] for p in output_data_app[user_id]["profiles"]] if profile_info["id"] not in existing_profile_ids: output_data_app[user_id]["profiles"].append(profile_info) # Process this application's users # Sanitize app_name for filename safe_app_name = "".join([c if c.isalnum() else "_" for c in app_name]) process_user_list(output_data_app, f"Application {app_name}", f"_{safe_app_name}") # ========== PHASE 3: ENDOBEST CENTERS ========== if phase3: process_endobest_centers() # ========== FINALIZATION ========== print() console.print("[bold green]All processing complete.[/bold green]") print(f"Total Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}") # ============================================================================ # ENTRY POINT # ============================================================================ if __name__ == '__main__': # ========== LOGGING CONFIGURATION ========== # Auto-generate log filename based on script name script_name = os.path.splitext(os.path.basename(__file__))[0] log_file_name = f"{script_name}.log" logging.basicConfig( level=LOG_LEVEL, format=LOG_FORMAT, filename=log_file_name, filemode='w' ) # ========== MAIN EXECUTION ========== try: main() except Exception as e: logging.critical(f"Script terminated with exception: {e}", exc_info=True) print(f"\nScript stopped due to error: {e}") print(traceback.format_exc()) finally: # ========== CLEANUP ========== # Shutdown thread pools gracefully if 'main_thread_pool' in globals() and main_thread_pool: main_thread_pool.shutdown(wait=False, cancel_futures=True) if 'subtasks_thread_pool' in globals() and subtasks_thread_pool: subtasks_thread_pool.shutdown(wait=False, cancel_futures=True) # Pause before exit (prevents console from closing immediately when launched from Windows Explorer) print('\n') input("Press Enter to exit...")