Où on en est
À ce stade du dossier téléinfo :
- l’interface électronique (démodulateur ASK + Raspberry Pi 3) capte le signal TIC et le restitue en série TTL sur
/dev/ttyAMA0; - le programme
raspjsonlit ce flux, décode les trames, vérifie les checksums et émet une trame JSON par cycle sur sa sortie standard ; - l’unité de traitement principale de la maison — c’est Home Assistant, conformément à la stack domotique mise en place dans les chapitres précédents — attend ces données pour les historiser, déclencher des automatisations et les afficher.
Reste un maillon : transporter chaque trame JSON depuis raspjson jusqu’à Home Assistant. Cet article décrit comment, et surtout comment ne perdre aucune trame quand le réseau ou l’unité de traitement font défaut.
┌────────────┐ JSON sur ┌──────────┐ MQTT ┌─────────────────┐
│ raspjson │ ─── stdout ──► │ relais │ ──────────► │ broker MQTT │
└────────────┘ └──────────┘ │ (Mosquitto) │
└────────┬────────┘
│ abonnement
▼
┌─────────────────┐
│ Home Assistant │
└─────────────────┘
Le broker MQTT permettra plus tard d’ajouter d’autres consommateurs sans toucher ni à
raspjson, ni au relais, ni à Home Assistant. C’est ce que fait le chapitre optionnel sur l’historisation PostgreSQL : un second abonné au même topic.
Choix du transport : pourquoi MQTT
Tout le livre s’appuie sur un broker MQTT (Mosquitto) comme épine dorsale de communication entre les dispositifs et l’unité de traitement (voir les chapitres MQTT, Broker MQTT, Mosquitto). Il est naturel d’y faire arriver aussi les trames TIC.
MQTT colle bien au cas d’usage :
- publication asynchrone :
raspjsonn’a pas à attendre que Home Assistant ait fini de traiter pour passer à la trame suivante ; - découplage : on peut ajouter ou retirer des consommateurs (Home Assistant, démon SQL, dashboard alternatif…) sans toucher à l’émetteur ;
- QoS configurable : on choisit le compromis entre rapidité et garantie de livraison.
Convention de topic adoptée dans la suite :
maison/energie/compteur/tic
Cette structure hiérarchique s’aligne sur les autres topics du livre (maison/<pièce>/<dispositif>/<grandeur>).
Le problème de résilience
Le compteur émet une trame TIC toutes les secondes environ. L’émetteur (raspjson + son relais) doit donc envoyer une trame par seconde vers le broker. Plusieurs scénarios peuvent perturber cette chaîne :
| Scénario | Cause typique | Effet sans précaution |
|---|---|---|
| Débit entrant trop élevé | Pic ponctuel de trames valides | Saturation du relais, trames droppées |
| Cible lente | Charge sur le broker, latence réseau | Retard cumulatif, files mémoire qui gonflent |
| Cible injoignable | Broker arrêté, panne réseau, reboot Pi | Perte sèche des trames émises pendant l’incident |
Le principe directeur :
Découpler la capture de la transmission.
raspjsonécrit ce qu’il lit, sans se soucier de la cible. Un processus indépendant lit ce qui a été écrit et tente de l’envoyer.
Concrètement : raspjson écrit chaque trame dans un journal (fichier append-only), et un relais MQTT parcourt ce journal pour publier. Si le relais ne peut pas publier (broker absent, réseau coupé), il s’arrête et reprendra plus tard, là où il s’était arrêté. Le journal sert de tampon persistant : aucune trame n’est perdue, même si le Raspberry Pi redémarre pendant un incident.
Pourquoi un fichier append-only plutôt qu’un FIFO
Plusieurs approches sont possibles pour découpler l’écriture et la lecture :
| Solution | Persistance reboot | Complexité | Diagnostic |
|---|---|---|---|
Pipe Unix \| direct |
❌ | très faible | impossible |
FIFO nommée (mkfifo) |
❌ | faible | difficile |
| Fichier append-only + curseur | ✅ | faible | facile (tail, grep) |
| Redis / SQLite / RabbitMQ | ✅ | élevée | dépendance lourde |
Pour un débit d’une trame par seconde, le fichier append-only est imbattable : il survit aux reboots, on peut le lire avec n’importe quel outil shell pour déboguer, et la rotation logarithmique se fait avec logrotate.
Une ébauche fréquente consiste à supprimer la ligne du fichier après envoi, avec
sed -i '1d'. Mauvaise idée :sed -iréécrit tout le fichier à chaque ligne, ce qui devient catastrophique au-delà de quelques milliers de lignes, et crée une fenêtre de course avec l’écriture concurrente deraspjson. On n’écrit pas et on ne modifie pas le journal côté relais : on mémorise une position de lecture (un curseur).
Mise en place
1. Démarrer raspjson en écriture sur le journal
mkdir -p /var/log/raspjson
raspjson >> /var/log/raspjson/tic.ndjson 2>> /var/log/raspjson/tic.err &
Le format NDJSON (Newline Delimited JSON, un JSON par ligne) est idéal : facile à concaténer, facile à lire ligne par ligne, supporté nativement par les outils Unix.
Une rotation logrotate quotidienne s’occupera de garder une taille raisonnable :
# /etc/logrotate.d/raspjson
/var/log/raspjson/tic.ndjson {
daily
rotate 7
compress
copytruncate
missingok
notifempty
}
copytruncate est important : il copie le fichier puis le tronque sur place, sans casser le handle ouvert par raspjson. Le relais, lui, devra détecter la troncature (voir plus bas).
2. Relais minimal en bash
Version simple, utile pour comprendre le pattern. On suppose mosquitto_pub installé (paquet mosquitto-clients).
#!/bin/bash
# /usr/local/bin/tic-relay.sh
set -euo pipefail
JOURNAL="/var/log/raspjson/tic.ndjson"
CURSOR="/var/lib/tic-relay/cursor"
BROKER="localhost"
TOPIC="maison/energie/compteur/tic"
mkdir -p "$(dirname "$CURSOR")"
[[ -f "$CURSOR" ]] || echo 0 > "$CURSOR"
while true; do
pos=$(cat "$CURSOR")
# Suit le fichier à partir de la position connue
tail -c "+$((pos + 1))" "$JOURNAL" | while IFS= read -r line; do
if mosquitto_pub -h "$BROKER" -t "$TOPIC" -m "$line" -q 1; then
# +1 pour le \n consommé par read
pos=$((pos + ${#line} + 1))
echo "$pos" > "$CURSOR"
else
echo "publication échouée, on retentera" >&2
sleep 5
break
fi
done
sleep 1
done
Limites de cette version :
- ne détecte pas la rotation du journal (fichier tronqué = position invalide) ;
- relance un
tailtoutes les secondes même quand rien n’arrive ; - pas de gestion fine de signaux pour un arrêt propre.
Suffisant pour un prototype, insuffisant pour mettre en service avec systemd. D’où la version Python.
3. Relais robuste en Python
Cette version utilise paho-mqtt (paquet python3-paho-mqtt), même bibliothèque que celle déployée pour les autres briques du livre.
#!/usr/bin/env python3
"""Relais entre le journal NDJSON de raspjson et le broker MQTT."""
import json
import logging
import os
import signal
import sys
import time
from pathlib import Path
import paho.mqtt.client as mqtt
JOURNAL = Path("/var/log/raspjson/tic.ndjson")
CURSOR = Path("/var/lib/tic-relay/cursor")
BROKER = "localhost"
PORT = 1883
TOPIC = "maison/energie/compteur/tic"
QOS = 1
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("tic-relay")
running = True
def stop(*_):
global running
running = False
log.info("arrêt demandé")
signal.signal(signal.SIGTERM, stop)
signal.signal(signal.SIGINT, stop)
def load_cursor() -> int:
try:
return int(CURSOR.read_text().strip())
except (FileNotFoundError, ValueError):
return 0
def save_cursor(pos: int) -> None:
CURSOR.parent.mkdir(parents=True, exist_ok=True)
tmp = CURSOR.with_suffix(".tmp")
tmp.write_text(str(pos))
tmp.replace(CURSOR) # écriture atomique
def detect_truncation(pos: int) -> int:
"""Si le fichier a été tronqué (rotation), repart de zéro."""
size = JOURNAL.stat().st_size if JOURNAL.exists() else 0
if pos > size:
log.warning("journal tronqué (pos=%d, taille=%d), réinit", pos, size)
return 0
return pos
def main() -> int:
client = mqtt.Client(client_id="tic-relay", clean_session=True)
client.connect(BROKER, PORT, keepalive=60)
client.loop_start()
pos = load_cursor()
with JOURNAL.open("rb") as fh:
while running:
pos = detect_truncation(pos)
fh.seek(pos)
line = fh.readline()
if not line:
time.sleep(1)
continue
if not line.endswith(b"\n"):
# ligne en cours d'écriture par raspjson, on attend
time.sleep(0.2)
continue
payload = line.decode("utf-8", errors="replace").rstrip("\n")
try:
json.loads(payload) # validation
except json.JSONDecodeError as e:
log.error("ligne invalide ignorée (offset=%d): %s", pos, e)
pos += len(line)
save_cursor(pos)
continue
info = client.publish(TOPIC, payload, qos=QOS)
info.wait_for_publish(timeout=10)
if info.rc != mqtt.MQTT_ERR_SUCCESS:
log.warning("publication échouée (rc=%s), nouvel essai", info.rc)
time.sleep(5)
continue
pos += len(line)
save_cursor(pos)
client.loop_stop()
client.disconnect()
return 0
if __name__ == "__main__":
sys.exit(main())
Choix techniques :
- QoS 1 côté publication : le broker confirme la réception, ce qui garantit qu’aucune trame n’est perdue entre le relais et le broker. QoS 2 (exactly-once) serait plus strict mais plus coûteux ; QoS 0 perdrait des trames en cas de coupure réseau brève.
- Curseur écrit atomiquement via
os.replace()sur un fichier temporaire : un crash en plein milieu ne laisse jamais un curseur partiel. - Détection de troncature : si la taille du journal est inférieure à la position courante, c’est qu’il y a eu rotation — on repart de zéro.
- Validation JSON avant publication : une ligne corrompue n’est pas envoyée mais on avance quand même le curseur, sinon le relais resterait bloqué dessus.
- Gestion SIGTERM pour un arrêt propre via
systemd.
Déploiement avec systemd
Une unité simple pour le relais :
# /etc/systemd/system/tic-relay.service
[Unit]
Description=Relais MQTT pour les trames TIC
After=network.target mosquitto.service
Wants=mosquitto.service
[Service]
Type=simple
ExecStart=/usr/local/bin/tic-relay.py
Restart=on-failure
RestartSec=5
User=pi
[Install]
WantedBy=multi-user.target
Activation :
sudo systemctl daemon-reload
sudo systemctl enable --now tic-relay
sudo journalctl -u tic-relay -f
On peut faire la même chose pour
raspjsonlui-même, avec une unité qui redirigestdoutvers le journal etRestart=always.
Côté Home Assistant
L’abonnement se fait via une intégration mqtt standard. Exemple de capteurs dans configuration.yaml (extrait — on n’expose ici que quelques étiquettes représentatives) :
mqtt:
sensor:
- name: "Puissance apparente"
state_topic: "maison/energie/compteur/tic"
unit_of_measurement: "VA"
device_class: apparent_power
value_template: "{{ value_json.PAPP }}"
- name: "Intensité instantanée"
state_topic: "maison/energie/compteur/tic"
unit_of_measurement: "A"
device_class: current
value_template: "{{ value_json.IINST }}"
- name: "Index heures creuses"
state_topic: "maison/energie/compteur/tic"
unit_of_measurement: "Wh"
device_class: energy
state_class: total_increasing
value_template: "{{ value_json.HCHC }}"
- name: "Index heures pleines"
state_topic: "maison/energie/compteur/tic"
unit_of_measurement: "Wh"
device_class: energy
state_class: total_increasing
value_template: "{{ value_json.HCHP }}"
state_class: total_increasing est important sur les index : c’est ce qui permet à Home Assistant de calculer la consommation par période dans le tableau de bord Énergie.
Vérifier que ça marche
Trois points de contrôle, du bas vers le haut :
# 1. raspjson écrit-il dans le journal ?
tail -f /var/log/raspjson/tic.ndjson
# 2. Le broker reçoit-il les publications ?
mosquitto_sub -h localhost -t 'maison/energie/compteur/tic' -v
# 3. Home Assistant met-il les capteurs à jour ?
# → Paramètres → Appareils → MQTT → vérifier les "dernières mises à jour"
Si (1) défile mais pas (2), regarder journalctl -u tic-relay. Si (2) défile mais pas (3), revoir les templates value_json.* côté Home Assistant.
Suite
La téléinfo est intégrée à la stack domotique : raspjson produit, le relais transporte, Home Assistant consomme. C’est suffisant pour la plupart des usages.
Pour qui veut un historique long avec accès SQL complet, le chapitre suivant (optionnel) ajoute un second abonné au broker MQTT : un démon qui insère chaque trame dans une base PostgreSQL. C’est l’occasion d’illustrer ce que MQTT apporte concrètement : ajouter un consommateur sans toucher à quoi que ce soit en amont.
Commentaires
Aucun commentaire pour l'instant. Soyez le premier !
Laisser un commentaire