From 82c85483572fce80bcdb5275fe909d3850323380 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 13 Jan 2025 10:19:33 +0800 Subject: [PATCH 1/2] Ignore the DatabaseNotExistsException for adjustMaxRegionGroupNum when querying dataRegionGroupCount (#14669) * Update ClusterSchemaManager.java * Update ClusterSchemaManager.java --- .../manager/schema/ClusterSchemaManager.java | 97 ++++++++++--------- 1 file changed, 50 insertions(+), 47 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java index 9a6c06c26733..8e4bab6b0d49 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java @@ -490,59 +490,62 @@ public synchronized void adjustMaxRegionGroupNum() { continue; } + // Adjust maxSchemaRegionGroupNum for each Database. + // All Databases share the DataNodes equally. + // The allocated SchemaRegionGroups will not be shrunk. + final int allocatedSchemaRegionGroupCount; try { - // Adjust maxSchemaRegionGroupNum for each Database. - // All Databases share the DataNodes equally. - // The allocated SchemaRegionGroups will not be shrunk. - final int allocatedSchemaRegionGroupCount; - try { - allocatedSchemaRegionGroupCount = - getPartitionManager() - .getRegionGroupCount(databaseSchema.getName(), TConsensusGroupType.SchemaRegion); - } catch (final DatabaseNotExistsException e) { - // ignore the pre deleted database - continue; - } + allocatedSchemaRegionGroupCount = + getPartitionManager() + .getRegionGroupCount(databaseSchema.getName(), TConsensusGroupType.SchemaRegion); + } catch (final DatabaseNotExistsException e) { + // ignore the pre deleted database + continue; + } - final int maxSchemaRegionGroupNum = - calcMaxRegionGroupNum( - databaseSchema.getMinSchemaRegionGroupNum(), - SCHEMA_REGION_PER_DATA_NODE, - dataNodeNum, - databaseNum, - databaseSchema.getSchemaReplicationFactor(), - allocatedSchemaRegionGroupCount); - LOGGER.info( - "[AdjustRegionGroupNum] The maximum number of SchemaRegionGroups for Database: {} is adjusted to: {}", - databaseSchema.getName(), - maxSchemaRegionGroupNum); - - // Adjust maxDataRegionGroupNum for each Database. - // All Databases share the DataNodes equally. - // The allocated DataRegionGroups will not be shrunk. - final int allocatedDataRegionGroupCount = + final int maxSchemaRegionGroupNum = + calcMaxRegionGroupNum( + databaseSchema.getMinSchemaRegionGroupNum(), + SCHEMA_REGION_PER_DATA_NODE, + dataNodeNum, + databaseNum, + databaseSchema.getSchemaReplicationFactor(), + allocatedSchemaRegionGroupCount); + LOGGER.info( + "[AdjustRegionGroupNum] The maximum number of SchemaRegionGroups for Database: {} is adjusted to: {}", + databaseSchema.getName(), + maxSchemaRegionGroupNum); + + // Adjust maxDataRegionGroupNum for each Database. + // All Databases share the DataNodes equally. + // The allocated DataRegionGroups will not be shrunk. + final int allocatedDataRegionGroupCount; + try { + allocatedDataRegionGroupCount = getPartitionManager() .getRegionGroupCount(databaseSchema.getName(), TConsensusGroupType.DataRegion); - final int maxDataRegionGroupNum = - calcMaxRegionGroupNum( - databaseSchema.getMinDataRegionGroupNum(), - DATA_REGION_PER_DATA_NODE == 0 - ? CONF.getDataRegionPerDataNodeProportion() - : DATA_REGION_PER_DATA_NODE, - DATA_REGION_PER_DATA_NODE == 0 ? totalCpuCoreNum : dataNodeNum, - databaseNum, - databaseSchema.getDataReplicationFactor(), - allocatedDataRegionGroupCount); - LOGGER.info( - "[AdjustRegionGroupNum] The maximum number of DataRegionGroups for Database: {} is adjusted to: {}", - databaseSchema.getName(), - maxDataRegionGroupNum); - - adjustMaxRegionGroupNumPlan.putEntry( - databaseSchema.getName(), new Pair<>(maxSchemaRegionGroupNum, maxDataRegionGroupNum)); } catch (final DatabaseNotExistsException e) { - LOGGER.warn("Adjust maxRegionGroupNum failed because Database doesn't exist", e); + // ignore the pre deleted database + continue; } + + final int maxDataRegionGroupNum = + calcMaxRegionGroupNum( + databaseSchema.getMinDataRegionGroupNum(), + DATA_REGION_PER_DATA_NODE == 0 + ? CONF.getDataRegionPerDataNodeProportion() + : DATA_REGION_PER_DATA_NODE, + DATA_REGION_PER_DATA_NODE == 0 ? totalCpuCoreNum : dataNodeNum, + databaseNum, + databaseSchema.getDataReplicationFactor(), + allocatedDataRegionGroupCount); + LOGGER.info( + "[AdjustRegionGroupNum] The maximum number of DataRegionGroups for Database: {} is adjusted to: {}", + databaseSchema.getName(), + maxDataRegionGroupNum); + + adjustMaxRegionGroupNumPlan.putEntry( + databaseSchema.getName(), new Pair<>(maxSchemaRegionGroupNum, maxDataRegionGroupNum)); } try { getConsensusManager().write(adjustMaxRegionGroupNumPlan); From 3ac203c4e79429166a8263a37f9048899ad15e8c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 13 Jan 2025 14:51:26 +0800 Subject: [PATCH 2/2] Pipe: Implement table meta/deletion sync & Fix the bug that delete data node's pattern is not parsed / pipe transferred delete data node's progressIndex is not binded & Implement mem control for schema snapshot parser (#14156) Co-authored-by: Steve Yurong Su --- .../apache/iotdb/db/it/utils/TestUtils.java | 77 ++++- .../it/autocreate/IoTDBPipeIdempotentIT.java | 93 ++++++ .../it/manual/IoTDBPipeTableManualIT.java | 293 ++++++++++++++++++ .../pipe/it/tablemodel/IoTDBPipeSyntaxIT.java | 2 +- .../pipe/it/tablemodel/TableModelUtils.java | 17 +- .../consensus/request/ConfigPhysicalPlan.java | 10 +- .../request/ConfigPhysicalPlanType.java | 2 + .../request/ConfigPhysicalPlanVisitor.java | 110 +++++-- .../pipe/payload/PipeCreateTablePlan.java | 34 ++ .../payload/PipeDeactivateTemplatePlan.java | 22 +- .../pipe/payload/PipeDeleteDevicesPlan.java | 130 ++++++++ .../write/table/PreCreateTablePlan.java | 11 +- .../confignode/manager/ClusterManager.java | 2 +- .../confignode/manager/ConfigManager.java | 2 +- .../confignode/manager/ProcedureManager.java | 30 +- .../PipeTransferConfigSnapshotSealReq.java | 32 +- .../IoTDBConfigRegionAirGapConnector.java | 4 + .../protocol/IoTDBConfigRegionConnector.java | 6 +- .../event/PipeConfigRegionSnapshotEvent.java | 3 +- .../ConfigRegionListeningFilter.java | 89 +++--- .../extractor/ConfigRegionListeningQueue.java | 23 +- .../extractor/IoTDBConfigRegionExtractor.java | 70 ++++- ...gPhysicalPlanTablePatternParseVisitor.java | 135 ++++++++ ...gPhysicalPlanTreePatternParseVisitor.java} | 6 +- .../protocol/IoTDBConfigNodeReceiver.java | 170 +++++++++- ...PipeConfigPhysicalPlanTSStatusVisitor.java | 76 +++++ .../manager/schema/ClusterSchemaManager.java | 29 +- .../executor/ConfigPlanExecutor.java | 1 + .../confignode/persistence/pipe/PipeInfo.java | 2 +- .../schema/CNPhysicalPlanGenerator.java | 77 +++-- .../persistence/schema/ConfigMTree.java | 5 +- .../schema/ConfignodeSnapshotParser.java | 30 +- .../procedure/env/ConfigNodeProcedureEnv.java | 4 +- .../impl/pipe/task/CreatePipeProcedureV2.java | 42 +-- .../impl/schema/DeleteDatabaseProcedure.java | 2 +- .../schema/DeleteTimeSeriesProcedure.java | 4 +- .../AbstractAlterOrDropTableProcedure.java | 10 +- .../schema/table/AddTableColumnProcedure.java | 16 +- .../schema/table/CreateTableProcedure.java | 20 +- .../schema/table/DeleteDevicesProcedure.java | 43 ++- .../table/DropTableColumnProcedure.java | 21 +- .../impl/schema/table/DropTableProcedure.java | 24 +- .../table/RenameTableColumnProcedure.java | 18 +- .../table/SetTablePropertiesProcedure.java | 18 +- .../impl/trigger/CreateTriggerProcedure.java | 9 +- .../procedure/store/ProcedureFactory.java | 35 ++- .../procedure/store/ProcedureType.java | 7 + .../request/ConfigPhysicalPlanSerDeTest.java | 40 +++ .../PipeConfigNodeThriftRequestTest.java | 12 +- ...sicalPlanTablePatternParseVisitorTest.java | 133 ++++++++ ...sicalPlanTreePatternParseVisitorTest.java} | 44 +-- .../receiver/PipeEnrichedProcedureTest.java | 194 ++++++++++++ .../table/AddTableColumnProcedureTest.java | 8 +- .../table/CreateTableProcedureTest.java | 5 +- .../table/DeleteDevicesProcedureTest.java | 5 +- .../table/DropTableColumnProcedureTest.java | 4 +- .../schema/table/DropTableProcedureTest.java | 4 +- .../table/RenameTableColumnProcedureTest.java | 4 +- .../SetTablePropertiesProcedureTest.java | 6 +- .../dataregion/DataExecutionVisitor.java | 9 +- .../SchemaRegionStateMachine.java | 15 +- .../task/connection/PipeEventCollector.java | 32 +- .../PipeTransferDataNodeHandshakeV1Req.java | 10 +- .../request/PipeTransferPlanNodeReq.java | 18 +- .../PipeTransferSchemaSnapshotSealReq.java | 94 ++++-- .../PipeTransferTabletInsertNodeReqV2.java | 2 +- .../PipeTransferTsFileSealWithModReq.java | 42 ++- .../IoTDBSchemaRegionAirGapConnector.java | 15 +- .../sync/IoTDBSchemaRegionConnector.java | 22 +- .../deletion/PipeDeleteDataNodeEvent.java | 24 +- .../schema/PipeSchemaRegionSnapshotEvent.java | 75 ++++- .../PipeSchemaSerializableEventType.java | 15 +- .../dataregion/DataRegionListeningFilter.java | 21 +- ...lDataRegionTsFileAndDeletionExtractor.java | 2 +- .../PipeInsertionDataNodeListener.java | 27 +- .../IoTDBSchemaRegionExtractor.java | 24 +- .../PipePlanTablePatternParseVisitor.java | 77 +++++ ...a => PipePlanTreePatternParseVisitor.java} | 7 +- .../SchemaRegionListeningFilter.java | 87 +++--- .../SchemaRegionListeningQueue.java | 8 +- .../metric/PipeDataNodeReceiverMetrics.java | 48 +-- .../thrift/IoTDBDataNodeReceiver.java | 209 +++++++++---- .../visitor/PipePlanToStatementVisitor.java | 54 +++- ...PipeStatementTablePatternParseVisitor.java | 40 +++ ...PipeStatementTreePatternParseVisitor.java} | 6 +- ...a => PipeTreeStatementToBatchVisitor.java} | 2 +- .../impl/DataNodeInternalRPCServiceImpl.java | 2 + .../executor/RegionWriteExecutor.java | 55 ++-- .../plan/analyze/AnalyzeUtils.java | 9 +- .../plan/analyze/AnalyzeVisitor.java | 4 +- .../config/TableConfigTaskVisitor.java | 2 +- .../plan/planner/LogicalPlanVisitor.java | 7 +- .../metadata/write/CreateTimeSeriesNode.java | 74 ++--- .../node/pipe/PipeEnrichedDeleteDataNode.java | 66 ++-- .../node/pipe/PipeEnrichedInsertNode.java | 44 +-- .../node/pipe/PipeEnrichedWritePlanNode.java | 23 +- .../analyzer/StatementAnalyzer.java | 79 ++--- .../relational/planner/RelationPlanner.java | 91 +++--- .../planner/TableLogicalPlanner.java | 15 +- .../schema/CreateOrUpdateTableDeviceNode.java | 14 +- .../sql/ast/AbstractTraverseDevice.java | 8 + .../plan/relational/sql/ast/AstVisitor.java | 2 +- .../sql/ast/CreateOrUpdateDevice.java | 31 +- .../plan/relational/sql/ast/CreatePipe.java | 4 +- .../plan/relational/sql/ast/Delete.java | 28 +- .../plan/relational/sql/ast/PipeEnriched.java | 8 +- .../plan/relational/sql/ast/Statement.java | 4 +- .../plan/relational/sql/ast/Update.java | 6 +- .../statement/crud/InsertBaseStatement.java | 16 +- .../statement/pipe/PipeEnrichedStatement.java | 10 +- .../attribute/DeviceAttributeStore.java | 10 +- .../attribute/IDeviceAttributeStore.java | 6 +- .../impl/SchemaRegionMemoryImpl.java | 136 ++++---- .../mtree/impl/mem/MemMTreeStore.java | 52 ++-- .../impl/mem/mnode/basic/BasicMNode.java | 22 +- .../mem/snapshot/MemMTreeSnapshotUtil.java | 37 +-- .../db/tools/schema/SRStatementGenerator.java | 203 ++++++++---- .../schema/SchemaRegionSnapshotParser.java | 32 +- .../PipeDataNodeThriftRequestTest.java | 22 +- ...StatementTablePatternParseVisitorTest.java | 57 ++++ ...StatementTreePatternParseVisitorTest.java} | 20 +- .../pipe/consensus/DeletionResourceTest.java | 39 +-- .../PipePlanTablePatternParseVisitorTest.java | 129 ++++++++ ... PipePlanTreePatternParseVisitorTest.java} | 32 +- .../utils/SchemaRegionSnapshotParserTest.java | 200 ++++++++---- .../thrift/request/PipeRequestType.java | 4 +- .../request/PipeTransferFileSealReqV2.java | 13 +- .../options/PipeInclusionOptions.java | 43 ++- .../datastructure/pattern/TablePattern.java | 32 +- .../datastructure/pattern/TreePattern.java | 33 +- .../pipe/extractor/IoTDBExtractor.java | 34 +- .../IoTDBNonDataRegionExtractor.java | 20 +- .../pipe/receiver/IoTDBFileReceiver.java | 32 +- .../iotdb/commons/schema/node/IMNode.java | 18 +- .../schema/node/visitor/MNodeVisitor.java | 6 +- 135 files changed, 3936 insertions(+), 1208 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTableManualIT.java create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeCreateTablePlan.java create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeleteDevicesPlan.java create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitor.java rename iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/{PipeConfigPhysicalPlanPatternParseVisitor.java => PipeConfigPhysicalPlanTreePatternParseVisitor.java} (98%) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitorTest.java rename iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/{PipeConfigPhysicalPlanPatternParseVisitorTest.java => PipeConfigPhysicalPlanTreePatternParseVisitorTest.java} (91%) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanTablePatternParseVisitor.java rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/{PipePlanPatternParseVisitor.java => PipePlanTreePatternParseVisitor.java} (98%) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTablePatternParseVisitor.java rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/{PipeStatementPatternParseVisitor.java => PipeStatementTreePatternParseVisitor.java} (97%) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/{PipeStatementToBatchVisitor.java => PipeTreeStatementToBatchVisitor.java} (98%) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeStatementTablePatternParseVisitorTest.java rename iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/{PipeStatementPatternParseVisitorTest.java => PipeStatementTreePatternParseVisitorTest.java} (94%) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipePlanTablePatternParseVisitorTest.java rename iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/{PipePlanPatternParseVisitorTest.java => PipePlanTreePatternParseVisitorTest.java} (94%) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java index dc995fcb490f..fdbb40d51043 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java @@ -48,6 +48,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -495,12 +496,13 @@ public static void assertTableNonQueryTestFail( } } - public static void assertResultSetSize(final ResultSet actualResultSet, int size) { + public static void assertResultSetSize(final ResultSet actualResultSet, final int size) { try { + int count = 0; while (actualResultSet.next()) { - --size; + ++count; } - Assert.assertEquals(0, size); + Assert.assertEquals(size, count); } catch (final Exception e) { e.printStackTrace(); Assert.fail(String.valueOf(e)); @@ -1042,6 +1044,43 @@ public static void assertDataEventuallyOnEnv( } } + public static void assertDataSizeEventuallyOnEnv( + final BaseEnv env, final String sql, final int size, final String databaseName) { + assertDataSizeEventuallyOnEnv(env, sql, size, 600, databaseName); + } + + public static void assertDataSizeEventuallyOnEnv( + final BaseEnv env, + final String sql, + final int size, + final long timeoutSeconds, + final String dataBaseName) { + try (final Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + // Keep retrying if there are execution failures + await() + .pollInSameThread() + .pollDelay(1L, TimeUnit.SECONDS) + .pollInterval(1L, TimeUnit.SECONDS) + .atMost(timeoutSeconds, TimeUnit.SECONDS) + .untilAsserted( + () -> { + try { + if (dataBaseName != null) { + statement.execute("use " + dataBaseName); + } + if (sql != null && !sql.isEmpty()) { + TestUtils.assertResultSetSize(executeQueryWithRetry(statement, sql), size); + } + } catch (final Exception e) { + Assert.fail(e.getMessage()); + } + }); + } catch (Exception e) { + fail(e.getMessage()); + } + } + public static void assertDataEventuallyOnEnv( final BaseEnv env, final String sql, @@ -1106,7 +1145,7 @@ public static void assertDataEventuallyOnEnv( final String expectedHeader, final Set expectedResSet, final long timeoutSeconds, - final String dataBaseName, + final String databaseName, final Consumer handleFailure) { try (Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT); Statement statement = connection.createStatement()) { @@ -1119,8 +1158,8 @@ public static void assertDataEventuallyOnEnv( .untilAsserted( () -> { try { - if (dataBaseName != null) { - statement.execute("use " + dataBaseName); + if (databaseName != null) { + statement.execute("use " + databaseName); } if (sql != null && !sql.isEmpty()) { TestUtils.assertResultSetEqual( @@ -1174,8 +1213,20 @@ public static void assertDataEventuallyOnEnv( } public static void assertDataAlwaysOnEnv( - BaseEnv env, String sql, String expectedHeader, Set expectedResSet) { - assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10); + final BaseEnv env, + final String sql, + final String expectedHeader, + final Set expectedResSet) { + assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10, (String) null); + } + + public static void assertDataAlwaysOnEnv( + final BaseEnv env, + final String sql, + final String expectedHeader, + final Set expectedResSet, + final String database) { + assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10, database); } public static void assertDataAlwaysOnEnv( @@ -1192,8 +1243,11 @@ public static void assertDataAlwaysOnEnv( String sql, String expectedHeader, Set expectedResSet, - long consistentSeconds) { - try (Connection connection = env.getConnection(); + long consistentSeconds, + final String database) { + try (Connection connection = + env.getConnection( + Objects.isNull(database) ? BaseEnv.TREE_SQL_DIALECT : BaseEnv.TABLE_SQL_DIALECT); Statement statement = connection.createStatement()) { // Keep retrying if there are execution failures await() @@ -1204,6 +1258,9 @@ public static void assertDataAlwaysOnEnv( .failFast( () -> { try { + if (Objects.nonNull(database)) { + statement.execute("use " + database); + } TestUtils.assertResultSetEqual( executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet); } catch (Exception e) { diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java index 49f7b1fd3f35..cbe1ad065578 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java @@ -28,6 +28,8 @@ import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.pipe.it.tablemodel.TableModelUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.junit.Assert; @@ -392,6 +394,38 @@ public void testDropRoleIdempotent() throws Exception { Collections.singleton("1,")); } + // Table model + + @Test + public void testCreateTableIdempotent() throws Exception { + testTableConfigIdempotent(Collections.emptyList(), "create table test()"); + } + + @Test + public void testAlterTableAddColumnIdempotent() throws Exception { + testTableConfigIdempotent( + Collections.singletonList("create table test()"), "alter table test add column a id"); + } + + @Test + public void testAlterTableSetPropertiesIdempotent() throws Exception { + testTableConfigIdempotent( + Collections.singletonList("create table test()"), + "alter table test set properties ttl=100"); + } + + @Test + public void testAlterTableDropColumnIdempotent() throws Exception { + testTableConfigIdempotent( + Collections.singletonList("create table test(a id, b attribute, c int32)"), + "alter table test drop column b"); + } + + @Test + public void testDropTableIdempotent() throws Exception { + testTableConfigIdempotent(Collections.singletonList("create table test()"), "drop table test"); + } + private void testIdempotent( final List beforeSqlList, final String testSql, @@ -449,4 +483,63 @@ private void testIdempotent( // Assume that the afterSql is executed on receiverEnv TestUtils.assertDataEventuallyOnEnv(receiverEnv, afterSqlQuery, expectedHeader, expectedResSet); } + + private void testTableConfigIdempotent(final List beforeSqlList, final String testSql) + throws Exception { + final String database = "test"; + TableModelUtils.createDatabase(senderEnv, database); + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.inclusion", "all"); + extractorAttributes.put("extractor.inclusion.exclusion", ""); + extractorAttributes.put("extractor.forwarding-pipe-requests", "false"); + extractorAttributes.put("extractor.capture.table", "true"); + extractorAttributes.put("extractor.capture.tree", "false"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.exception.conflict.resolve-strategy", "retry"); + connectorAttributes.put("connector.exception.conflict.retry-max-time-seconds", "-1"); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + database, BaseEnv.TABLE_SQL_DIALECT, senderEnv, beforeSqlList)) { + return; + } + + if (!TestUtils.tryExecuteNonQueryWithRetry( + database, BaseEnv.TABLE_SQL_DIALECT, receiverEnv, testSql)) { + return; + } + + // Create an idempotent conflict + if (!TestUtils.tryExecuteNonQueryWithRetry( + database, BaseEnv.TABLE_SQL_DIALECT, senderEnv, testSql)) { + return; + } + + TableModelUtils.createDatabase(senderEnv, "test2"); + + // Assume that the "database" is executed on receiverEnv + TestUtils.assertDataSizeEventuallyOnEnv(receiverEnv, "show databases", 3, null); + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTableManualIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTableManualIT.java new file mode 100644 index 000000000000..f375bd35e81b --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTableManualIT.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.manual; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.pipe.it.tablemodel.TableModelUtils; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2ManualCreateSchema.class}) +public class IoTDBPipeTableManualIT extends AbstractPipeDualManualIT { + @Test + public void testTableSync() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.inclusion", "all"); + extractorAttributes.put("extractor.capture.tree", "false"); + extractorAttributes.put("extractor.capture.table", "true"); + extractorAttributes.put("extractor.database-name", "test"); + extractorAttributes.put("extractor.table-name", "t.*[0-9]"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + final String dbName = "test"; + TableModelUtils.createDatabase(senderEnv, dbName, 300); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + dbName, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + Arrays.asList( + "create table table1(a id, b attribute, c int32) with (ttl=3000)", + "alter table table1 add column d int64", + "alter table table1 drop column c", + "alter table table1 set properties ttl=default", + "insert into table1 (a, b, d) values(1, 1, 1)", + "create table noTransferTable(a id, b attribute, c int32) with (ttl=3000)"))) { + return; + } + + TableModelUtils.createDatabase(senderEnv, "noTransferDatabase", 300); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "show tables from test", + "TableName,TTL(ms),", + Collections.singleton("table1,300,"), + dbName); + + // Test devices + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "show devices from table1", "a,b,", Collections.singleton("1,1,"), dbName); + + if (!TestUtils.tryExecuteNonQueryWithRetry( + dbName, BaseEnv.TABLE_SQL_DIALECT, senderEnv, "insert into table1 (a, b) values(1, 2)")) { + return; + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "show devices from table1", "a,b,", Collections.singleton("1,2,"), dbName); + + if (!TestUtils.tryExecuteNonQueryWithRetry( + dbName, BaseEnv.TABLE_SQL_DIALECT, senderEnv, "update table1 set b = '3'")) { + return; + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "show devices from table1", "a,b,", Collections.singleton("1,3,"), dbName); + + if (!TestUtils.tryExecuteNonQueryWithRetry( + dbName, BaseEnv.TABLE_SQL_DIALECT, senderEnv, "delete from table1")) { + return; + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "select * from table1", "a,b,d,", Collections.emptySet(), dbName); + + if (!TestUtils.tryExecuteNonQueryWithRetry( + dbName, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + "delete devices from table1 where a = '1'")) { + return; + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "show devices from table1", "a,b,", Collections.emptySet(), dbName); + + // Will not include no-transfer table + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "show tables from test", + "TableName,TTL(ms),", + Collections.singleton("table1,300,"), + dbName); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "desc table1", + "ColumnName,DataType,Category,", + new HashSet<>( + Arrays.asList( + "time,TIMESTAMP,TIME,", + "a,STRING,ID,", + "b,STRING,ATTRIBUTE,", + "d,INT64,MEASUREMENT,")), + dbName); + + if (!TestUtils.tryExecuteNonQueryWithRetry( + dbName, BaseEnv.TABLE_SQL_DIALECT, senderEnv, "drop table table1")) { + return; + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "show tables from test", + "TableName,TTL(ms),", + Collections.emptySet(), + dbName); + + if (!TestUtils.tryExecuteNonQueryWithRetry( + dbName, BaseEnv.TABLE_SQL_DIALECT, senderEnv, "drop database test")) { + return; + } + + // Will not include no-transfer database + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "show databases", + "Database,TTL(ms),SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,", + Collections.singleton("information_schema,INF,null,null,null,"), + (String) null); + } + } + + @Test + public void testNoTree() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.inclusion", "all"); + extractorAttributes.put("extractor.capture.tree", "false"); + extractorAttributes.put("extractor.capture.table", "true"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "create database root.test", + "alter database root.test with schema_region_group_num=2, data_region_group_num=3", + "create timeSeries root.test.d1.s1 int32", + "insert into root.test.d1 (s1) values (1)"))) { + return; + } + + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "show databases", + "Database,SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,", + Collections.emptySet()); + } + } + + @Test + public void testNoTable() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.inclusion", "all"); + extractorAttributes.put("extractor.inclusion.exclusion", "data.delete"); + extractorAttributes.put("extractor.capture.tree", "true"); + extractorAttributes.put("extractor.capture.table", "false"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + + final String dbName = "test"; + TableModelUtils.createDatabase(senderEnv, dbName, 300); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + dbName, + BaseEnv.TABLE_SQL_DIALECT, + senderEnv, + Arrays.asList( + "create table table1(a id, b attribute, c int32) with (ttl=3000)", + "alter table table1 add column d int64", + "alter table table1 drop column b", + "alter table table1 set properties ttl=default"))) { + return; + } + + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "show databases", + "Database,TTL(ms),SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,", + Collections.singleton("information_schema,INF,null,null,null,"), + dbName); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java index 09658a583b08..96673a93a4a9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java @@ -630,7 +630,7 @@ public void testInclusionPattern() throws Exception { + "'connector.batch.enable'='false')", receiverIp, receiverPort)); fail(); - } catch (SQLException ignored) { + } catch (final SQLException ignored) { } // Invalid inclusion diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java index a3c8c3757420..46333e5c4703 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java @@ -81,11 +81,18 @@ public static void createDataBaseAndTable( } } - public static void createDataBase(final BaseEnv baseEnv, final String database) { - try (Connection connection = baseEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - statement.execute("create database if not exists " + database); - } catch (Exception e) { + public static void createDatabase(final BaseEnv baseEnv, final String database) { + createDatabase(baseEnv, database, Long.MAX_VALUE); + } + + public static void createDatabase(final BaseEnv baseEnv, final String database, final long ttl) { + try (final Connection connection = baseEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + statement.execute( + "create database if not exists " + + database + + (ttl < Long.MAX_VALUE ? " with (ttl=" + ttl + ")" : "")); + } catch (final Exception e) { fail(e.getMessage()); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index cc1aaee05553..d1de403421b6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -60,7 +60,9 @@ import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; +import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTablePlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeactivateTemplatePlan; +import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteLogicalViewPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan; @@ -342,7 +344,7 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept plan = new ExtendSchemaTemplatePlan(); break; case PreCreateTable: - plan = new PreCreateTablePlan(); + plan = new PreCreateTablePlan(configPhysicalPlanType); break; case RollbackCreateTable: plan = new RollbackCreateTablePlan(); @@ -452,6 +454,12 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept case PipeDeactivateTemplate: plan = new PipeDeactivateTemplatePlan(); break; + case PipeCreateTable: + plan = new PipeCreateTablePlan(); + break; + case PipeDeleteDevices: + plan = new PipeDeleteDevicesPlan(); + break; case UpdateTriggersOnTransferNodes: plan = new UpdateTriggersOnTransferNodesPlan(); break; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java index 15236010654d..0f09b7f3c326 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java @@ -248,6 +248,8 @@ public enum ConfigPhysicalPlanType { PipeDeleteLogicalView((short) 1703), PipeDeactivateTemplate((short) 1704), PipeSetTTL((short) 1705), + PipeCreateTable((short) 1706), + PipeDeleteDevices((short) 1707), /** Subscription */ CreateTopic((short) 1800), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanVisitor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanVisitor.java index 1e418e338194..d2a46baf5858 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanVisitor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanVisitor.java @@ -23,17 +23,24 @@ import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTablePlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeactivateTemplatePlan; +import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteLogicalViewPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeUnsetSchemaTemplatePlan; +import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan; +import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan; +import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan; +import org.apache.iotdb.confignode.consensus.request.write.table.RenameTableColumnPlan; +import org.apache.iotdb.confignode.consensus.request.write.table.SetTablePropertiesPlan; import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan; public abstract class ConfigPhysicalPlanVisitor { - public R process(ConfigPhysicalPlan plan, C context) { + public R process(final ConfigPhysicalPlan plan, final C context) { switch (plan.getType()) { case CreateDatabase: return visitCreateDatabase((DatabaseSchemaPlan) plan, context); @@ -83,111 +90,162 @@ public R process(ConfigPhysicalPlan plan, C context) { return visitRevokeRoleFromUser((AuthorPlan) plan, context); case SetTTL: return visitTTL((SetTTLPlan) plan, context); + case PipeCreateTable: + return visitPipeCreateTable((PipeCreateTablePlan) plan, context); + case AddTableColumn: + return visitAddTableColumn((AddTableColumnPlan) plan, context); + case SetTableProperties: + return visitSetTableProperties((SetTablePropertiesPlan) plan, context); + case RenameTableColumn: + return visitRenameTableColumn((RenameTableColumnPlan) plan, context); + case CommitDeleteColumn: + return visitCommitDeleteColumn((CommitDeleteColumnPlan) plan, context); + case CommitDeleteTable: + return visitCommitDeleteTable((CommitDeleteTablePlan) plan, context); + case PipeDeleteDevices: + return visitPipeDeleteDevices((PipeDeleteDevicesPlan) plan, context); default: return visitPlan(plan, context); } } /** Top Level Description */ - public abstract R visitPlan(ConfigPhysicalPlan plan, C context); + public abstract R visitPlan(final ConfigPhysicalPlan plan, final C context); - public R visitCreateDatabase(DatabaseSchemaPlan createDatabasePlan, C context) { + public R visitCreateDatabase(final DatabaseSchemaPlan createDatabasePlan, final C context) { return visitPlan(createDatabasePlan, context); } - public R visitAlterDatabase(DatabaseSchemaPlan alterDatabasePlan, C context) { + public R visitAlterDatabase(final DatabaseSchemaPlan alterDatabasePlan, final C context) { return visitPlan(alterDatabasePlan, context); } - public R visitDeleteDatabase(DeleteDatabasePlan deleteDatabasePlan, C context) { + public R visitDeleteDatabase(final DeleteDatabasePlan deleteDatabasePlan, final C context) { return visitPlan(deleteDatabasePlan, context); } - public R visitCreateSchemaTemplate(CreateSchemaTemplatePlan createSchemaTemplatePlan, C context) { + public R visitCreateSchemaTemplate( + final CreateSchemaTemplatePlan createSchemaTemplatePlan, final C context) { return visitPlan(createSchemaTemplatePlan, context); } public R visitCommitSetSchemaTemplate( - CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan, C context) { + final CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan, final C context) { return visitPlan(commitSetSchemaTemplatePlan, context); } public R visitPipeUnsetSchemaTemplate( - PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlan, C context) { + final PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlan, final C context) { return visitPlan(pipeUnsetSchemaTemplatePlan, context); } - public R visitExtendSchemaTemplate(ExtendSchemaTemplatePlan extendSchemaTemplatePlan, C context) { + public R visitExtendSchemaTemplate( + final ExtendSchemaTemplatePlan extendSchemaTemplatePlan, final C context) { return visitPlan(extendSchemaTemplatePlan, context); } - public R visitDropSchemaTemplate(DropSchemaTemplatePlan dropSchemaTemplatePlan, C context) { + public R visitDropSchemaTemplate( + final DropSchemaTemplatePlan dropSchemaTemplatePlan, final C context) { return visitPlan(dropSchemaTemplatePlan, context); } - public R visitPipeDeleteTimeSeries(PipeDeleteTimeSeriesPlan pipeDeleteTimeSeriesPlan, C context) { + public R visitPipeDeleteTimeSeries( + final PipeDeleteTimeSeriesPlan pipeDeleteTimeSeriesPlan, final C context) { return visitPlan(pipeDeleteTimeSeriesPlan, context); } public R visitPipeDeleteLogicalView( - PipeDeleteLogicalViewPlan pipeDeleteLogicalViewPlan, C context) { + final PipeDeleteLogicalViewPlan pipeDeleteLogicalViewPlan, final C context) { return visitPlan(pipeDeleteLogicalViewPlan, context); } public R visitPipeDeactivateTemplate( - PipeDeactivateTemplatePlan pipeDeactivateTemplatePlan, C context) { + final PipeDeactivateTemplatePlan pipeDeactivateTemplatePlan, final C context) { return visitPlan(pipeDeactivateTemplatePlan, context); } - public R visitCreateUser(AuthorPlan createUserPlan, C context) { + public R visitCreateUser(final AuthorPlan createUserPlan, final C context) { return visitPlan(createUserPlan, context); } - public R visitCreateRawUser(AuthorPlan createRawUserPlan, C context) { + public R visitCreateRawUser(final AuthorPlan createRawUserPlan, final C context) { return visitPlan(createRawUserPlan, context); } - public R visitUpdateUser(AuthorPlan updateUserPlan, C context) { + public R visitUpdateUser(final AuthorPlan updateUserPlan, final C context) { return visitPlan(updateUserPlan, context); } - public R visitDropUser(AuthorPlan dropUserPlan, C context) { + public R visitDropUser(final AuthorPlan dropUserPlan, final C context) { return visitPlan(dropUserPlan, context); } - public R visitGrantUser(AuthorPlan grantUserPlan, C context) { + public R visitGrantUser(final AuthorPlan grantUserPlan, final C context) { return visitPlan(grantUserPlan, context); } - public R visitRevokeUser(AuthorPlan revokeUserPlan, C context) { + public R visitRevokeUser(final AuthorPlan revokeUserPlan, final C context) { return visitPlan(revokeUserPlan, context); } - public R visitCreateRole(AuthorPlan createRolePlan, C context) { + public R visitCreateRole(final AuthorPlan createRolePlan, final C context) { return visitPlan(createRolePlan, context); } - public R visitDropRole(AuthorPlan dropRolePlan, C context) { + public R visitDropRole(final AuthorPlan dropRolePlan, final C context) { return visitPlan(dropRolePlan, context); } - public R visitGrantRole(AuthorPlan grantRolePlan, C context) { + public R visitGrantRole(final AuthorPlan grantRolePlan, final C context) { return visitPlan(grantRolePlan, context); } - public R visitRevokeRole(AuthorPlan revokeRolePlan, C context) { + public R visitRevokeRole(final AuthorPlan revokeRolePlan, final C context) { return visitPlan(revokeRolePlan, context); } - public R visitGrantRoleToUser(AuthorPlan grantRoleToUserPlan, C context) { + public R visitGrantRoleToUser(final AuthorPlan grantRoleToUserPlan, final C context) { return visitPlan(grantRoleToUserPlan, context); } - public R visitRevokeRoleFromUser(AuthorPlan revokeRoleFromUserPlan, C context) { + public R visitRevokeRoleFromUser(final AuthorPlan revokeRoleFromUserPlan, final C context) { return visitPlan(revokeRoleFromUserPlan, context); } - public R visitTTL(SetTTLPlan setTTLPlan, C context) { + public R visitTTL(final SetTTLPlan setTTLPlan, final C context) { return visitPlan(setTTLPlan, context); } + + public R visitPipeCreateTable(final PipeCreateTablePlan pipeCreateTablePlan, final C context) { + return visitPlan(pipeCreateTablePlan, context); + } + + public R visitAddTableColumn(final AddTableColumnPlan addTableColumnPlan, final C context) { + return visitPlan(addTableColumnPlan, context); + } + + public R visitSetTableProperties( + final SetTablePropertiesPlan setTablePropertiesPlan, final C context) { + return visitPlan(setTablePropertiesPlan, context); + } + + public R visitCommitDeleteColumn( + final CommitDeleteColumnPlan commitDeleteColumnPlan, final C context) { + return visitPlan(commitDeleteColumnPlan, context); + } + + public R visitRenameTableColumn( + final RenameTableColumnPlan renameTableColumnPlan, final C context) { + return visitPlan(renameTableColumnPlan, context); + } + + public R visitCommitDeleteTable( + final CommitDeleteTablePlan commitDeleteTablePlan, final C context) { + return visitPlan(commitDeleteTablePlan, context); + } + + public R visitPipeDeleteDevices( + final PipeDeleteDevicesPlan pipeDeleteDevicesPlan, final C context) { + return visitPlan(pipeDeleteDevicesPlan, context); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeCreateTablePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeCreateTablePlan.java new file mode 100644 index 000000000000..c4bef02e707c --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeCreateTablePlan.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.request.write.pipe.payload; + +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; +import org.apache.iotdb.confignode.consensus.request.write.table.PreCreateTablePlan; + +public class PipeCreateTablePlan extends PreCreateTablePlan { + public PipeCreateTablePlan() { + super(ConfigPhysicalPlanType.PipeCreateTable); + } + + public PipeCreateTablePlan(final String database, final TsTable table) { + super(ConfigPhysicalPlanType.PipeCreateTable, database, table); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeactivateTemplatePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeactivateTemplatePlan.java index cf9afa0fd8c9..b91d7ab8c88d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeactivateTemplatePlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeactivateTemplatePlan.java @@ -44,7 +44,7 @@ public PipeDeactivateTemplatePlan() { super(ConfigPhysicalPlanType.PipeDeactivateTemplate); } - public PipeDeactivateTemplatePlan(Map> templateSetInfo) { + public PipeDeactivateTemplatePlan(final Map> templateSetInfo) { super(ConfigPhysicalPlanType.PipeDeactivateTemplate); this.templateSetInfo = templateSetInfo; } @@ -54,26 +54,26 @@ public Map> getTemplateSetInfo() { } @Override - protected void serializeImpl(DataOutputStream stream) throws IOException { + protected void serializeImpl(final DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(templateSetInfo.size(), stream); - for (Map.Entry> entry : templateSetInfo.entrySet()) { + for (final Map.Entry> entry : templateSetInfo.entrySet()) { entry.getKey().serialize(stream); ReadWriteIOUtils.write(entry.getValue().size(), stream); - for (Template template : entry.getValue()) { + for (final Template template : entry.getValue()) { template.serialize(stream); } } } @Override - protected void deserializeImpl(ByteBuffer buffer) throws IOException { - int size = ReadWriteIOUtils.readInt(buffer); + protected void deserializeImpl(final ByteBuffer buffer) throws IOException { + final int size = ReadWriteIOUtils.readInt(buffer); templateSetInfo = new HashMap<>(); for (int i = 0; i < size; i++) { - PartialPath pattern = (PartialPath) PathDeserializeUtil.deserialize(buffer); - int templateNum = ReadWriteIOUtils.readInt(buffer); - List