Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load: when table exists in IoTDB, make sure the existing/incoming ID columns are the prefix of the incoming/existing ID columns #14341

Merged
merged 8 commits into from
Dec 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,48 @@ public static void assertDataAlwaysOnEnv(
}
}

/**
 * Sets up an Awaitility check that the result of {@code sql} on {@code env} equals the expected
 * header and rows. The chain is configured with a 1 second poll delay, a 1 second poll interval
 * and an upper bound of {@code consistentSeconds} seconds.
 *
 * <p>NOTE(review): the Awaitility chain below ends at {@code failFast(...)}; no terminal {@code
 * until(...)}-style call is visible here. Confirm that the assertion is actually executed.
 *
 * <p>NOTE(review): the connection is obtained with {@code env.getConnection()} (no dialect
 * argument) while a {@code use <database>} statement may be executed on it; confirm that this
 * is the intended SQL dialect.
 *
 * @param env environment to open the connection on
 * @param sql query whose result is compared
 * @param expectedHeader expected header, forwarded to {@code assertResultSetEqual}
 * @param expectedResSet expected rows, forwarded to {@code assertResultSetEqual}
 * @param consistentSeconds value passed to {@code atMost}, in seconds
 * @param database if non-null, {@code "use " + database} is executed before the query
 * @param handleFailure if non-null, called with the failure message whenever the check throws
 */
public static void assertDataAlwaysOnEnv(
    BaseEnv env,
    String sql,
    String expectedHeader,
    Set<String> expectedResSet,
    long consistentSeconds,
    String database,
    Consumer<String> handleFailure) {
  try (Connection connection = env.getConnection();
      Statement statement = connection.createStatement()) {
    // Keep retrying if there are execution failures
    await()
        .pollInSameThread()
        .pollDelay(1L, TimeUnit.SECONDS)
        .pollInterval(1L, TimeUnit.SECONDS)
        .atMost(consistentSeconds, TimeUnit.SECONDS)
        .failFast(
            () -> {
              try {
                if (database != null) {
                  statement.execute("use " + database);
                }
                TestUtils.assertResultSetEqual(
                    executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
              } catch (Exception e) {
                if (handleFailure != null) {
                  handleFailure.accept(e.getMessage());
                }
                // Fail with the original exception attached so the root cause is reported.
                throw new AssertionError(e.getMessage(), e);
              } catch (Error e) {
                if (handleFailure != null) {
                  handleFailure.accept(e.getMessage());
                }
                throw e;
              }
            });
  } catch (Exception e) {
    // Still an assertion failure, but carrying the cause instead of only printing it.
    throw new AssertionError(e.getMessage(), e);
  }
}

public static void restartDataNodes() {
EnvFactory.getEnv().shutdownAllDataNodes();
EnvFactory.getEnv().startAllDataNodes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2TableModel;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
Expand All @@ -37,10 +38,15 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.sql.Connection;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2TableModel.class})
public class IoTDBPipeWithLoadIT extends AbstractPipeTableModelTestIT {
Expand Down Expand Up @@ -124,4 +130,244 @@ public void testReceiverNotLoadDeletedTimeseries() throws Exception {
TableModelUtils.assertCountData("test", "test", 50, receiverEnv, handleFailure);
}
}

// Test that receiver will not load data when table exists but ID columns mismatch.
// The sender's t1 has ID columns (id1, id2) while the receiver's t1 has (id3, id4), so after the
// pipe has run the receiver is expected to contain only its own two rows.
@Test
public void testReceiverNotLoadWhenIdColumnMismatch() throws Exception {
  final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
  final String receiverIp = receiverDataNode.getIp();
  final int receiverPort = receiverDataNode.getPort();
  // On an assertion failure, flush both sides; the failure message itself is not used.
  final Consumer<String> handleFailure =
      failureMessage -> {
        TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
        TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
      };

  final Map<String, String> extractorAttributes = new HashMap<>();
  final Map<String, String> processorAttributes = new HashMap<>();
  final Map<String, String> connectorAttributes = new HashMap<>();

  extractorAttributes.put("capture.table", "true");
  extractorAttributes.put("extractor.realtime.mode", "file");

  connectorAttributes.put("connector.batch.enable", "false");
  connectorAttributes.put("connector.ip", receiverIp);
  connectorAttributes.put("connector.port", Integer.toString(receiverPort));

  try (final SyncConfigNodeIServiceClient client =
      (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
    // Sender: table t1 with ID columns (id1, id2).
    try (Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
        Statement statement = connection.createStatement()) {
      statement.execute("create database if not exists db");
      statement.execute("use db");
      statement.execute(
          "create table if not exists t1(id1 STRING ID, id2 STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT)");
      statement.execute("INSERT INTO t1(time,id1,id2,s1,s2) values(1, 'd1', 'd2', 'red', 1)");
      statement.execute("INSERT INTO t1(time,id1,id2,s1,s2) values(2, 'd1', 'd2', 'blue', 2)");
      statement.execute("flush");
    } catch (Exception e) {
      // Keep the original exception as the cause so the failing statement can be diagnosed.
      throw new AssertionError("Failed to prepare data on sender: " + e.getMessage(), e);
    }

    // Receiver: table t1 with different ID columns (id3, id4).
    try (Connection connection = receiverEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
        Statement statement = connection.createStatement()) {
      statement.execute("create database if not exists db");
      statement.execute("use db");
      statement.execute(
          "create table if not exists t1(id3 STRING ID, id4 STRING ID, s3 TEXT MEASUREMENT, s4 INT32 MEASUREMENT)");
      statement.execute("INSERT INTO t1(time,id3,id4,s3,s4) values(1, 'd3', 'd4', 'red2', 10)");
      statement.execute("INSERT INTO t1(time,id3,id4,s3,s4) values(2, 'd3', 'd4', 'blue2', 20)");
      statement.execute("flush");
    } catch (Exception e) {
      throw new AssertionError("Failed to prepare data on receiver: " + e.getMessage(), e);
    }

    final TSStatus status =
        client.createPipe(
            new TCreatePipeReq("p1", connectorAttributes)
                .setExtractorAttributes(extractorAttributes)
                .setProcessorAttributes(processorAttributes));
    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
    Assert.assertEquals(
        TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

    // Wait some time before checking the receiver. An interrupt is propagated (the method
    // already declares "throws Exception") instead of being printed and ignored.
    Thread.sleep(10_000);

    final Set<String> expectedResSet = new java.util.HashSet<>();
    expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,");
    expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,");
    // make sure data are not transferred
    TestUtils.assertDataEventuallyOnEnv(
        receiverEnv,
        "select * from t1",
        "time,id3,id4,s3,s4,",
        expectedResSet,
        "db",
        handleFailure);
  }
}

// Test that receiver can load data when table exists and existing ID columns are the prefix of
// incoming ID columns.
// The receiver's t1 has ID columns (id1, id2) and the sender's t1 has (id1, id2, id3); the
// expected header shows id3, s1 and s2 listed after the receiver's original columns.
@Test
public void testReceiverAutoExtendIdColumn() throws Exception {
  final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
  final String receiverIp = receiverDataNode.getIp();
  final int receiverPort = receiverDataNode.getPort();
  // On an assertion failure, flush both sides; the failure message itself is not used.
  final Consumer<String> handleFailure =
      failureMessage -> {
        TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
        TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
      };

  final Map<String, String> extractorAttributes = new HashMap<>();
  final Map<String, String> processorAttributes = new HashMap<>();
  final Map<String, String> connectorAttributes = new HashMap<>();

  extractorAttributes.put("capture.table", "true");
  extractorAttributes.put("extractor.realtime.mode", "file");

  connectorAttributes.put("connector.batch.enable", "false");
  connectorAttributes.put("connector.ip", receiverIp);
  connectorAttributes.put("connector.port", Integer.toString(receiverPort));

  try (final SyncConfigNodeIServiceClient client =
      (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
    // Sender: table t1 with ID columns (id1, id2, id3).
    try (Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
        Statement statement = connection.createStatement()) {
      statement.execute("create database if not exists db");
      statement.execute("use db");
      statement.execute(
          "create table if not exists t1(id1 STRING ID, id2 STRING ID, id3 STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT)");
      statement.execute(
          "INSERT INTO t1(time,id1,id2,id3,s1,s2) values(1, 'd1', 'd2', 'd3', 'red', 1)");
      statement.execute(
          "INSERT INTO t1(time,id1,id2,id3,s1,s2) values(2, 'd1', 'd2', 'd3', 'blue', 2)");
      statement.execute("flush");
    } catch (Exception e) {
      // Keep the original exception as the cause so the failing statement can be diagnosed.
      throw new AssertionError("Failed to prepare data on sender: " + e.getMessage(), e);
    }

    // Receiver: table t1 with ID columns (id1, id2), a prefix of the sender's ID columns.
    try (Connection connection = receiverEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
        Statement statement = connection.createStatement()) {
      statement.execute("create database if not exists db");
      statement.execute("use db");
      statement.execute(
          "create table if not exists t1(id1 STRING ID, id2 STRING ID, s3 TEXT MEASUREMENT, s4 INT32 MEASUREMENT)");
      statement.execute("INSERT INTO t1(time,id1,id2,s3,s4) values(1, 'd1', 'd2', 'red2', 10)");
      statement.execute("INSERT INTO t1(time,id1,id2,s3,s4) values(2, 'd1', 'd2', 'blue2', 20)");
      statement.execute("flush");
    } catch (Exception e) {
      throw new AssertionError("Failed to prepare data on receiver: " + e.getMessage(), e);
    }

    final TSStatus status =
        client.createPipe(
            new TCreatePipeReq("p1", connectorAttributes)
                .setExtractorAttributes(extractorAttributes)
                .setProcessorAttributes(processorAttributes));
    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
    Assert.assertEquals(
        TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

    final Set<String> expectedResSet = new java.util.HashSet<>();
    // Rows coming from the sender: the receiver-only columns s3/s4 are null.
    expectedResSet.add("1970-01-01T00:00:00.001Z,d1,d2,null,null,d3,red,1,");
    expectedResSet.add("1970-01-01T00:00:00.002Z,d1,d2,null,null,d3,blue,2,");
    // Rows originally on the receiver: the sender-only columns id3/s1/s2 are null.
    expectedResSet.add("1970-01-01T00:00:00.001Z,d1,d2,red2,10,null,null,null,");
    expectedResSet.add("1970-01-01T00:00:00.002Z,d1,d2,blue2,20,null,null,null,");
    // make sure data are transferred and column "id3" is auto extended
    TestUtils.assertDataEventuallyOnEnv(
        receiverEnv,
        "select * from t1",
        "time,id1,id2,s3,s4,id3,s1,s2,",
        expectedResSet,
        "db",
        handleFailure);
  }
}
SteveYurongSu marked this conversation as resolved.
Show resolved Hide resolved

// Test that receiver can load data when table exists and incoming ID columns are the prefix of
// existing ID columns.
// The sender's t1 has ID columns (id1, id2) and the receiver's t1 has (id1, id2, id3); the
// transferred rows are expected to show null for id3.
@Test
public void testLoadWhenIncomingIdColumnsArePrefixOfExisting() throws Exception {
  final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
  final String receiverIp = receiverDataNode.getIp();
  final int receiverPort = receiverDataNode.getPort();
  // On an assertion failure, flush both sides; the failure message itself is not used.
  final Consumer<String> handleFailure =
      failureMessage -> {
        TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
        TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
      };

  final Map<String, String> extractorAttributes = new HashMap<>();
  final Map<String, String> processorAttributes = new HashMap<>();
  final Map<String, String> connectorAttributes = new HashMap<>();

  extractorAttributes.put("capture.table", "true");
  extractorAttributes.put("extractor.realtime.mode", "file");

  connectorAttributes.put("connector.batch.enable", "false");
  connectorAttributes.put("connector.ip", receiverIp);
  connectorAttributes.put("connector.port", Integer.toString(receiverPort));

  try (final SyncConfigNodeIServiceClient client =
      (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
    // Sender: table t1 with ID columns (id1, id2), a prefix of the receiver's ID columns.
    try (Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
        Statement statement = connection.createStatement()) {
      statement.execute("create database if not exists db");
      statement.execute("use db");
      statement.execute(
          "create table if not exists t1(id1 STRING ID, id2 STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT)");
      statement.execute("INSERT INTO t1(time,id1,id2,s1,s2) values(1, 'd1', 'd2', 'red', 1)");
      statement.execute("INSERT INTO t1(time,id1,id2,s1,s2) values(2, 'd1', 'd2', 'blue', 2)");
      statement.execute("flush");
    } catch (Exception e) {
      // Keep the original exception as the cause so the failing statement can be diagnosed.
      throw new AssertionError("Failed to prepare data on sender: " + e.getMessage(), e);
    }

    // Receiver: table t1 with ID columns (id1, id2, id3).
    try (Connection connection = receiverEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
        Statement statement = connection.createStatement()) {
      statement.execute("create database if not exists db");
      statement.execute("use db");
      statement.execute(
          "create table if not exists t1(id1 STRING ID, id2 STRING ID, id3 STRING ID,s3 TEXT MEASUREMENT, s4 INT32 MEASUREMENT)");
      statement.execute(
          "INSERT INTO t1(time,id1,id2,id3,s3,s4) values(1, 'd1', 'd2', 'd3', 'red2', 10)");
      statement.execute(
          "INSERT INTO t1(time,id1,id2,id3,s3,s4) values(2, 'd1', 'd2', 'd3', 'blue2', 20)");
      statement.execute("flush");
    } catch (Exception e) {
      throw new AssertionError("Failed to prepare data on receiver: " + e.getMessage(), e);
    }

    final TSStatus status =
        client.createPipe(
            new TCreatePipeReq("p1", connectorAttributes)
                .setExtractorAttributes(extractorAttributes)
                .setProcessorAttributes(processorAttributes));
    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
    Assert.assertEquals(
        TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

    final Set<String> expectedResSet = new java.util.HashSet<>();
    // Rows originally on the receiver: the sender-only columns s1/s2 are null.
    expectedResSet.add("1970-01-01T00:00:00.001Z,d1,d2,d3,red2,10,null,null,");
    expectedResSet.add("1970-01-01T00:00:00.002Z,d1,d2,d3,blue2,20,null,null,");
    // Rows coming from the sender: id3 and the receiver-only columns s3/s4 are null.
    expectedResSet.add("1970-01-01T00:00:00.001Z,d1,d2,null,null,null,red,1,");
    expectedResSet.add("1970-01-01T00:00:00.002Z,d1,d2,null,null,null,blue,2,");
    // make sure data are transferred and column "id3" is null in transferred data
    TestUtils.assertDataEventuallyOnEnv(
        receiverEnv,
        "select * from t1",
        "time,id1,id2,id3,s3,s4,s1,s2,",
        expectedResSet,
        10,
        "db",
        handleFailure);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ private static Object[] truncateNullSuffixesOfDeviceIdSegments(Object[] segments
public void createTable(TableSchema fileSchema, MPPQueryContext context, Metadata metadata)
throws VerifyMetadataException {
final TableSchema realSchema =
metadata.validateTableHeaderSchema(database, fileSchema, context, true).orElse(null);
metadata.validateTableHeaderSchema(database, fileSchema, context, true, true).orElse(null);
if (Objects.isNull(realSchema)) {
throw new VerifyMetadataException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ List<DeviceEntry> indexScan(
* <p>The caller need to recheck the dataType of measurement columns to decide whether to do
* partial insert
*
* @param isStrictIdColumn if true and the table already exists, the ID columns of the existing
* table must be a prefix of the ID columns in the input tableSchema, or the input ID columns
* must be a prefix of the existing ID columns.
* @return If the table doesn't exist and the user has no authority to create it,
* Optional.empty() will be returned. The returned table may not include all the columns
* in {@code tableSchema} if the user has no authority to alter the table.
Expand All @@ -108,7 +111,8 @@ Optional<TableSchema> validateTableHeaderSchema(
final String database,
final TableSchema tableSchema,
final MPPQueryContext context,
final boolean allowCreateTable);
final boolean allowCreateTable,
final boolean isStrictIdColumn);

/**
* This method is used for table device validation and should be invoked after column validation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,9 +705,14 @@ public List<DeviceEntry> indexScan(

@Override
public Optional<TableSchema> validateTableHeaderSchema(
String database, TableSchema tableSchema, MPPQueryContext context, boolean allowCreateTable) {
String database,
TableSchema tableSchema,
MPPQueryContext context,
boolean allowCreateTable,
boolean isStrictIdColumn) {
return TableHeaderSchemaValidator.getInstance()
.validateTableHeaderSchema(database, tableSchema, context, allowCreateTable);
.validateTableHeaderSchema(
database, tableSchema, context, allowCreateTable, isStrictIdColumn);
}

@Override
Expand Down
Loading
Loading