Skip to content

Commit

Permalink
实现查询和插入操作,完善整个流程
Browse files Browse the repository at this point in the history
  • Loading branch information
wanglufee committed Nov 3, 2024
1 parent d70ff42 commit 7721962
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 22 deletions.
80 changes: 72 additions & 8 deletions src/sql/engine/kv.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};

use crate::{error::{Error, Result}, sql::{schema::Table, types::Row}, storage::{self, engine::Engine as StorageEngein}};
use crate::{error::{Error, Result}, sql::{schema::Table, types::{Row, Value}}, storage::{self, engine::Engine as StorageEngein}};

use super::{Engine, Transaction};

Expand All @@ -9,6 +9,14 @@ pub struct KVEngine<E : StorageEngein>{
pub kv : storage::mvcc::Mvcc<E>,
}

impl<E: StorageEngein> KVEngine<E> {
pub fn new(engine: E) -> Self{
Self{
kv: storage::mvcc::Mvcc::new(engine)
}
}
}

impl<E : StorageEngein> Clone for KVEngine<E> {
fn clone(&self) -> Self {
Self { kv: self.kv.clone() }
Expand Down Expand Up @@ -38,19 +46,47 @@ impl<E : StorageEngein> KVTransaction<E> {

impl<E : StorageEngein> Transaction for KVTransaction<E> {
fn commit(&self) -> Result<()> {
todo!()
Ok(())
}

fn rollback(&self) -> Result<()> {
todo!()
Ok(())
}

fn create_row(&mut self, table: String, row: Row) -> Result<()> {
todo!()
fn create_row(&mut self, table_name: String, row: Row) -> Result<()> {
let table = self.must_get_table(table_name.clone())?;
// 检查类型有效性
for (i,col) in table.columns.iter().enumerate() {
match row[i].datatype() {
None if col.nullable => {},
None => return Err(Error::Internel(format!("column {} cannot be null",col.name))),
Some(dt) => {
if dt != col.datatype {
return Err(Error::Internel(format!("column {} type mismatched",col.name)));
}
},
}
}

// 存放数据
// 暂时以第一列作为主键
let id = Key::Row(table_name, row[0].clone());
let value = bincode::serialize(&row)?;
self.txn.set(bincode::serialize(&id)?, value)?;

Ok(())
}

fn scan_table(&self, table_name: String) -> Result<Row> {
todo!()
fn scan_table(&self, table_name: String) -> Result<Vec<Row>> {
let perfix = KeyPerfix::Row(table_name.clone());
let results = self.txn.scan_prefix(bincode::serialize(&perfix)?)?;

let mut rows = Vec::new();
for result in results {
let row: Row = bincode::deserialize(&result.value)?;
rows.push(row);
}
Ok(rows)
}

// 创建表,此处去调用底层存储引擎的接口
Expand Down Expand Up @@ -81,5 +117,33 @@ impl<E : StorageEngein> Transaction for KVTransaction<E> {
#[derive(Debug, Serialize, Deserialize)]
enum Key {
Table(String),
Row(String,String),
Row(String,Value),
}

#[derive(Debug, Serialize, Deserialize)]
enum KeyPerfix {
Table,
Row(String),
}


mod tests {
use crate::{error::Result, sql::engine::Engine, storage::memory::MemoryEngine};

use super::KVEngine;

#[test]
fn test_create_table() -> Result<()> {
let kvengine = KVEngine::new(MemoryEngine::new());
let mut s = kvengine.session()?;

s.execute("create table t1 (a int, b text default 'vv', c integer default 100);")?;
s.execute("insert into t1 values(1, 'a', 1);")?;
s.execute("insert into t1 values(2, 'b');")?;
s.execute("insert into t1(c, a) values(200, 3);")?;

s.execute("select * from t1;")?;

Ok(())
}
}
13 changes: 10 additions & 3 deletions src/sql/engine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

use crate::error::Result;
use crate::error::{Error, Result};

use super::{executor::ResultSet, parser::Parser, plan::Plan, schema::Table, types::Row};

Expand Down Expand Up @@ -29,16 +29,23 @@ pub trait Transaction {
fn rollback(&self) -> Result<()>;

// 创建行
fn create_row(&mut self, table: String, row: Row) -> Result<()>;
fn create_row(&mut self, table_name: String, row: Row) -> Result<()>;

// 扫描表
fn scan_table(&self, table_name: String) -> Result<Row>;
fn scan_table(&self, table_name: String) -> Result<Vec<Row>>;

// DDL相关操作
fn create_table(&mut self, table: Table) -> Result<()>;

// 获取表信息
fn get_table(&self, table_name: String) -> Result<Option<Table>>;

// 必须拿到表名
fn must_get_table(&self, table_name: String) -> Result<Table> {
self.get_table(table_name.clone())?.ok_or(Error::Internel(
format!("table {} dose not exist!", table_name)
))
}
}

// 客户端 session 定义
Expand Down
2 changes: 1 addition & 1 deletion src/sql/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ pub enum ResultSet {
},
Scan {
columns: Vec<String>,
row: Vec<Row>
rows: Vec<Row>
}
}
75 changes: 71 additions & 4 deletions src/sql/executor/mutation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{error::Result, sql::{engine::Transaction, parser::ast::Expression}};
use std::collections::HashMap;

use super::Executor;
use crate::{error::{Error, Result}, sql::{engine::Transaction, parser::ast::Expression, schema::Table, types::{Row, Value}}};

use super::{Executor, ResultSet};

// 插入数据
pub struct Insert {
Expand All @@ -21,9 +23,74 @@ impl Insert {
}
}

// 对列进行对齐
// insert into tab values(1,2,3);
// 列有 a b c d
// 值有 1 2 3
// 那么需要给 d 列进行对齐
fn pad_row(table: &Table, row: &Row) -> Result<Row> {
let mut result = row.clone();
// 跳过以指定值的部分
for column in table.columns.iter().skip(row.len()) {
if let Some(default) = column.default.clone() {
result.push(default);
} else {
return Err(Error::Internel(format!("No default value for column {}!",column.name)));
}
}
Ok(result)
}

// 对列进行对齐
// insert into tab(d,c) values(2,3);
// 列有 a b c d
// 值有 default default 2 3
fn make_row(table: &Table, column: &Vec<String>, row: &Row) -> Result<Row> {
// 现判断指定的列和给定的值个数是否匹配
if column.len() != row.len() {
return Err(Error::Internel(format!("columns and values num mismatch")));
}
// 构造 hashmap 来保存制定的列和值
let mut input = HashMap::new();
for (i,col) in column.iter().enumerate() {
input.insert(col, row[i].clone());
}

let mut result = Vec::new();
for col in table.columns.iter() {
if let Some(value) = input.get(&col.name) {
result.push(value.clone());
} else if let Some(value) = col.default.clone() {
result.push(value);
} else {
return Err(Error::Internel(format!("No value given for the column {}",col.name)));
}
}
Ok(result)
}


impl<T: Transaction> Executor<T> for Insert {
fn execute(self: Box<Self>, txn: &mut T) -> Result<super::ResultSet> {
todo!()
fn execute(self: Box<Self>, txn: &mut T) -> Result<ResultSet> {
// 插入值时现取出表信息
let table = txn.must_get_table(self.table_name.clone())?;
let mut count = 0;
// 将表达式转换为值类型
for exprs in self.values {
let row = exprs.into_iter().map(|e| Value::from_expression(e)).collect::<Vec<_>>();
// 如果未指定列值
let insert_row = if self.columns.is_empty() {
pad_row(&table, &row)?
} else {
// 制定了插入的列
make_row(&table, &self.columns, &row)?
};

txn.create_row(self.table_name.clone(), insert_row)?;
count += 1;
}

Ok(ResultSet::Insert { count })

}
}
11 changes: 8 additions & 3 deletions src/sql/executor/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{error::Result, sql::engine::Transaction};

use super::Executor;
use super::{Executor, ResultSet};

pub struct Scan {
table_name: String,
Expand All @@ -13,7 +13,12 @@ impl Scan {
}

impl<T: Transaction> Executor<T> for Scan {
fn execute(self: Box<Self>, txn: &mut T) -> Result<super::ResultSet> {
todo!()
fn execute(self: Box<Self>, txn: &mut T) -> Result<ResultSet> {
let table = txn.must_get_table(self.table_name.clone())?;
let rows = txn.scan_table(self.table_name)?;
Ok(ResultSet::Scan {
columns: table.columns.into_iter().map(|c| c.name).collect(),
rows
})
}
}
2 changes: 1 addition & 1 deletion src/sql/executor/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl CreateTable {
impl<T: Transaction> Executor<T> for CreateTable {
fn execute(self: Box<Self>, txn: &mut T) -> Result<ResultSet> {
let table_name = self.schema.name.clone();
txn.create_table(self.schema);
txn.create_table(self.schema)?;
Ok(ResultSet::CreateTable { table_name })
}
}
10 changes: 10 additions & 0 deletions src/sql/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ impl Value {
Expression::Consts(Consts::String(s)) => Self::String(s),
}
}

pub fn datatype(&self) -> Option<DataType>{
match self {
Value::Null => None,
Value::Boolean(_) => Some(DataType::Boolean),
Value::Integer(_) => Some(DataType::Integer),
Value::Float(_) => Some(DataType::Float),
Value::String(_) => Some(DataType::String),
}
}
}

pub type Row = Vec<Value>;
5 changes: 3 additions & 2 deletions src/storage/mvcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ impl<E : Engine> MvccTransaction<E> {
}
}

#[derive(Debug)]
pub struct ScanResult {
key: Vec<u8>,
value: Vec<u8>,
pub key: Vec<u8>,
pub value: Vec<u8>,
}

0 comments on commit 7721962

Please sign in to comment.