Files
EB_Dashboard/eb_dashboard_excel_export.py

2095 lines
90 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Endobest Dashboard - Excel Export Module
This module handles generation of Excel workbooks from Inclusions and Organizations data.
Fully configurable via external Excel configuration file (Endobest_Dashboard_Config.xlsx).
Features:
- Config-driven workbook generation (no code changes needed)
- Support for Variable templates and Table data fills
- Configurable filtering, sorting, and value replacement
- xlwings-based data processing with automatic formula recalculation
- Robust error handling and logging
"""
import functools
import json
import logging
import os
import re
import shutil
import tempfile
import traceback
import zipfile
from datetime import datetime, timedelta, timezone
from time import perf_counter
from zoneinfo import ZoneInfo
import openpyxl
from openpyxl.utils import get_column_letter
from rich.console import Console
try:
import xlwings as xw
except ImportError:
xw = None
from eb_dashboard_utils import get_nested_value, get_config_path
from eb_dashboard_constants import (
INCLUSIONS_FILE_NAME,
ORGANIZATIONS_FILE_NAME,
DASHBOARD_CONFIG_FILE_NAME,
EXCEL_WORKBOOKS_TABLE_NAME,
EXCEL_SHEETS_TABLE_NAME,
OUTPUT_ACTION_OVERWRITE,
OUTPUT_ACTION_INCREMENT,
OUTPUT_ACTION_BACKUP,
OUTPUT_ACTIONS,
SOURCE_TYPE_INCLUSIONS,
SOURCE_TYPE_ORGANIZATIONS,
SOURCE_TYPE_VARIABLE,
SOURCE_TYPES,
TARGET_TYPE_TABLE,
TARGET_TYPE_NAMED_RANGE,
EXCEL_COM_MAX_RETRIES,
EXCEL_COM_RETRY_DELAY
)
# ============================================================================
# CONSTANTS
# ============================================================================
# Folder where generated workbooks are written. Evaluated once, at import time,
# so it is the working directory of the process when this module is first loaded.
EXCEL_OUTPUT_FOLDER = os.getcwd()  # Current working directory
# ============================================================================
# MODULE DEPENDENCIES (injected from main module)
# ============================================================================
# Rich Console used for all user-facing output. It stays None until
# set_dependencies() (or a function accepting console_instance) assigns it.
console = None
# NOTE: Constants imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH):
# Configuration Files:
# - INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME, DASHBOARD_CONFIG_FILE_NAME
# - EXCEL_WORKBOOKS_TABLE_NAME, EXCEL_SHEETS_TABLE_NAME
# Output Handling:
# - OUTPUT_ACTION_OVERWRITE, OUTPUT_ACTION_INCREMENT, OUTPUT_ACTION_BACKUP, OUTPUT_ACTIONS
# Data Sources:
# - SOURCE_TYPE_INCLUSIONS, SOURCE_TYPE_ORGANIZATIONS, SOURCE_TYPE_VARIABLE, SOURCE_TYPES
#
# NOTE: Mapping table names (INCLUSIONS_MAPPING_TABLE_NAME, ORGANIZATIONS_MAPPING_TABLE_NAME)
# are defined in constants but loaded/used in main script (eb_dashboard.py)
def set_dependencies(console_instance):
    """
    Inject the console instance from the main module.

    Rebinds the module-level ``console`` global, which the functions of this
    module use for user-facing output.

    Args:
        console_instance: Rich Console instance for formatted output

    Note:
        File and table names are imported directly from eb_dashboard_constants.py
        (SINGLE SOURCE OF TRUTH)
    """
    # Module-level rebinding: every later console.print() in this module uses it.
    global console
    console = console_instance
# ============================================================================
# PUBLIC FUNCTIONS
# ============================================================================
def load_excel_export_config(console_instance=None):
    """
    Load and validate the Excel export configuration from the config workbook.

    Two sheets are read: the workbooks sheet (one row per output workbook) and
    the sheets sheet (one row per target to fill). Rows that fail validation
    are reported in ``error_messages`` and left out of the returned lists.

    Args:
        console_instance: Optional Rich Console instance. When given, it also
            replaces the module-level console.

    Returns:
        Tuple of (excel_workbooks_config, excel_sheets_config, has_error, error_messages)
        - excel_workbooks_config: List of workbook definitions (dict per row),
          or None when the file/sheet could not be loaded
        - excel_sheets_config: List of sheet fill definitions (dict per row),
          or None when it could not be loaded
        - has_error: True if at least one error was recorded
        - error_messages: List of error message strings
    """
    global console
    if console_instance:
        console = console_instance

    config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
    error_messages = []

    try:
        workbook = openpyxl.load_workbook(config_path)
    except FileNotFoundError:
        error_msg = f"Error: Configuration file not found at: {config_path}"
        logging.critical(error_msg)
        # The console may not have been injected yet; never crash on reporting.
        if console is not None:
            console.print(f"[bold red]{error_msg}[/bold red]")
        return None, None, True, [error_msg]
    except Exception as e:
        # Unreadable/corrupted/locked file: report through the same channel as
        # every other configuration problem instead of propagating a traceback.
        error_msg = f"Error: Configuration file could not be opened: {config_path} ({e})"
        logging.critical(error_msg)
        if console is not None:
            console.print(f"[bold red]{error_msg}[/bold red]")
        return None, None, True, [error_msg]

    # try/finally guarantees the workbook is closed on every return path.
    try:
        # ------------------------------------------------------------------
        # Excel_Workbooks sheet
        # ------------------------------------------------------------------
        if EXCEL_WORKBOOKS_TABLE_NAME not in workbook.sheetnames:
            error_messages.append(
                f"Error: Sheet '{EXCEL_WORKBOOKS_TABLE_NAME}' not found in configuration file."
            )
            return None, None, True, error_messages

        excel_workbooks_sheet = workbook[EXCEL_WORKBOOKS_TABLE_NAME]
        excel_workbooks_config = []
        mandatory_workbook_fields = (
            "workbook_id",
            "workbook_template_name",
            "output_file_name_template",
        )
        try:
            headers = [cell.value for cell in excel_workbooks_sheet[1]]
            for row_index, row in enumerate(
                excel_workbooks_sheet.iter_rows(min_row=2, values_only=True), start=2
            ):
                if all(cell is None for cell in row):
                    continue  # Skip empty rows
                workbook_config = dict(zip(headers, row))

                # Report only the first missing mandatory field of the row.
                missing_field = next(
                    (name for name in mandatory_workbook_fields if not workbook_config.get(name)),
                    None,
                )
                if missing_field:
                    error_messages.append(f"Row {row_index}: '{missing_field}' is mandatory")
                    continue

                # An empty cell yields a None value under an existing key, so a
                # plain .get(key, default) would never fall back to the default.
                if_output_exists = workbook_config.get("if_output_exists") or OUTPUT_ACTION_OVERWRITE
                if if_output_exists not in OUTPUT_ACTIONS:
                    error_messages.append(
                        f"Row {row_index}: 'if_output_exists' must be one of {OUTPUT_ACTIONS}"
                    )
                    continue
                # Store the resolved action so consumers see the effective value.
                workbook_config["if_output_exists"] = if_output_exists

                excel_workbooks_config.append(workbook_config)
        except Exception as e:
            error_messages.append(f"Error loading Excel_Workbooks sheet: {e}")
            return None, None, True, error_messages

        # ------------------------------------------------------------------
        # Excel_Sheets sheet
        # ------------------------------------------------------------------
        if EXCEL_SHEETS_TABLE_NAME not in workbook.sheetnames:
            error_messages.append(
                f"Error: Sheet '{EXCEL_SHEETS_TABLE_NAME}' not found in configuration file."
            )
            return excel_workbooks_config, None, True, error_messages

        excel_sheets_sheet = workbook[EXCEL_SHEETS_TABLE_NAME]
        excel_sheets_config = []
        try:
            headers = [cell.value for cell in excel_sheets_sheet[1]]
            for row_index, row in enumerate(
                excel_sheets_sheet.iter_rows(min_row=2, values_only=True), start=2
            ):
                if all(cell is None for cell in row):
                    continue
                sheet_config = dict(zip(headers, row))

                if not sheet_config.get("workbook_id"):
                    continue  # Rows without workbook_id are skipped silently

                if not sheet_config.get("source_type"):
                    error_messages.append(f"Row {row_index}: 'source_type' is mandatory")
                    continue
                if sheet_config["source_type"] not in SOURCE_TYPES:
                    error_messages.append(
                        f"Row {row_index}: 'source_type' must be one of {SOURCE_TYPES}"
                    )
                    continue
                if not sheet_config.get("source"):
                    error_messages.append(f"Row {row_index}: 'source' is mandatory")
                    continue
                if not sheet_config.get("target_name"):
                    error_messages.append(f"Row {row_index}: 'target_name' is mandatory")
                    continue

                # Parse JSON fields; a single invalid field discards the whole row.
                has_json_error = False
                for json_field in ("filter_condition", "sort_keys", "value_replacement"):
                    value = sheet_config.get(json_field)
                    if not value:
                        # Empty/None value - normalise to None
                        sheet_config[json_field] = None
                        continue
                    if isinstance(value, str):
                        try:
                            sheet_config[json_field] = json.loads(value)
                        except json.JSONDecodeError:
                            error_messages.append(
                                f"Row {row_index}, field '{json_field}': Invalid JSON format"
                            )
                            has_json_error = True
                            break
                    # else: value is already a parsed object, keep as-is

                if not has_json_error:
                    excel_sheets_config.append(sheet_config)
        except Exception as e:
            error_messages.append(f"Error loading Excel_Sheets sheet: {e}")
            return excel_workbooks_config, excel_sheets_config, True, error_messages

        has_error = len(error_messages) > 0
        return excel_workbooks_config, excel_sheets_config, has_error, error_messages
    finally:
        workbook.close()
def _collect_sheet_target_errors(template_wb, sheet_config, template_name, workbook_id,
                                 inclusions_mapping_config, organizations_mapping_config):
    """Return the validation error messages for one sheet definition against its template."""
    errors = []
    target_name = sheet_config.get("target_name")
    source_type = sheet_config.get("source_type")

    # The target may be a defined name OR a table located on any worksheet.
    target_found = target_name in template_wb.defined_names
    if not target_found:
        for sheet in template_wb.sheetnames:
            sheet_obj = template_wb[sheet]
            if hasattr(sheet_obj, 'tables') and target_name in sheet_obj.tables:
                target_found = True
                break

    if not target_found:
        errors.append(
            f"Named range '{target_name}' (template: {template_name}, workbook_id: {workbook_id}) not found in template"
        )
        return errors

    if source_type == SOURCE_TYPE_VARIABLE:
        # A Variable source fills exactly one cell.
        table_dims = _get_named_range_dimensions(template_wb, target_name)
        if table_dims:
            _, _, height, width = table_dims
            if height != 1 or width != 1:
                errors.append(
                    f"Target '{target_name}' (template: {template_name}) for Variable source must reference a single cell (found {height}x{width})"
                )
    elif source_type in [SOURCE_TYPE_INCLUSIONS, SOURCE_TYPE_ORGANIZATIONS]:
        table_dims = _get_named_range_dimensions(template_wb, target_name)
        if table_dims:
            _, _, height, width = table_dims
            # CRITICAL: the target must be exactly one row high (template row only)
            if height != 1:
                errors.append(
                    f"Target '{target_name}' (template: {template_name}, source_type: {source_type}) must have height=1 (found height={height}). "
                    f"Template row must be a single row."
                )
            # CRITICAL: the target must be wide enough for the highest mapped column index
            source = sheet_config.get("source")
            if source:
                if source_type == SOURCE_TYPE_INCLUSIONS:
                    mapping_config = inclusions_mapping_config
                else:
                    mapping_config = organizations_mapping_config
                if mapping_config:
                    column_mapping = _get_column_mapping(mapping_config, source, source_type)
                    if column_mapping:
                        max_col_index = max(column_mapping.keys())  # 0-based index
                        if max_col_index >= width:
                            errors.append(
                                f"Target '{target_name}' (template: {template_name}) width={width} is insufficient. "
                                f"Maximum column index from mapping is {max_col_index} (0-based). "
                                f"Width must be > {max_col_index}."
                            )
    return errors


def validate_excel_config(excel_config, console_instance, inclusions_mapping_config=None, organizations_mapping_config=None):
    """
    Validate the Excel export configuration against the template workbooks.

    For every configured workbook the template must exist and open as an Excel
    file, and every sheet definition attached to it must point at an existing
    defined name or table with suitable dimensions.

    Args:
        excel_config: Tuple of (workbooks_config, sheets_config) from load_excel_export_config()
        console_instance: Rich Console instance (replaces the module console when truthy)
        inclusions_mapping_config: Loaded inclusions mapping config, used to
            check the target width for Inclusions sources (optional)
        organizations_mapping_config: Loaded organizations mapping config, used
            to check the target width for Organizations sources (optional)

    Returns:
        Tuple of (has_critical_error, error_messages)
    """
    global console
    if console_instance:
        console = console_instance

    if not excel_config or not excel_config[0] or not excel_config[1]:
        return False, []  # No config to validate

    excel_workbooks_config, excel_sheets_config = excel_config[0], excel_config[1]
    error_messages = []

    for workbook_config in excel_workbooks_config:
        workbook_id = workbook_config.get("workbook_id")
        template_name = workbook_config.get("workbook_template_name")

        # Check template exists
        template_path = os.path.join(get_config_path(), template_name)
        if not os.path.exists(template_path):
            error_messages.append(
                f"Template '{template_name}' (workbook_id: {workbook_id}) not found in config/"
            )
            continue

        # Check template is valid Excel
        try:
            template_wb = openpyxl.load_workbook(template_path)
        except Exception as e:
            error_messages.append(
                f"Template '{template_name}' (workbook_id: {workbook_id}) is not a valid Excel file: {e}"
            )
            continue

        # try/finally: the template is closed even if a validation helper raises.
        try:
            workbook_sheets = [s for s in excel_sheets_config if s.get("workbook_id") == workbook_id]
            for sheet_config in workbook_sheets:
                error_messages.extend(
                    _collect_sheet_target_errors(
                        template_wb,
                        sheet_config,
                        template_name,
                        workbook_id,
                        inclusions_mapping_config,
                        organizations_mapping_config,
                    )
                )
        finally:
            template_wb.close()

    return len(error_messages) > 0, error_messages
def _hide_excel_app():
    """Get (or start) the Excel app, hide it, and return (app, visible_before, screen_updating_before)."""
    app_xw = None
    visible_original = None
    screen_updating_original = None
    try:
        if xw.apps:
            # Reuse the running instance and remember how the user had it set up
            app_xw = xw.apps.active
            visible_original = app_xw.visible
            screen_updating_original = app_xw.screen_updating
        else:
            # NOTE(review): an app started here is not quit anywhere in the code
            # visible in this section - confirm whether it should be closed.
            app_xw = xw.App(visible=False)
            visible_original = False
            screen_updating_original = True
        app_xw.visible = False  # Make Excel invisible
        app_xw.screen_updating = False  # Disable screen updates
    except Exception as e:
        logging.warning(f"Failed to manage Excel visibility: {e}")
        app_xw = None
    return app_xw, visible_original, screen_updating_original


def _restore_excel_app(app_xw, visible_original, screen_updating_original):
    """Best-effort restoration of the Excel app visibility / screen updating captured earlier."""
    if app_xw is None:
        return
    try:
        if screen_updating_original is not None:
            app_xw.screen_updating = screen_updating_original
        if visible_original is not None:
            app_xw.visible = visible_original
    except Exception as e:
        # Restoration is best-effort, but the failure should not be invisible.
        logging.warning(f"Failed to restore Excel visibility/screen updating: {e}")


def _export_workbook_with_xlwings(template_path, output_path, workbook_id, output_filename,
                                  excel_sheets_config, inclusions_data, organizations_data,
                                  inclusions_mapping_config, organizations_mapping_config,
                                  template_vars):
    """Open the template with xlwings, fill every configured target, and save it as the output file."""
    workbook_context = f"{workbook_id} ({output_filename})"

    # PERFORMANCE: Make Excel invisible BEFORE opening the workbook
    app_xw, visible_original, screen_updating_original = _hide_excel_app()
    wb_xw = None
    try:
        # Open TEMPLATE directly (not a copy). The open is inside the try so the
        # Excel app is un-hidden again even when the template cannot be opened.
        wb_xw = xw.Book(template_path, update_links=False)

        # CAPTURE TEMPLATE STATE: Save initial state for restoration before save
        template_state = _capture_workbook_state(wb_xw, workbook_context=workbook_context)
        logging.info(f"Captured template state: active_sheet='{template_state['active_sheet']}', {len(template_state['sheets'])} sheet(s)")

        # Fill every target configured for this workbook
        workbook_sheets = [s for s in excel_sheets_config if s.get("workbook_id") == workbook_id]
        for sheet_config in workbook_sheets:
            _process_sheet_xlwings(
                wb_xw,
                sheet_config,
                inclusions_data,
                organizations_data,
                inclusions_mapping_config=inclusions_mapping_config,
                organizations_mapping_config=organizations_mapping_config,
                template_vars=template_vars,
                workbook_context=workbook_context
            )

        # RESTORE TEMPLATE STATE: Restore initial state before saving
        _restore_workbook_state(wb_xw, template_state, workbook_context=workbook_context)
        logging.info("Restored template state before save")

        # Save as output file (with retry). Alerts are disabled so that an
        # existing file is overwritten without a blocking confirmation dialog.
        abs_output_path = os.path.abspath(output_path)
        display_alerts_original = None
        if app_xw:
            display_alerts_original = app_xw.api.DisplayAlerts
            app_xw.api.DisplayAlerts = False
        try:
            _save_workbook_with_retry(wb_xw, abs_output_path)
            logging.info(f"Saved workbook to: {abs_output_path}")
        finally:
            if app_xw:
                app_xw.api.DisplayAlerts = display_alerts_original
    finally:
        # Always close the workbook; restore the app state even if close() raises.
        try:
            if wb_xw is not None:
                wb_xw.close()
        finally:
            _restore_excel_app(app_xw, visible_original, screen_updating_original)


def export_to_excel(inclusions_data, organizations_data, excel_config,
                    inclusions_mapping_config=None, organizations_mapping_config=None):
    """
    Main export function - orchestrates Excel workbook generation.

    Each configured workbook is processed independently: a failure on one
    workbook is reported and counted, and the remaining workbooks still run.

    Args:
        inclusions_data: List of inclusion dictionaries
        organizations_data: List of organization dictionaries
        excel_config: Tuple of (workbooks_config, sheets_config)
        inclusions_mapping_config: Inclusions field mapping configuration
        organizations_mapping_config: Organizations field mapping configuration

    Returns:
        Tuple of (success, error_count) where success is True when no workbook failed

    Note:
        Uses global console instance (injected from main script)
    """
    if not excel_config or not excel_config[0] or not excel_config[1]:
        console.print("[yellow]⚠ No Excel export configuration found, skipping[/yellow]")
        return True, 0

    excel_workbooks_config, excel_sheets_config = excel_config[0], excel_config[1]

    # Prepare template variables
    template_vars = _prepare_template_variables()
    error_count = 0
    success_count = 0

    # Track overall export duration
    export_start_time = perf_counter()

    for workbook_config in excel_workbooks_config:
        # Bound before the try so the outer error handler can always reference it.
        workbook_id = None
        try:
            workbook_id = workbook_config.get("workbook_id")
            template_name = workbook_config.get("workbook_template_name")
            output_template = workbook_config.get("output_file_name_template")
            # An empty config cell is stored as None under an existing key, so
            # .get(key, default) alone would not apply the default.
            if_output_exists = workbook_config.get("if_output_exists") or OUTPUT_ACTION_OVERWRITE

            # Resolve output filename
            try:
                output_filename = output_template.format(**template_vars)
            except KeyError as e:
                console.print(f"[bold red]✗ Unknown variable in template: {e}[/bold red]")
                error_count += 1
                continue

            output_path = os.path.join(EXCEL_OUTPUT_FOLDER, output_filename)
            logging.info(f"Processing workbook: {workbook_id} (template: {template_name}, output: {output_filename})")

            # PREPARATION PHASE: Handle existing file according to action
            output_path = _handle_output_exists(output_path, if_output_exists)

            # XLWINGS PHASE: Open template, fill, save as output
            template_path = os.path.join(get_config_path(), template_name)
            workbook_start_time = perf_counter()
            try:
                if xw is None:
                    raise ImportError("xlwings is not installed. Install with: pip install xlwings")

                # Spinner while the workbook is being processed
                with console.status(f"[bold cyan]Exporting {output_filename}...", spinner="dots"):
                    _export_workbook_with_xlwings(
                        template_path,
                        output_path,
                        workbook_id,
                        output_filename,
                        excel_sheets_config,
                        inclusions_data,
                        organizations_data,
                        inclusions_mapping_config,
                        organizations_mapping_config,
                        template_vars,
                    )

                workbook_duration = perf_counter() - workbook_start_time
                console.print(f"[green]✓ Created: {output_filename} ({workbook_duration:.2f}s)[/green]")
                success_count += 1
            except Exception as e:
                console.print(f"[bold red]✗ Error processing {output_filename}: {e}[/bold red]")
                logging.error(f"Excel export error for {output_filename}: {e}", exc_info=True)
                error_count += 1
                continue
        except Exception as e:
            console.print(f"[bold red]✗ Error processing workbook {workbook_id}: {e}[/bold red]")
            logging.error(f"Excel workbook processing error: {e}", exc_info=True)
            error_count += 1

    # Summary with total duration
    total_workbooks = success_count + error_count
    export_duration = perf_counter() - export_start_time
    if error_count == 0:
        # Success: all workbooks processed
        console.print(f"\n[green]✓ Excel export completed successfully: {success_count}/{total_workbooks} workbooks generated ({export_duration:.2f}s)[/green]")
    elif success_count > 0:
        # Partial success
        console.print(f"\n[yellow]⚠ Excel export completed with errors ({export_duration:.2f}s)[/yellow]")
        console.print(f"[green] {success_count} workbook(s) generated successfully[/green]")
        console.print(f"[bold red] {error_count} workbook(s) failed[/bold red]")
    else:
        # Complete failure
        console.print(f"\n[bold red]✗ Excel export failed: all {error_count} workbook(s) failed ({export_duration:.2f}s)[/bold red]")

    return error_count == 0, error_count
# ============================================================================
# INTERNAL FUNCTIONS
# ============================================================================
def _prepare_template_variables():
    """
    Prepare the variables available when output file name templates are formatted.

    The extraction time is the modification time of the inclusions file
    (INCLUSIONS_FILE_NAME); when that file cannot be stat'ed, the current time
    is used instead.

    Returns:
        Dictionary with:
        - 'extract_date_time_utc': timezone-aware datetime in UTC
        - 'extract_date_time_french': the same instant in the Europe/Paris zone
    """
    # EAFP: stat the file directly instead of exists() + getmtime(), so a file
    # removed between the two calls cannot raise.
    try:
        file_mtime = os.path.getmtime(INCLUSIONS_FILE_NAME)
    except OSError:
        extract_date_time_utc = datetime.now(tz=timezone.utc)
    else:
        extract_date_time_utc = datetime.fromtimestamp(file_mtime, tz=timezone.utc)

    # Convert to Paris timezone
    extract_date_time_french = extract_date_time_utc.astimezone(ZoneInfo('Europe/Paris'))

    return {
        'extract_date_time_utc': extract_date_time_utc,
        'extract_date_time_french': extract_date_time_french,
    }
# (operator symbol, predicate) pairs; each predicate answers "does this
# condition REJECT the item?" for the given actual and expected values.
_FILTER_REJECTION_CHECKS = (
    ("==", lambda actual, expected: actual != expected),
    ("<>", lambda actual, expected: actual == expected),
    (">", lambda actual, expected: not (actual > expected)),
    (">=", lambda actual, expected: not (actual >= expected)),
    ("<", lambda actual, expected: not (actual < expected)),
    ("<=", lambda actual, expected: not (actual <= expected)),
)


def _apply_filter(item, filter_condition):
    """
    Tell whether an item satisfies every condition of a filter (AND logic).

    Args:
        item: Dictionary to test
        filter_condition: List of [field_path, operator, value] conditions,
            where field_path is a dot-separated path into the item

    Returns:
        True when the filter is empty or all conditions hold. An item whose
        field resolves to None is rejected. A condition whose operator is not
        one of ==, <>, >, >=, <, <= places no constraint on the item.
    """
    if not filter_condition:
        return True  # Nothing to check: every item is accepted

    for field_path, operator, expected_value in filter_condition:
        actual_value = get_nested_value(item, field_path.split("."))
        if actual_value is None:
            return False  # Missing field: the item is filtered out

        # Look up the rejection predicate matching this operator, if any
        rejects = next(
            (check for symbol, check in _FILTER_REJECTION_CHECKS if operator == symbol),
            None,
        )
        if rejects is not None and rejects(actual_value, expected_value):
            return False

    return True  # No condition rejected the item
def _apply_sort(items, sort_keys):
"""
Apply multi-key sort to items with support for mixed asc/desc ordering.
Args:
items: List of dictionaries to sort
sort_keys: List of [field_name, order] or [field_name, order, option]
where:
- order is "asc" or "desc"
- option (optional) can be:
* datetime format string (e.g., "%Y-%m-%d") for date parsing
* "*natsort" for natural alphanumeric sorting
Supports MIXED asc/desc on different columns!
Returns:
Sorted list
"""
if not sort_keys:
return items
def natural_sort_key(text):
"""
Helper for natural alphanumeric sorting.
Converts "ENDOBEST-003-920-BA" to ["endobest", "-", 3, "-", 920, "-", "ba"]
Python's native list comparison handles the rest element by element.
"""
def convert(segment):
return int(segment) if segment.isdigit() else segment.lower()
return [convert(s) for s in re.split(r'(\d+)', str(text)) if s]
def compare_items(item1, item2):
"""
Comparator function for multi-key sorting with mixed asc/desc support.
Returns: -1 if item1 < item2, 0 if equal, 1 if item1 > item2
"""
for sort_spec in sort_keys:
field_name = sort_spec[0]
order = sort_spec[1] if len(sort_spec) > 1 else "asc"
sort_option = sort_spec[2] if len(sort_spec) > 2 else None
# Get values from both items
val1 = get_nested_value(item1, field_name.split("."))
val2 = get_nested_value(item2, field_name.split("."))
# Handle undefined/None - place at end
is_undef1 = val1 in [None, "", "undefined"]
is_undef2 = val2 in [None, "", "undefined"]
# Both undefined: equal
if is_undef1 and is_undef2:
continue
# Only one undefined: undefined goes last
if is_undef1:
return 1 # item1 > item2 (undefined last)
if is_undef2:
return -1 # item1 < item2 (item2 is undefined)
# Check if natural sort requested
is_natural_sort = (sort_option == "*natsort")
# Parse datetime if option is a datetime format (not *natsort)
if sort_option and not is_natural_sort:
datetime_format = sort_option
try:
val1 = datetime.strptime(str(val1), datetime_format).timestamp()
except (ValueError, TypeError):
val1 = None
return 1 # Invalid datetime goes last
try:
val2 = datetime.strptime(str(val2), datetime_format).timestamp()
except (ValueError, TypeError):
val2 = None
return -1 # Invalid datetime goes last
# Apply natural sort transformation if requested
if is_natural_sort:
val1 = natural_sort_key(val1)
val2 = natural_sort_key(val2)
# Compare values
# For strings (non-natsort), use case-insensitive comparison for natural alphabetical ordering
if isinstance(val1, str) and isinstance(val2, str):
val1_lower = val1.lower()
val2_lower = val2.lower()
if val1_lower < val2_lower:
cmp_result = -1
elif val1_lower > val2_lower:
cmp_result = 1
else:
# Case-insensitive equal, use case-sensitive as tiebreaker
if val1 < val2:
cmp_result = -1
elif val1 > val2:
cmp_result = 1
else:
cmp_result = 0
else:
# Non-string comparison (numbers, dates, natsort lists, etc.)
if val1 < val2:
cmp_result = -1
elif val1 > val2:
cmp_result = 1
else:
cmp_result = 0 # Equal, continue to next sort key
# Apply asc/desc ordering
if cmp_result != 0:
is_desc = isinstance(order, str) and order.lower() == "desc"
return cmp_result if not is_desc else -cmp_result
# All keys are equal
return 0
# Use functools.cmp_to_key to convert comparator to key function
return sorted(items, key=functools.cmp_to_key(compare_items))
def _apply_value_replacement(value, replacements):
"""
Apply value replacement rules (first-match-wins, strict type matching).
Args:
value: Value to potentially replace
replacements: List of [value_before, value_after] pairs
Returns:
Replaced value or original
Note:
This function is currently prepared for future use in table data filling.
"""
if not replacements:
return value
for value_before, value_after in replacements:
if value == value_before: # Strict equality
return value_after
return value # No match, return original
# OBSOLETE: _preserve_media_in_workbook() removed - xlwings handles media preservation automatically
# When using xlwings, Excel natively preserves all media, images, and relationships
def _save_workbook_with_retry(wb_xw, output_path):
    """
    Save workbook with retry mechanism for transient xlwings/Excel failures.

    Excel's SaveAs can fail randomly on some environments (especially Excel 2013).
    The save is attempted up to EXCEL_COM_MAX_RETRIES times, waiting
    EXCEL_COM_RETRY_DELAY seconds between attempts. At least one attempt is
    always made, so a retry count configured as 0 (or negative) can never
    turn the save into a silent no-op.

    Args:
        wb_xw: xlwings Book object
        output_path: Absolute path where workbook should be saved

    Raises:
        Exception: The error of the last attempt, if SaveAs fails on every attempt
    """
    from time import sleep

    max_attempts = max(1, EXCEL_COM_MAX_RETRIES)
    for attempt in range(1, max_attempts + 1):
        try:
            logging.info(f"SaveAs attempt {attempt}/{max_attempts}: {output_path}")
            wb_xw.api.SaveAs(output_path)
            logging.info(f"SaveAs succeeded on attempt {attempt}")
            return  # Success
        except Exception as e:
            error_msg = f"SaveAs failed on attempt {attempt}: {type(e).__name__}: {str(e)}"
            if attempt < max_attempts:
                # Intermediate failure - warn, wait, then try again
                logging.warning(f"{error_msg} - Retrying in {EXCEL_COM_RETRY_DELAY}s...")
                sleep(EXCEL_COM_RETRY_DELAY)
            else:
                # Final attempt failed - log and let the caller handle it
                logging.error(f"{error_msg} - All {max_attempts} retry attempts exhausted")
                raise
def _capture_workbook_state(wb_xw, workbook_context=""):
    """
    Capture the visual state of the workbook (active sheet, selections, scroll positions).

    This allows restoration of the template's visual state after data processing,
    ensuring recipients see the workbook exactly as designed in the template.

    Every sheet is activated in turn in order to read its selection and scroll
    position, and the originally active sheet is not re-activated here, so the
    caller is expected to restore the state afterwards. Failures are logged and
    never raised from the per-sheet capture: a sheet whose capture fails is
    simply absent from the returned 'sheets' mapping.

    Args:
        wb_xw: xlwings Book object
        workbook_context: String identifier for logging (workbook_id and filename)

    Returns:
        dict: {'active_sheet': <name or None>,
               'sheets': {<sheet name>: {'selection': <address>,
                                         'scroll_row': <int>,
                                         'scroll_col': <int>}}}
    """
    # Log prefix identifying the workbook being processed
    ctx = f"[{workbook_context}]" if workbook_context else ""
    logging.info(f"{ctx} [CAPTURE_STATE] Starting workbook state capture")
    logging.info(f"{ctx} [CAPTURE_STATE] Total sheets: {len(wb_xw.sheets)}")
    state = {
        'active_sheet': None,
        'sheets': {}
    }
    try:
        # Capture active sheet name BEFORE the loop below activates other sheets
        state['active_sheet'] = wb_xw.api.ActiveSheet.Name
        logging.info(f"{ctx} [CAPTURE_STATE] Active sheet captured: '{state['active_sheet']}'")
    except Exception as e:
        # Non-fatal: 'active_sheet' simply stays None
        logging.warning(f"{ctx} [CAPTURE_STATE] Could not capture active sheet: {type(e).__name__}: {str(e)}")
    # Capture state for each sheet
    for idx, sheet in enumerate(wb_xw.sheets, 1):
        logging.info(f"{ctx} [CAPTURE_STATE] Processing sheet {idx}/{len(wb_xw.sheets)}: '{sheet.name}'")
        try:
            # Activate sheet to get its state: selection and scroll position are
            # read from the application's current selection / active window
            sheet.activate()
            logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' activated successfully")
            sheet_api = sheet.api
            # Defaults used when a value cannot be read
            sheet_state = {
                'selection': None,
                'scroll_row': 1,
                'scroll_col': 1
            }
            # Capture selection address
            try:
                selection_address = sheet_api.Application.Selection.Address
                sheet_state['selection'] = selection_address
                logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' selection captured: {selection_address}")
            except Exception as e:
                sheet_state['selection'] = "A1" # Default
                logging.warning(f"{ctx} [CAPTURE_STATE] Could not capture selection for sheet '{sheet.name}': {type(e).__name__}, defaulting to A1")
            # Capture scroll position
            try:
                scroll_row = sheet_api.Application.ActiveWindow.ScrollRow
                scroll_col = sheet_api.Application.ActiveWindow.ScrollColumn
                sheet_state['scroll_row'] = scroll_row
                sheet_state['scroll_col'] = scroll_col
                logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' scroll position captured: Row={scroll_row}, Col={scroll_col}")
            except Exception as e:
                # Keep the default scroll position (row 1, column 1)
                logging.warning(f"{ctx} [CAPTURE_STATE] Could not capture scroll position for sheet '{sheet.name}': {type(e).__name__}, keeping defaults")
            state['sheets'][sheet.name] = sheet_state
            logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' state complete: {sheet_state}")
        except Exception as e:
            # Activation (or another step) failed: this sheet is left out of the state
            logging.error(f"{ctx} [CAPTURE_STATE] ERROR capturing state for sheet '{sheet.name}': {type(e).__name__}: {str(e)}")
    logging.info(f"{ctx} [CAPTURE_STATE] Workbook state capture complete. Captured {len(state['sheets'])} sheet(s)")
    return state
def _select_range_with_retry(sheet_api, address, ctx, label, target_desc, success_msg):
    """
    Select a range via the Excel COM API, retrying when Range.Select() fails.

    Args:
        sheet_api: COM worksheet object (sheet.api) that owns the range
        address: Address of the range to select (e.g. "$A$1")
        ctx: Log prefix identifying the workbook (may be empty)
        label: Operation name used in the attempt and failure log lines
        target_desc: Description of the target appended to the attempt log line
        success_msg: Message logged (after the prefix) once the selection succeeds

    Returns:
        Boolean True if the range was selected, False if every attempt failed
    """
    from time import sleep

    for attempt in range(1, EXCEL_COM_MAX_RETRIES + 1):
        try:
            logging.info(f"{ctx} [RESTORE_STATE] {label} attempt {attempt}/{EXCEL_COM_MAX_RETRIES} {target_desc}")
            sheet_api.Range(address).Select()
            logging.info(f"{ctx} [RESTORE_STATE] {success_msg}")
            return True
        except Exception as e:
            error_msg = f"{label} failed on attempt {attempt}: {type(e).__name__}: {str(e)}"
            if attempt < EXCEL_COM_MAX_RETRIES:
                # Intermediate failure: warn, wait, then try again
                logging.warning(f"{ctx} [RESTORE_STATE] {error_msg} - Retrying in {EXCEL_COM_RETRY_DELAY}s...")
                sleep(EXCEL_COM_RETRY_DELAY)
            else:
                # Last attempt failed: report as error and let the caller decide on a fallback
                logging.error(f"{ctx} [RESTORE_STATE] {error_msg} - All {EXCEL_COM_MAX_RETRIES} retry attempts exhausted")
    return False


def _restore_workbook_state(wb_xw, state, workbook_context=""):
    """
    Restore the visual state of the workbook (active sheet, selections, scroll positions).

    The function is best-effort: every COM interaction is guarded and failures are
    logged rather than raised.

    Args:
        wb_xw: xlwings Book object
        state: State dictionary from _capture_workbook_state(). Reads the 'sheets' entry
            (sheet name -> dict with 'selection', 'scroll_row', 'scroll_col') and the
            'active_sheet' entry.
        workbook_context: String identifier for logging (workbook_id and filename)

    Returns:
        None
    """
    if not state:
        logging.warning("[RESTORE_STATE] Empty state provided, skipping restoration")
        return

    from time import sleep

    ctx = f"[{workbook_context}]" if workbook_context else ""
    logging.info(f"{ctx} [RESTORE_STATE] Starting workbook state restoration")
    logging.info(f"{ctx} [RESTORE_STATE] Restoring {len(state.get('sheets', {}))} sheet(s)")

    # NOTE: Screen updating is already disabled at the global level (in export_to_excel)
    # for the entire workbook processing cycle (from open to save).
    # It is deliberately NOT re-disabled here to avoid state conflicts.

    # CRITICAL: Excel 2013 COM layer lock recovery
    # After bulk paste operations, Range.Select() can fail persistently. The workaround is:
    #   1. Give Excel time to recover with a large delay
    #   2. Make a "dummy" Range.Select() to wake up the COM layer
    #   3. Proceed with the real restorations
    logging.info(f"{ctx} [RESTORE_STATE] Waiting for Excel COM layer to stabilize after bulk operations (2 seconds)...")
    sleep(2.0)

    # Remember the original visibility so it can be put back at the end
    original_app_visible = None
    try:
        if wb_xw.app:
            original_app_visible = wb_xw.app.visible
            if not original_app_visible:
                # Make Excel visible during restoration so the user sees what is happening
                # (selection restore retries may take several seconds)
                wb_xw.app.visible = True
                logging.info(f"{ctx} [RESTORE_STATE] Excel app temporarily made visible for restoration operations")
    except Exception as e:
        logging.warning(f"{ctx} [RESTORE_STATE] Could not manage Excel visibility during restoration: {type(e).__name__}: {str(e)}")

    try:
        # Prime the COM layer with a dummy selection on the first sheet so that
        # subsequent Range.Select() calls work reliably
        try:
            if len(wb_xw.sheets) > 0:
                first_sheet = wb_xw.sheets[0]
                first_sheet.activate()
                logging.info(f"{ctx} [RESTORE_STATE] Priming COM layer by activating first sheet...")
                first_sheet.api.Range("$A$1").Select()
                logging.info(f"{ctx} [RESTORE_STATE] COM layer priming successful")
        except Exception:
            # Not critical - if it fails, the per-sheet retries will handle it
            logging.info(f"{ctx} [RESTORE_STATE] COM layer priming attempt completed (may have failed, retries will handle it)")

        # Restore state for each captured sheet
        for idx, (sheet_name, sheet_state) in enumerate(state.get('sheets', {}).items(), 1):
            logging.info(f"{ctx} [RESTORE_STATE] Processing sheet {idx}: '{sheet_name}'")
            try:
                sheet = wb_xw.sheets[sheet_name]
                sheet.activate()
                logging.info(f"{ctx} [RESTORE_STATE] Sheet '{sheet_name}' activated successfully")
                # Small delay so Excel has completed the sheet switch
                sleep(0.3)
                sheet_api = sheet.api

                # Restore selection; fall back to A1 if the captured address cannot be selected
                selection = sheet_state.get('selection')
                if selection:
                    selection_restored = _select_range_with_retry(
                        sheet_api,
                        selection,
                        ctx,
                        "Selection restore",
                        f"for '{selection}' on sheet '{sheet_name}'",
                        f"Sheet '{sheet_name}' selection restored to: {selection}",
                    )
                    if not selection_restored:
                        logging.warning(f"{ctx} [RESTORE_STATE] Could not restore selection '{selection}' for sheet '{sheet_name}' after {EXCEL_COM_MAX_RETRIES} attempts, defaulting to A1")
                        _select_range_with_retry(
                            sheet_api,
                            "$A$1",
                            ctx,
                            "A1 default",
                            f"for sheet '{sheet_name}'",
                            f"Sheet '{sheet_name}' selection defaulted to A1",
                        )

                # Restore scroll position
                try:
                    scroll_row = sheet_state.get('scroll_row', 1)
                    scroll_col = sheet_state.get('scroll_col', 1)
                    sheet_api.Application.ActiveWindow.ScrollRow = scroll_row
                    sheet_api.Application.ActiveWindow.ScrollColumn = scroll_col
                    logging.info(f"{ctx} [RESTORE_STATE] Sheet '{sheet_name}' scroll position restored: Row={scroll_row}, Col={scroll_col}")
                except Exception as e:
                    logging.warning(f"{ctx} [RESTORE_STATE] Could not restore scroll position for sheet '{sheet_name}': {type(e).__name__}")
            except Exception as e:
                logging.error(f"{ctx} [RESTORE_STATE] ERROR restoring state for sheet '{sheet_name}': {type(e).__name__}: {str(e)}")

        # Restore active sheet
        if state.get('active_sheet'):
            try:
                active_sheet_name = state['active_sheet']
                wb_xw.sheets[active_sheet_name].activate()
                # Wait for sheet activation to complete on Excel 2013's COM layer
                sleep(0.3)
                logging.info(f"{ctx} [RESTORE_STATE] Active sheet restored to: '{active_sheet_name}'")
            except Exception as e:
                logging.error(f"{ctx} [RESTORE_STATE] Could not restore active sheet '{state.get('active_sheet')}': {type(e).__name__}: {str(e)}")

        # Force the sheet tab bar to start from the first sheet, regardless of the active sheet
        # NOTE: ScrollWorkbookTabs only works when Excel is visible
        try:
            if len(wb_xw.sheets) > 0 and wb_xw.app:
                logging.info(f"{ctx} [RESTORE_STATE] Attempting to scroll sheet tabs to first sheet")
                try:
                    # A large negative count scrolls the tabs left far enough to reach the beginning
                    wb_xw.api.Application.ActiveWindow.ScrollWorkbookTabs(-100)
                    logging.info(f"{ctx} [RESTORE_STATE] Sheet tabs scrolled to beginning")
                except Exception as e:
                    logging.warning(f"{ctx} [RESTORE_STATE] Could not scroll sheet tabs to beginning: {type(e).__name__}: {str(e)}")
        except Exception as e:
            logging.error(f"{ctx} [RESTORE_STATE] ERROR during sheet tabs scroll operation: {type(e).__name__}: {str(e)}")
    finally:
        # Put visibility back even if restoration was interrupted, so a hidden
        # Excel instance is never left visible.
        # NOTE: Screen updating restoration is handled at the global level (in export_to_excel)
        try:
            if original_app_visible is not None and wb_xw.app:
                if not original_app_visible and wb_xw.app.visible:
                    wb_xw.app.visible = False
                    logging.info(f"{ctx} [RESTORE_STATE] Excel app visibility restored to original state: False")
        except Exception as e:
            logging.warning(f"{ctx} [RESTORE_STATE] Could not restore Excel app visibility: {type(e).__name__}: {str(e)}")

    logging.info(f"{ctx} [RESTORE_STATE] Workbook state restoration complete")
def _handle_output_exists(output_path, action):
"""
Handle existing output file (Overwrite/Increment/Backup).
Args:
output_path: Full path to output file
action: "Overwrite", "Increment", or "Backup"
Returns:
Actual path to use (may be different if Increment/Backup)
"""
if not os.path.exists(output_path):
logging.info(f"Output file doesn't exist yet: {output_path}")
return output_path
logging.info(f"Output file exists, applying '{action}' rule: {output_path}")
if action == OUTPUT_ACTION_OVERWRITE:
logging.info(f"Overwriting existing file: {output_path}")
return output_path
elif action == OUTPUT_ACTION_INCREMENT:
base, ext = os.path.splitext(output_path)
counter = 1
while os.path.exists(f"{base}_{counter}{ext}"):
counter += 1
new_path = f"{base}_{counter}{ext}"
logging.info(f"Using incremented filename: {new_path}")
return new_path
elif action == OUTPUT_ACTION_BACKUP:
base, ext = os.path.splitext(output_path)
counter = 1
backup_path = f"{base}_backup_{counter}{ext}"
while os.path.exists(backup_path):
counter += 1
backup_path = f"{base}_backup_{counter}{ext}"
try:
logging.info(f"Backing up existing file to: {backup_path}")
shutil.copy2(output_path, backup_path)
logging.info(f"Backup successful: {output_path} -> {backup_path}")
except Exception as e:
logging.error(f"Backup failed: {e}")
raise
# Return original path - the existing file will be overwritten by SaveAs
return output_path
return output_path
def _get_column_mapping(mapping_config, mapping_column_name, source_type):
"""
Extract column mapping from Inclusions_Mapping or Organizations_Mapping.
The mapping column contains user-friendly 1-based indices (1, 2, 3, ...)
indicating which column in the Excel table each field should be placed.
These are converted to 0-based indices for internal use.
Args:
mapping_config: List of mapping config rows (dicts with field_name, etc.)
mapping_column_name: Name of the mapping column to extract (e.g., "MainReport_PatientsList")
source_type: "Inclusions" or "Organizations"
Returns:
Dictionary: {excel_column_index: source_field_name}
Example: {0: "Patient_Identification.Patient_Id", 1: "Inclusion.Status", ...}
Indices are 0-based (converted from 1-based user input)
or None if mapping_column not found
"""
if not mapping_config:
return None
column_mapping = {}
for row in mapping_config:
# Get the field name (source field in the JSON)
field_name = row.get("field_name")
if not field_name:
continue
# Get the mapping value from the specified column
mapping_value = row.get(mapping_column_name)
if mapping_value is None or mapping_value == "":
continue # Skip empty mappings
# Convert mapping_value to integer (1-based user-friendly index)
try:
user_col_index = int(mapping_value)
except (ValueError, TypeError):
logging.warning(f"Invalid column index '{mapping_value}' for field '{field_name}'")
continue
# Convert 1-based to 0-based index
excel_col_index = user_col_index - 1
if excel_col_index < 0:
logging.warning(f"Column index '{user_col_index}' for field '{field_name}' must be >= 1")
continue
# Store: excel_column_index -> field_name
# Field name needs to be qualified with group for Inclusions
# (extracted from the row's field_group if available)
if source_type == "Inclusions":
field_group = row.get("field_group", "")
if field_group:
full_field_name = f"{field_group}.{field_name}"
else:
full_field_name = field_name
else:
# For Organizations, field_name might already be qualified or standalone
full_field_name = field_name
column_mapping[excel_col_index] = full_field_name
return column_mapping if column_mapping else None
def _parse_range_dimensions(start_row, start_col, end_row, end_col, header_row_count=0):
"""
Shared utility: Calculate dimensions from cell coordinates.
Args:
start_row, start_col: Starting cell (1-based, after headers)
end_row, end_col: Ending cell (1-based)
header_row_count: Number of header rows (0 if none)
Returns:
Tuple of (width, total_height, data_height)
"""
width = end_col - start_col + 1
total_height = end_row - start_row + 1
data_height = total_height - header_row_count
return width, total_height, data_height
def _get_named_range_dimensions(workbook, range_name):
"""
Get dimensions of named range or table in workbook.
Args:
workbook: openpyxl Workbook object
range_name: Name of the named range or table
Returns:
Tuple of (sheet_name, start_cell, height, width) or None if not found
"""
# First check for defined named ranges (in openpyxl 3.x)
if range_name in workbook.defined_names:
defined_name = workbook.defined_names[range_name]
# Get the range reference from attr_text (e.g., "Sheet!$A$1:$B$10")
range_ref = defined_name.attr_text
# Parse: "SheetName!$A$1:$B$10"
if '!' in range_ref:
sheet_name, cell_range = range_ref.split('!')
# Remove quotes if present
sheet_name = sheet_name.strip("'\"")
# Remove $ signs for parsing
cell_range = cell_range.replace('$', '')
if sheet_name in workbook.sheetnames:
sheet = workbook[sheet_name]
# Parse cell range (e.g., "A1:B10" or single "A1")
if ':' in cell_range:
start_cell_str, end_cell_str = cell_range.split(':')
start_cell = sheet[start_cell_str]
end_cell = sheet[end_cell_str]
width = end_cell.column - start_cell.column + 1
height = end_cell.row - start_cell.row + 1
else:
start_cell = sheet[cell_range]
width = 1
height = 1
return sheet_name, start_cell, height, width
# Check if it's a Table (Excel table object, not just a named range)
for sheet_name in workbook.sheetnames:
sheet = workbook[sheet_name]
if hasattr(sheet, 'tables') and range_name in sheet.tables:
table = sheet.tables[range_name]
# Table has a 'ref' property with the range (e.g., "A4:F5")
# Excel tables can have header rows (default 1, but can be 0)
table_ref = table.ref
header_row_count = getattr(table, 'headerRowCount', 1) or 0 # 0 if None or False
# Parse cell range (e.g., "A4:F5")
if ':' in table_ref:
start_cell_str, end_cell_str = table_ref.split(':')
start_cell_temp = sheet[start_cell_str]
end_cell = sheet[end_cell_str]
width = end_cell.column - start_cell_temp.column + 1
total_height = end_cell.row - start_cell_temp.row + 1
# Skip header rows: point to first DATA row
if header_row_count > 0:
data_start_row = start_cell_temp.row + header_row_count
start_cell = sheet.cell(row=data_start_row, column=start_cell_temp.column)
else:
start_cell = start_cell_temp
# Calculate data row count (total - headers)
height = total_height - header_row_count
else:
start_cell = sheet[table_ref]
width = 1
height = 1
return sheet_name, start_cell, height, width
return None
# OBSOLETE: _update_named_range_height() removed
# This function was only called by the old openpyxl-based _process_sheet() implementation
# xlwings uses table.Resize() via COM API instead, which is more reliable
# See PHASE 2 migration notes for details
# OBSOLETE: _recalculate_workbook() removed - xlwings handles formula recalculation automatically
# When using xlwings with wb.save(), Excel automatically recalculates all formulas
# OBSOLETE: _process_sheet() removed - openpyxl implementation migrated to xlwings
# All sheet processing is now handled by _process_sheet_xlwings() using xlwings library
# This eliminates code duplication and provides better preservation of workbook structure
def _get_table_dimensions_xlwings(workbook_xw, range_name):
"""
Get dimensions of an Excel table OR named range using xlwings COM API.
First searches for ListObjects (structured tables), then falls back to
simple named ranges if no table is found.
Args:
workbook_xw: xlwings Book object (already open)
range_name: Name of the Excel table (ListObject) or named range
Returns:
Tuple (sheet_name, start_cell, height, width, header_row_count, target_type) or None if not found
- start_cell: Points to FIRST DATA ROW (after headers for tables, first row for named ranges)
- height: Number of DATA ROWS (excluding headers for tables)
- header_row_count: Number of header rows (0 for named ranges, 0 or 1 for tables)
- target_type: TARGET_TYPE_TABLE or TARGET_TYPE_NAMED_RANGE
Note:
- For tables with headers: start_cell points to first data row (after header)
- For tables without headers: start_cell points to first row of table
- For named ranges: start_cell points to first row (no headers assumed)
"""
# Helper class to mimic openpyxl Cell behavior
class CellRef:
def __init__(self, row, column):
self.row = row
self.column = column
@property
def coordinate(self):
col_letter = get_column_letter(self.column)
return f"{col_letter}{self.row}"
# === PRIORITY 1: Check if it's a table (ListObject) ===
# Excel tables are more reliable than plain named ranges with xlwings
for sheet in workbook_xw.sheets:
sheet_api = sheet.api
# Try to get the table count - if this fails, the sheet has no ListObjects property
try:
table_count = sheet_api.ListObjects.Count
except:
# Sheet doesn't support ListObjects or has none
continue
# If no tables in this sheet, skip
if table_count == 0:
continue
# Iterate through tables by index
for i in range(1, table_count + 1): # COM indexing starts at 1
try:
xl_table = sheet_api.ListObjects.Item(i)
table_name = xl_table.Name
if table_name == range_name:
# Found a table - get its range
xl_range = xl_table.Range
sheet_name = sheet.name
total_rows = xl_range.Rows.Count
total_cols = xl_range.Columns.Count
start_row = xl_range.Row
start_col = xl_range.Column
# Get header row count from the table
# In COM API, ListObject has ShowHeaders property (boolean) and HeaderRowRange
# ShowHeaders: True if table has header row, False if not
try:
has_headers = xl_table.ShowHeaders
header_row_count = 1 if has_headers else 0
except:
# If ShowHeaders not accessible, try HeaderRowRange
try:
header_range = xl_table.HeaderRowRange
header_row_count = 1 if header_range is not None else 0
except:
# Fallback: assume headers exist (most common case)
header_row_count = 1
# Data height = total height - header rows
data_height = total_rows - header_row_count
# start_cell points to the FIRST DATA ROW (after headers)
# If table has headers: skip them. If no headers: start at table start
if header_row_count > 0:
data_start_row = start_row + header_row_count
else:
data_start_row = start_row
start_cell = CellRef(data_start_row, start_col)
logging.info(f"[TABLE FOUND] Located table '{range_name}' at {sheet_name}!{start_cell.coordinate} "
f"(data rows: {data_height}, headers: {header_row_count}, total width: {total_cols})")
return sheet_name, start_cell, data_height, total_cols, header_row_count, TARGET_TYPE_TABLE
except Exception as e:
# Error accessing this specific table, skip it
logging.warning(f"Error accessing table {i} in '{sheet.name}': {type(e).__name__}")
# === PRIORITY 2: Check if it's a named range ===
# Named ranges don't have headers - data starts at first row
try:
if range_name in workbook_xw.names:
named_range = workbook_xw.names[range_name]
target_range = named_range.refers_to_range
sheet_name = target_range.sheet.name
start_row = target_range.row
start_col = target_range.column
total_rows = target_range.rows.count
total_cols = target_range.columns.count
# Named ranges have no headers - all rows are data rows
header_row_count = 0
data_height = total_rows
start_cell = CellRef(start_row, start_col)
logging.info(f"[NAMED RANGE FOUND] Located named range '{range_name}' at {sheet_name}!{start_cell.coordinate} "
f"(data rows: {data_height}, no headers, total width: {total_cols})")
return sheet_name, start_cell, data_height, total_cols, header_row_count, TARGET_TYPE_NAMED_RANGE
except Exception as e:
logging.warning(f"Error accessing named range '{range_name}': {type(e).__name__}: {str(e)}")
# Range/table not found
logging.warning(f"Named range or table '{range_name}' not found in workbook")
return None
# ============================================================================
# HELPER FUNCTIONS FOR SHEET PROCESSING (extracted from _process_sheet_xlwings)
# ============================================================================
def _fill_variable_in_sheet(workbook_xw, target_name, source_template, template_vars, workbook_context=""):
"""
Fill a single variable cell with evaluated template value.
Args:
workbook_xw: xlwings Book object
target_name: Name of the target named range (single cell)
source_template: Template string with {variables}
template_vars: Dictionary of variable values
workbook_context: Context string for logging
Returns:
Boolean True if successful
"""
try:
# Evaluate template string
cell_value = source_template.format(**template_vars)
except KeyError as e:
logging.warning(f"Unknown variable in template: {e}")
return False
# Write to named cell using xlwings
try:
named_range = workbook_xw.names[target_name]
target_range = named_range.refers_to_range
target_range.value = cell_value
logging.info(f"Set variable '{target_name}' to '{cell_value}'")
return True
except KeyError:
logging.warning(f"Named range '{target_name}' not found in {workbook_context}")
return False
except Exception as e:
logging.warning(f"Error setting variable '{target_name}' in {workbook_context}: {e}")
return False
def _prepare_table_data(source_type, source, sheet_config, inclusions_data, organizations_data,
                        inclusions_mapping_config, organizations_mapping_config, target_name):
    """
    Build the rows and the column mapping used to fill one table target.

    Picks the data set and mapping config matching source_type, keeps the items
    accepted by the sheet's "filter_condition", orders them by the sheet's
    "sort_keys", then resolves the column mapping named by source.

    Args:
        source_type: Type of source (Inclusions or Organizations)
        source: Source identifier (mapping name)
        sheet_config: Sheet configuration dictionary
        inclusions_data: Inclusions data list
        organizations_data: Organizations data list
        inclusions_mapping_config: Inclusions mapping config
        organizations_mapping_config: Organizations mapping config
        target_name: Target range name (for logging)

    Returns:
        Tuple of (sorted_data, column_mapping), or (None, None) when the column
        mapping is missing or empty
    """
    # Anything other than Inclusions is served from the Organizations data
    from_inclusions = source_type == SOURCE_TYPE_INCLUSIONS
    records = inclusions_data if from_inclusions else organizations_data
    mapping_rows = inclusions_mapping_config if from_inclusions else organizations_mapping_config

    # Filter, then sort
    condition = sheet_config.get("filter_condition")
    ordering = sheet_config.get("sort_keys")
    kept = []
    for record in records:
        if _apply_filter(record, condition):
            kept.append(record)
    ordered = _apply_sort(kept, ordering)

    # Resolve which source field goes into which Excel column
    mapping = _get_column_mapping(mapping_rows, source, source_type)
    if mapping:
        return ordered, mapping

    logging.warning(f"Column mapping '{source}' not found or empty for {target_name}")
    return None, None
def _resize_table_range(workbook_xw, sheet_xw, target_name, start_cell, max_col, start_row, num_data_rows, header_row_count=0, target_type=TARGET_TYPE_TABLE):
    """
    Resize Excel table (ListObject) or named range to match data dimensions.

    For Tables (ListObjects): Uses ListObject.Resize() COM API
    For Named Ranges: Redefines the named range via Name.RefersTo property

    Nothing is done when num_data_rows is 1 or less. All errors are logged as
    warnings and never raised.

    Args:
        workbook_xw: xlwings Book object (needed for named range resize)
        sheet_xw: xlwings Sheet object
        target_name: Name of the table/named range
        start_cell: Starting cell (CellRef) - points to FIRST DATA ROW (after headers for tables)
        max_col: Maximum column (1-based)
        start_row: Starting row (1-based, first data row)
        num_data_rows: Number of data rows
        header_row_count: Number of header rows in the table (0 for named ranges).
            For tables, 0 triggers a lookup of the table's ShowHeaders property.
        target_type: TARGET_TYPE_TABLE or TARGET_TYPE_NAMED_RANGE

    Returns:
        None (logging handles errors)
    """
    if num_data_rows <= 1:
        return

    try:
        last_data_row = start_row + num_data_rows - 1

        if target_type == TARGET_TYPE_TABLE:
            # === TABLE (ListObject) RESIZE ===
            excel_sheet = sheet_xw.api
            for list_obj in excel_sheet.ListObjects:
                if list_obj.Name != target_name:
                    continue

                # header_row_count not provided (legacy fallback): ask the table itself
                if header_row_count == 0:
                    try:
                        header_row_count = 1 if list_obj.ShowHeaders else 0
                    except Exception:
                        header_row_count = 1

                # The resized range must include the header rows when they exist
                if header_row_count > 0:
                    first_row = start_row - header_row_count
                else:
                    first_row = start_row

                resize_range_str = f"{get_column_letter(start_cell.column)}{first_row}:{get_column_letter(max_col)}{last_data_row}"
                new_range = excel_sheet.Range(resize_range_str)
                list_obj.Resize(new_range)
                logging.info(f"Resized table '{target_name}' to {resize_range_str} (header_rows={header_row_count})")
                break
            else:
                # Loop finished without a match: make the no-op visible in the log
                logging.warning(f"Resize skipped for {target_name} ({target_type}): table not found on sheet '{sheet_xw.name}'")

        elif target_type == TARGET_TYPE_NAMED_RANGE:
            # === NAMED RANGE RESIZE ===
            # Named ranges have no headers, so start_row is the first row
            first_col_letter = get_column_letter(start_cell.column)
            last_col_letter = get_column_letter(max_col)
            range_address = f"${first_col_letter}${start_row}:${last_col_letter}${last_data_row}"
            # Assign the Range object itself to the Name to avoid R1C1/A1 format issues
            new_range = sheet_xw.range(range_address)
            workbook_xw.api.Names(target_name).RefersTo = new_range.api
            logging.info(f"Resized named range '{target_name}' to {sheet_xw.name}!{range_address}")

        else:
            logging.warning(f"Resize skipped for {target_name} ({target_type}): unsupported target type")

    except Exception as e:
        logging.warning(f"Resize skipped for {target_name} ({target_type}): {e}")
def _duplicate_template_row(sheet_xw, start_cell, max_col, start_row, num_data_rows, target_name, workbook_context=""):
"""
Duplicate template row to all data rows via copy-paste.
Args:
sheet_xw: xlwings Sheet object
start_cell: Starting cell (CellRef)
max_col: Maximum column (1-based)
start_row: Starting row (1-based)
num_data_rows: Number of data rows
target_name: Target range name (for logging)
workbook_context: Context string for logging
Returns:
None (logging handles errors)
"""
if num_data_rows <= 1:
return
try:
# Replicate template row to all data rows in a single operation
template_range_str = f"{get_column_letter(start_cell.column)}{start_row}:{get_column_letter(max_col)}{start_row}"
last_data_row = start_row + num_data_rows - 1
full_target_range_str = f"{get_column_letter(start_cell.column)}{start_row}:{get_column_letter(max_col)}{last_data_row}"
# Copy template row
sheet_xw.range(template_range_str).copy()
# Paste to entire range - Excel automatically replicates the template row
sheet_xw.range(full_target_range_str).paste()
# CRITICAL: Deselect after paste to avoid COM layer lock
# After bulk paste on large ranges (85k+ cells), Excel's COM layer becomes saturated
# and leaves a massive selection active. This prevents subsequent Range.Select() calls.
# Solution: Reset Excel's selection state by switching sheets and back, then select A1.
try:
from time import sleep
logging.info(f"Deselecting range after bulk paste for {target_name}...")
# Switch to another sheet to force Excel to reset selection state
other_sheets = [s for s in sheet_xw.book.sheets if s.name != sheet_xw.name]
if other_sheets:
other_sheets[0].activate()
sleep(0.1)
# Reactivate our sheet - Excel resets selection management when returning
sheet_xw.activate()
# Select A1 - COM should manage this easily now
sheet_xw.api.Range("$A$1").Select()
logging.info(f"Successfully deselected after bulk paste for {target_name} (sheet reactivation)")
except Exception as e:
# Deselection is non-critical, log and continue if it fails
logging.warning(f"Deselection after paste failed for {target_name}: {type(e).__name__}: {str(e)}")
except Exception as e:
logging.warning(f"Template duplication failed for {target_name} in {workbook_context}: {e}")
def _fill_table_with_data(sheet_xw, start_cell, start_row, start_col, sorted_data, column_mapping,
value_replacement, target_name, sheet_name):
"""
Fill table with data: group contiguous columns and transfer via bulk 2D arrays.
Args:
sheet_xw: xlwings Sheet object
start_cell: Starting cell (CellRef)
start_row: Starting row (1-based)
start_col: Starting column (1-based)
sorted_data: Sorted list of data items
column_mapping: Dict mapping Excel column indices to source field paths
value_replacement: Value replacement configuration (or None)
target_name: Target range name (for logging)
sheet_name: Sheet name (for logging)
Returns:
None (logging handles errors and success)
"""
try:
# === Prepare column mapping and group contiguous columns ===
col_order = sorted(column_mapping.keys())
# Group contiguous columns for optimal bulk update
contiguous_groups = []
if col_order:
current_group = [col_order[0]]
for i in range(1, len(col_order)):
if col_order[i] == col_order[i-1] + 1:
current_group.append(col_order[i])
else:
contiguous_groups.append(current_group)
current_group = [col_order[i]]
contiguous_groups.append(current_group)
# === Update contiguous column groups (bulk 2D transfer) ===
for col_group in contiguous_groups:
# Build 2D array for this group: rows × columns
data_2d = []
for item in sorted_data:
row_values = []
for excel_col_index in col_group:
source_field_path = column_mapping[excel_col_index]
# Get value from source item
value = get_nested_value(item, source_field_path.split("."))
# Apply value replacement
if value_replacement:
value = _apply_value_replacement(value, value_replacement)
row_values.append(value)
data_2d.append(row_values)
# Transfer entire group to Excel in ONE operation
first_col_in_group = start_col + col_group[0]
first_col_letter = get_column_letter(first_col_in_group)
target_range_start = f"{first_col_letter}{start_row}"
# Write 2D array at once (xlwings automatically maps rows × columns)
sheet_xw.range(target_range_start).value = data_2d
# Logging
num_data_rows = len(sorted_data)
logging.info(f"Filled table {target_name} with {num_data_rows} rows "
f"at {sheet_name}!{start_cell.coordinate} "
f"(bulk duplication + {len(contiguous_groups)} contiguous group(s))")
except Exception as e:
logging.error(f"Error filling table data for {target_name}: {e}")
logging.error(f"Traceback: {traceback.format_exc()}")
def _process_sheet_xlwings(workbook_xw, sheet_config, inclusions_data, organizations_data,
                           inclusions_mapping_config, organizations_mapping_config, template_vars,
                           workbook_context=""):
    """
    Process a single sheet using xlwings (hybrid approach).

    Delegates to specialized helpers to maintain clarity and testability:
    variable sources fill one named cell; Inclusions/Organizations sources are
    filtered, sorted and written into the target table or named range.

    Args:
        workbook_xw: xlwings Book object
        sheet_config: Sheet configuration dict (reads "source_type", "source",
            "target_name", "value_replacement")
        inclusions_data: List of inclusion dictionaries
        organizations_data: List of organization dictionaries
        inclusions_mapping_config: Inclusions mapping config (for column mapping)
        organizations_mapping_config: Organizations mapping config
        template_vars: Dictionary of variables for template evaluation
        workbook_context: Context string identifying the workbook (for logging)

    Returns:
        Boolean True if successful, False otherwise (the reason is logged)
    """
    source_type = sheet_config.get("source_type")
    source = sheet_config.get("source")
    target_name = sheet_config.get("target_name")
    value_replacement = sheet_config.get("value_replacement")

    # === Variable sources: single cell fill ===
    if source_type == SOURCE_TYPE_VARIABLE:
        return _fill_variable_in_sheet(workbook_xw, target_name, source, template_vars, workbook_context)

    # === Table sources: bulk data filling ===
    if source_type not in (SOURCE_TYPE_INCLUSIONS, SOURCE_TYPE_ORGANIZATIONS):
        logging.warning(f"Unsupported source type '{source_type}' for target '{target_name}'")
        return False

    # Prepare data: filter, sort, get column mapping
    sorted_data, column_mapping = _prepare_table_data(
        source_type, source, sheet_config, inclusions_data, organizations_data,
        inclusions_mapping_config, organizations_mapping_config, target_name
    )
    if sorted_data is None or column_mapping is None:
        return False

    try:
        # Get table/named range dimensions from xlwings
        table_dims = _get_table_dimensions_xlwings(workbook_xw, target_name)
        if not table_dims:
            logging.warning(f"Target '{target_name}' not found (neither table nor named range)")
            return False

        # The current data height of the target is not needed here
        sheet_name, start_cell, _, table_width, header_row_count, target_type = table_dims
        sheet_xw = workbook_xw.sheets[sheet_name]
        start_row = start_cell.row
        start_col = start_cell.column
        max_col = start_col + table_width - 1
        num_data_rows = len(sorted_data)

        if not sorted_data:
            # No data - template row stays empty
            logging.info(f"No data for target '{target_name}' ({target_type}), leaving template row empty")
            return True

        # STEP 0: Resize table/named range to match data dimensions
        _resize_table_range(workbook_xw, sheet_xw, target_name, start_cell, max_col, start_row,
                            num_data_rows, header_row_count, target_type)
        # STEP 1: Duplicate template row to all data rows
        _duplicate_template_row(sheet_xw, start_cell, max_col, start_row, num_data_rows,
                                target_name, workbook_context)
        # STEP 2-3: Fill with data (grouped contiguous columns)
        fill_status = _fill_table_with_data(sheet_xw, start_cell, start_row, start_col, sorted_data,
                                            column_mapping, value_replacement, target_name, sheet_name)
        # An explicit False from the fill helper means the data was not written;
        # a helper that returns no status (None) is treated as success.
        if fill_status is False:
            logging.warning(f"Data fill failed for target '{target_name}'")
            return False
        return True
    except Exception as e:
        logging.warning(f"Error processing target '{target_name}': {e}")
        logging.error(f"Traceback: {traceback.format_exc()}")
        return False
# ============================================================================
# COMPREHENSIVE EXCEL EXPORT ORCHESTRATION (for main script)
# ============================================================================
def prepare_excel_export(inclusions_mapping_config, organizations_mapping_config):
    """
    Validate the Excel export configuration without loading any production data.

    The caller supplies the already-loaded mapping configurations (dependency
    injection). This function only checks that they are present, loads the
    Excel export config and validates it. The inclusions/organizations JSON
    files are loaded later by the execution functions
    (run_normal_mode_export, export_excel_only).

    Args:
        inclusions_mapping_config: Loaded inclusions mapping (required, non-empty).
        organizations_mapping_config: Loaded organizations mapping (required, non-empty).

    Returns:
        Tuple of (excel_config, has_critical_errors, error_messages)
        - excel_config: Tuple of (workbooks_config, sheets_config). None only
          when a mapping configuration is missing, because the Excel config
          is not loaded in that case.
        - has_critical_errors: True when a mapping config is missing, when the
          config loader reports an error, or when validation reports critical
          errors.
        - error_messages: List of error message strings collected on the way.
    """
    error_messages = []

    def _report(message, log):
        """Record *message*, send it to *log*, and echo it on the console if one is set."""
        error_messages.append(message)
        log(message)
        if console:
            console.print(f"[bold red]✗ {message}[/bold red]")

    # === STEP 1: Validate mapping configurations are provided ===
    # Plain truthiness already covers None as well as an empty list/dict.
    if not inclusions_mapping_config:
        _report("Inclusions mapping configuration must be provided and non-empty", logging.error)
        return None, True, error_messages

    if not organizations_mapping_config:
        _report("Organizations mapping configuration must be provided and non-empty", logging.error)
        return None, True, error_messages

    # === STEP 2: Load Excel config ===
    logging.info("Loading Excel export configuration...")
    excel_workbooks_config, excel_sheets_config, has_config_error, config_error_messages = \
        load_excel_export_config(console)

    # Package config into tuple for downstream functions
    excel_config = (excel_workbooks_config, excel_sheets_config)

    if has_config_error:
        _report("Critical errors in Excel Export Config", logging.warning)
        error_messages.extend(config_error_messages)
        return excel_config, True, error_messages

    if not excel_workbooks_config or not excel_sheets_config:
        # NOTE(review): an empty config is reported but not flagged as critical,
        # exactly as before - confirm with the callers that this is intended.
        _report("Excel export configuration is empty", logging.warning)
        return excel_config, False, error_messages

    # === STEP 3: Validate Excel config ===
    logging.info("Validating Excel export configuration...")
    # Both mapping configs are known to be non-empty here (checked in STEP 1).
    has_critical_errors, validation_errors = validate_excel_config(
        excel_config,
        console,
        inclusions_mapping_config,
        organizations_mapping_config
    )

    if validation_errors:
        error_messages.extend(validation_errors)

    # The success message depends only on the validation outcome, never on
    # whether a console is available.
    if has_critical_errors:
        if console:
            console.print("[bold red]✗ Critical validation errors found[/bold red]")
    else:
        logging.info("✓ Excel export configuration validated successfully")

    return excel_config, has_critical_errors, error_messages
# ============================================================================
# HIGH-LEVEL ORCHESTRATION FUNCTIONS (for main script integration)
# ============================================================================
def export_excel_only(sys_argv,
                      inclusions_filename=None, organizations_filename=None,
                      inclusions_mapping_config=None, organizations_mapping_config=None):
    """
    Run the complete EXCEL_ONLY workflow from config validation to export.

    Sequence:
        1. Validate the Excel configuration via prepare_excel_export().
        2. On critical errors, list them and ask the user whether to continue.
        3. Load both JSON data files; stop if either cannot be loaded.
        4. Call export_to_excel() and log the outcome.

    The mapping configurations are passed straight through to
    prepare_excel_export() and export_to_excel(); this function does not load
    them itself.

    Args:
        sys_argv: sys.argv from the main script (not read by this function).
        inclusions_filename: Inclusions JSON file; falls back to INCLUSIONS_FILE_NAME when falsy.
        organizations_filename: Organizations JSON file; falls back to ORGANIZATIONS_FILE_NAME when falsy.
        inclusions_mapping_config: Loaded inclusions mapping configuration.
        organizations_mapping_config: Loaded organizations mapping configuration.

    Returns:
        None on every path; results are reported through the console and the log.
    """
    global console

    inclusions_filename = inclusions_filename or INCLUSIONS_FILE_NAME
    organizations_filename = organizations_filename or ORGANIZATIONS_FILE_NAME

    print()
    console.print("[bold cyan]═══ EXCEL ONLY MODE ═══[/bold cyan]\n")

    # --- Configuration validation (no data is loaded yet) ---
    logging.info("EXCEL ONLY MODE: Validating Excel configuration")
    validation_outcome = prepare_excel_export(inclusions_mapping_config, organizations_mapping_config)
    excel_config, has_config_critical, error_messages = validation_outcome

    # --- Critical configuration errors: show them and ask for confirmation ---
    if has_config_critical:
        rule = "[bold red]────────────────────────────────────[/bold red]"
        print()
        console.print("[bold red]⚠ CRITICAL CONFIGURATION ERROR(S) DETECTED[/bold red]")
        console.print(rule)
        for position, message in enumerate(error_messages, start=1):
            console.print(f"[bold red]Error {position}: {message}[/bold red]")
        console.print(rule)
        print()

        try:
            # Imported lazily: questionary is only needed on this error path.
            import questionary
            prompt = questionary.confirm("⚠ Continue anyway?", default=False)
            proceed = prompt.ask()
            if not proceed:
                console.print("[bold red]Aborted by user[/bold red]")
                logging.warning("EXCEL ONLY MODE: Aborted by user due to critical errors")
                return
        except ImportError:
            # Without questionary there is no prompt; the export goes ahead.
            console.print("[bold yellow]⚠ questionary not available for confirmation[/bold yellow]")
            console.print("[bold yellow]Proceeding with export despite critical errors[/bold yellow]")

    # --- Data files (both loads are attempted before checking the results) ---
    logging.info("EXCEL ONLY MODE: Loading data files")
    inclusions_data = _load_json_file_internal(inclusions_filename)
    organizations_data = _load_json_file_internal(organizations_filename)

    if any(loaded is None for loaded in (inclusions_data, organizations_data)):
        console.print("[bold red]✗ Error: Could not load data files for Excel export[/bold red]")
        logging.error("EXCEL ONLY MODE: Data file loading failed")
        return

    # --- Export ---
    print()
    console.print("[bold cyan]═══ Excel Export ═══[/bold cyan]\n")
    logging.info("EXCEL ONLY MODE: Executing export")

    if not excel_config:
        console.print("[bold red]✗ Could not load Excel configuration[/bold red]\n")
        logging.error("EXCEL ONLY MODE: Excel config missing")
        return

    try:
        logging.info(f"Starting Excel export: {len(inclusions_data)} inclusions, {len(organizations_data)} organizations")
        export_ok, failure_count = export_to_excel(
            inclusions_data,
            organizations_data,
            excel_config,
            inclusions_mapping_config=inclusions_mapping_config,
            organizations_mapping_config=organizations_mapping_config
        )
        if export_ok:
            logging.info("EXCEL ONLY MODE: Export completed successfully")
        else:
            logging.warning(f"EXCEL ONLY MODE: Export completed with {failure_count} error(s)")
    except Exception as exc:
        # Top-level boundary of this mode: report the failure instead of crashing.
        failure_text = f"Excel export failed: {exc!s}"
        logging.error(f"EXCEL ONLY MODE: {failure_text}\n{traceback.format_exc()}")
        console.print(f"[bold red]✗ {failure_text}[/bold red]\n")
def run_normal_mode_export(excel_enabled, excel_config,
                           inclusions_mapping_config=None, organizations_mapping_config=None):
    """
    Run the Excel export step of the normal workflow.

    The inclusions and organizations data are re-read from their JSON files
    (INCLUSIONS_FILE_NAME / ORGANIZATIONS_FILE_NAME) rather than passed in,
    then handed to export_to_excel() together with the given configuration.

    Args:
        excel_enabled: Truthy when the Excel export should run.
        excel_config: Tuple of (workbooks_config, sheets_config) or None.
        inclusions_mapping_config: Loaded inclusions mapping configuration (optional).
        organizations_mapping_config: Loaded organizations mapping configuration (optional).

    Returns:
        Tuple of (export_succeeded, error_message)
        - export_succeeded: True when the export succeeded or was skipped,
          False when data files could not be loaded, the export reported
          errors, or an exception was raised.
        - error_message: Empty string on success/skip, otherwise the failure text.
    """
    global console

    # A disabled or unconfigured export is a deliberate skip, not a failure.
    if not (excel_enabled and excel_config):
        logging.info("Excel export not enabled or config missing, skipping")
        return True, ""

    print()
    console.print("[bold cyan]═══ Excel Export ═══[/bold cyan]\n")
    logging.info("NORMAL MODE: Starting Excel export")

    try:
        # Read back what is on disk; both loads run before the results are checked.
        inclusions_payload = _load_json_file_internal(INCLUSIONS_FILE_NAME)
        organizations_payload = _load_json_file_internal(ORGANIZATIONS_FILE_NAME)

        if any(loaded is None for loaded in (inclusions_payload, organizations_payload)):
            failure_text = "Could not load data files for Excel export"
            logging.error(f"NORMAL MODE: {failure_text}")
            console.print(f"[bold red]✗ {failure_text}[/bold red]\n")
            return False, failure_text

        logging.info(f"Starting Excel export: {len(inclusions_payload)} inclusions, {len(organizations_payload)} organizations")
        export_ok, failure_count = export_to_excel(
            inclusions_payload,
            organizations_payload,
            excel_config,
            inclusions_mapping_config=inclusions_mapping_config,
            organizations_mapping_config=organizations_mapping_config
        )

        if not export_ok:
            failure_text = f"Excel export completed with {failure_count} error(s)"
            logging.warning(f"NORMAL MODE: {failure_text}")
            return False, failure_text

        logging.info("NORMAL MODE: Excel export completed successfully")
        return True, ""

    except Exception as exc:
        # Boundary of the export step: turn any exception into a failed result.
        failure_text = f"Unexpected error during Excel export: {exc!s}"
        logging.error(f"NORMAL MODE: {failure_text}\n{traceback.format_exc()}")
        console.print(f"[bold red]✗ {failure_text}[/bold red]\n")
        return False, failure_text
def _load_json_file_internal(filename):
    """
    Read and parse a UTF-8 JSON file, reporting problems through logging.

    Args:
        filename: Path to the JSON file.

    Returns:
        The parsed JSON value, or None when the path does not exist or any
        error occurs while opening or parsing the file.
    """
    try:
        if os.path.exists(filename):
            with open(filename, 'r', encoding='utf-8') as handle:
                payload = json.load(handle)
            # Only lists get an item count in the log line; anything else is just "data".
            size_label = len(payload) if isinstance(payload, list) else 'data'
            logging.info(f"Loaded {filename}: {size_label}")
            return payload

        logging.warning(f"JSON file not found: {filename}")
        return None
    except Exception as exc:
        # Best-effort loader: every failure is logged and reported to callers as None.
        logging.error(f"Error loading {filename}: {exc!s}")
        return None