gpf_pyroger.publish_agent_core module#
Classe générique destinée à être héritée par les agents de publication pour les échanges avec le BUS (RabbitMQ) de l’Entrepôt.
- class gpf_pyroger.publish_agent_core.PublishAgent(endpoint_name: str, pod_id: str, url_entrepot: str, exchange_name: str = 'my_exchange', publication_result_topic: str = 'publication-endpoint-update', health_filepath: Path | None = None, host: str = 'localhost', port: int = 5672, keycloack_user_id: str | None = None, user_agent: str = 'PyRoger/1.8.0', rabbit_login: str | None = None, rabbit_password: str | None = None, service_url_health_check: str = '', listening_mode: bool = True, consistency_checking_timer: int = 300, consistency_checking_enabled: bool = True, mattermost_webhook_scheme: str = 'http', mattermost_webhook_host: str = 'mattermost.gpf-tech.ign.fr', mattermost_webhook_port: int = 80, mattermost_webhook_token: str | None = None, mattermost_request_timeout: int = 60, types_config: list[str] | None = None)#
Bases :
object
Cette classe définit un agent de publication.
Un agent de publication reçoit les messages de publications envoyés par l’API Entrepôt, il effectue toutes les opérations spécifiques nécessaires à la publication, et renvoie un message lorsque toutes les opérations sont effectuées.
- __init__(endpoint_name: str, pod_id: str, url_entrepot: str, exchange_name: str = 'my_exchange', publication_result_topic: str = 'publication-endpoint-update', health_filepath: Path | None = None, host: str = 'localhost', port: int = 5672, keycloack_user_id: str | None = None, user_agent: str = 'PyRoger/1.8.0', rabbit_login: str | None = None, rabbit_password: str | None = None, service_url_health_check: str = '', listening_mode: bool = True, consistency_checking_timer: int = 300, consistency_checking_enabled: bool = True, mattermost_webhook_scheme: str = 'http', mattermost_webhook_host: str = 'mattermost.gpf-tech.ign.fr', mattermost_webhook_port: int = 80, mattermost_webhook_token: str | None = None, mattermost_request_timeout: int = 60, types_config: list[str] | None = None)#
Constructeur de la classe PublishAgent.
Les paramètres à entrer sont principalement issus de variables d’environnement.
- Paramètres:
endpoint_name (str) – nom du endpoint, fourni par la variable d’environnement GPF_ENDPOINT_NAME
pod_id (str) – nom de l’identifiant de service, fourni par la variable d’environnement GEOSERVICE_POD_NAME
url_entrepot (str) – URL de l’Entrepôt, fournie par la variable d’environnement GPF_WAREHOUSE_URL
exchange_name (str) – nom de l’exchange du bus sur lequel écouter, vaut « my_exchange » par défaut. Ne doit pas être une chaîne vide.
publication_result_topic (str) – topic du bus sur lequel l’agent envoie une réponse, fourni par la variable d’environnement PUBLICATION_RESULT_TOPIC
health_filepath (str) – chemin vers le fichier où stocker le statut de l’agent au démarrage
host (str) – nom de l’hôte du serveur RabbitMQ, par défaut vaut « localhost »
port (int) – port du serveur rabbitMQ, vaut 5672 par défaut
keycloack_user_id (str) – id de l’utilisateur technique qui a le droits d’utiliser les appels en /technical
user_agent (str) – nom du user_agent à envoyer lors des requètes vers l’API Entrepot
rabbit_login (str) – utilisateur du bus RabbitMQ
rabbit_password (str) – mot de passe pour le bus RabbitMQ
service_url_health_check (str, optional) – url de vérification de l’état du service, fourni par une variable GEOSERVICE_URL_HEALTHCHECK
listening_mode (bool, optional) – lance l’attente de réception de nouveaux messages. Vaut True par défaut.
consistency_checking_timer (int) – temps de repos (en secondes) entre deux vérification de cohérence
mattermost_webhook_scheme (str) – schéma de l’url du webhook mattermost
mattermost_webhook_host (str) – host du webhook mattermost
mattermost_webhook_port (int) – port du webhook mattermost
mattermost_webhook_token (str) – token du webhook mattermost
types_config (list[str]) – type(s) de offering possible sur l’agent
- connect_to_entrepot() None #
Cette fonction met en place un bus d’écoute vers l’entrepôt. Elle enregistre les JSON correspondant aux nouvelles offres qui doivent être publiées par les agents.
Attention, cette fonction ne fait que réceptionner les offres, mais elle ne les lit pas.
- consistency_control()#
- get_from_api_details_offering(id_offering: str) dict | None #
Fonction qui appelle l’API entrepôt pour récupérer le détail d’une offre.
- Paramètres:
id_offering (str) – Identifiant de l’offre
- Renvoie:
détail de l’offre
- Type renvoyé:
dict
Exemple
# structure d'une offre { _id : "" type: "" configuration_type: "" layer_name: ", type_infos: {}, metadata : [ { format: "", url: "", type: "" } ], attribution : { title: "" url: "" logo: { format: "", url: "", width: None, height: None, } } }
- get_from_api_offerings() list #
Fonction qui appelle l’API entrepôt pour lister les offres actuellements publiées ou en cours de publication sur son point d’accès.
- Renvoie:
Liste des offres
- Type renvoyé:
list
- abstract get_published_offerings() list[str] #
Fonction qui récupère la liste des offres publiées sur le serveur.
Attention, cette méthode est à implémenter dans les agents spécifiques qui héritent d’une classe PublishAgent de Pyroger.
- Renvoie:
La liste des offres publiées.
- Type renvoyé:
List[str]
- abstract is_offering_published(offering: dict) bool #
Fonction qui vérifie si une offre est déjà publiée sur le serveur.
Attention, cette méthode est à implémenter dans les agents spécifiques qui héritent d’une classe PublishAgent de Pyroger.
- Paramètres:
offering (dict) – offre courante
- Renvoie:
True si l’offre a déjà été publiée précédemment, False sinon.
- Type renvoyé:
bool
- is_service_available() bool #
Vérifie si le service est disponible.
- Paramètres:
service_url_health_check (str) – url de vérification de l’état du service
- Renvoie:
True si service disponible, False autrement
- Type renvoyé:
bool
- on_message_callback(ch: Channel, method: Deliver, props: BasicProperties, body: bytes)#
Fonction de déclenchement au moment où l’agent consomme les messages envoyés sur le bus.
Cette fonction sera adaptée par les agents de publications spécialisés qui hériteront de cette classe générique.
- Paramètres:
ch (Channel) – bus connecté au serveur RabbitMQ
method (spec.Basic.Deliver) – Propriétés de base de l’envoi des messages sur le bus
props (spec.BasicProperties) – Propriétés de base de la construction du protocole AMQP
body (bytes) – message reçu
- process_message(message: dict) tuple[dict, gpf_pyroger.constants.Status] #
Fonction de traitement du message envoyé sur le bus.
On vérifie les cas de WARNING puis on applique la méthode spécifique adéquate
- Paramètres:
message (dict) – offre courante à traiter
- Renvoie:
Status (Status): Status retourné (OK|ERROR|WARN) après que les opérations nécessaires à la diffusion de l’offre ait été effectuées
- Type renvoyé:
Offering (dict)
- publish_response(offering: dict, status: Status) dict #
Cette fonction publie une réponse sur le bus de topic publication-endpoint-update.
- Paramètres:
offering (dict) – structure JSON récupérée depuis l’API entrepôt
status (Status) – message renvoyé après la diffusion d’une offre et vaut OK, WARN ou ERROR
- Renvoie:
la réponse que publie l’agent après diffusion d’une offre
- Type renvoyé:
dict
- read_health_file() str #
- Lis le message de statut dans le fichier de santé de l’agent. Seulement si
le fichier de santé (défini à l’initialisation de l’objet via le paramètre health_filepath) existe.
- Renvoie:
statut du fichier
- Type renvoyé:
str
- reconnect_to_entrepot() None #
Cette fonction permet de se reconnecter, notamment d’un StreamLostError, à la queue créée par la fonction connect_to_entrepot.
- send_message_to_mattermost_webhook(message: str)#
Envoi un message au webhook mattermost
- Args
message (str): Le message à envoyer
- static set_type_resp(message: dict) dict #
Fonction de remplissage du champ type_resp du message à renvoyer
- Paramètres:
message (dict) – message à traiter
- Renvoie:
le message modifié
- Type renvoyé:
message (dict)
- start_consuming_received_offers() None #
Cette fonction permet de traiter les messages qui se sont accumulés dans la file d’attente du bus.
Par défaut, on applique un délai de 60s entre chaque tentative de reconnexion
- stop_consuming_and_ko()#
- supply_service() list #
Cette fonction est appelée au démarrage d’un agent de publication.
Elle appelle dans un premier temps une API qui liste les offres associées au endpoint. Elle va récupérer et traiter les offres au statut PUBLISHED, c’est-à-dire qui ont déjà été publiées. Cela permet au service qui démarre de publier toutes les offres qui l’ont été dans le passé.
- Renvoie:
l’ensemble des offres (ressources = couches = configurations ) que l’agent doit publier
- Type renvoyé:
list
- write_health_file(status_msg: str = 'OK') Path | None #
- Ecrit le message de statut dans le fichier de santé de l’agent. Seulement si
le fichier de santé (défini à l’initialisation de l’objet via le paramètre health_filepath) existe.
- Paramètres:
status_msg (str, optional) – message de statut à écrire. Defaults to « OK ».
- Renvoie:
chemin du fichier de statut
- Type renvoyé:
Path
- write_response(offering: dict, status: Status) dict #
Cette fonction rédige un message une fois que les opérations de publication ont été effectuées.
- Paramètres:
offering (dict) – structure JSON récupérée depuis l’API entrepôt
status (Status) – message renvoyé après la diffusion d’une offre et vaut OK, WARN ou ERROR
- Renvoie:
la réponse que publie l’agent après diffusion d’une offre
- Type renvoyé:
dict