191 lines
6.4 KiB
Python
191 lines
6.4 KiB
Python
"""
|
|
Endobest Dashboard - Utility Functions Module

This module contains generic utility functions used throughout the Endobest Dashboard:
- HTTP client management (thread-safe)
- Nested data structure navigation with wildcard support
- Configuration path resolution (script vs PyInstaller)
- Thread position management for progress bars
- Filename generation utilities
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import threading
|
|
|
|
import httpx
|
|
|
|
from eb_dashboard_constants import CONFIG_FOLDER_NAME
|
|
|
|
|
|
# ============================================================================
|
|
# GLOBAL VARIABLES (managed by main module)
|
|
# ============================================================================
|
|
|
|
# These will be set/accessed from the main module
|
|
# Maps a thread identifier (threading.get_ident()) to the httpx.Client that
# get_httpx_client() created for that thread.
httpx_clients = dict()

# Thread identifiers in the order they first called get_thread_position();
# a thread's index in this list is its progress-bar position.
threads_list = list()

# Serialises the lookup/append sequence on threads_list.
_threads_list_lock = threading.Lock()
|
|
|
|
|
|
# ============================================================================
|
|
# HTTP CLIENT MANAGEMENT
|
|
# ============================================================================
|
|
|
|
def get_httpx_client() -> httpx.Client:
    """
    Return the HTTP client belonging to the calling thread, creating it on first use.

    Clients are stored in the module-level ``httpx_clients`` dict, keyed by
    ``threading.get_ident()``, so every thread works with its own
    ``httpx.Client`` instance. A new client is configured with a
    ``Connection: keep-alive`` header and a connection pool limited to
    100 connections, 20 of which may be kept alive.

    Returns:
        The ``httpx.Client`` registered for the current thread.
    """
    ident = threading.get_ident()
    try:
        return httpx_clients[ident]
    except KeyError:
        # First call from this thread: fall through and build its client.
        pass

    pool_limits = httpx.Limits(max_keepalive_connections=20, max_connections=100)
    client = httpx.Client(
        headers={"Connection": "keep-alive"},
        limits=pool_limits,
    )
    httpx_clients[ident] = client
    return client
|
|
|
|
|
|
def get_thread_position():
    """
    Return the index of the calling thread in ``threads_list``.

    A thread that is not yet listed is appended first, so its position is the
    last index at that moment. The whole lookup/append sequence runs while
    holding ``_threads_list_lock``. The result is used to place progress bars
    when several threads report progress.

    Returns:
        Zero-based position of the current thread's identifier in ``threads_list``.
    """
    ident = threading.get_ident()
    with _threads_list_lock:
        try:
            return threads_list.index(ident)
        except ValueError:
            # Unknown thread: register it at the end of the list.
            threads_list.append(ident)
            return len(threads_list) - 1
|
|
|
|
|
|
# ============================================================================
|
|
# NESTED DATA NAVIGATION
|
|
# ============================================================================
|
|
|
|
# Marker returned when the structure handed to get_nested_value() is None.
_NO_DATA_MARKER = "$$$$ No Data"

# Private sentinel meaning "this path could not be resolved". It can never be
# confused with a caller-supplied default or with real data.
_PATH_NOT_FOUND = object()


def _walk_literal_path(data_structure, path):
    """Follow literal keys/indices (no wildcard); return _PATH_NOT_FOUND on failure."""
    current_level = data_structure
    for key_or_index in path:
        if isinstance(current_level, dict):
            current_level = current_level.get(key_or_index)
            # A missing key and an explicit None value are both "not found".
            if current_level is None:
                return _PATH_NOT_FOUND
        elif isinstance(current_level, list):
            in_range = (
                isinstance(key_or_index, int)
                and -len(current_level) <= key_or_index < len(current_level)
            )
            if not in_range:
                return _PATH_NOT_FOUND
            current_level = current_level[key_or_index]
        else:
            # Scalars cannot be navigated any further.
            return _PATH_NOT_FOUND
    return current_level


def get_nested_value(data_structure, path, default=None):
    """
    Extracts a value from a nested structure of dictionaries and lists.
    Supports a wildcard '*' in the path to retrieve all elements from a list.

    Args:
        data_structure: The nested dict/list structure to navigate
        path: List of keys/indices to follow. Use '*' for list wildcard.
        default: Value to return if path not found

    Returns:
        - "$$$$ No Data" when data_structure is None.
        - default when path is empty, when a key/index cannot be resolved,
          or when the element addressed by the part before '*' is not a list.
        - Without wildcard: the value at the end of the path.
        - With wildcard: a list with one resolved value per list element.
          Elements whose value is None, is the default object, or equals
          "$$$$ No Data" are skipped; values that are themselves lists are
          flattened by one level (this is what makes several wildcards in
          one path yield a flat list).
        - With a path ending in '*': the non-None elements of the list,
          not flattened.

    Examples:
        get_nested_value({"a": {"b": 1}}, ["a", "b"]) -> 1
        get_nested_value({"items": [{"x": 1}, {"x": 2}]}, ["items", "*", "x"]) -> [1, 2]
        get_nested_value({"items": [1, 2]}, ["items", "*"]) -> [1, 2]
    """
    if data_structure is None:
        return _NO_DATA_MARKER
    if not path:
        return default

    if "*" not in path:
        found = _walk_literal_path(data_structure, path)
        return default if found is _PATH_NOT_FOUND else found

    wildcard_index = path.index("*")
    path_before = path[:wildcard_index]
    path_after = path[wildcard_index + 1:]

    # Resolve the prefix with the private sentinel rather than `default`, so a
    # list-valued default is never mistaken for the data to iterate over.
    base_level = _walk_literal_path(data_structure, path_before)
    if not isinstance(base_level, list):
        return default

    if not path_after:
        # Trailing wildcard: the list elements themselves are the result.
        return [item for item in base_level if item is not None]

    final_results = []
    for item in base_level:
        # Resolve the remainder of the path (which may hold more wildcards).
        value = get_nested_value(item, path_after, default)
        if value is default or value == _NO_DATA_MARKER:
            continue
        # Flatten by one level so nested wildcards produce a flat list.
        if isinstance(value, list):
            final_results.extend(value)
        else:
            final_results.append(value)
    return final_results
|
|
|
|
|
|
# ============================================================================
|
|
# CONFIGURATION UTILITIES
|
|
# ============================================================================
|
|
|
|
def get_config_path():
    """
    Return the location of the config folder for the current execution mode.

    When ``sys.frozen`` is set (PyInstaller executable), the folder is looked
    up inside the bundle's extraction directory ``sys._MEIPASS``. Otherwise
    (plain script execution) the bare ``CONFIG_FOLDER_NAME`` is returned,
    i.e. a path relative to the current working directory.

    Returns:
        Path to config folder
    """
    is_frozen_bundle = getattr(sys, 'frozen', False)
    if not is_frozen_bundle:
        # Running as a script
        return CONFIG_FOLDER_NAME

    # Running as a PyInstaller bundle
    bundle_root = sys._MEIPASS
    return os.path.join(bundle_root, CONFIG_FOLDER_NAME)
|
|
|
|
|
|
def get_old_filename(current_filename, old_suffix="_old"):
    """Build the backup ("old") file name that corresponds to a file name.

    The suffix is inserted between the base name and the extension, as split
    by ``os.path.splitext``.

    Example: "endobest_inclusions.json" → "endobest_inclusions_old.json"

    Args:
        current_filename: Current file name (e.g., "endobest_inclusions.json")
        old_suffix: Suffix to insert before the file extension (default: "_old")

    Returns:
        The file name with the suffix placed before its extension
    """
    stem_and_extension = os.path.splitext(current_filename)
    stem = stem_and_extension[0]
    extension = stem_and_extension[1]
    return "{}{}{}".format(stem, old_suffix, extension)
|