diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 355d748..3fb3a5a 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -4,12 +4,13 @@ import ( "context" "errors" "fmt" + "strings" "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" ) @@ -21,10 +22,35 @@ func main() { log.SetLevel(log.DebugLevel) log.Info("Starting migration...") - rowValues, colNames, err := extractData() + + ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + sourcePool, err := db.Connect(ctxSource, config.App.FromDb.Url) + defer db.Close(sourcePool) + if err != nil { + log.Fatal(err) + } + log.Info("Successfully connected to from_db") + + ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + targetPool, err := db.Connect(ctxTarget, config.App.ToDb.Url) + defer db.Close(targetPool) + if err != nil { + log.Fatal(err) + } + log.Info("Successfully connected to to_db") + + schema := "test" + table := "migration_test" + colNames := []string{"id", "nombre_producto", "descripcion", "stock", "precio", "es_activo", "fecha_creacion", "ultima_actualizacion", "configuracion_json", "etiquetas", "binario_test", "ip_servidor", "rango_prueba"} + + rowValues, err := extractData(ctxSource, sourcePool, schema, table, colNames, 2) if err != nil { - log.Fatal("Unexpected error when extracting data") + log.Fatal("Unexpected error when extracting data", err) } for index, row := range rowValues { @@ -34,50 +60,48 @@ func main() { } } + insertedRows, err := insertData(ctxTarget, targetPool, schema, table, colNames, rowValues) + if err != nil { + log.Fatal("Unexpected error when inserting rows: ", err) + } + + log.Infof("Inserted rows: %d", insertedRows) + log.Info("Migration completed successfully!") } -func extractData() ([][]any, []string, error) { - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() +func buildExtractSqlSentence(schema, table string, colNames []string) string { + var sbColumns strings.Builder - pool, err := db.Connect(ctx, config.App.FromDb.Url) - defer db.Close(pool) - if err != nil { - log.Fatal(err) + for i, col := range colNames { + sbColumns.WriteString(`"`) + sbColumns.WriteString(col) + sbColumns.WriteString(`"`) + if i < len(colNames)-1 { + sbColumns.WriteString(", ") + } } - log.Info("Successfully connected to from_db") - limit := 2 + return fmt.Sprintf(`SELECT %s FROM "%s"."%s" LIMIT $1`, sbColumns.String(), schema, table) +} - /* This query will become dinamyc later */ - sql := ` -SELECT - id, - nombre_producto, - descripcion, - stock, - precio, - es_activo, - fecha_creacion, - ultima_actualizacion, - configuracion_json, - etiquetas, - binario_test, - ip_servidor, - rango_prueba -FROM test.migration_test -LIMIT $1; -` +func extractData(ctx context.Context, sourcePool *pgxpool.Pool, schema string, table string, colNames []string, limit int) ([][]any, error) { + if len(colNames) == 0 { + return nil, errors.New("Empty column names received") + } - rows, err := pool.Query(ctx, sql, limit) + sql := buildExtractSqlSentence(schema, table, colNames) + + log.Debug("Executing query: ", sql) + + rows, err := sourcePool.Query(ctx, sql, limit) if err != nil { if !errors.Is(err, pgx.ErrNoRows) { - return nil, nil, fmt.Errorf("Unexpected error: %w", err) + return nil, fmt.Errorf("Unexpected error: %w", err) } log.Warn("Unexpected error", err) - return [][]any{}, []string{}, nil + return [][]any{}, nil } defer rows.Close() @@ -101,9 +125,24 @@ LIMIT $1; rowValues = append(rowValues, values) } - return rowValues, Map(cols, func(col pgconn.FieldDescription) string { - return col.Name - }), nil + return rowValues, nil +} + +func insertData(ctx context.Context, targetPool *pgxpool.Pool, schema string, table string, colNames []string, rowValues [][]any) (int64, error) { + identifier := pgx.Identifier{schema, table} + + count, err := targetPool.CopyFrom( + ctx, + identifier, + colNames, + pgx.CopyFromRows(rowValues), + ) + + if err != nil { + return 0, fmt.Errorf("error en CopyFrom: %w", err) + } + + return count, nil } func Map[T any, V any](input []T, mapper func(T) V) []V {