blob: d3c3766731d0e3930494ea380220b5ef3e5d15e6 [file] [log] [blame]
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <string.h>
#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h"
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
namespace {
//
// pick_first LB policy
//
constexpr absl::string_view kPickFirst = "pick_first";
class PickFirst : public LoadBalancingPolicy {
public:
explicit PickFirst(Args args);
absl::string_view name() const override { return kPickFirst; }
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
~PickFirst() override;
class PickFirstSubchannelList;
class PickFirstSubchannelData
: public SubchannelData<PickFirstSubchannelList,
PickFirstSubchannelData> {
public:
PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list,
const ServerAddress& address,
RefCountedPtr<SubchannelInterface> subchannel)
: SubchannelData(subchannel_list, address, std::move(subchannel)) {}
void ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) override;
// Processes the connectivity change to READY for an unselected subchannel.
void ProcessUnselectedReadyLocked();
};
class PickFirstSubchannelList
: public SubchannelList<PickFirstSubchannelList,
PickFirstSubchannelData> {
public:
PickFirstSubchannelList(PickFirst* policy, ServerAddressList addresses,
const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)
? "PickFirstSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
// Note that we do not start trying to connect to any subchannel here,
// since we will wait until we see the initial connectivity state for all
// subchannels before doing that.
}
~PickFirstSubchannelList() override {
PickFirst* p = static_cast<PickFirst*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
bool in_transient_failure() const { return in_transient_failure_; }
void set_in_transient_failure(bool in_transient_failure) {
in_transient_failure_ = in_transient_failure;
}
size_t attempting_index() const { return attempting_index_; }
void set_attempting_index(size_t index) { attempting_index_ = index; }
private:
std::shared_ptr<WorkSerializer> work_serializer() const override {
return static_cast<PickFirst*>(policy())->work_serializer();
}
bool in_transient_failure_ = false;
size_t attempting_index_ = 0;
};
class Picker : public SubchannelPicker {
public:
explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
: subchannel_(std::move(subchannel)) {}
PickResult Pick(PickArgs /*args*/) override {
return PickResult::Complete(subchannel_);
}
private:
RefCountedPtr<SubchannelInterface> subchannel_;
};
void ShutdownLocked() override;
void AttemptToConnectUsingLatestUpdateArgsLocked();
// Lateset update args.
UpdateArgs latest_update_args_;
// All our subchannels.
RefCountedPtr<PickFirstSubchannelList> subchannel_list_;
// Latest pending subchannel list.
RefCountedPtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
// Selected subchannel in \a subchannel_list_.
PickFirstSubchannelData* selected_ = nullptr;
// Are we in IDLE state?
bool idle_ = false;
// Are we shut down?
bool shutdown_ = false;
};
PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p created.", this);
}
}
PickFirst::~PickFirst() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Destroying Pick First %p", this);
}
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
}
void PickFirst::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
}
shutdown_ = true;
subchannel_list_.reset();
latest_pending_subchannel_list_.reset();
}
void PickFirst::ExitIdleLocked() {
if (shutdown_) return;
if (idle_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p exiting idle", this);
}
idle_ = false;
AttemptToConnectUsingLatestUpdateArgsLocked();
}
}
void PickFirst::ResetBackoffLocked() {
if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked();
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ResetBackoffLocked();
}
}
void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
// Create a subchannel list from latest_update_args_.
ServerAddressList addresses;
if (latest_update_args_.addresses.ok()) {
addresses = *latest_update_args_.addresses;
}
// Replace latest_pending_subchannel_list_.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
latest_pending_subchannel_list_ != nullptr) {
gpr_log(GPR_INFO,
"[PF %p] Shutting down previous pending subchannel list %p", this,
latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeRefCounted<PickFirstSubchannelList>(
this, std::move(addresses), latest_update_args_.args);
latest_pending_subchannel_list_->StartWatchingLocked();
// Empty update or no valid subchannels. Put the channel in
// TRANSIENT_FAILURE and request re-resolution.
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
absl::Status status =
latest_update_args_.addresses.ok()
? absl::UnavailableError(absl::StrCat(
"empty address list: ", latest_update_args_.resolution_note))
: latest_update_args_.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
channel_control_helper()->RequestReresolution();
}
// If the new update is empty or we don't yet have a selected subchannel in
// the current list, replace the current subchannel list immediately.
if (latest_pending_subchannel_list_->num_subchannels() == 0 ||
selected_ == nullptr) {
selected_ = nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
subchannel_list_ != nullptr) {
gpr_log(GPR_INFO, "[PF %p] Shutting down previous subchannel list %p",
this, subchannel_list_.get());
}
subchannel_list_ = std::move(latest_pending_subchannel_list_);
}
}
absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
if (args.addresses.ok()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
args.addresses->size());
} else {
gpr_log(GPR_INFO, "Pick First %p received update with address error: %s",
this, args.addresses.status().ToString().c_str());
}
}
// Add GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
args.args = args.args.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
// Set return status based on the address list.
absl::Status status;
if (!args.addresses.ok()) {
status = args.addresses.status();
} else if (args.addresses->empty()) {
status = absl::UnavailableError("address list must not be empty");
}
// TODO(roth): This is a hack to disable outlier_detection when used
// with pick_first, for the reasons described in
// https://github.com/grpc/grpc/issues/32967. Remove this when
// implementing the dualstack design.
if (args.addresses.ok()) {
ServerAddressList addresses;
for (const auto& address : *args.addresses) {
addresses.emplace_back(address.WithAttribute(
DisableOutlierDetectionAttribute::kName,
std::make_unique<DisableOutlierDetectionAttribute>()));
}
args.addresses = std::move(addresses);
}
// If the update contains a resolver error and we have a previous update
// that was not a resolver error, keep using the previous addresses.
if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
args.addresses = std::move(latest_update_args_.addresses);
}
// Update latest_update_args_.
latest_update_args_ = std::move(args);
// If we are not in idle, start connection attempt immediately.
// Otherwise, we defer the attempt into ExitIdleLocked().
if (!idle_) {
AttemptToConnectUsingLatestUpdateArgsLocked();
}
return status;
}
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
// The notification must be for a subchannel in either the current or
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p selected subchannel connectivity changed to %s", p,
ConnectivityStateName(new_state));
}
// Any state change is considered to be a failure of the existing
// connection.
// If there is a pending update, switch to the pending update.
if (p->latest_pending_subchannel_list_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
"replace %p",
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->selected_ = nullptr;
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
if (p->subchannel_list_->in_transient_failure()) {
absl::Status status = absl::UnavailableError(absl::StrCat(
"selected subchannel failed; switching to pending update; "
"last failure: ",
p->subchannel_list_
->subchannel(p->subchannel_list_->num_subchannels())
->connectivity_status()
.ToString()));
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
} else {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
}
return;
}
// If the selected subchannel goes bad, request a re-resolution.
// TODO(qianchengz): We may want to request re-resolution in
// ExitIdleLocked().
p->channel_control_helper()->RequestReresolution();
// TODO(roth): We chould check the connectivity states of all the
// subchannels here, just in case one of them happens to be READY,
// and we could switch to that rather than going IDLE.
// Enter idle.
p->idle_ = true;
p->selected_ = nullptr;
p->subchannel_list_.reset();
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, absl::Status(),
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
return;
}
// If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is
// for a subchannel in p->subchannel_list_ that we're trying to
// connect to. The goal here is to find a subchannel that we can
// select.
// 2. We do currently have a selected subchannel, and the update is
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
// If the subchannel is READY, use it.
if (new_state == GRPC_CHANNEL_READY) {
subchannel_list()->set_in_transient_failure(false);
ProcessUnselectedReadyLocked();
return;
}
// If this is the initial connectivity state notification for this
// subchannel, check to see if it's the last one we were waiting for,
// in which case we start trying to connect to the first subchannel.
// Otherwise, do nothing, since we'll continue to wait until all of
// the subchannels report their state.
if (!old_state.has_value()) {
if (subchannel_list()->AllSubchannelsSeenInitialState()) {
subchannel_list()->subchannel(0)->subchannel()->RequestConnection();
}
return;
}
// Ignore any other updates for subchannels we're not currently trying to
// connect to.
if (Index() != subchannel_list()->attempting_index()) return;
// Otherwise, process connectivity state.
switch (new_state) {
case GRPC_CHANNEL_READY:
// Already handled this case above, so this should not happen.
GPR_UNREACHABLE_CODE(break);
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
size_t next_index = (Index() + 1) % subchannel_list()->num_subchannels();
subchannel_list()->set_attempting_index(next_index);
PickFirstSubchannelData* sd = subchannel_list()->subchannel(next_index);
// If we're tried all subchannels, set state to TRANSIENT_FAILURE.
if (sd->Index() == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p subchannel list %p failed to connect to "
"all subchannels",
p, subchannel_list());
}
subchannel_list()->set_in_transient_failure(true);
// In case 2, swap to the new subchannel list. This means reporting
// TRANSIENT_FAILURE and dropping the existing (working) connection,
// but we can't ignore what the control plane has told us.
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
"replace %p",
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->selected_ = nullptr; // owned by p->subchannel_list_
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// If this is the current subchannel list (either because we were
// in case 1 or because we were in case 2 and just promoted it to
// be the current list), re-resolve and report new state.
if (subchannel_list() == p->subchannel_list_.get()) {
p->channel_control_helper()->RequestReresolution();
absl::Status status = absl::UnavailableError(
absl::StrCat("failed to connect to all addresses; last error: ",
connectivity_status().ToString()));
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
}
}
// If the next subchannel is in IDLE, trigger a connection attempt.
// If it's in READY, we can't get here, because we would already
// have selected the subchannel above.
// If it's already in CONNECTING, we don't need to do this.
// If it's in TRANSIENT_FAILURE, then we will trigger the
// connection attempt later when it reports IDLE.
auto sd_state = sd->connectivity_state();
if (sd_state.has_value() && *sd_state == GRPC_CHANNEL_IDLE) {
sd->subchannel()->RequestConnection();
}
break;
}
case GRPC_CHANNEL_IDLE: {
subchannel()->RequestConnection();
break;
}
case GRPC_CHANNEL_CONNECTING: {
// Only update connectivity state in case 1, and only if we're not
// already in TRANSIENT_FAILURE.
if (subchannel_list() == p->subchannel_list_.get() &&
!subchannel_list()->in_transient_failure()) {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
}
break;
}
case GRPC_CHANNEL_SHUTDOWN:
GPR_UNREACHABLE_CODE(break);
}
}
void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
// If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is
// for a subchannel in p->subchannel_list_ that we're trying to
// connect to. The goal here is to find a subchannel that we can
// select.
// 2. We do currently have a selected subchannel, and the update is
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
// Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
"replace %p",
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
}
p->selected_ = this;
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
MakeRefCounted<Picker>(subchannel()->Ref()));
for (size_t i = 0; i < subchannel_list()->num_subchannels(); ++i) {
if (i != Index()) {
subchannel_list()->subchannel(i)->ShutdownLocked();
}
}
}
class PickFirstConfig : public LoadBalancingPolicy::Config {
public:
absl::string_view name() const override { return kPickFirst; }
};
//
// factory
//
class PickFirstFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<PickFirst>(std::move(args));
}
absl::string_view name() const override { return kPickFirst; }
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& /*json*/) const override {
return MakeRefCounted<PickFirstConfig>();
}
};
} // namespace
void RegisterPickFirstLbPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
std::make_unique<PickFirstFactory>());
}
} // namespace grpc_core