diff --git a/config.yaml b/config.yaml index a46879c..9c50787 100644 --- a/config.yaml +++ b/config.yaml @@ -46,6 +46,9 @@ jobs: schema: Red table: PUERTO primary_key: ID_PUERTO + from_json: + - column: $node_id* + field: id target: schema: Red table: PUERTO diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index 51baa7c..9400b50 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -43,14 +43,20 @@ type JobConfig struct { ToStorage ToStorageConfig `yaml:"to_storage"` } +type FromJsonItem struct { + Column string `yaml:"column"` + Field string `yaml:"field"` +} + type TableInfo struct { Schema string `yaml:"schema"` Table string `yaml:"table"` } type SourceTableInfo struct { - TableInfo `yaml:",inline"` - PrimaryKey string `yaml:"primary_key"` + TableInfo `yaml:",inline"` + PrimaryKey string `yaml:"primary_key"` + FromJsonConfig []FromJsonItem `yaml:"from_json"` } type TargetTableInfo struct { diff --git a/internal/app/db-wrapper/mssql.go b/internal/app/db-wrapper/mssql.go index 03c9284..f6ef39e 100644 --- a/internal/app/db-wrapper/mssql.go +++ b/internal/app/db-wrapper/mssql.go @@ -6,8 +6,11 @@ import ( "fmt" "strings" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" mssql "github.com/microsoft/go-mssqldb" + "github.com/sirupsen/logrus" ) func init() { @@ -177,25 +180,53 @@ func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table return rowsAffected, nil } -func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) { +func buildExtractQueryMssql(q ExtractionQuery) (string, error) { var sbQuery strings.Builder sbQuery.WriteString("SELECT ") - if len(q.Columns) == 0 { - sbQuery.WriteString("*") - } else { - for i, col := range q.Columns { - fmt.Fprintf(&sbQuery, "[%s]", col.Name()) + hasRegularColumns := len(q.Columns) > 0 + hasJsonColumns := len(q.FromJsonColumns) > 0 + resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns)) + if hasJsonColumns { + for _, jsonConfig := range q.FromJsonColumns { + actualColumnName, err := findColumnByPattern(q.Columns, jsonConfig.Column) + if err != nil { + return "", err + } + resolvedJson[actualColumnName] = append(resolvedJson[actualColumnName], jsonConfig) + } + } + + selectParts := make([]string, 0, len(q.Columns)+len(q.FromJsonColumns)) + if hasRegularColumns { + for _, col := range q.Columns { + jsonConfigs, isJsonColumn := resolvedJson[col.Name()] + if isJsonColumn { + for _, jsonConfig := range jsonConfigs { + jsonPath := buildJsonPathMssql(jsonConfig.Field) + jsonExpr := fmt.Sprintf("JSON_VALUE([%s], '%s') AS [%s]", col.Name(), jsonPath, col.Name()) + selectParts = append(selectParts, jsonExpr) + } + continue + } + + colExpr := fmt.Sprintf("[%s]", col.Name()) switch col.Type() { case "GEOMETRY": - fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name()) + colExpr = fmt.Sprintf("[%s].STAsBinary() AS [%s]", col.Name(), col.Name()) } + selectParts = append(selectParts, colExpr) + } + } else if !hasJsonColumns { + selectParts = append(selectParts, "*") + } - if i < len(q.Columns)-1 { - sbQuery.WriteString(", ") - } + for i, part := range selectParts { + sbQuery.WriteString(part) + if i < len(selectParts)-1 { + sbQuery.WriteString(", ") } } @@ -231,9 +262,39 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", q.PrimaryKey) - queryString := sbQuery.String() + return sbQuery.String(), nil +} - // logrus.Debugf("Query: %s", queryString) +func findColumnByPattern(columns []models.ColumnType, pattern string) (string, error) { + if pattern == "" { + return "", fmt.Errorf("column pattern cannot be empty") + } + + if before, ok := strings.CutSuffix(pattern, "*"); ok { + prefix := before + for _, col := range columns { + if strings.HasPrefix(col.Name(), prefix) { + return col.Name(), nil + } + } + return "", fmt.Errorf("no column found matching pattern '%s'", pattern) + } + + for _, col := range columns { + if col.Name() == pattern { + return col.Name(), nil + } + } + return "", fmt.Errorf("column '%s' not found in table columns", pattern) +} + +func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) { + queryString, err := buildExtractQueryMssql(q) + if err != nil { + return nil, err + } + + logrus.Debugf("Query: %s", queryString) var queryArgs []any @@ -247,3 +308,11 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery return mw.Query(ctx, queryString, queryArgs...) } + +func buildJsonPathMssql(field string) string { + if len(field) > 0 && field[0] == '.' { + field = field[1:] + } + + return "$." + field +} diff --git a/internal/app/db-wrapper/mssql_test.go b/internal/app/db-wrapper/mssql_test.go new file mode 100644 index 0000000..d7abf0e --- /dev/null +++ b/internal/app/db-wrapper/mssql_test.go @@ -0,0 +1,396 @@ +package dbwrapper + +import ( + "strings" + "testing" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +func TestBuildExtractQueryMssql_NoJsonColumns(t *testing.T) { + q := ExtractionQuery{ + Schema: "dbo", + Table: "Users", + PrimaryKey: "ID", + Columns: []models.ColumnType{ + models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("Name", true, false, "VARCHAR", "varchar", "VARCHAR", true, 255, 0, 0), + }, + FromJsonColumns: []config.FromJsonItem{}, + LowerLimit: ExtractorQueryLimit{IsValid: false}, + UpperLimit: ExtractorQueryLimit{IsValid: false}, + } + + query, err := buildExtractQueryMssql(q) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if !strings.Contains(query, "SELECT [ID], [Name]") { + t.Errorf("Expected columns in query, got: %s", query) + } + + if !strings.Contains(query, "FROM [dbo].[Users]") { + t.Errorf("Expected FROM clause, got: %s", query) + } + + if !strings.Contains(query, "ORDER BY [ID] ASC") { + t.Errorf("Expected ORDER BY clause, got: %s", query) + } +} + +func TestBuildExtractQueryMssql_WithJsonColumns_ExactColumnMatch(t *testing.T) { + // Test that the actual column name is used as alias, not a generated one + q := ExtractionQuery{ + Schema: "dbo", + Table: "Events", + PrimaryKey: "EventID", + Columns: []models.ColumnType{ + models.NewColumnType("EventID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + }, + FromJsonColumns: []config.FromJsonItem{ + {Column: "EventData", Field: ".userId"}, + {Column: "EventData", Field: ".timestamp"}, + }, + LowerLimit: ExtractorQueryLimit{IsValid: false}, + UpperLimit: ExtractorQueryLimit{IsValid: false}, + } + + query, err := buildExtractQueryMssql(q) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if !strings.HasPrefix(query, "SELECT [EventID], JSON_VALUE([EventData], '$.userId') AS [EventData], JSON_VALUE([EventData], '$.timestamp') AS [EventData]") { + t.Errorf("Expected JSON columns to replace EventData in-order, got: %s", query) + } + + if strings.Contains(query, "SELECT [EventID], [EventData]") { + t.Errorf("Expected EventData to be replaced by JSON extraction, got: %s", query) + } + + // Alias should be exactly "EventData", not "EventData_userId" + if !strings.Contains(query, "JSON_VALUE([EventData], '$.userId') AS [EventData]") { + t.Errorf("Expected JSON alias to be [EventData], got: %s", query) + } + + if !strings.Contains(query, "JSON_VALUE([EventData], '$.timestamp') AS [EventData]") { + t.Errorf("Expected JSON alias to be [EventData], got: %s", query) + } + + // Should have comma separating them + if !strings.Contains(query, "JSON_VALUE([EventData], '$.userId') AS [EventData], JSON_VALUE([EventData], '$.timestamp') AS [EventData]") { + t.Errorf("Expected comma-separated JSON values, got: %s", query) + } +} + +func TestBuildExtractQueryMssql_WithWildcardPattern(t *testing.T) { + // Test that wildcard pattern matching finds the correct column + q := ExtractionQuery{ + Schema: "dbo", + Table: "Events", + PrimaryKey: "ID", + Columns: []models.ColumnType{ + models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("NodeMetadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + }, + FromJsonColumns: []config.FromJsonItem{ + {Column: "NodeMeta*", Field: ".id"}, + }, + LowerLimit: ExtractorQueryLimit{IsValid: false}, + UpperLimit: ExtractorQueryLimit{IsValid: false}, + } + + query, err := buildExtractQueryMssql(q) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + // Should find "NodeMetadata" from pattern "NodeMeta*" and use it as alias + if !strings.Contains(query, "JSON_VALUE([NodeMetadata], '$.id') AS [NodeMetadata]") { + t.Errorf("Expected to find and use NodeMetadata column by pattern, got: %s", query) + } + + if strings.Contains(query, "SELECT [ID], [NodeMetadata]") { + t.Errorf("Expected NodeMetadata to be replaced by JSON extraction, got: %s", query) + } +} + +func TestBuildExtractQueryMssql_ColumnNotFound_Error(t *testing.T) { + // Test that an error is returned when column is not found + q := ExtractionQuery{ + Schema: "dbo", + Table: "Events", + PrimaryKey: "ID", + Columns: []models.ColumnType{ + models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0), + }, + FromJsonColumns: []config.FromJsonItem{ + {Column: "NonExistentColumn", Field: ".id"}, + }, + LowerLimit: ExtractorQueryLimit{IsValid: false}, + UpperLimit: ExtractorQueryLimit{IsValid: false}, + } + + query, err := buildExtractQueryMssql(q) + if err == nil { + t.Fatalf("Expected error for missing column, got no error. Query: %s", query) + } + + if !strings.Contains(err.Error(), "NonExistentColumn") { + t.Errorf("Expected error message to contain column name, got: %v", err) + } +} + +func TestBuildExtractQueryMssql_WildcardPatternNotMatched_Error(t *testing.T) { + // Test that an error is returned when wildcard pattern doesn't match any column + q := ExtractionQuery{ + Schema: "dbo", + Table: "Events", + PrimaryKey: "ID", + Columns: []models.ColumnType{ + models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + }, + FromJsonColumns: []config.FromJsonItem{ + {Column: "NonMatching*", Field: ".id"}, + }, + LowerLimit: ExtractorQueryLimit{IsValid: false}, + UpperLimit: ExtractorQueryLimit{IsValid: false}, + } + + query, err := buildExtractQueryMssql(q) + if err == nil { + t.Fatalf("Expected error for non-matching wildcard pattern, got no error. Query: %s", query) + } + + if !strings.Contains(err.Error(), "NonMatching*") { + t.Errorf("Expected error message to contain pattern, got: %v", err) + } +} + +func TestBuildExtractQueryMssql_NestedJsonFields(t *testing.T) { + q := ExtractionQuery{ + Schema: "dbo", + Table: "Data", + PrimaryKey: "ID", + Columns: []models.ColumnType{ + models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("NodeData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + }, + FromJsonColumns: []config.FromJsonItem{ + {Column: "NodeData", Field: ".user.name"}, + {Column: "NodeData", Field: ".user.email"}, + }, + LowerLimit: ExtractorQueryLimit{IsValid: false}, + UpperLimit: ExtractorQueryLimit{IsValid: false}, + } + + query, err := buildExtractQueryMssql(q) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if !strings.Contains(query, "JSON_VALUE([NodeData], '$.user.name') AS [NodeData]") { + t.Errorf("Expected nested JSON path for user.name, got: %s", query) + } + + if !strings.Contains(query, "JSON_VALUE([NodeData], '$.user.email') AS [NodeData]") { + t.Errorf("Expected nested JSON path for user.email, got: %s", query) + } + + if strings.Contains(query, "SELECT [ID], [NodeData]") { + t.Errorf("Expected NodeData to be replaced by JSON extraction, got: %s", query) + } +} + +func TestBuildExtractQueryMssql_WithRangeLimits(t *testing.T) { + q := ExtractionQuery{ + Schema: "dbo", + Table: "Products", + PrimaryKey: "ProductID", + Columns: []models.ColumnType{ + models.NewColumnType("ProductID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("Details", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + }, + FromJsonColumns: []config.FromJsonItem{ + {Column: "Details", Field: ".price"}, + }, + LowerLimit: ExtractorQueryLimit{IsValid: true, IsInclusive: true, Value: 100}, + UpperLimit: ExtractorQueryLimit{IsValid: true, IsInclusive: false, Value: 500}, + } + + query, err := buildExtractQueryMssql(q) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if !strings.Contains(query, "WHERE [ProductID] >= @min") { + t.Errorf("Expected WHERE clause with >=, got: %s", query) + } + + if !strings.Contains(query, "[ProductID] < @max") { + t.Errorf("Expected upper limit with <, got: %s", query) + } + + if !strings.Contains(query, "JSON_VALUE([Details], '$.price') AS [Details]") { + t.Errorf("Expected JSON_VALUE for Details, got: %s", query) + } + + if strings.Contains(query, "SELECT [ProductID], [Details]") { + t.Errorf("Expected Details to be replaced by JSON extraction, got: %s", query) + } +} + +func TestBuildJsonPathMssql(t *testing.T) { + tests := []struct { + input string + expected string + }{ + {".id", "$.id"}, + {"id", "$.id"}, + {".user.name", "$.user.name"}, + {"user.name", "$.user.name"}, + {".location.coordinates.lat", "$.location.coordinates.lat"}, + {"", "$."}, + } + + for _, tt := range tests { + result := buildJsonPathMssql(tt.input) + if result != tt.expected { + t.Errorf("buildJsonPathMssql(%q) = %q, want %q", tt.input, result, tt.expected) + } + } +} + +func TestFindColumnByPattern_ExactMatch(t *testing.T) { + columns := []models.ColumnType{ + models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + } + + result, err := findColumnByPattern(columns, "Metadata") + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if result != "Metadata" { + t.Errorf("Expected 'Metadata', got '%s'", result) + } +} + +func TestFindColumnByPattern_WildcardMatch(t *testing.T) { + columns := []models.ColumnType{ + models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("NodeMetadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + } + + result, err := findColumnByPattern(columns, "NodeMeta*") + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if result != "NodeMetadata" { + t.Errorf("Expected 'NodeMetadata', got '%s'", result) + } +} + +func TestFindColumnByPattern_NotFound(t *testing.T) { + columns := []models.ColumnType{ + models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + } + + result, err := findColumnByPattern(columns, "NonExistent") + if err == nil { + t.Fatalf("Expected error, got no error. Result: %s", result) + } + + if !strings.Contains(err.Error(), "NonExistent") { + t.Errorf("Expected error to contain column name, got: %v", err) + } +} + +func TestFindColumnByPattern_WildcardNotFound(t *testing.T) { + columns := []models.ColumnType{ + models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + } + + result, err := findColumnByPattern(columns, "Event*") + if err == nil { + t.Fatalf("Expected error, got no error. Result: %s", result) + } + + if !strings.Contains(err.Error(), "Event*") { + t.Errorf("Expected error to contain pattern, got: %v", err) + } +} + +func TestBuildExtractQueryMssql_OnlyJsonColumns(t *testing.T) { + // Test when all columns are used via JSON extraction + q := ExtractionQuery{ + Schema: "dbo", + Table: "Data", + PrimaryKey: "ID", + Columns: []models.ColumnType{ + models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("JsonData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0), + }, + FromJsonColumns: []config.FromJsonItem{ + {Column: "JsonData", Field: ".field1"}, + }, + LowerLimit: ExtractorQueryLimit{IsValid: false}, + UpperLimit: ExtractorQueryLimit{IsValid: false}, + } + + query, err := buildExtractQueryMssql(q) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if !strings.HasPrefix(query, "SELECT [ID], JSON_VALUE([JsonData], '$.field1') AS [JsonData]") { + t.Errorf("Expected JsonData to be replaced by JSON extraction, got: %s", query) + } + + if strings.Contains(query, "SELECT [ID], [JsonData]") { + t.Errorf("Expected JsonData to be excluded from raw selection, got: %s", query) + } +} + +func TestBuildExtractQueryMssql_JsonColumnsReplaceInOrder(t *testing.T) { + q := ExtractionQuery{ + Schema: "dbo", + Table: "Users", + PrimaryKey: "UserID", + Columns: []models.ColumnType{ + models.NewColumnType("UserID", false, false, "INT", "int", "INT", false, 0, 0, 0), + models.NewColumnType("Name", true, false, "VARCHAR", "varchar", "VARCHAR", false, 255, 0, 0), + models.NewColumnType("Email", true, false, "VARCHAR", "varchar", "VARCHAR", false, 255, 0, 0), + models.NewColumnType("Metadata", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0), + models.NewColumnType("Profile", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0), + models.NewColumnType("Settings", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0), + }, + FromJsonColumns: []config.FromJsonItem{ + {Column: "Metadata", Field: ".id"}, + {Column: "Profile", Field: ".id"}, + {Column: "Settings", Field: ".id"}, + }, + LowerLimit: ExtractorQueryLimit{IsValid: false}, + UpperLimit: ExtractorQueryLimit{IsValid: false}, + } + + query, err := buildExtractQueryMssql(q) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + expected := "SELECT [UserID], [Name], [Email], JSON_VALUE([Metadata], '$.id') AS [Metadata], JSON_VALUE([Profile], '$.id') AS [Profile], JSON_VALUE([Settings], '$.id') AS [Settings] FROM [dbo].[Users] ORDER BY [UserID] ASC" + if query != expected { + t.Errorf("Unexpected query.\nExpected: %s\nGot: %s", expected, query) + } +} diff --git a/internal/app/db-wrapper/types.go b/internal/app/db-wrapper/types.go index e194710..7bc99e0 100644 --- a/internal/app/db-wrapper/types.go +++ b/internal/app/db-wrapper/types.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" ) @@ -33,12 +34,13 @@ type ExtractorQueryLimit struct { } type ExtractionQuery struct { - Schema string - Table string - PrimaryKey string - Columns []models.ColumnType - LowerLimit ExtractorQueryLimit - UpperLimit ExtractorQueryLimit + Schema string + Table string + PrimaryKey string + Columns []models.ColumnType + LowerLimit ExtractorQueryLimit + UpperLimit ExtractorQueryLimit + FromJsonColumns []config.FromJsonItem } type DbWrapper interface {