1 change: 1 addition & 0 deletions src/brpc/controller.cpp
@@ -232,6 +232,7 @@ void Controller::ResetNonPods() {
    }
    delete _remote_stream_settings;
    _thrift_method_name.clear();
+   _checksum_value.clear();
    _after_rpc_resp_fn = nullptr;

    CHECK(_unfinished_call == NULL);
38 changes: 29 additions & 9 deletions src/brpc/rdma/rdma_endpoint.cpp
@@ -71,6 +71,8 @@ static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
// This is the number of reserved WRs in SQ/RQ for pure ACK.
static const size_t RESERVED_WR_NUM = 3;

+static const uint64_t FIXED_ACK_WR_ID = 1;
+
// magic string RDMA (4B)
// message length (2B)
// hello version (2B)
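The context comment above describes the start of the hello message's wire layout; only the first three fields are visible in this excerpt. A purely hypothetical packed struct for those visible fields (brpc assembles the message by hand; the struct and field names here are invented for illustration):

#include <cstdint>

// Hypothetical view of the hello-message prefix sketched by the comment
// above; the real message carries further fields not shown in this diff.
#pragma pack(push, 1)
struct RdmaHelloPrefix {
    char     magic[4];       // magic string "RDMA" (4B)
    uint16_t message_length; // message length (2B), network byte order
    uint16_t hello_version;  // hello version (2B), network byte order
};
#pragma pack(pop)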
@@ -191,6 +193,8 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
    , _remote_window_capacity(0)
    , _window_size(0)
    , _new_rq_wrs(0)
+   , _pending_signaled_wrs(0)
+   , _pending_acks(0)
{
    if (_sq_size < MIN_QP_SIZE) {
        _sq_size = MIN_QP_SIZE;
@@ -869,12 +873,14 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
    }

    // Avoid too many send completion events to reduce the CPU overhead
+   bool signaled = false;
    ++_sq_unsignaled;
    if (_sq_unsignaled >= _local_window_capacity / 4) {
        // Refer to:
        // http://www.rdmamojo.com/2014/06/30/working-unsignaled-completions/
        wr.send_flags |= IBV_SEND_SIGNALED;
        _sq_unsignaled = 0;
+       signaled = true;
    }

    ibv_send_wr* bad = NULL;
@@ -889,6 +895,10 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
        return -1;
    }

+   if (signaled) {
Contributor Author:
Regarding this line: I'm not sure whether the send CQE could be generated before this atomic operation takes effect.
+       _pending_signaled_wrs.fetch_add(1, butil::memory_order_relaxed);
+   }

    ++_sq_current;
    if (_sq_current == _sq_size - RESERVED_WR_NUM) {
        _sq_current = 0;
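Taken together, the two hunks above implement unsignaled-completion batching: only every (_local_window_capacity / 4)-th send WR sets IBV_SEND_SIGNALED, and the new _pending_signaled_wrs counter tracks how many signaled CQEs are still in flight. A minimal standalone sketch of the pattern (not brpc code; post_send_batched and its counters are illustrative):

#include <infiniband/verbs.h>
#include <atomic>
#include <cstring>

// Post one send WR, requesting a completion only for every
// `signal_interval`-th WR. One signaled CQE then retires the whole
// preceding unsignaled batch, cutting CQ-processing overhead.
int post_send_batched(ibv_qp* qp, ibv_sge* sge, uint32_t* unsignaled,
                      std::atomic<uint16_t>* pending_signaled,
                      uint32_t signal_interval) {
    ibv_send_wr wr;
    memset(&wr, 0, sizeof(wr));
    wr.sg_list = sge;
    wr.num_sge = 1;
    wr.opcode = IBV_WR_SEND;
    bool signaled = false;
    if (++*unsignaled >= signal_interval) {
        wr.send_flags |= IBV_SEND_SIGNALED;
        *unsignaled = 0;
        signaled = true;
    }
    ibv_send_wr* bad = NULL;
    if (ibv_post_send(qp, &wr, &bad) != 0) {
        return -1;
    }
    if (signaled) {
        // Counted only after the post succeeds, mirroring the hunk above.
        pending_signaled->fetch_add(1, std::memory_order_relaxed);
    }
    return 0;
}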
@@ -918,6 +928,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) {

    ibv_send_wr wr;
    memset(&wr, 0, sizeof(wr));
+   wr.wr_id = FIXED_ACK_WR_ID;
    wr.opcode = IBV_WR_SEND_WITH_IMM;
    wr.imm_data = butil::HostToNet32(imm);
    wr.send_flags |= IBV_SEND_SOLICITED;
@@ -936,9 +947,13 @@

ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
    bool zerocopy = FLAGS_rdma_recv_zerocopy;
+   uint16_t pending_signaled_wrs = _pending_signaled_wrs.load(butil::memory_order_relaxed);
    switch (wc.opcode) {
    case IBV_WC_SEND: { // send completion
-       // Do nothing
+       if (wc.wr_id == 0) {
Copilot AI (Nov 5, 2025):
The condition checks wc.wr_id == 0, but GenerateWrId() explicitly generates non-zero IDs (a comment at line 930 states "0 is an invalid Id"). The signaled WRs in CutFromIOBufList at line 880 don't set wr.wr_id, leaving it as 0 after the memset at line 808. This logic appears inverted: either it should decrement when wc.wr_id != 0 (for SendImm), or the counter should be incremented only for signaled sends in CutFromIOBufList that have wr_id == 0.

Suggested change:
-       if (wc.wr_id == 0) {
+       if (wc.wr_id != 0) {
+           pending_signaled_wrs =
+               _pending_signaled_wrs.fetch_sub(1, butil::memory_order_relaxed) - 1;
+       }
        break;
    }
    case IBV_WC_RECV: { // recv completion
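The dispatch debated in the Copilot thread above hinges on wr_id tagging: SendImm() now stamps its pure-ACK WRs with FIXED_ACK_WR_ID, while the signaled data WRs posted by CutFromIOBufList leave wr_id as 0 from the memset. A standalone sketch of that dispatch, assuming wr_id == 0 really does mark a signaled data send (which is exactly what the thread questions; names illustrative, not brpc code):

#include <infiniband/verbs.h>
#include <atomic>

static const uint64_t FIXED_ACK_WR_ID = 1;

// Route a send CQE by its wr_id tag under the assumption stated above.
void on_send_cqe(const ibv_wc& wc, std::atomic<uint16_t>* pending_signaled) {
    if (wc.wr_id == FIXED_ACK_WR_ID) {
        // Completion of a pure ACK posted by SendImm(): nothing to account.
        return;
    }
    // Completion of a signaled data WR (wr_id left 0 by memset): one CQE
    // retires a whole batch of unsignaled sends, so drop the counter once.
    pending_signaled->fetch_sub(1, std::memory_order_relaxed);
}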
@@ -970,14 +985,8 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
        }
        butil::subtle::MemoryBarrier();

-       // Update window
-       uint32_t wnd_thresh = _local_window_capacity / 8;
-       if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
-               || acks >= wnd_thresh) {
-           // Do not wake up writing thread right after _window_size > 0.
-           // Otherwise the writing thread may switch to background too quickly.
-           _socket->WakeAsEpollOut();
-       }
+       // Update acks
+       _pending_acks += acks;
    }
    // We must re-post recv WR
    if (PostRecv(1, zerocopy) < 0) {
@@ -995,6 +1004,17 @@
            << wc.opcode;
        return -1;
    }
+   if (pending_signaled_wrs <= 0) {
Contributor:
It seems that _window_size may still be larger than the actual sq size.

Contributor Author:
Could you give an example to describe it?

Contributor:
After the client sends _local_window_capacity / 4 + 1 requests, it receives an ACK and a send CQE from the server. At this point, _window_size increases by _local_window_capacity / 4 + 1, while the sq size increases by only _local_window_capacity / 4. The inconsistency between _window_size and the sq size still exists.

+       const uint32_t wnd_thresh = _local_window_capacity / 8;
+       auto acks = _window_size.fetch_add(_pending_acks, butil::memory_order_relaxed);
+       // Do not wake up writing thread right after _window_size > 0.
+       // Otherwise the writing thread may switch to background too
+       // quickly.
+       if (acks >= wnd_thresh || _pending_acks >= wnd_thresh) {
+           _socket->WakeAsEpollOut();
+       }
+       _pending_acks = 0;
+   }
    return 0;
}

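The block added above defers window replenishment: ACKs accumulate in _pending_acks while signaled sends are still in flight, and _window_size is topped up (with the writer possibly woken) only once _pending_signaled_wrs drains to zero, so window credits cannot outrun free SQ slots. A compact sketch of this flush-when-quiescent pattern, with illustrative names (WindowState, flush_acks_when_quiescent) that are not part of brpc:

#include <atomic>
#include <cstdint>

// Illustrative state, mirroring the PR's members.
struct WindowState {
    std::atomic<uint16_t> window_size{0};          // credits visible to writers
    std::atomic<uint16_t> pending_signaled_wrs{0}; // signaled sends in flight
    uint16_t pending_acks = 0;                     // acks parked until quiescent
};

// Called per completion batch: park `acks` now, publish them to the
// window only when no signaled send WR is outstanding.
template <typename WakeFn>
void flush_acks_when_quiescent(WindowState& s, uint16_t acks,
                               uint32_t wnd_thresh, WakeFn wake_writer) {
    s.pending_acks += acks;
    if (s.pending_signaled_wrs.load(std::memory_order_relaxed) == 0) {
        uint16_t before = s.window_size.fetch_add(s.pending_acks,
                                                  std::memory_order_relaxed);
        // Only wake the writer once enough credits have piled up, so it
        // does not bounce back to the background right after
        // window_size > 0.
        if (before >= wnd_thresh || s.pending_acks >= wnd_thresh) {
            wake_writer();
        }
        s.pending_acks = 0;
    }
}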
4 changes: 4 additions & 0 deletions src/brpc/rdma/rdma_endpoint.h
@@ -261,6 +261,10 @@ friend class brpc::Socket;
    butil::atomic<uint16_t> _window_size;
    // The number of new WRs posted in the local Recv Queue
    butil::atomic<uint16_t> _new_rq_wrs;
+   // The number of pending signaled send WRs
+   butil::atomic<uint16_t> _pending_signaled_wrs;
+   // The number of pending acks
+   uint16_t _pending_acks;
Copilot AI (Nov 5, 2025):
The _pending_acks member is not thread-safe (a plain uint16_t instead of an atomic). In polling mode, multiple poller threads can call HandleCompletion concurrently for the same endpoint, leading to race conditions at lines 997, 1016, and 1018, where this variable is read and written without synchronization.

Suggested change:
-   uint16_t _pending_acks;
+   butil::atomic<uint16_t> _pending_acks;

    // butex for inform read events on TCP fd during handshake
    butil::atomic<int> *_read_butex;
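Copilot's suggestion above boils down to making the read-modify-write on _pending_acks atomic, since two pollers may run HandleCompletion concurrently. A hedged standalone sketch of the difference (illustrative globals, not brpc code); the drain side uses exchange(0) so concurrently accumulated acks are never lost:

#include <atomic>
#include <cstdint>

// With a plain uint16_t, `pending_acks += acks;` compiles to a
// load-add-store; two pollers running it concurrently can lose acks.
// An atomic fetch_add makes the accumulation itself race-free.
std::atomic<uint16_t> pending_acks{0};

void accumulate_acks(uint16_t acks) {
    pending_acks.fetch_add(acks, std::memory_order_relaxed);
}

uint16_t drain_acks() {
    // Atomically take everything accumulated so far and reset to zero,
    // so an ack added by a concurrent accumulate_acks() is never dropped.
    return pending_acks.exchange(0, std::memory_order_relaxed);
}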