Skip to content

Commit

Permalink
[FDB BlobHandler] Update read and remove.
Browse files Browse the repository at this point in the history
This patch updates `read` and `remove` to take into account the size of the
`keys` and `values` for one FDB transaction.
  • Loading branch information
denis0x0D committed Dec 23, 2024
1 parent 50a9f82 commit ad67090
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 58 deletions.
11 changes: 11 additions & 0 deletions utils/fdb_wrapper_cpp/include/fdbcs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,21 @@ class BlobHandler
}

// Writes the given `blob` under the given `key`.
// This `write` is not atomic: the data is split across multiple FDB transactions. If one of these
// transactions fails, the caller should call `removeBlob` to clear the partially written data and then
// try `writeBlob` again.
bool writeBlob(std::shared_ptr<FDBCS::FDBDataBase> database, const ByteArray& key, const ByteArray& blob);

// Reads the `blob` stored under the given `key`.
// Returns {true, blob} on success and {false, ""} on error.
std::pair<bool, std::string> readBlob(std::shared_ptr<FDBCS::FDBDataBase> database, const ByteArray& key);

// Reads blocks for `keys`, starting at `keys[index]`, using a single FDB transaction. Reading stops when
// the keys are exhausted or when the size of the data read so far approaches the transaction size limit.
// `index` is advanced past every key that was read, so callers can loop until it reaches `keys.size()`.
// `dataBlockReached` is set to true if any of the read blocks is a data block.
// Returns {true, blocks} on success and {false, {}} on error.
std::pair<bool, std::vector<Block>> readBlocks(std::shared_ptr<FDBCS::FDBDataBase> database,
const std::vector<ByteArray>& keys, uint32_t& index,
bool& dataBlockReached);

// Removes the `blob` stored under the given `key`; returns false on error.
// This `remove` is not atomic: the keys to remove are split across multiple FDB transactions. If one of
// these transactions fails, the caller should call `removeBlob` again to remove the keys that were not
// removed.
bool removeBlob(std::shared_ptr<FDBCS::FDBDataBase> database, const ByteArray& key);

private:
Expand Down
143 changes: 85 additions & 58 deletions utils/fdb_wrapper_cpp/src/fdbcs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ std::pair<bool, ByteArray> Transaction::get(const ByteArray& key) const
return {false, {}};
}

err = fdb_future_get_error(future);
if (err)
{
fdb_future_destroy(future);
std::cerr << "fdb_future_get_error error, code: " << (int)err << std::endl;
return {false, {}};
}

const uint8_t* outValue;
int outValueLength;
fdb_bool_t present;
Expand All @@ -85,13 +93,9 @@ std::pair<bool, ByteArray> Transaction::get(const ByteArray& key) const

fdb_future_destroy(future);
if (present)
{
return {true, ByteArray(outValue, outValue + outValueLength)};
}
else
{
return {false, {}};
}
}
return {false, {}};
}
Expand Down Expand Up @@ -406,66 +410,82 @@ bool BlobHandler::isDataBlock(const Block& block)
return block.second.compare(0, keyBlockIdentifier.size(), keyBlockIdentifier) != 0;
}

// Reads consecutive blocks for `keys`, starting at `keys[index]`, within a single FDB transaction.
//
// Keys are read while the accumulated size of the keys and values read so far, plus one more block of
// `blockSizeInBytes_`, stays below `maxTnxSize_`. `index` is advanced past every key that was read, so the
// caller can call this function repeatedly until `index` reaches `keys.size()`.
// `dataBlockReached` is set to true as soon as one of the read blocks is a data block (see `isDataBlock`);
// it is never reset to false here.
//
// Returns {true, blocks} on success. Returns {false, {}} if the transaction cannot be created, if any key
// cannot be read, or if keys remain but not a single one fits into the transaction size budget.
std::pair<bool, std::vector<Block>> BlobHandler::readBlocks(std::shared_ptr<FDBCS::FDBDataBase> database,
                                                            const std::vector<ByteArray>& keys,
                                                            uint32_t& index, bool& dataBlockReached)
{
  auto tnx = database->createTransaction();
  if (!tnx)
    return {false, {}};

  const uint32_t firstIndex = index;
  size_t currentTnxSize = 0;
  std::vector<Block> blocks;
  // Take into account the size of the data that was already read: stop before the next block could push
  // the transaction over its size limit.
  while ((index < keys.size()) && (currentTnxSize + blockSizeInBytes_ < maxTnxSize_))
  {
    const auto& key = keys[index];
    auto p = tnx->get(key);
    if (!p.first)
      return {false, {}};
    currentTnxSize += key.size() + p.second.size();

    // The value is not needed after this point, so move it into the block instead of copying it.
    Block block{0, std::move(p.second)};
    if (!dataBlockReached && isDataBlock(block))
      dataBlockReached = true;

    blocks.push_back(std::move(block));
    ++index;
  }

  // Callers loop until `index` reaches `keys.size()`. If keys remain but none could be read (the size
  // budget does not allow even a single block), report an error instead of letting the caller spin forever.
  if ((index == firstIndex) && (index < keys.size()))
    return {false, {}};

  return {true, std::move(blocks)};
}

std::pair<bool, std::string> BlobHandler::readBlob(std::shared_ptr<FDBCS::FDBDataBase> database,
const ByteArray& key)
{
Keys currentKeys{key};
bool dataBlockReached = false;
std::string blob;

while (!dataBlockReached)
{
auto tnx = database->createTransaction();
if (!tnx)
return {false, ""};

std::vector<Block> blocks;
for (const auto& key : currentKeys)
blocks.reserve(currentKeys.size());

uint32_t index = 0;
while (index < currentKeys.size())
{
auto p = tnx->get(key);
const auto p = readBlocks(database, currentKeys, index, dataBlockReached);
if (!p.first)
return {false, ""};
blocks.insert(blocks.end(), p.second.begin(), p.second.end());
}

Block block{0, p.second};
if (isDataBlock(block))
if (!dataBlockReached) [[likely]]
{
Keys nextKeys;
for (const auto& block : blocks)
{
dataBlockReached = true;
break;
auto keysPair = getKeysFromBlock(block);
if (!keysPair.first)
return {false, ""};

auto& keys = keysPair.second;
nextKeys.insert(nextKeys.end(), keys.begin(), keys.end());
}
blocks.push_back(block);
currentKeys = std::move(nextKeys);
}

if (dataBlockReached)
break;

Keys nextKeys;
for (const auto& block : blocks)
else
{
auto keysPair = getKeysFromBlock(block);
if (!keysPair.first)
return {false, ""};

auto& keys = keysPair.second;
nextKeys.insert(nextKeys.end(), keys.begin(), keys.end());
blob.reserve(blocks.size() * dataBlockSizeInBytes_);
for (const auto& dataBlock : blocks)
{
if (!dataBlock.second.size())
return {false, ""};
blob.insert(blob.end(), dataBlock.second.begin() + dataBlockIdentifier.size(),
dataBlock.second.end());
}
}
currentKeys = std::move(nextKeys);
}

std::string blob;
for (const auto& key : currentKeys)
{
auto tnx = database->createTransaction();
if (!tnx)
return {false, ""};

auto resultPair = tnx->get(key);
if (!resultPair.first)
return {false, ""};

auto& dataBlock = resultPair.second;
if (!dataBlock.size())
return {false, ""};

blob.insert(blob.end(), dataBlock.begin() + dataBlockIdentifier.size(), dataBlock.end());
}

return {true, blob};
Expand All @@ -488,29 +508,30 @@ bool BlobHandler::removeKeys(std::shared_ptr<FDBCS::FDBDataBase> database, const
bool BlobHandler::removeBlob(std::shared_ptr<FDBCS::FDBDataBase> database, const Key& key)
{
std::unordered_map<uint32_t, Keys> treeLevel;
auto tnx = database->createTransaction();
if (!tnx)
return false;

uint32_t currentLevel = 0;
int32_t currentLevel = 0;
treeLevel[0] = {key};
bool dataBlockReached = false;

while (true)
{
const auto& currentKeys = treeLevel[currentLevel];
std::vector<Block> blocks;
for (const auto& key : currentKeys)
blocks.reserve(currentKeys.size());

uint32_t index = 0;
while (index < currentKeys.size())
{
auto p = tnx->get(key);
const auto p = readBlocks(database, currentKeys, index, dataBlockReached);
if (!p.first)
return false;
blocks.push_back({0, p.second});
blocks.insert(blocks.end(), p.second.begin(), p.second.end());
}

if (isDataBlock(blocks.front()) || !currentKeys.size())
if (dataBlockReached)
break;

Keys nextKeys;
nextKeys.reserve(blocks.size() * numKeysInBlock_);
for (const auto& block : blocks)
{
auto keysPair = getKeysFromBlock(block);
Expand All @@ -525,8 +546,14 @@ bool BlobHandler::removeBlob(std::shared_ptr<FDBCS::FDBDataBase> database, const
treeLevel[currentLevel] = std::move(nextKeys);
}

for (uint32_t level = 0; level <= currentLevel; ++level)
RETURN_ON_ERROR(removeKeys(database, treeLevel[level]));
// Remove keys starting from the bottom of the tree.
// If we fail in the middle of the removal, we can retry removing the remaining data, because the
// tree is still in a valid state even if its bottom levels have already been removed.
while (currentLevel >= 0)
{
RETURN_ON_ERROR(removeKeys(database, treeLevel[currentLevel]));
--currentLevel;
}

return true;
}
Expand All @@ -536,4 +563,4 @@ bool setAPIVersion()
auto err = fdb_select_api_version(FDB_API_VERSION);
return err ? false : true;
}
} // namespace FDBCS
} // namespace FDBCS

0 comments on commit ad67090

Please sign in to comment.