[harness-simulation] keep capture files in sync at runtime (#8138)
This commit keeps the capture files in sync at runtime. As a result, in
manual DUT mode the Harness can read the addresses directly from the
capture files instead of requiring them to be entered by hand.
diff --git a/tools/harness-simulation/posix/sniffer_sim/sniffer.py b/tools/harness-simulation/posix/sniffer_sim/sniffer.py
index b059cf3..f352131 100644
--- a/tools/harness-simulation/posix/sniffer_sim/sniffer.py
+++ b/tools/harness-simulation/posix/sniffer_sim/sniffer.py
@@ -30,6 +30,7 @@
import argparse
from concurrent import futures
import enum
+import fcntl
import grpc
import logging
import os
@@ -38,6 +39,7 @@
import subprocess
import tempfile
import threading
+import time
import pcap_codec
from proto import sniffer_pb2
@@ -66,13 +68,14 @@
self._transport = None
self._thread = None
self._thread_alive.clear()
- self._pcapng_filename = None
+ self._file_sync_done.clear()
self._tshark_proc = None
def __init__(self, max_nodes_num):
self._max_nodes_num = max_nodes_num
self._thread_alive = threading.Event()
- self._mutex = threading.Lock() # for self._denied_nodeids
+ self._file_sync_done = threading.Event()
+ self._nodeids_mutex = threading.Lock() # for `self._denied_nodeids`
self._reset()
def Start(self, request, context):
@@ -94,11 +97,11 @@
if request.includeEthernet:
self._state |= CaptureState.ETHERNET
cmd += ['-i', 'docker0']
- self._pcapng_filename = os.path.join(tempdir, 'sim.pcapng')
- cmd += ['-w', self._pcapng_filename, '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']
+ cmd += ['-w', '-', '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']
self.logger.debug('Running command: %s', ' '.join(cmd))
- self._tshark_proc = subprocess.Popen(cmd)
+ self._tshark_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+ self._set_nonblocking(self._tshark_proc.stdout.fileno())
# Construct pcap codec after initiating tshark to avoid blocking
self._pcap = pcap_codec.PcapCodec(request.channel, fifo_name)
@@ -112,7 +115,7 @@
# Start the sniffer main loop thread
self._thread = threading.Thread(target=self._sniffer_main_loop)
- self._thread.daemon = True
+ self._thread.setDaemon(True)
self._transport.open()
self._thread_alive.set()
self._thread.start()
@@ -128,13 +131,44 @@
except socket.timeout:
continue
- with self._mutex:
+ with self._nodeids_mutex:
denied_nodeids = self._denied_nodeids
# Equivalent to RF enclosure
if nodeid not in denied_nodeids:
self._pcap.append(data)
+ def TransferPcapng(self, request, context):
+ """ Transfer the capture file. """
+
+ # Validate the state
+ if self._state == CaptureState.NONE:
+ return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OPERATION_ERROR)
+
+ # Synchronize the capture file
+ while True:
+ content = self._tshark_proc.stdout.read()
+ if content is None:
+ # Currently no captured packets
+ time.sleep(self.TIMEOUT)
+ elif content == b'':
+ # Reach EOF when tshark terminates
+ break
+ else:
+ # Forward the captured packets
+ yield sniffer_pb2.TransferPcapngResponse(content=content)
+
+ self._file_sync_done.set()
+
+ def _set_nonblocking(self, fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ if flags < 0:
+ raise RuntimeError('fcntl(F_GETFL) failed')
+
+ flags |= os.O_NONBLOCK
+ if fcntl.fcntl(fd, fcntl.F_SETFL, flags) < 0:
+ raise RuntimeError('fcntl(F_SETFL) failed')
+
def FilterNodes(self, request, context):
""" Only sniffer the specified nodes. """
@@ -150,7 +184,7 @@
if not 1 <= nodeid <= self._max_nodes_num:
return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.VALUE_ERROR)
- with self._mutex:
+ with self._nodeids_mutex:
self._denied_nodeids = denied_nodeids
return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OK)
@@ -161,28 +195,29 @@
self.logger.debug('call Stop')
# Validate and change the state
- if self._state == CaptureState.NONE:
- return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR, pcap_content=b'')
+ if not (self._state & CaptureState.THREAD):
+ return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR)
self._state = CaptureState.NONE
self._thread_alive.clear()
- self._thread.join(timeout=1)
+ self._thread.join()
self._transport.close()
self._pcap.close()
self._tshark_proc.terminate()
+ self._file_sync_done.wait()
+ # `self._tshark_proc` becomes None after the next statement
self._tshark_proc.wait()
- with open(self._pcapng_filename, 'rb') as f:
- pcap_content = f.read()
-
self._reset()
- return sniffer_pb2.StopResponse(status=sniffer_pb2.OK, pcap_content=pcap_content)
+ return sniffer_pb2.StopResponse(status=sniffer_pb2.OK)
def serve(address_port, max_nodes_num):
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
+ # One worker is used for `Start`, `FilterNodes` and `Stop`
+ # The other worker is used for `TransferPcapng`, which will be kept running by the client in a background thread
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
sniffer_pb2_grpc.add_SnifferServicer_to_server(SnifferServicer(max_nodes_num), server)
# add_secure_port requires a web domain
server.add_insecure_port(address_port)