Ir al contenido

← Concentrador MQTT en Node-RED para flotas de PLC ESP32

Monitorización textil (tejeduría) · Servidor (Node-RED) · MQTT · HTTP · Infraestructura

Concentrador MQTT en Node-RED para flotas de PLC ESP32 — ejemplo completo

Concentrador MQTT en Node-RED para flotas de PLC ESP32: validación de payloads, deduplicación, backups en fichero, recuperación por HTTP y salud de la flota.

Programa completo y ejecutable para el Servidor (Node-RED) (node-red-multi-plant-concentrator.js): incluye cabecera de conexionado, requisitos y notas de integración.

Descarga el pack completo del proyecto — gratis. Este ejemplo + los relacionados + lista de materiales.

Vista de solo lectura.

/*
 * COMPLETE EXAMPLE — Multi-plant Node-RED concentrator (function nodes)
 *
 * Hardware: Server (Node-RED, port 1880) — one per plant or central
 * Based on: textile monitoring project, flows of the two plants
 *
 * Flow wiring (standard nodes + these function nodes):
 *
 *   [mqtt in planta/+/data]    --> [F1 validateAndEnrich] --> [F3 backupToFile] --> [file]
 *   [mqtt in planta/+/events]  --> [F1 validateAndEnrich] --> [F3 backupToFile] --> [file]
 *                                          |--> (output 2: discards) --> [F3] --> [file]
 *
 *   [http in POST /get_data]   --> [F2 receiveSDFile] --> [file] + [http response]
 *
 *   [inject every 5 min]       --> [F4 checkFleet] --> [dashboard / alarm]
 *
 *   The MQTT broker (e.g. Mosquitto) runs on the same machine. Each PLC
 *   publishes to planta/plcN/data and planta/plcN/events. Both plants use
 *   the same flow: only the PLANT variable of the configuration node changes.
 *
 * Works together with other catalog examples:
 *   - mqtt-events-sd-buffering.ino (source of the data)
 *   - sd-file-upload-http-post.ino (clients of the /get_data endpoint)
 *   - mqtt-remote-commands.ino (the console publishes to esp32/{id}/comands)
 */

// ============================================================ F1
// "validateAndEnrich" — 2 outputs: [valid, discards]
// Validates the PLC JSON, deduplicates by id_msg and adds plant metadata
const PLANT = env.get("PLANT") || "plant1";      // same logic in both plants

let data;
try {
  data = (typeof msg.payload === "string") ? JSON.parse(msg.payload) : msg.payload;
} catch (e) {
  msg.error = "invalid JSON: " + e.message;
  return [null, msg];                            // output 2: discards
}

// Minimum fields sent by the firmware
if (data.id === undefined) {
  msg.error = "missing PLC id";
  return [null, msg];
}

// Deduplication by id_msg (the PLC re-sends after reconnection / SD replay)
if (data.id_msg) {
  const seen = context.get("seen") || {};
  if (seen[data.id_msg]) {
    msg.error = "duplicate: " + data.id_msg;
    return [null, msg];
  }
  seen[data.id_msg] = Date.now();
  // Purge: keep only the last hour of IDs so the map does not grow unbounded
  const oneHourAgo = Date.now() - 3600 * 1000;
  for (const k in seen) if (seen[k] < oneHourAgo) delete seen[k];
  context.set("seen", seen);
}

// Enrich with concentrator metadata
data.plant = PLANT;
data.ts_server = new Date().toISOString();
data.source_topic = msg.topic;

msg.payload = data;
return [msg, null];                              // output 1: valid

// ============================================================ F2
// "receiveSDFile" — POST /get_data endpoint with basic auth
// Receives the daily files the PLC recovers from its SD card after an outage
const auth = msg.req.headers["authorization"] || "";
const expected = "Basic " + Buffer.from(
  env.get("HTTP_USER") + ":" + env.get("HTTP_PASS")).toString("base64");

if (auth !== expected) {
  msg.statusCode = 401;
  msg.payload = "Unauthorized";
  return [null, msg];                            // output 2: http response
}

const fileName = msg.req.headers["x-file-name"] || ("no_name_" + Date.now() + ".json");
const plcIP = msg.req.connection.remoteAddress;

// output 1 -> file node (writes to /data/recovered//)
const fileMsg = {
  payload: msg.payload,
  filename: "/data/recovered/" + (env.get("PLANT") || "plant1") +
            "/" + fileName.replace(/[^A-Za-z0-9._-]/g, "_")
};

node.log("Recovered " + fileName + " from " + plcIP +
         " (" + (msg.payload ? msg.payload.length : 0) + " bytes)");

msg.statusCode = 200;
msg.payload = "OK";
return [fileMsg, msg];

// ============================================================ F3
// "backupToFile" — everything received and everything discarded ends on disk
// One file per day and per type, mirroring the SD cards of the PLCs
const today = new Date().toISOString().slice(0, 10);        // YYYY-MM-DD
const type = msg.error ? "discarded" : "received";

msg.filename = "/data/backup/" + (env.get("PLANT") || "plant1") +
               "/" + type + "_" + today + ".json";
msg.payload = JSON.stringify(
  msg.error ? { error: msg.error, raw: msg.payload } : msg.payload
) + "\n";                                                   // append, 1 line per msg
return msg;

// ============================================================ F4
// "checkFleet" — health check panel: which PLC has stopped talking?
// Triggered by an inject node every 5 min; F1 updates flow.lastSeen
// (add in F1, after validation:  flow.set("seen_" + data.id, Date.now()); )
const SILENCE_LIMIT_MS = 2 * 60 * 1000;          // 2 min without a snapshot = alarm
const EXPECTED_PLCS = [1, 2, 3, 4];              // fleet of this plant

const now = Date.now();
const status = EXPECTED_PLCS.map(id => {
  const seenAt = flow.get("seen_" + id) || 0;
  return {
    plc: id,
    online: (now - seenAt) < SILENCE_LIMIT_MS,
    last_message: seenAt ? new Date(seenAt).toISOString() : "never"
  };
});

const down = status.filter(e => !e.online);
msg.payload = {
  plant: env.get("PLANT") || "plant1",
  ok: down.length === 0,
  down: down.map(e => e.plc),
  detail: status
};
if (down.length) node.warn("PLCs without data: " + down.map(e => e.plc).join(", "));
return msg;
Descarga el pack completo del proyecto — gratis. Este ejemplo + los relacionados + lista de materiales.