Preface

Goal: Realtime data plot using Pandas.

Instead of generating random data internally, we separate the data source from the receiver. This can be done easily with MQTT.

We can also adjust the source data, so that we can simulate a changing sensor value, such as temperature.

With a real sensor, we could change the thermocouple reading directly, by heating it with a lighter or another heat source, and by cooling it with a cold material.

Preparation

Run MQTT broker

❯ mosquitto -d
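Before running the scripts, it can help to confirm that something is actually listening on the broker port. The sketch below is a minimal check, assuming the broker runs on localhost with the default port 1883. The helper name broker_is_listening is hypothetical, and the check only proves that a TCP connection is accepted, not that the service behind it speaks MQTT.

```python
import socket

def broker_is_listening(host="localhost", port=1883, timeout=1.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        # create_connection raises OSError when the port is closed or unreachable
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print("broker reachable:", broker_is_listening())
```

If this prints False, start mosquitto first, then run the publisher and the subscriber.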

Publisher and Subscriber

Producer and Consumer

or

Sender and Receiver

This MQTT example requires two running scripts:

  1. Publisher, and
  2. Subscriber

MQTT: Publisher and Subscriber


1: Publisher

Produce MQTT Packet containing random values

Official Documentation

Consider reading the official documentation before going through this article thoroughly.

Python Source Code

You can just read the source as an overview.

Package Requirement

Paho MQTT

We need Paho MQTT to run this script.

import paho.mqtt.client as mqtt
import random

from time import sleep

MQTT: Publisher: Brief

Skeleton

The class consists of only the methods below:

class mqPublisher:
  def __init__(self): ...
  def update_num(self): ...
  def main(self): ...

MQTT: Publisher: Skeleton

Constructor

Exactly like our previous class.

class mqPublisher:
  def __init__(self):
    # save initial parameter
    self.lim_lower  =  5
    self.lim_upper  = 40
    self.num_normal = 21
    self.current    = self.num_normal

MQTT: Publisher: Constructor

Updating Values

With Random Number

The same method again, reused with no changes.

  def update_num(self):
    while True:
      neo = self.current
      neo += random.uniform(-5, 5)
      if self.lim_lower <= neo <= self.lim_upper:
        break

    self.current = neo
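To see how this bounded random walk behaves without any MQTT involved, here is a standalone sketch of the same idea. The function name next_value and its parameters are only for illustration; the limits 5 and 40, the step of 5, and the starting value 21 mirror the class above.

```python
import random

def next_value(current, lower=5, upper=40, step=5):
    """Return a new value near `current`, retrying until it is within bounds."""
    while True:
        candidate = current + random.uniform(-step, step)
        if lower <= candidate <= upper:
            return candidate

random.seed(1)  # fixed seed only so repeated runs print the same sequence
value = 21
history = []
for _ in range(10):
    value = next_value(value)
    history.append(value)

print(["%.2f" % v for v in history])
```

Each value stays within the limits and moves at most one step away from the previous one, which is what makes the signal look like a slowly drifting sensor.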

MQTT: Publisher: Updating Values

Main Method

The only difference is here: we send the packet to the network, instead of printing the number to the console.

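  def main(self):
    # connect to the local broker on the default port
    client = mqtt.Client()
    client.connect("localhost", 1883, 60)

    try:
      while True:
        self.update_num()
        client.publish(
          "topic/signal", str(self.current))

        sleep(1)
    except KeyboardInterrupt:
      pass
    finally:
      # reached when the loop is interrupted with Ctrl+C
      client.disconnect()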

MQTT: Publisher: Main

Execute: Run

Now run the publisher script.

mqPub = mqPublisher()
mqPub.main()

Nothing appears to happen in the console.

❯ python 11-pub.py

An MQTT packet is sent to the network every second.

We need to capture the packets using a subscriber.


2: Subscriber

Consume MQTT Packet containing any value

The script is very short. But be aware that it will grow into a more complex script later.

Python Source Code

Reading the source would help in understanding this article.

Package Requirement

Paho MQTT

import paho.mqtt.client as mqtt

MQTT: Subscriber: Brief

Skeleton

The class only contains event handlers and a main method.

class mqSubscriber:
  def on_connect(self,
      client, userdata, flags, rc): ...
  def on_message(self,
      client, userdata, msg): ...
  def main(self): ...

MQTT: Subscriber: Skeleton

Constructor

So short that this class does not even need a constructor.

Event Handler: On Connect

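Paho MQTT calls this event handler when the broker responds to the connection request. Subscribing here also renews the subscription if the client reconnects.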

  def on_connect(self,
      client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("topic/signal")

MQTT: Subscriber: On Connect

Event Handler: On Message

Capturing Packet

  def on_message(self,
      client, userdata, msg):
    float_num = float(msg.payload.decode())
    print("%.2f" % float_num)
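The handler above assumes every payload is a number. Since any client can publish to the topic, a more defensive version might check the payload first. This is a minimal sketch: parse_reading is a hypothetical helper, not part of the original script, and returning None for bad input is just one possible choice.

```python
def parse_reading(payload):
    """Decode an MQTT payload (bytes) into a float, or None if it is not numeric."""
    try:
        return float(payload.decode())
    except (UnicodeDecodeError, ValueError):
        # covers both undecodable bytes and text that is not a number
        return None

for raw in (b"20.32", b"not-a-number", b"\xff\xfe"):
    print(raw, "->", parse_reading(raw))
```

Inside on_message, we could call parse_reading(msg.payload) and simply skip the message when the result is None.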

MQTT: Subscriber: On Message

Main Method

The main method is adapted from the official documentation.

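  def main(self):
    client = mqtt.Client()

    # register the handlers before connecting
    client.on_connect = self.on_connect
    client.on_message = self.on_message

    client.connect("localhost", 1883, 60)

    # block here and dispatch network events to the handlers
    client.loop_forever()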

MQTT: Subscriber: Main

Dual Output in CLI

We can run it by creating a class instance, and then calling the main method.

mqSub = mqSubscriber()
mqSub.main()

Now run both scripts, the publisher and the subscriber. The subscriber script will show the captured values.

❯ python 11-sub.py
Connected with result code 0
20.32
17.19
16.10
18.93
15.44
12.69
11.13
11.56
16.30
13.54
15.25

MQTT: Subscriber: Dual Panel Script

You can run two subscriber scripts and see what happens. Both receive the same values.

Comparing Topic

The topic flows between the publisher and the subscriber.

You may examine how the topic is carried from one script to the other. Both scripts must use the same topic string, topic/signal.

MQTT: Topic Compare

Running Two Publisher

Be creative! Make your own experiment.

Note that a value can be received by the subscriber at any time. You can play around by running two publisher scripts with different initial values, and see what happens.

This got me thinking about something else: what if I could adjust the value on the fly? Then I could examine the two publisher scripts better.


3: Adjustable Publisher

Running task asynchronously

We actually want this script to run two separate tasks concurrently:

  1. Publishing Number
  2. Adjusting Number

Python Source Code

Use the source, Luke.

For quick overview, just use the source.

Package Requirement

Besides asyncio, we require aioconsole to handle console input.

import paho.mqtt.client as mqtt
import asyncio
import random
from aioconsole import ainput

MQTT: Adjustable Publisher: Brief

Skeleton

We need to separate the tasks, produce and adjust, as shown in the skeleton below:

class mqPublisher:
  def __init__(self): ...

  def update_num(self): ...
  def set_within(self, num): ...

  async def do_produce(self): ...
  async def do_adjust(self): ...
  async def main(self): ...

mqPub = mqPublisher()
asyncio.run(mqPub.main())

MQTT: Adjustable Publisher: Skeleton

Constructor

No more event handler

We can put the MQTT connection inside the constructor or the main method, whichever suits your case.

class mqPublisher:
  def __init__(self):
    # save initial parameter
    self.lim_lower  =  5
    self.lim_upper  = 70
    self.num_normal = 21
    self.current    = self.num_normal

    self.client = mqtt.Client()
    self.client.connect("localhost",1883,60)

MQTT: Adjustable Publisher: Constructor

Main

Managing Asynchronous Tasks

The main method manages the two tasks separately, using the async and await keywords, so that both tasks run concurrently without blocking each other.

  async def main(self):
    task_produce = asyncio.create_task(
      self.do_produce())
    task_adjust = asyncio.create_task(
      self.do_adjust())

    await task_produce
    await task_adjust
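If the task mechanics are unfamiliar, this standalone sketch shows the same pattern with two dummy workers instead of MQTT and console input. The worker function, its names, and the short intervals are made up for illustration. It also uses asyncio.gather, which is an alternative to awaiting each task in turn.

```python
import asyncio

async def worker(name, interval, count, log):
    """Append (name, i) to log `count` times, sleeping `interval` seconds between."""
    for i in range(count):
        log.append((name, i))
        await asyncio.sleep(interval)

async def main():
    log = []
    # create_task schedules both coroutines on the event loop right away
    task_a = asyncio.create_task(worker("produce", 0.02, 3, log))
    task_b = asyncio.create_task(worker("adjust", 0.03, 2, log))
    # gather waits for both; awaiting them one after another works too
    await asyncio.gather(task_a, task_b)
    return log

log = asyncio.run(main())
print(log)
```

The printed log shows entries from both workers mixed together, which is the behaviour we want from the produce and adjust tasks.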

MQTT: Adjustable Publisher: Main

Updating Values

Still intact, no changes.

Adjusting Values

Using AIOConsole

We need to wrap this task in an async method.

All we need to do is wait for an adjustment to be entered, then keep checking whether the current value has reached the adjustment value.

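  async def do_adjust(self):
    while True:
      # wait for console input without blocking the producer task
      text = await ainput('Adjustment: ')
      try:
        float_num = float(text)
      except ValueError:
        # ignore input that is not a number and ask again
        print("Not a number: %s" % text)
        continue

      float_num = self.set_within(float_num)
      print("Adjust to: %.2f" % float_num)

      # move halfway toward the target each second until within tolerance
      while abs(self.current - float_num) > 1:
        self.current = (self.current + float_num)/2
        await asyncio.sleep(1)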

MQTT: Adjustable Publisher: Adjusting Values

Changing Value

Gradually changed

Changing the value instantly would look unnatural. Instead we move step by step, taking the mean of the current value and the target:

self.current = (self.current + float_num)/2

To check whether the value is near the adjustment value, I use a tolerance of one unit.

while abs(self.current - float_num) >  1:

You can change the tolerance to whatever number suits your case.

Smoother Change

You can even make the transition smoother.

      while abs(self.current - float_num) >  1:
        fourth = (float_num - self.current)/4
        self.current = self.current + fourth
        await asyncio.sleep(0.7)

However, the data is still only sent once per second.
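To get a feel for how many steps each transition needs, here is a small sketch that replays both update rules on their own. It deliberately ignores the random change from update_num and the sleep between steps, so the real script will wander a bit around these values. The function name approach is hypothetical.

```python
def approach(current, target, fraction, tolerance=1):
    """Return the values visited while moving `fraction` of the gap per step."""
    steps = []
    while abs(current - target) > tolerance:
        current = current + (target - current) * fraction
        steps.append(current)
    return steps

halving = approach(21, 50, 1 / 2)   # same update as (current + target) / 2
quarter = approach(21, 50, 1 / 4)   # same update as current + (target - current) / 4

print(len(halving), ["%.2f" % v for v in halving])
print(len(quarter), ["%.2f" % v for v in quarter])
```

Going from 21 to 50 under these assumptions, halving needs 5 steps while the quarter step needs 12, which is why the second transition looks smoother.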

Limiting Adjustment

Controlling Value

I also clamp the adjustment, so that the value won't go outside the defined boundaries.

  def set_within(self, num):
    if num < self.lim_lower:
      num = self.lim_lower
    if num > self.lim_upper:
      num = self.lim_upper
    return num
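The same limiting can be written more compactly with max and min. This is only an alternative sketch: clamp is a hypothetical standalone helper, and the limits 5 and 70 are taken from the constructor above.

```python
def clamp(num, lower, upper):
    """Return num limited to the closed range [lower, upper]."""
    return max(lower, min(num, upper))

for requested in (-3, 21, 99.5):
    print(requested, "->", clamp(requested, 5, 70))
```

Either form works. The explicit if statements are easier to read for beginners, while the one-liner is handy when the same check is needed in several places.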

MQTT: Adjustable Publisher: Limiting Adjustment

Publishing Signal

Produce MQTT Packet

We can produce the packet as usual, but this time in an async method.

  async def do_produce(self):
    while True:
      self.update_num()
      self.client.publish(
        "topic/signal", str(self.current))
    
      await asyncio.sleep(1)
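Note that the pause is now asyncio.sleep, not the sleep from the time module used in the first publisher. The sketch below, with made-up producer and adjuster coroutines, shows why this matters: a blocking sleep never hands control back to the event loop, so the other task cannot run in between.

```python
import asyncio
import time

async def blocking_pause(seconds):
    # time.sleep never yields, so the event loop is stuck until it returns
    time.sleep(seconds)

async def producer(log, pause):
    for i in range(3):
        log.append("produce %d" % i)
        await pause(0.01)

async def adjuster(log):
    for i in range(3):
        log.append("adjust %d" % i)
        await asyncio.sleep(0.01)

async def run_both(pause):
    log = []
    await asyncio.gather(producer(log, pause), adjuster(log))
    return log

cooperative = asyncio.run(run_both(asyncio.sleep))
blocking = asyncio.run(run_both(blocking_pause))

print("asyncio.sleep:", cooperative)
print("time.sleep   :", blocking)
```

With the blocking pause, the producer finishes all its iterations before the adjuster gets a single turn. In the real script that would mean the Adjustment prompt never gets a chance to respond.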

MQTT: Adjustable Publisher: Produce Signal

Execute: Run

Running the asynchronous tasks is a little bit different.

mqPub = mqPublisher()
asyncio.run(mqPub.main())

MQTT: Adjustable Publisher: Program Entry Point

Dual Output in CLI

We can play with the script and adjust the value:

❯ python 12-pub.py
Adjustment: 50
Adjust to: 50.00
Adjustment: 10
Adjust to: 10.00

The subscriber will receive values that follow the adjustment.

❯ python 11-sub.py
Connected with result code 0
...
22.14
22.01
36.35
43.98
47.82
48.87
50.00
49.21
50.21
30.79
19.79
15.64
13.01
12.07
11.90
10.21
11.20
...

MQTT: Adjustable Publisher: Dual Panel Script

This would be clear with this video below:

Again, play with two publishers to examine the MQTT behaviour, and see what happens.


What is Next 🤔?

This is not the only way to handle MQTT. We need to explore more alternatives and make some enhancements, such as asynchronous MQTT and handling the data as a time series.

Consider continuing with [ Python - MQTT - Enhanced ].