Skip to content

Commit

Permalink
raft: replace logSynced bool with leaderTerm
Browse files Browse the repository at this point in the history
Tracking the term of the leader with whom the log is consistent is more
straightforward than tracking an obscure bool. It also comes with extra
benefits when the TODO to add more safety checks to raftLog is done:
leaderTerm can be compared for establishing the order, whereas the bool
can only be true/false.

Initializing leaderTerm during the raftLog initialization is cheaper
compared to initializing it in becomeFollower(), because the former
happens only once, while the latter can happen many times during the
lifetime of this node.

Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
  • Loading branch information
pav-kv committed Jan 27, 2024
1 parent 68a7a9e commit 73c85a0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 17 deletions.
34 changes: 34 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,35 @@ type raftLog struct {
// they will be saved into storage.
unstable unstable

// leaderTerm is the term of the leader with whom our log is "consistent". The
// log is guaranteed to be a prefix of this term's leader log.
//
// The leaderTerm can be safely updated to `t` if:
// 1. the last entry in the log has term `t`, or, more generally,
// 2. the last successful append was sent by the leader `t`.
//
// This is due to the following safety property (see raft paper §5.3):
//
// Log Matching: if two logs contain an entry with the same index and term,
// then the logs are identical in all entries up through the given index.
//
// We use (1) to initialize leaderTerm, and (2) to maintain it on updates.
//
// NB: (2) does not imply (1). If our log is behind the leader's log, the last
// entry term can be below leaderTerm.
//
// NB: leaderTerm does not necessarily match this raft node's term. It only
// does for the leader. For followers and candidates, when we first learn or
// bump to a new term, we don't have a proof that our log is consistent with
// the new term's leader (current or prospective). The new leader may override
// any suffix of the log after the committed index. Only once the first append
// from the new leader succeeds can we update leaderTerm.
//
// During normal operation, leaderTerm matches the node term though. During a
// leader change, it briefly lags behind, and matches again when the first
// append message succeeds.
leaderTerm uint64

// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
Expand Down Expand Up @@ -88,6 +117,11 @@ func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEnc
if err != nil {
panic(err) // TODO(bdarnell)
}
lastTerm, err := storage.Term(lastIndex)
if err != nil {
panic(err) // TODO(pav-kv)
}
log.leaderTerm = lastTerm
log.unstable.offset = lastIndex + 1
log.unstable.offsetInProgress = lastIndex + 1
log.unstable.logger = logger
Expand Down
25 changes: 8 additions & 17 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,6 @@ type raft struct {

// the leader id
lead uint64
// logSynced is true if this node's log is guaranteed to be a prefix of the
// leader's log at this term. Always true for the leader. Always false for a
// candidate. For a follower, this is true if the last entry term matches the
// leader term, otherwise becomes true when the first MsgApp append from the
// leader succeeds.
logSynced bool
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
Expand Down Expand Up @@ -769,7 +763,6 @@ func (r *raft) reset(term uint64) {
r.Vote = None
}
r.lead = None
r.logSynced = false

r.electionElapsed = 0
r.heartbeatElapsed = 0
Expand Down Expand Up @@ -873,10 +866,6 @@ func (r *raft) becomeFollower(term uint64, lead uint64) {
r.reset(term)
r.tick = r.tickElection
r.lead = lead
// If the last entry term matches the leader term, the log is guaranteed to be
// a prefix of the leader's log. Otherwise, we will establish this guarantee
// later, on the first successful MsgApp.
r.logSynced = r.raftLog.lastTerm() == term
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
Expand Down Expand Up @@ -919,7 +908,6 @@ func (r *raft) becomeLeader() {
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.logSynced = true // the leader's log is in sync with itself
r.state = StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
Expand Down Expand Up @@ -947,6 +935,8 @@ func (r *raft) becomeLeader() {
// so the preceding log append does not count against the uncommitted log
// quota of the new leader. In other words, after the call to appendEntry,
// r.uncommittedSize is still 0.

r.raftLog.leaderTerm = r.Term // the leader's log is consistent with itself
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

Expand Down Expand Up @@ -1747,7 +1737,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.logSynced = true // from now on, the log is a prefix of the leader's log
r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
return
}
Expand Down Expand Up @@ -1787,10 +1777,10 @@ func (r *raft) handleHeartbeat(m pb.Message) {
// leader's log. Otherwise, entries at this index may mismatch.
//
// TODO(pav-kv): move this logic to r.raftLog, which is more appropriate for
// handling safety. The raftLog can use the logSynced flag for other safety
// checks. For example, unstable.truncateAndAppend currently may override a
// suffix of the log unconditionally, but it can only be done if !logSynced.
if r.logSynced {
// handling safety. The raftLog can use leaderTerm for other safety checks.
// For example, unstable.truncateAndAppend currently may override a suffix of
// the log unconditionally, but it can only be done if m.Term > leaderTerm.
if m.Term == r.raftLog.leaderTerm {
r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex()))
}
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
Expand All @@ -1807,6 +1797,7 @@ func (r *raft) handleSnapshot(m pb.Message) {
if r.restore(s) {
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, sindex, sterm)
r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else {
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
Expand Down

0 comments on commit 73c85a0

Please sign in to comment.