blob: 2962f7a2eb9905f319a698139dc97e0211b338b4 [file] [log] [blame]
//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include <memory>
#include <utility>
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/thd.h"
// Per-thread flag whose value TimerManager::IsTimerManagerThread() reports for
// the calling thread. Every thread starts with its own copy set to false.
static thread_local bool g_timer_thread = false;
namespace grpc_event_engine {
namespace experimental {
// Trace flag constructed with the name "timer". Its enabled() state gates the
// diagnostic gpr_log output emitted by TimerInit(), Shutdown() and
// RestartPostFork() in this file.
// NOTE(review): the `false` argument is presumably the flag's default state --
// confirm against the TraceFlag constructor in trace.h.
grpc_core::DebugOnlyTraceFlag grpc_event_engine_timer_trace(false, "timer");
// Hands every closure in `timers` to thread_pool_, one Run() call per closure,
// in the order the closures appear in the vector.
void TimerManager::RunSomeTimers(
    std::vector<experimental::EventEngine::Closure*> timers) {
  for (auto it = timers.begin(); it != timers.end(); ++it) {
    experimental::EventEngine::Closure* closure = *it;
    thread_pool_->Run(closure);
  }
}
// Blocks the calling thread on cv_wait_ until the deadline `next` is reached
// or cv_wait_ is signalled (Kick() and Shutdown() both signal it).
//
// Returns false if shutdown_ is already set on entry, telling the caller to
// stop. Returns true in every other case, so the caller goes back and
// re-checks the timer list.
bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
  grpc_core::MutexLock lock(&mu_);
  if (shutdown_) return false;
  if (kicked_) {
    // A kick from the timer system arrived before we got here and was missed.
    // `next` can no longer be trusted (an earlier deadline may exist), so
    // consume the kick and return immediately; the caller will fetch a fresh
    // deadline from the timer system.
    kicked_ = false;
    return true;
  }
  const auto remaining = next - host_.Now();
  cv_wait_.WaitWithTimeout(&mu_, absl::Milliseconds(remaining.millis()));
  // Count every completed wait, whether it ended by timeout or by signal.
  ++wakeups_;
  // Waking up is how a kick delivered during the wait gets handled.
  kicked_ = false;
  return true;
}
void TimerManager::MainLoop() {
for (;;) {
grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
absl::optional<std::vector<experimental::EventEngine::Closure*>>
check_result = timer_list_->TimerCheck(&next);
GPR_ASSERT(check_result.has_value() &&
"ERROR: More than one MainLoop is running.");
if (!check_result->empty()) {
RunSomeTimers(std::move(*check_result));
continue;
}
if (!WaitUntil(next)) break;
}
main_loop_exit_signal_->Notify();
}
// Reports the calling thread's value of the file-scope thread_local flag
// g_timer_thread.
bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }
void TimerManager::StartMainLoopThread() {
main_thread_ = grpc_core::Thread(
"timer_manager",
[](void* arg) {
auto self = static_cast<TimerManager*>(arg);
self->MainLoop();
},
this, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(false));
main_thread_.Start();
}
// Builds a TimerManager that dispatches closures on `thread_pool`.
//
// All state the main loop touches (the exit notification and the timer list)
// is set up before the timer thread is launched by StartMainLoopThread().
TimerManager::TimerManager(
    std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)
    : host_(this), thread_pool_(std::move(thread_pool)) {
  // Notification that MainLoop() fires when it exits.
  main_loop_exit_signal_.emplace();
  // The timer list reaches back into this manager through host_.
  timer_list_ = std::make_unique<TimerList>(&host_);
  StartMainLoopThread();
}
// Returns the current GPR_CLOCK_MONOTONIC time as a grpc_core::Timestamp,
// converted with FromTimespecRoundDown.
grpc_core::Timestamp TimerManager::Host::Now() {
  const gpr_timespec monotonic_now = gpr_now(GPR_CLOCK_MONOTONIC);
  return grpc_core::Timestamp::FromTimespecRoundDown(monotonic_now);
}
// Forwards (timer, deadline, closure) to timer_list_->TimerInit().
//
// When the "timer" trace flag is enabled, first checks shutdown_ under mu_ and
// logs an error-level message if the manager has already been shut down. The
// request is forwarded to the timer list in either case.
void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline,
                             experimental::EventEngine::Closure* closure) {
  const bool tracing = grpc_event_engine_timer_trace.enabled();
  if (tracing) {
    grpc_core::MutexLock lock(&mu_);
    if (shutdown_) {
      gpr_log(GPR_ERROR,
              "WARNING: TimerManager::%p: scheduling Closure::%p after "
              "TimerManager has been shut down.",
              this, closure);
    }
  }
  timer_list_->TimerInit(timer, deadline, closure);
}
// Forwards the cancellation of `timer` to timer_list_ and returns whatever
// timer_list_->TimerCancel() reports.
// NOTE(review): presumably true means the timer was cancelled before it fired
// -- confirm against TimerList::TimerCancel.
bool TimerManager::TimerCancel(Timer* timer) {
return timer_list_->TimerCancel(timer);
}
// Stops the main loop and blocks until it has exited.
//
// A call made while shutdown_ is already set returns immediately. Otherwise
// shutdown_ is set under mu_, the main loop is woken, and the caller then
// waits on main_loop_exit_signal_, which MainLoop() notifies when it exits.
void TimerManager::Shutdown() {
{
grpc_core::MutexLock lock(&mu_);
if (shutdown_) return;
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this);
}
shutdown_ = true;
// Wake the main loop if it is blocked in WaitUntil() so that it gets to
// observe shutdown_.
cv_wait_.Signal();
}
// Wait, with mu_ released, for the main loop to exit.
main_loop_exit_signal_->WaitForNotification();
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p shutdown complete", this);
}
}
// Destruction runs Shutdown(), which returns immediately when shutdown_ is
// already set.
TimerManager::~TimerManager() { Shutdown(); }
// Forwards the kick to the TimerManager referenced by timer_manager_.
void TimerManager::Host::Kick() { timer_manager_->Kick(); }
// Sets kicked_ under mu_ and signals cv_wait_. A thread blocked in WaitUntil()
// is woken; if none is blocked yet, the set flag makes the next WaitUntil()
// call return without sleeping.
void TimerManager::Kick() {
grpc_core::MutexLock lock(&mu_);
kicked_ = true;
cv_wait_.Signal();
}
// Restarts the main loop after a previous Shutdown().
//
// Asserts that shutdown_ is set, clears it, emplaces a fresh
// main_loop_exit_signal_ for the next shutdown to wait on, and starts a new
// main-loop thread. mu_ is held for the whole body.
void TimerManager::RestartPostFork() {
grpc_core::MutexLock lock(&mu_);
// Restarting is only valid once the manager has been shut down.
GPR_ASSERT(GPR_LIKELY(shutdown_));
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p restarting after shutdown", this);
}
shutdown_ = false;
main_loop_exit_signal_.emplace();
StartMainLoopThread();
}
// Fork preparation hook: shuts the timer manager down via Shutdown().
void TimerManager::PrepareFork() { Shutdown(); }
// Post-fork hook for the parent process: restarts via RestartPostFork().
void TimerManager::PostforkParent() { RestartPostFork(); }
// Post-fork hook for the child process: restarts via RestartPostFork().
void TimerManager::PostforkChild() { RestartPostFork(); }
} // namespace experimental
} // namespace grpc_event_engine