From 51254770f77bf13cfd7ea165716e26de6e103a73 Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Fri, 10 Jan 2025 11:25:12 +0800 Subject: [PATCH 1/3] [fix](cloud-mow) schema change should retry when encouter TXN_CONFILCT in cloud mode --- be/src/cloud/cloud_meta_mgr.cpp | 4 + be/src/cloud/cloud_schema_change_job.cpp | 8 + be/src/common/status.h | 1 + .../java/org/apache/doris/common/Config.java | 8 + .../apache/doris/alter/SchemaChangeJobV2.java | 10 +- gensrc/thrift/Status.thrift | 2 + ...schema_change_with_mow_txn_conflict.groovy | 187 ++++++++++++++++++ 7 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 2b5f04550cfe2a..7ea501b602cb1d 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -385,6 +385,10 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res, } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { return Status::Error("failed to {}: {}", op_name, res->status().msg()); + } else if (res->status().code() == + MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { + return Status::Error("failed to {}: {}", op_name, + res->status().msg()); } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { return Status::Error("failed to {}: {}", op_name, res->status().msg()); diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 14781da0f37f97..75882aefc58081 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -392,6 +392,14 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam } cloud::FinishTabletJobResponse finish_resp; + DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict", { + std::srand(static_cast(std::time(nullptr))); + int random_value = std::rand() % 100; + if (random_value < 50) { + LOG(INFO) << "test txn conflict"; + return Status::Error("test txn conflict"); + } + }); auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp); if (!st.ok()) { if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) { diff --git a/be/src/common/status.h b/be/src/common/status.h index 0252ec8564feeb..2881878774b85f 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -78,6 +78,7 @@ namespace ErrorCode { TStatusError(NOT_MASTER, true); \ TStatusError(OBTAIN_LOCK_FAILED, false); \ TStatusError(SNAPSHOT_EXPIRED, false); \ + TStatusError(TXN_CONFLICT, false); \ TStatusError(DELETE_BITMAP_LOCK_ERROR, false); // E error_name, error_code, print_stacktrace #define APPLY_FOR_OLAP_ERROR_CODES(E) \ diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index a8b0bd15e84ce0..4a6833455560b3 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3306,6 +3306,14 @@ public static int metaServiceRpcRetryTimes() { "Maximal concurrent num of get tablet stat job."}) public static int max_get_tablet_stat_task_threads_num = 4; + @ConfField(mutable = true, description = {"存算分离模式下schema change失败是否重试", + "Whether to enable retry when schema change failed in cloud model, default is true."}) + public static boolean enable_schema_change_retry_in_cloud_mode = true; + + @ConfField(mutable = true, description = {"存算分离模式下schema change重试次数", + "Max retry times when schema change failed in cloud model, default is 3."}) + public static int schema_change_max_retry_time = 3; + // ATTN: DONOT add any config not related to cloud mode here // ATTN: DONOT add any config not related to cloud mode here // ATTN: DONOT add any config not related to cloud mode here diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index bea703a29d75fa..50dfa0103fcf57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -56,6 +56,7 @@ import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -577,7 +578,14 @@ protected void runRunningJob() throws AlterCancelException { List tasks = schemaChangeBatchTask.getUnfinishedTasks(2000); ensureCloudClusterExist(tasks); for (AgentTask task : tasks) { - if (task.getFailedTimes() > 0) { + int maxFailedTimes = 0; + if (Config.isCloudMode() && Config.enable_schema_change_retry_in_cloud_mode) { + if (task.getErrorCode() != null && task.getErrorCode().equals(TStatusCode.TXN_CONFLICT)) { + maxFailedTimes = Config.schema_change_max_retry_time; + } + LOG.warn("schema change task failed: {}, maxFailedTimes {}", task.getErrorMsg(), maxFailedTimes); + } + if (task.getFailedTimes() > maxFailedTimes) { task.setFinished(true); AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature()); LOG.warn("schema change task failed: {}", task.getErrorMsg()); diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift index 7bdafc59ddb3aa..7265ebb2032236 100644 --- a/gensrc/thrift/Status.thrift +++ b/gensrc/thrift/Status.thrift @@ -108,6 +108,8 @@ enum TStatusCode { SNAPSHOT_EXPIRED = 75, + TXN_CONFLICT = 76, + // used for cloud DELETE_BITMAP_LOCK_ERROR = 100, // Not be larger than 200, see status.h diff --git a/regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy b/regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy new file mode 100644 index 00000000000000..46ccd812c2728c --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_schema_change_with_mow_txn_conflict", "p0") { + def customFeConfig = [ + schema_change_max_retry_time: 10 + ] + setFeConfigTemporary(customFeConfig) { + try { + def tableName3 = "test_all_unique_mow" + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict") + + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def getCreateViewState = { tableName -> + def createViewStateResult = sql """ SHOW ALTER TABLE MATERIALIZED VIEW WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return createViewStateResult[0][8] + } + + def execStreamLoad = { + streamLoad { + table "${tableName3}" + + set 'column_separator', ',' + + file 'all_types.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(2500, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + } + + sql """ DROP TABLE IF EXISTS ${tableName3} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName3} ( + `k1` int(11) NULL, + `k2` tinyint(4) NULL, + `k3` smallint(6) NULL, + `k4` int(30) NULL, + `k5` largeint(40) NULL, + `k6` float NULL, + `k7` double NULL, + `k8` decimal(9, 0) NULL, + `k9` char(10) NULL, + `k10` varchar(1024) NULL, + `k11` text NULL, + `k12` date NULL, + `k13` datetime NULL + ) ENGINE=OLAP + unique KEY(k1, k2, k3) + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + execStreamLoad() + + sql """ alter table ${tableName3} modify column k4 string NULL""" + + Awaitility.await().atMost(600, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until( + { + String res = getJobState(tableName3) + if (res == "FINISHED" || res == "CANCELLED") { + assertEquals("FINISHED", res) + return true + } + execStreamLoad() + return false + } + ) + + sql """ alter table ${tableName3} add column v14 int NOT NULL default "1" after k13 """ + Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until( + { + String res = getJobState(tableName3) + if (res == "FINISHED" || res == "CANCELLED") { + assertEquals("FINISHED", res) + return true + } + execStreamLoad() + return false + } + ) + sql """ insert into ${tableName3} values (10001, 2, 3, 4, 5, 6.6, 1.7, 8.8, + 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 10086) """ +// +// sql """ alter table ${tableName3} modify column k5 string NULL""" +// Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until( +// { +// String res = getJobState(tableName3) +// if (res == "FINISHED" || res == "CANCELLED") { +// assertEquals("FINISHED", res) +// return true +// } +// execStreamLoad() +// return false +// } +// ) +// +// sql """ alter table ${tableName3} add column v14 int NOT NULL default "1" after k13 """ +// sql """ insert into ${tableName3} values (10001, 2, 3, 4, 5, 6.6, 1.7, 8.8, +// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 10086) """ +// +// sql """ alter table ${tableName3} modify column v14 int NULL default "1" """ +// +// int cnt = 6000 +// Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until( +// { +// String res = getJobState(tableName3) +// if (res == "FINISHED" || res == "CANCELLED") { +// assertEquals("FINISHED", res) +// return true +// } +// cnt--; +// int val = 100000 + cnt +// sql """ insert into ${tableName3} values (${val}, 2, 3, 4, 5, 6.6, 1.7, 8.8, +// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 9527) """ +// return false +// } +// ) +// +// sql """ alter table ${tableName3} drop column v14 """ +// execStreamLoad() +// +// sql """ alter table ${tableName3} add column v14 int NOT NULL default "1" after k13 """ +// +// sql """ insert into ${tableName3} values (10002, 2, 3, 4, 5, 6.6, 1.7, 8.8, +// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 10086) """ +// +// sql """ alter table ${tableName3} drop column v14 """ +// +// sql """ alter table ${tableName3} add column v14 bitmap after k13 """ +// +// sql """ insert into ${tableName3} values (10002, 2, 3, 4, 5, 6.6, 1.7, 8.8, +// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', to_bitmap(243)) """ +// +// sql """ alter table ${tableName3} drop column v14 """ +// + List> result = sql """ select * from ${tableName3} """ + for (row : result) { + logger.info("r1=" +row[1] ) + logger.info("r2=" +row[2] ) + logger.info("r3=" +row[3] ) + logger.info("r4=" +row[4] ) + assertEquals(2, row[1]); + assertEquals(3, row[2]); + assertEquals("4", row[3]); + assertEquals("5", row[4]); + } + } finally { + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict") + } + } + + +} \ No newline at end of file From d9535df4426ef1921c655445ec71e79aeb6a1efa Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Fri, 10 Jan 2025 11:33:34 +0800 Subject: [PATCH 2/3] edit --- ...schema_change_with_mow_txn_conflict.groovy | 57 ------------------- 1 file changed, 57 deletions(-) diff --git a/regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy b/regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy index 46ccd812c2728c..448ae3bc0fbea1 100644 --- a/regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy +++ b/regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy @@ -114,65 +114,8 @@ suite("test_schema_change_with_mow_txn_conflict", "p0") { ) sql """ insert into ${tableName3} values (10001, 2, 3, 4, 5, 6.6, 1.7, 8.8, 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 10086) """ -// -// sql """ alter table ${tableName3} modify column k5 string NULL""" -// Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until( -// { -// String res = getJobState(tableName3) -// if (res == "FINISHED" || res == "CANCELLED") { -// assertEquals("FINISHED", res) -// return true -// } -// execStreamLoad() -// return false -// } -// ) -// -// sql """ alter table ${tableName3} add column v14 int NOT NULL default "1" after k13 """ -// sql """ insert into ${tableName3} values (10001, 2, 3, 4, 5, 6.6, 1.7, 8.8, -// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 10086) """ -// -// sql """ alter table ${tableName3} modify column v14 int NULL default "1" """ -// -// int cnt = 6000 -// Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until( -// { -// String res = getJobState(tableName3) -// if (res == "FINISHED" || res == "CANCELLED") { -// assertEquals("FINISHED", res) -// return true -// } -// cnt--; -// int val = 100000 + cnt -// sql """ insert into ${tableName3} values (${val}, 2, 3, 4, 5, 6.6, 1.7, 8.8, -// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 9527) """ -// return false -// } -// ) -// -// sql """ alter table ${tableName3} drop column v14 """ -// execStreamLoad() -// -// sql """ alter table ${tableName3} add column v14 int NOT NULL default "1" after k13 """ -// -// sql """ insert into ${tableName3} values (10002, 2, 3, 4, 5, 6.6, 1.7, 8.8, -// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 10086) """ -// -// sql """ alter table ${tableName3} drop column v14 """ -// -// sql """ alter table ${tableName3} add column v14 bitmap after k13 """ -// -// sql """ insert into ${tableName3} values (10002, 2, 3, 4, 5, 6.6, 1.7, 8.8, -// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', to_bitmap(243)) """ -// -// sql """ alter table ${tableName3} drop column v14 """ -// List> result = sql """ select * from ${tableName3} """ for (row : result) { - logger.info("r1=" +row[1] ) - logger.info("r2=" +row[2] ) - logger.info("r3=" +row[3] ) - logger.info("r4=" +row[4] ) assertEquals(2, row[1]); assertEquals(3, row[2]); assertEquals("4", row[3]); From 702e8c69bfb83d797b93e5b59cb66b7ff59363a1 Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Fri, 10 Jan 2025 19:45:15 +0800 Subject: [PATCH 3/3] edit --- be/src/cloud/cloud_meta_mgr.cpp | 8 ++++++-- be/src/cloud/cloud_schema_change_job.cpp | 3 +-- be/src/common/status.h | 1 - .../java/org/apache/doris/alter/SchemaChangeJobV2.java | 6 ++++-- gensrc/thrift/Status.thrift | 2 -- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 7ea501b602cb1d..bf0e777705e7b3 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -387,8 +387,8 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res, res->status().msg()); } else if (res->status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { - return Status::Error("failed to {}: {}", op_name, - res->status().msg()); + return Status::Error( + "failed to {}: {}", op_name, res->status().msg()); } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { return Status::Error("failed to {}: {}", op_name, res->status().msg()); @@ -397,6 +397,10 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res, } if (++retry_times > config::meta_service_rpc_retry_times) { + if (res->status().code() == MetaServiceCode::LOCK_CONFLICT) { + return Status::Error( + "failed to {}: {}", op_name, res->status().msg()); + } break; } diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 75882aefc58081..81b07ff7e4c843 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -396,8 +396,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam std::srand(static_cast(std::time(nullptr))); int random_value = std::rand() % 100; if (random_value < 50) { - LOG(INFO) << "test txn conflict"; - return Status::Error("test txn conflict"); + return Status::Error("test txn conflict"); } }); auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp); diff --git a/be/src/common/status.h b/be/src/common/status.h index 2881878774b85f..0252ec8564feeb 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -78,7 +78,6 @@ namespace ErrorCode { TStatusError(NOT_MASTER, true); \ TStatusError(OBTAIN_LOCK_FAILED, false); \ TStatusError(SNAPSHOT_EXPIRED, false); \ - TStatusError(TXN_CONFLICT, false); \ TStatusError(DELETE_BITMAP_LOCK_ERROR, false); // E error_name, error_code, print_stacktrace #define APPLY_FOR_OLAP_ERROR_CODES(E) \ diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 50dfa0103fcf57..e919070b3f2391 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -580,10 +580,12 @@ protected void runRunningJob() throws AlterCancelException { for (AgentTask task : tasks) { int maxFailedTimes = 0; if (Config.isCloudMode() && Config.enable_schema_change_retry_in_cloud_mode) { - if (task.getErrorCode() != null && task.getErrorCode().equals(TStatusCode.TXN_CONFLICT)) { + if (task.getErrorCode() != null && task.getErrorCode() + .equals(TStatusCode.DELETE_BITMAP_LOCK_ERROR)) { maxFailedTimes = Config.schema_change_max_retry_time; } - LOG.warn("schema change task failed: {}, maxFailedTimes {}", task.getErrorMsg(), maxFailedTimes); + LOG.warn("schema change task failed: {}, set maxFailedTimes {}", task.getErrorMsg(), + maxFailedTimes); } if (task.getFailedTimes() > maxFailedTimes) { task.setFinished(true); diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift index 7265ebb2032236..7bdafc59ddb3aa 100644 --- a/gensrc/thrift/Status.thrift +++ b/gensrc/thrift/Status.thrift @@ -108,8 +108,6 @@ enum TStatusCode { SNAPSHOT_EXPIRED = 75, - TXN_CONFLICT = 76, - // used for cloud DELETE_BITMAP_LOCK_ERROR = 100, // Not be larger than 200, see status.h