blob: eb00e9387cc9b7148fdab93150335abf5d2782f2 [file] [log] [blame]
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
#define GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
#include <grpc/support/port_platform.h>
#include <queue>
#include <string>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/transaction.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
namespace grpc_binder {
// Member functions are thread safe.
class WireWriter {
public:
virtual ~WireWriter() = default;
virtual absl::Status RpcCall(std::unique_ptr<Transaction> tx) = 0;
virtual absl::Status SendAck(int64_t num_bytes) = 0;
virtual void OnAckReceived(int64_t num_bytes) = 0;
};
class WireWriterImpl : public WireWriter {
public:
explicit WireWriterImpl(std::unique_ptr<Binder> binder);
~WireWriterImpl() override;
absl::Status RpcCall(std::unique_ptr<Transaction> tx) override;
absl::Status SendAck(int64_t num_bytes) override;
void OnAckReceived(int64_t num_bytes) override;
// Required to be public because we would like to call this in combiner (which
// cannot invoke class member function). `RunScheduledTxArgs` and
// `RunScheduledTxInternal` should not be used by user directly.
struct RunScheduledTxArgs {
WireWriterImpl* writer;
struct StreamTx {
std::unique_ptr<Transaction> tx;
// How many data in transaction's `data` field has been sent.
int64_t bytes_sent = 0;
};
struct AckTx {
int64_t num_bytes;
};
absl::variant<AckTx, StreamTx> tx;
};
void RunScheduledTxInternal(RunScheduledTxArgs* arg);
// Split long message into chunks of size 16k. This doesn't necessarily have
// to be the same as the flow control acknowledgement size, but it should not
// exceed 128k.
static const int64_t kBlockSize;
// Flow control allows sending at most 128k between acknowledgements.
static const int64_t kFlowControlWindowSize;
private:
// Fast path: send data in one transaction.
absl::Status RpcCallFastPath(std::unique_ptr<Transaction> tx);
// This function will acquire `write_mu_` to make sure the binder is not used
// concurrently, so this can be called by different threads safely.
absl::Status MakeBinderTransaction(
BinderTransportTxCode tx_code,
std::function<absl::Status(WritableParcel*)> fill_parcel);
// Send a stream to `binder_`. Set `is_last_chunk` to `true` if the stream
// transaction has been sent completely. Otherwise set to `false`.
absl::Status RunStreamTx(RunScheduledTxArgs::StreamTx* stream_tx,
WritableParcel* parcel, bool* is_last_chunk)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_);
// Schdule `RunScheduledTxArgs*` in `pending_outgoing_tx_` to `combiner_`, as
// many as possible (under the constraint of `kFlowControlWindowSize`).
void TryScheduleTransaction();
// Guards variables related to transport state.
grpc_core::Mutex write_mu_;
std::unique_ptr<Binder> binder_ ABSL_GUARDED_BY(write_mu_);
// Maps the transaction code (which identifies streams) to their next
// available sequence number. See
// https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md#sequence-number
absl::flat_hash_map<int, int> next_seq_num_ ABSL_GUARDED_BY(write_mu_);
// Number of bytes we have already sent in stream transactions.
std::atomic<int64_t> num_outgoing_bytes_{0};
// Guards variables related to flow control logic.
grpc_core::Mutex flow_control_mu_;
int64_t num_acknowledged_bytes_ ABSL_GUARDED_BY(flow_control_mu_) = 0;
// The queue takes ownership of the pointer.
std::queue<RunScheduledTxArgs*> pending_outgoing_tx_
ABSL_GUARDED_BY(flow_control_mu_);
int num_non_acked_tx_in_combiner_ ABSL_GUARDED_BY(flow_control_mu_) = 0;
// Helper variable for determining if we are currently calling into
// `Binder::Transact`. Useful for avoiding the attempt of acquiring
// `write_mu_` multiple times on the same thread.
std::atomic_bool is_transacting_{false};
grpc_core::Combiner* combiner_;
};
} // namespace grpc_binder
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H