gpf_pyroger.publish_agent_core module#

Classe générique destinée à être héritée par les agents de publication pour les échanges avec le BUS (RabbitMQ) de l’Entrepôt.

class gpf_pyroger.publish_agent_core.PublishAgent(endpoint_name: str, pod_id: str, url_entrepot: str, exchange_name: str = 'my_exchange', publication_result_topic: str = 'publication-endpoint-update', health_filepath: Path | None = None, host: str = 'localhost', port: int = 5672, keycloack_user_id: str | None = None, user_agent: str = 'PyRoger/1.8.0', rabbit_login: str | None = None, rabbit_password: str | None = None, service_url_health_check: str = '', listening_mode: bool = True, consistency_checking_timer: int = 300, consistency_checking_enabled: bool = True, mattermost_webhook_scheme: str = 'http', mattermost_webhook_host: str = 'mattermost.gpf-tech.ign.fr', mattermost_webhook_port: int = 80, mattermost_webhook_token: str | None = None, mattermost_request_timeout: int = 60, types_config: list[str] | None = None)#

Bases : object

Cette classe définit un agent de publication.

Un agent de publication reçoit les messages de publications envoyés par l’API Entrepôt, il effectue toutes les opérations spécifiques nécessaires à la publication, et renvoie un message lorsque toutes les opérations sont effectuées.

__init__(endpoint_name: str, pod_id: str, url_entrepot: str, exchange_name: str = 'my_exchange', publication_result_topic: str = 'publication-endpoint-update', health_filepath: Path | None = None, host: str = 'localhost', port: int = 5672, keycloack_user_id: str | None = None, user_agent: str = 'PyRoger/1.8.0', rabbit_login: str | None = None, rabbit_password: str | None = None, service_url_health_check: str = '', listening_mode: bool = True, consistency_checking_timer: int = 300, consistency_checking_enabled: bool = True, mattermost_webhook_scheme: str = 'http', mattermost_webhook_host: str = 'mattermost.gpf-tech.ign.fr', mattermost_webhook_port: int = 80, mattermost_webhook_token: str | None = None, mattermost_request_timeout: int = 60, types_config: list[str] | None = None)#

Constructeur de la classe PublishAgent.

Les paramètres à entrer sont principalement issus de variables d’environnement.

Paramètres:
  • endpoint_name (str) – nom du endpoint, fourni par la variable d’environnement GPF_ENDPOINT_NAME

  • pod_id (str) – nom de l’identifiant de service, fourni par la variable d’environnement GEOSERVICE_POD_NAME

  • url_entrepot (str) – URL de l’Entrepôt, fournie par la variable d’environnement GPF_WAREHOUSE_URL

  • exchange_name (str) – nom de l’exchange du bus sur lequel écouter, vaut « my_exchange » par défaut. Ne doit pas être une chaîne vide.

  • publication_result_topic (str) – topic du bus sur lequel l’agent envoie une réponse, fourni par la variable d’environnement PUBLICATION_RESULT_TOPIC

  • health_filepath (str) – chemin vers le fichier où stocker le statut de l’agent au démarrage

  • host (str) – nom de l’hôte du serveur RabbitMQ, par défaut vaut « localhost »

  • port (int) – port du serveur rabbitMQ, vaut 5672 par défaut

  • keycloack_user_id (str) – id de l’utilisateur technique qui a le droits d’utiliser les appels en /technical

  • user_agent (str) – nom du user_agent à envoyer lors des requètes vers l’API Entrepot

  • rabbit_login (str) – utilisateur du bus RabbitMQ

  • rabbit_password (str) – mot de passe pour le bus RabbitMQ

  • service_url_health_check (str, optional) – url de vérification de l’état du service, fourni par une variable GEOSERVICE_URL_HEALTHCHECK

  • listening_mode (bool, optional) – lance l’attente de réception de nouveaux messages. Vaut True par défaut.

  • consistency_checking_timer (int) – temps de repos (en secondes) entre deux vérification de cohérence

  • mattermost_webhook_scheme (str) – schéma de l’url du webhook mattermost

  • mattermost_webhook_host (str) – host du webhook mattermost

  • mattermost_webhook_port (int) – port du webhook mattermost

  • mattermost_webhook_token (str) – token du webhook mattermost

  • types_config (list[str]) – type(s) de offering possible sur l’agent

connect_to_entrepot() None#

Cette fonction met en place un bus d’écoute vers l’entrepôt. Elle enregistre les JSON correspondant aux nouvelles offres qui doivent être publiées par les agents.

Attention, cette fonction ne fait que réceptionner les offres, mais elle ne les lit pas.

consistency_control()#
get_from_api_details_offering(id_offering: str) dict | None#

Fonction qui appelle l’API entrepôt pour récupérer le détail d’une offre.

Paramètres:

id_offering (str) – Identifiant de l’offre

Renvoie:

détail de l’offre

Type renvoyé:

dict

Exemple

# structure d'une offre

{
    _id    : ""
    type: ""
    configuration_type: ""
    layer_name: ",
    type_infos: {},
    metadata :  [
        {
            format: "",
            url: "",
            type: ""
        }
    ],
    attribution : {
        title:   ""
        url:  ""
        logo: {
            format: "",
            url: "",
            width: None,
            height: None,
        }
    }
}
get_from_api_offerings() list#

Fonction qui appelle l’API entrepôt pour lister les offres actuellements publiées ou en cours de publication sur son point d’accès.

Renvoie:

Liste des offres

Type renvoyé:

list

abstract get_published_offerings() list[str]#

Fonction qui récupère la liste des offres publiées sur le serveur.

Attention, cette méthode est à implémenter dans les agents spécifiques qui héritent d’une classe PublishAgent de Pyroger.

Renvoie:

La liste des offres publiées.

Type renvoyé:

List[str]

abstract is_offering_published(offering: dict) bool#

Fonction qui vérifie si une offre est déjà publiée sur le serveur.

Attention, cette méthode est à implémenter dans les agents spécifiques qui héritent d’une classe PublishAgent de Pyroger.

Paramètres:

offering (dict) – offre courante

Renvoie:

True si l’offre a déjà été publiée précédemment, False sinon.

Type renvoyé:

bool

is_service_available() bool#

Vérifie si le service est disponible.

Paramètres:

service_url_health_check (str) – url de vérification de l’état du service

Renvoie:

True si service disponible, False autrement

Type renvoyé:

bool

on_message_callback(ch: Channel, method: Deliver, props: BasicProperties, body: bytes)#

Fonction de déclenchement au moment où l’agent consomme les messages envoyés sur le bus.

Cette fonction sera adaptée par les agents de publications spécialisés qui hériteront de cette classe générique.

Paramètres:
  • ch (Channel) – bus connecté au serveur RabbitMQ

  • method (spec.Basic.Deliver) – Propriétés de base de l’envoi des messages sur le bus

  • props (spec.BasicProperties) – Propriétés de base de la construction du protocole AMQP

  • body (bytes) – message reçu

process_message(message: dict) tuple[dict, gpf_pyroger.constants.Status]#

Fonction de traitement du message envoyé sur le bus.

On vérifie les cas de WARNING puis on applique la méthode spécifique adéquate

Paramètres:

message (dict) – offre courante à traiter

Renvoie:

Status (Status): Status retourné (OK|ERROR|WARN) après que les opérations nécessaires à la diffusion de l’offre ait été effectuées

Type renvoyé:

Offering (dict)

abstract publish_offering(offering: dict) Status#
publish_response(offering: dict, status: Status) dict#

Cette fonction publie une réponse sur le bus de topic publication-endpoint-update.

Paramètres:
  • offering (dict) – structure JSON récupérée depuis l’API entrepôt

  • status (Status) – message renvoyé après la diffusion d’une offre et vaut OK, WARN ou ERROR

Renvoie:

la réponse que publie l’agent après diffusion d’une offre

Type renvoyé:

dict

read_health_file() str#
Lis le message de statut dans le fichier de santé de l’agent. Seulement si

le fichier de santé (défini à l’initialisation de l’objet via le paramètre health_filepath) existe.

Renvoie:

statut du fichier

Type renvoyé:

str

reconnect_to_entrepot() None#

Cette fonction permet de se reconnecter, notamment d’un StreamLostError, à la queue créée par la fonction connect_to_entrepot.

send_message_to_mattermost_webhook(message: str)#

Envoi un message au webhook mattermost

Args

message (str): Le message à envoyer

static set_type_resp(message: dict) dict#

Fonction de remplissage du champ type_resp du message à renvoyer

Paramètres:

message (dict) – message à traiter

Renvoie:

le message modifié

Type renvoyé:

message (dict)

start_consuming_received_offers() None#

Cette fonction permet de traiter les messages qui se sont accumulés dans la file d’attente du bus.

Par défaut, on applique un délai de 60s entre chaque tentative de reconnexion

stop_consuming_and_ko()#
supply_service() list#

Cette fonction est appelée au démarrage d’un agent de publication.

Elle appelle dans un premier temps une API qui liste les offres associées au endpoint. Elle va récupérer et traiter les offres au statut PUBLISHED, c’est-à-dire qui ont déjà été publiées. Cela permet au service qui démarre de publier toutes les offres qui l’ont été dans le passé.

Renvoie:

l’ensemble des offres (ressources = couches = configurations ) que l’agent doit publier

Type renvoyé:

list

abstract synchronize_offering(offering: dict) Status#
abstract unpublish_offering(offering: dict) Status#
write_health_file(status_msg: str = 'OK') Path | None#
Ecrit le message de statut dans le fichier de santé de l’agent. Seulement si

le fichier de santé (défini à l’initialisation de l’objet via le paramètre health_filepath) existe.

Paramètres:

status_msg (str, optional) – message de statut à écrire. Defaults to « OK ».

Renvoie:

chemin du fichier de statut

Type renvoyé:

Path

write_response(offering: dict, status: Status) dict#

Cette fonction rédige un message une fois que les opérations de publication ont été effectuées.

Paramètres:
  • offering (dict) – structure JSON récupérée depuis l’API entrepôt

  • status (Status) – message renvoyé après la diffusion d’une offre et vaut OK, WARN ou ERROR

Renvoie:

la réponse que publie l’agent après diffusion d’une offre

Type renvoyé:

dict