Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add application defined sequential consistency for certification #237

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions dbsim/db_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ int db::client::client_command(F f)
return err;
}

static void release_commit_critical_section(void* ptr)
{
auto* crit = static_cast<db::server::commit_critical_section*>(ptr);
if (crit->lock.owns_lock())
{
crit->lock.unlock();
}
}

void db::client::run_one_transaction()
{
if (params_.sync_wait)
Expand Down Expand Up @@ -145,15 +154,35 @@ void db::client::run_one_transaction()
err = err || client_command(
[&]()
{
// wsrep::log_debug() << "Commit";
auto commit_crit = server_.get_commit_critical_section();
if (not params_.check_sequential_consistency) {
commit_crit.lock.unlock();
}

client_state_.append_data({&commit_crit.commit_seqno,
sizeof(commit_crit.commit_seqno)});

wsrep::provider::seq_cb seq_cb {
&commit_crit,
release_commit_critical_section
};

assert(err == 0);
if (do_2pc())
if (params_.do_2pc)
{
err = err || client_state_.before_prepare();
err = err || client_state_.before_prepare(&seq_cb);
err = err || client_state_.after_prepare();
}
err = err || client_state_.before_commit();
if (err == 0) se_trx_.commit(transaction.ws_meta().gtid());
err = err || client_state_.before_commit(&seq_cb);
if (err == 0)
{
se_trx_.commit(transaction.ws_meta().gtid());
if (params_.check_sequential_consistency)
{
server_.check_sequential_consistency(
client_state_.id(), commit_crit.commit_seqno);
}
}
err = err || client_state_.ordered_commit();
err = err || client_state_.after_commit();
if (err)
Expand Down
1 change: 0 additions & 1 deletion dbsim/db_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ namespace db
void start();
wsrep::client_state& client_state() { return client_state_; }
wsrep::client_service& client_service() { return client_service_; }
bool do_2pc() const { return false; }
private:
friend class db::server_state;
friend class db::client_service;
Expand Down
14 changes: 13 additions & 1 deletion dbsim/db_high_priority_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ db::high_priority_service::high_priority_service(
: wsrep::high_priority_service(server.server_state())
, server_(server)
, client_(client)
, commit_seqno_()
{ }

int db::high_priority_service::start_transaction(
Expand All @@ -52,11 +53,14 @@ int db::high_priority_service::adopt_transaction(const wsrep::transaction&)

int db::high_priority_service::apply_write_set(
const wsrep::ws_meta&,
const wsrep::const_buffer&,
const wsrep::const_buffer& buf,
wsrep::mutable_buffer&)
{
client_.se_trx_.start(&client_);
client_.se_trx_.apply(client_.client_state().transaction());
assert(buf.size() > sizeof(uint64_t));
::memcpy(&commit_seqno_, buf.data() + buf.size() - sizeof(uint64_t),
sizeof(uint64_t));
return 0;
}

Expand All @@ -82,6 +86,14 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true);
int ret(client_.client_state_.before_commit());
if (ret == 0) client_.se_trx_.commit(ws_meta.gtid());

/* Local client session replaying. */
if (ws_meta.server_id() == server_.server_state().id()
&& client_.params_.check_sequential_consistency)
{
server_.check_sequential_consistency(ws_meta.client_id(),
commit_seqno_);
}
ret = ret || client_.client_state_.ordered_commit();
ret = ret || client_.client_state_.after_commit();
return ret;
Expand Down
1 change: 1 addition & 0 deletions dbsim/db_high_priority_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ namespace db
high_priority_service& operator=(const high_priority_service&);
db::server& server_;
db::client& client_;
uint64_t commit_seqno_;
};

class replayer_service : public db::high_priority_service
Expand Down
6 changes: 6 additions & 0 deletions dbsim/db_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ db::params db::parse_args(int argc, char** argv)
"Configure TLS service stubs.\n0 default disabled\n1 enabled\n"
"2 enabled with short read/write and renegotiation simulation\n"
"3 enabled with error simulation.")
("check-sequential-consistency",
po::value<bool>(&params.check_sequential_consistency),
"Check if the provider provides sequential consistency")
("do-2pc",
po::value<bool>(&params.do_2pc),
"Run commits in 2pc")
;
try
{
Expand Down
57 changes: 21 additions & 36 deletions dbsim/db_params.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,27 @@ namespace db
{
struct params
{
size_t n_servers;
size_t n_clients;
size_t n_transactions;
size_t n_rows;
size_t max_data_size; // Maximum size of write set data payload.
bool random_data_size; // If true, randomize data payload size.
size_t alg_freq;
bool sync_wait;
std::string topology;
std::string wsrep_provider;
std::string wsrep_provider_options;
std::string status_file;
int debug_log_level;
int fast_exit;
int thread_instrumentation;
bool cond_checks;
int tls_service;
params()
: n_servers(0)
, n_clients(0)
, n_transactions(0)
, n_rows(1000)
, max_data_size(8)
, random_data_size(false)
, alg_freq(0)
, sync_wait(false)
, topology()
, wsrep_provider()
, wsrep_provider_options()
, status_file("status.json")
, debug_log_level(0)
, fast_exit(0)
, thread_instrumentation()
, cond_checks()
, tls_service()
{ }
size_t n_servers{0};
size_t n_clients{0};
size_t n_transactions{0};
size_t n_rows{1000};
size_t max_data_size{8}; // Maximum size of write set data payload.
bool random_data_size{false}; // If true, randomize data payload size.
/* Asymmetric lock granularity frequency. */
size_t alg_freq{0};
/* Whether to sync wait before start of transaction. */
bool sync_wait{false};
std::string topology{};
std::string wsrep_provider{};
std::string wsrep_provider_options{};
std::string status_file{"status.json"};
int debug_log_level{0};
int fast_exit{0};
int thread_instrumentation{0};
bool cond_checks{false};
int tls_service{0};
bool check_sequential_consistency{false};
bool do_2pc{false};
};

params parse_args(int argc, char** argv);
Expand Down
16 changes: 16 additions & 0 deletions dbsim/db_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ db::server::server(simulator& simulator,
, appliers_()
, clients_()
, client_threads_()
, commit_mutex_()
, next_commit_seqno_()
, committed_seqno_()
{
wsrep::log::logger_fn(logger_fn);
}
Expand Down Expand Up @@ -165,3 +168,16 @@ void db::server::log_state_change(enum wsrep::server_state::state from,
wsrep::log_info() << "State changed " << from << " -> " << to;
reporter_.report_state(to);
}

void db::server::check_sequential_consistency(wsrep::client_id client_id,
uint64_t commit_seqno)
{
if (committed_seqno_ >= commit_seqno)
{
wsrep::log_error() << "Sequentiality violation for " << client_id
<< " commit seqno " << commit_seqno << " previous "
<< committed_seqno_;
::abort();
}
committed_seqno_ = commit_seqno;
}
25 changes: 25 additions & 0 deletions dbsim/db_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,27 @@ namespace db
wsrep::high_priority_service* streaming_applier_service();
void log_state_change(enum wsrep::server_state::state,
enum wsrep::server_state::state);

/* Sequential consistency checks */
struct commit_critical_section
{
wsrep::unique_lock<wsrep::default_mutex> lock;
uint64_t commit_seqno;
commit_critical_section(wsrep::default_mutex& mutex,
uint64_t& next_commit_seqno)
: lock{ mutex }
, commit_seqno{ ++next_commit_seqno }
{
}
commit_critical_section(commit_critical_section&&) = default;
};
commit_critical_section get_commit_critical_section() {
return { commit_mutex_, next_commit_seqno_ };
}
/* Check that commits remain sequential according commit_seqno.
* This method must be called inside commit order critical section. */
void check_sequential_consistency(wsrep::client_id client_id,
uint64_t commit_seqno);
private:
void start_client(size_t id);

Expand All @@ -76,6 +97,10 @@ namespace db
std::vector<boost::thread> appliers_;
std::vector<std::shared_ptr<db::client>> clients_;
std::vector<boost::thread> client_threads_;

wsrep::default_mutex commit_mutex_;
uint64_t next_commit_seqno_;
uint64_t committed_seqno_;
};
}

Expand Down
42 changes: 40 additions & 2 deletions include/wsrep/client_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,49 @@ namespace wsrep

/** @name Commit ordering interface */
/** @{ */
int before_prepare();

/**
* This method should be called before the transaction
* is prepared. This call certifies the transaction and
* assigns write set meta data.
*
* @param seq_cb Callback which is passed to underlying
* certify() call. See wsrep::provider::certify().
*
* @return Zero on success, non-zero on failure.
*/
int before_prepare(const wsrep::provider::seq_cb_t* seq_cb);

/** Same as before_prepare() above, but nullptr is passed
* to seq_cb. */
int before_prepare()
{
return before_prepare(nullptr);
}

int after_prepare();

int before_commit();
/**
* This method should be called before transaction is committed.
* This call makes the transaction to enter commit time
* critical section. The critical section is left by calling
* ordered_commit().
*
* If before_prepare() is not called before this call, the
* before_prepare() is called internally.
*
* @param seq_cb Callback which is passed to underlying
* before_prepare() call.
*
* @return Zero on success, non-zero on failure.
*/
int before_commit(const wsrep::provider::seq_cb_t* seq_cb);

/** Same as before_commit(), but nullptr is passed to seq_cb. */
int before_commit()
{
return before_commit(nullptr);
}

int ordered_commit();

Expand Down
33 changes: 30 additions & 3 deletions include/wsrep/provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,37 @@ namespace wsrep
virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0;
virtual enum status append_data(
wsrep::ws_handle&, const wsrep::const_buffer&) = 0;

/**
* Callback for application defined sequential consistency.
* The provider will call
* the callback once it can guarantee sequential consistency. */
typedef struct seq_cb {
/** Opaque caller context */
void *ctx;
/** Function to be called by the provider when sequential
* consistency is guaranteed. */
void (*fn)(void *ctx);
} seq_cb_t;

/**
* Certify the write set.
*
* @param client_id[in] Id of the client session.
* @param ws_handle[in,out] Write set handle associated to the current
* transaction.
* @param flags[in] Flags associated to the write set (see struct flag).
* @param ws_meta[out] Write set meta data associated to the
* replicated write set.
* @param seq_cb[in] Optional callback for application defined
* sequential consistency.
*
* @return Status code defined in struct status.
*/
virtual enum status
certify(wsrep::client_id, wsrep::ws_handle&,
int,
wsrep::ws_meta&) = 0;
certify(wsrep::client_id client_id, wsrep::ws_handle& ws_handle,
int flags, wsrep::ws_meta& ws_meta, const seq_cb_t* seq_cb)
= 0;
/**
* BF abort a transaction inside provider.
*
Expand Down
8 changes: 5 additions & 3 deletions include/wsrep/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,12 @@ namespace wsrep

int after_row();

int before_prepare(wsrep::unique_lock<wsrep::mutex>&);
int before_prepare(wsrep::unique_lock<wsrep::mutex>&,
const wsrep::provider::seq_cb_t*);

int after_prepare(wsrep::unique_lock<wsrep::mutex>&);

int before_commit();
int before_commit(const wsrep::provider::seq_cb_t*);

int ordered_commit();

Expand Down Expand Up @@ -248,7 +249,8 @@ namespace wsrep
bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&);
int streaming_step(wsrep::unique_lock<wsrep::mutex>&, bool force = false);
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&,
const wsrep::provider::seq_cb_t*);
int append_sr_keys_for_commit();
int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
void remove_fragments_in_storage_service_scope(
Expand Down
Loading
Loading