blob: 2638423d1c10fe48531b6664dff4a705fd4ad665 [file] [log] [blame]
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/xds/xds_endpoint.h"
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <limits>
#include <set>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/types/optional.h"
#include "envoy/config/core/v3/address.upb.h"
#include "envoy/config/core/v3/base.upb.h"
#include "envoy/config/core/v3/health_check.upb.h"
#include "envoy/config/endpoint/v3/endpoint.upb.h"
#include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
#include "envoy/config/endpoint/v3/endpoint_components.upb.h"
#include "envoy/type/v3/percent.upb.h"
#include "google/protobuf/wrappers.upb.h"
#include "upb/text/encode.h"
#include <grpc/support/log.h>
#include "src/core/ext/xds/upb_utils.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/ext/xds/xds_resource_type.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/iomgr/resolved_address.h"
namespace grpc_core {
//
// XdsEndpointResource
//
std::string XdsEndpointResource::Priority::Locality::ToString() const {
std::vector<std::string> endpoint_strings;
for (const ServerAddress& endpoint : endpoints) {
endpoint_strings.emplace_back(endpoint.ToString());
}
return absl::StrCat("{name=", name->AsHumanReadableString(),
", lb_weight=", lb_weight, ", endpoints=[",
absl::StrJoin(endpoint_strings, ", "), "]}");
}
// Two priorities are equal when they hold the same number of localities and,
// walking both maps in iteration order, every pair of entries has equal
// locality names (compared by value through the key pointers) and equal
// locality contents.
bool XdsEndpointResource::Priority::operator==(const Priority& other) const {
  if (localities.size() != other.localities.size()) return false;
  // Sizes match, so advancing both iterators in lockstep stays in range.
  for (auto lhs = localities.begin(), rhs = other.localities.begin();
       lhs != localities.end(); ++lhs, ++rhs) {
    if (*lhs->first != *rhs->first || lhs->second != rhs->second) {
      return false;
    }
  }
  return true;
}
// Renders this priority as "[<locality1>, <locality2>, ...]", formatting each
// locality with Locality::ToString() in map iteration order.
std::string XdsEndpointResource::Priority::ToString() const {
  const std::string joined_localities = absl::StrJoin(
      localities, ", ", [](std::string* out, const auto& entry) {
        out->append(entry.second.ToString());
      });
  return absl::StrCat("[", joined_localities, "]");
}
bool XdsEndpointResource::DropConfig::ShouldDrop(
const std::string** category_name) {
for (size_t i = 0; i < drop_category_list_.size(); ++i) {
const auto& drop_category = drop_category_list_[i];
// Generate a random number in [0, 1000000).
const uint32_t random = [&]() {
MutexLock lock(&mu_);
return absl::Uniform<uint32_t>(bit_gen_, 0, 1000000);
}();
if (random < drop_category.parts_per_million) {
*category_name = &drop_category.name;
return true;
}
}
return false;
}
std::string XdsEndpointResource::DropConfig::ToString() const {
std::vector<std::string> category_strings;
for (const DropCategory& category : drop_category_list_) {
category_strings.emplace_back(
absl::StrCat(category.name, "=", category.parts_per_million));
}
return absl::StrCat("{[", absl::StrJoin(category_strings, ", "),
"], drop_all=", drop_all_, "}");
}
// Renders the whole resource as
// "priorities=[priority 0: <...>, priority 1: <...>, ...],
// drop_config=<...>" (on a single line).
std::string XdsEndpointResource::ToString() const {
  std::string joined_priorities;
  for (size_t index = 0; index < priorities.size(); ++index) {
    if (index != 0) joined_priorities.append(", ");
    absl::StrAppend(&joined_priorities, "priority ", index, ": ",
                    priorities[index].ToString());
  }
  return absl::StrCat("priorities=[", joined_priorities,
                      "], drop_config=", drop_config->ToString());
}
//
// XdsEndpointResourceType
//
namespace {
// Logs the text encoding of `cla` at DEBUG severity, but only when the xDS
// tracer in `context` is enabled and DEBUG logging is active.  The text form
// is written into a fixed 10240-byte stack buffer before being logged.
void MaybeLogClusterLoadAssignment(
    const XdsResourceType::DecodeContext& context,
    const envoy_config_endpoint_v3_ClusterLoadAssignment* cla) {
  const bool logging_wanted = GRPC_TRACE_FLAG_ENABLED(*context.tracer) &&
                              gpr_should_log(GPR_LOG_SEVERITY_DEBUG);
  if (!logging_wanted) return;
  const upb_MessageDef* msg_type =
      envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(context.symtab);
  char buf[10240];
  upb_TextEncode(cla, msg_type, nullptr, 0, buf, sizeof(buf));
  gpr_log(GPR_DEBUG, "[xds_client %p] ClusterLoadAssignment: %s",
          context.client, buf);
}
// Converts a single LbEndpoint proto into a ServerAddress.
//
// lb_endpoint: the endpoint proto to convert.
// errors: collector for validation problems; field scopes are pushed while
//   the corresponding sub-fields are being examined.
//
// Returns absl::nullopt when no ServerAddress can or should be produced:
//  - without recording an error, when the endpoint's health status causes it
//    to be skipped (neither UNKNOWN nor HEALTHY while
//    XdsOverrideHostEnabled() is false, or a status that
//    XdsHealthStatus::FromUpb() does not map);
//  - after recording an error, when endpoint / address / socket_address is
//    missing, the port does not fit in 16 bits, or the address string cannot
//    be converted to a socket address.
// A load_balancing_weight of 0 records an error, but parsing of the rest of
// the endpoint still proceeds.
absl::optional<ServerAddress> ServerAddressParse(
    const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
    ValidationErrors* errors) {
  // health_status
  const int32_t health_status =
      envoy_config_endpoint_v3_LbEndpoint_health_status(lb_endpoint);
  if (!XdsOverrideHostEnabled() &&
      health_status != envoy_config_core_v3_UNKNOWN &&
      health_status != envoy_config_core_v3_HEALTHY) {
    return absl::nullopt;
  }
  auto status = XdsHealthStatus::FromUpb(health_status);
  if (!status.has_value()) return absl::nullopt;
  // load_balancing_weight
  uint32_t weight = 1;
  {
    ValidationErrors::ScopedField field(errors, ".load_balancing_weight");
    const google_protobuf_UInt32Value* load_balancing_weight =
        envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint);
    if (load_balancing_weight != nullptr) {
      weight = google_protobuf_UInt32Value_value(load_balancing_weight);
      if (weight == 0) {
        errors->AddError("must be greater than 0");
      }
    }
  }
  // endpoint
  grpc_resolved_address grpc_address;
  {
    ValidationErrors::ScopedField field(errors, ".endpoint");
    const envoy_config_endpoint_v3_Endpoint* endpoint =
        envoy_config_endpoint_v3_LbEndpoint_endpoint(lb_endpoint);
    if (endpoint == nullptr) {
      errors->AddError("field not present");
      return absl::nullopt;
    }
    ValidationErrors::ScopedField field2(errors, ".address");
    const envoy_config_core_v3_Address* address =
        envoy_config_endpoint_v3_Endpoint_address(endpoint);
    if (address == nullptr) {
      errors->AddError("field not present");
      return absl::nullopt;
    }
    ValidationErrors::ScopedField field3(errors, ".socket_address");
    const envoy_config_core_v3_SocketAddress* socket_address =
        envoy_config_core_v3_Address_socket_address(address);
    if (socket_address == nullptr) {
      errors->AddError("field not present");
      return absl::nullopt;
    }
    std::string address_str = UpbStringToStdString(
        envoy_config_core_v3_SocketAddress_address(socket_address));
    uint32_t port;
    {
      ValidationErrors::ScopedField port_field(errors, ".port_value");
      port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
      // A valid port must fit in 16 bits.
      if (GPR_UNLIKELY((port >> 16) != 0)) {
        errors->AddError("invalid port");
        return absl::nullopt;
      }
    }
    auto addr = StringToSockaddr(address_str, port);
    if (!addr.ok()) {
      errors->AddError(addr.status().message());
      // grpc_address was never assigned on this path.  Returning a
      // ServerAddress built from it would hand callers an address whose
      // length and bytes are indeterminate, so bail out instead; the error
      // recorded above already marks the input as invalid.
      return absl::nullopt;
    }
    grpc_address = *addr;
  }
  // Convert to ServerAddress.
  std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
      attributes;
  attributes[ServerAddressWeightAttribute::kServerAddressWeightAttributeKey] =
      std::make_unique<ServerAddressWeightAttribute>(weight);
  attributes[XdsEndpointHealthStatusAttribute::kKey] =
      std::make_unique<XdsEndpointHealthStatusAttribute>(*status);
  return ServerAddress(grpc_address, ChannelArgs(), std::move(attributes));
}
// Result of parsing one LocalityLbEndpoints proto: the locality itself plus
// the priority it was declared under.
struct ParsedLocality {
// Value of the proto's priority field.
size_t priority;
// Parsed locality data (name, LB weight, and endpoints).
XdsEndpointResource::Priority::Locality locality;
};
// Strict weak ordering over grpc_resolved_address: addresses are ordered by
// length first, and addresses of equal length are ordered by a bytewise
// comparison of their first `len` bytes.
struct ResolvedAddressLessThan {
  bool operator()(const grpc_resolved_address& a1,
                  const grpc_resolved_address& a2) const {
    return a1.len != a2.len ? a1.len < a2.len
                            : memcmp(a1.addr, a2.addr, a1.len) < 0;
  }
};

// Ordered set of resolved addresses, used to detect duplicate endpoints.
using ResolvedAddressSet =
    std::set<grpc_resolved_address, ResolvedAddressLessThan>;
// Parses one LocalityLbEndpoints proto.
//
// locality_lb_endpoints: the proto to parse.
// address_set: addresses seen so far; every successfully parsed endpoint
//   address is inserted, and an address that is already present is reported
//   as a duplicate.
// errors: collector for validation problems.
//
// Returns absl::nullopt when the locality carries no load (its weight is
// unset or 0), when the locality field is missing, or when any error was
// recorded while parsing it.  Otherwise returns the parsed locality together
// with its priority.
absl::optional<ParsedLocality> LocalityParse(
    const envoy_config_endpoint_v3_LocalityLbEndpoints* locality_lb_endpoints,
    ResolvedAddressSet* address_set, ValidationErrors* errors) {
  const size_t errors_at_entry = errors->size();
  ParsedLocality result;
  // load_balancing_weight
  // An absent or zero weight means the locality receives no load, so it is
  // dropped without reporting an error.
  const google_protobuf_UInt32Value* weight_proto =
      envoy_config_endpoint_v3_LocalityLbEndpoints_load_balancing_weight(
          locality_lb_endpoints);
  result.locality.lb_weight = 0;
  if (weight_proto != nullptr) {
    result.locality.lb_weight = google_protobuf_UInt32Value_value(weight_proto);
  }
  if (result.locality.lb_weight == 0) return absl::nullopt;
  // locality
  const envoy_config_core_v3_Locality* locality_proto =
      envoy_config_endpoint_v3_LocalityLbEndpoints_locality(
          locality_lb_endpoints);
  if (locality_proto == nullptr) {
    ValidationErrors::ScopedField field(errors, ".locality");
    errors->AddError("field not present");
    return absl::nullopt;
  }
  // The locality name is built from region, zone, and sub_zone.
  result.locality.name = MakeRefCounted<XdsLocalityName>(
      UpbStringToStdString(envoy_config_core_v3_Locality_region(locality_proto)),
      UpbStringToStdString(envoy_config_core_v3_Locality_zone(locality_proto)),
      UpbStringToStdString(
          envoy_config_core_v3_Locality_sub_zone(locality_proto)));
  // lb_endpoints
  size_t num_endpoints;
  const envoy_config_endpoint_v3_LbEndpoint* const* endpoint_protos =
      envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(
          locality_lb_endpoints, &num_endpoints);
  for (size_t idx = 0; idx < num_endpoints; ++idx) {
    ValidationErrors::ScopedField field(
        errors, absl::StrCat(".lb_endpoints[", idx, "]"));
    auto parsed_address = ServerAddressParse(endpoint_protos[idx], errors);
    if (!parsed_address.has_value()) continue;
    const bool is_new = address_set->insert(parsed_address->address()).second;
    if (!is_new) {
      errors->AddError(absl::StrCat(
          "duplicate endpoint address \"",
          grpc_sockaddr_to_uri(&parsed_address->address())
              .value_or("<unknown>"),
          "\""));
    }
    result.locality.endpoints.push_back(std::move(*parsed_address));
  }
  // priority
  result.priority = envoy_config_endpoint_v3_LocalityLbEndpoints_priority(
      locality_lb_endpoints);
  // Any error recorded since entry invalidates this locality.
  if (errors->size() != errors_at_entry) return absl::nullopt;
  return result;
}
// Parses one DropOverload proto and appends the resulting category to
// `drop_config`.
//
// drop_overload: the proto to parse.
// drop_config: receives the category via AddCategory().
// errors: collector for validation problems.
//
// The drop percentage is normalized to parts-per-million and capped at
// 1000000.  An empty category name or an unknown denominator records an error
// but the category is still appended (with the numerator left unscaled in the
// unknown-denominator case); a missing drop_percentage records an error and
// returns without appending anything.
void DropParseAndAppend(
    const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload*
        drop_overload,
    XdsEndpointResource::DropConfig* drop_config, ValidationErrors* errors) {
  // category
  std::string category = UpbStringToStdString(
      envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_category(
          drop_overload));
  if (category.empty()) {
    ValidationErrors::ScopedField field(errors, ".category");
    errors->AddError("empty drop category name");
  }
  // drop_percentage
  uint32_t parts_per_million;
  {
    ValidationErrors::ScopedField field(errors, ".drop_percentage");
    const envoy_type_v3_FractionalPercent* drop_percentage =
        envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
            drop_overload);
    if (drop_percentage == nullptr) {
      errors->AddError("field not present");
      return;
    }
    // Scale in 64 bits: a 32-bit numerator multiplied by 10000 can exceed
    // the uint32 range, and a wrapped value would slip under the cap below
    // and turn a "drop everything" setting into a tiny drop rate.
    uint64_t scaled_numerator =
        envoy_type_v3_FractionalPercent_numerator(drop_percentage);
    {
      ValidationErrors::ScopedField denominator_field(errors, ".denominator");
      const int denominator =
          envoy_type_v3_FractionalPercent_denominator(drop_percentage);
      // Normalize to million.
      switch (denominator) {
        case envoy_type_v3_FractionalPercent_HUNDRED:
          scaled_numerator *= 10000;
          break;
        case envoy_type_v3_FractionalPercent_TEN_THOUSAND:
          scaled_numerator *= 100;
          break;
        case envoy_type_v3_FractionalPercent_MILLION:
          break;
        default:
          errors->AddError("unknown denominator type");
      }
    }
    // Cap numerator to 1000000; after the cap the value fits in 32 bits.
    parts_per_million = static_cast<uint32_t>(
        std::min<uint64_t>(scaled_numerator, 1000000u));
  }
  // Add category.
  drop_config->AddCategory(std::move(category), parts_per_million);
}
// Validates a ClusterLoadAssignment proto and converts it into an
// XdsEndpointResource.
//
// Localities are grouped by priority; within a priority a locality name may
// appear only once.  Every priority index up to the highest one seen must
// contain at least one locality, and the locality weights within a priority
// must sum to at most the uint32 maximum.  Drop overloads from the policy
// field, if present, are appended to the resource's drop config.
//
// Returns the parsed resource, or an InvalidArgument status that aggregates
// every validation error found.
absl::StatusOr<XdsEndpointResource> EdsResourceParse(
    const XdsResourceType::DecodeContext& /*context*/,
    const envoy_config_endpoint_v3_ClusterLoadAssignment*
        cluster_load_assignment) {
  ValidationErrors errors;
  XdsEndpointResource eds_resource;
  // endpoints
  {
    ValidationErrors::ScopedField endpoints_field(&errors, "endpoints");
    ResolvedAddressSet seen_addresses;
    size_t num_localities;
    const envoy_config_endpoint_v3_LocalityLbEndpoints* const* locality_protos =
        envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(
            cluster_load_assignment, &num_localities);
    for (size_t idx = 0; idx < num_localities; ++idx) {
      ValidationErrors::ScopedField index_field(&errors,
                                                absl::StrCat("[", idx, "]"));
      auto parsed = LocalityParse(locality_protos[idx], &seen_addresses, &errors);
      if (!parsed.has_value()) continue;
      GPR_ASSERT(parsed->locality.lb_weight != 0);
      // Grow the priorities list on demand, since localities are not
      // required to arrive in priority order.
      // NOTE(review): the list is resized to whatever priority value the
      // proto carries, with no upper bound -- worth confirming that is
      // acceptable for untrusted input.
      const size_t required_size = parsed->priority + 1;
      if (eds_resource.priorities.size() < required_size) {
        eds_resource.priorities.resize(required_size);
      }
      auto& locality_map = eds_resource.priorities[parsed->priority].localities;
      auto* locality_key = parsed->locality.name.get();
      if (locality_map.find(locality_key) == locality_map.end()) {
        locality_map.emplace(locality_key, std::move(parsed->locality));
      } else {
        errors.AddError(
            absl::StrCat("duplicate locality ",
                         parsed->locality.name->AsHumanReadableString(),
                         " found in priority ", parsed->priority));
      }
    }
    for (size_t idx = 0; idx < eds_resource.priorities.size(); ++idx) {
      const auto& priority_localities = eds_resource.priorities[idx].localities;
      if (priority_localities.empty()) {
        errors.AddError(absl::StrCat("priority ", idx, " empty"));
        continue;
      }
      // The locality weights of one priority must add up to a value that
      // still fits in a uint32; accumulate in 64 bits to detect overflow.
      uint64_t weight_sum = 0;
      for (const auto& entry : priority_localities) {
        weight_sum += entry.second.lb_weight;
        if (weight_sum > std::numeric_limits<uint32_t>::max()) {
          errors.AddError(
              absl::StrCat("sum of locality weights for priority ", idx,
                           " exceeds uint32 max"));
          break;
        }
      }
    }
  }
  // policy
  eds_resource.drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>();
  const auto* policy = envoy_config_endpoint_v3_ClusterLoadAssignment_policy(
      cluster_load_assignment);
  if (policy != nullptr) {
    ValidationErrors::ScopedField policy_field(&errors, "policy");
    size_t num_drop_overloads;
    const auto* const* drop_overloads =
        envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads(
            policy, &num_drop_overloads);
    for (size_t idx = 0; idx < num_drop_overloads; ++idx) {
      ValidationErrors::ScopedField overload_field(
          &errors, absl::StrCat(".drop_overloads[", idx, "]"));
      DropParseAndAppend(drop_overloads[idx], eds_resource.drop_config.get(),
                         &errors);
    }
  }
  // Return result.
  if (errors.ok()) return eds_resource;
  return errors.status(absl::StatusCode::kInvalidArgument,
                       "errors parsing EDS resource");
}
} // namespace
// Decodes a serialized ClusterLoadAssignment resource.
//
// context: supplies the upb arena used for parsing, plus the tracer and
//   client pointer used for logging.
// serialized_resource: the serialized proto bytes.
//
// Returns a DecodeResult.  If the bytes cannot be parsed as a proto, only
// `resource` is set (to an InvalidArgument error).  Otherwise `name` is set
// to the proto's cluster_name and `resource` holds either the validated
// XdsEndpointResource or the validation error status.
XdsResourceType::DecodeResult XdsEndpointResourceType::Decode(
    const XdsResourceType::DecodeContext& context,
    absl::string_view serialized_resource) const {
  DecodeResult result;
  // Parse serialized proto.
  auto* cla = envoy_config_endpoint_v3_ClusterLoadAssignment_parse(
      serialized_resource.data(), serialized_resource.size(), context.arena);
  if (cla == nullptr) {
    result.resource = absl::InvalidArgumentError(
        "Can't parse ClusterLoadAssignment resource.");
    return result;
  }
  MaybeLogClusterLoadAssignment(context, cla);
  // Validate resource.
  result.name = UpbStringToStdString(
      envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name(cla));
  auto parsed = EdsResourceParse(context, cla);
  if (parsed.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
      gpr_log(GPR_INFO, "[xds_client %p] parsed ClusterLoadAssignment %s: %s",
              context.client, result.name->c_str(),
              parsed->ToString().c_str());
    }
    result.resource = std::make_unique<XdsEndpointResource>(std::move(*parsed));
    return result;
  }
  // Validation failed: report the aggregated status.
  if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
    gpr_log(GPR_ERROR, "[xds_client %p] invalid ClusterLoadAssignment %s: %s",
            context.client, result.name->c_str(),
            parsed.status().ToString().c_str());
  }
  result.resource = parsed.status();
  return result;
}
} // namespace grpc_core