Files
professionals_from_sante_fr/prepareProfessionalsTable.py

206 lines
8.0 KiB
Python

import argparse
import math
import sys
from os import path
from shutil import copyfileobj
from zipfile import ZipFile, is_zipfile
import polars as pl
import requests
from tqdm import tqdm # could use from tqdm.gui import tqdm
from tqdm.utils import CallbackIOWrapper
from urllib3.exceptions import InsecureRequestWarning
from urllib3 import disable_warnings
import questionary
# Columns to blank out (personal data, or data that is useless for the import).
# process_professionals_table() overwrites each of these columns that is present
# in the input header with empty strings; the column itself is kept, so the
# column layout of the output file is unchanged.
COLUMNS_TO_CLEAR = [
"Type d'identifiant PP",
"Identification nationale PP",
"Libellé civilité d'exercice",
"Code civilité",
"Libellé civilité",
"Code profession",
"Code catégorie professionnelle",
"Libellé catégorie professionnelle",
"Code type savoir-faire",
"Libellé type savoir-faire",
"Code savoir-faire",
"Code mode exercice",
"Libellé mode exercice",
"Numéro SIRET site",
"Numéro SIREN site",
"Numéro FINESS site",
"Numéro FINESS établissement juridique",
"Identifiant technique de la structure",
"Raison sociale site",
"Enseigne commerciale site",
"Complément destinataire (coord. structure)",
"Complément point géographique (coord. structure)",
"Code type de voie (coord. structure)",
"Code postal (coord. structure)",
"Code commune (coord. structure)",
"Libellé commune (coord. structure)",
"Code pays (coord. structure)",
"Libellé pays (coord. structure)",
"Téléphone 2 (coord. structure)",
"Télécopie (coord. structure)",
"Adresse e-mail (coord. structure)",
"Code Département (structure)",
"Libellé Département (structure)",
"Ancien identifiant de la structure",
"Autorité d'enregistrement",
"Code secteur d'activité",
"Libellé secteur d'activité",
"Code section tableau pharmaciens",
"Libellé section tableau pharmaciens",
"Code rôle",
"Libellé rôle",
"Code genre activité",
"Libellé genre activité",
]
def process_professionals_table(xls_file, txt_file, output_file):
    """Build the pipe-separated import file from the extract and the Excel add-ons.

    The text extract is filtered down to the professions listed in the
    ``F_Professions`` sheet, the columns named in ``COLUMNS_TO_CLEAR`` are
    blanked, and the rows of the ``F_Append_Update``, ``F_Etrangers``,
    ``F_Fake`` and ``F_Sophrologues`` sheets (same profession filter, same
    blanking) are appended before the result is written to ``output_file``.

    Args:
        xls_file: Path of the Excel workbook holding the sheets named above.
        txt_file: Path of the pipe-separated text extract (read unquoted,
            every column as text).
        output_file: Path of the pipe-separated file to write.
    """
    # Load the Excel tabs.
    # NOTE(review): infer_schema_length=0 is presumably there to load every
    # cell as text so the sheets concatenate cleanly with the all-text extract
    # -- confirm against the polars read_excel documentation.
    professions = (
        pl.read_excel(xls_file, sheet_name='F_Professions')
        .get_column('Professions')
        .to_list()
    )
    df_append = pl.read_excel(xls_file, sheet_name='F_Append_Update', infer_schema_length=0)
    df_etrangers = pl.read_excel(xls_file, sheet_name='F_Etrangers', infer_schema_length=0)
    df_fake = pl.read_excel(xls_file, sheet_name='F_Fake', infer_schema_length=0)
    df_sophrolo = pl.read_excel(xls_file, sheet_name='F_Sophrologues', infer_schema_length=0)

    # Defensive filter: only blank columns that actually exist in the extract,
    # because with_columns() would otherwise *create* the missing ones.
    schema = pl.scan_csv(txt_file, separator='|', infer_schema=False, quote_char=None).collect_schema()
    cols_to_clear = [c for c in COLUMNS_TO_CLEAR if c in schema]

    # Lazy read of the extract + profession filter + blanking of sensitive columns.
    file_label = path.basename(txt_file)
    with tqdm(desc=f'Processing {file_label}', bar_format='{desc}... {elapsed}',
              leave=True) as spinner:
        result = (
            pl.scan_csv(txt_file, separator='|', infer_schema=False, quote_char=None)
            .filter(pl.col('Libellé profession').is_in(professions))
            .with_columns([pl.lit('').alias(c) for c in cols_to_clear])
            .collect()
        )
        # Row count is parenthesised so it no longer runs into the file name.
        spinner.set_description(f'Processing {file_label} ({len(result)} lignes)')

    # Complementary rows coming from the Excel tabs.
    extra = (
        pl.concat([df_append, df_etrangers, df_fake, df_sophrolo], how='diagonal_relaxed')
        .filter(pl.col('Libellé profession').is_in(professions))
    )
    # Decide what to blank from the columns of the *merged* frame: the diagonal
    # concat holds the union of all four sheets, so checking only the first
    # sheet would let a sensitive column that exists in another sheet through.
    extra_cols_to_clear = [c for c in COLUMNS_TO_CLEAR if c in extra.columns]
    extra = extra.with_columns([pl.lit('').alias(c) for c in extra_cols_to_clear])

    final = pl.concat([result, extra], how='diagonal_relaxed')
    final.write_csv(output_file, separator='|', quote_style='never', line_terminator='\n')
    print(f"Written: {path.basename(output_file)} ({len(final)} lignes)")
def download_file(url: str, filename: str = False) -> None:
    """Stream ``url`` to a local file while showing a progress bar.

    Args:
        url: Address of the resource to download.
        filename: Destination path. When falsy, the last path segment of
            ``url`` is used, in the current directory.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status. The
            check happens before the destination is opened, so an existing
            file is not overwritten by an error page.
    """
    local_filename = filename if filename else path.join(".", url.split('/')[-1])

    # NOTE(security): TLS certificate verification is switched off (and the
    # resulting warning silenced). The downloaded content is therefore not
    # authenticated -- re-enable verification if the server's certificate
    # chain can be validated.
    disable_warnings(InsecureRequestWarning)

    unit_scale = 64  # chunk size in KiB; each loop iteration counts for 64 "KB"
    with requests.get(url, stream=True, verify=False) as r:
        r.raise_for_status()

        # Content-Length is optional (e.g. chunked transfer): fall back to a
        # progress bar without a total instead of failing on the lookup.
        content_length = r.headers.get('Content-Length', '')
        if content_length.isdigit():
            total = math.ceil(int(content_length) / 1024 / unit_scale)
        else:
            total = None

        with open(local_filename, 'wb') as fp:
            for chunk in tqdm(r.iter_content(chunk_size=unit_scale * 1024),
                              total=total,
                              unit_scale=unit_scale,
                              unit='KB',
                              desc=f"Downloading to {path.basename(local_filename)}",
                              leave=True):
                fp.write(chunk)
def extract_one_file_from_zip(zipfile, fromfile, tofile, desc=False):
    """Extract the first archive member whose name starts with ``fromfile``.

    Args:
        zipfile: Path of the zip archive.
        fromfile: Prefix of the member name to look for.
        tofile: Path the member's content is written to.
        desc: Progress-bar label; defaults to ``"Extracting to <tofile name>"``.

    Returns:
        ``None`` on success, otherwise a human-readable error message (archive
        missing/invalid, or no member matching the prefix). Nothing is written
        to ``tofile`` in the error cases.
    """
    if not desc:
        desc = f"Extracting to {path.basename(tofile)}"
    if not is_zipfile(zipfile):
        return f"Can't open Zipfile (non existent or bad): {zipfile}"

    # The archive is opened inside a ``with`` so the handle is released on
    # every exit path, including the early "no such member" return.
    with ZipFile(zipfile) as zipf:
        member = next(
            (info for info in zipf.infolist() if info.filename.startswith(fromfile)),
            None,
        )
        if member is None:
            return f"No such file name in the Zip ({fromfile}*)..."

        with tqdm(
            desc=desc, unit="B", unit_scale=True, unit_divisor=1024,
            total=member.file_size, leave=True,
        ) as pbar:
            with zipf.open(member) as fi, open(tofile, "wb") as fo:
                # The wrapper reports the size of every read to the progress bar.
                copyfileobj(CallbackIOWrapper(pbar.update, fi), fo)
    return None
def main():
    """Command-line entry point: download, unzip and process the professionals table.

    Each of the three steps can be skipped with its ``--no...`` flag. When the
    file name is left at its default value the user is asked to confirm it;
    an empty answer or a cancelled prompt ends the program with exit code 0.
    All files are read from / written to the directory containing this script.
    """
    defaultFileName = 'Table_Réf_Professionnels'
    defaultExcelFileName = 'Table_Réf_Professionnels'
    internalFileName = 'PS_LibreAcces_Personne_activite'

    parser = argparse.ArgumentParser(description='Prepare Professionals Table for Import to Endoziwig.')
    parser.add_argument('fileName', type=str, nargs='?', default=defaultFileName,
                        help=f'File name to use : default="{defaultFileName}"')
    parser.add_argument('--excelFileName', '-x', type=str, nargs='?', default=defaultExcelFileName,
                        help=f'Excel File Containing Append Data: default="{defaultExcelFileName}" (without extension)')
    parser.add_argument('--noDownload', '-ndw', action='store_true',
                        help='Do not Download the file (Default = Download).')
    parser.add_argument('--noUnzip', '-nuz', action='store_true',
                        help='Do not Unzip the file (Default = Unzip).')
    parser.add_argument('--noProcess', '-npr', action='store_true',
                        help='Do not Process the file (Default = Process).')
    args = parser.parse_args()

    if len(sys.argv) == 1:
        print("You're about to download and prepare Professionals Table for import to Endoziwig")

    # Files Settings
    if args.fileName == defaultFileName:
        print("\n")
        args.fileName = questionary.text("Please confirm file name (or empty to cancel):",
                                         default=defaultFileName).ask()
    # ask() yields None when the prompt is cancelled (Ctrl-C), not '': test
    # for any falsy value so a cancel does not continue with "None.zip".
    if not args.fileName:
        sys.exit(0)

    BASE_DIR = path.dirname(path.abspath(__file__))
    zipFileName = path.join(BASE_DIR, f'{args.fileName}.zip')
    xlsFileName = path.join(BASE_DIR, f'{args.excelFileName}.xlsx')
    txtFileName = path.join(BASE_DIR, f'{args.fileName}.txt')
    outputFileName = path.join(BASE_DIR, f'{args.fileName}.csv')
    print("\n")

    if not args.noDownload:
        download_file(
            'https://service.annuaire.sante.fr/annuaire-sante-webservices/V300/services/extraction/PS_LibreAcces',
            filename=zipFileName)
        print("\n")

    if not args.noUnzip:
        # The extractor reports problems as a message instead of raising.
        unzipResult = extract_one_file_from_zip(zipFileName, internalFileName, txtFileName)
        if unzipResult is not None:
            print(unzipResult)
        print("\n")

    if not args.noProcess:
        process_professionals_table(xlsFileName, txtFileName, outputFileName)
        print("\n")
if __name__ == '__main__':
    # Script entry: run main(), show any error as a plain message, and always
    # wait for Enter before leaving.
    try:
        main()
    except Exception as error:
        print(error)
    finally:
        input('Finished... Press Enter to continue')
        print('\n')