
Commit

Client implementation + benchmark (#2)
* drafting storage client

* adding api

* update ignore

* can pass local test read entire table

* fetch remote file work

* impl sync and better error handling

* set up benchmark workflow

* benchmark can run successfully

* can run bench successfully

* fmtted

* fix env var issue

* fix empty dir issue

* skip remote tests to pass ci

* debugging ci

* fix nonexisting folder on ci

* debugging performance!

* small fix

* fix warnings

* make benchmark to use trait

* remove dead code
ScottLinnn authored Apr 3, 2024
1 parent e3f64c3 commit ec319ac
Showing 12 changed files with 786 additions and 165 deletions.
9 changes: 8 additions & 1 deletion .gitignore
@@ -13,4 +13,11 @@ Cargo.lock
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb

.vscode
# vscode folder
.vscode/

# local parquet file location for client
client/parquet_files/

bench_files
.vscode
7 changes: 6 additions & 1 deletion README.md
@@ -1,5 +1,6 @@
# 15721-s24-cache2
15-721 Spring 2024 - Cache #2

# LRU Cache Server

This server implements a Least Recently Used (LRU) caching mechanism, providing a simple interface for fetching files from a simulated S3 storage and managing them within an LRU cache. The server is built using Rust and the Rocket framework.
@@ -85,4 +86,8 @@ root@node1:/data> curl -L http://node2:8000/s3/test1.txt # make sure -L flag is
- **CURL Command**:
```sh
curl -X POST http://localhost:8000/size/<new-size-in-bytes>
```
```

## Benchmark

To run the benchmark, simply run `bench.sh`.
30 changes: 30 additions & 0 deletions bench.sh
@@ -0,0 +1,30 @@
#!/bin/bash

echo $HOME

pip install pyarrow pandas

rm -rf bench_files
rm client/parquet_files/*

# Create a dir named "bench_files" to store the benchmark parquet files
mkdir -p bench_files

# This generates benchmark parquet files
python pqt_gen.py --num-rows 1000000 --num-cols 10 --num-files 4

cp bench_files/* server/tests/test_s3_files/

# When the server is not available, we put the files in the client-side local cache
# cp bench_files/* /client/parquet_files/


cd client

# This builds and runs client/src/benchmark.rs; check the code for details
cargo build --bin benchmark

export CLIENT_FILES_DIR=$HOME/15721-s24-cache2/client/parquet_files/

RUST_BACKTRACE=1 cargo run --package client --bin benchmark

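`bench.sh` exports `CLIENT_FILES_DIR` before launching the benchmark binary. Below is a minimal sketch (not the actual `StorageClientImpl` code) of how a client could resolve its local parquet directory from that variable; the helper name `resolve_client_files_dir` and the fallback path are assumptions for illustration only.

```rust
use std::env;
use std::path::PathBuf;

// Resolve the local parquet cache directory for the benchmark client.
// CLIENT_FILES_DIR is the variable exported by bench.sh; the fallback
// path below is an assumed default, not the client's real behavior.
fn resolve_client_files_dir() -> PathBuf {
    env::var("CLIENT_FILES_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| PathBuf::from("client/parquet_files"))
}

fn main() {
    // Print the directory the benchmark client would read/write parquet files from.
    println!("client files dir: {}", resolve_client_files_dir().display());
}
```
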
16 changes: 13 additions & 3 deletions client/Cargo.toml
@@ -1,15 +1,25 @@
[package]
name = "echo_client"
name = "client"
version = "0.1.0"
edition = "2018"
authors = ["Your Name <your.email@example.com>"]
description = "Echo client application"
description = "Client application"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
clap = "3"
anyhow = "1.0"
parquet = "50.0.0"
arrow = "50.0.0"
log = "0.4"
env_logger = "0.11.1"
thiserror = "1.0"
env_logger = "0.11.1"
async-trait = "0.1"
rocket = { version = "0.5.0", features = ["json"] }
reqwest = { version = "0.11", features = ["stream", "json"] }

[[bin]]
name = "benchmark"
path = "src/benchmark.rs"
107 changes: 107 additions & 0 deletions client/src/benchmark.rs
@@ -0,0 +1,107 @@
use client::client_api::{StorageClient, StorageRequest, TableId};
use client::storage_client::StorageClientImpl;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::time::Instant;

// This scans the bench_files dir to figure out which test files are present,
// then builds a map of TableId -> filename to init the storage client (only when the catalog is not available),
// and also generates a workload based on the table ids. Finally, it runs the workload.

#[tokio::main]
async fn main() {
// Call the helper function to create the map
let home = std::env::var("HOME").unwrap();
let bench_files_path = format!("{}/15721-s24-cache2/bench_files", home);
let map = create_table_file_map(&bench_files_path).unwrap();
let client = setup_client(map.clone());
let table_ids: Vec<TableId> = map.keys().cloned().collect();
let load = load_gen_allonce(table_ids.clone());
load_run(&client, load).await;
// let skewed_load = load_gen_skewed(table_ids);
// load_run(&client, skewed_load).await;
}

async fn load_run(client: &dyn StorageClient, requests: Vec<StorageRequest>) {
println!("Start running workload");
let start = Instant::now();
for req in requests {
let id = match req {
StorageRequest::Table(id) => id,
_ => panic!("Invalid request type"),
};
println!("Requesting data for table {:?}", id);

let res = client.request_data_sync(req).await;
assert!(res.is_ok());
println!("Received data for table {:?}", id);

// let local_cache_dir = StorageClientImpl::local_cache_path();
// // iterate files in local cache and delete them
// let entries = fs::read_dir(local_cache_dir).unwrap();
// for entry in entries {
// let entry = entry.unwrap();
// let path = entry.path();
// // if file name ends with "parquet"
// if let Some(file_name) = path.file_name() {
// if let Some(name) = file_name.to_str() {
// if name.ends_with("parquet") {
// fs::remove_file(path).unwrap();
// }
// }
// }
// }
}
let duration = start.elapsed();
println!("Time used: {:?}", duration);
}

// Generate a load of requests for all tables at once
fn load_gen_allonce(table_ids: Vec<TableId>) -> Vec<StorageRequest> {
let mut requests = Vec::new();
for table_id in table_ids {
requests.push(StorageRequest::Table(table_id));
}
requests
}

// Generate a load of requests for all tables, but skewed
// This is not always used
#[allow(dead_code)]
fn load_gen_skewed(table_ids: Vec<TableId>) -> Vec<StorageRequest> {
// request the first table id twice and the last table id zero times
let mut requests = Vec::new();
for table_id in &table_ids {
requests.push(StorageRequest::Table(table_id.clone()));
}
// remove last element
requests.pop();
requests.push(StorageRequest::Table(table_ids[0]));

requests
}

fn setup_client(table_file_map: HashMap<TableId, String>) -> StorageClientImpl {
StorageClientImpl::new_for_test(1, table_file_map)
}

fn create_table_file_map(directory: &str) -> Result<HashMap<TableId, String>, std::io::Error> {
let mut table_file_map: HashMap<TableId, String> = HashMap::new();
let dir = Path::new(directory);

// Read the directory entries
let entries = fs::read_dir(dir)?;

// Iterate over the entries
for (id, entry) in entries.enumerate() {
let entry = entry?;
if entry.path().is_file() {
// If the entry is a file, add it to the map with an incremental ID
let filename = entry.file_name().into_string().unwrap();
table_file_map.insert(id as TableId, filename);
}
}

Ok(table_file_map)
}
38 changes: 38 additions & 0 deletions client/src/client_api.rs
@@ -0,0 +1,38 @@
use anyhow::Result;
use arrow::record_batch::RecordBatch;
use tokio::sync::mpsc::Receiver;

/// Id types for table, column, and record. Need to be consistent among all components
/// (e.g. execution engine). We don't want to make any type generic here just for the id,
/// so we simply define them here. Might refine later.
pub type TableId = u64;
pub type ColumnId = u64;
pub type RecordId = u64;

/// [`StorageRequest`] specifies the requests that the execution engine might issue to
/// the storage node.
///
/// Currently we assume the execution engine only requests the whole table/column. We may
/// add `std::ops::RangeBounds` later to support range query from the execution engine.
pub enum StorageRequest {
/// Requests a whole table from the underlying storage.
Table(TableId),
/// Requests one or more columns from the underlying storage.
Columns(TableId, Vec<ColumnId>),
/// Requests one or more tuples from the underlying storage.
/// FIXME: Do we really need this?
Tuple(Vec<RecordId>),
}

/// [`StorageClient`] provides the interface for the execution engine to query data from the
/// storage node. It resolves the physical location of the tables/columns/tuples by querying
/// the catalog node, and then sends the request to the storage node to get the data from the
/// underlying storage.
#[async_trait::async_trait]
pub trait StorageClient: Send + Sync + 'static {
/// Returns the requested data as a stream.
async fn request_data(&self, request: StorageRequest) -> Result<Receiver<RecordBatch>>;

/// Returns all the requested data as a whole.
async fn request_data_sync(&self, request: StorageRequest) -> Result<Vec<RecordBatch>>;
}
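
As a usage sketch for the trait above (not part of this commit), an execution engine holding any `StorageClient` could drain the streaming API roughly as follows; the function name `scan_table` and the row-count bookkeeping are illustrative assumptions.

```rust
use anyhow::Result;
use client::client_api::{StorageClient, StorageRequest, TableId};

// Illustrative consumer of the streaming API: request a whole table and
// count the rows received. Assumes some StorageClient implementation
// (e.g. StorageClientImpl) has already been constructed by the caller.
async fn scan_table(client: &dyn StorageClient, table_id: TableId) -> Result<usize> {
    let mut rx = client.request_data(StorageRequest::Table(table_id)).await?;
    let mut total_rows = 0;
    // Receive RecordBatches until the storage client closes the channel.
    while let Some(batch) = rx.recv().await {
        total_rows += batch.num_rows();
    }
    Ok(total_rows)
}
```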
64 changes: 0 additions & 64 deletions client/src/kv_store.rs

This file was deleted.

2 changes: 2 additions & 0 deletions client/src/lib.rs
@@ -0,0 +1,2 @@
pub mod client_api;
pub mod storage_client;
