Aller au contenu

Tutoriel PGQ

PGQ est une solution de file d’attente s’intégrant à PostgreSQL, faisant partie des Skytools développés à l’origine par Skype. Londiste, un outil de réplication logique, s’appuie sur PGQ. Le tutoriel ci-dessous décrit comment utiliser PGQ pour gérer des processus asynchrones.

Problèmes résolus par PGQ

PGQ permet de gérer le traitement asynchrone des transactions en direct, sans bloquer les transactions en cours. Il est conçu pour différer certains traitements après l’achèvement des transactions, via un système de batchs, en fournissant une API SQL.

Installation et configuration

Pour utiliser PGQ, vous devez exécuter une instance de ticker sur la base de données où les événements sont produits. Cela implique la configuration d’un fichier ticker.ini et l’exécution d’un démon ticker. Le ticker génère des ticks servant de limites pour les batchs d’événements, qu’un consommateur peut demander pour traitement.

Production et consommation d’événements

Assurez-vous d’avoir au moins un consommateur inscrit avant de produire des événements, sinon ceux-ci seront perdus. Vous pouvez avoir plusieurs consommateurs sur une même file d’attente, mais ils traiteront tous les mêmes événements. PGQ propose également des consommateurs coopératifs.

Création d’événements

Les événements peuvent être créés via une API fonctionnelle ou basée sur des déclencheurs, cette dernière étant recommandée pour une validation facile des types de données SQL.

Écriture d’un consommateur PGQ

Il est recommandé d’écrire le code du consommateur en Python. Un consommateur enregistre et désenregistre des files d’attente et traite les événements par lots. Le consommateur doit boucler sur l’appel à pgq.next_batch() jusqu’à ce qu’il reçoive NULL, indiquant qu’il doit se reposer.

Utilisation de l’API SQL de PGQ

  • Inscription et désinscription des consommateurs.
  • Boucle de traitement des événements : obtenir des batchs avec pgq.next_batch(), traiter les événements, marquer les événements comme traités ou à réessayer, et finir les batchs avec pgq.finish_batch().

Consommation distante

Pour traiter les événements sur une base de données différente de celle où ils sont produits, vous devrez gérer les cas où pgq.finish_batch() échoue après un commit réussi. pgq_ext aide à éviter de traiter plusieurs fois le même batch en enregistrant l’identifiant du dernier batch traité.

Traitement non transactionnel

Pour les actions non transactionnelles (comme l’envoi d’emails), la gestion de la fiabilité du traitement incombe au développeur.

Utilisation de l’API Python

Skytools fournit tout le nécessaire pour écrire des consommateurs en Python.

Exemple de consommateur en Python :

import pgq

class RowCounter(pgq.Consumer):
    def process_batch(self, db, batch_id, ev_list):
        tbl = self.cf.get('table_name')
        delta = 0
        for ev in ev_list:
            if ev.type == 'I' and ev.extra1 == tbl:
                delta += 1
            elif ev.type == 'D' and ev.extra1 == tbl:
                delta -= 1
            ev.tag_done()
        q = 'select update_stats(%s, %s)'
        db.cursor().execute(q, [tbl, delta])

RowCounter('row_counter', 'db', sys.argv[1:]).start()

Utilisation de l’API PHP

Un exemple d’implémentation de consommateur en PHP est également disponible.

Documentation supplémentaire

Pour plus de détails, vous pouvez consulter la page originale ici.