| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2016, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| |
| import struct |
| |
| from binascii import hexlify |
| from enum import IntEnum |
| |
| import ipaddress |
| |
| # Map of 2 bits of parent priority. |
| pp_map = {1: 1, 0: 0, 3: -1, 2: -2} |
| |
| UDP_TEST_PORT = 12345 |
| |
| |
| # Get the signed parent priority from the byte that parent priority is in. |
| def map_pp(pp_byte): |
| return pp_map[((pp_byte & 0xC0) >> 6)] |
| |
| |
| def expect_the_same_class(self, other): |
| if not isinstance(other, self.__class__): |
| raise TypeError("Expected the same class. Got {} and {}".format(type(self), type(other))) |
| |
| |
| class MessageInfo(object): |
| |
| def __init__(self): |
| self.aux_sec_hdr = None |
| self.aux_sec_hdr_bytes = None |
| |
| self.mhr_bytes = None |
| self.extra_open_fields = None |
| |
| self.source_mac_address = None |
| self.destination_mac_address = None |
| |
| self._source_ipv6 = None |
| self._destination_ipv6 = None |
| |
| self._src_port = None |
| self._dst_port = None |
| |
| self.stable = None |
| self.payload_length = 0 |
| |
| def _convert_value_to_ip_address(self, value): |
| if isinstance(value, bytearray): |
| value = bytes(value) |
| |
| return ipaddress.ip_address(value) |
| |
| @property |
| def source_ipv6(self): |
| return self._source_ipv6 |
| |
| @source_ipv6.setter |
| def source_ipv6(self, value): |
| self._source_ipv6 = self._convert_value_to_ip_address(value) |
| |
| @property |
| def destination_ipv6(self): |
| return self._destination_ipv6 |
| |
| @destination_ipv6.setter |
| def destination_ipv6(self, value): |
| self._destination_ipv6 = self._convert_value_to_ip_address(value) |
| |
| @property |
| def src_port(self): |
| return self._src_port |
| |
| @src_port.setter |
| def src_port(self, value): |
| self._src_port = value |
| |
| @property |
| def dst_port(self): |
| return self._dst_port |
| |
| @dst_port.setter |
| def dst_port(self, value): |
| self._dst_port = value |
| |
| |
| class MacAddressType(IntEnum): |
| SHORT = 0 |
| LONG = 1 |
| |
| |
| class MacAddress(object): |
| |
| def __init__(self, mac_address, _type, big_endian=True): |
| if _type == MacAddressType.SHORT: |
| length = 2 |
| elif _type == MacAddressType.LONG: |
| length = 8 |
| |
| if not big_endian: |
| mac_address = mac_address[::-1] |
| |
| self._mac_address = bytearray(mac_address[:length]) |
| self._type = _type |
| |
| @property |
| def type(self): |
| return self._type |
| |
| @property |
| def mac_address(self): |
| return self._mac_address |
| |
| @property |
| def rloc(self): |
| return struct.unpack(">H", self._mac_address)[0] |
| |
| def convert_to_iid(self): |
| if self._type == MacAddressType.SHORT: |
| return (bytearray([0x00, 0x00, 0x00, 0xff, 0xfe, 0x00]) + self._mac_address[:2]) |
| elif self._type == MacAddressType.LONG: |
| return (bytearray([self._mac_address[0] ^ 0x02]) + self._mac_address[1:]) |
| else: |
| raise RuntimeError("Could not convert to IID. Invalid MAC address type: {}".format(self._type)) |
| |
| @classmethod |
| def from_eui64(cls, eui64, big_endian=True): |
| if not isinstance(eui64, bytearray): |
| raise RuntimeError("Could not create MAC address from EUI64. Invalid data type: {}".format(type(eui64))) |
| |
| return cls(eui64, MacAddressType.LONG) |
| |
| @classmethod |
| def from_rloc16(cls, rloc16, big_endian=True): |
| if isinstance(rloc16, int): |
| mac_address = struct.pack(">H", rloc16) |
| elif isinstance(rloc16, bytearray): |
| mac_address = rloc16[:2] |
| else: |
| raise RuntimeError("Could not create MAC address from RLOC16. Invalid data type: {}".format(type(rloc16))) |
| |
| return cls(mac_address, MacAddressType.SHORT) |
| |
| def __eq__(self, other): |
| return (self.type == other.type) and (self.mac_address == other.mac_address) |
| |
| def __repr__(self): |
| return "MacAddress(mac_address=b'{}', type={})".format(hexlify(self.mac_address), MacAddressType(self._type)) |