gpf_entrepot_toolbelt.orchestrator.models.parameters module
Model definition for GPF orchestrator parameters and related sugar.
Author: Julien Moura (Oslandia)
- class gpf_entrepot_toolbelt.orchestrator.models.parameters.GpfOrchestratorParameters(_id: str | None = None, inputs: dict[dict, Any] | None = None, job_name: str | None = None, output: dict[dict, Any] | None = None, parameters: dict[dict, Any] | None = None, pipeline_status: dict[dict, Any] | None = None, json_ref_path: Path | None = None, loaded_from_json: bool = False, **kwargs)
Bases: object
Object definition for GPF orchestrator parameters.
- ATTR_MAP = {}
- MODEL_ATTR = ('_id', 'inputs', 'job_name', 'output', 'parameters', 'pipeline_status')
- __init__(_id: str | None = None, inputs: dict[dict, Any] | None = None, job_name: str | None = None, output: dict[dict, Any] | None = None, parameters: dict[dict, Any] | None = None, pipeline_status: dict[dict, Any] | None = None, json_ref_path: Path | None = None, loaded_from_json: bool = False, **kwargs)
Initialize an orchestrator parameters object.
- create_output_file(work_dir: Path) -> None
Create the output file for a GpfOrchestratorParameters.
- Parameters:
work_dir (Path) – Working directory in which the file is created. The folder must exist.
- classmethod from_json(in_json_path: Path) -> Self
Load object from a JSON file.
- Parameters:
in_json_path (Path) – path to the JSON file
- Returns:
object with attributes filled from JSON.
- Return type:
Self
Example
    from pathlib import Path

    from gpf_entrepot_toolbelt.orchestrator.models.parameters import GpfOrchestratorParameters

    parameters = GpfOrchestratorParameters.from_json(
        Path("work_dir/parameters.json")
    )
    # print the _id loaded from the JSON file
    print(parameters.id)
- get_database_from_stored_data(stored_data: GpfStoredData) -> PgDatabase | None
- Returns the database from stored data, with user and password information extracted from environment variables.
- Args
stored_data (GpfStoredData): stored data
- Returns
Optional[PgDatabase]: all information to connect database if no error, None otherwise
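The lookup described above can be sketched as follows. This is a minimal stand-alone illustration, not the library's implementation: the DbInfo class, the database_from_env function and the EXAMPLE_DB_USER / EXAMPLE_DB_PASSWORD variable names are hypothetical, since the documentation does not name the environment variables that are actually read.

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class DbInfo:
    """Hypothetical stand-in for PgDatabase (not the library class)."""

    host: str
    port: int
    name: str
    user: str
    password: str


def database_from_env(
    host: str,
    port: int,
    name: str,
    user_var: str = "EXAMPLE_DB_USER",  # assumed variable name
    password_var: str = "EXAMPLE_DB_PASSWORD",  # assumed variable name
) -> Optional[DbInfo]:
    """Build connection information, or return None if credentials are missing."""
    user = os.environ.get(user_var)
    password = os.environ.get(password_var)
    if user is None or password is None:
        # mirror the documented contract: None when something goes wrong
        return None
    return DbInfo(host, port, name, user, password)


# demo: set the assumed variables, then build the connection information
os.environ["EXAMPLE_DB_USER"] = "reader"
os.environ["EXAMPLE_DB_PASSWORD"] = "secret"
db = database_from_env("localhost", 5432, "gpf")
print(db.user if db else "no credentials")
```

Returning None rather than raising keeps the caller's check to a single `if db is None` branch, which matches the Optional[PgDatabase] return type documented above.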
- get_input_upload_object_storage_client(upload_position: int = 0, first: bool = True) -> GpfS3Client | None
- Returns the object storage client from input/uploads. Useful to download input data to check or process.
- Parameters:
upload_position (int, optional) – index of upload to return. Defaults to 0.
first (bool, optional) – if True, it returns the first upload which is an object storage. Defaults to True.
- Returns:
object storage client
- Return type:
Union[GpfS3Client, None]
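How upload_position and first might interact can be sketched as below. This is one plausible reading of the parameter descriptions, not the library's code: the Upload class, its storage_type field, the "S3" / "FILESYSTEM" values used here and the pick_upload function are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Upload:
    """Hypothetical stand-in for GpfUpload."""

    id: str
    storage_type: str  # assumed values: "S3" or "FILESYSTEM"


def pick_upload(
    uploads: list[Upload], upload_position: int = 0, first: bool = True
) -> Optional[Upload]:
    """Select the upload whose object storage client would be returned."""
    if first:
        # ignore the index and take the first upload stored on object storage
        return next((u for u in uploads if u.storage_type == "S3"), None)
    if 0 <= upload_position < len(uploads):
        candidate = uploads[upload_position]
        # only an object storage upload can yield an object storage client
        return candidate if candidate.storage_type == "S3" else None
    return None


uploads = [Upload("a", "FILESYSTEM"), Upload("b", "S3")]
selected = pick_upload(uploads)
print(selected.id if selected else None)
```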
- property global_extent: GpfExtent | None
Returns the global extent.
- Returns:
global extent
- Return type:
Optional[GpfExtent]
- property id: str | None
Returns the _id.
- Returns
str: object _id
- import_input_files(dest: Path, strict: bool = True) -> tuple[list[str], list[str]]
- Imports all files defined in the input upload/stored_data entries into the specified folder. Files are copied into an upload/stored_data._id directory.
- Parameters:
dest (Path) – Destination folder for the files
strict (bool, optional) – enables strict mode. Defaults to True.
- Returns:
tuple of the lists of objects downloaded successfully and those whose download failed.
- Return type:
tuple[list[str], list[str]]
- property input_databases: dict[str, PgDatabase] | None
Returns a dict of input databases keyed by stored data id.
- Returns
Dict[str, PgDatabase]: database for each database input stored data
- property input_extent: GpfExtent | None
Returns the input extent.
- Returns:
input extent
- Return type:
Optional[GpfExtent]
- property input_stored_datas: list[GpfStoredData]
Returns the list of input stored data.
- Returns
List[GpfStoredData]: list of input stored data
- property input_uploads: list[GpfUpload]
Returns list of input uploads.
- Returns
List[GpfUpload]: list of input uploads.
- property inputs: dict | None
Returns the inputs.
- Returns
dict: object inputs
- is_any_previous_step_is_failed() -> bool
Check if at least one pipeline status is not SUCCESS.
- Returns:
True if there is an error in the pipeline status, False otherwise.
- Return type:
bool
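The check described above amounts to scanning the status values for anything other than SUCCESS. A minimal stand-alone sketch follows; the Status enum here is a hypothetical stand-in (only SUCCESS is named in this documentation, FAILURE is an assumed member) and any_step_not_success is an illustrative name, not the library's method.

```python
from enum import Enum


class Status(Enum):
    """Hypothetical stand-in for the library's Status enum."""

    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"  # assumed member, for illustration only


def any_step_not_success(pipeline_status: dict[str, Status]) -> bool:
    """Return True if at least one recorded status differs from SUCCESS."""
    return any(status is not Status.SUCCESS for status in pipeline_status.values())


statuses = {"check": Status.SUCCESS, "convert": Status.FAILURE}
print(any_step_not_success(statuses))
```

With this reading, an empty pipeline status reports no failure, because `any` over an empty collection is False.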
- property is_loaded_from_json: bool
Tells if the object has been loaded from a JSON file.
- Returns
bool: True if the object has been loaded from a JSON file
- property job_name: str
Returns the job_name.
- Returns
str: object job_name
- property json_ref_path: Path | None
Returns the path to the corresponding JSON file.
- Returns
Path: input JSON path
- property output: dict | None
Returns the output.
- Returns
dict: object output
- property output_database: PgDatabase | None
Returns the output database if the output is a database; returns None otherwise, or if an error occurs.
- Returns:
database connection information, None in case of error
- Return type:
Optional[PgDatabase]
- property output_filename: str
Returns the output filename used when the object is serialized into JSON.
- Returns:
JSON filename ready for output
- Return type:
str
- property output_stored_data: GpfStoredData | None
Returns output stored data if available, None otherwise
- Returns
Optional[GpfStoredData] output stored data if available, None otherwise
- property output_upload: GpfUpload | None
Returns output upload if available, None otherwise
- Returns
Optional[GpfUpload] output upload if available, None otherwise
- property parameters: dict
Returns the parameters: a dict of parameters to pass to the job execution.
- Returns
dict: object parameters
- property pipeline_status: dict[str, Status]
Returns a dict with the result of every job that runs in the pipeline.
- Returns:
pipeline job statuses
- Return type:
dict[str, Status]
- retrieve_input_files(dest: Path, strict: bool = True) -> tuple[dict, list[str]]
- For input uploads:
If the storage is S3: copies all files into the specified folder, prefixed by the _id of the data
If the storage is FILESYSTEM: references the path to the files, without copying them
- Parameters:
dest (Path) – Destination folder for the copied files
strict (bool, optional) – enables strict mode. Defaults to True.
- Returns
- tuple[dict, list[str]]:
data_directories (dict(str, Path)): A dictionary of _id/path pairs, where _id is the UUID of the input data (upload) and path is the path to the files (copied or referenced)
failures (list[str]): The list of errors
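The copy-versus-reference behaviour described above can be sketched with local folders standing in for the storages. This is an illustration under stated assumptions, not the library's implementation: InputUpload, its fields and the retrieve function are hypothetical, the S3 download is simulated with a local copy, "prefixed by the _id" is read here as a sub-folder named after the _id, and the strict parameter is not modelled because its semantics are not documented here.

```python
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class InputUpload:
    """Hypothetical stand-in for an input upload."""

    _id: str
    storage_type: str  # assumed values: "S3" or "FILESYSTEM"
    location: Path  # local folder standing in for the storage content


def retrieve(uploads: list[InputUpload], dest: Path) -> tuple[dict, list[str]]:
    """Return (data_directories, failures) for the given uploads."""
    data_directories: dict[str, Path] = {}
    failures: list[str] = []
    for upload in uploads:
        try:
            if upload.storage_type == "S3":
                # copy the files under dest/<_id> (local copy simulates a download)
                target = dest / upload._id
                shutil.copytree(upload.location, target, dirs_exist_ok=True)
                data_directories[upload._id] = target
            elif upload.storage_type == "FILESYSTEM":
                # reference the existing path, without copying anything
                if not upload.location.exists():
                    raise FileNotFoundError(str(upload.location))
                data_directories[upload._id] = upload.location
            else:
                raise ValueError(f"unsupported storage: {upload.storage_type}")
        except (OSError, ValueError) as err:
            failures.append(f"{upload._id}: {err}")
    return data_directories, failures


# demo with temporary folders
tmp = Path(tempfile.mkdtemp())
src = tmp / "source"
src.mkdir()
(src / "data.txt").write_text("hello")
dest = tmp / "dest"
dest.mkdir()
uploads = [
    InputUpload("id-s3", "S3", src),
    InputUpload("id-fs", "FILESYSTEM", src),
    InputUpload("id-missing", "FILESYSTEM", tmp / "absent"),
]
dirs, failures = retrieve(uploads, dest)
print(sorted(dirs), len(failures))
```

Collecting errors in a list instead of stopping at the first one lets the caller decide what to do with a partial result, which is consistent with the documented return value.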
- set_pipeline_status(processing_name: str, status: Status) -> None
Set the status of a processing in pipeline_status.
- Parameters:
processing_name (str) – processing name
status (Status) – processing status
- to_dict() -> dict
Convert the object into a dictionary, calling the to_dict method of sub-objects where available.
Supports:
- use of the to_dict method for objects held in a list or dict
- use of Enum.name for Enum values held in a dict
- Returns:
object as dictionary
- Return type:
dict
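The two supported cases can be illustrated with a small recursive converter. This is a stand-alone sketch of the general technique, not the library's to_dict: the Color enum, the Child class and the convert function are hypothetical names introduced for the example.

```python
from enum import Enum
from typing import Any


class Color(Enum):
    """Hypothetical enum used as a dict value."""

    RED = 1


class Child:
    """Hypothetical sub-object exposing its own to_dict method."""

    def __init__(self, name: str) -> None:
        self.name = name

    def to_dict(self) -> dict:
        return {"name": self.name}


def convert(value: Any) -> Any:
    """Recursively turn a value into JSON-friendly Python structures."""
    if isinstance(value, Enum):
        # Enum values are replaced by their member name
        return value.name
    if hasattr(value, "to_dict"):
        # sub-objects are delegated to their own to_dict method
        return value.to_dict()
    if isinstance(value, dict):
        return {key: convert(item) for key, item in value.items()}
    if isinstance(value, list):
        return [convert(item) for item in value]
    return value


result = convert({"status": Color.RED, "children": [Child("a")], "count": 1})
print(result)
```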
- to_json(**kwargs) -> str
Wraps json.dumps, serializing the dictionary returned by to_dict(). Extra keyword arguments are passed on to json.dumps.
- Returns:
object serialized as JSON string
- Return type:
str
Example
    from pathlib import Path

    from gpf_entrepot_toolbelt.orchestrator.models.parameters import GpfOrchestratorParameters

    params = GpfOrchestratorParameters.from_json(
        in_json_path=Path("workdir/parameters.json")
    )

    # create output file
    out_file = Path(f"/tmp/{params.output_filename}")
    out_file.parent.mkdir(parents=True, exist_ok=True)

    # write into the file passing extra parameters to json.dumps
    with out_file.open("w", encoding="UTF8") as wf:
        wf.write(params.to_json(sort_keys=True))