Skip to content

Commit

Permalink
add timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Pengzna committed May 24, 2024
1 parent c919efc commit b6a0dd0
Showing 1 changed file with 32 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

// TODO: 如:1,1 1,2 1,3 1,4 1,5 / 1,6 1,7 1,8(follower 得想办法知道 leader
// 是否发满了/前置请求是否发完了):发送端等待事件超时后尝试握手
public class PipeConsensusReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusReceiver.class);
private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
Expand Down Expand Up @@ -1051,19 +1049,19 @@ public RequestExecutor() {
}

private void onSuccess(long nextSyncedCommitIndex) {
LOGGER.info("Debug only: process no.{} event successfully!", nextSyncedCommitIndex);
LOGGER.info(
"PipeConsensus-ConsensusGroupId-{}:: process no.{} event successfully!",
consensusGroupId.getId(),
nextSyncedCommitIndex);
reqBuffer.pollFirst();
onSyncedCommitIndex = nextSyncedCommitIndex;
}

private TPipeConsensusTransferResp onRequest(
final TPipeConsensusTransferReq req, final boolean isTransferTsFilePiece) {
LOGGER.info(
"Debug only: no.{} event try to acquire lock", req.getCommitId().getCommitIndex());
lock.lock();
try {
WrappedRequest wrappedReq = new WrappedRequest(req);
LOGGER.info("Debug only: start process no.{} event", wrappedReq.getCommitIndex());
// if a req is deprecated, we will discard it
// This case may happen in this scenario: leader has transferred {1,2} and is intending to
// transfer {3, 4, 5, 6}. And in one moment, follower has received {4, 5, 6}, {3} is still
Expand Down Expand Up @@ -1138,18 +1136,40 @@ private TPipeConsensusTransferResp onRequest(
// should wait until reqBuffer is full, which indicates the receiver has received all
// the requests from the connector without duplication or leakage.
try {
LOGGER.info(
"Debug only: no.{} event waiting on the lock...",
req.getCommitId().getCommitIndex());
condition.await(
COMMON_CONFIG.getPipeConsensusReceiverMaxWaitingTimeForEventsInMs(),
TimeUnit.MILLISECONDS);
boolean timeout =
!condition.await(
COMMON_CONFIG.getPipeConsensusReceiverMaxWaitingTimeForEventsInMs()
* COMMON_CONFIG.getPipeConsensusEventBufferSize(),
TimeUnit.MILLISECONDS);

// If the buffer is not full after waiting timeout, we suppose that the sender will
// not send any more events at this time, that is, the sender has sent all events. At
// this point we apply the event at reqBuffer's peek
if (timeout
&& reqBuffer.size() < COMMON_CONFIG.getPipeConsensusEventBufferSize()
&& reqBuffer.first() != null
&& reqBuffer.first().equals(wrappedReq)) {
TPipeConsensusTransferResp resp = loadEvent(req);

if (resp != null
&& resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
onSuccess(wrappedReq.getCommitIndex());
// signal all other reqs that may wait for this event
condition.signalAll();
}
return resp;
}
} catch (InterruptedException e) {
LOGGER.warn(
"current waiting is interrupted. onSyncedCommitIndex: {}. Exception: ",
wrappedReq.getCommitIndex(),
e);
Thread.currentThread().interrupt();
// Avoid infinite loop when RPC thread is killed by OS
return new TPipeConsensusTransferResp(
RpcUtils.getStatus(
TSStatusCode.SHUT_DOWN_ERROR,
"RPC processor is interrupted by shutdown hook when wait on condition!"));
}
}
}
Expand Down

0 comments on commit b6a0dd0

Please sign in to comment.