""" Endobest Dashboard - Excel Export Module This module handles generation of Excel workbooks from Inclusions and Organizations data. Fully configurable via external Excel configuration file (Endobest_Dashboard_Config.xlsx). Features: - Config-driven workbook generation (no code changes needed) - Support for Variable templates and Table data fills - Configurable filtering, sorting, and value replacement - xlwings-based data processing with automatic formula recalculation - Robust error handling and logging """ import functools import json import logging import os import re import shutil import tempfile import traceback import zipfile from datetime import datetime, timedelta, timezone from time import perf_counter from zoneinfo import ZoneInfo import openpyxl from openpyxl.utils import get_column_letter from rich.console import Console try: import xlwings as xw except ImportError: xw = None from eb_dashboard_utils import get_nested_value, get_config_path from eb_dashboard_constants import ( INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME, DASHBOARD_CONFIG_FILE_NAME, EXCEL_WORKBOOKS_TABLE_NAME, EXCEL_SHEETS_TABLE_NAME, OUTPUT_ACTION_OVERWRITE, OUTPUT_ACTION_INCREMENT, OUTPUT_ACTION_BACKUP, OUTPUT_ACTIONS, SOURCE_TYPE_INCLUSIONS, SOURCE_TYPE_ORGANIZATIONS, SOURCE_TYPE_VARIABLE, SOURCE_TYPES, TARGET_TYPE_TABLE, TARGET_TYPE_NAMED_RANGE, EXCEL_COM_MAX_RETRIES, EXCEL_COM_RETRY_DELAY ) # ============================================================================ # CONSTANTS # ============================================================================ EXCEL_OUTPUT_FOLDER = os.getcwd() # Current working directory # ============================================================================ # MODULE DEPENDENCIES (injected from main module) # ============================================================================ console = None # NOTE: Constants imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH): # Configuration Files: # - INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME, DASHBOARD_CONFIG_FILE_NAME # - EXCEL_WORKBOOKS_TABLE_NAME, EXCEL_SHEETS_TABLE_NAME # Output Handling: # - OUTPUT_ACTION_OVERWRITE, OUTPUT_ACTION_INCREMENT, OUTPUT_ACTION_BACKUP, OUTPUT_ACTIONS # Data Sources: # - SOURCE_TYPE_INCLUSIONS, SOURCE_TYPE_ORGANIZATIONS, SOURCE_TYPE_VARIABLE, SOURCE_TYPES # # NOTE: Mapping table names (INCLUSIONS_MAPPING_TABLE_NAME, ORGANIZATIONS_MAPPING_TABLE_NAME) # are defined in constants but loaded/used in main script (eb_dashboard.py) def set_dependencies(console_instance): """ Inject console instance from main module. Args: console_instance: Rich Console instance for formatted output Note: File and table names are imported directly from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH) """ global console console = console_instance # ============================================================================ # PUBLIC FUNCTIONS # ============================================================================ def load_excel_export_config(console_instance=None): """ Load and validate Excel export configuration from config file. 
Args: console_instance: Optional Rich Console instance Returns: Tuple of (excel_workbooks_config, excel_sheets_config, has_error, error_messages) - excel_workbooks_config: List of workbook definitions - excel_sheets_config: List of sheet fill definitions - has_error: Boolean flag if critical errors found - error_messages: List of error message strings """ global console if console_instance: console = console_instance config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) error_messages = [] try: workbook = openpyxl.load_workbook(config_path) except FileNotFoundError: error_msg = f"Error: Configuration file not found at: {config_path}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") return None, None, True, [error_msg] # Load Excel_Workbooks sheet if EXCEL_WORKBOOKS_TABLE_NAME not in workbook.sheetnames: error_msg = f"Error: Sheet '{EXCEL_WORKBOOKS_TABLE_NAME}' not found in configuration file." error_messages.append(error_msg) return None, None, True, error_messages excel_workbooks_sheet = workbook[EXCEL_WORKBOOKS_TABLE_NAME] excel_workbooks_config = [] try: headers = [cell.value for cell in excel_workbooks_sheet[1]] for row_index, row in enumerate(excel_workbooks_sheet.iter_rows(min_row=2, values_only=True), start=2): if all(cell is None for cell in row): continue # Skip empty rows workbook_config = dict(zip(headers, row)) # Validate required fields if not workbook_config.get("workbook_id"): error_msg = f"Row {row_index}: 'workbook_id' is mandatory" error_messages.append(error_msg) continue if not workbook_config.get("workbook_template_name"): error_msg = f"Row {row_index}: 'workbook_template_name' is mandatory" error_messages.append(error_msg) continue if not workbook_config.get("output_file_name_template"): error_msg = f"Row {row_index}: 'output_file_name_template' is mandatory" error_messages.append(error_msg) continue if_output_exists = workbook_config.get("if_output_exists", OUTPUT_ACTION_OVERWRITE) if if_output_exists not in OUTPUT_ACTIONS: error_msg = f"Row {row_index}: 'if_output_exists' must be one of {OUTPUT_ACTIONS}" error_messages.append(error_msg) continue excel_workbooks_config.append(workbook_config) except Exception as e: error_msg = f"Error loading Excel_Workbooks sheet: {e}" error_messages.append(error_msg) return None, None, True, error_messages # Load Excel_Sheets sheet if EXCEL_SHEETS_TABLE_NAME not in workbook.sheetnames: error_msg = f"Error: Sheet '{EXCEL_SHEETS_TABLE_NAME}' not found in configuration file." 
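    # The three optional JSON columns decoded in the Excel_Sheets loop below take shapes
    # like these once parsed (illustrative values; field paths are dotted keys into the
    # source items, and the semantics are implemented by _apply_filter, _apply_sort and
    # _apply_value_replacement further down):
    #   filter_condition:  [["Inclusion.Status", "==", "Included"]]
    #   sort_keys:         [["Patient_Identification.Patient_Id", "asc", "*natsort"]]
    #                      (the optional third element may instead be a datetime format
    #                       string such as "%Y-%m-%d")
    #   value_replacement: [["undefined", ""]]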
error_messages.append(error_msg) return excel_workbooks_config, None, True, error_messages excel_sheets_sheet = workbook[EXCEL_SHEETS_TABLE_NAME] excel_sheets_config = [] try: headers = [cell.value for cell in excel_sheets_sheet[1]] for row_index, row in enumerate(excel_sheets_sheet.iter_rows(min_row=2, values_only=True), start=2): if all(cell is None for cell in row): continue sheet_config = dict(zip(headers, row)) # Validate required fields if not sheet_config.get("workbook_id"): continue # Skip rows without workbook_id if not sheet_config.get("source_type"): error_msg = f"Row {row_index}: 'source_type' is mandatory" error_messages.append(error_msg) continue source_type = sheet_config["source_type"] if source_type not in SOURCE_TYPES: error_msg = f"Row {row_index}: 'source_type' must be one of {SOURCE_TYPES}" error_messages.append(error_msg) continue if not sheet_config.get("source"): error_msg = f"Row {row_index}: 'source' is mandatory" error_messages.append(error_msg) continue if not sheet_config.get("target_name"): error_msg = f"Row {row_index}: 'target_name' is mandatory" error_messages.append(error_msg) continue # Parse JSON fields has_json_error = False for json_field in ["filter_condition", "sort_keys", "value_replacement"]: value = sheet_config.get(json_field) if value: if isinstance(value, str): try: sheet_config[json_field] = json.loads(value) except json.JSONDecodeError: error_msg = f"Row {row_index}, field '{json_field}': Invalid JSON format" error_messages.append(error_msg) has_json_error = True break # ← Skip this row entirely # else: value is already parsed (dict/list), keep as-is else: # Empty/None value - leave as None or empty sheet_config[json_field] = None if not has_json_error: excel_sheets_config.append(sheet_config) except Exception as e: error_msg = f"Error loading Excel_Sheets sheet: {e}" error_messages.append(error_msg) return excel_workbooks_config, excel_sheets_config, True, error_messages workbook.close() has_error = len(error_messages) > 0 return excel_workbooks_config, excel_sheets_config, has_error, error_messages def validate_excel_config(excel_config, console_instance, inclusions_mapping_config=None, organizations_mapping_config=None): """ Validate Excel export configuration against templates. 
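    Dimension rules checked below, by example (illustrative numbers): a Table-source
    target whose column mapping uses 1-based columns 1..12 (maximum 0-based index 11)
    must resolve to a named range exactly 1 row high and at least 12 columns wide,
    while a Variable-source target must resolve to a single 1x1 cell.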
Args: excel_config: Tuple of (workbooks_config, sheets_config) from load_excel_export_config() console_instance: Rich Console instance inclusions_mapping_config: Loaded inclusions mapping config (optional, for future use) organizations_mapping_config: Loaded organizations mapping config (optional, for future use) Returns: Tuple of (has_critical_error, error_messages) """ global console if console_instance: console = console_instance if not excel_config or not excel_config[0] or not excel_config[1]: return False, [] # No config to validate excel_workbooks_config, excel_sheets_config = excel_config[0], excel_config[1] error_messages = [] # Validate each workbook for workbook_config in excel_workbooks_config: workbook_id = workbook_config.get("workbook_id") template_name = workbook_config.get("workbook_template_name") # Check template exists template_path = os.path.join(get_config_path(), template_name) if not os.path.exists(template_path): error_msg = f"Template '{template_name}' (workbook_id: {workbook_id}) not found in config/" error_messages.append(error_msg) continue # Check template is valid Excel try: template_wb = openpyxl.load_workbook(template_path) except Exception as e: error_msg = f"Template '{template_name}' (workbook_id: {workbook_id}) is not a valid Excel file: {e}" error_messages.append(error_msg) continue # Validate sheets for this workbook workbook_sheets = [s for s in excel_sheets_config if s.get("workbook_id") == workbook_id] for sheet_config in workbook_sheets: target_name = sheet_config.get("target_name") source_type = sheet_config.get("source_type") # Find the target in the template (check both named ranges AND tables) target_found = False if target_name in template_wb.defined_names: target_found = True else: # Check if it's a table in any sheet for sheet in template_wb.sheetnames: sheet_obj = template_wb[sheet] if hasattr(sheet_obj, 'tables') and target_name in sheet_obj.tables: target_found = True break # If target was found, validate based on source type if target_found: # For Variable sources, ensure it's a single cell if source_type == SOURCE_TYPE_VARIABLE: # Check if the defined name references a single cell # NOTE: We still use openpyxl here because template_wb is already open from config loading table_dims = _get_named_range_dimensions(template_wb, target_name) if table_dims: _, _, height, width = table_dims if height != 1 or width != 1: error_msg = f"Target '{target_name}' (template: {template_name}) for Variable source must reference a single cell (found {height}x{width})" error_messages.append(error_msg) # For Table sources (Inclusions/Organizations), validate dimensions elif source_type in [SOURCE_TYPE_INCLUSIONS, SOURCE_TYPE_ORGANIZATIONS]: # Get the dimensions of the named range # NOTE: We still use openpyxl here because template_wb is already open from config loading table_dims = _get_named_range_dimensions(template_wb, target_name) if table_dims: _, _, height, width = table_dims # CRITICAL: Table height MUST be exactly 1 (template row only) if height != 1: error_msg = f"Target '{target_name}' (template: {template_name}, source_type: {source_type}) must have height=1 (found height={height}). " \ f"Template row must be a single row." 
error_messages.append(error_msg) # CRITICAL: Table width must be >= max(mapping_indices) # Get the mapping column to validate indices source = sheet_config.get("source") if source: mapping_config = inclusions_mapping_config if source_type == SOURCE_TYPE_INCLUSIONS else organizations_mapping_config if mapping_config: column_mapping = _get_column_mapping(mapping_config, source, source_type) if column_mapping: max_col_index = max(column_mapping.keys()) # 0-based index if max_col_index >= width: error_msg = f"Target '{target_name}' (template: {template_name}) width={width} is insufficient. " \ f"Maximum column index from mapping is {max_col_index} (0-based). " \ f"Width must be > {max_col_index}." error_messages.append(error_msg) else: error_msg = f"Named range '{target_name}' (template: {template_name}, workbook_id: {workbook_id}) not found in template" error_messages.append(error_msg) template_wb.close() return len(error_messages) > 0, error_messages def export_to_excel(inclusions_data, organizations_data, excel_config, inclusions_mapping_config=None, organizations_mapping_config=None): """ Main export function - orchestrates Excel workbook generation. Args: inclusions_data: List of inclusion dictionaries organizations_data: List of organization dictionaries excel_config: Tuple of (workbooks_config, sheets_config) inclusions_mapping_config: Inclusions field mapping configuration organizations_mapping_config: Organizations field mapping configuration Returns: Tuple of (success, error_count) Note: Uses global console instance (injected from main script) """ if not excel_config or not excel_config[0] or not excel_config[1]: console.print("[yellow]⚠ No Excel export configuration found, skipping[/yellow]") return True, 0 excel_workbooks_config, excel_sheets_config = excel_config[0], excel_config[1] # Prepare template variables template_vars = _prepare_template_variables() error_count = 0 success_count = 0 # Track overall export duration export_start_time = perf_counter() # Process each workbook for workbook_config in excel_workbooks_config: try: workbook_id = workbook_config.get("workbook_id") template_name = workbook_config.get("workbook_template_name") output_template = workbook_config.get("output_file_name_template") if_output_exists = workbook_config.get("if_output_exists", OUTPUT_ACTION_OVERWRITE) # Resolve output filename try: output_filename = output_template.format(**template_vars) except KeyError as e: console.print(f"[bold red]✗ Unknown variable in template: {e}[/bold red]") error_count += 1 continue output_path = os.path.join(EXCEL_OUTPUT_FOLDER, output_filename) # Log workbook processing start logging.info(f"Processing workbook: {workbook_id} (template: {template_name}, output: {output_filename})") # PHASE PRÉPARATION: Handle existing file according to action output_path = _handle_output_exists(output_path, if_output_exists) # XLWINGS PHASE: Open template, fill, save as output template_path = os.path.join(get_config_path(), template_name) # Track workbook processing duration with spinning status workbook_start_time = perf_counter() try: if xw is None: raise ImportError("xlwings is not installed. 
Install with: pip install xlwings") # Use status with spinner while processing the workbook with console.status(f"[bold cyan]Exporting {output_filename}...", spinner="dots"): # PERFORMANCE: Make Excel invisible BEFORE opening the workbook app_xw = None screen_updating_original = None visible_original = None try: # Get or create Excel app in invisible mode if xw.apps: app_xw = xw.apps.active visible_original = app_xw.visible screen_updating_original = app_xw.screen_updating else: # Create new app in invisible mode app_xw = xw.App(visible=False) visible_original = False screen_updating_original = True app_xw.visible = False # Make Excel invisible app_xw.screen_updating = False # Disable screen updates except Exception as e: logging.warning(f"Failed to manage Excel visibility: {e}") app_xw = None # Open TEMPLATE directly (not a copy) wb_xw = xw.Book(template_path, update_links=False) try: # CAPTURE TEMPLATE STATE: Save initial state for restoration before save template_state = _capture_workbook_state(wb_xw, workbook_context=f"{workbook_id} ({output_filename})") logging.info(f"Captured template state: active_sheet='{template_state['active_sheet']}', {len(template_state['sheets'])} sheet(s)") # Get sheets for this workbook workbook_sheets = [s for s in excel_sheets_config if s.get("workbook_id") == workbook_id] # Process each sheet with xlwings for sheet_config in workbook_sheets: _process_sheet_xlwings( wb_xw, sheet_config, inclusions_data, organizations_data, inclusions_mapping_config=inclusions_mapping_config, organizations_mapping_config=organizations_mapping_config, template_vars=template_vars, workbook_context=f"{workbook_id} ({output_filename})" ) # RESTORE TEMPLATE STATE: Restore initial state before saving _restore_workbook_state(wb_xw, template_state, workbook_context=f"{workbook_id} ({output_filename})") logging.info(f"Restored template state before save") # Save as output file with forced overwrite (with retry mechanism) # This preserves filesystem versioning for cloud storage # Disable alerts to force silent overwrite abs_output_path = os.path.abspath(output_path) if app_xw: display_alerts_original = app_xw.api.DisplayAlerts app_xw.api.DisplayAlerts = False try: _save_workbook_with_retry(wb_xw, abs_output_path) logging.info(f"Saved workbook to: {abs_output_path}") finally: if app_xw: app_xw.api.DisplayAlerts = display_alerts_original # Excel automatically recalculates formulas on save # No need for separate recalculation step finally: # Always close the workbook and restore visibility/screen updates wb_xw.close() if app_xw is not None: try: if screen_updating_original is not None: app_xw.screen_updating = screen_updating_original if visible_original is not None: app_xw.visible = visible_original except: pass # Calculate duration and display success message workbook_duration = perf_counter() - workbook_start_time console.print(f"[green]✓ Created: {output_filename} ({workbook_duration:.2f}s)[/green]") success_count += 1 except Exception as e: console.print(f"[bold red]✗ Error processing {output_filename}: {e}[/bold red]") logging.error(f"Excel export error for {output_filename}: {e}", exc_info=True) error_count += 1 continue except Exception as e: console.print(f"[bold red]✗ Error processing workbook {workbook_id}: {e}[/bold red]") logging.error(f"Excel workbook processing error: {e}", exc_info=True) error_count += 1 # Summary with total duration total_workbooks = success_count + error_count export_duration = perf_counter() - export_start_time if error_count == 0: # Success: all 
workbooks processed console.print(f"\n[green]✓ Excel export completed successfully: {success_count}/{total_workbooks} workbooks generated ({export_duration:.2f}s)[/green]") else: # Failure: some or all workbooks failed if success_count > 0: # Partial success console.print(f"\n[yellow]⚠ Excel export completed with errors ({export_duration:.2f}s)[/yellow]") console.print(f"[green] {success_count} workbook(s) generated successfully[/green]") console.print(f"[bold red] {error_count} workbook(s) failed[/bold red]") else: # Complete failure console.print(f"\n[bold red]✗ Excel export failed: all {error_count} workbook(s) failed ({export_duration:.2f}s)[/bold red]") return error_count == 0, error_count # ============================================================================ # INTERNAL FUNCTIONS # ============================================================================ def _prepare_template_variables(): """ Prepare variables available for Template String evaluation. Returns: Dictionary of variables available to .format(**locals()) """ # Get UTC timestamp from inclusions file # Use constant from eb_dashboard_constants (SINGLE SOURCE OF TRUTH) inclusions_file = INCLUSIONS_FILE_NAME if os.path.exists(inclusions_file): file_mtime = os.path.getmtime(inclusions_file) extract_date_time_utc = datetime.fromtimestamp(file_mtime, tz=timezone.utc) else: extract_date_time_utc = datetime.now(tz=timezone.utc) # Convert to Paris timezone extract_date_time_french = extract_date_time_utc.astimezone( ZoneInfo('Europe/Paris') ) return { 'extract_date_time_utc': extract_date_time_utc, 'extract_date_time_french': extract_date_time_french, } def _apply_filter(item, filter_condition): """ Apply filter condition to item (AND logic for all conditions). Args: item: Dictionary to filter filter_condition: List of [field_name, operator, value] conditions Returns: Boolean True if item passes all filters """ if not filter_condition: return True # Empty filter = accept all for field_path, operator, expected_value in filter_condition: actual_value = get_nested_value(item, field_path.split(".")) if actual_value is None: return False # Missing field = filter out # Apply operator if operator == "==": if actual_value != expected_value: return False elif operator == "<>": if actual_value == expected_value: return False elif operator == ">": if not (actual_value > expected_value): return False elif operator == ">=": if not (actual_value >= expected_value): return False elif operator == "<": if not (actual_value < expected_value): return False elif operator == "<=": if not (actual_value <= expected_value): return False return True # All conditions passed def _apply_sort(items, sort_keys): """ Apply multi-key sort to items with support for mixed asc/desc ordering. Args: items: List of dictionaries to sort sort_keys: List of [field_name, order] or [field_name, order, option] where: - order is "asc" or "desc" - option (optional) can be: * datetime format string (e.g., "%Y-%m-%d") for date parsing * "*natsort" for natural alphanumeric sorting Supports MIXED asc/desc on different columns! Returns: Sorted list """ if not sort_keys: return items def natural_sort_key(text): """ Helper for natural alphanumeric sorting. Converts "ENDOBEST-003-920-BA" to ["endobest", "-", 3, "-", 920, "-", "ba"] Python's native list comparison handles the rest element by element. 
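        For instance, natural_sort_key("item2") < natural_sort_key("item10") because
        ["item", 2] < ["item", 10] under element-wise list comparison, so "item2"
        orders before "item10" as intended.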
""" def convert(segment): return int(segment) if segment.isdigit() else segment.lower() return [convert(s) for s in re.split(r'(\d+)', str(text)) if s] def compare_items(item1, item2): """ Comparator function for multi-key sorting with mixed asc/desc support. Returns: -1 if item1 < item2, 0 if equal, 1 if item1 > item2 """ for sort_spec in sort_keys: field_name = sort_spec[0] order = sort_spec[1] if len(sort_spec) > 1 else "asc" sort_option = sort_spec[2] if len(sort_spec) > 2 else None # Get values from both items val1 = get_nested_value(item1, field_name.split(".")) val2 = get_nested_value(item2, field_name.split(".")) # Handle undefined/None - place at end is_undef1 = val1 in [None, "", "undefined"] is_undef2 = val2 in [None, "", "undefined"] # Both undefined: equal if is_undef1 and is_undef2: continue # Only one undefined: undefined goes last if is_undef1: return 1 # item1 > item2 (undefined last) if is_undef2: return -1 # item1 < item2 (item2 is undefined) # Check if natural sort requested is_natural_sort = (sort_option == "*natsort") # Parse datetime if option is a datetime format (not *natsort) if sort_option and not is_natural_sort: datetime_format = sort_option try: val1 = datetime.strptime(str(val1), datetime_format).timestamp() except (ValueError, TypeError): val1 = None return 1 # Invalid datetime goes last try: val2 = datetime.strptime(str(val2), datetime_format).timestamp() except (ValueError, TypeError): val2 = None return -1 # Invalid datetime goes last # Apply natural sort transformation if requested if is_natural_sort: val1 = natural_sort_key(val1) val2 = natural_sort_key(val2) # Compare values # For strings (non-natsort), use case-insensitive comparison for natural alphabetical ordering if isinstance(val1, str) and isinstance(val2, str): val1_lower = val1.lower() val2_lower = val2.lower() if val1_lower < val2_lower: cmp_result = -1 elif val1_lower > val2_lower: cmp_result = 1 else: # Case-insensitive equal, use case-sensitive as tiebreaker if val1 < val2: cmp_result = -1 elif val1 > val2: cmp_result = 1 else: cmp_result = 0 else: # Non-string comparison (numbers, dates, natsort lists, etc.) if val1 < val2: cmp_result = -1 elif val1 > val2: cmp_result = 1 else: cmp_result = 0 # Equal, continue to next sort key # Apply asc/desc ordering if cmp_result != 0: is_desc = isinstance(order, str) and order.lower() == "desc" return cmp_result if not is_desc else -cmp_result # All keys are equal return 0 # Use functools.cmp_to_key to convert comparator to key function return sorted(items, key=functools.cmp_to_key(compare_items)) def _apply_value_replacement(value, replacements): """ Apply value replacement rules (first-match-wins, strict type matching). Args: value: Value to potentially replace replacements: List of [value_before, value_after] pairs Returns: Replaced value or original Note: This function is currently prepared for future use in table data filling. """ if not replacements: return value for value_before, value_after in replacements: if value == value_before: # Strict equality return value_after return value # No match, return original # OBSOLETE: _preserve_media_in_workbook() removed - xlwings handles media preservation automatically # When using xlwings, Excel natively preserves all media, images, and relationships def _save_workbook_with_retry(wb_xw, output_path): """ Save workbook with retry mechanism for transient xlwings/Excel failures. Excel's SaveAs can fail randomly on some environments (especially Excel 2013). 
This function retries the save operation with configurable retry count and delay. Args: wb_xw: xlwings Book object output_path: Absolute path where workbook should be saved Raises: Exception: If SaveAs fails after all retry attempts """ from time import sleep for attempt in range(1, EXCEL_COM_MAX_RETRIES + 1): try: logging.info(f"SaveAs attempt {attempt}/{EXCEL_COM_MAX_RETRIES}: {output_path}") wb_xw.api.SaveAs(output_path) logging.info(f"SaveAs succeeded on attempt {attempt}") return # Success except Exception as e: error_msg = f"SaveAs failed on attempt {attempt}: {type(e).__name__}: {str(e)}" if attempt < EXCEL_COM_MAX_RETRIES: # Intermediate retry - log as warning and sleep before retry logging.warning(f"{error_msg} - Retrying in {EXCEL_COM_RETRY_DELAY}s...") sleep(EXCEL_COM_RETRY_DELAY) else: # Final attempt failed - log as critical error and raise logging.error(f"{error_msg} - All {EXCEL_COM_MAX_RETRIES} retry attempts exhausted") raise def _capture_workbook_state(wb_xw, workbook_context=""): """ Capture the visual state of the workbook (active sheet, selections, scroll positions). This allows restoration of the template's visual state after data processing, ensuring recipients see the workbook exactly as designed in the template. Args: wb_xw: xlwings Book object workbook_context: String identifier for logging (workbook_id and filename) Returns: dict: State dictionary with 'active_sheet' and 'sheets' state per sheet """ ctx = f"[{workbook_context}]" if workbook_context else "" logging.info(f"{ctx} [CAPTURE_STATE] Starting workbook state capture") logging.info(f"{ctx} [CAPTURE_STATE] Total sheets: {len(wb_xw.sheets)}") state = { 'active_sheet': None, 'sheets': {} } try: # Capture active sheet name state['active_sheet'] = wb_xw.api.ActiveSheet.Name logging.info(f"{ctx} [CAPTURE_STATE] Active sheet captured: '{state['active_sheet']}'") except Exception as e: logging.warning(f"{ctx} [CAPTURE_STATE] Could not capture active sheet: {type(e).__name__}: {str(e)}") # Capture state for each sheet for idx, sheet in enumerate(wb_xw.sheets, 1): logging.info(f"{ctx} [CAPTURE_STATE] Processing sheet {idx}/{len(wb_xw.sheets)}: '{sheet.name}'") try: # Activate sheet to get its state sheet.activate() logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' activated successfully") sheet_api = sheet.api sheet_state = { 'selection': None, 'scroll_row': 1, 'scroll_col': 1 } # Capture selection address try: selection_address = sheet_api.Application.Selection.Address sheet_state['selection'] = selection_address logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' selection captured: {selection_address}") except Exception as e: sheet_state['selection'] = "A1" # Default logging.warning(f"{ctx} [CAPTURE_STATE] Could not capture selection for sheet '{sheet.name}': {type(e).__name__}, defaulting to A1") # Capture scroll position try: scroll_row = sheet_api.Application.ActiveWindow.ScrollRow scroll_col = sheet_api.Application.ActiveWindow.ScrollColumn sheet_state['scroll_row'] = scroll_row sheet_state['scroll_col'] = scroll_col logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' scroll position captured: Row={scroll_row}, Col={scroll_col}") except Exception as e: logging.warning(f"{ctx} [CAPTURE_STATE] Could not capture scroll position for sheet '{sheet.name}': {type(e).__name__}, keeping defaults") state['sheets'][sheet.name] = sheet_state logging.info(f"{ctx} [CAPTURE_STATE] Sheet '{sheet.name}' state complete: {sheet_state}") except Exception as e: logging.error(f"{ctx} [CAPTURE_STATE] ERROR 
capturing state for sheet '{sheet.name}': {type(e).__name__}: {str(e)}") logging.info(f"{ctx} [CAPTURE_STATE] Workbook state capture complete. Captured {len(state['sheets'])} sheet(s)") return state def _restore_workbook_state(wb_xw, state, workbook_context=""): """ Restore the visual state of the workbook (active sheet, selections, scroll positions). Args: wb_xw: xlwings Book object state: State dictionary from _capture_workbook_state() workbook_context: String identifier for logging (workbook_id and filename) """ if not state: logging.warning("[RESTORE_STATE] Empty state provided, skipping restoration") return from time import sleep ctx = f"[{workbook_context}]" if workbook_context else "" logging.info(f"{ctx} [RESTORE_STATE] Starting workbook state restoration") logging.info(f"{ctx} [RESTORE_STATE] Restoring {len(state.get('sheets', {}))} sheet(s)") # NOTE: Screen updating is already disabled at the global level (in export_to_excel) # for the entire workbook processing cycle (from open to save). # We do NOT re-disable it here to avoid state conflicts. # The global setting ensures all operations (capture, process, restore, save) run efficiently. # CRITICAL: Excel 2013 COM layer lock recovery # After bulk paste operations, Excel's COM layer can enter a "locked" state where Range.Select() # fails persistently. This appears to be a fundamental limitation/bug in Excel 2013. # To work around this, we need to: # 1. Give Excel time to recover with a large delay # 2. Then make a "dummy" Range.Select() to wake up the COM layer # 3. Then proceed with real restorations logging.info(f"{ctx} [RESTORE_STATE] Waiting for Excel COM layer to stabilize after bulk operations (2 seconds)...") sleep(2.0) # Large delay to allow COM layer to recover # Track original visibility state (used for temporary visibility during retries) original_app_visible = None try: if wb_xw.app: original_app_visible = wb_xw.app.visible if not original_app_visible: # Make Excel visible during restoration so user sees what's happening # (important for selection restore retries which may take 2+ seconds) wb_xw.app.visible = True logging.info(f"{ctx} [RESTORE_STATE] Excel app temporarily made visible for restoration operations") except Exception as e: logging.warning(f"{ctx} [RESTORE_STATE] Could not manage Excel visibility during restoration: {type(e).__name__}: {str(e)}") # Wake up the COM layer with a dummy selection attempt on the first sheet # This "primes" the COM layer so subsequent Range.Select() calls work reliably try: if len(wb_xw.sheets) > 0: first_sheet = wb_xw.sheets[0] first_sheet.activate() logging.info(f"{ctx} [RESTORE_STATE] Priming COM layer by activating first sheet...") first_sheet.api.Range("$A$1").Select() logging.info(f"{ctx} [RESTORE_STATE] COM layer priming successful") except Exception as e: # This is not critical - if it fails, retries will handle it logging.info(f"{ctx} [RESTORE_STATE] COM layer priming attempt completed (may have failed, retries will handle it)") # Restore state for each sheet for idx, (sheet_name, sheet_state) in enumerate(state.get('sheets', {}).items(), 1): logging.info(f"{ctx} [RESTORE_STATE] Processing sheet {idx}: '{sheet_name}'") try: sheet = wb_xw.sheets[sheet_name] sheet.activate() logging.info(f"{ctx} [RESTORE_STATE] Sheet '{sheet_name}' activated successfully") # Small delay after activation to ensure Excel has completed the sheet switch sleep(0.3) sheet_api = sheet.api # Restore selection with retry mechanism for transient Excel COM failures if sheet_state.get('selection'): 
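                # Illustrative shape of the sheet_state dict captured earlier by
                # _capture_workbook_state() (addresses and numbers are examples only):
                #   {"selection": "$B$2:$D$5", "scroll_row": 10, "scroll_col": 1}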
selection = sheet_state['selection'] selection_restored = False # Try to restore original selection with retry for attempt in range(1, EXCEL_COM_MAX_RETRIES + 1): try: logging.info(f"{ctx} [RESTORE_STATE] Selection restore attempt {attempt}/{EXCEL_COM_MAX_RETRIES} for '{selection}' on sheet '{sheet_name}'") sheet_api.Range(selection).Select() logging.info(f"{ctx} [RESTORE_STATE] Sheet '{sheet_name}' selection restored to: {selection}") selection_restored = True break # Success except Exception as e: error_msg = f"Selection restore failed on attempt {attempt}: {type(e).__name__}: {str(e)}" if attempt < EXCEL_COM_MAX_RETRIES: # Intermediate retry - log as warning and sleep before retry logging.warning(f"{ctx} [RESTORE_STATE] {error_msg} - Retrying in {EXCEL_COM_RETRY_DELAY}s...") sleep(EXCEL_COM_RETRY_DELAY) else: # Final attempt failed - log as error, will default to A1 logging.error(f"{ctx} [RESTORE_STATE] {error_msg} - All {EXCEL_COM_MAX_RETRIES} retry attempts exhausted") # If selection restore failed after all retries, default to A1 if not selection_restored: logging.warning(f"{ctx} [RESTORE_STATE] Could not restore selection '{selection}' for sheet '{sheet_name}' after {EXCEL_COM_MAX_RETRIES} attempts, defaulting to A1") # Try to set default A1 selection (using absolute reference: $A$1) for attempt in range(1, EXCEL_COM_MAX_RETRIES + 1): try: logging.info(f"{ctx} [RESTORE_STATE] A1 default attempt {attempt}/{EXCEL_COM_MAX_RETRIES} for sheet '{sheet_name}'") sheet_api.Range("$A$1").Select() logging.info(f"{ctx} [RESTORE_STATE] Sheet '{sheet_name}' selection defaulted to A1") break # Success except Exception as e2: error_msg = f"A1 default failed on attempt {attempt}: {type(e2).__name__}: {str(e2)}" if attempt < EXCEL_COM_MAX_RETRIES: logging.warning(f"{ctx} [RESTORE_STATE] {error_msg} - Retrying in {EXCEL_COM_RETRY_DELAY}s...") sleep(EXCEL_COM_RETRY_DELAY) else: logging.error(f"{ctx} [RESTORE_STATE] {error_msg} - All {EXCEL_COM_MAX_RETRIES} retry attempts exhausted") # Restore scroll position try: scroll_row = sheet_state.get('scroll_row', 1) scroll_col = sheet_state.get('scroll_col', 1) sheet_api.Application.ActiveWindow.ScrollRow = scroll_row sheet_api.Application.ActiveWindow.ScrollColumn = scroll_col logging.info(f"{ctx} [RESTORE_STATE] Sheet '{sheet_name}' scroll position restored: Row={scroll_row}, Col={scroll_col}") except Exception as e: logging.warning(f"{ctx} [RESTORE_STATE] Could not restore scroll position for sheet '{sheet_name}': {type(e).__name__}") except Exception as e: logging.error(f"{ctx} [RESTORE_STATE] ERROR restoring state for sheet '{sheet_name}': {type(e).__name__}: {str(e)}") # Restore active sheet if state.get('active_sheet'): try: from time import sleep active_sheet_name = state['active_sheet'] wb_xw.sheets[active_sheet_name].activate() # Wait for sheet activation to complete on Excel 2013's COM layer sleep(0.3) logging.info(f"{ctx} [RESTORE_STATE] Active sheet restored to: '{active_sheet_name}'") except Exception as e: logging.error(f"{ctx} [RESTORE_STATE] Could not restore active sheet '{state.get('active_sheet')}': {type(e).__name__}: {str(e)}") # Force sheet tabs to scroll to show the first sheet # This ensures the tab bar starts from the first sheet, regardless of which sheet is active # NOTE: ScrollWorkbookTabs only works when Excel is visible try: if len(wb_xw.sheets) > 0 and wb_xw.app: logging.info(f"{ctx} [RESTORE_STATE] Attempting to scroll sheet tabs to first sheet") try: # ScrollWorkbookTabs with negative number scrolls tabs LEFT (toward first 
sheet) # Use large negative number (-100) to guarantee we reach the beginning # Excel visibility is already managed at the beginning of this function wb_xw.api.Application.ActiveWindow.ScrollWorkbookTabs(-100) logging.info(f"{ctx} [RESTORE_STATE] Sheet tabs scrolled to beginning") except Exception as e: logging.warning(f"{ctx} [RESTORE_STATE] Could not scroll sheet tabs to beginning: {type(e).__name__}: {str(e)}") except Exception as e: logging.error(f"{ctx} [RESTORE_STATE] ERROR during sheet tabs scroll operation: {type(e).__name__}: {str(e)}") # Restore original visibility state (if we temporarily made it visible) # NOTE: Screen updating restoration is handled at the global level (in export_to_excel) # after the workbook is saved and closed try: if original_app_visible is not None and wb_xw.app: if not original_app_visible and wb_xw.app.visible: # Restore to hidden state if it was originally hidden wb_xw.app.visible = False logging.info(f"{ctx} [RESTORE_STATE] Excel app visibility restored to original state: False") except Exception as e: logging.warning(f"{ctx} [RESTORE_STATE] Could not restore Excel app visibility: {type(e).__name__}: {str(e)}") logging.info(f"{ctx} [RESTORE_STATE] Workbook state restoration complete") def _handle_output_exists(output_path, action): """ Handle existing output file (Overwrite/Increment/Backup). Args: output_path: Full path to output file action: "Overwrite", "Increment", or "Backup" Returns: Actual path to use (may be different if Increment/Backup) """ if not os.path.exists(output_path): logging.info(f"Output file doesn't exist yet: {output_path}") return output_path logging.info(f"Output file exists, applying '{action}' rule: {output_path}") if action == OUTPUT_ACTION_OVERWRITE: logging.info(f"Overwriting existing file: {output_path}") return output_path elif action == OUTPUT_ACTION_INCREMENT: base, ext = os.path.splitext(output_path) counter = 1 while os.path.exists(f"{base}_{counter}{ext}"): counter += 1 new_path = f"{base}_{counter}{ext}" logging.info(f"Using incremented filename: {new_path}") return new_path elif action == OUTPUT_ACTION_BACKUP: base, ext = os.path.splitext(output_path) counter = 1 backup_path = f"{base}_backup_{counter}{ext}" while os.path.exists(backup_path): counter += 1 backup_path = f"{base}_backup_{counter}{ext}" try: logging.info(f"Backing up existing file to: {backup_path}") shutil.copy2(output_path, backup_path) logging.info(f"Backup successful: {output_path} -> {backup_path}") except Exception as e: logging.error(f"Backup failed: {e}") raise # Return original path - the existing file will be overwritten by SaveAs return output_path return output_path def _get_column_mapping(mapping_config, mapping_column_name, source_type): """ Extract column mapping from Inclusions_Mapping or Organizations_Mapping. The mapping column contains user-friendly 1-based indices (1, 2, 3, ...) indicating which column in the Excel table each field should be placed. These are converted to 0-based indices for internal use. Args: mapping_config: List of mapping config rows (dicts with field_name, etc.) 
mapping_column_name: Name of the mapping column to extract (e.g., "MainReport_PatientsList") source_type: "Inclusions" or "Organizations" Returns: Dictionary: {excel_column_index: source_field_name} Example: {0: "Patient_Identification.Patient_Id", 1: "Inclusion.Status", ...} Indices are 0-based (converted from 1-based user input) or None if mapping_column not found """ if not mapping_config: return None column_mapping = {} for row in mapping_config: # Get the field name (source field in the JSON) field_name = row.get("field_name") if not field_name: continue # Get the mapping value from the specified column mapping_value = row.get(mapping_column_name) if mapping_value is None or mapping_value == "": continue # Skip empty mappings # Convert mapping_value to integer (1-based user-friendly index) try: user_col_index = int(mapping_value) except (ValueError, TypeError): logging.warning(f"Invalid column index '{mapping_value}' for field '{field_name}'") continue # Convert 1-based to 0-based index excel_col_index = user_col_index - 1 if excel_col_index < 0: logging.warning(f"Column index '{user_col_index}' for field '{field_name}' must be >= 1") continue # Store: excel_column_index -> field_name # Field name needs to be qualified with group for Inclusions # (extracted from the row's field_group if available) if source_type == "Inclusions": field_group = row.get("field_group", "") if field_group: full_field_name = f"{field_group}.{field_name}" else: full_field_name = field_name else: # For Organizations, field_name might already be qualified or standalone full_field_name = field_name column_mapping[excel_col_index] = full_field_name return column_mapping if column_mapping else None def _parse_range_dimensions(start_row, start_col, end_row, end_col, header_row_count=0): """ Shared utility: Calculate dimensions from cell coordinates. Args: start_row, start_col: Starting cell (1-based, after headers) end_row, end_col: Ending cell (1-based) header_row_count: Number of header rows (0 if none) Returns: Tuple of (width, total_height, data_height) """ width = end_col - start_col + 1 total_height = end_row - start_row + 1 data_height = total_height - header_row_count return width, total_height, data_height def _get_named_range_dimensions(workbook, range_name): """ Get dimensions of named range or table in workbook. Args: workbook: openpyxl Workbook object range_name: Name of the named range or table Returns: Tuple of (sheet_name, start_cell, height, width) or None if not found """ # First check for defined named ranges (in openpyxl 3.x) if range_name in workbook.defined_names: defined_name = workbook.defined_names[range_name] # Get the range reference from attr_text (e.g., "Sheet!$A$1:$B$10") range_ref = defined_name.attr_text # Parse: "SheetName!$A$1:$B$10" if '!' 
in range_ref: sheet_name, cell_range = range_ref.split('!') # Remove quotes if present sheet_name = sheet_name.strip("'\"") # Remove $ signs for parsing cell_range = cell_range.replace('$', '') if sheet_name in workbook.sheetnames: sheet = workbook[sheet_name] # Parse cell range (e.g., "A1:B10" or single "A1") if ':' in cell_range: start_cell_str, end_cell_str = cell_range.split(':') start_cell = sheet[start_cell_str] end_cell = sheet[end_cell_str] width = end_cell.column - start_cell.column + 1 height = end_cell.row - start_cell.row + 1 else: start_cell = sheet[cell_range] width = 1 height = 1 return sheet_name, start_cell, height, width # Check if it's a Table (Excel table object, not just a named range) for sheet_name in workbook.sheetnames: sheet = workbook[sheet_name] if hasattr(sheet, 'tables') and range_name in sheet.tables: table = sheet.tables[range_name] # Table has a 'ref' property with the range (e.g., "A4:F5") # Excel tables can have header rows (default 1, but can be 0) table_ref = table.ref header_row_count = getattr(table, 'headerRowCount', 1) or 0 # 0 if None or False # Parse cell range (e.g., "A4:F5") if ':' in table_ref: start_cell_str, end_cell_str = table_ref.split(':') start_cell_temp = sheet[start_cell_str] end_cell = sheet[end_cell_str] width = end_cell.column - start_cell_temp.column + 1 total_height = end_cell.row - start_cell_temp.row + 1 # Skip header rows: point to first DATA row if header_row_count > 0: data_start_row = start_cell_temp.row + header_row_count start_cell = sheet.cell(row=data_start_row, column=start_cell_temp.column) else: start_cell = start_cell_temp # Calculate data row count (total - headers) height = total_height - header_row_count else: start_cell = sheet[table_ref] width = 1 height = 1 return sheet_name, start_cell, height, width return None # OBSOLETE: _update_named_range_height() removed # This function was only called by the old openpyxl-based _process_sheet() implementation # xlwings uses table.Resize() via COM API instead, which is more reliable # See PHASE 2 migration notes for details # OBSOLETE: _recalculate_workbook() removed - xlwings handles formula recalculation automatically # When using xlwings with wb.save(), Excel automatically recalculates all formulas # OBSOLETE: _process_sheet() removed - openpyxl implementation migrated to xlwings # All sheet processing is now handled by _process_sheet_xlwings() using xlwings library # This eliminates code duplication and provides better preservation of workbook structure def _get_table_dimensions_xlwings(workbook_xw, range_name): """ Get dimensions of an Excel table OR named range using xlwings COM API. First searches for ListObjects (structured tables), then falls back to simple named ranges if no table is found. 
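    Example (illustrative names): a ListObject "Tbl_Patients" spanning A4:F10 with one
    header row yields ("Patients", CellRef at A5, 6, 6, 1, TARGET_TYPE_TABLE); a plain
    named range covering A2:C2 yields (sheet name, CellRef at A2, 1, 3, 0,
    TARGET_TYPE_NAMED_RANGE).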
Args: workbook_xw: xlwings Book object (already open) range_name: Name of the Excel table (ListObject) or named range Returns: Tuple (sheet_name, start_cell, height, width, header_row_count, target_type) or None if not found - start_cell: Points to FIRST DATA ROW (after headers for tables, first row for named ranges) - height: Number of DATA ROWS (excluding headers for tables) - header_row_count: Number of header rows (0 for named ranges, 0 or 1 for tables) - target_type: TARGET_TYPE_TABLE or TARGET_TYPE_NAMED_RANGE Note: - For tables with headers: start_cell points to first data row (after header) - For tables without headers: start_cell points to first row of table - For named ranges: start_cell points to first row (no headers assumed) """ # Helper class to mimic openpyxl Cell behavior class CellRef: def __init__(self, row, column): self.row = row self.column = column @property def coordinate(self): col_letter = get_column_letter(self.column) return f"{col_letter}{self.row}" # === PRIORITY 1: Check if it's a table (ListObject) === # Excel tables are more reliable than plain named ranges with xlwings for sheet in workbook_xw.sheets: sheet_api = sheet.api # Try to get the table count - if this fails, the sheet has no ListObjects property try: table_count = sheet_api.ListObjects.Count except: # Sheet doesn't support ListObjects or has none continue # If no tables in this sheet, skip if table_count == 0: continue # Iterate through tables by index for i in range(1, table_count + 1): # COM indexing starts at 1 try: xl_table = sheet_api.ListObjects.Item(i) table_name = xl_table.Name if table_name == range_name: # Found a table - get its range xl_range = xl_table.Range sheet_name = sheet.name total_rows = xl_range.Rows.Count total_cols = xl_range.Columns.Count start_row = xl_range.Row start_col = xl_range.Column # Get header row count from the table # In COM API, ListObject has ShowHeaders property (boolean) and HeaderRowRange # ShowHeaders: True if table has header row, False if not try: has_headers = xl_table.ShowHeaders header_row_count = 1 if has_headers else 0 except: # If ShowHeaders not accessible, try HeaderRowRange try: header_range = xl_table.HeaderRowRange header_row_count = 1 if header_range is not None else 0 except: # Fallback: assume headers exist (most common case) header_row_count = 1 # Data height = total height - header rows data_height = total_rows - header_row_count # start_cell points to the FIRST DATA ROW (after headers) # If table has headers: skip them. 
If no headers: start at table start if header_row_count > 0: data_start_row = start_row + header_row_count else: data_start_row = start_row start_cell = CellRef(data_start_row, start_col) logging.info(f"[TABLE FOUND] Located table '{range_name}' at {sheet_name}!{start_cell.coordinate} " f"(data rows: {data_height}, headers: {header_row_count}, total width: {total_cols})") return sheet_name, start_cell, data_height, total_cols, header_row_count, TARGET_TYPE_TABLE except Exception as e: # Error accessing this specific table, skip it logging.warning(f"Error accessing table {i} in '{sheet.name}': {type(e).__name__}") # === PRIORITY 2: Check if it's a named range === # Named ranges don't have headers - data starts at first row try: if range_name in workbook_xw.names: named_range = workbook_xw.names[range_name] target_range = named_range.refers_to_range sheet_name = target_range.sheet.name start_row = target_range.row start_col = target_range.column total_rows = target_range.rows.count total_cols = target_range.columns.count # Named ranges have no headers - all rows are data rows header_row_count = 0 data_height = total_rows start_cell = CellRef(start_row, start_col) logging.info(f"[NAMED RANGE FOUND] Located named range '{range_name}' at {sheet_name}!{start_cell.coordinate} " f"(data rows: {data_height}, no headers, total width: {total_cols})") return sheet_name, start_cell, data_height, total_cols, header_row_count, TARGET_TYPE_NAMED_RANGE except Exception as e: logging.warning(f"Error accessing named range '{range_name}': {type(e).__name__}: {str(e)}") # Range/table not found logging.warning(f"Named range or table '{range_name}' not found in workbook") return None # ============================================================================ # HELPER FUNCTIONS FOR SHEET PROCESSING (extracted from _process_sheet_xlwings) # ============================================================================ def _fill_variable_in_sheet(workbook_xw, target_name, source_template, template_vars, workbook_context=""): """ Fill a single variable cell with evaluated template value. Args: workbook_xw: xlwings Book object target_name: Name of the target named range (single cell) source_template: Template string with {variables} template_vars: Dictionary of variable values workbook_context: Context string for logging Returns: Boolean True if successful """ try: # Evaluate template string cell_value = source_template.format(**template_vars) except KeyError as e: logging.warning(f"Unknown variable in template: {e}") return False # Write to named cell using xlwings try: named_range = workbook_xw.names[target_name] target_range = named_range.refers_to_range target_range.value = cell_value logging.info(f"Set variable '{target_name}' to '{cell_value}'") return True except KeyError: logging.warning(f"Named range '{target_name}' not found in {workbook_context}") return False except Exception as e: logging.warning(f"Error setting variable '{target_name}' in {workbook_context}: {e}") return False def _prepare_table_data(source_type, source, sheet_config, inclusions_data, organizations_data, inclusions_mapping_config, organizations_mapping_config, target_name): """ Prepare table data: select source, apply filter/sort, get column mapping. 
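    Example return value (illustrative): (filtered_and_sorted_items,
    {0: "Patient_Identification.Patient_Id", 1: "Inclusion.Status"}); the second element
    maps 0-based Excel column indices to dotted source field paths.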
Args: source_type: Type of source (Inclusions or Organizations) source: Source identifier (mapping name) sheet_config: Sheet configuration dictionary inclusions_data: Inclusions data list organizations_data: Organizations data list inclusions_mapping_config: Inclusions mapping config organizations_mapping_config: Organizations mapping config target_name: Target range name (for logging) Returns: Tuple of (sorted_data, column_mapping) or (None, None) if error """ # Select source data and mapping config if source_type == SOURCE_TYPE_INCLUSIONS: source_data = inclusions_data mapping_config = inclusions_mapping_config else: source_data = organizations_data mapping_config = organizations_mapping_config # Apply filter and sort filter_condition = sheet_config.get("filter_condition") sort_keys = sheet_config.get("sort_keys") filtered_data = [item for item in source_data if _apply_filter(item, filter_condition)] sorted_data = _apply_sort(filtered_data, sort_keys) # Get column mapping column_mapping = _get_column_mapping(mapping_config, source, source_type) if not column_mapping: logging.warning(f"Column mapping '{source}' not found or empty for {target_name}") return None, None return sorted_data, column_mapping def _resize_table_range(workbook_xw, sheet_xw, target_name, start_cell, max_col, start_row, num_data_rows, header_row_count=0, target_type=TARGET_TYPE_TABLE): """ Resize Excel table (ListObject) or named range to match data dimensions. For Tables (ListObjects): Uses ListObject.Resize() COM API For Named Ranges: Redefines the named range via Name.RefersTo property Args: workbook_xw: xlwings Book object (needed for named range resize) sheet_xw: xlwings Sheet object target_name: Name of the table/named range start_cell: Starting cell (CellRef) - points to FIRST DATA ROW (after headers for tables) max_col: Maximum column (1-based) start_row: Starting row (1-based, first data row) num_data_rows: Number of data rows header_row_count: Number of header rows in the table (0 for named ranges) target_type: TARGET_TYPE_TABLE or TARGET_TYPE_NAMED_RANGE Returns: None (logging handles errors) """ if num_data_rows <= 1: return try: # Calculate the last data row last_data_row = start_row + num_data_rows - 1 if target_type == TARGET_TYPE_TABLE: # === TABLE (ListObject) RESIZE === excel_sheet = sheet_xw.api # Find the ListObject (Table) by name for list_obj in excel_sheet.ListObjects: if list_obj.Name == target_name: # If header_row_count not provided (legacy fallback), get it from the table if header_row_count == 0: try: has_headers = list_obj.ShowHeaders header_row_count = 1 if has_headers else 0 except: header_row_count = 1 # For resize, include header rows if they exist if header_row_count > 0: first_row = start_row - header_row_count else: first_row = start_row resize_range_str = f"{get_column_letter(start_cell.column)}{first_row}:{get_column_letter(max_col)}{last_data_row}" # Perform resize via ListObject.Resize() new_range = excel_sheet.Range(resize_range_str) list_obj.Resize(new_range) logging.info(f"Resized table '{target_name}' to {resize_range_str} (header_rows={header_row_count})") break elif target_type == TARGET_TYPE_NAMED_RANGE: # === NAMED RANGE RESIZE === # Redefine the named range to cover all data rows # Named ranges have no headers, so start_row is the first row first_col_letter = get_column_letter(start_cell.column) last_col_letter = get_column_letter(max_col) # Build the range address in A1 style range_address = f"${first_col_letter}${start_row}:${last_col_letter}${last_data_row}" # Get 
the actual Range object from the sheet and assign it to the Name # This avoids R1C1/A1 format issues by using the Range object directly new_range = sheet_xw.range(range_address) workbook_xw.api.Names(target_name).RefersTo = new_range.api logging.info(f"Resized named range '{target_name}' to {sheet_xw.name}!{range_address}") except Exception as e: logging.warning(f"Resize skipped for {target_name} ({target_type}): {e}") def _duplicate_template_row(sheet_xw, start_cell, max_col, start_row, num_data_rows, target_name, workbook_context=""): """ Duplicate template row to all data rows via copy-paste. Args: sheet_xw: xlwings Sheet object start_cell: Starting cell (CellRef) max_col: Maximum column (1-based) start_row: Starting row (1-based) num_data_rows: Number of data rows target_name: Target range name (for logging) workbook_context: Context string for logging Returns: None (logging handles errors) """ if num_data_rows <= 1: return try: # Replicate template row to all data rows in a single operation template_range_str = f"{get_column_letter(start_cell.column)}{start_row}:{get_column_letter(max_col)}{start_row}" last_data_row = start_row + num_data_rows - 1 full_target_range_str = f"{get_column_letter(start_cell.column)}{start_row}:{get_column_letter(max_col)}{last_data_row}" # Copy template row sheet_xw.range(template_range_str).copy() # Paste to entire range - Excel automatically replicates the template row sheet_xw.range(full_target_range_str).paste() # CRITICAL: Deselect after paste to avoid COM layer lock # After bulk paste on large ranges (85k+ cells), Excel's COM layer becomes saturated # and leaves a massive selection active. This prevents subsequent Range.Select() calls. # Solution: Reset Excel's selection state by switching sheets and back, then select A1. try: from time import sleep logging.info(f"Deselecting range after bulk paste for {target_name}...") # Switch to another sheet to force Excel to reset selection state other_sheets = [s for s in sheet_xw.book.sheets if s.name != sheet_xw.name] if other_sheets: other_sheets[0].activate() sleep(0.1) # Reactivate our sheet - Excel resets selection management when returning sheet_xw.activate() # Select A1 - COM should manage this easily now sheet_xw.api.Range("$A$1").Select() logging.info(f"Successfully deselected after bulk paste for {target_name} (sheet reactivation)") except Exception as e: # Deselection is non-critical, log and continue if it fails logging.warning(f"Deselection after paste failed for {target_name}: {type(e).__name__}: {str(e)}") except Exception as e: logging.warning(f"Template duplication failed for {target_name} in {workbook_context}: {e}") def _fill_table_with_data(sheet_xw, start_cell, start_row, start_col, sorted_data, column_mapping, value_replacement, target_name, sheet_name): """ Fill table with data: group contiguous columns and transfer via bulk 2D arrays. 
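    Example (illustrative indices): if column_mapping covers 0-based columns
    {0, 1, 2, 5, 6}, the grouping below yields [[0, 1, 2], [5, 6]] and the data are
    written with two bulk assignments: one rows x 3 block anchored at the first group's
    starting column and one rows x 2 block anchored at the second group's starting column.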
Args: sheet_xw: xlwings Sheet object start_cell: Starting cell (CellRef) start_row: Starting row (1-based) start_col: Starting column (1-based) sorted_data: Sorted list of data items column_mapping: Dict mapping Excel column indices to source field paths value_replacement: Value replacement configuration (or None) target_name: Target range name (for logging) sheet_name: Sheet name (for logging) Returns: None (logging handles errors and success) """ try: # === Prepare column mapping and group contiguous columns === col_order = sorted(column_mapping.keys()) # Group contiguous columns for optimal bulk update contiguous_groups = [] if col_order: current_group = [col_order[0]] for i in range(1, len(col_order)): if col_order[i] == col_order[i-1] + 1: current_group.append(col_order[i]) else: contiguous_groups.append(current_group) current_group = [col_order[i]] contiguous_groups.append(current_group) # === Update contiguous column groups (bulk 2D transfer) === for col_group in contiguous_groups: # Build 2D array for this group: rows × columns data_2d = [] for item in sorted_data: row_values = [] for excel_col_index in col_group: source_field_path = column_mapping[excel_col_index] # Get value from source item value = get_nested_value(item, source_field_path.split(".")) # Apply value replacement if value_replacement: value = _apply_value_replacement(value, value_replacement) row_values.append(value) data_2d.append(row_values) # Transfer entire group to Excel in ONE operation first_col_in_group = start_col + col_group[0] first_col_letter = get_column_letter(first_col_in_group) target_range_start = f"{first_col_letter}{start_row}" # Write 2D array at once (xlwings automatically maps rows × columns) sheet_xw.range(target_range_start).value = data_2d # Logging num_data_rows = len(sorted_data) logging.info(f"Filled table {target_name} with {num_data_rows} rows " f"at {sheet_name}!{start_cell.coordinate} " f"(bulk duplication + {len(contiguous_groups)} contiguous group(s))") except Exception as e: logging.error(f"Error filling table data for {target_name}: {e}") logging.error(f"Traceback: {traceback.format_exc()}") def _process_sheet_xlwings(workbook_xw, sheet_config, inclusions_data, organizations_data, inclusions_mapping_config, organizations_mapping_config, template_vars, workbook_context=""): """ Process a single sheet using xlwings (hybrid approach). Delegates to specialized helpers to maintain clarity and testability. 
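    Example sheet_config values (illustrative sketch; target names are invented):

        Variable fill:
            {"source_type": SOURCE_TYPE_VARIABLE,
             "source": "Data extracted on {extract_date_time_french:%d/%m/%Y %H:%M}",
             "target_name": "NR_Extract_Date"}

        Table fill:
            {"source_type": SOURCE_TYPE_INCLUSIONS,
             "source": "MainReport_PatientsList",
             "target_name": "Tbl_Patients",
             "sort_keys": [["Patient_Identification.Patient_Id", "asc", "*natsort"]]}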
def _process_sheet_xlwings(workbook_xw, sheet_config, inclusions_data, organizations_data,
                           inclusions_mapping_config, organizations_mapping_config,
                           template_vars, workbook_context=""):
    """
    Process a single sheet using xlwings (hybrid approach).

    Delegates to specialized helpers to keep the function clear and testable.

    Args:
        workbook_xw: xlwings Book object
        sheet_config: Sheet configuration dict
        inclusions_data: List of inclusion dictionaries
        organizations_data: List of organization dictionaries
        inclusions_mapping_config: Inclusions mapping config (for column mapping)
        organizations_mapping_config: Organizations mapping config
        template_vars: Dictionary of variables for template evaluation
        workbook_context: Context string identifying the workbook (for logging)

    Returns:
        Boolean True if successful
    """
    source_type = sheet_config.get("source_type")
    source = sheet_config.get("source")
    target_name = sheet_config.get("target_name")
    value_replacement = sheet_config.get("value_replacement")

    # === Variable sources: single cell fill ===
    if source_type == SOURCE_TYPE_VARIABLE:
        return _fill_variable_in_sheet(workbook_xw, target_name, source, template_vars, workbook_context)

    # === Table sources: bulk data filling ===
    if source_type not in [SOURCE_TYPE_INCLUSIONS, SOURCE_TYPE_ORGANIZATIONS]:
        return False

    # Prepare data: filter, sort, get column mapping
    sorted_data, column_mapping = _prepare_table_data(
        source_type, source, sheet_config, inclusions_data, organizations_data,
        inclusions_mapping_config, organizations_mapping_config, target_name
    )
    if sorted_data is None or column_mapping is None:
        return False

    # Get table/named range dimensions from xlwings
    try:
        table_dims = _get_table_dimensions_xlwings(workbook_xw, target_name)
        if not table_dims:
            logging.warning(f"Target '{target_name}' not found (neither table nor named range)")
            return False

        sheet_name, start_cell, table_height, table_width, header_row_count, target_type = table_dims
        sheet_xw = workbook_xw.sheets[sheet_name]
        start_row = start_cell.row
        start_col = start_cell.column
        max_col = start_col + table_width - 1
        num_data_rows = len(sorted_data)

        # === Bulk operations for data filling ===
        if sorted_data:
            # STEP 0: Resize the table/named range to match the data dimensions
            _resize_table_range(workbook_xw, sheet_xw, target_name, start_cell, max_col,
                                start_row, num_data_rows, header_row_count, target_type)

            # STEP 1: Duplicate the template row to all data rows
            _duplicate_template_row(sheet_xw, start_cell, max_col, start_row,
                                    num_data_rows, target_name, workbook_context)

            # STEPS 2-3: Fill with data (grouped contiguous columns)
            _fill_table_with_data(sheet_xw, start_cell, start_row, start_col, sorted_data,
                                  column_mapping, value_replacement, target_name, sheet_name)
        else:
            # No data - the template row stays empty
            logging.info(f"No data for target '{target_name}' ({target_type}), leaving template row empty")

        return True

    except Exception as e:
        logging.warning(f"Error processing target '{target_name}': {e}")
        logging.error(f"Traceback: {traceback.format_exc()}")
        return False
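
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the real configuration): the shape of a
# sheet_config dict as _process_sheet_xlwings reads it. Only the keys accessed
# above (source_type, source, target_name, value_replacement) are shown; the
# actual Excel_Sheets table may define additional columns (e.g. for filtering
# and sorting) that are consumed by _prepare_table_data. All values below are
# placeholders.
# ---------------------------------------------------------------------------
_EXAMPLE_SHEET_CONFIG = {
    "source_type": SOURCE_TYPE_INCLUSIONS,   # or SOURCE_TYPE_ORGANIZATIONS / SOURCE_TYPE_VARIABLE
    "source": "inclusions",                  # placeholder source identifier
    "target_name": "Table_Inclusions",       # Excel table or named range in the template
    "value_replacement": None,               # optional replacement rules, or None
}
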
# ============================================================================
# COMPREHENSIVE EXCEL EXPORT ORCHESTRATION (for main script)
# ============================================================================

def prepare_excel_export(inclusions_mapping_config, organizations_mapping_config):
    """
    Validate the Excel export configuration (no data loading).

    This function has a SINGLE responsibility: validate the Excel export CONFIG.
    It does NOT load production data (JSONs) - that is the responsibility of the
    execution functions (run_normal_mode_export, export_excel_only).

    IMPORTANT: Mapping configs MUST be provided by the caller. The caller is
    responsible for:
        1. Loading mapping configs from Excel (e.g., via load_inclusions_mapping_config())
        2. Passing them to this function for config validation

    This follows the dependency injection pattern: the caller provides the
    dependencies, this function validates the config. This ensures:
        - Clear responsibility separation: validation ≠ data loading
        - Early CONFIG validation (BEFORE data collection in NORMAL MODE)
        - Late DATA loading (AFTER collection, only when needed for execution)

    Args:
        inclusions_mapping_config: Loaded inclusions mapping (required, non-empty list/dict)
        organizations_mapping_config: Loaded organizations mapping (required, non-empty list/dict)

    Returns:
        Tuple of (excel_config, has_critical_errors, error_messages)
        - excel_config: Tuple of (workbooks_config, sheets_config) or None if errors
        - has_critical_errors: Boolean True if validation found critical errors
        - error_messages: List of error message strings

    Note:
        JSONs are loaded separately by the execution functions:
        - NORMAL MODE: run_normal_mode_export() loads JSONs AFTER data collection
        - --EXCEL-ONLY: export_excel_only() loads JSONs before execution
    """
    error_messages = []
    excel_config = None
    has_critical_errors = False

    # === STEP 1: Validate that mapping configurations are provided ===
    # The caller is responsible for loading these configs before calling this function
    if not inclusions_mapping_config or (isinstance(inclusions_mapping_config, (list, dict))
                                         and len(inclusions_mapping_config) == 0):
        error_msg = "Inclusions mapping configuration must be provided and non-empty"
        error_messages.append(error_msg)
        logging.error(error_msg)
        if console:
            console.print(f"[bold red]✗ {error_msg}[/bold red]")
        has_critical_errors = True
        return excel_config, has_critical_errors, error_messages

    if not organizations_mapping_config or (isinstance(organizations_mapping_config, (list, dict))
                                            and len(organizations_mapping_config) == 0):
        error_msg = "Organizations mapping configuration must be provided and non-empty"
        error_messages.append(error_msg)
        logging.error(error_msg)
        if console:
            console.print(f"[bold red]✗ {error_msg}[/bold red]")
        has_critical_errors = True
        return excel_config, has_critical_errors, error_messages

    # === STEP 2: Load the Excel config ===
    logging.info("Loading Excel export configuration...")
    excel_workbooks_config, excel_sheets_config, has_config_error, config_error_messages = \
        load_excel_export_config(console)

    if has_config_error:
        error_msg = "Critical errors in Excel Export Config"
        error_messages.append(error_msg)
        error_messages.extend(config_error_messages)
        has_critical_errors = True
        logging.warning(error_msg)
        if console:
            console.print(f"[bold red]✗ {error_msg}[/bold red]")
        excel_config = (excel_workbooks_config, excel_sheets_config)
        return excel_config, has_critical_errors, error_messages

    if not excel_workbooks_config or not excel_sheets_config:
        error_msg = "Excel export configuration is empty"
        error_messages.append(error_msg)
        logging.warning(error_msg)
        if console:
            console.print(f"[bold red]✗ {error_msg}[/bold red]")
        excel_config = (excel_workbooks_config, excel_sheets_config)
        return excel_config, has_critical_errors, error_messages

    # Package the config into a tuple for downstream functions
    excel_config = (excel_workbooks_config, excel_sheets_config)

    # === STEP 3: Validate the Excel config ===
    logging.info("Validating Excel export configuration...")
    has_critical_errors, validation_errors = validate_excel_config(
        excel_config, console, inclusions_mapping_config or [], organizations_mapping_config or {}
    )
    if validation_errors:
        error_messages.extend(validation_errors)
    if has_critical_errors and console:
        console.print("[bold red]✗ Critical validation errors found[/bold red]")
    else:
        logging.info("✓ Excel export configuration validated successfully")

    return excel_config, has_critical_errors, error_messages
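
# ---------------------------------------------------------------------------
# Illustrative sketch (not called anywhere): how a caller such as the main
# script might use prepare_excel_export. The mapping configs are assumed to
# have been loaded already (dependency injection); the function and variable
# names below are placeholders.
# ---------------------------------------------------------------------------
def _sketch_prepare_excel_export_usage(inclusions_mapping, organizations_mapping):
    """Minimal sketch of the config-validation step that precedes any export."""
    excel_config, has_critical_errors, error_messages = prepare_excel_export(
        inclusions_mapping, organizations_mapping
    )

    if has_critical_errors:
        # The caller decides whether to abort or continue despite config errors
        for message in error_messages:
            logging.error(f"Excel export config error: {message}")
        return None

    # excel_config is a (workbooks_config, sheets_config) tuple ready for export
    return excel_config
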
found[/bold red]") else: logging.info("✓ Excel export configuration validated successfully") return excel_config, has_critical_errors, error_messages # ============================================================================ # HIGH-LEVEL ORCHESTRATION FUNCTIONS (for main script integration) # ============================================================================ def export_excel_only(sys_argv, inclusions_filename=None, organizations_filename=None, inclusions_mapping_config=None, organizations_mapping_config=None): """ Orchestrates EXCEL_ONLY mode - complete end-to-end Excel export workflow. This function completely encapsulates the --excel_only mode: 1. Validates Excel configuration 2. Loads JSON data files (must exist) 3. Executes Excel export with error handling 4. Displays user-friendly messages and confirmations IMPORTANT: The caller (main script) is responsible for loading mapping configs before calling this function. This ensures consistent config instances across the application and follows the dependency injection pattern. This follows the same pattern as run_check_only_mode() from quality_checks module. Args: sys_argv: sys.argv from main script (for potential future CLI arg parsing) inclusions_filename: Name of inclusions JSON file (e.g., "endobest_inclusions.json") organizations_filename: Name of organizations JSON file (e.g., "endobest_organizations.json") inclusions_mapping_config: Loaded inclusions mapping configuration (REQUIRED - caller must load) organizations_mapping_config: Loaded organizations mapping configuration (REQUIRED - caller must load) """ global console if not inclusions_filename: inclusions_filename = INCLUSIONS_FILE_NAME if not organizations_filename: organizations_filename = ORGANIZATIONS_FILE_NAME print() console.print("[bold cyan]═══ EXCEL ONLY MODE ═══[/bold cyan]\n") # Step 1: Validate Excel configuration (no data loading) logging.info("EXCEL ONLY MODE: Validating Excel configuration") excel_config, has_config_critical, error_messages = \ prepare_excel_export(inclusions_mapping_config, organizations_mapping_config) # Step 2: Handle critical configuration errors if has_config_critical: print() console.print("[bold red]⚠ CRITICAL CONFIGURATION ERROR(S) DETECTED[/bold red]") console.print("[bold red]────────────────────────────────────[/bold red]") for idx, error_msg in enumerate(error_messages, 1): console.print(f"[bold red]Error {idx}: {error_msg}[/bold red]") console.print("[bold red]────────────────────────────────────[/bold red]") print() try: import questionary answer = questionary.confirm( "⚠ Continue anyway?", default=False ).ask() if not answer: console.print("[bold red]Aborted by user[/bold red]") logging.warning("EXCEL ONLY MODE: Aborted by user due to critical errors") return except ImportError: console.print("[bold yellow]⚠ questionary not available for confirmation[/bold yellow]") console.print("[bold yellow]Proceeding with export despite critical errors[/bold yellow]") # Step 3: Load JSON data files (must exist in --excel-only mode) logging.info("EXCEL ONLY MODE: Loading data files") inclusions_data = _load_json_file_internal(inclusions_filename) organizations_data = _load_json_file_internal(organizations_filename) if inclusions_data is None or organizations_data is None: console.print("[bold red]✗ Error: Could not load data files for Excel export[/bold red]") logging.error("EXCEL ONLY MODE: Data file loading failed") return # Step 4: Execute Excel export (direct call to export_to_excel, console is global) print() 
console.print("[bold cyan]═══ Excel Export ═══[/bold cyan]\n") logging.info("EXCEL ONLY MODE: Executing export") if excel_config: try: logging.info(f"Starting Excel export: {len(inclusions_data)} inclusions, {len(organizations_data)} organizations") success, error_count = export_to_excel( inclusions_data, organizations_data, excel_config, inclusions_mapping_config=inclusions_mapping_config, organizations_mapping_config=organizations_mapping_config ) if success: logging.info("EXCEL ONLY MODE: Export completed successfully") else: logging.warning(f"EXCEL ONLY MODE: Export completed with {error_count} error(s)") except Exception as e: error_msg = f"Excel export failed: {str(e)}" logging.error(f"EXCEL ONLY MODE: {error_msg}\n{traceback.format_exc()}") console.print(f"[bold red]✗ {error_msg}[/bold red]\n") else: console.print("[bold red]✗ Could not load Excel configuration[/bold red]\n") logging.error("EXCEL ONLY MODE: Excel config missing") def run_normal_mode_export(excel_enabled, excel_config, inclusions_mapping_config=None, organizations_mapping_config=None): """ Orchestrates Excel export during normal mode execution. This function encapsulates the Excel export step that runs after inclusions and organizations have been collected and written to JSON files. It handles: - Loading JSONs from filesystem (ensures fresh data consistency) - Executing Excel export with comprehensive error handling - Displaying results to user This is called from the normal workflow after data collection completes. Args: excel_enabled: Boolean indicating if Excel export is enabled excel_config: Tuple of (workbooks_config, sheets_config) or None inclusions_mapping_config: Loaded inclusions mapping configuration (optional) organizations_mapping_config: Loaded organizations mapping configuration (optional) Note: This function loads JSON files from the filesystem (which were written during the data collection phase) to ensure consistency. 
def run_normal_mode_export(excel_enabled, excel_config,
                           inclusions_mapping_config=None, organizations_mapping_config=None):
    """
    Orchestrate the Excel export during normal mode execution.

    This function encapsulates the Excel export step that runs after inclusions
    and organizations have been collected and written to JSON files. It handles:
        - Loading the JSONs from the filesystem (ensures fresh, consistent data)
        - Executing the Excel export with comprehensive error handling
        - Displaying the results to the user

    It is called from the normal workflow after data collection completes.

    Args:
        excel_enabled: Boolean indicating whether Excel export is enabled
        excel_config: Tuple of (workbooks_config, sheets_config) or None
        inclusions_mapping_config: Loaded inclusions mapping configuration (optional)
        organizations_mapping_config: Loaded organizations mapping configuration (optional)

    Returns:
        Tuple of (export_succeeded, error_message)
        - export_succeeded: Boolean True if the export completed successfully (or was skipped)
        - error_message: String with error details (empty if the export succeeded or was skipped)

    Note:
        This function loads the JSON files from the filesystem (written during the
        data collection phase) to ensure consistency.
    """
    global console

    # Only proceed if the export is enabled and a config is available
    if not excel_enabled or not excel_config:
        logging.info("Excel export not enabled or config missing, skipping")
        # FIX BUG #3: Return True when the export is intentionally skipped (not an error)
        return True, ""

    print()
    console.print("[bold cyan]═══ Excel Export ═══[/bold cyan]\n")
    logging.info("NORMAL MODE: Starting Excel export")

    try:
        # Load the JSONs from the filesystem to stay consistent with what was written.
        # File names come from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH).
        inclusions_from_fs = _load_json_file_internal(INCLUSIONS_FILE_NAME)
        organizations_from_fs = _load_json_file_internal(ORGANIZATIONS_FILE_NAME)

        if inclusions_from_fs is None or organizations_from_fs is None:
            error_msg = "Could not load data files for Excel export"
            logging.error(f"NORMAL MODE: {error_msg}")
            console.print(f"[bold red]✗ {error_msg}[/bold red]\n")
            return False, error_msg

        # Execute the export (direct call to export_to_excel; console is global)
        logging.info(f"Starting Excel export: {len(inclusions_from_fs)} inclusions, "
                     f"{len(organizations_from_fs)} organizations")
        success, error_count = export_to_excel(
            inclusions_from_fs,
            organizations_from_fs,
            excel_config,
            inclusions_mapping_config=inclusions_mapping_config,
            organizations_mapping_config=organizations_mapping_config
        )

        if success:
            logging.info("NORMAL MODE: Excel export completed successfully")
            return True, ""
        else:
            error_msg = f"Excel export completed with {error_count} error(s)"
            logging.warning(f"NORMAL MODE: {error_msg}")
            return False, error_msg

    except Exception as e:
        error_msg = f"Unexpected error during Excel export: {str(e)}"
        logging.error(f"NORMAL MODE: {error_msg}\n{traceback.format_exc()}")
        console.print(f"[bold red]✗ {error_msg}[/bold red]\n")
        return False, error_msg


def _load_json_file_internal(filename):
    """
    Internal helper to load a JSON file.

    Args:
        filename: Path to the JSON file

    Returns:
        Parsed JSON data, or None if the file doesn't exist or can't be parsed
    """
    try:
        if not os.path.exists(filename):
            logging.warning(f"JSON file not found: {filename}")
            return None

        with open(filename, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if isinstance(data, list):
            logging.info(f"Loaded {filename}: {len(data)} record(s)")
        else:
            logging.info(f"Loaded {filename}")
        return data
    except Exception as e:
        logging.error(f"Error loading {filename}: {str(e)}")
        return None
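
# ---------------------------------------------------------------------------
# Illustrative sketch (not called anywhere): how the normal-mode workflow might
# consume run_normal_mode_export's return value. The excel_enabled flag and the
# excel_config tuple are assumed to come from prepare_excel_export and the
# caller's own settings; all names below are placeholders.
# ---------------------------------------------------------------------------
def _sketch_normal_mode_export_usage(excel_enabled, excel_config,
                                     inclusions_mapping, organizations_mapping):
    """Minimal sketch of the post-collection export step."""
    export_ok, error_message = run_normal_mode_export(
        excel_enabled,
        excel_config,
        inclusions_mapping_config=inclusions_mapping,
        organizations_mapping_config=organizations_mapping,
    )

    if not export_ok:
        # A skipped export returns (True, ""), so any False result is a real failure
        logging.error(f"Excel export step failed: {error_message}")

    return export_ok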