diff --git a/go/base/context.go b/go/base/context.go index 26d13fe07..4d0c3b09d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -238,7 +238,7 @@ type MigrationContext struct { AbortError error abortMutex *sync.Mutex - Metrics *metrics.Client + Metrics metrics.Emitter OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumns *sql.ColumnList diff --git a/go/logic/applier.go b/go/logic/applier.go index f3474b3ef..29269ed11 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -18,6 +18,7 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/sql" "context" @@ -1044,8 +1045,10 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool return hasFurtherRange, err } + queryStartTime := time.Now() rows, err := apl.db.Query(query, explodedArgs...) if err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } defer rows.Close() @@ -1053,13 +1056,16 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len()) for rows.Next() { if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } hasFurtherRange = true } if err = rows.Err(); err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), nil) if hasFurtherRange { apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues return hasFurtherRange, nil @@ -1107,7 +1113,9 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i if _, err := tx.Exec(sessionQuery); err != nil { return nil, err } + queryStartTime := time.Now() result, err := tx.Exec(query, explodedArgs...) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "chunk_copy", time.Since(queryStartTime), err) if err != nil { return nil, err } @@ -1820,13 +1828,16 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e // in the batch. SHOW WARNINGS only shows warnings from the last statement in a // multi-statement query, so we interleave SHOW WARNINGS after each DML statement. if apl.migrationContext.PanicOnWarnings { + queryStartTime := time.Now() totalDelta, err = apl.executeBatchWithWarningChecking(ctx, tx, buildResults) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), err) if err != nil { return rollback(err) } } else { // Fast path: batch together DML queries into multi-statements to minimize network trips. // We use the raw driver connection to access the rows affected for each statement. + queryStartTime := time.Now() execErr := conn.Raw(func(driverConn any) error { ex := driverConn.(driver.ExecerContext) nvc := driverConn.(driver.NamedValueChecker) @@ -1860,6 +1871,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e return nil }) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), execErr) if execErr != nil { return rollback(execErr) } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 96aadd672..982be1d98 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -16,6 +16,7 @@ import ( "time" "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" @@ -677,13 +678,16 @@ func (isp *Inspector) CountTableRows(ctx context.Context) error { query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) var rowsEstimate int64 + queryStartTime := time.Now() if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { + metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { isp.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err()) return mysql.Kill(isp.db, connectionID) } return err } + metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), nil) // row count query finished. nil out the cancel func, so the main migration thread // doesn't bother calling it after row copy is done. diff --git a/go/logic/migrator.go b/go/logic/migrator.go index bec13e594..017a0c302 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -18,6 +18,7 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" ) @@ -129,7 +130,7 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { // sleepWhileTrue sleeps indefinitely until the given function returns 'false' // (or fails with error) -func (mgtr *Migrator) sleepWhileTrue(operation func() (bool, error)) error { +func (mgtr *Migrator) sleepWhileTrue(stage string, operation func() (bool, error)) error { for { // Check for abort before continuing if err := mgtr.checkAbort(); err != nil { @@ -142,6 +143,7 @@ func (mgtr *Migrator) sleepWhileTrue(operation func() (bool, error)) error { if !shouldSleep { return nil } + metrics.RecordSleep(mgtr.migrationContext.Metrics, stage, time.Second) time.Sleep(time.Second) } } @@ -165,7 +167,9 @@ func (mgtr *Migrator) retryOperation(operation func() error, notFatalHint ...boo for i := 0; i < maxRetries; i++ { if i != 0 { // sleep after previous iteration - RetrySleepFn(1 * time.Second) + sleepDuration := 1 * time.Second + metrics.RecordSleep(mgtr.migrationContext.Metrics, "retry_backoff", sleepDuration) + RetrySleepFn(sleepDuration) } // Check for abort/context cancellation before each retry if abortErr := mgtr.checkAbort(); abortErr != nil { @@ -206,7 +210,9 @@ func (mgtr *Migrator) retryOperationWithExponentialBackoff(operation func() erro ) if i != 0 { - RetrySleepFn(time.Duration(interval) * time.Second) + sleepDuration := time.Duration(interval) * time.Second + metrics.RecordSleep(mgtr.migrationContext.Metrics, "retry_backoff", sleepDuration) + RetrySleepFn(sleepDuration) } // Check for abort/context cancellation before each retry if abortErr := mgtr.checkAbort(); abortErr != nil { @@ -636,7 +642,7 @@ func (mgtr *Migrator) Migrate() (err error) { if err := mgtr.hooksExecutor.OnRowCopyComplete(); err != nil { return err } - mgtr.printStatus(ForcePrintStatusRule) + mgtr.reportStatus(ForcePrintStatusRule) if mgtr.migrationContext.IsCountingTableRows() { mgtr.migrationContext.Log.Info("stopping query for exact row count, because that can accidentally lock out the cut over") @@ -651,7 +657,7 @@ func (mgtr *Migrator) Migrate() (err error) { } else { retrier = mgtr.retryOperation } - if err := retrier(mgtr.cutOver); err != nil { + if err := mgtr.cutOverWithMetrics(retrier); err != nil { return err } atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1) @@ -766,7 +772,7 @@ func (mgtr *Migrator) Revert() error { } }() - mgtr.printStatus(ForcePrintStatusRule) + mgtr.reportStatus(ForcePrintStatusRule) var retrier func(func() error, ...bool) error if mgtr.migrationContext.CutOverExponentialBackoff { retrier = mgtr.retryOperationWithExponentialBackoff @@ -776,7 +782,7 @@ func (mgtr *Migrator) Revert() error { if err := mgtr.hooksExecutor.OnBeforeCutOver(); err != nil { return err } - if err := retrier(mgtr.cutOver); err != nil { + if err := mgtr.cutOverWithMetrics(retrier); err != nil { return err } atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1) @@ -796,6 +802,28 @@ func (mgtr *Migrator) ExecOnFailureHook() (err error) { return mgtr.hooksExecutor.OnFailure() } +func (mgtr *Migrator) cutOverWithMetrics(retrier func(func() error, ...bool) error) error { + return mgtr.cutOverOperationWithMetrics(retrier, mgtr.cutOver) +} + +func (mgtr *Migrator) cutOverOperationWithMetrics(retrier func(func() error, ...bool) error, operation func() error) error { + cutOverStartTime := time.Now() + err := retrier(func() error { + err := operation() + if err != nil { + metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeRetry) + return err + } + metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeSuccess) + return nil + }) + if err != nil { + metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeAbort) + } + metrics.RecordCutOverTotal(mgtr.migrationContext.Metrics, time.Since(cutOverStartTime), err) + return err +} + func (mgtr *Migrator) handleCutOverResult(cutOverError error) (err error) { if mgtr.migrationContext.TestOnReplica { // We're merely testing, we don't want to keep this state. Rollback the renames as possible @@ -841,6 +869,7 @@ func (mgtr *Migrator) cutOver() (err error) { mgtr.migrationContext.MarkPointOfInterest() mgtr.migrationContext.Log.Debugf("checking for cut-over postpone") if err := mgtr.sleepWhileTrue( + "cut_over_postpone", func() (bool, error) { heartbeatLag := mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog() maxLagMillisecondsThrottle := time.Duration(atomic.LoadInt64(&mgtr.migrationContext.MaxLagMillisecondsThrottleThreshold)) * time.Millisecond @@ -944,7 +973,7 @@ func (mgtr *Migrator) waitForEventsUpToLock() error { waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime) mgtr.migrationContext.Log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration) - mgtr.printStatus(ForcePrintStatusAndHintRule) + mgtr.reportStatus(ForcePrintStatusAndHintRule) return nil } @@ -958,9 +987,12 @@ func (mgtr *Migrator) cutOverTwoStep() (err error) { defer atomic.StoreInt64(&mgtr.migrationContext.InCutOverCriticalSectionFlag, 0) atomic.StoreInt64(&mgtr.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) + phaseStartTime := time.Now() if err := mgtr.retryOperation(mgtr.applier.LockOriginalTable); err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseOriginalTableLock, time.Since(phaseStartTime), err) return err } + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseOriginalTableLock, time.Since(phaseStartTime), nil) if err := mgtr.retryOperation(mgtr.waitForEventsUpToLock); err != nil { return err @@ -971,12 +1003,19 @@ func (mgtr *Migrator) cutOverTwoStep() (err error) { return err } } + phaseStartTime = time.Now() if err := mgtr.retryOperation(mgtr.applier.SwapTablesQuickAndBumpy); err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err) return err } + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), nil) + + phaseStartTime = time.Now() if err := mgtr.retryOperation(mgtr.applier.UnlockTables); err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(phaseStartTime), err) return err } + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(phaseStartTime), nil) lockAndRenameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.LockTablesStartTime) renameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.RenameTablesStartTime) @@ -1000,14 +1039,17 @@ func (mgtr *Migrator) atomicCutOver() (err error) { tableLocked := make(chan error, 2) tableUnlocked := make(chan error, 2) var renameLockSessionId int64 + phaseStartTime := time.Now() go func() { if err := mgtr.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &renameLockSessionId); err != nil { mgtr.migrationContext.Log.Errore(err) } }() if err := <-tableLocked; err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicLock, time.Since(phaseStartTime), err) return mgtr.migrationContext.Log.Errore(err) } + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicLock, time.Since(phaseStartTime), nil) lockOriginalSessionId := <-lockOriginalSessionIdChan mgtr.migrationContext.Log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId) // At this point we know the original table is locked. @@ -1025,7 +1067,8 @@ func (mgtr *Migrator) atomicCutOver() (err error) { // Step 2 // We now attempt an atomic RENAME on original & ghost tables, and expect it to block. - mgtr.migrationContext.RenameTablesStartTime = time.Now() + phaseStartTime = time.Now() + mgtr.migrationContext.RenameTablesStartTime = phaseStartTime var tableRenameKnownToHaveFailed int64 renameSessionIdChan := make(chan int64, 2) @@ -1050,6 +1093,7 @@ func (mgtr *Migrator) atomicCutOver() (err error) { } // Wait for the RENAME to appear in PROCESSLIST if err := mgtr.retryOperation(waitForRename, true); err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err) // Abort! Release the lock okToUnlockTable <- true return err @@ -1058,6 +1102,7 @@ func (mgtr *Migrator) atomicCutOver() (err error) { mgtr.migrationContext.Log.Infof("Found atomic RENAME to be blocking, as expected. Double checking the lock is still in place (though I don't strictly have to)") } if err := mgtr.applier.ExpectUsedLock(lockOriginalSessionId); err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err) // Abort operation. Just make sure to drop the magic table. return mgtr.migrationContext.Log.Errore(err) } @@ -1067,16 +1112,21 @@ func (mgtr *Migrator) atomicCutOver() (err error) { // we know it is safe to proceed to release the lock renameLockSessionId = renameSessionId + unlockStartTime := time.Now() okToUnlockTable <- true // BAM! magic table dropped, original table lock is released // -> RENAME released -> queries on original are unblocked. if err := <-tableUnlocked; err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(unlockStartTime), err) return mgtr.migrationContext.Log.Errore(err) } + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(unlockStartTime), nil) if err := <-tablesRenamed; err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err) return mgtr.migrationContext.Log.Errore(err) } mgtr.migrationContext.RenameTablesEndTime = time.Now() + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, mgtr.migrationContext.RenameTablesEndTime.Sub(phaseStartTime), nil) // ooh nice! We're actually truly and thankfully done lockAndRenameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.LockTablesStartTime) @@ -1087,7 +1137,7 @@ func (mgtr *Migrator) atomicCutOver() (err error) { // initiateServer begins listening on unix socket/tcp for incoming interactive commands func (mgtr *Migrator) initiateServer() (err error) { var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) { - mgtr.printStatus(rule, writer) + mgtr.reportStatus(rule, writer) } mgtr.server = NewServer(mgtr.migrationContext, mgtr.hooksExecutor, f) if err := mgtr.server.BindSocketFile(); err != nil { @@ -1167,9 +1217,38 @@ func (mgtr *Migrator) initiateInspector() (err error) { return nil } -// initiateStatus sets and activates the printStatus() ticker +// emitProgressMetrics emits StatsD gauges from a progress snapshot. +func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) { + metrics.EmitProgressGauges( + mgtr.migrationContext.Metrics, + snap.totalRowsCopied, + snap.rowsEstimate, + snap.dmlApplied, + ) + metrics.EmitBinlogBacklogGauges( + mgtr.migrationContext.Metrics, + snap.applyEventsBacklog, + snap.applyEventsCapacity, + ) + isThrottled, _, _ := mgtr.migrationContext.IsThrottled() + metrics.EmitLagGauges( + mgtr.migrationContext.Metrics, + snap.replicationLagSeconds, + snap.heartbeatLagSeconds, + isThrottled, + ) +} + +// reportStatus samples progress, emits metrics, and optionally prints status output. +func (mgtr *Migrator) reportStatus(rule PrintStatusRule, writers ...io.Writer) { + snap := mgtr.sampleMigrationProgress() + mgtr.emitProgressMetrics(snap) + mgtr.printStatus(rule, snap, writers...) +} + +// initiateStatus sets and activates the reportStatus() ticker. func (mgtr *Migrator) initiateStatus() { - mgtr.printStatus(ForcePrintStatusAndHintRule) + mgtr.reportStatus(ForcePrintStatusAndHintRule) ticker := time.NewTicker(time.Second) defer ticker.Stop() var previousCount int64 @@ -1177,7 +1256,7 @@ func (mgtr *Migrator) initiateStatus() { if atomic.LoadInt64(&mgtr.finishedMigrating) > 0 { return } - go mgtr.printStatus(HeuristicPrintStatusRule) + go mgtr.reportStatus(HeuristicPrintStatusRule) totalCopied := atomic.LoadInt64(&mgtr.migrationContext.TotalRowsCopied) if previousCount > 0 { copiedThisLoop := totalCopied - previousCount @@ -1372,55 +1451,35 @@ func (mgtr *Migrator) shouldPrintMigrationStatusHint(rule PrintStatusRule, elaps // `rule` indicates the type of output expected. // By default the status is written to standard output, but other writers can // be used as well. -func (mgtr *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { +func (mgtr *Migrator) printStatus(rule PrintStatusRule, snap migrationProgressSnapshot, writers ...io.Writer) { if rule == NoPrintStatusRule { return } writers = append(writers, os.Stdout) - elapsedTime := mgtr.migrationContext.ElapsedTime() - elapsedSeconds := int64(elapsedTime.Seconds()) - totalRowsCopied := mgtr.migrationContext.GetTotalRowsCopied() - rowsEstimate := atomic.LoadInt64(&mgtr.migrationContext.RowsEstimate) + atomic.LoadInt64(&mgtr.migrationContext.RowsDeltaEstimate) - if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { - // Done copying rows. The totalRowsCopied value is the de-facto number of rows, - // and there is no further need to keep updating the value. - rowsEstimate = totalRowsCopied - } - - // we take the opportunity to update migration context with progressPct - progressPct := mgtr.getProgressPercent(rowsEstimate) - mgtr.migrationContext.SetProgressPct(progressPct) - // Before status, let's see if we should print a nice reminder for what exactly we're doing here. - if mgtr.shouldPrintMigrationStatusHint(rule, elapsedSeconds) { + if mgtr.shouldPrintMigrationStatusHint(rule, snap.elapsedSeconds) { mgtr.printMigrationStatusHint(writers...) } - // Get state + ETA - state, eta, etaDuration := mgtr.getMigrationStateAndETA(rowsEstimate) - mgtr.migrationContext.SetETADuration(etaDuration) - - if !mgtr.shouldPrintStatus(rule, elapsedSeconds, etaDuration) { + if !mgtr.shouldPrintStatus(rule, snap.elapsedSeconds, snap.etaDuration) { return } - currentBinlogCoordinates := mgtr.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", - totalRowsCopied, rowsEstimate, progressPct, - atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied), - len(mgtr.applyEventsQueue), cap(mgtr.applyEventsQueue), - base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(mgtr.migrationContext.ElapsedRowCopyTime()), - currentBinlogCoordinates.DisplayString(), - mgtr.migrationContext.GetCurrentLagDuration().Seconds(), - mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(), - state, - eta, + snap.totalRowsCopied, snap.rowsEstimate, snap.progressPct, + snap.dmlApplied, + snap.applyEventsBacklog, snap.applyEventsCapacity, + base.PrettifyDurationOutput(snap.elapsedTime), base.PrettifyDurationOutput(snap.elapsedRowCopyTime), + snap.streamerBinlogPosition, + snap.replicationLagSeconds, + snap.heartbeatLagSeconds, + snap.state, + snap.eta, ) mgtr.applier.WriteChangelog( fmt.Sprintf("copy iteration %d at %d", mgtr.migrationContext.GetIteration(), time.Now().Unix()), - state, + snap.state, ) w := io.MultiWriter(writers...) fmt.Fprintln(w, status) @@ -1434,7 +1493,7 @@ func (mgtr *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { mgtr.migrationContext.Log.Info(strings.Replace(status, "%", "%%", 1)) hooksStatusIntervalSec := mgtr.migrationContext.HooksStatusIntervalSec - if hooksStatusIntervalSec > 0 && elapsedSeconds%hooksStatusIntervalSec == 0 { + if hooksStatusIntervalSec > 0 && snap.elapsedSeconds%hooksStatusIntervalSec == 0 { mgtr.hooksExecutor.OnStatus(status) } } @@ -1754,7 +1813,9 @@ func (mgtr *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { return chk, err } mgtr.applier.CurrentCoordinatesMutex.Unlock() - time.Sleep(500 * time.Millisecond) + sleepDuration := 500 * time.Millisecond + metrics.RecordSleep(mgtr.migrationContext.Metrics, "replica_wait", sleepDuration) + time.Sleep(sleepDuration) } } @@ -1859,7 +1920,12 @@ func (mgtr *Migrator) executeWriteFuncs() error { copyRowsDuration := time.Since(copyRowsStartTime) sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds()) sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond - time.Sleep(sleepTime) + if sleepTime > 0 { + if sleepTime >= time.Millisecond { + metrics.RecordSleep(mgtr.migrationContext.Metrics, "chunk_throttle", sleepTime) + } + time.Sleep(sleepTime) + } } } default: diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index fc8feada1..2b76a2360 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -29,6 +29,7 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" "github.com/testcontainers/testcontainers-go" @@ -407,6 +408,202 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) { } } +type progressGaugeSpy struct { + names []string + values []float64 + tags [][]string +} + +func (s *progressGaugeSpy) Gauge(name string, value float64, tags ...string) { + s.names = append(s.names, name) + s.values = append(s.values, value) + s.tags = append(s.tags, append([]string(nil), tags...)) +} + +func (s *progressGaugeSpy) Count(_ string, _ int64, _ ...string) {} +func (s *progressGaugeSpy) Histogram(_ string, _ float64, _ ...string) {} + +type cutOverMetricsSpy struct { + countNames []string + countTags [][]string + histogramNames []string + histogramTags [][]string +} + +func (s *cutOverMetricsSpy) Gauge(_ string, _ float64, _ ...string) {} + +func (s *cutOverMetricsSpy) Count(name string, _ int64, tags ...string) { + s.countNames = append(s.countNames, name) + s.countTags = append(s.countTags, append([]string(nil), tags...)) +} + +func (s *cutOverMetricsSpy) Histogram(name string, _ float64, tags ...string) { + s.histogramNames = append(s.histogramNames, name) + s.histogramTags = append(s.histogramTags, append([]string(nil), tags...)) +} + +func TestCutOverOperationWithMetricsRetryThenSuccess(t *testing.T) { + spy := &cutOverMetricsSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + migrator := NewMigrator(ctx, "test") + + attempts := 0 + retrier := func(operation func() error, _ ...bool) error { + for { + err := operation() + if err == nil { + return nil + } + if attempts >= 2 { + return err + } + } + } + operation := func() error { + attempts++ + if attempts == 1 { + return errors.New("transient cutover failure") + } + return nil + } + + require.NoError(t, migrator.cutOverOperationWithMetrics(retrier, operation)) + assert.Equal(t, []string{"cut_over.attempts_total", "cut_over.attempts_total"}, spy.countNames) + assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeRetry}, {"outcome:" + metrics.CutOverOutcomeSuccess}}, spy.countTags) + assert.Equal(t, []string{"cut_over.total_duration_milliseconds"}, spy.histogramNames) + assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeSuccess}}, spy.histogramTags) +} + +func TestCutOverOperationWithMetricsAbort(t *testing.T) { + spy := &cutOverMetricsSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + migrator := NewMigrator(ctx, "test") + cutOverErr := errors.New("cutover failed") + + retrier := func(operation func() error, _ ...bool) error { + return operation() + } + operation := func() error { + return cutOverErr + } + + require.ErrorIs(t, migrator.cutOverOperationWithMetrics(retrier, operation), cutOverErr) + assert.Equal(t, []string{"cut_over.attempts_total", "cut_over.attempts_total"}, spy.countNames) + assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeRetry}, {"outcome:" + metrics.CutOverOutcomeAbort}}, spy.countTags) + assert.Equal(t, []string{"cut_over.total_duration_milliseconds"}, spy.histogramNames) + assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeAbort}}, spy.histogramTags) +} + +func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { + spy := &progressGaugeSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + atomic.StoreInt64(&ctx.TotalRowsCopied, 1000) + atomic.StoreInt64(&ctx.RowsEstimate, 5000) + atomic.StoreInt64(&ctx.TotalDMLEventsApplied, 42) + + migrator := NewMigrator(ctx, "test") + migrator.reportStatus(NoPrintStatusRule, io.Discard) + require.Len(t, spy.names, 8) + assert.Equal(t, []string{ + "row_copy.rows_copied", + "row_copy.rows_estimate", + "dml.events_applied", + "binlog.backlog_size", + "binlog.backlog_capacity", + "binlog.backlog_utilization", + "lag.replication_seconds", + "lag.heartbeat_seconds", + }, spy.names) + assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6]) + assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01) + + spy.names = nil + spy.values = nil + atomic.StoreInt64(&ctx.TotalDMLEventsApplied, 100) + migrator.reportStatus(NoPrintStatusRule, io.Discard) + require.Len(t, spy.names, 8) + assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6]) +} + +func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) { + spy := &progressGaugeSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + + migrator := NewMigrator(ctx, "test") + migrator.applyEventsQueue <- newApplyEventStructByDML(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{DML: binlog.InsertDML}, + }) + migrator.applyEventsQueue <- newApplyEventStructByDML(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{DML: binlog.InsertDML}, + }) + + migrator.reportStatus(NoPrintStatusRule, io.Discard) + + capacity := float64(cap(migrator.applyEventsQueue)) + require.Len(t, spy.names, 8) + assert.Equal(t, float64(2), spy.values[3]) + assert.Equal(t, capacity, spy.values[4]) + assert.InDelta(t, 2/capacity, spy.values[5], 1e-9) +} + +func TestReportStatusEmitsGaugesWhenRowCopyComplete(t *testing.T) { + spy := &progressGaugeSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + atomic.StoreInt64(&ctx.TotalRowsCopied, 5000) + atomic.StoreInt64(&ctx.RowsEstimate, 10000) + + migrator := NewMigrator(ctx, "test") + atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1) + migrator.reportStatus(NoPrintStatusRule, io.Discard) + + require.Len(t, spy.names, 8) + assert.Equal(t, float64(5000), spy.values[0]) + assert.Equal(t, float64(5000), spy.values[1], "rows_estimate tracks rows_copied when row copy is complete") +} + +func TestReportStatusEmitsGaugesWhenPrintSuppressed(t *testing.T) { + spy := &progressGaugeSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + ctx.StartTime = time.Now().Add(-99 * time.Second) + atomic.StoreInt64(&ctx.TotalRowsCopied, 1000) + atomic.StoreInt64(&ctx.RowsEstimate, 5000) + atomic.StoreInt64(&ctx.EtaRowsPerSecond, 1) + + migrator := NewMigrator(ctx, "test") + snap := migrator.migrationProgressSnapshot() + _, _, etaDuration := migrator.getMigrationStateAndETA(snap.rowsEstimate) + require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, etaDuration)) + + migrator.reportStatus(HeuristicPrintStatusRule, io.Discard) + require.Len(t, spy.names, 8) + assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6]) +} + +func TestReportStatusEmitsLagGaugesWhenThrottled(t *testing.T) { + spy := &progressGaugeSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + ctx.SetThrottled(true, "max-lag-millis", base.NoThrottleReasonHint) + atomic.StoreInt64(&ctx.CurrentLag, int64(5*time.Second)) + ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-4 * time.Second)) + + migrator := NewMigrator(ctx, "test") + migrator.reportStatus(NoPrintStatusRule, io.Discard) + + require.GreaterOrEqual(t, len(spy.names), 8) + assert.Equal(t, "lag.replication_seconds", spy.names[6]) + assert.Equal(t, "lag.heartbeat_seconds", spy.names[7]) + require.Len(t, spy.tags[6], 1) + assert.Equal(t, "throttled:true", spy.tags[6][0]) + assert.Equal(t, "throttled:true", spy.tags[7][0]) +} + func TestMigratorShouldPrintStatus(t *testing.T) { migrationContext := base.NewMigrationContext() migrator := NewMigrator(migrationContext, "1.2.3") diff --git a/go/logic/progress_snapshot.go b/go/logic/progress_snapshot.go new file mode 100644 index 000000000..375532cca --- /dev/null +++ b/go/logic/progress_snapshot.go @@ -0,0 +1,74 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "sync/atomic" + "time" +) + +// migrationProgressSnapshot captures row-copy, DML, backlog, and lag at a single point in time +// for status output. +type migrationProgressSnapshot struct { + totalRowsCopied int64 + rowsEstimate int64 + progressPct float64 + dmlApplied int64 + applyEventsBacklog int + applyEventsCapacity int + state string + eta string + etaDuration time.Duration + elapsedTime time.Duration + elapsedRowCopyTime time.Duration + elapsedSeconds int64 + streamerBinlogPosition string + replicationLagSeconds float64 + heartbeatLagSeconds float64 +} + +func (mgtr *Migrator) migrationProgressSnapshot() migrationProgressSnapshot { + totalRowsCopied := mgtr.migrationContext.GetTotalRowsCopied() + rowsEstimate := atomic.LoadInt64(&mgtr.migrationContext.RowsEstimate) + atomic.LoadInt64(&mgtr.migrationContext.RowsDeltaEstimate) + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + // Done copying rows. The totalRowsCopied value is the de-facto number of rows, + // and there is no further need to keep updating the value. + rowsEstimate = totalRowsCopied + } + progressPct := mgtr.getProgressPercent(rowsEstimate) + state, eta, etaDuration := mgtr.getMigrationStateAndETA(rowsEstimate) + elapsedTime := mgtr.migrationContext.ElapsedTime() + + streamerBinlogPosition := "" + if mgtr.eventsStreamer != nil { + streamerBinlogPosition = mgtr.eventsStreamer.GetCurrentBinlogCoordinates().DisplayString() + } + + return migrationProgressSnapshot{ + totalRowsCopied: totalRowsCopied, + rowsEstimate: rowsEstimate, + progressPct: progressPct, + dmlApplied: atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied), + applyEventsBacklog: len(mgtr.applyEventsQueue), + applyEventsCapacity: cap(mgtr.applyEventsQueue), + state: state, + eta: eta, + etaDuration: etaDuration, + elapsedTime: elapsedTime, + elapsedRowCopyTime: mgtr.migrationContext.ElapsedRowCopyTime(), + elapsedSeconds: int64(elapsedTime.Seconds()), + streamerBinlogPosition: streamerBinlogPosition, + replicationLagSeconds: mgtr.migrationContext.GetCurrentLagDuration().Seconds(), + heartbeatLagSeconds: mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(), + } +} + +func (mgtr *Migrator) sampleMigrationProgress() migrationProgressSnapshot { + snap := mgtr.migrationProgressSnapshot() + mgtr.migrationContext.SetProgressPct(snap.progressPct) + mgtr.migrationContext.SetETADuration(snap.etaDuration) + return snap +} diff --git a/go/logic/progress_snapshot_test.go b/go/logic/progress_snapshot_test.go new file mode 100644 index 000000000..1e42faf13 --- /dev/null +++ b/go/logic/progress_snapshot_test.go @@ -0,0 +1,76 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "bytes" + "io" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/github/gh-ost/go/base" +) + +func TestMigrationProgressSnapshotCapturesBacklogAndLag(t *testing.T) { + ctx := base.NewMigrationContext() + migrator := NewMigrator(ctx, "test") + atomic.StoreInt64(&ctx.TotalRowsCopied, 100) + atomic.StoreInt64(&ctx.CurrentLag, int64(3*time.Second)) + ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-2 * time.Second)) + + snap := migrator.migrationProgressSnapshot() + assert.Equal(t, 0, snap.applyEventsBacklog) + assert.Equal(t, cap(migrator.applyEventsQueue), snap.applyEventsCapacity) + assert.InDelta(t, 3.0, snap.replicationLagSeconds, 0.01) + assert.InDelta(t, 2.0, snap.heartbeatLagSeconds, 0.5) + assert.Empty(t, snap.streamerBinlogPosition) +} + +func TestMigrationProgressSnapshotWhenRowCopyComplete(t *testing.T) { + ctx := base.NewMigrationContext() + migrator := NewMigrator(ctx, "test") + atomic.StoreInt64(&ctx.TotalRowsCopied, 5000) + atomic.StoreInt64(&ctx.RowsEstimate, 10000) + atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1) + + snap := migrator.migrationProgressSnapshot() + assert.Equal(t, int64(5000), snap.totalRowsCopied) + assert.Equal(t, int64(5000), snap.rowsEstimate) + assert.InDelta(t, 100.0, snap.progressPct, 0.01) +} + +func TestReportStatusSamplesProgress(t *testing.T) { + ctx := base.NewMigrationContext() + atomic.StoreInt64(&ctx.TotalRowsCopied, 1000) + atomic.StoreInt64(&ctx.RowsEstimate, 5000) + + migrator := NewMigrator(ctx, "test") + migrator.reportStatus(NoPrintStatusRule, io.Discard) + assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01) +} + +func TestReportStatusUpdatesContextWhenPrintSuppressed(t *testing.T) { + ctx := base.NewMigrationContext() + ctx.StartTime = time.Now().Add(-99 * time.Second) + atomic.StoreInt64(&ctx.TotalRowsCopied, 1000) + atomic.StoreInt64(&ctx.RowsEstimate, 5000) + atomic.StoreInt64(&ctx.EtaRowsPerSecond, 1) + + migrator := NewMigrator(ctx, "test") + snap := migrator.migrationProgressSnapshot() + require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, snap.etaDuration)) + + var buf bytes.Buffer + migrator.printStatus(HeuristicPrintStatusRule, snap, &buf) + assert.Empty(t, buf.String(), "heuristic rule should suppress status line output") + + migrator.reportStatus(HeuristicPrintStatusRule, io.Discard) + assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01) +} diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 2535579ab..ee6e3d132 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -14,6 +14,7 @@ import ( "time" "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" ) @@ -38,7 +39,10 @@ var ( } ) -const frenoMagicHint = "freno" +const ( + frenoMagicHint = "freno" + throttleActiveMetricInterval = time.Second +) // Throttler collects metrics related to throttling and makes informed decision // whether throttling should take place. @@ -50,6 +54,10 @@ type Throttler struct { httpClientTimeout time.Duration inspector *Inspector finishedMigrating int64 + + throttleStartedAt time.Time + throttleStartedReason string + throttleActiveEmitted time.Time } func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector, appVersion string) *Throttler { @@ -463,6 +471,32 @@ func (thlr *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan }() } +func (thlr *Throttler) recordThrottleMetrics(alreadyThrottling bool, shouldThrottle bool, throttleReason string, now time.Time) { + throttleStateChanged := shouldThrottle != alreadyThrottling + shouldEmitActiveGauge := throttleStateChanged || + thlr.throttleActiveEmitted.IsZero() || + now.Sub(thlr.throttleActiveEmitted) >= throttleActiveMetricInterval + if shouldEmitActiveGauge { + metrics.EmitThrottleActiveGauge(thlr.migrationContext.Metrics, shouldThrottle) + thlr.throttleActiveEmitted = now + } + + if shouldThrottle && !alreadyThrottling { + thlr.throttleStartedAt = now + thlr.throttleStartedReason = throttleReason + return + } + if alreadyThrottling && !shouldThrottle && !thlr.throttleStartedAt.IsZero() { + metrics.EmitThrottleInterval( + thlr.migrationContext.Metrics, + now.Sub(thlr.throttleStartedAt), + thlr.throttleStartedReason, + ) + thlr.throttleStartedAt = time.Time{} + thlr.throttleStartedReason = "" + } +} + // initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling. func (thlr *Throttler) initiateThrottlerChecks() { throttlerFunction := func() { @@ -478,6 +512,7 @@ func (thlr *Throttler) initiateThrottlerChecks() { // End of throttling thlr.applier.WriteAndLogChangelog("throttle", "done throttling") } + thlr.recordThrottleMetrics(alreadyThrottling, shouldThrottle, throttleReason, time.Now()) thlr.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint) } throttlerFunction() diff --git a/go/logic/throttler_test.go b/go/logic/throttler_test.go index f2d3c193a..01104805b 100644 --- a/go/logic/throttler_test.go +++ b/go/logic/throttler_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/github/gh-ost/go/base" ) @@ -108,3 +109,53 @@ func TestThrottleCallsOnThrottledCallback(t *testing.T) { t.Fatal("throttle() did not return after context cancellation") } } + +type throttleMetricsSpy struct { + gaugeNames []string + gaugeValues []float64 + histogramNames []string + histogramValues []float64 + countNames []string + countValues []int64 + tags [][]string +} + +func (s *throttleMetricsSpy) Gauge(name string, value float64, tags ...string) { + s.gaugeNames = append(s.gaugeNames, name) + s.gaugeValues = append(s.gaugeValues, value) + s.tags = append(s.tags, append([]string(nil), tags...)) +} + +func (s *throttleMetricsSpy) Count(name string, value int64, tags ...string) { + s.countNames = append(s.countNames, name) + s.countValues = append(s.countValues, value) + s.tags = append(s.tags, append([]string(nil), tags...)) +} + +func (s *throttleMetricsSpy) Histogram(name string, value float64, tags ...string) { + s.histogramNames = append(s.histogramNames, name) + s.histogramValues = append(s.histogramValues, value) + s.tags = append(s.tags, append([]string(nil), tags...)) +} + +func TestRecordThrottleMetricsEmitsOneIntervalMetricOnThrottleExit(t *testing.T) { + thlr := newTestThrottler() + spy := &throttleMetricsSpy{} + thlr.migrationContext.Metrics = spy + startedAt := time.Unix(100, 0) + + thlr.recordThrottleMetrics(false, true, "commanded by user", startedAt) + thlr.recordThrottleMetrics(true, true, "commanded by user", startedAt.Add(500*time.Millisecond)) + thlr.recordThrottleMetrics(true, true, "commanded by user", startedAt.Add(1100*time.Millisecond)) + thlr.recordThrottleMetrics(true, false, "", startedAt.Add(2500*time.Millisecond)) + + require.Equal(t, []string{"throttle.active", "throttle.active", "throttle.active"}, spy.gaugeNames) + require.Equal(t, []float64{1, 1, 0}, spy.gaugeValues) + require.Equal(t, []string{"throttle.duration_milliseconds"}, spy.histogramNames) + require.Equal(t, []float64{2500}, spy.histogramValues) + require.Equal(t, []string{"throttle.events_total"}, spy.countNames) + require.Equal(t, []int64{1}, spy.countValues) + require.Len(t, spy.tags, 5) + assert.Equal(t, []string{"reason:commanded by user"}, spy.tags[3]) + assert.Equal(t, []string{"reason:commanded by user"}, spy.tags[4]) +} diff --git a/go/metrics/client.go b/go/metrics/client.go index ed6acc096..b21fb1458 100644 --- a/go/metrics/client.go +++ b/go/metrics/client.go @@ -21,6 +21,14 @@ type Client struct { sd *statsd.Client } +// Emitter is the unified metrics abstraction implemented by *Client. Helpers in +// the metrics package take an Emitter so tests can supply spies without UDP. +type Emitter interface { + Gauge(name string, value float64, tags ...string) + Count(name string, value int64, tags ...string) + Histogram(name string, value float64, tags ...string) +} + // NewClient connects to addr for StatsD. If addr is empty, returns Noop and nil error. // namespace is typically "gh_ost." (metrics are named namespace + short name, e.g. gh_ost.startup). // tags are global tags applied to every metric (repeatable --statsd-tags). @@ -60,6 +68,13 @@ func (c *Client) Count(name string, value int64, tags ...string) { _ = c.sd.Count(name, value, tags, 1.0) } +func (c *Client) Histogram(name string, value float64, tags ...string) { + if c.sd == nil { + return + } + _ = c.sd.Histogram(name, value, tags, 1.0) +} + // Close flushes buffered metrics; safe for Noop. func (c *Client) Close() error { if c.sd == nil { diff --git a/go/metrics/emit.go b/go/metrics/emit.go new file mode 100644 index 000000000..cc63a9c8c --- /dev/null +++ b/go/metrics/emit.go @@ -0,0 +1,197 @@ +/* + Copyright 2026 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package metrics + +import ( + "context" + "fmt" + "runtime" + "time" +) + +// EmitProgressGauges emits row-copy and DML progress gauges (namespace is applied by the client): +// gh_ost.row_copy.rows_copied, gh_ost.row_copy.rows_estimate, gh_ost.dml.events_applied. +func EmitProgressGauges(emit Emitter, rowsCopied, rowsEstimate, dmlEventsApplied int64) { + if emit == nil { + return + } + emit.Gauge("row_copy.rows_copied", float64(rowsCopied)) + emit.Gauge("row_copy.rows_estimate", float64(rowsEstimate)) + emit.Gauge("dml.events_applied", float64(dmlEventsApplied)) +} + +// EmitBinlogBacklogGauges emits apply-events queue depth gauges (namespace is applied by the client): +// gh_ost.binlog.backlog_size, gh_ost.binlog.backlog_capacity, gh_ost.binlog.backlog_utilization. +func EmitBinlogBacklogGauges(emit Emitter, backlogSize, backlogCapacity int) { + if emit == nil { + return + } + emit.Gauge("binlog.backlog_size", float64(backlogSize)) + emit.Gauge("binlog.backlog_capacity", float64(backlogCapacity)) + emit.Gauge("binlog.backlog_utilization", binlogBacklogUtilization(backlogSize, backlogCapacity)) +} + +func binlogBacklogUtilization(backlogSize, backlogCapacity int) float64 { + if backlogCapacity <= 0 { + return 0 + } + utilization := float64(backlogSize) / float64(backlogCapacity) + if utilization > 1 { + return 1 + } + if utilization < 0 { + return 0 + } + return utilization +} + +// EmitLagGauges emits replication and heartbeat lag gauges (namespace is applied by the client): +// gh_ost.lag.replication_seconds, gh_ost.lag.heartbeat_seconds, each tagged throttled:true|false. +// +// These are point-in-time readings each status tick (not a distribution), so gauges are used +// rather than histograms; DogStatsD histogram aggregation exposes count/max series that do not +// match the log line lag values in Prometheus/Grafana. +func EmitLagGauges(emit Emitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) { + if emit == nil { + return + } + tags := []string{fmt.Sprintf("throttled:%t", throttled)} + emit.Gauge("lag.replication_seconds", replicationLagSeconds, tags...) + emit.Gauge("lag.heartbeat_seconds", heartbeatLagSeconds, tags...) +} + +// EmitGoRuntimeGauges emits gh_ost.go_runtime.* gauges (namespace is applied by the client). +// m and numGoroutine are typically from runtime.ReadMemStats and runtime.NumGoroutine. +func EmitGoRuntimeGauges(emit Emitter, m *runtime.MemStats, numGoroutine int) { + if emit == nil || m == nil { + return + } + emit.Gauge("go_runtime.alloc_bytes", float64(m.Alloc)) + emit.Gauge("go_runtime.sys_bytes", float64(m.Sys)) + emit.Gauge("go_runtime.heap_inuse_bytes", float64(m.HeapInuse)) + emit.Gauge("go_runtime.num_gc", float64(m.NumGC)) + emit.Gauge("go_runtime.gc_pause_total_ns", float64(m.PauseTotalNs)) + emit.Gauge("go_runtime.goroutines", float64(numGoroutine)) +} + +// StartGoRuntimeReporter periodically samples runtime memory and goroutines and emits gauges +// until ctx is cancelled. It is a no-op when interval <= 0, client is nil, or StatsD is disabled +// (noop client). +func StartGoRuntimeReporter(ctx context.Context, client *Client, interval time.Duration) { + if ctx == nil || client == nil || interval <= 0 || client.sd == nil { + return + } + + emit := func() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + EmitGoRuntimeGauges(client, &m, runtime.NumGoroutine()) + } + + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + emit() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + emit() + } + } + }() +} + +// EmitThrottleActiveGauge emits whether throttling is currently active. +func EmitThrottleActiveGauge(emit Emitter, active bool) { + if emit == nil { + return + } + if active { + emit.Gauge("throttle.active", 1) + return + } + emit.Gauge("throttle.active", 0) +} + +// EmitThrottleInterval emits one event and one duration sample for a completed +// throttled interval. +func EmitThrottleInterval(emit Emitter, duration time.Duration, reason string) { + if emit == nil { + return + } + tags := []string{fmt.Sprintf("reason:%s", reason)} + emit.Histogram("throttle.duration_milliseconds", float64(duration.Milliseconds()), tags...) + emit.Count("throttle.events_total", 1, tags...) +} + +const ( + CutOverOutcomeSuccess = "success" + CutOverOutcomeRetry = "retry" + CutOverOutcomeAbort = "abort" + + CutOverPhaseMagicLock = "magic_lock" + CutOverPhaseOriginalTableLock = "original_table_lock" + CutOverPhaseMagicRename = "magic_rename" + CutOverPhaseUnlock = "unlock" +) + +// RecordCutOverPhase emits gh_ost.cut_over.phase_duration_milliseconds. +func RecordCutOverPhase(emit Emitter, phase string, duration time.Duration, err error) { + if emit == nil || phase == "" || duration < 0 { + return + } + emit.Histogram("cut_over.phase_duration_milliseconds", float64(duration.Milliseconds()), "phase:"+phase, "outcome:"+cutOverOutcomeFromError(err)) +} + +// RecordCutOverAttempt emits gh_ost.cut_over.attempts_total. +func RecordCutOverAttempt(emit Emitter, outcome string) { + if emit == nil || outcome == "" { + return + } + emit.Count("cut_over.attempts_total", 1, "outcome:"+outcome) +} + +// RecordCutOverTotal emits gh_ost.cut_over.total_duration_milliseconds for terminal cut-over outcomes. +func RecordCutOverTotal(emit Emitter, duration time.Duration, err error) { + if emit == nil || duration < 0 { + return + } + emit.Histogram("cut_over.total_duration_milliseconds", float64(duration.Milliseconds()), "outcome:"+cutOverOutcomeFromError(err)) +} + +func cutOverOutcomeFromError(err error) string { + if err != nil { + return CutOverOutcomeAbort + } + return CutOverOutcomeSuccess +} + +// RecordSleep emits per-stage sleep/wait metrics (namespace is applied by the client): +// gh_ost.sleep.duration_milliseconds and gh_ost.sleep.total_milliseconds, both tagged by stage. +func RecordSleep(emit Emitter, stage string, d time.Duration) { + if emit == nil || stage == "" || d < 0 { + return + } + tags := []string{"stage:" + stage} + milliseconds := d.Milliseconds() + emit.Histogram("sleep.duration_milliseconds", float64(milliseconds), tags...) + emit.Count("sleep.total_milliseconds", milliseconds, tags...) +} + +// RecordQueryDuration emits gh_ost.query.duration_milliseconds with side/kind/outcome tags. +func RecordQueryDuration(emit Emitter, side string, kind string, duration time.Duration, err error) { + if emit == nil || side == "" || kind == "" || duration < 0 { + return + } + outcome := "ok" + if err != nil { + outcome = "error" + } + emit.Histogram("query.duration_milliseconds", float64(duration.Milliseconds()), "side:"+side, "kind:"+kind, "outcome:"+outcome) +} diff --git a/go/metrics/emit_test.go b/go/metrics/emit_test.go new file mode 100644 index 000000000..35f64eee7 --- /dev/null +++ b/go/metrics/emit_test.go @@ -0,0 +1,429 @@ +/* + Copyright 2026 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package metrics + +import ( + "context" + "errors" + "runtime" + "slices" + "testing" + "time" +) + +type gaugeSpy struct { + names []string + values []float64 + tags [][]string +} + +func (g *gaugeSpy) Gauge(name string, value float64, tags ...string) { + g.names = append(g.names, name) + g.values = append(g.values, value) + g.tags = append(g.tags, append([]string(nil), tags...)) +} + +func (g *gaugeSpy) Count(name string, value int64, tags ...string) { +} + +func (g *gaugeSpy) Histogram(name string, value float64, tags ...string) { +} + +func TestEmitProgressGauges(t *testing.T) { + spy := &gaugeSpy{} + EmitProgressGauges(spy, 1000, 5000, 42) + + wantNames := []string{ + "row_copy.rows_copied", + "row_copy.rows_estimate", + "dml.events_applied", + } + wantVals := []float64{1000, 5000, 42} + + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + } +} + +func TestEmitProgressGauges_nilSafe(t *testing.T) { + EmitProgressGauges(nil, 1, 2, 3) +} + +func TestEmitBinlogBacklogGauges(t *testing.T) { + spy := &gaugeSpy{} + EmitBinlogBacklogGauges(spy, 250, 1000) + + wantNames := []string{ + "binlog.backlog_size", + "binlog.backlog_capacity", + "binlog.backlog_utilization", + } + wantVals := []float64{250, 1000, 0.25} + + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + } +} + +func TestEmitBinlogBacklogGauges_nilSafe(t *testing.T) { + EmitBinlogBacklogGauges(nil, 1, 2) +} + +func TestBinlogBacklogUtilization(t *testing.T) { + tests := []struct { + size, capacity int + want float64 + }{ + {0, 1000, 0}, + {250, 1000, 0.25}, + {1000, 1000, 1}, + {1500, 1000, 1}, + {-1, 1000, 0}, + {10, 0, 0}, + } + for _, tt := range tests { + got := binlogBacklogUtilization(tt.size, tt.capacity) + if got != tt.want { + t.Fatalf("utilization(%d, %d) = %v, want %v", tt.size, tt.capacity, got, tt.want) + } + } +} + +func TestEmitLagGauges_notThrottled(t *testing.T) { + spy := &gaugeSpy{} + EmitLagGauges(spy, 2.5, 1.25, false) + + wantNames := []string{"lag.replication_seconds", "lag.heartbeat_seconds"} + wantVals := []float64{2.5, 1.25} + wantTags := []string{"throttled:false"} + + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + if len(spy.tags[i]) != 1 || spy.tags[i][0] != wantTags[0] { + t.Fatalf("[%d] got tags %v want [%s]", i, spy.tags[i], wantTags[0]) + } + } +} + +func TestEmitLagGauges_throttled(t *testing.T) { + spy := &gaugeSpy{} + EmitLagGauges(spy, 4.0, 3.0, true) + + if len(spy.names) != 2 { + t.Fatalf("got %d gauges, want 2", len(spy.names)) + } + for i := range spy.names { + if len(spy.tags[i]) != 1 || spy.tags[i][0] != "throttled:true" { + t.Fatalf("[%d] got tags %v want [throttled:true]", i, spy.tags[i]) + } + } +} + +func TestEmitLagGauges_nilSafe(t *testing.T) { + EmitLagGauges(nil, 1, 2, false) +} + +func TestEmitGoRuntimeGauges(t *testing.T) { + spy := &gaugeSpy{} + m := &runtime.MemStats{ + Alloc: 100, + Sys: 200, + HeapInuse: 300, + NumGC: 7, + PauseTotalNs: 42, + } + EmitGoRuntimeGauges(spy, m, 123) + + wantNames := []string{ + "go_runtime.alloc_bytes", + "go_runtime.sys_bytes", + "go_runtime.heap_inuse_bytes", + "go_runtime.num_gc", + "go_runtime.gc_pause_total_ns", + "go_runtime.goroutines", + } + wantVals := []float64{100, 200, 300, 7, 42, 123} + + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + } +} + +func TestEmitGoRuntimeGauges_nilSafe(t *testing.T) { + EmitGoRuntimeGauges(nil, &runtime.MemStats{}, 1) + EmitGoRuntimeGauges(&gaugeSpy{}, nil, 1) +} + +func TestStartGoRuntimeReporter_stopsOnCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + c := &Client{} // sd nil, so the reporter should not start. + StartGoRuntimeReporter(ctx, c, time.Millisecond) + cancel() + time.Sleep(20 * time.Millisecond) +} + +type throttleSpy struct { + gaugeNames []string + gaugeValues []float64 + histogramNames []string + histogramValues []float64 + countNames []string + countValues []int64 + tags [][]string +} + +func (s *throttleSpy) Gauge(name string, value float64, tags ...string) { + s.gaugeNames = append(s.gaugeNames, name) + s.gaugeValues = append(s.gaugeValues, value) + s.tags = append(s.tags, append([]string(nil), tags...)) +} + +func (s *throttleSpy) Count(name string, value int64, tags ...string) { + s.countNames = append(s.countNames, name) + s.countValues = append(s.countValues, value) + s.tags = append(s.tags, append([]string(nil), tags...)) +} + +func (s *throttleSpy) Histogram(name string, value float64, tags ...string) { + s.histogramNames = append(s.histogramNames, name) + s.histogramValues = append(s.histogramValues, value) + s.tags = append(s.tags, append([]string(nil), tags...)) +} + +func TestEmitThrottleActiveGauge(t *testing.T) { + spy := &throttleSpy{} + + EmitThrottleActiveGauge(spy, true) + EmitThrottleActiveGauge(spy, false) + + if len(spy.gaugeNames) != 2 { + t.Fatalf("got %d gauges, want 2", len(spy.gaugeNames)) + } + if spy.gaugeNames[0] != "throttle.active" || spy.gaugeValues[0] != 1 { + t.Fatalf("got %s=%v, want throttle.active=1", spy.gaugeNames[0], spy.gaugeValues[0]) + } + if spy.gaugeNames[1] != "throttle.active" || spy.gaugeValues[1] != 0 { + t.Fatalf("got %s=%v, want throttle.active=0", spy.gaugeNames[1], spy.gaugeValues[1]) + } +} + +func TestEmitThrottleInterval(t *testing.T) { + spy := &throttleSpy{} + + EmitThrottleInterval(spy, 1500*time.Millisecond, "commanded by user") + + if len(spy.histogramNames) != 1 { + t.Fatalf("got %d histograms, want 1", len(spy.histogramNames)) + } + if spy.histogramNames[0] != "throttle.duration_milliseconds" || spy.histogramValues[0] != 1500 { + t.Fatalf("got %s=%v, want throttle.duration_milliseconds=1500", spy.histogramNames[0], spy.histogramValues[0]) + } + if len(spy.countNames) != 1 { + t.Fatalf("got %d counts, want 1", len(spy.countNames)) + } + if spy.countNames[0] != "throttle.events_total" || spy.countValues[0] != 1 { + t.Fatalf("got %s=%v, want throttle.events_total=1", spy.countNames[0], spy.countValues[0]) + } + if len(spy.tags) != 2 || len(spy.tags[0]) != 1 || spy.tags[0][0] != "reason:commanded by user" || + len(spy.tags[1]) != 1 || spy.tags[1][0] != "reason:commanded by user" { + t.Fatalf("got tags %v, want [reason:commanded by user]", spy.tags) + } +} + +func TestEmitThrottleIntervalNilSafe(t *testing.T) { + EmitThrottleActiveGauge(nil, true) + EmitThrottleInterval(nil, time.Second, "test") + EmitThrottleInterval(&gaugeSpy{}, time.Second, "test") +} + +type cutOverSpy struct { + histogramNames []string + histogramValues []float64 + histogramTags [][]string + countNames []string + countValues []int64 + countTags [][]string +} + +func (s *cutOverSpy) Gauge(_ string, _ float64, _ ...string) {} + +func (s *cutOverSpy) Histogram(name string, value float64, tags ...string) { + s.histogramNames = append(s.histogramNames, name) + s.histogramValues = append(s.histogramValues, value) + s.histogramTags = append(s.histogramTags, tags) +} + +func (s *cutOverSpy) Count(name string, value int64, tags ...string) { + s.countNames = append(s.countNames, name) + s.countValues = append(s.countValues, value) + s.countTags = append(s.countTags, tags) +} + +func TestRecordCutOverMetrics(t *testing.T) { + spy := &cutOverSpy{} + + RecordCutOverPhase(spy, CutOverPhaseMagicLock, 1500*time.Millisecond, nil) + RecordCutOverAttempt(spy, CutOverOutcomeSuccess) + RecordCutOverTotal(spy, 2*time.Second, errors.New("boom")) + + if len(spy.histogramNames) != 2 { + t.Fatalf("got %d histograms, want 2", len(spy.histogramNames)) + } + if spy.histogramNames[0] != "cut_over.phase_duration_milliseconds" || spy.histogramValues[0] != 1500 { + t.Fatalf("got first histogram %s=%v", spy.histogramNames[0], spy.histogramValues[0]) + } + if !slices.Equal(spy.histogramTags[0], []string{"phase:magic_lock", "outcome:success"}) { + t.Fatalf("got phase tags %#v", spy.histogramTags[0]) + } + if spy.histogramNames[1] != "cut_over.total_duration_milliseconds" || spy.histogramValues[1] != 2000 { + t.Fatalf("got second histogram %s=%v", spy.histogramNames[1], spy.histogramValues[1]) + } + if !slices.Equal(spy.histogramTags[1], []string{"outcome:abort"}) { + t.Fatalf("got total tags %#v", spy.histogramTags[1]) + } + if len(spy.countNames) != 1 || spy.countNames[0] != "cut_over.attempts_total" || spy.countValues[0] != 1 { + t.Fatalf("got counts %#v values %#v", spy.countNames, spy.countValues) + } + if !slices.Equal(spy.countTags[0], []string{"outcome:success"}) { + t.Fatalf("got count tags %#v", spy.countTags[0]) + } +} + +func TestRecordCutOverMetricsNilSafe(t *testing.T) { + RecordCutOverPhase(nil, CutOverPhaseMagicLock, time.Second, nil) + RecordCutOverAttempt(nil, CutOverOutcomeSuccess) + RecordCutOverTotal(nil, time.Second, nil) +} + +type histogramSpy struct { + names []string + values []float64 + tags [][]string +} + +func (h *histogramSpy) Gauge(_ string, _ float64, _ ...string) {} + +func (h *histogramSpy) Count(_ string, _ int64, _ ...string) {} + +func (h *histogramSpy) Histogram(name string, value float64, tags ...string) { + h.names = append(h.names, name) + h.values = append(h.values, value) + h.tags = append(h.tags, tags) +} + +func TestRecordQueryDuration(t *testing.T) { + spy := &histogramSpy{} + + RecordQueryDuration(spy, "source", "row_count", 1500*time.Millisecond, nil) + RecordQueryDuration(spy, "target", "binlog_apply", 2*time.Second, errors.New("boom")) + + if len(spy.names) != 2 { + t.Fatalf("got %d histograms, want 2", len(spy.names)) + } + if spy.names[0] != "query.duration_milliseconds" || spy.values[0] != 1500 { + t.Fatalf("got %s=%v, want query.duration_milliseconds=1500", spy.names[0], spy.values[0]) + } + if !slices.Equal(spy.tags[0], []string{"side:source", "kind:row_count", "outcome:ok"}) { + t.Fatalf("got tags %#v", spy.tags[0]) + } + if spy.values[1] != 2000 || !slices.Equal(spy.tags[1], []string{"side:target", "kind:binlog_apply", "outcome:error"}) { + t.Fatalf("got second metric value=%v tags=%#v", spy.values[1], spy.tags[1]) + } +} + +func TestRecordQueryDurationNilSafe(t *testing.T) { + RecordQueryDuration(nil, "source", "row_count", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "", "row_count", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "source", "", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "source", "row_count", -time.Second, nil) +} + +type sleepSpy struct { + histogramNames []string + histogramValues []float64 + histogramTags [][]string + countNames []string + countValues []int64 + countTags [][]string +} + +func (s *sleepSpy) Gauge(_ string, _ float64, _ ...string) {} + +func (s *sleepSpy) Histogram(name string, value float64, tags ...string) { + s.histogramNames = append(s.histogramNames, name) + s.histogramValues = append(s.histogramValues, value) + s.histogramTags = append(s.histogramTags, tags) +} + +func (s *sleepSpy) Count(name string, value int64, tags ...string) { + s.countNames = append(s.countNames, name) + s.countValues = append(s.countValues, value) + s.countTags = append(s.countTags, tags) +} + +func TestRecordSleep(t *testing.T) { + spy := &sleepSpy{} + + RecordSleep(spy, "retry_backoff", 2*time.Second) + + if len(spy.histogramNames) != 1 { + t.Fatalf("got %d histograms, want 1", len(spy.histogramNames)) + } + if spy.histogramNames[0] != "sleep.duration_milliseconds" || spy.histogramValues[0] != 2000 { + t.Fatalf("got histogram %s=%v, want sleep.duration_milliseconds=2000", spy.histogramNames[0], spy.histogramValues[0]) + } + if !slices.Equal(spy.histogramTags[0], []string{"stage:retry_backoff"}) { + t.Fatalf("got histogram tags %#v", spy.histogramTags[0]) + } + if len(spy.countNames) != 1 { + t.Fatalf("got %d counts, want 1", len(spy.countNames)) + } + if spy.countNames[0] != "sleep.total_milliseconds" || spy.countValues[0] != 2000 { + t.Fatalf("got count %s=%v, want sleep.total_milliseconds=2000", spy.countNames[0], spy.countValues[0]) + } + if !slices.Equal(spy.countTags[0], []string{"stage:retry_backoff"}) { + t.Fatalf("got count tags %#v", spy.countTags[0]) + } +} + +func TestRecordSleepSubSecond(t *testing.T) { + spy := &sleepSpy{} + + RecordSleep(spy, "replica_wait", 500*time.Millisecond) + + if spy.histogramNames[0] != "sleep.duration_milliseconds" || spy.histogramValues[0] != 500 { + t.Fatalf("got histogram %s=%v, want sleep.duration_milliseconds=500", spy.histogramNames[0], spy.histogramValues[0]) + } + if spy.countNames[0] != "sleep.total_milliseconds" || spy.countValues[0] != 500 { + t.Fatalf("got count %s=%v, want sleep.total_milliseconds=500", spy.countNames[0], spy.countValues[0]) + } +} + +func TestRecordSleepNilSafe(t *testing.T) { + RecordSleep(nil, "retry_backoff", time.Second) + RecordSleep(&sleepSpy{}, "", time.Second) + RecordSleep(&sleepSpy{}, "retry_backoff", -time.Second) +} diff --git a/go/metrics/go_runtime.go b/go/metrics/go_runtime.go deleted file mode 100644 index 24ae2c6b5..000000000 --- a/go/metrics/go_runtime.go +++ /dev/null @@ -1,61 +0,0 @@ -/* - Copyright 2022 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package metrics - -import ( - "context" - "runtime" - "time" -) - -// MemStatsGaugeEmitter is implemented by *Client; used for tests without UDP. -type MemStatsGaugeEmitter interface { - Gauge(name string, value float64, tags ...string) -} - -// EmitGoRuntimeGauges emits gh_ost.go_runtime.* gauges (namespace is applied by the client). -// m and numGoroutine are typically from runtime.ReadMemStats and runtime.NumGoroutine. -func EmitGoRuntimeGauges(emit MemStatsGaugeEmitter, m *runtime.MemStats, numGoroutine int) { - if emit == nil || m == nil { - return - } - emit.Gauge("go_runtime.alloc_bytes", float64(m.Alloc)) - emit.Gauge("go_runtime.sys_bytes", float64(m.Sys)) - emit.Gauge("go_runtime.heap_inuse_bytes", float64(m.HeapInuse)) - emit.Gauge("go_runtime.num_gc", float64(m.NumGC)) - emit.Gauge("go_runtime.gc_pause_total_ns", float64(m.PauseTotalNs)) - emit.Gauge("go_runtime.goroutines", float64(numGoroutine)) -} - -// StartGoRuntimeReporter periodically samples runtime memory and goroutines and emits gauges -// until ctx is cancelled. It is a no-op when interval <= 0, client is nil, or StatsD is disabled -// (noop client). -func StartGoRuntimeReporter(ctx context.Context, client *Client, interval time.Duration) { - if ctx == nil || client == nil || interval <= 0 || client.sd == nil { - return - } - - emit := func() { - var m runtime.MemStats - runtime.ReadMemStats(&m) - EmitGoRuntimeGauges(client, &m, runtime.NumGoroutine()) - } - - go func() { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - emit() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - emit() - } - } - }() -} diff --git a/go/metrics/go_runtime_test.go b/go/metrics/go_runtime_test.go deleted file mode 100644 index 24811206b..000000000 --- a/go/metrics/go_runtime_test.go +++ /dev/null @@ -1,67 +0,0 @@ -/* - Copyright 2022 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package metrics - -import ( - "context" - "runtime" - "testing" - "time" -) - -type gaugeSpy struct { - names []string - values []float64 -} - -func (g *gaugeSpy) Gauge(name string, value float64, _ ...string) { - g.names = append(g.names, name) - g.values = append(g.values, value) -} - -func TestEmitGoRuntimeGauges(t *testing.T) { - spy := &gaugeSpy{} - m := &runtime.MemStats{ - Alloc: 100, - Sys: 200, - HeapInuse: 300, - NumGC: 7, - PauseTotalNs: 42, - } - EmitGoRuntimeGauges(spy, m, 123) - - wantNames := []string{ - "go_runtime.alloc_bytes", - "go_runtime.sys_bytes", - "go_runtime.heap_inuse_bytes", - "go_runtime.num_gc", - "go_runtime.gc_pause_total_ns", - "go_runtime.goroutines", - } - wantVals := []float64{100, 200, 300, 7, 42, 123} - - if len(spy.names) != len(wantNames) { - t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) - } - for i := range wantNames { - if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { - t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) - } - } -} - -func TestEmitGoRuntimeGauges_nilSafe(t *testing.T) { - EmitGoRuntimeGauges(nil, &runtime.MemStats{}, 1) - EmitGoRuntimeGauges(&gaugeSpy{}, nil, 1) -} - -func TestStartGoRuntimeReporter_stopsOnCancel(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - c := &Client{} // sd nil — should not start - StartGoRuntimeReporter(ctx, c, time.Millisecond) - cancel() - time.Sleep(20 * time.Millisecond) -}