"""Prepare the health-professionals reference table for import.

The script optionally downloads the public ``PS_LibreAcces`` archive, extracts
the ``PS_LibreAcces_Personne_activite`` member, filters it on a list of
professions held in an Excel workbook, blanks a fixed set of columns and writes
the result as a pipe-separated file.
"""
import argparse
import math  # noqa: F401  (no longer used below; kept so nothing relying on it breaks)
import sys
from os import path
from shutil import copyfileobj
from typing import Optional
from zipfile import ZipFile, is_zipfile

import polars as pl
import questionary
import requests
from tqdm import tqdm  # could use from tqdm.gui import tqdm
from tqdm.utils import CallbackIOWrapper
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning

# Columns to blank out (personal data, or data that is useless for the import).
COLUMNS_TO_CLEAR = [
    "Type d'identifiant PP",
    "Identification nationale PP",
    "Libellé civilité d'exercice",
    "Code civilité",
    "Libellé civilité",
    "Code profession",
    "Code catégorie professionnelle",
    "Libellé catégorie professionnelle",
    "Code type savoir-faire",
    "Libellé type savoir-faire",
    "Code savoir-faire",
    "Code mode exercice",
    "Libellé mode exercice",
    "Numéro SIRET site",
    "Numéro SIREN site",
    "Numéro FINESS site",
    "Numéro FINESS établissement juridique",
    "Identifiant technique de la structure",
    "Raison sociale site",
    "Enseigne commerciale site",
    "Complément destinataire (coord. structure)",
    "Complément point géographique (coord. structure)",
    "Code type de voie (coord. structure)",
    "Code postal (coord. structure)",
    "Code commune (coord. structure)",
    "Libellé commune (coord. structure)",
    "Code pays (coord. structure)",
    "Libellé pays (coord. structure)",
    "Téléphone 2 (coord. structure)",
    "Télécopie (coord. structure)",
    "Adresse e-mail (coord. structure)",
    "Code Département (structure)",
    "Libellé Département (structure)",
    "Ancien identifiant de la structure",
    "Autorité d'enregistrement",
    "Code secteur d'activité",
    "Libellé secteur d'activité",
    "Code section tableau pharmaciens",
    "Libellé section tableau pharmaciens",
    "Code rôle",
    "Libellé rôle",
    "Code genre activité",
    "Libellé genre activité",
]


def process_professionals_table(xls_file, txt_file, output_file):
    """Filter the extracted table, blank sensitive columns and write the CSV.

    Args:
        xls_file: Excel workbook holding the ``F_Professions`` sheet (column
            ``Professions`` = professions to keep) and the sheets of extra
            rows (``F_Append_Update``, ``F_Etrangers``, ``F_Fake``,
            ``F_Sophrologues``).
        txt_file: Pipe-separated source file, read without quoting and
            without schema inference.
        output_file: Destination of the pipe-separated, unquoted result.
    """
    # Load the Excel sheets.
    professions = (
        pl.read_excel(xls_file, sheet_name='F_Professions')
        .get_column('Professions')
        .to_list()
    )
    # infer_schema_length=0: presumably so every column is read as text, like
    # the CSV side (infer_schema=False) — TODO confirm for the Excel engine in use.
    extra_sheets = [
        pl.read_excel(xls_file, sheet_name=sheet, infer_schema_length=0)
        for sheet in ('F_Append_Update', 'F_Etrangers', 'F_Fake', 'F_Sophrologues')
    ]
    keep_profession = pl.col('Libellé profession').is_in(professions)

    # Build the lazy scan once; it serves both the schema lookup and the read.
    source = pl.scan_csv(txt_file, separator='|', infer_schema=False, quote_char=None)

    # Defensive filter: only clear the columns that actually exist in the file.
    schema = source.collect_schema()
    cols_to_clear = [c for c in COLUMNS_TO_CLEAR if c in schema]

    # Lazy CSV read + profession filter + blanking of the sensitive columns.
    with tqdm(desc=f'Processing {path.basename(txt_file)}',
              bar_format='{desc}... {elapsed}', leave=True) as spinner:
        result = (
            source
            .filter(keep_profession)
            .with_columns([pl.lit('').alias(c) for c in cols_to_clear])
            .collect()
        )
        spinner.set_description(f'Processing {path.basename(txt_file)} — {len(result)} lignes')

    # Append the complementary rows coming from the Excel sheets.
    extra = pl.concat(extra_sheets, how='diagonal_relaxed').filter(keep_profession)
    # Check against the *combined* columns: a sensitive column that exists only
    # in one of the secondary sheets must be blanked too, not just those that
    # happen to be present in F_Append_Update.
    extra = extra.with_columns(
        [pl.lit('').alias(c) for c in cols_to_clear if c in extra.columns]
    )

    final = pl.concat([result, extra], how='diagonal_relaxed')
    final.write_csv(output_file, separator='|', quote_style='never', line_terminator='\n')
    print(f"Written: {path.basename(output_file)} ({len(final)} lignes)")


def download_file(url: str, filename: Optional[str] = None) -> None:
    """Stream ``url`` to a local file while showing a progress bar.

    Args:
        url: Address to download.
        filename: Destination path. When falsy, the last path segment of
            ``url`` is used, in the current directory.

    Raises:
        requests.HTTPError: If the server answers with an error status, so an
            error page is never saved in place of the expected file.
    """
    local_filename = filename or path.join(".", url.split('/')[-1])

    # NOTE(security): TLS certificate verification is disabled and the matching
    # warning silenced, as in the original code. This exposes the download to
    # man-in-the-middle tampering; prefer passing a CA bundle via ``verify=``.
    disable_warnings(InsecureRequestWarning)

    chunk_size = 64 * 1024
    with requests.get(url, stream=True, verify=False) as response:
        response.raise_for_status()

        # Content-Length is optional (e.g. chunked transfer): fall back to a
        # bar without a total instead of failing with KeyError.
        content_length = response.headers.get('Content-Length')
        total = int(content_length) if content_length and content_length.isdigit() else None

        with open(local_filename, 'wb') as fp, tqdm(
            desc=f"Downloading to {path.basename(local_filename)}",
            unit='B',
            unit_scale=True,
            unit_divisor=1024,
            total=total,
            leave=True,
        ) as pbar:
            for chunk in response.iter_content(chunk_size=chunk_size):
                fp.write(chunk)
                pbar.update(len(chunk))


def extract_one_file_from_zip(zipfile, fromfile, tofile, desc=None):
    """Extract the first archive member whose name starts with ``fromfile``.

    Args:
        zipfile: Path of the zip archive.
        fromfile: Prefix of the member name to look for.
        tofile: Path the member is written to.
        desc: Progress-bar label; defaults to ``Extracting to <tofile name>``.

    Returns:
        ``None`` on success, otherwise a human-readable error message (the
        archive is missing/invalid, or no member matches the prefix).
    """
    if not desc:
        desc = f"Extracting to {path.basename(tofile)}"

    if not is_zipfile(zipfile):
        return f"Can't open Zipfile (non existent or bad): {zipfile}"

    # The context manager closes the archive on every path, including the
    # early return below.
    with ZipFile(zipfile) as zipf:
        member = next(
            (info for info in zipf.infolist() if info.filename.startswith(fromfile)),
            None,
        )
        if member is None:
            return f"No such file name in the Zip ({fromfile}*)..."

        with tqdm(
            desc=desc,
            unit="B",
            unit_scale=True,
            unit_divisor=1024,
            total=member.file_size,
            leave=True,
        ) as pbar, zipf.open(member) as fi, open(tofile, "wb") as fo:
            # CallbackIOWrapper advances the bar by the size of each read.
            copyfileobj(CallbackIOWrapper(pbar.update, fi), fo)
    return None


def main():
    """Parse the command line, then download, unzip and process as requested."""
    default_file_name = 'Table_Réf_Professionnels'
    default_excel_file_name = 'Table_Réf_Professionnels'
    internal_file_name = 'PS_LibreAcces_Personne_activite'

    parser = argparse.ArgumentParser(
        description='Prepare Professionals Table for Import to Endoziwig.')
    parser.add_argument(
        'fileName', type=str, nargs='?', default=default_file_name,
        help=f'File name to use : default="{default_file_name}"')
    parser.add_argument(
        '--excelFileName', '-x', type=str, nargs='?', default=default_excel_file_name,
        help=f'Excel File Containing Append Data: default="{default_excel_file_name}" (without extension)')
    parser.add_argument(
        '--noDownload', '-ndw', action='store_true',
        help='Do not Download the file (Default = Download).')
    parser.add_argument(
        '--noUnzip', '-nuz', action='store_true',
        help='Do not Unzip the file (Default = Unzip).')
    parser.add_argument(
        '--noProcess', '-npr', action='store_true',
        help='Do not Process the file (Default = Process).')
    args = parser.parse_args()

    if len(sys.argv) == 1:
        print("You're about to download and prepare Professionals Table for import to Endoziwig")

    # Files settings
    if args.fileName == default_file_name:
        print("\n")
        args.fileName = questionary.text(
            "Please confirm file name (or empty to cancel):",
            default=default_file_name).ask()
        # ask() returns None when the prompt is cancelled (e.g. Ctrl-C); treat
        # it like an empty answer instead of building "None.zip" paths.
        if not args.fileName:
            sys.exit(0)

    base_dir = path.dirname(path.abspath(__file__))
    zip_file_name = path.join(base_dir, f'{args.fileName}.zip')
    xls_file_name = path.join(base_dir, f'{args.excelFileName}.xlsx')
    txt_file_name = path.join(base_dir, f'{args.fileName}.txt')
    output_file_name = path.join(base_dir, f'{args.fileName}.csv')
    print("\n")

    if not args.noDownload:
        download_file(
            'https://service.annuaire.sante.fr/annuaire-sante-webservices/V300/services/extraction/PS_LibreAcces',
            filename=zip_file_name)
        print("\n")

    if not args.noUnzip:
        unzip_result = extract_one_file_from_zip(zip_file_name, internal_file_name, txt_file_name)
        # A non-None result is an error message, not a path.
        if unzip_result is not None:
            print(unzip_result)
        print("\n")

    if not args.noProcess:
        process_professionals_table(xls_file_name, txt_file_name, output_file_name)
        print("\n")


if __name__ == '__main__':
    # Top-level boundary of an interactive script: report the error and keep
    # the console open until the user acknowledges.
    try:
        main()
    except Exception as e:
        print(e)
    finally:
        input('Finished... Press Enter to continue')
        print('\n')