diff --git a/cmd/go_migrate/build-extract-query.go b/cmd/go_migrate/build-extract-query.go deleted file mode 100644 index 478a4a8..0000000 --- a/cmd/go_migrate/build-extract-query.go +++ /dev/null @@ -1,75 +0,0 @@ -package main - -import ( - "fmt" - "strings" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" -) - -func buildExtractQueryMssql(sourceDbInfo config.SourceTableInfo, columns []ColumnType, includeRange bool, isMinInclusive bool) string { - var sbQuery strings.Builder - - sbQuery.WriteString("SELECT ") - - if len(columns) == 0 { - sbQuery.WriteString("*") - } else { - for i, col := range columns { - fmt.Fprintf(&sbQuery, "[%s]", col.name) - - if col.unifiedType == "GEOMETRY" { - fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.name) - } - - if i < len(columns)-1 { - sbQuery.WriteString(", ") - } - } - } - - fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", sourceDbInfo.Schema, sourceDbInfo.Table) - - if includeRange { - fmt.Fprintf(&sbQuery, " WHERE [%s]", sourceDbInfo.PrimaryKey) - if isMinInclusive { - sbQuery.WriteString(" >=") - } else { - sbQuery.WriteString(" >") - } - - fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", sourceDbInfo.PrimaryKey) - } - - fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", sourceDbInfo.PrimaryKey) - - return sbQuery.String() -} - -func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []ColumnType) string { - var sbColumns strings.Builder - - if len(columns) == 0 { - sbColumns.WriteString("*") - } else { - for i, col := range columns { - if col.unifiedType == "GEOMETRY" { - sbColumns.WriteString(`ST_AsEWKB("`) - sbColumns.WriteString(col.name) - sbColumns.WriteString(`") AS "`) - sbColumns.WriteString(col.name) - sbColumns.WriteString(`"`) - } else { - sbColumns.WriteString(`"`) - sbColumns.WriteString(col.name) - sbColumns.WriteString(`"`) - } - - if i < len(columns)-1 { - sbColumns.WriteString(", ") - } - } - } - - return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey) -} diff --git a/cmd/go_migrate/job-error-handler.go b/cmd/go_migrate/job-error-handler.go deleted file mode 100644 index 4fc89ff..0000000 --- a/cmd/go_migrate/job-error-handler.go +++ /dev/null @@ -1,47 +0,0 @@ -package main - -import ( - "context" - "fmt" - - log "github.com/sirupsen/logrus" -) - -type JobError struct { - ShouldCancelJob bool - Msg string - Prev error -} - -func (e *JobError) Error() string { - if e.Prev != nil { - return fmt.Sprintf("%s: %v", e.Msg, e.Prev) - } - - return e.Msg -} - -func jobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error { - for { - if ctx.Err() != nil { - return nil - } - - select { - case <-ctx.Done(): - return nil - - case err, ok := <-chErrorsIn: - if !ok { - return nil - } - - if err.ShouldCancelJob { - log.Error(err.Msg, " - ", err.Prev) - return &err - } - - log.Error(err.Msg, " - ", err.Prev) - } - } -} diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go index 70faa5e..54a9957 100644 --- a/cmd/go_migrate/loader.go +++ b/cmd/go_migrate/loader.go @@ -25,7 +25,7 @@ func loadRowsPostgres( tableInfo config.TargetTableInfo, columns []models.ColumnType, chChunksIn <-chan models.Chunk, - chErrorsOut chan<- LoaderError, + chErrorsOut chan<- custom_errors.LoaderError, chJobErrorsOut chan<- custom_errors.JobError, wgActiveChunks *sync.WaitGroup, rowsLoaded *int64, @@ -61,7 +61,7 @@ func loadChunkPostgres( identifier pgx.Identifier, colNames []string, chunk models.Chunk, - chErrorsOut chan<- LoaderError, + chErrorsOut chan<- custom_errors.LoaderError, chJobErrorsOut chan<- custom_errors.JobError, wgActiveChunks *sync.WaitGroup, rowsLoaded *int64, @@ -92,7 +92,7 @@ func loadChunkPostgres( } select { - case chErrorsOut <- LoaderError{Chunk: chunk, Msg: err.Error()}: + case chErrorsOut <- custom_errors.LoaderError{Chunk: chunk, Msg: err.Error()}: case <-ctx.Done(): return true } diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index c5c7513..0a7b800 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -52,7 +52,7 @@ func processMigrationJob( chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize) chChunksRaw := make(chan models.Chunk, job.QueueSize) chChunksTransformed := make(chan models.Chunk, job.QueueSize) - chLoadersErrors := make(chan LoaderError, job.QueueSize) + chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize) var wgActiveBatches sync.WaitGroup var wgActiveChunks sync.WaitGroup @@ -68,7 +68,7 @@ func processMigrationJob( }() go custom_errors.ExtractorErrorHandler(jobCtx, job.Retry.Attempts, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) - go loaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) + go custom_errors.LoaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) maxExtractors := min(job.MaxExtractors, len(batches)) log.Infof("Starting %d extractor(s)...", maxExtractors) diff --git a/cmd/go_migrate/loader-error-handler.go b/internal/app/custom_errors/loader.error.go similarity index 83% rename from cmd/go_migrate/loader-error-handler.go rename to internal/app/custom_errors/loader.error.go index edd3569..3114b67 100644 --- a/cmd/go_migrate/loader-error-handler.go +++ b/internal/app/custom_errors/loader.error.go @@ -1,11 +1,10 @@ -package main +package custom_errors import ( "context" "fmt" "sync" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" ) @@ -18,12 +17,12 @@ func (e *LoaderError) Error() string { return e.Msg } -func loaderErrorHandler( +func LoaderErrorHandler( ctx context.Context, maxRetryAttempts int, chErrorsIn <-chan LoaderError, chChunksOut chan<- models.Chunk, - chJobErrorsOut chan<- custom_errors.JobError, + chJobErrorsOut chan<- JobError, wgActiveChunks *sync.WaitGroup, ) { for { @@ -41,7 +40,7 @@ func loaderErrorHandler( } if err.RetryCounter >= maxRetryAttempts { - jobError := custom_errors.JobError{ + jobError := JobError{ ShouldCancelJob: false, Msg: fmt.Sprintf("chunk %v reached max retries (%d)", err.Id, maxRetryAttempts), Prev: &err,