fix: Some more batching fixes (#355)
* webui: Make batching more visible in pipeline pages

* fix snap schedule
magik6k authored Jan 8, 2025
1 parent 19f943a commit 0a968f0
Showing 8 changed files with 811 additions and 366 deletions.
tasks/seal/poller_commit_msg.go: 6 changes (3 additions, 3 deletions)
@@ -258,12 +258,12 @@ func (s *SealPoller) pollCommitMsgFail(ctx context.Context, maddr address.Addres
 
 		return xerrors.Errorf("sector not found after, commit message can't be found either")
 	default:
-		return xerrors.Errorf("commit message failed with exit code %s", exitcode.ExitCode(execResult.ExecutedRcptExitCode))
+		return xerrors.Errorf("commit message (s %d:%d m:%s) failed with exit code %s", task.SpID, task.SectorNumber, execResult.CommitMsgCID.String, exitcode.ExitCode(execResult.ExecutedRcptExitCode))
 	}
 }
 
 func (s *SealPoller) pollRetryCommitMsgSend(ctx context.Context, task pollTask, execResult dbExecResult) error {
-	if execResult.CommitMsgCID == nil {
+	if !execResult.CommitMsgCID.Valid {
 		return xerrors.Errorf("commit msg cid was nil")
 	}
 
@@ -272,7 +272,7 @@ func (s *SealPoller) pollRetryCommitMsgSend(ctx context.Context, task pollTask,
 	_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
 		commit_msg_cid = NULL, task_id_commit_msg = NULL, after_commit_msg = FALSE
 		WHERE commit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3 AND after_commit_msg_success = FALSE`,
-		*execResult.CommitMsgCID, task.SpID, task.SectorNumber)
+		execResult.CommitMsgCID.String, task.SpID, task.SectorNumber)
 	if err != nil {
 		return xerrors.Errorf("update sectors_sdr_pipeline to retry precommit msg send: %w", err)
 	}
tasks/seal/poller_precommit_msg.go: 11 changes (6 additions, 5 deletions)
@@ -2,6 +2,7 @@ package seal
 
 import (
 	"context"
+	"database/sql"
 	"sort"
 	"time"
 
@@ -131,8 +132,8 @@ func (s *SealPoller) sendPreCommitBatch(ctx context.Context, spid int64, sectors
 }
 
 type dbExecResult struct {
-	PrecommitMsgCID *string `db:"precommit_msg_cid"`
-	CommitMsgCID    *string `db:"commit_msg_cid"`
+	PrecommitMsgCID sql.NullString `db:"precommit_msg_cid"`
+	CommitMsgCID    sql.NullString `db:"commit_msg_cid"`
 
 	ExecutedTskCID   string `db:"executed_tsk_cid"`
 	ExecutedTskEpoch int64  `db:"executed_tsk_epoch"`
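The `*string` to `sql.NullString` change above swaps nil-pointer checks for the standard library's explicit nullability flag: `.Valid` reports whether the column was non-NULL, and `.String` holds the value. A minimal standalone sketch of the pattern (stdlib only, not code from this commit):

package main

import (
	"database/sql"
	"fmt"
)

// describe mirrors the Valid/String usage that replaces the old
// `== nil` / `*ptr` handling in the pollers above.
func describe(cid sql.NullString) string {
	if !cid.Valid {
		return "msg cid was nil" // column was NULL in the DB
	}
	return "msg cid: " + cid.String
}

func main() {
	fmt.Println(describe(sql.NullString{}))                                // NULL column
	fmt.Println(describe(sql.NullString{String: "bafy2...", Valid: true})) // populated column
}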
@@ -195,12 +196,12 @@ func (s *SealPoller) pollPrecommitMsgFail(ctx context.Context, task pollTask, ex
 		// just retry
 		return s.pollRetryPrecommitMsgSend(ctx, task, execResult)
 	default:
-		return xerrors.Errorf("precommit message failed with exit code %s", exitcode.ExitCode(execResult.ExecutedRcptExitCode))
+		return xerrors.Errorf("precommit message (s %d:%d m:%s) failed with exit code %s", task.SpID, task.SectorNumber, execResult.PrecommitMsgCID.String, exitcode.ExitCode(execResult.ExecutedRcptExitCode))
 	}
 }
 
 func (s *SealPoller) pollRetryPrecommitMsgSend(ctx context.Context, task pollTask, execResult dbExecResult) error {
-	if execResult.PrecommitMsgCID == nil {
+	if !execResult.PrecommitMsgCID.Valid {
 		return xerrors.Errorf("precommit msg cid was nil")
 	}
 
@@ -209,7 +210,7 @@ func (s *SealPoller) pollRetryPrecommitMsgSend(ctx context.Context, task pollTas
 	_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
 		precommit_msg_cid = NULL, task_id_precommit_msg = NULL, after_precommit_msg = FALSE
 		WHERE precommit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3 AND after_precommit_msg_success = FALSE`,
-		*execResult.PrecommitMsgCID, task.SpID, task.SectorNumber)
+		execResult.PrecommitMsgCID.String, task.SpID, task.SectorNumber)
 	if err != nil {
 		return xerrors.Errorf("update sectors_sdr_pipeline to retry precommit msg send: %w", err)
 	}
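Both retry helpers use the same guarded-UPDATE idiom: the WHERE clause pins the exact message CID and requires the success flag to still be FALSE, so a message that lands between the poll and the reset cannot be clobbered. A standalone sketch using plain database/sql (the real code goes through harmonydb; table and column names are taken from the diff):

package main

import "database/sql"

// resetPrecommitForRetry clears message state for one sector so the poller
// will re-send; the WHERE guards make it a no-op once the message succeeded.
func resetPrecommitForRetry(db *sql.DB, msgCID string, spID, sectorNumber int64) error {
	_, err := db.Exec(`UPDATE sectors_sdr_pipeline SET
		precommit_msg_cid = NULL, task_id_precommit_msg = NULL, after_precommit_msg = FALSE
		WHERE precommit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3
		AND after_precommit_msg_success = FALSE`,
		msgCID, spID, sectorNumber)
	return err
}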
tasks/snap/task_submit.go: 272 changes (151 additions, 121 deletions)
@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"math"
 	"time"
 
 	"github.com/ipfs/go-cid"
@@ -101,7 +102,10 @@ func NewSubmitTask(db *harmonydb.DB, api SubmitTaskNodeAPI, bstore curiochain.Cu
 
 		cfg: submitConfig{
 			batch: updateBatchingConfig{
-				MaxUpdateBatch: 256,
+				// max mpool message is 64k
+				// snap has 16x192 snarks + 50 or so bytes of other message overhead,
+				// so we can only really fit ~20-16 updates in a message
+				MaxUpdateBatch: 16,
 				Slack:            time.Duration(cfg.Batching.Update.Slack),
 				Timeout:          time.Duration(cfg.Batching.Update.Timeout),
 				BaseFeeThreshold: abi.TokenAmount(cfg.Batching.Update.BaseFeeThreshold),
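The new default follows from the arithmetic in the comment; the snippet below just replays it (the 64 KiB message limit, the 16x192-byte proof size, and the ~50-byte per-update overhead are the diff's own figures, not independently verified here):

package main

import "fmt"

func main() {
	const (
		maxMsgBytes       = 64 << 10 // ~64 KiB mpool message limit
		proofBytes        = 16 * 192 // one snap update proof: 16 snarks x 192 bytes each
		perUpdateOverhead = 50       // rough non-proof bytes per update
	)
	fmt.Println(maxMsgBytes / (proofBytes + perUpdateOverhead)) // ~20
	// => at most ~20 updates fit, so MaxUpdateBatch = 16 leaves headroom,
	//    while the old default of 256 could never fit in a single message.
}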
@@ -514,149 +518,175 @@ func (s *SubmitTask) TypeDetails() harmonytask.TaskTypeDetails {
 	}
 }
 
-func (s *SubmitTask) schedule(ctx context.Context, taskFunc harmonytask.AddTaskFunc) error {
-	// schedule submits
-	taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
-		var cmt bool
-		type task struct {
-			SpID          int64          `db:"sp_id"`
-			SectorNumber  int64          `db:"sector_number"`
-			UpgradeProof  int            `db:"upgrade_proof"`
-			StartEpoch    abi.ChainEpoch `db:"smallest_direct_start_epoch"`
-			UpdateReadyAt *time.Time     `db:"update_ready_at"`
-		}
+func (s *SubmitTask) schedule(ctx context.Context, addTaskFunc harmonytask.AddTaskFunc) error {
+	var done bool
+
+	for !done {
+		addTaskFunc(func(taskID harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
+			//----------------------------------
+			// 1) Gather candidate tasks to schedule
+			//----------------------------------
+			var rawRows []struct {
+				SpID          int64      `db:"sp_id"`
+				SectorNumber  int64      `db:"sector_number"`
+				UpgradeProof  int64      `db:"upgrade_proof"`
+				UpdateReadyAt *time.Time `db:"update_ready_at"`
+				StartEpoch    int64      `db:"smallest_direct_start_epoch"`
+			}
 
-		type sectorBatch struct {
-			cutoff   abi.ChainEpoch
-			earliest time.Time
-			sectors  []int64
-		}
-
-		var tasks []task
-
-		err := tx.Select(&tasks, `SELECT
-			ssp.sp_id,
-			ssp.sector_number,
-			ssp.upgrade_proof,
-			ssp.update_ready_at,
-			MIN(ssip.direct_start_epoch) AS smallest_direct_start_epoch
-		FROM
-			sectors_snap_pipeline ssp
-		JOIN
-			sectors_snap_initial_pieces ssip
-		ON
-			ssp.sp_id = ssip.sp_id AND ssp.sector_number = ssip.sector_number
-		WHERE
-			ssp.failed = FALSE
-			AND ssp.after_encode = TRUE
-			AND ssp.after_prove = TRUE
-			AND ssp.after_submit = FALSE
-			AND (ssp.submit_after IS NULL OR ssp.submit_after < NOW())
-			AND ssp.task_id_submit IS NULL
-		GROUP BY
-			ssp.sp_id, ssp.sector_number, ssp.upgrade_proof
-		ORDER BY
-			ssp.sector_number ASC;`)
-		if err != nil {
-			return false, xerrors.Errorf("getting tasks: %w", err)
-		}
-
-		if len(tasks) == 0 {
-			return false, nil
-		}
+			err := tx.Select(&rawRows, `
+				SELECT
+					ssp.sp_id,
+					ssp.sector_number,
+					ssp.upgrade_proof,
+					ssp.update_ready_at,
+					MIN(ssip.direct_start_epoch) AS smallest_direct_start_epoch
+				FROM
+					sectors_snap_pipeline ssp
+				JOIN
+					sectors_snap_initial_pieces ssip
+					ON (ssp.sp_id = ssip.sp_id AND ssp.sector_number = ssip.sector_number)
+				WHERE
+					ssp.failed = FALSE
+					AND ssp.after_encode = TRUE
+					AND ssp.after_prove = TRUE
+					AND ssp.after_submit = FALSE
+					AND (ssp.submit_after IS NULL OR ssp.submit_after < NOW())
+					AND ssp.task_id_submit IS NULL
+					AND ssp.update_ready_at IS NOT NULL
+				GROUP BY
+					ssp.sp_id, ssp.sector_number, ssp.upgrade_proof, ssp.update_ready_at
+				ORDER BY
+					ssp.sp_id, ssp.sector_number
+			`)
+			if err != nil {
+				return false, xerrors.Errorf("selecting candidate snap updates: %w", err)
+			}
+
+			if len(rawRows) == 0 {
+				// No tasks left to schedule => set done=true and do not commit
+				done = true
+				return false, nil
+			}
 
-		// Make batches based on Proof types
-		ts, err := s.api.ChainHead(ctx)
-		if err != nil {
-			log.Errorf("error getting chain head: %s", err)
-			return
-		}
-
-		batchMap := make(map[int64]map[abi.RegisteredUpdateProof][]task)
-		for i := range tasks {
-			// Check if SpID exists in batchMap
-			v, ok := batchMap[tasks[i].SpID]
-			if !ok {
-				// If not, initialize a new map for the RegisteredSealProof
-				v = make(map[abi.RegisteredUpdateProof][]task)
-				batchMap[tasks[i].SpID] = v
-			}
-			// Append the task to the correct RegisteredSealProof
-			v[abi.RegisteredUpdateProof(tasks[i].UpgradeProof)] = append(v[abi.RegisteredUpdateProof(tasks[i].UpgradeProof)], tasks[i])
-		}
+			//----------------------------------
+			// 2) Group them by (sp_id, upgradeProof)
+			//----------------------------------
+			type rowInfo struct {
+				SectorNumber  int64
+				UpdateReadyAt *time.Time
+				StartEpoch    int64
+			}
+			batchMap := make(map[int64]map[int64][]rowInfo) // sp_id -> upgradeProof -> []rowInfo
+
+			for _, row := range rawRows {
+				upMap, ok := batchMap[row.SpID]
+				if !ok {
+					upMap = make(map[int64][]rowInfo)
+					batchMap[row.SpID] = upMap
+				}
+				upMap[row.UpgradeProof] = append(upMap[row.UpgradeProof], rowInfo{
+					SectorNumber:  row.SectorNumber,
+					UpdateReadyAt: row.UpdateReadyAt,
+					StartEpoch:    row.StartEpoch,
+				})
+			}
+
+			//----------------------------------
+			// 3) Try to find exactly one group that meets scheduling conditions
+			//----------------------------------
+			ts, err := s.api.ChainHead(ctx)
+			if err != nil {
+				// Serious error => rollback
+				return false, xerrors.Errorf("chain head error: %w", err)
+			}
 
-		// Send batches per MinerID and per Proof type based on the following logic:
-		// 1. Check if Slack for any sector is reaching, if yes then send full batch
-		// 2. Check if timeout is reaching for any sector in the batch, if yes, then send the batch
-		// 3. Check if baseFee below set threshold. If yes then send all batches
-
-		for spid, miners := range batchMap {
-			for _, pts := range miners {
-				// Break into batches
-				var batches []sectorBatch
-				for i := 0; i < len(pts); i += s.cfg.batch.MaxUpdateBatch {
-					// Create a batch of size `maxBatchSize` or smaller for the last batch
-					end := i + s.cfg.batch.MaxUpdateBatch
-					if end > len(pts) {
-						end = len(pts)
-					}
-					var batch []int64
-					cutoff := abi.ChainEpoch(0)
-					earliest := time.Now()
-					for _, pt := range pts[i:end] {
-
-						if cutoff == 0 || pt.StartEpoch < cutoff {
-							cutoff = pt.StartEpoch
-						}
-
-						if pt.UpdateReadyAt.Before(earliest) {
-							earliest = *pt.UpdateReadyAt
-						}
-
-						batch = append(batch, pt.SectorNumber)
-					}
-
-					batches = append(batches, sectorBatch{
-						cutoff:  cutoff,
-						sectors: batch,
-					})
-				}
+			// We'll iterate all groups, and if we find one that meets conditions, we schedule & commit.
+			for spID, proofMap := range batchMap {
+				for _, rows := range proofMap {
+					//----------------------------------
+					// 4) Possibly do sub-batching
+					//----------------------------------
+					maxBatch := s.cfg.batch.MaxUpdateBatch
+
+					toSchedule := rows
+					if maxBatch != 0 && len(rows) > maxBatch {
+						toSchedule = rows[:maxBatch]
+					}
+
+					//----------------------------------
+					// 5) Check scheduling conditions (slack, baseFee, etc.)
+					//----------------------------------
+					var earliestStart int64 = math.MaxInt64
+					var earliestTime time.Time
+
+					for _, row := range toSchedule {
+						if row.StartEpoch < earliestStart {
+							earliestStart = row.StartEpoch
+						}
+						if row.UpdateReadyAt != nil {
+							if earliestTime.IsZero() || row.UpdateReadyAt.Before(earliestTime) {
+								earliestTime = *row.UpdateReadyAt
+							}
+						}
+					}
+
+					deltaBlocks := earliestStart - int64(ts.Height())
+					timeUntil := time.Duration(deltaBlocks*int64(build.BlockDelaySecs)) * time.Second
+					scheduleNow := false
 
-				for i := range batches {
-					batch := batches[i]
-					//sectors := batch.sectors
-					// Process batch if slack has reached
-					if (time.Duration(batch.cutoff-ts.Height()) * time.Duration(build.BlockDelaySecs) * time.Second) < s.cfg.batch.Slack {
-						_, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = $3`, id, spid, batch.sectors)
-						if err != nil {
-							return false, xerrors.Errorf("updating task id: %w", err)
-						}
-						cmt = true
-						continue
-					}
-					// Process batch if timeout has reached
-					if batch.earliest.Add(s.cfg.batch.Timeout).After(time.Now()) {
-						_, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = $3`, id, spid, batch.sectors)
-						if err != nil {
-							return false, xerrors.Errorf("updating task id: %w", err)
-						}
-						cmt = true
-					}
-					// Process batch if base fee is low enough for us to send
-					if ts.MinTicketBlock().ParentBaseFee.LessThan(s.cfg.batch.BaseFeeThreshold) {
-						_, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = $3`, id, spid, batch.sectors)
-						if err != nil {
-							return false, xerrors.Errorf("updating task id: %w", err)
-						}
-						cmt = true
-						continue
-					}
-				}
-			}
-		}
-		return cmt, nil
-	})
+					// Slack
+					if timeUntil < s.cfg.batch.Slack {
+						scheduleNow = true
+					}
+
+					// Base fee check
+					if !scheduleNow {
+						if ts.MinTicketBlock().ParentBaseFee.LessThan(s.cfg.batch.BaseFeeThreshold) {
+							scheduleNow = true
+						}
+					}
+
+					// Timeout since earliestTime
+					if !scheduleNow && !earliestTime.IsZero() {
+						if time.Since(earliestTime) > s.cfg.batch.Timeout {
+							scheduleNow = true
+						}
+					}
+
+					if !scheduleNow {
+						// This group isn't ready to schedule. Move on to the next group.
+						continue
+					}
+
+					//----------------------------------
+					// 6) Actually schedule: set task_id_submit=taskID for chosen rows
+					//----------------------------------
+					for _, row := range toSchedule {
+						_, err = tx.Exec(`
+							UPDATE sectors_snap_pipeline
+							SET task_id_submit = $1,
+								submit_after = NULL
+							WHERE sp_id = $2
+								AND sector_number = $3
+								AND task_id_submit IS NULL
+						`, taskID, spID, row.SectorNumber)
+						if err != nil {
+							return false, xerrors.Errorf("failed to set task_id_submit: %w", err)
+						}
+					}
+
+					// We scheduled this group => commit & exit the transaction callback
+					return true, nil
+				}
+			}
+
+			// If we got here, we didn't find *any* group meeting conditions => no scheduling
+			// So let's set done = true to avoid indefinite looping.
+			done = true
+			return false, nil
+		})
+	}
 
 	// update landed
 	var tasks []struct {
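The conditions checked in step 5 above reduce to a three-way predicate: submit when slack is nearly used up, when the base fee is below the configured threshold, or when the oldest ready sector has waited past the batch timeout. A self-contained restatement under those assumptions (blockDelay stands in for build.BlockDelaySecs; the function and parameter names here are illustrative, not the commit's API):

package main

import (
	"fmt"
	"time"
)

func shouldSchedule(startEpoch, headEpoch int64, blockDelay, slack, timeout time.Duration,
	baseFeeBelowThreshold bool, earliestReady time.Time) bool {

	timeUntil := time.Duration(startEpoch-headEpoch) * blockDelay
	switch {
	case timeUntil < slack: // about to miss the earliest start epoch
		return true
	case baseFeeBelowThreshold: // fees are cheap, flush the batch now
		return true
	case !earliestReady.IsZero() && time.Since(earliestReady) > timeout:
		return true // batch has waited long enough to fill up
	default:
		return false
	}
}

func main() {
	ready := time.Now().Add(-30 * time.Minute)
	// 100 epochs x 30s = 50 minutes until start; under 1h of slack => send.
	fmt.Println(shouldSchedule(1000, 900, 30*time.Second, time.Hour, time.Hour, false, ready))
}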
@@ -733,7 +763,7 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er

 		return xerrors.Errorf("sector info after prove message not found not as expected")
 	default:
-		return xerrors.Errorf("commit message failed with exit code %s", exitcode.ExitCode(execResult[0].ExecutedRcptExitCode))
+		return xerrors.Errorf("prove message (m:%s) failed with exit code %s", execResult[0].ProveMsgCID, exitcode.ExitCode(execResult[0].ExecutedRcptExitCode))
 	}
 
 	si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(sectorNum), types.EmptyTSK)
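Stepping back to the schedule() rewrite above: the old version tried to claim every eligible batch inside one transaction (accumulating `cmt`), while the new version claims at most one (sp_id, proof) group per transaction and loops until a pass finds nothing schedulable. A toy model of that control flow (plain Go; addTaskFunc merely stands in for harmonytask.AddTaskFunc, which runs each callback in its own transaction):

package main

import "fmt"

func main() {
	pending := []string{"batchA", "batchB", "batchC"}
	nextTaskID := int64(0)

	// Stand-in for harmonytask.AddTaskFunc: runs the callback with a fresh
	// task ID and "commits" only when the callback returns true.
	addTaskFunc := func(cb func(taskID int64) (commit bool, err error)) {
		nextTaskID++
		if commit, _ := cb(nextTaskID); commit {
			fmt.Println("committed task", nextTaskID)
		}
	}

	var done bool
	for !done {
		addTaskFunc(func(taskID int64) (bool, error) {
			if len(pending) == 0 {
				done = true // nothing schedulable => stop looping
				return false, nil
			}
			fmt.Printf("task %d claims %s\n", taskID, pending[0])
			pending = pending[1:]
			return true, nil // exactly one committed task per claimed batch
		})
	}
}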