commit 18be4d9000bf5700fbc90e618584759162bf093f
Author: Abdelkouddous LHACHIMI
Date:   Fri Dec 12 22:40:54 2025 +0100

    Functional version

diff --git a/Specs.md b/Specs.md
new file mode 100644
index 0000000..c916a77
--- /dev/null
+++ b/Specs.md
@@ -0,0 +1,107 @@
+### Objectives
+* Build a script that retrieves all users of the Ziwig Connect platform, with each user's details and the details of the Professional associated with them
+* The microservice to use for user data is IAM
+* The microservice to use for professional data is HRD
+
+### Development Environment
+* Python 3.12
+* A single module
+
+### Template
+* Start from the EB_Script_Template template and read the documentation it contains carefully
+
+
+
+### Script Logic
+
+The script runs in three sequential phases:
+
+**Phase 1: Processing by Roles**
+1. Retrieve all Roles from IAM (limit 100, no pagination needed).
+2. Initialize an `output` dictionary from the roles:
+    * The response contains a list of roles, each with an array of users.
+    * For each user found, create or update an entry: `user_id: {roles: [{id, name}], user: {}, professional: {}}`.
+3. Run the multithreaded processing (see the Multithreading section) on these users.
+4. Save the result to `all_users_data.json`.
+
+**Phase 2: Processing by Applications**
+1. Retrieve the list of applications via `get_applications`.
+2. For each application found:
+    * Print a clear separator to the console.
+    * Retrieve the associated profiles via `get_profiles_by_app_id` (using `clientId`).
+    * Initialize a *new* `output` dictionary for this application:
+        * For each profile, retrieve its users via `get_users_by_profile_id`.
+        * For each user found, create or update an entry: `user_id: {profiles: [{id, name}], user: {}, professional: {}}`.
+        * *(Note: the key is `profiles` here, instead of `roles`.)*
+    * Run the same multithreaded processing on these users.
+    * Save the result to a dedicated file: `all_users_data_{app_name}.json`.
+
+**Phase 3: Processing by Endobest Centers**
+1. **Configuration**:
+    * Input file: `endobest_organizations.json` (constant).
+    * Output file: `professionals_by_endobest_center.json` (constant).
+2. **Logic**:
+    * Load the list of organizations from the input JSON file (required fields: `id`, `name`, `Center_Name`).
+    * Initialize a progress bar based on the number of centers.
+    * **Iteration (sequential, no thread pool)**:
+        * For each center:
+            * Call the `get_pros_by_endobest_center` API.
+            * Read the array of professionals from the `data` attribute.
+            * **Pro sorting**: sort by `properties.nom_exercice`, then `properties.prenom_exercice`, then `metadata.id`.
+            * Attach the sorted array to the current center under the `pros` attribute.
+    * **Center sorting**: sort the final list of centers by `Center_Name`, then `name`, then `id`.
+    * Save the sorted list to the output file.
+
+### Output Format
+* **Generated files**:
+    * `all_users_data.json` (data from the Roles phase).
+    * `all_users_data_{app_name}.json` (one file per Application).
+    * `professionals_by_endobest_center.json` (data from the Endobest Centers phase).
+* **JSON structure** (a sample record follows this list):
+    * Array of objects sorted by `lastname`, `firstname`, `user_id`.
+    * Each object contains:
+        * `user_id`
+        * `user` (IAM details)
+        * `professional` (HRD details)
+        * `roles` (only in the Roles file)
+        * `profiles` (only in the per-Application files)
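+
+For illustration, a single record in `all_users_data.json` might look like the sketch below. All field values are hypothetical; `user` and `professional` simply carry the raw IAM and HRD payloads:
+
+```json
+[
+    {
+        "user_id": "0000-aaaa-1111",
+        "roles": [{"id": "role-1", "name": "Physician"}],
+        "user": {"firstname": "Jane", "lastname": "Doe", "hrdProId": "pro-42"},
+        "professional": {"metadata": {"id": "pro-42"}, "properties": {"nom_exercice": "Doe", "prenom_exercice": "Jane"}}
+    }
+]
+```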
+
+### API Specs
+* **Authentication (IAM)**:
+    * `/api/auth/{REALME}/login` [POST]
+    * `/api/auth/refreshToken` [POST]
+* **Common (All Microservices)**:
+    * `/api/auth/config-token` [POST]
+    * `/api/auth/refreshToken` [POST]
+* **IAM (User Data)**:
+    * `get_roles`: `/api/profiles/paginate` [POST]
+    * `get_applications`: `/api/applications` [GET]
+    * `get_profiles_by_app_id`: `/api/identity-profiles/paginate` [POST]
+    * `get_users_by_profile_id`: `/api/identity-profiles/{profile_id}/users` [GET]
+    * `get_user_by_id`: `/api/users/find/{user_id}` [GET]
+* **HRD (Professional Data)**:
+    * `get_professional_by_id`: `/api/entity-manager/meta/{model}/data/nodes/pro/{pro_id}?relationships=2` [GET]
+    * `get_pros_by_endobest_center`: `/api/entity-manager/meta/modele_fr/data/orga/{organization_id}/centers/pros?limit=1000` [GET]
+* **Note**: `user_id`, `model`, and `professional_id` are retrieved the same way in all phases.
+
+### Multithreading
+* The number of threads is a (global) user input.
+* The script uses two persistent thread pools (`main` and `subtasks`), reused for each phase (Roles, App 1, App 2, ...).
+* **User Processing (Main Pool)**:
+    * Calls `get_user_by_id`.
+    * Stores the result in `user`.
+    * If `hrdProId` exists: submits a "Professional" task.
+    * Otherwise: increments the "Professionals" progress bar (so the bar never stalls).
+* **Professional Processing (Subtasks Pool)**:
+    * Calls `get_professional_by_id`.
+    * Stores the result in `professional`.
+    * Increments the "Professionals" progress bar.
+
+### Script Progress
+* Uses `tqdm` to display two bars (Users and Professionals).
+* The bars are created, updated, and closed **independently for each generated file** (Roles, then each Application).
+* Progress updates are handled in a thread-safe manner.
+
+
+
diff --git a/all_users_data.xlsx b/all_users_data.xlsx
new file mode 100644
index 0000000..68c7901
Binary files /dev/null and b/all_users_data.xlsx differ
diff --git a/get_all_users.bat b/get_all_users.bat
new file mode 100644
index 0000000..ac7446c
--- /dev/null
+++ b/get_all_users.bat
@@ -0,0 +1,4 @@
+@echo off
+call C:\PythonProjects\.rcvenv\Scripts\activate.bat
+python get_all_users.py %*
+
diff --git a/get_all_users.py b/get_all_users.py
new file mode 100644
index 0000000..3b4b375
--- /dev/null
+++ b/get_all_users.py
@@ -0,0 +1,938 @@
+"""
+Get All Users Script
+
+Retrieves all users from Ziwig Connect (IAM) and their associated Professional details (HRD).
+Output: JSON file with user and professional details.
+
+Based on Endobest Script Template.
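+Output files: all_users_data.json, one all_users_data_{app_name}.json per application,
+and professionals_by_endobest_center.json.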
+""" + +import json +import logging +import os +import sys +import threading +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import timedelta +from time import perf_counter, sleep +import functools + +import httpx +import questionary +from tqdm import tqdm +from rich.console import Console + + +# ============================================================================ +# CONFIGURATION - CREDENTIALS +# ============================================================================ + +DEFAULT_USER_NAME = "admin" +DEFAULT_PASSWORD = "+J3/rw..'ynxXDHwt?bAvn_>" +REALME = "ziwig-pro" + + +# ============================================================================ +# CONFIGURATION - MICROSERVICES +# ============================================================================ +# Comment out unused microservices to skip their token configuration + +MICROSERVICES = { + "IAM": { + "app_id": None, # IAM doesn't use app_id + "base_url": "https://api-auth.ziwig-connect.com", + "endpoints": { + "login": "/api/auth/{REALME}/login", # POST : Body = {"username": "{user_name}", "password": "{pass}"} + "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} + "get_roles": "/api/profiles/paginate", # POST : Body = {"limit": 100, "currentPage": 1, "sort": [], "filters": {}} + "get_user_by_id": "/api/users/find/{user_id}?domaine={REALME}", # GET + "get_applications": "/api/applications", # GET + "get_profiles_by_app_id": "/api/identity-profiles/paginate", # POST : Body = {"page":null,"limit":100,"search":{},"clientId":"{app_id}","type":"user"} + "get_users_by_profile_id": "/api/identity-profiles/{profile_id}/users", # GET + } + }, + # "RC": { + # "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393", + # "base_url": "https://api-hcp.ziwig-connect.com", + # "endpoints": { + # "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} + # "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} + # "organizations": "/api/inclusions/getAllOrganizations", # GET + # "statistics": "/api/inclusions/inclusion-statistics", # POST : Body = {"center": "rc_endobest_current_center}}", "protocolId": "{rc_endobest_prot_id}", "excludedCenters": {rc_endobest_excl_centers}} + # "search_inclusions": "/api/inclusions/search?limit={limit}&page={page}", # POST : Body = {"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", "center": organization_id, "keywords": ""} + # "record_by_patient": "/api/records/byPatient", # POST : Body = {"center": "{rc_endobest_current_center}", "patientId": "{patient_id}", "mode": "exchange", "state": "ongoing", "includeEndoParcour": false, "sourceClient": "pro_prm"}, + # "surveys": "/api/surveys/filter/with-answers", #POST : Body = {"context": "clinic_research", "subject": "{patient_id}", "blockedQcmVersions": {blocked_qcm_versions}} + # } + # }, + # "GDD": { + # "app_id": "4f5ac063-6a22-4e2c-bda5-b50c0dddab79", + # "base_url": "https://api-lab.ziwig-connect.com", + # "endpoints": { + # "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} + # "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} + # 
"request_by_tube": "/api/requests/by-tube-id/{tube_id}", # GET + # } + # }, + "HRD": { + "app_id": "93bc44fd-c64b-4fff-a450-f3cba956e934", + "base_url": "https://api-resources.ziwig-connect.com", + "endpoints": { + "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} + "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} + "pro_by_id": "/api/entity-manager/meta/{model}/data/nodes/pro/{pro_id}?relationships=2", # GET - Note: added leading slash + "get_pros_by_endobest_center": "/api/entity-manager/meta/modele_fr/data/orga/{organization_id}/centers/pros?limit=1000", # GET + } + }, +} + + +# ============================================================================ +# CONFIGURATION - THREADING +# ============================================================================ + +MAX_THREADS = 20 # Maximum threads for main pool +SUBTASKS_POOL_SIZE = 40 # Fixed size for subtasks pool + + +# ============================================================================ +# CONFIGURATION - RETRY & TIMEOUTS +# ============================================================================ + +ERROR_MAX_RETRY = 10 # Max retry attempts for API calls +WAIT_BEFORE_RETRY = 0.5 # Delay in seconds between retries (fixed) +API_TIMEOUT = 30 # Default timeout for API calls (seconds) + + +# ============================================================================ +# CONFIGURATION - LOGGING +# ============================================================================ + +LOG_LEVEL = logging.WARNING # Change to DEBUG for detailed logs +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +# LOG_FILE_NAME auto-generated based on script name in __main__ + + +# ============================================================================ +# CONFIGURATION - PROGRESS BARS +# ============================================================================ + +BAR_N_FMT_WIDTH = 4 +BAR_TOTAL_FMT_WIDTH = 4 +BAR_TIME_WIDTH = 8 +BAR_RATE_WIDTH = 10 + +custom_bar_format = ("{l_bar}{bar}" + f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " + f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " + f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}") + +# ============================================================================ +# CONFIGURATION - OUTPUT +# ============================================================================ + +OUTPUT_FILENAME = "all_users_data.json" +FILENAME_ENDOBEST_CENTERS_INPUT = "endobest_organizations.json" +FILENAME_ENDOBEST_CENTERS_OUTPUT = "professionals_by_endobest_center.json" + + +# ============================================================================ +# GLOBAL VARIABLES +# ============================================================================ + +# Tokens storage: {app_name: {"access_token": ..., "refresh_token": ...}} +tokens = {} + +# Thread-safe HTTP client pool (one client per thread) +httpx_clients = {} + +# Thread management +threads_list = [] +_threads_list_lock = threading.Lock() +_token_refresh_lock = threading.Lock() + +# Thread pools (initialized in main()) +main_thread_pool = None +subtasks_thread_pool = None + +# Rich console for formatted output +console = Console() + + +# ============================================================================ +# UTILITIES +# 
============================================================================ + +def get_nested_value(data_structure, path, default=None): + """ + Extract value from nested dict/list structures with wildcard support. + """ + if data_structure is None: + return "$$$$ No Data" + if not path: + return default + + # Handle wildcard in path + if "*" in path: + wildcard_index = path.index("*") + path_before = path[:wildcard_index] + path_after = path[wildcard_index+1:] + + # Helper for non-wildcard path resolution + def _get_simple_nested_value(ds, p, d): + cl = ds + for k in p: + if isinstance(cl, dict): + cl = cl.get(k) + elif isinstance(cl, list): + try: + if isinstance(k, int) and -len(cl) <= k < len(cl): + cl = cl[k] + else: + return d + except (IndexError, TypeError): + return d + else: + return d + if cl is None: + return d + return cl + + base_level = _get_simple_nested_value(data_structure, path_before, default) + + if not isinstance(base_level, list): + return default + + results = [] + for item in base_level: + value = get_nested_value(item, path_after, default) + if value is not default and value != "$$$$ No Data": + results.append(value) + + # Flatten one level for multiple wildcards + final_results = [] + for res in results: + if isinstance(res, list): + final_results.extend(res) + else: + final_results.append(res) + + return final_results + + # No wildcard - standard traversal + current_level = data_structure + for key_or_index in path: + if isinstance(current_level, dict): + current_level = current_level.get(key_or_index) + if current_level is None: + return default + elif isinstance(current_level, list): + try: + if isinstance(key_or_index, int) and -len(current_level) <= key_or_index < len(current_level): + current_level = current_level[key_or_index] + else: + return default + except (IndexError, TypeError): + return default + else: + return default + return current_level + + +def get_httpx_client() -> httpx.Client: + """ + Get or create thread-local HTTP client with keep-alive enabled. + """ + global httpx_clients + thread_id = threading.get_ident() + if thread_id not in httpx_clients: + httpx_clients[thread_id] = httpx.Client( + headers={"Connection": "keep-alive"}, + limits=httpx.Limits(max_keepalive_connections=20, max_connections=100) + ) + return httpx_clients[thread_id] + + +def get_thread_position(): + """ + Get position of current thread in threads list. + """ + global threads_list + thread_id = threading.get_ident() + with _threads_list_lock: + if thread_id not in threads_list: + threads_list.append(thread_id) + return len(threads_list) - 1 + else: + return threads_list.index(thread_id) + + +# ============================================================================ +# AUTHENTICATION +# ============================================================================ + +def login(): + """ + Authenticate with IAM and configure tokens for all microservices. 
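+    Returns "Success", "Error", or "Exit" (when no credentials are entered).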
+ """ + global tokens + + # Prompt for credentials + user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask() + password = questionary.password("password:", default=DEFAULT_PASSWORD).ask() + + if not (user_name and password): + return "Exit" + + # Step 1: Login to IAM + try: + client = get_httpx_client() + client.base_url = MICROSERVICES["IAM"]["base_url"] + response = client.post( + MICROSERVICES["IAM"]["endpoints"]["login"].format(**{**globals(),**locals()}), + json={"username": user_name, "password": password}, + timeout=20 + ) + response.raise_for_status() + master_token = response.json()["access_token"] + user_id = response.json()["userId"] + tokens["IAM"] = {"access_token": master_token, "refresh_token": response.json()["refresh_token"]} + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + print(f"Login Error: {exc}") + logging.warning(f"Login Error: {exc}") + return "Error" + + # Step 2: Configure tokens for each microservice + for app_name, app_config in MICROSERVICES.items(): + if app_name == "IAM": + continue # IAM doesn't need config-token + + try: + client = get_httpx_client() + client.base_url = app_config["base_url"] + response = client.post( + app_config["endpoints"]["config_token"].format(**{**globals(),**locals()}), + headers={"Authorization": f"Bearer {master_token}"}, + json={ + "userId": user_id, + "clientId": app_config["app_id"], + "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36" + }, + timeout=20 + ) + response.raise_for_status() + tokens[app_name] = { + "access_token": response.json()["access_token"], + "refresh_token": response.json()["refresh_token"] + } + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + print(f"Config-token Error for {app_name}: {exc}") + logging.warning(f"Config-token Error for {app_name}: {exc}") + return "Error" + + print("\nLogin Success") + return "Success" + + +def new_token(app): + """ + Refresh access token for a specific microservice. + """ + global tokens + + with _token_refresh_lock: + for attempt in range(ERROR_MAX_RETRY): + try: + client = get_httpx_client() + client.base_url = MICROSERVICES[app]["base_url"] + response = client.post( + MICROSERVICES[app]["endpoints"]["refresh"].format(**{**globals(),**locals()}), + headers={"Authorization": f"Bearer {tokens[app]['access_token']}"}, + json={"refresh_token": tokens[app]["refresh_token"]}, + timeout=20 + ) + response.raise_for_status() + tokens[app]["access_token"] = response.json()["access_token"] + tokens[app]["refresh_token"] = response.json()["refresh_token"] + return + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}") + if attempt < ERROR_MAX_RETRY - 1: + sleep(WAIT_BEFORE_RETRY) + + logging.critical(f"Persistent error in refresh_token for {app}") + raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}") + + +# ============================================================================ +# DECORATORS +# ============================================================================ + +def api_call_with_retry(app): + """ + Decorator for API calls with automatic retry and token refresh on 401. 
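+    Usage: @api_call_with_retry("IAM") on a function that performs the HTTP request.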
+ """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + func_name = func.__name__ + for attempt in range(ERROR_MAX_RETRY): + try: + return func(*args, **kwargs) + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + logging.warning(f"Error in {func_name} (Attempt {attempt + 1}/{ERROR_MAX_RETRY}): {exc}") + + # Auto-refresh token on 401 + if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: + logging.info(f"Token expired for {func_name}. Refreshing token for {app}.") + new_token(app) + + if attempt < ERROR_MAX_RETRY - 1: + sleep(WAIT_BEFORE_RETRY) + + logging.critical(f"Persistent error in {func_name} after {ERROR_MAX_RETRY} attempts.") + raise httpx.RequestError(message=f"Persistent error in {func_name}") + + return wrapper + return decorator + + +# ============================================================================ +# API CALLS +# ============================================================================ + +@api_call_with_retry("IAM") +def get_roles(): + """ + Get all roles from IAM. + """ + client = get_httpx_client() + client.base_url = MICROSERVICES["IAM"]["base_url"] + response = client.post( + MICROSERVICES["IAM"]["endpoints"]["get_roles"], + headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"}, + json={"limit": 100, "currentPage": 1, "sort": [], "filters": {}}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() + + +@api_call_with_retry("IAM") +def get_user_by_id(user_id): + """ + Get user details by ID from IAM. + """ + client = get_httpx_client() + client.base_url = MICROSERVICES["IAM"]["base_url"] + response = client.get( + MICROSERVICES["IAM"]["endpoints"]["get_user_by_id"].format(user_id=user_id, REALME=REALME), + headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() + + +@api_call_with_retry("HRD") +def get_professional_by_id(model, pro_id): + """ + Get professional details by ID from HRD. + """ + client = get_httpx_client() + client.base_url = MICROSERVICES["HRD"]["base_url"] + response = client.get( + MICROSERVICES["HRD"]["endpoints"]["pro_by_id"].format(model=model, pro_id=pro_id), + headers={"Authorization": f"Bearer {tokens['HRD']['access_token']}"}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() + + +@api_call_with_retry("IAM") +def get_applications(): + """ + Get all applications from IAM. + """ + client = get_httpx_client() + client.base_url = MICROSERVICES["IAM"]["base_url"] + response = client.get( + MICROSERVICES["IAM"]["endpoints"]["get_applications"], + headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() + + +@api_call_with_retry("IAM") +def get_profiles_by_app_id(app_id): + """ + Get profiles for a specific application ID. 
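+    POSTs to the identity-profiles paginate endpoint with the clientId in the payload.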
+ """ + client = get_httpx_client() + client.base_url = MICROSERVICES["IAM"]["base_url"] + + # Body payload as per specs + payload = { + "page": None, + "limit": 100, + "search": {}, + "clientId": app_id, + "type": "user" + } + + response = client.post( + MICROSERVICES["IAM"]["endpoints"]["get_profiles_by_app_id"], + headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"}, + json=payload, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() + + +@api_call_with_retry("IAM") +def get_users_by_profile_id(profile_id): + """ + Get users associated with a specific profile ID. + """ + client = get_httpx_client() + client.base_url = MICROSERVICES["IAM"]["base_url"] + response = client.get( + MICROSERVICES["IAM"]["endpoints"]["get_users_by_profile_id"].format(profile_id=profile_id), + headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() + + +@api_call_with_retry("HRD") +def get_pros_by_endobest_center(organization_id): + """ + Get professionals for a specific Endobest center. + """ + client = get_httpx_client() + client.base_url = MICROSERVICES["HRD"]["base_url"] + response = client.get( + MICROSERVICES["HRD"]["endpoints"]["get_pros_by_endobest_center"].format(organization_id=organization_id), + headers={"Authorization": f"Bearer {tokens['HRD']['access_token']}"}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() + + +# ============================================================================ +# WORKER FUNCTIONS +# ============================================================================ + +def process_user(user_id, output_data, output_lock, pbar_pros, pbar_lock): + """ + Process a single user: fetch details, update output, and trigger pro fetch if needed. + """ + try: + # Fetch user details + user_data = get_user_by_id(user_id) + + # Update output with user data + with output_lock: + if user_id in output_data: + output_data[user_id]["user"] = user_data + else: + # Should not happen if initialized correctly, but safe fallback + output_data[user_id] = {"roles": [], "user": user_data, "professional": {}} + + # Extract professional info + # Path: professional.data.graph -> model + # Path: hrdProId -> pro_id + model = get_nested_value(user_data, ["professional", "data", "graph"], "modele_fr") + pro_id = get_nested_value(user_data, ["hrdProId"]) + + if pro_id and pro_id != "$$$$ No Data" and model and model != "$$$$ No Data": + # Submit professional task + subtasks_thread_pool.submit( + process_professional, + user_id, model, pro_id, + output_data, output_lock, + pbar_pros, pbar_lock + ) + else: + # No professional data to fetch, update pbar_pros immediately + with pbar_lock: + pbar_pros.update(1) + + except Exception as e: + logging.error(f"Error processing user {user_id}: {e}") + # Ensure pbar_pros is updated even on error to avoid hanging + with pbar_lock: + pbar_pros.update(1) + + +def process_professional(user_id, model, pro_id, output_data, output_lock, pbar_pros, pbar_lock): + """ + Process a professional: fetch details and update output. 
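+    Always advances the "Professionals" progress bar in the finally block, even on error.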
+ """ + try: + pro_data = get_professional_by_id(model, pro_id) + + with output_lock: + if user_id in output_data: + output_data[user_id]["professional"] = pro_data + except Exception as e: + logging.error(f"Error processing professional {pro_id} for user {user_id}: {e}") + finally: + with pbar_lock: + pbar_pros.update(1) + + +# ============================================================================ +# MAIN PROCESSING +# ============================================================================ + + +def process_user_list(output_data, context_name, output_filename_suffix=""): + """ + Execute the multithreaded processing for a given dictionary of users. + """ + global main_thread_pool, subtasks_thread_pool + + total_users = len(output_data) + if total_users == 0: + console.print(f"[yellow]No users found for {context_name}. Skipping.[/yellow]") + return + + console.print(f"[bold]Processing {total_users} users for {context_name}...[/bold]") + + # Create progress bars + # Index 0 for users, Index 1 for professionals + # We must ensure pbar_pros is managed correctly + pbar_lock = threading.Lock() + output_lock = threading.Lock() + + # Note: We create new bars for each run to avoid state issues + pbar_users = tqdm(total=total_users, unit="users", desc="Users ", position=0, bar_format=custom_bar_format) + pbar_pros = tqdm(total=total_users, unit="pros.", desc="Professionals", position=1, bar_format=custom_bar_format) + + futures = [] + + # Submit main user tasks + for user_id in output_data.keys(): + futures.append( + main_thread_pool.submit( + process_user, + user_id, + output_data, + output_lock, + pbar_pros, + pbar_lock + ) + ) + + # Wait for all user tasks + for future in as_completed(futures): + try: + future.result() + pbar_users.update(1) + except Exception as e: + logging.error(f"Task error in {context_name}: {e}") + pbar_users.update(1) + + pbar_users.close() + + # Move pbar_pros up + with pbar_lock: + pbar_pros.clear() + pbar_pros.pos = 0 + pbar_pros.refresh() + + subtasks_thread_pool.shutdown(wait=True) + # Re-initialize for next run + # Note: Global variable update + init_subtasks_pool() + + pbar_pros.close() + + # Sort and Save + console.print(f"Exporting data to {OUTPUT_FILENAME.replace('.json', output_filename_suffix + '.json')}...") + + final_output = [{"user_id": k, **v} for k, v in output_data.items()] + final_output.sort(key=lambda x: ( + str(x.get("user", {}).get("lastname", "")).lower(), + str(x.get("user", {}).get("firstname", "")).lower(), + str(x.get("user_id", "")) + )) + + filename = OUTPUT_FILENAME + if output_filename_suffix: + filename = filename.replace(".json", f"{output_filename_suffix}.json") + + with open(filename, 'w', encoding='utf-8') as f: + json.dump(final_output, f, indent=4, ensure_ascii=False) + + console.print(f"[green]Export complete. {len(final_output)} records saved to {filename}.[/green]") + print() + + +def process_endobest_centers(): + """ + Phase 3: Process Endobest Centers from input JSON file. + Sequential processing (no thread pool). + """ + print() + console.print("==================================================") + console.print("[bold cyan]PHASE 3: Processing Endobest Centers[/bold cyan]") + console.print("==================================================") + + # 1. Load Input File + try: + with open(FILENAME_ENDOBEST_CENTERS_INPUT, 'r', encoding='utf-8') as f: + centers_data = json.load(f) + except FileNotFoundError: + console.print(f"[yellow]Input file '{FILENAME_ENDOBEST_CENTERS_INPUT}' not found. 
Skipping Phase 3.[/yellow]") + return + except json.JSONDecodeError as e: + console.print(f"[red]Error decoding '{FILENAME_ENDOBEST_CENTERS_INPUT}': {e}. Skipping Phase 3.[/red]") + return + + # Filter out entries that might not be objects or missing basic data if necessary, + # but spec implies trusting the array. We'll iterate what we have. + if not isinstance(centers_data, list): + console.print(f"[red]Input file content is not a list. Skipping Phase 3.[/red]") + return + + total_centers = len(centers_data) + console.print(f"Processing {total_centers} centers...") + + # 2. Progress Bar + pbar = tqdm(total=total_centers, unit="centers", desc="Centers ", bar_format=custom_bar_format) + + # 3. Iterate & Process + for center in centers_data: + center_id = center.get("id") + + if not center_id: + pbar.update(1) + continue + + try: + response_json = get_pros_by_endobest_center(center_id) + pros_list = response_json.get("data", []) + + if not isinstance(pros_list, list): + pros_list = [] + + # Sort Pros: nom_exercice, prenom_exercice, id + # Using get_nested_value safely + pros_list.sort(key=lambda x: ( + str(get_nested_value(x, ["properties", "nom_exercice"], default="")).lower(), + str(get_nested_value(x, ["properties", "prenom_exercice"], default="")).lower(), + str(get_nested_value(x, ["metadata", "id"], default="")) + )) + + center["pros"] = pros_list + + except Exception as e: + logging.error(f"Error processing center {center_id}: {e}") + finally: + pbar.update(1) + + pbar.close() + + # 4. Sort Centers + # Center_Name, name, id + centers_data.sort(key=lambda x: ( + str(x.get("Center_Name", "")).lower(), + str(x.get("name", "")).lower(), + str(x.get("id", "")) + )) + + # 5. Save Output + console.print(f"Exporting data to {FILENAME_ENDOBEST_CENTERS_OUTPUT}...") + try: + with open(FILENAME_ENDOBEST_CENTERS_OUTPUT, 'w', encoding='utf-8') as f: + json.dump(centers_data, f, indent=4, ensure_ascii=False) + console.print(f"[green]Export complete. {len(centers_data)} centers saved to {FILENAME_ENDOBEST_CENTERS_OUTPUT}.[/green]") + except Exception as e: + console.print(f"[red]Error saving output: {e}[/red]") + print() + + +def init_subtasks_pool(): + global subtasks_thread_pool, number_of_threads + # Ensure we have a thread count, fallback to 10 if not set (should not happen) + count = globals().get('number_of_threads', 10) + subtasks_thread_pool = ThreadPoolExecutor(max_workers=count) + + +def main(): + """ + Main processing function. 
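+    Runs authentication, then Phase 1 (Roles), Phase 2 (Applications), and Phase 3 (Endobest Centers).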
+ """ + global main_thread_pool, subtasks_thread_pool, number_of_threads + + # ========== AUTHENTICATION ========== + print() + login_status = login() + while login_status == "Error": + login_status = login() + if login_status == "Exit": + return + + # ========== CONFIGURATION ========== + print() + number_of_threads = int( + questionary.text( + "Number of threads:", + default="12", + validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS + ).ask() + ) + + # ========== INITIALIZATION ========== + start_time = perf_counter() + + # Initialize thread pools + main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads) + init_subtasks_pool() + + # ========== PHASE 1: ROLES ========== + print() + console.print("==================================================") + console.print("[bold cyan]PHASE 1: Processing Roles[/bold cyan]") + console.print("==================================================") + + console.print("Fetching roles...") + roles_response = get_roles() + roles_data = roles_response.get("data", []) + + output_data_roles = {} + + console.print("Initializing user list from roles...") + for role in roles_data: + role_info = {"id": role.get("id"), "name": role.get("name")} + users = role.get("users", []) + + for user_id in users: + if user_id not in output_data_roles: + output_data_roles[user_id] = { + "roles": [role_info], + "user": {}, + "professional": {} + } + else: + existing_role_ids = [r["id"] for r in output_data_roles[user_id]["roles"]] + if role_info["id"] not in existing_role_ids: + output_data_roles[user_id]["roles"].append(role_info) + + process_user_list(output_data_roles, "Roles") + + + # ========== PHASE 2: APPLICATIONS ========== + print() + console.print("==================================================") + console.print("[bold cyan]PHASE 2: Processing Applications[/bold cyan]") + console.print("==================================================") + + console.print("Fetching applications...") + apps_response = get_applications() + # apps_response is a list of dicts + + for app in apps_response: + app_name = app.get("label") or app.get("name") or "Unknown" + client_id = app.get("clientId") + + if not client_id: + continue + + print() + console.print(f"[bold magenta]--- Application: {app_name} ---[/bold magenta]") + + console.print(f"Fetching profiles for {app_name}...") + profiles_response = get_profiles_by_app_id(client_id) + profiles_data = profiles_response.get("data", []) + + output_data_app = {} + + console.print(f"Initializing user list for {app_name}...") + for profile in profiles_data: + profile_info = {"id": profile.get("id"), "name": profile.get("name")} + profile_id = profile.get("id") + + # Fetch users for this profile + # Note: The API returns a list of user IDs directly? 
+            # Spec check: "get_users_by_profile_id: the response is a plain array of user IDs"
+            try:
+                users_list = get_users_by_profile_id(profile_id)
+                # Ensure it's a list
+                if not isinstance(users_list, list):
+                    users_list = []
+            except Exception as e:
+                logging.error(f"Error fetching users for profile {profile_id}: {e}")
+                users_list = []
+
+            for user_id in users_list:
+                if user_id not in output_data_app:
+                    output_data_app[user_id] = {
+                        "profiles": [profile_info],
+                        "user": {},
+                        "professional": {}
+                    }
+                else:
+                    existing_profile_ids = [p["id"] for p in output_data_app[user_id]["profiles"]]
+                    if profile_info["id"] not in existing_profile_ids:
+                        output_data_app[user_id]["profiles"].append(profile_info)
+
+        # Process this application's users
+        # Sanitize app_name for filename
+        safe_app_name = "".join([c if c.isalnum() else "_" for c in app_name])
+        process_user_list(output_data_app, f"Application {app_name}", f"_{safe_app_name}")
+
+
+    # ========== PHASE 3: ENDOBEST CENTERS ==========
+    process_endobest_centers()
+
+
+    # ========== FINALIZATION ==========
+    print()
+    console.print("[bold green]All processing complete.[/bold green]")
+    print(f"Total Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}")
+
+
+# ============================================================================
+# ENTRY POINT
+# ============================================================================
+
+if __name__ == '__main__':
+    # ========== LOGGING CONFIGURATION ==========
+    # Auto-generate log filename based on script name
+    script_name = os.path.splitext(os.path.basename(__file__))[0]
+    log_file_name = f"{script_name}.log"
+
+    logging.basicConfig(
+        level=LOG_LEVEL,
+        format=LOG_FORMAT,
+        filename=log_file_name,
+        filemode='w'
+    )
+
+    # ========== MAIN EXECUTION ==========
+    try:
+        main()
+    except Exception as e:
+        logging.critical(f"Script terminated with exception: {e}", exc_info=True)
+        print(f"\nScript stopped due to error: {e}")
+        print(traceback.format_exc())
+    finally:
+        # ========== CLEANUP ==========
+        # Shutdown thread pools gracefully
+        if 'main_thread_pool' in globals() and main_thread_pool:
+            main_thread_pool.shutdown(wait=False, cancel_futures=True)
+        if 'subtasks_thread_pool' in globals() and subtasks_thread_pool:
+            subtasks_thread_pool.shutdown(wait=False, cancel_futures=True)
+
+        # Pause before exit (prevents console from closing immediately when launched from Windows Explorer)
+        print('\n')
+        input("Press Enter to exit...")