Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support detach on transaction prepare #200

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dbsim/db_high_priority_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ const wsrep::transaction& db::high_priority_service::transaction() const
return client_.client_state().transaction();
}

wsrep::client_state& db::high_priority_service::client_state() const
{
return client_.client_state();
}

int db::high_priority_service::adopt_transaction(const wsrep::transaction&)
{
throw wsrep::not_implemented_error();
Expand Down
1 change: 1 addition & 0 deletions dbsim/db_high_priority_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace db
const wsrep::ws_meta&) override;
int next_fragment(const wsrep::ws_meta&) override;
const wsrep::transaction& transaction() const override;
wsrep::client_state& client_state() const override;
int adopt_transaction(const wsrep::transaction&) override;
int apply_write_set(const wsrep::ws_meta&,
const wsrep::const_buffer&,
Expand Down
20 changes: 17 additions & 3 deletions include/wsrep/client_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,17 +519,31 @@ namespace wsrep
* transaction disconnects, and the transaction must not rollback.
* After this call, a different client may later attempt to terminate
* the transaction by calling method commit_by_xid() or rollback_by_xid().
*
* @return Zero on success, non-zero if the transaction was BF aborted
*/
int before_xa_detach();

/**
* This method should be called to conclude the XA detach operation,
* after the DBMS has detached the transaction.
*
* @return Zero on success, non-zero if transaction was BF aborted
*/
int after_xa_detach();

/**
* @deprecated Use before_xa_detach() and after_xa_detach()
*/
void xa_detach();

/**
* Replay a XA transaction
*
* Replay a XA transaction that is in s_idle state.
* Replay a local XA transaction in s_idle state,
* or detached.
* This may happen if the transaction is BF aborted
* between prepare and commit.
* Since the victim is idle, this method can be called
* by the BF aborter or the backround rollbacker.
*/
void xa_replay();

Expand Down
5 changes: 5 additions & 0 deletions include/wsrep/high_priority_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ namespace wsrep
*/
virtual const wsrep::transaction& transaction() const = 0;

/**
* Return the associated client_state object
*/
virtual wsrep::client_state& client_state() const = 0;

/**
* Adopt a transaction.
*/
Expand Down
15 changes: 14 additions & 1 deletion include/wsrep/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ namespace wsrep
return !xid_.is_null();
}

/**
* Return true if the transaction has completed the prepare step.
*/
bool is_prepared_xa() const;

void assign_xid(const wsrep::xid& xid);

const wsrep::xid& xid() const
Expand All @@ -137,7 +142,9 @@ namespace wsrep

int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit);

void xa_detach();
int before_xa_detach(wsrep::unique_lock<mutex>&);

int after_xa_detach(wsrep::unique_lock<mutex>&);

int xa_replay(wsrep::unique_lock<wsrep::mutex>&);

Expand Down Expand Up @@ -213,6 +220,11 @@ namespace wsrep
return bf_aborted_in_total_order_;
}

wsrep::seqno bf_seqno() const
{
return bf_seqno_;
}

int flags() const
{
return flags_;
Expand Down Expand Up @@ -270,6 +282,7 @@ namespace wsrep
enum wsrep::provider::status bf_abort_provider_status_;
int bf_abort_client_state_;
bool bf_aborted_in_total_order_;
wsrep::seqno bf_seqno_;
wsrep::ws_handle ws_handle_;
wsrep::ws_meta ws_meta_;
int flags_;
Expand Down
80 changes: 70 additions & 10 deletions src/client_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "wsrep/server_state.hpp"
#include "wsrep/server_service.hpp"
#include "wsrep/client_service.hpp"
#include "wsrep/high_priority_service.hpp"

#include <unistd.h> // usleep()
#include <cassert>
Expand Down Expand Up @@ -450,8 +451,9 @@ void wsrep::client_state::sync_rollback_complete()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
debug_log_state("sync_rollback_complete: enter");
assert(state_ == s_idle && mode_ == m_local &&
transaction_.state() == wsrep::transaction::s_aborted);
assert((state_ == s_idle && mode_ == m_local &&
transaction_.state() == wsrep::transaction::s_aborted) ||
mode_ == m_high_priority);
set_rollbacker_active(false);
cond_.notify_all();
debug_log_state("sync_rollback_complete: leave");
Expand All @@ -461,7 +463,7 @@ void wsrep::client_state::wait_rollback_complete_and_acquire_ownership()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
debug_log_state("wait_rollback_complete_and_acquire_ownership: enter");
if (state_ == s_idle)
if (state_ == s_idle || mode_ == m_high_priority)
{
do_wait_rollback_complete_and_acquire_ownership(lock);
}
Expand Down Expand Up @@ -511,17 +513,72 @@ void wsrep::client_state::disable_streaming()
// XA //
//////////////////////////////////////////////////////////////////////////////

int wsrep::client_state::before_xa_detach()
{
int ret(0);
client_service_.debug_sync("wsrep_before_xa_detach_enter");
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_local);
assert(state_ == s_none || state_ == s_exec || state_ == s_quitting);
if (transaction_.state() == wsrep::transaction::s_must_abort)
{
transaction_.state(lock, wsrep::transaction::s_must_replay);
lock.unlock();
client_service_.bf_rollback();
lock.lock();
ret = 1;
}
else
{
ret = transaction_.before_xa_detach(lock);
}
}
client_service_.debug_sync("wsrep_before_xa_detach_leave");
return ret;
}

int wsrep::client_state::after_xa_detach()
{
int ret(0);
client_service_.debug_sync("wsrep_after_xa_detach_enter");
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_local);
if (transaction_.state() == wsrep::transaction::s_must_abort)
{
wsrep::high_priority_service* sa(
server_state_.find_streaming_applier(transaction_.server_id(),
transaction_.id()));
assert(sa);
if (sa)
{
wsrep::client_state& cs(sa->client_state());
cs.transaction_.state(lock, wsrep::transaction::s_must_abort);
cs.transaction_.state(lock, wsrep::transaction::s_must_replay);
cs.set_rollbacker_active(true);
lock.unlock();
server_state_.server_service().background_rollback(
sa->client_state());
lock.lock();
}
}
ret = transaction_.after_xa_detach(lock);
}
client_service_.debug_sync("wsrep_after_xa_detach_leave");
return ret;
}

void wsrep::client_state::xa_detach()
{
assert(mode_ == m_local);
assert(state_ == s_none || state_ == s_exec || state_ == s_quitting);
transaction_.xa_detach();
before_xa_detach();
after_xa_detach();
}

void wsrep::client_state::xa_replay()
{
assert(mode_ == m_local);
assert(state_ == s_idle);
assert((mode_ == m_local && state_ == s_idle) ||
(mode_ == m_high_priority));
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
transaction_.xa_replay(lock);
}
Expand Down Expand Up @@ -1000,13 +1057,16 @@ void wsrep::client_state::do_wait_rollback_complete_and_acquire_ownership(
wsrep::unique_lock<wsrep::mutex>& lock)
{
assert(lock.owns_lock());
assert(state_ == s_idle);
assert(state_ == s_idle || mode_ == m_high_priority);
while (is_rollbacker_active())
{
cond_.wait(lock);
}
do_acquire_ownership(lock);
state(lock, s_exec);
if (state_ == s_idle)
{
state(lock, s_exec);
}
}

void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid)
Expand Down
Loading