← MQTT to SQL Server Bridge in Python on a Raspberry PLC
Industrial mixing (touch HMI) · TouchBerry Pi · MQTT · SQL Server · Communication
MQTT to SQL Server Bridge in Python on a Raspberry PLC — full example
Connect an MQTT broker to SQL Server with Python: paho-mqtt plus pyodbc, two-way sync, auto-reconnection and parameterised inserts on a Raspberry PLC.
Complete, runnable program for the TouchBerry Pi (mqtt-sql-server-bridge.py): wiring header, requirements and integration notes included.
Download the full project pack — free. This example + the related ones + bill of materials
Read-only preview.
# -*- coding: utf-8 -*-
"""
COMPLETE EXAMPLE — MQTT <-> SQL Server bridge (Python + pyodbc)
Hardware: TouchBerry Pi (Raspberry Pi + Industrial Shields PLC, touch screen)
Based on: industrial mixing project (Node-RED touch HMI)
Requirements:
pip install paho-mqtt pyodbc
ODBC driver: msodbcsql17 (or FreeTDS) installed on the Raspberry Pi
Architecture:
- DB -> MQTT direction: every 5 s the machine statuses are read from the
EstadoMaquinas table and published to planta/<machine>/ready, where the
Node-RED dashboard LEDs consume them.
- MQTT -> DB direction: every activation published by the HMI on
planta/activaciones is inserted into the RegistroActivaciones table.
- Automatic reconnection for both the broker and the database.
"""
import json
import time
import paho.mqtt.client as mqtt
import pyodbc
# --- Configuration (placeholder values only; in production load them from the environment)

# MQTT broker endpoint.
MQTT_BROKER = "localhost"
MQTT_PORT = 1883

# Topic on which the HMI publishes activations (HMI -> DB).
TOPIC_ACTIVATIONS = "planta/activaciones"
# Per-machine readiness topic template (DB -> HMI); filled in with str.format.
TOPIC_READY_FMT = "planta/{machine}/ready"

# ODBC connection string, assembled from its individual "KEY=value;" segments.
SQL_CONN_STR = "".join(
    (
        "DRIVER={ODBC Driver 17 for SQL Server};",
        "SERVER=SERVIDOR_SQL;",
        "DATABASE=bd_produccion;",
        "UID=USUARIO_SQL;",
        "PWD=PASSWORD_SQL;",
    )
)

POLL_PERIOD_S = 5  # seconds between two reads of EstadoMaquinas
DB_RETRY_S = 10  # seconds to wait before retrying the DB connection

# Shared pyodbc connection; stays None until connect_db() has succeeded.
db_connection = None
# ----------------------------------------------------------------------
# Database: connection with retry
# ----------------------------------------------------------------------
def connect_db():
    """Open the shared SQL Server connection, blocking until it succeeds.

    The new connection is stored in the module-level ``db_connection`` with
    autocommit enabled, so single statements need no explicit commit.
    On ``pyodbc.Error`` the failure is logged and the attempt is repeated
    after ``DB_RETRY_S`` seconds; the function never gives up.

    Returns:
        The freshly opened pyodbc connection (same object as ``db_connection``).

    NOTE(review): ``db_connection`` is used both by the MQTT callback thread
    and by the polling loop in ``main``. pyodbc appears to declare DB-API
    threadsafety level 1 (connections not shareable between threads) --
    confirm, and consider serialising access.
    """
    global db_connection
    while True:
        try:
            db_connection = pyodbc.connect(SQL_CONN_STR, timeout=5)
            db_connection.autocommit = True
        except pyodbc.Error as e:
            # Server unreachable or login rejected: wait, then try again.
            print(f"[DB] No connection ({e}); retrying in {DB_RETRY_S} s")
            time.sleep(DB_RETRY_S)
        else:
            print("[DB] Connected to SQL Server")
            return db_connection
def insert_activation(data):
    """Insert one HMI activation into the RegistroActivaciones table.

    Args:
        data: Mapping decoded from the MQTT payload. The keys read are
            ``user``, ``reason``, ``comment`` and ``q0``..``q3``; missing
            keys fall back to ``"unknown"``, ``""`` and ``0`` respectively.

    Raises:
        ValueError, TypeError: A ``q0``..``q3`` value cannot be converted to
            ``int``. Raised before any database handle is touched.
        pyodbc.Error: The statement could not be executed.
    """
    query = """
INSERT INTO RegistroActivaciones
(IdMqtt, DateTime, [User], Reason, Comment, Q0, Q1, Q2, Q3)
VALUES (NEWID(), GETDATE(), ?, ?, ?, ?, ?, ?, ?)
"""
    # Resolve and validate every parameter first, so a malformed payload is
    # rejected without opening a cursor.
    user = data.get("user", "unknown")
    reason = data.get("reason", "")
    comment = data.get("comment", "")
    q0, q1, q2, q3 = (int(data.get(key, 0)) for key in ("q0", "q1", "q2", "q3"))

    cursor = db_connection.cursor()
    try:
        # Values are bound through "?" markers, never formatted into the SQL.
        cursor.execute(query, user, reason, comment, q0, q1, q2, q3)
    finally:
        # Release the cursor even when execute() raises.
        cursor.close()
    # Log the values that were actually stored (including the defaults).
    print(f"[DB] Activation logged: {user} / {reason}")
def read_statuses():
    """Read the readiness flag of every machine (DB -> dashboard direction).

    Returns:
        The rows fetched from ``EstadoMaquinas``, each holding the
        ``Maquina`` and ``Ready`` columns in that order.

    Raises:
        pyodbc.Error: The query could not be executed or fetched.
    """
    cursor = db_connection.cursor()
    try:
        cursor.execute("SELECT Maquina, Ready FROM EstadoMaquinas")
        return cursor.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cursor.close()
# ----------------------------------------------------------------------
# MQTT: callbacks (paho reconnects on its own thanks to loop_start)
# ----------------------------------------------------------------------
def on_connect(client, userdata, flags, rc, properties=None):
    """paho-mqtt connect callback: subscribe to the activations topic.

    Args:
        client: The paho client that (re)connected.
        userdata: Unused.
        flags: Unused.
        rc: Connection result; ``0`` means the broker accepted the session.
        properties: Unused. Accepted so the function can also be registered
            with the five-argument callback form of paho-mqtt 2.x.
    """
    if rc != 0:
        # The broker refused the session: there is nothing to subscribe to.
        print(f"[MQTT] Connection refused by the broker (rc={rc})")
        return
    print(f"[MQTT] Connected to the broker (rc={rc})")
    # Subscribing here means the subscription is renewed on every reconnection.
    client.subscribe(TOPIC_ACTIVATIONS, qos=1)
def on_message(client, userdata, msg):
    """paho-mqtt message callback: store an HMI activation in the database.

    The payload must be a UTF-8 encoded JSON object. Malformed payloads are
    logged and discarded. On a database error the connection is re-opened
    and the insert is attempted once more; if that also fails the record is
    logged as dropped, so no exception escapes the callback.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; ``msg.payload`` (bytes) and ``msg.topic``
            are read.
    """
    try:
        data = json.loads(msg.payload.decode("utf-8"))
    except ValueError as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"[MQTT] Invalid payload on {msg.topic}: {e}")
        return
    if not isinstance(data, dict):
        # Valid JSON, but not an object (e.g. a list or a bare number).
        print(f"[MQTT] Invalid payload on {msg.topic}: expected a JSON object")
        return

    try:
        insert_activation(data)
    except (KeyError, TypeError, ValueError) as e:
        # A field could not be converted (e.g. a non-numeric q0..q3).
        print(f"[MQTT] Invalid payload on {msg.topic}: {e}")
    except pyodbc.Error as e:
        # The DB went down mid-insert: reconnect and retry once.
        # connect_db() blocks this callback until the database is back.
        print(f"[DB] Insert error ({e}); reconnecting...")
        connect_db()
        try:
            insert_activation(data)
        except pyodbc.Error as retry_error:
            # Not every error is a lost connection (e.g. a rejected row);
            # give up on this record rather than let the exception escape.
            print(f"[DB] Activation dropped after retry ({retry_error})")
# ----------------------------------------------------------------------
# Main program
# ----------------------------------------------------------------------
def main():
    """Run the bridge: connect to the DB and the broker, then poll forever.

    The MQTT network loop runs in paho's own background thread (started by
    ``loop_start``), which delivers messages to ``on_message``. This
    function's thread reads the machine statuses every ``POLL_PERIOD_S``
    seconds and publishes each one, retained, on its per-machine ready
    topic. The function never returns.
    """
    connect_db()

    client_id = "puente-mqtt-sql"
    callback_api = getattr(mqtt, "CallbackAPIVersion", None)
    if callback_api is None:
        # paho-mqtt 1.x: the constructor has no callback API argument.
        client = mqtt.Client(client_id=client_id)
    else:
        # paho-mqtt 2.x: request the version-1 callback signatures, which
        # match the four-argument on_connect used by this script.
        client = mqtt.Client(callback_api.VERSION1, client_id=client_id)
    # client.username_pw_set("USUARIO_MQTT", "PASSWORD_MQTT")  # if the broker requires it
    client.on_connect = on_connect
    client.on_message = on_message
    # connect_async defers the connection to the network thread, so a broker
    # that is down at start-up is retried instead of raising here.
    client.connect_async(MQTT_BROKER, MQTT_PORT, keepalive=60)
    client.loop_start()  # own thread: handles MQTT (re)connections

    # DB -> MQTT polling loop
    while True:
        try:
            for machine, ready in read_statuses():
                topic = TOPIC_READY_FMT.format(machine=machine)
                # Retained, so a dashboard gets the last state on subscribing.
                client.publish(topic, "1" if ready else "0", qos=1, retain=True)
        except pyodbc.Error as e:
            print(f"[DB] Polling error ({e}); reconnecting...")
            connect_db()
        time.sleep(POLL_PERIOD_S)


if __name__ == "__main__":
    main()
Download the full project pack — free. This example + the related ones + bill of materials