Endobest Clinical Research Dashboard - Technical Documentation
Part 1: General Architecture & Report Generation Workflow
Document Version: 2.0 (Updated with Excel Export feature)
Last Updated: 2025-11-08
Audience: Developers, Technical Architects
Language: English
Table of Contents
- Overview
- System Architecture
- Module Structure
- Complete Data Collection Workflow
- API Integration
- Multithreading & Performance
- Data Processing Pipeline
- Execution Modes
- Error Handling & Resilience
Overview
The Endobest Clinical Research Dashboard is an automated data collection and processing system designed to extract, validate, and consolidate patient inclusion data from the Endobest clinical research protocol across multiple healthcare organizations.
Key Characteristics
- 100% Externalized Configuration: All extraction fields defined in Excel, zero code changes needed
- Multi-Source Data Integration: Fetches from RC (Research Clinic), GDD (Lab), and questionnaire APIs
- High-Performance Multithreading: 20+ concurrent workers for API parallelization
- Comprehensive Quality Assurance: Built-in coherence checks and regression testing
- Thread-Safe Operations: Dedicated HTTP clients per thread, synchronized access to shared resources
- Automated Error Recovery: Token refresh, automatic retry with a fixed wait between attempts
- Audit Trail: Detailed logging and JSON backup versioning
System Architecture
High-Level Component Diagram
┌─────────────────────────────────────────────────────────┐
│ Endobest Dashboard Main Process │
│ eb_dashboard.py │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Block 1-3 │ │ Block 4 │ │ Block 5-6 │ │
│ │ Config & Auth│ │ Config Load │ │ Data Extract │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ↓ ↓ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Extended Fields Configuration │ │
│ │ (Excel: Mapping Sheet → JSON field mapping) │ │
│ └─────────────────────────────────────────────────┘ │
│ ↓ ↓ ↓ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Block 7 │ │ Block 8 │ │ Block 9 │ │
│ │ API Calls │ │ Orchestration│ │ Quality QA │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ↓ ↓ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Multithreaded Processing (ThreadPoolExecutor) │ │
│ │ - Organizations: 20 workers (parallel) │ │
│ │ - Requests/Questionnaires: 40 workers (async) │ │
│ └─────────────────────────────────────────────────┘ │
│ ↓ ↓ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Quality Checks & Validation │ │
│ │ - Coherence Check (stats vs detail) │ │
│ │ - Non-Regression Check (config-driven) │ │
│ └─────────────────────────────────────────────────┘ │
│ ↓ ↓ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Export & Persistence │ │
│ │ - endobest_inclusions.json │ │
│ │ - endobest_organizations.json │ │
│ │ - Versioned backups (_old suffix) │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
↓
┌──────────────────────────────────┐
│ Utility Modules │
├──────────────────────────────────┤
│ • eb_dashboard_utils.py │
│ • eb_dashboard_quality_checks.py │
└──────────────────────────────────┘
↓
┌──────────────────────────────────┐
│ External APIs │
├──────────────────────────────────┤
│ • IAM (Authentication) │
│ • RC (Research Clinic) │
│ • GDD (Lab / Diagnostic Data) │
└──────────────────────────────────┘
Module Structure
1. eb_dashboard.py (Primary Orchestrator)
Size: ~45 KB | Lines: 1,021
Responsibility: Main application logic, API coordination, multithreading
Major Blocks:
- Block 1: Configuration & Base Infrastructure (constants, global variables, progress bar setup)
- Block 2: Decorators & Resilience (retry logic, token refresh)
- Block 3: Authentication (IAM login, token management)
- Block 4: Extended Fields Configuration (Excel loading & validation)
- Block 5: Data Search & Extraction (questionnaire finding, field retrieval)
- Block 6: Custom Functions & Field Processing (business logic, calculated fields)
- Block 7: Business API Calls (RC, GDD endpoints)
- Block 7b: Organization Center Mapping (organization enrichment with center identifiers)
- Block 8: Processing Orchestration (patient data processing)
- Block 9: Main Execution (entry point, quality checks, export)
2. eb_dashboard_utils.py (Reusable Utilities)
Size: ~6.4 KB | Lines: 184
Responsibility: Generic utility functions shared across modules
Core Functions:
get_httpx_client() # Thread-local HTTP client management
get_thread_position() # Progress bar positioning
get_nested_value() # JSON path navigation with wildcard support
get_config_path() # Config folder resolution (script vs PyInstaller)
get_old_filename() # Backup filename generation
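For illustration, the wildcard navigation in get_nested_value() can be sketched as follows. This is a simplified sketch, not the actual implementation; the "undefined" sentinel mirrors the convention used in the field extraction pipeline described later.
# Simplified sketch of wildcard JSON path navigation; the real
# get_nested_value() in eb_dashboard_utils.py may differ in detail.
def get_nested_value(data, path):
    current = data
    for i, key in enumerate(path):
        if key == "*":  # fan out over every element of a list
            if not isinstance(current, list):
                return "undefined"
            results = [get_nested_value(item, path[i + 1:]) for item in current]
            return [r for r in results if r != "undefined"] or "undefined"
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            return "undefined"
    return current

record = {"record": {"clinicResearchData": [{"value": 1}, {"value": 2}]}}
get_nested_value(record, ["record", "clinicResearchData", "*", "value"])  # [1, 2]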
3. eb_dashboard_quality_checks.py (QA & Validation)
Size: ~59 KB | Lines: 1,266
Responsibility: Quality assurance, data validation, regression checking
Core Functions:
load_regression_check_config() # Load regression rules from Excel
run_quality_checks() # Orchestrate all QA checks
coherence_check() # Verify stats vs detailed data consistency
non_regression_check() # Config-driven change validation
run_check_only_mode() # Standalone validation mode
backup_output_files() # Create versioned backups
4. eb_dashboard_excel_export.py (Excel Report Generation & Orchestration)
Size: ~38 KB | Lines: ~1,340 (v1.1+)
Responsibility: Configuration-driven Excel workbook generation with data transformation + high-level orchestration
Low-Level Functions (Data Processing):
load_excel_export_config() # Load Excel_Workbooks and Excel_Sheets config
validate_excel_config() # Validate templates and named ranges
export_to_excel() # Main export orchestration (openpyxl + win32com)
_apply_filter() # AND-condition filtering
_apply_sort() # Multi-key sorting with datetime support
_apply_value_replacement() # Strict type matching value transformation
_handle_output_exists() # File conflict resolution (Overwrite/Increment/Backup)
_recalculate_workbook() # Formula recalculation via win32com (optional)
_process_sheet() # Sheet-specific data filling
High-Level Orchestration Functions (v1.1+):
export_excel_only(sys_argv, console, ...) # Complete --excel-only mode orchestration
run_normal_mode_export(inclusions_data, organizations_data, enabled, config, ...) # Normal mode export phase
prepare_excel_export(inclusions_file, organizations_file, ...) # Prep + validate
execute_excel_export(inclusions_data, organizations_data, config, ...) # Exec + error handling
_load_json_file_internal(filename) # Safe JSON loading helper
Design Pattern (v1.1+):
- All export mechanics delegated to module (follows quality_checks pattern)
- Main script calls a single function per mode: export_excel_only() or run_normal_mode_export()
- Configuration validation and error handling centralized in module
- Result: Main script focused on business logic, export details encapsulated
Note: See DOCUMENTATION_13_EXCEL_EXPORT.md for complete architecture and configuration details.
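As a rough illustration of the filter and sort semantics listed above, the following sketch assumes list-of-dict rows and a simple {field: expected_value} condition shape; the module's real signatures and config structures may differ.
from datetime import datetime

# Hedged sketch of the AND-condition filter and datetime-aware multi-key sort;
# the signatures and config shapes here are assumptions, not the module's API.
def _apply_filter(rows, conditions):
    """Keep rows matching ALL (field, expected_value) conditions."""
    return [r for r in rows if all(r.get(f) == v for f, v in conditions.items())]

def _apply_sort(rows, keys, date_format="%d/%m/%Y"):
    """Sort by multiple keys; values parseable as dates sort chronologically."""
    def sort_value(row, key):
        value = row.get(key, "")
        try:
            return (0, datetime.strptime(value, date_format))
        except (TypeError, ValueError):
            return (1, str(value))
    for key in reversed(keys):  # stable sort: apply keys last-to-first
        rows = sorted(rows, key=lambda r: sort_value(r, key))
    return rows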
5. eb_dashboard_constants.py (Centralized Configuration)
Size: ~3.5 KB | Lines: 120
Responsibility: Single source of truth for all application constants
Constants Categories:
# File Management
INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME, CONFIG_FOLDER_NAME, etc.
# Excel Configuration
DASHBOARD_CONFIG_FILE_NAME, ORG_CENTER_MAPPING_FILE_NAME
EXCEL_WORKBOOKS_TABLE_NAME, EXCEL_SHEETS_TABLE_NAME, etc.
# API Configuration
API_TIMEOUT, API_*_ENDPOINT (9 endpoints across Auth, RC, GDD)
DEFAULT_USER_NAME, DEFAULT_PASSWORD, IAM_URL, RC_URL, GDD_URL, RC_APP_ID
# Research Protocol
RC_ENDOBEST_PROTOCOL_ID, RC_ENDOBEST_EXCLUDED_CENTERS
# Performance & Quality
ERROR_MAX_RETRY, WAIT_BEFORE_RETRY, MAX_THREADS
EXCEL_RECALC_TIMEOUT
# Logging & UI
LOG_FILE_NAME, BAR_N_FMT_WIDTH, BAR_TOTAL_FMT_WIDTH, etc.
Design Principle: All constants are imported from this module - never duplicated or redefined in other modules. This ensures a single source of truth for all configuration values across the entire application.
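For example (constant names taken from the categories above):
# Modules import shared values instead of redefining them locally.
from eb_dashboard_constants import MAX_THREADS, API_TIMEOUT, RC_URL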
Complete Data Collection Workflow
Phase 1: Initialization & Authentication
START
↓
[1] User Login Prompt
├─ Input: username, password (defaults available)
├─ IAM Authentication: POST /api/auth/ziwig-pro/login
├─ Get Master Token + User ID
└─ RC Token Exchange: POST /api/auth/config-token
└─ Output: access_token, refresh_token
↓
[2] Configuration Loading
├─ Parse Excel: Endobest_Dashboard_Config.xlsx
├─ Load Inclusions_Mapping sheet → Field mapping definition
├─ Validate all field configurations
└─ Load Regression_Check sheet → Quality rules
↓
[3] Thread Pool Configuration
├─ Main pool: ThreadPoolExecutor(user_input_threads, max=20)
├─ Async pool: ThreadPoolExecutor(40) for nested tasks
└─ Initialize per-thread HTTP clients
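A minimal sketch of the two-step authentication handshake, using the endpoints and payload fields documented in the API Integration section below (retry handling and the decorator described later are omitted; the timeout value is illustrative):
import httpx

IAM_URL = "https://api-auth.ziwig-connect.com"
RC_URL = "https://api-hcp.ziwig-connect.com"

with httpx.Client(timeout=30) as client:
    # Step 1: IAM login yields a master token and the user id.
    iam = client.post(f"{IAM_URL}/api/auth/ziwig-pro/login",
                      json={"username": "user@example.com",
                            "password": "password123"}).json()
    # Step 2: exchange the master token for RC-scoped tokens.
    rc = client.post(f"{RC_URL}/api/auth/config-token",
                     headers={"Authorization": f"Bearer {iam['access_token']}"},
                     json={"userId": iam["userId"],
                           "clientId": "602aea51-cdb2-4f73-ac99-fd84050dc393",
                           "userAgent": "Mozilla/5.0"}).json()
    access_token, refresh_token = rc["access_token"], rc["refresh_token"]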
Phase 2: Organization & Counters Retrieval
[4] Get All Organizations
├─ API: GET /api/inclusions/getAllOrganizations
├─ Filter: Exclude RC_ENDOBEST_EXCLUDED_CENTERS
└─ Output: List of all centers
↓
[5] Fetch Organization Counters (Parallelized)
├─ For each organization:
│ └─ POST /api/inclusions/inclusion-statistics
│ ├─ Protocol: RC_ENDOBEST_PROTOCOL_ID
│ └─ Store: patients_count, preincluded_count, included_count, prematurely_terminated_count
├─ Execute: 20 parallel workers
└─ Output: Organizations with counters
↓
[5b] Enrich Organizations with Center Mapping (Optional)
├─ Load mapping file: eb_org_center_mapping.xlsx (if exists)
├─ Parse sheet: Org_Center_Mapping
│ ├─ Extract: Organization_Name → Center_Name pairs
│ ├─ Validate: No duplicate organizations or centers
│ └─ Build: Normalized key mapping (case-insensitive, trimmed)
├─ For each organization:
│ ├─ Normalize organization name
│ ├─ Lookup in mapping dictionary
│ ├─ If found: Add Center_Name field (mapped value)
│ └─ If not found: Add Center_Name field (fallback to org name)
├─ Error Handling: Graceful degradation (missing file = skip silently)
└─ Output: Organizations with enriched Center_Name field
↓
[6] Calculate Totals & Sort
├─ Sum all patient counts across organizations
├─ Sort organizations by patient count (descending)
└─ Display summary statistics
Phase 3: Patient Inclusion Data Collection
[7] For Each Organization (Parallelized - 20 workers):
├─ API: POST /api/inclusions/search?limit=1000&page=1
│ └─ Retrieve up to 1000 inclusions per organization
├─ Store: inclusions_list[]
└─ For Each Patient in Inclusions (Sequential):
↓
[8] Fetch Patient Data Sources (Parallel):
├─ THREAD 1: GET /api/records/byPatient
│ └─ Retrieve clinical record, protocol inclusions, data
├─ THREAD 2: GET /api/surveys/filter/with-answers (OPTIMIZED)
│ └─ Single call retrieves ALL questionnaires + answers for patient
├─ THREAD 3: GET /api/requests/by-tube-id/{tubeId}
│ └─ Retrieve lab test results
└─ WAIT: All parallel threads complete
↓
[9] Process Field Mappings
├─ For each field in field mapping config:
│ ├─ Determine field source (questionnaire, record, inclusion, request)
│ ├─ Extract raw value using field_path (supports JSON path + wildcards)
│ ├─ Apply field condition (if specified)
│ ├─ Execute custom functions (if Calculated type)
│ ├─ Apply post-processing transformations:
│ │ ├─ true_if_any: Convert to boolean if value matches list
│ │ ├─ value_labels: Map value to localized text
│ │ ├─ field_template: Apply formatting template
│ │ └─ List joining: Join array values with pipe delimiter
│ └─ Store in output_inclusion[field_group][field_name]
└─ Output: Complete inclusion record with all fields
↓
[10] Progress Update
├─ Update per-organization progress bar
└─ Update global progress bar (thread-safe)
↓
[11] Aggregate Results
└─ Combine all inclusions from all organizations
Phase 4: Quality Assurance & Validation
[12] Sorting
├─ Sort by: Organization Name, Inclusion Date, Patient Pseudo
└─ Output: Ordered inclusions_list[]
↓
[13] Quality Checks Execution
├─ COHERENCE CHECK:
│ ├─ Compare organization statistics (API counters)
│ ├─ vs. actual inclusion data (detailed records)
│ ├─ Verify: total, preincluded, included, prematurely_terminated counts
│ └─ Report mismatches with severity levels
│
├─ NON-REGRESSION CHECK:
│ ├─ Load previous inclusions (_old file)
│ ├─ Compare current vs. previous data
│ ├─ Apply config-driven regression rules
│ ├─ Detect: new inclusions, deleted inclusions, field changes
│ ├─ Apply transition patterns and exceptions
│ └─ Report violations by severity (Warning/Critical)
│
└─ Result: has_coherence_critical, has_regression_critical flags
↓
[14] Critical Issues Handling
├─ If NO critical issues:
│ └─ Continue to export
├─ If YES critical issues:
│ ├─ Display warning: ⚠ CRITICAL issues detected!
│ ├─ Prompt user: "Do you want to write results anyway?"
│ ├─ If NO → Cancel export, exit gracefully
│ └─ If YES → Continue to export (user override)
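An illustrative sketch of the coherence comparison in step [13]; the helper name and severity handling are simplifications of the real coherence_check():
# Compare API counters against counts recomputed from detail records.
def coherence_mismatches(organizations, inclusions):  # illustrative helper
    mismatches = []
    for org in organizations:
        detail_count = sum(
            1 for inc in inclusions
            if inc["Patient_Identification"]["Organisation_Id"] == org["id"]
        )
        if org.get("patients_count") != detail_count:
            mismatches.append((org["name"], org.get("patients_count"), detail_count))
    return mismatches  # non-empty list -> coherence issues to report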
Phase 5: Export & Persistence
Phase 5 covers both JSON persistence and the optional Excel export:
[15] Backup Old Files (only if checks passed)
├─ endobest_inclusions.json → endobest_inclusions_old.json
├─ endobest_organizations.json → endobest_organizations_old.json
└─ Operation: Silent, overwrite existing backups
↓
[16] Write JSON Output Files
├─ File 1: endobest_inclusions.json
│ ├─ Format: JSON array of inclusion objects
│ ├─ Structure: Nested by field groups
│ └─ Size: Typically 6-7 MB (for full Endobest)
│
├─ File 2: endobest_organizations.json
│ ├─ Format: JSON array of organization objects
│ ├─ Includes: counters, statistics
│ └─ Size: Typically 17-20 KB
│
└─ Both: UTF-8 encoding, 4-space indentation
↓
[17] Excel Export (if configured)
├─ DELEGATED TO: run_normal_mode_export()
├─ (from eb_dashboard_excel_export module)
│
├─ Workflow:
│ ├─ Check: Is Excel export enabled?
│ │ └─ If NO → Skip to Completion (step 18)
│ │ └─ If YES → Continue
│ │
│ ├─ Load JSONs from filesystem
│ │ └─ Ensures consistency with just-written files
│ │
│ ├─ Load Excel export configuration
│ │ ├─ Sheet: Excel_Workbooks (workbook definitions)
│ │ └─ Sheet: Excel_Sheets (sheet configurations)
│ │
│ ├─ For each configured workbook:
│ │ ├─ Load template file (openpyxl)
│ │ ├─ For each sheet in workbook:
│ │ │ ├─ Load source data (Inclusions or Organizations JSON)
│ │ │ ├─ Apply filter (AND conditions)
│ │ │ ├─ Apply multi-key sort (datetime-aware)
│ │ │ ├─ Apply value replacements (strict type matching)
│ │ │ └─ Fill data into cells/named ranges
│ │ │
│ │ ├─ Handle file conflicts (Overwrite/Increment/Backup strategy)
│ │ ├─ Save workbook (openpyxl)
│ │ └─ Recalculate formulas (optional, via win32com)
│ │
│ └─ Return: status (success/failure) + error message
│
└─ Note: See DOCUMENTATION_13_EXCEL_EXPORT.md for data transformation details
↓
[18] Completion & Reporting
├─ Display elapsed time
├─ Report all file locations (JSONs + Excel files if generated)
├─ Log all operations to dashboard.log
└─ EXIT
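A minimal sketch of the backup-then-write persistence in steps [15]-[16]; the real code uses get_old_filename() and only creates backups after quality checks pass:
import json
import os

def write_with_backup(filename, data):
    old_name = filename.replace(".json", "_old.json")  # _old suffix convention
    if os.path.exists(filename):
        os.replace(filename, old_name)  # silent overwrite of existing backup
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=4)  # UTF-8, 4-space indent

write_with_backup("endobest_inclusions.json", inclusions_list)
write_with_backup("endobest_organizations.json", organizations_list)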
Three Operating Modes:
- NORMAL MODE (full workflow)
  - Collect data → Quality checks → Write JSONs → Excel export (if enabled)
- --excel-only MODE
  - Skip data collection + quality checks
  - Load existing JSONs → Excel export
  - Uses: export_excel_only() function from the Excel export module
- --check-only MODE
  - Skip data collection
  - Run quality checks only
  - Uses: run_check_only_mode() function from the quality_checks module
Expected Output Structure
[
{
"Patient_Identification": {
"Organisation_Id": "uuid",
"Organisation_Name": "Center Name",
"Patient_Id": "internal_id",
"Pseudo": "ENDO-001",
"Patient_Name": "Doe, John",
"Patient_Birthday": "1975-05-15",
"Patient_Age": 49
},
"Inclusion": {
"Consent_Signed": true,
"Inclusion_Date": "15/10/2024",
"Inclusion_Status": "incluse",
"Inclusion_Complex": "Non",
"isPrematurelyTerminated": false,
"Inclusion_Status_Complete": "incluse",
"Need_RCP": false
},
"Extended_Fields": {
"Custom_Field_1": "value",
"Custom_Field_2": 42
},
"Endotest": {
"Request_Sent": true,
"Diagnostic_Status": "Completed",
"Request_Overall_Status": "Accepted par Ziwig Lab"
},
"Infos Générales": {
"Couleurs (ex: 8/10)": "8/10",
"Qualité de vie (ex: 43/55)": "43/55"
}
}
]
API Integration
Authentication APIs (IAM)
Login Endpoint
POST https://api-auth.ziwig-connect.com/api/auth/ziwig-pro/login
Request:
{
"username": "user@example.com",
"password": "password123"
}
Response:
{
"access_token": "jwt_token_master",
"userId": "user-uuid",
...
}
Token Exchange (RC-specific)
POST https://api-hcp.ziwig-connect.com/api/auth/config-token
Headers:
Authorization: Bearer {master_token}
Request:
{
"userId": "user-uuid",
"clientId": "602aea51-cdb2-4f73-ac99-fd84050dc393",
"userAgent": "Mozilla/5.0..."
}
Response:
{
"access_token": "jwt_token_rc",
"refresh_token": "refresh_token_value"
}
Token Refresh (Automatic on 401)
POST https://api-hcp.ziwig-connect.com/api/auth/refreshToken
Headers:
Authorization: Bearer {current_access_token}
Request:
{
"refresh_token": "refresh_token_value"
}
Response:
{
"access_token": "new_jwt_token",
"refresh_token": "new_refresh_token"
}
Research Clinic APIs (RC)
Get All Organizations
GET https://api-hcp.ziwig-connect.com/api/inclusions/getAllOrganizations
Headers:
Authorization: Bearer {access_token}
Response:
[
{
"id": "org-uuid",
"name": "Center Name",
"address": "...",
...
}
]
Get Organization Statistics
POST https://api-hcp.ziwig-connect.com/api/inclusions/inclusion-statistics
Headers:
Authorization: Bearer {access_token}
Request:
{
"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e",
"center": "org-uuid",
"excludedCenters": ["excluded-org-uuid-1", "excluded-org-uuid-2"]
}
Response:
{
"statistic": {
"totalInclusions": 145,
"preIncluded": 23,
"included": 110,
"prematurelyTerminated": 12
}
}
Search Inclusions by Organization
POST https://api-hcp.ziwig-connect.com/api/inclusions/search?limit=1000&page=1
Headers:
Authorization: Bearer {access_token}
Request:
{
"protocolId": "3c7bcb4d-91ed-4e9f-b93f-99d8447a276e",
"center": "org-uuid",
"keywords": ""
}
Response:
{
"data": [
{
"id": "patient-uuid",
"name": "Doe, John",
"status": "incluse",
...
}
]
}
Get Patient Clinical Record
POST https://api-hcp.ziwig-connect.com/api/records/byPatient
Headers:
Authorization: Bearer {access_token}
Request:
{
"center": "org-uuid",
"patientId": "patient-uuid",
"mode": "exchange",
"state": "ongoing",
"includeEndoParcour": false,
"sourceClient": "pro_prm"
}
Response:
{
"record": {
"protocol_inclusions": [
{
"status": "incluse",
"blockedQcmVersions": [],
"clinicResearchData": [
{
"requestMetaData": {
"tubeId": "tube-uuid"
}
}
]
}
]
}
}
Get All Questionnaires for Patient (Optimized)
POST https://api-hcp.ziwig-connect.com/api/surveys/filter/with-answers
Headers:
Authorization: Bearer {access_token}
Request:
{
"context": "clinic_research",
"subject": "patient-uuid",
"blockedQcmVersions": [] (optional)
}
Response:
[
{
"questionnaire": {
"id": "qcm-uuid",
"name": "Questionnaire Name",
"category": "Category"
},
"answers": {
"question_1": "answer_value",
"question_2": true,
...
}
}
]
Lab APIs (GDD)
Get Request by Tube ID
GET https://api-lab.ziwig-connect.com/api/requests/by-tube-id/{tubeId}?isAdmin=true&organization=undefined
Headers:
Authorization: Bearer {access_token}
Response:
{
"id": "request-uuid",
"status": "completed",
"tubeId": "tube-uuid",
"diagnostic_status": "Completed",
"results": [
{
"test_name": "Test Result",
"value": "Result Value"
}
]
}
Multithreading & Performance
Thread Pool Architecture
Main Application Thread
↓
┌─────────────────────────────────────────────────────┐
│ Phase 1: Counter Fetching │
│ ThreadPoolExecutor(max_workers=user_input) │
│ ├─ Task 1: Get counter for Org 1 │
│ ├─ Task 2: Get counter for Org 2 │
│ └─ Task N: Get counter for Org N │
│ [Sequential wait: tqdm.as_completed] │
└─────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────┐
│ Phase 2: Inclusion Data Collection (Nested) │
│ Outer: ThreadPoolExecutor(max_workers=user_input) │
│ ├─ For Org 1: │
│ │ └─ Inner: ThreadPoolExecutor(max_workers=40) │
│ │ ├─ Patient 1: Async request/questionnaires │
│ │ ├─ Patient 2: Async request/questionnaires │
│ │ └─ Patient N: Async request/questionnaires │
│ │ └─ [Sequential wait: as_completed] │
│ │ │
│ ├─ For Org 2: │
│ │ └─ [Similar parallel processing] │
│ │ │
│ └─ For Org N: │
│ └─ [Similar parallel processing] │
│ [Outer wait: tqdm.as_completed] │
└─────────────────────────────────────────────────────┘
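The outer-pool wait pattern can be sketched as follows; organizations, fetch_inclusions(), and process_inclusion() are placeholders, not the script's actual helpers:
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

output_inclusions = []

def process_organization(org):
    # Placeholder per-organization worker: fetch then process each inclusion.
    return [process_inclusion(inc) for inc in fetch_inclusions(org)]

with ThreadPoolExecutor(max_workers=20) as outer_pool:
    futures = [outer_pool.submit(process_organization, org) for org in organizations]
    # Wrap as_completed() in tqdm so the bar advances as each worker finishes.
    for future in tqdm(as_completed(futures), total=len(futures)):
        output_inclusions.extend(future.result())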
Performance Optimizations
1. Questionnaire Batching
Problem: Multiple filtered API calls per patient (slow)
Solution: A single optimized API call retrieves all questionnaires with answers
Impact: 4-5x performance improvement
# BEFORE (inefficient):
for qcm_id in questionnaire_ids:
answers = GET /api/surveys/{qcm_id}/answers?subject={patient_id}
# AFTER (optimized):
all_answers = POST /api/surveys/filter/with-answers
with payload: {"context": "clinic_research", "subject": patient_id}
2. Thread-Local HTTP Clients
Problem: Shared httpx.Client causes connection conflicts
Solution: Each thread maintains its own client
Implementation:
import threading
import httpx

httpx_clients: dict[int, httpx.Client] = {}  # one persistent client per thread

def get_httpx_client() -> httpx.Client:
    thread_id = threading.get_ident()
    if thread_id not in httpx_clients:
        httpx_clients[thread_id] = httpx.Client()
    return httpx_clients[thread_id]
3. Nested Parallelization
Problem: Sequential patient processing within an organization
Solution: Submit request/questionnaire fetches to the async pool
Benefit: Non-blocking I/O during main-thread processing
for inclusion in inclusions:
output_inclusion = _process_inclusion_data(inclusion, organization)
# Within _process_inclusion_data():
request_future = subtasks_thread_pool.submit(get_request_by_tube_id, tube_id)
all_questionnaires = get_all_questionnaires_by_patient(patient_id, record_data)
request_data = request_future.result() # Wait for async completion
4. Configurable Worker Threads
User Input: Thread count selection (1-20 workers)
Rationale: Allows tuning for network bandwidth, API rate limits, and system resources
Progress Tracking
Multi-Level Progress Bars
Overall Progress [████████████░░░░░░░░░░░░] 847/1200
1/15 - Center 1 [██████████░░░░░░░░░░░░░░░] 73/95
2/15 - Center 2 [██████░░░░░░░░░░░░░░░░░░░] 42/110
3/15 - Center 3 [████░░░░░░░░░░░░░░░░░░░░░] 28/85
Thread-Safe Progress Updates
with _global_pbar_lock:
if global_pbar:
global_pbar.update(1) # Thread-safe update
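Putting the two together, a positioned per-organization bar plus the shared global bar might look like this sketch, assuming get_thread_position() (from eb_dashboard_utils.py) returns a stable row index for the calling thread:
import threading
from tqdm import tqdm

_global_pbar_lock = threading.Lock()
global_pbar = tqdm(total=1200, desc="Overall Progress", position=0)

def process_center(name: str, patient_count: int) -> None:
    position = get_thread_position()  # documented utils helper
    with tqdm(total=patient_count, desc=name, position=position, leave=False) as bar:
        for _ in range(patient_count):
            bar.update(1)                 # per-organization bar (thread-owned)
            with _global_pbar_lock:
                global_pbar.update(1)     # shared bar, updates serialized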
Data Processing Pipeline
Field Extraction Logic
For each field in field mapping configuration:
├─ Input: field configuration from Excel
│
├─ Step 1: Determine Field Source
│ ├─ If source_type in [q_id, q_name, q_category]
│ │ └─ Find questionnaire in all_questionnaires dict
│ ├─ If source_type == "record"
│ │ └─ Use record_data (clinical record)
│ ├─ If source_type == "inclusion"
│ │ └─ Use inclusion_data (patient inclusion data)
│ ├─ If source_type == "request"
│ │ └─ Use request_data (lab test request)
│ └─ If source_name == "Calculated"
│ └─ Execute custom function
│
├─ Step 2: Extract Raw Value
│ ├─ Navigate JSON using field_path (supports * wildcard)
│ ├─ Example: ["record", "clinicResearchData", "*", "value"]
│ └─ Result: raw_value or "undefined"
│
├─ Step 3: Check Field Condition (optional)
│ ├─ If condition field is undefined
│ │ └─ Set final_value = "undefined"
│ ├─ If condition field is not boolean
│ │ └─ Set final_value = "$$$$ Condition Field Error"
│ ├─ If condition field is False
│ │ └─ Set final_value = "N/A"
│ └─ If condition field is True
│ └─ Continue processing
│
├─ Step 4: Apply Post-Processing Transformations
│ ├─ true_if_any: Convert to boolean
│ │ └─ If raw_value matches any value in true_if_any list → True
│ │ └─ Otherwise → False
│ │
│ ├─ value_labels: Map to localized text
│ │ └─ Find matching label_map entry by raw_value
│ │ └─ Replace with French text (text.fr)
│ │
│ ├─ field_template: Apply formatting
│ │ └─ Replace "$value" placeholder with formatted value
│ │ └─ Example: "$value%" → "85%"
│ │
│ └─ List joining: Flatten arrays
│ └─ Join array elements with "|" delimiter
│
├─ Step 5: Format Score Dictionaries
│ ├─ If value is dict with keys ['total', 'max']
│ │ └─ Format as "total/max" string
│ │ └─ Example: {"total": 8, "max": 10} → "8/10"
│ └─ Otherwise: Keep as-is
│
└─ Output: final_value
└─ Stored in output_inclusion[field_group][field_name]
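Steps 4-5 can be condensed into a single illustrative function. The configuration key names (true_if_any, value_labels, field_template) come from the description above; the entry shape assumed for value_labels is a guess, not the actual schema:
def post_process(raw_value, field_cfg):
    if "true_if_any" in field_cfg:                        # Step 4a: booleanize
        return raw_value in field_cfg["true_if_any"]
    if "value_labels" in field_cfg:                       # Step 4b: localize
        for entry in field_cfg["value_labels"]:
            if entry["value"] == raw_value:
                return entry["text"]["fr"]
    if "field_template" in field_cfg:                     # Step 4c: template
        return field_cfg["field_template"].replace("$value", str(raw_value))
    if isinstance(raw_value, list):                       # Step 4d: join lists
        return "|".join(str(v) for v in raw_value)
    if isinstance(raw_value, dict) and {"total", "max"} <= raw_value.keys():
        return f"{raw_value['total']}/{raw_value['max']}" # Step 5: "8/10"
    return raw_value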
Custom Functions for Calculated Fields
1. search_in_fields_using_regex
Purpose: Search multiple fields for regex pattern match
Syntax: ["search_in_fields_using_regex", "regex_pattern", "field_1", "field_2", ...]
Logic:
FOR each field in [field_1, field_2, ...]:
IF field value matches regex_pattern (case-insensitive):
RETURN True
RETURN False
Example:
{
"source_id": "search_in_fields_using_regex",
"field_path": [".*surgery.*", "Indication", "Previous_Surgery"]
}
2. extract_parentheses_content
Purpose: Extract text within parentheses
Syntax: ["extract_parentheses_content", "field_name"]
Logic:
value = get_value_from_inclusion(field_name)
RETURN match first occurrence of (content) pattern
Example:
Input: "Status (Active)"
Output: "Active"
3. append_terminated_suffix
Purpose: Add " - AP" suffix if patient prematurely terminated
Syntax: ["append_terminated_suffix", "status_field", "is_terminated_field"]
Logic:
status = get_value_from_inclusion(status_field)
is_terminated = get_value_from_inclusion(is_terminated_field)
IF is_terminated == True:
RETURN status + " - AP"
ELSE:
RETURN status
4. if_then_else
Purpose: Unified conditional logic with 8 operators
Syntax: ["if_then_else", "operator", arg1, arg2_optional, result_if_true, result_if_false]
Operators:
| Operator | Args | Logic |
|---|---|---|
| is_true | field, true_val, false_val | IF field == True THEN true_val ELSE false_val |
| is_false | field, true_val, false_val | IF field == False THEN true_val ELSE false_val |
| is_defined | field, true_val, false_val | IF field is not undefined THEN true_val ELSE false_val |
| is_undefined | field, true_val, false_val | IF field is undefined THEN true_val ELSE false_val |
| all_true | [fields_list], true_val, false_val | IF all fields are True THEN true_val ELSE false_val |
| all_defined | [fields_list], true_val, false_val | IF all fields are defined THEN true_val ELSE false_val |
| == | value1, value2, true_val, false_val | IF value1 == value2 THEN true_val ELSE false_val |
| != | value1, value2, true_val, false_val | IF value1 != value2 THEN true_val ELSE false_val |
Value Resolution Rules:
- Boolean literals: true, false → used directly
- Numeric literals: 42, 3.14 → used directly
- String literals: prefixed with $ → $"Active" → "Active"
- Field references: no prefix → looked up from inclusion data
Examples:
{
"source_id": "if_then_else",
"field_path": ["is_defined", "Patient_Id", "$\"DEFINED\"", "$\"UNDEFINED\""]
}
{
"source_id": "if_then_else",
"field_path": ["==", "Status", "$\"Active\"", "$\"Is Active\"", "$\"Not Active\""]
}
{
"source_id": "if_then_else",
"field_path": ["all_true", ["Is_Consented", "Is_Included"], true, false]
}
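The value resolution rules reduce to a small dispatcher; this sketch uses a placeholder resolve_field() for the real inclusion-data lookup:
def resolve(arg, inclusion):
    if isinstance(arg, (bool, int, float)):
        return arg                            # literal passes through
    if isinstance(arg, str) and arg.startswith("$"):
        return arg[1:].strip('"')             # $"Active" -> Active
    return resolve_field(inclusion, arg)      # field reference lookup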
Execution Modes
Mode 1: Normal Mode (Full Data Collection)
python eb_dashboard.py
Workflow:
- User login (with defaults)
- Load configuration
- Collect organizations & counters
- Collect all inclusion data (parallelized)
- Run quality checks (coherence + regression)
- Prompt user if critical issues
- Export JSON files
- Display elapsed time
Output Files:
- endobest_inclusions.json
- endobest_organizations.json
- Backup files with _old suffix
- Excel files (if configured in Excel_Workbooks table)
Mode 2: Excel-Only Mode (Fast Export) - NEW
python eb_dashboard.py --excel-only
Workflow:
- Load existing JSON files (no API calls, no collection)
- Load Excel export configuration
- Generate Excel workbooks from existing data
- Exit
Use Case: Regenerate Excel reports without data collection (faster iteration), test new configurations, apply new filters/sorts
Output Files:
- Excel files as specified in Excel_Workbooks configuration
Mode 3: Check-Only Mode (Validation Only)
python eb_dashboard.py --check-only
Workflow:
- Load existing JSON files (no API calls)
- Load regression check configuration
- Run quality checks without collecting new data
- Report any issues
- Exit
Use Case: Validate data before distribution, no fresh collection needed
Mode 4: Check-Only Compare Mode (File Comparison)
python eb_dashboard.py --check-only file1.json file2.json
Workflow:
- Load two specific JSON files
- Run regression check comparing file1 vs file2
- Skip coherence check (organizations file not needed)
- Report differences
- Exit
Use Case: Compare two snapshot versions without coherence validation
Mode 5: Debug Mode (Detailed Output)
python eb_dashboard.py --debug
Workflow:
- Execute as normal mode
- Enable DEBUG_MODE in quality checks module
- Display detailed field-by-field changes
- Show individual inclusion comparisons
- Verbose logging
Use Case: Troubleshoot regression check rules, understand data changes
Organization ↔ Center Mapping
Overview
The organization-to-center mapping feature enriches healthcare organization records with standardized center identifiers. This enables center-based reporting without requiring code modifications.
Configuration
File: eb_org_center_mapping.xlsx (optional, in script directory)
Sheet Name: Org_Center_Mapping (case-sensitive)
Required Columns:
| Organization_Name | Center_Name |
|-------------------|-------------|
| Hospital A | HOSP-A |
| Hospital B | HOSP-B |
Workflow
1. Load Mapping (Step [5b] of Phase 2)
   - Read eb_org_center_mapping.xlsx if the file exists
   - Parse the Org_Center_Mapping sheet
   - Skip silently if the file is not found (graceful degradation)
2. Validate Data
   - Check for duplicate organization names (normalized: lowercase, trimmed)
   - Check for duplicate center names
   - If duplicates found: abort mapping, return empty dict
3. Build Mapping Dictionary
   - Key: normalized organization name
   - Value: center name (original case preserved)
   - Example: {"hospital a": "HOSP-A"}
4. Apply to Organizations (a minimal sketch follows this list)
   - For each organization from the RC API:
     - Normalize the organization name (lowercase, trim)
     - Look it up in the mapping dictionary
     - If found: add a Center_Name field with the mapped value
     - If not found: add a Center_Name field with the fallback (the organization name)
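A minimal sketch of the normalize-and-lookup behavior, including the fallback for unmapped organizations:
def normalize(name: str) -> str:
    return name.strip().lower()   # case-insensitive, whitespace-trimmed key

mapping = {"hospital a": "HOSP-A", "hospital b": "HOSP-B"}

def enrich(org: dict) -> dict:
    # Fall back to the original organization name when unmapped.
    org["Center_Name"] = mapping.get(normalize(org["name"]), org["name"])
    return org

enrich({"id": "org3", "name": "Clinic C"})  # Center_Name == "Clinic C"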
Error Handling
| Scenario | Behavior |
|---|---|
| File missing | Print warning, skip mapping |
| Sheet not found | Print warning, skip mapping |
| Columns missing | Print warning, skip mapping |
| Duplicate organizations | Abort mapping, print error |
| Duplicate centers | Abort mapping, print error |
| Organization not in mapping | Use fallback (org name) |
Output
In endobest_organizations.json:
{
"id": "org-uuid",
"name": "Hospital A",
"Center_Name": "HOSP-A",
"patients_count": 45,
...
}
In endobest_inclusions.json (if extended field configured):
{
"Patient_Identification": {
"Organisation_Name": "Hospital A",
"Center_Name": "HOSP-A",
...
}
}
Example
Input Organizations (from RC API):
[
{"id": "org1", "name": "Hospital A"},
{"id": "org2", "name": "Hospital B"},
{"id": "org3", "name": "Clinic C"}
]
Mapping File:
Organization_Name | Center_Name
Hospital A | HOSP-A
Hospital B | HOSP-B
Console Output:
Mapping organizations to centers...
⚠ 1 organization(s) not mapped:
- Clinic C
Result: Clinic C uses fallback → Center_Name = "Clinic C"
Features
- ✅ Case-Insensitive Matching: "Hospital A" matches "hospital a" in file
- ✅ Whitespace Trimming: " Hospital A " matches "Hospital A"
- ✅ Graceful Degradation: Missing file doesn't break process
- ✅ Fallback Strategy: Unmapped organizations use original name
- ✅ No Code Changes: Fully configurable via Excel file
Error Handling & Resilience
Token Management Strategy
1. Automatic Token Refresh on 401
@api_call_with_retry
def some_api_call():
# If response.status_code == 401:
# new_token() is called automatically
# Request is retried
pass
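A hedged sketch of what such a decorator might look like, assembled from the retry constants and 401 handling documented below; this is not the actual api_call_with_retry implementation:
import functools
from time import sleep
import httpx

def api_call_with_retry(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        for attempt in range(ERROR_MAX_RETRY):
            try:
                return func(*args, **kwargs)
            except httpx.HTTPStatusError as exc:
                if exc.response.status_code == 401:
                    new_token()                      # refresh, then retry
                if attempt < ERROR_MAX_RETRY - 1:
                    sleep(WAIT_BEFORE_RETRY)
            except httpx.RequestError:
                if attempt < ERROR_MAX_RETRY - 1:
                    sleep(WAIT_BEFORE_RETRY)
        raise httpx.RequestError("Persistent error")
    return wrapper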
2. Thread-Safe Token Refresh
def new_token():
    global access_token, refresh_token
    with _token_refresh_lock:  # Only one thread refreshes at a time
        # Attempt refresh up to ERROR_MAX_RETRY times
        for attempt in range(ERROR_MAX_RETRY):
            try:
                # POST /api/auth/refreshToken, then update the global tokens
                ...
                return
            except httpx.HTTPError:
                sleep(WAIT_BEFORE_RETRY)
Retry Mechanism
Configuration Constants
ERROR_MAX_RETRY = 10 # Maximum retry attempts
WAIT_BEFORE_RETRY = 0.5 # Seconds between retries (no exponential backoff)
Retry Logic
for attempt in range(ERROR_MAX_RETRY):
try:
# Make API call
response.raise_for_status()
return result
except (httpx.RequestError, httpx.HTTPStatusError) as exc:
logging.warning(f"Error (Attempt {attempt + 1}/{ERROR_MAX_RETRY}): {exc}")
# Handle 401 (token expired)
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
logging.info("Token expired. Refreshing token.")
new_token()
# Wait before retry (except last attempt)
if attempt < ERROR_MAX_RETRY - 1:
sleep(WAIT_BEFORE_RETRY)
# If all retries fail
logging.critical(f"Persistent error after {ERROR_MAX_RETRY} attempts")
raise httpx.RequestError(message="Persistent error")
Exception Handling
API Errors
- httpx.RequestError: Network errors, connection timeouts, DNS failures
- httpx.HTTPStatusError: HTTP status codes >= 400
- json.JSONDecodeError: Invalid JSON in configuration or response
File I/O Errors
- FileNotFoundError: Configuration file missing
- IOError: Cannot write output files
- json.JSONDecodeError: Corrupted JSON file loading
Validation Errors
- Configuration validation: Invalid field definitions in Excel
- Data validation: Incoherent statistics vs. detailed data
- Regression check violations: Unexpected data changes
Error Logging
import logging
logging.basicConfig(
level=logging.WARNING,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='dashboard.log',
filemode='w'
)
Logged Events:
- API errors with attempt numbers
- Token refresh events
- Configuration loading status
- Quality check results
- File I/O operations
- Thread errors with stack traces
Graceful Degradation
User Confirmation on Critical Issues
If has_coherence_critical or has_regression_critical:
Display: "⚠ CRITICAL issues detected in quality checks!"
Prompt: "Do you want to write the results anyway?"
If YES:
Continue with export (user override)
If NO:
Cancel export, preserve old files
Exit gracefully
Thread Failure Handling
try:
result = future.result()
output_inclusions.extend(result)
except Exception as exc:
logging.critical(f"Critical error in worker: {exc}", exc_info=True)
thread_pool.shutdown(wait=False, cancel_futures=True)
raise # Propagate to main handler
Main Exception Handler
if __name__ == '__main__':
try:
main()
except Exception as e:
logging.critical(f"Script terminated prematurely: {e}", exc_info=True)
print(f"Error: {e}")
finally:
if 'subtasks_thread_pool' in globals():
subtasks_thread_pool.shutdown(wait=False, cancel_futures=True)
input("Press Enter to exit...")
Performance Metrics & Benchmarks
Typical Execution Times
For a full Endobest dataset (1,200+ patients, 15+ organizations):
| Phase | Duration | Notes |
|---|---|---|
| Login & Config | ~2-3 sec | Sequential |
| Fetch Counters (20 workers) | ~5-8 sec | Parallelized |
| Collect Inclusions (20 workers) | ~2-4 min | Includes API calls + processing |
| Quality Checks | ~10-15 sec | Loads files, compares data |
| Export to JSON | ~3-5 sec | File I/O |
| Total | ~2.5-5 min | Depends on network, API performance |
Network Optimization Impact
With old questionnaire fetching (N filtered calls per patient):
- 1,200 patients × 15 questionnaires = 18,000 API calls
- Estimated: 15-30 minutes
With optimized single-call questionnaire fetching:
- 1,200 patients × 1 call = 1,200 API calls
- Estimated: 2-5 minutes
- Improvement: 3-6x faster
Configuration Files
Excel Configuration File: Endobest_Dashboard_Config.xlsx
Sheet 1: Inclusions_Mapping (Field Mapping Definition)
Defines all fields to be extracted and their transformation rules. See DOCUMENTATION_11_FIELD_MAPPING.md for detailed guide.
Sheet 2: Regression_Check (Non-Regression Rules)
Defines data validation rules for detecting unexpected changes. See DOCUMENTATION_12_QUALITY_CHECKS.md for detailed guide.
Summary
The Endobest Dashboard implements a sophisticated, production-grade data collection system with:
✅ Flexible Configuration: Zero-code field definitions via Excel
✅ High Performance: 4-5x faster via optimized API calls
✅ Robust Resilience: Automatic token refresh, retries, error recovery
✅ Thread Safety: Per-thread clients, synchronized shared state
✅ Quality Assurance: Coherence checks + config-driven regression testing
✅ Comprehensive Logging: Full audit trail in dashboard.log
✅ User-Friendly: Progress bars, interactive prompts, clear error messages
This architecture enables non-technical users to configure new data sources without code changes, while providing developers with extensible hooks for custom logic and quality validation.
Document End