Skip to content

Commit

Permalink
fix mods file metrics & fix the issue of missing mods with concurrent…
Browse files Browse the repository at this point in the history
… deletion and compaction (#14765)

* fix mods file metrics

* add lock for ModificationFile.remove

* spotless
  • Loading branch information
shuwenwei authored Jan 24, 2025
1 parent 8c62821 commit e56b34f
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,12 @@ private void recover() throws DataRegionException {
resource.getTsFile().length(),
true,
resource.getTsFile().getName());
resource.upgradeModFile(upgradeModFileThreadPool);
if (ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
// update mods file metrics
resource.getExclusiveModFile();
} else {
resource.upgradeModFile(upgradeModFileThreadPool);
}
}
}
while (!value.isEmpty()) {
Expand Down Expand Up @@ -530,7 +535,12 @@ private void recover() throws DataRegionException {
false,
resource.getTsFile().getName());
}
resource.upgradeModFile(upgradeModFileThreadPool);
if (ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
// update mods file metrics
resource.getExclusiveModFile();
} else {
resource.upgradeModFile(upgradeModFileThreadPool);
}
}
while (!value.isEmpty()) {
TsFileResource tsFileResource = value.get(value.size() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public static void combineModsInCrossCompaction(
Set<ModEntry> modifications = new HashSet<>();
// get compaction mods from all source unseq files
for (TsFileResource unseqFile : unseqResources) {
modifications.addAll(ModificationFile.getCompactionMods(unseqFile).getAllMods());
modifications.addAll(ModificationFile.readAllCompactionModifications(unseqFile.getTsFile()));
}

// write target mods file
Expand All @@ -158,7 +158,8 @@ public static void combineModsInCrossCompaction(
continue;
}
Set<ModEntry> seqModifications =
new HashSet<>(ModificationFile.getCompactionMods(seqResources.get(i)).getAllMods());
new HashSet<>(
ModificationFile.readAllCompactionModifications(seqResources.get(i).getTsFile()));
modifications.addAll(seqModifications);
updateOneTargetMods(targetResource, modifications);
modifications.removeAll(seqModifications);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class ModificationFile implements AutoCloseable {
private boolean hasCompacted = false;
private boolean fileExists = false;
private final boolean updateMetrics;
private boolean removed = false;

private Set<ModificationFile> cascadeFiles = null;

Expand All @@ -89,19 +90,29 @@ public ModificationFile(File file, boolean updateModMetrics) {
}
}

public void writeLock() {
this.lock.writeLock().lock();
}

public void writeUnlock() {
this.lock.writeLock().unlock();
}

@SuppressWarnings("java:S2093") // cannot use try-with-resource, should not close here
public void write(ModEntry entry) throws IOException {
int updateFileNum = 0;
lock.writeLock().lock();
long size = 0;
try {
if (fileOutputStream == null) {
fileOutputStream =
new BufferedOutputStream(Files.newOutputStream(file.toPath(), CREATE, APPEND));
channel = FileChannel.open(file.toPath(), CREATE, APPEND);
if (!removed) {
if (fileOutputStream == null) {
fileOutputStream =
new BufferedOutputStream(Files.newOutputStream(file.toPath(), CREATE, APPEND));
channel = FileChannel.open(file.toPath(), CREATE, APPEND);
}
size += entry.serialize(fileOutputStream);
fileOutputStream.flush();
}
size += entry.serialize(fileOutputStream);
fileOutputStream.flush();

if (cascadeFiles != null) {
for (ModificationFile cascadeFile : cascadeFiles) {
Expand All @@ -124,15 +135,17 @@ public void write(Collection<? extends ModEntry> entries) throws IOException {
lock.writeLock().lock();
long size = 0;
try {
if (fileOutputStream == null) {
fileOutputStream =
new BufferedOutputStream(Files.newOutputStream(file.toPath(), CREATE, APPEND));
channel = FileChannel.open(file.toPath(), CREATE, APPEND);
}
for (ModEntry entry : entries) {
size += entry.serialize(fileOutputStream);
if (!removed) {
if (fileOutputStream == null) {
fileOutputStream =
new BufferedOutputStream(Files.newOutputStream(file.toPath(), CREATE, APPEND));
channel = FileChannel.open(file.toPath(), CREATE, APPEND);
}
for (ModEntry entry : entries) {
size += entry.serialize(fileOutputStream);
}
fileOutputStream.flush();
}
fileOutputStream.flush();

if (cascadeFiles != null) {
for (ModificationFile cascadeFile : cascadeFiles) {
Expand All @@ -150,7 +163,7 @@ public void write(Collection<? extends ModEntry> entries) throws IOException {
}

private void updateModFileMetric(int num, long size) {
if (updateMetrics) {
if (!removed && updateMetrics) {
FileMetrics.getInstance().increaseModFileNum(num);
FileMetrics.getInstance().increaseModFileSize(size);
}
Expand Down Expand Up @@ -214,6 +227,16 @@ public static long[] parseFileName(String name) {
return new long[] {levelNum, modNum};
}

public static List<ModEntry> readAllCompactionModifications(File tsfile) throws IOException {
try (ModificationFile modificationFile =
new ModificationFile(ModificationFile.getCompactionMods(tsfile), false)) {
if (modificationFile.exists()) {
return modificationFile.getAllMods();
}
}
return Collections.emptyList();
}

public static List<ModEntry> readAllModifications(
File tsfile, boolean readOldModFileIfNewModFileNotExists) throws IOException {
try (ModificationFile modificationFile =
Expand Down Expand Up @@ -313,12 +336,18 @@ public boolean exists() {
}

public void remove() throws IOException {
close();
FileUtils.deleteFileOrDirectory(file);
if (fileExists) {
updateModFileMetric(-1, -getFileLength());
lock.writeLock().lock();
try {
close();
FileUtils.deleteFileOrDirectory(file);
if (fileExists) {
updateModFileMetric(-1, -getFileLength());
}
fileExists = false;
removed = true;
} finally {
lock.writeLock().unlock();
}
fileExists = false;
}

public static ModificationFile getExclusiveMods(TsFileResource tsFileResource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,13 +391,23 @@ public void link(TsFileResource target) throws IOException {
}

public void linkModFile(TsFileResource target) throws IOException {
if (exclusiveModFileExists()) {
File modsFileForTargetResource = ModificationFile.getExclusiveMods(target.getTsFile());
Files.createLink(
modsFileForTargetResource.toPath(),
ModificationFile.getExclusiveMods(getTsFile()).toPath());
target.setExclusiveModFile(new ModificationFile(modsFileForTargetResource, true));
File targetModsFile = ModificationFile.getExclusiveMods(target.getTsFile());
ModificationFile sourceModFile = this.getExclusiveModFile();
ModificationFile targetModsFileObject;
sourceModFile.writeLock();
try {
if (sourceModFile.exists()) {
Files.createLink(
targetModsFile.toPath(), ModificationFile.getExclusiveMods(getTsFile()).toPath());
targetModsFileObject = new ModificationFile(targetModsFile, true);
} else {
targetModsFileObject = new ModificationFile(targetModsFile, true);
sourceModFile.setCascadeFile(Collections.singleton(targetModsFileObject));
}
} finally {
sourceModFile.writeUnlock();
}
target.setExclusiveModFile(targetModsFileObject);
if (sharedModFileExists()) {
modFileManagement.addReference(target, sharedModFile);
target.setSharedModFile(this.getSharedModFile(), false);
Expand Down Expand Up @@ -770,7 +780,6 @@ public void removeModFile() throws IOException {
if (getExclusiveModFile().exists()) {
getExclusiveModFile().remove();
}
exclusiveModFile = null;

if (getSharedModFile() != null && modFileManagement != null) {
modFileManagement.releaseFor(this, sharedModFile);
Expand Down

0 comments on commit e56b34f

Please sign in to comment.