blob: 1cfe1d3e346ecdef45435c8a20afa3241f1c9bea [file] [log] [blame]
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H
#define GRPC_EVENT_ENGINE_EVENT_ENGINE_H
#include <grpc/support/port_platform.h>

#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"

#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/port.h>
#include <grpc/event_engine/slice_buffer.h>
// TODO(vigneshbabu): Define the Endpoint::Write metrics collection system
namespace grpc_event_engine {
namespace experimental {
////////////////////////////////////////////////////////////////////////////////
/// The EventEngine Interface
///
/// Overview
/// --------
///
/// The EventEngine encapsulates all platform-specific behaviors related to low
/// level network I/O, timers, asynchronous execution, and DNS resolution.
///
/// This interface allows developers to provide their own event management and
/// network stacks. Motivating use cases for supporting custom EventEngines
/// include the ability to hook into external event loops, and using different
/// EventEngine instances for each channel to better insulate network I/O and
/// callback processing from other channels.
///
/// A default cross-platform EventEngine instance is provided by gRPC.
///
/// Lifespan and Ownership
/// ----------------------
///
/// gRPC takes shared ownership of EventEngines via std::shared_ptrs to ensure
/// that the engines remain available until they are no longer needed. Depending
/// on the use case, engines may live until gRPC is shut down.
///
/// EXAMPLE USAGE (Not yet implemented)
///
/// Custom EventEngines can be specified per channel, and allow configuration
/// for both clients and servers. To set a custom EventEngine for a client
/// channel, you can do something like the following:
///
/// ChannelArguments args;
/// std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
/// args.SetEventEngine(engine);
/// MyAppClient client(grpc::CreateCustomChannel(
/// "localhost:50051", grpc::InsecureChannelCredentials(), args));
///
/// A gRPC server can use a custom EventEngine by calling the
/// ServerBuilder::SetEventEngine method:
///
/// ServerBuilder builder;
/// std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
/// builder.SetEventEngine(engine);
/// std::unique_ptr<Server> server(builder.BuildAndStart());
/// server->Wait();
///
///
/// Blocking EventEngine Callbacks
/// -----------------------------
///
/// Doing blocking work in EventEngine callbacks is generally not advisable.
/// While gRPC's default EventEngine implementations have some capacity to scale
/// their thread pools to avoid starvation, this is not an instantaneous
/// process. Further, user-provided EventEngines may not be optimized to handle
/// excessive blocking work at all.
///
/// *Best Practice* : Occasional blocking work may be fine, but we do not
/// recommend running a mostly blocking workload in EventEngine threads.
///
////////////////////////////////////////////////////////////////////////////////
class EventEngine : public std::enable_shared_from_this<EventEngine> {
 public:
  /// A duration between two events.
  ///
  /// Throughout the EventEngine API durations are used to express how long
  /// until an action should be performed.
  using Duration = std::chrono::duration<int64_t, std::nano>;
  /// A custom closure type for EventEngine task execution.
  ///
  /// Throughout the EventEngine API, \a Closure ownership is retained by the
  /// caller - the EventEngine will never delete a Closure, and upon
  /// cancellation, the EventEngine will simply forget the Closure exists. The
  /// caller is responsible for all necessary cleanup.
  class Closure {
   public:
    Closure() = default;
    // Closures are an interface, and thus non-copyable.
    Closure(const Closure&) = delete;
    Closure& operator=(const Closure&) = delete;
    // Polymorphic type => virtual destructor
    virtual ~Closure() = default;
    // Run the contained code.
    virtual void Run() = 0;
  };
  /// Represents a scheduled task.
  ///
  /// \a TaskHandles are returned by \a Run* methods, and can be given to the
  /// \a Cancel method.
  struct TaskHandle {
    intptr_t keys[2];
    static const TaskHandle kInvalid;
    friend bool operator==(const TaskHandle& lhs, const TaskHandle& rhs);
    friend bool operator!=(const TaskHandle& lhs, const TaskHandle& rhs);
  };
  /// A handle to a cancellable connection attempt.
  ///
  /// Returned by \a Connect, and can be passed to \a CancelConnect.
  struct ConnectionHandle {
    intptr_t keys[2];
    static const ConnectionHandle kInvalid;
    friend bool operator==(const ConnectionHandle& lhs,
                           const ConnectionHandle& rhs);
    friend bool operator!=(const ConnectionHandle& lhs,
                           const ConnectionHandle& rhs);
  };
  /// Thin wrapper around a platform-specific sockaddr type. A sockaddr struct
  /// exists on all platforms that gRPC supports.
  ///
  /// Platforms are expected to provide definitions for:
  /// * sockaddr
  /// * sockaddr_in
  /// * sockaddr_in6
  class ResolvedAddress {
   public:
    static constexpr socklen_t MAX_SIZE_BYTES = 128;
    ResolvedAddress(const sockaddr* address, socklen_t size);
    ResolvedAddress() = default;
    ResolvedAddress(const ResolvedAddress&) = default;
    const struct sockaddr* address() const;
    socklen_t size() const;

   private:
    char address_[MAX_SIZE_BYTES] = {};
    socklen_t size_ = 0;
  };
  /// One end of a connection between a gRPC client and server. Endpoints are
  /// created when connections are established, and Endpoint operations are
  /// gRPC's primary means of communication.
  ///
  /// Endpoints must use the provided MemoryAllocator for all data buffer memory
  /// allocations. gRPC allows applications to set memory constraints per
  /// Channel or Server, and the implementation depends on all dynamic memory
  /// allocation being handled by the quota system.
  class Endpoint {
   public:
    /// Shuts down all connections and invokes all pending read or write
    /// callbacks with an error status.
    virtual ~Endpoint() = default;
    /// A struct representing optional arguments that may be provided to an
    /// EventEngine Endpoint Read API call.
    ///
    /// Passed as argument to an Endpoint \a Read
    struct ReadArgs {
      // A suggestion to the endpoint implementation to read at-least the
      // specified number of bytes over the network connection before marking
      // the endpoint read operation as complete. gRPC may use this argument
      // to minimize the number of endpoint read API calls over the lifetime
      // of a connection.
      //
      // Default-initialized so a value-constructed ReadArgs never hands an
      // indeterminate value to the endpoint; a hint of 1 byte means "complete
      // the read as soon as any data is available".
      int64_t read_hint_bytes = 1;
    };
    /// Reads data from the Endpoint.
    ///
    /// When data is available on the connection, that data is moved into the
    /// \a buffer. If the read succeeds immediately, it returns true and the \a
    /// on_read callback is not executed. Otherwise it returns false and the \a
    /// on_read callback executes asynchronously when the read completes. The
    /// caller must ensure that the callback has access to the buffer when it
    /// executes. Ownership of the buffer is not transferred. Valid slices *may*
    /// be placed into the buffer even if the callback is invoked with a non-OK
    /// Status.
    ///
    /// There can be at most one outstanding read per Endpoint at any given
    /// time. An outstanding read is one in which the \a on_read callback has
    /// not yet been executed for some previous call to \a Read. If an attempt
    /// is made to call \a Read while a previous read is still outstanding, the
    /// \a EventEngine must abort.
    ///
    /// For failed read operations, implementations should pass the appropriate
    /// statuses to \a on_read. For example, callbacks might expect to receive
    /// CANCELLED on endpoint shutdown.
    virtual bool Read(absl::AnyInvocable<void(absl::Status)> on_read,
                      SliceBuffer* buffer, const ReadArgs* args) = 0;
    /// A struct representing optional arguments that may be provided to an
    /// EventEngine Endpoint Write API call.
    ///
    /// Passed as argument to an Endpoint \a Write
    struct WriteArgs {
      // Represents private information that may be passed by gRPC for
      // select endpoints expected to be used only within google.
      void* google_specific = nullptr;
      // A suggestion to the endpoint implementation to group data to be written
      // into frames of the specified max_frame_size. gRPC may use this
      // argument to dynamically control the max sizes of frames sent to a
      // receiver in response to high receiver memory pressure.
      //
      // Default-initialized so a value-constructed WriteArgs never hands an
      // indeterminate value to the endpoint. 1 MiB is a conservative upper
      // bound for grouping written data into frames.
      int64_t max_frame_size = 1024 * 1024;
    };
    /// Writes data out on the connection.
    ///
    /// If the write succeeds immediately, it returns true and the
    /// \a on_writable callback is not executed. Otherwise it returns false and
    /// the \a on_writable callback is called asynchronously when the connection
    /// is ready for more data. The Slices within the \a data buffer may be
    /// mutated at will by the Endpoint until \a on_writable is called. The \a
    /// data SliceBuffer will remain valid after calling \a Write, but its state
    /// is otherwise undefined. All bytes in \a data must have been written
    /// before calling \a on_writable unless an error has occurred.
    ///
    /// There can be at most one outstanding write per Endpoint at any given
    /// time. An outstanding write is one in which the \a on_writable callback
    /// has not yet been executed for some previous call to \a Write. If an
    /// attempt is made to call \a Write while a previous write is still
    /// outstanding, the \a EventEngine must abort.
    ///
    /// For failed write operations, implementations should pass the appropriate
    /// statuses to \a on_writable. For example, callbacks might expect to
    /// receive CANCELLED on endpoint shutdown.
    virtual bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
                       SliceBuffer* data, const WriteArgs* args) = 0;
    /// Returns an address in the format described in DNSResolver. The returned
    /// values are expected to remain valid for the life of the Endpoint.
    virtual const ResolvedAddress& GetPeerAddress() const = 0;
    virtual const ResolvedAddress& GetLocalAddress() const = 0;
  };
  /// Called when a new connection is established.
  ///
  /// If the connection attempt was not successful, implementations should pass
  /// the appropriate statuses to this callback. For example, callbacks might
  /// expect to receive DEADLINE_EXCEEDED statuses when appropriate, or
  /// CANCELLED statuses on EventEngine shutdown.
  using OnConnectCallback =
      absl::AnyInvocable<void(absl::StatusOr<std::unique_ptr<Endpoint>>)>;
  /// Listens for incoming connection requests from gRPC clients and initiates
  /// request processing once connections are established.
  class Listener {
   public:
    /// Called when the listener has accepted a new client connection.
    using AcceptCallback = absl::AnyInvocable<void(
        std::unique_ptr<Endpoint>, MemoryAllocator memory_allocator)>;
    virtual ~Listener() = default;
    /// Bind an address/port to this Listener.
    ///
    /// It is expected that multiple addresses/ports can be bound to this
    /// Listener before Listener::Start has been called. Returns either the
    /// bound port or an appropriate error status.
    virtual absl::StatusOr<int> Bind(const ResolvedAddress& addr) = 0;
    virtual absl::Status Start() = 0;
  };
  /// Factory method to create a network listener / server.
  ///
  /// Once a \a Listener is created and started, the \a on_accept callback will
  /// be called once asynchronously for each established connection. This method
  /// may return a non-OK status immediately if an error was encountered in any
  /// synchronous steps required to create the Listener. In this case,
  /// \a on_shutdown will never be called.
  ///
  /// If this method returns a Listener, then \a on_shutdown will be invoked
  /// exactly once when the Listener is shut down, and only after all
  /// \a on_accept callbacks have finished executing. The status passed to it
  /// will indicate if there was a problem during shutdown.
  ///
  /// The provided \a MemoryAllocatorFactory is used to create \a
  /// MemoryAllocators for Endpoint construction.
  virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
      Listener::AcceptCallback on_accept,
      absl::AnyInvocable<void(absl::Status)> on_shutdown,
      const EndpointConfig& config,
      std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0;
  /// Creates a client network connection to a remote network listener.
  ///
  /// Even in the event of an error, it is expected that the \a on_connect
  /// callback will be asynchronously executed exactly once by the EventEngine.
  /// A connection attempt can be cancelled using the \a CancelConnect method.
  ///
  /// Implementation Note: it is important that the \a memory_allocator be used
  /// for all read/write buffer allocations in the EventEngine implementation.
  /// This allows gRPC's \a ResourceQuota system to monitor and control memory
  /// usage with graceful degradation mechanisms. Please see the \a
  /// MemoryAllocator API for more information.
  virtual ConnectionHandle Connect(OnConnectCallback on_connect,
                                   const ResolvedAddress& addr,
                                   const EndpointConfig& args,
                                   MemoryAllocator memory_allocator,
                                   Duration timeout) = 0;
  /// Request cancellation of a connection attempt.
  ///
  /// If the associated connection has already been completed, it will not be
  /// cancelled, and this method will return false.
  ///
  /// If the associated connection has not been completed, it will be cancelled,
  /// and this method will return true. The \a OnConnectCallback will not be
  /// called.
  virtual bool CancelConnect(ConnectionHandle handle) = 0;
  /// Provides asynchronous resolution.
  class DNSResolver {
   public:
    /// Task handle for DNS Resolution requests.
    struct LookupTaskHandle {
      intptr_t keys[2];
      static const LookupTaskHandle kInvalid;
      friend bool operator==(const LookupTaskHandle& lhs,
                             const LookupTaskHandle& rhs);
      friend bool operator!=(const LookupTaskHandle& lhs,
                             const LookupTaskHandle& rhs);
    };
    /// Optional configuration for DNSResolvers.
    struct ResolverOptions {
      /// If empty, default DNS servers will be used.
      /// Must be in the "IP:port" format as described in naming.md.
      std::string dns_server;
    };
    /// DNS SRV record type.
    struct SRVRecord {
      std::string host;
      int port = 0;
      int priority = 0;
      int weight = 0;
    };
    /// Called with the collection of sockaddrs that were resolved from a given
    /// target address.
    using LookupHostnameCallback =
        absl::AnyInvocable<void(absl::StatusOr<std::vector<ResolvedAddress>>)>;
    /// Called with a collection of SRV records.
    using LookupSRVCallback =
        absl::AnyInvocable<void(absl::StatusOr<std::vector<SRVRecord>>)>;
    /// Called with the result of a TXT record lookup
    using LookupTXTCallback =
        absl::AnyInvocable<void(absl::StatusOr<std::vector<std::string>>)>;
    virtual ~DNSResolver() = default;
    /// Asynchronously resolve an address.
    ///
    /// \a default_port may be a non-numeric named service port, and will only
    /// be used if \a address does not already contain a port component.
    ///
    /// When the lookup is complete, the \a on_resolve callback will be invoked
    /// with a status indicating the success or failure of the lookup.
    /// Implementations should pass the appropriate statuses to the callback.
    /// For example, callbacks might expect to receive DEADLINE_EXCEEDED or
    /// NOT_FOUND.
    ///
    /// If cancelled, \a on_resolve will not be executed.
    virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
                                            absl::string_view name,
                                            absl::string_view default_port,
                                            Duration timeout) = 0;
    /// Asynchronously perform an SRV record lookup.
    ///
    /// \a on_resolve has the same meaning and expectations as \a
    /// LookupHostname's \a on_resolve callback.
    virtual LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
                                       absl::string_view name,
                                       Duration timeout) = 0;
    /// Asynchronously perform a TXT record lookup.
    ///
    /// \a on_resolve has the same meaning and expectations as \a
    /// LookupHostname's \a on_resolve callback.
    virtual LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
                                       absl::string_view name,
                                       Duration timeout) = 0;
    /// Cancel an asynchronous lookup operation.
    ///
    /// This shares the same semantics with \a EventEngine::Cancel: successfully
    /// cancelled lookups will not have their callbacks executed, and this
    /// method returns true.
    virtual bool CancelLookup(LookupTaskHandle handle) = 0;
  };
  /// At time of destruction, the EventEngine must have no active
  /// responsibilities. EventEngine users (applications) are responsible for
  /// cancelling all tasks and DNS lookups, shutting down listeners and
  /// endpoints, prior to EventEngine destruction. If there are any outstanding
  /// tasks, any running listeners, etc. at time of EventEngine destruction,
  /// that is an invalid use of the API, and it will result in undefined
  /// behavior.
  virtual ~EventEngine() = default;
  // TODO(nnoble): consider whether we can remove this method before we
  // de-experimentalize this API.
  virtual bool IsWorkerThread() = 0;
  /// Creates and returns an instance of a DNSResolver, optionally configured by
  /// the \a options struct.
  virtual std::unique_ptr<DNSResolver> GetDNSResolver(
      const DNSResolver::ResolverOptions& options) = 0;
  /// Asynchronously executes a task as soon as possible.
  ///
  /// \a Closures scheduled with \a Run cannot be cancelled. The \a closure will
  /// not be deleted after it has been run, ownership remains with the caller.
  ///
  /// Implementations must not execute the closure in the calling thread before
  /// \a Run returns. For example, if the caller must release a lock before the
  /// closure can proceed, running the closure immediately would cause a
  /// deadlock.
  virtual void Run(Closure* closure) = 0;
  /// Asynchronously executes a task as soon as possible.
  ///
  /// \a Closures scheduled with \a Run cannot be cancelled. Unlike the
  /// overloaded \a Closure alternative, the absl::AnyInvocable version's \a
  /// closure will be deleted by the EventEngine after the closure has been run.
  ///
  /// This version of \a Run may be less performant than the \a Closure version
  /// in some scenarios. This overload is useful in situations where performance
  /// is not a critical concern.
  ///
  /// Implementations must not execute the closure in the calling thread before
  /// \a Run returns.
  virtual void Run(absl::AnyInvocable<void()> closure) = 0;
  /// Synonymous with scheduling an alarm to run after duration \a when.
  ///
  /// The \a closure will execute when time \a when arrives unless it has been
  /// cancelled via the \a Cancel method. If cancelled, the closure will not be
  /// run, nor will it be deleted. Ownership remains with the caller.
  ///
  /// Implementations must not execute the closure in the calling thread before
  /// \a RunAfter returns.
  virtual TaskHandle RunAfter(Duration when, Closure* closure) = 0;
  /// Synonymous with scheduling an alarm to run after duration \a when.
  ///
  /// The \a closure will execute when time \a when arrives unless it has been
  /// cancelled via the \a Cancel method. If cancelled, the closure will not be
  /// run. Unlike the overloaded \a Closure alternative, the absl::AnyInvocable
  /// version's \a closure will be deleted by the EventEngine after the closure
  /// has been run, or upon cancellation.
  ///
  /// This version of \a RunAfter may be less performant than the \a Closure
  /// version in some scenarios. This overload is useful in situations where
  /// performance is not a critical concern.
  ///
  /// Implementations must not execute the closure in the calling thread before
  /// \a RunAfter returns.
  virtual TaskHandle RunAfter(Duration when,
                              absl::AnyInvocable<void()> closure) = 0;
  /// Request cancellation of a task.
  ///
  /// If the associated closure has already been scheduled to run, it will not
  /// be cancelled, and this function will return false.
  ///
  /// If the associated closure has not been scheduled to run, it will be
  /// cancelled, and the associated absl::AnyInvocable or \a Closure* will not
  /// be executed. In this case, Cancel will return true.
  ///
  /// Implementation note: closures should be destroyed in a timely manner after
  /// execution or cancellation (milliseconds), since any state bound to the
  /// closure may need to be destroyed for things to progress (e.g., if a
  /// closure holds a ref to some ref-counted object).
  virtual bool Cancel(TaskHandle handle) = 0;
};
/// Replace gRPC's default EventEngine factory.
///
/// Applications may call \a SetEventEngineFactory at any time to replace the
/// default factory used within gRPC. EventEngines will be created when
/// necessary, when they are otherwise not provided by the application.
///
/// To be certain that none of the gRPC-provided built-in EventEngines are
/// created, applications must set a custom EventEngine factory method *before*
/// grpc is initialized.
void SetEventEngineFactory(
    absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory);
/// Reset gRPC's EventEngine factory to the built-in default.
///
/// Applications that have called \a SetEventEngineFactory can remove their
/// custom factory using this method. The built-in EventEngine factories will be
/// used going forward. This has no effect on any EventEngines that were created
/// using the previous factories.
void EventEngineFactoryReset();
/// Create and return a new EventEngine instance using the currently installed
/// factory (the built-in default, unless \a SetEventEngineFactory replaced it).
std::unique_ptr<EventEngine> CreateEventEngine();
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_EVENT_ENGINE_EVENT_ENGINE_H