44 Commits

Author SHA1 Message Date
0c59d06af6 refactor: adjust configuration parameters for extractors and transformers; optimize batch processing settings 2026-05-09 02:07:09 -05:00
6fa9b21b1c refactor: update totalRows and chunkSize constants for improved performance 2026-05-09 01:49:39 -05:00
a8be31c18b refactor: adjust configuration parameters for extractors and loaders; enhance logging messages for clarity 2026-05-09 01:48:36 -05:00
5a8bce7701 refactor: update totalRows constant and add siteHolderAttach data generation logic; enhance row generation and loading process 2026-05-09 01:33:12 -05:00
b690e580c5 refactor: enhance logging and batch processing in migration; adjust configuration parameters for improved performance 2026-05-09 01:16:34 -05:00
68d983ea57 refactor: remove unused Extractor and Loader interfaces from types.go; streamline code structure 2026-05-09 00:32:32 -05:00
1bc7b67643 refactor: replace Exec with Consume method in MssqlTransformer; enhance retry handling and streamline transformation logic 2026-05-09 00:30:49 -05:00
d124da8b20 refactor: enhance error messages for max retries in partition and batch processing 2026-05-08 23:36:59 -05:00
b5fd6d0534 refactor: remove unused LoaderError type and associated file; streamline error handling structure 2026-05-08 23:32:07 -05:00
85d7d69da9 refactor: streamline error handling in migration process; consolidate failed partitions and batches tracking 2026-05-08 23:22:53 -05:00
d54108d5e5 refactor: move max failed batches configuration to retry section; clean up unused error handling code 2026-05-08 23:00:23 -05:00
212d3663e2 refactor: remove unused error channels and enhance job configuration; add max failed batches load parameter 2026-05-08 22:28:31 -05:00
c4e233401b refactor: update GenericLoader to use ProcessBatchWithRetries; enhance error handling and retry logic 2026-05-08 07:50:54 -05:00
a216a8016f refactor: update extractor methods to support FromJsonColumns; enhance data processing capabilities 2026-05-07 08:17:25 -05:00
46ddd0d6b7 refactor: enhance extraction query handling; add support for JSON column extraction and wildcard patterns 2026-05-07 07:40:21 -05:00
80babf24f2 refactor: implement Consume method in GenericLoader; enhance error handling in ProcessBatch 2026-05-06 18:56:50 -05:00
f12937a1c3 refactor: standardize job error channel size; update batch size for transformed batches 2026-05-05 23:20:34 -05:00
2a5f703f3c refactor: enhance retry handling in extractor processes; unify backoff delay computation 2026-05-05 23:16:13 -05:00
7cb959a103 refactor: update migration job configuration parameters; rename queue and batch size fields for clarity 2026-05-05 23:04:27 -05:00
6414943cf3 refactor: streamline error handling and processing in GenericExtractor; implement partition processing with retries 2026-04-27 01:11:45 -05:00
00459e42e6 refactor: update column field names in ExtractionQuery struct; enhance logging in consume and process methods 2026-04-27 00:25:48 -05:00
52fe083ab7 refactor: add consume and process methods for GenericExtractor; streamline data extraction logic 2026-04-26 19:39:14 -05:00
9a00d6af04 refactor: replace specific extractor implementations with a generic extractor; remove mssql and postgres extractor files 2026-04-26 19:33:59 -05:00
33af391986 refactor: correct import path for db_dialects package in mssql and postgres files; add db_dialects package 2026-04-26 19:25:16 -05:00
2b2d740d2e feat: implement QueryFromObject method for mssql and postgres wrappers; enhance query building with limits and geometry support 2026-04-26 19:24:22 -05:00
fbe17b3842 refactor: unify db dialect definition in dbdialects package 2026-04-26 01:33:34 -05:00
7bde77dcc5 refactor: rename Extractor Exec method 2026-04-26 01:17:40 -05:00
6ad25e5889 feat: update job configuration for infraestructura_site_holder__attach; enhance storage handling and retry logic 2026-04-22 15:23:19 -05:00
0ac5f01b65 feat: update configuration handling to use cleanenv; remove unused dependencies and improve error logging 2026-04-22 12:31:31 -05:00
3b1371a270 feat: integrate Azure storage handling in migration process; update transformers and job processing logic 2026-04-21 14:15:20 -05:00
bb7b35619a feat: add ToStorage configuration to JobConfig for enhanced data handling 2026-04-21 13:43:03 -05:00
cd2efb8692 feat: add new job configuration for red_terminal__attach with source and target definitions 2026-04-21 13:38:11 -05:00
9964ef819b feat: refactor migration job to use preSQL and postSQL from TargetTable; update config structure for job definitions 2026-04-21 12:30:40 -05:00
bd51223855 feat: add truncate query handling in migration process; update job configuration to remove commented truncate SQL 2026-04-21 11:40:02 -05:00
aa71eeb5c1 feat: enhance range handling in MSSQL and Postgres extractors; update partition range generator logic 2026-04-21 11:32:52 -05:00
9eb8800864 feat: add range configuration to job and update extractors for inclusive range handling 2026-04-21 11:29:34 -05:00
09bd364976 feat: implement Azure Blob Storage client and refactor configuration structure 2026-04-21 11:03:56 -05:00
f2e6edd8fa feat: add caarlos0/env package for environment variable management and refactor appConfig structure 2026-04-21 10:41:38 -05:00
c5a18f6d95 chore: update .env.example to set LOG_LEVEL to INFO and add Azure storage configuration variables 2026-04-21 10:15:01 -05:00
8217b13d08 feat: add azblob client implementation for file uploads 2026-04-21 10:01:42 -05:00
16c232762e feat: refactor extractor interface and implement Consume function for ETL process 2026-04-20 00:19:41 -05:00
b4a846575d Update .env.example with new variables 2026-04-19 23:08:09 -05:00
5bd730f026 feat: update .gitignore to include .vscode directory 2026-04-19 23:06:18 -05:00
7bd80d4180 feat: enhance logging configuration to use dynamic log level from environment variable 2026-04-19 23:06:13 -05:00
43 changed files with 2263 additions and 1067 deletions

24
.atl/skill-registry.md Normal file
View File

@@ -0,0 +1,24 @@
# Skill Registry — go-migrate
Generated: 2026-04-21
## Compact Rules
### Go conventions
- Use existing error wrapping pattern: `fmt.Errorf("context: %w", err)`
- Channel-based pipeline — keep goroutine lifecycle clean (close channels in correct order)
- No comments unless non-obvious WHY; no docstrings
- Prefer named returns only when it aids clarity in short functions
- Use `strings.EqualFold` for case-insensitive column name comparison
### Project conventions
- Config structs live in `internal/app/config/`
- ETL interfaces live in `internal/app/etl/types.go`
- Transformer implementations in `internal/app/etl/transformers/`
- Azure operations via `internal/app/azure/main.go`
- Per-job transformer creation (not shared) when job has storage config
## User Skills
| Trigger | Skill |
|---------|-------|
| sdd-* | SDD workflow skills |

View File

@@ -1,2 +1,12 @@
PG_FROM_DB_URL=postgresql://postgres:password@localhost:5432/db
PG_TO_DB_URL=postgresql://postgres:password@localhost:5432/db
SOURCE_DB_URL=sqlserver://sa:password@localhost:1433?database=master&packet+size=32767&loc=UTC
TARGET_DB_URL=postgresql://postgres:password@localhost:5432/db
LOG_LEVEL=INFO
AZ_STORAGE_ENABLED=false
AZ_ACCOUNT_NAME=
AZ_CONTAINER=
AZ_ACCOUNT_KEY=
AZ_USE_HTTPS=true
AZ_SERVICE_URL=
AZ_PREFIX=

2
.gitignore vendored
View File

@@ -27,5 +27,5 @@ go.work.sum
# Editor/IDE
# .idea/
# .vscode/
.vscode/
.temp

View File

@@ -3,6 +3,7 @@ package main
import (
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
log "github.com/sirupsen/logrus"
)
@@ -13,5 +14,13 @@ func configureLog() {
DisableSorting: false,
PadLevelText: true,
})
log.SetLevel(log.DebugLevel)
logLevelEnv := config.App.LogLevel
logLevel, err := log.ParseLevel(logLevelEnv)
if err != nil {
log.Warnf("Nivel de log inválido '%s', usando INFO por defecto", logLevelEnv)
logLevel = log.InfoLevel
}
log.SetLevel(logLevel)
}

View File

@@ -5,12 +5,12 @@ import (
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
@@ -24,7 +24,7 @@ func main() {
log.Fatalf("error leyendo configuracion: %v", err)
}
log.Debugf("Config: %+v", migrationConfig)
// log.Debugf("Config: %+v", migrationConfig)
startTime := time.Now()
@@ -86,7 +86,7 @@ func main() {
log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed)
totalDuration := time.Since(startTime)
log.Infof("=== Migration completed successfully! ===")
// log.Infof("=== Migration completed successfully! ===")
log.Infof("Total migration time: %v", totalDuration)
}
@@ -118,10 +118,18 @@ func processMigrationJobs(
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer()
extractor := extractors.NewExtractor(sourceDb)
loader := loaders.NewGenericLoader(targetDb)
var azureClient *azure.Client
if config.App.AzureStorage.Enabled {
var err error
azureClient, err = azure.NewClient(config.App.AzureStorage)
if err != nil {
log.Fatalf("Failed to create Azure storage client: %v", err)
}
}
for i := range maxParallelWorkers {
wgJobs.Go(func() {
for job := range chJobs {
@@ -132,9 +140,10 @@ func processMigrationJobs(
sourceTableAnalyzer,
targetTableAnalyzer,
extractor,
transformer,
azureClient,
loader,
job,
targetDb.GetDialect(),
)
chJobResults <- res

View File

@@ -7,26 +7,48 @@ import (
"sync/atomic"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
// jobErrorsChannelSize is the buffer capacity used when creating the per-job
// error channel, so it is the same for every job instead of coming from the
// job configuration.
const jobErrorsChannelSize int = 100
// buildTruncateQuery returns the SQL statement that empties schema.table on
// the target database.
//
// truncateMethod selects the statement: exactly "DELETE" produces a
// DELETE FROM, any other value produces a TRUNCATE TABLE. targetDbType selects
// the identifier quoting: exactly "postgres" uses double quotes, any other
// value uses square brackets. Both comparisons are case-sensitive, and schema
// and table are interpolated as given, without escaping.
func buildTruncateQuery(targetDbType, schema, table, truncateMethod string) string {
	isPostgres := targetDbType == "postgres"
	useDelete := truncateMethod == "DELETE"

	var format string
	switch {
	case useDelete && isPostgres:
		format = `DELETE FROM "%s"."%s"`
	case useDelete:
		format = `DELETE FROM [%s].[%s]`
	case isPostgres:
		format = `TRUNCATE TABLE "%s"."%s"`
	default:
		format = `TRUNCATE TABLE [%s].[%s]`
	}

	return fmt.Sprintf(format, schema, table)
}
func processMigrationJob(
ctx context.Context,
targetDbWrapper dbwrapper.DbWrapper,
sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor,
transformer etl.Transformer,
loader etl.Loader,
extractor extractors.GenericExtractor,
azureClient *azure.Client,
loader loaders.GenericLoader,
job config.Job,
targetDbType string,
) models.JobResult {
transformer := transformers.NewMssqlTransformer(job.ToStorage, job.SourceTable, azureClient)
localCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -35,8 +57,6 @@ func processMigrationJob(
StartTime: time.Now(),
}
var rowsRead, rowsLoaded, rowsFailed int64
var wgQueryColumnTypes errgroup.Group
var sourceColTypes, targetColTypes []models.ColumnType
@@ -66,7 +86,13 @@ func processMigrationJob(
return result
}
for _, query := range job.PreSQL {
preSqlQueries := job.TargetTable.PreSQL
if job.TruncateTarget {
truncateQuery := buildTruncateQuery(targetDbType, job.TargetTable.Schema, job.TargetTable.Table, job.TruncateMethod)
preSqlQueries = append([]string{truncateQuery}, job.TargetTable.PreSQL...)
}
for _, query := range preSqlQueries {
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
result.Error = err
return result
@@ -79,23 +105,20 @@ func processMigrationJob(
job.SourceTable.TableInfo,
job.SourceTable.PrimaryKey,
job.RowsPerPartition,
job.Range,
)
if err != nil {
log.Error("Unexpected error calculating batch ranges: ", err)
}
chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
chPartitions := make(chan models.Partition, job.QueueSize)
chBatchesRaw := make(chan models.Batch, job.QueueSize)
chBatchesTransformed := make(chan models.Batch, job.QueueSize)
chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize)
chPartitions := make(chan models.Partition)
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize)
var wgActivePartitions sync.WaitGroup
var wgActiveBatches sync.WaitGroup
var wgExtractors sync.WaitGroup
var wgTransformers sync.WaitGroup
var wgLoaders sync.WaitGroup
var wgActivePartitions, wgActiveBatches, wgExtractors, wgTransformers, wgLoaders sync.WaitGroup
var rowsRead, rowsLoaded, rowsFailed int64
var failedPartitionsCount, failedBatchesLoadCount int32
go func() {
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
@@ -105,41 +128,24 @@ func processMigrationJob(
}
}()
go custom_errors.ExtractorErrorHandler(
localCtx,
job.Retry,
job.MaxPartitionErrrors,
chExtractorErrors,
chPartitions,
chJobErrors,
&wgActivePartitions,
)
go custom_errors.LoaderErrorHandler(
localCtx,
job.Retry,
job.MaxChunkErrors,
chLoadersErrors,
chBatchesTransformed,
chJobErrors,
&wgActiveBatches,
)
maxExtractors := min(job.MaxExtractors, len(partitions))
log.Infof("Starting %d extractor(s)...", maxExtractors)
log.Infof("Starting %d extractor(s)... (%v)", maxExtractors, job.Name)
for range maxExtractors {
wgExtractors.Go(func() {
extractor.Exec(
extractor.Consume(
localCtx,
job.SourceTable,
sourceColTypes,
job.BatchSize,
job.ExtractorBatchSize,
job.Retry,
chPartitions,
chBatchesRaw,
chExtractorErrors,
chJobErrors,
&wgActivePartitions,
&rowsRead,
&failedPartitionsCount,
job.SourceTable.FromJsonColumns,
)
})
}
@@ -151,13 +157,15 @@ func processMigrationJob(
}
}()
log.Infof("Starting %d transformer(s)...", maxExtractors)
log.Infof("Starting %d transformer(s)... (%v)", maxExtractors, job.Name)
for range maxExtractors {
wgTransformers.Go(func() {
transformer.Exec(
transformer.Consume(
localCtx,
sourceColTypes,
job.Retry,
job.TransformerBatchSize,
chBatchesRaw,
chBatchesTransformed,
chJobErrors,
@@ -166,64 +174,62 @@ func processMigrationJob(
})
}
log.Infof("Starting %d loader(s)...", job.MaxLoaders)
log.Infof("Starting %d loader(s)... (%v)", job.MaxLoaders, job.Name)
for range job.MaxLoaders {
wgLoaders.Go(func() {
loader.Exec(
loader.Consume(
localCtx,
job.TargetTable,
targetColTypes,
job.Retry,
job.LoaderBatchSize,
chBatchesTransformed,
chLoadersErrors,
chJobErrors,
&wgActiveBatches,
&rowsLoaded,
&failedBatchesLoadCount,
)
})
}
go func() {
log.Debugf("Waiting for goroutines (%v)", job.Name)
// log.Debugf("Waiting for goroutines (%v)", job.Name)
wgActivePartitions.Wait()
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
// log.Debugf("wgActivePartitions is empty (%v)", job.Name)
close(chPartitions)
log.Debugf("chPartitions is closed (%v)", job.Name)
close(chExtractorErrors)
log.Debugf("chExtractorErrors is closed (%v)", job.Name)
// log.Debugf("chPartitions is closed (%v)", job.Name)
wgExtractors.Wait()
log.Debugf("wgExtractors is empty (%v)", job.Name)
// log.Debugf("wgExtractors is empty (%v)", job.Name)
close(chBatchesRaw)
log.Debugf("chBatchesRaw is closed (%v)", job.Name)
// log.Debugf("chBatchesRaw is closed (%v)", job.Name)
wgTransformers.Wait()
log.Debugf("wgTransformers is empty (%v)", job.Name)
// log.Debugf("wgTransformers is empty (%v)", job.Name)
close(chBatchesTransformed)
// log.Debugf("chBatchesTransformed is closed (%v)", job.Name)
wgActiveBatches.Wait()
log.Debugf("wgActiveBatches is empty (%v)", job.Name)
close(chBatchesTransformed)
log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
close(chLoadersErrors)
log.Debugf("chLoadersErrors is empty (%v)", job.Name)
// log.Debugf("wgActiveBatches is empty (%v)", job.Name)
wgLoaders.Wait()
log.Debugf("wgLoaders is empty (%v)", job.Name)
// log.Debugf("wgLoaders is empty (%v)", job.Name)
cancel()
}()
for _, query := range job.PostSQL {
for _, query := range job.TargetTable.PostSQL {
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
result.Error = err
return result
}
}
log.Debugf("waiting for local context to be done (%v)", job.Name)
// log.Debugf("waiting for local context to be done (%v)", job.Name)
<-localCtx.Done()
log.Debugf("local context done (%v)", job.Name)
// log.Debugf("local context done (%v)", job.Name)
if ctx.Err() != nil {
result.Error = ctx.Err()
@@ -238,5 +244,9 @@ func processMigrationJob(
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
}
if result.RowsRead == 0 {
log.Warnf("No rows extracted from (%v)", job.Name)
}
return result
}

View File

@@ -1,22 +1,26 @@
max_parallel_workers: 4
source_db_type: sqlserver
target_db_type: sqlserver
target_db_type: postgres
defaults:
batches_per_partition: 4
max_extractors: 2
extractor_batch_size: 5000
extractor_queue_size: 8
max_transformers: 2
transformer_batch_size: 12500
transformer_queue_size: 8
max_loaders: 4
queue_size: 8
batch_size: 25000
batches_per_partition: 8
loader_batch_size: 25000
truncate_target: true
truncate_method: TRUNCATE # TRUNCATE | DELETE
max_partition_errrors: 5
max_chunk_errors: 5
retry:
attempts: 3
base_delay_ms: 500
max_delay_ms: 10000
max_jitter_ms: 500
max_failed_partitions: 5
max_failed_batches_load: 5
jobs:
- name: cartografia_manzana
@@ -28,9 +32,8 @@ jobs:
target:
schema: Cartografia
table: MANZANA
pre_sql:
- 'SELECT 1'
# - 'TRUNCATE TABLE "Cartografia"."MANZANA"'
pre_sql:
- 'SELECT 1'
range:
min: 1000000
max: 2000000
@@ -43,11 +46,41 @@ jobs:
schema: Red
table: PUERTO
primary_key: ID_PUERTO
from_json:
- column: $node_id*
field: id
target:
schema: Red
table: PUERTO
pre_sql:
- 'SELECT 1'
# - 'TRUNCATE TABLE "Red"."PUERTO"'
post_sql:
- "SELECT 1"
pre_sql:
- 'SELECT 1'
post_sql:
- "SELECT 1"
- name: infraestructura_site_holder__attach
source:
schema: Infraestructura
table: SITE_HOLDER__ATTACH
primary_key: GDB_ARCHIVE_OID
target:
schema: Infraestructura
table: SITE_HOLDER__ATTACH
to_storage:
columns:
- source: DATA
target: FILE_URL
mode: REFERENCE_ONLY
batches_per_partition: 20
max_extractors: 32
extractor_batch_size: 1
extractor_queue_size: 100
max_transformers: 48
transformer_batch_size: 500
transformer_queue_size: 8
max_loaders: 4
loader_batch_size: 500
retry:
attempts: 5
base_delay_ms: 1000
max_delay_ms: 15000
max_jitter_ms: 500

12
go.mod
View File

@@ -1,12 +1,13 @@
module git.ksdemosapps.com/kylesoda/go-migrate
go 1.25.7
go 1.26
require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4
github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4
github.com/google/uuid v1.6.0
github.com/ilyakaznacheev/cleanenv v1.5.0
github.com/jackc/pgx/v5 v5.9.1
github.com/joho/godotenv v1.5.1
github.com/microsoft/go-mssqldb v1.9.8
github.com/sirupsen/logrus v1.9.4
github.com/twpayne/go-geom v1.6.1
@@ -15,15 +16,20 @@ require (
)
require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect
github.com/BurntSushi/toml v1.6.0 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/net v0.51.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 // indirect
)

16
go.sum
View File

@@ -4,19 +4,25 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 h1:Hk5QBxZQC1jb2Fwj6mpz
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1/go.mod h1:IYus9qsFobWIc2YVwe/WPjcnyCkPKtnHAqUYeebc8z0=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2/go.mod h1:XtLgD3ZD34DAaVIIAyG3objl5DynM3CQ/vMcbBNJZGI=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.8.1 h1:/Zt+cDPnpC3OVDm/JKLOs7M2DKmLRIIp3XIx9pHHiig=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.8.1/go.mod h1:Ng3urmn6dYe8gnbCMoHHVl5APYz2txho3koEkV2o2HA=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.4.0 h1:E4MgwLBGeVB5f2MdcIVD3ELVAWpr+WD6MUe1i+tM/PA=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.4.0/go.mod h1:Y2b/1clN4zsAoUd/pgNAQHjLDnTis/6ROkUfyob6psM=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0 h1:nCYfgcSyHZXJI8J0IWE5MsCGlb2xp9fJiXyxWgmOFg4=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0/go.mod h1:ucUjca2JtSZboY8IoUqyQyuuXvwbMBVwFOm0vdQPNhA=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4 h1:jWQK1GI+LeGGUKBADtcH2rRqPxYB1Ljwms5gFA2LqrM=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4/go.mod h1:8mwH4klAm9DUgR2EEHyEEAQlRDvLPyg5fQry3y+cDew=
github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 h1:XRzhVemXdgvJqCH0sFfrBUTnUJSBrBf7++ypk+twtRs=
github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0/go.mod h1:HKpQxkWaGLJ+D/5H8QRpyQXA1eKjxkFlOMwck5+33Jk=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk=
github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY=
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -32,6 +38,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/ilyakaznacheev/cleanenv v1.5.0 h1:0VNZXggJE2OYdXE87bfSSwGxeiGt9moSR2lOrsHHvr4=
github.com/ilyakaznacheev/cleanenv v1.5.0/go.mod h1:a5aDzaJrLCQZsazHol1w8InnDcOX0OColm64SlIi6gk=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -42,8 +50,8 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
@@ -83,3 +91,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 h1:slmdOY3vp8a7KQbHkL+FLbvbkgMqmXojpFUO/jENuqQ=
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3/go.mod h1:oVgVk4OWVDi43qWBEyGhXgYxt7+ED4iYNpTngSLX2Iw=

View File

@@ -0,0 +1,87 @@
package azure
import (
"context"
"errors"
"fmt"
"net/url"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)
// Sentinel errors declared by the azure package. They are plain errors.New
// values, so callers can match them with errors.Is.
var (
// ErrInvalidConnectionString is declared for connection-string problems.
// NOTE(review): not returned by any function shown alongside it — confirm usage.
ErrInvalidConnectionString = errors.New("invalid connection string")
// ErrContainerNotFound is declared for a missing container.
// NOTE(review): not returned by any function shown alongside it — confirm usage.
ErrContainerNotFound = errors.New("container not found")
// ErrBlobNotFound is declared for a missing blob.
// NOTE(review): not returned by any function shown alongside it — confirm usage.
ErrBlobNotFound = errors.New("blob not found")
// ErrInvalidInput is returned by the Client methods when a required
// argument is empty or nil.
ErrInvalidInput = errors.New("invalid input parameters")
)
// Client wraps an azblob.Client together with the storage configuration it was
// created from. Build it with NewClient.
type Client struct {
// client is the underlying Azure Blob Storage SDK client.
client *azblob.Client
// azureStorageConfig is kept so methods can read the container, prefix,
// service URL and account name without the caller passing them again.
azureStorageConfig config.AzureStorageConfig
}
// NewClient builds a Client for the storage account described by
// azureStorageConfig.
//
// The connection string uses "https" unless UseHTTPS is false, and carries an
// explicit BlobEndpoint made of ServiceURL joined with AccountName. An error is
// returned when that endpoint cannot be built or when the SDK rejects the
// resulting connection string.
func NewClient(azureStorageConfig config.AzureStorageConfig) (*Client, error) {
	protocol := "https"
	if !azureStorageConfig.UseHTTPS {
		protocol = "http"
	}

	// url.JoinPath fails when ServiceURL does not parse. Surfacing that here
	// avoids silently building a connection string with an empty BlobEndpoint.
	blobEndpoint, err := url.JoinPath(azureStorageConfig.ServiceURL, azureStorageConfig.AccountName)
	if err != nil {
		return nil, fmt.Errorf("building blob endpoint from service url %q: %w", azureStorageConfig.ServiceURL, err)
	}

	connStr := fmt.Sprintf("DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;BlobEndpoint=%s;",
		protocol, azureStorageConfig.AccountName, azureStorageConfig.AccountKey, blobEndpoint)

	client, err := azblob.NewClientFromConnectionString(connStr, nil)
	if err != nil {
		return nil, fmt.Errorf("creating azure storage client: %w", err)
	}

	return &Client{
		client:             client,
		azureStorageConfig: azureStorageConfig,
	}, nil
}
// CreateContainer asks the storage service to create the container named
// containerName.
//
// It returns ErrInvalidInput when containerName is empty, and otherwise wraps
// any error reported by the SDK call with the container name.
func (c *Client) CreateContainer(ctx context.Context, containerName string) error {
	if len(containerName) == 0 {
		return ErrInvalidInput
	}

	if _, createErr := c.client.CreateContainer(ctx, containerName, nil); createErr != nil {
		return fmt.Errorf("creating container %s: %w", containerName, createErr)
	}

	return nil
}
// UploadBuffer uploads buffer as the blob blobPath inside containerName.
//
// It returns ErrInvalidInput when containerName or blobPath is empty or when
// buffer is nil (an empty, non-nil buffer is accepted). Any error reported by
// the SDK upload is wrapped with the blob path.
func (c *Client) UploadBuffer(ctx context.Context, containerName, blobPath string, buffer []byte) error {
	switch {
	case containerName == "", blobPath == "", buffer == nil:
		return ErrInvalidInput
	}

	if _, uploadErr := c.client.UploadBuffer(ctx, containerName, blobPath, buffer, nil); uploadErr != nil {
		return fmt.Errorf("uploading blob %s: %w", blobPath, uploadErr)
	}

	return nil
}
// UploadAndGetURL uploads buffer to the configured container and returns the
// URL of the resulting blob.
//
// The blob name is blobPath, prefixed with the configured Prefix when one is
// set. The returned URL is ServiceURL joined with AccountName, Container and
// that blob name.
//
// It returns ErrInvalidInput when blobPath is empty or buffer is nil. A prefix
// that cannot be joined with blobPath is reported as an error matching
// ErrInvalidInput. The blob URL is built before uploading, so a configuration
// that cannot produce a URL is reported without leaving an uploaded blob behind
// and without returning an empty URL alongside a nil error.
func (c *Client) UploadAndGetURL(ctx context.Context, blobPath string, buffer []byte) (string, error) {
	if blobPath == "" || buffer == nil {
		return "", ErrInvalidInput
	}

	cfg := c.azureStorageConfig

	fullPath := blobPath
	if cfg.Prefix != "" {
		joined, err := url.JoinPath(cfg.Prefix, blobPath)
		if err != nil {
			// Keeps errors.Is(err, ErrInvalidInput) true: before this error was
			// handled, the empty joined path made UploadBuffer return it.
			return "", fmt.Errorf("%w: joining prefix %q with blob path %q: %w", ErrInvalidInput, cfg.Prefix, blobPath, err)
		}
		fullPath = joined
	}

	blobEndpoint, err := url.JoinPath(cfg.ServiceURL, cfg.AccountName)
	if err != nil {
		return "", fmt.Errorf("building blob endpoint from service url %q: %w", cfg.ServiceURL, err)
	}

	blobURL, err := url.JoinPath(blobEndpoint, cfg.Container, fullPath)
	if err != nil {
		return "", fmt.Errorf("building url for blob %s: %w", fullPath, err)
	}

	if err := c.UploadBuffer(ctx, cfg.Container, fullPath, buffer); err != nil {
		return "", err
	}

	return blobURL, nil
}

View File

@@ -1,41 +1,41 @@
package config
import (
"os"
"github.com/joho/godotenv"
"github.com/ilyakaznacheev/cleanenv"
log "github.com/sirupsen/logrus"
)
type appConfig struct {
SourceDbUrl string
TargetDbUrl string
// AzureStorageConfig holds the Azure Blob Storage settings. Each field is
// populated from the environment variable named in its `env` tag.
type AzureStorageConfig struct {
// AccountName is read from AZ_ACCOUNT_NAME.
AccountName string `env:"AZ_ACCOUNT_NAME"`
// Container is read from AZ_CONTAINER.
Container string `env:"AZ_CONTAINER"`
// AccountKey is read from AZ_ACCOUNT_KEY.
AccountKey string `env:"AZ_ACCOUNT_KEY"`
// UseHTTPS is read from AZ_USE_HTTPS and defaults to true when unset.
UseHTTPS bool `env:"AZ_USE_HTTPS" env-default:"true"`
// ServiceURL is read from AZ_SERVICE_URL.
ServiceURL string `env:"AZ_SERVICE_URL"`
// Prefix is read from AZ_PREFIX.
Prefix string `env:"AZ_PREFIX"`
// Enabled is read from AZ_STORAGE_ENABLED; it has no env-default tag.
Enabled bool `env:"AZ_STORAGE_ENABLED"`
}
func loadEnv() {
err := godotenv.Load()
if err != nil {
log.Warn("Warning: could not load .env file")
}
type appConfig struct {
SourceDbUrl string `env:"SOURCE_DB_URL" env-required:"true"`
TargetDbUrl string `env:"TARGET_DB_URL" env-required:"true"`
LogLevel string `env:"LOG_LEVEL" env-default:"INFO"`
AzureStorage AzureStorageConfig
}
func getAppConfig() appConfig {
loadEnv()
var cfg appConfig
sourceDbUrl := os.Getenv("SOURCE_DB_URL")
if sourceDbUrl == "" {
log.Fatal("SOURCE_DB_URL environment variable not set")
err := cleanenv.ReadConfig(".env", &cfg)
if err != nil {
log.Warn("Could not load .env file")
}
targetDbUrl := os.Getenv("TARGET_DB_URL")
if targetDbUrl == "" {
log.Fatal("TARGET_DB_URL environment variable not set")
err = cleanenv.ReadEnv(&cfg)
if err != nil {
log.Fatalf("Error al cargar variables: %v", err)
}
return appConfig{
SourceDbUrl: sourceDbUrl,
TargetDbUrl: targetDbUrl,
}
return cfg
}
var App appConfig = getAppConfig()

View File

@@ -8,24 +8,44 @@ import (
)
type RetryConfig struct {
Attempts int `yaml:"attempts"`
BaseDelayMs int `yaml:"base_delay_ms"`
MaxDelayMs int `yaml:"max_delay_ms"`
MaxJitterMs int `yaml:"max_jitter_ms"`
Attempts int `yaml:"attempts"`
BaseDelayMs int `yaml:"base_delay_ms"`
MaxDelayMs int `yaml:"max_delay_ms"`
MaxJitterMs int `yaml:"max_jitter_ms"`
MaxFailedPartitions int `yaml:"max_failed_partitions"`
MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"`
}
// ToStorageColumnConfig is one entry of a job's `to_storage.columns` list.
type ToStorageColumnConfig struct {
// Source is the YAML `source` value.
// NOTE(review): presumably the source column holding the payload — confirm.
Source string `yaml:"source"`
// Target is the YAML `target` value.
// NOTE(review): presumably the target column receiving the stored reference — confirm.
Target string `yaml:"target"`
// Mode is the YAML `mode` value.
// NOTE(review): accepted values are not defined here — confirm against the transformer.
Mode string `yaml:"mode"`
}
// ToStorageConfig is the `to_storage` section of a job configuration.
type ToStorageConfig struct {
// Columns is the YAML `columns` list, one ToStorageColumnConfig per entry.
Columns []ToStorageColumnConfig `yaml:"columns"`
}
type JobConfig struct {
MaxExtractors int `yaml:"max_extractors"`
MaxLoaders int `yaml:"max_loaders"`
QueueSize int `yaml:"queue_size"`
BatchSize int `yaml:"batch_size"`
BatchesPerPartition int `yaml:"batches_per_partition"`
TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"`
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
MaxChunkErrors int `yaml:"max_chunk_errors"`
Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64
BatchesPerPartition int `yaml:"batches_per_partition"`
MaxExtractors int `yaml:"max_extractors"`
ExtractorBatchSize int `yaml:"extractor_batch_size"`
ExtractorQueueSize int `yaml:"extractor_queue_size"`
MaxTransformers int `yaml:"max_transformers"`
TransformerBatchSize int `yaml:"transformer_batch_size"`
TransformerQueueSize int `yaml:"transformer_queue_size"`
MaxLoaders int `yaml:"max_loaders"`
LoaderBatchSize int `yaml:"loader_batch_size"`
TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"`
Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64
ToStorage ToStorageConfig `yaml:"to_storage"`
}
// FromJsonItem is one entry of a source table's `from_json` list.
type FromJsonItem struct {
// Column is the YAML `column` value.
// NOTE(review): presumably a column name or pattern to read JSON from — confirm.
Column string `yaml:"column"`
// Field is the YAML `field` value.
// NOTE(review): presumably the JSON field to extract — confirm against the extractor.
Field string `yaml:"field"`
}
type TableInfo struct {
@@ -33,13 +53,23 @@ type TableInfo struct {
Table string `yaml:"table"`
}
type TargetTableInfo struct {
TableInfo `yaml:",inline"`
type SourceTableInfo struct {
TableInfo `yaml:",inline"`
PrimaryKey string `yaml:"primary_key"`
FromJsonColumns []FromJsonItem `yaml:"from_json"`
}
type SourceTableInfo struct {
TableInfo `yaml:",inline"`
PrimaryKey string `yaml:"primary_key"`
type TargetTableInfo struct {
TableInfo `yaml:",inline"`
PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"`
}
// RangeConfig is the `range` section of a job: a pair of int64 bounds, each
// with a flag stating whether the bound itself belongs to the range.
type RangeConfig struct {
	// Min is read from the `min` key.
	Min int64 `yaml:"min"`
	// Max is read from the `max` key.
	Max int64 `yaml:"max"`
	// IsMinInclusive is read from the `is_min_inclusive` key.
	IsMinInclusive bool `yaml:"is_min_inclusive"`
	// IsMaxInclusive is read from the `is_max_inclusive` key.
	IsMaxInclusive bool `yaml:"is_max_inclusive"`
}
type Job struct {
@@ -47,15 +77,8 @@ type Job struct {
Enabled bool `yaml:"enabled"`
SourceTable SourceTableInfo `yaml:"source"`
TargetTable TargetTableInfo `yaml:"target"`
PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"`
JobConfig `yaml:",inline"`
Range struct {
Min int64 `yaml:"min"`
Max int64 `yaml:"max"`
IsMinInclusive bool `yaml:"is_min_inclusive"`
IsMaxInclusive bool `yaml:"is_max_inclusive"`
}
Range RangeConfig `yaml:"range"`
}
type MigrationConfig struct {
@@ -84,7 +107,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
c.Defaults = raw.Defaults
c.SourceDbType = raw.SourceDbType
c.TargetDbType = raw.TargetDbType
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
c.Defaults.RowsPerPartition = int64(raw.Defaults.ExtractorBatchSize * raw.Defaults.BatchesPerPartition)
for _, node := range raw.Jobs {
job := Job{
@@ -95,7 +118,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
return err
}
job.RowsPerPartition = int64(job.BatchSize * job.BatchesPerPartition)
job.RowsPerPartition = int64(job.ExtractorBatchSize * job.BatchesPerPartition)
c.Jobs = append(c.Jobs, job)
}

View File

@@ -1,12 +1,11 @@
package custom_errors
import (
"context"
"math/rand"
"time"
)
func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
if retryCounter < 0 {
retryCounter = 0
}
@@ -40,22 +39,3 @@ func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJ
return delay
}
// requeueWithBackoff runs enqueue after delay has elapsed.
//
// A non-positive delay runs enqueue synchronously on the caller's goroutine.
// Otherwise a goroutine is started that waits on a timer and then calls
// enqueue; if ctx is done before the timer fires, the goroutine exits without
// calling enqueue.
func requeueWithBackoff(ctx context.Context, delay time.Duration, enqueue func()) {
	if delay > 0 {
		go func() {
			wait := time.NewTimer(delay)
			defer wait.Stop()

			select {
			case <-wait.C:
				enqueue()
			case <-ctx.Done():
				// Cancelled while waiting: drop the pending enqueue.
			}
		}()
		return
	}

	enqueue()
}

View File

@@ -1,119 +0,0 @@
package custom_errors
import (
"context"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type ExtractorError struct {
Partition models.Partition
LastId int64
HasLastId bool
Msg string
}
func (e *ExtractorError) Error() string {
return e.Msg
}
func ExtractorErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxPartitionErrors int,
chErrorsIn <-chan ExtractorError,
chPartitionsOut chan<- models.Partition,
chJobErrorsOut chan<- JobError,
wgActivePartitions *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.Partition.RetryCounter >= retryConfig.Attempts {
wgActivePartitions.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
}
newPartition := err.Partition
newPartition.RetryCounter++
delay := computeBackoffDelay(
newPartition.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
if err.HasLastId {
newPartition.ParentId = err.Partition.Id
newPartition.Id = uuid.New()
newPartition.Range.Min = err.LastId
newPartition.Range.IsMinInclusive = false
}
requeueWithBackoff(ctx, delay, func() {
select {
case chPartitionsOut <- newPartition:
case <-ctx.Done():
return
}
})
}
}
}

View File

@@ -1,107 +0,0 @@
package custom_errors
import (
"context"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type LoaderError struct {
Batch models.Batch
Msg string
}
func (e *LoaderError) Error() string {
return e.Msg
}
func LoaderErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxChunkErrors int,
chErrorsIn <-chan LoaderError,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.Batch.RetryCounter >= retryConfig.Attempts {
wgActiveBatches.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", err.Batch.Id, err.Batch.RetryCounter),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
}
err.Batch.RetryCounter++
delay := computeBackoffDelay(
err.Batch.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
requeueWithBackoff(ctx, delay, func() {
select {
case chBatchesOut <- err.Batch:
case <-ctx.Done():
return
}
})
}
}
}

View File

@@ -0,0 +1,25 @@
package custom_errors
import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
// ExtractorError describes a failure that happened while extracting a
// partition.
type ExtractorError struct {
	// Partition is the partition that was being extracted.
	Partition models.Partition
	// LastId accompanies HasLastId.
	// NOTE(review): presumably the last primary-key value read before the
	// failure and only meaningful when HasLastId is true — confirm with the
	// code that builds this error.
	LastId    int64
	HasLastId bool
	// Msg is the text returned by Error.
	Msg string
}
// Error implements the error interface by returning the stored message
// unchanged.
func (e *ExtractorError) Error() string {
	msg := e.Msg
	return msg
}
// LoaderError describes a failure tied to a single batch.
type LoaderError struct {
	// Batch is the batch the error refers to.
	Batch models.Batch
	// Msg is the text returned by Error.
	Msg string
}
// Error implements the error interface by returning the stored message
// unchanged.
func (e *LoaderError) Error() string {
	msg := e.Msg
	return msg
}

View File

@@ -0,0 +1,7 @@
package db_dialects
// Dialect identifiers shared by the database wrappers.
const (
	// SqlServer is the identifier for Microsoft SQL Server.
	SqlServer string = "sqlserver"
	// Postgres is the identifier for PostgreSQL.
	Postgres string = "postgres"
	// Null is the literal identifier "null".
	// NOTE(review): presumably a placeholder for "no database" — confirm
	// against its users.
	Null string = "null"
)

View File

@@ -4,13 +4,17 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
mssql "github.com/microsoft/go-mssqldb"
)
func init() {
Register("sqlserver", func() DbWrapper {
return &mssqlDbWrapper{dialect: "sqlserver"}
Register(dbdialects.SqlServer, func() DbWrapper {
return &mssqlDbWrapper{dialect: dbdialects.SqlServer}
})
}
@@ -174,3 +178,140 @@ func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table
return rowsAffected, nil
}
// buildExtractQueryMssql renders the T-SQL SELECT statement for an extraction
// query.
//
// Select list:
//   - With no columns and no from_json entries the list is "*".
//   - Otherwise every column is emitted in order. A column targeted by one or
//     more from_json entries is replaced by one JSON_VALUE expression per
//     entry, each aliased with the column's own name. A column whose Type() is
//     "GEOMETRY" is emitted as STAsBinary().
//
// Range: when LowerLimit and/or UpperLimit are valid, a WHERE clause on the
// primary key is added using the named parameters @min and @max; the caller
// must bind them. Rows are always ordered by the primary key ascending.
//
// Identifiers are bracket-quoted with any "]" doubled, and single quotes in a
// JSON path are doubled, so names containing those characters still produce
// well-formed SQL.
//
// An error is returned when a from_json entry cannot be resolved to one of
// q.Columns.
func buildExtractQueryMssql(q ExtractionQuery) (string, error) {
	// quoteIdent wraps name in brackets, escaping embedded closing brackets the
	// way SQL Server expects for delimited identifiers.
	quoteIdent := func(name string) string {
		return "[" + strings.ReplaceAll(name, "]", "]]") + "]"
	}

	// Resolve each from_json entry (which may be a prefix pattern) to the
	// concrete column it refers to, keeping entries in declaration order.
	resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns))
	for _, jsonConfig := range q.FromJsonColumns {
		actualColumnName, err := findColumnByPattern(q.Columns, jsonConfig.Column)
		if err != nil {
			return "", err
		}
		resolvedJson[actualColumnName] = append(resolvedJson[actualColumnName], jsonConfig)
	}

	selectParts := make([]string, 0, len(q.Columns)+len(q.FromJsonColumns))
	for _, col := range q.Columns {
		ident := quoteIdent(col.Name())

		if jsonConfigs, isJsonColumn := resolvedJson[col.Name()]; isJsonColumn {
			for _, jsonConfig := range jsonConfigs {
				// The path is embedded in a single-quoted literal.
				jsonPath := strings.ReplaceAll(buildJsonPathMssql(jsonConfig.Field), "'", "''")
				selectParts = append(selectParts, fmt.Sprintf("JSON_VALUE(%s, '%s') AS %s", ident, jsonPath, ident))
			}
			continue
		}

		if col.Type() == "GEOMETRY" {
			selectParts = append(selectParts, fmt.Sprintf("%s.STAsBinary() AS %s", ident, ident))
			continue
		}
		selectParts = append(selectParts, ident)
	}
	// Only reachable with an empty column list: a from_json entry against an
	// empty column list has already failed resolution above.
	if len(selectParts) == 0 {
		selectParts = append(selectParts, "*")
	}

	primaryKey := quoteIdent(q.PrimaryKey)
	conditions := make([]string, 0, 2)
	if q.LowerLimit.IsValid {
		op := ">"
		if q.LowerLimit.IsInclusive {
			op = ">="
		}
		conditions = append(conditions, primaryKey+" "+op+" @min")
	}
	if q.UpperLimit.IsValid {
		op := "<"
		if q.UpperLimit.IsInclusive {
			op = "<="
		}
		conditions = append(conditions, primaryKey+" "+op+" @max")
	}

	var sbQuery strings.Builder
	sbQuery.WriteString("SELECT ")
	sbQuery.WriteString(strings.Join(selectParts, ", "))
	fmt.Fprintf(&sbQuery, " FROM %s.%s", quoteIdent(q.Schema), quoteIdent(q.Table))
	if len(conditions) > 0 {
		sbQuery.WriteString(" WHERE ")
		sbQuery.WriteString(strings.Join(conditions, " AND "))
	}
	fmt.Fprintf(&sbQuery, " ORDER BY %s ASC", primaryKey)

	return sbQuery.String(), nil
}
// findColumnByPattern returns the name of the first column matching pattern.
//
// A pattern ending in "*" matches by prefix (the part before the "*");
// any other pattern must equal the column name exactly. Matching is
// case-sensitive. An error is returned for an empty pattern or when no column
// matches.
func findColumnByPattern(columns []models.ColumnType, pattern string) (string, error) {
	if pattern == "" {
		return "", fmt.Errorf("column pattern cannot be empty")
	}

	prefix, isWildcard := strings.CutSuffix(pattern, "*")
	for _, candidate := range columns {
		name := candidate.Name()
		if isWildcard {
			if strings.HasPrefix(name, prefix) {
				return name, nil
			}
			continue
		}
		if name == pattern {
			return name, nil
		}
	}

	if isWildcard {
		return "", fmt.Errorf("no column found matching pattern '%s'", pattern)
	}
	return "", fmt.Errorf("column '%s' not found in table columns", pattern)
}
// QueryFromObject builds the SQL Server extraction statement for q and runs
// it, binding the valid range limits as the named parameters "min" and "max".
// A failure to build the statement is returned without querying.
func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
	statement, buildErr := buildExtractQueryMssql(q)
	if buildErr != nil {
		return nil, buildErr
	}

	bounds := []struct {
		name  string
		limit ExtractorQueryLimit
	}{
		{name: "min", limit: q.LowerLimit},
		{name: "max", limit: q.UpperLimit},
	}

	var params []any
	for _, bound := range bounds {
		if bound.limit.IsValid {
			params = append(params, sql.Named(bound.name, bound.limit.Value))
		}
	}

	return mw.Query(ctx, statement, params...)
}
// buildJsonPathMssql converts a configured field reference into a JSON path
// rooted at "$". A single leading "." on field is optional and is not
// duplicated: both ".a.b" and "a.b" yield "$.a.b". An empty field yields "$.".
func buildJsonPathMssql(field string) string {
	return "$." + strings.TrimPrefix(field, ".")
}

View File

@@ -0,0 +1,396 @@
package dbwrapper
import (
"strings"
"testing"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
func TestBuildExtractQueryMssql_NoJsonColumns(t *testing.T) {
q := ExtractionQuery{
Schema: "dbo",
Table: "Users",
PrimaryKey: "ID",
Columns: []models.ColumnType{
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
models.NewColumnType("Name", true, false, "VARCHAR", "varchar", "VARCHAR", true, 255, 0, 0),
},
FromJsonColumns: []config.FromJsonItem{},
LowerLimit: ExtractorQueryLimit{IsValid: false},
UpperLimit: ExtractorQueryLimit{IsValid: false},
}
query, err := buildExtractQueryMssql(q)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
if !strings.Contains(query, "SELECT [ID], [Name]") {
t.Errorf("Expected columns in query, got: %s", query)
}
if !strings.Contains(query, "FROM [dbo].[Users]") {
t.Errorf("Expected FROM clause, got: %s", query)
}
if !strings.Contains(query, "ORDER BY [ID] ASC") {
t.Errorf("Expected ORDER BY clause, got: %s", query)
}
}
func TestBuildExtractQueryMssql_WithJsonColumns_ExactColumnMatch(t *testing.T) {
// Test that the actual column name is used as alias, not a generated one
q := ExtractionQuery{
Schema: "dbo",
Table: "Events",
PrimaryKey: "EventID",
Columns: []models.ColumnType{
models.NewColumnType("EventID", false, false, "INT", "int", "INT", false, 0, 0, 0),
models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
},
FromJsonColumns: []config.FromJsonItem{
{Column: "EventData", Field: ".userId"},
{Column: "EventData", Field: ".timestamp"},
},
LowerLimit: ExtractorQueryLimit{IsValid: false},
UpperLimit: ExtractorQueryLimit{IsValid: false},
}
query, err := buildExtractQueryMssql(q)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
if !strings.HasPrefix(query, "SELECT [EventID], JSON_VALUE([EventData], '$.userId') AS [EventData], JSON_VALUE([EventData], '$.timestamp') AS [EventData]") {
t.Errorf("Expected JSON columns to replace EventData in-order, got: %s", query)
}
if strings.Contains(query, "SELECT [EventID], [EventData]") {
t.Errorf("Expected EventData to be replaced by JSON extraction, got: %s", query)
}
// Alias should be exactly "EventData", not "EventData_userId"
if !strings.Contains(query, "JSON_VALUE([EventData], '$.userId') AS [EventData]") {
t.Errorf("Expected JSON alias to be [EventData], got: %s", query)
}
if !strings.Contains(query, "JSON_VALUE([EventData], '$.timestamp') AS [EventData]") {
t.Errorf("Expected JSON alias to be [EventData], got: %s", query)
}
// Should have comma separating them
if !strings.Contains(query, "JSON_VALUE([EventData], '$.userId') AS [EventData], JSON_VALUE([EventData], '$.timestamp') AS [EventData]") {
t.Errorf("Expected comma-separated JSON values, got: %s", query)
}
}
func TestBuildExtractQueryMssql_WithWildcardPattern(t *testing.T) {
// Test that wildcard pattern matching finds the correct column
q := ExtractionQuery{
Schema: "dbo",
Table: "Events",
PrimaryKey: "ID",
Columns: []models.ColumnType{
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
models.NewColumnType("NodeMetadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
},
FromJsonColumns: []config.FromJsonItem{
{Column: "NodeMeta*", Field: ".id"},
},
LowerLimit: ExtractorQueryLimit{IsValid: false},
UpperLimit: ExtractorQueryLimit{IsValid: false},
}
query, err := buildExtractQueryMssql(q)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
// Should find "NodeMetadata" from pattern "NodeMeta*" and use it as alias
if !strings.Contains(query, "JSON_VALUE([NodeMetadata], '$.id') AS [NodeMetadata]") {
t.Errorf("Expected to find and use NodeMetadata column by pattern, got: %s", query)
}
if strings.Contains(query, "SELECT [ID], [NodeMetadata]") {
t.Errorf("Expected NodeMetadata to be replaced by JSON extraction, got: %s", query)
}
}
func TestBuildExtractQueryMssql_ColumnNotFound_Error(t *testing.T) {
// Test that an error is returned when column is not found
q := ExtractionQuery{
Schema: "dbo",
Table: "Events",
PrimaryKey: "ID",
Columns: []models.ColumnType{
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
},
FromJsonColumns: []config.FromJsonItem{
{Column: "NonExistentColumn", Field: ".id"},
},
LowerLimit: ExtractorQueryLimit{IsValid: false},
UpperLimit: ExtractorQueryLimit{IsValid: false},
}
query, err := buildExtractQueryMssql(q)
if err == nil {
t.Fatalf("Expected error for missing column, got no error. Query: %s", query)
}
if !strings.Contains(err.Error(), "NonExistentColumn") {
t.Errorf("Expected error message to contain column name, got: %v", err)
}
}
func TestBuildExtractQueryMssql_WildcardPatternNotMatched_Error(t *testing.T) {
// Test that an error is returned when wildcard pattern doesn't match any column
q := ExtractionQuery{
Schema: "dbo",
Table: "Events",
PrimaryKey: "ID",
Columns: []models.ColumnType{
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
},
FromJsonColumns: []config.FromJsonItem{
{Column: "NonMatching*", Field: ".id"},
},
LowerLimit: ExtractorQueryLimit{IsValid: false},
UpperLimit: ExtractorQueryLimit{IsValid: false},
}
query, err := buildExtractQueryMssql(q)
if err == nil {
t.Fatalf("Expected error for non-matching wildcard pattern, got no error. Query: %s", query)
}
if !strings.Contains(err.Error(), "NonMatching*") {
t.Errorf("Expected error message to contain pattern, got: %v", err)
}
}
func TestBuildExtractQueryMssql_NestedJsonFields(t *testing.T) {
q := ExtractionQuery{
Schema: "dbo",
Table: "Data",
PrimaryKey: "ID",
Columns: []models.ColumnType{
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
models.NewColumnType("NodeData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
},
FromJsonColumns: []config.FromJsonItem{
{Column: "NodeData", Field: ".user.name"},
{Column: "NodeData", Field: ".user.email"},
},
LowerLimit: ExtractorQueryLimit{IsValid: false},
UpperLimit: ExtractorQueryLimit{IsValid: false},
}
query, err := buildExtractQueryMssql(q)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
if !strings.Contains(query, "JSON_VALUE([NodeData], '$.user.name') AS [NodeData]") {
t.Errorf("Expected nested JSON path for user.name, got: %s", query)
}
if !strings.Contains(query, "JSON_VALUE([NodeData], '$.user.email') AS [NodeData]") {
t.Errorf("Expected nested JSON path for user.email, got: %s", query)
}
if strings.Contains(query, "SELECT [ID], [NodeData]") {
t.Errorf("Expected NodeData to be replaced by JSON extraction, got: %s", query)
}
}
func TestBuildExtractQueryMssql_WithRangeLimits(t *testing.T) {
q := ExtractionQuery{
Schema: "dbo",
Table: "Products",
PrimaryKey: "ProductID",
Columns: []models.ColumnType{
models.NewColumnType("ProductID", false, false, "INT", "int", "INT", false, 0, 0, 0),
models.NewColumnType("Details", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
},
FromJsonColumns: []config.FromJsonItem{
{Column: "Details", Field: ".price"},
},
LowerLimit: ExtractorQueryLimit{IsValid: true, IsInclusive: true, Value: 100},
UpperLimit: ExtractorQueryLimit{IsValid: true, IsInclusive: false, Value: 500},
}
query, err := buildExtractQueryMssql(q)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
if !strings.Contains(query, "WHERE [ProductID] >= @min") {
t.Errorf("Expected WHERE clause with >=, got: %s", query)
}
if !strings.Contains(query, "[ProductID] < @max") {
t.Errorf("Expected upper limit with <, got: %s", query)
}
if !strings.Contains(query, "JSON_VALUE([Details], '$.price') AS [Details]") {
t.Errorf("Expected JSON_VALUE for Details, got: %s", query)
}
if strings.Contains(query, "SELECT [ProductID], [Details]") {
t.Errorf("Expected Details to be replaced by JSON extraction, got: %s", query)
}
}
// TestBuildJsonPathMssql checks that field references with and without a
// leading dot, nested references and the empty string are all turned into the
// expected "$."-rooted path.
func TestBuildJsonPathMssql(t *testing.T) {
	type pathCase struct {
		field string
		want  string
	}

	cases := []pathCase{
		{field: ".id", want: "$.id"},
		{field: "id", want: "$.id"},
		{field: ".user.name", want: "$.user.name"},
		{field: "user.name", want: "$.user.name"},
		{field: ".location.coordinates.lat", want: "$.location.coordinates.lat"},
		{field: "", want: "$."},
	}

	for i := range cases {
		c := cases[i]
		if got := buildJsonPathMssql(c.field); got != c.want {
			t.Errorf("buildJsonPathMssql(%q) = %q, want %q", c.field, got, c.want)
		}
	}
}
// TestFindColumnByPattern_ExactMatch checks that a pattern without a wildcard
// resolves to the column whose name equals it.
func TestFindColumnByPattern_ExactMatch(t *testing.T) {
	const wanted = "Metadata"
	available := []models.ColumnType{
		models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
		models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
		models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
	}

	got, err := findColumnByPattern(available, wanted)
	switch {
	case err != nil:
		t.Fatalf("Expected no error, got: %v", err)
	case got != wanted:
		t.Errorf("Expected 'Metadata', got '%s'", got)
	}
}
// TestFindColumnByPattern_WildcardMatch checks that a pattern ending in "*"
// resolves to the column whose name starts with the part before the "*".
func TestFindColumnByPattern_WildcardMatch(t *testing.T) {
	const wanted = "NodeMetadata"
	available := []models.ColumnType{
		models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
		models.NewColumnType("NodeMetadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
		models.NewColumnType("EventData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
	}

	got, err := findColumnByPattern(available, "NodeMeta*")
	switch {
	case err != nil:
		t.Fatalf("Expected no error, got: %v", err)
	case got != wanted:
		t.Errorf("Expected 'NodeMetadata', got '%s'", got)
	}
}
// TestFindColumnByPattern_NotFound checks that an exact pattern matching no
// column produces an error that mentions the requested name.
func TestFindColumnByPattern_NotFound(t *testing.T) {
	available := []models.ColumnType{
		models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
		models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
	}

	got, err := findColumnByPattern(available, "NonExistent")
	switch {
	case err == nil:
		t.Fatalf("Expected error, got no error. Result: %s", got)
	case !strings.Contains(err.Error(), "NonExistent"):
		t.Errorf("Expected error to contain column name, got: %v", err)
	}
}
// TestFindColumnByPattern_WildcardNotFound checks that a wildcard pattern
// matching no column produces an error that mentions the pattern.
func TestFindColumnByPattern_WildcardNotFound(t *testing.T) {
	available := []models.ColumnType{
		models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
		models.NewColumnType("Metadata", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
	}

	got, err := findColumnByPattern(available, "Event*")
	switch {
	case err == nil:
		t.Fatalf("Expected error, got no error. Result: %s", got)
	case !strings.Contains(err.Error(), "Event*"):
		t.Errorf("Expected error to contain pattern, got: %v", err)
	}
}
func TestBuildExtractQueryMssql_OnlyJsonColumns(t *testing.T) {
// Test when all columns are used via JSON extraction
q := ExtractionQuery{
Schema: "dbo",
Table: "Data",
PrimaryKey: "ID",
Columns: []models.ColumnType{
models.NewColumnType("ID", false, false, "INT", "int", "INT", false, 0, 0, 0),
models.NewColumnType("JsonData", true, false, "VARCHAR", "varchar", "VARCHAR", true, 500, 0, 0),
},
FromJsonColumns: []config.FromJsonItem{
{Column: "JsonData", Field: ".field1"},
},
LowerLimit: ExtractorQueryLimit{IsValid: false},
UpperLimit: ExtractorQueryLimit{IsValid: false},
}
query, err := buildExtractQueryMssql(q)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
if !strings.HasPrefix(query, "SELECT [ID], JSON_VALUE([JsonData], '$.field1') AS [JsonData]") {
t.Errorf("Expected JsonData to be replaced by JSON extraction, got: %s", query)
}
if strings.Contains(query, "SELECT [ID], [JsonData]") {
t.Errorf("Expected JsonData to be excluded from raw selection, got: %s", query)
}
}
func TestBuildExtractQueryMssql_JsonColumnsReplaceInOrder(t *testing.T) {
q := ExtractionQuery{
Schema: "dbo",
Table: "Users",
PrimaryKey: "UserID",
Columns: []models.ColumnType{
models.NewColumnType("UserID", false, false, "INT", "int", "INT", false, 0, 0, 0),
models.NewColumnType("Name", true, false, "VARCHAR", "varchar", "VARCHAR", false, 255, 0, 0),
models.NewColumnType("Email", true, false, "VARCHAR", "varchar", "VARCHAR", false, 255, 0, 0),
models.NewColumnType("Metadata", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0),
models.NewColumnType("Profile", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0),
models.NewColumnType("Settings", true, false, "NVARCHAR", "nvarchar", "NVARCHAR", true, 4000, 0, 0),
},
FromJsonColumns: []config.FromJsonItem{
{Column: "Metadata", Field: ".id"},
{Column: "Profile", Field: ".id"},
{Column: "Settings", Field: ".id"},
},
LowerLimit: ExtractorQueryLimit{IsValid: false},
UpperLimit: ExtractorQueryLimit{IsValid: false},
}
query, err := buildExtractQueryMssql(q)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
expected := "SELECT [UserID], [Name], [Email], JSON_VALUE([Metadata], '$.id') AS [Metadata], JSON_VALUE([Profile], '$.id') AS [Profile], JSON_VALUE([Settings], '$.id') AS [Settings] FROM [dbo].[Users] ORDER BY [UserID] ASC"
if query != expected {
t.Errorf("Unexpected query.\nExpected: %s\nGot: %s", expected, query)
}
}

View File

@@ -3,14 +3,17 @@ package dbwrapper
import (
"context"
"errors"
"fmt"
"strings"
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func init() {
Register("postgres", func() DbWrapper {
return &postgresDbWrapper{dialect: "postgres"}
Register(dbdialects.Postgres, func() DbWrapper {
return &postgresDbWrapper{dialect: dbdialects.Postgres}
})
}
@@ -126,3 +129,75 @@ func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, tab
return affectedRows, nil
}
// QueryFromObject builds and runs the PostgreSQL extraction statement for q.
//
// The select list is "*" when q.Columns is empty; otherwise every column is
// emitted in order, with columns whose Type() is "GEOMETRY" wrapped in
// ST_AsEWKB. When LowerLimit and/or UpperLimit are valid, a WHERE clause on
// the primary key is added with positional parameters, and rows are always
// ordered by the primary key ascending.
//
// Identifiers are double-quoted with any embedded `"` doubled so that names
// containing a quote still produce well-formed SQL.
//
// NOTE(review): q.FromJsonColumns is not consulted here, unlike the SQL Server
// builder — confirm whether JSON extraction is intentionally unsupported for
// PostgreSQL sources.
func (pw *postgresDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
	// quoteIdent renders name as a PostgreSQL quoted identifier.
	quoteIdent := func(name string) string {
		return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
	}

	var sbQuery strings.Builder
	sbQuery.WriteString("SELECT ")

	if len(q.Columns) == 0 {
		sbQuery.WriteString("*")
	}
	for i, col := range q.Columns {
		if i > 0 {
			sbQuery.WriteString(", ")
		}
		ident := quoteIdent(col.Name())
		if col.Type() == "GEOMETRY" {
			fmt.Fprintf(&sbQuery, "ST_AsEWKB(%s) AS %s", ident, ident)
			continue
		}
		sbQuery.WriteString(ident)
	}

	fmt.Fprintf(&sbQuery, " FROM %s.%s", quoteIdent(q.Schema), quoteIdent(q.Table))

	// Each condition takes its placeholder number from the argument slice, so
	// the statement text and the bound arguments cannot get out of step.
	primaryKey := quoteIdent(q.PrimaryKey)
	conditions := make([]string, 0, 2)
	var queryArgs []any
	if q.LowerLimit.IsValid {
		op := ">"
		if q.LowerLimit.IsInclusive {
			op = ">="
		}
		queryArgs = append(queryArgs, q.LowerLimit.Value)
		conditions = append(conditions, fmt.Sprintf("%s %s $%d", primaryKey, op, len(queryArgs)))
	}
	if q.UpperLimit.IsValid {
		op := "<"
		if q.UpperLimit.IsInclusive {
			op = "<="
		}
		queryArgs = append(queryArgs, q.UpperLimit.Value)
		conditions = append(conditions, fmt.Sprintf("%s %s $%d", primaryKey, op, len(queryArgs)))
	}
	if len(conditions) > 0 {
		sbQuery.WriteString(" WHERE ")
		sbQuery.WriteString(strings.Join(conditions, " AND "))
	}

	fmt.Fprintf(&sbQuery, " ORDER BY %s ASC", primaryKey)

	return pw.Query(ctx, sbQuery.String(), queryArgs...)
}

View File

@@ -3,6 +3,9 @@ package dbwrapper
import (
"context"
"errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
@@ -24,6 +27,22 @@ type RowResult interface {
Scan(dest ...any) error
}
// ExtractorQueryLimit is one optional bound on the primary key of an
// extraction query.
type ExtractorQueryLimit struct {
	// IsValid reports whether the bound applies; the query builders ignore the
	// other fields when it is false.
	IsValid bool
	// IsInclusive selects ">=" / "<=" instead of ">" / "<" for the comparison.
	IsInclusive bool
	// Value is the bound passed to the query as a parameter.
	Value int64
}
// ExtractionQuery is a dialect-independent description of a SELECT that reads
// a primary-key range of a table; each DbWrapper turns it into its own SQL in
// QueryFromObject.
type ExtractionQuery struct {
	// Schema and Table identify the table to read.
	Schema string
	Table  string
	// PrimaryKey is the column used for range filtering and ordering.
	PrimaryKey string
	// Columns lists the columns to select.
	Columns []models.ColumnType
	// LowerLimit and UpperLimit optionally bound PrimaryKey.
	LowerLimit ExtractorQueryLimit
	UpperLimit ExtractorQueryLimit
	// FromJsonColumns lists the configured from_json entries for the table.
	FromJsonColumns []config.FromJsonItem
}
type DbWrapper interface {
Close() error
Connect(ctx context.Context, dbUrl string) error
@@ -32,4 +51,5 @@ type DbWrapper interface {
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
QueryRow(ctx context.Context, query string, args ...any) RowResult
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
QueryFromObject(ctx context.Context, query ExtractionQuery) (RowsResult, error)
}

View File

@@ -0,0 +1,108 @@
package extractors
import (
"context"
"errors"
"slices"
"strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/sirupsen/logrus"
)
// Consume is an extractor worker loop: it receives partitions from
// chPartitionsIn and extracts each with ProcessPartitionWithRetries until the
// channel is closed or ctx is done.
//
// Before looping it locates tableInfo.PrimaryKey in columns
// (case-insensitively); when absent it reports a job-cancelling error on
// chErrorsOut and returns.
//
// For every partition received:
//   - wgActivePartitions.Done is called once extraction returns;
//   - a positive row count is added atomically to rowsRead;
//   - on failure failedPartitionsCount is incremented atomically and the error
//     is forwarded on chErrorsOut (as-is when it is a *custom_errors.JobError,
//     otherwise wrapped in a non-cancelling JobError);
//   - when the failure count then exceeds retryConfig.MaxFailedPartitions, a
//     job-cancelling error is reported and the worker returns.
//
// Every send on chErrorsOut is abandoned, and the worker returns, if ctx ends
// first.
func (ex *GenericExtractor) Consume(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	retryConfig config.RetryConfig,
	chPartitionsIn <-chan models.Partition,
	chBatchesOut chan<- models.Batch,
	chErrorsOut chan<- custom_errors.JobError,
	wgActivePartitions *sync.WaitGroup,
	rowsRead *int64,
	failedPartitionsCount *int32,
	fromJsonColumns []config.FromJsonItem,
) {
	// report delivers jobErr unless the context ends first; it returns false
	// when the worker must stop because of cancellation.
	report := func(jobErr custom_errors.JobError) bool {
		select {
		case <-ctx.Done():
			return false
		case chErrorsOut <- jobErr:
			return true
		}
	}

	indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
		return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
	})
	if indexPrimaryKey == -1 {
		report(custom_errors.JobError{
			ShouldCancelJob: true,
			Msg:             "Primary key not found in provided columns",
		})
		return
	}

	for ctx.Err() == nil {
		var partition models.Partition
		select {
		case <-ctx.Done():
			return
		case next, open := <-chPartitionsIn:
			if !open {
				return
			}
			partition = next
		}

		rowsReadResult, err := ex.ProcessPartitionWithRetries(
			ctx,
			tableInfo,
			columns,
			batchSize,
			partition,
			indexPrimaryKey,
			retryConfig,
			chBatchesOut,
			fromJsonColumns,
		)
		wgActivePartitions.Done()

		if rowsReadResult > 0 {
			// The logged total is the value observed before this partition's
			// rows are added.
			current := atomic.LoadInt64(rowsRead)
			logrus.Debugf("Rows read (partition extracted): +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
			atomic.AddInt64(rowsRead, int64(rowsReadResult))
		}

		if err == nil {
			continue
		}

		atomic.AddInt32(failedPartitionsCount, 1)

		var outgoing custom_errors.JobError
		if typed, isJobError := errors.AsType[*custom_errors.JobError](err); isJobError {
			outgoing = *typed
		} else {
			outgoing = custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}
		}
		if !report(outgoing) {
			return
		}

		if atomic.LoadInt32(failedPartitionsCount) > int32(retryConfig.MaxFailedPartitions) {
			// The worker stops whether or not the fatal error could be delivered.
			report(custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed partitions reached"})
			return
		}
	}
}

View File

@@ -0,0 +1,42 @@
package extractors
import (
"context"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
// GenericExtractor extracts data through a DbWrapper.
type GenericExtractor struct {
	// db is the wrapper supplied to NewExtractor.
	db dbwrapper.DbWrapper
}
// NewExtractor returns a GenericExtractor that reads through db.
func NewExtractor(db dbwrapper.DbWrapper) GenericExtractor {
	var extractor GenericExtractor
	extractor.db = db
	return extractor
}
// sendBatch blocks until batch has been sent on chBatchesOut or ctx is done.
// It returns nil after a successful send and ctx.Err() when the context ended
// first.
func sendBatch(ctx context.Context, chBatchesOut chan<- models.Batch, batch models.Batch) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case chBatchesOut <- batch:
	}
	return nil
}
// flush wraps batchRows in a new Batch (fresh id, tagged with partition.Id)
// and sends it on chBatchesOut. It does nothing and returns nil when
// batchRows is empty; otherwise it returns nil after the send, or ctx.Err()
// when the context ended first.
//
// The sent batch keeps referencing batchRows' backing array, and flush cannot
// reset the caller's slice (it is passed by value). Callers must therefore
// start a new slice after a flush instead of truncating and reusing the old
// one.
//
// batchSize is unused; it is kept so the signature stays unchanged.
func flush(
	ctx context.Context,
	partition *models.Partition,
	batchSize int,
	batchRows []models.UnknownRowValues,
	chBatchesOut chan<- models.Batch,
) error {
	if len(batchRows) == 0 {
		return nil
	}

	batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows}
	return sendBatch(ctx, chBatchesOut, batch)
}

View File

@@ -1,277 +0,0 @@
package extractors
import (
"context"
"database/sql"
"errors"
"fmt"
"slices"
"strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type MssqlExtractor struct {
db dbwrapper.DbWrapper
}
func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &MssqlExtractor{db: db}
}
func buildExtractQueryMssql(
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
includeRange bool,
isMinInclusive bool,
) string {
var sbQuery strings.Builder
sbQuery.WriteString("SELECT ")
if len(columns) == 0 {
sbQuery.WriteString("*")
} else {
for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
// if col.Type() == "GEOMETRY" {
// fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
// }
if i < len(columns)-1 {
sbQuery.WriteString(", ")
}
}
}
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table)
if includeRange {
fmt.Fprintf(&sbQuery, " WHERE [%s]", tableInfo.PrimaryKey)
if isMinInclusive {
sbQuery.WriteString(" >=")
} else {
sbQuery.WriteString(" >")
}
fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", tableInfo.PrimaryKey)
}
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey)
return sbQuery.String()
}
// errorFromLastRow builds the ExtractorError reported after a read failure
// that happened once at least one row had been read.
//
// The value at indexPrimaryKey of lastRow is converted with convert.ToInt64.
// On success the error carries that value as LastId together with the
// message of previousError. When the conversion fails, the error carries a
// copy of partition whose RetryCounter is forced to 3 and no LastId.
// NOTE(review): 3 presumably matches the retry limit applied by the consumer
// of these errors — confirm.
func errorFromLastRow(
	lastRow models.UnknownRowValues,
	indexPrimaryKey int,
	partition models.Partition,
	previousError error,
) *custom_errors.ExtractorError {
	lastId, converted := convert.ToInt64(lastRow[indexPrimaryKey])
	if converted {
		return &custom_errors.ExtractorError{
			Partition: partition,
			HasLastId: true,
			LastId:    lastId,
			Msg:       previousError.Error(),
		}
	}

	exhausted := partition
	exhausted.RetryCounter = 3
	return &custom_errors.ExtractorError{
		Partition: exhausted,
		HasLastId: true,
		Msg:       fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
	}
}
// ProcessPartition runs the extraction query for one partition and streams
// the scanned rows to chBatchesOut in batches.
//
// A batch is sent every time the pending rows reach batchSize, and once more
// at the end for any remainder. The returned int is the number of rows
// scanned successfully, including on error.
//
// Errors:
//   - query failure, or a read failure before any row is pending: an
//     *custom_errors.ExtractorError with HasLastId false;
//   - read failure (rows.Scan or rows.Err) with rows pending: the pending rows
//     are sent first, then the error built by errorFromLastRow from the last
//     pending row is returned;
//   - ctx cancelled while sending, or rows.Err matching ctx.Err(): ctx.Err().
func (mssqlEx *MssqlExtractor) ProcessPartition(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	partition models.Partition,
	indexPrimaryKey int,
	chBatchesOut chan<- models.Batch,
) (int, error) {
	query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)

	var queryArgs []any
	if partition.HasRange {
		queryArgs = append(queryArgs,
			sql.Named("min", partition.Range.Min),
			sql.Named("max", partition.Range.Max),
		)
	}

	rowsRead := 0
	rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
	if err != nil {
		return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
	}
	defer rows.Close()

	// emit sends one batch downstream, giving up when ctx is cancelled.
	emit := func(pending []models.UnknownRowValues) error {
		select {
		case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: pending, RetryCounter: 0}:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	batchRows := make([]models.UnknownRowValues, 0, batchSize)
	for rows.Next() {
		rowValues := make([]any, len(columns))
		scanArgs := make([]any, len(columns))
		for i := range rowValues {
			scanArgs[i] = &rowValues[i]
		}

		if err := rows.Scan(scanArgs...); err != nil {
			if len(batchRows) == 0 {
				return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
			}
			// Deliver what was read so far; the error then points at the last
			// delivered row.
			if sendErr := emit(batchRows); sendErr != nil {
				return rowsRead, sendErr
			}
			return rowsRead, errorFromLastRow(batchRows[len(batchRows)-1], indexPrimaryKey, partition, err)
		}

		rowsRead++
		batchRows = append(batchRows, rowValues)
		if len(batchRows) >= batchSize {
			if err := emit(batchRows); err != nil {
				return rowsRead, err
			}
			// The sent slice now belongs to the batch; start a fresh one.
			batchRows = make([]models.UnknownRowValues, 0, batchSize)
		}
	}

	if err := rows.Err(); err != nil {
		if errors.Is(err, ctx.Err()) {
			return rowsRead, ctx.Err()
		}
		if len(batchRows) == 0 {
			return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
		}
		// The error below reports the last pending row as the last id, so those
		// rows must be delivered first (same as the Scan failure path);
		// otherwise they would be reported as read but never sent.
		if sendErr := emit(batchRows); sendErr != nil {
			return rowsRead, sendErr
		}
		return rowsRead, errorFromLastRow(batchRows[len(batchRows)-1], indexPrimaryKey, partition, err)
	}

	if len(batchRows) > 0 {
		if err := emit(batchRows); err != nil {
			return rowsRead, err
		}
	}
	return rowsRead, nil
}
// Exec is the worker loop of the extractor: it receives partitions from
// chPartitionsIn and processes each one with ProcessPartition until the
// channel is closed or ctx is cancelled.
//
// Before looping it locates the primary key among columns
// (case-insensitively); if it is missing, a JobError with ShouldCancelJob set
// is reported on chJobErrorsOut and Exec returns.
//
// For every partition, the rows read are added atomically to rowsRead. On
// success wgActivePartitions.Done() is called. On failure Done() is not
// called here; the error is forwarded instead: ExtractorError values go to
// chErrorsOut, JobError values to chJobErrorsOut, and any other error is
// wrapped in an ExtractorError for the partition. Every send is abandoned
// when ctx is cancelled.
func (mssqlEx *MssqlExtractor) Exec(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	chPartitionsIn <-chan models.Partition,
	chBatchesOut chan<- models.Batch,
	chErrorsOut chan<- custom_errors.ExtractorError,
	chJobErrorsOut chan<- custom_errors.JobError,
	wgActivePartitions *sync.WaitGroup,
	rowsRead *int64,
) {
	pkIndex := slices.IndexFunc(columns, func(col models.ColumnType) bool {
		return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
	})
	if pkIndex == -1 {
		missingKey := custom_errors.JobError{
			ShouldCancelJob: true,
			Msg:             "Primary key not found in provided columns",
		}
		select {
		case <-ctx.Done():
		case chJobErrorsOut <- missingKey:
		}
		return
	}

	for {
		if ctx.Err() != nil {
			return
		}

		select {
		case <-ctx.Done():
			return

		case partition, open := <-chPartitionsIn:
			if !open {
				return
			}

			readCount, err := mssqlEx.ProcessPartition(
				ctx, tableInfo, columns, batchSize, partition, pkIndex, chBatchesOut,
			)
			if readCount > 0 {
				atomic.AddInt64(rowsRead, int64(readCount))
			}
			if err == nil {
				wgActivePartitions.Done()
				continue
			}

			var (
				exError  *custom_errors.ExtractorError
				jobError *custom_errors.JobError
			)
			switch {
			case errors.As(err, &exError):
				select {
				case <-ctx.Done():
					return
				case chErrorsOut <- *exError:
				}
			case errors.As(err, &jobError):
				select {
				case <-ctx.Done():
					return
				case chJobErrorsOut <- *jobError:
				}
			default:
				select {
				case <-ctx.Done():
					return
				case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
				}
			}
		}
	}
}

View File

@@ -1,125 +0,0 @@
package extractors
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
// PostgresExtractor is the etl.Extractor implementation that issues its
// extraction queries through a dbwrapper.DbWrapper.
type PostgresExtractor struct {
	// db is the handle ProcessPartition uses to run the extraction query.
	db dbwrapper.DbWrapper
}

// NewPostgresExtractor wraps db in a PostgresExtractor and returns it as an
// etl.Extractor.
func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
	extractor := PostgresExtractor{db: db}
	return &extractor
}
// buildExtractQueryPostgres renders the SELECT statement used to extract
// every row of the table described by sourceDbInfo, ordered ascending by its
// primary key.
//
// Identifiers are wrapped in double quotes. Columns whose Type() is
// "GEOMETRY" are selected through ST_AsEWKB and aliased back to their own
// name. An empty columns slice selects "*".
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {
	selectList := "*"
	if len(columns) > 0 {
		expressions := make([]string, 0, len(columns))
		for _, col := range columns {
			name := col.Name()
			if col.Type() == "GEOMETRY" {
				expressions = append(expressions, `ST_AsEWKB("`+name+`") AS "`+name+`"`)
				continue
			}
			expressions = append(expressions, `"`+name+`"`)
		}
		selectList = strings.Join(expressions, ", ")
	}

	return fmt.Sprintf(
		`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`,
		selectList,
		sourceDbInfo.Schema,
		sourceDbInfo.Table,
		sourceDbInfo.PrimaryKey,
	)
}
// ProcessPartition reads the whole source table and streams its rows to
// chBatchesOut in batches of batchSize rows, plus a final batch with any
// remainder.
//
// Ranged partitions are not supported: when partition.HasRange is true the
// function returns immediately with an error and reads nothing.
// indexPrimaryKey is unused.
//
// The returned int is the number of rows read, including on error. Errors:
//   - query failure: *custom_errors.ExtractorError with HasLastId false;
//   - failure while reading rows: an error prefixed with
//     "Unexpected error reading rows from source" that wraps the cause;
//   - ctx cancelled while sending a batch: ctx.Err().
func (postgresEx *PostgresExtractor) ProcessPartition(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	partition models.Partition,
	indexPrimaryKey int,
	chBatchesOut chan<- models.Batch,
) (int, error) {
	if partition.HasRange {
		return 0, errors.New("Batch config not yet supported")
	}

	query := buildExtractQueryPostgres(tableInfo, columns)

	rowsRead := 0
	rows, err := postgresEx.db.Query(ctx, query)
	if err != nil {
		return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
	}
	defer rows.Close()

	batchRows := make([]models.UnknownRowValues, 0, batchSize)
	for rows.Next() {
		values, err := rows.Values()
		if err != nil {
			// Keep the historical message but preserve the cause for callers
			// and logs instead of discarding it.
			return rowsRead, fmt.Errorf("Unexpected error reading rows from source: %w", err)
		}

		rowsRead++
		batchRows = append(batchRows, values)
		if len(batchRows) >= batchSize {
			select {
			case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
			case <-ctx.Done():
				return rowsRead, ctx.Err()
			}
			// The sent slice now belongs to the batch; start a fresh one.
			batchRows = make([]models.UnknownRowValues, 0, batchSize)
		}
	}

	if err := rows.Err(); err != nil {
		return rowsRead, fmt.Errorf("Unexpected error reading rows from source: %w", err)
	}

	if len(batchRows) > 0 {
		select {
		case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
		case <-ctx.Done():
			// The last batch was not delivered: report the cancellation, as the
			// in-loop send does, rather than claiming success.
			return rowsRead, ctx.Err()
		}
	}
	return rowsRead, nil
}
// Exec has an empty body: it returns immediately without reading from
// chPartitionsIn, sending to any channel, or touching wgActivePartitions or
// rowsRead.
// NOTE(review): presumably a stub kept so the type has the same method set as
// the other extractor — confirm before relying on it.
func (postgresEx *PostgresExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
}

View File

@@ -0,0 +1,77 @@
package extractors
import (
"context"
"errors"
"fmt"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
// "github.com/sirupsen/logrus"
)
// ProcessPartitionWithRetries calls ProcessPartition for partition and
// retries it, with backoff, while it fails with a
// *custom_errors.ExtractorError.
//
// Each retry increments the partition's RetryCounter. When the error has
// HasLastId set, the retry runs on a derived partition: new Id, ParentId set
// to the failed partition's Id, and Range.Min moved to the reported LastId
// (exclusive), so extraction resumes after the last reported row.
//
// The returned int64 is the total number of rows read across all attempts.
// Errors:
//   - RetryCounter reaches retryConfig.Attempts: *custom_errors.JobError
//     wrapping the last error in Prev;
//   - any error that is not an ExtractorError: returned as-is, no retry;
//   - ctx cancelled while waiting for the next attempt: ctx.Err().
func (ex *GenericExtractor) ProcessPartitionWithRetries(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	partition models.Partition,
	indexPrimaryKey int,
	retryConfig config.RetryConfig,
	chBatchesOut chan<- models.Batch,
	fromJsonColumns []config.FromJsonItem,
) (int64, error) {
	var totalRowsRead int64
	currentPartition := partition

	for {
		rowsRead, err := ex.ProcessPartition(
			ctx,
			tableInfo,
			columns,
			batchSize,
			currentPartition,
			indexPrimaryKey,
			chBatchesOut,
			fromJsonColumns,
		)
		// Rows read by a failed attempt still count towards the total.
		totalRowsRead += rowsRead
		if err == nil {
			return totalRowsRead, nil
		}

		exError, ok := errors.AsType[*custom_errors.ExtractorError](err)
		if !ok {
			return totalRowsRead, err
		}

		currentPartition.RetryCounter++
		if currentPartition.RetryCounter >= retryConfig.Attempts {
			return totalRowsRead, &custom_errors.JobError{
				Msg:  fmt.Sprintf("Partition %v reached max retries (%d)", currentPartition.Id, currentPartition.RetryCounter),
				Prev: err,
			}
		}

		if exError.HasLastId {
			// Resume strictly after the last row the failed attempt reported.
			currentPartition.ParentId = exError.Partition.Id
			currentPartition.Id = uuid.New()
			currentPartition.Range.Min = exError.LastId
			currentPartition.Range.IsMinInclusive = false
		}

		delay := custom_errors.ComputeBackoffDelay(
			currentPartition.RetryCounter,
			retryConfig.BaseDelayMs,
			retryConfig.MaxDelayMs,
			retryConfig.MaxJitterMs,
		)

		// Wait for the backoff, but stop waiting as soon as the job is
		// cancelled instead of blocking for the full delay.
		timer := time.NewTimer(delay)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return totalRowsRead, ctx.Err()
		}
	}
}

View File

@@ -0,0 +1,118 @@
package extractors
import (
"context"
"fmt"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
// "github.com/sirupsen/logrus"
)
// errorFromLastPartitionRow builds the *custom_errors.ExtractorError returned
// after a read failure that happened once at least one row had been read.
//
// The value at indexPrimaryKey of lastRow is converted with convert.ToInt64.
// On success the error has HasLastId true and LastId set to that value, with
// the message of previousError.
//
// When the conversion fails there is no usable last id, so HasLastId is false
// and LastId is left at zero. Reporting HasLastId true here would make a
// caller that resumes from LastId move the partition's lower bound to 0. The
// returned Partition is a copy with RetryCounter forced to 3.
// NOTE(review): 3 presumably mirrors a retry limit used by a consumer of this
// error — confirm.
func errorFromLastPartitionRow(
	lastRow models.UnknownRowValues,
	indexPrimaryKey int,
	partition models.Partition,
	previousError error,
) error {
	lastId, ok := convert.ToInt64(lastRow[indexPrimaryKey])
	if !ok {
		currentPartition := partition
		currentPartition.RetryCounter = 3
		return &custom_errors.ExtractorError{
			Partition: currentPartition,
			HasLastId: false,
			Msg:       fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
		}
	}

	return &custom_errors.ExtractorError{
		Partition: partition,
		HasLastId: true,
		LastId:    lastId,
		Msg:       previousError.Error(),
	}
}
// ProcessPartition queries the rows of one partition through
// ex.db.QueryFromObject and streams them to chBatchesOut with flush.
//
// The query's lower/upper limits come from partition.Range; a limit is only
// marked valid when the partition has a range and the bound is greater than
// zero. Rows are flushed every time batchSize rows are pending and once more
// after the last row.
//
// The returned int64 is the number of rows scanned successfully. Errors:
//   - query error: returned as-is with 0 rows;
//   - scan error with nothing pending: returned as-is;
//   - scan error with rows pending: the pending rows are flushed, then the
//     error built by errorFromLastPartitionRow from the last pending row is
//     returned;
//   - flush error: returned as-is;
//   - otherwise rows.Err().
func (ex *GenericExtractor) ProcessPartition(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	partition models.Partition,
	indexPrimaryKey int,
	chBatchesOut chan<- models.Batch,
	fromJsonColumns []config.FromJsonItem,
) (int64, error) {
	lowerLimit := dbwrapper.ExtractorQueryLimit{
		IsValid:     partition.HasRange && partition.Range.Min > 0,
		IsInclusive: partition.Range.IsMinInclusive,
		Value:       partition.Range.Min,
	}
	upperLimit := dbwrapper.ExtractorQueryLimit{
		IsValid:     partition.HasRange && partition.Range.Max > 0,
		IsInclusive: partition.Range.IsMaxInclusive,
		Value:       partition.Range.Max,
	}

	rows, err := ex.db.QueryFromObject(ctx, dbwrapper.ExtractionQuery{
		Schema:          tableInfo.Schema,
		Table:           tableInfo.Table,
		PrimaryKey:      tableInfo.PrimaryKey,
		Columns:         columns,
		LowerLimit:      lowerLimit,
		UpperLimit:      upperLimit,
		FromJsonColumns: fromJsonColumns,
	})
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	var rowsRead int64
	columnCount := len(columns)
	pending := make([]models.UnknownRowValues, 0, batchSize)

	for rows.Next() {
		// Scan needs pointers: each destination points at one cell of the row.
		row := make([]any, columnCount)
		destinations := make([]any, columnCount)
		for i := range row {
			destinations[i] = &row[i]
		}

		if scanErr := rows.Scan(destinations...); scanErr != nil {
			if len(pending) == 0 {
				return rowsRead, scanErr
			}
			if flushErr := flush(ctx, &partition, batchSize, pending, chBatchesOut); flushErr != nil {
				return rowsRead, flushErr
			}
			lastRow := pending[len(pending)-1]
			return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, scanErr)
		}

		rowsRead++
		pending = append(pending, row)
		if len(pending) < batchSize {
			continue
		}

		if flushErr := flush(ctx, &partition, batchSize, pending, chBatchesOut); flushErr != nil {
			return rowsRead, flushErr
		}
		// The flushed slice now belongs to the batch; start a fresh one.
		pending = make([]models.UnknownRowValues, 0, batchSize)
	}

	if flushErr := flush(ctx, &partition, batchSize, pending, chBatchesOut); flushErr != nil {
		return rowsRead, flushErr
	}
	return rowsRead, rows.Err()
}

View File

@@ -0,0 +1,154 @@
package loaders
import (
"context"
"errors"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
// Consume is the worker loop of the loader: it receives batches from
// chBatchesIn and writes them with ProcessBatchWithRetries until the channel
// is closed or ctx is cancelled.
//
// Two modes, selected by batchSize:
//   - batchSize <= 0: every incoming batch is loaded on its own;
//   - batchSize > 0: rows of incoming batches are accumulated and loaded as
//     one merged batch once at least batchSize rows are pending, and again
//     when chBatchesIn is closed. The merged batch lists the ids of the
//     batches it came from in ParentBatchesId.
//
// wgActiveBatches.Done() is called once per received batch: right after the
// load attempt, or, for batches still accumulated when Consume returns, from
// the deferred cleanup.
//
// A failed load increments failedBatchesCount and is reported on chErrorsOut
// (the JobError itself when the failure is one, otherwise a JobError with
// ShouldCancelJob false). Once the counter exceeds
// retryConfig.MaxFailedBatchesLoad, a JobError with ShouldCancelJob true is
// sent and Consume returns. Successful loads add their row count to
// rowsLoaded atomically.
func (gl *GenericLoader) Consume(
	ctx context.Context,
	tableInfo config.TargetTableInfo,
	columns []models.ColumnType,
	retryConfig config.RetryConfig,
	batchSize int,
	chBatchesIn <-chan models.Batch,
	chErrorsOut chan<- custom_errors.JobError,
	wgActiveBatches *sync.WaitGroup,
	rowsLoaded *int64,
	failedBatchesCount *int32,
) {
	colNames := mapSlice(columns, func(col models.ColumnType) string {
		return col.Name()
	})

	var (
		accRows         []models.UnknownRowValues
		parentBatchesId []uuid.UUID
		// pendingDone counts received batches whose Done() is still owed.
		pendingDone int
	)
	defer func() {
		for i := 0; i < pendingDone; i++ {
			wgActiveBatches.Done()
		}
	}()

	// reportFailure records a failed load and reports it. It returns false
	// when the worker must stop: ctx was cancelled or the failure limit was
	// exceeded.
	reportFailure := func(err error) bool {
		atomic.AddInt32(failedBatchesCount, 1)

		var report custom_errors.JobError
		if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
			report = *jobError
		} else {
			report = custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}
		}
		select {
		case <-ctx.Done():
			return false
		case chErrorsOut <- report:
		}

		if atomic.LoadInt32(failedBatchesCount) <= int32(retryConfig.MaxFailedBatchesLoad) {
			return true
		}
		select {
		case <-ctx.Done():
		case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
		}
		return false
	}

	// flushAccumulated loads the accumulated rows as one merged batch and
	// returns false when the worker must stop.
	flushAccumulated := func() bool {
		if len(accRows) == 0 {
			return true
		}

		parents := len(parentBatchesId)
		merged := models.Batch{
			Id:              uuid.New(),
			ParentBatchesId: parentBatchesId,
			Rows:            accRows,
		}
		processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, merged)

		// The parent batches are settled whether or not the load succeeded.
		for i := 0; i < parents; i++ {
			wgActiveBatches.Done()
		}
		pendingDone -= parents
		accRows, parentBatchesId = nil, nil

		if err != nil {
			return reportFailure(err)
		}

		current := atomic.LoadInt64(rowsLoaded)
		logrus.Debugf("Rows loaded (batch loaded): +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
		atomic.AddInt64(rowsLoaded, int64(processedRows))
		return true
	}

	for {
		if ctx.Err() != nil {
			return
		}

		select {
		case <-ctx.Done():
			return

		case batch, open := <-chBatchesIn:
			if !open {
				flushAccumulated()
				return
			}

			if batchSize > 0 {
				pendingDone++
				accRows = append(accRows, batch.Rows...)
				parentBatchesId = append(parentBatchesId, batch.Id)
				if len(accRows) >= batchSize && !flushAccumulated() {
					return
				}
				continue
			}

			// Pass-through mode: load the batch exactly as received.
			processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
			wgActiveBatches.Done()
			if err != nil {
				if !reportFailure(err) {
					return
				}
				continue
			}

			current := atomic.LoadInt64(rowsLoaded)
			logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
			atomic.AddInt64(rowsLoaded, int64(processedRows))
		}
	}
}

View File

@@ -1,117 +1,13 @@
package loaders
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgconn"
)
type GenericLoader struct {
db dbwrapper.DbWrapper
}
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
return &GenericLoader{db: db}
}
func (gl *GenericLoader) ProcessBatch(
ctx context.Context,
tableInfo config.TargetTableInfo,
colNames []string,
batch models.Batch,
) (int, error) {
_, err := gl.db.SaveMassive(
ctx,
tableInfo.Schema,
tableInfo.Table,
colNames,
batch.Rows,
)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == "23505" {
return 0, &custom_errors.JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
Prev: err,
}
}
}
return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
}
return len(batch.Rows), nil
}
func (gl *GenericLoader) Exec(
ctx context.Context,
tableInfo config.TargetTableInfo,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chErrorsOut chan<- custom_errors.LoaderError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
rowsLoaded *int64,
) {
colNames := mapSlice(columns, func(col models.ColumnType) string {
return col.Name()
})
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
return
}
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil {
var ldError *custom_errors.LoaderError
var jobError *custom_errors.JobError
if errors.As(err, &ldError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *ldError:
}
} else if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}:
}
}
continue
}
wgActiveBatches.Done()
atomic.AddInt64(rowsLoaded, int64(processedRows))
}
}
func NewGenericLoader(db dbwrapper.DbWrapper) GenericLoader {
return GenericLoader{db: db}
}

View File

@@ -0,0 +1,49 @@
package loaders
import (
"context"
"errors"
"fmt"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
// ProcessBatchWithRetries loads batch with ProcessBatch and retries, with
// backoff, while the failure is a *custom_errors.LoaderError.
//
// Returns the number of rows loaded on success. Errors:
//   - the retry counter reaches retryConfig.Attempts: *custom_errors.JobError
//     wrapping the last LoaderError in Prev;
//   - any error that is not a LoaderError: returned as-is, no retry;
//   - ctx cancelled while waiting for the next attempt: ctx.Err().
//
// batch is received by value, so the RetryCounter increments made here are
// not visible to the caller.
func (gl *GenericLoader) ProcessBatchWithRetries(
	ctx context.Context,
	tableInfo config.TargetTableInfo,
	colNames []string,
	retryConfig config.RetryConfig,
	batch models.Batch,
) (int64, error) {
	for {
		rowsLoaded, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
		if err == nil {
			return rowsLoaded, nil
		}

		btError, ok := errors.AsType[*custom_errors.LoaderError](err)
		if !ok {
			return rowsLoaded, err
		}

		batch.RetryCounter++
		if batch.RetryCounter >= retryConfig.Attempts {
			return rowsLoaded, &custom_errors.JobError{
				Msg:  fmt.Sprintf("Batch %v reached max retries (%d)", batch.Id, batch.RetryCounter),
				Prev: btError,
			}
		}

		delay := custom_errors.ComputeBackoffDelay(
			batch.RetryCounter,
			retryConfig.BaseDelayMs,
			retryConfig.MaxDelayMs,
			retryConfig.MaxJitterMs,
		)

		// Wait for the backoff, but stop waiting as soon as the job is
		// cancelled instead of blocking for the full delay.
		timer := time.NewTimer(delay)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return rowsLoaded, ctx.Err()
		}
	}
}

View File

@@ -0,0 +1,43 @@
package loaders
import (
"context"
"errors"
"fmt"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgconn"
)
// ProcessBatch writes the rows of batch into tableInfo.Schema/tableInfo.Table
// with a single gl.db.SaveMassive call.
//
// On success it returns len(batch.Rows). On failure it returns 0 and:
//   - a *custom_errors.JobError with ShouldCancelJob set when the cause is a
//     *pgconn.PgError with code "23505";
//   - a *custom_errors.LoaderError carrying the batch for any other error.
func (gl *GenericLoader) ProcessBatch(
	ctx context.Context,
	tableInfo config.TargetTableInfo,
	colNames []string,
	batch models.Batch,
) (int64, error) {
	_, err := gl.db.SaveMassive(ctx, tableInfo.Schema, tableInfo.Table, colNames, batch.Rows)
	if err == nil {
		return int64(len(batch.Rows)), nil
	}

	if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok && pgErr.Code == "23505" {
		fatal := custom_errors.JobError{
			ShouldCancelJob: true,
			Msg:             fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
			Prev:            err,
		}
		return 0, &fatal
	}

	return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
}

View File

@@ -7,6 +7,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
func PartitionRangeGenerator(
@@ -15,8 +16,24 @@ func PartitionRangeGenerator(
tableInfo config.TableInfo,
partitionColumn string,
rowsPerPartition int64,
jobRange config.RangeConfig,
) ([]models.Partition, error) {
if jobRange.Min > 0 {
return []models.Partition{{
Id: uuid.New(),
HasRange: true,
RetryCounter: 0,
Range: models.PartitionRange{
Min: jobRange.Min,
Max: jobRange.Max,
IsMinInclusive: jobRange.IsMinInclusive,
IsMaxInclusive: jobRange.IsMaxInclusive,
},
}}, nil
}
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
logrus.Infof("Estimated rows in source: %v (%s.%s)", rowsCount, tableInfo.Schema, tableInfo.Table)
if err != nil {
return nil, err
}
@@ -36,5 +53,7 @@ func PartitionRangeGenerator(
return nil, err
}
// logrus.Debugf("Partitions: %+v (%s.%s)", partitions, tableInfo.Schema, tableInfo.Table)
return partitions, nil
}

View File

@@ -39,8 +39,6 @@ JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
ORDER BY c.column_id;`
// AND c.name NOT LIKE '$%'
type rawColumnMssql struct {
name string
userType string
@@ -236,6 +234,7 @@ ORDER BY batch_id`,
RetryCounter: 0,
Range: models.PartitionRange{
IsMinInclusive: true,
IsMaxInclusive: true,
},
}

View File

@@ -0,0 +1,117 @@
package transformers
import (
"context"
"errors"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
// Consume is the worker loop of the transformer: it receives batches from
// chBatchesIn, applies the column transformations in place and forwards the
// result on chBatchesOut until the input channel is closed or ctx is
// cancelled.
//
// The plan is the output of computeTransformationPlan followed by that of
// computeStorageTransformationPlan; when it is empty, batches are forwarded
// untouched.
//
// Two forwarding modes, selected by batchSize:
//   - batchSize <= 0: each batch is forwarded as received;
//   - batchSize > 0: rows are accumulated and forwarded as one merged batch
//     once at least batchSize rows are pending, and again when chBatchesIn is
//     closed. The merged batch takes the PartitionId of the first batch it
//     contains and lists all source ids in ParentBatchesId.
//
// wgActiveBatches is incremented once per batch placed on chBatchesOut. The
// increment happens before the send, and is undone if the send is abandoned,
// so the receiver's Done() can never run before the matching Add().
//
// A transformation failure stops the worker. Unless the error matches
// ctx.Err(), it is first reported on chJobErrorsOut: the JobError itself when
// the failure is one, otherwise a JobError with ShouldCancelJob true.
func (mssqlTr *MssqlTransformer) Consume(
	ctx context.Context,
	columns []models.ColumnType,
	retryConfig config.RetryConfig,
	batchSize int,
	chBatchesIn <-chan models.Batch,
	chBatchesOut chan<- models.Batch,
	chJobErrorsOut chan<- custom_errors.JobError,
	wgActiveBatches *sync.WaitGroup,
) {
	transformationPlan := computeTransformationPlan(columns)
	storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
	transformationPlan = append(transformationPlan, storagePlan...)

	var (
		accRows          []models.UnknownRowValues
		parentBatchesId  []uuid.UUID
		firstPartitionId uuid.UUID
	)

	// forward places one batch on chBatchesOut and reports whether it was
	// delivered. The batch is registered in the WaitGroup before it becomes
	// visible to the receiver; the registration is rolled back when ctx is
	// cancelled first.
	forward := func(out models.Batch) bool {
		wgActiveBatches.Add(1)
		select {
		case chBatchesOut <- out:
			return true
		case <-ctx.Done():
			wgActiveBatches.Done()
			return false
		}
	}

	// flush forwards the accumulated rows as one merged batch and returns
	// false when the worker must stop.
	flush := func() bool {
		if len(accRows) == 0 {
			return true
		}
		out := models.Batch{
			Id:              uuid.New(),
			PartitionId:     firstPartitionId,
			ParentBatchesId: parentBatchesId,
			Rows:            accRows,
		}
		if !forward(out) {
			return false
		}
		accRows = nil
		parentBatchesId = nil
		firstPartitionId = uuid.Nil
		return true
	}

	for {
		if ctx.Err() != nil {
			return
		}

		select {
		case <-ctx.Done():
			return

		case batch, ok := <-chBatchesIn:
			if !ok {
				flush()
				return
			}

			if len(transformationPlan) > 0 {
				err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig)
				if err != nil {
					if errors.Is(err, ctx.Err()) {
						return
					}

					var report custom_errors.JobError
					if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
						report = *jobError
					} else {
						report = custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}
					}
					select {
					case chJobErrorsOut <- report:
					case <-ctx.Done():
					}
					return
				}
			}

			if batchSize <= 0 {
				if !forward(batch) {
					return
				}
				continue
			}

			if len(parentBatchesId) == 0 {
				firstPartitionId = batch.PartitionId
			}
			accRows = append(accRows, batch.Rows...)
			parentBatchesId = append(parentBatchesId, batch.Id)

			if len(accRows) >= batchSize && !flush() {
				return
			}
		}
	}
}

View File

@@ -1,110 +1,21 @@
package transformers
import (
"context"
"errors"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type MssqlTransformer struct{}
func NewMssqlTransformer() etl.Transformer {
return &MssqlTransformer{}
type MssqlTransformer struct {
toStorage config.ToStorageConfig
sourceTable config.SourceTableInfo
azureClient *azure.Client
}
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
return []etl.ColumnTransformPlan{}
}
const processBatchCtxCheck = 4096
func (mssqlTr *MssqlTransformer) ProcessBatch(
ctx context.Context,
batch *models.Batch,
transformationPlan []etl.ColumnTransformPlan,
) error {
for i, rowValues := range batch.Rows {
if i%processBatchCtxCheck == 0 {
if err := ctx.Err(); err != nil {
return err
}
}
for _, task := range transformationPlan {
val := rowValues[task.Index]
if val == nil {
continue
}
transformed, err := task.Fn(val)
if err != nil {
return err
}
rowValues[task.Index] = transformed
}
}
return nil
}
func (mssqlTr *MssqlTransformer) Exec(
ctx context.Context,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
) {
transformationPlan := computeTransformationPlan(columns)
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
return
}
if len(transformationPlan) == 0 {
select {
case chBatchesOut <- batch:
wgActiveBatches.Add(1)
continue
case <-ctx.Done():
return
}
}
err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan)
if err != nil {
if errors.Is(err, ctx.Err()) {
return
}
select {
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
case <-ctx.Done():
}
return
}
select {
case chBatchesOut <- batch:
case <-ctx.Done():
return
}
wgActiveBatches.Add(1)
}
func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.SourceTableInfo, azureClient *azure.Client) etl.Transformer {
return &MssqlTransformer{
toStorage: toStorage,
sourceTable: sourceTable,
azureClient: azureClient,
}
}

View File

@@ -0,0 +1,122 @@
package transformers
import (
"context"
"fmt"
"strings"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
// computeTransformationPlan returns one transformation step per column whose
// SystemType() needs conversion, keyed by the column's index:
//
//   - "uniqueidentifier": non-nil []byte values go through mssqlUuidToBigEndian;
//   - "geometry", "geography": non-nil []byte values go through
//     wkbToEwkbWithSrid with SRID 4326;
//   - "datetime", "datetime2": time.Time values go through ensureUTC.
//
// Values of any other dynamic type pass through unchanged. Columns of other
// system types produce no step, so the result is nil when nothing applies.
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
	convertUUID := func(v any) (any, error) {
		raw, isBytes := v.([]byte)
		if !isBytes || raw == nil {
			return v, nil
		}
		return mssqlUuidToBigEndian(raw)
	}
	convertSpatial := func(v any) (any, error) {
		raw, isBytes := v.([]byte)
		if !isBytes || raw == nil {
			return v, nil
		}
		return wkbToEwkbWithSrid(raw, 4326)
	}
	convertTime := func(v any) (any, error) {
		t, isTime := v.(time.Time)
		if !isTime {
			return v, nil
		}
		return ensureUTC(t), nil
	}

	var plan []etl.ColumnTransformPlan
	for index, col := range columns {
		var fn func(any) (any, error)
		switch col.SystemType() {
		case "uniqueidentifier":
			fn = convertUUID
		case "geometry", "geography":
			fn = convertSpatial
		case "datetime", "datetime2":
			fn = convertTime
		default:
			continue
		}
		plan = append(plan, etl.ColumnTransformPlan{Index: index, Fn: fn})
	}
	return plan
}
// computeStorageTransformationPlan returns one transformation step per
// configured to_storage column. Each step uploads the column's []byte value
// through azureClient.UploadAndGetURL and replaces the value with the
// returned URL.
//
// It returns nil when azureClient is nil or no columns are configured.
// Configured columns are skipped, with a warning, when their Mode is not
// "REFERENCE_ONLY" or when their Source is not found among sourceColumns
// (names are compared case-insensitively).
//
// Behaviour of each step:
//   - nil value: returns nil;
//   - value that is not []byte: logs a warning and returns it unchanged;
//   - otherwise uploads to "<schema>/<table>/<new uuid>" using ctx; an upload
//     failure is returned as a *custom_errors.JobError wrapping the cause.
func computeStorageTransformationPlan(
	ctx context.Context,
	azureClient *azure.Client,
	toStorage config.ToStorageConfig,
	sourceColumns []models.ColumnType,
	sourceTable config.SourceTableInfo,
) []etl.ColumnTransformPlan {
	if azureClient == nil || len(toStorage.Columns) == 0 {
		return nil
	}

	// Upper-cased column name -> position in the row.
	positions := make(map[string]int, len(sourceColumns))
	for i, col := range sourceColumns {
		positions[strings.ToUpper(col.Name())] = i
	}

	schema, table := sourceTable.Schema, sourceTable.Table

	var plan []etl.ColumnTransformPlan
	for _, storageCol := range toStorage.Columns {
		if storageCol.Mode != "REFERENCE_ONLY" {
			logrus.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
			continue
		}

		position, found := positions[strings.ToUpper(storageCol.Source)]
		if !found {
			logrus.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
			continue
		}

		sourceColName := storageCol.Source
		upload := func(v any) (any, error) {
			if v == nil {
				return nil, nil
			}

			content, isBytes := v.([]byte)
			if !isBytes {
				logrus.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
					schema, table, sourceColName, v)
				return v, nil
			}

			// Every value gets its own blob name under <schema>/<table>/.
			blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
			blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, content)
			if err != nil {
				return nil, &custom_errors.JobError{
					Msg:  fmt.Sprintf("Error uploading %s.%s.%s", schema, table, sourceColName),
					Prev: err,
				}
			}
			return blobURL, nil
		}

		plan = append(plan, etl.ColumnTransformPlan{Index: position, Fn: upload})
	}
	return plan
}

View File

@@ -0,0 +1,73 @@
package transformers
import (
"context"
"errors"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
const processBatchCtxCheck = 4096
// ProcessBatchWithRetries applies every step of transformationPlan to every
// row of batch, in place, retrying a failing step with backoff.
//
// Nil cell values are skipped. Each step is tried at least once and at most
// retryConfig.Attempts times; a non-positive Attempts is treated as 1, so a
// misconfigured retry count cannot silently skip the transformation.
//
// Errors:
//   - a step fails with a *custom_errors.JobError whose ShouldCancelJob is
//     set: returned immediately, without further attempts;
//   - a step fails on its last attempt: that error is returned;
//   - ctx is cancelled (checked every processBatchCtxCheck rows and while
//     waiting between attempts): ctx.Err().
//
// Rows and columns processed before a failure keep their transformed values.
func ProcessBatchWithRetries(
	ctx context.Context,
	batch *models.Batch,
	transformationPlan []etl.ColumnTransformPlan,
	retryConfig config.RetryConfig,
) error {
	attempts := max(retryConfig.Attempts, 1)

	for i, rowValues := range batch.Rows {
		// Polling ctx on every row would be wasteful; do it periodically.
		if i%processBatchCtxCheck == 0 {
			if err := ctx.Err(); err != nil {
				return err
			}
		}

		for _, task := range transformationPlan {
			val := rowValues[task.Index]
			if val == nil {
				continue
			}

			var lastErr error
			for attempt := 0; attempt < attempts; attempt++ {
				transformed, err := task.Fn(val)
				if err == nil {
					rowValues[task.Index] = transformed
					lastErr = nil
					break
				}
				lastErr = err

				if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok && jobError.ShouldCancelJob {
					return jobError
				}
				if attempt == attempts-1 {
					break
				}

				delay := custom_errors.ComputeBackoffDelay(
					attempt,
					retryConfig.BaseDelayMs,
					retryConfig.MaxDelayMs,
					retryConfig.MaxJitterMs,
				)

				// Wait for the backoff, but stop waiting as soon as the job is
				// cancelled instead of blocking for the full delay.
				timer := time.NewTimer(delay)
				select {
				case <-timer.C:
				case <-ctx.Done():
					timer.Stop()
					return ctx.Err()
				}
			}

			if lastErr != nil {
				return lastErr
			}
		}
	}
	return nil
}

View File

@@ -1 +0,0 @@
package transformers

View File

@@ -9,31 +9,6 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type Extractor interface {
ProcessPartition(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error)
Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
)
}
type TransformerFunc func(any) (any, error)
type ColumnTransformPlan struct {
@@ -42,42 +17,18 @@ type ColumnTransformPlan struct {
}
type Transformer interface {
ProcessBatch(
ctx context.Context,
batch *models.Batch,
transformationPlan []ColumnTransformPlan,
) error
Exec(
Consume(
ctx context.Context,
columns []models.ColumnType,
retryConfig config.RetryConfig,
batchSize int,
chBatchesIn <-chan models.Batch,
chBactchesOut chan<- models.Batch,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
)
}
// Loader describes a component that consumes batches for a target table.
// NOTE(review): semantics below are read off the signatures only; confirm
// against the concrete implementations.
type Loader interface {
// ProcessBatch handles one batch for the given target table and column
// names. It returns an int alongside the error (presumably the number of
// rows written — TODO confirm).
ProcessBatch(
ctx context.Context,
tableInfo config.TargetTableInfo,
colNames []string,
batch models.Batch,
) (int, error)
// Exec receives batches from chBatchesIn. It has no return value; failures
// are reported through chErrorsOut and chJobErrorsOut.
// NOTE(review): wgActiveBatches and rowsLoaded look like shared
// bookkeeping owned by the caller — verify how they are updated.
Exec(
ctx context.Context,
tableInfo config.TargetTableInfo,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chErrorsOut chan<- custom_errors.LoaderError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
rowsLoaded *int64,
)
}
type TableAnalyzer interface {
QueryColumnTypes(
ctx context.Context,

View File

@@ -9,10 +9,11 @@ import (
type UnknownRowValues = []any
type Batch struct {
Id uuid.UUID
PartitionId uuid.UUID
Rows []UnknownRowValues
RetryCounter int
Id uuid.UUID
PartitionId uuid.UUID
ParentBatchesId []uuid.UUID
Rows []UnknownRowValues
RetryCounter int
}
type PartitionRange struct {

44
scripts/az-blob/main.go Normal file
View File

@@ -0,0 +1,44 @@
package main
import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
)
// main is a smoke-test script: it builds an Azure storage client from the
// application configuration and uploads ten small text blobs concurrently,
// named "<prefix>archivo-<n>.txt" for n in 1..10. Individual upload failures
// are logged and do not stop the other uploads; failing to build the client
// terminates the process.
func main() {
	storageCfg := config.App.AzureStorage
	container := storageCfg.Container

	client, err := azure.NewClient(storageCfg)
	if err != nil {
		log.Fatalf("Error creando cliente: %v", err)
	}

	ctx := context.Background()

	var pending sync.WaitGroup

	// uploadOne pushes a single blob and reports the outcome. The id is
	// passed explicitly so every goroutine works on its own value.
	uploadOne := func(id int) {
		defer pending.Done()

		name := fmt.Sprintf("%sarchivo-%d.txt", storageCfg.Prefix, id)
		payload := []byte(fmt.Sprintf("Contenido aleatorio: %d", rand.Intn(100000)))

		if upErr := client.UploadBuffer(ctx, container, name, payload); upErr != nil {
			log.Printf("Fallo al subir %s: %v", name, upErr)
			return
		}
		fmt.Printf("Subido exitosamente: %s\n", name)
	}

	for n := 1; n <= 10; n++ {
		pending.Add(1)
		go uploadOne(n)
	}

	// Block until every upload goroutine has reported.
	pending.Wait()
}

View File

@@ -12,9 +12,10 @@ import (
)
const (
totalRows int = 1_000_000
chunkSize int = 50_000
queueSize int = 4
// totalRows int = 1_000_000
totalRows int = 1000
chunkSize int = 200
queueSize int = 4
)
func main() {
@@ -40,6 +41,14 @@ func main() {
seedManzanas(ctx, db)
})
wgSeed.Go(func() {
seedPuertos(ctx, db)
})
wgSeed.Go(func() {
seedSiteHolderAttach(ctx, db)
})
wgSeed.Wait()
}

View File

@@ -0,0 +1,227 @@
package main
import (
"bytes"
"context"
"database/sql"
"fmt"
"math/rand"
"sync"
"time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)
// siteHolderAttachJob names the table this seeder targets. Its Schema and
// Table fields are interpolated into the MAX(GDB_ARCHIVE_OID) query and the
// value is handed to loadRowsMssql when inserting generated rows.
var siteHolderAttachJob = MigrationJob{
Schema: "Infraestructura",
Table: "SITE_HOLDER__ATTACH",
}
// seedSiteHolderAttach fills the SITE_HOLDER__ATTACH table with totalRows
// generated rows, numbering GDB_ARCHIVE_OID from the current maximum + 1.
//
// Rows are produced by a background goroutine in chunks of chunkSize and
// streamed to loadRowsMssql through a channel buffered to queueSize chunks.
//
// A failure to read the current maximum OID terminates the process via
// log.Fatal. A loading failure is returned wrapped; before returning, the
// generator goroutine is cancelled and awaited so it cannot stay blocked on
// a channel that nobody reads any more.
func seedSiteHolderAttach(ctx context.Context, db *sql.DB) error {
	maxOid, err := getMaxGDBArchiveOidForAttach(ctx, db)
	if err != nil {
		log.Fatal("Error getting max GDB_ARCHIVE_OID: ", err)
	}

	log.Infof("Starting SITE_HOLDER__ATTACH data generation from GDB_ARCHIVE_OID: %d", maxOid+1)

	// The generator only stops early when its context is cancelled, so it
	// gets a derived context that this function can cancel on its own.
	genCtx, stopGenerator := context.WithCancel(ctx)
	defer stopGenerator()

	rowsChan := make(chan []UnknownRowValues, queueSize)

	var wgRowGenerator sync.WaitGroup
	wgRowGenerator.Go(func() {
		generateSiteHolderAttachRows(genCtx, maxOid, totalRows, chunkSize, rowsChan)
	})

	// Column order must match the value order produced by
	// generateSiteHolderAttachRow.
	columns := []string{
		"GDB_ARCHIVE_OID",
		"REL_GLOBALID",
		"CONTENT_TYPE",
		"ATT_NAME",
		"DATA_SIZE",
		"DATA",
		"GLOBALID",
		"GDB_FROM_DATE",
		"GDB_TO_DATE",
		"ATTACHMENTID",
	}

	if err := loadRowsMssql(ctx, siteHolderAttachJob, columns, db, rowsChan); err != nil {
		// The loader has stopped consuming: unblock the producer and wait for
		// it to exit instead of leaking a goroutine stuck on a send.
		stopGenerator()
		wgRowGenerator.Wait()
		return fmt.Errorf("Error loading rows (SITE_HOLDER__ATTACH): %w", err)
	}

	log.Info("Data generation and loading completed successfully (SITE_HOLDER__ATTACH)")

	wgRowGenerator.Wait()

	return nil
}
// getMaxGDBArchiveOidForAttach returns the largest GDB_ARCHIVE_OID currently
// stored in the table named by siteHolderAttachJob. It returns 0 when the
// query yields no row or a NULL value; any other query/scan error is
// returned unchanged.
func getMaxGDBArchiveOidForAttach(ctx context.Context, db *sql.DB) (int, error) {
	query := fmt.Sprintf(`
SELECT ISNULL(MAX(GDB_ARCHIVE_OID), 0)
FROM [%s].[%s]
`, siteHolderAttachJob.Schema, siteHolderAttachJob.Table)

	var highest sql.NullInt64
	switch scanErr := db.QueryRowContext(ctx, query).Scan(&highest); scanErr {
	case nil, sql.ErrNoRows:
		// A missing row is treated like an empty table, not as a failure.
	default:
		return 0, scanErr
	}

	if highest.Valid {
		return int(highest.Int64), nil
	}
	return 0, nil
}
// generateSiteHolderAttachRows produces totalRows rows with consecutive
// GDB_ARCHIVE_OID values starting at startOid+1 and sends them on out in
// slices of chunkSize rows; a shorter final slice carries any remainder.
//
// out is always closed when the function returns. If ctx is cancelled while
// a chunk is waiting to be sent, generation stops. A sample row is logged
// every 100,000 generated rows.
func generateSiteHolderAttachRows(
	ctx context.Context,
	startOid int,
	totalRows int,
	chunkSize int,
	out chan<- []UnknownRowValues,
) {
	defer close(out)

	// deliver hands one chunk to the consumer and reports whether it was
	// accepted; false means ctx was cancelled first.
	deliver := func(chunk []UnknownRowValues, sentFormat string) bool {
		select {
		case out <- chunk:
			log.Debugf(sentFormat, len(chunk))
			return true
		case <-ctx.Done():
			log.Info("Context cancelled, stopping SITE_HOLDER__ATTACH row generation")
			return false
		}
	}

	pending := make([]UnknownRowValues, 0, chunkSize)
	generated := 0

	for generated < totalRows {
		generated++
		row := generateSiteHolderAttachRow(startOid + generated)
		pending = append(pending, row)

		if len(pending) == chunkSize {
			if !deliver(pending, "Sent SITE_HOLDER__ATTACH chunk with %d rows") {
				return
			}
			// The consumer now owns the sent slice; start a fresh one.
			pending = make([]UnknownRowValues, 0, chunkSize)
		}

		if generated%100_000 == 0 {
			logSiteHolderAttachSampleRow(generated, row)
		}
	}

	// Flush the remainder that did not fill a whole chunk.
	if len(pending) > 0 {
		deliver(pending, "Sent final SITE_HOLDER__ATTACH chunk with %d rows")
	}

	log.Infof("Finished generating %d SITE_HOLDER__ATTACH rows", generated)
}
// generateSiteHolderAttachRow builds one synthetic row for the given
// GDB_ARCHIVE_OID. Values are returned in this order: GDB_ARCHIVE_OID,
// REL_GLOBALID, CONTENT_TYPE, ATT_NAME, DATA_SIZE, DATA, GLOBALID,
// GDB_FROM_DATE, GDB_TO_DATE, ATTACHMENTID.
//
// GDB_FROM_DATE is drawn between 2020-12-31T23:59:59Z and
// 2025-12-31T23:59:59Z, GDB_TO_DATE is fixed at 9999-12-31T23:59:59Z,
// DATA_SIZE is the byte length of DATA, and ATTACHMENTID is in [1, 10000].
func generateSiteHolderAttachRow(gdbArchiveOid int) UnknownRowValues {
	// Fixed UTC instants, equal to parsing the matching RFC 3339 "Z" strings.
	var (
		fromLower = time.Date(2020, time.December, 31, 23, 59, 59, 0, time.UTC)
		fromUpper = time.Date(2025, time.December, 31, 23, 59, 59, 0, time.UTC)
		openEnded = time.Date(9999, time.December, 31, 23, 59, 59, 0, time.UTC)
	)

	// uuid.UUID.MarshalBinary's error result is discarded, as the original did.
	relID, _ := uuid.New().MarshalBinary()
	mime := generateRandomContentType()
	fileName := generateRandomAttachmentName()
	payload := generateRandomBinaryContent()
	rowID, _ := uuid.New().MarshalBinary()
	validFrom := generateRandomTimestamp(fromLower, fromUpper)
	attachment := rand.Intn(10000) + 1

	row := make(UnknownRowValues, 0, 10)
	row = append(row,
		gdbArchiveOid,
		relID,
		mime,
		fileName,
		len(payload),
		payload,
		rowID,
		validFrom,
		openEnded,
		attachment,
	)
	return row
}
// generateRandomContentType returns one MIME type picked uniformly at random
// from a fixed list of eight common document and image types.
func generateRandomContentType() string {
	candidates := [...]string{
		"text/plain",
		"application/pdf",
		"image/jpeg",
		"image/png",
		"application/msword",
		"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
		"text/csv",
		"application/json",
	}

	pick := rand.Intn(len(candidates))
	return candidates[pick]
}
// generateRandomAttachmentName returns a file name made of a 20-character
// string from generateRandomString followed by an extension picked at random
// from a fixed list. The extension is chosen independently of any content
// type.
func generateRandomAttachmentName() string {
	suffixes := [...]string{".txt", ".pdf", ".jpg", ".png", ".doc", ".docx", ".csv", ".json"}

	// The stem is generated before the extension is picked, keeping the
	// order of random draws unchanged.
	stem := generateRandomString(20)
	return stem + suffixes[rand.Intn(len(suffixes))]
}
func generateRandomBinaryContent() []byte {
sizeOptions := []int{100, 500, 1000, 5000, 10000, 50000, 100000}
size := sizeOptions[rand.Intn(len(sizeOptions))]
var buf bytes.Buffer
lineCount := rand.Intn(size/50) + 1
for range lineCount {
line := generateRandomString(rand.Intn(80) + 20)
buf.WriteString(line)
buf.WriteString("\n")
}
for buf.Len() < size {
randomText := generateRandomString(rand.Intn(100) + 50)
buf.WriteString(randomText)
buf.WriteString("\n")
}
result := buf.Bytes()
if len(result) > size {
result = result[:size]
}
return result
}
// logSiteHolderAttachSampleRow writes a human-readable summary of one
// generated row at info level. id is the running row count used in the
// heading. The two UUID columns are shown as placeholders and DATA is
// reported by length only.
//
// rowValues must follow the column order produced by
// generateSiteHolderAttachRow; the function panics if element 5 is not a
// []byte.
func logSiteHolderAttachSampleRow(id int, rowValues UnknownRowValues) {
	const sampleFormat = `
Sample SITE_HOLDER__ATTACH row #%d:
GDB_ARCHIVE_OID: %v
REL_GLOBALID: [binary UUID]
CONTENT_TYPE: %v
ATT_NAME: %v
DATA_SIZE: %v
DATA: [%d bytes of binary content]
GLOBALID: [binary UUID]
GDB_FROM_DATE: %v
GDB_TO_DATE: %v
ATTACHMENTID: %v
`

	// Index 5 is the DATA column; only its size is logged.
	payloadLen := len(rowValues[5].([]byte))

	var (
		oid         = rowValues[0]
		contentType = rowValues[2]
		attName     = rowValues[3]
		dataSize    = rowValues[4]
		fromDate    = rowValues[7]
		toDate      = rowValues[8]
		attachment  = rowValues[9]
	)

	log.Infof(sampleFormat,
		id,
		oid,
		contentType,
		attName,
		dataSize,
		payloadLen,
		fromDate,
		toDate,
		attachment,
	)
}