Detecting duplicate Questionnaires

This commit is contained in:
2026-04-16 18:43:17 +01:00
parent 3904948c32
commit 00dd82290c
2 changed files with 37 additions and 10 deletions

3
.gitignore vendored
View File

@@ -195,9 +195,10 @@ Endobest Reporting/
jsons history/
nul
# Ignore all json, exe, log and xlsx files
# Ignore all json, exe, log, txt and xlsx files
*.json
*.exe
*.log
*.txt
/*.xlsx
!eb_org_center_mapping.xlsx

View File

@@ -617,6 +617,12 @@ def _find_questionnaire_by_id(qcm_dict, qcm_id):
if not isinstance(qcm_dict, dict):
return None
qcm_data = qcm_dict.get(qcm_id)
if qcm_data and qcm_data.get("_count", 1) > 1:
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
logging.error(
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
f"Questionnaire id='{qcm_id}' appeared {qcm_data['_count']} times in API response — using last received copy"
)
return qcm_data.get("answers") if qcm_data else None
@@ -624,20 +630,32 @@ def _find_questionnaire_by_name(qcm_dict, name):
"""Finds a questionnaire by name (sequential search, returns first match)."""
if not isinstance(qcm_dict, dict):
return None
for qcm in qcm_dict.values():
if get_nested_value(qcm, ["questionnaire", "name"]) == name:
return qcm.get("answers")
return None
matches = [qcm for qcm in qcm_dict.values()
if get_nested_value(qcm, ["questionnaire", "name"]) == name]
if len(matches) > 1:
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
ids = [get_nested_value(q, ["questionnaire", "id"]) for q in matches]
logging.error(
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
f"Questionnaire name='{name}' matches {len(matches)} entries (ids: {ids}) — returning first match"
)
return matches[0].get("answers") if matches else None
def _find_questionnaire_by_category(qcm_dict, category):
"""Finds a questionnaire by category (sequential search, returns first match)."""
if not isinstance(qcm_dict, dict):
return None
for qcm in qcm_dict.values():
if get_nested_value(qcm, ["questionnaire", "category"]) == category:
return qcm.get("answers")
return None
matches = [qcm for qcm in qcm_dict.values()
if get_nested_value(qcm, ["questionnaire", "category"]) == category]
if len(matches) > 1:
ctx = getattr(thread_local_storage, "current_patient_context", {"id": "Unknown", "pseudo": "Unknown"})
ids = [get_nested_value(q, ["questionnaire", "id"]) for q in matches]
logging.error(
f"[DUPLICATE QCM] Patient {ctx['id']} ({ctx['pseudo']}): "
f"Questionnaire category='{category}' matches {len(matches)} entries (ids: {ids}) — returning first match"
)
return matches[0].get("answers") if matches else None
def _get_field_value_from_questionnaire(all_questionnaires, field_config):
@@ -1079,6 +1097,13 @@ def get_all_questionnaires_by_patient(patient_id, record_data):
response.raise_for_status()
response_data = response.json()
# First pass: count occurrences of each q_id to detect duplicates at lookup time
q_id_counts = {}
for item in response_data:
q_id = get_nested_value(item, path=["questionnaire", "id"])
if q_id:
q_id_counts[q_id] = q_id_counts.get(q_id, 0) + 1
# Build dictionary with questionnaire metadata for searching
results = {}
for item in response_data:
@@ -1093,7 +1118,8 @@ def get_all_questionnaires_by_patient(patient_id, record_data):
"name": q_name,
"category": q_category
},
"answers": answers
"answers": answers,
"_count": q_id_counts[q_id]
}
return results