MariaDB/MySQL Binlog Replication Service
A robust MySQL/MariaDB binlog streaming replication service with automatic initial data transfer, resilience features, and comprehensive error handling. Supports single- or multi-secondary replica configurations with optional Graylog logging.
Installation
Quick Install (Go)
go install git.ma-al.com/goc_marek/replica@latest
Build from Source
# Clone the repository
git clone https://git.ma-al.com/goc_marek/replica.git
cd replica
# Build the service
go build -o bin/replica ./main.go
# Run the service
./bin/replica
Docker Compose
# Run with docker-compose
docker-compose up -d
Quick Start
# Copy example environment
cp example.env .env
# Edit .env with your configuration
nano .env
# Run the service
./replica
Features
Core Functionality
- Binlog Streaming: Real-time replication from MySQL/MariaDB binlog events
- Initial Data Transfer: Automatic bulk data transfer when resync is needed
- Multi-table Support: Replicates INSERT, UPDATE, DELETE events for multiple tables
- Position Persistence: Saves and resumes from last processed binlog position
- Schema Filtering: Only replicates events for the configured schema (default: "replica")
Resilience & Error Handling
- Panic Recovery: Automatic recovery from unexpected panics in event handlers
- Retry Logic: Exponential backoff retry for failed SQL operations
- Connection Health Checks: Periodic connection health monitoring with auto-reconnect
- Schema Drift Detection: Detects and handles schema changes during replication
- Graceful Degradation: Skips problematic tables after repeated failures
- Auto-Reconnect: Automatic reconnection on connection errors
Transfer Features
- Chunked Transfers: Efficient batch transfers using primary key ranges
- Progress Checkpointing: Saves transfer progress for resume after interruption
- Pause/Resume: Support for pausing and resuming initial transfers
- Transfer Statistics: Detailed logging of transfer progress and errors
Architecture
Components
| File | Purpose |
|---|---|
| `main.go` | Application entry point and configuration |
| `replica/service.go` | `BinlogSyncService` - core replication orchestration |
| `replica/handlers.go` | `EventHandlers` - binlog event processing with resilience |
| `replica/initial_transfer.go` | `InitialTransfer` - bulk data transfer management |
| `replica/position.go` | `PositionManager` - binlog position persistence |
| `replica/sqlbuilder.go` | `SQLBuilder` - SQL statement generation |
| `replica/config.go` | Configuration types |
| `replica/logging.go` | Structured logging with Graylog support |
Data Flow
Primary DB (MySQL/MariaDB)
|
v
Binlog Stream
|
v
BinlogSyncService.processEvent()
|
+---> Panic Recovery Wrapper
|
+---> EventHandlers.HandleRows()
| |
| +---> Schema Filter (only "replica" schema)
| +---> Schema Drift Check
| +---> Retry Logic (if needed)
| +---> Execute SQL
|
+---> PositionManager.Save()
|
+---> Health Checks (every 30s)
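The following sketch shows how that loop can look, assuming the service is built on the go-mysql-org/go-mysql replication package; `handlers.HandleRows` and `pos.Save` stand in for this project's `EventHandlers` and `PositionManager` methods, and their signatures here are illustrative.

```go
import (
	"context"
	"log"

	"github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-mysql-org/go-mysql/replication"
)

// runLoop is a minimal sketch of the data flow above: stream events, wrap each
// handler call in panic recovery, then persist the binlog position.
func runLoop(syncer *replication.BinlogSyncer, start mysql.Position, handlers *EventHandlers, pos *PositionManager) error {
	streamer, err := syncer.StartSync(start) // resume from the last saved position
	if err != nil {
		return err
	}
	for {
		ev, err := streamer.GetEvent(context.Background())
		if err != nil {
			log.Printf("[ERROR] failed to get event: %v", err)
			continue
		}
		func() {
			// Panic recovery wrapper: one malformed event must not kill the service.
			defer func() {
				if r := recover(); r != nil {
					log.Printf("[PANIC RECOVERED] %v", r)
				}
			}()
			switch e := ev.Event.(type) {
			case *replication.RowsEvent:
				handlers.HandleRows(ev.Header, e) // schema filter, drift check, retry, execute SQL
			case *replication.RotateEvent:
				log.Printf("[ROTATE] Rotated to %s", e.NextLogName)
			}
		}()
		pos.Save(ev.Header.LogPos) // persist the binlog position after each event
	}
}
```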
Usage
Quick Start
# Build the service
go build -o bin/replica ./main.go
# Run the service
./bin/replica
Configuration
Environment Variables
All configuration is done via environment variables in the .env file:
# Copy example environment
cp example.env .env
# Edit with your settings
nano .env
Primary Database Configuration
| Variable | Description | Default |
|---|---|---|
| `MARIA_PRIMARY_HOST` | Primary database hostname | `mariadb-primary` |
| `MARIA_PRIMARY_PORT` | Primary database port | `3306` |
| `MARIA_USER` | Replication user | `replica` |
| `MARIA_PASS` | Replication password | `replica` |
| `MARIA_SERVER_ID` | Unique server ID for binlog | `100` |
| `MARIA_PRIMARY_NAME` | Instance name for logging | `mariadb-primary` |
Multi-Secondary Replica Configuration
The service supports replicating to multiple secondary databases simultaneously. Configure secondaries using comma-separated values:
| Variable | Description | Example |
|---|---|---|
| `MARIA_SECONDARY_HOSTS` | Comma-separated hostnames | `secondary-1,secondary-2,secondary-3` |
| `MARIA_SECONDARY_PORTS` | Comma-separated ports | `3307,3308,3309` |
| `MARIA_SECONDARY_NAMES` | Comma-separated instance names | `replica-1,replica-2,replica-3` |
| `MARIA_SECONDARY_USERS` | Per-secondary users (optional) | `replica1,replica2,replica3` |
| `MARIA_SECONDARY_PASSWORDS` | Per-secondary passwords (optional) | `pass1,pass2,pass3` |
Example: Single Secondary
MARIA_SECONDARY_HOSTS=mariadb-secondary
MARIA_SECONDARY_PORTS=3307
MARIA_SECONDARY_NAMES=secondary-1
Example: Three Secondaries
MARIA_SECONDARY_HOSTS=secondary-1,secondary-2,secondary-3
MARIA_SECONDARY_PORTS=3307,3308,3309
MARIA_SECONDARY_NAMES=replica-1,replica-2,replica-3
MARIA_SECONDARY_USERS=replica1,replica2,replica3
MARIA_SECONDARY_PASSWORDS=secret1,secret2,secret3
Example: Two Secondaries with Different Credentials
MARIA_SECONDARY_HOSTS=secondary-1,secondary-2
MARIA_SECONDARY_PORTS=3307,3308
MARIA_SECONDARY_NAMES=replica-east,replica-west
MARIA_SECONDARY_USERS=replica_east,replica_west
MARIA_SECONDARY_PASSWORDS=east_secret,west_secret
Note: If MARIA_SECONDARY_USERS or MARIA_SECONDARY_PASSWORDS are not provided, the default MARIA_USER and MARIA_PASS will be used for all secondaries.
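A sketch of how the comma-separated variables can be expanded into per-secondary configurations. It reuses the `BinlogConfig` type shown in the Configuration Reference; the `pick` helper and the exact fallback behaviour are illustrative, not the project's actual parsing code.

```go
import (
	"os"
	"strconv"
	"strings"
)

// pick returns list[i] if present and non-empty, otherwise def.
func pick(list []string, i int, def string) string {
	if i < len(list) && strings.TrimSpace(list[i]) != "" {
		return strings.TrimSpace(list[i])
	}
	return def
}

// loadSecondaries expands the comma-separated MARIA_SECONDARY_* variables,
// falling back to MARIA_USER/MARIA_PASS when per-secondary credentials are absent.
func loadSecondaries() []BinlogConfig {
	raw := strings.TrimSpace(os.Getenv("MARIA_SECONDARY_HOSTS"))
	if raw == "" {
		return nil
	}
	hosts := strings.Split(raw, ",")
	ports := strings.Split(os.Getenv("MARIA_SECONDARY_PORTS"), ",")
	names := strings.Split(os.Getenv("MARIA_SECONDARY_NAMES"), ",")
	users := strings.Split(os.Getenv("MARIA_SECONDARY_USERS"), ",")
	passes := strings.Split(os.Getenv("MARIA_SECONDARY_PASSWORDS"), ",")

	cfgs := make([]BinlogConfig, 0, len(hosts))
	for i, host := range hosts {
		port, _ := strconv.Atoi(pick(ports, i, "3307"))
		cfgs = append(cfgs, BinlogConfig{
			Host:     strings.TrimSpace(host),
			Port:     port,
			Name:     pick(names, i, strings.TrimSpace(host)),
			User:     pick(users, i, os.Getenv("MARIA_USER")),   // default credentials
			Password: pick(passes, i, os.Getenv("MARIA_PASS")),
		})
	}
	return cfgs
}
```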
Graylog Configuration
| Variable | Description | Default |
|---|---|---|
| `GRAYLOG_ENABLED` | Enable Graylog logging | `false` |
| `GRAYLOG_ENDPOINT` | Graylog GELF endpoint | `localhost:12201` |
| `GRAYLOG_PROTOCOL` | Protocol (`udp`/`tcp`) | `udp` |
| `GRAYLOG_TIMEOUT` | Connection timeout | `5s` |
| `GRAYLOG_SOURCE` | Source name for logs | `binlog-sync` |
Enable Graylog Logging
# Edit .env and set:
GRAYLOG_ENABLED=true
GRAYLOG_ENDPOINT=graylog.example.com:12201
GRAYLOG_PROTOCOL=udp
GRAYLOG_SOURCE=binlog-sync-prod
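For reference, a GELF message is just JSON sent over UDP (or TCP). The helper below is an illustrative sketch of what the Graylog side receives, using only the standard library; it is not the project's logging API.

```go
import (
	"encoding/json"
	"net"
)

// sendGELF ships one GELF 1.1 message over UDP (illustrative helper).
func sendGELF(endpoint, source, message string) error {
	conn, err := net.Dial("udp", endpoint) // e.g. graylog.example.com:12201
	if err != nil {
		return err
	}
	defer conn.Close()

	payload, err := json.Marshal(map[string]interface{}{
		"version":       "1.1",
		"host":          source, // GRAYLOG_SOURCE, e.g. binlog-sync-prod
		"short_message": message,
		"level":         6, // informational
	})
	if err != nil {
		return err
	}
	_, err = conn.Write(payload)
	return err
}
```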
Other Settings
| Variable | Description | Default |
|---|---|---|
| `TRANSFER_BATCH_SIZE` | Rows per transfer chunk | `10000` |
| `TRANSFER_WORKER_COUNT` | Number of parallel transfer workers | `4` |
| `LOCAL_PROJECT_NAME` | Project name for logging | `naluconcept` |
Resilience Features
Panic Recovery
All event handlers include automatic panic recovery:
defer func() {
if r := recover(); r != nil {
log.Printf("[PANIC RECOVERED] %v", r)
}
}()
This prevents a single malformed event from crashing the entire service; the recovery wrapper is applied around every event handler.
Retry Logic
Failed SQL operations are retried with exponential backoff:
func (h *EventHandlers) executeWithRetry(query string) error {
    var lastErr error
    for attempt := 0; attempt <= h.retryAttempts; attempt++ {
        if attempt > 0 {
            // Exponential backoff: base delay doubled on every retry
            delay := h.retryDelay * time.Duration(1<<attempt)
            log.Printf("[RETRY] Retrying in %v (attempt %d/%d)", delay, attempt, h.retryAttempts)
            time.Sleep(delay)
        }
        _, err := h.secondaryDB.Exec(query)
        if err == nil {
            return nil
        }
        lastErr = err
        // Reconnect before the next attempt if this looks like a connection error
        if h.isConnectionError(err) {
            h.reconnect()
        }
    }
    return lastErr
}
Configuration:
- `retryAttempts`: Number of retry attempts (default: 3)
- `retryDelay`: Base delay between retries (default: 100ms)
Connection Health Checks
The service performs periodic health checks every 30 seconds:
// Checks:
// 1. Ping secondary database connection
// 2. Verify syncer is still active
// 3. Log health status
If a health check fails, the error is logged. The next SQL operation will trigger an auto-reconnect.
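A minimal sketch of such a 30-second check loop using `database/sql`; the function name is illustrative.

```go
import (
	"context"
	"database/sql"
	"log"
	"time"
)

// runHealthChecks pings the secondary every 30 seconds until ctx is cancelled.
// Failures are only logged; the next SQL operation triggers the actual reconnect.
func runHealthChecks(ctx context.Context, secondaryDB *sql.DB) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := secondaryDB.PingContext(ctx); err != nil {
				log.Printf("[HEALTH CHECK] Failed: %v", err)
			} else {
				log.Printf("[HEALTH CHECK] OK")
			}
		}
	}
}
```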
Schema Drift Detection
Before applying events, the service checks for schema changes:
func (h *EventHandlers) detectSchemaDrift(schema, table string) bool {
currentHash, _ := h.getSchemaHash(schema, table)
lastHash := h.lastSchemaHash[schema+"."+table]
if lastHash != "" && lastHash != currentHash {
log.Printf("[DRIFT] Schema changed for %s.%s", schema, table)
return true // Drift detected
}
h.lastSchemaHash[schema+"."+table] = currentHash
return false
}
If schema drift is detected, the table is marked as failed and temporarily skipped.
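The README does not spell out `getSchemaHash`; one plausible implementation, assumed here purely for illustration, fingerprints the column definitions from `information_schema`:

```go
import (
	"crypto/sha256"
	"database/sql"
	"encoding/hex"
	"fmt"
)

// schemaHash fingerprints a table's column layout so drift can be detected
// by comparing hashes between events (illustrative implementation).
func schemaHash(db *sql.DB, schema, table string) (string, error) {
	rows, err := db.Query(
		`SELECT COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE
		   FROM information_schema.COLUMNS
		  WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
		  ORDER BY ORDINAL_POSITION`, schema, table)
	if err != nil {
		return "", err
	}
	defer rows.Close()

	h := sha256.New()
	for rows.Next() {
		var name, colType, nullable string
		if err := rows.Scan(&name, &colType, &nullable); err != nil {
			return "", err
		}
		fmt.Fprintf(h, "%s|%s|%s;", name, colType, nullable)
	}
	return hex.EncodeToString(h.Sum(nil)), rows.Err()
}
```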
Graceful Degradation
Tables with repeated failures are temporarily skipped:
// After 5 consecutive failures, skip the table
if h.failedTables[key] >= h.maxFailures {
log.Printf("[SKIPPED] Too many failures for %s.%s", schema, table)
return nil // Skip this event
}
// Reset count on successful operation
h.failedTables[key] = 0
Configuration:
- `maxFailures`: Consecutive failures before skipping (default: 5)
Auto-Reconnect
Connection errors trigger automatic reconnection:
func (h *EventHandlers) reconnect() {
    h.secondaryDB.Close()
    maxRetries := 5
    for i := 0; i < maxRetries; i++ {
        db, err := sql.Open("mysql", dsn)
        if err == nil {
            // Configure connection pool, then ping to verify the new connection
            if err = db.Ping(); err == nil {
                h.secondaryDB = db
                log.Printf("[RECONNECT] Successfully reconnected")
                return
            }
        }
        time.Sleep(time.Duration(i+1) * time.Second)
    }
}
Detected connection errors:
- "connection refused"
- "connection reset"
- "broken pipe"
- "timeout"
- "driver: bad connection"
- "invalid connection"
Initial Transfer
When resync is needed (empty replica or no saved position), the service performs an initial data transfer:
Transfer Process
- Detection: Check if secondary database is empty or no position saved
- Database Enumeration: List all databases from primary
- Schema Exclusion: Skip excluded schemas (information_schema, mysql, etc.)
- Table Transfer: For each table:
- Get table schema (column definitions)
- Check row count
- Transfer in chunks using primary key or LIMIT/OFFSET
- Progress Checkpointing: Save progress to JSON file every 10000 rows
- Position Reset: Clear saved binlog position after successful transfer
- Binlog Streaming: Start streaming from current position
Chunked Transfer
Tables are transferred in chunks for efficiency and memory safety:
-- Using primary key (efficient, preserves order)
SELECT * FROM table WHERE pk >= 1000 AND pk < 2000 ORDER BY pk
-- Without primary key (slower, may skip rows on updates)
SELECT * FROM table LIMIT 1000 OFFSET 1000
Batch Size: Configurable (default: 10000 rows per chunk)
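A sketch of the primary-key chunking loop that issues the SELECTs shown above; `copyRows` is a hypothetical helper that reads the chunk from the primary and inserts it into the secondary.

```go
import (
	"database/sql"
	"fmt"
	"log"
)

// transferByPK walks a table in [start, start+batchSize) primary-key windows.
// copyRows (hypothetical) copies the selected rows to the secondary and
// returns the number of rows transferred.
func transferByPK(primary *sql.DB, table, pkCol string, batchSize int64) error {
	var lo, hi int64
	q := fmt.Sprintf("SELECT COALESCE(MIN(%s), 0), COALESCE(MAX(%s), 0) FROM %s", pkCol, pkCol, table)
	if err := primary.QueryRow(q).Scan(&lo, &hi); err != nil {
		return err
	}
	for start := lo; start <= hi; start += batchSize {
		chunk := fmt.Sprintf("SELECT * FROM %s WHERE %s >= ? AND %s < ? ORDER BY %s", table, pkCol, pkCol, pkCol)
		n, err := copyRows(primary, chunk, start, start+batchSize)
		if err != nil {
			return err
		}
		log.Printf("[TRANSFER] %s: copied %d rows starting at %s=%d", table, n, pkCol, start)
	}
	return nil
}
```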
Progress Checkpointing
Transfer progress is saved to transfer_progress_{instance}.json:
{
"DatabasesProcessed": 2,
"CurrentDatabase": "mydb",
"TablesProcessed": {
"mydb.users": 5000,
"mydb.orders": 10000
},
"LastCheckpoint": "2024-01-15T10:30:00Z"
}
If the transfer is interrupted, it resumes from the last checkpoint.
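A sketch of writing that checkpoint file safely, using a temp file plus rename so an interrupted write cannot corrupt the previous checkpoint; the exact persistence code in `initial_transfer.go` may differ.

```go
import (
	"encoding/json"
	"os"
)

// saveCheckpoint writes the progress snapshot to disk atomically (illustrative).
func saveCheckpoint(path string, p TransferProgress) error {
	data, err := json.MarshalIndent(p, "", "  ")
	if err != nil {
		return err
	}
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, path) // atomic swap of the checkpoint file
}
```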
Pause/Resume
Transfers can be paused and resumed programmatically:
transfer := NewInitialTransfer(primaryDSN, secondaryDSN, 10000, 4) // batch size 10000, 4 workers
// Pause during transfer
transfer.Pause()
// Resume later
transfer.Resume()
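One way such pause/resume support can be implemented is an atomic flag that the transfer loop polls between chunks; this is a sketch of the idea, not necessarily the project's exact mechanism.

```go
import (
	"sync/atomic"
	"time"
)

// pauseGate is a minimal pause/resume primitive: the transfer loop calls
// Wait() between chunks and blocks while the gate is paused.
type pauseGate struct{ paused atomic.Bool }

func (g *pauseGate) Pause()  { g.paused.Store(true) }
func (g *pauseGate) Resume() { g.paused.Store(false) }

func (g *pauseGate) Wait() {
	for g.paused.Load() {
		time.Sleep(100 * time.Millisecond) // poll until resumed
	}
}
```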
Configuration Reference
BinlogConfig
type BinlogConfig struct {
Host string // MySQL/MariaDB host
Port int // MySQL/MariaDB port
User string // Replication user
Password string // Password
ServerID uint32 // Unique server ID for replication
Name string // Instance name for logging
}
EventHandlers
type EventHandlers struct {
secondaryDB *sql.DB
tableMapCache map[uint64]*replication.TableMapEvent
sqlBuilder *SQLBuilder
failedTables map[string]int
maxFailures int // Default: 5
retryAttempts int // Default: 3
retryDelay time.Duration // Default: 100ms
lastSchemaHash map[string]string
}
InitialTransfer
type InitialTransfer struct {
primaryDB *sql.DB
secondaryDB *sql.DB
batchSize int // Default: 10000
workerCount int // Default: 4
excludedDBs map[string]bool
checkpointFile string
progress TransferProgress
}
TransferProgress
type TransferProgress struct {
DatabasesProcessed int
CurrentDatabase string
TablesProcessed map[string]int64 // "schema.table" -> rows transferred
LastCheckpoint time.Time
}
TransferStats
type TransferStats struct {
TotalRows int64
TotalTables int
TransferTime int64 // milliseconds
Errors []string
Progress TransferProgress
}
Logging
The service uses structured logging with prefixes:
| Prefix | Example | Description |
|---|---|---|
| `[INSERT]` | `[INSERT] 1 row(s) affected` | Successful INSERT |
| `[UPDATE]` | `[UPDATE] 1 row(s) affected` | Successful UPDATE |
| `[DELETE]` | `[DELETE] 1 row(s) affected` | Successful DELETE |
| `[SUCCESS]` | `[SUCCESS] 1 row(s) affected` | SQL operation succeeded |
| `[ERROR]` | `[ERROR] INSERT failed after retries` | Operation failed |
| `[WARN]` | `[WARN] Schema drift detected` | Warning condition |
| `[PANIC]` | `[PANIC RECOVERED] panic message` | Recovered panic |
| `[RETRY]` | `[RETRY] Retrying in 200ms` | Retry attempt |
| `[DRIFT]` | `[DRIFT] Schema changed` | Schema drift detected |
| `[SKIPPED]` | `[SKIPPED] Too many failures` | Table skipped |
| `[FAILURE]` | `[FAILURE] failure count: 3/5` | Table failure count |
| `[HEALTH]` | `[HEALTH CHECK] Failed` | Health check result |
| `[TRANSFER]` | `[TRANSFER] Found 5 tables` | Transfer progress |
| `[INFO]` | `[INFO] Saved position` | Informational |
| `[ROTATE]` | `[mariadb-primary] Rotated to binlog.000001` | Binlog rotation |
| `[RECONNECT]` | `[RECONNECT] Successfully reconnected` | Reconnection result |
Schema Filtering
By default, only events for the "replica" schema are replicated:
// handlers.go line 54
if schemaName != "replica" {
return nil // Skip all other schemas
}
To replicate all schemas, comment out or modify this filter.
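If modifying the source is not desirable, the filter could instead be driven by an environment variable. The sketch below assumes a hypothetical `REPLICA_SCHEMAS` variable, which the service does not currently read.

```go
import (
	"os"
	"strings"
)

// allowedSchema reports whether events for schemaName should be replicated.
// REPLICA_SCHEMAS is a hypothetical comma-separated allow-list; leaving it
// empty keeps the current behaviour of replicating only the "replica" schema.
func allowedSchema(schemaName string) bool {
	raw := os.Getenv("REPLICA_SCHEMAS")
	if raw == "" {
		return schemaName == "replica"
	}
	for _, s := range strings.Split(raw, ",") {
		if strings.TrimSpace(s) == schemaName {
			return true
		}
	}
	return false
}
```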
Error Handling
Common Errors
| Error | Cause | Resolution |
|---|---|---|
| `connection refused` | Secondary DB down | Check secondary DB status |
| `schema drift detected` | Table schema changed | Manual intervention required |
| `too many failures` | Table repeatedly failing | Check table compatibility |
| `failed to get event` | Binlog stream interrupted | Service auto-recovers |
| `expected 4 destination arguments` | `SHOW MASTER STATUS` columns | Fixed in code |
| `assignment to entry in nil map` | Uninitialized map | Fixed in code |
Recovery Procedures
- Secondary DB Connection Lost:
  - Service auto-reconnects (up to 5 attempts)
  - Check secondary DB logs
  - Verify network connectivity
- Schema Drift Detected:
  - Stop the service
  - Compare schemas: `SHOW CREATE TABLE schema.table` on both DBs
  - Sync schemas manually
  - Reset position: `DELETE FROM binlog_position`
  - Restart the service
- Transfer Interrupted:
  - Service resumes from `transfer_progress_{instance}.json`
  - Check the checkpoint file for progress
  - Delete the checkpoint file to restart the transfer
- Event Processing Stuck:
  - Check health check logs
  - Verify binlog position: `SELECT * FROM binlog_position`
  - Restart the service if needed
  - Clear position: `DELETE FROM binlog_position`
Monitoring
Key Metrics to Watch
- Replication Lag: Time since last event processed
- Rows Replicated: Total INSERT/UPDATE/DELETE count
- Tables Synced: Number of tables synchronized
- Error Count: Failed operations counter
- Success Rate: Successful / total operations
Manual Verification
-- Check position on secondary
SELECT * FROM binlog_position;
-- Check row counts
SELECT COUNT(*) FROM replica.your_table;
-- Compare with primary
SELECT COUNT(*) FROM your_table;
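The same check can be scripted; a small illustrative helper that compares row counts for one table on both ends:

```go
import (
	"database/sql"
	"fmt"
)

// compareCounts reports the row-count difference for one table between
// primary and secondary (illustrative monitoring helper).
func compareCounts(primary, secondary *sql.DB, table string) error {
	var p, s int64
	q := "SELECT COUNT(*) FROM " + table
	if err := primary.QueryRow(q).Scan(&p); err != nil {
		return err
	}
	if err := secondary.QueryRow(q).Scan(&s); err != nil {
		return err
	}
	fmt.Printf("%s: primary=%d secondary=%d diff=%d\n", table, p, s, p-s)
	return nil
}
```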
Performance Considerations
- Batch Size: Start with 10000, adjust based on table size and memory
- Connection Pooling: `SetMaxOpenConns(25)` for moderate load (see the sketch after this list)
- Worker Count: 4 workers by default for parallel processing
- Schema Caching: Table schemas cached in memory (auto-updated on drift)
- Index Usage: Chunked transfers require indexed primary key
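A sketch of the pool settings referenced above, applied when opening the secondary connection; the values are the suggested starting points, and the MySQL driver import is an assumption.

```go
import (
	"database/sql"
	"time"

	_ "github.com/go-sql-driver/mysql" // assumed MySQL driver
)

// openSecondary opens the secondary connection with moderate pool limits.
func openSecondary(dsn string) (*sql.DB, error) {
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		return nil, err
	}
	db.SetMaxOpenConns(25)                 // moderate load, as suggested above
	db.SetMaxIdleConns(5)                  // keep a few idle connections warm
	db.SetConnMaxLifetime(5 * time.Minute) // recycle stale connections
	return db, db.Ping()
}
```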
Limitations
- Single-threaded apply: binlog events are applied sequentially; only the initial transfer uses parallel workers (configurable via TRANSFER_WORKER_COUNT)
- Position-based: No GTID support yet (position-based only)
- Integer PKs: Chunking requires integer primary key for efficiency
- No Conflict Resolution: Concurrent writes not handled
- No Data Validation: Assumes source data is valid
Known Issues & Fixes
Fixed Issues
- `SHOW MASTER STATUS` returning 4 columns (fixed by scanning 4 variables)
- `nil map` panic in transfer (fixed by initializing `TablesProcessed`)
- Event deduplication skipping valid events (disabled deduplication)
- Checkpoint file path empty (fixed by setting instance-specific path)
Workarounds
- Position 0 Events: Deduplication disabled for now
- Schema Changes: Service marks table as failed, manual restart required
Future Enhancements
- GTID-based replication for better consistency
- Multi-threaded replication for higher throughput
- Conflict detection and resolution
- Prometheus metrics endpoint
- REST API for management
- Kubernetes operator
- Configuration file (YAML/JSON)
- Signal handling for dynamic config
File Structure
replica/
├── main.go # Entry point
├── replica/
│ ├── service.go # Replication orchestration
│ ├── handlers.go # Event processing
│ ├── initial_transfer.go # Bulk data transfer
│ ├── position.go # Position persistence
│ ├── sqlbuilder.go # SQL generation
│ ├── config.go # Configuration types
│ └── logging.go # Structured logging
├── examples/
│ └── binlog-listener/
│ └── main.go # Example binlog listener
├── bin/
│ └── replica # Compiled binary
├── example.env # Environment template
├── .env # Environment (gitignored)
├── docker-compose.yml # Local development
├── go.mod # Go module
└── README.md # This file
License
MIT License