Pourquoi ce chapitre
Avec les articles précédents, la téléinfo est complète :
- l’interface électronique fournit un signal TTL sur
/dev/ttyAMA0; raspjsonlit ce signal, décode les trames et émet une trame JSON par cycle ;- un relais publie ces trames sur le topic MQTT
maison/energie/compteur/tic; - Home Assistant s’y abonne pour afficher la consommation en temps réel et alimenter son tableau de bord Énergie.
Home Assistant suffit pour la plupart des usages. Mais il a deux limites pour qui aime fouiller ses données :
- son historique est compressé au fil du temps (la granularité diminue après quelques semaines) ;
- ses requêtes sont limitées au modèle de capteurs qu’il propose — pas de SQL ad hoc, pas de jointure, pas d’agrégat personnalisé.
Pour conserver un historique long avec accès SQL complet, l’idée de ce chapitre est d’ajouter un second abonné au broker MQTT : un petit démon qui écoute le même topic et insère chaque trame dans une base PostgreSQL.
C’est l’occasion d’illustrer concrètement ce que MQTT apporte : plusieurs consommateurs indépendants pour un seul producteur. Home Assistant et le démon PostgreSQL n’ont aucune connaissance l’un de l’autre ; on peut couper l’un sans perturber l’autre.
┌────────────────┐
│ Home Assistant │
└────────▲───────┘
│ abonnement
tic.ndjson ──► relais MQTT ──► broker MQTT
│ abonnement
┌────────▼───────┐
│ démon SQL │
└────────┬───────┘
│ INSERT
┌────────▼───────┐
│ PostgreSQL │
└────────────────┘
Ce chapitre est optionnel. Si Home Assistant couvre tes besoins, tu peux le sauter et passer au reste du livre. La téléinfo fonctionne déjà.
Choix techniques
Pourquoi PostgreSQL
Plutôt que MySQL, MariaDB ou SQLite :
- Types riches :
timestamptz,numericà précision arbitraire,jsonbpour archiver la trame brute en complément des colonnes décodées. - Intervalles natifs :
INTERVAL '1 hour', fonctions de date, fenêtrage SQL — utiles pour les agrégats horaires/journaliers. - Robustesse : transactions strictes, intégrité référentielle, pas de surprises sur les conversions.
- Outils :
psql,pg_dump, intégration directe avec Grafana, DBeaver, Metabase.
SQLite serait également défendable pour un usage purement local (zero-config, fichier unique). Le choix de PostgreSQL est cohérent avec le reste du livre.
Pourquoi Python
Le démon est simple, son rôle est de :
- s’abonner à un topic MQTT,
- parser le JSON entrant,
- insérer une ligne en base.
Python + paho-mqtt + psycopg font ça en moins de 100 lignes. C’est par ailleurs la même bibliothèque MQTT que celle déjà utilisée pour le relais (article précédent) — pas de nouvelle dépendance à apprivoiser.
Préparation de la base
Installation
Sur le Raspberry Pi (ou sur une autre machine du réseau — on peut séparer le broker, le démon et la base) :
sudo apt update
sudo apt install postgresql python3-paho-mqtt python3-psycopg2
Le service démarre automatiquement. Vérification :
sudo systemctl status postgresql
Rôle et base
PostgreSQL ne distingue pas « utilisateur » et « groupe » : tout est un rôle. On crée un rôle dédié à l’application puis sa base :
sudo -u postgres psql
CREATE ROLE r_ampere
WITH LOGIN
PASSWORD 'This1sN0tAnPwd'
VALID UNTIL 'infinity';
CREATE DATABASE ampere
WITH OWNER = r_ampere
ENCODING = 'UTF8'
LC_COLLATE = 'fr_FR.UTF-8'
LC_CTYPE = 'fr_FR.UTF-8'
TEMPLATE = template0;
\q
TEMPLATE template0est nécessaire dès que les localesLC_COLLATE/LC_CTYPEdiffèrent de celles detemplate1(qui hérite des locales du cluster).
Schéma
Une table principale pour les trames, avec une colonne jsonb pour conserver la trame brute :
\c ampere r_ampere
CREATE TABLE tic_trame (
id bigserial PRIMARY KEY,
recu_le timestamptz NOT NULL DEFAULT now(),
-- mesures fréquemment requêtées, extraites pour rapidité d'accès
iinst smallint,
papp integer,
ptec varchar(4),
hchc bigint,
hchp bigint,
base bigint,
-- trame complète pour ne rien perdre
brute jsonb NOT NULL,
-- contrainte de fraîcheur : pas plus d'une trame par seconde
CONSTRAINT pas_de_doublon EXCLUDE USING gist (
tstzrange(recu_le, recu_le + interval '900 ms') WITH &&
)
);
CREATE INDEX idx_tic_trame_recu_le ON tic_trame (recu_le DESC);
CREATE INDEX idx_tic_trame_brute ON tic_trame USING gin (brute);
Quelques choix :
- Colonnes extraites pour les grandeurs les plus consultées (puissance apparente, intensité, période tarifaire, index). Évite de parser le JSON à chaque requête.
brute jsonbpour conserver l’intégralité de la trame, même les étiquettes rares (ADPS,PEJP…). Indexé en GIN pour pouvoir requêterWHERE brute ? 'ADPS'.- Contrainte d’exclusion sur
recu_le: empêche d’insérer deux trames espacées de moins de 900 ms (au cas où le démon se ferait redémarrer pendant qu’il traitait un retard). bigserialpour l’id : avec une trame par seconde, on remplit 4 milliards d’id en 130 ans. Tranquille.
Autorisation
Le démon se connecte à PostgreSQL en mot de passe. On adapte le fichier d’authentification /etc/postgresql/16/main/pg_hba.conf :
local ampere r_ampere scram-sha-256
host ampere r_ampere 127.0.0.1/32 scram-sha-256
Puis recharge :
sudo systemctl reload postgresql
Test depuis le compte pi :
psql -U r_ampere -W -d ampere -h localhost
Le démon MQTT → PostgreSQL
#!/usr/bin/env python3
"""Abonné MQTT qui insère les trames TIC dans PostgreSQL."""
import json
import logging
import os
import signal
import sys
import paho.mqtt.client as mqtt
import psycopg2
import psycopg2.extras
BROKER = os.environ.get("MQTT_BROKER", "localhost")
PORT = int(os.environ.get("MQTT_PORT", 1883))
TOPIC = "maison/energie/compteur/tic"
PG_DSN = os.environ.get("PG_DSN", "dbname=ampere user=r_ampere host=localhost")
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("tic-sql")
INSERT = """
INSERT INTO tic_trame (iinst, papp, ptec, hchc, hchp, base, brute)
VALUES (%(iinst)s, %(papp)s, %(ptec)s, %(hchc)s, %(hchp)s, %(base)s, %(brute)s)
ON CONFLICT DO NOTHING
"""
def to_int(v):
try: return int(v) if v is not None else None
except (TypeError, ValueError): return None
def on_connect(client, _userdata, _flags, rc):
if rc == 0:
log.info("connecté au broker, abonnement à %s", TOPIC)
client.subscribe(TOPIC, qos=1)
else:
log.error("connexion MQTT refusée, code=%s", rc)
def on_message(client, userdata, msg):
conn = userdata["conn"]
try:
trame = json.loads(msg.payload.decode("utf-8"))
except json.JSONDecodeError as e:
log.warning("trame invalide ignorée: %s", e)
return
params = {
"iinst": to_int(trame.get("IINST")),
"papp": to_int(trame.get("PAPP")),
"ptec": trame.get("PTEC"),
"hchc": to_int(trame.get("HCHC")),
"hchp": to_int(trame.get("HCHP")),
"base": to_int(trame.get("BASE")),
"brute": psycopg2.extras.Json(trame),
}
try:
with conn.cursor() as cur:
cur.execute(INSERT, params)
conn.commit()
except psycopg2.Error as e:
log.error("erreur INSERT: %s", e)
conn.rollback()
def main() -> int:
conn = psycopg2.connect(PG_DSN)
log.info("connecté à PostgreSQL")
client = mqtt.Client(client_id="tic-sql", clean_session=False,
userdata={"conn": conn})
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, keepalive=60)
def stop(*_):
log.info("arrêt demandé")
client.disconnect()
signal.signal(signal.SIGTERM, stop)
signal.signal(signal.SIGINT, stop)
client.loop_forever()
conn.close()
return 0
if __name__ == "__main__":
sys.exit(main())
Points à noter :
clean_session=False: si le démon se déconnecte, le broker garde les messages QoS 1 jusqu’à sa reconnexion. Combiné au QoS 1 côté relais, on a une garantie at-least-once de bout en bout.ON CONFLICT DO NOTHING: si une trame est livrée deux fois (effet de bord du at-least-once), elle ne provoque pas d’erreur. La contrainte d’exclusion surrecu_lejoue le rôle de déduplication temporelle.psycopg2.extras.Json: sérialise correctement le dict Python enjsonbcôté base.- Pas de retry réseau explicite :
loop_forever()gère la reconnexion automatique au broker. Pour PostgreSQL, on se contente de logger l’erreur et de relâcher la trame — la suivante arrive dans une seconde.
Service systemd
# /etc/systemd/system/tic-sql.service
[Unit]
Description=Insertion des trames TIC dans PostgreSQL
After=network.target mosquitto.service postgresql.service
Wants=mosquitto.service postgresql.service
[Service]
Type=simple
ExecStart=/usr/local/bin/tic-sql.py
Restart=on-failure
RestartSec=5
User=pi
Environment=PGPASSWORD=This1sN0tAnPwd
[Install]
WantedBy=multi-user.target
Mettre le mot de passe en variable d’environnement n’est pas idéal. En production on utilisera plutôt un fichier
~/.pgpass(mode 600), qui est lu automatiquement parlibpq.
Quelques requêtes utiles
Une fois la base alimentée depuis quelques jours, on peut commencer à interroger.
Puissance apparente sur la dernière heure
SELECT recu_le, papp
FROM tic_trame
WHERE recu_le > now() - interval '1 hour'
ORDER BY recu_le;
Consommation horaire heures pleines / heures creuses
Les index HCHC / HCHP sont cumulatifs. La consommation d’une période, c’est la différence entre l’index de fin et l’index de début.
WITH bornes AS (
SELECT date_trunc('hour', recu_le) AS heure,
min(hchc) AS hchc_debut,
max(hchc) AS hchc_fin,
min(hchp) AS hchp_debut,
max(hchp) AS hchp_fin
FROM tic_trame
WHERE recu_le > now() - interval '24 hours'
AND hchc IS NOT NULL
GROUP BY 1
)
SELECT heure,
hchc_fin - hchc_debut AS conso_hc_wh,
hchp_fin - hchp_debut AS conso_hp_wh
FROM bornes
ORDER BY heure;
Pic de puissance du mois
SELECT recu_le, papp
FROM tic_trame
WHERE recu_le >= date_trunc('month', now())
ORDER BY papp DESC
LIMIT 10;
Étiquettes rares — exemple ADPS (dépassement)
ADPS n’est émis qu’en cas de dépassement de puissance souscrite. Pour retrouver tous ces incidents :
SELECT recu_le, brute -> 'ADPS' AS depassement_a
FROM tic_trame
WHERE brute ? 'ADPS'
ORDER BY recu_le DESC;
L’index GIN sur brute rend cette requête rapide même sur des millions de lignes.
Rotation et taille
Une trame par seconde, c’est 86 400 lignes par jour, 31 millions par an. Ça reste raisonnable pour PostgreSQL sur Raspberry Pi tant qu’on a un SSD ou une carte SD de qualité (les écritures sont fréquentes).
Si l’on veut réduire l’espace disque, deux stratégies :
- Agrégation continue : créer une table
tic_trame_minutequi contient la moyenne / le min / le max sur chaque minute, et purger les trames brutes au-delà de 30 jours. - Compression : avec l’extension TimescaleDB, on peut compresser les anciennes partitions tout en gardant la requêtabilité.
Mais à ce stade, on construit déjà une véritable petite plateforme d’observabilité de la consommation électrique — c’est un autre sujet.
Conclusion du dossier téléinfo
À ce point, la chaîne complète est en place :
- Bornier TIC du compteur → câble paire torsadée → interface de démodulation.
- Interface (opto + MOSFET) → UART du Raspberry Pi.
raspjson→ fichier journal NDJSON.- Relais MQTT → broker Mosquitto.
- Broker → Home Assistant (temps réel) et démon SQL → PostgreSQL (historique long).
Chaque maillon a un rôle précis et peut être remplacé sans toucher aux autres. C’est ce découplage qui fait l’intérêt d’une architecture MQTT, et qui rend le tout robuste aux pannes individuelles.
Commentaires
Aucun commentaire pour l'instant. Soyez le premier !
Laisser un commentaire