Skip to content

Commit

Permalink
Merge branch 'master' into sub-file
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed May 21, 2024
2 parents bfed7f3 + 3b0dfcd commit 871c3c3
Show file tree
Hide file tree
Showing 95 changed files with 2,816 additions and 655 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,12 @@ public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) {
return this;
}

@Override
public CommonConfig setCnConnectionTimeoutMs(int connectionTimeoutMs) {
setProperty("cn_connection_timeout_ms", String.valueOf(connectionTimeoutMs));
return this;
}

// For part of the log directory
public String getClusterConfigStr() {
return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,4 @@ public ConfigNodeConfig setMetricReporterType(List<String> metricReporterTypes)
properties.setProperty("cn_metric_reporter_list", String.join(",", metricReporterTypes));
return this;
}

@Override
public ConfigNodeConfig setConnectionTimeoutMs(long connectionTimeoutMs) {
properties.setProperty("cn_connection_timeout_ms", String.valueOf(connectionTimeoutMs));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -438,4 +438,11 @@ public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) {
cnConfig.setTagAttributeTotalSize(tagAttributeTotalSize);
return this;
}

@Override
public CommonConfig setCnConnectionTimeoutMs(int connectionTimeoutMs) {
dnConfig.setCnConnectionTimeoutMs(connectionTimeoutMs);
cnConfig.setCnConnectionTimeoutMs(connectionTimeoutMs);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,9 @@ public CommonConfig setWalMode(String walMode) {
public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) {
return this;
}

@Override
public CommonConfig setCnConnectionTimeoutMs(int connectionTimeoutMs) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,4 @@ public class RemoteConfigNodeConfig implements ConfigNodeConfig {
public ConfigNodeConfig setMetricReporterType(List<String> metricReporterTypes) {
return this;
}

@Override
public ConfigNodeConfig setConnectionTimeoutMs(long connectionTimeoutMs) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,6 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus(
CommonConfig setWalMode(String walMode);

CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize);

CommonConfig setCnConnectionTimeoutMs(int connectionTimeoutMs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,4 @@
/** This interface is used to handle properties in iotdb-confignode.properties. */
public interface ConfigNodeConfig {
ConfigNodeConfig setMetricReporterType(List<String> metricReporterTypes);

ConfigNodeConfig setConnectionTimeoutMs(long connectionTimeoutMs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public void setUp() {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public void setUp() {
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment(3, 3, 180);
receiverEnv.initClusterEnvironment(3, 3, 180);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public void setUp() {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,30 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
@Test
public void testThriftConnector() throws Exception {
public void testThriftConnectorWithRealtimeFirst() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

// Do not fail if the failure has nothing to do with pipe
// Because the failures will randomly generate due to resource limitation
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)", "flush"))) {
return;
}

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
Expand All @@ -60,6 +70,7 @@ public void testThriftConnector() throws Exception {
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
connectorAttributes.put("connector.realtime-first", "true");

final TSStatus status =
client.createPipe(
Expand All @@ -76,15 +87,15 @@ public void testThriftConnector() throws Exception {
// Because the failures will randomly generate due to resource limitation
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)", "flush"))) {
Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"))) {
return;
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.**",
"Time,root.vehicle.d0.s1,",
Collections.singleton("0,1.0,"));
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,"))));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public void setUp() {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public void setUp() {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ private void innerSetUp(
.setDataReplicationFactor(dataRegionReplicationFactor);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
receiverEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
Expand Down Expand Up @@ -180,8 +180,8 @@ public void testPipeOnBothSenderAndReceiver() throws Exception {
.setDataReplicationFactor(1);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment(3, 3);
receiverEnv.initClusterEnvironment(1, 1);
Expand Down Expand Up @@ -373,8 +373,8 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
.setDataReplicationFactor(2);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment(1, 1);
receiverEnv.initClusterEnvironment(1, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public void setUp() {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ public void testPureDeleteInclusion() throws Exception {
receiverEnv,
"select * from root.**",
"Time,root.ln.wf01.wt01.status1,",
Collections.emptySet(),
10);
Collections.emptySet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public void setUp() {
.setDataReplicationFactor(2);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment(3, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public void setUp() {
.setSchemaReplicationFactor(3);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment(3, 3, 180);
receiverEnv.initClusterEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void setUp() {
env = MultiEnvFactory.getEnv(0);
env.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
// 10 min, assert that the operations will not time out
env.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
env.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
env.initClusterEnvironment();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public void setUp() {
receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600_000);
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600_000);
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
Expand Down
4 changes: 4 additions & 0 deletions iotdb-core/confignode/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@
<artifactId>tsfile</artifactId>
<version>${tsfile.version}</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Loading

0 comments on commit 871c3c3

Please sign in to comment.