"""Download, clean and merge the French RPPS professionals table for import.

Fetches the PS_LibreAcces extract, pulls one file out of the zip archive,
filters it down to a whitelist of professions, blanks out personal-data
columns and appends complementary rows from a local Excel workbook.
"""
import argparse
|
|
import math
|
|
import sys
|
|
from os import path
|
|
from shutil import copyfileobj
|
|
from zipfile import ZipFile, is_zipfile
|
|
import polars as pl
|
|
import requests
|
|
from tqdm import tqdm # could use from tqdm.gui import tqdm
|
|
from tqdm.utils import CallbackIOWrapper
|
|
from urllib3.exceptions import InsecureRequestWarning
|
|
from urllib3 import disable_warnings
|
|
import questionary
|
|
|
|
|
|
# Colonnes à effacer (données personnelles ou inutiles pour l'import)
|
|
COLUMNS_TO_CLEAR = [
|
|
"Type d'identifiant PP",
|
|
"Identification nationale PP",
|
|
"Libellé civilité d'exercice",
|
|
"Code civilité",
|
|
"Libellé civilité",
|
|
"Code profession",
|
|
"Code catégorie professionnelle",
|
|
"Libellé catégorie professionnelle",
|
|
"Code type savoir-faire",
|
|
"Libellé type savoir-faire",
|
|
"Code savoir-faire",
|
|
"Code mode exercice",
|
|
"Libellé mode exercice",
|
|
"Numéro SIRET site",
|
|
"Numéro SIREN site",
|
|
"Numéro FINESS site",
|
|
"Numéro FINESS établissement juridique",
|
|
"Identifiant technique de la structure",
|
|
"Raison sociale site",
|
|
"Enseigne commerciale site",
|
|
"Complément destinataire (coord. structure)",
|
|
"Complément point géographique (coord. structure)",
|
|
"Code type de voie (coord. structure)",
|
|
"Code postal (coord. structure)",
|
|
"Code commune (coord. structure)",
|
|
"Libellé commune (coord. structure)",
|
|
"Code pays (coord. structure)",
|
|
"Libellé pays (coord. structure)",
|
|
"Téléphone 2 (coord. structure)",
|
|
"Télécopie (coord. structure)",
|
|
"Adresse e-mail (coord. structure)",
|
|
"Code Département (structure)",
|
|
"Libellé Département (structure)",
|
|
"Ancien identifiant de la structure",
|
|
"Autorité d'enregistrement",
|
|
"Code secteur d'activité",
|
|
"Libellé secteur d'activité",
|
|
"Code section tableau pharmaciens",
|
|
"Libellé section tableau pharmaciens",
|
|
"Code rôle",
|
|
"Libellé rôle",
|
|
"Code genre activité",
|
|
"Libellé genre activité",
|
|
]
|
|
|
|
|
|
def process_professionals_table(xls_file, txt_file, output_file):
|
|
# Chargement des onglets Excel
|
|
professions = pl.read_excel(xls_file, sheet_name='F_Professions').get_column('Professions').to_list()
|
|
df_append = pl.read_excel(xls_file, sheet_name='F_Append_Update', infer_schema_length=0)
|
|
df_etrangers = pl.read_excel(xls_file, sheet_name='F_Etrangers', infer_schema_length=0)
|
|
df_fake = pl.read_excel(xls_file, sheet_name='F_Fake', infer_schema_length=0)
|
|
df_sophrolo = pl.read_excel(xls_file, sheet_name='F_Sophrologues', infer_schema_length=0)
|
|
|
|
# Colonnes à effacer présentes dans le fichier (filtre défensif)
|
|
schema = pl.scan_csv(txt_file, separator='|', infer_schema=False, quote_char=None).collect_schema()
|
|
cols_to_clear = [c for c in COLUMNS_TO_CLEAR if c in schema]
|
|
|
|
# Lecture lazy du CSV + filtrage + effacement des colonnes sensibles
|
|
with tqdm(desc=f'Processing {path.basename(txt_file)}', bar_format='{desc}... {elapsed}',
|
|
leave=True) as spinner:
|
|
result = (
|
|
pl.scan_csv(txt_file, separator='|', infer_schema=False, quote_char=None)
|
|
.filter(pl.col('Libellé profession').is_in(professions))
|
|
.with_columns([pl.lit('').alias(c) for c in cols_to_clear])
|
|
.collect()
|
|
)
|
|
spinner.set_description(f'Processing {path.basename(txt_file)} — {len(result)} lignes')
|
|
|
|
# Ajout des données complémentaires des onglets Excel
|
|
extra = (
|
|
pl.concat([df_append, df_etrangers, df_fake, df_sophrolo], how='diagonal_relaxed')
|
|
.filter(pl.col('Libellé profession').is_in(professions))
|
|
.with_columns([pl.lit('').alias(c) for c in cols_to_clear if c in df_append.columns])
|
|
)
|
|
|
|
final = pl.concat([result, extra], how='diagonal_relaxed')
|
|
final.write_csv(output_file, separator='|', quote_style='never', line_terminator='\n')
|
|
print(f"Written: {path.basename(output_file)} ({len(final)} lignes)")
|
|
|
|
|
|
def download_file(url: str, filename: str = False) -> object:
|
|
if not filename:
|
|
local_filename = path.join(".", url.split('/')[-1])
|
|
else:
|
|
local_filename = filename
|
|
disable_warnings(InsecureRequestWarning)
|
|
r = requests.get(url, stream=True, verify=False)
|
|
file_size = int(r.headers['Content-Length'])
|
|
unit_scale = 64
|
|
|
|
with open(local_filename, 'wb') as fp:
|
|
for chunk in tqdm(r.iter_content(chunk_size=unit_scale * 1024),
|
|
total=math.ceil(file_size / 1024 / unit_scale),
|
|
unit_scale=unit_scale,
|
|
unit='KB',
|
|
desc=f"Downloading to {path.basename(local_filename)}",
|
|
leave=True):
|
|
fp.write(chunk)
|
|
return
|
|
|
|
|
|
def extract_one_file_from_zip(zipfile, fromfile, tofile, desc=False):
|
|
if not desc:
|
|
desc = f"Extracting to {path.basename(tofile)}"
|
|
file = None
|
|
if not is_zipfile(zipfile):
|
|
return f"Can't open Zipfile (non existent or bad): {zipfile}"
|
|
zipf = ZipFile(zipfile)
|
|
for f in zipf.infolist():
|
|
if getattr(f, "filename", "").startswith(fromfile):
|
|
file = f
|
|
break
|
|
if file is None:
|
|
return f"No such file name in the Zip ({fromfile}*)..."
|
|
|
|
with zipf, tqdm(
|
|
desc=desc, unit="B", unit_scale=True, unit_divisor=1024,
|
|
total=getattr(file, "file_size", 0), leave=True,
|
|
) as pbar:
|
|
with zipf.open(file) as fi, open(tofile, "wb") as fo:
|
|
copyfileobj(CallbackIOWrapper(pbar.update, fi), fo)
|
|
pbar.close()
|
|
|
|
|
|
def main():
|
|
defaultFileName = 'Table_Réf_Professionnels'
|
|
defaultExcelFileName = 'Table_Réf_Professionnels'
|
|
internalFileName = 'PS_LibreAcces_Personne_activite'
|
|
|
|
parser = argparse.ArgumentParser(description='Prepare Professionals Table for Import to Endoziwig.')
|
|
parser.add_argument('fileName', type=str, nargs='?', default=defaultFileName,
|
|
help=f'File name to use : default="{defaultFileName}"')
|
|
parser.add_argument('--excelFileName', '-x', type=str, nargs='?', default=defaultExcelFileName,
|
|
help=f'Excel File Containing Append Data: default="{defaultExcelFileName}" (without extension)')
|
|
parser.add_argument('--noDownload', '-ndw', action='store_true',
|
|
help='Do not Download the file (Default = Download).')
|
|
parser.add_argument('--noUnzip', '-nuz', action='store_true',
|
|
help='Do not Unzip the file (Default = Unzip).')
|
|
parser.add_argument('--noProcess', '-npr', action='store_true',
|
|
help='Do not Process the file (Default = Process).')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if len(sys.argv) == 1:
|
|
print("You're about to download and prepare Professionals Table for import to Endoziwig")
|
|
|
|
# Files Settings
|
|
if args.fileName == defaultFileName:
|
|
print("\n")
|
|
args.fileName = questionary.text("Please confirm file name (or empty to cancel):",
|
|
default=defaultFileName).ask()
|
|
if args.fileName == '':
|
|
sys.exit(0)
|
|
|
|
BASE_DIR = path.dirname(path.abspath(__file__))
|
|
zipFileName = path.join(BASE_DIR, f'{args.fileName}.zip')
|
|
xlsFileName = path.join(BASE_DIR, f'{args.excelFileName}.xlsx')
|
|
txtFileName = path.join(BASE_DIR, f'{args.fileName}.txt')
|
|
outputFileName = path.join(BASE_DIR, f'{args.fileName}.csv')
|
|
|
|
print("\n")
|
|
|
|
if not args.noDownload:
|
|
download_file(
|
|
'https://service.annuaire.sante.fr/annuaire-sante-webservices/V300/services/extraction/PS_LibreAcces',
|
|
filename=zipFileName)
|
|
print("\n")
|
|
|
|
if not args.noUnzip:
|
|
unzipResult = extract_one_file_from_zip(zipFileName, internalFileName, txtFileName)
|
|
if unzipResult is not None:
|
|
print(unzipResult)
|
|
print("\n")
|
|
|
|
if not args.noProcess:
|
|
process_professionals_table(xlsFileName, txtFileName, outputFileName)
|
|
print("\n")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
try :
|
|
main()
|
|
except(Exception) as e :
|
|
print(e)
|
|
finally :
|
|
input('Finished... Press Enter to continue')
|
|
print('\n')
|
|
|