Skip to content

Commit

Permalink
将mvcc单独分离出来,实现kv引擎的创建表和获取表
Browse files Browse the repository at this point in the history
  • Loading branch information
wanglufee committed Oct 27, 2024
1 parent cd34b10 commit d70ff42
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 55 deletions.
17 changes: 17 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use std::sync::PoisonError;

use bincode::ErrorKind;



pub type Result<T> = std::result::Result<T,Error>;

#[derive(Debug, Clone, PartialEq)]
pub enum Error {
Parse(String),
Internel(String),
}

impl From<std::num::ParseIntError> for Error {
Expand All @@ -17,4 +22,16 @@ impl From<std::num::ParseFloatError> for Error {
fn from(value: std::num::ParseFloatError) -> Self {
Error::Parse(value.to_string())
}
}

impl<T> From<PoisonError<T>> for Error {
fn from(value: PoisonError<T>) -> Self {
Error::Internel(value.to_string())
}
}

impl From<Box<ErrorKind>> for Error {
fn from(value: Box<ErrorKind>) -> Self {
Error::Internel(value.to_string())
}
}
58 changes: 41 additions & 17 deletions src/sql/engine/kv.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,42 @@
use crate::{error::Result, storage};
use serde::{Deserialize, Serialize};

use crate::{error::{Error, Result}, sql::{schema::Table, types::Row}, storage::{self, engine::Engine as StorageEngein}};

use super::{Engine, Transaction};

// kv Engine 定义,是对存储引擎的 MVCC 的封装
pub struct KVEngine{
pub kv : storage::Mvcc,
pub struct KVEngine<E : StorageEngein>{
pub kv : storage::mvcc::Mvcc<E>,
}

impl Clone for KVEngine {
impl<E : StorageEngein> Clone for KVEngine<E> {
fn clone(&self) -> Self {
Self { kv: self.kv.clone() }
}
}

impl Engine for KVEngine {
type Transaction = KVTransaction;
impl<E : StorageEngein> Engine for KVEngine<E> {
type Transaction = KVTransaction<E>;

fn begin(&self) -> Result<Self::Transaction> {
Ok(Self::Transaction::new(self.kv.begin()?))
}
}

// KV Transaction 定义,实际是对存储引擎 MVCCTransaction 的封装
pub struct KVTransaction {
txn: storage::MvccTransaction,
pub struct KVTransaction<E : StorageEngein> {
txn: storage::mvcc::MvccTransaction<E>,
}

impl KVTransaction {
pub fn new(txn : storage::MvccTransaction) -> Self {
impl<E : StorageEngein> KVTransaction<E> {
pub fn new(txn : storage::mvcc::MvccTransaction<E>) -> Self {
Self {
txn
}
}
}

impl Transaction for KVTransaction {
impl<E : StorageEngein> Transaction for KVTransaction<E> {
fn commit(&self) -> Result<()> {
todo!()
}
Expand All @@ -43,19 +45,41 @@ impl Transaction for KVTransaction {
todo!()
}

fn create_row(&mut self, table: String, row: crate::sql::types::Row) -> Result<()> {
fn create_row(&mut self, table: String, row: Row) -> Result<()> {
todo!()
}

fn scan_table(&self, table_name: String) -> Result<crate::sql::types::Row> {
fn scan_table(&self, table_name: String) -> Result<Row> {
todo!()
}

fn create_table(&mut self, table: crate::sql::schema::Table) -> Result<()> {
todo!()
// 创建表,此处去调用底层存储引擎的接口
fn create_table(&mut self, table: Table) -> Result<()> {
// 判断表是否已经存在
if self.get_table(table.name.clone())?.is_some() {
return Err(Error::Internel(format!("table {} already exists",table.name)));
}
// 判断表的有效性
if table.columns.is_empty() {
return Err(Error::Internel(format!("table {} has no columns",table.name)));
}
// 将表名序列化作为键,将整张表序列化作为值
let key = Key::Table(table.name.clone());
let value = bincode::serialize(&table)?;
self.txn.set(bincode::serialize(&key)?, value)?;
Ok(())
}

fn get_table(&self, table_name: String) -> Result<Option<crate::sql::schema::Table>> {
todo!()
fn get_table(&self, table_name: String) -> Result<Option<Table>> {
let key = Key::Table(table_name);
Ok(self.txn.get(bincode::serialize(&key)?)?
.map(|v| bincode::deserialize(&v))
.transpose()?)
}
}

#[derive(Debug, Serialize, Deserialize)]
enum Key {
Table(String),
Row(String,String),
}
2 changes: 1 addition & 1 deletion src/sql/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod query;

// 执行其trait
pub trait Executor<T: Transaction> {
fn execute(&self, txn: &mut T) -> Result<ResultSet>;
fn execute(self: Box<Self>, txn: &mut T) -> Result<ResultSet>;
}

impl<T: Transaction> dyn Executor<T> {
Expand Down
2 changes: 1 addition & 1 deletion src/sql/executor/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl Insert {


impl<T: Transaction> Executor<T> for Insert {
fn execute(&self, txn: &mut T) -> Result<super::ResultSet> {
fn execute(self: Box<Self>, txn: &mut T) -> Result<super::ResultSet> {
todo!()
}
}
2 changes: 1 addition & 1 deletion src/sql/executor/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl Scan {
}

impl<T: Transaction> Executor<T> for Scan {
fn execute(&self, txn: &mut T) -> Result<super::ResultSet> {
fn execute(self: Box<Self>, txn: &mut T) -> Result<super::ResultSet> {
todo!()
}
}
8 changes: 5 additions & 3 deletions src/sql/executor/schema.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{error::Result, sql::{engine::Transaction, schema::{self, Table}}};

use super::Executor;
use super::{Executor, ResultSet};

// 创建表
pub struct CreateTable {
Expand All @@ -15,7 +15,9 @@ impl CreateTable {
}

impl<T: Transaction> Executor<T> for CreateTable {
fn execute(&self, txn: &mut T) -> Result<super::ResultSet> {
todo!()
fn execute(self: Box<Self>, txn: &mut T) -> Result<ResultSet> {
let table_name = self.schema.name.clone();
txn.create_table(self.schema);
Ok(ResultSet::CreateTable { table_name })
}
}
34 changes: 2 additions & 32 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,5 @@
use crate::error::Result;


pub mod engine;
pub mod memory;

pub struct Mvcc{

}

impl Clone for Mvcc {
fn clone(&self) -> Self {
Self { }
}
}

impl Mvcc {
pub fn new() -> Self {
Self {}
}

pub fn begin(&self) -> Result<MvccTransaction> {
Ok(MvccTransaction::new())
}
}


pub struct MvccTransaction {

}

impl MvccTransaction {
pub fn new() -> Self{
Self { }
}
}
pub mod mvcc;
72 changes: 72 additions & 0 deletions src/storage/mvcc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::sync::{Arc, Mutex};

use crate::error::Result;

use super::engine::Engine;

pub struct Mvcc<E : Engine>{
engine: Arc<Mutex<E>>,
}

impl<E : Engine> Clone for Mvcc<E> {
fn clone(&self) -> Self {
Self { engine: self.engine.clone() }
}
}

impl<E : Engine> Mvcc<E> {
pub fn new(eng: E) -> Self {
Self { engine:Arc::new(Mutex::new(eng)) }
}

pub fn begin(&self) -> Result<MvccTransaction<E>> {
Ok(MvccTransaction::begin(self.engine.clone()))
}
}


pub struct MvccTransaction<E : Engine> {
engine: Arc<Mutex<E>>
}

impl<E : Engine> MvccTransaction<E> {
pub fn begin(eng: Arc<Mutex<E>>) -> Self{
Self { engine: eng }
}

pub fn commit(&self) -> Result<()> {
Ok(())
}

pub fn rollback(&self) -> Result<()> {
Ok(())
}

// 插入数据
pub fn set(&self,key:Vec<u8>,value:Vec<u8>) -> Result<()> {
let mut eng = self.engine.lock()?;
eng.set(key, value)
}

// 获取数据
pub fn get(&self,key:Vec<u8>) -> Result<Option<Vec<u8>>> {
let mut eng = self.engine.lock()?;
eng.get(key)
}


pub fn scan_prefix(&self,prefix: Vec<u8>) -> Result<Vec<ScanResult>> {
let mut eng = self.engine.lock()?;
let mut iter = eng.scan_prefix(prefix);
let mut v = Vec::new();
while let Some((key,value)) = iter.next().transpose()? {
v.push(ScanResult{key,value});
}
Ok(v)
}
}

pub struct ScanResult {
key: Vec<u8>,
value: Vec<u8>,
}

0 comments on commit d70ff42

Please sign in to comment.