MariaDB/MySQL Binlog Replication Service
A robust MySQL/MariaDB binlog streaming replication service with automatic initial data transfer, resilience features, and comprehensive error handling.
Features
Core Functionality
- Binlog Streaming: Real-time replication from MySQL/MariaDB binlog events
- Initial Data Transfer: Automatic bulk data transfer when resync is needed
- Multi-table Support: Replicates INSERT, UPDATE, DELETE events for multiple tables
- Position Persistence: Saves and resumes from last processed binlog position
- Schema Filtering: Only replicates events for the configured schema (default: "replica")
Resilience & Error Handling
- Panic Recovery: Automatic recovery from unexpected panics in event handlers
- Retry Logic: Exponential backoff retry for failed SQL operations
- Connection Health Checks: Periodic connection health monitoring with auto-reconnect
- Schema Drift Detection: Detects and handles schema changes during replication
- Graceful Degradation: Skips problematic tables after repeated failures
- Auto-Reconnect: Automatic reconnection on connection errors
Transfer Features
- Chunked Transfers: Efficient batch transfers using primary key ranges
- Progress Checkpointing: Saves transfer progress for resume after interruption
- Pause/Resume: Support for pausing and resuming initial transfers
- Transfer Statistics: Detailed logging of transfer progress and errors
Architecture
Components
| File | Purpose |
|---|---|
| `main.go` | Application entry point and configuration |
| `service.go` | `BinlogSyncService` - core replication orchestration |
| `handlers.go` | `EventHandlers` - binlog event processing with resilience |
| `initial_transfer.go` | `InitialTransfer` - bulk data transfer management |
| `position.go` | `PositionManager` - binlog position persistence |
| `sqlbuilder.go` | `SQLBuilder` - SQL statement generation |
| `config.go` | Configuration types |
Data Flow
Primary DB (MySQL/MariaDB)
|
v
Binlog Stream
|
v
BinlogSyncService.processEvent()
|
+---> Panic Recovery Wrapper
|
+---> EventHandlers.HandleRows()
| |
| +---> Schema Filter (only "replica" schema)
| +---> Schema Drift Check
| +---> Retry Logic (if needed)
| +---> Execute SQL
|
+---> PositionManager.Save()
|
+---> Health Checks (every 30s)
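The flow above can be pictured as a single event loop. The following is a minimal sketch, not the service's actual code: it assumes the `github.com/go-mysql-org/go-mysql` library (implied by the `replication.TableMapEvent` reference in the Configuration Reference), hard-codes the default "replica" schema filter, and uses placeholder credentials and function names (`streamEvents`, `processEvent`).

```go
package main

import (
	"context"
	"log"

	"github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-mysql-org/go-mysql/replication"
)

func main() {
	syncer := replication.NewBinlogSyncer(replication.BinlogSyncerConfig{
		ServerID: 100,
		Flavor:   "mariadb",
		Host:     "localhost",
		Port:     3306,
		User:     "root",
		Password: "password",
	})
	defer syncer.Close()

	// Start from the beginning of the current binlog; the real service loads
	// the last saved position instead.
	if err := streamEvents(syncer, mysql.Position{Name: "", Pos: 4}); err != nil {
		log.Fatalf("stream error: %v", err)
	}
}

func streamEvents(syncer *replication.BinlogSyncer, pos mysql.Position) error {
	streamer, err := syncer.StartSync(pos)
	if err != nil {
		return err
	}
	for {
		ev, err := streamer.GetEvent(context.Background())
		if err != nil {
			return err // caller decides whether to reconnect and resume
		}
		processEvent(ev)
	}
}

func processEvent(ev *replication.BinlogEvent) {
	// Panic recovery wrapper: one malformed event must not kill the loop.
	defer func() {
		if r := recover(); r != nil {
			log.Printf("[PANIC RECOVERED] %v", r)
		}
	}()

	switch e := ev.Event.(type) {
	case *replication.RowsEvent:
		// Schema filter: only the configured schema (default "replica") is replicated.
		if string(e.Table.Schema) != "replica" {
			return
		}
		log.Printf("[INFO] rows event for %s.%s (%d rows)",
			e.Table.Schema, e.Table.Table, len(e.Rows))
	case *replication.RotateEvent:
		log.Printf("[ROTATE] Rotated to %s", e.NextLogName)
	}
}
```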
Usage
Quick Start
# Build the service
go build -o replica
# Run the service
./replica
Configuration
Configuration is done in main.go:
// Primary MySQL/MariaDB connection (binlog source)
primaryDSN := "root:password@tcp(localhost:3306)/?multiStatements=true"
// Secondary (replica) connection
secondaryDSN := "root:password@tcp(localhost:3307)/replica?multiStatements=true"
// MariaDB primary configuration
primaryConfig := BinlogConfig{
Host: "localhost",
Port: 3306,
User: "root",
Password: "password",
ServerID: 100,
Name: "mariadb-primary",
}
// Create service with both connections
service := NewBinlogSyncService(primaryConfig, primaryDB, secondaryDB)
// Start with automatic resync (checks if transfer is needed)
batchSize := 1000                // rows per transfer chunk
excludeSchemas := []string{      // schemas to skip during transfer
    "information_schema",
    "performance_schema",
    "mysql",
    "sys",
}
if err := service.StartWithResync(batchSize, excludeSchemas); err != nil {
    log.Fatalf("Service error: %v", err)
}
Resilience Features
Panic Recovery
All event handlers include automatic panic recovery:
defer func() {
if r := recover(); r != nil {
log.Printf("[PANIC RECOVERED] %v", r)
}
}()
This prevents a single malformed event from crashing the entire service. Recovery is wrapped around the main event-processing loop and the row event handlers (see the data flow above).
Retry Logic
Failed SQL operations are retried with exponential backoff:
func (h *EventHandlers) executeWithRetry(query string) error {
    var lastErr error
    for attempt := 0; attempt <= h.retryAttempts; attempt++ {
        if attempt > 0 {
            delay := h.retryDelay * time.Duration(1<<attempt)
            log.Printf("[RETRY] Retrying in %v (attempt %d/%d)", delay, attempt, h.retryAttempts)
            time.Sleep(delay)
        }
        _, err := h.secondaryDB.Exec(query)
        if err == nil {
            return nil
        }
        lastErr = err
        // Reconnect before the next attempt if this looks like a connection error
        if h.isConnectionError(err) {
            h.reconnect()
        }
    }
    return lastErr
}
Configuration:
- `retryAttempts`: Number of retry attempts (default: 3)
- `retryDelay`: Base delay between retries (default: 100ms)
Connection Health Checks
The service performs periodic health checks every 30 seconds:
// Checks:
// 1. Ping secondary database connection
// 2. Verify syncer is still active
// 3. Log health status
If a health check fails, the error is logged. The next SQL operation will trigger an auto-reconnect.
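A loop of this shape is commonly built on a ticker goroutine. The sketch below is illustrative rather than the service's actual code; the function name and passing the secondary `*sql.DB` directly are assumptions.

```go
import (
	"context"
	"database/sql"
	"log"
	"time"
)

// healthLoop pings the secondary connection on a fixed interval and logs the
// result. Failures are only logged; the next SQL operation performs the
// actual reconnect, as described above.
func healthLoop(ctx context.Context, db *sql.DB, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := db.PingContext(ctx); err != nil {
				log.Printf("[HEALTH CHECK] Failed: %v", err)
			}
		}
	}
}
```

Started alongside the event loop, for example as `go healthLoop(ctx, secondaryDB, 30*time.Second)`.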
Schema Drift Detection
Before applying events, the service checks for schema changes:
func (h *EventHandlers) detectSchemaDrift(schema, table string) bool {
currentHash, _ := h.getSchemaHash(schema, table)
lastHash := h.lastSchemaHash[schema+"."+table]
if lastHash != "" && lastHash != currentHash {
log.Printf("[DRIFT] Schema changed for %s.%s", schema, table)
return true // Drift detected
}
h.lastSchemaHash[schema+"."+table] = currentHash
return false
}
If schema drift is detected, the table is marked as failed and temporarily skipped.
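The drift check depends on a per-table schema hash. One plausible way to compute it is to hash the ordered column metadata from `information_schema`; this is only a sketch of what `getSchemaHash` could do, shown as a standalone function.

```go
import (
	"crypto/sha256"
	"database/sql"
	"encoding/hex"
	"fmt"
)

// schemaHash fingerprints a table's column layout by hashing the ordered
// column names and types from information_schema.
func schemaHash(db *sql.DB, schema, table string) (string, error) {
	rows, err := db.Query(
		`SELECT COLUMN_NAME, COLUMN_TYPE
		   FROM information_schema.COLUMNS
		  WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
		  ORDER BY ORDINAL_POSITION`, schema, table)
	if err != nil {
		return "", err
	}
	defer rows.Close()

	h := sha256.New()
	for rows.Next() {
		var name, colType string
		if err := rows.Scan(&name, &colType); err != nil {
			return "", err
		}
		fmt.Fprintf(h, "%s:%s;", name, colType)
	}
	if err := rows.Err(); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}
```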
Graceful Degradation
Tables with repeated failures are temporarily skipped:
// After 5 consecutive failures, skip the table
if h.failedTables[key] >= h.maxFailures {
log.Printf("[SKIPPED] Too many failures for %s.%s", schema, table)
return nil // Skip this event
}
// Reset count on successful operation
h.failedTables[key] = 0
Configuration:
- `maxFailures`: Consecutive failures before skipping (default: 5)
Auto-Reconnect
Connection errors trigger automatic reconnection:
func (h *EventHandlers) reconnect() {
    h.secondaryDB.Close()
    maxRetries := 5
    for i := 0; i < maxRetries; i++ {
        db, err := sql.Open("mysql", dsn) // dsn: secondary connection string
        if err == nil {
            // Configure connection pool
            db.SetMaxOpenConns(25)
            // Ping to verify the connection is usable
            if err = db.Ping(); err == nil {
                h.secondaryDB = db
                log.Printf("[RECONNECT] Successfully reconnected")
                return
            }
        }
        time.Sleep(time.Duration(i+1) * time.Second)
    }
}
Detected connection errors:
- "connection refused"
- "connection reset"
- "broken pipe"
- "timeout"
- "driver: bad connection"
- "invalid connection"
Initial Transfer
When resync is needed (empty replica or no saved position), the service performs an initial data transfer:
Transfer Process
- Detection: Check if secondary database is empty or no position saved
- Database Enumeration: List all databases from primary
- Schema Exclusion: Skip excluded schemas (information_schema, mysql, etc.)
- Table Transfer: For each table:
- Get table schema (column definitions)
- Check row count
- Transfer in chunks using primary key or LIMIT/OFFSET
- Progress Checkpointing: Save progress to JSON file every 1000 rows
- Position Reset: Clear saved binlog position after successful transfer
- Binlog Streaming: Start streaming from current position
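As a rough illustration of the detection step, the resync decision could look like the sketch below. It assumes the `binlog_position` table referenced elsewhere in this README and treats a missing table as "no saved position"; the real check may differ.

```go
import "database/sql"

// needsResync sketches the detection step: resync when no binlog position has
// been saved yet or the configured replica schema contains no tables.
func needsResync(secondary *sql.DB, schema string) (bool, error) {
	var positions int
	// A missing binlog_position table is treated the same as "no position".
	if err := secondary.QueryRow(`SELECT COUNT(*) FROM binlog_position`).Scan(&positions); err != nil {
		positions = 0
	}

	var tables int
	err := secondary.QueryRow(
		`SELECT COUNT(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ?`,
		schema).Scan(&tables)
	if err != nil {
		return false, err
	}
	return positions == 0 || tables == 0, nil
}
```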
Chunked Transfer
Tables are transferred in chunks for efficiency and memory safety:
-- Using primary key (efficient, preserves order)
SELECT * FROM table WHERE pk >= 1000 AND pk < 2000 ORDER BY pk
-- Without primary key (slower, may skip rows on updates)
SELECT * FROM table LIMIT 1000 OFFSET 1000
Batch Size: Configurable (default: 1000 rows per chunk)
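A primary-key range loop for the first query pattern might look like the sketch below. Table and column names are assumed to come from `information_schema`, and `copyChunk` is a hypothetical helper standing in for the INSERT generation that lives in `sqlbuilder.go`.

```go
import (
	"database/sql"
	"fmt"
)

// transferByPK copies rows from primary to secondary in primary-key ranges.
func transferByPK(primary, secondary *sql.DB, table, pkCol string, batchSize int64) error {
	var minPK, maxPK sql.NullInt64
	q := fmt.Sprintf("SELECT MIN(%s), MAX(%s) FROM %s", pkCol, pkCol, table)
	if err := primary.QueryRow(q).Scan(&minPK, &maxPK); err != nil {
		return err
	}
	if !minPK.Valid {
		return nil // empty table, nothing to transfer
	}

	for start := minPK.Int64; start <= maxPK.Int64; start += batchSize {
		rows, err := primary.Query(
			fmt.Sprintf("SELECT * FROM %s WHERE %s >= ? AND %s < ? ORDER BY %s",
				table, pkCol, pkCol, pkCol),
			start, start+batchSize)
		if err != nil {
			return err
		}
		err = copyChunk(secondary, table, rows)
		rows.Close()
		if err != nil {
			return err
		}
	}
	return nil
}

// copyChunk would scan the chunk and write batched INSERTs to the secondary;
// it is left as a stub in this sketch.
func copyChunk(secondary *sql.DB, table string, rows *sql.Rows) error {
	return nil
}
```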
Progress Checkpointing
Transfer progress is saved to `transfer_progress_{instance}.json`:
{
"DatabasesProcessed": 2,
"CurrentDatabase": "mydb",
"TablesProcessed": {
"mydb.users": 5000,
"mydb.orders": 10000
},
"LastCheckpoint": "2024-01-15T10:30:00Z"
}
If the transfer is interrupted, it resumes from the last checkpoint.
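Persisting and restoring this structure is plain `encoding/json`. The sketch below is illustrative; the write-to-temp-file-then-rename step and the file permissions are assumptions rather than the service's documented behaviour.

```go
import (
	"encoding/json"
	"os"
	"time"
)

// TransferProgress mirrors the structure shown above.
type TransferProgress struct {
	DatabasesProcessed int
	CurrentDatabase    string
	TablesProcessed    map[string]int64
	LastCheckpoint     time.Time
}

// saveCheckpoint writes progress to a temp file and renames it into place so
// a crash mid-write cannot corrupt the checkpoint.
func saveCheckpoint(path string, p TransferProgress) error {
	p.LastCheckpoint = time.Now().UTC()
	data, err := json.MarshalIndent(p, "", "  ")
	if err != nil {
		return err
	}
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

// loadCheckpoint restores progress; a missing file simply means "start fresh".
func loadCheckpoint(path string) (TransferProgress, error) {
	p := TransferProgress{TablesProcessed: map[string]int64{}}
	data, err := os.ReadFile(path)
	if os.IsNotExist(err) {
		return p, nil
	}
	if err != nil {
		return p, err
	}
	return p, json.Unmarshal(data, &p)
}
```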
Pause/Resume
Transfers can be paused and resumed programmatically:
transfer := NewInitialTransfer(dsn, dsn, 1000, 1)
// Pause during transfer
transfer.Pause()
// Resume later
transfer.Resume()
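One simple way Pause/Resume can work is a flag that the chunk loop polls between batches. The sketch below is an assumption about the mechanism, not the actual implementation in `initial_transfer.go`.

```go
import (
	"sync/atomic"
	"time"
)

// pausableTransfer shows one way Pause/Resume can be wired: an atomic flag
// that the chunk loop checks between batches (illustrative only).
type pausableTransfer struct {
	paused atomic.Bool
}

func (t *pausableTransfer) Pause()  { t.paused.Store(true) }
func (t *pausableTransfer) Resume() { t.paused.Store(false) }

// waitIfPaused blocks the transfer loop while the flag is set.
func (t *pausableTransfer) waitIfPaused() {
	for t.paused.Load() {
		time.Sleep(100 * time.Millisecond)
	}
}
```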
Configuration Reference
BinlogConfig
type BinlogConfig struct {
Host string // MySQL/MariaDB host
Port int // MySQL/MariaDB port
User string // Replication user
Password string // Password
ServerID uint32 // Unique server ID for replication
Name string // Instance name for logging
}
EventHandlers
type EventHandlers struct {
secondaryDB *sql.DB
tableMapCache map[uint64]*replication.TableMapEvent
sqlBuilder *SQLBuilder
failedTables map[string]int
maxFailures int // Default: 5
retryAttempts int // Default: 3
retryDelay time.Duration // Default: 100ms
lastSchemaHash map[string]string
}
InitialTransfer
type InitialTransfer struct {
primaryDB *sql.DB
secondaryDB *sql.DB
batchSize int // Default: 1000
workerCount int // Default: 1
excludedDBs map[string]bool
checkpointFile string
progress TransferProgress
}
TransferProgress
type TransferProgress struct {
DatabasesProcessed int
CurrentDatabase string
TablesProcessed map[string]int64 // "schema.table" -> rows transferred
LastCheckpoint time.Time
}
TransferStats
type TransferStats struct {
TotalRows int64
TotalTables int
TransferTime int64 // milliseconds
Errors []string
Progress TransferProgress
}
Logging
The service uses structured logging with prefixes:
| Prefix | Example | Description |
|---|---|---|
| `[INSERT]` | `[INSERT] 1 row(s) affected` | Successful INSERT |
| `[UPDATE]` | `[UPDATE] 1 row(s) affected` | Successful UPDATE |
| `[DELETE]` | `[DELETE] 1 row(s) affected` | Successful DELETE |
| `[SUCCESS]` | `[SUCCESS] 1 row(s) affected` | SQL operation succeeded |
| `[ERROR]` | `[ERROR] INSERT failed after retries` | Operation failed |
| `[WARN]` | `[WARN] Schema drift detected` | Warning condition |
| `[PANIC]` | `[PANIC RECOVERED] panic message` | Recovered panic |
| `[RETRY]` | `[RETRY] Retrying in 200ms` | Retry attempt |
| `[DRIFT]` | `[DRIFT] Schema changed` | Schema drift detected |
| `[SKIPPED]` | `[SKIPPED] Too many failures` | Table skipped |
| `[FAILURE]` | `[FAILURE] failure count: 3/5` | Table failure count |
| `[HEALTH]` | `[HEALTH CHECK] Failed` | Health check result |
| `[TRANSFER]` | `[TRANSFER] Found 5 tables` | Transfer progress |
| `[INFO]` | `[INFO] Saved position` | Informational |
| `[ROTATE]` | `[mariadb-primary] Rotated to binlog.000001` | Binlog rotation |
| `[RECONNECT]` | `[RECONNECT] Successfully reconnected` | Reconnection result |
Schema Filtering
By default, only events for the "replica" schema are replicated:
// handlers.go line 54
if schemaName != "replica" {
return nil // Skip all other schemas
}
To replicate all schemas, comment out or modify this filter.
Error Handling
Common Errors
| Error | Cause | Resolution |
|---|---|---|
| `connection refused` | Secondary DB down | Check secondary DB status |
| `schema drift detected` | Table schema changed | Manual intervention required |
| `too many failures` | Table repeatedly failing | Check table compatibility |
| `failed to get event` | Binlog stream interrupted | Service auto-recovers |
| `expected 4 destination arguments` | `SHOW MASTER STATUS` columns | Fixed in code |
| `assignment to entry in nil map` | Uninitialized map | Fixed in code |
Recovery Procedures
- Secondary DB Connection Lost:
  - Service auto-reconnects (up to 5 attempts)
  - Check secondary DB logs
  - Verify network connectivity
- Schema Drift Detected:
  - Stop the service
  - Compare schemas: `SHOW CREATE TABLE schema.table` on both DBs
  - Sync the schemas manually
  - Reset the position: `DELETE FROM binlog_position`
  - Restart the service
- Transfer Interrupted:
  - Service resumes from `transfer_progress_{instance}.json`
  - Check the checkpoint file for progress
  - Delete the checkpoint file to restart the transfer
- Event Processing Stuck:
  - Check the health check logs
  - Verify the binlog position: `SELECT * FROM binlog_position`
  - Restart the service if needed
  - Clear the position: `DELETE FROM binlog_position`
Monitoring
Key Metrics to Watch
- Replication Lag: Time since last event processed
- Rows Replicated: Total INSERT/UPDATE/DELETE count
- Tables Synced: Number of tables synchronized
- Error Count: Failed operations counter
- Success Rate: Successful / total operations
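The service does not yet expose these programmatically (a Prometheus endpoint is listed under Future Enhancements), but they can be tracked with simple atomics. The sketch below is purely illustrative; all names are hypothetical.

```go
import (
	"sync/atomic"
	"time"
)

// replicationMetrics is an illustrative counter set for the metrics listed above.
type replicationMetrics struct {
	rowsReplicated atomic.Int64
	errors         atomic.Int64
	lastEventAt    atomic.Int64 // unix nanoseconds of the last applied event
}

func (m *replicationMetrics) recordRow() {
	m.rowsReplicated.Add(1)
	m.lastEventAt.Store(time.Now().UnixNano())
}

func (m *replicationMetrics) recordError() { m.errors.Add(1) }

// lag approximates replication lag as the time since the last applied event.
func (m *replicationMetrics) lag() time.Duration {
	last := m.lastEventAt.Load()
	if last == 0 {
		return 0
	}
	return time.Since(time.Unix(0, last))
}
```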
Manual Verification
-- Check position on secondary
SELECT * FROM binlog_position;
-- Check row counts
SELECT COUNT(*) FROM replica.your_table;
-- Compare with primary
SELECT COUNT(*) FROM your_table;
Performance Considerations
- Batch Size: Start with 1000, adjust based on table size and memory
- Connection Pooling: `SetMaxOpenConns(25)` for moderate load (see the sketch after this list)
- Worker Count: Currently single-threaded (multi-worker planned)
- Schema Caching: Table schemas cached in memory (auto-updated on drift)
- Index Usage: Chunked transfers require an indexed primary key
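The pool settings might be applied as below. The 25-connection cap matches the recommendation above; the driver import (implied by the DSN format used in `main.go`), idle-connection count, and lifetime are illustrative assumptions.

```go
import (
	"database/sql"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

// openSecondary opens the replica connection with example pool settings for
// moderate load.
func openSecondary(dsn string) (*sql.DB, error) {
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		return nil, err
	}
	db.SetMaxOpenConns(25)
	db.SetMaxIdleConns(5)
	db.SetConnMaxLifetime(5 * time.Minute)
	return db, db.Ping()
}
```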
Limitations
- Single-threaded: One worker processes events sequentially
- Position-based: No GTID support yet (position-based only)
- Integer PKs: Chunking requires integer primary key for efficiency
- No Conflict Resolution: Concurrent writes not handled
- No Data Validation: Assumes source data is valid
Known Issues & Fixes
Fixed Issues
- `SHOW MASTER STATUS` returning 4 columns (fixed by scanning 4 variables)
- `nil map` panic in transfer (fixed by initializing TablesProcessed)
- Event deduplication skipping valid events (disabled deduplication)
- Checkpoint file path empty (fixed by setting instance-specific path)
Workarounds
- Position 0 Events: Deduplication disabled for now
- Schema Changes: Service marks table as failed, manual restart required
Future Enhancements
- GTID-based replication for better consistency
- Multi-threaded replication for higher throughput
- Conflict detection and resolution
- Prometheus metrics endpoint
- REST API for management
- Kubernetes operator
- Configuration file (YAML/JSON)
- Signal handling for dynamic config
File Structure
/home/marek/coding/test/replica/
├── main.go # Entry point, connections
├── service.go # Replication orchestration
├── handlers.go # Event processing
├── initial_transfer.go # Bulk data transfer
├── position.go # Position persistence
├── sqlbuilder.go # SQL generation
├── config.go # Config types
├── README.md # This file
├── docker-compose.yml # Local development
├── .env # Environment (optional)
└── go.mod # Go module
License
MIT License