Skip to content

Commit

Permalink
Merge branch 'master' of github.com:LJW21-02/iotdb
Browse files Browse the repository at this point in the history
  • Loading branch information
LJW21-02 committed Jan 13, 2025
2 parents 716b784 + 33f5790 commit 2131541
Show file tree
Hide file tree
Showing 135 changed files with 3,986 additions and 1,255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -495,12 +496,13 @@ public static void assertTableNonQueryTestFail(
}
}

public static void assertResultSetSize(final ResultSet actualResultSet, int size) {
public static void assertResultSetSize(final ResultSet actualResultSet, final int size) {
try {
int count = 0;
while (actualResultSet.next()) {
--size;
++count;
}
Assert.assertEquals(0, size);
Assert.assertEquals(size, count);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(String.valueOf(e));
Expand Down Expand Up @@ -1042,6 +1044,43 @@ public static void assertDataEventuallyOnEnv(
}
}

public static void assertDataSizeEventuallyOnEnv(
final BaseEnv env, final String sql, final int size, final String databaseName) {
assertDataSizeEventuallyOnEnv(env, sql, size, 600, databaseName);
}

public static void assertDataSizeEventuallyOnEnv(
final BaseEnv env,
final String sql,
final int size,
final long timeoutSeconds,
final String dataBaseName) {
try (final Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.untilAsserted(
() -> {
try {
if (dataBaseName != null) {
statement.execute("use " + dataBaseName);
}
if (sql != null && !sql.isEmpty()) {
TestUtils.assertResultSetSize(executeQueryWithRetry(statement, sql), size);
}
} catch (final Exception e) {
Assert.fail(e.getMessage());
}
});
} catch (Exception e) {
fail(e.getMessage());
}
}

public static void assertDataEventuallyOnEnv(
final BaseEnv env,
final String sql,
Expand Down Expand Up @@ -1106,7 +1145,7 @@ public static void assertDataEventuallyOnEnv(
final String expectedHeader,
final Set<String> expectedResSet,
final long timeoutSeconds,
final String dataBaseName,
final String databaseName,
final Consumer<String> handleFailure) {
try (Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
Expand All @@ -1119,8 +1158,8 @@ public static void assertDataEventuallyOnEnv(
.untilAsserted(
() -> {
try {
if (dataBaseName != null) {
statement.execute("use " + dataBaseName);
if (databaseName != null) {
statement.execute("use " + databaseName);
}
if (sql != null && !sql.isEmpty()) {
TestUtils.assertResultSetEqual(
Expand Down Expand Up @@ -1174,8 +1213,20 @@ public static void assertDataEventuallyOnEnv(
}

public static void assertDataAlwaysOnEnv(
BaseEnv env, String sql, String expectedHeader, Set<String> expectedResSet) {
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10);
final BaseEnv env,
final String sql,
final String expectedHeader,
final Set<String> expectedResSet) {
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10, (String) null);
}

public static void assertDataAlwaysOnEnv(
final BaseEnv env,
final String sql,
final String expectedHeader,
final Set<String> expectedResSet,
final String database) {
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10, database);
}

public static void assertDataAlwaysOnEnv(
Expand All @@ -1192,8 +1243,11 @@ public static void assertDataAlwaysOnEnv(
String sql,
String expectedHeader,
Set<String> expectedResSet,
long consistentSeconds) {
try (Connection connection = env.getConnection();
long consistentSeconds,
final String database) {
try (Connection connection =
env.getConnection(
Objects.isNull(database) ? BaseEnv.TREE_SQL_DIALECT : BaseEnv.TABLE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
Expand All @@ -1204,6 +1258,9 @@ public static void assertDataAlwaysOnEnv(
.failFast(
() -> {
try {
if (Objects.nonNull(database)) {
statement.execute("use " + database);
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.pipe.it.tablemodel.TableModelUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
Expand Down Expand Up @@ -392,6 +394,38 @@ public void testDropRoleIdempotent() throws Exception {
Collections.singleton("1,"));
}

// Table model

@Test
public void testCreateTableIdempotent() throws Exception {
testTableConfigIdempotent(Collections.emptyList(), "create table test()");
}

@Test
public void testAlterTableAddColumnIdempotent() throws Exception {
testTableConfigIdempotent(
Collections.singletonList("create table test()"), "alter table test add column a id");
}

@Test
public void testAlterTableSetPropertiesIdempotent() throws Exception {
testTableConfigIdempotent(
Collections.singletonList("create table test()"),
"alter table test set properties ttl=100");
}

@Test
public void testAlterTableDropColumnIdempotent() throws Exception {
testTableConfigIdempotent(
Collections.singletonList("create table test(a id, b attribute, c int32)"),
"alter table test drop column b");
}

@Test
public void testDropTableIdempotent() throws Exception {
testTableConfigIdempotent(Collections.singletonList("create table test()"), "drop table test");
}

private void testIdempotent(
final List<String> beforeSqlList,
final String testSql,
Expand Down Expand Up @@ -449,4 +483,63 @@ private void testIdempotent(
// Assume that the afterSql is executed on receiverEnv
TestUtils.assertDataEventuallyOnEnv(receiverEnv, afterSqlQuery, expectedHeader, expectedResSet);
}

private void testTableConfigIdempotent(final List<String> beforeSqlList, final String testSql)
throws Exception {
final String database = "test";
TableModelUtils.createDatabase(senderEnv, database);
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.inclusion", "all");
extractorAttributes.put("extractor.inclusion.exclusion", "");
extractorAttributes.put("extractor.forwarding-pipe-requests", "false");
extractorAttributes.put("extractor.capture.table", "true");
extractorAttributes.put("extractor.capture.tree", "false");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.exception.conflict.resolve-strategy", "retry");
connectorAttributes.put("connector.exception.conflict.retry-max-time-seconds", "-1");

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
}

if (!TestUtils.tryExecuteNonQueriesWithRetry(
database, BaseEnv.TABLE_SQL_DIALECT, senderEnv, beforeSqlList)) {
return;
}

if (!TestUtils.tryExecuteNonQueryWithRetry(
database, BaseEnv.TABLE_SQL_DIALECT, receiverEnv, testSql)) {
return;
}

// Create an idempotent conflict
if (!TestUtils.tryExecuteNonQueryWithRetry(
database, BaseEnv.TABLE_SQL_DIALECT, senderEnv, testSql)) {
return;
}

TableModelUtils.createDatabase(senderEnv, "test2");

// Assume that the "database" is executed on receiverEnv
TestUtils.assertDataSizeEventuallyOnEnv(receiverEnv, "show databases", 3, null);
}
}
Loading

0 comments on commit 2131541

Please sign in to comment.