📖 Chapitre 26/27 — La domotique chez soi Voir le sommaire →
Envoyer la sortie de raspjson vers l'unité de traitement
← Retour
domotique

Envoyer la sortie de raspjson vers l'unité de traitement

Cédrix · 01/01/2021
Modifié le 23 mai 2026 à 05h00

1/ 7 j  ·  1/ 14 j  ·  1/ 30 j  lecteurs

Où on en est

À ce stade du dossier téléinfo :

  • l’interface électronique (démodulateur ASK + Raspberry Pi 3) capte le signal TIC et le restitue en série TTL sur /dev/ttyAMA0 ;
  • le programme raspjson lit ce flux, décode les trames, vérifie les checksums et émet une trame JSON par cycle sur sa sortie standard ;
  • l’unité de traitement principale de la maison — c’est Home Assistant, conformément à la stack domotique mise en place dans les chapitres précédents — attend ces données pour les historiser, déclencher des automatisations et les afficher.

Reste un maillon : transporter chaque trame JSON depuis raspjson jusqu’à Home Assistant. Cet article décrit comment, et surtout comment ne perdre aucune trame quand le réseau ou l’unité de traitement font défaut.

┌────────────┐   JSON sur     ┌──────────┐    MQTT     ┌─────────────────┐
│  raspjson  │ ─── stdout ──► │  relais  │ ──────────► │  broker MQTT    │
└────────────┘                └──────────┘             │  (Mosquitto)    │
                                                       └────────┬────────┘
                                                                │ abonnement
                                                                ▼
                                                       ┌─────────────────┐
                                                       │ Home Assistant  │
                                                       └─────────────────┘

Le broker MQTT permettra plus tard d’ajouter d’autres consommateurs sans toucher ni à raspjson, ni au relais, ni à Home Assistant. C’est ce que fait le chapitre optionnel sur l’historisation PostgreSQL : un second abonné au même topic.


Choix du transport : pourquoi MQTT

Tout le livre s’appuie sur un broker MQTT (Mosquitto) comme épine dorsale de communication entre les dispositifs et l’unité de traitement (voir les chapitres MQTT, Broker MQTT, Mosquitto). Il est naturel d’y faire arriver aussi les trames TIC.

MQTT colle bien au cas d’usage :

  • publication asynchrone : raspjson n’a pas à attendre que Home Assistant ait fini de traiter pour passer à la trame suivante ;
  • découplage : on peut ajouter ou retirer des consommateurs (Home Assistant, démon SQL, dashboard alternatif…) sans toucher à l’émetteur ;
  • QoS configurable : on choisit le compromis entre rapidité et garantie de livraison.

Convention de topic adoptée dans la suite :

maison/energie/compteur/tic

Cette structure hiérarchique s’aligne sur les autres topics du livre (maison/<pièce>/<dispositif>/<grandeur>).


Le problème de résilience

Le compteur émet une trame TIC toutes les secondes environ. L’émetteur (raspjson + son relais) doit donc envoyer une trame par seconde vers le broker. Plusieurs scénarios peuvent perturber cette chaîne :

Scénario Cause typique Effet sans précaution
Débit entrant trop élevé Pic ponctuel de trames valides Saturation du relais, trames droppées
Cible lente Charge sur le broker, latence réseau Retard cumulatif, files mémoire qui gonflent
Cible injoignable Broker arrêté, panne réseau, reboot Pi Perte sèche des trames émises pendant l’incident

Le principe directeur :

Découpler la capture de la transmission. raspjson écrit ce qu’il lit, sans se soucier de la cible. Un processus indépendant lit ce qui a été écrit et tente de l’envoyer.

Concrètement : raspjson écrit chaque trame dans un journal (fichier append-only), et un relais MQTT parcourt ce journal pour publier. Si le relais ne peut pas publier (broker absent, réseau coupé), il s’arrête et reprendra plus tard, là où il s’était arrêté. Le journal sert de tampon persistant : aucune trame n’est perdue, même si le Raspberry Pi redémarre pendant un incident.


Pourquoi un fichier append-only plutôt qu’un FIFO

Plusieurs approches sont possibles pour découpler l’écriture et la lecture :

Solution Persistance reboot Complexité Diagnostic
Pipe Unix \| direct très faible impossible
FIFO nommée (mkfifo) faible difficile
Fichier append-only + curseur faible facile (tail, grep)
Redis / SQLite / RabbitMQ élevée dépendance lourde

Pour un débit d’une trame par seconde, le fichier append-only est imbattable : il survit aux reboots, on peut le lire avec n’importe quel outil shell pour déboguer, et la rotation logarithmique se fait avec logrotate.

Une ébauche fréquente consiste à supprimer la ligne du fichier après envoi, avec sed -i '1d'. Mauvaise idée : sed -i réécrit tout le fichier à chaque ligne, ce qui devient catastrophique au-delà de quelques milliers de lignes, et crée une fenêtre de course avec l’écriture concurrente de raspjson. On n’écrit pas et on ne modifie pas le journal côté relais : on mémorise une position de lecture (un curseur).


Mise en place

1. Démarrer raspjson en écriture sur le journal

mkdir -p /var/log/raspjson
raspjson >> /var/log/raspjson/tic.ndjson 2>> /var/log/raspjson/tic.err &

Le format NDJSON (Newline Delimited JSON, un JSON par ligne) est idéal : facile à concaténer, facile à lire ligne par ligne, supporté nativement par les outils Unix.

Une rotation logrotate quotidienne s’occupera de garder une taille raisonnable :

# /etc/logrotate.d/raspjson
/var/log/raspjson/tic.ndjson {
    daily
    rotate 7
    compress
    copytruncate
    missingok
    notifempty
}

copytruncate est important : il copie le fichier puis le tronque sur place, sans casser le handle ouvert par raspjson. Le relais, lui, devra détecter la troncature (voir plus bas).

2. Relais minimal en bash

Version simple, utile pour comprendre le pattern. On suppose mosquitto_pub installé (paquet mosquitto-clients).

#!/bin/bash
# /usr/local/bin/tic-relay.sh
set -euo pipefail

JOURNAL="/var/log/raspjson/tic.ndjson"
CURSOR="/var/lib/tic-relay/cursor"
BROKER="localhost"
TOPIC="maison/energie/compteur/tic"

mkdir -p "$(dirname "$CURSOR")"
[[ -f "$CURSOR" ]] || echo 0 > "$CURSOR"

while true; do
    pos=$(cat "$CURSOR")
    # Suit le fichier à partir de la position connue
    tail -c "+$((pos + 1))" "$JOURNAL" | while IFS= read -r line; do
        if mosquitto_pub -h "$BROKER" -t "$TOPIC" -m "$line" -q 1; then
            # +1 pour le \n consommé par read
            pos=$((pos + ${#line} + 1))
            echo "$pos" > "$CURSOR"
        else
            echo "publication échouée, on retentera" >&2
            sleep 5
            break
        fi
    done
    sleep 1
done

Limites de cette version :

  • ne détecte pas la rotation du journal (fichier tronqué = position invalide) ;
  • relance un tail toutes les secondes même quand rien n’arrive ;
  • pas de gestion fine de signaux pour un arrêt propre.

Suffisant pour un prototype, insuffisant pour mettre en service avec systemd. D’où la version Python.

3. Relais robuste en Python

Cette version utilise paho-mqtt (paquet python3-paho-mqtt), même bibliothèque que celle déployée pour les autres briques du livre.

#!/usr/bin/env python3
"""Relais entre le journal NDJSON de raspjson et le broker MQTT."""

import json
import logging
import os
import signal
import sys
import time
from pathlib import Path

import paho.mqtt.client as mqtt

JOURNAL = Path("/var/log/raspjson/tic.ndjson")
CURSOR  = Path("/var/lib/tic-relay/cursor")
BROKER  = "localhost"
PORT    = 1883
TOPIC   = "maison/energie/compteur/tic"
QOS     = 1

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("tic-relay")

running = True
def stop(*_): 
    global running
    running = False
    log.info("arrêt demandé")

signal.signal(signal.SIGTERM, stop)
signal.signal(signal.SIGINT,  stop)


def load_cursor() -> int:
    try:
        return int(CURSOR.read_text().strip())
    except (FileNotFoundError, ValueError):
        return 0

def save_cursor(pos: int) -> None:
    CURSOR.parent.mkdir(parents=True, exist_ok=True)
    tmp = CURSOR.with_suffix(".tmp")
    tmp.write_text(str(pos))
    tmp.replace(CURSOR)            # écriture atomique

def detect_truncation(pos: int) -> int:
    """Si le fichier a été tronqué (rotation), repart de zéro."""
    size = JOURNAL.stat().st_size if JOURNAL.exists() else 0
    if pos > size:
        log.warning("journal tronqué (pos=%d, taille=%d), réinit", pos, size)
        return 0
    return pos


def main() -> int:
    client = mqtt.Client(client_id="tic-relay", clean_session=True)
    client.connect(BROKER, PORT, keepalive=60)
    client.loop_start()

    pos = load_cursor()

    with JOURNAL.open("rb") as fh:
        while running:
            pos = detect_truncation(pos)
            fh.seek(pos)
            line = fh.readline()

            if not line:
                time.sleep(1)
                continue

            if not line.endswith(b"\n"):
                # ligne en cours d'écriture par raspjson, on attend
                time.sleep(0.2)
                continue

            payload = line.decode("utf-8", errors="replace").rstrip("\n")
            try:
                json.loads(payload)            # validation
            except json.JSONDecodeError as e:
                log.error("ligne invalide ignorée (offset=%d): %s", pos, e)
                pos += len(line)
                save_cursor(pos)
                continue

            info = client.publish(TOPIC, payload, qos=QOS)
            info.wait_for_publish(timeout=10)

            if info.rc != mqtt.MQTT_ERR_SUCCESS:
                log.warning("publication échouée (rc=%s), nouvel essai", info.rc)
                time.sleep(5)
                continue

            pos += len(line)
            save_cursor(pos)

    client.loop_stop()
    client.disconnect()
    return 0


if __name__ == "__main__":
    sys.exit(main())

Choix techniques :

  • QoS 1 côté publication : le broker confirme la réception, ce qui garantit qu’aucune trame n’est perdue entre le relais et le broker. QoS 2 (exactly-once) serait plus strict mais plus coûteux ; QoS 0 perdrait des trames en cas de coupure réseau brève.
  • Curseur écrit atomiquement via os.replace() sur un fichier temporaire : un crash en plein milieu ne laisse jamais un curseur partiel.
  • Détection de troncature : si la taille du journal est inférieure à la position courante, c’est qu’il y a eu rotation — on repart de zéro.
  • Validation JSON avant publication : une ligne corrompue n’est pas envoyée mais on avance quand même le curseur, sinon le relais resterait bloqué dessus.
  • Gestion SIGTERM pour un arrêt propre via systemd.

Déploiement avec systemd

Une unité simple pour le relais :

# /etc/systemd/system/tic-relay.service
[Unit]
Description=Relais MQTT pour les trames TIC
After=network.target mosquitto.service
Wants=mosquitto.service

[Service]
Type=simple
ExecStart=/usr/local/bin/tic-relay.py
Restart=on-failure
RestartSec=5
User=pi

[Install]
WantedBy=multi-user.target

Activation :

sudo systemctl daemon-reload
sudo systemctl enable --now tic-relay
sudo journalctl -u tic-relay -f

On peut faire la même chose pour raspjson lui-même, avec une unité qui redirige stdout vers le journal et Restart=always.


Côté Home Assistant

L’abonnement se fait via une intégration mqtt standard. Exemple de capteurs dans configuration.yaml (extrait — on n’expose ici que quelques étiquettes représentatives) :

mqtt:
  sensor:
    - name: "Puissance apparente"
      state_topic: "maison/energie/compteur/tic"
      unit_of_measurement: "VA"
      device_class: apparent_power
      value_template: "{{ value_json.PAPP }}"

    - name: "Intensité instantanée"
      state_topic: "maison/energie/compteur/tic"
      unit_of_measurement: "A"
      device_class: current
      value_template: "{{ value_json.IINST }}"

    - name: "Index heures creuses"
      state_topic: "maison/energie/compteur/tic"
      unit_of_measurement: "Wh"
      device_class: energy
      state_class: total_increasing
      value_template: "{{ value_json.HCHC }}"

    - name: "Index heures pleines"
      state_topic: "maison/energie/compteur/tic"
      unit_of_measurement: "Wh"
      device_class: energy
      state_class: total_increasing
      value_template: "{{ value_json.HCHP }}"

state_class: total_increasing est important sur les index : c’est ce qui permet à Home Assistant de calculer la consommation par période dans le tableau de bord Énergie.


Vérifier que ça marche

Trois points de contrôle, du bas vers le haut :

# 1. raspjson écrit-il dans le journal ?
tail -f /var/log/raspjson/tic.ndjson

# 2. Le broker reçoit-il les publications ?
mosquitto_sub -h localhost -t 'maison/energie/compteur/tic' -v

# 3. Home Assistant met-il les capteurs à jour ?
# → Paramètres → Appareils → MQTT → vérifier les "dernières mises à jour"

Si (1) défile mais pas (2), regarder journalctl -u tic-relay. Si (2) défile mais pas (3), revoir les templates value_json.* côté Home Assistant.


Suite

La téléinfo est intégrée à la stack domotique : raspjson produit, le relais transporte, Home Assistant consomme. C’est suffisant pour la plupart des usages.

Pour qui veut un historique long avec accès SQL complet, le chapitre suivant (optionnel) ajoute un second abonné au broker MQTT : un démon qui insère chaque trame dans une base PostgreSQL. C’est l’occasion d’illustrer ce que MQTT apporte concrètement : ajouter un consommateur sans toucher à quoi que ce soit en amont.

Partager : ✉ Mail X in 🐘
Commentaires

Aucun commentaire pour l'instant. Soyez le premier !

Laisser un commentaire
Un code de vérification sera envoyé à votre adresse email.