Skip to Content

← MQTT to SQL Server Bridge in Python on a Raspberry PLC

Industrial mixing (touch HMI) · TouchBerry Pi · MQTT · SQL Server · Communication

MQTT to SQL Server Bridge in Python on a Raspberry PLC — full example

Connect an MQTT broker to SQL Server with Python: paho-mqtt plus pyodbc, two-way sync, auto-reconnection and parameterised inserts on a Raspberry PLC.

Complete, runnable program for the TouchBerry Pi (mqtt-sql-server-bridge.py): wiring header, requirements and integration notes included.

Download the full project pack — free. This example + the related ones + bill of materials

Read-only preview.

# -*- coding: utf-8 -*-
"""
COMPLETE EXAMPLE — MQTT <-> SQL Server bridge (Python + pyodbc)

Hardware:  TouchBerry Pi (Raspberry Pi + Industrial Shields PLC, touch screen)
Based on:  industrial mixing project (Node-RED touch HMI)

Requirements:
    pip install paho-mqtt pyodbc
    ODBC driver: msodbcsql17 (or FreeTDS) installed on the Raspberry Pi

Architecture:
    - DB -> MQTT direction: every 5 s the machine statuses are read from the
      EstadoMaquinas table and published to planta/<machine>/ready, where the
      Node-RED dashboard LEDs consume them.
    - MQTT -> DB direction: every activation published by the HMI on
      planta/activaciones is inserted into the RegistroActivaciones table.
    - Automatic reconnection for both the broker and the database.
"""

import json
import time

import paho.mqtt.client as mqtt
import pyodbc

# --- Settings (placeholders only: read the real values from the environment
# --- in production)
MQTT_BROKER, MQTT_PORT = "localhost", 1883
TOPIC_ACTIVATIONS = "planta/activaciones"  # activations: HMI -> DB
TOPIC_READY_FMT = "planta/{machine}/ready"  # machine status: DB -> HMI

# ODBC connection string, built from one "KEY=value;" fragment per setting.
SQL_CONN_STR = "".join(
    [
        "DRIVER={ODBC Driver 17 for SQL Server};",
        "SERVER=SERVIDOR_SQL;",
        "DATABASE=bd_produccion;",
        "UID=USUARIO_SQL;",
        "PWD=PASSWORD_SQL;",
    ]
)

# Timings, in seconds.
POLL_PERIOD_S = 5  # pause between two reads of EstadoMaquinas
DB_RETRY_S = 10  # pause before another DB connection attempt

# pyodbc connection shared by the whole module; None until connect_db() runs.
db_connection = None


# ----------------------------------------------------------------------
# Database: connection with retry
# ----------------------------------------------------------------------
def connect_db():
    """Open the shared SQL Server connection, blocking until it succeeds.

    Each failed attempt is reported and followed by a pause of DB_RETRY_S
    seconds. On success the connection is stored in the module-level
    ``db_connection`` (with autocommit switched on) and also returned.
    """
    global db_connection
    while True:
        try:
            db_connection = pyodbc.connect(SQL_CONN_STR, timeout=5)
            db_connection.autocommit = True
        except pyodbc.Error as err:
            print(f"[DB] No connection ({err}); retrying in {DB_RETRY_S} s")
            time.sleep(DB_RETRY_S)
        else:
            print("[DB] Connected to SQL Server")
            return db_connection


def insert_activation(data):
    """Insert one HMI activation into the RegistroActivaciones table.

    Args:
        data: Mapping decoded from the MQTT payload. The keys read are
            ``user``, ``reason``, ``comment`` and ``q0``..``q3``; a missing
            key falls back to ``"unknown"``, ``""`` or ``0`` respectively.

    Raises:
        pyodbc.Error: If the INSERT cannot be executed.
        ValueError, TypeError: If a ``q0``..``q3`` value cannot be converted
            with ``int()``.
    """
    query = """
    INSERT INTO RegistroActivaciones
      (IdMqtt, DateTime, [User], Reason, Comment, Q0, Q1, Q2, Q3)
    VALUES (NEWID(), GETDATE(), ?, ?, ?, ?, ?, ?, ?)
    """
    # Resolve every parameter before opening a cursor, so a bad value can
    # never leave a cursor behind.
    user = data.get("user", "unknown")
    reason = data.get("reason", "")
    comment = data.get("comment", "")
    q_values = [int(data.get(key, 0)) for key in ("q0", "q1", "q2", "q3")]

    cursor = db_connection.cursor()
    try:
        cursor.execute(query, user, reason, comment, *q_values)
    finally:
        # Release the cursor even when execute() raises.
        cursor.close()
    # Log the values that were actually stored (defaults included).
    print(f"[DB] Activation logged: {user} / {reason}")


def read_statuses():
    """Read the Maquina and Ready columns of every EstadoMaquinas row.

    Returns:
        The list produced by ``cursor.fetchall()``: one row per table
        record, each holding ``(Maquina, Ready)``.

    Raises:
        pyodbc.Error: If the query fails; the cursor is closed regardless.
    """
    cursor = db_connection.cursor()
    try:
        cursor.execute("SELECT Maquina, Ready FROM EstadoMaquinas")
        return cursor.fetchall()
    finally:
        # Release the cursor on both the success and the error path.
        cursor.close()


# ----------------------------------------------------------------------
# MQTT: callbacks (paho reconnects on its own thanks to loop_start)
# ----------------------------------------------------------------------
def on_connect(client, userdata, flags, rc):
    """Subscribe to the activation topic once the broker accepts the session.

    Args:
        client: The paho client that connected.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code reported by paho; 0 means the broker
            accepted the connection, any other value is a refusal.
    """
    if rc != 0:
        # Refused (bad credentials, unavailable broker, ...): there is no
        # session to subscribe on, so only report the failure.
        print(f"[MQTT] Connection refused by the broker (rc={rc})")
        return
    print(f"[MQTT] Connected to the broker (rc={rc})")
    client.subscribe(TOPIC_ACTIVATIONS, qos=1)    # resubscribe on every reconnection


def on_message(client, userdata, msg):
    """Store an activation received from the HMI in the database.

    The payload must be a UTF-8 encoded JSON object. Anything else is
    reported and discarded. No exception is allowed to escape, because an
    exception leaving a paho callback can stop the client's network loop.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; ``msg.payload`` (bytes) and ``msg.topic``
            are read.
    """
    try:
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError.
        data = json.loads(msg.payload.decode("utf-8"))
    except ValueError as e:
        print(f"[MQTT] Invalid payload on {msg.topic}: {e}")
        return
    if not isinstance(data, dict):
        # Valid JSON but not an object (e.g. a list or a number).
        print(f"[MQTT] Invalid payload on {msg.topic}: expected a JSON object")
        return

    try:
        insert_activation(data)
    except (KeyError, TypeError, ValueError) as e:
        # e.g. a q0..q3 field that int() cannot convert
        print(f"[MQTT] Invalid payload on {msg.topic}: {e}")
    except pyodbc.Error as e:
        # The DB went down mid-insert: reconnect and do not lose the record.
        # connect_db() blocks until the database is reachable again.
        print(f"[DB] Insert error ({e}); reconnecting...")
        connect_db()
        try:
            insert_activation(data)
        except (pyodbc.Error, TypeError, ValueError) as retry_err:
            # A second failure is not a lost connection (for instance the
            # row itself is rejected): report it instead of raising.
            print(f"[DB] Insert failed after reconnecting ({retry_err}); "
                  f"activation dropped: {data}")


# ----------------------------------------------------------------------
# Main program
# ----------------------------------------------------------------------
def main():
    """Run the bridge until interrupted.

    Connects to the database and the broker (retrying both until they are
    reachable), starts the paho network thread that delivers activations to
    on_message, and then republishes every machine status from the database
    every POLL_PERIOD_S seconds. On Ctrl+C the MQTT client and the database
    connection are shut down before returning.
    """
    connect_db()

    # paho-mqtt 2.x expects the callback API version as first argument;
    # on_connect above uses the version-1 signature. paho-mqtt 1.x has no
    # such enum and takes client_id only.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is not None:
        client = mqtt.Client(api_versions.VERSION1, client_id="puente-mqtt-sql")
    else:
        client = mqtt.Client(client_id="puente-mqtt-sql")
    # client.username_pw_set("USUARIO_MQTT", "PASSWORD_MQTT")  # if the broker requires it
    client.on_connect = on_connect
    client.on_message = on_message

    # loop_start() only recovers connections that were established once,
    # so the very first connect has to be retried here. DB_RETRY_S is
    # reused as the pause between attempts.
    while True:
        try:
            client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
            break
        except OSError as e:
            print(f"[MQTT] Broker unreachable ({e}); retrying in {DB_RETRY_S} s")
            time.sleep(DB_RETRY_S)
    client.loop_start()                 # own thread: handles MQTT reconnections

    try:
        # DB -> MQTT polling loop
        while True:
            try:
                for machine, ready in read_statuses():
                    topic = TOPIC_READY_FMT.format(machine=machine)
                    client.publish(topic, "1" if ready else "0", qos=1, retain=True)
            except pyodbc.Error as e:
                print(f"[DB] Polling error ({e}); reconnecting...")
                connect_db()
            time.sleep(POLL_PERIOD_S)
    except KeyboardInterrupt:
        print("[MAIN] Interrupted; shutting down")
    finally:
        # Leave the broker cleanly and stop the network thread.
        client.disconnect()
        client.loop_stop()
        if db_connection is not None:
            try:
                db_connection.close()
            except pyodbc.Error as e:
                print(f"[DB] Error while closing the connection ({e})")


if __name__ == "__main__":
    main()
Download the full project pack — free. This example + the related ones + bill of materials