commit 599360ba344d04cd1710c455539954e1f894c072 Author: Abdelkouddous LHACHIMI Date: Sat Dec 13 00:06:47 2025 +0100 Version Initiale diff --git a/README.md b/README.md new file mode 100644 index 0000000..17e4264 --- /dev/null +++ b/README.md @@ -0,0 +1,66 @@ +# Endobest Script Template + +> Template réutilisable pour créer des scripts d'accès aux données de la plateforme Endobest + +## 🚀 Démarrage Rapide + +### 1. Copier le template +```bash +cp eb_script_template.py ../Dashboard/mon_script.py +``` + +### 2. Configurer les microservices +Commenter les services non utilisés dans `MICROSERVICES` : +```python +MICROSERVICES = { + "IAM": {...}, # ✓ Toujours obligatoire + "RC": {...}, # ✓ Décommenter si besoin + # "GDD": {...}, # ✗ Commenter si pas besoin +} +``` + +### 3. Implémenter votre logique +Trouver le bloc `# TODO:` dans `main()` et ajouter votre code. + +### 4. Exécuter +```bash +cd ../Dashboard +python mon_script.py +``` + +--- + +## 📁 Fichiers + +| Fichier | Description | +|---------|-------------| +| **eb_script_template.py** | ⭐ Template principal (621 lignes) | +| **example_usage.py** | Exemple concret d'utilisation | +| **README_TEMPLATE.md** | Guide complet avec exemples | +| **Script_template_spec.md** | Spécification technique détaillée | + +--- + +## ✨ Fonctionnalités + +- ✅ Authentification multi-microservices (IAM, RC, GDD) +- ✅ Pool de clients HTTP thread-safe avec keep-alive +- ✅ Multithreading (pool principal + sous-tâches) +- ✅ Retry automatique + refresh token sur 401 +- ✅ Progress bars (tqdm) +- ✅ Logging auto-configuré +- ✅ Utilitaires JSON avec wildcards +- ✅ Templates d'API (GET/POST) +- ✅ Zéro module externe (tous utilitaires inclus) + +--- + +## 📖 Documentation + +**Démarrage** → [README_TEMPLATE.md](README_TEMPLATE.md) +**Détails techniques** → [Script_template_spec.md](Script_template_spec.md) +**Exemple complet** → [example_usage.py](example_usage.py) + +--- + +**Prêt à coder ! 
🚀** diff --git a/README_TEMPLATE.md b/README_TEMPLATE.md new file mode 100644 index 0000000..c654590 --- /dev/null +++ b/README_TEMPLATE.md @@ -0,0 +1,309 @@ +# Endobest Script Template - Quick Start Guide + +## Overview + +`eb_script_template.py` is a reusable template for creating scripts that access Endobest clinical research platform data. + +## Features + +✅ **Multi-microservice authentication** (IAM, RC, GDD) +✅ **Thread-safe HTTP client pool** with keep-alive +✅ **Multithreading** with configurable main pool + fixed subtasks pool +✅ **Automatic retry** with token refresh on 401 +✅ **Progress bars** using tqdm +✅ **Logging** with auto-generated filename +✅ **JSON utilities** for nested data navigation + +## Quick Start + +### 1. Copy the template + +```bash +cp eb_script_template.py my_new_script.py +``` + +### 2. Configure microservices + +Edit the `MICROSERVICES` dict to enable only the services you need: + +```python +MICROSERVICES = { + # "IAM": {...}, # Always required + "RC": {...}, # Uncomment if needed + # "GDD": {...}, # Comment out if not needed +} +``` + +### 3. Implement your processing logic + +Find the `TODO` block in `main()` and add your code: + +```python +# ========== MAIN PROCESSING BLOCK ========== +# TODO: IMPLEMENT YOUR PROCESSING LOGIC HERE + +# Example: Fetch and process organizations +organizations = get_all_organizations() + +for org in organizations: + # Your processing logic here + process_organization(org) +``` + +### 4. 
Run the script + +```bash +python my_new_script.py +``` + +## Configuration + +### Constants (top of file) + +| Constant | Default | Description | +|----------|---------|-------------| +| `DEFAULT_USER_NAME` | `ziwig-invest2@yopmail.com` | Default login | +| `DEFAULT_PASSWORD` | `pbrrA***` | Default password | +| `MAX_THREADS` | `20` | Maximum threads for main pool | +| `SUBTASKS_POOL_SIZE` | `40` | Fixed size for subtasks pool | +| `ERROR_MAX_RETRY` | `10` | Max retry attempts | +| `WAIT_BEFORE_RETRY` | `0.5` | Delay between retries (seconds) | +| `API_TIMEOUT` | `60` | Default API timeout (seconds) | +| `LOG_LEVEL` | `logging.INFO` | Logging level | + +### Microservices Configuration + +Each microservice has: +- `app_id`: Client ID for token configuration +- `base_url`: API base URL +- `endpoints`: Dict of endpoint paths + +**Available endpoints (RC):** +- `organizations`: Get all organizations +- `statistics`: Get inclusion statistics +- `search_inclusions`: Search inclusions +- `record_by_patient`: Get patient record +- `surveys`: Get questionnaire responses + +**Available endpoints (GDD):** +- `request_by_tube`: Get request by tube ID + +## API Call Patterns + +### GET Request + +```python +@api_call_with_retry("RC") +def get_my_data(): + client = get_httpx_client() + client.base_url = MICROSERVICES["RC"]["base_url"] + response = client.get( + MICROSERVICES["RC"]["endpoints"]["organizations"], + headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() +``` + +### POST Request + +```python +@api_call_with_retry("RC") +def post_my_data(param1, param2): + client = get_httpx_client() + client.base_url = MICROSERVICES["RC"]["base_url"] + response = client.post( + f"{MICROSERVICES['RC']['endpoints']['my_endpoint']}?param={param1}", + headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"}, + json={"key": param2}, + timeout=API_TIMEOUT + ) + 
response.raise_for_status() + return response.json() +``` + +## Utilities + +### get_nested_value() + +Navigate nested JSON structures with wildcard support: + +```python +# Simple navigation +value = get_nested_value(data, ["level1", "level2", "field"]) + +# Array wildcard +values = get_nested_value(data, ["items", "*", "name"]) +# Returns list of all "name" values from items array +``` + +### get_httpx_client() + +Get thread-local HTTP client (automatic keep-alive): + +```python +client = get_httpx_client() +client.base_url = "https://api.example.com" +response = client.get("/endpoint") +``` + +### get_thread_position() + +Get current thread position (for progress bar positioning): + +```python +position = get_thread_position() +# Use with tqdm position parameter for multi-threaded progress bars +``` + +## Multithreading Pattern + +### Simple parallel processing + +```python +items = [...] # Your data + +with tqdm(total=len(items), desc="Processing", bar_format=custom_bar_format) as pbar: + with main_thread_pool as executor: + futures = [executor.submit(process_item, item) for item in items] + + for future in as_completed(futures): + try: + result = future.result() + # Handle result + pbar.update(1) + except Exception as exc: + logging.critical(f"Error: {exc}", exc_info=True) + executor.shutdown(wait=False, cancel_futures=True) + raise +``` + +### Using subtasks pool + +```python +def process_item(item): + # Launch subtask in separate pool + future = subtasks_thread_pool.submit(fetch_details, item) + details = future.result() + return combine(item, details) +``` + +## Logging + +Logs are automatically written to `{script_name}.log`. 
+ +Change log level in constants: + +```python +LOG_LEVEL = logging.DEBUG # For detailed logs +LOG_LEVEL = logging.INFO # Default +LOG_LEVEL = logging.WARNING # Errors only +``` + +## Error Handling + +### Automatic retry + +All API calls decorated with `@api_call_with_retry(app)` automatically: +- Retry on network errors +- Retry on HTTP errors +- Refresh token on 401 Unauthorized +- Respect `ERROR_MAX_RETRY` limit + +### Manual error handling + +```python +try: + result = my_api_call() +except httpx.RequestError as e: + logging.error(f"Request failed: {e}") + # Handle error +``` + +## Best Practices + +### 1. Configure only needed microservices +Comment out unused services to speed up authentication. + +### 2. Use constants for configuration +Avoid hardcoded values - update constants at top of file. + +### 3. Implement processing in main() +Keep your logic in the designated TODO block for clarity. + +### 4. Use progress bars +Help users understand processing status with tqdm. + +### 5. Log errors +Use `logging` module for debugging and audit trail. + +### 6. Test incrementally +Start with simple API call, then add threading, then complex logic. 
+ +## Common Tasks + +### Task 1: Fetch all organizations + +```python +organizations = get_all_organizations() +for org in organizations: + print(f"{org['name']}: {org['id']}") +``` + +### Task 2: Process organizations in parallel + +```python +organizations = get_all_organizations() + +with tqdm(total=len(organizations), desc="Processing orgs") as pbar: + with main_thread_pool as executor: + futures = [executor.submit(process_org, org) for org in organizations] + for future in as_completed(futures): + result = future.result() + pbar.update(1) +``` + +### Task 3: Fetch nested data with subtasks + +```python +def process_organization(org): + org_id = org['id'] + + # Launch subtask to fetch inclusions + future = subtasks_thread_pool.submit(search_inclusions, org_id, 1000, 1) + inclusions = future.result() + + return { + "organization": org, + "inclusions": inclusions + } +``` + +## Troubleshooting + +### Login fails +- Check credentials in constants +- Verify network connectivity +- Check logs for detailed error + +### Token expired during execution +- Automatic refresh should handle this +- Check logs for refresh attempts +- Verify refresh token is valid + +### Script hangs +- Check thread pool shutdown in finally block +- Verify API timeouts are appropriate +- Review logs for deadlocks + +### Performance issues +- Adjust `MAX_THREADS` (more threads ≠ faster) +- Use subtasks pool for nested parallelism +- Profile with logging.DEBUG to find bottlenecks + +## Support + +For detailed technical specifications, see `Script_template_spec.md`. + +For issues with the Endobest platform APIs, contact the technical team. diff --git a/Script_template_spec.md b/Script_template_spec.md new file mode 100644 index 0000000..2ca35a4 --- /dev/null +++ b/Script_template_spec.md @@ -0,0 +1,628 @@ +# Template de script pour automatiser des extractions et/ou des modifications des données de la plateforme de recherche clinique Endobest + +## 1. 
OBJECTIF + +Créer un template constituant un point de départ réutilisable pour tout script d'accès aux données Endobest offrant les services minimum suivants : +- Authentification multi-microservices +- Pool de clients HTTP thread-safe +- Multithreading avec pool principal et pool de sous-tâches +- Gestion des erreurs avec retry automatique +- Logging configuré +- Barres de progression +- Templates d'API calls +- Utilitaires de navigation JSON + +**Contraintes :** +- Tous les utilitaires regroupés dans le même script (pas de modules externes) +- Configuration 100% par constantes en début de script (éviter les littéraux) +- Traitement "main" par défaut avec squelette standard + +--- + +## 2. ARCHITECTURE + +### 2.1 Structure du fichier + +``` +eb_script_template.py +├── IMPORTS +├── CONSTANTES DE CONFIGURATION +│ ├── Credentials +│ ├── Microservices (dictionnaire) +│ ├── Threading +│ ├── Retry & Logging +│ └── Progress bars +├── VARIABLES GLOBALES +├── UTILITAIRES +│ ├── get_nested_value() +│ ├── get_httpx_client() +│ ├── get_thread_position() +├── AUTHENTIFICATION +│ ├── login() +│ ├── new_token(app) +├── DECORATEURS +│ ├── api_call_with_retry(app) +├── API TEMPLATES +│ ├── api_call_template() +├── FONCTION MAIN +└── POINT D'ENTREE (if __name__ == '__main__') +``` + +### 2.2 Dépendances obligatoires + +```python +import json +import logging +import os +import sys +import threading +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import timedelta +from time import perf_counter, sleep +import functools + +import httpx +import questionary +from tqdm import tqdm +from rich.console import Console +``` + +--- + +## 3. 
CONFIGURATION (CONSTANTES) + +### 3.1 Credentials + +```python +DEFAULT_USER_NAME = "ziwig-invest2@yopmail.com" +DEFAULT_PASSWORD = "pbrrA765$bP3beiuyuiyhiuy!agx" +``` + +### 3.2 Microservices (dictionnaire) + +Structure du dictionnaire MICROSERVICES : + +```python +MICROSERVICES = { + "IAM": { + "app_id": None, # IAM n'utilise pas d'app_id + "base_url": "https://api-auth.ziwig-connect.com", + "endpoints": { + "login": "/api/auth/ziwig-pro/login", + "refresh": "/api/auth/refreshToken", + } + }, + "RC": { + "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393", + "base_url": "https://api-hcp.ziwig-connect.com", + "endpoints": { + "config_token": "/api/auth/config-token", + "refresh": "/api/auth/refreshToken", + "organizations": "/api/inclusions/getAllOrganizations", + "statistics": "/api/inclusions/inclusion-statistics", + "search_inclusions": "/api/inclusions/search", + "record_by_patient": "/api/records/byPatient", + "surveys": "/api/surveys/filter/with-answers", + } + }, + "GDD": { + "app_id": None, # À compléter si différent de RC + "base_url": "https://api-lab.ziwig-connect.com", + "endpoints": { + "config_token": "/api/auth/config-token", + "refresh": "/api/auth/refreshToken", + "request_by_tube": "/api/requests/by-tube-id", + } + }, +} +``` + +**Notes :** +- Les microservices non utilisés peuvent être commentés par le développeur +- Chaque microservice définit : libellé (clé), app_id, base_url, endpoints +- Les endpoints sont tous ceux déjà configurés dans eb_dashboard.py + +### 3.3 Threading + +```python +MAX_THREADS = 20 # Limite maximale pour le pool principal +SUBTASKS_POOL_SIZE = 40 # Taille fixe du pool de sous-tâches +``` + +### 3.4 Retry & Timeouts + +```python +ERROR_MAX_RETRY = 10 # Nombre maximum de tentatives +WAIT_BEFORE_RETRY = 0.5 # Délai en secondes (fixe, pas exponentiel) +API_TIMEOUT = 60 # Timeout en secondes (modifiable globalement ou par API) +``` + +### 3.5 Logging + +```python +LOG_LEVEL = logging.INFO # Niveau de log par défaut (modifiable) 
+LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +# LOG_FILE_NAME sera généré automatiquement basé sur le nom du script +``` + +### 3.6 Progress Bars + +```python +BAR_N_FMT_WIDTH = 4 +BAR_TOTAL_FMT_WIDTH = 4 +BAR_TIME_WIDTH = 8 +BAR_RATE_WIDTH = 10 + +custom_bar_format = ("{l_bar}{bar}" + f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " + f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " + f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}") +``` + +--- + +## 4. VARIABLES GLOBALES + +```python +# Tokens par microservice (dictionnaire) +tokens = {} # Structure: {app_name: {"access_token": ..., "refresh_token": ...}} + +# Pool de clients HTTP thread-safe +httpx_clients = {} + +# Gestion des threads +threads_list = [] +_threads_list_lock = threading.Lock() +_token_refresh_lock = threading.Lock() + +# Pools de threads +main_thread_pool = None +subtasks_thread_pool = None + +# Console Rich +console = Console() +``` + +--- + +## 5. UTILITAIRES + +### 5.1 get_nested_value(data_structure, path, default=None) + +Navigation dans structures JSON imbriquées avec support wildcard '*'. + +**Signature :** +```python +def get_nested_value(data_structure, path, default=None): + """ + Extracts a value from nested dict/list structures. + + Args: + data_structure: Nested dict/list to navigate + path: List of keys/indices. Use '*' for list wildcard. + default: Value to return if path not found + + Returns: + Value at path or default + + Examples: + get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1 + get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2] + """ +``` + +**Source :** eb_dashboard_utils.py:71-154 + +### 5.2 get_httpx_client() -> httpx.Client + +Retourne un client HTTP thread-local avec keep-alive. + +**Signature :** +```python +def get_httpx_client() -> httpx.Client: + """ + Get or create thread-local HTTP client with keep-alive enabled. 
+ Each thread gets its own httpx.Client instance to avoid connection conflicts. + """ +``` + +**Source :** eb_dashboard_utils.py:35-49 + +### 5.3 get_thread_position() -> int + +Retourne la position du thread actuel dans la liste (pour barres de progression). + +**Signature :** +```python +def get_thread_position() -> int: + """ + Get the position of the current thread in the threads list. + Used for managing progress bar positions in multithreaded environment. + """ +``` + +**Source :** eb_dashboard_utils.py:52-64 + +--- + +## 6. AUTHENTIFICATION + +### 6.1 Fonction login() + +**Responsabilités :** +1. Demander login/password avec questionary (valeurs par défaut depuis constantes) +2. Se connecter à l'IAM et obtenir le master token +3. Pour chaque microservice (sauf IAM) : appeler config-token et stocker access_token + refresh_token + +**Logique :** +```python +def login(): + """ + Authenticate with IAM and configure tokens for all microservices. + + Returns: + "Success", "Error", or "Exit" + """ + global tokens + + # 1. Demander credentials avec questionary + user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask() + password = questionary.password("password:", default=DEFAULT_PASSWORD).ask() + + if not (user_name and password): + return "Exit" + + # 2. Login IAM -> master_token + user_id + try: + client = get_httpx_client() + client.base_url = MICROSERVICES["IAM"]["base_url"] + response = client.post( + MICROSERVICES["IAM"]["endpoints"]["login"], + json={"username": user_name, "password": password}, + timeout=20 + ) + response.raise_for_status() + master_token = response.json()["access_token"] + user_id = response.json()["userId"] + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + print(f"Login Error: {exc}") + logging.warning(f"Login Error: {exc}") + return "Error" + + # 3. 
Config-token pour chaque microservice + for app_name, app_config in MICROSERVICES.items(): + if app_name == "IAM": + continue # IAM n'a pas besoin de config-token + + try: + client = get_httpx_client() + client.base_url = app_config["base_url"] + response = client.post( + app_config["endpoints"]["config_token"], + headers={"Authorization": f"Bearer {master_token}"}, + json={ + "userId": user_id, + "clientId": app_config["app_id"], + "userAgent": "Mozilla/5.0 ..." + }, + timeout=20 + ) + response.raise_for_status() + tokens[app_name] = { + "access_token": response.json()["access_token"], + "refresh_token": response.json()["refresh_token"] + } + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + print(f"Config-token Error for {app_name}: {exc}") + logging.warning(f"Config-token Error for {app_name}: {exc}") + return "Error" + + print("\nLogin Success") + return "Success" +``` + +**Source :** eb_dashboard.py:210-258 (adapté) + +### 6.2 Fonction new_token(app) + +**Responsabilités :** +Actualiser l'access_token d'un microservice spécifique en utilisant son refresh_token. + +**Signature :** +```python +def new_token(app): + """ + Refresh access token for a specific microservice. 
+ + Args: + app: Microservice name (e.g., "RC", "GDD") + """ + global tokens + + with _token_refresh_lock: + for attempt in range(ERROR_MAX_RETRY): + try: + client = get_httpx_client() + client.base_url = MICROSERVICES[app]["base_url"] + response = client.post( + MICROSERVICES[app]["endpoints"]["refresh"], + headers={"Authorization": f"Bearer {tokens[app]['access_token']}"}, + json={"refresh_token": tokens[app]["refresh_token"]}, + timeout=20 + ) + response.raise_for_status() + tokens[app]["access_token"] = response.json()["access_token"] + tokens[app]["refresh_token"] = response.json()["refresh_token"] + return + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}") + if attempt < ERROR_MAX_RETRY - 1: + sleep(WAIT_BEFORE_RETRY) + + logging.critical(f"Persistent error in refresh_token for {app}") + raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}") +``` + +**Source :** eb_dashboard.py:157-181 (adapté) + +--- + +## 7. DECORATEURS + +### 7.1 api_call_with_retry(app) + +Décorateur pour API calls avec retry automatique et refresh token sur erreur 401. + +**Signature :** +```python +def api_call_with_retry(app): + """ + Decorator for API calls with automatic retry and token refresh on 401 errors. + + Args: + app: Microservice name (e.g., "RC", "GDD") + + Usage: + @api_call_with_retry("RC") + def get_organizations(): + ... + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + func_name = func.__name__ + for attempt in range(ERROR_MAX_RETRY): + try: + return func(*args, **kwargs) + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + logging.warning(f"Error in {func_name} (Attempt {attempt + 1}/{ERROR_MAX_RETRY}): {exc}") + if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: + logging.info(f"Token expired for {func_name}. 
Refreshing token for {app}.") + new_token(app) + if attempt < ERROR_MAX_RETRY - 1: + sleep(WAIT_BEFORE_RETRY) + + logging.critical(f"Persistent error in {func_name} after {ERROR_MAX_RETRY} attempts.") + raise httpx.RequestError(message=f"Persistent error in {func_name}") + + return wrapper + return decorator +``` + +**Source :** eb_dashboard.py:184-203 (adapté) + +--- + +## 8. API TEMPLATES + +### 8.1 api_call_template() + +Template générique pour API call (GET/POST/PUT/PATCH/DELETE au choix du développeur). + +**Exemple :** +```python +@api_call_with_retry("RC") +def get_all_organizations(): + """Example API call: Get all organizations.""" + client = get_httpx_client() + client.base_url = MICROSERVICES["RC"]["base_url"] + response = client.get( + MICROSERVICES["RC"]["endpoints"]["organizations"], + headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"}, + timeout=API_TIMEOUT # Ou timeout spécifique + ) + response.raise_for_status() + return response.json() + + +@api_call_with_retry("RC") +def search_inclusions(organization_id, limit, page): + """Example API call: Search inclusions (POST).""" + client = get_httpx_client() + client.base_url = MICROSERVICES["RC"]["base_url"] + response = client.post( + f"{MICROSERVICES['RC']['endpoints']['search_inclusions']}?limit={limit}&page={page}", + headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"}, + json={"protocolId": "...", "center": organization_id, "keywords": ""}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() +``` + +**Notes :** +- Toujours retourner `response.json()` +- Le développeur choisit la méthode HTTP (GET/POST/PUT/DELETE) +- Timeout configurable globalement (API_TIMEOUT) ou localement + +**Source :** eb_dashboard.py:773-843 (exemples) + +--- + +## 9. 
FONCTION MAIN + +### 9.1 Structure + +```python +def main(): + """Main processing function.""" + global main_thread_pool, subtasks_thread_pool + + # ============================================================================ + # AUTHENTICATION + # ============================================================================ + print() + login_status = login() + while login_status == "Error": + login_status = login() + if login_status == "Exit": + return + + # ============================================================================ + # CONFIGURATION + # ============================================================================ + print() + number_of_threads = int( + questionary.text( + "Number of threads:", + default="12", + validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS + ).ask() + ) + + # ============================================================================ + # INITIALIZATION + # ============================================================================ + start_time = perf_counter() + + # Initialize thread pools + main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads) + subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE) + + # ============================================================================ + # MAIN PROCESSING BLOCK + # ============================================================================ + print() + console.print("[bold cyan]Starting main processing...[/bold cyan]") + + # TODO: Developer implements processing logic here + # Example structure: + # + # with tqdm(total=total_items, desc="Processing", bar_format=custom_bar_format) as pbar: + # with main_thread_pool as executor: + # futures = [executor.submit(process_item, item) for item in items] + # for future in as_completed(futures): + # try: + # result = future.result() + # # Process result + # pbar.update(1) + # except Exception as exc: + # logging.critical(f"Error in worker: {exc}", exc_info=True) + # print(f"\nCRITICAL ERROR: 
{exc}") + # executor.shutdown(wait=False, cancel_futures=True) + # raise + + # ============================================================================ + # FINALIZATION + # ============================================================================ + print() + print(f"Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}") +``` + +**Source :** eb_dashboard.py:1090-1285 (adapté) + +--- + +## 10. POINT D'ENTREE + +### 10.1 Structure avec finally + +```python +if __name__ == '__main__': + # Logging configuration (filename basé sur le nom du script) + script_name = os.path.splitext(os.path.basename(__file__))[0] + log_file_name = f"{script_name}.log" + + logging.basicConfig( + level=LOG_LEVEL, + format=LOG_FORMAT, + filename=log_file_name, + filemode='w' + ) + + try: + main() + except Exception as e: + logging.critical(f"Script terminated with exception: {e}", exc_info=True) + print(f"\nScript stopped due to error: {e}") + finally: + # Shutdown thread pools + if 'main_thread_pool' in globals() and main_thread_pool: + main_thread_pool.shutdown(wait=False, cancel_futures=True) + if 'subtasks_thread_pool' in globals() and subtasks_thread_pool: + subtasks_thread_pool.shutdown(wait=False, cancel_futures=True) + + # Pause before exit (pour que la console ne se ferme pas immédiatement) + print('\n') + input("Press Enter to exit...") +``` + +**Source :** eb_dashboard.py:1287-1299 (adapté) + +--- + +## 11. PLAN D'IMPLEMENTATION + +### Phase 1 : Squelette et infrastructure +1. Créer `eb_script_template.py` +2. Ajouter imports et constantes +3. Implémenter variables globales +4. Implémenter utilitaires (get_nested_value, get_httpx_client, get_thread_position) + +### Phase 2 : Authentification +5. Implémenter login() avec IAM + config-token multi-microservices +6. Implémenter new_token(app) +7. Implémenter décorateur api_call_with_retry(app) + +### Phase 3 : API Templates +8. Créer exemples d'API calls (GET/POST) +9. 
Documenter les patterns + +### Phase 4 : Fonction main et point d'entrée +10. Implémenter main() avec structure complète +11. Implémenter point d'entrée avec finally +12. Configurer logging automatique basé sur nom du script + +### Phase 5 : Documentation et tests +13. Ajouter docstrings détaillés +14. Ajouter commentaires pour guider le développeur +15. Tester le template avec un cas d'usage simple + +--- + +## 12. NOTES TECHNIQUES + +### 12.1 Choix de design + +- **Pas de modules externes** : Tous les utilitaires dans un seul fichier pour faciliter la réutilisation +- **Configuration par constantes** : Facilite la personnalisation sans chercher dans le code +- **Dictionnaire MICROSERVICES** : Architecture extensible pour ajouter de nouveaux services +- **Tokens par app** : Permet la gestion indépendante de chaque microservice +- **Retry fixe** : Délai constant (pas exponentiel) pour simplifier le debugging + +### 12.2 Points d'extension pour le développeur + +Le développeur peut personnaliser : +- Constantes (credentials, timeouts, retry, logging level) +- Microservices utilisés (commenter ceux non nécessaires) +- Endpoints (ajouter selon besoins) +- Méthodes HTTP dans les templates +- Logique de traitement dans main() +- Taille du pool de threads + +### 12.3 Compatibilité + +- Compatible avec eb_dashboard.py (même architecture) +- Réutilise les mêmes patterns éprouvés +- Extensible pour futurs scripts Endobest diff --git a/eb_script_template.bat b/eb_script_template.bat new file mode 100644 index 0000000..eb36bb4 --- /dev/null +++ b/eb_script_template.bat @@ -0,0 +1,4 @@ +@echo off +call C:\PythonProjects\.rcvenv\Scripts\activate.bat +python eb_script_template.py %* + diff --git a/eb_script_template.py b/eb_script_template.py new file mode 100644 index 0000000..05786bb --- /dev/null +++ b/eb_script_template.py @@ -0,0 +1,641 @@ +""" +Endobest Script Template + +Template for creating scripts to access Endobest clinical research platform data. 
+ +FEATURES: +- Multi-microservice authentication (IAM, RC, GDD) +- Thread-safe HTTP client pool +- Multithreading with main pool + subtasks pool +- Automatic retry with token refresh on 401 +- Progress bars and logging +- Utilities for JSON navigation + +HOW TO USE: +1. Configure MICROSERVICES dict (comment unused services) +2. Implement your processing logic in main() function +3. Use API templates as examples for your own endpoints +4. Customize constants as needed (timeouts, threads, etc.) + +QUICK START: +- Run script: python eb_script_template.py +- Login with credentials (defaults provided) +- Choose number of threads +- Processing happens in main() TODO block + +For detailed documentation, see Script_template_spec.md +""" + +import json +import logging +import os +import sys +import threading +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import timedelta +from time import perf_counter, sleep +import functools + +import httpx +import questionary +from tqdm import tqdm +from rich.console import Console + + +# ============================================================================ +# CONFIGURATION - CREDENTIALS +# ============================================================================ + +DEFAULT_USER_NAME = "ziwig-invest2@yopmail.com" +DEFAULT_PASSWORD = "pbrrA765$bP3beiuyuiyhiuy!agx" +REALME = "ziwig-pro" + + +# ============================================================================ +# CONFIGURATION - MICROSERVICES +# ============================================================================ +# Comment out unused microservices to skip their token configuration + +MICROSERVICES = { + "IAM": { + "app_id": None, # IAM doesn't use app_id + "base_url": "https://api-auth.ziwig-connect.com", + "endpoints": { + "login": "/api/auth/{REALME}/login", # POST : Body = {"username": "{user_name}", "password": "{pass}"} + "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": 
"{refresh_token}"} + "get_roles": "/api/profiles/paginate", # POST : Body = {"limit": 100, "currentPage": 1, "sort": [], "filters": {}} + "get_user_by_id": "/api/users/find/{user_id}?domaine={REALME}", # GET + "get_applications": "/api/applications", # GET + "get_profiles_by_app_id": "/api/identity-profiles/paginate", # POST : Body = {"page":null,"limit":100,"search":{},"clientId":"{app_id}","type":"user"} + "get_users_by_profile_id": "/api/identity-profiles/{profile_id}/users", # GET + } + }, + "RC": { + "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393", + "base_url": "https://api-hcp.ziwig-connect.com", + "endpoints": { + "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} + "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} + "organizations": "/api/inclusions/getAllOrganizations", # GET + "statistics": "/api/inclusions/inclusion-statistics", # POST : Body = {"center": "rc_endobest_current_center}}", "protocolId": "{rc_endobest_prot_id}", "excludedCenters": {rc_endobest_excl_centers}} + "search_inclusions": "/api/inclusions/search?limit={limit}&page={page}", # POST : Body = {"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", "center": organization_id, "keywords": ""} + "record_by_patient": "/api/records/byPatient", # POST : Body = {"center": "{rc_endobest_current_center}", "patientId": "{patient_id}", "mode": "exchange", "state": "ongoing", "includeEndoParcour": false, "sourceClient": "pro_prm"}, + "surveys": "/api/surveys/filter/with-answers", #POST : Body = {"context": "clinic_research", "subject": "{patient_id}", "blockedQcmVersions": {blocked_qcm_versions}} + } + }, + "GDD": { + "app_id": "4f5ac063-6a22-4e2c-bda5-b50c0dddab79", + "base_url": "https://api-lab.ziwig-connect.com", + "endpoints": { + "config_token": "/api/auth/config-token", # 
POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} + "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} + "request_by_tube": "/api/requests/by-tube-id/{tube_id}", # GET + } + }, + "HRD": { + "app_id": "93bc44fd-c64b-4fff-a450-f3cba956e934", + "base_url": "https://api-resources.ziwig-connect.com", + "endpoints": { + "config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}} + "refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"} + "pro_by_id": "api/entity-manager/meta/{model}/data/nodes/pro/{pro_id}?relationships=2", # GET + "get_pros_by_endobest_center": "/api/entity-manager/meta/modele_fr/data/orga/{organization_id}/centers/pros?limit=1000", # GET + } + }, +} + + +# ============================================================================ +# CONFIGURATION - THREADING +# ============================================================================ + +MAX_THREADS = 20 # Maximum threads for main pool +SUBTASKS_POOL_SIZE = 40 # Fixed size for subtasks pool + + +# ============================================================================ +# CONFIGURATION - RETRY & TIMEOUTS +# ============================================================================ + +ERROR_MAX_RETRY = 10 # Max retry attempts for API calls +WAIT_BEFORE_RETRY = 0.5 # Delay in seconds between retries (fixed) +API_TIMEOUT = 60 # Default timeout for API calls (seconds) + + +# ============================================================================ +# CONFIGURATION - LOGGING +# ============================================================================ + +LOG_LEVEL = logging.INFO # Change to DEBUG for detailed 
logs +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +# LOG_FILE_NAME auto-generated based on script name in __main__ + + +# ============================================================================ +# CONFIGURATION - PROGRESS BARS +# ============================================================================ + +BAR_N_FMT_WIDTH = 4 +BAR_TOTAL_FMT_WIDTH = 4 +BAR_TIME_WIDTH = 8 +BAR_RATE_WIDTH = 10 + +custom_bar_format = ("{l_bar}{bar}" + f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " + f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " + f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}") + + +# ============================================================================ +# GLOBAL VARIABLES +# ============================================================================ + +# Tokens storage: {app_name: {"access_token": ..., "refresh_token": ...}} +tokens = {} + +# Thread-safe HTTP client pool (one client per thread) +httpx_clients = {} + +# Thread management +threads_list = [] +_threads_list_lock = threading.Lock() +_token_refresh_lock = threading.Lock() + +# Thread pools (initialized in main()) +main_thread_pool = None +subtasks_thread_pool = None + +# Rich console for formatted output +console = Console() + + +# ============================================================================ +# UTILITIES +# ============================================================================ + +def get_nested_value(data_structure, path, default=None): + """ + Extract value from nested dict/list structures with wildcard support. + + Args: + data_structure: Nested dict/list to navigate + path: List of keys/indices. 
Use '*' for list wildcard + default: Value to return if path not found + + Returns: + Value at path, or default if not found + + Examples: + get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1 + get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2] + """ + if data_structure is None: + return "$$$$ No Data" + if not path: + return default + + # Handle wildcard in path + if "*" in path: + wildcard_index = path.index("*") + path_before = path[:wildcard_index] + path_after = path[wildcard_index+1:] + + # Helper for non-wildcard path resolution + def _get_simple_nested_value(ds, p, d): + cl = ds + for k in p: + if isinstance(cl, dict): + cl = cl.get(k) + elif isinstance(cl, list): + try: + if isinstance(k, int) and -len(cl) <= k < len(cl): + cl = cl[k] + else: + return d + except (IndexError, TypeError): + return d + else: + return d + if cl is None: + return d + return cl + + base_level = _get_simple_nested_value(data_structure, path_before, default) + + if not isinstance(base_level, list): + return default + + results = [] + for item in base_level: + value = get_nested_value(item, path_after, default) + if value is not default and value != "$$$$ No Data": + results.append(value) + + # Flatten one level for multiple wildcards + final_results = [] + for res in results: + if isinstance(res, list): + final_results.extend(res) + else: + final_results.append(res) + + return final_results + + # No wildcard - standard traversal + current_level = data_structure + for key_or_index in path: + if isinstance(current_level, dict): + current_level = current_level.get(key_or_index) + if current_level is None: + return default + elif isinstance(current_level, list): + try: + if isinstance(key_or_index, int) and -len(current_level) <= key_or_index < len(current_level): + current_level = current_level[key_or_index] + else: + return default + except (IndexError, TypeError): + return default + else: + return default + return current_level + + +def 
get_httpx_client() -> httpx.Client: + """ + Get or create thread-local HTTP client with keep-alive enabled. + Each thread gets its own client to avoid connection conflicts. + + Returns: + httpx.Client instance for current thread + """ + global httpx_clients + thread_id = threading.get_ident() + if thread_id not in httpx_clients: + httpx_clients[thread_id] = httpx.Client( + headers={"Connection": "keep-alive"}, + limits=httpx.Limits(max_keepalive_connections=20, max_connections=100) + ) + return httpx_clients[thread_id] + + +def get_thread_position(): + """ + Get position of current thread in threads list. + Used for managing progress bar positions in multithreaded environment. + + Returns: + Zero-based index of current thread + """ + global threads_list + thread_id = threading.get_ident() + with _threads_list_lock: + if thread_id not in threads_list: + threads_list.append(thread_id) + return len(threads_list) - 1 + else: + return threads_list.index(thread_id) + + +# ============================================================================ +# AUTHENTICATION +# ============================================================================ + +def login(): + """ + Authenticate with IAM and configure tokens for all microservices. + + Process: + 1. Prompt for credentials (with defaults) + 2. Login to IAM -> get master_token and user_id + 3. For each microservice (except IAM): call config-token API + 4. 
Store access_token and refresh_token for each service + + Returns: + "Success": Authentication succeeded for all services + "Error": Authentication failed (can retry) + "Exit": User cancelled login + """ + global tokens + + # Prompt for credentials + user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask() + password = questionary.password("password:", default=DEFAULT_PASSWORD).ask() + + if not (user_name and password): + return "Exit" + + # Step 1: Login to IAM + try: + client = get_httpx_client() + client.base_url = MICROSERVICES["IAM"]["base_url"] + response = client.post( + MICROSERVICES["IAM"]["endpoints"]["login"].format(**{**globals(),**locals()}), + json={"username": user_name, "password": password}, + timeout=20 + ) + response.raise_for_status() + master_token = response.json()["access_token"] + user_id = response.json()["userId"] + tokens["IAM"] = { + "access_token": master_token, + "refresh_token": response.json()["refresh_token"] + } + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + print(f"Login Error: {exc}") + logging.warning(f"Login Error: {exc}") + return "Error" + + # Step 2: Configure tokens for each microservice + for app_name, app_config in MICROSERVICES.items(): + if app_name == "IAM": + continue # IAM doesn't need config-token + + try: + client = get_httpx_client() + client.base_url = app_config["base_url"] + response = client.post( + app_config["endpoints"]["config_token"].format(**{**globals(),**locals()}), + headers={"Authorization": f"Bearer {master_token}"}, + json={ + "userId": user_id, + "clientId": app_config["app_id"], + "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36" + }, + timeout=20 + ) + response.raise_for_status() + tokens[app_name] = { + "access_token": response.json()["access_token"], + "refresh_token": response.json()["refresh_token"] + } + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + print(f"Config-token 
Error for {app_name}: {exc}") + logging.warning(f"Config-token Error for {app_name}: {exc}") + return "Error" + + print("\nLogin Success") + return "Success" + + +def new_token(app): + """ + Refresh access token for a specific microservice. + + Uses refresh_token to obtain new access_token and refresh_token. + Thread-safe with lock to prevent concurrent refresh attempts. + + Args: + app: Microservice name (e.g., "RC", "GDD") + + Raises: + httpx.RequestError: If refresh fails after all retries + """ + global tokens + + with _token_refresh_lock: + for attempt in range(ERROR_MAX_RETRY): + try: + client = get_httpx_client() + client.base_url = MICROSERVICES[app]["base_url"] + response = client.post( + MICROSERVICES[app]["endpoints"]["refresh"].format(**{**globals(),**locals()}), + headers={"Authorization": f"Bearer {tokens[app]['access_token']}"}, + json={"refresh_token": tokens[app]["refresh_token"]}, + timeout=20 + ) + response.raise_for_status() + tokens[app]["access_token"] = response.json()["access_token"] + tokens[app]["refresh_token"] = response.json()["refresh_token"] + return + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}") + if attempt < ERROR_MAX_RETRY - 1: + sleep(WAIT_BEFORE_RETRY) + + logging.critical(f"Persistent error in refresh_token for {app}") + raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}") + + +# ============================================================================ +# DECORATORS +# ============================================================================ + +def api_call_with_retry(app): + """ + Decorator for API calls with automatic retry and token refresh on 401. 
+ + Features: + - Retries on network errors (httpx.RequestError) + - Retries on HTTP errors (httpx.HTTPStatusError) + - Automatically refreshes token on 401 Unauthorized + - Configurable retry count and delay + + Args: + app: Microservice name for token refresh (e.g., "RC", "GDD") + + Usage: + @api_call_with_retry("RC") + def get_organizations(): + # API call implementation + ... + + Raises: + httpx.RequestError: If all retries exhausted + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + func_name = func.__name__ + for attempt in range(ERROR_MAX_RETRY): + try: + return func(*args, **kwargs) + except (httpx.RequestError, httpx.HTTPStatusError) as exc: + logging.warning(f"Error in {func_name} (Attempt {attempt + 1}/{ERROR_MAX_RETRY}): {exc}") + + # Auto-refresh token on 401 + if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401: + logging.info(f"Token expired for {func_name}. Refreshing token for {app}.") + new_token(app) + + if attempt < ERROR_MAX_RETRY - 1: + sleep(WAIT_BEFORE_RETRY) + + logging.critical(f"Persistent error in {func_name} after {ERROR_MAX_RETRY} attempts.") + raise httpx.RequestError(message=f"Persistent error in {func_name}") + + return wrapper + return decorator + + +# ============================================================================ +# API TEMPLATES +# ============================================================================ +# Templates for common API patterns. Duplicate and modify for your needs. +# Remember to: +# - Choose appropriate HTTP method (GET/POST/PUT/DELETE) +# - Update endpoint from MICROSERVICES dict +# - Adjust timeout if needed (global API_TIMEOUT or specific value) +# - Always return response.json() + +@api_call_with_retry("RC") +def get_all_organizations(): + """ + Example API call using GET method. 
+ + Returns: + List of organization dictionaries + """ + client = get_httpx_client() + client.base_url = MICROSERVICES["RC"]["base_url"].format(**{**globals(),**locals()}) + response = client.get( + MICROSERVICES["RC"]["endpoints"]["organizations"], + headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"}, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() + + +@api_call_with_retry("RC") +def search_inclusions(organization_id, limit, page): + """ + Example API call using POST method with query params and JSON body. + + Args: + organization_id: Organization UUID + limit: Max results per page + page: Page number (1-based) + + Returns: + Dict with "data" key containing list of inclusions + """ + client = get_httpx_client() + client.base_url = MICROSERVICES["RC"]["base_url"].format(**{**globals(),**locals()}) + response = client.post( + f"{MICROSERVICES['RC']['endpoints']['search_inclusions']}?limit={limit}&page={page}", + headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"}, + json={ + "protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", # TODO: Configure if needed + "center": organization_id, + "keywords": "" + }, + timeout=API_TIMEOUT + ) + response.raise_for_status() + return response.json() + + +# ============================================================================ +# MAIN PROCESSING +# ============================================================================ + +def main(): + """ + Main processing function. + + Structure: + 1. Authentication + 2. Configuration (thread count) + 3. Initialization (thread pools, timing) + 4. Main processing block (TODO: implement your logic here) + 5. 
Finalization (elapsed time) + """ + global main_thread_pool, subtasks_thread_pool + + # ========== AUTHENTICATION ========== + print() + login_status = login() + while login_status == "Error": + login_status = login() + if login_status == "Exit": + return + + # ========== CONFIGURATION ========== + print() + number_of_threads = int( + questionary.text( + "Number of threads:", + default="12", + validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS + ).ask() + ) + + # ========== INITIALIZATION ========== + start_time = perf_counter() + + # Initialize thread pools + main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads) + subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE) + + # ========== MAIN PROCESSING BLOCK ========== + print() + console.print("[bold cyan]Starting main processing...[/bold cyan]") + + # TODO: IMPLEMENT YOUR PROCESSING LOGIC HERE + # + # Example pattern with progress bar and multithreading: + # + # items = [...] # Your data to process + # + # with tqdm(total=len(items), desc="Processing items", + # bar_format=custom_bar_format) as pbar: + # with main_thread_pool as executor: + # futures = [executor.submit(process_item, item) for item in items] + # + # for future in as_completed(futures): + # try: + # result = future.result() + # # Process result here + # pbar.update(1) + # except Exception as exc: + # logging.critical(f"Error in worker: {exc}", exc_info=True) + # print(f"\nCRITICAL ERROR: {exc}") + # executor.shutdown(wait=False, cancel_futures=True) + # raise + # + # Example: Simple test to verify authentication works + # organizations = get_all_organizations() + # console.print(f"[green]Retrieved {len(organizations)} organizations[/green]") + + # ========== FINALIZATION ========== + print() + print(f"Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}") + + +# ============================================================================ +# ENTRY POINT +# 
============================================================================ + +if __name__ == '__main__': + # ========== LOGGING CONFIGURATION ========== + # Auto-generate log filename based on script name + script_name = os.path.splitext(os.path.basename(__file__))[0] + log_file_name = f"{script_name}.log" + + logging.basicConfig( + level=LOG_LEVEL, + format=LOG_FORMAT, + filename=log_file_name, + filemode='w' + ) + + # ========== MAIN EXECUTION ========== + try: + main() + except Exception as e: + logging.critical(f"Script terminated with exception: {e}", exc_info=True) + print(f"\nScript stopped due to error: {e}") + print(traceback.format_exc()) + finally: + # ========== CLEANUP ========== + # Shutdown thread pools gracefully + if 'main_thread_pool' in globals() and main_thread_pool: + main_thread_pool.shutdown(wait=False, cancel_futures=True) + if 'subtasks_thread_pool' in globals() and subtasks_thread_pool: + subtasks_thread_pool.shutdown(wait=False, cancel_futures=True) + + # Pause before exit (prevents console from closing immediately when launched from Windows Explorer) + print('\n') + input("Press Enter to exit...") diff --git a/example_usage.py b/example_usage.py new file mode 100644 index 0000000..206297d --- /dev/null +++ b/example_usage.py @@ -0,0 +1,522 @@ +""" +Example Usage of eb_script_template.py + +This file demonstrates how to use the template for a real-world task: +Fetching all organizations and their inclusion counts. + +Copy this pattern to create your own scripts. 
+""" + +import json +import logging +import os +import sys +import threading +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import timedelta +from time import perf_counter, sleep +import functools + +import httpx +import questionary +from tqdm import tqdm +from rich.console import Console + + +# ============================================================================ +# CONFIGURATION - CREDENTIALS +# ============================================================================ + +DEFAULT_USER_NAME = "ziwig-invest2@yopmail.com" +DEFAULT_PASSWORD = "pbrrA765$bP3beiuyuiyhiuy!agx" + + +# ============================================================================ +# CONFIGURATION - MICROSERVICES +# ============================================================================ + +MICROSERVICES = { + "IAM": { + "app_id": None, + "base_url": "https://api-auth.ziwig-connect.com", + "endpoints": { + "login": "/api/auth/ziwig-pro/login", + "refresh": "/api/auth/refreshToken", + } + }, + "RC": { + "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393", + "base_url": "https://api-hcp.ziwig-connect.com", + "endpoints": { + "config_token": "/api/auth/config-token", + "refresh": "/api/auth/refreshToken", + "organizations": "/api/inclusions/getAllOrganizations", + "statistics": "/api/inclusions/inclusion-statistics", + } + }, + # GDD not needed for this example +} + + +# ============================================================================ +# CONFIGURATION - THREADING +# ============================================================================ + +MAX_THREADS = 20 +SUBTASKS_POOL_SIZE = 40 + + +# ============================================================================ +# CONFIGURATION - RETRY & TIMEOUTS +# ============================================================================ + +ERROR_MAX_RETRY = 10 +WAIT_BEFORE_RETRY = 0.5 +API_TIMEOUT = 60 + + +# 
============================================================================ +# CONFIGURATION - LOGGING +# ============================================================================ + +LOG_LEVEL = logging.INFO +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' + + +# ============================================================================ +# CONFIGURATION - PROGRESS BARS +# ============================================================================ + +BAR_N_FMT_WIDTH = 4 +BAR_TOTAL_FMT_WIDTH = 4 +BAR_TIME_WIDTH = 8 +BAR_RATE_WIDTH = 10 + +custom_bar_format = ("{l_bar}{bar}" + f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} " + f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, " + f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}") + + +# ============================================================================ +# GLOBAL VARIABLES +# ============================================================================ + +tokens = {} +httpx_clients = {} +threads_list = [] +_threads_list_lock = threading.Lock() +_token_refresh_lock = threading.Lock() +main_thread_pool = None +subtasks_thread_pool = None +console = Console() + + +# ============================================================================ +# UTILITIES (copied from template) +# ============================================================================ + +def get_nested_value(data_structure, path, default=None): + """Extract value from nested dict/list structures with wildcard support.""" + if data_structure is None: + return "$$$$ No Data" + if not path: + return default + + if "*" in path: + wildcard_index = path.index("*") + path_before = path[:wildcard_index] + path_after = path[wildcard_index+1:] + + def _get_simple_nested_value(ds, p, d): + cl = ds + for k in p: + if isinstance(cl, dict): + cl = cl.get(k) + elif isinstance(cl, list): + try: + if isinstance(k, int) and -len(cl) <= k < len(cl): + cl = cl[k] + else: + return d + except (IndexError, 
TypeError): + return d + else: + return d + if cl is None: + return d + return cl + + base_level = _get_simple_nested_value(data_structure, path_before, default) + + if not isinstance(base_level, list): + return default + + results = [] + for item in base_level: + value = get_nested_value(item, path_after, default) + if value is not default and value != "$$$$ No Data": + results.append(value) + + final_results = [] + for res in results: + if isinstance(res, list): + final_results.extend(res) + else: + final_results.append(res) + + return final_results + + current_level = data_structure + for key_or_index in path: + if isinstance(current_level, dict): + current_level = current_level.get(key_or_index) + if current_level is None: + return default + elif isinstance(current_level, list): + try: + if isinstance(key_or_index, int) and -len(current_level) <= key_or_index < len(current_level): + current_level = current_level[key_or_index] + else: + return default + except (IndexError, TypeError): + return default + else: + return default + return current_level + + +def get_httpx_client() -> httpx.Client: + """Get or create thread-local HTTP client with keep-alive enabled.""" + global httpx_clients + thread_id = threading.get_ident() + if thread_id not in httpx_clients: + httpx_clients[thread_id] = httpx.Client( + headers={"Connection": "keep-alive"}, + limits=httpx.Limits(max_keepalive_connections=20, max_connections=100) + ) + return httpx_clients[thread_id] + + +def get_thread_position(): + """Get position of current thread in threads list.""" + global threads_list + thread_id = threading.get_ident() + with _threads_list_lock: + if thread_id not in threads_list: + threads_list.append(thread_id) + return len(threads_list) - 1 + else: + return threads_list.index(thread_id) + + +# ============================================================================ +# AUTHENTICATION (copied from template) +# ============================================================================ 
def login():
    """
    Authenticate with IAM, then exchange the master token for per-service
    tokens via each microservice's config-token endpoint.

    Returns:
        "Success": all services authenticated (tokens stored in `tokens`)
        "Error":   login or config-token exchange failed (caller may retry)
        "Exit":    user cancelled the credential prompt
    """
    global tokens

    # .ask() returns None when the prompt is cancelled (Ctrl-C / EOF);
    # `not (None and ...)` is True, so cancellation falls through to "Exit".
    user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask()
    password = questionary.password("password:", default=DEFAULT_PASSWORD).ask()

    if not (user_name and password):
        return "Exit"

    # Step 1: IAM login -> master token + user id.
    try:
        client = get_httpx_client()
        client.base_url = MICROSERVICES["IAM"]["base_url"]
        response = client.post(
            MICROSERVICES["IAM"]["endpoints"]["login"],
            json={"username": user_name, "password": password},
            timeout=20
        )
        response.raise_for_status()
        payload = response.json()  # parse once instead of once per field
        master_token = payload["access_token"]
        user_id = payload["userId"]
    except (httpx.RequestError, httpx.HTTPStatusError) as exc:
        print(f"Login Error: {exc}")
        logging.warning("Login Error: %s", exc)
        return "Error"

    # Step 2: exchange the master token with every non-IAM microservice.
    for app_name, app_config in MICROSERVICES.items():
        if app_name == "IAM":
            continue

        try:
            client = get_httpx_client()
            client.base_url = app_config["base_url"]
            response = client.post(
                app_config["endpoints"]["config_token"],
                headers={"Authorization": f"Bearer {master_token}"},
                json={
                    "userId": user_id,
                    "clientId": app_config["app_id"],
                    "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                },
                timeout=20
            )
            response.raise_for_status()
            payload = response.json()
            tokens[app_name] = {
                "access_token": payload["access_token"],
                "refresh_token": payload["refresh_token"]
            }
        except (httpx.RequestError, httpx.HTTPStatusError) as exc:
            print(f"Config-token Error for {app_name}: {exc}")
            logging.warning("Config-token Error for %s: %s", app_name, exc)
            return "Error"

    print("\nLogin Success")
    return "Success"


def new_token(app):
    """
    Refresh the access token for one microservice using its refresh token.

    Thread-safe: a single lock serialises refresh attempts so concurrent
    workers that all hit a 401 do not race each other.

    Args:
        app: Microservice name (e.g. "RC").

    Raises:
        httpx.RequestError: if every retry attempt fails.
    """
    global tokens

    with _token_refresh_lock:
        for attempt in range(ERROR_MAX_RETRY):
            try:
                client = get_httpx_client()
                client.base_url = MICROSERVICES[app]["base_url"]
                response = client.post(
                    MICROSERVICES[app]["endpoints"]["refresh"],
                    headers={"Authorization": f"Bearer {tokens[app]['access_token']}"},
                    json={"refresh_token": tokens[app]["refresh_token"]},
                    timeout=20
                )
                response.raise_for_status()
                payload = response.json()
                tokens[app]["access_token"] = payload["access_token"]
                tokens[app]["refresh_token"] = payload["refresh_token"]
                return
            except (httpx.RequestError, httpx.HTTPStatusError) as exc:
                logging.warning("Refresh Token Error for %s (Attempt %d): %s",
                                app, attempt + 1, exc)
                if attempt < ERROR_MAX_RETRY - 1:
                    sleep(WAIT_BEFORE_RETRY)

    logging.critical(f"Persistent error in refresh_token for {app}")
    raise httpx.RequestError(f"Persistent error in refresh_token for {app}")


# ============================================================================
# DECORATORS (copied from template)
# ============================================================================

def api_call_with_retry(app):
    """
    Decorator: retry an API call on network/HTTP errors, refreshing the
    `app` token first whenever the failure is a 401 Unauthorized.

    Args:
        app: Microservice whose token should be refreshed on 401.

    Raises:
        httpx.RequestError: once ERROR_MAX_RETRY attempts are exhausted.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            func_name = func.__name__
            for attempt in range(ERROR_MAX_RETRY):
                try:
                    return func(*args, **kwargs)
                except (httpx.RequestError, httpx.HTTPStatusError) as exc:
                    logging.warning("Error in %s (Attempt %d/%d): %s",
                                    func_name, attempt + 1, ERROR_MAX_RETRY, exc)

                    # 401 means the token expired: refresh before retrying.
                    if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
                        logging.info("Token expired for %s. Refreshing token for %s.",
                                     func_name, app)
                        new_token(app)

                    if attempt < ERROR_MAX_RETRY - 1:
                        sleep(WAIT_BEFORE_RETRY)

            logging.critical("Persistent error in %s after %d attempts.",
                             func_name, ERROR_MAX_RETRY)
            raise httpx.RequestError(f"Persistent error in {func_name}")

        return wrapper
    return decorator


# ============================================================================
# API CALLS - CUSTOM IMPLEMENTATION
# ============================================================================

@api_call_with_retry("RC")
def get_all_organizations():
    """Fetch all organizations from the RC API (GET)."""
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.get(
        MICROSERVICES["RC"]["endpoints"]["organizations"],
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    return response.json()


@api_call_with_retry("RC")
def get_organization_statistics(organization_id, protocol_id):
    """
    Fetch inclusion statistics for one organization (POST).

    Args:
        organization_id: Organization/center UUID.
        protocol_id: Protocol UUID.

    Returns:
        The "statistic" object of the API response (a dict of counters).
    """
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.post(
        MICROSERVICES["RC"]["endpoints"]["statistics"],
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        json={
            "protocolId": protocol_id,
            "center": organization_id,
            "excludedCenters": []
        },
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    return response.json()["statistic"]


# ============================================================================
# PROCESSING FUNCTIONS - CUSTOM IMPLEMENTATION
# ============================================================================

def process_organization(organization, protocol_id):
    """
    Fetch statistics for one organization and flatten them into a summary row.

    Args:
        organization: Organization dict from get_all_organizations().
        protocol_id: Protocol UUID.

    Returns:
        Dict with id, name and inclusion counters (missing counters -> 0).
    """
    org_id = organization["id"]
    org_name = organization["name"]

    # Run the API call on the subtasks pool so the main pool stays dedicated
    # to per-organization workers.
    stats_future = subtasks_thread_pool.submit(get_organization_statistics, org_id, protocol_id)
    stats = stats_future.result()

    return {
        "id": org_id,
        "name": org_name,
        "total_inclusions": stats.get("totalInclusions", 0),
        "pre_included": stats.get("preIncluded", 0),
        "included": stats.get("included", 0),
        "terminated": stats.get("prematurelyTerminated", 0)
    }


# ============================================================================
# MAIN PROCESSING - CUSTOM IMPLEMENTATION
# ============================================================================

def main():
    """
    Fetch all organizations, collect their statistics in parallel, print a
    top-10 summary and write the full result to organizations_summary.json.
    """
    global main_thread_pool, subtasks_thread_pool

    # ========== AUTHENTICATION ==========
    print()
    login_status = login()
    while login_status == "Error":
        login_status = login()
    if login_status == "Exit":
        return

    # ========== CONFIGURATION ==========
    print()
    answer = questionary.text(
        "Number of threads:",
        default="8",
        validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS
    ).ask()
    # .ask() returns None when the prompt is cancelled; bail out instead of
    # crashing on int(None).
    if answer is None:
        return
    number_of_threads = int(answer)

    # Protocol ID for Endobest
    protocol_id = "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e"

    # ========== INITIALIZATION ==========
    start_time = perf_counter()

    main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads)
    subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE)

    # ========== MAIN PROCESSING ==========
    print()
    console.print("[bold cyan]Fetching organizations...[/bold cyan]")

    organizations = get_all_organizations()
    console.print(f"[green]Found {len(organizations)} organizations[/green]")

    print()
    console.print("[bold cyan]Processing organizations in parallel...[/bold cyan]")

    results = []
    with tqdm(total=len(organizations), desc="Processing organizations",
              bar_format=custom_bar_format) as pbar:
        # `with` shuts the pool down cleanly once all futures are consumed.
        with main_thread_pool as executor:
            futures = [executor.submit(process_organization, org, protocol_id)
                       for org in organizations]

            for future in as_completed(futures):
                try:
                    result = future.result()
                    results.append(result)
                    pbar.update(1)
                except Exception as exc:
                    # Fail fast: cancel pending work and propagate the error.
                    logging.critical(f"Error processing organization: {exc}", exc_info=True)
                    print(f"\nCRITICAL ERROR: {exc}")
                    executor.shutdown(wait=False, cancel_futures=True)
                    raise

    # ========== RESULTS ==========
    print()
    console.print("[bold cyan]Results Summary:[/bold cyan]")

    # Sort by total inclusions (descending)
    results.sort(key=lambda x: x["total_inclusions"], reverse=True)

    # Display top 10
    for i, org in enumerate(results[:10], 1):
        console.print(
            f"{i:2}. {org['name'][:40]:<40} | "
            f"Total: {org['total_inclusions']:3} | "
            f"Pre: {org['pre_included']:3} | "
            f"Inc: {org['included']:3} | "
            f"Term: {org['terminated']:2}"
        )

    # Save to JSON
    output_file = "organizations_summary.json"
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=4, ensure_ascii=False)

    console.print(f"\n[green]✓ Results saved to {output_file}[/green]")

    # ========== FINALIZATION ==========
    print()
    print(f"Elapsed time: {timedelta(seconds=perf_counter() - start_time)}")


# ============================================================================
# ENTRY POINT
# ============================================================================

if __name__ == '__main__':
    # Log file named after the script itself, truncated on each run.
    script_name = os.path.splitext(os.path.basename(__file__))[0]
    log_file_name = f"{script_name}.log"

    logging.basicConfig(
        level=LOG_LEVEL,
        format=LOG_FORMAT,
        filename=log_file_name,
        filemode='w'
    )

    try:
        main()
    except Exception as e:
        logging.critical(f"Script terminated with exception: {e}", exc_info=True)
        print(f"\nScript stopped due to error: {e}")
        print(traceback.format_exc())
    finally:
        # Pools may still be None if main() bailed out before initialization.
        if 'main_thread_pool' in globals() and main_thread_pool:
            main_thread_pool.shutdown(wait=False, cancel_futures=True)
        if 'subtasks_thread_pool' in globals() and subtasks_thread_pool:
            subtasks_thread_pool.shutdown(wait=False, cancel_futures=True)

        # Pause so a double-clicked console window stays open.
        print('\n')
        input("Press Enter to exit...")