diff --git a/mysql-test/suite/galera/r/galera_multirow_rollback.result b/mysql-test/suite/galera/r/galera_multirow_rollback.result index 13502d0d3dcab..90fac64913d38 100644 --- a/mysql-test/suite/galera/r/galera_multirow_rollback.result +++ b/mysql-test/suite/galera/r/galera_multirow_rollback.result @@ -71,3 +71,54 @@ id fk1 3 1 DROP TABLE c; DROP TABLE p; +connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1; +SET GLOBAL DEBUG_DBUG = "d,sync.wsrep_apply_cb"; +connection node_1; +CREATE TABLE t1 (f1 int primary key, f2 int); +INSERT INTO t1 VALUES (1,0); +BEGIN; +INSERT INTO t1 VALUES (2,4),(1,1); +ERROR 23000: Duplicate entry '1' for key 'PRIMARY' +connection node_2; +UPDATE t1 SET f2=8 WHERE f1=1; +connection node_1a; +SET DEBUG_SYNC = "now WAIT_FOR sync.wsrep_apply_cb_reached"; +connection node_1; +COMMIT; +connection node_1a; +SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb"; +connection node_1; +SELECT * FROM t1; +f1 f2 +1 8 +DROP TABLE t1; +CREATE TABLE t1(f1 int primary key, f2 int); +INSERT INTO t1 VALUES (1,0); +connection node_1a; +SET GLOBAL DEBUG_DBUG = "d,sync.wsrep_apply_cb"; +connection node_1; +BEGIN; +INSERT INTO t1 VALUES (3,5),(1,1); +ERROR 23000: Duplicate entry '1' for key 'PRIMARY' +connection node_2; +UPDATE t1 SET f2=9 WHERE f1=1; +connection node_1a; +SET DEBUG_SYNC = "now WAIT_FOR sync.wsrep_apply_cb_reached"; +SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb"; +connection node_1; +COMMIT; +ERROR 40001: Deadlock found when trying to get lock; try restarting transaction +SELECT * FROM t1; +f1 f2 +1 9 +DROP TABLE t1; +CREATE TABLE t1 (i int primary key) engine=innodb; +BEGIN; +INSERT INTO t1 VALUES (1); +INSERT INTO t1 VALUES (2); +INSERT INTO t1 VALUES (1); +ERROR 23000: Duplicate entry '1' for key 'PRIMARY' +COMMIT; +DROP TABLE t1; +SET DEBUG_SYNC='reset'; +SET GLOBAL debug_dbug = DEFAULT; diff --git a/mysql-test/suite/galera/t/galera_multirow_rollback.test b/mysql-test/suite/galera/t/galera_multirow_rollback.test index a5aaedd8bd488..e764ec0e01b00 100644 --- a/mysql-test/suite/galera/t/galera_multirow_rollback.test +++ b/mysql-test/suite/galera/t/galera_multirow_rollback.test @@ -87,3 +87,85 @@ SELECT * FROM c; DROP TABLE c; DROP TABLE p; + + +# +# Case 5: testing statement rollback +# +#SET SESSION debug_dbug="+d,wsrep_disable_fix"; + +--connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1 +SET GLOBAL DEBUG_DBUG = "d,sync.wsrep_apply_cb"; + +--connection node_1 +CREATE TABLE t1 (f1 int primary key, f2 int); +INSERT INTO t1 VALUES (1,0); + +BEGIN; +--error ER_DUP_ENTRY +INSERT INTO t1 VALUES (2,4),(1,1); + +--connection node_2 +UPDATE t1 SET f2=8 WHERE f1=1; + +--connection node_1a +SET DEBUG_SYNC = "now WAIT_FOR sync.wsrep_apply_cb_reached"; + +--connection node_1 +COMMIT; + +--connection node_1a +SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb"; + +--connection node_1 +SELECT * FROM t1; +DROP TABLE t1; + + +# +# Case 6: testing statement rollback with BF abort +# +CREATE TABLE t1(f1 int primary key, f2 int); +INSERT INTO t1 VALUES (1,0); + +--connection node_1a +SET GLOBAL DEBUG_DBUG = "d,sync.wsrep_apply_cb"; + +--connection node_1 +BEGIN; +--error ER_DUP_ENTRY +INSERT INTO t1 VALUES (3,5),(1,1); + +--connection node_2 +UPDATE t1 SET f2=9 WHERE f1=1; + +--connection node_1a +SET DEBUG_SYNC = "now WAIT_FOR sync.wsrep_apply_cb_reached"; +SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb"; +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE = 'wsrep applier committed' +--source include/wait_condition.inc + +--connection node_1 +--error ER_LOCK_DEADLOCK +COMMIT; + +SELECT * FROM t1; +DROP TABLE t1; + +# +# Case 7: +# +CREATE TABLE t1 (i int primary key) engine=innodb; +BEGIN; +INSERT INTO t1 VALUES (1); +INSERT INTO t1 VALUES (2); +--error ER_DUP_ENTRY +INSERT INTO t1 VALUES (1); +COMMIT; + + +# Cleanup +DROP TABLE t1; +SET DEBUG_SYNC='reset'; +SET GLOBAL debug_dbug = DEFAULT; + diff --git a/mysql-test/suite/galera_sr/r/galera_sr_multirow_rollback.result b/mysql-test/suite/galera_sr/r/galera_sr_multirow_rollback.result index 1795d16bbfa46..332e4dd243b2e 100644 --- a/mysql-test/suite/galera_sr/r/galera_sr_multirow_rollback.result +++ b/mysql-test/suite/galera_sr/r/galera_sr_multirow_rollback.result @@ -22,6 +22,7 @@ START TRANSACTION; INSERT INTO t1 (f2) VALUES ('a'), ('b'); ERROR 23000: Duplicate entry '0' for key 'PRIMARY' COMMIT; +ERROR HY000: Maximum writeset size exceeded SELECT COUNT(*) AS expect_0 FROM t1; expect_0 0 diff --git a/mysql-test/suite/galera_sr/t/galera_sr_multirow_rollback.test b/mysql-test/suite/galera_sr/t/galera_sr_multirow_rollback.test index 9004d33239367..de5cd09a60333 100644 --- a/mysql-test/suite/galera_sr/t/galera_sr_multirow_rollback.test +++ b/mysql-test/suite/galera_sr/t/galera_sr_multirow_rollback.test @@ -39,6 +39,7 @@ SET SESSION wsrep_trx_fragment_size = 1000; START TRANSACTION; --error ER_DUP_ENTRY INSERT INTO t1 (f2) VALUES ('a'), ('b'); +--error ER_UNKNOWN_ERROR COMMIT; SELECT COUNT(*) AS expect_0 FROM t1; diff --git a/sql/handler.cc b/sql/handler.cc index 1ea1818749c5b..2b09f7df049e9 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2087,7 +2087,11 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) { int err; +#ifdef WITH_WSREP + if ((has_binlog_hton(ha_info) || WSREP_EMULATE_BINLOG(thd)) && +#else if (has_binlog_hton(ha_info) && +#endif /* WITH_WSREP */ (err= binlog_commit(thd, all, is_ro_1pc_trans(thd, ha_info, all, is_real_trans)))) { @@ -7075,7 +7079,7 @@ bool handler::check_table_binlog_row_based_internal() (thd->variables.option_bits & OPTION_BIN_LOG)) && mysql_bin_log.is_open())), (thd->variables.option_bits & OPTION_BIN_LOG) && - mysql_bin_log.is_open())); + mysql_bin_log.is_open())); } diff --git a/sql/log.cc b/sql/log.cc index a49b4b2c0fa33..7921b41b95323 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -348,7 +348,7 @@ class binlog_cache_data my_off_t get_prev_position() { - return(before_stmt_pos); + return(before_stmt_pos); } void set_prev_position(my_off_t pos) @@ -5869,17 +5869,6 @@ THD::binlog_start_trans_and_stmt() this->binlog_set_stmt_begin(); bool mstmt_mode= in_multi_stmt_transaction_mode(); #ifdef WITH_WSREP - /* - With wsrep binlog emulation we can skip the rest because the - binlog cache will not be written into binlog. Note however that - because of this the hton callbacks will not get called to clean - up the cache, so this must be done explicitly when the transaction - terminates. - */ - if (WSREP_EMULATE_BINLOG_NNULL(this)) - { - DBUG_VOID_RETURN; - } /* If this event replicates through a master-slave then we need to inject manually GTID so it is preserved in the cluster. We are writing directly to WSREP buffer and not in IO cache because in case of IO cache @@ -11922,20 +11911,6 @@ void wsrep_thd_binlog_trx_reset(THD * thd) DBUG_VOID_RETURN; } -void wsrep_thd_binlog_stmt_rollback(THD * thd) -{ - DBUG_ENTER("wsrep_thd_binlog_stmt_rollback"); - WSREP_DEBUG("wsrep_thd_binlog_stmt_rollback"); - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - if (cache_mngr) - { - thd->binlog_remove_pending_rows_event(TRUE, TRUE); - cache_mngr->stmt_cache.reset(); - } - DBUG_VOID_RETURN; -} - void wsrep_register_binlog_handler(THD *thd, bool trx) { DBUG_ENTER("register_binlog_handler"); diff --git a/sql/log.h b/sql/log.h index 7b62a1a547750..536d4562aa7cc 100644 --- a/sql/log.h +++ b/sql/log.h @@ -1254,7 +1254,6 @@ static inline TC_LOG *get_tc_log_implementation() #ifdef WITH_WSREP IO_CACHE* wsrep_get_cache(THD *, bool); void wsrep_thd_binlog_trx_reset(THD * thd); -void wsrep_thd_binlog_stmt_rollback(THD * thd); #endif /* WITH_WSREP */ class Gtid_list_log_event; diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index d7c1af4659cdf..5fac89a208961 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -4741,7 +4741,7 @@ select_create::prepare(List &_values, SELECT_LEX_UNIT *u) */ if (!thd->lex->tmp_table() && thd->is_current_stmt_binlog_format_row() && - mysql_bin_log.is_open()) + (mysql_bin_log.is_open() || WSREP_EMULATE_BINLOG(thd))) { thd->binlog_start_trans_and_stmt(); } @@ -5105,7 +5105,7 @@ bool select_create::send_eof() { thd->get_stmt_da()->set_overwrite_status(FALSE); mysql_mutex_lock(&thd->LOCK_thd_data); - if (wsrep_current_error(thd)) + if (wsrep_current_error(thd) || thd->is_error()) { WSREP_DEBUG("select_create commit failed, thd: %llu err: %s %s", thd->thread_id, diff --git a/sql/wsrep_client_service.cc b/sql/wsrep_client_service.cc index d3b4a18195b4f..8563e636e364b 100644 --- a/sql/wsrep_client_service.cc +++ b/sql/wsrep_client_service.cc @@ -101,7 +101,6 @@ int Wsrep_client_service::prepare_data_for_replication() { WSREP_ERROR("rbr write fail, data_len: %zu", data_len); - // wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT); DBUG_RETURN(1); } @@ -110,7 +109,6 @@ int Wsrep_client_service::prepare_data_for_replication() { WSREP_ERROR("rbr write fail, data_len: %zu", data_len); - // wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT); DBUG_RETURN(1); } @@ -138,6 +136,10 @@ int Wsrep_client_service::prepare_data_for_replication() { WSREP_DEBUG("empty rbr buffer, query: %s", wsrep_thd_query(m_thd)); } + + /* SR may have empty last WS to just carry the comit flag */ + if (!m_thd->wsrep_trx().is_streaming()) + DBUG_RETURN(1); } DBUG_RETURN(0); } diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index ae539668827b2..f04a10ea16cf6 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -3803,6 +3803,7 @@ void wsrep_commit_empty(THD* thd, bool all) we have already aborted transaction e.g. because max writeset size has been reached. */ DBUG_ASSERT(!wsrep_has_changes(thd) || + thd->wsrep_affected_rows == 0 || (thd->lex->sql_command == SQLCOM_CREATE_TABLE && !thd->is_current_stmt_binlog_format_row()) || thd->wsrep_cs().transaction().state() == wsrep::transaction::s_aborted); diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index 4d260742ddeeb..822d22f08bcf6 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -196,16 +196,23 @@ static inline bool wsrep_run_commit_hook(THD* thd, bool all) wsrep_is_active(thd), wsrep_is_real(thd, all), wsrep_has_changes(thd), wsrep_thd_is_applying(thd), wsrep_is_ordered(thd))); - /* Is MST commit or autocommit? */ - bool ret= wsrep_is_active(thd) && wsrep_is_real(thd, all); + + /* is trx registered in wsrep provider */ + if (!wsrep_is_active(thd)) DBUG_RETURN(false); + if (!wsrep_is_real(thd, all)) DBUG_RETURN(false); + /* Do not commit if we are aborting */ - ret= ret && (thd->wsrep_trx().state() != wsrep::transaction::s_aborting); - if (ret && !(wsrep_has_changes(thd) || /* Has generated write set */ + if (thd->wsrep_trx().state() == wsrep::transaction::s_aborting) + DBUG_RETURN(false); + + mysql_mutex_lock(&thd->LOCK_thd_data); + + /* Has generated write set */ + if (!(wsrep_has_changes(thd) || /* Is high priority (replay, applier, storage) and the transaction is scheduled for commit ordering */ (wsrep_thd_is_applying(thd) && wsrep_is_ordered(thd)))) { - mysql_mutex_lock(&thd->LOCK_thd_data); DBUG_PRINT("wsrep", ("state: %s", wsrep::to_c_string(thd->wsrep_trx().state()))); /* Transaction is local but has no changes, the commit hooks will @@ -213,12 +220,11 @@ static inline bool wsrep_run_commit_hook(THD* thd, bool all) wsrep_commit_empty() */ if (thd->wsrep_trx().state() == wsrep::transaction::s_executing) { - ret= false; + mysql_mutex_unlock(&thd->LOCK_thd_data); + DBUG_RETURN(false); } - mysql_mutex_unlock(&thd->LOCK_thd_data); } - mysql_mutex_lock(&thd->LOCK_thd_data); /* Transaction creating sequence is TOI or RSU, CREATE SEQUENCE = CREATE + INSERT (initial value) and replicated using statement based replication, thus @@ -227,16 +233,33 @@ static inline bool wsrep_run_commit_hook(THD* thd, bool all) For TEMPORARY SEQUENCES commit hooks will be done as CREATE + INSERT is not replicated and needs to be committed locally. */ - if (ret && - (thd->wsrep_cs().mode() == wsrep::client_state::m_toi || + if ((thd->wsrep_cs().mode() == wsrep::client_state::m_toi || thd->wsrep_cs().mode() == wsrep::client_state::m_rsu) && thd->lex->sql_command == SQLCOM_CREATE_SEQUENCE && !thd->lex->tmp_table()) - ret= false; + DBUG_RETURN(false); + + if (thd->wsrep_cs().mode() == Wsrep_client_state::m_local && + !thd->wsrep_trx().is_streaming()) + { + IO_CACHE* trx_cache= wsrep_get_cache(thd, true); + IO_CACHE* stmt_cache= wsrep_get_cache(thd, false); + + if (!( (trx_cache && my_b_tell(trx_cache) > 0) || + (stmt_cache && my_b_tell(stmt_cache) > 0) + ) + ) + { + WSREP_DEBUG("empty RBR buffer, query: %s", wsrep_thd_query(thd)); + thd->wsrep_affected_rows = 0; + mysql_mutex_unlock(&thd->LOCK_thd_data); + DBUG_RETURN(false); + } + } mysql_mutex_unlock(&thd->LOCK_thd_data); - DBUG_PRINT("wsrep", ("return: %d", ret)); - DBUG_RETURN(ret); + DBUG_PRINT("wsrep", ("return: true")); + DBUG_RETURN(true); } /* @@ -375,7 +398,7 @@ static inline int wsrep_after_commit(THD* thd, bool all) wsrep_is_active(thd), (long long)wsrep_thd_trx_seqno(thd), wsrep_has_changes(thd)); - DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + //DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); if (thd->internal_transaction()) DBUG_RETURN(0); int ret= 0; @@ -401,11 +424,6 @@ static inline int wsrep_before_rollback(THD* thd, bool all) { if (!all && thd->in_active_multi_stmt_transaction()) { - if (wsrep_emulate_bin_log) - { - wsrep_thd_binlog_stmt_rollback(thd); - } - if (thd->wsrep_trx().is_streaming() && (wsrep_fragments_certified_for_stmt(thd) > 0)) {