Files
professionals_from_sante_fr/prepareProfessionalsTable.py
2026-03-05 11:11:10 +00:00

164 lines
6.4 KiB
Python

import argparse
import csv
import math
import sys
from os import path
from shutil import copyfileobj
from zipfile import ZipFile, is_zipfile
import numpy as np
import pandas as pd
import requests
from tqdm import tqdm # could use from tqdm.gui import tqdm
from tqdm.utils import CallbackIOWrapper
from urllib3.exceptions import InsecureRequestWarning
from urllib3 import disable_warnings
import questionary
def process_professionals_table(xls_file, txt_file, output_file):
    """Filter the pipe-separated professionals extract and append workbook rows.

    The text file is streamed in chunks. For every chunk a fixed set of
    columns is blanked, only the rows whose ``Libellé profession`` value is
    listed in the workbook's ``F_Professions`` sheet are kept, and the result
    is written to ``output_file``. Rows from the ``F_Append_Update``,
    ``F_Etrangers``, ``F_Fake`` and ``F_Sophrologues`` sheets then go through
    the same blanking and filtering and are appended to the output.

    Args:
        xls_file: Path of the Excel workbook holding the profession list and
            the extra sheets to append.
        txt_file: Path of the pipe-separated input file (first line = header).
        output_file: Path of the pipe-separated file to write; an existing
            file is overwritten.

    Raises:
        ValueError: If the input file did not yield a single chunk, so the
            column layout for the appended sheets is unknown.
    """
    # Load every sheet as strings; only truly empty cells are treated as NA.
    xls = pd.read_excel(xls_file, sheet_name=None, dtype=str,
                        na_values='', keep_default_na=False)
    professions = xls['F_Professions']['Professions'].tolist()

    # Progress bar total: number of lines minus the header line. The handle
    # is closed explicitly instead of being left to the garbage collector.
    with open(txt_file, 'rb') as raw_lines:
        estimated_total_rows = sum(1 for _ in raw_lines) - 1
    chunk_size = 20000

    # Positional indexes of the columns whose content is blanked on output.
    columns_to_clean = np.r_[0, 2, 4:7, 9, 11:16, 17:28, 30, 35:40, 41:56]

    # Options shared by every write so that all parts of the output agree.
    csv_options = dict(sep='|', index=False, doublequote=False,
                       quoting=csv.QUOTE_NONE, lineterminator='\n')

    # Zero-row frame carrying the column layout of the text file; it is used
    # below to align the workbook sheets on the same columns.
    template = None

    with tqdm(total=estimated_total_rows,
              desc=f'Writing to {path.basename(output_file)}',
              leave=True, unit="Ln") as bar:
        chunks = pd.read_csv(txt_file, sep='|', doublequote=False,
                             quoting=csv.QUOTE_NONE, dtype=str, na_values='',
                             keep_default_na=False, chunksize=chunk_size)
        for i, chunk in enumerate(chunks):
            n_rows = chunk.shape[0]
            chunk.iloc[:, columns_to_clean] = ''
            chunk = chunk[chunk['Libellé profession'].isin(professions)]
            template = chunk[:0]
            if i == 0:
                # First chunk creates the file and writes the header.
                chunk.to_csv(output_file, **csv_options)
            else:
                chunk.to_csv(output_file, header=False, mode='a',
                             **csv_options)
            # Progress counts the rows read, not the rows kept.
            bar.update(n_rows)

    if template is None:
        raise ValueError(f'No data could be read from {txt_file}')

    # Appending Other xls tabs
    extra = pd.concat([template, xls['F_Append_Update'], xls['F_Etrangers'],
                       xls['F_Fake'], xls['F_Sophrologues']],
                      ignore_index=True)
    extra.iloc[:, columns_to_clean] = ''
    extra = extra[extra['Libellé profession'].isin(professions)]
    extra.to_csv(output_file, header=False, mode='a', **csv_options)
def download_file(url: str, filename: str = False) -> None:
    """Stream ``url`` into a local file while displaying a progress bar.

    Args:
        url: Address of the resource to download.
        filename: Destination path. When falsy, the last path segment of
            ``url`` is used, relative to the current directory.

    Raises:
        requests.HTTPError: If the server answers with an error status, so
            that an error page is never saved in place of the expected file.
    """
    if filename:
        local_filename = filename
    else:
        local_filename = path.join(".", url.split('/')[-1])

    # NOTE(review): certificate verification is switched off and the matching
    # warning is silenced; the download is therefore not authenticated.
    # Confirm this is still required for the target server.
    disable_warnings(InsecureRequestWarning)

    unit_scale = 64  # size of one streamed chunk, in KiB

    # The context manager releases the connection even if writing fails.
    with requests.get(url, stream=True, verify=False) as r:
        r.raise_for_status()

        # Content-Length may be missing (e.g. chunked transfer); the progress
        # bar then runs without a known total instead of raising KeyError.
        content_length = r.headers.get('Content-Length')
        if content_length is None:
            total_chunks = None
        else:
            total_chunks = math.ceil(int(content_length) / 1024 / unit_scale)

        with open(local_filename, 'wb') as fp:
            for chunk in tqdm(r.iter_content(chunk_size=unit_scale * 1024),
                              total=total_chunks,
                              unit_scale=unit_scale,
                              unit='KB',
                              desc=f"Downloading to {path.basename(local_filename)}",
                              leave=True):
                fp.write(chunk)
def extract_one_file_from_zip(zipfile, fromfile, tofile, desc=False):
    """Extract the first archive member whose name starts with ``fromfile``.

    Args:
        zipfile: Path of the zip archive to read.
        fromfile: Prefix of the member name to look for inside the archive.
        tofile: Path of the file to write the member's content to.
        desc: Progress bar label; when falsy a label is built from ``tofile``.

    Returns:
        ``None`` on success, otherwise a human-readable error message (the
        archive is missing or invalid, or no member matches ``fromfile``).
    """
    if not desc:
        desc = f"Extracting to {path.basename(tofile)}"

    if not is_zipfile(zipfile):
        return f"Can't open Zipfile (non existent or bad): {zipfile}"

    # The archive is opened in a context manager so that it is closed on
    # every path, including the early return when no member matches.
    with ZipFile(zipfile) as zipf:
        member = next(
            (info for info in zipf.infolist()
             if info.filename.startswith(fromfile)),
            None,
        )
        if member is None:
            return f"No such file name in the Zip ({fromfile}*)..."

        with tqdm(
            desc=desc, unit="B", unit_scale=True, unit_divisor=1024,
            total=member.file_size, leave=True,
        ) as pbar:
            with zipf.open(member) as fi, open(tofile, "wb") as fo:
                # The wrapper reports every read to the progress bar.
                copyfileobj(CallbackIOWrapper(pbar.update, fi), fo)
    return None
def main():
    """Run the download / unzip / process pipeline from the command line.

    The base file name can be given as a positional argument; when it is left
    at its default the user is asked to confirm it interactively. Each of the
    three steps can be skipped with its ``--no...`` flag. All files are
    located next to this script.

    Exits with status 0 when the file-name prompt is cancelled or answered
    with an empty string, and with status 1 when the extraction step reports
    an error.
    """
    defaultFileName = 'Table_Réf_Professionnels'
    defaultExcelFileName = 'Table_Réf_Professionnels'
    internalFileName = 'PS_LibreAcces_Personne_activite'

    parser = argparse.ArgumentParser(description='Prepare Professionals Table for Import to Endoziwig.')
    parser.add_argument('fileName', type=str, nargs='?', default=defaultFileName,
                        help=f'File name to use : default="{defaultFileName}"')
    parser.add_argument('--excelFileName', '-x', type=str, nargs='?', default=defaultExcelFileName,
                        help=f'Excel File Containing Append Data: default="{defaultExcelFileName}" (without extension)')
    parser.add_argument('--noDownload', '-ndw', action='store_true',
                        help='Do not Download the file (Default = Download).')
    parser.add_argument('--noUnzip', '-nuz', action='store_true',
                        help='Do not Unzip the file (Default = Unzip).')
    parser.add_argument('--noProcess', '-npr', action='store_true',
                        help='Do not Process the file (Default = Process).')
    args = parser.parse_args()

    if len(sys.argv) == 1:
        print("You're about to download and prepare Professionals Table for import to Endoziwig")

    # Files Settings
    if args.fileName == defaultFileName:
        print("\n")
        args.fileName = questionary.text("Please confirm file name (or empty to cancel):",
                                         default=defaultFileName).ask()
        # ask() yields None when the prompt is aborted (e.g. Ctrl-C); treat
        # that like an empty answer instead of going on with "None.zip".
        if not args.fileName:
            sys.exit(0)

    BASE_DIR = path.dirname(path.abspath(__file__))
    zipFileName = path.join(BASE_DIR, f'{args.fileName}.zip')
    xlsFileName = path.join(BASE_DIR, f'{args.excelFileName}.xlsx')
    txtFileName = path.join(BASE_DIR, f'{args.fileName}.txt')
    outputFileName = path.join(BASE_DIR, f'{args.fileName}.csv')
    print("\n")

    if not args.noDownload:
        download_file(
            'https://service.annuaire.sante.fr/annuaire-sante-webservices/V300/services/extraction/PS_LibreAcces',
            filename=zipFileName)
        print("\n")

    if not args.noUnzip:
        unzipResult = extract_one_file_from_zip(zipFileName, internalFileName, txtFileName)
        if unzipResult is not None:
            # Extraction failed: stop here rather than processing a missing
            # or stale text file left over from a previous run.
            print(unzipResult)
            sys.exit(1)
        print("\n")

    if not args.noProcess:
        process_professionals_table(xlsFileName, txtFileName, outputFileName)
        print("\n")
if __name__ == '__main__':
    # Script entry point: run the pipeline, report any failure, and keep the
    # console open until the user acknowledges.
    exit_code = 0
    try:
        main()
    except Exception as e:
        print(e)
        # Remember the failure so the process does not report success.
        exit_code = 1
    finally:
        input('Finished... Press Enter to continue')
        print('\n')
    sys.exit(exit_code)