Compare commits
3 Commits
80babf24f2
...
c4e233401b
| Author | SHA1 | Date | |
|---|---|---|---|
|
c4e233401b
|
|||
|
a216a8016f
|
|||
|
46ddd0d6b7
|
@@ -13,6 +13,7 @@ import (
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
@@ -43,7 +44,7 @@ func processMigrationJob(
|
||||
targetTableAnalyzer etl.TableAnalyzer,
|
||||
extractor extractors.GenericExtractor,
|
||||
azureClient *azure.Client,
|
||||
loader etl.Loader,
|
||||
loader loaders.GenericLoader,
|
||||
job config.Job,
|
||||
targetDbType string,
|
||||
) models.JobResult {
|
||||
@@ -158,6 +159,7 @@ func processMigrationJob(
|
||||
chJobErrors,
|
||||
&wgActivePartitions,
|
||||
&rowsRead,
|
||||
job.SourceTable.FromJsonColumns,
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -188,12 +190,12 @@ func processMigrationJob(
|
||||
|
||||
for range job.MaxLoaders {
|
||||
wgLoaders.Go(func() {
|
||||
loader.Exec(
|
||||
loader.Consume(
|
||||
localCtx,
|
||||
job.TargetTable,
|
||||
targetColTypes,
|
||||
job.Retry,
|
||||
chBatchesTransformed,
|
||||
chLoadersErrors,
|
||||
chJobErrors,
|
||||
&wgActiveBatches,
|
||||
&rowsLoaded,
|
||||
|
||||
@@ -46,6 +46,9 @@ jobs:
|
||||
schema: Red
|
||||
table: PUERTO
|
||||
primary_key: ID_PUERTO
|
||||
from_json:
|
||||
- column: $node_id*
|
||||
field: id
|
||||
target:
|
||||
schema: Red
|
||||
table: PUERTO
|
||||
|
||||
@@ -43,6 +43,11 @@ type JobConfig struct {
|
||||
ToStorage ToStorageConfig `yaml:"to_storage"`
|
||||
}
|
||||
|
||||
type FromJsonItem struct {
|
||||
Column string `yaml:"column"`
|
||||
Field string `yaml:"field"`
|
||||
}
|
||||
|
||||
type TableInfo struct {
|
||||
Schema string `yaml:"schema"`
|
||||
Table string `yaml:"table"`
|
||||
@@ -51,6 +56,7 @@ type TableInfo struct {
|
||||
type SourceTableInfo struct {
|
||||
TableInfo `yaml:",inline"`
|
||||
PrimaryKey string `yaml:"primary_key"`
|
||||
FromJsonColumns []FromJsonItem `yaml:"from_json"`
|
||||
}
|
||||
|
||||
type TargetTableInfo struct {
|
||||
|
||||
@@ -6,8 +6,11 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
mssql "github.com/microsoft/go-mssqldb"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -177,27 +180,57 @@ func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table
|
||||
return rowsAffected, nil
|
||||
}
|
||||
|
||||
func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
|
||||
func buildExtractQueryMssql(q ExtractionQuery) (string, error) {
|
||||
var sbQuery strings.Builder
|
||||
|
||||
sbQuery.WriteString("SELECT ")
|
||||
|
||||
if len(q.Columns) == 0 {
|
||||
sbQuery.WriteString("*")
|
||||
} else {
|
||||
for i, col := range q.Columns {
|
||||
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
|
||||
hasRegularColumns := len(q.Columns) > 0
|
||||
hasJsonColumns := len(q.FromJsonColumns) > 0
|
||||
|
||||
// logrus.Debugf("Extraction query: %+v", q)
|
||||
|
||||
resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns))
|
||||
if hasJsonColumns {
|
||||
for _, jsonConfig := range q.FromJsonColumns {
|
||||
actualColumnName, err := findColumnByPattern(q.Columns, jsonConfig.Column)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
resolvedJson[actualColumnName] = append(resolvedJson[actualColumnName], jsonConfig)
|
||||
}
|
||||
}
|
||||
|
||||
selectParts := make([]string, 0, len(q.Columns)+len(q.FromJsonColumns))
|
||||
if hasRegularColumns {
|
||||
for _, col := range q.Columns {
|
||||
jsonConfigs, isJsonColumn := resolvedJson[col.Name()]
|
||||
if isJsonColumn {
|
||||
for _, jsonConfig := range jsonConfigs {
|
||||
jsonPath := buildJsonPathMssql(jsonConfig.Field)
|
||||
jsonExpr := fmt.Sprintf("JSON_VALUE([%s], '%s') AS [%s]", col.Name(), jsonPath, col.Name())
|
||||
selectParts = append(selectParts, jsonExpr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
colExpr := fmt.Sprintf("[%s]", col.Name())
|
||||
switch col.Type() {
|
||||
case "GEOMETRY":
|
||||
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
|
||||
colExpr = fmt.Sprintf("[%s].STAsBinary() AS [%s]", col.Name(), col.Name())
|
||||
}
|
||||
selectParts = append(selectParts, colExpr)
|
||||
}
|
||||
} else if !hasJsonColumns {
|
||||
selectParts = append(selectParts, "*")
|
||||
}
|
||||
|
||||
if i < len(q.Columns)-1 {
|
||||
for i, part := range selectParts {
|
||||
sbQuery.WriteString(part)
|
||||
if i < len(selectParts)-1 {
|
||||
sbQuery.WriteString(", ")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", q.Schema, q.Table)
|
||||
|
||||
@@ -231,9 +264,39 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
|
||||
|
||||
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", q.PrimaryKey)
|
||||
|
||||
queryString := sbQuery.String()
|
||||
return sbQuery.String(), nil
|
||||
}
|
||||
|
||||
// logrus.Debugf("Query: %s", queryString)
|
||||
func findColumnByPattern(columns []models.ColumnType, pattern string) (string, error) {
|
||||
if pattern == "" {
|
||||
return "", fmt.Errorf("column pattern cannot be empty")
|
||||
}
|
||||
|
||||
if before, ok := strings.CutSuffix(pattern, "*"); ok {
|
||||
prefix := before
|
||||
for _, col := range columns {
|
||||
if strings.HasPrefix(col.Name(), prefix) {
|
||||
return col.Name(), nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("no column found matching pattern '%s'", pattern)
|
||||
}
|
||||
|
||||
for _, col := range columns {
|
||||
if col.Name() == pattern {
|
||||
return col.Name(), nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("column '%s' not found in table columns", pattern)
|
||||
}
|
||||
|
||||
func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
|
||||
queryString, err := buildExtractQueryMssql(q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logrus.Debugf("Query: %s", queryString)
|
||||
|
||||
var queryArgs []any
|
||||
|
||||
@@ -247,3 +310,11 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
|
||||
|
||||
return mw.Query(ctx, queryString, queryArgs...)
|
||||
}
|
||||
|
||||
func buildJsonPathMssql(field string) string {
|
||||
if len(field) > 0 && field[0] == '.' {
|
||||
field = field[1:]
|
||||
}
|
||||
|
||||
return "$." + field
|
||||
}
|
||||
|
||||
396
internal/app/db-wrapper/mssql_test.go
Normal file
396
internal/app/db-wrapper/mssql_test.go
Normal file
@@ -0,0 +1,396 @@
|
||||
package dbwrapper
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
func TestBuildExtractQueryMssql_NoJsonColumns(t *testing.T) {
|
||||
q := ExtractionQuery{
|
||||
Schema: "dbo",
|
||||
Table: "Users",
|
||||
PrimaryKey: "ID",
|
||||
Columns: []models.ColumnType{
|
||||
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("Name", true, false, "VARCHAR", "varchar", "VARCHAR", true, 255, 0, 0),
|
||||
},
|
||||
FromJsonColumns: []config.FromJsonItem{},
|
||||
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||
}
|
||||
|
||||
query, err := buildExtractQueryMssql(q)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got: %v", err)
|
||||
}
|
||||
|
||||
if !strings.Contains(query, "SELECT [ID], [Name]") {
|
||||
t.Errorf("Expected columns in query, got: %s", query)
|
||||
}
|
||||
|
||||
if !strings.Contains(query, "FROM [dbo].[Users]") {
|
||||
t.Errorf("Expected FROM clause, got: %s", query)
|
||||
}
|
||||
|
||||
if !strings.Contains(query, "ORDER BY [ID] ASC") {
|
||||
t.Errorf("Expected ORDER BY clause, got: %s", query)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildExtractQueryMssql_WithJsonColumns_ExactColumnMatch(t *testing.T) {
|
||||
// Test that the actual column name is used as alias, not a generated one
|
||||
q := ExtractionQuery{
|
||||
Schema: "dbo",
|
||||
Table: "Events",
|
||||
PrimaryKey: "EventID",
|
||||
Columns: []models.ColumnType{
|
||||
models.NewColumnType("EventID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
},
|
||||
FromJsonColumns: []config.FromJsonItem{
|
||||
{Column: "EventData", Field: ".userId"},
|
||||
{Column: "EventData", Field: ".timestamp"},
|
||||
},
|
||||
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||
}
|
||||
|
||||
query, err := buildExtractQueryMssql(q)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got: %v", err)
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(query, "SELECT [EventID], JSON_VALUE([EventData], '$.userId') AS [EventData], JSON_VALUE([EventData], '$.timestamp') AS [EventData]") {
|
||||
t.Errorf("Expected JSON columns to replace EventData in-order, got: %s", query)
|
||||
}
|
||||
|
||||
if strings.Contains(query, "SELECT [EventID], [EventData]") {
|
||||
t.Errorf("Expected EventData to be replaced by JSON extraction, got: %s", query)
|
||||
}
|
||||
|
||||
// Alias should be exactly "EventData", not "EventData_userId"
|
||||
if !strings.Contains(query, "JSON_VALUE([EventData], '$.userId') AS [EventData]") {
|
||||
t.Errorf("Expected JSON alias to be [EventData], got: %s", query)
|
||||
}
|
||||
|
||||
if !strings.Contains(query, "JSON_VALUE([EventData], '$.timestamp') AS [EventData]") {
|
||||
t.Errorf("Expected JSON alias to be [EventData], got: %s", query)
|
||||
}
|
||||
|
||||
// Should have comma separating them
|
||||
if !strings.Contains(query, "JSON_VALUE([EventData], '$.userId') AS [EventData], JSON_VALUE([EventData], '$.timestamp') AS [EventData]") {
|
||||
t.Errorf("Expected comma-separated JSON values, got: %s", query)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildExtractQueryMssql_WithWildcardPattern(t *testing.T) {
|
||||
// Test that wildcard pattern matching finds the correct column
|
||||
q := ExtractionQuery{
|
||||
Schema: "dbo",
|
||||
Table: "Events",
|
||||
PrimaryKey: "ID",
|
||||
Columns: []models.ColumnType{
|
||||
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("NodeMetadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
},
|
||||
FromJsonColumns: []config.FromJsonItem{
|
||||
{Column: "NodeMeta*", Field: ".id"},
|
||||
},
|
||||
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||
}
|
||||
|
||||
query, err := buildExtractQueryMssql(q)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got: %v", err)
|
||||
}
|
||||
|
||||
// Should find "NodeMetadata" from pattern "NodeMeta*" and use it as alias
|
||||
if !strings.Contains(query, "JSON_VALUE([NodeMetadata], '$.id') AS [NodeMetadata]") {
|
||||
t.Errorf("Expected to find and use NodeMetadata column by pattern, got: %s", query)
|
||||
}
|
||||
|
||||
if strings.Contains(query, "SELECT [ID], [NodeMetadata]") {
|
||||
t.Errorf("Expected NodeMetadata to be replaced by JSON extraction, got: %s", query)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildExtractQueryMssql_ColumnNotFound_Error(t *testing.T) {
|
||||
// Test that an error is returned when column is not found
|
||||
q := ExtractionQuery{
|
||||
Schema: "dbo",
|
||||
Table: "Events",
|
||||
PrimaryKey: "ID",
|
||||
Columns: []models.ColumnType{
|
||||
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
},
|
||||
FromJsonColumns: []config.FromJsonItem{
|
||||
{Column: "NonExistentColumn", Field: ".id"},
|
||||
},
|
||||
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||
}
|
||||
|
||||
query, err := buildExtractQueryMssql(q)
|
||||
if err == nil {
|
||||
t.Fatalf("Expected error for missing column, got no error. Query: %s", query)
|
||||
}
|
||||
|
||||
if !strings.Contains(err.Error(), "NonExistentColumn") {
|
||||
t.Errorf("Expected error message to contain column name, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildExtractQueryMssql_WildcardPatternNotMatched_Error(t *testing.T) {
|
||||
// Test that an error is returned when wildcard pattern doesn't match any column
|
||||
q := ExtractionQuery{
|
||||
Schema: "dbo",
|
||||
Table: "Events",
|
||||
PrimaryKey: "ID",
|
||||
Columns: []models.ColumnType{
|
||||
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
},
|
||||
FromJsonColumns: []config.FromJsonItem{
|
||||
{Column: "NonMatching*", Field: ".id"},
|
||||
},
|
||||
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||
}
|
||||
|
||||
query, err := buildExtractQueryMssql(q)
|
||||
if err == nil {
|
||||
t.Fatalf("Expected error for non-matching wildcard pattern, got no error. Query: %s", query)
|
||||
}
|
||||
|
||||
if !strings.Contains(err.Error(), "NonMatching*") {
|
||||
t.Errorf("Expected error message to contain pattern, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildExtractQueryMssql_NestedJsonFields(t *testing.T) {
|
||||
q := ExtractionQuery{
|
||||
Schema: "dbo",
|
||||
Table: "Data",
|
||||
PrimaryKey: "ID",
|
||||
Columns: []models.ColumnType{
|
||||
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("NodeData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
},
|
||||
FromJsonColumns: []config.FromJsonItem{
|
||||
{Column: "NodeData", Field: ".user.name"},
|
||||
{Column: "NodeData", Field: ".user.email"},
|
||||
},
|
||||
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||
}
|
||||
|
||||
query, err := buildExtractQueryMssql(q)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got: %v", err)
|
||||
}
|
||||
|
||||
if !strings.Contains(query, "JSON_VALUE([NodeData], '$.user.name') AS [NodeData]") {
|
||||
t.Errorf("Expected nested JSON path for user.name, got: %s", query)
|
||||
}
|
||||
|
||||
if !strings.Contains(query, "JSON_VALUE([NodeData], '$.user.email') AS [NodeData]") {
|
||||
t.Errorf("Expected nested JSON path for user.email, got: %s", query)
|
||||
}
|
||||
|
||||
if strings.Contains(query, "SELECT [ID], [NodeData]") {
|
||||
t.Errorf("Expected NodeData to be replaced by JSON extraction, got: %s", query)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildExtractQueryMssql_WithRangeLimits(t *testing.T) {
|
||||
q := ExtractionQuery{
|
||||
Schema: "dbo",
|
||||
Table: "Products",
|
||||
PrimaryKey: "ProductID",
|
||||
Columns: []models.ColumnType{
|
||||
models.NewColumnType("ProductID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("Details", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
},
|
||||
FromJsonColumns: []config.FromJsonItem{
|
||||
{Column: "Details", Field: ".price"},
|
||||
},
|
||||
LowerLimit: ExtractorQueryLimit{IsValid: true, IsInclusive: true, Value: 100},
|
||||
UpperLimit: ExtractorQueryLimit{IsValid: true, IsInclusive: false, Value: 500},
|
||||
}
|
||||
|
||||
query, err := buildExtractQueryMssql(q)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got: %v", err)
|
||||
}
|
||||
|
||||
if !strings.Contains(query, "WHERE [ProductID] >= @min") {
|
||||
t.Errorf("Expected WHERE clause with >=, got: %s", query)
|
||||
}
|
||||
|
||||
if !strings.Contains(query, "[ProductID] < @max") {
|
||||
t.Errorf("Expected upper limit with <, got: %s", query)
|
||||
}
|
||||
|
||||
if !strings.Contains(query, "JSON_VALUE([Details], '$.price') AS [Details]") {
|
||||
t.Errorf("Expected JSON_VALUE for Details, got: %s", query)
|
||||
}
|
||||
|
||||
if strings.Contains(query, "SELECT [ProductID], [Details]") {
|
||||
t.Errorf("Expected Details to be replaced by JSON extraction, got: %s", query)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildJsonPathMssql(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
expected string
|
||||
}{
|
||||
{".id", "$.id"},
|
||||
{"id", "$.id"},
|
||||
{".user.name", "$.user.name"},
|
||||
{"user.name", "$.user.name"},
|
||||
{".location.coordinates.lat", "$.location.coordinates.lat"},
|
||||
{"", "$."},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
result := buildJsonPathMssql(tt.input)
|
||||
if result != tt.expected {
|
||||
t.Errorf("buildJsonPathMssql(%q) = %q, want %q", tt.input, result, tt.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindColumnByPattern_ExactMatch(t *testing.T) {
|
||||
columns := []models.ColumnType{
|
||||
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
}
|
||||
|
||||
result, err := findColumnByPattern(columns, "Metadata")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got: %v", err)
|
||||
}
|
||||
|
||||
if result != "Metadata" {
|
||||
t.Errorf("Expected 'Metadata', got '%s'", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindColumnByPattern_WildcardMatch(t *testing.T) {
|
||||
columns := []models.ColumnType{
|
||||
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("NodeMetadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
}
|
||||
|
||||
result, err := findColumnByPattern(columns, "NodeMeta*")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got: %v", err)
|
||||
}
|
||||
|
||||
if result != "NodeMetadata" {
|
||||
t.Errorf("Expected 'NodeMetadata', got '%s'", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindColumnByPattern_NotFound(t *testing.T) {
|
||||
columns := []models.ColumnType{
|
||||
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
}
|
||||
|
||||
result, err := findColumnByPattern(columns, "NonExistent")
|
||||
if err == nil {
|
||||
t.Fatalf("Expected error, got no error. Result: %s", result)
|
||||
}
|
||||
|
||||
if !strings.Contains(err.Error(), "NonExistent") {
|
||||
t.Errorf("Expected error to contain column name, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindColumnByPattern_WildcardNotFound(t *testing.T) {
|
||||
columns := []models.ColumnType{
|
||||
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
}
|
||||
|
||||
result, err := findColumnByPattern(columns, "Event*")
|
||||
if err == nil {
|
||||
t.Fatalf("Expected error, got no error. Result: %s", result)
|
||||
}
|
||||
|
||||
if !strings.Contains(err.Error(), "Event*") {
|
||||
t.Errorf("Expected error to contain pattern, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildExtractQueryMssql_OnlyJsonColumns(t *testing.T) {
|
||||
// Test when all columns are used via JSON extraction
|
||||
q := ExtractionQuery{
|
||||
Schema: "dbo",
|
||||
Table: "Data",
|
||||
PrimaryKey: "ID",
|
||||
Columns: []models.ColumnType{
|
||||
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("JsonData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
|
||||
},
|
||||
FromJsonColumns: []config.FromJsonItem{
|
||||
{Column: "JsonData", Field: ".field1"},
|
||||
},
|
||||
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||
}
|
||||
|
||||
query, err := buildExtractQueryMssql(q)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got: %v", err)
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(query, "SELECT [ID], JSON_VALUE([JsonData], '$.field1') AS [JsonData]") {
|
||||
t.Errorf("Expected JsonData to be replaced by JSON extraction, got: %s", query)
|
||||
}
|
||||
|
||||
if strings.Contains(query, "SELECT [ID], [JsonData]") {
|
||||
t.Errorf("Expected JsonData to be excluded from raw selection, got: %s", query)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildExtractQueryMssql_JsonColumnsReplaceInOrder(t *testing.T) {
|
||||
q := ExtractionQuery{
|
||||
Schema: "dbo",
|
||||
Table: "Users",
|
||||
PrimaryKey: "UserID",
|
||||
Columns: []models.ColumnType{
|
||||
models.NewColumnType("UserID", false, false, "INT", "int", "INT", false, 0, 0, 0),
|
||||
models.NewColumnType("Name", true, false, "VARCHAR", "varchar", "VARCHAR", false, 255, 0, 0),
|
||||
models.NewColumnType("Email", true, false, "VARCHAR", "varchar", "VARCHAR", false, 255, 0, 0),
|
||||
models.NewColumnType("Metadata", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0),
|
||||
models.NewColumnType("Profile", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0),
|
||||
models.NewColumnType("Settings", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0),
|
||||
},
|
||||
FromJsonColumns: []config.FromJsonItem{
|
||||
{Column: "Metadata", Field: ".id"},
|
||||
{Column: "Profile", Field: ".id"},
|
||||
{Column: "Settings", Field: ".id"},
|
||||
},
|
||||
LowerLimit: ExtractorQueryLimit{IsValid: false},
|
||||
UpperLimit: ExtractorQueryLimit{IsValid: false},
|
||||
}
|
||||
|
||||
query, err := buildExtractQueryMssql(q)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got: %v", err)
|
||||
}
|
||||
|
||||
expected := "SELECT [UserID], [Name], [Email], JSON_VALUE([Metadata], '$.id') AS [Metadata], JSON_VALUE([Profile], '$.id') AS [Profile], JSON_VALUE([Settings], '$.id') AS [Settings] FROM [dbo].[Users] ORDER BY [UserID] ASC"
|
||||
if query != expected {
|
||||
t.Errorf("Unexpected query.\nExpected: %s\nGot: %s", expected, query)
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
@@ -39,6 +40,7 @@ type ExtractionQuery struct {
|
||||
Columns []models.ColumnType
|
||||
LowerLimit ExtractorQueryLimit
|
||||
UpperLimit ExtractorQueryLimit
|
||||
FromJsonColumns []config.FromJsonItem
|
||||
}
|
||||
|
||||
type DbWrapper interface {
|
||||
|
||||
@@ -25,6 +25,7 @@ func (ex *GenericExtractor) Consume(
|
||||
chErrorsOut chan<- custom_errors.JobError,
|
||||
wgActivePartitions *sync.WaitGroup,
|
||||
rowsRead *int64,
|
||||
fromJsonColumns []config.FromJsonItem,
|
||||
) {
|
||||
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
||||
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
|
||||
@@ -65,6 +66,7 @@ func (ex *GenericExtractor) Consume(
|
||||
indexPrimaryKey,
|
||||
retryConfig,
|
||||
chBatchesOut,
|
||||
fromJsonColumns,
|
||||
)
|
||||
wgActivePartitions.Done()
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
|
||||
indexPrimaryKey int,
|
||||
retryConfig config.RetryConfig,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
fromJsonColumns []config.FromJsonItem,
|
||||
) (int64, error) {
|
||||
var totalRowsRead int64
|
||||
currentParitition := partition
|
||||
@@ -35,6 +36,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
|
||||
currentParitition,
|
||||
indexPrimaryKey,
|
||||
chBatchesOut,
|
||||
fromJsonColumns,
|
||||
)
|
||||
// logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table)
|
||||
totalRowsRead += rowsRead
|
||||
|
||||
@@ -47,6 +47,7 @@ func (ex *GenericExtractor) ProcessPartition(
|
||||
partition models.Partition,
|
||||
indexPrimaryKey int,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
fromJsonColumns []config.FromJsonItem,
|
||||
) (int64, error) {
|
||||
query := dbwrapper.ExtractionQuery{
|
||||
Schema: tableInfo.Schema,
|
||||
@@ -63,6 +64,7 @@ func (ex *GenericExtractor) ProcessPartition(
|
||||
IsInclusive: partition.Range.IsMaxInclusive,
|
||||
Value: partition.Range.Max,
|
||||
},
|
||||
FromJsonColumns: fromJsonColumns,
|
||||
}
|
||||
|
||||
// logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)
|
||||
|
||||
@@ -38,7 +38,7 @@ func (gl *GenericLoader) Consume(
|
||||
return
|
||||
}
|
||||
|
||||
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
|
||||
wgActiveBatches.Done()
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -1,116 +1,13 @@
|
||||
package loaders
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
)
|
||||
|
||||
type GenericLoader struct {
|
||||
db dbwrapper.DbWrapper
|
||||
}
|
||||
|
||||
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
|
||||
return &GenericLoader{db: db}
|
||||
}
|
||||
|
||||
func (gl *GenericLoader) ProcessBatch(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
colNames []string,
|
||||
batch models.Batch,
|
||||
) (int, error) {
|
||||
_, err := gl.db.SaveMassive(
|
||||
ctx,
|
||||
tableInfo.Schema,
|
||||
tableInfo.Table,
|
||||
colNames,
|
||||
batch.Rows,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok {
|
||||
if pgErr.Code == "23505" {
|
||||
return 0, &custom_errors.JobError{
|
||||
ShouldCancelJob: true,
|
||||
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
|
||||
Prev: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
|
||||
}
|
||||
|
||||
return len(batch.Rows), nil
|
||||
}
|
||||
|
||||
func (gl *GenericLoader) Exec(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
columns []models.ColumnType,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chErrorsOut chan<- custom_errors.LoaderError,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
rowsLoaded *int64,
|
||||
) {
|
||||
colNames := mapSlice(columns, func(col models.ColumnType) string {
|
||||
return col.Name()
|
||||
})
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case batch, ok := <-chBatchesIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||
|
||||
if err != nil {
|
||||
var ldError *custom_errors.LoaderError
|
||||
var jobError *custom_errors.JobError
|
||||
if errors.As(err, &ldError) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- *ldError:
|
||||
}
|
||||
} else if errors.As(err, &jobError) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chJobErrorsOut <- *jobError:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}:
|
||||
}
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
wgActiveBatches.Done()
|
||||
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
||||
}
|
||||
}
|
||||
func NewGenericLoader(db dbwrapper.DbWrapper) GenericLoader {
|
||||
return GenericLoader{db: db}
|
||||
}
|
||||
|
||||
50
internal/app/etl/loaders/process-with-retries.go
Normal file
50
internal/app/etl/loaders/process-with-retries.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package loaders
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
func (gl *GenericLoader) ProcessBatchWithRetries(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
colNames []string,
|
||||
retryConfig config.RetryConfig,
|
||||
batch models.Batch,
|
||||
) (int64, error) {
|
||||
for {
|
||||
rowsLoaded, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||
if err == nil {
|
||||
return rowsLoaded, nil
|
||||
}
|
||||
|
||||
if btError, ok := errors.AsType[*custom_errors.LoaderError](err); ok {
|
||||
batch.RetryCounter++
|
||||
|
||||
if batch.RetryCounter >= retryConfig.Attempts {
|
||||
return rowsLoaded, &custom_errors.JobError{
|
||||
ShouldCancelJob: false,
|
||||
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", btError.Batch.Id, btError.Batch.RetryCounter),
|
||||
Prev: btError,
|
||||
}
|
||||
}
|
||||
|
||||
delay := custom_errors.ComputeBackoffDelay(
|
||||
batch.RetryCounter,
|
||||
retryConfig.BaseDelayMs,
|
||||
retryConfig.MaxDelayMs,
|
||||
retryConfig.MaxJitterMs,
|
||||
)
|
||||
time.Sleep(delay)
|
||||
continue
|
||||
}
|
||||
|
||||
return rowsLoaded, err
|
||||
}
|
||||
}
|
||||
43
internal/app/etl/loaders/process.go
Normal file
43
internal/app/etl/loaders/process.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package loaders
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
)
|
||||
|
||||
func (gl *GenericLoader) ProcessBatch(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
colNames []string,
|
||||
batch models.Batch,
|
||||
) (int64, error) {
|
||||
_, err := gl.db.SaveMassive(
|
||||
ctx,
|
||||
tableInfo.Schema,
|
||||
tableInfo.Table,
|
||||
colNames,
|
||||
batch.Rows,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok {
|
||||
if pgErr.Code == "23505" {
|
||||
return 0, &custom_errors.JobError{
|
||||
ShouldCancelJob: true,
|
||||
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
|
||||
Prev: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
|
||||
}
|
||||
|
||||
return int64(len(batch.Rows)), nil
|
||||
}
|
||||
@@ -51,18 +51,7 @@ type Loader interface {
|
||||
tableInfo config.TargetTableInfo,
|
||||
colNames []string,
|
||||
batch models.Batch,
|
||||
) (int, error)
|
||||
|
||||
Exec(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
columns []models.ColumnType,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chErrorsOut chan<- custom_errors.LoaderError,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
rowsLoaded *int64,
|
||||
)
|
||||
) (int64, error)
|
||||
}
|
||||
|
||||
type TableAnalyzer interface {
|
||||
|
||||
Reference in New Issue
Block a user