Preface
Goal: Realtime data plot using Pandas.
Instead of internal random data. We separate the source data, and the receiver. This can be done easily with MQTT.
And we can also adjust the source data, so we can simulate sensor value change, such as temperature and so on.
In real sensor, we can change thermocouple directly, by giving heat from lighter, or other source. And take the heat by cold material.
Preparation
Run MQTT broker
❯ mosquitto -d
Publisher and Subscriber
Producer and Customer
or
Sender and Receiver
This MQTT example require two running scripts.
- Publisher and,
- Subscriber
1: Publisher
Produce MQTT Packet containing random values
Official Documentation
You should consider to read the official documentation, before reading this article thoroughly.
Python Source Code
You can just read the source as an overview.
Package Requirement
Paho MQTT
We need Paho MQTT
to run this script.
import paho.mqtt.client as mqtt
import random
from time import sleep
Skeleton
This only consist these methods below:
class mqPublisher:
def __init__(self):
def update_num(self):
def main(self):
Constructor
Exactly like our previous class.
class mqPublisher:
def __init__(self):
# save initial parameter
self.lim_lower = 5
self.lim_upper = 40
self.num_normal = 21
self.current = self.num_normal
Updating Values
With Random Number
Just this method. Over and over again. No changes intact.
def update_num(self):
while True:
neo = self.current
neo += random.uniform(-5, 5)
if self.lim_lower <= neo <= self.lim_upper:
break
self.current = neo
Main Method
The only differences is here. We sent packet to network, instead of print number in console.
def main(self):
client = mqtt.Client()
client.connect("localhost",1883,60)
while True:
self.update_num()
client.publish(
"topic/signal", str(self.current))
sleep(1)
client.disconnect();
Execute: Run
Consider run the publisher script.
mqPub = mqPublisher()
mqPub.main()
Nothing will be happened.
❯ python 11-pub.py
Every MQTT packet sent to network, every one second.
We need to capture the packet using subscriber.
2: Subscriber
Consume MQTT Packet containing any value
The script is very short. But be aware, that it is going to grow to a complex script later.
Python Source Code
Reading the source would help in understanding this article.
Package Requirement
Paho MQTT
import paho.mqtt.client as mqtt
Skeleton
The class only contain event handler.
class mqSubscriber:
def on_connect(self,
client, userdata, flags, rc):
def on_message(self,
client, userdata, msg):
def main(self):
Constructor
So short that this class does not even need a constructor.
Event Handler: On Connect
The PAHO MQTT require us to use this event handler.
def on_connect(self,
client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("topic/signal")
Event Handler: On Message
Capturing Packet
def on_message(self,
client, userdata, msg):
float_num = float(msg.payload.decode())
print("%.2f" % float_num)
Main Method
The main class copied from official documentation.
def main(self):
client = mqtt.Client()
client.connect("localhost",1883,60)
client.on_connect = self.on_connect
client.on_message = self.on_message
client.loop_forever()
Dual Output in CLI
We can run bu making class instance, and then call main methods.
mqSub = mqSubscriber()
mqSub.main()
Consider run both script, the publisher and subscriber. The subscriber script will show captured value.
❯ python 11-sub.py
Connected with result code 0
20.32
17.19
16.10
18.93
15.44
12.69
11.13
11.56
16.30
13.54
15.25
You can run two subscriber script, and see what happened. Both would receive the same value.
Comparing Topic
Between The Publisher and The Subscriber, Flow the Topic.
You may examine, how the topic from one script to another.
Running Two Publisher
Be creative! Make your own experiment.
Note that value can be received anytime by the subscriber. You can play, running two publisher script, with different initial value, and see what happened.
This got me to think anything else. What if I can adjust the value on the fly? So I can examine two publisher script better.
3: Adjustable Publisher
Running task asynchronously
We are actually want this script to do separate task concurrently.
- Publishing Number
- Adjusting Number
Python Source Code
Use the source, Luke.
For quick overview, just use the source.
Package Requirement
Beside the asyncio
,
we require the aioconsole
to handle the console input.
import paho.mqtt.client as mqtt
import asyncio
import random
from aioconsole import ainput
Skeleton
We need to separate the task:
produce
and adjust
as shown in skeleton below:
class mqPublisher:
def __init__(self):
def update_num(self):
def set_within(self, num):
async def do_produce(self):
async def do_adjust(self):
async def main(self):
mqPub = mqPublisher()
asyncio.run(mqPub.main())
Constructor
No more event handler
We can put MQTT connection inside the constructor, or main method, whatever suit your case.
class mqPublisher:
def __init__(self):
# save initial parameter
self.lim_lower = 5
self.lim_upper = 70
self.num_normal = 21
self.current = self.num_normal
self.client = mqtt.Client()
self.client.connect("localhost",1883,60)
Main
Managing Asynchronous Tasks
The main method manage two task separately,
using async
and await
keyword.
So that both tasks run concurrently,
without affecting each other.
async def main(self):
task_produce = asyncio.create_task(
self.do_produce())
task_adjust = asyncio.create_task(
self.do_adjust())
await(task_produce)
await(task_adjust)
Updating Values
Still intact no changes.
Adjusting Values
Using AIOConsole
We need to wrap this task in async method.
All we need to do is waiting to input adjustment. Then check if the next value is already reach the adjustment value.
async def do_adjust(self):
while True:
float_num = float(
await ainput('Adjustment: '))
float_num = self.set_within(float_num)
print("Adjust to: %.2f" % float_num)
while abs(self.current - float_num) > 1:
self.current = (self.current + float_num)/2
await asyncio.sleep(1)
Changing Value
Gradually changed
Changing the value instantly, is considered unnatural. We need to move step by step, by taking the mean value between two number:
self.current = (self.current + float_num)/2
To check if the value is near adjusment value, I give tolerance within one number.
while abs(self.current - float_num) > 1:
You can change the tolerance value, with whatever number suitable for your case.
Smoother Change
You can even make the transition smoother.
while abs(self.current - float_num) > 1:
fourth = (float_num - self.current)/4
self.current = self.current + fourth
await asyncio.sleep(0.7)
However, the data only be sent for each one second interval.
Limiting Adjustment
Controlling Value
I also add value controlling. So that the value won’t go out of defined boundary.
def set_within(self, num):
if num < self.lim_lower:
num = self.lim_lower
if num > self.lim_upper:
num = self.lim_upper
return num
Publishing Signal
Produce MQTT Packet
We can produce the packet as usual.
But this time in async
method.
async def do_produce(self):
while True:
self.update_num()
self.client.publish(
"topic/signal", str(self.current))
await asyncio.sleep(1)
Execute: Run
Running asynchronous task would be a little bit diffeent.
mqPub = mqPublisher()
asyncio.run(mqPub.main())
Dual Output in CLI
We can play with the script and adjust the value
❯ python 12-pub.py
<div class="highlight"><pre style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4"><code class="language-bash" data-lang="bash"></code></pre></div>
Adjustment: 50
Adjust to: 50.00
Adjustment: 10
Adjust to: 10.00
The subscriber will received the value according to the adjustment.
❯ python 11-sub.py
<div class="highlight"><pre style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4"><code class="language-bash" data-lang="bash"></code></pre></div>
❯ python 11-sub.py
Connected with result code 0
...
22.14
22.01
36.35
43.98
47.82
48.87
50.00
49.21
50.21
30.79
19.79
15.64
13.01
12.07
11.90
10.21
11.20
...
This would be clear with this video below:
Again. play with two publisher to examine MQTT behaviour. And see what happened.
What is Next 🤔?
This is not the only way to handle MQTT. We need to explore more alternative, and make some enhancement, such as asynchronous MQTT, and handling data in time series fashioned.
Consider continue reading [ Python - MQTT - Enhanced ].