diff --git a/dbsim/db_client.cpp b/dbsim/db_client.cpp index b5d56d37..97be6d8e 100644 --- a/dbsim/db_client.cpp +++ b/dbsim/db_client.cpp @@ -91,6 +91,15 @@ int db::client::client_command(F f) return err; } +static void release_commit_critical_section(void* ptr) +{ + auto* crit = static_cast(ptr); + if (crit->lock.owns_lock()) + { + crit->lock.unlock(); + } +} + void db::client::run_one_transaction() { if (params_.sync_wait) @@ -145,15 +154,35 @@ void db::client::run_one_transaction() err = err || client_command( [&]() { - // wsrep::log_debug() << "Commit"; + auto commit_crit = server_.get_commit_critical_section(); + if (not params_.check_sequential_consistency) { + commit_crit.lock.unlock(); + } + + client_state_.append_data({&commit_crit.commit_seqno, + sizeof(commit_crit.commit_seqno)}); + + wsrep::provider::seq_cb seq_cb { + &commit_crit, + release_commit_critical_section + }; + assert(err == 0); - if (do_2pc()) + if (params_.do_2pc) { - err = err || client_state_.before_prepare(); + err = err || client_state_.before_prepare(&seq_cb); err = err || client_state_.after_prepare(); } - err = err || client_state_.before_commit(); - if (err == 0) se_trx_.commit(transaction.ws_meta().gtid()); + err = err || client_state_.before_commit(&seq_cb); + if (err == 0) + { + se_trx_.commit(transaction.ws_meta().gtid()); + if (params_.check_sequential_consistency) + { + server_.check_sequential_consistency( + client_state_.id(), commit_crit.commit_seqno); + } + } err = err || client_state_.ordered_commit(); err = err || client_state_.after_commit(); if (err) diff --git a/dbsim/db_client.hpp b/dbsim/db_client.hpp index 5536a449..ca93a187 100644 --- a/dbsim/db_client.hpp +++ b/dbsim/db_client.hpp @@ -62,7 +62,6 @@ namespace db void start(); wsrep::client_state& client_state() { return client_state_; } wsrep::client_service& client_service() { return client_service_; } - bool do_2pc() const { return false; } private: friend class db::server_state; friend class db::client_service; diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 669fe502..d56a0bde 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -52,11 +52,14 @@ int db::high_priority_service::adopt_transaction(const wsrep::transaction&) int db::high_priority_service::apply_write_set( const wsrep::ws_meta&, - const wsrep::const_buffer&, + const wsrep::const_buffer& buf, wsrep::mutable_buffer&) { client_.se_trx_.start(&client_); client_.se_trx_.apply(client_.client_state().transaction()); + assert(buf.size() > sizeof(uint64_t)); + ::memcpy(&commit_seqno_, buf.data() + buf.size() - sizeof(uint64_t), + sizeof(uint64_t)); return 0; } @@ -82,6 +85,14 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle, client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true); int ret(client_.client_state_.before_commit()); if (ret == 0) client_.se_trx_.commit(ws_meta.gtid()); + + /* Local client session replaying. */ + if (ws_meta.server_id() == server_.server_state().id() + && client_.params_.check_sequential_consistency) + { + server_.check_sequential_consistency(ws_meta.client_id(), + commit_seqno_); + } ret = ret || client_.client_state_.ordered_commit(); ret = ret || client_.client_state_.after_commit(); return ret; diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index d4a80f1b..f5309c09 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -69,6 +69,7 @@ namespace db high_priority_service& operator=(const high_priority_service&); db::server& server_; db::client& client_; + uint64_t commit_seqno_; }; class replayer_service : public db::high_priority_service diff --git a/dbsim/db_params.cpp b/dbsim/db_params.cpp index 40433f6c..b7a036bd 100644 --- a/dbsim/db_params.cpp +++ b/dbsim/db_params.cpp @@ -95,6 +95,12 @@ db::params db::parse_args(int argc, char** argv) "Configure TLS service stubs.\n0 default disabled\n1 enabled\n" "2 enabled with short read/write and renegotiation simulation\n" "3 enabled with error simulation.") + ("check-sequential-consistency", + po::value(¶ms.check_sequential_consistency), + "Check if the provider provides sequential consistency") + ("do-2pc", + po::value(¶ms.do_2pc), + "Run commits in 2pc") ; try { diff --git a/dbsim/db_params.hpp b/dbsim/db_params.hpp index e5df8062..6e7a4188 100644 --- a/dbsim/db_params.hpp +++ b/dbsim/db_params.hpp @@ -27,42 +27,27 @@ namespace db { struct params { - size_t n_servers; - size_t n_clients; - size_t n_transactions; - size_t n_rows; - size_t max_data_size; // Maximum size of write set data payload. - bool random_data_size; // If true, randomize data payload size. - size_t alg_freq; - bool sync_wait; - std::string topology; - std::string wsrep_provider; - std::string wsrep_provider_options; - std::string status_file; - int debug_log_level; - int fast_exit; - int thread_instrumentation; - bool cond_checks; - int tls_service; - params() - : n_servers(0) - , n_clients(0) - , n_transactions(0) - , n_rows(1000) - , max_data_size(8) - , random_data_size(false) - , alg_freq(0) - , sync_wait(false) - , topology() - , wsrep_provider() - , wsrep_provider_options() - , status_file("status.json") - , debug_log_level(0) - , fast_exit(0) - , thread_instrumentation() - , cond_checks() - , tls_service() - { } + size_t n_servers{0}; + size_t n_clients{0}; + size_t n_transactions{0}; + size_t n_rows{1000}; + size_t max_data_size{8}; // Maximum size of write set data payload. + bool random_data_size{false}; // If true, randomize data payload size. + /* Asymmetric lock granularity frequency. */ + size_t alg_freq{0}; + /* Whether to sync wait before start of transaction. */ + bool sync_wait{false}; + std::string topology{}; + std::string wsrep_provider{}; + std::string wsrep_provider_options{}; + std::string status_file{"status.json"}; + int debug_log_level{0}; + int fast_exit{0}; + int thread_instrumentation{0}; + bool cond_checks{false}; + int tls_service{0}; + bool check_sequential_consistency{false}; + bool do_2pc{false}; }; params parse_args(int argc, char** argv); diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index a54610d1..b9fd0ef7 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -68,6 +68,9 @@ db::server::server(simulator& simulator, , appliers_() , clients_() , client_threads_() + , commit_mutex_() + , next_commit_seqno_() + , committed_seqno_() { wsrep::log::logger_fn(logger_fn); } @@ -165,3 +168,16 @@ void db::server::log_state_change(enum wsrep::server_state::state from, wsrep::log_info() << "State changed " << from << " -> " << to; reporter_.report_state(to); } + +void db::server::check_sequential_consistency(wsrep::client_id client_id, + uint64_t commit_seqno) +{ + if (committed_seqno_ >= commit_seqno) + { + wsrep::log_error() << "Sequentiality violation for " << client_id + << " commit seqno " << commit_seqno << " previous " + << committed_seqno_; + ::abort(); + } + committed_seqno_ = commit_seqno; +} diff --git a/dbsim/db_server.hpp b/dbsim/db_server.hpp index 98b9a837..6aff7856 100644 --- a/dbsim/db_server.hpp +++ b/dbsim/db_server.hpp @@ -61,6 +61,27 @@ namespace db wsrep::high_priority_service* streaming_applier_service(); void log_state_change(enum wsrep::server_state::state, enum wsrep::server_state::state); + + /* Sequential consistency checks */ + struct commit_critical_section + { + wsrep::unique_lock lock; + uint64_t commit_seqno; + commit_critical_section(wsrep::default_mutex& mutex, + uint64_t& next_commit_seqno) + : lock{ mutex } + , commit_seqno{ ++next_commit_seqno } + { + } + commit_critical_section(commit_critical_section&&) = default; + }; + commit_critical_section get_commit_critical_section() { + return { commit_mutex_, next_commit_seqno_ }; + } + /* Check that commits remain sequential according commit_seqno. + * This method must be called inside commit order critical section. */ + void check_sequential_consistency(wsrep::client_id client_id, + uint64_t commit_seqno); private: void start_client(size_t id); @@ -76,6 +97,10 @@ namespace db std::vector appliers_; std::vector> clients_; std::vector client_threads_; + + wsrep::default_mutex commit_mutex_; + uint64_t next_commit_seqno_; + uint64_t committed_seqno_; }; }