MQTT

Installation

$ sudo apt-get install mosquitto mosquitto-clients
$ pip3 install paho-mqtt

Simple publisher / subscriber app

We need a broker ( here we consider on the RPI ) :

mqtt_1.svg

RPI Side

$ # in a first terminal :  
$ mosquitto -v

command lines

PC Side

$ mosquitto_sub -d -h 192.168.0.2 -t test/message
$ # -h : mqtt host to connect to
$ # -t : mqtt topic to subscribe to

RPI Side

$ # in a second terminal :
$ mosquitto_pub -d -t test/message -m "hello" -h localhost
$ # -h : mqtt host to connect to ( broker address)

python programs

subscriber.py - publisher.py

PC Side

import paho.mqtt.client as mqtt
import time

########################################################################
def on_connect(client, userdata, flags, rc):
    if rc == 0:
         print("Connected to broker")
         global Connected                
         Connected = True                
    else:
         print("Connection failed")
########################################################################
def on_message(client, userdata, message):
    print("message received " ,str(message.payload.decode("utf-8")))
    print("message topic=",message.topic)
    print("message qos=",message.qos)
    print("message retain flag=",message.retain)
########################################################################
Connected = False

broker_address="192.168.0.2"
port = 1883                    

print("creating new instance")
client = mqtt.Client("python_test")
client.on_message=on_message          #attach function to callback
client.on_connect=on_connect
print("connecting to broker")
client.connect(broker_address, port)  #connect to broker
client.loop_start()                   #start the loop

while Connected != True:              #Wait for connection
    time.sleep(0.1)

print("Subscribing to topic","test/message")
client.subscribe("test/message")

#------------------------------------------------------------------------- 
try:
    while True:	
        time.sleep(1)
#------------------------------------------------------------------------- 
except KeyboardInterrupt:
    print("exiting")
    client.disconnect()
    client.loop_stop()

RPI Side

import paho.mqtt.client as mqttClient
import time

######################################################################## 
def on_connect(client, userdata, flags, rc):
 
    if rc == 0:
        print("Connected to broker")
        global Connected               
        Connected = True                
 
    else:
        print("Connection failed")
########################################################################
def on_publish(client,userdata,result):
    print("data published \n")
    pass
########################################################################
 
Connected = False

broker_address= "localhost"
port = 1883

client = mqttClient.Client("Python_publisher")     #create new instance
client.on_connect= on_connect                      #attach function to callback
client.on_publish = on_publish                     #attach function to callback 
client.connect(broker_address, port=port)          #connect to broker
client.loop_start()                                #start the loop
 
while Connected != True:                           #Wait for connection
    time.sleep(0.1)
 
client.publish("test/message", payload="hello")
#-------------------------------------------------------------------------
try:
    while True:
        print("publishing to test/message")
        client.publish("test/message", payload="hello")
        time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
    print("exiting")
    client.disconnect()
    client.loop_stop()

MQTT sensor app

terminal display

mqtt_2.svg

PC Side

$ mosquitto_sub -d -h 192.168.0.2 -t sense_hat/sensors/pressure
Client mosqsub|8121-kerhoas-de sending CONNECT
Client mosqsub|8121-kerhoas-de received CONNACK
Client mosqsub|8121-kerhoas-de sending SUBSCRIBE (Mid: 1, Topic: sense_hat/sensors/pressure, QoS: 0)
Client mosqsub|8121-kerhoas-de received SUBACK
Subscribed (mid: 1): 0
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (17 bytes))
1026.799560546875
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (16 bytes))
1026.84814453125
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (15 bytes))
1026.8330078125
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (17 bytes))
1026.826416015625
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (15 bytes))
1026.8408203125
...

RPI Side

publisher_sense_hat_pressure.py
import paho.mqtt.client as mqttClient
import time
from sense_hat import SenseHat

################################################################# 
def on_connect(client, userdata, flags, rc):
 
    if rc == 0:
        print("Connected to broker")
        global Connected               
        Connected = True                
 
    else:
        print("Connection failed")
#################################################################
def on_publish(client,userdata,result):
    print("data published \n")
    pass
################################################################
 
Connected = False

broker_address= "localhost"
port = 1883

client = mqttClient.Client("Python_publisher")     #create new instance
client.on_connect= on_connect                      #attach function to callback
client.on_publish = on_publish                     #attach function to callback 
client.connect(broker_address, port=port)          #connect to broker
client.loop_start()                                #start the loop
 
while Connected != True:                           #Wait for connection
    time.sleep(0.1)
 
#-------------------------------------------------------------------------
try:
    while True:
        sense = SenseHat()
        press = sense.get_pressure()
        temp = sense.get_temperature()

        print("publishing to sense_hat/sensors/pressure")
        client.publish("sense_hat/sensors/pressure", payload = press)
        time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
    print("exiting")
    client.disconnect()
    client.loop_stop()

adding GPIO Led Control

mqtt_3.svg

PC Side

$  mosquitto_pub -d -t sense_hat/gpio/switch_led -m "LED_ON" -h "192.168.0.2"
subscriber_to_sense_hat.py
import paho.mqtt.client as mqtt
import time

########################################################################
def on_connect(client, userdata, flags, rc):
    if rc == 0:
         print("Connected to broker")
         global Connected                
         Connected = True                
    else:
         print("Connection failed")
########################################################################
def on_receive_pressure(client, userdata, message):
    print("pressure= " ,str(message.payload.decode("utf-8")))
########################################################################
def on_receive_led_state(client, userdata, message):
    print("led_state= " ,str(message.payload.decode("utf-8")))
########################################################################

Connected = False

broker_address="192.168.0.2"
port = 1883                    

print("creating new instance")
client = mqtt.Client("python_test")
client.on_connect=on_connect
client.message_callback_add("sense_hat/sensors/pressure", on_receive_pressure)
client.message_callback_add("sense_hat/gpio/led_state", on_receive_led_state)
print("connecting to broker")
client.connect(broker_address, port)  #connect to broker
client.loop_start()                   #start the loop

while Connected != True:              #Wait for connection
    time.sleep(0.1)

print("Subscribing to topics","sense_hat/sensors/pressure and sense_hat/gpio/led_state")
client.subscribe("sense_hat/sensors/pressure")
client.subscribe("sense_hat/gpio/led_state")
#------------------------------------------------------------------------- 
try:
    while True:	
        time.sleep(1)
#------------------------------------------------------------------------- 
except KeyboardInterrupt:
    print("exiting")
    client.disconnect()
    client.loop_stop()
$ python3 subscriber_to_sense_hat.py 
creating new instance
connecting to broker
Connected to broker
Subscribing to topics sense_hat/sensors/pressure and sense_hat/gpio/led_state
pressure=  1026.8740234375
pressure=  1026.855712890625
led_state=  ON
pressure=  1026.87548828125
pressure=  1026.8662109375
led_state=  OFF
pressure=  1026.86083984375
...

RPI Side

pi_sense_hat_app.py
import paho.mqtt.client as mqttClient
import time
from sense_hat import SenseHat
import subprocess

################################################################# 
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to broker")
        global Connected               
        Connected = True                
    else:
        print("Connection failed")
#################################################################
def on_message(client, userdata, message):
    print("message received " ,str(message.payload.decode("utf-8")))
    if message.payload.decode("utf-8") == "LED_ON":
        print("action==true")
        bashCommand = "echo 1 > /sys/class/gpio/gpio4/value"
        output = subprocess.check_output(['bash','-c', bashCommand]) 
        ledS="ON"    
    else: 
        print("action==false")
        bashCommand = "echo 0 > /sys/class/gpio/gpio4/value"
        output = subprocess.check_output(['bash','-c', bashCommand]) 
        ledS="OFF"
    client.publish("sense_hat/gpio/led_state", payload=ledS)
################################################################
def on_publish(client,userdata,result):
    print("data published \n")
    pass
################################################################
 
Connected = False

broker_address= "localhost"
port = 1883

client = mqttClient.Client("rpi")  #create new instance
client.on_connect = on_connect                      #attach function to callback
client.on_publish = on_publish                     #attach function to callback 
client.on_message = on_message                     #attach function to callback 
client.connect(broker_address, port=port)          #connect to broker
client.loop_start()                                #start the loop
 
while Connected != True:                           #Wait for connection
    time.sleep(0.1)

client.subscribe("sense_hat/gpio/switch_led") 

#-------------------------------------------------------------------------
try:
    while True:
        sense = SenseHat()
        press = sense.get_pressure()
        temp = sense.get_temperature()

        print("publishing to sense_hat/sensors/pressure")
        client.publish("sense_hat/sensors/pressure", payload = press)
        time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
    print("exiting")
    client.disconnect()
    client.loop_stop()

web browser display : MQTT + socket io

mqtt_socketio.zip

web browser can’t support MQTT ; we can make a bridge between our MQTT socket and a websocket.
Here is an example with nodejs:

mqtt_4.svg

Raspberry PI Side

In a first terminal :

$ node webserver.js

In a 2nd terminal :

$ node sensor_app.js

PC Side

Open a web browser and write “192.168.0.2:8080”

www/index.html

<html lang="en">
<head>
  <meta charset="UTF-8">
  <title>RPI Node Js Sensor App </title>
</head>
<body>
<!--=================================================================-->
<!--				DISPLAYS										 -->
<!--=================================================================-->
<script>
display_log=function(msg)
{
	  var logArea = 0;		
	  
	  if(!logArea)
	  { logArea=document.getElementById("log"); }
	  if(msg==null)
	  { logArea.textContent='';}
	  else { logArea.textContent+=msg+'\n'; }
};
</script> 
<script src="gauge.js"></script>
<!--=================================================================-->
<!--				WEBSOCKET SCRIPT								 -->
<!--=================================================================-->
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.0.3/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
	 
 	var label;
 	var led;
	   	    
    var socket=io();
    socket = io.connect('http://' + document.domain + ':' + location.port);
 
    socket.on('connect', function() {
        console.log('Websocket connected!');
        // subscribe to mqtt topics
        socket.emit('subscribe',{topic:'sense_hat/sensors/pressure'});
        socket.emit('subscribe',{topic:'sense_hat/gpio/led_state'});
    });
 
    // listen to the event
    socket.on('mqtt', function(msg) {
        console.log(msg);
        if ( msg.topic == 'sense_hat/sensors/pressure' )
        {
			display_log("pressure = "+msg.payload) ;
			var gauge=new Gauge('gauge'); 
			gauge.draw(msg.payload);
		}	
		
		if ( msg.topic == 'sense_hat/gpio/led_state' )
        {
			label.innerHTML=  msg.payload;			
		}			
       });

    function start() {
      console.log('start...');
      nbr_meas = document.getElementById("nbr_meas");
      socket.emit('publish',{topic:"sense_hat/sensors/nbr_meas", payload: parseInt(nbr_meas.value).toString()});      
    }    
</script>
<!--=================================================================-->
<!--				REQUEST FOR LED CONTROL					 -->
<!--=================================================================--> 
<script>
	function do_onclick()
	{
		label = document.getElementById("tLedLabel")
		  if (led.checked == false )
		  {
		   socket.emit('publish',{topic:"sense_hat/gpio/switch_led",payload: "OFF"});
		  }
		  else
		  {
		   socket.emit('publish',{topic:"sense_hat/gpio/switch_led", payload:"ON"});		   
		  }		  
	}
</script>	
<!--=================================================================-->
<!--				INIT					 						 -->
<!--=================================================================--> 
<script>
	window.onload = function() {
			led=document.getElementById("tLed");
			led.checked=false;
	};
</script>
<!--=================================================================-->
<!--				HTML											 -->
<!--=================================================================-->

<p>[<a href="/">home</a>]</p>

<hr> 

<h2>Control GPIO4 LED</h2>

<table>
	<tr>
	<td > 
			<h3>Static Part :</h3>
			Control GPIO4 led <input type="checkbox" id="tLed" onclick="do_onclick()">
	</td>
	<td>	
			<span id="tLedLabel"> GPIO4 led turned off</span>		
	</td>
	</tr>
</table>

<hr> 

<h2>Get Measured Datas through Websocket</h2>  
<table>
	<tr>
	<td > 
			<strong> Number of Measurements :  </strong><br/>   
			<input type="text" id="nbr_meas"/>     <br/>   
			<button onclick="start()">Start</button>
	</td>
	<td>	
			<h3> Pressure Measurement </h3>
			<div >
			<canvas id="gauge" width="170" height="170"></canvas>
			</div>			
	</td>
	</tr>
</table>

<p><b>Log:</b></p>
<p></p><pre id="log"></pre><p></p>	

<hr> 
  
</body>
</html>
<!--=================================================================-->
webserver.js
express = require('express');
var app = express();
var http = require('http').Server(app);
var fs = require('fs'); 
var io = require('socket.io')(http) 
var url = require('url'); 
var mqtt = require('mqtt');
//======================================================================
app.set('views', __dirname + '/views')
app.set('view engine', 'jade')
app.use(express.static(__dirname + '/www'))
//======================================================================
http.listen(8080); //listen to port 8080
var mqtt_client = mqtt.connect('mqtt://192.168.0.2');
//======================================================================
//					SERVE INDEX
//======================================================================
app.get("/", function(req, res) {
	res.setHeader('Content-type', 'text/xml');
    res.sendFile('index.html');
})
//======================================================================
app.get('/index.html', function (req, res) {
   res.sendFile( __dirname + "/" + "index.html" );
})
//======================================================================
io.sockets.on('connect', function (socket) { // WebSocket Connection	
			//##########################################################
			// 			BRIDGE WEBSOCKETIO --> MQTT BROKER 
			//##########################################################
			  socket.on('subscribe', function(data){
				 console.log('subscribing to' + data.topic );
				 mqtt_client.subscribe(data.topic);
			  });

			  socket.on('publish', function(data) {
					console.log('publishing to ' + data.topic);
					mqtt_client.publish(data.topic, data.payload);
			  });
	  			  			  
			//##########################################################
			// 			BRIDGE MQTT BROKER --> WEBSOCKETIO 
			//##########################################################
			mqtt_client.on('message', function (topic, payload, packet) {
				socket.emit('mqtt',{'topic': String(topic), 'payload': String(payload)});	
				console.log( topic+'='+ payload);
			});	  
}); 
//======================================================================
sensor_app.js
var mqtt = require('mqtt');
const imu = require("node-sense-hat").Imu;
const IMU = new imu.IMU();
var client = mqtt.connect('mqtt://192.168.0.2');
var Gpio = require('onoff').Gpio; 
var LED = new Gpio(4, 'out'); 
const nb_sleep = require('sleep-async')();

//======================================================================
client.on('connect', () => {
   client.subscribe('sense_hat/sensors/nbr_meas');
   client.subscribe('sense_hat/gpio/switch_led');
})
//======================================================================
client.on('message', function (topic, payload, packet) {
	console.log( topic+'='+ payload);
	switch (topic) {
    case 'sense_hat/sensors/nbr_meas':
      return handleSensorsNbr_meas(payload)
    case 'sense_hat/gpio/switch_led':
      return handleGpioSwitch_led(payload)  
  }
});
//======================================================================
function handleGpioSwitch_led(payload)  {
	var ledS="OFF";
	console.log('SWITCH LED %s', payload)
	    if( payload == 'ON' ) 
		{
			console.log("query true");
			LED.writeSync(1);
			ledS="LEDS_ON";
		}	
		else	
		{
			console.log("query false");
			LED.writeSync(0);
			ledS="LEDS_OFF";
		}	
	client.publish('sense_hat/gpio/led_state',  ledS.toString()  )				  
}	
//======================================================================
function handleSensorsNbr_meas(payload) {
  console.log('Nbr_meas %s', payload)

  var it = 0; 
  it = parseInt(payload);
	if (it) {
			  console.log(it); 
			  var sleep = require('system-sleep');
			  for(var i=0 ; i < it ; i++)
			  {
					IMU.getValue((err, data) => {
						if (err !== null) {
								console.error("Could not read sensor data: ", err);
								return;
						 }
						console.log("Accelleration is: ", JSON.stringify(data.accel, null, "  "));
						console.log("Gyroscope is: ", JSON.stringify(data.gyro, null, "  "));
						console.log("Compass is: ", JSON.stringify(data.compass, null, "  "));
						console.log("Fusion data is: ", JSON.stringify(data.fusionPose, null, "  "));

						console.log("Temp is: ", data.temperature);
						console.log("Pressure is: ", data.pressure);
						console.log("Humidity is: ", data.humidity);
													  		
						client.publish('sense_hat/sensors/pressure',  Number(data.pressure.toFixed(2)).toString() )				    
						});

					sleep(1000);

			   }			
			}
}
//======================================================================
process.on('SIGINT', function () { 	//on ctrl+c
  LED.writeSync(0); 				// Turn LED off
  LED.unexport(); 					// Unexport LED GPIO to free resources
  process.exit(); 					//exit completely
}); 
//======================================================================	

			
			

	
	
				
	
	
	





Adding Security

Authentification

$ sudo mosquitto_passwd -c /etc/mosquitto/passwd sense
Password:
Reenter password:  

$ sudo nano /etc/mosquitto/conf.d/auth.conf
password_file /etc/mosquitto/passwd
allow_anonymous false

$ sudo systemctl restart mosquitto.service
$ mosquitto_pub -d -t test/message -m "hello" -h localhost -u "sense" -P "password"

TLS Encryption

cf securing-connections-with-ssl-tls

# CERTIFICATION AUTHORITY KEY GENERATION
$ openssl genrsa -aes256 -out ca.key 2048

# SELF SIGNED CERTIFICATION REQUEST
$ openssl req -new -x509 -days 3600 -key ca.key -out ca.crt -subj '/C=FR/ST=Bretagne/L=Brest/O=ENIB/OU=RX/CN=raspberrypi.local'

# BROKER ( SERVER )  KEY GENERATION
$ openssl genrsa -aes256 -out server.key 2048
# REM : no -aes256 : no crypted and password protected key, otherwise the broker will be blocked at start  

# CERTIFICATION REQUEST
$ openssl req -new -out server.csr -key server.key
# !! common name : raspberrypi.local ( by default ) 

# GETTING CERTIFICATE
$ openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 1800

$ sudo cp -rv  server.key server.crt ca.crt /etc/mosquitto

# CERTIFICATE CHECK
$  openssl x509 -noout -fingerprint -sha1 -text -in /etc/mosquitto/server.crt

$ sudo nano /etc/mosquitto/conf.d/tls.conf
port 8883
cafile /etc/mosquitto/ca.crt
keyfile /etc/mosquitto/server.key
certfile /etc/mosquitto/server.crt
tls_version tlsv1
$ mosquitto_sub -v -d -h raspberrypi.local -p 8883  --tls-version tlsv1 --cafile /etc/mosquitto/ca.crt -t test/message
$ mosquitto_pub -d -h raspberrypi.local -t test/message -p 8883 --tls-version tlsv1 --cafile /etc/mosquitto/ca.crt -m "hello"

MQTT on Pycom board (micropython)

Installations

$ sudo add-apt-repository ppa:webupd8team/atom
$ sudo apt-get update 
$ sudo apt-get install atom
$ sudo usermod -a -G dialout [you_name]

On atom, install Pymakr package.

Application

pycom_mqtt_test.zip

We suppose mosquitto broker is at address “192.168.200.179”.

from network import WLAN
from mqtt import MQTTClient
import machine
import time

def settimeout(duration):
    pass

print("trying to connect")
wlan = WLAN(mode=WLAN.STA)
wlan.antenna(WLAN.EXT_ANT)
print(wlan.ifconfig())
nets = wlan.scan()
print(str(nets))
wlan.connect(ssid='enib', auth=(WLAN.WPA2, '[pwd_enib]'))
#wlan.connect('eduroam', auth=(WLAN.WPA2_ENT, '[my_log]','[my_pwd]'))

while not wlan.isconnected():
    pass #machine.idle()
print("Connected to Wifi\n")

client = MQTTClient("broker", "192.168.200.179", port=1883)
client.settimeout = settimeout
client.connect()

while True:
     client.publish("test/message", "hello")
     print("Sending hello")
     time.sleep(1)

Once the program is running on pycom, in a terminal :

$ mosquitto_sub -d -h 192.168.200.179 -t test/message
Client mosqsub/981-raspberrypi sending CONNECT
Client mosqsub/981-raspberrypi received CONNACK
Client mosqsub/981-raspberrypi sending SUBSCRIBE (Mid: 1, Topic: test/message, QoS: 0)
Client mosqsub/981-raspberrypi received SUBACK
Subscribed (mid: 1): 0
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))
hello
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))
hello
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))
hello
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))