"""
Endobest Dashboard - Quality Checks Module
This module contains all quality assurance functions:
- JSON file loading and backup utilities
- Coherence checks between organization statistics and detailed inclusion data
- Comprehensive non-regression checks with configurable rules
- Config-driven validation with Warning/Critical thresholds
- Support for special rules (New/Deleted Inclusions, New/Deleted Fields)
- 4-step logic for normal rules (field selection, transition matching, exception application, bloc_scope)
"""
import json
import logging
import os
import shutil
import openpyxl
from rich.console import Console
from eb_dashboard_utils import get_nested_value, get_old_filename as _get_old_filename, get_config_path
from eb_dashboard_constants import (
INCLUSIONS_FILE_NAME,
ORGANIZATIONS_FILE_NAME,
OLD_FILE_SUFFIX,
DASHBOARD_CONFIG_FILE_NAME,
REGRESSION_CHECK_TABLE_NAME
)
# ============================================================================
# MODULE CONFIGURATION
# ============================================================================
# Debug mode: Set to True to display detailed changes for each regression check rule
# (Global variable - mutated at runtime, not a constant)
debug_mode = False
def enable_debug_mode():
"""Enable debug mode to display detailed changes for each regression check rule."""
global debug_mode
debug_mode = True
if console:
console.print("[dim]DEBUG MODE enabled - detailed changes will be displayed[/dim]")
# ============================================================================
# MODULE DEPENDENCIES (injected from main module)
# ============================================================================
# Will be injected by the main module
console = None
# Regression check config is loaded on-demand via load_regression_check_config()
regression_check_config = []
# NOTE: File names and table names are imported from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH):
# - INCLUSIONS_FILE_NAME
# - ORGANIZATIONS_FILE_NAME
# - OLD_FILE_SUFFIX
# - DASHBOARD_CONFIG_FILE_NAME
# - REGRESSION_CHECK_TABLE_NAME
def set_dependencies(console_instance):
"""
Inject console instance from main module.
Args:
console_instance: Rich Console instance for formatted output
Note:
- File and table names are imported directly from eb_dashboard_constants.py (SINGLE SOURCE OF TRUTH)
- Regression check config is loaded on-demand via load_regression_check_config()
"""
global console
console = console_instance
# ============================================================================
# CONFIGURATION LOADING
# ============================================================================
def load_regression_check_config(console_instance=None):
"""Loads and validates the regression check configuration from the Excel file.
Args:
console_instance: Optional Rich Console instance. If not provided, uses global console.
"""
global regression_check_config, console
# Use provided console or fall back to global
if console_instance:
console = console_instance
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
try:
workbook = openpyxl.load_workbook(config_path)
except FileNotFoundError:
error_msg = f"Error: Configuration file not found at: {config_path}"
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
if REGRESSION_CHECK_TABLE_NAME not in workbook.sheetnames:
error_msg = f"Error: Sheet '{REGRESSION_CHECK_TABLE_NAME}' not found in the configuration file."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
sheet = workbook[REGRESSION_CHECK_TABLE_NAME]
headers = [cell.value for cell in sheet[1]]
temp_config = []
for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2):
rule_config = dict(zip(headers, row))
# Skip if ignore column contains "ignore" (case insensitive)
ignore_value = rule_config.get("ignore")
if ignore_value and isinstance(ignore_value, str) and "ignore" in ignore_value.lower():
continue
# Skip if all columns are None (empty row)
if all(value is None for value in row):
continue
# Validate bloc_title and line_label
bloc_title = rule_config.get("bloc_title")
line_label = rule_config.get("line_label")
if not bloc_title or not isinstance(bloc_title, str):
continue # Skip rows without bloc_title (header separators, etc.)
if not line_label or not isinstance(line_label, str):
error_msg = f"Error in Regression_Check config, row {row_index}: 'line_label' is mandatory when 'bloc_title' is specified."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
# Validate thresholds
warning_threshold = rule_config.get("warning_threshold")
critical_threshold = rule_config.get("critical_threshold")
if warning_threshold is None or not isinstance(warning_threshold, (int, float)) or warning_threshold < 0:
error_msg = f"Error in Regression_Check config, row {row_index}: 'warning_threshold' must be a number >= 0."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
if critical_threshold is None or not isinstance(critical_threshold, (int, float)) or critical_threshold < 0:
error_msg = f"Error in Regression_Check config, row {row_index}: 'critical_threshold' must be a number >= 0."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
# Parse JSON fields
for json_field in ["field_selection", "transitions"]:
value = rule_config.get(json_field)
if value and isinstance(value, str):
try:
rule_config[json_field] = json.loads(value)
except json.JSONDecodeError:
error_msg = f"Error in Regression_Check config, row {row_index}, field '{json_field}': Invalid JSON format."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
elif value is None:
rule_config[json_field] = None
# Validate field_selection format
line_label = rule_config.get("line_label")
field_selection = rule_config.get("field_selection")
# Special rules that don't use field_selection
special_rules_no_selection = ["New Fields", "Deleted Fields", "Deleted Inclusions"]
if line_label not in special_rules_no_selection:
# Standard rules and "New Inclusions" MUST have field_selection
if field_selection is None:
error_msg = f"Error in Regression_Check config, row {row_index}: 'field_selection' is mandatory for rule '{line_label}'."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
if not isinstance(field_selection, list):
console.print(f"[yellow]⚠ Row {row_index}: 'field_selection' must be a JSON array of [action, selector] pairs, skipping rule[/yellow]")
rule_config["_config_error"] = True
else:
# Validate each field_selection step
for step_idx, step in enumerate(field_selection):
if not isinstance(step, list) or len(step) != 2:
console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] must be array of 2 elements [action, selector], skipping rule[/yellow]")
rule_config["_config_error"] = True
break
action, field_selector = step
if action not in ["include", "exclude"]:
console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping rule[/yellow]")
rule_config["_config_error"] = True
break
if not isinstance(field_selector, str) or "." not in field_selector:
console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] selector must be string with dot notation (e.g., '*.*', 'group.*', 'group.field'), got '{field_selector}', skipping rule[/yellow]")
rule_config["_config_error"] = True
break
else:
# Special rules should have empty field_selection
if field_selection is not None and field_selection != [] and field_selection != "":
console.print(f"[yellow]⚠ Row {row_index}: Special rule '{line_label}' should have empty field_selection, got {field_selection}[/yellow]")
rule_config["_config_error"] = True
# Validate bloc_scope
bloc_scope = rule_config.get("bloc_scope")
if bloc_scope is not None and bloc_scope not in ["all", "any"]:
error_msg = f"Error in Regression_Check config, row {row_index}: 'bloc_scope' must be 'all' or 'any'."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
# Validate transitions format (new pipeline format)
# Format: [["include"/"exclude", "field_selector", "from_pattern", "to_pattern"], ...]
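# Illustrative transitions pipeline (field names are examples only):
#   [["include", "*.*", "*defined", "*undefined"],
#    ["exclude", "Inclusion.Inclusion_Status", "*", "*"]]
# i.e. flag any field whose value becomes undefined, then unflag
# Inclusion.Inclusion_Status no matter how it changed.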
transitions = rule_config.get("transitions")
config_error = False
if transitions is not None:
if not isinstance(transitions, list):
console.print(f"[yellow]⚠ Row {row_index}: 'transitions' must be a JSON array, skipping this rule[/yellow]")
config_error = True
else:
# Validate each transition step
for step_idx, transition_step in enumerate(transitions):
if not isinstance(transition_step, list) or len(transition_step) != 4:
console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] must be array of 4 elements [action, field_selector, from, to], skipping[/yellow]")
config_error = True
break
action, field_selector, from_val, to_val = transition_step
if action not in ["include", "exclude"]:
console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping[/yellow]")
config_error = True
break
if not isinstance(field_selector, str) or "." not in field_selector:
console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] field_selector must be string with dot notation (e.g., '*.*', 'group.*', 'group.field'), got '{field_selector}', skipping[/yellow]")
config_error = True
break
if config_error:
rule_config["_config_error"] = True
temp_config.append(rule_config)
regression_check_config = temp_config
console.print(f"Loaded {len(regression_check_config)} regression check rules.", style="green")
def run_check_only_mode(sys_argv):
"""
Orchestrates CHECK_ONLY and CHECK_ONLY_COMPARE modes.
This function handles the complete workflow for both CHECK_ONLY modes:
- CHECK_ONLY: Full validation (coherence + regression) on existing files
- CHECK_ONLY_COMPARE: Regression-only comparison of two specific files
Args:
sys_argv: sys.argv from main script (to parse command-line arguments)
"""
global console
# Initialize console if not already set
if console is None:
console = Console()
print()
# Detect CHECK_ONLY_COMPARE mode: --check-only <file1> <file2>
if len(sys_argv) >= 4:
# CHECK_ONLY_COMPARE mode: Compare two specific files
current_file = sys_argv[2]
old_file = sys_argv[3]
console.print("[bold cyan]═══ CHECK ONLY COMPARE MODE ═══[/bold cyan]")
console.print(f"Comparing two specific files without coherence check:\n")
console.print(f" Current: [bold]{current_file}[/bold]")
console.print(f" Old: [bold]{old_file}[/bold]\n")
# Load only regression check configuration
print()
load_regression_check_config(console)
# Run quality checks with coherence check skipped
print()
has_coherence_critical, has_regression_critical = run_quality_checks(
current_inclusions=current_file,
organizations_list=None,
old_inclusions_filename=old_file,
skip_coherence=True
)
# Display summary
if has_regression_critical:
console.print("[bold red]✗ CRITICAL issues detected![/bold red]")
else:
console.print("[bold green]✓ All checks passed successfully![/bold green]")
else:
# Standard CHECK_ONLY mode: Full validation with coherence + regression
console.print("[bold cyan]═══ CHECK ONLY MODE ═══[/bold cyan]")
console.print("Running quality checks on existing data files without collecting new data.\n")
# Load regression check configuration (coherence check doesn't need extended fields)
print()
load_regression_check_config(console)
# Run quality checks (will load all files internally)
print()
old_inclusions_file = _get_old_filename(INCLUSIONS_FILE_NAME, OLD_FILE_SUFFIX)
has_coherence_critical, has_regression_critical = run_quality_checks(
current_inclusions=INCLUSIONS_FILE_NAME,
organizations_list=ORGANIZATIONS_FILE_NAME,
old_inclusions_filename=old_inclusions_file
)
# Display summary
if has_coherence_critical or has_regression_critical:
console.print("[bold red]✗ CRITICAL issues detected![/bold red]")
else:
console.print("[bold green]✓ All checks passed successfully![/bold green]")
# ============================================================================
# FILE UTILITIES
# ============================================================================
def load_json_file(filename):
"""
Loads a JSON file (inclusions, organizations, or any JSON data).
Returns the parsed JSON data, or None if the file doesn't exist or an error occurred.
Args:
filename: Path to the JSON file to load.
Returns:
Parsed JSON data (list, dict, etc.) or None if file not found or error occurred.
"""
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logging.warning(f"Could not load JSON file '{filename}': {e}")
console.print(f"[yellow]⚠ Warning: Could not load JSON file '{filename}': {e}[/yellow]")
return None
def backup_output_files():
"""
Silently backs up the current output files before writing new versions.
This is called AFTER all checks pass, to avoid losing history if a run crashes.
"""
def _backup_file_silent(source, destination):
"""Internal: Silently backup a file if it exists, overwriting destination."""
if os.path.exists(source):
try:
shutil.copy2(source, destination)
except Exception as e:
logging.warning(f"Could not backup {source}: {e}")
_backup_file_silent(INCLUSIONS_FILE_NAME, _get_old_filename(INCLUSIONS_FILE_NAME, OLD_FILE_SUFFIX))
_backup_file_silent(ORGANIZATIONS_FILE_NAME, _get_old_filename(ORGANIZATIONS_FILE_NAME, OLD_FILE_SUFFIX))
# ============================================================================
# COHERENCE CHECK
# ============================================================================
def coherence_check(output_inclusions, organizations_list):
"""
Checks coherence between organization statistics and actual inclusion details.
Displays results with color-coded status.
Returns True if any critical issue was found, False otherwise.
"""
has_critical = False # Track critical status
def _get_status_and_style(count, warning_threshold=None, critical_threshold=None):
"""Internal: Determine status level and visual style."""
nonlocal has_critical
if critical_threshold is not None and count > critical_threshold:
has_critical = True
return "CRITICAL", "red", ""
elif warning_threshold is not None and count > warning_threshold:
return "WARNING", "yellow", ""
else:
return "OK", "green", ""
def _print_check_line(message, count=None, status_tuple=None, indent=0):
"""Internal: Print a formatted check line with emoji and color."""
indent_str = " " * indent
if status_tuple:
status, color, emoji = status_tuple
if count is not None:
console.print(f"{indent_str}{emoji} [{color}]{message}: {count}[/{color}]")
else:
console.print(f"{indent_str}{emoji} [{color}]{message}[/{color}]")
else:
console.print(f"{indent_str}{message}")
def _calculate_detail_counters_with_ap(inclusions_list, org_id=None):
"""Internal: Calculate actual counters from inclusions detail with AP (prematurely terminated) handling.
Rules:
- If status ends with ' - AP': increment prematurely_terminated
- Else if starts with 'pré-incluse': increment preincluded
- Else if starts with 'incluse': increment included
- Always increment patients
"""
patients = 0
preincluded = 0
included = 0
prematurely_terminated = 0
for inclusion in inclusions_list:
# Filter by organization if specified
if org_id:
inc_org_id = get_nested_value(inclusion, ["Patient_Identification", "Organisation_Id"])
if inc_org_id != org_id:
continue
patients += 1
status = get_nested_value(inclusion, ["Inclusion", "Inclusion_Status"], default="")
if isinstance(status, str):
# Check if status ends with ' - AP' (prematurely terminated)
if status.endswith(" - AP"):
prematurely_terminated += 1
# Otherwise apply the normal classification
elif status.lower().startswith("pré-incluse"):
preincluded += 1
elif status.lower().startswith("incluse"):
included += 1
return patients, preincluded, included, prematurely_terminated
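# Example of the AP rule above (status strings are illustrative):
#   "Incluse - AP"        -> prematurely_terminated (the " - AP" suffix wins)
#   "Pré-incluse"         -> preincluded
#   "Incluse (confirmée)" -> included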
# Main coherence check logic
console.print("\n[bold]═══ Coherence Check ═══[/bold]\n")
# Calculate total counters
total_stats = {
'patients': sum(org.get('patients_count', 0) for org in organizations_list),
'preincluded': sum(org.get('preincluded_count', 0) for org in organizations_list),
'included': sum(org.get('included_count', 0) for org in organizations_list),
'prematurely_terminated': sum(org.get('prematurely_terminated_count', 0) for org in organizations_list)
}
total_detail_tuple = _calculate_detail_counters_with_ap(output_inclusions)
total_detail = {
'patients': total_detail_tuple[0],
'preincluded': total_detail_tuple[1],
'included': total_detail_tuple[2],
'prematurely_terminated': total_detail_tuple[3]
}
# Check total (4 counters must match)
total_ok = (total_stats['patients'] == total_detail['patients'] and
total_stats['preincluded'] == total_detail['preincluded'] and
total_stats['included'] == total_detail['included'] and
total_stats['prematurely_terminated'] == total_detail['prematurely_terminated'])
total_status = _get_status_and_style(0 if total_ok else 1, 0, 0)
message = (f"TOTAL - Stats({total_stats['patients']}/{total_stats['preincluded']}/{total_stats['included']}/{total_stats['prematurely_terminated']}) "
f"vs Detail({total_detail['patients']}/{total_detail['preincluded']}/{total_detail['included']}/{total_detail['prematurely_terminated']})")
_print_check_line(message, status_tuple=total_status, indent=0)
# Check each organization (only display if not OK)
for org in organizations_list:
org_id = org.get('id')
org_name = org.get('name', 'Unknown')
org_stats = {
'patients': org.get('patients_count', 0),
'preincluded': org.get('preincluded_count', 0),
'included': org.get('included_count', 0),
'prematurely_terminated': org.get('prematurely_terminated_count', 0)
}
org_detail_tuple = _calculate_detail_counters_with_ap(output_inclusions, org_id)
org_detail = {
'patients': org_detail_tuple[0],
'preincluded': org_detail_tuple[1],
'included': org_detail_tuple[2],
'prematurely_terminated': org_detail_tuple[3]
}
org_ok = (org_stats['patients'] == org_detail['patients'] and
org_stats['preincluded'] == org_detail['preincluded'] and
org_stats['included'] == org_detail['included'] and
org_stats['prematurely_terminated'] == org_detail['prematurely_terminated'])
if not org_ok:
org_status = _get_status_and_style(1, 0, 0)
message = (f"{org_name} - Stats({org_stats['patients']}/{org_stats['preincluded']}/{org_stats['included']}/{org_stats['prematurely_terminated']}) "
f"vs Detail({org_detail['patients']}/{org_detail['preincluded']}/{org_detail['included']}/{org_detail['prematurely_terminated']})")
_print_check_line(message, status_tuple=org_status, indent=1)
return has_critical
# ============================================================================
# QUALITY CHECKS ORCHESTRATION
# ============================================================================
def run_quality_checks(current_inclusions, organizations_list, old_inclusions_filename, skip_coherence=False):
"""
Runs coherence and non-regression quality checks on inclusions data.
Args:
current_inclusions: Either a filename (str) to load inclusions from,
or a list of inclusion dictionaries (already in memory)
organizations_list: Either a filename (str) to load organizations from,
or a list of organization dictionaries (already in memory)
old_inclusions_filename: Filename of old inclusions for regression comparison
Must be a string (filename)
skip_coherence: If True, skip coherence check (default: False)
Returns:
Tuple of (has_coherence_critical, has_regression_critical)
Usage:
- Normal mode:
run_quality_checks(
current_inclusions=output_inclusions, # list (in memory)
organizations_list=organizations_list, # list (in memory)
old_inclusions_filename=INCLUSIONS_FILE_NAME # str (current file)
)
- Check-only mode:
run_quality_checks(
current_inclusions=INCLUSIONS_FILE_NAME, # str (current file)
organizations_list=ORGANIZATIONS_FILE_NAME, # str (organizations file)
old_inclusions_filename=get_old_filename(INCLUSIONS_FILE_NAME) # str (old file)
)
"""
global console, regression_check_config
# Auto-load regression config if not already loaded
if not regression_check_config:
if console is None:
console = Console()
load_regression_check_config(console)
console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")
# Load current_inclusions if it's a filename
if isinstance(current_inclusions, str):
current_inclusions_data = load_json_file(current_inclusions)
if current_inclusions_data is None:
console.print(f"[bold red]Error: Could not load current inclusions from '{current_inclusions}'[/bold red]")
return True, True # Return critical errors if can't load
elif isinstance(current_inclusions, list):
current_inclusions_data = current_inclusions
else:
console.print(f"[bold red]Error: current_inclusions must be either a filename (str) or a list of inclusions[/bold red]")
return True, True
# Load organizations and run coherence check (unless skipped)
has_coherence_critical = False
if not skip_coherence:
# Load organizations_list if it's a filename
if isinstance(organizations_list, str):
organizations_data = load_json_file(organizations_list)
if organizations_data is None:
console.print(f"[bold red]Error: Could not load organizations from '{organizations_list}'[/bold red]")
return True, True # Return critical errors if can't load
elif isinstance(organizations_list, list):
organizations_data = organizations_list
else:
console.print(f"[bold red]Error: organizations_list must be either a filename (str) or a list of organizations[/bold red]")
return True, True
# Run coherence check
has_coherence_critical = coherence_check(current_inclusions_data, organizations_data)
# Load and run non-regression check
has_regression_critical = non_regression_check(current_inclusions_data, old_inclusions_filename)
console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")
print()
return has_coherence_critical, has_regression_critical
# ============================================================================
# NON-REGRESSION CHECK
# ============================================================================
def non_regression_check(output_inclusions, old_inclusions_filename):
"""
Comprehensive config-driven non-regression check comparing current vs old inclusions.
Uses rules from regression_check_config loaded from Excel.
Returns True if any critical issue was found, False otherwise.
Args:
output_inclusions: Current inclusions data (list)
old_inclusions_filename: Filename of old inclusions JSON file to load
"""
# Display section header first
console.print("\n[bold]═══ Non Regression Check ═══[/bold]\n")
# Display loading message and load old inclusions file
console.print(f"[dim]Loading old inclusions from: {old_inclusions_filename}[/dim]")
old_inclusions = load_json_file(old_inclusions_filename)
if old_inclusions is None:
console.print(f"[yellow]⚠ No old inclusions file found at '{old_inclusions_filename}', skipping non-regression check[/yellow]")
return False
has_critical = False # Track critical status
# ========== INTERNAL UTILITY FUNCTIONS ==========
def _is_undefined(value):
"""Check if a value is considered undefined."""
return value in [None, "", "undefined"]
def _values_are_equal(val1, val2):
"""
Compare two values with special handling for undefined values.
- If both are undefined → considered equal
- Otherwise → strict equality
"""
if _is_undefined(val1) and _is_undefined(val2):
return True
return val1 == val2
def _apply_pipeline_step(checked_fields, action, field_selector, from_pattern, to_pattern):
"""Apply one pipeline step to checked_fields list IN-PLACE.
Modifies the is_checked status (5th element) of fields matching the selector
and transition pattern.
Args:
checked_fields: List of [group_name, field_name, old_val, new_val, is_checked]
MODIFIED IN-PLACE
action: "include" or "exclude"
field_selector: "*.*", "group.*", or "group.field"
from_pattern: "*undefined", "*defined", "*", or literal value
to_pattern: "*undefined", "*defined", "*", or literal value
Logic:
- For each field in checked_fields:
- If field matches selector AND transition matches:
- if action="include": set is_checked=True
- if action="exclude": set is_checked=False
- Otherwise: leave is_checked unchanged
Returns: None (modifies list in place)
"""
for i, field_record in enumerate(checked_fields):
group_name, field_name, old_val, new_val, is_checked = field_record
# Check if this step applies to this field
if not _field_selector_matches_pattern(field_selector, group_name, field_name):
continue
# Check if transition matches
if _transition_matches(old_val, new_val, from_pattern, to_pattern):
if action == "include":
checked_fields[i][4] = True
elif action == "exclude":
checked_fields[i][4] = False
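# Minimal sketch of one pipeline step (values are illustrative):
#   fields = [["Inclusion", "Inclusion_Status", "ongoing", None, False]]
#   _apply_pipeline_step(fields, "include", "Inclusion.*", "*defined", "*undefined")
#   # fields[0][4] is now True: "ongoing" -> None matches *defined -> *undefined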
def _transition_matches(old_val, new_val, expected_old, expected_new):
"""
Check if a transition matches with support for keywords.
Keywords supported (start with *):
- "*undefined": matches None, "", "undefined"
- "*defined": matches any defined value (NOT None, "", "undefined")
- "*": matches any value
All other values are treated as literal values and matched by exact equality.
Args:
old_val: Actual old value
new_val: Actual new value
expected_old: Expected old value or keyword (if starts with *)
expected_new: Expected new value or keyword (if starts with *)
Returns:
True if transition matches
"""
# Handle old value matching
if expected_old == "*undefined":
old_matches = old_val in [None, "", "undefined"]
elif expected_old == "*defined":
old_matches = old_val not in [None, "", "undefined"]
elif expected_old == "*":
old_matches = True
else:
# Literal value matching (exact equality)
old_matches = (old_val == expected_old)
# Handle new value matching
if expected_new == "*undefined":
new_matches = new_val in [None, "", "undefined"]
elif expected_new == "*defined":
new_matches = new_val not in [None, "", "undefined"]
elif expected_new == "*":
new_matches = True
else:
# Literal value matching (exact equality)
new_matches = (new_val == expected_new)
return old_matches and new_matches
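# A few illustrative results of the keyword matching above:
#   _transition_matches("", "Done", "*undefined", "*defined")  -> True
#   _transition_matches("Done", "Done", "*", "*")              -> True
#   _transition_matches("A", "B", "A", "C")                    -> False (literal mismatch)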
def _check_field_matches_exception(group_name, field_name, old_val, new_val, exception_spec):
"""
Check if a field matches an exception specification.
Now supports both single transitions and multiple transitions per exception.
Args:
group_name: Field group name
field_name: Field name
old_val: Old value
new_val: New value
exception_spec: Exception specification dict with "field" and "transition"
Examples:
Single: {"field": "Status", "transition": [false, true]}
Multiple: {"field": "Status", "transition": [[false, true], [true, false]]}
Returns:
True if the field and its transition match the exception
"""
if not isinstance(exception_spec, dict):
return False
exception_field = exception_spec.get("field")
exception_transition = exception_spec.get("transition")
if not exception_field or not exception_transition:
return False
# Parse field specification (format: "field_group.field_name" or just "field_name")
if "." in exception_field:
exc_group, exc_name = exception_field.split(".", 1)
# Must match both group and name
if exc_group != group_name or exc_name != field_name:
return False
else:
# Only field name specified, must match field name only
if exception_field != field_name:
return False
# Check if transition matches (now supports multiple transitions)
if not isinstance(exception_transition, list):
return False
# Check if this is array of arrays: [[old1, new1], [old2, new2], ...]
if exception_transition and isinstance(exception_transition[0], list):
# Multiple transitions
for trans_pair in exception_transition:
if len(trans_pair) != 2:
continue
expected_old, expected_new = trans_pair
if _transition_matches(old_val, new_val, expected_old, expected_new):
return True
return False
# Legacy support: single transition [old, new]
elif len(exception_transition) == 2 and not isinstance(exception_transition[0], list):
expected_old, expected_new = exception_transition
return _transition_matches(old_val, new_val, expected_old, expected_new)
return False
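# Illustrative exception match (group/field names are examples only):
#   spec = {"field": "Inclusion.Inclusion_Status", "transition": ["*defined", "*undefined"]}
#   _check_field_matches_exception("Inclusion", "Inclusion_Status", "ongoing", None, spec) -> True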
def _get_status_and_style(count, warning_threshold, critical_threshold):
"""Determine status level and visual style."""
nonlocal has_critical
if count > critical_threshold:
has_critical = True
return "CRITICAL", "red", ""
elif count > warning_threshold:
return "WARNING", "yellow", ""
else:
return "OK", "green", ""
def _print_block_header(title, status_tuple, indent=0):
"""Print block header with status."""
indent_str = " " * indent
status, color, emoji = status_tuple
console.print(f"{indent_str}{emoji} [{color}][bold]{title}[/bold][/{color}]")
def _print_check_line(message, count, status_tuple, indent=1):
"""Print a check line."""
indent_str = " " * indent
status, color, emoji = status_tuple
console.print(f"{indent_str}{emoji} [{color}]{message}: {count}[/{color}]")
def _calculate_block_status(line_statuses):
"""Calculate overall block status from line statuses."""
if any(s[0] == "CRITICAL" for s in line_statuses):
return ("CRITICAL", "red", "")
elif any(s[0] == "WARNING" for s in line_statuses):
return ("WARNING", "yellow", "")
else:
return ("OK", "green", "")
# ========== NEW FIELD SELECTION PIPELINE FUNCTIONS ==========
def _field_selector_matches_pattern(selector, group_name, field_name):
"""
Check if a field matches a field_selector pattern.
Patterns:
- "*.*": matches any field
- "group.*": matches any field in specific group
- "group.field": matches specific field
Args:
selector: Field selector pattern string
group_name: Actual group name
field_name: Actual field name
Returns:
True if matches, False otherwise
"""
if selector == "*.*":
return True
sel_group, sel_field = selector.split(".", 1)
# Check group part
if sel_group != "*" and sel_group != group_name:
return False
# Check field part
if sel_field == "*":
return True
return sel_field == field_name
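# Pattern matching examples for the selector grammar above (field names illustrative):
#   _field_selector_matches_pattern("*.*", "Inclusion", "Inclusion_Status")            -> True
#   _field_selector_matches_pattern("Inclusion.*", "Inclusion", "Inclusion_Status")    -> True
#   _field_selector_matches_pattern("Inclusion.Date", "Inclusion", "Inclusion_Status") -> False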
def _apply_field_selection_pipeline(all_fields, field_selection_config):
"""
Apply field_selection pipeline to build candidate_fields.
Args:
all_fields: List of (group_name, field_name) tuples available
field_selection_config: List of [action, field_selector] steps
Returns:
Set of (group_name, field_name) tuples matching pipeline
"""
# Start with empty set
candidate_fields = set()
# If None or empty, return empty (explicit requirement)
if not field_selection_config:
return candidate_fields
# Apply each pipeline step
for action, field_selector in field_selection_config:
for group_name, field_name in all_fields:
# Check if this field matches the selector
if _field_selector_matches_pattern(field_selector, group_name, field_name):
if action == "include":
candidate_fields.add((group_name, field_name))
elif action == "exclude":
candidate_fields.discard((group_name, field_name))
return candidate_fields
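# Pipeline order matters: later steps can undo earlier ones (illustrative fields):
#   all_fields = [("A", "x"), ("A", "y"), ("B", "z")]
#   steps      = [["include", "A.*"], ["exclude", "A.y"]]
#   _apply_field_selection_pipeline(all_fields, steps) -> {("A", "x")}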
def _get_key_field_from_new_inclusions_rule(rule, new_inclusions_list, old_inclusions_list):
"""
Determine key field by applying field_selection to first inclusion sample.
Logic:
1. Get first inclusion from new and old data (representative sample)
2. Apply field_selection pipeline to both (same as any rule)
3. Return first field that exists with value in BOTH inclusions
Assumes inclusion structure is stable across all inclusions (reasonable assumption
for database-backed data).
Args:
rule: "New Inclusions" rule with field_selection config
new_inclusions_list: List of new inclusions
old_inclusions_list: List of old inclusions
Returns:
(key_field_name, field_group) tuple
Raises:
ValueError: If lists empty or no valid key field found
"""
# Get first inclusion from each (representative sample of structure)
if not new_inclusions_list or not old_inclusions_list:
raise ValueError("Cannot determine key field: empty inclusion lists")
new_inc = new_inclusions_list[0] # First new inclusion
old_inc = old_inclusions_list[0] # First old inclusion
# Apply field_selection pipeline (SAME AS FOR ANY RULE!)
# This respects the full pipeline: include/exclude/wildcards
candidate_fields = _build_candidate_fields(new_inc, old_inc, rule.get("field_selection"))
if not candidate_fields:
raise ValueError(
f"field_selection produced no candidate fields. "
f"Config: {rule.get('field_selection')}"
)
# Try each candidate field in order (sorted for determinism)
# Return first field that has non-null value in both inclusions
for group_name, field_name in sorted(candidate_fields):
new_val = get_nested_value(new_inc, [group_name, field_name])
old_val = get_nested_value(old_inc, [group_name, field_name])
if new_val is not None and old_val is not None:
return field_name, group_name
# No valid key found
raise ValueError(
f"No field in field_selection has values in both first new and old inclusion. "
f"Candidates from pipeline: {candidate_fields}. "
f"Verify field_selection config or data has proper values."
)
def _build_inclusion_dict(inclusions_list, key_field, field_group="Patient_Identification"):
"""
Build dictionary indexed by key field.
Args:
inclusions_list: List of inclusion dicts
key_field: Field name to use as key (e.g., "Patient_Id", "Pseudo")
field_group: Group containing the key field (default: "Patient_Identification")
Returns:
Dict with key values as keys, inclusion dicts as values
"""
result = {}
for inclusion in inclusions_list:
key = get_nested_value(inclusion, [field_group, key_field])
if key:
result[key] = inclusion
return result
# ========== TRANSITION MATCHING FUNCTIONS ==========
def _matches_transition(old_val, new_val, transitions_config):
"""Check if (old_val, new_val) matches any configured transition.
Uses the helper function _transition_matches for consistency.
Supports keywords with asterisk prefix:
- *undefined: matches any undefined value (None, "", "undefined")
- *defined: matches any defined value (not None, "", or "undefined")
- *: wildcard, matches any value
All other values are treated as literal values and matched by exact equality.
"""
if transitions_config is None:
return False
for transition in transitions_config:
expected_old, expected_new = transition
if _transition_matches(old_val, new_val, expected_old, expected_new):
return True
return False
# ========== RULE PROCESSING FUNCTIONS ==========
def _process_special_rule(rule, line_label, new_dict, old_dict):
"""
Process special rules: "New Inclusions" and "Deleted Inclusions".
These rules simply count the number of keys present in one dict but not the other.
Args:
rule: Rule configuration (unused for counting, but kept for consistency)
line_label: The line label to identify which special rule this is
new_dict: Dictionary of new inclusions
old_dict: Dictionary of old inclusions
Returns:
Count of new or deleted inclusions
"""
if line_label == "New Inclusions":
return len(set(new_dict.keys()) - set(old_dict.keys()))
elif line_label == "Deleted Inclusions":
return len(set(old_dict.keys()) - set(new_dict.keys()))
else:
# Should not happen, but return 0 for safety
return 0
def _process_new_deleted_fields(line_label, new_dict, old_dict):
"""
Process special rules: "New Fields" and "Deleted Fields".
These rules collect all fields that appear/disappear in inclusions, using
qualified names "group.field" to distinguish fields across different groups.
Note: field_selection is NOT used for these rules (must be empty).
Returns a list of tuples: [(field_qualified_name, count_of_inclusions), ...]
where count_of_inclusions is the number of inclusions that have this field added/removed.
Args:
line_label: "New Fields" or "Deleted Fields"
new_dict: Dictionary of new inclusions
old_dict: Dictionary of old inclusions
Returns:
List of (qualified_field_name, inclusion_count) tuples
"""
# Collect field changes across all common inclusions
field_counts = {} # qualified_field_name -> count of inclusions
# Only examine common inclusions (present in both versions)
# Sort for deterministic processing
common_keys = sorted(set(new_dict.keys()) & set(old_dict.keys()))
for key in common_keys:
new_inc = new_dict[key]
old_inc = old_dict[key]
# Get all groups from both versions
# Sort for deterministic processing
all_groups = sorted(set(new_inc.keys()) | set(old_inc.keys()))
for group_name in all_groups:
new_group = new_inc.get(group_name, {})
old_group = old_inc.get(group_name, {})
if not isinstance(new_group, dict):
new_group = {}
if not isinstance(old_group, dict):
old_group = {}
new_fields = set(new_group.keys())
old_fields = set(old_group.keys())
# Determine which fields to count based on line_label
if line_label == "New Fields":
changed_fields = sorted(new_fields - old_fields)
elif line_label == "Deleted Fields":
changed_fields = sorted(old_fields - new_fields)
else:
changed_fields = []
# Count each changed field with qualified name (sorted for determinism)
for field_name in changed_fields:
qualified_name = f"{group_name}.{field_name}"
field_counts[qualified_name] = field_counts.get(qualified_name, 0) + 1
# Convert to list of tuples and sort by count (descending) then by name
result = sorted(field_counts.items(), key=lambda x: (-x[1], x[0]))
return result
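# Illustrative return value (qualified field names are examples only):
#   [("Inclusion.Follow_Up_Date", 12), ("Patient_Identification.Consent", 3)]
# meaning 12 common inclusions gained/lost Inclusion.Follow_Up_Date and 3 gained/lost
# Patient_Identification.Consent, sorted by count descending, then by name.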
def _build_candidate_fields(new_inc, old_inc, field_selection_config):
"""
Helper function to build candidate fields using field_selection pipeline.
Args:
new_inc: New inclusion dict
old_inc: Old inclusion dict
field_selection_config: List of [action, field_selector] pipeline steps
Returns:
Sorted list of (group_name, field_name) tuples that exist in both versions
"""
# Step 1: Collect all available fields from both versions
common_groups = sorted(set(new_inc.keys()) & set(old_inc.keys()))
all_available_fields = []
for group_name in common_groups:
new_group = new_inc.get(group_name, {})
old_group = old_inc.get(group_name, {})
if not isinstance(new_group, dict):
new_group = {}
if not isinstance(old_group, dict):
old_group = {}
# Only fields that exist in both versions
common_field_names = sorted(set(new_group.keys()) & set(old_group.keys()))
for field_name in common_field_names:
all_available_fields.append((group_name, field_name))
# Step 2: Apply field_selection pipeline
if not field_selection_config:
return []
candidate_fields = _apply_field_selection_pipeline(
all_available_fields,
field_selection_config
)
return sorted(candidate_fields, key=lambda x: (x[0], x[1]))
def _process_rule(rule, new_dict, old_dict):
"""
Process a single regression check rule with correct 4-step logic.
Logic:
1. Build candidate fields using field_selection pipeline
2. For each changed field, check if transition matches → mark as "checked"
3. Apply transitions pipeline steps → modify "checked" status
4. Apply bloc_scope (all/any) → count inclusion
Only processes common_keys (inclusions present in both new and old dicts).
Args:
rule: Rule configuration dict
new_dict: Dict of new inclusions indexed by key field
old_dict: Dict of old inclusions indexed by key field
Returns:
Tuple of (count, details_list) where:
- count: Number of matching inclusions
- details_list: List of (inclusion_key, field_changes) tuples for DEBUG_MODE
field_changes is list of (group.field, old_val, new_val) tuples
"""
# Check for config errors first
if rule.get("_config_error"):
return 0, []
field_selection_config = rule.get("field_selection")
bloc_scope = rule.get("bloc_scope") or "any"
# Only process inclusions present in both versions
common_keys = sorted(set(new_dict.keys()) & set(old_dict.keys()))
matching_inclusions_count = 0
details_list = [] # For DEBUG_MODE
for key in common_keys:
new_inc = new_dict[key]
old_inc = old_dict[key]
# Step 1: Build candidate fields using field_selection pipeline
candidate_fields = _build_candidate_fields(new_inc, old_inc, field_selection_config)
# If no candidate fields, skip this inclusion
if not candidate_fields:
continue
# Step 2 & 3: Build initial field list and apply transitions pipeline
# Initialize field list with all changed fields
# Format: [group_name, field_name, old_val, new_val, is_checked]
all_fields_list = []
changed_fields = [] # Track for bloc_scope="all" logic
for group_name, field_name in candidate_fields:
new_val = get_nested_value(new_inc, [group_name, field_name])
old_val = get_nested_value(old_inc, [group_name, field_name])
# Track if field has changed (for bloc_scope="all" logic)
field_has_changed = not _values_are_equal(old_val, new_val)
if field_has_changed:
changed_fields.append((group_name, field_name))
# Add to all_fields_list with is_checked=False initially
all_fields_list.append([group_name, field_name, old_val, new_val, False])
# Apply transitions pipeline: each step modifies is_checked in-place
transitions_config = rule.get("transitions", [])
if transitions_config and isinstance(transitions_config, list):
for action, field_selector, from_val, to_val in transitions_config:
_apply_pipeline_step(all_fields_list, action, field_selector, from_val, to_val)
# Extract final checked fields
checked_fields = [(f[0], f[1], f[2], f[3]) for f in all_fields_list if f[4]]
# Step 4: Apply bloc_scope logic
inclusion_matches = False
if bloc_scope == "all":
# ALL fields that CHANGED must match the transition pattern
# (unchanged fields don't block the rule)
if len(changed_fields) > 0 and len(checked_fields) == len(changed_fields):
inclusion_matches = True
else: # bloc_scope == "any"
# AT LEAST ONE field must be checked
if len(checked_fields) > 0:
inclusion_matches = True
if inclusion_matches:
matching_inclusions_count += 1
# Collect details for debug_mode
if debug_mode and checked_fields:
field_changes = [(f"{gn}.{fn}", ov, nv) for gn, fn, ov, nv in checked_fields]
details_list.append((key, field_changes))
return matching_inclusions_count, details_list
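# Minimal sketch of a rule dict as read by _process_rule (names and values illustrative;
# the real config rows also carry bloc_title, line_label and thresholds):
#   rule = {
#       "field_selection": [["include", "Inclusion.*"]],
#       "transitions": [["include", "*.*", "*defined", "*undefined"]],
#       "bloc_scope": "any",
#   }
#   # count = number of common inclusions where at least one Inclusion.* field
#   # went from a defined value to an undefined one.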
# ========== MAIN LOGIC ==========
# Determine key field from "New Inclusions" rule config
key_field = None
field_group = None
for rule in regression_check_config:
if rule.get("line_label") == "New Inclusions":
try:
key_field, field_group = _get_key_field_from_new_inclusions_rule(
rule,
output_inclusions,
old_inclusions
)
break
except ValueError as e:
console.print(f"[bold red]Error determining key field: {e}[/bold red]")
return True # Critical error, trigger user confirmation
if not key_field:
console.print("[bold red]Error: 'New Inclusions' rule not found or has no valid field_selection[/bold red]")
return True # Critical error, trigger user confirmation
console.print(f"[dim]Using key field: {field_group}.{key_field}[/dim]\n")
new_dict = _build_inclusion_dict(output_inclusions, key_field, field_group)
old_dict = _build_inclusion_dict(old_inclusions, key_field, field_group)
# Group rules by bloc_title, preserving order of first appearance in regression_check_config
blocs = {}
bloc_order = [] # Track order of first appearance
for rule in regression_check_config:
bloc_title = rule["bloc_title"]
if bloc_title not in blocs:
blocs[bloc_title] = []
bloc_order.append(bloc_title)
blocs[bloc_title].append(rule)
# Process each bloc in order of first appearance
for bloc_title in bloc_order:
rules = blocs[bloc_title]
line_results = []
for rule in rules:
line_label = rule["line_label"]
warning_threshold = rule["warning_threshold"]
critical_threshold = rule["critical_threshold"]
# Detect special rules and route to appropriate processing function
if line_label in ["New Inclusions", "Deleted Inclusions"]:
# Special rules: just count new/deleted keys
count = _process_special_rule(rule, line_label, new_dict, old_dict)
line_results.append((line_label, count, None, "simple")) # type: simple count
elif line_label in ["New Fields", "Deleted Fields"]:
# Special rules: collect field-by-field details
field_list = _process_new_deleted_fields(line_label, new_dict, old_dict)
# Count is the number of fields detected
count = len(field_list)
line_results.append((line_label, count, field_list, "fields")) # type: field list
else:
# Normal rules: apply 4-step logic
count, details = _process_rule(rule, new_dict, old_dict)
line_results.append((line_label, count, details, "details")) # type: inclusion details
# Calculate status for each line now that we have counts
line_results_with_status = []
for line_label, count, data, result_type in line_results:
# Find the rule to get thresholds
rule = next(r for r in rules if r["line_label"] == line_label)
warning_threshold = rule["warning_threshold"]
critical_threshold = rule["critical_threshold"]
status_tuple = _get_status_and_style(count, warning_threshold, critical_threshold)
line_results_with_status.append((line_label, count, data, result_type, status_tuple))
# Calculate bloc status
bloc_status = _calculate_block_status([result[4] for result in line_results_with_status])
# Display bloc header
_print_block_header(bloc_title, bloc_status, indent=0)
# Display lines based on bloc and status
for line_label, count, data, result_type, status_tuple in line_results_with_status:
# Structure bloc shows everything, others only show non-OK lines
should_display = (bloc_title == "Structure") or (status_tuple[0] != "OK")
if should_display:
if result_type == "fields":
# Display field list with title and sub-items
_print_check_line(line_label, count, status_tuple, indent=1)
# Display each field as a sub-item
for field_name, inclusion_count in data:
console.print(f" {field_name} ({inclusion_count} inclusions)")
elif result_type == "details":
# Display count
_print_check_line(line_label, count, status_tuple, indent=1)
# Display detailed changes if debug_mode is enabled and data exists
if debug_mode and data and len(data) > 0:
for inclusion_key, field_changes in data:
console.print(f" [dim]{key_field}: {inclusion_key}[/dim]")
for qualified_field, old_val, new_val in field_changes:
# Format values for display
old_display = f"'{old_val}'" if isinstance(old_val, str) else str(old_val)
new_display = f"'{new_val}'" if isinstance(new_val, str) else str(new_val)
console.print(f" - {qualified_field}: {old_display}{new_display}")
else:
# Simple count display
_print_check_line(line_label, count, status_tuple, indent=1)
console.print()
return has_critical