[SPARK-56175][SQL] FileTable implements SupportsPartitionManagement and V2 catalog table loading #55034
Draft

LuciferYang wants to merge 2 commits into apache:master
…Frame API writes and delete FallBackFileSourceV2

Key changes:
- `FileWrite`: added `partitionSchema`, `customPartitionLocations`, `dynamicPartitionOverwrite`, `isTruncate`; path creation and truncate logic; dynamic partition overwrite via `FileCommitProtocol`
- `FileTable`: `createFileWriteBuilder` with `SupportsDynamicOverwrite` and `SupportsTruncate`; capabilities now include `TRUNCATE` and `OVERWRITE_DYNAMIC`; `fileIndex` skips file existence checks when `userSpecifiedSchema` is provided (write path)
- All file format writes (Parquet, ORC, CSV, JSON, Text, Avro) use `createFileWriteBuilder` with partition/truncate/overwrite support
- `DataFrameWriter.lookupV2Provider`: enabled `FileDataSourceV2` for non-partitioned Append and Overwrite via `df.write.save(path)`
- `DataFrameWriter.insertInto`: V1 fallback for file sources (TODO: SPARK-56175)
- `DataFrameWriter.saveAsTable`: V1 fallback for file sources (TODO: SPARK-56230, needs `StagingTableCatalog`)
- `DataSourceV2Utils.getTableProvider`: V1 fallback for file sources (TODO: SPARK-56175)
- Removed the `FallBackFileSourceV2` rule
- `V2SessionCatalog.createTable`: V1 `FileFormat` data type validation
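The dynamic-partition-overwrite wiring mentioned in the first bullet could look roughly like the sketch below. This is not the PR's exact code; the `path` parameter and the helper name are assumptions for illustration.

```scala
// Rough sketch, not the PR's actual code: instantiating a commit protocol
// with dynamic partition overwrite enabled for a file write.
import java.util.UUID
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.SparkSession

def newCommitter(spark: SparkSession, path: String): FileCommitProtocol = {
  FileCommitProtocol.instantiate(
    className = spark.sessionState.conf.fileCommitProtocolClass,
    jobId = UUID.randomUUID().toString,
    outputPath = path,
    // With dynamic overwrite, only partitions that receive new data are
    // replaced; other partitions under `path` are left untouched.
    dynamicPartitionOverwrite = true)
}
```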
…catalog table loading, and gate removal

Key changes:
- `FileTable` extends `SupportsPartitionManagement` with `createPartition`, `dropPartition`, `listPartitionIdentifiers`, `partitionSchema`
- Partition operations sync to the catalog metastore (best-effort)
- `V2SessionCatalog.loadTable` returns `FileTable` instead of `V1Table`, sets `catalogTable` and `useCatalogFileIndex` on `FileTable`
- `V2SessionCatalog.getDataSourceOptions` includes `storage.properties` for proper option propagation (header, ORC bloom filter, etc.)
- `V2SessionCatalog.createTable` validates data types via `FileTable`
- `FileTable.columns()` restores NOT NULL constraints from `catalogTable`
- `FileTable.partitioning()` falls back to `userSpecifiedPartitioning` or catalog partition columns
- `FileTable.fileIndex` uses `CatalogFileIndex` when the catalog has registered partitions (custom partition locations)
- `FileTable.schema` checks column name duplication for non-catalog tables only
- `DataSourceV2Utils.getTableProvider`: removed the `FileDataSourceV2` gate
- `DataFrameWriter.insertInto`: enabled V2 for file sources
- `DataFrameWriter.saveAsTable`: V1 fallback (TODO: SPARK-56230)
- `ResolveSessionCatalog`: V1 fallback for FileTable-backed commands (`AnalyzeTable`, `AnalyzeColumn`, `TruncateTable`, `TruncatePartition`, `ShowPartitions`, `RecoverPartitions`, `AddPartitions`, `RenamePartitions`, `DropPartitions`, `SetTableLocation`, CREATE TABLE validation, REPLACE TABLE blocking)
- `FindDataSourceTable`: streaming V1 fallback for `FileTable` (TODO: SPARK-56233)
- `DataSource.planForWritingFileFormat`: graceful V2 handling
This PR takes #54998 as the baseline. It is the second step for SPARK-56170. Commit f853912 contains the actual changes for SPARK-56175. SPARK-56170 seems to involve a considerable number of tasks, and I'm not sure whether all of them can be completed before the 4.2 release.
### What changes were proposed in this pull request?
This PR is part of SPARK-56170 (Remove file source V2 gate and unify V1/V2 file source paths). It makes three major changes:
#### 1. FileTable implements SupportsPartitionManagement
`FileTable` now extends `SupportsPartitionManagement` with filesystem-based partition operations:

- `createPartition`: creates the partition directory and syncs it to the catalog metastore
- `dropPartition`: deletes the partition directory and syncs the deletion to the catalog metastore
- `listPartitionIdentifiers`: discovers partitions from the filesystem directory structure
- `partitionSchema`: returns the partition schema from `fileIndex`, `userSpecifiedPartitioning`, or `catalogTable`
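As a rough illustration of the operations above, a filesystem-backed implementation of the `SupportsPartitionManagement` interface might look like the sketch below. This is not the PR's actual code: `rootPath` and `hadoopConf` are assumed members of the concrete table, metastore sync and the remaining interface methods are elided.

```scala
// Hedged sketch of filesystem-based partition operations.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.types.StructType

trait FilesystemPartitionOps extends SupportsPartitionManagement {
  def rootPath: Path            // assumed: table root location
  def hadoopConf: Configuration // assumed: Hadoop configuration
  override def partitionSchema(): StructType

  // Hive-style layout: <root>/col1=v1/col2=v2
  private def partitionPath(ident: InternalRow): Path =
    partitionSchema().fields.zipWithIndex.foldLeft(rootPath) {
      case (dir, (field, i)) =>
        new Path(dir, s"${field.name}=${ident.get(i, field.dataType)}")
    }

  override def createPartition(
      ident: InternalRow,
      properties: java.util.Map[String, String]): Unit = {
    val dir = partitionPath(ident)
    dir.getFileSystem(hadoopConf).mkdirs(dir)
    // ...then best-effort sync to the catalog metastore
  }

  override def dropPartition(ident: InternalRow): Boolean = {
    val dir = partitionPath(ident)
    val deleted = dir.getFileSystem(hadoopConf).delete(dir, true)
    // ...then best-effort sync to the catalog metastore
    deleted
  }
}
```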
#### 2. V2SessionCatalog.loadTable returns FileTable

`V2SessionCatalog.loadTable` now returns the V2 `FileTable` (e.g., `ParquetTable`, `OrcTable`, `CSVTable`) instead of `V1Table` for file-based catalog tables. This enables V2 capabilities for catalog tables:

- `catalogTable` and `useCatalogFileIndex` are set on `FileTable` for catalog metadata access
- `getDataSourceOptions` includes `storage.properties` for proper option propagation (CSV header, ORC bloom filter columns, etc.)
- `FileTable.columns()` restores NOT NULL constraints from catalog table metadata
- `FileTable.partitioning()` falls back to catalog partition columns when `fileIndex` has no partition info
- `FileTable.fileIndex` uses `CatalogFileIndex` when the catalog has registered partitions with custom locations
- `FileTable.schema` checks column name duplication for non-catalog tables only (catalog tables are handled by the analyzer)
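Schematically, the new `loadTable` flow could be pictured as below. This is a simplified sketch, not the PR's code: `lookupFileDataSourceV2` is a hypothetical helper, and the mutable members `catalogTable` / `useCatalogFileIndex` are taken from the PR description with assumed signatures.

```scala
// Schematic only: route file-source catalog tables through the V2
// provider instead of wrapping them in V1Table.
override def loadTable(ident: Identifier): Table = {
  val v1Metadata = sessionCatalog.getTableMetadata(ident.asTableIdentifier)
  lookupFileDataSourceV2(v1Metadata) match { // hypothetical helper
    case Some(provider) =>
      // getDataSourceOptions now folds in storage.properties, so per-table
      // options (CSV header, ORC bloom filter columns, ...) propagate.
      val options = getDataSourceOptions(v1Metadata)
      val fileTable = provider.getTable(options) // ParquetTable / OrcTable / CSVTable
      fileTable.catalogTable = Some(v1Metadata)  // catalog metadata access
      fileTable.useCatalogFileIndex = true       // CatalogFileIndex when partitions registered
      fileTable
    case None =>
      V1Table(v1Metadata)                        // non-file sources unchanged
  }
}
```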
#### 3. Gate removal and V1 fallbacks

- Removed the `FileDataSourceV2` gate that prevented V2 provider resolution for file sources
- `DataFrameWriter.saveAsTable` keeps its V1 fallback for file sources, since it needs `ReplaceTableAsSelect`, which requires a `StagingTableCatalog` (TODO: SPARK-56230)
- Since `FileTable` doesn't match the V1 extractors (`ResolvedV1TableIdentifier`, etc.), we intercept these commands and delegate to V1 using `catalogTable` metadata:
  - `AnalyzeTable`, `AnalyzeColumn`
  - `TruncateTable`, `TruncatePartition`
  - `ShowPartitions`
  - `RecoverPartitions`
  - `AddPartitions`, `RenamePartitions`, `DropPartitions`
  - `SetTableLocation`
  - `CREATE TABLE` data type validation and `REPLACE TABLE` blocking for file sources
- Streaming reads of a catalog `FileTable` fall back to V1 `StreamingRelation` in `FindDataSourceTable`, since `FileTable` lacks the `MICRO_BATCH_READ`/`CONTINUOUS_READ` capabilities (TODO: SPARK-56233)
- Changed `.collect{}.head` to `.collectFirst` + `.flatMap` to gracefully handle V2 tables (which resolve to `DataSourceV2Relation` instead of `LogicalRelation`)
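The `.collect{}.head` → `.collectFirst` + `.flatMap` change in the last bullet can be illustrated with a self-contained toy plan ADT; the class names mirror Spark's, but this is not Spark code.

```scala
// Toy illustration of why collect{...}.head fails on V2 plans while
// collectFirst + flatMap degrades gracefully.
sealed trait Plan
case class LogicalRelation(format: String) extends Plan       // V1 leaf
case class DataSourceV2Relation(format: String) extends Plan  // V2 leaf

val v2Plan: Seq[Plan] = Seq(DataSourceV2Relation("parquet"))

// Before: no LogicalRelation in a V2 plan, so .head throws
// NoSuchElementException.
// val rel = v2Plan.collect { case r: LogicalRelation => r }.head

// After: collectFirst returns an Option, so a V2 plan simply yields None
// and downstream logic can flatMap over it.
val rel: Option[LogicalRelation] =
  v2Plan.collectFirst { case r: LogicalRelation => r }
val format: Option[String] = rel.flatMap(r => Option(r.format))

assert(rel.isEmpty && format.isEmpty)
```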
#### Helper: partSpecToMap

Added a `partSpecToMap` helper in `ResolveSessionCatalog` that converts a `PartitionSpec` (either `ResolvedPartitionSpec` or `UnresolvedPartitionSpec`) to V1's `Map[String, String]` format. This is needed because `ResolvePartitionSpec` may resolve partition specs before `ResolveSessionCatalog` runs (both are in the Resolution batch), and calling `asUnresolvedPartitionSpecs` on already-resolved specs would throw a `ClassCastException`.
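A hypothetical sketch of such a helper is below; the real one in `ResolveSessionCatalog` may differ, and the explicit partition-schema parameter is an assumption needed here to render `InternalRow` values as strings.

```scala
// Hedged sketch: convert either form of PartitionSpec to V1's
// Map[String, String] without casting (which would throw
// ClassCastException on already-resolved specs).
import org.apache.spark.sql.catalyst.analysis.{
  PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
import org.apache.spark.sql.types.StructType

def partSpecToMap(
    spec: PartitionSpec,
    partSchema: StructType): Map[String, String] = spec match {
  // Not yet resolved: already carries V1's Map[String, String] shape.
  case UnresolvedPartitionSpec(raw, _) => raw
  // Already resolved by ResolvePartitionSpec: render each InternalRow
  // value back to its string form using the partition column's data type.
  case ResolvedPartitionSpec(names, ident, _) =>
    names.zipWithIndex.map { case (name, i) =>
      name -> String.valueOf(ident.get(i, partSchema(name).dataType))
    }.toMap
}
```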
### Why are the changes needed?

This is a key step toward unifying the V1/V2 file source paths (SPARK-56170). By having `V2SessionCatalog` return native V2 `FileTable` instances for file-based catalog tables, we enable the V2 capabilities listed above for catalog tables. The V1 fallbacks in `ResolveSessionCatalog` ensure backward compatibility for commands that don't yet have V2-native implementations.
### Does this PR introduce any user-facing change?

No. The behavior is functionally equivalent. Internally, catalog file tables are now loaded as V2 `FileTable` instances, but all user-visible operations produce the same results, through V1 command fallbacks where needed.
### How was this patch tested?

Existing tests were updated to handle both V1 and V2 plan structures:

- `DataStreamTableAPISuite`: streaming read with file source tables
- `InMemoryColumnarQuerySuite`: catalog stats after `ANALYZE TABLE` (TODO: SPARK-56232 for V2 stats propagation)
- `CSVv2Suite`: CSV parsing with char/varchar type columns
- `JsonV2Suite`: case sensitivity of filter references
- `OrcSourceV2Suite`: bloom filter creation and selective dictionary encoding
- `OrcV2QuerySuite`: ORC file format detection in query plans
- `ParquetV2QuerySuite`: INT96 to TIMESTAMP_MICROS migration
- `FileSourceSQLInsertTestSuite`: partition spec handling with `keepPartitionSpecAsStringLiteral`

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code 4.6