Skip to content

Commit

Permalink
存储引擎启动和清理
Browse files Browse the repository at this point in the history
  • Loading branch information
wanglufee committed Jan 5, 2025
1 parent 36a48b8 commit fa5ab45
Showing 1 changed file with 125 additions and 3 deletions.
128 changes: 125 additions & 3 deletions src/storage/disk.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{collections::BTreeMap, io::{BufWriter, Read, Seek, SeekFrom, Write}};
use serde::de::value;
use std::{collections::BTreeMap, fs::{File, OpenOptions}, io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}, path::PathBuf};
use fs4::FileExt;

use crate::error::Result;

Expand All @@ -12,6 +12,46 @@ pub struct DiskEngine{
log: Log,
}

impl DiskEngine {
pub fn new(file_path: PathBuf) -> Result<Self> {
let mut log = Log::new(file_path)?;
let keydir = log.build_keydir()?;
Ok(Self { keydir, log })
}


pub fn new_compact(file_path: PathBuf) -> Result<Self> {
let mut eng = Self::new(file_path)?;
eng.compact()?;
Ok(eng)
}

fn compact(&mut self) -> Result<()> {
// 新建一个临时文件
let mut new_path = self.log.file_path.clone();
new_path.set_extension("compact");
let mut new_log = Log::new(new_path)?;
// 新建一个内存目录
let mut new_keydir = KeyDir::new();
// 遍历原目录并读取对应文件,生成新文件和目录
for (key,(offset,val_size)) in self.keydir.iter() {
let val = self.log.read_value(*offset, *val_size)?;
let (offset, size) =new_log.write_entry(key, Some(&val))?;
new_keydir.insert(key.clone(), (
offset + size as u64 - *val_size as u64, *val_size
));
}
// 将临时文件更名
std::fs::rename(&new_log.file_path, &self.log.file_path)?;
new_log.file_path = self.log.file_path.clone();
// 将新的文件和目录替换调原来的
self.log = new_log;
self.keydir = new_keydir;

Ok(())
}
}

impl super::engine::Engine for DiskEngine {
type EngineIterator<'a> = DiskEngineIterator;

Expand Down Expand Up @@ -75,10 +115,54 @@ impl DoubleEndedIterator for DiskEngineIterator {


pub struct Log {
file_path: PathBuf,
file: std::fs::File
}

impl Log {

fn new(file_path: PathBuf) -> Result<Self> {
// 文件夹不存在,创建文件夹
if let Some(dir) = file_path.parent() {
if !dir.exists() {
std::fs::create_dir_all(&dir)?;
}
}
// 打开文件
let file = OpenOptions::new().create(true).read(true).write(true).open(&file_path)?;

file.try_lock_exclusive()?;
Ok(Self { file_path ,file })
}

fn build_keydir(&mut self) -> Result<KeyDir>{
// 从文件构建内存目录
let mut key_dir = KeyDir::new();
let mut bufreader = BufReader::new(&self.file);
let file_size = self.file.metadata()?.len();
// 从文件头开始
let mut offset: u64 = 0;
loop {
// 如果到文件末尾,退出
if offset >= file_size {
break;
}
// 读取条目
let (key,val_size) = Self::read_entry(&mut bufreader, offset)?;
let key_size = key.len();
// 如果val_size为-1则说明被删除
if val_size == -1 {
key_dir.remove(&key);
offset += key_size as u64 + LOG_HEAD_SIZE as u64;
} else {
key_dir.insert(key, (
offset + LOG_HEAD_SIZE as u64 + key_size as u64 , val_size as u32
));
offset += key_size as u64 + LOG_HEAD_SIZE as u64 + val_size as u64;
}
}
Ok(key_dir)
}

fn write_entry(&mut self,key: &Vec<u8>, value: Option<&Vec<u8>>) -> Result<(u64,u32)> {
// 定位到文件末尾
Expand All @@ -95,7 +179,7 @@ impl Log {
if let Some(v) = value {
writer.write_all(&v)?;
}
writer.flush();
writer.flush()?;
// 返回相对应文件的偏移,和写入的总长度。
Ok((offset, total_size))
}
Expand All @@ -108,4 +192,42 @@ impl Log {
self.file.read_exact(&mut buf)?;
Ok(buf)
}

fn read_entry(bufreader: &mut BufReader<&File>, offset: u64) -> Result<(Vec<u8>,i32)>{
bufreader.seek(SeekFrom::Start(offset))?;
let mut len_buf = [0;4];

// 读取 key 长度
bufreader.read_exact(&mut len_buf)?;
let key_size = u32::from_be_bytes(len_buf);

// 读取 val 长度
bufreader.read_exact(&mut len_buf)?;
let val_size = i32::from_be_bytes(len_buf);

// 读取 key
let mut key = vec![0;key_size as usize];
bufreader.read_exact(&mut key)?;

Ok((key, val_size))
}
}

#[cfg(test)]
mod test{
use std::path::PathBuf;
use crate::error::Result;
use super::DiskEngine;

#[test]
fn test_disk_engine_start() -> Result<()> {
let _ = DiskEngine::new(PathBuf::from("/tmp/sqldp"))?;
Ok(())
}

#[test]
fn test_disk_engine_compact_start() -> Result<()> {
let _ = DiskEngine::new_compact(PathBuf::from("/tmp/sqldp"))?;
Ok(())
}
}

0 comments on commit fa5ab45

Please sign in to comment.