diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionDiskTSMIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionDiskTSMIterator.java new file mode 100644 index 000000000000..b90769dba37f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionDiskTSMIterator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.io; + +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; + +import org.apache.tsfile.file.metadata.ChunkGroupMetadata; +import org.apache.tsfile.write.writer.tsmiterator.DiskTSMIterator; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +public class CompactionDiskTSMIterator extends DiskTSMIterator { + + public CompactionDiskTSMIterator( + CompactionType compactionType, + File cmtFile, + List chunkGroupMetadataList, + LinkedList endPosForEachDevice) + throws IOException { + super(cmtFile, chunkGroupMetadataList, endPosForEachDevice); + this.input = new CompactionTsFileInput(compactionType, this.input); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java index b13475e4fe67..e51f26da14cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java @@ -19,6 +19,11 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.io; +import org.apache.iotdb.db.service.metrics.CompactionMetrics; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; + import org.apache.tsfile.exception.StopReadTsFileByInterruptException; import org.apache.tsfile.read.reader.TsFileInput; @@ -29,10 +34,33 @@ public class CompactionTsFileInput implements TsFileInput { private final TsFileInput tsFileInput; - public CompactionTsFileInput(TsFileInput tsFileInput) { + private long metadataOffset = -1; + + /** The type of compaction running. */ + private final CompactionType compactionType; + + /** A flag that indicates if an aligned series is being read. */ + private volatile boolean readingAlignedSeries = false; + + public CompactionTsFileInput(CompactionType compactionType, TsFileInput tsFileInput) { + this.compactionType = compactionType; this.tsFileInput = tsFileInput; } + public void setMetadataOffset(long metadataOffset) { + this.metadataOffset = metadataOffset; + } + + /** Marks the start of reading an aligned series. */ + public void markStartOfAlignedSeries() { + readingAlignedSeries = true; + } + + /** Marks the end of reading an aligned series. */ + public void markEndOfAlignedSeries() { + readingAlignedSeries = false; + } + @Override public long size() throws IOException { try { @@ -71,7 +99,9 @@ public TsFileInput position(long newPosition) throws IOException { @Override public int read(ByteBuffer dst) throws IOException { + acquireReadDataSizeWithCompactionReadRateLimiter(dst.remaining()); int readSize = tsFileInput.read(dst); + updateMetrics(position(), readSize); if (Thread.currentThread().isInterrupted()) { throw new StopReadTsFileByInterruptException(); } @@ -80,7 +110,9 @@ public int read(ByteBuffer dst) throws IOException { @Override public int read(ByteBuffer dst, long position) throws IOException { + acquireReadDataSizeWithCompactionReadRateLimiter(dst.remaining()); int readSize = tsFileInput.read(dst, position); + updateMetrics(position, readSize); if (Thread.currentThread().isInterrupted()) { throw new StopReadTsFileByInterruptException(); } @@ -89,7 +121,7 @@ public int read(ByteBuffer dst, long position) throws IOException { @Override public InputStream wrapAsInputStream() throws IOException { - return tsFileInput.wrapAsInputStream(); + return new CompactionTsFileInputStreamWrapper(tsFileInput.wrapAsInputStream()); } @Override @@ -101,4 +133,90 @@ public void close() throws IOException { public String getFilePath() { return tsFileInput.getFilePath(); } + + private void acquireReadDataSizeWithCompactionReadRateLimiter(int readDataSize) { + CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1); + CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize); + } + + private void updateMetrics(long position, long totalSize) { + if (position >= metadataOffset) { + CompactionMetrics.getInstance() + .recordReadInfo(compactionType, CompactionIoDataType.METADATA, totalSize); + } else { + CompactionMetrics.getInstance() + .recordReadInfo( + compactionType, + readingAlignedSeries + ? CompactionIoDataType.ALIGNED + : CompactionIoDataType.NOT_ALIGNED, + totalSize); + } + } + + private class CompactionTsFileInputStreamWrapper extends InputStream { + + private final InputStream inputStream; + + public CompactionTsFileInputStreamWrapper(InputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public int read() throws IOException { + acquireReadDataSizeWithCompactionReadRateLimiter(1); + long position = position(); + int readSize = inputStream.read(); + updateMetrics(position, readSize); + return readSize; + } + + @Override + public int read(byte[] b) throws IOException { + acquireReadDataSizeWithCompactionReadRateLimiter(b.length); + long position = position(); + int readSize = inputStream.read(b); + updateMetrics(position, readSize); + return readSize; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + acquireReadDataSizeWithCompactionReadRateLimiter(len); + long position = position(); + int readSize = inputStream.read(b, off, len); + updateMetrics(position, readSize); + return readSize; + } + + @Override + public long skip(long n) throws IOException { + return inputStream.skip(n); + } + + @Override + public int available() throws IOException { + return inputStream.available(); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } + + @Override + public synchronized void mark(int readlimit) { + inputStream.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + inputStream.reset(); + } + + @Override + public boolean markSupported() { + return inputStream.markSupported(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java index bf98611db2c2..7aabb0856433 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java @@ -19,29 +19,22 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.io; -import org.apache.iotdb.db.service.metrics.CompactionMetrics; -import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; -import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.tsfile.file.IMetadataIndexEntry; import org.apache.tsfile.file.header.ChunkHeader; -import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.MetadataIndexNode; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType; import org.apache.tsfile.read.TsFileSequenceReader; -import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.utils.Pair; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.LongConsumer; /** * This class extends the TsFileSequenceReader class to read and manage TsFile with a focus on @@ -50,14 +43,6 @@ */ public class CompactionTsFileReader extends TsFileSequenceReader { - private long metadataOffset = 0; - - /** The type of compaction running. */ - CompactionType compactionType; - - /** A flag that indicates if an aligned series is being read. */ - private volatile boolean readingAlignedSeries = false; - /** * Constructs a new instance of CompactionTsFileReader. * @@ -67,44 +52,20 @@ public class CompactionTsFileReader extends TsFileSequenceReader { */ public CompactionTsFileReader(String file, CompactionType compactionType) throws IOException { super(file); - this.tsFileInput = new CompactionTsFileInput(tsFileInput); - this.compactionType = compactionType; - this.metadataOffset = readFileMetadata().getMetaOffset(); - } - - @Override - protected ByteBuffer readData(long position, int totalSize, LongConsumer ioSizeRecorder) - throws IOException { - acquireReadDataSizeWithCompactionReadRateLimiter(totalSize); - ByteBuffer buffer = super.readData(position, totalSize, ioSizeRecorder); - if (position >= metadataOffset) { - CompactionMetrics.getInstance() - .recordReadInfo(compactionType, CompactionIoDataType.METADATA, totalSize); - } else { - CompactionMetrics.getInstance() - .recordReadInfo( - compactionType, - readingAlignedSeries - ? CompactionIoDataType.ALIGNED - : CompactionIoDataType.NOT_ALIGNED, - totalSize); - } - return buffer; + CompactionTsFileInput compactionTsFileInput = + new CompactionTsFileInput(compactionType, tsFileInput); + this.tsFileInput = compactionTsFileInput; + compactionTsFileInput.setMetadataOffset(readFileMetadata().getMetaOffset()); } /** Marks the start of reading an aligned series. */ public void markStartOfAlignedSeries() { - readingAlignedSeries = true; + ((CompactionTsFileInput) tsFileInput).markStartOfAlignedSeries(); } /** Marks the end of reading an aligned series. */ public void markEndOfAlignedSeries() { - readingAlignedSeries = false; - } - - @Override - public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { - return super.readMemChunk(metaData); + ((CompactionTsFileInput) tsFileInput).markEndOfAlignedSeries(); } @SuppressWarnings("java:S2177") @@ -112,10 +73,6 @@ public ChunkHeader readChunkHeader(long position) throws IOException { return ChunkHeader.deserializeFrom(tsFileInput, position); } - public InputStream wrapAsInputStream() throws IOException { - return this.tsFileInput.wrapAsInputStream(); - } - public Map>> getTimeseriesMetadataAndOffsetByDevice( MetadataIndexNode measurementNode, @@ -186,11 +143,6 @@ public Map> getTimeseriesMetadataOffsetByDevice( return timeseriesMetadataOffsetMap; } - private void acquireReadDataSizeWithCompactionReadRateLimiter(int readDataSize) { - CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1); - CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize); - } - @Override public boolean equals(Object o) { return super.equals(o); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java index 7d89747d2f65..c761b18fc713 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java @@ -36,6 +36,8 @@ import org.apache.tsfile.write.chunk.IChunkWriter; import org.apache.tsfile.write.record.Tablet.ColumnCategory; import org.apache.tsfile.write.writer.TsFileIOWriter; +import org.apache.tsfile.write.writer.tsmiterator.DiskTSMIterator; +import org.apache.tsfile.write.writer.tsmiterator.TSMIterator; import java.io.File; import java.io.IOException; @@ -150,6 +152,13 @@ public void endFile() throws IOException { .recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize); } + @Override + protected TSMIterator getTSMIterator() throws IOException { + return hasChunkMetadataInDisk + ? new CompactionDiskTSMIterator(type, chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice) + : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList); + } + public boolean isEmptyTargetFile() { return isEmptyTargetFile; }