EB_Script_Template/README_TEMPLATE.md

Endobest Script Template - Quick Start Guide

Overview

eb_script_template.py is a reusable template for creating scripts that access data from the Endobest clinical research platform.

Features

  • Multi-microservice authentication (IAM, RC, GDD)
  • Thread-safe HTTP client pool with keep-alive
  • Multithreading with a configurable main pool plus a fixed-size subtasks pool
  • Automatic retry with token refresh on 401
  • Progress bars using tqdm
  • Logging with an auto-generated filename
  • JSON utilities for nested data navigation

Quick Start

1. Copy the template

cp eb_script_template.py my_new_script.py

2. Configure microservices

Edit the MICROSERVICES dict to enable only the services you need:

MICROSERVICES = {
    # "IAM": {...},      # Always required
    "RC": {...},         # Uncomment if needed
    # "GDD": {...},      # Comment out if not needed
}

3. Implement your processing logic

Find the TODO block in main() and add your code:

# ========== MAIN PROCESSING BLOCK ==========
# TODO: IMPLEMENT YOUR PROCESSING LOGIC HERE

# Example: Fetch and process organizations
organizations = get_all_organizations()

for org in organizations:
    # Your processing logic here
    process_organization(org)

4. Run the script

python my_new_script.py

Configuration

Constants (top of file)

| Constant | Default | Description |
| --- | --- | --- |
| DEFAULT_USER_NAME | ziwig-invest2@yopmail.com | Default login |
| DEFAULT_PASSWORD | pbrrA*** | Default password |
| MAX_THREADS | 20 | Maximum threads for the main pool |
| SUBTASKS_POOL_SIZE | 40 | Fixed size of the subtasks pool |
| ERROR_MAX_RETRY | 10 | Maximum retry attempts |
| WAIT_BEFORE_RETRY | 0.5 | Delay between retries (seconds) |
| API_TIMEOUT | 60 | Default API timeout (seconds) |
| LOG_LEVEL | logging.INFO | Logging level |
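As code, the constants block at the top of the file would look roughly like this (values taken from the table above; the password stays redacted here):

```python
import logging

DEFAULT_USER_NAME = "ziwig-invest2@yopmail.com"
DEFAULT_PASSWORD = "pbrrA***"   # redacted placeholder, not a real password
MAX_THREADS = 20                # maximum threads for the main pool
SUBTASKS_POOL_SIZE = 40         # fixed size of the subtasks pool
ERROR_MAX_RETRY = 10            # maximum retry attempts per API call
WAIT_BEFORE_RETRY = 0.5         # seconds to wait between retries
API_TIMEOUT = 60                # default API timeout in seconds
LOG_LEVEL = logging.INFO
```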

Microservices Configuration

Each microservice has:

  • app_id: Client ID for token configuration
  • base_url: API base URL
  • endpoints: Dict of endpoint paths
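Put together, a single entry looks roughly like this (the app_id, base URL, and endpoint path below are made-up illustrations, not real values):

```python
MICROSERVICES = {
    "RC": {
        "app_id": "rc-client",                      # hypothetical client ID
        "base_url": "https://rc.example.com/api",   # hypothetical base URL
        "endpoints": {
            "organizations": "/organizations",      # hypothetical path
        },
    },
}
```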

Available endpoints (RC):

  • organizations: Get all organizations
  • statistics: Get inclusion statistics
  • search_inclusions: Search inclusions
  • record_by_patient: Get patient record
  • surveys: Get questionnaire responses

Available endpoints (GDD):

  • request_by_tube: Get request by tube ID

API Call Patterns

GET Request

@api_call_with_retry("RC")
def get_my_data():
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.get(
        MICROSERVICES["RC"]["endpoints"]["organizations"],
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    return response.json()

POST Request

@api_call_with_retry("RC")
def post_my_data(param1, param2):
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.post(
        f"{MICROSERVICES['RC']['endpoints']['my_endpoint']}?param={param1}",
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        json={"key": param2},
        timeout=API_TIMEOUT
    )
    response.raise_for_status()
    return response.json()

Utilities

get_nested_value()

Navigate nested JSON structures with wildcard support:

# Simple navigation
value = get_nested_value(data, ["level1", "level2", "field"])

# Array wildcard
values = get_nested_value(data, ["items", "*", "name"])
# Returns list of all "name" values from items array
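A minimal implementation of this behaviour could look like the following (a sketch; the template's actual version may add error handling for missing keys):

```python
def get_nested_value(data, path):
    """Walk `path` through nested dicts/lists; "*" fans out over a list."""
    if not path:
        return data
    key, rest = path[0], path[1:]
    if key == "*":
        # Wildcard: apply the remaining path to every element of the list.
        return [get_nested_value(item, rest) for item in data]
    return get_nested_value(data[key], rest)
```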

get_httpx_client()

Get thread-local HTTP client (automatic keep-alive):

client = get_httpx_client()
client.base_url = "https://api.example.com"
response = client.get("/endpoint")
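Under the hood this is typically a `threading.local` holding one client per thread; the sketch below uses a placeholder class instead of httpx.Client so it is self-contained:

```python
import threading

class PlaceholderClient:
    """Stand-in for httpx.Client so this sketch runs without httpx."""

_local = threading.local()

def get_httpx_client():
    # The first call in a thread creates the client; later calls reuse it,
    # so keep-alive connections persist for that thread.
    if not hasattr(_local, "client"):
        _local.client = PlaceholderClient()
    return _local.client
```

Because each thread owns its client, no locking is needed around requests.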

get_thread_position()

Get current thread position (for progress bar positioning):

position = get_thread_position()
# Use with tqdm position parameter for multi-threaded progress bars
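One plausible implementation assigns each thread a stable 0-based slot in order of first call (a sketch; the template's version may differ):

```python
import threading

_positions = {}
_positions_lock = threading.Lock()

def get_thread_position():
    """Return a stable 0-based slot for the calling thread."""
    ident = threading.get_ident()
    with _positions_lock:
        if ident not in _positions:
            _positions[ident] = len(_positions)
        return _positions[ident]
```

Passing the result as `tqdm(..., position=get_thread_position())` lets each worker thread draw its bar on its own terminal line.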

Multithreading Pattern

Simple parallel processing

items = [...]  # Your data

with tqdm(total=len(items), desc="Processing", bar_format=custom_bar_format) as pbar:
    with main_thread_pool as executor:
        futures = [executor.submit(process_item, item) for item in items]

        for future in as_completed(futures):
            try:
                result = future.result()
                # Handle result
                pbar.update(1)
            except Exception as exc:
                logging.critical(f"Error: {exc}", exc_info=True)
                executor.shutdown(wait=False, cancel_futures=True)
                raise

Using subtasks pool

def process_item(item):
    # Launch subtask in separate pool
    future = subtasks_thread_pool.submit(fetch_details, item)
    details = future.result()
    return combine(item, details)

Logging

Logs are automatically written to {script_name}.log.
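The filename is presumably derived from the running script's name, along these lines (a sketch; the template's exact format string may differ):

```python
import logging
import os
import sys

def default_log_filename():
    """Derive `{script_name}.log` from the script being executed."""
    script_name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    return f"{script_name}.log"

def setup_logging(level=logging.INFO):
    logging.basicConfig(
        filename=default_log_filename(),
        level=level,
        format="%(asctime)s %(levelname)s %(message)s",
    )
```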

Change log level in constants:

LOG_LEVEL = logging.DEBUG  # For detailed logs
LOG_LEVEL = logging.INFO   # Default
LOG_LEVEL = logging.WARNING  # Warnings and errors only

Error Handling

Automatic retry

All API calls decorated with @api_call_with_retry(app) automatically:

  • Retry on network errors
  • Retry on HTTP errors
  • Refresh token on 401 Unauthorized
  • Respect ERROR_MAX_RETRY limit
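A simplified version of such a decorator might look like this (a sketch: the real one also refreshes `tokens[app]` when it detects a 401, which is elided here):

```python
import functools
import logging
import time

ERROR_MAX_RETRY = 10
WAIT_BEFORE_RETRY = 0.5

def api_call_with_retry(app):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, ERROR_MAX_RETRY + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # The real decorator inspects `exc` for a 401 response
                    # and refreshes the token for `app` before retrying.
                    if attempt == ERROR_MAX_RETRY:
                        raise
                    logging.warning("%s attempt %d failed: %s", app, attempt, exc)
                    time.sleep(WAIT_BEFORE_RETRY)
        return wrapper
    return decorator
```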

Manual error handling

try:
    result = my_api_call()
except httpx.RequestError as e:
    logging.error(f"Request failed: {e}")
    # Handle error

Best Practices

1. Configure only needed microservices

Comment out unused services to speed up authentication.

2. Use constants for configuration

Avoid hardcoded values; update the constants at the top of the file instead.

3. Implement processing in main()

Keep your logic in the designated TODO block for clarity.

4. Use progress bars

Help users understand processing status with tqdm.

5. Log errors

Use logging module for debugging and audit trail.

6. Test incrementally

Start with a simple API call, then add threading, then the more complex logic.

Common Tasks

Task 1: Fetch all organizations

organizations = get_all_organizations()
for org in organizations:
    print(f"{org['name']}: {org['id']}")

Task 2: Process organizations in parallel

organizations = get_all_organizations()

with tqdm(total=len(organizations), desc="Processing orgs") as pbar:
    with main_thread_pool as executor:
        futures = [executor.submit(process_org, org) for org in organizations]
        for future in as_completed(futures):
            result = future.result()
            pbar.update(1)

Task 3: Fetch nested data with subtasks

def process_organization(org):
    org_id = org['id']

    # Launch subtask to fetch inclusions
    future = subtasks_thread_pool.submit(search_inclusions, org_id, 1000, 1)
    inclusions = future.result()

    return {
        "organization": org,
        "inclusions": inclusions
    }

Troubleshooting

Login fails

  • Check credentials in constants
  • Verify network connectivity
  • Check logs for detailed error

Token expired during execution

  • Automatic refresh should handle this
  • Check logs for refresh attempts
  • Verify refresh token is valid

Script hangs

  • Check thread pool shutdown in finally block
  • Verify API timeouts are appropriate
  • Review logs for deadlocks

Performance issues

  • Adjust MAX_THREADS (more threads ≠ faster)
  • Use subtasks pool for nested parallelism
  • Profile with logging.DEBUG to find bottlenecks

Support

For detailed technical specifications, see Script_template_spec.md.

For issues with the Endobest platform APIs, contact the technical team.