blob: 46f63e68e7a68596a75618ce03851d6ef69a8f6f [file] [log] [blame]
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/resolver/polling_resolver.h"
#include <inttypes.h>
#include <functional>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include <grpc/support/log.h>
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
using ::grpc_event_engine::experimental::EventEngine;
// Builds a polling resolver from the supplied resolver arguments.
//
// The authority and the name to resolve come from the target URI (a leading
// "/" is stripped from the path); the channel args, work serializer and
// result handler are moved out of `args`. `min_time_between_resolutions`
// and `backoff_options` are stored for later use when scheduling
// resolutions. `tracer` may be null, in which case nothing is logged.
PollingResolver::PollingResolver(ResolverArgs args,
                                 Duration min_time_between_resolutions,
                                 BackOff::Options backoff_options,
                                 TraceFlag* tracer)
    : authority_(args.uri.authority()),
      name_to_resolve_(absl::StripPrefix(args.uri.path(), "/")),
      channel_args_(std::move(args.args)),
      work_serializer_(std::move(args.work_serializer)),
      result_handler_(std::move(args.result_handler)),
      tracer_(tracer),
      interested_parties_(args.pollset_set),
      min_time_between_resolutions_(min_time_between_resolutions),
      backoff_(backoff_options) {
  const bool tracing_enabled = tracer_ != nullptr && tracer_->enabled();
  if (GPR_UNLIKELY(tracing_enabled)) {
    gpr_log(GPR_INFO, "[polling resolver %p] created", this);
  }
}
// Emits a trace line (when tracing is enabled) as the resolver is destroyed.
PollingResolver::~PollingResolver() {
  const bool tracing_enabled = tracer_ != nullptr && tracer_->enabled();
  if (GPR_UNLIKELY(tracing_enabled)) {
    gpr_log(GPR_INFO, "[polling resolver %p] destroying", this);
  }
}
// Starts resolution by delegating to MaybeStartResolvingLocked().
void PollingResolver::StartLocked() {
  MaybeStartResolvingLocked();
}
// Handles a request to re-resolve the target.
//
// Does nothing while a request is already in flight. Otherwise either
// records that re-resolution was asked for (when a result-health callback
// is still outstanding) or attempts to start a new resolution.
void PollingResolver::RequestReresolutionLocked() {
  // A resolution request is already outstanding.
  if (request_ != nullptr) return;
  if (result_status_state_ !=
      ResultStatusState::kResultHealthCallbackPending) {
    MaybeStartResolvingLocked();
    return;
  }
  // We are still waiting for the result-health callback for the last
  // result we reported; defer the re-resolution until that callback
  // arrives.
  result_status_state_ =
      ResultStatusState::kReresolutionRequestedWhileCallbackWasPending;
}
// Resets the backoff state. If a next-resolution timer is currently
// pending, it is cancelled and a resolution is started right away.
void PollingResolver::ResetBackoffLocked() {
  backoff_.Reset();
  // No pending timer means there is nothing to short-circuit.
  if (!next_resolution_timer_handle_.has_value()) return;
  MaybeCancelNextResolutionTimer();
  StartResolvingLocked();
}
// Shuts the resolver down: sets the shutdown flag, cancels any pending
// next-resolution timer, and releases the in-flight request (if any).
void PollingResolver::ShutdownLocked() {
  const bool tracing_enabled = tracer_ != nullptr && tracer_->enabled();
  if (GPR_UNLIKELY(tracing_enabled)) {
    gpr_log(GPR_INFO, "[polling resolver %p] shutting down", this);
  }
  shutdown_ = true;
  MaybeCancelNextResolutionTimer();
  request_.reset();
}
void PollingResolver::ScheduleNextResolutionTimer(const Duration& timeout) {
RefCountedPtr<PollingResolver> self = Ref();
next_resolution_timer_handle_ =
channel_args_.GetObject<EventEngine>()->RunAfter(
timeout, [self = std::move(self)]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto* self_ptr = self.get();
self_ptr->work_serializer_->Run(
[self = std::move(self)]() { self->OnNextResolutionLocked(); },
DEBUG_LOCATION);
});
}
// Runs when the next-resolution timer has fired. Starts a new resolution
// unless the timer handle has already been cleared or the resolver has been
// shut down.
void PollingResolver::OnNextResolutionLocked() {
  const bool tracing_enabled = tracer_ != nullptr && tracer_->enabled();
  if (GPR_UNLIKELY(tracing_enabled)) {
    gpr_log(GPR_INFO,
            "[polling resolver %p] re-resolution timer fired: shutdown_=%d",
            this, shutdown_);
  }
  // Bail out if the timer was cancelled in the meantime or we are shut down.
  if (!next_resolution_timer_handle_.has_value() || shutdown_) return;
  next_resolution_timer_handle_.reset();
  StartResolvingLocked();
}
void PollingResolver::MaybeCancelNextResolutionTimer() {
if (next_resolution_timer_handle_.has_value()) {
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
gpr_log(GPR_INFO, "[polling resolver %p] cancel re-resolution timer",
this);
}
channel_args_.GetObject<EventEngine>()->Cancel(
*next_resolution_timer_handle_);
next_resolution_timer_handle_.reset();
}
}
// Hands a completed resolution result over to the work serializer, where
// OnRequestCompleteLocked() processes it.
//
// A ref tagged "OnRequestComplete" is taken and deliberately released here;
// it is dropped again by the matching Unref() at the end of
// OnRequestCompleteLocked().
//
// `result` is moved into the closure (init-capture) rather than captured by
// copy, so the by-value parameter is not deep-copied on its way through.
void PollingResolver::OnRequestComplete(Result result) {
  Ref(DEBUG_LOCATION, "OnRequestComplete").release();
  work_serializer_->Run(
      [this, result = std::move(result)]() mutable {
        OnRequestCompleteLocked(std::move(result));
      },
      DEBUG_LOCATION);
}
// Processes a completed resolution request.
//
// Clears the in-flight request and, unless the resolver has been shut down,
// attaches a result-health callback to `result`, marks that callback as
// pending, and passes the result to the result handler. In every case the
// ref tagged "OnRequestComplete" is dropped as the final step.
void PollingResolver::OnRequestCompleteLocked(Result result) {
  if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
    gpr_log(GPR_INFO, "[polling resolver %p] request complete", this);
  }
  request_.reset();
  if (shutdown_) {
    // Nothing is reported once shut down; just drop the ref.
    Unref(DEBUG_LOCATION, "OnRequestComplete");
    return;
  }
  if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
    // Render both halves of the result as strings before logging.
    const std::string addresses_text =
        result.addresses.ok()
            ? absl::StrCat("<", result.addresses->size(), " addresses>")
            : result.addresses.status().ToString();
    std::string service_config_text;
    if (!result.service_config.ok()) {
      service_config_text = result.service_config.status().ToString();
    } else if (*result.service_config == nullptr) {
      service_config_text = "<null>";
    } else {
      service_config_text =
          std::string((*result.service_config)->json_string());
    }
    gpr_log(GPR_INFO,
            "[polling resolver %p] returning result: "
            "addresses=%s, service_config=%s",
            this, addresses_text.c_str(), service_config_text.c_str());
  }
  GPR_ASSERT(result.result_health_callback == nullptr);
  // The callback keeps the resolver alive until it has been invoked or
  // destroyed.
  RefCountedPtr<PollingResolver> callback_ref =
      Ref(DEBUG_LOCATION, "result_health_callback");
  result.result_health_callback =
      [callback_ref = std::move(callback_ref)](absl::Status status) {
        callback_ref->GetResultStatus(std::move(status));
      };
  result_status_state_ = ResultStatusState::kResultHealthCallbackPending;
  result_handler_->ReportResult(std::move(result));
  Unref(DEBUG_LOCATION, "OnRequestComplete");
}
// Receives the status reported back for the last result (invoked through
// the result-health callback).
//
// On a non-OK status a retry timer is scheduled using the backoff state. On
// OK the backoff is reset and, if re-resolution was requested while the
// callback was pending, a new resolution is attempted. Either way
// `result_status_state_` ends up as kNone.
void PollingResolver::GetResultStatus(absl::Status status) {
  if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
    gpr_log(GPR_INFO, "[polling resolver %p] result status from channel: %s",
            this, status.ToString().c_str());
  }
  if (!status.ok()) {
    // Set up for retry.
    // InvalidateNow to avoid getting stuck re-initializing this timer
    // in a loop while draining the currently-held WorkSerializer.
    // Also see https://github.com/grpc/grpc/issues/26079.
    ExecCtx::Get()->InvalidateNow();
    const Timestamp retry_at = backoff_.NextAttemptTime();
    const Duration retry_delay = retry_at - Timestamp::Now();
    GPR_ASSERT(!next_resolution_timer_handle_.has_value());
    if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
      if (retry_delay > Duration::Zero()) {
        gpr_log(GPR_INFO, "[polling resolver %p] retrying in %" PRId64 " ms",
                this, retry_delay.millis());
      } else {
        gpr_log(GPR_INFO, "[polling resolver %p] retrying immediately", this);
      }
    }
    ScheduleNextResolutionTimer(retry_delay);
    // Reset result_status_state_. Even if re-resolution was requested while
    // the result-health callback was pending, it can be ignored here,
    // because we are in backoff to re-resolve anyway.
    result_status_state_ = ResultStatusState::kNone;
    return;
  }
  // Success: reset backoff state so that the next request starts from the
  // beginning.
  backoff_.Reset();
  // Clear the state, remembering what it was so that a re-resolution
  // requested while the callback was pending can be honoured now.
  const ResultStatusState previous_state =
      std::exchange(result_status_state_, ResultStatusState::kNone);
  if (previous_state ==
      ResultStatusState::kReresolutionRequestedWhileCallbackWasPending) {
    MaybeStartResolvingLocked();
  }
}
void PollingResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (next_resolution_timer_handle_.has_value()) return;
if (last_resolution_timestamp_.has_value()) {
// InvalidateNow to avoid getting stuck re-initializing this timer
// in a loop while draining the currently-held WorkSerializer.
// Also see https://github.com/grpc/grpc/issues/26079.
ExecCtx::Get()->InvalidateNow();
const Timestamp earliest_next_resolution =
*last_resolution_timestamp_ + min_time_between_resolutions_;
const Duration time_until_next_resolution =
earliest_next_resolution - Timestamp::Now();
if (time_until_next_resolution > Duration::Zero()) {
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
const Duration last_resolution_ago =
Timestamp::Now() - *last_resolution_timestamp_;
gpr_log(GPR_INFO,
"[polling resolver %p] in cooldown from last resolution "
"(from %" PRId64 " ms ago); will resolve again in %" PRId64
" ms",
this, last_resolution_ago.millis(),
time_until_next_resolution.millis());
}
ScheduleNextResolutionTimer(time_until_next_resolution);
return;
}
}
StartResolvingLocked();
}
// Issues a new resolution request via StartRequest(), stores it as the
// in-flight request, and records the current time as the last resolution
// timestamp.
void PollingResolver::StartResolvingLocked() {
  request_ = StartRequest();
  last_resolution_timestamp_ = Timestamp::Now();
  const bool tracing_enabled = tracer_ != nullptr && tracer_->enabled();
  if (GPR_UNLIKELY(tracing_enabled)) {
    gpr_log(GPR_INFO, "[polling resolver %p] starting resolution, request_=%p",
            this, request_.get());
  }
}
} // namespace grpc_core