Tvlist feat new #14616

Open
wants to merge 56 commits into
base: force_ci/split_chunk
56 commits
ef5de53
null bitmap for int tvlist
shizy818 Nov 27, 2024
7a15073
update min/max timestamp and sequential part of tvlist during insert
shizy818 Nov 30, 2024
c28fe3f
mutable & immutable tvlists in writable memchunk
shizy818 Dec 1, 2024
6df2e10
copy-on-write array list
shizy818 Dec 1, 2024
435d5da
review comments part 1
shizy818 Dec 3, 2024
d5fad22
fix unit test errors
shizy818 Dec 3, 2024
402c869
review comments part 2
shizy818 Dec 4, 2024
5aaa59b
push down global time filter
shizy818 Dec 5, 2024
cfc9e74
fix MemPageReaderTest case
shizy818 Dec 5, 2024
cd3f7d9
fix memory page offsets error
shizy818 Dec 5, 2024
e3d2d5e
synchronized sort & MergeSortTvListIterator bug
shizy818 Dec 6, 2024
73e9f83
tvlist_sort_threshold config property
shizy818 Dec 7, 2024
0545ff1
bug fix:
shizy818 Dec 11, 2024
2fcdcdb
optimize TVListIterator & MergeSortTvListIterator
shizy818 Dec 11, 2024
45db409
retrofit encode when tvlist_sort_threshold is zero
shizy818 Dec 13, 2024
9411fea
delay sort & statistic generation to query execution
shizy818 Dec 11, 2024
e3346d3
fix: skip deleted data during encode
shizy818 Dec 13, 2024
8e0f8e5
aligned time series part
shizy818 Dec 16, 2024
6eedcf6
fix: MemAlignedChunkReader page offset
shizy818 Dec 18, 2024
0a9c4ed
performance issue:
shizy818 Dec 17, 2024
9b0a6f8
fix: memory chunk reader may read more points than expected in one page
shizy818 Dec 23, 2024
23bf1f7
update chunk & page statistic for aligned memchunk by column
shizy818 Dec 21, 2024
298e16c
revert: getAlignedValueForQuery
shizy818 Dec 24, 2024
359929d
fix: * CopyOnWriteArrayList for AlignedTVList bitmaps
shizy818 Dec 25, 2024
6817370
refactor: Tim/Quick/Backward TVList
shizy818 Dec 25, 2024
a8beea5
refactor: synchronized tvlist method: sort, putXXX
shizy818 Dec 25, 2024
6369293
refactor: change list to array in AlignedTVList iterator
shizy818 Dec 25, 2024
a80bcc0
revert: remove CopyOnWriteArrayList
shizy818 Dec 26, 2024
3e0b904
refactor: clone MergeSort iterator from ReadOnlyChunk
shizy818 Dec 27, 2024
ca08605
fix: clone working tvlist during flush if there is query on it
shizy818 Dec 28, 2024
f23a72b
fix: writable mem chunk flush conditions
shizy818 Dec 30, 2024
8c7db55
refactor: add annotation and variable/function rename
shizy818 Dec 31, 2024
086f7be
merge force_ci/split_chunk
shizy818 Jan 1, 2025
36aa04d
fix: * remove delete method in BinaryTVList
shizy818 Jan 2, 2025
809ebd5
fix: remove getSortedTvListForQuery in SeriesRegionScan
shizy818 Jan 2, 2025
ab17aea
fix: TsFileProcessorTest unit test
shizy818 Jan 2, 2025
044d1ab
fix: IoTDBNullIdQueryIT.noMeasurementColumnsSelectTest
shizy818 Jan 2, 2025
1d2a840
fix: delete column of aligned time series
shizy818 Jan 5, 2025
93b9eeb
fix: aligned timeseries encode bug
shizy818 Jan 5, 2025
46775f0
fix: IoTDBGroupByNaturalMonthIT
shizy818 Jan 5, 2025
3501b8f
remove avgSeriesPointNumberThreshold setting
shizy818 Jan 5, 2025
062adc7
fix: IoTDBDeleteAlignedTimeseriesIT & AlignedTVListTest
shizy818 Jan 6, 2025
7c2c11d
fix: Copy globalTimeFilter due to GroupByMonthFilter
shizy818 Jan 6, 2025
edf7952
reset tmpLength for backward sort
shizy818 Jan 6, 2025
d6bf701
* fix TVList clear
shizy818 Jan 10, 2025
89ad4d5
hot-load TVLIST_SORT_THRESHOLD
shizy818 Jan 10, 2025
a3feeee
fix: isNullValue caller
shizy818 Jan 10, 2025
cb7e13a
fix unit test
shizy818 Jan 11, 2025
1dc4192
refactor: abstract prepareTvListMapForQuery method
shizy818 Jan 11, 2025
bab27fd
refactor: clear/clone/expand indices and bitmap
shizy818 Jan 12, 2025
8f78b21
merge sort using min heap
shizy818 Jan 12, 2025
11f376c
fix: WritableMemChunk deserialize
shizy818 Jan 13, 2025
736e596
feat: add index mem cost for TVList
shizy818 Jan 14, 2025
7fe1f85
fix: hot-load tvlist_sort_threshold setting
shizy818 Jan 14, 2025
b09904e
merge force_ci/split_chunk
shizy818 Jan 22, 2025
0900011
remove needless line in property template
shizy818 Jan 23, 2025
@@ -433,6 +433,12 @@ public class IoTDBConfig {
/** The sort algorithm used in TVList */
private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM;

/**
* The threshold at which the working TVList is sorted and appended to the list of immutable
* TVLists in the writable memtable.
*/
private int tvListSortThreshold = 0;

/** Enable inner space compaction for sequence files */
private volatile boolean enableSeqSpaceCompaction = true;

@@ -2306,6 +2312,14 @@ public void setTvListSortAlgorithm(TVListSortAlgorithm tvListSortAlgorithm) {
this.tvListSortAlgorithm = tvListSortAlgorithm;
}

public int getTvListSortThreshold() {
return tvListSortThreshold;
}

public void setTVListSortThreshold(int tvListSortThreshold) {
this.tvListSortThreshold = tvListSortThreshold;
}

public boolean isRpcThriftCompressionEnable() {
return rpcThriftCompressionEnable;
}
@@ -386,6 +386,11 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
properties.getProperty(
"tvlist_sort_algorithm", conf.getTvListSortAlgorithm().toString())));

conf.setTVListSortThreshold(
Integer.parseInt(
properties.getProperty(
"tvlist_sort_threshold", Integer.toString(conf.getTvListSortThreshold()))));

conf.setCheckPeriodWhenInsertBlocked(
Integer.parseInt(
properties.getProperty(
@@ -2059,6 +2064,13 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
loadQuerySampleThroughput(properties);
// update trusted_uri_pattern
loadTrustedUriPattern(properties);

// tvlist_sort_threshold
conf.setTVListSortThreshold(
Integer.parseInt(
properties.getProperty(
"tvlist_sort_threshold",
ConfigurationFileUtils.getConfigurationDefaultValue("tvlist_sort_threshold"))));
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
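Both loading paths above follow the same parse-with-default pattern and differ only in where the fallback value comes from. The following is a minimal sketch of that pattern, assuming plain `java.util.Properties` in place of IoTDB's `TrimProperties`; the class `ThresholdLoader` and its method are hypothetical names used only for illustration.

```java
import java.util.Properties;

/** Hypothetical helper, not part of IoTDB: reads tvlist_sort_threshold with a fallback. */
final class ThresholdLoader {
  private ThresholdLoader() {}

  /**
   * Returns the configured threshold, or fallback when the key is absent.
   * A non-numeric value raises NumberFormatException, as Integer.parseInt does.
   */
  static int loadSortThreshold(Properties properties, int fallback) {
    String raw = properties.getProperty("tvlist_sort_threshold", Integer.toString(fallback));
    // Trimming is done here because plain Properties does not trim values.
    return Integer.parseInt(raw.trim());
  }
}
```

Passing the current in-memory value as the fallback keeps a running setting when the key is missing, whereas passing the template default resets it; which behaviour is wanted on hot-load is a design choice.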
@@ -41,6 +41,7 @@
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;

import org.apache.tsfile.file.metadata.IDeviceID;
@@ -649,6 +650,40 @@ public void releaseResourceWhenAllDriversAreClosed() {
releaseResource();
}

/**
* Checks every TVList referenced by this query:
* 1. If this query is not the owner, it only removes itself from the TVList's query context list.
* 2. If this query is the owner and no other query is using the TVList, the TVList is released.
* 3. If this query is the owner and other queries are still using the TVList, ownership passes
* to the next query in the list.
*/
private void releaseTVListOwnedByQuery() {
for (TVList tvList : tvListSet) {
tvList.lockQueryList();
List<QueryContext> queryContextList = tvList.getQueryContextList();
try {
queryContextList.remove(this);
if (tvList.getOwnerQuery() == this) {
if (queryContextList.isEmpty()) {
LOGGER.debug(
"TVList {} is released by the query, FragmentInstance Id is {}",
tvList,
this.getId());
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
tvList.clear();
} else {
LOGGER.debug(
"TVList {} is now owned by another query, FragmentInstance Id is {}",
tvList,
((FragmentInstanceContext) queryContextList.get(0)).getId());
tvList.setOwnerQuery(queryContextList.get(0));
}
}
} finally {
tvList.unlockQueryList();
}
}
}

/**
* All file paths used by this fragment instance must be cleared and thus the usage reference must
* be decreased.
@@ -669,6 +704,9 @@ public synchronized void releaseResource() {
unClosedFilePaths = null;
}

// release TVList/AlignedTVList owned by current query
releaseTVListOwnedByQuery();

dataRegion = null;
globalTimeFilter = null;
sharedQueryDataSource = null;
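The three release cases handled by `releaseTVListOwnedByQuery` can be sketched in isolation. This is a simplified model, not the IoTDB implementation: `SharedList` is a hypothetical stand-in for a TVList, queries are identified by plain strings, and memory accounting is reduced to a `released` flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a list shared by several queries; not the IoTDB TVList. */
final class SharedList {
  private final ReentrantLock queryListLock = new ReentrantLock();
  private final List<String> queryIds = new ArrayList<>();
  private String ownerQueryId;
  private boolean released;

  /** Adds a query to the reader list, optionally making it the owner. */
  void register(String queryId, boolean asOwner) {
    queryListLock.lock();
    try {
      queryIds.add(queryId);
      if (asOwner) {
        ownerQueryId = queryId;
      }
    } finally {
      queryListLock.unlock();
    }
  }

  /** Removes a query and applies the owner hand-off rules. */
  void release(String queryId) {
    queryListLock.lock();
    try {
      queryIds.remove(queryId);
      if (!queryId.equals(ownerQueryId)) {
        return; // case 1: not the owner, nothing else to do
      }
      if (queryIds.isEmpty()) {
        released = true; // case 2: owner and last reader, free the list
        ownerQueryId = null;
      } else {
        ownerQueryId = queryIds.get(0); // case 3: hand ownership to the next reader
      }
    } finally {
      queryListLock.unlock();
    }
  }

  String owner() {
    return ownerQueryId;
  }

  boolean isReleased() {
    return released;
  }
}
```

Doing the removal and the ownership check under one lock means a concurrent query cannot register between the emptiness check and the release.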
@@ -28,13 +28,15 @@
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory.ModsSerializer;
import org.apache.iotdb.db.utils.datastructure.TVList;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -71,6 +73,9 @@ public class QueryContext {

private final Set<TsFileID> nonExistentModFiles = new CopyOnWriteArraySet<>();

// referenced TVLists for the query
protected final Set<TVList> tvListSet = new HashSet<>();

public QueryContext() {}

public QueryContext(long queryId) {
@@ -214,4 +219,8 @@ public boolean isIgnoreAllNullRows() {
public void setIgnoreAllNullRows(boolean ignoreAllNullRows) {
this.ignoreAllNullRows = ignoreAllNullRows;
}

public void addTVListToSet(Map<TVList, Integer> tvListMap) {
tvListSet.addAll(tvListMap.keySet());
}
}
@@ -23,7 +23,9 @@
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedReadOnlyMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedWritableMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedWritableMemChunkGroup;
@@ -48,15 +50,19 @@
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.VectorMeasurementSchema;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@@ -67,6 +73,7 @@
* MeasurementPath have different implementations, and the default PartialPath should not use it.
*/
public abstract class ResourceByPathUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ResourceByPathUtils.class);

public static ResourceByPathUtils getResourceInstance(IFullPath path) {
if (path instanceof AlignedFullPath) {
@@ -85,14 +92,118 @@ public abstract ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
QueryContext context,
IMemTable memTable,
List<Pair<ModEntry, IMemTable>> modsToMemtable,
- long timeLowerBound)
+ long timeLowerBound,
+ Filter globalTimeFilter)
throws QueryProcessException, IOException;

public abstract List<IChunkMetadata> getVisibleMetadataListFromWriter(
RestorableTsFileIOWriter writer,
TsFileResource tsFileResource,
QueryContext context,
long timeLowerBound);

/**
* Prepares the TVList references for the query. The row count of each TVList is recorded here
* and used later, during operator execution, to decide whether the TVList needs sorting. The
* sorted list needs no extra protection: it is only changed during the insert handover, which
* holds the data region write lock, while the query thread holds the data region read lock at
* this point.
*
* @param context query context
* @param memChunk writable memchunk
* @param isWorkMemTable true if the memchunk is in the working memtable, false if it is in a
* flushing memtable
* @param globalTimeFilter global time filter
* @return map from each TVList selected for the query to its row count at preparation time
*/
protected Map<TVList, Integer> prepareTvListMapForQuery(
QueryContext context,
IWritableMemChunk memChunk,
boolean isWorkMemTable,
Filter globalTimeFilter) {
// should copy globalTimeFilter because GroupByMonthFilter is stateful
Filter copyTimeFilter = null;
if (globalTimeFilter != null) {
copyTimeFilter = globalTimeFilter.copy();
}

Map<TVList, Integer> tvListQueryMap = new LinkedHashMap<>();
// immutable sorted lists
for (TVList tvList : memChunk.getSortedList()) {
if (copyTimeFilter != null
&& !copyTimeFilter.satisfyStartEndTime(tvList.getMinTime(), tvList.getMaxTime())) {
continue;
}
tvList.lockQueryList();
try {
LOGGER.debug(
"Flushing/Working MemTable - add current query context to immutable TVList's query list");
tvList.getQueryContextList().add(context);
tvListQueryMap.put(tvList, tvList.rowCount());
} finally {
tvList.unlockQueryList();
}
}

// mutable tvlist
TVList list = memChunk.getWorkingTVList();
TVList cloneList = null;
list.lockQueryList();
try {
if (copyTimeFilter != null
&& !copyTimeFilter.satisfyStartEndTime(list.getMinTime(), list.getMaxTime())) {
return tvListQueryMap;
}

if (!isWorkMemTable) {
LOGGER.debug(
"Flushing MemTable - add current query context to mutable TVList's query list");
list.getQueryContextList().add(context);
tvListQueryMap.put(list, list.rowCount());
} else {
if (list.isSorted() || list.getQueryContextList().isEmpty()) {
LOGGER.debug(
"Working MemTable - add current query context to mutable TVList's query list when it's sorted or no other query on it");
list.getQueryContextList().add(context);
tvListQueryMap.put(list, list.rowCount());
} else {
/*
* +----------------------+
* | MemTable |
* | |
* | +------------+ | +-----------------+
* | | TVList |<---+--+ +---+ Previous Query |
* | +-----^------+ | | | +-----------------+
* | | | | |
* +----------+-----------+ | | +----------------+
* | Clone +---+---+ Current Query |
* +-----+------+ | +----------------+
* | TVList | <---------+
* +------------+
*/
LOGGER.debug(
"Working MemTable - clone mutable TVList and replace old TVList in working MemTable");
QueryContext firstQuery = list.getQueryContextList().get(0);
// reserve query memory
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
}
list.setOwnerQuery(firstQuery);

// clone TVList
cloneList = list.clone();
cloneList.getQueryContextList().add(context);
tvListQueryMap.put(cloneList, cloneList.rowCount());
}
}
} finally {
list.unlockQueryList();
}
if (cloneList != null) {
memChunk.setWorkingTVList(cloneList);
}
return tvListQueryMap;
}
}
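The branch that decides between sharing and cloning the working list can be modelled on its own. The sketch below is a simplification under stated assumptions: `WorkingChunk` and `Buffer` are hypothetical classes, a single `synchronized` method replaces the query-list lock, and ownership and memory reservation are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a mem chunk's working list; not the IoTDB classes. */
final class WorkingChunk {
  /** Minimal list model: timestamps plus the ids of the queries reading it. */
  static final class Buffer {
    final List<Long> times = new ArrayList<>();
    final List<String> readers = new ArrayList<>();
    boolean sorted = true;

    void add(long t) {
      // Track whether the timestamps are still in non-decreasing order.
      if (!times.isEmpty() && t < times.get(times.size() - 1)) {
        sorted = false;
      }
      times.add(t);
    }

    /** Copies the data but not the reader list. */
    Buffer copyData() {
      Buffer copy = new Buffer();
      copy.times.addAll(times);
      copy.sorted = sorted;
      return copy;
    }
  }

  private Buffer working = new Buffer();

  synchronized void write(long t) {
    working.add(t);
  }

  synchronized Buffer working() {
    return working;
  }

  /** Returns the buffer the query should read, cloning only when it must. */
  synchronized Buffer acquireForQuery(String queryId) {
    Buffer current = working;
    if (current.sorted || current.readers.isEmpty()) {
      // Safe to share: either no sort is pending or nobody else is reading.
      current.readers.add(queryId);
      return current;
    }
    // Unsorted and already read by another query: leave the old buffer to the
    // earlier readers and continue on a clone that becomes the new working list.
    Buffer clone = current.copyData();
    clone.readers.add(queryId);
    working = clone;
    return clone;
  }
}
```

The clone costs one copy of the unsorted data, but it lets the earlier query keep a stable view while the newer one is free to sort its own copy.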

class AlignedResourceByPathUtils extends ResourceByPathUtils {
@@ -149,6 +260,8 @@ public AlignedTimeSeriesMetadata generateTimeSeriesMetadata(

for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) {
if (!memChunk.isEmpty()) {
memChunk.sortTvLists();
memChunk.initChunkMetaFromTvLists();
AlignedChunkMetadata alignedChunkMetadata =
(AlignedChunkMetadata) memChunk.getChunkMetaData();
timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics());
@@ -182,7 +295,8 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
QueryContext context,
IMemTable memTable,
List<Pair<ModEntry, IMemTable>> modsToMemtable,
- long timeLowerBound)
+ long timeLowerBound,
+ Filter globalTimeFilter)
throws QueryProcessException {
Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
IDeviceID deviceID = alignedFullPath.getDeviceId();
@@ -207,10 +321,15 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
}
}

- // get sorted tv list is synchronized so different query can get right sorted list reference
- TVList alignedTvListCopy =
- alignedMemChunk.getSortedTvListForQuery(
- alignedFullPath.getSchemaList(), context.isIgnoreAllNullRows());
+ // prepare AlignedTVList for query. It should clone TVList if necessary.
+ Map<TVList, Integer> alignedTvListQueryMap =
+ prepareTvListMapForQuery(
+ context, alignedMemChunk, modsToMemtable == null, globalTimeFilter);
+
+ // column index list for the query
+ List<Integer> columnIndexList =
+ alignedMemChunk.buildColumnIndexList(alignedFullPath.getSchemaList());
+
List<TimeRange> timeColumnDeletion = null;
List<List<TimeRange>> valueColumnsDeletionList = null;
if (modsToMemtable != null) {
@@ -232,8 +351,9 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
}
return new AlignedReadOnlyMemChunk(
context,
+ columnIndexList,
getMeasurementSchema(),
- alignedTvListCopy,
+ alignedTvListQueryMap,
timeColumnDeletion,
valueColumnsDeletionList);
}
@@ -352,6 +472,8 @@ public ITimeSeriesMetadata generateTimeSeriesMetadata(

for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) {
if (!memChunk.isEmpty()) {
memChunk.sortTvLists();
memChunk.initChunkMetaFromTvLists();
seriesStatistics.mergeStatistics(memChunk.getChunkMetaData().getStatistics());
}
}
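Calling `sortTvLists()` before `initChunkMetaFromTvLists()` reflects the move of sorting and statistics generation from write time to query time. A minimal sketch of that idea follows, assuming a row-count snapshot taken when the query registered; `LazySortedSeries` is a hypothetical class and only tracks timestamps, not values or deletions.

```java
import java.util.Arrays;

/** Hypothetical illustration of deferring sort and statistics to read time. */
final class LazySortedSeries {
  private final long[] times;
  private boolean sorted;

  /** Keeps only the first rowCountSnapshot rows, i.e. those visible to the query. */
  LazySortedSeries(long[] source, int rowCountSnapshot) {
    if (rowCountSnapshot <= 0 || rowCountSnapshot > source.length) {
      throw new IllegalArgumentException("row count snapshot out of range");
    }
    this.times = Arrays.copyOf(source, rowCountSnapshot);
  }

  /** Sorts on first use only; later calls are no-ops. */
  void sortIfNeeded() {
    if (!sorted) {
      Arrays.sort(times);
      sorted = true;
    }
  }

  /** Returns {minTime, maxTime}, sorting first if that has not happened yet. */
  long[] timeRange() {
    sortIfNeeded();
    return new long[] {times[0], times[times.length - 1]};
  }
}
```

Rows appended after the snapshot are ignored, so the statistics describe exactly the rows the query was promised.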
@@ -365,7 +487,8 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
QueryContext context,
IMemTable memTable,
List<Pair<ModEntry, IMemTable>> modsToMemtable,
- long timeLowerBound)
+ long timeLowerBound,
+ Filter globalTimeFilter)
throws QueryProcessException, IOException {
Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
IDeviceID deviceID = fullPath.getDeviceId();
@@ -376,8 +499,9 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
}
IWritableMemChunk memChunk =
memTableMap.get(deviceID).getMemChunkMap().get(fullPath.getMeasurement());
- // get sorted tv list is synchronized so different query can get right sorted list reference
- TVList chunkCopy = memChunk.getSortedTvListForQuery();
+ // prepare TVList for query. It should clone TVList if necessary.
+ Map<TVList, Integer> tvListQueryMap =
+ prepareTvListMapForQuery(context, memChunk, modsToMemtable == null, globalTimeFilter);
List<TimeRange> deletionList = null;
if (modsToMemtable != null) {
deletionList =
@@ -393,7 +517,7 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
fullPath.getMeasurement(),
fullPath.getMeasurementSchema().getType(),
fullPath.getMeasurementSchema().getEncodingType(),
- chunkCopy,
+ tvListQueryMap,
fullPath.getMeasurementSchema().getProps(),
deletionList);
}