"""
|
||
Endobest Dashboard - Excel Export Module
|
||
|
||
This module handles generation of Excel workbooks from Inclusions and Organizations data.
|
||
Fully configurable via external Excel configuration file (Endobest_Dashboard_Config.xlsx).
|
||
|
||
Features:
|
||
- Config-driven workbook generation (no code changes needed)
|
||
- Support for Variable templates and Table data fills
|
||
- Configurable filtering, sorting, and value replacement
|
||
- xlwings-based data processing with automatic formula recalculation
|
||
- Robust error handling and logging
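Typical usage (illustrative sketch -- assumes the main script has already loaded the
Inclusions/Organizations data and their mapping configs; the import alias below is an
assumption, not the actual file name):

    from rich.console import Console
    import eb_dashboard_excel_export as excel_export  # hypothetical import name

    console = Console()
    excel_export.set_dependencies(console)

    workbooks_cfg, sheets_cfg, has_error, errors = excel_export.load_excel_export_config(console)
    if not has_error:
        excel_export.export_to_excel(
            inclusions_data, organizations_data, (workbooks_cfg, sheets_cfg),
            inclusions_mapping_config=inclusions_mapping,
            organizations_mapping_config=organizations_mapping,
        )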
"""

import functools
import json
import logging
import os
import re
import shutil
import tempfile
import traceback
import zipfile
from datetime import datetime, timedelta, timezone
from time import perf_counter
from zoneinfo import ZoneInfo

import openpyxl
from openpyxl.utils import get_column_letter
from rich.console import Console

try:
    import xlwings as xw
except ImportError:
    xw = None

from eb_dashboard_utils import get_nested_value, get_config_path
from eb_dashboard_constants import (
    INCLUSIONS_FILE_NAME,
    ORGANIZATIONS_FILE_NAME,
    DASHBOARD_CONFIG_FILE_NAME,
    EXCEL_WORKBOOKS_TABLE_NAME,
    EXCEL_SHEETS_TABLE_NAME,
    OUTPUT_ACTION_OVERWRITE,
    OUTPUT_ACTION_INCREMENT,
    OUTPUT_ACTION_BACKUP,
    OUTPUT_ACTIONS,
    SOURCE_TYPE_INCLUSIONS,
    SOURCE_TYPE_ORGANIZATIONS,
    SOURCE_TYPE_VARIABLE,
    SOURCE_TYPES,
    TARGET_TYPE_TABLE,
    TARGET_TYPE_NAMED_RANGE,
    EXCEL_COM_MAX_RETRIES,
    EXCEL_COM_RETRY_DELAY
)

# ============================================================================
# CONSTANTS
# ============================================================================

EXCEL_OUTPUT_FOLDER = os.getcwd() # Current working directory

# ============================================================================
# MODULE DEPENDENCIES (injected from main module)
# ============================================================================

console = None

# NOTE: Constants imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH):
# Configuration Files:
# - INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME, DASHBOARD_CONFIG_FILE_NAME
# - EXCEL_WORKBOOKS_TABLE_NAME, EXCEL_SHEETS_TABLE_NAME
# Output Handling:
# - OUTPUT_ACTION_OVERWRITE, OUTPUT_ACTION_INCREMENT, OUTPUT_ACTION_BACKUP, OUTPUT_ACTIONS
# Data Sources:
# - SOURCE_TYPE_INCLUSIONS, SOURCE_TYPE_ORGANIZATIONS, SOURCE_TYPE_VARIABLE, SOURCE_TYPES
#
# NOTE: Mapping table names (INCLUSIONS_MAPPING_TABLE_NAME, ORGANIZATIONS_MAPPING_TABLE_NAME)
# are defined in constants but loaded/used in main script (eb_dashboard.py)


def set_dependencies(console_instance):
    """
    Inject console instance from main module.

    Args:
        console_instance: Rich Console instance for formatted output

    Note:
        File and table names are imported directly from eb_dashboard_constants.py
        (SINGLE SOURCE OF TRUTH)
    """
    global console
    console = console_instance


# ============================================================================
# PUBLIC FUNCTIONS
# ============================================================================
|
||
|
||
def load_excel_export_config(console_instance=None):
|
||
"""
|
||
Load and validate Excel export configuration from config file.
|
||
|
||
Args:
|
||
console_instance: Optional Rich Console instance
|
||
|
||
Returns:
|
||
Tuple of (excel_workbooks_config, excel_sheets_config, has_error, error_messages)
|
||
- excel_workbooks_config: List of workbook definitions
|
||
- excel_sheets_config: List of sheet fill definitions
|
||
- has_error: Boolean flag if critical errors found
|
||
- error_messages: List of error message strings
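    Expected configuration columns (as read by this loader):
        Excel_Workbooks sheet: workbook_id, workbook_template_name, output_file_name_template,
            if_output_exists (one of OUTPUT_ACTIONS, defaults to OUTPUT_ACTION_OVERWRITE)
        Excel_Sheets sheet: workbook_id, source_type (one of SOURCE_TYPES), source, target_name,
            plus optional JSON fields filter_condition, sort_keys, value_replacement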
|
||
"""
|
||
global console
|
||
if console_instance:
|
||
console = console_instance
|
||
|
||
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
|
||
error_messages = []
|
||
|
||
try:
|
||
workbook = openpyxl.load_workbook(config_path)
|
||
except FileNotFoundError:
|
||
error_msg = f"Error: Configuration file not found at: {config_path}"
|
||
logging.critical(error_msg)
|
||
console.print(f"[bold red]{error_msg}[/bold red]")
|
||
return None, None, True, [error_msg]
|
||
|
||
# Load Excel_Workbooks sheet
|
||
if EXCEL_WORKBOOKS_TABLE_NAME not in workbook.sheetnames:
|
||
error_msg = f"Error: Sheet '{EXCEL_WORKBOOKS_TABLE_NAME}' not found in configuration file."
|
||
error_messages.append(error_msg)
|
||
return None, None, True, error_messages
|
||
|
||
excel_workbooks_sheet = workbook[EXCEL_WORKBOOKS_TABLE_NAME]
|
||
excel_workbooks_config = []
|
||
|
||
try:
|
||
headers = [cell.value for cell in excel_workbooks_sheet[1]]
|
||
for row_index, row in enumerate(excel_workbooks_sheet.iter_rows(min_row=2, values_only=True), start=2):
|
||
if all(cell is None for cell in row):
|
||
continue # Skip empty rows
|
||
|
||
workbook_config = dict(zip(headers, row))
|
||
|
||
# Validate required fields
|
||
if not workbook_config.get("workbook_id"):
|
||
error_msg = f"Row {row_index}: 'workbook_id' is mandatory"
|
||
error_messages.append(error_msg)
|
||
continue
|
||
|
||
if not workbook_config.get("workbook_template_name"):
|
||
error_msg = f"Row {row_index}: 'workbook_template_name' is mandatory"
|
||
error_messages.append(error_msg)
|
||
continue
|
||
|
||
if not workbook_config.get("output_file_name_template"):
|
||
error_msg = f"Row {row_index}: 'output_file_name_template' is mandatory"
|
||
error_messages.append(error_msg)
|
||
continue
|
||
|
||
if_output_exists = workbook_config.get("if_output_exists", OUTPUT_ACTION_OVERWRITE)
|
||
if if_output_exists not in OUTPUT_ACTIONS:
|
||
error_msg = f"Row {row_index}: 'if_output_exists' must be one of {OUTPUT_ACTIONS}"
|
||
error_messages.append(error_msg)
|
||
continue
|
||
|
||
excel_workbooks_config.append(workbook_config)
|
||
except Exception as e:
|
||
error_msg = f"Error loading Excel_Workbooks sheet: {e}"
|
||
error_messages.append(error_msg)
|
||
return None, None, True, error_messages
|
||
|
||
# Load Excel_Sheets sheet
|
||
if EXCEL_SHEETS_TABLE_NAME not in workbook.sheetnames:
|
||
error_msg = f"Error: Sheet '{EXCEL_SHEETS_TABLE_NAME}' not found in configuration file."
|
||
error_messages.append(error_msg)
|
||
return excel_workbooks_config, None, True, error_messages
|
||
|
||
excel_sheets_sheet = workbook[EXCEL_SHEETS_TABLE_NAME]
|
||
excel_sheets_config = []
|
||
|
||
try:
|
||
headers = [cell.value for cell in excel_sheets_sheet[1]]
|
||
for row_index, row in enumerate(excel_sheets_sheet.iter_rows(min_row=2, values_only=True), start=2):
|
||
if all(cell is None for cell in row):
|
||
continue
|
||
|
||
sheet_config = dict(zip(headers, row))
|
||
|
||
# Validate required fields
|
||
if not sheet_config.get("workbook_id"):
|
||
continue # Skip rows without workbook_id
|
||
|
||
if not sheet_config.get("source_type"):
|
||
error_msg = f"Row {row_index}: 'source_type' is mandatory"
|
||
error_messages.append(error_msg)
|
||
continue
|
||
|
||
source_type = sheet_config["source_type"]
|
||
if source_type not in SOURCE_TYPES:
|
||
error_msg = f"Row {row_index}: 'source_type' must be one of {SOURCE_TYPES}"
|
||
error_messages.append(error_msg)
|
||
continue
|
||
|
||
if not sheet_config.get("source"):
|
||
error_msg = f"Row {row_index}: 'source' is mandatory"
|
||
error_messages.append(error_msg)
|
||
continue
|
||
|
||
if not sheet_config.get("target_name"):
|
||
error_msg = f"Row {row_index}: 'target_name' is mandatory"
|
||
error_messages.append(error_msg)
|
||
continue
|
||
|
||
# Parse JSON fields
|
||
has_json_error = False
|
||
for json_field in ["filter_condition", "sort_keys", "value_replacement"]:
|
||
value = sheet_config.get(json_field)
|
||
if value:
|
||
if isinstance(value, str):
|
||
try:
|
||
sheet_config[json_field] = json.loads(value)
|
||
except json.JSONDecodeError:
|
||
error_msg = f"Row {row_index}, field '{json_field}': Invalid JSON format"
|
||
error_messages.append(error_msg)
|
||
has_json_error = True
|
||
break # ← Skip this row entirely
|
||
# else: value is already parsed (dict/list), keep as-is
|
||
else:
|
||
# Empty/None value - leave as None or empty
|
||
sheet_config[json_field] = None
|
||
|
||
if not has_json_error:
|
||
excel_sheets_config.append(sheet_config)
|
||
except Exception as e:
|
||
error_msg = f"Error loading Excel_Sheets sheet: {e}"
|
||
error_messages.append(error_msg)
|
||
return excel_workbooks_config, excel_sheets_config, True, error_messages
|
||
|
||
workbook.close()
|
||
|
||
has_error = len(error_messages) > 0
|
||
return excel_workbooks_config, excel_sheets_config, has_error, error_messages
|
||
|
||
|
||
def validate_excel_config(excel_config, console_instance, inclusions_mapping_config=None, organizations_mapping_config=None):
|
||
"""
|
||
Validate Excel export configuration against templates.
|
||
|
||
Args:
|
||
excel_config: Tuple of (workbooks_config, sheets_config) from load_excel_export_config()
|
||
console_instance: Rich Console instance
|
||
inclusions_mapping_config: Loaded inclusions mapping config (optional, for future use)
|
||
organizations_mapping_config: Loaded organizations mapping config (optional, for future use)
|
||
|
||
Returns:
|
||
Tuple of (has_critical_error, error_messages)
|
||
"""
|
||
global console
|
||
if console_instance:
|
||
console = console_instance
|
||
|
||
if not excel_config or not excel_config[0] or not excel_config[1]:
|
||
return False, [] # No config to validate
|
||
|
||
excel_workbooks_config, excel_sheets_config = excel_config[0], excel_config[1]
|
||
error_messages = []
|
||
|
||
# Validate each workbook
|
||
for workbook_config in excel_workbooks_config:
|
||
workbook_id = workbook_config.get("workbook_id")
|
||
template_name = workbook_config.get("workbook_template_name")
|
||
|
||
# Check template exists
|
||
template_path = os.path.join(get_config_path(), template_name)
|
||
if not os.path.exists(template_path):
|
||
error_msg = f"Template '{template_name}' (workbook_id: {workbook_id}) not found in config/"
|
||
error_messages.append(error_msg)
|
||
continue
|
||
|
||
# Check template is valid Excel
|
||
try:
|
||
template_wb = openpyxl.load_workbook(template_path)
|
||
except Exception as e:
|
||
error_msg = f"Template '{template_name}' (workbook_id: {workbook_id}) is not a valid Excel file: {e}"
|
||
error_messages.append(error_msg)
|
||
continue
|
||
|
||
# Validate sheets for this workbook
|
||
workbook_sheets = [s for s in excel_sheets_config if s.get("workbook_id") == workbook_id]
|
||
|
||
for sheet_config in workbook_sheets:
|
||
target_name = sheet_config.get("target_name")
|
||
source_type = sheet_config.get("source_type")
|
||
|
||
# Find the target in the template (check both named ranges AND tables)
|
||
target_found = False
|
||
if target_name in template_wb.defined_names:
|
||
target_found = True
|
||
else:
|
||
# Check if it's a table in any sheet
|
||
for sheet in template_wb.sheetnames:
|
||
sheet_obj = template_wb[sheet]
|
||
if hasattr(sheet_obj, 'tables') and target_name in sheet_obj.tables:
|
||
target_found = True
|
||
break
|
||
|
||
# If target was found, validate based on source type
|
||
if target_found:
|
||
# For Variable sources, ensure it's a single cell
|
||
if source_type == SOURCE_TYPE_VARIABLE:
|
||
# Check if the defined name references a single cell
|
||
# NOTE: We still use openpyxl here because template_wb is already open from config loading
|
||
table_dims = _get_named_range_dimensions(template_wb, target_name)
|
||
if table_dims:
|
||
_, _, height, width = table_dims
|
||
if height != 1 or width != 1:
|
||
error_msg = f"Target '{target_name}' (template: {template_name}) for Variable source must reference a single cell (found {height}x{width})"
|
||
error_messages.append(error_msg)
|
||
|
||
# For Table sources (Inclusions/Organizations), validate dimensions
|
||
elif source_type in [SOURCE_TYPE_INCLUSIONS, SOURCE_TYPE_ORGANIZATIONS]:
|
||
# Get the dimensions of the named range
|
||
# NOTE: We still use openpyxl here because template_wb is already open from config loading
|
||
table_dims = _get_named_range_dimensions(template_wb, target_name)
|
||
if table_dims:
|
||
_, _, height, width = table_dims
|
||
|
||
# CRITICAL: Table height MUST be exactly 1 (template row only)
|
||
if height != 1:
|
||
error_msg = f"Target '{target_name}' (template: {template_name}, source_type: {source_type}) must have height=1 (found height={height}). " \
|
||
f"Template row must be a single row."
|
||
error_messages.append(error_msg)
|
||
|
||
# CRITICAL: Table width must be >= max(mapping_indices)
|
||
# Get the mapping column to validate indices
|
||
source = sheet_config.get("source")
|
||
if source:
|
||
mapping_config = inclusions_mapping_config if source_type == SOURCE_TYPE_INCLUSIONS else organizations_mapping_config
|
||
if mapping_config:
|
||
column_mapping = _get_column_mapping(mapping_config, source, source_type)
|
||
if column_mapping:
|
||
max_col_index = max(column_mapping.keys()) # 0-based index
|
||
if max_col_index >= width:
|
||
error_msg = f"Target '{target_name}' (template: {template_name}) width={width} is insufficient. " \
|
||
f"Maximum column index from mapping is {max_col_index} (0-based). " \
|
||
f"Width must be > {max_col_index}."
|
||
error_messages.append(error_msg)
|
||
else:
|
||
error_msg = f"Named range '{target_name}' (template: {template_name}, workbook_id: {workbook_id}) not found in template"
|
||
error_messages.append(error_msg)
|
||
|
||
template_wb.close()
|
||
|
||
return len(error_messages) > 0, error_messages
|
||
|
||
|
||
def export_to_excel(inclusions_data, organizations_data, excel_config,
|
||
inclusions_mapping_config=None, organizations_mapping_config=None):
|
||
"""
|
||
Main export function - orchestrates Excel workbook generation.
|
||
|
||
Args:
|
||
inclusions_data: List of inclusion dictionaries
|
||
organizations_data: List of organization dictionaries
|
||
excel_config: Tuple of (workbooks_config, sheets_config)
|
||
inclusions_mapping_config: Inclusions field mapping configuration
|
||
organizations_mapping_config: Organizations field mapping configuration
|
||
|
||
Returns:
|
||
Tuple of (success, error_count)
|
||
|
||
Note:
|
||
Uses global console instance (injected from main script)
|
||
"""
|
||
if not excel_config or not excel_config[0] or not excel_config[1]:
|
||
console.print("[yellow]⚠ No Excel export configuration found, skipping[/yellow]")
|
||
return True, 0
|
||
|
||
excel_workbooks_config, excel_sheets_config = excel_config[0], excel_config[1]
|
||
|
||
# Prepare template variables
|
||
template_vars = _prepare_template_variables()
|
||
|
||
error_count = 0
|
||
success_count = 0
|
||
|
||
# Track overall export duration
|
||
export_start_time = perf_counter()
|
||
|
||
# Process each workbook
|
||
for workbook_config in excel_workbooks_config:
|
||
try:
|
||
workbook_id = workbook_config.get("workbook_id")
|
||
template_name = workbook_config.get("workbook_template_name")
|
||
output_template = workbook_config.get("output_file_name_template")
|
||
if_output_exists = workbook_config.get("if_output_exists", OUTPUT_ACTION_OVERWRITE)
|
||
|
||
# Resolve output filename
|
||
try:
|
||
output_filename = output_template.format(**template_vars)
|
||
except KeyError as e:
|
||
console.print(f"[bold red]✗ Unknown variable in template: {e}[/bold red]")
|
||
error_count += 1
|
||
continue
|
||
|
||
output_path = os.path.join(EXCEL_OUTPUT_FOLDER, output_filename)
|
||
|
||
# Log workbook processing start
|
||
logging.info(f"Processing workbook: {workbook_id} (template: {template_name}, output: {output_filename})")
|
||
|
||
# PREPARATION PHASE: Handle existing file according to action
|
||
output_path = _handle_output_exists(output_path, if_output_exists)
|
||
|
||
# XLWINGS PHASE: Open template, fill, save as output
|
||
template_path = os.path.join(get_config_path(), template_name)
|
||
|
||
# Track workbook processing duration with spinning status
|
||
workbook_start_time = perf_counter()
|
||
|
||
try:
|
||
if xw is None:
|
||
raise ImportError("xlwings is not installed. Install with: pip install xlwings")
|
||
|
||
# Use status with spinner while processing the workbook
|
||
with console.status(f"[bold cyan]Exporting {output_filename}...", spinner="dots"):
|
||
# PERFORMANCE: Make Excel invisible BEFORE opening the workbook
|
||
app_xw = None
|
||
screen_updating_original = None
|
||
visible_original = None
|
||
try:
|
||
# Get or create Excel app in invisible mode
|
||
if xw.apps:
|
||
app_xw = xw.apps.active
|
||
visible_original = app_xw.visible
|
||
screen_updating_original = app_xw.screen_updating
|
||
else:
|
||
# Create new app in invisible mode
|
||
app_xw = xw.App(visible=False)
|
||
visible_original = False
|
||
screen_updating_original = True
|
||
|
||
app_xw.visible = False # Make Excel invisible
|
||
app_xw.screen_updating = False # Disable screen updates
|
||
|
||
except Exception as e:
|
||
logging.warning(f"Failed to manage Excel visibility: {e}")
|
||
app_xw = None
|
||
|
||
# Open TEMPLATE directly (not a copy)
|
||
wb_xw = xw.Book(template_path, update_links=False)
|
||
|
||
try:
|
||
# CAPTURE TEMPLATE STATE: Save initial state for restoration before save
|
||
template_state = _capture_workbook_state(wb_xw, workbook_context=f"{workbook_id} ({output_filename})")
|
||
logging.info(f"Captured template state: active_sheet='{template_state['active_sheet']}', {len(template_state['sheets'])} sheet(s)")
|
||
|
||
# Get sheets for this workbook
|
||
workbook_sheets = [s for s in excel_sheets_config if s.get("workbook_id") == workbook_id]
|
||
|
||
# Process each sheet with xlwings
|
||
for sheet_config in workbook_sheets:
|
||
_process_sheet_xlwings(
|
||
wb_xw,
|
||
sheet_config,
|
||
inclusions_data,
|
||
organizations_data,
|
||
inclusions_mapping_config=inclusions_mapping_config,
|
||
organizations_mapping_config=organizations_mapping_config,
|
||
template_vars=template_vars,
|
||
workbook_context=f"{workbook_id} ({output_filename})"
|
||
)
|
||
|
||
# RESTORE TEMPLATE STATE: Restore initial state before saving
|
||
_restore_workbook_state(wb_xw, template_state, workbook_context=f"{workbook_id} ({output_filename})")
|
||
logging.info(f"Restored template state before save")
|
||
|
||
# Save as output file with forced overwrite (with retry mechanism)
|
||
# This preserves filesystem versioning for cloud storage
|
||
# Disable alerts to force silent overwrite
|
||
abs_output_path = os.path.abspath(output_path)
|
||
if app_xw:
|
||
display_alerts_original = app_xw.api.DisplayAlerts
|
||
app_xw.api.DisplayAlerts = False
|
||
try:
|
||
_save_workbook_with_retry(wb_xw, abs_output_path)
|
||
logging.info(f"Saved workbook to: {abs_output_path}")
|
||
finally:
|
||
if app_xw:
|
||
app_xw.api.DisplayAlerts = display_alerts_original
|
||
# Excel automatically recalculates formulas on save
|
||
# No need for separate recalculation step
|
||
|
||
finally:
|
||
# Always close the workbook and restore visibility/screen updates
|
||
wb_xw.close()
|
||
if app_xw is not None:
|
||
try:
|
||
if screen_updating_original is not None:
|
||
app_xw.screen_updating = screen_updating_original
|
||
if visible_original is not None:
|
||
app_xw.visible = visible_original
|
||
except Exception:
|
||
pass
|
||
|
||
# Calculate duration and display success message
|
||
workbook_duration = perf_counter() - workbook_start_time
|
||
console.print(f"[green]✓ Created: {output_filename} ({workbook_duration:.2f}s)[/green]")
|
||
success_count += 1
|
||
|
||
except Exception as e:
|
||
console.print(f"[bold red]✗ Error processing {output_filename}: {e}[/bold red]")
|
||
logging.error(f"Excel export error for {output_filename}: {e}", exc_info=True)
|
||
error_count += 1
|
||
continue
|
||
|
||
except Exception as e:
|
||
console.print(f"[bold red]✗ Error processing workbook {workbook_id}: {e}[/bold red]")
|
||
logging.error(f"Excel workbook processing error: {e}", exc_info=True)
|
||
error_count += 1
|
||
|
||
# Summary with total duration
|
||
total_workbooks = success_count + error_count
|
||
export_duration = perf_counter() - export_start_time
|
||
|
||
if error_count == 0:
|
||
# Success: all workbooks processed
|
||
console.print(f"\n[green]✓ Excel export completed successfully: {success_count}/{total_workbooks} workbooks generated ({export_duration:.2f}s)[/green]")
|
||
else:
|
||
# Failure: some or all workbooks failed
|
||
if success_count > 0:
|
||
# Partial success
|
||
console.print(f"\n[yellow]⚠ Excel export completed with errors ({export_duration:.2f}s)[/yellow]")
|
||
console.print(f"[green] {success_count} workbook(s) generated successfully[/green]")
|
||
console.print(f"[bold red] {error_count} workbook(s) failed[/bold red]")
|
||
else:
|
||
# Complete failure
|
||
console.print(f"\n[bold red]✗ Excel export failed: all {error_count} workbook(s) failed ({export_duration:.2f}s)[/bold red]")
|
||
|
||
return error_count == 0, error_count
|
||
|
||
|
||
# ============================================================================
|
||
# INTERNAL FUNCTIONS
|
||
# ============================================================================
|
||
|
||
def _prepare_template_variables():
    """
    Prepare variables available for Template String evaluation.

    Returns:
        Dictionary of variables passed to str.format() when resolving
        output filename and Variable source templates
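
        Example (illustrative; the actual templates come from the config file):
            An output_file_name_template such as
            "Inclusions_{extract_date_time_french:%Y-%m-%d_%H-%M}.xlsx"
            would resolve to something like "Inclusions_2024-05-17_14-30.xlsx".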
    """
    # Get UTC timestamp from inclusions file
    # Use constant from eb_dashboard_constants (SINGLE SOURCE OF TRUTH)
    inclusions_file = INCLUSIONS_FILE_NAME
    if os.path.exists(inclusions_file):
        file_mtime = os.path.getmtime(inclusions_file)
        extract_date_time_utc = datetime.fromtimestamp(file_mtime, tz=timezone.utc)
    else:
        extract_date_time_utc = datetime.now(tz=timezone.utc)

    # Convert to Paris timezone
    extract_date_time_french = extract_date_time_utc.astimezone(
        ZoneInfo('Europe/Paris')
    )

    return {
        'extract_date_time_utc': extract_date_time_utc,
        'extract_date_time_french': extract_date_time_french,
    }
|
||
|
||
|
||
def _apply_filter(item, filter_condition):
    """
    Apply filter condition to item (AND logic for all conditions).

    Args:
        item: Dictionary to filter
        filter_condition: List of [field_name, operator, value] conditions

    Returns:
        Boolean True if item passes all filters
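
        Example (illustrative values; field names are the dotted paths used by the
        mapping config, operators are limited to ==, <>, >, >=, <, <=):
            [["Inclusion.Status", "==", "Included"],
             ["Patient_Identification.Patient_Id", "<>", ""]]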
    """
    if not filter_condition:
        return True # Empty filter = accept all

    for field_path, operator, expected_value in filter_condition:
        actual_value = get_nested_value(item, field_path.split("."))

        if actual_value is None:
            return False # Missing field = filter out

        # Apply operator
        if operator == "==":
            if actual_value != expected_value:
                return False
        elif operator == "<>":
            if actual_value == expected_value:
                return False
        elif operator == ">":
            if not (actual_value > expected_value):
                return False
        elif operator == ">=":
            if not (actual_value >= expected_value):
                return False
        elif operator == "<":
            if not (actual_value < expected_value):
                return False
        elif operator == "<=":
            if not (actual_value <= expected_value):
                return False

    return True # All conditions passed
|
||
|
||
|
||
def _apply_sort(items, sort_keys):
    """
    Apply multi-key sort to items with support for mixed asc/desc ordering.

    Args:
        items: List of dictionaries to sort
        sort_keys: List of [field_name, order] or [field_name, order, option]
            where:
            - order is "asc" or "desc"
            - option (optional) can be:
                * datetime format string (e.g., "%Y-%m-%d") for date parsing
                * "*natsort" for natural alphanumeric sorting
            Supports MIXED asc/desc on different columns!
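
        Example (illustrative; field names reuse the dotted paths from the mapping config):
            [["Inclusion.Status", "asc"],
             ["Patient_Identification.Patient_Id", "asc", "*natsort"],
             ["Inclusion.Inclusion_Date", "desc", "%Y-%m-%d"]]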

    Returns:
        Sorted list
    """
    if not sort_keys:
        return items

    def natural_sort_key(text):
        """
        Helper for natural alphanumeric sorting.
        Converts "ENDOBEST-003-920-BA" to ["endobest", "-", 3, "-", 920, "-", "ba"]
        Python's native list comparison handles the rest element by element.
        """
        def convert(segment):
            return int(segment) if segment.isdigit() else segment.lower()
        return [convert(s) for s in re.split(r'(\d+)', str(text)) if s]

    def compare_items(item1, item2):
        """
        Comparator function for multi-key sorting with mixed asc/desc support.
        Returns: -1 if item1 < item2, 0 if equal, 1 if item1 > item2
        """
        for sort_spec in sort_keys:
            field_name = sort_spec[0]
            order = sort_spec[1] if len(sort_spec) > 1 else "asc"
            sort_option = sort_spec[2] if len(sort_spec) > 2 else None

            # Get values from both items
            val1 = get_nested_value(item1, field_name.split("."))
            val2 = get_nested_value(item2, field_name.split("."))

            # Handle undefined/None - place at end
            is_undef1 = val1 in [None, "", "undefined"]
            is_undef2 = val2 in [None, "", "undefined"]

            # Both undefined: equal
            if is_undef1 and is_undef2:
                continue

            # Only one undefined: undefined goes last
            if is_undef1:
                return 1 # item1 > item2 (undefined last)
            if is_undef2:
                return -1 # item1 < item2 (item2 is undefined)

            # Check if natural sort requested
            is_natural_sort = (sort_option == "*natsort")

            # Parse datetime if option is a datetime format (not *natsort)
            if sort_option and not is_natural_sort:
                datetime_format = sort_option
                try:
                    val1 = datetime.strptime(str(val1), datetime_format).timestamp()
                except (ValueError, TypeError):
                    val1 = None
                    return 1 # Invalid datetime goes last

                try:
                    val2 = datetime.strptime(str(val2), datetime_format).timestamp()
                except (ValueError, TypeError):
                    val2 = None
                    return -1 # Invalid datetime goes last

            # Apply natural sort transformation if requested
            if is_natural_sort:
                val1 = natural_sort_key(val1)
                val2 = natural_sort_key(val2)

            # Compare values
            # For strings (non-natsort), use case-insensitive comparison for natural alphabetical ordering
            if isinstance(val1, str) and isinstance(val2, str):
                val1_lower = val1.lower()
                val2_lower = val2.lower()
                if val1_lower < val2_lower:
                    cmp_result = -1
                elif val1_lower > val2_lower:
                    cmp_result = 1
                else:
                    # Case-insensitive equal, use case-sensitive as tiebreaker
                    if val1 < val2:
                        cmp_result = -1
                    elif val1 > val2:
                        cmp_result = 1
                    else:
                        cmp_result = 0
            else:
                # Non-string comparison (numbers, dates, natsort lists, etc.)
                if val1 < val2:
                    cmp_result = -1
                elif val1 > val2:
                    cmp_result = 1
                else:
                    cmp_result = 0 # Equal, continue to next sort key

            # Apply asc/desc ordering
            if cmp_result != 0:
                is_desc = isinstance(order, str) and order.lower() == "desc"
                return cmp_result if not is_desc else -cmp_result

        # All keys are equal
        return 0

    # Use functools.cmp_to_key to convert comparator to key function
    return sorted(items, key=functools.cmp_to_key(compare_items))
|
||
|
||
|
||
def _apply_value_replacement(value, replacements):
    """
    Apply value replacement rules (first-match-wins, strict type matching).

    Args:
        value: Value to potentially replace
        replacements: List of [value_before, value_after] pairs

    Returns:
        Replaced value or original

    Note:
        This function is currently prepared for future use in table data filling.
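
    Example (illustrative pairs):
        _apply_value_replacement("undefined", [["undefined", ""], [None, "N/A"]])  # returns ""
        _apply_value_replacement(42, [["undefined", ""], [None, "N/A"]])           # returns 42 (no match)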
    """
    if not replacements:
        return value

    for value_before, value_after in replacements:
        if value == value_before: # Strict equality
            return value_after

    return value # No match, return original
|
||
|
||
|
||
# OBSOLETE: _preserve_media_in_workbook() removed - xlwings handles media preservation automatically
|
||
# When using xlwings, Excel natively preserves all media, images, and relationships
|
||
|
||
|
||
def _save_workbook_with_retry(wb_xw, output_path):
|
||
"""
|
||
Save workbook with retry mechanism for transient xlwings/Excel failures.
|
||
|
||
Excel's SaveAs can fail randomly on some environments (especially Excel 2013).
|
||
This function retries the save operation with configurable retry count and delay.
|
||
|
||
Args:
|
||
wb_xw: xlwings Book object
|
||
output_path: Absolute path where workbook should be saved
|
||
|
||
Raises:
|
||
Exception: If SaveAs fails after all retry attempts
|
||
"""
|
||
from time import sleep
|
||
|
||
for attempt in range(1, EXCEL_COM_MAX_RETRIES + 1):
|
||
try:
|
||
logging.info(f"SaveAs attempt {attempt}/{EXCEL_COM_MAX_RETRIES}: {output_path}")
|
||
wb_xw.api.SaveAs(output_path)
|
||
logging.info(f"SaveAs succeeded on attempt {attempt}")
|
||
return # Success
|
||
|
||
except Exception as e:
|
||
error_msg = f"SaveAs failed on attempt {attempt}: {type(e).__name__}: {str(e)}"
|
||
|
||
if attempt < EXCEL_COM_MAX_RETRIES:
|
||
# Intermediate retry - log as warning and sleep before retry
|
||
logging.warning(f"{error_msg} - Retrying in {EXCEL_COM_RETRY_DELAY}s...")
|
||
sleep(EXCEL_COM_RETRY_DELAY)
|
||
else:
|
||
# Final attempt failed - log as critical error and raise
|
||
logging.error(f"{error_msg} - All {EXCEL_COM_MAX_RETRIES} retry attempts exhausted")
|
||
raise
|
||
|
||
|
||
def _capture_workbook_state(wb_xw, workbook_context=""):
|
||
"""
|
||
Capture the visual state of the workbook (active sheet, selections, scroll positions).
|
||
|
||
This allows restoration of the template's visual state after data processing,
|
||
ensuring recipients see the workbook exactly as designed in the template.
|
||
|
||
Args:
|
||
wb_xw: xlwings Book object
|
||
workbook_context: String identifier for logging (workbook_id and filename)
|
||
|
||
Returns:
|
||
dict: State dictionary with 'active_sheet' and 'sheets' state per sheet
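
    Example of the returned structure (illustrative sheet names):
        {
            'active_sheet': 'Dashboard',
            'sheets': {
                'Dashboard': {'selection': '$A$1', 'scroll_row': 1, 'scroll_col': 1},
                'Patients': {'selection': '$B$4', 'scroll_row': 2, 'scroll_col': 1},
            }
        }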
|
||
"""
|
||
ctx = f"[{workbook_context}]" if workbook_context else ""
|
||
logging.info(f"{ctx} [CAPTURE_STATE] Starting workbook state capture")
|
||
logging.info(f"{ctx} [CAPTURE_STATE] Total sheets: {len(wb_xw.sheets)}")
|
||
|
||
state = {
|
||
'active_sheet': None,
|
||
'sheets': {}
|
||
}
|
||
|
||
try:
|
||
# Capture active sheet name
|
||
state['active_sheet'] = wb_xw.api.ActiveSheet.Name
|
||
logging.info(f"{ctx} [CAPTURE_STATE] Active sheet captured: '{state['active_sheet']}'")
|
||
except Exception as e:
|
||
logging.warning(f"{ctx} [CAPTURE_STATE] Could not capture active sheet: {type(e).__name__}: {str(e)}")
|
||
|
||
# Capture state for each sheet
|
||
for idx, sheet in enumerate(wb_xw.sheets, 1):
|
||
logging.info(f"{ctx} [CAPTURE_STATE] Processing sheet {idx}/{len(wb_xw.sheets)}: '{sheet.name}'")
|
||
try:
|
||
# Activate sheet to get its state
|
||
sheet.activate()
|
||
logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' activated successfully")
|
||
sheet_api = sheet.api
|
||
|
||
sheet_state = {
|
||
'selection': None,
|
||
'scroll_row': 1,
|
||
'scroll_col': 1
|
||
}
|
||
|
||
# Capture selection address
|
||
try:
|
||
selection_address = sheet_api.Application.Selection.Address
|
||
sheet_state['selection'] = selection_address
|
||
logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' selection captured: {selection_address}")
|
||
except Exception as e:
|
||
sheet_state['selection'] = "A1" # Default
|
||
logging.warning(f"{ctx} [CAPTURE_STATE] Could not capture selection for sheet '{sheet.name}': {type(e).__name__}, defaulting to A1")
|
||
|
||
# Capture scroll position
|
||
try:
|
||
scroll_row = sheet_api.Application.ActiveWindow.ScrollRow
|
||
scroll_col = sheet_api.Application.ActiveWindow.ScrollColumn
|
||
sheet_state['scroll_row'] = scroll_row
|
||
sheet_state['scroll_col'] = scroll_col
|
||
logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' scroll position captured: Row={scroll_row}, Col={scroll_col}")
|
||
except Exception as e:
|
||
logging.warning(f"{ctx} [CAPTURE_STATE] Could not capture scroll position for sheet '{sheet.name}': {type(e).__name__}, keeping defaults")
|
||
|
||
state['sheets'][sheet.name] = sheet_state
|
||
logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' state complete: {sheet_state}")
|
||
|
||
except Exception as e:
|
||
logging.error(f"{ctx} [CAPTURE_STATE] ERROR capturing state for sheet '{sheet.name}': {type(e).__name__}: {str(e)}")
|
||
|
||
logging.info(f"{ctx} [CAPTURE_STATE] Workbook state capture complete. Captured {len(state['sheets'])} sheet(s)")
|
||
return state
|
||
|
||
|
||
def _restore_workbook_state(wb_xw, state, workbook_context=""):
|
||
"""
|
||
Restore the visual state of the workbook (active sheet, selections, scroll positions).
|
||
|
||
Args:
|
||
wb_xw: xlwings Book object
|
||
state: State dictionary from _capture_workbook_state()
|
||
workbook_context: String identifier for logging (workbook_id and filename)
|
||
"""
|
||
if not state:
|
||
logging.warning("[RESTORE_STATE] Empty state provided, skipping restoration")
|
||
return
|
||
|
||
from time import sleep
|
||
|
||
ctx = f"[{workbook_context}]" if workbook_context else ""
|
||
logging.info(f"{ctx} [RESTORE_STATE] Starting workbook state restoration")
|
||
logging.info(f"{ctx} [RESTORE_STATE] Restoring {len(state.get('sheets', {}))} sheet(s)")
|
||
|
||
# NOTE: Screen updating is already disabled at the global level (in export_to_excel)
|
||
# for the entire workbook processing cycle (from open to save).
|
||
# We do NOT re-disable it here to avoid state conflicts.
|
||
# The global setting ensures all operations (capture, process, restore, save) run efficiently.
|
||
|
||
# CRITICAL: Excel 2013 COM layer lock recovery
|
||
# After bulk paste operations, Excel's COM layer can enter a "locked" state where Range.Select()
|
||
# fails persistently. This appears to be a fundamental limitation/bug in Excel 2013.
|
||
# To work around this, we need to:
|
||
# 1. Give Excel time to recover with a large delay
|
||
# 2. Then make a "dummy" Range.Select() to wake up the COM layer
|
||
# 3. Then proceed with real restorations
|
||
logging.info(f"{ctx} [RESTORE_STATE] Waiting for Excel COM layer to stabilize after bulk operations (2 seconds)...")
|
||
sleep(2.0) # Large delay to allow COM layer to recover
|
||
|
||
# Track original visibility state (used for temporary visibility during retries)
|
||
original_app_visible = None
|
||
try:
|
||
if wb_xw.app:
|
||
original_app_visible = wb_xw.app.visible
|
||
|
||
if not original_app_visible:
|
||
# Make Excel visible during restoration so user sees what's happening
|
||
# (important for selection restore retries which may take 2+ seconds)
|
||
wb_xw.app.visible = True
|
||
logging.info(f"{ctx} [RESTORE_STATE] Excel app temporarily made visible for restoration operations")
|
||
except Exception as e:
|
||
logging.warning(f"{ctx} [RESTORE_STATE] Could not manage Excel visibility during restoration: {type(e).__name__}: {str(e)}")
|
||
|
||
# Wake up the COM layer with a dummy selection attempt on the first sheet
|
||
# This "primes" the COM layer so subsequent Range.Select() calls work reliably
|
||
try:
|
||
if len(wb_xw.sheets) > 0:
|
||
first_sheet = wb_xw.sheets[0]
|
||
first_sheet.activate()
|
||
logging.info(f"{ctx} [RESTORE_STATE] Priming COM layer by activating first sheet...")
|
||
first_sheet.api.Range("$A$1").Select()
|
||
logging.info(f"{ctx} [RESTORE_STATE] COM layer priming successful")
|
||
except Exception as e:
|
||
# This is not critical - if it fails, retries will handle it
|
||
logging.info(f"{ctx} [RESTORE_STATE] COM layer priming attempt completed (may have failed, retries will handle it)")
|
||
|
||
# Restore state for each sheet
|
||
for idx, (sheet_name, sheet_state) in enumerate(state.get('sheets', {}).items(), 1):
|
||
logging.info(f"{ctx} [RESTORE_STATE] Processing sheet {idx}: '{sheet_name}'")
|
||
try:
|
||
sheet = wb_xw.sheets[sheet_name]
|
||
sheet.activate()
|
||
logging.info(f"{ctx} [RESTORE_STATE] Sheet '{sheet_name}' activated successfully")
|
||
|
||
# Small delay after activation to ensure Excel has completed the sheet switch
|
||
sleep(0.3)
|
||
|
||
sheet_api = sheet.api
|
||
|
||
# Restore selection with retry mechanism for transient Excel COM failures
|
||
if sheet_state.get('selection'):
|
||
selection = sheet_state['selection']
|
||
selection_restored = False
|
||
|
||
# Try to restore original selection with retry
|
||
for attempt in range(1, EXCEL_COM_MAX_RETRIES + 1):
|
||
try:
|
||
logging.info(f"{ctx} [RESTORE_STATE] Selection restore attempt {attempt}/{EXCEL_COM_MAX_RETRIES} for '{selection}' on sheet '{sheet_name}'")
|
||
sheet_api.Range(selection).Select()
|
||
logging.info(f"{ctx} [RESTORE_STATE] Sheet '{sheet_name}' selection restored to: {selection}")
|
||
selection_restored = True
|
||
break # Success
|
||
except Exception as e:
|
||
error_msg = f"Selection restore failed on attempt {attempt}: {type(e).__name__}: {str(e)}"
|
||
|
||
if attempt < EXCEL_COM_MAX_RETRIES:
|
||
# Intermediate retry - log as warning and sleep before retry
|
||
logging.warning(f"{ctx} [RESTORE_STATE] {error_msg} - Retrying in {EXCEL_COM_RETRY_DELAY}s...")
|
||
sleep(EXCEL_COM_RETRY_DELAY)
|
||
else:
|
||
# Final attempt failed - log as error, will default to A1
|
||
logging.error(f"{ctx} [RESTORE_STATE] {error_msg} - All {EXCEL_COM_MAX_RETRIES} retry attempts exhausted")
|
||
|
||
# If selection restore failed after all retries, default to A1
|
||
if not selection_restored:
|
||
logging.warning(f"{ctx} [RESTORE_STATE] Could not restore selection '{selection}' for sheet '{sheet_name}' after {EXCEL_COM_MAX_RETRIES} attempts, defaulting to A1")
|
||
|
||
# Try to set default A1 selection (using absolute reference: $A$1)
|
||
for attempt in range(1, EXCEL_COM_MAX_RETRIES + 1):
|
||
try:
|
||
logging.info(f"{ctx} [RESTORE_STATE] A1 default attempt {attempt}/{EXCEL_COM_MAX_RETRIES} for sheet '{sheet_name}'")
|
||
sheet_api.Range("$A$1").Select()
|
||
logging.info(f"{ctx} [RESTORE_STATE] Sheet '{sheet_name}' selection defaulted to A1")
|
||
break # Success
|
||
except Exception as e2:
|
||
error_msg = f"A1 default failed on attempt {attempt}: {type(e2).__name__}: {str(e2)}"
|
||
|
||
if attempt < EXCEL_COM_MAX_RETRIES:
|
||
logging.warning(f"{ctx} [RESTORE_STATE] {error_msg} - Retrying in {EXCEL_COM_RETRY_DELAY}s...")
|
||
sleep(EXCEL_COM_RETRY_DELAY)
|
||
else:
|
||
logging.error(f"{ctx} [RESTORE_STATE] {error_msg} - All {EXCEL_COM_MAX_RETRIES} retry attempts exhausted")
|
||
|
||
# Restore scroll position
|
||
try:
|
||
scroll_row = sheet_state.get('scroll_row', 1)
|
||
scroll_col = sheet_state.get('scroll_col', 1)
|
||
sheet_api.Application.ActiveWindow.ScrollRow = scroll_row
|
||
sheet_api.Application.ActiveWindow.ScrollColumn = scroll_col
|
||
logging.info(f"{ctx} [RESTORE_STATE] Sheet '{sheet_name}' scroll position restored: Row={scroll_row}, Col={scroll_col}")
|
||
except Exception as e:
|
||
logging.warning(f"{ctx} [RESTORE_STATE] Could not restore scroll position for sheet '{sheet_name}': {type(e).__name__}")
|
||
|
||
except Exception as e:
|
||
logging.error(f"{ctx} [RESTORE_STATE] ERROR restoring state for sheet '{sheet_name}': {type(e).__name__}: {str(e)}")
|
||
|
||
# Restore active sheet
|
||
if state.get('active_sheet'):
|
||
try:
|
||
|
||
|
||
active_sheet_name = state['active_sheet']
|
||
wb_xw.sheets[active_sheet_name].activate()
|
||
|
||
# Wait for sheet activation to complete on Excel 2013's COM layer
|
||
sleep(0.3)
|
||
|
||
logging.info(f"{ctx} [RESTORE_STATE] Active sheet restored to: '{active_sheet_name}'")
|
||
except Exception as e:
|
||
logging.error(f"{ctx} [RESTORE_STATE] Could not restore active sheet '{state.get('active_sheet')}': {type(e).__name__}: {str(e)}")
|
||
|
||
# Force sheet tabs to scroll to show the first sheet
|
||
# This ensures the tab bar starts from the first sheet, regardless of which sheet is active
|
||
# NOTE: ScrollWorkbookTabs only works when Excel is visible
|
||
try:
|
||
if len(wb_xw.sheets) > 0 and wb_xw.app:
|
||
logging.info(f"{ctx} [RESTORE_STATE] Attempting to scroll sheet tabs to first sheet")
|
||
|
||
try:
|
||
# ScrollWorkbookTabs with negative number scrolls tabs LEFT (toward first sheet)
|
||
# Use large negative number (-100) to guarantee we reach the beginning
|
||
# Excel visibility is already managed at the beginning of this function
|
||
wb_xw.api.Application.ActiveWindow.ScrollWorkbookTabs(-100)
|
||
logging.info(f"{ctx} [RESTORE_STATE] Sheet tabs scrolled to beginning")
|
||
except Exception as e:
|
||
logging.warning(f"{ctx} [RESTORE_STATE] Could not scroll sheet tabs to beginning: {type(e).__name__}: {str(e)}")
|
||
except Exception as e:
|
||
logging.error(f"{ctx} [RESTORE_STATE] ERROR during sheet tabs scroll operation: {type(e).__name__}: {str(e)}")
|
||
|
||
# Restore original visibility state (if we temporarily made it visible)
|
||
# NOTE: Screen updating restoration is handled at the global level (in export_to_excel)
|
||
# after the workbook is saved and closed
|
||
try:
|
||
if original_app_visible is not None and wb_xw.app:
|
||
if not original_app_visible and wb_xw.app.visible:
|
||
# Restore to hidden state if it was originally hidden
|
||
wb_xw.app.visible = False
|
||
logging.info(f"{ctx} [RESTORE_STATE] Excel app visibility restored to original state: False")
|
||
except Exception as e:
|
||
logging.warning(f"{ctx} [RESTORE_STATE] Could not restore Excel app visibility: {type(e).__name__}: {str(e)}")
|
||
|
||
logging.info(f"{ctx} [RESTORE_STATE] Workbook state restoration complete")
|
||
|
||
|
||
def _handle_output_exists(output_path, action):
    """
    Handle existing output file (Overwrite/Increment/Backup).

    Args:
        output_path: Full path to output file
        action: "Overwrite", "Increment", or "Backup"

    Returns:
        Actual path to use (may be different if Increment/Backup)
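
    Example (illustrative filenames, assuming "Report.xlsx" already exists):
        Overwrite -> "Report.xlsx" (existing file is replaced on SaveAs)
        Increment -> "Report_1.xlsx" (first free numbered name is used)
        Backup    -> existing file is copied to "Report_backup_1.xlsx", then "Report.xlsx" is reused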
    """
    if not os.path.exists(output_path):
        logging.info(f"Output file doesn't exist yet: {output_path}")
        return output_path

    logging.info(f"Output file exists, applying '{action}' rule: {output_path}")

    if action == OUTPUT_ACTION_OVERWRITE:
        logging.info(f"Overwriting existing file: {output_path}")
        return output_path

    elif action == OUTPUT_ACTION_INCREMENT:
        base, ext = os.path.splitext(output_path)
        counter = 1
        while os.path.exists(f"{base}_{counter}{ext}"):
            counter += 1
        new_path = f"{base}_{counter}{ext}"
        logging.info(f"Using incremented filename: {new_path}")
        return new_path

    elif action == OUTPUT_ACTION_BACKUP:
        base, ext = os.path.splitext(output_path)
        counter = 1
        backup_path = f"{base}_backup_{counter}{ext}"
        while os.path.exists(backup_path):
            counter += 1
            backup_path = f"{base}_backup_{counter}{ext}"

        try:
            logging.info(f"Backing up existing file to: {backup_path}")
            shutil.copy2(output_path, backup_path)
            logging.info(f"Backup successful: {output_path} -> {backup_path}")
        except Exception as e:
            logging.error(f"Backup failed: {e}")
            raise

        # Return original path - the existing file will be overwritten by SaveAs
        return output_path

    return output_path
|
||
|
||
|
||
def _get_column_mapping(mapping_config, mapping_column_name, source_type):
|
||
"""
|
||
Extract column mapping from Inclusions_Mapping or Organizations_Mapping.
|
||
|
||
The mapping column contains user-friendly 1-based indices (1, 2, 3, ...)
|
||
indicating which column in the Excel table each field should be placed.
|
||
These are converted to 0-based indices for internal use.
|
||
|
||
Args:
|
||
mapping_config: List of mapping config rows (dicts with field_name, etc.)
|
||
mapping_column_name: Name of the mapping column to extract (e.g., "MainReport_PatientsList")
|
||
source_type: "Inclusions" or "Organizations"
|
||
|
||
Returns:
|
||
Dictionary: {excel_column_index: source_field_name}
|
||
Example: {0: "Patient_Identification.Patient_Id", 1: "Inclusion.Status", ...}
|
||
Indices are 0-based (converted from 1-based user input)
|
||
or None if mapping_column not found
|
||
"""
|
||
if not mapping_config:
|
||
return None
|
||
|
||
column_mapping = {}
|
||
|
||
for row in mapping_config:
|
||
# Get the field name (source field in the JSON)
|
||
field_name = row.get("field_name")
|
||
if not field_name:
|
||
continue
|
||
|
||
# Get the mapping value from the specified column
|
||
mapping_value = row.get(mapping_column_name)
|
||
if mapping_value is None or mapping_value == "":
|
||
continue # Skip empty mappings
|
||
|
||
# Convert mapping_value to integer (1-based user-friendly index)
|
||
try:
|
||
user_col_index = int(mapping_value)
|
||
except (ValueError, TypeError):
|
||
logging.warning(f"Invalid column index '{mapping_value}' for field '{field_name}'")
|
||
continue
|
||
|
||
# Convert 1-based to 0-based index
|
||
excel_col_index = user_col_index - 1
|
||
|
||
if excel_col_index < 0:
|
||
logging.warning(f"Column index '{user_col_index}' for field '{field_name}' must be >= 1")
|
||
continue
|
||
|
||
# Store: excel_column_index -> field_name
|
||
# Field name needs to be qualified with group for Inclusions
|
||
# (extracted from the row's field_group if available)
|
||
if source_type == "Inclusions":
|
||
field_group = row.get("field_group", "")
|
||
if field_group:
|
||
full_field_name = f"{field_group}.{field_name}"
|
||
else:
|
||
full_field_name = field_name
|
||
else:
|
||
# For Organizations, field_name might already be qualified or standalone
|
||
full_field_name = field_name
|
||
|
||
column_mapping[excel_col_index] = full_field_name
|
||
|
||
return column_mapping if column_mapping else None
|
||
|
||
|
||
def _parse_range_dimensions(start_row, start_col, end_row, end_col, header_row_count=0):
|
||
"""
|
||
Shared utility: Calculate dimensions from cell coordinates.
|
||
|
||
Args:
|
||
start_row, start_col: Starting cell (1-based, after headers)
|
||
end_row, end_col: Ending cell (1-based)
|
||
header_row_count: Number of header rows (0 if none)
|
||
|
||
Returns:
|
||
Tuple of (width, total_height, data_height)
|
||
"""
|
||
width = end_col - start_col + 1
|
||
total_height = end_row - start_row + 1
|
||
data_height = total_height - header_row_count
|
||
return width, total_height, data_height
|
||
|
||
|
||
def _get_named_range_dimensions(workbook, range_name):
|
||
"""
|
||
Get dimensions of named range or table in workbook.
|
||
|
||
Args:
|
||
workbook: openpyxl Workbook object
|
||
range_name: Name of the named range or table
|
||
|
||
Returns:
|
||
Tuple of (sheet_name, start_cell, height, width) or None if not found
|
||
"""
|
||
# First check for defined named ranges (in openpyxl 3.x)
|
||
if range_name in workbook.defined_names:
|
||
defined_name = workbook.defined_names[range_name]
|
||
# Get the range reference from attr_text (e.g., "Sheet!$A$1:$B$10")
|
||
range_ref = defined_name.attr_text
|
||
|
||
# Parse: "SheetName!$A$1:$B$10"
|
||
if '!' in range_ref:
|
||
sheet_name, cell_range = range_ref.split('!')
|
||
# Remove quotes if present
|
||
sheet_name = sheet_name.strip("'\"")
|
||
# Remove $ signs for parsing
|
||
cell_range = cell_range.replace('$', '')
|
||
|
||
if sheet_name in workbook.sheetnames:
|
||
sheet = workbook[sheet_name]
|
||
|
||
# Parse cell range (e.g., "A1:B10" or single "A1")
|
||
if ':' in cell_range:
|
||
start_cell_str, end_cell_str = cell_range.split(':')
|
||
start_cell = sheet[start_cell_str]
|
||
end_cell = sheet[end_cell_str]
|
||
width = end_cell.column - start_cell.column + 1
|
||
height = end_cell.row - start_cell.row + 1
|
||
else:
|
||
start_cell = sheet[cell_range]
|
||
width = 1
|
||
height = 1
|
||
|
||
return sheet_name, start_cell, height, width
|
||
|
||
# Check if it's a Table (Excel table object, not just a named range)
|
||
for sheet_name in workbook.sheetnames:
|
||
sheet = workbook[sheet_name]
|
||
if hasattr(sheet, 'tables') and range_name in sheet.tables:
|
||
table = sheet.tables[range_name]
|
||
# Table has a 'ref' property with the range (e.g., "A4:F5")
|
||
# Excel tables can have header rows (default 1, but can be 0)
|
||
table_ref = table.ref
|
||
header_row_count = getattr(table, 'headerRowCount', 1) or 0 # 0 if None or False
|
||
|
||
# Parse cell range (e.g., "A4:F5")
|
||
if ':' in table_ref:
|
||
start_cell_str, end_cell_str = table_ref.split(':')
|
||
start_cell_temp = sheet[start_cell_str]
|
||
end_cell = sheet[end_cell_str]
|
||
width = end_cell.column - start_cell_temp.column + 1
|
||
total_height = end_cell.row - start_cell_temp.row + 1
|
||
|
||
# Skip header rows: point to first DATA row
|
||
if header_row_count > 0:
|
||
data_start_row = start_cell_temp.row + header_row_count
|
||
start_cell = sheet.cell(row=data_start_row, column=start_cell_temp.column)
|
||
else:
|
||
start_cell = start_cell_temp
|
||
|
||
# Calculate data row count (total - headers)
|
||
height = total_height - header_row_count
|
||
else:
|
||
start_cell = sheet[table_ref]
|
||
width = 1
|
||
height = 1
|
||
|
||
return sheet_name, start_cell, height, width
|
||
|
||
return None
|
||
|
||
# OBSOLETE: _update_named_range_height() removed
|
||
# This function was only called by the old openpyxl-based _process_sheet() implementation
|
||
# xlwings uses table.Resize() via COM API instead, which is more reliable
|
||
# See PHASE 2 migration notes for details
|
||
|
||
|
||
# OBSOLETE: _recalculate_workbook() removed - xlwings handles formula recalculation automatically
|
||
# When using xlwings with wb.save(), Excel automatically recalculates all formulas
|
||
|
||
# OBSOLETE: _process_sheet() removed - openpyxl implementation migrated to xlwings
|
||
# All sheet processing is now handled by _process_sheet_xlwings() using xlwings library
|
||
# This eliminates code duplication and provides better preservation of workbook structure
|
||
|
||
|
||
def _get_table_dimensions_xlwings(workbook_xw, range_name):
|
||
"""
|
||
Get dimensions of an Excel table OR named range using xlwings COM API.
|
||
|
||
First searches for ListObjects (structured tables), then falls back to
|
||
simple named ranges if no table is found.
|
||
|
||
Args:
|
||
workbook_xw: xlwings Book object (already open)
|
||
range_name: Name of the Excel table (ListObject) or named range
|
||
|
||
Returns:
|
||
Tuple (sheet_name, start_cell, height, width, header_row_count, target_type) or None if not found
|
||
- start_cell: Points to FIRST DATA ROW (after headers for tables, first row for named ranges)
|
||
- height: Number of DATA ROWS (excluding headers for tables)
|
||
- header_row_count: Number of header rows (0 for named ranges, 0 or 1 for tables)
|
||
- target_type: TARGET_TYPE_TABLE or TARGET_TYPE_NAMED_RANGE
|
||
|
||
Note:
|
||
- For tables with headers: start_cell points to first data row (after header)
|
||
- For tables without headers: start_cell points to first row of table
|
||
- For named ranges: start_cell points to first row (no headers assumed)
|
||
"""
|
||
# Helper class to mimic openpyxl Cell behavior
|
||
class CellRef:
|
||
def __init__(self, row, column):
|
||
self.row = row
|
||
self.column = column
|
||
@property
|
||
def coordinate(self):
|
||
col_letter = get_column_letter(self.column)
|
||
return f"{col_letter}{self.row}"
|
||
|
||
# === PRIORITY 1: Check if it's a table (ListObject) ===
|
||
# Excel tables are more reliable than plain named ranges with xlwings
|
||
for sheet in workbook_xw.sheets:
|
||
sheet_api = sheet.api
|
||
|
||
# Try to get the table count - if this fails, the sheet has no ListObjects property
|
||
try:
|
||
table_count = sheet_api.ListObjects.Count
|
||
except Exception:
|
||
# Sheet doesn't support ListObjects or has none
|
||
continue
|
||
|
||
# If no tables in this sheet, skip
|
||
if table_count == 0:
|
||
continue
|
||
|
||
# Iterate through tables by index
|
||
for i in range(1, table_count + 1): # COM indexing starts at 1
|
||
try:
|
||
xl_table = sheet_api.ListObjects.Item(i)
|
||
table_name = xl_table.Name
|
||
if table_name == range_name:
|
||
# Found a table - get its range
|
||
xl_range = xl_table.Range
|
||
sheet_name = sheet.name
|
||
total_rows = xl_range.Rows.Count
|
||
total_cols = xl_range.Columns.Count
|
||
start_row = xl_range.Row
|
||
start_col = xl_range.Column
|
||
|
||
# Get header row count from the table
|
||
# In COM API, ListObject has ShowHeaders property (boolean) and HeaderRowRange
|
||
# ShowHeaders: True if table has header row, False if not
|
||
try:
|
||
has_headers = xl_table.ShowHeaders
|
||
header_row_count = 1 if has_headers else 0
|
||
except Exception:
|
||
# If ShowHeaders not accessible, try HeaderRowRange
|
||
try:
|
||
header_range = xl_table.HeaderRowRange
|
||
header_row_count = 1 if header_range is not None else 0
|
||
except Exception:
|
||
# Fallback: assume headers exist (most common case)
|
||
header_row_count = 1
|
||
|
||
# Data height = total height - header rows
|
||
data_height = total_rows - header_row_count
|
||
|
||
# start_cell points to the FIRST DATA ROW (after headers)
|
||
# If table has headers: skip them. If no headers: start at table start
|
||
if header_row_count > 0:
|
||
data_start_row = start_row + header_row_count
|
||
else:
|
||
data_start_row = start_row
|
||
start_cell = CellRef(data_start_row, start_col)
|
||
|
||
logging.info(f"[TABLE FOUND] Located table '{range_name}' at {sheet_name}!{start_cell.coordinate} "
|
||
f"(data rows: {data_height}, headers: {header_row_count}, total width: {total_cols})")
|
||
return sheet_name, start_cell, data_height, total_cols, header_row_count, TARGET_TYPE_TABLE
|
||
except Exception as e:
|
||
# Error accessing this specific table, skip it
|
||
logging.warning(f"Error accessing table {i} in '{sheet.name}': {type(e).__name__}")
|
||
|
||
# === PRIORITY 2: Check if it's a named range ===
|
||
# Named ranges don't have headers - data starts at first row
|
||
try:
|
||
if range_name in workbook_xw.names:
|
||
named_range = workbook_xw.names[range_name]
|
||
target_range = named_range.refers_to_range
|
||
|
||
sheet_name = target_range.sheet.name
|
||
start_row = target_range.row
|
||
start_col = target_range.column
|
||
total_rows = target_range.rows.count
|
||
total_cols = target_range.columns.count
|
||
|
||
# Named ranges have no headers - all rows are data rows
|
||
header_row_count = 0
|
||
data_height = total_rows
|
||
|
||
start_cell = CellRef(start_row, start_col)
|
||
|
||
logging.info(f"[NAMED RANGE FOUND] Located named range '{range_name}' at {sheet_name}!{start_cell.coordinate} "
|
||
f"(data rows: {data_height}, no headers, total width: {total_cols})")
|
||
return sheet_name, start_cell, data_height, total_cols, header_row_count, TARGET_TYPE_NAMED_RANGE
|
||
except Exception as e:
|
||
logging.warning(f"Error accessing named range '{range_name}': {type(e).__name__}: {str(e)}")
|
||
|
||
# Range/table not found
|
||
logging.warning(f"Named range or table '{range_name}' not found in workbook")
|
||
return None
|
||
|
||
|
||
# ============================================================================
|
||
# HELPER FUNCTIONS FOR SHEET PROCESSING (extracted from _process_sheet_xlwings)
|
||
# ============================================================================
|
||
|
||
def _fill_variable_in_sheet(workbook_xw, target_name, source_template, template_vars, workbook_context=""):
|
||
"""
|
||
Fill a single variable cell with evaluated template value.
|
||
|
||
Args:
|
||
workbook_xw: xlwings Book object
|
||
target_name: Name of the target named range (single cell)
|
||
source_template: Template string with {variables}
|
||
template_vars: Dictionary of variable values
|
||
workbook_context: Context string for logging
|
||
|
||
Returns:
|
||
Boolean True if successful
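
    Example (illustrative target name and template; real ones come from the Excel_Sheets config):
        _fill_variable_in_sheet(wb_xw, "Extract_Date",
                                "Data extracted on {extract_date_time_french:%d/%m/%Y %H:%M}",
                                template_vars, workbook_context="MainReport")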
|
||
"""
|
||
try:
|
||
# Evaluate template string
|
||
cell_value = source_template.format(**template_vars)
|
||
except KeyError as e:
|
||
logging.warning(f"Unknown variable in template: {e}")
|
||
return False
|
||
|
||
# Write to named cell using xlwings
|
||
try:
|
||
named_range = workbook_xw.names[target_name]
|
||
target_range = named_range.refers_to_range
|
||
target_range.value = cell_value
|
||
logging.info(f"Set variable '{target_name}' to '{cell_value}'")
|
||
return True
|
||
except KeyError:
|
||
logging.warning(f"Named range '{target_name}' not found in {workbook_context}")
|
||
return False
|
||
except Exception as e:
|
||
logging.warning(f"Error setting variable '{target_name}' in {workbook_context}: {e}")
|
||
return False
|
||
|
||
|
||
def _prepare_table_data(source_type, source, sheet_config, inclusions_data, organizations_data,
                        inclusions_mapping_config, organizations_mapping_config, target_name):
    """
    Prepare table data: select source, apply filter/sort, get column mapping.

    Args:
        source_type: Type of source (Inclusions or Organizations)
        source: Source identifier (mapping name)
        sheet_config: Sheet configuration dictionary
        inclusions_data: Inclusions data list
        organizations_data: Organizations data list
        inclusions_mapping_config: Inclusions mapping config
        organizations_mapping_config: Organizations mapping config
        target_name: Target range name (for logging)

    Returns:
        Tuple of (sorted_data, column_mapping) or (None, None) if error
    """
    # Select source data and mapping config
    if source_type == SOURCE_TYPE_INCLUSIONS:
        source_data = inclusions_data
        mapping_config = inclusions_mapping_config
    else:
        source_data = organizations_data
        mapping_config = organizations_mapping_config

    # Apply filter and sort
    filter_condition = sheet_config.get("filter_condition")
    sort_keys = sheet_config.get("sort_keys")

    filtered_data = [item for item in source_data if _apply_filter(item, filter_condition)]
    sorted_data = _apply_sort(filtered_data, sort_keys)

    # Get column mapping
    column_mapping = _get_column_mapping(mapping_config, source, source_type)
    if not column_mapping:
        logging.warning(f"Column mapping '{source}' not found or empty for {target_name}")
        return None, None

    return sorted_data, column_mapping


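# Illustrative sketch of the filter-then-sort pipeline above, using plain Python
# in place of the config-driven _apply_filter/_apply_sort helpers. The field
# names ("status", "site_id") are hypothetical; real conditions and sort keys
# come from the Excel configuration workbook.
def _example_filter_and_sort(rows):
    """Keep rows whose status is 'included', then sort them by site_id."""
    filtered = [row for row in rows if row.get("status") == "included"]
    return sorted(filtered, key=lambda row: row.get("site_id", ""))
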
def _resize_table_range(workbook_xw, sheet_xw, target_name, start_cell, max_col, start_row, num_data_rows, header_row_count=0, target_type=TARGET_TYPE_TABLE):
    """
    Resize Excel table (ListObject) or named range to match data dimensions.

    For Tables (ListObjects): Uses ListObject.Resize() COM API
    For Named Ranges: Redefines the named range via Name.RefersTo property

    Args:
        workbook_xw: xlwings Book object (needed for named range resize)
        sheet_xw: xlwings Sheet object
        target_name: Name of the table/named range
        start_cell: Starting cell (CellRef) - points to FIRST DATA ROW (after headers for tables)
        max_col: Maximum column (1-based)
        start_row: Starting row (1-based, first data row)
        num_data_rows: Number of data rows
        header_row_count: Number of header rows in the table (0 for named ranges)
        target_type: TARGET_TYPE_TABLE or TARGET_TYPE_NAMED_RANGE

    Returns:
        None (logging handles errors)
    """
    if num_data_rows <= 1:
        return

    try:
        # Calculate the last data row
        last_data_row = start_row + num_data_rows - 1

        if target_type == TARGET_TYPE_TABLE:
            # === TABLE (ListObject) RESIZE ===
            excel_sheet = sheet_xw.api

            # Find the ListObject (Table) by name
            for list_obj in excel_sheet.ListObjects:
                if list_obj.Name == target_name:
                    # If header_row_count not provided (legacy fallback), get it from the table
                    if header_row_count == 0:
                        try:
                            has_headers = list_obj.ShowHeaders
                            header_row_count = 1 if has_headers else 0
                        except Exception:
                            header_row_count = 1

                    # For resize, include header rows if they exist
                    if header_row_count > 0:
                        first_row = start_row - header_row_count
                    else:
                        first_row = start_row

                    resize_range_str = f"{get_column_letter(start_cell.column)}{first_row}:{get_column_letter(max_col)}{last_data_row}"

                    # Perform resize via ListObject.Resize()
                    new_range = excel_sheet.Range(resize_range_str)
                    list_obj.Resize(new_range)
                    logging.info(f"Resized table '{target_name}' to {resize_range_str} (header_rows={header_row_count})")
                    break

        elif target_type == TARGET_TYPE_NAMED_RANGE:
            # === NAMED RANGE RESIZE ===
            # Redefine the named range to cover all data rows
            # Named ranges have no headers, so start_row is the first row
            first_col_letter = get_column_letter(start_cell.column)
            last_col_letter = get_column_letter(max_col)

            # Build the range address in A1 style
            range_address = f"${first_col_letter}${start_row}:${last_col_letter}${last_data_row}"

            # Get the actual Range object from the sheet and assign it to the Name
            # This avoids R1C1/A1 format issues by using the Range object directly
            new_range = sheet_xw.range(range_address)
            workbook_xw.api.Names(target_name).RefersTo = new_range.api
            logging.info(f"Resized named range '{target_name}' to {sheet_xw.name}!{range_address}")

    except Exception as e:
        logging.warning(f"Resize skipped for {target_name} ({target_type}): {e}")


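# Small runnable sketch of the A1-style address construction used for the resize
# above, reusing the get_column_letter helper already imported at module level.
# The row/column numbers in the comment are illustrative only.
def _example_absolute_range_address(first_col, first_row, last_col, last_row):
    """Build an absolute A1-style address such as '$B$5:$D$104'."""
    return f"${get_column_letter(first_col)}${first_row}:${get_column_letter(last_col)}${last_row}"

# _example_absolute_range_address(2, 5, 4, 104) -> "$B$5:$D$104"
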
def _duplicate_template_row(sheet_xw, start_cell, max_col, start_row, num_data_rows, target_name, workbook_context=""):
    """
    Duplicate template row to all data rows via copy-paste.

    Args:
        sheet_xw: xlwings Sheet object
        start_cell: Starting cell (CellRef)
        max_col: Maximum column (1-based)
        start_row: Starting row (1-based)
        num_data_rows: Number of data rows
        target_name: Target range name (for logging)
        workbook_context: Context string for logging

    Returns:
        None (logging handles errors)
    """
    if num_data_rows <= 1:
        return

    try:
        # Replicate template row to all data rows in a single operation
        template_range_str = f"{get_column_letter(start_cell.column)}{start_row}:{get_column_letter(max_col)}{start_row}"
        last_data_row = start_row + num_data_rows - 1
        full_target_range_str = f"{get_column_letter(start_cell.column)}{start_row}:{get_column_letter(max_col)}{last_data_row}"

        # Copy template row
        sheet_xw.range(template_range_str).copy()
        # Paste to entire range - Excel automatically replicates the template row
        sheet_xw.range(full_target_range_str).paste()

        # CRITICAL: Deselect after paste to avoid COM layer lock
        # After bulk paste on large ranges (85k+ cells), Excel's COM layer becomes saturated
        # and leaves a massive selection active. This prevents subsequent Range.Select() calls.
        # Solution: Reset Excel's selection state by switching sheets and back, then select A1.
        try:
            from time import sleep
            logging.info(f"Deselecting range after bulk paste for {target_name}...")

            # Switch to another sheet to force Excel to reset selection state
            other_sheets = [s for s in sheet_xw.book.sheets if s.name != sheet_xw.name]
            if other_sheets:
                other_sheets[0].activate()
                sleep(0.1)

            # Reactivate our sheet - Excel resets selection management when returning
            sheet_xw.activate()

            # Select A1 - COM should manage this easily now
            sheet_xw.api.Range("$A$1").Select()
            logging.info(f"Successfully deselected after bulk paste for {target_name} (sheet reactivation)")

        except Exception as e:
            # Deselection is non-critical, log and continue if it fails
            logging.warning(f"Deselection after paste failed for {target_name}: {type(e).__name__}: {str(e)}")

    except Exception as e:
        logging.warning(f"Template duplication failed for {target_name} in {workbook_context}: {e}")


def _fill_table_with_data(sheet_xw, start_cell, start_row, start_col, sorted_data, column_mapping,
                          value_replacement, target_name, sheet_name):
    """
    Fill table with data: group contiguous columns and transfer via bulk 2D arrays.

    Args:
        sheet_xw: xlwings Sheet object
        start_cell: Starting cell (CellRef)
        start_row: Starting row (1-based)
        start_col: Starting column (1-based)
        sorted_data: Sorted list of data items
        column_mapping: Dict mapping Excel column indices to source field paths
        value_replacement: Value replacement configuration (or None)
        target_name: Target range name (for logging)
        sheet_name: Sheet name (for logging)

    Returns:
        None (logging handles errors and success)
    """
    try:
        # === Prepare column mapping and group contiguous columns ===
        col_order = sorted(column_mapping.keys())

        # Group contiguous columns for optimal bulk update
        contiguous_groups = []
        if col_order:
            current_group = [col_order[0]]
            for i in range(1, len(col_order)):
                if col_order[i] == col_order[i-1] + 1:
                    current_group.append(col_order[i])
                else:
                    contiguous_groups.append(current_group)
                    current_group = [col_order[i]]
            contiguous_groups.append(current_group)

        # === Update contiguous column groups (bulk 2D transfer) ===
        for col_group in contiguous_groups:
            # Build 2D array for this group: rows × columns
            data_2d = []
            for item in sorted_data:
                row_values = []
                for excel_col_index in col_group:
                    source_field_path = column_mapping[excel_col_index]
                    # Get value from source item
                    value = get_nested_value(item, source_field_path.split("."))

                    # Apply value replacement
                    if value_replacement:
                        value = _apply_value_replacement(value, value_replacement)

                    row_values.append(value)
                data_2d.append(row_values)

            # Transfer entire group to Excel in ONE operation
            first_col_in_group = start_col + col_group[0]
            first_col_letter = get_column_letter(first_col_in_group)
            target_range_start = f"{first_col_letter}{start_row}"

            # Write 2D array at once (xlwings automatically maps rows × columns)
            sheet_xw.range(target_range_start).value = data_2d

        # Logging
        num_data_rows = len(sorted_data)
        logging.info(f"Filled table {target_name} with {num_data_rows} rows "
                     f"at {sheet_name}!{start_cell.coordinate} "
                     f"(bulk duplication + {len(contiguous_groups)} contiguous group(s))")
    except Exception as e:
        logging.error(f"Error filling table data for {target_name}: {e}")
        logging.error(f"Traceback: {traceback.format_exc()}")


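# Runnable sketch (illustration only) of the contiguous-column grouping used in
# _fill_table_with_data(): consecutive column indices are merged so each group
# can be written to Excel with a single 2D transfer.
def _example_group_contiguous_columns(column_indices):
    """Group column indices into runs of consecutive values."""
    groups = []
    for col in sorted(column_indices):
        if groups and col == groups[-1][-1] + 1:
            groups[-1].append(col)
        else:
            groups.append([col])
    return groups

# _example_group_contiguous_columns([0, 1, 2, 5, 6, 9]) -> [[0, 1, 2], [5, 6], [9]]
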
def _process_sheet_xlwings(workbook_xw, sheet_config, inclusions_data, organizations_data,
                           inclusions_mapping_config, organizations_mapping_config, template_vars,
                           workbook_context=""):
    """
    Process a single sheet using xlwings (hybrid approach).

    Delegates to specialized helpers to maintain clarity and testability.

    Args:
        workbook_xw: xlwings Book object
        sheet_config: Sheet configuration dict
        inclusions_data: List of inclusion dictionaries
        organizations_data: List of organization dictionaries
        inclusions_mapping_config: Inclusions mapping config (for column mapping)
        organizations_mapping_config: Organizations mapping config
        template_vars: Dictionary of variables for template evaluation
        workbook_context: Context string identifying the workbook (for logging)

    Returns:
        Boolean True if successful
    """
    source_type = sheet_config.get("source_type")
    source = sheet_config.get("source")
    target_name = sheet_config.get("target_name")
    value_replacement = sheet_config.get("value_replacement")

    # === Variable sources: single cell fill ===
    if source_type == SOURCE_TYPE_VARIABLE:
        return _fill_variable_in_sheet(workbook_xw, target_name, source, template_vars, workbook_context)

    # === Table sources: bulk data filling ===
    if source_type not in [SOURCE_TYPE_INCLUSIONS, SOURCE_TYPE_ORGANIZATIONS]:
        return False

    # Prepare data: filter, sort, get column mapping
    sorted_data, column_mapping = _prepare_table_data(
        source_type, source, sheet_config, inclusions_data, organizations_data,
        inclusions_mapping_config, organizations_mapping_config, target_name
    )
    if sorted_data is None or column_mapping is None:
        return False

    # Get table/named range dimensions from xlwings
    try:
        table_dims = _get_table_dimensions_xlwings(workbook_xw, target_name)
        if not table_dims:
            logging.warning(f"Target '{target_name}' not found (neither table nor named range)")
            return False

        sheet_name, start_cell, table_height, table_width, header_row_count, target_type = table_dims
        sheet_xw = workbook_xw.sheets[sheet_name]
        start_row = start_cell.row
        start_col = start_cell.column
        max_col = start_col + table_width - 1
        num_data_rows = len(sorted_data)

        # === Bulk operations for data filling ===
        if sorted_data:
            # STEP 0: Resize table/named range to match data dimensions
            _resize_table_range(workbook_xw, sheet_xw, target_name, start_cell, max_col, start_row, num_data_rows, header_row_count, target_type)

            # STEP 1: Duplicate template row to all data rows
            _duplicate_template_row(sheet_xw, start_cell, max_col, start_row, num_data_rows, target_name, workbook_context)

            # STEP 2-3: Fill with data (grouped contiguous columns)
            _fill_table_with_data(sheet_xw, start_cell, start_row, start_col, sorted_data,
                                  column_mapping, value_replacement, target_name, sheet_name)
        else:
            # No data - template row stays empty
            logging.info(f"No data for target '{target_name}' ({target_type}), leaving template row empty")

        return True

    except Exception as e:
        logging.warning(f"Error processing target '{target_name}': {e}")
        logging.error(f"Traceback: {traceback.format_exc()}")
        return False


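# Hypothetical example of a sheet_config entry as consumed by
# _process_sheet_xlwings() and _prepare_table_data() above. The keys are the
# ones read in this module; the values are illustrative placeholders only -
# real entries come from the EXCEL_SHEETS_TABLE_NAME table in the dashboard
# configuration workbook.
_EXAMPLE_SHEET_CONFIG = {
    "source_type": SOURCE_TYPE_INCLUSIONS,
    "source": "Inclusions_Default",    # mapping name (hypothetical)
    "target_name": "tbl_Inclusions",   # table or named range in the template (hypothetical)
    "filter_condition": None,          # optional filter, evaluated by _apply_filter()
    "sort_keys": None,                 # optional sort keys, applied by _apply_sort()
    "value_replacement": None,         # optional value replacement config
}
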
# ============================================================================
# COMPREHENSIVE EXCEL EXPORT ORCHESTRATION (for main script)
# ============================================================================

def prepare_excel_export(inclusions_mapping_config, organizations_mapping_config):
    """
    Validate Excel export configuration (no data loading).

    This function has a SINGLE responsibility: validate the Excel export CONFIG.
    It does NOT load production data (JSONs) - that is the responsibility of
    the execution functions (run_normal_mode_export, export_excel_only).

    IMPORTANT: Mapping configs MUST be provided by the caller. The caller is responsible for:
    1. Loading mapping configs from Excel (e.g., via load_inclusions_mapping_config())
    2. Passing them to this function for config validation

    This follows the dependency injection pattern: the caller provides dependencies,
    this function validates config. This ensures:
    - Clear responsibility separation: validation ≠ data loading
    - Early CONFIG validation (BEFORE data collection in NORMAL MODE)
    - Late DATA loading (AFTER collection, only when needed for execution)

    Args:
        inclusions_mapping_config: Loaded inclusions mapping (required, non-empty list/dict)
        organizations_mapping_config: Loaded organizations mapping (required, non-empty list/dict)

    Returns:
        Tuple of (excel_config, has_critical_errors, error_messages)
        - excel_config: Tuple of (workbooks_config, sheets_config) or None if errors
        - has_critical_errors: Boolean True if validation found critical errors
        - error_messages: List of error message strings

    Note:
        JSONs are loaded separately by execution functions:
        - NORMAL MODE: run_normal_mode_export() loads JSONs AFTER data collection
        - --EXCEL-ONLY: export_excel_only() loads JSONs before execution
    """

    error_messages = []
    excel_config = None
    has_critical_errors = False

    # === STEP 1: Validate mapping configurations are provided ===
    # Caller is responsible for loading these configs before calling this function
    if not inclusions_mapping_config or (isinstance(inclusions_mapping_config, (list, dict)) and len(inclusions_mapping_config) == 0):
        error_msg = "Inclusions mapping configuration must be provided and non-empty"
        error_messages.append(error_msg)
        logging.error(error_msg)
        if console:
            console.print(f"[bold red]✗ {error_msg}[/bold red]")
        has_critical_errors = True
        return excel_config, has_critical_errors, error_messages

    if not organizations_mapping_config or (isinstance(organizations_mapping_config, (list, dict)) and len(organizations_mapping_config) == 0):
        error_msg = "Organizations mapping configuration must be provided and non-empty"
        error_messages.append(error_msg)
        logging.error(error_msg)
        if console:
            console.print(f"[bold red]✗ {error_msg}[/bold red]")
        has_critical_errors = True
        return excel_config, has_critical_errors, error_messages

    # === STEP 2: Load Excel config ===
    logging.info("Loading Excel export configuration...")

    excel_workbooks_config, excel_sheets_config, has_config_error, config_error_messages = load_excel_export_config(console)
    if has_config_error:
        error_msg = "Critical errors in Excel Export Config"
        error_messages.append(error_msg)
        error_messages.extend(config_error_messages)
        has_critical_errors = True
        logging.warning(error_msg)
        if console:
            console.print(f"[bold red]✗ {error_msg}[/bold red]")
        excel_config = (excel_workbooks_config, excel_sheets_config)
        return excel_config, has_critical_errors, error_messages

    if not excel_workbooks_config or not excel_sheets_config:
        error_msg = "Excel export configuration is empty"
        error_messages.append(error_msg)
        logging.warning(error_msg)
        if console:
            console.print(f"[bold red]✗ {error_msg}[/bold red]")
        excel_config = (excel_workbooks_config, excel_sheets_config)
        return excel_config, has_critical_errors, error_messages

    # Package config into tuple for downstream functions
    excel_config = (excel_workbooks_config, excel_sheets_config)

    # === STEP 3: Validate Excel config ===
    logging.info("Validating Excel export configuration...")

    has_critical_errors, validation_errors = validate_excel_config(
        excel_config,
        console,
        inclusions_mapping_config or [],
        organizations_mapping_config or {}
    )

    if validation_errors:
        error_messages.extend(validation_errors)
        if has_critical_errors and console:
            console.print("[bold red]✗ Critical validation errors found[/bold red]")
    else:
        logging.info("✓ Excel export configuration validated successfully")

    return excel_config, has_critical_errors, error_messages


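# Hedged usage sketch: how a caller might validate the export configuration
# before collecting any data. The mapping arguments are assumed to be loaded by
# the caller (e.g., in the main script); this wrapper is illustrative only and
# not used by the pipeline.
def _example_validate_export_config(inclusions_mapping, organizations_mapping):
    """Return the validated excel_config tuple, or None if validation failed."""
    excel_config, has_errors, messages = prepare_excel_export(inclusions_mapping, organizations_mapping)
    if has_errors:
        for msg in messages:
            logging.error(f"Excel export config problem: {msg}")
        return None
    return excel_config
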
# ============================================================================
# HIGH-LEVEL ORCHESTRATION FUNCTIONS (for main script integration)
# ============================================================================

def export_excel_only(sys_argv,
                      inclusions_filename=None, organizations_filename=None,
                      inclusions_mapping_config=None, organizations_mapping_config=None):
    """
    Orchestrates EXCEL_ONLY mode - complete end-to-end Excel export workflow.

    This function completely encapsulates the --excel_only mode:
    1. Validates Excel configuration
    2. Loads JSON data files (must exist)
    3. Executes Excel export with error handling
    4. Displays user-friendly messages and confirmations

    IMPORTANT: The caller (main script) is responsible for loading mapping configs
    before calling this function. This ensures consistent config instances across
    the application and follows the dependency injection pattern.

    This follows the same pattern as run_check_only_mode() from quality_checks module.

    Args:
        sys_argv: sys.argv from main script (for potential future CLI arg parsing)
        inclusions_filename: Name of inclusions JSON file (e.g., "endobest_inclusions.json")
        organizations_filename: Name of organizations JSON file (e.g., "endobest_organizations.json")
        inclusions_mapping_config: Loaded inclusions mapping configuration (REQUIRED - caller must load)
        organizations_mapping_config: Loaded organizations mapping configuration (REQUIRED - caller must load)
    """
    global console

    if not inclusions_filename:
        inclusions_filename = INCLUSIONS_FILE_NAME
    if not organizations_filename:
        organizations_filename = ORGANIZATIONS_FILE_NAME

    print()
    console.print("[bold cyan]═══ EXCEL ONLY MODE ═══[/bold cyan]\n")

    # Step 1: Validate Excel configuration (no data loading)
    logging.info("EXCEL ONLY MODE: Validating Excel configuration")
    excel_config, has_config_critical, error_messages = \
        prepare_excel_export(inclusions_mapping_config, organizations_mapping_config)

    # Step 2: Handle critical configuration errors
    if has_config_critical:
        print()
        console.print("[bold red]⚠ CRITICAL CONFIGURATION ERROR(S) DETECTED[/bold red]")
        console.print("[bold red]────────────────────────────────────[/bold red]")
        for idx, error_msg in enumerate(error_messages, 1):
            console.print(f"[bold red]Error {idx}: {error_msg}[/bold red]")
        console.print("[bold red]────────────────────────────────────[/bold red]")
        print()
        try:
            import questionary
            answer = questionary.confirm(
                "⚠ Continue anyway?",
                default=False
            ).ask()
            if not answer:
                console.print("[bold red]Aborted by user[/bold red]")
                logging.warning("EXCEL ONLY MODE: Aborted by user due to critical errors")
                return
        except ImportError:
            console.print("[bold yellow]⚠ questionary not available for confirmation[/bold yellow]")
            console.print("[bold yellow]Proceeding with export despite critical errors[/bold yellow]")

    # Step 3: Load JSON data files (must exist in --excel-only mode)
    logging.info("EXCEL ONLY MODE: Loading data files")
    inclusions_data = _load_json_file_internal(inclusions_filename)
    organizations_data = _load_json_file_internal(organizations_filename)

    if inclusions_data is None or organizations_data is None:
        console.print("[bold red]✗ Error: Could not load data files for Excel export[/bold red]")
        logging.error("EXCEL ONLY MODE: Data file loading failed")
        return

    # Step 4: Execute Excel export (direct call to export_to_excel, console is global)
    print()
    console.print("[bold cyan]═══ Excel Export ═══[/bold cyan]\n")
    logging.info("EXCEL ONLY MODE: Executing export")

    if excel_config:
        try:
            logging.info(f"Starting Excel export: {len(inclusions_data)} inclusions, {len(organizations_data)} organizations")

            success, error_count = export_to_excel(
                inclusions_data,
                organizations_data,
                excel_config,
                inclusions_mapping_config=inclusions_mapping_config,
                organizations_mapping_config=organizations_mapping_config
            )

            if success:
                logging.info("EXCEL ONLY MODE: Export completed successfully")
            else:
                logging.warning(f"EXCEL ONLY MODE: Export completed with {error_count} error(s)")
        except Exception as e:
            error_msg = f"Excel export failed: {str(e)}"
            logging.error(f"EXCEL ONLY MODE: {error_msg}\n{traceback.format_exc()}")
            console.print(f"[bold red]✗ {error_msg}[/bold red]\n")
    else:
        console.print("[bold red]✗ Could not load Excel configuration[/bold red]\n")
        logging.error("EXCEL ONLY MODE: Excel config missing")


def run_normal_mode_export(excel_enabled, excel_config,
                           inclusions_mapping_config=None, organizations_mapping_config=None):
    """
    Orchestrates Excel export during normal mode execution.

    This function encapsulates the Excel export step that runs after inclusions and organizations
    have been collected and written to JSON files. It handles:
    - Loading JSONs from filesystem (ensures fresh data consistency)
    - Executing Excel export with comprehensive error handling
    - Displaying results to user

    This is called from the normal workflow after data collection completes.

    Args:
        excel_enabled: Boolean indicating if Excel export is enabled
        excel_config: Tuple of (workbooks_config, sheets_config) or None
        inclusions_mapping_config: Loaded inclusions mapping configuration (optional)
        organizations_mapping_config: Loaded organizations mapping configuration (optional)

    Note:
        This function loads JSON files from the filesystem (which were written
        during the data collection phase) to ensure consistency.

    Returns:
        Tuple of (export_succeeded, error_message)
        - export_succeeded: Boolean True if export completed successfully (or skipped)
        - error_message: String with error details (empty if success=True or skipped)
    """
    global console

    # Only proceed if export is enabled and config is available
    if not excel_enabled or not excel_config:
        logging.info("Excel export not enabled or config missing, skipping")
        return True, ""  # FIX BUG #3: Return True when export is intentionally skipped (not an error)

    print()
    console.print("[bold cyan]═══ Excel Export ═══[/bold cyan]\n")
    logging.info("NORMAL MODE: Starting Excel export")

    try:
        # Load JSONs from filesystem to ensure data consistency with what was written
        # Use constants imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH)
        inclusions_from_fs = _load_json_file_internal(INCLUSIONS_FILE_NAME)
        organizations_from_fs = _load_json_file_internal(ORGANIZATIONS_FILE_NAME)

        if inclusions_from_fs is None or organizations_from_fs is None:
            error_msg = "Could not load data files for Excel export"
            logging.error(f"NORMAL MODE: {error_msg}")
            console.print(f"[bold red]✗ {error_msg}[/bold red]\n")
            return False, error_msg

        # Execute the export (direct call to export_to_excel, console is global)
        logging.info(f"Starting Excel export: {len(inclusions_from_fs)} inclusions, {len(organizations_from_fs)} organizations")

        success, error_count = export_to_excel(
            inclusions_from_fs,
            organizations_from_fs,
            excel_config,
            inclusions_mapping_config=inclusions_mapping_config,
            organizations_mapping_config=organizations_mapping_config
        )

        if success:
            logging.info("NORMAL MODE: Excel export completed successfully")
            return True, ""
        else:
            error_msg = f"Excel export completed with {error_count} error(s)"
            logging.warning(f"NORMAL MODE: {error_msg}")
            return False, error_msg

    except Exception as e:
        error_msg = f"Unexpected error during Excel export: {str(e)}"
        logging.error(f"NORMAL MODE: {error_msg}\n{traceback.format_exc()}")
        console.print(f"[bold red]✗ {error_msg}[/bold red]\n")
        return False, error_msg


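# Hedged usage sketch: how the (export_succeeded, error_message) tuple returned
# by run_normal_mode_export() might be handled. The wrapper is illustrative
# only; the real call site lives in the main script.
def _example_handle_normal_mode_result(excel_enabled, excel_config):
    """Run the normal-mode export and log the outcome."""
    succeeded, error_message = run_normal_mode_export(excel_enabled, excel_config)
    if not succeeded:
        logging.error(f"Normal-mode Excel export failed: {error_message}")
    return succeeded
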
def _load_json_file_internal(filename):
    """
    Internal helper to load JSON file.

    Args:
        filename: Path to JSON file

    Returns:
        Parsed JSON data or None if file doesn't exist or can't be parsed
    """
    try:
        if not os.path.exists(filename):
            logging.warning(f"JSON file not found: {filename}")
            return None

        with open(filename, 'r', encoding='utf-8') as f:
            data = json.load(f)
            logging.info(f"Loaded {filename}: {len(data) if isinstance(data, list) else 'data'}")
            return data

    except Exception as e:
        logging.error(f"Error loading {filename}: {str(e)}")
        return None