""" Endobest Dashboard - Quality Checks Module This module contains all quality assurance functions: - JSON file loading and backup utilities - Coherence checks between organization statistics and detailed inclusion data - Comprehensive non-regression checks with configurable rules - Config-driven validation with Warning/Critical thresholds - Support for special rules (New/Deleted Inclusions, New/Deleted Fields) - 4-step logic for normal rules (field selection, transition matching, exception application, bloc_scope) """ import json import logging import os import shutil import openpyxl from rich.console import Console from eb_dashboard_utils import get_nested_value, get_old_filename as _get_old_filename, get_config_path from eb_dashboard_constants import ( INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME, OLD_FILE_SUFFIX, DASHBOARD_CONFIG_FILE_NAME, REGRESSION_CHECK_TABLE_NAME ) # ============================================================================ # MODULE CONFIGURATION # ============================================================================ # Debug mode: Set to True to display detailed changes for each regression check rule # (Variable globale - mutée au runtime, pas une constante) debug_mode = False def enable_debug_mode(): """Enable debug mode to display detailed changes for each regression check rule.""" global debug_mode debug_mode = True if console: console.print("[dim]DEBUG MODE enabled - detailed changes will be displayed[/dim]") # ============================================================================ # MODULE DEPENDENCIES (injected from main module) # ============================================================================ # Will be injected by the main module console = None # Regression check config is loaded on-demand via load_regression_check_config() regression_check_config = [] # NOTE: File names and table names are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH): # - INCLUSIONS_FILE_NAME # - ORGANIZATIONS_FILE_NAME # - OLD_FILE_SUFFIX # - DASHBOARD_CONFIG_FILE_NAME # - REGRESSION_CHECK_TABLE_NAME def set_dependencies(console_instance): """ Inject console instance from main module. Args: console_instance: Rich Console instance for formatted output Note: - File and table names are imported directly from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH) - Regression check config is loaded on-demand via load_regression_check_config() """ global console console = console_instance # ============================================================================ # CONFIGURATION LOADING # ============================================================================ def load_regression_check_config(console_instance=None): """Loads and validates the regression check configuration from the Excel file. Args: console_instance: Optional Rich Console instance. If not provided, uses global console. """ global regression_check_config, console # Use provided console or fall back to global if console_instance: console = console_instance config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME) try: workbook = openpyxl.load_workbook(config_path) except FileNotFoundError: error_msg = f"Error: Configuration file not found at: {config_path}" logging.critical(error_msg) console.print(f"[bold red]{error_msg}[/bold red]") raise Exception(error_msg) if REGRESSION_CHECK_TABLE_NAME not in workbook.sheetnames: error_msg = f"Error: Sheet '{REGRESSION_CHECK_TABLE_NAME}' not found in the configuration file." 
        logging.critical(error_msg)
        console.print(f"[bold red]{error_msg}[/bold red]")
        raise Exception(error_msg)

    sheet = workbook[REGRESSION_CHECK_TABLE_NAME]
    headers = [cell.value for cell in sheet[1]]
    temp_config = []

    for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2):
        rule_config = dict(zip(headers, row))

        # Skip if the ignore column contains "ignore" (case insensitive)
        ignore_value = rule_config.get("ignore")
        if ignore_value and isinstance(ignore_value, str) and "ignore" in ignore_value.lower():
            continue

        # Skip if all columns are None (empty row)
        if all(value is None for value in row):
            continue

        # Validate bloc_title and line_label
        bloc_title = rule_config.get("bloc_title")
        line_label = rule_config.get("line_label")
        if not bloc_title or not isinstance(bloc_title, str):
            continue  # Skip rows without bloc_title (header separators, etc.)
        if not line_label or not isinstance(line_label, str):
            error_msg = f"Error in Regression_Check config, row {row_index}: 'line_label' is mandatory when 'bloc_title' is specified."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        # Validate thresholds
        warning_threshold = rule_config.get("warning_threshold")
        critical_threshold = rule_config.get("critical_threshold")
        if warning_threshold is None or not isinstance(warning_threshold, (int, float)) or warning_threshold < 0:
            error_msg = f"Error in Regression_Check config, row {row_index}: 'warning_threshold' must be a number >= 0."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)
        if critical_threshold is None or not isinstance(critical_threshold, (int, float)) or critical_threshold < 0:
            error_msg = f"Error in Regression_Check config, row {row_index}: 'critical_threshold' must be a number >= 0."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        # Parse JSON fields
        for json_field in ["field_selection", "transitions"]:
            value = rule_config.get(json_field)
            if value and isinstance(value, str):
                try:
                    rule_config[json_field] = json.loads(value)
                except json.JSONDecodeError:
                    error_msg = f"Error in Regression_Check config, row {row_index}, field '{json_field}': Invalid JSON format."
                    logging.critical(error_msg)
                    console.print(f"[bold red]{error_msg}[/bold red]")
                    raise Exception(error_msg)
            elif value is None:
                rule_config[json_field] = None

        # Validate field_selection format
        field_selection = rule_config.get("field_selection")

        # Special rules that don't use field_selection
        special_rules_no_selection = ["New Fields", "Deleted Fields", "Deleted Inclusions"]
        if line_label not in special_rules_no_selection:
            # Standard rules and "New Inclusions" MUST have field_selection
            if field_selection is None:
                error_msg = f"Error in Regression_Check config, row {row_index}: 'field_selection' is mandatory for rule '{line_label}'."
                logging.critical(error_msg)
                console.print(f"[bold red]{error_msg}[/bold red]")
                raise Exception(error_msg)
            if not isinstance(field_selection, list):
                console.print(f"[yellow]⚠ Row {row_index}: 'field_selection' must be a JSON array of [action, selector] pairs, skipping rule[/yellow]")
                rule_config["_config_error"] = True
            else:
                # Validate each field_selection step
                for step_idx, step in enumerate(field_selection):
                    if not isinstance(step, list) or len(step) != 2:
                        console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] must be array of 2 elements [action, selector], skipping rule[/yellow]")
                        rule_config["_config_error"] = True
                        break
                    action, field_selector = step
                    if action not in ["include", "exclude"]:
                        console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping rule[/yellow]")
                        rule_config["_config_error"] = True
                        break
                    if not isinstance(field_selector, str) or "." not in field_selector:
                        console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] selector must be string with dot notation (e.g., '*.*', 'group.*', 'group.field'), got '{field_selector}', skipping rule[/yellow]")
                        rule_config["_config_error"] = True
                        break
        else:
            # Special rules should have an empty field_selection
            if field_selection is not None and field_selection != [] and field_selection != "":
                console.print(f"[yellow]⚠ Row {row_index}: Special rule '{line_label}' should have empty field_selection, got {field_selection}[/yellow]")
                rule_config["_config_error"] = True

        # Validate bloc_scope
        bloc_scope = rule_config.get("bloc_scope")
        if bloc_scope is not None and bloc_scope not in ["all", "any"]:
            error_msg = f"Error in Regression_Check config, row {row_index}: 'bloc_scope' must be 'all' or 'any'."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        # Validate transitions format (new pipeline format)
        # Format: [["include"/"exclude", "field_selector", "from_pattern", "to_pattern"], ...]
        transitions = rule_config.get("transitions")
        config_error = False
        if transitions is not None:
            if not isinstance(transitions, list):
                console.print(f"[yellow]⚠ Row {row_index}: 'transitions' must be a JSON array, skipping this rule[/yellow]")
                config_error = True
            else:
                # Validate each transition step
                for step_idx, transition_step in enumerate(transitions):
                    if not isinstance(transition_step, list) or len(transition_step) != 4:
                        console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] must be array of 4 elements [action, field_selector, from, to], skipping[/yellow]")
                        config_error = True
                        break
                    action, field_selector, from_val, to_val = transition_step
                    if action not in ["include", "exclude"]:
                        console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping[/yellow]")
                        config_error = True
                        break
                    if not isinstance(field_selector, str) or "." not in field_selector:
                        console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] field_selector must be string with dot notation (e.g., '*.*', 'group.*', 'group.field'), got '{field_selector}', skipping[/yellow]")
                        config_error = True
                        break
        if config_error:
            rule_config["_config_error"] = True

        temp_config.append(rule_config)

    regression_check_config = temp_config
    console.print(f"Loaded {len(regression_check_config)} regression check rules.", style="green")
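
# Illustrative sketch only (hypothetical column values, not taken from the real
# Excel workbook): after load_regression_check_config() parses a row, a normal
# rule ends up as a dict roughly shaped like this. The JSON columns
# 'field_selection' and 'transitions' are decoded into Python lists.
#
#   {
#       "bloc_title": "Inclusion",                      # hypothetical bloc name
#       "line_label": "Status regressions",             # hypothetical rule name
#       "warning_threshold": 0,
#       "critical_threshold": 5,
#       "field_selection": [["include", "Inclusion.*"]],
#       "transitions": [["include", "Inclusion.Inclusion_Status", "*defined", "*undefined"]],
#       "bloc_scope": "any",
#   }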


def run_check_only_mode(sys_argv):
    """
    Orchestrates CHECK_ONLY and CHECK_ONLY_COMPARE modes.

    This function handles the complete workflow for both CHECK_ONLY modes:
    - CHECK_ONLY: Full validation (coherence + regression) on existing files
    - CHECK_ONLY_COMPARE: Regression-only comparison of two specific files

    Args:
        sys_argv: sys.argv from the main script (used to parse command-line arguments)
    """
    global console

    # Initialize the console if not already set
    if console is None:
        console = Console()

    print()

    # Detect CHECK_ONLY_COMPARE mode (extra file arguments after --check-only)
    if len(sys_argv) >= 4:
        # CHECK_ONLY_COMPARE mode: Compare two specific files
        current_file = sys_argv[2]
        old_file = sys_argv[3]
        console.print("[bold cyan]═══ CHECK ONLY COMPARE MODE ═══[/bold cyan]")
        console.print("Comparing two specific files without coherence check:\n")
        console.print(f"  Current: [bold]{current_file}[/bold]")
        console.print(f"  Old: [bold]{old_file}[/bold]\n")

        # Load only the regression check configuration
        print()
        load_regression_check_config(console)

        # Run quality checks with the coherence check skipped
        print()
        has_coherence_critical, has_regression_critical = run_quality_checks(
            current_inclusions=current_file,
            organizations_list=None,
            old_inclusions_filename=old_file,
            skip_coherence=True
        )

        # Display summary
        if has_regression_critical:
            console.print("[bold red]✗ CRITICAL issues detected![/bold red]")
        else:
            console.print("[bold green]✓ All checks passed successfully![/bold green]")
    else:
        # Standard CHECK_ONLY mode: Full validation with coherence + regression
        console.print("[bold cyan]═══ CHECK ONLY MODE ═══[/bold cyan]")
        console.print("Running quality checks on existing data files without collecting new data.\n")

        # Load the regression check configuration (the coherence check doesn't need extended fields)
        print()
        load_regression_check_config(console)

        # Run quality checks (all files are loaded internally)
        print()
        old_inclusions_file = _get_old_filename(INCLUSIONS_FILE_NAME, OLD_FILE_SUFFIX)
        has_coherence_critical, has_regression_critical = run_quality_checks(
            current_inclusions=INCLUSIONS_FILE_NAME,
            organizations_list=ORGANIZATIONS_FILE_NAME,
            old_inclusions_filename=old_inclusions_file
        )

        # Display summary
        if has_coherence_critical or has_regression_critical:
            console.print("[bold red]✗ CRITICAL issues detected![/bold red]")
        else:
            console.print("[bold green]✓ All checks passed successfully![/bold green]")


# ============================================================================
# FILE UTILITIES
# ============================================================================

def load_json_file(filename):
    """
    Loads a JSON file (inclusions, organizations, or any JSON data).

    Returns the parsed JSON data, or None if the file doesn't exist or an error occurred.

    Args:
        filename: Path to the JSON file to load.

    Returns:
        Parsed JSON data (list, dict, etc.) or None if the file was not found or an error occurred.
    """
    if os.path.exists(filename):
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logging.warning(f"Could not load JSON file '{filename}': {e}")
            console.print(f"[yellow]⚠ Warning: Could not load JSON file '{filename}': {e}[/yellow]")
    return None
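
# Illustrative sketch of the inclusion JSON shape assumed by the checks below
# (field names other than Patient_Identification.Organisation_Id, Patient_Id
# and Inclusion.Inclusion_Status are hypothetical): each inclusion is a dict of
# "groups", and each group is a dict of fields.
#
#   [
#       {
#           "Patient_Identification": {"Organisation_Id": "org-01", "Patient_Id": "P-0001"},
#           "Inclusion": {"Inclusion_Status": "incluse", "Inclusion_Date": "2024-01-15"},
#       },
#       ...
#   ]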
""" def _backup_file_silent(source, destination): """Internal: Silently backup a file if it exists, overwriting destination.""" if os.path.exists(source): try: shutil.copy2(source, destination) except Exception as e: logging.warning(f"Could not backup {source}: {e}") _backup_file_silent(INCLUSIONS_FILE_NAME, _get_old_filename(INCLUSIONS_FILE_NAME, OLD_FILE_SUFFIX)) _backup_file_silent(ORGANIZATIONS_FILE_NAME, _get_old_filename(ORGANIZATIONS_FILE_NAME, OLD_FILE_SUFFIX)) # ============================================================================ # COHERENCE CHECK # ============================================================================ def coherence_check(output_inclusions, organizations_list): """ Checks coherence between organization statistics and actual inclusion details. Displays results with color-coded status. Returns True if any critical issue was found, False otherwise. """ has_critical = False # Track critical status def _get_status_and_style(count, warning_threshold=None, critical_threshold=None): """Internal: Determine status level and visual style.""" nonlocal has_critical if critical_threshold is not None and count > critical_threshold: has_critical = True return "CRITICAL", "red", "✗" elif warning_threshold is not None and count > warning_threshold: return "WARNING", "yellow", "⚠" else: return "OK", "green", "✓" def _print_check_line(message, count=None, status_tuple=None, indent=0): """Internal: Print a formatted check line with emoji and color.""" indent_str = " " * indent if status_tuple: status, color, emoji = status_tuple if count is not None: console.print(f"{indent_str}{emoji} [{color}]{message}: {count}[/{color}]") else: console.print(f"{indent_str}{emoji} [{color}]{message}[/{color}]") else: console.print(f"{indent_str}{message}") def _calculate_detail_counters_with_ap(inclusions_list, org_id=None): """Internal: Calculate actual counters from inclusions detail with AP (prematurely terminated) handling. 

        Rules:
        - If the status ends with ' - AP': increment prematurely_terminated
        - Else if it starts with 'pré-incluse': increment preincluded
        - Else if it starts with 'incluse': increment included
        - Always increment patients
        """
        patients = 0
        preincluded = 0
        included = 0
        prematurely_terminated = 0

        for inclusion in inclusions_list:
            # Filter by organization if specified
            if org_id:
                inc_org_id = get_nested_value(inclusion, ["Patient_Identification", "Organisation_Id"])
                if inc_org_id != org_id:
                    continue

            patients += 1
            status = get_nested_value(inclusion, ["Inclusion", "Inclusion_Status"], default="")
            if isinstance(status, str):
                # Check if the status ends with ' - AP' (prematurely terminated)
                if status.endswith(" - AP"):
                    prematurely_terminated += 1
                # Otherwise apply the normal classification
                elif status.lower().startswith("pré-incluse"):
                    preincluded += 1
                elif status.lower().startswith("incluse"):
                    included += 1

        return patients, preincluded, included, prematurely_terminated

    # Main coherence check logic
    console.print("\n[bold]═══ Coherence Check ═══[/bold]\n")

    # Calculate total counters
    total_stats = {
        'patients': sum(org.get('patients_count', 0) for org in organizations_list),
        'preincluded': sum(org.get('preincluded_count', 0) for org in organizations_list),
        'included': sum(org.get('included_count', 0) for org in organizations_list),
        'prematurely_terminated': sum(org.get('prematurely_terminated_count', 0) for org in organizations_list)
    }
    total_detail_tuple = _calculate_detail_counters_with_ap(output_inclusions)
    total_detail = {
        'patients': total_detail_tuple[0],
        'preincluded': total_detail_tuple[1],
        'included': total_detail_tuple[2],
        'prematurely_terminated': total_detail_tuple[3]
    }

    # Check the total (the 4 counters must match)
    total_ok = (total_stats['patients'] == total_detail['patients'] and
                total_stats['preincluded'] == total_detail['preincluded'] and
                total_stats['included'] == total_detail['included'] and
                total_stats['prematurely_terminated'] == total_detail['prematurely_terminated'])
    total_status = _get_status_and_style(0 if total_ok else 1, 0, 0)
    message = (f"TOTAL - Stats({total_stats['patients']}/{total_stats['preincluded']}/{total_stats['included']}/{total_stats['prematurely_terminated']}) "
               f"vs Detail({total_detail['patients']}/{total_detail['preincluded']}/{total_detail['included']}/{total_detail['prematurely_terminated']})")
    _print_check_line(message, status_tuple=total_status, indent=0)

    # Check each organization (only displayed if not OK)
    for org in organizations_list:
        org_id = org.get('id')
        org_name = org.get('name', 'Unknown')
        org_stats = {
            'patients': org.get('patients_count', 0),
            'preincluded': org.get('preincluded_count', 0),
            'included': org.get('included_count', 0),
            'prematurely_terminated': org.get('prematurely_terminated_count', 0)
        }
        org_detail_tuple = _calculate_detail_counters_with_ap(output_inclusions, org_id)
        org_detail = {
            'patients': org_detail_tuple[0],
            'preincluded': org_detail_tuple[1],
            'included': org_detail_tuple[2],
            'prematurely_terminated': org_detail_tuple[3]
        }

        org_ok = (org_stats['patients'] == org_detail['patients'] and
                  org_stats['preincluded'] == org_detail['preincluded'] and
                  org_stats['included'] == org_detail['included'] and
                  org_stats['prematurely_terminated'] == org_detail['prematurely_terminated'])
        if not org_ok:
            org_status = _get_status_and_style(1, 0, 0)
            message = (f"{org_name} - Stats({org_stats['patients']}/{org_stats['preincluded']}/{org_stats['included']}/{org_stats['prematurely_terminated']}) "
                       f"vs Detail({org_detail['patients']}/{org_detail['preincluded']}/{org_detail['included']}/{org_detail['prematurely_terminated']})")
            _print_check_line(message, status_tuple=org_status, indent=1)

    return has_critical


# ============================================================================
# QUALITY CHECKS ORCHESTRATION
# ============================================================================

def run_quality_checks(current_inclusions, organizations_list, old_inclusions_filename, skip_coherence=False):
    """
    Runs coherence and non-regression quality checks on inclusions data.

    Args:
        current_inclusions: Either a filename (str) to load inclusions from,
            or a list of inclusion dictionaries (already in memory)
        organizations_list: Either a filename (str) to load organizations from,
            or a list of organization dictionaries (already in memory)
        old_inclusions_filename: Filename of the old inclusions for regression comparison.
            Must be a string (filename)
        skip_coherence: If True, skip the coherence check (default: False)

    Returns:
        Tuple of (has_coherence_critical, has_regression_critical)

    Usage:
        - Normal mode:
            run_quality_checks(
                current_inclusions=output_inclusions,           # list (in memory)
                organizations_list=organizations_list,          # list (in memory)
                old_inclusions_filename=INCLUSIONS_FILE_NAME    # str (current file)
            )
        - Check-only mode:
            run_quality_checks(
                current_inclusions=INCLUSIONS_FILE_NAME,        # str (current file)
                organizations_list=ORGANIZATIONS_FILE_NAME,     # str (organizations file)
                old_inclusions_filename=get_old_filename(INCLUSIONS_FILE_NAME)  # str (old file)
            )
    """
    global console, regression_check_config

    # Auto-load the regression config if not already loaded
    if not regression_check_config:
        if console is None:
            console = Console()
        load_regression_check_config(console)

    console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")

    # Load current_inclusions if it's a filename
    if isinstance(current_inclusions, str):
        current_inclusions_data = load_json_file(current_inclusions)
        if current_inclusions_data is None:
            console.print(f"[bold red]Error: Could not load current inclusions from '{current_inclusions}'[/bold red]")
            return True, True  # Return critical errors if can't load
    elif isinstance(current_inclusions, list):
        current_inclusions_data = current_inclusions
    else:
        console.print("[bold red]Error: current_inclusions must be either a filename (str) or a list of inclusions[/bold red]")
        return True, True

    # Load organizations and run the coherence check (unless skipped)
    has_coherence_critical = False
    if not skip_coherence:
        # Load organizations_list if it's a filename
        if isinstance(organizations_list, str):
            organizations_data = load_json_file(organizations_list)
            if organizations_data is None:
                console.print(f"[bold red]Error: Could not load organizations from '{organizations_list}'[/bold red]")
                return True, True  # Return critical errors if can't load
        elif isinstance(organizations_list, list):
            organizations_data = organizations_list
        else:
            console.print("[bold red]Error: organizations_list must be either a filename (str) or a list of organizations[/bold red]")
            return True, True

        # Run the coherence check
        has_coherence_critical = coherence_check(current_inclusions_data, organizations_data)

    # Load and run the non-regression check
    has_regression_critical = non_regression_check(current_inclusions_data, old_inclusions_filename)

    console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")
    print()

    return has_coherence_critical, has_regression_critical
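
# Minimal caller sketch (hypothetical, not part of this module): in normal mode the
# checks compare the freshly collected in-memory data against the files still on disk
# from the previous run, and the outputs are only backed up (see backup_output_files
# above) once no critical issue was reported.
#
#   coherence_critical, regression_critical = run_quality_checks(
#       current_inclusions=output_inclusions,          # list built in memory by the collector
#       organizations_list=organizations_list,         # list built in memory by the collector
#       old_inclusions_filename=INCLUSIONS_FILE_NAME,  # previous run, still on disk
#   )
#   if not (coherence_critical or regression_critical):
#       backup_output_files()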


# ============================================================================
# NON-REGRESSION CHECK
# ============================================================================

def non_regression_check(output_inclusions, old_inclusions_filename):
    """
    Comprehensive config-driven non-regression check comparing current vs old inclusions.

    Uses the rules from regression_check_config loaded from Excel.
    Returns True if any critical issue was found, False otherwise.

    Args:
        output_inclusions: Current inclusions data (list)
        old_inclusions_filename: Filename of the old inclusions JSON file to load
    """
    # Display the section header first
    console.print("\n[bold]═══ Non Regression Check ═══[/bold]\n")

    # Display the loading message and load the old inclusions file
    console.print(f"[dim]Loading old inclusions from: {old_inclusions_filename}[/dim]")
    old_inclusions = load_json_file(old_inclusions_filename)
    if old_inclusions is None:
        console.print(f"[yellow]⚠ No old inclusions file found at '{old_inclusions_filename}', skipping non-regression check[/yellow]")
        return False

    has_critical = False  # Track critical status

    # ========== INTERNAL UTILITY FUNCTIONS ==========

    def _is_undefined(value):
        """Check if a value is considered undefined."""
        return value in [None, "", "undefined"]

    def _values_are_equal(val1, val2):
        """
        Compare two values with special handling for undefined values.
        - If both are undefined → considered equal
        - Otherwise → strict equality
        """
        if _is_undefined(val1) and _is_undefined(val2):
            return True
        return val1 == val2

    def _apply_pipeline_step(checked_fields, action, field_selector, from_pattern, to_pattern):
        """Apply one pipeline step to the checked_fields list IN-PLACE.

        Modifies the is_checked status (5th element) of fields matching the selector
        and transition pattern.

        Args:
            checked_fields: List of [group_name, field_name, old_val, new_val, is_checked],
                MODIFIED IN-PLACE
            action: "include" or "exclude"
            field_selector: "*.*", "group.*", or "group.field"
            from_pattern: "*undefined", "*defined", "*", or a literal value
            to_pattern: "*undefined", "*defined", "*", or a literal value

        Logic:
            - For each field in checked_fields:
                - If the field matches the selector AND the transition matches:
                    - if action="include": set is_checked=True
                    - if action="exclude": set is_checked=False
                - Otherwise: leave is_checked unchanged

        Returns:
            None (modifies the list in place)
        """
        for i, field_record in enumerate(checked_fields):
            group_name, field_name, old_val, new_val, is_checked = field_record

            # Check if this step applies to this field
            if not _field_selector_matches_pattern(field_selector, group_name, field_name):
                continue

            # Check if the transition matches
            if _transition_matches(old_val, new_val, from_pattern, to_pattern):
                if action == "include":
                    checked_fields[i][4] = True
                elif action == "exclude":
                    checked_fields[i][4] = False
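
    # Illustrative sketch (hypothetical values): one "include" pipeline step over a
    # two-field list. Only the field whose transition matches the pattern gets its
    # is_checked flag flipped; the other record is left untouched.
    #
    #   fields = [
    #       ["Inclusion", "Inclusion_Status", "incluse", None, False],          # defined → undefined
    #       ["Inclusion", "Inclusion_Date", "2024-01-15", "2024-01-15", False],  # unchanged
    #   ]
    #   _apply_pipeline_step(fields, "include", "Inclusion.*", "*defined", "*undefined")
    #   # → fields[0][4] is now True, fields[1][4] stays False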

    def _transition_matches(old_val, new_val, expected_old, expected_new):
        """
        Check if a transition matches, with support for keywords.

        Keywords supported (start with *):
        - "*undefined": matches None, "", "undefined"
        - "*defined": matches any defined value (NOT None, "", "undefined")
        - "*": matches any value

        All other values are treated as literal values and matched by exact equality.

        Args:
            old_val: Actual old value
            new_val: Actual new value
            expected_old: Expected old value or keyword (if it starts with *)
            expected_new: Expected new value or keyword (if it starts with *)

        Returns:
            True if the transition matches
        """
        # Handle old value matching
        if expected_old == "*undefined":
            old_matches = old_val in [None, "", "undefined"]
        elif expected_old == "*defined":
            old_matches = old_val not in [None, "", "undefined"]
        elif expected_old == "*":
            old_matches = True
        else:
            # Literal value matching (exact equality)
            old_matches = (old_val == expected_old)

        # Handle new value matching
        if expected_new == "*undefined":
            new_matches = new_val in [None, "", "undefined"]
        elif expected_new == "*defined":
            new_matches = new_val not in [None, "", "undefined"]
        elif expected_new == "*":
            new_matches = True
        else:
            # Literal value matching (exact equality)
            new_matches = (new_val == expected_new)

        return old_matches and new_matches

    def _check_field_matches_exception(group_name, field_name, old_val, new_val, exception_spec):
        """
        Check if a field matches an exception specification.

        Supports both single transitions and multiple transitions per exception.

        Args:
            group_name: Field group name
            field_name: Field name
            old_val: Old value
            new_val: New value
            exception_spec: Exception specification dict with "field" and "transition"

        Examples:
            Single:   {"field": "Status", "transition": [false, true]}
            Multiple: {"field": "Status", "transition": [[false, true], [true, false]]}

        Returns:
            True if the field and its transition match the exception
        """
        if not isinstance(exception_spec, dict):
            return False

        exception_field = exception_spec.get("field")
        exception_transition = exception_spec.get("transition")
        if not exception_field or not exception_transition:
            return False

        # Parse the field specification (format: "field_group.field_name" or just "field_name")
        if "." in exception_field:
            exc_group, exc_name = exception_field.split(".", 1)
            # Must match both group and name
            if exc_group != group_name or exc_name != field_name:
                return False
        else:
            # Only the field name is specified, so match on the field name only
            if exception_field != field_name:
                return False

        # Check if the transition matches (multiple transitions are supported)
        if not isinstance(exception_transition, list):
            return False

        # Check if this is an array of arrays: [[old1, new1], [old2, new2], ...]
        if exception_transition and isinstance(exception_transition[0], list):
            # Multiple transitions
            for trans_pair in exception_transition:
                if len(trans_pair) != 2:
                    continue
                expected_old, expected_new = trans_pair
                if _transition_matches(old_val, new_val, expected_old, expected_new):
                    return True
            return False
        # Legacy support: single transition [old, new]
        elif len(exception_transition) == 2 and not isinstance(exception_transition[0], list):
            expected_old, expected_new = exception_transition
            return _transition_matches(old_val, new_val, expected_old, expected_new)

        return False

    def _get_status_and_style(count, warning_threshold, critical_threshold):
        """Determine status level and visual style."""
        nonlocal has_critical
        if count > critical_threshold:
            has_critical = True
            return "CRITICAL", "red", "✗"
        elif count > warning_threshold:
            return "WARNING", "yellow", "⚠"
        else:
            return "OK", "green", "✓"

    def _print_block_header(title, status_tuple, indent=0):
        """Print a block header with its status."""
        indent_str = " " * indent
        status, color, emoji = status_tuple
        console.print(f"{indent_str}{emoji} [{color}][bold]{title}[/bold][/{color}]")

    def _print_check_line(message, count, status_tuple, indent=1):
        """Print a check line."""
        indent_str = " " * indent
        status, color, emoji = status_tuple
        console.print(f"{indent_str}{emoji} [{color}]{message}: {count}[/{color}]")

    def _calculate_block_status(line_statuses):
        """Calculate the overall block status from the line statuses."""
        if any(s[0] == "CRITICAL" for s in line_statuses):
            return ("CRITICAL", "red", "✗")
        elif any(s[0] == "WARNING" for s in line_statuses):
            return ("WARNING", "yellow", "⚠")
        else:
            return ("OK", "green", "✓")

    # ========== NEW FIELD SELECTION PIPELINE FUNCTIONS ==========

    def _field_selector_matches_pattern(selector, group_name, field_name):
        """
        Check if a field matches a field_selector pattern.

        Patterns:
        - "*.*": matches any field
        - "group.*": matches any field in a specific group
        - "group.field": matches a specific field

        Args:
            selector: Field selector pattern string
            group_name: Actual group name
            field_name: Actual field name

        Returns:
            True if it matches, False otherwise
        """
        if selector == "*.*":
            return True

        sel_group, sel_field = selector.split(".", 1)

        # Check the group part
        if sel_group != "*" and sel_group != group_name:
            return False

        # Check the field part
        if sel_field == "*":
            return True
        return sel_field == field_name

    def _apply_field_selection_pipeline(all_fields, field_selection_config):
        """
        Apply the field_selection pipeline to build candidate_fields.

        Args:
            all_fields: List of available (group_name, field_name) tuples
            field_selection_config: List of [action, field_selector] steps

        Returns:
            Set of (group_name, field_name) tuples matching the pipeline
        """
        # Start with an empty set
        candidate_fields = set()

        # If None or empty, return empty (explicit requirement)
        if not field_selection_config:
            return candidate_fields

        # Apply each pipeline step
        for action, field_selector in field_selection_config:
            for group_name, field_name in all_fields:
                # Check if this field matches the selector
                if _field_selector_matches_pattern(field_selector, group_name, field_name):
                    if action == "include":
                        candidate_fields.add((group_name, field_name))
                    elif action == "exclude":
                        candidate_fields.discard((group_name, field_name))

        return candidate_fields
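
    # Illustrative sketch (hypothetical field names): a two-step pipeline that first
    # includes every field of the "Inclusion" group and then removes one of them.
    #
    #   all_fields = [("Inclusion", "Inclusion_Status"), ("Inclusion", "Comment")]
    #   _apply_field_selection_pipeline(all_fields, [["include", "Inclusion.*"],
    #                                                ["exclude", "Inclusion.Comment"]])
    #   # → {("Inclusion", "Inclusion_Status")}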

    def _get_key_field_from_new_inclusions_rule(rule, new_inclusions_list, old_inclusions_list):
        """
        Determine the key field by applying field_selection to a first-inclusion sample.

        Logic:
        1. Get the first inclusion from the new and old data (representative sample)
        2. Apply the field_selection pipeline to both (same as for any rule)
        3. Return the first field that exists with a value in BOTH inclusions

        Assumes the inclusion structure is stable across all inclusions
        (a reasonable assumption for database-backed data).

        Args:
            rule: "New Inclusions" rule with its field_selection config
            new_inclusions_list: List of new inclusions
            old_inclusions_list: List of old inclusions

        Returns:
            (key_field_name, field_group) tuple

        Raises:
            ValueError: If the lists are empty or no valid key field is found
        """
        # Get the first inclusion from each list (representative sample of the structure)
        if not new_inclusions_list or not old_inclusions_list:
            raise ValueError("Cannot determine key field: empty inclusion lists")

        new_inc = new_inclusions_list[0]  # First new inclusion
        old_inc = old_inclusions_list[0]  # First old inclusion

        # Apply the field_selection pipeline (SAME AS FOR ANY RULE!)
        # This respects the full pipeline: include/exclude/wildcards
        candidate_fields = _build_candidate_fields(new_inc, old_inc, rule.get("field_selection"))
        if not candidate_fields:
            raise ValueError(
                f"field_selection produced no candidate fields. "
                f"Config: {rule.get('field_selection')}"
            )

        # Try each candidate field in order (sorted for determinism).
        # Return the first field that has a non-null value in both inclusions.
        for group_name, field_name in sorted(candidate_fields):
            new_val = get_nested_value(new_inc, [group_name, field_name])
            old_val = get_nested_value(old_inc, [group_name, field_name])
            if new_val is not None and old_val is not None:
                return field_name, group_name

        # No valid key found
        raise ValueError(
            f"No field in field_selection has values in both the first new and old inclusion. "
            f"Candidates from pipeline: {candidate_fields}. "
            f"Verify the field_selection config or that the data has proper values."
        )

    def _build_inclusion_dict(inclusions_list, key_field, field_group="Patient_Identification"):
        """
        Build a dictionary indexed by the key field.

        Args:
            inclusions_list: List of inclusion dicts
            key_field: Field name to use as the key (e.g., "Patient_Id", "Pseudo")
            field_group: Group containing the key field (default: "Patient_Identification")

        Returns:
            Dict with key values as keys and inclusion dicts as values
        """
        result = {}
        for inclusion in inclusions_list:
            key = get_nested_value(inclusion, [field_group, key_field])
            if key:
                result[key] = inclusion
        return result

    # ========== TRANSITION MATCHING FUNCTIONS ==========

    def _matches_transition(old_val, new_val, transitions_config):
        """Check if (old_val, new_val) matches any configured transition.

        Uses the helper function _transition_matches for consistency.

        Supports keywords with an asterisk prefix:
        - *undefined: matches any undefined value (None, "", "undefined")
        - *defined: matches any defined value (not None, "", or "undefined")
        - *: wildcard, matches any value

        All other values are treated as literal values and matched by exact equality.
        """
        if transitions_config is None:
            return False
        for transition in transitions_config:
            expected_old, expected_new = transition
            if _transition_matches(old_val, new_val, expected_old, expected_new):
                return True
        return False
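
    # Illustrative sketch (hypothetical values) of keyword-based transition matching:
    #
    #   _matches_transition("incluse", "", [["*defined", "*undefined"]])    # → True
    #   _matches_transition(None, "incluse", [["*undefined", "*defined"]])  # → True
    #   _matches_transition("incluse", "incluse - AP", [["incluse", "*"]])  # → True (literal + wildcard)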

    # ========== RULE PROCESSING FUNCTIONS ==========

    def _process_special_rule(rule, line_label, new_dict, old_dict):
        """
        Process special rules: "New Inclusions" and "Deleted Inclusions".

        These rules simply count the number of keys present in one dict but not the other.

        Args:
            rule: Rule configuration (unused for counting, but kept for consistency)
            line_label: The line label identifying which special rule this is
            new_dict: Dictionary of new inclusions
            old_dict: Dictionary of old inclusions

        Returns:
            Count of new or deleted inclusions
        """
        if line_label == "New Inclusions":
            return len(set(new_dict.keys()) - set(old_dict.keys()))
        elif line_label == "Deleted Inclusions":
            return len(set(old_dict.keys()) - set(new_dict.keys()))
        else:
            # Should not happen, but return 0 for safety
            return 0

    def _process_new_deleted_fields(line_label, new_dict, old_dict):
        """
        Process special rules: "New Fields" and "Deleted Fields".

        These rules collect all fields that appear/disappear in inclusions, using
        qualified names "group.field" to distinguish fields across different groups.
        Note: field_selection is NOT used for these rules (it must be empty).

        Returns a list of tuples: [(field_qualified_name, count_of_inclusions), ...]
        where count_of_inclusions is the number of inclusions that have this field
        added/removed.

        Args:
            line_label: "New Fields" or "Deleted Fields"
            new_dict: Dictionary of new inclusions
            old_dict: Dictionary of old inclusions

        Returns:
            List of (qualified_field_name, inclusion_count) tuples
        """
        # Collect field changes across all common inclusions
        field_counts = {}  # qualified_field_name -> count of inclusions

        # Only examine common inclusions (present in both versions).
        # Sort for deterministic processing.
        common_keys = sorted(set(new_dict.keys()) & set(old_dict.keys()))
        for key in common_keys:
            new_inc = new_dict[key]
            old_inc = old_dict[key]

            # Get all groups from both versions (sorted for deterministic processing)
            all_groups = sorted(set(new_inc.keys()) | set(old_inc.keys()))
            for group_name in all_groups:
                new_group = new_inc.get(group_name, {})
                old_group = old_inc.get(group_name, {})
                if not isinstance(new_group, dict):
                    new_group = {}
                if not isinstance(old_group, dict):
                    old_group = {}

                new_fields = set(new_group.keys())
                old_fields = set(old_group.keys())

                # Determine which fields to count based on line_label
                if line_label == "New Fields":
                    changed_fields = sorted(new_fields - old_fields)
                elif line_label == "Deleted Fields":
                    changed_fields = sorted(old_fields - new_fields)
                else:
                    changed_fields = []

                # Count each changed field with its qualified name (sorted for determinism)
                for field_name in changed_fields:
                    qualified_name = f"{group_name}.{field_name}"
                    field_counts[qualified_name] = field_counts.get(qualified_name, 0) + 1

        # Convert to a list of tuples and sort by count (descending), then by name
        result = sorted(field_counts.items(), key=lambda x: (-x[1], x[0]))
        return result
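
    # Illustrative sketch (hypothetical field names): if 12 inclusions gained the field
    # "Inclusion.Surgery_Date" and 3 gained "Patient_Identification.Pseudo" between the
    # two snapshots, the "New Fields" rule would report:
    #
    #   [("Inclusion.Surgery_Date", 12), ("Patient_Identification.Pseudo", 3)]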

    def _build_candidate_fields(new_inc, old_inc, field_selection_config):
        """
        Helper function to build candidate fields using the field_selection pipeline.

        Args:
            new_inc: New inclusion dict
            old_inc: Old inclusion dict
            field_selection_config: List of [action, field_selector] pipeline steps

        Returns:
            Sorted list of (group_name, field_name) tuples that exist in both versions
        """
        # Step 1: Collect all available fields from both versions
        common_groups = sorted(set(new_inc.keys()) & set(old_inc.keys()))
        all_available_fields = []
        for group_name in common_groups:
            new_group = new_inc.get(group_name, {})
            old_group = old_inc.get(group_name, {})
            if not isinstance(new_group, dict):
                new_group = {}
            if not isinstance(old_group, dict):
                old_group = {}

            # Only fields that exist in both versions
            common_field_names = sorted(set(new_group.keys()) & set(old_group.keys()))
            for field_name in common_field_names:
                all_available_fields.append((group_name, field_name))

        # Step 2: Apply the field_selection pipeline
        if not field_selection_config:
            return []
        candidate_fields = _apply_field_selection_pipeline(
            all_available_fields, field_selection_config
        )
        return sorted(candidate_fields, key=lambda x: (x[0], x[1]))

    def _process_rule(rule, new_dict, old_dict):
        """
        Process a single regression check rule with the correct 4-step logic.

        Logic:
        1. Build candidate fields using the field_selection pipeline
        2. For each changed field, check if the transition matches → mark as "checked"
        3. Apply the transitions pipeline steps → modify the "checked" status
        4. Apply bloc_scope (all/any) → count the inclusion

        Only processes common_keys (inclusions present in both the new and old dicts).

        Args:
            rule: Rule configuration dict
            new_dict: Dict of new inclusions indexed by the key field
            old_dict: Dict of old inclusions indexed by the key field

        Returns:
            Tuple of (count, details_list) where:
            - count: Number of matching inclusions
            - details_list: List of (inclusion_key, field_changes) tuples for DEBUG_MODE;
              field_changes is a list of (group.field, old_val, new_val) tuples
        """
        # Check for config errors first
        if rule.get("_config_error"):
            return 0, []

        field_selection_config = rule.get("field_selection")
        bloc_scope = rule.get("bloc_scope") or "any"

        # Only process inclusions present in both versions
        common_keys = sorted(set(new_dict.keys()) & set(old_dict.keys()))

        matching_inclusions_count = 0
        details_list = []  # For DEBUG_MODE

        for key in common_keys:
            new_inc = new_dict[key]
            old_inc = old_dict[key]

            # Step 1: Build candidate fields using the field_selection pipeline
            candidate_fields = _build_candidate_fields(new_inc, old_inc, field_selection_config)

            # If there are no candidate fields, skip this inclusion
            if not candidate_fields:
                continue

            # Steps 2 & 3: Build the initial field list and apply the transitions pipeline.
            # Initialize the field list with all candidate fields.
            # Format: [group_name, field_name, old_val, new_val, is_checked]
            all_fields_list = []
            changed_fields = []  # Track for the bloc_scope="all" logic
            for group_name, field_name in candidate_fields:
                new_val = get_nested_value(new_inc, [group_name, field_name])
                old_val = get_nested_value(old_inc, [group_name, field_name])

                # Track whether the field has changed (for the bloc_scope="all" logic)
                field_has_changed = not _values_are_equal(old_val, new_val)
                if field_has_changed:
                    changed_fields.append((group_name, field_name))

                # Add to all_fields_list with is_checked=False initially
                all_fields_list.append([group_name, field_name, old_val, new_val, False])

            # Apply the transitions pipeline: each step modifies is_checked in-place
            transitions_config = rule.get("transitions", [])
            if transitions_config and isinstance(transitions_config, list):
                for action, field_selector, from_val, to_val in transitions_config:
                    _apply_pipeline_step(all_fields_list, action, field_selector, from_val, to_val)

            # Extract the final checked fields
            checked_fields = [(f[0], f[1], f[2], f[3]) for f in all_fields_list if f[4]]

            # Step 4: Apply the bloc_scope logic
            inclusion_matches = False
            if bloc_scope == "all":
                # ALL fields that CHANGED must match the transition pattern
                # (unchanged fields don't block the rule)
                if len(changed_fields) > 0 and len(checked_fields) == len(changed_fields):
                    inclusion_matches = True
            else:  # bloc_scope == "any"
                # AT LEAST ONE field must be checked
                if len(checked_fields) > 0:
                    inclusion_matches = True

            if inclusion_matches:
                matching_inclusions_count += 1
                # Collect details for debug_mode
                if debug_mode and checked_fields:
                    field_changes = [(f"{gn}.{fn}", ov, nv) for gn, fn, ov, nv in checked_fields]
                    details_list.append((key, field_changes))

        return matching_inclusions_count, details_list

    # ========== MAIN LOGIC ==========

    # Determine the key field from the "New Inclusions" rule config
    key_field = None
    field_group = None
    for rule in regression_check_config:
        if rule.get("line_label") == "New Inclusions":
            try:
                key_field, field_group = _get_key_field_from_new_inclusions_rule(
                    rule, output_inclusions, old_inclusions
                )
                break
            except ValueError as e:
                console.print(f"[bold red]Error determining key field: {e}[/bold red]")
                return True  # Critical error, trigger user confirmation

    if not key_field:
        console.print("[bold red]Error: 'New Inclusions' rule not found or has no valid field_selection[/bold red]")
        return True  # Critical error, trigger user confirmation

    console.print(f"[dim]Using key field: {field_group}.{key_field}[/dim]\n")

    new_dict = _build_inclusion_dict(output_inclusions, key_field, field_group)
    old_dict = _build_inclusion_dict(old_inclusions, key_field, field_group)

    # Group rules by bloc_title, preserving the order of first appearance in regression_check_config
    blocs = {}
    bloc_order = []  # Track order of first appearance
    for rule in regression_check_config:
        bloc_title = rule["bloc_title"]
        if bloc_title not in blocs:
            blocs[bloc_title] = []
            bloc_order.append(bloc_title)
        blocs[bloc_title].append(rule)

    # Process each bloc in order of first appearance
    for bloc_title in bloc_order:
        rules = blocs[bloc_title]
        line_results = []

        for rule in rules:
            line_label = rule["line_label"]

            # Detect special rules and route them to the appropriate processing function
            if line_label in ["New Inclusions", "Deleted Inclusions"]:
                # Special rules: just count new/deleted keys
                count = _process_special_rule(rule, line_label, new_dict, old_dict)
                line_results.append((line_label, count, None, "simple"))  # type: simple count
            elif line_label in ["New Fields", "Deleted Fields"]:
                # Special rules: collect field-by-field details
                field_list = _process_new_deleted_fields(line_label, new_dict, old_dict)
                # The count is the number of fields detected
                count = len(field_list)
                line_results.append((line_label, count, field_list, "fields"))  # type: field list
            else:
                # Normal rules: apply the 4-step logic
                count, details = _process_rule(rule, new_dict, old_dict)
                line_results.append((line_label, count, details, "details"))  # type: inclusion details

        # Calculate the status for each line now that we have the counts
        line_results_with_status = []
        for line_label, count, data, result_type in line_results:
            # Find the rule to get its thresholds
            rule = next(r for r in rules if r["line_label"] == line_label)
            warning_threshold = rule["warning_threshold"]
            critical_threshold = rule["critical_threshold"]
            status_tuple = _get_status_and_style(count, warning_threshold, critical_threshold)
            line_results_with_status.append((line_label, count, data, result_type, status_tuple))

        # Calculate the bloc status
        bloc_status = _calculate_block_status([result[4] for result in line_results_with_status])

        # Display the bloc header
        _print_block_header(bloc_title, bloc_status, indent=0)

        # Display lines based on the bloc and status
        for line_label, count, data, result_type, status_tuple in line_results_with_status:
            # The Structure bloc shows everything, the others only show non-OK lines
            should_display = (bloc_title == "Structure") or (status_tuple[0] != "OK")
            if should_display:
                if result_type == "fields":
                    # Display the field list with a title and sub-items
                    _print_check_line(line_label, count, status_tuple, indent=1)
                    # Display each field as a sub-item
                    for field_name, inclusion_count in data:
                        console.print(f"    {field_name} ({inclusion_count} inclusions)")
                elif result_type == "details":
                    # Display the count
                    _print_check_line(line_label, count, status_tuple, indent=1)
                    # Display detailed changes if debug_mode is enabled and data exists
                    if debug_mode and data and len(data) > 0:
                        for inclusion_key, field_changes in data:
                            console.print(f"    [dim]{key_field}: {inclusion_key}[/dim]")
                            for qualified_field, old_val, new_val in field_changes:
                                # Format values for display
                                old_display = f"'{old_val}'" if isinstance(old_val, str) else str(old_val)
                                new_display = f"'{new_val}'" if isinstance(new_val, str) else str(new_val)
                                console.print(f"      - {qualified_field}: {old_display} → {new_display}")
                else:
                    # Simple count display
                    _print_check_line(line_label, count, status_tuple, indent=1)

        console.print()

    return has_critical
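

# Minimal end-to-end sketch (hypothetical main script and module name, not part of
# this module): the main script injects the console, then delegates to the
# check-only workflow when the corresponding flag is present on the command line.
#
#   import sys
#   from rich.console import Console
#   import eb_dashboard_quality_checks as qc   # hypothetical module name
#
#   console = Console()
#   qc.set_dependencies(console)
#   if "--check-only" in sys.argv:
#       qc.run_check_only_mode(sys.argv)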