← Concentrador MQTT en Node-RED para flotas de PLC ESP32
Monitorización textil (tejeduría)Servidor (Node-RED)MQTTHTTPInfraestructura
Concentrador MQTT en Node-RED para flotas de PLC ESP32 — ejemplo completo
Concentrador MQTT en Node-RED para flotas de PLC ESP32: validación de payloads, deduplicación, backups en fichero, recuperación por HTTP y salud de la flota.
Programa completo y ejecutable para el Servidor (Node-RED) (node-red-multi-plant-concentrator.js): incluye cabecera de conexionado, requisitos y notas de integración.
Descarga el pack completo del proyecto — gratisEste ejemplo + los relacionados + lista de materiales
Vista de solo lectura.
/*
* COMPLETE EXAMPLE — Multi-plant Node-RED concentrator (function nodes)
*
* Hardware: Server (Node-RED, port 1880) — one per plant or central
* Based on: textile monitoring project, flows of the two plants
*
* Flow wiring (standard nodes + these function nodes):
*
* [mqtt in planta/+/data] --> [F1 validateAndEnrich] --> [F3 backupToFile] --> [file]
* [mqtt in planta/+/events] --> [F1 validateAndEnrich] --> [F3 backupToFile] --> [file]
* |--> (output 2: discards) --> [F3] --> [file]
*
* [http in POST /get_data] --> [F2 receiveSDFile] --> [file] + [http response]
*
* [inject every 5 min] --> [F4 checkFleet] --> [dashboard / alarm]
*
* The MQTT broker (e.g. Mosquitto) runs on the same machine. Each PLC
* publishes to planta/plcN/data and planta/plcN/events. Both plants use
* the same flow: only the PLANT variable of the configuration node changes.
*
* Works together with other catalog examples:
* - mqtt-events-sd-buffering.ino (source of the data)
* - sd-file-upload-http-post.ino (clients of the /get_data endpoint)
* - mqtt-remote-commands.ino (the console publishes to esp32/{id}/comands)
*/
// ============================================================ F1
// "validateAndEnrich" — 2 outputs: [valid, discards]
// Validates the PLC JSON, deduplicates by id_msg and adds plant metadata
const PLANT = env.get("PLANT") || "plant1"; // same logic in both plants
let data;
try {
data = (typeof msg.payload === "string") ? JSON.parse(msg.payload) : msg.payload;
} catch (e) {
msg.error = "invalid JSON: " + e.message;
return [null, msg]; // output 2: discards
}
// Minimum fields sent by the firmware
if (data.id === undefined) {
msg.error = "missing PLC id";
return [null, msg];
}
// Deduplication by id_msg (the PLC re-sends after reconnection / SD replay)
if (data.id_msg) {
const seen = context.get("seen") || {};
if (seen[data.id_msg]) {
msg.error = "duplicate: " + data.id_msg;
return [null, msg];
}
seen[data.id_msg] = Date.now();
// Purge: keep only the last hour of IDs so the map does not grow unbounded
const oneHourAgo = Date.now() - 3600 * 1000;
for (const k in seen) if (seen[k] < oneHourAgo) delete seen[k];
context.set("seen", seen);
}
// Enrich with concentrator metadata
data.plant = PLANT;
data.ts_server = new Date().toISOString();
data.source_topic = msg.topic;
msg.payload = data;
return [msg, null]; // output 1: valid
// ============================================================ F2
// "receiveSDFile" — POST /get_data endpoint with basic auth
// Receives the daily files the PLC recovers from its SD card after an outage
const auth = msg.req.headers["authorization"] || "";
const expected = "Basic " + Buffer.from(
env.get("HTTP_USER") + ":" + env.get("HTTP_PASS")).toString("base64");
if (auth !== expected) {
msg.statusCode = 401;
msg.payload = "Unauthorized";
return [null, msg]; // output 2: http response
}
const fileName = msg.req.headers["x-file-name"] || ("no_name_" + Date.now() + ".json");
const plcIP = msg.req.connection.remoteAddress;
// output 1 -> file node (writes to /data/recovered//)
const fileMsg = {
payload: msg.payload,
filename: "/data/recovered/" + (env.get("PLANT") || "plant1") +
"/" + fileName.replace(/[^A-Za-z0-9._-]/g, "_")
};
node.log("Recovered " + fileName + " from " + plcIP +
" (" + (msg.payload ? msg.payload.length : 0) + " bytes)");
msg.statusCode = 200;
msg.payload = "OK";
return [fileMsg, msg];
// ============================================================ F3
// "backupToFile" — everything received and everything discarded ends on disk
// One file per day and per type, mirroring the SD cards of the PLCs
const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD
const type = msg.error ? "discarded" : "received";
msg.filename = "/data/backup/" + (env.get("PLANT") || "plant1") +
"/" + type + "_" + today + ".json";
msg.payload = JSON.stringify(
msg.error ? { error: msg.error, raw: msg.payload } : msg.payload
) + "\n"; // append, 1 line per msg
return msg;
// ============================================================ F4
// "checkFleet" — health check panel: which PLC has stopped talking?
// Triggered by an inject node every 5 min; F1 updates flow.lastSeen
// (add in F1, after validation: flow.set("seen_" + data.id, Date.now()); )
const SILENCE_LIMIT_MS = 2 * 60 * 1000; // 2 min without a snapshot = alarm
const EXPECTED_PLCS = [1, 2, 3, 4]; // fleet of this plant
const now = Date.now();
const status = EXPECTED_PLCS.map(id => {
const seenAt = flow.get("seen_" + id) || 0;
return {
plc: id,
online: (now - seenAt) < SILENCE_LIMIT_MS,
last_message: seenAt ? new Date(seenAt).toISOString() : "never"
};
});
const down = status.filter(e => !e.online);
msg.payload = {
plant: env.get("PLANT") || "plant1",
ok: down.length === 0,
down: down.map(e => e.plc),
detail: status
};
if (down.length) node.warn("PLCs without data: " + down.map(e => e.plc).join(", "));
return msg;
Descarga el pack completo del proyecto — gratisEste ejemplo + los relacionados + lista de materiales