Utilisation basique¶
Cette page présente quelques bases pour utiliser la toolbelt, mais elle n’est pas exhaustive et mieux vaut :
se référer aux tests unitaires (https://gitlab.gpf-tech.ign.fr/geoplateforme/gpf-entrepot-toolbelt-py/-/tree/main/tests) qui par définition utilisent les modules et classes
le code est documenté via des docstrings qui intègrent parfois des exemples (ici ou là notamment)
Importer le package¶
import gpf_entrepot_toolbelt
Importer seulement un objet, par exemple le statut :
from gpf_entrepot_toolbelt.orchestrator.status import Status
Charger et manipuler un fichier parameters.json
¶
from gpf_entrepot_toolbelt.orchestrator.models import GpfOrchestratorParameters
from os import getenv
params: GpfOrchestratorParameters = GpfOrchestratorParameters.from_json(
in_json_path=Path(f"{getenv('GPF_WORK_DIR')}/parameters.json")
)
Récupérer les paramètres de connexion à la base de données de sortie¶
# on ajoute les imports
import psycopg
[...]
output_db = params.output_database
sql = "SELECT PostGIS_Version();"
with psycopg.connect(output_db.pg_connection_uri) as conn:
result = conn.execute(sql)
print(result.fetchall())
Mettre à jour les informations sur une base de données de sortie¶
from gpf_entrepot_toolbelt.utils.database_inspection import PgDatabaseInspector
# Récupération de la base de données et de la stored data
output_stored_data = params.output_stored_data
output_db = params.output_database
# Inspection de la base de données, avec calcul de l'étendue
db_inspector = PgDatabaseInspector(output_db)
output_stored_data = db_inspector.update_stored_data(output_stored_data, True)
# Mise à jour des paramètres
params.output_stored_data = output_stored_data
Modifier le statut d’exécution du pipeline¶
# on ajoute les imports
from gpf_entrepot_toolbelt.orchestrator.status import Status
pipeline_status = params.pipeline_status
pipeline_status["vector-to-db"] = Status.SUCCESS
params.pipeline_status = pipeline_status
# Possibilité d'utiliser la méthode set_pipeline_status
params.set_pipeline_status("other-processing", Status.TECHNICAL_ERROR)
Télécharger les fichiers depuis le stockage objet¶
Types d’accès de stockage objets supportés :
API S3
Configuration¶
L’accès au stockage objet présuppose que des les paramètres d’authentification sont préalablement stockés sous forme de variables d’environnement (voir la page Configuration).
Certains paramètres ont des valeurs par défaut. Typiquement sur Linux, pour une configuration minimale :
Pour S3 :
export GPF_S3_KEY=32chiffresetlettresnonspeciales export GPF_S3_SECRETKEY=jesuisunbelinconnude32charactère
Utilisation via GpfOrchestratorParameters¶
Puis en Python :
# récupère le premier client de stockage parmi les inputs
in_storage_client = params.get_input_upload_object_storage_client()
# s'il y en a un, lancer le téléchargement dans un dossier
if in_storage_client:
in_storage_client.download_objects(
output_folder=Path("/tmp/gpf/local_storage/")
)
# ou avec un préfixe
in_storage_client.download_objects(
output_folder=Path("/tmp/gpf/local_storage/"),
prefix="statics/"
)
Une autre possibilité est de passer la propriété stored_data
:
output_stored_data = params.output_stored_data
if output_stored_data.storage and output_stored_data.storage.is_s3_storage:
output_stored_data.storage.storage_client.download_objects(
output_folder=Path("/tmp/gpf/local_storage/")
)
Utilisation directe¶
Il est également possible d’utiliser directement le client pour affiner les options.
Client S3¶
import json
from pathlib import Path
from gpf_entrepot_toolbelt.orchestrator.storage.s3_client import GpfS3Client
# lire le fichier JSON
with Path("parameters.json").open(mode="r", encoding="utf8") as in_json:
params = json.load(in_json)
input_storage_infos = params.get("inputs").get("uploads")[0]
# configurer le client
client = GpfS3Client(
# auth
access_key_secret="jesuisunbelinconnude32charactère",
access_key_id="32chiffresetlettresnonspeciales",
# config
bucket_name=input_storage_infos.get("storage").get("type_infos").get("pot_name"),
skip_checks=False,
output_dir=Path("/tmp/testgpf/s3"),
prefix=f"uploads/{input_storage_infos.get('_id')}",
header_user_agent="mon-app/1.1.0"
)
# télécharger (download)
client.download_objects(output_folder=Path("/tmp/gpf/s3/"))
# téléverser (upload), la clé est le chemin du fichier
client.upload_files(local_input_paths=Path("dossier/entier/à/uploader"))
# ou via une liste de fichiers filtrée, la clé est le chemin du fichier
li_files_to_upload = Path("dossier/entier/à/uploader").glob("**/*.tiff")
client.upload_files(local_input_paths=li_files_to_upload)
# téléverser (upload), avec définition d'un répertoire root qui sera utilisé pour définir la clé des fichiers (chemin relatif des fichiers depuis ce répertoire)
client.upload_files(local_input_paths=Path("dossier/entier/à/uploader"), root_dir=Path("dossier/entier/à/uploader"))
Écrire un fichier de sortie¶
L’écriture d’un fichier de sortie se fait à l’issue d’un traitement ou d’une vérification, même lorsque le processus se termine avec une erreur. Dans ce cas, le fichier de sortie est généré et contient le message de l’erreur produite.
Exemple d’appel à la fonction
fail_and_exit
qui écrit un fichier de sortie en cas d’une fin de programme en erreur :from gpf_entrepot_toolbelt.utils.exit_program import fail_and_exit try: parameters: GpfOrchestratorParameters = GpfOrchestratorParameters.from_json( Path(args.work_dir_path, args.input_configuration_file_name) ) except Exception as error: error_message = ( "Reading the input configuration file " f"({args.work_dir_path}/{args.input_configuration_file_name} failed. " f"Trace: {error}" ) logger.error(error_message) # Ecrire un fichier de sortie contenant l'erreur fail_and_exit(work_dir, error_message, logger)) sys.exit(error_message)
Exemple d’écriture d’un fichier de sortie à partir d’un fichier de paramètres :
from gpf_entrepot_toolbelt.models.parameters import GpfOrchestratorParameters from gpf_entrepot_toolbelt.utils.gpf_logger import gpf_logger_script logger = gpf_logger_script(verbosity=1, title="My awesome package") # Run parameters = GpfOrchestratorParameters.from_json( in_json_path=Path("my_gpf_workdir", "parameters.json") ) # Ecriture d'un fichier de sortie qui copie à l'identique parameters.json parameters.create_output_file(Path("my_gpf_workdir/output"), logger)
Utiliser le client HTTP minimaliste¶
Pour limiter les dépendances externes tierces, un client HTTP minimaliste est intégré puisque les besoins en termes d’impléntation du protocole, sont assez limités sur l’Entrepôt.
Le client permet :
types de requête : DELETE, GET, HEAD, OPTIONS, POST, PUT
téléchargement de fichier (GET ou POST)
authentification : basique (utilisateur/mot de passe), bearer token
options particulières : gestion du timeout et des en-têtes de requête
En cas de besoin plus important, se rabattre sur les bibliothèques externes les plus répandues et robustes :
urllib3
requests (surcouche à urllib3)
httpx
Astuce
Pour tester facilement le cliet HTTP en local :
docker run -p 9753:80 kennethreitz/httpbin
Requête GET¶
from gpf_entrepot_toolbelt.utils.simple_http_client import SimpleHttpClient
client = SimpleHttpClient(timeout=5)
response = client.get("http://localhost:9753/get")
print(response.status, response.reason)
print(response.content())
Télécharger un fichier (avec GET)¶
from gpf_entrepot_toolbelt.utils.simple_http_client import SimpleHttpClient
client = SimpleHttpClient()
file_path = client.download_file(
url="https://speed.hetzner.de/100MB.bin",
destination=Path("/tmp/ign/toolbelt/test.fake"),
)
print(file_path.exists())
Télécharger un fichier (avec POST)¶
cas d’usage : pour générer un token dans une API
import json
import pprint
from gpf_entrepot_toolbelt.utils.simple_http_client import SimpleHttpClient
client = SimpleHttpClient()
token_path = client.download_file(
url="https://panoramax.ign.fr/api/auth/tokens/generate",
headers={"accept": "application/json"},
data={"description": "GPF Toolbelt"},
destination=Path("/tmp/ign/toolbelt/test.fake"),
)
print(type(token_path))
token_claim_url = json.loads(token_path.read_bytes())
pprint.pprint(token_claim_url)