Compare commits
3 Commits
80babf24f2
...
c4e233401b
| Author | SHA1 | Date | |
|---|---|---|---|
|
c4e233401b
|
|||
|
a216a8016f
|
|||
|
46ddd0d6b7
|
@@ -13,6 +13,7 @@ import (
|
|||||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
@@ -43,7 +44,7 @@ func processMigrationJob(
|
|||||||
targetTableAnalyzer etl.TableAnalyzer,
|
targetTableAnalyzer etl.TableAnalyzer,
|
||||||
extractor extractors.GenericExtractor,
|
extractor extractors.GenericExtractor,
|
||||||
azureClient *azure.Client,
|
azureClient *azure.Client,
|
||||||
loader etl.Loader,
|
loader loaders.GenericLoader,
|
||||||
job config.Job,
|
job config.Job,
|
||||||
targetDbType string,
|
targetDbType string,
|
||||||
) models.JobResult {
|
) models.JobResult {
|
||||||
@@ -158,6 +159,7 @@ func processMigrationJob(
|
|||||||
chJobErrors,
|
chJobErrors,
|
||||||
&wgActivePartitions,
|
&wgActivePartitions,
|
||||||
&rowsRead,
|
&rowsRead,
|
||||||
|
job.SourceTable.FromJsonColumns,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -188,12 +190,12 @@ func processMigrationJob(
|
|||||||
|
|
||||||
for range job.MaxLoaders {
|
for range job.MaxLoaders {
|
||||||
wgLoaders.Go(func() {
|
wgLoaders.Go(func() {
|
||||||
loader.Exec(
|
loader.Consume(
|
||||||
localCtx,
|
localCtx,
|
||||||
job.TargetTable,
|
job.TargetTable,
|
||||||
targetColTypes,
|
targetColTypes,
|
||||||
|
job.Retry,
|
||||||
chBatchesTransformed,
|
chBatchesTransformed,
|
||||||
chLoadersErrors,
|
|
||||||
chJobErrors,
|
chJobErrors,
|
||||||
&wgActiveBatches,
|
&wgActiveBatches,
|
||||||
&rowsLoaded,
|
&rowsLoaded,
|
||||||
|
|||||||
@@ -46,6 +46,9 @@ jobs:
|
|||||||
schema: Red
|
schema: Red
|
||||||
table: PUERTO
|
table: PUERTO
|
||||||
primary_key: ID_PUERTO
|
primary_key: ID_PUERTO
|
||||||
|
from_json:
|
||||||
|
- column: $node_id*
|
||||||
|
field: id
|
||||||
target:
|
target:
|
||||||
schema: Red
|
schema: Red
|
||||||
table: PUERTO
|
table: PUERTO
|
||||||
|
|||||||
@@ -43,14 +43,20 @@ type JobConfig struct {
|
|||||||
ToStorage ToStorageConfig `yaml:"to_storage"`
|
ToStorage ToStorageConfig `yaml:"to_storage"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type FromJsonItem struct {
|
||||||
|
Column string `yaml:"column"`
|
||||||
|
Field string `yaml:"field"`
|
||||||
|
}
|
||||||
|
|
||||||
type TableInfo struct {
|
type TableInfo struct {
|
||||||
Schema string `yaml:"schema"`
|
Schema string `yaml:"schema"`
|
||||||
Table string `yaml:"table"`
|
Table string `yaml:"table"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type SourceTableInfo struct {
|
type SourceTableInfo struct {
|
||||||
TableInfo `yaml:",inline"`
|
TableInfo `yaml:",inline"`
|
||||||
PrimaryKey string `yaml:"primary_key"`
|
PrimaryKey string `yaml:"primary_key"`
|
||||||
|
FromJsonColumns []FromJsonItem `yaml:"from_json"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TargetTableInfo struct {
|
type TargetTableInfo struct {
|
||||||
|
|||||||
@@ -6,8 +6,11 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
|
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
mssql "github.com/microsoft/go-mssqldb"
|
mssql "github.com/microsoft/go-mssqldb"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -177,25 +180,55 @@ func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table
|
|||||||
return rowsAffected, nil
|
return rowsAffected, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
|
func buildExtractQueryMssql(q ExtractionQuery) (string, error) {
|
||||||
var sbQuery strings.Builder
|
var sbQuery strings.Builder
|
||||||
|
|
||||||
sbQuery.WriteString("SELECT ")
|
sbQuery.WriteString("SELECT ")
|
||||||
|
|
||||||
if len(q.Columns) == 0 {
|
hasRegularColumns := len(q.Columns) > 0
|
||||||
sbQuery.WriteString("*")
|
hasJsonColumns := len(q.FromJsonColumns) > 0
|
||||||
} else {
|
|
||||||
for i, col := range q.Columns {
|
|
||||||
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
|
|
||||||
|
|
||||||
|
// logrus.Debugf("Extraction query: %+v", q)
|
||||||
|
|
||||||
|
resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns))
|
||||||
|
if hasJsonColumns {
|
||||||
|
for _, jsonConfig := range q.FromJsonColumns {
|
||||||
|
actualColumnName, err := findColumnByPattern(q.Columns, jsonConfig.Column)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
resolvedJson[actualColumnName] = append(resolvedJson[actualColumnName], jsonConfig)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
selectParts := make([]string, 0, len(q.Columns)+len(q.FromJsonColumns))
|
||||||
|
if hasRegularColumns {
|
||||||
|
for _, col := range q.Columns {
|
||||||
|
jsonConfigs, isJsonColumn := resolvedJson[col.Name()]
|
||||||
|
if isJsonColumn {
|
||||||
|
for _, jsonConfig := range jsonConfigs {
|
||||||
|
jsonPath := buildJsonPathMssql(jsonConfig.Field)
|
||||||
|
jsonExpr := fmt.Sprintf("JSON_VALUE([%s], '%s') AS [%s]", col.Name(), jsonPath, col.Name())
|
||||||
|
selectParts = append(selectParts, jsonExpr)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
colExpr := fmt.Sprintf("[%s]", col.Name())
|
||||||
switch col.Type() {
|
switch col.Type() {
|
||||||
case "GEOMETRY":
|
case "GEOMETRY":
|
||||||
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
|
colExpr = fmt.Sprintf("[%s].STAsBinary() AS [%s]", col.Name(), col.Name())
|
||||||
}
|
}
|
||||||
|
selectParts = append(selectParts, colExpr)
|
||||||
|
}
|
||||||
|
} else if !hasJsonColumns {
|
||||||
|
selectParts = append(selectParts, "*")
|
||||||
|
}
|
||||||
|
|
||||||
if i < len(q.Columns)-1 {
|
for i, part := range selectParts {
|
||||||
sbQuery.WriteString(", ")
|
sbQuery.WriteString(part)
|
||||||
}
|
if i < len(selectParts)-1 {
|
||||||
|
sbQuery.WriteString(", ")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -231,9 +264,39 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
|
|||||||
|
|
||||||
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", q.PrimaryKey)
|
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", q.PrimaryKey)
|
||||||
|
|
||||||
queryString := sbQuery.String()
|
return sbQuery.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
// logrus.Debugf("Query: %s", queryString)
|
func findColumnByPattern(columns []models.ColumnType, pattern string) (string, error) {
|
||||||
|
if pattern == "" {
|
||||||
|
return "", fmt.Errorf("column pattern cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
if before, ok := strings.CutSuffix(pattern, "*"); ok {
|
||||||
|
prefix := before
|
||||||
|
for _, col := range columns {
|
||||||
|
if strings.HasPrefix(col.Name(), prefix) {
|
||||||
|
return col.Name(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", fmt.Errorf("no column found matching pattern '%s'", pattern)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, col := range columns {
|
||||||
|
if col.Name() == pattern {
|
||||||
|
return col.Name(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", fmt.Errorf("column '%s' not found in table columns", pattern)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
|
||||||
|
queryString, err := buildExtractQueryMssql(q)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.Debugf("Query: %s", queryString)
|
||||||
|
|
||||||
var queryArgs []any
|
var queryArgs []any
|
||||||
|
|
||||||
@@ -247,3 +310,11 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
|
|||||||
|
|
||||||
return mw.Query(ctx, queryString, queryArgs...)
|
return mw.Query(ctx, queryString, queryArgs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func buildJsonPathMssql(field string) string {
|
||||||
|
if len(field) > 0 && field[0] == '.' {
|
||||||
|
field = field[1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
return "$." + field
|
||||||
|
}
|
||||||
|
|||||||
396
internal/app/db-wrapper/mssql_test.go
Normal file
396
internal/app/db-wrapper/mssql_test.go
Normal file
@@ -0,0 +1,396 @@
|
|||||||
|
package dbwrapper
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBuildExtractQueryMssql_NoJsonColumns(t *testing.T) {
|
||||||
|
q := ExtractionQuery{
|
||||||
|
Schema: "dbo",
|
||||||
|
Table: "Users",
|
||||||
|
PrimaryKey: "ID",
|
||||||
|
Columns: []models.ColumnType{
|
||||||
|
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("Name", true, false, "VARCHAR", "varchar", "VARCHAR", true, 255, 0, 0),
|
||||||
|
},
|
||||||
|
FromJsonColumns: []config.FromJsonItem{},
|
||||||
|
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
}
|
||||||
|
|
||||||
|
query, err := buildExtractQueryMssql(q)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(query, "SELECT [ID], [Name]") {
|
||||||
|
t.Errorf("Expected columns in query, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(query, "FROM [dbo].[Users]") {
|
||||||
|
t.Errorf("Expected FROM clause, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(query, "ORDER BY [ID] ASC") {
|
||||||
|
t.Errorf("Expected ORDER BY clause, got: %s", query)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildExtractQueryMssql_WithJsonColumns_ExactColumnMatch(t *testing.T) {
|
||||||
|
// Test that the actual column name is used as alias, not a generated one
|
||||||
|
q := ExtractionQuery{
|
||||||
|
Schema: "dbo",
|
||||||
|
Table: "Events",
|
||||||
|
PrimaryKey: "EventID",
|
||||||
|
Columns: []models.ColumnType{
|
||||||
|
models.NewColumnType("EventID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
},
|
||||||
|
FromJsonColumns: []config.FromJsonItem{
|
||||||
|
{Column: "EventData", Field: ".userId"},
|
||||||
|
{Column: "EventData", Field: ".timestamp"},
|
||||||
|
},
|
||||||
|
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
}
|
||||||
|
|
||||||
|
query, err := buildExtractQueryMssql(q)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.HasPrefix(query, "SELECT [EventID], JSON_VALUE([EventData], '$.userId') AS [EventData], JSON_VALUE([EventData], '$.timestamp') AS [EventData]") {
|
||||||
|
t.Errorf("Expected JSON columns to replace EventData in-order, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(query, "SELECT [EventID], [EventData]") {
|
||||||
|
t.Errorf("Expected EventData to be replaced by JSON extraction, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Alias should be exactly "EventData", not "EventData_userId"
|
||||||
|
if !strings.Contains(query, "JSON_VALUE([EventData], '$.userId') AS [EventData]") {
|
||||||
|
t.Errorf("Expected JSON alias to be [EventData], got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(query, "JSON_VALUE([EventData], '$.timestamp') AS [EventData]") {
|
||||||
|
t.Errorf("Expected JSON alias to be [EventData], got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should have comma separating them
|
||||||
|
if !strings.Contains(query, "JSON_VALUE([EventData], '$.userId') AS [EventData], JSON_VALUE([EventData], '$.timestamp') AS [EventData]") {
|
||||||
|
t.Errorf("Expected comma-separated JSON values, got: %s", query)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildExtractQueryMssql_WithWildcardPattern(t *testing.T) {
|
||||||
|
// Test that wildcard pattern matching finds the correct column
|
||||||
|
q := ExtractionQuery{
|
||||||
|
Schema: "dbo",
|
||||||
|
Table: "Events",
|
||||||
|
PrimaryKey: "ID",
|
||||||
|
Columns: []models.ColumnType{
|
||||||
|
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("NodeMetadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
},
|
||||||
|
FromJsonColumns: []config.FromJsonItem{
|
||||||
|
{Column: "NodeMeta*", Field: ".id"},
|
||||||
|
},
|
||||||
|
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
}
|
||||||
|
|
||||||
|
query, err := buildExtractQueryMssql(q)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should find "NodeMetadata" from pattern "NodeMeta*" and use it as alias
|
||||||
|
if !strings.Contains(query, "JSON_VALUE([NodeMetadata], '$.id') AS [NodeMetadata]") {
|
||||||
|
t.Errorf("Expected to find and use NodeMetadata column by pattern, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(query, "SELECT [ID], [NodeMetadata]") {
|
||||||
|
t.Errorf("Expected NodeMetadata to be replaced by JSON extraction, got: %s", query)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildExtractQueryMssql_ColumnNotFound_Error(t *testing.T) {
|
||||||
|
// Test that an error is returned when column is not found
|
||||||
|
q := ExtractionQuery{
|
||||||
|
Schema: "dbo",
|
||||||
|
Table: "Events",
|
||||||
|
PrimaryKey: "ID",
|
||||||
|
Columns: []models.ColumnType{
|
||||||
|
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
},
|
||||||
|
FromJsonColumns: []config.FromJsonItem{
|
||||||
|
{Column: "NonExistentColumn", Field: ".id"},
|
||||||
|
},
|
||||||
|
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
}
|
||||||
|
|
||||||
|
query, err := buildExtractQueryMssql(q)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Expected error for missing column, got no error. Query: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(err.Error(), "NonExistentColumn") {
|
||||||
|
t.Errorf("Expected error message to contain column name, got: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildExtractQueryMssql_WildcardPatternNotMatched_Error(t *testing.T) {
|
||||||
|
// Test that an error is returned when wildcard pattern doesn't match any column
|
||||||
|
q := ExtractionQuery{
|
||||||
|
Schema: "dbo",
|
||||||
|
Table: "Events",
|
||||||
|
PrimaryKey: "ID",
|
||||||
|
Columns: []models.ColumnType{
|
||||||
|
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
},
|
||||||
|
FromJsonColumns: []config.FromJsonItem{
|
||||||
|
{Column: "NonMatching*", Field: ".id"},
|
||||||
|
},
|
||||||
|
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
}
|
||||||
|
|
||||||
|
query, err := buildExtractQueryMssql(q)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Expected error for non-matching wildcard pattern, got no error. Query: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(err.Error(), "NonMatching*") {
|
||||||
|
t.Errorf("Expected error message to contain pattern, got: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildExtractQueryMssql_NestedJsonFields(t *testing.T) {
|
||||||
|
q := ExtractionQuery{
|
||||||
|
Schema: "dbo",
|
||||||
|
Table: "Data",
|
||||||
|
PrimaryKey: "ID",
|
||||||
|
Columns: []models.ColumnType{
|
||||||
|
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("NodeData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
},
|
||||||
|
FromJsonColumns: []config.FromJsonItem{
|
||||||
|
{Column: "NodeData", Field: ".user.name"},
|
||||||
|
{Column: "NodeData", Field: ".user.email"},
|
||||||
|
},
|
||||||
|
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
}
|
||||||
|
|
||||||
|
query, err := buildExtractQueryMssql(q)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(query, "JSON_VALUE([NodeData], '$.user.name') AS [NodeData]") {
|
||||||
|
t.Errorf("Expected nested JSON path for user.name, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(query, "JSON_VALUE([NodeData], '$.user.email') AS [NodeData]") {
|
||||||
|
t.Errorf("Expected nested JSON path for user.email, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(query, "SELECT [ID], [NodeData]") {
|
||||||
|
t.Errorf("Expected NodeData to be replaced by JSON extraction, got: %s", query)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildExtractQueryMssql_WithRangeLimits(t *testing.T) {
|
||||||
|
q := ExtractionQuery{
|
||||||
|
Schema: "dbo",
|
||||||
|
Table: "Products",
|
||||||
|
PrimaryKey: "ProductID",
|
||||||
|
Columns: []models.ColumnType{
|
||||||
|
models.NewColumnType("ProductID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("Details", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
},
|
||||||
|
FromJsonColumns: []config.FromJsonItem{
|
||||||
|
{Column: "Details", Field: ".price"},
|
||||||
|
},
|
||||||
|
LowerLimit: ExtractorQueryLimit{IsValid: true, IsInclusive: true, Value: 100},
|
||||||
|
UpperLimit: ExtractorQueryLimit{IsValid: true, IsInclusive: false, Value: 500},
|
||||||
|
}
|
||||||
|
|
||||||
|
query, err := buildExtractQueryMssql(q)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(query, "WHERE [ProductID] >= @min") {
|
||||||
|
t.Errorf("Expected WHERE clause with >=, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(query, "[ProductID] < @max") {
|
||||||
|
t.Errorf("Expected upper limit with <, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(query, "JSON_VALUE([Details], '$.price') AS [Details]") {
|
||||||
|
t.Errorf("Expected JSON_VALUE for Details, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(query, "SELECT [ProductID], [Details]") {
|
||||||
|
t.Errorf("Expected Details to be replaced by JSON extraction, got: %s", query)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildJsonPathMssql(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{".id", "$.id"},
|
||||||
|
{"id", "$.id"},
|
||||||
|
{".user.name", "$.user.name"},
|
||||||
|
{"user.name", "$.user.name"},
|
||||||
|
{".location.coordinates.lat", "$.location.coordinates.lat"},
|
||||||
|
{"", "$."},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
result := buildJsonPathMssql(tt.input)
|
||||||
|
if result != tt.expected {
|
||||||
|
t.Errorf("buildJsonPathMssql(%q) = %q, want %q", tt.input, result, tt.expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFindColumnByPattern_ExactMatch(t *testing.T) {
|
||||||
|
columns := []models.ColumnType{
|
||||||
|
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := findColumnByPattern(columns, "Metadata")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if result != "Metadata" {
|
||||||
|
t.Errorf("Expected 'Metadata', got '%s'", result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFindColumnByPattern_WildcardMatch(t *testing.T) {
|
||||||
|
columns := []models.ColumnType{
|
||||||
|
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("NodeMetadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := findColumnByPattern(columns, "NodeMeta*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if result != "NodeMetadata" {
|
||||||
|
t.Errorf("Expected 'NodeMetadata', got '%s'", result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFindColumnByPattern_NotFound(t *testing.T) {
|
||||||
|
columns := []models.ColumnType{
|
||||||
|
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := findColumnByPattern(columns, "NonExistent")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Expected error, got no error. Result: %s", result)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(err.Error(), "NonExistent") {
|
||||||
|
t.Errorf("Expected error to contain column name, got: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFindColumnByPattern_WildcardNotFound(t *testing.T) {
|
||||||
|
columns := []models.ColumnType{
|
||||||
|
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := findColumnByPattern(columns, "Event*")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Expected error, got no error. Result: %s", result)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(err.Error(), "Event*") {
|
||||||
|
t.Errorf("Expected error to contain pattern, got: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildExtractQueryMssql_OnlyJsonColumns(t *testing.T) {
|
||||||
|
// Test when all columns are used via JSON extraction
|
||||||
|
q := ExtractionQuery{
|
||||||
|
Schema: "dbo",
|
||||||
|
Table: "Data",
|
||||||
|
PrimaryKey: "ID",
|
||||||
|
Columns: []models.ColumnType{
|
||||||
|
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("JsonData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||||
|
},
|
||||||
|
FromJsonColumns: []config.FromJsonItem{
|
||||||
|
{Column: "JsonData", Field: ".field1"},
|
||||||
|
},
|
||||||
|
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
}
|
||||||
|
|
||||||
|
query, err := buildExtractQueryMssql(q)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.HasPrefix(query, "SELECT [ID], JSON_VALUE([JsonData], '$.field1') AS [JsonData]") {
|
||||||
|
t.Errorf("Expected JsonData to be replaced by JSON extraction, got: %s", query)
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(query, "SELECT [ID], [JsonData]") {
|
||||||
|
t.Errorf("Expected JsonData to be excluded from raw selection, got: %s", query)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildExtractQueryMssql_JsonColumnsReplaceInOrder(t *testing.T) {
|
||||||
|
q := ExtractionQuery{
|
||||||
|
Schema: "dbo",
|
||||||
|
Table: "Users",
|
||||||
|
PrimaryKey: "UserID",
|
||||||
|
Columns: []models.ColumnType{
|
||||||
|
models.NewColumnType("UserID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||||
|
models.NewColumnType("Name", true, false, "VARCHAR", "varchar", "VARCHAR", false, 255, 0, 0),
|
||||||
|
models.NewColumnType("Email", true, false, "VARCHAR", "varchar", "VARCHAR", false, 255, 0, 0),
|
||||||
|
models.NewColumnType("Metadata", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0),
|
||||||
|
models.NewColumnType("Profile", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0),
|
||||||
|
models.NewColumnType("Settings", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0),
|
||||||
|
},
|
||||||
|
FromJsonColumns: []config.FromJsonItem{
|
||||||
|
{Column: "Metadata", Field: ".id"},
|
||||||
|
{Column: "Profile", Field: ".id"},
|
||||||
|
{Column: "Settings", Field: ".id"},
|
||||||
|
},
|
||||||
|
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||||
|
}
|
||||||
|
|
||||||
|
query, err := buildExtractQueryMssql(q)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := "SELECT [UserID], [Name], [Email], JSON_VALUE([Metadata], '$.id') AS [Metadata], JSON_VALUE([Profile], '$.id') AS [Profile], JSON_VALUE([Settings], '$.id') AS [Settings] FROM [dbo].[Users] ORDER BY [UserID] ASC"
|
||||||
|
if query != expected {
|
||||||
|
t.Errorf("Unexpected query.\nExpected: %s\nGot: %s", expected, query)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -33,12 +34,13 @@ type ExtractorQueryLimit struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type ExtractionQuery struct {
|
type ExtractionQuery struct {
|
||||||
Schema string
|
Schema string
|
||||||
Table string
|
Table string
|
||||||
PrimaryKey string
|
PrimaryKey string
|
||||||
Columns []models.ColumnType
|
Columns []models.ColumnType
|
||||||
LowerLimit ExtractorQueryLimit
|
LowerLimit ExtractorQueryLimit
|
||||||
UpperLimit ExtractorQueryLimit
|
UpperLimit ExtractorQueryLimit
|
||||||
|
FromJsonColumns []config.FromJsonItem
|
||||||
}
|
}
|
||||||
|
|
||||||
type DbWrapper interface {
|
type DbWrapper interface {
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ func (ex *GenericExtractor) Consume(
|
|||||||
chErrorsOut chan<- custom_errors.JobError,
|
chErrorsOut chan<- custom_errors.JobError,
|
||||||
wgActivePartitions *sync.WaitGroup,
|
wgActivePartitions *sync.WaitGroup,
|
||||||
rowsRead *int64,
|
rowsRead *int64,
|
||||||
|
fromJsonColumns []config.FromJsonItem,
|
||||||
) {
|
) {
|
||||||
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
||||||
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
|
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
|
||||||
@@ -65,6 +66,7 @@ func (ex *GenericExtractor) Consume(
|
|||||||
indexPrimaryKey,
|
indexPrimaryKey,
|
||||||
retryConfig,
|
retryConfig,
|
||||||
chBatchesOut,
|
chBatchesOut,
|
||||||
|
fromJsonColumns,
|
||||||
)
|
)
|
||||||
wgActivePartitions.Done()
|
wgActivePartitions.Done()
|
||||||
|
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
|
|||||||
indexPrimaryKey int,
|
indexPrimaryKey int,
|
||||||
retryConfig config.RetryConfig,
|
retryConfig config.RetryConfig,
|
||||||
chBatchesOut chan<- models.Batch,
|
chBatchesOut chan<- models.Batch,
|
||||||
|
fromJsonColumns []config.FromJsonItem,
|
||||||
) (int64, error) {
|
) (int64, error) {
|
||||||
var totalRowsRead int64
|
var totalRowsRead int64
|
||||||
currentParitition := partition
|
currentParitition := partition
|
||||||
@@ -35,6 +36,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
|
|||||||
currentParitition,
|
currentParitition,
|
||||||
indexPrimaryKey,
|
indexPrimaryKey,
|
||||||
chBatchesOut,
|
chBatchesOut,
|
||||||
|
fromJsonColumns,
|
||||||
)
|
)
|
||||||
// logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table)
|
// logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table)
|
||||||
totalRowsRead += rowsRead
|
totalRowsRead += rowsRead
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ func (ex *GenericExtractor) ProcessPartition(
|
|||||||
partition models.Partition,
|
partition models.Partition,
|
||||||
indexPrimaryKey int,
|
indexPrimaryKey int,
|
||||||
chBatchesOut chan<- models.Batch,
|
chBatchesOut chan<- models.Batch,
|
||||||
|
fromJsonColumns []config.FromJsonItem,
|
||||||
) (int64, error) {
|
) (int64, error) {
|
||||||
query := dbwrapper.ExtractionQuery{
|
query := dbwrapper.ExtractionQuery{
|
||||||
Schema: tableInfo.Schema,
|
Schema: tableInfo.Schema,
|
||||||
@@ -63,6 +64,7 @@ func (ex *GenericExtractor) ProcessPartition(
|
|||||||
IsInclusive: partition.Range.IsMaxInclusive,
|
IsInclusive: partition.Range.IsMaxInclusive,
|
||||||
Value: partition.Range.Max,
|
Value: partition.Range.Max,
|
||||||
},
|
},
|
||||||
|
FromJsonColumns: fromJsonColumns,
|
||||||
}
|
}
|
||||||
|
|
||||||
// logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)
|
// logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ func (gl *GenericLoader) Consume(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
|
||||||
wgActiveBatches.Done()
|
wgActiveBatches.Done()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -1,116 +1,13 @@
|
|||||||
package loaders
|
package loaders
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
||||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
||||||
"github.com/jackc/pgx/v5/pgconn"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type GenericLoader struct {
|
type GenericLoader struct {
|
||||||
db dbwrapper.DbWrapper
|
db dbwrapper.DbWrapper
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
|
func NewGenericLoader(db dbwrapper.DbWrapper) GenericLoader {
|
||||||
return &GenericLoader{db: db}
|
return GenericLoader{db: db}
|
||||||
}
|
|
||||||
|
|
||||||
func (gl *GenericLoader) ProcessBatch(
|
|
||||||
ctx context.Context,
|
|
||||||
tableInfo config.TargetTableInfo,
|
|
||||||
colNames []string,
|
|
||||||
batch models.Batch,
|
|
||||||
) (int, error) {
|
|
||||||
_, err := gl.db.SaveMassive(
|
|
||||||
ctx,
|
|
||||||
tableInfo.Schema,
|
|
||||||
tableInfo.Table,
|
|
||||||
colNames,
|
|
||||||
batch.Rows,
|
|
||||||
)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok {
|
|
||||||
if pgErr.Code == "23505" {
|
|
||||||
return 0, &custom_errors.JobError{
|
|
||||||
ShouldCancelJob: true,
|
|
||||||
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
|
|
||||||
Prev: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
|
|
||||||
}
|
|
||||||
|
|
||||||
return len(batch.Rows), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (gl *GenericLoader) Exec(
|
|
||||||
ctx context.Context,
|
|
||||||
tableInfo config.TargetTableInfo,
|
|
||||||
columns []models.ColumnType,
|
|
||||||
chBatchesIn <-chan models.Batch,
|
|
||||||
chErrorsOut chan<- custom_errors.LoaderError,
|
|
||||||
chJobErrorsOut chan<- custom_errors.JobError,
|
|
||||||
wgActiveBatches *sync.WaitGroup,
|
|
||||||
rowsLoaded *int64,
|
|
||||||
) {
|
|
||||||
colNames := mapSlice(columns, func(col models.ColumnType) string {
|
|
||||||
return col.Name()
|
|
||||||
})
|
|
||||||
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case batch, ok := <-chBatchesIn:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
var ldError *custom_errors.LoaderError
|
|
||||||
var jobError *custom_errors.JobError
|
|
||||||
if errors.As(err, &ldError) {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case chErrorsOut <- *ldError:
|
|
||||||
}
|
|
||||||
} else if errors.As(err, &jobError) {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case chJobErrorsOut <- *jobError:
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
wgActiveBatches.Done()
|
|
||||||
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
50
internal/app/etl/loaders/process-with-retries.go
Normal file
50
internal/app/etl/loaders/process-with-retries.go
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package loaders
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (gl *GenericLoader) ProcessBatchWithRetries(
|
||||||
|
ctx context.Context,
|
||||||
|
tableInfo config.TargetTableInfo,
|
||||||
|
colNames []string,
|
||||||
|
retryConfig config.RetryConfig,
|
||||||
|
batch models.Batch,
|
||||||
|
) (int64, error) {
|
||||||
|
for {
|
||||||
|
rowsLoaded, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||||
|
if err == nil {
|
||||||
|
return rowsLoaded, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if btError, ok := errors.AsType[*custom_errors.LoaderError](err); ok {
|
||||||
|
batch.RetryCounter++
|
||||||
|
|
||||||
|
if batch.RetryCounter >= retryConfig.Attempts {
|
||||||
|
return rowsLoaded, &custom_errors.JobError{
|
||||||
|
ShouldCancelJob: false,
|
||||||
|
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", btError.Batch.Id, btError.Batch.RetryCounter),
|
||||||
|
Prev: btError,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
delay := custom_errors.ComputeBackoffDelay(
|
||||||
|
batch.RetryCounter,
|
||||||
|
retryConfig.BaseDelayMs,
|
||||||
|
retryConfig.MaxDelayMs,
|
||||||
|
retryConfig.MaxJitterMs,
|
||||||
|
)
|
||||||
|
time.Sleep(delay)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return rowsLoaded, err
|
||||||
|
}
|
||||||
|
}
|
||||||
43
internal/app/etl/loaders/process.go
Normal file
43
internal/app/etl/loaders/process.go
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
package loaders
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (gl *GenericLoader) ProcessBatch(
|
||||||
|
ctx context.Context,
|
||||||
|
tableInfo config.TargetTableInfo,
|
||||||
|
colNames []string,
|
||||||
|
batch models.Batch,
|
||||||
|
) (int64, error) {
|
||||||
|
_, err := gl.db.SaveMassive(
|
||||||
|
ctx,
|
||||||
|
tableInfo.Schema,
|
||||||
|
tableInfo.Table,
|
||||||
|
colNames,
|
||||||
|
batch.Rows,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok {
|
||||||
|
if pgErr.Code == "23505" {
|
||||||
|
return 0, &custom_errors.JobError{
|
||||||
|
ShouldCancelJob: true,
|
||||||
|
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
|
||||||
|
Prev: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
|
||||||
|
}
|
||||||
|
|
||||||
|
return int64(len(batch.Rows)), nil
|
||||||
|
}
|
||||||
@@ -51,18 +51,7 @@ type Loader interface {
|
|||||||
tableInfo config.TargetTableInfo,
|
tableInfo config.TargetTableInfo,
|
||||||
colNames []string,
|
colNames []string,
|
||||||
batch models.Batch,
|
batch models.Batch,
|
||||||
) (int, error)
|
) (int64, error)
|
||||||
|
|
||||||
Exec(
|
|
||||||
ctx context.Context,
|
|
||||||
tableInfo config.TargetTableInfo,
|
|
||||||
columns []models.ColumnType,
|
|
||||||
chBatchesIn <-chan models.Batch,
|
|
||||||
chErrorsOut chan<- custom_errors.LoaderError,
|
|
||||||
chJobErrorsOut chan<- custom_errors.JobError,
|
|
||||||
wgActiveBatches *sync.WaitGroup,
|
|
||||||
rowsLoaded *int64,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type TableAnalyzer interface {
|
type TableAnalyzer interface {
|
||||||
|
|||||||
Reference in New Issue
Block a user