blob: f352131279f34016575632afe1b3acfb1386f3fd [file] [log] [blame]
Jiachen Dong12933d32022-07-22 12:49:34 +08001#!/usr/bin/env python3
2#
3# Copyright (c) 2022, The OpenThread Authors.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8# 1. Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10# 2. Redistributions in binary form must reproduce the above copyright
11# notice, this list of conditions and the following disclaimer in the
12# documentation and/or other materials provided with the distribution.
13# 3. Neither the name of the copyright holder nor the
14# names of its contributors may be used to endorse or promote products
15# derived from this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27# POSSIBILITY OF SUCH DAMAGE.
28#
29
30import argparse
Jiachen Dong62fcf852022-08-09 00:19:07 +080031from concurrent import futures
32import enum
Jiachen Dong242c7cc2023-02-15 11:08:02 +080033import fcntl
Jiachen Dong62fcf852022-08-09 00:19:07 +080034import grpc
Jiachen Dong12933d32022-07-22 12:49:34 +080035import logging
Jiachen Dong0b8e9742022-08-23 12:32:45 +080036import os
Jiachen Dong12933d32022-07-22 12:49:34 +080037import signal
Jiachen Dong0b8e9742022-08-23 12:32:45 +080038import socket
39import subprocess
40import tempfile
Jiachen Dong12933d32022-07-22 12:49:34 +080041import threading
Jiachen Dong242c7cc2023-02-15 11:08:02 +080042import time
Jiachen Dong12933d32022-07-22 12:49:34 +080043
Jiachen Dong0b8e9742022-08-23 12:32:45 +080044import pcap_codec
Jiachen Dong62fcf852022-08-09 00:19:07 +080045from proto import sniffer_pb2
46from proto import sniffer_pb2_grpc
Jiachen Dong12933d32022-07-22 12:49:34 +080047import sniffer_transport
48
49
Jiachen Dong0b8e9742022-08-23 12:32:45 +080050class CaptureState(enum.Flag):
51 NONE = 0
52 THREAD = enum.auto()
53 ETHERNET = enum.auto()
54
55
Jiachen Dong62fcf852022-08-09 00:19:07 +080056class SnifferServicer(sniffer_pb2_grpc.Sniffer):
57 """ Class representing the Sniffing node, whose main task is listening. """
Jiachen Dong12933d32022-07-22 12:49:34 +080058
Jiachen Dong62fcf852022-08-09 00:19:07 +080059 logger = logging.getLogger('sniffer.SnifferServicer')
Jiachen Dong12933d32022-07-22 12:49:34 +080060
61 RECV_BUFFER_SIZE = 4096
Jiachen Dong0b8e9742022-08-23 12:32:45 +080062 TIMEOUT = 0.1
Jiachen Dong12933d32022-07-22 12:49:34 +080063
Jiachen Dong62fcf852022-08-09 00:19:07 +080064 def _reset(self):
Jiachen Dong0b8e9742022-08-23 12:32:45 +080065 self._state = CaptureState.NONE
Jiachen Dong62fcf852022-08-09 00:19:07 +080066 self._pcap = None
Jiachen Donga0c4ede2022-09-15 05:31:06 +080067 self._denied_nodeids = None
Jiachen Dong62fcf852022-08-09 00:19:07 +080068 self._transport = None
Jiachen Dong12933d32022-07-22 12:49:34 +080069 self._thread = None
Jiachen Dong12933d32022-07-22 12:49:34 +080070 self._thread_alive.clear()
Jiachen Dong242c7cc2023-02-15 11:08:02 +080071 self._file_sync_done.clear()
Jiachen Dong0b8e9742022-08-23 12:32:45 +080072 self._tshark_proc = None
Jiachen Dong12933d32022-07-22 12:49:34 +080073
Jiachen Donga0c4ede2022-09-15 05:31:06 +080074 def __init__(self, max_nodes_num):
75 self._max_nodes_num = max_nodes_num
Jiachen Dong62fcf852022-08-09 00:19:07 +080076 self._thread_alive = threading.Event()
Jiachen Dong242c7cc2023-02-15 11:08:02 +080077 self._file_sync_done = threading.Event()
78 self._nodeids_mutex = threading.Lock() # for `self._denied_nodeids`
Jiachen Dong62fcf852022-08-09 00:19:07 +080079 self._reset()
80
Jiachen Dong62fcf852022-08-09 00:19:07 +080081 def Start(self, request, context):
Jiachen Dong12933d32022-07-22 12:49:34 +080082 """ Start sniffing. """
83
Jiachen Dong62fcf852022-08-09 00:19:07 +080084 self.logger.debug('call Start')
85
86 # Validate and change the state
Jiachen Dong0b8e9742022-08-23 12:32:45 +080087 if self._state != CaptureState.NONE:
Jiachen Dong62fcf852022-08-09 00:19:07 +080088 return sniffer_pb2.StartResponse(status=sniffer_pb2.OPERATION_ERROR)
Jiachen Dong0b8e9742022-08-23 12:32:45 +080089 self._state = CaptureState.THREAD
Jiachen Dong62fcf852022-08-09 00:19:07 +080090
Jiachen Dong0b8e9742022-08-23 12:32:45 +080091 # Create a temporary named pipe
92 tempdir = tempfile.mkdtemp()
93 fifo_name = os.path.join(tempdir, 'pcap.fifo')
94 os.mkfifo(fifo_name)
95
96 cmd = ['tshark', '-i', fifo_name]
97 if request.includeEthernet:
98 self._state |= CaptureState.ETHERNET
99 cmd += ['-i', 'docker0']
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800100 cmd += ['-w', '-', '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']
Jiachen Dong0b8e9742022-08-23 12:32:45 +0800101
102 self.logger.debug('Running command: %s', ' '.join(cmd))
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800103 self._tshark_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
104 self._set_nonblocking(self._tshark_proc.stdout.fileno())
Jiachen Dong0b8e9742022-08-23 12:32:45 +0800105
106 # Construct pcap codec after initiating tshark to avoid blocking
107 self._pcap = pcap_codec.PcapCodec(request.channel, fifo_name)
Jiachen Dong62fcf852022-08-09 00:19:07 +0800108
109 # Sniffer all nodes in default, i.e. there is no RF enclosure
Jiachen Donga0c4ede2022-09-15 05:31:06 +0800110 self._denied_nodeids = set()
Jiachen Dong62fcf852022-08-09 00:19:07 +0800111
112 # Create transport
113 transport_factory = sniffer_transport.SnifferTransportFactory()
114 self._transport = transport_factory.create_transport()
115
116 # Start the sniffer main loop thread
Jiachen Dong12933d32022-07-22 12:49:34 +0800117 self._thread = threading.Thread(target=self._sniffer_main_loop)
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800118 self._thread.setDaemon(True)
Jiachen Dong12933d32022-07-22 12:49:34 +0800119 self._transport.open()
Jiachen Dong12933d32022-07-22 12:49:34 +0800120 self._thread_alive.set()
121 self._thread.start()
122
Jiachen Dong62fcf852022-08-09 00:19:07 +0800123 return sniffer_pb2.StartResponse(status=sniffer_pb2.OK)
124
Jiachen Dong0b8e9742022-08-23 12:32:45 +0800125 def _sniffer_main_loop(self):
126 """ Sniffer main loop. """
127
128 while self._thread_alive.is_set():
129 try:
130 data, nodeid = self._transport.recv(self.RECV_BUFFER_SIZE, self.TIMEOUT)
131 except socket.timeout:
132 continue
133
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800134 with self._nodeids_mutex:
Jiachen Donga0c4ede2022-09-15 05:31:06 +0800135 denied_nodeids = self._denied_nodeids
Jiachen Dong0b8e9742022-08-23 12:32:45 +0800136
137 # Equivalent to RF enclosure
Jiachen Donga0c4ede2022-09-15 05:31:06 +0800138 if nodeid not in denied_nodeids:
Jiachen Dong0b8e9742022-08-23 12:32:45 +0800139 self._pcap.append(data)
140
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800141 def TransferPcapng(self, request, context):
142 """ Transfer the capture file. """
143
144 # Validate the state
145 if self._state == CaptureState.NONE:
146 return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OPERATION_ERROR)
147
148 # Synchronize the capture file
149 while True:
150 content = self._tshark_proc.stdout.read()
151 if content is None:
152 # Currently no captured packets
153 time.sleep(self.TIMEOUT)
154 elif content == b'':
155 # Reach EOF when tshark terminates
156 break
157 else:
158 # Forward the captured packets
159 yield sniffer_pb2.TransferPcapngResponse(content=content)
160
161 self._file_sync_done.set()
162
163 def _set_nonblocking(self, fd):
164 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
165 if flags < 0:
166 raise RuntimeError('fcntl(F_GETFL) failed')
167
168 flags |= os.O_NONBLOCK
169 if fcntl.fcntl(fd, fcntl.F_SETFL, flags) < 0:
170 raise RuntimeError('fcntl(F_SETFL) failed')
171
Jiachen Dong62fcf852022-08-09 00:19:07 +0800172 def FilterNodes(self, request, context):
173 """ Only sniffer the specified nodes. """
174
175 self.logger.debug('call FilterNodes')
176
177 # Validate the state
Jiachen Dong0b8e9742022-08-23 12:32:45 +0800178 if not (self._state & CaptureState.THREAD):
Jiachen Dong62fcf852022-08-09 00:19:07 +0800179 return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OPERATION_ERROR)
180
Jiachen Donga0c4ede2022-09-15 05:31:06 +0800181 denied_nodeids = set(request.nodeids)
Jiachen Dong62fcf852022-08-09 00:19:07 +0800182 # Validate the node IDs
Jiachen Donga0c4ede2022-09-15 05:31:06 +0800183 for nodeid in denied_nodeids:
184 if not 1 <= nodeid <= self._max_nodes_num:
Jiachen Dong62fcf852022-08-09 00:19:07 +0800185 return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.VALUE_ERROR)
186
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800187 with self._nodeids_mutex:
Jiachen Donga0c4ede2022-09-15 05:31:06 +0800188 self._denied_nodeids = denied_nodeids
Jiachen Dong62fcf852022-08-09 00:19:07 +0800189
190 return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OK)
191
192 def Stop(self, request, context):
193 """ Stop sniffing, and return the pcap bytes. """
194
195 self.logger.debug('call Stop')
196
197 # Validate and change the state
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800198 if not (self._state & CaptureState.THREAD):
199 return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR)
Jiachen Dong0b8e9742022-08-23 12:32:45 +0800200 self._state = CaptureState.NONE
Jiachen Dong12933d32022-07-22 12:49:34 +0800201
202 self._thread_alive.clear()
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800203 self._thread.join()
Jiachen Dong12933d32022-07-22 12:49:34 +0800204 self._transport.close()
Jiachen Dong0b8e9742022-08-23 12:32:45 +0800205 self._pcap.close()
Jiachen Dong12933d32022-07-22 12:49:34 +0800206
Jiachen Dong0b8e9742022-08-23 12:32:45 +0800207 self._tshark_proc.terminate()
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800208 self._file_sync_done.wait()
209 # `self._tshark_proc` becomes None after the next statement
Jiachen Dong0b8e9742022-08-23 12:32:45 +0800210 self._tshark_proc.wait()
211
Jiachen Dong62fcf852022-08-09 00:19:07 +0800212 self._reset()
Jiachen Dong12933d32022-07-22 12:49:34 +0800213
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800214 return sniffer_pb2.StopResponse(status=sniffer_pb2.OK)
Jiachen Dong12933d32022-07-22 12:49:34 +0800215
Jiachen Dong62fcf852022-08-09 00:19:07 +0800216
Jiachen Donga0c4ede2022-09-15 05:31:06 +0800217def serve(address_port, max_nodes_num):
Jiachen Dong242c7cc2023-02-15 11:08:02 +0800218 # One worker is used for `Start`, `FilterNodes` and `Stop`
219 # The other worker is used for `TransferPcapng`, which will be kept running by the client in a background thread
220 server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
Jiachen Donga0c4ede2022-09-15 05:31:06 +0800221 sniffer_pb2_grpc.add_SnifferServicer_to_server(SnifferServicer(max_nodes_num), server)
Jiachen Dong62fcf852022-08-09 00:19:07 +0800222 # add_secure_port requires a web domain
223 server.add_insecure_port(address_port)
224 logging.info('server starts on %s', address_port)
225 server.start()
226
227 def exit_handler(signum, context):
228 server.stop(1)
229
230 signal.signal(signal.SIGINT, exit_handler)
231 signal.signal(signal.SIGTERM, exit_handler)
232
233 server.wait_for_termination()
Jiachen Dong12933d32022-07-22 12:49:34 +0800234
235
236def run_sniffer():
Jiachen Dong62fcf852022-08-09 00:19:07 +0800237 logging.basicConfig(level=logging.INFO)
238
Jiachen Dong12933d32022-07-22 12:49:34 +0800239 parser = argparse.ArgumentParser()
Jiachen Dong62fcf852022-08-09 00:19:07 +0800240 parser.add_argument('--grpc-server',
241 dest='grpc_server',
Jiachen Dong12933d32022-07-22 12:49:34 +0800242 type=str,
243 required=True,
Jiachen Dong62fcf852022-08-09 00:19:07 +0800244 help='the address of the sniffer server')
Jiachen Donga0c4ede2022-09-15 05:31:06 +0800245 parser.add_argument('--max-nodes-num',
246 dest='max_nodes_num',
247 type=int,
248 required=True,
249 help='the maximum number of nodes')
Jiachen Dong12933d32022-07-22 12:49:34 +0800250 args = parser.parse_args()
251
Jiachen Donga0c4ede2022-09-15 05:31:06 +0800252 serve(args.grpc_server, args.max_nodes_num)
Jiachen Dong12933d32022-07-22 12:49:34 +0800253
254
255if __name__ == '__main__':
256 run_sniffer()