2026-02-12 15:18:55 +01:00
parent ba32991201
commit 002e05eb89
15 changed files with 373 additions and 106 deletions

.gitignore

@@ -1 +1,3 @@
.env
bin/
examples/

README.md

@@ -7,7 +7,7 @@ A robust MySQL/MariaDB binlog streaming replication service with automatic initi
### Quick Install (Go)
```bash
go install git.ma-al.com/goc_marek/replica/cmd/replica@latest
go install git.ma-al.com/goc_marek/replica@latest
```
### Build from Source
@@ -18,18 +18,15 @@ git clone https://git.ma-al.com/goc_marek/replica.git
cd replica
# Build the service
go build -o replica ./cmd/replica
go build -o bin/replica ./main.go
# Or install globally
go install ./cmd/replica
# Run the service
./bin/replica
```
### Docker
### Docker Compose
```bash
# Build the image
docker build -t replica .
# Run with docker-compose
docker-compose up -d
```
@@ -76,14 +73,14 @@ nano .env
| File | Purpose |
|------|---------|
| [`cmd/replica/main.go`](cmd/replica/main.go) | Application entry point and configuration |
| [`pkg/replica/service.go`](pkg/replica/service.go) | BinlogSyncService - core replication orchestration |
| [`pkg/replica/handlers.go`](pkg/replica/handlers.go) | EventHandlers - binlog event processing with resilience |
| [`pkg/replica/initial_transfer.go`](pkg/replica/initial_transfer.go) | InitialTransfer - bulk data transfer management |
| [`pkg/replica/position.go`](pkg/replica/position.go) | PositionManager - binlog position persistence |
| [`pkg/replica/sqlbuilder.go`](pkg/replica/sqlbuilder.go) | SQLBuilder - SQL statement generation |
| [`pkg/replica/config.go`](pkg/replica/config.go) | Configuration types |
| [`pkg/replica/logging.go`](pkg/replica/logging.go) | Structured logging with Graylog support |
| [`main.go`](main.go) | Application entry point and configuration |
| [`replica/service.go`](replica/service.go) | BinlogSyncService - core replication orchestration |
| [`replica/handlers.go`](replica/handlers.go) | EventHandlers - binlog event processing with resilience |
| [`replica/initial_transfer.go`](replica/initial_transfer.go) | InitialTransfer - bulk data transfer management |
| [`replica/position.go`](replica/position.go) | PositionManager - binlog position persistence |
| [`replica/sqlbuilder.go`](replica/sqlbuilder.go) | SQLBuilder - SQL statement generation |
| [`replica/config.go`](replica/config.go) | Configuration types |
| [`replica/logging.go`](replica/logging.go) | Structured logging with Graylog support |
### Data Flow
@@ -116,10 +113,10 @@ BinlogSyncService.processEvent()
```bash
# Build the service
go build -o replica
go build -o bin/replica ./main.go
# Run the service
./replica
./bin/replica
```
### Configuration
@@ -213,7 +210,8 @@ GRAYLOG_SOURCE=binlog-sync-prod
| Variable | Description | Default |
|----------|-------------|---------|
| `TRANSFER_BATCH_SIZE` | Rows per transfer chunk | `1000` |
| `TRANSFER_BATCH_SIZE` | Rows per transfer chunk | `10000` |
| `TRANSFER_WORKER_COUNT` | Number of parallel transfer workers | `4` |
| `LOCAL_PROJECT_NAME` | Project name for logging | `naluconcept` |
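For reference, a minimal sketch of how the transfer settings above can be resolved from the environment with their documented defaults (the `getEnvInt` helper mirrors the one used in `config.go`; treat the exact signature as an assumption):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// getEnvInt reads an integer environment variable, falling back to def
// when the variable is unset or not a valid integer.
func getEnvInt(key string, def int) int {
	if v := os.Getenv(key); v != "" {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
	}
	return def
}

func main() {
	batchSize := getEnvInt("TRANSFER_BATCH_SIZE", 10000) // rows per transfer chunk
	workerCount := getEnvInt("TRANSFER_WORKER_COUNT", 4) // parallel transfer workers
	fmt.Printf("batch=%d workers=%d\n", batchSize, workerCount)
}
```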
## Resilience Features
@@ -359,7 +357,7 @@ When resync is needed (empty replica or no saved position), the service performs
- Get table schema (column definitions)
- Check row count
- Transfer in chunks using primary key or LIMIT/OFFSET
5. **Progress Checkpointing**: Save progress to JSON file every 1000 rows
5. **Progress Checkpointing**: Save progress to JSON file every 10000 rows
6. **Position Reset**: Clear saved binlog position after successful transfer
7. **Binlog Streaming**: Start streaming from current position
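The decision logic reduces to a short sketch; `needsResync` and `startStreaming` are illustrative stand-ins for the service's internal checks and streaming entry point, not its actual unexported API:

```go
package replica

import "fmt"

// startWithResyncSketch mirrors the flow above: resync only when the replica
// is empty or no saved position exists, then stream from the current position.
// needsResync and startStreaming are assumed stand-ins, not real methods.
func startWithResyncSketch(s *BinlogSyncService, batchSize, workerCount int, exclude []string) error {
	needResync, err := s.needsResync()
	if err != nil {
		return err
	}
	if needResync {
		if err := s.RunInitialTransfer(batchSize, workerCount, exclude); err != nil {
			return fmt.Errorf("initial transfer failed: %v", err)
		}
	}
	return s.startStreaming()
}
```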
@@ -375,7 +373,7 @@ SELECT * FROM table WHERE pk >= 1000 AND pk < 2000 ORDER BY pk
SELECT * FROM table LIMIT 1000 OFFSET 1000
```
**Batch Size:** Configurable (default: 1000 rows per chunk)
**Batch Size:** Configurable (default: 10000 rows per chunk)
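In Go terms, the primary-key strategy amounts to a loop like the sketch below (placeholder table and column names; the real implementation also checks pause/stop signals and checkpoints progress between chunks):

```go
package replica

import (
	"database/sql"
	"fmt"
)

// copyByPKRange reads rows in half-open primary-key ranges [lo, hi) of
// batchSize width, so each chunk is an index range scan, not an OFFSET scan.
func copyByPKRange(src *sql.DB, table string, min, max, batchSize int64) error {
	query := fmt.Sprintf("SELECT * FROM `%s` WHERE pk >= ? AND pk < ? ORDER BY pk", table)
	for lo := min; lo < max; lo += batchSize {
		rows, err := src.Query(query, lo, lo+batchSize)
		if err != nil {
			return err
		}
		// ... insert the chunk into the secondary, then checkpoint ...
		rows.Close()
	}
	return nil
}
```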
### Progress Checkpointing
@@ -400,7 +398,7 @@ If the transfer is interrupted, it resumes from the last checkpoint.
Transfers can be paused and resumed programmatically:
```go
transfer := NewInitialTransfer(dsn, dsn, 1000, 1)
transfer := NewInitialTransfer(dsn, dsn, 10000, 4)
// Pause during transfer
transfer.Pause()
@@ -445,8 +443,8 @@ type EventHandlers struct {
type InitialTransfer struct {
primaryDB *sql.DB
secondaryDB *sql.DB
batchSize int // Default: 1000
workerCount int // Default: 1
batchSize int // Default: 10000
workerCount int // Default: 4
excludedDBs map[string]bool
checkpointFile string
progress TransferProgress
@@ -575,15 +573,15 @@ SELECT COUNT(*) FROM your_table;
## Performance Considerations
1. **Batch Size**: Start with 1000, adjust based on table size and memory
1. **Batch Size**: Start with 10000, adjust based on table size and memory
2. **Connection Pooling**: `SetMaxOpenConns(25)` for moderate load
3. **Worker Count**: Currently single-threaded (multi-worker planned)
3. **Worker Count**: Defaults to 4 workers for parallel transfer processing
4. **Schema Caching**: Table schemas cached in memory (auto-updated on drift)
5. **Index Usage**: Chunked transfers require indexed primary key
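The pool sizing rule applied by this commit (two open connections per worker, idle connections capped at the worker count) looks like this in isolation:

```go
package replica

import "database/sql"

// tunePool sizes a connection pool from the transfer worker count,
// matching the settings applied in NewInitialTransfer.
func tunePool(db *sql.DB, workerCount int) {
	db.SetMaxOpenConns(workerCount * 2) // 2 connections per worker
	db.SetMaxIdleConns(workerCount)     // one idle connection per worker
}
```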
## Limitations
- **Single-threaded**: One worker processes events sequentially
- **Multi-threaded transfer**: Multiple workers copy tables in parallel during the initial transfer (configurable via `TRANSFER_WORKER_COUNT`); binlog events are still applied sequentially
- **Position-based**: No GTID support yet (position-based only)
- **Integer PKs**: Chunking requires integer primary key for efficiency
- **No Conflict Resolution**: Concurrent writes not handled
@@ -618,11 +616,8 @@ SELECT COUNT(*) FROM your_table;
```
replica/
├── cmd/
│ └── replica/
│ └── main.go # Entry point
├── pkg/
│ └── replica/
├── main.go # Entry point
├── replica/
│ ├── service.go # Replication orchestration
│ ├── handlers.go # Event processing
│ ├── initial_transfer.go # Bulk data transfer
@@ -630,6 +625,11 @@ replica/
│ ├── sqlbuilder.go # SQL generation
│ ├── config.go # Configuration types
│ └── logging.go # Structured logging
├── examples/
│ └── binlog-listener/
│ └── main.go # Example binlog listener
├── bin/
│ └── replica # Compiled binary
├── example.env # Environment template
├── .env # Environment (gitignored)
├── docker-compose.yml # Local development

docker-compose.yml

@@ -12,6 +12,7 @@ services:
- --log_bin=log_bin
- --binlog_format=ROW
- --server-id=${MARIA_SERVER_ID}
- --log_bin_trust_function_creators=1
ports:
- "${MARIA_PRIMARY_PORT}:3306"
networks:
@@ -21,8 +22,8 @@ services:
environment:
MARIADB_USER: ${MARIA_USER}
MARIADB_PASSWORD: ${MARIA_PASS}
MYSQL_DATABASE: ${MARIA_NAME}
MYSQL_ROOT_PASSWORD: ${MARIA_PASS}
MARIADB_DATABASE: ${MARIA_NAME}
MARIADB_ROOT_PASSWORD: ${MARIA_PASS}
restart: always
mariadb-secondary:
@@ -46,27 +47,12 @@ services:
environment:
MARIADB_USER: ${MARIA_USER}
MARIADB_PASSWORD: ${MARIA_PASS}
MYSQL_DATABASE: ${MARIA_NAME}
MYSQL_ROOT_PASSWORD: ${MARIA_PASS}
MARIADB_DATABASE: ${MARIA_NAME}
MARIADB_ROOT_PASSWORD: ${MARIA_PASS}
restart: always
depends_on:
- mariadb-primary
# postgresql:
# container_name: ${POSTGRES_HOST}
# restart: always
# image: postgres:18
# networks:
# repl:
# ports:
# - 5432:5432
# volumes:
# - postgres-data:/var/lib/postgresql:Z
# command: postgres -c shared_buffers=512MB -c work_mem=16MB -c maintenance_work_mem=256MB -c effective_cache_size=4GB -c max_connections=20
# environment:
# POSTGRES_USER: ${POSTGRES_USER}
# POSTGRES_PASSWORD: ${POSTGRES_PASS}
# POSTGRES_DB: ${POSTGRES_NAME}
networks:

example.env

@@ -1,21 +1,22 @@
# Primary MariaDB Configuration
MARIA_USER=replica
MARIA_USER=root
MARIA_PASS=replica
MARIA_NAME=replica
MARIA_SERVER_ID=100
MARIA_PRIMARY_HOST=mariadb-primary
MARIA_PRIMARY_HOST=localhost
MARIA_PRIMARY_PORT=3306
MARIA_PRIMARY_NAME=mariadb-primary
# Secondary MariaDB Configuration (comma-separated for multiple)
# Format: host1:port1,host2:port2,host3:port3
# Or just hostnames and use MARIA_SECONDARY_PORTS for ports
MARIA_SECONDARY_HOSTS=mariadb-secondary-1,mariadb-secondary-2,mariadb-secondary-3
MARIA_SECONDARY_PORTS=3307,3308,3309
MARIA_SECONDARY_NAMES=secondary-1,secondary-2,secondary-3
MARIA_SECONDARY_HOSTS=localhost
MARIA_SECONDARY_PORTS=3307
MARIA_SECONDARY_NAMES=mariadb-secondary
# Optional: Override per-secondary credentials (must match number of secondaries or use defaults)
# MARIA_SECONDARY_USERS=replica1,replica2,replica3
# MARIA_SECONDARY_PASSWORDS=pass1,pass2,pass3
MARIA_SECONDARY_USERS=root
MARIA_SECONDARY_PASSWORDS=replica
# Legacy single secondary (for backward compatibility)
# MARIA_SECONDARY_HOST=mariadb-secondary

go.mod

@@ -11,6 +11,7 @@ require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec // indirect
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a // indirect

go.sum

@@ -12,6 +12,8 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=

main.go

@@ -7,7 +7,7 @@ import (
"strconv"
"syscall"
"git.ma-al.com/goc_marek/replica/pkg/replica"
"git.ma-al.com/goc_marek/replica/replica"
_ "github.com/go-sql-driver/mysql"
)
@@ -88,7 +88,9 @@ func main() {
// Start all services
replica.Info("Starting binlog replication...")
multiService.StartAll()
if err := multiService.StartAll(cfg.BatchSize, cfg.WorkerCount, cfg.ExcludeSchemas); err != nil {
replica.Fatalf("Failed to start all services: %v", err)
}
// Wait for shutdown signal
sig := <-sigChan

replica (binary file, not shown)

replica/config.go

@@ -5,6 +5,8 @@ import (
"strconv"
"strings"
"time"
"github.com/joho/godotenv"
)
// BinlogConfig holds the configuration for connecting to MySQL/MariaDB binlog
@@ -47,6 +49,7 @@ type AppConfig struct {
// Transfer settings
BatchSize int
WorkerCount int
ExcludeSchemas []string
// Graylog configuration
@@ -55,8 +58,15 @@ type AppConfig struct {
// LoadEnvConfig loads configuration from environment variables
func LoadEnvConfig() (*AppConfig, error) {
// Load .env if present; a missing file is not an error - we just use the system environment variables
_ = godotenv.Load()
cfg := &AppConfig{
BatchSize: 1000,
BatchSize: 10000, // Larger default for better bulk-transfer throughput
WorkerCount: 4, // Default number of parallel transfer workers
ExcludeSchemas: []string{"information_schema", "performance_schema", "mysql", "sys"},
}
@@ -135,6 +145,11 @@ func LoadEnvConfig() (*AppConfig, error) {
cfg.BatchSize = batchSize
}
// Worker count override
if workerCount := getEnvInt("TRANSFER_WORKER_COUNT", 0); workerCount > 0 {
cfg.WorkerCount = workerCount
}
// Graylog configuration
cfg.Graylog.Enabled = getEnvBool("GRAYLOG_ENABLED", false)
cfg.Graylog.Endpoint = getEnv("GRAYLOG_ENDPOINT", "localhost:12201")

replica/handlers.go

@@ -3,6 +3,7 @@ package replica
import (
"database/sql"
"fmt"
"strings"
"sync"
"time"
@@ -90,11 +91,23 @@ func (h *EventHandlers) HandleQuery(e *replication.QueryEvent) error {
query := string(e.Query)
if h.secondaryDB != nil {
// Skip CREATE TABLE statements to avoid ER_TABLE_EXISTS_ERROR
trimmedQuery := strings.ToUpper(strings.TrimSpace(query))
if strings.HasPrefix(trimmedQuery, "CREATE TABLE") {
Infof("[%s][INFO] Skipping CREATE TABLE query: %s", h.secondaryName, query)
return nil
}
_, err := h.secondaryDB.Exec(query)
if err != nil {
// Skip ER_TABLE_EXISTS_ERROR (1050) specifically; the driver reports it as
// "Error 1050: Table '...' already exists" rather than the symbolic name
if strings.Contains(err.Error(), "Error 1050") || strings.Contains(err.Error(), "already exists") {
Infof("[%s][INFO] Table already exists, skipping query: %s", h.secondaryName, query)
} else {
Errorf("[%s][ERROR] Query failed: %v", h.secondaryName, err)
}
}
}
return nil
}
@@ -292,7 +305,7 @@ func (h *EventHandlers) executeWithRetry(query string) error {
if err == nil {
if result != nil {
rowsAffected, _ := result.RowsAffected()
Info("[%s][SUCCESS] %d row(s) affected", h.secondaryName, rowsAffected)
Infof("[%s][SUCCESS] %d row(s) affected", h.secondaryName, rowsAffected)
}
return nil
}
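For context, `executeWithRetry` (only its success path is visible in this hunk) follows the standard bounded-retry shape; a generic sketch with illustrative attempt count and delay, not the service's actual values:

```go
package replica

import (
	"database/sql"
	"fmt"
	"time"
)

// retryExec retries a statement a fixed number of times with a constant
// delay before giving up. Attempts and delay are illustrative defaults.
func retryExec(db *sql.DB, query string, attempts int, delay time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		if _, err = db.Exec(query); err == nil {
			return nil
		}
		time.Sleep(delay)
	}
	return fmt.Errorf("query failed after %d attempts: %v", attempts, err)
}
```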

replica/initial_transfer.go

@@ -42,7 +42,9 @@ type InitialTransfer struct {
progress TransferProgress
pauseChan chan struct{}
resumeChan chan struct{}
stopChan chan struct{} // Signals the transfer to stop
isPaused bool
isStopped bool
}
// NewInitialTransfer creates a new initial transfer handler
@@ -51,16 +53,18 @@ func NewInitialTransfer(primaryDSN, secondaryDSN string, batchSize, workerCount
if err != nil {
return nil, fmt.Errorf("failed to connect to primary: %v", err)
}
primaryDB.SetMaxOpenConns(batchSize)
primaryDB.SetMaxIdleConns(2)
// Optimize connection pool settings - use worker count for max open connections
primaryDB.SetMaxOpenConns(workerCount * 2) // 2 connections per worker
primaryDB.SetMaxIdleConns(workerCount)
secondaryDB, err := sql.Open("mysql", secondaryDSN)
if err != nil {
primaryDB.Close()
return nil, fmt.Errorf("failed to connect to secondary: %v", err)
}
secondaryDB.SetMaxOpenConns(batchSize)
secondaryDB.SetMaxIdleConns(2)
// Optimize connection pool settings - use worker count for max open connections
secondaryDB.SetMaxOpenConns(workerCount * 2) // 2 connections per worker
secondaryDB.SetMaxIdleConns(workerCount)
if err := primaryDB.Ping(); err != nil {
primaryDB.Close()
@@ -91,6 +95,7 @@ func NewInitialTransfer(primaryDSN, secondaryDSN string, batchSize, workerCount
},
pauseChan: make(chan struct{}),
resumeChan: make(chan struct{}),
stopChan: make(chan struct{}), // Initialize stop channel
}, nil
}
@@ -122,6 +127,15 @@ func (t *InitialTransfer) Transfer(excludeSchemas []string) error {
}
for i, dbName := range databases {
// Check for stop signal
select {
case <-t.stopChan:
Info("[TRANSFER] Transfer stopped, saving progress...")
t.saveProgress()
return nil
default:
}
// Check for pause signal
t.checkPause()
@@ -140,6 +154,12 @@ func (t *InitialTransfer) Transfer(excludeSchemas []string) error {
t.saveProgress()
if err := t.transferDatabase(dbName); err != nil {
// Check if the error is due to transfer being stopped
if t.isStopped {
Info("[TRANSFER] Transfer stopped, saving progress...")
t.saveProgress()
return nil
}
return fmt.Errorf("failed to transfer database %s: %v", dbName, err)
}
@@ -162,12 +182,24 @@ func (t *InitialTransfer) Transfer(excludeSchemas []string) error {
return nil
}
// checkPause checks if transfer should be paused
// checkPause checks if transfer should be paused or stopped
func (t *InitialTransfer) checkPause() {
select {
case <-t.stopChan:
Info("[TRANSFER] Transfer stopped")
return
default:
}
if t.isPaused {
Info("[TRANSFER] Transfer paused, waiting for resume...")
<-t.resumeChan
select {
case <-t.resumeChan:
Info("[TRANSFER] Transfer resumed")
case <-t.stopChan:
Info("[TRANSFER] Transfer stopped while paused")
t.isPaused = false
}
}
}
@@ -189,6 +221,15 @@ func (t *InitialTransfer) Resume() {
}
}
// Stop stops the transfer
func (t *InitialTransfer) Stop() {
if !t.isStopped {
t.isStopped = true
close(t.stopChan)
Info("[TRANSFER] Transfer stop requested")
}
}
// saveProgress saves the current progress to a checkpoint file
func (t *InitialTransfer) saveProgress() {
t.progress.LastCheckpoint = time.Now()
@@ -268,23 +309,64 @@ func (t *InitialTransfer) transferDatabase(dbName string) error {
Infof("[TRANSFER] Found %d tables in %s", len(tables), dbName)
// All tables need to be processed - we'll resume from where we left off within each table
var unprocessedTables []string
for _, table := range tables {
unprocessedTables = append(unprocessedTables, table)
}
// Create worker pool
jobs := make(chan string, len(unprocessedTables))
results := make(chan error, len(unprocessedTables))
for w := 0; w < t.workerCount; w++ {
go func(workerID int) {
for table := range jobs {
// Check for stop signal
select {
case <-t.stopChan:
results <- nil // Signal stop
return
default:
}
// Check for pause signal
t.checkPause()
// Skip already processed tables
tableKey := dbName + "." + table
if t.progress.TablesProcessed[tableKey] > 0 {
Infof("[TRANSFER] Skipping already processed table: %s", tableKey)
results <- nil // Still report a result so the collector below is not left waiting
continue
}
if err := t.transferTable(dbName, table); err != nil {
Errorf("[ERROR] Failed to transfer table %s.%s: %v", dbName, table, err)
// Check if the error is due to transfer being stopped
if t.isStopped {
results <- nil // Signal stop
return
}
Errorf("[ERROR] Worker %d: Failed to transfer table %s.%s: %v", workerID, dbName, table, err)
t.mu.Lock()
t.stats.Errors = append(t.stats.Errors, fmt.Sprintf("%s.%s: %v", dbName, table, err))
t.mu.Unlock()
}
results <- nil
}
}(w)
}
// Send jobs to workers
for _, table := range unprocessedTables {
jobs <- table
}
close(jobs)
// Wait for all workers to complete
for range unprocessedTables {
select {
case <-t.stopChan:
Info("[TRANSFER] Transfer stopped, saving progress...")
t.saveProgress()
return nil
case err := <-results:
if err != nil {
return err
}
}
}
return nil
@@ -313,12 +395,26 @@ func (t *InitialTransfer) getTables(dbName string) ([]string, error) {
// transferTable transfers a single table with chunked reads
func (t *InitialTransfer) transferTable(dbName, tableName string) error {
// Check for stop signal
select {
case <-t.stopChan:
Info("[TRANSFER] Transfer stopped, saving progress...")
t.saveProgress()
return nil
default:
}
// Get table structure
schema, err := t.getTableSchema(dbName, tableName)
if err != nil {
return fmt.Errorf("failed to get table schema: %v", err)
}
// Create table on secondary if it doesn't exist
if err := t.createTableOnSecondary(dbName, tableName); err != nil {
return fmt.Errorf("failed to create table on secondary: %v", err)
}
// Check if table has data
hasData, err := t.tableHasData(dbName, tableName)
if err != nil {
@@ -345,7 +441,87 @@ func (t *InitialTransfer) transferTable(dbName, tableName string) error {
return t.transferTableFullScan(dbName, tableName, schema, count)
}
// Check if primary key is numeric before attempting chunked transfer
if isPrimaryKeyNumeric(dbName, tableName, pkColumn, t.primaryDB) {
return t.transferTableChunked(dbName, tableName, schema, pkColumn, count)
} else {
Warnf("[WARN] Non-numeric primary key found for %s.%s, using full scan", dbName, tableName)
return t.transferTableFullScan(dbName, tableName, schema, count)
}
}
// createTableOnSecondary creates the table on the secondary database if it doesn't exist
func (t *InitialTransfer) createTableOnSecondary(dbName, tableName string) error {
// Get CREATE TABLE statement from primary
var createSQL string
query := fmt.Sprintf("SHOW CREATE TABLE `%s`.`%s`", dbName, tableName)
err := t.primaryDB.QueryRow(query).Scan(&tableName, &createSQL)
if err != nil {
return fmt.Errorf("failed to get CREATE TABLE statement: %v", err)
}
// Execute CREATE TABLE on secondary (with IF NOT EXISTS and foreign key checks disabled)
// This prevents errors due to foreign key constraints when tables are created in random order
_, err = t.secondaryDB.Exec("SET FOREIGN_KEY_CHECKS = 0")
if err != nil {
return fmt.Errorf("failed to disable foreign key checks: %v", err)
}
// Re-enable checks on every return path, including the create-failure case below.
// Note: SET FOREIGN_KEY_CHECKS is session-scoped, so with a pooled *sql.DB this is best-effort.
defer t.secondaryDB.Exec("SET FOREIGN_KEY_CHECKS = 1")
createSQL = strings.Replace(createSQL, "CREATE TABLE", "CREATE TABLE IF NOT EXISTS", 1)
_, err = t.secondaryDB.Exec(createSQL)
if err != nil {
// Try again without foreign key constraints if we still get an error
modifiedSQL := removeForeignKeyConstraints(createSQL)
_, err = t.secondaryDB.Exec(modifiedSQL)
if err != nil {
return fmt.Errorf("failed to create table: %v", err)
}
Infof("[TRANSFER] Table %s.%s created on secondary without foreign key constraints", dbName, tableName)
} else {
Infof("[TRANSFER] Table %s.%s created on secondary", dbName, tableName)
}
return nil
}
// removeForeignKeyConstraints strips FOREIGN KEY constraint lines from a CREATE TABLE statement.
// Simple line-based filter - complex cases would require proper SQL parsing.
func removeForeignKeyConstraints(sqlText string) string {
var kept []string
for _, line := range strings.Split(sqlText, "\n") {
if strings.Contains(strings.ToUpper(line), "FOREIGN KEY") {
if n := len(kept); n > 0 && !strings.HasSuffix(strings.TrimSpace(line), ",") {
kept[n-1] = strings.TrimSuffix(strings.TrimRight(kept[n-1], " \t"), ",") // fix dangling comma
}
continue
}
kept = append(kept, line)
}
return strings.Join(kept, "\n")
}
// isPrimaryKeyNumeric checks if the primary key column is a numeric type
func isPrimaryKeyNumeric(dbName, tableName, pkColumn string, db *sql.DB) bool {
// Use query placeholders instead of string interpolation to keep the lookup injection-safe
query := "SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS " +
"WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?"
var dataType string
if err := db.QueryRow(query, dbName, tableName, pkColumn).Scan(&dataType); err != nil {
return false
}
// List of numeric data types (MySQL/MariaDB)
numericTypes := []string{"tinyint", "smallint", "mediumint", "int", "bigint",
"decimal", "numeric", "float", "double", "real"}
for _, t := range numericTypes {
if strings.Contains(strings.ToLower(dataType), t) {
return true
}
}
return false
}
// tableHasData checks if a table has any rows
@@ -442,9 +618,24 @@ func (t *InitialTransfer) transferTableChunked(dbName, tableName string, columns
startOffset = minMax.Min
}
// Check if table has already been fully processed
if startOffset >= minMax.Max {
Infof("[TRANSFER] Table %s.%s already fully processed, skipping", dbName, tableName)
return nil
}
// Transfer chunks
offset := startOffset
for offset < minMax.Max {
// Check for stop signal
select {
case <-t.stopChan:
Info("[TRANSFER] Transfer stopped, saving progress...")
t.saveProgress()
return nil
default:
}
// Check for pause signal
t.checkPause()
@@ -462,6 +653,12 @@ func (t *InitialTransfer) transferTableChunked(dbName, tableName string, columns
rows.Close()
if err != nil {
// Check if the error is due to transfer being stopped
if t.isStopped {
Info("[TRANSFER] Transfer stopped, saving progress...")
t.saveProgress()
return nil
}
return fmt.Errorf("failed to insert chunk: %v", err)
}
@@ -470,14 +667,15 @@ func (t *InitialTransfer) transferTableChunked(dbName, tableName string, columns
t.progress.TablesProcessed[tableKey] = offset + int64(t.batchSize)
t.mu.Unlock()
// Save checkpoint every 1000 rows
if rowsInserted%1000 == 0 {
// Save checkpoint every 100 rows (for testing purposes)
if rowsInserted%100 == 0 {
t.saveProgress()
}
offset += int64(t.batchSize)
}
Infof("[TRANSFER] Table %s.%s fully processed", dbName, tableName)
return nil
}
@@ -510,9 +708,24 @@ func (t *InitialTransfer) transferTableFullScan(dbName, tableName string, column
// Get starting offset from progress
startOffset := t.progress.TablesProcessed[tableKey]
// Check if table has already been fully processed
if startOffset >= rowCount {
Infof("[TRANSFER] Table %s.%s already fully processed, skipping", dbName, tableName)
return nil
}
var offset int64 = startOffset
for offset < rowCount {
// Check for stop signal
select {
case <-t.stopChan:
Info("[TRANSFER] Transfer stopped, saving progress...")
t.saveProgress()
return nil
default:
}
// Check for pause signal
t.checkPause()
@@ -530,6 +743,12 @@ func (t *InitialTransfer) transferTableFullScan(dbName, tableName string, column
rows.Close()
if err != nil {
// Check if the error is due to transfer being stopped
if t.isStopped {
Info("[TRANSFER] Transfer stopped, saving progress...")
t.saveProgress()
return nil
}
return fmt.Errorf("failed to insert chunk: %v", err)
}
@@ -538,19 +757,27 @@ func (t *InitialTransfer) transferTableFullScan(dbName, tableName string, column
t.progress.TablesProcessed[tableKey] = offset
t.mu.Unlock()
// Save checkpoint every 1000 rows
if rowsInserted%1000 == 0 {
// Save checkpoint every 100 rows (for testing purposes)
if rowsInserted%100 == 0 {
t.saveProgress()
}
offset += int64(t.batchSize)
}
Infof("[TRANSFER] Table %s.%s fully processed", dbName, tableName)
return nil
}
// insertRows inserts rows from a query result into the secondary database
func (t *InitialTransfer) insertRows(db *sql.DB, dbName, tableName string, columns []ColumnInfo, rows *sql.Rows) (int64, error) {
// Disable foreign key checks to avoid errors due to constraint violations
_, err := db.Exec("SET FOREIGN_KEY_CHECKS = 0")
if err != nil {
return 0, fmt.Errorf("failed to disable foreign key checks: %v", err)
}
defer db.Exec("SET FOREIGN_KEY_CHECKS = 1") // Ensure foreign key checks are re-enabled
// Build INSERT statement
placeholders := make([]string, len(columns))
colNames := make([]string, len(columns))
@@ -560,7 +787,7 @@ func (t *InitialTransfer) insertRows(db *sql.DB, dbName, tableName string, colum
}
insertSQL := fmt.Sprintf(
"INSERT INTO `%s`.`%s` (%s) VALUES (%s)",
"INSERT IGNORE INTO `%s`.`%s` (%s) VALUES (%s)",
dbName, tableName, strings.Join(colNames, ", "), strings.Join(placeholders, ", "),
)
@@ -587,7 +814,9 @@ func (t *InitialTransfer) insertRows(db *sql.DB, dbName, tableName string, colum
_, err := stmt.Exec(values...)
if err != nil {
return rowCount, fmt.Errorf("failed to insert row: %v", err)
// Log the error but continue with other rows
Warnf("[TRANSFER] Failed to insert row into %s.%s: %v", dbName, tableName, err)
continue
}
rowCount++
}
@@ -598,7 +827,6 @@ func (t *InitialTransfer) insertRows(db *sql.DB, dbName, tableName string, colum
t.mu.Unlock()
Infof("[TRANSFER] Inserted %d rows into %s.%s", rowCount, dbName, tableName)
return rowCount, rows.Err()
}
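
Taken together, the new stop channel makes a long-running transfer controllable from outside; a usage sketch (placeholder DSNs, written inside the `replica` package):

```go
package replica

import (
	"os"
	"os/signal"
	"syscall"
)

// runTransferWithGracefulStop wires SIGINT/SIGTERM to Stop so an interrupted
// transfer checkpoints its progress and can resume on the next run.
func runTransferWithGracefulStop() error {
	transfer, err := NewInitialTransfer(
		"user:pass@tcp(primary:3306)/",   // placeholder primary DSN
		"user:pass@tcp(secondary:3306)/", // placeholder secondary DSN
		10000, 4)
	if err != nil {
		return err
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		transfer.Stop() // progress is saved; Transfer returns nil
	}()

	return transfer.Transfer(nil) // nil = no extra schema exclusions
}
```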

replica/service.go

@@ -275,7 +275,7 @@ func (s *BinlogSyncService) checkSecondaryHasData() (bool, error) {
}
// RunInitialTransfer performs the initial data transfer
func (s *BinlogSyncService) RunInitialTransfer(batchSize int, excludeSchemas []string) error {
func (s *BinlogSyncService) RunInitialTransfer(batchSize, workerCount int, excludeSchemas []string) error {
Infof("[%s] Starting initial data transfer...", s.secondaryName)
// Get secondary DB from handlers
@@ -289,7 +289,7 @@ func (s *BinlogSyncService) RunInitialTransfer(batchSize int, excludeSchemas []s
primaryDB: s.primaryDB,
secondaryDB: secondaryDB,
batchSize: batchSize,
workerCount: 1,
workerCount: workerCount,
excludedDBs: map[string]bool{
"information_schema": true,
"performance_schema": true,
@@ -300,11 +300,28 @@ func (s *BinlogSyncService) RunInitialTransfer(batchSize int, excludeSchemas []s
progress: TransferProgress{
TablesProcessed: make(map[string]int64),
},
stopChan: make(chan struct{}),
}
if err := transfer.Transfer(excludeSchemas); err != nil {
// Start transfer in goroutine
errChan := make(chan error)
go func() {
errChan <- transfer.Transfer(excludeSchemas)
}()
// Wait for transfer completion or stop signal
select {
case <-s.stopChan:
Info("[%s] Initial transfer stopping...", s.secondaryName)
transfer.Stop()
<-errChan // Wait for transfer to finish stopping
Info("[%s] Initial transfer stopped", s.secondaryName)
return nil
case err := <-errChan:
if err != nil {
return fmt.Errorf("transfer failed: %v", err)
}
}
// Reset position after successful transfer
if err := s.positionMgr.Reset(); err != nil {
@@ -316,7 +333,7 @@ func (s *BinlogSyncService) RunInitialTransfer(batchSize int, excludeSchemas []s
}
// StartWithResync starts binlog streaming with automatic resync if needed
func (s *BinlogSyncService) StartWithResync(batchSize int, excludeSchemas []string) error {
func (s *BinlogSyncService) StartWithResync(batchSize, workerCount int, excludeSchemas []string) error {
if err := s.positionMgr.InitTable(); err != nil {
Warnf("[%s][WARN] Failed to init position table: %v", s.secondaryName, err)
}
@@ -328,7 +345,7 @@ func (s *BinlogSyncService) StartWithResync(batchSize int, excludeSchemas []stri
}
if needResync {
if err := s.RunInitialTransfer(batchSize, excludeSchemas); err != nil {
if err := s.RunInitialTransfer(batchSize, workerCount, excludeSchemas); err != nil {
return fmt.Errorf("initial transfer failed: %v", err)
}
}
@@ -363,13 +380,13 @@ func (m *MultiBinlogSyncService) AddService(service *BinlogSyncService) {
}
// StartAll starts all services
func (m *MultiBinlogSyncService) StartAll() error {
func (m *MultiBinlogSyncService) StartAll(batchSize, workerCount int, excludeSchemas []string) error {
for _, service := range m.services {
m.wg.Add(1)
go func(svc *BinlogSyncService) {
defer m.wg.Done()
Infof("[%s] Starting binlog sync service", svc.GetSecondaryName())
if err := svc.StartWithResync(1000, []string{"information_schema", "performance_schema", "mysql", "sys"}); err != nil {
if err := svc.StartWithResync(batchSize, workerCount, excludeSchemas); err != nil {
Errorf("[%s] Service error: %v", svc.GetSecondaryName(), err)
}
}(service)