Complete before first tests

2026-02-13 00:19:56 +01:00
parent f513ce798e
commit b1f0bafad9
5 changed files with 948 additions and 701 deletions

.gitignore vendored Normal file

@@ -0,0 +1,25 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
.env
venv/
ENV/
NUL

config/config.xlsx Normal file

Binary file not shown.

eb_script_template.py

@@ -1,699 +0,0 @@
"""
Endobest Script Template
Template for creating scripts to access Endobest clinical research platform data.
FEATURES:
- Multi-microservice authentication (IAM, RC, GDD)
- Thread-safe HTTP client pool
- Multithreading with main pool + subtasks pool
- Automatic retry with token refresh on 401
- Progress bars and logging
- Utilities for JSON navigation
HOW TO USE:
1. Configure MICROSERVICES dict (comment unused services)
2. Implement your processing logic in main() function
3. Use API templates as examples for your own endpoints
4. Customize constants as needed (timeouts, threads, etc.)
QUICK START:
- Run script: python eb_script_template.py
- Login with credentials (defaults provided)
- Choose number of threads
- Processing happens in main() TODO block
For detailed documentation, see Script_template_spec.md
"""
import json
import logging
import os
import sys
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta
from time import perf_counter, sleep
import functools
import httpx
import questionary
from tqdm import tqdm
from rich.console import Console
# ============================================================================
# CONFIGURATION - CREDENTIALS
# ============================================================================
DEFAULT_USER_NAME = "ziwig-invest2@yopmail.com"
DEFAULT_PASSWORD = "pbrrA765$bP3beiuyuiyhiuy!agx"
REALME = "ziwig-pro"
# ============================================================================
# CONFIGURATION - MICROSERVICES
# ============================================================================
# Comment out unused microservices to skip their token configuration
MICROSERVICES = {
"IAM": {
"app_id": None, # IAM doesn't use app_id
"base_url": "https://api-auth.ziwig-connect.com",
"endpoints": {
"login": "/api/auth/{REALME}/login", # POST : Body = {"username": "{user_name}", "password": "{pass}"}
"refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
"get_roles": "/api/profiles/paginate", # POST : Body = {"limit": 100, "currentPage": 1, "sort": [], "filters": {}}
"get_user_by_id": "/api/users/find/{user_id}?domaine={REALME}", # GET
"get_applications": "/api/applications", # GET
"get_profiles_by_app_id": "/api/identity-profiles/paginate", # POST : Body = {"page":null,"limit":100,"search":{},"clientId":"{app_id}","type":"user"}
"get_users_by_profile_id": "/api/identity-profiles/{profile_id}/users", # GET
}
},
"RC": {
"app_id": "602aea51-cdb2-4f73-ac99-fd84050dc393",
"base_url": "https://api-hcp.ziwig-connect.com",
"endpoints": {
"config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}}
"refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
"organizations": "/api/inclusions/getAllOrganizations", # GET
"statistics": "/api/inclusions/inclusion-statistics", # POST : Body = {"center": "rc_endobest_current_center}}", "protocolId": "{rc_endobest_prot_id}", "excludedCenters": {rc_endobest_excl_centers}}
"search_inclusions": "/api/inclusions/search?limit={limit}&page={page}", # POST : Body = {"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", "center": organization_id, "keywords": ""}
"record_by_patient": "/api/records/byPatient", # POST : Body = {"center": "{rc_endobest_current_center}", "patientId": "{patient_id}", "mode": "exchange", "state": "ongoing", "includeEndoParcour": false, "sourceClient": "pro_prm"},
"surveys": "/api/surveys/filter/with-answers", #POST : Body = {"context": "clinic_research", "subject": "{patient_id}", "blockedQcmVersions": {blocked_qcm_versions}}
}
},
"GDD": {
"app_id": "4f5ac063-6a22-4e2c-bda5-b50c0dddab79",
"base_url": "https://api-lab.ziwig-connect.com",
"endpoints": {
"config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}}
"refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
"request_by_tube": "/api/requests/by-tube-id/{tube_id}", # GET
}
},
"HRD": {
"app_id": "93bc44fd-c64b-4fff-a450-f3cba956e934",
"base_url": "https://api-resources.ziwig-connect.com",
"endpoints": {
"config_token": "/api/auth/config-token", # POST : Body = {"userId": "{user_id}", "clientId": "{app_id}", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"}}
"refresh": "/api/auth/refreshToken", # POST : Body = {"refresh_token": "{refresh_token}"}
"pro_by_id": "api/entity-manager/meta/{model}/data/nodes/pro/{pro_id}?relationships=2", # GET
"get_pros_by_endobest_center": "/api/entity-manager/meta/modele_fr/data/orga/{organization_id}/centers/pros?limit=1000", # GET
}
},
}
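# Illustration (not part of the original file): endpoint strings carry {placeholders}
# that str.format fills in before the request is sent (the script does this with
# .format(**{**globals(), **locals()}) so globals like REALME resolve automatically):
#   MICROSERVICES["IAM"]["endpoints"]["login"].format(REALME=REALME)
#   -> "/api/auth/ziwig-pro/login"
#   MICROSERVICES["IAM"]["endpoints"]["get_user_by_id"].format(user_id="1234", REALME=REALME)
#   -> "/api/users/find/1234?domaine=ziwig-pro"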
# ============================================================================
# CONFIGURATION - THREADING
# ============================================================================
MAX_THREADS = 20 # Maximum threads for main pool
SUBTASKS_POOL_SIZE = 40 # Fixed size for subtasks pool
# ============================================================================
# CONFIGURATION - RETRY & TIMEOUTS
# ============================================================================
ERROR_MAX_RETRY = 10 # Max retry attempts for API calls
WAIT_BEFORE_RETRY = 0.5 # Delay in seconds between retries (fixed)
API_TIMEOUT = 60 # Default timeout for API calls (seconds)
MAX_BATCHS_OF_RETRIES = 3 # Max batches of retries for API calls
WAIT_BEFORE_NEW_BATCH_OF_RETRIES = 5 # Delay in seconds between retry batches
# ============================================================================
# CONFIGURATION - LOGGING
# ============================================================================
LOG_LEVEL = logging.INFO # Change to DEBUG for detailed logs
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# LOG_FILE_NAME auto-generated based on script name in __main__
# ============================================================================
# CONFIGURATION - PROGRESS BARS
# ============================================================================
BAR_N_FMT_WIDTH = 4
BAR_TOTAL_FMT_WIDTH = 4
BAR_TIME_WIDTH = 8
BAR_RATE_WIDTH = 10
custom_bar_format = ("{l_bar}{bar}"
f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}")
# ============================================================================
# GLOBAL VARIABLES
# ============================================================================
# Tokens storage: {app_name: {"access_token": ..., "refresh_token": ...}}
tokens = {}
# Thread-safe HTTP client pool (one client per thread)
httpx_clients = {}
# Thread management
threads_list = []
_threads_list_lock = threading.Lock()
_token_refresh_lock = threading.Lock()
# Thread pools (initialized in main())
main_thread_pool = None
subtasks_thread_pool = None
# User interaction lock
_user_interaction_lock = threading.Lock()
# Thread-local storage for context
thread_local_storage = threading.local()
# Rich console for formatted output
console = Console()
# ============================================================================
# UTILITIES
# ============================================================================
def get_nested_value(data_structure, path, default=None):
"""
Extract value from nested dict/list structures with wildcard support.
Args:
data_structure: Nested dict/list to navigate
path: List of keys/indices. Use '*' for list wildcard
default: Value to return if path not found
Returns:
Value at path, or default if not found
Examples:
get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1
get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2]
"""
if data_structure is None:
return "$$$$ No Data"
if not path:
return default
# Handle wildcard in path
if "*" in path:
wildcard_index = path.index("*")
path_before = path[:wildcard_index]
path_after = path[wildcard_index+1:]
# Helper for non-wildcard path resolution
def _get_simple_nested_value(ds, p, d):
cl = ds
for k in p:
if isinstance(cl, dict):
cl = cl.get(k)
elif isinstance(cl, list):
try:
if isinstance(k, int) and -len(cl) <= k < len(cl):
cl = cl[k]
else:
return d
except (IndexError, TypeError):
return d
else:
return d
if cl is None:
return d
return cl
base_level = _get_simple_nested_value(data_structure, path_before, default)
if not isinstance(base_level, list):
return default
results = []
for item in base_level:
value = get_nested_value(item, path_after, default)
if value is not default and value != "$$$$ No Data":
results.append(value)
# Flatten one level for multiple wildcards
final_results = []
for res in results:
if isinstance(res, list):
final_results.extend(res)
else:
final_results.append(res)
return final_results
# No wildcard - standard traversal
current_level = data_structure
for key_or_index in path:
if isinstance(current_level, dict):
current_level = current_level.get(key_or_index)
if current_level is None:
return default
elif isinstance(current_level, list):
try:
if isinstance(key_or_index, int) and -len(current_level) <= key_or_index < len(current_level):
current_level = current_level[key_or_index]
else:
return default
except (IndexError, TypeError):
return default
else:
return default
return current_level
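# Illustration (hypothetical payload, not part of the original file): each '*'
# maps over a list, and nested results are flattened one level:
#   payload = {"centers": [{"pros": [{"id": 1}, {"id": 2}]}, {"pros": [{"id": 3}]}]}
#   get_nested_value(payload, ["centers", "*", "pros", "*", "id"])  ->  [1, 2, 3]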
def get_httpx_client() -> httpx.Client:
"""
Get or create thread-local HTTP client with keep-alive enabled.
Each thread gets its own client to avoid connection conflicts.
Returns:
httpx.Client instance for current thread
"""
global httpx_clients
thread_id = threading.get_ident()
if thread_id not in httpx_clients:
httpx_clients[thread_id] = httpx.Client(
headers={"Connection": "keep-alive"},
limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
)
return httpx_clients[thread_id]
def get_thread_position():
"""
Get position of current thread in threads list.
Used for managing progress bar positions in multithreaded environment.
Returns:
Zero-based index of current thread
"""
global threads_list
thread_id = threading.get_ident()
with _threads_list_lock:
if thread_id not in threads_list:
threads_list.append(thread_id)
return len(threads_list) - 1
else:
return threads_list.index(thread_id)
def clear_httpx_client():
"""
Clear the thread-local HTTP client to force creation of a new one.
Useful for resetting connections after errors.
"""
global httpx_clients
thread_id = threading.get_ident()
if thread_id in httpx_clients:
try:
httpx_clients[thread_id].close()
except Exception:
pass
del httpx_clients[thread_id]
def run_with_context(func, context, *args, **kwargs):
"""
Wrapper to set thread-local context before running a function in a new thread.
Useful for ThreadPoolExecutor where context is lost.
"""
thread_local_storage.current_patient_context = context
return func(*args, **kwargs)
# ============================================================================
# AUTHENTICATION
# ============================================================================
def login():
"""
Authenticate with IAM and configure tokens for all microservices.
Process:
1. Prompt for credentials (with defaults)
2. Login to IAM -> get master_token and user_id
3. For each microservice (except IAM): call config-token API
4. Store access_token and refresh_token for each service
Returns:
"Success": Authentication succeeded for all services
"Error": Authentication failed (can retry)
"Exit": User cancelled login
"""
global tokens
# Prompt for credentials
user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask()
password = questionary.password("password:", default=DEFAULT_PASSWORD).ask()
if not (user_name and password):
return "Exit"
# Step 1: Login to IAM
try:
client = get_httpx_client()
client.base_url = MICROSERVICES["IAM"]["base_url"]
response = client.post(
MICROSERVICES["IAM"]["endpoints"]["login"].format(**{**globals(),**locals()}),
json={"username": user_name, "password": password},
timeout=20
)
response.raise_for_status()
master_token = response.json()["access_token"]
user_id = response.json()["userId"]
tokens["IAM"] = {
"access_token": master_token,
"refresh_token": response.json()["refresh_token"]
}
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
print(f"Login Error: {exc}")
logging.warning(f"Login Error: {exc}")
return "Error"
# Step 2: Configure tokens for each microservice
for app_name, app_config in MICROSERVICES.items():
if app_name == "IAM":
continue # IAM doesn't need config-token
try:
client = get_httpx_client()
client.base_url = app_config["base_url"]
response = client.post(
app_config["endpoints"]["config_token"].format(**{**globals(),**locals()}),
headers={"Authorization": f"Bearer {master_token}"},
json={
"userId": user_id,
"clientId": app_config["app_id"],
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
},
timeout=20
)
response.raise_for_status()
tokens[app_name] = {
"access_token": response.json()["access_token"],
"refresh_token": response.json()["refresh_token"]
}
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
print(f"Config-token Error for {app_name}: {exc}")
logging.warning(f"Config-token Error for {app_name}: {exc}")
return "Error"
print("\nLogin Success")
return "Success"
def new_token(app):
"""
Refresh access token for a specific microservice.
Uses refresh_token to obtain new access_token and refresh_token.
Thread-safe with lock to prevent concurrent refresh attempts.
Args:
app: Microservice name (e.g., "RC", "GDD")
Raises:
httpx.RequestError: If refresh fails after all retries
"""
global tokens
with _token_refresh_lock:
for attempt in range(ERROR_MAX_RETRY):
try:
client = get_httpx_client()
client.base_url = MICROSERVICES[app]["base_url"]
response = client.post(
MICROSERVICES[app]["endpoints"]["refresh"].format(**{**globals(),**locals()}),
headers={"Authorization": f"Bearer {tokens[app]['access_token']}"},
json={"refresh_token": tokens[app]["refresh_token"]},
timeout=20
)
response.raise_for_status()
tokens[app]["access_token"] = response.json()["access_token"]
tokens[app]["refresh_token"] = response.json()["refresh_token"]
return
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
logging.warning(f"Refresh Token Error for {app} (Attempt {attempt + 1}): {exc}")
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
logging.critical(f"Persistent error in refresh_token for {app}")
raise httpx.RequestError(message=f"Persistent error in refresh_token for {app}")
# ============================================================================
# DECORATORS
# ============================================================================
def api_call_with_retry(app):
"""Decorator factory for API calls with automatic retry, and token refresh for `app` on 401 errors. Used as @api_call_with_retry("RC")."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
total_attempts = 0
batch_count = 1
while True:
for attempt in range(ERROR_MAX_RETRY):
total_attempts += 1
try:
return func(*args, **kwargs)
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
logging.warning(f"Error in {func_name} (Attempt {total_attempts}): {exc}")
# Refresh the thread-local client if an error occurs
# to avoid potential pool corruption or stale connections
clear_httpx_client()
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info(f"Token expired for {func_name}. Refreshing token.")
new_token(app)
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
else:
# Max retries reached for this batch
if batch_count < MAX_BATCHS_OF_RETRIES:
logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name}. "
f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.")
batch_count += 1
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
break # Exit for loop to restart batch in while True
else:
# All automatic batches exhausted, ask the user
with _user_interaction_lock:
console.print(f"\n[bold red]Persistent error in {func_name} after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]")
choice = questionary.select(
f"What would you like to do for {func_name}?",
choices=[
"Retry (try another batch of retries)",
"Ignore (return None and continue)",
"Stop script (critical error)"
]
).ask()
if choice == "Retry (try another batch of retries)":
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
batch_count = 1 # Reset batch counter for the next interactive round
break # Exit for loop to restart batch in while True
elif choice == "Ignore (return None and continue)":
# Retrieve context if available
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['pseudo']}). Error: {exc}")
return None
else:
logging.critical(f"User chose to stop script after persistent error in {func_name}.")
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
return wrapper
return decorator
# ============================================================================
# API TEMPLATES
# ============================================================================
# Templates for common API patterns. Duplicate and modify for your needs.
# Remember to:
# - Choose appropriate HTTP method (GET/POST/PUT/DELETE)
# - Update endpoint from MICROSERVICES dict
# - Adjust timeout if needed (global API_TIMEOUT or specific value)
# - Always return response.json()
@api_call_with_retry("RC")
def get_all_organizations():
"""
Example API call using GET method.
Returns:
List of organization dictionaries
"""
client = get_httpx_client()
client.base_url = MICROSERVICES["RC"]["base_url"].format(**{**globals(),**locals()})
response = client.get(
MICROSERVICES["RC"]["endpoints"]["organizations"],
headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
@api_call_with_retry("RC")
def search_inclusions(organization_id, limit, page):
"""
Example API call using POST method with query params and JSON body.
Args:
organization_id: Organization UUID
limit: Max results per page
page: Page number (1-based)
Returns:
Dict with "data" key containing list of inclusions
"""
client = get_httpx_client()
client.base_url = MICROSERVICES["RC"]["base_url"].format(**{**globals(),**locals()})
response = client.post(
f"{MICROSERVICES['RC']['endpoints']['search_inclusions']}?limit={limit}&page={page}",
headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
json={
"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e", # TODO: Configure if needed
"center": organization_id,
"keywords": ""
},
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
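# Sketch (not in the original file): a third template showing how to duplicate the
# pattern above for the "record_by_patient" endpoint declared in MICROSERVICES["RC"].
# The body fields follow the comment on that endpoint; adjust them to your needs.
@api_call_with_retry("RC")
def get_record_by_patient(center, patient_id):
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.post(
        MICROSERVICES["RC"]["endpoints"]["record_by_patient"],
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        json={"center": center, "patientId": patient_id, "mode": "exchange",
              "state": "ongoing", "includeEndoParcour": False, "sourceClient": "pro_prm"},
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    return response.json()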
# ============================================================================
# MAIN PROCESSING
# ============================================================================
def main():
"""
Main processing function.
Structure:
1. Authentication
2. Configuration (thread count)
3. Initialization (thread pools, timing)
4. Main processing block (TODO: implement your logic here)
5. Finalization (elapsed time)
"""
global main_thread_pool, subtasks_thread_pool, thread_local_storage
# ========== AUTHENTICATION ==========
print()
login_status = login()
while login_status == "Error":
login_status = login()
if login_status == "Exit":
return
# ========== CONFIGURATION ==========
print()
number_of_threads = int(
questionary.text(
"Number of threads:",
default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS
).ask()
)
# ========== INITIALIZATION ==========
start_time = perf_counter()
# Initialize thread pools
main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads)
subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE)
# ========== MAIN PROCESSING BLOCK ==========
print()
console.print("[bold cyan]Starting main processing...[/bold cyan]")
# TODO: IMPLEMENT YOUR PROCESSING LOGIC HERE
#
# Example pattern with progress bar and multithreading:
#
# items = [...] # Your data to process
# futures = []
#
# with tqdm(total=len(items), desc="Processing items",
# bar_format=custom_bar_format) as pbar:
# with main_thread_pool as executor:
#
# for item in items:
#
# # Set thread-local context for detailed error logging in decorators
# ctx = {"id": patient_id, "pseudo": pseudo}
# thread_local_storage.current_patient_context = ctx
#
# futures.append(executor.submit(run_with_context, process_item, ctx, item))
#
# for future in as_completed(futures):
# try:
# result = future.result()
# # Process result here
# pbar.update(1)
# except Exception as exc:
# logging.critical(f"Error in worker: {exc}", exc_info=True)
# print(f"\nCRITICAL ERROR: {exc}")
# executor.shutdown(wait=False, cancel_futures=True)
# raise
#
# Example: Simple test to verify authentication works
# organizations = get_all_organizations()
# console.print(f"[green]Retrieved {len(organizations)} organizations[/green]")
# ========== FINALIZATION ==========
print()
print(f"Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}")
# ============================================================================
# ENTRY POINT
# ============================================================================
if __name__ == '__main__':
# ========== LOGGING CONFIGURATION ==========
# Auto-generate log filename based on script name
script_name = os.path.splitext(os.path.basename(__file__))[0]
log_file_name = f"{script_name}.log"
logging.basicConfig(
level=LOG_LEVEL,
format=LOG_FORMAT,
filename=log_file_name,
filemode='w'
)
# ========== MAIN EXECUTION ==========
try:
main()
except Exception as e:
logging.critical(f"Script terminated with exception: {e}", exc_info=True)
print(f"\nScript stopped due to error: {e}")
print(traceback.format_exc())
finally:
# ========== CLEANUP ==========
# Shutdown thread pools gracefully
if 'main_thread_pool' in globals() and main_thread_pool:
main_thread_pool.shutdown(wait=False, cancel_futures=True)
if 'subtasks_thread_pool' in globals() and subtasks_thread_pool:
subtasks_thread_pool.shutdown(wait=False, cancel_futures=True)
# Pause before exit (prevents console from closing immediately when launched from Windows Explorer)
print('\n')
input("Press Enter to exit...")


@@ -1,4 +1,3 @@
@echo off
call C:\PythonProjects\.rcvenv\Scripts\activate.bat
-python eb_script_template.py %*
+python extract_endoconnect_medical_records.py %*

extract_endoconnect_medical_records.py Normal file

@@ -0,0 +1,922 @@
"""
Extract Endoconnect Medical Records
Automated extraction of patient medical records from the Endoconnect platform.
FEATURES:
- Single-service authentication (Endoconnect)
- Thread-safe HTTP client pool
- Multithreading with progress bars
- Automatic retry on API errors
- Criteria/values configuration from Excel
- JSON export
QUICK START:
- Run script: python extract_endoconnect_medical_records.py
- Login with credentials
- Confirm/edit professional ID and thread count
- Wait for processing to complete
- Output: JSON file in current directory
"""
import json
import logging
import os
import sys
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from time import perf_counter, sleep
import functools
import httpx
import openpyxl
import questionary
from tqdm import tqdm
from rich.console import Console
# ============================================================================
# CONFIGURATION - CREDENTIALS
# ============================================================================
DEFAULT_USER_NAME = "abdel.lhachimi@gmail.com"
DEFAULT_PASSWORD = "GU$y#C#Cv73XFKyT3j6^"
# ============================================================================
# CONFIGURATION - ENDOCONNECT API
# ============================================================================
ENDOCONNECT_BASE_URL = "https://api-endo.ziwig.com/api/"
LOGIN_PATH = "auth/login"
PATIENTS_LIST_PATH = "patients/list"
MEDICAL_RECORD_PATH = "records"
MEDICAL_EVENTS_PATH = "events"
# ============================================================================
# CONFIGURATION - PROFESSIONAL
# ============================================================================
DEFAULT_PROFESSIONAL_ID = "99990000005"
# ============================================================================
# CONFIGURATION - PAGINATION
# ============================================================================
MAX_PAGE_SIZE = 1000
# ============================================================================
# CONFIGURATION - THREADING
# ============================================================================
MAX_THREADS = 20
# ============================================================================
# CONFIGURATION - EXCEL CONFIG
# ============================================================================
CONFIG_DIR = "config"
CONFIG_WORKBOOK_NAME = "config.xlsx"
CONFIG_CRITERIA_SHEET_NAME = "Criteria"
CONFIG_VALUES_SHEET_NAME = "Criteria_values"
# Column names - criteria sheet
COL_CRITERIA_ID = "criteria_id"
COL_CRITERIA_LABEL = "criteria_name"
COL_CRITERIA_TYPE = "criteria_type"
COL_CRITERIA_LEVEL1_LABEL = "domaine_name"
COL_CRITERIA_LEVEL2_LABEL = "subdomaine_name"
COL_CRITERIA_ORDER = "criteria_order"
# Column names - values sheet
COL_VALUE_CRITERIA_ID = "criteria_id"
COL_VALUE_ID = "criteria_value_id"
COL_VALUE_LABEL = "criteria_value"
# ============================================================================
# CONFIGURATION - OUTPUT
# ============================================================================
OUTPUT_FILE_NAME = "endoconnect_medical_records"
# ============================================================================
# CONFIGURATION - RETRY & TIMEOUTS
# ============================================================================
ERROR_MAX_RETRY = 10
WAIT_BEFORE_RETRY = 1
API_TIMEOUT = 600
MAX_BATCHS_OF_RETRIES = 3
WAIT_BEFORE_NEW_BATCH_OF_RETRIES = 20
# ============================================================================
# CONFIGURATION - LOGGING
# ============================================================================
LOG_LEVEL = logging.INFO
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# ============================================================================
# CONFIGURATION - PROGRESS BARS
# ============================================================================
BAR_N_FMT_WIDTH = 4
BAR_TOTAL_FMT_WIDTH = 4
BAR_TIME_WIDTH = 8
BAR_RATE_WIDTH = 10
custom_bar_format = ("{l_bar}{bar}"
f" {{n_fmt:>{BAR_N_FMT_WIDTH}}}/{{total_fmt:<{BAR_TOTAL_FMT_WIDTH}}} "
f"[{{elapsed:<{BAR_TIME_WIDTH}}}<{{remaining:>{BAR_TIME_WIDTH}}}, "
f"{{rate_fmt:>{BAR_RATE_WIDTH}}}]{{postfix}}")
# ============================================================================
# GLOBAL VARIABLES
# ============================================================================
token = None
# Thread-safe HTTP client pool (one client per thread)
httpx_clients = {}
# Thread management
threads_list = []
_threads_list_lock = threading.Lock()
# Thread pool (initialized in main())
main_thread_pool = None
# User interaction lock
_user_interaction_lock = threading.Lock()
# Thread-local storage for context
thread_local_storage = threading.local()
# Rich console for formatted output
console = Console()
# Criteria and values configuration (loaded from Excel)
criteria_config = {}
values_config = {}
# ============================================================================
# UTILITIES
# ============================================================================
def get_nested_value(data_structure, path, default=None):
"""
Extract value from nested dict/list structures with wildcard support.
Args:
data_structure: Nested dict/list to navigate
path: List of keys/indices. Use '*' for list wildcard
default: Value to return if path not found
Returns:
Value at path, or default if not found
Examples:
get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1
get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2]
"""
if data_structure is None:
return "$$$$ No Data"
if not path:
return default
# Handle wildcard in path
if "*" in path:
wildcard_index = path.index("*")
path_before = path[:wildcard_index]
path_after = path[wildcard_index+1:]
# Helper for non-wildcard path resolution
def _get_simple_nested_value(ds, p, d):
cl = ds
for k in p:
if isinstance(cl, dict):
cl = cl.get(k)
elif isinstance(cl, list):
try:
if isinstance(k, int) and -len(cl) <= k < len(cl):
cl = cl[k]
else:
return d
except (IndexError, TypeError):
return d
else:
return d
if cl is None:
return d
return cl
base_level = _get_simple_nested_value(data_structure, path_before, default)
if not isinstance(base_level, list):
return default
results = []
for item in base_level:
value = get_nested_value(item, path_after, default)
if value is not default and value != "$$$$ No Data":
results.append(value)
# Flatten one level for multiple wildcards
final_results = []
for res in results:
if isinstance(res, list):
final_results.extend(res)
else:
final_results.append(res)
return final_results
# No wildcard - standard traversal
current_level = data_structure
for key_or_index in path:
if isinstance(current_level, dict):
current_level = current_level.get(key_or_index)
if current_level is None:
return default
elif isinstance(current_level, list):
try:
if isinstance(key_or_index, int) and -len(current_level) <= key_or_index < len(current_level):
current_level = current_level[key_or_index]
else:
return default
except (IndexError, TypeError):
return default
else:
return default
return current_level
def get_httpx_client() -> httpx.Client:
"""
Get or create thread-local HTTP client with keep-alive enabled.
Each thread gets its own client to avoid connection conflicts.
Returns:
httpx.Client instance for current thread
"""
global httpx_clients
thread_id = threading.get_ident()
if thread_id not in httpx_clients:
httpx_clients[thread_id] = httpx.Client(
headers={"Connection": "keep-alive"},
limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
)
return httpx_clients[thread_id]
def get_thread_position():
"""
Get position of current thread in threads list.
Used for managing progress bar positions in multithreaded environment.
Returns:
Zero-based index of current thread
"""
global threads_list
thread_id = threading.get_ident()
with _threads_list_lock:
if thread_id not in threads_list:
threads_list.append(thread_id)
return len(threads_list) - 1
else:
return threads_list.index(thread_id)
def clear_httpx_client():
"""
Clear the thread-local HTTP client to force creation of a new one.
Useful for resetting connections after errors.
"""
global httpx_clients
thread_id = threading.get_ident()
if thread_id in httpx_clients:
try:
httpx_clients[thread_id].close()
except Exception:
pass
del httpx_clients[thread_id]
def run_with_context(func, context, *args, **kwargs):
"""
Wrapper to set thread-local context before running a function in a new thread.
Useful for ThreadPoolExecutor where context is lost.
"""
thread_local_storage.current_patient_context = context
return func(*args, **kwargs)
# ============================================================================
# CONFIG PATH RESOLUTION (PyInstaller compatible)
# ============================================================================
def get_config_path():
"""Resolve path to the config directory, compatible with PyInstaller packaging."""
if getattr(sys, '_MEIPASS', None):
return os.path.join(sys._MEIPASS, CONFIG_DIR)
return os.path.join(os.path.dirname(os.path.abspath(__file__)), CONFIG_DIR)
# ============================================================================
# EXCEL CONFIGURATION LOADING
# ============================================================================
def load_criteria_config():
"""
Load criteria and values configuration from the Excel workbook.
Populates global dicts:
- criteria_config: {criteria_id: {label, type, level1, level2, order}}
- values_config: {criteria_id: {value_id: value_label}}
"""
global criteria_config, values_config
config_path = os.path.join(get_config_path(), CONFIG_WORKBOOK_NAME)
if not os.path.exists(config_path):
logging.critical(f"Configuration file not found: {config_path}")
raise FileNotFoundError(f"Configuration file not found: {config_path}")
wb = openpyxl.load_workbook(config_path, read_only=True, data_only=True)
# --- Load criteria sheet ---
if CONFIG_CRITERIA_SHEET_NAME not in wb.sheetnames:
logging.critical(f"Sheet '{CONFIG_CRITERIA_SHEET_NAME}' not found in {CONFIG_WORKBOOK_NAME}")
raise ValueError(f"Sheet '{CONFIG_CRITERIA_SHEET_NAME}' not found in {CONFIG_WORKBOOK_NAME}")
ws_criteria = wb[CONFIG_CRITERIA_SHEET_NAME]
rows = list(ws_criteria.iter_rows(values_only=True))
if not rows:
logging.critical(f"Sheet '{CONFIG_CRITERIA_SHEET_NAME}' is empty")
raise ValueError(f"Sheet '{CONFIG_CRITERIA_SHEET_NAME}' is empty")
header = [str(cell).strip() if cell else "" for cell in rows[0]]
# Find column indices
required_columns = {
COL_CRITERIA_ID: None, COL_CRITERIA_LABEL: None, COL_CRITERIA_TYPE: None,
COL_CRITERIA_LEVEL1_LABEL: None, COL_CRITERIA_LEVEL2_LABEL: None, COL_CRITERIA_ORDER: None,
}
for col_name in required_columns:
if col_name not in header:
logging.critical(f"Missing column '{col_name}' in '{CONFIG_CRITERIA_SHEET_NAME}' sheet. Available columns: {header}")
raise ValueError(f"Missing column '{col_name}' in '{CONFIG_CRITERIA_SHEET_NAME}' sheet")
required_columns[col_name] = header.index(col_name)
idx_id = required_columns[COL_CRITERIA_ID]
idx_label = required_columns[COL_CRITERIA_LABEL]
idx_type = required_columns[COL_CRITERIA_TYPE]
idx_level1 = required_columns[COL_CRITERIA_LEVEL1_LABEL]
idx_level2 = required_columns[COL_CRITERIA_LEVEL2_LABEL]
idx_order = required_columns[COL_CRITERIA_ORDER]
criteria_config = {}
for row in rows[1:]:
crit_id = str(row[idx_id]).strip() if row[idx_id] is not None else None
if not crit_id:
continue
criteria_config[crit_id] = {
"label": str(row[idx_label]).strip() if row[idx_label] else crit_id,
"type": str(row[idx_type]).strip().upper() if row[idx_type] else "TEXT",
"level1": str(row[idx_level1]).strip() if row[idx_level1] else None,
"level2": str(row[idx_level2]).strip() if row[idx_level2] else None,
"order": int(row[idx_order]) if row[idx_order] is not None else 9999,
}
# --- Load values sheet ---
if CONFIG_VALUES_SHEET_NAME not in wb.sheetnames:
logging.critical(f"Sheet '{CONFIG_VALUES_SHEET_NAME}' not found in {CONFIG_WORKBOOK_NAME}")
raise ValueError(f"Sheet '{CONFIG_VALUES_SHEET_NAME}' not found in {CONFIG_WORKBOOK_NAME}")
ws_values = wb[CONFIG_VALUES_SHEET_NAME]
rows_v = list(ws_values.iter_rows(values_only=True))
if not rows_v:
logging.critical(f"Sheet '{CONFIG_VALUES_SHEET_NAME}' is empty")
raise ValueError(f"Sheet '{CONFIG_VALUES_SHEET_NAME}' is empty")
header_v = [str(cell).strip() if cell else "" for cell in rows_v[0]]
required_columns_v = {COL_VALUE_CRITERIA_ID: None, COL_VALUE_ID: None, COL_VALUE_LABEL: None}
for col_name in required_columns_v:
if col_name not in header_v:
logging.critical(f"Missing column '{col_name}' in '{CONFIG_VALUES_SHEET_NAME}' sheet. Available columns: {header_v}")
raise ValueError(f"Missing column '{col_name}' in '{CONFIG_VALUES_SHEET_NAME}' sheet")
required_columns_v[col_name] = header_v.index(col_name)
idx_v_crit_id = required_columns_v[COL_VALUE_CRITERIA_ID]
idx_v_id = required_columns_v[COL_VALUE_ID]
idx_v_label = required_columns_v[COL_VALUE_LABEL]
values_config = {}
for row in rows_v[1:]:
crit_id = str(row[idx_v_crit_id]).strip() if row[idx_v_crit_id] is not None else None
val_id = str(row[idx_v_id]).strip() if row[idx_v_id] is not None else None
val_label = str(row[idx_v_label]).strip() if row[idx_v_label] is not None else val_id
if not crit_id or not val_id:
continue
if crit_id not in values_config:
values_config[crit_id] = {}
values_config[crit_id][val_id] = val_label
wb.close()
logging.info(f"Loaded {len(criteria_config)} criteria and {sum(len(v) for v in values_config.values())} values from config")
console.print(f"[green]Config loaded: {len(criteria_config)} criteria, "
f"{sum(len(v) for v in values_config.values())} values[/green]")
# ============================================================================
# DECORATOR - API CALL WITH RETRY
# ============================================================================
def api_call_with_retry(func):
"""Decorator for API calls with automatic retry on errors."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
total_attempts = 0
batch_count = 1
while True:
for attempt in range(ERROR_MAX_RETRY):
total_attempts += 1
try:
return func(*args, **kwargs)
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "name": "Unknown"})
logging.warning(f"Error in {func_name} [Patient {ctx['id']}] ({ctx['name']}) (Attempt {total_attempts}): {exc}")
clear_httpx_client()
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
else:
if batch_count < MAX_BATCHS_OF_RETRIES:
logging.warning(f"Batch {batch_count}/{MAX_BATCHS_OF_RETRIES} failed for {func_name} "
f"[Patient {ctx['id']}] ({ctx['name']}). "
f"Waiting {WAIT_BEFORE_NEW_BATCH_OF_RETRIES}s before automatic retry batch.")
batch_count += 1
sleep(WAIT_BEFORE_NEW_BATCH_OF_RETRIES)
break
else:
with _user_interaction_lock:
console.print(f"\n[bold red]Persistent error in {func_name} [Patient {ctx['id']}] ({ctx['name']}) "
f"after {batch_count} batches ({total_attempts} attempts).[/bold red]")
console.print(f"[red]Exception: {exc}[/red]")
choice = questionary.select(
f"What would you like to do for {func_name}?",
choices=[
"Retry (try another batch of retries)",
"Ignore (return None and continue)",
"Stop script (critical error)"
]
).ask()
if choice == "Retry (try another batch of retries)":
logging.info(f"User chose to retry {func_name}. Restarting batch sequence.")
batch_count = 1
break
elif choice == "Ignore (return None and continue)":
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "name": "Unknown"})
logging.warning(f"[IGNORE] User opted to skip {func_name} for Patient {ctx['id']} ({ctx['name']}). Error: {exc}")
return None
else:
logging.critical(f"User chose to stop script after persistent error in {func_name}.")
raise httpx.RequestError(message=f"Persistent error in {func_name} (stopped by user)")
return wrapper
# ============================================================================
# AUTHENTICATION
# ============================================================================
def login():
"""
Authenticate with Endoconnect.
Returns:
"Success": Authentication succeeded
"Error": Authentication failed (can retry)
"Exit": User cancelled login
"""
global token
user_name = questionary.text("login:", default=DEFAULT_USER_NAME).ask()
password = questionary.password("password:", default=DEFAULT_PASSWORD).ask()
if not (user_name and password):
return "Exit"
try:
client = get_httpx_client()
response = client.post(
f"{ENDOCONNECT_BASE_URL}{LOGIN_PATH}",
json={"email": user_name, "password": password},
timeout=20
)
response.raise_for_status()
token = response.json()["token"]
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
print(f"Login Error: {exc}")
logging.warning(f"Login Error: {exc}")
return "Error"
print("\nLogin Success")
return "Success"
# ============================================================================
# API FUNCTIONS
# ============================================================================
@api_call_with_retry
def get_patients(professional_id):
"""Get all patients for a given professional ID (RPPS)."""
client = get_httpx_client()
response = client.post(
f"{ENDOCONNECT_BASE_URL}{PATIENTS_LIST_PATH}",
headers={"Authorization": f"Bearer {token}"},
json={
"page": 1,
"pageSize": MAX_PAGE_SIZE,
"RPPS": professional_id,
"search": ""
},
timeout=API_TIMEOUT
)
response.raise_for_status()
return response.json()
@api_call_with_retry
def get_medical_record(patient_id):
"""Get medical record for a patient. Returns docs[0] or None."""
client = get_httpx_client()
response = client.get(
f"{ENDOCONNECT_BASE_URL}{MEDICAL_RECORD_PATH}?profile={patient_id}",
headers={"Authorization": f"Bearer {token}"},
timeout=API_TIMEOUT
)
response.raise_for_status()
data = response.json()
docs = data.get("docs", [])
if len(docs) != 1:
ctx = getattr(thread_local_storage, "current_patient_context", {"id": patient_id, "name": "Unknown"})
logging.error(f"Expected exactly 1 doc in medical record for patient {ctx['id']} ({ctx['name']}), got {len(docs)}")
if len(docs) == 0:
return None
return docs[0]
@api_call_with_retry
def get_medical_events(patient_id):
"""Get medical events for a patient. Returns docs array (may be empty)."""
client = get_httpx_client()
response = client.get(
f"{ENDOCONNECT_BASE_URL}{MEDICAL_EVENTS_PATH}?profile={patient_id}",
headers={"Authorization": f"Bearer {token}"},
timeout=API_TIMEOUT
)
response.raise_for_status()
data = response.json()
return data.get("docs", [])
# ============================================================================
# RESULT BUILDING
# ============================================================================
def resolve_criteria_value(criteria_id, raw_value, patient_id="Unknown"):
"""
Resolve the display value for a criteria answer.
For TEXT/NUMERIC/DATE: use raw value directly (join with " | " if array).
For MULTIBOOLEAN/CHECKLIST: lookup value labels from values_config.
"""
config = criteria_config.get(criteria_id)
if not config:
logging.warning(f"[Patient {patient_id}] Unknown criteria_id: {criteria_id}, raw_value: {raw_value}")
if isinstance(raw_value, list):
return " | ".join(str(v) for v in raw_value)
return raw_value
crit_type = config["type"]
if crit_type in ("TEXT", "NUMERIC", "DATE"):
if isinstance(raw_value, list):
return " | ".join(str(v) for v in raw_value)
return raw_value
elif crit_type in ("MULTIBOOLEAN", "CHECKLIST"):
val_lookup = values_config.get(criteria_id, {})
if not val_lookup:
logging.warning(f"[Patient {patient_id}] No values configured for MULTIBOOLEAN/CHECKLIST criteria '{criteria_id}' (label: {config['label']})")
if isinstance(raw_value, list):
labels = []
for v in raw_value:
v_str = str(v).strip()
label = val_lookup.get(v_str)
if label is None:
logging.warning(f"[Patient {patient_id}] Unknown value_id '{v_str}' for criteria '{criteria_id}' (label: {config['label']})")
labels.append(v_str)
else:
labels.append(label)
return " | ".join(labels)
else:
v_str = str(raw_value).strip()
label = val_lookup.get(v_str)
if label is None:
logging.warning(f"[Patient {patient_id}] Unknown value_id '{v_str}' for criteria '{criteria_id}' (label: {config['label']})")
return v_str
return label
else:
logging.warning(f"[Patient {patient_id}] Unknown criteria type '{crit_type}' for criteria '{criteria_id}' (label: {config['label']})")
if isinstance(raw_value, list):
return " | ".join(str(v) for v in raw_value)
return raw_value
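# Illustration (hypothetical config, not part of the original file):
#   criteria_config["C002"] = {"label": "Symptoms", "type": "CHECKLIST", ...}
#   values_config["C002"]   = {"V1": "Fatigue", "V2": "Nausea"}
#   resolve_criteria_value("C002", ["V1", "V2"])  ->  "Fatigue | Nausea"
#   resolve_criteria_value("C001", 7)             ->  7   (TEXT/NUMERIC/DATE pass through)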
def build_detail(answers, patient_id="Unknown", use_nesting=True):
"""
Build an ordered detail object from answers.
Attributes are inserted in criteria_order.
If use_nesting=True (record_detail): applies level1/level2 nesting.
If use_nesting=False (event_detail): flat structure, no nesting.
"""
if not answers:
return {}
# Collect answers with their config, filter unknown criteria
enriched = []
for answer in answers:
crit_id = str(answer.get("criteria", "")).strip()
raw_value = answer.get("value")
config = criteria_config.get(crit_id)
if not config:
logging.warning(f"[Patient {patient_id}] Skipping unknown criteria_id in answers: {crit_id}, raw_value: {raw_value}")
continue
enriched.append({
"criteria_id": crit_id,
"label": config["label"],
"type": config["type"],
"level1": config["level1"],
"level2": config["level2"],
"order": config["order"],
"raw_value": raw_value,
})
# Sort by criteria_order
enriched.sort(key=lambda x: x["order"])
# Build the detail dict
detail = {}
for item in enriched:
label = item["label"]
value = resolve_criteria_value(item["criteria_id"], item["raw_value"], patient_id=patient_id)
level1 = item["level1"]
level2 = item["level2"]
if use_nesting and level1:
if level1 not in detail:
detail[level1] = {}
if level2:
if level2 not in detail[level1]:
detail[level1][level2] = {}
detail[level1][level2][label] = value
else:
detail[level1][label] = value
else:
detail[label] = value
return detail
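# Illustration (hypothetical labels): with use_nesting=True, an answer whose
# criteria has level1="Symptoms", level2="Pain", label="Score" lands at
# detail["Symptoms"]["Pain"]["Score"]; with use_nesting=False it lands flat
# at detail["Score"].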
def process_patient(patient):
"""
Process a single patient: fetch medical record and events, build result object.
Args:
patient: Patient dict from get_patients response
Returns:
Dict with patient_ident, record_metadata, record_detail, events
"""
patient_id = patient["_id"]
patient_name = patient.get("fullName", "Unknown")
# Fetch data (sequential)
record = get_medical_record(patient_id)
if record is None:
logging.warning(f"[Patient {patient_id}] ({patient_name}) No medical record returned, record_detail will be empty")
events_data = get_medical_events(patient_id)
if events_data is None:
logging.warning(f"[Patient {patient_id}] ({patient_name}) No medical events returned, events will be empty")
# Build result
result = {
"patient_ident": {
"_id": patient_id,
"fullName": patient_name,
"birthday": patient.get("birthday", ""),
"email": patient.get("email", ""),
},
"record_metadata": {
"createdAt": patient.get("createdAt", ""),
"isFinishMedicalRecord": patient.get("isFinishMedicalRecord", False),
"lastUpdate": patient.get("lasUpdate", ""),
"finishOn": patient.get("finishOn", ""),
"confirmedEndo": patient.get("confirmedEndo", False),
},
"record_detail": build_detail(record.get("answers", []), patient_id=patient_id, use_nesting=True) if record else {},
"events": [
{
"event_date": evt.get("date", ""),
"event_type": evt.get("type", ""),
"event_detail": build_detail(evt.get("answers", []), patient_id=patient_id, use_nesting=False),
}
for evt in (events_data or [])
],
}
return result
# ============================================================================
# MAIN PROCESSING
# ============================================================================
def main():
"""
Main processing function.
Flow:
1. Load Excel config
2. Login
3. Confirm professional ID
4. Confirm thread count
5. Fetch and filter patients
6. Process patients in thread pool
7. Sort and export results to JSON
"""
global main_thread_pool
# ========== LOAD CONFIG ==========
console.print("[bold cyan]Loading criteria configuration...[/bold cyan]")
load_criteria_config()
# ========== AUTHENTICATION ==========
print()
login_status = login()
while login_status == "Error":
login_status = login()
if login_status == "Exit":
return
# ========== PROFESSIONAL ID ==========
print()
professional_id = questionary.text(
"Professional ID (RPPS):",
default=DEFAULT_PROFESSIONAL_ID
).ask()
if not professional_id:
return
# ========== THREAD COUNT ==========
print()
number_of_threads = int(
questionary.text(
"Number of threads:",
default="12",
validate=lambda x: x.isdigit() and 0 < int(x) <= MAX_THREADS
).ask()
)
# ========== INITIALIZATION ==========
start_time = perf_counter()
main_thread_pool = ThreadPoolExecutor(max_workers=number_of_threads)
# ========== FETCH PATIENTS ==========
print()
console.print("[bold cyan]Fetching patients list...[/bold cyan]")
all_patients = get_patients(professional_id)
if all_patients is None:
logging.critical(f"Failed to fetch patients list for professional_id={professional_id}. Aborting.")
console.print("[bold red]Failed to fetch patients list. Aborting.[/bold red]")
return
# Filter: keep only patients with finished medical record
patients = [p for p in all_patients if p.get("isFinishMedicalRecord") is True]
console.print(f"[green]Total patients: {len(all_patients)} | With finished medical record: {len(patients)}[/green]")
if not patients:
console.print("[yellow]No patients with finished medical records found. Nothing to process.[/yellow]")
return
# ========== PROCESS PATIENTS ==========
print()
console.print("[bold cyan]Processing patients...[/bold cyan]")
output = []
futures = []
with tqdm(total=len(patients), desc="Extracting records",
bar_format=custom_bar_format) as pbar:
with main_thread_pool as executor:
for patient in patients:
ctx = {"id": patient["_id"], "name": patient.get("fullName", "Unknown")}
futures.append(
executor.submit(run_with_context, process_patient, ctx, patient)
)
for future in as_completed(futures):
try:
result = future.result()
if result is not None:
output.append(result)
pbar.update(1)
except Exception as exc:
logging.critical(f"Error in worker: {exc}", exc_info=True)
print(f"\nCRITICAL ERROR: {exc}")
executor.shutdown(wait=False, cancel_futures=True)
raise
# ========== SORT RESULTS ==========
output.sort(key=lambda x: (
x.get("patient_ident", {}).get("fullName", ""),
x.get("patient_ident", {}).get("email", "")
))
# ========== EXPORT JSON ==========
timestamp = datetime.now().strftime("%Y%m%d-%H%M")
output_filename = f"{OUTPUT_FILE_NAME}-{timestamp}.json"
with open(output_filename, "w", encoding="utf-8") as f:
json.dump(output, f, indent=2, ensure_ascii=False)
# ========== FINALIZATION ==========
print()
console.print(f"[bold green]Export complete: {len(output)} patients -> {output_filename}[/bold green]")
print(f"Elapsed time: {str(timedelta(seconds=perf_counter() - start_time))}")
# ============================================================================
# ENTRY POINT
# ============================================================================
if __name__ == '__main__':
# ========== LOGGING CONFIGURATION ==========
script_name = os.path.splitext(os.path.basename(__file__))[0]
log_file_name = f"{script_name}.log"
logging.basicConfig(
level=LOG_LEVEL,
format=LOG_FORMAT,
filename=log_file_name,
filemode='w'
)
# ========== MAIN EXECUTION ==========
try:
main()
except Exception as e:
logging.critical(f"Script terminated with exception: {e}", exc_info=True)
print(f"\nScript stopped due to error: {e}")
print(traceback.format_exc())
finally:
# ========== CLEANUP ==========
if 'main_thread_pool' in globals() and main_thread_pool:
main_thread_pool.shutdown(wait=False, cancel_futures=True)
# Pause before exit
print('\n')
input("Press Enter to exit...")