# Endobest Script Template - Quick Start Guide

## Overview

`eb_script_template.py` is a reusable template for creating scripts that access Endobest clinical research platform data.

## Features

✅ **Multi-microservice authentication** (IAM, RC, GDD)

✅ **Thread-safe HTTP client pool** with keep-alive

✅ **Multithreading** with configurable main pool + fixed subtasks pool

✅ **Automatic retry** with token refresh on 401

✅ **Progress bars** using tqdm

✅ **Logging** with auto-generated filename

✅ **JSON utilities** for nested data navigation

## Quick Start

### 1. Copy the template

```bash
cp eb_script_template.py my_new_script.py
```

### 2. Configure microservices

Edit the `MICROSERVICES` dict to enable only the services you need:

```python
MICROSERVICES = {
    "IAM": {...},    # Always required
    "RC": {...},     # Uncomment if needed
    # "GDD": {...},  # Comment out if not needed
}
```

### 3. Implement your processing logic

Find the `TODO` block in `main()` and add your code:

```python
# ========== MAIN PROCESSING BLOCK ==========
# TODO: IMPLEMENT YOUR PROCESSING LOGIC HERE

# Example: Fetch and process organizations
organizations = get_all_organizations()

for org in organizations:
    # Your processing logic here
    process_organization(org)
```

### 4. Run the script

```bash
python my_new_script.py
```

## Configuration

### Constants (top of file)

| Constant | Default | Description |
|----------|---------|-------------|
| `DEFAULT_USER_NAME` | `ziwig-invest2@yopmail.com` | Default login |
| `DEFAULT_PASSWORD` | `pbrrA***` | Default password |
| `MAX_THREADS` | `20` | Maximum threads for the main pool |
| `SUBTASKS_POOL_SIZE` | `40` | Fixed size of the subtasks pool |
| `ERROR_MAX_RETRY` | `10` | Max retry attempts |
| `WAIT_BEFORE_RETRY` | `0.5` | Delay between retries (seconds) |
| `API_TIMEOUT` | `60` | Default API timeout (seconds) |
| `LOG_LEVEL` | `logging.INFO` | Logging level |

### Microservices Configuration

Each microservice has:

- `app_id`: Client ID for token configuration
- `base_url`: API base URL
- `endpoints`: Dict of endpoint paths

**Available endpoints (RC):**

- `organizations`: Get all organizations
- `statistics`: Get inclusion statistics
- `search_inclusions`: Search inclusions
- `record_by_patient`: Get patient record
- `surveys`: Get questionnaire responses

**Available endpoints (GDD):**

- `request_by_tube`: Get request by tube ID

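As an illustration, a single entry might look like the sketch below; the `app_id`, base URL, and endpoint paths are placeholders, not the platform's real values:

```python
MICROSERVICES = {
    "RC": {
        "app_id": "rc-client-id",                  # placeholder client ID
        "base_url": "https://rc.example.invalid",  # placeholder base URL
        "endpoints": {
            "organizations": "/organizations",
            "search_inclusions": "/inclusions/search",
        },
    },
}
```
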
## API Call Patterns

### GET Request

```python
@api_call_with_retry("RC")
def get_my_data():
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.get(
        MICROSERVICES["RC"]["endpoints"]["organizations"],
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        timeout=API_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
```

### POST Request

```python
@api_call_with_retry("RC")
def post_my_data(param1, param2):
    client = get_httpx_client()
    client.base_url = MICROSERVICES["RC"]["base_url"]
    response = client.post(
        f"{MICROSERVICES['RC']['endpoints']['my_endpoint']}?param={param1}",
        headers={"Authorization": f"Bearer {tokens['RC']['access_token']}"},
        json={"key": param2},
        timeout=API_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
```

## Utilities

### get_nested_value()

Navigate nested JSON structures with wildcard support:

```python
# Simple navigation
value = get_nested_value(data, ["level1", "level2", "field"])

# Array wildcard
values = get_nested_value(data, ["items", "*", "name"])
# Returns a list of all "name" values from the items array
```

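For intuition about the wildcard, a minimal re-implementation consistent with the examples above might look like this (an illustration only, not the template's actual code; the scalar-vs-list return rule is an assumption):

```python
def get_nested_value_sketch(data, path):
    """Walk `path` through nested dicts/lists, expanding "*" over arrays."""
    current = [data]
    for key in path:
        next_level = []
        for node in current:
            if key == "*" and isinstance(node, list):
                next_level.extend(node)       # fan out over every element
            elif isinstance(node, dict) and key in node:
                next_level.append(node[key])  # ordinary dict step
        current = next_level
    # One match -> scalar, several matches -> list (assumed convention)
    return current[0] if len(current) == 1 else current
```
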
### get_httpx_client()

Get thread-local HTTP client (automatic keep-alive):

```python
client = get_httpx_client()
client.base_url = "https://api.example.com"
response = client.get("/endpoint")
```

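A thread-local client pool of this kind is typically just a `threading.local()` holding one `httpx.Client` per thread; a minimal sketch (assumed, not necessarily the template's exact code):

```python
import threading

import httpx

_local = threading.local()

def get_httpx_client_sketch() -> httpx.Client:
    """Return one persistent httpx.Client per thread, so keep-alive
    connections are reused across calls from the same worker."""
    if not hasattr(_local, "client"):
        _local.client = httpx.Client()
    return _local.client
```
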
### get_thread_position()

Get current thread position (for progress bar positioning):

```python
position = get_thread_position()
# Use with tqdm's `position` parameter for multi-threaded progress bars
```

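For example, a per-worker progress bar could be drawn on its own terminal row (assuming `get_thread_position()` returns a stable 0-based index per worker thread; `handle_subitem` is a hypothetical helper):

```python
from tqdm import tqdm

def process_with_bar(item, subitems):
    position = get_thread_position()  # row index for this worker's bar
    for sub in tqdm(subitems, desc=str(item), position=position, leave=False):
        handle_subitem(sub)  # hypothetical per-subitem work
```
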
## Multithreading Pattern

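The `main_thread_pool` and `subtasks_thread_pool` used below are presumably `ThreadPoolExecutor` instances sized by the constants; conceptually:

```python
from concurrent.futures import ThreadPoolExecutor

# Presumed construction; sizes come from the constants table above
main_thread_pool = ThreadPoolExecutor(max_workers=MAX_THREADS)
subtasks_thread_pool = ThreadPoolExecutor(max_workers=SUBTASKS_POOL_SIZE)
```
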
### Simple parallel processing

```python
items = [...]  # Your data

with tqdm(total=len(items), desc="Processing", bar_format=custom_bar_format) as pbar:
    with main_thread_pool as executor:
        futures = [executor.submit(process_item, item) for item in items]

        for future in as_completed(futures):
            try:
                result = future.result()
                # Handle result
                pbar.update(1)
            except Exception as exc:
                logging.critical(f"Error: {exc}", exc_info=True)
                executor.shutdown(wait=False, cancel_futures=True)
                raise
```

### Using subtasks pool

Running nested work in a separate, fixed-size pool avoids the deadlock that can occur when tasks in a saturated pool wait on futures queued in that same pool.

```python
def process_item(item):
    # Launch the subtask in the separate subtasks pool
    future = subtasks_thread_pool.submit(fetch_details, item)
    details = future.result()
    return combine(item, details)
```

## Logging

Logs are automatically written to `{script_name}.log`.

Change the log level in the constants:

```python
LOG_LEVEL = logging.DEBUG    # For detailed logs
LOG_LEVEL = logging.INFO     # Default
LOG_LEVEL = logging.WARNING  # Warnings and errors only
```

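The auto-generated filename is typically derived from the script's own path; a minimal sketch of such a setup (assumed, not necessarily the template's exact code):

```python
import logging
import pathlib
import sys

def setup_logging_sketch(level: int = logging.INFO) -> None:
    # Derive "<script_name>.log" from the entry-point file
    log_file = pathlib.Path(sys.argv[0]).with_suffix(".log")
    logging.basicConfig(
        filename=log_file,
        level=level,
        format="%(asctime)s %(levelname)s %(threadName)s %(message)s",
    )
```
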
## Error Handling

### Automatic retry

All API calls decorated with `@api_call_with_retry(app)` automatically:

- Retry on network errors
- Retry on HTTP errors
- Refresh the token on 401 Unauthorized
- Respect the `ERROR_MAX_RETRY` limit

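A decorator with this behaviour could be implemented roughly as follows (a sketch built on the constants above; `refresh_token()` is an assumed helper, not a documented function):

```python
import functools
import logging
import time

import httpx

def api_call_with_retry_sketch(app: str):
    """Retry on request/HTTP errors, refreshing the token first on 401."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, ERROR_MAX_RETRY + 1):
                try:
                    return func(*args, **kwargs)
                except httpx.HTTPStatusError as exc:
                    if exc.response.status_code == 401:
                        refresh_token(app)  # assumed helper
                    logging.warning(f"{app} HTTP error (attempt {attempt}): {exc}")
                except httpx.RequestError as exc:
                    logging.warning(f"{app} network error (attempt {attempt}): {exc}")
                time.sleep(WAIT_BEFORE_RETRY)
            raise RuntimeError(f"{app}: giving up after {ERROR_MAX_RETRY} attempts")
        return wrapper
    return decorator
```
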
### Manual error handling

```python
try:
    result = my_api_call()
except httpx.RequestError as e:
    logging.error(f"Request failed: {e}")
    # Handle error
```

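Note that `response.raise_for_status()` raises `httpx.HTTPStatusError`, which is not a subclass of `httpx.RequestError`, so catch both when you need to distinguish transport failures from 4xx/5xx responses:

```python
try:
    result = my_api_call()
except httpx.HTTPStatusError as e:
    # The server answered, but with a 4xx/5xx status
    logging.error(f"HTTP {e.response.status_code} from {e.request.url}")
except httpx.RequestError as e:
    # No usable response: DNS failure, timeout, connection error, ...
    logging.error(f"Request failed: {e}")
```
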
## Best Practices

### 1. Configure only needed microservices
Comment out unused services to speed up authentication.

### 2. Use constants for configuration
Avoid hardcoded values; update the constants at the top of the file.

### 3. Implement processing in main()
Keep your logic in the designated TODO block for clarity.

### 4. Use progress bars
Help users follow processing status with tqdm.

### 5. Log errors
Use the `logging` module for debugging and an audit trail.

### 6. Test incrementally
Start with a simple API call, then add threading, then complex logic.

## Common Tasks

### Task 1: Fetch all organizations

```python
organizations = get_all_organizations()
for org in organizations:
    print(f"{org['name']}: {org['id']}")
```

### Task 2: Process organizations in parallel

```python
organizations = get_all_organizations()

with tqdm(total=len(organizations), desc="Processing orgs") as pbar:
    with main_thread_pool as executor:
        futures = [executor.submit(process_org, org) for org in organizations]
        for future in as_completed(futures):
            result = future.result()
            pbar.update(1)
```

### Task 3: Fetch nested data with subtasks

```python
def process_organization(org):
    org_id = org['id']

    # Launch a subtask to fetch inclusions
    future = subtasks_thread_pool.submit(search_inclusions, org_id, 1000, 1)
    inclusions = future.result()

    return {
        "organization": org,
        "inclusions": inclusions,
    }
```

## Troubleshooting

### Login fails
- Check credentials in constants
- Verify network connectivity
- Check logs for detailed error

### Token expired during execution
- Automatic refresh should handle this
- Check logs for refresh attempts
- Verify refresh token is valid

### Script hangs
- Check thread pool shutdown in `finally` block
- Verify API timeouts are appropriate
- Review logs for deadlocks

### Performance issues
- Adjust `MAX_THREADS` (more threads ≠ faster)
- Use subtasks pool for nested parallelism
- Profile with `logging.DEBUG` to find bottlenecks

## Support

For detailed technical specifications, see `Script_template_spec.md`.

For issues with the Endobest platform APIs, contact the technical team.