diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index f9a780b42007..c1754c9978da 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -155,6 +155,9 @@ static void insertNumber(IColumn & column, WhichDataType type, T value) case TypeIndex::DateTime64: assert_cast &>(column).insertValue(static_cast(value)); break; + case TypeIndex::Time64: + assert_cast &>(column).insertValue(static_cast(value)); + break; case TypeIndex::IPv4: assert_cast(column).insertValue(IPv4(static_cast(value))); break; @@ -303,6 +306,8 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro return createDecimalDeserializeFn(root_node, target_type, false); if (target.isDateTime64()) return createDecimalDeserializeFn(root_node, target_type, false); + if (target.isTime64()) + return createDecimalDeserializeFn(root_node, target_type, false); break; case avro::AVRO_INT: if (target_type->isValueRepresentedByNumber()) @@ -1282,8 +1287,11 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node) { case avro::Type::AVRO_INT: { - if (node->logicalType().type() == avro::LogicalType::DATE) + auto logical_type = node->logicalType(); + if (logical_type.type() == avro::LogicalType::DATE) return {std::make_shared()}; + if (logical_type.type() == avro::LogicalType::TIME_MILLIS) + return {std::make_shared(3)}; return {std::make_shared()}; } @@ -1294,6 +1302,10 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node) return {std::make_shared(3)}; if (logical_type.type() == avro::LogicalType::TIMESTAMP_MICROS) return {std::make_shared(6)}; + if (logical_type.type() == avro::LogicalType::TIME_MILLIS) + return {std::make_shared(3)}; + if (logical_type.type() == avro::LogicalType::TIME_MICROS) + return {std::make_shared(6)}; return std::make_shared(); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 01ad28b2e593..c09bbec5b920 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -117,6 +117,8 @@ bool canDumpIcebergStats(const Field & field, DataTypePtr type) case TypeIndex::Date32: case TypeIndex::Int64: case TypeIndex::DateTime64: + case TypeIndex::Time: + case TypeIndex::Time64: case TypeIndex::String: return true; default: @@ -143,7 +145,9 @@ std::vector dumpFieldToBytes(const Field & field, DataTypePtr type) case TypeIndex::Date32: return dumpValue(field.safeGet()); case TypeIndex::Int64: + case TypeIndex::Time: return dumpValue(field.safeGet()); + case TypeIndex::Time64: case TypeIndex::DateTime64: return dumpValue(field.safeGet().getValue().value); case TypeIndex::String: diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp index 56a2cd976f6b..457baaece6cf 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp @@ -243,7 +243,7 @@ DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name, Cont if (type_name == f_date) return std::make_shared(); if (type_name == f_time) - return std::make_shared(); + return std::make_shared(6); if (type_name == f_timestamp) return std::make_shared(6); if (type_name == f_timestamptz) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index f4540f68bb8e..5bbe88148b5a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -302,6 +302,7 @@ std::pair getIcebergType(DataTypePtr type, Int32 & ite case TypeIndex::DateTime64: return {"timestamp", true}; case TypeIndex::Time: + case TypeIndex::Time64: return {"time", true}; case TypeIndex::String: return {"string", true}; diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 7ca0725a1cc7..28773c9585d6 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -982,9 +982,7 @@ def test_partitioning_by_time(started_cluster, storage_type): create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) - # Fix test when https://github.com/Altinity/ClickHouse/issues/15355 is resolved - # Must be 43200 - assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "43200000000\ttest\n" + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "12:00:00.000000\ttest\n" @pytest.mark.parametrize("storage_type", ["s3"]) @@ -1012,6 +1010,12 @@ def test_partitioning_by_string(started_cluster, storage_type): field_type=StringType(), required=False, ), + NestedField( + field_id=3, + name="time_value", + field_type=TimeType(), + required=False, + ), ) partition_spec = PartitionSpec( @@ -1021,10 +1025,10 @@ def test_partitioning_by_string(started_cluster, storage_type): ) table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec) - data = [{"key": "a:b,c[d=e/f%g?h", "value": "test"}] + data = [{"key": "a:b,c[d=e/f%g?h", "value": "test", "time_value": dtime(12,0,0)}] df = pa.Table.from_pylist(data) table.append(df) create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) - assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "a:b,c[d=e/f%g?h\ttest\n" + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "a:b,c[d=e/f%g?h\ttest\t12:00:00.000000\n"