1314 lines
56 KiB
Python
1314 lines
56 KiB
Python
"""
|
|
Endobest Dashboard - Quality Checks Module
|
|
|
|
This module contains all quality assurance functions:
|
|
- JSON file loading and backup utilities
|
|
- Coherence checks between organization statistics and detailed inclusion data
|
|
- Comprehensive non-regression checks with configurable rules
|
|
- Config-driven validation with Warning/Critical thresholds
|
|
- Support for special rules (New/Deleted Inclusions, New/Deleted Fields)
|
|
- 4-step logic for normal rules (field selection, transition matching, exception application, bloc_scope)
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
|
|
import openpyxl
|
|
from rich.console import Console
|
|
from eb_dashboard_utils import get_nested_value, get_old_filename as _get_old_filename, get_config_path
|
|
from eb_dashboard_constants import (
|
|
INCLUSIONS_FILE_NAME,
|
|
ORGANIZATIONS_FILE_NAME,
|
|
OLD_FILE_SUFFIX,
|
|
DASHBOARD_CONFIG_FILE_NAME,
|
|
REGRESSION_CHECK_TABLE_NAME
|
|
)
|
|
|
|
|
|
# ============================================================================
# MODULE CONFIGURATION
# ============================================================================

# Debug mode: Set to True to display detailed changes for each regression check rule
# (Module-level global - mutated at runtime by enable_debug_mode(), not a constant)
debug_mode = False
|
|
|
|
|
def enable_debug_mode():
    """Turn on debug mode so each regression check rule reports its detailed changes."""
    global debug_mode
    debug_mode = True
    # The console may not have been injected yet at this point
    if console is not None:
        console.print("[dim]DEBUG MODE enabled - detailed changes will be displayed[/dim]")
|
|
|
|
|
|
# ============================================================================
# MODULE DEPENDENCIES (injected from main module)
# ============================================================================

# Rich Console instance; injected by the main module via set_dependencies()
console = None

# Regression check config is loaded on-demand via load_regression_check_config()
regression_check_config = []

# NOTE: File names and table names are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH):
# - INCLUSIONS_FILE_NAME
# - ORGANIZATIONS_FILE_NAME
# - OLD_FILE_SUFFIX
# - DASHBOARD_CONFIG_FILE_NAME
# - REGRESSION_CHECK_TABLE_NAME
|
|
|
|
|
|
def set_dependencies(console_instance):
    """
    Inject the Rich console used by this module for all formatted output.

    Args:
        console_instance: Rich Console instance shared with the main module.

    Note:
        - File and table names come straight from eb_dashboard_constants.py
          (SINGLE SOURCE OF TRUTH), so only the console needs injecting.
        - The regression check config is loaded on-demand via
          load_regression_check_config().
    """
    global console
    console = console_instance
|
|
|
|
|
|
# ============================================================================
|
|
# CONFIGURATION LOADING
|
|
# ============================================================================
|
|
|
|
def load_regression_check_config(console_instance=None):
    """Loads and validates the regression check configuration from the Excel file.

    Reads the REGRESSION_CHECK_TABLE_NAME sheet of the dashboard config workbook,
    builds one rule dict per row (keyed by the header row), validates it, and
    stores the accepted rules in the module-level ``regression_check_config``.

    Rows can be silently skipped (ignore column, fully empty rows, rows without
    a bloc_title). Hard validation failures raise; soft failures only set the
    ``_config_error`` flag on the rule so it is kept but marked invalid.

    Args:
        console_instance: Optional Rich Console instance. If not provided, uses global console.

    Raises:
        Exception: On missing file/sheet, missing line_label, invalid thresholds,
            invalid JSON in field_selection/transitions, missing field_selection
            for standard rules, or invalid bloc_scope.
    """
    global regression_check_config, console

    # Use provided console or fall back to global
    if console_instance:
        console = console_instance

    config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)

    try:
        workbook = openpyxl.load_workbook(config_path)
    except FileNotFoundError:
        error_msg = f"Error: Configuration file not found at: {config_path}"
        logging.critical(error_msg)
        console.print(f"[bold red]{error_msg}[/bold red]")
        raise Exception(error_msg)

    if REGRESSION_CHECK_TABLE_NAME not in workbook.sheetnames:
        error_msg = f"Error: Sheet '{REGRESSION_CHECK_TABLE_NAME}' not found in the configuration file."
        logging.critical(error_msg)
        console.print(f"[bold red]{error_msg}[/bold red]")
        raise Exception(error_msg)

    sheet = workbook[REGRESSION_CHECK_TABLE_NAME]
    # Row 1 holds the column headers; they become the keys of each rule dict
    headers = [cell.value for cell in sheet[1]]

    temp_config = []

    # row_index starts at 2 so error messages match the Excel row numbers
    for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2):
        rule_config = dict(zip(headers, row))

        # Skip if ignore column contains "ignore" (case insensitive)
        ignore_value = rule_config.get("ignore")
        if ignore_value and isinstance(ignore_value, str) and "ignore" in ignore_value.lower():
            continue

        # Skip if all columns are None (empty row)
        if all(value is None for value in row):
            continue

        # Validate bloc_title and line_label
        bloc_title = rule_config.get("bloc_title")
        line_label = rule_config.get("line_label")

        if not bloc_title or not isinstance(bloc_title, str):
            continue  # Skip rows without bloc_title (header separators, etc.)

        if not line_label or not isinstance(line_label, str):
            error_msg = f"Error in Regression_Check config, row {row_index}: 'line_label' is mandatory when 'bloc_title' is specified."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        # Validate thresholds (must be numbers >= 0; compared with strict > at check time)
        warning_threshold = rule_config.get("warning_threshold")
        critical_threshold = rule_config.get("critical_threshold")

        if warning_threshold is None or not isinstance(warning_threshold, (int, float)) or warning_threshold < 0:
            error_msg = f"Error in Regression_Check config, row {row_index}: 'warning_threshold' must be a number >= 0."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        if critical_threshold is None or not isinstance(critical_threshold, (int, float)) or critical_threshold < 0:
            error_msg = f"Error in Regression_Check config, row {row_index}: 'critical_threshold' must be a number >= 0."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        # Parse JSON fields (stored as text in the Excel cells)
        for json_field in ["field_selection", "transitions"]:
            value = rule_config.get(json_field)
            if value and isinstance(value, str):
                try:
                    rule_config[json_field] = json.loads(value)
                except json.JSONDecodeError:
                    error_msg = f"Error in Regression_Check config, row {row_index}, field '{json_field}': Invalid JSON format."
                    logging.critical(error_msg)
                    console.print(f"[bold red]{error_msg}[/bold red]")
                    raise Exception(error_msg)
            elif value is None:
                rule_config[json_field] = None

        # Validate field_selection format
        line_label = rule_config.get("line_label")
        field_selection = rule_config.get("field_selection")

        # Special rules that don't use field_selection
        special_rules_no_selection = ["New Fields", "Deleted Fields", "Deleted Inclusions"]

        if line_label not in special_rules_no_selection:
            # Standard rules and "New Inclusions" MUST have field_selection
            if field_selection is None:
                error_msg = f"Error in Regression_Check config, row {row_index}: 'field_selection' is mandatory for rule '{line_label}'."
                logging.critical(error_msg)
                console.print(f"[bold red]{error_msg}[/bold red]")
                raise Exception(error_msg)

            if not isinstance(field_selection, list):
                console.print(f"[yellow]⚠ Row {row_index}: 'field_selection' must be a JSON array of [action, selector] pairs, skipping rule[/yellow]")
                rule_config["_config_error"] = True
            else:
                # Validate each field_selection step: must be [action, selector]
                for step_idx, step in enumerate(field_selection):
                    if not isinstance(step, list) or len(step) != 2:
                        console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] must be array of 2 elements [action, selector], skipping rule[/yellow]")
                        rule_config["_config_error"] = True
                        break

                    action, field_selector = step

                    if action not in ["include", "exclude"]:
                        console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping rule[/yellow]")
                        rule_config["_config_error"] = True
                        break

                    if not isinstance(field_selector, str) or "." not in field_selector:
                        console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] selector must be string with dot notation (e.g., '*.*', 'group.*', 'group.field'), got '{field_selector}', skipping rule[/yellow]")
                        rule_config["_config_error"] = True
                        break
        else:
            # Special rules should have empty field_selection
            if field_selection is not None and field_selection != [] and field_selection != "":
                console.print(f"[yellow]⚠ Row {row_index}: Special rule '{line_label}' should have empty field_selection, got {field_selection}[/yellow]")
                rule_config["_config_error"] = True

        # Validate bloc_scope
        bloc_scope = rule_config.get("bloc_scope")
        if bloc_scope is not None and bloc_scope not in ["all", "any"]:
            error_msg = f"Error in Regression_Check config, row {row_index}: 'bloc_scope' must be 'all' or 'any'."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        # Validate transitions format (new pipeline format)
        # Format: [["include"/"exclude", "field_selector", "from_pattern", "to_pattern"], ...]
        transitions = rule_config.get("transitions")
        config_error = False

        if transitions is not None:
            if not isinstance(transitions, list):
                console.print(f"[yellow]⚠ Row {row_index}: 'transitions' must be a JSON array, skipping this rule[/yellow]")
                config_error = True
            else:
                # Validate each transition step: must be [action, field_selector, from, to]
                for step_idx, transition_step in enumerate(transitions):
                    if not isinstance(transition_step, list) or len(transition_step) != 4:
                        console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] must be array of 4 elements [action, field_selector, from, to], skipping[/yellow]")
                        config_error = True
                        break

                    action, field_selector, from_val, to_val = transition_step

                    if action not in ["include", "exclude"]:
                        console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping[/yellow]")
                        config_error = True
                        break

                    if not isinstance(field_selector, str) or "." not in field_selector:
                        console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] field_selector must be string with dot notation (e.g., '*.*', 'group.*', 'group.field'), got '{field_selector}', skipping[/yellow]")
                        config_error = True
                        break

        if config_error:
            rule_config["_config_error"] = True

        temp_config.append(rule_config)

    regression_check_config = temp_config

    console.print(f"Loaded {len(regression_check_config)} regression check rules.", style="green")
|
|
|
|
|
|
def run_check_only_mode(sys_argv):
    """
    Entry point for the CHECK_ONLY and CHECK_ONLY_COMPARE workflows.

    Dispatches on the number of command-line arguments:
    - CHECK_ONLY (default): full validation (coherence + regression) on the
      standard output files.
    - CHECK_ONLY_COMPARE (--check-only <file1> <file2>): regression-only
      comparison between two explicitly named inclusion files.

    Args:
        sys_argv: sys.argv from the main script (parsed for the two optional
            file arguments).
    """
    global console

    # Lazily create a console if the main module did not inject one
    if console is None:
        console = Console()

    print()

    # --check-only <file1> <file2>  →  compare mode
    if len(sys_argv) >= 4:
        current_file = sys_argv[2]
        old_file = sys_argv[3]

        console.print("[bold cyan]═══ CHECK ONLY COMPARE MODE ═══[/bold cyan]")
        console.print("Comparing two specific files without coherence check:\n")
        console.print(f" Current: [bold]{current_file}[/bold]")
        console.print(f" Old: [bold]{old_file}[/bold]\n")

        # Only the regression rules are needed (coherence check is skipped)
        print()
        load_regression_check_config(console)

        print()
        _, has_regression_critical = run_quality_checks(
            current_inclusions=current_file,
            organizations_list=None,
            old_inclusions_filename=old_file,
            skip_coherence=True
        )

        overall_critical = has_regression_critical
    else:
        # Standard CHECK_ONLY mode: coherence + regression on existing files
        console.print("[bold cyan]═══ CHECK ONLY MODE ═══[/bold cyan]")
        console.print("Running quality checks on existing data files without collecting new data.\n")

        # Coherence check doesn't need extended fields; only regression config is loaded
        print()
        load_regression_check_config(console)

        # run_quality_checks loads all files internally in this mode
        print()
        old_inclusions_file = _get_old_filename(INCLUSIONS_FILE_NAME, OLD_FILE_SUFFIX)
        has_coherence_critical, has_regression_critical = run_quality_checks(
            current_inclusions=INCLUSIONS_FILE_NAME,
            organizations_list=ORGANIZATIONS_FILE_NAME,
            old_inclusions_filename=old_inclusions_file
        )

        overall_critical = has_coherence_critical or has_regression_critical

    # Shared summary line for both modes
    if overall_critical:
        console.print("[bold red]✗ CRITICAL issues detected![/bold red]")
    else:
        console.print("[bold green]✓ All checks passed successfully![/bold green]")
|
|
|
|
|
|
# ============================================================================
|
|
# FILE UTILITIES
|
|
# ============================================================================
|
|
|
|
def load_json_file(filename):
    """
    Loads a JSON file (inclusions, organizations, or any JSON data).

    Args:
        filename: Path to the JSON file to load.

    Returns:
        Parsed JSON data (list, dict, etc.), or None if the file does not
        exist or could not be read/parsed.
    """
    if os.path.exists(filename):
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            # Bug fix: the warning previously printed the literal text
            # '(unknown)' instead of the actual file name.
            logging.warning(f"Could not load JSON file '{filename}': {e}")
            # Guard: console may not have been injected yet when this runs early
            if console:
                console.print(f"[yellow]⚠ Warning: Could not load JSON file '{filename}': {e}[/yellow]")
    return None
|
|
|
|
|
|
def backup_output_files():
    """
    Silently backs up the current output files before new versions are written.

    Called AFTER all checks pass so that a crash during collection never
    destroys the previous history.
    """
    def _copy_silently(src, dst):
        """Internal: copy src over dst if src exists; log (never raise) on failure."""
        if not os.path.exists(src):
            return
        try:
            shutil.copy2(src, dst)
        except Exception as e:
            logging.warning(f"Could not backup {src}: {e}")

    # Inclusions first, then organizations - same order as the original flow
    for current_name in (INCLUSIONS_FILE_NAME, ORGANIZATIONS_FILE_NAME):
        _copy_silently(current_name, _get_old_filename(current_name, OLD_FILE_SUFFIX))
|
|
|
|
|
|
# ============================================================================
|
|
# COHERENCE CHECK
|
|
# ============================================================================
|
|
|
|
def coherence_check(output_inclusions, organizations_list):
    """
    Checks coherence between organization statistics and actual inclusion details.

    For the grand total and then for each organization, the four aggregate
    counters stored on the organization records (patients / preincluded /
    included / prematurely terminated) are compared against the same counters
    recomputed from the detailed inclusion list. Results are displayed with
    color-coded status; per-organization lines are shown only on mismatch.

    Args:
        output_inclusions: List of inclusion detail dicts.
        organizations_list: List of organization dicts carrying *_count stats.

    Returns:
        True if any critical issue was found, False otherwise.
    """
    has_critical = False  # Track critical status across all checks

    def _get_status_and_style(count, warning_threshold=None, critical_threshold=None):
        """Internal: Determine status level and visual style.

        Side effect: sets the enclosing has_critical flag when count exceeds
        critical_threshold. Returns a (status, color, emoji) tuple.
        """
        nonlocal has_critical
        if critical_threshold is not None and count > critical_threshold:
            has_critical = True
            return "CRITICAL", "red", "✗"
        elif warning_threshold is not None and count > warning_threshold:
            return "WARNING", "yellow", "⚠"
        else:
            return "OK", "green", "✓"

    def _print_check_line(message, count=None, status_tuple=None, indent=0):
        """Internal: Print a formatted check line with emoji and color."""
        indent_str = " " * indent
        if status_tuple:
            status, color, emoji = status_tuple
            if count is not None:
                console.print(f"{indent_str}{emoji} [{color}]{message}: {count}[/{color}]")
            else:
                console.print(f"{indent_str}{emoji} [{color}]{message}[/{color}]")
        else:
            # No status: plain uncolored line
            console.print(f"{indent_str}{message}")

    def _calculate_detail_counters_with_ap(inclusions_list, org_id=None):
        """Internal: Calculate actual counters from inclusions detail with AP (prematurely terminated) handling.

        Args:
            inclusions_list: Inclusion dicts to scan.
            org_id: If given, only inclusions belonging to this organization are counted.

        Rules:
        - If status ends with ' - AP': increment prematurely_terminated
        - Else if starts with 'pré-incluse': increment preincluded
        - Else if starts with 'incluse': increment included
        - Always increment patients

        Returns:
            Tuple (patients, preincluded, included, prematurely_terminated).
        """
        patients = 0
        preincluded = 0
        included = 0
        prematurely_terminated = 0

        for inclusion in inclusions_list:
            # Filter by organization if specified
            if org_id:
                inc_org_id = get_nested_value(inclusion, ["Patient_Identification", "Organisation_Id"])
                if inc_org_id != org_id:
                    continue

            patients += 1
            status = get_nested_value(inclusion, ["Inclusion", "Inclusion_Status"], default="")

            if isinstance(status, str):
                # Check if status ends with ' - AP' (prematurely terminated)
                if status.endswith(" - AP"):
                    prematurely_terminated += 1
                # Otherwise apply the normal classification
                elif status.lower().startswith("pré-incluse"):
                    preincluded += 1
                elif status.lower().startswith("incluse"):
                    included += 1

        return patients, preincluded, included, prematurely_terminated

    # Main coherence check logic
    console.print("\n[bold]═══ Coherence Check ═══[/bold]\n")

    # Aggregate the declared statistics over all organizations
    total_stats = {
        'patients': sum(org.get('patients_count', 0) for org in organizations_list),
        'preincluded': sum(org.get('preincluded_count', 0) for org in organizations_list),
        'included': sum(org.get('included_count', 0) for org in organizations_list),
        'prematurely_terminated': sum(org.get('prematurely_terminated_count', 0) for org in organizations_list)
    }

    # Recompute the same four counters from the detailed inclusion records
    total_detail_tuple = _calculate_detail_counters_with_ap(output_inclusions)
    total_detail = {
        'patients': total_detail_tuple[0],
        'preincluded': total_detail_tuple[1],
        'included': total_detail_tuple[2],
        'prematurely_terminated': total_detail_tuple[3]
    }

    # Check total (4 counters must match)
    total_ok = (total_stats['patients'] == total_detail['patients'] and
                total_stats['preincluded'] == total_detail['preincluded'] and
                total_stats['included'] == total_detail['included'] and
                total_stats['prematurely_terminated'] == total_detail['prematurely_terminated'])

    # Thresholds of 0 mean any mismatch (count=1) is immediately CRITICAL
    total_status = _get_status_and_style(0 if total_ok else 1, 0, 0)

    message = (f"TOTAL - Stats({total_stats['patients']}/{total_stats['preincluded']}/{total_stats['included']}/{total_stats['prematurely_terminated']}) "
               f"vs Detail({total_detail['patients']}/{total_detail['preincluded']}/{total_detail['included']}/{total_detail['prematurely_terminated']})")
    _print_check_line(message, status_tuple=total_status, indent=0)

    # Check each organization (only display if not OK)
    for org in organizations_list:
        org_id = org.get('id')
        org_name = org.get('name', 'Unknown')

        org_stats = {
            'patients': org.get('patients_count', 0),
            'preincluded': org.get('preincluded_count', 0),
            'included': org.get('included_count', 0),
            'prematurely_terminated': org.get('prematurely_terminated_count', 0)
        }

        org_detail_tuple = _calculate_detail_counters_with_ap(output_inclusions, org_id)
        org_detail = {
            'patients': org_detail_tuple[0],
            'preincluded': org_detail_tuple[1],
            'included': org_detail_tuple[2],
            'prematurely_terminated': org_detail_tuple[3]
        }

        org_ok = (org_stats['patients'] == org_detail['patients'] and
                  org_stats['preincluded'] == org_detail['preincluded'] and
                  org_stats['included'] == org_detail['included'] and
                  org_stats['prematurely_terminated'] == org_detail['prematurely_terminated'])

        if not org_ok:
            org_status = _get_status_and_style(1, 0, 0)
            message = (f"{org_name} - Stats({org_stats['patients']}/{org_stats['preincluded']}/{org_stats['included']}/{org_stats['prematurely_terminated']}) "
                       f"vs Detail({org_detail['patients']}/{org_detail['preincluded']}/{org_detail['included']}/{org_detail['prematurely_terminated']})")
            _print_check_line(message, status_tuple=org_status, indent=1)

    return has_critical
|
|
|
|
|
|
# ============================================================================
|
|
# QUALITY CHECKS ORCHESTRATION
|
|
# ============================================================================
|
|
|
|
def run_quality_checks(current_inclusions, organizations_list, old_inclusions_filename, skip_coherence=False):
    """
    Runs coherence and non-regression quality checks on inclusions data.

    Args:
        current_inclusions: Filename (str) to load inclusions from, or a list
            of inclusion dicts already in memory.
        organizations_list: Filename (str) to load organizations from, or a
            list of organization dicts already in memory. Ignored when
            skip_coherence is True.
        old_inclusions_filename: Filename (str) of the old inclusions used for
            the regression comparison.
        skip_coherence: If True, skip the coherence check (default: False).

    Returns:
        Tuple (has_coherence_critical, has_regression_critical). Both are True
        when required input data cannot be loaded at all.

    Usage:
        - Normal mode: pass in-memory lists plus the current inclusions
          filename as the "old" file.
        - Check-only mode: pass filenames for everything; files are loaded
          here.
    """
    global console, regression_check_config

    # Lazy-initialize: a console and the regression rules on first use
    if not regression_check_config:
        if console is None:
            console = Console()
        load_regression_check_config(console)

    console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")

    # Resolve current inclusions: accept either a filename or an in-memory list
    if isinstance(current_inclusions, str):
        current_data = load_json_file(current_inclusions)
        if current_data is None:
            console.print(f"[bold red]Error: Could not load current inclusions from '{current_inclusions}'[/bold red]")
            return True, True  # Unloadable input counts as critical everywhere
    elif isinstance(current_inclusions, list):
        current_data = current_inclusions
    else:
        console.print("[bold red]Error: current_inclusions must be either a filename (str) or a list of inclusions[/bold red]")
        return True, True

    has_coherence_critical = False
    if not skip_coherence:
        # Resolve organizations the same way, then run the coherence check
        if isinstance(organizations_list, str):
            orgs_data = load_json_file(organizations_list)
            if orgs_data is None:
                console.print(f"[bold red]Error: Could not load organizations from '{organizations_list}'[/bold red]")
                return True, True
        elif isinstance(organizations_list, list):
            orgs_data = organizations_list
        else:
            console.print("[bold red]Error: organizations_list must be either a filename (str) or a list of organizations[/bold red]")
            return True, True

        has_coherence_critical = coherence_check(current_data, orgs_data)

    # Non-regression check always runs (old file is loaded inside)
    has_regression_critical = non_regression_check(current_data, old_inclusions_filename)

    console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")
    print()

    return has_coherence_critical, has_regression_critical
|
|
|
|
|
|
# ============================================================================
|
|
# NON-REGRESSION CHECK
|
|
# ============================================================================
|
|
|
|
def non_regression_check(output_inclusions, old_inclusions_filename):
|
|
"""
|
|
Comprehensive config-driven non-regression check comparing current vs old inclusions.
|
|
Uses rules from regression_check_config loaded from Excel.
|
|
Returns True if any critical issue was found, False otherwise.
|
|
|
|
Args:
|
|
output_inclusions: Current inclusions data (list)
|
|
old_inclusions_filename: Filename of old inclusions JSON file to load
|
|
"""
|
|
# Display section header first
|
|
console.print("\n[bold]═══ Non Regression Check ═══[/bold]\n")
|
|
|
|
# Display loading message and load old inclusions file
|
|
console.print(f"[dim]Loading old inclusions from: {old_inclusions_filename}[/dim]")
|
|
old_inclusions = load_json_file(old_inclusions_filename)
|
|
|
|
if old_inclusions is None:
|
|
console.print(f"[yellow]⚠ No old inclusions file found at '{old_inclusions_filename}', skipping non-regression check[/yellow]")
|
|
return False
|
|
|
|
has_critical = False # Track critical status
|
|
|
|
# ========== INTERNAL UTILITY FUNCTIONS ==========
|
|
|
|
def _is_undefined(value):
|
|
"""Check if a value is considered undefined."""
|
|
return value in [None, "", "undefined"]
|
|
|
|
def _values_are_equal(val1, val2):
    """
    Compare two values, treating all undefined forms as interchangeable.

    - Both undefined (None / "" / "undefined") → considered equal.
    - Anything else → strict equality.
    """
    both_undefined = _is_undefined(val1) and _is_undefined(val2)
    return both_undefined or val1 == val2
|
|
|
|
def _apply_pipeline_step(checked_fields, action, field_selector, from_pattern, to_pattern):
    """Apply one pipeline step to the checked_fields list IN-PLACE.

    Every record whose field matches ``field_selector`` AND whose value
    transition matches (from_pattern, to_pattern) gets its is_checked flag
    (5th element) forced to True ("include") or False ("exclude"). All other
    records are left untouched.

    Args:
        checked_fields: List of [group_name, field_name, old_val, new_val, is_checked]
            records - MODIFIED IN-PLACE.
        action: "include" or "exclude"
        field_selector: "*.*", "group.*", or "group.field"
        from_pattern: "*undefined", "*defined", "*", or literal value
        to_pattern: "*undefined", "*defined", "*", or literal value

    Returns:
        None (modifies the list in place).
    """
    for record in checked_fields:
        grp, fld, old_value, new_value, _current_flag = record

        # This step only applies to records matched by the selector
        if not _field_selector_matches_pattern(field_selector, grp, fld):
            continue

        # ...and only when the observed value transition matches the patterns
        if _transition_matches(old_value, new_value, from_pattern, to_pattern):
            if action == "include":
                record[4] = True
            elif action == "exclude":
                record[4] = False
|
|
|
|
def _transition_matches(old_val, new_val, expected_old, expected_new):
|
|
"""
|
|
Check if a transition matches with support for keywords.
|
|
|
|
Keywords supported (start with *):
|
|
- "*undefined": matches None, "", "undefined"
|
|
- "*defined": matches any defined value (NOT None, "", "undefined")
|
|
- "*": matches any value
|
|
|
|
All other values are treated as literal values and matched by exact equality.
|
|
|
|
Args:
|
|
old_val: Actual old value
|
|
new_val: Actual new value
|
|
expected_old: Expected old value or keyword (if starts with *)
|
|
expected_new: Expected new value or keyword (if starts with *)
|
|
|
|
Returns:
|
|
True if transition matches
|
|
"""
|
|
# Handle old value matching
|
|
if expected_old == "*undefined":
|
|
old_matches = old_val in [None, "", "undefined"]
|
|
elif expected_old == "*defined":
|
|
old_matches = old_val not in [None, "", "undefined"]
|
|
elif expected_old == "*":
|
|
old_matches = True
|
|
else:
|
|
# Literal value matching (exact equality)
|
|
old_matches = (old_val == expected_old)
|
|
|
|
# Handle new value matching
|
|
if expected_new == "*undefined":
|
|
new_matches = new_val in [None, "", "undefined"]
|
|
elif expected_new == "*defined":
|
|
new_matches = new_val not in [None, "", "undefined"]
|
|
elif expected_new == "*":
|
|
new_matches = True
|
|
else:
|
|
# Literal value matching (exact equality)
|
|
new_matches = (new_val == expected_new)
|
|
|
|
return old_matches and new_matches
|
|
|
|
def _check_field_matches_exception(group_name, field_name, old_val, new_val, exception_spec):
    """
    Check if a field matches an exception specification.

    Supports both a single transition and multiple transitions per exception.

    Args:
        group_name: Field group name
        field_name: Field name
        old_val: Old value
        new_val: New value
        exception_spec: Exception specification dict with "field" and "transition"
            Examples:
                Single: {"field": "Status", "transition": [false, true]}
                Multiple: {"field": "Status", "transition": [[false, true], [true, false]]}

    Returns:
        True if the field and its transition match the exception
    """
    # Malformed specs never match
    if not isinstance(exception_spec, dict):
        return False

    exception_field = exception_spec.get("field")
    exception_transition = exception_spec.get("transition")

    if not exception_field or not exception_transition:
        return False

    # Parse field specification (format: "field_group.field_name" or just "field_name")
    if "." in exception_field:
        exc_group, exc_name = exception_field.split(".", 1)
        # Must match both group and name
        if exc_group != group_name or exc_name != field_name:
            return False
    else:
        # Only field name specified, must match field name only
        if exception_field != field_name:
            return False

    # Check if transition matches (now supports multiple transitions)
    if not isinstance(exception_transition, list):
        return False

    # Check if this is array of arrays: [[old1, new1], [old2, new2], ...]
    if exception_transition and isinstance(exception_transition[0], list):
        # Multiple transitions: any one matching pair is enough
        for trans_pair in exception_transition:
            if len(trans_pair) != 2:
                continue  # skip malformed pairs rather than failing outright
            expected_old, expected_new = trans_pair
            if _transition_matches(old_val, new_val, expected_old, expected_new):
                return True
        return False

    # Legacy support: single transition [old, new]
    elif len(exception_transition) == 2 and not isinstance(exception_transition[0], list):
        expected_old, expected_new = exception_transition
        return _transition_matches(old_val, new_val, expected_old, expected_new)

    return False
|
|
|
|
def _get_status_and_style(count, warning_threshold, critical_threshold):
    """Determine status level and visual style for a rule count.

    Side effect: sets the enclosing has_critical flag when count exceeds
    critical_threshold (strict >, so a threshold of 0 tolerates count 0).

    Returns:
        (status, color, emoji) tuple for display.
    """
    nonlocal has_critical
    if count > critical_threshold:
        has_critical = True
        return "CRITICAL", "red", "✗"
    elif count > warning_threshold:
        return "WARNING", "yellow", "⚠"
    else:
        return "OK", "green", "✓"
|
|
|
|
def _print_block_header(title, status_tuple, indent=0):
    """Print a block header line with its status emoji and color.

    Args:
        title: Block title text (rendered bold).
        status_tuple: (status, color, emoji) as produced by _get_status_and_style.
        indent: Indentation level.
    """
    indent_str = " " * indent
    status, color, emoji = status_tuple
    console.print(f"{indent_str}{emoji} [{color}][bold]{title}[/bold][/{color}]")
|
|
|
|
def _print_check_line(message, count, status_tuple, indent=1):
    """Render one check result line ("message: count") in the status color."""
    status, color, emoji = status_tuple
    pad = " " * indent
    console.print(f"{pad}{emoji} [{color}]{message}: {count}[/{color}]")
|
|
|
|
def _calculate_block_status(line_statuses):
|
|
"""Calculate overall block status from line statuses."""
|
|
if any(s[0] == "CRITICAL" for s in line_statuses):
|
|
return ("CRITICAL", "red", "✗")
|
|
elif any(s[0] == "WARNING" for s in line_statuses):
|
|
return ("WARNING", "yellow", "⚠")
|
|
else:
|
|
return ("OK", "green", "✓")
|
|
|
|
# ========== NEW FIELD SELECTION PIPELINE FUNCTIONS ==========
|
|
|
|
def _field_selector_matches_pattern(selector, group_name, field_name):
|
|
"""
|
|
Check if a field matches a field_selector pattern.
|
|
|
|
Patterns:
|
|
- "*.*": matches any field
|
|
- "group.*": matches any field in specific group
|
|
- "group.field": matches specific field
|
|
|
|
Args:
|
|
selector: Field selector pattern string
|
|
group_name: Actual group name
|
|
field_name: Actual field name
|
|
|
|
Returns:
|
|
True if matches, False otherwise
|
|
"""
|
|
if selector == "*.*":
|
|
return True
|
|
|
|
sel_group, sel_field = selector.split(".", 1)
|
|
|
|
# Check group part
|
|
if sel_group != "*" and sel_group != group_name:
|
|
return False
|
|
|
|
# Check field part
|
|
if sel_field == "*":
|
|
return True
|
|
|
|
return sel_field == field_name
|
|
|
|
def _apply_field_selection_pipeline(all_fields, field_selection_config):
    """
    Build the candidate field set by running the field_selection pipeline.

    Each pipeline step is an [action, field_selector] pair: "include" adds
    every matching field to the set, "exclude" removes it. Steps are applied
    in order, so later steps override earlier ones.

    Args:
        all_fields: List of (group_name, field_name) tuples available
        field_selection_config: List of [action, field_selector] steps

    Returns:
        Set of (group_name, field_name) tuples matching pipeline
    """
    selected = set()

    # An absent or empty pipeline selects nothing (explicit requirement).
    if not field_selection_config:
        return selected

    for action, field_selector in field_selection_config:
        matches = [
            pair for pair in all_fields
            if _field_selector_matches_pattern(field_selector, pair[0], pair[1])
        ]
        if action == "include":
            selected.update(matches)
        elif action == "exclude":
            selected.difference_update(matches)

    return selected
|
|
|
|
def _get_key_field_from_new_inclusions_rule(rule, new_inclusions_list, old_inclusions_list):
    """
    Determine the inclusion key field from the "New Inclusions" rule config.

    Applies the rule's field_selection pipeline to the first inclusion of
    each version (taken as a representative structural sample — structure is
    assumed stable across inclusions, reasonable for database-backed data)
    and returns the first candidate field holding a non-null value in BOTH.

    Args:
        rule: "New Inclusions" rule with field_selection config
        new_inclusions_list: List of new inclusions
        old_inclusions_list: List of old inclusions

    Returns:
        (key_field_name, field_group) tuple

    Raises:
        ValueError: If lists empty or no valid key field found
    """
    if not new_inclusions_list or not old_inclusions_list:
        raise ValueError("Cannot determine key field: empty inclusion lists")

    # Representative samples of the inclusion structure.
    sample_new = new_inclusions_list[0]
    sample_old = old_inclusions_list[0]

    # Run the exact same field_selection pipeline used by normal rules,
    # respecting include/exclude steps and wildcards.
    selection = rule.get("field_selection")
    candidates = _build_candidate_fields(sample_new, sample_old, selection)

    if not candidates:
        raise ValueError(
            f"field_selection produced no candidate fields. "
            f"Config: {selection}"
        )

    # First candidate (sorted for determinism) with a value in both samples wins.
    for group_name, field_name in sorted(candidates):
        value_in_new = get_nested_value(sample_new, [group_name, field_name])
        value_in_old = get_nested_value(sample_old, [group_name, field_name])
        if value_in_new is not None and value_in_old is not None:
            return field_name, group_name

    raise ValueError(
        f"No field in field_selection has values in both first new and old inclusion. "
        f"Candidates from pipeline: {candidates}. "
        f"Verify field_selection config or data has proper values."
    )
|
|
|
|
def _build_inclusion_dict(inclusions_list, key_field, field_group="Patient_Identification"):
    """
    Index a list of inclusions by the value of a key field.

    Inclusions whose key value is missing or falsy are skipped; if two
    inclusions share a key, the last one seen wins.

    Args:
        inclusions_list: List of inclusion dicts
        key_field: Field name to use as key (e.g., "Patient_Id", "Pseudo")
        field_group: Group containing the key field (default: "Patient_Identification")

    Returns:
        Dict with key values as keys, inclusion dicts as values
    """
    indexed = {}
    for inclusion in inclusions_list:
        key_value = get_nested_value(inclusion, [field_group, key_field])
        if key_value:
            indexed[key_value] = inclusion
    return indexed
|
|
|
|
# ========== TRANSITION MATCHING FUNCTIONS ==========
|
|
|
|
def _matches_transition(old_val, new_val, transitions_config):
    """Check if (old_val, new_val) matches any configured transition.

    Uses the helper function _transition_matches for consistency.

    Supports keywords with asterisk prefix:
    - *undefined: matches any undefined value (None, "", "undefined")
    - *defined: matches any defined value (not None, "", or "undefined")
    - *: wildcard, matches any value

    All other values are treated as literal values and matched by exact equality.

    Args:
        old_val: Previous field value
        new_val: Current field value
        transitions_config: List of [expected_old, expected_new] pairs, or None

    Returns:
        True if any configured transition matches, False otherwise
    """
    if transitions_config is None:
        return False
    for transition in transitions_config:
        # Bug fix: skip malformed entries instead of raising on unpacking,
        # matching the tolerance of the exception-spec transition matcher.
        if len(transition) != 2:
            continue
        expected_old, expected_new = transition
        if _transition_matches(old_val, new_val, expected_old, expected_new):
            return True
    return False
|
|
|
|
# ========== RULE PROCESSING FUNCTIONS ==========
|
|
|
|
def _process_special_rule(rule, line_label, new_dict, old_dict):
|
|
"""
|
|
Process special rules: "New Inclusions" and "Deleted Inclusions".
|
|
These rules simply count the number of keys present in one dict but not the other.
|
|
|
|
Args:
|
|
rule: Rule configuration (unused for counting, but kept for consistency)
|
|
line_label: The line label to identify which special rule this is
|
|
new_dict: Dictionary of new inclusions
|
|
old_dict: Dictionary of old inclusions
|
|
|
|
Returns:
|
|
Count of new or deleted inclusions
|
|
"""
|
|
if line_label == "New Inclusions":
|
|
return len(set(new_dict.keys()) - set(old_dict.keys()))
|
|
elif line_label == "Deleted Inclusions":
|
|
return len(set(old_dict.keys()) - set(new_dict.keys()))
|
|
else:
|
|
# Should not happen, but return 0 for safety
|
|
return 0
|
|
|
|
def _process_new_deleted_fields(line_label, new_dict, old_dict):
|
|
"""
|
|
Process special rules: "New Fields" and "Deleted Fields".
|
|
|
|
These rules collect all fields that appear/disappear in inclusions, using
|
|
qualified names "group.field" to distinguish fields across different groups.
|
|
|
|
Note: field_selection is NOT used for these rules (must be empty).
|
|
|
|
Returns a list of tuples: [(field_qualified_name, count_of_inclusions), ...]
|
|
where count_of_inclusions is the number of inclusions that have this field added/removed.
|
|
|
|
Args:
|
|
line_label: "New Fields" or "Deleted Fields"
|
|
new_dict: Dictionary of new inclusions
|
|
old_dict: Dictionary of old inclusions
|
|
|
|
Returns:
|
|
List of (qualified_field_name, inclusion_count) tuples
|
|
"""
|
|
# Collect field changes across all common inclusions
|
|
field_counts = {} # qualified_field_name -> count of inclusions
|
|
|
|
# Only examine common inclusions (present in both versions)
|
|
# Sort for deterministic processing
|
|
common_keys = sorted(set(new_dict.keys()) & set(old_dict.keys()))
|
|
|
|
for key in common_keys:
|
|
new_inc = new_dict[key]
|
|
old_inc = old_dict[key]
|
|
|
|
# Get all groups from both versions
|
|
# Sort for deterministic processing
|
|
all_groups = sorted(set(new_inc.keys()) | set(old_inc.keys()))
|
|
|
|
for group_name in all_groups:
|
|
new_group = new_inc.get(group_name, {})
|
|
old_group = old_inc.get(group_name, {})
|
|
|
|
if not isinstance(new_group, dict):
|
|
new_group = {}
|
|
if not isinstance(old_group, dict):
|
|
old_group = {}
|
|
|
|
new_fields = set(new_group.keys())
|
|
old_fields = set(old_group.keys())
|
|
|
|
# Determine which fields to count based on line_label
|
|
if line_label == "New Fields":
|
|
changed_fields = sorted(new_fields - old_fields)
|
|
elif line_label == "Deleted Fields":
|
|
changed_fields = sorted(old_fields - new_fields)
|
|
else:
|
|
changed_fields = []
|
|
|
|
# Count each changed field with qualified name (sorted for determinism)
|
|
for field_name in changed_fields:
|
|
qualified_name = f"{group_name}.{field_name}"
|
|
field_counts[qualified_name] = field_counts.get(qualified_name, 0) + 1
|
|
|
|
# Convert to list of tuples and sort by count (descending) then by name
|
|
result = sorted(field_counts.items(), key=lambda x: (-x[1], x[0]))
|
|
return result
|
|
|
|
def _build_candidate_fields(new_inc, old_inc, field_selection_config):
    """
    Build the candidate field list for one inclusion pair.

    Collects every (group, field) present in BOTH versions of the inclusion,
    then filters them through the field_selection pipeline.

    Args:
        new_inc: New inclusion dict
        old_inc: Old inclusion dict
        field_selection_config: List of [action, field_selector] pipeline steps

    Returns:
        Sorted list of (group_name, field_name) tuples that exist in both versions
    """
    # No pipeline configured -> no candidates (explicit requirement).
    if not field_selection_config:
        return []

    # Step 1: enumerate fields present in both versions, group by group
    # (sorted iteration keeps results deterministic).
    available = []
    for group_name in sorted(set(new_inc) & set(old_inc)):
        new_group = new_inc.get(group_name, {})
        old_group = old_inc.get(group_name, {})

        # Guard against non-dict group payloads.
        if not isinstance(new_group, dict):
            new_group = {}
        if not isinstance(old_group, dict):
            old_group = {}

        for field_name in sorted(set(new_group) & set(old_group)):
            available.append((group_name, field_name))

    # Step 2: run the include/exclude pipeline over the available fields.
    selected = _apply_field_selection_pipeline(available, field_selection_config)

    return sorted(selected, key=lambda pair: (pair[0], pair[1]))
|
|
|
|
def _process_rule(rule, new_dict, old_dict):
    """
    Process a single regression check rule with correct 4-step logic.

    Logic:
    1. Build candidate fields using field_selection pipeline
    2. For each changed field, check if transition matches → mark as "checked"
    3. Apply transitions pipeline steps → modify "checked" status
    4. Apply bloc_scope (all/any) → count inclusion

    Only processes common_keys (inclusions present in both new and old dicts).

    Args:
        rule: Rule configuration dict
        new_dict: Dict of new inclusions indexed by key field
        old_dict: Dict of old inclusions indexed by key field

    Returns:
        Tuple of (count, details_list) where:
        - count: Number of matching inclusions
        - details_list: List of (inclusion_key, field_changes) tuples for DEBUG_MODE
          field_changes is list of (group.field, old_val, new_val) tuples
    """
    # Rules flagged as broken during config loading are skipped entirely.
    if rule.get("_config_error"):
        return 0, []

    field_selection_config = rule.get("field_selection")
    # Default scope: "any" (a single matching field counts the inclusion).
    bloc_scope = rule.get("bloc_scope") or "any"

    # Only process inclusions present in both versions (sorted for determinism).
    common_keys = sorted(set(new_dict.keys()) & set(old_dict.keys()))
    matching_inclusions_count = 0
    details_list = []  # For DEBUG_MODE

    for key in common_keys:
        new_inc = new_dict[key]
        old_inc = old_dict[key]

        # Step 1: Build candidate fields using field_selection pipeline
        candidate_fields = _build_candidate_fields(new_inc, old_inc, field_selection_config)

        # If no candidate fields, skip this inclusion
        if not candidate_fields:
            continue

        # Step 2 & 3: Build initial field list and apply transitions pipeline.
        # Each entry: [group_name, field_name, old_val, new_val, is_checked].
        # is_checked starts False; the transitions pipeline mutates it in place.
        all_fields_list = []
        changed_fields = []  # Track for bloc_scope="all" logic

        for group_name, field_name in candidate_fields:
            new_val = get_nested_value(new_inc, [group_name, field_name])
            old_val = get_nested_value(old_inc, [group_name, field_name])

            # Track if field has changed (for bloc_scope="all" logic)
            field_has_changed = not _values_are_equal(old_val, new_val)
            if field_has_changed:
                changed_fields.append((group_name, field_name))
            # Add to all_fields_list with is_checked=False initially
            all_fields_list.append([group_name, field_name, old_val, new_val, False])

        # Apply transitions pipeline: each step modifies is_checked in-place
        transitions_config = rule.get("transitions", [])
        if transitions_config and isinstance(transitions_config, list):
            for action, field_selector, from_val, to_val in transitions_config:
                _apply_pipeline_step(all_fields_list, action, field_selector, from_val, to_val)

        # Extract final checked fields (entries whose flag survived the pipeline)
        checked_fields = [(f[0], f[1], f[2], f[3]) for f in all_fields_list if f[4]]

        # Step 4: Apply bloc_scope logic
        inclusion_matches = False
        if bloc_scope == "all":
            # ALL fields that CHANGED must match the transition pattern
            # (unchanged fields don't block the rule)
            # NOTE(review): this compares counts, not sets — a checked-but-
            # unchanged field could offset an unchecked changed one; confirm
            # this is the intended semantics.
            if len(changed_fields) > 0 and len(checked_fields) == len(changed_fields):
                inclusion_matches = True
        else:  # bloc_scope == "any"
            # AT LEAST ONE field must be checked
            if len(checked_fields) > 0:
                inclusion_matches = True

        if inclusion_matches:
            matching_inclusions_count += 1
            # Collect details for debug_mode
            if debug_mode and checked_fields:
                field_changes = [(f"{gn}.{fn}", ov, nv) for gn, fn, ov, nv in checked_fields]
                details_list.append((key, field_changes))

    return matching_inclusions_count, details_list
|
|
|
|
# ========== MAIN LOGIC ==========

# Determine key field from "New Inclusions" rule config.
# The key field uniquely identifies an inclusion across old/new datasets.
key_field = None
field_group = None

for rule in regression_check_config:
    if rule.get("line_label") == "New Inclusions":
        try:
            key_field, field_group = _get_key_field_from_new_inclusions_rule(
                rule,
                output_inclusions,
                old_inclusions
            )
            break
        except ValueError as e:
            console.print(f"[bold red]Error determining key field: {e}[/bold red]")
            return True  # Critical error, trigger user confirmation

if not key_field:
    console.print("[bold red]Error: 'New Inclusions' rule not found or has no valid field_selection[/bold red]")
    return True  # Critical error, trigger user confirmation

console.print(f"[dim]Using key field: {field_group}.{key_field}[/dim]\n")

# Index both datasets by the chosen key so rules can diff inclusion-by-inclusion.
new_dict = _build_inclusion_dict(output_inclusions, key_field, field_group)
old_dict = _build_inclusion_dict(old_inclusions, key_field, field_group)

# Group rules by bloc_title, preserving order of first appearance in regression_check_config
blocs = {}
bloc_order = []  # Track order of first appearance
for rule in regression_check_config:
    bloc_title = rule["bloc_title"]
    if bloc_title not in blocs:
        blocs[bloc_title] = []
        bloc_order.append(bloc_title)
    blocs[bloc_title].append(rule)

# Process each bloc in order of first appearance
for bloc_title in bloc_order:
    rules = blocs[bloc_title]
    line_results = []

    # Pass 1: compute the count (and optional detail payload) for every rule line.
    for rule in rules:
        line_label = rule["line_label"]
        # NOTE(review): these two threshold reads are unused in this loop;
        # statuses are computed from fresh reads in the pass below.
        warning_threshold = rule["warning_threshold"]
        critical_threshold = rule["critical_threshold"]

        # Detect special rules and route to appropriate processing function
        if line_label in ["New Inclusions", "Deleted Inclusions"]:
            # Special rules: just count new/deleted keys
            count = _process_special_rule(rule, line_label, new_dict, old_dict)
            line_results.append((line_label, count, None, "simple"))  # type: simple count

        elif line_label in ["New Fields", "Deleted Fields"]:
            # Special rules: collect field-by-field details
            field_list = _process_new_deleted_fields(line_label, new_dict, old_dict)
            # Count is the number of fields detected
            count = len(field_list)
            line_results.append((line_label, count, field_list, "fields"))  # type: field list

        else:
            # Normal rules: apply 4-step logic
            count, details = _process_rule(rule, new_dict, old_dict)
            line_results.append((line_label, count, details, "details"))  # type: inclusion details

    # Pass 2: calculate status for each line now that we have counts
    line_results_with_status = []
    for line_label, count, data, result_type in line_results:
        # Find the rule to get thresholds (line_label is unique within a bloc)
        rule = next(r for r in rules if r["line_label"] == line_label)
        warning_threshold = rule["warning_threshold"]
        critical_threshold = rule["critical_threshold"]
        status_tuple = _get_status_and_style(count, warning_threshold, critical_threshold)
        line_results_with_status.append((line_label, count, data, result_type, status_tuple))

    # Calculate bloc status (worst line status wins)
    bloc_status = _calculate_block_status([result[4] for result in line_results_with_status])

    # Display bloc header
    _print_block_header(bloc_title, bloc_status, indent=0)

    # Display lines based on bloc and status
    for line_label, count, data, result_type, status_tuple in line_results_with_status:
        # Structure bloc shows everything, others only show non-OK lines
        should_display = (bloc_title == "Structure") or (status_tuple[0] != "OK")

        if should_display:
            if result_type == "fields":
                # Display field list with title and sub-items
                _print_check_line(line_label, count, status_tuple, indent=1)
                # Display each field as a sub-item
                for field_name, inclusion_count in data:
                    console.print(f"    {field_name} ({inclusion_count} inclusions)")

            elif result_type == "details":
                # Display count
                _print_check_line(line_label, count, status_tuple, indent=1)

                # Display detailed changes if debug_mode is enabled and data exists
                if debug_mode and data and len(data) > 0:
                    for inclusion_key, field_changes in data:
                        console.print(f"    [dim]{key_field}: {inclusion_key}[/dim]")
                        for qualified_field, old_val, new_val in field_changes:
                            # Format values for display (quote strings, str() everything else)
                            old_display = f"'{old_val}'" if isinstance(old_val, str) else str(old_val)
                            new_display = f"'{new_val}'" if isinstance(new_val, str) else str(new_val)
                            console.print(f"      - {qualified_field}: {old_display} → {new_display}")

            else:
                # Simple count display
                _print_check_line(line_label, count, status_tuple, indent=1)

    console.print()

# True when any rule exceeded its critical threshold (set via _get_status_and_style).
return has_critical
|