blob: ae014edf6d9087dc8ce73fe92519ead05db5cd02 [file] [log] [blame]
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix_engine/ev_poll_posix.h"
#include <stdint.h>
#include <atomic>
#include <initializer_list>
#include <list>
#include <memory>
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_EV_POLL
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <grpc/support/alloc.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
// Sentinel values stored in a handle's read/write closure slot. A slot holds
// either one of these sentinels or a pointer to a real pending closure.
static const intptr_t kClosureNotReady = 0;
static const intptr_t kClosureReady = 1;
// revents masks used to decide whether a polled fd became readable/writable.
// POLLHUP and POLLERR are included in both so hangups/errors wake both paths.
static const int kPollinCheck = POLLIN | POLLHUP | POLLERR;
static const int kPolloutCheck = POLLOUT | POLLHUP | POLLERR;
namespace grpc_event_engine {
namespace experimental {
// Handles that became ready during one Work(...) iteration. Inlined storage
// avoids a heap allocation for the common small case.
using Events = absl::InlinedVector<PollEventHandle*, 5>;
// A poll(2)-based EventHandle implementation. Tracks a single fd plus the
// read/write closures registered against it. The handle is internally
// ref-counted: the constructor takes the initial ref, and Work(...),
// SetPendingActions and the notify paths take temporary refs; the object
// deletes itself when the last ref is dropped in Unref().
class PollEventHandle : public EventHandle {
 public:
  // Registers the new handle with `poller` (taking a poller ref) so that the
  // next Work(...) iteration starts polling `fd`.
  PollEventHandle(int fd, PollPoller* poller)
      : fd_(fd),
        pending_actions_(0),
        fork_fd_list_(this),
        poller_handles_list_(this),
        poller_(poller),
        scheduler_(poller->GetScheduler()),
        is_orphaned_(false),
        is_shutdown_(false),
        closed_(false),
        released_(false),
        pollhup_(false),
        watch_mask_(-1),
        shutdown_error_(absl::OkStatus()),
        exec_actions_closure_([this]() { ExecutePendingActions(); }),
        on_done_(nullptr),
        read_closure_(reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)),
        write_closure_(
            reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
    poller_->Ref();
    grpc_core::MutexLock lock(&poller_->mu_);
    poller_->PollerHandlesListAddHandle(this);
  }
  PollPoller* Poller() override { return poller_; }
  // Records which events fired during a poll iteration (read in bit 0, write
  // in bit 2 of pending_actions_). Returns true — after taking a ref that is
  // released in ExecutePendingActions — when there is pending work.
  bool SetPendingActions(bool pending_read, bool pending_write) {
    pending_actions_ |= pending_read;
    if (pending_write) {
      pending_actions_ |= (1 << 2);
    }
    if (pending_read || pending_write) {
      // The closure is going to be executed. We'll Unref this handle in
      // ExecutePendingActions.
      Ref();
      return true;
    }
    return false;
  }
  // Unconditionally removes this handle from its poller's handle list.
  void ForceRemoveHandleFromPoller() {
    grpc_core::MutexLock lock(&poller_->mu_);
    poller_->PollerHandlesListRemoveHandle(this);
  }
  int WrappedFd() override { return fd_; }
  bool IsOrphaned() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    return is_orphaned_;
  }
  // Closes the fd at most once, unless ownership was released to the caller
  // through OrphanHandle's release_fd out-param.
  void CloseFd() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    if (!released_ && !closed_) {
      closed_ = true;
      close(fd_);
    }
  }
  bool IsPollhup() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return pollhup_; }
  void SetPollhup(bool pollhup) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    pollhup_ = pollhup;
  }
  // A handle is "watched" while a Work(...) iteration is polling it;
  // watch_mask_ == -1 means not watched. The out-param receives the mask.
  bool IsWatched(int& watch_mask) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    watch_mask = watch_mask_;
    return watch_mask_ != -1;
  }
  bool IsWatched() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    return watch_mask_ != -1;
  }
  void SetWatched(int watch_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    watch_mask_ = watch_mask;
  }
  void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
                    absl::string_view reason) override;
  void ShutdownHandle(absl::Status why) override;
  void NotifyOnRead(PosixEngineClosure* on_read) override;
  void NotifyOnWrite(PosixEngineClosure* on_write) override;
  void NotifyOnError(PosixEngineClosure* on_error) override;
  void SetReadable() override;
  void SetWritable() override;
  void SetHasError() override;
  bool IsHandleShutdown() override {
    grpc_core::MutexLock lock(&mu_);
    return is_shutdown_;
  };
  // Fires the read/write closures recorded by SetPendingActions and releases
  // the ref taken there.
  inline void ExecutePendingActions() {
    int kick = 0;
    {
      grpc_core::MutexLock lock(&mu_);
      if ((pending_actions_ & 1UL)) {
        if (SetReadyLocked(&read_closure_)) {
          kick = 1;
        }
      }
      if (((pending_actions_ >> 2) & 1UL)) {
        if (SetReadyLocked(&write_closure_)) {
          kick = 1;
        }
      }
      pending_actions_ = 0;
    }
    if (kick) {
      // SetReadyLocked immediately scheduled some closure. It would have set
      // the closure state to NOT_READY. We need to wakeup the Work(...)
      // thread to start polling on this fd. If this call is not made, it is
      // possible that the poller will reach a state where all the fds under
      // the poller's control are not polled for POLLIN/POLLOUT events thus
      // leading to an indefinitely blocked Work(..) method.
      poller_->KickExternal(false);
    }
    Unref();
  }
  void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); }
  // Drops a ref; the final ref schedules the orphan-completion callback (if
  // any), releases the poller ref and deletes this handle.
  void Unref() {
    if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      if (on_done_ != nullptr) {
        scheduler_->Run(on_done_);
      }
      poller_->Unref();
      delete this;
    }
  }
  ~PollEventHandle() override = default;
  grpc_core::Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; }
  PollPoller::HandlesList& ForkFdListPos() { return fork_fd_list_; }
  PollPoller::HandlesList& PollerHandlesListPos() {
    return poller_handles_list_;
  }
  uint32_t BeginPollLocked(uint32_t read_mask, uint32_t write_mask)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
  bool EndPollLocked(bool got_read, bool got_write)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

 private:
  int SetReadyLocked(PosixEngineClosure** st);
  int NotifyOnLocked(PosixEngineClosure** st, PosixEngineClosure* closure);
  // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
  // required.
  grpc_core::Mutex mu_;
  std::atomic<int> ref_count_{1};
  int fd_;
  // Bitmask of events observed while polling: bit 0 = read, bit 2 = write.
  int pending_actions_;
  PollPoller::HandlesList fork_fd_list_;
  PollPoller::HandlesList poller_handles_list_;
  PollPoller* poller_;
  Scheduler* scheduler_;
  bool is_orphaned_;
  bool is_shutdown_;
  bool closed_;
  // True when fd ownership was handed back to the caller via OrphanHandle's
  // release_fd; CloseFd is then a no-op.
  bool released_;
  bool pollhup_;
  // -1 when not being polled; otherwise the poll(2) event mask in use.
  int watch_mask_;
  absl::Status shutdown_error_;
  // Closure wrapping ExecutePendingActions for scheduler-based execution.
  AnyInvocableClosure exec_actions_closure_;
  // Closure to run once the handle has been orphaned and fully unreffed.
  PosixEngineClosure* on_done_;
  // Each holds kClosureNotReady, kClosureReady, or a pending user closure.
  PosixEngineClosure* read_closure_;
  PosixEngineClosure* write_closure_;
};
namespace {
// Only used when GRPC_ENABLE_FORK_SUPPORT=1: all live pollers, so the
// post-fork handler can delete them in the child.
std::list<PollPoller*> fork_poller_list;
// Only used when GRPC_ENABLE_FORK_SUPPORT=1: intrusive list of all live
// handles, so the post-fork handler can close their fds in the child.
PollEventHandle* fork_fd_list_head = nullptr;
// Guards both fork-tracking structures above.
gpr_mu fork_fd_list_mu;
// Prepends `handle` to the global fork-tracking list. No-op unless fork
// support is enabled.
void ForkFdListAddHandle(PollEventHandle* handle) {
  if (!grpc_core::Fork::Enabled()) return;
  gpr_mu_lock(&fork_fd_list_mu);
  PollEventHandle* old_head = fork_fd_list_head;
  handle->ForkFdListPos().prev = nullptr;
  handle->ForkFdListPos().next = old_head;
  if (old_head != nullptr) {
    old_head->ForkFdListPos().prev = handle;
  }
  fork_fd_list_head = handle;
  gpr_mu_unlock(&fork_fd_list_mu);
}
// Unlinks `handle` from the global fork-tracking list. No-op unless fork
// support is enabled.
void ForkFdListRemoveHandle(PollEventHandle* handle) {
  if (!grpc_core::Fork::Enabled()) return;
  gpr_mu_lock(&fork_fd_list_mu);
  PollEventHandle* prev = handle->ForkFdListPos().prev;
  PollEventHandle* next = handle->ForkFdListPos().next;
  if (fork_fd_list_head == handle) {
    fork_fd_list_head = next;
  }
  if (prev != nullptr) {
    prev->ForkFdListPos().next = next;
  }
  if (next != nullptr) {
    next->ForkFdListPos().prev = prev;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
}
// Registers `poller` for post-fork cleanup. No-op unless fork support is
// enabled.
void ForkPollerListAddPoller(PollPoller* poller) {
  if (!grpc_core::Fork::Enabled()) return;
  gpr_mu_lock(&fork_fd_list_mu);
  fork_poller_list.push_back(poller);
  gpr_mu_unlock(&fork_fd_list_mu);
}
// Deregisters `poller` from post-fork cleanup. No-op unless fork support is
// enabled.
void ForkPollerListRemovePoller(PollPoller* poller) {
  if (!grpc_core::Fork::Enabled()) return;
  gpr_mu_lock(&fork_fd_list_mu);
  fork_poller_list.remove(poller);
  gpr_mu_unlock(&fork_fd_list_mu);
}
// Returns the number of milliseconds elapsed between now and the `start`
// timestamp, clamped to [0, INT_MAX]. Returns -1 when `start` is the
// infinite-future sentinel (meaning: poll without a deadline).
int PollElapsedTimeToMillis(grpc_core::Timestamp start) {
  if (start == grpc_core::Timestamp::InfFuture()) return -1;
  grpc_core::Timestamp now =
      grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC));
  const int64_t delta = (now - start).millis();
  if (delta < 0) return 0;
  if (delta > INT_MAX) return INT_MAX;
  return static_cast<int>(delta);
}
bool InitPollPollerPosix();
// Called by the child process's post-fork handler to close open fds. This
// allows gRPC to shutdown in the child process without interfering with
// connections or RPCs ongoing in the parent.
void ResetEventManagerOnFork() {
  // Close and delete all pending PollEventHandles.
  gpr_mu_lock(&fork_fd_list_mu);
  while (fork_fd_list_head != nullptr) {
    close(fork_fd_list_head->WrappedFd());
    PollEventHandle* next = fork_fd_list_head->ForkFdListPos().next;
    fork_fd_list_head->ForceRemoveHandleFromPoller();
    delete fork_fd_list_head;
    fork_fd_list_head = next;
  }
  // Delete all registered pollers.
  while (!fork_poller_list.empty()) {
    PollPoller* poller = fork_poller_list.front();
    fork_poller_list.pop_front();
    delete poller;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_destroy(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
  }
  // Re-initialize the fork-handling state for the child process.
  InitPollPollerPosix();
}
// Verifies that a wakeup-fd mechanism is available (required to interrupt a
// blocked poll()) and, when fork support is enabled, installs the post-fork
// reset handler. Returns false if this poller cannot be used on this system.
bool InitPollPollerPosix() {
  if (!grpc_event_engine::experimental::SupportsWakeupFd()) {
    return false;
  }
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_init(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(ResetEventManagerOnFork);
  }
  return true;
}
} // namespace
// Creates and registers a new handle for `fd`. Error tracking (`track_err`)
// is not supported by the poll(2) engine and must be false.
EventHandle* PollPoller::CreateHandle(int fd, absl::string_view /*name*/,
                                      bool track_err) {
  // Avoid unused-parameter warning for debug-only parameter
  (void)track_err;
  GPR_DEBUG_ASSERT(track_err == false);
  PollEventHandle* handle = new PollEventHandle(fd, this);
  ForkFdListAddHandle(handle);
  // We need to send a kick to the thread executing Work(..) so that it can
  // add this new Fd into the list of Fds to poll.
  KickExternal(false);
  return handle;
}
// Orphans the handle: removes it from all tracking lists, performs shutdown
// if not already done, and either releases the fd to the caller (non-null
// `release_fd`) or arranges for it to be closed. `on_done` is run by Unref
// once the last reference is dropped.
void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
                                   absl::string_view /*reason*/) {
  ForkFdListRemoveHandle(this);
  ForceRemoveHandleFromPoller();
  {
    grpc_core::ReleasableMutexLock lock(&mu_);
    on_done_ = on_done;
    released_ = release_fd != nullptr;
    if (release_fd != nullptr) {
      *release_fd = fd_;
    }
    GPR_ASSERT(!is_orphaned_);
    is_orphaned_ = true;
    // Perform shutdown operations if not already done so.
    if (!is_shutdown_) {
      is_shutdown_ = true;
      shutdown_error_ =
          absl::Status(absl::StatusCode::kInternal, "FD Orphaned");
      grpc_core::StatusSetInt(&shutdown_error_,
                              grpc_core::StatusIntProperty::kRpcStatus,
                              GRPC_STATUS_UNAVAILABLE);
      SetReadyLocked(&read_closure_);
      SetReadyLocked(&write_closure_);
    }
    // signal read/write closed to OS so that future operations fail.
    if (!released_) {
      shutdown(fd_, SHUT_RDWR);
    }
    if (!IsWatched()) {
      CloseFd();
    } else {
      // It is watched i.e we cannot take action without breaking from the
      // blocking poll. Mark it as Unwatched and kick the thread executing
      // Work(...). That thread should proceed with the cleanup.
      SetWatched(-1);
      lock.Release();
      poller_->KickExternal(false);
    }
  }
  // Drop the initial ref taken at construction.
  Unref();
}
// Registers `closure` in slot `*st` (read or write closure slot). Returns 1
// when a closure was scheduled to run immediately (the caller must then kick
// the poller), 0 otherwise. Crashes if a callback is already pending.
int PollEventHandle::NotifyOnLocked(PosixEngineClosure** st,
                                    PosixEngineClosure* closure) {
  if (is_shutdown_ || pollhup_) {
    // Handle already shut down or hung up: fail the closure right away with
    // the recorded shutdown error.
    closure->SetStatus(shutdown_error_);
    scheduler_->Run(closure);
  } else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
    // not ready ==> switch to a waiting state by setting the closure
    *st = closure;
    return 0;
  } else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
    // already ready ==> queue the closure to run immediately
    *st = reinterpret_cast<PosixEngineClosure*>(kClosureNotReady);
    closure->SetStatus(shutdown_error_);
    scheduler_->Run(closure);
    return 1;
  } else {
    // upcallptr was set to a different closure. This is an error!
    grpc_core::Crash(
        "User called a notify_on function with a previous callback still "
        "pending");
  }
  return 0;
}
// Marks slot `*st` ready. Returns 1 if state becomes not ready, i.e. a
// waiting closure was scheduled and its slot reset; 0 otherwise.
int PollEventHandle::SetReadyLocked(PosixEngineClosure** st) {
  if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
    // duplicate ready ==> ignore
    return 0;
  } else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
    // not ready, and not waiting ==> flag ready
    *st = reinterpret_cast<PosixEngineClosure*>(kClosureReady);
    return 0;
  } else {
    // waiting ==> queue closure
    PosixEngineClosure* closure = *st;
    *st = reinterpret_cast<PosixEngineClosure*>(kClosureNotReady);
    closure->SetStatus(shutdown_error_);
    scheduler_->Run(closure);
    return 1;
  }
}
// Shuts the handle down (at most once), recording `why` as the shutdown
// error and firing any waiting read/write closures with it.
void PollEventHandle::ShutdownHandle(absl::Status why) {
  // We need to take a Ref here because SetReadyLocked may trigger execution
  // of a closure which calls OrphanHandle or poller->Shutdown() prematurely.
  Ref();
  {
    grpc_core::MutexLock lock(&mu_);
    // only shutdown once
    if (!is_shutdown_) {
      is_shutdown_ = true;
      shutdown_error_ = why;
      grpc_core::StatusSetInt(&shutdown_error_,
                              grpc_core::StatusIntProperty::kRpcStatus,
                              GRPC_STATUS_UNAVAILABLE);
      SetReadyLocked(&read_closure_);
      SetReadyLocked(&write_closure_);
    }
  }
  // For the Ref() taken at the beginning of this function.
  Unref();
}
// Registers `on_read` to run when the fd becomes readable (or immediately if
// it already is, or if the handle is shut down).
void PollEventHandle::NotifyOnRead(PosixEngineClosure* on_read) {
  // We need to take a Ref here because NotifyOnLocked may trigger execution
  // of a closure which calls OrphanHandle that may delete this object or call
  // poller->Shutdown() prematurely.
  Ref();
  {
    grpc_core::ReleasableMutexLock lock(&mu_);
    if (NotifyOnLocked(&read_closure_, on_read)) {
      lock.Release();
      // NotifyOnLocked immediately scheduled some closure. It would have set
      // the closure state to NOT_READY. We need to wakeup the Work(...) thread
      // to start polling on this fd. If this call is not made, it is possible
      // that the poller will reach a state where all the fds under the
      // poller's control are not polled for POLLIN/POLLOUT events thus leading
      // to an indefinitely blocked Work(..) method.
      poller_->KickExternal(false);
    }
  }
  // For the Ref() taken at the beginning of this function.
  Unref();
}
// Registers `on_write` to run when the fd becomes writable (or immediately
// if it already is, or if the handle is shut down).
void PollEventHandle::NotifyOnWrite(PosixEngineClosure* on_write) {
  // We need to take a Ref here because NotifyOnLocked may trigger execution
  // of a closure which calls OrphanHandle that may delete this object or call
  // poller->Shutdown() prematurely.
  Ref();
  {
    grpc_core::ReleasableMutexLock lock(&mu_);
    if (NotifyOnLocked(&write_closure_, on_write)) {
      lock.Release();
      // NotifyOnLocked immediately scheduled some closure. It would have set
      // the closure state to NOT_READY. We need to wakeup the Work(...) thread
      // to start polling on this fd. If this call is not made, it is possible
      // that the poller will reach a state where all the fds under the
      // poller's control are not polled for POLLIN/POLLOUT events thus leading
      // to an indefinitely blocked Work(..) method.
      poller_->KickExternal(false);
    }
  }
  // For the Ref() taken at the beginning of this function.
  Unref();
}
// Error tracking is not supported by the poll(2) engine; immediately fail
// the closure with a CANCELLED status.
void PollEventHandle::NotifyOnError(PosixEngineClosure* on_error) {
  on_error->SetStatus(absl::CancelledError(
      "Polling engine does not support tracking errors"));
  scheduler_->Run(on_error);
}
// Forces the read path ready, firing a waiting read closure if present. The
// Ref guards against a scheduled closure orphaning/deleting the handle while
// we still hold its mutex.
void PollEventHandle::SetReadable() {
  Ref();
  {
    grpc_core::MutexLock lock(&mu_);
    SetReadyLocked(&read_closure_);
  }
  Unref();
}
// Forces the write path ready, firing a waiting write closure if present.
// Ref/Unref mirror SetReadable's lifetime guard.
void PollEventHandle::SetWritable() {
  Ref();
  {
    grpc_core::MutexLock lock(&mu_);
    SetReadyLocked(&write_closure_);
  }
  Unref();
}
// No-op: the poll(2) engine does not track error events (see NotifyOnError).
void PollEventHandle::SetHasError() {}
// Computes the poll(2) event mask for this fd and marks the handle watched
// with it. Takes a ref that the Work(...) loop releases after the poll
// iteration. Returns 0 (watched, but nothing to poll for) when the handle is
// shut down or the relevant closures are already flagged ready.
uint32_t PollEventHandle::BeginPollLocked(uint32_t read_mask,
                                          uint32_t write_mask) {
  uint32_t mask = 0;
  // Events already recorded as pending from a previous iteration.
  bool read_ready = (pending_actions_ & 1UL);
  bool write_ready = ((pending_actions_ >> 2) & 1UL);
  Ref();
  // If we are shutdown, then no need to poll this fd. Set watch_mask to 0.
  if (is_shutdown_) {
    SetWatched(0);
    return 0;
  }
  // If there is nobody polling for read, but we need to, then start doing so.
  if (read_mask && !read_ready &&
      read_closure_ != reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
    mask |= read_mask;
  }
  // If there is nobody polling for write, but we need to, then start doing so
  if (write_mask && !write_ready &&
      write_closure_ != reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
    mask |= write_mask;
  }
  SetWatched(mask);
  return mask;
}
// Finishes a poll iteration for this handle. For a live handle, records the
// observed events and reports whether pending actions must be executed; for
// an orphaned handle that is no longer watched, closes the fd instead.
bool PollEventHandle::EndPollLocked(bool got_read, bool got_write) {
  if (!is_orphaned_) {
    return SetPendingActions(got_read, got_write);
  }
  if (!IsWatched()) {
    CloseFd();
  }
  return false;
}
// Wakes up a Work(...) call blocked in poll(2) via the wakeup fd. `ext`
// records whether the kick came from outside the poller, which makes Work
// return kKicked.
void PollPoller::KickExternal(bool ext) {
  grpc_core::MutexLock lock(&mu_);
  if (was_kicked_) {
    // A kick is already pending; just upgrade it to external if requested.
    if (ext) {
      was_kicked_ext_ = true;
    }
    return;
  }
  was_kicked_ = true;
  was_kicked_ext_ = ext;
  GPR_ASSERT(wakeup_fd_->Wakeup().ok());
}
// External kick: interrupts a blocked Work(...) call and makes it return.
void PollPoller::Kick() { KickExternal(true); }
// Pushes `handle` onto the front of the poller's intrusive handle list and
// bumps the handle count. Caller holds the poller mutex.
void PollPoller::PollerHandlesListAddHandle(PollEventHandle* handle) {
  PollEventHandle* old_head = poll_handles_list_head_;
  handle->PollerHandlesListPos().prev = nullptr;
  handle->PollerHandlesListPos().next = old_head;
  if (old_head != nullptr) {
    old_head->PollerHandlesListPos().prev = handle;
  }
  poll_handles_list_head_ = handle;
  ++num_poll_handles_;
}
// Unlinks `handle` from the poller's intrusive handle list and decrements
// the handle count. Caller holds the poller mutex.
void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* handle) {
  PollEventHandle* prev = handle->PollerHandlesListPos().prev;
  PollEventHandle* next = handle->PollerHandlesListPos().next;
  if (poll_handles_list_head_ == handle) {
    poll_handles_list_head_ = next;
  }
  if (prev != nullptr) {
    prev->PollerHandlesListPos().next = next;
  }
  if (next != nullptr) {
    next->PollerHandlesListPos().prev = prev;
  }
  --num_poll_handles_;
}
// Convenience constructor: a real (non-phony) poller. Delegates to the
// two-argument constructor to avoid duplicating the member initialization,
// wakeup-fd creation and fork-list registration verbatim.
PollPoller::PollPoller(Scheduler* scheduler)
    : PollPoller(scheduler, /*use_phony_poll=*/false) {}
// Constructs a poller. When `use_phony_poll` is true, Work(...) crashes on
// any blocking poll once real handles are registered (see Work for details).
PollPoller::PollPoller(Scheduler* scheduler, bool use_phony_poll)
    : scheduler_(scheduler),
      use_phony_poll_(use_phony_poll),
      was_kicked_(false),
      was_kicked_ext_(false),
      num_poll_handles_(0),
      poll_handles_list_head_(nullptr) {
  // The wakeup fd lets Kick/KickExternal interrupt a blocked poll(2) call.
  wakeup_fd_ = *CreateWakeupFd();
  GPR_ASSERT(wakeup_fd_ != nullptr);
  ForkPollerListAddPoller(this);
}
PollPoller::~PollPoller() {
  // Assert that no active handles are present at the time of destruction.
  // They should have been orphaned before reaching this state.
  GPR_ASSERT(num_poll_handles_ == 0);
  GPR_ASSERT(poll_handles_list_head_ == nullptr);
}
// Polls all registered fds (plus the wakeup fd) until events arrive, the
// timeout expires, or an external kick is received. Ready handles have their
// pending closures executed inline after `schedule_poll_again` runs.
//
// Fixes vs. previous revision: removed a duplicated
// `pfds = static_cast<struct pollfd*>(buf);` assignment in the heap-allocation
// branch and a redundant `clear()` on the freshly constructed event list.
Poller::WorkResult PollPoller::Work(
    EventEngine::Duration timeout,
    absl::FunctionRef<void()> schedule_poll_again) {
  // Avoid malloc for small number of elements.
  enum { inline_elements = 96 };
  struct pollfd pollfd_space[inline_elements];
  bool was_kicked_ext = false;
  PollEventHandle* watcher_space[inline_elements];
  Events pending_events;
  int timeout_ms =
      static_cast<int>(grpc_event_engine::experimental::Milliseconds(timeout));
  mu_.Lock();
  // Start polling, and keep doing so while we're being asked to
  // re-evaluate our pollers (this allows poll() based pollers to
  // ensure they don't miss wakeups).
  while (pending_events.empty() && timeout_ms >= 0) {
    int r = 0;
    size_t i;
    nfds_t pfd_count;
    struct pollfd* pfds;
    PollEventHandle** watchers;
    // Estimate start time for a poll iteration.
    grpc_core::Timestamp start = grpc_core::Timestamp::FromTimespecRoundDown(
        gpr_now(GPR_CLOCK_MONOTONIC));
    // +2 reserves a slot for the wakeup fd in addition to every handle.
    if (num_poll_handles_ + 2 <= inline_elements) {
      pfds = pollfd_space;
      watchers = watcher_space;
    } else {
      // One heap buffer holds both arrays; `watchers` starts right after the
      // pollfd array. A single gpr_free below releases both.
      const size_t pfd_size = sizeof(*pfds) * (num_poll_handles_ + 2);
      const size_t watch_size = sizeof(*watchers) * (num_poll_handles_ + 2);
      void* buf = gpr_malloc(pfd_size + watch_size);
      pfds = static_cast<struct pollfd*>(buf);
      watchers = static_cast<PollEventHandle**>(
          static_cast<void*>((static_cast<char*>(buf) + pfd_size)));
    }
    // Slot 0 is always the wakeup fd so kicks can interrupt poll().
    pfd_count = 1;
    pfds[0].fd = wakeup_fd_->ReadFd();
    pfds[0].events = POLLIN;
    pfds[0].revents = 0;
    PollEventHandle* head = poll_handles_list_head_;
    while (head != nullptr) {
      {
        grpc_core::MutexLock lock(head->mu());
        // There shouldn't be any orphaned fds at this point. This is because
        // prior to marking a handle as orphaned it is first removed from
        // poll handle list for the poller under the poller lock.
        GPR_ASSERT(!head->IsOrphaned());
        if (!head->IsPollhup()) {
          pfds[pfd_count].fd = head->WrappedFd();
          watchers[pfd_count] = head;
          // BeginPollLocked takes a ref of the handle. It also marks the
          // fd as Watched with an appropriate watch_mask. The watch_mask
          // is 0 if the fd is shutdown or if the fd is already ready (i.e
          // both read and write events are already available) and doesn't
          // need to be polled again. The watch_mask is > 0 otherwise
          // indicating the fd needs to be polled.
          pfds[pfd_count].events = head->BeginPollLocked(POLLIN, POLLOUT);
          pfd_count++;
        }
      }
      head = head->PollerHandlesListPos().next;
    }
    mu_.Unlock();
    if (!use_phony_poll_ || timeout_ms == 0 || pfd_count == 1) {
      // If use_phony_poll is true and pfd_count == 1, it implies only the
      // wakeup_fd is present. Allow the call to get blocked in this case as
      // well instead of crashing. This is because the poller::Work is called
      // right after an event engine is constructed. Even if phony poll is
      // expected to be used, we don't want to check for it until some actual
      // event handles are registered. Otherwise the EventEngine construction
      // may crash.
      r = poll(pfds, pfd_count, timeout_ms);
    } else {
      grpc_core::Crash("Attempted a blocking poll when declared non-polling.");
    }
    if (r <= 0) {
      // r == 0 (timeout) or r < 0 (error); EINTR is benign, all other errors
      // are fatal.
      if (r < 0 && errno != EINTR) {
        // Abort fail here.
        grpc_core::Crash(absl::StrFormat(
            "(event_engine) PollPoller:%p encountered poll error: %s", this,
            grpc_core::StrError(errno).c_str()));
      }
      for (i = 1; i < pfd_count; i++) {
        PollEventHandle* head = watchers[i];
        int watch_mask;
        grpc_core::ReleasableMutexLock lock(head->mu());
        if (head->IsWatched(watch_mask)) {
          head->SetWatched(-1);
          // This fd was Watched with a watch mask > 0.
          if (watch_mask > 0 && r < 0) {
            // This case implies the fd was polled (since watch_mask > 0 and
            // the poll returned an error. Mark the fds as both readable and
            // writable.
            if (head->EndPollLocked(true, true)) {
              // Its safe to add to list of pending events because
              // EndPollLocked returns true only when the handle is
              // not orphaned. But an orphan might be initiated on the handle
              // after this Work() method returns and before the next Work()
              // method is invoked.
              pending_events.push_back(head);
            }
          } else {
            // In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask ==
            // 0 and r < 0 or (3) watch_mask == 0 and r == 0. For case-1, no
            // events are pending on the fd even though the fd was polled. For
            // case-2 and 3, the fd was not polled
            head->EndPollLocked(false, false);
          }
        } else {
          // It can enter this case if an orphan was invoked on the handle
          // while it was being polled.
          head->EndPollLocked(false, false);
        }
        lock.Release();
        // Unref the ref taken at BeginPollLocked.
        head->Unref();
      }
    } else {
      // At least one fd has events. Drain the wakeup fd first if it fired.
      if (pfds[0].revents & kPollinCheck) {
        GPR_ASSERT(wakeup_fd_->ConsumeWakeup().ok());
      }
      for (i = 1; i < pfd_count; i++) {
        PollEventHandle* head = watchers[i];
        int watch_mask;
        grpc_core::ReleasableMutexLock lock(head->mu());
        if (!head->IsWatched(watch_mask) || watch_mask == 0) {
          // IsWatched will be false if an orphan was invoked on the
          // handle while it was being polled. If watch_mask is 0, then the fd
          // was not polled.
          head->SetWatched(-1);
          head->EndPollLocked(false, false);
        } else {
          // Watched is true and watch_mask > 0
          if (pfds[i].revents & POLLHUP) {
            head->SetPollhup(true);
          }
          head->SetWatched(-1);
          if (head->EndPollLocked(pfds[i].revents & kPollinCheck,
                                  pfds[i].revents & kPolloutCheck)) {
            // Its safe to add to list of pending events because EndPollLocked
            // returns true only when the handle is not orphaned.
            // But an orphan might be initiated on the handle after this
            // Work() method returns and before the next Work() method is
            // invoked.
            pending_events.push_back(head);
          }
        }
        lock.Release();
        // Unref the ref taken at BeginPollLocked.
        head->Unref();
      }
    }
    if (pfds != pollfd_space) {
      // pfds and watchers share one allocation; freeing pfds frees both.
      gpr_free(pfds);
    }
    // End of poll iteration. Update how much time is remaining.
    timeout_ms -= PollElapsedTimeToMillis(start);
    mu_.Lock();
    if (std::exchange(was_kicked_, false) &&
        std::exchange(was_kicked_ext_, false)) {
      // External kick. Need to break out.
      was_kicked_ext = true;
      break;
    }
  }
  mu_.Unlock();
  if (pending_events.empty()) {
    if (was_kicked_ext) {
      return Poller::WorkResult::kKicked;
    }
    return Poller::WorkResult::kDeadlineExceeded;
  }
  // Run the provided callback synchronously.
  schedule_poll_again();
  // Process all pending events inline.
  for (auto& it : pending_events) {
    it->ExecutePendingActions();
  }
  return was_kicked_ext ? Poller::WorkResult::kKicked : Poller::WorkResult::kOk;
}
// Deregisters the poller from fork tracking and releases its reference;
// destruction follows once outstanding references are dropped.
void PollPoller::Shutdown() {
  ForkPollerListRemovePoller(this);
  Unref();
}
// Factory for the poll(2)-based poller. Returns nullptr when the platform
// lacks the required wakeup-fd support; the probe runs once and is cached in
// a function-local static.
PollPoller* MakePollPoller(Scheduler* scheduler, bool use_phony_poll) {
  static const bool kSupported = InitPollPollerPosix();
  return kSupported ? new PollPoller(scheduler, use_phony_poll) : nullptr;
}
} // namespace experimental
} // namespace grpc_event_engine
#else // GRPC_POSIX_SOCKET_EV_POLL
#include "src/core/lib/gprpp/crash.h"
namespace grpc_event_engine {
namespace experimental {
// Stub implementations compiled when GRPC_POSIX_SOCKET_EV_POLL is not
// defined. Every entry point other than MakePollPoller crashes if invoked.
PollPoller::PollPoller(Scheduler* /* engine */) {
  grpc_core::Crash("unimplemented");
}
void PollPoller::Shutdown() { grpc_core::Crash("unimplemented"); }
PollPoller::~PollPoller() { grpc_core::Crash("unimplemented"); }
EventHandle* PollPoller::CreateHandle(int /*fd*/, absl::string_view /*name*/,
                                      bool /*track_err*/) {
  grpc_core::Crash("unimplemented");
}
Poller::WorkResult PollPoller::Work(
    EventEngine::Duration /*timeout*/,
    absl::FunctionRef<void()> /*schedule_poll_again*/) {
  grpc_core::Crash("unimplemented");
}
void PollPoller::Kick() { grpc_core::Crash("unimplemented"); }
// If GRPC_POSIX_SOCKET_EV_POLL is not defined, poll(2) support is not
// available. Return nullptr.
PollPoller* MakePollPoller(Scheduler* /*scheduler*/,
                           bool /* use_phony_poll */) {
  return nullptr;
}
void PollPoller::KickExternal(bool /*ext*/) {
  grpc_core::Crash("unimplemented");
}
void PollPoller::PollerHandlesListAddHandle(PollEventHandle* /*handle*/) {
  grpc_core::Crash("unimplemented");
}
void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* /*handle*/) {
  grpc_core::Crash("unimplemented");
}
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_POSIX_SOCKET_EV_POLL