MQTT
Installation
$ sudo apt-get install mosquitto mosquitto-clients
$ pip3 install paho-mqtt
Simple publisher / subscriber app
We need a broker ( here we consider on the RPI ) :
RPI Side
$ # in a first terminal :
$ mosquitto -v
command lines
PC Side
$ mosquitto_sub -d -h 192.168.0.2 -t test/message
$ # -h : mqtt host to connect to
$ # -t : mqtt topic to subscribe to
RPI Side
$ # in a second terminal :
$ mosquitto_pub -d -t test/message -m "hello" -h localhost
$ # -h : mqtt host to connect to ( broker address)
python programs
subscriber.py - publisher.py
PC Side
import paho.mqtt.client as mqtt
import time
########################################################################
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to broker")
global Connected
Connected = True
else:
print("Connection failed")
########################################################################
def on_message(client, userdata, message):
print("message received " ,str(message.payload.decode("utf-8")))
print("message topic=",message.topic)
print("message qos=",message.qos)
print("message retain flag=",message.retain)
########################################################################
Connected = False
broker_address="192.168.0.2"
port = 1883
print("creating new instance")
client = mqtt.Client("python_test")
client.on_message=on_message #attach function to callback
client.on_connect=on_connect
print("connecting to broker")
client.connect(broker_address, port) #connect to broker
client.loop_start() #start the loop
while Connected != True: #Wait for connection
time.sleep(0.1)
print("Subscribing to topic","test/message")
client.subscribe("test/message")
#-------------------------------------------------------------------------
try:
while True:
time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
print("exiting")
client.disconnect()
client.loop_stop()
RPI Side
import paho.mqtt.client as mqttClient
import time
########################################################################
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to broker")
global Connected
Connected = True
else:
print("Connection failed")
########################################################################
def on_publish(client,userdata,result):
print("data published \n")
pass
########################################################################
Connected = False
broker_address= "localhost"
port = 1883
client = mqttClient.Client("Python_publisher") #create new instance
client.on_connect= on_connect #attach function to callback
client.on_publish = on_publish #attach function to callback
client.connect(broker_address, port=port) #connect to broker
client.loop_start() #start the loop
while Connected != True: #Wait for connection
time.sleep(0.1)
client.publish("test/message", payload="hello")
#-------------------------------------------------------------------------
try:
while True:
print("publishing to test/message")
client.publish("test/message", payload="hello")
time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
print("exiting")
client.disconnect()
client.loop_stop()
MQTT sensor app
terminal display
PC Side
$ mosquitto_sub -d -h 192.168.0.2 -t sense_hat/sensors/pressure
Client mosqsub|8121-kerhoas-de sending CONNECT
Client mosqsub|8121-kerhoas-de received CONNACK
Client mosqsub|8121-kerhoas-de sending SUBSCRIBE (Mid: 1, Topic: sense_hat/sensors/pressure, QoS: 0)
Client mosqsub|8121-kerhoas-de received SUBACK
Subscribed (mid: 1): 0
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (17 bytes))
1026.799560546875
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (16 bytes))
1026.84814453125
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (15 bytes))
1026.8330078125
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (17 bytes))
1026.826416015625
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (15 bytes))
1026.8408203125
...
RPI Side
publisher_sense_hat_pressure.py
import paho.mqtt.client as mqttClient
import time
from sense_hat import SenseHat
#################################################################
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to broker")
global Connected
Connected = True
else:
print("Connection failed")
#################################################################
def on_publish(client,userdata,result):
print("data published \n")
pass
################################################################
Connected = False
broker_address= "localhost"
port = 1883
client = mqttClient.Client("Python_publisher") #create new instance
client.on_connect= on_connect #attach function to callback
client.on_publish = on_publish #attach function to callback
client.connect(broker_address, port=port) #connect to broker
client.loop_start() #start the loop
while Connected != True: #Wait for connection
time.sleep(0.1)
#-------------------------------------------------------------------------
try:
while True:
sense = SenseHat()
press = sense.get_pressure()
temp = sense.get_temperature()
print("publishing to sense_hat/sensors/pressure")
client.publish("sense_hat/sensors/pressure", payload = press)
time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
print("exiting")
client.disconnect()
client.loop_stop()
adding GPIO Led Control
PC Side
$ mosquitto_pub -d -t sense_hat/gpio/switch_led -m "LED_ON" -h "192.168.0.2"
subscriber_to_sense_hat.py
import paho.mqtt.client as mqtt
import time
########################################################################
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to broker")
global Connected
Connected = True
else:
print("Connection failed")
########################################################################
def on_receive_pressure(client, userdata, message):
print("pressure= " ,str(message.payload.decode("utf-8")))
########################################################################
def on_receive_led_state(client, userdata, message):
print("led_state= " ,str(message.payload.decode("utf-8")))
########################################################################
Connected = False
broker_address="192.168.0.2"
port = 1883
print("creating new instance")
client = mqtt.Client("python_test")
client.on_connect=on_connect
client.message_callback_add("sense_hat/sensors/pressure", on_receive_pressure)
client.message_callback_add("sense_hat/gpio/led_state", on_receive_led_state)
print("connecting to broker")
client.connect(broker_address, port) #connect to broker
client.loop_start() #start the loop
while Connected != True: #Wait for connection
time.sleep(0.1)
print("Subscribing to topics","sense_hat/sensors/pressure and sense_hat/gpio/led_state")
client.subscribe("sense_hat/sensors/pressure")
client.subscribe("sense_hat/gpio/led_state")
#-------------------------------------------------------------------------
try:
while True:
time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
print("exiting")
client.disconnect()
client.loop_stop()
$ python3 subscriber_to_sense_hat.py
creating new instance
connecting to broker
Connected to broker
Subscribing to topics sense_hat/sensors/pressure and sense_hat/gpio/led_state
pressure= 1026.8740234375
pressure= 1026.855712890625
led_state= ON
pressure= 1026.87548828125
pressure= 1026.8662109375
led_state= OFF
pressure= 1026.86083984375
...
RPI Side
pi_sense_hat_app.py
import paho.mqtt.client as mqttClient
import time
from sense_hat import SenseHat
import subprocess
#################################################################
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to broker")
global Connected
Connected = True
else:
print("Connection failed")
#################################################################
def on_message(client, userdata, message):
print("message received " ,str(message.payload.decode("utf-8")))
if message.payload.decode("utf-8") == "LED_ON":
print("action==true")
bashCommand = "echo 1 > /sys/class/gpio/gpio4/value"
output = subprocess.check_output(['bash','-c', bashCommand])
ledS="ON"
else:
print("action==false")
bashCommand = "echo 0 > /sys/class/gpio/gpio4/value"
output = subprocess.check_output(['bash','-c', bashCommand])
ledS="OFF"
client.publish("sense_hat/gpio/led_state", payload=ledS)
################################################################
def on_publish(client,userdata,result):
print("data published \n")
pass
################################################################
Connected = False
broker_address= "localhost"
port = 1883
client = mqttClient.Client("rpi") #create new instance
client.on_connect = on_connect #attach function to callback
client.on_publish = on_publish #attach function to callback
client.on_message = on_message #attach function to callback
client.connect(broker_address, port=port) #connect to broker
client.loop_start() #start the loop
while Connected != True: #Wait for connection
time.sleep(0.1)
client.subscribe("sense_hat/gpio/switch_led")
#-------------------------------------------------------------------------
try:
while True:
sense = SenseHat()
press = sense.get_pressure()
temp = sense.get_temperature()
print("publishing to sense_hat/sensors/pressure")
client.publish("sense_hat/sensors/pressure", payload = press)
time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
print("exiting")
client.disconnect()
client.loop_stop()
web browser display : MQTT + socket io
web browser can’t support MQTT ; we can make a bridge between our MQTT socket and a websocket.
Here is an example with nodejs:
Raspberry PI Side
In a first terminal :
$ node webserver.js
In a 2nd terminal :
$ node sensor_app.js
PC Side
Open a web browser and write “192.168.0.2:8080”
www/index.html
<html lang="en">
<head>
<meta charset="UTF-8">
<title>RPI Node Js Sensor App </title>
</head>
<body>
<!--=================================================================-->
<!-- DISPLAYS -->
<!--=================================================================-->
<script>
display_log=function(msg)
{
var logArea = 0;
if(!logArea)
{ logArea=document.getElementById("log"); }
if(msg==null)
{ logArea.textContent='';}
else { logArea.textContent+=msg+'\n'; }
};
</script>
<script src="gauge.js"></script>
<!--=================================================================-->
<!-- WEBSOCKET SCRIPT -->
<!--=================================================================-->
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.0.3/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
var label;
var led;
var socket=io();
socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {
console.log('Websocket connected!');
// subscribe to mqtt topics
socket.emit('subscribe',{topic:'sense_hat/sensors/pressure'});
socket.emit('subscribe',{topic:'sense_hat/gpio/led_state'});
});
// listen to the event
socket.on('mqtt', function(msg) {
console.log(msg);
if ( msg.topic == 'sense_hat/sensors/pressure' )
{
display_log("pressure = "+msg.payload) ;
var gauge=new Gauge('gauge');
gauge.draw(msg.payload);
}
if ( msg.topic == 'sense_hat/gpio/led_state' )
{
label.innerHTML= msg.payload;
}
});
function start() {
console.log('start...');
nbr_meas = document.getElementById("nbr_meas");
socket.emit('publish',{topic:"sense_hat/sensors/nbr_meas", payload: parseInt(nbr_meas.value).toString()});
}
</script>
<!--=================================================================-->
<!-- REQUEST FOR LED CONTROL -->
<!--=================================================================-->
<script>
function do_onclick()
{
label = document.getElementById("tLedLabel")
if (led.checked == false )
{
socket.emit('publish',{topic:"sense_hat/gpio/switch_led",payload: "OFF"});
}
else
{
socket.emit('publish',{topic:"sense_hat/gpio/switch_led", payload:"ON"});
}
}
</script>
<!--=================================================================-->
<!-- INIT -->
<!--=================================================================-->
<script>
window.onload = function() {
led=document.getElementById("tLed");
led.checked=false;
};
</script>
<!--=================================================================-->
<!-- HTML -->
<!--=================================================================-->
<p>[<a href="/">home</a>]</p>
<hr>
<h2>Control GPIO4 LED</h2>
<table>
<tr>
<td >
<h3>Static Part :</h3>
Control GPIO4 led <input type="checkbox" id="tLed" onclick="do_onclick()">
</td>
<td>
<span id="tLedLabel"> GPIO4 led turned off</span>
</td>
</tr>
</table>
<hr>
<h2>Get Measured Datas through Websocket</h2>
<table>
<tr>
<td >
<strong> Number of Measurements : </strong><br/>
<input type="text" id="nbr_meas"/> <br/>
<button onclick="start()">Start</button>
</td>
<td>
<h3> Pressure Measurement </h3>
<div >
<canvas id="gauge" width="170" height="170"></canvas>
</div>
</td>
</tr>
</table>
<p><b>Log:</b></p>
<p></p><pre id="log"></pre><p></p>
<hr>
</body>
</html>
<!--=================================================================-->
webserver.js
express = require('express');
var app = express();
var http = require('http').Server(app);
var fs = require('fs');
var io = require('socket.io')(http)
var url = require('url');
var mqtt = require('mqtt');
//======================================================================
app.set('views', __dirname + '/views')
app.set('view engine', 'jade')
app.use(express.static(__dirname + '/www'))
//======================================================================
http.listen(8080); //listen to port 8080
var mqtt_client = mqtt.connect('mqtt://192.168.0.2');
//======================================================================
// SERVE INDEX
//======================================================================
app.get("/", function(req, res) {
res.setHeader('Content-type', 'text/xml');
res.sendFile('index.html');
})
//======================================================================
app.get('/index.html', function (req, res) {
res.sendFile( __dirname + "/" + "index.html" );
})
//======================================================================
io.sockets.on('connect', function (socket) { // WebSocket Connection
//##########################################################
// BRIDGE WEBSOCKETIO --> MQTT BROKER
//##########################################################
socket.on('subscribe', function(data){
console.log('subscribing to' + data.topic );
mqtt_client.subscribe(data.topic);
});
socket.on('publish', function(data) {
console.log('publishing to ' + data.topic);
mqtt_client.publish(data.topic, data.payload);
});
//##########################################################
// BRIDGE MQTT BROKER --> WEBSOCKETIO
//##########################################################
mqtt_client.on('message', function (topic, payload, packet) {
socket.emit('mqtt',{'topic': String(topic), 'payload': String(payload)});
console.log( topic+'='+ payload);
});
});
//======================================================================
sensor_app.js
var mqtt = require('mqtt');
const imu = require("node-sense-hat").Imu;
const IMU = new imu.IMU();
var client = mqtt.connect('mqtt://192.168.0.2');
var Gpio = require('onoff').Gpio;
var LED = new Gpio(4, 'out');
const nb_sleep = require('sleep-async')();
//======================================================================
client.on('connect', () => {
client.subscribe('sense_hat/sensors/nbr_meas');
client.subscribe('sense_hat/gpio/switch_led');
})
//======================================================================
client.on('message', function (topic, payload, packet) {
console.log( topic+'='+ payload);
switch (topic) {
case 'sense_hat/sensors/nbr_meas':
return handleSensorsNbr_meas(payload)
case 'sense_hat/gpio/switch_led':
return handleGpioSwitch_led(payload)
}
});
//======================================================================
function handleGpioSwitch_led(payload) {
var ledS="OFF";
console.log('SWITCH LED %s', payload)
if( payload == 'ON' )
{
console.log("query true");
LED.writeSync(1);
ledS="LEDS_ON";
}
else
{
console.log("query false");
LED.writeSync(0);
ledS="LEDS_OFF";
}
client.publish('sense_hat/gpio/led_state', ledS.toString() )
}
//======================================================================
function handleSensorsNbr_meas(payload) {
console.log('Nbr_meas %s', payload)
var it = 0;
it = parseInt(payload);
if (it) {
console.log(it);
var sleep = require('system-sleep');
for(var i=0 ; i < it ; i++)
{
IMU.getValue((err, data) => {
if (err !== null) {
console.error("Could not read sensor data: ", err);
return;
}
console.log("Accelleration is: ", JSON.stringify(data.accel, null, " "));
console.log("Gyroscope is: ", JSON.stringify(data.gyro, null, " "));
console.log("Compass is: ", JSON.stringify(data.compass, null, " "));
console.log("Fusion data is: ", JSON.stringify(data.fusionPose, null, " "));
console.log("Temp is: ", data.temperature);
console.log("Pressure is: ", data.pressure);
console.log("Humidity is: ", data.humidity);
client.publish('sense_hat/sensors/pressure', Number(data.pressure.toFixed(2)).toString() )
});
sleep(1000);
}
}
}
//======================================================================
process.on('SIGINT', function () { //on ctrl+c
LED.writeSync(0); // Turn LED off
LED.unexport(); // Unexport LED GPIO to free resources
process.exit(); //exit completely
});
//======================================================================
Adding Security
Authentification
$ sudo mosquitto_passwd -c /etc/mosquitto/passwd sense
Password:
Reenter password:
$ sudo nano /etc/mosquitto/conf.d/auth.conf
password_file /etc/mosquitto/passwd
allow_anonymous false
$ sudo systemctl restart mosquitto.service
$ mosquitto_pub -d -t test/message -m "hello" -h localhost -u "sense" -P "password"
TLS Encryption
cf securing-connections-with-ssl-tls
# CERTIFICATION AUTHORITY KEY GENERATION
$ openssl genrsa -aes256 -out ca.key 2048
# SELF SIGNED CERTIFICATION REQUEST
$ openssl req -new -x509 -days 3600 -key ca.key -out ca.crt -subj '/C=FR/ST=Bretagne/L=Brest/O=ENIB/OU=RX/CN=raspberrypi.local'
# BROKER ( SERVER ) KEY GENERATION
$ openssl genrsa -aes256 -out server.key 2048
# REM : no -aes256 : no crypted and password protected key, otherwise the broker will be blocked at start
# CERTIFICATION REQUEST
$ openssl req -new -out server.csr -key server.key
# !! common name : raspberrypi.local ( by default )
# GETTING CERTIFICATE
$ openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 1800
$ sudo cp -rv server.key server.crt ca.crt /etc/mosquitto
# CERTIFICATE CHECK
$ openssl x509 -noout -fingerprint -sha1 -text -in /etc/mosquitto/server.crt
$ sudo nano /etc/mosquitto/conf.d/tls.conf
port 8883
cafile /etc/mosquitto/ca.crt
keyfile /etc/mosquitto/server.key
certfile /etc/mosquitto/server.crt
tls_version tlsv1
$ mosquitto_sub -v -d -h raspberrypi.local -p 8883 --tls-version tlsv1 --cafile /etc/mosquitto/ca.crt -t test/message
$ mosquitto_pub -d -h raspberrypi.local -t test/message -p 8883 --tls-version tlsv1 --cafile /etc/mosquitto/ca.crt -m "hello"
MQTT on Pycom board (micropython)
Installations
$ sudo add-apt-repository ppa:webupd8team/atom
$ sudo apt-get update
$ sudo apt-get install atom
$ sudo usermod -a -G dialout [you_name]
On atom, install Pymakr package.
Application
We suppose mosquitto broker is at address “192.168.200.179”.
from network import WLAN
from mqtt import MQTTClient
import machine
import time
def settimeout(duration):
pass
print("trying to connect")
wlan = WLAN(mode=WLAN.STA)
wlan.antenna(WLAN.EXT_ANT)
print(wlan.ifconfig())
nets = wlan.scan()
print(str(nets))
wlan.connect(ssid='enib', auth=(WLAN.WPA2, '[pwd_enib]'))
#wlan.connect('eduroam', auth=(WLAN.WPA2_ENT, '[my_log]','[my_pwd]'))
while not wlan.isconnected():
pass #machine.idle()
print("Connected to Wifi\n")
client = MQTTClient("broker", "192.168.200.179", port=1883)
client.settimeout = settimeout
client.connect()
while True:
client.publish("test/message", "hello")
print("Sending hello")
time.sleep(1)
Once the program is running on pycom, in a terminal :
$ mosquitto_sub -d -h 192.168.200.179 -t test/message
Client mosqsub/981-raspberrypi sending CONNECT
Client mosqsub/981-raspberrypi received CONNACK
Client mosqsub/981-raspberrypi sending SUBSCRIBE (Mid: 1, Topic: test/message, QoS: 0)
Client mosqsub/981-raspberrypi received SUBACK
Subscribed (mid: 1): 0
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))
hello
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))
hello
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))
hello
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))