"""
|
|
DO Dashboard - Quality Checks Module
|
|
|
|
This module contains all quality assurance functions:
|
|
- JSON file loading and backup utilities
|
|
- Comprehensive non-regression checks with configurable rules
|
|
- Config-driven validation with Warning/Critical thresholds
|
|
- Support for special rules (New/Deleted Requests, New/Deleted Fields)
|
|
- 4-step logic for normal rules (field selection, transition matching, exception application, bloc_scope)
|
|
|
|
Note: Coherence check is not applicable for DO Dashboard since organization
|
|
counters are computed directly from request details (not from a separate API).
|
|
"""
|
|
|
|
# Standard library
import json
import logging
import os
import shutil

# Third-party
import openpyxl
from rich.console import Console

# Project-local helpers and shared constants
from do_dashboard_utils import get_nested_value, get_old_filename as _get_old_filename, get_config_path
from do_dashboard_constants import (
    REQUESTS_FILE_NAME,
    ORGANIZATIONS_FILE_NAME,
    OLD_FILE_SUFFIX,
    DASHBOARD_CONFIG_FILE_NAME,
    REGRESSION_CHECK_TABLE_NAME
)
|
|
|
|
|
|
# ============================================================================
# MODULE CONFIGURATION
# ============================================================================

# Debug mode: Set to True to display detailed changes for each regression check rule.
# Toggled at runtime by enable_debug_mode(); read by non_regression_check().
debug_mode = False
def enable_debug_mode():
    """Turn on debug mode so regression check rules report their detailed changes."""
    global debug_mode
    debug_mode = True
    # Announce the switch only when a console has already been injected/created.
    if console:
        console.print("[dim]DEBUG MODE enabled - detailed changes will be displayed[/dim]")
|
|
|
|
|
|
# ============================================================================
# MODULE DEPENDENCIES (injected from main module)
# ============================================================================

# Will be injected by the main module via set_dependencies()
console = None

# Regression check config is loaded on-demand via load_regression_check_config()
regression_check_config = []

# NOTE: File names and table names are imported from do_dashboard_constants.py (SINGLE SOURCE OF TRUTH):
# - REQUESTS_FILE_NAME
# - ORGANIZATIONS_FILE_NAME
# - OLD_FILE_SUFFIX
# - DASHBOARD_CONFIG_FILE_NAME
# - REGRESSION_CHECK_TABLE_NAME
|
def set_dependencies(console_instance):
    """Inject the shared Rich console used by this module for formatted output.

    Args:
        console_instance: Rich Console instance for formatted output
    """
    global console
    console = console_instance
|
|
|
|
|
|
# ============================================================================
|
|
# CONFIGURATION LOADING
|
|
# ============================================================================
|
|
|
|
def load_regression_check_config(console_instance=None):
    """Loads and validates the regression check configuration from the Excel file.

    Reads the REGRESSION_CHECK_TABLE_NAME sheet of the dashboard config workbook,
    validates each rule row, and stores the accepted rules in the module-level
    ``regression_check_config`` list.

    Validation has two severities:
    - Hard errors (missing file/sheet, missing line_label, bad thresholds,
      invalid JSON, invalid bloc_scope) log CRITICAL and raise Exception.
    - Soft format errors (malformed field_selection / transitions pipelines)
      print a warning and tag the rule with ``_config_error`` so it is
      skipped at check time instead of aborting the whole load.

    Args:
        console_instance: Optional Rich Console instance. If not provided, uses global console.
    """
    global regression_check_config, console

    if console_instance:
        console = console_instance

    config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)

    try:
        workbook = openpyxl.load_workbook(config_path)
    except FileNotFoundError:
        error_msg = f"Error: Configuration file not found at: {config_path}"
        logging.critical(error_msg)
        console.print(f"[bold red]{error_msg}[/bold red]")
        raise Exception(error_msg)

    if REGRESSION_CHECK_TABLE_NAME not in workbook.sheetnames:
        error_msg = f"Error: Sheet '{REGRESSION_CHECK_TABLE_NAME}' not found in the configuration file."
        logging.critical(error_msg)
        console.print(f"[bold red]{error_msg}[/bold red]")
        raise Exception(error_msg)

    sheet = workbook[REGRESSION_CHECK_TABLE_NAME]
    # Row 1 holds the column names; each data row becomes a dict keyed by them.
    headers = [cell.value for cell in sheet[1]]

    temp_config = []

    # row_index starts at 2 so error messages match the Excel row numbers.
    for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2):
        rule_config = dict(zip(headers, row))

        # Skip if ignore column contains "ignore" (case insensitive)
        ignore_value = rule_config.get("ignore")
        if ignore_value and isinstance(ignore_value, str) and "ignore" in ignore_value.lower():
            continue

        # Skip if all columns are None (empty row)
        if all(value is None for value in row):
            continue

        # Validate bloc_title and line_label
        bloc_title = rule_config.get("bloc_title")
        line_label = rule_config.get("line_label")

        if not bloc_title or not isinstance(bloc_title, str):
            continue  # Skip rows without bloc_title (header separators, etc.)

        if not line_label or not isinstance(line_label, str):
            error_msg = f"Error in Regression_Check config, row {row_index}: 'line_label' is mandatory when 'bloc_title' is specified."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        # Validate thresholds (both mandatory, numeric, non-negative)
        warning_threshold = rule_config.get("warning_threshold")
        critical_threshold = rule_config.get("critical_threshold")

        if warning_threshold is None or not isinstance(warning_threshold, (int, float)) or warning_threshold < 0:
            error_msg = f"Error in Regression_Check config, row {row_index}: 'warning_threshold' must be a number >= 0."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        if critical_threshold is None or not isinstance(critical_threshold, (int, float)) or critical_threshold < 0:
            error_msg = f"Error in Regression_Check config, row {row_index}: 'critical_threshold' must be a number >= 0."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        # Parse JSON fields (stored as JSON text in the Excel cells)
        for json_field in ["field_selection", "transitions"]:
            value = rule_config.get(json_field)
            if value and isinstance(value, str):
                try:
                    rule_config[json_field] = json.loads(value)
                except json.JSONDecodeError:
                    error_msg = f"Error in Regression_Check config, row {row_index}, field '{json_field}': Invalid JSON format."
                    logging.critical(error_msg)
                    console.print(f"[bold red]{error_msg}[/bold red]")
                    raise Exception(error_msg)
            elif value is None:
                rule_config[json_field] = None

        # Validate field_selection format
        field_selection = rule_config.get("field_selection")

        # Special rules that don't use field_selection
        special_rules_no_selection = ["New Fields", "Deleted Fields", "Deleted Requests"]

        if line_label not in special_rules_no_selection:
            # Standard rules and "New Requests" MUST have field_selection
            if field_selection is None:
                error_msg = f"Error in Regression_Check config, row {row_index}: 'field_selection' is mandatory for rule '{line_label}'."
                logging.critical(error_msg)
                console.print(f"[bold red]{error_msg}[/bold red]")
                raise Exception(error_msg)

            if not isinstance(field_selection, list):
                console.print(f"[yellow]⚠ Row {row_index}: 'field_selection' must be a JSON array of [action, selector] pairs, skipping rule[/yellow]")
                rule_config["_config_error"] = True
            else:
                # Each pipeline step is [action, selector]; stop at the first bad step.
                for step_idx, step in enumerate(field_selection):
                    if not isinstance(step, list) or len(step) != 2:
                        console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] must be array of 2 elements [action, selector], skipping rule[/yellow]")
                        rule_config["_config_error"] = True
                        break

                    action, field_selector = step

                    if action not in ["include", "exclude"]:
                        console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping rule[/yellow]")
                        rule_config["_config_error"] = True
                        break

                    if not isinstance(field_selector, str) or "." not in field_selector:
                        console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] selector must be string with dot notation (e.g., '*.*', 'group.*', 'group.field'), got '{field_selector}', skipping rule[/yellow]")
                        rule_config["_config_error"] = True
                        break
        else:
            # Special rules must leave field_selection empty.
            if field_selection is not None and field_selection != [] and field_selection != "":
                console.print(f"[yellow]⚠ Row {row_index}: Special rule '{line_label}' should have empty field_selection, got {field_selection}[/yellow]")
                rule_config["_config_error"] = True

        # Validate bloc_scope ("all" = every changed field must match, "any" = at least one)
        bloc_scope = rule_config.get("bloc_scope")
        if bloc_scope is not None and bloc_scope not in ["all", "any"]:
            error_msg = f"Error in Regression_Check config, row {row_index}: 'bloc_scope' must be 'all' or 'any'."
            logging.critical(error_msg)
            console.print(f"[bold red]{error_msg}[/bold red]")
            raise Exception(error_msg)

        # Validate transitions format ([action, field_selector, from, to] steps)
        transitions = rule_config.get("transitions")
        config_error = False

        if transitions is not None:
            if not isinstance(transitions, list):
                console.print(f"[yellow]⚠ Row {row_index}: 'transitions' must be a JSON array, skipping this rule[/yellow]")
                config_error = True
            else:
                for step_idx, transition_step in enumerate(transitions):
                    if not isinstance(transition_step, list) or len(transition_step) != 4:
                        console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] must be array of 4 elements [action, field_selector, from, to], skipping[/yellow]")
                        config_error = True
                        break

                    action, field_selector, from_val, to_val = transition_step

                    if action not in ["include", "exclude"]:
                        console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping[/yellow]")
                        config_error = True
                        break

                    if not isinstance(field_selector, str) or "." not in field_selector:
                        console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] field_selector must be string with dot notation, got '{field_selector}', skipping[/yellow]")
                        config_error = True
                        break

        if config_error:
            rule_config["_config_error"] = True

        # Rule accepted (possibly tagged with _config_error for soft problems).
        temp_config.append(rule_config)

    regression_check_config = temp_config

    console.print(f"Loaded {len(regression_check_config)} regression check rules.", style="green")
|
|
|
|
|
|
def run_check_only_mode(sys_argv):
    """
    Orchestrates CHECK_ONLY and CHECK_ONLY_COMPARE modes.

    - CHECK_ONLY: Full non-regression validation on existing files
    - CHECK_ONLY_COMPARE: Regression-only comparison of two specific files
      (invoked as: --check-only <current_file> <old_file>)

    Both modes share the same tail: load the regression rules, run the
    quality checks, and print the overall verdict. Only the banner and the
    pair of files being compared differ, so that part is selected up front
    and the rest is done once (previously duplicated in both branches).

    Args:
        sys_argv: sys.argv from main script (to parse command-line arguments)
    """
    global console

    if console is None:
        console = Console()

    print()

    # Detect CHECK_ONLY_COMPARE mode: --check-only <file1> <file2>
    if len(sys_argv) >= 4:
        current_file = sys_argv[2]
        old_file = sys_argv[3]

        console.print("[bold cyan]═══ CHECK ONLY COMPARE MODE ═══[/bold cyan]")
        console.print("Comparing two specific files:\n")
        console.print(f" Current: [bold]{current_file}[/bold]")
        console.print(f" Old: [bold]{old_file}[/bold]\n")
    else:
        console.print("[bold cyan]═══ CHECK ONLY MODE ═══[/bold cyan]")
        console.print("Running quality checks on existing data files without collecting new data.\n")
        current_file = REQUESTS_FILE_NAME
        old_file = _get_old_filename(REQUESTS_FILE_NAME, OLD_FILE_SUFFIX)

    print()
    load_regression_check_config(console)

    print()
    has_regression_critical = run_quality_checks(
        current_requests=current_file,
        old_requests_filename=old_file
    )

    if has_regression_critical:
        console.print("[bold red]✗ CRITICAL issues detected![/bold red]")
    else:
        console.print("[bold green]✓ All checks passed successfully![/bold green]")
|
|
|
|
|
|
# ============================================================================
|
|
# FILE UTILITIES
|
|
# ============================================================================
|
|
|
|
def load_json_file(filename):
    """
    Loads a JSON file (requests, organizations, or any JSON data).

    Args:
        filename: Path of the JSON file to read (expected UTF-8 encoded).

    Returns:
        The parsed JSON data, or None if the file doesn't exist or an error occurred.
    """
    if not os.path.exists(filename):
        return None
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        # Fix: include the actual filename in the message (it previously
        # printed the literal placeholder "(unknown)").
        logging.warning(f"Could not load JSON file '{filename}': {e}")
        # Guard: console may still be None if set_dependencies() was not called.
        if console:
            console.print(f"[yellow]⚠ Warning: Could not load JSON file '{filename}': {e}[/yellow]")
        return None
|
|
|
|
|
|
def backup_output_files():
    """
    Silently backups current output files before writing new versions.
    Called AFTER all checks pass to avoid losing history on crash.
    """
    # Copy each existing output file (with metadata) to its ".old" counterpart;
    # a failed copy is logged but never interrupts the caller.
    for source in (REQUESTS_FILE_NAME, ORGANIZATIONS_FILE_NAME):
        if not os.path.exists(source):
            continue
        destination = _get_old_filename(source, OLD_FILE_SUFFIX)
        try:
            shutil.copy2(source, destination)
        except Exception as e:
            logging.warning(f"Could not backup {source}: {e}")
|
|
|
|
|
|
# ============================================================================
|
|
# QUALITY CHECKS ORCHESTRATION
|
|
# ============================================================================
|
|
|
|
def run_quality_checks(current_requests, old_requests_filename):
    """
    Runs non-regression quality checks on requests data.

    Note: Coherence check is not applicable for DO Dashboard since organization
    counters are computed from request details, not from a separate API.

    Args:
        current_requests: Either a filename (str) to load requests from,
                          or a list of request dictionaries (already in memory)
        old_requests_filename: Filename of old requests for regression comparison (str)

    Returns:
        has_regression_critical (bool) - True on critical findings or on any
        input/loading error.

    Usage:
        - Normal mode: pass the in-memory list plus the current file on disk.
        - Check-only mode: pass the current filename plus the old filename.
    """
    global console, regression_check_config

    # Lazily create a console and load the rule config on first use.
    if not regression_check_config:
        if console is None:
            console = Console()
        load_regression_check_config(console)

    console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")

    # Accept either an in-memory list or a filename; anything else is an error.
    if isinstance(current_requests, list):
        requests_data = current_requests
    elif isinstance(current_requests, str):
        requests_data = load_json_file(current_requests)
        if requests_data is None:
            console.print(f"[bold red]Error: Could not load current requests from '{current_requests}'[/bold red]")
            return True
    else:
        console.print(f"[bold red]Error: current_requests must be either a filename (str) or a list of requests[/bold red]")
        return True

    # Run non-regression check
    has_regression_critical = non_regression_check(requests_data, old_requests_filename)

    console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")
    print()

    return has_regression_critical
|
|
|
|
|
|
# ============================================================================
|
|
# NON-REGRESSION CHECK
|
|
# ============================================================================
|
|
|
|
def non_regression_check(output_requests, old_requests_filename):
    """
    Comprehensive config-driven non-regression check comparing current vs old requests.
    Uses rules from regression_check_config loaded from Excel.
    Returns True if any critical issue was found, False otherwise.

    Flow:
    1. Derive the request key field from the "New Requests" rule's field_selection.
    2. Index both datasets by that key.
    3. Group rules by bloc_title and evaluate each rule (special rules for
       New/Deleted Requests/Fields, 4-step pipeline for normal rules).
    4. Print per-line and per-bloc status against warning/critical thresholds.

    Args:
        output_requests: Current requests data (list)
        old_requests_filename: Filename of old requests JSON file to load
    """
    console.print("\n[bold]═══ Non Regression Check ═══[/bold]\n")

    console.print(f"[dim]Loading old requests from: {old_requests_filename}[/dim]")
    old_requests = load_json_file(old_requests_filename)

    # No baseline means nothing to compare against; this is not a failure.
    if old_requests is None:
        console.print(f"[yellow]⚠ No old requests file found at '{old_requests_filename}', skipping non-regression check[/yellow]")
        return False

    # Flipped to True by _get_status_and_style whenever any count exceeds
    # its critical threshold; this is the function's return value.
    has_critical = False

    # ========== INTERNAL UTILITY FUNCTIONS ==========

    def _is_undefined(value):
        # None, empty string and the literal "undefined" are all treated as "no value".
        return value in [None, "", "undefined"]

    def _values_are_equal(val1, val2):
        # Two "undefined" values count as equal regardless of representation.
        if _is_undefined(val1) and _is_undefined(val2):
            return True
        return val1 == val2

    def _apply_pipeline_step(checked_fields, action, field_selector, from_pattern, to_pattern):
        # One transitions-pipeline step: toggle the is_checked flag (index 4)
        # of every field record matching both the selector and the transition.
        for i, field_record in enumerate(checked_fields):
            group_name, field_name, old_val, new_val, is_checked = field_record
            if not _field_selector_matches_pattern(field_selector, group_name, field_name):
                continue
            if _transition_matches(old_val, new_val, from_pattern, to_pattern):
                if action == "include":
                    checked_fields[i][4] = True
                elif action == "exclude":
                    checked_fields[i][4] = False

    def _transition_matches(old_val, new_val, expected_old, expected_new):
        # Match one (old -> new) value transition. Patterns: "*undefined",
        # "*defined", "*" (anything), or a literal value to compare with ==.
        if expected_old == "*undefined":
            old_matches = old_val in [None, "", "undefined"]
        elif expected_old == "*defined":
            old_matches = old_val not in [None, "", "undefined"]
        elif expected_old == "*":
            old_matches = True
        else:
            old_matches = (old_val == expected_old)

        if expected_new == "*undefined":
            new_matches = new_val in [None, "", "undefined"]
        elif expected_new == "*defined":
            new_matches = new_val not in [None, "", "undefined"]
        elif expected_new == "*":
            new_matches = True
        else:
            new_matches = (new_val == expected_new)

        return old_matches and new_matches

    def _get_status_and_style(count, warning_threshold, critical_threshold):
        # Map a count to (status, color, emoji); side effect: latches
        # has_critical for the whole check when the critical threshold is crossed.
        nonlocal has_critical
        if count > critical_threshold:
            has_critical = True
            return "CRITICAL", "red", "✗"
        elif count > warning_threshold:
            return "WARNING", "yellow", "⚠"
        else:
            return "OK", "green", "✓"

    def _print_block_header(title, status_tuple, indent=0):
        # Print the bloc title colored/emojied according to its aggregate status.
        indent_str = " " * indent
        status, color, emoji = status_tuple
        console.print(f"{indent_str}{emoji} [{color}][bold]{title}[/bold][/{color}]")

    def _print_check_line(message, count, status_tuple, indent=1):
        # Print one rule line ("label: count") in its status color.
        indent_str = " " * indent
        status, color, emoji = status_tuple
        console.print(f"{indent_str}{emoji} [{color}]{message}: {count}[/{color}]")

    def _calculate_block_status(line_statuses):
        # A bloc is as bad as its worst line: CRITICAL > WARNING > OK.
        if any(s[0] == "CRITICAL" for s in line_statuses):
            return ("CRITICAL", "red", "✗")
        elif any(s[0] == "WARNING" for s in line_statuses):
            return ("WARNING", "yellow", "⚠")
        else:
            return ("OK", "green", "✓")

    def _field_selector_matches_pattern(selector, group_name, field_name):
        # Dot-notation selector: "*.*", "group.*", "*.field" or "group.field".
        if selector == "*.*":
            return True
        sel_group, sel_field = selector.split(".", 1)
        if sel_group != "*" and sel_group != group_name:
            return False
        if sel_field == "*":
            return True
        return sel_field == field_name

    def _apply_field_selection_pipeline(all_fields, field_selection_config):
        # Apply the ordered include/exclude steps to the (group, field) universe;
        # later steps can re-include or re-exclude earlier selections.
        candidate_fields = set()
        if not field_selection_config:
            return candidate_fields
        for action, field_selector in field_selection_config:
            for group_name, field_name in all_fields:
                if _field_selector_matches_pattern(field_selector, group_name, field_name):
                    if action == "include":
                        candidate_fields.add((group_name, field_name))
                    elif action == "exclude":
                        candidate_fields.discard((group_name, field_name))
        return candidate_fields

    def _get_key_field_from_new_requests_rule(rule, new_requests_list, old_requests_list):
        # Derive the request-identity field from the "New Requests" rule:
        # first candidate (in config order) populated in both first records.
        if not new_requests_list or not old_requests_list:
            raise ValueError("Cannot determine key field: empty request lists")

        new_req = new_requests_list[0]
        old_req = old_requests_list[0]

        candidate_fields = _build_candidate_fields(new_req, old_req, rule.get("field_selection"))

        if not candidate_fields:
            raise ValueError(
                f"field_selection produced no candidate fields. "
                f"Config: {rule.get('field_selection')}"
            )

        # Iterate in config order (field_selection), not alphabetically
        field_selection_config = rule.get("field_selection") or []
        ordered_candidates = []
        seen = set()
        for _action, selector in field_selection_config:
            sel_group, sel_field = selector.split(".", 1)
            for (group_name, field_name) in candidate_fields:
                if (group_name, field_name) in seen:
                    continue
                if (sel_group in ("*", group_name)) and (sel_field in ("*", field_name)):
                    ordered_candidates.append((group_name, field_name))
                    seen.add((group_name, field_name))

        for group_name, field_name in ordered_candidates:
            new_val = get_nested_value(new_req, [group_name, field_name])
            old_val = get_nested_value(old_req, [group_name, field_name])
            if new_val is not None and old_val is not None:
                return field_name, group_name

        raise ValueError(
            f"No field in field_selection has values in both first new and old request. "
            f"Candidates from pipeline: {candidate_fields}. "
            f"Verify field_selection config or data has proper values."
        )

    def _build_requests_dict(requests_list, key_field, field_group):
        # Index requests by their key value; requests with a falsy key are dropped.
        result = {}
        for request in requests_list:
            key = get_nested_value(request, [field_group, key_field])
            if key:
                result[key] = request
        return result

    def _matches_transition(old_val, new_val, transitions_config):
        # NOTE(review): appears unused — expects 2-element transitions while
        # config validation enforces 4-element steps; candidate for removal.
        if transitions_config is None:
            return False
        for transition in transitions_config:
            expected_old, expected_new = transition
            if _transition_matches(old_val, new_val, expected_old, expected_new):
                return True
        return False

    def _process_special_rule(rule, line_label, new_dict, old_dict):
        # Count requests present only on one side (set difference on keys).
        if line_label == "New Requests":
            return len(set(new_dict.keys()) - set(old_dict.keys()))
        elif line_label == "Deleted Requests":
            return len(set(old_dict.keys()) - set(new_dict.keys()))
        else:
            return 0

    def _process_new_deleted_fields(line_label, new_dict, old_dict):
        # For requests present on both sides, count per qualified field name
        # how many requests gained ("New Fields") or lost ("Deleted Fields") it.
        field_counts = {}
        common_keys = sorted(set(new_dict.keys()) & set(old_dict.keys()))

        for key in common_keys:
            new_req = new_dict[key]
            old_req = old_dict[key]

            all_groups = sorted(set(new_req.keys()) | set(old_req.keys()))

            for group_name in all_groups:
                new_group = new_req.get(group_name, {})
                old_group = old_req.get(group_name, {})

                # Non-dict groups (scalars) are treated as having no fields.
                if not isinstance(new_group, dict):
                    new_group = {}
                if not isinstance(old_group, dict):
                    old_group = {}

                new_fields = set(new_group.keys())
                old_fields = set(old_group.keys())

                if line_label == "New Fields":
                    changed_fields = sorted(new_fields - old_fields)
                elif line_label == "Deleted Fields":
                    changed_fields = sorted(old_fields - new_fields)
                else:
                    changed_fields = []

                for field_name in changed_fields:
                    qualified_name = f"{group_name}.{field_name}"
                    field_counts[qualified_name] = field_counts.get(qualified_name, 0) + 1

        # Sort by descending request count, then by field name.
        return sorted(field_counts.items(), key=lambda x: (-x[1], x[0]))

    def _build_candidate_fields(new_req, old_req, field_selection_config):
        # Universe = (group, field) pairs present in BOTH requests, then
        # filtered through the field_selection pipeline; sorted for determinism.
        common_groups = sorted(set(new_req.keys()) & set(old_req.keys()))
        all_available_fields = []

        for group_name in common_groups:
            new_group = new_req.get(group_name, {})
            old_group = old_req.get(group_name, {})

            if not isinstance(new_group, dict):
                new_group = {}
            if not isinstance(old_group, dict):
                old_group = {}

            common_field_names = sorted(set(new_group.keys()) & set(old_group.keys()))
            for field_name in common_field_names:
                all_available_fields.append((group_name, field_name))

        if not field_selection_config:
            return []

        candidate_fields = _apply_field_selection_pipeline(all_available_fields, field_selection_config)
        return sorted(candidate_fields, key=lambda x: (x[0], x[1]))

    def _process_rule(rule, new_dict, old_dict):
        # 4-step evaluation of a normal rule over requests present on both sides:
        # 1) select candidate fields, 2) detect changed values, 3) flag fields
        # via the transitions pipeline, 4) apply bloc_scope ("all"/"any").
        # Returns (matching request count, debug details list).
        if rule.get("_config_error"):
            return 0, []

        field_selection_config = rule.get("field_selection")
        bloc_scope = rule.get("bloc_scope") or "any"

        common_keys = sorted(set(new_dict.keys()) & set(old_dict.keys()))
        matching_requests_count = 0
        details_list = []

        for key in common_keys:
            new_req = new_dict[key]
            old_req = old_dict[key]

            candidate_fields = _build_candidate_fields(new_req, old_req, field_selection_config)

            if not candidate_fields:
                continue

            all_fields_list = []
            changed_fields = []

            for group_name, field_name in candidate_fields:
                new_val = get_nested_value(new_req, [group_name, field_name])
                old_val = get_nested_value(old_req, [group_name, field_name])

                field_has_changed = not _values_are_equal(old_val, new_val)
                if field_has_changed:
                    changed_fields.append((group_name, field_name))
                # Mutable record: last slot is the is_checked flag toggled below.
                all_fields_list.append([group_name, field_name, old_val, new_val, False])

            transitions_config = rule.get("transitions", [])
            if transitions_config and isinstance(transitions_config, list):
                for action, field_selector, from_val, to_val in transitions_config:
                    _apply_pipeline_step(all_fields_list, action, field_selector, from_val, to_val)

            checked_fields = [(f[0], f[1], f[2], f[3]) for f in all_fields_list if f[4]]

            request_matches = False
            if bloc_scope == "all":
                # "all": every changed field must have ended up checked.
                if len(changed_fields) > 0 and len(checked_fields) == len(changed_fields):
                    request_matches = True
            else:  # bloc_scope == "any"
                if len(checked_fields) > 0:
                    request_matches = True

            if request_matches:
                matching_requests_count += 1
                if debug_mode and checked_fields:
                    field_changes = [(f"{gn}.{fn}", ov, nv) for gn, fn, ov, nv in checked_fields]
                    details_list.append((key, field_changes))

        return matching_requests_count, details_list

    # ========== MAIN LOGIC ==========

    key_field = None
    field_group = None

    # The "New Requests" rule doubles as the declaration of the request key field.
    for rule in regression_check_config:
        if rule.get("line_label") == "New Requests":
            try:
                key_field, field_group = _get_key_field_from_new_requests_rule(
                    rule,
                    output_requests,
                    old_requests
                )
                break
            except ValueError as e:
                console.print(f"[bold red]Error determining key field: {e}[/bold red]")
                return True

    if not key_field:
        console.print("[bold red]Error: 'New Requests' rule not found or has no valid field_selection[/bold red]")
        return True

    console.print(f"[dim]Using key field: {field_group}.{key_field}[/dim]\n")

    new_dict = _build_requests_dict(output_requests, key_field, field_group)
    old_dict = _build_requests_dict(old_requests, key_field, field_group)

    # Group rules by bloc_title, preserving order of first appearance
    blocs = {}
    bloc_order = []
    for rule in regression_check_config:
        bloc_title = rule["bloc_title"]
        if bloc_title not in blocs:
            blocs[bloc_title] = []
            bloc_order.append(bloc_title)
        blocs[bloc_title].append(rule)

    for bloc_title in bloc_order:
        rules = blocs[bloc_title]
        line_results = []

        # Evaluate each rule; result_type drives how the line is rendered below.
        for rule in rules:
            line_label = rule["line_label"]

            if line_label == "New Requests":
                count = _process_special_rule(rule, line_label, new_dict, old_dict)
                line_results.append((line_label, count, None, "simple"))

            elif line_label == "Deleted Requests":
                deleted_keys = sorted(set(old_dict.keys()) - set(new_dict.keys()))
                line_results.append((line_label, len(deleted_keys), deleted_keys, "deleted_requests"))

            elif line_label in ["New Fields", "Deleted Fields"]:
                field_list = _process_new_deleted_fields(line_label, new_dict, old_dict)
                count = len(field_list)
                line_results.append((line_label, count, field_list, "fields"))

            else:
                count, details = _process_rule(rule, new_dict, old_dict)
                line_results.append((line_label, count, details, "details"))

        # Calculate status for each line
        line_results_with_status = []
        for line_label, count, data, result_type in line_results:
            rule = next(r for r in rules if r["line_label"] == line_label)
            warning_threshold = rule["warning_threshold"]
            critical_threshold = rule["critical_threshold"]
            status_tuple = _get_status_and_style(count, warning_threshold, critical_threshold)
            line_results_with_status.append((line_label, count, data, result_type, status_tuple))

        bloc_status = _calculate_block_status([result[4] for result in line_results_with_status])
        _print_block_header(bloc_title, bloc_status, indent=0)

        for line_label, count, data, result_type, status_tuple in line_results_with_status:
            # "Structure" bloc lines are always shown; other blocs only show problems.
            should_display = (bloc_title == "Structure") or (status_tuple[0] != "OK")

            if should_display:
                if result_type == "fields":
                    _print_check_line(line_label, count, status_tuple, indent=1)
                    for field_name, request_count in data:
                        console.print(f" {field_name} ({request_count} requests)")

                elif result_type == "details":
                    _print_check_line(line_label, count, status_tuple, indent=1)
                    # Per-request change details only collected when debug_mode is on.
                    if debug_mode and data and len(data) > 0:
                        for request_key, field_changes in data:
                            console.print(f" [dim]{key_field}: {request_key}[/dim]")
                            for qualified_field, old_val, new_val in field_changes:
                                old_display = f"'{old_val}'" if isinstance(old_val, str) else str(old_val)
                                new_display = f"'{new_val}'" if isinstance(new_val, str) else str(new_val)
                                console.print(f" - {qualified_field}: {old_display} → {new_display}")

                elif result_type == "deleted_requests":
                    _print_check_line(line_label, count, status_tuple, indent=1)
                    # Deleted requests are also logged for post-mortem analysis.
                    logging.warning("Regression check - %s: %d", line_label, count)
                    for deleted_key in data:
                        console.print(f" {key_field}: {deleted_key}")
                        logging.warning(" Deleted request: %s=%s", key_field, deleted_key)

                else:
                    _print_check_line(line_label, count, status_tuple, indent=1)

        console.print()

    return has_critical
|