Files
replica/replica/handlers.go
2026-02-12 15:18:55 +01:00

421 lines
12 KiB
Go

package replica
import (
	"database/sql"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/go-mysql-org/go-mysql/replication"
)
// EventHandlers handles binlog event processing with resilience features:
// panic recovery in every handler, retry with exponential backoff,
// per-table consecutive-failure tracking (tables are skipped once they hit
// maxFailures), schema drift detection, and automatic reconnection to the
// secondary database.
type EventHandlers struct {
secondaryDB *sql.DB // connection events are replayed into; replaced by reconnect()
secondaryName string // label prefixed to every log line
tableMapCache map[uint64]*replication.TableMapEvent // rotated table ID -> latest TABLE_MAP event (see HandleTableMap)
tableMapMu sync.RWMutex // guards tableMapCache
sqlBuilder *SQLBuilder // builds INSERT/UPDATE/DELETE statements from row images
failedTables map[string]int // "schema.table" -> consecutive failure count
failedTablesMu sync.RWMutex // guards failedTables
maxFailures int // max consecutive failures before skipping
retryAttempts int // number of retry attempts
retryDelay time.Duration // base retry delay (scaled exponentially per attempt)
lastSchemaHash map[string]string // "schema.table" -> schema hash (MD5 of column layout)
lastSchemaMu sync.RWMutex // guards lastSchemaHash
}
// NewEventHandlers creates new event handlers wired to the given secondary
// connection, pre-configured with the default resilience settings: skip a
// table after 5 consecutive failures, retry each statement 3 times, and
// start the backoff at 100ms.
func NewEventHandlers(secondaryDB *sql.DB, secondaryName string) *EventHandlers {
	h := &EventHandlers{
		secondaryDB:    secondaryDB,
		secondaryName:  secondaryName,
		tableMapCache:  map[uint64]*replication.TableMapEvent{},
		sqlBuilder:     NewSQLBuilder(),
		failedTables:   map[string]int{},
		lastSchemaHash: map[string]string{},
	}
	// Resilience defaults, grouped for visibility.
	h.maxFailures = 5
	h.retryAttempts = 3
	h.retryDelay = 100 * time.Millisecond
	return h
}
// HandleRows processes a row-level binlog event (INSERT/UPDATE/DELETE) and
// applies it to the secondary database.
//
// Resilience behavior:
//   - A panic anywhere below is recovered and surfaced to the caller as a
//     non-nil error. (Previously the panic was only logged and the handler
//     returned nil, so the caller could advance past an event that was
//     never applied.)
//   - Events for tables that have exceeded the failure threshold are
//     skipped with a warning.
//   - If the secondary's schema for the table has drifted since the last
//     event, the event is rejected and the table's failure count bumped.
//
// NOTE(review): only the hard-coded "replica" schema is replicated —
// confirm this filter is intentional before reusing this handler elsewhere.
func (h *EventHandlers) HandleRows(header *replication.EventHeader, e *replication.RowsEvent) (err error) {
	// Panic recovery wrapper: report the panic to the caller instead of
	// silently returning success.
	defer func() {
		if r := recover(); r != nil {
			Errorf("[%s][PANIC RECOVERED] HandleRows panic: %v", h.secondaryName, r)
			err = fmt.Errorf("panic while handling rows event: %v", r)
		}
	}()
	tableName := string(e.Table.Table)
	schemaName := string(e.Table.Schema)
	if schemaName != "replica" {
		return nil
	}
	// Check if table is temporarily skipped due to failures
	if h.isTableSkipped(schemaName, tableName) {
		Warn("[%s][SKIPPED] Skipping event for %s.%s (too many failures)", h.secondaryName, schemaName, tableName)
		return nil
	}
	eventType := h.getEventTypeName(header.EventType)
	Info("[%s][%s] %s.%s", h.secondaryName, eventType, schemaName, tableName)
	if h.secondaryDB != nil {
		// Schema drift detection: refuse to apply rows against a table whose
		// column layout changed underneath us.
		if h.detectSchemaDrift(schemaName, tableName) {
			Warn("[%s][WARN] Schema drift detected for %s.%s, pausing replication", h.secondaryName, schemaName, tableName)
			h.markTableFailed(schemaName, tableName)
			return fmt.Errorf("schema drift detected")
		}
		h.handleSecondaryReplication(e, header.EventType)
	}
	return nil
}
// HandleQuery replays a statement-level binlog event (typically DDL) on the
// secondary. Execution errors are logged but never returned, so replication
// continues best-effort past a failing statement.
//
// NOTE(review): unlike HandleRows, no schema filter is applied here — every
// query event reaching this handler is executed. Confirm that is intended.
func (h *EventHandlers) HandleQuery(e *replication.QueryEvent) error {
	// Panic recovery wrapper
	defer func() {
		if r := recover(); r != nil {
			Errorf("[%s][PANIC RECOVERED] HandleQuery panic: %v", h.secondaryName, r)
		}
	}()
	if h.secondaryDB == nil {
		return nil
	}
	if _, err := h.secondaryDB.Exec(string(e.Query)); err != nil {
		Errorf("[%s][ERROR] Query failed: %v", h.secondaryName, err)
	}
	return nil
}
// HandleTableMap caches a TABLE_MAP event so later row events can resolve
// column metadata for their table ID.
func (h *EventHandlers) HandleTableMap(e *replication.TableMapEvent) {
	// Panic recovery wrapper
	defer func() {
		if r := recover(); r != nil {
			Errorf("[%s][PANIC RECOVERED] HandleTableMap panic: %v", h.secondaryName, r)
		}
	}()
	// NOTE(review): the ID is rotated left by 8 bits before use as the cache
	// key; handleSecondaryReplication applies the identical transform, so
	// lookups stay consistent. Why the raw TableID is not used directly is
	// unclear from this file — confirm before changing either site.
	key := (uint64(e.TableID) << 8) | uint64(e.TableID>>56)
	h.tableMapMu.Lock()
	defer h.tableMapMu.Unlock()
	h.tableMapCache[key] = e
}
// GetTableMap returns the cached TABLE_MAP event for the given (already
// rotated) table ID, or nil when none has been seen.
func (h *EventHandlers) GetTableMap(tableID uint64) *replication.TableMapEvent {
	h.tableMapMu.RLock()
	entry := h.tableMapCache[tableID]
	h.tableMapMu.RUnlock()
	return entry
}
// isTableSkipped reports whether schema.table has reached the consecutive
// failure ceiling (maxFailures) and should be ignored for now.
func (h *EventHandlers) isTableSkipped(schema, table string) bool {
	h.failedTablesMu.RLock()
	count := h.failedTables[schema+"."+table]
	h.failedTablesMu.RUnlock()
	return count >= h.maxFailures
}
// markTableFailed bumps the consecutive-failure counter for schema.table and
// logs the new count against the skip threshold. The log call happens
// outside the lock to keep the critical section minimal.
func (h *EventHandlers) markTableFailed(schema, table string) {
	key := schema + "." + table
	h.failedTablesMu.Lock()
	h.failedTables[key]++
	newCount := h.failedTables[key]
	h.failedTablesMu.Unlock()
	Warn("[%s][FAILURE] %s.%s failure count: %d/%d", h.secondaryName, schema, table, newCount, h.maxFailures)
}
// markTableSuccess clears the consecutive-failure counter for schema.table
// after a successful replication, so earlier transient failures no longer
// count toward the skip threshold.
//
// The entry is deleted rather than zeroed (as it was before): healthy
// tables then leave no residue in failedTables, so the map cannot grow
// without bound over the process lifetime. Readers observe no difference —
// a missing key reads as 0.
func (h *EventHandlers) markTableSuccess(schema, table string) {
	key := schema + "." + table
	h.failedTablesMu.Lock()
	delete(h.failedTables, key)
	h.failedTablesMu.Unlock()
}
// detectSchemaDrift reports whether the table's column layout on the
// secondary differs from the hash recorded the first time the table was
// seen. Hash-lookup failures are treated as "no drift" (logged only).
//
// NOTE(review): once drift is reported the stored hash is never refreshed,
// so every subsequent event for the table keeps reporting drift until the
// process restarts — presumably intentional (keeps replication paused);
// confirm.
func (h *EventHandlers) detectSchemaDrift(schema, table string) bool {
	key := schema + "." + table
	currentHash, err := h.getSchemaHash(schema, table)
	if err != nil {
		Warn("[%s][WARN] Could not get schema hash for %s.%s: %v", h.secondaryName, schema, table, err)
		return false
	}
	h.lastSchemaMu.RLock()
	lastHash, seen := h.lastSchemaHash[key]
	h.lastSchemaMu.RUnlock()
	switch {
	case !seen:
		// First observation of this table: record the baseline hash.
		h.lastSchemaMu.Lock()
		h.lastSchemaHash[key] = currentHash
		h.lastSchemaMu.Unlock()
		return false
	case lastHash != currentHash:
		Warn("[%s][DRIFT] Schema changed for %s.%s: %s -> %s", h.secondaryName, schema, table, lastHash, currentHash)
		return true
	default:
		return false
	}
}
// getSchemaHash returns an MD5 fingerprint of the table's column layout
// (name, type, nullability, in ordinal order) as reported by the
// secondary's INFORMATION_SCHEMA. detectSchemaDrift compares successive
// fingerprints to spot DDL changes.
//
// The schema and table names were previously spliced into the SQL with
// Sprintf, which is injectable (identifiers arrive from the binlog
// stream); they are now bound as placeholder parameters.
//
// NOTE(review): for a nonexistent table GROUP_CONCAT yields NULL, so Scan
// into a plain string returns an error — the caller treats any error as
// "no drift", matching the old behavior.
func (h *EventHandlers) getSchemaHash(schema, table string) (string, error) {
	const query = "SELECT MD5(GROUP_CONCAT(COLUMN_NAME, ':', DATA_TYPE, ':', IS_NULLABLE ORDER BY ORDINAL_POSITION)) " +
		"FROM INFORMATION_SCHEMA.COLUMNS " +
		"WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
	var hash string
	err := h.secondaryDB.QueryRow(query, schema, table).Scan(&hash)
	return hash, err
}
// handleSecondaryReplication dispatches one row event to the matching
// INSERT/UPDATE/DELETE replication routine.
//
// Column-name resolution, in order of preference:
//  1. the cached TABLE_MAP event for this (rotated) table ID,
//  2. the TableMapEvent embedded in the rows event itself,
//  3. a synthetic TableMapEvent whose column names come from the
//     secondary's INFORMATION_SCHEMA, when neither of the above carries
//     any column names.
func (h *EventHandlers) handleSecondaryReplication(e *replication.RowsEvent, eventType replication.EventType) {
schemaName := string(e.Table.Schema)
tableName := string(e.Table.Table)
// NOTE(review): the table ID is rotated left by 8 bits to form the cache
// key; HandleTableMap applies the identical transform, so the two sites
// must stay in sync. Why the raw TableID is not used directly is unclear
// from this file — confirm intent before changing either site.
tableID := (uint64(e.TableID) << 8) | uint64(e.TableID>>56)
tableMap := h.GetTableMap(tableID)
if tableMap == nil {
tableMap = e.Table
}
// No column names available from the binlog metadata — presumably the
// server sends minimal row metadata; fall back to querying the secondary.
// TODO confirm which server setting triggers this path.
if len(tableMap.ColumnName) == 0 {
columns, err := h.fetchColumnNames(schemaName, tableName)
if err != nil {
Errorf("[%s][ERROR] Failed to fetch columns: %v", h.secondaryName, err)
return
}
columnBytes := make([][]byte, len(columns))
for i, col := range columns {
columnBytes[i] = []byte(col)
}
tableMap = &replication.TableMapEvent{
Schema: e.Table.Schema,
Table: e.Table.Table,
ColumnName: columnBytes,
}
}
// Route by event class; anything outside the v1/v2 row events is ignored.
// UPDATE events carry (before-image, after-image) row pairs in e.Rows.
switch eventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
h.replicateInsert(tableMap, schemaName, tableName, e.Rows)
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
h.replicateUpdate(tableMap, schemaName, tableName, e.Rows)
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
h.replicateDelete(tableMap, schemaName, tableName, e.Rows)
}
}
// replicateInsert applies INSERT row images to the secondary, one statement
// per row, recording per-table success/failure for the skip heuristic.
func (h *EventHandlers) replicateInsert(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) {
	for _, row := range rows {
		stmt := h.sqlBuilder.BuildInsert(schema, table, tableMap, row)
		if err := h.executeWithRetry(stmt); err != nil {
			Errorf("[%s][ERROR] INSERT failed after retries: %v", h.secondaryName, err)
			h.markTableFailed(schema, table)
			continue
		}
		h.markTableSuccess(schema, table)
	}
}
// replicateUpdate applies UPDATE row images to the secondary with retry
// logic, recording per-table success/failure for the skip heuristic.
//
// Binlog UPDATE events carry rows as (before-image, after-image) pairs, so
// rows is consumed two at a time. The loop condition i+1 < len(rows)
// guards against a malformed event with an odd row count, which previously
// indexed past the slice and panicked (recovered only far up in
// HandleRows, losing the whole event).
func (h *EventHandlers) replicateUpdate(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) {
	if len(rows)%2 != 0 {
		Errorf("[%s][ERROR] UPDATE event for %s.%s has odd row count %d; trailing row ignored", h.secondaryName, schema, table, len(rows))
	}
	for i := 0; i+1 < len(rows); i += 2 {
		query := h.sqlBuilder.BuildUpdate(schema, table, tableMap, rows[i], rows[i+1])
		err := h.executeWithRetry(query)
		if err != nil {
			Errorf("[%s][ERROR] UPDATE failed after retries: %v", h.secondaryName, err)
			h.markTableFailed(schema, table)
		} else {
			h.markTableSuccess(schema, table)
		}
	}
}
// replicateDelete applies DELETE row images to the secondary, one statement
// per row, recording per-table success/failure for the skip heuristic.
func (h *EventHandlers) replicateDelete(tableMap *replication.TableMapEvent, schema, table string, rows [][]interface{}) {
	for _, row := range rows {
		stmt := h.sqlBuilder.BuildDelete(schema, table, tableMap, row)
		if err := h.executeWithRetry(stmt); err != nil {
			Errorf("[%s][ERROR] DELETE failed after retries: %v", h.secondaryName, err)
			h.markTableFailed(schema, table)
			continue
		}
		h.markTableSuccess(schema, table)
	}
}
// executeWithRetry runs query against the secondary, retrying up to
// h.retryAttempts additional times with exponential backoff
// (retryDelay * 2^attempt). On success it logs the affected-row count; on
// a connection-class error it triggers a reconnect before the next
// attempt. The most recent error is returned once all attempts are spent.
//
// NOTE(review): reconnect() reassigns h.secondaryDB while this loop
// re-reads it on each attempt; that read/write pair is unsynchronized —
// verify with the race detector.
func (h *EventHandlers) executeWithRetry(query string) error {
	var lastErr error
	for attempt := 0; attempt <= h.retryAttempts; attempt++ {
		if attempt > 0 {
			delay := h.retryDelay * time.Duration(1<<attempt) // exponential backoff
			Warn("[%s][RETRY] Retrying in %v (attempt %d/%d)", h.secondaryName, delay, attempt, h.retryAttempts)
			time.Sleep(delay)
		}
		result, err := h.secondaryDB.Exec(query)
		if err != nil {
			lastErr = err
			// A dead connection is worth a reconnect before retrying.
			if h.isConnectionError(err) {
				Errorf("[%s][CONNECTION ERROR] Detected connection error: %v", h.secondaryName, err)
				h.reconnect()
			}
			continue
		}
		if result != nil {
			rowsAffected, _ := result.RowsAffected()
			Info("[%s][SUCCESS] %d row(s) affected", h.secondaryName, rowsAffected)
		}
		return nil
	}
	return lastErr
}
// isConnectionError reports whether err looks like a broken or unreachable
// database connection (as opposed to a SQL-level failure), by matching the
// error text against known driver and network error fragments. Callers use
// it to decide whether a reconnect is worth attempting.
//
// Uses strings.Contains in place of the hand-rolled contains/containsHelper
// pair, which re-implemented the same substring scan.
func (h *EventHandlers) isConnectionError(err error) bool {
	connectionErrors := []string{
		"connection refused",
		"connection reset",
		"broken pipe",
		"timeout",
		"no such host",
		"network is unreachable",
		"driver: bad connection",
		"invalid connection",
	}
	errStr := err.Error()
	for _, fragment := range connectionErrors {
		if strings.Contains(errStr, fragment) {
			return true
		}
	}
	return false
}
// reconnect attempts to reconnect to the secondary database.
//
// It closes the current pool, then retries sql.Open + Ping up to 5 times
// with a linearly increasing sleep (1s, 2s, ... 5s). On success the new
// pool replaces h.secondaryDB and the method returns; on exhaustion it
// only logs — callers keep the last (closed/failed) handle.
//
// NOTE(review): the DSN, credentials included, is hard-coded here instead
// of carried on EventHandlers — confirm it matches how the handler was
// constructed, and consider storing the DSN at construction time.
// NOTE(review): h.secondaryDB is reassigned without synchronization while
// other goroutines may be reading it (executeWithRetry, HandleQuery) —
// data race; verify with -race.
// NOTE(review): sql.Open only validates arguments (it errors mainly on an
// unregistered driver); Ping below is the real liveness check.
func (h *EventHandlers) reconnect() {
Warn("[%s][RECONNECT] Attempting to reconnect to secondary database...", h.secondaryName)
// Close existing connections; the error is deliberately ignored since we
// are about to replace the pool anyway.
h.secondaryDB.Close()
// Attempt to re-establish connection
var err error
maxRetries := 5
for i := 0; i < maxRetries; i++ {
h.secondaryDB, err = sql.Open("mysql", "root:replica@tcp(localhost:3307)/replica?multiStatements=true")
if err == nil {
// Same pool limits as the original connection presumably used —
// TODO confirm against the constructor call site.
h.secondaryDB.SetMaxOpenConns(25)
h.secondaryDB.SetMaxIdleConns(5)
if err = h.secondaryDB.Ping(); err == nil {
Info("[%s][RECONNECT] Successfully reconnected to secondary database", h.secondaryName)
return
}
}
Errorf("[%s][RECONNECT] Reconnection attempt %d/%d failed: %v", h.secondaryName, i+1, maxRetries, err)
time.Sleep(time.Duration(i+1) * time.Second)
}
Errorf("[%s][RECONNECT] Failed to reconnect after %d attempts", h.secondaryName, maxRetries)
}
// PingSecondary performs a health check on the secondary connection,
// returning an error when no database is configured or the ping fails.
func (h *EventHandlers) PingSecondary() error {
	if db := h.secondaryDB; db != nil {
		return db.Ping()
	}
	return fmt.Errorf("secondary DB is nil")
}
// GetSecondaryName reports the configured name of the secondary this
// handler replicates to. (The Get prefix is unidiomatic Go, but the name
// is kept for caller compatibility.)
func (h *EventHandlers) GetSecondaryName() string {
	return h.secondaryName
}
// fetchColumnNames returns the table's column names in ordinal order, read
// from the secondary's INFORMATION_SCHEMA. Used to synthesize column
// metadata when the binlog TABLE_MAP event carries none.
//
// The schema and table names were previously interpolated with Sprintf,
// which is injectable (identifiers arrive from the binlog stream); they
// are now bound as placeholder parameters — consistent with getSchemaHash.
func (h *EventHandlers) fetchColumnNames(schema, table string) ([]string, error) {
	const query = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS " +
		"WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION"
	rows, err := h.secondaryDB.Query(query, schema, table)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var columns []string
	for rows.Next() {
		var colName string
		if err := rows.Scan(&colName); err != nil {
			return nil, err
		}
		columns = append(columns, colName)
	}
	// rows.Err surfaces any iteration error (e.g. a connection drop mid-scan).
	return columns, rows.Err()
}
// getEventTypeName maps a binlog row-event type to the human-readable DML
// verb used in log lines; anything outside the v1/v2 row events yields
// "UNKNOWN".
func (h *EventHandlers) getEventTypeName(eventType replication.EventType) string {
	switch eventType {
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		return "INSERT"
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		return "UPDATE"
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		return "DELETE"
	}
	return "UNKNOWN"
}
// contains reports whether substr occurs within s. It delegates to
// strings.Contains instead of the previous hand-rolled scan, whose
// expression also carried redundant guards (the length check and the
// s == substr case are both subsumed by the library call).
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}
// containsHelper reports whether substr occurs within s by probing every
// possible starting offset with a direct slice comparison.
func containsHelper(s, substr string) bool {
	limit := len(s) - len(substr)
	for start := 0; start <= limit; start++ {
		if s[start:start+len(substr)] == substr {
			return true
		}
	}
	return false
}