Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support filtering & decoding transaction input for txs dataset with --function-signature #145

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ pub struct Args {
#[arg(long, value_name = "SIG", help_heading = "Dataset-specific Options", num_args(1..))]
pub event_signature: Option<String>,

/// Function signature for transaction calldata decoding
#[arg(long, value_name = "SIG", help_heading = "Dataset-specific Options", num_args(1..))]
pub function_signature: Option<String>,

/// Blocks per request (eth_getLogs)
#[arg(
long,
Expand Down
12 changes: 11 additions & 1 deletion crates/cli/src/parse/schemas.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashMap;

use cryo_freeze::{
ColumnEncoding, Datatype, FileFormat, LogDecoder, MultiDatatype, ParseError, Table,
CalldataDecoder, ColumnEncoding, Datatype, FileFormat, LogDecoder, MultiDatatype, ParseError,
Table,
};

use super::file_output;
Expand Down Expand Up @@ -47,6 +48,14 @@ pub(crate) fn parse_schemas(
None => None,
};

let calldata_decoder = match args.function_signature {
Some(ref sig) => match CalldataDecoder::new(sig.clone()) {
Ok(res) => Some(res),
Err(_) => return Err(ParseError::ParseError("invalid function signature".to_string())),
},
None => None,
};

// create schemas
let schemas: Result<HashMap<Datatype, Table>, ParseError> = datatypes
.iter()
Expand All @@ -60,6 +69,7 @@ pub(crate) fn parse_schemas(
&args.columns,
sort[datatype].clone(),
log_decoder.clone(),
calldata_decoder.clone(),
)
.map(|schema| (*datatype, schema))
.map_err(|e| {
Expand Down
46 changes: 44 additions & 2 deletions crates/freeze/src/datasets/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Transactions {
chain_id: Vec<u64>,
timestamp: Vec<u32>,
block_hash: Vec<Vec<u8>>,
function_cols: indexmap::IndexMap<String, Vec<ethers_core::abi::Token>>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -89,8 +90,20 @@ impl CollectByBlock for Transactions {
} else {
Box::new(|_| true)
};
let transactions =
block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect();
let function_signature_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
if let Some(decoder) = &schema.calldata_decoder {
Box::new(move |tx| tx.input.starts_with(&decoder.function.short_signature()[..]))
} else {
Box::new(|_| true)
};
let transactions = block
.transactions
.clone()
.into_iter()
.filter(from_filter)
.filter(to_filter)
.filter(function_signature_filter)
.collect();

// 2. collect receipts if necessary
// if transactions are filtered fetch by set of transaction hashes, else fetch all receipts
Expand Down Expand Up @@ -178,6 +191,26 @@ pub(crate) fn process_transaction(
exclude_failed: bool,
timestamp: u32,
) -> R<()> {
// if calldata_decoder is supplied, transactions should be processed only if
// the calldata matches the given function signature
let decoded_args = match &schema.calldata_decoder {
None => None,
Some(decoder) => {
if tx.input.len() < 4 {
return Ok(())
}
match decoder.function.decode_input(&tx.input[4..]) {
Ok(decoded_input) => {
Some(decoded_input.into_iter().zip(&decoder.args).collect::<Vec<_>>())
}
Err(_) => {
// if decoder exists and decode fails, return without appending column
return Ok(())
}
}
}
};

let success = if exclude_failed | schema.has_column("success") {
let success = tx_success(&tx, &receipt)?;
if exclude_failed & !success {
Expand Down Expand Up @@ -212,6 +245,15 @@ pub(crate) fn process_transaction(
store!(schema, columns, timestamp, timestamp);
store!(schema, columns, block_hash, tx.block_hash.unwrap_or_default().as_bytes().to_vec());

match decoded_args {
None => {}
Some(decoded_args) => {
for (token, arg) in decoded_args {
columns.function_cols.entry(arg.clone()).or_default().push(token)
}
}
}

Ok(())
}

Expand Down
213 changes: 213 additions & 0 deletions crates/freeze/src/types/decoders/calldata_decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
use ethers_core::{
abi::{self, AbiEncode, HumanReadableParser, Param, ParamType, Token},
types::{I256, U256},
};
use polars::{prelude::NamedFrom, series::Series};

use crate::{err, CollectError, ColumnEncoding, ToU256Series, U256Type};

/// container for calldata decoding context
///
/// Built by [`CalldataDecoder::new`] from a human-readable function signature. The fields are
/// kept in sync by the constructor: `args[i]` is the name of `function.inputs[i]`.
#[derive(Clone, Debug, PartialEq)]
pub struct CalldataDecoder {
/// the raw function signature string ex: transfer(address to, uint256 value)
///
/// stored exactly as it was passed to the constructor
pub raw: String,
/// decoded abi type of function signature string
///
/// inputs that were declared without a name are renamed to `arg_{index}` by the constructor
pub function: abi::Function,
/// argument names of function
///
/// one entry per input, in declaration order; unnamed inputs appear as `arg_{index}`
pub args: Vec<String>,
}

impl CalldataDecoder {
    /// create a new CalldataDecoder from function signature
    ///
    /// The signature is parsed with [`HumanReadableParser::parse_function`]. Inputs declared
    /// without a name are renamed to `arg_{index}` on the stored `function`, so every entry of
    /// `args` equals the `name` of the input at the same position.
    ///
    /// # Errors
    ///
    /// When the signature cannot be parsed, a description of the problem is printed to stderr
    /// and returned as the error value.
    pub fn new(function_signature: String) -> Result<Self, String> {
        let mut function = match HumanReadableParser::parse_function(function_signature.as_str())
        {
            Ok(function) => function,
            Err(e) => {
                let message = format!(
                    "incorrectly formatted function {} (expect something like function transfer(address,uint256)) err: {}",
                    function_signature, e
                );
                eprintln!("{}", message);
                return Err(message)
            }
        };

        // give unnamed inputs a positional name in place, then record every input name
        let args = function
            .inputs
            .iter_mut()
            .enumerate()
            .map(|(i, param)| {
                if param.name.is_empty() {
                    param.name = format!("arg_{}", i);
                }
                param.name.clone()
            })
            .collect();

        Ok(Self { raw: function_signature, function, args })
    }

    /// Build the output series for one decoded function argument.
    ///
    /// `name` is the argument name to look up among the function inputs, `data` holds the
    /// decoded tokens collected for that argument and `chunk_len` is the length used for the
    /// mixed-type checks and for the all-null fallback column. Every produced series is named
    /// `param__{name}`.
    ///
    /// Tokens are sorted into one bucket per output representation:
    /// * addresses, fixed bytes and bytes become binary or hex values depending on
    ///   `column_encoding`
    /// * integers whose declared width is at most 64 bits become `u64` / `i64` values; wider
    ///   integers are converted with `to_u256_series`, once per entry of `u256_types`
    /// * integers whose input is declared with a non-integer kind become decimal strings
    /// * when no input named `name` exists, integers that fit an `i64` become `i64` values and
    ///   all others become decimal strings
    /// * bools and strings are kept as they are
    /// * arrays, fixed arrays and tuples are skipped (not supported yet)
    ///
    /// data should never be mixed type, otherwise this will return inconsistent results: only
    /// the first non-empty bucket is turned into a series.
    ///
    /// If no bucket received a value, a single all-null column of length `chunk_len` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error when
    /// * an integer declared with a width of at most 64 bits carries a value outside the
    ///   `u64` / `i64` range,
    /// * the bytes, hex, bool or string bucket does not contain exactly `chunk_len` values, or
    /// * `to_u256_series` fails.
    pub fn make_series(
        &self,
        name: String,
        data: Vec<Token>,
        chunk_len: usize,
        u256_types: &[U256Type],
        column_encoding: &ColumnEncoding,
    ) -> Result<Vec<Series>, CollectError> {
        // This is a smooth brain way of doing this, but I can't think of a better way right now
        let mut ints: Vec<i64> = vec![];
        let mut uints: Vec<u64> = vec![];
        let mut str_ints: Vec<String> = vec![];
        let mut u256s: Vec<U256> = vec![];
        let mut i256s: Vec<I256> = vec![];
        let mut bytes: Vec<Vec<u8>> = vec![];
        let mut hexes: Vec<String> = vec![];
        let mut bools: Vec<bool> = vec![];
        let mut strings: Vec<String> = vec![];
        // TODO: support array & tuple types

        // first input carrying the requested name, if any
        let param: Option<&Param> = self.function.inputs.iter().find(|input| input.name == name);

        for token in data {
            match token {
                Token::Address(a) => match column_encoding {
                    ColumnEncoding::Binary => bytes.push(a.to_fixed_bytes().into()),
                    ColumnEncoding::Hex => hexes.push(format!("{:?}", a)),
                },
                Token::FixedBytes(b) => match column_encoding {
                    ColumnEncoding::Binary => bytes.push(b),
                    ColumnEncoding::Hex => hexes.push(b.encode_hex()),
                },
                Token::Bytes(b) => match column_encoding {
                    ColumnEncoding::Binary => bytes.push(b),
                    ColumnEncoding::Hex => hexes.push(b.encode_hex()),
                },
                Token::Uint(value) => match param {
                    Some(param) => match &param.kind {
                        // checked conversion: a word wider than its declared type must surface
                        // as an error instead of a panic
                        ParamType::Uint(size) if *size <= 64 => match u64::try_from(value) {
                            Ok(v) => uints.push(v),
                            Err(_) => {
                                let message = format!(
                                    "could not parse column {}, value {} does not fit in uint{}",
                                    name, value, size
                                );
                                return Err(err(message.as_str()))
                            }
                        },
                        ParamType::Uint(_) => u256s.push(value),
                        _ => str_ints.push(value.to_string()),
                    },
                    None => match i64::try_from(value) {
                        Ok(v) => ints.push(v),
                        Err(_) => str_ints.push(value.to_string()),
                    },
                },
                Token::Int(raw) => {
                    // the token stores the raw two's complement word
                    let value = I256::from_raw(raw);
                    match param {
                        Some(param) => match &param.kind {
                            // checked conversion, see the unsigned case
                            ParamType::Int(size) if *size <= 64 => match i64::try_from(value) {
                                Ok(v) => ints.push(v),
                                Err(_) => {
                                    let message = format!(
                                        "could not parse column {}, value {} does not fit in int{}",
                                        name, value, size
                                    );
                                    return Err(err(message.as_str()))
                                }
                            },
                            ParamType::Int(_) => i256s.push(value),
                            _ => str_ints.push(value.to_string()),
                        },
                        None => match i64::try_from(value) {
                            Ok(v) => ints.push(v),
                            Err(_) => str_ints.push(value.to_string()),
                        },
                    }
                }
                Token::Bool(b) => bools.push(b),
                Token::String(s) => strings.push(s),
                Token::Array(_) | Token::FixedArray(_) => {}
                Token::Tuple(_) => {}
            }
        }
        let mixed_length_err = format!("could not parse column {}, mixed type", name);
        let mixed_length_err = mixed_length_err.as_str();

        // check each vector, see if it contains any values, if it does, check if it's the same
        // length as the input data and map to a series
        let name = format!("param__{}", name);
        if !ints.is_empty() {
            Ok(vec![Series::new(name.as_str(), ints)])
        } else if !i256s.is_empty() {
            let mut series_vec = Vec::new();
            for u256_type in u256_types.iter() {
                series_vec.push(i256s.to_u256_series(
                    name.clone(),
                    u256_type.clone(),
                    column_encoding,
                )?)
            }
            Ok(series_vec)
        } else if !u256s.is_empty() {
            let mut series_vec: Vec<Series> = Vec::new();
            for u256_type in u256_types.iter() {
                series_vec.push(u256s.to_u256_series(
                    name.clone(),
                    u256_type.clone(),
                    column_encoding,
                )?)
            }
            Ok(series_vec)
        } else if !uints.is_empty() {
            Ok(vec![Series::new(name.as_str(), uints)])
        } else if !str_ints.is_empty() {
            Ok(vec![Series::new(name.as_str(), str_ints)])
        } else if !bytes.is_empty() {
            if bytes.len() != chunk_len {
                return Err(err(mixed_length_err))
            }
            Ok(vec![Series::new(name.as_str(), bytes)])
        } else if !hexes.is_empty() {
            if hexes.len() != chunk_len {
                return Err(err(mixed_length_err))
            }
            Ok(vec![Series::new(name.as_str(), hexes)])
        } else if !bools.is_empty() {
            if bools.len() != chunk_len {
                return Err(err(mixed_length_err))
            }
            Ok(vec![Series::new(name.as_str(), bools)])
        } else if !strings.is_empty() {
            if strings.len() != chunk_len {
                return Err(err(mixed_length_err))
            }
            Ok(vec![Series::new(name.as_str(), strings)])
        } else {
            // case where no data was passed
            Ok(vec![Series::new(name.as_str(), vec![None::<u64>; chunk_len])])
        }
    }
}

// Unit tests for the calldata decoder; compiled only for test builds so the import below is
// always used and needs no lint override.
#[cfg(test)]
mod tests {
    use super::CalldataDecoder;

    /// Named parameters are reported verbatim and in declaration order.
    #[test]
    fn test_human_readable_parser() {
        let decoder =
            CalldataDecoder::new("transfer(address to,uint256 value)".to_string()).unwrap();
        assert_eq!(decoder.args, vec!["to".to_string(), "value".to_string()]);
    }

    // TODO(review): this test was left disabled in the original change and the reason is not
    // recorded here; confirm that the parser accepts unnamed parameters before re-enabling it.
    // #[test]
    // fn test_human_readable_parser_without_arg_name() {
    // let decoder = CalldataDecoder::new("transfer(address,uint256)".to_string()).unwrap();
    // assert_eq!(decoder.args, vec!["arg_0".to_string(), "arg_1".to_string()]);
    // }
}
3 changes: 3 additions & 0 deletions crates/freeze/src/types/decoders/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/// log decoder
pub mod log_decoder;
pub use log_decoder::*;
/// calldata decoder
pub mod calldata_decoder;
pub use calldata_decoder::*;
Loading
Loading