Skip to content

Commit

Permalink
Merge pull request #64 from ipfs/feat/improve-flatfs
Browse files Browse the repository at this point in the history
Make flatfs robust
  • Loading branch information
Stebalien authored Feb 14, 2020
2 parents fdacf94 + 4ccc5e9 commit a348e6e
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: 2.1
orbs:
ci-go: ipfs/ci-go@0.1
ci-go: ipfs/ci-go@0.2.1

workflows:
version: 2
Expand Down
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
`go-ds-flatfs` is used by `go-ipfs` to store raw block contents on disk. It supports several sharding functions (prefix, suffix, next-to-last/*).

It is _not_ a general-purpose datastore and has several important restrictions.
See the restrictions section for details.

## Lead Maintainer

[Jakub Sztandera](https://github.com/kubuxu)
Expand All @@ -33,13 +36,21 @@
import "github.com/ipfs/go-ds-flatfs"
```

`go-ds-flatfs` uses [`Gx`](https://github.com/whyrusleeping/gx) and [`Gx-go`](https://github.com/whyrusleeping/gx-go) to handle dependencies. Run `make deps` to download and rewrite the imports to their fixed dependencies.

## Usage

Check the [GoDoc module documentation](https://godoc.org/github.com/ipfs/go-ds-flatfs) for an overview of this module's
functionality.

### Restrictions

FlatFS keys are severely restricted. Only keys that match `/[0-9A-Z+_=-]+` are
allowed. That is, keys may only contain upper-case alpha-numeric characters,
'-', '+', '_', and '='. This is because values are written directly to the
filesystem without encoding.

Importantly, this means namespaced keys (e.g., /FOO/BAR) are _not_ allowed.
Attempts to write to such keys will result in an error.

### DiskUsage and Accuracy

This datastore implements the [`PersistentDatastore`](https://godoc.org/github.com/ipfs/go-datastore#PersistentDatastore) interface. It offers a `DiskUsage()` method which strives to find a balance between accuracy and performance. This implies:
Expand Down
2 changes: 1 addition & 1 deletion convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func Move(oldPath string, newPath string, out io.Writer) error {
}
} else {
// else we found something unexpected, so to be safe just move it
log.Warningf("found unexpected file in datastore directory: \"%s\", moving anyway\n", fn)
log.Warnw("found unexpected file in datastore directory, moving anyways", "file", fn)
newPath := filepath.Join(newDS.path, fn)
err := os.Rename(oldPath, newPath)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package flatfs_test

import (
"bytes"
"encoding/hex"
"encoding/base32"
"io/ioutil"
"math/rand"
"os"
Expand Down Expand Up @@ -205,7 +205,7 @@ func populateDatastore(t *testing.T, dir string) ([]datastore.Key, [][]byte) {
r.Read(blk)
blocks = append(blocks, blk)

key := "x" + hex.EncodeToString(blk[:8])
key := "X" + base32.StdEncoding.EncodeToString(blk[:8])
keys = append(keys, datastore.NewKey(key))
err := ds.Put(keys[i], blocks[i])
if err != nil {
Expand Down
103 changes: 76 additions & 27 deletions flatfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ var (
ErrDatastoreDoesNotExist = errors.New("datastore directory does not exist")
ErrShardingFileMissing = fmt.Errorf("%s file not found in datastore", SHARDING_FN)
ErrClosed = errors.New("datastore closed")
ErrInvalidKey = errors.New("key not supported by flatfs")
)

func init() {
Expand Down Expand Up @@ -361,6 +362,10 @@ var putMaxRetries = 6
// concurrent Put and a Delete operation, we cannot guarantee which one
// will win.
func (fs *Datastore) Put(key datastore.Key, value []byte) error {
if !keyIsValid(key) {
return fmt.Errorf("when putting '%q': %w", key, ErrInvalidKey)
}

fs.shutdownLock.RLock()
defer fs.shutdownLock.RUnlock()
if fs.shutdown {
Expand Down Expand Up @@ -580,6 +585,11 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
}

func (fs *Datastore) Get(key datastore.Key) (value []byte, err error) {
// Can't exist in datastore.
if !keyIsValid(key) {
return nil, datastore.ErrNotFound
}

_, path := fs.encode(key)
data, err := ioutil.ReadFile(path)
if err != nil {
Expand All @@ -593,6 +603,11 @@ func (fs *Datastore) Get(key datastore.Key) (value []byte, err error) {
}

func (fs *Datastore) Has(key datastore.Key) (exists bool, err error) {
// Can't exist in datastore.
if !keyIsValid(key) {
return false, nil
}

_, path := fs.encode(key)
switch _, err := os.Stat(path); {
case err == nil:
Expand All @@ -605,6 +620,11 @@ func (fs *Datastore) Has(key datastore.Key) (exists bool, err error) {
}

func (fs *Datastore) GetSize(key datastore.Key) (size int, err error) {
// Can't exist in datastore.
if !keyIsValid(key) {
return -1, datastore.ErrNotFound
}

_, path := fs.encode(key)
switch s, err := os.Stat(path); {
case err == nil:
Expand All @@ -620,6 +640,11 @@ func (fs *Datastore) GetSize(key datastore.Key) (size int, err error) {
// the Put() explanation about the handling of concurrent write
// operations to the same key.
func (fs *Datastore) Delete(key datastore.Key) error {
// Can't exist in datastore.
if !keyIsValid(key) {
return nil
}

fs.shutdownLock.RLock()
defer fs.shutdownLock.RUnlock()
if fs.shutdown {
Expand Down Expand Up @@ -654,17 +679,16 @@ func (fs *Datastore) doDelete(key datastore.Key) error {

func (fs *Datastore) Query(q query.Query) (query.Results, error) {
prefix := datastore.NewKey(q.Prefix).String()
if (prefix != "/") ||
len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 ||
!q.KeysOnly ||
q.ReturnExpirations ||
q.ReturnsSizes {
// TODO this is overly simplistic, but the only caller is
// `ipfs refs local` for now, and this gets us moving.
return nil, errors.New("flatfs only supports listing all keys in random order")
if prefix != "/" {
// This datastore can't include keys with multiple components.
// Therefore, it's always correct to return an empty result when
// the user requests a filter by prefix.
log.Warnw(
"flatfs was queried with a key prefix but flatfs only supports keys at the root",
"prefix", q.Prefix,
"query", q,
)
return query.ResultsWithEntries(q, nil), nil
}

// Replicates the logic in ResultsWithChan but actually respects calls
Expand All @@ -682,7 +706,9 @@ func (fs *Datastore) Query(q query.Query) (query.Results, error) {
})
go b.Process.CloseAfterChildren() //nolint

return b.Results(), nil
// We don't apply _any_ of the query logic ourselves so we'll leave it
// all up to the naive query engine.
return query.NaiveQueryApply(q, b.Results()), nil
}

func (fs *Datastore) walkTopLevel(path string, result *query.ResultBuilder) error {
Expand Down Expand Up @@ -893,7 +919,7 @@ func (fs *Datastore) checkpointLoop() {
if !more { // shutting down
fs.writeDiskUsageFile(du, true)
if fs.dirty {
log.Errorf("could not store final value of disk usage to file, future estimates may be inaccurate")
log.Error("could not store final value of disk usage to file, future estimates may be inaccurate")
}
return
}
Expand Down Expand Up @@ -925,7 +951,7 @@ func (fs *Datastore) checkpointLoop() {
func (fs *Datastore) writeDiskUsageFile(du int64, doSync bool) {
tmp, err := ioutil.TempFile(fs.path, "du-")
if err != nil {
log.Warningf("cound not write disk usage: %v", err)
log.Warnw("could not write disk usage", "error", err)
return
}

Expand All @@ -941,24 +967,24 @@ func (fs *Datastore) writeDiskUsageFile(du int64, doSync bool) {
toWrite.DiskUsage = du
encoder := json.NewEncoder(tmp)
if err := encoder.Encode(&toWrite); err != nil {
log.Warningf("cound not write disk usage: %v", err)
log.Warnw("cound not write disk usage", "error", err)
return
}

if doSync {
if err := tmp.Sync(); err != nil {
log.Warningf("cound not sync %s: %v", DiskUsageFile, err)
log.Warnw("cound not sync", "error", err, "file", DiskUsageFile)
return
}
}

if err := tmp.Close(); err != nil {
log.Warningf("cound not write disk usage: %v", err)
log.Warnw("cound not write disk usage", "error", err)
return
}

if err := os.Rename(tmp.Name(), filepath.Join(fs.path, DiskUsageFile)); err != nil {
log.Warningf("cound not write disk usage: %v", err)
log.Warnw("cound not write disk usage", "error", err)
return
}
removed = true
Expand Down Expand Up @@ -1006,7 +1032,7 @@ func (fs *Datastore) Accuracy() string {
return string(fs.storedValue.Accuracy)
}

func (fs *Datastore) walk(path string, result *query.ResultBuilder) error {
func (fs *Datastore) walk(path string, qrb *query.ResultBuilder) error {
dir, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
Expand Down Expand Up @@ -1038,17 +1064,35 @@ func (fs *Datastore) walk(path string, result *query.ResultBuilder) error {

key, ok := fs.decode(fn)
if !ok {
log.Warningf("failed to decode flatfs entry: %s", fn)
log.Warnw("failed to decode flatfs entry", "file", fn)
continue
}

var result query.Result
result.Key = key.String()
if !qrb.Query.KeysOnly {
value, err := ioutil.ReadFile(filepath.Join(path, fn))
if err != nil {
result.Error = err
} else {
// NOTE: Don't set the value/size on error. We
// don't want to return partial values.
result.Value = value
result.Size = len(value)
}
} else if qrb.Query.ReturnsSizes {
var stat os.FileInfo
stat, err := os.Stat(filepath.Join(path, fn))
if err != nil {
result.Error = err
} else {
result.Size = int(stat.Size())
}
}

select {
case result.Output <- query.Result{
Entry: query.Entry{
Key: key.String(),
},
}:
case <-result.Process.Closing():
case qrb.Output <- result:
case <-qrb.Process.Closing():
return nil
}
}
Expand Down Expand Up @@ -1090,12 +1134,17 @@ func (fs *Datastore) Batch() (datastore.Batch, error) {
}

// Put stages val under key; the actual write to disk happens when the
// batch is committed. Keys that flatfs cannot store (see keyIsValid) are
// rejected immediately, mirroring the eager validation in Datastore.Put.
func (bt *flatfsBatch) Put(key datastore.Key, val []byte) error {
	if keyIsValid(key) {
		bt.puts[key] = val
		return nil
	}
	return fmt.Errorf("when putting '%q': %w", key, ErrInvalidKey)
}

// Delete stages key for removal at commit time. A key that fails
// keyIsValid can never have been stored by flatfs, so deleting it is
// treated as a successful no-op rather than an error.
func (bt *flatfsBatch) Delete(key datastore.Key) error {
	if !keyIsValid(key) {
		// Nothing to stage: such a key cannot exist on disk.
		return nil
	}
	bt.deletes[key] = struct{}{}
	return nil
}

Expand Down
Loading

0 comments on commit a348e6e

Please sign in to comment.