commit ba329912013d0f95391ee2564d6dad094359b867 Author: Marek Goc Date: Thu Feb 12 04:52:03 2026 +0100 replica ready diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4c49bd7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env diff --git a/README.md b/README.md new file mode 100644 index 0000000..799ed7f --- /dev/null +++ b/README.md @@ -0,0 +1,642 @@ +# MariaDB/MySQL Binlog Replication Service + +A robust MySQL/MariaDB binlog streaming replication service with automatic initial data transfer, resilience features, and comprehensive error handling. Supports single or multi-secondary replica configurations with optional Graylog logging. + +## Installation + +### Quick Install (Go) + +```bash +go install git.ma-al.com/goc_marek/replica/cmd/replica@latest +``` + +### Build from Source + +```bash +# Clone the repository +git clone https://git.ma-al.com/goc_marek/replica.git +cd replica + +# Build the service +go build -o replica ./cmd/replica + +# Or install globally +go install ./cmd/replica +``` + +### Docker + +```bash +# Build the image +docker build -t replica . 
+ +# Run with docker-compose +docker-compose up -d +``` + +## Quick Start + +```bash +# Copy example environment +cp example.env .env + +# Edit .env with your configuration +nano .env + +# Run the service +./replica +``` + +## Features + +### Core Functionality +- **Binlog Streaming**: Real-time replication from MySQL/MariaDB binlog events +- **Initial Data Transfer**: Automatic bulk data transfer when resync is needed +- **Multi-table Support**: Replicates INSERT, UPDATE, DELETE events for multiple tables +- **Position Persistence**: Saves and resumes from last processed binlog position +- **Schema Filtering**: Only replicates events for the configured schema (default: "replica") + +### Resilience & Error Handling +- **Panic Recovery**: Automatic recovery from unexpected panics in event handlers +- **Retry Logic**: Exponential backoff retry for failed SQL operations +- **Connection Health Checks**: Periodic connection health monitoring with auto-reconnect +- **Schema Drift Detection**: Detects and handles schema changes during replication +- **Graceful Degradation**: Skips problematic tables after repeated failures +- **Auto-Reconnect**: Automatic reconnection on connection errors + +### Transfer Features +- **Chunked Transfers**: Efficient batch transfers using primary key ranges +- **Progress Checkpointing**: Saves transfer progress for resume after interruption +- **Pause/Resume**: Support for pausing and resuming initial transfers +- **Transfer Statistics**: Detailed logging of transfer progress and errors + +## Architecture + +### Components + +| File | Purpose | +|------|---------| +| [`cmd/replica/main.go`](cmd/replica/main.go) | Application entry point and configuration | +| [`pkg/replica/service.go`](pkg/replica/service.go) | BinlogSyncService - core replication orchestration | +| [`pkg/replica/handlers.go`](pkg/replica/handlers.go) | EventHandlers - binlog event processing with resilience | +| 
[`pkg/replica/initial_transfer.go`](pkg/replica/initial_transfer.go) | InitialTransfer - bulk data transfer management | +| [`pkg/replica/position.go`](pkg/replica/position.go) | PositionManager - binlog position persistence | +| [`pkg/replica/sqlbuilder.go`](pkg/replica/sqlbuilder.go) | SQLBuilder - SQL statement generation | +| [`pkg/replica/config.go`](pkg/replica/config.go) | Configuration types | +| [`pkg/replica/logging.go`](pkg/replica/logging.go) | Structured logging with Graylog support | + +### Data Flow + +``` +Primary DB (MySQL/MariaDB) + | + v + Binlog Stream + | + v +BinlogSyncService.processEvent() + | + +---> Panic Recovery Wrapper + | + +---> EventHandlers.HandleRows() + | | + | +---> Schema Filter (only "replica" schema) + | +---> Schema Drift Check + | +---> Retry Logic (if needed) + | +---> Execute SQL + | + +---> PositionManager.Save() + | + +---> Health Checks (every 30s) +``` + +## Usage + +### Quick Start + +```bash +# Build the service +go build -o replica + +# Run the service +./replica +``` + +### Configuration + +### Environment Variables + +All configuration is done via environment variables in the `.env` file: + +```bash +# Copy example environment +cp example.env .env + +# Edit with your settings +nano .env +``` + +### Primary Database Configuration + +| Variable | Description | Default | +|----------|-------------|---------| +| `MARIA_PRIMARY_HOST` | Primary database hostname | `mariadb-primary` | +| `MARIA_PRIMARY_PORT` | Primary database port | `3306` | +| `MARIA_USER` | Replication user | `replica` | +| `MARIA_PASS` | Replication password | `replica` | +| `MARIA_SERVER_ID` | Unique server ID for binlog | `100` | +| `MARIA_PRIMARY_NAME` | Instance name for logging | `mariadb-primary` | + +### Multi-Secondary Replica Configuration + +The service supports replicating to multiple secondary databases simultaneously. 
Configure secondaries using comma-separated values: + +| Variable | Description | Example | +|----------|-------------|---------| +| `MARIA_SECONDARY_HOSTS` | Comma-separated hostnames | `secondary-1,secondary-2,secondary-3` | +| `MARIA_SECONDARY_PORTS` | Comma-separated ports | `3307,3308,3309` | +| `MARIA_SECONDARY_NAMES` | Comma-separated instance names | `replica-1,replica-2,replica-3` | +| `MARIA_SECONDARY_USERS` | Per-secondary users (optional) | `replica1,replica2,replica3` | +| `MARIA_SECONDARY_PASSWORDS` | Per-secondary passwords (optional) | `pass1,pass2,pass3` | + +#### Example: Single Secondary + +```bash +MARIA_SECONDARY_HOSTS=mariadb-secondary +MARIA_SECONDARY_PORTS=3307 +MARIA_SECONDARY_NAMES=secondary-1 +``` + +#### Example: Three Secondaries + +```bash +MARIA_SECONDARY_HOSTS=secondary-1,secondary-2,secondary-3 +MARIA_SECONDARY_PORTS=3307,3308,3309 +MARIA_SECONDARY_NAMES=replica-1,replica-2,replica-3 +MARIA_SECONDARY_USERS=replica1,replica2,replica3 +MARIA_SECONDARY_PASSWORDS=secret1,secret2,secret3 +``` + +#### Example: Two Secondaries with Different Credentials + +```bash +MARIA_SECONDARY_HOSTS=secondary-1,secondary-2 +MARIA_SECONDARY_PORTS=3307,3308 +MARIA_SECONDARY_NAMES=replica-east,replica-west +MARIA_SECONDARY_USERS=replica_east,replica_west +MARIA_SECONDARY_PASSWORDS=east_secret,west_secret +``` + +**Note:** If `MARIA_SECONDARY_USERS` or `MARIA_SECONDARY_PASSWORDS` are not provided, the default `MARIA_USER` and `MARIA_PASS` will be used for all secondaries. 
+ +### Graylog Configuration + +| Variable | Description | Default | +|----------|-------------|---------| +| `GRAYLOG_ENABLED` | Enable Graylog logging | `false` | +| `GRAYLOG_ENDPOINT` | Graylog GELF endpoint | `localhost:12201` | +| `GRAYLOG_PROTOCOL` | Protocol (udp/tcp) | `udp` | +| `GRAYLOG_TIMEOUT` | Connection timeout | `5s` | +| `GRAYLOG_SOURCE` | Source name for logs | `binlog-sync` | + +#### Enable Graylog Logging + +```bash +# Edit .env and set: +GRAYLOG_ENABLED=true +GRAYLOG_ENDPOINT=graylog.example.com:12201 +GRAYLOG_PROTOCOL=udp +GRAYLOG_SOURCE=binlog-sync-prod +``` + +### Other Settings + +| Variable | Description | Default | +|----------|-------------|---------| +| `TRANSFER_BATCH_SIZE` | Rows per transfer chunk | `1000` | +| `LOCAL_PROJECT_NAME` | Project name for logging | `naluconcept` | + +## Resilience Features + +### Panic Recovery + +All event handlers include automatic panic recovery: + +```go +defer func() { + if r := recover(); r != nil { + log.Printf("[PANIC RECOVERED] %v", r) + } +}() +``` + +This prevents a single malformed event from crashing the entire service. 
Recovery is implemented in: +- [`HandleRows()`](handlers.go:42) +- [`HandleQuery()`](handlers.go:81) +- [`HandleTableMap()`](handlers.go:100) +- [`processEvent()`](service.go:177) + +### Retry Logic + +Failed SQL operations are retried with exponential backoff: + +```go +func (h *EventHandlers) executeWithRetry(query string) error { + for attempt := 0; attempt <= h.retryAttempts; attempt++ { + if attempt > 0 { + delay := h.retryDelay * time.Duration(1<= h.maxFailures { + log.Printf("[SKIPPED] Too many failures for %s.%s", schema, table) + return nil // Skip this event +} + +// Reset count on successful operation +h.failedTables[key] = 0 +``` + +**Configuration:** +- `maxFailures`: Consecutive failures before skipping (default: 5) + +### Auto-Reconnect + +Connection errors trigger automatic reconnection: + +```go +func (h *EventHandlers) reconnect() { + h.secondaryDB.Close() + + maxRetries := 5 + for i := 0; i < maxRetries; i++ { + h.secondaryDB, err = sql.Open(dsn) + // Configure connection pool + // Ping to verify + if err == nil { + log.Printf("[RECONNECT] Successfully reconnected") + return + } + time.Sleep(time.Duration(i+1) * time.Second) + } +} +``` + +**Detected connection errors:** +- "connection refused" +- "connection reset" +- "broken pipe" +- "timeout" +- "driver: bad connection" +- "invalid connection" + +## Initial Transfer + +When resync is needed (empty replica or no saved position), the service performs an initial data transfer: + +### Transfer Process + +1. **Detection**: Check if secondary database is empty or no position saved +2. **Database Enumeration**: List all databases from primary +3. **Schema Exclusion**: Skip excluded schemas (information_schema, mysql, etc.) +4. **Table Transfer**: For each table: + - Get table schema (column definitions) + - Check row count + - Transfer in chunks using primary key or LIMIT/OFFSET +5. **Progress Checkpointing**: Save progress to JSON file every 1000 rows +6. 
**Position Reset**: Clear saved binlog position after successful transfer +7. **Binlog Streaming**: Start streaming from current position + +### Chunked Transfer + +Tables are transferred in chunks for efficiency and memory safety: + +```sql +-- Using primary key (efficient, preserves order) +SELECT * FROM table WHERE pk >= 1000 AND pk < 2000 ORDER BY pk + +-- Without primary key (slower, may skip rows on updates) +SELECT * FROM table LIMIT 1000 OFFSET 1000 +``` + +**Batch Size:** Configurable (default: 1000 rows per chunk) + +### Progress Checkpointing + +Transfer progress is saved to `transfer_progress_{instance}.json`: + +```json +{ + "DatabasesProcessed": 2, + "CurrentDatabase": "mydb", + "TablesProcessed": { + "mydb.users": 5000, + "mydb.orders": 10000 + }, + "LastCheckpoint": "2024-01-15T10:30:00Z" +} +``` + +If the transfer is interrupted, it resumes from the last checkpoint. + +### Pause/Resume + +Transfers can be paused and resumed programmatically: + +```go +transfer := NewInitialTransfer(dsn, dsn, 1000, 1) + +// Pause during transfer +transfer.Pause() + +// Resume later +transfer.Resume() +``` + +## Configuration Reference + +### BinlogConfig + +```go +type BinlogConfig struct { + Host string // MySQL/MariaDB host + Port int // MySQL/MariaDB port + User string // Replication user + Password string // Password + ServerID uint32 // Unique server ID for replication + Name string // Instance name for logging +} +``` + +### EventHandlers + +```go +type EventHandlers struct { + secondaryDB *sql.DB + tableMapCache map[uint64]*replication.TableMapEvent + sqlBuilder *SQLBuilder + failedTables map[string]int + maxFailures int // Default: 5 + retryAttempts int // Default: 3 + retryDelay time.Duration // Default: 100ms + lastSchemaHash map[string]string +} +``` + +### InitialTransfer + +```go +type InitialTransfer struct { + primaryDB *sql.DB + secondaryDB *sql.DB + batchSize int // Default: 1000 + workerCount int // Default: 1 + excludedDBs map[string]bool + 
checkpointFile string + progress TransferProgress +} +``` + +### TransferProgress + +```go +type TransferProgress struct { + DatabasesProcessed int + CurrentDatabase string + TablesProcessed map[string]int64 // "schema.table" -> rows transferred + LastCheckpoint time.Time +} +``` + +### TransferStats + +```go +type TransferStats struct { + TotalRows int64 + TotalTables int + TransferTime int64 // milliseconds + Errors []string + Progress TransferProgress +} +``` + +## Logging + +The service uses structured logging with prefixes: + +| Prefix | Example | Description | +|--------|---------|-------------| +| `[INSERT]` | `[INSERT] 1 row(s) affected` | Successful INSERT | +| `[UPDATE]` | `[UPDATE] 1 row(s) affected` | Successful UPDATE | +| `[DELETE]` | `[DELETE] 1 row(s) affected` | Successful DELETE | +| `[SUCCESS]` | `[SUCCESS] 1 row(s) affected` | SQL operation succeeded | +| `[ERROR]` | `[ERROR] INSERT failed after retries` | Operation failed | +| `[WARN]` | `[WARN] Schema drift detected` | Warning condition | +| `[PANIC]` | `[PANIC RECOVERED] panic message` | Recovered panic | +| `[RETRY]` | `[RETRY] Retrying in 200ms` | Retry attempt | +| `[DRIFT]` | `[DRIFT] Schema changed` | Schema drift detected | +| `[SKIPPED]` | `[SKIPPED] Too many failures` | Table skipped | +| `[FAILURE]` | `[FAILURE] failure count: 3/5` | Table failure count | +| `[HEALTH]` | `[HEALTH CHECK] Failed` | Health check result | +| `[TRANSFER]` | `[TRANSFER] Found 5 tables` | Transfer progress | +| `[INFO]` | `[INFO] Saved position` | Informational | +| `[ROTATE]` | `[mariadb-primary] Rotated to binlog.000001` | Binlog rotation | +| `[RECONNECT]` | `[RECONNECT] Successfully reconnected` | Reconnection result | + +## Schema Filtering + +By default, only events for the "replica" schema are replicated: + +```go +// handlers.go line 54 +if schemaName != "replica" { + return nil // Skip all other schemas +} +``` + +To replicate all schemas, comment out or modify this filter. 
+ +## Error Handling + +### Common Errors + +| Error | Cause | Resolution | +|-------|-------|------------| +| `connection refused` | Secondary DB down | Check secondary DB status | +| `schema drift detected` | Table schema changed | Manual intervention required | +| `too many failures` | Table repeatedly failing | Check table compatibility | +| `failed to get event` | Binlog stream interrupted | Service auto-recovers | +| `expected 4 destination arguments` | SHOW MASTER STATUS columns | Fixed in code | +| `assignment to entry in nil map` | Uninitialized map | Fixed in code | + +### Recovery Procedures + +1. **Secondary DB Connection Lost**: + - Service auto-reconnects (up to 5 attempts) + - Check secondary DB logs + - Verify network connectivity + +2. **Schema Drift Detected**: + - Stop service + - Compare schemas: `SHOW CREATE TABLE schema.table` on both DBs + - Sync schemas manually + - Reset position: `DELETE FROM binlog_position` + - Restart service + +3. **Transfer Interrupted**: + - Service resumes from `transfer_progress_{instance}.json` + - Check checkpoint file for progress + - Delete checkpoint file to restart transfer + +4. **Event Processing Stuck**: + - Check health check logs + - Verify binlog position: `SELECT * FROM binlog_position` + - Restart service if needed + - Clear position: `DELETE FROM binlog_position` + +## Monitoring + +### Key Metrics to Watch + +- **Replication Lag**: Time since last event processed +- **Rows Replicated**: Total INSERT/UPDATE/DELETE count +- **Tables Synced**: Number of tables synchronized +- **Error Count**: Failed operations counter +- **Success Rate**: Successful / total operations + +### Manual Verification + +```sql +-- Check position on secondary +SELECT * FROM binlog_position; + +-- Check row counts +SELECT COUNT(*) FROM replica.your_table; + +-- Compare with primary +SELECT COUNT(*) FROM your_table; +``` + +## Performance Considerations + +1. 
**Batch Size**: Start with 1000, adjust based on table size and memory +2. **Connection Pooling**: `SetMaxOpenConns(25)` for moderate load +3. **Worker Count**: Currently single-threaded (multi-worker planned) +4. **Schema Caching**: Table schemas cached in memory (auto-updated on drift) +5. **Index Usage**: Chunked transfers require indexed primary key + +## Limitations + +- **Single-threaded**: One worker processes events sequentially +- **Position-based**: No GTID support yet (position-based only) +- **Integer PKs**: Chunking requires integer primary key for efficiency +- **No Conflict Resolution**: Concurrent writes not handled +- **No Data Validation**: Assumes source data is valid + +## Known Issues & Fixes + +### Fixed Issues + +- [x] `SHOW MASTER STATUS` returning 4 columns (fixed by scanning 4 variables) +- [x] `nil map` panic in transfer (fixed by initializing TablesProcessed) +- [x] Event deduplication skipping valid events (disabled deduplication) +- [x] Checkpoint file path empty (fixed by setting instance-specific path) + +### Workarounds + +- **Position 0 Events**: Deduplication disabled for now +- **Schema Changes**: Service marks table as failed, manual restart required + +## Future Enhancements + +- [ ] GTID-based replication for better consistency +- [ ] Multi-threaded replication for higher throughput +- [ ] Conflict detection and resolution +- [ ] Prometheus metrics endpoint +- [ ] REST API for management +- [ ] Kubernetes operator +- [ ] Configuration file (YAML/JSON) +- [ ] Signal handling for dynamic config + +## File Structure + +``` +replica/ +├── cmd/ +│ └── replica/ +│ └── main.go # Entry point +├── pkg/ +│ └── replica/ +│ ├── service.go # Replication orchestration +│ ├── handlers.go # Event processing +│ ├── initial_transfer.go # Bulk data transfer +│ ├── position.go # Position persistence +│ ├── sqlbuilder.go # SQL generation +│ ├── config.go # Configuration types +│ └── logging.go # Structured logging +├── example.env # Environment 
template +├── .env # Environment (gitignored) +├── docker-compose.yml # Local development +├── go.mod # Go module +└── README.md # This file +``` + +## License + +MIT License diff --git a/cmd/replica/main.go b/cmd/replica/main.go new file mode 100644 index 0000000..51a6513 --- /dev/null +++ b/cmd/replica/main.go @@ -0,0 +1,100 @@ +package main + +import ( + "database/sql" + "os" + "os/signal" + "strconv" + "syscall" + + "git.ma-al.com/goc_marek/replica/pkg/replica" + _ "github.com/go-sql-driver/mysql" +) + +func main() { + // Initialize the logger + replica.InitLogger() + + // Load configuration from environment + cfg, err := replica.LoadEnvConfig() + if err != nil { + replica.Fatalf("Failed to load configuration: %v", err) + } + + // Setup Graylog if enabled + if cfg.Graylog.Enabled { + replica.Infof("Graylog enabled: %s", cfg.Graylog.Endpoint) + if err := replica.SetupGlobalGraylog(replica.GraylogConfig{ + Endpoint: cfg.Graylog.Endpoint, + Protocol: cfg.Graylog.Protocol, + Timeout: cfg.Graylog.Timeout, + Source: cfg.Graylog.Source, + ExtraFields: cfg.Graylog.ExtraFields, + }); err != nil { + replica.Warnf("Failed to setup Graylog: %v", err) + } else { + replica.Info("Graylog setup successful") + } + } + + replica.Infof("Loaded configuration: %d secondary(ies)", len(cfg.Secondaries)) + + // Handle shutdown signals + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Connect to primary MariaDB + primaryDSN := cfg.Primary.User + ":" + cfg.Primary.Password + "@tcp(" + cfg.Primary.Host + ":" + + strconv.FormatUint(uint64(cfg.Primary.Port), 10) + ")/?multiStatements=true" + primaryDB, err := sql.Open("mysql", primaryDSN) + if err != nil { + replica.Fatalf("Failed to connect to primary MariaDB: %v", err) + } + defer primaryDB.Close() + + primaryDB.SetMaxOpenConns(25) + primaryDB.SetMaxIdleConns(5) + + if err := primaryDB.Ping(); err != nil { + replica.Fatalf("Failed to ping primary MariaDB: %v", err) + } + 
replica.Info("Connected to primary MariaDB") + + // Create multi-service manager + multiService := replica.NewMultiBinlogSyncService() + + // Connect to each secondary and create services + for _, secCfg := range cfg.Secondaries { + replica.Infof("Connecting to secondary: %s (%s:%d)", secCfg.Name, secCfg.Host, secCfg.Port) + + secondaryDB, err := sql.Open("mysql", secCfg.DSN) + if err != nil { + replica.Fatalf("Failed to connect to secondary %s: %v", secCfg.Name, err) + } + defer secondaryDB.Close() + + secondaryDB.SetMaxOpenConns(25) + secondaryDB.SetMaxIdleConns(5) + + if err := secondaryDB.Ping(); err != nil { + replica.Fatalf("Failed to ping secondary %s: %v", secCfg.Name, err) + } + replica.Infof("Connected to secondary: %s", secCfg.Name) + + // Create service for this secondary + service := replica.NewBinlogSyncService(cfg.Primary, primaryDB, secondaryDB, secCfg.Name) + multiService.AddService(service) + } + + // Start all services + replica.Info("Starting binlog replication...") + multiService.StartAll() + + // Wait for shutdown signal + sig := <-sigChan + replica.Infof("Received %v, shutting down...", sig) + + // Stop all services + multiService.StopAll() + replica.Info("Service stopped.") +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..7979cef --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,80 @@ +services: + mariadb-primary: + image: mariadb:latest + container_name: ${MARIA_PRIMARY_NAME} + command: + - --innodb_buffer_pool_size=536870912 + - --key_buffer_size=67108864 + - --query_cache_type=1 + - --query_cache_size=134217728 + - --query-cache-strip-comments=1 + - --max-connections=256 + - --log_bin=log_bin + - --binlog_format=ROW + - --server-id=${MARIA_SERVER_ID} + ports: + - "${MARIA_PRIMARY_PORT}:3306" + networks: + - repl + volumes: + - mariadb-primary-data:/var/lib/mysql + environment: + MARIADB_USER: ${MARIA_USER} + MARIADB_PASSWORD: ${MARIA_PASS} + MYSQL_DATABASE: ${MARIA_NAME} + MYSQL_ROOT_PASSWORD: 
${MARIA_PASS} + restart: always + + mariadb-secondary: + image: mariadb:latest + container_name: secondary + command: + - --innodb_buffer_pool_size=536870912 + - --key_buffer_size=67108864 + - --max-connections=256 + - --server-id=2 + # - --read_only=ON + - --relay-log=relay-log + # - --log_bin=log_bin + # - --binlog_format=ROW + ports: + - "3307:3306" + networks: + - repl + volumes: + - mariadb-secondary-data:/var/lib/mysql + environment: + MARIADB_USER: ${MARIA_USER} + MARIADB_PASSWORD: ${MARIA_PASS} + MYSQL_DATABASE: ${MARIA_NAME} + MYSQL_ROOT_PASSWORD: ${MARIA_PASS} + restart: always + depends_on: + - mariadb-primary + + # postgresql: + # container_name: ${POSTGRES_HOST} + # restart: always + # image: postgres:18 + # networks: + # repl: + # ports: + # - 5432:5432 + # volumes: + # - postgres-data:/var/lib/postgresql:Z + # command: postgres -c shared_buffers=512MB -c work_mem=16MB -c maintenance_work_mem=256MB -c effective_cache_size=4GB -c max_connections=20 + # environment: + # POSTGRES_USER: ${POSTGRES_USER} + # POSTGRES_PASSWORD: ${POSTGRES_PASS} + # POSTGRES_DB: ${POSTGRES_NAME} + + +networks: + repl: + name: repl +volumes: + mariadb-primary-data: + driver: local + mariadb-secondary-data: + driver: local + diff --git a/example.env b/example.env new file mode 100644 index 0000000..65679f0 --- /dev/null +++ b/example.env @@ -0,0 +1,34 @@ +# Primary MariaDB Configuration +MARIA_USER=replica +MARIA_PASS=replica +MARIA_SERVER_ID=100 +MARIA_PRIMARY_HOST=mariadb-primary +MARIA_PRIMARY_PORT=3306 +MARIA_PRIMARY_NAME=mariadb-primary + +# Secondary MariaDB Configuration (comma-separated for multiple) +# Format: host1:port1,host2:port2,host3:port3 +# Or just hostnames and use MARIA_SECONDARY_PORTS for ports +MARIA_SECONDARY_HOSTS=mariadb-secondary-1,mariadb-secondary-2,mariadb-secondary-3 +MARIA_SECONDARY_PORTS=3307,3308,3309 +MARIA_SECONDARY_NAMES=secondary-1,secondary-2,secondary-3 + +# Optional: Override per-secondary credentials (must match number of secondaries or 
use defaults) +# MARIA_SECONDARY_USERS=replica1,replica2,replica3 +# MARIA_SECONDARY_PASSWORDS=pass1,pass2,pass3 + +# Legacy single secondary (for backward compatibility) +# MARIA_SECONDARY_HOST=mariadb-secondary + +# Transfer Settings +TRANSFER_BATCH_SIZE=1000 + +# Project name for logging +LOCAL_PROJECT_NAME=naluconcept + +# Graylog Configuration +GRAYLOG_ENABLED=false +GRAYLOG_ENDPOINT=localhost:12201 +GRAYLOG_PROTOCOL=udp +GRAYLOG_TIMEOUT=5s +GRAYLOG_SOURCE=binlog-sync diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9ff13ac --- /dev/null +++ b/go.mod @@ -0,0 +1,24 @@ +module git.ma-al.com/goc_marek/replica + +go 1.23.0 + +require ( + github.com/go-mysql-org/go-mysql v1.13.0 + github.com/go-sql-driver/mysql v1.8.1 +) + +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/klauspost/compress v1.17.8 // indirect + github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec // indirect + github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a // indirect + github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d // indirect + github.com/shopspring/decimal v1.2.0 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/text v0.24.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..1183093 --- /dev/null +++ b/go.sum @@ -0,0 +1,75 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-mysql-org/go-mysql v1.13.0 h1:Hlsa5x1bX/wBFtMbdIOmb6YzyaVNBWnwrb8gSIEPMDc= +github.com/go-mysql-org/go-mysql v1.13.0/go.mod h1:FQxw17uRbFvMZFK+dPtIPufbU46nBdrGaxOw0ac9MFs= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec h1:3EiGmeJWoNixU+EwllIn26x6s4njiWRXewdx2zlYa84= +github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= +github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8= +github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= +github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d h1:3Ej6eTuLZp25p3aH/EXdReRHY12hjZYs3RrGp7iLdag= +github.com/pingcap/tidb/pkg/parser 
v0.0.0-20250421232622-526b2c79173d/go.mod h1:+8feuexTKcXHZF/dkDfvCwEyBAmgb4paFc3/WeYV2eE= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= +github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod 
h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod 
h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/replica/config.go b/pkg/replica/config.go new file mode 100644 index 0000000..cbebd80 --- /dev/null +++ b/pkg/replica/config.go @@ -0,0 +1,256 @@ +package replica + +import ( + "os" + "strconv" + "strings" + "time" +) + +// BinlogConfig holds the configuration for connecting to MySQL/MariaDB binlog +type BinlogConfig struct { + Host string + Port uint16 + User string + Password string + ServerID uint32 + Name string // Instance name for logging +} + +// SecondaryConfig holds the configuration for a secondary database +type SecondaryConfig struct { + Name string // Friendly name for logging + Host string + Port uint16 + User string + Password string + DSN string // Pre-built DSN for convenience +} + +// GraylogConfig holds the configuration for Graylog integration +type GraylogConfig struct { + Enabled bool + Endpoint string + Protocol string + Timeout time.Duration + Source string + ExtraFields map[string]interface{} +} + +// AppConfig holds the complete application configuration +type AppConfig struct { + // Primary configuration + Primary BinlogConfig + + // Secondary configurations + Secondaries []SecondaryConfig + + // Transfer settings + BatchSize int + ExcludeSchemas []string + + // Graylog configuration + Graylog GraylogConfig +} + +// LoadEnvConfig loads configuration from environment variables +func LoadEnvConfig() (*AppConfig, error) { + cfg := &AppConfig{ + BatchSize: 
1000, + ExcludeSchemas: []string{"information_schema", "performance_schema", "mysql", "sys"}, + } + + // Primary configuration + cfg.Primary.Host = getEnv("MARIA_PRIMARY_HOST", "localhost") + cfg.Primary.Port = uint16(getEnvInt("MARIA_PRIMARY_PORT", 3306)) + cfg.Primary.User = getEnv("MARIA_USER", "replica") + cfg.Primary.Password = getEnv("MARIA_PASS", "replica") + cfg.Primary.ServerID = uint32(getEnvInt("MARIA_SERVER_ID", 100)) + cfg.Primary.Name = getEnv("MARIA_PRIMARY_NAME", "mariadb-primary") + + // Parse secondary hosts (comma-separated) + secondaryHosts := getEnv("MARIA_SECONDARY_HOSTS", "") + if secondaryHosts == "" { + // Fallback to single secondary for backward compatibility + secondaryHosts = getEnv("MARIA_SECONDARY_HOST", "localhost") + } + + // Parse secondary ports (comma-separated, must match number of hosts or be single value) + secondaryPortsStr := getEnv("MARIA_SECONDARY_PORTS", "") + secondaryPorts := parseUint16List(secondaryPortsStr, 3307) + + // Parse secondary users (comma-separated, optional) + secondaryUsers := getEnv("MARIA_SECONDARY_USERS", "") + users := parseStringList(secondaryUsers, cfg.Primary.User) + + // Parse secondary passwords (comma-separated, optional) + secondaryPasswords := getEnv("MARIA_SECONDARY_PASSWORDS", "") + passwords := parseStringList(secondaryPasswords, cfg.Primary.Password) + + // Parse secondary names (comma-separated, optional) + secondaryNames := getEnv("MARIA_SECONDARY_NAMES", "") + + hosts := parseStringList(secondaryHosts, "") + names := parseStringList(secondaryNames, "") + portMap := makePortsMap(hosts, secondaryPorts, 3307) + userMap := makeStringMap(hosts, users, cfg.Primary.User) + passMap := makeStringMap(hosts, passwords, cfg.Primary.Password) + + // Build secondary configurations + for i, host := range hosts { + name := strconv.Itoa(i + 1) + if i < len(names) && names[i] != "" { + name = names[i] + } + + port := uint16(3307) + if p, ok := portMap[host]; ok { + port = p + } + + user := 
// getEnv returns the value of the environment variable key, or fallback
// when the variable is unset or empty.
func getEnv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// getEnvInt returns the environment variable key parsed as an int, or
// fallback when the variable is unset, empty, or not a valid integer.
func getEnvInt(key string, fallback int) int {
	raw := os.Getenv(key)
	if raw == "" {
		return fallback
	}
	if n, err := strconv.Atoi(raw); err == nil {
		return n
	}
	return fallback
}

// getEnvBool returns the environment variable key parsed as a bool
// (strconv.ParseBool syntax), or fallback when unset or unparsable.
func getEnvBool(key string, fallback bool) bool {
	raw := os.Getenv(key)
	if raw == "" {
		return fallback
	}
	if b, err := strconv.ParseBool(raw); err == nil {
		return b
	}
	return fallback
}

// parseStringList splits a comma-separated string, trimming whitespace
// around each element. A blank input yields a one-element list holding
// fallback, or nil when fallback itself is empty.
func parseStringList(s string, fallback string) []string {
	if strings.TrimSpace(s) == "" {
		if fallback == "" {
			return nil
		}
		return []string{fallback}
	}
	fields := strings.Split(s, ",")
	out := make([]string, len(fields))
	for i, f := range fields {
		out[i] = strings.TrimSpace(f)
	}
	return out
}

// parseUint16List splits a comma-separated string into uint16 values.
// A blank input yields a one-element list holding fallback; entries that
// fail to parse are individually replaced by fallback.
func parseUint16List(s string, fallback uint16) []uint16 {
	if strings.TrimSpace(s) == "" {
		return []uint16{fallback}
	}
	fields := strings.Split(s, ",")
	out := make([]uint16, len(fields))
	for i, f := range fields {
		v, err := strconv.ParseUint(strings.TrimSpace(f), 10, 16)
		if err != nil {
			out[i] = fallback
			continue
		}
		out[i] = uint16(v)
	}
	return out
}

// makePortsMap maps each host to its positional port, falling back to
// defaultPort when the ports list is shorter than the hosts list.
// NOTE(review): keying by host collapses duplicate hostnames — confirm
// callers never configure the same host twice with different ports.
func makePortsMap(hosts []string, ports []uint16, defaultPort uint16) map[string]uint16 {
	out := make(map[string]uint16, len(hosts))
	for i, h := range hosts {
		p := defaultPort
		if i < len(ports) {
			p = ports[i]
		}
		out[h] = p
	}
	return out
}

// makeStringMap maps each host to its positional value, falling back to
// defaultValue when the slot is missing or empty. Same duplicate-host
// caveat as makePortsMap.
func makeStringMap(hosts []string, values []string, defaultValue string) map[string]string {
	out := make(map[string]string, len(hosts))
	for i, h := range hosts {
		v := defaultValue
		if i < len(values) && values[i] != "" {
			v = values[i]
		}
		out[h] = v
	}
	return out
}

// buildDSN assembles a go-sql-driver/mysql DSN of the form
// user:password@tcp(host:port)/database?multiStatements=true.
func buildDSN(user, password, host string, port int, database string) string {
	var b strings.Builder
	b.WriteString(user)
	b.WriteByte(':')
	b.WriteString(password)
	b.WriteString("@tcp(")
	b.WriteString(host)
	b.WriteByte(':')
	b.WriteString(strconv.Itoa(port))
	b.WriteString(")/")
	b.WriteString(database)
	b.WriteString("?multiStatements=true")
	return b.String()
}
resilience features +type EventHandlers struct { + secondaryDB *sql.DB + secondaryName string + tableMapCache map[uint64]*replication.TableMapEvent + tableMapMu sync.RWMutex + sqlBuilder *SQLBuilder + failedTables map[string]int // tableName -> consecutive failure count + failedTablesMu sync.RWMutex + maxFailures int // max consecutive failures before skipping + retryAttempts int // number of retry attempts + retryDelay time.Duration // base retry delay + lastSchemaHash map[string]string // tableName -> schema hash + lastSchemaMu sync.RWMutex +} + +// NewEventHandlers creates new event handlers with default resilience settings +func NewEventHandlers(secondaryDB *sql.DB, secondaryName string) *EventHandlers { + return &EventHandlers{ + secondaryDB: secondaryDB, + secondaryName: secondaryName, + tableMapCache: make(map[uint64]*replication.TableMapEvent), + sqlBuilder: NewSQLBuilder(), + failedTables: make(map[string]int), + maxFailures: 5, + retryAttempts: 3, + retryDelay: 100 * time.Millisecond, + lastSchemaHash: make(map[string]string), + } +} + +// HandleRows processes row-level events with panic recovery and retry logic +func (h *EventHandlers) HandleRows(header *replication.EventHeader, e *replication.RowsEvent) error { + // Panic recovery wrapper + defer func() { + if r := recover(); r != nil { + Errorf("[%s][PANIC RECOVERED] HandleRows panic: %v", h.secondaryName, r) + } + }() + + tableName := string(e.Table.Table) + schemaName := string(e.Table.Schema) + + if schemaName != "replica" { + return nil + } + + // Check if table is temporarily skipped due to failures + if h.isTableSkipped(schemaName, tableName) { + Warn("[%s][SKIPPED] Skipping event for %s.%s (too many failures)", h.secondaryName, schemaName, tableName) + return nil + } + + eventType := h.getEventTypeName(header.EventType) + Info("[%s][%s] %s.%s", h.secondaryName, eventType, schemaName, tableName) + + if h.secondaryDB != nil { + // Schema drift detection + if h.detectSchemaDrift(schemaName, 
tableName) { + Warn("[%s][WARN] Schema drift detected for %s.%s, pausing replication", h.secondaryName, schemaName, tableName) + h.markTableFailed(schemaName, tableName) + return fmt.Errorf("schema drift detected") + } + + h.handleSecondaryReplication(e, header.EventType) + } + + return nil +} + +// HandleQuery processes query events with panic recovery +func (h *EventHandlers) HandleQuery(e *replication.QueryEvent) error { + // Panic recovery wrapper + defer func() { + if r := recover(); r != nil { + Errorf("[%s][PANIC RECOVERED] HandleQuery panic: %v", h.secondaryName, r) + } + }() + + query := string(e.Query) + if h.secondaryDB != nil { + _, err := h.secondaryDB.Exec(query) + if err != nil { + Errorf("[%s][ERROR] Query failed: %v", h.secondaryName, err) + } + } + return nil +} + +// HandleTableMap caches table map events +func (h *EventHandlers) HandleTableMap(e *replication.TableMapEvent) { + // Panic recovery wrapper + defer func() { + if r := recover(); r != nil { + Errorf("[%s][PANIC RECOVERED] HandleTableMap panic: %v", h.secondaryName, r) + } + }() + + tableID := (uint64(e.TableID) << 8) | uint64(e.TableID>>56) + h.tableMapMu.Lock() + h.tableMapCache[tableID] = e + h.tableMapMu.Unlock() +} + +// GetTableMap returns the cached table map for a table ID +func (h *EventHandlers) GetTableMap(tableID uint64) *replication.TableMapEvent { + h.tableMapMu.RLock() + defer h.tableMapMu.RUnlock() + return h.tableMapCache[tableID] +} + +// isTableSkipped checks if a table should be skipped due to too many failures +func (h *EventHandlers) isTableSkipped(schema, table string) bool { + key := schema + "." + table + h.failedTablesMu.RLock() + defer h.failedTablesMu.RUnlock() + return h.failedTables[key] >= h.maxFailures +} + +// markTableFailed records a failure for a table +func (h *EventHandlers) markTableFailed(schema, table string) { + key := schema + "." 
+ table + h.failedTablesMu.Lock() + h.failedTables[key]++ + failCount := h.failedTables[key] + h.failedTablesMu.Unlock() + + Warn("[%s][FAILURE] %s.%s failure count: %d/%d", h.secondaryName, schema, table, failCount, h.maxFailures) +} + +// markTableSuccess records a successful operation for a table +func (h *EventHandlers) markTableSuccess(schema, table string) { + key := schema + "." + table + h.failedTablesMu.Lock() + h.failedTables[key] = 0 // Reset failure count on success + h.failedTablesMu.Unlock() +} + +// detectSchemaDrift checks if the table schema has changed +func (h *EventHandlers) detectSchemaDrift(schema, table string) bool { + key := schema + "." + table + + // Get current schema hash + currentHash, err := h.getSchemaHash(schema, table) + if err != nil { + Warn("[%s][WARN] Could not get schema hash for %s.%s: %v", h.secondaryName, schema, table, err) + return false + } + + h.lastSchemaMu.RLock() + lastHash, exists := h.lastSchemaHash[key] + h.lastSchemaMu.RUnlock() + + if !exists { + // First time seeing this table + h.lastSchemaMu.Lock() + h.lastSchemaHash[key] = currentHash + h.lastSchemaMu.Unlock() + return false + } + + if lastHash != currentHash { + Warn("[%s][DRIFT] Schema changed for %s.%s: %s -> %s", h.secondaryName, schema, table, lastHash, currentHash) + return true + } + + return false +} + +// getSchemaHash returns a hash of the table schema +func (h *EventHandlers) getSchemaHash(schema, table string) (string, error) { + query := fmt.Sprintf( + "SELECT MD5(GROUP_CONCAT(COLUMN_NAME, ':', DATA_TYPE, ':', IS_NULLABLE ORDER BY ORDINAL_POSITION)) "+ + "FROM INFORMATION_SCHEMA.COLUMNS "+ + "WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'", + schema, table, + ) + + var hash string + err := h.secondaryDB.QueryRow(query).Scan(&hash) + return hash, err +} + +func (h *EventHandlers) handleSecondaryReplication(e *replication.RowsEvent, eventType replication.EventType) { + schemaName := string(e.Table.Schema) + tableName := string(e.Table.Table) + + 
tableID := (uint64(e.TableID) << 8) | uint64(e.TableID>>56) + tableMap := h.GetTableMap(tableID) + + if tableMap == nil { + tableMap = e.Table + } + + if len(tableMap.ColumnName) == 0 { + columns, err := h.fetchColumnNames(schemaName, tableName) + if err != nil { + Errorf("[%s][ERROR] Failed to fetch columns: %v", h.secondaryName, err) + return + } + + columnBytes := make([][]byte, len(columns)) + for i, col := range columns { + columnBytes[i] = []byte(col) + } + + tableMap = &replication.TableMapEvent{ + Schema: e.Table.Schema, + Table: e.Table.Table, + ColumnName: columnBytes, + } + } + + switch eventType { + case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: + h.replicateInsert(tableMap, schemaName, tableName, e.Rows) + case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: + h.replicateUpdate(tableMap, schemaName, tableName, e.Rows) + case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: + h.replicateDelete(tableMap, schemaName, tableName, e.Rows) + } +} + +// replicateInsert inserts rows with retry logic +func (h *EventHandlers) replicateInsert(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) { + for _, row := range rows { + query := h.sqlBuilder.BuildInsert(schema, table, tableMap, row) + + err := h.executeWithRetry(query) + if err != nil { + Errorf("[%s][ERROR] INSERT failed after retries: %v", h.secondaryName, err) + h.markTableFailed(schema, table) + } else { + h.markTableSuccess(schema, table) + } + } +} + +// replicateUpdate updates rows with retry logic +func (h *EventHandlers) replicateUpdate(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) { + for i := 0; i < len(rows); i += 2 { + query := h.sqlBuilder.BuildUpdate(schema, table, tableMap, rows[i], rows[i+1]) + + err := h.executeWithRetry(query) + if err != nil { + Errorf("[%s][ERROR] UPDATE failed after retries: %v", h.secondaryName, err) + h.markTableFailed(schema, table) + } else { + 
h.markTableSuccess(schema, table) + } + } +} + +// replicateDelete deletes rows with retry logic +func (h *EventHandlers) replicateDelete(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) { + for _, row := range rows { + query := h.sqlBuilder.BuildDelete(schema, table, tableMap, row) + + err := h.executeWithRetry(query) + if err != nil { + Errorf("[%s][ERROR] DELETE failed after retries: %v", h.secondaryName, err) + h.markTableFailed(schema, table) + } else { + h.markTableSuccess(schema, table) + } + } +} + +// executeWithRetry executes a query with exponential backoff retry +func (h *EventHandlers) executeWithRetry(query string) error { + var lastErr error + + for attempt := 0; attempt <= h.retryAttempts; attempt++ { + if attempt > 0 { + delay := h.retryDelay * time.Duration(1<= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr)) +} + +func containsHelper(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} diff --git a/pkg/replica/initial_transfer.go b/pkg/replica/initial_transfer.go new file mode 100644 index 0000000..8016b3f --- /dev/null +++ b/pkg/replica/initial_transfer.go @@ -0,0 +1,635 @@ +package replica + +import ( + "database/sql" + "encoding/json" + "fmt" + "os" + "strings" + "sync" + "time" + + _ "github.com/go-sql-driver/mysql" +) + +// TransferStats holds statistics about the transfer +type TransferStats struct { + TotalRows int64 + TotalTables int + TransferTime int64 // in milliseconds + Errors []string + Progress TransferProgress +} + +// TransferProgress tracks the current position in the transfer +type TransferProgress struct { + DatabasesProcessed int + CurrentDatabase string + TablesProcessed map[string]int64 // tableName -> rows transferred + LastCheckpoint time.Time +} + +// InitialTransfer handles the initial data transfer from primary to secondary +type InitialTransfer struct { + primaryDB *sql.DB 
+ secondaryDB *sql.DB + batchSize int + workerCount int + excludedDBs map[string]bool + mu sync.Mutex + stats TransferStats + checkpointFile string + progress TransferProgress + pauseChan chan struct{} + resumeChan chan struct{} + isPaused bool +} + +// NewInitialTransfer creates a new initial transfer handler +func NewInitialTransfer(primaryDSN, secondaryDSN string, batchSize, workerCount int) (*InitialTransfer, error) { + primaryDB, err := sql.Open("mysql", primaryDSN) + if err != nil { + return nil, fmt.Errorf("failed to connect to primary: %v", err) + } + primaryDB.SetMaxOpenConns(batchSize) + primaryDB.SetMaxIdleConns(2) + + secondaryDB, err := sql.Open("mysql", secondaryDSN) + if err != nil { + primaryDB.Close() + return nil, fmt.Errorf("failed to connect to secondary: %v", err) + } + secondaryDB.SetMaxOpenConns(batchSize) + secondaryDB.SetMaxIdleConns(2) + + if err := primaryDB.Ping(); err != nil { + primaryDB.Close() + secondaryDB.Close() + return nil, fmt.Errorf("failed to ping primary: %v", err) + } + + if err := secondaryDB.Ping(); err != nil { + primaryDB.Close() + secondaryDB.Close() + return nil, fmt.Errorf("failed to ping secondary: %v", err) + } + + return &InitialTransfer{ + primaryDB: primaryDB, + secondaryDB: secondaryDB, + batchSize: batchSize, + workerCount: workerCount, + excludedDBs: map[string]bool{ + "information_schema": true, + "performance_schema": true, + "mysql": true, + "sys": true, + }, + checkpointFile: "transfer_progress.json", + progress: TransferProgress{ + TablesProcessed: make(map[string]int64), + }, + pauseChan: make(chan struct{}), + resumeChan: make(chan struct{}), + }, nil +} + +// Transfer executes the initial data transfer +func (t *InitialTransfer) Transfer(excludeSchemas []string) error { + startTime := time.Now() + + // Load previous progress if exists + t.loadProgress() + + // Add user-specified exclusions + for _, db := range excludeSchemas { + t.excludedDBs[db] = true + } + + // Get list of databases to transfer + 
databases, err := t.getDatabases() + if err != nil { + return fmt.Errorf("failed to get databases: %v", err) + } + + Info("[TRANSFER] Starting initial data transfer...") + Infof("[TRANSFER] Found %d databases to transfer", len(databases)) + + // Get current binlog position before transfer + binlogPos, err := t.getBinlogPosition() + if err != nil { + Warnf("[WARN] Failed to get binlog position: %v", err) + } + + for i, dbName := range databases { + // Check for pause signal + t.checkPause() + + if t.excludedDBs[dbName] { + Infof("[TRANSFER] Skipping excluded database: %s", dbName) + continue + } + + // Skip already processed databases + if i < t.progress.DatabasesProcessed { + Infof("[TRANSFER] Skipping already processed database: %s", dbName) + continue + } + + t.progress.CurrentDatabase = dbName + t.saveProgress() + + if err := t.transferDatabase(dbName); err != nil { + return fmt.Errorf("failed to transfer database %s: %v", dbName, err) + } + + t.progress.DatabasesProcessed++ + t.saveProgress() + } + + t.stats.TransferTime = time.Since(startTime).Milliseconds() + + Infof("[TRANSFER] Transfer completed: %d tables, %d rows in %dms", + t.stats.TotalTables, t.stats.TotalRows, t.stats.TransferTime) + + if binlogPos != "" { + Infof("[TRANSFER] Binlog position before transfer: %s", binlogPos) + } + + // Clear progress on successful completion + t.clearProgress() + + return nil +} + +// checkPause checks if transfer should be paused +func (t *InitialTransfer) checkPause() { + if t.isPaused { + Info("[TRANSFER] Transfer paused, waiting for resume...") + <-t.resumeChan + Info("[TRANSFER] Transfer resumed") + } +} + +// Pause pauses the transfer +func (t *InitialTransfer) Pause() { + if !t.isPaused { + t.isPaused = true + t.pauseChan <- struct{}{} + Info("[TRANSFER] Transfer pause requested") + } +} + +// Resume resumes the transfer +func (t *InitialTransfer) Resume() { + if t.isPaused { + t.isPaused = false + t.resumeChan <- struct{}{} + Info("[TRANSFER] Transfer resume 
requested") + } +} + +// saveProgress saves the current progress to a checkpoint file +func (t *InitialTransfer) saveProgress() { + t.progress.LastCheckpoint = time.Now() + + data, err := json.MarshalIndent(t.progress, "", " ") + if err != nil { + Warnf("[WARN] Failed to marshal progress: %v", err) + return + } + + err = os.WriteFile(t.checkpointFile, data, 0644) + if err != nil { + Warnf("[WARN] Failed to save progress: %v", err) + } +} + +// loadProgress loads previous progress from checkpoint file +func (t *InitialTransfer) loadProgress() { + data, err := os.ReadFile(t.checkpointFile) + if err != nil { + Info("[INFO] No previous progress found, starting fresh") + return + } + + var progress TransferProgress + if err := json.Unmarshal(data, &progress); err != nil { + Warnf("[WARN] Failed to load progress: %v", err) + return + } + + t.progress = progress + Infof("[INFO] Resuming transfer: %d databases, %d tables in progress", + progress.DatabasesProcessed, len(progress.TablesProcessed)) +} + +// clearProgress removes the checkpoint file +func (t *InitialTransfer) clearProgress() { + os.Remove(t.checkpointFile) +} + +// GetProgress returns the current transfer progress +func (t *InitialTransfer) GetProgress() TransferProgress { + t.mu.Lock() + defer t.mu.Unlock() + return t.progress +} + +// getDatabases returns list of databases to transfer +func (t *InitialTransfer) getDatabases() ([]string, error) { + rows, err := t.primaryDB.Query("SHOW DATABASES") + if err != nil { + return nil, err + } + defer rows.Close() + + var databases []string + for rows.Next() { + var dbName string + if err := rows.Scan(&dbName); err != nil { + return nil, err + } + databases = append(databases, dbName) + } + + return databases, rows.Err() +} + +// transferDatabase transfers all tables in a database +func (t *InitialTransfer) transferDatabase(dbName string) error { + Infof("[TRANSFER] Transferring database: %s", dbName) + + // Get list of tables + tables, err := t.getTables(dbName) + 
if err != nil { + return fmt.Errorf("failed to get tables: %v", err) + } + + Infof("[TRANSFER] Found %d tables in %s", len(tables), dbName) + + for _, table := range tables { + // Check for pause signal + t.checkPause() + + // Skip already processed tables + tableKey := dbName + "." + table + if t.progress.TablesProcessed[tableKey] > 0 { + Infof("[TRANSFER] Skipping already processed table: %s", tableKey) + continue + } + + if err := t.transferTable(dbName, table); err != nil { + Errorf("[ERROR] Failed to transfer table %s.%s: %v", dbName, table, err) + t.mu.Lock() + t.stats.Errors = append(t.stats.Errors, fmt.Sprintf("%s.%s: %v", dbName, table, err)) + t.mu.Unlock() + } + } + + return nil +} + +// getTables returns list of tables in a database +func (t *InitialTransfer) getTables(dbName string) ([]string, error) { + query := fmt.Sprintf("SHOW TABLES FROM `%s`", dbName) + rows, err := t.primaryDB.Query(query) + if err != nil { + return nil, err + } + defer rows.Close() + + var tables []string + for rows.Next() { + var tableName string + if err := rows.Scan(&tableName); err != nil { + return nil, err + } + tables = append(tables, tableName) + } + + return tables, rows.Err() +} + +// transferTable transfers a single table with chunked reads +func (t *InitialTransfer) transferTable(dbName, tableName string) error { + // Get table structure + schema, err := t.getTableSchema(dbName, tableName) + if err != nil { + return fmt.Errorf("failed to get table schema: %v", err) + } + + // Check if table has data + hasData, err := t.tableHasData(dbName, tableName) + if err != nil { + return fmt.Errorf("failed to check table data: %v", err) + } + + if !hasData { + Infof("[TRANSFER] Table %s.%s is empty, skipping data transfer", dbName, tableName) + return nil + } + + // Get row count + count, err := t.getRowCount(dbName, tableName) + if err != nil { + return fmt.Errorf("failed to get row count: %v", err) + } + + Infof("[TRANSFER] Transferring %s.%s (%d rows)", dbName, tableName, 
count) + + // Get primary key or first unique column for chunking + pkColumn, err := t.getPrimaryKey(dbName, tableName) + if err != nil { + Warnf("[WARN] No primary key found for %s.%s, using full scan", dbName, tableName) + return t.transferTableFullScan(dbName, tableName, schema, count) + } + + return t.transferTableChunked(dbName, tableName, schema, pkColumn, count) +} + +// tableHasData checks if a table has any rows +func (t *InitialTransfer) tableHasData(dbName, tableName string) (bool, error) { + query := fmt.Sprintf("SELECT 1 FROM `%s`.`%s` LIMIT 1", dbName, tableName) + var exists int + err := t.primaryDB.QueryRow(query).Scan(&exists) + if err == sql.ErrNoRows { + return false, nil + } + if err != nil { + return false, err + } + return true, nil +} + +// getRowCount returns the number of rows in a table +func (t *InitialTransfer) getRowCount(dbName, tableName string) (int64, error) { + query := fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s`", dbName, tableName) + var count int64 + err := t.primaryDB.QueryRow(query).Scan(&count) + return count, err +} + +// getTableSchema returns the column definitions for a table +func (t *InitialTransfer) getTableSchema(dbName, tableName string) ([]ColumnInfo, error) { + query := fmt.Sprintf( + "SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_KEY, EXTRA "+ + "FROM INFORMATION_SCHEMA.COLUMNS "+ + "WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' "+ + "ORDER BY ORDINAL_POSITION", + dbName, tableName, + ) + + rows, err := t.primaryDB.Query(query) + if err != nil { + return nil, err + } + defer rows.Close() + + var columns []ColumnInfo + for rows.Next() { + var col ColumnInfo + if err := rows.Scan(&col.Name, &col.Type, &col.Nullable, &col.Key, &col.Extra); err != nil { + return nil, err + } + columns = append(columns, col) + } + + return columns, rows.Err() +} + +// ColumnInfo holds information about a table column +type ColumnInfo struct { + Name string + Type string + Nullable string + Key string + Extra string +} + +// 
getPrimaryKey returns the primary key column for a table +func (t *InitialTransfer) getPrimaryKey(dbName, tableName string) (string, error) { + query := fmt.Sprintf( + "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "+ + "WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND CONSTRAINT_NAME = 'PRIMARY' "+ + "ORDER BY ORDINAL_POSITION LIMIT 1", + dbName, tableName, + ) + + var pkColumn string + err := t.primaryDB.QueryRow(query).Scan(&pkColumn) + if err == sql.ErrNoRows { + return "", fmt.Errorf("no primary key") + } + return pkColumn, err +} + +// transferTableChunked transfers a table using primary key chunks +func (t *InitialTransfer) transferTableChunked(dbName, tableName string, columns []ColumnInfo, pkColumn string, rowCount int64) error { + tableKey := dbName + "." + tableName + + // Get starting offset from progress + startOffset := t.progress.TablesProcessed[tableKey] + + // Get min and max primary key values + minMax, err := t.getMinMaxPK(dbName, tableName, pkColumn) + if err != nil { + return fmt.Errorf("failed to get min/max: %v", err) + } + + // Adjust start offset to be within range + if startOffset < minMax.Min { + startOffset = minMax.Min + } + + // Transfer chunks + offset := startOffset + for offset < minMax.Max { + // Check for pause signal + t.checkPause() + + query := fmt.Sprintf( + "SELECT * FROM `%s`.`%s` WHERE `%s` >= %d AND `%s` < %d ORDER BY `%s`", + dbName, tableName, pkColumn, offset, pkColumn, offset+int64(t.batchSize), pkColumn, + ) + + rows, err := t.primaryDB.Query(query) + if err != nil { + return fmt.Errorf("failed to query chunk: %v", err) + } + + rowsInserted, err := t.insertRows(t.secondaryDB, dbName, tableName, columns, rows) + rows.Close() + + if err != nil { + return fmt.Errorf("failed to insert chunk: %v", err) + } + + // Update progress + t.mu.Lock() + t.progress.TablesProcessed[tableKey] = offset + int64(t.batchSize) + t.mu.Unlock() + + // Save checkpoint every 1000 rows + if rowsInserted%1000 == 0 { + 
t.saveProgress() + } + + offset += int64(t.batchSize) + } + + return nil +} + +// MinMax holds min and max values +type MinMax struct { + Min int64 + Max int64 +} + +// getMinMaxPK returns the min and max primary key values +func (t *InitialTransfer) getMinMaxPK(dbName, tableName, pkColumn string) (*MinMax, error) { + query := fmt.Sprintf( + "SELECT COALESCE(MIN(`%s`), 0), COALESCE(MAX(`%s`), 0) FROM `%s`.`%s`", + pkColumn, pkColumn, dbName, tableName, + ) + + var minVal, maxVal int64 + err := t.primaryDB.QueryRow(query).Scan(&minVal, &maxVal) + if err != nil { + return nil, err + } + + return &MinMax{Min: minVal, Max: maxVal + 1}, nil +} + +// transferTableFullScan transfers a table without primary key (slower, uses LIMIT/OFFSET) +func (t *InitialTransfer) transferTableFullScan(dbName, tableName string, columns []ColumnInfo, rowCount int64) error { + tableKey := dbName + "." + tableName + + // Get starting offset from progress + startOffset := t.progress.TablesProcessed[tableKey] + + var offset int64 = startOffset + + for offset < rowCount { + // Check for pause signal + t.checkPause() + + query := fmt.Sprintf( + "SELECT * FROM `%s`.`%s` LIMIT %d OFFSET %d", + dbName, tableName, t.batchSize, offset, + ) + + rows, err := t.primaryDB.Query(query) + if err != nil { + return fmt.Errorf("failed to query chunk: %v", err) + } + + rowsInserted, err := t.insertRows(t.secondaryDB, dbName, tableName, columns, rows) + rows.Close() + + if err != nil { + return fmt.Errorf("failed to insert chunk: %v", err) + } + + // Update progress + t.mu.Lock() + t.progress.TablesProcessed[tableKey] = offset + t.mu.Unlock() + + // Save checkpoint every 1000 rows + if rowsInserted%1000 == 0 { + t.saveProgress() + } + + offset += int64(t.batchSize) + } + + return nil +} + +// insertRows inserts rows from a query result into the secondary database +func (t *InitialTransfer) insertRows(db *sql.DB, dbName, tableName string, columns []ColumnInfo, rows *sql.Rows) (int64, error) { + // Build INSERT 
statement + placeholders := make([]string, len(columns)) + colNames := make([]string, len(columns)) + for i := range columns { + placeholders[i] = "?" + colNames[i] = fmt.Sprintf("`%s`", columns[i].Name) + } + + insertSQL := fmt.Sprintf( + "INSERT INTO `%s`.`%s` (%s) VALUES (%s)", + dbName, tableName, strings.Join(colNames, ", "), strings.Join(placeholders, ", "), + ) + + // Prepare statement + stmt, err := db.Prepare(insertSQL) + if err != nil { + return 0, fmt.Errorf("failed to prepare statement: %v", err) + } + defer stmt.Close() + + // Insert rows + rowCount := int64(0) + for rows.Next() { + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + + for i := range values { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + return rowCount, fmt.Errorf("failed to scan row: %v", err) + } + + _, err := stmt.Exec(values...) + if err != nil { + return rowCount, fmt.Errorf("failed to insert row: %v", err) + } + rowCount++ + } + + t.mu.Lock() + t.stats.TotalRows += rowCount + t.stats.TotalTables++ + t.mu.Unlock() + + Infof("[TRANSFER] Inserted %d rows into %s.%s", rowCount, dbName, tableName) + + return rowCount, rows.Err() +} + +// getBinlogPosition returns the current binlog position +func (t *InitialTransfer) getBinlogPosition() (string, error) { + var file string + var pos uint32 + var doDB, ignoreDB string // Ignore extra columns from SHOW MASTER STATUS + + err := t.primaryDB.QueryRow("SHOW MASTER STATUS").Scan(&file, &pos, &doDB, &ignoreDB) + if err == sql.ErrNoRows { + return "", nil + } + if err != nil { + return "", err + } + + return fmt.Sprintf("%s:%d", file, pos), nil +} + +// Close closes the database connections +func (t *InitialTransfer) Close() error { + if err := t.primaryDB.Close(); err != nil { + return err + } + return t.secondaryDB.Close() +} + +// GetStats returns the transfer statistics +func (t *InitialTransfer) GetStats() TransferStats { + t.mu.Lock() + defer t.mu.Unlock() 
+ return t.stats +} diff --git a/pkg/replica/logging.go b/pkg/replica/logging.go new file mode 100644 index 0000000..c3444fd --- /dev/null +++ b/pkg/replica/logging.go @@ -0,0 +1,393 @@ +package replica + +import ( + "encoding/json" + "fmt" + "log/slog" + "net" + "os" + "sync" + "time" +) + +// LogLevel represents the logging level +type LogLevel slog.Level + +const ( + LevelDebug LogLevel = LogLevel(slog.LevelDebug) + LevelInfo LogLevel = LogLevel(slog.LevelInfo) + LevelWarn LogLevel = LogLevel(slog.LevelWarn) + LevelError LogLevel = LogLevel(slog.LevelError) +) + +// Logger is a structured logger with optional Graylog support +type Logger struct { + logger *slog.Logger + graylogMu sync.RWMutex + graylog *GraylogClient + level LogLevel + source string +} + +// GraylogClient handles sending logs to Graylog +type GraylogClient struct { + conn net.Conn + source string + extra map[string]interface{} + mu sync.Mutex + connected bool +} + +// GELFMessage represents a GELF message structure +type GELFMessage struct { + Version string `json:"version"` + Host string `json:"host"` + ShortMessage string `json:"short_message"` + FullMessage string `json:"full_message,omitempty"` + Timestamp float64 `json:"timestamp"` + Level int `json:"level"` + Source string `json:"source,omitempty"` + Extra map[string]interface{} `json:"-"` +} + +// NewGraylogClient creates a new Graylog client +func NewGraylogClient(cfg GraylogConfig) (*GraylogClient, error) { + var conn net.Conn + var err error + + protocol := cfg.Protocol + if protocol == "" { + protocol = "udp" + } + + conn, err = net.DialTimeout(protocol, cfg.Endpoint, cfg.Timeout) + if err != nil { + return nil, fmt.Errorf("failed to connect to Graylog: %w", err) + } + + return &GraylogClient{ + conn: conn, + source: cfg.Source, + extra: cfg.ExtraFields, + connected: true, + }, nil +} + +// send sends a GELF message to Graylog +func (g *GraylogClient) send(msg *GELFMessage) error { + g.mu.Lock() + defer g.mu.Unlock() + + if 
!g.connected { + return nil + } + + // Add extra fields + if g.extra != nil { + msg.Extra = g.extra + } + + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal GELF message: %w", err) + } + + // Add null byte as delimiter for UDP + var delimiter byte + if udpConn, ok := g.conn.(*net.UDPConn); ok { + delimiter = 0 + _, err = udpConn.Write(append(data, delimiter)) + } else { + _, err = g.conn.Write(data) + } + + if err != nil { + g.connected = false + return fmt.Errorf("failed to send to Graylog: %w", err) + } + + return nil +} + +// Close closes the Graylog connection +func (g *GraylogClient) Close() error { + g.mu.Lock() + defer g.mu.Unlock() + + if g.conn != nil { + g.connected = false + return g.conn.Close() + } + return nil +} + +// NewLogger creates a new logger instance +func NewLogger() *Logger { + // Create slog handler with JSON output for structured logging + handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + }) + + return &Logger{ + logger: slog.New(handler), + level: LevelInfo, + source: "binlog-sync", + } +} + +// NewLoggerWithLevel creates a new logger with specified level +func NewLoggerWithLevel(level LogLevel) *Logger { + handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.Level(level), + }) + + return &Logger{ + logger: slog.New(handler), + level: level, + source: "binlog-sync", + } +} + +// SetupGraylog configures Graylog integration +func (l *Logger) SetupGraylog(cfg GraylogConfig) error { + client, err := NewGraylogClient(cfg) + if err != nil { + return err + } + + l.graylogMu.Lock() + l.graylog = client + l.graylogMu.Unlock() + + return nil +} + +// With creates a new logger with additional context +func (l *Logger) With(args ...any) *Logger { + return &Logger{ + logger: l.logger.With(args...), + level: l.level, + source: l.source, + } +} + +// Debug logs a debug message +func (l *Logger) Debug(msg string, args ...any) { + l.logger.Debug(msg, 
args...) + l.sendToGraylog(slog.LevelDebug, msg, args) +} + +// Info logs an info message +func (l *Logger) Info(msg string, args ...any) { + l.logger.Info(msg, args...) + l.sendToGraylog(slog.LevelInfo, msg, args) +} + +// Warn logs a warning message +func (l *Logger) Warn(msg string, args ...any) { + l.logger.Warn(msg, args...) + l.sendToGraylog(slog.LevelWarn, msg, args) +} + +// Error logs an error message +func (l *Logger) Error(msg string, args ...any) { + l.logger.Error(msg, args...) + l.sendToGraylog(slog.LevelError, msg, args) +} + +// Fatal logs a fatal message and exits +func (l *Logger) Fatal(msg string, args ...any) { + l.logger.Error(msg, args...) + l.sendToGraylog(slog.LevelError, msg, args) + os.Exit(1) +} + +// Debugf logs a formatted debug message +func (l *Logger) Debugf(format string, args ...any) { + msg := fmt.Sprintf(format, args...) + l.logger.Debug(msg) + l.sendToGraylog(slog.LevelDebug, msg, nil) +} + +// Infof logs a formatted info message +func (l *Logger) Infof(format string, args ...any) { + msg := fmt.Sprintf(format, args...) + l.logger.Info(msg) + l.sendToGraylog(slog.LevelInfo, msg, nil) +} + +// Warnf logs a formatted warning message +func (l *Logger) Warnf(format string, args ...any) { + msg := fmt.Sprintf(format, args...) + l.logger.Warn(msg) + l.sendToGraylog(slog.LevelWarn, msg, nil) +} + +// Errorf logs a formatted error message +func (l *Logger) Errorf(format string, args ...any) { + msg := fmt.Sprintf(format, args...) + l.logger.Error(msg) + l.sendToGraylog(slog.LevelError, msg, nil) +} + +// Fatalf logs a formatted fatal message and exits +func (l *Logger) Fatalf(format string, args ...any) { + msg := fmt.Sprintf(format, args...) + l.logger.Error(msg) + l.sendToGraylog(slog.LevelError, msg, nil) + os.Exit(1) +} + +// Log logs a message at the specified level +func (l *Logger) Log(level LogLevel, msg string, args ...any) { + switch level { + case LevelDebug: + l.logger.Debug(msg, args...) 
+ case LevelInfo: + l.logger.Info(msg, args...) + case LevelWarn: + l.logger.Warn(msg, args...) + case LevelError: + l.logger.Error(msg, args...) + } + l.sendToGraylog(slog.Level(level), msg, args) +} + +// sendToGraylog sends a log message to Graylog if configured +func (l *Logger) sendToGraylog(level slog.Level, msg string, args []any) { + l.graylogMu.RLock() + defer l.graylogMu.RUnlock() + + if l.graylog == nil { + return + } + + // Convert args to structured data + extra := make(map[string]interface{}) + if args != nil { + for i := 0; i < len(args)-1; i += 2 { + if key, ok := args[i].(string); ok { + extra[key] = args[i+1] + } + } + } + + gelfMsg := &GELFMessage{ + Version: "1.1", + Host: l.source, + ShortMessage: msg, + Timestamp: float64(time.Now().UnixNano()) / 1e9, + Level: int(level) + 1, // GELF levels: 1=emerg, 2=alert, 3=crit, 4=err, 5=warn, 6=notice, 7=info, 8=debug + Source: l.source, + Extra: extra, + } + + // Send in background to not block + go func() { + _ = l.graylog.send(gelfMsg) + }() +} + +// Close closes the logger and any underlying connections +func (l *Logger) Close() { + l.graylogMu.Lock() + defer l.graylogMu.Unlock() + + if l.graylog != nil { + l.graylog.Close() + l.graylog = nil + } +} + +// GetLogger returns the underlying slog.Logger +func (l *Logger) GetLogger() *slog.Logger { + return l.logger +} + +// Handler returns a slog.Handler for use with other libraries +func (l *Logger) Handler() slog.Handler { + return l.logger.Handler() +} + +// Global logger instance - renamed to avoid conflict with log package +var globalLogger *Logger + +// Init initializes the global logger +func InitLogger() { + globalLogger = NewLogger() +} + +// InitLoggerWithLevel initializes the global logger with a specific level +func InitLoggerWithLevel(level LogLevel) { + globalLogger = NewLoggerWithLevel(level) +} + +// SetupGlobalGraylog configures Graylog for the global logger +func SetupGlobalGraylog(cfg GraylogConfig) error { + return 
globalLogger.SetupGraylog(cfg) +} + +// GetLogger returns the global logger instance +func GetLogger() *Logger { + if globalLogger == nil { + InitLogger() + } + return globalLogger +} + +// Convenience functions using global logger + +// Debug logs a debug message +func Debug(msg string, args ...any) { + GetLogger().Debug(msg, args...) +} + +// Info logs an info message +func Info(msg string, args ...any) { + GetLogger().Info(msg, args...) +} + +// Warn logs a warning message +func Warn(msg string, args ...any) { + GetLogger().Warn(msg, args...) +} + +// Error logs an error message +func Error(msg string, args ...any) { + GetLogger().Error(msg, args...) +} + +// Fatal logs a fatal message and exits +func Fatal(msg string, args ...any) { + GetLogger().Fatal(msg, args...) +} + +// Debugf logs a formatted debug message +func Debugf(format string, args ...any) { + GetLogger().Debugf(format, args...) +} + +// Infof logs a formatted info message +func Infof(format string, args ...any) { + GetLogger().Infof(format, args...) +} + +// Warnf logs a formatted warning message +func Warnf(format string, args ...any) { + GetLogger().Warnf(format, args...) +} + +// Errorf logs a formatted error message +func Errorf(format string, args ...any) { + GetLogger().Errorf(format, args...) +} + +// Fatalf logs a formatted fatal message and exits +func Fatalf(format string, args ...any) { + GetLogger().Fatalf(format, args...) +} + +// Log logs a message at the specified level +func Log(level LogLevel, msg string, args ...any) { + GetLogger().Log(level, msg, args...) 
+} diff --git a/pkg/replica/position.go b/pkg/replica/position.go new file mode 100644 index 0000000..3fb4f04 --- /dev/null +++ b/pkg/replica/position.go @@ -0,0 +1,140 @@ +package replica + +import ( + "database/sql" + "encoding/json" + "os" + + "github.com/go-mysql-org/go-mysql/mysql" +) + +// PositionManager handles binlog position persistence +type PositionManager struct { + db *sql.DB + positionFile string +} + +// NewPositionManager creates a new position manager +func NewPositionManager(db *sql.DB, positionFile string) *PositionManager { + return &PositionManager{ + db: db, + positionFile: positionFile, + } +} + +// InitTable creates the binlog_position table if it doesn't exist +func (pm *PositionManager) InitTable() error { + if pm.db == nil { + return nil + } + + createTableSQL := ` + CREATE TABLE IF NOT EXISTS binlog_position ( + id INT PRIMARY KEY, + log_file VARCHAR(255), + log_pos BIGINT, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) + ` + + _, err := pm.db.Exec(createTableSQL) + if err != nil { + Errorf("[ERROR] Failed to create binlog_position table: %v", err) + return err + } + + return nil +} + +// Load loads the last saved position from database or file +func (pm *PositionManager) Load() (mysql.Position, error) { + if pm.db != nil { + return pm.loadFromDB() + } + return pm.loadFromFile() +} + +func (pm *PositionManager) loadFromDB() (mysql.Position, error) { + var name string + var pos uint32 + + err := pm.db.QueryRow( + "SELECT log_file, log_pos FROM binlog_position WHERE id = 1", + ).Scan(&name, &pos) + + if err == sql.ErrNoRows { + return mysql.Position{Name: "", Pos: 0}, nil + } + if err != nil { + return mysql.Position{Name: "", Pos: 0}, err + } + + Infof("[INFO] Loaded position: %s:%d", name, pos) + return mysql.Position{Name: name, Pos: pos}, nil +} + +func (pm *PositionManager) loadFromFile() (mysql.Position, error) { + data, err := os.ReadFile(pm.positionFile) + if err != nil { + return 
mysql.Position{Name: "", Pos: 0}, nil + } + + var pos mysql.Position + err = json.Unmarshal(data, &pos) + return pos, err +} + +// Save saves the current position to database or file +func (pm *PositionManager) Save(pos mysql.Position) error { + if pm.db != nil { + return pm.saveToDB(pos) + } + return pm.saveToFile(pos) +} + +// Reset clears the saved position +func (pm *PositionManager) Reset() error { + if pm.db != nil { + return pm.resetInDB() + } + return pm.resetInFile() +} + +func (pm *PositionManager) resetInDB() error { + _, err := pm.db.Exec("DELETE FROM binlog_position WHERE id = 1") + return err +} + +func (pm *PositionManager) resetInFile() error { + return os.Remove(pm.positionFile) +} + +func (pm *PositionManager) saveToDB(pos mysql.Position) error { + result, err := pm.db.Exec( + "INSERT INTO binlog_position (id, log_file, log_pos) VALUES (1, ?, ?) "+ + "ON DUPLICATE KEY UPDATE log_file = VALUES(log_file), log_pos = VALUES(log_pos)", + pos.Name, pos.Pos, + ) + + if err != nil { + Errorf("[ERROR] Failed to save position: %v", err) + return err + } + + rowsAffected, _ := result.RowsAffected() + Infof("[INFO] Saved position: %s:%d (rows: %d)", pos.Name, pos.Pos, rowsAffected) + return nil +} + +func (pm *PositionManager) saveToFile(pos mysql.Position) error { + data, err := json.Marshal(pos) + if err != nil { + return err + } + return os.WriteFile(pm.positionFile, data, 0644) +} + +// GetPositionFile returns the position file path +func (pm *PositionManager) GetPositionFile() string { + return pm.positionFile +} diff --git a/pkg/replica/service.go b/pkg/replica/service.go new file mode 100644 index 0000000..798b51b --- /dev/null +++ b/pkg/replica/service.go @@ -0,0 +1,397 @@ +package replica + +import ( + "context" + "database/sql" + "fmt" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + _ "github.com/go-sql-driver/mysql" +) + +// BinlogSyncService handles 
MySQL binlog streaming with resilience features +type BinlogSyncService struct { + syncer *replication.BinlogSyncer + streamer *replication.BinlogStreamer + position mysql.Position + handlers *EventHandlers + positionMgr *PositionManager + stopChan chan struct{} + instanceName string + secondaryName string + primaryDB *sql.DB + lastEventPos uint32 // for deduplication + lastEventGTID string // for GTID-based deduplication (future) + healthTicker *time.Ticker // for periodic health checks +} + +// NewBinlogSyncService creates a new binlog sync service +func NewBinlogSyncService(cfg BinlogConfig, primaryDB, secondaryDB *sql.DB, secondaryName string) *BinlogSyncService { + syncerCfg := replication.BinlogSyncerConfig{ + ServerID: cfg.ServerID, + Flavor: "mariadb", + Host: cfg.Host, + Port: cfg.Port, + User: cfg.User, + Password: cfg.Password, + } + + positionFile := fmt.Sprintf("binlog_position_%s_%s.json", cfg.Name, secondaryName) + + return &BinlogSyncService{ + syncer: replication.NewBinlogSyncer(syncerCfg), + handlers: NewEventHandlers(secondaryDB, secondaryName), + positionMgr: NewPositionManager(secondaryDB, positionFile), + stopChan: make(chan struct{}), + instanceName: cfg.Name, + secondaryName: secondaryName, + primaryDB: primaryDB, + } +} + +// Start begins binlog streaming +func (s *BinlogSyncService) Start() error { + if err := s.positionMgr.InitTable(); err != nil { + Warnf("[%s][WARN] Failed to init position table: %v", s.secondaryName, err) + } + + pos, err := s.positionMgr.Load() + if err != nil { + return fmt.Errorf("failed to load position: %v", err) + } + + if pos.Name == "" { + s.streamer, err = s.syncer.StartSync(mysql.Position{Name: "", Pos: 4}) + } else { + s.streamer, err = s.syncer.StartSync(pos) + } + + if err != nil { + return fmt.Errorf("failed to start sync: %v", err) + } + + Infof("[%s] Started streaming from %s:%d", s.instanceName, pos.Name, pos.Pos) + return s.processEvents() +} + +// StartWithStop begins binlog streaming with graceful 
shutdown +func (s *BinlogSyncService) StartWithStop() error { + if err := s.positionMgr.InitTable(); err != nil { + Warnf("[%s][WARN] Failed to init position table: %v", s.secondaryName, err) + } + + pos, err := s.positionMgr.Load() + if err != nil { + return fmt.Errorf("failed to load position: %v", err) + } + + if pos.Name == "" { + s.streamer, err = s.syncer.StartSync(mysql.Position{Name: "", Pos: 4}) + } else { + s.streamer, err = s.syncer.StartSync(pos) + } + + if err != nil { + return fmt.Errorf("failed to start sync: %v", err) + } + + Infof("[%s] Started streaming from %s:%d", s.instanceName, pos.Name, pos.Pos) + return s.processEventsWithStop() +} + +// processEventsWithStop processes events with graceful shutdown and health checks +func (s *BinlogSyncService) processEventsWithStop() error { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Start health check ticker (every 30 seconds) + s.healthTicker = time.NewTicker(30 * time.Second) + defer s.healthTicker.Stop() + + // Start health check goroutine + healthChan := make(chan error, 1) + go s.runHealthChecks(healthChan) + + for { + select { + case <-sigChan: + Infof("[%s] Shutting down...", s.secondaryName) + s.positionMgr.Save(s.position) + s.syncer.Close() + return nil + case <-s.stopChan: + Infof("[%s] Stop signal received", s.secondaryName) + s.positionMgr.Save(s.position) + s.syncer.Close() + return nil + case <-s.healthTicker.C: + // Trigger health check + go s.runHealthChecks(healthChan) + case healthErr := <-healthChan: + if healthErr != nil { + Errorf("[%s][HEALTH CHECK] Failed: %v", s.secondaryName, healthErr) + } + default: + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ev, err := s.streamer.GetEvent(ctx) + cancel() + + if err == context.DeadlineExceeded { + continue + } + if err != nil { + return fmt.Errorf("failed to get event: %v", err) + } + + s.processEvent(ev) + } + } +} + +// runHealthChecks performs periodic 
health checks +func (s *BinlogSyncService) runHealthChecks(resultChan chan<- error) { + // Check secondary DB connection + if err := s.handlers.PingSecondary(); err != nil { + resultChan <- fmt.Errorf("secondary DB ping failed: %v", err) + return + } + + // Check if syncer is still healthy + if s.syncer == nil { + resultChan <- fmt.Errorf("syncer is nil") + return + } + + resultChan <- nil +} + +// processEvents processes events continuously +func (s *BinlogSyncService) processEvents() error { + for { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ev, err := s.streamer.GetEvent(ctx) + cancel() + + if err == context.DeadlineExceeded { + continue + } + if err != nil { + return fmt.Errorf("failed to get event: %v", err) + } + + s.processEvent(ev) + } +} + +// processEvent processes a single event with panic recovery +// Note: Deduplication disabled as it was causing valid events to be skipped +func (s *BinlogSyncService) processEvent(ev *replication.BinlogEvent) { + defer func() { + if r := recover(); r != nil { + Errorf("[%s][PANIC RECOVERED] processEvent panic: %v", s.secondaryName, r) + } + }() + + s.position.Pos = ev.Header.LogPos + + switch e := ev.Event.(type) { + case *replication.RotateEvent: + s.position.Name = string(e.NextLogName) + Infof("[%s] Rotated to %s", s.instanceName, s.position.Name) + + case *replication.TableMapEvent: + s.handlers.HandleTableMap(e) + + case *replication.RowsEvent: + s.handlers.HandleRows(ev.Header, e) + + case *replication.QueryEvent: + s.handlers.HandleQuery(e) + + case *replication.XIDEvent: + s.positionMgr.Save(s.position) + } + + if ev.Header.LogPos%1000 == 0 { + s.positionMgr.Save(s.position) + } +} + +// Stop signals the service to stop +func (s *BinlogSyncService) Stop() { + close(s.stopChan) +} + +// NeedResync checks if a resync is needed +func (s *BinlogSyncService) NeedResync() (bool, error) { + // Check if position file/table exists + pos, err := s.positionMgr.Load() + if err != nil { + 
return false, err + } + + // If no position saved, we need initial transfer + if pos.Name == "" { + Infof("[%s] No saved position found, resync needed", s.secondaryName) + return true, nil + } + + // Check if secondary DB has any tables/data + hasData, err := s.checkSecondaryHasData() + if err != nil { + return false, err + } + + if !hasData { + Infof("[%s] Secondary database is empty, resync needed", s.secondaryName) + return true, nil + } + + return false, nil +} + +// checkSecondaryHasData checks if the secondary database has any data +func (s *BinlogSyncService) checkSecondaryHasData() (bool, error) { + // Get the secondary DB from position manager + db := s.positionMgr.db + if db == nil { + return true, nil // Can't check, assume OK + } + + // Check if any table has data + var count int + err := db.QueryRow("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'replica'").Scan(&count) + if err != nil { + return false, err + } + + return count > 0, nil +} + +// RunInitialTransfer performs the initial data transfer +func (s *BinlogSyncService) RunInitialTransfer(batchSize int, excludeSchemas []string) error { + Infof("[%s] Starting initial data transfer...", s.secondaryName) + + // Get secondary DB from handlers + secondaryDB := s.handlers.secondaryDB + if secondaryDB == nil { + return fmt.Errorf("secondary DB not available") + } + + // Create initial transfer with proper initialization + transfer := &InitialTransfer{ + primaryDB: s.primaryDB, + secondaryDB: secondaryDB, + batchSize: batchSize, + workerCount: 1, + excludedDBs: map[string]bool{ + "information_schema": true, + "performance_schema": true, + "mysql": true, + "sys": true, + }, + checkpointFile: fmt.Sprintf("transfer_progress_%s_%s.json", s.instanceName, s.secondaryName), + progress: TransferProgress{ + TablesProcessed: make(map[string]int64), + }, + } + + if err := transfer.Transfer(excludeSchemas); err != nil { + return fmt.Errorf("transfer failed: %v", err) + } + + // Reset position 
after successful transfer + if err := s.positionMgr.Reset(); err != nil { + return fmt.Errorf("failed to reset position: %v", err) + } + + Infof("[%s] Initial transfer completed successfully", s.secondaryName) + return nil +} + +// StartWithResync starts binlog streaming with automatic resync if needed +func (s *BinlogSyncService) StartWithResync(batchSize int, excludeSchemas []string) error { + if err := s.positionMgr.InitTable(); err != nil { + Warnf("[%s][WARN] Failed to init position table: %v", s.secondaryName, err) + } + + // Check if resync is needed + needResync, err := s.NeedResync() + if err != nil { + return fmt.Errorf("failed to check resync status: %v", err) + } + + if needResync { + if err := s.RunInitialTransfer(batchSize, excludeSchemas); err != nil { + return fmt.Errorf("initial transfer failed: %v", err) + } + } + + // Start normal streaming + return s.StartWithStop() +} + +// GetSecondaryName returns the secondary name +func (s *BinlogSyncService) GetSecondaryName() string { + return s.secondaryName +} + +// MultiBinlogSyncService manages multiple binlog sync services +type MultiBinlogSyncService struct { + services []*BinlogSyncService + stopChan chan struct{} + wg sync.WaitGroup +} + +// NewMultiBinlogSyncService creates a new multi-secondary binlog sync service +func NewMultiBinlogSyncService() *MultiBinlogSyncService { + return &MultiBinlogSyncService{ + services: make([]*BinlogSyncService, 0), + stopChan: make(chan struct{}), + } +} + +// AddService adds a service to the multi-service manager +func (m *MultiBinlogSyncService) AddService(service *BinlogSyncService) { + m.services = append(m.services, service) +} + +// StartAll starts all services +func (m *MultiBinlogSyncService) StartAll() error { + for _, service := range m.services { + m.wg.Add(1) + go func(svc *BinlogSyncService) { + defer m.wg.Done() + Infof("[%s] Starting binlog sync service", svc.GetSecondaryName()) + if err := svc.StartWithResync(1000, []string{"information_schema", 
"performance_schema", "mysql", "sys"}); err != nil { + Errorf("[%s] Service error: %v", svc.GetSecondaryName(), err) + } + }(service) + } + return nil +} + +// StopAll stops all services +func (m *MultiBinlogSyncService) StopAll() { + close(m.stopChan) + for _, service := range m.services { + service.Stop() + } + m.wg.Wait() +} + +// Wait waits for all services to complete +func (m *MultiBinlogSyncService) Wait() { + m.wg.Wait() +} + +// Len returns the number of services +func (m *MultiBinlogSyncService) Len() int { + return len(m.services) +} diff --git a/pkg/replica/sqlbuilder.go b/pkg/replica/sqlbuilder.go new file mode 100644 index 0000000..6e7b72f --- /dev/null +++ b/pkg/replica/sqlbuilder.go @@ -0,0 +1,168 @@ +package replica + +import ( + "fmt" + "strings" + + "github.com/go-mysql-org/go-mysql/replication" +) + +// SQLBuilder handles SQL statement building +type SQLBuilder struct{} + +// NewSQLBuilder creates a new SQL builder +func NewSQLBuilder() *SQLBuilder { + return &SQLBuilder{} +} + +// BuildInsert builds an INSERT statement +func (sb *SQLBuilder) BuildInsert(schema, table string, tableMap *replication.TableMapEvent, row []interface{}) string { + if len(row) == 0 { + return fmt.Sprintf("INSERT INTO `%s`.`%s` VALUES ()", schema, table) + } + + var columns []string + var values []string + + for i, col := range row { + colName := sb.getColumnName(tableMap, i) + columns = append(columns, colName) + values = append(values, formatValue(col)) + } + + return fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s)", + schema, table, strings.Join(columns, ", "), strings.Join(values, ", ")) +} + +// BuildUpdate builds an UPDATE statement +func (sb *SQLBuilder) BuildUpdate(schema, table string, tableMap *replication.TableMapEvent, before, after []interface{}) string { + if len(before) == 0 || len(after) == 0 { + return fmt.Sprintf("UPDATE `%s`.`%s` SET id = id", schema, table) + } + + var setClauses []string + var whereClauses []string + + for i := range before { + 
// validUTF8 reports whether b is well-formed UTF-8.
//
// Used by formatValue to decide between quoting a byte slice as a string
// literal and emitting it as a hex literal. The previous implementation
// only checked lead/continuation bit patterns, so it accepted overlong
// encodings (e.g. 0xC0 0x80), UTF-16 surrogate code points
// (U+D800..U+DFFF), and code points above U+10FFFF — all invalid UTF-8
// that would then be embedded raw inside a quoted SQL string. This version
// enforces the full well-formedness rules (equivalent to
// unicode/utf8.Valid, hand-rolled so the file's import block is untouched).
func validUTF8(b []byte) bool {
	for i := 0; i < len(b); {
		c := b[i]
		switch {
		case c < 0x80:
			// 1-byte sequence (ASCII).
			i++
		case c&0xE0 == 0xC0:
			// 2-byte sequence: lead must be C2..DF (C0/C1 are overlong).
			if c < 0xC2 || i+1 >= len(b) || b[i+1]&0xC0 != 0x80 {
				return false
			}
			i += 2
		case c&0xF0 == 0xE0:
			// 3-byte sequence.
			if i+2 >= len(b) || b[i+1]&0xC0 != 0x80 || b[i+2]&0xC0 != 0x80 {
				return false
			}
			// Reject overlong (E0 80..9F) and surrogates (ED A0..BF).
			if (c == 0xE0 && b[i+1] < 0xA0) || (c == 0xED && b[i+1] > 0x9F) {
				return false
			}
			i += 3
		case c&0xF8 == 0xF0:
			// 4-byte sequence: F5..FF can never start a valid sequence.
			if c > 0xF4 || i+3 >= len(b) ||
				b[i+1]&0xC0 != 0x80 || b[i+2]&0xC0 != 0x80 || b[i+3]&0xC0 != 0x80 {
				return false
			}
			// Reject overlong (F0 80..8F) and > U+10FFFF (F4 90..).
			if (c == 0xF0 && b[i+1] < 0x90) || (c == 0xF4 && b[i+1] > 0x8F) {
				return false
			}
			i += 4
		default:
			// Bare continuation byte or invalid lead byte.
			return false
		}
	}
	return true
}