Skip to content

Commit

Permalink
Separate table/tree model database & Added SchemaRegionGroupNum / dat…
Browse files Browse the repository at this point in the history
…aRegionGroupNum in table databases details & Optimized the message for table flush operation with database
  • Loading branch information
Caideyipi authored Dec 20, 2024
1 parent 5024142 commit aee90fb
Show file tree
Hide file tree
Showing 120 changed files with 1,280 additions and 1,629 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,6 @@ public void testStorageGroupWithHyphenInName() {
try (final ResultSet resultSet = statement.executeQuery("SHOW DATABASES DETAILS")) {
while (resultSet.next()) {
Assert.assertEquals("root.group_with_hyphen", resultSet.getString(1));
Assert.assertEquals("TREE", resultSet.getString(12));
}
}
} catch (final SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public void testDeleteMultipleStorageGroupWithQuote() throws Exception {

@Test(expected = SQLException.class)
public void deleteNonExistStorageGroup() throws Exception {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {
statement.execute("CREATE DATABASE root.ln2.wf01.wt01");
statement.execute("DELETE DATABASE root.ln2.wf01.wt02");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
}

final AtomicInteger leaderPort = new AtomicInteger(-1);
final TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
final TShowRegionResp showRegionResp =
client.showRegion(new TShowRegionReq().setIsTableModel(true));
showRegionResp
.getRegionInfoList()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,15 @@ public void testFlushGivenGroupNoData() {
@Test
@Ignore
public void testFlushNotExistGroupNoData() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {
statement.execute("CREATE DATABASE root.noexist.nodatagroup1");
try {
statement.execute(
"FLUSH root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2");
} catch (SQLException sqe) {
} catch (final SQLException sqe) {
String expectedMsg =
"322: 322: storageGroup root.notExistGroup1,root.notExistGroup2 does not exist";
"322: 322: Database root.notExistGroup1,root.notExistGroup2 does not exist";
sqe.printStackTrace();
assertTrue(sqe.getMessage().contains(expectedMsg));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public void testManageDatabase() {
int[] schemaReplicaFactors = new int[] {1};
int[] dataReplicaFactors = new int[] {1};
int[] timePartitionInterval = new int[] {604800000};
String[] model = new String[] {"TABLE"};

// show
try (final ResultSet resultSet = statement.executeQuery("SHOW DATABASES")) {
Expand All @@ -112,6 +111,8 @@ public void testManageDatabase() {
assertEquals(databaseNames.length, cnt);
}

final int[] schemaRegionGroupNum = new int[] {0};
final int[] dataRegionGroupNum = new int[] {0};
// show
try (final ResultSet resultSet = statement.executeQuery("SHOW DATABASES DETAILS")) {
int cnt = 0;
Expand All @@ -130,7 +131,8 @@ public void testManageDatabase() {
assertEquals(schemaReplicaFactors[cnt], resultSet.getInt(3));
assertEquals(dataReplicaFactors[cnt], resultSet.getInt(4));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(5));
assertEquals(model[cnt], resultSet.getString(6));
assertEquals(schemaRegionGroupNum[cnt], resultSet.getInt(6));
assertEquals(dataRegionGroupNum[cnt], resultSet.getInt(7));
cnt++;
}
assertEquals(databaseNames.length, cnt);
Expand Down Expand Up @@ -372,7 +374,8 @@ public void testInformationSchema() throws SQLException {
"schema_replication_factor,INT32,ATTRIBUTE,",
"data_replication_factor,INT32,ATTRIBUTE,",
"time_partition_interval,INT64,ATTRIBUTE,",
"model,STRING,ATTRIBUTE,")));
"schema_region_group_num,INT32,ATTRIBUTE,",
"data_region_group_num,INT32,ATTRIBUTE,")));
TestUtils.assertResultSetEqual(
statement.executeQuery("desc tables"),
"ColumnName,DataType,Category,",
Expand Down Expand Up @@ -405,4 +408,52 @@ public void testInformationSchema() throws SQLException {
"statement,STRING,ATTRIBUTE,")));
}
}

@Test
public void testMixedDatabase() throws SQLException {
try (final Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("create database test");
}

try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {
statement.execute("create database root.test");
statement.execute(
"alter database root.test WITH SCHEMA_REGION_GROUP_NUM=2, DATA_REGION_GROUP_NUM=3");
statement.execute("drop database root.test");
}

try (final Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
try (final ResultSet resultSet = statement.executeQuery("SHOW DATABASES DETAILS")) {
assertTrue(resultSet.next());
if (resultSet.getString(1).equals("information_schema")) {
assertTrue(resultSet.next());
}
assertEquals("test", resultSet.getString(1));
assertEquals(0, resultSet.getInt(6));
assertEquals(0, resultSet.getInt(7));
assertFalse(resultSet.next());
}
}

try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {
statement.execute("create database root.test");
}

try (final Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("drop database test");
}

try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {
TestUtils.assertResultSetSize(statement.executeQuery("show databases"), 1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,27 @@ public class CountDatabasePlan extends ConfigPhysicalReadPlan {

private final String[] storageGroupPattern;
private final PathPatternTree scope;
private final boolean isTableModel;

public CountDatabasePlan(final List<String> storageGroupPattern, final PathPatternTree scope) {
public CountDatabasePlan(
final List<String> storageGroupPattern,
final PathPatternTree scope,
final boolean isTableModel) {
super(ConfigPhysicalPlanType.CountDatabase);
this.storageGroupPattern = storageGroupPattern.toArray(new String[0]);
this.scope = scope;
this.isTableModel = isTableModel;
}

public CountDatabasePlan(
final ConfigPhysicalPlanType type,
final List<String> storageGroupPattern,
final PathPatternTree scope) {
final PathPatternTree scope,
final boolean isTableModel) {
super(type);
this.storageGroupPattern = storageGroupPattern.toArray(new String[0]);
this.scope = scope;
this.isTableModel = isTableModel;
}

public String[] getDatabasePattern() {
Expand All @@ -54,6 +61,10 @@ public PathPatternTree getScope() {
return scope;
}

public boolean isTableModel() {
return isTableModel;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@

public class GetDatabasePlan extends CountDatabasePlan {

public GetDatabasePlan(final List<String> storageGroupPathPattern, final PathPatternTree scope) {
super(ConfigPhysicalPlanType.GetDatabase, storageGroupPathPattern, scope);
public GetDatabasePlan(
final List<String> storageGroupPathPattern,
final PathPatternTree scope,
final boolean isTableModel) {
super(ConfigPhysicalPlanType.GetDatabase, storageGroupPathPattern, scope, isTableModel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public AdjustMaxRegionGroupNumPlan() {
this.maxRegionGroupNumMap = new HashMap<>();
}

public void putEntry(String storageGroup, Pair<Integer, Integer> maxRegionGroupNum) {
public void putEntry(final String storageGroup, final Pair<Integer, Integer> maxRegionGroupNum) {
maxRegionGroupNumMap.put(storageGroup, maxRegionGroupNum);
}

Expand All @@ -51,11 +51,11 @@ public Map<String, Pair<Integer, Integer>> getMaxRegionGroupNumMap() {
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
protected void serializeImpl(final DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(getType().getPlanType(), stream);

ReadWriteIOUtils.write(maxRegionGroupNumMap.size(), stream);
for (Map.Entry<String, Pair<Integer, Integer>> maxRegionGroupNumEntry :
for (final Map.Entry<String, Pair<Integer, Integer>> maxRegionGroupNumEntry :
maxRegionGroupNumMap.entrySet()) {
ReadWriteIOUtils.write(maxRegionGroupNumEntry.getKey(), stream);
ReadWriteIOUtils.write(maxRegionGroupNumEntry.getValue().getLeft(), stream);
Expand All @@ -64,27 +64,27 @@ protected void serializeImpl(DataOutputStream stream) throws IOException {
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
int storageGroupNum = buffer.getInt();
protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
final int storageGroupNum = buffer.getInt();

for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = ReadWriteIOUtils.readString(buffer);
int maxSchemaRegionGroupNum = buffer.getInt();
int maxDataRegionGroupNum = buffer.getInt();
final String storageGroup = ReadWriteIOUtils.readString(buffer);
final int maxSchemaRegionGroupNum = buffer.getInt();
final int maxDataRegionGroupNum = buffer.getInt();
maxRegionGroupNumMap.put(
storageGroup, new Pair<>(maxSchemaRegionGroupNum, maxDataRegionGroupNum));
}
}

@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AdjustMaxRegionGroupNumPlan that = (AdjustMaxRegionGroupNumPlan) o;
final AdjustMaxRegionGroupNumPlan that = (AdjustMaxRegionGroupNumPlan) o;
return maxRegionGroupNumMap.equals(that.maxRegionGroupNumMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public SetSpaceQuotaPlan() {
super(ConfigPhysicalPlanType.setSpaceQuota);
}

public SetSpaceQuotaPlan(List<String> prefixPathList, TSpaceQuota spaceLimit) {
public SetSpaceQuotaPlan(final List<String> prefixPathList, final TSpaceQuota spaceLimit) {
super(ConfigPhysicalPlanType.setSpaceQuota);
this.prefixPathList = prefixPathList;
this.spaceLimit = spaceLimit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public CreateRegionGroupsPlan() {
this.regionGroupMap = new HashMap<>();
}

public CreateRegionGroupsPlan(ConfigPhysicalPlanType type) {
public CreateRegionGroupsPlan(final ConfigPhysicalPlanType type) {
super(type);
this.regionGroupMap = new HashMap<>();
}
Expand All @@ -59,16 +59,17 @@ public Map<String, List<TRegionReplicaSet>> getRegionGroupMap() {
return regionGroupMap;
}

public void addRegionGroup(String database, TRegionReplicaSet regionReplicaSet) {
public void addRegionGroup(final String database, final TRegionReplicaSet regionReplicaSet) {
regionGroupMap
.computeIfAbsent(database, regionReplicaSets -> new ArrayList<>())
.add(regionReplicaSet);
}

public void planLog(Logger logger) {
for (Map.Entry<String, List<TRegionReplicaSet>> regionGroupEntry : regionGroupMap.entrySet()) {
String database = regionGroupEntry.getKey();
for (TRegionReplicaSet regionReplicaSet : regionGroupEntry.getValue()) {
public void planLog(final Logger logger) {
for (final Map.Entry<String, List<TRegionReplicaSet>> regionGroupEntry :
regionGroupMap.entrySet()) {
final String database = regionGroupEntry.getKey();
for (final TRegionReplicaSet regionReplicaSet : regionGroupEntry.getValue()) {
logger.info(
"[CreateRegionGroups] RegionGroup: {}, belonged database: {}, on DataNodes: {}",
regionReplicaSet.getRegionId(),
Expand All @@ -80,24 +81,24 @@ public void planLog(Logger logger) {
}
}

public void serializeForProcedure(DataOutputStream stream) throws IOException {
public void serializeForProcedure(final DataOutputStream stream) throws IOException {
this.serializeImpl(stream);
}

public void deserializeForProcedure(ByteBuffer buffer) throws IOException {
public void deserializeForProcedure(final ByteBuffer buffer) throws IOException {
// to remove the planType of ConfigPhysicalPlanType
buffer.getShort();
this.deserializeImpl(buffer);
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
protected void serializeImpl(final DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());

stream.writeInt(regionGroupMap.size());
for (Entry<String, List<TRegionReplicaSet>> entry : regionGroupMap.entrySet()) {
String database = entry.getKey();
List<TRegionReplicaSet> regionReplicaSets = entry.getValue();
for (final Entry<String, List<TRegionReplicaSet>> entry : regionGroupMap.entrySet()) {
final String database = entry.getKey();
final List<TRegionReplicaSet> regionReplicaSets = entry.getValue();
BasicStructureSerDeUtil.write(database, stream);
stream.writeInt(regionReplicaSets.size());
regionReplicaSets.forEach(
Expand All @@ -107,23 +108,23 @@ protected void serializeImpl(DataOutputStream stream) throws IOException {
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
int databaseNum = buffer.getInt();
protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
final int databaseNum = buffer.getInt();
for (int i = 0; i < databaseNum; i++) {
String database = BasicStructureSerDeUtil.readString(buffer);
final String database = BasicStructureSerDeUtil.readString(buffer);
regionGroupMap.put(database, new ArrayList<>());

int regionReplicaSetNum = buffer.getInt();
final int regionReplicaSetNum = buffer.getInt();
for (int j = 0; j < regionReplicaSetNum; j++) {
TRegionReplicaSet regionReplicaSet =
final TRegionReplicaSet regionReplicaSet =
ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer);
regionGroupMap.get(database).add(regionReplicaSet);
}
}
}

@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) {
return true;
}
Expand All @@ -133,7 +134,7 @@ public boolean equals(Object o) {
if (!super.equals(o)) {
return false;
}
CreateRegionGroupsPlan that = (CreateRegionGroupsPlan) o;
final CreateRegionGroupsPlan that = (CreateRegionGroupsPlan) o;
return Objects.equals(regionGroupMap, that.regionGroupMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ public boolean takeSnapshot(File snapshotDir) {
}

@Override
public void loadSnapshot(File latestSnapshotRootDir) {
public void loadSnapshot(final File latestSnapshotRootDir) {
try {
executor.loadSnapshot(latestSnapshotRootDir);
// We recompute the snapshot for pipe listener when loading snapshot
// to recover the newest snapshot in cache
PipeConfigNodeAgent.runtime()
.listener()
.tryListenToSnapshots(ConfignodeSnapshotParser.getSnapshots());
} catch (IOException e) {
} catch (final IOException e) {
if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
LOGGER.warn(
"Config Region Listening Queue Listen to snapshot failed when startup, snapshot will be tried again when starting schema transferring pipes",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class NoAvailableRegionGroupException extends ConfigNodeException {
private static final String SCHEMA_REGION_GROUP = "SchemaRegionGroup";
private static final String DATA_REGION_GROUP = "DataRegionGroup";

public NoAvailableRegionGroupException(TConsensusGroupType regionGroupType) {
public NoAvailableRegionGroupException(final TConsensusGroupType regionGroupType) {
super(
String.format(
"There are no available %s RegionGroups currently, "
Expand Down
Loading

0 comments on commit aee90fb

Please sign in to comment.