Files
Get_All_Users/get_all_users.py

939 lines
34 KiB
Python

"""
Get All Users Script
Retrieves all users from Ziwig Connect (IAM) and their associated Professional details (HRD).
Output: JSON file with user and professional details.
Based on Endobest Script Template.
"""
import json
import logging
import os
import sys
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta
from time import perf_counter, sleep
import functools
import httpx
import questionary
from tqdm import tqdm
from rich.console import Console
# ============================================================================
# CONFIGURATION - CREDENTIALS
# ============================================================================
DEFAULT_USER_NAME = "admin"
DEFAULT_PASSWORD = "+J3/rw..'ynxXDHwt?bAvn_>"
REALME = "ziwig-pro"
# ============================================================================
# CONFIGURATION - MICROSERVICES
# ============================================================================
# Comment out unused microservices to skip their token configuration
MICROSERVICES = {
"IAM": {
"app_id": None, # IAM doesn't use app_id
"base_url": "https://api-auth.ziwig-connect.com",
"endpoints": {
"login": "/api/auth/{REALME}/login", # POST : Body = {"username": "{user_name}", "password": "{pass}"}
"refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
"get_roles": "/api/profiles/paginate", # POST : Body = {"limit": 100, "currentPage": 1, "sort": [], "filters": {}}
"get_user_by_id": "/api/users/find/{user_id}?domaine={REALME}", # GET
"get_applications": "/api/applications", # GET
"get_profiles_by_app_id": "/api/identity-profiles/paginate", # POST : Body = {"page":null,"limit":100,"search":{},"clientId":"{app_id}","type":"user"}
"get_users_by_profile_id": "/api/identity-profiles/{profile_id}/users", # GET
}
},
# "RC": {
# "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393",
# "base_url": "https://api-hcp.ziwig-connect.com",
# "endpoints": {
# "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}}
# "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
# "organizations": "/api/inclusions/getAllOrganizations", # GET
# "statistics": "/api/inclusions/inclusion-statistics", # POST : Body = {"center": "rc_endobest_current_center}}", "protocolId": "{rc_endobest_prot_id}", "excludedCenters": {rc_endobest_excl_centers}}
# "search_inclusions": "/api/inclusions/search?limit={limit}&page={page}", # POST : Body = {"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", "center": organization_id, "keywords": ""}
# "record_by_patient": "/api/records/byPatient", # POST : Body = {"center": "{rc_endobest_current_center}", "patientId": "{patient_id}", "mode": "exchange", "state": "ongoing", "includeEndoParcour": false, "sourceClient": "pro_prm"},
# "surveys": "/api/surveys/filter/with-answers", #POST : Body = {"context": "clinic_research", "subject": "{patient_id}", "blockedQcmVersions": {blocked_qcm_versions}}
# }
# },
# "GDD": {
# "app_id": "4f5ac063-6a22-4e2c-bda5-b50c0dddab79",
# "base_url": "https://api-lab.ziwig-connect.com",
# "endpoints": {
# "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}}
# "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
# "request_by_tube": "/api/requests/by-tube-id/{tube_id}", # GET
# }
# },
"HRD": {
"app_id": "93bc44fd-c64b-4fff-a450-f3cba956e934",
"base_url": "https://api-resources.ziwig-connect.com",
"endpoints": {
"config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}}
"refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
"pro_by_id": "/api/entity-manager/meta/{model}/data/nodes/pro/{pro_id}?relationships=2", # GET - Note: added leading slash
"get_pros_by_endobest_center": "/api/entity-manager/meta/modele_fr/data/orga/{organization_id}/centers/pros?limit=1000", # GET
}
},
}
# ============================================================================
# CONFIGURATION - THREADING
# ============================================================================
MAX_THREADS = 20 # Maximum threads for main pool
SUBTASKS_POOL_SIZE = 40 # Fixed size for subtasks pool
# ============================================================================
# CONFIGURATION - RETRY & TIMEOUTS
# ============================================================================
ERROR_MAX_RETRY = 10 # Max retry attempts for API calls
WAIT_BEFORE_RETRY = 0.5 # Delay in seconds between retries (fixed)
API_TIMEOUT = 30 # Default timeout for API calls (seconds)
# ============================================================================
# CONFIGURATION - LOGGING
# ============================================================================
LOG_LEVEL = logging.WARNING # Change to DEBUG for detailed logs
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# LOG_FILE_NAME auto-generated based on script name in __main__
# ============================================================================
# CONFIGURATION - PROGRESS BARS
# ============================================================================
BAR_N_FMT_WIDTH = 4
BAR_TOTAL_FMT_WIDTH = 4
BAR_TIME_WIDTH = 8
BAR_RATE_WIDTH = 10
custom_bar_format = ("{l_bar}{bar}"
f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}")
# ============================================================================
# CONFIGURATION - OUTPUT
# ============================================================================
OUTPUT_FILENAME = "all_users_data.json"
FILENAME_ENDOBEST_CENTERS_INPUT = "endobest_organizations.json"
FILENAME_ENDOBEST_CENTERS_OUTPUT = "professionals_by_endobest_center.json"
# ============================================================================
# GLOBAL VARIABLES
# ============================================================================
# Tokens storage: {app_name: {"access_token": ..., "refresh_token": ...}}
tokens = {}
# Thread-safe HTTP client pool (one client per thread)
httpx_clients = {}
# Thread management
threads_list = []
_threads_list_lock = threading.Lock()
_token_refresh_lock = threading.Lock()
# Thread pools (initialized in main())
main_thread_pool = None
subtasks_thread_pool = None
# Rich console for formatted output
console = Console()
# ============================================================================
# UTILITIES
# ============================================================================
def get_nested_value(data_structure, path, default=None):
"""
Extract value from nested dict/list structures with wildcard support.
"""
if data_structure is None:
return "$$$$ No Data"
if not path:
return default
# Handle wildcard in path
if "*" in path:
wildcard_index = path.index("*")
path_before = path[:wildcard_index]
path_after = path[wildcard_index+1:]
# Helper for non-wildcard path resolution
def _get_simple_nested_value(ds, p, d):
cl = ds
for k in p:
if isinstance(cl, dict):
cl = cl.get(k)
elif isinstance(cl, list):
try:
if isinstance(k, int) and -len(cl) <= k < len(cl):
cl = cl[k]
else:
return d
except (IndexError, TypeError):
return d
else:
return d
if cl is None:
return d
return cl
base_level = _get_simple_nested_value(data_structure, path_before, default)
if not isinstance(base_level, list):
return default
results = []
for item in base_level:
value = get_nested_value(item, path_after, default)
if value is not default and value != "$$$$ No Data":
results.append(value)
# Flatten one level for multiple wildcards
final_results = []
for res in results:
if isinstance(res, list):
final_results.extend(res)
else:
final_results.append(res)
return final_results
# No wildcard - standard traversal
current_level = data_structure
for key_or_index in path:
if isinstance(current_level, dict):
current_level = current_level.get(key_or_index)
if current_level is None:
return default
elif isinstance(current_level, list):
try:
if isinstance(key_or_index, int) and -len(current_level) <= key_or_index < len(current_level):
current_level = current_level[key_or_index]
else:
return default
except (IndexError, TypeError):
return default
else:
return default
return current_level
def get_httpx_client() -> httpx.Client:
"""
Get or create thread-local HTTP client with keep-alive enabled.
"""
global httpx_clients
thread_id = threading.get_ident()
if thread_id not in httpx_clients:
httpx_clients[thread_id] = httpx.Client(
headers={"Connection": "keep-alive"},
limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
)
return httpx_clients[thread_id]
def get_thread_position():
"""
Get position of current thread in threads list.
"""
global threads_list
thread_id = threading.get_ident()
with _threads_list_lock:
if thread_id not in threads_list:
threads_list.append(thread_id)
return len(threads_list) - 1
else:
return threads_list.index(thread_id)
# ============================================================================
# AUTHENTICATION
# ============================================================================
def login():
"""
Authenticate with IAM and configure tokens for all microservices.
"""
global tokens
# Prompt for credentials
user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask()
password = questionary.password("password:", default=DEFAULT_PASSWORD).ask()
if not (user_name and password):
return "Exit"
# Step 1: Login to IAM
try:
client = get_httpx_client()
client.base_url = MICROSERVICES["IAM"]["base_url"]
response = client.post(
MICROSERVICES["IAM"]["endpoints"]["login"].format(**{**globals(),**locals()}),
json={"username": user_name, "password": password},
timeout=20
)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
tokens["IAM"] = {"access_token": master_token, "refresh_token": response.json()["refresh_token"]}
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
print(f"Login Error: {exc}")
logging.warning(f"Login Error: {exc}")
return "Error"
# Step 2: Configure tokens for each microservice
for app_name, app_config in MICROSERVICES.items():
if app_name == "IAM":
continue # IAM doesn't need config-token
try:
client = get_httpx_client()
client.base_url = app_config["base_url"]
response = client.post(
app_config["endpoints"]["config_token"].format(**{**globals(),**locals()}),
headers={"Authorization": f"Bearer {master_token}"},
json={
"userId": user_id,
"clientId": app_config["app_id"],
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
},
timeout=20
)
response.raise_for_status()
tokens[app_name] = {
"access_token": response.json()["access_token"],
"refresh_token": response.json()["refresh_token"]
}
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
print(f"Config-token Error for {app_name}: {exc}")
logging.warning(f"Config-token Error for {app_name}: {exc}")
return "Error"
print("\nLogin Success")
return "Success"
def new_token(app):
"""
Refresh access token for a specific microservice.
"""
global tokens
with _token_refresh_lock:
for attempt in range(ERROR_MAX_RETRY):
try:
client = get_httpx_client()
client.base_url = MICROSERVICES[app]["base_url"]
response = client.post(
MICROSERVICES[app]["endpoints"]["refresh"].format(**{**globals(),**locals()}),
headers={"Authorization": f"Bearer {tokens[app]['access_token']}"},
json={"refresh_token": tokens[app]["refresh_token"]},
timeout=20
)
response.raise_for_status()
tokens[app]["access_token"] = response.json()["access_token"]
tokens[app]["refresh_token"] = response.json()["refresh_token"]
return
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}")
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
logging.critical(f"Persistent error in refresh_token for {app}")
raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}")
# ============================================================================
# DECORATORS
# ============================================================================
def api_call_with_retry(app):
"""
Decorator for API calls with automatic retry and token refresh on 401.
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
for attempt in range(ERROR_MAX_RETRY):
try:
return func(*args, **kwargs)
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
logging.warning(f"Error in {func_name} (Attempt {attempt + 1}/{ERROR_MAX_RETRY}): {exc}")
# Auto-refresh token on 401
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token for {app}.")
new_token(app)
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
logging.critical(f"Persistent error in {func_name} after {ERROR_MAX_RETRY} attempts.")
raise httpx.RequestError(message=f"Persistent error in {func_name}")
return wrapper
return decorator
# ============================================================================
# API CALLS
# ============================================================================
@api_call_with_retry("IAM")
def get_roles():
"""
Get all roles from IAM.
"""
client = get_httpx_client()
client.base_url = MICROSERVICES["IAM"]["base_url"]
response = client.post(
MICROSERVICES["IAM"]["endpoints"]["get_roles"],
headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"},
json={"limit": 100, "currentPage": 1, "sort": [], "filters": {}},
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
@api_call_with_retry("IAM")
def get_user_by_id(user_id):
"""
Get user details by ID from IAM.
"""
client = get_httpx_client()
client.base_url = MICROSERVICES["IAM"]["base_url"]
response = client.get(
MICROSERVICES["IAM"]["endpoints"]["get_user_by_id"].format(user_id=user_id, REALME=REALME),
headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"},
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
@api_call_with_retry("HRD")
def get_professional_by_id(model, pro_id):
"""
Get professional details by ID from HRD.
"""
client = get_httpx_client()
client.base_url = MICROSERVICES["HRD"]["base_url"]
response = client.get(
MICROSERVICES["HRD"]["endpoints"]["pro_by_id"].format(model=model, pro_id=pro_id),
headers={"Authorization": f"Bearer {tokens['HRD']['access_token']}"},
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
@api_call_with_retry("IAM")
def get_applications():
"""
Get all applications from IAM.
"""
client = get_httpx_client()
client.base_url = MICROSERVICES["IAM"]["base_url"]
response = client.get(
MICROSERVICES["IAM"]["endpoints"]["get_applications"],
headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"},
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
@api_call_with_retry("IAM")
def get_profiles_by_app_id(app_id):
"""
Get profiles for a specific application ID.
"""
client = get_httpx_client()
client.base_url = MICROSERVICES["IAM"]["base_url"]
# Body payload as per specs
payload = {
"page": None,
"limit": 100,
"search": {},
"clientId": app_id,
"type": "user"
}
response = client.post(
MICROSERVICES["IAM"]["endpoints"]["get_profiles_by_app_id"],
headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"},
json=payload,
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
@api_call_with_retry("IAM")
def get_users_by_profile_id(profile_id):
"""
Get users associated with a specific profile ID.
"""
client = get_httpx_client()
client.base_url = MICROSERVICES["IAM"]["base_url"]
response = client.get(
MICROSERVICES["IAM"]["endpoints"]["get_users_by_profile_id"].format(profile_id=profile_id),
headers={"Authorization": f"Bearer {tokens['IAM']['access_token']}"},
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
@api_call_with_retry("HRD")
def get_pros_by_endobest_center(organization_id):
"""
Get professionals for a specific Endobest center.
"""
client = get_httpx_client()
client.base_url = MICROSERVICES["HRD"]["base_url"]
response = client.get(
MICROSERVICES["HRD"]["endpoints"]["get_pros_by_endobest_center"].format(organization_id=organization_id),
headers={"Authorization": f"Bearer {tokens['HRD']['access_token']}"},
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
# ============================================================================
# WORKER FUNCTIONS
# ============================================================================
def process_user(user_id, output_data, output_lock, pbar_pros, pbar_lock):
"""
Process a single user: fetch details, update output, and trigger pro fetch if needed.
"""
try:
# Fetch user details
user_data = get_user_by_id(user_id)
# Update output with user data
with output_lock:
if user_id in output_data:
output_data[user_id]["user"] = user_data
else:
# Should not happen if initialized correctly, but safe fallback
output_data[user_id] = {"roles": [], "user": user_data, "professional": {}}
# Extract professional info
# Path: professional.data.graph -> model
# Path: hrdProId -> pro_id
model = get_nested_value(user_data, ["professional", "data", "graph"], "modele_fr")
pro_id = get_nested_value(user_data, ["hrdProId"])
if pro_id and pro_id != "$$$$ No Data" and model and model != "$$$$ No Data":
# Submit professional task
subtasks_thread_pool.submit(
process_professional,
user_id, model, pro_id,
output_data, output_lock,
pbar_pros, pbar_lock
)
else:
# No professional data to fetch, update pbar_pros immediately
with pbar_lock:
pbar_pros.update(1)
except Exception as e:
logging.error(f"Error processing user {user_id}: {e}")
# Ensure pbar_pros is updated even on error to avoid hanging
with pbar_lock:
pbar_pros.update(1)
def process_professional(user_id, model, pro_id, output_data, output_lock, pbar_pros, pbar_lock):
"""
Process a professional: fetch details and update output.
"""
try:
pro_data = get_professional_by_id(model, pro_id)
with output_lock:
if user_id in output_data:
output_data[user_id]["professional"] = pro_data
except Exception as e:
logging.error(f"Error processing professional {pro_id} for user {user_id}: {e}")
finally:
with pbar_lock:
pbar_pros.update(1)
# ============================================================================
# MAIN PROCESSING
# ============================================================================
def process_user_list(output_data, context_name, output_filename_suffix=""):
"""
Execute the multithreaded processing for a given dictionary of users.
"""
global main_thread_pool, subtasks_thread_pool
total_users = len(output_data)
if total_users == 0:
console.print(f"[yellow]No users found for {context_name}. Skipping.[/yellow]")
return
console.print(f"[bold]Processing {total_users} users for {context_name}...[/bold]")
# Create progress bars
# Index 0 for users, Index 1 for professionals
# We must ensure pbar_pros is managed correctly
pbar_lock = threading.Lock()
output_lock = threading.Lock()
# Note: We create new bars for each run to avoid state issues
pbar_users = tqdm(total=total_users, unit="users", desc="Users ", position=0, bar_format=custom_bar_format)
pbar_pros = tqdm(total=total_users, unit="pros.", desc="Professionals", position=1, bar_format=custom_bar_format)
futures = []
# Submit main user tasks
for user_id in output_data.keys():
futures.append(
main_thread_pool.submit(
process_user,
user_id,
output_data,
output_lock,
pbar_pros,
pbar_lock
)
)
# Wait for all user tasks
for future in as_completed(futures):
try:
future.result()
pbar_users.update(1)
except Exception as e:
logging.error(f"Task error in {context_name}: {e}")
pbar_users.update(1)
pbar_users.close()
# Move pbar_pros up
with pbar_lock:
pbar_pros.clear()
pbar_pros.pos = 0
pbar_pros.refresh()
subtasks_thread_pool.shutdown(wait=True)
# Re-initialize for next run
# Note: Global variable update
init_subtasks_pool()
pbar_pros.close()
# Sort and Save
console.print(f"Exporting data to {OUTPUT_FILENAME.replace('.json', output_filename_suffix + '.json')}...")
final_output = [{"user_id": k, **v} for k, v in output_data.items()]
final_output.sort(key=lambda x: (
str(x.get("user", {}).get("lastname", "")).lower(),
str(x.get("user", {}).get("firstname", "")).lower(),
str(x.get("user_id", ""))
))
filename = OUTPUT_FILENAME
if output_filename_suffix:
filename = filename.replace(".json", f"{output_filename_suffix}.json")
with open(filename, 'w', encoding='utf-8') as f:
json.dump(final_output, f, indent=4, ensure_ascii=False)
console.print(f"[green]Export complete. {len(final_output)} records saved to {filename}.[/green]")
print()
def process_endobest_centers():
"""
Phase 3: Process Endobest Centers from input JSON file.
Sequential processing (no thread pool).
"""
print()
console.print("==================================================")
console.print("[bold cyan]PHASE 3: Processing Endobest Centers[/bold cyan]")
console.print("==================================================")
# 1. Load Input File
try:
with open(FILENAME_ENDOBEST_CENTERS_INPUT, 'r', encoding='utf-8') as f:
centers_data = json.load(f)
except FileNotFoundError:
console.print(f"[yellow]Input file '{FILENAME_ENDOBEST_CENTERS_INPUT}' not found. Skipping Phase 3.[/yellow]")
return
except json.JSONDecodeError as e:
console.print(f"[red]Error decoding '{FILENAME_ENDOBEST_CENTERS_INPUT}': {e}. Skipping Phase 3.[/red]")
return
# Filter out entries that might not be objects or missing basic data if necessary,
# but spec implies trusting the array. We'll iterate what we have.
if not isinstance(centers_data, list):
console.print(f"[red]Input file content is not a list. Skipping Phase 3.[/red]")
return
total_centers = len(centers_data)
console.print(f"Processing {total_centers} centers...")
# 2. Progress Bar
pbar = tqdm(total=total_centers, unit="centers", desc="Centers ", bar_format=custom_bar_format)
# 3. Iterate & Process
for center in centers_data:
center_id = center.get("id")
if not center_id:
pbar.update(1)
continue
try:
response_json = get_pros_by_endobest_center(center_id)
pros_list = response_json.get("data", [])
if not isinstance(pros_list, list):
pros_list = []
# Sort Pros: nom_exercice, prenom_exercice, id
# Using get_nested_value safely
pros_list.sort(key=lambda x: (
str(get_nested_value(x, ["properties", "nom_exercice"], default="")).lower(),
str(get_nested_value(x, ["properties", "prenom_exercice"], default="")).lower(),
str(get_nested_value(x, ["metadata", "id"], default=""))
))
center["pros"] = pros_list
except Exception as e:
logging.error(f"Error processing center {center_id}: {e}")
finally:
pbar.update(1)
pbar.close()
# 4. Sort Centers
# Center_Name, name, id
centers_data.sort(key=lambda x: (
str(x.get("Center_Name", "")).lower(),
str(x.get("name", "")).lower(),
str(x.get("id", ""))
))
# 5. Save Output
console.print(f"Exporting data to {FILENAME_ENDOBEST_CENTERS_OUTPUT}...")
try:
with open(FILENAME_ENDOBEST_CENTERS_OUTPUT, 'w', encoding='utf-8') as f:
json.dump(centers_data, f, indent=4, ensure_ascii=False)
console.print(f"[green]Export complete. {len(centers_data)} centers saved to {FILENAME_ENDOBEST_CENTERS_OUTPUT}.[/green]")
except Exception as e:
console.print(f"[red]Error saving output: {e}[/red]")
print()
def init_subtasks_pool():
global subtasks_thread_pool, number_of_threads
# Ensure we have a thread count, fallback to 10 if not set (should not happen)
count = globals().get('number_of_threads', 10)
subtasks_thread_pool = ThreadPoolExecutor(max_workers=count)
def main():
"""
Main processing function.
"""
global main_thread_pool, subtasks_thread_pool, number_of_threads
# ========== AUTHENTICATION ==========
print()
login_status = login()
while login_status == "Error":
login_status = login()
if login_status == "Exit":
return
# ========== CONFIGURATION ==========
print()
number_of_threads = int(
questionary.text(
"Number of threads:",
default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS
).ask()
)
# ========== INITIALIZATION ==========
start_time = perf_counter()
# Initialize thread pools
main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads)
init_subtasks_pool()
# ========== PHASE 1: ROLES ==========
print()
console.print("==================================================")
console.print("[bold cyan]PHASE 1: Processing Roles[/bold cyan]")
console.print("==================================================")
console.print("Fetching roles...")
roles_response = get_roles()
roles_data = roles_response.get("data", [])
output_data_roles = {}
console.print("Initializing user list from roles...")
for role in roles_data:
role_info = {"id": role.get("id"), "name": role.get("name")}
users = role.get("users", [])
for user_id in users:
if user_id not in output_data_roles:
output_data_roles[user_id] = {
"roles": [role_info],
"user": {},
"professional": {}
}
else:
existing_role_ids = [r["id"] for r in output_data_roles[user_id]["roles"]]
if role_info["id"] not in existing_role_ids:
output_data_roles[user_id]["roles"].append(role_info)
process_user_list(output_data_roles, "Roles")
# ========== PHASE 2: APPLICATIONS ==========
print()
console.print("==================================================")
console.print("[bold cyan]PHASE 2: Processing Applications[/bold cyan]")
console.print("==================================================")
console.print("Fetching applications...")
apps_response = get_applications()
# apps_response is a list of dicts
for app in apps_response:
app_name = app.get("label") or app.get("name") or "Unknown"
client_id = app.get("clientId")
if not client_id:
continue
print()
console.print(f"[bold magenta]--- Application: {app_name} ---[/bold magenta]")
console.print(f"Fetching profiles for {app_name}...")
profiles_response = get_profiles_by_app_id(client_id)
profiles_data = profiles_response.get("data", [])
output_data_app = {}
console.print(f"Initializing user list for {app_name}...")
for profile in profiles_data:
profile_info = {"id": profile.get("id"), "name": profile.get("name")}
profile_id = profile.get("id")
# Fetch users for this profile
# Note: The API returns a list of user IDs directly?
# Spec check: "get_users_by_profile_id : la réponse est un simple array de user_id"
try:
users_list = get_users_by_profile_id(profile_id)
# Ensure it's a list
if not isinstance(users_list, list):
users_list = []
except Exception as e:
logging.error(f"Error fetching users for profile {profile_id}: {e}")
users_list = []
for user_id in users_list:
if user_id not in output_data_app:
output_data_app[user_id] = {
"profiles": [profile_info],
"user": {},
"professional": {}
}
else:
existing_profile_ids = [p["id"] for p in output_data_app[user_id]["profiles"]]
if profile_info["id"] not in existing_profile_ids:
output_data_app[user_id]["profiles"].append(profile_info)
# Process this application's users
# Sanitize app_name for filename
safe_app_name = "".join([c if c.isalnum() else "_" for c in app_name])
process_user_list(output_data_app, f"Application {app_name}", f"_{safe_app_name}")
# ========== PHASE 3: ENDOBEST CENTERS ==========
process_endobest_centers()
# ========== FINALIZATION ==========
print()
console.print("[bold green]All processing complete.[/bold green]")
print(f"Total Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}")
# ============================================================================
# ENTRY POINT
# ============================================================================
if __name__ == '__main__':
# ========== LOGGING CONFIGURATION ==========
# Auto-generate log filename based on script name
script_name = os.path.splitext(os.path.basename(__file__))[0]
log_file_name = f"{script_name}.log"
logging.basicConfig(
level=LOG_LEVEL,
format=LOG_FORMAT,
filename=log_file_name,
filemode='w'
)
# ========== MAIN EXECUTION ==========
try:
main()
except Exception as e:
logging.critical(f"Script terminated with exception: {e}", exc_info=True)
print(f"\nScript stopped due to error: {e}")
print(traceback.format_exc())
finally:
# ========== CLEANUP ==========
# Shutdown thread pools gracefully
if 'main_thread_pool' in globals() and main_thread_pool:
main_thread_pool.shutdown(wait=False, cancel_futures=True)
if 'subtasks_thread_pool' in globals() and subtasks_thread_pool:
subtasks_thread_pool.shutdown(wait=False, cancel_futures=True)
# Pause before exit (prevents console from closing immediately when launched from Windows Explorer)
print('\n')
input("Press Enter to exit...")