// blob: d104dbe520eef2ad429c0901de171ec493b150b1 [file] [log] [blame]
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
#include <inttypes.h>
#include <stdlib.h>
#include <algorithm>
#include <cmath>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/attributes.h"
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#define XXH_INLINE_ALL
#include "xxhash.h"
#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
// Trace flag ("ring_hash_lb", off by default) guarding all of the
// "[RH %p] ..." logging in this file.
TraceFlag grpc_lb_ring_hash_trace{false, "ring_hash_lb"};
// Returns the type name under which the request hash is stored as a call
// attribute. The factory is a function-local static, so it is constructed on
// the first call and every call creates the name from that same factory.
UniqueTypeName RequestHashAttribute::TypeName() {
  static UniqueTypeName::Factory request_hash_factory("request_hash");
  return request_hash_factory.Create();
}
// Returns the JSON loader for RingHashConfig. It maps the optional JSON
// fields "minRingSize" and "maxRingSize" onto the struct members of the same
// meaning.
const JsonLoaderInterface* RingHashConfig::JsonLoader(
    const JsonArgs& /*args*/) {
  // Built on the first call and reused by every later call.
  static const auto* const kLoader =
      JsonObjectLoader<RingHashConfig>()
          .OptionalField("minRingSize", &RingHashConfig::min_ring_size)
          .OptionalField("maxRingSize", &RingHashConfig::max_ring_size)
          .Finish();
  return kLoader;
}
void RingHashConfig::JsonPostLoad(const Json&, const JsonArgs&,
ValidationErrors* errors) {
{
ValidationErrors::ScopedField field(errors, ".minRingSize");
if (!errors->FieldHasErrors() &&
(min_ring_size == 0 || min_ring_size > 8388608)) {
errors->AddError("must be in the range [1, 8388608]");
}
}
{
ValidationErrors::ScopedField field(errors, ".maxRingSize");
if (!errors->FieldHasErrors() &&
(max_ring_size == 0 || max_ring_size > 8388608)) {
errors->AddError("must be in the range [1, 8388608]");
}
}
if (min_ring_size > max_ring_size) {
errors->AddError("max_ring_size cannot be smaller than min_ring_size");
}
}
namespace {
// Policy name returned by RingHashLbConfig::name(), RingHash::name() and
// RingHashFactory::name().
constexpr absl::string_view kRingHash{"ring_hash_experimental"};
class RingHashLbConfig : public LoadBalancingPolicy::Config {
public:
RingHashLbConfig(size_t min_ring_size, size_t max_ring_size)
: min_ring_size_(min_ring_size), max_ring_size_(max_ring_size) {}
absl::string_view name() const override { return kRingHash; }
size_t min_ring_size() const { return min_ring_size_; }
size_t max_ring_size() const { return max_ring_size_; }
private:
size_t min_ring_size_;
size_t max_ring_size_;
};
//
// ring_hash LB policy
//
// Ring size cap used when the GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP channel arg
// is not set.
constexpr size_t kRingSizeCapDefault{4096};
// The ring_hash LB policy. Each subchannel address is placed on a hash ring
// (RingHashSubchannelList::Ring). A pick looks up the request hash, taken
// from the RequestHashAttribute call attribute, on that ring
// (Picker::Pick()).
class RingHash : public LoadBalancingPolicy {
public:
explicit RingHash(Args args);
absl::string_view name() const override { return kRingHash; }
absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
private:
// Forward declaration.
class RingHashSubchannelList;
// Data for a particular subchannel in a subchannel list.
// This subclass adds the following functionality:
// - Tracks the previous connectivity state of the subchannel, so that
// we know how many subchannels are in each state.
class RingHashSubchannelData
: public SubchannelData<RingHashSubchannelList, RingHashSubchannelData> {
public:
RingHashSubchannelData(
SubchannelList<RingHashSubchannelList, RingHashSubchannelData>*
subchannel_list,
const ServerAddress& address,
RefCountedPtr<SubchannelInterface> subchannel)
: SubchannelData(subchannel_list, address, std::move(subchannel)),
address_(address) {}
// The address this subchannel was created for (this class keeps its own
// copy in address_).
const ServerAddress& address() const { return address_; }
// Logical state/status used for aggregation and by the picker; see the
// comment on logical_connectivity_state_ below.
grpc_connectivity_state logical_connectivity_state() const {
return logical_connectivity_state_;
}
const absl::Status& logical_connectivity_status() const {
return logical_connectivity_status_;
}
private:
// Performs connectivity state updates that need to be done only
// after we have started watching.
void ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) override;
ServerAddress address_;
// Last logical connectivity state seen.
// Note that this may differ from the state actually reported by the
// subchannel in some cases; for example, once this is set to
// TRANSIENT_FAILURE, we do not change it again until we get READY,
// so we skip any interim stops in CONNECTING.
grpc_connectivity_state logical_connectivity_state_ = GRPC_CHANNEL_IDLE;
absl::Status logical_connectivity_status_;
};
// A list of subchannels and the ring containing those subchannels.
class RingHashSubchannelList
: public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
public:
// The hash ring, built once in the constructor and exposed only through a
// const accessor. It is ref-counted so that Pickers can keep it alive
// independently of the subchannel list.
class Ring : public RefCounted<Ring> {
public:
// One point on the ring: its hash, and the index (into the owning
// subchannel list) of the subchannel it maps to.
struct RingEntry {
uint64_t hash;
size_t subchannel_index;
};
Ring(RingHashLbConfig* config, RingHashSubchannelList* subchannel_list,
const ChannelArgs& args);
const std::vector<RingEntry>& ring() const { return ring_; }
private:
std::vector<RingEntry> ring_;
};
RingHashSubchannelList(RingHash* policy, ServerAddressList addresses,
const ChannelArgs& args);
// Releases the "subchannel_list" ref on the policy that the constructor
// took.
~RingHashSubchannelList() override {
RingHash* p = static_cast<RingHash*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
RefCountedPtr<Ring> ring() { return ring_; }
// Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state.
void UpdateStateCountersLocked(grpc_connectivity_state old_state,
grpc_connectivity_state new_state);
// Updates the RH policy's connectivity state based on the
// subchannel list's state counters, creating new picker and new ring.
// The index parameter indicates the index into the list of the subchannel
// whose status report triggered the call to
// UpdateRingHashConnectivityStateLocked().
// connection_attempt_complete is true if the subchannel just
// finished a connection attempt.
void UpdateRingHashConnectivityStateLocked(size_t index,
bool connection_attempt_complete,
absl::Status status);
private:
std::shared_ptr<WorkSerializer> work_serializer() const override {
return static_cast<RingHash*>(policy())->work_serializer();
}
// Number of subchannels whose logical state is IDLE / READY / CONNECTING /
// TRANSIENT_FAILURE. num_idle_ is initialized to num_subchannels() by the
// constructor. The others start at 0.
size_t num_idle_;
size_t num_ready_ = 0;
size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0;
RefCountedPtr<Ring> ring_;
// The index of the subchannel currently doing an internally
// triggered connection attempt, if any.
absl::optional<size_t> internally_triggered_connection_index_;
// TODO(roth): If we ever change the helper UpdateState() API to not
// need the status reported for TRANSIENT_FAILURE state (because
// it's not currently actually used for anything outside of the picker),
// then we will no longer need this data member.
absl::Status last_failure_;
};
// Picker that holds a ref to the ring, plus a snapshot of every subchannel's
// logical state and status taken when the picker is constructed.
class Picker : public SubchannelPicker {
public:
Picker(RefCountedPtr<RingHash> ring_hash_lb,
RingHashSubchannelList* subchannel_list)
: ring_hash_lb_(std::move(ring_hash_lb)),
ring_(subchannel_list->ring()) {
subchannels_.reserve(subchannel_list->num_subchannels());
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
RingHashSubchannelData* subchannel_data =
subchannel_list->subchannel(i);
subchannels_.emplace_back(
SubchannelInfo{subchannel_data->subchannel()->Ref(),
subchannel_data->logical_connectivity_state(),
subchannel_data->logical_connectivity_status()});
}
}
PickResult Pick(PickArgs args) override;
private:
// A fire-and-forget class that schedules subchannel connection attempts
// on the control plane WorkSerializer.
// Once the callback has run on the WorkSerializer, it deletes itself.
// Connection requests are skipped if the policy has been shut down by then.
class SubchannelConnectionAttempter : public Orphanable {
public:
explicit SubchannelConnectionAttempter(
RefCountedPtr<RingHash> ring_hash_lb)
: ring_hash_lb_(std::move(ring_hash_lb)) {
GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
}
void Orphan() override {
// Hop into ExecCtx, so that we're not holding the data plane mutex
// while we run control-plane code.
ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
}
void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
subchannels_.push_back(std::move(subchannel));
}
private:
static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
self->ring_hash_lb_->work_serializer()->Run(
[self]() {
if (!self->ring_hash_lb_->shutdown_) {
for (auto& subchannel : self->subchannels_) {
subchannel->RequestConnection();
}
}
delete self;
},
DEBUG_LOCATION);
}
RefCountedPtr<RingHash> ring_hash_lb_;
grpc_closure closure_;
std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
};
// Per-subchannel snapshot taken at picker construction.
struct SubchannelInfo {
RefCountedPtr<SubchannelInterface> subchannel;
grpc_connectivity_state state;
absl::Status status;
};
RefCountedPtr<RingHash> ring_hash_lb_;
RefCountedPtr<RingHashSubchannelList::Ring> ring_;
// Indexed by RingEntry::subchannel_index.
std::vector<SubchannelInfo> subchannels_;
};
~RingHash() override;
void ShutdownLocked() override;
// Current config from resolver.
RefCountedPtr<RingHashLbConfig> config_;
// list of subchannels.
RefCountedPtr<RingHashSubchannelList> subchannel_list_;
// Newest list from UpdateLocked() that has not yet been promoted to
// subchannel_list_.
RefCountedPtr<RingHashSubchannelList> latest_pending_subchannel_list_;
// indicating if we are shutting down.
// Set by ShutdownLocked(). SubchannelConnectionAttempter reads it to skip
// connection requests.
bool shutdown_ = false;
};
//
// RingHash::Picker
//
// Picks a subchannel by locating the request hash on the ring.
//
// The hash is read from the RequestHashAttribute call attribute and must
// parse as an unsigned 64-bit decimal number; otherwise the pick fails. A
// ketama-style binary search picks the ring entry for the hash, wrapping
// around to entry 0. Its subchannel is then handled as follows:
//   - READY: the pick completes on it.
//   - IDLE: a connection attempt is scheduled and the pick is queued.
//   - CONNECTING: the pick is queued.
//   - otherwise (TRANSIENT_FAILURE): the ring is walked forward looking for a
//     READY subchannel, scheduling connection attempts along the way. If none
//     is READY, the pick fails with UNAVAILABLE.
//
// Scheduled connection attempts are collected in a
// SubchannelConnectionAttempter. They are only issued (via ExecCtx and then
// the WorkSerializer) after the attempter is orphaned, when this function
// returns.
RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
  auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
  auto* hash_attribute = static_cast<RequestHashAttribute*>(
      call_state->GetCallAttribute(RequestHashAttribute::TypeName()));
  absl::string_view hash;
  if (hash_attribute != nullptr) {
    hash = hash_attribute->request_hash();
  }
  uint64_t h;
  if (!absl::SimpleAtoi(hash, &h)) {
    return PickResult::Fail(
        absl::InternalError("ring hash value is not a number"));
  }
  const auto& ring = ring_->ring();
  // Defensive check: everything below indexes ring[first_index], and
  // first_index falls back to 0. That would be an out-of-bounds read on an
  // empty ring.
  if (ring.empty()) {
    return PickResult::Fail(
        absl::UnavailableError("ring hash cannot pick from an empty ring"));
  }
  // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
  // (ketama_get_server) NOTE: The algorithm depends on using signed integers
  // for lowp, highp, and first_index. Do not change them!
  int64_t lowp = 0;
  int64_t highp = ring.size();
  int64_t first_index = 0;
  while (true) {
    first_index = (lowp + highp) / 2;
    if (first_index == static_cast<int64_t>(ring.size())) {
      first_index = 0;
      break;
    }
    uint64_t midval = ring[first_index].hash;
    uint64_t midval1 = first_index == 0 ? 0 : ring[first_index - 1].hash;
    if (h <= midval && h > midval1) {
      break;
    }
    if (midval < h) {
      lowp = first_index + 1;
    } else {
      highp = first_index - 1;
    }
    if (lowp > highp) {
      first_index = 0;
      break;
    }
  }
  // Created lazily on the first scheduled attempt. Its requests are issued
  // when it is orphaned, i.e. when this function returns.
  OrphanablePtr<SubchannelConnectionAttempter> subchannel_connection_attempter;
  auto ScheduleSubchannelConnectionAttempt =
      [&](RefCountedPtr<SubchannelInterface> subchannel) {
        if (subchannel_connection_attempter == nullptr) {
          subchannel_connection_attempter =
              MakeOrphanable<SubchannelConnectionAttempter>(ring_hash_lb_->Ref(
                  DEBUG_LOCATION, "SubchannelConnectionAttempter"));
        }
        subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
      };
  SubchannelInfo& first_subchannel =
      subchannels_[ring[first_index].subchannel_index];
  switch (first_subchannel.state) {
    case GRPC_CHANNEL_READY:
      return PickResult::Complete(first_subchannel.subchannel);
    case GRPC_CHANNEL_IDLE:
      ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel);
      ABSL_FALLTHROUGH_INTENDED;
    case GRPC_CHANNEL_CONNECTING:
      return PickResult::Queue();
    default:  // GRPC_CHANNEL_TRANSIENT_FAILURE
      break;
  }
  ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel);
  // Loop through remaining subchannels to find one in READY.
  // On the way, we make sure the right set of connection attempts
  // will happen.
  bool found_second_subchannel = false;
  bool found_first_non_failed = false;
  for (size_t i = 1; i < ring.size(); ++i) {
    const auto& entry = ring[(first_index + i) % ring.size()];
    // Skip other ring entries that map to the first subchannel.
    if (entry.subchannel_index == ring[first_index].subchannel_index) {
      continue;
    }
    SubchannelInfo& subchannel_info = subchannels_[entry.subchannel_index];
    if (subchannel_info.state == GRPC_CHANNEL_READY) {
      return PickResult::Complete(subchannel_info.subchannel);
    }
    if (!found_second_subchannel) {
      switch (subchannel_info.state) {
        case GRPC_CHANNEL_IDLE:
          ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
          ABSL_FALLTHROUGH_INTENDED;
        case GRPC_CHANNEL_CONNECTING:
          return PickResult::Queue();
        default:
          break;
      }
      found_second_subchannel = true;
    }
    if (!found_first_non_failed) {
      if (subchannel_info.state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
        ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
      } else {
        if (subchannel_info.state == GRPC_CHANNEL_IDLE) {
          ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
        }
        found_first_non_failed = true;
      }
    }
  }
  return PickResult::Fail(absl::UnavailableError(absl::StrCat(
      "ring hash cannot find a connected subchannel; first failure: ",
      first_subchannel.status.ToString())));
}
//
// RingHash::RingHashSubchannelList::Ring
//
// Builds the hash ring for the subchannels in subchannel_list.
//
// Weights:
// - Each address gets a weight from ServerAddressWeightAttribute. It
//   defaults to 1, and a zero weight is also treated as 1.
// - Weights are normalized to sum to 1.
// - Each address then receives approximately scale * normalized_weight ring
//   entries.
// - Entry n of an address has hash XXH64("<address>_<n>", seed 0).
// The finished ring is sorted by hash.
//
// Scale: the least-weighted address gets a whole number of entries at
// min_ring_size, and the total is capped at max_ring_size. Both config
// values are first clamped to the GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP
// channel arg (default kRingSizeCapDefault).
RingHash::RingHashSubchannelList::Ring::Ring(
    RingHashLbConfig* config, RingHashSubchannelList* subchannel_list,
    const ChannelArgs& args) {
  // Store the weights while finding the sum.
  struct AddressWeight {
    std::string address;
    // Default weight is 1 for the cases where a weight is not provided,
    // each occurrence of the address will be counted a weight value of 1.
    uint32_t weight = 1;
    double normalized_weight;
  };
  std::vector<AddressWeight> address_weights;
  size_t sum = 0;
  address_weights.reserve(subchannel_list->num_subchannels());
  for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
    RingHashSubchannelData* sd = subchannel_list->subchannel(i);
    const ServerAddressWeightAttribute* weight_attribute = static_cast<
        const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
        ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
    AddressWeight address_weight;
    // NOTE(review): value() assumes every subchannel address can be
    // converted to a string; a conversion failure is not handled here.
    address_weight.address =
        grpc_sockaddr_to_string(&sd->address().address(), false).value();
    // Weight should never be zero, but ignore it just in case, since
    // that value would screw up the ring-building algorithm.
    if (weight_attribute != nullptr && weight_attribute->weight() > 0) {
      address_weight.weight = weight_attribute->weight();
    }
    sum += address_weight.weight;
    address_weights.push_back(std::move(address_weight));
  }
  // Calculate normalized weights and find the minimum.
  double min_normalized_weight = 1.0;
  for (auto& address : address_weights) {
    address.normalized_weight = static_cast<double>(address.weight) / sum;
    min_normalized_weight =
        std::min(address.normalized_weight, min_normalized_weight);
  }
  // Scale up the number of hashes per host such that the least-weighted host
  // gets a whole number of hashes on the ring. Other hosts might not end up
  // with whole numbers, and that's fine (the ring-building algorithm below can
  // handle this). This preserves the original implementation's behavior: when
  // weights aren't provided, all hosts should get an equal number of hashes. In
  // the case where this number exceeds the max_ring_size, it's scaled back down
  // to fit.
  //
  // The cap channel arg is read as an int, so clamp it to at least 1:
  // - A cap of 0 would produce an empty ring, leaving pickers with no entry
  //   to select.
  // - A negative value would wrap around to a huge size_t and silently
  //   disable the cap.
  const int ring_size_cap_arg =
      args.GetInt(GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP)
          .value_or(static_cast<int>(kRingSizeCapDefault));
  const size_t ring_size_cap =
      static_cast<size_t>(std::max(ring_size_cap_arg, 1));
  const size_t min_ring_size = std::min(config->min_ring_size(), ring_size_cap);
  const size_t max_ring_size = std::min(config->max_ring_size(), ring_size_cap);
  const double scale = std::min(
      std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
      static_cast<double>(max_ring_size));
  // Reserve memory for the entire ring up front.
  const uint64_t ring_size = std::ceil(scale);
  ring_.reserve(ring_size);
  // Populate the hash ring by walking through the (host, weight) pairs in
  // normalized_host_weights, and generating (scale * weight) hashes for each
  // host. Since these aren't necessarily whole numbers, we maintain running
  // sums -- current_hashes and target_hashes -- which allows us to populate the
  // ring in a mostly stable way.
  absl::InlinedVector<char, 196> hash_key_buffer;
  double current_hashes = 0.0;
  double target_hashes = 0.0;
  for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
    const std::string& address_string = address_weights[i].address;
    hash_key_buffer.assign(address_string.begin(), address_string.end());
    hash_key_buffer.emplace_back('_');
    // Remember where the "<address>_" prefix ends as an index, not an
    // iterator. Appending the count suffix may reallocate the buffer (e.g.
    // when it outgrows its inline storage), which would invalidate an
    // iterator.
    const size_t prefix_length = hash_key_buffer.size();
    target_hashes += scale * address_weights[i].normalized_weight;
    size_t count = 0;
    while (current_hashes < target_hashes) {
      const std::string count_str = absl::StrCat(count);
      hash_key_buffer.insert(hash_key_buffer.end(), count_str.begin(),
                             count_str.end());
      const uint64_t hash =
          XXH64(hash_key_buffer.data(), hash_key_buffer.size(), 0);
      ring_.push_back({hash, i});
      ++count;
      ++current_hashes;
      // Drop the count suffix, leaving "<address>_" for the next entry.
      hash_key_buffer.resize(prefix_length);
    }
  }
  std::sort(ring_.begin(), ring_.end(),
            [](const RingEntry& lhs, const RingEntry& rhs) -> bool {
              return lhs.hash < rhs.hash;
            });
}
//
// RingHash::RingHashSubchannelList
//
// Creates the subchannel list for `addresses` and builds its ring from the
// policy's current config. Every subchannel is initially counted as IDLE.
RingHash::RingHashSubchannelList::RingHashSubchannelList(
    RingHash* policy, ServerAddressList addresses, const ChannelArgs& args)
    : SubchannelList(policy,
                     GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)
                         ? "RingHashSubchannelList"
                         : nullptr,
                     std::move(addresses), policy->channel_control_helper(),
                     args),
      num_idle_(num_subchannels()) {
  // This ref is released in ~RingHashSubchannelList(). It keeps the LB policy
  // alive while we hold references to subchannels, because the subchannels'
  // pollset_sets include the LB policy's pollset_set.
  policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
  ring_ = MakeRefCounted<Ring>(policy->config_.get(), this, args);
  if (!GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) return;
  gpr_log(GPR_INFO,
          "[RH %p] created subchannel list %p with %" PRIuPTR " ring entries",
          policy, this, ring_->ring().size());
}
// Moves one subchannel from the counter for old_state to the counter for
// new_state. An old_state with no counter (e.g. SHUTDOWN) decrements
// nothing. new_state must not be SHUTDOWN.
void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
    grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
  switch (old_state) {
    case GRPC_CHANNEL_IDLE:
      GPR_ASSERT(num_idle_ > 0);
      --num_idle_;
      break;
    case GRPC_CHANNEL_READY:
      GPR_ASSERT(num_ready_ > 0);
      --num_ready_;
      break;
    case GRPC_CHANNEL_CONNECTING:
      GPR_ASSERT(num_connecting_ > 0);
      --num_connecting_;
      break;
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
      GPR_ASSERT(num_transient_failure_ > 0);
      --num_transient_failure_;
      break;
    default:
      break;
  }
  GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
  switch (new_state) {
    case GRPC_CHANNEL_IDLE:
      ++num_idle_;
      break;
    case GRPC_CHANNEL_READY:
      ++num_ready_;
      break;
    case GRPC_CHANNEL_CONNECTING:
      ++num_connecting_;
      break;
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
      ++num_transient_failure_;
      break;
    default:
      break;
  }
}
void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
size_t index, bool connection_attempt_complete, absl::Status status) {
RingHash* p = static_cast<RingHash*>(policy());
// If this is latest_pending_subchannel_list_, then swap it into
// subchannel_list_ as soon as we get the initial connectivity state
// report for every subchannel in the list.
if (p->latest_pending_subchannel_list_.get() == this &&
AllSubchannelsSeenInitialState()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] replacing subchannel list %p with %p", p,
p->subchannel_list_.get(), this);
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_.get() != this) return;
// The overall aggregation rules here are:
// 1. If there is at least one subchannel in READY state, report READY.
// 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report
// TRANSIENT_FAILURE.
// 3. If there is at least one subchannel in CONNECTING state, report
// CONNECTING.
// 4. If there is one subchannel in TRANSIENT_FAILURE state and there is
// more than one subchannel, report CONNECTING.
// 5. If there is at least one subchannel in IDLE state, report IDLE.
// 6. Otherwise, report TRANSIENT_FAILURE.
//
// We set start_connection_attempt to true if we match rules 2, 4, or 6.
// (Rule 3 does not need one, since a connection attempt is already in
// progress.)
grpc_connectivity_state state;
bool start_connection_attempt = false;
if (num_ready_ > 0) {
state = GRPC_CHANNEL_READY;
} else if (num_transient_failure_ >= 2) {
state = GRPC_CHANNEL_TRANSIENT_FAILURE;
start_connection_attempt = true;
} else if (num_connecting_ > 0) {
state = GRPC_CHANNEL_CONNECTING;
} else if (num_transient_failure_ == 1 && num_subchannels() > 1) {
state = GRPC_CHANNEL_CONNECTING;
start_connection_attempt = true;
} else if (num_idle_ > 0) {
state = GRPC_CHANNEL_IDLE;
} else {
state = GRPC_CHANNEL_TRANSIENT_FAILURE;
start_connection_attempt = true;
}
// In TRANSIENT_FAILURE, report the last reported failure.
// Otherwise, report OK.
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (!status.ok()) {
last_failure_ = absl::UnavailableError(absl::StrCat(
"no reachable subchannels; last error: ", status.ToString()));
}
status = last_failure_;
} else {
status = absl::OkStatus();
}
// Generate new picker and return it to the channel.
// Note that we use our own picker regardless of connectivity state.
p->channel_control_helper()->UpdateState(
state, status,
MakeRefCounted<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"), this));
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
// reconnect to subchannels unless it is getting pick requests,
// it will need special handling to ensure that it will eventually
// recover from TRANSIENT_FAILURE state once the problem is resolved.
// Specifically, it will make sure that it is attempting to connect to
// at least one subchannel at any given time. After a given subchannel
// fails a connection attempt, it will move on to the next subchannel
// in the ring. It will keep doing this until one of the subchannels
// successfully connects, at which point it will report READY and stop
// proactively trying to connect. The policy will remain in
// TRANSIENT_FAILURE until at least one subchannel becomes connected,
// even if subchannels are in state CONNECTING during that time.
//
// Note that we do the same thing when the policy is in state
// CONNECTING, just to ensure that we don't remain in CONNECTING state
// indefinitely if there are no new picks coming in.
// Clear the in-flight marker once the subchannel we triggered reports a
// state other than CONNECTING.
if (internally_triggered_connection_index_.has_value() &&
*internally_triggered_connection_index_ == index &&
connection_attempt_complete) {
internally_triggered_connection_index_.reset();
}
// Start at most one internally triggered attempt at a time, on the
// subchannel after `index` in list order (wrapping around).
if (start_connection_attempt &&
!internally_triggered_connection_index_.has_value()) {
size_t next_index = (index + 1) % num_subchannels();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RH %p] triggering internal connection attempt for subchannel "
"%p, subchannel_list %p (index %" PRIuPTR " of %" PRIuPTR ")",
p, subchannel(next_index)->subchannel(), this, next_index,
num_subchannels());
}
internally_triggered_connection_index_ = next_index;
subchannel(next_index)->subchannel()->RequestConnection();
}
}
//
// RingHash::RingHashSubchannelData
//
// Handles a connectivity state report for this subchannel:
// 1. Requests re-resolution for a non-initial TRANSIENT_FAILURE or IDLE
//    report.
// 2. Updates the logical state and status. Once in TRANSIENT_FAILURE, this
//    only changes on READY or another TRANSIENT_FAILURE.
// 3. Re-aggregates the list's state via
//    UpdateRingHashConnectivityStateLocked().
// old_state is empty for the initial notification.
void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(
GPR_INFO,
"[RH %p] connectivity changed for subchannel %p, subchannel_list %p "
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(),
ConnectivityStateName(logical_connectivity_state_),
ConnectivityStateName(new_state));
}
GPR_ASSERT(subchannel() != nullptr);
// If this is not the initial state notification and the new state is
// TRANSIENT_FAILURE or IDLE, re-resolve.
// Note that we don't want to do this on the initial state notification,
// because that would result in an endless loop of re-resolution.
if (old_state.has_value() && (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
new_state == GRPC_CHANNEL_IDLE)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RH %p] Subchannel %p reported %s; requesting re-resolution", p,
subchannel(), ConnectivityStateName(new_state));
}
p->channel_control_helper()->RequestReresolution();
}
// Computed from the raw reported state, before the logical-state filtering
// below. Any state other than CONNECTING counts as a finished attempt.
const bool connection_attempt_complete = new_state != GRPC_CHANNEL_CONNECTING;
// Decide what state to report for the purposes of aggregation and
// picker behavior.
// If the last recorded state was TRANSIENT_FAILURE, ignore the change
// unless the new state is READY (or TF again, in which case we need
// to update the status).
if (logical_connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE ||
new_state == GRPC_CHANNEL_READY ||
new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Update state counters used for aggregation.
subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_,
new_state);
// Update logical state.
logical_connectivity_state_ = new_state;
logical_connectivity_status_ = connectivity_status();
}
// Update the RH policy's connectivity state, creating new picker and new
// ring.
subchannel_list()->UpdateRingHashConnectivityStateLocked(
Index(), connection_attempt_complete, logical_connectivity_status_);
}
//
// RingHash
//
// Constructs the policy. Subchannel lists are only created later, by
// UpdateLocked(). When tracing is enabled, creation is logged.
RingHash::RingHash(Args args) : LoadBalancingPolicy(std::move(args)) {
  if (!GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) return;
  gpr_log(GPR_INFO, "[RH %p] Created", this);
}
// Destroys the policy. Both subchannel lists must already have been dropped;
// ShutdownLocked() resets them.
RingHash::~RingHash() {
  const bool tracing = GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace);
  if (tracing) gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this);
  GPR_ASSERT(subchannel_list_ == nullptr);
  GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
}
// Marks the policy as shut down, which makes any still-pending
// SubchannelConnectionAttempter skip its connection requests. Then releases
// the current list and the pending list, in that order.
void RingHash::ShutdownLocked() {
  const bool tracing = GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace);
  if (tracing) gpr_log(GPR_INFO, "[RH %p] Shutting down", this);
  shutdown_ = true;
  subchannel_list_.reset();
  latest_pending_subchannel_list_.reset();
}
// Forwards ResetBackoffLocked() to the current and the pending subchannel
// lists, skipping whichever is not present.
// subchannel_list_ is null before the first UpdateLocked() and after
// ShutdownLocked(), so it must be null-checked just like the pending list.
void RingHash::ResetBackoffLocked() {
  if (subchannel_list_ != nullptr) {
    subchannel_list_->ResetBackoffLocked();
  }
  if (latest_pending_subchannel_list_ != nullptr) {
    latest_pending_subchannel_list_->ResetBackoffLocked();
  }
}
// Applies a resolver update.
// - Stores the new config. This happens even when the address update is
//   rejected.
// - If the addresses are an error and a current list exists, keeps that
//   list and returns the error.
// - Otherwise, builds a new pending subchannel list and starts watching it.
//   The pending list is promoted immediately when there is no current list
//   or the new list is empty. Otherwise it is promoted later, from
//   UpdateRingHashConnectivityStateLocked().
// - An empty list reports TRANSIENT_FAILURE and returns that status.
absl::Status RingHash::UpdateLocked(UpdateArgs args) {
config_ = std::move(args.config);
ServerAddressList addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
}
addresses = *std::move(args.addresses);
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",
this, args.addresses.status().ToString().c_str());
}
// If we already have a subchannel list, then keep using the existing
// list, but still report back that the update was not accepted.
if (subchannel_list_ != nullptr) return args.addresses.status();
// Otherwise fall through with an empty address list.
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) &&
latest_pending_subchannel_list_ != nullptr) {
gpr_log(GPR_INFO, "[RH %p] replacing latest pending subchannel list %p",
this, latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeRefCounted<RingHashSubchannelList>(
this, std::move(addresses), args.args);
latest_pending_subchannel_list_->StartWatchingLocked();
// If we have no existing list or the new list is empty, immediately
// promote the new list.
// Otherwise, do nothing; the new list will be promoted when the
// initial subchannel states are reported.
if (subchannel_list_ == nullptr ||
latest_pending_subchannel_list_->num_subchannels() == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) &&
subchannel_list_ != nullptr) {
gpr_log(GPR_INFO,
"[RH %p] empty address list, replacing subchannel list %p", this,
subchannel_list_.get());
}
subchannel_list_ = std::move(latest_pending_subchannel_list_);
// If the new list is empty, report TRANSIENT_FAILURE.
if (subchannel_list_->num_subchannels() == 0) {
absl::Status status =
args.addresses.ok()
? absl::UnavailableError(
absl::StrCat("empty address list: ", args.resolution_note))
: args.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
return status;
}
// Otherwise, report IDLE.
// (All counters start with every subchannel in IDLE, so aggregation
// yields IDLE here.)
subchannel_list_->UpdateRingHashConnectivityStateLocked(
/*index=*/0, /*connection_attempt_complete=*/false, absl::OkStatus());
}
return absl::OkStatus();
}
//
// factory
//
// Factory for the ring_hash LB policy. It is registered under kRingHash by
// RegisterRingHashLbPolicy().
class RingHashFactory : public LoadBalancingPolicyFactory {
 public:
  absl::string_view name() const override { return kRingHash; }

  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      LoadBalancingPolicy::Args args) const override {
    return MakeOrphanable<RingHash>(std::move(args));
  }

  // Parses and validates `json` with RingHashConfig's JSON loader (including
  // RingHashConfig::JsonPostLoad()) and wraps the result in a
  // RingHashLbConfig. Validation failures are returned as a non-OK status.
  absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
  ParseLoadBalancingConfig(const Json& json) const override {
    auto parsed = LoadFromJson<RingHashConfig>(
        json, JsonArgs(), "errors validating ring_hash LB policy config");
    if (!parsed.ok()) return parsed.status();
    return MakeRefCounted<RingHashLbConfig>(parsed->min_ring_size,
                                            parsed->max_ring_size);
  }
};
} // namespace
// Registers the ring_hash LB policy factory with the core configuration's
// LB policy registry.
void RegisterRingHashLbPolicy(CoreConfiguration::Builder* builder) {
  auto factory = std::make_unique<RingHashFactory>();
  builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
      std::move(factory));
}
} // namespace grpc_core