MariaDB/MySQL Binlog Replication Service

A robust MySQL/MariaDB binlog streaming replication service with automatic initial data transfer, resilience features, and comprehensive error handling.

Features

Core Functionality

  • Binlog Streaming: Real-time replication from MySQL/MariaDB binlog events
  • Initial Data Transfer: Automatic bulk data transfer when resync is needed
  • Multi-table Support: Replicates INSERT, UPDATE, DELETE events for multiple tables
  • Position Persistence: Saves and resumes from last processed binlog position
  • Schema Filtering: Only replicates events for the configured schema (default: "replica")

Resilience & Error Handling

  • Panic Recovery: Automatic recovery from unexpected panics in event handlers
  • Retry Logic: Exponential backoff retry for failed SQL operations
  • Connection Health Checks: Periodic connection health monitoring with auto-reconnect
  • Schema Drift Detection: Detects and handles schema changes during replication
  • Graceful Degradation: Skips problematic tables after repeated failures
  • Auto-Reconnect: Automatic reconnection on connection errors

Transfer Features

  • Chunked Transfers: Efficient batch transfers using primary key ranges
  • Progress Checkpointing: Saves transfer progress for resume after interruption
  • Pause/Resume: Support for pausing and resuming initial transfers
  • Transfer Statistics: Detailed logging of transfer progress and errors

Architecture

Components

File                  Purpose
main.go               Application entry point and configuration
service.go            BinlogSyncService - core replication orchestration
handlers.go           EventHandlers - binlog event processing with resilience
initial_transfer.go   InitialTransfer - bulk data transfer management
position.go           PositionManager - binlog position persistence
sqlbuilder.go         SQLBuilder - SQL statement generation
config.go             Configuration types

Data Flow

Primary DB (MySQL/MariaDB)
       |
       v
   Binlog Stream
       |
       v
BinlogSyncService.processEvent()
       |
       +---> Panic Recovery Wrapper
       |
       +---> EventHandlers.HandleRows()
       |          |
       |          +---> Schema Filter (only "replica" schema)
       |          +---> Schema Drift Check
       |          +---> Retry Logic (if needed)
       |          +---> Execute SQL
       |
       +---> PositionManager.Save()
       |
       +---> Health Checks (every 30s)

Usage

Quick Start

# Build the service
go build -o replica

# Run the service
./replica

Configuration

Configuration is done in main.go:

// Primary MySQL/MariaDB connection (binlog source)
primaryDSN := "root:password@tcp(localhost:3306)/?multiStatements=true"

// Secondary (replica) connection
secondaryDSN := "root:password@tcp(localhost:3307)/replica?multiStatements=true"

// MariaDB primary configuration
primaryConfig := BinlogConfig{
    Host:     "localhost",
    Port:     3306,
    User:     "root",
    Password: "password",
    ServerID: 100,
    Name:     "mariadb-primary",
}

// Create service with both connections
service := NewBinlogSyncService(primaryConfig, primaryDB, secondaryDB)

// Start with automatic resync (checks if transfer is needed)
batchSize := 1000               // Rows per transfer chunk
excludeSchemas := []string{     // Schemas to skip during transfer
    "information_schema",
    "performance_schema",
    "mysql",
    "sys",
}
if err := service.StartWithResync(batchSize, excludeSchemas); err != nil {
    log.Fatalf("Service error: %v", err)
}

Resilience Features

Panic Recovery

All event handlers include automatic panic recovery:

defer func() {
    if r := recover(); r != nil {
        log.Printf("[PANIC RECOVERED] %v", r)
    }
}()

This prevents a single malformed event from crashing the entire service. Recovery wrappers are installed in BinlogSyncService.processEvent and in each EventHandlers handler (see the Data Flow diagram above).

Retry Logic

Failed SQL operations are retried with exponential backoff:

func (h *EventHandlers) executeWithRetry(query string) error {
    var lastErr error
    for attempt := 0; attempt <= h.retryAttempts; attempt++ {
        if attempt > 0 {
            // Exponential backoff: retryDelay * 2^attempt
            delay := h.retryDelay * time.Duration(1<<attempt)
            log.Printf("[RETRY] Retrying in %v (attempt %d/%d)", delay, attempt, h.retryAttempts)
            time.Sleep(delay)
        }
        _, err := h.secondaryDB.Exec(query)
        if err == nil {
            return nil
        }
        lastErr = err
        // Reconnect before the next attempt if this looks like a connection error
        if h.isConnectionError(err) {
            h.reconnect()
        }
    }
    return lastErr
}

Configuration:

  • retryAttempts: Number of retry attempts (default: 3)
  • retryDelay: Base delay between retries (default: 100ms)

Connection Health Checks

The service performs periodic health checks every 30 seconds:

// Checks:
// 1. Ping secondary database connection
// 2. Verify syncer is still active
// 3. Log health status

If a health check fails, the error is logged. The next SQL operation will trigger an auto-reconnect.
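
A minimal sketch of that periodic check, assuming a ticker-driven loop (the 30s interval is from this README; everything else is illustrative):

func (s *BinlogSyncService) healthCheckLoop(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := s.secondaryDB.Ping(); err != nil {
                // Reconnection happens lazily on the next SQL operation.
                log.Printf("[HEALTH CHECK] Failed: %v", err)
                continue
            }
            log.Printf("[HEALTH CHECK] OK")
        }
    }
}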

Schema Drift Detection

Before applying events, the service checks for schema changes:

func (h *EventHandlers) detectSchemaDrift(schema, table string) bool {
    currentHash, _ := h.getSchemaHash(schema, table)
    lastHash := h.lastSchemaHash[schema+"."+table]
    
    if lastHash != "" && lastHash != currentHash {
        log.Printf("[DRIFT] Schema changed for %s.%s", schema, table)
        return true  // Drift detected
    }
    h.lastSchemaHash[schema+"."+table] = currentHash
    return false
}

If schema drift is detected, the table is marked as failed and temporarily skipped.
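
One plausible way to compute the schema hash used above is to hash the column definitions reported by information_schema. This sketch is an assumption (it needs crypto/sha256 and encoding/hex), not the exact implementation in handlers.go:

func (h *EventHandlers) getSchemaHash(schema, table string) (string, error) {
    rows, err := h.secondaryDB.Query(
        `SELECT COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE
           FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
          ORDER BY ORDINAL_POSITION`, schema, table)
    if err != nil {
        return "", err
    }
    defer rows.Close()

    hash := sha256.New()
    for rows.Next() {
        var name, colType, nullable string
        if err := rows.Scan(&name, &colType, &nullable); err != nil {
            return "", err
        }
        fmt.Fprintf(hash, "%s|%s|%s;", name, colType, nullable)
    }
    return hex.EncodeToString(hash.Sum(nil)), rows.Err()
}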

Graceful Degradation

Tables with repeated failures are temporarily skipped:

// After 5 consecutive failures, skip the table
if h.failedTables[key] >= h.maxFailures {
    log.Printf("[SKIPPED] Too many failures for %s.%s", schema, table)
    return nil // Skip this event
}

// Reset count on successful operation
h.failedTables[key] = 0

Configuration:

  • maxFailures: Consecutive failures before skipping (default: 5)

Auto-Reconnect

Connection errors trigger automatic reconnection:

func (h *EventHandlers) reconnect() {
    h.secondaryDB.Close()

    maxRetries := 5
    for i := 0; i < maxRetries; i++ {
        db, err := sql.Open("mysql", dsn) // dsn: secondary connection string
        if err == nil {
            // Configure the connection pool, then ping to verify
            db.SetMaxOpenConns(25)
            if err = db.Ping(); err == nil {
                h.secondaryDB = db
                log.Printf("[RECONNECT] Successfully reconnected")
                return
            }
        }
        time.Sleep(time.Duration(i+1) * time.Second)
    }
}

Detected connection errors (matched as in the sketch after this list):

  • "connection refused"
  • "connection reset"
  • "broken pipe"
  • "timeout"
  • "driver: bad connection"
  • "invalid connection"

Initial Transfer

When resync is needed (empty replica or no saved position), the service performs an initial data transfer:

Transfer Process

  1. Detection: Check if secondary database is empty or no position saved
  2. Database Enumeration: List all databases from primary
  3. Schema Exclusion: Skip excluded schemas (information_schema, mysql, etc.)
  4. Table Transfer: For each table:
    • Get table schema (column definitions)
    • Check row count
    • Transfer in chunks using primary key or LIMIT/OFFSET
  5. Progress Checkpointing: Save progress to JSON file every 1000 rows
  6. Position Reset: Clear saved binlog position after successful transfer
  7. Binlog Streaming: Start streaming from current position

Chunked Transfer

Tables are transferred in chunks for efficiency and memory safety:

-- Using primary key (efficient, preserves order)
SELECT * FROM table WHERE pk >= 1000 AND pk < 2000 ORDER BY pk

-- Without primary key (slower, may skip rows on updates)
SELECT * FROM table LIMIT 1000 OFFSET 1000

Batch Size: Configurable (default: 1000 rows per chunk)
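
As an illustration only (helper names such as copyRows are hypothetical, not the actual implementation), the primary-key variant of the chunk loop could look like this:

func (t *InitialTransfer) transferByPK(schema, table, pkCol string, minPK, maxPK int64) error {
    for start := minPK; start <= maxPK; start += int64(t.batchSize) {
        end := start + int64(t.batchSize)
        query := fmt.Sprintf(
            "SELECT * FROM `%s`.`%s` WHERE `%s` >= ? AND `%s` < ? ORDER BY `%s`",
            schema, table, pkCol, pkCol, pkCol)
        rows, err := t.primaryDB.Query(query, start, end)
        if err != nil {
            return err
        }
        // copyRows (hypothetical) inserts the chunk into the secondary as one batch.
        if err := t.copyRows(schema, table, rows); err != nil {
            rows.Close()
            return err
        }
        rows.Close()
    }
    return nil
}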

Progress Checkpointing

Transfer progress is saved to transfer_progress_{instance}.json:

{
  "DatabasesProcessed": 2,
  "CurrentDatabase": "mydb",
  "TablesProcessed": {
    "mydb.users": 5000,
    "mydb.orders": 10000
  },
  "LastCheckpoint": "2024-01-15T10:30:00Z"
}

If the transfer is interrupted, it resumes from the last checkpoint.
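
Writing the checkpoint can be as simple as serializing TransferProgress to the checkpoint file; a minimal sketch, assuming JSON via encoding/json and os.WriteFile:

func (t *InitialTransfer) saveCheckpoint() error {
    t.progress.LastCheckpoint = time.Now().UTC()
    data, err := json.MarshalIndent(t.progress, "", "  ")
    if err != nil {
        return err
    }
    // Written every 1000 rows, per the transfer process above.
    return os.WriteFile(t.checkpointFile, data, 0o644)
}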

Pause/Resume

Transfers can be paused and resumed programmatically:

transfer := NewInitialTransfer(primaryDSN, secondaryDSN, 1000, 1) // batchSize=1000, workerCount=1

// Pause during transfer
transfer.Pause()

// Resume later
transfer.Resume()
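
One way Pause/Resume could work internally is an atomic flag checked between chunks; this is an assumption for illustration (the paused field is not listed in the InitialTransfer struct in the Configuration Reference):

func (t *InitialTransfer) Pause()  { atomic.StoreInt32(&t.paused, 1) } // paused: hypothetical int32 field
func (t *InitialTransfer) Resume() { atomic.StoreInt32(&t.paused, 0) }

// Called between chunks so a paused transfer stops at a chunk boundary.
func (t *InitialTransfer) waitIfPaused() {
    for atomic.LoadInt32(&t.paused) == 1 {
        time.Sleep(500 * time.Millisecond)
    }
}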

Configuration Reference

BinlogConfig

type BinlogConfig struct {
    Host     string // MySQL/MariaDB host
    Port     int    // MySQL/MariaDB port
    User     string // Replication user
    Password string // Password
    ServerID uint32 // Unique server ID for replication
    Name     string // Instance name for logging
}
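
This maps naturally onto the go-mysql replication syncer; a sketch, assuming that library (the Flavor value is an assumption based on the MariaDB primary used above):

cfg := replication.BinlogSyncerConfig{
    ServerID: primaryConfig.ServerID,
    Flavor:   "mariadb", // or "mysql"
    Host:     primaryConfig.Host,
    Port:     uint16(primaryConfig.Port),
    User:     primaryConfig.User,
    Password: primaryConfig.Password,
}
syncer := replication.NewBinlogSyncer(cfg)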

EventHandlers

type EventHandlers struct {
    secondaryDB    *sql.DB
    tableMapCache   map[uint64]*replication.TableMapEvent
    sqlBuilder      *SQLBuilder
    failedTables    map[string]int
    maxFailures     int              // Default: 5
    retryAttempts   int              // Default: 3
    retryDelay      time.Duration    // Default: 100ms
    lastSchemaHash  map[string]string
}

InitialTransfer

type InitialTransfer struct {
    primaryDB      *sql.DB
    secondaryDB    *sql.DB
    batchSize      int               // Default: 1000
    workerCount    int               // Default: 1
    excludedDBs    map[string]bool
    checkpointFile string
    progress       TransferProgress
}

TransferProgress

type TransferProgress struct {
    DatabasesProcessed int
    CurrentDatabase    string
    TablesProcessed    map[string]int64  // "schema.table" -> rows transferred
    LastCheckpoint     time.Time
}

TransferStats

type TransferStats struct {
    TotalRows    int64
    TotalTables  int
    TransferTime int64  // milliseconds
    Errors       []string
    Progress     TransferProgress
}

Logging

The service uses structured logging with prefixes:

Prefix       Example                                        Description
[INSERT]     [INSERT] 1 row(s) affected                     Successful INSERT
[UPDATE]     [UPDATE] 1 row(s) affected                     Successful UPDATE
[DELETE]     [DELETE] 1 row(s) affected                     Successful DELETE
[SUCCESS]    [SUCCESS] 1 row(s) affected                    SQL operation succeeded
[ERROR]      [ERROR] INSERT failed after retries            Operation failed
[WARN]       [WARN] Schema drift detected                   Warning condition
[PANIC]      [PANIC RECOVERED] panic message                Recovered panic
[RETRY]      [RETRY] Retrying in 200ms                      Retry attempt
[DRIFT]      [DRIFT] Schema changed                         Schema drift detected
[SKIPPED]    [SKIPPED] Too many failures                    Table skipped
[FAILURE]    [FAILURE] failure count: 3/5                   Table failure count
[HEALTH]     [HEALTH CHECK] Failed                          Health check result
[TRANSFER]   [TRANSFER] Found 5 tables                      Transfer progress
[INFO]       [INFO] Saved position                          Informational
[ROTATE]     [mariadb-primary] Rotated to binlog.000001     Binlog rotation
[RECONNECT]  [RECONNECT] Successfully reconnected           Reconnection result

Schema Filtering

By default, only events for the "replica" schema are replicated:

// handlers.go line 54
if schemaName != "replica" {
    return nil  // Skip all other schemas
}

To replicate all schemas, comment out or modify this filter.
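
For example, a multi-schema allow-list variant of the filter could look like this (the second schema name is illustrative):

allowedSchemas := map[string]bool{"replica": true, "app": true}
if !allowedSchemas[schemaName] {
    return nil // Skip schemas that are not in the allow-list
}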

Error Handling

Common Errors

Error                              Cause                         Resolution
connection refused                 Secondary DB down             Check secondary DB status
schema drift detected              Table schema changed          Manual intervention required
too many failures                  Table repeatedly failing      Check table compatibility
failed to get event                Binlog stream interrupted     Service auto-recovers
expected 4 destination arguments   SHOW MASTER STATUS columns    Fixed in code
assignment to entry in nil map     Uninitialized map             Fixed in code

Recovery Procedures

  1. Secondary DB Connection Lost:
     • Service auto-reconnects (up to 5 attempts)
     • Check secondary DB logs
     • Verify network connectivity
  2. Schema Drift Detected:
     • Stop service
     • Compare schemas: SHOW CREATE TABLE schema.table on both DBs
     • Sync schemas manually
     • Reset position: DELETE FROM binlog_position
     • Restart service
  3. Transfer Interrupted:
     • Service resumes from transfer_progress_{instance}.json
     • Check checkpoint file for progress
     • Delete checkpoint file to restart transfer
  4. Event Processing Stuck:
     • Check health check logs
     • Verify binlog position: SELECT * FROM binlog_position
     • Restart service if needed
     • Clear position: DELETE FROM binlog_position

Monitoring

Key Metrics to Watch

  • Replication Lag: Time since last event processed
  • Rows Replicated: Total INSERT/UPDATE/DELETE count
  • Tables Synced: Number of tables synchronized
  • Error Count: Failed operations counter
  • Success Rate: Successful / total operations

Manual Verification

-- Check position on secondary
SELECT * FROM binlog_position;

-- Check row counts
SELECT COUNT(*) FROM replica.your_table;

-- Compare with primary
SELECT COUNT(*) FROM your_table;

Performance Considerations

  1. Batch Size: Start with 1000, adjust based on table size and memory
  2. Connection Pooling: SetMaxOpenConns(25) for moderate load (see the sketch after this list)
  3. Worker Count: Currently single-threaded (multi-worker planned)
  4. Schema Caching: Table schemas cached in memory (auto-updated on drift)
  5. Index Usage: Chunked transfers require indexed primary key
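
The pooling suggestion above, expressed as Go settings on the secondary connection (the idle and lifetime values are illustrative starting points, not part of the current code):

secondaryDB.SetMaxOpenConns(25)                  // per the recommendation above
secondaryDB.SetMaxIdleConns(5)                   // assumption: small idle pool
secondaryDB.SetConnMaxLifetime(30 * time.Minute) // assumption: recycle connections periodically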

Limitations

  • Single-threaded: One worker processes events sequentially
  • Position-based: No GTID support yet (position-based only)
  • Integer PKs: Chunking requires integer primary key for efficiency
  • No Conflict Resolution: Concurrent writes not handled
  • No Data Validation: Assumes source data is valid

Known Issues & Fixes

Fixed Issues

  • SHOW MASTER STATUS returning 4 columns (fixed by scanning 4 variables)
  • nil map panic in transfer (fixed by initializing TablesProcessed)
  • Event deduplication skipping valid events (disabled deduplication)
  • Checkpoint file path empty (fixed by setting instance-specific path)

Workarounds

  • Position 0 Events: Deduplication disabled for now
  • Schema Changes: Service marks table as failed, manual restart required

Future Enhancements

  • GTID-based replication for better consistency
  • Multi-threaded replication for higher throughput
  • Conflict detection and resolution
  • Prometheus metrics endpoint
  • REST API for management
  • Kubernetes operator
  • Configuration file (YAML/JSON)
  • Signal handling for dynamic config

File Structure

/home/marek/coding/test/replica/
├── main.go                    # Entry point, connections
├── service.go                 # Replication orchestration
├── handlers.go                # Event processing
├── initial_transfer.go        # Bulk data transfer
├── position.go                # Position persistence
├── sqlbuilder.go              # SQL generation
├── config.go                  # Config types
├── README.md                  # This file
├── docker-compose.yml         # Local development
├── .env                       # Environment (optional)
└── go.mod                     # Go module

License

MIT License
