在python中使用带有bluepy的BLE加速等待通知

问题描述

我使用 bluepy 从配备低功耗蓝牙设备的 Python 传感器中获取数据。传感器是我自己设计的设备。因此,我可以访问传感器板上的处理。

现在需要 2 秒才能从传感器获取数据。在传感器上,数据采集过程仅需 200 毫秒。此延迟发生在“等待通知”(1.7 秒)。我在下面粘贴了我的整个蓝牙通信类。但是,简要代码如下:

  1. 通过蓝牙向传感器写入一个字符串(传感器解析它​​以查看它应该返回什么样的数据)
  2. 使用 while self.peripheral.waitForNotifications 等待通知
  3. 读取数据(这很快

因此,这第 2 步需要时间(尽管传感器在大约 200 毫秒后开始发回)。

有谁知道是否可以加快第 2 步的速度?

import struct
import bluepy.btle as btle
import numpy
import time


class ReadDelegate(btle.DefaultDelegate):
    def __init__(self):
        self.data = b''

    def reset(self):
        self.data = b''

    def handleNotification(self,cHandle,data):
        self.data = self.data + data

    @property
    def data_length(self):
        return len(self.data)


class myHC08:
    def __init__(self,verbose=False):
        self.verbose = verbose
        self.mac = '34:14:B5:50:22:ED'
        self.write_service_id = 4
        self.write_service = None
        self.delegate = ReadDelegate()
        self.peripheral = None

    def print_output(self,message):
        print('HC08: %s' % message)

    def connect(self):
        self.peripheral = btle.Peripheral(self.mac)
        self.peripheral.withDelegate(self.delegate)
        s = self.write_service_id
        services = self.peripheral.getServices()
        self.write_service = self.peripheral.getServiceByUUID(list(services)[s].uuid)

    def wait_for_notification(self,time_out=5):
        start = time.time()
        while self.peripheral.waitForNotifications(1):
            waiting = time.time() - start
            if waiting > time_out: return False,False
        waiting = time.time() - start
        return True,waiting

    def wait_for_data(self,min_bytes,time_out=5):
        start = time.time()
        while self.delegate.data_length < min_bytes:
            waiting = time.time() - start
            if waiting > time_out: return False,waiting

    def write(self,message,unpack_string=False):
        if self.verbose: self.print_output('Writing %s' % message)
        self.delegate.reset()
        c = self.write_service.getcharacteristics()[0]
        c.write(bytes(message,"utf-8"))
        self.verbose: print("HC08 Receiving %i bytes" % min_bytes)

        if self.verbose: self.print_output('Waiting for notification')
        success,duration = self.wait_for_notification()
        if not success: return False
        if self.verbose: self.print_output('Duration: %.10f' % duration)

        if self.verbose: self.print_output('Waiting for data')
        success,duration = self.wait_for_data(min_bytes=min_bytes)
        if not success: return False
        if self.verbose: self.print_output('Duration: %.10f' % duration)

        received = self.delegate.data
        if unpack_string:
            received = struct.unpack(unpack_string,received)
            received = numpy.array(received)
        return received

    def write_robust(self,unpack_string=False,max_retry=3):
        for attempt in range(max_retry):
            if self.verbose: self.print_output('Attempt %i out of %i' % (attempt,max_retry))
            result = self.write(message,unpack_string)
            if result is not False: return result
            if not result: self.reconnect()
        if not result: raise ('Error Could not get data in %i attempts.' % max_retry)

    def disconnect(self):
        self.peripheral.disconnect()

    def reconnect(self):
        self.print_output('Reconnecting')
        for x in range(5): self.disconnect()
        self.connect()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)