diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go deleted file mode 100644 index 54a9957..0000000 --- a/cmd/go_migrate/loader.go +++ /dev/null @@ -1,197 +0,0 @@ -package main - -import ( - "context" - "database/sql" - "errors" - "fmt" - "sync" - "sync/atomic" - "time" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgx/v5/pgxpool" - mssql "github.com/microsoft/go-mssqldb" - log "github.com/sirupsen/logrus" -) - -func loadRowsPostgres( - ctx context.Context, - db *pgxpool.Pool, - tableInfo config.TargetTableInfo, - columns []models.ColumnType, - chChunksIn <-chan models.Chunk, - chErrorsOut chan<- custom_errors.LoaderError, - chJobErrorsOut chan<- custom_errors.JobError, - wgActiveChunks *sync.WaitGroup, - rowsLoaded *int64, -) { - tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table} - colNames := Map(columns, func(col models.ColumnType) string { - return col.Name() - }) - - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - case chunk, ok := <-chChunksIn: - if !ok { - return - } - - if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks, rowsLoaded); abort { - return - } - } - } -} - -func loadChunkPostgres( - ctx context.Context, - db *pgxpool.Pool, - identifier pgx.Identifier, - colNames []string, - chunk models.Chunk, - chErrorsOut chan<- custom_errors.LoaderError, - chJobErrorsOut chan<- custom_errors.JobError, - wgActiveChunks *sync.WaitGroup, - rowsLoaded *int64, -) (abort bool) { - chunkStartTime := time.Now() - _, err := db.CopyFrom( - ctx, - identifier, - colNames, - pgx.CopyFromRows(chunk.Data), - ) - - if err != nil { - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) { - if pgErr.Code == "23505" { - select { - case chJobErrorsOut <- custom_errors.JobError{ - ShouldCancelJob: true, - Msg: fmt.Sprintf("Fatal error in table %s", identifier.Sanitize()), - Prev: err, - }: - case <-ctx.Done(): - } - wgActiveChunks.Done() - return true - } - } - - select { - case chErrorsOut <- custom_errors.LoaderError{Chunk: chunk, Msg: err.Error()}: - case <-ctx.Done(): - return true - } - return false - } - - chunkDuration := time.Since(chunkStartTime) - rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds() - - log.Infof("Loaded chunk: %d rows in %v (%.0f rows/sec)", len(chunk.Data), chunkDuration, rowsPerSec) - - atomic.AddInt64(rowsLoaded, int64(len(chunk.Data))) - wgActiveChunks.Done() - return false -} - -func loadRowsMssql(ctx context.Context, tableInfo config.TargetTableInfo, columns []ColumnType, db *sql.DB, in <-chan []models.UnknownRowValues) error { - chunkCount := 0 - totalRowsLoaded := 0 - - for rows := range in { - chunkStartTime := time.Now() - - tx, err := db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("error starting transaction: %w", err) - } - - fullTableName := fmt.Sprintf("[%s].[%s]", tableInfo.Schema, tableInfo.Table) - colNames := Map(columns, func(col ColumnType) string { - return col.name - }) - - stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, colNames...)) - if err != nil { - tx.Rollback() - return fmt.Errorf("error preparing bulk copy statement: %w", err) - } - - copyStartTime := time.Now() - - for _, row := range rows { - _, err = stmt.ExecContext(ctx, row...) - if err != nil { - stmt.Close() - tx.Rollback() - return fmt.Errorf("error executing row insert: %w", err) - } - } - - result, err := stmt.ExecContext(ctx) - if err != nil { - stmt.Close() - tx.Rollback() - return fmt.Errorf("error flushing bulk data: %w", err) - } - - err = stmt.Close() - if err != nil { - tx.Rollback() - return fmt.Errorf("error closing statement: %w", err) - } - - if err := tx.Commit(); err != nil { - return fmt.Errorf("error committing transaction: %w", err) - } - - rowsAffected, _ := result.RowsAffected() - chunkCount++ - totalRowsLoaded += int(rowsAffected) - - copyDuration := time.Since(copyStartTime) - chunkDuration := time.Since(chunkStartTime) - rowsPerSec := float64(len(rows)) / chunkDuration.Seconds() - - log.Infof("Loaded chunk #%d (MSSQL): %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded) - } - - return nil -} - -func Map[T any, V any](input []T, mapper func(T) V) []V { - result := make([]V, len(input)) - - for i, v := range input { - result[i] = mapper(v) - } - - return result -} - -func fakeLoader(tableInfo config.TargetTableInfo, columns []ColumnType, in <-chan [][]any) { - for rows := range in { - log.Debugf("Chunk received, loading data into...") - - for i, rowValues := range rows { - if i%100 == 0 { - logSampleRow(tableInfo.Schema, tableInfo.Table, columns, rowValues, fmt.Sprintf("row %d", i)) - } - } - } -} diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index f22f186..55691dc 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -10,6 +10,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractor" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loader" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformer" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/jackc/pgx/v5/pgxpool" @@ -63,6 +64,7 @@ func processMigrationJob( mssqlExtractor := extractor.NewMssqlExtractor(sourceDb) mssqlToPostgresTransformer := transformer.NewMssqlTransformer() + postgresLoader := loader.NewPostgresLoader(targetDb) go func() { if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil { @@ -120,7 +122,16 @@ func processMigrationJob( for range job.MaxLoaders { wgLoaders.Go(func() { - loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks, &rowsLoaded) + postgresLoader.Exec( + jobCtx, + job.TargetTable, + targetColTypes, + chChunksTransformed, + chLoadersErrors, + chJobErrors, + &wgActiveChunks, + &rowsLoaded, + ) }) } diff --git a/internal/app/etl/extractor/mssql.go b/internal/app/etl/extractor/mssql.go index a2cdc50..40da1de 100644 --- a/internal/app/etl/extractor/mssql.go +++ b/internal/app/etl/extractor/mssql.go @@ -254,10 +254,13 @@ func (mssqlEx *MssqlExtractor) Exec( } } - select { - case <-ctx.Done(): - return - case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Prev: err}: + var jobError *custom_errors.JobError + if errors.As(err, &jobError) { + select { + case <-ctx.Done(): + return + case chJobErrorsOut <- *jobError: + } } return diff --git a/internal/app/etl/loader/postgres.go b/internal/app/etl/loader/postgres.go new file mode 100644 index 0000000..4cdc1ac --- /dev/null +++ b/internal/app/etl/loader/postgres.go @@ -0,0 +1,123 @@ +package loader + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" +) + +type PostgresLoader struct { + db *pgxpool.Pool +} + +func NewPostgresLoader(pool *pgxpool.Pool) Loader { + return &PostgresLoader{db: pool} +} + +func mapSlice[T any, V any](input []T, mapper func(T) V) []V { + result := make([]V, len(input)) + + for i, v := range input { + result[i] = mapper(v) + } + + return result +} + +func (postgresLd *PostgresLoader) ProcessChunk( + ctx context.Context, + tableInfo config.TargetTableInfo, + colNames []string, + chunk models.Chunk, +) (int, error) { + tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table} + _, err := postgresLd.db.CopyFrom( + ctx, + tableId, + colNames, + pgx.CopyFromRows(chunk.Data), + ) + + if err != nil { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + if pgErr.Code == "23505" { + return 0, &custom_errors.JobError{ + ShouldCancelJob: true, + Msg: fmt.Sprintf("Fatal error in table %s", tableId.Sanitize()), + Prev: err, + } + } + } + + return 0, &custom_errors.LoaderError{Chunk: chunk, Msg: err.Error()} + } + + return len(chunk.Data), nil +} + +func (postgresLd *PostgresLoader) Exec( + ctx context.Context, + tableInfo config.TargetTableInfo, + columns []models.ColumnType, + chChunksIn <-chan models.Chunk, + chErrorsOut chan<- custom_errors.LoaderError, + chJobErrorsOut chan<- custom_errors.JobError, + wgActiveChunks *sync.WaitGroup, + rowsLoaded *int64, +) { + colNames := mapSlice(columns, func(col models.ColumnType) string { + return col.Name() + }) + + for { + if ctx.Err() != nil { + return + } + + select { + case <-ctx.Done(): + return + case chunk, ok := <-chChunksIn: + if !ok { + return + } + + processedRows, err := postgresLd.ProcessChunk(ctx, tableInfo, colNames, chunk) + + if err != nil { + var ldError *custom_errors.LoaderError + if errors.As(err, &ldError) { + select { + case <-ctx.Done(): + return + case chErrorsOut <- *ldError: + } + } + + var jobError *custom_errors.JobError + if errors.As(err, &jobError) { + select { + case <-ctx.Done(): + return + case chJobErrorsOut <- *jobError: + } + } + + return + } + + wgActiveChunks.Done() + atomic.AddInt64(rowsLoaded, int64(processedRows)) + } + } +} diff --git a/internal/app/etl/loader/types.go b/internal/app/etl/loader/types.go new file mode 100644 index 0000000..d2bff17 --- /dev/null +++ b/internal/app/etl/loader/types.go @@ -0,0 +1,30 @@ +package loader + +import ( + "context" + "sync" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +type Loader interface { + ProcessChunk( + ctx context.Context, + tableInfo config.TargetTableInfo, + colNames []string, + chunk models.Chunk, + ) (int, error) + + Exec( + ctx context.Context, + tableInfo config.TargetTableInfo, + columns []models.ColumnType, + chChunksIn <-chan models.Chunk, + chErrorsOut chan<- custom_errors.LoaderError, + chJobErrorsOut chan<- custom_errors.JobError, + wgActiveChunks *sync.WaitGroup, + rowsLoaded *int64, + ) +}