# Script template for automating data extraction and/or modification on the Endobest clinical research platform

## 1. OBJECTIVE

Create a template that serves as a reusable starting point for any script accessing Endobest data, providing at least the following services:

- Multi-microservice authentication
- Thread-safe HTTP client pool
- Multithreading with a main pool and a subtask pool
- Error handling with automatic retry
- Preconfigured logging
- Progress bars
- API call templates
- JSON navigation utilities

**Constraints:**

- All utilities are grouped in the same script (no external modules)
- Configuration is done entirely through constants at the top of the script (avoid literals)
- Default "main" processing with a standard skeleton

---

## 2. ARCHITECTURE

### 2.1 File structure

```
eb_script_template.py
├── IMPORTS
├── CONFIGURATION CONSTANTS
│   ├── Credentials
│   ├── Microservices (dictionary)
│   ├── Threading
│   ├── Retry & Timeouts
│   ├── Logging
│   └── Progress bars
├── GLOBAL VARIABLES
├── UTILITIES
│   ├── get_nested_value()
│   ├── get_httpx_client()
│   └── get_thread_position()
├── AUTHENTICATION
│   ├── login()
│   └── new_token(app)
├── DECORATORS
│   └── api_call_with_retry(app)
├── API TEMPLATES
│   └── api_call_template()
├── MAIN FUNCTION
└── ENTRY POINT (if __name__ == '__main__')
```

### 2.2 Required dependencies

```python
# Standard library
import json
import logging
import os
import sys
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta
from time import perf_counter, sleep
import functools

# Third-party
import httpx
import questionary
from tqdm import tqdm
from rich.console import Console
```

---

## 3. CONFIGURATION (CONSTANTS)

### 3.1 Credentials

```python
DEFAULT_USER_NAME = "ziwig-invest2@yopmail.com"
DEFAULT_PASSWORD = "pbrrA765$bP3beiuyuiyhiuy!agx"
```

### 3.2 Microservices (dictionary)

Structure of the MICROSERVICES dictionary:

```python
MICROSERVICES = {
    "IAM": {
        "app_id": None,  # IAM does not use an app_id
        "base_url": "https://api-auth.ziwig-connect.com",
        "endpoints": {
            "login": "/api/auth/ziwig-pro/login",
            "refresh": "/api/auth/refreshToken",
        }
    },
    "RC": {
        "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393",
        "base_url": "https://api-hcp.ziwig-connect.com",
        "endpoints": {
            "config_token": "/api/auth/config-token",
            "refresh": "/api/auth/refreshToken",
            "organizations": "/api/inclusions/getAllOrganizations",
            "statistics": "/api/inclusions/inclusion-statistics",
            "search_inclusions": "/api/inclusions/search",
            "record_by_patient": "/api/records/byPatient",
            "surveys": "/api/surveys/filter/with-answers",
        }
    },
    "GDD": {
        "app_id": None,  # To be filled in if different from RC
        "base_url": "https://api-lab.ziwig-connect.com",
        "endpoints": {
            "config_token": "/api/auth/config-token",
            "refresh": "/api/auth/refreshToken",
            "request_by_tube": "/api/requests/by-tube-id",
        }
    },
}
```

**Notes:**

- The developer can comment out any microservices that are not used
- Each microservice defines: a label (the dictionary key), app_id, base_url, endpoints
- The endpoints listed are all those already configured in eb_dashboard.py
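
Because `login()` sends each microservice's `app_id` as `clientId`, an entry left at `None` may be worth catching before any request is made. The sketch below is one possible sanity check, not part of the original template: the function name `find_incomplete_microservices` and the trimmed `SAMPLE` dictionary are assumptions made for illustration.

```python
def find_incomplete_microservices(microservices, auth_service="IAM"):
    """Return {name: [issues]} for entries that may need attention before login()."""
    problems = {}
    for name, config in microservices.items():
        issues = []
        if not config.get("base_url"):
            issues.append("missing base_url")
        # Only non-IAM services go through the config-token step.
        if name != auth_service:
            if not config.get("app_id"):
                issues.append("missing app_id")
            if "config_token" not in config.get("endpoints", {}):
                issues.append("missing config_token endpoint")
        if issues:
            problems[name] = issues
    return problems


# Trimmed-down copy of the MICROSERVICES layout, for illustration only.
SAMPLE = {
    "IAM": {
        "app_id": None,
        "base_url": "https://api-auth.ziwig-connect.com",
        "endpoints": {"login": "/api/auth/ziwig-pro/login"},
    },
    "RC": {
        "app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393",
        "base_url": "https://api-hcp.ziwig-connect.com",
        "endpoints": {"config_token": "/api/auth/config-token"},
    },
    "GDD": {
        "app_id": None,
        "base_url": "https://api-lab.ziwig-connect.com",
        "endpoints": {"config_token": "/api/auth/config-token"},
    },
}

print(find_incomplete_microservices(SAMPLE))
```

With the dictionary as specified, only GDD would be reported, since its `app_id` is still `None`.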

### 3.3 Threading

```python
MAX_THREADS = 20  # Upper limit for the main pool
SUBTASKS_POOL_SIZE = 40  # Fixed size of the subtask pool
```

### 3.4 Retry & Timeouts

```python
ERROR_MAX_RETRY = 10  # Maximum number of attempts
WAIT_BEFORE_RETRY = 0.5  # Delay in seconds (fixed, not exponential)
API_TIMEOUT = 60  # Timeout in seconds (can be changed globally or per API)
```

### 3.5 Logging

```python
LOG_LEVEL = logging.INFO  # Default log level (can be changed)
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# LOG_FILE_NAME is generated automatically from the script name
```

### 3.6 Progress Bars

```python
BAR_N_FMT_WIDTH = 4
BAR_TOTAL_FMT_WIDTH = 4
BAR_TIME_WIDTH = 8
BAR_RATE_WIDTH = 10

custom_bar_format = (
    "{l_bar}{bar}"
    f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
    f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
    f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}"
)
```
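
The doubled braces in the format string can be hard to read. The short sketch below evaluates it with the widths given above to show the template that tqdm actually receives. The first fragment is a plain string, so its braces stay literal; in the f-string fragments `{{` and `}}` collapse to single braces while the width constants are substituted.

```python
BAR_N_FMT_WIDTH = 4
BAR_TOTAL_FMT_WIDTH = 4
BAR_TIME_WIDTH = 8
BAR_RATE_WIDTH = 10

custom_bar_format = (
    "{l_bar}{bar}"
    f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
    f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
    f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}"
)

print(custom_bar_format)
# {l_bar}{bar} {n_fmt:>4}/{total_fmt:<4} [{elapsed:<8}<{remaining:>8}, {rate_fmt:>10}]{postfix}
```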

---

## 4. GLOBAL VARIABLES

```python
# Tokens per microservice (dictionary)
tokens = {}  # Structure: {app_name: {"access_token": ..., "refresh_token": ...}}

# Thread-safe pool of HTTP clients
httpx_clients = {}

# Thread management
threads_list = []
_threads_list_lock = threading.Lock()
_token_refresh_lock = threading.Lock()

# Thread pools
main_thread_pool = None
subtasks_thread_pool = None

# Rich console
console = Console()
```

---

## 5. UTILITIES

### 5.1 get_nested_value(data_structure, path, default=None)

Navigates nested JSON structures, with support for the `'*'` wildcard.

**Signature:**
```python
def get_nested_value(data_structure, path, default=None):
    """
    Extracts a value from nested dict/list structures.

    Args:
        data_structure: Nested dict/list to navigate
        path: List of keys/indices. Use '*' for list wildcard.
        default: Value to return if path not found

    Returns:
        Value at path or default

    Examples:
        get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1
        get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2]
    """
```

**Source:** eb_dashboard_utils.py:71-154
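
The body of this function is not reproduced here, so the following is only a minimal sketch consistent with the docstring, not the code from eb_dashboard_utils.py. It assumes that `'*'` applies the rest of the path to every element of a list and that any missing key or out-of-range index yields `default`.

```python
def get_nested_value(data_structure, path, default=None):
    """Follow `path` through nested dicts/lists; '*' fans out over a list."""
    current = data_structure
    for position, key in enumerate(path):
        if key == "*":
            # Wildcard: resolve the remaining path on every list element.
            if not isinstance(current, list):
                return default
            remaining = path[position + 1:]
            return [get_nested_value(item, remaining, default) for item in current]
        if isinstance(current, dict):
            if key not in current:
                return default
            current = current[key]
        elif isinstance(current, list) and isinstance(key, int):
            if not -len(current) <= key < len(current):
                return default
            current = current[key]
        else:
            return default
    return current


print(get_nested_value({"a": {"b": 1}}, ["a", "b"]))
print(get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]))
```

The two calls reproduce the docstring examples. In this sketch, a wildcard over elements that lack the requested key produces `default` in the corresponding positions.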

### 5.2 get_httpx_client() -> httpx.Client

Returns a thread-local HTTP client with keep-alive.

**Signature:**
```python
def get_httpx_client() -> httpx.Client:
    """
    Get or create thread-local HTTP client with keep-alive enabled.
    Each thread gets its own httpx.Client instance to avoid connection conflicts.
    """
```

**Source:** eb_dashboard_utils.py:35-49
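
The one-client-per-thread idea can be illustrated without any network access. The sketch below is an assumption about how the `httpx_clients` dictionary might be used (keyed by thread identifier); `FakeClient`, `clients`, `_clients_lock` and `get_client` are hypothetical names, and `FakeClient` merely stands in for `httpx.Client` so the example runs anywhere.

```python
import threading


class FakeClient:
    """Stand-in for httpx.Client so the sketch runs without httpx or a network."""

    def __init__(self):
        self.owner = threading.get_ident()


clients = {}  # plays the role of httpx_clients: {thread_id: client}
_clients_lock = threading.Lock()


def get_client(factory=FakeClient):
    """Return the calling thread's client, creating it on first use."""
    thread_id = threading.get_ident()
    with _clients_lock:
        if thread_id not in clients:
            clients[thread_id] = factory()
        return clients[thread_id]
```

Calling `get_client()` twice from the same thread returns the same object, while another live thread gets its own instance. In the real template the factory would presumably be `httpx.Client`, which pools connections and keeps them alive by default.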

### 5.3 get_thread_position() -> int

Returns the position of the current thread in the threads list (used for progress bars).

**Signature:**
```python
def get_thread_position() -> int:
    """
    Get the position of the current thread in the threads list.
    Used for managing progress bar positions in multithreaded environment.
    """
```

**Source:** eb_dashboard_utils.py:52-64
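
A minimal sketch of this helper, assuming that `threads_list` stores thread identifiers in order of first appearance and that the returned index is meant to be passed as the `position` argument of a tqdm bar. The actual implementation in eb_dashboard_utils.py may differ, for example by applying an offset.

```python
import threading

threads_list = []
_threads_list_lock = threading.Lock()


def get_thread_position():
    """Return a stable index for the calling thread, registering it on first call."""
    thread_id = threading.get_ident()
    with _threads_list_lock:
        if thread_id not in threads_list:
            threads_list.append(thread_id)
        return threads_list.index(thread_id)
```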

---

## 6. AUTHENTICATION

### 6.1 login() function

**Responsibilities:**
1. Prompt for login/password with questionary (default values taken from the constants)
2. Connect to the IAM and obtain the master token
3. For each microservice (except IAM): call config-token and store access_token + refresh_token

**Logic:**
```python
def login():
    """
    Authenticate with IAM and configure tokens for all microservices.

    Returns:
        "Success", "Error", or "Exit"
    """
    global tokens

    # 1. Prompt for credentials with questionary
    user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask()
    password = questionary.password("password:", default=DEFAULT_PASSWORD).ask()

    if not (user_name and password):
        return "Exit"

    # 2. IAM login -> master_token + user_id
    try:
        client = get_httpx_client()
        client.base_url = MICROSERVICES["IAM"]["base_url"]
        response = client.post(
            MICROSERVICES["IAM"]["endpoints"]["login"],
            json={"username": user_name, "password": password},
            timeout=20
        )
        response.raise_for_status()
        master_token = response.json()["access_token"]
        user_id = response.json()["userId"]
    except (httpx.RequestError, httpx.HTTPStatusError) as exc:
        print(f"Login Error: {exc}")
        logging.warning(f"Login Error: {exc}")
        return "Error"

    # 3. Config-token for each microservice
    for app_name, app_config in MICROSERVICES.items():
        if app_name == "IAM":
            continue  # IAM does not need a config-token

        try:
            client = get_httpx_client()
            client.base_url = app_config["base_url"]
            response = client.post(
                app_config["endpoints"]["config_token"],
                headers={"Authorization": f"Bearer {master_token}"},
                json={
                    "userId": user_id,
                    "clientId": app_config["app_id"],
                    "userAgent": "Mozilla/5.0 ..."
                },
                timeout=20
            )
            response.raise_for_status()
            tokens[app_name] = {
                "access_token": response.json()["access_token"],
                "refresh_token": response.json()["refresh_token"]
            }
        except (httpx.RequestError, httpx.HTTPStatusError) as exc:
            print(f"Config-token Error for {app_name}: {exc}")
            logging.warning(f"Config-token Error for {app_name}: {exc}")
            return "Error"

    print("\nLogin Success")
    return "Success"
```

**Source:** eb_dashboard.py:210-258 (adapted)

### 6.2 new_token(app) function

**Responsibilities:**
Refresh the access_token of a specific microservice using its refresh_token.

**Signature:**
```python
def new_token(app):
    """
    Refresh access token for a specific microservice.

    Args:
        app: Microservice name (e.g., "RC", "GDD")
    """
    global tokens

    # The lock serializes refreshes so concurrent threads do not race on the tokens
    with _token_refresh_lock:
        for attempt in range(ERROR_MAX_RETRY):
            try:
                client = get_httpx_client()
                client.base_url = MICROSERVICES[app]["base_url"]
                response = client.post(
                    MICROSERVICES[app]["endpoints"]["refresh"],
                    headers={"Authorization": f"Bearer {tokens[app]['access_token']}"},
                    json={"refresh_token": tokens[app]["refresh_token"]},
                    timeout=20
                )
                response.raise_for_status()
                tokens[app]["access_token"] = response.json()["access_token"]
                tokens[app]["refresh_token"] = response.json()["refresh_token"]
                return
            except (httpx.RequestError, httpx.HTTPStatusError) as exc:
                logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}")
                if attempt < ERROR_MAX_RETRY - 1:
                    sleep(WAIT_BEFORE_RETRY)

    # All attempts failed
    logging.critical(f"Persistent error in refresh_token for {app}")
    raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}")
```

**Source:** eb_dashboard.py:157-181 (adapted)

---

## 7. DECORATORS

### 7.1 api_call_with_retry(app)

Decorator for API calls, with automatic retry and a token refresh on 401 errors.

**Signature:**
```python
def api_call_with_retry(app):
    """
    Decorator for API calls with automatic retry and token refresh on 401 errors.

    Args:
        app: Microservice name (e.g., "RC", "GDD")

    Usage:
        @api_call_with_retry("RC")
        def get_organizations():
            ...
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            func_name = func.__name__
            for attempt in range(ERROR_MAX_RETRY):
                try:
                    return func(*args, **kwargs)
                except (httpx.RequestError, httpx.HTTPStatusError) as exc:
                    logging.warning(f"Error in {func_name} (Attempt {attempt + 1}/{ERROR_MAX_RETRY}): {exc}")
                    if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
                        logging.info(f"Token expired for {func_name}. Refreshing token for {app}.")
                        new_token(app)
                    if attempt < ERROR_MAX_RETRY - 1:
                        sleep(WAIT_BEFORE_RETRY)

            # All attempts failed
            logging.critical(f"Persistent error in {func_name} after {ERROR_MAX_RETRY} attempts.")
            raise httpx.RequestError(message=f"Persistent error in {func_name}")

        return wrapper
    return decorator
```

**Source:** eb_dashboard.py:184-203 (adapted)
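
To see the retry-and-refresh flow in isolation, the sketch below reproduces the same control flow with stand-ins so that it runs without httpx or a server. `FakeHTTPError`, the stubbed `new_token`, the `refreshed` list, `flaky_call` and the reduced retry constants are all assumptions made for the demonstration; the final `RuntimeError` replaces `httpx.RequestError` for the same reason.

```python
import functools
from time import sleep

ERROR_MAX_RETRY = 3        # reduced for the demo
WAIT_BEFORE_RETRY = 0.01   # reduced for the demo


class FakeHTTPError(Exception):
    """Stand-in for httpx.HTTPStatusError carrying only a status code."""

    def __init__(self, status_code):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


refreshed = []  # records which apps had their token refreshed


def new_token(app):
    """Stub: the real function would call the refresh endpoint."""
    refreshed.append(app)


def api_call_with_retry(app):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(ERROR_MAX_RETRY):
                try:
                    return func(*args, **kwargs)
                except FakeHTTPError as exc:
                    if exc.status_code == 401:
                        new_token(app)  # refresh, then retry
                    if attempt < ERROR_MAX_RETRY - 1:
                        sleep(WAIT_BEFORE_RETRY)
            raise RuntimeError(f"Persistent error in {func.__name__}")
        return wrapper
    return decorator


calls = {"count": 0}


@api_call_with_retry("RC")
def flaky_call():
    """Fails with 401 on the first call, succeeds afterwards."""
    calls["count"] += 1
    if calls["count"] == 1:
        raise FakeHTTPError(401)
    return {"ok": True}


result = flaky_call()
print(result, calls["count"], refreshed)
```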

---

## 8. API TEMPLATES

### 8.1 api_call_template()

Generic template for an API call (GET/POST/PUT/PATCH/DELETE, at the developer's choice).

**Example:**
```python
@api_call_with_retry("RC")
def get_all_organizations():
    """Example API call: Get all organizations."""
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.get(
        MICROSERVICES["RC"]["endpoints"]["organizations"],
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        timeout=API_TIMEOUT  # Or a call-specific timeout
    )
    response.raise_for_status()
    return response.json()


@api_call_with_retry("RC")
def search_inclusions(organization_id, limit, page):
    """Example API call: Search inclusions (POST)."""
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.post(
        f"{MICROSERVICES['RC']['endpoints']['search_inclusions']}?limit={limit}&page={page}",
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        json={"protocolId": "...", "center": organization_id, "keywords": ""},
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    return response.json()
```

**Notes:**
- Always return `response.json()`
- The developer chooses the HTTP method (GET/POST/PUT/PATCH/DELETE)
- The timeout can be set globally (API_TIMEOUT) or per call

**Source:** eb_dashboard.py:773-843 (examples)

---

## 9. MAIN FUNCTION

### 9.1 Structure

```python
def main():
    """Main processing function."""
    global main_thread_pool, subtasks_thread_pool

    # ============================================================================
    # AUTHENTICATION
    # ============================================================================
    print()
    login_status = login()
    while login_status == "Error":
        login_status = login()
    if login_status == "Exit":
        return

    # ============================================================================
    # CONFIGURATION
    # ============================================================================
    print()
    answer = questionary.text(
        "Number of threads:",
        default="12",
        validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS
    ).ask()
    if answer is None:  # ask() returns None when the prompt is cancelled
        return
    number_of_threads = int(answer)

    # ============================================================================
    # INITIALIZATION
    # ============================================================================
    start_time = perf_counter()

    # Initialize thread pools
    main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads)
    subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE)

    # ============================================================================
    # MAIN PROCESSING BLOCK
    # ============================================================================
    print()
    console.print("[bold cyan]Starting main processing...[/bold cyan]")

    # TODO: Developer implements processing logic here
    # Example structure:
    #
    # with tqdm(total=total_items, desc="Processing", bar_format=custom_bar_format) as pbar:
    #     with main_thread_pool as executor:
    #         futures = [executor.submit(process_item, item) for item in items]
    #         for future in as_completed(futures):
    #             try:
    #                 result = future.result()
    #                 # Process result
    #                 pbar.update(1)
    #             except Exception as exc:
    #                 logging.critical(f"Error in worker: {exc}", exc_info=True)
    #                 print(f"\nCRITICAL ERROR: {exc}")
    #                 executor.shutdown(wait=False, cancel_futures=True)  # Python 3.9+
    #                 raise

    # ============================================================================
    # FINALIZATION
    # ============================================================================
    print()
    print(f"Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}")
```

**Source:** eb_dashboard.py:1090-1285 (adapted)
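
The skeleton creates a main pool and a separate subtask pool but leaves their interplay to the developer. The sketch below shows one way the two could cooperate, using only the standard library. `fetch_detail`, `process_item` and `run` are hypothetical names, and the work done is a placeholder. One likely reason for keeping the pools separate is that a task waiting on futures submitted to its own pool can deadlock once every worker is busy waiting; sending subtasks to a second pool avoids that.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_detail(item, part):
    """Placeholder subtask; a real script would issue an API call here."""
    return f"{item}-{part}"


def process_item(item, subtasks_pool):
    """Main-pool task that fans out to the subtask pool and waits for the results."""
    futures = [subtasks_pool.submit(fetch_detail, item, part) for part in ("a", "b")]
    return [future.result() for future in futures]


def run(items, number_of_threads=4, subtasks_pool_size=8):
    """Process every item on the main pool; subtasks run on their own pool."""
    results = {}
    with ThreadPoolExecutor(max_workers=subtasks_pool_size) as subtasks_pool, \
            ThreadPoolExecutor(max_workers=number_of_threads) as main_pool:
        futures = {
            main_pool.submit(process_item, item, subtasks_pool): item
            for item in items
        }
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results


print(run([1, 2]))
```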

---

## 10. ENTRY POINT

### 10.1 Structure with finally

```python
if __name__ == '__main__':
    # Logging configuration (file name derived from the script name)
    script_name = os.path.splitext(os.path.basename(__file__))[0]
    log_file_name = f"{script_name}.log"

    logging.basicConfig(
        level=LOG_LEVEL,
        format=LOG_FORMAT,
        filename=log_file_name,
        filemode='w'
    )

    try:
        main()
    except Exception as e:
        logging.critical(f"Script terminated with exception: {e}", exc_info=True)
        print(f"\nScript stopped due to error: {e}")
    finally:
        # Shutdown thread pools (cancel_futures requires Python 3.9+)
        if 'main_thread_pool' in globals() and main_thread_pool:
            main_thread_pool.shutdown(wait=False, cancel_futures=True)
        if 'subtasks_thread_pool' in globals() and subtasks_thread_pool:
            subtasks_thread_pool.shutdown(wait=False, cancel_futures=True)

        # Pause before exit (so the console does not close immediately)
        print('\n')
        input("Press Enter to exit...")
```

**Source:** eb_dashboard.py:1287-1299 (adapted)

---

## 11. IMPLEMENTATION PLAN

### Phase 1: Skeleton and infrastructure
1. Create `eb_script_template.py`
2. Add imports and constants
3. Implement global variables
4. Implement utilities (get_nested_value, get_httpx_client, get_thread_position)

### Phase 2: Authentication
5. Implement login() with IAM + multi-microservice config-token
6. Implement new_token(app)
7. Implement the api_call_with_retry(app) decorator

### Phase 3: API Templates
8. Create example API calls (GET/POST)
9. Document the patterns

### Phase 4: Main function and entry point
10. Implement main() with the complete structure
11. Implement the entry point with finally
12. Configure automatic logging based on the script name

### Phase 5: Documentation and tests
13. Add detailed docstrings
14. Add comments to guide the developer
15. Test the template with a simple use case

---

## 12. TECHNICAL NOTES

### 12.1 Design choices

- **No external modules**: All utilities are in a single file to make reuse easier
- **Configuration through constants**: Makes customization easy without searching through the code
- **MICROSERVICES dictionary**: Extensible architecture for adding new services
- **Tokens per app**: Allows each microservice to be managed independently
- **Fixed retry**: Constant delay (not exponential) to simplify debugging

### 12.2 Extension points for the developer

The developer can customize:
- Constants (credentials, timeouts, retry, logging level)
- Microservices used (comment out those that are not needed)
- Endpoints (add as needed)
- HTTP methods in the templates
- Processing logic in main()
- Thread pool size

### 12.3 Compatibility

- Compatible with eb_dashboard.py (same architecture)
- Reuses the same proven patterns
- Extensible for future Endobest scripts