diff --git a/internal/app/custom_errors/extractor.error.go b/internal/app/custom_errors/extractor.error.go index b282be0..1ab5888 100644 --- a/internal/app/custom_errors/extractor.error.go +++ b/internal/app/custom_errors/extractor.error.go @@ -1,13 +1,7 @@ package custom_errors import ( - "context" - "fmt" - "sync" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/google/uuid" ) type ExtractorError struct { @@ -20,100 +14,3 @@ type ExtractorError struct { func (e *ExtractorError) Error() string { return e.Msg } - -func ExtractorErrorHandler( - ctx context.Context, - retryConfig config.RetryConfig, - maxPartitionErrors int, - chErrorsIn <-chan ExtractorError, - chPartitionsOut chan<- models.Partition, - chJobErrorsOut chan<- JobError, - wgActivePartitions *sync.WaitGroup, -) { - definitiveErrors := 0 - - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - - case err, ok := <-chErrorsIn: - if !ok { - return - } - - if err.Partition.RetryCounter >= retryConfig.Attempts { - wgActivePartitions.Done() - definitiveErrors++ - jobError := JobError{ - ShouldCancelJob: false, - Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts), - Prev: &err, - } - - select { - case chJobErrorsOut <- jobError: - case <-ctx.Done(): - return - } - - if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors { - fatalError := JobError{ - ShouldCancelJob: true, - Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors), - Prev: &err, - } - - select { - case chJobErrorsOut <- fatalError: - case <-ctx.Done(): - return - } - } - - continue - } else { - jobError := JobError{ - ShouldCancelJob: false, - Msg: fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter), - Prev: &err, - } - - select { - case chJobErrorsOut <- jobError: - case <-ctx.Done(): - return - } - } - - newPartition := err.Partition - newPartition.RetryCounter++ - - delay := computeBackoffDelay( - newPartition.RetryCounter, - retryConfig.BaseDelayMs, - retryConfig.MaxDelayMs, - retryConfig.MaxJitterMs, - ) - - if err.HasLastId { - newPartition.ParentId = err.Partition.Id - newPartition.Id = uuid.New() - newPartition.Range.Min = err.LastId - newPartition.Range.IsMinInclusive = false - } - - requeueWithBackoff(ctx, delay, func() { - select { - case chPartitionsOut <- newPartition: - case <-ctx.Done(): - return - } - }) - } - } -} diff --git a/internal/app/etl/extractors/main.go b/internal/app/etl/extractors/main.go new file mode 100644 index 0000000..e2307ba --- /dev/null +++ b/internal/app/etl/extractors/main.go @@ -0,0 +1,64 @@ +package extractors + +import ( + "context" + "fmt" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/google/uuid" +) + +func errorFromLastPartitionRow( + lastRow models.UnknownRowValues, + indexPrimaryKey int, + partition models.Partition, + previousError error, +) error { + lastIdRawValue := lastRow[indexPrimaryKey] + + lastId, ok := convert.ToInt64(lastIdRawValue) + if !ok { + currentPartition := partition + currentPartition.RetryCounter = 3 + return &custom_errors.ExtractorError{ + Partition: currentPartition, + HasLastId: true, + Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), + } + + } + + return &custom_errors.ExtractorError{ + Partition: partition, + HasLastId: true, + LastId: lastId, + Msg: previousError.Error(), + } +} + +func sendBatch(ctx context.Context, chBatchesOut chan<- models.Batch, batch models.Batch) error { + select { + case chBatchesOut <- batch: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func flush( + ctx context.Context, + partition *models.Partition, + batchSize int, + batchRows []models.UnknownRowValues, + chBatchesOut chan<- models.Batch, +) error { + if len(batchRows) == 0 { + return nil + } + + batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows} + batchRows = make([]models.UnknownRowValues, 0, batchSize) + return sendBatch(ctx, chBatchesOut, batch) +} diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index 1157459..71c227f 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -12,7 +12,6 @@ import ( "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" @@ -72,34 +71,6 @@ func buildExtractQueryMssql( return sbQuery.String() } -func errorFromLastRow( - lastRow models.UnknownRowValues, - indexPrimaryKey int, - partition models.Partition, - previousError error, -) *custom_errors.ExtractorError { - lastIdRawValue := lastRow[indexPrimaryKey] - - lastId, ok := convert.ToInt64(lastIdRawValue) - if !ok { - currentPartition := partition - currentPartition.RetryCounter = 3 - return &custom_errors.ExtractorError{ - Partition: currentPartition, - HasLastId: true, - Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), - } - - } - - return &custom_errors.ExtractorError{ - Partition: partition, - HasLastId: true, - LastId: lastId, - Msg: previousError.Error(), - } -} - func (mssqlEx *MssqlExtractor) Extract( ctx context.Context, tableInfo config.SourceTableInfo, @@ -113,81 +84,52 @@ func (mssqlEx *MssqlExtractor) Extract( var queryArgs []any if partition.HasRange { - queryArgs = append(queryArgs, - sql.Named("min", partition.Range.Min), - sql.Named("max", partition.Range.Max), - ) + queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min), sql.Named("max", partition.Range.Max)) } - var rowsRead int64 = 0 rows, err := mssqlEx.db.Query(ctx, query, queryArgs...) if err != nil { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + return 0, err } defer rows.Close() batchRows := make([]models.UnknownRowValues, 0, batchSize) + var rowsRead int64 = 0 + + rowValues := make([]any, len(columns)) + scanArgs := make([]any, len(columns)) + for i := range rowValues { + scanArgs[i] = &rowValues[i] + } for rows.Next() { - rowValues := make([]any, len(columns)) - scanArgs := make([]any, len(columns)) - - for i := range rowValues { - scanArgs[i] = &rowValues[i] - } - if err := rows.Scan(scanArgs...); err != nil { if len(batchRows) == 0 { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + return rowsRead, err + } + + if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil { + return rowsRead, err } lastRow := batchRows[len(batchRows)-1] - - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() - } - - return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) + return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err) } rowsRead++ batchRows = append(batchRows, rowValues) if len(batchRows) >= batchSize { - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() + if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil { + return rowsRead, err } - - batchRows = make([]models.UnknownRowValues, 0, batchSize) - } - - } - - if err := rows.Err(); err != nil { - if errors.Is(err, ctx.Err()) { - return rowsRead, ctx.Err() - } - - if len(batchRows) > 0 { - lastRow := batchRows[len(batchRows)-1] - return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) - } - - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} - } - - if len(batchRows) > 0 { - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() } } - return rowsRead, nil + if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil { + return rowsRead, err + } + + return rowsRead, rows.Err() } func (mssqlEx *MssqlExtractor) ExtractWithRetries( @@ -252,7 +194,7 @@ func (mssqlEx *MssqlExtractor) Consume( batchSize int, chPartitionsIn <-chan models.Partition, chBatchesOut chan<- models.Batch, - chJobErrorsOut chan<- custom_errors.JobError, + chErrorsOut chan<- custom_errors.JobError, wgActivePartitions *sync.WaitGroup, rowsRead *int64, ) { @@ -264,12 +206,11 @@ func (mssqlEx *MssqlExtractor) Consume( select { case <-ctx.Done(): return - case chJobErrorsOut <- custom_errors.JobError{ + case chErrorsOut <- custom_errors.JobError{ ShouldCancelJob: true, Msg: "Primary key not found in provided columns", }: } - return } @@ -307,13 +248,13 @@ func (mssqlEx *MssqlExtractor) Consume( select { case <-ctx.Done(): return - case chJobErrorsOut <- *jobError: + case chErrorsOut <- *jobError: } } else { select { case <-ctx.Done(): return - case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: + case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: } }