blob: af9dd27886b3da8791b398665bf64edb37924d42 [file] [log] [blame]
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h"
#include <inttypes.h>
#include <stddef.h>
#include <algorithm>
#include <atomic>
#include <cmath>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
TraceFlag grpc_outlier_detection_lb_trace(false, "outlier_detection_lb");
const char* DisableOutlierDetectionAttribute::kName =
"disable_outlier_detection";
namespace {
using ::grpc_event_engine::experimental::EventEngine;
constexpr absl::string_view kOutlierDetection =
"outlier_detection_experimental";
// Config for xDS Cluster Impl LB policy.
class OutlierDetectionLbConfig : public LoadBalancingPolicy::Config {
public:
OutlierDetectionLbConfig(
OutlierDetectionConfig outlier_detection_config,
RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
: outlier_detection_config_(outlier_detection_config),
child_policy_(std::move(child_policy)) {}
absl::string_view name() const override { return kOutlierDetection; }
bool CountingEnabled() const {
return outlier_detection_config_.success_rate_ejection.has_value() ||
outlier_detection_config_.failure_percentage_ejection.has_value();
}
const OutlierDetectionConfig& outlier_detection_config() const {
return outlier_detection_config_;
}
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_;
}
private:
OutlierDetectionConfig outlier_detection_config_;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
};
// xDS Cluster Impl LB policy.
class OutlierDetectionLb : public LoadBalancingPolicy {
public:
explicit OutlierDetectionLb(Args args);
absl::string_view name() const override { return kOutlierDetection; }
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
class SubchannelState;
class SubchannelWrapper : public DelegatingSubchannel {
public:
SubchannelWrapper(RefCountedPtr<SubchannelState> subchannel_state,
RefCountedPtr<SubchannelInterface> subchannel)
: DelegatingSubchannel(std::move(subchannel)),
subchannel_state_(std::move(subchannel_state)) {
if (subchannel_state_ != nullptr) {
subchannel_state_->AddSubchannel(this);
if (subchannel_state_->ejection_time().has_value()) {
ejected_ = true;
}
}
}
~SubchannelWrapper() override {
if (subchannel_state_ != nullptr) {
subchannel_state_->RemoveSubchannel(this);
}
}
void Eject();
void Uneject();
void WatchConnectivityState(
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override;
void CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) override;
RefCountedPtr<SubchannelState> subchannel_state() const {
return subchannel_state_;
}
private:
class WatcherWrapper
: public SubchannelInterface::ConnectivityStateWatcherInterface {
public:
WatcherWrapper(std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>
watcher,
bool ejected)
: watcher_(std::move(watcher)), ejected_(ejected) {}
void Eject() {
ejected_ = true;
if (last_seen_state_.has_value()) {
watcher_->OnConnectivityStateChange(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError(
"subchannel ejected by outlier detection"));
}
}
void Uneject() {
ejected_ = false;
if (last_seen_state_.has_value()) {
watcher_->OnConnectivityStateChange(*last_seen_state_,
last_seen_status_);
}
}
void OnConnectivityStateChange(grpc_connectivity_state new_state,
absl::Status status) override {
const bool send_update = !last_seen_state_.has_value() || !ejected_;
last_seen_state_ = new_state;
last_seen_status_ = status;
if (send_update) {
if (ejected_) {
new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
status = absl::UnavailableError(
"subchannel ejected by outlier detection");
}
watcher_->OnConnectivityStateChange(new_state, status);
}
}
grpc_pollset_set* interested_parties() override {
return watcher_->interested_parties();
}
private:
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_;
absl::optional<grpc_connectivity_state> last_seen_state_;
absl::Status last_seen_status_;
bool ejected_;
};
RefCountedPtr<SubchannelState> subchannel_state_;
bool ejected_ = false;
std::map<SubchannelInterface::ConnectivityStateWatcherInterface*,
WatcherWrapper*>
watchers_;
};
class SubchannelState : public RefCounted<SubchannelState> {
public:
struct Bucket {
std::atomic<uint64_t> successes;
std::atomic<uint64_t> failures;
};
void RotateBucket() {
backup_bucket_->successes = 0;
backup_bucket_->failures = 0;
current_bucket_.swap(backup_bucket_);
active_bucket_.store(current_bucket_.get());
}
absl::optional<std::pair<double, uint64_t>> GetSuccessRateAndVolume() {
uint64_t total_request =
backup_bucket_->successes + backup_bucket_->failures;
if (total_request == 0) {
return absl::nullopt;
}
double success_rate =
backup_bucket_->successes * 100.0 /
(backup_bucket_->successes + backup_bucket_->failures);
return {
{success_rate, backup_bucket_->successes + backup_bucket_->failures}};
}
void AddSubchannel(SubchannelWrapper* wrapper) {
subchannels_.insert(wrapper);
}
void RemoveSubchannel(SubchannelWrapper* wrapper) {
subchannels_.erase(wrapper);
}
void AddSuccessCount() { active_bucket_.load()->successes.fetch_add(1); }
void AddFailureCount() { active_bucket_.load()->failures.fetch_add(1); }
absl::optional<Timestamp> ejection_time() const { return ejection_time_; }
void Eject(const Timestamp& time) {
ejection_time_ = time;
++multiplier_;
// Ejecting the subchannel may cause the child policy to unref the
// subchannel, so we need to be prepared for the set to be modified
// while we are iterating.
for (auto it = subchannels_.begin(); it != subchannels_.end();) {
SubchannelWrapper* subchannel = *it;
++it;
subchannel->Eject();
}
}
void Uneject() {
ejection_time_.reset();
for (auto& subchannel : subchannels_) {
subchannel->Uneject();
}
}
bool MaybeUneject(uint64_t base_ejection_time_in_millis,
uint64_t max_ejection_time_in_millis) {
if (!ejection_time_.has_value()) {
if (multiplier_ > 0) {
--multiplier_;
}
} else {
GPR_ASSERT(ejection_time_.has_value());
auto change_time = ejection_time_.value() +
Duration::Milliseconds(std::min(
base_ejection_time_in_millis * multiplier_,
std::max(base_ejection_time_in_millis,
max_ejection_time_in_millis)));
if (change_time < Timestamp::Now()) {
Uneject();
return true;
}
}
return false;
}
void DisableEjection() {
if (ejection_time_.has_value()) Uneject();
multiplier_ = 0;
}
private:
std::unique_ptr<Bucket> current_bucket_ = std::make_unique<Bucket>();
std::unique_ptr<Bucket> backup_bucket_ = std::make_unique<Bucket>();
// The bucket used to update call counts.
// Points to either current_bucket or active_bucket.
std::atomic<Bucket*> active_bucket_{current_bucket_.get()};
uint32_t multiplier_ = 0;
absl::optional<Timestamp> ejection_time_;
std::set<SubchannelWrapper*> subchannels_;
};
// A picker that wraps the picker from the child to perform outlier detection.
class Picker : public SubchannelPicker {
public:
Picker(OutlierDetectionLb* outlier_detection_lb,
RefCountedPtr<SubchannelPicker> picker, bool counting_enabled);
PickResult Pick(PickArgs args) override;
private:
class SubchannelCallTracker;
RefCountedPtr<SubchannelPicker> picker_;
bool counting_enabled_;
};
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<OutlierDetectionLb> outlier_detection_policy)
: outlier_detection_policy_(std::move(outlier_detection_policy)) {}
~Helper() override {
outlier_detection_policy_.reset(DEBUG_LOCATION, "Helper");
}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
private:
RefCountedPtr<OutlierDetectionLb> outlier_detection_policy_;
};
class EjectionTimer : public InternallyRefCounted<EjectionTimer> {
public:
EjectionTimer(RefCountedPtr<OutlierDetectionLb> parent,
Timestamp start_time);
void Orphan() override;
Timestamp StartTime() const { return start_time_; }
private:
void OnTimerLocked();
RefCountedPtr<OutlierDetectionLb> parent_;
absl::optional<EventEngine::TaskHandle> timer_handle_;
Timestamp start_time_;
absl::BitGen bit_gen_;
};
~OutlierDetectionLb() override;
// Returns the address map key for an address, or the empty string if
// the address should be ignored.
static std::string MakeKeyForAddress(const ServerAddress& address);
void ShutdownLocked() override;
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
void MaybeUpdatePickerLocked();
// Current config from the resolver.
RefCountedPtr<OutlierDetectionLbConfig> config_;
// Internal state.
bool shutting_down_ = false;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// Latest state and picker reported by the child policy.
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
absl::Status status_;
RefCountedPtr<SubchannelPicker> picker_;
std::map<std::string, RefCountedPtr<SubchannelState>> subchannel_state_map_;
OrphanablePtr<EjectionTimer> ejection_timer_;
};
//
// OutlierDetectionLb::SubchannelWrapper
//
void OutlierDetectionLb::SubchannelWrapper::Eject() {
ejected_ = true;
// Ejecting the subchannel may cause the child policy to cancel the watch,
// so we need to be prepared for the map to be modified while we are
// iterating.
for (auto it = watchers_.begin(); it != watchers_.end();) {
WatcherWrapper* watcher = it->second;
++it;
watcher->Eject();
}
}
void OutlierDetectionLb::SubchannelWrapper::Uneject() {
ejected_ = false;
for (auto& watcher : watchers_) {
watcher.second->Uneject();
}
}
void OutlierDetectionLb::SubchannelWrapper::WatchConnectivityState(
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) {
ConnectivityStateWatcherInterface* watcher_ptr = watcher.get();
auto watcher_wrapper =
std::make_unique<WatcherWrapper>(std::move(watcher), ejected_);
watchers_.emplace(watcher_ptr, watcher_wrapper.get());
wrapped_subchannel()->WatchConnectivityState(std::move(watcher_wrapper));
}
void OutlierDetectionLb::SubchannelWrapper::CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) {
auto it = watchers_.find(watcher);
if (it == watchers_.end()) return;
wrapped_subchannel()->CancelConnectivityStateWatch(it->second);
watchers_.erase(it);
}
//
// OutlierDetectionLb::Picker::SubchannelCallTracker
//
class OutlierDetectionLb::Picker::SubchannelCallTracker
: public LoadBalancingPolicy::SubchannelCallTrackerInterface {
public:
SubchannelCallTracker(
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker,
RefCountedPtr<SubchannelState> subchannel_state)
: original_subchannel_call_tracker_(
std::move(original_subchannel_call_tracker)),
subchannel_state_(std::move(subchannel_state)) {}
~SubchannelCallTracker() override {
subchannel_state_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
}
void Start() override {
// This tracker does not care about started calls only finished calls.
// Delegate if needed.
if (original_subchannel_call_tracker_ != nullptr) {
original_subchannel_call_tracker_->Start();
}
}
void Finish(FinishArgs args) override {
// Delegate if needed.
if (original_subchannel_call_tracker_ != nullptr) {
original_subchannel_call_tracker_->Finish(args);
}
// Record call completion based on status for outlier detection
// calculations.
if (subchannel_state_ != nullptr) {
if (args.status.ok()) {
subchannel_state_->AddSuccessCount();
} else {
subchannel_state_->AddFailureCount();
}
}
}
private:
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker_;
RefCountedPtr<SubchannelState> subchannel_state_;
};
//
// OutlierDetectionLb::Picker
//
OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb,
RefCountedPtr<SubchannelPicker> picker,
bool counting_enabled)
: picker_(std::move(picker)), counting_enabled_(counting_enabled) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] constructed new picker %p and counting "
"is %s",
outlier_detection_lb, this,
(counting_enabled ? "enabled" : "disabled"));
}
}
LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
LoadBalancingPolicy::PickArgs args) {
if (picker_ == nullptr) { // Should never happen.
return PickResult::Fail(absl::InternalError(
"outlier_detection picker not given any child picker"));
}
// Delegate to child picker
PickResult result = picker_->Pick(args);
auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
// Unwrap subchannel to pass back up the stack.
auto* subchannel_wrapper =
static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
// Inject subchannel call tracker to record call completion as long as
// not both success_rate_ejection and failure_percentage_ejection are unset.
if (counting_enabled_) {
complete_pick->subchannel_call_tracker =
std::make_unique<SubchannelCallTracker>(
std::move(complete_pick->subchannel_call_tracker),
subchannel_wrapper->subchannel_state());
}
complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
}
return result;
}
//
// OutlierDetectionLb
//
OutlierDetectionLb::OutlierDetectionLb(Args args)
: LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] created", this);
}
}
OutlierDetectionLb::~OutlierDetectionLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] destroying outlier_detection LB policy",
this);
}
}
std::string OutlierDetectionLb::MakeKeyForAddress(
const ServerAddress& address) {
// If the address has the DisableOutlierDetectionAttribute attribute,
// ignore it.
// TODO(roth): This is a hack to prevent outlier_detection from
// working with pick_first, as per discussion in
// https://github.com/grpc/grpc/issues/32967. Remove this as part of
// implementing dualstack backend support.
if (address.GetAttribute(DisableOutlierDetectionAttribute::kName) !=
nullptr) {
return "";
}
// Use only the address, not the attributes.
auto addr_str = grpc_sockaddr_to_string(&address.address(), false);
// If address couldn't be stringified, ignore it.
if (!addr_str.ok()) return "";
return std::move(*addr_str);
}
void OutlierDetectionLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] shutting down", this);
}
ejection_timer_.reset();
shutting_down_ = true;
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
child_policy_.reset();
}
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_.reset();
}
void OutlierDetectionLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
}
void OutlierDetectionLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] Received update", this);
}
auto old_config = std::move(config_);
// Update config.
config_ = std::move(args.config);
// Update outlier detection timer.
if (!config_->CountingEnabled()) {
// No need for timer. Cancel the current timer, if any.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] counting disabled, cancelling timer",
this);
}
ejection_timer_.reset();
} else if (ejection_timer_ == nullptr) {
// No timer running. Start it now.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] starting timer", this);
}
ejection_timer_ = MakeOrphanable<EjectionTimer>(Ref(), Timestamp::Now());
for (const auto& p : subchannel_state_map_) {
p.second->RotateBucket(); // Reset call counters.
}
} else if (old_config->outlier_detection_config().interval !=
config_->outlier_detection_config().interval) {
// Timer interval changed. Cancel the current timer and start a new one
// with the same start time.
// Note that if the new deadline is in the past, the timer will fire
// immediately.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] interval changed, replacing timer",
this);
}
ejection_timer_ =
MakeOrphanable<EjectionTimer>(Ref(), ejection_timer_->StartTime());
}
// Update subchannel state map.
if (args.addresses.ok()) {
std::set<std::string> current_addresses;
for (const ServerAddress& address : *args.addresses) {
std::string address_key = MakeKeyForAddress(address);
if (address_key.empty()) continue;
auto& subchannel_state = subchannel_state_map_[address_key];
if (subchannel_state == nullptr) {
subchannel_state = MakeRefCounted<SubchannelState>();
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] adding map entry for %s (%p)",
this, address_key.c_str(), subchannel_state.get());
}
} else if (!config_->CountingEnabled()) {
// If counting is not enabled, reset state.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] counting disabled; disabling "
"ejection for %s (%p)",
this, address_key.c_str(), subchannel_state.get());
}
subchannel_state->DisableEjection();
}
current_addresses.emplace(address_key);
}
for (auto it = subchannel_state_map_.begin();
it != subchannel_state_map_.end();) {
if (current_addresses.find(it->first) == current_addresses.end()) {
// remove each map entry for a subchannel address not in the updated
// address list.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] removing map entry for %s (%p)",
this, it->first.c_str(), it->second.get());
}
it = subchannel_state_map_.erase(it);
} else {
++it;
}
}
}
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args.args);
}
// Update child policy.
UpdateArgs update_args;
update_args.addresses = std::move(args.addresses);
update_args.resolution_note = std::move(args.resolution_note);
update_args.config = config_->child_policy();
// Update the policy.
update_args.args = std::move(args.args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] Updating child policy handler %p", this,
child_policy_.get());
}
return child_policy_->UpdateLocked(std::move(update_args));
}
void OutlierDetectionLb::MaybeUpdatePickerLocked() {
if (picker_ != nullptr) {
auto outlier_detection_picker =
MakeRefCounted<Picker>(this, picker_, config_->CountingEnabled());
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] updating connectivity: state=%s "
"status=(%s) picker=%p",
this, ConnectivityStateName(state_), status_.ToString().c_str(),
outlier_detection_picker.get());
}
channel_control_helper()->UpdateState(state_, status_,
std::move(outlier_detection_picker));
}
}
OrphanablePtr<LoadBalancingPolicy> OutlierDetectionLb::CreateChildPolicyLocked(
const ChannelArgs& args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_outlier_detection_lb_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] Created new child policy handler %p",
this, lb_policy.get());
}
// Add our interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// this policy, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
//
// OutlierDetectionLb::Helper
//
RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
ServerAddress address, const ChannelArgs& args) {
if (outlier_detection_policy_->shutting_down_) return nullptr;
RefCountedPtr<SubchannelState> subchannel_state;
std::string key = MakeKeyForAddress(address);
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] using key %s for subchannel address %s",
outlier_detection_policy_.get(), key.c_str(),
address.ToString().c_str());
}
if (!key.empty()) {
auto it = outlier_detection_policy_->subchannel_state_map_.find(key);
if (it != outlier_detection_policy_->subchannel_state_map_.end()) {
subchannel_state = it->second->Ref();
}
}
auto subchannel = MakeRefCounted<SubchannelWrapper>(
subchannel_state,
outlier_detection_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args));
if (subchannel_state != nullptr) {
subchannel_state->AddSubchannel(subchannel.get());
}
return subchannel;
}
void OutlierDetectionLb::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<SubchannelPicker> picker) {
if (outlier_detection_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] child connectivity state update: "
"state=%s (%s) picker=%p",
outlier_detection_policy_.get(), ConnectivityStateName(state),
status.ToString().c_str(), picker.get());
}
// Save the state and picker.
outlier_detection_policy_->state_ = state;
outlier_detection_policy_->status_ = status;
outlier_detection_policy_->picker_ = std::move(picker);
// Wrap the picker and return it to the channel.
outlier_detection_policy_->MaybeUpdatePickerLocked();
}
void OutlierDetectionLb::Helper::RequestReresolution() {
if (outlier_detection_policy_->shutting_down_) return;
outlier_detection_policy_->channel_control_helper()->RequestReresolution();
}
absl::string_view OutlierDetectionLb::Helper::GetAuthority() {
return outlier_detection_policy_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine*
OutlierDetectionLb::Helper::GetEventEngine() {
return outlier_detection_policy_->channel_control_helper()->GetEventEngine();
}
void OutlierDetectionLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (outlier_detection_policy_->shutting_down_) return;
outlier_detection_policy_->channel_control_helper()->AddTraceEvent(severity,
message);
}
//
// OutlierDetectionLb::EjectionTimer
//
OutlierDetectionLb::EjectionTimer::EjectionTimer(
RefCountedPtr<OutlierDetectionLb> parent, Timestamp start_time)
: parent_(std::move(parent)), start_time_(start_time) {
auto interval = parent_->config_->outlier_detection_config().interval;
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer will run in %s",
parent_.get(), interval.ToString().c_str());
}
timer_handle_ = parent_->channel_control_helper()->GetEventEngine()->RunAfter(
interval, [self = Ref(DEBUG_LOCATION, "EjectionTimer")]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto self_ptr = self.get();
self_ptr->parent_->work_serializer()->Run(
[self = std::move(self)]() { self->OnTimerLocked(); },
DEBUG_LOCATION);
});
}
void OutlierDetectionLb::EjectionTimer::Orphan() {
if (timer_handle_.has_value()) {
parent_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
timer_handle_.reset();
}
Unref();
}
void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
if (!timer_handle_.has_value()) return;
timer_handle_.reset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running",
parent_.get());
}
std::map<SubchannelState*, double> success_rate_ejection_candidates;
std::map<SubchannelState*, double> failure_percentage_ejection_candidates;
size_t ejected_host_count = 0;
double success_rate_sum = 0;
auto time_now = Timestamp::Now();
auto& config = parent_->config_->outlier_detection_config();
for (auto& state : parent_->subchannel_state_map_) {
auto* subchannel_state = state.second.get();
// For each address, swap the call counter's buckets in that address's
// map entry.
subchannel_state->RotateBucket();
// Gather data to run success rate algorithm or failure percentage
// algorithm.
if (subchannel_state->ejection_time().has_value()) {
++ejected_host_count;
}
absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
subchannel_state->GetSuccessRateAndVolume();
if (!host_success_rate_and_volume.has_value()) {
continue;
}
double success_rate = host_success_rate_and_volume->first;
uint64_t request_volume = host_success_rate_and_volume->second;
if (config.success_rate_ejection.has_value()) {
if (request_volume >= config.success_rate_ejection->request_volume) {
success_rate_ejection_candidates[subchannel_state] = success_rate;
success_rate_sum += success_rate;
}
}
if (config.failure_percentage_ejection.has_value()) {
if (request_volume >=
config.failure_percentage_ejection->request_volume) {
failure_percentage_ejection_candidates[subchannel_state] = success_rate;
}
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] found %" PRIuPTR
" success rate candidates and %" PRIuPTR
" failure percentage candidates; ejected_host_count=%" PRIuPTR
"; success_rate_sum=%.3f",
parent_.get(), success_rate_ejection_candidates.size(),
failure_percentage_ejection_candidates.size(), ejected_host_count,
success_rate_sum);
}
// success rate algorithm
if (!success_rate_ejection_candidates.empty() &&
success_rate_ejection_candidates.size() >=
config.success_rate_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running success rate algorithm: "
"stdev_factor=%d, enforcement_percentage=%d",
parent_.get(), config.success_rate_ejection->stdev_factor,
config.success_rate_ejection->enforcement_percentage);
}
// calculate ejection threshold: (mean - stdev *
// (success_rate_ejection.stdev_factor / 1000))
double mean = success_rate_sum / success_rate_ejection_candidates.size();
double variance = 0;
for (const auto& p : success_rate_ejection_candidates) {
variance += std::pow(p.second - mean, 2);
}
variance /= success_rate_ejection_candidates.size();
double stdev = std::sqrt(variance);
const double success_rate_stdev_factor =
static_cast<double>(config.success_rate_ejection->stdev_factor) / 1000;
double ejection_threshold = mean - stdev * success_rate_stdev_factor;
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f",
parent_.get(), stdev, ejection_threshold);
}
for (auto& candidate : success_rate_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] checking candidate %p: "
"success_rate=%.3f",
parent_.get(), candidate.first, candidate.second);
}
if (candidate.second < ejection_threshold) {
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
double current_percent =
100.0 * ejected_host_count / parent_->subchannel_state_map_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] random_key=%d "
"ejected_host_count=%" PRIuPTR " current_percent=%.3f",
parent_.get(), random_key, ejected_host_count,
current_percent);
}
if (random_key < config.success_rate_ejection->enforcement_percentage &&
(ejected_host_count == 0 ||
(current_percent < config.max_ejection_percent))) {
// Eject and record the timestamp for use when ejecting addresses in
// this iteration.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
parent_.get());
}
candidate.first->Eject(time_now);
++ejected_host_count;
}
}
}
}
// failure percentage algorithm
if (!failure_percentage_ejection_candidates.empty() &&
failure_percentage_ejection_candidates.size() >=
config.failure_percentage_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running failure percentage algorithm: "
"threshold=%d, enforcement_percentage=%d",
parent_.get(), config.failure_percentage_ejection->threshold,
config.failure_percentage_ejection->enforcement_percentage);
}
for (auto& candidate : failure_percentage_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] checking candidate %p: "
"success_rate=%.3f",
parent_.get(), candidate.first, candidate.second);
}
// Extra check to make sure success rate algorithm didn't already
// eject this backend.
if (candidate.first->ejection_time().has_value()) continue;
if ((100.0 - candidate.second) >
config.failure_percentage_ejection->threshold) {
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
double current_percent =
100.0 * ejected_host_count / parent_->subchannel_state_map_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] random_key=%d "
"ejected_host_count=%" PRIuPTR " current_percent=%.3f",
parent_.get(), random_key, ejected_host_count,
current_percent);
}
if (random_key <
config.failure_percentage_ejection->enforcement_percentage &&
(ejected_host_count == 0 ||
(current_percent < config.max_ejection_percent))) {
// Eject and record the timestamp for use when ejecting addresses in
// this iteration.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
parent_.get());
}
candidate.first->Eject(time_now);
++ejected_host_count;
}
}
}
}
// For each address in the map:
// If the address is not ejected and the multiplier is greater than 0,
// decrease the multiplier by 1. If the address is ejected, and the
// current time is after ejection_timestamp + min(base_ejection_time *
// multiplier, max(base_ejection_time, max_ejection_time)), un-eject the
// address.
for (auto& state : parent_->subchannel_state_map_) {
auto* subchannel_state = state.second.get();
const bool unejected = subchannel_state->MaybeUneject(
config.base_ejection_time.millis(), config.max_ejection_time.millis());
if (unejected && GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)",
parent_.get(), state.first.c_str(), subchannel_state);
}
}
parent_->ejection_timer_ =
MakeOrphanable<EjectionTimer>(parent_, Timestamp::Now());
}
//
// factory
//
class OutlierDetectionLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<OutlierDetectionLb>(std::move(args));
}
absl::string_view name() const override { return kOutlierDetection; }
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& json) const override {
ValidationErrors errors;
OutlierDetectionConfig outlier_detection_config;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
{
outlier_detection_config =
LoadFromJson<OutlierDetectionConfig>(json, JsonArgs(), &errors);
// Parse childPolicy manually.
{
ValidationErrors::ScopedField field(&errors, ".childPolicy");
auto it = json.object().find("childPolicy");
if (it == json.object().end()) {
errors.AddError("field not present");
} else {
auto child_policy_config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!child_policy_config.ok()) {
errors.AddError(child_policy_config.status().message());
} else {
child_policy = std::move(*child_policy_config);
}
}
}
}
if (!errors.ok()) {
return errors.status(
absl::StatusCode::kInvalidArgument,
"errors validating outlier_detection LB policy config");
}
return MakeRefCounted<OutlierDetectionLbConfig>(outlier_detection_config,
std::move(child_policy));
}
};
} // namespace
//
// OutlierDetectionConfig
//
const JsonLoaderInterface*
OutlierDetectionConfig::SuccessRateEjection::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<SuccessRateEjection>()
.OptionalField("stdevFactor", &SuccessRateEjection::stdev_factor)
.OptionalField("enforcementPercentage",
&SuccessRateEjection::enforcement_percentage)
.OptionalField("minimumHosts", &SuccessRateEjection::minimum_hosts)
.OptionalField("requestVolume", &SuccessRateEjection::request_volume)
.Finish();
return loader;
}
void OutlierDetectionConfig::SuccessRateEjection::JsonPostLoad(
const Json&, const JsonArgs&, ValidationErrors* errors) {
if (enforcement_percentage > 100) {
ValidationErrors::ScopedField field(errors, ".enforcement_percentage");
errors->AddError("value must be <= 100");
}
}
const JsonLoaderInterface*
OutlierDetectionConfig::FailurePercentageEjection::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<FailurePercentageEjection>()
.OptionalField("threshold", &FailurePercentageEjection::threshold)
.OptionalField("enforcementPercentage",
&FailurePercentageEjection::enforcement_percentage)
.OptionalField("minimumHosts",
&FailurePercentageEjection::minimum_hosts)
.OptionalField("requestVolume",
&FailurePercentageEjection::request_volume)
.Finish();
return loader;
}
void OutlierDetectionConfig::FailurePercentageEjection::JsonPostLoad(
const Json&, const JsonArgs&, ValidationErrors* errors) {
if (enforcement_percentage > 100) {
ValidationErrors::ScopedField field(errors, ".enforcement_percentage");
errors->AddError("value must be <= 100");
}
if (threshold > 100) {
ValidationErrors::ScopedField field(errors, ".threshold");
errors->AddError("value must be <= 100");
}
}
const JsonLoaderInterface* OutlierDetectionConfig::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<OutlierDetectionConfig>()
.OptionalField("interval", &OutlierDetectionConfig::interval)
.OptionalField("baseEjectionTime",
&OutlierDetectionConfig::base_ejection_time)
.OptionalField("maxEjectionTime",
&OutlierDetectionConfig::max_ejection_time)
.OptionalField("maxEjectionPercent",
&OutlierDetectionConfig::max_ejection_percent)
.OptionalField("successRateEjection",
&OutlierDetectionConfig::success_rate_ejection)
.OptionalField("failurePercentageEjection",
&OutlierDetectionConfig::failure_percentage_ejection)
.Finish();
return loader;
}
void OutlierDetectionConfig::JsonPostLoad(const Json& json, const JsonArgs&,
ValidationErrors* errors) {
if (json.object().find("maxEjectionTime") == json.object().end()) {
max_ejection_time = std::max(base_ejection_time, Duration::Seconds(300));
}
if (max_ejection_percent > 100) {
ValidationErrors::ScopedField field(errors, ".max_ejection_percent");
errors->AddError("value must be <= 100");
}
}
//
// Plugin registration
//
void RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
std::make_unique<OutlierDetectionLbFactory>());
}
} // namespace grpc_core