From cf8d35d06e1ba88cebd90565009401b3337e3501 Mon Sep 17 00:00:00 2001
From: gnehil
Date: Fri, 1 Dec 2023 12:30:49 +0800
Subject: [PATCH] fix timestamp type precision lost

---
 .../apache/doris/spark/sql/SchemaUtils.scala  |  5 +++--
 .../doris/spark/sql/TestSchemaUtils.scala     | 20 ++++++++++++++++---
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 1f0e9422..d56a4a3b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -32,7 +32,9 @@ import org.apache.spark.sql.types._
 import org.slf4j.LoggerFactory
 
 import java.sql.Timestamp
+import java.time.format.DateTimeFormatter
 import java.time.{LocalDateTime, ZoneOffset}
+import java.util.Locale
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -164,8 +166,7 @@ private[spark] object SchemaUtils {
       case DoubleType => row.getDouble(ordinal)
       case StringType => Option(row.getUTF8String(ordinal)).map(_.toString).getOrElse(DataUtil.NULL_VALUE)
       case TimestampType =>
-        LocalDateTime.ofEpochSecond(row.getLong(ordinal) / 100000, (row.getLong(ordinal) % 1000).toInt, ZoneOffset.UTC)
-        new Timestamp(row.getLong(ordinal) / 1000).toString
+        DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)).toString
       case DateType => DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString
       case BinaryType => row.getBinary(ordinal)
       case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
index e11fb4fb..5312c82f 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
@@ -20,13 +20,15 @@ package org.apache.doris.spark.sql
 import org.apache.doris.sdk.thrift.{TPrimitiveType, TScanColumnDesc}
 import org.apache.doris.spark.exception.DorisException
 import org.apache.doris.spark.rest.models.{Field, Schema}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 import org.hamcrest.core.StringStartsWith.startsWith
 import org.junit.{Assert, Ignore, Test}
 
+import java.sql.Timestamp
+import java.time.temporal.ChronoField
 import scala.collection.JavaConverters._
 
-@Ignore
 class TestSchemaUtils extends ExpectedExceptionTest {
   @Test
   def testConvertToStruct(): Unit = {
@@ -54,8 +56,8 @@
     Assert.assertEquals(DataTypes.LongType, SchemaUtils.getCatalystType("BIGINT", 0, 0))
     Assert.assertEquals(DataTypes.FloatType, SchemaUtils.getCatalystType("FLOAT", 0, 0))
     Assert.assertEquals(DataTypes.DoubleType, SchemaUtils.getCatalystType("DOUBLE", 0, 0))
-    Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("DATE", 0, 0))
-    Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("DATETIME", 0, 0))
+    Assert.assertEquals(DataTypes.DateType, SchemaUtils.getCatalystType("DATE", 0, 0))
+    Assert.assertEquals(DataTypes.TimestampType, SchemaUtils.getCatalystType("DATETIME", 0, 0))
     Assert.assertEquals(DataTypes.BinaryType, SchemaUtils.getCatalystType("BINARY", 0, 0))
     Assert.assertEquals(DecimalType(9, 3), SchemaUtils.getCatalystType("DECIMAL", 9, 3))
     Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("CHAR", 0, 0))
@@ -113,4 +115,16 @@ class TestSchemaUtils extends ExpectedExceptionTest {
 
   }
 
+  @Test
+  def rowColumnValueTest(): Unit = {
+
+    val timestamp = Timestamp.valueOf("2021-01-01 11:12:23.345678")
+    val row = InternalRow.fromSeq(Seq(
+      timestamp.getTime / 1000 * 1000000 + timestamp.getNanos / 1000
+    ))
+
+    Assert.assertEquals("2021-01-01 11:12:23.345678", SchemaUtils.rowColumnValue(row, 0, TimestampType))
+
+  }
+
 }
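
Why the fix works: Spark encodes TimestampType internally as a Long holding microseconds since the epoch. The removed code divided that value by 1000 and passed the result to java.sql.Timestamp's millisecond constructor, truncating the sub-millisecond digits; DateTimeUtils.toJavaTimestamp preserves them. Below is a minimal standalone Scala sketch of both behaviors, assuming only that microsecond encoding; toJavaTimestampSketch is a hypothetical stand-in written here to mirror what DateTimeUtils.toJavaTimestamp does, not Spark's actual implementation.

import java.sql.Timestamp

object TimestampPrecisionSketch {

  // Old conversion: micros -> millis integer division drops the
  // last three fractional digits.
  def lossyConversion(micros: Long): String =
    new Timestamp(micros / 1000).toString

  // Hypothetical stand-in for DateTimeUtils.toJavaTimestamp: keep the
  // whole seconds in the Timestamp, then restore the full microsecond
  // fraction through setNanos.
  def toJavaTimestampSketch(micros: Long): Timestamp = {
    val seconds = Math.floorDiv(micros, 1000000L)
    val microsOfSecond = Math.floorMod(micros, 1000000L)
    val ts = new Timestamp(seconds * 1000)
    ts.setNanos(microsOfSecond.toInt * 1000)
    ts
  }

  def main(args: Array[String]): Unit = {
    // Same micros encoding the new rowColumnValueTest uses to build
    // the InternalRow value.
    val ts = Timestamp.valueOf("2021-01-01 11:12:23.345678")
    val micros = ts.getTime / 1000 * 1000000 + ts.getNanos / 1000

    println(lossyConversion(micros))       // 2021-01-01 11:12:23.345
    println(toJavaTimestampSketch(micros)) // 2021-01-01 11:12:23.345678
  }
}

The added rowColumnValueTest exercises exactly this round trip: it packs 345678 microseconds into the internal Long and asserts that rowColumnValue renders all six fractional digits.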