Files
DO_Dashboard/do_dashboard_quality_checks.py

811 lines
34 KiB
Python

"""
DO Dashboard - Quality Checks Module
This module contains all quality assurance functions:
- JSON file loading and backup utilities
- Comprehensive non-regression checks with configurable rules
- Config-driven validation with Warning/Critical thresholds
- Support for special rules (New/Deleted Requests, New/Deleted Fields)
- 4-step logic for normal rules (field selection, transition matching, exception application, bloc_scope)
Note: Coherence check is not applicable for DO Dashboard since organization
counters are computed directly from request details (not from a separate API).
"""
import json
import logging
import os
import shutil
import openpyxl
from rich.console import Console
from do_dashboard_utils import get_nested_value, get_old_filename as _get_old_filename, get_config_path
from do_dashboard_constants import (
REQUESTS_FILE_NAME,
ORGANIZATIONS_FILE_NAME,
OLD_FILE_SUFFIX,
DASHBOARD_CONFIG_FILE_NAME,
REGRESSION_CHECK_TABLE_NAME
)
# ============================================================================
# MODULE CONFIGURATION
# ============================================================================
# Debug mode: Set to True to display detailed changes for each regression check rule
debug_mode = False
def enable_debug_mode():
"""Enable debug mode to display detailed changes for each regression check rule."""
global debug_mode
debug_mode = True
if console:
console.print("[dim]DEBUG MODE enabled - detailed changes will be displayed[/dim]")
# ============================================================================
# MODULE DEPENDENCIES (injected from main module)
# ============================================================================
# Will be injected by the main module
console = None
# Regression check config is loaded on-demand via load_regression_check_config()
regression_check_config = []
# NOTE: File names and table names are imported from do_dashboard_constants.py (SINGLE SOURCE OF TRUTH):
# - REQUESTS_FILE_NAME
# - ORGANIZATIONS_FILE_NAME
# - OLD_FILE_SUFFIX
# - DASHBOARD_CONFIG_FILE_NAME
# - REGRESSION_CHECK_TABLE_NAME
def set_dependencies(console_instance):
"""
Inject console instance from main module.
Args:
console_instance: Rich Console instance for formatted output
"""
global console
console = console_instance
# ============================================================================
# CONFIGURATION LOADING
# ============================================================================
def load_regression_check_config(console_instance=None):
"""Loads and validates the regression check configuration from the Excel file.
Args:
console_instance: Optional Rich Console instance. If not provided, uses global console.
"""
global regression_check_config, console
if console_instance:
console = console_instance
config_path = os.path.join(get_config_path(), DASHBOARD_CONFIG_FILE_NAME)
try:
workbook = openpyxl.load_workbook(config_path)
except FileNotFoundError:
error_msg = f"Error: Configuration file not found at: {config_path}"
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
if REGRESSION_CHECK_TABLE_NAME not in workbook.sheetnames:
error_msg = f"Error: Sheet '{REGRESSION_CHECK_TABLE_NAME}' not found in the configuration file."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
sheet = workbook[REGRESSION_CHECK_TABLE_NAME]
headers = [cell.value for cell in sheet[1]]
temp_config = []
for row_index, row in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2):
rule_config = dict(zip(headers, row))
# Skip if ignore column contains "ignore" (case insensitive)
ignore_value = rule_config.get("ignore")
if ignore_value and isinstance(ignore_value, str) and "ignore" in ignore_value.lower():
continue
# Skip if all columns are None (empty row)
if all(value is None for value in row):
continue
# Validate bloc_title and line_label
bloc_title = rule_config.get("bloc_title")
line_label = rule_config.get("line_label")
if not bloc_title or not isinstance(bloc_title, str):
continue # Skip rows without bloc_title (header separators, etc.)
if not line_label or not isinstance(line_label, str):
error_msg = f"Error in Regression_Check config, row {row_index}: 'line_label' is mandatory when 'bloc_title' is specified."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
# Validate thresholds
warning_threshold = rule_config.get("warning_threshold")
critical_threshold = rule_config.get("critical_threshold")
if warning_threshold is None or not isinstance(warning_threshold, (int, float)) or warning_threshold < 0:
error_msg = f"Error in Regression_Check config, row {row_index}: 'warning_threshold' must be a number >= 0."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
if critical_threshold is None or not isinstance(critical_threshold, (int, float)) or critical_threshold < 0:
error_msg = f"Error in Regression_Check config, row {row_index}: 'critical_threshold' must be a number >= 0."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
# Parse JSON fields
for json_field in ["field_selection", "transitions"]:
value = rule_config.get(json_field)
if value and isinstance(value, str):
try:
rule_config[json_field] = json.loads(value)
except json.JSONDecodeError:
error_msg = f"Error in Regression_Check config, row {row_index}, field '{json_field}': Invalid JSON format."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
elif value is None:
rule_config[json_field] = None
# Validate field_selection format
field_selection = rule_config.get("field_selection")
# Special rules that don't use field_selection
special_rules_no_selection = ["New Fields", "Deleted Fields", "Deleted Requests"]
if line_label not in special_rules_no_selection:
# Standard rules and "New Requests" MUST have field_selection
if field_selection is None:
error_msg = f"Error in Regression_Check config, row {row_index}: 'field_selection' is mandatory for rule '{line_label}'."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
if not isinstance(field_selection, list):
console.print(f"[yellow]⚠ Row {row_index}: 'field_selection' must be a JSON array of [action, selector] pairs, skipping rule[/yellow]")
rule_config["_config_error"] = True
else:
for step_idx, step in enumerate(field_selection):
if not isinstance(step, list) or len(step) != 2:
console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] must be array of 2 elements [action, selector], skipping rule[/yellow]")
rule_config["_config_error"] = True
break
action, field_selector = step
if action not in ["include", "exclude"]:
console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping rule[/yellow]")
rule_config["_config_error"] = True
break
if not isinstance(field_selector, str) or "." not in field_selector:
console.print(f"[yellow]⚠ Row {row_index}: field_selection[{step_idx}] selector must be string with dot notation (e.g., '*.*', 'group.*', 'group.field'), got '{field_selector}', skipping rule[/yellow]")
rule_config["_config_error"] = True
break
else:
if field_selection is not None and field_selection != [] and field_selection != "":
console.print(f"[yellow]⚠ Row {row_index}: Special rule '{line_label}' should have empty field_selection, got {field_selection}[/yellow]")
rule_config["_config_error"] = True
# Validate bloc_scope
bloc_scope = rule_config.get("bloc_scope")
if bloc_scope is not None and bloc_scope not in ["all", "any"]:
error_msg = f"Error in Regression_Check config, row {row_index}: 'bloc_scope' must be 'all' or 'any'."
logging.critical(error_msg)
console.print(f"[bold red]{error_msg}[/bold red]")
raise Exception(error_msg)
# Validate transitions format
transitions = rule_config.get("transitions")
config_error = False
if transitions is not None:
if not isinstance(transitions, list):
console.print(f"[yellow]⚠ Row {row_index}: 'transitions' must be a JSON array, skipping this rule[/yellow]")
config_error = True
else:
for step_idx, transition_step in enumerate(transitions):
if not isinstance(transition_step, list) or len(transition_step) != 4:
console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] must be array of 4 elements [action, field_selector, from, to], skipping[/yellow]")
config_error = True
break
action, field_selector, from_val, to_val = transition_step
if action not in ["include", "exclude"]:
console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] action must be 'include' or 'exclude', got '{action}', skipping[/yellow]")
config_error = True
break
if not isinstance(field_selector, str) or "." not in field_selector:
console.print(f"[yellow]⚠ Row {row_index}: transitions[{step_idx}] field_selector must be string with dot notation, got '{field_selector}', skipping[/yellow]")
config_error = True
break
if config_error:
rule_config["_config_error"] = True
temp_config.append(rule_config)
regression_check_config = temp_config
console.print(f"Loaded {len(regression_check_config)} regression check rules.", style="green")
def run_check_only_mode(sys_argv):
"""
Orchestrates CHECK_ONLY and CHECK_ONLY_COMPARE modes.
- CHECK_ONLY: Full non-regression validation on existing files
- CHECK_ONLY_COMPARE: Regression-only comparison of two specific files
Args:
sys_argv: sys.argv from main script (to parse command-line arguments)
"""
global console
if console is None:
console = Console()
print()
# Detect CHECK_ONLY_COMPARE mode: --check-only <file1> <file2>
if len(sys_argv) >= 4:
current_file = sys_argv[2]
old_file = sys_argv[3]
console.print("[bold cyan]═══ CHECK ONLY COMPARE MODE ═══[/bold cyan]")
console.print(f"Comparing two specific files:\n")
console.print(f" Current: [bold]{current_file}[/bold]")
console.print(f" Old: [bold]{old_file}[/bold]\n")
print()
load_regression_check_config(console)
print()
has_regression_critical = run_quality_checks(
current_requests=current_file,
old_requests_filename=old_file
)
if has_regression_critical:
console.print("[bold red]✗ CRITICAL issues detected![/bold red]")
else:
console.print("[bold green]✓ All checks passed successfully![/bold green]")
else:
console.print("[bold cyan]═══ CHECK ONLY MODE ═══[/bold cyan]")
console.print("Running quality checks on existing data files without collecting new data.\n")
print()
load_regression_check_config(console)
print()
old_requests_file = _get_old_filename(REQUESTS_FILE_NAME, OLD_FILE_SUFFIX)
has_regression_critical = run_quality_checks(
current_requests=REQUESTS_FILE_NAME,
old_requests_filename=old_requests_file
)
if has_regression_critical:
console.print("[bold red]✗ CRITICAL issues detected![/bold red]")
else:
console.print("[bold green]✓ All checks passed successfully![/bold green]")
# ============================================================================
# FILE UTILITIES
# ============================================================================
def load_json_file(filename):
"""
Loads a JSON file (requests, organizations, or any JSON data).
Returns the parsed JSON data or None if file doesn't exist or error occurred.
"""
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logging.warning(f"Could not load JSON file '{filename}': {e}")
console.print(f"[yellow]⚠ Warning: Could not load JSON file '{filename}': {e}[/yellow]")
return None
def backup_output_files():
"""
Silently backups current output files before writing new versions.
Called AFTER all checks pass to avoid losing history on crash.
"""
def _backup_file_silent(source, destination):
if os.path.exists(source):
try:
shutil.copy2(source, destination)
except Exception as e:
logging.warning(f"Could not backup {source}: {e}")
_backup_file_silent(REQUESTS_FILE_NAME, _get_old_filename(REQUESTS_FILE_NAME, OLD_FILE_SUFFIX))
_backup_file_silent(ORGANIZATIONS_FILE_NAME, _get_old_filename(ORGANIZATIONS_FILE_NAME, OLD_FILE_SUFFIX))
# ============================================================================
# QUALITY CHECKS ORCHESTRATION
# ============================================================================
def run_quality_checks(current_requests, old_requests_filename):
"""
Runs non-regression quality checks on requests data.
Note: Coherence check is not applicable for DO Dashboard since organization
counters are computed from request details, not from a separate API.
Args:
current_requests: Either a filename (str) to load requests from,
or a list of request dictionaries (already in memory)
old_requests_filename: Filename of old requests for regression comparison (str)
Returns:
has_regression_critical (bool)
Usage:
- Normal mode:
run_quality_checks(
current_requests=output_requests, # list (in memory)
old_requests_filename=REQUESTS_FILE_NAME # str (current file on disk)
)
- Check-only mode:
run_quality_checks(
current_requests=REQUESTS_FILE_NAME, # str (current file)
old_requests_filename=get_old_filename(REQUESTS_FILE_NAME) # str (old file)
)
"""
global console, regression_check_config
if not regression_check_config:
if console is None:
console = Console()
load_regression_check_config(console)
console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")
# Load current_requests if it's a filename
if isinstance(current_requests, str):
current_requests_data = load_json_file(current_requests)
if current_requests_data is None:
console.print(f"[bold red]Error: Could not load current requests from '{current_requests}'[/bold red]")
return True
elif isinstance(current_requests, list):
current_requests_data = current_requests
else:
console.print(f"[bold red]Error: current_requests must be either a filename (str) or a list of requests[/bold red]")
return True
# Run non-regression check
has_regression_critical = non_regression_check(current_requests_data, old_requests_filename)
console.print("[bold cyan]══════════════════════════════════════════════════[/bold cyan]")
print()
return has_regression_critical
# ============================================================================
# NON-REGRESSION CHECK
# ============================================================================
def non_regression_check(output_requests, old_requests_filename):
"""
Comprehensive config-driven non-regression check comparing current vs old requests.
Uses rules from regression_check_config loaded from Excel.
Returns True if any critical issue was found, False otherwise.
Args:
output_requests: Current requests data (list)
old_requests_filename: Filename of old requests JSON file to load
"""
console.print("\n[bold]═══ Non Regression Check ═══[/bold]\n")
console.print(f"[dim]Loading old requests from: {old_requests_filename}[/dim]")
old_requests = load_json_file(old_requests_filename)
if old_requests is None:
console.print(f"[yellow]⚠ No old requests file found at '{old_requests_filename}', skipping non-regression check[/yellow]")
return False
has_critical = False
# ========== INTERNAL UTILITY FUNCTIONS ==========
def _is_undefined(value):
return value in [None, "", "undefined"]
def _values_are_equal(val1, val2):
if _is_undefined(val1) and _is_undefined(val2):
return True
return val1 == val2
def _apply_pipeline_step(checked_fields, action, field_selector, from_pattern, to_pattern):
for i, field_record in enumerate(checked_fields):
group_name, field_name, old_val, new_val, is_checked = field_record
if not _field_selector_matches_pattern(field_selector, group_name, field_name):
continue
if _transition_matches(old_val, new_val, from_pattern, to_pattern):
if action == "include":
checked_fields[i][4] = True
elif action == "exclude":
checked_fields[i][4] = False
def _transition_matches(old_val, new_val, expected_old, expected_new):
if expected_old == "*undefined":
old_matches = old_val in [None, "", "undefined"]
elif expected_old == "*defined":
old_matches = old_val not in [None, "", "undefined"]
elif expected_old == "*":
old_matches = True
else:
old_matches = (old_val == expected_old)
if expected_new == "*undefined":
new_matches = new_val in [None, "", "undefined"]
elif expected_new == "*defined":
new_matches = new_val not in [None, "", "undefined"]
elif expected_new == "*":
new_matches = True
else:
new_matches = (new_val == expected_new)
return old_matches and new_matches
def _get_status_and_style(count, warning_threshold, critical_threshold):
nonlocal has_critical
if count > critical_threshold:
has_critical = True
return "CRITICAL", "red", ""
elif count > warning_threshold:
return "WARNING", "yellow", ""
else:
return "OK", "green", ""
def _print_block_header(title, status_tuple, indent=0):
indent_str = " " * indent
status, color, emoji = status_tuple
console.print(f"{indent_str}{emoji} [{color}][bold]{title}[/bold][/{color}]")
def _print_check_line(message, count, status_tuple, indent=1):
indent_str = " " * indent
status, color, emoji = status_tuple
console.print(f"{indent_str}{emoji} [{color}]{message}: {count}[/{color}]")
def _calculate_block_status(line_statuses):
if any(s[0] == "CRITICAL" for s in line_statuses):
return ("CRITICAL", "red", "")
elif any(s[0] == "WARNING" for s in line_statuses):
return ("WARNING", "yellow", "")
else:
return ("OK", "green", "")
def _field_selector_matches_pattern(selector, group_name, field_name):
if selector == "*.*":
return True
sel_group, sel_field = selector.split(".", 1)
if sel_group != "*" and sel_group != group_name:
return False
if sel_field == "*":
return True
return sel_field == field_name
def _apply_field_selection_pipeline(all_fields, field_selection_config):
candidate_fields = set()
if not field_selection_config:
return candidate_fields
for action, field_selector in field_selection_config:
for group_name, field_name in all_fields:
if _field_selector_matches_pattern(field_selector, group_name, field_name):
if action == "include":
candidate_fields.add((group_name, field_name))
elif action == "exclude":
candidate_fields.discard((group_name, field_name))
return candidate_fields
def _get_key_field_from_new_requests_rule(rule, new_requests_list, old_requests_list):
if not new_requests_list or not old_requests_list:
raise ValueError("Cannot determine key field: empty request lists")
new_req = new_requests_list[0]
old_req = old_requests_list[0]
candidate_fields = _build_candidate_fields(new_req, old_req, rule.get("field_selection"))
if not candidate_fields:
raise ValueError(
f"field_selection produced no candidate fields. "
f"Config: {rule.get('field_selection')}"
)
# Iterate in config order (field_selection), not alphabetically
field_selection_config = rule.get("field_selection") or []
ordered_candidates = []
seen = set()
for _action, selector in field_selection_config:
sel_group, sel_field = selector.split(".", 1)
for (group_name, field_name) in candidate_fields:
if (group_name, field_name) in seen:
continue
if (sel_group in ("*", group_name)) and (sel_field in ("*", field_name)):
ordered_candidates.append((group_name, field_name))
seen.add((group_name, field_name))
for group_name, field_name in ordered_candidates:
new_val = get_nested_value(new_req, [group_name, field_name])
old_val = get_nested_value(old_req, [group_name, field_name])
if new_val is not None and old_val is not None:
return field_name, group_name
raise ValueError(
f"No field in field_selection has values in both first new and old request. "
f"Candidates from pipeline: {candidate_fields}. "
f"Verify field_selection config or data has proper values."
)
def _build_requests_dict(requests_list, key_field, field_group):
result = {}
for request in requests_list:
key = get_nested_value(request, [field_group, key_field])
if key:
result[key] = request
return result
def _matches_transition(old_val, new_val, transitions_config):
if transitions_config is None:
return False
for transition in transitions_config:
expected_old, expected_new = transition
if _transition_matches(old_val, new_val, expected_old, expected_new):
return True
return False
def _process_special_rule(rule, line_label, new_dict, old_dict):
if line_label == "New Requests":
return len(set(new_dict.keys()) - set(old_dict.keys()))
elif line_label == "Deleted Requests":
return len(set(old_dict.keys()) - set(new_dict.keys()))
else:
return 0
def _process_new_deleted_fields(line_label, new_dict, old_dict):
field_counts = {}
common_keys = sorted(set(new_dict.keys()) & set(old_dict.keys()))
for key in common_keys:
new_req = new_dict[key]
old_req = old_dict[key]
all_groups = sorted(set(new_req.keys()) | set(old_req.keys()))
for group_name in all_groups:
new_group = new_req.get(group_name, {})
old_group = old_req.get(group_name, {})
if not isinstance(new_group, dict):
new_group = {}
if not isinstance(old_group, dict):
old_group = {}
new_fields = set(new_group.keys())
old_fields = set(old_group.keys())
if line_label == "New Fields":
changed_fields = sorted(new_fields - old_fields)
elif line_label == "Deleted Fields":
changed_fields = sorted(old_fields - new_fields)
else:
changed_fields = []
for field_name in changed_fields:
qualified_name = f"{group_name}.{field_name}"
field_counts[qualified_name] = field_counts.get(qualified_name, 0) + 1
return sorted(field_counts.items(), key=lambda x: (-x[1], x[0]))
def _build_candidate_fields(new_req, old_req, field_selection_config):
common_groups = sorted(set(new_req.keys()) & set(old_req.keys()))
all_available_fields = []
for group_name in common_groups:
new_group = new_req.get(group_name, {})
old_group = old_req.get(group_name, {})
if not isinstance(new_group, dict):
new_group = {}
if not isinstance(old_group, dict):
old_group = {}
common_field_names = sorted(set(new_group.keys()) & set(old_group.keys()))
for field_name in common_field_names:
all_available_fields.append((group_name, field_name))
if not field_selection_config:
return []
candidate_fields = _apply_field_selection_pipeline(all_available_fields, field_selection_config)
return sorted(candidate_fields, key=lambda x: (x[0], x[1]))
def _process_rule(rule, new_dict, old_dict):
if rule.get("_config_error"):
return 0, []
field_selection_config = rule.get("field_selection")
bloc_scope = rule.get("bloc_scope") or "any"
common_keys = sorted(set(new_dict.keys()) & set(old_dict.keys()))
matching_requests_count = 0
details_list = []
for key in common_keys:
new_req = new_dict[key]
old_req = old_dict[key]
candidate_fields = _build_candidate_fields(new_req, old_req, field_selection_config)
if not candidate_fields:
continue
all_fields_list = []
changed_fields = []
for group_name, field_name in candidate_fields:
new_val = get_nested_value(new_req, [group_name, field_name])
old_val = get_nested_value(old_req, [group_name, field_name])
field_has_changed = not _values_are_equal(old_val, new_val)
if field_has_changed:
changed_fields.append((group_name, field_name))
all_fields_list.append([group_name, field_name, old_val, new_val, False])
transitions_config = rule.get("transitions", [])
if transitions_config and isinstance(transitions_config, list):
for action, field_selector, from_val, to_val in transitions_config:
_apply_pipeline_step(all_fields_list, action, field_selector, from_val, to_val)
checked_fields = [(f[0], f[1], f[2], f[3]) for f in all_fields_list if f[4]]
request_matches = False
if bloc_scope == "all":
if len(changed_fields) > 0 and len(checked_fields) == len(changed_fields):
request_matches = True
else: # bloc_scope == "any"
if len(checked_fields) > 0:
request_matches = True
if request_matches:
matching_requests_count += 1
if debug_mode and checked_fields:
field_changes = [(f"{gn}.{fn}", ov, nv) for gn, fn, ov, nv in checked_fields]
details_list.append((key, field_changes))
return matching_requests_count, details_list
# ========== MAIN LOGIC ==========
key_field = None
field_group = None
for rule in regression_check_config:
if rule.get("line_label") == "New Requests":
try:
key_field, field_group = _get_key_field_from_new_requests_rule(
rule,
output_requests,
old_requests
)
break
except ValueError as e:
console.print(f"[bold red]Error determining key field: {e}[/bold red]")
return True
if not key_field:
console.print("[bold red]Error: 'New Requests' rule not found or has no valid field_selection[/bold red]")
return True
console.print(f"[dim]Using key field: {field_group}.{key_field}[/dim]\n")
new_dict = _build_requests_dict(output_requests, key_field, field_group)
old_dict = _build_requests_dict(old_requests, key_field, field_group)
# Group rules by bloc_title, preserving order of first appearance
blocs = {}
bloc_order = []
for rule in regression_check_config:
bloc_title = rule["bloc_title"]
if bloc_title not in blocs:
blocs[bloc_title] = []
bloc_order.append(bloc_title)
blocs[bloc_title].append(rule)
for bloc_title in bloc_order:
rules = blocs[bloc_title]
line_results = []
for rule in rules:
line_label = rule["line_label"]
if line_label == "New Requests":
count = _process_special_rule(rule, line_label, new_dict, old_dict)
line_results.append((line_label, count, None, "simple"))
elif line_label == "Deleted Requests":
deleted_keys = sorted(set(old_dict.keys()) - set(new_dict.keys()))
line_results.append((line_label, len(deleted_keys), deleted_keys, "deleted_requests"))
elif line_label in ["New Fields", "Deleted Fields"]:
field_list = _process_new_deleted_fields(line_label, new_dict, old_dict)
count = len(field_list)
line_results.append((line_label, count, field_list, "fields"))
else:
count, details = _process_rule(rule, new_dict, old_dict)
line_results.append((line_label, count, details, "details"))
# Calculate status for each line
line_results_with_status = []
for line_label, count, data, result_type in line_results:
rule = next(r for r in rules if r["line_label"] == line_label)
warning_threshold = rule["warning_threshold"]
critical_threshold = rule["critical_threshold"]
status_tuple = _get_status_and_style(count, warning_threshold, critical_threshold)
line_results_with_status.append((line_label, count, data, result_type, status_tuple))
bloc_status = _calculate_block_status([result[4] for result in line_results_with_status])
_print_block_header(bloc_title, bloc_status, indent=0)
for line_label, count, data, result_type, status_tuple in line_results_with_status:
should_display = (bloc_title == "Structure") or (status_tuple[0] != "OK")
if should_display:
if result_type == "fields":
_print_check_line(line_label, count, status_tuple, indent=1)
for field_name, request_count in data:
console.print(f" {field_name} ({request_count} requests)")
elif result_type == "details":
_print_check_line(line_label, count, status_tuple, indent=1)
if debug_mode and data and len(data) > 0:
for request_key, field_changes in data:
console.print(f" [dim]{key_field}: {request_key}[/dim]")
for qualified_field, old_val, new_val in field_changes:
old_display = f"'{old_val}'" if isinstance(old_val, str) else str(old_val)
new_display = f"'{new_val}'" if isinstance(new_val, str) else str(new_val)
console.print(f" - {qualified_field}: {old_display}{new_display}")
elif result_type == "deleted_requests":
_print_check_line(line_label, count, status_tuple, indent=1)
logging.warning("Regression check - %s: %d", line_label, count)
for deleted_key in data:
console.print(f" {key_field}: {deleted_key}")
logging.warning(" Deleted request: %s=%s", key_field, deleted_key)
else:
_print_check_line(line_label, count, status_tuple, indent=1)
console.print()
return has_critical