replica ready

2026-02-12 04:52:03 +01:00
commit ba32991201
15 changed files with 3365 additions and 0 deletions

1
.gitignore vendored Normal file

@@ -0,0 +1 @@
.env

642
README.md Normal file

@@ -0,0 +1,642 @@
# MariaDB/MySQL Binlog Replication Service
A robust MySQL/MariaDB binlog streaming replication service with automatic initial data transfer, resilience features, and comprehensive error handling. Supports single or multi-secondary replica configurations with optional Graylog logging.
## Installation
### Quick Install (Go)
```bash
go install git.ma-al.com/goc_marek/replica/cmd/replica@latest
```
### Build from Source
```bash
# Clone the repository
git clone https://git.ma-al.com/goc_marek/replica.git
cd replica
# Build the service
go build -o replica ./cmd/replica
# Or install globally
go install ./cmd/replica
```
### Docker
```bash
# Build the image
docker build -t replica .
# Run with docker-compose
docker-compose up -d
```
## Quick Start
```bash
# Copy example environment
cp example.env .env
# Edit .env with your configuration
nano .env
# Run the service
./replica
```
## Features
### Core Functionality
- **Binlog Streaming**: Real-time replication from MySQL/MariaDB binlog events
- **Initial Data Transfer**: Automatic bulk data transfer when resync is needed
- **Multi-table Support**: Replicates INSERT, UPDATE, DELETE events for multiple tables
- **Position Persistence**: Saves and resumes from last processed binlog position
- **Schema Filtering**: Only replicates events for the configured schema (default: "replica")
### Resilience & Error Handling
- **Panic Recovery**: Automatic recovery from unexpected panics in event handlers
- **Retry Logic**: Exponential backoff retry for failed SQL operations
- **Connection Health Checks**: Periodic connection health monitoring with auto-reconnect
- **Schema Drift Detection**: Detects and handles schema changes during replication
- **Graceful Degradation**: Skips problematic tables after repeated failures
- **Auto-Reconnect**: Automatic reconnection on connection errors
### Transfer Features
- **Chunked Transfers**: Efficient batch transfers using primary key ranges
- **Progress Checkpointing**: Saves transfer progress for resume after interruption
- **Pause/Resume**: Support for pausing and resuming initial transfers
- **Transfer Statistics**: Detailed logging of transfer progress and errors
## Architecture
### Components
| File | Purpose |
|------|---------|
| [`cmd/replica/main.go`](cmd/replica/main.go) | Application entry point and configuration |
| [`pkg/replica/service.go`](pkg/replica/service.go) | BinlogSyncService - core replication orchestration |
| [`pkg/replica/handlers.go`](pkg/replica/handlers.go) | EventHandlers - binlog event processing with resilience |
| [`pkg/replica/initial_transfer.go`](pkg/replica/initial_transfer.go) | InitialTransfer - bulk data transfer management |
| [`pkg/replica/position.go`](pkg/replica/position.go) | PositionManager - binlog position persistence |
| [`pkg/replica/sqlbuilder.go`](pkg/replica/sqlbuilder.go) | SQLBuilder - SQL statement generation |
| [`pkg/replica/config.go`](pkg/replica/config.go) | Configuration types |
| [`pkg/replica/logging.go`](pkg/replica/logging.go) | Structured logging with Graylog support |
### Data Flow
```
Primary DB (MySQL/MariaDB)
|
v
Binlog Stream
|
v
BinlogSyncService.processEvent()
|
+---> Panic Recovery Wrapper
|
+---> EventHandlers.HandleRows()
| |
| +---> Schema Filter (only "replica" schema)
| +---> Schema Drift Check
| +---> Retry Logic (if needed)
| +---> Execute SQL
|
+---> PositionManager.Save()
|
+---> Health Checks (every 30s)
```
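In code, the dispatch looks roughly like this (a sketch only; the real `processEvent` lives in [`pkg/replica/service.go`](pkg/replica/service.go), the `handlers` field name is an assumption, and position saving and health checks are elided):
```go
func (s *BinlogSyncService) processEvent(ev *replication.BinlogEvent) error {
    // Panic recovery wrapper: one malformed event must not kill the service.
    defer func() {
        if r := recover(); r != nil {
            Errorf("[PANIC RECOVERED] processEvent: %v", r)
        }
    }()
    switch e := ev.Event.(type) {
    case *replication.TableMapEvent:
        s.handlers.HandleTableMap(e)
    case *replication.RowsEvent:
        return s.handlers.HandleRows(ev.Header, e)
    case *replication.QueryEvent:
        return s.handlers.HandleQuery(e)
    }
    return nil
}
```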
## Usage
### Quick Start
```bash
# Build the service
go build -o replica ./cmd/replica
# Run the service
./replica
```
## Configuration
### Environment Variables
All configuration is done via environment variables in the `.env` file:
```bash
# Copy example environment
cp example.env .env
# Edit with your settings
nano .env
```
### Primary Database Configuration
| Variable | Description | Default |
|----------|-------------|---------|
| `MARIA_PRIMARY_HOST` | Primary database hostname | `mariadb-primary` |
| `MARIA_PRIMARY_PORT` | Primary database port | `3306` |
| `MARIA_USER` | Replication user | `replica` |
| `MARIA_PASS` | Replication password | `replica` |
| `MARIA_SERVER_ID` | Unique server ID for binlog | `100` |
| `MARIA_PRIMARY_NAME` | Instance name for logging | `mariadb-primary` |
### Multi-Secondary Replica Configuration
The service supports replicating to multiple secondary databases simultaneously. Configure secondaries using comma-separated values:
| Variable | Description | Example |
|----------|-------------|---------|
| `MARIA_SECONDARY_HOSTS` | Comma-separated hostnames | `secondary-1,secondary-2,secondary-3` |
| `MARIA_SECONDARY_PORTS` | Comma-separated ports | `3307,3308,3309` |
| `MARIA_SECONDARY_NAMES` | Comma-separated instance names | `replica-1,replica-2,replica-3` |
| `MARIA_SECONDARY_USERS` | Per-secondary users (optional) | `replica1,replica2,replica3` |
| `MARIA_SECONDARY_PASSWORDS` | Per-secondary passwords (optional) | `pass1,pass2,pass3` |
#### Example: Single Secondary
```bash
MARIA_SECONDARY_HOSTS=mariadb-secondary
MARIA_SECONDARY_PORTS=3307
MARIA_SECONDARY_NAMES=secondary-1
```
#### Example: Three Secondaries
```bash
MARIA_SECONDARY_HOSTS=secondary-1,secondary-2,secondary-3
MARIA_SECONDARY_PORTS=3307,3308,3309
MARIA_SECONDARY_NAMES=replica-1,replica-2,replica-3
MARIA_SECONDARY_USERS=replica1,replica2,replica3
MARIA_SECONDARY_PASSWORDS=secret1,secret2,secret3
```
#### Example: Two Secondaries with Different Credentials
```bash
MARIA_SECONDARY_HOSTS=secondary-1,secondary-2
MARIA_SECONDARY_PORTS=3307,3308
MARIA_SECONDARY_NAMES=replica-east,replica-west
MARIA_SECONDARY_USERS=replica_east,replica_west
MARIA_SECONDARY_PASSWORDS=east_secret,west_secret
```
**Note:** If `MARIA_SECONDARY_USERS` or `MARIA_SECONDARY_PASSWORDS` are not provided, the default `MARIA_USER` and `MARIA_PASS` will be used for all secondaries.
### Graylog Configuration
| Variable | Description | Default |
|----------|-------------|---------|
| `GRAYLOG_ENABLED` | Enable Graylog logging | `false` |
| `GRAYLOG_ENDPOINT` | Graylog GELF endpoint | `localhost:12201` |
| `GRAYLOG_PROTOCOL` | Protocol (udp/tcp) | `udp` |
| `GRAYLOG_TIMEOUT` | Connection timeout | `5s` |
| `GRAYLOG_SOURCE` | Source name for logs | `binlog-sync` |
#### Enable Graylog Logging
```bash
# Edit .env and set:
GRAYLOG_ENABLED=true
GRAYLOG_ENDPOINT=graylog.example.com:12201
GRAYLOG_PROTOCOL=udp
GRAYLOG_SOURCE=binlog-sync-prod
```
### Other Settings
| Variable | Description | Default |
|----------|-------------|---------|
| `TRANSFER_BATCH_SIZE` | Rows per transfer chunk | `1000` |
| `LOCAL_PROJECT_NAME` | Project name for logging | `naluconcept` |
## Resilience Features
### Panic Recovery
All event handlers include automatic panic recovery:
```go
defer func() {
if r := recover(); r != nil {
log.Printf("[PANIC RECOVERED] %v", r)
}
}()
```
This prevents a single malformed event from crashing the entire service. Recovery is implemented in:
- [`HandleRows()`](handlers.go:42)
- [`HandleQuery()`](handlers.go:81)
- [`HandleTableMap()`](handlers.go:100)
- [`processEvent()`](service.go:177)
### Retry Logic
Failed SQL operations are retried with exponential backoff:
```go
func (h *EventHandlers) executeWithRetry(query string) error {
    var lastErr error
    for attempt := 0; attempt <= h.retryAttempts; attempt++ {
        if attempt > 0 {
            delay := h.retryDelay * time.Duration(1<<attempt)
            log.Printf("[RETRY] Retrying in %v (attempt %d/%d)", delay, attempt, h.retryAttempts)
            time.Sleep(delay)
        }
        _, err := h.secondaryDB.Exec(query)
        if err == nil {
            return nil
        }
        lastErr = err
        // Check if connection error and reconnect
        if h.isConnectionError(err) {
            h.reconnect()
        }
    }
    return lastErr
}
```
**Configuration:**
- `retryAttempts`: Number of retry attempts (default: 3)
- `retryDelay`: Base delay between retries (default: 100ms)
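With these defaults, the waits work out to 200ms, 400ms, and 800ms over the three retries.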
### Connection Health Checks
The service performs a health check every 30 seconds:
```go
// Checks:
// 1. Ping secondary database connection
// 2. Verify syncer is still active
// 3. Log health status
```
If a health check fails, the error is logged. The next SQL operation will trigger an auto-reconnect.
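A minimal sketch of that loop (illustrative; the real loop lives in BinlogSyncService, `handlers` is an `*EventHandlers`, and `done` is an assumed shutdown channel):
```go
go func() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            if err := handlers.PingSecondary(); err != nil {
                Errorf("[HEALTH CHECK] Failed: %v", err)
                // No reconnect here; the next SQL operation triggers it.
            }
        case <-done:
            return
        }
    }
}()
```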
### Schema Drift Detection
Before applying events, the service checks for schema changes:
```go
func (h *EventHandlers) detectSchemaDrift(schema, table string) bool {
currentHash, _ := h.getSchemaHash(schema, table)
lastHash := h.lastSchemaHash[schema+"."+table]
if lastHash != "" && lastHash != currentHash {
log.Printf("[DRIFT] Schema changed for %s.%s", schema, table)
return true // Drift detected
}
h.lastSchemaHash[schema+"."+table] = currentHash
return false
}
```
If schema drift is detected, the table is marked as failed and temporarily skipped.
### Graceful Degradation
Tables with repeated failures are temporarily skipped:
```go
// After 5 consecutive failures, skip the table
if h.failedTables[key] >= h.maxFailures {
log.Printf("[SKIPPED] Too many failures for %s.%s", schema, table)
return nil // Skip this event
}
// Reset count on successful operation
h.failedTables[key] = 0
```
**Configuration:**
- `maxFailures`: Consecutive failures before skipping (default: 5)
### Auto-Reconnect
Connection errors trigger automatic reconnection:
```go
func (h *EventHandlers) reconnect() {
    h.secondaryDB.Close()
    var err error
    maxRetries := 5
    for i := 0; i < maxRetries; i++ {
        h.secondaryDB, err = sql.Open("mysql", dsn)
        if err == nil {
            // Configure connection pool, then ping to verify
            if err = h.secondaryDB.Ping(); err == nil {
                log.Printf("[RECONNECT] Successfully reconnected")
                return
            }
        }
        time.Sleep(time.Duration(i+1) * time.Second)
    }
}
```
**Detected connection errors:**
- "connection refused"
- "connection reset"
- "broken pipe"
- "timeout"
- "driver: bad connection"
- "invalid connection"
## Initial Transfer
When resync is needed (empty replica or no saved position), the service performs an initial data transfer:
### Transfer Process
1. **Detection**: Check if secondary database is empty or no position saved
2. **Database Enumeration**: List all databases from primary
3. **Schema Exclusion**: Skip excluded schemas (information_schema, mysql, etc.)
4. **Table Transfer**: For each table:
- Get table schema (column definitions)
- Check row count
- Transfer in chunks using primary key or LIMIT/OFFSET
5. **Progress Checkpointing**: Save progress to JSON file every 1000 rows
6. **Position Reset**: Clear saved binlog position after successful transfer
7. **Binlog Streaming**: Start streaming from current position
### Chunked Transfer
Tables are transferred in chunks for efficiency and memory safety:
```sql
-- Using primary key (efficient, preserves order)
SELECT * FROM table WHERE pk >= 1000 AND pk < 2000 ORDER BY pk
-- Without primary key (slower, may skip rows on updates)
SELECT * FROM table LIMIT 1000 OFFSET 1000
```
**Batch Size:** Configurable (default: 1000 rows per chunk)
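As a sketch of the primary-key strategy (illustrative only; `copyRange`, `minPK`, and `maxPK` are hypothetical stand-ins, and the real loop is `transferTableChunked`):
```go
// Walk an integer primary key in batch-sized ranges; copyRange stands in
// for "read this range from the primary and insert it into the secondary".
func copyTableChunked(dbName, tableName, pk string, minPK, maxPK int64,
    batchSize int, copyRange func(query string) error) error {
    for start := minPK; start <= maxPK; start += int64(batchSize) {
        query := fmt.Sprintf(
            "SELECT * FROM `%s`.`%s` WHERE `%s` >= %d AND `%s` < %d ORDER BY `%s`",
            dbName, tableName, pk, start, pk, start+int64(batchSize), pk)
        if err := copyRange(query); err != nil {
            return err
        }
    }
    return nil
}
```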
### Progress Checkpointing
Transfer progress is saved to `transfer_progress_{instance}.json`:
```json
{
"DatabasesProcessed": 2,
"CurrentDatabase": "mydb",
"TablesProcessed": {
"mydb.users": 5000,
"mydb.orders": 10000
},
"LastCheckpoint": "2024-01-15T10:30:00Z"
}
```
If the transfer is interrupted, it resumes from the last checkpoint.
### Pause/Resume
Transfers can be paused and resumed programmatically:
```go
transfer, err := NewInitialTransfer(primaryDSN, secondaryDSN, 1000, 1)
if err != nil {
    log.Fatal(err)
}
// Pause during transfer
transfer.Pause()
// Resume later
transfer.Resume()
```
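A pause takes effect at the next checkpoint: the transfer checks the pause flag before each database and before each table, not mid-chunk.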
## Configuration Reference
### BinlogConfig
```go
type BinlogConfig struct {
Host string // MySQL/MariaDB host
Port uint16 // MySQL/MariaDB port
User string // Replication user
Password string // Password
ServerID uint32 // Unique server ID for replication
Name string // Instance name for logging
}
```
### EventHandlers
```go
type EventHandlers struct {
secondaryDB *sql.DB
tableMapCache map[uint64]*replication.TableMapEvent
sqlBuilder *SQLBuilder
failedTables map[string]int
maxFailures int // Default: 5
retryAttempts int // Default: 3
retryDelay time.Duration // Default: 100ms
lastSchemaHash map[string]string
}
```
### InitialTransfer
```go
type InitialTransfer struct {
primaryDB *sql.DB
secondaryDB *sql.DB
batchSize int // Default: 1000
workerCount int // Default: 1
excludedDBs map[string]bool
checkpointFile string
progress TransferProgress
}
```
### TransferProgress
```go
type TransferProgress struct {
DatabasesProcessed int
CurrentDatabase string
TablesProcessed map[string]int64 // "schema.table" -> rows transferred
LastCheckpoint time.Time
}
```
### TransferStats
```go
type TransferStats struct {
TotalRows int64
TotalTables int
TransferTime int64 // milliseconds
Errors []string
Progress TransferProgress
}
```
## Logging
The service uses structured logging with prefixes:
| Prefix | Example | Description |
|--------|---------|-------------|
| `[INSERT]` | `[INSERT] 1 row(s) affected` | Successful INSERT |
| `[UPDATE]` | `[UPDATE] 1 row(s) affected` | Successful UPDATE |
| `[DELETE]` | `[DELETE] 1 row(s) affected` | Successful DELETE |
| `[SUCCESS]` | `[SUCCESS] 1 row(s) affected` | SQL operation succeeded |
| `[ERROR]` | `[ERROR] INSERT failed after retries` | Operation failed |
| `[WARN]` | `[WARN] Schema drift detected` | Warning condition |
| `[PANIC]` | `[PANIC RECOVERED] panic message` | Recovered panic |
| `[RETRY]` | `[RETRY] Retrying in 200ms` | Retry attempt |
| `[DRIFT]` | `[DRIFT] Schema changed` | Schema drift detected |
| `[SKIPPED]` | `[SKIPPED] Too many failures` | Table skipped |
| `[FAILURE]` | `[FAILURE] failure count: 3/5` | Table failure count |
| `[HEALTH]` | `[HEALTH CHECK] Failed` | Health check result |
| `[TRANSFER]` | `[TRANSFER] Found 5 tables` | Transfer progress |
| `[INFO]` | `[INFO] Saved position` | Informational |
| `[ROTATE]` | `[mariadb-primary] Rotated to binlog.000001` | Binlog rotation |
| `[RECONNECT]` | `[RECONNECT] Successfully reconnected` | Reconnection result |
## Schema Filtering
By default, only events for the "replica" schema are replicated:
```go
// handlers.go line 54
if schemaName != "replica" {
return nil // Skip all other schemas
}
```
To replicate all schemas, comment out or modify this filter.
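For example, a hypothetical allowlist driven by an environment variable (note: `REPLICA_SCHEMAS` is not read by the current code; this is only one way the check could be generalized):
```go
// Hypothetical replacement for the hardcoded filter in HandleRows.
allowed := map[string]bool{}
for _, s := range strings.Split(os.Getenv("REPLICA_SCHEMAS"), ",") {
    if s = strings.TrimSpace(s); s != "" {
        allowed[s] = true
    }
}
if len(allowed) > 0 && !allowed[schemaName] {
    return nil // skip schemas outside the allowlist
}
```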
## Error Handling
### Common Errors
| Error | Cause | Resolution |
|-------|-------|------------|
| `connection refused` | Secondary DB down | Check secondary DB status |
| `schema drift detected` | Table schema changed | Manual intervention required |
| `too many failures` | Table repeatedly failing | Check table compatibility |
| `failed to get event` | Binlog stream interrupted | Service auto-recovers |
| `expected 4 destination arguments` | SHOW MASTER STATUS columns | Fixed in code |
| `assignment to entry in nil map` | Uninitialized map | Fixed in code |
### Recovery Procedures
1. **Secondary DB Connection Lost**:
- Service auto-reconnects (up to 5 attempts)
- Check secondary DB logs
- Verify network connectivity
2. **Schema Drift Detected**:
- Stop service
- Compare schemas: `SHOW CREATE TABLE schema.table` on both DBs
- Sync schemas manually
- Reset position: `DELETE FROM binlog_position`
- Restart service
3. **Transfer Interrupted**:
- Service resumes from `transfer_progress_{instance}.json`
- Check checkpoint file for progress
- Delete checkpoint file to restart transfer
4. **Event Processing Stuck**:
- Check health check logs
- Verify binlog position: `SELECT * FROM binlog_position`
- Restart service if needed
- Clear position: `DELETE FROM binlog_position`
## Monitoring
### Key Metrics to Watch
- **Replication Lag**: Time since last event processed
- **Rows Replicated**: Total INSERT/UPDATE/DELETE count
- **Tables Synced**: Number of tables synchronized
- **Error Count**: Failed operations counter
- **Success Rate**: Successful / total operations
### Manual Verification
```sql
-- Check position on secondary
SELECT * FROM binlog_position;
-- Check row counts
SELECT COUNT(*) FROM replica.your_table;
-- Compare with primary
SELECT COUNT(*) FROM your_table;
```
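The same comparison can be scripted; a minimal sketch with `database/sql` (assumes the `github.com/go-sql-driver/mysql` driver is imported; the DSNs and table name are placeholders):
```go
// compareCounts reports a mismatch in row counts between two servers.
func compareCounts(primaryDSN, secondaryDSN, table string) error {
    count := func(dsn string) (int64, error) {
        db, err := sql.Open("mysql", dsn)
        if err != nil {
            return 0, err
        }
        defer db.Close()
        var n int64
        err = db.QueryRow("SELECT COUNT(*) FROM " + table).Scan(&n)
        return n, err
    }
    p, err := count(primaryDSN)
    if err != nil {
        return err
    }
    s, err := count(secondaryDSN)
    if err != nil {
        return err
    }
    if p != s {
        return fmt.Errorf("row count mismatch: primary=%d secondary=%d", p, s)
    }
    return nil
}
```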
## Performance Considerations
1. **Batch Size**: Start with 1000, adjust based on table size and memory
2. **Connection Pooling**: `SetMaxOpenConns(25)` for moderate load
3. **Worker Count**: Currently single-threaded (multi-worker planned)
4. **Schema Caching**: Table schemas cached in memory (auto-updated on drift)
5. **Index Usage**: Chunked transfers require indexed primary key
## Limitations
- **Single-threaded**: One worker processes events sequentially
- **Position-based**: No GTID support yet (position-based only)
- **Integer PKs**: Chunking requires integer primary key for efficiency
- **No Conflict Resolution**: Concurrent writes not handled
- **No Data Validation**: Assumes source data is valid
## Known Issues & Fixes
### Fixed Issues
- [x] `SHOW MASTER STATUS` returning 4 columns (fixed by scanning 4 variables)
- [x] `nil map` panic in transfer (fixed by initializing TablesProcessed)
- [x] Event deduplication skipping valid events (disabled deduplication)
- [x] Checkpoint file path empty (fixed by setting instance-specific path)
### Workarounds
- **Position 0 Events**: Deduplication disabled for now
- **Schema Changes**: Service marks table as failed, manual restart required
## Future Enhancements
- [ ] GTID-based replication for better consistency
- [ ] Multi-threaded replication for higher throughput
- [ ] Conflict detection and resolution
- [ ] Prometheus metrics endpoint
- [ ] REST API for management
- [ ] Kubernetes operator
- [ ] Configuration file (YAML/JSON)
- [ ] Signal handling for dynamic config
## File Structure
```
replica/
├── cmd/
│ └── replica/
│ └── main.go # Entry point
├── pkg/
│ └── replica/
│ ├── service.go # Replication orchestration
│ ├── handlers.go # Event processing
│ ├── initial_transfer.go # Bulk data transfer
│ ├── position.go # Position persistence
│ ├── sqlbuilder.go # SQL generation
│ ├── config.go # Configuration types
│ └── logging.go # Structured logging
├── example.env # Environment template
├── .env # Environment (gitignored)
├── docker-compose.yml # Local development
├── go.mod # Go module
└── README.md # This file
```
## License
MIT License

100
cmd/replica/main.go Normal file

@@ -0,0 +1,100 @@
package main
import (
"database/sql"
"os"
"os/signal"
"strconv"
"syscall"
"git.ma-al.com/goc_marek/replica/pkg/replica"
_ "github.com/go-sql-driver/mysql"
)
func main() {
// Initialize the logger
replica.InitLogger()
// Load configuration from environment
cfg, err := replica.LoadEnvConfig()
if err != nil {
replica.Fatalf("Failed to load configuration: %v", err)
}
// Setup Graylog if enabled
if cfg.Graylog.Enabled {
replica.Infof("Graylog enabled: %s", cfg.Graylog.Endpoint)
if err := replica.SetupGlobalGraylog(replica.GraylogConfig{
Endpoint: cfg.Graylog.Endpoint,
Protocol: cfg.Graylog.Protocol,
Timeout: cfg.Graylog.Timeout,
Source: cfg.Graylog.Source,
ExtraFields: cfg.Graylog.ExtraFields,
}); err != nil {
replica.Warnf("Failed to setup Graylog: %v", err)
} else {
replica.Info("Graylog setup successful")
}
}
replica.Infof("Loaded configuration: %d secondary(ies)", len(cfg.Secondaries))
// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Connect to primary MariaDB
primaryDSN := cfg.Primary.User + ":" + cfg.Primary.Password + "@tcp(" + cfg.Primary.Host + ":" +
strconv.FormatUint(uint64(cfg.Primary.Port), 10) + ")/?multiStatements=true"
primaryDB, err := sql.Open("mysql", primaryDSN)
if err != nil {
replica.Fatalf("Failed to connect to primary MariaDB: %v", err)
}
defer primaryDB.Close()
primaryDB.SetMaxOpenConns(25)
primaryDB.SetMaxIdleConns(5)
if err := primaryDB.Ping(); err != nil {
replica.Fatalf("Failed to ping primary MariaDB: %v", err)
}
replica.Info("Connected to primary MariaDB")
// Create multi-service manager
multiService := replica.NewMultiBinlogSyncService()
// Connect to each secondary and create services
for _, secCfg := range cfg.Secondaries {
replica.Infof("Connecting to secondary: %s (%s:%d)", secCfg.Name, secCfg.Host, secCfg.Port)
secondaryDB, err := sql.Open("mysql", secCfg.DSN)
if err != nil {
replica.Fatalf("Failed to connect to secondary %s: %v", secCfg.Name, err)
}
defer secondaryDB.Close()
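// Note: deferred inside the loop, so every secondary connection stays open
// until main returns, which is intended for this long-lived service.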
secondaryDB.SetMaxOpenConns(25)
secondaryDB.SetMaxIdleConns(5)
if err := secondaryDB.Ping(); err != nil {
replica.Fatalf("Failed to ping secondary %s: %v", secCfg.Name, err)
}
replica.Infof("Connected to secondary: %s", secCfg.Name)
// Create service for this secondary
service := replica.NewBinlogSyncService(cfg.Primary, primaryDB, secondaryDB, secCfg.Name)
multiService.AddService(service)
}
// Start all services
replica.Info("Starting binlog replication...")
multiService.StartAll()
// Wait for shutdown signal
sig := <-sigChan
replica.Infof("Received %v, shutting down...", sig)
// Stop all services
multiService.StopAll()
replica.Info("Service stopped.")
}

80
docker-compose.yml Normal file

@@ -0,0 +1,80 @@
services:
mariadb-primary:
image: mariadb:latest
container_name: ${MARIA_PRIMARY_NAME}
command:
- --innodb_buffer_pool_size=536870912
- --key_buffer_size=67108864
- --query_cache_type=1
- --query_cache_size=134217728
- --query-cache-strip-comments=1
- --max-connections=256
- --log_bin=log_bin
- --binlog_format=ROW
- --server-id=${MARIA_SERVER_ID}
ports:
- "${MARIA_PRIMARY_PORT}:3306"
networks:
- repl
volumes:
- mariadb-primary-data:/var/lib/mysql
environment:
MARIADB_USER: ${MARIA_USER}
MARIADB_PASSWORD: ${MARIA_PASS}
MYSQL_DATABASE: ${MARIA_NAME}
MYSQL_ROOT_PASSWORD: ${MARIA_PASS}
restart: always
mariadb-secondary:
image: mariadb:latest
container_name: secondary
command:
- --innodb_buffer_pool_size=536870912
- --key_buffer_size=67108864
- --max-connections=256
- --server-id=2
# - --read_only=ON
- --relay-log=relay-log
# - --log_bin=log_bin
# - --binlog_format=ROW
ports:
- "3307:3306"
networks:
- repl
volumes:
- mariadb-secondary-data:/var/lib/mysql
environment:
MARIADB_USER: ${MARIA_USER}
MARIADB_PASSWORD: ${MARIA_PASS}
MYSQL_DATABASE: ${MARIA_NAME}
MYSQL_ROOT_PASSWORD: ${MARIA_PASS}
restart: always
depends_on:
- mariadb-primary
# postgresql:
# container_name: ${POSTGRES_HOST}
# restart: always
# image: postgres:18
# networks:
# repl:
# ports:
# - 5432:5432
# volumes:
# - postgres-data:/var/lib/postgresql:Z
# command: postgres -c shared_buffers=512MB -c work_mem=16MB -c maintenance_work_mem=256MB -c effective_cache_size=4GB -c max_connections=20
# environment:
# POSTGRES_USER: ${POSTGRES_USER}
# POSTGRES_PASSWORD: ${POSTGRES_PASS}
# POSTGRES_DB: ${POSTGRES_NAME}
networks:
repl:
name: repl
volumes:
mariadb-primary-data:
driver: local
mariadb-secondary-data:
driver: local

34
example.env Normal file

@@ -0,0 +1,34 @@
# Primary MariaDB Configuration
MARIA_USER=replica
MARIA_PASS=replica
MARIA_SERVER_ID=100
MARIA_PRIMARY_HOST=mariadb-primary
MARIA_PRIMARY_PORT=3306
MARIA_PRIMARY_NAME=mariadb-primary
# Secondary MariaDB Configuration (comma-separated for multiple)
# Format: host1:port1,host2:port2,host3:port3
# Or just hostnames and use MARIA_SECONDARY_PORTS for ports
MARIA_SECONDARY_HOSTS=mariadb-secondary-1,mariadb-secondary-2,mariadb-secondary-3
MARIA_SECONDARY_PORTS=3307,3308,3309
MARIA_SECONDARY_NAMES=secondary-1,secondary-2,secondary-3
# Optional: Override per-secondary credentials (must match number of secondaries or use defaults)
# MARIA_SECONDARY_USERS=replica1,replica2,replica3
# MARIA_SECONDARY_PASSWORDS=pass1,pass2,pass3
# Legacy single secondary (for backward compatibility)
# MARIA_SECONDARY_HOST=mariadb-secondary
# Transfer Settings
TRANSFER_BATCH_SIZE=1000
# Project name for logging
LOCAL_PROJECT_NAME=naluconcept
# Graylog Configuration
GRAYLOG_ENABLED=false
GRAYLOG_ENDPOINT=localhost:12201
GRAYLOG_PROTOCOL=udp
GRAYLOG_TIMEOUT=5s
GRAYLOG_SOURCE=binlog-sync

24
go.mod Normal file

@@ -0,0 +1,24 @@
module git.ma-al.com/goc_marek/replica
go 1.23.0
require (
github.com/go-mysql-org/go-mysql v1.13.0
github.com/go-sql-driver/mysql v1.8.1
)
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec // indirect
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a // indirect
github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d // indirect
github.com/shopspring/decimal v1.2.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/text v0.24.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
)

75
go.sum Normal file

@@ -0,0 +1,75 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-mysql-org/go-mysql v1.13.0 h1:Hlsa5x1bX/wBFtMbdIOmb6YzyaVNBWnwrb8gSIEPMDc=
github.com/go-mysql-org/go-mysql v1.13.0/go.mod h1:FQxw17uRbFvMZFK+dPtIPufbU46nBdrGaxOw0ac9MFs=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec h1:3EiGmeJWoNixU+EwllIn26x6s4njiWRXewdx2zlYa84=
github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8=
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d h1:3Ej6eTuLZp25p3aH/EXdReRHY12hjZYs3RrGp7iLdag=
github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d/go.mod h1:+8feuexTKcXHZF/dkDfvCwEyBAmgb4paFc3/WeYV2eE=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

256
pkg/replica/config.go Normal file

@@ -0,0 +1,256 @@
package replica
import (
"os"
"strconv"
"strings"
"time"
)
// BinlogConfig holds the configuration for connecting to MySQL/MariaDB binlog
type BinlogConfig struct {
Host string
Port uint16
User string
Password string
ServerID uint32
Name string // Instance name for logging
}
// SecondaryConfig holds the configuration for a secondary database
type SecondaryConfig struct {
Name string // Friendly name for logging
Host string
Port uint16
User string
Password string
DSN string // Pre-built DSN for convenience
}
// GraylogConfig holds the configuration for Graylog integration
type GraylogConfig struct {
Enabled bool
Endpoint string
Protocol string
Timeout time.Duration
Source string
ExtraFields map[string]interface{}
}
// AppConfig holds the complete application configuration
type AppConfig struct {
// Primary configuration
Primary BinlogConfig
// Secondary configurations
Secondaries []SecondaryConfig
// Transfer settings
BatchSize int
ExcludeSchemas []string
// Graylog configuration
Graylog GraylogConfig
}
// LoadEnvConfig loads configuration from environment variables
func LoadEnvConfig() (*AppConfig, error) {
cfg := &AppConfig{
BatchSize: 1000,
ExcludeSchemas: []string{"information_schema", "performance_schema", "mysql", "sys"},
}
// Primary configuration
cfg.Primary.Host = getEnv("MARIA_PRIMARY_HOST", "localhost")
cfg.Primary.Port = uint16(getEnvInt("MARIA_PRIMARY_PORT", 3306))
cfg.Primary.User = getEnv("MARIA_USER", "replica")
cfg.Primary.Password = getEnv("MARIA_PASS", "replica")
cfg.Primary.ServerID = uint32(getEnvInt("MARIA_SERVER_ID", 100))
cfg.Primary.Name = getEnv("MARIA_PRIMARY_NAME", "mariadb-primary")
// Parse secondary hosts (comma-separated)
secondaryHosts := getEnv("MARIA_SECONDARY_HOSTS", "")
if secondaryHosts == "" {
// Fallback to single secondary for backward compatibility
secondaryHosts = getEnv("MARIA_SECONDARY_HOST", "localhost")
}
// Parse secondary ports (comma-separated, must match number of hosts or be single value)
secondaryPortsStr := getEnv("MARIA_SECONDARY_PORTS", "")
secondaryPorts := parseUint16List(secondaryPortsStr, 3307)
// Parse secondary users (comma-separated, optional)
secondaryUsers := getEnv("MARIA_SECONDARY_USERS", "")
users := parseStringList(secondaryUsers, cfg.Primary.User)
// Parse secondary passwords (comma-separated, optional)
secondaryPasswords := getEnv("MARIA_SECONDARY_PASSWORDS", "")
passwords := parseStringList(secondaryPasswords, cfg.Primary.Password)
// Parse secondary names (comma-separated, optional)
secondaryNames := getEnv("MARIA_SECONDARY_NAMES", "")
hosts := parseStringList(secondaryHosts, "")
names := parseStringList(secondaryNames, "")
portMap := makePortsMap(hosts, secondaryPorts, 3307)
userMap := makeStringMap(hosts, users, cfg.Primary.User)
passMap := makeStringMap(hosts, passwords, cfg.Primary.Password)
// Build secondary configurations
for i, host := range hosts {
name := strconv.Itoa(i + 1)
if i < len(names) && names[i] != "" {
name = names[i]
}
port := uint16(3307)
if p, ok := portMap[host]; ok {
port = p
}
user := cfg.Primary.User
if u, ok := userMap[host]; ok {
user = u
}
pass := cfg.Primary.Password
if p, ok := passMap[host]; ok {
pass = p
}
dsn := buildDSN(user, pass, host, int(port), "replica")
cfg.Secondaries = append(cfg.Secondaries, SecondaryConfig{
Name: name,
Host: host,
Port: port,
User: user,
Password: pass,
DSN: dsn,
})
}
// Batch size override
if batchSize := getEnvInt("TRANSFER_BATCH_SIZE", 0); batchSize > 0 {
cfg.BatchSize = batchSize
}
// Graylog configuration
cfg.Graylog.Enabled = getEnvBool("GRAYLOG_ENABLED", false)
cfg.Graylog.Endpoint = getEnv("GRAYLOG_ENDPOINT", "localhost:12201")
cfg.Graylog.Protocol = getEnv("GRAYLOG_PROTOCOL", "udp")
cfg.Graylog.Source = getEnv("GRAYLOG_SOURCE", "binlog-sync")
// Parse timeout
if timeout := getEnv("GRAYLOG_TIMEOUT", "5s"); timeout != "" {
d, err := time.ParseDuration(timeout)
if err == nil {
cfg.Graylog.Timeout = d
} else {
cfg.Graylog.Timeout = 5 * time.Second
}
}
return cfg, nil
}
// getEnv gets an environment variable with a default value
func getEnv(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return value
}
// getEnvInt gets an environment variable as an integer with a default value
func getEnvInt(key string, defaultValue int) int {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
i, err := strconv.Atoi(value)
if err != nil {
return defaultValue
}
return i
}
// getEnvBool gets an environment variable as a boolean with a default value
func getEnvBool(key string, defaultValue bool) bool {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
b, err := strconv.ParseBool(value)
if err != nil {
return defaultValue
}
return b
}
// parseStringList parses a comma-separated string into a list
func parseStringList(s string, defaultValue string) []string {
if strings.TrimSpace(s) == "" {
if defaultValue == "" {
return nil
}
return []string{defaultValue}
}
parts := strings.Split(s, ",")
result := make([]string, len(parts))
for i, p := range parts {
result[i] = strings.TrimSpace(p)
}
return result
}
// parseUint16List parses a comma-separated string into a list of uint16
func parseUint16List(s string, defaultValue uint16) []uint16 {
if strings.TrimSpace(s) == "" {
return []uint16{defaultValue}
}
parts := strings.Split(s, ",")
result := make([]uint16, len(parts))
for i, p := range parts {
p = strings.TrimSpace(p)
val, err := strconv.ParseUint(p, 10, 16)
if err != nil {
result[i] = defaultValue
} else {
result[i] = uint16(val)
}
}
return result
}
// makePortsMap creates a map of host to port
func makePortsMap(hosts []string, ports []uint16, defaultPort uint16) map[string]uint16 {
result := make(map[string]uint16)
for i, host := range hosts {
if i < len(ports) {
result[host] = ports[i]
} else {
result[host] = defaultPort
}
}
return result
}
// makeStringMap creates a map of host to string value
func makeStringMap(hosts []string, values []string, defaultValue string) map[string]string {
result := make(map[string]string)
for i, host := range hosts {
if i < len(values) && values[i] != "" {
result[host] = values[i]
} else {
result[host] = defaultValue
}
}
return result
}
// buildDSN builds a MySQL DSN string
func buildDSN(user, password, host string, port int, database string) string {
return user + ":" + password + "@tcp(" + host + ":" + strconv.Itoa(port) + ")/" + database + "?multiStatements=true"
}

420
pkg/replica/handlers.go Normal file

@@ -0,0 +1,420 @@
package replica
import (
"database/sql"
"fmt"
"strings"
"sync"
"time"
"github.com/go-mysql-org/go-mysql/replication"
)
// EventHandlers handles binlog event processing with resilience features
type EventHandlers struct {
secondaryDB *sql.DB
secondaryName string
tableMapCache map[uint64]*replication.TableMapEvent
tableMapMu sync.RWMutex
sqlBuilder *SQLBuilder
failedTables map[string]int // tableName -> consecutive failure count
failedTablesMu sync.RWMutex
maxFailures int // max consecutive failures before skipping
retryAttempts int // number of retry attempts
retryDelay time.Duration // base retry delay
lastSchemaHash map[string]string // tableName -> schema hash
lastSchemaMu sync.RWMutex
}
// NewEventHandlers creates new event handlers with default resilience settings
func NewEventHandlers(secondaryDB *sql.DB, secondaryName string) *EventHandlers {
return &EventHandlers{
secondaryDB: secondaryDB,
secondaryName: secondaryName,
tableMapCache: make(map[uint64]*replication.TableMapEvent),
sqlBuilder: NewSQLBuilder(),
failedTables: make(map[string]int),
maxFailures: 5,
retryAttempts: 3,
retryDelay: 100 * time.Millisecond,
lastSchemaHash: make(map[string]string),
}
}
// HandleRows processes row-level events with panic recovery and retry logic
func (h *EventHandlers) HandleRows(header *replication.EventHeader, e *replication.RowsEvent) error {
// Panic recovery wrapper
defer func() {
if r := recover(); r != nil {
Errorf("[%s][PANIC RECOVERED] HandleRows panic: %v", h.secondaryName, r)
}
}()
tableName := string(e.Table.Table)
schemaName := string(e.Table.Schema)
if schemaName != "replica" {
return nil
}
// Check if table is temporarily skipped due to failures
if h.isTableSkipped(schemaName, tableName) {
Warn("[%s][SKIPPED] Skipping event for %s.%s (too many failures)", h.secondaryName, schemaName, tableName)
return nil
}
eventType := h.getEventTypeName(header.EventType)
Info("[%s][%s] %s.%s", h.secondaryName, eventType, schemaName, tableName)
if h.secondaryDB != nil {
// Schema drift detection
if h.detectSchemaDrift(schemaName, tableName) {
Warn("[%s][WARN] Schema drift detected for %s.%s, pausing replication", h.secondaryName, schemaName, tableName)
h.markTableFailed(schemaName, tableName)
return fmt.Errorf("schema drift detected")
}
h.handleSecondaryReplication(e, header.EventType)
}
return nil
}
// HandleQuery processes query events with panic recovery
func (h *EventHandlers) HandleQuery(e *replication.QueryEvent) error {
// Panic recovery wrapper
defer func() {
if r := recover(); r != nil {
Errorf("[%s][PANIC RECOVERED] HandleQuery panic: %v", h.secondaryName, r)
}
}()
query := string(e.Query)
if h.secondaryDB != nil {
_, err := h.secondaryDB.Exec(query)
if err != nil {
Errorf("[%s][ERROR] Query failed: %v", h.secondaryName, err)
}
}
return nil
}
// HandleTableMap caches table map events
func (h *EventHandlers) HandleTableMap(e *replication.TableMapEvent) {
// Panic recovery wrapper
defer func() {
if r := recover(); r != nil {
Errorf("[%s][PANIC RECOVERED] HandleTableMap panic: %v", h.secondaryName, r)
}
}()
tableID := (uint64(e.TableID) << 8) | uint64(e.TableID>>56)
h.tableMapMu.Lock()
h.tableMapCache[tableID] = e
h.tableMapMu.Unlock()
}
// GetTableMap returns the cached table map for a table ID
func (h *EventHandlers) GetTableMap(tableID uint64) *replication.TableMapEvent {
h.tableMapMu.RLock()
defer h.tableMapMu.RUnlock()
return h.tableMapCache[tableID]
}
// isTableSkipped checks if a table should be skipped due to too many failures
func (h *EventHandlers) isTableSkipped(schema, table string) bool {
key := schema + "." + table
h.failedTablesMu.RLock()
defer h.failedTablesMu.RUnlock()
return h.failedTables[key] >= h.maxFailures
}
// markTableFailed records a failure for a table
func (h *EventHandlers) markTableFailed(schema, table string) {
key := schema + "." + table
h.failedTablesMu.Lock()
h.failedTables[key]++
failCount := h.failedTables[key]
h.failedTablesMu.Unlock()
Warn("[%s][FAILURE] %s.%s failure count: %d/%d", h.secondaryName, schema, table, failCount, h.maxFailures)
}
// markTableSuccess records a successful operation for a table
func (h *EventHandlers) markTableSuccess(schema, table string) {
key := schema + "." + table
h.failedTablesMu.Lock()
h.failedTables[key] = 0 // Reset failure count on success
h.failedTablesMu.Unlock()
}
// detectSchemaDrift checks if the table schema has changed
func (h *EventHandlers) detectSchemaDrift(schema, table string) bool {
key := schema + "." + table
// Get current schema hash
currentHash, err := h.getSchemaHash(schema, table)
if err != nil {
Warn("[%s][WARN] Could not get schema hash for %s.%s: %v", h.secondaryName, schema, table, err)
return false
}
h.lastSchemaMu.RLock()
lastHash, exists := h.lastSchemaHash[key]
h.lastSchemaMu.RUnlock()
if !exists {
// First time seeing this table
h.lastSchemaMu.Lock()
h.lastSchemaHash[key] = currentHash
h.lastSchemaMu.Unlock()
return false
}
if lastHash != currentHash {
Warn("[%s][DRIFT] Schema changed for %s.%s: %s -> %s", h.secondaryName, schema, table, lastHash, currentHash)
return true
}
return false
}
// getSchemaHash returns a hash of the table schema
func (h *EventHandlers) getSchemaHash(schema, table string) (string, error) {
query := fmt.Sprintf(
"SELECT MD5(GROUP_CONCAT(COLUMN_NAME, ':', DATA_TYPE, ':', IS_NULLABLE ORDER BY ORDINAL_POSITION)) "+
"FROM INFORMATION_SCHEMA.COLUMNS "+
"WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'",
schema, table,
)
var hash string
err := h.secondaryDB.QueryRow(query).Scan(&hash)
return hash, err
}
func (h *EventHandlers) handleSecondaryReplication(e *replication.RowsEvent, eventType replication.EventType) {
schemaName := string(e.Table.Schema)
tableName := string(e.Table.Table)
tableID := (uint64(e.TableID) << 8) | uint64(e.TableID>>56)
tableMap := h.GetTableMap(tableID)
if tableMap == nil {
tableMap = e.Table
}
if len(tableMap.ColumnName) == 0 {
columns, err := h.fetchColumnNames(schemaName, tableName)
if err != nil {
Errorf("[%s][ERROR] Failed to fetch columns: %v", h.secondaryName, err)
return
}
columnBytes := make([][]byte, len(columns))
for i, col := range columns {
columnBytes[i] = []byte(col)
}
tableMap = &replication.TableMapEvent{
Schema: e.Table.Schema,
Table: e.Table.Table,
ColumnName: columnBytes,
}
}
switch eventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
h.replicateInsert(tableMap, schemaName, tableName, e.Rows)
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
h.replicateUpdate(tableMap, schemaName, tableName, e.Rows)
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
h.replicateDelete(tableMap, schemaName, tableName, e.Rows)
}
}
// replicateInsert inserts rows with retry logic
func (h *EventHandlers) replicateInsert(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) {
for _, row := range rows {
query := h.sqlBuilder.BuildInsert(schema, table, tableMap, row)
err := h.executeWithRetry(query)
if err != nil {
Errorf("[%s][ERROR] INSERT failed after retries: %v", h.secondaryName, err)
h.markTableFailed(schema, table)
} else {
h.markTableSuccess(schema, table)
}
}
}
// replicateUpdate updates rows with retry logic
func (h *EventHandlers) replicateUpdate(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) {
for i := 0; i < len(rows); i += 2 {
query := h.sqlBuilder.BuildUpdate(schema, table, tableMap, rows[i], rows[i+1])
err := h.executeWithRetry(query)
if err != nil {
Errorf("[%s][ERROR] UPDATE failed after retries: %v", h.secondaryName, err)
h.markTableFailed(schema, table)
} else {
h.markTableSuccess(schema, table)
}
}
}
// replicateDelete deletes rows with retry logic
func (h *EventHandlers) replicateDelete(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) {
for _, row := range rows {
query := h.sqlBuilder.BuildDelete(schema, table, tableMap, row)
err := h.executeWithRetry(query)
if err != nil {
Errorf("[%s][ERROR] DELETE failed after retries: %v", h.secondaryName, err)
h.markTableFailed(schema, table)
} else {
h.markTableSuccess(schema, table)
}
}
}
// executeWithRetry executes a query with exponential backoff retry
func (h *EventHandlers) executeWithRetry(query string) error {
var lastErr error
for attempt := 0; attempt <= h.retryAttempts; attempt++ {
if attempt > 0 {
delay := h.retryDelay * time.Duration(1<<attempt) // exponential backoff
Warn("[%s][RETRY] Retrying in %v (attempt %d/%d)", h.secondaryName, delay, attempt, h.retryAttempts)
time.Sleep(delay)
}
result, err := h.secondaryDB.Exec(query)
if err == nil {
if result != nil {
rowsAffected, _ := result.RowsAffected()
Info("[%s][SUCCESS] %d row(s) affected", h.secondaryName, rowsAffected)
}
return nil
}
lastErr = err
// Check if connection is dead
if h.isConnectionError(err) {
Errorf("[%s][CONNECTION ERROR] Detected connection error: %v", h.secondaryName, err)
h.reconnect()
}
}
return lastErr
}
// isConnectionError checks if the error is a connection-related error
func (h *EventHandlers) isConnectionError(err error) bool {
errStr := err.Error()
connectionErrors := []string{
"connection refused",
"connection reset",
"broken pipe",
"timeout",
"no such host",
"network is unreachable",
"driver: bad connection",
"invalid connection",
}
for _, ce := range connectionErrors {
if strings.Contains(errStr, ce) {
return true
}
}
return false
}
// reconnect attempts to reconnect to the secondary database
func (h *EventHandlers) reconnect() {
Warn("[%s][RECONNECT] Attempting to reconnect to secondary database...", h.secondaryName)
// Close existing connections
h.secondaryDB.Close()
// Attempt to re-establish connection
var err error
maxRetries := 5
for i := 0; i < maxRetries; i++ {
h.secondaryDB, err = sql.Open("mysql", "root:replica@tcp(localhost:3307)/replica?multiStatements=true")
if err == nil {
h.secondaryDB.SetMaxOpenConns(25)
h.secondaryDB.SetMaxIdleConns(5)
if err = h.secondaryDB.Ping(); err == nil {
Info("[%s][RECONNECT] Successfully reconnected to secondary database", h.secondaryName)
return
}
}
Errorf("[%s][RECONNECT] Reconnection attempt %d/%d failed: %v", h.secondaryName, i+1, maxRetries, err)
time.Sleep(time.Duration(i+1) * time.Second)
}
Errorf("[%s][RECONNECT] Failed to reconnect after %d attempts", h.secondaryName, maxRetries)
}
// PingSecondary performs a health check on the secondary connection
func (h *EventHandlers) PingSecondary() error {
if h.secondaryDB == nil {
return fmt.Errorf("secondary DB is nil")
}
return h.secondaryDB.Ping()
}
// GetSecondaryName returns the secondary name
func (h *EventHandlers) GetSecondaryName() string {
return h.secondaryName
}
func (h *EventHandlers) fetchColumnNames(schema, table string) ([]string, error) {
query := fmt.Sprintf(
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' ORDER BY ORDINAL_POSITION",
schema, table,
)
rows, err := h.secondaryDB.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
var columns []string
for rows.Next() {
var colName string
if err := rows.Scan(&colName); err != nil {
return nil, err
}
columns = append(columns, colName)
}
return columns, rows.Err()
}
func (h *EventHandlers) getEventTypeName(eventType replication.EventType) string {
switch eventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
return "INSERT"
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
return "UPDATE"
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
return "DELETE"
default:
return "UNKNOWN"
}
}

635
pkg/replica/initial_transfer.go Normal file

@@ -0,0 +1,635 @@
package replica
import (
"database/sql"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
)
// TransferStats holds statistics about the transfer
type TransferStats struct {
TotalRows int64
TotalTables int
TransferTime int64 // in milliseconds
Errors []string
Progress TransferProgress
}
// TransferProgress tracks the current position in the transfer
type TransferProgress struct {
DatabasesProcessed int
CurrentDatabase string
TablesProcessed map[string]int64 // tableName -> rows transferred
LastCheckpoint time.Time
}
// InitialTransfer handles the initial data transfer from primary to secondary
type InitialTransfer struct {
primaryDB *sql.DB
secondaryDB *sql.DB
batchSize int
workerCount int
excludedDBs map[string]bool
mu sync.Mutex
stats TransferStats
checkpointFile string
progress TransferProgress
pauseChan chan struct{}
resumeChan chan struct{}
isPaused bool
}
// NewInitialTransfer creates a new initial transfer handler
func NewInitialTransfer(primaryDSN, secondaryDSN string, batchSize, workerCount int) (*InitialTransfer, error) {
primaryDB, err := sql.Open("mysql", primaryDSN)
if err != nil {
return nil, fmt.Errorf("failed to connect to primary: %v", err)
}
primaryDB.SetMaxOpenConns(batchSize)
primaryDB.SetMaxIdleConns(2)
secondaryDB, err := sql.Open("mysql", secondaryDSN)
if err != nil {
primaryDB.Close()
return nil, fmt.Errorf("failed to connect to secondary: %v", err)
}
secondaryDB.SetMaxOpenConns(batchSize)
secondaryDB.SetMaxIdleConns(2)
if err := primaryDB.Ping(); err != nil {
primaryDB.Close()
secondaryDB.Close()
return nil, fmt.Errorf("failed to ping primary: %v", err)
}
if err := secondaryDB.Ping(); err != nil {
primaryDB.Close()
secondaryDB.Close()
return nil, fmt.Errorf("failed to ping secondary: %v", err)
}
return &InitialTransfer{
primaryDB: primaryDB,
secondaryDB: secondaryDB,
batchSize: batchSize,
workerCount: workerCount,
excludedDBs: map[string]bool{
"information_schema": true,
"performance_schema": true,
"mysql": true,
"sys": true,
},
checkpointFile: "transfer_progress.json",
progress: TransferProgress{
TablesProcessed: make(map[string]int64),
},
pauseChan: make(chan struct{}, 1), // buffered so Pause/Resume never block the caller
resumeChan: make(chan struct{}, 1),
}, nil
}
// Transfer executes the initial data transfer
func (t *InitialTransfer) Transfer(excludeSchemas []string) error {
startTime := time.Now()
// Load previous progress if exists
t.loadProgress()
// Add user-specified exclusions
for _, db := range excludeSchemas {
t.excludedDBs[db] = true
}
// Get list of databases to transfer
databases, err := t.getDatabases()
if err != nil {
return fmt.Errorf("failed to get databases: %v", err)
}
Info("[TRANSFER] Starting initial data transfer...")
Infof("[TRANSFER] Found %d databases to transfer", len(databases))
// Get current binlog position before transfer
binlogPos, err := t.getBinlogPosition()
if err != nil {
Warnf("[WARN] Failed to get binlog position: %v", err)
}
for i, dbName := range databases {
// Check for pause signal
t.checkPause()
if t.excludedDBs[dbName] {
Infof("[TRANSFER] Skipping excluded database: %s", dbName)
continue
}
// Skip already processed databases
if i < t.progress.DatabasesProcessed {
Infof("[TRANSFER] Skipping already processed database: %s", dbName)
continue
}
t.progress.CurrentDatabase = dbName
t.saveProgress()
if err := t.transferDatabase(dbName); err != nil {
return fmt.Errorf("failed to transfer database %s: %v", dbName, err)
}
t.progress.DatabasesProcessed++
t.saveProgress()
}
t.stats.TransferTime = time.Since(startTime).Milliseconds()
Infof("[TRANSFER] Transfer completed: %d tables, %d rows in %dms",
t.stats.TotalTables, t.stats.TotalRows, t.stats.TransferTime)
if binlogPos != "" {
Infof("[TRANSFER] Binlog position before transfer: %s", binlogPos)
}
// Clear progress on successful completion
t.clearProgress()
return nil
}
// checkPause checks if transfer should be paused
func (t *InitialTransfer) checkPause() {
if t.isPaused {
Info("[TRANSFER] Transfer paused, waiting for resume...")
<-t.resumeChan
Info("[TRANSFER] Transfer resumed")
}
}
// Pause pauses the transfer
func (t *InitialTransfer) Pause() {
if !t.isPaused {
t.isPaused = true
t.pauseChan <- struct{}{}
Info("[TRANSFER] Transfer pause requested")
}
}
// Resume resumes the transfer
func (t *InitialTransfer) Resume() {
if t.isPaused {
t.isPaused = false
t.resumeChan <- struct{}{}
Info("[TRANSFER] Transfer resume requested")
}
}
// saveProgress saves the current progress to a checkpoint file
func (t *InitialTransfer) saveProgress() {
t.progress.LastCheckpoint = time.Now()
data, err := json.MarshalIndent(t.progress, "", " ")
if err != nil {
Warnf("[WARN] Failed to marshal progress: %v", err)
return
}
err = os.WriteFile(t.checkpointFile, data, 0644)
if err != nil {
Warnf("[WARN] Failed to save progress: %v", err)
}
}
// loadProgress loads previous progress from checkpoint file
func (t *InitialTransfer) loadProgress() {
data, err := os.ReadFile(t.checkpointFile)
if err != nil {
Info("[INFO] No previous progress found, starting fresh")
return
}
var progress TransferProgress
if err := json.Unmarshal(data, &progress); err != nil {
Warnf("[WARN] Failed to load progress: %v", err)
return
}
t.progress = progress
Infof("[INFO] Resuming transfer: %d databases, %d tables in progress",
progress.DatabasesProcessed, len(progress.TablesProcessed))
}
// clearProgress removes the checkpoint file
func (t *InitialTransfer) clearProgress() {
os.Remove(t.checkpointFile)
}
// GetProgress returns the current transfer progress
func (t *InitialTransfer) GetProgress() TransferProgress {
t.mu.Lock()
defer t.mu.Unlock()
return t.progress
}
// getDatabases returns list of databases to transfer
func (t *InitialTransfer) getDatabases() ([]string, error) {
rows, err := t.primaryDB.Query("SHOW DATABASES")
if err != nil {
return nil, err
}
defer rows.Close()
var databases []string
for rows.Next() {
var dbName string
if err := rows.Scan(&dbName); err != nil {
return nil, err
}
databases = append(databases, dbName)
}
return databases, rows.Err()
}
// transferDatabase transfers all tables in a database
func (t *InitialTransfer) transferDatabase(dbName string) error {
Infof("[TRANSFER] Transferring database: %s", dbName)
// Get list of tables
tables, err := t.getTables(dbName)
if err != nil {
return fmt.Errorf("failed to get tables: %v", err)
}
Infof("[TRANSFER] Found %d tables in %s", len(tables), dbName)
for _, table := range tables {
// Check for pause signal
t.checkPause()
// Skip already processed tables
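// NOTE: any recorded row count marks the table as done; delete the
// checkpoint file to redo a partially transferred table.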
tableKey := dbName + "." + table
if t.progress.TablesProcessed[tableKey] > 0 {
Infof("[TRANSFER] Skipping already processed table: %s", tableKey)
continue
}
if err := t.transferTable(dbName, table); err != nil {
Errorf("[ERROR] Failed to transfer table %s.%s: %v", dbName, table, err)
t.mu.Lock()
t.stats.Errors = append(t.stats.Errors, fmt.Sprintf("%s.%s: %v", dbName, table, err))
t.mu.Unlock()
}
}
return nil
}
// getTables returns list of tables in a database
func (t *InitialTransfer) getTables(dbName string) ([]string, error) {
query := fmt.Sprintf("SHOW TABLES FROM `%s`", dbName)
rows, err := t.primaryDB.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
var tables []string
for rows.Next() {
var tableName string
if err := rows.Scan(&tableName); err != nil {
return nil, err
}
tables = append(tables, tableName)
}
return tables, rows.Err()
}
// transferTable transfers a single table with chunked reads
func (t *InitialTransfer) transferTable(dbName, tableName string) error {
// Get table structure
schema, err := t.getTableSchema(dbName, tableName)
if err != nil {
return fmt.Errorf("failed to get table schema: %v", err)
}
// Check if table has data
hasData, err := t.tableHasData(dbName, tableName)
if err != nil {
return fmt.Errorf("failed to check table data: %v", err)
}
if !hasData {
Infof("[TRANSFER] Table %s.%s is empty, skipping data transfer", dbName, tableName)
return nil
}
// Get row count
count, err := t.getRowCount(dbName, tableName)
if err != nil {
return fmt.Errorf("failed to get row count: %v", err)
}
Infof("[TRANSFER] Transferring %s.%s (%d rows)", dbName, tableName, count)
// Get primary key or first unique column for chunking
pkColumn, err := t.getPrimaryKey(dbName, tableName)
if err != nil {
Warnf("[WARN] No primary key found for %s.%s, using full scan", dbName, tableName)
err = t.transferTableFullScan(dbName, tableName, schema, count)
} else {
err = t.transferTableChunked(dbName, tableName, schema, pkColumn, count)
}
if err == nil {
// insertRows runs once per chunk, so count completed tables here
t.mu.Lock()
t.stats.TotalTables++
t.mu.Unlock()
}
return err
}
// tableHasData checks if a table has any rows
func (t *InitialTransfer) tableHasData(dbName, tableName string) (bool, error) {
query := fmt.Sprintf("SELECT 1 FROM `%s`.`%s` LIMIT 1", dbName, tableName)
var exists int
err := t.primaryDB.QueryRow(query).Scan(&exists)
if err == sql.ErrNoRows {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
// getRowCount returns the number of rows in a table
func (t *InitialTransfer) getRowCount(dbName, tableName string) (int64, error) {
query := fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s`", dbName, tableName)
var count int64
err := t.primaryDB.QueryRow(query).Scan(&count)
return count, err
}
// getTableSchema returns the column definitions for a table
func (t *InitialTransfer) getTableSchema(dbName, tableName string) ([]ColumnInfo, error) {
// Bind the schema and table names as parameters so names containing
// quotes cannot break the query
rows, err := t.primaryDB.Query(
"SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_KEY, EXTRA "+
"FROM INFORMATION_SCHEMA.COLUMNS "+
"WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "+
"ORDER BY ORDINAL_POSITION",
dbName, tableName,
)
if err != nil {
return nil, err
}
defer rows.Close()
var columns []ColumnInfo
for rows.Next() {
var col ColumnInfo
if err := rows.Scan(&col.Name, &col.Type, &col.Nullable, &col.Key, &col.Extra); err != nil {
return nil, err
}
columns = append(columns, col)
}
return columns, rows.Err()
}
}
// ColumnInfo holds information about a table column
type ColumnInfo struct {
Name string
Type string
Nullable string
Key string
Extra string
}
// getPrimaryKey returns the first primary key column for a table
func (t *InitialTransfer) getPrimaryKey(dbName, tableName string) (string, error) {
var pkColumn string
err := t.primaryDB.QueryRow(
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "+
"WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND CONSTRAINT_NAME = 'PRIMARY' "+
"ORDER BY ORDINAL_POSITION LIMIT 1",
dbName, tableName,
).Scan(&pkColumn)
if err == sql.ErrNoRows {
return "", fmt.Errorf("no primary key")
}
return pkColumn, err
}
// transferTableChunked transfers a table using primary key chunks
func (t *InitialTransfer) transferTableChunked(dbName, tableName string, columns []ColumnInfo, pkColumn string, rowCount int64) error {
tableKey := dbName + "." + tableName
// Get starting offset from saved progress (guard the shared map)
t.mu.Lock()
startOffset := t.progress.TablesProcessed[tableKey]
t.mu.Unlock()
// Get min and max primary key values
minMax, err := t.getMinMaxPK(dbName, tableName, pkColumn)
if err != nil {
return fmt.Errorf("failed to get min/max: %v", err)
}
// Adjust start offset to be within range
if startOffset < minMax.Min {
startOffset = minMax.Min
}
// Transfer chunks
offset := startOffset
for offset < minMax.Max {
// Check for pause signal
t.checkPause()
query := fmt.Sprintf(
"SELECT * FROM `%s`.`%s` WHERE `%s` >= %d AND `%s` < %d ORDER BY `%s`",
dbName, tableName, pkColumn, offset, pkColumn, offset+int64(t.batchSize), pkColumn,
)
rows, err := t.primaryDB.Query(query)
if err != nil {
return fmt.Errorf("failed to query chunk: %v", err)
}
rowsInserted, err := t.insertRows(t.secondaryDB, dbName, tableName, columns, rows)
rows.Close()
if err != nil {
return fmt.Errorf("failed to insert chunk: %v", err)
}
// Update progress
t.mu.Lock()
t.progress.TablesProcessed[tableKey] = offset + int64(t.batchSize)
t.mu.Unlock()
// Flush the checkpoint file after chunks whose row count is a multiple
// of 1000; the in-memory offset above is always current
if rowsInserted%1000 == 0 {
t.saveProgress()
}
offset += int64(t.batchSize)
}
return nil
}
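// For illustration: with batchSize = 1000 and a PK range of [1, 5000], the
// loop above issues half-open range scans such as
//
//	WHERE `id` >= 1 AND `id` < 1001
//	WHERE `id` >= 1001 AND `id` < 2001
//	...
//
// Gaps in the PK sequence simply yield smaller (or empty) chunks, which is
// harmless; getMinMaxPK below bounds the scan.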
// MinMax holds min and max values
type MinMax struct {
Min int64
Max int64
}
// getMinMaxPK returns the min and max primary key values
func (t *InitialTransfer) getMinMaxPK(dbName, tableName, pkColumn string) (*MinMax, error) {
query := fmt.Sprintf(
"SELECT COALESCE(MIN(`%s`), 0), COALESCE(MAX(`%s`), 0) FROM `%s`.`%s`",
pkColumn, pkColumn, dbName, tableName,
)
var minVal, maxVal int64
err := t.primaryDB.QueryRow(query).Scan(&minVal, &maxVal)
if err != nil {
return nil, err
}
return &MinMax{Min: minVal, Max: maxVal + 1}, nil
}
// transferTableFullScan transfers a table without a primary key (slower, uses LIMIT/OFFSET)
func (t *InitialTransfer) transferTableFullScan(dbName, tableName string, columns []ColumnInfo, rowCount int64) error {
tableKey := dbName + "." + tableName
// Get starting offset from saved progress (guard the shared map)
t.mu.Lock()
startOffset := t.progress.TablesProcessed[tableKey]
t.mu.Unlock()
var offset int64 = startOffset
for offset < rowCount {
// Check for pause signal
t.checkPause()
// Note: without a primary key there is no stable ordering, so
// LIMIT/OFFSET paging is only reliable while the source table is not
// being written to concurrently
query := fmt.Sprintf(
"SELECT * FROM `%s`.`%s` LIMIT %d OFFSET %d",
dbName, tableName, t.batchSize, offset,
)
rows, err := t.primaryDB.Query(query)
if err != nil {
return fmt.Errorf("failed to query chunk: %v", err)
}
rowsInserted, err := t.insertRows(t.secondaryDB, dbName, tableName, columns, rows)
rows.Close()
if err != nil {
return fmt.Errorf("failed to insert chunk: %v", err)
}
// Record the offset past this chunk, mirroring the chunked path;
// saving the pre-chunk offset would re-read the same chunk on resume
t.mu.Lock()
t.progress.TablesProcessed[tableKey] = offset + int64(t.batchSize)
t.mu.Unlock()
// Flush the checkpoint file after chunks whose row count is a multiple of 1000
if rowsInserted%1000 == 0 {
t.saveProgress()
}
offset += int64(t.batchSize)
}
return nil
}
// insertRows inserts rows from a query result into the secondary database
func (t *InitialTransfer) insertRows(db *sql.DB, dbName, tableName string, columns []ColumnInfo, rows *sql.Rows) (int64, error) {
// Build INSERT statement
placeholders := make([]string, len(columns))
colNames := make([]string, len(columns))
for i := range columns {
placeholders[i] = "?"
colNames[i] = fmt.Sprintf("`%s`", columns[i].Name)
}
insertSQL := fmt.Sprintf(
"INSERT INTO `%s`.`%s` (%s) VALUES (%s)",
dbName, tableName, strings.Join(colNames, ", "), strings.Join(placeholders, ", "),
)
// Prepare statement
stmt, err := db.Prepare(insertSQL)
if err != nil {
return 0, fmt.Errorf("failed to prepare statement: %v", err)
}
defer stmt.Close()
// Insert rows
rowCount := int64(0)
for rows.Next() {
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for i := range values {
valuePtrs[i] = &values[i]
}
if err := rows.Scan(valuePtrs...); err != nil {
return rowCount, fmt.Errorf("failed to scan row: %v", err)
}
_, err := stmt.Exec(values...)
if err != nil {
return rowCount, fmt.Errorf("failed to insert row: %v", err)
}
rowCount++
}
t.mu.Lock()
t.stats.TotalRows += rowCount
t.mu.Unlock()
Infof("[TRANSFER] Inserted %d rows into %s.%s", rowCount, dbName, tableName)
return rowCount, rows.Err()
}
// getBinlogPosition returns the current binlog position
func (t *InitialTransfer) getBinlogPosition() (string, error) {
rows, err := t.primaryDB.Query("SHOW MASTER STATUS")
if err != nil {
return "", err
}
defer rows.Close()
if !rows.Next() {
return "", rows.Err() // binary logging disabled
}
cols, err := rows.Columns()
if err != nil {
return "", err
}
// MariaDB returns 4 columns, MySQL 5.6+ returns 5 (Executed_Gtid_Set);
// scan the trailing columns dynamically so both flavors work
var file string
var pos uint32
dest := make([]interface{}, len(cols))
dest[0] = &file
dest[1] = &pos
for i := 2; i < len(dest); i++ {
dest[i] = new(sql.RawBytes)
}
if err := rows.Scan(dest...); err != nil {
return "", err
}
return fmt.Sprintf("%s:%d", file, pos), nil
}
// Close closes both database connections, even if the first close fails
func (t *InitialTransfer) Close() error {
errPrimary := t.primaryDB.Close()
errSecondary := t.secondaryDB.Close()
if errPrimary != nil {
return errPrimary
}
return errSecondary
}
// GetStats returns the transfer statistics
func (t *InitialTransfer) GetStats() TransferStats {
t.mu.Lock()
defer t.mu.Unlock()
return t.stats
}
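// Usage sketch (illustrative; not wired up by the service): driving
// pause/resume from another goroutine while Transfer runs, assuming an
// *InitialTransfer t whose pauseChan/resumeChan were initialized.
//
//	go func() {
//		time.Sleep(30 * time.Second)
//		t.Pause()  // transfer blocks at the next chunk boundary
//		time.Sleep(10 * time.Second)
//		t.Resume() // transfer continues from the recorded offset
//	}()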

393
pkg/replica/logging.go Normal file

@@ -0,0 +1,393 @@
package replica
import (
"encoding/json"
"fmt"
"log/slog"
"net"
"os"
"sync"
"time"
)
// LogLevel represents the logging level
type LogLevel slog.Level
const (
LevelDebug LogLevel = LogLevel(slog.LevelDebug)
LevelInfo LogLevel = LogLevel(slog.LevelInfo)
LevelWarn LogLevel = LogLevel(slog.LevelWarn)
LevelError LogLevel = LogLevel(slog.LevelError)
)
// Logger is a structured logger with optional Graylog support
type Logger struct {
logger *slog.Logger
graylogMu sync.RWMutex
graylog *GraylogClient
level LogLevel
source string
}
// GraylogClient handles sending logs to Graylog
type GraylogClient struct {
conn net.Conn
source string
extra map[string]interface{}
mu sync.Mutex
connected bool
}
// GELFMessage represents a GELF 1.1 message structure
type GELFMessage struct {
Version      string                 `json:"version"`
Host         string                 `json:"host"`
ShortMessage string                 `json:"short_message"`
FullMessage  string                 `json:"full_message,omitempty"`
Timestamp    float64                `json:"timestamp"`
Level        int                    `json:"level"`
Source       string                 `json:"_source,omitempty"` // additional fields need a "_" prefix per the GELF spec
Extra        map[string]interface{} `json:"-"`                 // flattened into the payload by send
}
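// A serialized message looks roughly like this (values illustrative; extra
// fields are flattened with a "_" prefix by send):
//
//	{"version":"1.1","host":"binlog-sync","short_message":"Saved position",
//	 "timestamp":1700000000.123,"level":6,"_source":"binlog-sync","_table":"users"}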
// NewGraylogClient creates a new Graylog client
func NewGraylogClient(cfg GraylogConfig) (*GraylogClient, error) {
var conn net.Conn
var err error
protocol := cfg.Protocol
if protocol == "" {
protocol = "udp"
}
conn, err = net.DialTimeout(protocol, cfg.Endpoint, cfg.Timeout)
if err != nil {
return nil, fmt.Errorf("failed to connect to Graylog: %w", err)
}
return &GraylogClient{
conn: conn,
source: cfg.Source,
extra: cfg.ExtraFields,
connected: true,
}, nil
}
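// Note: over UDP each message is sent as a single datagram. GELF also
// defines a chunked UDP format for payloads larger than one datagram, which
// this client does not implement; prefer TCP for large messages.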
// send sends a GELF message to Graylog
func (g *GraylogClient) send(msg *GELFMessage) error {
g.mu.Lock()
defer g.mu.Unlock()
if !g.connected {
return nil
}
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal GELF message: %w", err)
}
// Flatten client-level and per-message extra fields into the payload with
// the "_" prefix the GELF spec requires for additional fields; a nested
// "Extra" object would be ignored by Graylog
if len(g.extra) > 0 || len(msg.Extra) > 0 {
var m map[string]interface{}
if err := json.Unmarshal(data, &m); err == nil {
for k, v := range g.extra {
m["_"+k] = v
}
for k, v := range msg.Extra {
m["_"+k] = v
}
if merged, err := json.Marshal(m); err == nil {
data = merged
}
}
}
// UDP datagrams are self-delimiting; GELF over TCP requires a trailing
// null byte between messages
if _, ok := g.conn.(*net.UDPConn); ok {
_, err = g.conn.Write(data)
} else {
_, err = g.conn.Write(append(data, 0))
}
if err != nil {
g.connected = false
return fmt.Errorf("failed to send to Graylog: %w", err)
}
return nil
}
// Close closes the Graylog connection
func (g *GraylogClient) Close() error {
g.mu.Lock()
defer g.mu.Unlock()
if g.conn != nil {
g.connected = false
return g.conn.Close()
}
return nil
}
// NewLogger creates a new logger instance
func NewLogger() *Logger {
// Create slog handler with JSON output for structured logging
handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
})
return &Logger{
logger: slog.New(handler),
level: LevelInfo,
source: "binlog-sync",
}
}
// NewLoggerWithLevel creates a new logger with specified level
func NewLoggerWithLevel(level LogLevel) *Logger {
handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.Level(level),
})
return &Logger{
logger: slog.New(handler),
level: level,
source: "binlog-sync",
}
}
// SetupGraylog configures Graylog integration
func (l *Logger) SetupGraylog(cfg GraylogConfig) error {
client, err := NewGraylogClient(cfg)
if err != nil {
return err
}
l.graylogMu.Lock()
l.graylog = client
l.graylogMu.Unlock()
return nil
}
// With creates a new logger with additional context; the Graylog client,
// if configured, is shared with the derived logger
func (l *Logger) With(args ...any) *Logger {
l.graylogMu.RLock()
gl := l.graylog
l.graylogMu.RUnlock()
return &Logger{
logger:  l.logger.With(args...),
graylog: gl,
level:   l.level,
source:  l.source,
}
}
// Debug logs a debug message
func (l *Logger) Debug(msg string, args ...any) {
l.logger.Debug(msg, args...)
l.sendToGraylog(slog.LevelDebug, msg, args)
}
// Info logs an info message
func (l *Logger) Info(msg string, args ...any) {
l.logger.Info(msg, args...)
l.sendToGraylog(slog.LevelInfo, msg, args)
}
// Warn logs a warning message
func (l *Logger) Warn(msg string, args ...any) {
l.logger.Warn(msg, args...)
l.sendToGraylog(slog.LevelWarn, msg, args)
}
// Error logs an error message
func (l *Logger) Error(msg string, args ...any) {
l.logger.Error(msg, args...)
l.sendToGraylog(slog.LevelError, msg, args)
}
// Fatal logs a fatal message and exits. Note: the asynchronous Graylog
// send may not complete before os.Exit
func (l *Logger) Fatal(msg string, args ...any) {
l.logger.Error(msg, args...)
l.sendToGraylog(slog.LevelError, msg, args)
os.Exit(1)
}
// Debugf logs a formatted debug message
func (l *Logger) Debugf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
l.logger.Debug(msg)
l.sendToGraylog(slog.LevelDebug, msg, nil)
}
// Infof logs a formatted info message
func (l *Logger) Infof(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
l.logger.Info(msg)
l.sendToGraylog(slog.LevelInfo, msg, nil)
}
// Warnf logs a formatted warning message
func (l *Logger) Warnf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
l.logger.Warn(msg)
l.sendToGraylog(slog.LevelWarn, msg, nil)
}
// Errorf logs a formatted error message
func (l *Logger) Errorf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
l.logger.Error(msg)
l.sendToGraylog(slog.LevelError, msg, nil)
}
// Fatalf logs a formatted fatal message and exits. Note: the asynchronous
// Graylog send may not complete before os.Exit
func (l *Logger) Fatalf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
l.logger.Error(msg)
l.sendToGraylog(slog.LevelError, msg, nil)
os.Exit(1)
}
// Log logs a message at the specified level
func (l *Logger) Log(level LogLevel, msg string, args ...any) {
switch level {
case LevelDebug:
l.logger.Debug(msg, args...)
case LevelInfo:
l.logger.Info(msg, args...)
case LevelWarn:
l.logger.Warn(msg, args...)
case LevelError:
l.logger.Error(msg, args...)
}
l.sendToGraylog(slog.Level(level), msg, args)
}
// sendToGraylog sends a log message to Graylog if configured
func (l *Logger) sendToGraylog(level slog.Level, msg string, args []any) {
l.graylogMu.RLock()
defer l.graylogMu.RUnlock()
if l.graylog == nil {
return
}
// Convert key/value args to structured data
extra := make(map[string]interface{})
for i := 0; i+1 < len(args); i += 2 {
if key, ok := args[i].(string); ok {
extra[key] = args[i+1]
}
}
// Map slog levels to the syslog severities GELF uses:
// 3=error, 4=warning, 6=info, 7=debug
gelfLevel := 7
switch {
case level >= slog.LevelError:
gelfLevel = 3
case level >= slog.LevelWarn:
gelfLevel = 4
case level >= slog.LevelInfo:
gelfLevel = 6
}
gelfMsg := &GELFMessage{
Version:      "1.1",
Host:         l.source,
ShortMessage: msg,
Timestamp:    float64(time.Now().UnixNano()) / 1e9,
Level:        gelfLevel,
Source:       l.source,
Extra:        extra,
}
// Send in the background so logging never blocks on the network
go func() {
_ = l.graylog.send(gelfMsg)
}()
}
// Close closes the logger and any underlying connections
func (l *Logger) Close() {
l.graylogMu.Lock()
defer l.graylogMu.Unlock()
if l.graylog != nil {
l.graylog.Close()
l.graylog = nil
}
}
// GetLogger returns the underlying slog.Logger
func (l *Logger) GetLogger() *slog.Logger {
return l.logger
}
// Handler returns a slog.Handler for use with other libraries
func (l *Logger) Handler() slog.Handler {
return l.logger.Handler()
}
// Global logger instance - renamed to avoid conflict with log package
var globalLogger *Logger
// InitLogger initializes the global logger
func InitLogger() {
globalLogger = NewLogger()
}
// InitLoggerWithLevel initializes the global logger with a specific level
func InitLoggerWithLevel(level LogLevel) {
globalLogger = NewLoggerWithLevel(level)
}
// SetupGlobalGraylog configures Graylog for the global logger
func SetupGlobalGraylog(cfg GraylogConfig) error {
return globalLogger.SetupGraylog(cfg)
}
// GetLogger returns the global logger instance
func GetLogger() *Logger {
if globalLogger == nil {
InitLogger()
}
return globalLogger
}
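// Usage sketch for wiring the global logger to Graylog (endpoint and field
// values are illustrative):
//
//	InitLoggerWithLevel(LevelDebug)
//	if err := SetupGlobalGraylog(GraylogConfig{
//		Endpoint: "graylog.example.com:12201",
//		Protocol: "udp",
//		Source:   "binlog-sync",
//		Timeout:  5 * time.Second,
//	}); err != nil {
//		Warnf("Graylog disabled: %v", err)
//	}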
// Convenience functions using global logger
// Debug logs a debug message
func Debug(msg string, args ...any) {
GetLogger().Debug(msg, args...)
}
// Info logs an info message
func Info(msg string, args ...any) {
GetLogger().Info(msg, args...)
}
// Warn logs a warning message
func Warn(msg string, args ...any) {
GetLogger().Warn(msg, args...)
}
// Error logs an error message
func Error(msg string, args ...any) {
GetLogger().Error(msg, args...)
}
// Fatal logs a fatal message and exits
func Fatal(msg string, args ...any) {
GetLogger().Fatal(msg, args...)
}
// Debugf logs a formatted debug message
func Debugf(format string, args ...any) {
GetLogger().Debugf(format, args...)
}
// Infof logs a formatted info message
func Infof(format string, args ...any) {
GetLogger().Infof(format, args...)
}
// Warnf logs a formatted warning message
func Warnf(format string, args ...any) {
GetLogger().Warnf(format, args...)
}
// Errorf logs a formatted error message
func Errorf(format string, args ...any) {
GetLogger().Errorf(format, args...)
}
// Fatalf logs a formatted fatal message and exits
func Fatalf(format string, args ...any) {
GetLogger().Fatalf(format, args...)
}
// Log logs a message at the specified level
func Log(level LogLevel, msg string, args ...any) {
GetLogger().Log(level, msg, args...)
}

140
pkg/replica/position.go Normal file

@@ -0,0 +1,140 @@
package replica
import (
"database/sql"
"encoding/json"
"os"
"github.com/go-mysql-org/go-mysql/mysql"
)
// PositionManager handles binlog position persistence
type PositionManager struct {
db *sql.DB
positionFile string
}
// NewPositionManager creates a new position manager
func NewPositionManager(db *sql.DB, positionFile string) *PositionManager {
return &PositionManager{
db: db,
positionFile: positionFile,
}
}
// InitTable creates the binlog_position table if it doesn't exist
func (pm *PositionManager) InitTable() error {
if pm.db == nil {
return nil
}
createTableSQL := `
CREATE TABLE IF NOT EXISTS binlog_position (
id INT PRIMARY KEY,
log_file VARCHAR(255),
log_pos BIGINT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
`
_, err := pm.db.Exec(createTableSQL)
if err != nil {
Errorf("[ERROR] Failed to create binlog_position table: %v", err)
return err
}
return nil
}
// Load loads the last saved position from database or file
func (pm *PositionManager) Load() (mysql.Position, error) {
if pm.db != nil {
return pm.loadFromDB()
}
return pm.loadFromFile()
}
func (pm *PositionManager) loadFromDB() (mysql.Position, error) {
var name string
var pos uint32
err := pm.db.QueryRow(
"SELECT log_file, log_pos FROM binlog_position WHERE id = 1",
).Scan(&name, &pos)
if err == sql.ErrNoRows {
return mysql.Position{Name: "", Pos: 0}, nil
}
if err != nil {
return mysql.Position{Name: "", Pos: 0}, err
}
Infof("[INFO] Loaded position: %s:%d", name, pos)
return mysql.Position{Name: name, Pos: pos}, nil
}
func (pm *PositionManager) loadFromFile() (mysql.Position, error) {
data, err := os.ReadFile(pm.positionFile)
if err != nil {
return mysql.Position{Name: "", Pos: 0}, nil
}
var pos mysql.Position
err = json.Unmarshal(data, &pos)
return pos, err
}
// Save saves the current position to database or file
func (pm *PositionManager) Save(pos mysql.Position) error {
if pm.db != nil {
return pm.saveToDB(pos)
}
return pm.saveToFile(pos)
}
// Reset clears the saved position
func (pm *PositionManager) Reset() error {
if pm.db != nil {
return pm.resetInDB()
}
return pm.resetInFile()
}
func (pm *PositionManager) resetInDB() error {
_, err := pm.db.Exec("DELETE FROM binlog_position WHERE id = 1")
return err
}
func (pm *PositionManager) resetInFile() error {
// A missing file already means "no position", so don't report it as an error
err := os.Remove(pm.positionFile)
if os.IsNotExist(err) {
return nil
}
return err
}
func (pm *PositionManager) saveToDB(pos mysql.Position) error {
result, err := pm.db.Exec(
"INSERT INTO binlog_position (id, log_file, log_pos) VALUES (1, ?, ?) "+
"ON DUPLICATE KEY UPDATE log_file = VALUES(log_file), log_pos = VALUES(log_pos)",
pos.Name, pos.Pos,
)
if err != nil {
Errorf("[ERROR] Failed to save position: %v", err)
return err
}
rowsAffected, _ := result.RowsAffected()
Infof("[INFO] Saved position: %s:%d (rows: %d)", pos.Name, pos.Pos, rowsAffected)
return nil
}
func (pm *PositionManager) saveToFile(pos mysql.Position) error {
data, err := json.Marshal(pos)
if err != nil {
return err
}
return os.WriteFile(pm.positionFile, data, 0644)
}
// GetPositionFile returns the position file path
func (pm *PositionManager) GetPositionFile() string {
return pm.positionFile
}

397
pkg/replica/service.go Normal file

@@ -0,0 +1,397 @@
package replica
import (
"context"
"database/sql"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
_ "github.com/go-sql-driver/mysql"
)
// BinlogSyncService handles MySQL binlog streaming with resilience features
type BinlogSyncService struct {
syncer *replication.BinlogSyncer
streamer *replication.BinlogStreamer
position mysql.Position
handlers *EventHandlers
positionMgr *PositionManager
stopChan chan struct{}
instanceName string
secondaryName string
primaryDB *sql.DB
lastEventPos uint32 // for deduplication
lastEventGTID string // for GTID-based deduplication (future)
healthTicker *time.Ticker // for periodic health checks
}
// NewBinlogSyncService creates a new binlog sync service
func NewBinlogSyncService(cfg BinlogConfig, primaryDB, secondaryDB *sql.DB, secondaryName string) *BinlogSyncService {
syncerCfg := replication.BinlogSyncerConfig{
ServerID: cfg.ServerID,
Flavor: "mariadb",
Host: cfg.Host,
Port: cfg.Port,
User: cfg.User,
Password: cfg.Password,
}
positionFile := fmt.Sprintf("binlog_position_%s_%s.json", cfg.Name, secondaryName)
return &BinlogSyncService{
syncer: replication.NewBinlogSyncer(syncerCfg),
handlers: NewEventHandlers(secondaryDB, secondaryName),
positionMgr: NewPositionManager(secondaryDB, positionFile),
stopChan: make(chan struct{}),
instanceName: cfg.Name,
secondaryName: secondaryName,
primaryDB: primaryDB,
}
}
// Start begins binlog streaming
func (s *BinlogSyncService) Start() error {
if err := s.positionMgr.InitTable(); err != nil {
Warnf("[%s][WARN] Failed to init position table: %v", s.secondaryName, err)
}
pos, err := s.positionMgr.Load()
if err != nil {
return fmt.Errorf("failed to load position: %v", err)
}
if pos.Name == "" {
s.streamer, err = s.syncer.StartSync(mysql.Position{Name: "", Pos: 4})
} else {
s.streamer, err = s.syncer.StartSync(pos)
}
if err != nil {
return fmt.Errorf("failed to start sync: %v", err)
}
Infof("[%s] Started streaming from %s:%d", s.instanceName, pos.Name, pos.Pos)
return s.processEvents()
}
// StartWithStop begins binlog streaming with graceful shutdown
func (s *BinlogSyncService) StartWithStop() error {
if err := s.positionMgr.InitTable(); err != nil {
Warnf("[%s][WARN] Failed to init position table: %v", s.secondaryName, err)
}
pos, err := s.positionMgr.Load()
if err != nil {
return fmt.Errorf("failed to load position: %v", err)
}
if pos.Name == "" {
s.streamer, err = s.syncer.StartSync(mysql.Position{Name: "", Pos: 4})
} else {
s.streamer, err = s.syncer.StartSync(pos)
}
if err != nil {
return fmt.Errorf("failed to start sync: %v", err)
}
Infof("[%s] Started streaming from %s:%d", s.instanceName, pos.Name, pos.Pos)
return s.processEventsWithStop()
}
// processEventsWithStop processes events with graceful shutdown and health checks
func (s *BinlogSyncService) processEventsWithStop() error {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Start health check ticker (every 30 seconds)
s.healthTicker = time.NewTicker(30 * time.Second)
defer s.healthTicker.Stop()
// Start health check goroutine
healthChan := make(chan error, 1)
go s.runHealthChecks(healthChan)
for {
select {
case <-sigChan:
Infof("[%s] Shutting down...", s.secondaryName)
s.positionMgr.Save(s.position)
s.syncer.Close()
return nil
case <-s.stopChan:
Infof("[%s] Stop signal received", s.secondaryName)
s.positionMgr.Save(s.position)
s.syncer.Close()
return nil
case <-s.healthTicker.C:
// Trigger health check
go s.runHealthChecks(healthChan)
case healthErr := <-healthChan:
if healthErr != nil {
Errorf("[%s][HEALTH CHECK] Failed: %v", s.secondaryName, healthErr)
}
default:
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ev, err := s.streamer.GetEvent(ctx)
cancel()
if err == context.DeadlineExceeded {
continue
}
if err != nil {
return fmt.Errorf("failed to get event: %v", err)
}
s.processEvent(ev)
}
}
}
// runHealthChecks performs periodic health checks
func (s *BinlogSyncService) runHealthChecks(resultChan chan<- error) {
// Check secondary DB connection
if err := s.handlers.PingSecondary(); err != nil {
resultChan <- fmt.Errorf("secondary DB ping failed: %v", err)
return
}
// Check if syncer is still healthy
if s.syncer == nil {
resultChan <- fmt.Errorf("syncer is nil")
return
}
resultChan <- nil
}
// processEvents processes events continuously
func (s *BinlogSyncService) processEvents() error {
for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ev, err := s.streamer.GetEvent(ctx)
cancel()
if err == context.DeadlineExceeded {
continue
}
if err != nil {
return fmt.Errorf("failed to get event: %v", err)
}
s.processEvent(ev)
}
}
// processEvent processes a single event with panic recovery
// Note: Deduplication disabled as it was causing valid events to be skipped
func (s *BinlogSyncService) processEvent(ev *replication.BinlogEvent) {
defer func() {
if r := recover(); r != nil {
Errorf("[%s][PANIC RECOVERED] processEvent panic: %v", s.secondaryName, r)
}
}()
s.position.Pos = ev.Header.LogPos
switch e := ev.Event.(type) {
case *replication.RotateEvent:
// A rotate carries both the next file name and the position within it
s.position.Name = string(e.NextLogName)
s.position.Pos = uint32(e.Position)
Infof("[%s] Rotated to %s", s.instanceName, s.position.Name)
case *replication.TableMapEvent:
s.handlers.HandleTableMap(e)
case *replication.RowsEvent:
s.handlers.HandleRows(ev.Header, e)
case *replication.QueryEvent:
s.handlers.HandleQuery(e)
case *replication.XIDEvent:
// Transaction boundary: persist the position
s.positionMgr.Save(s.position)
}
// Opportunistic coarse checkpoint between transaction boundaries
if ev.Header.LogPos%1000 == 0 {
s.positionMgr.Save(s.position)
}
}
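// A typical committed transaction in a row-based binlog arrives as
//
//	TableMapEvent -> RowsEvent (WRITE/UPDATE/DELETE) -> XIDEvent
//
// so the position is persisted once per transaction at the XID boundary.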
// Stop signals the service to stop
func (s *BinlogSyncService) Stop() {
close(s.stopChan)
}
// NeedResync checks if a resync is needed
func (s *BinlogSyncService) NeedResync() (bool, error) {
// Check if position file/table exists
pos, err := s.positionMgr.Load()
if err != nil {
return false, err
}
// If no position saved, we need initial transfer
if pos.Name == "" {
Infof("[%s] No saved position found, resync needed", s.secondaryName)
return true, nil
}
// Check if secondary DB has any tables/data
hasData, err := s.checkSecondaryHasData()
if err != nil {
return false, err
}
if !hasData {
Infof("[%s] Secondary database is empty, resync needed", s.secondaryName)
return true, nil
}
return false, nil
}
// checkSecondaryHasData checks whether the replica schema on the secondary
// contains any tables (table existence, not row counts)
func (s *BinlogSyncService) checkSecondaryHasData() (bool, error) {
// Get the secondary DB from position manager
db := s.positionMgr.db
if db == nil {
return true, nil // Can't check, assume OK
}
// Note: if the binlog_position bookkeeping table lives in the replica
// schema, it is counted here too
var count int
err := db.QueryRow("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'replica'").Scan(&count)
if err != nil {
return false, err
}
return count > 0, nil
}
// RunInitialTransfer performs the initial data transfer
func (s *BinlogSyncService) RunInitialTransfer(batchSize int, excludeSchemas []string) error {
Infof("[%s] Starting initial data transfer...", s.secondaryName)
// Get secondary DB from handlers
secondaryDB := s.handlers.secondaryDB
if secondaryDB == nil {
return fmt.Errorf("secondary DB not available")
}
// Create initial transfer with proper initialization
transfer := &InitialTransfer{
primaryDB:   s.primaryDB,
secondaryDB: secondaryDB,
batchSize:   batchSize,
workerCount: 1,
// Initialize the pause/resume channels; leaving them nil would make
// Pause and Resume block forever
pauseChan:  make(chan struct{}, 1),
resumeChan: make(chan struct{}),
excludedDBs: map[string]bool{
"information_schema": true,
"performance_schema": true,
"mysql":              true,
"sys":                true,
},
checkpointFile: fmt.Sprintf("transfer_progress_%s_%s.json", s.instanceName, s.secondaryName),
progress: TransferProgress{
TablesProcessed: make(map[string]int64),
},
}
if err := transfer.Transfer(excludeSchemas); err != nil {
return fmt.Errorf("transfer failed: %v", err)
}
// Reset position after successful transfer
if err := s.positionMgr.Reset(); err != nil {
return fmt.Errorf("failed to reset position: %v", err)
}
Infof("[%s] Initial transfer completed successfully", s.secondaryName)
return nil
}
// StartWithResync starts binlog streaming with automatic resync if needed
func (s *BinlogSyncService) StartWithResync(batchSize int, excludeSchemas []string) error {
if err := s.positionMgr.InitTable(); err != nil {
Warnf("[%s][WARN] Failed to init position table: %v", s.secondaryName, err)
}
// Check if resync is needed
needResync, err := s.NeedResync()
if err != nil {
return fmt.Errorf("failed to check resync status: %v", err)
}
if needResync {
if err := s.RunInitialTransfer(batchSize, excludeSchemas); err != nil {
return fmt.Errorf("initial transfer failed: %v", err)
}
}
// Start normal streaming
return s.StartWithStop()
}
// GetSecondaryName returns the secondary name
func (s *BinlogSyncService) GetSecondaryName() string {
return s.secondaryName
}
// MultiBinlogSyncService manages multiple binlog sync services
type MultiBinlogSyncService struct {
services []*BinlogSyncService
stopChan chan struct{}
wg sync.WaitGroup
}
// NewMultiBinlogSyncService creates a new multi-secondary binlog sync service
func NewMultiBinlogSyncService() *MultiBinlogSyncService {
return &MultiBinlogSyncService{
services: make([]*BinlogSyncService, 0),
stopChan: make(chan struct{}),
}
}
// AddService adds a service to the multi-service manager
func (m *MultiBinlogSyncService) AddService(service *BinlogSyncService) {
m.services = append(m.services, service)
}
// StartAll starts all services
func (m *MultiBinlogSyncService) StartAll() error {
for _, service := range m.services {
m.wg.Add(1)
go func(svc *BinlogSyncService) {
defer m.wg.Done()
Infof("[%s] Starting binlog sync service", svc.GetSecondaryName())
if err := svc.StartWithResync(1000, []string{"information_schema", "performance_schema", "mysql", "sys"}); err != nil {
Errorf("[%s] Service error: %v", svc.GetSecondaryName(), err)
}
}(service)
}
return nil
}
// StopAll stops all services
func (m *MultiBinlogSyncService) StopAll() {
close(m.stopChan)
for _, service := range m.services {
service.Stop()
}
m.wg.Wait()
}
// Wait waits for all services to complete
func (m *MultiBinlogSyncService) Wait() {
m.wg.Wait()
}
// Len returns the number of services
func (m *MultiBinlogSyncService) Len() int {
return len(m.services)
}
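// Usage sketch for a two-replica setup (connections and configs are assumed
// to be built elsewhere; each service needs a distinct ServerID in its
// BinlogConfig):
//
//	multi := NewMultiBinlogSyncService()
//	multi.AddService(NewBinlogSyncService(cfg1, primaryDB, replica1DB, "replica1"))
//	multi.AddService(NewBinlogSyncService(cfg2, primaryDB, replica2DB, "replica2"))
//	_ = multi.StartAll()
//	multi.Wait()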

168
pkg/replica/sqlbuilder.go Normal file

@@ -0,0 +1,168 @@
package replica
import (
"bytes"
"encoding/hex"
"fmt"
"strings"
"unicode/utf8"
"github.com/go-mysql-org/go-mysql/replication"
)
// SQLBuilder handles SQL statement building
type SQLBuilder struct{}
// NewSQLBuilder creates a new SQL builder
func NewSQLBuilder() *SQLBuilder {
return &SQLBuilder{}
}
// BuildInsert builds an INSERT statement
func (sb *SQLBuilder) BuildInsert(schema, table string, tableMap *replication.TableMapEvent, row []interface{}) string {
if len(row) == 0 {
return fmt.Sprintf("INSERT INTO `%s`.`%s` VALUES ()", schema, table)
}
var columns []string
var values []string
for i, col := range row {
colName := sb.getColumnName(tableMap, i)
columns = append(columns, colName)
values = append(values, formatValue(col))
}
return fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s)",
schema, table, strings.Join(columns, ", "), strings.Join(values, ", "))
}
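// For example, assuming the table map carries column names, a two-column row
// ("alice", 42) in replica.users yields (formatValue quotes numbers as
// strings, which MySQL coerces back):
//
//	INSERT INTO `replica`.`users` (`name`, `age`) VALUES ('alice', '42')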
// BuildUpdate builds an UPDATE statement. The WHERE clause matches on the
// first column only, which is assumed to be the primary key
func (sb *SQLBuilder) BuildUpdate(schema, table string, tableMap *replication.TableMapEvent, before, after []interface{}) string {
if len(before) == 0 || len(after) == 0 {
return fmt.Sprintf("UPDATE `%s`.`%s` SET id = id", schema, table)
}
var setClauses []string
var whereClauses []string
for i := range before {
colName := sb.getColumnName(tableMap, i)
if !valuesEqual(before[i], after[i]) {
setClauses = append(setClauses, fmt.Sprintf("%s = %s", colName, formatValue(after[i])))
}
if i == 0 {
whereClauses = append(whereClauses, fmt.Sprintf("%s = %s", colName, formatValue(before[i])))
}
}
if len(setClauses) == 0 {
// No column changed; emit a harmless no-op that still parses
pk := sb.getColumnName(tableMap, 0)
setClauses = append(setClauses, fmt.Sprintf("%s = %s", pk, pk))
}
return fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s",
schema, table, strings.Join(setClauses, ", "), strings.Join(whereClauses, " AND "))
}
// BuildDelete builds a DELETE statement, matching on the first column
// (assumed to be the primary key)
func (sb *SQLBuilder) BuildDelete(schema, table string, tableMap *replication.TableMapEvent, row []interface{}) string {
if len(row) == 0 {
return fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE 1=0", schema, table)
}
colName := sb.getColumnName(tableMap, 0)
whereClause := fmt.Sprintf("%s = %s", colName, formatValue(row[0]))
return fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s", schema, table, whereClause)
}
// getColumnName returns the column name at the given index
func (sb *SQLBuilder) getColumnName(tableMap *replication.TableMapEvent, index int) string {
if tableMap == nil || index >= len(tableMap.ColumnName) {
return fmt.Sprintf("`col_%d`", index)
}
return fmt.Sprintf("`%s`", tableMap.ColumnName[index])
}
// formatValue formats a value as a SQL literal
func formatValue(col interface{}) string {
switch v := col.(type) {
case []byte:
if validUTF8(v) {
return quoteString(string(v))
}
// Binary data becomes a hex literal, e.g. []byte{0xde, 0xad} -> X'dead'
return fmt.Sprintf("X'%s'", hexEncode(v))
case string:
return quoteString(v)
case nil:
return "NULL"
default:
return fmt.Sprintf("'%v'", v)
}
}
// quoteString escapes backslashes as well as single quotes; escaping only
// quotes would let a value ending in a backslash break out of the literal
func quoteString(s string) string {
s = strings.ReplaceAll(s, `\`, `\\`)
s = strings.ReplaceAll(s, "'", "''")
return fmt.Sprintf("'%s'", s)
}
// validUTF8 reports whether b is valid UTF-8. The standard library check is
// stricter than a hand-rolled scanner: it also rejects overlong encodings
// and surrogate halves
func validUTF8(b []byte) bool {
return utf8.Valid(b)
}
// hexEncode returns the hex encoding of b (MySQL X'...' literals are
// case-insensitive)
func hexEncode(b []byte) string {
return hex.EncodeToString(b)
}
// valuesEqual compares two values, handling byte slices properly
func valuesEqual(a, b interface{}) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
// Byte slices are not comparable with ==; compare contents
aBytes, aOK := a.([]byte)
bBytes, bOK := b.([]byte)
if aOK || bOK {
return aOK && bOK && bytes.Equal(aBytes, bBytes)
}
// For other types, fall back to string comparison
return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b)
}

BIN
replica Executable file

Binary file not shown.