diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index c849466861..dc25e6e17e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -697,6 +697,7 @@ Map> convertS3Objects(List str >= TimeUnit.MINUTES.toMillis(this.forceSplitObjectPeriod)))); } + @SuppressWarnings("checkstyle:CyclomaticComplexity") void executeCompactionPlans(CommitStreamSetObjectRequest request, List compactionPlans, List s3ObjectMetadata) throws CompletionException { @@ -743,17 +744,34 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List { - uploader.forceUploadStreamSetObject(); - if (ex != null) { - logger.error("Error while uploading compaction objects", ex); - uploader.release().thenAccept(vv -> { - for (CompactedObject compactedObject : compactionPlan.compactedObjects()) { - compactedObject.streamDataBlocks().forEach(StreamDataBlock::release); - } - }).join(); + compactionCf = new CompletableFuture<>(); + CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0])) + .whenComplete((v, uploadException) -> { + if (uploadException != null) { + logger.error("Error while uploading compaction objects", uploadException); } + uploader.forceUploadStreamSetObject().whenComplete((vv, forceUploadException) -> { + if (forceUploadException != null) { + logger.error("Error while force uploading stream set object", uploadException); + } + if (uploadException != null || forceUploadException != null) { + uploader.release().whenComplete((vvv, releaseException) -> { + if (releaseException != null) { + logger.error("Unexpected exception while release uploader"); + } + for (CompactedObject compactedObject : compactionPlan.compactedObjects()) { + compactedObject.streamDataBlocks().forEach(StreamDataBlock::release); + } + if (uploadException != null) { + compactionCf.completeExceptionally(new CompletionException("Uploading failed", uploadException)); + } else { + compactionCf.completeExceptionally(new CompletionException("Force uploading sso failed", forceUploadException)); + } + }); + } else { + compactionCf.complete(null); + } + }); }); } try {