Skip to content

Commit

Permalink
Merge pull request #9241 from Crypt-iQ/fix_vb
Browse files Browse the repository at this point in the history
discovery+graph: track job set dependencies in vb
  • Loading branch information
Roasbeef authored Jan 24, 2025
2 parents baa34b0 + 323b633 commit c3cbfd8
Show file tree
Hide file tree
Showing 8 changed files with 815 additions and 567 deletions.
45 changes: 27 additions & 18 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,9 @@ type AuthenticatedGossiper struct {
// AuthenticatedGossiper lock.
chanUpdateRateLimiter map[uint64][2]*rate.Limiter

// vb is used to enforce job dependency ordering of gossip messages.
vb *ValidationBarrier

sync.Mutex
}

Expand All @@ -542,6 +545,8 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
banman: newBanman(),
}

gossiper.vb = NewValidationBarrier(1000, gossiper.quit)

gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
Expand Down Expand Up @@ -1409,10 +1414,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
log.Errorf("Unable to rebroadcast stale announcements: %v", err)
}

// We'll use this validation to ensure that we process jobs in their
// dependency order during parallel validation.
validationBarrier := graph.NewValidationBarrier(1000, d.quit)

for {
select {
// A new policy update has arrived. We'll commit it to the
Expand Down Expand Up @@ -1481,11 +1482,17 @@ func (d *AuthenticatedGossiper) networkHandler() {
// We'll set up any dependent, and wait until a free
// slot for this job opens up, this allow us to not
// have thousands of goroutines active.
validationBarrier.InitJobDependencies(announcement.msg)
annJobID, err := d.vb.InitJobDependencies(
announcement.msg,
)
if err != nil {
announcement.err <- err
continue
}

d.wg.Add(1)
go d.handleNetworkMessages(
announcement, &announcements, validationBarrier,
announcement, &announcements, annJobID,
)

// The trickle timer has ticked, which indicates we should
Expand Down Expand Up @@ -1536,28 +1543,23 @@ func (d *AuthenticatedGossiper) networkHandler() {
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {
deDuped *deDupedAnnouncements, jobID JobID) {

defer d.wg.Done()
defer vb.CompleteJob()
defer d.vb.CompleteJob()

// We should only broadcast this message forward if it originated from
// us or it wasn't received as part of our initial historical sync.
shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()

// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(nMsg.msg)
err := d.vb.WaitForParents(jobID, nMsg.msg)
if err != nil {
log.Debugf("Validating network message %s got err: %v",
nMsg.msg.MsgType(), err)

if !graph.IsError(
err,
graph.ErrVBarrierShuttingDown,
graph.ErrParentValidationFailed,
) {

if errors.Is(err, ErrVBarrierShuttingDown) {
log.Warnf("unexpected error during validation "+
"barrier shutdown: %v", err)
}
Expand All @@ -1577,7 +1579,16 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,

// If this message had any dependencies, then we can now signal them to
// continue.
vb.SignalDependants(nMsg.msg, allow)
err = d.vb.SignalDependents(nMsg.msg, jobID)
if err != nil {
// Something is wrong if SignalDependents returns an error.
log.Errorf("SignalDependents returned error for msg=%v with "+
"JobID=%v", spew.Sdump(nMsg.msg), jobID)

nMsg.err <- err

return
}

// If the announcement was accepted, then add the emitted announcements
// to our announce batch to be broadcast once the trickle timer ticks
Expand Down Expand Up @@ -2407,7 +2418,6 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
err,
graph.ErrOutdated,
graph.ErrIgnored,
graph.ErrVBarrierShuttingDown,
) {

log.Error(err)
Expand Down Expand Up @@ -3148,7 +3158,6 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
if graph.IsError(
err, graph.ErrOutdated,
graph.ErrIgnored,
graph.ErrVBarrierShuttingDown,
) {

log.Debugf("Update edge for short_chan_id(%v) got: %v",
Expand Down
Loading

0 comments on commit c3cbfd8

Please sign in to comment.