Merge pull request #164 from AlanRosenthal/alan/local-write

Add `local-write` to bumble-console
diff --git a/apps/console.py b/apps/console.py
index 37afd22..498e224 100644
--- a/apps/console.py
+++ b/apps/console.py
@@ -24,7 +24,7 @@
 import os
 import random
 import re
-from typing import Optional
+from typing import Optional, Union
 from collections import OrderedDict
 
 import click
@@ -126,7 +126,8 @@
 
     def __init__(self):
         self.known_addresses = set()
-        self.known_attributes = []
+        self.known_remote_attributes = []
+        self.known_local_attributes = []
         self.device = None
         self.connected_peer = None
         self.top_tab = 'device'
@@ -174,10 +175,11 @@
                     'disconnect': None,
                     'discover': {'services': None, 'attributes': None},
                     'request-mtu': None,
-                    'read': LiveCompleter(self.known_attributes),
-                    'write': LiveCompleter(self.known_attributes),
-                    'subscribe': LiveCompleter(self.known_attributes),
-                    'unsubscribe': LiveCompleter(self.known_attributes),
+                    'read': LiveCompleter(self.known_remote_attributes),
+                    'write': LiveCompleter(self.known_remote_attributes),
+                    'local-write': LiveCompleter(self.known_local_attributes),
+                    'subscribe': LiveCompleter(self.known_remote_attributes),
+                    'unsubscribe': LiveCompleter(self.known_remote_attributes),
                     'set-phy': {'1m': None, '2m': None, 'coded': None},
                     'set-default-phy': None,
                     'quit': None,
@@ -373,17 +375,19 @@
 
     def show_remote_services(self, services):
         lines = []
-        del self.known_attributes[:]
+        del self.known_remote_attributes[:]
         for service in services:
             lines.append(("ansicyan", f"{service}\n"))
 
             for characteristic in service.characteristics:
                 lines.append(('ansimagenta', f'  {characteristic} + \n'))
-                self.known_attributes.append(
+                self.known_remote_attributes.append(
                     f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
                 )
-                self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}')
-                self.known_attributes.append(f'#{characteristic.handle:X}')
+                self.known_remote_attributes.append(
+                    f'*.{characteristic.uuid.to_hex_str()}'
+                )
+                self.known_remote_attributes.append(f'#{characteristic.handle:X}')
                 for descriptor in characteristic.descriptors:
                     lines.append(("ansigreen", f"    {descriptor}\n"))
 
@@ -392,12 +396,31 @@
 
     def show_local_services(self, attributes):
         lines = []
+        del self.known_local_attributes[:]
         for attribute in attributes:
             if isinstance(attribute, Service):
+                # Save the most recent service for use later
+                service = attribute
                 lines.append(("ansicyan", f"{attribute}\n"))
-            elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)):
+            elif isinstance(attribute, Characteristic):
+                # CharacteristicDeclaration includes all info from Characteristic
+                # no need to print it twice
+                continue
+            elif isinstance(attribute, CharacteristicDeclaration):
+                # Save the most recent characteristic declaration for use later
+                characteristic_declaration = attribute
+                self.known_local_attributes.append(
+                    f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}'
+                )
+                self.known_local_attributes.append(
+                    f'#{attribute.characteristic.handle:X}'
+                )
                 lines.append(("ansimagenta", f"  {attribute}\n"))
             elif isinstance(attribute, Descriptor):
+                self.known_local_attributes.append(
+                    f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}'
+                )
+                self.known_local_attributes.append(f'#{attribute.handle:X}')
                 lines.append(("ansigreen", f"    {attribute}\n"))
             else:
                 lines.append(("ansiyellow", f"{attribute}\n"))
@@ -501,7 +524,7 @@
 
         self.show_attributes(attributes)
 
-    def find_characteristic(self, param) -> Optional[CharacteristicProxy]:
+    def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]:
         if not self.connected_peer:
             return None
         parts = param.split('.')
@@ -523,6 +546,38 @@
 
         return None
 
+    def find_local_attribute(
+        self, param
+    ) -> Optional[Union[Characteristic, Descriptor]]:
+        parts = param.split('.')
+        if len(parts) == 3:
+            service_uuid = UUID(parts[0])
+            characteristic_uuid = UUID(parts[1])
+            descriptor_uuid = UUID(parts[2])
+            return self.device.gatt_server.get_descriptor_attribute(
+                service_uuid, characteristic_uuid, descriptor_uuid
+            )
+        if len(parts) == 2:
+            service_uuid = UUID(parts[0])
+            characteristic_uuid = UUID(parts[1])
+            characteristic_attributes = (
+                self.device.gatt_server.get_characteristic_attributes(
+                    service_uuid, characteristic_uuid
+                )
+            )
+            if characteristic_attributes:
+                return characteristic_attributes[1]
+            return None
+        elif len(parts) == 1:
+            if parts[0].startswith('#'):
+                attribute_handle = int(f'{parts[0][1:]}', 16)
+                attribute = self.device.gatt_server.get_attribute(attribute_handle)
+                if isinstance(attribute, (Characteristic, Descriptor)):
+                    return attribute
+                return None
+
+        return None
+
     async def rssi_monitor_loop(self):
         while True:
             if self.monitor_rssi and self.connected_peer:
@@ -787,7 +842,7 @@
             self.show_error('not connected')
             return
 
-        characteristic = self.find_characteristic(params[0])
+        characteristic = self.find_remote_characteristic(params[0])
         if characteristic is None:
             self.show_error('no such characteristic')
             return
@@ -812,7 +867,7 @@
             except ValueError:
                 value = str.encode(params[1])  # must be a string
 
-        characteristic = self.find_characteristic(params[0])
+        characteristic = self.find_remote_characteristic(params[0])
         if characteristic is None:
             self.show_error('no such characteristic')
             return
@@ -821,6 +876,34 @@
         with_response = characteristic.properties & Characteristic.WRITE
         await characteristic.write_value(value, with_response=with_response)
 
+    async def do_local_write(self, params):
+        if len(params) != 2:
+            self.show_error(
+                'invalid syntax', 'expected local-write <attribute> <value>'
+            )
+            return
+
+        if params[1].upper().startswith("0X"):
+            value = bytes.fromhex(params[1][2:])  # parse as hex string
+        else:
+            try:
+                value = int(params[1]).to_bytes(2, "little")  # try as 2 byte integer
+            except ValueError:
+                value = str.encode(params[1])  # must be a string
+
+        attribute = self.find_local_attribute(params[0])
+        if not attribute:
+            self.show_error('invalid syntax', 'unable to find attribute')
+            return
+
+        # send data to any subscribers
+        if isinstance(attribute, Characteristic):
+            attribute.write_value(None, value)
+            if attribute.has_properties([Characteristic.NOTIFY]):
+                await self.device.gatt_server.notify_subscribers(attribute)
+            if attribute.has_properties([Characteristic.INDICATE]):
+                await self.device.gatt_server.indicate_subscribers(attribute)
+
     async def do_subscribe(self, params):
         if not self.connected_peer:
             self.show_error('not connected')
@@ -830,7 +913,7 @@
             self.show_error('invalid syntax', 'expected subscribe <attribute>')
             return
 
-        characteristic = self.find_characteristic(params[0])
+        characteristic = self.find_remote_characteristic(params[0])
         if characteristic is None:
             self.show_error('no such characteristic')
             return
@@ -850,7 +933,7 @@
             self.show_error('invalid syntax', 'expected subscribe <attribute>')
             return
 
-        characteristic = self.find_characteristic(params[0])
+        characteristic = self.find_remote_characteristic(params[0])
         if characteristic is None:
             self.show_error('no such characteristic')
             return
diff --git a/bumble/att.py b/bumble/att.py
index 8311d18..55ae8a5 100644
--- a/bumble/att.py
+++ b/bumble/att.py
@@ -750,10 +750,10 @@
                 permissions_str.split(","),
                 0,
             )
-        except TypeError:
+        except TypeError as exc:
             raise TypeError(
-                f"Attribute::permissions error:\nExpected a string containing any of the keys, seperated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
-            )
+                f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
+            ) from exc
 
     def __init__(self, attribute_type, permissions, value=b''):
         EventEmitter.__init__(self)
diff --git a/bumble/gatt.py b/bumble/gatt.py
index 7aa065c..74ba6f0 100644
--- a/bumble/gatt.py
+++ b/bumble/gatt.py
@@ -28,7 +28,7 @@
 import functools
 import logging
 import struct
-from typing import Optional, Sequence
+from typing import Optional, Sequence, List, Any, Iterable
 
 from .colors import color
 from .core import UUID, get_dict_key_by_value
@@ -259,6 +259,8 @@
     See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION
     '''
 
+    uuid: UUID
+
     # Property flags
     BROADCAST = 0x01
     READ = 0x02
@@ -325,6 +327,12 @@
 
         return None
 
+    def has_properties(self, properties: Iterable[int]):
+        for prop in properties:
+            if self.properties & prop == 0:
+                return False
+        return True
+
     def __str__(self):
         return (
             f'Characteristic(handle=0x{self.handle:04X}, '
@@ -340,6 +348,8 @@
     See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION
     '''
 
+    characteristic: Characteristic
+
     def __init__(self, characteristic, value_handle):
         declaration_bytes = (
             struct.pack('<BH', characteristic.properties, value_handle)
diff --git a/bumble/gatt_client.py b/bumble/gatt_client.py
index 25add18..17f622b 100644
--- a/bumble/gatt_client.py
+++ b/bumble/gatt_client.py
@@ -62,7 +62,6 @@
     GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
     GATT_REQUEST_TIMEOUT,
     GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
-    Service,
     Characteristic,
     ClientCharacteristicConfigurationBits,
 )
diff --git a/tests/gatt_test.py b/tests/gatt_test.py
index 478f299..3dc0f5f 100644
--- a/tests/gatt_test.py
+++ b/tests/gatt_test.py
@@ -872,7 +872,7 @@
 
 
 # -----------------------------------------------------------------------------
-def test_charracteristic_permissions():
+def test_characteristic_permissions():
     characteristic = Characteristic(
         'FDB159DB-036C-49E3-B3DB-6325AC750806',
         Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
@@ -882,6 +882,21 @@
 
 
 # -----------------------------------------------------------------------------
+def test_characteristic_has_properties():
+    characteristic = Characteristic(
+        'FDB159DB-036C-49E3-B3DB-6325AC750806',
+        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
+        'READABLE,WRITEABLE',
+    )
+    assert characteristic.has_properties([Characteristic.READ])
+    assert characteristic.has_properties([Characteristic.READ, Characteristic.WRITE])
+    assert not characteristic.has_properties(
+        [Characteristic.READ, Characteristic.WRITE, Characteristic.INDICATE]
+    )
+    assert not characteristic.has_properties([Characteristic.INDICATE])
+
+
+# -----------------------------------------------------------------------------
 def test_descriptor_permissions():
     descriptor = Descriptor('2902', 'READABLE,WRITEABLE')
     assert descriptor.permissions == 3