19 KiB
Template de script pour automatiser des extractions et/ou des modifications des données de la plateforme de recherche clinique Endobest
1. OBJECTIF
Créer un template constituant un point de départ réutilisable pour tout script d'accès aux données Endobest offrant les services minimum suivants :
- Authentification multi-microservices
- Pool de clients HTTP thread-safe
- Multithreading avec pool principal et pool de sous-tâches
- Gestion des erreurs avec retry automatique
- Logging configuré
- Barres de progression
- Templates d'API calls
- Utilitaires de navigation JSON
Contraintes :
- Tous les utilitaires regroupés dans le même script (pas de modules externes)
- Configuration 100% par constantes en début de script (éviter les littéraux)
- Traitement "main" par défaut avec squelette standard
2. ARCHITECTURE
2.1 Structure du fichier
eb_script_template.py
├── IMPORTS
├── CONSTANTES DE CONFIGURATION
│ ├── Credentials
│ ├── Microservices (dictionnaire)
│ ├── Threading
│ ├── Retry & Logging
│ └── Progress bars
├── VARIABLES GLOBALES
├── UTILITAIRES
│ ├── get_nested_value()
│ ├── get_httpx_client()
│ ├── get_thread_position()
├── AUTHENTIFICATION
│ ├── login()
│ ├── new_token(app)
├── DECORATEURS
│ ├── api_call_with_retry(app)
├── API TEMPLATES
│ ├── api_call_template()
├── FONCTION MAIN
└── POINT D'ENTREE (if __name__ == '__main__')
2.2 Dépendances obligatoires
import json
import logging
import os
import sys
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta
from time import perf_counter, sleep
import functools
import httpx
import questionary
from tqdm import tqdm
from rich.console import Console
3. CONFIGURATION (CONSTANTES)
3.1 Credentials
DEFAULT_USER_NAME = "ziwig-invest2@yopmail.com"
DEFAULT_PASSWORD = "pbrrA765$bP3beiuyuiyhiuy!agx"
3.2 Microservices (dictionnaire)
Structure du dictionnaire MICROSERVICES :
MICROSERVICES = {
"IAM": {
"app_id": None, # IAM n'utilise pas d'app_id
"base_url": "https://api-auth.ziwig-connect.com",
"endpoints": {
"login": "/api/auth/ziwig-pro/login",
"refresh": "/api/auth/refreshToken",
}
},
"RC": {
"app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393",
"base_url": "https://api-hcp.ziwig-connect.com",
"endpoints": {
"config_token": "/api/auth/config-token",
"refresh": "/api/auth/refreshToken",
"organizations": "/api/inclusions/getAllOrganizations",
"statistics": "/api/inclusions/inclusion-statistics",
"search_inclusions": "/api/inclusions/search",
"record_by_patient": "/api/records/byPatient",
"surveys": "/api/surveys/filter/with-answers",
}
},
"GDD": {
"app_id": None, # À compléter si différent de RC
"base_url": "https://api-lab.ziwig-connect.com",
"endpoints": {
"config_token": "/api/auth/config-token",
"refresh": "/api/auth/refreshToken",
"request_by_tube": "/api/requests/by-tube-id",
}
},
}
Notes :
- Les microservices non utilisés peuvent être commentés par le développeur
- Chaque microservice définit : libellé (clé), app_id, base_url, endpoints
- Les endpoints sont tous ceux déjà configurés dans eb_dashboard.py
3.3 Threading
MAX_THREADS = 20 # Limite maximale pour le pool principal
SUBTASKS_POOL_SIZE = 40 # Taille fixe du pool de sous-tâches
3.4 Retry & Timeouts
ERROR_MAX_RETRY = 10 # Nombre maximum de tentatives
WAIT_BEFORE_RETRY = 0.5 # Délai en secondes (fixe, pas exponentiel)
API_TIMEOUT = 60 # Timeout en secondes (modifiable globalement ou par API)
3.5 Logging
LOG_LEVEL = logging.INFO # Niveau de log par défaut (modifiable)
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# LOG_FILE_NAME sera généré automatiquement basé sur le nom du script
3.6 Progress Bars
BAR_N_FMT_WIDTH = 4
BAR_TOTAL_FMT_WIDTH = 4
BAR_TIME_WIDTH = 8
BAR_RATE_WIDTH = 10
custom_bar_format = ("{l_bar}{bar}"
f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}")
4. VARIABLES GLOBALES
# Tokens par microservice (dictionnaire)
tokens = {} # Structure: {app_name: {"access_token": ..., "refresh_token": ...}}
# Pool de clients HTTP thread-safe
httpx_clients = {}
# Gestion des threads
threads_list = []
_threads_list_lock = threading.Lock()
_token_refresh_lock = threading.Lock()
# Pools de threads
main_thread_pool = None
subtasks_thread_pool = None
# Console Rich
console = Console()
5. UTILITAIRES
5.1 get_nested_value(data_structure, path, default=None)
Navigation dans structures JSON imbriquées avec support wildcard '*'.
Signature :
def get_nested_value(data_structure, path, default=None):
"""
Extracts a value from nested dict/list structures.
Args:
data_structure: Nested dict/list to navigate
path: List of keys/indices. Use '*' for list wildcard.
default: Value to return if path not found
Returns:
Value at path or default
Examples:
get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1
get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2]
"""
Source : eb_dashboard_utils.py:71-154
5.2 get_httpx_client() -> httpx.Client
Retourne un client HTTP thread-local avec keep-alive.
Signature :
def get_httpx_client() -> httpx.Client:
"""
Get or create thread-local HTTP client with keep-alive enabled.
Each thread gets its own httpx.Client instance to avoid connection conflicts.
"""
Source : eb_dashboard_utils.py:35-49
5.3 get_thread_position() -> int
Retourne la position du thread actuel dans la liste (pour barres de progression).
Signature :
def get_thread_position() -> int:
"""
Get the position of the current thread in the threads list.
Used for managing progress bar positions in multithreaded environment.
"""
Source : eb_dashboard_utils.py:52-64
6. AUTHENTIFICATION
6.1 Fonction login()
Responsabilités :
- Demander login/password avec questionary (valeurs par défaut depuis constantes)
- Se connecter à l'IAM et obtenir le master token
- Pour chaque microservice (sauf IAM) : appeler config-token et stocker access_token + refresh_token
Logique :
def login():
"""
Authenticate with IAM and configure tokens for all microservices.
Returns:
"Success", "Error", or "Exit"
"""
global tokens
# 1. Demander credentials avec questionary
user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask()
password = questionary.password("password:", default=DEFAULT_PASSWORD).ask()
if not (user_name and password):
return "Exit"
# 2. Login IAM -> master_token + user_id
try:
client = get_httpx_client()
client.base_url = MICROSERVICES["IAM"]["base_url"]
response = client.post(
MICROSERVICES["IAM"]["endpoints"]["login"],
json={"username": user_name, "password": password},
timeout=20
)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
print(f"Login Error: {exc}")
logging.warning(f"Login Error: {exc}")
return "Error"
# 3. Config-token pour chaque microservice
for app_name, app_config in MICROSERVICES.items():
if app_name == "IAM":
continue # IAM n'a pas besoin de config-token
try:
client = get_httpx_client()
client.base_url = app_config["base_url"]
response = client.post(
app_config["endpoints"]["config_token"],
headers={"Authorization": f"Bearer {master_token}"},
json={
"userId": user_id,
"clientId": app_config["app_id"],
"userAgent": "Mozilla/5.0 ..."
},
timeout=20
)
response.raise_for_status()
tokens[app_name] = {
"access_token": response.json()["access_token"],
"refresh_token": response.json()["refresh_token"]
}
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
print(f"Config-token Error for {app_name}: {exc}")
logging.warning(f"Config-token Error for {app_name}: {exc}")
return "Error"
print("\nLogin Success")
return "Success"
Source : eb_dashboard.py:210-258 (adapté)
6.2 Fonction new_token(app)
Responsabilités : Actualiser l'access_token d'un microservice spécifique en utilisant son refresh_token.
Signature :
def new_token(app):
"""
Refresh access token for a specific microservice.
Args:
app: Microservice name (e.g., "RC", "GDD")
"""
global tokens
with _token_refresh_lock:
for attempt in range(ERROR_MAX_RETRY):
try:
client = get_httpx_client()
client.base_url = MICROSERVICES[app]["base_url"]
response = client.post(
MICROSERVICES[app]["endpoints"]["refresh"],
headers={"Authorization": f"Bearer {tokens[app]['access_token']}"},
json={"refresh_token": tokens[app]["refresh_token"]},
timeout=20
)
response.raise_for_status()
tokens[app]["access_token"] = response.json()["access_token"]
tokens[app]["refresh_token"] = response.json()["refresh_token"]
return
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}")
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
logging.critical(f"Persistent error in refresh_token for {app}")
raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}")
Source : eb_dashboard.py:157-181 (adapté)
7. DECORATEURS
7.1 api_call_with_retry(app)
Décorateur pour API calls avec retry automatique et refresh token sur erreur 401.
Signature :
def api_call_with_retry(app):
"""
Decorator for API calls with automatic retry and token refresh on 401 errors.
Args:
app: Microservice name (e.g., "RC", "GDD")
Usage:
@api_call_with_retry("RC")
def get_organizations():
...
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
for attempt in range(ERROR_MAX_RETRY):
try:
return func(*args, **kwargs)
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
logging.warning(f"Error in {func_name} (Attempt {attempt + 1}/{ERROR_MAX_RETRY}): {exc}")
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token for {app}.")
new_token(app)
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
logging.critical(f"Persistent error in {func_name} after {ERROR_MAX_RETRY} attempts.")
raise httpx.RequestError(message=f"Persistent error in {func_name}")
return wrapper
return decorator
Source : eb_dashboard.py:184-203 (adapté)
8. API TEMPLATES
8.1 api_call_template()
Template générique pour API call (GET/POST/PUT/PATCH/DELETE au choix du développeur).
Exemple :
@api_call_with_retry("RC")
def get_all_organizations():
"""Example API call: Get all organizations."""
client = get_httpx_client()
client.base_url = MICROSERVICES["RC"]["base_url"]
response = client.get(
MICROSERVICES["RC"]["endpoints"]["organizations"],
headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
timeout=API_TIMEOUT # Ou timeout spécifique
)
response.raise_for_status()
return response.json()
@api_call_with_retry("RC")
def search_inclusions(organization_id, limit, page):
"""Example API call: Search inclusions (POST)."""
client = get_httpx_client()
client.base_url = MICROSERVICES["RC"]["base_url"]
response = client.post(
f"{MICROSERVICES['RC']['endpoints']['search_inclusions']}?limit={limit}&page={page}",
headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
json={"protocolId": "...", "center": organization_id, "keywords": ""},
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
Notes :
- Toujours retourner
response.json() - Le développeur choisit la méthode HTTP (GET/POST/PUT/DELETE)
- Timeout configurable globalement (API_TIMEOUT) ou localement
Source : eb_dashboard.py:773-843 (exemples)
9. FONCTION MAIN
9.1 Structure
def main():
"""Main processing function."""
global main_thread_pool, subtasks_thread_pool
# ============================================================================
# AUTHENTICATION
# ============================================================================
print()
login_status = login()
while login_status == "Error":
login_status = login()
if login_status == "Exit":
return
# ============================================================================
# CONFIGURATION
# ============================================================================
print()
number_of_threads = int(
questionary.text(
"Number of threads:",
default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS
).ask()
)
# ============================================================================
# INITIALIZATION
# ============================================================================
start_time = perf_counter()
# Initialize thread pools
main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads)
subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE)
# ============================================================================
# MAIN PROCESSING BLOCK
# ============================================================================
print()
console.print("[bold cyan]Starting main processing...[/bold cyan]")
# TODO: Developer implements processing logic here
# Example structure:
#
# with tqdm(total=total_items, desc="Processing", bar_format=custom_bar_format) as pbar:
# with main_thread_pool as executor:
# futures = [executor.submit(process_item, item) for item in items]
# for future in as_completed(futures):
# try:
# result = future.result()
# # Process result
# pbar.update(1)
# except Exception as exc:
# logging.critical(f"Error in worker: {exc}", exc_info=True)
# print(f"\nCRITICAL ERROR: {exc}")
# executor.shutdown(wait=False, cancel_futures=True)
# raise
# ============================================================================
# FINALIZATION
# ============================================================================
print()
print(f"Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}")
Source : eb_dashboard.py:1090-1285 (adapté)
10. POINT D'ENTREE
10.1 Structure avec finally
if __name__ == '__main__':
# Logging configuration (filename basé sur le nom du script)
script_name = os.path.splitext(os.path.basename(__file__))[0]
log_file_name = f"{script_name}.log"
logging.basicConfig(
level=LOG_LEVEL,
format=LOG_FORMAT,
filename=log_file_name,
filemode='w'
)
try:
main()
except Exception as e:
logging.critical(f"Script terminated with exception: {e}", exc_info=True)
print(f"\nScript stopped due to error: {e}")
finally:
# Shutdown thread pools
if 'main_thread_pool' in globals() and main_thread_pool:
main_thread_pool.shutdown(wait=False, cancel_futures=True)
if 'subtasks_thread_pool' in globals() and subtasks_thread_pool:
subtasks_thread_pool.shutdown(wait=False, cancel_futures=True)
# Pause before exit (pour que la console ne se ferme pas immédiatement)
print('\n')
input("Press Enter to exit...")
Source : eb_dashboard.py:1287-1299 (adapté)
11. PLAN D'IMPLEMENTATION
Phase 1 : Squelette et infrastructure
- Créer
eb_script_template.py - Ajouter imports et constantes
- Implémenter variables globales
- Implémenter utilitaires (get_nested_value, get_httpx_client, get_thread_position)
Phase 2 : Authentification
- Implémenter login() avec IAM + config-token multi-microservices
- Implémenter new_token(app)
- Implémenter décorateur api_call_with_retry(app)
Phase 3 : API Templates
- Créer exemples d'API calls (GET/POST)
- Documenter les patterns
Phase 4 : Fonction main et point d'entrée
- Implémenter main() avec structure complète
- Implémenter point d'entrée avec finally
- Configurer logging automatique basé sur nom du script
Phase 5 : Documentation et tests
- Ajouter docstrings détaillés
- Ajouter commentaires pour guider le développeur
- Tester le template avec un cas d'usage simple
12. NOTES TECHNIQUES
12.1 Choix de design
- Pas de modules externes : Tous les utilitaires dans un seul fichier pour faciliter la réutilisation
- Configuration par constantes : Facilite la personnalisation sans chercher dans le code
- Dictionnaire MICROSERVICES : Architecture extensible pour ajouter de nouveaux services
- Tokens par app : Permet la gestion indépendante de chaque microservice
- Retry fixe : Délai constant (pas exponentiel) pour simplifier le debugging
12.2 Points d'extension pour le développeur
Le développeur peut personnaliser :
- Constantes (credentials, timeouts, retry, logging level)
- Microservices utilisés (commenter ceux non nécessaires)
- Endpoints (ajouter selon besoins)
- Méthodes HTTP dans les templates
- Logique de traitement dans main()
- Taille du pool de threads
12.3 Compatibilité
- Compatible avec eb_dashboard.py (même architecture)
- Réutilise les mêmes patterns éprouvés
- Extensible pour futurs scripts Endobest