Compare commits
2 Commits
2a5f703f3c
...
80babf24f2
| Author | SHA1 | Date | |
|---|---|---|---|
|
80babf24f2
|
|||
|
f12937a1c3
|
@@ -20,6 +20,8 @@ import (
|
|||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const jobErrorsChannelSize int = 100
|
||||||
|
|
||||||
func buildTruncateQuery(targetDbType, schema, table, truncateMethod string) string {
|
func buildTruncateQuery(targetDbType, schema, table, truncateMethod string) string {
|
||||||
if truncateMethod == "DELETE" {
|
if truncateMethod == "DELETE" {
|
||||||
if targetDbType == "postgres" {
|
if targetDbType == "postgres" {
|
||||||
@@ -110,11 +112,11 @@ func processMigrationJob(
|
|||||||
log.Error("Unexpected error calculating batch ranges: ", err)
|
log.Error("Unexpected error calculating batch ranges: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
chJobErrors := make(chan custom_errors.JobError, job.ExtractorQueueSize)
|
chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize)
|
||||||
chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize)
|
chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize)
|
||||||
chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
|
chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
|
||||||
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
|
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
|
||||||
chBatchesTransformed := make(chan models.Batch, job.ExtractorQueueSize)
|
chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize)
|
||||||
|
|
||||||
var wgActivePartitions sync.WaitGroup
|
var wgActivePartitions sync.WaitGroup
|
||||||
var wgActiveBatches sync.WaitGroup
|
var wgActiveBatches sync.WaitGroup
|
||||||
|
|||||||
63
internal/app/etl/loaders/consume.go
Normal file
63
internal/app/etl/loaders/consume.go
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
package loaders
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (gl *GenericLoader) Consume(
|
||||||
|
ctx context.Context,
|
||||||
|
tableInfo config.TargetTableInfo,
|
||||||
|
columns []models.ColumnType,
|
||||||
|
retryConfig config.RetryConfig,
|
||||||
|
chBatchesIn <-chan models.Batch,
|
||||||
|
chErrorsOut chan<- custom_errors.JobError,
|
||||||
|
wgActiveBatches *sync.WaitGroup,
|
||||||
|
rowsLoaded *int64,
|
||||||
|
) {
|
||||||
|
colNames := mapSlice(columns, func(col models.ColumnType) string {
|
||||||
|
return col.Name()
|
||||||
|
})
|
||||||
|
|
||||||
|
for {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case batch, ok := <-chBatchesIn:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||||
|
wgActiveBatches.Done()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case chErrorsOut <- *jobError:
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -38,8 +38,7 @@ func (gl *GenericLoader) ProcessBatch(
|
|||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var pgErr *pgconn.PgError
|
if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok {
|
||||||
if errors.As(err, &pgErr) {
|
|
||||||
if pgErr.Code == "23505" {
|
if pgErr.Code == "23505" {
|
||||||
return 0, &custom_errors.JobError{
|
return 0, &custom_errors.JobError{
|
||||||
ShouldCancelJob: true,
|
ShouldCancelJob: true,
|
||||||
|
|||||||
Reference in New Issue
Block a user