Merge pull request #164 from AlanRosenthal/alan/local-write
Add `local-write` to bumble-console
diff --git a/apps/console.py b/apps/console.py
index 37afd22..498e224 100644
--- a/apps/console.py
+++ b/apps/console.py
@@ -24,7 +24,7 @@
import os
import random
import re
-from typing import Optional
+from typing import Optional, Union
from collections import OrderedDict
import click
@@ -126,7 +126,8 @@
def __init__(self):
self.known_addresses = set()
- self.known_attributes = []
+ self.known_remote_attributes = []
+ self.known_local_attributes = []
self.device = None
self.connected_peer = None
self.top_tab = 'device'
@@ -174,10 +175,11 @@
'disconnect': None,
'discover': {'services': None, 'attributes': None},
'request-mtu': None,
- 'read': LiveCompleter(self.known_attributes),
- 'write': LiveCompleter(self.known_attributes),
- 'subscribe': LiveCompleter(self.known_attributes),
- 'unsubscribe': LiveCompleter(self.known_attributes),
+ 'read': LiveCompleter(self.known_remote_attributes),
+ 'write': LiveCompleter(self.known_remote_attributes),
+ 'local-write': LiveCompleter(self.known_local_attributes),
+ 'subscribe': LiveCompleter(self.known_remote_attributes),
+ 'unsubscribe': LiveCompleter(self.known_remote_attributes),
'set-phy': {'1m': None, '2m': None, 'coded': None},
'set-default-phy': None,
'quit': None,
@@ -373,17 +375,19 @@
def show_remote_services(self, services):
lines = []
- del self.known_attributes[:]
+ del self.known_remote_attributes[:]
for service in services:
lines.append(("ansicyan", f"{service}\n"))
for characteristic in service.characteristics:
lines.append(('ansimagenta', f' {characteristic} + \n'))
- self.known_attributes.append(
+ self.known_remote_attributes.append(
f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
)
- self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}')
- self.known_attributes.append(f'#{characteristic.handle:X}')
+ self.known_remote_attributes.append(
+ f'*.{characteristic.uuid.to_hex_str()}'
+ )
+ self.known_remote_attributes.append(f'#{characteristic.handle:X}')
for descriptor in characteristic.descriptors:
lines.append(("ansigreen", f" {descriptor}\n"))
@@ -392,12 +396,31 @@
def show_local_services(self, attributes):
lines = []
+ del self.known_local_attributes[:]
for attribute in attributes:
if isinstance(attribute, Service):
+ # Save the most recent service for use later
+ service = attribute
lines.append(("ansicyan", f"{attribute}\n"))
- elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)):
+ elif isinstance(attribute, Characteristic):
+ # CharacteristicDeclaration includes all info from Characteristic
+ # no need to print it twice
+ continue
+ elif isinstance(attribute, CharacteristicDeclaration):
+ # Save the most recent characteristic declaration for use later
+ characteristic_declaration = attribute
+ self.known_local_attributes.append(
+ f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}'
+ )
+ self.known_local_attributes.append(
+ f'#{attribute.characteristic.handle:X}'
+ )
lines.append(("ansimagenta", f" {attribute}\n"))
elif isinstance(attribute, Descriptor):
+ self.known_local_attributes.append(
+ f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}'
+ )
+ self.known_local_attributes.append(f'#{attribute.handle:X}')
lines.append(("ansigreen", f" {attribute}\n"))
else:
lines.append(("ansiyellow", f"{attribute}\n"))
@@ -501,7 +524,7 @@
self.show_attributes(attributes)
- def find_characteristic(self, param) -> Optional[CharacteristicProxy]:
+ def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]:
if not self.connected_peer:
return None
parts = param.split('.')
@@ -523,6 +546,38 @@
return None
+    def find_local_attribute(
+        self, param
+    ) -> Optional[Union[Characteristic, Descriptor]]:
+        """Look up an attribute of the local GATT server from its textual name.
+
+        Accepted forms of ``param``:
+
+        * ``<service>.<characteristic>.<descriptor>``: three UUIDs, resolved
+          with ``gatt_server.get_descriptor_attribute``.
+        * ``<service>.<characteristic>``: two UUIDs, resolved with
+          ``gatt_server.get_characteristic_attributes``.
+        * ``#<handle>``: an attribute handle written in hexadecimal, resolved
+          with ``gatt_server.get_attribute``.
+
+        Returns the matching attribute, or None when ``param`` has any other
+        shape, when the handle is not a valid hexadecimal number, or when the
+        handle designates something other than a Characteristic or a
+        Descriptor.
+
+        NOTE(review): ``UUID(...)`` presumably raises on malformed text; that
+        exception is not handled here and propagates to the caller.
+        """
+        parts = param.split('.')
+
+        if len(parts) == 3:
+            service_uuid = UUID(parts[0])
+            characteristic_uuid = UUID(parts[1])
+            descriptor_uuid = UUID(parts[2])
+            return self.device.gatt_server.get_descriptor_attribute(
+                service_uuid, characteristic_uuid, descriptor_uuid
+            )
+
+        if len(parts) == 2:
+            service_uuid = UUID(parts[0])
+            characteristic_uuid = UUID(parts[1])
+            characteristic_attributes = (
+                self.device.gatt_server.get_characteristic_attributes(
+                    service_uuid, characteristic_uuid
+                )
+            )
+            if characteristic_attributes:
+                # NOTE(review): index 1 is presumably the characteristic value
+                # attribute (index 0 being its declaration) - confirm against
+                # get_characteristic_attributes.
+                return characteristic_attributes[1]
+            return None
+
+        if len(parts) == 1 and parts[0].startswith('#'):
+            try:
+                attribute_handle = int(parts[0][1:], 16)
+            except ValueError:
+                # Not a hexadecimal number: report "no such attribute" rather
+                # than letting the parse error escape.
+                return None
+            attribute = self.device.gatt_server.get_attribute(attribute_handle)
+            if isinstance(attribute, (Characteristic, Descriptor)):
+                return attribute
+
+        return None
+
async def rssi_monitor_loop(self):
while True:
if self.monitor_rssi and self.connected_peer:
@@ -787,7 +842,7 @@
self.show_error('not connected')
return
- characteristic = self.find_characteristic(params[0])
+ characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
@@ -812,7 +867,7 @@
except ValueError:
value = str.encode(params[1]) # must be a string
- characteristic = self.find_characteristic(params[0])
+ characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
@@ -821,6 +876,34 @@
with_response = characteristic.properties & Characteristic.WRITE
await characteristic.write_value(value, with_response=with_response)
+    async def do_local_write(self, params):
+        """Handle the ``local-write <attribute> <value>`` console command.
+
+        ``<attribute>`` is resolved with ``find_local_attribute``. ``<value>``
+        is interpreted, in this order, as:
+
+        * a hex string when it starts with ``0x`` / ``0X``,
+        * a decimal integer, stored as 2 little-endian bytes,
+        * otherwise the text itself, encoded with ``str.encode``.
+
+        When the attribute is a Characteristic, the value is written to it and
+        subscribers are notified and/or indicated according to its NOTIFY and
+        INDICATE properties. Any other attribute is rejected with an error
+        message instead of being silently ignored.
+        """
+        if len(params) != 2:
+            self.show_error(
+                'invalid syntax', 'expected local-write <attribute> <value>'
+            )
+            return
+
+        raw_value = params[1]
+        if raw_value.upper().startswith("0X"):
+            try:
+                value = bytes.fromhex(raw_value[2:])  # parse as hex string
+            except ValueError:
+                self.show_error('invalid value', 'malformed hex string')
+                return
+        else:
+            try:
+                value = int(raw_value).to_bytes(2, "little")  # try as 2 byte integer
+            except ValueError:
+                value = str.encode(raw_value)  # must be a string
+            except OverflowError:
+                # The text is an integer, but negative or too large for 2 bytes
+                self.show_error(
+                    'invalid value', 'integer must be in the range 0-65535'
+                )
+                return
+
+        attribute = self.find_local_attribute(params[0])
+        if not attribute:
+            self.show_error('invalid syntax', 'unable to find attribute')
+            return
+
+        if isinstance(attribute, Characteristic):
+            # NOTE(review): the first argument is presumably the connection the
+            # write comes from; None because the write is local - confirm.
+            attribute.write_value(None, value)
+
+            # send data to any subscribers
+            if attribute.has_properties([Characteristic.NOTIFY]):
+                await self.device.gatt_server.notify_subscribers(attribute)
+            if attribute.has_properties([Characteristic.INDICATE]):
+                await self.device.gatt_server.indicate_subscribers(attribute)
+        else:
+            # Make the no-op visible instead of returning silently
+            self.show_error(
+                'not supported', 'local-write only supports characteristics'
+            )
+
async def do_subscribe(self, params):
if not self.connected_peer:
self.show_error('not connected')
@@ -830,7 +913,7 @@
self.show_error('invalid syntax', 'expected subscribe <attribute>')
return
- characteristic = self.find_characteristic(params[0])
+ characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
@@ -850,7 +933,7 @@
self.show_error('invalid syntax', 'expected subscribe <attribute>')
return
- characteristic = self.find_characteristic(params[0])
+ characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
diff --git a/bumble/att.py b/bumble/att.py
index 8311d18..55ae8a5 100644
--- a/bumble/att.py
+++ b/bumble/att.py
@@ -750,10 +750,10 @@
permissions_str.split(","),
0,
)
- except TypeError:
+ except TypeError as exc:
raise TypeError(
- f"Attribute::permissions error:\nExpected a string containing any of the keys, seperated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
- )
+ f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
+ ) from exc
def __init__(self, attribute_type, permissions, value=b''):
EventEmitter.__init__(self)
diff --git a/bumble/gatt.py b/bumble/gatt.py
index 7aa065c..74ba6f0 100644
--- a/bumble/gatt.py
+++ b/bumble/gatt.py
@@ -28,7 +28,7 @@
import functools
import logging
import struct
-from typing import Optional, Sequence
+from typing import Optional, Sequence, List, Any, Iterable
from .colors import color
from .core import UUID, get_dict_key_by_value
@@ -259,6 +259,8 @@
See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION
'''
+ uuid: UUID
+
# Property flags
BROADCAST = 0x01
READ = 0x02
@@ -325,6 +327,12 @@
return None
+    def has_properties(self, properties: Iterable[int]):
+        """Return True when every flag in ``properties`` is set on this object.
+
+        Each item is tested individually against ``self.properties`` with a
+        bitwise AND; an empty iterable yields True.
+        """
+        return all((self.properties & flag) != 0 for flag in properties)
+
def __str__(self):
return (
f'Characteristic(handle=0x{self.handle:04X}, '
@@ -340,6 +348,8 @@
See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION
'''
+ characteristic: Characteristic
+
def __init__(self, characteristic, value_handle):
declaration_bytes = (
struct.pack('<BH', characteristic.properties, value_handle)
diff --git a/bumble/gatt_client.py b/bumble/gatt_client.py
index 25add18..17f622b 100644
--- a/bumble/gatt_client.py
+++ b/bumble/gatt_client.py
@@ -62,7 +62,6 @@
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
- Service,
Characteristic,
ClientCharacteristicConfigurationBits,
)
diff --git a/tests/gatt_test.py b/tests/gatt_test.py
index 478f299..3dc0f5f 100644
--- a/tests/gatt_test.py
+++ b/tests/gatt_test.py
@@ -872,7 +872,7 @@
# -----------------------------------------------------------------------------
-def test_charracteristic_permissions():
+def test_characteristic_permissions():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
@@ -882,6 +882,21 @@
# -----------------------------------------------------------------------------
+def test_characteristic_has_properties():
+    """has_properties() holds only when every requested flag is set."""
+    subject = Characteristic(
+        'FDB159DB-036C-49E3-B3DB-6325AC750806',
+        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
+        'READABLE,WRITEABLE',
+    )
+
+    # Every requested flag is among READ | WRITE | NOTIFY
+    all_present = (
+        [Characteristic.READ],
+        [Characteristic.READ, Characteristic.WRITE],
+    )
+    for flags in all_present:
+        assert subject.has_properties(flags)
+
+    # INDICATE was never set, so any request that includes it must fail
+    some_missing = (
+        [Characteristic.READ, Characteristic.WRITE, Characteristic.INDICATE],
+        [Characteristic.INDICATE],
+    )
+    for flags in some_missing:
+        assert not subject.has_properties(flags)
+
+
+# -----------------------------------------------------------------------------
def test_descriptor_permissions():
descriptor = Descriptor('2902', 'READABLE,WRITEABLE')
assert descriptor.permissions == 3