Skip to content

Commit

Permalink
Reads of the .meta file are not included in the compaction I/O metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
shuwenwei committed Jan 24, 2025
1 parent 52da698 commit 0bb9901
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.compaction.io;

import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;

import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.tsfile.write.writer.tsmiterator.DiskTSMIterator;

import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

/**
 * A {@link DiskTSMIterator} for compaction target files. It wraps the iterator's underlying
 * input with a {@link CompactionTsFileInput} so that reads of the temporary chunk-metadata
 * (.cmt/.meta) file are rate-limited and recorded in the compaction I/O metrics.
 */
public class CompactionDiskTSMIterator extends DiskTSMIterator {

  public CompactionDiskTSMIterator(
      CompactionType compactionType,
      File cmtFile,
      List<ChunkGroupMetadata> chunkGroupMetadataList,
      LinkedList<Long> endPosForEachDevice)
      throws IOException {
    // Let the parent open the .cmt file first, then wrap the opened input so every
    // subsequent read goes through the compaction rate-limiting/metrics layer.
    super(cmtFile, chunkGroupMetadataList, endPosForEachDevice);
    this.input = new CompactionTsFileInput(compactionType, this.input);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

package org.apache.iotdb.db.storageengine.dataregion.compaction.io;

import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;

import org.apache.tsfile.exception.StopReadTsFileByInterruptException;
import org.apache.tsfile.read.reader.TsFileInput;

Expand All @@ -29,10 +34,33 @@
public class CompactionTsFileInput implements TsFileInput {
private final TsFileInput tsFileInput;

public CompactionTsFileInput(TsFileInput tsFileInput) {
private long metadataOffset = -1;

/** The type of compaction running. */
private final CompactionType compactionType;

/** A flag that indicates if an aligned series is being read. */
private volatile boolean readingAlignedSeries = false;

/**
 * Wraps the given input so that compaction reads are rate-limited and attributed to the given
 * compaction type in the I/O metrics.
 *
 * @param compactionType the type of compaction performing the reads
 * @param tsFileInput the underlying input every operation delegates to
 */
public CompactionTsFileInput(CompactionType compactionType, TsFileInput tsFileInput) {
  this.tsFileInput = tsFileInput;
  this.compactionType = compactionType;
}

/**
 * Sets the file offset where the metadata section starts. Reads at or beyond this offset are
 * recorded as METADATA I/O; reads before it are recorded as (non-)aligned data I/O.
 *
 * <p>Until this is called the field keeps its default of -1, so every read is classified as
 * METADATA (e.g. the reads needed to load the file metadata itself).
 *
 * @param metadataOffset start offset of the metadata section in the file
 */
public void setMetadataOffset(long metadataOffset) {
  this.metadataOffset = metadataOffset;
}

/**
 * Marks the start of reading an aligned series: subsequent data reads are recorded under
 * {@code CompactionIoDataType.ALIGNED}.
 */
public void markStartOfAlignedSeries() {
  readingAlignedSeries = true;
}

/**
 * Marks the end of reading an aligned series: subsequent data reads are recorded under
 * {@code CompactionIoDataType.NOT_ALIGNED}.
 */
public void markEndOfAlignedSeries() {
  readingAlignedSeries = false;
}

@Override
public long size() throws IOException {
try {
Expand Down Expand Up @@ -71,7 +99,9 @@ public TsFileInput position(long newPosition) throws IOException {

@Override
public int read(ByteBuffer dst) throws IOException {
acquireReadDataSizeWithCompactionReadRateLimiter(dst.remaining());
int readSize = tsFileInput.read(dst);
updateMetrics(position(), readSize);
if (Thread.currentThread().isInterrupted()) {
throw new StopReadTsFileByInterruptException();
}
Expand All @@ -80,7 +110,9 @@ public int read(ByteBuffer dst) throws IOException {

@Override
public int read(ByteBuffer dst, long position) throws IOException {
acquireReadDataSizeWithCompactionReadRateLimiter(dst.remaining());
int readSize = tsFileInput.read(dst, position);
updateMetrics(position, readSize);
if (Thread.currentThread().isInterrupted()) {
throw new StopReadTsFileByInterruptException();
}
Expand All @@ -89,7 +121,7 @@ public int read(ByteBuffer dst, long position) throws IOException {

@Override
public InputStream wrapAsInputStream() throws IOException {
return tsFileInput.wrapAsInputStream();
return new CompactionTsFileInputStreamWrapper(tsFileInput.wrapAsInputStream());
}

@Override
Expand All @@ -101,4 +133,90 @@ public void close() throws IOException {
/** Returns the path of the wrapped file; plain delegation, no metrics involved. */
public String getFilePath() {
  return tsFileInput.getFilePath();
}

/**
 * Applies compaction read rate limiting before a read happens: one permit from the per-operation
 * limiter, plus {@code readDataSize} permits from the per-byte limiter. The acquires may block
 * until permits are available (rate-limiter semantics).
 *
 * @param readDataSize number of bytes the caller intends to read
 */
private void acquireReadDataSizeWithCompactionReadRateLimiter(int readDataSize) {
  CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
  CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
}

/**
 * Records a completed read in the compaction metrics: classified as METADATA when the read
 * position falls in the metadata section, otherwise as aligned/non-aligned data depending on
 * the current series type.
 *
 * <p>While {@code metadataOffset} is still -1 (not yet set), {@code position >= -1} always
 * holds, so every read counts as METADATA; this covers reads performed to load the file
 * metadata itself.
 *
 * @param position file position the read started at
 * @param totalSize number of bytes actually read; non-positive values are ignored
 */
private void updateMetrics(long position, long totalSize) {
  if (totalSize <= 0) {
    // Callers pass through read(...) results, which are -1 at EOF; recording a negative
    // size would corrupt the accumulated I/O counters.
    return;
  }
  if (position >= metadataOffset) {
    CompactionMetrics.getInstance()
        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, totalSize);
  } else {
    CompactionMetrics.getInstance()
        .recordReadInfo(
            compactionType,
            readingAlignedSeries
                ? CompactionIoDataType.ALIGNED
                : CompactionIoDataType.NOT_ALIGNED,
            totalSize);
  }
}

/**
 * An {@link InputStream} view over the compaction input that applies the same rate limiting
 * and metrics recording as the enclosing {@link CompactionTsFileInput}. The stream position is
 * taken from the enclosing input's {@code position()} before each read so the read can be
 * classified against {@code metadataOffset}.
 */
private class CompactionTsFileInputStreamWrapper extends InputStream {

  private final InputStream inputStream;

  public CompactionTsFileInputStreamWrapper(InputStream inputStream) {
    this.inputStream = inputStream;
  }

  @Override
  public int read() throws IOException {
    acquireReadDataSizeWithCompactionReadRateLimiter(1);
    long position = position();
    int value = inputStream.read();
    // InputStream.read() returns the byte VALUE (0-255) or -1 at EOF, not a byte count:
    // record exactly one byte on success and nothing on EOF. (The previous code recorded
    // the byte value itself as the read size, skewing the metrics.)
    updateMetrics(position, value == -1 ? 0 : 1);
    return value;
  }

  @Override
  public int read(byte[] b) throws IOException {
    acquireReadDataSizeWithCompactionReadRateLimiter(b.length);
    long position = position();
    int readSize = inputStream.read(b);
    // readSize is -1 at EOF; never record a negative size.
    updateMetrics(position, Math.max(readSize, 0));
    return readSize;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    acquireReadDataSizeWithCompactionReadRateLimiter(len);
    long position = position();
    int readSize = inputStream.read(b, off, len);
    // readSize is -1 at EOF; never record a negative size.
    updateMetrics(position, Math.max(readSize, 0));
    return readSize;
  }

  @Override
  public long skip(long n) throws IOException {
    // Skipping does not transfer data to the caller, so it is neither rate-limited
    // nor recorded.
    return inputStream.skip(n);
  }

  @Override
  public int available() throws IOException {
    return inputStream.available();
  }

  @Override
  public void close() throws IOException {
    inputStream.close();
  }

  @Override
  public synchronized void mark(int readlimit) {
    inputStream.mark(readlimit);
  }

  @Override
  public synchronized void reset() throws IOException {
    inputStream.reset();
  }

  @Override
  public boolean markSupported() {
    return inputStream.markSupported();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,22 @@

package org.apache.iotdb.db.storageengine.dataregion.compaction.io;

import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;

import org.apache.tsfile.file.IMetadataIndexEntry;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.utils.Pair;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongConsumer;

/**
* This class extends the TsFileSequenceReader class to read and manage TsFile with a focus on
Expand All @@ -50,14 +43,6 @@
*/
public class CompactionTsFileReader extends TsFileSequenceReader {

private long metadataOffset = 0;

/** The type of compaction running. */
CompactionType compactionType;

/** A flag that indicates if an aligned series is being read. */
private volatile boolean readingAlignedSeries = false;

/**
* Constructs a new instance of CompactionTsFileReader.
*
Expand All @@ -67,55 +52,27 @@ public class CompactionTsFileReader extends TsFileSequenceReader {
*/
public CompactionTsFileReader(String file, CompactionType compactionType) throws IOException {
super(file);
this.tsFileInput = new CompactionTsFileInput(tsFileInput);
this.compactionType = compactionType;
this.metadataOffset = readFileMetadata().getMetaOffset();
}

@Override
protected ByteBuffer readData(long position, int totalSize, LongConsumer ioSizeRecorder)
throws IOException {
acquireReadDataSizeWithCompactionReadRateLimiter(totalSize);
ByteBuffer buffer = super.readData(position, totalSize, ioSizeRecorder);
if (position >= metadataOffset) {
CompactionMetrics.getInstance()
.recordReadInfo(compactionType, CompactionIoDataType.METADATA, totalSize);
} else {
CompactionMetrics.getInstance()
.recordReadInfo(
compactionType,
readingAlignedSeries
? CompactionIoDataType.ALIGNED
: CompactionIoDataType.NOT_ALIGNED,
totalSize);
}
return buffer;
CompactionTsFileInput compactionTsFileInput =
new CompactionTsFileInput(compactionType, tsFileInput);
this.tsFileInput = compactionTsFileInput;
compactionTsFileInput.setMetadataOffset(readFileMetadata().getMetaOffset());
}

/** Marks the start of reading an aligned series. */
public void markStartOfAlignedSeries() {
readingAlignedSeries = true;
((CompactionTsFileInput) tsFileInput).markStartOfAlignedSeries();
}

/** Marks the end of reading an aligned series. */
public void markEndOfAlignedSeries() {
readingAlignedSeries = false;
}

@Override
public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
return super.readMemChunk(metaData);
((CompactionTsFileInput) tsFileInput).markEndOfAlignedSeries();
}

/**
 * Reads and deserializes a chunk header starting at the given file position.
 *
 * <p>The warning suppression covers the Sonar rule about methods whose names resemble a
 * superclass method without overriding it — this overload is intentional.
 *
 * @param position file offset where the chunk header starts
 * @return the deserialized chunk header
 * @throws IOException if the underlying read fails
 */
@SuppressWarnings("java:S2177")
public ChunkHeader readChunkHeader(long position) throws IOException {
  return ChunkHeader.deserializeFrom(tsFileInput, position);
}

public InputStream wrapAsInputStream() throws IOException {
return this.tsFileInput.wrapAsInputStream();
}

public Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>>
getTimeseriesMetadataAndOffsetByDevice(
MetadataIndexNode measurementNode,
Expand Down Expand Up @@ -186,11 +143,6 @@ public Map<String, Pair<Long, Long>> getTimeseriesMetadataOffsetByDevice(
return timeseriesMetadataOffsetMap;
}

private void acquireReadDataSizeWithCompactionReadRateLimiter(int readDataSize) {
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
}

@Override
public boolean equals(Object o) {
return super.equals(o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.tsfile.write.chunk.IChunkWriter;
import org.apache.tsfile.write.record.Tablet.ColumnCategory;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.apache.tsfile.write.writer.tsmiterator.DiskTSMIterator;
import org.apache.tsfile.write.writer.tsmiterator.TSMIterator;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -150,6 +152,13 @@ public void endFile() throws IOException {
.recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize);
}

/**
 * Supplies the iterator over timeseries metadata used when finishing the file. When chunk
 * metadata has been spilled to disk, a compaction-aware disk iterator is used so the reads of
 * the spill file are included in the compaction I/O metrics; otherwise the in-memory chunk
 * group metadata is iterated directly.
 */
@Override
protected TSMIterator getTSMIterator() throws IOException {
  if (hasChunkMetadataInDisk) {
    return new CompactionDiskTSMIterator(
        type, chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice);
  }
  return TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList);
}

/**
 * Returns whether the target file produced by this writer is empty.
 *
 * <p>NOTE(review): where the {@code isEmptyTargetFile} flag is set is not visible here —
 * confirm its exact semantics against the writer's endFile logic.
 */
public boolean isEmptyTargetFile() {
  return isEmptyTargetFile;
}
Expand Down

0 comments on commit 0bb9901

Please sign in to comment.