diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 22b7b509..b5df2637 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -590,7 +590,7 @@ private void doAppend() throws Exception { private void sendBatchAppendEntryRequest() throws Exception { batchAppendEntryRequest.setCommitIndex(dLedgerStore.getCommittedIndex()); CompletableFuture responseFuture = dLedgerRpcService.push(batchAppendEntryRequest); - batchPendingMap.put(batchAppendEntryRequest.getLastEntryIndex(), new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount())); + batchPendingMap.put(batchAppendEntryRequest.getFirstEntryIndex(), new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount())); responseFuture.whenComplete((x, ex) -> { try { PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN); @@ -598,7 +598,7 @@ private void sendBatchAppendEntryRequest() throws Exception { switch (responseCode) { case SUCCESS: batchPendingMap.remove(x.getIndex()); - updatePeerWaterMark(x.getTerm(), peerId, x.getIndex()); + updatePeerWaterMark(x.getTerm(), peerId, x.getIndex() + x.getCount() - 1); break; case INCONSISTENT_STATE: logger.info("[Push-{}]Get INCONSISTENT_STATE when batch push index={} term={}", peerId, x.getIndex(), x.getTerm()); @@ -891,7 +891,8 @@ private PushEntryResponse buildResponse(PushEntryRequest request, int code) { response.setCode(code); response.setTerm(request.getTerm()); if (request.getType() != PushEntryRequest.Type.COMMIT) { - response.setIndex(request.getLastEntryIndex()); + response.setIndex(request.getFirstEntryIndex()); + response.setCount(request.getCount()); } response.setBeginIndex(dLedgerStore.getLedgerBeginIndex()); response.setEndIndex(dLedgerStore.getLedgerEndIndex()); diff --git a/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryRequest.java b/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryRequest.java index f67db3ee..75e12f5f 100644 --- a/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryRequest.java +++ b/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryRequest.java @@ -84,7 +84,13 @@ public long getLastEntryIndex() { } public int getCount() { - return batchEntry.size(); + if (!batchEntry.isEmpty()) { + return batchEntry.size(); + } else if (entry != null) { + return 1; + } else { + return 0; + } } public long getTotalSize() { diff --git a/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryResponse.java b/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryResponse.java index 831e3b8b..cef3a0a4 100644 --- a/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryResponse.java +++ b/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryResponse.java @@ -21,6 +21,7 @@ public class PushEntryResponse extends RequestOrResponse { private long beginIndex; private long endIndex; + private int count; public Long getIndex() { return index; @@ -45,4 +46,12 @@ public long getEndIndex() { public void setEndIndex(long endIndex) { this.endIndex = endIndex; } + + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } }