Skip to content

Commit

Permalink
Check local sequential consistency in dbsim
Browse files Browse the repository at this point in the history
- Release commit time critical section in callback
- Check the consistency inside commit order critical section

Other: Add 2pc switch to dbsim
  • Loading branch information
temeo committed Nov 28, 2024
1 parent 3de594b commit b063820
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 43 deletions.
39 changes: 34 additions & 5 deletions dbsim/db_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ int db::client::client_command(F f)
return err;
}

static void release_commit_critical_section(void* ptr)
{
auto* crit = static_cast<db::server::commit_critical_section*>(ptr);
if (crit->lock.owns_lock())
{
crit->lock.unlock();
}
}

void db::client::run_one_transaction()
{
if (params_.sync_wait)
Expand Down Expand Up @@ -145,15 +154,35 @@ void db::client::run_one_transaction()
err = err || client_command(
[&]()
{
// wsrep::log_debug() << "Commit";
auto commit_crit = server_.get_commit_critical_section();
if (not params_.check_sequential_consistency) {
commit_crit.lock.unlock();
}

client_state_.append_data({&commit_crit.commit_seqno,
sizeof(commit_crit.commit_seqno)});

wsrep::provider::seq_cb seq_cb {
&commit_crit,
release_commit_critical_section
};

assert(err == 0);
if (do_2pc())
if (params_.do_2pc)
{
err = err || client_state_.before_prepare();
err = err || client_state_.before_prepare(&seq_cb);
err = err || client_state_.after_prepare();
}
err = err || client_state_.before_commit();
if (err == 0) se_trx_.commit(transaction.ws_meta().gtid());
err = err || client_state_.before_commit(&seq_cb);
if (err == 0)
{
se_trx_.commit(transaction.ws_meta().gtid());
if (params_.check_sequential_consistency)
{
server_.check_sequential_consistency(
client_state_.id(), commit_crit.commit_seqno);
}
}
err = err || client_state_.ordered_commit();
err = err || client_state_.after_commit();
if (err)
Expand Down
1 change: 0 additions & 1 deletion dbsim/db_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ namespace db
void start();
wsrep::client_state& client_state() { return client_state_; }
wsrep::client_service& client_service() { return client_service_; }
bool do_2pc() const { return false; }
private:
friend class db::server_state;
friend class db::client_service;
Expand Down
13 changes: 12 additions & 1 deletion dbsim/db_high_priority_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,14 @@ int db::high_priority_service::adopt_transaction(const wsrep::transaction&)

int db::high_priority_service::apply_write_set(
const wsrep::ws_meta&,
const wsrep::const_buffer&,
const wsrep::const_buffer& buf,
wsrep::mutable_buffer&)
{
client_.se_trx_.start(&client_);
client_.se_trx_.apply(client_.client_state().transaction());
assert(buf.size() > sizeof(uint64_t));
::memcpy(&commit_seqno_, buf.data() + buf.size() - sizeof(uint64_t),
sizeof(uint64_t));
return 0;
}

Expand All @@ -82,6 +85,14 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true);
int ret(client_.client_state_.before_commit());
if (ret == 0) client_.se_trx_.commit(ws_meta.gtid());

/* Local client session replaying. */
if (ws_meta.server_id() == server_.server_state().id()
&& client_.params_.check_sequential_consistency)
{
server_.check_sequential_consistency(ws_meta.client_id(),
commit_seqno_);
}
ret = ret || client_.client_state_.ordered_commit();
ret = ret || client_.client_state_.after_commit();
return ret;
Expand Down
1 change: 1 addition & 0 deletions dbsim/db_high_priority_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ namespace db
high_priority_service& operator=(const high_priority_service&);
db::server& server_;
db::client& client_;
uint64_t commit_seqno_;
};

class replayer_service : public db::high_priority_service
Expand Down
6 changes: 6 additions & 0 deletions dbsim/db_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ db::params db::parse_args(int argc, char** argv)
"Configure TLS service stubs.\n0 default disabled\n1 enabled\n"
"2 enabled with short read/write and renegotiation simulation\n"
"3 enabled with error simulation.")
("check-sequential-consistency",
po::value<bool>(&params.check_sequential_consistency),
"Check if the provider provides sequential consistency")
("do-2pc",
po::value<bool>(&params.do_2pc),
"Run commits in 2pc")
;
try
{
Expand Down
57 changes: 21 additions & 36 deletions dbsim/db_params.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,27 @@ namespace db
{
struct params
{
size_t n_servers;
size_t n_clients;
size_t n_transactions;
size_t n_rows;
size_t max_data_size; // Maximum size of write set data payload.
bool random_data_size; // If true, randomize data payload size.
size_t alg_freq;
bool sync_wait;
std::string topology;
std::string wsrep_provider;
std::string wsrep_provider_options;
std::string status_file;
int debug_log_level;
int fast_exit;
int thread_instrumentation;
bool cond_checks;
int tls_service;
params()
: n_servers(0)
, n_clients(0)
, n_transactions(0)
, n_rows(1000)
, max_data_size(8)
, random_data_size(false)
, alg_freq(0)
, sync_wait(false)
, topology()
, wsrep_provider()
, wsrep_provider_options()
, status_file("status.json")
, debug_log_level(0)
, fast_exit(0)
, thread_instrumentation()
, cond_checks()
, tls_service()
{ }
size_t n_servers{0};
size_t n_clients{0};
size_t n_transactions{0};
size_t n_rows{1000};
size_t max_data_size{8}; // Maximum size of write set data payload.
bool random_data_size{false}; // If true, randomize data payload size.
/* Asymmetric lock granularity frequency. */
size_t alg_freq{0};
/* Whether to sync wait before start of transaction. */
bool sync_wait{false};
std::string topology{};
std::string wsrep_provider{};
std::string wsrep_provider_options{};
std::string status_file{"status.json"};
int debug_log_level{0};
int fast_exit{0};
int thread_instrumentation{0};
bool cond_checks{false};
int tls_service{0};
bool check_sequential_consistency{false};
bool do_2pc{false};
};

params parse_args(int argc, char** argv);
Expand Down
16 changes: 16 additions & 0 deletions dbsim/db_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ db::server::server(simulator& simulator,
, appliers_()
, clients_()
, client_threads_()
, commit_mutex_()
, next_commit_seqno_()
, committed_seqno_()
{
wsrep::log::logger_fn(logger_fn);
}
Expand Down Expand Up @@ -165,3 +168,16 @@ void db::server::log_state_change(enum wsrep::server_state::state from,
wsrep::log_info() << "State changed " << from << " -> " << to;
reporter_.report_state(to);
}

void db::server::check_sequential_consistency(wsrep::client_id client_id,
uint64_t commit_seqno)
{
if (committed_seqno_ >= commit_seqno)
{
wsrep::log_error() << "Sequentiality violation for " << client_id
<< " commit seqno " << commit_seqno << " previous "
<< committed_seqno_;
::abort();
}
committed_seqno_ = commit_seqno;
}
25 changes: 25 additions & 0 deletions dbsim/db_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,27 @@ namespace db
wsrep::high_priority_service* streaming_applier_service();
void log_state_change(enum wsrep::server_state::state,
enum wsrep::server_state::state);

/* Sequential consistency checks */
struct commit_critical_section
{
wsrep::unique_lock<wsrep::default_mutex> lock;
uint64_t commit_seqno;
commit_critical_section(wsrep::default_mutex& mutex,
uint64_t& next_commit_seqno)
: lock{ mutex }
, commit_seqno{ ++next_commit_seqno }
{
}
commit_critical_section(commit_critical_section&&) = default;
};
commit_critical_section get_commit_critical_section() {
return { commit_mutex_, next_commit_seqno_ };
}
/* Check that commits remain sequential according commit_seqno.
* This method must be called inside commit order critical section. */
void check_sequential_consistency(wsrep::client_id client_id,
uint64_t commit_seqno);
private:
void start_client(size_t id);

Expand All @@ -76,6 +97,10 @@ namespace db
std::vector<boost::thread> appliers_;
std::vector<std::shared_ptr<db::client>> clients_;
std::vector<boost::thread> client_threads_;

wsrep::default_mutex commit_mutex_;
uint64_t next_commit_seqno_;
uint64_t committed_seqno_;
};
}

Expand Down

0 comments on commit b063820

Please sign in to comment.