From 57d413f5299858c64c53b6f9fc3e6a6b5b038517 Mon Sep 17 00:00:00 2001 From: Pekka Lampio Date: Tue, 14 Jan 2025 13:56:11 +0200 Subject: [PATCH] Ported Teemu's changes from Mysql 8. --- sql/log_event_server.cc | 42 +++++++-------- sql/wsrep_applier.cc | 82 ++++++++++++++++++++++++++++-- sql/wsrep_applier.h | 9 ++-- sql/wsrep_high_priority_service.cc | 69 ++----------------------- sql/wsrep_schema.cc | 4 +- 5 files changed, 105 insertions(+), 101 deletions(-) diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 3c5437f42830c..8e886c5fec7d7 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -4928,35 +4928,27 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) #ifdef WITH_WSREP DBUG_EXECUTE_IF("apply_event_fail_once", { if (WSREP(thd)) { - TABLE_LIST *table_list_ptr= rgi->tables_to_lock; - for (uint i=0 ; table_list_ptr && (i < rgi->tables_to_lock_count); - table_list_ptr= table_list_ptr->next_global, i++) { - RPL_TABLE_LIST *ptr= static_cast(table_list_ptr); - WSREP_INFO("DEBUG: %s: executing row event: %s.%s", __FUNCTION__, - ptr->table->s->db.str, ptr->table->s->table_name.str); - if (0 == strcmp("test", ptr->table->s->db.str)) { - error= HA_ERR_LOCK_WAIT_TIMEOUT; - thd->is_slave_error= 1; - DBUG_SET("-d,apply_event_fail_once"); - goto err; - } - } + RPL_TABLE_LIST *ptr= static_cast(rgi->tables_to_lock); + error= HA_ERR_LOCK_WAIT_TIMEOUT; + slave_rows_error_report( + INFORMATION_LEVEL, error, rgi, thd, ptr->table, + get_type_str(), RPL_LOG_NAME, log_pos); + my_error(error, MYF(0)); + thd->is_slave_error= 1; + DBUG_SET("-d,apply_event_fail_once"); + goto err; } };); DBUG_EXECUTE_IF("apply_event_fail_always", { if (WSREP(thd)) { - TABLE_LIST *table_list_ptr= rgi->tables_to_lock; - for (uint i=0 ; table_list_ptr && (i < rgi->tables_to_lock_count); - table_list_ptr= table_list_ptr->next_global, i++) { - RPL_TABLE_LIST *ptr= static_cast(table_list_ptr); - WSREP_INFO("DEBUG: %s: executing row event: %s.%s", __FUNCTION__, - ptr->table->s->db.str, ptr->table->s->table_name.str); - if (0 == strcmp("test", ptr->table->s->db.str)) { - error= HA_ERR_LOCK_WAIT_TIMEOUT; - thd->is_slave_error= 1; - goto err; - } - } + RPL_TABLE_LIST *ptr= static_cast(rgi->tables_to_lock); + error= HA_ERR_LOCK_WAIT_TIMEOUT; + slave_rows_error_report( + INFORMATION_LEVEL, error, rgi, thd, ptr->table, + get_type_str(), RPL_LOG_NAME, log_pos); + my_error(error, MYF(0)); + thd->is_slave_error= 1; + goto err; } };); #endif /* WITH_WSREP */ diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index 04a2031177118..9f0a4770f38bd 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -127,17 +127,19 @@ void wsrep_store_error(const THD* const thd, dst.size(), dst.size() ? dst.data() : "(null)"); } -int wsrep_apply_events(THD* thd, - Relay_log_info* rli, - const void* events_buf, - size_t buf_len) +static int apply_events(THD* thd, + Relay_log_info* rli, + const void* events_buf, + size_t buf_len, + const LEX_STRING &savepoint, + bool set_savepoint) { char *buf= (char *)events_buf; int rcode= 0; int event= 1; Log_event_type typ; - DBUG_ENTER("wsrep_apply_events"); + DBUG_ENTER("apply_events"); if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", (long long) wsrep_thd_trx_seqno(thd)); @@ -147,6 +149,15 @@ int wsrep_apply_events(THD* thd, else thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + bool in_trans = thd->in_active_multi_stmt_transaction(); + if (in_trans && set_savepoint) { + if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() && + trans_savepoint(thd, savepoint)) { + rcode = 1; + goto error; + } + } + while (buf_len) { int exec_res; @@ -234,6 +245,19 @@ int wsrep_apply_events(THD* thd, delete ev; goto error; } + + /* Transaction was started by the event, set the savepoint to rollback to + * in case of failure. */ + if (!in_trans && thd->in_active_multi_stmt_transaction()) { + in_trans = true; + if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() + && set_savepoint && trans_savepoint(thd, savepoint)) { + delete ev; + rcode = 1; + goto error; + } + } + event++; delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev); @@ -247,3 +271,51 @@ int wsrep_apply_events(THD* thd, DBUG_RETURN(rcode); } + +int wsrep_apply_events(THD* thd, + Relay_log_info* rli, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err, + bool const include_msg) +{ + static char savepoint_name[20] = "wsrep_retry"; + static size_t savepoint_name_len = strlen(savepoint_name); + static const LEX_STRING savepoint= { savepoint_name, savepoint_name_len }; + uint n_retries = 0; + bool savepoint_exists = false; + + int ret= apply_events(thd, rli, data.data(), data.size(), savepoint, true); + + while (ret && n_retries < wsrep_applier_retry_count && + (savepoint_exists = trans_savepoint_exists(thd, savepoint))) { + /* applying failed, retry applying events */ + + /* rollback to savepoint without telling Wsrep-lib */ + thd->variables.wsrep_on = false; + if (FALSE != trans_rollback_to_savepoint(thd, savepoint)) { + thd->variables.wsrep_on = true; + break; + } + thd->variables.wsrep_on = true; + + /* reset THD object for retry */ + thd->clear_error(); + thd->reset_for_next_command(); + + /* retry applying events */ + ret= apply_events(thd, rli, data.data(), data.size(), savepoint, false); + n_retries++; + } + + if (savepoint_exists) { + trans_release_savepoint(thd, savepoint); + } + + if (ret && !thd->wsrep_has_ignored_error) + { + wsrep_store_error(thd, err, include_msg); + wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); + } + + return ret; +} diff --git a/sql/wsrep_applier.h b/sql/wsrep_applier.h index e633b1b9bf282..01dd496f7d8c6 100644 --- a/sql/wsrep_applier.h +++ b/sql/wsrep_applier.h @@ -20,10 +20,11 @@ #include "rpl_rli.h" // Relay_log_info #include "log_event.h" // Format_description_log_event -int wsrep_apply_events(THD* thd, - Relay_log_info* rli, - const void* events_buf, - size_t buf_len); +int wsrep_apply_events(THD* thd, + Relay_log_info* rli, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err, + bool const include_msg); /* Applier error codes, when nothing better is available. */ #define WSREP_RET_SUCCESS 0 // Success diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index aeade8bb7eed7..aa47ad91e18e5 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -121,69 +121,6 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd) thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; } -static int apply_events(THD* thd, - Relay_log_info* rli, - const wsrep::const_buffer& data, - wsrep::mutable_buffer& err, - bool const include_msg) -{ - static const LEX_CSTRING savepoint= { STRING_WITH_LEN("wsrep_retry") }; - uint n_retries = 0; - bool savepoint_exists = false; - uint applier_retry_count = wsrep_applier_retry_count; - - if (applier_retry_count > 0 && !thd->wsrep_trx().is_streaming()) { - /* create a savepoint in case we need to retry applying */ - savepoint_exists = (FALSE == trans_savepoint(thd, savepoint)); - } - - int ret= wsrep_apply_events(thd, rli, data.data(), data.size()); - - if (savepoint_exists) { - /* check that the savepoint still exists after apply */ - savepoint_exists = trans_savepoint_exists(thd, savepoint); - } - - while (ret && savepoint_exists && n_retries < applier_retry_count) { - /* applying failed, retry applying events */ - - /* rollback to savepoint without telling Wsrep-lib */ - thd->variables.wsrep_on = false; - if (FALSE != trans_rollback_to_savepoint(thd, savepoint)) { - thd->variables.wsrep_on = true; - break; - } - thd->variables.wsrep_on = true; - - /* reset THD object for retry */ - thd->clear_error(); - thd->reset_for_next_command(true); - - /* retry applying events */ - ret= wsrep_apply_events(thd, rli, data.data(), data.size()); - n_retries++; - /* check that the savepoint still exists after apply */ - savepoint_exists = trans_savepoint_exists(thd, savepoint); - } - - if (savepoint_exists) { - /* check that the savepoint still exists after apply */ - savepoint_exists = trans_savepoint_exists(thd, savepoint); - } - if (savepoint_exists) { - trans_release_savepoint(thd, savepoint); - } - - if (ret || wsrep_thd_has_ignored_error(thd)) - { - if (ret) - { - wsrep_store_error(thd, err, include_msg); - } - wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); - } - return ret; -} /**************************************************************************** High priority service @@ -486,7 +423,7 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, #endif thd->set_time(); - int ret= apply_events(thd, m_rli, data, err, false); + int ret= wsrep_apply_events(thd, m_rli, data, err, false); wsrep_thd_set_ignored_error(thd, false); trans_commit(thd); @@ -654,7 +591,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta, #endif /* ENABLED_DEBUG_SYNC */ wsrep_setup_uk_and_fk_checks(thd); - int ret= apply_events(thd, m_rli, data, err, true); + int ret= wsrep_apply_events(thd, m_rli, data, err, true); thd->close_temporary_tables(); if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit)) @@ -823,7 +760,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta, ws_meta, thd->wsrep_sr().fragments()); } - ret= ret || apply_events(thd, m_rli, data, err, true); + ret= ret || wsrep_apply_events(thd, m_rli, data, err, true); thd->close_temporary_tables(); if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit)) { diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index 13780ede0e545..f8676ea963da3 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -1394,7 +1394,9 @@ static int replay_transaction(THD* thd, { Wsrep_schema_impl::thd_context_switch thd_context_switch(thd, orig_thd); - ret= wsrep_apply_events(orig_thd, rli, buf.ptr(), buf.length()); + wsrep::mutable_buffer unused; + ret= wsrep_apply_events(orig_thd, rli, {buf.c_ptr_quick(), buf.length()}, + unused, false); if (ret) { WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");