Goal: Realtime data plot using Pandas.

This extra mile has nothing to do with MQTT itself.

Disagreeable Code

The fact that the previous code works does not mean that it does not suck. The code above also gives no clarity about how the view boundary limits work.

The class above also does many things at once: collecting data in a dataframe, and drawing the chart. This makes the code less readable, and unreadable code is unpleasant!

My code should be ready to be read by other developers. My understanding is reflected in my code. Shame on me if it looks ridiculously amateurish.

Better Code

Consider a clean-up. Make things simple. Refactor the internal design while preserving your working code. Read it again, to catch mistakes sooner. No code enhancement, no new feature. The behavior remains intact from the external view.

Good separation of concerns can be achieved by making modular classes: extracting the main class into two more supporting classes. This will save you time whenever you read your own code.

Better code does not mean that it does not suck either. But at least I can do something about it.


Face it like a man

We need to decide carefully which property belongs to one specific class, and which property should be available in both supporting classes. Failing to do so would only make the classes more complex.

MQTT: Clean-Up: Overview: Three Panes


I don’t know if what I do is right, or just another blunder. But here it is, what I did.

MQTT: Clean-Up: Class Diagram

I have limited knowledge in UML and class refactoring. Please inform me if there’s something I can improve. Or even if I’m wrong.

A: Overview

Python Source Code

May the open source, be with you.

The source will be split into multiple classes.

Main Program

The main program would be as simple as below:

import asyncio
from mq17Subscriber import mqSubscriber

mqSub = mqSubscriber()

try:
  asyncio.run(mqSub.main())
except KeyboardInterrupt:
  # leave quietly on Ctrl+C
  pass

MQTT: Clean-Up: Overview: Program entry Point

Running Script

You can run the code with this command:

❯ python 17-plot.py
25 : 21:30:08 - 21:30:24

Skeleton: mqSubscriber

You can imagine how simple the main class should be.

And each class skeleton is as below:

class mqSubscriber:

  def __init__(self):
  def update(self, new_num):
  async def main(self):

MQTT: Clean-Up: Class Overview: mqSubscriber Skeleton

My purpose is to get the program flow right, not as quick and dirty as in the previous article.

Skeleton: mqDataFrame

This is almost exactly the same as before, except for the last two property getters.

class mqDataFrame:
  def __init__(self, max_duration):
  def build_df_smooth(self):
  def remove_old(self, index, new_time):
  def update_data(self, index, new_num, new_time):
  def get_timeframe(self):
  def get_df_smooth(self):

MQTT: Clean-Up: Class Overview: mqDataFrame Skeleton

Skeleton: mqChart

class mqChart:
  def __init__(self, max_duration):
  def chart_setup(self):
  def update_ylimit(self, new_num):
  def update_xlimit(self, index, new_time):
  def update_view(self, index, timeframe, df_smooth):

MQTT: Clean-Up: Class Overview: mqChart Skeleton

You can see how the methods are quite different from the previous ones. I split the limit-updating method into one method per axis.

B: Subscriber Class

The main Class

Required Package

This requires both supporting classes:

import asyncio
import asyncio_mqtt as aiomqtt

from datetime      import datetime
from mq17Chart     import mqChart
from mq17DataFrame import mqDataFrame

MQTT: Clean-Up: Subscriber Class: Package Requirement

Package Constructor

The index property is used almost everywhere.

class mqSubscriber:
  # Maximum Buffered Data in Second

  def __init__(self):
    # save initial parameter
    self.index = 0

    self.mqCha = mqChart(self.MAX_DURATION)
    self.mqDfr = mqDataFrame(self.MAX_DURATION)

MQTT: Clean-Up: Subscriber Class: Constructor

Then create an instance of each supporting class.

Main Method

This is very common, and very similar to the example in the official documentation of the MQTT client library.

  async def main(self):
    client = aiomqtt.Client(
      hostname="localhost", port=1883)

    async with client:
      async with client.messages() as messages:
        await client.subscribe("topic/signal")

        async for message in messages:
          new_num = float(message.payload)
          self.update(new_num)

MQTT: Clean-Up: Subscriber Class: Main Methods

The only thing it does is call the update method.
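The handler trusts that every payload is a number. Here is a minimal sketch of a more defensive conversion, assuming the publisher sends a plain numeric string as bytes. The name parse_payload is a hypothetical helper of mine, not part of the classes in this article.

```python
def parse_payload(payload):
    """Convert an MQTT payload (bytes or str) to float.

    Hypothetical helper: returns None instead of raising
    when the payload is not a number.
    """
    try:
        # float() accepts bytes such as b"25.5" directly
        return float(payload)
    except (TypeError, ValueError):
        return None


print(parse_payload(b"25.5"))   # 25.5
print(parse_payload(b"n/a"))    # None
```

With a helper like this, the loop could skip a message when the result is None, instead of crashing the whole subscriber.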

Message Handler: Update


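The main program flow can be tidied up as in the code below.

First, handle the data:

  def update(self, new_num):
    new_time = datetime.now()
    self.mqDfr.update_data(
        self.index, new_num, new_time)
    self.index += 1

    if self.index > 3:
      self.mqDfr.build_df_smooth()
      self.mqDfr.remove_old(self.index, new_time)

And then handle the chart:

    self.mqCha.update_ylimit(new_num)
    self.mqCha.update_xlimit(
      self.index, new_time)

    self.mqCha.update_view(self.index,
      self.mqDfr.get_timeframe(),
      self.mqDfr.get_df_smooth())

We rely heavily on the index property.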

MQTT: Clean-Up: Subscriber Class: Update

The steps can be summarized as:

  1. Update DataFrame
  2. Interpolation: Build smooth Data
  3. Purge old buffer.
  4. Update Limit, both horizontal and vertical.
  5. Update Realtime Chart
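The order of the steps above can be sketched without any MQTT, Pandas, or Matplotlib at all. This is only an illustration: FakeDataFrame, FakeChart, and Coordinator are hypothetical stand-ins that record which method gets called, nothing more.

```python
from datetime import datetime


class FakeDataFrame:
    """Stand-in for the data class: only records calls."""
    def __init__(self, log):
        self.log = log

    def update_data(self, index, new_num, new_time):
        self.log.append("update_data")

    def build_df_smooth(self):
        self.log.append("build_df_smooth")

    def remove_old(self, index, new_time):
        self.log.append("remove_old")


class FakeChart:
    """Stand-in for the chart class: only records calls."""
    def __init__(self, log):
        self.log = log

    def update_ylimit(self, new_num):
        self.log.append("update_ylimit")

    def update_xlimit(self, index, new_time):
        self.log.append("update_xlimit")

    def update_view(self, index):
        self.log.append("update_view")


class Coordinator:
    """Mirrors the order of the five steps."""
    def __init__(self):
        self.index = 0
        self.log = []
        self.data = FakeDataFrame(self.log)
        self.chart = FakeChart(self.log)

    def update(self, new_num):
        new_time = datetime.now()
        self.data.update_data(self.index, new_num, new_time)  # step 1
        self.index += 1
        if self.index > 3:
            self.data.build_df_smooth()                        # step 2
            self.data.remove_old(self.index, new_time)         # step 3
        self.chart.update_ylimit(new_num)                      # step 4
        self.chart.update_xlimit(self.index, new_time)         # step 4
        self.chart.update_view(self.index)                     # step 5


coordinator = Coordinator()
for value in (21.0, 22.5, 23.0, 24.5):   # made-up readings
    coordinator.update(value)
print(coordinator.log[-6:])
```

The coordinator owns the index, and both helpers only receive it as an argument. That keeps the shared state in one place.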

C: Data Class

Everything related to the dataframe should be placed here.

Required Package

Pandas, and SciPy

import pandas as pd
import numpy  as np

from datetime import datetime, timedelta
from scipy    import interpolate

MQTT: Clean-Up: Data Class: Package Requirement


Constructor

Prepare both empty dataframes.

class mqDataFrame:
  def __init__(self, max_duration):
    # save initial parameter
    self.max_duration = max_duration

    self.timeframe = pd.DataFrame({
      "time": [], "ftime": [], "temp": [] })
    self.df_smooth = pd.DataFrame({
      "time": [], "temp": [] })

MQTT: Clean-Up: Data Class: Constructor

Properties Getter

Both getters.

  def get_timeframe(self):
    return self.timeframe

  def get_df_smooth(self):
    return self.df_smooth

MQTT: Clean-Up: Data Class: Properties Getter

You can also access the properties directly, without these getters.
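If you want something between a getter method and a bare attribute, Python has the property decorator. A minimal sketch, using a hypothetical Holder class that is not part of this project:

```python
class Holder:
    """Hypothetical class, only to show the property decorator."""
    def __init__(self):
        self._timeframe = []

    @property
    def timeframe(self):
        # read-only access: no setter is defined
        return self._timeframe


holder = Holder()
holder.timeframe.append(1)   # reading works like a plain attribute
print(holder.timeframe)      # [1]
```

Reading looks like plain attribute access, while assigning to holder.timeframe raises AttributeError, so the other classes cannot replace the data by accident.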

Timeframe Data Update

No changes here. The code stays intact.

  def update_data(self, index, new_num, new_time):
    new_time_as_float = datetime.\
      timestamp(new_time)

    new_pair = pd.DataFrame({
      "time" : new_time,
      "ftime": new_time_as_float,
      "temp" : new_num
    }, index=[index])

    self.timeframe = pd.concat(
      [self.timeframe, new_pair])

MQTT: Clean-Up: Data Class: Timeframe Update
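The ftime column exists because the spline works on numbers, not on datetime objects. A small sketch of the round trip between both forms, using only the standard library. The sample moment is arbitrary.

```python
from datetime import datetime

moment = datetime(2023, 5, 1, 21, 30, 8)   # arbitrary sample moment

# datetime -> float seconds since the epoch (local time for naive values)
as_float = datetime.timestamp(moment)      # same as moment.timestamp()

# float -> datetime, the direction used when building the smooth data
restored = datetime.fromtimestamp(as_float)

print(type(as_float).__name__, restored == moment)
```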


Interpolation

No changes here either. The code stays intact.

  def build_df_smooth(self):
    df_t = self.timeframe['ftime']   
    tck = interpolate.splrep(
        df_t, self.timeframe['temp'], s=0)

    idx = self.timeframe.index
    fst = df_t[idx[0]]
    lst = df_t[idx[-1]]

    xfit = np.arange(fst, lst, 0.1)
    yfit = interpolate.splev(xfit, tck, der=0)
    tfit = [datetime.fromtimestamp(f) for f in xfit]

    self.df_smooth = pd.DataFrame({
      "time": tfit,
      "temp": yfit
    })

MQTT: Clean-Up: Data Class: Interpolation
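One plausible reason for the index > 3 guard in the subscriber is that a cubic spline needs at least four samples. Here is a minimal sketch with made-up numbers, using the same splrep and splev calls on plain NumPy arrays instead of dataframe columns.

```python
import numpy as np
from scipy import interpolate

# four samples: the minimum for a cubic spline (degree 3 needs more than 3 points)
x = np.array([0.0, 1.0, 2.0, 3.0])
y = np.array([20.0, 22.0, 21.0, 23.0])

tck = interpolate.splrep(x, y, s=0)      # s=0: pass through every sample
xfit = np.arange(x[0], x[-1], 0.1)       # 0.1 step between fitted points
yfit = interpolate.splev(xfit, tck, der=0)

# the fitted curve reproduces the original samples
print(np.allclose(interpolate.splev(x, tck), y))

# with only three samples, splrep refuses to build a cubic spline
try:
    interpolate.splrep(x[:3], y[:3], s=0)
except Exception as error:
    print("too few points:", type(error).__name__)
```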

Purge Old Signal

Remove data outside the boundary.

I simplified the code by moving the start limit into the chart class, instead of keeping that line here.

  def remove_old(self, index, new_time):
    obsolete = new_time \
      - timedelta(seconds=self.max_duration)

    self.timeframe = self.timeframe\
      [self.timeframe.time >= obsolete]

MQTT: Clean-Up: Data Class: Purge Old Signal

We should not put any chart calculation here.
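The purge itself is just a boolean mask on the time column. A standalone sketch, assuming a hypothetical five second window and ten made-up samples, one per second:

```python
import pandas as pd
from datetime import datetime, timedelta

max_duration = 5   # hypothetical window, in seconds
start = datetime(2023, 5, 1, 21, 30, 0)   # arbitrary sample moment

# ten samples, one per second
timeframe = pd.DataFrame({
    "time": [start + timedelta(seconds=i) for i in range(10)],
    "temp": [20.0 + i for i in range(10)],
})

new_time = timeframe["time"].iloc[-1]
obsolete = new_time - timedelta(seconds=max_duration)

# boolean mask: keep only rows inside the window
timeframe = timeframe[timeframe["time"] >= obsolete]
print(len(timeframe))   # 6
```

Because the comparison is >=, the row sitting exactly on the boundary survives, so a five second window with one sample per second keeps six rows.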

D: Chart Class

Everything related to the chart should be placed here.

I had to simplify the boundary calculation.

Required Package

Again, Matplotlib!

import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as md

from datetime import timedelta
from time     import sleep

MQTT: Clean-Up: Chart Class: Package Requirement


Constructor

I moved the chart initialization here, including interactive mode.

class mqChart:
  def __init__(self, max_duration):
    # save initial parameter
    self.max_duration = max_duration

    self.lim_lower = 10
    self.lim_upper = 40

    self.lim_start = None
    self.lim_end   = None

    # initialize the chart, then switch on interactive mode
    self.chart_setup()
    plt.ion()

MQTT: Clean-Up: Chart Class: Constructor

Vertical Boundary

This is simple, let’s separate it here.

  def update_ylimit(self, new_num):
    self.lim_lower = min(new_num, self.lim_lower)
    self.lim_upper = max(new_num, self.lim_upper)

    self.axes.set_ylim(
      self.lim_lower - 15, self.lim_upper + 10)

MQTT: Clean-Up: Chart Class: Vertical Boundary
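The min and max pair means the vertical range only ever grows. A tiny sketch of the same rule as a pure function, so it can be tried without a chart. The name expand_ylimit and the readings are made up for illustration.

```python
def expand_ylimit(lim_lower, lim_upper, new_num):
    """Return the (lower, upper) pair widened to include new_num."""
    return min(new_num, lim_lower), max(new_num, lim_upper)


lower, upper = 10, 40            # same starting range as the constructor
for value in (25.0, 43.5, 8.0):  # made-up readings
    lower, upper = expand_ylimit(lower, upper, value)

print(lower, upper)   # 8.0 43.5
```

A single spike stretches the axis for the rest of the session, which is fine for a prototype but worth knowing.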

Horizontal Boundary

This is not complex, but it is rather new. I put all the limit stuff here, including the moving chart calculation, which works by changing the left limit of the x axis.

  def update_xlimit(self, index, new_time):
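    if index <= 1:
      self.lim_start = new_time
      self.lim_end = new_time
    else:
      # the right edge always follows the newest sample
      self.lim_end = new_time

      obsolete = new_time \
        - timedelta(seconds=self.max_duration )

      # once the buffer is full, the left edge starts moving too
      if index > self.max_duration:
        self.lim_start = obsolete

      self.axes.set_xlim(
        self.lim_start - timedelta(seconds=1),
        self.lim_end   + timedelta(seconds=1))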

MQTT: Clean-Up: Chart Class: Horizontal Boundary
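The moving window can also be written as a pure function, which makes the rule easier to see. This is a sketch under my own assumptions, not the method above: x_window and first_time are hypothetical names, and the right edge is assumed to follow the newest sample.

```python
from datetime import datetime, timedelta


def x_window(index, new_time, first_time, max_duration):
    """Return (start, end) of the visible time range.

    Hypothetical pure-function version: the window grows from
    first_time until more than max_duration samples have arrived,
    then its left edge trails new_time by max_duration seconds.
    """
    if index > max_duration:
        start = new_time - timedelta(seconds=max_duration)
    else:
        start = first_time
    return start, new_time


first = datetime(2023, 5, 1, 21, 30, 0)   # arbitrary sample moment
for index in (1, 5, 12):
    now = first + timedelta(seconds=index - 1)
    start, end = x_window(index, now, first, max_duration=10)
    print(index, start.time(), end.time())
```

Note that comparing the index against max_duration only matches the time window when roughly one message arrives per second. With a different publishing rate, comparing timestamps would be the safer test.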

Boundary Status

If you wish, print the boundary status for debugging purposes.

      st = self.lim_start.strftime("%H:%M:%S")
      nd = self.lim_end.strftime("%H:%M:%S")
      print("%d : %s - %s" % (index, st, nd),
        end="\r", flush=True)

MQTT: Clean-Up: Chart Class: Boundary Status

Chart Setup

Verbatim copy.

No code changes. Everything stays intact.

The original code is long, so I only put the first four lines here.

  def chart_setup(self):
    self.fig, self.axes = plt.subplots()
    self.axes.set_ylim(
      self.lim_lower, self.lim_upper)


MQTT: Clean-Up: Chart Class: Chart Setup

You can check the complete code in the GitHub source above.

Draw Chart

This has a slight change.

  def update_view(self, index, timeframe, df_smooth):

    if index > 3:

    # tips: not to steal focus


MQTT: Clean-Up: Chart Class: Draw Both Chart

Pretty different from the previous code, right?
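For readers who want to experiment with the redraw step on its own, here is a minimal sketch. It is not the update_view method of this article: the redraw function, the numeric x values, and the headless Agg backend are my assumptions, chosen so the snippet runs without a display. The idea is to refresh the canvas directly rather than calling plt.pause(), which on some backends brings the window to the front.

```python
import matplotlib
matplotlib.use("Agg")            # headless backend, so the sketch runs anywhere
import matplotlib.pyplot as plt

fig, axes = plt.subplots()
(line,) = axes.plot([], [])      # empty line, filled in later


def redraw(xs, ys):
    """Replace the line data and refresh without calling plt.pause()."""
    line.set_data(xs, ys)
    axes.set_xlim(min(xs) - 1, max(xs) + 1)
    axes.set_ylim(min(ys) - 1, max(ys) + 1)
    fig.canvas.draw_idle()       # schedule a redraw
    fig.canvas.flush_events()    # process pending GUI events


redraw([0, 1, 2, 3], [20.0, 22.0, 21.0, 23.0])   # made-up readings
print(len(line.get_xdata()))
```

Reusing one line object and calling set_data is usually cheaper than clearing and replotting the axes on every message.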


This should be enough for a minimum viable prototype.

You should be ready to integrate this code into your own project, share it with your local friends, and gather your own team to build a prototype in your local area.

What do you think?

Farewell. We shall meet again.