Skip to content

Commit

Permalink
fix: Make batch sealing retry correct (#367)
Browse files Browse the repository at this point in the history
* proof: Improve DecodeCommit1OutRaw error message

* fix: Improve supraseal batch ref tracking

* fix: Improve supraseal batch ref tracking

* fix: Catch Batch C1 exceptions

* supra library version check

* improve slotmgr metrics, fix non-zero slot

* slotmgr expvar

* fix deadlock

* batch: correctly collect sector numbers

* fix slotmgr in finalize

* storage: Recompute bad supra C1

* storage: Correctly retry supra C1 compute

* porep: More generous retry

* storage: Only do 1 supra C1 in parallel

* make gen
  • Loading branch information
magik6k authored Jan 24, 2025
1 parent fb8ca64 commit 20e110a
Show file tree
Hide file tree
Showing 11 changed files with 475 additions and 150 deletions.
7 changes: 3 additions & 4 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,18 +328,17 @@ func addSealingTasks(
}

if cfg.Subsystems.EnableBatchSeal {
slotMgr = slotmgr.NewSlotMgr()

batchSealTask, err := sealsupra.NewSupraSeal(
batchSealTask, sm, err := sealsupra.NewSupraSeal(
cfg.Seal.BatchSealSectorSize,
cfg.Seal.BatchSealBatchSize,
cfg.Seal.BatchSealPipelines,
!cfg.Seal.SingleHasherPerThread,
cfg.Seal.LayerNVMEDevices,
machineHostPort, slotMgr, db, full, stor, si)
machineHostPort, db, full, stor, si)
if err != nil {
return nil, xerrors.Errorf("setting up batch sealer: %w", err)
}
slotMgr = sm
activeTasks = append(activeTasks, batchSealTask)
addFinalize = true
}
Expand Down
51 changes: 32 additions & 19 deletions extern/supraseal/sealing/supra_seal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ static void init_ctx(size_t sector_size) {
}
}

extern "C"
int supra_version() {
  // Library/ABI version constant exposed to the Go side so it can verify
  // at startup that the linked supraseal library matches what it expects.
  // NOTE(review): 0x100001 presumably encodes a structured version number
  // (e.g. major/minor/patch nibbles) — confirm against the Go-side check.
  constexpr int kSupraVersion = 0x100001;
  return kSupraVersion;
}

extern "C"
void supra_seal_init(size_t sector_size, const char* config_file) {
printf("INIT called %s\n", config_file);
Expand Down Expand Up @@ -207,29 +212,37 @@ int c1(size_t block_offset, size_t num_sectors, size_t sector_slot,
const uint8_t* ticket, const char* cache_path,
const char* parents_filename, const char* replica_path,
size_t sector_size) {
size_t qpair = sealing_ctx->topology->c1_qpair;
int node_reader_core = sealing_ctx->topology->c1_reader;
const char* output_dir = cache_path;
try {
size_t qpair = sealing_ctx->topology->c1_qpair;
int node_reader_core = sealing_ctx->topology->c1_reader;
const char* output_dir = cache_path;

init_ctx(sector_size);
init_ctx(sector_size);

#define CALL_C1(C) \
{ \
streaming_node_reader_t<C> reader(sealing_ctx->controllers, qpair, \
block_offset, node_reader_core, \
sealing_ctx->topology->c1_sleep_time); \
return do_c1<C>(reader, \
num_sectors, sector_slot, \
replica_id, seed, \
ticket, cache_path, \
parents_filename, replica_path, \
output_dir); \
}
#define CALL_C1(C) \
{ \
streaming_node_reader_t<C> reader(sealing_ctx->controllers, qpair, \
block_offset, node_reader_core, \
sealing_ctx->topology->c1_sleep_time); \
return do_c1<C>(reader, \
num_sectors, sector_slot, \
replica_id, seed, \
ticket, cache_path, \
parents_filename, replica_path, \
output_dir); \
}

SECTOR_PARAMS_TABLE(SECTOR_CALL_TABLE(CALL_C1));
#undef CALL_C1
SECTOR_PARAMS_TABLE(SECTOR_CALL_TABLE(CALL_C1));
#undef CALL_C1

return 0;
return 0;
} catch (const std::exception& e) {
fprintf(stderr, "Exception in c1: %s\n", e.what());
std::terminate();
} catch (...) {
fprintf(stderr, "Unknown exception in c1\n");
std::terminate();
}
}

template<class C>
Expand Down
1 change: 1 addition & 0 deletions extern/supraseal/sealing/supra_seal.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
extern "C" {
#endif

int supra_version();

// Optional init function.
// config_file - topology config file. Defaults to supra_config.cfg
Expand Down
84 changes: 52 additions & 32 deletions lib/paths/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,8 @@ func (st *Local) ReadSnapVanillaProof(ctx context.Context, sr storiface.SectorRe
return out, nil
}

var supraC1Token = make(chan struct{}, 1)

func (st *Local) supraPoRepVanillaProof(src storiface.SectorPaths, sr storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) {
batchMetaPath := filepath.Join(src.Cache, BatchMetaFile)
bmdata, err := os.ReadFile(batchMetaPath)
Expand Down Expand Up @@ -1178,49 +1180,67 @@ func (st *Local) supraPoRepVanillaProof(src storiface.SectorPaths, sr storiface.

// first see if commit-phase1-output is there
commitPhase1OutputPath := filepath.Join(src.Cache, CommitPhase1OutputFileSupra)
if _, err := os.Stat(commitPhase1OutputPath); err != nil {
if !os.IsNotExist(err) {
return nil, xerrors.Errorf("stat commit phase1 output: %w", err)
}

parentsPath, err := ParentsForProof(sr.ProofType)
if err != nil {
return nil, xerrors.Errorf("parents for proof: %w", err)
}
var retry bool

// not found, compute it
res := supraffi.C1(bm.BlockOffset, bm.BatchSectors, bm.NumInPipeline, replicaID[:], seed, ticket, src.Cache, parentsPath, src.Sealed, uint64(ssize))
if res != 0 {
return nil, xerrors.Errorf("c1 failed: %d", res)
for {
if retry {
if err := os.Remove(commitPhase1OutputPath); err != nil {
return nil, xerrors.Errorf("remove bad commit phase 1 output file: %w", err)
}
}
retry = true

// check again
if _, err := os.Stat(commitPhase1OutputPath); err != nil {
return nil, xerrors.Errorf("stat commit phase1 output after compute: %w", err)
if !os.IsNotExist(err) {
return nil, xerrors.Errorf("stat commit phase1 output: %w", err)
}

parentsPath, err := ParentsForProof(sr.ProofType)
if err != nil {
return nil, xerrors.Errorf("parents for proof: %w", err)
}

// not found, compute it
supraC1Token <- struct{}{}
res := supraffi.C1(bm.BlockOffset, bm.BatchSectors, bm.NumInPipeline, replicaID[:], seed, ticket, src.Cache, parentsPath, src.Sealed, uint64(ssize))
<-supraC1Token

if res != 0 {
return nil, xerrors.Errorf("c1 failed: %d", res)
}

// check again
if _, err := os.Stat(commitPhase1OutputPath); err != nil {
return nil, xerrors.Errorf("stat commit phase1 output after compute: %w", err)
}
}
}

// read the output
rawOut, err := os.ReadFile(commitPhase1OutputPath)
if err != nil {
return nil, xerrors.Errorf("read commit phase1 output: %w", err)
}
// read the output
rawOut, err := os.ReadFile(commitPhase1OutputPath)
if err != nil {
return nil, xerrors.Errorf("read commit phase1 output: %w", err)
}

// decode
dec, err := cuproof.DecodeCommit1OutRaw(bytes.NewReader(rawOut))
if err != nil {
return nil, xerrors.Errorf("decode commit phase1 output: %w", err)
}
// decode
dec, err := cuproof.DecodeCommit1OutRaw(bytes.NewReader(rawOut))
if err != nil {
log.Errorw("failed to decode commit phase1 output, will retry", "err", err)
time.Sleep(1 * time.Second)
continue
}

log.Infow("supraPoRepVanillaProof", "sref", sr, "replicaID", replicaID, "seed", seed, "ticket", ticket, "decrepl", dec.ReplicaID, "decr", dec.CommR, "decd", dec.CommD)
log.Infow("supraPoRepVanillaProof", "sref", sr, "replicaID", replicaID, "seed", seed, "ticket", ticket, "decrepl", dec.ReplicaID, "decr", dec.CommR, "decd", dec.CommD)

// out is json, so we need to marshal it back
out, err := json.Marshal(dec)
if err != nil {
return nil, xerrors.Errorf("marshal commit phase1 output: %w", err)
}
// out is json, so we need to marshal it back
out, err := json.Marshal(dec)
if err != nil {
log.Errorw("failed to decode commit phase1 output", "err", err)
time.Sleep(1 * time.Second)
}

return out, nil
return out, nil
}
}

var _ Store = &Local{}
13 changes: 11 additions & 2 deletions lib/proof/porep_vproof_bin_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/binary"
"fmt"
"io"

"golang.org/x/xerrors"
)

// This file contains a bincode decoder for Commit1OutRaw.
Expand Down Expand Up @@ -67,8 +69,15 @@ func DecodeCommit1OutRaw(r io.Reader) (Commit1OutRaw, error) {
}

// Read last byte, require EOF
if _, err := r.Read(make([]byte, 1)); err != io.EOF {
return out, fmt.Errorf("expected EOF")
b := make([]byte, 1)
if n, err := r.Read(b); err != io.EOF {
if err != nil {
return out, xerrors.Errorf("expected EOF, got: %w (n:%d, b:%x)", err, n, b)
}

n2, derr := io.Copy(io.Discard, r)

return out, xerrors.Errorf("expected EOF (ndc:%d, b:%x, derr:%x)", int64(n)+n2, b, derr)
}

return out, nil
Expand Down
36 changes: 34 additions & 2 deletions lib/slotmgr/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package slotmgr
import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
pre = "slotmgr_"

// KeySlotOffset tags metrics with the slot offset.
KeySlotOffset, _ = tag.NewKey("slot_offset")
)

// SlotMgrMeasures groups all slotmgr metrics.
// SlotMgrMeasures groups the high-level slotmgr metrics.
var SlotMgrMeasures = struct {
SlotsAvailable *stats.Int64Measure
SlotsAcquired *stats.Int64Measure
Expand All @@ -22,25 +26,53 @@ var SlotMgrMeasures = struct {
SlotErrors: stats.Int64(pre+"slot_errors", "Total number of slot errors (e.g., failed to put).", stats.UnitDimensionless),
}

// init registers the views for slotmgr metrics.
// SlotMgrSlotMeasures groups per-slot metrics.
var SlotMgrSlotMeasures = struct {
SlotInUse *stats.Int64Measure
SlotSectorCount *stats.Int64Measure
}{
SlotInUse: stats.Int64(pre+"slot_in_use", "Slot actively in use (batch sealing). 1=in use, 0=not in use", stats.UnitDimensionless),
SlotSectorCount: stats.Int64(pre+"slot_sector_count", "Number of sectors in the slot", stats.UnitDimensionless),
}

func init() {
err := view.Register(
&view.View{
Measure: SlotMgrMeasures.SlotsAvailable,
Description: "Number of available slots",
Aggregation: view.LastValue(),
},
&view.View{
Measure: SlotMgrMeasures.SlotsAcquired,
Description: "Total number of slots acquired",
Aggregation: view.Sum(),
},
&view.View{
Measure: SlotMgrMeasures.SlotsReleased,
Description: "Total number of slots released",
Aggregation: view.Sum(),
},
&view.View{
Measure: SlotMgrMeasures.SlotErrors,
Description: "Total number of slot errors",
Aggregation: view.Sum(),
},

// Register per-slot metrics
&view.View{
Name: pre + "slot_in_use",
Measure: SlotMgrSlotMeasures.SlotInUse,
Description: "Slot is in use (1) or not (0)",
TagKeys: []tag.Key{KeySlotOffset},
Aggregation: view.LastValue(),
},
&view.View{
Name: pre + "slot_sector_count",
Measure: SlotMgrSlotMeasures.SlotSectorCount,
Description: "Number of sectors in a slot",
TagKeys: []tag.Key{KeySlotOffset},
Aggregation: view.LastValue(),
},
)
if err != nil {
panic(err)
Expand Down
Loading

0 comments on commit 20e110a

Please sign in to comment.