问题描述
我将 Raspbery PI4-b 与 Raspian Lite 发行版结合使用,使用 BLUEZ 编写虚拟键盘以创建 GATT HID (HOGP) 服务。
我采用了 BLUEZ 提供的心率监测器服务的 GATT 示例代码,并对其进行了修改以添加服务和特征,以尝试制作我的 GATT HID 键盘服务器。
我尽了最大努力让我的服务器正常工作,但无法完全弄清楚。所以想知道是否有人有我可以使用的此类服务的工作示例?
我的服务器可以正常连接到 Windows 10 和我的 Android 手机作为 ble 键盘外围设备,但两个客户端都不会启动报告特征通知功能,因此我可以开始发送包含按键数据的报告。
谢谢!!!提前获得任何帮助。
我从 BLUEZ 心脏监护仪示例创建的代码:
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-or-later
import time
import dbus,dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import array
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
import sys
from random import randint
mainloop = None
hidService = None
BLUEZ_SERVICE_NAME = 'org.bluez'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
class InvalidArgsException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.NotSupported'
class NotPermittedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.NotPermitted'
class InvalidValueLengthException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.InvalidValueLength'
class FailedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.Failed'
class Application(dbus.service.Object):
"""
org.bluez.GattApplication1 interface implementation
"""
def __init__(self,bus):
self.path = '/'
self.services = []
dbus.service.Object.__init__(self,bus,self.path)
self.add_service(HIDService(bus,0))
self.add_service(DeviceInfoService(bus,1))
self.add_service(BatteryService(bus,2))
#self.add_service(TestService(bus,3))
def get_path(self):
return dbus.ObjectPath(self.path)
def add_service(self,service):
self.services.append(service)
@dbus.service.method(DBUS_OM_IFACE,out_signature='a{oa{sa{sv}}}')
def GetManagedobjects(self):
response = {}
print('GetManagedobjects')
for service in self.services:
response[service.get_path()] = service.get_properties()
chrcs = service.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
descs = chrc.get_descriptors()
for desc in descs:
response[desc.get_path()] = desc.get_properties()
return response
class Service(dbus.service.Object):
"""
org.bluez.GattService1 interface implementation
"""
PATH_BASE = '/org/bluez/example/service'
def __init__(self,index,uuid,primary):
self.path = self.PATH_BASE + str(index)
self.bus = bus
self.uuid = uuid
self.primary = primary
self.characteristics = []
dbus.service.Object.__init__(self,self.path)
def get_properties(self):
return {
GATT_SERVICE_IFACE: {
'UUID': self.uuid,'Primary': self.primary,'characteristics': dbus.Array(
self.get_characteristic_paths(),signature='o')
}
}
def get_path(self):
return dbus.ObjectPath(self.path)
def add_characteristic(self,characteristic):
self.characteristics.append(characteristic)
def get_characteristic_paths(self):
result = []
for chrc in self.characteristics:
result.append(chrc.get_path())
return result
def get_characteristics(self):
return self.characteristics
@dbus.service.method(DBUS_PROP_IFACE,in_signature='s',out_signature='a{sv}')
def GetAll(self,interface):
if interface != GATT_SERVICE_IFACE:
raise InvalidArgsException()
return self.get_properties()[GATT_SERVICE_IFACE]
class Characteristic(dbus.service.Object):
"""
org.bluez.GattCharacteristic1 interface implementation
"""
def __init__(self,flags,service):
self.path = service.path + '/char' + str(index)
self.bus = bus
self.uuid = uuid
self.service = service
self.flags = flags
self.descriptors = []
dbus.service.Object.__init__(self,self.path)
def get_properties(self):
return {
GATT_CHRC_IFACE: {
'Service': self.service.get_path(),'UUID': self.uuid,'Flags': self.flags,'Descriptors': dbus.Array(
self.get_descriptor_paths(),signature='o')
}
}
def get_path(self):
return dbus.ObjectPath(self.path)
def add_descriptor(self,descriptor):
self.descriptors.append(descriptor)
def get_descriptor_paths(self):
result = []
for desc in self.descriptors:
result.append(desc.get_path())
return result
def get_descriptors(self):
return self.descriptors
@dbus.service.method(DBUS_PROP_IFACE,interface):
if interface != GATT_CHRC_IFACE:
raise InvalidArgsException()
return self.get_properties()[GATT_CHRC_IFACE]
@dbus.service.method(GATT_CHRC_IFACE,in_signature='a{sv}',out_signature='ay')
def ReadValue(self,options):
print('Default ReadValue called,returning error')
raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE,in_signature='aya{sv}')
def WriteValue(self,value,options):
print('Default WriteValue called,returning error')
raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE)
def StartNotify(self):
print('Default StartNotify called,returning error')
raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE)
def StopNotify(self):
print('Default StopNotify called,returning error')
raise NotSupportedException()
@dbus.service.signal(DBUS_PROP_IFACE,signature='sa{sv}as')
def PropertiesChanged(self,interface,changed,invalidated):
pass
class Descriptor(dbus.service.Object):
"""
org.bluez.GattDescriptor1 interface implementation
"""
def __init__(self,characteristic):
self.path = characteristic.path + '/desc' + str(index)
self.bus = bus
self.uuid = uuid
self.flags = flags
self.chrc = characteristic
dbus.service.Object.__init__(self,self.path)
def get_properties(self):
return {
GATT_DESC_IFACE: {
'Characteristic': self.chrc.get_path(),}
}
def get_path(self):
return dbus.ObjectPath(self.path)
@dbus.service.method(DBUS_PROP_IFACE,interface):
if interface != GATT_DESC_IFACE:
raise InvalidArgsException()
return self.get_properties()[GATT_DESC_IFACE]
@dbus.service.method(GATT_DESC_IFACE,options):
print ('Default ReadValue called,returning error')
raise NotSupportedException()
@dbus.service.method(GATT_DESC_IFACE,returning error')
raise NotSupportedException()
class BatteryService(Service):
"""
Fake Battery service that emulates a draining battery.
"""
SERVICE_UUID = '180f'
def __init__(self,index):
Service.__init__(self,self.SERVICE_UUID,True)
self.add_characteristic(batterylevelCharacteristic(bus,self))
class batterylevelCharacteristic(Characteristic):
"""
Fake Battery Level characteristic. The battery level is drained by 2 points
every 5 seconds.
"""
BATTERY_LVL_UUID = '2a19'
def __init__(self,service):
Characteristic.__init__(
self,self.BATTERY_LVL_UUID,['read','notify'],service)
self.notifying = False
self.battery_lvl = 100
self.timer = GObject.timeout_add(30000,self.drain_battery)
def notify_battery_level(self):
if not self.notifying:
return
self.PropertiesChanged(
GATT_CHRC_IFACE,{ 'Value': [dbus.Byte(self.battery_lvl)] },[])
def drain_battery(self):
if not self.notifying:
return True
if self.battery_lvl > 0:
self.battery_lvl -= 2
if self.battery_lvl < 5:
#self.battery_lvl = 0
GObject.source_remove(self.timer)
print('Battery Level drained: ' + repr(self.battery_lvl))
self.notify_battery_level()
return True
def ReadValue(self,options):
print('Battery Level read: ' + repr(self.battery_lvl))
return [dbus.Byte(self.battery_lvl)]
def StartNotify(self):
if self.notifying:
print('Already notifying,nothing to do')
return
self.notifying = True
self.notify_battery_level()
def StopNotify(self):
if not self.notifying:
print('Not notifying,nothing to do')
return
self.notifying = False
#sourceId="org.bluetooth.service.device_information" type="primary" uuid="180A"
class DeviceInfoService(Service):
SERVICE_UUID = '180A'
def __init__(self,True)
self.add_characteristic(vendorCharacteristic(bus,self))
self.add_characteristic(ProductCharacteristic(bus,1,self))
self.add_characteristic(VersionCharacteristic(bus,2,self))
#name="Manufacturer Name String" sourceId="org.bluetooth.characteristic.manufacturer_name_string" uuid="2A29"
class vendorCharacteristic(Characteristic):
CHaraCTERISTIC_UUID = '2A29'
def __init__(self,self.CHaraCTERISTIC_UUID,["read"],service)
self.value = dbus.Array('HodgeCode'.encode(),signature=dbus.Signature('y'))
print(f'***vendorCharacteristic value***: {self.value}')
def ReadValue(self,options):
print(f'Read vendorCharacteristic: {self.value}')
return self.value
#sourceId="org.bluetooth.characteristic.model_number_string" uuid="2A24"
class ProductCharacteristic(Characteristic):
CHaraCTERISTIC_UUID = '2A24'
def __init__(self,service)
self.value = dbus.Array('smartRemotes'.encode(),signature=dbus.Signature('y'))
print(f'***ProductCharacteristic value***: {self.value}')
def ReadValue(self,options):
print(f'Read ProductCharacteristic: {self.value}')
return self.value
#sourceId="org.bluetooth.characteristic.software_revision_string" uuid="2A28"
class VersionCharacteristic(Characteristic):
CHaraCTERISTIC_UUID = '2A28'
def __init__(self,service)
self.value = dbus.Array('version 1.0.0'.encode(),signature=dbus.Signature('y'))
print(f'***VersionCharacteristic value***: {self.value}')
def ReadValue(self,options):
print(f'Read VersionCharacteristic: {self.value}')
return self.value
#name="Human Interface Device" sourceId="org.bluetooth.service.human_interface_device" type="primary" uuid="1812"
class HIDService(Service):
SERVICE_UUID = '1812'
def __init__(self,application):
Service.__init__(self,True)
self.parent = application
self.protocolMode = ProtocolModeCharacteristic(bus,self)
self.hidInfo = HIDInfoCharacteristic(bus,self)
self.controlPoint = ControlPointCharacteristic(bus,self)
self.report = ReportCharacteristic(bus,3,self)
self.reportMap = ReportMapCharacteristic(bus,4,self)
self.add_characteristic(self.protocolMode)
self.add_characteristic(self.hidInfo)
self.add_characteristic(self.controlPoint)
self.add_characteristic(self.report)
self.add_characteristic(self.reportMap)
self.protocolMode.ReadValue({})
#name="Protocol Mode" sourceId="org.bluetooth.characteristic.protocol_mode" uuid="2A4E"
class ProtocolModeCharacteristic(Characteristic):
CHaraCTERISTIC_UUID = '2A4E'
def __init__(self,service):
Characteristic.__init__(
self,["read","write-without-response"],service)
#self.value = dbus.Array([1],signature=dbus.Signature('y'))
self.parent = service
self.value = dbus.Array(bytearray.fromhex('01'),signature=dbus.Signature('y'))
print(f'***ProtocolMode value***: {self.value}')
print('********',service.parent)
def ReadValue(self,options):
print(f'Read ProtocolMode: {self.value}')
return self.value
def WriteValue(self,options):
print(f'Write ProtocolMode {value}')
self.value = value
#sourceId="org.bluetooth.characteristic.hid_control_point" uuid="2A4C"
class ControlPointCharacteristic(Characteristic):
CHaraCTERISTIC_UUID = '2A4C'
def __init__(self,["write-without-response"],service)
self.value = dbus.Array(bytearray.fromhex('00'),signature=dbus.Signature('y'))
print(f'***ControlPoint value***: {self.value}')
def WriteValue(self,options):
print(f'Write ControlPoint {value}')
self.value = value
#id="hid_information" name="HID information" sourceId="org.bluetooth.characteristic.hid_information" uuid="2A4A"
class HIDInfoCharacteristic(Characteristic):
CHaraCTERISTIC_UUID = '2A4A'
def __init__(self,['read'],service)
self.value = dbus.Array(bytearray.fromhex('01110002'),signature=dbus.Signature('y'))
print(f'***HIDinformation value***: {self.value}')
def ReadValue(self,options):
print(f'Read HIDinformation: {self.value}')
return self.value
#sourceId="org.bluetooth.characteristic.report_map" uuid="2A4B"
class ReportMapCharacteristic(Characteristic):
CHaraCTERISTIC_UUID = '2A4B'
def __init__(self,service)
self.parent = service
#self.value = dbus.Array(bytearray.fromhex('05010906a101850175019508050719e029e715002501810295017508810395057501050819012905910295017503910395067508150026ff000507190029ff8100c0050C0901A101850275109501150126ff0719012Aff078100C005010906a101850375019508050719e029e715002501810295017508150026ff000507190029ff8100c0'),signature=dbus.Signature('y'))
self.value = dbus.Array(bytearray.fromhex('05010906a101050719e029e71500250175019508810295017508810195067508150025650507190029658100c0'),signature=dbus.Signature('y'))
print(f'***ReportMap value***: {self.value}')
def ReadValue(self,options):
print(f'Read ReportMap: {self.value}')
return self.value
#id="report" name="Report" sourceId="org.bluetooth.characteristic.report" uuid="2A4D"
class ReportCharacteristic(Characteristic):
CHaraCTERISTIC_UUID = '2A4D'
def __init__(self,service)
#self.add_descriptor(ClientConfigurationDescriptor(bus,self))
self.add_descriptor(ReportReferenceDescriptor(bus,self))
#[ 0xA1,reportNum,0 ]
#self.value = dbus.Array(bytearray.fromhex('00000000000000000000'),signature=dbus.Signature('y'))
self.value = dbus.Array(bytearray.fromhex('0000000000000000'),signature=dbus.Signature('y'))
print(f'***Report value***: {self.value}')
self.notifying = False
#self.battery_lvl = 100
#GObject.timeout_add(5000,self.drain_battery)
def send(self,value='hey'):
print(f'***send*** {value}');
self.payload = dbus.Array(bytearray.fromhex('a100004800000000'))
self.PropertiesChanged(GATT_CHRC_IFACE,{ 'Value': self.payload },[])
print(f'***sent***');
def ReadValue(self,options):
print(f'Read Report: {self.value}')
return self.value
def WriteValue(self,options):
print(f'Write Report {self.value}')
self.value = value
def StartNotify(self):
print(f'Start Notify')
if self.notifying:
print('Already notifying,nothing to do')
return
self.notifying = True
self.notify_battery_level()
def StopNotify(self):
print(f'Stop Notify')
if not self.notifying:
print('Not notifying,nothing to do')
return
self.notifying = False
#name="Client Characteristic Configuration" sourceId="org.bluetooth.descriptor.gatt.client_characteristic_configuration" uuid="2902"
class ClientConfigurationDescriptor(Descriptor):
DESCRIPTOR_UUID = '2902'
def __init__(self,characteristic):
Descriptor.__init__(
self,self.DESCRIPTOR_UUID,'write'],characteristic)
self.value = dbus.Array(bytearray.fromhex('0100'),signature=dbus.Signature('y'))
print(f'***ClientConfiguration***: {self.value}')
def ReadValue(self,options):
print(f'Read ClientConfiguration: {self.value}')
return self.value
def WriteValue(self,options):
print(f'Write ClientConfiguration {self.value}')
self.value = value
#type="org.bluetooth.descriptor.report_reference" uuid="2908"
class ReportReferenceDescriptor(Descriptor):
DESCRIPTOR_UUID = '2908'
def __init__(self,characteristic)
self.value = dbus.Array(bytearray.fromhex('0001'),signature=dbus.Signature('y'))
print(f'***ReportReference***: {self.value}')
def ReadValue(self,options):
print(f'Read ReportReference: {self.value}')
return self.value
#############################
# my sandBox
#############################
class TestService(Service):
"""
Dummy test service that provides characteristics and descriptors that
exercise varIoUs API functionality.
"""
SERVICE_UUID = '12345678-1234-5678-1234-56789abcdef0'
def __init__(self,True)
self.add_characteristic(TestCharacteristic(bus,self))
class TestCharacteristic(Characteristic):
"""
Dummy test characteristic. Allows writing arbitrary bytes to its value,and
contains "extended properties",as well as a test descriptor.
"""
CHaraCTERISTIC_UUID = '12345678-1234-5678-1234-56789abcdef1'
def __init__(self,service)
self.add_descriptor(TestDescriptor(bus,self))
#self.value = []
self.value = dbus.Array(bytearray.fromhex('05010906a101850175019508050719e029e715002501810295017508810395057501050819012905910295017503910395067508150026ff000507190029ff8100c0050C0901A101850275109501150126ff0719012Aff078100C005010906a101850375019508050719e029e715002501810295017508150026ff000507190029ff8100c0'),signature=dbus.Signature('y'))
def ReadValue(self,options):
print('TestCharacteristic Read: ' + repr(self.value))
return self.value
def WriteValue(self,options):
print('TestCharacteristic Write: ' + repr(value))
self.value = value
class TestDescriptor(Descriptor):
"""
Dummy test descriptor. Returns a static value.
"""
DESCRIPTOR_UUID = '12345678-1234-5678-1234-56789abcdef2'
def __init__(self,characteristic)
self.value = dbus.Array('Test'.encode(),signature=dbus.Signature('y'))
print(f'***TestDescriptor***: {self.value}')
def ReadValue(self,options):
print('TestDescriptor Read')
return self.value
def WriteValue(self,options):
print(f'TestDescriptor Write: {value}')
self.value = value
def register_app_cb():
print('GATT application registered')
def register_app_error_cb(error):
print('Failed to register application: ' + str(error))
mainloop.quit()
def find_adapter(bus):
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME,'/'),DBUS_OM_IFACE)
objects = remote_om.GetManagedobjects()
for o,props in objects.items():
if GATT_MANAGER_IFACE in props.keys():
return o
return None
def main():
global mainloop
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
adapter = find_adapter(bus)
if not adapter:
print('GattManager1 interface not found')
return
service_manager = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME,adapter),GATT_MANAGER_IFACE)
app = Application(bus)
mainloop = GObject.MainLoop()
print('Registering GATT application...')
service_manager.Registerapplication(app.get_path(),{},reply_handler=register_app_cb,error_handler=register_app_error_cb)
mainloop.run()
if __name__ == '__main__':
main()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)