blob: f352131279f34016575632afe1b3acfb1386f3fd [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2022, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import argparse
from concurrent import futures
import enum
import fcntl
import grpc
import logging
import os
import signal
import socket
import subprocess
import tempfile
import threading
import time
import pcap_codec
from proto import sniffer_pb2
from proto import sniffer_pb2_grpc
import sniffer_transport
class CaptureState(enum.Flag):
NONE = 0
THREAD = enum.auto()
ETHERNET = enum.auto()
class SnifferServicer(sniffer_pb2_grpc.Sniffer):
""" Class representing the Sniffing node, whose main task is listening. """
logger = logging.getLogger('sniffer.SnifferServicer')
RECV_BUFFER_SIZE = 4096
TIMEOUT = 0.1
def _reset(self):
self._state = CaptureState.NONE
self._pcap = None
self._denied_nodeids = None
self._transport = None
self._thread = None
self._thread_alive.clear()
self._file_sync_done.clear()
self._tshark_proc = None
def __init__(self, max_nodes_num):
self._max_nodes_num = max_nodes_num
self._thread_alive = threading.Event()
self._file_sync_done = threading.Event()
self._nodeids_mutex = threading.Lock() # for `self._denied_nodeids`
self._reset()
def Start(self, request, context):
""" Start sniffing. """
self.logger.debug('call Start')
# Validate and change the state
if self._state != CaptureState.NONE:
return sniffer_pb2.StartResponse(status=sniffer_pb2.OPERATION_ERROR)
self._state = CaptureState.THREAD
# Create a temporary named pipe
tempdir = tempfile.mkdtemp()
fifo_name = os.path.join(tempdir, 'pcap.fifo')
os.mkfifo(fifo_name)
cmd = ['tshark', '-i', fifo_name]
if request.includeEthernet:
self._state |= CaptureState.ETHERNET
cmd += ['-i', 'docker0']
cmd += ['-w', '-', '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']
self.logger.debug('Running command: %s', ' '.join(cmd))
self._tshark_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
self._set_nonblocking(self._tshark_proc.stdout.fileno())
# Construct pcap codec after initiating tshark to avoid blocking
self._pcap = pcap_codec.PcapCodec(request.channel, fifo_name)
# Sniffer all nodes in default, i.e. there is no RF enclosure
self._denied_nodeids = set()
# Create transport
transport_factory = sniffer_transport.SnifferTransportFactory()
self._transport = transport_factory.create_transport()
# Start the sniffer main loop thread
self._thread = threading.Thread(target=self._sniffer_main_loop)
self._thread.setDaemon(True)
self._transport.open()
self._thread_alive.set()
self._thread.start()
return sniffer_pb2.StartResponse(status=sniffer_pb2.OK)
def _sniffer_main_loop(self):
""" Sniffer main loop. """
while self._thread_alive.is_set():
try:
data, nodeid = self._transport.recv(self.RECV_BUFFER_SIZE, self.TIMEOUT)
except socket.timeout:
continue
with self._nodeids_mutex:
denied_nodeids = self._denied_nodeids
# Equivalent to RF enclosure
if nodeid not in denied_nodeids:
self._pcap.append(data)
def TransferPcapng(self, request, context):
""" Transfer the capture file. """
# Validate the state
if self._state == CaptureState.NONE:
return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OPERATION_ERROR)
# Synchronize the capture file
while True:
content = self._tshark_proc.stdout.read()
if content is None:
# Currently no captured packets
time.sleep(self.TIMEOUT)
elif content == b'':
# Reach EOF when tshark terminates
break
else:
# Forward the captured packets
yield sniffer_pb2.TransferPcapngResponse(content=content)
self._file_sync_done.set()
def _set_nonblocking(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
if flags < 0:
raise RuntimeError('fcntl(F_GETFL) failed')
flags |= os.O_NONBLOCK
if fcntl.fcntl(fd, fcntl.F_SETFL, flags) < 0:
raise RuntimeError('fcntl(F_SETFL) failed')
def FilterNodes(self, request, context):
""" Only sniffer the specified nodes. """
self.logger.debug('call FilterNodes')
# Validate the state
if not (self._state & CaptureState.THREAD):
return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OPERATION_ERROR)
denied_nodeids = set(request.nodeids)
# Validate the node IDs
for nodeid in denied_nodeids:
if not 1 <= nodeid <= self._max_nodes_num:
return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.VALUE_ERROR)
with self._nodeids_mutex:
self._denied_nodeids = denied_nodeids
return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OK)
def Stop(self, request, context):
""" Stop sniffing, and return the pcap bytes. """
self.logger.debug('call Stop')
# Validate and change the state
if not (self._state & CaptureState.THREAD):
return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR)
self._state = CaptureState.NONE
self._thread_alive.clear()
self._thread.join()
self._transport.close()
self._pcap.close()
self._tshark_proc.terminate()
self._file_sync_done.wait()
# `self._tshark_proc` becomes None after the next statement
self._tshark_proc.wait()
self._reset()
return sniffer_pb2.StopResponse(status=sniffer_pb2.OK)
def serve(address_port, max_nodes_num):
# One worker is used for `Start`, `FilterNodes` and `Stop`
# The other worker is used for `TransferPcapng`, which will be kept running by the client in a background thread
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
sniffer_pb2_grpc.add_SnifferServicer_to_server(SnifferServicer(max_nodes_num), server)
# add_secure_port requires a web domain
server.add_insecure_port(address_port)
logging.info('server starts on %s', address_port)
server.start()
def exit_handler(signum, context):
server.stop(1)
signal.signal(signal.SIGINT, exit_handler)
signal.signal(signal.SIGTERM, exit_handler)
server.wait_for_termination()
def run_sniffer():
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument('--grpc-server',
dest='grpc_server',
type=str,
required=True,
help='the address of the sniffer server')
parser.add_argument('--max-nodes-num',
dest='max_nodes_num',
type=int,
required=True,
help='the maximum number of nodes')
args = parser.parse_args()
serve(args.grpc_server, args.max_nodes_num)
if __name__ == '__main__':
run_sniffer()