diff --git a/log.go b/log.go index bd7c2feb..b581ca0a 100644 --- a/log.go +++ b/log.go @@ -105,63 +105,76 @@ func (l *raftLog) String() string { // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) { - if !l.matchTerm(a.prev) { + match, ok := l.findConflict(a) + if !ok { return 0, false } - // TODO(pav-kv): propagate logSlice down the stack. It will be used all the - // way down in unstable, for safety checks, and for useful bookkeeping. - - lastnewi = a.prev.index + uint64(len(a.entries)) - ci := l.findConflict(a.entries) - switch { - case ci == 0: - case ci <= l.committed: - l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) - default: - offset := a.prev.index + 1 - if ci-offset > uint64(len(a.entries)) { - l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries)) - } - l.append(a.entries[ci-offset:]...) - } - l.commitTo(min(committed, lastnewi)) - return lastnewi, true + + // Fast-forward to the first mismatching or missing entry. + // NB: prev.index <= match.index <= a.lastIndex(), so the sub-slicing is safe. + a.entries = a.entries[match.index-a.prev.index:] + a.prev = match + + // TODO(pav-kv): pass the logSlice down the stack, for safety checks and + // bookkeeping in the unstable structure. + l.append(a.entries...) + l.commitTo(min(committed, a.lastIndex())) + return a.lastIndex(), true } func (l *raftLog) append(ents ...pb.Entry) uint64 { if len(ents) == 0 { return l.lastIndex() } - if after := ents[0].Index - 1; after < l.committed { - l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) + if first := ents[0].Index; first <= l.committed { + l.logger.Panicf("entry %d is already committed [committed(%d)]", first, l.committed) } l.unstable.truncateAndAppend(ents) return l.lastIndex() } -// findConflict finds the index of the conflict. -// It returns the first pair of conflicting entries between the existing -// entries and the given entries, if there are any. -// If there is no conflicting entries, and the existing entries contains -// all the given entries, zero will be returned. -// If there is no conflicting entries, but the given entries contains new -// entries, the index of the first new entry will be returned. -// An entry is considered to be conflicting if it has the same index but -// a different term. -// The index of the given entries MUST be continuously increasing. -func (l *raftLog) findConflict(ents []pb.Entry) uint64 { - for i := range ents { - if id := pbEntryID(&ents[i]); !l.matchTerm(id) { - if id.index <= l.lastIndex() { - // TODO(pav-kv): can simply print %+v of the id. This will change the - // log format though. - l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", - id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term) - } - return id.index +// findConflict finds the last entry in the given log slice that matches the +// log. The next entry either mismatches, or is missing. +// +// If the slice partially/fully matches, this method returns true. The returned +// entryID is the ID of the last matching entry. It can be s.prev if it is the +// only matching entry. It is guaranteed that the returned entryID.index is in +// the [s.prev.index, s.lastIndex()] range. +// +// All the entries up to the returned entryID are already present in the log, +// and do not need to be appended again. The caller can safely fast-forward an +// append request to the next entry after it. +// +// Returns false if the given slice mismatches the log entirely, i.e. the s.prev +// entry has a mismatching entryID.term. In this case an append request can not +// proceed. +func (l *raftLog) findConflict(s logSlice) (entryID, bool) { + // TODO(pav-kv): add a fast-path here. If s.term == raftLog.lastTerm, we can + // skip the match checks entirely. We can double-check only the last entry + // match, to be sure, but it is not necessary if raft invariants are true. + if !l.matchTerm(s.prev) { + return entryID{}, false + } + + // TODO(pav-kv): every matchTerm call in the linear scan below can fall back + // to fetching an entry from storage. This is inefficient, we can improve it. + // NB: logs that don't match at one index, don't match at all indices above. + // So we can use binary search to find the fork. + match := s.prev + for i := range s.entries { + id := pbEntryID(&s.entries[i]) + if l.matchTerm(id) { + match = id + continue } + if id.index <= l.lastIndex() { + // TODO(pav-kv): should simply print %+v of the id. + l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", + id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term) + } + return match, true } - return 0 + return match, true // all entries match } // findConflictByTerm returns a best guess on where this log ends matching diff --git a/log_test.go b/log_test.go index 90ae9388..8deed904 100644 --- a/log_test.go +++ b/log_test.go @@ -25,32 +25,49 @@ import ( func TestFindConflict(t *testing.T) { previousEnts := index(1).terms(1, 2, 3) - tests := []struct { - ents []pb.Entry - wconflict uint64 + ids := make([]entryID, 1, len(previousEnts)+1) // dummy (0, 0) at index 0 + for i := range previousEnts { + ids = append(ids, pbEntryID(&previousEnts[i])) + } + for _, tt := range []struct { + prev entryID + ents []pb.Entry + notOk bool + want entryID }{ - // no conflict, empty ent - {nil, 0}, + // no conflict, empty entries + {ents: nil, want: ids[0]}, + // prev does not match the log + {prev: entryID{term: 10, index: 1}, notOk: true}, // no conflict - {index(1).terms(1, 2, 3), 0}, - {index(2).terms(2, 3), 0}, - {index(3).terms(3), 0}, + {prev: ids[0], ents: index(1).terms(1, 2, 3), want: ids[3]}, + {prev: ids[1], ents: index(2).terms(2, 3), want: ids[3]}, + {prev: ids[2], ents: index(3).terms(3), want: ids[3]}, // no conflict, but has new entries - {index(1).terms(1, 2, 3, 4, 4), 4}, - {index(2).terms(2, 3, 4, 5), 4}, - {index(3).terms(3, 4, 4), 4}, - {index(4).terms(4, 4), 4}, - // conflicts with existing entries - {index(1).terms(4, 4), 1}, - {index(2).terms(1, 4, 4), 2}, - {index(3).terms(1, 2, 4, 4), 3}, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { - raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) - require.Equal(t, tt.wconflict, raftLog.findConflict(tt.ents)) + {prev: ids[0], ents: index(1).terms(1, 2, 3, 4, 4), want: ids[3]}, + {prev: ids[1], ents: index(2).terms(2, 3, 4, 4), want: ids[3]}, + {prev: ids[2], ents: index(3).terms(3, 4, 4), want: ids[3]}, + {prev: ids[3], ents: index(4).terms(4, 4), want: ids[3]}, + // passes prev check, but conflicts with existing entries + {prev: ids[0], ents: index(1).terms(4, 4), want: ids[0]}, + {prev: ids[1], ents: index(2).terms(1, 4, 4), want: ids[1]}, + {prev: ids[2], ents: index(3).terms(2, 2, 4, 4), want: ids[2]}, + // prev does not match + {prev: entryID{term: 4, index: 1}, ents: index(2).terms(4, 4), notOk: true}, + {prev: entryID{term: 5, index: 2}, ents: index(3).terms(5, 6), notOk: true}, + // out of bounds + {prev: entryID{term: 3, index: 10}, ents: index(11).terms(3), notOk: true}, + // just touching the right bound, but still out of bounds + {prev: entryID{term: 3, index: 4}, ents: index(5).terms(3, 3, 4), notOk: true}, + } { + t.Run("", func(t *testing.T) { + log := newLog(NewMemoryStorage(), discardLogger) + log.append(previousEnts...) + app := logSlice{term: 100, prev: tt.prev, entries: tt.ents} + require.NoError(t, app.valid()) + match, ok := log.findConflict(app) + require.Equal(t, !tt.notOk, ok) + require.Equal(t, tt.want, match) }) } }