Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong deviceId after PartialPath.concatAsMeasurementPath #14750

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,11 @@ public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
String.valueOf(loadTsFileAnalyzeSchemaMemorySizeInBytes));
return this;
}

@Override
public DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval) {
  // Push the interval down into this node's property set; the key unit is
  // milliseconds, per the property name. Returns this for fluent chaining.
  final String intervalMs = String.valueOf(compactionScheduleInterval);
  properties.setProperty("compaction_schedule_interval_in_ms", intervalMs);
  return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
return this;
}

@Override
public DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval) {
  // Intentional no-op: this environment ignores the compaction schedule interval
  // (presumably because it cannot rewrite node properties — TODO confirm).
  // Returning this keeps the fluent-chaining contract of DataNodeConfig.
  return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public interface DataNodeConfig {

/**
 * Sets the memory budget, in bytes, used for schema analysis when loading TsFiles.
 *
 * @param loadTsFileAnalyzeSchemaMemorySizeInBytes memory size in bytes
 * @return this config, for fluent chaining
 */
DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
    long loadTsFileAnalyzeSchemaMemorySizeInBytes);

/**
 * Sets the interval between compaction task schedule rounds, in milliseconds
 * (backed by the {@code compaction_schedule_interval_in_ms} property in concrete
 * implementations; no-op implementations may ignore it).
 *
 * @param compactionScheduleInterval schedule interval in milliseconds
 * @return this config, for fluent chaining
 */
DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval);
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static void setUpClass() {
.setPartitionInterval(1000)
.setMemtableSizeThreshold(10000);
// Adjust MemTable threshold size to make it flush automatically
EnvFactory.getEnv().getConfig().getDataNodeConfig().setCompactionScheduleInterval(5000);
EnvFactory.getEnv().initClusterEnvironment();
}

Expand Down Expand Up @@ -994,7 +995,13 @@ public void testConcurrentFlushAndSequentialDeletion()
threadPool.submit(
() ->
write(
writtenPointCounter, threadPool, fileNumMax, pointPerFile, deviceNum, testNum));
writtenPointCounter,
threadPool,
fileNumMax,
pointPerFile,
deviceNum,
testNum,
true));
int deletionRange = 150;
int deletionInterval = 1500;
Future<Void> deletionThread =
Expand Down Expand Up @@ -1041,7 +1048,13 @@ public void testConcurrentFlushAndRandomDeletion()
threadPool.submit(
() ->
write(
writtenPointCounter, threadPool, fileNumMax, pointPerFile, deviceNum, testNum));
writtenPointCounter,
threadPool,
fileNumMax,
pointPerFile,
deviceNum,
testNum,
true));
int deletionRange = 100;
int minIntervalToRecord = 1000;
Future<Void> deletionThread =
Expand Down Expand Up @@ -1094,7 +1107,8 @@ public void testConcurrentFlushAndRandomDeletionWithRestart()
fileNumMax,
pointPerFile,
deviceNum,
testNum));
testNum,
true));
int deletionRange = 100;
int minIntervalToRecord = 1000;
Future<Void> deletionThread =
Expand Down Expand Up @@ -1155,7 +1169,8 @@ private Void write(
int fileNumMax,
int pointPerFile,
int deviceNum,
int testNum)
int testNum,
boolean roundRobinDevice)
throws SQLException {

try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
Expand All @@ -1165,20 +1180,37 @@ private Void write(
statement.execute("use test");

statement.execute(
"create table if not exists table" + testNum + "(deviceId STRING TAG, s0 INT32 field)");
"create table if not exists table"
+ testNum
+ "(city TAG, deviceId STRING TAG, s0 INT32 field)");

for (int i = 1; i <= fileNumMax; i++) {
for (int j = 0; j < pointPerFile; j++) {
long time = writtenPointCounter.get() + 1;
statement.execute(
String.format(
"INSERT INTO test.table"
+ testNum
+ "(time, deviceId, s0) VALUES(%d,'d"
+ (time % deviceNum)
+ "',%d)",
time,
time));
if (roundRobinDevice) {
statement.execute(
String.format(
"INSERT INTO test.table"
+ testNum
+ "(time, city, deviceId, s0) VALUES(%d, 'bj', 'd"
+ (time % deviceNum)
+ "',%d)",
time,
time));
} else {
for (int d = 0; d < deviceNum; d++) {
statement.execute(
String.format(
"INSERT INTO test.table"
+ testNum
+ "(time, city, deviceId, s0) VALUES(%d, 'bj', 'd"
+ d
+ "',%d)",
time,
time));
}
}

writtenPointCounter.incrementAndGet();
if (Thread.interrupted()) {
return null;
Expand Down Expand Up @@ -1393,6 +1425,144 @@ private Void randomDeletion(
return null;
}

/**
 * Concurrently deletes random time ranges, one device at a time, from {@code
 * test.table<testNum>} while a writer thread advances {@code writtenPointCounter},
 * and after every deletion cross-checks the surviving per-device point count
 * against the bookkeeping in {@code deviceDeletedPointCounters}.
 *
 * @param writtenPointCounter highest timestamp written so far (shared with the writer)
 * @param deviceDeletedPointCounters per-device tally of points deleted by this method
 * @param allThreads pool shared with the other workers; shut down on fatal errors
 * @param fileNumMax number of files the writer is expected to produce
 * @param pointPerFile points per file; {@code fileNumMax * pointPerFile} bounds deletion
 * @param deletionRange maximum number of points removed by one delete statement
 * @param minIntervalToRecord minimum length a newly written interval must reach
 *     before it becomes a candidate range for deletion
 * @param testNum suffix identifying the table under test
 * @return always null ({@code Callable<Void>} convention)
 * @throws SQLException on non-recoverable database errors
 * @throws InterruptedException declared for {@code Thread.sleep}; interruption is
 *     converted into an early return with the interrupt flag restored
 */
private Void randomDeviceDeletion(
    AtomicLong writtenPointCounter,
    List<AtomicLong> deviceDeletedPointCounters,
    ExecutorService allThreads,
    int fileNumMax,
    int pointPerFile,
    int deletionRange,
    int minIntervalToRecord,
    int testNum)
    throws SQLException, InterruptedException {
  // delete random 'deletionRange' points each time
  // One list of not-yet-deleted candidate ranges per device.
  List<List<TimeRange>> allDeviceUndeletedRanges = new ArrayList<>();
  for (int i = 0; i < deviceDeletedPointCounters.size(); i++) {
    allDeviceUndeletedRanges.add(new ArrayList<>());
  }
  // pointPerFile * fileNumMax
  long deletionEnd = (long) fileNumMax * pointPerFile - 1;
  // NOTE(review): nextRangeStart is shared across all devices, so a newly matured
  // interval is recorded for only the first device whose iteration observes it —
  // confirm this uneven distribution of candidate ranges is intended.
  long nextRangeStart = 0;
  Random random = new Random();

  try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
      Statement statement = connection.createStatement()) {

    statement.execute("create database if not exists test");
    statement.execute("use test");
    // Keep going until the writer has finished AND every tracked range is consumed.
    while ((writtenPointCounter.get() < deletionEnd
            || allDeviceUndeletedRanges.stream().anyMatch(l -> !l.isEmpty()))
        && !Thread.interrupted()) {
      // record the newly inserted interval if it is long enough
      for (int i = 0; i < deviceDeletedPointCounters.size(); i++) {
        long currentWrittenTime = writtenPointCounter.get();
        List<TimeRange> deviceUndeletedRanges = allDeviceUndeletedRanges.get(i);

        if (currentWrittenTime - nextRangeStart >= minIntervalToRecord) {
          deviceUndeletedRanges.add(new TimeRange(nextRangeStart, currentWrittenTime));
          nextRangeStart = currentWrittenTime + 1;
        }
        if (deviceUndeletedRanges.isEmpty()) {
          // Nothing deletable for this device yet; back off briefly.
          // NOTE(review): this 'continue' advances to the next device, not the next
          // while-iteration — confirm that is the intent.
          Thread.sleep(10);
          continue;
        }
        // pick up a random range
        int rangeIndex = random.nextInt(deviceUndeletedRanges.size());
        TimeRange timeRange = deviceUndeletedRanges.get(rangeIndex);
        // delete a random part in the range
        LOGGER.debug("Pick up a range [{}, {}]", timeRange.getMin(), timeRange.getMax());
        long rangeDeletionStart;
        long timeRangeLength = timeRange.getMax() - timeRange.getMin() + 1;
        if (timeRangeLength == 1) {
          rangeDeletionStart = timeRange.getMin();
        } else {
          rangeDeletionStart = random.nextInt((int) (timeRangeLength - 1)) + timeRange.getMin();
        }
        long rangeDeletionEnd = Math.min(rangeDeletionStart + deletionRange, timeRange.getMax());
        LOGGER.debug("Deletion range [{}, {}]", rangeDeletionStart, rangeDeletionEnd);

        // Delete the chosen span for this device only.
        statement.execute(
            "delete from test.table"
                + testNum
                + " where time >= "
                + rangeDeletionStart
                + " and time <= "
                + rangeDeletionEnd
                + " and deviceId = 'd"
                + i
                + "'");
        deviceDeletedPointCounters.get(i).addAndGet(rangeDeletionEnd - rangeDeletionStart + 1);
        LOGGER.debug(
            "Deleted range [{}, {}], written points: {}, deleted points: {}",
            timeRange.getMin(),
            timeRange.getMax(),
            currentWrittenTime + 1,
            deviceDeletedPointCounters.get(i).get());

        // update the range
        if (rangeDeletionStart == timeRange.getMin() && rangeDeletionEnd == timeRange.getMax()) {
          // range fully deleted
          deviceUndeletedRanges.remove(rangeIndex);
        } else if (rangeDeletionStart == timeRange.getMin()) {
          // prefix deleted
          timeRange.setMin(rangeDeletionEnd + 1);
        } else if (rangeDeletionEnd == timeRange.getMax()) {
          // suffix deleted
          timeRange.setMax(rangeDeletionStart - 1);
        } else {
          // split into two ranges
          deviceUndeletedRanges.add(new TimeRange(rangeDeletionEnd + 1, timeRange.getMax()));
          timeRange.setMax(rangeDeletionStart - 1);
        }

        // check the point count
        // Expected: one point per timestamp <= currentWrittenTime, minus this
        // method's deletions for device i — assumes the writer inserts every
        // device at every timestamp (roundRobinDevice=false path); TODO confirm.
        try (ResultSet set =
            statement.executeQuery(
                "select count(*) from table"
                    + testNum
                    + " where time <= "
                    + currentWrittenTime
                    + " AND deviceId = 'd"
                    + i
                    + "'")) {
          assertTrue(set.next());
          long expectedCnt = currentWrittenTime + 1 - deviceDeletedPointCounters.get(i).get();
          if (expectedCnt != set.getLong(1)) {
            allDeviceUndeletedRanges.set(i, mergeRanges(deviceUndeletedRanges));
            // NOTE(review): collectDataRanges does not filter by deviceId, so the
            // logged "remaining ranges" span all devices — keep in mind when debugging.
            List<TimeRange> remainingRanges =
                collectDataRanges(statement, currentWrittenTime, testNum);
            LOGGER.debug("Expected ranges: {}", deviceUndeletedRanges);
            LOGGER.debug("Remaining ranges: {}", remainingRanges);
            fail(
                String.format(
                    "Inconsistent number of points %d - %d", expectedCnt, set.getLong(1)));
          }
        }

        Thread.sleep(10);
      }
    }
  } catch (InterruptedException e) {
    // Cooperative shutdown: restore the interrupt flag and exit quietly.
    Thread.currentThread().interrupt();
    return null;
  } catch (SQLException e) {
    if (e.getMessage().contains("Fail to reconnect")) {
      // restart triggered, ignore
      return null;
    } else {
      allThreads.shutdownNow();
      throw e;
    }
  } catch (ParallelRequestTimeoutException ignored) {
    // restart triggered, ignore
    return null;
  } catch (Throwable e) {
    // Any other failure aborts the whole test by stopping the sibling workers.
    allThreads.shutdownNow();
    throw e;
  }
  return null;
}

private Void restart(
AtomicLong writtenPointCounter, long targetPointNum, ExecutorService threadPool)
throws InterruptedException, SQLException {
Expand Down Expand Up @@ -1522,6 +1692,68 @@ public void deleteTableOfTheSameNameTest()
}
}

@Test
public void testConcurrentFlushAndRandomDeviceDeletion()
    throws InterruptedException, ExecutionException, SQLException {
  int testNum = 25;
  // Prepare compaction settings: a low mods-file threshold to trigger inner
  // compaction, and the FAST inner-seq performer. Remote deployments cannot
  // locate the config file during SET CONFIGURATION, so failures are ignored.
  try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
      Statement statement = connection.createStatement()) {
    statement.execute("drop database if exists test");
    statement.execute(
        "SET CONFIGURATION inner_compaction_task_selection_mods_file_threshold='1024'");
    statement.execute("SET CONFIGURATION inner_seq_performer='FAST'");
  } catch (Exception ignored) {
    // remote mode cannot find the config file during SET CONFIGURATION
  }

  AtomicLong writtenPointCounter = new AtomicLong(-1);
  int fileNumMax = 100;
  int pointPerFile = 100;
  int deviceNum = 4;
  // One deletion tally per device, consumed by randomDeviceDeletion.
  List<AtomicLong> deviceDeletedPointCounters = new ArrayList<>(deviceNum);
  for (int i = 0; i < deviceNum; i++) {
    deviceDeletedPointCounters.add(new AtomicLong(0));
  }

  ExecutorService threadPool = Executors.newCachedThreadPool();
  // Writer inserts every device at every timestamp (roundRobinDevice=false).
  Future<Void> writeThread =
      threadPool.submit(
          () ->
              write(
                  writtenPointCounter,
                  threadPool,
                  fileNumMax,
                  pointPerFile,
                  deviceNum,
                  testNum,
                  false));
  int deletionRange = 100;
  int minIntervalToRecord = 1000;
  // Deleter removes random per-device ranges concurrently with the writer.
  Future<Void> deletionThread =
      threadPool.submit(
          () ->
              randomDeviceDeletion(
                  writtenPointCounter,
                  deviceDeletedPointCounters,
                  threadPool,
                  fileNumMax,
                  pointPerFile,
                  deletionRange,
                  minIntervalToRecord,
                  testNum));
  // Propagate any assertion failures/exceptions from the worker threads.
  writeThread.get();
  deletionThread.get();
  threadPool.shutdown();
  boolean success = threadPool.awaitTermination(1, TimeUnit.MINUTES);
  assertTrue(success);

  // Cleanup: drop the data and restore the performer. Restoring the
  // configuration can fail in remote mode for the same reason as in the setup,
  // so it must not fail the test; the drop itself still surfaces real errors.
  try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
      Statement statement = connection.createStatement()) {
    statement.execute("drop database if exists test");
    try {
      statement.execute("SET CONFIGURATION inner_seq_performer='read_chunk'");
    } catch (Exception ignored) {
      // remote mode cannot find the config file during SET CONFIGURATION
    }
  }
}

@Ignore("performance")
@Test
public void testDeletionWritePerformance() throws SQLException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ public class IoTDBConfig {
private volatile int minCrossCompactionUnseqFileLevel = 1;

/** The interval of compaction task schedulation in each virtual database. The unit is ms. */
private long compactionScheduleIntervalInMs = 60_000L;
private long compactionScheduleIntervalInMs = 5_000L;

/** The interval of ttl check task in each database. The unit is ms. Default is 2 hours. */
private long ttlCheckInterval = 7_200_000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,21 @@ private boolean loadCompactionTaskHotModifiedProps(TrimProperties properties) th
configModified |=
innerUnsequenceCompactionSelector != conf.getInnerUnsequenceCompactionSelector();

conf.setInnerSeqCompactionPerformer(
InnerSeqCompactionPerformer.getInnerSeqCompactionPerformer(
properties.getProperty(
"inner_seq_performer", conf.getInnerSeqCompactionPerformer().toString())));

conf.setInnerUnseqCompactionPerformer(
InnerUnseqCompactionPerformer.getInnerUnseqCompactionPerformer(
properties.getProperty(
"inner_unseq_performer", conf.getInnerUnseqCompactionPerformer().toString())));

conf.setCrossCompactionPerformer(
CrossCompactionPerformer.getCrossCompactionPerformer(
properties.getProperty(
"cross_performer", conf.getCrossCompactionPerformer().toString())));

// update inner_compaction_total_file_size_threshold
long innerCompactionFileSizeThresholdInByte =
conf.getInnerCompactionTotalFileSizeThresholdInByte();
Expand Down
Loading
Loading