Strategy change new rule (Part1)

This commit is contained in:
2026-05-05 02:36:03 +01:00
parent 7174ed44f0
commit d990307456
10 changed files with 88 additions and 15 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -289,7 +289,7 @@ def _do_login(username, password):
client.base_url = IAM_URL client.base_url = IAM_URL
response = client.post(API_AUTH_LOGIN_ENDPOINT, response = client.post(API_AUTH_LOGIN_ENDPOINT,
json={"username": username, "password": password}, json={"username": username, "password": password},
timeout=20) timeout=60)
response.raise_for_status() response.raise_for_status()
master_token = response.json()["access_token"] master_token = response.json()["access_token"]
user_id = response.json()["userId"] user_id = response.json()["userId"]
@@ -692,6 +692,13 @@ def get_value_from_inclusion(inclusion_dict, key):
def _execute_custom_function(function_name, args, output_inclusion): def _execute_custom_function(function_name, args, output_inclusion):
"""Executes a custom function for a calculated field.""" """Executes a custom function for a calculated field."""
def dominant_no_value(has_undef, has_na):
"""Returns the dominant sentinel: 'undefined' > 'N/A' > None (real value present)."""
if has_undef: return "undefined"
if has_na: return "N/A"
return None
if function_name == "search_in_fields_using_regex": if function_name == "search_in_fields_using_regex":
if not args or len(args) < 2: if not args or len(args) < 2:
return "$$$$ Argument Error: search_in_fields_using_regex requires at least 2 arguments" return "$$$$ Argument Error: search_in_fields_using_regex requires at least 2 arguments"
@@ -700,16 +707,19 @@ def _execute_custom_function(function_name, args, output_inclusion):
field_names = args[1:] field_names = args[1:]
field_values = [] field_values = []
all_undefined = True has_undefined = False
has_na = False
for field_name in field_names: for field_name in field_names:
value = get_value_from_inclusion(output_inclusion, field_name) value = get_value_from_inclusion(output_inclusion, field_name)
field_values.append(value) field_values.append(value)
if value is not None and value != "undefined": if value is None or value == "undefined":
all_undefined = False has_undefined = True
elif value == "N/A":
has_na = True
if all_undefined: if not any(v not in (None, "undefined", "N/A") for v in field_values):
return "undefined" return dominant_no_value(has_undefined, has_na)
try: try:
for value in field_values: for value in field_values:
@@ -730,6 +740,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
if value is None or value == "undefined": if value is None or value == "undefined":
return "undefined" return "undefined"
if value == "N/A":
return "N/A"
match = re.search(r'\((.*?)\)', str(value)) match = re.search(r'\((.*?)\)', str(value))
return match.group(1) if match else "undefined" return match.group(1) if match else "undefined"
@@ -743,6 +755,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
if status is None or status == "undefined": if status is None or status == "undefined":
return "undefined" return "undefined"
if status == "N/A":
return "N/A"
if not isinstance(is_terminated, bool) or not is_terminated: if not isinstance(is_terminated, bool) or not is_terminated:
return status return status
@@ -752,7 +766,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
elif function_name == "if_then_else": elif function_name == "if_then_else":
# Unified conditional function # Unified conditional function
# Syntax: ["operator", arg1, arg2_optional, result_if_true, result_if_false] # Syntax: ["operator", arg1, arg2_optional, result_if_true, result_if_false]
# Operators: "is_true", "is_false", "all_true", "is_defined", "is_undefined", "all_defined", "==", "!=" # Operators: "is_true", "is_false", "all_true", "any_true", "is_defined", "is_undefined", "all_defined", "==", "!="
# Sentinel propagation: "undefined" (inconnu) > "N/A" (non applicable) > vraie valeur.
if not args or len(args) < 4: if not args or len(args) < 4:
return "$$$$ Argument Error: if_then_else requires at least 4 arguments" return "$$$$ Argument Error: if_then_else requires at least 4 arguments"
@@ -780,6 +795,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
value = resolve_value(args[1]) value = resolve_value(args[1])
if value is None or value == "undefined": if value is None or value == "undefined":
return "undefined" return "undefined"
if value == "N/A":
return "N/A"
condition = (value is True) condition = (value is True)
result_if_true = resolve_value(args[2]) result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3]) result_if_false = resolve_value(args[3])
@@ -790,6 +807,8 @@ def _execute_custom_function(function_name, args, output_inclusion):
value = resolve_value(args[1]) value = resolve_value(args[1])
if value is None or value == "undefined": if value is None or value == "undefined":
return "undefined" return "undefined"
if value == "N/A":
return "N/A"
condition = (value is False) condition = (value is False)
result_if_true = resolve_value(args[2]) result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3]) result_if_false = resolve_value(args[3])
@@ -801,22 +820,58 @@ def _execute_custom_function(function_name, args, output_inclusion):
if not isinstance(fields_arg, list): if not isinstance(fields_arg, list):
return "$$$$ Argument Error: all_true requires arg1 to be a list of field names" return "$$$$ Argument Error: all_true requires arg1 to be a list of field names"
has_undefined = False
has_na = False
conditions = [] conditions = []
for field_name in fields_arg: for field_name in fields_arg:
field_value = get_value_from_inclusion(output_inclusion, field_name) field_value = get_value_from_inclusion(output_inclusion, field_name)
if field_value is None or field_value == "undefined": if field_value is None or field_value == "undefined":
return "undefined" has_undefined = True
conditions.append(field_value) elif field_value == "N/A":
has_na = True
else:
conditions.append(field_value)
status = dominant_no_value(has_undefined, has_na)
if status:
return status
condition = all(conditions) condition = all(conditions)
result_if_true = resolve_value(args[2]) result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3]) result_if_false = resolve_value(args[3])
elif operator == "any_true":
# OR semantics: returns "undefined" only if ALL operands are undefined
if len(args) != 4:
return "$$$$ Argument Error: any_true requires 4 arguments"
fields_arg = args[1]
if not isinstance(fields_arg, list):
return "$$$$ Argument Error: any_true requires arg1 to be a list of field names"
has_undefined = False
has_na = False
resolved = []
for field_name in fields_arg:
field_value = get_value_from_inclusion(output_inclusion, field_name)
if field_value is None or field_value == "undefined":
has_undefined = True
elif field_value == "N/A":
has_na = True
else:
resolved.append(field_value)
if not resolved:
return dominant_no_value(has_undefined, has_na)
condition = any(resolved)
result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3])
elif operator == "is_defined": elif operator == "is_defined":
if len(args) != 4: if len(args) != 4:
return "$$$$ Argument Error: is_defined requires 4 arguments" return "$$$$ Argument Error: is_defined requires 4 arguments"
value = resolve_value(args[1]) value = resolve_value(args[1])
condition = (value is not None and value != "undefined") condition = (value is not None and value != "undefined" and value != "N/A")
result_if_true = resolve_value(args[2]) result_if_true = resolve_value(args[2])
result_if_false = resolve_value(args[3]) result_if_false = resolve_value(args[3])
@@ -837,7 +892,7 @@ def _execute_custom_function(function_name, args, output_inclusion):
for field_name in fields_arg: for field_name in fields_arg:
field_value = get_value_from_inclusion(output_inclusion, field_name) field_value = get_value_from_inclusion(output_inclusion, field_name)
if field_value is None or field_value == "undefined": if field_value is None or field_value == "undefined" or field_value == "N/A":
condition = False condition = False
break break
else: else:
@@ -852,8 +907,12 @@ def _execute_custom_function(function_name, args, output_inclusion):
value1 = resolve_value(args[1]) value1 = resolve_value(args[1])
value2 = resolve_value(args[2]) value2 = resolve_value(args[2])
if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined": v1_undef = value1 is None or value1 == "undefined"
return "undefined" v2_undef = value2 is None or value2 == "undefined"
status = dominant_no_value(v1_undef or v2_undef,
value1 == "N/A" or value2 == "N/A")
if status:
return status
condition = (value1 == value2) condition = (value1 == value2)
result_if_true = resolve_value(args[3]) result_if_true = resolve_value(args[3])
@@ -865,8 +924,12 @@ def _execute_custom_function(function_name, args, output_inclusion):
value1 = resolve_value(args[1]) value1 = resolve_value(args[1])
value2 = resolve_value(args[2]) value2 = resolve_value(args[2])
if value1 is None or value1 == "undefined" or value2 is None or value2 == "undefined": v1_undef = value1 is None or value1 == "undefined"
return "undefined" v2_undef = value2 is None or value2 == "undefined"
status = dominant_no_value(v1_undef or v2_undef,
value1 == "N/A" or value2 == "N/A")
if status:
return status
condition = (value1 != value2) condition = (value1 != value2)
result_if_true = resolve_value(args[3]) result_if_true = resolve_value(args[3])
@@ -894,6 +957,8 @@ def process_inclusions_mapping(output_inclusion, inclusion_data, record_data, re
if condition_value is None or condition_value == "undefined": if condition_value is None or condition_value == "undefined":
final_value = "undefined" final_value = "undefined"
elif condition_value == "N/A":
final_value = "N/A"
elif not isinstance(condition_value, bool): elif not isinstance(condition_value, bool):
final_value = "$$$$ Condition Field Error" final_value = "$$$$ Condition Field Error"
elif not condition_value: elif not condition_value:
@@ -1449,6 +1514,14 @@ def main():
inclusions_total_count = sum(org.get('patients_count', 0) for org in organizations_list) inclusions_total_count = sum(org.get('patients_count', 0) for org in organizations_list)
organizations_list.sort(key=lambda org: (-org.get('patients_count', 0), org.get('name', ''))) organizations_list.sort(key=lambda org: (-org.get('patients_count', 0), org.get('name', '')))
# ╔══════════════════════════════════════════════════════════════════╗
# ║ TEST INSTRUMENTATION — À SUPPRIMER AVANT MISE EN PRODUCTION ║
# ║ Limite le tableau aux organisations aux rangs 33 et 34 ║
# ║ (indices 32 et 33, après tri décroissant par patients_count) ║
# ╚══════════════════════════════════════════════════════════════════╝
# organizations_list = [org for i, org in enumerate(organizations_list) if i in (32, 33)]
# ══════════════════════════════════════════════════════════════════
number_of_organizations = len(organizations_list) number_of_organizations = len(organizations_list)
print(f"{inclusions_total_count} Inclusions in {number_of_organizations} Organizations...") print(f"{inclusions_total_count} Inclusions in {number_of_organizations} Organizations...")
print() print()