有没有人有基于 BlueZ 的 GATT HID 键盘服务器的可用示例?

问题描述

我在 Raspberry Pi 4 Model B 上运行 Raspbian Lite 发行版,并使用 BlueZ 创建 GATT HID (HOGP) 服务,以实现一个虚拟键盘。

我以 BlueZ 提供的心率监测器服务 GATT 示例代码为基础,修改并添加了服务和特征,尝试实现自己的 GATT HID 键盘服务器。

我已经尽了最大努力让服务器正常工作,但始终没能完全弄明白。所以想问一下,是否有人能提供一个此类服务的可用示例供我参考?

我的服务器可以作为 BLE 键盘外围设备正常连接到 Windows 10 和我的 Android 手机,但两个客户端都不会启用 Report 特征的通知功能,导致我无法开始发送包含按键数据的报告。

提前感谢任何帮助!

以下是我基于 BlueZ 心率监测器示例编写的代码:

#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-or-later

import time
import dbus,dbus.exceptions
import dbus.mainloop.glib
import dbus.service

import array
try:
  from gi.repository import GObject
except ImportError:
  import gobject as GObject
import sys

from random import randint

# GLib main loop; assigned in main() and stopped by register_app_error_cb().
mainloop = None
# Module-level placeholder; nothing in this file assigns it.
hidService = None

# Bus name under which the BlueZ daemon is reached.
BLUEZ_SERVICE_NAME = 'org.bluez'

# Interface used to register the GATT application with an adapter.
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'

# Standard freedesktop D-Bus interfaces used for object/property discovery.
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

# Interfaces implemented by the exported service/characteristic/descriptor
# objects defined below.
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'

class InvalidArgsException(dbus.exceptions.DBusException):
    """D-Bus exception reported as org.freedesktop.DBus.Error.InvalidArgs."""
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'

class NotSupportedException(dbus.exceptions.DBusException):
    """D-Bus exception reported as org.bluez.Error.NotSupported."""
    _dbus_error_name = 'org.bluez.Error.NotSupported'

class NotPermittedException(dbus.exceptions.DBusException):
    """D-Bus exception reported as org.bluez.Error.NotPermitted."""
    _dbus_error_name = 'org.bluez.Error.NotPermitted'

class InvalidValueLengthException(dbus.exceptions.DBusException):
    """D-Bus exception reported as org.bluez.Error.InvalidValueLength."""
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'

class FailedException(dbus.exceptions.DBusException):
    """D-Bus exception reported as org.bluez.Error.Failed."""
    _dbus_error_name = 'org.bluez.Error.Failed'


class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation

    Root object (path '/') that owns the GATT services and reports the whole
    service/characteristic/descriptor tree through the ObjectManager
    interface.
    """
    def __init__(self, bus):
        """Export the application on *bus* and create its services."""
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)

        self.add_service(HIDService(bus, 0))
        self.add_service(DeviceInfoService(bus, 1))
        self.add_service(BatteryService(bus, 2))
        #self.add_service(TestService(bus,3))

    def get_path(self):
        """Return the application's object path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Append *service* to the list reported by GetManagedObjects."""
        self.services.append(service)

    # D-Bus member names are case sensitive: the ObjectManager method is
    # "GetManagedObjects". Exporting it under any other spelling means the
    # caller cannot enumerate the objects of this application.
    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return {object path: {interface: {property: value}}} for every
        service, characteristic and descriptor of this application."""
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            for chrc in service.get_characteristics():
                response[chrc.get_path()] = chrc.get_properties()
                for desc in chrc.get_descriptors():
                    response[desc.get_path()] = desc.get_properties()

        return response


class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation

    Base class for the GATT services in this file. Each service is exported
    at PATH_BASE + <index> and keeps the list of its characteristics.
    """
    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        """
        bus     -- D-Bus connection the object is exported on
        index   -- number appended to PATH_BASE to build the object path
        uuid    -- service UUID string
        primary -- value reported as the 'Primary' property
        """
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        # The connection must be handed to dbus.service.Object, otherwise the
        # object is never exported at self.path.
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the GattService1 property dictionary keyed by interface."""
        return {
                GATT_SERVICE_IFACE: {
                        'UUID': self.uuid,
                        'Primary': self.primary,
                        # Property names are case sensitive on D-Bus.
                        'Characteristics': dbus.Array(
                                self.get_characteristic_paths(),
                                signature='o')
                }
        }

    def get_path(self):
        """Return the service's object path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        """Attach *characteristic* to this service."""
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        """Return the object paths of all attached characteristics."""
        return [chrc.get_path() for chrc in self.characteristics]

    def get_characteristics(self):
        """Return the list of attached characteristic objects."""
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return all GattService1 properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface != GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_SERVICE_IFACE]


class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation

    Base class for the characteristics in this file. The default ReadValue,
    WriteValue, StartNotify and StopNotify handlers reject the request with
    NotSupportedException; subclasses override the ones they support.
    """
    def __init__(self, bus, index, uuid, flags, service):
        """
        bus     -- D-Bus connection the object is exported on
        index   -- number appended to '<service path>/char'
        uuid    -- characteristic UUID string
        flags   -- list of flag strings reported as the 'Flags' property
        service -- owning Service instance
        """
        self.path = service.path + '/char' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the GattCharacteristic1 property dictionary."""
        return {
                GATT_CHRC_IFACE: {
                        'Service': self.service.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                        'Descriptors': dbus.Array(
                                self.get_descriptor_paths(),
                                signature='o')
                }
        }

    def get_path(self):
        """Return the characteristic's object path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        """Attach *descriptor* to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the object paths of all attached descriptors."""
        return [desc.get_path() for desc in self.descriptors]

    def get_descriptors(self):
        """Return the list of attached descriptor objects."""
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return all GattCharacteristic1 properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface != GATT_CHRC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises NotSupportedException."""
        print('Default ReadValue called,returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises NotSupportedException."""
        print('Default WriteValue called,returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StartNotify called,returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StopNotify called,returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """Signal stub; calling it emits PropertiesChanged on the bus."""
        pass


class Descriptor(dbus.service.Object):
    """
    org.bluez.GattDescriptor1 interface implementation

    Base class for the descriptors in this file. The default ReadValue and
    WriteValue handlers reject the request with NotSupportedException.
    """
    def __init__(self, bus, index, uuid, flags, characteristic):
        """
        bus            -- D-Bus connection the object is exported on
        index          -- number appended to '<characteristic path>/desc'
        uuid           -- descriptor UUID string
        flags          -- list of flag strings reported as 'Flags'
        characteristic -- owning Characteristic instance
        """
        self.path = characteristic.path + '/desc' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the GattDescriptor1 property dictionary."""
        return {
                GATT_DESC_IFACE: {
                        'Characteristic': self.chrc.get_path(),
                        # UUID and Flags are stored by __init__ and are
                        # reported here the same way Characteristic does.
                        'UUID': self.uuid,
                        'Flags': self.flags,
                }
        }

    def get_path(self):
        """Return the descriptor's object path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return all GattDescriptor1 properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface != GATT_DESC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_DESC_IFACE]

    @dbus.service.method(GATT_DESC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises NotSupportedException."""
        print ('Default ReadValue called,returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises NotSupportedException."""
        print('Default WriteValue called,returning error')
        raise NotSupportedException()

class BatteryService(Service):
    """
    Fake Battery service that emulates a draining battery.

    Holds a single battery-level characteristic.
    """
    SERVICE_UUID = '180f'

    def __init__(self, bus, index):
        """Create the primary service and attach its one characteristic."""
        Service.__init__(self, bus, index, self.SERVICE_UUID, True)
        self.add_characteristic(batterylevelCharacteristic(bus, 0, self))


class batterylevelCharacteristic(Characteristic):
    """
    Fake Battery Level characteristic. While a client is subscribed, the
    battery level is drained by 2 points every 30 seconds until it drops
    below 5, at which point the drain timer stops.
    """
    BATTERY_LVL_UUID = '2a19'

    def __init__(self, bus, index, service):
        """Export the characteristic and start the periodic drain timer."""
        Characteristic.__init__(
                self, bus, index,
                self.BATTERY_LVL_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.battery_lvl = 100
        # Interval is in milliseconds (30 s).
        self.timer = GObject.timeout_add(30000, self.drain_battery)

    def notify_battery_level(self):
        """Emit PropertiesChanged with the current level, if subscribed."""
        if not self.notifying:
            return
        self.PropertiesChanged(
                GATT_CHRC_IFACE,
                {'Value': [dbus.Byte(self.battery_lvl)]}, [])

    def drain_battery(self):
        """Timer callback: lower the level by 2 and notify the client.

        Returns True to keep the timer running, False to stop it.
        """
        if not self.notifying:
            # Nobody is listening; keep the timer alive but do not drain.
            return True
        if self.battery_lvl > 0:
            self.battery_lvl -= 2

        print('Battery Level drained: ' + repr(self.battery_lvl))
        self.notify_battery_level()
        # Returning False removes the timeout source, which replaces the
        # explicit source_remove() call made from inside the callback.
        return self.battery_lvl >= 5

    def ReadValue(self, options):
        """Return the current battery level as a one-byte array."""
        print('Battery Level read: ' + repr(self.battery_lvl))
        return [dbus.Byte(self.battery_lvl)]

    def StartNotify(self):
        """Mark the characteristic as subscribed and push the current level."""
        if self.notifying:
            print('Already notifying,nothing to do')
            return

        self.notifying = True
        self.notify_battery_level()

    def StopNotify(self):
        """Mark the characteristic as no longer subscribed."""
        if not self.notifying:
            print('Not notifying,nothing to do')
            return

        self.notifying = False


#sourceId="org.bluetooth.service.device_information" type="primary" uuid="180A"
class DeviceInfoService(Service):
    """Device Information service exposing vendor, product and version
    strings as three read-only characteristics."""

    SERVICE_UUID = '180A'

    def __init__(self, bus, index):
        """Create the primary service and attach its characteristics."""
        Service.__init__(self, bus, index, self.SERVICE_UUID, True)
        self.add_characteristic(vendorCharacteristic(bus, 0, self))
        self.add_characteristic(ProductCharacteristic(bus, 1, self))
        self.add_characteristic(VersionCharacteristic(bus, 2, self))

#name="Manufacturer Name String" sourceId="org.bluetooth.characteristic.manufacturer_name_string" uuid="2A29"
class vendorCharacteristic(Characteristic):
    """Read-only characteristic returning the vendor name bytes."""

    CHaraCTERISTIC_UUID = '2A29'

    def __init__(self, bus, index, service):
        """Export the characteristic and set its fixed value."""
        Characteristic.__init__(
                self, bus, index,
                self.CHaraCTERISTIC_UUID,
                ["read"],
                service)

        self.value = dbus.Array('HodgeCode'.encode(),signature=dbus.Signature('y'))
        print(f'***vendorCharacteristic value***: {self.value}')

    def ReadValue(self, options):
        """Return the stored byte array."""
        print(f'Read vendorCharacteristic: {self.value}')
        return self.value

#sourceId="org.bluetooth.characteristic.model_number_string" uuid="2A24"
class ProductCharacteristic(Characteristic):
    """Read-only characteristic returning the product name bytes."""

    CHaraCTERISTIC_UUID = '2A24'

    def __init__(self, bus, index, service):
        """Export the characteristic and set its fixed value."""
        # NOTE(review): the original flag list was lost; ["read"] mirrors
        # vendorCharacteristic and the ReadValue-only implementation below.
        Characteristic.__init__(
                self, bus, index,
                self.CHaraCTERISTIC_UUID,
                ["read"],
                service)

        self.value = dbus.Array('smartRemotes'.encode(),signature=dbus.Signature('y'))
        print(f'***ProductCharacteristic value***: {self.value}')

    def ReadValue(self, options):
        """Return the stored byte array."""
        print(f'Read ProductCharacteristic: {self.value}')
        return self.value

#sourceId="org.bluetooth.characteristic.software_revision_string" uuid="2A28"
class VersionCharacteristic(Characteristic):
    """Read-only characteristic returning the version string bytes."""

    CHaraCTERISTIC_UUID = '2A28'

    def __init__(self, bus, index, service):
        """Export the characteristic and set its fixed value."""
        # NOTE(review): the original flag list was lost; ["read"] mirrors
        # vendorCharacteristic and the ReadValue-only implementation below.
        Characteristic.__init__(
                self, bus, index,
                self.CHaraCTERISTIC_UUID,
                ["read"],
                service)

        self.value = dbus.Array('version 1.0.0'.encode(),signature=dbus.Signature('y'))
        print(f'***VersionCharacteristic value***: {self.value}')

    def ReadValue(self, options):
        """Return the stored byte array."""
        print(f'Read VersionCharacteristic: {self.value}')
        return self.value

#name="Human Interface Device" sourceId="org.bluetooth.service.human_interface_device" type="primary" uuid="1812"
class HIDService(Service):
    """Human Interface Device service holding the protocol mode, HID
    information, control point, report and report map characteristics."""
    SERVICE_UUID = '1812'

    def __init__(self, bus, index, application=None):
        """
        bus         -- D-Bus connection the object is exported on
        index       -- number used to build the service object path
        application -- optional owner, stored as self.parent; defaults to
                       None so a two-argument HIDService(bus, index) call
                       keeps working
        """
        Service.__init__(self, bus, index, self.SERVICE_UUID, True)

        # Must be set before the characteristics are built:
        # ProtocolModeCharacteristic reads service.parent in its constructor.
        self.parent = application
        # Each characteristic needs a distinct index because the index forms
        # its object path.
        self.protocolMode = ProtocolModeCharacteristic(bus, 0, self)
        self.hidInfo = HIDInfoCharacteristic(bus, 1, self)
        self.controlPoint = ControlPointCharacteristic(bus, 2, self)
        self.report = ReportCharacteristic(bus, 3, self)
        self.reportMap = ReportMapCharacteristic(bus, 4, self)

        self.add_characteristic(self.protocolMode)
        self.add_characteristic(self.hidInfo)
        self.add_characteristic(self.controlPoint)
        self.add_characteristic(self.report)
        self.add_characteristic(self.reportMap)

        # Debug aid: prints the initial protocol mode value.
        self.protocolMode.ReadValue({})
        
#name="Protocol Mode" sourceId="org.bluetooth.characteristic.protocol_mode" uuid="2A4E"
class ProtocolModeCharacteristic(Characteristic):
    """Protocol Mode characteristic; holds a single byte that the client
    can read or overwrite."""

    CHaraCTERISTIC_UUID = '2A4E'

    def __init__(self, bus, index, service):
        """Export the characteristic with an initial value of 0x01."""
        Characteristic.__init__(
                self, bus, index,
                self.CHaraCTERISTIC_UUID,
                ["read", "write-without-response"],
                service)

        #self.value = dbus.Array([1],signature=dbus.Signature('y'))
        self.parent = service
        self.value = dbus.Array(bytearray.fromhex('01'),signature=dbus.Signature('y'))
        print(f'***ProtocolMode value***: {self.value}')
        print('********',service.parent)

    def ReadValue(self, options):
        """Return the stored byte array."""
        print(f'Read ProtocolMode: {self.value}')
        return self.value

    def WriteValue(self, value, options):
        """Replace the stored value with *value*."""
        print(f'Write ProtocolMode {value}')
        self.value = value

#sourceId="org.bluetooth.characteristic.hid_control_point" uuid="2A4C"
class ControlPointCharacteristic(Characteristic):
    """HID Control Point characteristic; write-only, stores the last value
    written by the client."""

    CHaraCTERISTIC_UUID = '2A4C'

    def __init__(self, bus, index, service):
        """Export the characteristic with an initial value of 0x00."""
        Characteristic.__init__(
                self, bus, index,
                self.CHaraCTERISTIC_UUID,
                ["write-without-response"],
                service)

        self.value = dbus.Array(bytearray.fromhex('00'),signature=dbus.Signature('y'))
        print(f'***ControlPoint value***: {self.value}')

    def WriteValue(self, value, options):
        """Replace the stored value with *value*."""
        print(f'Write ControlPoint {value}')
        self.value = value


#id="hid_information" name="HID information" sourceId="org.bluetooth.characteristic.hid_information" uuid="2A4A"
class HIDInfoCharacteristic(Characteristic):
    """Read-only HID Information characteristic with a fixed 4-byte value."""

    CHaraCTERISTIC_UUID = '2A4A'

    def __init__(self, bus, index, service):
        """Export the characteristic and set its fixed value."""
        Characteristic.__init__(
                self, bus, index,
                self.CHaraCTERISTIC_UUID,
                ['read'],
                service)

        self.value = dbus.Array(bytearray.fromhex('01110002'),signature=dbus.Signature('y'))
        print(f'***HIDinformation value***: {self.value}')

    def ReadValue(self, options):
        """Return the stored byte array."""
        print(f'Read HIDinformation: {self.value}')
        return self.value


#sourceId="org.bluetooth.characteristic.report_map" uuid="2A4B"
class ReportMapCharacteristic(Characteristic):
    """Read-only Report Map characteristic holding the HID report
    descriptor bytes."""

    CHaraCTERISTIC_UUID = '2A4B'

    def __init__(self, bus, index, service):
        """Export the characteristic and set the report descriptor."""
        # NOTE(review): the original flag list was lost; ['read'] matches the
        # ReadValue-only implementation below.
        Characteristic.__init__(
                self, bus, index,
                self.CHaraCTERISTIC_UUID,
                ['read'],
                service)

        self.parent = service
        # Alternative, longer report descriptor kept by the author:
        #self.value = dbus.Array(bytearray.fromhex('05010906a101850175019508050719e029e715002501810295017508810395057501050819012905910295017503910395067508150026ff000507190029ff8100c0050C0901A101850275109501150126ff0719012Aff078100C005010906a101850375019508050719e029e715002501810295017508150026ff000507190029ff8100c0'),signature=dbus.Signature('y'))
        self.value = dbus.Array(bytearray.fromhex('05010906a101050719e029e71500250175019508810295017508810195067508150025650507190029658100c0'),signature=dbus.Signature('y'))
        print(f'***ReportMap value***: {self.value}')

    def ReadValue(self, options):
        """Return the stored byte array."""
        print(f'Read ReportMap: {self.value}')
        return self.value


#id="report" name="Report" sourceId="org.bluetooth.characteristic.report" uuid="2A4D"        
class ReportCharacteristic(Characteristic):
    """Report characteristic: holds the current 8-byte report and can push
    a report to the client through a PropertiesChanged signal."""

    CHaraCTERISTIC_UUID = '2A4D'

    def __init__(self, bus, index, service):
        """Export the characteristic, attach its Report Reference
        descriptor and initialise an all-zero report."""
        # NOTE(review): the original flag list was lost. 'read' and 'notify'
        # match the ReadValue/StartNotify handlers; add 'write' if the
        # WriteValue handler below is meant to be reachable.
        Characteristic.__init__(
                self, bus, index,
                self.CHaraCTERISTIC_UUID,
                ['read', 'notify'],
                service)

        #self.add_descriptor(ClientConfigurationDescriptor(bus,0,self))
        # Index 1 leaves index 0 free for the disabled descriptor above.
        self.add_descriptor(ReportReferenceDescriptor(bus, 1, self))

        #[ 0xA1,reportNum,0 ]
        #self.value = dbus.Array(bytearray.fromhex('00000000000000000000'),signature=dbus.Signature('y'))
        self.value = dbus.Array(bytearray.fromhex('0000000000000000'),signature=dbus.Signature('y'))
        print(f'***Report value***: {self.value}')

        self.notifying = False

    def send(self, value='hey'):
        """Emit a PropertiesChanged signal carrying a fixed test payload.

        *value* is only printed; the payload itself is hard-coded.
        """
        print(f'***send*** {value}')
        # The explicit 'y' signature keeps the payload a byte array like
        # every other value in this file, instead of leaving the element
        # type to be guessed from the integers.
        # NOTE(review): the leading 0xa1 looks like a transport header rather
        # than report data - confirm the payload against the report map.
        self.payload = dbus.Array(bytearray.fromhex('a100004800000000'),
                                  signature=dbus.Signature('y'))
        self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.payload}, [])

        print(f'***sent***')

    def ReadValue(self, options):
        """Return the stored report bytes."""
        print(f'Read Report: {self.value}')
        return self.value

    def WriteValue(self, value, options):
        """Replace the stored report with *value* (logs the old value)."""
        print(f'Write Report {self.value}')
        self.value = value

    def StartNotify(self):
        """Mark the characteristic as subscribed."""
        print(f'Start Notify')
        if self.notifying:
            print('Already notifying,nothing to do')
            return

        # The original called self.notify_battery_level() here, a method that
        # exists only on the battery characteristic; the AttributeError would
        # have made every subscription attempt fail.
        self.notifying = True

    def StopNotify(self):
        """Mark the characteristic as no longer subscribed."""
        print(f'Stop Notify')
        if not self.notifying:
            print('Not notifying,nothing to do')
            return

        self.notifying = False
        
#name="Client Characteristic Configuration" sourceId="org.bluetooth.descriptor.gatt.client_characteristic_configuration" uuid="2902"
class ClientConfigurationDescriptor(Descriptor):
    """Client Characteristic Configuration descriptor with a readable and
    writable two-byte value.

    NOTE(review): its only use in this file is commented out; confirm
    whether BlueZ accepts an application-supplied 0x2902 descriptor before
    re-enabling it.
    """

    DESCRIPTOR_UUID = '2902'

    def __init__(self, bus, index, characteristic):
        """Export the descriptor with an initial value of 0x0100."""
        Descriptor.__init__(
                self, bus, index,
                self.DESCRIPTOR_UUID,
                ['read', 'write'],
                characteristic)

        self.value = dbus.Array(bytearray.fromhex('0100'),signature=dbus.Signature('y'))
        print(f'***ClientConfiguration***: {self.value}')

    def ReadValue(self, options):
        """Return the stored byte array."""
        print(f'Read ClientConfiguration: {self.value}')
        return self.value

    def WriteValue(self, value, options):
        """Replace the stored value with *value* (logs the old value)."""
        print(f'Write ClientConfiguration {self.value}')
        self.value = value

#type="org.bluetooth.descriptor.report_reference" uuid="2908"
class ReportReferenceDescriptor(Descriptor):
    """Read-only Report Reference descriptor with a fixed two-byte value."""

    DESCRIPTOR_UUID = '2908'

    def __init__(self, bus, index, characteristic):
        """Export the descriptor and set its fixed value."""
        # NOTE(review): the original flag list was lost; ['read'] matches the
        # ReadValue-only implementation below.
        Descriptor.__init__(
                self, bus, index,
                self.DESCRIPTOR_UUID,
                ['read'],
                characteristic)

        # presumably <report id 0x00><report type 0x01> - verify against the
        # report map served by ReportMapCharacteristic
        self.value = dbus.Array(bytearray.fromhex('0001'),signature=dbus.Signature('y'))
        print(f'***ReportReference***: {self.value}')

    def ReadValue(self, options):
        """Return the stored byte array."""
        print(f'Read ReportReference: {self.value}')
        return self.value
 
#############################
# my sandBox
#############################
class TestService(Service):
    """
    Dummy test service that provides characteristics and descriptors that
    exercise various API functionality.
    """

    SERVICE_UUID = '12345678-1234-5678-1234-56789abcdef0'

    def __init__(self, bus, index):
        """Create the primary service and attach its one characteristic."""
        Service.__init__(self, bus, index, self.SERVICE_UUID, True)
        self.add_characteristic(TestCharacteristic(bus, 0, self))


class TestCharacteristic(Characteristic):
    """
    Dummy test characteristic. Allows writing arbitrary bytes to its value,and
    contains "extended properties",as well as a test descriptor.
    """

    CHaraCTERISTIC_UUID = '12345678-1234-5678-1234-56789abcdef1'

    def __init__(self, bus, index, service):
        """Export the characteristic, attach the test descriptor and set
        the initial value."""
        # NOTE(review): the original flag list was lost; these are the flags
        # of the upstream BlueZ example this class was copied from - confirm.
        Characteristic.__init__(
                self, bus, index,
                self.CHaraCTERISTIC_UUID,
                ['read', 'write', 'writable-auxiliaries'],
                service)

        self.add_descriptor(TestDescriptor(bus, 0, self))
        #self.value = []
        self.value = dbus.Array(bytearray.fromhex('05010906a101850175019508050719e029e715002501810295017508810395057501050819012905910295017503910395067508150026ff000507190029ff8100c0050C0901A101850275109501150126ff0719012Aff078100C005010906a101850375019508050719e029e715002501810295017508150026ff000507190029ff8100c0'),signature=dbus.Signature('y'))

    def ReadValue(self, options):
        """Return the stored byte array."""
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        """Replace the stored value with *value*."""
        print('TestCharacteristic Write: ' + repr(value))
        self.value = value


class TestDescriptor(Descriptor):
    """
    Dummy test descriptor. Starts with the bytes of 'Test' and stores
    whatever value is written to it.
    """

    DESCRIPTOR_UUID = '12345678-1234-5678-1234-56789abcdef2'

    def __init__(self, bus, index, characteristic):
        """Export the descriptor and set its initial value."""
        # NOTE(review): the original flag list was lost; ['read', 'write']
        # matches the two handlers implemented below.
        Descriptor.__init__(
                self, bus, index,
                self.DESCRIPTOR_UUID,
                ['read', 'write'],
                characteristic)

        self.value = dbus.Array('Test'.encode(),signature=dbus.Signature('y'))
        print(f'***TestDescriptor***: {self.value}')

    def ReadValue(self, options):
        """Return the stored byte array."""
        print('TestDescriptor Read')

        return self.value

    def WriteValue(self, value, options):
        """Replace the stored value with *value*."""
        print(f'TestDescriptor Write: {value}')
        self.value = value

def register_app_cb():
    """Reply handler for application registration: log the success."""
    message = 'GATT application registered'
    print(message)


def register_app_error_cb(error):
    """Error handler for application registration.

    Prints the failure reason and stops the module-level main loop.
    """
    print(f'Failed to register application: {error!s}')
    mainloop.quit()


def find_adapter(bus):
    """Return the object path of the first BlueZ object that implements
    GattManager1, or None when there is none.

    bus -- D-Bus connection used to query the BlueZ ObjectManager at '/'.
    """
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    # D-Bus member names are case sensitive; the ObjectManager method is
    # "GetManagedObjects".
    objects = remote_om.GetManagedObjects()

    for path, props in objects.items():
        if GATT_MANAGER_IFACE in props:
            return path

    return None

def main():
    """Register the GATT application with BlueZ and run the main loop.

    Returns early (after printing a message) when no object implementing
    GattManager1 is found.
    """
    global mainloop

    # Must be installed before the bus connection is created so that
    # asynchronous replies are dispatched through the GLib main loop.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus)
    if not adapter:
        print('GattManager1 interface not found')
        return

    service_manager = dbus.Interface(
            bus.get_object(BLUEZ_SERVICE_NAME, adapter),
            GATT_MANAGER_IFACE)

    app = Application(bus)

    mainloop = GObject.MainLoop()

    print('Registering GATT application...')

    # D-Bus member names are case sensitive; the GattManager1 method is
    # "RegisterApplication". The call is asynchronous: the handlers run
    # once the main loop is started below.
    service_manager.RegisterApplication(
            app.get_path(), {},
            reply_handler=register_app_cb,
            error_handler=register_app_error_cb)

    mainloop.run()

# Start the GATT server only when the file is executed as a script.
if __name__ == '__main__':
    main()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...