{"id":143,"date":"2019-12-24T18:09:07","date_gmt":"2019-12-24T10:09:07","guid":{"rendered":"http:\/\/cf3b5.com\/blog\/?p=143"},"modified":"2019-12-31T17:27:11","modified_gmt":"2019-12-31T09:27:11","slug":"raspberry-pi%e7%9a%84ble%e5%bc%80%e5%8f%91","status":"publish","type":"post","link":"http:\/\/cf3b5.com\/blog\/?p=143","title":{"rendered":"Raspberry Pi\u7684BLE\u5f00\u53d1\u4f8b\u5b50"},"content":{"rendered":"\n<p class=\"has-text-color has-background has-very-dark-gray-color has-luminous-vivid-amber-background-color\">2019\/12\/31 \u8865\u5145\u2026\u2026BLE\u7684\u4f20\u8f93\u901f\u7387\u592a\u5751\u4e86\uff0c\u867d\u7136\u5c0f\u7a0b\u5e8f\u652f\u6301\uff0c\u4f46\u662f\u771f\u7684\u7528\u8d77\u6765\u5f88\u6162\u554a\uff01\u54ed\u2026\u2026<\/p>\n\n\n\n<p>\u4f4e\u529f\u8017BLE\u548c\u4f20\u7edf\u84dd\u7259\u8fd8\u662f\u633a\u4e0d\u4e00\u6837\u7684\uff0c\u5fae\u4fe1\u5c0f\u7a0b\u5e8f\u8c8c\u4f3c\u53ea\u652f\u6301\u4f4e\u529f\u8017\u84dd\u7259\u7684\u534f\u8bae\uff0c\u4e3a\u4e86\u8ba9Pixels\u8fd9\u4e2a\u9879\u76ee\u80fd\u652f\u6301\u5c0f\u7a0b\u5e8f\uff0c\u53c8\u5f00\u59cb\u4e86\u6f2b\u6f2b\u6316\u5751\u4e4b\u8def<\/p>\n\n\n\n<p>\u7b2c\u4e00\uff0c\u5b89\u88c5BlueZ 5.5\u7248\u672c\uff0c\u8fd9\u4e2a\u4e0d\u7ec6\u8bf4\uff0c\u540e\u9762\u8865\u5145\uff0c\u5148\u5360\u5751\uff01<\/p>\n\n\n\n<p>\u7b2c\u4e8c\uff0c\u5b89\u88c5python\u7684bluepy\u5e93<\/p>\n\n\n\n<pre class=\"wp-block-syntaxhighlighter-code\">sudo apt-get install libglib2.0-dev\nsudo pip install bluepy<\/pre>\n\n\n\n<p>\u7b2c\u4e09\uff0c\u521b\u5efa\u6d4b\u8bd5\u6e90\u4ee3\u7801<\/p>\n\n\n\n<p>example_advertisement.py<\/p>\n\n\n\n<pre class=\"wp-block-syntaxhighlighter-code\">#!\/usr\/bin\/python\n\nfrom __future__ import print_function\n\nimport dbus\nimport dbus.exceptions\nimport dbus.mainloop.glib\nimport dbus.service\n\nimport array\n\ntry:\n  from gi.repository import GObject  # python3\nexcept ImportError:\n  import gobject as GObject  # python2\n\nfrom random import randint\n\nmainloop = None\n\nBLUEZ_SERVICE_NAME = 'org.bluez'\nLE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'\nDBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'\nDBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'\n\nLE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'\n\n\nclass InvalidArgsException(dbus.exceptions.DBusException):\n    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'\n\n\nclass NotSupportedException(dbus.exceptions.DBusException):\n    _dbus_error_name = 'org.bluez.Error.NotSupported'\n\n\nclass NotPermittedException(dbus.exceptions.DBusException):\n    _dbus_error_name = 'org.bluez.Error.NotPermitted'\n\n\nclass InvalidValueLengthException(dbus.exceptions.DBusException):\n    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'\n\n\nclass FailedException(dbus.exceptions.DBusException):\n    _dbus_error_name = 'org.bluez.Error.Failed'\n\n\nclass Advertisement(dbus.service.Object):\n    PATH_BASE = '\/org\/bluez\/example\/advertisement'\n\n    def __init__(self, bus, index, advertising_type):\n        self.path = self.PATH_BASE + str(index)\n        self.bus = bus\n        self.ad_type = advertising_type\n        self.service_uuids = None\n        self.manufacturer_data = None\n        self.solicit_uuids = None\n        self.service_data = None\n        self.local_name = None\n        self.include_tx_power = None\n        self.data = None\n        dbus.service.Object.__init__(self, bus, self.path)\n\n    def get_properties(self):\n        properties = dict()\n        properties['Type'] = self.ad_type\n        if self.service_uuids is not None:\n            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,\n                                                    signature='s')\n        if self.solicit_uuids is not None:\n            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,\n                                                    signature='s')\n        if self.manufacturer_data is not None:\n            properties['ManufacturerData'] = dbus.Dictionary(\n                self.manufacturer_data, signature='qv')\n        if self.service_data is not None:\n            properties['ServiceData'] = dbus.Dictionary(self.service_data,\n                                                        signature='sv')\n        if self.local_name is not None:\n            properties['LocalName'] = dbus.String(self.local_name)\n        if self.include_tx_power is not None:\n            properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)\n\n        if self.data is not None:\n            properties['Data'] = dbus.Dictionary(\n                self.data, signature='yv')\n        return {LE_ADVERTISEMENT_IFACE: properties}\n\n    def get_path(self):\n        return dbus.ObjectPath(self.path)\n\n    def add_service_uuid(self, uuid):\n        if not self.service_uuids:\n            self.service_uuids = []\n        self.service_uuids.append(uuid)\n\n    def add_solicit_uuid(self, uuid):\n        if not self.solicit_uuids:\n            self.solicit_uuids = []\n        self.solicit_uuids.append(uuid)\n\n    def add_manufacturer_data(self, manuf_code, data):\n        if not self.manufacturer_data:\n            self.manufacturer_data = dbus.Dictionary({}, signature='qv')\n        self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')\n\n    def add_service_data(self, uuid, data):\n        if not self.service_data:\n            self.service_data = dbus.Dictionary({}, signature='sv')\n        self.service_data[uuid] = dbus.Array(data, signature='y')\n\n    def add_local_name(self, name):\n        if not self.local_name:\n            self.local_name = \"\"\n        self.local_name = dbus.String(name)\n\n    def add_data(self, ad_type, data):\n        if not self.data:\n            self.data = dbus.Dictionary({}, signature='yv')\n        self.data[ad_type] = dbus.Array(data, signature='y')\n\n    @dbus.service.method(DBUS_PROP_IFACE,\n                         in_signature='s',\n                         out_signature='a{sv}')\n    def GetAll(self, interface):\n        print('GetAll')\n        if interface != LE_ADVERTISEMENT_IFACE:\n            raise InvalidArgsException()\n        print('returning props')\n        return self.get_properties()[LE_ADVERTISEMENT_IFACE]\n\n    @dbus.service.method(LE_ADVERTISEMENT_IFACE,\n                         in_signature='',\n                         out_signature='')\n    def Release(self):\n        print('%s: Released!' % self.path)\n\nclass TestAdvertisement(Advertisement):\n\n    def __init__(self, bus, index):\n        Advertisement.__init__(self, bus, index, 'peripheral')\n        self.add_service_uuid('180D')\n        self.add_service_uuid('180F')\n        self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04])\n        self.add_service_data('9999', [0x00, 0x01, 0x02, 0x03, 0x04])\n        self.add_local_name('TestAdvertisement')\n        self.include_tx_power = True\n        self.add_data(0x26, [0x01, 0x01, 0x00])\n\n\ndef register_ad_cb():\n    print('Advertisement registered')\n\n\ndef register_ad_error_cb(error):\n    print('Failed to register advertisement: ' + str(error))\n    mainloop.quit()\n\n\ndef find_adapter(bus):\n    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '\/'),\n                               DBUS_OM_IFACE)\n    objects = remote_om.GetManagedObjects()\n\n    for o, props in objects.items():\n        if LE_ADVERTISING_MANAGER_IFACE in props:\n            return o\n\n    return None\n\n\ndef main():\n    global mainloop\n\n    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)\n\n    bus = dbus.SystemBus()\n\n    adapter = find_adapter(bus)\n    if not adapter:\n        print('LEAdvertisingManager1 interface not found')\n        return\n\n    adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),\n                                   \"org.freedesktop.DBus.Properties\");\n\n    adapter_props.Set(\"org.bluez.Adapter1\", \"Powered\", dbus.Boolean(1))\n\n    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),\n                                LE_ADVERTISING_MANAGER_IFACE)\n\n    test_advertisement = TestAdvertisement(bus, 0)\n\n    mainloop = GObject.MainLoop()\n\n    ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},\n                                     reply_handler=register_ad_cb,\n                                     error_handler=register_ad_error_cb)\n\n    mainloop.run()\n\nif __name__ == '__main__':\n    main()\n<\/pre>\n\n\n\n<p>example_gatt_server.py<\/p>\n\n\n\n<pre class=\"wp-block-syntaxhighlighter-code\">#!\/usr\/bin\/env python3\n\nimport dbus\nimport dbus.exceptions\nimport dbus.mainloop.glib\nimport dbus.service\n\nimport array\ntry:\n  from gi.repository import GObject\nexcept ImportError:\n  import gobject as GObject\nimport sys\n\nfrom random import randint\n\nmainloop = None\n\nBLUEZ_SERVICE_NAME = 'org.bluez'\nGATT_MANAGER_IFACE = 'org.bluez.GattManager1'\nDBUS_OM_IFACE =      'org.freedesktop.DBus.ObjectManager'\nDBUS_PROP_IFACE =    'org.freedesktop.DBus.Properties'\n\nGATT_SERVICE_IFACE = 'org.bluez.GattService1'\nGATT_CHRC_IFACE =    'org.bluez.GattCharacteristic1'\nGATT_DESC_IFACE =    'org.bluez.GattDescriptor1'\n\nclass InvalidArgsException(dbus.exceptions.DBusException):\n    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'\n\nclass NotSupportedException(dbus.exceptions.DBusException):\n    _dbus_error_name = 'org.bluez.Error.NotSupported'\n\nclass NotPermittedException(dbus.exceptions.DBusException):\n    _dbus_error_name = 'org.bluez.Error.NotPermitted'\n\nclass InvalidValueLengthException(dbus.exceptions.DBusException):\n    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'\n\nclass FailedException(dbus.exceptions.DBusException):\n    _dbus_error_name = 'org.bluez.Error.Failed'\n\n\nclass Application(dbus.service.Object):\n    \"\"\"\n    org.bluez.GattApplication1 interface implementation\n    \"\"\"\n    def __init__(self, bus):\n        self.path = '\/'\n        self.services = []\n        dbus.service.Object.__init__(self, bus, self.path)\n        self.add_service(HeartRateService(bus, 0))\n        self.add_service(BatteryService(bus, 1))\n        self.add_service(TestService(bus, 2))\n\n    def get_path(self):\n        return dbus.ObjectPath(self.path)\n\n    def add_service(self, service):\n        self.services.append(service)\n\n    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')\n    def GetManagedObjects(self):\n        response = {}\n        print('GetManagedObjects')\n\n        for service in self.services:\n            response[service.get_path()] = service.get_properties()\n            chrcs = service.get_characteristics()\n            for chrc in chrcs:\n                response[chrc.get_path()] = chrc.get_properties()\n                descs = chrc.get_descriptors()\n                for desc in descs:\n                    response[desc.get_path()] = desc.get_properties()\n\n        return response\n\n\nclass Service(dbus.service.Object):\n    \"\"\"\n    org.bluez.GattService1 interface implementation\n    \"\"\"\n    PATH_BASE = '\/org\/bluez\/example\/service'\n\n    def __init__(self, bus, index, uuid, primary):\n        self.path = self.PATH_BASE + str(index)\n        self.bus = bus\n        self.uuid = uuid\n        self.primary = primary\n        self.characteristics = []\n        dbus.service.Object.__init__(self, bus, self.path)\n\n    def get_properties(self):\n        return {\n                GATT_SERVICE_IFACE: {\n                        'UUID': self.uuid,\n                        'Primary': self.primary,\n                        'Characteristics': dbus.Array(\n                                self.get_characteristic_paths(),\n                                signature='o')\n                }\n        }\n\n    def get_path(self):\n        return dbus.ObjectPath(self.path)\n\n    def add_characteristic(self, characteristic):\n        self.characteristics.append(characteristic)\n\n    def get_characteristic_paths(self):\n        result = []\n        for chrc in self.characteristics:\n            result.append(chrc.get_path())\n        return result\n\n    def get_characteristics(self):\n        return self.characteristics\n\n    @dbus.service.method(DBUS_PROP_IFACE,\n                         in_signature='s',\n                         out_signature='a{sv}')\n    def GetAll(self, interface):\n        if interface != GATT_SERVICE_IFACE:\n            raise InvalidArgsException()\n\n        return self.get_properties()[GATT_SERVICE_IFACE]\n\n\nclass Characteristic(dbus.service.Object):\n    \"\"\"\n    org.bluez.GattCharacteristic1 interface implementation\n    \"\"\"\n    def __init__(self, bus, index, uuid, flags, service):\n        self.path = service.path + '\/char' + str(index)\n        self.bus = bus\n        self.uuid = uuid\n        self.service = service\n        self.flags = flags\n        self.descriptors = []\n        dbus.service.Object.__init__(self, bus, self.path)\n\n    def get_properties(self):\n        return {\n                GATT_CHRC_IFACE: {\n                        'Service': self.service.get_path(),\n                        'UUID': self.uuid,\n                        'Flags': self.flags,\n                        'Descriptors': dbus.Array(\n                                self.get_descriptor_paths(),\n                                signature='o')\n                }\n        }\n\n    def get_path(self):\n        return dbus.ObjectPath(self.path)\n\n    def add_descriptor(self, descriptor):\n        self.descriptors.append(descriptor)\n\n    def get_descriptor_paths(self):\n        result = []\n        for desc in self.descriptors:\n            result.append(desc.get_path())\n        return result\n\n    def get_descriptors(self):\n        return self.descriptors\n\n    @dbus.service.method(DBUS_PROP_IFACE,\n                         in_signature='s',\n                         out_signature='a{sv}')\n    def GetAll(self, interface):\n        if interface != GATT_CHRC_IFACE:\n            raise InvalidArgsException()\n\n        return self.get_properties()[GATT_CHRC_IFACE]\n\n    @dbus.service.method(GATT_CHRC_IFACE,\n                        in_signature='a{sv}',\n                        out_signature='ay')\n    def ReadValue(self, options):\n        print('Default ReadValue called, returning error')\n        raise NotSupportedException()\n\n    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')\n    def WriteValue(self, value, options):\n        print('Default WriteValue called, returning error')\n        raise NotSupportedException()\n\n    @dbus.service.method(GATT_CHRC_IFACE)\n    def StartNotify(self):\n        print('Default StartNotify called, returning error')\n        raise NotSupportedException()\n\n    @dbus.service.method(GATT_CHRC_IFACE)\n    def StopNotify(self):\n        print('Default StopNotify called, returning error')\n        raise NotSupportedException()\n\n    @dbus.service.signal(DBUS_PROP_IFACE,\n                         signature='sa{sv}as')\n    def PropertiesChanged(self, interface, changed, invalidated):\n        pass\n\n\nclass Descriptor(dbus.service.Object):\n    \"\"\"\n    org.bluez.GattDescriptor1 interface implementation\n    \"\"\"\n    def __init__(self, bus, index, uuid, flags, characteristic):\n        self.path = characteristic.path + '\/desc' + str(index)\n        self.bus = bus\n        self.uuid = uuid\n        self.flags = flags\n        self.chrc = characteristic\n        dbus.service.Object.__init__(self, bus, self.path)\n\n    def get_properties(self):\n        return {\n                GATT_DESC_IFACE: {\n                        'Characteristic': self.chrc.get_path(),\n                        'UUID': self.uuid,\n                        'Flags': self.flags,\n                }\n        }\n\n    def get_path(self):\n        return dbus.ObjectPath(self.path)\n\n    @dbus.service.method(DBUS_PROP_IFACE,\n                         in_signature='s',\n                         out_signature='a{sv}')\n    def GetAll(self, interface):\n        if interface != GATT_DESC_IFACE:\n            raise InvalidArgsException()\n\n        return self.get_properties()[GATT_DESC_IFACE]\n\n    @dbus.service.method(GATT_DESC_IFACE,\n                        in_signature='a{sv}',\n                        out_signature='ay')\n    def ReadValue(self, options):\n        print ('Default ReadValue called, returning error')\n        raise NotSupportedException()\n\n    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')\n    def WriteValue(self, value, options):\n        print('Default WriteValue called, returning error')\n        raise NotSupportedException()\n\n\nclass HeartRateService(Service):\n    \"\"\"\n    Fake Heart Rate Service that simulates a fake heart beat and control point\n    behavior.\n\n    \"\"\"\n    HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb'\n\n    def __init__(self, bus, index):\n        Service.__init__(self, bus, index, self.HR_UUID, True)\n        self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self))\n        self.add_characteristic(BodySensorLocationChrc(bus, 1, self))\n        self.add_characteristic(HeartRateControlPointChrc(bus, 2, self))\n        self.energy_expended = 0\n\n\nclass HeartRateMeasurementChrc(Characteristic):\n    HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'\n\n    def __init__(self, bus, index, service):\n        Characteristic.__init__(\n                self, bus, index,\n                self.HR_MSRMT_UUID,\n                ['notify'],\n                service)\n        self.notifying = False\n        self.hr_ee_count = 0\n\n    def hr_msrmt_cb(self):\n        value = []\n        value.append(dbus.Byte(0x06))\n\n        value.append(dbus.Byte(randint(90, 130)))\n\n        if self.hr_ee_count % 10 == 0:\n            value[0] = dbus.Byte(value[0] | 0x08)\n            value.append(dbus.Byte(self.service.energy_expended &amp; 0xff))\n            value.append(dbus.Byte((self.service.energy_expended >> 8) &amp; 0xff))\n\n        self.service.energy_expended = \\\n                min(0xffff, self.service.energy_expended + 1)\n        self.hr_ee_count += 1\n\n        print('Updating value: ' + repr(value))\n\n        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])\n\n        return self.notifying\n\n    def _update_hr_msrmt_simulation(self):\n        print('Update HR Measurement Simulation')\n\n        if not self.notifying:\n            return\n\n        GObject.timeout_add(1000, self.hr_msrmt_cb)\n\n    def StartNotify(self):\n        if self.notifying:\n            print('Already notifying, nothing to do')\n            return\n\n        self.notifying = True\n        self._update_hr_msrmt_simulation()\n\n    def StopNotify(self):\n        if not self.notifying:\n            print('Not notifying, nothing to do')\n            return\n\n        self.notifying = False\n        self._update_hr_msrmt_simulation()\n\n\nclass BodySensorLocationChrc(Characteristic):\n    BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'\n\n    def __init__(self, bus, index, service):\n        Characteristic.__init__(\n                self, bus, index,\n                self.BODY_SNSR_LOC_UUID,\n                ['read'],\n                service)\n\n    def ReadValue(self, options):\n        # Return 'Chest' as the sensor location.\n        return [ 0x01 ]\n\nclass HeartRateControlPointChrc(Characteristic):\n    HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'\n\n    def __init__(self, bus, index, service):\n        Characteristic.__init__(\n                self, bus, index,\n                self.HR_CTRL_PT_UUID,\n                ['write'],\n                service)\n\n    def WriteValue(self, value, options):\n        print('Heart Rate Control Point WriteValue called')\n\n        if len(value) != 1:\n            raise InvalidValueLengthException()\n\n        byte = value[0]\n        print('Control Point value: ' + repr(byte))\n\n        if byte != 1:\n            raise FailedException(\"0x80\")\n\n        print('Energy Expended field reset!')\n        self.service.energy_expended = 0\n\n\nclass BatteryService(Service):\n    \"\"\"\n    Fake Battery service that emulates a draining battery.\n\n    \"\"\"\n    BATTERY_UUID = '180f'\n\n    def __init__(self, bus, index):\n        Service.__init__(self, bus, index, self.BATTERY_UUID, True)\n        self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self))\n\n\nclass BatteryLevelCharacteristic(Characteristic):\n    \"\"\"\n    Fake Battery Level characteristic. The battery level is drained by 2 points\n    every 5 seconds.\n\n    \"\"\"\n    BATTERY_LVL_UUID = '2a19'\n\n    def __init__(self, bus, index, service):\n        Characteristic.__init__(\n                self, bus, index,\n                self.BATTERY_LVL_UUID,\n                ['read', 'notify'],\n                service)\n        self.notifying = False\n        self.battery_lvl = 100\n        GObject.timeout_add(5000, self.drain_battery)\n\n    def notify_battery_level(self):\n        if not self.notifying:\n            return\n        self.PropertiesChanged(\n                GATT_CHRC_IFACE,\n                { 'Value': [dbus.Byte(self.battery_lvl)] }, [])\n\n    def drain_battery(self):\n        if not self.notifying:\n            return True\n        if self.battery_lvl > 0:\n            self.battery_lvl -= 2\n            if self.battery_lvl &lt; 0:\n                self.battery_lvl = 0\n        print('Battery Level drained: ' + repr(self.battery_lvl))\n        self.notify_battery_level()\n        return True\n\n    def ReadValue(self, options):\n        print('Battery Level read: ' + repr(self.battery_lvl))\n        return [dbus.Byte(self.battery_lvl)]\n\n    def StartNotify(self):\n        if self.notifying:\n            print('Already notifying, nothing to do')\n            return\n\n        self.notifying = True\n        self.notify_battery_level()\n\n    def StopNotify(self):\n        if not self.notifying:\n            print('Not notifying, nothing to do')\n            return\n\n        self.notifying = False\n\n\nclass TestService(Service):\n    \"\"\"\n    Dummy test service that provides characteristics and descriptors that\n    exercise various API functionality.\n\n    \"\"\"\n    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'\n\n    def __init__(self, bus, index):\n        Service.__init__(self, bus, index, self.TEST_SVC_UUID, True)\n        self.add_characteristic(TestCharacteristic(bus, 0, self))\n        self.add_characteristic(TestEncryptCharacteristic(bus, 1, self))\n        self.add_characteristic(TestSecureCharacteristic(bus, 2, self))\n\nclass TestCharacteristic(Characteristic):\n    \"\"\"\n    Dummy test characteristic. Allows writing arbitrary bytes to its value, and\n    contains \"extended properties\", as well as a test descriptor.\n\n    \"\"\"\n    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'\n\n    def __init__(self, bus, index, service):\n        Characteristic.__init__(\n                self, bus, index,\n                self.TEST_CHRC_UUID,\n                ['read', 'write', 'writable-auxiliaries'],\n                service)\n        self.value = []\n        self.add_descriptor(TestDescriptor(bus, 0, self))\n        self.add_descriptor(\n                CharacteristicUserDescriptionDescriptor(bus, 1, self))\n\n    def ReadValue(self, options):\n        print('TestCharacteristic Read: ' + repr(self.value))\n        return self.value\n\n    def WriteValue(self, value, options):\n        print('TestCharacteristic Write: ' + repr(value))\n        self.value = value\n\n\nclass TestDescriptor(Descriptor):\n    \"\"\"\n    Dummy test descriptor. Returns a static value.\n\n    \"\"\"\n    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2'\n\n    def __init__(self, bus, index, characteristic):\n        Descriptor.__init__(\n                self, bus, index,\n                self.TEST_DESC_UUID,\n                ['read', 'write'],\n                characteristic)\n\n    def ReadValue(self, options):\n        return [\n                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')\n        ]\n\n\nclass CharacteristicUserDescriptionDescriptor(Descriptor):\n    \"\"\"\n    Writable CUD descriptor.\n\n    \"\"\"\n    CUD_UUID = '2901'\n\n    def __init__(self, bus, index, characteristic):\n        self.writable = 'writable-auxiliaries' in characteristic.flags\n        self.value = array.array('B', b'This is a characteristic for testing')\n        self.value = self.value.tolist()\n        Descriptor.__init__(\n                self, bus, index,\n                self.CUD_UUID,\n                ['read', 'write'],\n                characteristic)\n\n    def ReadValue(self, options):\n        return self.value\n\n    def WriteValue(self, value, options):\n        if not self.writable:\n            raise NotPermittedException()\n        self.value = value\n\nclass TestEncryptCharacteristic(Characteristic):\n    \"\"\"\n    Dummy test characteristic requiring encryption.\n\n    \"\"\"\n    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef3'\n\n    def __init__(self, bus, index, service):\n        Characteristic.__init__(\n                self, bus, index,\n                self.TEST_CHRC_UUID,\n                ['encrypt-read', 'encrypt-write'],\n                service)\n        self.value = []\n        self.add_descriptor(TestEncryptDescriptor(bus, 2, self))\n        self.add_descriptor(\n                CharacteristicUserDescriptionDescriptor(bus, 3, self))\n\n    def ReadValue(self, options):\n        print('TestEncryptCharacteristic Read: ' + repr(self.value))\n        return self.value\n\n    def WriteValue(self, value, options):\n        print('TestEncryptCharacteristic Write: ' + repr(value))\n        self.value = value\n\nclass TestEncryptDescriptor(Descriptor):\n    \"\"\"\n    Dummy test descriptor requiring encryption. Returns a static value.\n\n    \"\"\"\n    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef4'\n\n    def __init__(self, bus, index, characteristic):\n        Descriptor.__init__(\n                self, bus, index,\n                self.TEST_DESC_UUID,\n                ['encrypt-read', 'encrypt-write'],\n                characteristic)\n\n    def ReadValue(self, options):\n        return [\n                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')\n        ]\n\n\nclass TestSecureCharacteristic(Characteristic):\n    \"\"\"\n    Dummy test characteristic requiring secure connection.\n\n    \"\"\"\n    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef5'\n\n    def __init__(self, bus, index, service):\n        Characteristic.__init__(\n                self, bus, index,\n                self.TEST_CHRC_UUID,\n                ['secure-read', 'secure-write'],\n                service)\n        self.value = []\n        self.add_descriptor(TestSecureDescriptor(bus, 2, self))\n        self.add_descriptor(\n                CharacteristicUserDescriptionDescriptor(bus, 3, self))\n\n    def ReadValue(self, options):\n        print('TestSecureCharacteristic Read: ' + repr(self.value))\n        return self.value\n\n    def WriteValue(self, value, options):\n        print('TestSecureCharacteristic Write: ' + repr(value))\n        self.value = value\n\n\nclass TestSecureDescriptor(Descriptor):\n    \"\"\"\n    Dummy test descriptor requiring secure connection. Returns a static value.\n\n    \"\"\"\n    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef6'\n\n    def __init__(self, bus, index, characteristic):\n        Descriptor.__init__(\n                self, bus, index,\n                self.TEST_DESC_UUID,\n                ['secure-read', 'secure-write'],\n                characteristic)\n\n    def ReadValue(self, options):\n        return [\n                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')\n        ]\n\ndef register_app_cb():\n    print('GATT application registered')\n\n\ndef register_app_error_cb(error):\n    print('Failed to register application: ' + str(error))\n    mainloop.quit()\n\n\ndef find_adapter(bus):\n    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '\/'),\n                               DBUS_OM_IFACE)\n    objects = remote_om.GetManagedObjects()\n\n    for o, props in objects.items():\n        if GATT_MANAGER_IFACE in props.keys():\n            return o\n\n    return None\n\ndef main():\n    global mainloop\n\n    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)\n\n    bus = dbus.SystemBus()\n\n    adapter = find_adapter(bus)\n    if not adapter:\n        print('GattManager1 interface not found')\n        return\n\n    service_manager = dbus.Interface(\n            bus.get_object(BLUEZ_SERVICE_NAME, adapter),\n            GATT_MANAGER_IFACE)\n\n    app = Application(bus)\n\n    mainloop = GObject.MainLoop()\n\n    print('Registering GATT application...')\n\n    service_manager.RegisterApplication(app.get_path(), {},\n                                    reply_handler=register_app_cb,\n                                    error_handler=register_app_error_cb)\n\n    mainloop.run()\n\nif __name__ == '__main__':\n    main()\n<\/pre>\n\n\n\n<p>\u521b\u5efauart_peripheral.py\u6587\u4ef6<\/p>\n\n\n\n<pre class=\"wp-block-syntaxhighlighter-code\">import sys\nimport dbus, dbus.mainloop.glib\nfrom gi.repository import GObject\nfrom example_advertisement import Advertisement\nfrom example_advertisement import register_ad_cb, register_ad_error_cb\nfrom example_gatt_server import Service, Characteristic\nfrom example_gatt_server import register_app_cb, register_app_error_cb\n \nBLUEZ_SERVICE_NAME =           'org.bluez'\nDBUS_OM_IFACE =                'org.freedesktop.DBus.ObjectManager'\nLE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'\nGATT_MANAGER_IFACE =           'org.bluez.GattManager1'\nGATT_CHRC_IFACE =              'org.bluez.GattCharacteristic1'\nUART_SERVICE_UUID =            '6e400001-b5a3-f393-e0a9-e50e24dcca9e'\nUART_RX_CHARACTERISTIC_UUID =  '6e400002-b5a3-f393-e0a9-e50e24dcca9e'\nUART_TX_CHARACTERISTIC_UUID =  '6e400003-b5a3-f393-e0a9-e50e24dcca9e'\nLOCAL_NAME =                   'RaspberryPi3_UART' # \u8fd9\u4e2a\u540d\u5b57\u662f\u5c0f\u7a0b\u5e8f\u84dd\u7259\u626b\u63cf\u5230\u7684\u540d\u5b57\nmainloop = None\n \nclass TxCharacteristic(Characteristic):\n    def __init__(self, bus, index, service):\n        Characteristic.__init__(self, bus, index, UART_TX_CHARACTERISTIC_UUID,\n                                ['notify'], service)\n        self.notifying = False\n        GObject.io_add_watch(sys.stdin, GObject.IO_IN, self.on_console_input)\n \n    def on_console_input(self, fd, condition):\n        s = fd.readline()\n        if s.isspace():\n            pass\n        else:\n            self.send_tx(s)\n        return True\n \n    def send_tx(self, s):\n        if not self.notifying:\n            return\n        value = []\n        for c in s:\n            value.append(dbus.Byte(c.encode()))\n        self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': value}, [])\n \n    def StartNotify(self):\n        if self.notifying:\n            return\n        self.notifying = True\n \n    def StopNotify(self):\n        if not self.notifying:\n            return\n        self.notifying = False\n \nclass RxCharacteristic(Characteristic):\n    def __init__(self, bus, index, service):\n        Characteristic.__init__(self, bus, index, UART_RX_CHARACTERISTIC_UUID,\n                                ['write'], service)\n \n    def WriteValue(self, value, options):\n        print('remote: {}'.format(bytearray(value).decode()))\n \nclass UartService(Service):\n    def __init__(self, bus, index):\n        Service.__init__(self, bus, index, UART_SERVICE_UUID, True)\n        self.add_characteristic(TxCharacteristic(bus, 0, self))\n        self.add_characteristic(RxCharacteristic(bus, 1, self))\n \nclass Application(dbus.service.Object):\n    def __init__(self, bus):\n        self.path = '\/'\n        self.services = []\n        dbus.service.Object.__init__(self, bus, self.path)\n \n    def get_path(self):\n        return dbus.ObjectPath(self.path)\n \n    def add_service(self, service):\n        self.services.append(service)\n \n    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')\n    def GetManagedObjects(self):\n        response = {}\n        for service in self.services:\n            response[service.get_path()] = service.get_properties()\n            chrcs = service.get_characteristics()\n            for chrc in chrcs:\n                response[chrc.get_path()] = chrc.get_properties()\n        return response\n \nclass UartApplication(Application):\n    def __init__(self, bus):\n        Application.__init__(self, bus)\n        self.add_service(UartService(bus, 0))\n \nclass UartAdvertisement(Advertisement):\n    def __init__(self, bus, index):\n        Advertisement.__init__(self, bus, index, 'peripheral')\n        self.add_service_uuid(UART_SERVICE_UUID)\n        self.add_local_name(LOCAL_NAME)\n        self.include_tx_power = True\n \ndef find_adapter(bus):\n    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '\/'),\n                               DBUS_OM_IFACE)\n    objects = remote_om.GetManagedObjects()\n    for o, props in objects.items():\n        for iface in (LE_ADVERTISING_MANAGER_IFACE, GATT_MANAGER_IFACE):\n            if iface not in props:\n                continue\n        return o\n    return None\n \ndef main():\n    global mainloop\n    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)\n    bus = dbus.SystemBus()\n    adapter = find_adapter(bus)\n    if not adapter:\n        print('BLE adapter not found')\n        return\n \n    service_manager = dbus.Interface(\n                                bus.get_object(BLUEZ_SERVICE_NAME, adapter),\n                                GATT_MANAGER_IFACE)\n    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),\n                                LE_ADVERTISING_MANAGER_IFACE)\n \n    app = UartApplication(bus)\n    adv = UartAdvertisement(bus, 0)\n \n    mainloop = GObject.MainLoop()\n \n    service_manager.RegisterApplication(app.get_path(), {},\n                                        reply_handler=register_app_cb,\n                                        error_handler=register_app_error_cb)\n    ad_manager.RegisterAdvertisement(adv.get_path(), {},\n                                     reply_handler=register_ad_cb,\n                                     error_handler=register_ad_error_cb)\n    try:\n        mainloop.run()\n    except KeyboardInterrupt:\n        adv.Release()\n \nif __name__ == '__main__':\n    main()<\/pre>\n\n\n\n<p>\u7b2c\u56db\uff0c\u8fd0\u884cuart_peripheral.py <\/p>\n\n\n\n<pre class=\"wp-block-syntaxhighlighter-code\">$ python uart_peripheral.py \nGATT application registered\nGetAll\nreturning props\nAdvertisement registered<\/pre>\n\n\n\n<p>\u7b2c\u4e94\uff0c\u6d4b\u8bd5<\/p>\n\n\n\n<p>\u624b\u673a\u5b89\u88c5\u201cnRF Toolbox\u201d\u7684App\uff0c\u6253\u5f00\u540e\uff0c\u70b9\u51fb\u4f7f\u7528UART\u6a21\u5757<\/p>\n\n\n\n<p>\u5728\u70b9\u51fb\u201cConnect\u201d\uff0c\u9009\u62e9\u626b\u63cf\u51fa\u6765\u7684\u8bbe\u5907\uff0c\u8fde\u63a5\u6210\u529f\u4e4b\u540e\uff0c\u70b9\u51fb\u201cShow Log\u201d\uff0c\u53ef\u4ee5\u770b\u5230\u8fde\u63a5\u8fc7\u7a0b\u7684\u65e5\u5fd7\uff0c\u5e76\u4e14\u53ef\u4ee5\u8fdb\u884c\u6570\u636e\u4ea4\u4e92<\/p>\n\n\n\n<p>\u5c0f\u7a0b\u5e8f\u84dd\u7259\u529f\u80fd\uff0c\u4e5f\u53ef\u4ee5\u626b\u63cf\u5230\u5bf9\u5e94\u7684\u8bbe\u5907\u4e86<\/p>\n","protected":false},"excerpt":{"rendered":"<p>2019\/12\/31 \u8865\u5145\u2026\u2026BLE\u7684\u4f20\u8f93\u901f\u7387\u592a\u5751\u4e86\uff0c\u867d\u7136\u5c0f\u7a0b\u5e8f\u652f\u6301\uff0c\u4f46\u662f\u771f\u7684\u7528\u8d77\u6765\u5f88\u6162\u554a\uff01\u54ed\u2026\u2026 \u4f4e\u529f\u8017BL&hellip; <a href=\"http:\/\/cf3b5.com\/blog\/?p=143\" class=\"more-link\">\u7ee7\u7eed\u9605\u8bfb <span class=\"screen-reader-text\">Raspberry Pi\u7684BLE\u5f00\u53d1\u4f8b\u5b50<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":[],"categories":[1],"tags":[],"_links":{"self":[{"href":"http:\/\/cf3b5.com\/blog\/index.php?rest_route=\/wp\/v2\/posts\/143"}],"collection":[{"href":"http:\/\/cf3b5.com\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/cf3b5.com\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/cf3b5.com\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/cf3b5.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=143"}],"version-history":[{"count":9,"href":"http:\/\/cf3b5.com\/blog\/index.php?rest_route=\/wp\/v2\/posts\/143\/revisions"}],"predecessor-version":[{"id":153,"href":"http:\/\/cf3b5.com\/blog\/index.php?rest_route=\/wp\/v2\/posts\/143\/revisions\/153"}],"wp:attachment":[{"href":"http:\/\/cf3b5.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=143"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/cf3b5.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=143"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/cf3b5.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=143"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}