Skip to content

Commit

Permalink
Ported Teemu's changes from Mysql 8.
Browse files Browse the repository at this point in the history
  • Loading branch information
plampio committed Jan 14, 2025
1 parent a5d90f8 commit 57d413f
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 101 deletions.
42 changes: 17 additions & 25 deletions sql/log_event_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4928,35 +4928,27 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
#ifdef WITH_WSREP
DBUG_EXECUTE_IF("apply_event_fail_once", {
if (WSREP(thd)) {
TABLE_LIST *table_list_ptr= rgi->tables_to_lock;
for (uint i=0 ; table_list_ptr && (i < rgi->tables_to_lock_count);
table_list_ptr= table_list_ptr->next_global, i++) {
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(table_list_ptr);
WSREP_INFO("DEBUG: %s: executing row event: %s.%s", __FUNCTION__,
ptr->table->s->db.str, ptr->table->s->table_name.str);
if (0 == strcmp("test", ptr->table->s->db.str)) {
error= HA_ERR_LOCK_WAIT_TIMEOUT;
thd->is_slave_error= 1;
DBUG_SET("-d,apply_event_fail_once");
goto err;
}
}
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock);
error= HA_ERR_LOCK_WAIT_TIMEOUT;
slave_rows_error_report(
INFORMATION_LEVEL, error, rgi, thd, ptr->table,
get_type_str(), RPL_LOG_NAME, log_pos);
my_error(error, MYF(0));
thd->is_slave_error= 1;
DBUG_SET("-d,apply_event_fail_once");
goto err;
}
};);
DBUG_EXECUTE_IF("apply_event_fail_always", {
if (WSREP(thd)) {
TABLE_LIST *table_list_ptr= rgi->tables_to_lock;
for (uint i=0 ; table_list_ptr && (i < rgi->tables_to_lock_count);
table_list_ptr= table_list_ptr->next_global, i++) {
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(table_list_ptr);
WSREP_INFO("DEBUG: %s: executing row event: %s.%s", __FUNCTION__,
ptr->table->s->db.str, ptr->table->s->table_name.str);
if (0 == strcmp("test", ptr->table->s->db.str)) {
error= HA_ERR_LOCK_WAIT_TIMEOUT;
thd->is_slave_error= 1;
goto err;
}
}
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock);
error= HA_ERR_LOCK_WAIT_TIMEOUT;
slave_rows_error_report(
INFORMATION_LEVEL, error, rgi, thd, ptr->table,
get_type_str(), RPL_LOG_NAME, log_pos);
my_error(error, MYF(0));
thd->is_slave_error= 1;
goto err;
}
};);
#endif /* WITH_WSREP */
Expand Down
82 changes: 77 additions & 5 deletions sql/wsrep_applier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,19 @@ void wsrep_store_error(const THD* const thd,
dst.size(), dst.size() ? dst.data() : "(null)");
}

int wsrep_apply_events(THD* thd,
Relay_log_info* rli,
const void* events_buf,
size_t buf_len)
static int apply_events(THD* thd,
Relay_log_info* rli,
const void* events_buf,
size_t buf_len,
const LEX_STRING &savepoint,
bool set_savepoint)
{
char *buf= (char *)events_buf;
int rcode= 0;
int event= 1;
Log_event_type typ;

DBUG_ENTER("wsrep_apply_events");
DBUG_ENTER("apply_events");
if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
(long long) wsrep_thd_trx_seqno(thd));

Expand All @@ -147,6 +149,15 @@ int wsrep_apply_events(THD* thd,
else
thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id;

bool in_trans = thd->in_active_multi_stmt_transaction();
if (in_trans && set_savepoint) {
if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() &&
trans_savepoint(thd, savepoint)) {
rcode = 1;
goto error;
}
}

while (buf_len)
{
int exec_res;
Expand Down Expand Up @@ -234,6 +245,19 @@ int wsrep_apply_events(THD* thd,
delete ev;
goto error;
}

/* Transaction was started by the event, set the savepoint to rollback to
* in case of failure. */
if (!in_trans && thd->in_active_multi_stmt_transaction()) {
in_trans = true;
if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming()
&& set_savepoint && trans_savepoint(thd, savepoint)) {
delete ev;
rcode = 1;
goto error;
}
}

event++;

delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev);
Expand All @@ -247,3 +271,51 @@ int wsrep_apply_events(THD* thd,

DBUG_RETURN(rcode);
}

int wsrep_apply_events(THD* thd,
Relay_log_info* rli,
const wsrep::const_buffer& data,
wsrep::mutable_buffer& err,
bool const include_msg)
{
static char savepoint_name[20] = "wsrep_retry";
static size_t savepoint_name_len = strlen(savepoint_name);
static const LEX_STRING savepoint= { savepoint_name, savepoint_name_len };
uint n_retries = 0;
bool savepoint_exists = false;

int ret= apply_events(thd, rli, data.data(), data.size(), savepoint, true);

while (ret && n_retries < wsrep_applier_retry_count &&
(savepoint_exists = trans_savepoint_exists(thd, savepoint))) {
/* applying failed, retry applying events */

/* rollback to savepoint without telling Wsrep-lib */
thd->variables.wsrep_on = false;
if (FALSE != trans_rollback_to_savepoint(thd, savepoint)) {
thd->variables.wsrep_on = true;
break;
}
thd->variables.wsrep_on = true;

/* reset THD object for retry */
thd->clear_error();
thd->reset_for_next_command();

/* retry applying events */
ret= apply_events(thd, rli, data.data(), data.size(), savepoint, false);
n_retries++;
}

if (savepoint_exists) {
trans_release_savepoint(thd, savepoint);
}

if (ret && !thd->wsrep_has_ignored_error)
{
wsrep_store_error(thd, err, include_msg);
wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
}

return ret;
}
9 changes: 5 additions & 4 deletions sql/wsrep_applier.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
#include "rpl_rli.h" // Relay_log_info
#include "log_event.h" // Format_description_log_event

int wsrep_apply_events(THD* thd,
Relay_log_info* rli,
const void* events_buf,
size_t buf_len);
int wsrep_apply_events(THD* thd,
Relay_log_info* rli,
const wsrep::const_buffer& data,
wsrep::mutable_buffer& err,
bool const include_msg);

/* Applier error codes, when nothing better is available. */
#define WSREP_RET_SUCCESS 0 // Success
Expand Down
69 changes: 3 additions & 66 deletions sql/wsrep_high_priority_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,69 +121,6 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd)
thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
}

static int apply_events(THD* thd,
Relay_log_info* rli,
const wsrep::const_buffer& data,
wsrep::mutable_buffer& err,
bool const include_msg)
{
static const LEX_CSTRING savepoint= { STRING_WITH_LEN("wsrep_retry") };
uint n_retries = 0;
bool savepoint_exists = false;
uint applier_retry_count = wsrep_applier_retry_count;

if (applier_retry_count > 0 && !thd->wsrep_trx().is_streaming()) {
/* create a savepoint in case we need to retry applying */
savepoint_exists = (FALSE == trans_savepoint(thd, savepoint));
}

int ret= wsrep_apply_events(thd, rli, data.data(), data.size());

if (savepoint_exists) {
/* check that the savepoint still exists after apply */
savepoint_exists = trans_savepoint_exists(thd, savepoint);
}

while (ret && savepoint_exists && n_retries < applier_retry_count) {
/* applying failed, retry applying events */

/* rollback to savepoint without telling Wsrep-lib */
thd->variables.wsrep_on = false;
if (FALSE != trans_rollback_to_savepoint(thd, savepoint)) {
thd->variables.wsrep_on = true;
break;
}
thd->variables.wsrep_on = true;

/* reset THD object for retry */
thd->clear_error();
thd->reset_for_next_command(true);

/* retry applying events */
ret= wsrep_apply_events(thd, rli, data.data(), data.size());
n_retries++;
/* check that the savepoint still exists after apply */
savepoint_exists = trans_savepoint_exists(thd, savepoint);
}

if (savepoint_exists) {
/* check that the savepoint still exists after apply */
savepoint_exists = trans_savepoint_exists(thd, savepoint);
}
if (savepoint_exists) {
trans_release_savepoint(thd, savepoint);
}

if (ret || wsrep_thd_has_ignored_error(thd))
{
if (ret)
{
wsrep_store_error(thd, err, include_msg);
}
wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
}
return ret;
}

/****************************************************************************
High priority service
Expand Down Expand Up @@ -486,7 +423,7 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
#endif

thd->set_time();
int ret= apply_events(thd, m_rli, data, err, false);
int ret= wsrep_apply_events(thd, m_rli, data, err, false);
wsrep_thd_set_ignored_error(thd, false);
trans_commit(thd);

Expand Down Expand Up @@ -654,7 +591,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta,
#endif /* ENABLED_DEBUG_SYNC */

wsrep_setup_uk_and_fk_checks(thd);
int ret= apply_events(thd, m_rli, data, err, true);
int ret= wsrep_apply_events(thd, m_rli, data, err, true);

thd->close_temporary_tables();
if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit))
Expand Down Expand Up @@ -823,7 +760,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta,
ws_meta,
thd->wsrep_sr().fragments());
}
ret= ret || apply_events(thd, m_rli, data, err, true);
ret= ret || wsrep_apply_events(thd, m_rli, data, err, true);
thd->close_temporary_tables();
if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit))
{
Expand Down
4 changes: 3 additions & 1 deletion sql/wsrep_schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,9 @@ static int replay_transaction(THD* thd,
{
Wsrep_schema_impl::thd_context_switch thd_context_switch(thd, orig_thd);

ret= wsrep_apply_events(orig_thd, rli, buf.ptr(), buf.length());
wsrep::mutable_buffer unused;
ret= wsrep_apply_events(orig_thd, rli, {buf.c_ptr_quick(), buf.length()},
unused, false);
if (ret)
{
WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
Expand Down

0 comments on commit 57d413f

Please sign in to comment.