diff --git a/galera/src/certification.cpp b/galera/src/certification.cpp index a087e2df7..dfd200a93 100644 --- a/galera/src/certification.cpp +++ b/galera/src/certification.cpp @@ -808,7 +808,7 @@ galera::Certification::do_test_preordered(TrxHandle* trx) } -galera::Certification::Certification(gu::Config& conf, ServiceThd& thd) +galera::Certification::Certification(gu::Config& conf, ServiceThd& thd, gcache::GCache& gcache) : version_ (-1), trx_map_ (), @@ -816,6 +816,7 @@ galera::Certification::Certification(gu::Config& conf, ServiceThd& thd) cert_index_ng_ (), deps_set_ (), service_thd_ (thd), + gcache_ (gcache), mutex_ (), trx_size_warn_count_ (0), initial_position_ (-1), @@ -1066,7 +1067,7 @@ wsrep_seqno_t galera::Certification::set_trx_committed(TrxHandle* trx) deps_set_.erase(i); } - if (gu_unlikely(index_purge_required())) + if (gu_unlikely(gcache_.cleanup_required() || index_purge_required())) { ret = get_safe_to_discard_seqno_(); } diff --git a/galera/src/certification.hpp b/galera/src/certification.hpp index cac4d4b14..d42b2e417 100644 --- a/galera/src/certification.hpp +++ b/galera/src/certification.hpp @@ -48,7 +48,7 @@ namespace galera TEST_FAILED } TestResult; - Certification(gu::Config& conf, ServiceThd& thd); + Certification(gu::Config& conf, ServiceThd& thd, gcache::GCache& gcache); ~Certification(); void assign_initial_position(wsrep_seqno_t seqno, int versiono); @@ -183,6 +183,7 @@ namespace galera CertIndexNG cert_index_ng_; DepsSet deps_set_; ServiceThd& service_thd_; + gcache::GCache& gcache_; gu::Mutex mutex_; size_t trx_size_warn_count_; wsrep_seqno_t initial_position_; diff --git a/galera/src/galera_gcs.hpp b/galera/src/galera_gcs.hpp index c3cb271a4..4f3eb367f 100644 --- a/galera/src/galera_gcs.hpp +++ b/galera/src/galera_gcs.hpp @@ -43,7 +43,8 @@ namespace galera virtual ssize_t replv(const WriteSetVector&, gcs_action& act, bool) = 0; virtual ssize_t repl (gcs_action& act, bool) = 0; - virtual gcs_seqno_t caused() = 0; + virtual void caused(gcs_seqno_t& seqno, + gu::datetime::Date& wait_until) = 0; virtual ssize_t schedule() = 0; virtual ssize_t interrupt(ssize_t) = 0; virtual ssize_t resume_recv() = 0; @@ -141,7 +142,23 @@ namespace galera return gcs_repl(conn_, &act, scheduled); } - gcs_seqno_t caused() { return gcs_caused(conn_); } + void caused(gcs_seqno_t& seqno, gu::datetime::Date& wait_until) + { + long err; + + while ((err = gcs_caused(conn_, seqno)) == -EAGAIN && + gu::datetime::Date::calendar() < wait_until) + { + usleep(1000); + } + + if (err == -EAGAIN) err = -ETIMEDOUT; + + if (err < 0) + { + gu_throw_error(-err); + } + } ssize_t schedule() { return gcs_schedule(conn_); } @@ -233,6 +250,11 @@ namespace galera size_t max_action_size() const { return GCS_MAX_ACT_SIZE; } + void join_notification() + { + gcs_join_notification(conn_); + } + private: Gcs(const Gcs&); @@ -311,7 +333,10 @@ namespace galera return ret; } - gcs_seqno_t caused() { return global_seqno_; } + void caused(gcs_seqno_t& seqno, gu::datetime::Date& wait_until) + { + seqno = global_seqno_; + } ssize_t schedule() { diff --git a/galera/src/galera_service_thd.cpp b/galera/src/galera_service_thd.cpp index cb9e5e877..8c04770ef 100644 --- a/galera/src/galera_service_thd.cpp +++ b/galera/src/galera_service_thd.cpp @@ -52,21 +52,43 @@ galera::ServiceThd::thd_func (void* arg) { if (data.act_ & A_LAST_COMMITTED) { - ssize_t const ret - (st->gcs_.set_last_applied(data.last_committed_)); + static const size_t max_set_attempts(4); + size_t attempts = 0; + ssize_t ret; - if (gu_unlikely(ret < 0)) + do { - log_warn << "Failed to report last committed " - << data.last_committed_ << ", " << ret - << " (" << strerror (-ret) << ')'; - // @todo: figure out what to do in this case + ret = st->gcs_.set_last_applied(data.last_committed_); + + if (gu_likely(ret != -EINTR)) + { + break; + } + + attempts++; + + // gcs_set_last_applied() may return EINTR if the send + // monitor was interruped, this is not an error and + // in this case there is no need to display a warning: + log_debug << "Reporting of last committed was " + "interrupted: " + << data.last_committed_ + << "\nRetrying " << attempts << "th time"; } - else + while (attempts != max_set_attempts); + + if (gu_likely(ret >= 0)) { log_debug << "Reported last committed: " << data.last_committed_; } + else + { + log_warn << "Failed to report last committed " + << data.last_committed_ << ", " << ret + << " (" << strerror (-ret) << ')'; + // @todo: figure out what to do in this case + } } if (data.act_ & A_RELEASE_SEQNO) diff --git a/galera/src/replicator_smm.cpp b/galera/src/replicator_smm.cpp index 041b2ba3b..07b6afc68 100644 --- a/galera/src/replicator_smm.cpp +++ b/galera/src/replicator_smm.cpp @@ -164,7 +164,7 @@ galera::ReplicatorSMM::ReplicatorSMM(const struct wsrep_init_args* args) ist_receiver_ (config_, slave_pool_, args->node_address), ist_senders_ (gcs_, gcache_), wsdb_ (), - cert_ (config_, service_thd_), + cert_ (config_, service_thd_, gcache_), local_monitor_ (), apply_monitor_ (), commit_monitor_ (), @@ -859,8 +859,8 @@ wsrep_status_t galera::ReplicatorSMM::replay_trx(TrxHandle* trx, void* trx_ctx) ApplyOrder ao(*trx); gu_trace(apply_monitor_.enter(ao)); trx->set_state(TrxHandle::S_MUST_REPLAY_CM); - // fall through } + // fall through case TrxHandle::S_MUST_REPLAY_CM: if (co_mode_ != CommitOrder::BYPASS) { @@ -895,6 +895,13 @@ wsrep_status_t galera::ReplicatorSMM::replay_trx(TrxHandle* trx, void* trx_ctx) catch (gu::Exception& e) { st_.mark_corrupt(); + + /* Before doing a graceful exit ensure that node isolate itself + from the cluster. This will cause the quorum to re-evaluate + and if minority nodes are left with different set of data + they can turn non-Primary to avoid further data consistency issue. */ + param_set("gmcast.isolate", "1"); + throw; } @@ -967,12 +974,19 @@ wsrep_status_t galera::ReplicatorSMM::post_rollback(TrxHandle* trx) wsrep_status_t galera::ReplicatorSMM::causal_read(wsrep_gtid_t* gtid) { - wsrep_seqno_t cseq(static_cast(gcs_.caused())); + wsrep_seqno_t cseq; + gu::datetime::Date wait_until(gu::datetime::Date::calendar() + + causal_read_timeout_); - if (cseq < 0) + try { - log_warn << "gcs_caused() returned " << cseq << " (" << strerror(-cseq) - << ')'; + gcs_.caused(cseq, wait_until); + assert(cseq >= 0); + } + catch (gu::Exception& e) + { + log_warn << "gcs_caused() returned " << -e.get_errno() + << " (" << strerror(e.get_errno()) << ")"; return WSREP_TRX_FAIL; } @@ -985,8 +999,6 @@ wsrep_status_t galera::ReplicatorSMM::causal_read(wsrep_gtid_t* gtid) // at monitor drain and disallowing further waits until // configuration change related operations (SST etc) have been // finished. - gu::datetime::Date wait_until(gu::datetime::Date::calendar() - + causal_read_timeout_); if (gu_likely(co_mode_ != CommitOrder::BYPASS)) { commit_monitor_.wait(cseq, wait_until); @@ -1198,6 +1210,13 @@ galera::ReplicatorSMM::sst_sent(const wsrep_gtid_t& state_id, int const rcode) if (state_() != S_DONOR) { log_error << "sst sent called when not SST donor, state " << state_(); + /* If sst_sent() fails node should restore itself back to the joined + state. The sst_sent function can fail. commonly due to network errors, + where DONOR may lose connectivity to JOINER (or existing cluster). + But on re-join it should restore the original state without waiting + for transition to JOINER state (DONOR->JOINER->JOINED->SYNCED). + SST failure on JOINER will gracefully shutdown the joiner.*/ + gcs_.join_notification(); return WSREP_CONN_FAIL; } @@ -1247,7 +1266,14 @@ void galera::ReplicatorSMM::process_trx(void* recv_ctx, TrxHandle* trx) log_fatal << "Failed to apply trx: " << *trx; log_fatal << e.what(); - log_fatal << "Node consistency compromized, aborting..."; + log_fatal << "Node consistency compromised, aborting..."; + + /* Before doing a graceful exit ensure that node isolate itself + from the cluster. This will cause the quorum to re-evaluate + and if minority nodes are left with different set of data + they can turn non-Primary to avoid further data consistency issue. */ + param_set("gmcast.isolate", "1"); + abort(); } break; @@ -1412,6 +1438,12 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx, size_t app_req_len(0); const_cast(view_info).state_gap = st_required; + + // We need to set the protocol version BEFORE the view callback, so that + // any version-dependent code is run using the correct version instead of -1. + if (view_info.view >= 0) // Primary configuration + establish_protocol_versions (repl_proto); + wsrep_cb_status_t const rcode( view_cb_(app_ctx_, recv_ctx, &view_info, 0, 0, &app_req, &app_req_len)); @@ -1420,6 +1452,7 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx, assert(app_req_len <= 0); log_fatal << "View callback failed. This is unrecoverable, " << "restart required."; + local_monitor_.leave(lo); close(); abort(); } @@ -1428,14 +1461,13 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx, log_fatal << "Local state UUID " << state_uuid_ << " is different from group state UUID " << group_uuid << ", and SST request is null: restart required."; + local_monitor_.leave(lo); close(); abort(); } if (view_info.view >= 0) // Primary configuration { - establish_protocol_versions (repl_proto); - // we have to reset cert initial position here, SST does not contain // cert index yet (see #197). // Also this must be done before releasing GCache buffers. @@ -1528,6 +1560,7 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx, { log_fatal << "Internal error: unexpected next state for " << "non-prim: " << next_state << ". Restart required."; + local_monitor_.leave(lo); close(); abort(); } @@ -1870,6 +1903,6 @@ galera::ReplicatorSMM::update_state_uuid (const wsrep_uuid_t& uuid) void galera::ReplicatorSMM::abort() { - gcs_.close(); + close(); gu_abort(); } diff --git a/galera/src/replicator_str.cpp b/galera/src/replicator_str.cpp index cb8272269..568f79710 100644 --- a/galera/src/replicator_str.cpp +++ b/galera/src/replicator_str.cpp @@ -536,7 +536,9 @@ ReplicatorSMM::prepare_state_request (const void* const sst_req, } catch (gu::Exception& e) { - log_warn + log_info << "State gap can't be serviced using IST." + " Switching to SST"; + log_info << "Failed to prepare for incremental state transfer: " << e.what() << ". IST will be unavailable."; } diff --git a/galera/tests/write_set_check.cpp b/galera/tests/write_set_check.cpp index b85708012..9943fc2e5 100644 --- a/galera/tests/write_set_check.cpp +++ b/galera/tests/write_set_check.cpp @@ -51,6 +51,7 @@ namespace gu::Config& conf() { return conf_; } galera::ServiceThd& thd() { return thd_; } + gcache::GCache& gcache() { return gcache_; } private: @@ -408,7 +409,7 @@ START_TEST(test_cert_hierarchical_v1) size_t nws(sizeof(wsi)/sizeof(wsi[0])); TestEnv env; - galera::Certification cert(env.conf(), env.thd()); + galera::Certification cert(env.conf(), env.thd(), env.gcache()); int const version(1); cert.assign_initial_position(0, version); galera::TrxHandle::Params const trx_params("", version,KeySet::MAX_VERSION); @@ -530,7 +531,7 @@ START_TEST(test_cert_hierarchical_v2) size_t nws(sizeof(wsi)/sizeof(wsi[0])); TestEnv env; - galera::Certification cert(env.conf(), env.thd()); + galera::Certification cert(env.conf(), env.thd(), env.gcache()); cert.assign_initial_position(0, version); galera::TrxHandle::Params const trx_params("", version,KeySet::MAX_VERSION); @@ -580,7 +581,8 @@ START_TEST(test_trac_726) const int version(2); TestEnv env; - galera::Certification cert(env.conf(), env.thd()); + galera::Certification cert(env.conf(), env.thd(), env.gcache()); + galera::TrxHandle::Params const trx_params("", version,KeySet::MAX_VERSION); wsrep_uuid_t uuid1 = {{1, }}; wsrep_uuid_t uuid2 = {{2, }}; diff --git a/gcache/src/GCache.hpp b/gcache/src/GCache.hpp index d1f61e1bb..ef74127de 100644 --- a/gcache/src/GCache.hpp +++ b/gcache/src/GCache.hpp @@ -99,6 +99,14 @@ namespace gcache int64_t& seqno_d, ssize_t& size); + /*! + * Implements the cleanup policy test. + */ + bool cleanup_required() + { + return (params.keep_pages_size() && ps.total_size() > params.keep_pages_size()); + } + class Buffer { public: diff --git a/gcs/src/SConscript b/gcs/src/SConscript index 6566e5ab6..9d91b61b3 100644 --- a/gcs/src/SConscript +++ b/gcs/src/SConscript @@ -7,6 +7,7 @@ libgcs_env = env.Clone() # Include paths libgcs_env.Append(CPPPATH = Split(''' + #/common #/galerautils/src #/gcomm/src #/gcache/src @@ -15,18 +16,18 @@ libgcs_env.Append(CPPPATH = Split(''' # Backends (TODO: Get from global options) libgcs_env.Append(CPPFLAGS = ' -DGCS_USE_GCOMM') # For C-style logging -libgcs_env.Append(CPPFLAGS = ' -DGALERA_LOG_H_ENABLE_CXX -Wno-variadic-macros') +libgcs_env.Append(CPPFLAGS = ' -DGALERA_LOG_H_ENABLE_CXX') # Disable old style cast warns until code is fixed -libgcs_env.Append(CPPFLAGS = ' -Wno-old-style-cast') +libgcs_env.Replace(CXXFLAGS = libgcs_env['CXXFLAGS'].replace('-Wold-style-cast', '')) +libgcs_env.Replace(CXXFLAGS = libgcs_env['CXXFLAGS'].replace('-Weffc++', '')) # Allow zero sized arrays libgcs_env.Replace(CCFLAGS = libgcs_env['CCFLAGS'].replace('-pedantic', '')) -libgcs_env.Append(CPPFLAGS = ' -Wno-missing-field-initializers') -libgcs_env.Append(CPPFLAGS = ' -Wno-effc++') +libgcs_env.Append(CCFLAGS = ' -Wno-missing-field-initializers') +libgcs_env.Append(CCFLAGS = ' -Wno-variadic-macros') -print libgcs_env['CFLAGS'] -print libgcs_env['CCFLAGS'] -print libgcs_env['CPPFLAGS'] -print libgcs_env['CXXFLAGS'] +print('gcs flags:') +for f in ['CFLAGS', 'CXXFLAGS', 'CCFLAGS', 'CPPFLAGS']: + print(f + ': ' + libgcs_env[f].strip()) gcs4garb_env = libgcs_env.Clone() diff --git a/gcs/src/gcs.cpp b/gcs/src/gcs.cpp index f038bcb50..62ef9e9eb 100644 --- a/gcs/src/gcs.cpp +++ b/gcs/src/gcs.cpp @@ -175,6 +175,10 @@ struct gcs_conn /* #603, #606 join control */ bool volatile need_to_join; gcs_seqno_t volatile join_seqno; + void join_notification() + { + need_to_join = true; + } /* sync control */ bool sync_sent_; @@ -880,17 +884,14 @@ _join (gcs_conn_t* conn, gcs_seqno_t seqno) while (-EAGAIN == (err = gcs_core_send_join (conn->core, seqno))) usleep (10000); - switch (err) + if (gu_unlikely(err < 0)) { - case -ENOTCONN: gu_warn ("Sending JOIN failed: %d (%s). " "Will retry in new primary component.", err, strerror(-err)); - case 0: - return 0; - default: - gu_error ("Sending JOIN failed: %d (%s).", err, strerror(-err)); return err; } + + return 0; } /*! Handles configuration action */ @@ -1035,8 +1036,8 @@ gcs_handle_act_state_req (gcs_conn_t* conn, /*! Allocates buffer with malloc to pass to the upper layer. */ static long -gcs_handle_state_change (gcs_conn_t* conn, - const struct gcs_act* act) +gcs_handle_state_change (gcs_conn_t* conn, + struct gcs_act* act) { gu_debug ("Got '%s' dated %lld", gcs_act_type_to_str (act->type), gcs_seqno_gtoh(*(gcs_seqno_t*)act->buf)); @@ -1378,14 +1379,11 @@ static void *gcs_recv_thread (void *arg) } else if (conn->my_idx == rcvd.sender_idx) { - gu_fatal("Protocol violation: unordered local action not in repl_q:" - " { {%p, %zd, %s}, %ld, %lld }.", + gu_debug("Discarding: unordered local action not in repl_q: " + "{ {%p, %zd, %s}, %ld, %lld }.", rcvd.act.buf, rcvd.act.buf_len, gcs_act_type_to_str(rcvd.act.type), rcvd.sender_idx, rcvd.id); - assert(0); - ret = -ENOTRECOVERABLE; - break; } else { @@ -1405,9 +1403,12 @@ static void *gcs_recv_thread (void *arg) } else if (ret < 0) { + /* We must set connection state to 'closed' to avoid the race + condition between gcs_recv_thread() and gcs_recv(), which + could lead to assertion in gcs_recv: */ + gcs_shift_state (conn, GCS_CONN_CLOSED); /* In case of error call _close() to release repl_q waiters. */ (void)_close(conn, false); - gcs_shift_state (conn, GCS_CONN_CLOSED); } gu_info ("RECV thread exiting %d: %s", ret, strerror(-ret)); return NULL; @@ -1597,9 +1598,9 @@ long gcs_interrupt (gcs_conn_t* conn, long handle) return gcs_sm_interrupt (conn->sm, handle); } -gcs_seqno_t gcs_caused(gcs_conn_t* conn) +long gcs_caused(gcs_conn_t* conn, gcs_seqno_t& seqno) { - return gcs_core_caused(conn->core); + return gcs_core_caused(conn->core, seqno); } /* Puts action in the send queue and returns after it is replicated */ @@ -1998,10 +1999,29 @@ gcs_set_last_applied (gcs_conn_t* conn, gcs_seqno_t seqno) long gcs_join (gcs_conn_t* conn, gcs_seqno_t seqno) { - conn->join_seqno = seqno; - conn->need_to_join = true; + // Even when node is evicted from the cluster in middle of SST, + // the SST may completes normally. After this, the node calls + // the gcs_join function and tries to join the cluster. However, + // this is impossible, because the node is already evicted. + // Therefore, the _join() function (which called from gcs_join) + // fails. Then node does IST (which also fails), after/during + // which it is aborted. To fix this, we should avoid joining + // the cluster through gcs_join function if node is evicted. + // To do this, we should check the current connection state + // in the gcs_join() function to return from it immediately + // if the node's communication channel was closed: + + if (conn->state < GCS_CONN_CLOSED) + { + conn->join_seqno = seqno; + conn->need_to_join = true; - return _join (conn, seqno); + return _join (conn, seqno); + } + else + { + return GCS_CLOSED_ERROR; + } } gcs_seqno_t gcs_local_sequence(gcs_conn_t* conn) @@ -2295,3 +2315,8 @@ const char* gcs_param_get (gcs_conn_t* conn, const char* key) return NULL; } + +void gcs_join_notification(gcs_conn_t* conn) +{ + conn->need_to_join = true; +} diff --git a/gcs/src/gcs.hpp b/gcs/src/gcs.hpp index 8e0e58bad..3af3eed9f 100644 --- a/gcs/src/gcs.hpp +++ b/gcs/src/gcs.hpp @@ -280,9 +280,11 @@ extern long gcs_resume_recv (gcs_conn_t* conn); * After action with this seqno is applied, this thread is guaranteed to see * all the changes made by the client, even on other nodes. * - * @return global sequence number or negative error code + * @retval 0 success + * @retval -EPERM operation not permitted (in NON_PRIMARY state) + * @retval -EAGAIN operation may be retried later (in transient state) */ -extern gcs_seqno_t gcs_caused(gcs_conn_t* conn); +extern long gcs_caused (gcs_conn_t* conn, gcs_seqno_t& seqno); /*! @brief Sends state transfer request * Broadcasts state transfer request which will be passed to one of the @@ -451,6 +453,8 @@ extern void gcs_flush_stats(gcs_conn_t *conn); void gcs_get_status(gcs_conn_t* conn, gu::Status& status); +extern void gcs_join_notification(gcs_conn_t *conn); + /*! A node with this name will be treated as a stateless arbitrator */ #define GCS_ARBITRATOR_NAME "garb" diff --git a/gcs/src/gcs_core.cpp b/gcs/src/gcs_core.cpp index 060922135..5af4c0cc0 100644 --- a/gcs/src/gcs_core.cpp +++ b/gcs/src/gcs_core.cpp @@ -97,6 +97,7 @@ core_act_t; typedef struct causal_act { gcs_seqno_t* act_id; + long* error; gu_mutex_t* mtx; gu_cond_t* cond; } causal_act_t; @@ -677,12 +678,18 @@ core_handle_last_msg (gcs_core_t* core, gcs_group_handle_last_msg (&core->group, msg); if (commit_cut) { /* commit cut changed */ - if ((act->buf = malloc (sizeof (commit_cut)))) { - act->type = GCS_ACT_COMMIT_CUT; + + int const buf_len(sizeof(commit_cut)); + void* const buf(malloc(buf_len)); + + if (gu_likely(NULL != (buf))) { /* #701 - everything that goes into the action buffer * is expected to be serialized. */ - *((gcs_seqno_t*)act->buf) = gcs_seqno_htog(commit_cut); - act->buf_len = sizeof(commit_cut); + *((gcs_seqno_t*)buf) = gcs_seqno_htog(commit_cut); + assert(NULL == act->buf); + act->buf = buf; + act->buf_len = buf_len; + act->type = GCS_ACT_COMMIT_CUT; return act->buf_len; } else { @@ -812,6 +819,7 @@ core_handle_comp_msg (gcs_core_t* core, "WAIT_STATE_MSG. Can't continue."); ret = -ENOTRECOVERABLE; assert(0); + // fall through default: gu_fatal ("Failed to handle component message: %d (%s)!", ret, strerror (-ret)); @@ -1021,23 +1029,33 @@ core_msg_to_action (gcs_core_t* core, static long core_msg_causal(gcs_core_t* conn, struct gcs_recv_msg* msg) { - causal_act_t* act; - if (gu_unlikely(msg->size != sizeof(*act))) + if (gu_unlikely(msg->size != sizeof(causal_act_t))) { gu_error("invalid causal act len %ld, expected %ld", - msg->size, sizeof(*act)); + msg->size, sizeof(causal_act_t)); return -EPROTO; } - gcs_seqno_t const causal_seqno = - GCS_GROUP_PRIMARY == conn->group.state ? - conn->group.act_id_ : GCS_SEQNO_ILL; - - act = (causal_act_t*)msg->buf; + causal_act_t* act= (causal_act_t*)msg->buf; gu_mutex_lock(act->mtx); - *act->act_id = causal_seqno; - gu_cond_signal(act->cond); + { + switch (conn->group.state) + { + case GCS_GROUP_PRIMARY: + *act->act_id = conn->group.act_id_; + break; + case GCS_GROUP_WAIT_STATE_UUID: + case GCS_GROUP_WAIT_STATE_MSG: + *act->error = -EAGAIN; + break; + default: + *act->error = -EPERM; + } + + gu_cond_signal(act->cond); + } gu_mutex_unlock(act->mtx); + return msg->size; } @@ -1324,20 +1342,22 @@ gcs_core_send_fc (gcs_core_t* core, const void* fc, size_t fc_size) return ret; } -gcs_seqno_t -gcs_core_caused(gcs_core_t* core) +long +gcs_core_caused (gcs_core_t* core, gcs_seqno_t& seqno) { - long ret; - gcs_seqno_t act_id = GCS_SEQNO_ILL; + long error = 0; gu_mutex_t mtx; gu_cond_t cond; - causal_act_t act = {&act_id, &mtx, &cond}; + causal_act_t act = {&seqno, &error, &mtx, &cond}; gu_mutex_init (&mtx, NULL); gu_cond_init (&cond, NULL); gu_mutex_lock (&mtx); { - ret = core_msg_send_retry (core, &act, sizeof(act), GCS_MSG_CAUSAL); + long ret = core_msg_send_retry (core, + &act, + sizeof(act), + GCS_MSG_CAUSAL); if (ret == sizeof(act)) { @@ -1346,14 +1366,14 @@ gcs_core_caused(gcs_core_t* core) else { assert (ret < 0); - act_id = ret; + error = ret; } } gu_mutex_unlock (&mtx); gu_mutex_destroy (&mtx); gu_cond_destroy (&cond); - return act_id; + return error; } long diff --git a/gcs/src/gcs_core.hpp b/gcs/src/gcs_core.hpp index 1b3bc9456..df3038d34 100644 --- a/gcs/src/gcs_core.hpp +++ b/gcs/src/gcs_core.hpp @@ -154,8 +154,8 @@ gcs_core_send_sync (gcs_core_t* core, gcs_seqno_t seqno); extern long gcs_core_send_fc (gcs_core_t* core, const void* fc, size_t fc_size); -extern gcs_seqno_t -gcs_core_caused(gcs_core_t* core); +extern long +gcs_core_caused (gcs_core_t* core, gcs_seqno_t& seqno); extern long gcs_core_param_set (gcs_core_t* core, const char* key, const char* value); diff --git a/gcs/src/gcs_gcomm.cpp b/gcs/src/gcs_gcomm.cpp index fc4b80873..8acc6410c 100644 --- a/gcs/src/gcs_gcomm.cpp +++ b/gcs/src/gcs_gcomm.cpp @@ -223,7 +223,7 @@ class GCommConn : public Consumer, public Toplay error_ = ENOTCONN; int err; - if ((err = pthread_create(&thd_, 0, &run_fn, this)) != 0) + if ((err = gu_thread_create(&thd_, 0, &run_fn, this)) != 0) { gu_throw_error(err) << "Failed to create thread"; } @@ -301,11 +301,18 @@ class GCommConn : public Consumer, public Toplay pthread_join(thd_, 0); { gcomm::Critical crit(*net_); - log_info << "gcomm: closing backend"; - tp_->close(error_ != 0 || force == true); - gcomm::disconnect(tp_, this); - delete tp_; - tp_ = 0; + if (tp_ == 0) + { + log_info << "gcomm: backend already closed"; + } + else + { + log_info << "gcomm: closing backend"; + tp_->close(error_ != 0 || force == true); + gcomm::disconnect(tp_, this); + delete tp_; + tp_ = 0; + } } const Message* msg; @@ -704,6 +711,7 @@ static GCS_BACKEND_RECV_FN(gcomm_recv) if (cm_size <= msg->buf_len) { memcpy(msg->buf, cm, cm_size); + msg->size = cm_size; recv_buf.pop_front(); msg->type = GCS_MSG_COMPONENT; } diff --git a/gcs/src/gcs_group.cpp b/gcs/src/gcs_group.cpp index 2e6eb40ed..15fa329c4 100644 --- a/gcs/src/gcs_group.cpp +++ b/gcs/src/gcs_group.cpp @@ -255,7 +255,7 @@ group_check_donor (gcs_group_t* group) gu_warn ("Donor %s is no longer in the group. State transfer cannot " "be completed, need to abort. Aborting...", donor_id); - gu_abort(); + // gu_abort(); } return; @@ -342,6 +342,8 @@ group_post_state_exchange (gcs_group_t* group) } assert (group->prim_num > 0); + + group_redo_last_applied(group); } else { // non-primary configuration @@ -757,9 +759,15 @@ gcs_group_handle_join_msg (gcs_group_t* group, const gcs_recv_msg_t* msg) } } else { - gu_info ("%d.%d (%s): State transfer %s %d.%d (%s) complete.", - sender_idx, sender->segment, sender->name, st_dir, - peer_idx, peer ? peer->segment : -1, peer_name); + if (GCS_NODE_STATE_JOINED == sender->status) { + gu_info ("%d.%d (%s): State transfer %s %d.%d (%s) complete.", + sender_idx, sender->segment, sender->name, st_dir, + peer_idx, peer ? peer->segment : -1, peer_name); + } + else { + assert(sender->desync_count > 0); + return 0; // don't deliver up + } } } } diff --git a/gcs/src/gcs_node.cpp b/gcs/src/gcs_node.cpp index 49a49d8f7..c9d0ac08f 100644 --- a/gcs/src/gcs_node.cpp +++ b/gcs/src/gcs_node.cpp @@ -181,6 +181,7 @@ gcs_node_update_status (gcs_node_t* node, const gcs_state_quorum_t* quorum) else { node->desync_count = 1; } + // fall through case GCS_NODE_STATE_SYNCED: node->count_last_applied = true; break; diff --git a/gcs/src/gcs_test.cpp b/gcs/src/gcs_test.cpp index c6472d544..fae878ad4 100644 --- a/gcs/src/gcs_test.cpp +++ b/gcs/src/gcs_test.cpp @@ -644,15 +644,19 @@ static long gcs_test_conf (gcs_test_conf_t *conf, long argc, char *argv[]) case 6: conf->n_recv = strtol (argv[5], &endptr, 10); if ('\0' != *endptr) goto error; + // fall through case 5: conf->n_send = strtol (argv[4], &endptr, 10); if ('\0' != *endptr) goto error; + // fall through case 4: conf->n_repl = strtol (argv[3], &endptr, 10); if ('\0' != *endptr) goto error; + // fall through case 3: conf->n_tries = strtol (argv[2], &endptr, 10); if ('\0' != *endptr) goto error; + // fall through case 2: conf->backend = argv[1]; break; diff --git a/gcs/src/unit_tests/SConscript b/gcs/src/unit_tests/SConscript index 57f33fad7..b4cd3f0f0 100644 --- a/gcs/src/unit_tests/SConscript +++ b/gcs/src/unit_tests/SConscript @@ -5,6 +5,7 @@ env = check_env.Clone() # Include paths env.Append(CPPPATH = Split(''' + #/common #/galerautils/src #/gcache/src #/gcs/src