From d4a891b905211d7f7f6ddb8fd5693a6483f48e6d Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Fri, 15 Nov 2024 15:55:32 +0800 Subject: [PATCH] Pipe: Disable unstable features in the distribution --- .github/workflows/pipe-it-2cluster.yml | 547 ------------------ .../it/autocreate/IoTDBPipeProtocolIT.java | 16 +- .../PipeConfigRegionConnectorConstructor.java | 7 - .../PipeDataRegionConnectorConstructor.java | 11 - .../PipeDataRegionProcessorConstructor.java | 30 - .../PipeSchemaRegionConnectorConstructor.java | 7 - .../dataregion/IoTDBDataRegionExtractor.java | 6 + .../PipeRealtimeDataRegionExtractor.java | 10 +- .../plugin/builtin/BuiltinPipePlugin.java | 35 -- .../pipe/extractor/IoTDBExtractor.java | 10 +- 10 files changed, 11 insertions(+), 668 deletions(-) delete mode 100644 .github/workflows/pipe-it-2cluster.yml diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml deleted file mode 100644 index 9be00874afbd..000000000000 --- a/.github/workflows/pipe-it-2cluster.yml +++ /dev/null @@ -1,547 +0,0 @@ -name: Multi-Cluster IT - -on: - push: - branches: - - master - - 'rel/1.*' - - 'rc/1.*' - - 'force_ci/**' - paths-ignore: - - 'docs/**' - - 'site/**' - - 'iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**' #queryengine - pull_request: - branches: - - master - - 'rel/1.*' - - 'rc/1.*' - - 'force_ci/**' - paths-ignore: - - 'docs/**' - - 'site/**' - - 'iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**' #queryengine - # allow manually run the action: - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -env: - MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 - MAVEN_ARGS: --batch-mode --no-transfer-progress - DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} - -jobs: - auto-create-schema: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2AutoCreateSchema \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-auto-create-schema-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - manual-create-schema: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - cluster2: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2ManualCreateSchema \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-manual-create-schema-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-arch-verification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-regression-consumer: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionRegressionConsumer \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-regression-consumer-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-regression-misc: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionRegressionMisc \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-regression-misc-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - table-model: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2TableModel \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-table-model-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java index c5d41f001cb8..5a85496c4050 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java @@ -342,11 +342,6 @@ public void testAsyncConnectorUseNodeUrls() throws Exception { doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); } - @Test - public void testAirGapConnectorUseNodeUrls() throws Exception { - doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()); - } - private void doTestUseNodeUrls(String connectorName) throws Exception { senderEnv .getConfig() @@ -381,16 +376,7 @@ private void doTestUseNodeUrls(String connectorName) throws Exception { final StringBuilder nodeUrlsBuilder = new StringBuilder(); for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) { - if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) { - // Use default port for convenience - nodeUrlsBuilder - .append(wrapper.getIp()) - .append(":") - .append(wrapper.getPipeAirGapReceiverPort()) - .append(","); - } else { - nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); - } + nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); } try (final SyncConfigNodeIServiceClient client = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java index 74a34e503be2..514c4aa7d509 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor; -import org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionAirGapConnector; import org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionConnector; import org.apache.iotdb.pipe.api.PipeConnector; @@ -42,9 +41,6 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), IoTDBConfigRegionConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBConfigRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new); @@ -59,9 +55,6 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBConfigRegionConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), - IoTDBConfigRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java index e11e8dc56853..ea3583838354 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java @@ -23,9 +23,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; -import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBDataRegionAirGapConnector; import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector; -import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector; import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector; import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector; import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector; @@ -58,13 +56,8 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(), IoTDBLegacyPipeConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBDataRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new); pluginConstructors.put( @@ -85,12 +78,8 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(), IoTDBLegacyPipeConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), - IoTDBDataRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), WebSocketConnector::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), OpcUaConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new); pluginConstructors.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 31cc8250ebf9..33143bdc27ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -21,18 +21,10 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; -import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor; -import org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor; -import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.db.pipe.processor.schemachange.RenameDatabaseProcessor; -import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor; class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { @@ -44,28 +36,6 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), DoNothingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(), - TumblingTimeSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(), - SwingingDoorTrendingSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(), - ChangingValueSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(), - ThrowingExceptionProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.AGGREGATE_PROCESSOR.getPipePluginName(), AggregateProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.STANDARD_STATISTICS_PROCESSOR.getPipePluginName(), - StandardStatisticsOperatorProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(), - TumblingWindowingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new); pluginConstructors.put( BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(), PipeConsensusProcessor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionConnectorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionConnectorConstructor.java index 6c96aaaeddc2..fb87b90f8775 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionConnectorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionConnectorConstructor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor; -import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBSchemaRegionAirGapConnector; import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBSchemaRegionConnector; import org.apache.iotdb.pipe.api.PipeConnector; @@ -42,9 +41,6 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), IoTDBSchemaRegionConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBSchemaRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new); @@ -59,9 +55,6 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBSchemaRegionConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), - IoTDBSchemaRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index b53f35e80ba4..f2c1e553bdd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; @@ -444,6 +445,11 @@ private void constructRealtimeExtractor(final PipeParameters parameters) return; } + if (pipeName == null || !pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor(); + return; + } + // Use hybrid mode by default if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index 6957ccfdbc36..04d5c05d6324 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; @@ -109,7 +108,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { private final AtomicReference> dataRegionTimePartitionIdBound = new AtomicReference<>(); - protected boolean isForwardingPipeRequests; + protected boolean isForwardingPipeRequests = true; private boolean shouldTransferModFile; // Whether to transfer mods @@ -234,12 +233,7 @@ public void customize( ? TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) : TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1; - isForwardingPipeRequests = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, - PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), - PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); + isForwardingPipeRequests = true; if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { shouldTransferModFile = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index fdbeb5bd74f0..2e6784a0d605 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -20,29 +20,19 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.airgap.IoTDBAirGapConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.consensus.PipeConsensusAsyncConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBLegacyPipeConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftAsyncConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSyncConnector; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.opcua.OpcUaConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.websocket.WebSocketConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.writeback.WriteBackConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.donothing.DoNothingExtractor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.iotdb.IoTDBExtractor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.schemachange.RenameDatabaseProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.twostage.TwoStageCountProcessor; import java.util.Arrays; import java.util.Collections; @@ -60,18 +50,8 @@ public enum BuiltinPipePlugin { // processors DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class), - TUMBLING_TIME_SAMPLING_PROCESSOR( - "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class), - SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class), - CHANGING_VALUE_SAMPLING_PROCESSOR( - "changing-value-sampling-processor", ChangingValueSamplingProcessor.class), - THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class), - AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class), - COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class), // Hidden-processors, which are plugins of the processors - STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class), - TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class), PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", PipeConsensusProcessor.class), RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class), @@ -82,12 +62,10 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", IoTDBThriftSyncConnector.class), IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncConnector.class), IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeConnector.class), - IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapConnector.class), PIPE_CONSENSUS_ASYNC_CONNECTOR( "pipe-consensus-async-connector", PipeConsensusAsyncConnector.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class), - OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class), WRITE_BACK_CONNECTOR("write-back-connector", WriteBackConnector.class), DO_NOTHING_SINK("do-nothing-sink", DoNothingConnector.class), @@ -96,9 +74,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", IoTDBThriftSyncConnector.class), IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", IoTDBThriftAsyncConnector.class), IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", IoTDBLegacyPipeConnector.class), - IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapConnector.class), WEBSOCKET_SINK("websocket-sink", WebSocketConnector.class), - OPC_UA_SINK("opc-ua-sink", OpcUaConnector.class), WRITE_BACK_SINK("write-back-sink", WriteBackConnector.class), SUBSCRIPTION_SINK("subscription-sink", DoNothingConnector.class), PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", PipeConsensusAsyncConnector.class), @@ -136,14 +112,6 @@ public String getClassName() { // Sources DO_NOTHING_SOURCE.getPipePluginName().toUpperCase(), // Processors - TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(), - AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(), - COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(), - STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(), - TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(), RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(), // Connectors @@ -153,9 +121,7 @@ public String getClassName() { IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(), - IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(), WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(), - OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(), WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), // Sinks @@ -163,7 +129,6 @@ public String getClassName() { IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(), WEBSOCKET_SINK.getPipePluginName().toUpperCase(), - OPC_UA_SINK.getPipePluginName().toUpperCase(), WRITE_BACK_SINK.getPipePluginName().toUpperCase(), SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase()))); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java index 2d3a1615f00e..1e7d3eb53c25 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java @@ -20,7 +20,6 @@ package org.apache.iotdb.commons.pipe.extractor; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.pipe.api.PipeExtractor; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; @@ -48,7 +47,7 @@ public abstract class IoTDBExtractor implements PipeExtractor { protected int regionId; protected PipeTaskMeta pipeTaskMeta; - protected boolean isForwardingPipeRequests; + protected boolean isForwardingPipeRequests = true; // The value is always true after the first start even the extractor is closed protected final AtomicBoolean hasBeenStarted = new AtomicBoolean(false); @@ -99,12 +98,7 @@ public void customize( taskID = pipeName + "_" + regionId + "_" + creationTime; pipeTaskMeta = environment.getPipeTaskMeta(); - isForwardingPipeRequests = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, - PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), - PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); + isForwardingPipeRequests = true; } @Override