feat: refactor extractor interface and implement Consume function for ETL process

This commit is contained in:
2026-04-20 00:19:41 -05:00
parent b4a846575d
commit 16c232762e
6 changed files with 108 additions and 123 deletions

View File

@@ -11,6 +11,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@@ -129,8 +130,9 @@ func processMigrationJob(
for range maxExtractors { for range maxExtractors {
wgExtractors.Go(func() { wgExtractors.Go(func() {
extractor.Exec( extractors.Consume(
localCtx, localCtx,
extractor,
job.SourceTable, job.SourceTable,
sourceColTypes, sourceColTypes,
job.BatchSize, job.BatchSize,

2
go.mod
View File

@@ -1,6 +1,6 @@
module git.ksdemosapps.com/kylesoda/go-migrate module git.ksdemosapps.com/kylesoda/go-migrate
go 1.25.7 go 1.26
require ( require (
github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4 github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4

View File

@@ -0,0 +1,101 @@
package extractors
import (
"context"
"errors"
"slices"
"strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
func Consume(
ctx context.Context,
extractor etl.Extractor,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
})
if indexPrimaryKey == -1 {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case partition, ok := <-chPartitionsIn:
if !ok {
return
}
rowsReadResult, err := extractor.Exec(
ctx,
tableInfo,
columns,
batchSize,
partition,
indexPrimaryKey,
chBatchesOut,
)
if rowsReadResult > 0 {
atomic.AddInt64(rowsRead, int64(rowsReadResult))
}
if err != nil {
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
select {
case <-ctx.Done():
return
case chErrorsOut <- *exError:
}
} else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
}
}
continue
}
wgActivePartitions.Done()
}
}
}

View File

@@ -5,10 +5,7 @@ import (
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
"slices"
"strings" "strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
@@ -99,7 +96,7 @@ func errorFromLastRow(
} }
} }
func (mssqlEx *MssqlExtractor) ProcessPartition( func (mssqlEx *MssqlExtractor) Exec(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -188,90 +185,3 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
return rowsRead, nil return rowsRead, nil
} }
func (mssqlEx *MssqlExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
})
if indexPrimaryKey == -1 {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case partition, ok := <-chPartitionsIn:
if !ok {
return
}
rowsReadResult, err := mssqlEx.ProcessPartition(
ctx,
tableInfo,
columns,
batchSize,
partition,
indexPrimaryKey,
chBatchesOut,
)
if rowsReadResult > 0 {
atomic.AddInt64(rowsRead, int64(rowsReadResult))
}
if err != nil {
var exError *custom_errors.ExtractorError
var jobError *custom_errors.JobError
if errors.As(err, &exError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *exError:
}
} else if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
}
}
continue
}
wgActivePartitions.Done()
}
}
}

View File

@@ -5,7 +5,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
@@ -51,7 +50,7 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey) return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
} }
func (postgresEx *PostgresExtractor) ProcessPartition( func (postgresEx *PostgresExtractor) Exec(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -109,17 +108,3 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
return rowsRead, nil return rowsRead, nil
} }
func (postgresEx *PostgresExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
}

View File

@@ -10,7 +10,7 @@ import (
) )
type Extractor interface { type Extractor interface {
ProcessPartition( Exec(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -19,19 +19,6 @@ type Extractor interface {
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
) (int, error) ) (int, error)
Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
)
} }
type TransformerFunc func(any) (any, error) type TransformerFunc func(any) (any, error)