Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
quic: add backpressure notification to dataqueue
  • Loading branch information
jasnell committed Dec 9, 2023
commit 74eb03df56f6874fc3adfc0d5163a5298485afd8
35 changes: 35 additions & 0 deletions src/dataqueue/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,28 @@ class DataQueueImpl final : public DataQueue,
"entries", entries_, "std::vector<std::unique_ptr<Entry>>");
}

void addBackpressureListener(BackpressureListener* listener) override {
if (idempotent_) return;
DCHECK_NOT_NULL(listener);
backpressure_listeners_.insert(listener);
}

void removeBackpressureListener(BackpressureListener* listener) override {
if (idempotent_) return;
DCHECK_NOT_NULL(listener);
backpressure_listeners_.erase(listener);
}

void NotifyBackpressure(size_t amount) {
if (idempotent_) return;
for (auto listener : backpressure_listeners_)
listener->EntryRead(amount);
}

bool has_backpressure_listeners() const {
Comment thread
anonrig marked this conversation as resolved.
Outdated
return !backpressure_listeners_.empty();
}

std::shared_ptr<Reader> get_reader() override;
SET_MEMORY_INFO_NAME(DataQueue)
SET_SELF_SIZE(DataQueueImpl)
Expand All @@ -173,6 +195,8 @@ class DataQueueImpl final : public DataQueue,
std::optional<uint64_t> capped_size_ = std::nullopt;
bool locked_to_reader_ = false;

std::set<BackpressureListener*> backpressure_listeners_;
Comment thread
jasnell marked this conversation as resolved.
Outdated

friend class DataQueue;
friend class IdempotentDataQueueReader;
friend class NonIdempotentDataQueueReader;
Expand Down Expand Up @@ -433,6 +457,17 @@ class NonIdempotentDataQueueReader final
return;
}

// If there is a backpressure listener, lets report on how much data was
// actually read.
if (data_queue_->has_backpressure_listeners()) {
// How much did we actually read?
size_t read = 0;
for (uint64_t n = 0; n < count; n++) {
read += vecs[n].len;
Comment thread
anonrig marked this conversation as resolved.
}
data_queue_->NotifyBackpressure(read);
}

// Now that we have updated this readers state, we can forward
// everything on to the outer next.
std::move(next)(status, vecs, count, std::move(done));
Expand Down
12 changes: 12 additions & 0 deletions src/dataqueue/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ class DataQueue : public MemoryRetainer {
using Done = bob::Done;
};

// A BackpressureListener can be used to receive notifications
// when a non-idempotent DataQueue releases entries as they
// are consumed.
class BackpressureListener {
public:
virtual void EntryRead(size_t amount) = 0;
};

// A DataQueue::Entry represents a logical chunk of data in the queue.
// The entry may or may not represent memory-resident data. It may
// or may not be consumable more than once.
Expand Down Expand Up @@ -285,6 +293,10 @@ class DataQueue : public MemoryRetainer {
// been set, maybeCapRemaining() will return std::nullopt.
virtual std::optional<uint64_t> maybeCapRemaining() const = 0;

// BackpressureListeners only work on non-idempotent DataQueues.
virtual void addBackpressureListener(BackpressureListener* listener) = 0;
virtual void removeBackpressureListener(BackpressureListener* listener) = 0;

static void Initialize(Environment* env, v8::Local<v8::Object> target);
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
};
Expand Down