8 changes: 8 additions & 0 deletions go/base/context.go
@@ -252,6 +252,14 @@ type MigrationContext struct {
TriggerSuffix string
Triggers []mysql.Trigger

// RowFilterWhereClause is an optional WHERE clause to filter rows during migration.
// Only rows matching this condition will be copied to the ghost table.
// This enables using gh-ost for data deletion/purging.
// Example: "created_at >= '2024-01-01'" keeps only rows from 2024 onwards
RowFilterWhereClause string
// RowFilter is the parsed filter for evaluating binlog events
RowFilter *sql.RowFilter

recentBinlogCoordinates mysql.BinlogCoordinates

BinlogSyncerMaxReconnectAttempts int
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
@@ -69,6 +69,7 @@ func main() {
flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)")
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
flag.StringVar(&migrationContext.RowFilterWhereClause, "where", "", "WHERE clause to filter rows during copy. Only rows matching this condition are kept in the ghost table. Useful for data purging. Example: --where=\"created_at >= '2024-01-01'\" to keep only recent data")
flag.BoolVar(&migrationContext.AttemptInstantDDL, "attempt-instant-ddl", false, "Attempt to use instant DDL for this migration first")
storageEngine := flag.String("storage-engine", "innodb", "Specify table storage engine (default: 'innodb'). When 'rocksdb': the session transaction isolation level is changed from REPEATABLE_READ to READ_COMMITTED.")

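As a usage illustration (the database and table names here are hypothetical, and the standard --execute flag is assumed to actually apply the migration), the new flag composes with the mandatory flags defined above like so:

gh-ost \
  --database="mydb" \
  --table="events" \
  --alter="ENGINE=InnoDB" \
  --where="created_at >= '2024-01-01'" \
  --execute

Rows of mydb.events that do not satisfy the condition are simply never copied to the ghost table, so the cut-over leaves only the filtered data in place.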
56 changes: 55 additions & 1 deletion go/logic/applier.go
@@ -880,7 +880,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
startTime := time.Now()
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)

query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
query, explodedArgs, err := sql.BuildRangeInsertPreparedQueryWithFilter(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
this.migrationContext.GetGhostTableName(),
@@ -894,6 +894,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.IsTransactionalTable(),
// TODO: Don't hardcode this
strings.HasPrefix(this.migrationContext.ApplierMySQLVersion, "8."),
this.migrationContext.RowFilterWhereClause,
)
if err != nil {
return chunkSize, rowsAffected, duration, err
@@ -1449,19 +1450,64 @@ func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEv
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBuildResult {
// Check if we have a row filter for data purging
rowFilter := this.migrationContext.RowFilter
hasFilter := rowFilter != nil && !rowFilter.IsEmpty()

switch dmlEvent.DML {
case binlog.DeleteDML:
{
// For DELETE: if a filter is set and the old row does not match it, the row was never
// copied to the ghost table, so the DELETE can be skipped to avoid unnecessary work.
// If the row does match, issue the DELETE as usual; it is idempotent, so it is safe
// even if the row happens to be absent from the ghost table.
if hasFilter {
oldMatches := rowFilter.Matches(dmlEvent.WhereColumnValues.AbstractValues())
if !oldMatches {
// Row was never copied to ghost table, skip DELETE
return []*dmlBuildResult{}
}
}
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
}
case binlog.InsertDML:
{
// For INSERT: Only insert if the row matches the filter (or no filter)
if hasFilter {
newMatches := rowFilter.Matches(dmlEvent.NewColumnValues.AbstractValues())
if !newMatches {
// Row doesn't match filter - don't insert (effectively deleted)
return []*dmlBuildResult{}
}
}
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
}
case binlog.UpdateDML:
{
// For UPDATE with filter, check if the row's filter status changed
if hasFilter {
oldMatches := rowFilter.Matches(dmlEvent.WhereColumnValues.AbstractValues())
newMatches := rowFilter.Matches(dmlEvent.NewColumnValues.AbstractValues())

if !oldMatches && !newMatches {
// Row never matched filter - no-op
return []*dmlBuildResult{}
}
if oldMatches && !newMatches {
// Row used to match but no longer does - treat as DELETE
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
}
if !oldMatches && newMatches {
// Row now matches but didn't before - treat as INSERT
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
}
// Both old and new match - proceed with normal UPDATE below
}

if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
results := make([]*dmlBuildResult, 0, 2)
dmlEvent.DML = binlog.DeleteDML
@@ -1519,6 +1565,14 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
}
}

// If all events were filtered out (e.g., by --where filter), nothing to execute
if len(buildResults) == 0 {
if err := tx.Commit(); err != nil {
return err
}
return nil
}

// We batch together the DML queries into multi-statements to minimize network trips.
// We have to use the raw driver connection to access the rows affected
// for each statement in the multi-statement.
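To make the event-remapping rule above easier to scan, here is a minimal, standalone sketch (not part of this diff) of the same decision logic; oldMatches and newMatches stand for RowFilter.Matches evaluated on the row image before and after the event:

package main

import "fmt"

// dmlActionUnderFilter mirrors the switch in buildDMLEventQuery when a --where
// filter is active, returning the action applied to the ghost table.
func dmlActionUnderFilter(dml string, oldMatches, newMatches bool) string {
	switch dml {
	case "delete":
		if !oldMatches {
			return "noop" // row was never copied to the ghost table
		}
		return "delete"
	case "insert":
		if !newMatches {
			return "noop" // row is outside the filter and never materializes
		}
		return "insert"
	case "update":
		switch {
		case !oldMatches && !newMatches:
			return "noop" // row never matched the filter
		case oldMatches && !newMatches:
			return "delete" // row falls out of the filter
		case !oldMatches && newMatches:
			return "insert" // row enters the filter
		default:
			return "update" // both images match; normal UPDATE path
		}
	}
	return "unknown"
}

func main() {
	fmt.Println(dmlActionUnderFilter("update", true, false)) // prints: delete
}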
10 changes: 10 additions & 0 deletions go/logic/migrator.go
@@ -414,6 +414,16 @@ func (this *Migrator) Migrate() (err error) {
return err
}

// Initialize row filter for data purging if a WHERE clause was provided
if this.migrationContext.RowFilterWhereClause != "" {
rowFilter, err := sql.NewRowFilter(this.migrationContext.RowFilterWhereClause, this.migrationContext.OriginalTableColumns)
if err != nil {
return this.migrationContext.Log.Errorf("Failed to parse --where clause: %+v", err)
}
this.migrationContext.RowFilter = rowFilter
this.migrationContext.Log.Infof("Row filter enabled: only rows matching '%s' will be migrated", this.migrationContext.RowFilterWhereClause)
}

// We can prepare some of the queries on the applier
if err := this.applier.prepareQueries(); err != nil {
return err
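Note that sql.NewRowFilter and the sql.RowFilter type used here are not shown in this diff. As a rough sketch of the surface the callers above rely on (the package name, field names, and internals below are illustrative assumptions, not the actual implementation):

// Package sqlfilter is an illustrative stand-in for the RowFilter declared in
// go/sql; only the surface used by migrator.go and applier.go is sketched.
package sqlfilter

type RowFilter struct {
	whereClause string
	columnNames []string // the real constructor receives a *sql.ColumnList
}

// NewRowFilter would parse the WHERE clause against the table's columns and
// return an evaluator, or an error if the clause cannot be parsed.
func NewRowFilter(whereClause string, columnNames []string) (*RowFilter, error) {
	// ... a real implementation parses whereClause into an expression tree ...
	return &RowFilter{whereClause: whereClause, columnNames: columnNames}, nil
}

// IsEmpty reports whether no filter expression is present.
func (f *RowFilter) IsEmpty() bool { return f == nil || f.whereClause == "" }

// Matches would evaluate the parsed expression against a row's values, given in
// table column order. This stub accepts every row.
func (f *RowFilter) Matches(values []interface{}) bool {
	return true
}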
24 changes: 21 additions & 3 deletions go/sql/builder.go
@@ -261,6 +261,12 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa
}

func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool) (result string, explodedArgs []interface{}, err error) {
return BuildRangeInsertQueryWithFilter(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait, "")
}

// BuildRangeInsertQueryWithFilter builds an INSERT...SELECT query with an optional row filter WHERE clause.
// The rowFilterWhereClause parameter allows filtering rows during copy (for data purging).
func BuildRangeInsertQueryWithFilter(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool, rowFilterWhereClause string) (result string, explodedArgs []interface{}, err error) {
if len(sharedColumns) == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery")
}
@@ -303,6 +309,13 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)

// Build optional row filter clause for data purging
rowFilterClause := ""
if rowFilterWhereClause != "" {
rowFilterClause = fmt.Sprintf("and (%s)", rowFilterWhereClause)
}

result = fmt.Sprintf(`
insert /* gh-ost %s.%s */ ignore
into
@@ -314,19 +327,24 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
%s.%s
force index (%s)
where
(%s and %s)
(%s and %s) %s
%s
)`,
databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing,
sharedColumnsListing, databaseName, originalTableName, uniqueKey,
rangeStartComparison, rangeEndComparison, transactionalClause)
rangeStartComparison, rangeEndComparison, rowFilterClause, transactionalClause)
return result, explodedArgs, nil
}

func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool) (result string, explodedArgs []interface{}, err error) {
return BuildRangeInsertPreparedQueryWithFilter(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait, "")
}

// BuildRangeInsertPreparedQueryWithFilter builds a prepared INSERT...SELECT query with an optional row filter.
func BuildRangeInsertPreparedQueryWithFilter(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool, rowFilterWhereClause string) (result string, explodedArgs []interface{}, err error) {
rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns)
rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns)
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait)
return BuildRangeInsertQueryWithFilter(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait, rowFilterWhereClause)
}

func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
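For a concrete sense of what the builder now emits, and assuming the example filter from the flag help plus hypothetical table and column names, the generated range-copy statement takes roughly this shape (exact parenthesization and lock clause depend on the unique key and storage engine):

insert /* gh-ost mydb.events */ ignore
  into mydb._events_gho (id, created_at, payload)
  (select id, created_at, payload
    from mydb.events force index (PRIMARY)
    where ((id > ?) and (id <= ?)) and (created_at >= '2024-01-01')
    lock in share mode
  )

The appended "and (created_at >= '2024-01-01')" term is the only difference from the unfiltered query: rows outside the filter are never selected into the ghost table, which is what makes the eventual cut-over behave as a purge.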