Goal: Realtime data plot using Pandas.
Instead of generating random data internally, we separate the data source from the receiver. This can be done easily with MQTT.
We can also adjust the source data, so we can simulate a change in a sensor value, such as temperature.
With a real sensor, we can change the thermocouple reading directly, by heating it with a lighter or another heat source, and then cooling it with a cold material.
Run MQTT broker
❯ mosquitto -d
Publisher and Subscriber
Producer and Consumer
Sender and Receiver
This MQTT example requires two running scripts:
- Publisher, and
- Subscriber.
Produce MQTT Packet containing random values
Consider reading the official documentation before going through this article in detail.
Python Source Code
You can just read the source as an overview.
You need Paho MQTT to run this script.

    import paho.mqtt.client as mqtt
    import random
    from time import sleep
The class consists of only these methods:

    class mqPublisher:
        def __init__(self): ...
        def update_num(self): ...
        def main(self): ...
Exactly like our previous class.

    class mqPublisher:
        def __init__(self):
            # save initial parameter
            self.lim_lower = 5
            self.lim_upper = 40
            self.num_normal = 21
            self.current = self.num_normal
With Random Number
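This is the same method as before, with no changes. It retries until the next value falls within the limits.

    def update_num(self):
        while True:
            # propose a neighbour of the current value
            neo = self.current
            neo += random.uniform(-5, 5)
            # accept it only when it stays within the limits
            if self.lim_lower <= neo <= self.lim_upper:
                break
        self.current = neo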
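The bounded random walk above can be tried on its own, without a broker. The following is only a minimal sketch: the function name `next_value`, its parameters, and the fixed seed are assumptions made for illustration, not part of the original script. It also assumes the starting value already lies within the limits, otherwise the retry loop might never end.

```python
import random


def next_value(current, lower=5.0, upper=40.0, step=5.0, rng=random):
    """Return a neighbour of `current` that stays within [lower, upper]."""
    while True:
        candidate = current + rng.uniform(-step, step)
        if lower <= candidate <= upper:
            return candidate


# Simulate 1000 readings, starting from the article's normal value of 21.
rng = random.Random(42)  # fixed seed, only to make the run repeatable
value = 21.0
readings = []
for _ in range(1000):
    value = next_value(value, rng=rng)
    readings.append(value)

# Every reading stays inside the limits.
print(min(readings) >= 5.0, max(readings) <= 40.0)
```

Rejecting out-of-range candidates, rather than clipping them, keeps the walk from sticking to the limits.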
The only difference is here. We send a packet to the network, instead of printing the number to the console.

    def main(self):
        client = mqtt.Client()
        client.connect("localhost", 1883, 60)

        try:
            # publish the current value once per second
            while True:
                self.update_num()
                client.publish("topic/signal", str(self.current))
                sleep(1)
        finally:
            # reached on Ctrl+C or on any error
            client.disconnect()
Now consider running the publisher script.

    mqPub = mqPublisher()
    mqPub.main()

Nothing appears to happen in the console.
❯ python 11-pub.py
An MQTT packet is sent to the network every second.
We need a subscriber to capture these packets.
Consume MQTT Packet containing any value
The script is very short. But be aware that it will grow into a more complex script later.
Python Source Code
Reading the source would help in understanding this article.
import paho.mqtt.client as mqtt
The class contains only the event handlers and the main method.

    class mqSubscriber:
        def on_connect(self, client, userdata, flags, rc): ...
        def on_message(self, client, userdata, msg): ...
        def main(self): ...

It is so short that this class does not even need a constructor.
Event Handler: On Connect
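Paho MQTT calls this event handler once the broker acknowledges the connection. Subscribing here means the subscription is renewed whenever the client reconnects.

    def on_connect(self, client, userdata, flags, rc):
        print("Connected with result code "+str(rc))
        client.subscribe("topic/signal")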
Event Handler: On Message

    def on_message(self, client, userdata, msg):
        # the payload arrives as bytes, so decode it before converting
        float_num = float(msg.payload.decode())
        print("%.2f" % float_num)
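The handler above assumes every payload is a valid number. Since any client may publish to the same topic, a more defensive conversion could look like the sketch below. The helper name `parse_reading` is hypothetical, and returning `None` for bad input is just one possible design choice.

```python
def parse_reading(payload: bytes):
    """Decode an MQTT payload into a float, or return None if it is not numeric."""
    try:
        return float(payload.decode("utf-8"))
    except (UnicodeDecodeError, ValueError):
        return None


# Two valid readings, one text message, and one invalid byte sequence.
for raw in (b"20.32", b" 17.19\n", b"hello", b"\xff\xfe"):
    print(raw, "->", parse_reading(raw))
```

Inside `on_message`, the result could then be checked against `None` before printing, so one malformed packet does not raise an exception in the callback.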
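The main method is adapted from the official documentation. The handlers are assigned before connecting, so no event can be missed.

    def main(self):
        client = mqtt.Client()
        client.on_connect = self.on_connect
        client.on_message = self.on_message

        client.connect("localhost", 1883, 60)
        # block here and dispatch incoming packets to the handlers
        client.loop_forever()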
Dual Output in CLI
We can run it by making a class instance and then calling the main method.

    mqSub = mqSubscriber()
    mqSub.main()

Consider running both scripts, the publisher and the subscriber. The subscriber script will show the captured values.
❯ python 11-sub.py
Connected with result code 0
20.32
17.19
16.10
18.93
15.44
12.69
11.13
11.56
16.30
13.54
15.25
You can run two subscriber scripts and see what happens. Both will receive the same values.
Between the Publisher and the Subscriber Flows the Topic.
You may examine how the topic flows from one script to another.
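To picture why both subscribers see the same values, here is a toy in-process model of the fan-out. It is only a sketch: `ToyBroker` is a hypothetical class written for this illustration, not Mosquitto and not part of Paho, and it ignores networking, wildcards, QoS and retained messages.

```python
from collections import defaultdict


class ToyBroker:
    """In-process stand-in for a broker: one topic, many subscribers."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        # every subscriber of the topic receives the same message
        for callback in self._subscribers[topic]:
            callback(topic, payload)


broker = ToyBroker()
received_a, received_b = [], []
broker.subscribe("topic/signal", lambda topic, payload: received_a.append(payload))
broker.subscribe("topic/signal", lambda topic, payload: received_b.append(payload))

for value in ("20.32", "17.19", "16.10"):
    broker.publish("topic/signal", value)

print(received_a == received_b)
```

The publisher never knows how many subscribers exist. It only names the topic, and the broker takes care of delivery.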
Running Two Publisher
Be creative! Make your own experiment.
Note that a value can be received by the subscriber at any time. You can experiment by running two publisher scripts with different initial values, and see what happens.
This got me thinking about something else. What if I could adjust the value on the fly? Then I could examine the two publisher scripts better.
3: Adjustable Publisher
Running task asynchronously
We actually want this script to do two separate tasks concurrently:
- Publishing Number
- Adjusting Number
Python Source Code
Use the source, Luke.
For a quick overview, just read the source.
We require the aioconsole library
to handle the console input.

    import paho.mqtt.client as mqtt
    import asyncio
    import random
    from aioconsole import ainput

We need to separate the tasks,
produce and adjust, as shown in the skeleton below:

    class mqPublisher:
        def __init__(self): ...
        def update_num(self): ...
        def set_within(self, num): ...
        async def do_produce(self): ...
        async def do_adjust(self): ...
        async def main(self): ...

    mqPub = mqPublisher()
    asyncio.run(mqPub.main())
No more event handler
We can put the MQTT connection inside the constructor or the main method, whichever suits your case.

    class mqPublisher:
        def __init__(self):
            # save initial parameter
            self.lim_lower = 5
            self.lim_upper = 70
            self.num_normal = 21
            self.current = self.num_normal

            # connect once, and reuse the client in every task
            self.client = mqtt.Client()
            self.client.connect("localhost", 1883, 60)
Managing Asynchronous Tasks
The main method manages two tasks separately, so that both run concurrently without blocking each other.

    async def main(self):
        # both tasks are scheduled as soon as they are created
        task_produce = asyncio.create_task(self.do_produce())
        task_adjust = asyncio.create_task(self.do_adjust())

        await task_produce
        await task_adjust
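The interleaving of two coroutines can be observed without MQTT or console input. The sketch below is an illustration under stated assumptions: `produce`, `adjust`, and `run_both` are hypothetical stand-ins that only append to a log, and it uses `asyncio.gather`, a standard library alternative to creating and awaiting each task by hand.

```python
import asyncio


async def produce(log, count=3, delay=0.02):
    # stand-in for the publishing loop
    for i in range(count):
        log.append(f"produce {i}")
        await asyncio.sleep(delay)


async def adjust(log, count=3, delay=0.02):
    # stand-in for the adjustment loop
    for i in range(count):
        log.append(f"adjust {i}")
        await asyncio.sleep(delay)


async def run_both():
    log = []
    # run both coroutines concurrently and wait until both finish
    await asyncio.gather(produce(log), adjust(log))
    return log


log = asyncio.run(run_both())
print(log)
```

Each `await asyncio.sleep(...)` hands control back to the event loop, which is what lets the other task run in the meantime. A blocking `time.sleep` in either coroutine would stall both.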
Still intact, with no changes.
We need to wrap this task in an async method.
All we need to do is wait for an adjustment input, then keep moving the current value until it is close enough to the adjustment value.

    async def do_adjust(self):
        while True:
            # wait for a target value typed into the console
            float_num = float(await ainput('Adjustment: '))
            float_num = self.set_within(float_num)
            print("Adjust to: %.2f" % float_num)

            # move halfway toward the target once per second
            while abs(self.current - float_num) > 1:
                self.current = (self.current + float_num)/2
                await asyncio.sleep(1)
Changing the value instantly is considered unnatural. We need to move step by step, by taking the mean of the current value and the target value:

    self.current = (self.current + float_num)/2

To check whether the value is near the adjustment value, I allow a tolerance of one unit.

    while abs(self.current - float_num) > 1:

You can change the tolerance to whatever number is suitable for your case.
You can even make the transition smoother, by moving only a quarter of the remaining distance in each step.

    while abs(self.current - float_num) > 1:
        fourth = (float_num - self.current)/4
        self.current = self.current + fourth
        await asyncio.sleep(0.7)

However, the data is still sent only once per second.
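To compare the two step sizes, the approach loop can be isolated into a small generator. This is a minimal sketch: the name `approach` and its parameters are assumptions for illustration, it assumes `0 < fraction <= 1`, and it leaves out the random noise that `update_num` adds in the real script.

```python
def approach(current, target, fraction=0.5, tolerance=1.0):
    """Yield successive values, closing `fraction` of the remaining gap each step."""
    while abs(current - target) > tolerance:
        current += (target - current) * fraction
        yield current


# Moving from the normal value 21 to an adjustment of 50.
halves = list(approach(21.0, 50.0, fraction=0.5))
quarters = list(approach(21.0, 50.0, fraction=0.25))

print(halves)
print(len(halves), len(quarters))
```

Moving from 21 to 50, halving the gap needs 5 steps to get within the tolerance, while quarter steps need 12. The smoother transition therefore takes noticeably longer to settle.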
I also add a limit check, so that the value will not go outside the defined boundary.

    def set_within(self, num):
        # clip the requested value to the allowed range
        if num < self.lim_lower:
            num = self.lim_lower
        if num > self.lim_upper:
            num = self.lim_upper
        return num
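The same clipping can be written as a one-line expression with the built-in `min` and `max`. The standalone function `clamp` below is a hypothetical equivalent shown for comparison, using the limits 5 and 70 from the constructor above.

```python
def clamp(num, lower, upper):
    """Limit `num` to the closed interval [lower, upper]."""
    return max(lower, min(upper, num))


# Values below, inside, and above the allowed range.
for requested in (-3, 5, 21.5, 70, 120):
    print(requested, "->", clamp(requested, 5, 70))
```

Either form works. The explicit `if` version may be easier to extend later, for example to print a warning when a request is clipped.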
Produce MQTT Packet
We can produce the packet as usual.
But this time inside an async method.

    async def do_produce(self):
        while True:
            self.update_num()
            self.client.publish("topic/signal", str(self.current))
            # yield to the event loop so the adjust task can run
            await asyncio.sleep(1)

Running an asynchronous task is a little bit different.

    mqPub = mqPublisher()
    asyncio.run(mqPub.main())
Dual Output in CLI
We can play with the script and adjust the value.
❯ python 12-pub.py
Adjustment: 50
Adjust to: 50.00
Adjustment: 10
Adjust to: 10.00
The subscriber will receive values that follow the adjustment.
❯ python 11-sub.py
Connected with result code 0
...
22.14
22.01
36.35
43.98
47.82
48.87
50.00
49.21
50.21
30.79
19.79
15.64
13.01
12.07
11.90
10.21
11.20
...
This should be clearer with the video below:
Again, play with two publishers to examine MQTT behaviour, and see what happens.
What is Next 🤔?
This is not the only way to handle MQTT. We need to explore more alternatives and make some enhancements, such as asynchronous MQTT and handling data in a time series fashion.
Consider continuing with [ Python - MQTT - Enhanced ].