gpf_entrepot_toolbelt.orchestrator.models.parameters module

Model definition for GPF orchestrator parameters and related sugar.

Author: Julien Moura (Oslandia)

class gpf_entrepot_toolbelt.orchestrator.models.parameters.GpfOrchestratorParameters(_id: str | None = None, inputs: dict[dict, Any] | None = None, job_name: str | None = None, output: dict[dict, Any] | None = None, parameters: dict[dict, Any] | None = None, pipeline_status: dict[dict, Any] | None = None, json_ref_path: Path | None = None, loaded_from_json: bool = False, **kwargs)

Bases: object

Object definition for GPF orchestrator parameters.

ATTR_MAP = {}
MODEL_ATTR = ('_id', 'inputs', 'job_name', 'output', 'parameters', 'pipeline_status')
__init__(_id: str | None = None, inputs: dict[dict, Any] | None = None, job_name: str | None = None, output: dict[dict, Any] | None = None, parameters: dict[dict, Any] | None = None, pipeline_status: dict[dict, Any] | None = None, json_ref_path: Path | None = None, loaded_from_json: bool = False, **kwargs)

Initialize an orchestrator parameters object.

create_output_file(work_dir: Path) → None

Create output file for a GpfOrchestratorParameters.

Parameters:

work_dir (Path) – Input working directory. The folder must exist.
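Example (a minimal sketch, assuming params is an existing GpfOrchestratorParameters instance and the folder already exists)

from pathlib import Path

work_dir = Path("work_dir")  # hypothetical working directory; must already exist
params.create_output_file(work_dir)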

classmethod from_json(in_json_path: Path) → Self

Load object from a JSON file.

Parameters:

in_json_path (Path) – path to the JSON file

Returns:

object with attributes filled from JSON.

Return type:

Self

Example

from pathlib import Path

parameters = GpfOrchestratorParameters.from_json(
    Path("work_dir/parameters.json")
)
# print the identifier of the loaded object
print(parameters.id)
get_database_from_stored_data(stored_data: GpfStoredData) → PgDatabase | None

Returns the database from a stored data, with user and password information extracted from environment variables.

Parameters:

stored_data (GpfStoredData) – stored data

Returns:

Optional[PgDatabase]: all the information needed to connect to the database if no error, None otherwise
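Example (a sketch, assuming params has at least one input stored data that is a database and that the relevant environment variables are set)

stored_data = params.input_stored_datas[0]
database = params.get_database_from_stored_data(stored_data)
if database is None:
    print("Could not build database connection information")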

get_input_upload_object_storage_client(upload_position: int = 0, first: bool = True) → GpfS3Client | None

Returns the object storage client from input uploads. Useful to download input data to check or process.

Parameters:
  • upload_position (int, optional) – index of the upload to return. Defaults to 0.

  • first (bool, optional) – if True, returns the first upload that is an object storage. Defaults to True.

Returns:

object storage client

Return type:

Union[GpfS3Client, None]
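Example (a sketch, assuming params is an existing GpfOrchestratorParameters instance)

s3_client = params.get_input_upload_object_storage_client()
if s3_client is None:
    print("No object storage upload found among the inputs")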

property global_extent: GpfExtent | None

Return the global extent.

Returns:

global extent

Return type:

Optional[GpfExtent]

property id: str | None

Returns the _id.

Returns:

str: object _id

import_input_files(dest: Path, strict: bool = True) → tuple[list[str], list[str]]

Imports all files defined in the input upload/stored_data entries into the specified folder. The files are copied into an upload/stored_data._id directory.

Parameters:
  • dest (Path) – destination folder for the files

  • strict (bool, optional) – enables strict mode. Defaults to True.

Returns:

tuple of the list of successfully downloaded objects and the list of those whose download failed.

Return type:

tuple[list[str], list[str]]
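Example (a sketch; the destination folder is hypothetical)

from pathlib import Path

dest = Path("/tmp/inputs")
dest.mkdir(parents=True, exist_ok=True)
succeeded, failed = params.import_input_files(dest=dest, strict=False)
print(f"{len(succeeded)} object(s) imported, {len(failed)} failure(s)")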

property input_databases: dict[str, PgDatabase] | None

Returns a dict of input databases keyed by stored data id.

Returns:

Dict[str, PgDatabase]: database for each input stored data that is a database
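Example (a sketch, assuming params is an existing GpfOrchestratorParameters instance)

databases = params.input_databases or {}
for stored_data_id, database in databases.items():
    print(stored_data_id, database)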

property input_extent: GpfExtent | None

Return the input extent.

Returns:

input extent

Return type:

Optional[GpfExtent]

property input_stored_datas: list[GpfStoredData]

Returns the list of input stored data.

Returns:

List[GpfStoredData]: list of input stored data

property input_uploads: list[GpfUpload]

Returns list of input uploads.

Returns:

List[GpfUpload]: list of input uploads.

property inputs: dict | None

Returns the inputs.

Returns:

dict: object inputs

is_any_previous_step_is_failed() → bool

Check if at least one pipeline status is not SUCCESS.

Returns:

True if there is an error in the pipeline status, False otherwise.

Return type:

bool
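Example (a sketch of a typical early-exit check)

if params.is_any_previous_step_is_failed():
    print("A previous pipeline step failed")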

property is_loaded_from_json: bool

Tells if the object has been loaded from a JSON file.

Returns:

bool: True if the object has been loaded from a JSON file

property job_name: str

Returns the job_name.

Returns:

str: object job_name

property json_ref_path: Path | None

Returns the path to the corresponding JSON file.

Returns:

Path: input JSON path

property output: dict | None

Returns the output.

Returns:

dict: object output

property output_database: PgDatabase | None

Returns the output database if the output is a database, None otherwise or if an error occurs.

Returns:

database connection information, None in case of error

Return type:

Optional[PgDatabase]
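Example (a sketch, assuming params is an existing GpfOrchestratorParameters instance)

database = params.output_database
if database is None:
    print("Output is not a database or its connection information is unavailable")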

property output_filename: str

Returns the output filename when object is serialized into JSON.

Returns:

JSON filename ready for output

Return type:

str

property output_stored_data: GpfStoredData | None

Returns the output stored data if available, None otherwise.

Returns:

Optional[GpfStoredData]: output stored data if available, None otherwise

property output_upload: GpfUpload | None

Returns the output upload if available, None otherwise.

Returns:

Optional[GpfUpload]: output upload if available, None otherwise

property parameters: dict

Returns the parameters: dict of parameters to pass to the job execution.

Returns:

dict: object parameters

property pipeline_status: dict[str, Status]

Returns a dict with the result of every job run in the pipeline.

Returns:

pipeline jobs statuses

Return type:

dict[str, Status]
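Example (a sketch iterating over the pipeline statuses)

for job_name, status in params.pipeline_status.items():
    print(job_name, status)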

retrieve_input_files(dest: Path, strict: bool = True) → tuple[dict, list[str]]

For the input uploads:
  • If the storage is S3: copies all files into the specified folder, prefixed with the _id of the data

  • If the storage is FILESYSTEM: references the path to the files, without copying them

Parameters:
  • dest (Path) – destination folder for the copied files

  • strict (bool, optional) – enables strict mode. Defaults to True.

Returns:
tuple[dict, list[str]]:
  • data_directories (dict(str, Path)): a dictionary of _id/path entries, where _id is the UUID of the input data (upload) and path is the path to the files (copied or referenced)

  • failures (list[str]): the list of errors
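Example (a sketch; the destination folder is hypothetical)

from pathlib import Path

dest = Path("/tmp/inputs")
dest.mkdir(parents=True, exist_ok=True)
data_directories, failures = params.retrieve_input_files(dest=dest)
for upload_id, data_path in data_directories.items():
    print(upload_id, data_path)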

set_pipeline_status(processing_name: str, status: Status) → None

Set processing status to pipeline_status.

Parameters:
  • processing_name (str) – processing name

  • status (Status) – processing status
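Example (a sketch; the processing name is hypothetical and SUCCESS is assumed to be a member of the Status enum mentioned above)

params.set_pipeline_status(processing_name="my_processing", status=Status.SUCCESS)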

to_dict() → dict

Convert the object into a dictionary, calling the to_dict method of sub-objects when available.

Supports:
  • use of the to_dict method on objects inside lists or dicts

  • use of Enum.name for Enum values inside dicts

Returns:

object as dictionary

Return type:

dict
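Example (a sketch, assuming params is an existing GpfOrchestratorParameters instance)

as_dict = params.to_dict()
print(sorted(as_dict.keys()))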

to_json(**kwargs) → str

Serialize the object with json.dumps, using the dictionary returned by to_dict(); keyword arguments are passed through to json.dumps.

Returns:

object serialized as JSON string

Return type:

str

Example

from pathlib import Path

params = GpfOrchestratorParameters.from_json(
    in_json_path=Path("workdir/parameters.json")
)

# create output file
out_file = Path(f"/tmp/{params.output_filename}")
out_file.parent.mkdir(parents=True, exist_ok=True)

# write into the file passing extra parameters to json.dumps
with out_file.open("w", encoding="UTF8") as wf:
    wf.write(params.to_json(sort_keys=True))