diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 669fe502..76ed482a 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -45,6 +45,11 @@ const wsrep::transaction& db::high_priority_service::transaction() const return client_.client_state().transaction(); } +wsrep::client_state& db::high_priority_service::client_state() const +{ + return client_.client_state(); +} + int db::high_priority_service::adopt_transaction(const wsrep::transaction&) { throw wsrep::not_implemented_error(); diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index d4a80f1b..df75f622 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -34,6 +34,7 @@ namespace db const wsrep::ws_meta&) override; int next_fragment(const wsrep::ws_meta&) override; const wsrep::transaction& transaction() const override; + wsrep::client_state& client_state() const override; int adopt_transaction(const wsrep::transaction&) override; int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&, diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 8c62d0bf..22aeaa73 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -519,17 +519,31 @@ namespace wsrep * transaction disconnects, and the transaction must not rollback. * After this call, a different client may later attempt to terminate * the transaction by calling method commit_by_xid() or rollback_by_xid(). + * + * @return Zero on success, non-zero if the transaction was BF aborted + */ + int before_xa_detach(); + + /** + * This method should be called to conclude the XA detach operation, + * after the DBMS has detached the transaction. + * + * @return Zero on success, non-zero if transaction was BF aborted + */ + int after_xa_detach(); + + /** + * @deprecated Use before_xa_detach() and after_xa_detach() */ void xa_detach(); /** * Replay a XA transaction * - * Replay a XA transaction that is in s_idle state. + * Replay a local XA transaction in s_idle state, + * or detached. * This may happen if the transaction is BF aborted * between prepare and commit. - * Since the victim is idle, this method can be called - * by the BF aborter or the backround rollbacker. */ void xa_replay(); diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 79b9f09f..6810b93d 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -63,6 +63,11 @@ namespace wsrep */ virtual const wsrep::transaction& transaction() const = 0; + /** + * Return the associated client_state object + */ + virtual wsrep::client_state& client_state() const = 0; + /** * Adopt a transaction. */ diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 26fff7b1..cde36f34 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -126,6 +126,11 @@ namespace wsrep return !xid_.is_null(); } + /** + * Return true if the transaction has completed the prepare step. + */ + bool is_prepared_xa() const; + void assign_xid(const wsrep::xid& xid); const wsrep::xid& xid() const @@ -137,7 +142,9 @@ namespace wsrep int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit); - void xa_detach(); + int before_xa_detach(wsrep::unique_lock&); + + int after_xa_detach(wsrep::unique_lock&); int xa_replay(wsrep::unique_lock&); @@ -213,6 +220,11 @@ namespace wsrep return bf_aborted_in_total_order_; } + wsrep::seqno bf_seqno() const + { + return bf_seqno_; + } + int flags() const { return flags_; @@ -270,6 +282,7 @@ namespace wsrep enum wsrep::provider::status bf_abort_provider_status_; int bf_abort_client_state_; bool bf_aborted_in_total_order_; + wsrep::seqno bf_seqno_; wsrep::ws_handle ws_handle_; wsrep::ws_meta ws_meta_; int flags_; diff --git a/src/client_state.cpp b/src/client_state.cpp index efa390cb..2c693eae 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -23,6 +23,7 @@ #include "wsrep/server_state.hpp" #include "wsrep/server_service.hpp" #include "wsrep/client_service.hpp" +#include "wsrep/high_priority_service.hpp" #include // usleep() #include @@ -450,8 +451,9 @@ void wsrep::client_state::sync_rollback_complete() { wsrep::unique_lock lock(mutex_); debug_log_state("sync_rollback_complete: enter"); - assert(state_ == s_idle && mode_ == m_local && - transaction_.state() == wsrep::transaction::s_aborted); + assert((state_ == s_idle && mode_ == m_local && + transaction_.state() == wsrep::transaction::s_aborted) || + mode_ == m_high_priority); set_rollbacker_active(false); cond_.notify_all(); debug_log_state("sync_rollback_complete: leave"); @@ -461,7 +463,7 @@ void wsrep::client_state::wait_rollback_complete_and_acquire_ownership() { wsrep::unique_lock lock(mutex_); debug_log_state("wait_rollback_complete_and_acquire_ownership: enter"); - if (state_ == s_idle) + if (state_ == s_idle || mode_ == m_high_priority) { do_wait_rollback_complete_and_acquire_ownership(lock); } @@ -511,17 +513,72 @@ void wsrep::client_state::disable_streaming() // XA // ////////////////////////////////////////////////////////////////////////////// +int wsrep::client_state::before_xa_detach() +{ + int ret(0); + client_service_.debug_sync("wsrep_before_xa_detach_enter"); + { + wsrep::unique_lock lock(mutex_); + assert(mode_ == m_local); + assert(state_ == s_none || state_ == s_exec || state_ == s_quitting); + if (transaction_.state() == wsrep::transaction::s_must_abort) + { + transaction_.state(lock, wsrep::transaction::s_must_replay); + lock.unlock(); + client_service_.bf_rollback(); + lock.lock(); + ret = 1; + } + else + { + ret = transaction_.before_xa_detach(lock); + } + } + client_service_.debug_sync("wsrep_before_xa_detach_leave"); + return ret; +} + +int wsrep::client_state::after_xa_detach() +{ + int ret(0); + client_service_.debug_sync("wsrep_after_xa_detach_enter"); + { + wsrep::unique_lock lock(mutex_); + assert(mode_ == m_local); + if (transaction_.state() == wsrep::transaction::s_must_abort) + { + wsrep::high_priority_service* sa( + server_state_.find_streaming_applier(transaction_.server_id(), + transaction_.id())); + assert(sa); + if (sa) + { + wsrep::client_state& cs(sa->client_state()); + cs.transaction_.state(lock, wsrep::transaction::s_must_abort); + cs.transaction_.state(lock, wsrep::transaction::s_must_replay); + cs.set_rollbacker_active(true); + lock.unlock(); + server_state_.server_service().background_rollback( + sa->client_state()); + lock.lock(); + } + } + ret = transaction_.after_xa_detach(lock); + } + client_service_.debug_sync("wsrep_after_xa_detach_leave"); + return ret; +} + void wsrep::client_state::xa_detach() { - assert(mode_ == m_local); - assert(state_ == s_none || state_ == s_exec || state_ == s_quitting); - transaction_.xa_detach(); + before_xa_detach(); + after_xa_detach(); } void wsrep::client_state::xa_replay() { - assert(mode_ == m_local); - assert(state_ == s_idle); + assert((mode_ == m_local && state_ == s_idle) || + (mode_ == m_high_priority)); wsrep::unique_lock lock(mutex_); transaction_.xa_replay(lock); } @@ -1000,13 +1057,16 @@ void wsrep::client_state::do_wait_rollback_complete_and_acquire_ownership( wsrep::unique_lock& lock) { assert(lock.owns_lock()); - assert(state_ == s_idle); + assert(state_ == s_idle || mode_ == m_high_priority); while (is_rollbacker_active()) { cond_.wait(lock); } do_acquire_ownership(lock); - state(lock, s_exec); + if (state_ == s_idle) + { + state(lock, s_exec); + } } void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid) diff --git a/src/transaction.cpp b/src/transaction.cpp index c864a0fa..ece312ca 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -101,6 +101,7 @@ wsrep::transaction::transaction( , bf_abort_provider_status_() , bf_abort_client_state_() , bf_aborted_in_total_order_() + , bf_seqno_() , ws_handle_() , ws_meta_() , flags_() @@ -716,10 +717,17 @@ int wsrep::transaction::before_rollback() break; case wsrep::client_state::m_high_priority: // Rollback by rollback write set or BF abort - assert(state_ == s_executing || state_ == s_prepared || state_ == s_aborting); - if (state_ != s_aborting) + switch (state_) { + case s_executing: + case s_prepared: state(lock, s_aborting); + break; + case s_must_replay: + case s_aborting: + break; + default: + assert(0); } break; default: @@ -929,7 +937,7 @@ void wsrep::transaction::after_command_must_abort( if (is_xa() && is_streaming()) { - xa_replay(lock); + xa_replay_commit(lock); } else { @@ -1004,6 +1012,7 @@ bool wsrep::transaction::bf_abort( << " successfully BF aborted " << id_ << " victim_seqno " << victim_seqno); bf_abort_state_ = state_at_enter; + bf_seqno_ = bf_seqno; state(lock, s_must_abort); ret = true; break; @@ -1117,8 +1126,16 @@ void wsrep::transaction::assign_xid(const wsrep::xid& xid) xid_ = xid; } +bool wsrep::transaction::is_prepared_xa() const +{ + return (state() == s_prepared) + || (is_xa() && state() == s_replaying) + || (state() == s_must_abort && bf_abort_state_ == s_prepared); +} + int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid) { + debug_log_state("restore_to_prepared_state enter"); wsrep::unique_lock lock(client_state_.mutex_); assert(active()); assert(is_empty()); @@ -1134,6 +1151,7 @@ int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid) } state(lock, s_prepared); xid_ = xid; + debug_log_state("restore_to_prepared_state leave"); return 0; } @@ -1166,12 +1184,16 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid, client_state_.id()); wsrep::ws_meta meta(stid); + lock.unlock(); + client_service_.debug_sync("wsrep_before_certification"); const enum wsrep::provider::status cert_ret( provider().certify(client_state_.id(), ws_handle_, flags(), meta)); + lock.lock(); + int ret; if (cert_ret == wsrep::provider::success) { @@ -1201,25 +1223,38 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid, return ret; } -void wsrep::transaction::xa_detach() +int wsrep::transaction::before_xa_detach(wsrep::unique_lock& lock) { - debug_log_state("xa_detach enter"); - assert(state() == s_prepared || - client_state_.state() == wsrep::client_state::s_quitting); + debug_log_state("before_xa_detach enter"); + assert(lock.owns_lock()); + assert (state() == s_prepared || + client_state_.state() == wsrep::client_state::s_quitting); if (state() == s_prepared) { wsrep::server_state& server_state(client_state_.server_state()); + lock.unlock(); + client_service_.debug_sync("wsrep_before_xa_detach_convert_client"); server_state.convert_streaming_client_to_applier(&client_state_); client_service_.store_globals(); client_service_.cleanup_transaction(); + lock.lock(); } - wsrep::unique_lock lock(client_state_.mutex_); - streaming_context_.cleanup(); + debug_log_state("before_xa_detach leave"); + return 0; +} + +int wsrep::transaction::after_xa_detach(wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + debug_log_state("after_xa_detach enter"); + assert(state() == s_must_abort || state() == s_prepared); state(lock, s_aborting); state(lock, s_aborted); provider().release(ws_handle_); + streaming_context_.cleanup(); cleanup(); - debug_log_state("xa_detach leave"); + debug_log_state("after_xa_detach leave"); + return 0; } void wsrep::transaction::xa_replay_common(wsrep::unique_lock& lock) @@ -1228,15 +1263,22 @@ void wsrep::transaction::xa_replay_common(wsrep::unique_lock& lock assert(is_xa()); assert(is_streaming()); assert(state() == s_must_replay); - assert(bf_aborted()); state(lock, s_replaying); enum wsrep::provider::status status; wsrep::server_state& server_state(client_state_.server_state()); + // Convert to streaming applier an replay. If transaction + // is detaching, streaming applier has been created + // already. Perhaps we should have a better way to tell + // if a transaction is detaching. lock.unlock(); - server_state.convert_streaming_client_to_applier(&client_state_); + if (client_state_.mode() == wsrep::client_state::m_local && + !server_state.find_streaming_applier(server_id_, id_)) + { + server_state.convert_streaming_client_to_applier(&client_state_); + } status = client_service_.replay_unordered(); client_service_.store_globals(); lock.lock(); @@ -1251,10 +1293,13 @@ int wsrep::transaction::xa_replay(wsrep::unique_lock& lock) { debug_log_state("xa_replay enter"); xa_replay_common(lock); - state(lock, s_aborted); - streaming_context_.cleanup(); - provider().release(ws_handle_); - cleanup(); + if (client_state_.mode() == wsrep::client_state::m_local) + { + state(lock, s_aborted); + streaming_context_.cleanup(); + provider().release(ws_handle_); + cleanup(); + } client_service_.signal_replayed(); debug_log_state("xa_replay leave"); return 0; @@ -1263,15 +1308,19 @@ int wsrep::transaction::xa_replay(wsrep::unique_lock& lock) int wsrep::transaction::xa_replay_commit(wsrep::unique_lock& lock) { debug_log_state("xa_replay_commit enter"); + enum wsrep::provider::status status(wsrep::provider::success); xa_replay_common(lock); - lock.unlock(); - enum wsrep::provider::status status(client_service_.commit_by_xid()); - lock.lock(); + if (client_state_.mode() == wsrep::client_state::m_local) + { + lock.unlock(); + status = client_service_.commit_by_xid(); + lock.lock(); + } int ret(1); switch (status) { case wsrep::provider::success: - state(lock, s_committed); + state(lock, s_aborted); streaming_context_.cleanup(); provider().release(ws_handle_); cleanup(); @@ -2087,6 +2136,7 @@ void wsrep::transaction::cleanup() bf_abort_provider_status_ = wsrep::provider::success; bf_abort_client_state_ = 0; bf_aborted_in_total_order_ = false; + bf_seqno_ = wsrep::seqno::undefined(); ws_meta_ = wsrep::ws_meta(); flags_ = 0; certified_ = false; diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index caf40b2c..a97d9cc7 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -85,7 +85,7 @@ namespace wsrep bool is_replaying() const WSREP_OVERRIDE { return replaying_; } void debug_crash(const char*) WSREP_OVERRIDE { /* Not in unit tests*/} - wsrep::client_state& client_state() + wsrep::client_state& client_state() const WSREP_OVERRIDE { return *client_state_; } diff --git a/test/transaction_test_xa.cpp b/test/transaction_test_xa.cpp index d9fbad27..7275f50d 100644 --- a/test/transaction_test_xa.cpp +++ b/test/transaction_test_xa.cpp @@ -63,7 +63,8 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_detach_commit_by_xid, BOOST_REQUIRE(sc.provider().fragments() == 1); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); - cc1.xa_detach(); + cc1.before_xa_detach(); + cc1.after_xa_detach(); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted); BOOST_REQUIRE(cc1.after_statement() == 0); @@ -97,7 +98,8 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_detach_rollback_by_xid, BOOST_REQUIRE(sc.provider().fragments() == 1); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); - cc1.xa_detach(); + cc1.before_xa_detach(); + cc1.after_xa_detach(); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted); BOOST_REQUIRE(cc1.after_statement() == 0);