MQTT

Installation

$ sudo apt-get install mosquitto mosquitto-clients
$ pip3 install paho-mqtt

Simple publisher / subscriber app

We need a broker ( here we consider on the RPI ) :

mqtt_1.svg

RPI Side

$ # in a first terminal :  
$ mosquitto -v

command lines

PC Side

$ mosquitto_sub -d -h 192.168.0.2 -t test/message
$ # -h : mqtt host to connect to
$ # -t : mqtt topic to subscribe to

RPI Side

$ # in a second terminal :
$ mosquitto_pub -d -t test/message -m "hello" -h localhost
$ # -h : mqtt host to connect to ( broker address)

python programs

subscriber.py - publisher.py

PC Side

import paho.mqtt.client as mqtt
import time

########################################################################
def on_connect(client, userdata, flags, rc):
    if rc == 0:
         print("Connected to broker")
         global Connected                
         Connected = True                
    else:
         print("Connection failed")
########################################################################
def on_message(client, userdata, message):
    print("message received " ,str(message.payload.decode("utf-8")))
    print("message topic=",message.topic)
    print("message qos=",message.qos)
    print("message retain flag=",message.retain)
########################################################################
Connected = False

broker_address="192.168.0.2"
port = 1883                    

print("creating new instance")
client = mqtt.Client("python_test")
client.on_message=on_message          #attach function to callback
client.on_connect=on_connect
print("connecting to broker")
client.connect(broker_address, port)  #connect to broker
client.loop_start()                   #start the loop

while Connected != True:              #Wait for connection
    time.sleep(0.1)

print("Subscribing to topic","test/message")
client.subscribe("test/message")

#------------------------------------------------------------------------- 
try:
    while True:	
        time.sleep(1)
#------------------------------------------------------------------------- 
except KeyboardInterrupt:
    print("exiting")
    client.disconnect()
    client.loop_stop()

RPI Side

import paho.mqtt.client as mqttClient
import time

######################################################################## 
def on_connect(client, userdata, flags, rc):
 
    if rc == 0:
        print("Connected to broker")
        global Connected               
        Connected = True                
 
    else:
        print("Connection failed")
########################################################################
def on_publish(client,userdata,result):
    print("data published \n")
    pass
########################################################################
 
Connected = False

broker_address= "localhost"
port = 1883

client = mqttClient.Client("Python_publisher")     #create new instance
client.on_connect= on_connect                      #attach function to callback
client.on_publish = on_publish                     #attach function to callback 
client.connect(broker_address, port=port)          #connect to broker
client.loop_start()                                #start the loop
 
while Connected != True:                           #Wait for connection
    time.sleep(0.1)
 
client.publish("test/message", payload="hello")
#-------------------------------------------------------------------------
try:
    while True:
        print("publishing to test/message")
        client.publish("test/message", payload="hello")
        time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
    print("exiting")
    client.disconnect()
    client.loop_stop()

MQTT sensor app

terminal display

mqtt_2.svg

PC Side

$ mosquitto_sub -d -h 192.168.0.2 -t sense_hat/sensors/pressure
Client mosqsub|8121-kerhoas-de sending CONNECT
Client mosqsub|8121-kerhoas-de received CONNACK
Client mosqsub|8121-kerhoas-de sending SUBSCRIBE (Mid: 1, Topic: sense_hat/sensors/pressure, QoS: 0)
Client mosqsub|8121-kerhoas-de received SUBACK
Subscribed (mid: 1): 0
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (17 bytes))
1026.799560546875
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (16 bytes))
1026.84814453125
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (15 bytes))
1026.8330078125
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (17 bytes))
1026.826416015625
Client mosqsub|8121-kerhoas-de received PUBLISH (d0, q0, r0, m0, 'sense_hat/sensors/pressure', ... (15 bytes))
1026.8408203125
...

RPI Side

publisher_sense_hat_pressure.py
import paho.mqtt.client as mqttClient
import time
from sense_hat import SenseHat

################################################################# 
def on_connect(client, userdata, flags, rc):
 
    if rc == 0:
        print("Connected to broker")
        global Connected               
        Connected = True                
 
    else:
        print("Connection failed")
#################################################################
def on_publish(client,userdata,result):
    print("data published \n")
    pass
################################################################
 
Connected = False

broker_address= "localhost"
port = 1883

client = mqttClient.Client("Python_publisher")     #create new instance
client.on_connect= on_connect                      #attach function to callback
client.on_publish = on_publish                     #attach function to callback 
client.connect(broker_address, port=port)          #connect to broker
client.loop_start()                                #start the loop
 
while Connected != True:                           #Wait for connection
    time.sleep(0.1)
 
#-------------------------------------------------------------------------
try:
    while True:
        sense = SenseHat()
        press = sense.get_pressure()
        temp = sense.get_temperature()

        print("publishing to sense_hat/sensors/pressure")
        client.publish("sense_hat/sensors/pressure", payload = press)
        time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
    print("exiting")
    client.disconnect()
    client.loop_stop()

adding GPIO Led Control

mqtt_3.svg

PC Side

$  mosquitto_pub -d -t sense_hat/gpio/switch_led -m "LED_ON" -h "192.168.0.2"
subscriber_to_sense_hat.py
import paho.mqtt.client as mqtt
import time

########################################################################
def on_connect(client, userdata, flags, rc):
    if rc == 0:
         print("Connected to broker")
         global Connected                
         Connected = True                
    else:
         print("Connection failed")
########################################################################
def on_receive_pressure(client, userdata, message):
    print("pressure= " ,str(message.payload.decode("utf-8")))
########################################################################
def on_receive_led_state(client, userdata, message):
    print("led_state= " ,str(message.payload.decode("utf-8")))
########################################################################

Connected = False

broker_address="192.168.0.2"
port = 1883                    

print("creating new instance")
client = mqtt.Client("python_test")
client.on_connect=on_connect
client.message_callback_add("sense_hat/sensors/pressure", on_receive_pressure)
client.message_callback_add("sense_hat/gpio/led_state", on_receive_led_state)
print("connecting to broker")
client.connect(broker_address, port)  #connect to broker
client.loop_start()                   #start the loop

while Connected != True:              #Wait for connection
    time.sleep(0.1)

print("Subscribing to topics","sense_hat/sensors/pressure and sense_hat/gpio/led_state")
client.subscribe("sense_hat/sensors/pressure")
client.subscribe("sense_hat/gpio/led_state")
#------------------------------------------------------------------------- 
try:
    while True:	
        time.sleep(1)
#------------------------------------------------------------------------- 
except KeyboardInterrupt:
    print("exiting")
    client.disconnect()
    client.loop_stop()
$ python3 subscriber_to_sense_hat.py 
creating new instance
connecting to broker
Connected to broker
Subscribing to topics sense_hat/sensors/pressure and sense_hat/gpio/led_state
pressure=  1026.8740234375
pressure=  1026.855712890625
led_state=  ON
pressure=  1026.87548828125
pressure=  1026.8662109375
led_state=  OFF
pressure=  1026.86083984375
...

RPI Side

pi_sense_hat_app.py
import paho.mqtt.client as mqttClient
import time
from sense_hat import SenseHat
import subprocess

################################################################# 
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to broker")
        global Connected               
        Connected = True                
    else:
        print("Connection failed")
#################################################################
def on_message(client, userdata, message):
    print("message received " ,str(message.payload.decode("utf-8")))
    if message.payload.decode("utf-8") == "LED_ON":
        print("action==true")
        bashCommand = "echo 1 > /sys/class/gpio/gpio4/value"
        output = subprocess.check_output(['bash','-c', bashCommand]) 
        ledS="ON"    
    else: 
        print("action==false")
        bashCommand = "echo 0 > /sys/class/gpio/gpio4/value"
        output = subprocess.check_output(['bash','-c', bashCommand]) 
        ledS="OFF"
    client.publish("sense_hat/gpio/led_state", payload=ledS)
################################################################
def on_publish(client,userdata,result):
    print("data published \n")
    pass
################################################################
 
Connected = False

broker_address= "localhost"
port = 1883

client = mqttClient.Client("rpi")  #create new instance
client.on_connect = on_connect                      #attach function to callback
client.on_publish = on_publish                     #attach function to callback 
client.on_message = on_message                     #attach function to callback 
client.connect(broker_address, port=port)          #connect to broker
client.loop_start()                                #start the loop
 
while Connected != True:                           #Wait for connection
    time.sleep(0.1)

client.subscribe("sense_hat/gpio/switch_led") 

#-------------------------------------------------------------------------
try:
    while True:
        sense = SenseHat()
        press = sense.get_pressure()
        temp = sense.get_temperature()

        print("publishing to sense_hat/sensors/pressure")
        client.publish("sense_hat/sensors/pressure", payload = press)
        time.sleep(1)
#-------------------------------------------------------------------------
except KeyboardInterrupt:
    print("exiting")
    client.disconnect()
    client.loop_stop()

web browser display : MQTT + socket io

mqtt_socketio.zip

web browser can’t support MQTT ; we can make a bridge between our MQTT socket and a websocket.
Here is an example with nodejs:

mqtt_4.svg

Raspberry PI Side

In a first terminal :

$ node webserver.js

In a 2nd terminal :

$ node sensor_app.js

PC Side

Open a web browser and write “192.168.0.2:8080”

www/index.html
Le fichier iot/linux-server-programming/mqtt-sensor-app/mqtt_socketio/www/index.html n'a pas été trouvé à l'emplacement spécifié.
webserver.js
Le fichier iot/linux-server-programming/mqtt-sensor-app/mqtt_socketio/webserver.js n'a pas été trouvé à l'emplacement spécifié.
sensor_app.js
Le fichier iot/linux-server-programming/mqtt-sensor-app/mqtt_socketio/sensor_app.js n'a pas été trouvé à l'emplacement spécifié.

Adding Security

Authentification

$ sudo mosquitto_passwd -c /etc/mosquitto/passwd sense
Password:
Reenter password:  

$ sudo nano /etc/mosquitto/conf.d/auth.conf
password_file /etc/mosquitto/passwd
allow_anonymous false

$ sudo systemctl restart mosquitto.service
$ mosquitto_pub -d -t test/message -m "hello" -h localhost -u "sense" -P "password"

TLS Encryption

cf securing-connections-with-ssl-tls

# CERTIFICATION AUTHORITY KEY GENERATION
$ openssl genrsa -aes256 -out ca.key 2048

# SELF SIGNED CERTIFICATION REQUEST
$ openssl req -new -x509 -days 3600 -key ca.key -out ca.crt -subj '/C=FR/ST=Bretagne/L=Brest/O=ENIB/OU=RX/CN=raspberrypi.local'

# BROKER ( SERVER )  KEY GENERATION
$ openssl genrsa -aes256 -out server.key 2048
# REM : no -aes256 : no crypted and password protected key, otherwise the broker will be blocked at start  

# CERTIFICATION REQUEST
$ openssl req -new -out server.csr -key server.key
# !! common name : raspberrypi.local ( by default ) 

# GETTING CERTIFICATE
$ openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 1800

$ sudo cp -rv  server.key server.crt ca.crt /etc/mosquitto

# CERTIFICATE CHECK
$  openssl x509 -noout -fingerprint -sha1 -text -in /etc/mosquitto/server.crt

$ sudo nano /etc/mosquitto/conf.d/tls.conf
port 8883
cafile /etc/mosquitto/ca.crt
keyfile /etc/mosquitto/server.key
certfile /etc/mosquitto/server.crt
tls_version tlsv1
$ mosquitto_sub -v -d -h raspberrypi.local -p 8883  --tls-version tlsv1 --cafile /etc/mosquitto/ca.crt -t test/message
$ mosquitto_pub -d -h raspberrypi.local -t test/message -p 8883 --tls-version tlsv1 --cafile /etc/mosquitto/ca.crt -m "hello"

MQTT on Pycom board (micropython)

Installations

$ sudo add-apt-repository ppa:webupd8team/atom
$ sudo apt-get update 
$ sudo apt-get install atom
$ sudo usermod -a -G dialout [you_name]

On atom, install Pymakr package.

Application

pycom_mqtt_test.zip

We suppose mosquitto broker is at address “192.168.200.179”.

from network import WLAN
from mqtt import MQTTClient
import machine
import time

def settimeout(duration):
    pass

print("trying to connect")
wlan = WLAN(mode=WLAN.STA)
wlan.antenna(WLAN.EXT_ANT)
print(wlan.ifconfig())
nets = wlan.scan()
print(str(nets))
wlan.connect(ssid='enib', auth=(WLAN.WPA2, '[pwd_enib]'))
#wlan.connect('eduroam', auth=(WLAN.WPA2_ENT, '[my_log]','[my_pwd]'))

while not wlan.isconnected():
    pass #machine.idle()
print("Connected to Wifi\n")

client = MQTTClient("broker", "192.168.200.179", port=1883)
client.settimeout = settimeout
client.connect()

while True:
     client.publish("test/message", "hello")
     print("Sending hello")
     time.sleep(1)

Once the program is running on pycom, in a terminal :

$ mosquitto_sub -d -h 192.168.200.179 -t test/message
Client mosqsub/981-raspberrypi sending CONNECT
Client mosqsub/981-raspberrypi received CONNACK
Client mosqsub/981-raspberrypi sending SUBSCRIBE (Mid: 1, Topic: test/message, QoS: 0)
Client mosqsub/981-raspberrypi received SUBACK
Subscribed (mid: 1): 0
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))
hello
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))
hello
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))
hello
Client mosqsub/981-raspberrypi received PUBLISH (d0, q0, r0, m0, 'test/message', ... (5 bytes))