Skip to content

Commit

Permalink
write zstd to file, fix blocking select in custom reader
Browse files Browse the repository at this point in the history
* do some refactoring so we can pipe the zstd encoded snapshot to file for debugging
* select statement in custom reader was blocking without setting default
  • Loading branch information
ognots committed Apr 21, 2023
1 parent a76b2c2 commit 3cb7c09
Showing 1 changed file with 63 additions and 37 deletions.
100 changes: 63 additions & 37 deletions cmd/filecoin-chain-archiver/cmds/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (sr *snapshotReader) Read(p []byte) (n int, err error) {
return n, err
}
return n, io.EOF
default:
}
}
}
Expand Down Expand Up @@ -380,10 +381,46 @@ var cmdCreate = &cli.Command{
}
}()

rrPath := filepath.Join(flagExportDir, flagFileName)
for {
info, err := os.Stat(rrPath)
if os.IsNotExist(err) {
logger.Infow("waiting for snapshot car file to begin writing")
time.Sleep(time.Second * 15)
continue
} else if info.IsDir() {
return xerrors.Errorf("trying to open directory instead of car file")
}
break
}

rr := newSnapshotReader(rrPath, errCh)

go func() {
var lastSize int64
for {
select {
case <-time.After(flagProgressUpdate):
size := e.Progress(rrPath)
if size == 0 {
continue
}
logger.Infow("update", "total", size, "speed", (size-lastSize)/int64(flagProgressUpdate/time.Second))
lastSize = size
case err := <- errCh:
if err != nil {
break
}
}
}
}()

if flagDiscard {
logger.Infow("discarding output")
g, _ := errgroup.WithContext(ctx)

g, ctxGroup := errgroup.WithContext(ctx)
g.Go(func() error {
return runWriteCompressed(ctxGroup, flagFileName+".zstd", rr)
})
if err := g.Wait(); err != nil {
return err
}
Expand All @@ -392,36 +429,6 @@ var cmdCreate = &cli.Command{
return err
}
} else {
rrPath := filepath.Join(flagExportDir, flagFileName)
for {
info, err := os.Stat(rrPath)
if os.IsNotExist(err) {
logger.Infow("waiting for snapshot car file to begin writing")
time.Sleep(time.Second * 15)
continue
} else if info.IsDir() {
return xerrors.Errorf("trying to open directory instead of car file")
}
break
}

rr := newSnapshotReader(rrPath, errCh)

go func() {
var lastSize int64
for {
select {
case <-time.After(flagProgressUpdate):
size := e.Progress(rrPath)
if size == 0 {
continue
}
logger.Infow("update", "total", size, "speed", (size-lastSize)/int64(flagProgressUpdate/time.Second))
lastSize = size
}
}
}()

host := u.Hostname()
port := u.Port()
if port == "" {
Expand Down Expand Up @@ -503,13 +510,32 @@ var cmdCreate = &cli.Command{
},
}

func runUploadCompressed(ctx context.Context, minioClient *minio.Client, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID string, bt time.Time, source io.Reader) (*snapshotInfo, error) {

r1, w1 := io.Pipe()
func compress(source io.Reader) io.Reader {
r, w := io.Pipe()
go func() {
Compress(source, w1)
w1.Close()
Compress(source, w)
w.Close()
}()
return r
}

func runWriteCompressed(ctx context.Context, name string, source io.Reader) error {
file, err := os.Create(name)
if err != nil {
return err
}
r := compress(source)
n, err := io.Copy(file, r)
if err != nil {
return err
}
logger.Infow("data copied to file:", n)
return nil
}

func runUploadCompressed(ctx context.Context, minioClient *minio.Client, flagBucket, flagNamePrefix, flagRetrievalEndpointPrefix, name, peerID string, bt time.Time, source io.Reader) (*snapshotInfo, error) {
r1 := compress(source)

h := sha256.New()
r := io.TeeReader(r1, h)

Expand Down

0 comments on commit 3cb7c09

Please sign in to comment.