[harness-simulation] keep capture files in sync at runtime (#8138)

This commit keeps the capture files in sync at runtime, so that Harness
can obtain the addresses directly from the capture files in manual DUT
mode without requiring them to be entered by hand.
diff --git a/tools/harness-simulation/posix/sniffer_sim/sniffer.py b/tools/harness-simulation/posix/sniffer_sim/sniffer.py
index b059cf3..f352131 100644
--- a/tools/harness-simulation/posix/sniffer_sim/sniffer.py
+++ b/tools/harness-simulation/posix/sniffer_sim/sniffer.py
@@ -30,6 +30,7 @@
 import argparse
 from concurrent import futures
 import enum
+import fcntl
 import grpc
 import logging
 import os
@@ -38,6 +39,7 @@
 import subprocess
 import tempfile
 import threading
+import time
 
 import pcap_codec
 from proto import sniffer_pb2
@@ -66,13 +68,14 @@
         self._transport = None
         self._thread = None
         self._thread_alive.clear()
-        self._pcapng_filename = None
+        self._file_sync_done.clear()
         self._tshark_proc = None
 
     def __init__(self, max_nodes_num):
         self._max_nodes_num = max_nodes_num
         self._thread_alive = threading.Event()
-        self._mutex = threading.Lock()  # for self._denied_nodeids
+        self._file_sync_done = threading.Event()
+        self._nodeids_mutex = threading.Lock()  # for `self._denied_nodeids`
         self._reset()
 
     def Start(self, request, context):
@@ -94,11 +97,11 @@
         if request.includeEthernet:
             self._state |= CaptureState.ETHERNET
             cmd += ['-i', 'docker0']
-        self._pcapng_filename = os.path.join(tempdir, 'sim.pcapng')
-        cmd += ['-w', self._pcapng_filename, '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']
+        cmd += ['-w', '-', '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']
 
         self.logger.debug('Running command:  %s', ' '.join(cmd))
-        self._tshark_proc = subprocess.Popen(cmd)
+        self._tshark_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+        self._set_nonblocking(self._tshark_proc.stdout.fileno())
 
         # Construct pcap codec after initiating tshark to avoid blocking
         self._pcap = pcap_codec.PcapCodec(request.channel, fifo_name)
@@ -112,7 +115,7 @@
 
         # Start the sniffer main loop thread
         self._thread = threading.Thread(target=self._sniffer_main_loop)
-        self._thread.daemon = True
+        self._thread.setDaemon(True)
         self._transport.open()
         self._thread_alive.set()
         self._thread.start()
@@ -128,13 +131,44 @@
             except socket.timeout:
                 continue
 
-            with self._mutex:
+            with self._nodeids_mutex:
                 denied_nodeids = self._denied_nodeids
 
             # Equivalent to RF enclosure
             if nodeid not in denied_nodeids:
                 self._pcap.append(data)
 
+    def TransferPcapng(self, request, context):
+        """ Transfer the capture file. """
+
+        # Validate the state
+        if self._state == CaptureState.NONE:
+            return  # NOTE(review): was `return sniffer_pb2.FilterNodesResponse(...)` — wrong response type, and a return value inside a generator is discarded by gRPC; the early return yields an empty stream to signal the error
+
+        # Synchronize the capture file
+        while True:
+            content = self._tshark_proc.stdout.read()  # NOTE(review): a non-blocking read on a buffered pipe may raise BlockingIOError instead of returning None — confirm against the io docs
+            if content is None:
+                # Currently no captured packets
+                time.sleep(self.TIMEOUT)
+            elif content == b'':
+                # Reach EOF when tshark terminates
+                break
+            else:
+                # Forward the captured packets
+                yield sniffer_pb2.TransferPcapngResponse(content=content)
+
+        self._file_sync_done.set()
+
+    def _set_nonblocking(self, fd):
+        flags = fcntl.fcntl(fd, fcntl.F_GETFL)  # NOTE(review): Python's fcntl.fcntl raises OSError on failure and never returns a negative value, so the `< 0` checks here and below are dead code
+        if flags < 0:
+            raise RuntimeError('fcntl(F_GETFL) failed')
+
+        flags |= os.O_NONBLOCK
+        if fcntl.fcntl(fd, fcntl.F_SETFL, flags) < 0:
+            raise RuntimeError('fcntl(F_SETFL) failed')
+
     def FilterNodes(self, request, context):
         """ Only sniffer the specified nodes. """
 
@@ -150,7 +184,7 @@
             if not 1 <= nodeid <= self._max_nodes_num:
                 return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.VALUE_ERROR)
 
-        with self._mutex:
+        with self._nodeids_mutex:
             self._denied_nodeids = denied_nodeids
 
         return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OK)
@@ -161,28 +195,29 @@
         self.logger.debug('call Stop')
 
         # Validate and change the state
-        if self._state == CaptureState.NONE:
-            return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR, pcap_content=b'')
+        if not (self._state & CaptureState.THREAD):
+            return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR)
         self._state = CaptureState.NONE
 
         self._thread_alive.clear()
-        self._thread.join(timeout=1)
+        self._thread.join()
         self._transport.close()
         self._pcap.close()
 
         self._tshark_proc.terminate()
+        self._file_sync_done.wait()
+        # `self._tshark_proc` becomes None once `self._reset()` runs below, so finish waiting on it first
         self._tshark_proc.wait()
 
-        with open(self._pcapng_filename, 'rb') as f:
-            pcap_content = f.read()
-
         self._reset()
 
-        return sniffer_pb2.StopResponse(status=sniffer_pb2.OK, pcap_content=pcap_content)
+        return sniffer_pb2.StopResponse(status=sniffer_pb2.OK)
 
 
 def serve(address_port, max_nodes_num):
-    server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
+    # One worker is used for `Start`, `FilterNodes` and `Stop`
+    # The other worker is used for `TransferPcapng`, which will be kept running by the client in a background thread
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
     sniffer_pb2_grpc.add_SnifferServicer_to_server(SnifferServicer(max_nodes_num), server)
     # add_secure_port requires a web domain
     server.add_insecure_port(address_port)