fix: get trans-failed column and relax json (#3521)
* fix: get trans-failed column in api deploy req

* relax check in json double

* json nan input&output, cmake

* fix ut xml upload
vagetablechicken authored Nov 15, 2023
1 parent b1435d2 commit 825d155
Showing 9 changed files with 251 additions and 96 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/cicd.yaml
@@ -107,9 +107,11 @@ jobs:
uses: actions/upload-artifact@v2
with:
name: linux-ut-result-cpp-${{ github.sha }}
# exclude _deps xml
path: |
build/**/*.xml
reports/*.xml
!build/_deps/*
- name: install
if: ${{ github.event_name == 'push' }}
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -136,6 +136,7 @@ endif()
include(FetchContent)
set(FETCHCONTENT_QUIET OFF)
include(farmhash)
include(rapidjson)

# contrib libs
add_subdirectory(contrib EXCLUDE_FROM_ALL)
9 changes: 9 additions & 0 deletions cmake/rapidjson.cmake
@@ -0,0 +1,9 @@
FetchContent_Declare(
rapidjson
URL https://github.com/Tencent/rapidjson/archive/refs/tags/v1.1.0.zip
URL_HASH MD5=ceb1cf16e693a3170c173dc040a9d2bd
EXCLUDE_FROM_ALL # don't build this project as part of the overall build
)
# don't build this project, just populate
FetchContent_Populate(rapidjson)
include_directories(${rapidjson_SOURCE_DIR}/include)
19 changes: 17 additions & 2 deletions docs/zh/quickstart/sdk/rest_api.md
@@ -5,6 +5,18 @@
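- The REST APIs interact with OpenMLDB services through APIServer, so the APIServer module must be deployed correctly before they can be used. APIServer is an optional module at installation time; see the [APIServer deployment document](../../deploy/install_deploy.md#部署-apiserver).
- At this stage, APIServer is intended mainly for functional testing. It is not recommended for performance testing or for production use. The default APIServer deployment currently has no high-availability mechanism and adds extra network and encoding/decoding overhead. For production, the Java SDK is recommended: it has the most complete feature coverage and has been thoroughly tested for both functionality and performance.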

## JSON Body

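In all interactions with APIServer, the request body is JSON, with some extended syntax supported. Note the following:

- A number beyond the maximum of the integer or floating-point type fails to parse, for example `1e1000` passed for a double.
- Non-finite floating-point values: on input, `NaN`, `Infinity`, `-Infinity`, and the abbreviations `Inf` and `-Inf` are supported (they must be unquoted, not strings, and no other variants are accepted). On output, `NaN`, `Infinity`, and `-Infinity` may be returned (no variants). If you need these three converted to null, set `write_nan_and_inf_null`.
- An integer can be passed for a floating-point column; for example, `1` can be read as a double.
- float values may lose precision; for example, `0.3` after being read will not be strictly equal to `0.3`, but `0.30000000000000004`. Precision loss is not rejected, so consider at the business level whether it needs handling. A value above float max but not above double max becomes `Inf` after being read.
- `true/false` and `null` are supported in lowercase only, not uppercase.
- The timestamp type does not yet accept a year-month-day string; only numeric values are supported, for example `1635247427000`.
- For the date type, pass a **year-month-day string** that contains no spaces.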
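
As a rough illustration of the non-finite number and float precision rules above, the following C++ sketch maps the accepted unquoted tokens to doubles and rejects other spellings. `ParseNonFinite` and `SurvivesFloatRoundTrip` are hypothetical helpers written only for this example; they are not part of APIServer or rapidjson, where the actual parsing happens inside the JSON library.

```cpp
#include <limits>
#include <string>

// Hypothetical helper (an assumption for illustration): recognizes only the
// unquoted spellings listed above and writes the matching double to *out.
// Any other spelling (for example "nan" or "infinity") is rejected.
bool ParseNonFinite(const std::string& token, double* out) {
    const double inf = std::numeric_limits<double>::infinity();
    if (token == "NaN") {
        *out = std::numeric_limits<double>::quiet_NaN();
        return true;
    }
    if (token == "Infinity" || token == "Inf") {
        *out = inf;
        return true;
    }
    if (token == "-Infinity" || token == "-Inf") {
        *out = -inf;
        return true;
    }
    return false;
}

// Hypothetical helper: true when an in-range double keeps its exact value
// after being narrowed to float and widened back to double.
bool SurvivesFloatRoundTrip(double d) {
    return static_cast<double>(static_cast<float>(d)) == d;
}
```

Under these assumptions, `ParseNonFinite("Inf", &v)` succeeds while `ParseNonFinite("nan", &v)` is rejected, and `SurvivesFloatRoundTrip(0.3)` is false because 0.3 has no exact float representation.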

## Data Insertion

Request URL: http://ip:port/dbs/{db_name}/tables/{table_name}
@@ -55,7 +67,8 @@ curl http://127.0.0.1:8080/dbs/db/tables/trans -X PUT -d '{
```JSON
{
"input": [["row0_value0", "row0_value1", "row0_value2"], ["row1_value0", "row1_value1", "row1_value2"], ...],
"need_schema": false
"need_schema": false,
"write_nan_and_inf_null": false
}
```

@@ -73,6 +86,7 @@ curl http://127.0.0.1:8080/dbs/db/tables/trans -X PUT -d '{

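- Multiple rows are supported; the results correspond one-to-one to the array in the data.data field of the returned response.
- need_schema can be set to true, in which case the response includes the schema of the output. Optional; defaults to false.
- write_nan_and_inf_null can be set to true. Optional; defaults to false. When it is true, any NaN, Inf, or -Inf in the output data is converted to null.
- When input is in array format/JSON format, the result is returned in array format/JSON format as well. The input of a single request supports only one format; do not mix formats.
- Input data in JSON format may contain extra columns.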
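
The effect of `write_nan_and_inf_null` on output can be sketched in a few lines of C++. `DoubleToJson` below is a hypothetical stand-in that returns a string; it is not the APIServer's `JsonWriter`, and the number formatting for finite values is only an assumption for illustration.

```cpp
#include <cmath>
#include <sstream>
#include <string>

// Hypothetical helper (not APIServer code): renders one double as JSON text.
// When write_nan_and_inf_null is true, NaN and +/-Inf are written as null;
// otherwise they are written as the unquoted NaN, Infinity, or -Infinity.
std::string DoubleToJson(double d, bool write_nan_and_inf_null) {
    if (std::isnan(d) || std::isinf(d)) {
        if (write_nan_and_inf_null) {
            return "null";
        }
        if (std::isnan(d)) {
            return "NaN";
        }
        return d > 0 ? "Infinity" : "-Infinity";
    }
    // Finite values: default stream formatting, chosen only for this sketch.
    std::ostringstream os;
    os << d;
    return os.str();
}
```

With the flag left at its default of false, a NaN in the result is emitted as `NaN`, which strict JSON parsers may reject; setting the flag to true trades that for a plain `null`.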

@@ -131,7 +145,8 @@ curl http://127.0.0.1:8080/dbs/demo_db/deployments/demo_data_service -X POST -d'
"input": {
"schema": [],
"data": []
}
},
"write_nan_and_inf_null": false
}
```

99 changes: 62 additions & 37 deletions src/apiserver/api_server_impl.cc
@@ -153,14 +153,16 @@ void APIServerImpl::RegisterQuery() {
}

QueryResp query_resp;
// we set write_nan_and_inf_null here instead of create a new JsonWriter with flags, cuz JsonWriter is not a
// good impl for template flag
query_resp.write_nan_and_inf_null = req.write_nan_and_inf_null;
query_resp.rs = rs;
writer << query_resp;
});
}

bool APIServerImpl::JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
const butil::rapidjson::Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row) {
absl::Status APIServerImpl::JsonArray2SQLRequestRow(const Value& non_common_cols_v, const Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row) {
auto sch = row->GetSchema();

// scan all strings to init the total string length
@@ -186,23 +188,24 @@ bool APIServerImpl::JsonArray2SQLRequestRow(const butil::rapidjson::Value& non_c
for (decltype(sch->GetColumnCnt()) i = 0; i < sch->GetColumnCnt(); ++i) {
if (sch->IsConstant(i)) {
if (!AppendJsonValue(common_cols_v[common_idx], sch->GetColumnType(i), sch->IsColumnNotNull(i), row)) {
return false;
return absl::InvalidArgumentError(
absl::StrCat("trans const ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed"));
}
++common_idx;
} else {
if (!AppendJsonValue(non_common_cols_v[non_common_idx], sch->GetColumnType(i), sch->IsColumnNotNull(i),
row)) {
return false;
return absl::InvalidArgumentError(
absl::StrCat("trans ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed"));
}
++non_common_idx;
}
}
return true;
return absl::OkStatus();
}

template <typename T>
bool APIServerImpl::AppendJsonValue(const butil::rapidjson::Value& v, hybridse::sdk::DataType type, bool is_not_null,
T row) {
bool APIServerImpl::AppendJsonValue(const Value& v, hybridse::sdk::DataType type, bool is_not_null, T row) {
// check if null
if (v.IsNull()) {
if (is_not_null) {
@@ -237,13 +240,14 @@ bool APIServerImpl::AppendJsonValue(const butil::rapidjson::Value& v, hybridse::
return row->AppendInt64(v.GetInt64());
}
case hybridse::sdk::kTypeFloat: {
if (!v.IsDouble()) {
if (!v.IsNumber()) { // relax check, int can get as double and support set float NaN&Inf
return false;
}
return row->AppendFloat(boost::lexical_cast<float>(v.GetDouble()));
// IEEE 754 arithmetic allows cast nan/inf to float
return row->AppendFloat(v.GetFloat());
}
case hybridse::sdk::kTypeDouble: {
if (!v.IsDouble()) {
if (!v.IsLosslessDouble()) {
return false;
}
return row->AppendDouble(v.GetDouble());
@@ -281,9 +285,8 @@ bool APIServerImpl::AppendJsonValue(const butil::rapidjson::Value& v, hybridse::
}

// common_cols_v is still an array, but non_common_cols_v is map, should find the value by the column name
bool APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
const butil::rapidjson::Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row) {
absl::Status APIServerImpl::JsonMap2SQLRequestRow(const Value& non_common_cols_v, const Value& common_cols_v,
std::shared_ptr<openmldb::sdk::SQLRequestRow> row) {
auto sch = row->GetSchema();

// scan all strings to init the total string length
@@ -300,8 +303,7 @@ bool APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_com
if (sch->GetColumnType(i) == hybridse::sdk::kTypeString) {
auto v = non_common_cols_v.FindMember(sch->GetColumnName(i).c_str());
if (v == non_common_cols_v.MemberEnd()) {
LOG(WARNING) << "can't find " << sch->GetColumnName(i);
return false;
return absl::InvalidArgumentError("can't find " + sch->GetColumnName(i));
}
str_len_sum += v->value.GetStringLength();
}
@@ -313,23 +315,22 @@ bool APIServerImpl::JsonMap2SQLRequestRow(const butil::rapidjson::Value& non_com
for (decltype(sch->GetColumnCnt()) i = 0; i < sch->GetColumnCnt(); ++i) {
if (sch->IsConstant(i)) {
if (!AppendJsonValue(common_cols_v[common_idx], sch->GetColumnType(i), sch->IsColumnNotNull(i), row)) {
LOG(WARNING) << "set " << sch->GetColumnName(i) << " failed";
return false;
return absl::InvalidArgumentError(
absl::StrCat("trans const ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed"));
}
++common_idx;
} else {
auto v = non_common_cols_v.FindMember(sch->GetColumnName(i).c_str());
if (v == non_common_cols_v.MemberEnd()) {
LOG(WARNING) << "can't find " << sch->GetColumnName(i);
return false;
return absl::InvalidArgumentError("can't find " + sch->GetColumnName(i));
}
if (!AppendJsonValue(v->value, sch->GetColumnType(i), sch->IsColumnNotNull(i), row)) {
LOG(WARNING) << "set " << sch->GetColumnName(i) << " failed";
return false;
return absl::InvalidArgumentError(
absl::StrCat("trans ", sch->GetColumnName(i), "[", sch->GetColumnType(i), "] failed"));
}
}
}
return true;
return absl::OkStatus();
}

void APIServerImpl::RegisterPut() {
@@ -347,7 +348,7 @@ void APIServerImpl::RegisterPut() {

// json2doc, then generate an insert sql
Document document;
if (document.Parse(req_body.to_string().c_str()).HasParseError()) {
if (document.Parse<rapidjson::kParseNanAndInfFlag>(req_body.to_string().c_str()).HasParseError()) {
DLOG(INFO) << "rapidjson doc parse [" << req_body.to_string().c_str() << "] failed, code "
<< document.GetParseError() << ", offset " << document.GetErrorOffset();
writer << resp.Set("Json parse failed, error code: " + std::to_string(document.GetParseError()));
@@ -430,13 +431,14 @@ void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvide
auto db = db_it->second;
auto sp = sp_it->second;

// TODO(hw): JsonReader can't set SQLRequestRow simply(cuz common_cols), use raw rapidjson here
Document document;
if (document.Parse(req_body.to_string().c_str()).HasParseError()) {
if (document.Parse<rapidjson::kParseNanAndInfFlag>(req_body.to_string().c_str()).HasParseError()) {
writer << resp.Set("Request body json parse failed");
return;
}

butil::rapidjson::Value common_cols_v;
Value common_cols_v;
if (has_common_col) {
auto common_cols = document.FindMember("common_cols");
if (common_cols != document.MemberEnd()) {
@@ -459,6 +461,12 @@ void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvide
}
const auto& rows = input->value;

auto write_nan_and_inf_null = false;
auto write_nan_and_inf_null_option = document.FindMember("write_nan_and_inf_null");
if (write_nan_and_inf_null_option != document.MemberEnd() && write_nan_and_inf_null_option->value.IsBool()) {
write_nan_and_inf_null = write_nan_and_inf_null_option->value.GetBool();
}

hybridse::sdk::Status status;
// We need to use ShowProcedure to get input schema(should know which column is constant).
// GetRequestRowByProcedure can't do that.
@@ -498,13 +506,15 @@ void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvide
writer << resp.Set("Invalid input data size in row " + std::to_string(i));
return;
}
if (!JsonArray2SQLRequestRow(rows[i], common_cols_v, row)) {
writer << resp.Set("Translate to request row failed in array row " + std::to_string(i));
if (auto st = JsonArray2SQLRequestRow(rows[i], common_cols_v, row); !st.ok()) {
writer << resp.Set("Translate to request row failed in array row " + std::to_string(i) + ", " +
st.ToString());
return;
}
} else if (rows[i].IsObject()) {
if (!JsonMap2SQLRequestRow(rows[i], common_cols_v, row)) {
writer << resp.Set("Translate to request row failed in map row " + std::to_string(i));
if (auto st = JsonMap2SQLRequestRow(rows[i], common_cols_v, row); !st.ok()) {
writer << resp.Set("Translate to request row failed in map row " + std::to_string(i) + ", " +
st.ToString());
return;
}
} else {
@@ -522,6 +532,7 @@ void APIServerImpl::ExecuteProcedure(bool has_common_col, const InterfaceProvide
}

ExecSPResp sp_resp;
sp_resp.write_nan_and_inf_null = write_nan_and_inf_null;
// output schema in sp_info is needed for encoding data, so we need a bool in ExecSPResp to know whether to
// print schema
sp_resp.sp_info = sp_info;
@@ -720,6 +731,9 @@ JsonReader& operator&(JsonReader& ar, QueryReq& s) { // NOLINT
if (ar.HasMember("input")) {
ar.Member("input") & s.parameter;
}
if (ar.HasMember("write_nan_and_inf_null")) {
ar.Member("write_nan_and_inf_null") & s.write_nan_and_inf_null;
}
return ar.EndObject();
}

@@ -877,7 +891,18 @@ void WriteSchema(JsonWriter& ar, const std::string& name, const hybridse::sdk::S
ar.EndArray();
}

void WriteValue(JsonWriter& ar, std::shared_ptr<hybridse::sdk::ResultSet> rs, int i) { // NOLINT
void WriteDoubleHelper(JsonWriter& ar, double d, bool write_nan_and_inf_null) { // NOLINT
if (write_nan_and_inf_null) {
if (std::isnan(d) || std::isinf(d)) {
ar.SetNull();
return;
}
}
ar& d;
}

void WriteValue(JsonWriter& ar, std::shared_ptr<hybridse::sdk::ResultSet> rs, int i, // NOLINT
bool write_nan_and_inf_null) {
auto schema = rs->GetSchema();
if (rs->IsNULL(i)) {
if (schema->IsColumnNotNull(i)) {
@@ -908,13 +933,13 @@ void WriteValue(JsonWriter& ar, std::shared_ptr<hybridse::sdk::ResultSet> rs, in
case hybridse::sdk::kTypeFloat: {
float value = 0;
rs->GetFloat(i, &value);
ar& static_cast<double>(value);
WriteDoubleHelper(ar, value, write_nan_and_inf_null);
break;
}
case hybridse::sdk::kTypeDouble: {
double value = 0;
rs->GetDouble(i, &value);
ar& value;
WriteDoubleHelper(ar, value, write_nan_and_inf_null);
break;
}
case hybridse::sdk::kTypeString: {
@@ -980,15 +1005,15 @@ JsonWriter& operator&(JsonWriter& ar, ExecSPResp& s) { // NOLINT
for (decltype(schema.GetColumnCnt()) i = 0; i < schema.GetColumnCnt(); i++) {
if (!schema.IsConstant(i)) {
ar.Member(schema.GetColumnName(i).c_str());
WriteValue(ar, rs, i);
WriteValue(ar, rs, i, s.write_nan_and_inf_null);
}
}
ar.EndObject();
} else {
ar.StartArray();
for (decltype(schema.GetColumnCnt()) i = 0; i < schema.GetColumnCnt(); i++) {
if (!schema.IsConstant(i)) {
WriteValue(ar, rs, i);
WriteValue(ar, rs, i, s.write_nan_and_inf_null);
}
}
ar.EndArray(); // one row end
@@ -1004,7 +1029,7 @@ JsonWriter& operator&(JsonWriter& ar, ExecSPResp& s) { // NOLINT
ar.StartArray();
for (decltype(schema.GetColumnCnt()) i = 0; i < schema.GetColumnCnt(); i++) {
if (schema.IsConstant(i)) {
WriteValue(ar, rs, i);
WriteValue(ar, rs, i, s.write_nan_and_inf_null);
}
}
ar.EndArray(); // one row end
@@ -1255,7 +1280,7 @@ JsonWriter& operator&(JsonWriter& ar, QueryResp& s) { // NOLINT
while (rs->Next()) {
ar.StartArray();
for (decltype(schema.GetColumnCnt()) i = 0; i < schema.GetColumnCnt(); i++) {
WriteValue(ar, rs, i);
WriteValue(ar, rs, i, s.write_nan_and_inf_null);
}
ar.EndArray();
}