blob: d9f6d600db4a2851e75c1b34aec6640fd2f55655 [file] [log] [blame]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import struct
import pytest
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.gatt_server import Server
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
MappedCharacteristicAdapter,
UTF8CharacteristicAdapter,
Service,
Characteristic,
CharacteristicValue,
Descriptor,
)
from bumble.transport import AsyncPipeSink
from bumble.core import UUID
from bumble.att import (
Attribute,
ATT_EXCHANGE_MTU_REQUEST,
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
ATT_PDU,
ATT_Error_Response,
ATT_Read_By_Group_Type_Request,
)
# -----------------------------------------------------------------------------
def basic_check(x):
pdu = x.to_bytes()
parsed = ATT_PDU.from_bytes(pdu)
x_str = str(x)
parsed_str = str(parsed)
assert x_str == parsed_str
# -----------------------------------------------------------------------------
def test_UUID():
u = UUID.from_16_bits(0x7788)
assert str(u) == 'UUID-16:7788'
u = UUID.from_32_bits(0x11223344)
assert str(u) == 'UUID-32:11223344'
u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
v = UUID(str(u))
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
w = UUID.from_bytes(v.to_bytes())
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
u1 = UUID.from_16_bits(0x1234)
b1 = u1.to_bytes(force_128=True)
u2 = UUID.from_bytes(b1)
assert u1 == u2
u3 = UUID.from_16_bits(0x180A)
assert str(u3) == 'UUID-16:180A (Device Information)'
# -----------------------------------------------------------------------------
def test_ATT_Error_Response():
pdu = ATT_Error_Response(
request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST,
attribute_handle_in_error=0x0000,
error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
basic_check(pdu)
# -----------------------------------------------------------------------------
def test_ATT_Read_By_Group_Type_Request():
pdu = ATT_Read_By_Group_Type_Request(
starting_handle=0x0001,
ending_handle=0xFFFF,
attribute_group_type=UUID.from_16_bits(0x2800),
)
basic_check(pdu)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_characteristic_encoding():
class Foo(Characteristic):
def encode_value(self, value):
return bytes([value])
def decode_value(self, value_bytes):
return value_bytes[0]
c = Foo(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
123,
)
x = c.read_value(None)
assert x == bytes([123])
c.write_value(None, bytes([122]))
assert c.value == 122
class FooProxy(CharacteristicProxy):
def __init__(self, characteristic):
super().__init__(
characteristic.client,
characteristic.handle,
characteristic.end_group_handle,
characteristic.uuid,
characteristic.properties,
)
def encode_value(self, value):
return bytes([value])
def decode_value(self, value_bytes):
return value_bytes[0]
[client, server] = LinkedDevices().devices[:2]
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic])
server.add_service(service)
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic.uuid)
assert len(c) == 1
c = c[0]
cp = FooProxy(c)
v = await cp.read_value()
assert v == 123
await cp.write_value(124)
await async_barrier()
assert characteristic.value == bytes([124])
v = await cp.read_value()
assert v == 124
await cp.write_value(125, with_response=True)
await async_barrier()
assert characteristic.value == bytes([125])
cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
await cd.write_value(100, with_response=True)
await async_barrier()
assert characteristic.value == bytes([50])
last_change = None
def on_change(value):
nonlocal last_change
last_change = value
await c.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change == characteristic.value
last_change = None
await server.notify_subscribers(characteristic, value=bytes([125]))
await async_barrier()
assert last_change == bytes([125])
last_change = None
await c.unsubscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change is None
await cp.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change == characteristic.value[0]
last_change = None
await server.notify_subscribers(characteristic, value=bytes([126]))
await async_barrier()
assert last_change == 126
last_change = None
await cp.unsubscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change is None
cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0])
await cd.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change == characteristic.value[0]
last_change = None
await cd.unsubscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change is None
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_attribute_getters():
[client, server] = LinkedDevices().devices[:2]
characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
characteristic = Characteristic(
characteristic_uuid,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829')
service = Service(service_uuid, [characteristic])
server.add_service(service)
service_attr = server.gatt_server.get_service_attribute(service_uuid)
assert service_attr
(
char_decl_attr,
char_value_attr,
) = server.gatt_server.get_characteristic_attributes(
service_uuid, characteristic_uuid
)
assert char_decl_attr and char_value_attr
desc_attr = server.gatt_server.get_descriptor_attribute(
service_uuid,
characteristic_uuid,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
)
assert desc_attr
# assert all handles are in expected order
assert (
service_attr.handle
< char_decl_attr.handle
< char_value_attr.handle
< desc_attr.handle
== service_attr.end_group_handle
)
# assert characteristic declarations attribute is followed by characteristic value attribute
assert char_decl_attr.handle + 1 == char_value_attr.handle
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent
v = bytes([1, 2, 3])
c = Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
v,
)
a = CharacteristicAdapter(c)
value = a.read_value(None)
assert value == v
v = bytes([3, 4, 5])
a.write_value(None, v)
assert c.value == v
# Simple delegated adapter
a = DelegatedCharacteristicAdapter(
c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x))
)
value = a.read_value(None)
assert value == bytes(reversed(v))
v = bytes([3, 4, 5])
a.write_value(None, v)
assert a.value == bytes(reversed(v))
# Packed adapter with single element format
v = 1234
pv = struct.pack('>H', v)
c.value = v
a = PackedCharacteristicAdapter(c, '>H')
value = a.read_value(None)
assert value == pv
c.value = None
a.write_value(None, pv)
assert a.value == v
# Packed adapter with multi-element format
v1 = 1234
v2 = 5678
pv = struct.pack('>HH', v1, v2)
c.value = (v1, v2)
a = PackedCharacteristicAdapter(c, '>HH')
value = a.read_value(None)
assert value == pv
c.value = None
a.write_value(None, pv)
assert a.value == (v1, v2)
# Mapped adapter
v1 = 1234
v2 = 5678
pv = struct.pack('>HH', v1, v2)
mapped = {'v1': v1, 'v2': v2}
c.value = mapped
a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))
value = a.read_value(None)
assert value == pv
c.value = None
a.write_value(None, pv)
assert a.value == mapped
# UTF-8 adapter
v = 'Hello π'
ev = v.encode('utf-8')
c.value = v
a = UTF8CharacteristicAdapter(c)
value = a.read_value(None)
assert value == ev
c.value = None
a.write_value(None, ev)
assert a.value == v
# -----------------------------------------------------------------------------
def test_CharacteristicValue():
b = bytes([1, 2, 3])
c = CharacteristicValue(read=lambda _: b)
x = c.read(None)
assert x == b
result = []
c = CharacteristicValue(
write=lambda connection, value: result.append((connection, value))
)
z = object()
c.write(z, b)
assert result == [(z, b)]
# -----------------------------------------------------------------------------
class LinkedDevices:
def __init__(self):
self.connections = [None, None, None]
self.link = LocalLink()
self.controllers = [
Controller('C1', link=self.link),
Controller('C2', link=self.link),
Controller('C3', link=self.link),
]
self.devices = [
Device(
address='F0:F1:F2:F3:F4:F5',
host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
),
Device(
address='F1:F2:F3:F4:F5:F6',
host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
),
Device(
address='F2:F3:F4:F5:F6:F7',
host=Host(self.controllers[2], AsyncPipeSink(self.controllers[2])),
),
]
self.paired = [None, None, None]
# -----------------------------------------------------------------------------
async def async_barrier():
ready = asyncio.get_running_loop().create_future()
asyncio.get_running_loop().call_soon(ready.set_result, None)
await ready
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
[client, server] = LinkedDevices().devices[:2]
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
)
def on_characteristic1_write(connection, value):
characteristic1._last_value = (connection, value)
characteristic1.on('write', on_characteristic1_write)
def on_characteristic2_read(connection):
return bytes(str(connection.peer_address))
def on_characteristic2_write(connection, value):
characteristic2._last_value = (connection, value)
characteristic2 = Characteristic(
'66DE9057-C848-4ACA-B993-D675644EBB85',
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(
read=on_characteristic2_read, write=on_characteristic2_write
),
)
service1 = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2]
)
server.add_services([service1])
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic1.uuid)
assert len(c) == 1
c1 = c[0]
c = peer.get_characteristics_by_uuid(characteristic2.uuid)
assert len(c) == 1
c2 = c[0]
v1 = await peer.read_value(c1)
assert v1 == b''
b = bytes([1, 2, 3])
await peer.write_value(c1, b)
await async_barrier()
assert characteristic1.value == b
v1 = await peer.read_value(c1)
assert v1 == b
assert type(characteristic1._last_value is tuple)
assert len(characteristic1._last_value) == 2
assert str(characteristic1._last_value[0].peer_address) == str(
client.random_address
)
assert characteristic1._last_value[1] == b
bb = bytes([3, 4, 5, 6])
characteristic1.value = bb
v1 = await peer.read_value(c1)
assert v1 == bb
await peer.write_value(c2, b)
await async_barrier()
assert type(characteristic2._last_value is tuple)
assert len(characteristic2._last_value) == 2
assert str(characteristic2._last_value[0].peer_address) == str(
client.random_address
)
assert characteristic2._last_value[1] == b
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
[client, server] = LinkedDevices().devices[:2]
v = bytes([0x11, 0x22, 0x33, 0x44])
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
value=v,
)
service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1])
server.add_services([service1])
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
c = peer.get_services_by_uuid(service1.uuid)
assert len(c) == 1
s = c[0]
await s.discover_characteristics()
c = s.get_characteristics_by_uuid(characteristic1.uuid)
assert len(c) == 1
c1 = c[0]
v1 = await c1.read_value()
assert v1 == v
a1 = PackedCharacteristicAdapter(c1, '>I')
v1 = await a1.read_value()
assert v1 == struct.unpack('>I', v)[0]
b = bytes([0x55, 0x66, 0x77, 0x88])
await a1.write_value(struct.unpack('>I', b)[0])
await async_barrier()
assert characteristic1.value == b
v1 = await a1.read_value()
assert v1 == struct.unpack('>I', b)[0]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
[client, server] = LinkedDevices().devices[:2]
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
def on_characteristic1_subscription(connection, notify_enabled, indicate_enabled):
characteristic1._last_subscription = (
connection,
notify_enabled,
indicate_enabled,
)
characteristic1.on('subscription', on_characteristic1_subscription)
characteristic2 = Characteristic(
'66DE9057-C848-4ACA-B993-D675644EBB85',
Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([4, 5, 6]),
)
def on_characteristic2_subscription(connection, notify_enabled, indicate_enabled):
characteristic2._last_subscription = (
connection,
notify_enabled,
indicate_enabled,
)
characteristic2.on('subscription', on_characteristic2_subscription)
characteristic3 = Characteristic(
'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([7, 8, 9]),
)
def on_characteristic3_subscription(connection, notify_enabled, indicate_enabled):
characteristic3._last_subscription = (
connection,
notify_enabled,
indicate_enabled,
)
characteristic3.on('subscription', on_characteristic3_subscription)
service1 = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[characteristic1, characteristic2, characteristic3],
)
server.add_services([service1])
def on_characteristic_subscription(
connection, characteristic, notify_enabled, indicate_enabled
):
server._last_subscription = (
connection,
characteristic,
notify_enabled,
indicate_enabled,
)
server.on('characteristic_subscription', on_characteristic_subscription)
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic1.uuid)
assert len(c) == 1
c1 = c[0]
c = peer.get_characteristics_by_uuid(characteristic2.uuid)
assert len(c) == 1
c2 = c[0]
c = peer.get_characteristics_by_uuid(characteristic3.uuid)
assert len(c) == 1
c3 = c[0]
c1._called = False
c1._last_update = None
def on_c1_update(value):
c1._called = True
c1._last_update = value
c1.on('update', on_c1_update)
await peer.subscribe(c1)
await async_barrier()
assert server._last_subscription[1] == characteristic1
assert server._last_subscription[2]
assert not server._last_subscription[3]
assert characteristic1._last_subscription[1]
assert not characteristic1._last_subscription[2]
await server.indicate_subscribers(characteristic1)
await async_barrier()
assert not c1._called
await server.notify_subscribers(characteristic1)
await async_barrier()
assert c1._called
assert c1._last_update == characteristic1.value
c1._called = False
c1._last_update = None
c1_value = characteristic1.value
await server.notify_subscribers(characteristic1, bytes([0, 1, 2]))
await async_barrier()
assert c1._called
assert c1._last_update == bytes([0, 1, 2])
assert characteristic1.value == c1_value
c1._called = False
await peer.unsubscribe(c1)
await server.notify_subscribers(characteristic1)
assert not c1._called
c2._called = False
c2._last_update = None
def on_c2_update(value):
c2._called = True
c2._last_update = value
await peer.subscribe(c2, on_c2_update)
await async_barrier()
await server.notify_subscriber(
characteristic2._last_subscription[0], characteristic2
)
await async_barrier()
assert not c2._called
await server.indicate_subscriber(
characteristic2._last_subscription[0], characteristic2
)
await async_barrier()
assert c2._called
assert c2._last_update == characteristic2.value
c2._called = False
await peer.unsubscribe(c2, on_c2_update)
await server.indicate_subscriber(
characteristic2._last_subscription[0], characteristic2
)
await async_barrier()
assert not c2._called
c3._called = False
c3._called_2 = False
c3._called_3 = False
c3._last_update = None
c3._last_update_2 = None
c3._last_update_3 = None
def on_c3_update(value):
c3._called = True
c3._last_update = value
def on_c3_update_2(value): # for notify
c3._called_2 = True
c3._last_update_2 = value
def on_c3_update_3(value): # for indicate
c3._called_3 = True
c3._last_update_3 = value
c3.on('update', on_c3_update)
await peer.subscribe(c3, on_c3_update_2)
await async_barrier()
await server.notify_subscriber(
characteristic3._last_subscription[0], characteristic3
)
await async_barrier()
assert c3._called
assert c3._last_update == characteristic3.value
assert c3._called_2
assert c3._last_update_2 == characteristic3.value
assert not c3._called_3
c3._called = False
c3._called_2 = False
c3._called_3 = False
await peer.unsubscribe(c3)
await peer.subscribe(c3, on_c3_update_3, prefer_notify=False)
await async_barrier()
characteristic3.value = bytes([1, 2, 3])
await server.indicate_subscriber(
characteristic3._last_subscription[0], characteristic3
)
await async_barrier()
assert c3._called
assert c3._last_update == characteristic3.value
assert not c3._called_2
assert c3._called_3
assert c3._last_update_3 == characteristic3.value
c3._called = False
c3._called_2 = False
c3._called_3 = False
await peer.unsubscribe(c3)
await server.notify_subscriber(
characteristic3._last_subscription[0], characteristic3
)
await server.indicate_subscriber(
characteristic3._last_subscription[0], characteristic3
)
await async_barrier()
assert not c3._called
assert not c3._called_2
assert not c3._called_3
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mtu_exchange():
[d1, d2, d3] = LinkedDevices().devices[:3]
d3.gatt_server.max_mtu = 100
d3_connections = []
@d3.on('connection')
def on_d3_connection(connection):
d3_connections.append(connection)
await d1.power_on()
await d2.power_on()
await d3.power_on()
d1_connection = await d1.connect(d3.random_address)
assert len(d3_connections) == 1
assert d3_connections[0] is not None
d2_connection = await d2.connect(d3.random_address)
assert len(d3_connections) == 2
assert d3_connections[1] is not None
d1_peer = Peer(d1_connection)
d2_peer = Peer(d2_connection)
d1_client_mtu = await d1_peer.request_mtu(220)
assert d1_client_mtu == 100
assert d1_connection.att_mtu == 100
d2_client_mtu = await d2_peer.request_mtu(50)
assert d2_client_mtu == 50
assert d2_connection.att_mtu == 50
# -----------------------------------------------------------------------------
def test_char_property_to_string():
# single
assert str(Characteristic.Properties(0x01)) == "BROADCAST"
assert str(Characteristic.Properties.BROADCAST) == "BROADCAST"
# double
assert str(Characteristic.Properties(0x03)) == "BROADCAST|READ"
assert (
str(Characteristic.Properties.BROADCAST | Characteristic.Properties.READ)
== "BROADCAST|READ"
)
# -----------------------------------------------------------------------------
def test_characteristic_property_from_string():
# single
assert (
Characteristic.Properties.from_string("BROADCAST")
== Characteristic.Properties.BROADCAST
)
# double
assert (
Characteristic.Properties.from_string("BROADCAST,READ")
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
)
assert (
Characteristic.Properties.from_string("READ,BROADCAST")
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
)
assert (
Characteristic.Properties.from_string("BROADCAST|READ")
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
)
# -----------------------------------------------------------------------------
def test_characteristic_property_from_string_assert():
with pytest.raises(TypeError) as e_info:
Characteristic.Properties.from_string("BROADCAST,HELLO")
assert (
str(e_info.value)
== """Characteristic.Properties::from_string() error:
Expected a string containing any of the keys, separated by , or |: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES
Got: BROADCAST,HELLO"""
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_server_string():
[_, server] = LinkedDevices().devices[:2]
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic])
server.add_service(service)
assert (
str(server.gatt_server)
== """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
)
# -----------------------------------------------------------------------------
async def async_main():
await test_read_write()
await test_read_write2()
await test_subscribe_notify()
await test_characteristic_encoding()
await test_mtu_exchange()
# -----------------------------------------------------------------------------
def test_permissions_from_string():
assert Attribute.Permissions.from_string('READABLE') == 1
assert Attribute.Permissions.from_string('WRITEABLE') == 2
assert Attribute.Permissions.from_string('READABLE,WRITEABLE') == 3
# -----------------------------------------------------------------------------
def test_characteristic_permissions():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
'READABLE,WRITEABLE',
)
assert characteristic.permissions == 3
# -----------------------------------------------------------------------------
def test_characteristic_has_properties():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
'READABLE,WRITEABLE',
)
assert characteristic.has_properties(Characteristic.Properties.READ)
assert characteristic.has_properties(
Characteristic.Properties.READ | Characteristic.Properties.WRITE
)
assert not characteristic.has_properties(
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.INDICATE
)
assert not characteristic.has_properties(Characteristic.Properties.INDICATE)
# -----------------------------------------------------------------------------
def test_descriptor_permissions():
descriptor = Descriptor('2902', 'READABLE,WRITEABLE')
assert descriptor.permissions == 3
# -----------------------------------------------------------------------------
def test_get_attribute_group():
device = Device()
# add some services / characteristics to the gatt server
characteristic1 = Characteristic(
'1111',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
characteristic2 = Characteristic(
'2222',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
services = [Service('1212', [characteristic1]), Service('3233', [characteristic2])]
device.gatt_server.add_services(services)
# get the handles from gatt server
characteristic_attributes1 = device.gatt_server.get_characteristic_attributes(
UUID('1212'), UUID('1111')
)
assert characteristic_attributes1 is not None
characteristic_attributes2 = device.gatt_server.get_characteristic_attributes(
UUID('3233'), UUID('2222')
)
assert characteristic_attributes2 is not None
descriptor1 = device.gatt_server.get_descriptor_attribute(
UUID('1212'), UUID('1111'), UUID('2902')
)
assert descriptor1 is not None
descriptor2 = device.gatt_server.get_descriptor_attribute(
UUID('3233'), UUID('2222'), UUID('2902')
)
assert descriptor2 is not None
# confirm the handles map back to the service
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(
characteristic_attributes1[0].handle, Service
).uuid
)
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(
characteristic_attributes1[1].handle, Service
).uuid
)
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(descriptor1.handle, Service).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(
characteristic_attributes2[0].handle, Service
).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(
characteristic_attributes2[1].handle, Service
).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(descriptor2.handle, Service).uuid
)
# confirm the handles map back to the characteristic
assert (
UUID('1111')
== device.gatt_server.get_attribute_group(
descriptor1.handle, Characteristic
).uuid
)
assert (
UUID('2222')
== device.gatt_server.get_attribute_group(
descriptor2.handle, Characteristic
).uuid
)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
test_UUID()
test_ATT_Error_Response()
test_ATT_Read_By_Group_Type_Request()
test_CharacteristicValue()
test_CharacteristicAdapter()
asyncio.run(async_main())