package extractors

import (
	"context"
	"fmt"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	// "github.com/sirupsen/logrus"
)

// errorFromLastPartitionRow converts previousError into a
// *custom_errors.ExtractorError annotated with the partition and with the
// primary-key value found at lastRow[indexPrimaryKey].
//
// When the primary-key value converts to int64, the returned error has
// HasLastId set, LastId holding the converted value and Msg holding the text of
// previousError. When the conversion fails, the returned error carries a copy
// of the partition whose RetryCounter is forced to 3, and Msg explains that the
// id could not be cast.
//
// previousError must be non-nil and indexPrimaryKey must be a valid index into
// lastRow; neither is checked here.
func errorFromLastPartitionRow(
	lastRow models.UnknownRowValues,
	indexPrimaryKey int,
	partition models.Partition,
	previousError error,
) error {
	lastID, ok := convert.ToInt64(lastRow[indexPrimaryKey])
	if ok {
		return &custom_errors.ExtractorError{
			Partition: partition,
			HasLastId: true,
			LastId:    lastID,
			Msg:       previousError.Error(),
		}
	}

	// Presumably 3 is the retry ceiling, so the partition is not retried
	// without a usable id -- TODO confirm against the retry logic.
	exhausted := partition
	exhausted.RetryCounter = 3

	// NOTE(review): HasLastId is true here although LastId stays at its zero
	// value; confirm that consumers of ExtractorError expect this.
	return &custom_errors.ExtractorError{
		Partition: exhausted,
		HasLastId: true,
		Msg:       fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
	}
}

// ProcessPartition queries the rows of one partition of the source table and
// forwards them in groups of at most batchSize rows through flush, passing
// chBatchesOut along.
//
// The partition bounds are applied to the query only when partition.HasRange
// is set and the corresponding bound is greater than zero.
//
// It returns the number of rows scanned successfully. If scanning or iterating
// fails after at least one row was read, the rows still pending are flushed
// first and the error is built by errorFromLastPartitionRow from the last row
// read, so it carries that row's primary key. If no row was read, the
// underlying error is returned unchanged. Errors from the query itself or from
// flush are also returned unchanged.
func (ex *GenericExtractor) ProcessPartition(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	partition models.Partition,
	indexPrimaryKey int,
	chBatchesOut chan<- models.Batch,
	fromJsonColumns []config.FromJsonItem,
) (int64, error) {
	query := dbwrapper.ExtractionQuery{
		Schema:     tableInfo.Schema,
		Table:      tableInfo.Table,
		PrimaryKey: tableInfo.PrimaryKey,
		Columns:    columns,
		LowerLimit: dbwrapper.ExtractorQueryLimit{
			IsValid:     partition.HasRange && partition.Range.Min > 0,
			IsInclusive: partition.Range.IsMinInclusive,
			Value:       partition.Range.Min,
		},
		UpperLimit: dbwrapper.ExtractorQueryLimit{
			IsValid:     partition.HasRange && partition.Range.Max > 0,
			IsInclusive: partition.Range.IsMaxInclusive,
			Value:       partition.Range.Max,
		},
		FromJsonColumns: fromJsonColumns,
	}

	// logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)
	rows, err := ex.db.QueryFromObject(ctx, query)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	batchRows := make([]models.UnknownRowValues, 0, batchSize)
	var rowsRead int64
	var lastRow models.UnknownRowValues

	// positionedError attaches the last row read to cause. Before any row has
	// been read there is no position to report, so cause is returned as is.
	positionedError := func(cause error) error {
		if lastRow == nil {
			return cause
		}
		return errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, cause)
	}

	// The destination-pointer slice is reused across rows; only the values
	// slice needs to be fresh because it is retained in the batch.
	scanArgs := make([]any, len(columns))

	for rows.Next() {
		rowValues := make([]any, len(columns))
		for i := range rowValues {
			scanArgs[i] = &rowValues[i]
		}

		if scanErr := rows.Scan(scanArgs...); scanErr != nil {
			// Hand over the rows read so far before reporting the failure.
			if len(batchRows) > 0 {
				if err := flush(ctx, batchSize, batchRows, chBatchesOut); err != nil {
					return rowsRead, err
				}
			}
			// Use the function-level lastRow rather than the pending batch:
			// the batch is empty right after a full batch was flushed, yet
			// the position of the last row read is still known.
			return rowsRead, positionedError(scanErr)
		}

		rowsRead++
		lastRow = rowValues
		batchRows = append(batchRows, rowValues)

		if len(batchRows) >= batchSize {
			// logrus.Debugf("Batch size reached, flushing batch with %v rows (rowsRead=%v)", len(batchRows), rowsRead)
			if err := flush(ctx, batchSize, batchRows, chBatchesOut); err != nil {
				// logrus.Warnf("Error flushing rows: %v", err)
				return rowsRead, err
			}
			// Start a new backing array; the flushed one is no longer ours.
			batchRows = make([]models.UnknownRowValues, 0, batchSize)
		}
	}

	// Flush whatever is left, even when iteration stopped because of an error.
	if err := flush(ctx, batchSize, batchRows, chBatchesOut); err != nil {
		return rowsRead, err
	}

	if err := rows.Err(); err != nil {
		return rowsRead, positionedError(err)
	}

	return rowsRead, nil
}