first commit
pkg/replica/config.go (new file, 16 lines)
@@ -0,0 +1,16 @@
package replica

// BinlogConfig holds the configuration for connecting to MySQL/MariaDB binlog
type BinlogConfig struct {
    Host     string
    Port     uint16
    User     string
    Password string
    ServerID uint32
    Name     string // Instance name for logging
}

// MultiSourceConfig holds configuration for multiple MariaDB instances
type MultiSourceConfig struct {
    Instances []BinlogConfig
}
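A minimal wiring sketch for these types; the module import path, credentials, and server IDs below are illustrative assumptions, not part of this commit:

package main

import (
    "fmt"

    "example.com/binlog-sync/pkg/replica" // hypothetical module path
)

func main() {
    cfg := replica.MultiSourceConfig{
        Instances: []replica.BinlogConfig{
            {
                Host:     "127.0.0.1",
                Port:     3306,
                User:     "repl",
                Password: "secret",
                ServerID: 1001, // must be unique per replication client
                Name:     "primary-a",
            },
        },
    }
    for _, inst := range cfg.Instances {
        fmt.Printf("instance %s -> %s:%d (server_id=%d)\n",
            inst.Name, inst.Host, inst.Port, inst.ServerID)
    }
}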
pkg/replica/handlers.go (new file, 413 lines)
@@ -0,0 +1,413 @@
package replica

import (
    "database/sql"
    "fmt"
    "strings"
    "sync"
    "time"

    "github.com/go-mysql-org/go-mysql/replication"
)

// EventHandlers handles binlog event processing with resilience features
type EventHandlers struct {
    secondaryDB    *sql.DB
    tableMapCache  map[uint64]*replication.TableMapEvent
    tableMapMu     sync.RWMutex
    sqlBuilder     *SQLBuilder
    failedTables   map[string]int // tableName -> consecutive failure count
    failedTablesMu sync.RWMutex
    maxFailures    int               // max consecutive failures before skipping
    retryAttempts  int               // number of retry attempts
    retryDelay     time.Duration     // base retry delay
    lastSchemaHash map[string]string // tableName -> schema hash
    lastSchemaMu   sync.RWMutex
}

// NewEventHandlers creates new event handlers with default resilience settings
func NewEventHandlers(secondaryDB *sql.DB) *EventHandlers {
    return &EventHandlers{
        secondaryDB:    secondaryDB,
        tableMapCache:  make(map[uint64]*replication.TableMapEvent),
        sqlBuilder:     NewSQLBuilder(),
        failedTables:   make(map[string]int),
        maxFailures:    5,
        retryAttempts:  3,
        retryDelay:     100 * time.Millisecond,
        lastSchemaHash: make(map[string]string),
    }
}

// HandleRows processes row-level events with panic recovery and retry logic
func (h *EventHandlers) HandleRows(header *replication.EventHeader, e *replication.RowsEvent) error {
    // Panic recovery wrapper
    defer func() {
        if r := recover(); r != nil {
            Errorf("[PANIC RECOVERED] HandleRows panic: %v", r)
        }
    }()

    tableName := string(e.Table.Table)
    schemaName := string(e.Table.Schema)

    if schemaName != "replica" {
        return nil
    }

    // Check if table is temporarily skipped due to failures
    if h.isTableSkipped(schemaName, tableName) {
        Warnf("[SKIPPED] Skipping event for %s.%s (too many failures)", schemaName, tableName)
        return nil
    }

    eventType := h.getEventTypeName(header.EventType)
    Infof("[%s] %s.%s", eventType, schemaName, tableName)

    if h.secondaryDB != nil {
        // Schema drift detection
        if h.detectSchemaDrift(schemaName, tableName) {
            Warnf("[WARN] Schema drift detected for %s.%s, pausing replication", schemaName, tableName)
            h.markTableFailed(schemaName, tableName)
            return fmt.Errorf("schema drift detected")
        }

        h.handleSecondaryReplication(e, header.EventType)
    }

    return nil
}

// HandleQuery processes query events with panic recovery
func (h *EventHandlers) HandleQuery(e *replication.QueryEvent) error {
    // Panic recovery wrapper
    defer func() {
        if r := recover(); r != nil {
            Errorf("[PANIC RECOVERED] HandleQuery panic: %v", r)
        }
    }()

    query := string(e.Query)
    if h.secondaryDB != nil {
        _, err := h.secondaryDB.Exec(query)
        if err != nil {
            Errorf("[ERROR] Query failed: %v", err)
        }
    }
    return nil
}

// HandleTableMap caches table map events
func (h *EventHandlers) HandleTableMap(e *replication.TableMapEvent) {
    // Panic recovery wrapper
    defer func() {
        if r := recover(); r != nil {
            Errorf("[PANIC RECOVERED] HandleTableMap panic: %v", r)
        }
    }()

    // 8-bit rotation of the table ID; the same transform is applied on lookup
    // in handleSecondaryReplication, so cache writes and reads stay consistent
    tableID := (e.TableID << 8) | (e.TableID >> 56)
    h.tableMapMu.Lock()
    h.tableMapCache[tableID] = e
    h.tableMapMu.Unlock()
}

// GetTableMap returns the cached table map for a table ID
func (h *EventHandlers) GetTableMap(tableID uint64) *replication.TableMapEvent {
    h.tableMapMu.RLock()
    defer h.tableMapMu.RUnlock()
    return h.tableMapCache[tableID]
}

// isTableSkipped checks if a table should be skipped due to too many failures
func (h *EventHandlers) isTableSkipped(schema, table string) bool {
    key := schema + "." + table
    h.failedTablesMu.RLock()
    defer h.failedTablesMu.RUnlock()
    return h.failedTables[key] >= h.maxFailures
}

// markTableFailed records a failure for a table
func (h *EventHandlers) markTableFailed(schema, table string) {
    key := schema + "." + table
    h.failedTablesMu.Lock()
    h.failedTables[key]++
    failCount := h.failedTables[key]
    h.failedTablesMu.Unlock()

    Warnf("[FAILURE] %s.%s failure count: %d/%d", schema, table, failCount, h.maxFailures)
}

// markTableSuccess records a successful operation for a table
func (h *EventHandlers) markTableSuccess(schema, table string) {
    key := schema + "." + table
    h.failedTablesMu.Lock()
    h.failedTables[key] = 0 // Reset failure count on success
    h.failedTablesMu.Unlock()
}

// detectSchemaDrift checks if the table schema has changed
func (h *EventHandlers) detectSchemaDrift(schema, table string) bool {
    key := schema + "." + table

    // Get current schema hash
    currentHash, err := h.getSchemaHash(schema, table)
    if err != nil {
        Warnf("[WARN] Could not get schema hash for %s.%s: %v", schema, table, err)
        return false
    }

    h.lastSchemaMu.RLock()
    lastHash, exists := h.lastSchemaHash[key]
    h.lastSchemaMu.RUnlock()

    if !exists {
        // First time seeing this table
        h.lastSchemaMu.Lock()
        h.lastSchemaHash[key] = currentHash
        h.lastSchemaMu.Unlock()
        return false
    }

    if lastHash != currentHash {
        Warnf("[DRIFT] Schema changed for %s.%s: %s -> %s", schema, table, lastHash, currentHash)
        return true
    }

    return false
}

// getSchemaHash returns a hash of the table schema
func (h *EventHandlers) getSchemaHash(schema, table string) (string, error) {
    // Parameterized to avoid interpolating names into the query text
    query := "SELECT MD5(GROUP_CONCAT(COLUMN_NAME, ':', DATA_TYPE, ':', IS_NULLABLE ORDER BY ORDINAL_POSITION)) " +
        "FROM INFORMATION_SCHEMA.COLUMNS " +
        "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"

    var hash string
    err := h.secondaryDB.QueryRow(query, schema, table).Scan(&hash)
    return hash, err
}

func (h *EventHandlers) handleSecondaryReplication(e *replication.RowsEvent, eventType replication.EventType) {
    schemaName := string(e.Table.Schema)
    tableName := string(e.Table.Table)

    // Same 8-bit rotation used by HandleTableMap when caching
    tableID := (e.TableID << 8) | (e.TableID >> 56)
    tableMap := h.GetTableMap(tableID)

    if tableMap == nil {
        tableMap = e.Table
    }

    if len(tableMap.ColumnName) == 0 {
        columns, err := h.fetchColumnNames(schemaName, tableName)
        if err != nil {
            Errorf("[ERROR] Failed to fetch columns: %v", err)
            return
        }

        columnBytes := make([][]byte, len(columns))
        for i, col := range columns {
            columnBytes[i] = []byte(col)
        }

        tableMap = &replication.TableMapEvent{
            Schema:     e.Table.Schema,
            Table:      e.Table.Table,
            ColumnName: columnBytes,
        }
    }

    switch eventType {
    case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
        h.replicateInsert(tableMap, schemaName, tableName, e.Rows)
    case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
        h.replicateUpdate(tableMap, schemaName, tableName, e.Rows)
    case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
        h.replicateDelete(tableMap, schemaName, tableName, e.Rows)
    }
}

// replicateInsert inserts rows with retry logic
func (h *EventHandlers) replicateInsert(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) {
    for _, row := range rows {
        query := h.sqlBuilder.BuildInsert(schema, table, tableMap, row)

        err := h.executeWithRetry(query)
        if err != nil {
            Errorf("[ERROR] INSERT failed after retries: %v", err)
            h.markTableFailed(schema, table)
        } else {
            h.markTableSuccess(schema, table)
        }
    }
}

// replicateUpdate updates rows with retry logic; binlog update events carry
// before/after row image pairs, so rows are consumed two at a time (the i+1
// bound guards against a malformed odd-length slice)
func (h *EventHandlers) replicateUpdate(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) {
    for i := 0; i+1 < len(rows); i += 2 {
        query := h.sqlBuilder.BuildUpdate(schema, table, tableMap, rows[i], rows[i+1])

        err := h.executeWithRetry(query)
        if err != nil {
            Errorf("[ERROR] UPDATE failed after retries: %v", err)
            h.markTableFailed(schema, table)
        } else {
            h.markTableSuccess(schema, table)
        }
    }
}

// replicateDelete deletes rows with retry logic
func (h *EventHandlers) replicateDelete(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) {
    for _, row := range rows {
        query := h.sqlBuilder.BuildDelete(schema, table, tableMap, row)

        err := h.executeWithRetry(query)
        if err != nil {
            Errorf("[ERROR] DELETE failed after retries: %v", err)
            h.markTableFailed(schema, table)
        } else {
            h.markTableSuccess(schema, table)
        }
    }
}

// executeWithRetry executes a query with exponential backoff retry
func (h *EventHandlers) executeWithRetry(query string) error {
    var lastErr error

    for attempt := 0; attempt <= h.retryAttempts; attempt++ {
        if attempt > 0 {
            delay := h.retryDelay * time.Duration(1<<attempt) // exponential backoff
            Warnf("[RETRY] Retrying in %v (attempt %d/%d)", delay, attempt, h.retryAttempts)
            time.Sleep(delay)
        }

        result, err := h.secondaryDB.Exec(query)
        if err == nil {
            if result != nil {
                rowsAffected, _ := result.RowsAffected()
                Infof("[SUCCESS] %d row(s) affected", rowsAffected)
            }
            return nil
        }

        lastErr = err

        // Check if connection is dead
        if h.isConnectionError(err) {
            Errorf("[CONNECTION ERROR] Detected connection error: %v", err)
            h.reconnect()
        }
    }

    return lastErr
}

// isConnectionError checks if the error is a connection-related error
func (h *EventHandlers) isConnectionError(err error) bool {
    errStr := err.Error()
    connectionErrors := []string{
        "connection refused",
        "connection reset",
        "broken pipe",
        "timeout",
        "no such host",
        "network is unreachable",
        "driver: bad connection",
        "invalid connection",
    }

    for _, ce := range connectionErrors {
        if strings.Contains(errStr, ce) {
            return true
        }
    }
    return false
}

// reconnect attempts to reconnect to the secondary database
func (h *EventHandlers) reconnect() {
    Warn("[RECONNECT] Attempting to reconnect to secondary database...")
    // Close existing connections
    h.secondaryDB.Close()

    // Attempt to re-establish connection
    var err error
    maxRetries := 5
    for i := 0; i < maxRetries; i++ {
        h.secondaryDB, err = sql.Open("mysql", "root:replica@tcp(localhost:3307)/replica?multiStatements=true")
        if err == nil {
            h.secondaryDB.SetMaxOpenConns(25)
            h.secondaryDB.SetMaxIdleConns(5)
            if err = h.secondaryDB.Ping(); err == nil {
                Info("[RECONNECT] Successfully reconnected to secondary database")
                return
            }
        }
        Errorf("[RECONNECT] Reconnection attempt %d/%d failed: %v", i+1, maxRetries, err)
        time.Sleep(time.Duration(i+1) * time.Second)
    }
    Errorf("[RECONNECT] Failed to reconnect after %d attempts", maxRetries)
}

// PingSecondary performs a health check on the secondary connection
func (h *EventHandlers) PingSecondary() error {
    if h.secondaryDB == nil {
        return fmt.Errorf("secondary DB is nil")
    }
    return h.secondaryDB.Ping()
}

func (h *EventHandlers) fetchColumnNames(schema, table string) ([]string, error) {
    // Parameterized to avoid interpolating names into the query text
    query := "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS " +
        "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION"
    rows, err := h.secondaryDB.Query(query, schema, table)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var columns []string
    for rows.Next() {
        var colName string
        if err := rows.Scan(&colName); err != nil {
            return nil, err
        }
        columns = append(columns, colName)
    }

    return columns, rows.Err()
}

func (h *EventHandlers) getEventTypeName(eventType replication.EventType) string {
    switch eventType {
    case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
        return "INSERT"
    case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
        return "UPDATE"
    case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
        return "DELETE"
    default:
        return "UNKNOWN"
    }
}
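handlers.go depends on a SQLBuilder (NewSQLBuilder, BuildInsert, BuildUpdate, BuildDelete) that is not among this commit's visible files. Purely as an assumption about that missing piece, a minimal sketch matching the call sites could look like the following; it is illustrative only (do not compile it alongside the real builder), and a production version would need binlog-aware type handling and proper escaping:

package replica

import (
    "fmt"
    "strings"

    "github.com/go-mysql-org/go-mysql/replication"
)

// SQLBuilder sketch: renders row images from RowsEvents into SQL statements.
type SQLBuilder struct{}

func NewSQLBuilder() *SQLBuilder { return &SQLBuilder{} }

// literal renders a decoded binlog value as a SQL literal (naive escaping).
func (b *SQLBuilder) literal(v interface{}) string {
    switch x := v.(type) {
    case nil:
        return "NULL"
    case []byte:
        return "'" + strings.ReplaceAll(string(x), "'", "''") + "'"
    case string:
        return "'" + strings.ReplaceAll(x, "'", "''") + "'"
    default:
        return fmt.Sprintf("%v", x)
    }
}

// where builds an equality predicate over every column of a row image.
func (b *SQLBuilder) where(tm *replication.TableMapEvent, row []interface{}) string {
    parts := make([]string, 0, len(row))
    for i, v := range row {
        if v == nil {
            parts = append(parts, fmt.Sprintf("`%s` IS NULL", tm.ColumnName[i]))
        } else {
            parts = append(parts, fmt.Sprintf("`%s` = %s", tm.ColumnName[i], b.literal(v)))
        }
    }
    return strings.Join(parts, " AND ")
}

func (b *SQLBuilder) BuildInsert(schema, table string, tm *replication.TableMapEvent, row []interface{}) string {
    cols := make([]string, len(row))
    vals := make([]string, len(row))
    for i, v := range row {
        cols[i] = fmt.Sprintf("`%s`", tm.ColumnName[i])
        vals[i] = b.literal(v)
    }
    return fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s)",
        schema, table, strings.Join(cols, ", "), strings.Join(vals, ", "))
}

func (b *SQLBuilder) BuildUpdate(schema, table string, tm *replication.TableMapEvent, before, after []interface{}) string {
    sets := make([]string, len(after))
    for i, v := range after {
        sets[i] = fmt.Sprintf("`%s` = %s", tm.ColumnName[i], b.literal(v))
    }
    return fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s LIMIT 1",
        schema, table, strings.Join(sets, ", "), b.where(tm, before))
}

func (b *SQLBuilder) BuildDelete(schema, table string, tm *replication.TableMapEvent, row []interface{}) string {
    return fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s LIMIT 1",
        schema, table, b.where(tm, row))
}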
pkg/replica/initial_transfer.go (new file, 635 lines)
@@ -0,0 +1,635 @@
package replica

import (
    "database/sql"
    "encoding/json"
    "fmt"
    "os"
    "strings"
    "sync"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

// TransferStats holds statistics about the transfer
type TransferStats struct {
    TotalRows    int64
    TotalTables  int
    TransferTime int64 // in milliseconds
    Errors       []string
    Progress     TransferProgress
}

// TransferProgress tracks the current position in the transfer
type TransferProgress struct {
    DatabasesProcessed int
    CurrentDatabase    string
    TablesProcessed    map[string]int64 // tableName -> rows transferred
    LastCheckpoint     time.Time
}

// InitialTransfer handles the initial data transfer from primary to secondary
type InitialTransfer struct {
    primaryDB      *sql.DB
    secondaryDB    *sql.DB
    batchSize      int
    workerCount    int
    excludedDBs    map[string]bool
    mu             sync.Mutex
    stats          TransferStats
    checkpointFile string
    progress       TransferProgress
    pauseChan      chan struct{}
    resumeChan     chan struct{}
    isPaused       bool
}

// NewInitialTransfer creates a new initial transfer handler
func NewInitialTransfer(primaryDSN, secondaryDSN string, batchSize, workerCount int) (*InitialTransfer, error) {
    primaryDB, err := sql.Open("mysql", primaryDSN)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to primary: %v", err)
    }
    primaryDB.SetMaxOpenConns(batchSize)
    primaryDB.SetMaxIdleConns(2)

    secondaryDB, err := sql.Open("mysql", secondaryDSN)
    if err != nil {
        primaryDB.Close()
        return nil, fmt.Errorf("failed to connect to secondary: %v", err)
    }
    secondaryDB.SetMaxOpenConns(batchSize)
    secondaryDB.SetMaxIdleConns(2)

    if err := primaryDB.Ping(); err != nil {
        primaryDB.Close()
        secondaryDB.Close()
        return nil, fmt.Errorf("failed to ping primary: %v", err)
    }

    if err := secondaryDB.Ping(); err != nil {
        primaryDB.Close()
        secondaryDB.Close()
        return nil, fmt.Errorf("failed to ping secondary: %v", err)
    }

    return &InitialTransfer{
        primaryDB:   primaryDB,
        secondaryDB: secondaryDB,
        batchSize:   batchSize,
        workerCount: workerCount,
        excludedDBs: map[string]bool{
            "information_schema": true,
            "performance_schema": true,
            "mysql":              true,
            "sys":                true,
        },
        checkpointFile: "transfer_progress.json",
        progress: TransferProgress{
            TablesProcessed: make(map[string]int64),
        },
        pauseChan:  make(chan struct{}),
        resumeChan: make(chan struct{}),
    }, nil
}

// Transfer executes the initial data transfer
func (t *InitialTransfer) Transfer(excludeSchemas []string) error {
    startTime := time.Now()

    // Load previous progress if exists
    t.loadProgress()

    // Add user-specified exclusions
    for _, db := range excludeSchemas {
        t.excludedDBs[db] = true
    }

    // Get list of databases to transfer
    databases, err := t.getDatabases()
    if err != nil {
        return fmt.Errorf("failed to get databases: %v", err)
    }

    Info("[TRANSFER] Starting initial data transfer...")
    Infof("[TRANSFER] Found %d databases to transfer", len(databases))

    // Get current binlog position before transfer
    binlogPos, err := t.getBinlogPosition()
    if err != nil {
        Warnf("[WARN] Failed to get binlog position: %v", err)
    }

    for i, dbName := range databases {
        // Check for pause signal
        t.checkPause()

        if t.excludedDBs[dbName] {
            Infof("[TRANSFER] Skipping excluded database: %s", dbName)
            continue
        }

        // Skip already processed databases
        if i < t.progress.DatabasesProcessed {
            Infof("[TRANSFER] Skipping already processed database: %s", dbName)
            continue
        }

        t.progress.CurrentDatabase = dbName
        t.saveProgress()

        if err := t.transferDatabase(dbName); err != nil {
            return fmt.Errorf("failed to transfer database %s: %v", dbName, err)
        }

        t.progress.DatabasesProcessed++
        t.saveProgress()
    }

    t.stats.TransferTime = time.Since(startTime).Milliseconds()

    Infof("[TRANSFER] Transfer completed: %d tables, %d rows in %dms",
        t.stats.TotalTables, t.stats.TotalRows, t.stats.TransferTime)

    if binlogPos != "" {
        Infof("[TRANSFER] Binlog position before transfer: %s", binlogPos)
    }

    // Clear progress on successful completion
    t.clearProgress()

    return nil
}

// checkPause checks if transfer should be paused
func (t *InitialTransfer) checkPause() {
    if t.isPaused {
        Info("[TRANSFER] Transfer paused, waiting for resume...")
        <-t.resumeChan
        Info("[TRANSFER] Transfer resumed")
    }
}

// Pause pauses the transfer; checkPause blocks at the next checkpoint
func (t *InitialTransfer) Pause() {
    if !t.isPaused {
        t.isPaused = true
        // Non-blocking send: nothing currently receives on pauseChan, and a
        // bare send here would deadlock the caller; the isPaused flag is what
        // checkPause inspects
        select {
        case t.pauseChan <- struct{}{}:
        default:
        }
        Info("[TRANSFER] Transfer pause requested")
    }
}

// Resume resumes the transfer
func (t *InitialTransfer) Resume() {
    if t.isPaused {
        t.isPaused = false
        // Non-blocking send: if checkPause is not yet waiting, the cleared
        // isPaused flag alone prevents it from blocking
        select {
        case t.resumeChan <- struct{}{}:
        default:
        }
        Info("[TRANSFER] Transfer resume requested")
    }
}

// saveProgress saves the current progress to a checkpoint file
func (t *InitialTransfer) saveProgress() {
    t.progress.LastCheckpoint = time.Now()

    data, err := json.MarshalIndent(t.progress, "", "  ")
    if err != nil {
        Warnf("[WARN] Failed to marshal progress: %v", err)
        return
    }

    err = os.WriteFile(t.checkpointFile, data, 0644)
    if err != nil {
        Warnf("[WARN] Failed to save progress: %v", err)
    }
}

// loadProgress loads previous progress from checkpoint file
func (t *InitialTransfer) loadProgress() {
    data, err := os.ReadFile(t.checkpointFile)
    if err != nil {
        Info("[INFO] No previous progress found, starting fresh")
        return
    }

    var progress TransferProgress
    if err := json.Unmarshal(data, &progress); err != nil {
        Warnf("[WARN] Failed to load progress: %v", err)
        return
    }

    t.progress = progress
    Infof("[INFO] Resuming transfer: %d databases, %d tables in progress",
        progress.DatabasesProcessed, len(progress.TablesProcessed))
}

// clearProgress removes the checkpoint file
func (t *InitialTransfer) clearProgress() {
    os.Remove(t.checkpointFile)
}

// GetProgress returns the current transfer progress
func (t *InitialTransfer) GetProgress() TransferProgress {
    t.mu.Lock()
    defer t.mu.Unlock()
    return t.progress
}

// getDatabases returns list of databases to transfer
func (t *InitialTransfer) getDatabases() ([]string, error) {
    rows, err := t.primaryDB.Query("SHOW DATABASES")
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var databases []string
    for rows.Next() {
        var dbName string
        if err := rows.Scan(&dbName); err != nil {
            return nil, err
        }
        databases = append(databases, dbName)
    }

    return databases, rows.Err()
}

// transferDatabase transfers all tables in a database
func (t *InitialTransfer) transferDatabase(dbName string) error {
    Infof("[TRANSFER] Transferring database: %s", dbName)

    // Get list of tables
    tables, err := t.getTables(dbName)
    if err != nil {
        return fmt.Errorf("failed to get tables: %v", err)
    }

    Infof("[TRANSFER] Found %d tables in %s", len(tables), dbName)

    for _, table := range tables {
        // Check for pause signal
        t.checkPause()

        // Skip already processed tables
        tableKey := dbName + "." + table
        if t.progress.TablesProcessed[tableKey] > 0 {
            Infof("[TRANSFER] Skipping already processed table: %s", tableKey)
            continue
        }

        if err := t.transferTable(dbName, table); err != nil {
            Errorf("[ERROR] Failed to transfer table %s.%s: %v", dbName, table, err)
            t.mu.Lock()
            t.stats.Errors = append(t.stats.Errors, fmt.Sprintf("%s.%s: %v", dbName, table, err))
            t.mu.Unlock()
        } else {
            // Count completed tables here, once per table rather than once
            // per inserted chunk
            t.mu.Lock()
            t.stats.TotalTables++
            t.mu.Unlock()
        }
    }

    return nil
}

// getTables returns list of tables in a database
func (t *InitialTransfer) getTables(dbName string) ([]string, error) {
    query := fmt.Sprintf("SHOW TABLES FROM `%s`", dbName)
    rows, err := t.primaryDB.Query(query)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var tables []string
    for rows.Next() {
        var tableName string
        if err := rows.Scan(&tableName); err != nil {
            return nil, err
        }
        tables = append(tables, tableName)
    }

    return tables, rows.Err()
}

// transferTable transfers a single table with chunked reads
func (t *InitialTransfer) transferTable(dbName, tableName string) error {
    // Get table structure
    schema, err := t.getTableSchema(dbName, tableName)
    if err != nil {
        return fmt.Errorf("failed to get table schema: %v", err)
    }

    // Check if table has data
    hasData, err := t.tableHasData(dbName, tableName)
    if err != nil {
        return fmt.Errorf("failed to check table data: %v", err)
    }

    if !hasData {
        Infof("[TRANSFER] Table %s.%s is empty, skipping data transfer", dbName, tableName)
        return nil
    }

    // Get row count
    count, err := t.getRowCount(dbName, tableName)
    if err != nil {
        return fmt.Errorf("failed to get row count: %v", err)
    }

    Infof("[TRANSFER] Transferring %s.%s (%d rows)", dbName, tableName, count)

    // Get primary key or first unique column for chunking
    pkColumn, err := t.getPrimaryKey(dbName, tableName)
    if err != nil {
        Warnf("[WARN] No primary key found for %s.%s, using full scan", dbName, tableName)
        return t.transferTableFullScan(dbName, tableName, schema, count)
    }

    return t.transferTableChunked(dbName, tableName, schema, pkColumn, count)
}

// tableHasData checks if a table has any rows
func (t *InitialTransfer) tableHasData(dbName, tableName string) (bool, error) {
    query := fmt.Sprintf("SELECT 1 FROM `%s`.`%s` LIMIT 1", dbName, tableName)
    var exists int
    err := t.primaryDB.QueryRow(query).Scan(&exists)
    if err == sql.ErrNoRows {
        return false, nil
    }
    if err != nil {
        return false, err
    }
    return true, nil
}

// getRowCount returns the number of rows in a table
func (t *InitialTransfer) getRowCount(dbName, tableName string) (int64, error) {
    query := fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s`", dbName, tableName)
    var count int64
    err := t.primaryDB.QueryRow(query).Scan(&count)
    return count, err
}

// getTableSchema returns the column definitions for a table
func (t *InitialTransfer) getTableSchema(dbName, tableName string) ([]ColumnInfo, error) {
    // Parameterized to avoid interpolating names into the query text
    query := "SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_KEY, EXTRA " +
        "FROM INFORMATION_SCHEMA.COLUMNS " +
        "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " +
        "ORDER BY ORDINAL_POSITION"

    rows, err := t.primaryDB.Query(query, dbName, tableName)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var columns []ColumnInfo
    for rows.Next() {
        var col ColumnInfo
        if err := rows.Scan(&col.Name, &col.Type, &col.Nullable, &col.Key, &col.Extra); err != nil {
            return nil, err
        }
        columns = append(columns, col)
    }

    return columns, rows.Err()
}

// ColumnInfo holds information about a table column
type ColumnInfo struct {
    Name     string
    Type     string
    Nullable string
    Key      string
    Extra    string
}

// getPrimaryKey returns the primary key column for a table
func (t *InitialTransfer) getPrimaryKey(dbName, tableName string) (string, error) {
    // Parameterized to avoid interpolating names into the query text
    query := "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE " +
        "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND CONSTRAINT_NAME = 'PRIMARY' " +
        "ORDER BY ORDINAL_POSITION LIMIT 1"

    var pkColumn string
    err := t.primaryDB.QueryRow(query, dbName, tableName).Scan(&pkColumn)
    if err == sql.ErrNoRows {
        return "", fmt.Errorf("no primary key")
    }
    return pkColumn, err
}

// transferTableChunked transfers a table using primary key chunks
func (t *InitialTransfer) transferTableChunked(dbName, tableName string, columns []ColumnInfo, pkColumn string, rowCount int64) error {
    tableKey := dbName + "." + tableName

    // Get starting offset from progress
    startOffset := t.progress.TablesProcessed[tableKey]

    // Get min and max primary key values
    minMax, err := t.getMinMaxPK(dbName, tableName, pkColumn)
    if err != nil {
        return fmt.Errorf("failed to get min/max: %v", err)
    }

    // Adjust start offset to be within range
    if startOffset < minMax.Min {
        startOffset = minMax.Min
    }

    // Transfer chunks
    offset := startOffset
    for offset < minMax.Max {
        // Check for pause signal
        t.checkPause()

        query := fmt.Sprintf(
            "SELECT * FROM `%s`.`%s` WHERE `%s` >= %d AND `%s` < %d ORDER BY `%s`",
            dbName, tableName, pkColumn, offset, pkColumn, offset+int64(t.batchSize), pkColumn,
        )

        rows, err := t.primaryDB.Query(query)
        if err != nil {
            return fmt.Errorf("failed to query chunk: %v", err)
        }

        rowsInserted, err := t.insertRows(t.secondaryDB, dbName, tableName, columns, rows)
        rows.Close()

        if err != nil {
            return fmt.Errorf("failed to insert chunk: %v", err)
        }

        // Update progress
        t.mu.Lock()
        t.progress.TablesProcessed[tableKey] = offset + int64(t.batchSize)
        t.mu.Unlock()

        // Save checkpoint every 1000 rows
        if rowsInserted%1000 == 0 {
            t.saveProgress()
        }

        offset += int64(t.batchSize)
    }

    return nil
}

// MinMax holds min and max values
type MinMax struct {
    Min int64
    Max int64
}

// getMinMaxPK returns the min and max primary key values
func (t *InitialTransfer) getMinMaxPK(dbName, tableName, pkColumn string) (*MinMax, error) {
    query := fmt.Sprintf(
        "SELECT COALESCE(MIN(`%s`), 0), COALESCE(MAX(`%s`), 0) FROM `%s`.`%s`",
        pkColumn, pkColumn, dbName, tableName,
    )

    var minVal, maxVal int64
    err := t.primaryDB.QueryRow(query).Scan(&minVal, &maxVal)
    if err != nil {
        return nil, err
    }

    return &MinMax{Min: minVal, Max: maxVal + 1}, nil
}

// transferTableFullScan transfers a table without primary key (slower, uses LIMIT/OFFSET)
func (t *InitialTransfer) transferTableFullScan(dbName, tableName string, columns []ColumnInfo, rowCount int64) error {
    tableKey := dbName + "." + tableName

    // Get starting offset from progress
    startOffset := t.progress.TablesProcessed[tableKey]

    var offset int64 = startOffset

    for offset < rowCount {
        // Check for pause signal
        t.checkPause()

        query := fmt.Sprintf(
            "SELECT * FROM `%s`.`%s` LIMIT %d OFFSET %d",
            dbName, tableName, t.batchSize, offset,
        )

        rows, err := t.primaryDB.Query(query)
        if err != nil {
            return fmt.Errorf("failed to query chunk: %v", err)
        }

        rowsInserted, err := t.insertRows(t.secondaryDB, dbName, tableName, columns, rows)
        rows.Close()

        if err != nil {
            return fmt.Errorf("failed to insert chunk: %v", err)
        }

        // Update progress
        t.mu.Lock()
        t.progress.TablesProcessed[tableKey] = offset
        t.mu.Unlock()

        // Save checkpoint every 1000 rows
        if rowsInserted%1000 == 0 {
            t.saveProgress()
        }

        offset += int64(t.batchSize)
    }

    return nil
}

// insertRows inserts rows from a query result into the secondary database
func (t *InitialTransfer) insertRows(db *sql.DB, dbName, tableName string, columns []ColumnInfo, rows *sql.Rows) (int64, error) {
    // Build INSERT statement
    placeholders := make([]string, len(columns))
    colNames := make([]string, len(columns))
    for i := range columns {
        placeholders[i] = "?"
        colNames[i] = fmt.Sprintf("`%s`", columns[i].Name)
    }

    insertSQL := fmt.Sprintf(
        "INSERT INTO `%s`.`%s` (%s) VALUES (%s)",
        dbName, tableName, strings.Join(colNames, ", "), strings.Join(placeholders, ", "),
    )

    // Prepare statement
    stmt, err := db.Prepare(insertSQL)
    if err != nil {
        return 0, fmt.Errorf("failed to prepare statement: %v", err)
    }
    defer stmt.Close()

    // Insert rows
    rowCount := int64(0)
    for rows.Next() {
        values := make([]interface{}, len(columns))
        valuePtrs := make([]interface{}, len(columns))

        for i := range values {
            valuePtrs[i] = &values[i]
        }

        if err := rows.Scan(valuePtrs...); err != nil {
            return rowCount, fmt.Errorf("failed to scan row: %v", err)
        }

        _, err := stmt.Exec(values...)
        if err != nil {
            return rowCount, fmt.Errorf("failed to insert row: %v", err)
        }
        rowCount++
    }

    t.mu.Lock()
    t.stats.TotalRows += rowCount
    // TotalTables is counted once per table in transferDatabase, not per chunk
    t.mu.Unlock()

    Infof("[TRANSFER] Inserted %d rows into %s.%s", rowCount, dbName, tableName)

    return rowCount, rows.Err()
}

// getBinlogPosition returns the current binlog position
func (t *InitialTransfer) getBinlogPosition() (string, error) {
    var file string
    var pos uint32
    var doDB, ignoreDB string // Ignore extra columns from SHOW MASTER STATUS

    err := t.primaryDB.QueryRow("SHOW MASTER STATUS").Scan(&file, &pos, &doDB, &ignoreDB)
    if err == sql.ErrNoRows {
        return "", nil
    }
    if err != nil {
        return "", err
    }

    return fmt.Sprintf("%s:%d", file, pos), nil
}

// Close closes the database connections
func (t *InitialTransfer) Close() error {
    if err := t.primaryDB.Close(); err != nil {
        return err
    }
    return t.secondaryDB.Close()
}

// GetStats returns the transfer statistics
func (t *InitialTransfer) GetStats() TransferStats {
    t.mu.Lock()
    defer t.mu.Unlock()
    return t.stats
}
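A usage sketch for the initial transfer, assuming hypothetical DSNs and module path; Transfer adds the given schemas to the built-in system-schema exclusions and checkpoints progress to transfer_progress.json:

package main

import (
    "log"

    "example.com/binlog-sync/pkg/replica" // hypothetical module path
)

func main() {
    // Batch size of 1000 rows per chunk, single worker.
    t, err := replica.NewInitialTransfer(
        "root:secret@tcp(primary:3306)/",   // hypothetical primary DSN
        "root:secret@tcp(secondary:3306)/", // hypothetical secondary DSN
        1000, 1,
    )
    if err != nil {
        log.Fatalf("init transfer: %v", err)
    }
    defer t.Close()

    if err := t.Transfer([]string{"staging"}); err != nil {
        log.Fatalf("transfer: %v", err)
    }

    stats := t.GetStats()
    log.Printf("copied %d rows across %d tables in %dms",
        stats.TotalRows, stats.TotalTables, stats.TransferTime)
}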
pkg/replica/logging.go (new file, 402 lines)
@@ -0,0 +1,402 @@
package replica

import (
    "encoding/json"
    "fmt"
    "log/slog"
    "net"
    "os"
    "sync"
    "time"
)

// LogLevel represents the logging level
type LogLevel slog.Level

const (
    LevelDebug LogLevel = LogLevel(slog.LevelDebug)
    LevelInfo  LogLevel = LogLevel(slog.LevelInfo)
    LevelWarn  LogLevel = LogLevel(slog.LevelWarn)
    LevelError LogLevel = LogLevel(slog.LevelError)
)

// GraylogConfig holds configuration for Graylog integration
type GraylogConfig struct {
    Endpoint    string                 // Graylog GELF endpoint (e.g., "localhost:12201")
    Protocol    string                 // "udp" or "tcp"
    Timeout     time.Duration          // Connection timeout
    Source      string                 // Source name for logs
    ExtraFields map[string]interface{} // Additional fields to include
}

// Logger is a structured logger with optional Graylog support
type Logger struct {
    logger    *slog.Logger
    graylogMu sync.RWMutex
    graylog   *GraylogClient
    level     LogLevel
    source    string
}

// GraylogClient handles sending logs to Graylog
type GraylogClient struct {
    conn      net.Conn
    source    string
    extra     map[string]interface{}
    mu        sync.Mutex
    connected bool
}

// GELFMessage represents a GELF message structure
type GELFMessage struct {
    Version      string                 `json:"version"`
    Host         string                 `json:"host"`
    ShortMessage string                 `json:"short_message"`
    FullMessage  string                 `json:"full_message,omitempty"`
    Timestamp    float64                `json:"timestamp"`
    Level        int                    `json:"level"`
    Source       string                 `json:"source,omitempty"`
    Extra        map[string]interface{} `json:"-"`
}

// NewGraylogClient creates a new Graylog client
func NewGraylogClient(cfg GraylogConfig) (*GraylogClient, error) {
    var conn net.Conn
    var err error

    protocol := cfg.Protocol
    if protocol == "" {
        protocol = "udp"
    }

    conn, err = net.DialTimeout(protocol, cfg.Endpoint, cfg.Timeout)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to Graylog: %w", err)
    }

    return &GraylogClient{
        conn:      conn,
        source:    cfg.Source,
        extra:     cfg.ExtraFields,
        connected: true,
    }, nil
}

// send sends a GELF message to Graylog
func (g *GraylogClient) send(msg *GELFMessage) error {
    g.mu.Lock()
    defer g.mu.Unlock()

    if !g.connected {
        return nil
    }

    // Merge client-level extra fields without clobbering per-message ones
    if g.extra != nil {
        if msg.Extra == nil {
            msg.Extra = make(map[string]interface{}, len(g.extra))
        }
        for k, v := range g.extra {
            if _, exists := msg.Extra[k]; !exists {
                msg.Extra[k] = v
            }
        }
    }

    data, err := json.Marshal(msg)
    if err != nil {
        return fmt.Errorf("failed to marshal GELF message: %w", err)
    }

    // GELF additional fields must appear at the top level with a leading
    // underscore; the json:"-" tag keeps Extra out of the default encoding,
    // so merge the fields in here
    if len(msg.Extra) > 0 {
        var m map[string]interface{}
        if json.Unmarshal(data, &m) == nil {
            for k, v := range msg.Extra {
                m["_"+k] = v
            }
            if b, merr := json.Marshal(m); merr == nil {
                data = b
            }
        }
    }

    // GELF over TCP frames each message with a trailing null byte; a UDP
    // datagram carries exactly one message and needs no delimiter
    if _, ok := g.conn.(*net.UDPConn); ok {
        _, err = g.conn.Write(data)
    } else {
        _, err = g.conn.Write(append(data, 0))
    }

    if err != nil {
        g.connected = false
        return fmt.Errorf("failed to send to Graylog: %w", err)
    }

    return nil
}

// Close closes the Graylog connection
func (g *GraylogClient) Close() error {
    g.mu.Lock()
    defer g.mu.Unlock()

    if g.conn != nil {
        g.connected = false
        return g.conn.Close()
    }
    return nil
}

// NewLogger creates a new logger instance
func NewLogger() *Logger {
    // Create slog handler with JSON output for structured logging
    handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
        Level: slog.LevelInfo,
    })

    return &Logger{
        logger: slog.New(handler),
        level:  LevelInfo,
        source: "binlog-sync",
    }
}

// NewLoggerWithLevel creates a new logger with specified level
func NewLoggerWithLevel(level LogLevel) *Logger {
    handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
        Level: slog.Level(level),
    })

    return &Logger{
        logger: slog.New(handler),
        level:  level,
        source: "binlog-sync",
    }
}

// SetupGraylog configures Graylog integration
func (l *Logger) SetupGraylog(cfg GraylogConfig) error {
    client, err := NewGraylogClient(cfg)
    if err != nil {
        return err
    }

    l.graylogMu.Lock()
    l.graylog = client
    l.graylogMu.Unlock()

    return nil
}

// With creates a new logger with additional context; the Graylog client, if
// configured, is shared with the derived logger
func (l *Logger) With(args ...any) *Logger {
    l.graylogMu.RLock()
    g := l.graylog
    l.graylogMu.RUnlock()

    return &Logger{
        logger:  l.logger.With(args...),
        graylog: g,
        level:   l.level,
        source:  l.source,
    }
}

// Debug logs a debug message
func (l *Logger) Debug(msg string, args ...any) {
    l.logger.Debug(msg, args...)
    l.sendToGraylog(slog.LevelDebug, msg, args)
}

// Info logs an info message
func (l *Logger) Info(msg string, args ...any) {
    l.logger.Info(msg, args...)
    l.sendToGraylog(slog.LevelInfo, msg, args)
}

// Warn logs a warning message
func (l *Logger) Warn(msg string, args ...any) {
    l.logger.Warn(msg, args...)
    l.sendToGraylog(slog.LevelWarn, msg, args)
}

// Error logs an error message
func (l *Logger) Error(msg string, args ...any) {
    l.logger.Error(msg, args...)
    l.sendToGraylog(slog.LevelError, msg, args)
}

// Fatal logs a fatal message and exits
func (l *Logger) Fatal(msg string, args ...any) {
    l.logger.Error(msg, args...)
    l.sendToGraylog(slog.LevelError, msg, args)
    os.Exit(1)
}

// Debugf logs a formatted debug message
func (l *Logger) Debugf(format string, args ...any) {
    msg := fmt.Sprintf(format, args...)
    l.logger.Debug(msg)
    l.sendToGraylog(slog.LevelDebug, msg, nil)
}

// Infof logs a formatted info message
func (l *Logger) Infof(format string, args ...any) {
    msg := fmt.Sprintf(format, args...)
    l.logger.Info(msg)
    l.sendToGraylog(slog.LevelInfo, msg, nil)
}

// Warnf logs a formatted warning message
func (l *Logger) Warnf(format string, args ...any) {
    msg := fmt.Sprintf(format, args...)
    l.logger.Warn(msg)
    l.sendToGraylog(slog.LevelWarn, msg, nil)
}

// Errorf logs a formatted error message
func (l *Logger) Errorf(format string, args ...any) {
    msg := fmt.Sprintf(format, args...)
    l.logger.Error(msg)
    l.sendToGraylog(slog.LevelError, msg, nil)
}

// Fatalf logs a formatted fatal message and exits
func (l *Logger) Fatalf(format string, args ...any) {
    msg := fmt.Sprintf(format, args...)
    l.logger.Error(msg)
    l.sendToGraylog(slog.LevelError, msg, nil)
    os.Exit(1)
}

// Log logs a message at the specified level
func (l *Logger) Log(level LogLevel, msg string, args ...any) {
    switch level {
    case LevelDebug:
        l.logger.Debug(msg, args...)
    case LevelInfo:
        l.logger.Info(msg, args...)
    case LevelWarn:
        l.logger.Warn(msg, args...)
    case LevelError:
        l.logger.Error(msg, args...)
    }
    l.sendToGraylog(slog.Level(level), msg, args)
}

// sendToGraylog sends a log message to Graylog if configured
func (l *Logger) sendToGraylog(level slog.Level, msg string, args []any) {
    l.graylogMu.RLock()
    client := l.graylog
    l.graylogMu.RUnlock()

    if client == nil {
        return
    }

    // Convert args to structured data
    extra := make(map[string]interface{})
    if args != nil {
        for i := 0; i < len(args)-1; i += 2 {
            if key, ok := args[i].(string); ok {
                extra[key] = args[i+1]
            }
        }
    }

    // Map slog levels onto the syslog severities GELF expects
    // (3=error, 4=warning, 6=info, 7=debug)
    gelfLevel := 7
    switch {
    case level >= slog.LevelError:
        gelfLevel = 3
    case level >= slog.LevelWarn:
        gelfLevel = 4
    case level >= slog.LevelInfo:
        gelfLevel = 6
    }

    gelfMsg := &GELFMessage{
        Version:      "1.1",
        Host:         l.source,
        ShortMessage: msg,
        Timestamp:    float64(time.Now().UnixNano()) / 1e9,
        Level:        gelfLevel,
        Source:       l.source,
        Extra:        extra,
    }

    // Send in background to not block
    go func() {
        _ = client.send(gelfMsg)
    }()
}

// Close closes the logger and any underlying connections
func (l *Logger) Close() {
    l.graylogMu.Lock()
    defer l.graylogMu.Unlock()

    if l.graylog != nil {
        l.graylog.Close()
        l.graylog = nil
    }
}

// GetLogger returns the underlying slog.Logger
func (l *Logger) GetLogger() *slog.Logger {
    return l.logger
}

// Handler returns a slog.Handler for use with other libraries
func (l *Logger) Handler() slog.Handler {
    return l.logger.Handler()
}

// Global logger instance - renamed to avoid conflict with log package
var globalLogger *Logger

// InitLogger initializes the global logger
func InitLogger() {
    globalLogger = NewLogger()
}

// InitLoggerWithLevel initializes the global logger with a specific level
func InitLoggerWithLevel(level LogLevel) {
    globalLogger = NewLoggerWithLevel(level)
}

// SetupGlobalGraylog configures Graylog for the global logger
func SetupGlobalGraylog(cfg GraylogConfig) error {
    return globalLogger.SetupGraylog(cfg)
}

// GetLogger returns the global logger instance
func GetLogger() *Logger {
    if globalLogger == nil {
        InitLogger()
    }
    return globalLogger
}

// Convenience functions using global logger

// Debug logs a debug message
func Debug(msg string, args ...any) {
    GetLogger().Debug(msg, args...)
}

// Info logs an info message
func Info(msg string, args ...any) {
    GetLogger().Info(msg, args...)
}

// Warn logs a warning message
func Warn(msg string, args ...any) {
    GetLogger().Warn(msg, args...)
}

// Error logs an error message
func Error(msg string, args ...any) {
    GetLogger().Error(msg, args...)
}

// Fatal logs a fatal message and exits
func Fatal(msg string, args ...any) {
    GetLogger().Fatal(msg, args...)
}

// Debugf logs a formatted debug message
func Debugf(format string, args ...any) {
    GetLogger().Debugf(format, args...)
}

// Infof logs a formatted info message
func Infof(format string, args ...any) {
    GetLogger().Infof(format, args...)
}

// Warnf logs a formatted warning message
func Warnf(format string, args ...any) {
    GetLogger().Warnf(format, args...)
}

// Errorf logs a formatted error message
func Errorf(format string, args ...any) {
    GetLogger().Errorf(format, args...)
}

// Fatalf logs a formatted fatal message and exits
func Fatalf(format string, args ...any) {
    GetLogger().Fatalf(format, args...)
}

// Log logs a message at the specified level
func Log(level LogLevel, msg string, args ...any) {
    GetLogger().Log(level, msg, args...)
}
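A sketch of wiring the logger with optional Graylog shipping; the endpoint and module path are assumptions:

package main

import (
    "time"

    "example.com/binlog-sync/pkg/replica" // hypothetical module path
)

func main() {
    replica.InitLoggerWithLevel(replica.LevelDebug)

    // Graylog is optional; logging continues on stdout if setup fails.
    err := replica.SetupGlobalGraylog(replica.GraylogConfig{
        Endpoint: "graylog.internal:12201", // hypothetical GELF endpoint
        Protocol: "udp",
        Timeout:  5 * time.Second,
        Source:   "binlog-sync",
    })
    if err != nil {
        replica.Warnf("graylog disabled: %v", err)
    }

    // Structured form takes key-value pairs; the *f form takes a format string.
    replica.Info("sync starting", "instance", "primary-a")
    replica.Infof("batch size: %d", 1000)
}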
pkg/replica/position.go (new file, 140 lines)
@@ -0,0 +1,140 @@
package replica

import (
    "database/sql"
    "encoding/json"
    "os"

    "github.com/go-mysql-org/go-mysql/mysql"
)

// PositionManager handles binlog position persistence
type PositionManager struct {
    db           *sql.DB
    positionFile string
}

// NewPositionManager creates a new position manager
func NewPositionManager(db *sql.DB, positionFile string) *PositionManager {
    return &PositionManager{
        db:           db,
        positionFile: positionFile,
    }
}

// InitTable creates the binlog_position table if it doesn't exist
func (pm *PositionManager) InitTable() error {
    if pm.db == nil {
        return nil
    }

    createTableSQL := `
    CREATE TABLE IF NOT EXISTS binlog_position (
        id INT PRIMARY KEY,
        log_file VARCHAR(255),
        log_pos BIGINT,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
    )
    `

    _, err := pm.db.Exec(createTableSQL)
    if err != nil {
        Errorf("[ERROR] Failed to create binlog_position table: %v", err)
        return err
    }

    return nil
}

// Load loads the last saved position from database or file
func (pm *PositionManager) Load() (mysql.Position, error) {
    if pm.db != nil {
        return pm.loadFromDB()
    }
    return pm.loadFromFile()
}

func (pm *PositionManager) loadFromDB() (mysql.Position, error) {
    var name string
    var pos uint32

    err := pm.db.QueryRow(
        "SELECT log_file, log_pos FROM binlog_position WHERE id = 1",
    ).Scan(&name, &pos)

    if err == sql.ErrNoRows {
        return mysql.Position{Name: "", Pos: 0}, nil
    }
    if err != nil {
        return mysql.Position{Name: "", Pos: 0}, err
    }

    Infof("[INFO] Loaded position: %s:%d", name, pos)
    return mysql.Position{Name: name, Pos: pos}, nil
}

func (pm *PositionManager) loadFromFile() (mysql.Position, error) {
    data, err := os.ReadFile(pm.positionFile)
    if err != nil {
        return mysql.Position{Name: "", Pos: 0}, nil
    }

    var pos mysql.Position
    err = json.Unmarshal(data, &pos)
    return pos, err
}

// Save saves the current position to database or file
func (pm *PositionManager) Save(pos mysql.Position) error {
    if pm.db != nil {
        return pm.saveToDB(pos)
    }
    return pm.saveToFile(pos)
}

// Reset clears the saved position
func (pm *PositionManager) Reset() error {
    if pm.db != nil {
        return pm.resetInDB()
    }
    return pm.resetInFile()
}

func (pm *PositionManager) resetInDB() error {
    _, err := pm.db.Exec("DELETE FROM binlog_position WHERE id = 1")
    return err
}

func (pm *PositionManager) resetInFile() error {
    return os.Remove(pm.positionFile)
}

func (pm *PositionManager) saveToDB(pos mysql.Position) error {
    result, err := pm.db.Exec(
        "INSERT INTO binlog_position (id, log_file, log_pos) VALUES (1, ?, ?) "+
            "ON DUPLICATE KEY UPDATE log_file = VALUES(log_file), log_pos = VALUES(log_pos)",
        pos.Name, pos.Pos,
    )

    if err != nil {
        Errorf("[ERROR] Failed to save position: %v", err)
        return err
    }

    rowsAffected, _ := result.RowsAffected()
    Infof("[INFO] Saved position: %s:%d (rows: %d)", pos.Name, pos.Pos, rowsAffected)
    return nil
}

func (pm *PositionManager) saveToFile(pos mysql.Position) error {
    data, err := json.Marshal(pos)
    if err != nil {
        return err
    }
    return os.WriteFile(pm.positionFile, data, 0644)
}

// GetPositionFile returns the position file path
func (pm *PositionManager) GetPositionFile() string {
    return pm.positionFile
}
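A sketch showing both persistence modes of PositionManager (file-backed when the DB handle is nil, table-backed otherwise); the DSN and module path are assumptions:

package main

import (
    "database/sql"
    "log"

    "github.com/go-mysql-org/go-mysql/mysql"
    _ "github.com/go-sql-driver/mysql"

    "example.com/binlog-sync/pkg/replica" // hypothetical module path
)

func main() {
    // With a nil *sql.DB the manager falls back to file persistence.
    fileMgr := replica.NewPositionManager(nil, "binlog_position.json")
    if err := fileMgr.Save(mysql.Position{Name: "mysql-bin.000003", Pos: 154}); err != nil {
        log.Fatal(err)
    }

    // With a live DB handle, positions go to the binlog_position table.
    db, err := sql.Open("mysql", "root:secret@tcp(secondary:3306)/replica") // hypothetical DSN
    if err != nil {
        log.Fatal(err)
    }
    dbMgr := replica.NewPositionManager(db, "")
    if err := dbMgr.InitTable(); err != nil {
        log.Fatal(err)
    }
    pos, err := dbMgr.Load()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("resuming from %s:%d", pos.Name, pos.Pos)
}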
pkg/replica/service.go (new file, 333 lines)
@@ -0,0 +1,333 @@
|
||||
package replica
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/go-mysql-org/go-mysql/mysql"
|
||||
"github.com/go-mysql-org/go-mysql/replication"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
)
|
||||
|
||||
// BinlogSyncService handles MySQL binlog streaming with resilience features
|
||||
type BinlogSyncService struct {
|
||||
syncer *replication.BinlogSyncer
|
||||
streamer *replication.BinlogStreamer
|
||||
position mysql.Position
|
||||
handlers *EventHandlers
|
||||
positionMgr *PositionManager
|
||||
stopChan chan struct{}
|
||||
instanceName string
|
||||
primaryDB *sql.DB
|
||||
lastEventPos uint32 // for deduplication
|
||||
lastEventGTID string // for GTID-based deduplication (future)
|
||||
healthTicker *time.Ticker // for periodic health checks
|
||||
}
|
||||
|
||||
// NewBinlogSyncService creates a new binlog sync service
|
||||
func NewBinlogSyncService(cfg BinlogConfig, primaryDB, secondaryDB *sql.DB) *BinlogSyncService {
|
||||
syncerCfg := replication.BinlogSyncerConfig{
|
||||
ServerID: cfg.ServerID,
|
||||
Flavor: "mariadb",
|
||||
Host: cfg.Host,
|
||||
Port: cfg.Port,
|
||||
User: cfg.User,
|
||||
Password: cfg.Password,
|
||||
}
|
||||
|
||||
return &BinlogSyncService{
|
||||
syncer: replication.NewBinlogSyncer(syncerCfg),
|
||||
handlers: NewEventHandlers(secondaryDB),
|
||||
positionMgr: NewPositionManager(secondaryDB, fmt.Sprintf("binlog_position_%s.json", cfg.Name)),
|
||||
stopChan: make(chan struct{}),
|
||||
instanceName: cfg.Name,
|
||||
primaryDB: primaryDB,
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins binlog streaming
|
||||
func (s *BinlogSyncService) Start() error {
|
||||
if err := s.positionMgr.InitTable(); err != nil {
|
||||
Warnf("[WARN] Failed to init position table: %v", err)
|
||||
}
|
||||
|
||||
pos, err := s.positionMgr.Load()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load position: %v", err)
|
||||
}
|
||||
|
||||
if pos.Name == "" {
|
||||
s.streamer, err = s.syncer.StartSync(mysql.Position{Name: "", Pos: 4})
|
||||
} else {
|
||||
s.streamer, err = s.syncer.StartSync(pos)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start sync: %v", err)
|
||||
}
|
||||
|
||||
Infof("[%s] Started streaming from %s:%d", s.instanceName, pos.Name, pos.Pos)
|
||||
return s.processEvents()
|
||||
}
|
||||
|
||||
// StartWithStop begins binlog streaming with graceful shutdown
|
||||
func (s *BinlogSyncService) StartWithStop() error {
|
||||
if err := s.positionMgr.InitTable(); err != nil {
|
||||
Warnf("[WARN] Failed to init position table: %v", err)
|
||||
}
|
||||
|
||||
pos, err := s.positionMgr.Load()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load position: %v", err)
|
||||
}
|
||||
|
||||
if pos.Name == "" {
|
||||
s.streamer, err = s.syncer.StartSync(mysql.Position{Name: "", Pos: 4})
|
||||
} else {
|
||||
s.streamer, err = s.syncer.StartSync(pos)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start sync: %v", err)
|
||||
}
|
||||
|
||||
Infof("[%s] Started streaming from %s:%d", s.instanceName, pos.Name, pos.Pos)
|
||||
return s.processEventsWithStop()
|
||||
}
|
||||
|
||||
// processEventsWithStop processes events with graceful shutdown and health checks
|
||||
func (s *BinlogSyncService) processEventsWithStop() error {
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// Start health check ticker (every 30 seconds)
|
||||
s.healthTicker = time.NewTicker(30 * time.Second)
|
||||
defer s.healthTicker.Stop()
|
||||
|
||||
// Start health check goroutine
|
||||
healthChan := make(chan error, 1)
|
||||
go s.runHealthChecks(healthChan)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sigChan:
|
||||
Infof("[%s] Shutting down...", s.instanceName)
|
||||
s.positionMgr.Save(s.position)
|
||||
s.syncer.Close()
|
||||
return nil
|
||||
case <-s.stopChan:
|
||||
Infof("[%s] Stop signal received", s.instanceName)
|
||||
s.positionMgr.Save(s.position)
|
||||
s.syncer.Close()
|
||||
return nil
|
||||
case <-s.healthTicker.C:
|
||||
// Trigger health check
|
||||
go s.runHealthChecks(healthChan)
|
||||
case healthErr := <-healthChan:
|
||||
if healthErr != nil {
|
||||
Errorf("[HEALTH CHECK] Failed: %v", healthErr)
|
||||
}
|
||||
default:
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
ev, err := s.streamer.GetEvent(ctx)
|
||||
cancel()
|
||||
|
||||
if err == context.DeadlineExceeded {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get event: %v", err)
|
||||
}
|
||||
|
||||
s.processEvent(ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runHealthChecks performs periodic health checks
|
||||
func (s *BinlogSyncService) runHealthChecks(resultChan chan<- error) {
|
||||
// Check secondary DB connection
|
||||
if err := s.handlers.PingSecondary(); err != nil {
|
||||
resultChan <- fmt.Errorf("secondary DB ping failed: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if syncer is still healthy
|
||||
if s.syncer == nil {
|
||||
resultChan <- fmt.Errorf("syncer is nil")
|
||||
return
|
||||
}
|
||||
|
||||
resultChan <- nil
|
||||
}
|
||||
|
||||

// processEvents processes events continuously
func (s *BinlogSyncService) processEvents() error {
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		ev, err := s.streamer.GetEvent(ctx)
		cancel()

		if err == context.DeadlineExceeded {
			continue
		}
		if err != nil {
			return fmt.Errorf("failed to get event: %v", err)
		}

		s.processEvent(ev)
	}
}

// processEvent processes a single event with panic recovery.
// Note: deduplication is disabled because it was skipping valid events.
func (s *BinlogSyncService) processEvent(ev *replication.BinlogEvent) {
	defer func() {
		if r := recover(); r != nil {
			Errorf("[PANIC RECOVERED] processEvent panic: %v", r)
		}
	}()

	s.position.Pos = ev.Header.LogPos

	switch e := ev.Event.(type) {
	case *replication.RotateEvent:
		s.position.Name = string(e.NextLogName)
		Infof("[%s] Rotated to %s", s.instanceName, s.position.Name)

	case *replication.TableMapEvent:
		s.handlers.HandleTableMap(e)

	case *replication.RowsEvent:
		s.handlers.HandleRows(ev.Header, e)

	case *replication.QueryEvent:
		s.handlers.HandleQuery(e)

	case *replication.XIDEvent:
		// Transaction commit boundary: persist the current position
		s.positionMgr.Save(s.position)
	}

	// Opportunistic extra checkpoint; this only fires when the byte offset
	// happens to be a multiple of 1000, so XID events remain the primary
	// save trigger.
	if ev.Header.LogPos%1000 == 0 {
		s.positionMgr.Save(s.position)
	}
}

// Stop signals the service to stop
func (s *BinlogSyncService) Stop() {
	close(s.stopChan)
}
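
// Note: closing an already-closed channel panics, so Stop must be called at
// most once. A hedged sketch of an idempotent variant (assumes adding a
// hypothetical stopOnce sync.Once field to BinlogSyncService):
//
//	func (s *BinlogSyncService) Stop() {
//		s.stopOnce.Do(func() { close(s.stopChan) })
//	}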

// NeedResync checks if a resync is needed
func (s *BinlogSyncService) NeedResync() (bool, error) {
	// Check if position file/table exists
	pos, err := s.positionMgr.Load()
	if err != nil {
		return false, err
	}

	// If no position saved, we need initial transfer
	if pos.Name == "" {
		Infof("[%s] No saved position found, resync needed", s.instanceName)
		return true, nil
	}

	// Check if secondary DB has any tables/data
	hasData, err := s.checkSecondaryHasData()
	if err != nil {
		return false, err
	}

	if !hasData {
		Infof("[%s] Secondary database is empty, resync needed", s.instanceName)
		return true, nil
	}

	return false, nil
}

// checkSecondaryHasData checks whether the secondary database's replica
// schema contains any tables (it does not inspect row counts)
func (s *BinlogSyncService) checkSecondaryHasData() (bool, error) {
	// Get the secondary DB from the position manager
	db := s.positionMgr.db
	if db == nil {
		return true, nil // Can't check, assume OK
	}

	// Count the tables in the replica schema
	var count int
	err := db.QueryRow("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'replica'").Scan(&count)
	if err != nil {
		return false, err
	}

	return count > 0, nil
}
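
// A hedged alternative that would look at estimated row counts rather than
// mere table existence (table_rows is only an approximation for InnoDB):
//
//	var rows int64
//	err := db.QueryRow("SELECT COALESCE(SUM(table_rows), 0) FROM information_schema.tables WHERE table_schema = 'replica'").Scan(&rows)
//	return rows > 0, err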

// RunInitialTransfer performs the initial data transfer
func (s *BinlogSyncService) RunInitialTransfer(batchSize int, excludeSchemas []string) error {
	Infof("[%s] Starting initial data transfer...", s.instanceName)

	// Get secondary DB from handlers
	secondaryDB := s.handlers.secondaryDB
	if secondaryDB == nil {
		return fmt.Errorf("secondary DB not available")
	}

	// Create initial transfer with proper initialization
	transfer := &InitialTransfer{
		primaryDB:   s.primaryDB,
		secondaryDB: secondaryDB,
		batchSize:   batchSize,
		workerCount: 1,
		excludedDBs: map[string]bool{
			"information_schema": true,
			"performance_schema": true,
			"mysql":              true,
			"sys":                true,
		},
		checkpointFile: fmt.Sprintf("transfer_progress_%s.json", s.instanceName),
		progress: TransferProgress{
			TablesProcessed: make(map[string]int64),
		},
	}

	if err := transfer.Transfer(excludeSchemas); err != nil {
		return fmt.Errorf("transfer failed: %v", err)
	}

	// Reset position after successful transfer
	if err := s.positionMgr.Reset(); err != nil {
		return fmt.Errorf("failed to reset position: %v", err)
	}

	Infof("[%s] Initial transfer completed successfully", s.instanceName)
	return nil
}

// StartWithResync starts binlog streaming with automatic resync if needed
func (s *BinlogSyncService) StartWithResync(batchSize int, excludeSchemas []string) error {
	if err := s.positionMgr.InitTable(); err != nil {
		Warnf("[WARN] Failed to init position table: %v", err)
	}

	// Check if resync is needed
	needResync, err := s.NeedResync()
	if err != nil {
		return fmt.Errorf("failed to check resync status: %v", err)
	}

	if needResync {
		if err := s.RunInitialTransfer(batchSize, excludeSchemas); err != nil {
			return fmt.Errorf("initial transfer failed: %v", err)
		}
	}

	// Start normal streaming
	return s.StartWithStop()
}
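
// A minimal usage sketch; the constructor name and wiring below are
// assumptions for illustration, not part of this package:
//
//	svc := NewBinlogSyncService(cfg, secondaryDB) // hypothetical constructor
//	go func() {
//		if err := svc.StartWithResync(1000, nil); err != nil {
//			Errorf("replication stopped: %v", err)
//		}
//	}()
//	// ... later, on shutdown:
//	svc.Stop()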

168
pkg/replica/sqlbuilder.go
Normal file
@@ -0,0 +1,168 @@
package replica

import (
	"fmt"
	"strings"

	"github.com/go-mysql-org/go-mysql/replication"
)

// SQLBuilder handles SQL statement building
type SQLBuilder struct{}

// NewSQLBuilder creates a new SQL builder
func NewSQLBuilder() *SQLBuilder {
	return &SQLBuilder{}
}

// BuildInsert builds an INSERT statement
func (sb *SQLBuilder) BuildInsert(schema, table string, tableMap *replication.TableMapEvent, row []interface{}) string {
	if len(row) == 0 {
		return fmt.Sprintf("INSERT INTO `%s`.`%s` VALUES ()", schema, table)
	}

	var columns []string
	var values []string

	for i, col := range row {
		colName := sb.getColumnName(tableMap, i)
		columns = append(columns, colName)
		values = append(values, formatValue(col))
	}

	return fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s)",
		schema, table, strings.Join(columns, ", "), strings.Join(values, ", "))
}
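
// For example, given a table map with columns `id` and `name` (hypothetical
// data; note that formatValue quotes every non-nil, non-binary value):
//
//	sb.BuildInsert("replica", "users", tm, []interface{}{1, "alice"})
//	// => INSERT INTO `replica`.`users` (`id`, `name`) VALUES ('1', 'alice')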

// BuildUpdate builds an UPDATE statement. The WHERE clause is keyed on the
// first column only, which assumes column 0 is the primary key.
func (sb *SQLBuilder) BuildUpdate(schema, table string, tableMap *replication.TableMapEvent, before, after []interface{}) string {
	if len(before) == 0 || len(after) == 0 {
		return fmt.Sprintf("UPDATE `%s`.`%s` SET id = id", schema, table)
	}

	var setClauses []string
	var whereClauses []string

	for i := range before {
		colName := sb.getColumnName(tableMap, i)
		if !valuesEqual(before[i], after[i]) {
			setClauses = append(setClauses, fmt.Sprintf("%s = %s", colName, formatValue(after[i])))
		}
		if i == 0 {
			whereClauses = append(whereClauses, fmt.Sprintf("%s = %s", colName, formatValue(before[i])))
		}
	}

	// No column actually changed: emit a harmless no-op instead of invalid
	// SQL with an empty SET list.
	if len(setClauses) == 0 {
		return fmt.Sprintf("UPDATE `%s`.`%s` SET %s = %s WHERE %s",
			schema, table, sb.getColumnName(tableMap, 0), formatValue(before[0]),
			strings.Join(whereClauses, " AND "))
	}

	return fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s",
		schema, table, strings.Join(setClauses, ", "), strings.Join(whereClauses, " AND "))
}
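
// For example, with `id` as column 0 (hypothetical data):
//
//	sb.BuildUpdate("replica", "users", tm,
//		[]interface{}{1, "alice"}, []interface{}{1, "bob"})
//	// => UPDATE `replica`.`users` SET `name` = 'bob' WHERE `id` = '1'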

// BuildDelete builds a DELETE statement, keyed on the first column
// (assumed to be the primary key)
func (sb *SQLBuilder) BuildDelete(schema, table string, tableMap *replication.TableMapEvent, row []interface{}) string {
	if len(row) == 0 {
		return fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE 1=0", schema, table)
	}

	colName := sb.getColumnName(tableMap, 0)
	whereClause := fmt.Sprintf("%s = %s", colName, formatValue(row[0]))
	return fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s", schema, table, whereClause)
}
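
// For example (hypothetical data):
//
//	sb.BuildDelete("replica", "users", tm, []interface{}{1, "alice"})
//	// => DELETE FROM `replica`.`users` WHERE `id` = '1'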

// getColumnName returns the column name at the given index
func (sb *SQLBuilder) getColumnName(tableMap *replication.TableMapEvent, index int) string {
	if tableMap == nil || index >= len(tableMap.ColumnName) {
		return fmt.Sprintf("`col_%d`", index)
	}
	return fmt.Sprintf("`%s`", tableMap.ColumnName[index])
}

// formatValue formats a value for SQL
func formatValue(col interface{}) string {
	switch v := col.(type) {
	case []byte:
		if str := string(v); validUTF8(v) {
			return fmt.Sprintf("'%s'", strings.ReplaceAll(str, "'", "''"))
		}
		return fmt.Sprintf("X'%s'", hexEncode(v))
	case string:
		return fmt.Sprintf("'%s'", strings.ReplaceAll(v, "'", "''"))
	case nil:
		return "NULL"
	default:
		return fmt.Sprintf("'%v'", v)
	}
}
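
// Sample outputs:
//
//	formatValue([]byte("it's"))     // => 'it''s'
//	formatValue([]byte{0xFF, 0x00}) // => X'FF00'
//	formatValue(nil)                // => NULL
//
// Note that only single quotes are doubled; under MySQL's default sql_mode a
// backslash in the data is still an escape character, so parameterized
// queries (or NO_BACKSLASH_ESCAPES) would be the safer route for untrusted
// input.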

// validUTF8 reports whether b is well-formed UTF-8. This is a hand-rolled
// structural check; it validates continuation bytes but not overlong
// encodings or surrogate ranges.
func validUTF8(b []byte) bool {
	for i := 0; i < len(b); {
		if b[i] < 0x80 {
			i++
		} else if (b[i] & 0xE0) == 0xC0 {
			if i+1 >= len(b) || (b[i+1]&0xC0) != 0x80 {
				return false
			}
			i += 2
		} else if (b[i] & 0xF0) == 0xE0 {
			if i+2 >= len(b) || (b[i+1]&0xC0) != 0x80 || (b[i+2]&0xC0) != 0x80 {
				return false
			}
			i += 3
		} else if (b[i] & 0xF8) == 0xF0 {
			if i+3 >= len(b) || (b[i+1]&0xC0) != 0x80 || (b[i+2]&0xC0) != 0x80 || (b[i+3]&0xC0) != 0x80 {
				return false
			}
			i += 4
		} else {
			return false
		}
	}
	return true
}
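
// The standard library provides a stricter, optimized equivalent that also
// rejects overlong encodings and surrogates:
//
//	import "unicode/utf8"
//
//	func validUTF8(b []byte) bool { return utf8.Valid(b) }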

// hexEncode encodes b as uppercase hexadecimal
func hexEncode(b []byte) string {
	hexChars := "0123456789ABCDEF"
	result := make([]byte, len(b)*2)
	for i, byteVal := range b {
		result[i*2] = hexChars[byteVal>>4]
		result[i*2+1] = hexChars[byteVal&0x0F]
	}
	return string(result)
}
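
// encoding/hex offers an equivalent one-liner (its output is lowercase, which
// MySQL's X'...' literals accept just the same):
//
//	import "encoding/hex"
//
//	func hexEncode(b []byte) string { return hex.EncodeToString(b) }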

// valuesEqual compares two values, handling byte slices properly
func valuesEqual(a, b interface{}) bool {
	// Handle nil cases
	if a == nil && b == nil {
		return true
	}
	if a == nil || b == nil {
		return false
	}

	// Handle byte slices specially
	if aBytes, ok := a.([]byte); ok {
		if bBytes, ok := b.([]byte); ok {
			return bytesEqual(aBytes, bBytes)
		}
		return false
	}
	if _, ok := b.([]byte); ok {
		return false
	}

	// For other types, use fmt.Sprintf comparison
	return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b)
}

// bytesEqual compares two byte slices
func bytesEqual(a, b []byte) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}
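
// bytes.Equal from the standard library is a drop-in replacement:
//
//	import "bytes"
//
//	func bytesEqual(a, b []byte) bool { return bytes.Equal(a, b) }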