blob: 9691889cf98d3d64f3faef6d28e702f8e34dd2c3 [file] [log] [blame]
//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/core/lib/http/httpcli.h"
#include <limits.h>
#include <initializer_list>
#include <string>
#include <utility>
#include "absl/functional/bind_front.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/security_connector/security_connector.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/handshaker_registry.h"
#include "src/core/lib/transport/tcp_connect_handshaker.h"
namespace grpc_core {
namespace {
grpc_httpcli_get_override g_get_override;
grpc_httpcli_post_override g_post_override;
grpc_httpcli_put_override g_put_override;
void (*g_test_only_on_handshake_done_intercept)(HttpRequest* req);
} // namespace
OrphanablePtr<HttpRequest> HttpRequest::Get(
URI uri, const grpc_channel_args* channel_args,
grpc_polling_entity* pollent, const grpc_http_request* request,
Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
RefCountedPtr<grpc_channel_credentials> channel_creds) {
absl::optional<std::function<void()>> test_only_generate_response;
if (g_get_override != nullptr) {
test_only_generate_response = [request, uri, deadline, on_done,
response]() {
// Note that capturing request here assumes it will remain alive
// until after Start is called. This avoids making a copy as this
// code path is only used for test mocks.
g_get_override(request, uri.authority().c_str(), uri.path().c_str(),
deadline, on_done, response);
};
}
std::string name =
absl::StrFormat("HTTP:GET:%s:%s", uri.authority(), uri.path());
const grpc_slice request_text = grpc_httpcli_format_get_request(
request, uri.authority().c_str(), uri.path().c_str());
return MakeOrphanable<HttpRequest>(
std::move(uri), request_text, response, deadline, channel_args, on_done,
pollent, name.c_str(), std::move(test_only_generate_response),
std::move(channel_creds));
}
OrphanablePtr<HttpRequest> HttpRequest::Post(
URI uri, const grpc_channel_args* channel_args,
grpc_polling_entity* pollent, const grpc_http_request* request,
Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
RefCountedPtr<grpc_channel_credentials> channel_creds) {
absl::optional<std::function<void()>> test_only_generate_response;
if (g_post_override != nullptr) {
test_only_generate_response = [request, uri, deadline, on_done,
response]() {
g_post_override(request, uri.authority().c_str(), uri.path().c_str(),
request->body, request->body_length, deadline, on_done,
response);
};
}
std::string name =
absl::StrFormat("HTTP:POST:%s:%s", uri.authority(), uri.path());
const grpc_slice request_text = grpc_httpcli_format_post_request(
request, uri.authority().c_str(), uri.path().c_str());
return MakeOrphanable<HttpRequest>(
std::move(uri), request_text, response, deadline, channel_args, on_done,
pollent, name.c_str(), std::move(test_only_generate_response),
std::move(channel_creds));
}
OrphanablePtr<HttpRequest> HttpRequest::Put(
URI uri, const grpc_channel_args* channel_args,
grpc_polling_entity* pollent, const grpc_http_request* request,
Timestamp deadline, grpc_closure* on_done, grpc_http_response* response,
RefCountedPtr<grpc_channel_credentials> channel_creds) {
absl::optional<std::function<void()>> test_only_generate_response;
if (g_put_override != nullptr) {
test_only_generate_response = [request, uri, deadline, on_done,
response]() {
g_put_override(request, uri.authority().c_str(), uri.path().c_str(),
request->body, request->body_length, deadline, on_done,
response);
};
}
std::string name =
absl::StrFormat("HTTP:PUT:%s:%s", uri.authority(), uri.path());
const grpc_slice request_text = grpc_httpcli_format_put_request(
request, uri.authority().c_str(), uri.path().c_str());
return MakeOrphanable<HttpRequest>(
std::move(uri), request_text, response, deadline, channel_args, on_done,
pollent, name.c_str(), std::move(test_only_generate_response),
std::move(channel_creds));
}
void HttpRequest::SetOverride(grpc_httpcli_get_override get,
grpc_httpcli_post_override post,
grpc_httpcli_put_override put) {
g_get_override = get;
g_post_override = post;
g_put_override = put;
}
void HttpRequest::TestOnlySetOnHandshakeDoneIntercept(
void (*intercept)(HttpRequest* req)) {
g_test_only_on_handshake_done_intercept = intercept;
}
HttpRequest::HttpRequest(
URI uri, const grpc_slice& request_text, grpc_http_response* response,
Timestamp deadline, const grpc_channel_args* channel_args,
grpc_closure* on_done, grpc_polling_entity* pollent, const char* name,
absl::optional<std::function<void()>> test_only_generate_response,
RefCountedPtr<grpc_channel_credentials> channel_creds)
: uri_(std::move(uri)),
request_text_(request_text),
deadline_(deadline),
channel_args_(CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(channel_args)
.ToC()
.release()),
channel_creds_(std::move(channel_creds)),
on_done_(on_done),
resource_quota_(ResourceQuotaFromChannelArgs(channel_args_)),
pollent_(pollent),
pollset_set_(grpc_pollset_set_create()),
test_only_generate_response_(std::move(test_only_generate_response)),
resolver_(GetDNSResolver()) {
grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
grpc_slice_buffer_init(&incoming_);
grpc_slice_buffer_init(&outgoing_);
grpc_iomgr_register_object(&iomgr_obj_, name);
GRPC_CLOSURE_INIT(&on_read_, OnRead, this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&continue_on_read_after_schedule_on_exec_ctx_,
ContinueOnReadAfterScheduleOnExecCtx, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&done_write_, DoneWrite, this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&continue_done_write_after_schedule_on_exec_ctx_,
ContinueDoneWriteAfterScheduleOnExecCtx, this,
grpc_schedule_on_exec_ctx);
GPR_ASSERT(pollent);
grpc_polling_entity_add_to_pollset_set(pollent, pollset_set_);
}
HttpRequest::~HttpRequest() {
grpc_channel_args_destroy(channel_args_);
grpc_http_parser_destroy(&parser_);
if (own_endpoint_ && ep_ != nullptr) {
grpc_endpoint_destroy(ep_);
}
CSliceUnref(request_text_);
grpc_iomgr_unregister_object(&iomgr_obj_);
grpc_slice_buffer_destroy(&incoming_);
grpc_slice_buffer_destroy(&outgoing_);
grpc_pollset_set_destroy(pollset_set_);
}
void HttpRequest::Start() {
MutexLock lock(&mu_);
if (test_only_generate_response_.has_value()) {
test_only_generate_response_.value()();
return;
}
Ref().release(); // ref held by pending DNS resolution
dns_request_handle_ = resolver_->LookupHostname(
absl::bind_front(&HttpRequest::OnResolved, this), uri_.authority(),
uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_,
/*name_server=*/"");
}
void HttpRequest::Orphan() {
{
MutexLock lock(&mu_);
GPR_ASSERT(!cancelled_);
cancelled_ = true;
// cancel potentially pending DNS resolution.
if (dns_request_handle_.has_value() &&
resolver_->Cancel(dns_request_handle_.value())) {
Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
Unref();
}
if (handshake_mgr_ != nullptr) {
// Shutdown will cancel any ongoing tcp connect.
handshake_mgr_->Shutdown(
GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
}
if (own_endpoint_ && ep_ != nullptr) {
grpc_endpoint_shutdown(ep_, GRPC_ERROR_CREATE("HTTP request cancelled"));
}
}
Unref();
}
void HttpRequest::AppendError(grpc_error_handle error) {
if (overall_error_.ok()) {
overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
}
const grpc_resolved_address* addr = &addresses_[next_address_ - 1];
auto addr_text = grpc_sockaddr_to_uri(addr);
overall_error_ = grpc_error_add_child(
overall_error_,
grpc_error_set_str(
error, StatusStrProperty::kTargetAddress,
addr_text.ok() ? addr_text.value() : addr_text.status().ToString()));
}
void HttpRequest::OnReadInternal(grpc_error_handle error) {
for (size_t i = 0; i < incoming_.count; i++) {
if (GRPC_SLICE_LENGTH(incoming_.slices[i])) {
have_read_byte_ = 1;
grpc_error_handle err =
grpc_http_parser_parse(&parser_, incoming_.slices[i], nullptr);
if (!err.ok()) {
Finish(err);
return;
}
}
}
if (cancelled_) {
Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP1 request cancelled during read",
&overall_error_, 1));
} else if (error.ok()) {
DoRead();
} else if (!have_read_byte_) {
NextAddress(error);
} else {
Finish(grpc_http_parser_eof(&parser_));
}
}
void HttpRequest::ContinueDoneWriteAfterScheduleOnExecCtx(
void* arg, grpc_error_handle error) {
RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(arg));
MutexLock lock(&req->mu_);
if (error.ok() && !req->cancelled_) {
req->OnWritten();
} else {
req->NextAddress(error);
}
}
void HttpRequest::StartWrite() {
CSliceRef(request_text_);
grpc_slice_buffer_add(&outgoing_, request_text_);
Ref().release(); // ref held by pending write
grpc_endpoint_write(ep_, &outgoing_, &done_write_, nullptr,
/*max_frame_size=*/INT_MAX);
}
void HttpRequest::OnHandshakeDone(void* arg, grpc_error_handle error) {
auto* args = static_cast<HandshakerArgs*>(arg);
RefCountedPtr<HttpRequest> req(static_cast<HttpRequest*>(args->user_data));
if (g_test_only_on_handshake_done_intercept != nullptr) {
// Run this testing intercept before the lock so that it has a chance to
// do things like calling Orphan on the request
g_test_only_on_handshake_done_intercept(req.get());
}
MutexLock lock(&req->mu_);
req->own_endpoint_ = true;
if (!error.ok()) {
req->handshake_mgr_.reset();
req->NextAddress(error);
return;
}
// Handshake completed, so we own fields in args
grpc_slice_buffer_destroy(args->read_buffer);
gpr_free(args->read_buffer);
req->ep_ = args->endpoint;
req->handshake_mgr_.reset();
if (req->cancelled_) {
req->NextAddress(
GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
return;
}
req->StartWrite();
}
void HttpRequest::DoHandshake(const grpc_resolved_address* addr) {
// Create the security connector using the credentials and target name.
ChannelArgs args = ChannelArgs::FromC(channel_args_);
RefCountedPtr<grpc_channel_security_connector> sc =
channel_creds_->create_security_connector(
nullptr /*call_creds*/, uri_.authority().c_str(), &args);
if (sc == nullptr) {
Finish(GRPC_ERROR_CREATE_REFERENCING("failed to create security connector",
&overall_error_, 1));
return;
}
absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(addr);
if (!address.ok()) {
Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address",
&overall_error_, 1));
return;
}
args = args.SetObject(std::move(sc))
.Set(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS, address.value());
// Start the handshake
handshake_mgr_ = MakeRefCounted<HandshakeManager>();
CoreConfiguration::Get().handshaker_registry().AddHandshakers(
HANDSHAKER_CLIENT, args, pollset_set_, handshake_mgr_.get());
Ref().release(); // ref held by pending handshake
grpc_endpoint* ep = ep_;
ep_ = nullptr;
own_endpoint_ = false;
handshake_mgr_->DoHandshake(ep, args, deadline_,
/*acceptor=*/nullptr, OnHandshakeDone,
/*user_data=*/this);
}
void HttpRequest::NextAddress(grpc_error_handle error) {
if (!error.ok()) {
AppendError(error);
}
if (cancelled_) {
Finish(GRPC_ERROR_CREATE_REFERENCING("HTTP request was cancelled",
&overall_error_, 1));
return;
}
if (next_address_ == addresses_.size()) {
Finish(GRPC_ERROR_CREATE_REFERENCING("Failed HTTP requests to all targets",
&overall_error_, 1));
return;
}
const grpc_resolved_address* addr = &addresses_[next_address_++];
DoHandshake(addr);
}
void HttpRequest::OnResolved(
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) {
RefCountedPtr<HttpRequest> unreffer(this);
MutexLock lock(&mu_);
dns_request_handle_.reset();
if (cancelled_) {
Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
return;
}
if (!addresses_or.ok()) {
Finish(absl_status_to_grpc_error(addresses_or.status()));
return;
}
addresses_ = std::move(*addresses_or);
next_address_ = 0;
NextAddress(absl::OkStatus());
}
} // namespace grpc_core