Skip to content
2 changes: 1 addition & 1 deletion go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ type MigrationContext struct {
AbortError error
abortMutex *sync.Mutex

Metrics *metrics.Client
Metrics metrics.Emitter

OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
Expand Down
12 changes: 12 additions & 0 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/metrics"
"github.com/github/gh-ost/go/sql"

"context"
Expand Down Expand Up @@ -1044,22 +1045,27 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool
return hasFurtherRange, err
}

queryStartTime := time.Now()
rows, err := apl.db.Query(query, explodedArgs...)
if err != nil {
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err)
return hasFurtherRange, err
}
defer rows.Close()

iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err)
return hasFurtherRange, err
}
hasFurtherRange = true
}
if err = rows.Err(); err != nil {
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err)
return hasFurtherRange, err
}
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), nil)
if hasFurtherRange {
apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, nil
Expand Down Expand Up @@ -1107,7 +1113,9 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
if _, err := tx.Exec(sessionQuery); err != nil {
return nil, err
}
queryStartTime := time.Now()
result, err := tx.Exec(query, explodedArgs...)
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "chunk_copy", time.Since(queryStartTime), err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1820,13 +1828,16 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e
// in the batch. SHOW WARNINGS only shows warnings from the last statement in a
// multi-statement query, so we interleave SHOW WARNINGS after each DML statement.
if apl.migrationContext.PanicOnWarnings {
queryStartTime := time.Now()
totalDelta, err = apl.executeBatchWithWarningChecking(ctx, tx, buildResults)
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), err)
if err != nil {
return rollback(err)
}
} else {
// Fast path: batch together DML queries into multi-statements to minimize network trips.
// We use the raw driver connection to access the rows affected for each statement.
queryStartTime := time.Now()
execErr := conn.Raw(func(driverConn any) error {
ex := driverConn.(driver.ExecerContext)
nvc := driverConn.(driver.NamedValueChecker)
Expand Down Expand Up @@ -1860,6 +1871,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e
return nil
})

metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), execErr)
if execErr != nil {
return rollback(execErr)
}
Expand Down
4 changes: 4 additions & 0 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/metrics"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"

Expand Down Expand Up @@ -677,13 +678,16 @@ func (isp *Inspector) CountTableRows(ctx context.Context) error {

query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName))
var rowsEstimate int64
queryStartTime := time.Now()
if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil {
metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
isp.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err())
return mysql.Kill(isp.db, connectionID)
}
return err
}
metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), nil)

// row count query finished. nil out the cancel func, so the main migration thread
// doesn't bother calling it after row copy is done.
Expand Down
Loading
Loading