diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 83a7c3800cce..5b52707a36d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -81,8 +81,6 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-// TODO: e.g. 1,1 1,2 1,3 1,4 1,5 / 1,6 1,7 1,8 (the follower needs a way to know whether the
-// leader has sent a full batch / whether all preceding requests have been sent): the sender retries the handshake after its wait for events times out
 public class PipeConsensusReceiver {
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusReceiver.class);
   private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
@@ -1051,19 +1049,19 @@ public RequestExecutor() {
     }
 
     private void onSuccess(long nextSyncedCommitIndex) {
-      LOGGER.info("Debug only: process no.{} event successfully!", nextSyncedCommitIndex);
+      LOGGER.info(
+          "PipeConsensus-ConsensusGroupId-{}:: process no.{} event successfully!",
+          consensusGroupId.getId(),
+          nextSyncedCommitIndex);
       reqBuffer.pollFirst();
       onSyncedCommitIndex = nextSyncedCommitIndex;
     }
 
     private TPipeConsensusTransferResp onRequest(
         final TPipeConsensusTransferReq req, final boolean isTransferTsFilePiece) {
-      LOGGER.info(
-          "Debug only: no.{} event try to acquire lock", req.getCommitId().getCommitIndex());
       lock.lock();
       try {
         WrappedRequest wrappedReq = new WrappedRequest(req);
-        LOGGER.info("Debug only: start process no.{} event", wrappedReq.getCommitIndex());
         // if a req is deprecated, we will discard it
         // This case may happen in this scenario: leader has transferred {1,2} and is intending to
         // transfer {3, 4, 5, 6}. And in one moment, follower has received {4, 5, 6}, {3} is still
@@ -1138,18 +1136,40 @@ private TPipeConsensusTransferResp onRequest(
             // should wait until reqBuffer is full, which indicates the receiver has received all
             // the requests from the connector without duplication or leakage.
            try {
-              LOGGER.info(
-                  "Debug only: no.{} event waiting on the lock...",
-                  req.getCommitId().getCommitIndex());
-              condition.await(
-                  COMMON_CONFIG.getPipeConsensusReceiverMaxWaitingTimeForEventsInMs(),
-                  TimeUnit.MILLISECONDS);
+              boolean timeout =
+                  !condition.await(
+                      COMMON_CONFIG.getPipeConsensusReceiverMaxWaitingTimeForEventsInMs()
+                          * COMMON_CONFIG.getPipeConsensusEventBufferSize(),
+                      TimeUnit.MILLISECONDS);
+
+              // If the buffer is still not full after the wait times out, we assume the sender
+              // will not send any more events for now, i.e. it has already sent everything.
+              // In that case we apply the event at the head of reqBuffer.
+              if (timeout
+                  && reqBuffer.size() < COMMON_CONFIG.getPipeConsensusEventBufferSize()
+                  && reqBuffer.first() != null
+                  && reqBuffer.first().equals(wrappedReq)) {
+                TPipeConsensusTransferResp resp = loadEvent(req);
+
+                if (resp != null
+                    && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+                  onSuccess(wrappedReq.getCommitIndex());
+                  // signal all other reqs that may be waiting for this event
+                  condition.signalAll();
+                }
+                return resp;
+              }
            } catch (InterruptedException e) {
              LOGGER.warn(
                  "current waiting is interrupted. onSyncedCommitIndex: {}. Exception: ",
                  wrappedReq.getCommitIndex(),
                  e);
              Thread.currentThread().interrupt();
+              // Avoid infinite loop when RPC thread is killed by OS
+              return new TPipeConsensusTransferResp(
+                  RpcUtils.getStatus(
+                      TSStatusCode.SHUT_DOWN_ERROR,
+                      "RPC processor is interrupted by shutdown hook when wait on condition!"));
            }
          }
        }