Skip to content

Commit 61d1537

Browse files
authored
Remove unused utils, improve Iceberg and DeltaLake loaders (#3)
* loaders/utils: Remove (unused)
* delta_lake_loader: Simplify batch statistics, no compaction stats
* iceberg_loader: Add _table_cache - avoid fetching catalog for each batch
  - Also use _create_table_from_schema() to match base DataLoader class
* iceberg_loader: Simplify batch_metadata
1 parent 9949095 commit 61d1537

File tree

4 files changed

+60
-440
lines changed

4 files changed

+60
-440
lines changed

src/amp/loaders/implementations/deltalake_loader.py

Lines changed: 2 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -203,11 +203,7 @@ def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) ->
203203
self._refresh_table_reference()
204204

205205
# Post-write optimizations
206-
optimization_results = self._perform_post_write_optimizations(table.num_rows)
207-
208-
# Store optimization results in base class metadata
209-
if hasattr(self, '_last_batch_metadata'):
210-
self._last_batch_metadata = optimization_results
206+
_optimization_results = self._perform_post_write_optimizations()
211207

212208
return batch.num_rows
213209

@@ -283,7 +279,7 @@ def _refresh_table_reference(self) -> None:
283279
self.logger.error(f'Failed to refresh table reference: {e}')
284280
# Don't set _table_exists = False here as the table might still exist
285281

286-
def _perform_post_write_optimizations(self, rows_written: int) -> Dict[str, Any]:
282+
def _perform_post_write_optimizations(self) -> Dict[str, Any]:
287283
"""Perform post-write optimizations with robust API handling"""
288284
optimization_results = {}
289285

@@ -455,10 +451,6 @@ def _get_loader_batch_metadata(self, batch: pa.RecordBatch, duration: float, **k
455451
if self._table_exists and self._delta_table is not None:
456452
metadata['table_version'] = self._delta_table.version()
457453

458-
# Add optimization results if available
459-
if hasattr(self, '_last_batch_metadata'):
460-
metadata['optimization_results'] = self._last_batch_metadata
461-
462454
return metadata
463455

464456
def _get_loader_table_metadata(

src/amp/loaders/implementations/iceberg_loader.py

Lines changed: 58 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def __init__(self, config: Dict[str, Any]):
8888
self._current_table: Optional[IcebergTable] = None
8989
self._namespace_exists: bool = False
9090
self.enable_statistics: bool = config.get('enable_statistics', True)
91+
self._table_cache: Dict[str, IcebergTable] = {} # Cache tables by identifier
9192

9293
def _get_required_config_fields(self) -> list[str]:
9394
"""Return required configuration fields"""
@@ -119,6 +120,7 @@ def disconnect(self) -> None:
119120
if self._catalog:
120121
self._catalog = None
121122

123+
self._table_cache.clear() # Clear table cache on disconnect
122124
self._is_connected = False
123125
self.logger.info('Iceberg loader disconnected')
124126

@@ -130,9 +132,16 @@ def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) ->
130132
# Fix timestamps for Iceberg compatibility
131133
table = self._fix_timestamps(table)
132134

133-
# Get or create the Iceberg table
135+
# Get the Iceberg table (already created by _create_table_from_schema if needed)
134136
mode = kwargs.get('mode', LoadMode.APPEND)
135-
iceberg_table = self._get_or_create_table(table_name, table.schema)
137+
table_identifier = f'{self.config.namespace}.{table_name}'
138+
139+
# Use cached table if available
140+
if table_identifier in self._table_cache:
141+
iceberg_table = self._table_cache[table_identifier]
142+
else:
143+
iceberg_table = self._catalog.load_table(table_identifier)
144+
self._table_cache[table_identifier] = iceberg_table
136145

137146
# Validate schema compatibility (unless overwriting)
138147
if mode != LoadMode.OVERWRITE:
@@ -143,15 +152,28 @@ def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) ->
143152

144153
return rows_written
145154

146-
def _create_table_from_schema(self, schema: pa.Schema, table_name: str) -> None:
147-
"""Create table from Arrow schema"""
148-
# Iceberg handles table creation in _get_or_create_table
149-
self.logger.info(f"Iceberg will create table '{table_name}' on first write with appropriate schema")
150-
151155
def _clear_table(self, table_name: str) -> None:
152156
"""Clear table for overwrite mode"""
153157
# Iceberg handles overwrites internally
154-
self.logger.info(f"Iceberg will handle overwrite for table '{table_name}'")
158+
# Clear from cache to ensure fresh state after overwrite
159+
table_identifier = f'{self.config.namespace}.{table_name}'
160+
if table_identifier in self._table_cache:
161+
del self._table_cache[table_identifier]
162+
163+
def _fix_schema_timestamps(self, schema: pa.Schema) -> pa.Schema:
164+
"""Convert nanosecond timestamps to microseconds in schema for Iceberg compatibility"""
165+
# Check if conversion is needed
166+
if not any(pa.types.is_timestamp(f.type) and f.type.unit == 'ns' for f in schema):
167+
return schema
168+
169+
new_fields = []
170+
for field in schema:
171+
if pa.types.is_timestamp(field.type) and field.type.unit == 'ns':
172+
new_fields.append(pa.field(field.name, pa.timestamp('us', tz=field.type.tz)))
173+
else:
174+
new_fields.append(field)
175+
176+
return pa.schema(new_fields)
155177

156178
def _fix_timestamps(self, arrow_table: pa.Table) -> pa.Table:
157179
"""Convert nanosecond timestamps to microseconds for Iceberg compatibility"""
@@ -219,33 +241,36 @@ def _check_namespace_exists(self, namespace: str) -> None:
219241
except Exception as e:
220242
raise NoSuchNamespaceError(f"Failed to verify namespace '{namespace}': {str(e)}") from e
221243

222-
def _get_or_create_table(self, table_name: str, schema: pa.Schema) -> IcebergTable:
223-
"""Get existing table or create new one"""
224-
table_identifier = f'{self.config.namespace}.{table_name}'
244+
def _create_table_from_schema(self, schema: pa.Schema, table_name: str) -> None:
245+
"""Create table if it doesn't exist - called once by base class before first batch"""
246+
if not self.config.create_table:
247+
# If create_table is False, just verify table exists
248+
table_identifier = f'{self.config.namespace}.{table_name}'
249+
try:
250+
table = self._catalog.load_table(table_identifier)
251+
# Cache the existing table
252+
self._table_cache[table_identifier] = table
253+
self.logger.debug(f'Table already exists: {table_identifier}')
254+
except (NoSuchTableError, NoSuchIcebergTableError) as e:
255+
raise NoSuchTableError(f"Table '{table_identifier}' not found and create_table=False") from e
256+
return
225257

226-
try:
227-
table = self._catalog.load_table(table_identifier)
228-
self.logger.debug(f'Loaded existing table: {table_identifier}')
229-
return table
258+
table_identifier = f'{self.config.namespace}.{table_name}'
230259

231-
except (NoSuchTableError, NoSuchIcebergTableError) as e:
232-
if not self.config.create_table:
233-
raise NoSuchTableError(f"Table '{table_identifier}' not found and create_table=False") from e
260+
# Fix timestamps in schema before creating table
261+
fixed_schema = self._fix_schema_timestamps(schema)
234262

235-
try:
236-
# Use partition_spec if provided
237-
if self.config.partition_spec:
238-
table = self._catalog.create_table(
239-
identifier=table_identifier, schema=schema, partition_spec=self.config.partition_spec
240-
)
241-
else:
242-
# Create table without partitioning
243-
table = self._catalog.create_table(identifier=table_identifier, schema=schema)
244-
self.logger.info(f'Created new table: {table_identifier}')
245-
return table
263+
# Use create_table_if_not_exists for simpler logic
264+
if self.config.partition_spec:
265+
table = self._catalog.create_table_if_not_exists(
266+
identifier=table_identifier, schema=fixed_schema, partition_spec=self.config.partition_spec
267+
)
268+
else:
269+
table = self._catalog.create_table_if_not_exists(identifier=table_identifier, schema=fixed_schema)
246270

247-
except Exception as e:
248-
raise RuntimeError(f"Failed to create table '{table_identifier}': {str(e)}") from e
271+
# Cache the newly created/loaded table
272+
self._table_cache[table_identifier] = table
273+
self.logger.info(f"Table '{table_identifier}' ready (created if needed)")
249274

250275
def _validate_schema_compatibility(self, iceberg_table: IcebergTable, arrow_schema: pa.Schema) -> None:
251276
"""Validate that Arrow schema is compatible with Iceberg table schema and perform schema evolution if enabled"""
@@ -383,45 +408,13 @@ def _perform_load_operation(self, iceberg_table: IcebergTable, arrow_table: pa.T
383408

384409
def _get_loader_batch_metadata(self, batch: pa.RecordBatch, duration: float, **kwargs) -> Dict[str, Any]:
385410
"""Get Iceberg-specific metadata for batch operation"""
386-
metadata = {'namespace': self.config.namespace}
387-
388-
# Add partition columns if available
389-
table_name = kwargs.get('table_name')
390-
if table_name and self._table_exists(table_name):
391-
try:
392-
table_info = self.get_table_info(table_name)
393-
metadata['partition_columns'] = table_info.get('partition_columns', [])
394-
except Exception:
395-
metadata['partition_columns'] = []
396-
else:
397-
# For new tables, get partition fields from partition_spec if available
398-
metadata['partition_columns'] = []
399-
400-
return metadata
411+
return {'namespace': self.config.namespace}
401412

402413
def _get_loader_table_metadata(
403414
self, table: pa.Table, duration: float, batch_count: int, **kwargs
404415
) -> Dict[str, Any]:
405416
"""Get Iceberg-specific metadata for table operation"""
406-
metadata = {'namespace': self.config.namespace}
407-
408-
# Add partition columns if available
409-
table_name = kwargs.get('table_name')
410-
if table_name and self._table_exists(table_name):
411-
try:
412-
table_info = self.get_table_info(table_name)
413-
metadata['partition_columns'] = table_info.get('partition_columns', [])
414-
except Exception:
415-
metadata['partition_columns'] = []
416-
else:
417-
# For new tables, get partition fields from partition_spec if available
418-
metadata['partition_columns'] = []
419-
if self.config.partition_spec and hasattr(self.config.partition_spec, 'fields'):
420-
# partition_spec.fields contains partition field definitions
421-
# We'll extract them during table creation
422-
metadata['partition_columns'] = [] # Will be populated after table creation
423-
424-
return metadata
417+
return {'namespace': self.config.namespace}
425418

426419
def _table_exists(self, table_name: str) -> bool:
427420
"""Check if a table exists"""

0 commit comments

Comments (0)