diff --git a/mysql-test/suite/galera/r/galera_defaults.result b/mysql-test/suite/galera/r/galera_defaults.result index 59dd47036c497..9a2407ec52a3d 100644 --- a/mysql-test/suite/galera/r/galera_defaults.result +++ b/mysql-test/suite/galera/r/galera_defaults.result @@ -18,6 +18,7 @@ AND VARIABLE_NAME NOT IN ( ORDER BY VARIABLE_NAME; VARIABLE_NAME VARIABLE_VALUE WSREP_ALLOWLIST +WSREP_APPLIER_RETRY_COUNT 0 WSREP_AUTO_INCREMENT_CONTROL ON WSREP_CERTIFICATION_RULES strict WSREP_CERTIFY_NONPK ON diff --git a/mysql-test/suite/galera/r/galera_retry_applying.result b/mysql-test/suite/galera/r/galera_retry_applying.result new file mode 100644 index 0000000000000..db5b3cf7bcb1e --- /dev/null +++ b/mysql-test/suite/galera/r/galera_retry_applying.result @@ -0,0 +1,73 @@ +connection node_2; +connection node_1; +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY DEFAULT 0, f2 char(12)); +CREATE TABLE t3 (f1 INTEGER PRIMARY KEY DEFAULT 0, f2 char(12)); +START TRANSACTION; +INSERT INTO t3 (f1, f2) VALUES (1, 'a'); +INSERT INTO t3 (f1, f2) VALUES (2, 'b'); +INSERT INTO t3 (f1, f2) VALUES (3, 'c'), (4, 'd'), (5, 'e'); +COMMIT; +connection node_2; +SET GLOBAL wsrep_applier_retry_count = 2; +SET GLOBAL debug_dbug = "d,apply_event_fail_once:o,/dev/null"; +CALL mtr.add_suppression("Event .* Write_rows.* apply failed"); +connection node_1; +START TRANSACTION; +UPDATE t3 SET f2 = 'ax' WHERE f1 = 1; +UPDATE t3 SET f2 = 'bx' WHERE f1 = 2; +INSERT INTO t1 (f1, f2) VALUES (3, 'c'), (4, 'd'), (5, 'e'); +UPDATE t3 SET f2 = 'cx' WHERE f1 = 3; +UPDATE t3 SET f2 = 'dx' WHERE f1 = 4; +DELETE FROM t3 WHERE f1 = 5; +COMMIT; +connection node_2; +connection node_1; +SELECT COUNT(*) AS expect_3 FROM t1; +expect_3 +3 +SELECT COUNT(*) AS expect_4 FROM t3; +expect_4 +4 +connection node_2; +SELECT COUNT(*) AS expect_3 FROM t1; +expect_3 +3 +SELECT COUNT(*) AS expect_4 FROM t3; +expect_4 +4 +connection node_1; +DROP TABLE t1; +DROP TABLE t3; +connection node_2; +Shutting down server ... +SET wsrep_on=OFF; +Restarting server ... +connection node_1; +SET wsrep_sync_wait=0; +CREATE TABLE t2 (f1 INTEGER PRIMARY KEY DEFAULT 0, f2 char(12)); +connection node_2; +CALL mtr.add_suppression("Event .* Update_rows.* apply failed"); +CALL mtr.add_suppression("Inconsistency detected"); +CALL mtr.add_suppression("Failed to apply write set:.*"); +SET GLOBAL wsrep_applier_retry_count = 2; +SET GLOBAL debug_dbug = ''; +SET GLOBAL debug_dbug = "d,apply_event_fail_always:o,/dev/null"; +connection node_1; +START TRANSACTION; +INSERT INTO t2 (f1, f2) VALUES (1, 'a'), (2, 'b'); +COMMIT; +connection node_2; +Shutting down server ... +SET wsrep_on=OFF; +Restarting server ... +connection node_1; +SET wsrep_sync_wait=0; +connection node_2; +SELECT COUNT(*) AS expect_2 FROM t2; +expect_2 +2 +SET GLOBAL debug_dbug = DEFAULT; +connection node_1; +SET GLOBAL wsrep_applier_retry_count = 0; +SET DEBUG_SYNC = 'RESET'; +DROP TABLE t2; diff --git a/mysql-test/suite/galera/t/galera_retry_applying.test b/mysql-test/suite/galera/t/galera_retry_applying.test new file mode 100644 index 0000000000000..8d9210bdb3db8 --- /dev/null +++ b/mysql-test/suite/galera/t/galera_retry_applying.test @@ -0,0 +1,135 @@ +# +# Test retrying applying of a transaction +# + +--source include/galera_cluster.inc +--source include/have_debug_sync.inc + +# +# Case 1: Retrying succeeds after one retry event, no error is raised. +# +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY DEFAULT 0, f2 char(12)); +CREATE TABLE t3 (f1 INTEGER PRIMARY KEY DEFAULT 0, f2 char(12)); + +START TRANSACTION; +INSERT INTO t3 (f1, f2) VALUES (1, 'a'); +INSERT INTO t3 (f1, f2) VALUES (2, 'b'); +INSERT INTO t3 (f1, f2) VALUES (3, 'c'), (4, 'd'), (5, 'e'); +COMMIT; + +# wait till the insert transaction has been replicated and committed in node_2 +--connection node_2 +--let $wait_condition = SELECT COUNT(*) > 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't3'; +--source include/wait_condition.inc +--let $wait_condition = SELECT COUNT(*) > 0 FROM t3; +--source include/wait_condition.inc + +SET GLOBAL wsrep_applier_retry_count = 2; +SET GLOBAL debug_dbug = "d,apply_event_fail_once:o,/dev/null"; +CALL mtr.add_suppression("Event .* Write_rows.* apply failed"); + +--connection node_1 +START TRANSACTION; +UPDATE t3 SET f2 = 'ax' WHERE f1 = 1; +UPDATE t3 SET f2 = 'bx' WHERE f1 = 2; +INSERT INTO t1 (f1, f2) VALUES (3, 'c'), (4, 'd'), (5, 'e'); +UPDATE t3 SET f2 = 'cx' WHERE f1 = 3; +UPDATE t3 SET f2 = 'dx' WHERE f1 = 4; +DELETE FROM t3 WHERE f1 = 5; +COMMIT; + +# wait till the transaction has been replicated and committed in node_2 +--connection node_2 +--let $wait_condition = SELECT COUNT(*) = 4 FROM t3; +--source include/wait_condition.inc + +--connection node_1 +SELECT COUNT(*) AS expect_3 FROM t1; +SELECT COUNT(*) AS expect_4 FROM t3; + +--connection node_2 +SELECT COUNT(*) AS expect_3 FROM t1; +SELECT COUNT(*) AS expect_4 FROM t3; + +# +# Cleanup after Case 1. +# + +--connection node_1 +DROP TABLE t1; +DROP TABLE t3; + +# shutdown node 2 and restart it +--connection node_2 +--let $wait_condition = SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't3'; +--source include/wait_condition.inc +--echo Shutting down server ... +SET wsrep_on=OFF; +--source include/shutdown_mysqld.inc +--remove_file $MYSQLTEST_VARDIR/mysqld.2/data/grastate.dat +--echo Restarting server ... +--source include/start_mysqld.inc + +# wait till node 2 is back in the cluster +--connection node_1 +SET wsrep_sync_wait=0; +--let $wait_condition = SELECT VARIABLE_VALUE = 2 FROM performance_schema.global_status WHERE VARIABLE_NAME = 'wsrep_cluster_size' +--source include/wait_condition.inc + + +# +# Case 2: Slave retries applying of a transaction multiple times. All +# retry attempts fail, and the applying will fail with the expected +# error. +# + +CREATE TABLE t2 (f1 INTEGER PRIMARY KEY DEFAULT 0, f2 char(12)); + +--connection node_2 +CALL mtr.add_suppression("Event .* Update_rows.* apply failed"); +CALL mtr.add_suppression("Inconsistency detected"); +CALL mtr.add_suppression("Failed to apply write set:.*"); + +SET GLOBAL wsrep_applier_retry_count = 2; +SET GLOBAL debug_dbug = ''; +SET GLOBAL debug_dbug = "d,apply_event_fail_always:o,/dev/null"; + +--connection node_1 +START TRANSACTION; +INSERT INTO t2 (f1, f2) VALUES (1, 'a'), (2, 'b'); +COMMIT; + +# node 2 should crash now, wait for the crash +--let $wait_condition = SELECT VARIABLE_VALUE = 1 FROM performance_schema.global_status WHERE VARIABLE_NAME = 'wsrep_cluster_size' +--source include/wait_condition.inc + +# restart node 2 +--connection node_2 +--echo Shutting down server ... +SET wsrep_on=OFF; +--source include/shutdown_mysqld.inc +--source include/wait_until_disconnected.inc +--remove_file $MYSQLTEST_VARDIR/mysqld.2/data/grastate.dat +--echo Restarting server ... +--source include/start_mysqld.inc + +# wait till node 2 is back in the cluster +--connection node_1 +SET wsrep_sync_wait=0; +--let $wait_condition = SELECT VARIABLE_VALUE = 2 FROM performance_schema.global_status WHERE VARIABLE_NAME = 'wsrep_cluster_size' +--source include/wait_condition.inc + +--connection node_2 +--let $wait_condition = SELECT COUNT(*) = 2 FROM t2; +--source include/wait_condition.inc +SELECT COUNT(*) AS expect_2 FROM t2; +SET GLOBAL debug_dbug = DEFAULT; + +# +# Cleanup +# + +--connection node_1 +SET GLOBAL wsrep_applier_retry_count = 0; +SET DEBUG_SYNC = 'RESET'; +DROP TABLE t2; diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 899c468eb878d..8e886c5fec7d7 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -4925,7 +4925,33 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) /* remove trigger's tables */ goto err; } - +#ifdef WITH_WSREP + DBUG_EXECUTE_IF("apply_event_fail_once", { + if (WSREP(thd)) { + RPL_TABLE_LIST *ptr= static_cast(rgi->tables_to_lock); + error= HA_ERR_LOCK_WAIT_TIMEOUT; + slave_rows_error_report( + INFORMATION_LEVEL, error, rgi, thd, ptr->table, + get_type_str(), RPL_LOG_NAME, log_pos); + my_error(error, MYF(0)); + thd->is_slave_error= 1; + DBUG_SET("-d,apply_event_fail_once"); + goto err; + } + };); + DBUG_EXECUTE_IF("apply_event_fail_always", { + if (WSREP(thd)) { + RPL_TABLE_LIST *ptr= static_cast(rgi->tables_to_lock); + error= HA_ERR_LOCK_WAIT_TIMEOUT; + slave_rows_error_report( + INFORMATION_LEVEL, error, rgi, thd, ptr->table, + get_type_str(), RPL_LOG_NAME, log_pos); + my_error(error, MYF(0)); + thd->is_slave_error= 1; + goto err; + } + };); +#endif /* WITH_WSREP */ /* When the open and locking succeeded, we check all tables to ensure that they still have the correct type. diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 633918c833b99..c03997fb19556 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -6493,6 +6493,12 @@ static Sys_var_charptr Sys_wsrep_allowlist( READ_ONLY GLOBAL_VAR(wsrep_allowlist), CMD_LINE(REQUIRED_ARG), DEFAULT("")); +static Sys_var_uint Sys_wsrep_applier_retry_count ( + "wsrep_applier_retry_count", "Maximum number of applier retry attempts", + GLOBAL_VAR(wsrep_applier_retry_count), CMD_LINE(OPT_ARG), + VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG); + #endif /* WITH_WSREP */ static bool fix_host_cache_size(sys_var *, THD *, enum_var_type) diff --git a/sql/transaction.cc b/sql/transaction.cc index 8cdf1d0f7a547..092cb4ae70cbe 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -783,3 +783,14 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name) DBUG_RETURN(MY_TEST(res)); } + +#ifdef WITH_WSREP +/* check if a named savepoint exists for the current transaction */ +bool trans_savepoint_exists(THD *thd, LEX_CSTRING name) +{ + SAVEPOINT **sv = find_savepoint(thd, name); + + return (*sv != NULL); +} +#endif /* WITH_WSREP */ + diff --git a/sql/transaction.h b/sql/transaction.h index fe0129fa8bc36..2da7cbebaf491 100644 --- a/sql/transaction.h +++ b/sql/transaction.h @@ -38,6 +38,9 @@ bool trans_rollback_stmt(THD *thd); bool trans_savepoint(THD *thd, LEX_CSTRING name); bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name); bool trans_release_savepoint(THD *thd, LEX_CSTRING name); +#ifdef WITH_WSREP +bool trans_savepoint_exists(THD *thd, LEX_CSTRING name); +#endif /* WITH_WSREP */ void trans_reset_one_shot_chistics(THD *thd); diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index 04a2031177118..9f0a4770f38bd 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -127,17 +127,19 @@ void wsrep_store_error(const THD* const thd, dst.size(), dst.size() ? dst.data() : "(null)"); } -int wsrep_apply_events(THD* thd, - Relay_log_info* rli, - const void* events_buf, - size_t buf_len) +static int apply_events(THD* thd, + Relay_log_info* rli, + const void* events_buf, + size_t buf_len, + const LEX_STRING &savepoint, + bool set_savepoint) { char *buf= (char *)events_buf; int rcode= 0; int event= 1; Log_event_type typ; - DBUG_ENTER("wsrep_apply_events"); + DBUG_ENTER("apply_events"); if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", (long long) wsrep_thd_trx_seqno(thd)); @@ -147,6 +149,15 @@ int wsrep_apply_events(THD* thd, else thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + bool in_trans = thd->in_active_multi_stmt_transaction(); + if (in_trans && set_savepoint) { + if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() && + trans_savepoint(thd, savepoint)) { + rcode = 1; + goto error; + } + } + while (buf_len) { int exec_res; @@ -234,6 +245,19 @@ int wsrep_apply_events(THD* thd, delete ev; goto error; } + + /* Transaction was started by the event, set the savepoint to rollback to + * in case of failure. */ + if (!in_trans && thd->in_active_multi_stmt_transaction()) { + in_trans = true; + if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() + && set_savepoint && trans_savepoint(thd, savepoint)) { + delete ev; + rcode = 1; + goto error; + } + } + event++; delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev); @@ -247,3 +271,51 @@ int wsrep_apply_events(THD* thd, DBUG_RETURN(rcode); } + +int wsrep_apply_events(THD* thd, + Relay_log_info* rli, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err, + bool const include_msg) +{ + static char savepoint_name[20] = "wsrep_retry"; + static size_t savepoint_name_len = strlen(savepoint_name); + static const LEX_STRING savepoint= { savepoint_name, savepoint_name_len }; + uint n_retries = 0; + bool savepoint_exists = false; + + int ret= apply_events(thd, rli, data.data(), data.size(), savepoint, true); + + while (ret && n_retries < wsrep_applier_retry_count && + (savepoint_exists = trans_savepoint_exists(thd, savepoint))) { + /* applying failed, retry applying events */ + + /* rollback to savepoint without telling Wsrep-lib */ + thd->variables.wsrep_on = false; + if (FALSE != trans_rollback_to_savepoint(thd, savepoint)) { + thd->variables.wsrep_on = true; + break; + } + thd->variables.wsrep_on = true; + + /* reset THD object for retry */ + thd->clear_error(); + thd->reset_for_next_command(); + + /* retry applying events */ + ret= apply_events(thd, rli, data.data(), data.size(), savepoint, false); + n_retries++; + } + + if (savepoint_exists) { + trans_release_savepoint(thd, savepoint); + } + + if (ret && !thd->wsrep_has_ignored_error) + { + wsrep_store_error(thd, err, include_msg); + wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); + } + + return ret; +} diff --git a/sql/wsrep_applier.h b/sql/wsrep_applier.h index e633b1b9bf282..01dd496f7d8c6 100644 --- a/sql/wsrep_applier.h +++ b/sql/wsrep_applier.h @@ -20,10 +20,11 @@ #include "rpl_rli.h" // Relay_log_info #include "log_event.h" // Format_description_log_event -int wsrep_apply_events(THD* thd, - Relay_log_info* rli, - const void* events_buf, - size_t buf_len); +int wsrep_apply_events(THD* thd, + Relay_log_info* rli, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err, + bool const include_msg); /* Applier error codes, when nothing better is available. */ #define WSREP_RET_SUCCESS 0 // Success diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index bc0aa5ac99c31..aa47ad91e18e5 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -121,23 +121,6 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd) thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; } -static int apply_events(THD* thd, - Relay_log_info* rli, - const wsrep::const_buffer& data, - wsrep::mutable_buffer& err, - bool const include_msg) -{ - int const ret= wsrep_apply_events(thd, rli, data.data(), data.size()); - if (ret || wsrep_thd_has_ignored_error(thd)) - { - if (ret) - { - wsrep_store_error(thd, err, include_msg); - } - wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); - } - return ret; -} /**************************************************************************** High priority service @@ -440,7 +423,7 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, #endif thd->set_time(); - int ret= apply_events(thd, m_rli, data, err, false); + int ret= wsrep_apply_events(thd, m_rli, data, err, false); wsrep_thd_set_ignored_error(thd, false); trans_commit(thd); @@ -608,7 +591,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta, #endif /* ENABLED_DEBUG_SYNC */ wsrep_setup_uk_and_fk_checks(thd); - int ret= apply_events(thd, m_rli, data, err, true); + int ret= wsrep_apply_events(thd, m_rli, data, err, true); thd->close_temporary_tables(); if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit)) @@ -777,7 +760,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta, ws_meta, thd->wsrep_sr().fragments()); } - ret= ret || apply_events(thd, m_rli, data, err, true); + ret= ret || wsrep_apply_events(thd, m_rli, data, err, true); thd->close_temporary_tables(); if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit)) { diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index fc4905562c57f..1788b656818d3 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -127,6 +127,7 @@ ulong wsrep_trx_fragment_unit= WSREP_FRAG_BYTES; // unit for fragment size ulong wsrep_SR_store_type= WSREP_SR_STORE_TABLE; uint wsrep_ignore_apply_errors= 0; +uint wsrep_applier_retry_count= 0; std::atomic wsrep_thread_create_failed; diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 183dc65b00707..34fb56c0ebdab 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -92,6 +92,7 @@ extern bool wsrep_gtid_mode; extern uint32 wsrep_gtid_domain_id; extern std::atomic wsrep_thread_create_failed; extern ulonglong wsrep_mode; +extern uint wsrep_applier_retry_count; enum enum_wsrep_reject_types { WSREP_REJECT_NONE, /* nothing rejected */ diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index 13780ede0e545..f8676ea963da3 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -1394,7 +1394,9 @@ static int replay_transaction(THD* thd, { Wsrep_schema_impl::thd_context_switch thd_context_switch(thd, orig_thd); - ret= wsrep_apply_events(orig_thd, rli, buf.ptr(), buf.length()); + wsrep::mutable_buffer unused; + ret= wsrep_apply_events(orig_thd, rli, {buf.c_ptr_quick(), buf.length()}, + unused, false); if (ret) { WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");