pw_ring_buffer: Support multi-reader ring buffer
Add support for single-writer multi-reader ring buffers.
Change-Id: I57510836639c36d010612850fd55047c5766f80e
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/32321
Pigweed-Auto-Submit: Prashanth Swaminathan <prashanthsw@google.com>
Reviewed-by: Armando Montanez <amontanez@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_ring_buffer/prefixed_entry_ring_buffer.cc b/pw_ring_buffer/prefixed_entry_ring_buffer.cc
index 82f76d0..400e7d0 100644
--- a/pw_ring_buffer/prefixed_entry_ring_buffer.cc
+++ b/pw_ring_buffer/prefixed_entry_ring_buffer.cc
@@ -14,22 +14,27 @@
#include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
+#include <algorithm>
#include <cstring>
+#include "pw_assert/light.h"
#include "pw_varint/varint.h"
namespace pw {
namespace ring_buffer {
using std::byte;
+using Reader = PrefixedEntryRingBufferMulti::Reader;
-void PrefixedEntryRingBuffer::Clear() {
- read_idx_ = 0;
+void PrefixedEntryRingBufferMulti::Clear() {
write_idx_ = 0;
- entry_count_ = 0;
+ for (Reader& reader : readers_) {
+ reader.read_idx = 0;
+ reader.entry_count = 0;
+ }
}
-Status PrefixedEntryRingBuffer::SetBuffer(std::span<byte> buffer) {
+Status PrefixedEntryRingBufferMulti::SetBuffer(std::span<byte> buffer) {
if ((buffer.data() == nullptr) || //
(buffer.size_bytes() == 0) || //
(buffer.size_bytes() > kMaxBufferBytes)) {
@@ -43,9 +48,35 @@
return OkStatus();
}
-Status PrefixedEntryRingBuffer::InternalPushBack(std::span<const byte> data,
- byte user_preamble_data,
- bool drop_elements_if_needed) {
+Status PrefixedEntryRingBufferMulti::AttachReader(Reader& reader) {
+ if (reader.buffer != nullptr) {
+ return Status::InvalidArgument();
+ }
+ reader.buffer = this;
+
+ // Note that a newly attached reader sees the buffer as empty,
+ // and is not privy to entries pushed before being attached.
+ reader.read_idx = write_idx_;
+ reader.entry_count = 0;
+ readers_.push_back(reader);
+ return OkStatus();
+}
+
+Status PrefixedEntryRingBufferMulti::DetachReader(Reader& reader) {
+ if (reader.buffer != this) {
+ return Status::InvalidArgument();
+ }
+ reader.buffer = nullptr;
+ reader.read_idx = 0;
+ reader.entry_count = 0;
+ readers_.remove(reader);
+ return OkStatus();
+}
+
+Status PrefixedEntryRingBufferMulti::InternalPushBack(
+ std::span<const byte> data,
+ byte user_preamble_data,
+ bool drop_elements_if_needed) {
if (buffer_ == nullptr) {
return Status::FailedPrecondition();
}
@@ -66,7 +97,7 @@
// PushBack() case: evict items as needed.
// Drop old entries until we have space for the new entry.
while (RawAvailableBytes() < total_write_bytes) {
- PopFront();
+ InternalPopFrontAll();
}
} else if (RawAvailableBytes() < total_write_bytes) {
// TryPushBack() case: don't evict items.
@@ -79,7 +110,11 @@
}
RawWrite(std::span(varint_buf, varint_bytes));
RawWrite(data);
- entry_count_++;
+
+ // Update all readers of the new count.
+ for (Reader& reader : readers_) {
+ reader.entry_count++;
+ }
return OkStatus();
}
@@ -95,40 +130,45 @@
};
}
-Status PrefixedEntryRingBuffer::PeekFront(std::span<byte> data,
- size_t* bytes_read) {
+Status PrefixedEntryRingBufferMulti::InternalPeekFront(Reader& reader,
+ std::span<byte> data,
+ size_t* bytes_read) {
*bytes_read = 0;
- return InternalRead(GetOutput(data, bytes_read), false);
+ return InternalRead(reader, GetOutput(data, bytes_read), false);
}
-Status PrefixedEntryRingBuffer::PeekFront(ReadOutput output) {
- return InternalRead(output, false);
+Status PrefixedEntryRingBufferMulti::InternalPeekFront(Reader& reader,
+ ReadOutput output) {
+ return InternalRead(reader, output, false);
}
-Status PrefixedEntryRingBuffer::PeekFrontWithPreamble(std::span<byte> data,
- size_t* bytes_read) {
+Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
+ Reader& reader, std::span<byte> data, size_t* bytes_read) {
*bytes_read = 0;
- return InternalRead(GetOutput(data, bytes_read), true);
+ return InternalRead(reader, GetOutput(data, bytes_read), true);
}
-Status PrefixedEntryRingBuffer::PeekFrontWithPreamble(ReadOutput output) {
- return InternalRead(output, true);
+Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
+ Reader& reader, ReadOutput output) {
+ return InternalRead(reader, output, true);
}
// T should be similar to Status (*read_output)(std::span<const byte>)
template <typename T>
-Status PrefixedEntryRingBuffer::InternalRead(T read_output, bool get_preamble) {
+Status PrefixedEntryRingBufferMulti::InternalRead(Reader& reader,
+ T read_output,
+ bool get_preamble) {
if (buffer_ == nullptr) {
return Status::FailedPrecondition();
}
- if (EntryCount() == 0) {
+ if (reader.entry_count == 0) {
return Status::OutOfRange();
}
// Figure out where to start reading (wrapped); accounting for preamble.
- EntryInfo info = FrontEntryInfo();
+ EntryInfo info = FrontEntryInfo(reader);
size_t read_bytes = info.data_bytes;
- size_t data_read_idx = read_idx_;
+ size_t data_read_idx = reader.read_idx;
if (get_preamble) {
read_bytes += info.preamble_bytes;
} else {
@@ -148,67 +188,133 @@
return status;
}
-Status PrefixedEntryRingBuffer::PopFront() {
- if (buffer_ == nullptr) {
- return Status::FailedPrecondition();
+void PrefixedEntryRingBufferMulti::InternalPopFrontAll() {
+ // Forcefully pop all readers. Find the slowest reader, which must have
+ // the highest entry count, then pop all readers that have the same count.
+ size_t entry_count = GetSlowestReader().entry_count;
+ // If no readers have any entries left to read, return immediately.
+ PW_DASSERT(entry_count != 0);
+ // Otherwise, pop the readers that have the largest value.
+ for (Reader& reader : readers_) {
+ if (reader.entry_count == entry_count) {
+ reader.PopFront();
+ }
}
- if (EntryCount() == 0) {
- return Status::OutOfRange();
- }
-
- // Advance the read pointer past the front entry to the next one.
- EntryInfo info = FrontEntryInfo();
- size_t entry_bytes = info.preamble_bytes + info.data_bytes;
- read_idx_ = IncrementIndex(read_idx_, entry_bytes);
- entry_count_--;
- return OkStatus();
}
-Status PrefixedEntryRingBuffer::Dering() {
- if (buffer_ == nullptr) {
+Reader& PrefixedEntryRingBufferMulti::GetSlowestReader() {
+ // Readers are guaranteed to be before the writer pointer (the class enforces
+ // this on every read/write operation that forces the write pointer ahead of
+ // an existing reader). To determine the slowest reader, we consider three
+ // scenarios:
+ //
+ // In all below cases, R1 is the slowest reader:
+ // [[R1 R2 R3 WH]] => Right-hand writer, slowest reader is left-most reader.
+ // [[WH R1 R2 R3]] => Left-hand writer, slowest reader is left-most reader.
+ // [[R3 WH R1 R2]] => Middle-writer, slowest reader is left-most reader after
+ // writer.
+ //
+ // Formally, choose the left-most reader after the writer (ex.2,3), but if
+ // that doesn't exist, choose the left-most reader before the writer (ex.1).
+ PW_DASSERT(readers_.size() > 0);
+ Reader* slowest_reader_after_writer = nullptr;
+ Reader* slowest_reader_before_writer = nullptr;
+ for (Reader& reader : readers_) {
+ if (reader.read_idx < write_idx_) {
+ if (!slowest_reader_before_writer ||
+ reader.read_idx < slowest_reader_before_writer->read_idx) {
+ slowest_reader_before_writer = &reader;
+ }
+ } else {
+ if (!slowest_reader_after_writer ||
+ reader.read_idx < slowest_reader_after_writer->read_idx) {
+ slowest_reader_after_writer = &reader;
+ }
+ }
+ }
+ return *(slowest_reader_after_writer ? slowest_reader_after_writer
+ : slowest_reader_before_writer);
+}
+
+Status PrefixedEntryRingBufferMulti::Dering() {
+ if (buffer_ == nullptr || readers_.size() == 0) {
return Status::FailedPrecondition();
}
+
// Check if by luck we're already deringed.
- if (read_idx_ == 0) {
+ Reader* slowest_reader = &GetSlowestReader();
+ if (slowest_reader->read_idx == 0) {
return OkStatus();
}
auto buffer_span = std::span(buffer_, buffer_bytes_);
- std::rotate(
- buffer_span.begin(), buffer_span.begin() + read_idx_, buffer_span.end());
+ std::rotate(buffer_span.begin(),
+ buffer_span.begin() + slowest_reader->read_idx,
+ buffer_span.end());
// If the new index is past the end of the buffer,
// alias it back (wrap) to the start of the buffer.
- if (write_idx_ < read_idx_) {
+ if (write_idx_ < slowest_reader->read_idx) {
write_idx_ += buffer_bytes_;
}
- write_idx_ -= read_idx_;
- read_idx_ = 0;
+ write_idx_ -= slowest_reader->read_idx;
+
+ for (Reader& reader : readers_) {
+ if (&reader == slowest_reader) {
+ continue;
+ }
+ if (reader.read_idx < slowest_reader->read_idx) {
+ reader.read_idx += buffer_bytes_;
+ }
+ reader.read_idx -= slowest_reader->read_idx;
+ }
+
+ slowest_reader->read_idx = 0;
return OkStatus();
}
-size_t PrefixedEntryRingBuffer::FrontEntryDataSizeBytes() {
- if (EntryCount() == 0) {
- return 0;
+Status PrefixedEntryRingBufferMulti::InternalPopFront(Reader& reader) {
+ if (buffer_ == nullptr) {
+ return Status::FailedPrecondition();
}
- return FrontEntryInfo().data_bytes;
+ if (reader.entry_count == 0) {
+ return Status::OutOfRange();
+ }
+
+ // Advance the read pointer past the front entry to the next one.
+ EntryInfo info = FrontEntryInfo(reader);
+ size_t entry_bytes = info.preamble_bytes + info.data_bytes;
+ size_t prev_read_idx = reader.read_idx;
+ reader.read_idx = IncrementIndex(prev_read_idx, entry_bytes);
+ reader.entry_count--;
+ return OkStatus();
}
-size_t PrefixedEntryRingBuffer::FrontEntryTotalSizeBytes() {
- if (EntryCount() == 0) {
+size_t PrefixedEntryRingBufferMulti::InternalFrontEntryDataSizeBytes(
+ Reader& reader) {
+ if (reader.entry_count == 0) {
return 0;
}
- EntryInfo info = FrontEntryInfo();
+ return FrontEntryInfo(reader).data_bytes;
+}
+
+size_t PrefixedEntryRingBufferMulti::InternalFrontEntryTotalSizeBytes(
+ Reader& reader) {
+ if (reader.entry_count == 0) {
+ return 0;
+ }
+ EntryInfo info = FrontEntryInfo(reader);
return info.preamble_bytes + info.data_bytes;
}
-PrefixedEntryRingBuffer::EntryInfo PrefixedEntryRingBuffer::FrontEntryInfo() {
+PrefixedEntryRingBufferMulti::EntryInfo
+PrefixedEntryRingBufferMulti::FrontEntryInfo(Reader& reader) {
// Entry headers consists of: (optional prefix byte, varint size, data...)
// Read the entry header; extract the varint and it's size in bytes.
byte varint_buf[kMaxEntryPreambleBytes];
RawRead(varint_buf,
- IncrementIndex(read_idx_, user_preamble_ ? 1 : 0),
+ IncrementIndex(reader.read_idx, user_preamble_ ? 1 : 0),
kMaxEntryPreambleBytes);
uint64_t entry_size;
size_t varint_size = varint::Decode(varint_buf, &entry_size);
@@ -221,20 +327,33 @@
// Comparisons ordered for more probable early exits, assuming the reader is
// not far behind the writer compared to the size of the ring.
-size_t PrefixedEntryRingBuffer::RawAvailableBytes() {
+size_t PrefixedEntryRingBufferMulti::RawAvailableBytes() {
+ // Compute slowest reader.
+ // TODO: Alternatively, the slowest reader could be actively mantained on
+ // every read operation, but reads are more likely than writes.
+ if (readers_.size() == 0) {
+ return buffer_bytes_;
+ }
+
+ size_t read_idx = GetSlowestReader().read_idx;
// Case: Not wrapped.
- if (read_idx_ < write_idx_) {
- return buffer_bytes_ - (write_idx_ - read_idx_);
+ if (read_idx < write_idx_) {
+ return buffer_bytes_ - (write_idx_ - read_idx);
}
// Case: Wrapped
- if (read_idx_ > write_idx_) {
- return read_idx_ - write_idx_;
+ if (read_idx > write_idx_) {
+ return read_idx - write_idx_;
}
// Case: Matched read and write heads; empty or full.
- return entry_count_ ? 0 : buffer_bytes_;
+ for (Reader& reader : readers_) {
+ if (reader.read_idx == read_idx && reader.entry_count != 0) {
+ return 0;
+ }
+ }
+ return buffer_bytes_;
}
-void PrefixedEntryRingBuffer::RawWrite(std::span<const std::byte> source) {
+void PrefixedEntryRingBufferMulti::RawWrite(std::span<const std::byte> source) {
// Write until the end of the source or the backing buffer.
size_t bytes_until_wrap = buffer_bytes_ - write_idx_;
size_t bytes_to_copy = std::min(source.size(), bytes_until_wrap);
@@ -248,9 +367,9 @@
write_idx_ = IncrementIndex(write_idx_, source.size());
}
-void PrefixedEntryRingBuffer::RawRead(byte* destination,
- size_t source_idx,
- size_t length_bytes) {
+void PrefixedEntryRingBufferMulti::RawRead(byte* destination,
+ size_t source_idx,
+ size_t length_bytes) {
// Read the pre-wrap bytes.
size_t bytes_until_wrap = buffer_bytes_ - source_idx;
size_t bytes_to_copy = std::min(length_bytes, bytes_until_wrap);
@@ -262,7 +381,8 @@
}
}
-size_t PrefixedEntryRingBuffer::IncrementIndex(size_t index, size_t count) {
+size_t PrefixedEntryRingBufferMulti::IncrementIndex(size_t index,
+ size_t count) {
// Note: This doesn't use modulus (%) since the branch is cheaper, and we
// guarantee that count will never be greater than buffer_bytes_.
index += count;