Skip to content

Commit

Permalink
Serialize user-defined types as strings
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Nov 12, 2024
1 parent b65e04d commit 90e705a
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

#### [v0.4.0](https://github.com/BemiHQ/BemiDB/compare/v0.3.2...v0.4.0) - 2024-11-11

- Serialize user-defined types as strings

#### [v0.3.2](https://github.com/BemiHQ/BemiDB/compare/v0.3.1...v0.3.2) - 2024-11-11

- Fix Postgres `bigint` type conversion to Iceberg and Parquet
Expand Down
2 changes: 1 addition & 1 deletion scripts/install.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

VERSION="0.3.2"
VERSION="0.4.0"

# Detect OS and architecture
OS=$(uname -s | tr '[:upper:]' '[:lower:]')
Expand Down
22 changes: 17 additions & 5 deletions src/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,56 @@ import (
var TEST_PG_SCHEMA_COLUMNS = []PgSchemaColumn{
{
ColumnName: "id",
DataType: "character varying",
UdtName: "varchar",
IsNullable: "NO",
OrdinalPosition: "1",
},
{
ColumnName: "name",
DataType: "text",
UdtName: "text",
IsNullable: "NO",
OrdinalPosition: "2",
},
{
ColumnName: "int_value",
DataType: "integer",
UdtName: "int4",
IsNullable: "NO",
OrdinalPosition: "3",
},
{
ColumnName: "bigint_value",
DataType: "bigint",
UdtName: "int8",
IsNullable: "NO",
OrdinalPosition: "4",
},
{
ColumnName: "decimal_value",
DataType: "numeric",
UdtName: "numeric",
IsNullable: "NO",
OrdinalPosition: "5",
NumericPrecision: "10",
NumericScale: "2",
},
{
ColumnName: "user_defined_value",
DataType: "USER-DEFINED",
UdtName: "user_defined",
IsNullable: "NO",
OrdinalPosition: "6",
},
}

var TEST_LOADED_ROWS = [][]string{
{"1", "metric_1", "5", "9223372036854775807", "5.0"},
{"2", "metric_2", "10", "-9223372036854775808", "10.0"},
{"3", "metric_1", "5", "0", "5.0"},
{"4", "metric_2", "10", "1", "10.0"},
{"5", "metric_1", "5", "-1", "5.0"},
{"1", "metric_1", "5", "9223372036854775807", "5.0", "user_defined"},
{"2", "metric_2", "10", "-9223372036854775808", "10.0", "user_defined"},
{"3", "metric_1", "5", "0", "5.0", "user_defined"},
{"4", "metric_2", "10", "1", "10.0", "user_defined"},
{"5", "metric_1", "5", "-1", "5.0", "user_defined"},
}

func init() {
Expand Down
2 changes: 1 addition & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
)

const VERSION = "0.3.2"
const VERSION = "0.4.0"

func main() {
flag.Parse()
Expand Down
12 changes: 8 additions & 4 deletions src/pg_schema_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
)

const (
PG_SCHEMA_TRUE = "YES"
PG_SCHEMA_FALSE = "FALSE"
PG_SCHEMA_TRUE = "YES"
PG_SCHEMA_FALSE = "FALSE"
PG_DATA_TYPE_USER_DEFINED = "USER-DEFINED"

PARQUET_SCHEMA_REPETITION_TYPE_REQUIRED = "REQUIRED"
PARQUET_SCHEMA_REPETITION_TYPE_OPTIONAL = "OPTIONAL"
Expand All @@ -18,6 +19,7 @@ const (

type PgSchemaColumn struct {
ColumnName string
DataType string
UdtName string
IsNullable string
OrdinalPosition string
Expand Down Expand Up @@ -126,6 +128,8 @@ func (pgSchemaColumn *PgSchemaColumn) FormatParquetValue(value string) *string {
value = "[" + strings.Join(values, ",") + "]"
return &value
}
} else if pgSchemaColumn.DataType == PG_DATA_TYPE_USER_DEFINED {
return &value
}
}

Expand Down Expand Up @@ -175,7 +179,7 @@ func (pgSchemaColumn PgSchemaColumn) toIcebergSchemaField() IcebergSchemaField {
case "time", "timetz":
icebergSchemaField.Type = "time"
default:
if strings.HasPrefix(pgSchemaColumn.UdtName, "_") {
if pgSchemaColumn.DataType == PG_DATA_TYPE_USER_DEFINED || strings.HasPrefix(pgSchemaColumn.UdtName, "_") {
icebergSchemaField.Type = "string"
} else {
panic("Unsupported PostgreSQL type: " + pgSchemaColumn.UdtName)
Expand Down Expand Up @@ -244,7 +248,7 @@ func (pgSchemaColumn *PgSchemaColumn) toParquetSchemaField() ParquetSchemaField
parquetSchemaField.Type = "BYTE_ARRAY"
parquetSchemaField.ConvertedType = "INTERVAL"
default:
if strings.HasPrefix(pgSchemaColumn.UdtName, "_") {
if pgSchemaColumn.DataType == PG_DATA_TYPE_USER_DEFINED || strings.HasPrefix(pgSchemaColumn.UdtName, "_") {
parquetSchemaField.Type = "BYTE_ARRAY"
parquetSchemaField.ConvertedType = "UTF8"
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (syncer *Syncer) pgTableSchemaColumns(conn *pgx.Conn, pgSchemaTable SchemaT
context.Background(),
`SELECT
column_name,
data_type,
udt_name,
is_nullable,
ordinal_position,
Expand All @@ -155,6 +156,7 @@ func (syncer *Syncer) pgTableSchemaColumns(conn *pgx.Conn, pgSchemaTable SchemaT
var pgSchemaColumn PgSchemaColumn
err = rows.Scan(
&pgSchemaColumn.ColumnName,
&pgSchemaColumn.DataType,
&pgSchemaColumn.UdtName,
&pgSchemaColumn.IsNullable,
&pgSchemaColumn.OrdinalPosition,
Expand Down

0 comments on commit 90e705a

Please sign in to comment.