refactor: enhance logging and batch processing in migration; adjust configuration parameters for improved performance

This commit is contained in:
2026-05-09 01:16:34 -05:00
parent 68d983ea57
commit b690e580c5
8 changed files with 229 additions and 112 deletions

View File

@@ -86,7 +86,7 @@ func main() {
log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed) log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed)
totalDuration := time.Since(startTime) totalDuration := time.Since(startTime)
log.Infof("=== Migration completed successfully! ===") // log.Infof("=== Migration completed successfully! ===")
log.Infof("Total migration time: %v", totalDuration) log.Infof("Total migration time: %v", totalDuration)
} }

View File

@@ -129,7 +129,7 @@ func processMigrationJob(
}() }()
maxExtractors := min(job.MaxExtractors, len(partitions)) maxExtractors := min(job.MaxExtractors, len(partitions))
log.Infof("Starting %d extractor(s)...", maxExtractors) log.Infof("Starting %d extractor(s)... (%v)", maxExtractors, job.Name)
for range maxExtractors { for range maxExtractors {
wgExtractors.Go(func() { wgExtractors.Go(func() {
@@ -157,7 +157,7 @@ func processMigrationJob(
} }
}() }()
log.Infof("Starting %d transformer(s)...", maxExtractors) log.Infof("Starting %d transformer(s)... (%v)", maxExtractors, job.Name)
for range maxExtractors { for range maxExtractors {
wgTransformers.Go(func() { wgTransformers.Go(func() {
@@ -165,6 +165,7 @@ func processMigrationJob(
localCtx, localCtx,
sourceColTypes, sourceColTypes,
job.Retry, job.Retry,
job.TransformerBatchSize,
chBatchesRaw, chBatchesRaw,
chBatchesTransformed, chBatchesTransformed,
chJobErrors, chJobErrors,
@@ -173,7 +174,7 @@ func processMigrationJob(
}) })
} }
log.Infof("Starting %d loader(s)...", job.MaxLoaders) log.Infof("Starting %d loader(s)... (%v)", job.MaxLoaders, job.Name)
for range job.MaxLoaders { for range job.MaxLoaders {
wgLoaders.Go(func() { wgLoaders.Go(func() {
@@ -182,6 +183,7 @@ func processMigrationJob(
job.TargetTable, job.TargetTable,
targetColTypes, targetColTypes,
job.Retry, job.Retry,
job.LoaderBatchSize,
chBatchesTransformed, chBatchesTransformed,
chJobErrors, chJobErrors,
&wgActiveBatches, &wgActiveBatches,
@@ -192,28 +194,28 @@ func processMigrationJob(
} }
go func() { go func() {
log.Debugf("Waiting for goroutines (%v)", job.Name) // log.Debugf("Waiting for goroutines (%v)", job.Name)
wgActivePartitions.Wait() wgActivePartitions.Wait()
log.Debugf("wgActivePartitions is empty (%v)", job.Name) // log.Debugf("wgActivePartitions is empty (%v)", job.Name)
close(chPartitions) close(chPartitions)
log.Debugf("chPartitions is closed (%v)", job.Name) // log.Debugf("chPartitions is closed (%v)", job.Name)
wgExtractors.Wait() wgExtractors.Wait()
log.Debugf("wgExtractors is empty (%v)", job.Name) // log.Debugf("wgExtractors is empty (%v)", job.Name)
close(chBatchesRaw) close(chBatchesRaw)
log.Debugf("chBatchesRaw is closed (%v)", job.Name) // log.Debugf("chBatchesRaw is closed (%v)", job.Name)
wgTransformers.Wait() wgTransformers.Wait()
log.Debugf("wgTransformers is empty (%v)", job.Name) // log.Debugf("wgTransformers is empty (%v)", job.Name)
close(chBatchesTransformed)
// log.Debugf("chBatchesTransformed is closed (%v)", job.Name)
wgActiveBatches.Wait() wgActiveBatches.Wait()
log.Debugf("wgActiveBatches is empty (%v)", job.Name) // log.Debugf("wgActiveBatches is empty (%v)", job.Name)
close(chBatchesTransformed)
log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
wgLoaders.Wait() wgLoaders.Wait()
log.Debugf("wgLoaders is empty (%v)", job.Name) // log.Debugf("wgLoaders is empty (%v)", job.Name)
cancel() cancel()
}() }()
@@ -225,9 +227,9 @@ func processMigrationJob(
} }
} }
log.Debugf("waiting for local context to be done (%v)", job.Name) // log.Debugf("waiting for local context to be done (%v)", job.Name)
<-localCtx.Done() <-localCtx.Done()
log.Debugf("local context done (%v)", job.Name) // log.Debugf("local context done (%v)", job.Name)
if ctx.Err() != nil { if ctx.Err() != nil {
result.Error = ctx.Err() result.Error = ctx.Err()
@@ -242,5 +244,9 @@ func processMigrationJob(
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed) result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
} }
if result.RowsRead == 0 {
log.Warnf("No rows extracted from (%v)", job.Name)
}
return result return result
} }

View File

@@ -3,12 +3,12 @@ source_db_type: sqlserver
target_db_type: postgres target_db_type: postgres
defaults: defaults:
batches_per_partition: 8 batches_per_partition: 4
max_extractors: 2 max_extractors: 2
extractor_batch_size: 25000 extractor_batch_size: 5000
extractor_queue_size: 8 extractor_queue_size: 8
max_transformers: 2 max_transformers: 2
transformer_batch_size: 25000 transformer_batch_size: 12500
transformer_queue_size: 8 transformer_queue_size: 8
max_loaders: 4 max_loaders: 4
loader_batch_size: 25000 loader_batch_size: 25000
@@ -34,11 +34,11 @@ jobs:
table: MANZANA table: MANZANA
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
# range: range:
# min: 1000000 min: 1000000
# max: 2000000 max: 2000000
# is_min_inclusive: false is_min_inclusive: false
# is_max_inclusive: true is_max_inclusive: true
- name: red_puerto - name: red_puerto
enabled: true enabled: true
@@ -57,29 +57,29 @@ jobs:
post_sql: post_sql:
- "SELECT 1" - "SELECT 1"
# - name: infraestructura_site_holder__attach - name: infraestructura_site_holder__attach
# source: source:
# schema: Infraestructura schema: Infraestructura
# table: SITE_HOLDER__ATTACH table: SITE_HOLDER__ATTACH
# primary_key: GDB_ARCHIVE_OID primary_key: GDB_ARCHIVE_OID
# target: target:
# schema: Infraestructura schema: Infraestructura
# table: SITE_HOLDER__ATTACH table: SITE_HOLDER__ATTACH
# to_storage: to_storage:
# columns: columns:
# - source: DATA - source: DATA
# target: FILE_URL target: FILE_URL
# mode: REFERENCE_ONLY mode: REFERENCE_ONLY
# batches_per_partition: 10000 batches_per_partition: 10000
# max_extractors: 8 max_extractors: 8
# extractor_queue_size: 32 extractor_queue_size: 32
# extractor_batch_size: 1 extractor_batch_size: 1
# max_transformers: 16 max_transformers: 16
# transformer_batch_size: 20000 transformer_batch_size: 20000
# transformer_queue_size: 8 transformer_queue_size: 8
# max_loaders: 4 max_loaders: 4
# retry: retry:
# attempts: 5 attempts: 5
# base_delay_ms: 1000 base_delay_ms: 1000
# max_delay_ms: 15000 max_delay_ms: 15000
# max_jitter_ms: 500 max_jitter_ms: 500

View File

@@ -10,7 +10,6 @@ import (
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects" dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
mssql "github.com/microsoft/go-mssqldb" mssql "github.com/microsoft/go-mssqldb"
"github.com/sirupsen/logrus"
) )
func init() { func init() {
@@ -188,8 +187,6 @@ func buildExtractQueryMssql(q ExtractionQuery) (string, error) {
hasRegularColumns := len(q.Columns) > 0 hasRegularColumns := len(q.Columns) > 0
hasJsonColumns := len(q.FromJsonColumns) > 0 hasJsonColumns := len(q.FromJsonColumns) > 0
// logrus.Debugf("Extraction query: %+v", q)
resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns)) resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns))
if hasJsonColumns { if hasJsonColumns {
for _, jsonConfig := range q.FromJsonColumns { for _, jsonConfig := range q.FromJsonColumns {
@@ -296,7 +293,7 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
return nil, err return nil, err
} }
logrus.Debugf("Query: %s", queryString) // logrus.Debugf("Query: %s", queryString)
var queryArgs []any var queryArgs []any

View File

@@ -9,6 +9,8 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
) )
func (gl *GenericLoader) Consume( func (gl *GenericLoader) Consume(
@@ -16,6 +18,7 @@ func (gl *GenericLoader) Consume(
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
retryConfig config.RetryConfig, retryConfig config.RetryConfig,
batchSize int,
chBatchesIn <-chan models.Batch, chBatchesIn <-chan models.Batch,
chErrorsOut chan<- custom_errors.JobError, chErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup, wgActiveBatches *sync.WaitGroup,
@@ -26,6 +29,66 @@ func (gl *GenericLoader) Consume(
return col.Name() return col.Name()
}) })
var accRows []models.UnknownRowValues
var parentBatchesId []uuid.UUID
pendingDone := 0
defer func() {
for range pendingDone {
wgActiveBatches.Done()
}
}()
flush := func() bool {
if len(accRows) == 0 {
return true
}
count := len(parentBatchesId)
superBatch := models.Batch{
Id: uuid.New(),
ParentBatchesId: parentBatchesId,
Rows: accRows,
}
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, superBatch)
for range count {
wgActiveBatches.Done()
}
pendingDone -= count
accRows = nil
parentBatchesId = nil
if err != nil {
atomic.AddInt32(failedBatchesCount, 1)
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case <-ctx.Done():
return false
case chErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return false
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
}
if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
select {
case <-ctx.Done():
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
}
return false
}
return true
}
current := atomic.LoadInt64(rowsLoaded)
logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
atomic.AddInt64(rowsLoaded, int64(processedRows))
return true
}
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {
return return
@@ -36,42 +99,56 @@ func (gl *GenericLoader) Consume(
return return
case batch, ok := <-chBatchesIn: case batch, ok := <-chBatchesIn:
if !ok { if !ok {
flush()
return return
} }
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch) if batchSize <= 0 {
wgActiveBatches.Done() processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
wgActiveBatches.Done()
if err != nil { if err != nil {
atomic.AddInt32(failedBatchesCount, 1) atomic.AddInt32(failedBatchesCount, 1)
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case chErrorsOut <- *jobError: case chErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
} }
} else {
select { if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
case <-ctx.Done(): select {
return case <-ctx.Done():
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: return
} case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
} return
}
currentFBCount := atomic.LoadInt32(failedBatchesCount)
if currentFBCount > int32(retryConfig.MaxFailedBatchesLoad) {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
return
} }
continue
} }
current := atomic.LoadInt64(rowsLoaded)
logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
atomic.AddInt64(rowsLoaded, int64(processedRows))
continue continue
} }
atomic.AddInt64(rowsLoaded, int64(processedRows)) pendingDone++
accRows = append(accRows, batch.Rows...)
parentBatchesId = append(parentBatchesId, batch.Id)
if len(accRows) >= batchSize {
if !flush() {
return
}
}
} }
} }
} }

View File

@@ -8,12 +8,14 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
) )
func (mssqlTr *MssqlTransformer) Consume( func (mssqlTr *MssqlTransformer) Consume(
ctx context.Context, ctx context.Context,
columns []models.ColumnType, columns []models.ColumnType,
retryConfig config.RetryConfig, retryConfig config.RetryConfig,
batchSize int,
chBatchesIn <-chan models.Batch, chBatchesIn <-chan models.Batch,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError, chJobErrorsOut chan<- custom_errors.JobError,
@@ -23,6 +25,32 @@ func (mssqlTr *MssqlTransformer) Consume(
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable) storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
transformationPlan = append(transformationPlan, storagePlan...) transformationPlan = append(transformationPlan, storagePlan...)
var accRows []models.UnknownRowValues
var parentBatchesId []uuid.UUID
var firstPartitionId uuid.UUID
flush := func() bool {
if len(accRows) == 0 {
return true
}
out := models.Batch{
Id: uuid.New(),
PartitionId: firstPartitionId,
ParentBatchesId: parentBatchesId,
Rows: accRows,
}
select {
case chBatchesOut <- out:
wgActiveBatches.Add(1)
case <-ctx.Done():
return false
}
accRows = nil
parentBatchesId = nil
firstPartitionId = uuid.Nil
return true
}
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {
return return
@@ -34,49 +62,56 @@ func (mssqlTr *MssqlTransformer) Consume(
case batch, ok := <-chBatchesIn: case batch, ok := <-chBatchesIn:
if !ok { if !ok {
flush()
return return
} }
if len(transformationPlan) == 0 { if len(transformationPlan) > 0 {
err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig)
if err != nil {
if errors.Is(err, ctx.Err()) {
return
}
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case chJobErrorsOut <- *jobError:
case <-ctx.Done():
return
}
} else {
select {
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
case <-ctx.Done():
return
}
}
return
}
}
if batchSize <= 0 {
select { select {
case chBatchesOut <- batch: case chBatchesOut <- batch:
wgActiveBatches.Add(1) wgActiveBatches.Add(1)
continue
case <-ctx.Done(): case <-ctx.Done():
return return
} }
continue
} }
err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig) if len(parentBatchesId) == 0 {
if err != nil { firstPartitionId = batch.PartitionId
if errors.Is(err, ctx.Err()) { }
accRows = append(accRows, batch.Rows...)
parentBatchesId = append(parentBatchesId, batch.Id)
if len(accRows) >= batchSize {
if !flush() {
return return
} }
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case chJobErrorsOut <- *jobError:
case <-ctx.Done():
return
}
} else {
select {
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
case <-ctx.Done():
return
}
}
return
} }
select {
case chBatchesOut <- batch:
case <-ctx.Done():
return
}
wgActiveBatches.Add(1)
} }
} }
} }

View File

@@ -21,6 +21,7 @@ type Transformer interface {
ctx context.Context, ctx context.Context,
columns []models.ColumnType, columns []models.ColumnType,
retryConfig config.RetryConfig, retryConfig config.RetryConfig,
batchSize int,
chBatchesIn <-chan models.Batch, chBatchesIn <-chan models.Batch,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError, chJobErrorsOut chan<- custom_errors.JobError,

View File

@@ -9,10 +9,11 @@ import (
type UnknownRowValues = []any type UnknownRowValues = []any
type Batch struct { type Batch struct {
Id uuid.UUID Id uuid.UUID
PartitionId uuid.UUID PartitionId uuid.UUID
Rows []UnknownRowValues ParentBatchesId []uuid.UUID
RetryCounter int Rows []UnknownRowValues
RetryCounter int
} }
type PartitionRange struct { type PartitionRange struct {