perfetto: add benchmark for fixed rate producer

This benchmark tries to rate-limit how fast the producer is writing to
measure the CPU usage of the producer and service.

Bug: 74380167
Change-Id: Ic5e1c6a51394efeff9a291e89d1c06f13ad23f6d
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index 9d28744..a81f656 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -21,6 +21,7 @@
 
 #include "gtest/gtest.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/base/time.h"
 #include "perfetto/base/utils.h"
 #include "perfetto/trace/test_event.pbzero.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
@@ -65,6 +66,8 @@
   rnd_engine_ = std::minstd_rand0(source_config.for_testing().seed());
   message_count_ = source_config.for_testing().message_count();
   message_size_ = source_config.for_testing().message_size();
+  max_messages_per_second_ =
+      source_config.for_testing().max_messages_per_second();
   task_runner_->PostTask(on_create_data_source_instance_);
 }
 
@@ -82,10 +85,34 @@
         static_cast<char*>(malloc(message_size_)));
     memset(payload.get(), '.', message_size_);
     payload.get()[message_size_ - 1] = 0;
-    for (size_t i = 0; i < message_count_; i++) {
-      auto handle = trace_writer_->NewTracePacket();
-      handle->set_for_testing()->set_seq_value(rnd_engine_());
-      handle->set_for_testing()->set_str(payload.get(), message_size_);
+
+    base::TimeMillis start = base::GetWallTimeMs();
+    int64_t iterations = 0;
+    size_t messages_to_emit = message_count_;
+    while (messages_to_emit > 0) {
+      size_t messages_in_minibatch =
+          max_messages_per_second_ == 0
+              ? messages_to_emit
+              : std::min(max_messages_per_second_, messages_to_emit);
+      PERFETTO_DCHECK(messages_to_emit >= messages_in_minibatch);
+
+      for (size_t i = 0; i < messages_in_minibatch; i++) {
+        auto handle = trace_writer_->NewTracePacket();
+        handle->set_for_testing()->set_seq_value(rnd_engine_());
+        handle->set_for_testing()->set_str(payload.get(), message_size_);
+      }
+      messages_to_emit -= messages_in_minibatch;
+
+      // Pause until the second boundary to make sure that we are adhering to
+      // the speed limitation.
+      if (max_messages_per_second_ > 0) {
+        int64_t expected_time_taken = ++iterations * 1000;
+        base::TimeMillis time_taken = base::GetWallTimeMs() - start;
+        while (time_taken.count() < expected_time_taken) {
+          usleep((expected_time_taken - time_taken.count()) * 1000);
+          time_taken = base::GetWallTimeMs() - start;
+        }
+      }
     }
     trace_writer_->Flush(callback);
   });