Skip to content

Commit

Permalink
Handle commit failure of table procedures
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi authored Sep 24, 2024
1 parent 737c4cc commit 5b22e7b
Show file tree
Hide file tree
Showing 25 changed files with 531 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public enum ConfigPhysicalPlanType {
AddTableColumn((short) 853),
SetTableProperties((short) 854),
ShowTable((short) 855),
FetchTable((short) 856),

/** Deprecated types for sync, restored them for upgrade. */
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.request.read.table;

import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;

import java.util.Map;
import java.util.Set;

public class FetchTablePlan extends ConfigPhysicalReadPlan {

private final Map<String, Set<String>> fetchTableMap;

public FetchTablePlan(final Map<String, Set<String>> fetchTableMap) {
super(ConfigPhysicalPlanType.FetchTable);
this.fetchTableMap = fetchTableMap;
}

public Map<String, Set<String>> getFetchTableMap() {
return fetchTableMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.response.table;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.consensus.common.DataSet;

import java.util.Map;

public class FetchTableResp implements DataSet {
private final TSStatus status;
private final Map<String, Map<String, TsTable>> fetchTableMap;

public FetchTableResp(
final TSStatus status, final Map<String, Map<String, TsTable>> fetchTableMap) {
this.status = status;
this.fetchTableMap = fetchTableMap;
}

public TFetchTableResp convertToTFetchTableResp() {
return new TFetchTableResp(status)
.setTableInfoMap(TsTableInternalRPCUtil.serializeTableFetchResult(fetchTableMap));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
Expand Down Expand Up @@ -2582,6 +2583,28 @@ public TShowTableResp showTables(final String database) {
: new TShowTableResp(status);
}

@Override
public TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap) {
final TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? clusterSchemaManager.fetchTables(
fetchTableMap.entrySet().stream()
.filter(
entry -> {
entry
.getValue()
.removeIf(
table ->
procedureManager
.checkDuplicateTableTask(
entry.getKey(), null, table, null, null)
.getRight());
return !entry.getValue().isEmpty();
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
: new TFetchTableResp(status);
}

@Override
public DataSet registerAINode(TAINodeRegisterReq req) {
TSStatus status = confirmLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
Expand Down Expand Up @@ -148,6 +149,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* A subset of services provided by {@link ConfigManager}. For use internally only, passed to
Expand Down Expand Up @@ -833,4 +835,6 @@ TDataPartitionTableResp getOrCreateDataPartition(
TSStatus alterTable(final TAlterTableReq req);

TShowTableResp showTables(final String database);

TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap);
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public TSStatus deleteDatabases(
while (executor.isRunning()
&& System.currentTimeMillis() - startCheckTimeForProcedures < PROCEDURE_WAIT_TIME_OUT) {
final Pair<Long, Boolean> procedureIdDuplicatePair =
awaitDuplicateTableTask(
checkDuplicateTableTask(
database, null, null, null, ProcedureType.CREATE_TABLE_PROCEDURE);
hasOverlappedTask = procedureIdDuplicatePair.getRight();

Expand Down Expand Up @@ -1362,7 +1362,7 @@ private TSStatus executeWithoutDuplicate(
long procedureId;
synchronized (this) {
final Pair<Long, Boolean> procedureIdDuplicatePair =
awaitDuplicateTableTask(database, table, tableName, queryId, thisType);
checkDuplicateTableTask(database, table, tableName, queryId, thisType);
procedureId = procedureIdDuplicatePair.getLeft();

if (procedureId == -1) {
Expand All @@ -1375,16 +1375,12 @@ private TSStatus executeWithoutDuplicate(
}
}
final List<TSStatus> procedureStatus = new ArrayList<>();
final boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
if (isSucceed) {
return StatusUtils.OK;
} else {
return procedureStatus.get(0);
}
return waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus)
? StatusUtils.OK
: procedureStatus.get(0);
}

private Pair<Long, Boolean> awaitDuplicateTableTask(
public Pair<Long, Boolean> checkDuplicateTableTask(
final String database,
final TsTable table,
final String tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.FetchTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.ShowTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
Expand All @@ -63,6 +64,7 @@
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
import org.apache.iotdb.confignode.consensus.response.partition.PathInfoResp;
import org.apache.iotdb.confignode.consensus.response.table.FetchTableResp;
import org.apache.iotdb.confignode.consensus.response.table.ShowTableResp;
import org.apache.iotdb.confignode.consensus.response.template.AllTemplateSetInfoResp;
import org.apache.iotdb.confignode.consensus.response.template.TemplateInfoResp;
Expand All @@ -76,6 +78,7 @@
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
Expand Down Expand Up @@ -1073,6 +1076,19 @@ public TShowTableResp showTables(final String database) {
}
}

public TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap) {
try {
return ((FetchTableResp)
configManager.getConsensusManager().read(new FetchTablePlan(fetchTableMap)))
.convertToTFetchTableResp();
} catch (final ConsensusException e) {
LOGGER.warn("Failed in the read API executing the consensus layer due to: ", e);
final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return new TFetchTableResp(res);
}
}

public byte[] getAllTableInfoForDataNodeActivation() {
return TsTableInternalRPCUtil.serializeTableInitializationInfo(
clusterSchemaInfo.getAllUsingTables(), clusterSchemaInfo.getAllPreCreateTables());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.table.FetchTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.ShowTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
Expand Down Expand Up @@ -311,6 +312,8 @@ public DataSet executeQueryPlan(final ConfigPhysicalReadPlan req)
return clusterSchemaInfo.getTemplateSetInfo((GetTemplateSetInfoPlan) req);
case ShowTable:
return clusterSchemaInfo.showTables((ShowTablePlan) req);
case FetchTable:
return clusterSchemaInfo.fetchTables((FetchTablePlan) req);
case GetTriggerTable:
return triggerInfo.getTriggerTable((GetTriggerTablePlan) req);
case GetTriggerLocation:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.FetchTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.ShowTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
Expand Down Expand Up @@ -63,6 +64,7 @@
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
import org.apache.iotdb.confignode.consensus.response.partition.PathInfoResp;
import org.apache.iotdb.confignode.consensus.response.table.FetchTableResp;
import org.apache.iotdb.confignode.consensus.response.table.ShowTableResp;
import org.apache.iotdb.confignode.consensus.response.template.AllTemplateSetInfoResp;
import org.apache.iotdb.confignode.consensus.response.template.TemplateInfoResp;
Expand Down Expand Up @@ -1102,6 +1104,27 @@ public ShowTableResp showTables(final ShowTablePlan plan) {
}
}

public FetchTableResp fetchTables(final FetchTablePlan plan) {
databaseReadWriteLock.readLock().lock();
try {
final Map<String, Map<String, TsTable>> result = new HashMap<>();
for (final Map.Entry<String, Set<String>> database2Tables :
plan.getFetchTableMap().entrySet()) {
result.put(
database2Tables.getKey(),
mTree.getSpecificTablesUnderSpecificDatabase(
getQualifiedDatabasePartialPath(database2Tables.getKey()),
database2Tables.getValue()));
}
return new FetchTableResp(StatusUtils.OK, result);
} catch (final MetadataException e) {
return new FetchTableResp(
RpcUtils.getStatus(e.getErrorCode(), e.getMessage()), Collections.emptyMap());
} finally {
databaseReadWriteLock.readLock().unlock();
}
}

public Map<String, List<TsTable>> getAllUsingTables() {
databaseReadWriteLock.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,23 @@ public List<TsTable> getAllUsingTablesUnderSpecificDatabase(final PartialPath da
.collect(Collectors.toList());
}

public Map<String, TsTable> getSpecificTablesUnderSpecificDatabase(
final PartialPath databasePath, final Set<String> tables) throws MetadataException {
final IConfigMNode databaseNode = getDatabaseNodeByDatabasePath(databasePath).getAsMNode();
final Map<String, TsTable> result = new HashMap<>();
tables.forEach(
table -> {
final IConfigMNode child = databaseNode.getChildren().get(table);
if (child instanceof ConfigTableNode
&& ((ConfigTableNode) child).getStatus().equals(TableNodeStatus.USING)) {
result.put(table, ((ConfigTableNode) child).getTable());
} else {
result.put(table, null);
}
});
return result;
}

public Map<String, List<TsTable>> getAllUsingTables() {
return getAllDatabasePaths().stream()
.collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ private void commitRelease(final ConfigNodeProcedureEnv env) {
database,
table.getTableName(),
failedResults);
// TODO: Handle commit failure
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ private void commitReleaseTable(final ConfigNodeProcedureEnv env) {
database,
table.getTableName(),
failedResults);
// TODO: Handle commit failure
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ private void commitRelease(final ConfigNodeProcedureEnv env) {
database,
table.getTableName(),
failedResults);
// TODO: Handle commit failure
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
Expand Down Expand Up @@ -214,6 +215,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
Expand Down Expand Up @@ -1304,4 +1306,9 @@ public TSStatus alterTable(final TAlterTableReq req) {
public TShowTableResp showTables(final String database) {
return configManager.showTables(database);
}

@Override
public TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap) {
return configManager.fetchTables(fetchTableMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,8 @@ public class IoTDBConfig {
/** Policy of DataNodeSchemaCache eviction */
private String dataNodeSchemaCacheEvictionPolicy = "FIFO";

private int dataNodeTableCacheSemaphorePermitNum = 5;

private String readConsistencyLevel = "strong";

/** Maximum execution time of a DriverTask */
Expand Down Expand Up @@ -3466,6 +3468,14 @@ public void setDataNodeSchemaCacheEvictionPolicy(String dataNodeSchemaCacheEvict
this.dataNodeSchemaCacheEvictionPolicy = dataNodeSchemaCacheEvictionPolicy;
}

public int getDataNodeTableCacheSemaphorePermitNum() {
return dataNodeTableCacheSemaphorePermitNum;
}

public void setDataNodeTableCacheSemaphorePermitNum(int dataNodeTableCacheSemaphorePermitNum) {
this.dataNodeTableCacheSemaphorePermitNum = dataNodeTableCacheSemaphorePermitNum;
}

public String getReadConsistencyLevel() {
return readConsistencyLevel;
}
Expand Down
Loading

0 comments on commit 5b22e7b

Please sign in to comment.