Skip to content

Commit

Permalink
PipeConsensus: Avoid replicate block && Avoid transfer error when con…
Browse files Browse the repository at this point in the history
…nector is closed. (#13146)
  • Loading branch information
Pengzna authored Aug 13, 2024
1 parent e3582cd commit b8426f5
Showing 1 changed file with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ private boolean addEvent2Buffer(EnrichedEvent event) {
event.getCommitId(),
event);
}
// Special judge to avoid transfer stuck when re-transfer events that will not be put in
// retryQueue.
if (transferBuffer.contains(event)) {
return true;
}
long currentTime = System.nanoTime();
boolean result =
transferBuffer.offer(
Expand Down Expand Up @@ -203,6 +208,13 @@ public synchronized void removeEventFromBuffer(EnrichedEvent event) {
transferBuffer.size(),
IOTDB_CONFIG.getPipeConsensusPipelineSize());
}
if (transferBuffer.isEmpty()) {
LOGGER.info(
"PipeConsensus-ConsensusGroup-{}: try to remove event-{} after pipeConsensusAsyncConnector being closed. Ignore it.",
consensusGroupId,
event);
return;
}
Iterator<EnrichedEvent> iterator = transferBuffer.iterator();
EnrichedEvent current = iterator.next();
while (!current.equalsInPipeConsensus(event) && iterator.hasNext()) {
Expand Down Expand Up @@ -458,7 +470,7 @@ private void syncTransferQueuedEventsIfNecessary() throws Exception {
polledEvent);
}
}
if (polledEvent != null && LOGGER.isDebugEnabled()) {
if (polledEvent != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Polled event {} from retry queue.", polledEvent);
}
Expand Down

0 comments on commit b8426f5

Please sign in to comment.