使用 Bleak 和 PyQtGraph 包实时绘制 BLE 数据

问题描述

我正在尝试通过 BLE 实时绘制基于 ESP32 的传感器的数据。我尝试将 Bleak 与 PyQtGraph 包中的简单滚动绘图示例(scrolling example)结合使用。我确认传感器数据已从 ESP32 正确读取,但没有出现任何绘图,显然是我整合这两部分代码的方式有问题。

我当前的代码

import asyncio
from bleak import BleakClient
import time
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore,QtGui
import numpy as np

# BLE peripheral ID: identifier passed to BleakClient, and the UUID of the
# characteristic that is read with read_gatt_char().
address = "6C9F597F-7085-4AAB-806B-D2558588D50D"
UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"

# Plot details
# pyqtgraph/Qt attribute names are camel-cased (GraphicsLayoutWidget,
# setWindowTitle, addPlot); the lower-cased spellings raise AttributeError.
win = pg.GraphicsLayoutWidget(show=True)
win.setWindowTitle('pyqtgraph example: Scrolling Plots')
p1 = win.addPlot()
# Five-sample buffer backing the curve; it starts out as all zeros.
data1 = np.zeros(5)
curve1 = p1.plot(data1)


async def run(address,loop):
    """Repeatedly read the BLE characteristic and try to scroll it into the plot.

    NOTE(review): this is the version reported as not plotting; the code is
    left exactly as posted and only annotated.
    """
    global data1

    while True:
        # NOTE(review): time.sleep() blocks the whole thread for 5 s; it does
        # not yield to the asyncio loop the way `await asyncio.sleep()` would.
        time.sleep(5)

        # A new client connection is opened and closed on every iteration.
        async with BleakClient(address,loop=loop) as client:
            data = await client.read_gatt_char(UUID)

            #Parse sensor data from BLE characteristic
            sensor_data = data.decode('utf_8')
            print(sensor_data)
            sensor_data = sensor_data.split(",")
            # Fifth comma-separated field; still a str at this point.
            temperature = sensor_data[4]

            #Update the array with newest data so plot will appear scroll
            def update():
                print(temperature)
                # Shift the buffer left by one and append the newest value.
                data1[:-1] = data1[1:]
                data1[-1] = temperature
                curve1.setData(data1)

        # A fresh QTimer is created on each pass of the loop.
        # NOTE(review): the `while True` has no exit, so the caller's
        # run_until_complete() never returns unless an exception escapes.
        timer = pg.QtCore.QTimer()
        timer.timeout.connect(update)
        timer.start(1000)


# Drive run() on the asyncio loop; run_until_complete() blocks until the
# coroutine finishes.
loop = asyncio.get_event_loop()
loop.run_until_complete(run(address,loop))

# The Qt event loop is only entered after the call above has returned.
if __name__ == '__main__':
    QtGui.QApplication.instance().exec_()

解决方法

永远不要在 pyqtgraph(Qt)程序中使用 time.sleep:它会阻塞事件循环,使界面无法刷新。

pyqtgraph 默认不支持 asyncio,但您可以使用 asyncqt 模块(python -m pip install asyncqt)或 qasync 模块(python -m pip install qasync),它们会创建一个支持 Qt 的新事件循环。

import asyncio

import pyqtgraph as pg
from pyqtgraph.Qt import QtCore,QtGui
import numpy as np

from bleak import BleakClient

from asyncqt import QEventLoop,asyncSlot

# BLE peripheral ID: identifier handed to BleakClient when Window is created.
address = "6C9F597F-7085-4AAB-806B-D2558588D50D"
# UUID of the characteristic that Window.read() fetches with read_gatt_char().
UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"


class Window(pg.GraphicsLayoutWidget):
    """Scrolling plot of temperature values polled from a BLE peripheral.

    The widget owns a ``BleakClient`` for the module-level ``address``. Once
    started it reads the characteristic ``UUID``, pushes the parsed
    temperature into a fixed-length buffer backing the curve, and re-arms a
    single-shot timer to read again five seconds later.
    """

    def __init__(self, loop=None, parent=None):
        """Build the plot and the (not yet connected) BLE client.

        Args:
            loop: asyncio event loop used to schedule the coroutines and
                forwarded to ``BleakClient``. With ``None``,
                ``asyncio.ensure_future`` falls back to the current loop.
            parent: optional parent widget, forwarded to the base class.
        """
        super().__init__(parent)
        self._loop = loop

        self.setWindowTitle("pyqtgraph example: Scrolling Plots")
        plot = self.addPlot()
        # Window of the five most recent samples, initially all zeros.
        self._data = np.zeros(5)
        self._curve = plot.plot(self.data)
        self._client = BleakClient(address, loop=self._loop)

    @property
    def client(self):
        """The ``BleakClient`` used for all BLE operations."""
        return self._client

    async def start(self):
        """Connect to the peripheral and kick off the first read."""
        await self.client.connect()
        self.start_read()

    async def stop(self):
        """Disconnect from the peripheral."""
        await self.client.disconnect()

    @property
    def data(self):
        """The NumPy buffer holding the samples currently plotted."""
        return self._data

    @property
    def curve(self):
        """The plot curve that displays ``data``."""
        return self._curve

    async def read(self):
        """Read the characteristic once, plot its temperature, then re-arm.

        The payload is expected to be UTF-8 text made of comma-separated
        fields with the temperature in the fifth field. Payloads with fewer
        than five fields are ignored; a non-numeric temperature is reported
        on stdout and skipped.
        """
        raw = await self.client.read_gatt_char(UUID)
        # read_gatt_char() yields a bytearray; it has to be decoded before it
        # can be split on a str separator (bytearray.split(",") is a TypeError).
        sensor_data = raw.decode("utf-8").split(",")
        if len(sensor_data) >= 5:
            temperature_str = sensor_data[4]
            try:
                temperature = float(temperature_str)
            except ValueError:
                print(f"{temperature_str} not is float")
            else:
                self.update_plot(temperature)
        # Schedule the next read through Qt instead of sleeping.
        QtCore.QTimer.singleShot(5000, self.start_read)

    def start_read(self):
        """Schedule ``read()`` as a task on the event loop."""
        asyncio.ensure_future(self.read(), loop=self._loop)

    def update_plot(self, temperature):
        """Scroll the buffer left by one sample and append ``temperature``."""
        self.data[:-1] = self.data[1:]
        self.data[-1] = temperature
        self.curve.setData(self.data)

    def closeEvent(self, event):
        """Schedule the BLE disconnect when the window is closed."""
        super().closeEvent(event)
        # BleakClient has no stop(); disconnecting is done by this widget's
        # own stop() coroutine.
        asyncio.ensure_future(self.stop(), loop=self._loop)


def main(args):
    """Create the Qt application, show the plot window and run the event loop.

    A ``QEventLoop`` wrapping the application is installed as the asyncio
    event loop, ``Window.start()`` is scheduled on it, and the loop then runs
    until it is stopped.

    Args:
        args: command-line arguments handed to ``QApplication``.
    """
    qt_app = QtGui.QApplication(args)
    event_loop = QEventLoop(qt_app)
    asyncio.set_event_loop(event_loop)

    plot_window = Window()
    plot_window.show()

    with event_loop:
        # Connecting and the first read happen once the loop is running.
        startup = plot_window.start()
        asyncio.ensure_future(startup, loop=event_loop)
        event_loop.run_forever()


if __name__ == "__main__":
    # Script entry point: forward the raw command line to main().
    import sys

    main(sys.argv)