perfetto: add benchmark for fixed rate producer

This benchmark rate-limits how fast the producer writes, so that the CPU
usage of both the producer and the service can be measured at a fixed rate.

Bug: 74380167
Change-Id: Ic5e1c6a51394efeff9a291e89d1c06f13ad23f6d
diff --git a/test/end_to_end_benchmark.cc b/test/end_to_end_benchmark.cc
index 57f9cae..8c3d2fb 100644
--- a/test/end_to_end_benchmark.cc
+++ b/test/end_to_end_benchmark.cc
@@ -29,6 +29,8 @@
 
 namespace perfetto {
 
+namespace {
+
 // If we're building on Android and starting the daemons ourselves,
 // create the sockets in a world-writable location.
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
@@ -40,7 +42,11 @@
 #define TEST_CONSUMER_SOCK_NAME PERFETTO_CONSUMER_SOCK_NAME
 #endif
 
-static void BM_EndToEnd(benchmark::State& state) {
+bool IsBenchmarkFunctionalOnly() {  // Set (any value) => smaller arg sweeps.
+  return getenv("BENCHMARK_FUNCTIONAL_TEST_ONLY") != nullptr;
+}
+
+void BenchmarkCommon(benchmark::State& state) {
   base::TestTaskRunner task_runner;
 
 #if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
@@ -56,10 +62,13 @@
   };
   std::unique_ptr<FakeProducerDelegate> producer_delegate(
       new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
-                               posted_on_producer_enabled));
+                               std::move(posted_on_producer_enabled)));
   FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
   producer_thread.Start(std::move(producer_delegate));
 
+  // Once connected, we can retrieve the inner producer.
+  FakeProducer* producer = producer_delegate_cached->producer();
+
   // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(512);
@@ -71,13 +80,19 @@
 
   // The parameters for the producer.
   static constexpr uint32_t kRandomSeed = 42;
-  uint32_t message_count = state.range(0);
-  uint32_t message_size = state.range(1);
+  size_t message_count = state.range(0);
+  size_t message_bytes = state.range(1);
+  size_t mb_per_s = state.range(2);
+
+  size_t messages_per_s = mb_per_s * 1024 * 1024 / message_bytes;
+  size_t time_for_messages_ms =
+      10000 + (messages_per_s == 0 ? 0 : message_count * 1000 / messages_per_s);
 
   // Setup the test to use a random number generator.
   ds_config->mutable_for_testing()->set_seed(kRandomSeed);
   ds_config->mutable_for_testing()->set_message_count(message_count);
-  ds_config->mutable_for_testing()->set_message_size(message_size);
+  ds_config->mutable_for_testing()->set_message_size(message_bytes);
+  ds_config->mutable_for_testing()->set_max_messages_per_second(messages_per_s);
 
   bool is_first_packet = true;
   auto on_readback_complete = task_runner.CreateCheckpoint("readback.complete");
@@ -87,10 +102,10 @@
                                         bool has_more) {
     for (auto& packet : packets) {
       ASSERT_TRUE(packet.Decode());
-      ASSERT_TRUE(packet->has_for_testing() || packet->has_clock_snapshot());
-      if (packet->has_clock_snapshot()) {
+      ASSERT_TRUE(packet->has_for_testing() || packet->has_clock_snapshot() ||
+                  packet->has_trace_config());
+      if (packet->has_clock_snapshot() || packet->has_trace_config())
         continue;
-      }
       ASSERT_EQ(protos::TracePacket::kTrustedUid,
                 packet->optional_trusted_uid_case());
       if (is_first_packet) {
@@ -118,7 +133,8 @@
   task_runner.RunUntilCheckpoint("producer.enabled");
 
   uint64_t wall_start_ns = base::GetWallTimeNs().count();
-  uint64_t thread_start_ns = service_thread.GetThreadCPUTimeNs();
+  uint64_t service_start_ns = service_thread.GetThreadCPUTimeNs();
+  uint64_t producer_start_ns = producer_thread.GetThreadCPUTimeNs();
   uint64_t iterations = 0;
   for (auto _ : state) {
     auto cname = "produced.and.committed." + std::to_string(iterations++);
@@ -127,29 +143,65 @@
                                              &on_produced_and_committed] {
       task_runner.PostTask(on_produced_and_committed);
     };
-    FakeProducer* producer = producer_delegate_cached->producer();
     producer->ProduceEventBatch(posted_on_produced_and_committed);
-    task_runner.RunUntilCheckpoint(cname);
+    task_runner.RunUntilCheckpoint(cname, time_for_messages_ms);
   }
-  uint64_t thread_ns = service_thread.GetThreadCPUTimeNs() - thread_start_ns;
+  uint64_t service_ns = service_thread.GetThreadCPUTimeNs() - service_start_ns;
+  uint64_t producer_ns =
+      producer_thread.GetThreadCPUTimeNs() - producer_start_ns;
   uint64_t wall_ns = base::GetWallTimeNs().count() - wall_start_ns;
 
-  state.counters["Ser CPU"] = benchmark::Counter(100.0 * thread_ns / wall_ns);
+  state.counters["Pro CPU"] = benchmark::Counter(100.0 * producer_ns / wall_ns);
+  state.counters["Ser CPU"] = benchmark::Counter(100.0 * service_ns / wall_ns);
   state.counters["Ser ns/m"] =
-      benchmark::Counter(1.0 * thread_ns / message_count);
+      benchmark::Counter(1.0 * service_ns / message_count);
 
   // Read back the buffer just to check correctness.
   consumer.ReadTraceData();
   task_runner.RunUntilCheckpoint("readback.complete");
-  state.SetBytesProcessed(int64_t(state.iterations()) * message_size *
-                          message_count);
+  state.SetBytesProcessed(iterations * message_bytes * message_count);
 
   consumer.Disconnect();
 }
 
-BENCHMARK(BM_EndToEnd)
+void SaturateCpuArgs(benchmark::internal::Benchmark* b) {
+  int min_message_count = 16;  // Counts/payloads step by powers of two.
+  int max_message_count = IsBenchmarkFunctionalOnly() ? 1024 : 1024 * 1024;
+  int min_payload = 8;  // Bytes per message.
+  int max_payload = IsBenchmarkFunctionalOnly() ? 256 : 2048;
+  for (int count = min_message_count; count <= max_message_count; count *= 2) {
+    for (int bytes = min_payload; bytes <= max_payload; bytes *= 2) {
+      b->Args({count, bytes, 0 /* speed */});  // 0 MB/s => no rate limit.
+    }
+  }
+}
+
+void ConstantRateArgs(benchmark::internal::Benchmark* b) {
+  int message_count = IsBenchmarkFunctionalOnly() ? 2 * 1024 : 128 * 1024;
+  int min_speed = IsBenchmarkFunctionalOnly() ? 64 : 8;  // MB/s
+  int max_speed = 128;  // MB/s; same cap in functional and full runs.
+  for (int speed = min_speed; speed <= max_speed; speed *= 2) {
+    b->Args({message_count, 128, speed});  // {count, bytes, MB/s}
+    b->Args({message_count, 256, speed});
+  }
+}
+}
+
+static void BM_EndToEnd_SaturateCpu(benchmark::State& state) {
+  BenchmarkCommon(state);  // speed arg is 0: producer is not throttled.
+}
+
+BENCHMARK(BM_EndToEnd_SaturateCpu)
     ->Unit(benchmark::kMicrosecond)
     ->UseRealTime()
-    ->RangeMultiplier(2)
-    ->Ranges({{16, 1024 * 1024}, {8, 2048}});
+    ->Apply(SaturateCpuArgs);
+
+static void BM_EndToEnd_ConstantRate(benchmark::State& state) {
+  BenchmarkCommon(state);  // Producer capped at range(2) MB/s.
 }
+
+BENCHMARK(BM_EndToEnd_ConstantRate)
+    ->Unit(benchmark::kMicrosecond)
+    ->UseRealTime()
+    ->Apply(ConstantRateArgs);
+}  // namespace perfetto
diff --git a/test/end_to_end_integrationtest.cc b/test/end_to_end_integrationtest.cc
index 13a56b2..dbf91c2 100644
--- a/test/end_to_end_integrationtest.cc
+++ b/test/end_to_end_integrationtest.cc
@@ -183,9 +183,10 @@
                               std::vector<TracePacket> packets, bool has_more) {
     for (auto& packet : packets) {
       ASSERT_TRUE(packet.Decode());
+      ASSERT_TRUE(packet->has_for_testing() || packet->has_clock_snapshot() ||
+                  packet->has_trace_config());
       if (packet->has_clock_snapshot() || packet->has_trace_config())
         continue;
-      ASSERT_TRUE(packet->has_for_testing());
       ASSERT_EQ(protos::TracePacket::kTrustedUid,
                 packet->optional_trusted_uid_case());
       ASSERT_EQ(packet->for_testing().seq_value(), rnd_engine());
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index 9d28744..a81f656 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -21,6 +21,7 @@
 
 #include "gtest/gtest.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/base/time.h"
 #include "perfetto/base/utils.h"
 #include "perfetto/trace/test_event.pbzero.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
@@ -65,6 +66,8 @@
   rnd_engine_ = std::minstd_rand0(source_config.for_testing().seed());
   message_count_ = source_config.for_testing().message_count();
   message_size_ = source_config.for_testing().message_size();
+  max_messages_per_second_ =
+      source_config.for_testing().max_messages_per_second();
   task_runner_->PostTask(on_create_data_source_instance_);
 }
 
@@ -82,10 +85,34 @@
         static_cast<char*>(malloc(message_size_)));
     memset(payload.get(), '.', message_size_);
     payload.get()[message_size_ - 1] = 0;
-    for (size_t i = 0; i < message_count_; i++) {
-      auto handle = trace_writer_->NewTracePacket();
-      handle->set_for_testing()->set_seq_value(rnd_engine_());
-      handle->set_for_testing()->set_str(payload.get(), message_size_);
+
+    base::TimeMillis start = base::GetWallTimeMs();
+    int64_t iterations = 0;
+    size_t messages_to_emit = message_count_;
+    while (messages_to_emit > 0) {
+      size_t messages_in_minibatch =
+          max_messages_per_second_ == 0
+              ? messages_to_emit
+              : std::min(max_messages_per_second_, messages_to_emit);
+      PERFETTO_DCHECK(messages_to_emit >= messages_in_minibatch);
+
+      for (size_t i = 0; i < messages_in_minibatch; i++) {
+        auto handle = trace_writer_->NewTracePacket();
+        handle->set_for_testing()->set_seq_value(rnd_engine_());
+        handle->set_for_testing()->set_str(payload.get(), message_size_);
+      }
+      messages_to_emit -= messages_in_minibatch;
+
+      // Pause until the second boundary to make sure that we are adhering to
+      // the speed limitation.
+      if (max_messages_per_second_ > 0) {
+        int64_t expected_time_taken = ++iterations * 1000;
+        base::TimeMillis time_taken = base::GetWallTimeMs() - start;
+        while (time_taken.count() < expected_time_taken) {
+          usleep((expected_time_taken - time_taken.count()) * 1000);
+          time_taken = base::GetWallTimeMs() - start;
+        }
+      }
     }
     trace_writer_->Flush(callback);
   });
diff --git a/test/fake_producer.h b/test/fake_producer.h
index 152dd04..2495518 100644
--- a/test/fake_producer.h
+++ b/test/fake_producer.h
@@ -61,6 +61,7 @@
   std::minstd_rand0 rnd_engine_;
   size_t message_size_ = 0;
   size_t message_count_ = 0;
+  size_t max_messages_per_second_ = 0;
   std::function<void()> on_create_data_source_instance_;
   std::unique_ptr<Service::ProducerEndpoint> endpoint_;
   std::unique_ptr<TraceWriter> trace_writer_;