[harness-simulation] keep capture files in sync at runtime (#8138)

This commit streams the sniffer's capture file to Harness while capturing,
instead of transferring it only when the sniffer stops. As a result, in
manual DUT mode Harness can obtain the addresses directly from the capture
file without requiring them to be entered manually.
diff --git a/tools/harness-simulation/harness/Thread_Harness/Sniffer/SimSniffer.py b/tools/harness-simulation/harness/Thread_Harness/Sniffer/SimSniffer.py
index 90bc43f..c5c55db 100644
--- a/tools/harness-simulation/harness/Thread_Harness/Sniffer/SimSniffer.py
+++ b/tools/harness-simulation/harness/Thread_Harness/Sniffer/SimSniffer.py
@@ -35,6 +35,7 @@
 import select
 import socket
 import struct
+import threading
 import time
 import win32api
 import winreg as wr
@@ -259,9 +260,26 @@
         if response.status != sniffer_pb2.OK:
             raise RuntimeError('startSniffer error: %s' % sniffer_pb2.Status.Name(response.status))
 
+        # Mirror the remote capture file locally while sniffing, so it can be read at runtime
+        self._thread = threading.Thread(target=self._file_sync_main_loop)
+        self._thread.daemon = True
+        self._thread.start()
+
         self.is_active = True
 
     @watched
+    def _file_sync_main_loop(self):
+        """Write the streamed capture file content to the local pcapng file.
+
+        Returns when the server ends the `TransferPcapng` stream, i.e. after the sniffer is stopped.
+        """
+        with open(self._local_pcapng_location, 'wb') as f:
+            for response in self._stub.TransferPcapng(sniffer_pb2.TransferPcapngRequest()):
+                f.write(response.content)
+                # Flush every chunk so the local file is up to date while sniffing
+                f.flush()
+
+    @watched
     def stopSniffer(self):
         if not self.is_active:
             return
@@ -270,8 +288,8 @@
         if response.status != sniffer_pb2.OK:
             raise RuntimeError('stopSniffer error: %s' % sniffer_pb2.Status.Name(response.status))
 
-        with open(self._local_pcapng_location, 'wb') as f:
-            f.write(response.pcap_content)
+        # Wait until the sync thread has received the rest of the capture file
+        self._thread.join()
 
         self.is_active = False
 
diff --git a/tools/harness-simulation/posix/sniffer_sim/proto/sniffer.proto b/tools/harness-simulation/posix/sniffer_sim/proto/sniffer.proto
index 436bd3b..9d843f0 100644
--- a/tools/harness-simulation/posix/sniffer_sim/proto/sniffer.proto
+++ b/tools/harness-simulation/posix/sniffer_sim/proto/sniffer.proto
@@ -7,6 +7,9 @@
     // Start the sniffer
     rpc Start(StartRequest) returns (StartResponse) {}
 
+    // Stream the capture file while sniffing; the stream ends once the sniffer is stopped
+    rpc TransferPcapng(TransferPcapngRequest) returns (stream TransferPcapngResponse) {}
+
     // Let the sniffer sniff these nodes only
     rpc FilterNodes(FilterNodesRequest) returns (FilterNodesResponse) {}
 
@@ -41,6 +44,13 @@
     Status status = 1;
 }
 
+message TransferPcapngRequest {
+}
+
+message TransferPcapngResponse {
+    bytes content = 1;  // Next chunk of the capture file, to be appended to the previous ones
+}
+
 message FilterNodesRequest {
     repeated int32 nodeids = 1;
 }
@@ -54,5 +64,7 @@
 
 message StopResponse {
     Status status = 1;
-    bytes pcap_content = 2;
+    // The capture file is now streamed by TransferPcapng; keep the old field from being reused
+    reserved 2;
+    reserved "pcap_content";
 }
diff --git a/tools/harness-simulation/posix/sniffer_sim/sniffer.py b/tools/harness-simulation/posix/sniffer_sim/sniffer.py
index b059cf3..f352131 100644
--- a/tools/harness-simulation/posix/sniffer_sim/sniffer.py
+++ b/tools/harness-simulation/posix/sniffer_sim/sniffer.py
@@ -30,6 +30,7 @@
 import argparse
 from concurrent import futures
 import enum
+import fcntl
 import grpc
 import logging
 import os
@@ -38,6 +39,7 @@
 import subprocess
 import tempfile
 import threading
+import time
 
 import pcap_codec
 from proto import sniffer_pb2
@@ -66,13 +68,14 @@
         self._transport = None
         self._thread = None
         self._thread_alive.clear()
-        self._pcapng_filename = None
+        self._file_sync_done.clear()
         self._tshark_proc = None
 
     def __init__(self, max_nodes_num):
         self._max_nodes_num = max_nodes_num
         self._thread_alive = threading.Event()
-        self._mutex = threading.Lock()  # for self._denied_nodeids
+        self._file_sync_done = threading.Event()  # set by `TransferPcapng` when streaming ends
+        self._nodeids_mutex = threading.Lock()  # for `self._denied_nodeids`
         self._reset()
 
     def Start(self, request, context):
@@ -94,11 +97,11 @@
         if request.includeEthernet:
             self._state |= CaptureState.ETHERNET
             cmd += ['-i', 'docker0']
-        self._pcapng_filename = os.path.join(tempdir, 'sim.pcapng')
-        cmd += ['-w', self._pcapng_filename, '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']
+        cmd += ['-w', '-', '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']
 
         self.logger.debug('Running command:  %s', ' '.join(cmd))
-        self._tshark_proc = subprocess.Popen(cmd)
+        self._tshark_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+        self._set_nonblocking(self._tshark_proc.stdout.fileno())  # polled by `TransferPcapng`
 
         # Construct pcap codec after initiating tshark to avoid blocking
         self._pcap = pcap_codec.PcapCodec(request.channel, fifo_name)
@@ -128,13 +131,51 @@
             except socket.timeout:
                 continue
 
-            with self._mutex:
+            with self._nodeids_mutex:
                 denied_nodeids = self._denied_nodeids
 
             # Equivalent to RF enclosure
             if nodeid not in denied_nodeids:
                 self._pcap.append(data)
 
+    def TransferPcapng(self, request, context):
+        """ Transfer the capture file.
+
+        Streams tshark's output to the client until tshark exits, which happens after `Stop`
+        terminates it. `self._file_sync_done` is set once streaming has finished.
+        """
+
+        # Validate the state. This method is a generator, so a returned response message would
+        # be silently discarded; report the failure through the RPC status instead.
+        if self._state == CaptureState.NONE:
+            context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+            context.set_details('TransferPcapng error: the sniffer is not running')
+            return
+
+        tshark_stdout = self._tshark_proc.stdout
+        try:
+            # Synchronize the capture file
+            while True:
+                content = tshark_stdout.read()
+                if content is None:
+                    # Currently no captured packets (the pipe is non-blocking)
+                    time.sleep(self.TIMEOUT)
+                elif content == b'':
+                    # Reach EOF when tshark terminates
+                    break
+                else:
+                    # Forward the captured packets
+                    yield sniffer_pb2.TransferPcapngResponse(content=content)
+        finally:
+            # Runs even if streaming stops early (e.g. on an exception), so `Stop` does not block forever
+            self._file_sync_done.set()
+
+    def _set_nonblocking(self, fd):
+        """ Make reads from `fd` return immediately instead of blocking when no data is available. """
+        # `fcntl.fcntl()` raises OSError on failure, so its return value needs no error check
+        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
     def FilterNodes(self, request, context):
         """ Only sniffer the specified nodes. """
 
@@ -150,7 +191,7 @@
             if not 1 <= nodeid <= self._max_nodes_num:
                 return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.VALUE_ERROR)
 
-        with self._mutex:
+        with self._nodeids_mutex:
             self._denied_nodeids = denied_nodeids
 
         return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OK)
@@ -161,28 +202,30 @@
         self.logger.debug('call Stop')
 
         # Validate and change the state
-        if self._state == CaptureState.NONE:
-            return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR, pcap_content=b'')
+        if not (self._state & CaptureState.THREAD):
+            return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR)
         self._state = CaptureState.NONE
 
         self._thread_alive.clear()
-        self._thread.join(timeout=1)
+        self._thread.join()
         self._transport.close()
         self._pcap.close()
 
         self._tshark_proc.terminate()
+        # Let `TransferPcapng` forward the rest of tshark's output (it stops at EOF) before
+        # `self._reset()` below sets `self._tshark_proc` to None
+        self._file_sync_done.wait()
         self._tshark_proc.wait()
 
-        with open(self._pcapng_filename, 'rb') as f:
-            pcap_content = f.read()
-
         self._reset()
 
-        return sniffer_pb2.StopResponse(status=sniffer_pb2.OK, pcap_content=pcap_content)
+        return sniffer_pb2.StopResponse(status=sniffer_pb2.OK)
 
 
 def serve(address_port, max_nodes_num):
-    server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
+    # One worker is used for `Start`, `FilterNodes` and `Stop`
+    # The other worker is used for `TransferPcapng`, which will be kept running by the client in a background thread
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
     sniffer_pb2_grpc.add_SnifferServicer_to_server(SnifferServicer(max_nodes_num), server)
     # add_secure_port requires a web domain
     server.add_insecure_port(address_port)