Utilisation basique

Cette page présente quelques bases pour utiliser la toolbelt, mais elle n’est pas exhaustive et mieux vaut :

Importer le package

import gpf_entrepot_toolbelt

Importer seulement un objet, par exemple le statut :

from gpf_entrepot_toolbelt.orchestrator.status import Status

Charger et manipuler un fichier parameters.json

from gpf_entrepot_toolbelt.orchestrator.models import GpfOrchestratorParameters
from os import getenv

params: GpfOrchestratorParameters = GpfOrchestratorParameters.from_json(
    in_json_path=Path(f"{getenv('GPF_WORK_DIR')}/parameters.json")
)

Récupérer les paramètres de connexion à la base de données de sortie

# on ajoute les imports
import psycopg

[...]


output_db = params.output_database

sql = "SELECT PostGIS_Version();"

with psycopg.connect(output_db.pg_connection_uri) as conn:
    result = conn.execute(sql)

print(result.fetchall())

Mettre à jour les informations sur une base de données de sortie

from gpf_entrepot_toolbelt.utils.database_inspection import PgDatabaseInspector

# Récupération de la base de données et de la stored data
output_stored_data = params.output_stored_data
output_db = params.output_database

# Inspection de la base de données, avec calcul de l'étendue
db_inspector = PgDatabaseInspector(output_db)
output_stored_data = db_inspector.update_stored_data(output_stored_data, True)

# Mise à jour des paramètres
params.output_stored_data = output_stored_data

Modifier le statut d’exécution du pipeline

# on ajoute les imports
from gpf_entrepot_toolbelt.orchestrator.status import Status

pipeline_status = params.pipeline_status
pipeline_status["vector-to-db"] = Status.SUCCESS
params.pipeline_status = pipeline_status

# Possibilité d'utiliser la méthode set_pipeline_status
params.set_pipeline_status("other-processing", Status.TECHNICAL_ERROR)

Télécharger les fichiers depuis le stockage objet

Types d’accès de stockage objets supportés :

  • API S3

Configuration

L’accès au stockage objet présuppose que des les paramètres d’authentification sont préalablement stockés sous forme de variables d’environnement (voir la page Configuration).
Certains paramètres ont des valeurs par défaut. Typiquement sur Linux, pour une configuration minimale :

  • Pour S3 :

    export GPF_S3_KEY=32chiffresetlettresnonspeciales
    export GPF_S3_SECRETKEY=jesuisunbelinconnude32charactère
    

Utilisation via GpfOrchestratorParameters

Puis en Python :

# récupère le premier client de stockage parmi les inputs
in_storage_client = params.get_input_upload_object_storage_client()
# s'il y en a un, lancer le téléchargement dans un dossier
if in_storage_client:
    in_storage_client.download_objects(
        output_folder=Path("/tmp/gpf/local_storage/")
    )

    # ou avec un préfixe
    in_storage_client.download_objects(
        output_folder=Path("/tmp/gpf/local_storage/"),
        prefix="statics/"
    )

Une autre possibilité est de passer la propriété stored_data :

output_stored_data = params.output_stored_data
if output_stored_data.storage and output_stored_data.storage.is_s3_storage:
        output_stored_data.storage.storage_client.download_objects(
            output_folder=Path("/tmp/gpf/local_storage/")
        )

Utilisation directe

Il est également possible d’utiliser directement le client pour affiner les options.

Client S3
import json
from pathlib import Path

from gpf_entrepot_toolbelt.orchestrator.storage.s3_client import GpfS3Client

# lire le fichier JSON
with Path("parameters.json").open(mode="r", encoding="utf8") as in_json:
  params = json.load(in_json)

input_storage_infos = params.get("inputs").get("uploads")[0]

# configurer le client
client = GpfS3Client(
  # auth
  access_key_secret="jesuisunbelinconnude32charactère",
  access_key_id="32chiffresetlettresnonspeciales",
  # config
  bucket_name=input_storage_infos.get("storage").get("type_infos").get("pot_name"),
  skip_checks=False,
  output_dir=Path("/tmp/testgpf/s3"),
  prefix=f"uploads/{input_storage_infos.get('_id')}",
  header_user_agent="mon-app/1.1.0"
)

# télécharger (download)
client.download_objects(output_folder=Path("/tmp/gpf/s3/"))

# téléverser (upload), la clé est le chemin du fichier
client.upload_files(local_input_paths=Path("dossier/entier/à/uploader"))
# ou via une liste de fichiers filtrée, la clé est le chemin du fichier
li_files_to_upload = Path("dossier/entier/à/uploader").glob("**/*.tiff")
client.upload_files(local_input_paths=li_files_to_upload)

# téléverser (upload), avec définition d'un répertoire root qui sera utilisé pour définir la clé des fichiers (chemin relatif des fichiers depuis ce répertoire)
client.upload_files(local_input_paths=Path("dossier/entier/à/uploader"), root_dir=Path("dossier/entier/à/uploader"))

Écrire un fichier de sortie

L’écriture d’un fichier de sortie se fait à l’issue d’un traitement ou d’une vérification, même lorsque le processus se termine avec une erreur. Dans ce cas, le fichier de sortie est généré et contient le message de l’erreur produite.

  • Exemple d’appel à la fonction fail_and_exit qui écrit un fichier de sortie en cas d’une fin de programme en erreur :

    from gpf_entrepot_toolbelt.utils.exit_program import fail_and_exit
    
    try:
        parameters: GpfOrchestratorParameters = GpfOrchestratorParameters.from_json(
            Path(args.work_dir_path, args.input_configuration_file_name)
        )
    except Exception as error:
        error_message = (
            "Reading the input configuration file "
            f"({args.work_dir_path}/{args.input_configuration_file_name} failed. "
            f"Trace: {error}"
        )
        logger.error(error_message)
        # Ecrire un fichier de sortie contenant l'erreur
        fail_and_exit(work_dir, error_message, logger))
    
        sys.exit(error_message)
    
  • Exemple d’écriture d’un fichier de sortie à partir d’un fichier de paramètres :

    from gpf_entrepot_toolbelt.models.parameters import GpfOrchestratorParameters
    from gpf_entrepot_toolbelt.utils.gpf_logger import gpf_logger_script
    
    
    logger = gpf_logger_script(verbosity=1, title="My awesome package")
    
    # Run
    parameters = GpfOrchestratorParameters.from_json(
            in_json_path=Path("my_gpf_workdir", "parameters.json")
        )
    # Ecriture d'un fichier de sortie qui copie à l'identique parameters.json
    parameters.create_output_file(Path("my_gpf_workdir/output"), logger)
    

Utiliser le client HTTP minimaliste

Pour limiter les dépendances externes tierces, un client HTTP minimaliste est intégré puisque les besoins en termes d’impléntation du protocole, sont assez limités sur l’Entrepôt.

Le client permet :

  • types de requête : DELETE, GET, HEAD, OPTIONS, POST, PUT

  • téléchargement de fichier (GET ou POST)

  • authentification : basique (utilisateur/mot de passe), bearer token

  • options particulières : gestion du timeout et des en-têtes de requête

En cas de besoin plus important, se rabattre sur les bibliothèques externes les plus répandues et robustes :

  • urllib3

  • requests (surcouche à urllib3)

  • httpx

Astuce

Pour tester facilement le cliet HTTP en local :

docker run -p 9753:80 kennethreitz/httpbin

Requête GET

from gpf_entrepot_toolbelt.utils.simple_http_client import SimpleHttpClient

client = SimpleHttpClient(timeout=5)

response = client.get("http://localhost:9753/get")
print(response.status, response.reason)
print(response.content())

Télécharger un fichier (avec GET)

from gpf_entrepot_toolbelt.utils.simple_http_client import SimpleHttpClient

client = SimpleHttpClient()
file_path = client.download_file(
    url="https://speed.hetzner.de/100MB.bin",
    destination=Path("/tmp/ign/toolbelt/test.fake"),
)
print(file_path.exists())

Télécharger un fichier (avec POST)

cas d’usage : pour générer un token dans une API

import json
import pprint

from gpf_entrepot_toolbelt.utils.simple_http_client import SimpleHttpClient

client = SimpleHttpClient()

token_path = client.download_file(
    url="https://panoramax.ign.fr/api/auth/tokens/generate",
    headers={"accept": "application/json"},
    data={"description": "GPF Toolbelt"},
    destination=Path("/tmp/ign/toolbelt/test.fake"),
)
print(type(token_path))
token_claim_url = json.loads(token_path.read_bytes())
pprint.pprint(token_claim_url)