From 79c3eea324ac19769fe2e42228d779cac695f0b1 Mon Sep 17 00:00:00 2001 From: Talat Uyarer Date: Sat, 14 Mar 2026 15:41:30 -0700 Subject: [PATCH] Upgrade Avro to 1.12.1, enable decimal logical types, and nanosecond timestamp precision handling. --- flink-formats/flink-avro/pom.xml | 1 + .../formats/avro/AvroToRowDataConverters.java | 52 ++++++++++++------- .../formats/avro/RowDataToAvroConverters.java | 34 ++++++++++-- .../avro/typeutils/AvroSchemaConverter.java | 21 ++++++-- .../formats/avro/AvroOutputFormatITCase.java | 8 ++- .../formats/avro/AvroOutputFormatTest.java | 8 ++- .../avro/AvroRecordInputFormatTest.java | 18 +++---- .../AvroRowDataDeSerializationSchemaTest.java | 30 +++++++---- .../avro/AvroSplittableInputFormatTest.java | 31 +++++------ .../formats/avro/EncoderDecoderTest.java | 10 ++-- .../typeutils/AvroSchemaConverterTest.java | 30 +++++++++-- .../typeutils/AvroTypeExtractionTest.java | 16 +++--- .../formats/avro/utils/AvroTestUtils.java | 51 ++++++++++++------ .../formats/avro/utils/TestDataGenerator.java | 6 +-- .../table/runtime/batch/AvroTypesITCase.java | 34 ++++++------ .../src/test/resources/avro/user.avsc | 5 +- pom.xml | 2 +- 17 files changed, 222 insertions(+), 135 deletions(-) diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml index 262d5c03cf823..831a0aac2e3da 100644 --- a/flink-formats/flink-avro/pom.xml +++ b/flink-formats/flink-avro/pom.xml @@ -257,6 +257,7 @@ under the License. 
${project.basedir}/src/test/resources/avro ${project.basedir}/target/generated-test-sources/ + true diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java index 9c63c56c4dcba..83c82b054c679 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java @@ -29,8 +29,10 @@ import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; import org.apache.avro.generic.GenericFixed; @@ -128,12 +130,13 @@ private static AvroToRowDataConverter createConverter( case TIME_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: - return AvroToRowDataConverters::convertToTimestamp; + return createTimestampConverter(((TimestampType) type).getPrecision()); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: if (legacyTimestampMapping) { throw new UnsupportedOperationException("Unsupported type: " + type); } else { - return AvroToRowDataConverters::convertToTimestamp; + return createTimestampConverter( + ((LocalZonedTimestampType) type).getPrecision()); } case CHAR: case VARCHAR: @@ -211,24 +214,37 @@ private static AvroToRowDataConverter createMapConverter( }; } - private static TimestampData convertToTimestamp(Object object) { - final long millis; - if (object instanceof Long) { - millis = (Long) object; - } else if (object instanceof Instant) { - 
millis = ((Instant) object).toEpochMilli(); - } else if (object instanceof LocalDateTime) { - return TimestampData.fromLocalDateTime((LocalDateTime) object); - } else { - JodaConverter jodaConverter = JodaConverter.getConverter(); - if (jodaConverter != null) { - millis = jodaConverter.convertTimestamp(object); + private static AvroToRowDataConverter createTimestampConverter(int precision) { + return object -> { + if (object instanceof Long) { + long val = (Long) object; + if (precision <= 3) { + return TimestampData.fromEpochMillis(val); + } else if (precision <= 6) { + long millis = Math.floorDiv(val, 1000L); + int nanosOfMilli = (int) (Math.floorMod(val, 1000L) * 1000); + return TimestampData.fromEpochMillis(millis, nanosOfMilli); + } else { + long millis = Math.floorDiv(val, 1000_000L); + int nanosOfMilli = (int) Math.floorMod(val, 1000_000L); + return TimestampData.fromEpochMillis(millis, nanosOfMilli); + } + } else if (object instanceof Instant) { + return TimestampData.fromInstant((Instant) object); + } else if (object instanceof LocalDateTime) { + return TimestampData.fromLocalDateTime((LocalDateTime) object); } else { - throw new IllegalArgumentException( - "Unexpected object type for TIMESTAMP logical type. Received: " + object); + JodaConverter jodaConverter = JodaConverter.getConverter(); + if (jodaConverter != null) { + long millis = jodaConverter.convertTimestamp(object); + return TimestampData.fromEpochMillis(millis); + } else { + throw new IllegalArgumentException( + "Unexpected object type for TIMESTAMP logical type. 
Received: " + + object); + } } - } - return TimestampData.fromEpochMillis(millis); + }; } private static int convertToDate(Object object) { diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java index af7a936b270c9..82853cac3b42f 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java @@ -25,8 +25,10 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.util.CollectionUtil; import org.apache.avro.Schema; @@ -156,6 +158,7 @@ public Object convert(Schema schema, Object object) { }; break; case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampPrecision = ((TimestampType) type).getPrecision(); if (legacyTimestampMapping) { converter = new RowDataToAvroConverter() { @@ -173,15 +176,26 @@ public Object convert(Schema schema, Object object) { @Override public Object convert(Schema schema, Object object) { - return ((TimestampData) object) - .toLocalDateTime() - .toInstant(ZoneOffset.UTC) - .toEpochMilli(); + java.time.Instant instant = + ((TimestampData) object) + .toLocalDateTime() + .toInstant(ZoneOffset.UTC); + if (timestampPrecision <= 3) { + return instant.toEpochMilli(); + } else if (timestampPrecision <= 6) { + return instant.getEpochSecond() * 1_000_000L + + instant.getNano() / 1000L; + } else { + return instant.getEpochSecond() * 1_000_000_000L + + instant.getNano(); + } } }; } break; case 
TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int localZonedTimestampPrecision = + ((LocalZonedTimestampType) type).getPrecision(); if (legacyTimestampMapping) { throw new UnsupportedOperationException("Unsupported type: " + type); } else { @@ -191,7 +205,17 @@ public Object convert(Schema schema, Object object) { @Override public Object convert(Schema schema, Object object) { - return ((TimestampData) object).toInstant().toEpochMilli(); + java.time.Instant instant = + ((TimestampData) object).toInstant(); + if (localZonedTimestampPrecision <= 3) { + return instant.toEpochMilli(); + } else if (localZonedTimestampPrecision <= 6) { + return instant.getEpochSecond() * 1_000_000L + + instant.getNano() / 1000L; + } else { + return instant.getEpochSecond() * 1_000_000_000L + + instant.getNano(); + } } }; } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java index 8bf5cbe2c07f7..2dcf6e930f07e 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java @@ -194,7 +194,8 @@ private static TypeInformation convertToTypeInfo( case LONG: if (legacyTimestampMapping) { if (schema.getLogicalType() == LogicalTypes.timestampMillis() - || schema.getLogicalType() == LogicalTypes.timestampMicros()) { + || schema.getLogicalType() == LogicalTypes.timestampMicros() + || schema.getLogicalType() == LogicalTypes.timestampNanos()) { return Types.SQL_TIMESTAMP; } else if (schema.getLogicalType() == LogicalTypes.timeMicros() || schema.getLogicalType() == LogicalTypes.timeMillis()) { @@ -203,10 +204,12 @@ private static TypeInformation convertToTypeInfo( } else { // Avro logical timestamp types to Flink DataStream timestamp types if (schema.getLogicalType() == 
LogicalTypes.timestampMillis() - || schema.getLogicalType() == LogicalTypes.timestampMicros()) { + || schema.getLogicalType() == LogicalTypes.timestampMicros() + || schema.getLogicalType() == LogicalTypes.timestampNanos()) { return Types.INSTANT; } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis() - || schema.getLogicalType() == LogicalTypes.localTimestampMicros()) { + || schema.getLogicalType() == LogicalTypes.localTimestampMicros() + || schema.getLogicalType() == LogicalTypes.localTimestampNanos()) { return Types.LOCAL_DATE_TIME; } else if (schema.getLogicalType() == LogicalTypes.timeMicros() || schema.getLogicalType() == LogicalTypes.timeMillis()) { @@ -350,6 +353,8 @@ private static DataType convertToDataType(Schema schema, boolean legacyMapping) return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(); } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timestampNanos()) { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).notNull(); } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) { return DataTypes.TIME(3).notNull(); } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) { @@ -358,6 +363,8 @@ private static DataType convertToDataType(Schema schema, boolean legacyMapping) return DataTypes.TIMESTAMP(3).notNull(); } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) { return DataTypes.TIMESTAMP(6).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.localTimestampNanos()) { + return DataTypes.TIMESTAMP(9).notNull(); } } @@ -479,12 +486,14 @@ public static Schema convertToSchema( avroLogicalType = LogicalTypes.localTimestampMillis(); } else if (precision <= 6) { avroLogicalType = LogicalTypes.localTimestampMicros(); + } else if (precision <= 9) { + avroLogicalType = LogicalTypes.localTimestampNanos(); } else { throw new 
IllegalArgumentException( "Avro does not support LOCAL TIMESTAMP type " + "with precision: " + precision - + ", it only supports precision less than 6."); + + ", it only supports precision less than or equal to 9."); } } Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType()); @@ -501,12 +510,14 @@ avroLogicalType = LogicalTypes.timestampMillis(); } else if (precision <= 6) { avroLogicalType = LogicalTypes.timestampMicros(); + } else if (precision <= 9) { + avroLogicalType = LogicalTypes.timestampNanos(); } else { throw new IllegalArgumentException( "Avro does not support TIMESTAMP type " + "with precision: " + precision - + ", it only supports precision less than 6."); + + ", it only supports precision less than or equal to 9."); } timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType()); return nullable ? nullableSchema(timestamp) : timestamp; diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java index 1347366cf8551..5dbca399504ee 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.formats.avro.AvroOutputFormat.Codec; import org.apache.flink.formats.avro.generated.Colors; -import org.apache.flink.formats.avro.generated.Fixed2; import org.apache.flink.formats.avro.generated.User; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -185,12 +184,11 @@ public User map(Tuple3 value) { user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")); user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)); + user.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)); // 20.00 - user.setTypeDecimalBytes( - ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); + user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)); // 20.00 - user.setTypeDecimalFixed( - new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); + user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)); return user; } } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java index 92f43ac7d6562..217c0c12fa0fe 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java @@ -23,7 +23,6 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.avro.generated.Colors; -import org.apache.flink.formats.avro.generated.Fixed2; import org.apache.flink.formats.avro.generated.User; import org.apache.flink.mock.Whitebox; @@ -198,13 +197,12 @@ public int getAttemptNumber() { user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS)); user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")); user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)); + user.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)); // 20.00 - user.setTypeDecimalBytes( - ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); + user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)); // 20.00 - user.setTypeDecimalFixed( - new Fixed2(BigDecimal.valueOf(2000, 
2).unscaledValue().toByteArray())); + user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)); outputFormat.writeRecord(user); } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java index a3e2e6c0914d3..0f5fa12990a25 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java @@ -31,7 +31,6 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.Colors; -import org.apache.flink.formats.avro.generated.Fixed2; import org.apache.flink.formats.avro.generated.User; import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils; @@ -139,12 +138,11 @@ public static void writeTestFile(File testFile) throws IOException { user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS)); user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")); user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)); + user1.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)); // 20.00 - user1.setTypeDecimalBytes( - ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); + user1.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)); // 20.00 - user1.setTypeDecimalFixed( - new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); + user1.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)); // Construct via builder User user2 = @@ -179,14 +177,12 @@ public static void writeTestFile(File testFile) throws IOException { .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")) 
.setTypeTimestampMicros( Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)) + .setTypeTimestampNanos( + Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)) // 20.00 - .setTypeDecimalBytes( - ByteBuffer.wrap( - BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)) // 20.00 - .setTypeDecimalFixed( - new Fixed2( - BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)) .build(); DatumWriter userDatumWriter = new SpecificDatumWriter<>(User.class); DataFileWriter dataFileWriter = new DataFileWriter<>(userDatumWriter); diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java index 84f9994f6665c..5349409823279 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java @@ -117,6 +117,7 @@ void testSerializeDeserialize(AvroEncoding encoding) throws Exception { FIELD("date", DATE()), FIELD("timestamp3", TIMESTAMP(3)), FIELD("timestamp3_2", TIMESTAMP(3)), + FIELD("timestamp9", TIMESTAMP(9)), FIELD("map", MAP(STRING(), BIGINT())), FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))), FIELD("map2array", MAP(STRING(), ARRAY(INT()))), @@ -124,7 +125,7 @@ void testSerializeDeserialize(AvroEncoding encoding) throws Exception { .notNull(); final RowType rowType = (RowType) dataType.getLogicalType(); - final Schema schema = AvroSchemaConverter.convertToSchema(rowType); + final Schema schema = AvroSchemaConverter.convertToSchema(rowType, false); final GenericRecord record = new GenericData.Record(schema); record.put(0, true); record.put(1, (int) Byte.MAX_VALUE); @@ -149,34 +150,35 @@ void 
testSerializeDeserialize(AvroEncoding encoding) throws Exception { record.put(12, 10087); record.put(13, 1589530213123L); record.put(14, 1589530213122L); + record.put(15, 1589530213123456789L); Map map = new HashMap<>(); map.put("flink", 12L); map.put("avro", 23L); - record.put(15, map); + record.put(16, map); Map> map2map = new HashMap<>(); Map innerMap = new HashMap<>(); innerMap.put("inner_key1", 123); innerMap.put("inner_key2", 234); map2map.put("outer_key", innerMap); - record.put(16, map2map); + record.put(17, map2map); List list1 = Arrays.asList(1, 2, 3, 4, 5, 6); List list2 = Arrays.asList(11, 22, 33, 44, 55); Map> map2list = new HashMap<>(); map2list.put("list1", list1); map2list.put("list2", list2); - record.put(17, map2list); + record.put(18, map2list); Map map2 = new HashMap<>(); map2.put("key1", null); - record.put(18, map2); + record.put(19, map2); AvroRowDataSerializationSchema serializationSchema = - createSerializationSchema(dataType, encoding, true); + createSerializationSchema(dataType, encoding, false); AvroRowDataDeserializationSchema deserializationSchema = - createDeserializationSchema(dataType, encoding, true); + createDeserializationSchema(dataType, encoding, false); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); GenericDatumWriter datumWriter = new GenericDatumWriter<>(schema); @@ -358,6 +360,10 @@ void testTimestampTypeLegacyMapping() throws Exception { .isEqualTo(new AtomicDataType(new BigIntType(false))); assertThat(dataType.getChildren().get(3)) .isEqualTo(new AtomicDataType(new BigIntType(false))); + assertThat(dataType.getChildren().get(4)) + .isEqualTo(new AtomicDataType(new BigIntType(false))); + assertThat(dataType.getChildren().get(5)) + .isEqualTo(new AtomicDataType(new BigIntType(false))); assertThatThrownBy(() -> createSerializationSchema(dataType, AvroEncoding.BINARY, true)) .isInstanceOf(IllegalArgumentException.class) @@ -396,10 +402,14 @@ void testTimestampTypeNewMapping() throws Exception { 
RowData rowData2 = deserializationSchema.deserialize(output); assertThat(rowData2).isEqualTo(rowData); - assertThat(rowData.getTimestamp(2, 3).toLocalDateTime().toString()) + assertThat(rowData.getTimestamp(2, 9).toInstant().toString()) + .isEqualTo("1970-01-01T00:00:00.123456789Z"); + assertThat(rowData.getTimestamp(3, 3).toLocalDateTime().toString()) .isEqualTo("2014-03-01T12:12:12.321"); - assertThat(rowData.getTimestamp(3, 6).toLocalDateTime().toString()) - .isEqualTo("1970-01-01T00:02:03.456"); + assertThat(rowData.getTimestamp(4, 6).toLocalDateTime().toString()) + .isEqualTo("1970-01-01T00:00:00.123456"); + assertThat(rowData.getTimestamp(5, 9).toLocalDateTime().toString()) + .isEqualTo("1970-01-01T00:00:00.123456789"); } private AvroRowDataSerializationSchema createSerializationSchema( diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java index f6fe06342c2bb..f082262306c38 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java @@ -25,7 +25,6 @@ import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.Colors; import org.apache.flink.formats.avro.generated.Fixed16; -import org.apache.flink.formats.avro.generated.Fixed2; import org.apache.flink.formats.avro.generated.User; import org.apache.avro.file.DataFileWriter; @@ -119,12 +118,11 @@ void createFiles(@TempDir java.nio.file.Path tempDir) throws IOException { user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS)); user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")); user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)); + 
user1.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)); // 20.00 - user1.setTypeDecimalBytes( - ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); + user1.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)); // 20.00 - user1.setTypeDecimalFixed( - new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); + user1.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)); // Construct via builder User user2 = @@ -159,14 +157,12 @@ void createFiles(@TempDir java.nio.file.Path tempDir) throws IOException { .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")) .setTypeTimestampMicros( Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)) + .setTypeTimestampNanos( + Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)) // 20.00 - .setTypeDecimalBytes( - ByteBuffer.wrap( - BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)) // 20.00 - .setTypeDecimalFixed( - new Fixed2( - BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)) .build(); DatumWriter userDatumWriter = new SpecificDatumWriter<>(User.class); DataFileWriter dataFileWriter = new DataFileWriter<>(userDatumWriter); @@ -198,12 +194,11 @@ void createFiles(@TempDir java.nio.file.Path tempDir) throws IOException { user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS)); user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")); user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)); + user.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)); // 20.00 - user.setTypeDecimalBytes( - ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); + user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)); // 20.00 - user.setTypeDecimalFixed( - new Fixed2(BigDecimal.valueOf(2000, 
2).unscaledValue().toByteArray())); + user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)); dataFileWriter.append(user); } @@ -234,7 +229,7 @@ void testSplittedIF() throws IOException { format.close(); } - assertThat(elementsPerSplit).containsExactly(1604, 1203, 1203, 990); + assertThat(elementsPerSplit).containsExactly(1564, 1173, 1173, 1090); assertThat(elements).isEqualTo(NUM_RECORDS); format.close(); } @@ -280,7 +275,7 @@ void testAvroRecoveryWithFailureAtStart() throws Exception { format.close(); } - assertThat(elementsPerSplit).containsExactly(1604, 1203, 1203, 990); + assertThat(elementsPerSplit).containsExactly(1564, 1173, 1173, 1090); assertThat(elements).isEqualTo(NUM_RECORDS); format.close(); } @@ -326,7 +321,7 @@ void testAvroRecovery() throws Exception { format.close(); } - assertThat(elementsPerSplit).containsExactly(1604, 1203, 1203, 990); + assertThat(elementsPerSplit).containsExactly(1564, 1173, 1173, 1090); assertThat(elements).isEqualTo(NUM_RECORDS); format.close(); } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java index 65b470f0b5fd8..67ea9ea01a047 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java @@ -21,7 +21,6 @@ import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.Colors; import org.apache.flink.formats.avro.generated.Fixed16; -import org.apache.flink.formats.avro.generated.Fixed2; import org.apache.flink.formats.avro.generated.User; import org.apache.flink.formats.avro.utils.DataInputDecoder; import org.apache.flink.formats.avro.utils.DataOutputEncoder; @@ -299,12 +298,9 @@ void testGeneratedObjectWithNullableFields() { LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS), 
Instant.parse("2014-03-01T12:12:12.321Z"), Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS), - ByteBuffer.wrap( - BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00 - new Fixed2( - BigDecimal.valueOf(2000, 2) - .unscaledValue() - .toByteArray())); // 20.00 + Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS), + BigDecimal.valueOf(2000, 2), + BigDecimal.valueOf(2000, 2)); testObjectSerialization(user); } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java index 0cf0033341c3b..9f2eda069fd97 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java @@ -610,6 +610,7 @@ private void validateUserSchema(TypeInformation actual) { "type_time_micros", "type_timestamp_millis", "type_timestamp_micros", + "type_timestamp_nanos", "type_decimal_bytes", "type_decimal_fixed" }, @@ -634,6 +635,7 @@ private void validateUserSchema(TypeInformation actual) { Types.SQL_TIME, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, + Types.SQL_TIMESTAMP, Types.BIG_DEC, Types.BIG_DEC); @@ -649,11 +651,15 @@ private void validateTimestampsSchema(TypeInformation actual) { new String[] { "type_timestamp_millis", "type_timestamp_micros", + "type_timestamp_nanos", "type_local_timestamp_millis", - "type_local_timestamp_micros" + "type_local_timestamp_micros", + "type_local_timestamp_nanos" }, Types.INSTANT, Types.INSTANT, + Types.INSTANT, + Types.LOCAL_DATE_TIME, Types.LOCAL_DATE_TIME, Types.LOCAL_DATE_TIME); final RowTypeInfo timestampsRowTypeInfo = (RowTypeInfo) timestamps; @@ -666,11 +672,15 @@ private void validateLegacyTimestampsSchema(TypeInformation actual) { new String[] { "type_timestamp_millis", 
"type_timestamp_micros", + "type_timestamp_nanos", "type_local_timestamp_millis", - "type_local_timestamp_micros" + "type_local_timestamp_micros", + "type_local_timestamp_nanos" }, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, + Types.SQL_TIMESTAMP, + Types.LONG, Types.LONG, Types.LONG); final RowTypeInfo timestampsRowTypeInfo = (RowTypeInfo) timestamps; @@ -684,12 +694,16 @@ private void validateLegacyTimestampsSchema(DataType actual) { "type_timestamp_millis", DataTypes.TIMESTAMP(3).notNull()), DataTypes.FIELD( "type_timestamp_micros", DataTypes.TIMESTAMP(6).notNull()), + DataTypes.FIELD( + "type_timestamp_nanos", DataTypes.TIMESTAMP(9).notNull()), DataTypes.FIELD( "type_local_timestamp_millis", DataTypes.BIGINT().notNull()), DataTypes.FIELD( "type_local_timestamp_micros", - DataTypes.BIGINT().notNull())) + DataTypes.BIGINT().notNull()), + DataTypes.FIELD( + "type_local_timestamp_nanos", DataTypes.BIGINT().notNull())) .notNull(); assertThat(actual).isEqualTo(timestamps); @@ -704,12 +718,18 @@ private void validateTimestampsSchema(DataType actual) { DataTypes.FIELD( "type_timestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull()), + DataTypes.FIELD( + "type_timestamp_nanos", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).notNull()), DataTypes.FIELD( "type_local_timestamp_millis", DataTypes.TIMESTAMP(3).notNull()), DataTypes.FIELD( "type_local_timestamp_micros", - DataTypes.TIMESTAMP(6).notNull())) + DataTypes.TIMESTAMP(6).notNull()), + DataTypes.FIELD( + "type_local_timestamp_nanos", + DataTypes.TIMESTAMP(9).notNull())) .notNull(); assertThat(actual).isEqualTo(timestamps); @@ -765,6 +785,8 @@ private void validateUserSchema(DataType actual) { "type_timestamp_millis", DataTypes.TIMESTAMP(3).notNull()), DataTypes.FIELD( "type_timestamp_micros", DataTypes.TIMESTAMP(6).notNull()), + DataTypes.FIELD( + "type_timestamp_nanos", DataTypes.BIGINT().notNull()), DataTypes.FIELD( "type_decimal_bytes", DataTypes.DECIMAL(4, 2).notNull()), DataTypes.FIELD( diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java index 6975e7c882ec8..16b09406a422e 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java @@ -102,14 +102,15 @@ void testSimpleAvroRead(boolean useMiniCluster, @InjectMiniCluster MiniCluster m + "\"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, " + "\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], " + "\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", " - + "\"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, " + + "\"type_map\": {\"KEY 1\": 8546456, \"KEY 2\": 17554}, \"type_fixed\": null, \"type_union\": null, " + "\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", " + "\"state\": \"London\", \"zip\": \"NW1 6XE\"}, " + "\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " + "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", " + "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", " + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", " - + "\"type_decimal_bytes\": \"\\u0007Ð\", \"type_decimal_fixed\": [7, -48]}\n" + + "\"type_timestamp_nanos\": \"1970-01-01T00:00:00.123456789Z\", " + + "\"type_decimal_bytes\": 20.00, \"type_decimal_fixed\": 20.00}\n" + "{\"name\": \"Charlie\", \"favorite_number\": null, " + "\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " + "\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], 
" @@ -121,8 +122,9 @@ void testSimpleAvroRead(boolean useMiniCluster, @InjectMiniCluster MiniCluster m + "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", " + "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", " + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", " - + "\"type_decimal_bytes\": \"\\u0007Ð\", " - + "\"type_decimal_fixed\": [7, -48]}\n"; + + "\"type_timestamp_nanos\": \"1970-01-01T00:00:00.123456789Z\", " + + "\"type_decimal_bytes\": 20.00, " + + "\"type_decimal_fixed\": 20.00}\n"; } @ParameterizedTest @@ -163,7 +165,8 @@ void testSerializeWithAvro(boolean useMiniCluster, @InjectMiniCluster MiniCluste + "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", " + "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", " + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", " - + "\"type_decimal_bytes\": \"\\u0007Ð\", \"type_decimal_fixed\": [7, -48]}\n" + + "\"type_timestamp_nanos\": \"1970-01-01T00:00:00.123456789Z\", " + + "\"type_decimal_bytes\": 20.00, \"type_decimal_fixed\": 20.00}\n" + "{\"name\": \"Charlie\", \"favorite_number\": null, " + "\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " + "\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], " @@ -175,7 +178,8 @@ void testSerializeWithAvro(boolean useMiniCluster, @InjectMiniCluster MiniCluste + "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", " + "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", " + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", " - + "\"type_decimal_bytes\": \"\\u0007Ð\", \"type_decimal_fixed\": [7, -48]}\n"; + + "\"type_timestamp_nanos\": \"1970-01-01T00:00:00.123456789Z\", " + + "\"type_decimal_bytes\": 20.00, \"type_decimal_fixed\": 20.00}\n"; } @ParameterizedTest diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java index b4e12f8ca10ee..b2fb563e61e89 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java @@ -24,7 +24,6 @@ import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.Colors; import org.apache.flink.formats.avro.generated.Fixed16; -import org.apache.flink.formats.avro.generated.Fixed2; import org.apache.flink.formats.avro.generated.Timestamps; import org.apache.flink.formats.avro.generated.User; import org.apache.flink.formats.avro.typeutils.AvroSerializerLargeGenericRecordTest; @@ -107,20 +106,15 @@ public final class AvroTestUtils { .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")) .setTypeTimestampMicros( Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)) + .setTypeTimestampNanos( + Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)) // byte array must contain the two's-complement representation of the // unscaled integer value in big-endian byte order - .setTypeDecimalBytes( - ByteBuffer.wrap( - BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) - // array of length n can store at most - // Math.floor(Math.log10(Math.pow(2, 8 * n - 1) - 1)) - // base-10 digits of precision - .setTypeDecimalFixed( - new Fixed2( - BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)) + .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)) .build(); - final Row rowUser = new Row(23); + final Row rowUser = new Row(24); rowUser.setField(0, "Charlie"); rowUser.setField(1, null); rowUser.setField(2, "blue"); @@ -144,8 +138,10 @@ public final class AvroTestUtils { rowUser.setField(19, 
Timestamp.valueOf("2014-03-01 12:12:12.321")); rowUser.setField( 20, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))); - rowUser.setField(21, BigDecimal.valueOf(2000, 2)); + rowUser.setField( + 21, Timestamp.from(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))); rowUser.setField(22, BigDecimal.valueOf(2000, 2)); + rowUser.setField(23, BigDecimal.valueOf(2000, 2)); final Tuple3, SpecificRecord, Row> t = new Tuple3<>(); t.f0 = User.class; @@ -176,7 +172,8 @@ public static Tuple3 getGenericTestData() { + "{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," + "\"type\":{\"type\":\"long\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," + "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," - + "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," + + "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_timestamp_nanos\",\"type\":{\"type\":\"long\"," + + "\"logicalType\":\"timestamp-nanos\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," + "\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," + "\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}"; final Schema schema = new Schema.Parser().parse(schemaString); @@ -222,6 +219,9 @@ public static Tuple3 getGenericTestData() { user.put("type_timestamp_millis", Instant.parse("2014-03-01T12:12:12.321Z")); user.put( "type_timestamp_micros", Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)); + user.put( + "type_timestamp_nanos", + Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)); user.put( "type_decimal_bytes", ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); @@ -231,7 +231,7 @@ public 
static Tuple3 getGenericTestData() { schema.getField("type_decimal_fixed").schema(), BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); - final Row rowUser = new Row(23); + final Row rowUser = new Row(24); rowUser.setField(0, "Charlie"); rowUser.setField(1, null); rowUser.setField(2, "blue"); @@ -255,8 +255,10 @@ public static Tuple3 getGenericTestData() { rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321")); rowUser.setField( 20, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))); - rowUser.setField(21, BigDecimal.valueOf(2000, 2)); + rowUser.setField( + 21, Timestamp.from(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))); rowUser.setField(22, BigDecimal.valueOf(2000, 2)); + rowUser.setField(23, BigDecimal.valueOf(2000, 2)); final Tuple3 t = new Tuple3<>(); t.f0 = user; @@ -274,34 +276,49 @@ public static Tuple3 getGenericTestData() { + "\"fields\": [{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," + "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," + "\"logicalType\":\"timestamp-micros\"}},{\"name\": \"type_local_timestamp_millis\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-millis\"}}," - + "{\"name\": \"type_local_timestamp_micros\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}}]}"; + + "{\"name\": \"type_local_timestamp_micros\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}}," + + "{\"name\": \"type_timestamp_nanos\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}}," + + "{\"name\": \"type_local_timestamp_nanos\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-nanos\"}}]}"; final Schema schema = new Schema.Parser().parse(schemaString); final GenericRecord timestampRecord = new GenericData.Record(schema); timestampRecord.put("type_timestamp_millis", Instant.parse("2014-03-01T12:12:12.321Z")); 
timestampRecord.put( "type_timestamp_micros", Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)); + timestampRecord.put( + "type_timestamp_nanos", + Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)); timestampRecord.put( "type_local_timestamp_millis", LocalDateTime.parse("2014-03-01T12:12:12.321")); timestampRecord.put( "type_local_timestamp_micros", LocalDateTime.parse("1970-01-01T00:00:00.123456")); + timestampRecord.put( + "type_local_timestamp_nanos", LocalDateTime.parse("1970-01-01T00:00:00.123456789")); final Timestamps timestamps = Timestamps.newBuilder() .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")) .setTypeTimestampMicros( Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)) + .setTypeTimestampNanos( + Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)) .setTypeLocalTimestampMillis(LocalDateTime.parse("2014-03-01T12:12:12.321")) .setTypeLocalTimestampMicros( LocalDateTime.parse("1970-01-01T00:00:00.123456")) + .setTypeLocalTimestampNanos( + LocalDateTime.parse("1970-01-01T00:00:00.123456789")) .build(); - final Row timestampRow = new Row(4); + final Row timestampRow = new Row(6); timestampRow.setField(0, Timestamp.valueOf("2014-03-01 12:12:12.321")); timestampRow.setField( 1, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))); timestampRow.setField(2, Timestamp.valueOf(LocalDateTime.parse("2014-03-01T12:12:12.321"))); timestampRow.setField( 3, Timestamp.valueOf(LocalDateTime.parse("1970-01-01T00:00:00.123456"))); + timestampRow.setField( + 4, Timestamp.from(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))); + timestampRow.setField( + 5, Timestamp.valueOf(LocalDateTime.parse("1970-01-01T00:00:00.123456789"))); final Tuple4, SpecificRecord, GenericRecord, Row> t = new Tuple4<>(); diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java index 13686bb2b5f23..01b63aeaee44e 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java @@ -21,7 +21,6 @@ import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.Colors; import org.apache.flink.formats.avro.generated.Fixed16; -import org.apache.flink.formats.avro.generated.Fixed2; import org.apache.flink.formats.avro.generated.SimpleUser; import org.apache.flink.formats.avro.generated.User; @@ -62,8 +61,9 @@ public static User generateRandomUser(Random rnd) { LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS), Instant.parse("2014-03-01T12:12:12.321Z"), Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS), - ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), - new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); + Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS), + BigDecimal.valueOf(2000, 2), + BigDecimal.valueOf(2000, 2)); } public static SimpleUser generateRandomSimpleUser(Random rnd) { diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java index 3c97e2269804c..72924a6753b3a 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java @@ -21,7 +21,6 @@ import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.Colors; import org.apache.flink.formats.avro.generated.Fixed16; -import org.apache.flink.formats.avro.generated.Fixed2; import 
org.apache.flink.formats.avro.generated.User; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -84,11 +83,10 @@ class AvroTypesITCase extends AbstractTestBase { .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")) .setTypeTimestampMicros( Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)) - .setTypeDecimalBytes( - ByteBuffer.wrap( - BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) - .setTypeDecimalFixed( - new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeTimestampNanos( + Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)) + .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)) + .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)) .build(); private static final User USER_2 = @@ -114,11 +112,10 @@ class AvroTypesITCase extends AbstractTestBase { .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")) .setTypeTimestampMicros( Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)) - .setTypeDecimalBytes( - ByteBuffer.wrap( - BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) - .setTypeDecimalFixed( - new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeTimestampNanos( + Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)) + .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)) + .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)) .build(); private static final User USER_3 = @@ -144,11 +141,10 @@ class AvroTypesITCase extends AbstractTestBase { .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")) .setTypeTimestampMicros( Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)) - .setTypeDecimalBytes( - ByteBuffer.wrap( - BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) - .setTypeDecimalFixed( - new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeTimestampNanos( + 
Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)) + .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2)) + .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2)) .build(); @Test @@ -168,15 +164,15 @@ void testAvroToRow() throws Exception { "+I[Charlie, null, blue, 1337, 1.337, null, false, [], [], null, RED, {}, null, null, " + "{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", \"state\": \"Berlin\", \"zip\": \"12049\"}, " + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, 12:12:12.345, 00:00:00.123456, 2014-03-01T12:12:12.321Z, " - + "1970-01-01T00:00:00.123456Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n" + + "1970-01-01T00:00:00.123456Z, 1970-01-01T00:00:00.123456789Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n" + "+I[Whatever, null, black, 42, 0.0, null, true, [hello], [true], null, GREEN, {}, " + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], null, null, " + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, 12:12:12, 00:00:00.123456, 2014-03-01T12:12:12.321Z, " - + "1970-01-01T00:00:00.123456Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n" + + "1970-01-01T00:00:00.123456Z, 1970-01-01T00:00:00.123456789Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n" + "+I[Terminator, null, yellow, 1, 0.0, null, false, [world], [false], null, GREEN, {}, " + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], null, null, " + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, 12:12:12, 00:00:00.123456, 2014-03-01T12:12:12.321Z, " - + "1970-01-01T00:00:00.123456Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]"; + + "1970-01-01T00:00:00.123456Z, 1970-01-01T00:00:00.123456789Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]"; TestBaseUtils.compareResultAsText(results, expected); } diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc index 0462ec12539e4..f1b3f02b0e172 100644 --- 
a/flink-formats/flink-avro/src/test/resources/avro/user.avsc +++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc @@ -44,6 +44,7 @@ {"name": "type_time_micros", "type": {"type": "long", "logicalType": "time-micros"}}, {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}}, {"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}}, + {"name": "type_timestamp_nanos", "type": {"type": "long", "logicalType": "timestamp-nanos"}}, {"name": "type_decimal_bytes", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}}, {"name": "type_decimal_fixed", "type": {"name": "Fixed2", "size": 2, "type": "fixed", "logicalType": "decimal", "precision": 4, "scale": 2}} ] @@ -122,8 +123,10 @@ "fields": [ {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}}, {"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}}, + {"name": "type_timestamp_nanos", "type": {"type": "long", "logicalType": "timestamp-nanos"}}, {"name": "type_local_timestamp_millis", "type": {"type": "long", "logicalType": "local-timestamp-millis"}}, - {"name": "type_local_timestamp_micros", "type": {"type": "long", "logicalType": "local-timestamp-micros"}} + {"name": "type_local_timestamp_micros", "type": {"type": "long", "logicalType": "local-timestamp-micros"}}, + {"name": "type_local_timestamp_nanos", "type": {"type": "long", "logicalType": "local-timestamp-nanos"}} ] } ] diff --git a/pom.xml b/pom.xml index 2bd47d01bdccb..9c665dec6c852 100644 --- a/pom.xml +++ b/pom.xml @@ -148,7 +148,7 @@ under the License. 5.4.0 - 1.11.4 + 1.12.1 1.9.14.jdk17-redhat-00001 2.20.1