| ######################## |
| % Pipetool related tests |
| ######################## |
| |
| + Basic tests |
| |
| = Test default test case |
| |
| # Build one source, two drains and two sinks, then chain them with `>`: |
| # s -> d1 -> c, and d1 -> tf -> t (tf rewrites each message as "Got <msg>"). |
| s = PeriodicSource("hello", 1, name="src") |
| d1 = Drain(name="d1") |
| c = ConsoleSink(name="c") |
| tf = TransformDrain(lambda x: "Got %s" % x) |
| t = TermSink(name="PipeToolsPeriodicTest", keepterm=False) |
| s > d1 > c |
| d1 > tf > t |
| |
| # Run the engine for three seconds, replace the source's msg attribute with |
| # an empty list, then stop the engine. |
| p = PipeEngine(s) |
| p.start() |
| time.sleep(3) |
| s.msg = [] |
| p.stop() |
| |
| # Best-effort cleanup: remove test.png, ignoring OSError (e.g. no such file). |
| try: |
| os.remove("test.png") |
| except OSError: |
| pass |
| |
| = Test add_pipe |
| |
| # An engine created with one source reports two active pipes once a second |
| # Pipe is added through add(). |
| s = AutoSource() |
| p = PipeEngine(s) |
| p.add(Pipe()) |
| assert len(p.active_pipes) == 2 |
| |
| # spawn_Pipe() must register one more active pipe and return a Pipe instance. |
| x = p.spawn_Pipe() |
| assert len(p.active_pipes) == 3 |
| assert isinstance(x, Pipe) |
| |
| = Test exhausted source |
| |
| # Queue one message on the source and flag it as exhausted before the |
| # engine is started. |
| s = AutoSource() |
| s._gen_data("hello") |
| s.is_exhausted = True |
| d1 = Drain(name="d1") |
| c = ConsoleSink(name="c") |
| s > d1 > c |
| |
| # Start the engine and call wait_and_stop(); the test passes if this returns. |
| p = PipeEngine(s) |
| p.start() |
| p.wait_and_stop() |
| |
| = Test add_pipe on running instance |
| |
| # Start an engine that has no pipes yet. |
| p = PipeEngine() |
| p.start() |
| |
| s = CLIFeeder() |
| |
| d1 = Drain(name="d1") |
| c = QueueSink(name="c") |
| s > d1 > c |
| |
| # Attach the already-wired source while the engine is running. |
| p.add(s) |
| |
| s.send("hello") |
| s.send("hi") |
| |
| # Both messages must reach the sink's queue in order, each within 5 seconds. |
| assert c.q.get(timeout=5) == "hello" |
| assert c.q.get(timeout=5) == "hi" |
| |
| p.stop() |
| |
| = Test Operators |
| |
| # An engine compares equal to itself. |
| s = AutoSource() |
| p = PipeEngine(s) |
| assert p == p |
| |
| # `a >> b`: a gains one high sink, b gains one high source. |
| a = AutoSource() |
| b = AutoSource() |
| a >> b |
| assert len(a.high_sinks) == 1 |
| assert len(a.high_sources) == 0 |
| assert len(b.high_sinks) == 0 |
| assert len(b.high_sources) == 1 |
| # Bare expressions; presumably evaluated so the runner displays their repr |
| # -- TODO confirm. |
| a |
| b |
| |
| # `a << b`: the mirror case, b gains one high sink and a one high source. |
| a = AutoSource() |
| b = AutoSource() |
| a << b |
| assert len(a.high_sinks) == 0 |
| assert len(a.high_sources) == 1 |
| assert len(b.high_sinks) == 1 |
| assert len(b.high_sources) == 0 |
| # Bare expressions again, see above. |
| a |
| b |
| |
| # `a == b` on pipes: afterwards each side has one sink and one source. |
| a = AutoSource() |
| b = AutoSource() |
| a == b |
| assert len(a.sinks) == 1 |
| assert len(a.sources) == 1 |
| assert len(b.sinks) == 1 |
| assert len(b.sources) == 1 |
| |
| # `a // b`: afterwards each side has one high sink and one high source. |
| a = AutoSource() |
| b = AutoSource() |
| a//b |
| assert len(a.high_sinks) == 1 |
| assert len(a.high_sources) == 1 |
| assert len(b.high_sinks) == 1 |
| assert len(b.high_sources) == 1 |
| |
| # `a ^ b`: a gains one trigger sink, b gains one trigger source. |
| a = AutoSource() |
| b = AutoSource() |
| a^b |
| assert len(b.trigger_sources) == 1 |
| assert len(a.trigger_sinks) == 1 |
| |
| = Test doc |
| |
| # Smoke test: both listing helpers must run without raising on an engine |
| # holding a single source. Their output is not checked. |
| s = AutoSource() |
| p = PipeEngine(s) |
| p.list_pipes() |
| p.list_pipes_detailed() |
| |
| = Test RawConsoleSink with CLIFeeder |
| |
| p = PipeEngine() |
| |
| s = CLIFeeder() |
| s.send("hello") |
| s.is_exhausted = True |
| |
| r, w = os.pipe() |
| |
| d1 = Drain(name="d1") |
| c = RawConsoleSink(name="c") |
| c._write_pipe = w |
| s > d1 > c |
| |
| p.add(s) |
| p.start() |
| |
| assert os.read(r, 20) == b"hello\n" |
| p.wait_and_stop() |
| |
| = Test QueueSink with CLIFeeder |
| |
| p = PipeEngine() |
| |
| # Queue one message and flag the feeder as exhausted before starting. |
| s = CLIFeeder() |
| s.send("hello") |
| s.is_exhausted = True |
| |
| d1 = Drain(name="d1") |
| c = QueueSink(name="c") |
| s > d1 > c |
| |
| p.add(s) |
| p.start() |
| |
| # After the engine has stopped, the sink's recv() must return the message. |
| p.wait_and_stop() |
| assert c.recv() == "hello" |
| |
| = Test UpDrain |
| |
| # Global slot filled by TestSink.high_push with the message it receives. |
| test_val = None |
| |
| class TestSink(Sink): |
| def high_push(self, msg): |
| global test_val |
| test_val = msg |
| |
| p = PipeEngine() |
| |
| s = CLIFeeder() |
| s.send("hello") |
| s.is_exhausted = True |
| |
| # The feeder is linked to the drain with `>` and the drain to the sink |
| # with `>>`; the sink only records messages arriving through high_push. |
| d1 = UpDrain(name="d1") |
| c = TestSink(name="c") |
| s > d1 |
| d1 >> c |
| |
| p.add(s) |
| p.start() |
| |
| # The message sent on the feeder must have reached TestSink.high_push. |
| p.wait_and_stop() |
| assert test_val == "hello" |
| |
| = Test DownDrain |
| |
| # Global slot filled by TestSink.push with the message it receives. |
| test_val = None |
| |
| class TestSink(Sink): |
| def push(self, msg): |
| global test_val |
| test_val = msg |
| |
| p = PipeEngine() |
| |
| s = CLIHighFeeder() |
| s.send("hello") |
| s.is_exhausted = True |
| |
| # The feeder is linked to the drain with `>>` and the drain to the sink |
| # with `>`; the sink only records messages arriving through push. |
| d1 = DownDrain(name="d1") |
| c = TestSink(name="c") |
| s >> d1 |
| d1 > c |
| |
| p.add(s) |
| p.start() |
| |
| # The message sent on the feeder must have reached TestSink.push. |
| p.wait_and_stop() |
| assert test_val == "hello" |
| |
| + Advanced ScapyPipes pipetools tests |
| |
| = Test SniffSource |
| ~ netaccess |
| |
| p = PipeEngine() |
| |
| s = SniffSource() |
| d1 = Drain(name="d1") |
| c = QueueSink(name="c") |
| s > d1 > c |
| |
| p.add(s) |
| p.start() |
| sniff(count=3) |
| p.stop() |
| assert c.q.get() |
| |
| = Test exhausted AutoSource and SniffSource |
| |
| import mock |
| from scapy.error import Scapy_Exception |
| |
| def _fail(): |
| raise Scapy_Exception() |
| |
| a = AutoSource() |
| a._send = mock.MagicMock(side_effect=_fail) |
| a._wake_up() |
| try: |
| a.deliver() |
| except: |
| pass |
| |
| s = SniffSource() |
| s.s = mock.MagicMock() |
| s.s.recv = mock.MagicMock(side_effect=_fail) |
| try: |
| s.deliver() |
| except: |
| pass |
| |
| = Test RdpcapSource and WrpcapSink |
| ~ needs_root |
| |
| # Two packets used as the round-trip fixture. |
| req = Ether()/IP()/ICMP() |
| rpy = Ether()/IP('E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') |
| |
| wrpcap("t.pcap", [req, rpy]) |
| |
| p = PipeEngine() |
| |
| # Replay t.pcap through a drain into a sink that writes t2.pcap. |
| s = RdpcapSource("t.pcap") |
| d1 = Drain(name="d1") |
| c = WrpcapSink("t2.pcap", name="c") |
| s > d1 > c |
| p.add(s) |
| p.start() |
| p.wait_and_stop() |
| |
| # The packets read back from t2.pcap must match the originals byte for byte. |
| results = rdpcap("t2.pcap") |
| |
| assert raw(results[0]) == raw(req) |
| assert raw(results[1]) == raw(rpy) |
| |
| # Remove both capture files (only reached when the assertions pass). |
| os.unlink("t.pcap") |
| os.unlink("t2.pcap") |
| |
| = Test InjectSink and Inject3Sink |
| ~ needs_root |
| |
| import mock |
| |
| a = IP(dst="192.168.0.1")/ICMP() |
| msgs = [] |
| |
| class FakeSocket(object): |
| def __init__(self, *arg, **karg): |
| pass |
| def close(self): |
| pass |
| def send(self, msg): |
| global msgs |
| msgs.append(msg) |
| |
| @mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket) |
| @mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket) |
| def _inject_sink(i3): |
| s = CLIFeeder() |
| s.send(a) |
| s.is_exhausted = True |
| d1 = Drain(name="d1") |
| c = Inject3Sink() if i3 else InjectSink() |
| s > d1 > c |
| p = PipeEngine(s) |
| p.start() |
| p.wait_and_stop() |
| |
| _inject_sink(False) # InjectSink |
| _inject_sink(True) # Inject3Sink |
| |
| assert msgs == [a,a] |
| |
| = TriggerDrain and TriggeredValve with CLIFeeder |
| |
| # d1 evaluates each message against the "trigger" comparison; d2 is the valve. |
| s = CLIFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredValve() |
| c = QueueSink() |
| |
| # Data path chained with `>`; d1 is additionally linked to d2 with `^`. |
| s > d1 > d2 > c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("hello") |
| s.send("trigger") |
| s.send("hello2") |
| s.send("trigger") |
| s.send("hello3") |
| |
| # Expected at the sink, in order: "hello", "trigger", "hello3" |
| # ("hello2" and the second "trigger" are not expected here). |
| assert c.q.get(timeout=5) == "hello" |
| assert c.q.get(timeout=5) == "trigger" |
| assert c.q.get(timeout=5) == "hello3" |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredValve with CLIHighFeeder |
| |
| # Same scenario as the CLIFeeder variant, but fed by a CLIHighFeeder and |
| # with every data link made with `>>`. |
| s = CLIHighFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredValve() |
| c = QueueSink() |
| |
| s >> d1 |
| d1 >> d2 |
| d2 >> c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("hello") |
| s.send("trigger") |
| s.send("hello2") |
| s.send("trigger") |
| s.send("hello3") |
| |
| # Expected at the sink, in order: "hello", "trigger", "hello3". |
| assert c.q.get(timeout=5) == "hello" |
| assert c.q.get(timeout=5) == "trigger" |
| assert c.q.get(timeout=5) == "hello3" |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredQueueingValve with CLIFeeder |
| |
| # NOTE(review): the title names TriggeredQueueingValve, but d2 below is a |
| # TriggeredValve, so this body is identical to the test titled |
| # "TriggerDrain and TriggeredValve with CLIFeeder" -- confirm which class |
| # was intended before changing either the title or the expectations. |
| s = CLIFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredValve() |
| c = QueueSink() |
| |
| s > d1 > d2 > c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("hello") |
| s.send("trigger") |
| s.send("hello2") |
| s.send("trigger") |
| s.send("hello3") |
| |
| # Expected at the sink, in order: "hello", "trigger", "hello3". |
| assert c.q.get(timeout=5) == "hello" |
| assert c.q.get(timeout=5) == "trigger" |
| assert c.q.get(timeout=5) == "hello3" |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredSwitch with CLIFeeder on high channel |
| |
| s = CLIFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredSwitch() |
| c = QueueSink() |
| |
| # Feeder to switch is chained with `>`; the sink is attached to the switch |
| # with `>>` only, and d1 is linked to d2 with `^`. |
| s > d1 > d2 |
| d2 >> c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("hello") |
| s.send("trigger") |
| s.send("hello2") |
| s.send("trigger") |
| s.send("hello3") |
| |
| # Only two messages are checked at the sink: "trigger" then "hello2". |
| assert c.q.get(timeout=5) == "trigger" |
| assert c.q.get(timeout=5) == "hello2" |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel |
| |
| s = CLIHighFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredSwitch() |
| c = QueueSink() |
| |
| # Feeder to switch is linked with `>>`; the sink is attached to the switch |
| # with `>` only, and d1 is linked to d2 with `^`. |
| s >> d1 |
| d1 >> d2 |
| d2 > c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("hello") |
| s.send("trigger") |
| s.send("hello2") |
| s.send("trigger") |
| s.send("hello3") |
| |
| # Expected at the sink, in order: "hello", "trigger", "hello3". |
| assert c.q.get(timeout=5) == "hello" |
| assert c.q.get(timeout=5) == "trigger" |
| assert c.q.get(timeout=5) == "hello3" |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredMessage |
| |
| # d2 is constructed with the message "hello". |
| s = CLIFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredMessage("hello") |
| c = QueueSink() |
| |
| s > d1 > d2 > c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("trigger") |
| |
| # Two items must arrive at the sink; both "hello" and "trigger" must be |
| # among them, in either order. |
| r = [c.q.get(timeout=5), c.q.get(timeout=5)] |
| assert "hello" in r |
| assert "trigger" in r |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredQueueingValve on low channel |
| |
| p = PipeEngine() |
| |
| s = CLIFeeder() |
| r, w = os.pipe() |
| |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredQueueingValve() |
| c = QueueSink(name="c") |
| s > d1 > d2 > c |
| d1 ^ d2 |
| |
| p.add(s) |
| p.start() |
| |
| s.send("trigger") |
| s.send("hello") |
| s.send("trigger") |
| assert c.q.get(timeout=3) == "trigger" |
| assert c.q.get(timeout=3) in ['hello', 'trigger'] |
| assert c.q.get(timeout=3) in ['hello', 'trigger'] |
| assert d2.q.qsize() == 0 |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredQueueingValve on high channel |
| |
| p = PipeEngine() |
| |
| s = CLIHighFeeder() |
| r, w = os.pipe() |
| |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredQueueingValve() |
| c = QueueSink(name="c") |
| s >> d1 >> d2 >> c |
| d1 ^ d2 |
| |
| p.add(s) |
| p.start() |
| |
| s.send("trigger") |
| s.send("hello") |
| s.send("trigger") |
| assert c.q.get(timeout=3) == "trigger" |
| assert c.q.get(timeout=3) == "hello" |
| assert d2.q.qsize() == 0 |
| |
| p.stop() |
| |
| = UDPDrain |
| |
| p = PipeEngine() |
| |
| # Two feeders share one UDPDrain and one sink: s is linked with `>`, |
| # s2 with `>>`. |
| s = CLIFeeder() |
| s2 = CLIHighFeeder() |
| d1 = UDPDrain() |
| c = QueueSink() |
| |
| s > d1 > c |
| s2 >> d1 >> c |
| |
| p.add(s) |
| p.add(s2) |
| p.start() |
| |
| # s gets a full IP/UDP/DNS packet, s2 a bare DNS layer. |
| s.send(IP(src="127.0.0.1")/UDP()/DNS()) |
| s2.send(DNS()) |
| |
| # Two results arrive in either order: one is exactly the 12-byte string |
| # below, the other is a packet containing DNS whose UDP sport is 1234. |
| res = [c.q.get(timeout=2), c.q.get(timeout=2)] |
| assert b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00' in res |
| res.remove(b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00') |
| assert DNS in res[0] and res[0][UDP].sport == 1234 |
| |
| p.stop() |
| |
| = FDSourceSink on a Bunch object |
| |
| class Bunch: |
| __init__ = lambda self, **kw: setattr(self, '__dict__', kw) |
| |
| fd = Bunch(write=lambda x: None, read=lambda: "hello", fileno=lambda: None) |
| |
| s = FDSourceSink(fd) |
| d = Drain() |
| c = QueueSink() |
| s > d > c |
| |
| assert s.fileno() == None |
| s.push("data") |
| s.deliver() |
| assert c.q.get(timeout=1) == "hello" |
| |
| = TCPConnectPipe networking test |
| ~ networking needs_root |
| |
| p = PipeEngine() |
| |
| # The pipe is configured for www.google.fr on port 80, so this test depends |
| # on that host being reachable. |
| s = CLIFeeder() |
| d1 = TCPConnectPipe(addr="www.google.fr", port=80) |
| c = QueueSink() |
| |
| s > d1 > c |
| |
| p.add(s) |
| p.start() |
| |
| # Send a raw request line and wait up to 10 seconds for the first item |
| # queued at the sink. |
| s.send(b"GET http://www.google.fr/search?q=scapy&start=1&num=1\n") |
| result = c.q.get(timeout=10) |
| p.stop() |
| |
| # The queued bytes must begin with this exact status line. |
| assert result.startswith(b"HTTP/1.0 200 OK") |