Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mdev-31272 new fix variant #332

Open
wants to merge 12 commits into
base: 10.6
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions mysql-test/suite/galera/r/galera_multirow_rollback.result
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,54 @@ id fk1
3 1
DROP TABLE c;
DROP TABLE p;
connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1;
SET GLOBAL DEBUG_DBUG = "d,sync.wsrep_apply_cb";
connection node_1;
CREATE TABLE t1 (f1 int primary key, f2 int);
INSERT INTO t1 VALUES (1,0);
BEGIN;
INSERT INTO t1 VALUES (2,4),(1,1);
ERROR 23000: Duplicate entry '1' for key 'PRIMARY'
connection node_2;
UPDATE t1 SET f2=8 WHERE f1=1;
connection node_1a;
SET DEBUG_SYNC = "now WAIT_FOR sync.wsrep_apply_cb_reached";
connection node_1;
COMMIT;
connection node_1a;
SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb";
connection node_1;
SELECT * FROM t1;
f1 f2
1 8
DROP TABLE t1;
CREATE TABLE t1(f1 int primary key, f2 int);
INSERT INTO t1 VALUES (1,0);
connection node_1a;
SET GLOBAL DEBUG_DBUG = "d,sync.wsrep_apply_cb";
connection node_1;
BEGIN;
INSERT INTO t1 VALUES (3,5),(1,1);
ERROR 23000: Duplicate entry '1' for key 'PRIMARY'
connection node_2;
UPDATE t1 SET f2=9 WHERE f1=1;
connection node_1a;
SET DEBUG_SYNC = "now WAIT_FOR sync.wsrep_apply_cb_reached";
SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb";
connection node_1;
COMMIT;
ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
SELECT * FROM t1;
f1 f2
1 9
DROP TABLE t1;
CREATE TABLE t1 (i int primary key) engine=innodb;
BEGIN;
INSERT INTO t1 VALUES (1);
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (1);
ERROR 23000: Duplicate entry '1' for key 'PRIMARY'
COMMIT;
DROP TABLE t1;
SET DEBUG_SYNC='reset';
SET GLOBAL debug_dbug = DEFAULT;
82 changes: 82 additions & 0 deletions mysql-test/suite/galera/t/galera_multirow_rollback.test
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,85 @@ SELECT * FROM c;

DROP TABLE c;
DROP TABLE p;


#
# Case 5: testing statement rollback

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Case 5: It is not so clear what case 5 is trying to test. If I understand correctly, you want to make sure that the COMMIT from node_1 does not go through replication / certification because it is an empty transaction. If so simply check that no gtid was consumed, or alternatively that seqno has not advanced. This case would not need debug sync at all and make it trivial to understand.

#
#SET SESSION debug_dbug="+d,wsrep_disable_fix";

--connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1
SET GLOBAL DEBUG_DBUG = "d,sync.wsrep_apply_cb";

--connection node_1
CREATE TABLE t1 (f1 int primary key, f2 int);
INSERT INTO t1 VALUES (1,0);

BEGIN;
--error ER_DUP_ENTRY
INSERT INTO t1 VALUES (2,4),(1,1);

--connection node_2
UPDATE t1 SET f2=8 WHERE f1=1;

--connection node_1a
SET DEBUG_SYNC = "now WAIT_FOR sync.wsrep_apply_cb_reached";

--connection node_1
COMMIT;

--connection node_1a
SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb";

--connection node_1
SELECT * FROM t1;
DROP TABLE t1;


#
# Case 6: testing statement rollback with BF abort

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Case 6 can also simplified and make use of wait conditions instead of debug sync points (see galera_bf_abort.test). If we make this test debug sync free it has the advantage of being run in debug and release builds.

#
CREATE TABLE t1(f1 int primary key, f2 int);
INSERT INTO t1 VALUES (1,0);

--connection node_1a
SET GLOBAL DEBUG_DBUG = "d,sync.wsrep_apply_cb";

--connection node_1
BEGIN;
--error ER_DUP_ENTRY
INSERT INTO t1 VALUES (3,5),(1,1);

--connection node_2
UPDATE t1 SET f2=9 WHERE f1=1;

--connection node_1a
SET DEBUG_SYNC = "now WAIT_FOR sync.wsrep_apply_cb_reached";
SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb";
--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE = 'wsrep applier committed'
--source include/wait_condition.inc

--connection node_1
--error ER_LOCK_DEADLOCK
COMMIT;

SELECT * FROM t1;
DROP TABLE t1;

#
# Case 7:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Case 7 looks like case 2. If so, drop it.

#
CREATE TABLE t1 (i int primary key) engine=innodb;
BEGIN;
INSERT INTO t1 VALUES (1);
INSERT INTO t1 VALUES (2);
--error ER_DUP_ENTRY
INSERT INTO t1 VALUES (1);
COMMIT;


# Cleanup
DROP TABLE t1;
SET DEBUG_SYNC='reset';
SET GLOBAL debug_dbug = DEFAULT;

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ START TRANSACTION;
INSERT INTO t1 (f2) VALUES ('a'), ('b');
ERROR 23000: Duplicate entry '0' for key 'PRIMARY'
COMMIT;
ERROR HY000: Maximum writeset size exceeded
SELECT COUNT(*) AS expect_0 FROM t1;
expect_0
0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ SET SESSION wsrep_trx_fragment_size = 1000;
START TRANSACTION;
--error ER_DUP_ENTRY
INSERT INTO t1 (f2) VALUES ('a'), ('b');
--error ER_UNKNOWN_ERROR
COMMIT;

SELECT COUNT(*) AS expect_0 FROM t1;
Expand Down
6 changes: 5 additions & 1 deletion sql/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2087,7 +2087,11 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
{
int err;

#ifdef WITH_WSREP
if ((has_binlog_hton(ha_info) || WSREP_EMULATE_BINLOG(thd)) &&
#else
if (has_binlog_hton(ha_info) &&
#endif /* WITH_WSREP */
(err= binlog_commit(thd, all,
is_ro_1pc_trans(thd, ha_info, all, is_real_trans))))
{
Expand Down Expand Up @@ -7075,7 +7079,7 @@ bool handler::check_table_binlog_row_based_internal()
(thd->variables.option_bits & OPTION_BIN_LOG)) &&
mysql_bin_log.is_open())),
(thd->variables.option_bits & OPTION_BIN_LOG) &&
mysql_bin_log.is_open()));
mysql_bin_log.is_open()));
}


Expand Down
27 changes: 1 addition & 26 deletions sql/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class binlog_cache_data

my_off_t get_prev_position()
{
return(before_stmt_pos);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop this whitespace change.

return(before_stmt_pos);
}

void set_prev_position(my_off_t pos)
Expand Down Expand Up @@ -5869,17 +5869,6 @@ THD::binlog_start_trans_and_stmt()
this->binlog_set_stmt_begin();
bool mstmt_mode= in_multi_stmt_transaction_mode();
#ifdef WITH_WSREP
/*
With wsrep binlog emulation we can skip the rest because the
binlog cache will not be written into binlog. Note however that
because of this the hton callbacks will not get called to clean
up the cache, so this must be done explicitly when the transaction
terminates.
*/
if (WSREP_EMULATE_BINLOG_NNULL(this))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you drop this optimization, then we should probably remove the patch at the beginning of ha_savepoint().

{
DBUG_VOID_RETURN;
}
/* If this event replicates through a master-slave then we need to
inject manually GTID so it is preserved in the cluster. We are writing
directly to WSREP buffer and not in IO cache because in case of IO cache
Expand Down Expand Up @@ -11922,20 +11911,6 @@ void wsrep_thd_binlog_trx_reset(THD * thd)
DBUG_VOID_RETURN;
}

void wsrep_thd_binlog_stmt_rollback(THD * thd)
{
DBUG_ENTER("wsrep_thd_binlog_stmt_rollback");
WSREP_DEBUG("wsrep_thd_binlog_stmt_rollback");
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
if (cache_mngr)
{
thd->binlog_remove_pending_rows_event(TRUE, TRUE);
cache_mngr->stmt_cache.reset();
}
DBUG_VOID_RETURN;
}

void wsrep_register_binlog_handler(THD *thd, bool trx)
{
DBUG_ENTER("register_binlog_handler");
Expand Down
1 change: 0 additions & 1 deletion sql/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,6 @@ static inline TC_LOG *get_tc_log_implementation()
#ifdef WITH_WSREP
IO_CACHE* wsrep_get_cache(THD *, bool);
void wsrep_thd_binlog_trx_reset(THD * thd);
void wsrep_thd_binlog_stmt_rollback(THD * thd);
#endif /* WITH_WSREP */

class Gtid_list_log_event;
Expand Down
4 changes: 2 additions & 2 deletions sql/sql_insert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4741,7 +4741,7 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
*/
if (!thd->lex->tmp_table() &&
thd->is_current_stmt_binlog_format_row() &&
mysql_bin_log.is_open())
(mysql_bin_log.is_open() || WSREP_EMULATE_BINLOG(thd)))
{
thd->binlog_start_trans_and_stmt();
}
Expand Down Expand Up @@ -5105,7 +5105,7 @@ bool select_create::send_eof()
{
thd->get_stmt_da()->set_overwrite_status(FALSE);
mysql_mutex_lock(&thd->LOCK_thd_data);
if (wsrep_current_error(thd))
if (wsrep_current_error(thd) || thd->is_error())
{
WSREP_DEBUG("select_create commit failed, thd: %llu err: %s %s",
thd->thread_id,
Expand Down
6 changes: 4 additions & 2 deletions sql/wsrep_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ int Wsrep_client_service::prepare_data_for_replication()
{
WSREP_ERROR("rbr write fail, data_len: %zu",
data_len);
// wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT);
DBUG_RETURN(1);
}

Expand All @@ -110,7 +109,6 @@ int Wsrep_client_service::prepare_data_for_replication()
{
WSREP_ERROR("rbr write fail, data_len: %zu",
data_len);
// wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT);
DBUG_RETURN(1);
}

Expand Down Expand Up @@ -138,6 +136,10 @@ int Wsrep_client_service::prepare_data_for_replication()
{
WSREP_DEBUG("empty rbr buffer, query: %s", wsrep_thd_query(m_thd));
}

/* SR may have empty last WS to just carry the comit flag */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: "comit" should be "commit"

if (!m_thd->wsrep_trx().is_streaming())
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
Expand Down
1 change: 1 addition & 0 deletions sql/wsrep_mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3803,6 +3803,7 @@ void wsrep_commit_empty(THD* thd, bool all)
we have already aborted transaction e.g. because max writeset size
has been reached. */
DBUG_ASSERT(!wsrep_has_changes(thd) ||
thd->wsrep_affected_rows == 0 ||
(thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
!thd->is_current_stmt_binlog_format_row()) ||
thd->wsrep_cs().transaction().state() == wsrep::transaction::s_aborted);
Expand Down
56 changes: 37 additions & 19 deletions sql/wsrep_trans_observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,29 +196,35 @@ static inline bool wsrep_run_commit_hook(THD* thd, bool all)
wsrep_is_active(thd), wsrep_is_real(thd, all),
wsrep_has_changes(thd), wsrep_thd_is_applying(thd),
wsrep_is_ordered(thd)));
/* Is MST commit or autocommit? */
bool ret= wsrep_is_active(thd) && wsrep_is_real(thd, all);

/* is trx registered in wsrep provider */
if (!wsrep_is_active(thd)) DBUG_RETURN(false);
if (!wsrep_is_real(thd, all)) DBUG_RETURN(false);

/* Do not commit if we are aborting */
ret= ret && (thd->wsrep_trx().state() != wsrep::transaction::s_aborting);
if (ret && !(wsrep_has_changes(thd) || /* Has generated write set */
if (thd->wsrep_trx().state() == wsrep::transaction::s_aborting)
DBUG_RETURN(false);

mysql_mutex_lock(&thd->LOCK_thd_data);

/* Has generated write set */
if (!(wsrep_has_changes(thd) ||
/* Is high priority (replay, applier, storage) and the
transaction is scheduled for commit ordering */
(wsrep_thd_is_applying(thd) && wsrep_is_ordered(thd))))
{
mysql_mutex_lock(&thd->LOCK_thd_data);
DBUG_PRINT("wsrep", ("state: %s",
wsrep::to_c_string(thd->wsrep_trx().state())));
/* Transaction is local but has no changes, the commit hooks will
be skipped and the wsrep transaction is terminated in
wsrep_commit_empty() */
if (thd->wsrep_trx().state() == wsrep::transaction::s_executing)
{
ret= false;
mysql_mutex_unlock(&thd->LOCK_thd_data);
DBUG_RETURN(false);
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
}

mysql_mutex_lock(&thd->LOCK_thd_data);
/* Transaction creating sequence is TOI or RSU,
CREATE SEQUENCE = CREATE + INSERT (initial value)
and replicated using statement based replication, thus
Expand All @@ -227,16 +233,33 @@ static inline bool wsrep_run_commit_hook(THD* thd, bool all)
For TEMPORARY SEQUENCES commit hooks will be done as
CREATE + INSERT is not replicated and needs to be
committed locally. */
if (ret &&
(thd->wsrep_cs().mode() == wsrep::client_state::m_toi ||
if ((thd->wsrep_cs().mode() == wsrep::client_state::m_toi ||
thd->wsrep_cs().mode() == wsrep::client_state::m_rsu) &&
thd->lex->sql_command == SQLCOM_CREATE_SEQUENCE &&
!thd->lex->tmp_table())
ret= false;
DBUG_RETURN(false);

if (thd->wsrep_cs().mode() == Wsrep_client_state::m_local &&
!thd->wsrep_trx().is_streaming())
{
IO_CACHE* trx_cache= wsrep_get_cache(thd, true);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it would be good to put this block into a function in wsrep_binlog.cc or log.cc

IO_CACHE* stmt_cache= wsrep_get_cache(thd, false);

if (!( (trx_cache && my_b_tell(trx_cache) > 0) ||
(stmt_cache && my_b_tell(stmt_cache) > 0)
)
)
{
WSREP_DEBUG("empty RBR buffer, query: %s", wsrep_thd_query(thd));
thd->wsrep_affected_rows = 0;
mysql_mutex_unlock(&thd->LOCK_thd_data);
DBUG_RETURN(false);
}
}
mysql_mutex_unlock(&thd->LOCK_thd_data);

DBUG_PRINT("wsrep", ("return: %d", ret));
DBUG_RETURN(ret);
DBUG_PRINT("wsrep", ("return: true"));
DBUG_RETURN(true);
}

/*
Expand Down Expand Up @@ -375,7 +398,7 @@ static inline int wsrep_after_commit(THD* thd, bool all)
wsrep_is_active(thd),
(long long)wsrep_thd_trx_seqno(thd),
wsrep_has_changes(thd));
DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
//DBUG_ASSERT(wsrep_run_commit_hook(thd, all));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not have assertions that are commented out. Either remove this line altogether, or put it back and fix if necessary.

if (thd->internal_transaction())
DBUG_RETURN(0);
int ret= 0;
Expand All @@ -401,11 +424,6 @@ static inline int wsrep_before_rollback(THD* thd, bool all)
{
if (!all && thd->in_active_multi_stmt_transaction())
{
if (wsrep_emulate_bin_log)
{
wsrep_thd_binlog_stmt_rollback(thd);
}

if (thd->wsrep_trx().is_streaming() &&
(wsrep_fragments_certified_for_stmt(thd) > 0))
{
Expand Down