diff --git a/internal/app/convert/main.go b/internal/app/convert/main.go
new file mode 100644
index 0000000..294d0dc
--- /dev/null
+++ b/internal/app/convert/main.go
@@ -0,0 +1,20 @@
+package convert
+
+// ToInt64 converts v to int64 when it holds any signed integer type.
+// It reports false for any other dynamic type (including unsigned ints).
+func ToInt64(v any) (int64, bool) {
+	switch t := v.(type) {
+	case int:
+		return int64(t), true
+	case int8:
+		return int64(t), true
+	case int16:
+		return int64(t), true
+	case int32:
+		return int64(t), true
+	case int64:
+		return t, true // already int64; no conversion needed
+	default:
+		return 0, false
+	}
+}
diff --git a/internal/app/custom_errors/extractor.error.go b/internal/app/custom_errors/extractor.error.go
new file mode 100644
index 0000000..5aafa24
--- /dev/null
+++ b/internal/app/custom_errors/extractor.error.go
@@ -0,0 +1,96 @@
+package custom_errors
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
+	"github.com/google/uuid"
+)
+
+// ExtractorError reports a failed extraction attempt for one batch.
+// When HasLastId is true, LastId is the primary key of the last row that
+// was delivered downstream, so a retry can resume after it.
+type ExtractorError struct {
+	Batch     models.Batch
+	LastId    int64
+	HasLastId bool
+	Msg       string
+}
+
+func (e *ExtractorError) Error() string {
+	return e.Msg
+}
+
+// maxRetryAttempts caps how many times a batch is re-queued before its
+// failure is escalated as a JobError.
+const maxRetryAttempts = 3
+
+// ExtractorErrorHandler consumes extractor errors and either re-queues a
+// retry batch on chBatchesOut or, once maxRetryAttempts is reached,
+// reports a non-fatal JobError and releases the batch's WaitGroup slot.
+// It runs until ctx is canceled or chErrorsIn is closed.
+func ExtractorErrorHandler(
+	ctx context.Context,
+	chErrorsIn <-chan ExtractorError,
+	chBatchesOut chan<- models.Batch,
+	chJobErrorsOut chan<- JobError,
+	wgActiveBatches *sync.WaitGroup,
+) {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+
+		case err, ok := <-chErrorsIn:
+			if !ok {
+				return
+			}
+
+			if err.Batch.RetryCounter >= maxRetryAttempts {
+				// Give up: surface a non-fatal job error and release
+				// the batch's slot so the job can still finish.
+				jobError := JobError{
+					ShouldCancelJob: false,
+					Msg:             fmt.Sprintf("batch %v reached max retries (%d)", err.Batch.Id, maxRetryAttempts),
+					Prev:            &err,
+				}
+
+				select {
+				case chJobErrorsOut <- jobError:
+				case <-ctx.Done():
+					return
+				}
+
+				wgActiveBatches.Done()
+				continue
+			}
+
+			newBatch := err.Batch
+			newBatch.RetryCounter++
+
+			if err.HasLastId {
+				// Resume after the last delivered row instead of
+				// re-reading the whole batch.
+				newBatch.ParentId = err.Batch.Id
+				newBatch.Id = uuid.New()
+				newBatch.LowerLimit = err.LastId
+				newBatch.IsLowerLimitInclusive = false
+				// NOTE(review): ShouldUseRange is kept from the failed
+				// batch; if it was a full scan the new LowerLimit is
+				// ignored by the extractor — confirm this is intended.
+			}
+
+			select {
+			case chBatchesOut <- newBatch:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}
+}
diff --git a/internal/app/custom_errors/job.error.go b/internal/app/custom_errors/job.error.go
new file mode 100644
index 0000000..ca359af
--- /dev/null
+++ b/internal/app/custom_errors/job.error.go
@@ -0,0 +1,58 @@
+package custom_errors
+
+import (
+	"context"
+	"fmt"
+
+	log "github.com/sirupsen/logrus"
+)
+
+// JobError is a job-level error. ShouldCancelJob marks it fatal: the
+// handler returns it so the whole job stops.
+type JobError struct {
+	ShouldCancelJob bool
+	Msg             string
+	Prev            error
+}
+
+func (e *JobError) Error() string {
+	if e.Prev != nil {
+		return fmt.Sprintf("%s: %v", e.Msg, e.Prev)
+	}
+
+	return e.Msg
+}
+
+// Unwrap exposes the wrapped cause so errors.Is / errors.As can inspect
+// the chain behind a JobError.
+func (e *JobError) Unwrap() error {
+	return e.Prev
+}
+
+// JobErrorHandler logs every error received on chErrorsIn. It returns nil
+// on context cancellation or channel close, and returns the error itself
+// when it is marked fatal (ShouldCancelJob).
+func JobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
+	for {
+		if ctx.Err() != nil {
+			return nil
+		}
+
+		select {
+		case <-ctx.Done():
+			return nil
+
+		case err, ok := <-chErrorsIn:
+			if !ok {
+				return nil
+			}
+
+			// Log once for fatal and non-fatal errors alike.
+			log.Error(err.Msg, " - ", err.Prev)
+
+			if err.ShouldCancelJob {
+				return &err
+			}
+		}
+	}
+}
diff --git a/internal/app/etl/extractor/main.go b/internal/app/etl/extractor/main.go
new file mode 100644
index 0000000..0f85cf7
--- /dev/null
+++ b/internal/app/etl/extractor/main.go
@@ -0,0 +1,43 @@
+package extractor
+
+import (
+	"context"
+	"sync"
+
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
+)
+
+// Extractor reads rows from a source table and emits them downstream as
+// chunks.
+type Extractor interface {
+	// ProcessBatch extracts one batch, sending chunks of up to chunkSize
+	// rows on chChunksOut and adding the row count to *rowsRead atomically.
+	ProcessBatch(
+		ctx context.Context,
+		tableInfo config.SourceTableInfo,
+		columns []models.ColumnType,
+		chunkSize int,
+		batch models.Batch,
+		indexPrimaryKey int,
+		chChunksOut chan<- models.Chunk,
+		rowsRead *int64,
+	) error
+
+	// Exec is the worker loop: it consumes batches from chBatchesIn until
+	// ctx is canceled or the channel closes, reporting retryable failures
+	// on chErrorsOut and job-level failures on chJobErrorsOut.
+	Exec(
+		ctx context.Context,
+		tableInfo config.SourceTableInfo,
+		columns []models.ColumnType,
+		chunkSize int,
+		chBatchesIn <-chan models.Batch,
+		chChunksOut chan<- models.Chunk,
+		chErrorsOut chan<- custom_errors.ExtractorError,
+		chJobErrorsOut chan<- custom_errors.JobError,
+		wgActiveBatches *sync.WaitGroup,
+		rowsRead *int64,
+	)
+}
diff --git a/internal/app/etl/extractor/mssql.go b/internal/app/etl/extractor/mssql.go
new file mode 100644
index 0000000..a2cdc50
--- /dev/null
+++ b/internal/app/etl/extractor/mssql.go
@@ -0,0 +1,315 @@
+package extractor
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"fmt"
+	"slices"
+	"strings"
+	"sync"
+	"sync/atomic"
+
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
+	"github.com/google/uuid"
+)
+
+// MssqlExtractor extracts table rows from a SQL Server source.
+type MssqlExtractor struct {
+	db *sql.DB
+}
+
+// NewMssqlExtractor returns an extractor backed by the given connection pool.
+func NewMssqlExtractor(db *sql.DB) *MssqlExtractor {
+	return &MssqlExtractor{db: db}
+}
+
+// buildExtractQueryMssql builds the batch SELECT. GEOMETRY columns are read
+// as WKB via STAsBinary(). When includeRange is set the query is bounded on
+// the primary key with @min/@max named parameters; isMinInclusive picks >=
+// vs > for the lower bound (exclusive when resuming a partially delivered
+// batch). Identifiers come from config, not untrusted input.
+func buildExtractQueryMssql(
+	tableInfo config.SourceTableInfo,
+	columns []models.ColumnType,
+	includeRange bool,
+	isMinInclusive bool,
+) string {
+	var sbQuery strings.Builder
+
+	sbQuery.WriteString("SELECT ")
+
+	if len(columns) == 0 {
+		sbQuery.WriteString("*")
+	} else {
+		for i, col := range columns {
+			fmt.Fprintf(&sbQuery, "[%s]", col.Name())
+
+			if col.Type() == "GEOMETRY" {
+				fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
+			}
+
+			if i < len(columns)-1 {
+				sbQuery.WriteString(", ")
+			}
+		}
+	}
+
+	fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table)
+
+	if includeRange {
+		fmt.Fprintf(&sbQuery, " WHERE [%s]", tableInfo.PrimaryKey)
+		if isMinInclusive {
+			sbQuery.WriteString(" >=")
+		} else {
+			sbQuery.WriteString(" >")
+		}
+
+		fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", tableInfo.PrimaryKey)
+	}
+
+	fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey)
+
+	return sbQuery.String()
+}
+
+// extractorErrorFromLastRowMssql wraps previousError in an ExtractorError
+// carrying the primary key of the last row already sent downstream, so the
+// retry can resume after it instead of duplicating rows.
+func extractorErrorFromLastRowMssql(
+	lastRow models.UnknownRowValues,
+	indexPrimaryKey int,
+	batch *models.Batch,
+	previousError error,
+) *custom_errors.ExtractorError {
+	lastIdRawValue := lastRow[indexPrimaryKey]
+
+	lastId, ok := convert.ToInt64(lastIdRawValue)
+	if !ok {
+		// The primary key is not an integer type, so the batch cannot be
+		// resumed. Force the retry counter to the cap so the error handler
+		// escalates instead of retrying with a bogus lower bound.
+		currentBatch := *batch
+		currentBatch.RetryCounter = 3
+		return &custom_errors.ExtractorError{
+			Batch:     currentBatch,
+			HasLastId: false, // LastId is unknown here
+			Msg:       fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
+		}
+	}
+
+	return &custom_errors.ExtractorError{
+		Batch:     *batch,
+		HasLastId: true,
+		LastId:    lastId,
+		Msg:       previousError.Error(),
+	}
+}
+
+// ProcessBatch runs the batch query and streams the result set as chunks of
+// up to chunkSize rows on chChunksOut, atomically adding to *rowsRead after
+// every chunk sent. On a mid-stream failure it flushes the rows read so far
+// and returns an ExtractorError that lets the retry resume after the last
+// delivered row. A canceled context is not treated as a batch failure: it
+// returns nil (or ctx.Err() when the driver surfaces the cancellation).
+func (mssqlEx *MssqlExtractor) ProcessBatch(
+	ctx context.Context,
+	tableInfo config.SourceTableInfo,
+	columns []models.ColumnType,
+	chunkSize int,
+	batch models.Batch,
+	indexPrimaryKey int,
+	chChunksOut chan<- models.Chunk,
+	rowsRead *int64,
+) error {
+	query := buildExtractQueryMssql(tableInfo, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
+
+	var queryArgs []any
+	if batch.ShouldUseRange {
+		queryArgs = append(queryArgs,
+			sql.Named("min", batch.LowerLimit),
+			sql.Named("max", batch.UpperLimit),
+		)
+	}
+
+	rows, err := mssqlEx.db.QueryContext(ctx, query, queryArgs...)
+	if err != nil {
+		return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+	}
+	defer rows.Close()
+
+	rowsChunk := make([]models.UnknownRowValues, 0, chunkSize)
+
+	for rows.Next() {
+		values := make([]any, len(columns))
+		scanArgs := make([]any, len(columns))
+
+		for i := range values {
+			scanArgs[i] = &values[i]
+		}
+
+		// NOTE(review): scanning into any can alias driver-owned []byte
+		// buffers on some drivers — confirm the driver copies, or copy here.
+		if err := rows.Scan(scanArgs...); err != nil {
+			if len(rowsChunk) == 0 {
+				return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+			}
+
+			// Flush what we have so the retry can resume after the last
+			// row actually delivered downstream.
+			lastRow := rowsChunk[len(rowsChunk)-1]
+
+			select {
+			case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+			case <-ctx.Done():
+				return nil
+			}
+
+			atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+
+			return extractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
+		}
+
+		rowsChunk = append(rowsChunk, values)
+
+		if len(rowsChunk) >= chunkSize {
+			select {
+			case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+			case <-ctx.Done():
+				return nil
+			}
+
+			atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+			// Fresh slice: the sent chunk keeps its own backing array.
+			rowsChunk = make([]models.UnknownRowValues, 0, chunkSize)
+		}
+	}
+
+	if err := rows.Err(); err != nil {
+		if errors.Is(err, ctx.Err()) {
+			return ctx.Err()
+		}
+
+		if len(rowsChunk) == 0 {
+			return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+		}
+
+		// Flush the partial chunk first: the resume point must reference a
+		// row that was actually delivered downstream (previously the chunk
+		// was dropped here, losing its rows on retry).
+		lastRow := rowsChunk[len(rowsChunk)-1]
+
+		select {
+		case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+		case <-ctx.Done():
+			return nil
+		}
+
+		atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+
+		return extractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
+	}
+
+	if len(rowsChunk) > 0 {
+		select {
+		case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+		case <-ctx.Done():
+			return nil
+		}
+
+		atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+	}
+
+	return nil
+}
+
+// Exec is the SQL Server worker loop. It consumes batches from chBatchesIn
+// until ctx is canceled or the channel closes. Retryable batch failures are
+// forwarded to chErrorsOut (whose handler then owns the batch's WaitGroup
+// slot); non-retryable errors are reported on chJobErrorsOut and stop the
+// worker.
+func (mssqlEx *MssqlExtractor) Exec(
+	ctx context.Context,
+	tableInfo config.SourceTableInfo,
+	columns []models.ColumnType,
+	chunkSize int,
+	chBatchesIn <-chan models.Batch,
+	chChunksOut chan<- models.Chunk,
+	chErrorsOut chan<- custom_errors.ExtractorError,
+	chJobErrorsOut chan<- custom_errors.JobError,
+	wgActiveBatches *sync.WaitGroup,
+	rowsRead *int64,
+) {
+	indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
+		return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
+	})
+
+	if indexPrimaryKey == -1 {
+		select {
+		case <-ctx.Done():
+			return
+		case chJobErrorsOut <- custom_errors.JobError{
+			ShouldCancelJob: true,
+			Msg:             "Primary key not found in provided columns",
+		}:
+		}
+
+		return
+	}
+
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case batch, ok := <-chBatchesIn:
+			if !ok {
+				return
+			}
+
+			err := mssqlEx.ProcessBatch(
+				ctx,
+				tableInfo,
+				columns,
+				chunkSize,
+				batch,
+				indexPrimaryKey,
+				chChunksOut,
+				rowsRead,
+			)
+
+			if err != nil {
+				var exError *custom_errors.ExtractorError
+				if errors.As(err, &exError) {
+					// Hand the batch to the retry handler and keep
+					// serving further batches (previously the worker
+					// also emitted a duplicate JobError and exited,
+					// stranding any re-queued batches).
+					select {
+					case <-ctx.Done():
+						return
+					case chErrorsOut <- *exError:
+					}
+
+					continue
+				}
+
+				select {
+				case <-ctx.Done():
+					return
+				case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Prev: err}:
+				}
+
+				return
+			}
+
+			wgActiveBatches.Done()
+		}
+	}
+}
diff --git a/internal/app/etl/extractor/postgres.go b/internal/app/etl/extractor/postgres.go
new file mode 100644
index 0000000..86f0a2b
--- /dev/null
+++ b/internal/app/etl/extractor/postgres.go
@@ -0,0 +1,138 @@
+package extractor
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"sync"
+	"sync/atomic"
+
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
+	"github.com/google/uuid"
+	"github.com/jackc/pgx/v5/pgxpool"
+)
+
+// PostgresExtractor extracts table rows from a PostgreSQL source.
+type PostgresExtractor struct {
+	db *pgxpool.Pool
+}
+
+// NewPostgresExtractor returns an extractor backed by the given pool.
+func NewPostgresExtractor(pool *pgxpool.Pool) *PostgresExtractor {
+	return &PostgresExtractor{db: pool}
+}
+
+// buildExtractQueryPostgres builds a full-table SELECT ordered by the
+// primary key. GEOMETRY columns are read as EWKB via ST_AsEWKB().
+// Identifiers come from config, not untrusted input.
+func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {
+	var sbColumns strings.Builder
+
+	if len(columns) == 0 {
+		sbColumns.WriteString("*")
+	} else {
+		for i, col := range columns {
+			if col.Type() == "GEOMETRY" {
+				sbColumns.WriteString(`ST_AsEWKB("`)
+				sbColumns.WriteString(col.Name())
+				sbColumns.WriteString(`") AS "`)
+				sbColumns.WriteString(col.Name())
+				sbColumns.WriteString(`"`)
+			} else {
+				sbColumns.WriteString(`"`)
+				sbColumns.WriteString(col.Name())
+				sbColumns.WriteString(`"`)
+			}
+
+			if i < len(columns)-1 {
+				sbColumns.WriteString(", ")
+			}
+		}
+	}
+
+	return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
+}
+
+// ProcessBatch streams the whole table (range batches are not supported yet)
+// as chunks of up to chunkSize rows on chChunksOut, atomically adding to
+// *rowsRead after each chunk sent. A canceled context is not an error.
+func (postgresEx *PostgresExtractor) ProcessBatch(
+	ctx context.Context,
+	tableInfo config.SourceTableInfo,
+	columns []models.ColumnType,
+	chunkSize int,
+	batch models.Batch,
+	indexPrimaryKey int,
+	chChunksOut chan<- models.Chunk,
+	rowsRead *int64,
+) error {
+	query := buildExtractQueryPostgres(tableInfo, columns)
+
+	if batch.ShouldUseRange {
+		return errors.New("Batch config not yet supported")
+	}
+
+	rows, err := postgresEx.db.Query(ctx, query)
+	if err != nil {
+		return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+	}
+	defer rows.Close()
+
+	rowsChunk := make([]models.UnknownRowValues, 0, chunkSize)
+
+	for rows.Next() {
+		values, err := rows.Values()
+		if err != nil {
+			// Keep the cause instead of discarding it.
+			return fmt.Errorf("reading row values from source: %w", err)
+		}
+
+		rowsChunk = append(rowsChunk, values)
+
+		if len(rowsChunk) >= chunkSize {
+			select {
+			case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+			case <-ctx.Done():
+				return nil
+			}
+
+			atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+			rowsChunk = make([]models.UnknownRowValues, 0, chunkSize)
+		}
+	}
+
+	if err := rows.Err(); err != nil {
+		// Keep the cause instead of discarding it.
+		return fmt.Errorf("reading rows from source: %w", err)
+	}
+
+	if len(rowsChunk) > 0 {
+		select {
+		case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+		case <-ctx.Done():
+			return nil
+		}
+
+		atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+	}
+
+	return nil
+}
+
+// Exec is not implemented for PostgreSQL sources yet.
+func (postgresEx *PostgresExtractor) Exec(
+	ctx context.Context,
+	tableInfo config.SourceTableInfo,
+	columns []models.ColumnType,
+	chunkSize int,
+	chBatchesIn <-chan models.Batch,
+	chChunksOut chan<- models.Chunk,
+	chErrorsOut chan<- custom_errors.ExtractorError,
+	chJobErrorsOut chan<- custom_errors.JobError,
+	wgActiveBatches *sync.WaitGroup,
+	rowsRead *int64,
+) {
+}
diff --git a/internal/app/models/colum-type.go b/internal/app/models/colum-type.go
new file mode 100644
index 0000000..b0b7bbd
--- /dev/null
+++ b/internal/app/models/colum-type.go
@@ -0,0 +1,54 @@
+package models
+
+// ColumnType describes one source column: its name, the source ("user") and
+// system type names, a cross-database unified type, and optional length /
+// precision metadata.
+type ColumnType struct {
+	name string
+
+	hasMaxLength      bool
+	hasPrecisionScale bool
+
+	userType    string
+	systemType  string
+	unifiedType string
+	nullable    bool
+	maxLength   int64
+	precision   int64
+	scale       int64
+}
+
+// Name returns the column name.
+func (c *ColumnType) Name() string {
+	return c.name
+}
+
+// UserType returns the source database's user-defined type name.
+func (c *ColumnType) UserType() string {
+	return c.userType
+}
+
+// SystemType returns the source database's system type name.
+func (c *ColumnType) SystemType() string {
+	return c.systemType
+}
+
+// Length returns the column's max length; ok is false when not applicable.
+func (c *ColumnType) Length() (length int64, ok bool) {
+	return c.maxLength, c.hasMaxLength
+}
+
+// DecimalSize returns precision and scale; ok is false when not applicable.
+func (c *ColumnType) DecimalSize() (precision, scale int64, ok bool) {
+	return c.precision, c.scale, c.hasPrecisionScale
+}
+
+// Nullable reports whether the column allows NULLs.
+func (c *ColumnType) Nullable() bool {
+	return c.nullable
+}
+
+// Type returns the cross-database unified type name (e.g. "GEOMETRY").
+func (c *ColumnType) Type() string {
+	return c.unifiedType
+}
diff --git a/internal/app/models/main.go b/internal/app/models/main.go
new file mode 100644
index 0000000..81b1ffc
--- /dev/null
+++ b/internal/app/models/main.go
@@ -0,0 +1,27 @@
+package models
+
+import "github.com/google/uuid"
+
+// UnknownRowValues is one row of column values with driver-specific types.
+type UnknownRowValues = []any
+
+// Chunk is a group of rows extracted from one batch; it is the unit of work
+// sent downstream.
+type Chunk struct {
+	Id           uuid.UUID
+	BatchId      uuid.UUID
+	Data         []UnknownRowValues
+	RetryCounter int
+}
+
+// Batch identifies a primary-key range of a source table to extract.
+// ParentId links a retry batch to the batch it resumes.
+type Batch struct {
+	Id                    uuid.UUID
+	ParentId              uuid.UUID
+	LowerLimit            int64
+	UpperLimit            int64
+	IsLowerLimitInclusive bool
+	ShouldUseRange        bool
+	RetryCounter          int
+}