Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use ChainExportRangeInternal to write snapshots directly to disk #67

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 100 additions & 122 deletions cmd/filecoin-chain-archiver/cmds/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"io"
"math/rand"
"net/url"
"os"
"path/filepath"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -41,53 +43,37 @@ func Compress(in io.Reader, out io.Writer) error {
return enc.Close()
}

// autocloser adapts an io.ReadCloser into a reader that closes the wrapped
// stream by itself as soon as a read reports an error.
type autocloser struct {
	rc io.ReadCloser
}

// Read forwards the call to the wrapped stream. Any error from the wrapped
// Read (io.EOF included) triggers a Close of the stream before the byte count
// and that same error are handed back to the caller.
func (ac autocloser) Read(p []byte) (int, error) {
	count, readErr := ac.rc.Read(p)
	if readErr == nil {
		return count, nil
	}

	// Best effort: the caller acts on the read error, so a failure to close
	// is intentionally not reported.
	_ = ac.rc.Close()
	return count, readErr
}

func AutoCloser(rc io.ReadCloser) io.Reader {
return &autocloser{rc}
// snapshotInfo carries the facts recorded about one uploaded snapshot object.
// The upload helpers in this file return it after a successful upload.
type snapshotInfo struct {
	// digest is the hex-encoded SHA-256 of the bytes that were uploaded.
	digest string

	// size is the object size as reported by the object store.
	size int64

	// filename is the object's file name, without the bucket name prefix.
	filename string

	// latestIndex is set to "latest" by the visible upload code.
	// NOTE(review): presumably the index key under which the newest
	// snapshot is published — confirm against the code that consumes it.
	latestIndex string

	// latestLocation is the retrieval endpoint prefix joined with the
	// object's key.
	latestLocation string
}

type multi struct {
io.Writer
cs []io.Closer
// snapshotReader reads a snapshot file while another goroutine is still
// producing it. Reaching the current end of the file is not treated as the end
// of the stream until the producer has reported, through errCh, that it has
// finished.
//
// errCh is expected to deliver exactly one value: nil when the export
// succeeded, or the export error. That value must reach this reader; if some
// other goroutine receives it first, the reader never learns that the export
// finished.
type snapshotReader struct {
	reader io.Reader
	errCh  chan error

	// done is set once errCh has reported a successful export. From then on
	// the file can no longer grow, so reads pass straight through and the
	// underlying io.EOF is final.
	done bool
}

// Read implements io.Reader.
//
//   - While the export is running, io.EOF from the underlying reader only
//     means "no more data yet" and is reported as (n, nil). Callers therefore
//     have to tolerate (0, nil) results and poll, as io.Copy does.
//   - Any other error from the underlying reader is returned as is.
//   - An export error received from errCh is returned immediately.
//   - After a successful completion signal the remaining bytes of the file are
//     still delivered; the stream ends with the io.EOF of the underlying
//     reader, so no trailing data is cut off.
func (sr *snapshotReader) Read(p []byte) (n int, err error) {
	n, err = sr.reader.Read(p)
	if sr.done {
		// The producer is finished: whatever the file reports is final.
		return n, err
	}
	if err != nil && err != io.EOF {
		return n, err
	}

	select {
	case exportErr := <-sr.errCh:
		if exportErr != nil {
			return n, exportErr
		}
		// Do not return io.EOF here: data written between this read and the
		// completion signal would be lost. Later calls drain the file.
		sr.done = true
	default:
		// Export still in progress.
	}
	return n, nil
}

// newSnapshotReader returns a snapshotReader that reads from reader and
// treats the value delivered on errChan as the export's completion signal.
func newSnapshotReader(reader io.Reader, errChan chan error) *snapshotReader {
	return &snapshotReader{
		reader: reader,
		errCh:  errChan,
	}
}

// snapshotInfo carries the facts recorded about one uploaded snapshot object.
// The upload helpers in this file return it after a successful upload.
type snapshotInfo struct {
	// digest is the hex-encoded SHA-256 of the bytes that were uploaded.
	digest string

	// size is the object size as reported by the object store.
	size int64

	// filename is the object's file name, without the bucket name prefix.
	filename string

	// latestIndex is set to "latest" by the visible upload code.
	// NOTE(review): presumably the index key under which the newest
	// snapshot is published — confirm against the code that consumes it.
	latestIndex string

	// latestLocation is the retrieval endpoint prefix joined with the
	// object's key.
	latestLocation string
}

var cmdCreate = &cli.Command{
Expand Down Expand Up @@ -193,12 +179,22 @@ var cmdCreate = &cli.Command{
EnvVars: []string{"FCA_CREATE_STATEROOT_COUNT"},
Value: 2000,
},
&cli.StringFlag{
Name: "filename",
Usage: "name of exported CAR file for internal chain export",
EnvVars: []string{"FCA_EXPORT_FILENAME"},
},
&cli.DurationFlag{
Name: "progress-update",
Usage: "how frequenty to provide provide update logs",
EnvVars: []string{"FCA_CREATE_PROGRESS_UPDATE"},
Value: 60 * time.Second,
},
&cli.StringFlag{
Name: "export-dir",
Usage: "directory where to save the exported CAR file",
EnvVars: []string{"FCA_EXPORT_DIR"},
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
Expand All @@ -215,9 +211,11 @@ var cmdCreate = &cli.Command{
flagConfigPath := cctx.String("config-path")
flagInterval := cctx.Int("interval")
flagConfidence := cctx.Int("confidence")
flagHeight := cctx.Int("height")
flagAfter := cctx.Int("after")
flagHeight := cctx.Int("height")
flagStaterootCount := cctx.Int("stateroot-count")
flagExportDir := cctx.String("export-dir")
flagFileName := cctx.String("filename")

u, err := url.Parse(flagBucketEndpoint)
if err != nil {
Expand Down Expand Up @@ -299,7 +297,13 @@ var cmdCreate = &cli.Command{
time.Sleep(time.Until(t))
bt := time.Now()

tsk, err := cm.GetTipset(ctx, height)
headTs, err := cm.GetTipset(ctx, height)
if err != nil {
return err
}

tailHeight := flagHeight - flagStaterootCount
tailTs, err := cm.GetTipset(ctx, abi.ChainEpoch(tailHeight))
if err != nil {
return err
}
Expand All @@ -324,7 +328,7 @@ var cmdCreate = &cli.Command{
logger.Infow("iteration", "value", iteration)
cm.ShiftStartNode(iteration)

node, peerID, err := cm.GetNodeWithTipSet(ctx, tsk, filterList)
node, peerID, err := cm.GetNodeWithTipSet(ctx, headTs, filterList)
if err != nil {
return err
}
Expand All @@ -340,12 +344,7 @@ var cmdCreate = &cli.Command{
return xerrors.Errorf("failed to aquire lock")
}

rr, wr := io.Pipe()
rc, wc := io.Pipe()

mw := MultiWriteCloser(wr, wc)

e := export.NewExport(node, tsk, abi.ChainEpoch(flagStaterootCount), true, mw)
e := export.NewExport(node, headTs, tailTs, flagFileName, flagExportDir)
errCh := make(chan error)
go func() {
errCh <- e.Export(ctx)
Expand All @@ -372,39 +371,51 @@ var cmdCreate = &cli.Command{
}
}()

rrPath := filepath.Join(flagExportDir, flagFileName)
for {
info, err := os.Stat(rrPath)
if os.IsNotExist(err) {
logger.Infow("waiting for snapshot car file to begin writing")
time.Sleep(time.Second * 15)
continue
} else if info.IsDir() {
return xerrors.Errorf("trying to open directory instead of car file")
}
break
}

f, err := os.OpenFile(rrPath, os.O_RDONLY, 444)
if err != nil {
return err
}
defer f.Close()
rr := newSnapshotReader(f, errCh)

go func() {
var lastSize int
var lastSize int64
for {
select {
case <-time.After(flagProgressUpdate):
size, done := e.Progress()
size := e.Progress(rrPath)
if size == 0 {
continue
}

if done {
return
}

logger.Infow("update", "total", size, "speed", (size-lastSize)/int(flagProgressUpdate/time.Second))
logger.Infow("update", "total", size, "speed", (size-lastSize)/int64(flagProgressUpdate/time.Second))
lastSize = size
case err := <-errCh:
if err != nil {
break
}
}
}
}()

if flagDiscard {
logger.Infow("discarding output")
g, _ := errgroup.WithContext(ctx)

g.Go(func() error {
_, err := io.Copy(io.Discard, rr)
return err
})
g, ctxGroup := errgroup.WithContext(ctx)
g.Go(func() error {
_, err := io.Copy(io.Discard, rc)
return err
return runWriteCompressed(ctxGroup, rrPath+".zstd", rr)
})

if err := g.Wait(); err != nil {
return err
}
Expand Down Expand Up @@ -432,23 +443,15 @@ var cmdCreate = &cli.Command{
return err
}

t := export.TimeAtHeight(gtp, height, 30*time.Second)

name := fmt.Sprintf("%d_%s", height, t.Format("2006_01_02T15_04_05Z"))
//t := export.TimeAtHeight(gtp, height, 30*time.Second)

logger.Infow("object", "name", name)
logger.Infow("object", "name", flagFileName)

g, ctxGroup := errgroup.WithContext(ctx)
var siRaw *snapshotInfo
var siCompressed *snapshotInfo
g.Go(func() error {
var err error
siRaw, err = runUploadRaw(ctxGroup, minioClient, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID, bt, rr)
return err
})
g.Go(func() error {
var err error
siCompressed, err = runUploadCompressed(ctxGroup, minioClient, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID, bt, rc)
siCompressed, err = runUploadCompressed(ctxGroup, minioClient, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, flagFileName+".zstd", peerID, bt, rr)
return err
})
if err := g.Wait(); err != nil {
Expand All @@ -458,7 +461,7 @@ var cmdCreate = &cli.Command{
return err
}

sis := []*snapshotInfo{siRaw, siCompressed}
sis := []*snapshotInfo{siCompressed}

var sb strings.Builder
for _, x := range sis {
Expand All @@ -467,12 +470,12 @@ var cmdCreate = &cli.Command{

sha256sum := sb.String()

_, err = minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, name), strings.NewReader(sha256sum), -1, minio.PutObjectOptions{
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s.sha256sum\"", name),
_, err = minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, flagFileName), strings.NewReader(sha256sum), -1, minio.PutObjectOptions{
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s.sha256sum\"", flagFileName),
ContentType: "text/plain",
})
if err != nil {
logger.Errorw("failed to write sha256sum", "object", fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, name), "err", err)
logger.Errorw("failed to write sha256sum", "object", fmt.Sprintf("%s%s.sha256sum", flagNamePrefix, flagFileName), "err", err)
}

for _, x := range sis {
Expand Down Expand Up @@ -502,61 +505,36 @@ var cmdCreate = &cli.Command{
},
}

func runUploadRaw(ctx context.Context, minioClient *minio.Client, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID string, bt time.Time, source io.Reader) (*snapshotInfo, error) {
h := sha256.New()
r := io.TeeReader(source, h)

filename := fmt.Sprintf("%s.car", name)
// compress returns a reader that yields whatever Compress writes for source.
// The compression runs in its own goroutine and feeds the returned reader
// through an in-memory pipe, so the work advances only as the result is read.
//
// If Compress fails, its error is delivered to the consumer of the returned
// reader instead of a clean io.EOF, so a broken stream cannot be mistaken for
// a complete one.
func compress(source io.Reader) io.Reader {
	r, w := io.Pipe()
	go func() {
		// CloseWithError(nil) is equivalent to Close, so the success path is
		// unchanged. CloseWithError itself always returns nil.
		_ = w.CloseWithError(Compress(source, w))
	}()
	return r
}

info, err := minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s", flagNamePrefix, filename), r, -1, minio.PutObjectOptions{
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s\"", filename),
ContentType: "application/octet-stream",
})
func runWriteCompressed(ctx context.Context, path string, source io.Reader) error {
file, err := os.Create(path)
if err != nil {
return nil, fmt.Errorf("failed to upload object (%s): %w", fmt.Sprintf("%s%s", flagNamePrefix, filename), err)
return err
}

logger.Infow("snapshot upload",
"bucket", info.Bucket,
"key", info.Key,
"etag", info.ETag,
"size", info.Size,
"location", info.Location,
"version_id", info.VersionID,
"expiration", info.Expiration,
"expiration_rule_id", info.ExpirationRuleID,
)

snapshotSize := info.Size

latestLocation, err := url.JoinPath(flagRetrievalEndpointPrefix, info.Key)
r := compress(source)
n, err := io.Copy(file, r)
if err != nil {
logger.Errorw("failed to join request path", "request_prefix", flagRetrievalEndpointPrefix, "key", info.Key)
return nil, fmt.Errorf("failed to join request path: %w", err)
return err
}

digest := fmt.Sprintf("%x", h.Sum(nil))

return &snapshotInfo{
digest: digest,
size: snapshotSize,
filename: filename,
latestIndex: "latest",
latestLocation: latestLocation,
}, nil
logger.Infow("data copied to file:", n)
return nil
}

func runUploadCompressed(ctx context.Context, minioClient *minio.Client, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID string, bt time.Time, source io.Reader) (*snapshotInfo, error) {
r1 := compress(source)

r1, w1 := io.Pipe()
go func() {
Compress(source, w1)
w1.Close()
}()
h := sha256.New()
r := io.TeeReader(r1, h)

filename := fmt.Sprintf("%s.car.zst", name)
filename := name

info, err := minioClient.PutObject(ctx, flagBucket, fmt.Sprintf("%s%s", flagNamePrefix, filename), r, -1, minio.PutObjectOptions{
ContentDisposition: fmt.Sprintf("attachment; filename=\"%s\"", filename),
Expand Down
Loading