问题描述
摘要:我有一个微型位连接到rpi-zero。我对微比特进行了编码,当按下A button
时,它将通过uart.write
将数据发送到rpi-zero。
在此测试中,微比特将uart.write("Test")
并向rpi-zero写入一个“测试”字。
我的最终目标是使用rpi-zero的BLE功能来充当控制设备,并通过微比特按钮发送指令。
我发现这个GATT Server Code用python为rpi编写。完全没有问题。
下面的代码将用于侦听微比特uart服务,并检查接收到的数据是否为"Test"
:
import serial
serialPort = serial.Serial(port = "/dev/ttyACM0",baudrate=115200,bytesize=8,timeout=0.5,stopbits=serial.STOPBITS_ONE)
serialString = " "
(serialPort.in_waiting > 0)
while True:
serialString = serialPort.readline()
if serialString == b'Test':
print("Yes")
else:
print("F")
但是真正的问题是当我尝试将此循环代码实现为GATT服务器代码时。
我似乎无法理解如何将此值传递给self.send_tx
此外,GATT服务器代码中似乎已经存在一个全局循环。因此,我尝试使用线程来同时运行两个函数,但是当我添加self.send_tx("Test")
时,它只会抛出错误Self is not defined
。
对不起,我对编码一无所知,有人知道对此可能的解决方法吗?谢谢
这是完整的代码:
import sys
import threading
import dbus,dbus.mainloop.glib
import serial
from gi.repository import GLib
from example_advertisement import Advertisement
from example_advertisement import register_ad_cb,register_ad_error_cb
from example_gatt_server import Service,Characteristic
from example_gatt_server import register_app_cb,register_app_error_cb
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
UART_SERVICE_UUID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
UART_RX_CHaraCTERISTIC_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
UART_TX_CHaraCTERISTIC_UUID = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
LOCAL_NAME = 'rpi-gatt-server'
mainloop = None
serialPort = serial.Serial(port = "/dev/ttyACM0",timeout=0.8,stopbits=serial.STOPBITS_ONE)
serialString = " "
(serialPort.in_waiting > 0)
class TxCharacteristic(Characteristic):
def __init__(self,bus,index,service):
Characteristic.__init__(self,UART_TX_CHaraCTERISTIC_UUID,['notify'],service)
self.notifying = False
GLib.io_add_watch(sys.stdin,GLib.IO_IN,self.on_console_input)
def on_console_input(self,fd,condition):
s = fd.readline()
if s.isspace():
pass
else:
self.send_tx(s)
return True
def send_tx(self,s):
if not self.notifying:
return
value = []
for c in s:
value.append(dbus.Byte(c.encode()))
self.PropertiesChanged(GATT_CHRC_IFACE,{'Value': value},[])
def StartNotify(self):
if self.notifying:
print("yes")
return
self.notifying = True
def StopNotify(self):
if not self.notifying:
print("no")
return
self.notifying = False
class RxCharacteristic(Characteristic):
def __init__(self,UART_RX_CHaraCTERISTIC_UUID,['write'],service)
def WriteValue(self,value,options):
print('remote: {}'.format(bytearray(value).decode()))
class UartService(Service):
def __init__(self,index):
Service.__init__(self,UART_SERVICE_UUID,True)
self.add_characteristic(TxCharacteristic(bus,self))
self.add_characteristic(RxCharacteristic(bus,1,self))
class Application(dbus.service.Object):
def __init__(self,bus):
self.path = '/'
self.services = []
dbus.service.Object.__init__(self,self.path)
def get_path(self):
return dbus.ObjectPath(self.path)
def add_service(self,service):
self.services.append(service)
@dbus.service.method(DBUS_OM_IFACE,out_signature='a{oa{sa{sv}}}')
def GetManagedobjects(self):
response = {}
for service in self.services:
response[service.get_path()] = service.get_properties()
chrcs = service.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
return response
class UartApplication(Application):
def __init__(self,bus):
Application.__init__(self,bus)
self.add_service(UartService(bus,0))
class UartAdvertisement(Advertisement):
def __init__(self,index):
Advertisement.__init__(self,'peripheral')
self.add_service_uuid(UART_SERVICE_UUID)
self.add_local_name(LOCAL_NAME)
self.include_tx_power = True
def find_adapter(bus):
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME,'/'),DBUS_OM_IFACE)
objects = remote_om.GetManagedobjects()
for o,props in objects.items():
if LE_ADVERTISING_MANAGER_IFACE in props and GATT_MANAGER_IFACE in props:
return o
print('Skip adapter:',o)
return None
def check():
while True:
serialString = serialPort.readline()
if serialString == b'Test':
print("Okay,Test")
self.send_tx("Test")
else:
print("No")
def main():
global mainloop
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
adapter = find_adapter(bus)
if not adapter:
print('BLE adapter not found')
return
service_manager = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME,adapter),GATT_MANAGER_IFACE)
ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME,LE_ADVERTISING_MANAGER_IFACE)
app = UartApplication(bus)
adv = UartAdvertisement(bus,0)
mainloop = GLib.MainLoop()
service_manager.Registerapplication(app.get_path(),{},reply_handler=register_app_cb,error_handler=register_app_error_cb)
ad_manager.Registeradvertisement(adv.get_path(),reply_handler=register_ad_cb,error_handler=register_ad_error_cb)
try:
mainloop.run()
except KeyboardInterrupt:
adv.Release()
if __name__ == '__main__':
p1 = threading.Thread(target=main)
p2 = threading.Thread(target=check)
p1.start()
p2.start()
解决方法
我认为这可能是XY problem。我了解到的是,您想通过蓝牙低功耗(BLE)从micro:bit向RPi发送按钮按下。如果是这样,那么有一种更有效的方法可以做到这一点。
在micro:bit的Bluetooth Profile中,可以使用按钮服务和按钮A状态特征。他们的UUID是:
BTN_SRV = 'E95D9882-251D-470A-A062-FA1922DFA9A8'
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'
您需要在micro:bit上设置GATT服务器。最有效的方法是使用https://makecode.microbit.org/#editor创建以下内容:
如果左侧菜单上没有蓝牙,请单击屏幕右上方的齿轮,选择Extensions
,然后选择Bluetooth
来替换{{1 }}扩展名。
通过使用pydbus库进行D-Bus绑定,可以简化RPi上的GATT客户端代码。
使用蓝牙,而不是具有Radio
循环并不断轮询micro:bit,让事件循环从Button A特性(在这种情况下)预订通知更为有效。每次按下或释放micro:bit按钮A时,都会调用我命名为while
的功能。
下面的代码不进行micro:bit和RPi之间的初始配对。由于配对是一次性配置步骤,因此我手动执行此操作。
这是RPi的示例python代码,该代码响应micro:bit上被按下的Button A ...
btn_handler