📖 Chapitre 27/27 — La domotique chez soi Voir le sommaire →
← Retour
domotique

Historiser la téléinfo dans PostgreSQL via MQTT

Cédrix · 23/05/2026
Modifié le 23 mai 2026 à 05h02

1/ 7 j  ·  1/ 14 j  ·  1/ 30 j  lecteurs

Pourquoi ce chapitre

Avec les articles précédents, la téléinfo est complète :

  • l’interface électronique fournit un signal TTL sur /dev/ttyAMA0 ;
  • raspjson lit ce signal, décode les trames et émet une trame JSON par cycle ;
  • un relais publie ces trames sur le topic MQTT maison/energie/compteur/tic ;
  • Home Assistant s’y abonne pour afficher la consommation en temps réel et alimenter son tableau de bord Énergie.

Home Assistant suffit pour la plupart des usages. Mais il a deux limites pour qui aime fouiller ses données :

  • son historique est compressé au fil du temps (la granularité diminue après quelques semaines) ;
  • ses requêtes sont limitées au modèle de capteurs qu’il propose — pas de SQL ad hoc, pas de jointure, pas d’agrégat personnalisé.

Pour conserver un historique long avec accès SQL complet, l’idée de ce chapitre est d’ajouter un second abonné au broker MQTT : un petit démon qui écoute le même topic et insère chaque trame dans une base PostgreSQL.

C’est l’occasion d’illustrer concrètement ce que MQTT apporte : plusieurs consommateurs indépendants pour un seul producteur. Home Assistant et le démon PostgreSQL n’ont aucune connaissance l’un de l’autre ; on peut couper l’un sans perturber l’autre.

                                    ┌────────────────┐
                                    │ Home Assistant │
                                    └────────▲───────┘
                                             │ abonnement
   tic.ndjson  ──►  relais MQTT  ──►  broker MQTT
                                             │ abonnement
                                    ┌────────▼───────┐
                                    │  démon SQL     │
                                    └────────┬───────┘
                                             │ INSERT
                                    ┌────────▼───────┐
                                    │   PostgreSQL   │
                                    └────────────────┘

Ce chapitre est optionnel. Si Home Assistant couvre tes besoins, tu peux le sauter et passer au reste du livre. La téléinfo fonctionne déjà.


Choix techniques

Pourquoi PostgreSQL

Plutôt que MySQL, MariaDB ou SQLite :

  • Types riches : timestamptz, numeric à précision arbitraire, jsonb pour archiver la trame brute en complément des colonnes décodées.
  • Intervalles natifs : INTERVAL '1 hour', fonctions de date, fenêtrage SQL — utiles pour les agrégats horaires/journaliers.
  • Robustesse : transactions strictes, intégrité référentielle, pas de surprises sur les conversions.
  • Outils : psql, pg_dump, intégration directe avec Grafana, DBeaver, Metabase.

SQLite serait également défendable pour un usage purement local (zero-config, fichier unique). Le choix de PostgreSQL est cohérent avec le reste du livre.

Pourquoi Python

Le démon est simple, son rôle est de :

  1. s’abonner à un topic MQTT,
  2. parser le JSON entrant,
  3. insérer une ligne en base.

Python + paho-mqtt + psycopg font ça en moins de 100 lignes. C’est par ailleurs la même bibliothèque MQTT que celle déjà utilisée pour le relais (article précédent) — pas de nouvelle dépendance à apprivoiser.


Préparation de la base

Installation

Sur le Raspberry Pi (ou sur une autre machine du réseau — on peut séparer le broker, le démon et la base) :

sudo apt update
sudo apt install postgresql python3-paho-mqtt python3-psycopg2

Le service démarre automatiquement. Vérification :

sudo systemctl status postgresql

Rôle et base

PostgreSQL ne distingue pas « utilisateur » et « groupe » : tout est un rôle. On crée un rôle dédié à l’application puis sa base :

sudo -u postgres psql
CREATE ROLE r_ampere
  WITH LOGIN
  PASSWORD 'This1sN0tAnPwd'
  VALID UNTIL 'infinity';

CREATE DATABASE ampere
  WITH OWNER     = r_ampere
       ENCODING  = 'UTF8'
       LC_COLLATE = 'fr_FR.UTF-8'
       LC_CTYPE   = 'fr_FR.UTF-8'
       TEMPLATE   = template0;

\q

TEMPLATE template0 est nécessaire dès que les locales LC_COLLATE / LC_CTYPE diffèrent de celles de template1 (qui hérite des locales du cluster).

Schéma

Une table principale pour les trames, avec une colonne jsonb pour conserver la trame brute :

\c ampere r_ampere

CREATE TABLE tic_trame (
    id          bigserial    PRIMARY KEY,
    recu_le     timestamptz  NOT NULL DEFAULT now(),

    -- mesures fréquemment requêtées, extraites pour rapidité d'accès
    iinst       smallint,
    papp        integer,
    ptec        varchar(4),
    hchc        bigint,
    hchp        bigint,
    base        bigint,

    -- trame complète pour ne rien perdre
    brute       jsonb        NOT NULL,

    -- contrainte de fraîcheur : pas plus d'une trame par seconde
    CONSTRAINT pas_de_doublon EXCLUDE USING gist (
        tstzrange(recu_le, recu_le + interval '900 ms') WITH &&
    )
);

CREATE INDEX idx_tic_trame_recu_le ON tic_trame (recu_le DESC);
CREATE INDEX idx_tic_trame_brute   ON tic_trame USING gin (brute);

Quelques choix :

  • Colonnes extraites pour les grandeurs les plus consultées (puissance apparente, intensité, période tarifaire, index). Évite de parser le JSON à chaque requête.
  • brute jsonb pour conserver l’intégralité de la trame, même les étiquettes rares (ADPS, PEJP…). Indexé en GIN pour pouvoir requêter WHERE brute ? 'ADPS'.
  • Contrainte d’exclusion sur recu_le : empêche d’insérer deux trames espacées de moins de 900 ms (au cas où le démon se ferait redémarrer pendant qu’il traitait un retard).
  • bigserial pour l’id : avec une trame par seconde, on remplit 4 milliards d’id en 130 ans. Tranquille.

Autorisation

Le démon se connecte à PostgreSQL en mot de passe. On adapte le fichier d’authentification /etc/postgresql/16/main/pg_hba.conf :

local   ampere      r_ampere               scram-sha-256
host    ampere      r_ampere   127.0.0.1/32   scram-sha-256

Puis recharge :

sudo systemctl reload postgresql

Test depuis le compte pi :

psql -U r_ampere -W -d ampere -h localhost

Le démon MQTT → PostgreSQL

#!/usr/bin/env python3
"""Abonné MQTT qui insère les trames TIC dans PostgreSQL."""

import json
import logging
import os
import signal
import sys

import paho.mqtt.client as mqtt
import psycopg2
import psycopg2.extras

BROKER   = os.environ.get("MQTT_BROKER", "localhost")
PORT     = int(os.environ.get("MQTT_PORT", 1883))
TOPIC    = "maison/energie/compteur/tic"
PG_DSN   = os.environ.get("PG_DSN", "dbname=ampere user=r_ampere host=localhost")

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("tic-sql")

INSERT = """
    INSERT INTO tic_trame (iinst, papp, ptec, hchc, hchp, base, brute)
    VALUES (%(iinst)s, %(papp)s, %(ptec)s, %(hchc)s, %(hchp)s, %(base)s, %(brute)s)
    ON CONFLICT DO NOTHING
"""

def to_int(v):
    try: return int(v) if v is not None else None
    except (TypeError, ValueError): return None


def on_connect(client, _userdata, _flags, rc):
    if rc == 0:
        log.info("connecté au broker, abonnement à %s", TOPIC)
        client.subscribe(TOPIC, qos=1)
    else:
        log.error("connexion MQTT refusée, code=%s", rc)


def on_message(client, userdata, msg):
    conn = userdata["conn"]
    try:
        trame = json.loads(msg.payload.decode("utf-8"))
    except json.JSONDecodeError as e:
        log.warning("trame invalide ignorée: %s", e)
        return

    params = {
        "iinst": to_int(trame.get("IINST")),
        "papp":  to_int(trame.get("PAPP")),
        "ptec":  trame.get("PTEC"),
        "hchc":  to_int(trame.get("HCHC")),
        "hchp":  to_int(trame.get("HCHP")),
        "base":  to_int(trame.get("BASE")),
        "brute": psycopg2.extras.Json(trame),
    }

    try:
        with conn.cursor() as cur:
            cur.execute(INSERT, params)
        conn.commit()
    except psycopg2.Error as e:
        log.error("erreur INSERT: %s", e)
        conn.rollback()


def main() -> int:
    conn = psycopg2.connect(PG_DSN)
    log.info("connecté à PostgreSQL")

    client = mqtt.Client(client_id="tic-sql", clean_session=False,
                          userdata={"conn": conn})
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(BROKER, PORT, keepalive=60)

    def stop(*_):
        log.info("arrêt demandé")
        client.disconnect()

    signal.signal(signal.SIGTERM, stop)
    signal.signal(signal.SIGINT,  stop)

    client.loop_forever()
    conn.close()
    return 0


if __name__ == "__main__":
    sys.exit(main())

Points à noter :

  • clean_session=False : si le démon se déconnecte, le broker garde les messages QoS 1 jusqu’à sa reconnexion. Combiné au QoS 1 côté relais, on a une garantie at-least-once de bout en bout.
  • ON CONFLICT DO NOTHING : si une trame est livrée deux fois (effet de bord du at-least-once), elle ne provoque pas d’erreur. La contrainte d’exclusion sur recu_le joue le rôle de déduplication temporelle.
  • psycopg2.extras.Json : sérialise correctement le dict Python en jsonb côté base.
  • Pas de retry réseau explicite : loop_forever() gère la reconnexion automatique au broker. Pour PostgreSQL, on se contente de logger l’erreur et de relâcher la trame — la suivante arrive dans une seconde.

Service systemd

# /etc/systemd/system/tic-sql.service
[Unit]
Description=Insertion des trames TIC dans PostgreSQL
After=network.target mosquitto.service postgresql.service
Wants=mosquitto.service postgresql.service

[Service]
Type=simple
ExecStart=/usr/local/bin/tic-sql.py
Restart=on-failure
RestartSec=5
User=pi
Environment=PGPASSWORD=This1sN0tAnPwd

[Install]
WantedBy=multi-user.target

Mettre le mot de passe en variable d’environnement n’est pas idéal. En production on utilisera plutôt un fichier ~/.pgpass (mode 600), qui est lu automatiquement par libpq.


Quelques requêtes utiles

Une fois la base alimentée depuis quelques jours, on peut commencer à interroger.

Puissance apparente sur la dernière heure

SELECT recu_le, papp
FROM   tic_trame
WHERE  recu_le > now() - interval '1 hour'
ORDER  BY recu_le;

Consommation horaire heures pleines / heures creuses

Les index HCHC / HCHP sont cumulatifs. La consommation d’une période, c’est la différence entre l’index de fin et l’index de début.

WITH bornes AS (
    SELECT date_trunc('hour', recu_le)         AS heure,
           min(hchc) AS hchc_debut,
           max(hchc) AS hchc_fin,
           min(hchp) AS hchp_debut,
           max(hchp) AS hchp_fin
    FROM   tic_trame
    WHERE  recu_le > now() - interval '24 hours'
      AND  hchc IS NOT NULL
    GROUP  BY 1
)
SELECT heure,
       hchc_fin - hchc_debut AS conso_hc_wh,
       hchp_fin - hchp_debut AS conso_hp_wh
FROM   bornes
ORDER  BY heure;

Pic de puissance du mois

SELECT recu_le, papp
FROM   tic_trame
WHERE  recu_le >= date_trunc('month', now())
ORDER  BY papp DESC
LIMIT  10;

Étiquettes rares — exemple ADPS (dépassement)

ADPS n’est émis qu’en cas de dépassement de puissance souscrite. Pour retrouver tous ces incidents :

SELECT recu_le, brute -> 'ADPS' AS depassement_a
FROM   tic_trame
WHERE  brute ? 'ADPS'
ORDER  BY recu_le DESC;

L’index GIN sur brute rend cette requête rapide même sur des millions de lignes.


Rotation et taille

Une trame par seconde, c’est 86 400 lignes par jour, 31 millions par an. Ça reste raisonnable pour PostgreSQL sur Raspberry Pi tant qu’on a un SSD ou une carte SD de qualité (les écritures sont fréquentes).

Si l’on veut réduire l’espace disque, deux stratégies :

  • Agrégation continue : créer une table tic_trame_minute qui contient la moyenne / le min / le max sur chaque minute, et purger les trames brutes au-delà de 30 jours.
  • Compression : avec l’extension TimescaleDB, on peut compresser les anciennes partitions tout en gardant la requêtabilité.

Mais à ce stade, on construit déjà une véritable petite plateforme d’observabilité de la consommation électrique — c’est un autre sujet.


Conclusion du dossier téléinfo

À ce point, la chaîne complète est en place :

  1. Bornier TIC du compteur → câble paire torsadée → interface de démodulation.
  2. Interface (opto + MOSFET) → UART du Raspberry Pi.
  3. raspjson → fichier journal NDJSON.
  4. Relais MQTT → broker Mosquitto.
  5. Broker → Home Assistant (temps réel) et démon SQL → PostgreSQL (historique long).

Chaque maillon a un rôle précis et peut être remplacé sans toucher aux autres. C’est ce découplage qui fait l’intérêt d’une architecture MQTT, et qui rend le tout robuste aux pannes individuelles.

Partager : ✉ Mail X in 🐘
Commentaires

Aucun commentaire pour l'instant. Soyez le premier !

Laisser un commentaire
Un code de vérification sera envoyé à votre adresse email.