# Endobest Script Template - Quick Start Guide

## Overview

`eb_script_template.py` is a reusable template for creating scripts that access Endobest clinical research platform data.
## Features

- ✅ Multi-microservice authentication (IAM, RC, GDD)
- ✅ Thread-safe HTTP client pool with keep-alive
- ✅ Multithreading with a configurable main pool plus a fixed subtasks pool
- ✅ Automatic retry with token refresh on 401
- ✅ Progress bars using tqdm
- ✅ Logging with an auto-generated filename
- ✅ JSON utilities for nested data navigation
## Quick Start

### 1. Copy the template

```bash
cp eb_script_template.py my_new_script.py
```
### 2. Configure microservices

Edit the `MICROSERVICES` dict to enable only the services you need:

```python
MICROSERVICES = {
    # "IAM": {...},  # Always required
    "RC": {...},     # Uncomment if needed
    # "GDD": {...},  # Comment out if not needed
}
```
### 3. Implement your processing logic

Find the TODO block in `main()` and add your code:

```python
# ========== MAIN PROCESSING BLOCK ==========
# TODO: IMPLEMENT YOUR PROCESSING LOGIC HERE

# Example: fetch and process organizations
organizations = get_all_organizations()
for org in organizations:
    # Your processing logic here
    process_organization(org)
```
### 4. Run the script

```bash
python my_new_script.py
```
## Configuration

### Constants (top of file)

| Constant | Default | Description |
|---|---|---|
| `DEFAULT_USER_NAME` | `ziwig-invest2@yopmail.com` | Default login |
| `DEFAULT_PASSWORD` | `pbrrA***` | Default password |
| `MAX_THREADS` | 20 | Maximum threads for the main pool |
| `SUBTASKS_POOL_SIZE` | 40 | Fixed size of the subtasks pool |
| `ERROR_MAX_RETRY` | 10 | Maximum retry attempts |
| `WAIT_BEFORE_RETRY` | 0.5 | Delay between retries (seconds) |
| `API_TIMEOUT` | 60 | Default API timeout (seconds) |
| `LOG_LEVEL` | `logging.INFO` | Logging level |
### Microservices Configuration

Each microservice entry has:

- `app_id`: client ID for token configuration
- `base_url`: API base URL
- `endpoints`: dict of endpoint paths
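Putting those three keys together, a filled-in entry might look like the sketch below. The `app_id`, `base_url`, and endpoint paths are placeholders for illustration only, not real platform values.

```python
# Hypothetical example of one fully filled-in MICROSERVICES entry.
# All values below are placeholders, not real Endobest configuration.
MICROSERVICES = {
    "RC": {
        "app_id": "rc-client-id",                  # placeholder client ID
        "base_url": "https://rc.example.com/api",  # placeholder base URL
        "endpoints": {
            "organizations": "/organizations",
            "search_inclusions": "/inclusions/search",
        },
    },
}
```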
Available endpoints (RC):

- `organizations`: get all organizations
- `statistics`: get inclusion statistics
- `search_inclusions`: search inclusions
- `record_by_patient`: get a patient record
- `surveys`: get questionnaire responses

Available endpoints (GDD):

- `request_by_tube`: get a request by tube ID
## API Call Patterns

### GET Request

```python
@api_call_with_retry("RC")
def get_my_data():
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.get(
        MICROSERVICES["RC"]["endpoints"]["organizations"],
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        timeout=API_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
```
### POST Request

```python
@api_call_with_retry("RC")
def post_my_data(param1, param2):
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.post(
        f"{MICROSERVICES['RC']['endpoints']['my_endpoint']}?param={param1}",
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        json={"key": param2},
        timeout=API_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
```
## Utilities

### get_nested_value()

Navigate nested JSON structures with wildcard support:

```python
# Simple navigation
value = get_nested_value(data, ["level1", "level2", "field"])

# Array wildcard: returns a list of all "name" values from the items array
values = get_nested_value(data, ["items", "*", "name"])
```
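The template's actual implementation may differ, but a minimal sketch matching the documented behavior (path lists, a `"*"` array wildcard, `None` for missing keys) could look like this:

```python
def get_nested_value(data, path):
    """Sketch of a nested-JSON navigator with a "*" array wildcard.

    Assumption: mirrors only the behavior documented above; the template's
    real function may handle more cases.
    """
    if not path:
        return data
    key, rest = path[0], path[1:]
    if key == "*":
        # Wildcard: apply the remaining path to every element of the list.
        return [get_nested_value(item, rest) for item in (data or [])]
    if isinstance(data, dict) and key in data:
        return get_nested_value(data[key], rest)
    return None  # missing key: return None rather than raising
```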
### get_httpx_client()

Get a thread-local HTTP client (with automatic keep-alive):

```python
client = get_httpx_client()
client.base_url = "https://api.example.com"
response = client.get("/endpoint")
```
### get_thread_position()

Get the current thread's position (for progress bar positioning):

```python
# Use with tqdm's `position` parameter for multi-threaded progress bars
position = get_thread_position()
```
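One plausible way such a helper works (an assumption, not the template's confirmed implementation) is to assign each thread a stable small integer slot, which maps directly onto tqdm's `position=` argument:

```python
import threading

_positions = {}
_positions_lock = threading.Lock()

def get_thread_position():
    """Sketch: give each calling thread a stable small integer slot,
    suitable for tqdm's `position` parameter."""
    ident = threading.get_ident()
    with _positions_lock:
        if ident not in _positions:
            _positions[ident] = len(_positions)
        return _positions[ident]
```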
## Multithreading Pattern

### Simple parallel processing

```python
items = [...]  # Your data

with tqdm(total=len(items), desc="Processing", bar_format=custom_bar_format) as pbar:
    with main_thread_pool as executor:
        futures = [executor.submit(process_item, item) for item in items]
        for future in as_completed(futures):
            try:
                result = future.result()
                # Handle result
                pbar.update(1)
            except Exception as exc:
                logging.critical(f"Error: {exc}", exc_info=True)
                executor.shutdown(wait=False, cancel_futures=True)
                raise
```
### Using the subtasks pool

```python
def process_item(item):
    # Launch a subtask in the separate pool
    future = subtasks_thread_pool.submit(fetch_details, item)
    details = future.result()
    return combine(item, details)
```
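The two pools these patterns rely on are presumably created at module level from the constants; a sketch under that assumption:

```python
from concurrent.futures import ThreadPoolExecutor

MAX_THREADS = 20
SUBTASKS_POOL_SIZE = 40

# Sketch (assumption): the two module-level pools the patterns above use.
# Subtasks run in their own fixed pool so that main-pool workers blocked on
# future.result() can never exhaust the pool their own subtasks need --
# submitting subtasks back into a single shared executor can deadlock.
main_thread_pool = ThreadPoolExecutor(max_workers=MAX_THREADS)
subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE)
```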
## Logging

Logs are automatically written to `{script_name}.log`.

Change the log level via the constant:

```python
LOG_LEVEL = logging.DEBUG    # Detailed logs
LOG_LEVEL = logging.INFO     # Default
LOG_LEVEL = logging.WARNING  # Warnings and errors only
```
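The auto-generated filename is presumably derived from the invoked script's path; a sketch of that setup (the helper names here are illustrative, not the template's actual functions):

```python
import logging
import os
import sys

LOG_LEVEL = logging.INFO

def log_filename(argv0):
    """Derive {script_name}.log from the invoked script path."""
    return os.path.splitext(os.path.basename(argv0))[0] + ".log"

def setup_logging():
    """Sketch (assumption) of the template's auto-named log setup."""
    logging.basicConfig(
        filename=log_filename(sys.argv[0]),
        level=LOG_LEVEL,
        format="%(asctime)s %(levelname)s %(message)s",
    )
```

So running `python my_new_script.py` would log to `my_new_script.log`.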
## Error Handling

### Automatic retry

All API calls decorated with `@api_call_with_retry(app)` automatically:

- Retry on network errors
- Retry on HTTP errors
- Refresh the token on 401 Unauthorized
- Respect the `ERROR_MAX_RETRY` limit
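A minimal sketch of what such a decorator might look like. This is an assumption based on the documented behavior, not the template's actual code: the `refresh_token` hook is hypothetical, and the real decorator likely inspects `httpx` exception types rather than the error string.

```python
import functools
import logging
import time

ERROR_MAX_RETRY = 10
WAIT_BEFORE_RETRY = 0.5

def api_call_with_retry(app, refresh_token=lambda app: None):
    """Sketch of a retry decorator matching the documented behavior.

    Assumptions: retries on any exception, treats errors mentioning 401
    as a signal to refresh the token via a hypothetical hook, and gives
    up after ERROR_MAX_RETRY attempts.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, ERROR_MAX_RETRY + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == ERROR_MAX_RETRY:
                        raise  # out of retries: propagate the last error
                    if "401" in str(exc):
                        refresh_token(app)  # hypothetical refresh hook
                    logging.warning("attempt %d failed: %s", attempt, exc)
                    time.sleep(WAIT_BEFORE_RETRY)
        return wrapper
    return decorator
```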
### Manual error handling

```python
try:
    result = my_api_call()
except httpx.RequestError as e:
    logging.error(f"Request failed: {e}")
    # Handle the error
```
## Best Practices

1. **Configure only needed microservices.** Comment out unused services to speed up authentication.
2. **Use constants for configuration.** Avoid hardcoded values; update the constants at the top of the file.
3. **Implement processing in `main()`.** Keep your logic in the designated TODO block for clarity.
4. **Use progress bars.** Help users follow processing status with tqdm.
5. **Log errors.** Use the logging module for debugging and an audit trail.
6. **Test incrementally.** Start with a simple API call, then add threading, then complex logic.
## Common Tasks

### Task 1: Fetch all organizations

```python
organizations = get_all_organizations()
for org in organizations:
    print(f"{org['name']}: {org['id']}")
```
### Task 2: Process organizations in parallel

```python
organizations = get_all_organizations()

with tqdm(total=len(organizations), desc="Processing orgs") as pbar:
    with main_thread_pool as executor:
        futures = [executor.submit(process_org, org) for org in organizations]
        for future in as_completed(futures):
            result = future.result()
            pbar.update(1)
```
### Task 3: Fetch nested data with subtasks

```python
def process_organization(org):
    org_id = org['id']
    # Launch a subtask to fetch the inclusions
    future = subtasks_thread_pool.submit(search_inclusions, org_id, 1000, 1)
    inclusions = future.result()
    return {
        "organization": org,
        "inclusions": inclusions,
    }
```
## Troubleshooting

### Login fails

- Check the credentials in the constants
- Verify network connectivity
- Check the logs for the detailed error

### Token expired during execution

- Automatic refresh should handle this
- Check the logs for refresh attempts
- Verify that the refresh token is valid

### Script hangs

- Check the thread pool shutdown in the `finally` block
- Verify that API timeouts are appropriate
- Review the logs for deadlocks

### Performance issues

- Adjust `MAX_THREADS` (more threads ≠ faster)
- Use the subtasks pool for nested parallelism
- Profile with `logging.DEBUG` to find bottlenecks
## Support

For detailed technical specifications, see `Script_template_spec.md`.

For issues with the Endobest platform APIs, contact the technical team.