Preface

Goal: Realtime data plot using Pandas.

We have solved interpolation for time series. Now is a good time to apply that knowledge to realtime data with MQTT.

This is a long article, but worth reading. You might want to take a break and make a cup of coffee before we start.


8: Smooth Realtime Plotting Using Time Series Dataframe

Main Course

No more baby steps, let’s jump into a real project.

Python Source Code

Use the source, Luke!

This time, reading the source code is a must.

Required Package

We have all we need

import asyncio
import asyncio_mqtt  as aiomqtt

import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as md

import pandas as pd
import numpy  as np

from datetime import datetime, timedelta
from time     import sleep
from scipy    import interpolate

MQTT: Smooth Timeseries Plot: Required Package

Skeleton

Only a few methods, but each has its own complexity.

class mqSubscriber:
  # method bodies are filled in section by section below
  def __init__(self): ...
  def chart_setup(self): ...
  def update_limit(self, new_time, new_num): ...
  def build_df_smooth(self): ...
  def remove_old(self, new_time): ...
  def update_data(self, msg): ...
  def update_view(self): ...
  async def main(self): ...

MQTT: Smooth Timeseries Plot: Skeleton

Constructor

class mqSubscriber:
  # Maximum Buffered Data in Second
  MAX_DURATION = 40

  def __init__(self):
    # save initial parameter
    self.index = 0
    self.timeframe = pd.DataFrame({
      "time": [], "ftime": [], "temp": [] })

    self.lim_lower = 10
    self.lim_upper = 40

    self.lim_start = None
    self.lim_end   = None

    self.chart_setup()

MQTT: Smooth Timeseries Plot: Constructor

Main Method

Interactive On

The rest is just an ordinary asyncio_mqtt routine.

  async def main(self):
    plt.ion()
    plt.show()

    client = aiomqtt.Client(
      hostname="localhost", port=1883)

    async with client:
      async with client.messages() as messages:
        await client.subscribe("topic/signal")
        async for message in messages:
          self.update_data(message)
          self.update_view()

MQTT: Smooth Timeseries Plot: Asynchronous Main

There are two sequential tasks for each message.

  1. Updating data: append the new reading to the data frame, update the chart limits, and remove obsolete data. Then we can go to the next task.
  2. Updating view: build the interpolated series and redraw both lines.

Each task is wrapped in its own method.

  self.update_data(message)
  self.update_view()
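
If you want to try this two-step loop without a running broker, here is a minimal sketch. It assumes a fake message source instead of asyncio_mqtt. The names FakeMessage, fake_messages, Recorder and run are hypothetical, made up for this illustration only. The recorder just logs the call order, so you can see that every message triggers the data update first, and the view update second.

```python
import asyncio

class FakeMessage:
    """Hypothetical stand-in for an MQTT message; only .payload is used."""
    def __init__(self, payload):
        self.payload = payload

async def fake_messages(values):
    """Yield one fake message at a time, as a broker subscription would."""
    for value in values:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield FakeMessage(str(value).encode())

class Recorder:
    """Hypothetical subscriber that only logs the order of the calls."""
    def __init__(self):
        self.log = []

    def update_data(self, msg):
        # float() accepts the bytes payload directly
        self.log.append(("data", float(msg.payload)))

    def update_view(self):
        self.log.append(("view",))

async def run(subscriber, values):
    # same two-step loop as in main(), minus the MQTT client
    async for message in fake_messages(values):
        subscriber.update_data(message)
        subscriber.update_view()

recorder = Recorder()
asyncio.run(run(recorder, [21.5, 22.0]))
print(recorder.log)
```

The same trick may help when you want to test the chart class alone: feed it fake messages first, then switch to the real client.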

Updating Data Frame

Using float type timestamp

We have three data series. Actually we could drop the time series, but I keep it there for my own observation.

  def update_data(self, msg):
    new_num  = float(msg.payload)
    new_time = datetime.now()
    new_time_as_float = datetime.timestamp(new_time)

    new_pair = pd.DataFrame({
      "time" : new_time,
      "ftime": new_time_as_float,
      "temp" : new_num
    }, index=[self.index])

    self.index += 1
    self.timeframe = pd.concat(
      [self.timeframe, new_pair])

MQTT: Smooth Timeseries Plot: Update Data

I made an additional enhancement here. We will deal with it later in this article.

  def update_data(self, msg):
    ...

    self.update_limit(new_time, new_num)
    self.remove_old(new_time)

Right now we need to discuss the data, before any additional chart enhancement.

For the sake of simplicity in this article, I removed this feature. You can add it yourself, for convenience.

Interpolation

Smoothing Lines with Moving Index

We once solved the time series issue using the float type. But here comes another issue.

The other issue with interpolation is getting the index for moving data. We should not assume the index starts from zero, because we regularly dispose of obsolete data. This means we also deal with a moving index.

Getting the secondary line for the horizontal axis can be done this way. You can check by printing the data series to the console yourself.

    df_t = self.timeframe['ftime']   

    idx = self.timeframe.index
    fst = df_t[idx[0]]
    lst = df_t[idx[-1]]

    xfit = np.arange(fst, lst, 0.1)
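
To see the moving index in isolation, here is a small sketch with made-up data. The frame and its values are hypothetical, only for illustration. After the oldest rows are filtered out, the index labels no longer start from zero, so a lookup such as df_t[0] would raise a KeyError. Looking up by the first and last label, as above, keeps working, and so does the position based .iloc.

```python
import pandas as pd

# Hypothetical sample: five readings, one second apart (float timestamps).
frame = pd.DataFrame({
    "ftime": [100.0, 101.0, 102.0, 103.0, 104.0],
    "temp":  [21.0, 22.5, 23.0, 22.0, 21.5],
})

# Simulate disposing obsolete rows: keep only ftime >= 102.
frame = frame[frame["ftime"] >= 102.0]

# The labels of the remaining rows are kept, so they start at 2.
print(list(frame.index))

df_t = frame["ftime"]
idx = frame.index

# Label based lookup, the same approach as in the snippet above.
fst = df_t[idx[0]]
lst = df_t[idx[-1]]

# Position based lookup returns the same values without using labels.
assert fst == df_t.iloc[0]
assert lst == df_t.iloc[-1]
```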

Now we can apply the code above for our interpolation.

  def build_df_smooth(self):
    df_t = self.timeframe['ftime']   
    tck = interpolate.splrep(
        df_t, self.timeframe['temp'], s=0)

    idx = self.timeframe.index
    fst = df_t[idx[0]]
    lst = df_t[idx[-1]]

    xfit = np.arange(fst, lst, 0.1)
    yfit = interpolate.splev(xfit, tck, der=0)
    tfit = [datetime.fromtimestamp(f) for f in xfit]

    self.df_smooth = pd.DataFrame({
      "time": tfit,
      "temp": yfit
    })

MQTT: Smooth Timeseries Plot: Build Smooth

I have already given a simple interpolation example in the previous article. If you are still confused about how it works, you can search for simpler interpolation examples for a while, then come back here when you get the idea.
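
For a quick reminder, here is a minimal sketch of the same splrep and splev pair, with plain numbers instead of timestamps. The sample values are hypothetical. Note that the default cubic spline needs at least four points, and the x values must be increasing.

```python
import numpy as np
from scipy import interpolate

# Hypothetical readings; x must be increasing for splrep.
x = np.array([0.0, 1.0, 2.0, 3.0, 4.0])
y = np.array([20.0, 22.0, 21.0, 23.0, 22.5])

# s=0 asks for a spline that passes through every data point.
# The default degree is cubic (k=3), which needs at least four points.
tck = interpolate.splrep(x, y, s=0)

# Evaluate on a denser grid, with the same 0.1 step as in the method above.
xfit = np.arange(x[0], x[-1], 0.1)
yfit = interpolate.splev(xfit, tck, der=0)

# At the original x positions, the spline reproduces the data.
ycheck = interpolate.splev(x, tck, der=0)
print(np.allclose(ycheck, y))
```

Swap x with float timestamps, and y with temperature, and you get the method above.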

Setting Up Chart

Making a pretty matplotlib chart always takes long setup code. But it is worth the looks.

  def chart_setup(self):
    self.fig, self.axes = plt.subplots()
    self.axes.set_ylim(
      self.lim_lower, self.lim_upper)

    xfmt = md.DateFormatter('%H:%M:%S')
    self.axes.xaxis.set_major_formatter(xfmt)
    plt.subplots_adjust(bottom=0.3)
    plt.xticks(rotation=90, ha='right')

    plt.title('Value by Time')
    plt.xlabel('Time')
    plt.ylabel('Value')

MQTT: Smooth Timeseries Plot: Chart Setup

We can go further and exploit any matplotlib feature, such as the major grid and minor grid.

    self.axes.grid(
      which='major',
      color = '#004d40',
      linestyle = '--',
      linewidth = 0.8)

    self.axes.grid(
      which='minor',
      color = '#009688',
      linestyle = ':',
      linewidth = 0.4)

    self.axes.minorticks_on()

MQTT: Smooth Timeseries Plot: Chart Grid

We also need to add an event handler to close the script whenever the chart window is closed.

    self.fig.canvas.mpl_connect(
      'close_event', exit)

And finally we should draw both lines:

  • Original Data as marker only
  • Smooth Data as lines

    self.line, = self.axes.plot(
      [], [], linestyle="None",
      marker='+', markerfacecolor='#00796b')
    self.smooth, = self.axes.plot(
      [], [], color='#1976d2')

MQTT: Smooth Timeseries Plot: Chart Plot

I also use material design colors, for a nice look. The trick with design is to vary the details, but not to overdo it. For example, we plot the original data with markers, but keep them small. And do not use thick lines, unless you have a good reason for it.

MQTT: Smooth Timeseries Plot: Chart Preview

Not really impressive. This is just a minimal one, without legend and stuff.

For this article, I intend to keep this as simple as possible. You can read my other article to set up a more complex view.

Updating Limit

It works on my computer.

Since we want to update the chart boundary manually, we have to create our own method to set up the limits.

This is what I have got so far, based on trial and error to find the right time difference. Good enough for a prototype, but it may need to be simplified.

  def update_limit(self, new_time, new_num):
    self.lim_lower = min(new_num, self.lim_lower)
    self.lim_upper = max(new_num, self.lim_upper)

    self.axes.set_ylim(
      self.lim_lower - 15, self.lim_upper + 10)

    self.lim_end = new_time
    if self.index <= 1:
      self.lim_start = new_time
    else:
      self.axes.set_xlim(
        self.lim_start - timedelta(seconds=1),
        self.lim_end   + timedelta(seconds=1))

I must admit I’m new in this area. Since I can’t wait for somebody to code it for me, I had to be brave and get this done myself. My code could be wrong, and it is not tested enough. Use it at your own risk.

MQTT: Smooth Timeseries Plot: Update Limit

I could be wrong, but there might be a ready-made standard way to do this task somewhere.

Meanwhile, you are free to make your own limit algorithm.
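
As one possible starting point, here is a minimal sketch of a horizontal limit calculation written as a plain function, so it can be tested without a chart. This is not the method used in this article. The function name window_limits and the parameter pad are hypothetical. It assumes you want to show the newest duration seconds, never earlier than the oldest sample, with a small margin on both sides.

```python
from datetime import datetime, timedelta

def window_limits(times, duration, pad=1.0):
    """Return (start, end) for the x axis.

    Shows the newest `duration` seconds, but never starts
    before the oldest sample. `pad` seconds are added on both sides.
    """
    newest = max(times)
    oldest = min(times)
    start = max(oldest, newest - timedelta(seconds=duration))
    margin = timedelta(seconds=pad)
    return start - margin, newest + margin

base = datetime(2023, 1, 1, 12, 0, 0)

# Fewer samples than the window: the view starts at the oldest sample.
short = [base + timedelta(seconds=i) for i in range(5)]
short_start, short_end = window_limits(short, duration=40)

# More samples than the window: the view follows the newest sample.
long = [base + timedelta(seconds=i) for i in range(60)]
long_start, long_end = window_limits(long, duration=40)

print(short_start, short_end)
print(long_start, long_end)
```

The returned pair could then be passed to set_xlim, the same way update_limit does above.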

Cut Obsolete Data

Moving Time Series

The idea is to make an animation-like chart.

  def remove_old(self, new_time):
    # remove old data
    obsolete = new_time \
      - timedelta(seconds=self.MAX_DURATION)

    self.timeframe = self.timeframe\
      [self.timeframe.time >= obsolete]

    if self.index >= self.MAX_DURATION:
      self.lim_start = obsolete + timedelta(seconds=2)

It works on my computer.

This is a last-minute enhancement. And I know this is not good code.

MQTT: Smooth Timeseries Plot: Cut Obsolete Data

Not much testing yet. Use it at your own risk.
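
If you want to check the cut in isolation, here is a small sketch with made-up readings, ten seconds apart. The sample frame is hypothetical. It uses the same boolean mask as remove_old, and keeps only the rows inside the last MAX_DURATION seconds.

```python
from datetime import datetime, timedelta
import pandas as pd

MAX_DURATION = 40  # seconds, same value as the class constant

# Hypothetical readings at 0, 10, 20, 30, 40 and 50 seconds.
base = datetime(2023, 1, 1, 12, 0, 0)
frame = pd.DataFrame({
    "time": [base + timedelta(seconds=i) for i in range(0, 60, 10)],
    "temp": [20.0, 21.0, 22.0, 23.0, 24.0, 25.0],
})

# Treat the newest reading as the current time.
new_time = frame["time"].iloc[-1]
obsolete = new_time - timedelta(seconds=MAX_DURATION)

# Boolean mask: keep only rows inside the time window.
trimmed = frame[frame["time"] >= obsolete]
print(trimmed)
```

Only the first row, fifty seconds older than the newest one, falls outside the window and gets removed.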

Updating Chart

Finally, draw both lines.

We still have to assign each data to matplotlib. I’m not sure which one is primary and which one is secondary. I guess both are important for me.

  def update_view(self):
    if self.index > 3:
      self.build_df_smooth()
      self.line.set_xdata(self.timeframe.time)
      self.line.set_ydata(self.timeframe.temp)
      self.smooth.set_xdata(self.df_smooth.time)
      self.smooth.set_ydata(self.df_smooth.temp)

    # tip: redraw without stealing window focus
    plt.draw()
    self.fig.canvas.draw_idle()
    self.fig.canvas.start_event_loop(0.001)

    # note: time.sleep also blocks the asyncio loop while waiting
    sleep(0.1)

MQTT: Smooth Timeseries Plot: Update View

And don’t forget my attribution, out of respect for the stackoverflow people: the answer to most basic coding issues.

Execute: Run

Be brave, grow up, and run!

mqSub = mqSubscriber()
asyncio.run(mqSub.main())

MQTT: Smooth Timeseries Plot: Execute: Run

Chart Preview

The limit is your imagination

Run, test, run, test, stop the publisher, experiment with dual publishers, and so on. You can figure out the result of the code above yourself, or by observing the video below:

Performance

I have also checked htop. The CPU usage is still low. I guess we can use this script in further projects.


What is Next 🤔?

Consider clean up.

The fact that the code above works does not mean that it does not suck.

Make things simple. Refactor the internal design, while preserving your working code.

Better code does not mean it no longer sucks either. But at least I can do something about it.

Consider continuing with [ Python - MQTT - Clean Up ].