"""
DO Dashboard - Quality Checks Module

Quality-assurance helpers for the DO Dashboard:
- JSON file loading and backup utilities
- Comprehensive non-regression checks with configurable rules
- Config-driven validation with Warning/Critical thresholds
- Support for special rules (New/Deleted Requests, New/Deleted Fields)
- 4-step logic for normal rules (field selection, transition matching,
  exception application, bloc_scope)

Note: Coherence check is not applicable for DO Dashboard since organization
counters are computed directly from request details (not from a separate API).
"""

import json
import logging
import os
import shutil

import openpyxl
from rich.console import Console

from do_dashboard_utils import get_nested_value, get_old_filename as _get_old_filename, get_config_path
from do_dashboard_constants import (
    REQUESTS_FILE_NAME,
    ORGANIZATIONS_FILE_NAME,
    OLD_FILE_SUFFIX,
    DASHBOARD_CONFIG_FILE_NAME,
    REGRESSION_CHECK_TABLE_NAME
)

# ============================================================================
# MODULE CONFIGURATION
# ============================================================================

# Debug mode: when True, the detailed changes behind each regression check
# rule are displayed.
debug_mode = False


def enable_debug_mode():
    """Switch debug mode on and, when a console is available, announce it."""
    global debug_mode
    debug_mode = True
    # The console is injected later by the main module; stay silent without it.
    if not console:
        return
    console.print("[dim]DEBUG MODE enabled - detailed changes will be displayed[/dim]")


# ============================================================================
# MODULE DEPENDENCIES (injected from main module)
# ============================================================================

# Rich console, injected by the main module.
console = None

# Regression check rules, loaded on demand via load_regression_check_config().
regression_check_config = []

# NOTE: File names and table names are imported from do_dashboard_constants.py
# (SINGLE SOURCE OF TRUTH):
#   - REQUESTS_FILE_NAME
#   - ORGANIZATIONS_FILE_NAME
#   - OLD_FILE_SUFFIX
#   - DASHBOARD_CONFIG_FILE_NAME
#   - REGRESSION_CHECK_TABLE_NAME


def set_dependencies(console_instance):
    """
    Inject console instance from main module.

    Args:
        console_instance: Rich Console instance for formatted output
    """
    global console
    console = console_instance


# ============================================================================
# CONFIGURATION LOADING
# ============================================================================

# Special rules that work on whole requests/fields and take no field_selection.
_RULES_WITHOUT_FIELD_SELECTION = ("New Fields", "Deleted Fields", "Deleted Requests")
_VALID_ACTIONS = ("include", "exclude")
_VALID_BLOC_SCOPES = ("all", "any")


def _markup_safe(value):
    """Return str(value) with Rich markup escaped so that it is printed literally.

    Without this, bracketed text such as "[action, selector]" or a data value
    like "[draft]" is parsed by Rich as a style tag and disappears from the output.
    """
    # Local import: rich is already a dependency of this module (Console).
    from rich.markup import escape
    return escape(str(value))


def _config_error(message):
    """Log and display a fatal configuration error; return the exception to raise."""
    logging.critical(message)
    console.print(f"[bold red]{_markup_safe(message)}[/bold red]")
    return Exception(message)


def _field_selection_problem(field_selection):
    """Return a description of the first defect in a field_selection, or None if valid."""
    if not isinstance(field_selection, list):
        return "'field_selection' must be a JSON array of [action, selector] pairs, skipping rule"
    for step_idx, step in enumerate(field_selection):
        if not isinstance(step, list) or len(step) != 2:
            return f"field_selection[{step_idx}] must be array of 2 elements [action, selector], skipping rule"
        action, field_selector = step
        if action not in _VALID_ACTIONS:
            return f"field_selection[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping rule"
        if not isinstance(field_selector, str) or "." not in field_selector:
            return (
                f"field_selection[{step_idx}] selector must be string with dot notation "
                f"(e.g., '*.*', 'group.*', 'group.field'), got '{field_selector}', skipping rule"
            )
    return None


def _transitions_problem(transitions):
    """Return a description of the first defect in a transitions list, or None if valid."""
    if not isinstance(transitions, list):
        return "'transitions' must be a JSON array, skipping this rule"
    for step_idx, transition_step in enumerate(transitions):
        if not isinstance(transition_step, list) or len(transition_step) != 4:
            return f"transitions[{step_idx}] must be array of 4 elements [action, field_selector, from, to], skipping"
        action, field_selector = transition_step[0], transition_step[1]
        if action not in _VALID_ACTIONS:
            return f"transitions[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping"
        if not isinstance(field_selector, str) or "." not in field_selector:
            return f"transitions[{step_idx}] field_selector must be string with dot notation, got '{field_selector}', skipping"
    return None


def load_regression_check_config(console_instance=None):
    """Load and validate the regression check rules from the Excel configuration file.

    The validated rules are stored in the module-level ``regression_check_config``.
    Rows marked "ignore", empty rows and rows without a ``bloc_title`` are skipped.
    Rules with a malformed ``field_selection`` or ``transitions`` are kept but
    flagged with ``_config_error`` (a warning is printed) so that they are
    skipped at evaluation time.

    Args:
        console_instance: Optional Rich Console instance. If not provided, the
            module-level console is used (a default Console is created when
            none has been injected yet).

    Raises:
        Exception: If the workbook or sheet is missing, or a row has a missing
            line_label, an invalid threshold, invalid JSON, a missing mandatory
            field_selection or an invalid bloc_scope.
    """
    global regression_check_config, console
    if console_instance:
        console = console_instance
    elif console is None:
        # Errors below are reported through the console; never leave it unset.
        console = Console()

    config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
    try:
        workbook = openpyxl.load_workbook(config_path)
    except FileNotFoundError as err:
        raise _config_error(f"Error: Configuration file not found at: {config_path}") from err

    if REGRESSION_CHECK_TABLE_NAME not in workbook.sheetnames:
        raise _config_error(
            f"Error: Sheet '{REGRESSION_CHECK_TABLE_NAME}' not found in the configuration file."
        )

    sheet = workbook[REGRESSION_CHECK_TABLE_NAME]
    headers = [cell.value for cell in sheet[1]]

    loaded_rules = []
    for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2):
        rule_config = dict(zip(headers, row))
        row_prefix = f"Error in Regression_Check config, row {row_index}"

        # Skip if ignore column contains "ignore" (case insensitive)
        ignore_value = rule_config.get("ignore")
        if isinstance(ignore_value, str) and "ignore" in ignore_value.lower():
            continue

        # Skip if all columns are None (empty row)
        if all(value is None for value in row):
            continue

        # Validate bloc_title and line_label
        bloc_title = rule_config.get("bloc_title")
        line_label = rule_config.get("line_label")
        if not bloc_title or not isinstance(bloc_title, str):
            continue  # Skip rows without bloc_title (header separators, etc.)
        if not line_label or not isinstance(line_label, str):
            raise _config_error(
                f"{row_prefix}: 'line_label' is mandatory when 'bloc_title' is specified."
            )

        # Validate thresholds (warning first, then critical)
        for threshold_name in ("warning_threshold", "critical_threshold"):
            threshold = rule_config.get(threshold_name)
            if not isinstance(threshold, (int, float)) or threshold < 0:
                raise _config_error(f"{row_prefix}: '{threshold_name}' must be a number >= 0.")

        # Parse JSON fields
        for json_field in ("field_selection", "transitions"):
            value = rule_config.get(json_field)
            if value and isinstance(value, str):
                try:
                    rule_config[json_field] = json.loads(value)
                except json.JSONDecodeError as err:
                    raise _config_error(
                        f"{row_prefix}, field '{json_field}': Invalid JSON format."
                    ) from err
            elif value is None:
                # Make sure the key exists even when the column is absent.
                rule_config[json_field] = None

        # Validate field_selection format
        field_selection = rule_config.get("field_selection")
        if line_label not in _RULES_WITHOUT_FIELD_SELECTION:
            # Standard rules and "New Requests" MUST have field_selection
            if field_selection is None:
                raise _config_error(
                    f"{row_prefix}: 'field_selection' is mandatory for rule '{line_label}'."
                )
            problem = _field_selection_problem(field_selection)
            if problem:
                console.print(f"[yellow]⚠ Row {row_index}: {_markup_safe(problem)}[/yellow]")
                rule_config["_config_error"] = True
        elif field_selection is not None and field_selection != [] and field_selection != "":
            problem = f"Special rule '{line_label}' should have empty field_selection, got {field_selection}"
            console.print(f"[yellow]⚠ Row {row_index}: {_markup_safe(problem)}[/yellow]")
            rule_config["_config_error"] = True

        # Validate bloc_scope
        bloc_scope = rule_config.get("bloc_scope")
        if bloc_scope is not None and bloc_scope not in _VALID_BLOC_SCOPES:
            raise _config_error(f"{row_prefix}: 'bloc_scope' must be 'all' or 'any'.")

        # Validate transitions format
        transitions = rule_config.get("transitions")
        if transitions is not None:
            problem = _transitions_problem(transitions)
            if problem:
                console.print(f"[yellow]⚠ Row {row_index}: {_markup_safe(problem)}[/yellow]")
                rule_config["_config_error"] = True

        loaded_rules.append(rule_config)

    regression_check_config = loaded_rules
    console.print(f"Loaded {len(regression_check_config)} regression check rules.", style="green")


def run_check_only_mode(sys_argv):
    """
    Orchestrates CHECK_ONLY and CHECK_ONLY_COMPARE modes.

    - CHECK_ONLY: Full non-regression validation on existing files
    - CHECK_ONLY_COMPARE: Regression-only comparison of two specific files

    Args:
        sys_argv: sys.argv from main script (to parse command-line arguments)
    """
    global console
    if console is None:
        console = Console()

    print()

    # CHECK_ONLY_COMPARE mode is detected by the presence of two extra
    # arguments: sys_argv[2] is the current file, sys_argv[3] the old file.
    if len(sys_argv) >= 4:
        current_file = sys_argv[2]
        old_file = sys_argv[3]
        console.print("[bold cyan]═══ CHECK ONLY COMPARE MODE ═══[/bold cyan]")
        console.print("Comparing two specific files:\n")
        console.print(f" Current: [bold]{_markup_safe(current_file)}[/bold]")
        console.print(f" Old: [bold]{_markup_safe(old_file)}[/bold]\n")
    else:
        current_file = REQUESTS_FILE_NAME
        old_file = _get_old_filename(REQUESTS_FILE_NAME, OLD_FILE_SUFFIX)
        console.print("[bold cyan]═══ CHECK ONLY MODE ═══[/bold cyan]")
        console.print("Running quality checks on existing data files without collecting new data.\n")

    print()
    load_regression_check_config(console)
    print()

    has_regression_critical = run_quality_checks(
        current_requests=current_file,
        old_requests_filename=old_file
    )

    if has_regression_critical:
        console.print("[bold red]✗ CRITICAL issues detected![/bold red]")
    else:
        console.print("[bold green]✓ All checks passed successfully![/bold green]")


# ============================================================================
# FILE UTILITIES
# ============================================================================

def load_json_file(filename):
    """
    Loads a JSON file (requests, organizations, or any JSON data).

    Args:
        filename: Path of the JSON file to read (UTF-8).

    Returns:
        The parsed JSON data, or None if the file doesn't exist or could not
        be read/parsed (a warning is logged in the latter case).
    """
    if not os.path.exists(filename):
        return None

    try:
        with open(filename, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        # Deliberately best-effort: an unreadable file is reported, not fatal.
        logging.warning(f"Could not load JSON file '{filename}': {e}")
        # The console may not have been injected yet; logging is enough then.
        if console is not None:
            warning = _markup_safe(f"Could not load JSON file '{filename}': {e}")
            console.print(f"[yellow]⚠ Warning: {warning}[/yellow]")
        return None


def backup_output_files():
    """
    Silently backups current output files before writing new versions.

    Called AFTER all checks pass to avoid losing history on crash. Files that
    do not exist are skipped; copy failures are logged as warnings only.
    """
    for source in (REQUESTS_FILE_NAME, ORGANIZATIONS_FILE_NAME):
        destination = _get_old_filename(source, OLD_FILE_SUFFIX)
        if not os.path.exists(source):
            continue
        try:
            shutil.copy2(source, destination)
        except Exception as e:
            # Best-effort backup: never block the run because a copy failed.
            logging.warning(f"Could not backup {source}: {e}")


# ============================================================================
# QUALITY CHECKS ORCHESTRATION
# ============================================================================

def run_quality_checks(current_requests, old_requests_filename):
    """
    Runs non-regression quality checks on requests data.

    Note: Coherence check is not applicable for DO Dashboard since organization
    counters are computed from request details, not from a separate API.

    Args:
        current_requests: Either a filename (str) to load requests from, or a
            list of request dictionaries (already in memory)
        old_requests_filename: Filename of old requests for regression comparison (str)

    Returns:
        has_regression_critical (bool). True is also returned when the current
        requests cannot be loaded or have an unsupported type.

    Usage:
        - Normal mode:
            run_quality_checks(
                current_requests=output_requests,        # list (in memory)
                old_requests_filename=REQUESTS_FILE_NAME # str (current file on disk)
            )
        - Check-only mode:
            run_quality_checks(
                current_requests=REQUESTS_FILE_NAME,                        # str (current file)
                old_requests_filename=get_old_filename(REQUESTS_FILE_NAME)  # str (old file)
            )
    """
    global console, regression_check_config

    # Everything below reports through the console, whether or not the
    # configuration still has to be loaded.
    if console is None:
        console = Console()
    if not regression_check_config:
        load_regression_check_config(console)

    console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")

    # Load current_requests if it's a filename
    if isinstance(current_requests, str):
        current_requests_data = load_json_file(current_requests)
        if current_requests_data is None:
            console.print(
                f"[bold red]Error: Could not load current requests from "
                f"'{_markup_safe(current_requests)}'[/bold red]"
            )
            return True
    elif isinstance(current_requests, list):
        current_requests_data = current_requests
    else:
        console.print("[bold red]Error: current_requests must be either a filename (str) or a list of requests[/bold red]")
        return True

    # Run non-regression check
    has_regression_critical = non_regression_check(current_requests_data, old_requests_filename)

    console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")
    print()

    return has_regression_critical


# ============================================================================
# NON-REGRESSION CHECK
# ============================================================================

# Values treated as "no value" when comparing old and new fields.
_UNDEFINED_VALUES = (None, "", "undefined")

# Bloc whose lines are always displayed, even when their status is OK.
_ALWAYS_DISPLAYED_BLOC = "Structure"

# (status, colour, emoji) triples used for display.
_STATUS_CRITICAL = ("CRITICAL", "red", "✗")
_STATUS_WARNING = ("WARNING", "yellow", "⚠")
_STATUS_OK = ("OK", "green", "✓")


def _is_undefined(value):
    """Return True when value is None, an empty string or the string "undefined"."""
    return value in _UNDEFINED_VALUES


def _values_are_equal(val1, val2):
    """Compare two field values, treating all undefined values as equal."""
    if _is_undefined(val1) and _is_undefined(val2):
        return True
    return val1 == val2


def _pattern_matches(value, pattern):
    """Match one value against "*undefined", "*defined", "*" or a literal value."""
    if pattern == "*undefined":
        return _is_undefined(value)
    if pattern == "*defined":
        return not _is_undefined(value)
    if pattern == "*":
        return True
    return value == pattern


def _transition_matches(old_val, new_val, expected_old, expected_new):
    """Return True when both sides of an old -> new change match their patterns."""
    return _pattern_matches(old_val, expected_old) and _pattern_matches(new_val, expected_new)


def _field_selector_matches_pattern(selector, group_name, field_name):
    """Return True when a "group.field" selector ("*" wildcards allowed) matches the field."""
    if selector == "*.*":
        return True
    sel_group, sel_field = selector.split(".", 1)
    if sel_group != "*" and sel_group != group_name:
        return False
    return sel_field == "*" or sel_field == field_name


def _apply_field_selection_pipeline(all_fields, field_selection_config):
    """Apply the ordered include/exclude steps and return the selected (group, field) set."""
    candidate_fields = set()
    if not field_selection_config:
        return candidate_fields
    for action, field_selector in field_selection_config:
        matching = [
            (group_name, field_name)
            for group_name, field_name in all_fields
            if _field_selector_matches_pattern(field_selector, group_name, field_name)
        ]
        if action == "include":
            candidate_fields.update(matching)
        elif action == "exclude":
            candidate_fields.difference_update(matching)
    return candidate_fields


def _as_dict(value):
    """Return value when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _build_candidate_fields(new_req, old_req, field_selection_config):
    """Return the sorted (group, field) pairs present in both requests and selected by the config."""
    if not field_selection_config:
        return []
    all_available_fields = []
    for group_name in sorted(set(new_req.keys()) & set(old_req.keys())):
        new_group = _as_dict(new_req.get(group_name, {}))
        old_group = _as_dict(old_req.get(group_name, {}))
        for field_name in sorted(set(new_group.keys()) & set(old_group.keys())):
            all_available_fields.append((group_name, field_name))
    return sorted(_apply_field_selection_pipeline(all_available_fields, field_selection_config))


def _find_key_field(rule, new_requests_list, old_requests_list):
    """Determine the request key from the "New Requests" rule.

    The first selected field (in sorted order) that has a non-None value in
    the first request of both lists is used as the key.

    Returns:
        (field_name, group_name)

    Raises:
        ValueError: If the rule is invalid, a list is empty or not a list, or
            no selected field is usable.
    """
    if rule.get("_config_error"):
        # The malformed selection was already reported when the config was loaded.
        raise ValueError("'New Requests' rule has an invalid field_selection")
    if not isinstance(new_requests_list, list) or not isinstance(old_requests_list, list):
        raise ValueError("Cannot determine key field: requests must be lists")
    if not new_requests_list or not old_requests_list:
        raise ValueError("Cannot determine key field: empty request lists")

    new_req = new_requests_list[0]
    old_req = old_requests_list[0]
    candidate_fields = _build_candidate_fields(new_req, old_req, rule.get("field_selection"))
    if not candidate_fields:
        raise ValueError(
            f"field_selection produced no candidate fields. "
            f"Config: {rule.get('field_selection')}"
        )

    for group_name, field_name in candidate_fields:
        new_val = get_nested_value(new_req, [group_name, field_name])
        old_val = get_nested_value(old_req, [group_name, field_name])
        if new_val is not None and old_val is not None:
            return field_name, group_name

    raise ValueError(
        f"No field in field_selection has values in both first new and old request. "
        f"Candidates from pipeline: {candidate_fields}. "
        f"Verify field_selection config or data has proper values."
    )


def _build_requests_dict(requests_list, key_field, field_group):
    """Index requests by their key value; requests with a falsy key are left out."""
    result = {}
    for request in requests_list:
        key = get_nested_value(request, [field_group, key_field])
        if key:
            result[key] = request
    return result


def _count_new_or_deleted_requests(line_label, new_dict, old_dict):
    """Count keys only in the new data ("New Requests") or only in the old data ("Deleted Requests")."""
    if line_label == "New Requests":
        return len(set(new_dict) - set(old_dict))
    if line_label == "Deleted Requests":
        return len(set(old_dict) - set(new_dict))
    return 0


def _count_new_or_deleted_fields(line_label, new_dict, old_dict):
    """List added ("New Fields") or removed ("Deleted Fields") fields across common requests.

    Returns:
        A list of ("group.field", request_count) sorted by descending count,
        then by name.
    """
    field_counts = {}
    for key in sorted(set(new_dict) & set(old_dict)):
        new_req = new_dict[key]
        old_req = old_dict[key]
        for group_name in sorted(set(new_req.keys()) | set(old_req.keys())):
            new_fields = set(_as_dict(new_req.get(group_name, {})))
            old_fields = set(_as_dict(old_req.get(group_name, {})))
            if line_label == "New Fields":
                changed_fields = sorted(new_fields - old_fields)
            elif line_label == "Deleted Fields":
                changed_fields = sorted(old_fields - new_fields)
            else:
                changed_fields = []
            for field_name in changed_fields:
                qualified_name = f"{group_name}.{field_name}"
                field_counts[qualified_name] = field_counts.get(qualified_name, 0) + 1
    return sorted(field_counts.items(), key=lambda item: (-item[1], item[0]))


def _is_checked(group_name, field_name, old_val, new_val, transitions_config):
    """Fold the ordered transition steps into a single checked/unchecked flag for one field."""
    checked = False
    for action, field_selector, from_pattern, to_pattern in transitions_config:
        if not _field_selector_matches_pattern(field_selector, group_name, field_name):
            continue
        if not _transition_matches(old_val, new_val, from_pattern, to_pattern):
            continue
        # Later steps override earlier ones: "exclude" is how exceptions are applied.
        if action == "include":
            checked = True
        elif action == "exclude":
            checked = False
    return checked


def _evaluate_rule(rule, new_dict, old_dict):
    """Count the common requests matched by a standard rule.

    For each request present in both datasets: select the candidate fields,
    keep those whose value changed, flag the changed fields matched by the
    transitions, then apply bloc_scope ("any": at least one changed field is
    flagged; "all": every changed field is flagged).

    Returns:
        (matching_requests_count, details) where details is a list of
        (request_key, [("group.field", old_value, new_value), ...]) that is
        only filled in debug mode.
    """
    if rule.get("_config_error"):
        return 0, []

    field_selection_config = rule.get("field_selection")
    bloc_scope = rule.get("bloc_scope") or "any"
    transitions_config = rule.get("transitions")
    if not isinstance(transitions_config, list):
        transitions_config = []

    matching_requests_count = 0
    details_list = []

    for key in sorted(set(new_dict) & set(old_dict)):
        new_req = new_dict[key]
        old_req = old_dict[key]

        candidate_fields = _build_candidate_fields(new_req, old_req, field_selection_config)
        if not candidate_fields:
            continue

        # Only fields whose value actually changed can be a regression, so
        # only those go through the transition pipeline.
        changed_fields = []
        for group_name, field_name in candidate_fields:
            new_val = get_nested_value(new_req, [group_name, field_name])
            old_val = get_nested_value(old_req, [group_name, field_name])
            if not _values_are_equal(old_val, new_val):
                changed_fields.append((group_name, field_name, old_val, new_val))

        checked_fields = [
            change for change in changed_fields
            if _is_checked(*change, transitions_config)
        ]

        if bloc_scope == "all":
            inclusion_matches = bool(changed_fields) and len(checked_fields) == len(changed_fields)
        else:  # bloc_scope == "any"
            inclusion_matches = bool(checked_fields)

        if inclusion_matches:
            matching_requests_count += 1
            if debug_mode and checked_fields:
                field_changes = [(f"{gn}.{fn}", ov, nv) for gn, fn, ov, nv in checked_fields]
                details_list.append((key, field_changes))

    return matching_requests_count, details_list


def _status_for_count(count, warning_threshold, critical_threshold):
    """Return the (status, colour, emoji) triple for a count compared to its thresholds."""
    if count > critical_threshold:
        return _STATUS_CRITICAL
    if count > warning_threshold:
        return _STATUS_WARNING
    return _STATUS_OK


def _calculate_block_status(line_statuses):
    """Return the worst status among the lines of a bloc."""
    names = {status[0] for status in line_statuses}
    if "CRITICAL" in names:
        return _STATUS_CRITICAL
    if "WARNING" in names:
        return _STATUS_WARNING
    return _STATUS_OK


def _print_block_header(title, status_tuple, indent=0):
    """Print a bloc title coloured according to its status."""
    indent_str = " " * indent
    _, color, emoji = status_tuple
    console.print(f"{indent_str}{emoji} [{color}][bold]{_markup_safe(title)}[/bold][/{color}]")


def _print_check_line(message, count, status_tuple, indent=1):
    """Print one "label: count" line coloured according to its status."""
    indent_str = " " * indent
    _, color, emoji = status_tuple
    console.print(f"{indent_str}{emoji} [{color}]{_markup_safe(message)}: {count}[/{color}]")


def _format_value(value):
    """Render a field value for debug output (strings are quoted)."""
    text = f"'{value}'" if isinstance(value, str) else str(value)
    return _markup_safe(text)


def non_regression_check(output_requests, old_requests_filename):
    """
    Comprehensive config-driven non-regression check comparing current vs old requests.

    Uses rules from regression_check_config loaded from Excel. Rules are
    grouped by bloc_title (in order of first appearance) and each line is
    rated OK / WARNING / CRITICAL against its own thresholds.

    Args:
        output_requests: Current requests data (list)
        old_requests_filename: Filename of old requests JSON file to load

    Returns:
        True if any critical issue was found (or the key field could not be
        determined), False otherwise. False is also returned when the old
        requests file cannot be loaded, in which case the check is skipped.
    """
    console.print("\n[bold]═══ Non Regression Check ═══[/bold]\n")

    console.print(f"[dim]Loading old requests from: {_markup_safe(old_requests_filename)}[/dim]")
    old_requests = load_json_file(old_requests_filename)
    if old_requests is None:
        console.print(
            f"[yellow]⚠ No old requests file found at '{_markup_safe(old_requests_filename)}', "
            f"skipping non-regression check[/yellow]"
        )
        return False

    # ========== KEY FIELD ==========
    # The first "New Requests" rule tells which field identifies a request.
    new_requests_rule = next(
        (rule for rule in regression_check_config if rule.get("line_label") == "New Requests"),
        None
    )
    key_field = None
    field_group = None
    if new_requests_rule is not None:
        try:
            key_field, field_group = _find_key_field(new_requests_rule, output_requests, old_requests)
        except ValueError as e:
            console.print(f"[bold red]Error determining key field: {_markup_safe(e)}[/bold red]")
            return True

    if not key_field:
        console.print("[bold red]Error: 'New Requests' rule not found or has no valid field_selection[/bold red]")
        return True

    console.print(f"[dim]Using key field: {_markup_safe(field_group)}.{_markup_safe(key_field)}[/dim]\n")

    new_dict = _build_requests_dict(output_requests, key_field, field_group)
    old_dict = _build_requests_dict(old_requests, key_field, field_group)

    # ========== GROUP RULES ==========
    # Group rules by bloc_title; dict insertion order preserves first appearance.
    blocs = {}
    for rule in regression_check_config:
        blocs.setdefault(rule["bloc_title"], []).append(rule)

    # ========== EVALUATE AND DISPLAY ==========
    has_critical = False
    for bloc_title, rules in blocs.items():
        line_results = []
        for rule in rules:
            line_label = rule["line_label"]
            if line_label in ("New Requests", "Deleted Requests"):
                count = _count_new_or_deleted_requests(line_label, new_dict, old_dict)
                data, result_type = None, "simple"
            elif line_label in ("New Fields", "Deleted Fields"):
                data = _count_new_or_deleted_fields(line_label, new_dict, old_dict)
                count, result_type = len(data), "fields"
            else:
                count, data = _evaluate_rule(rule, new_dict, old_dict)
                result_type = "details"

            # Rate the line with the thresholds of the rule that produced it
            # (looking the rule up by label would be wrong for duplicate labels).
            status_tuple = _status_for_count(
                count, rule["warning_threshold"], rule["critical_threshold"]
            )
            if status_tuple[0] == "CRITICAL":
                has_critical = True
            line_results.append((line_label, count, data, result_type, status_tuple))

        bloc_status = _calculate_block_status([result[4] for result in line_results])
        _print_block_header(bloc_title, bloc_status, indent=0)

        for line_label, count, data, result_type, status_tuple in line_results:
            should_display = (bloc_title == _ALWAYS_DISPLAYED_BLOC) or (status_tuple[0] != "OK")
            if not should_display:
                continue

            _print_check_line(line_label, count, status_tuple, indent=1)

            if result_type == "fields":
                for field_name, request_count in data:
                    console.print(f" {_markup_safe(field_name)} ({request_count} requests)")
            elif result_type == "details" and debug_mode and data:
                for request_key, field_changes in data:
                    console.print(f" [dim]{_markup_safe(key_field)}: {_markup_safe(request_key)}[/dim]")
                    for qualified_field, old_val, new_val in field_changes:
                        console.print(
                            f" - {_markup_safe(qualified_field)}: "
                            f"{_format_value(old_val)} → {_format_value(new_val)}"
                        )

        console.print()

    return has_critical