// zfs/internal/client/client.go

// Package client provides ZFS snapshot backup client functionality.
// It handles creating snapshots and uploading them to a remote server.
package client

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os/exec"
	"strings"
	"time"

	"github.com/mistifyio/go-zfs"
	"github.com/pierrec/lz4/v4"
)
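
// NOTE: both go-zfs and the exec.Command calls further down shell out to the
// zfs(8) binary, so the process needs sufficient ZFS privileges (root, or
// delegated permissions via "zfs allow") for snapshot, send and bookmark
// operations.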

var uploadUrl = "/upload-stream/"

// SnapshotResult contains the result of a snapshot creation and send operation.
type SnapshotResult struct {
	FullBackup bool
	Snapshot   *zfs.Dataset
}

// Client handles snapshot backup operations to a remote server.
type Client struct {
	config *Config
}

// New creates a new Client instance with the provided configuration.
func New(config *Config) *Client {
	return &Client{config: config}
}
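
// Typical usage from a caller's point of view (illustrative sketch only; the
// Config literal below assumes the exported fields this file already
// references: ServerURL, ClientID, APIKey, LocalDataset and Compress):
//
//	c := client.New(&client.Config{
//		ServerURL:    "https://backup.example.com",
//		ClientID:     "host-01",
//		APIKey:       "secret",
//		LocalDataset: "tank/data",
//		Compress:     true,
//	})
//	res, err := c.CreateAndSend("") // empty string falls back to LocalDataset
//	if err != nil {
//		log.Fatal(err)
//	}
//	fmt.Println("full backup:", res.FullBackup)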

// CreateAndSend creates a snapshot and sends it to the backup server via HTTP.
// It automatically detects whether this is a full or an incremental backup:
//   - if no bookmark exists, it performs a full backup
//   - if a bookmark exists, it performs an incremental backup from that bookmark
//
// If targetDataset is provided, it overrides the configured dataset.
func (c *Client) CreateAndSend(targetDataset string) (*SnapshotResult, error) {
	// Use provided dataset or fall back to config
	if targetDataset == "" {
		targetDataset = c.config.LocalDataset
	}

	// Check for existing bookmark to determine backup type
	lastBookmark, err := c.GetLastBookmark()
	if err != nil {
		return nil, fmt.Errorf("failed to check bookmarks: %v", err)
	}

	// Create new snapshot
	snapshot, err := c.CreateSnapshot(targetDataset)
	if err != nil {
		return nil, fmt.Errorf("failed to create snapshot: %v", err)
	}

	isFullBackup := lastBookmark == ""
	if isFullBackup {
		fmt.Println("→ No previous backup found, doing FULL backup...")
		// Send as full (no base)
		if err := c.SendIncrementalHTTP(snapshot, targetDataset, ""); err != nil {
			return nil, fmt.Errorf("failed to send snapshot: %v", err)
		}
	} else {
		fmt.Printf("→ Found previous backup, doing INCREMENTAL from %s...\n", lastBookmark)
		// Send as incremental from bookmark
		if err := c.SendIncrementalHTTP(snapshot, targetDataset, lastBookmark); err != nil {
			return nil, fmt.Errorf("failed to send incremental: %v", err)
		}
	}

	// Create bookmark for future incremental backups
	if err := c.CreateBookmark(snapshot); err != nil {
		fmt.Printf("Warning: failed to create bookmark: %v\n", err)
	}

	return &SnapshotResult{
		FullBackup: isFullBackup,
		Snapshot:   snapshot,
	}, nil
}
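
// For reference, the sequence above maps roughly onto these zfs(8) commands
// (dataset, snapshot and bookmark names are illustrative; the actual bookmark
// handling lives in GetLastBookmark/CreateBookmark):
//
//	zfs snapshot tank/data@backup_2026-02-16_03:00:00
//	zfs send -i tank/data#backup tank/data@backup_2026-02-16_03:00:00   # incremental
//	zfs send tank/data@backup_2026-02-16_03:00:00                       # full
//	zfs bookmark tank/data@backup_2026-02-16_03:00:00 tank/data#backup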

// CreateSnapshot creates a local ZFS snapshot of the given dataset.
func (c *Client) CreateSnapshot(dataset string) (*zfs.Dataset, error) {
	ds, err := zfs.GetDataset(dataset)
	if err != nil {
		return nil, fmt.Errorf("failed to get dataset: %v", err)
	}

	// Generate snapshot name with timestamp
	timestamp := time.Now().Format("2006-01-02_15:04:05")
	snapshotName := fmt.Sprintf("backup_%s", timestamp)

	// Create the snapshot
	snapshot, err := ds.Snapshot(snapshotName, false)
	if err != nil {
		return nil, fmt.Errorf("failed to create snapshot: %v", err)
	}

	fmt.Printf("✓ Created local snapshot: %s@%s\n", dataset, snapshotName)
	return snapshot, nil
}

// GetSnapshotSize returns the used size of a snapshot in bytes. Note that a
// snapshot's "used" property only counts space unique to that snapshot, so this
// is a rough estimate of the send-stream size, not an exact figure.
func (c *Client) GetSnapshotSize(snapshot *zfs.Dataset) int64 {
	return int64(snapshot.Used)
}

// SendIncrementalHTTP sends a snapshot to the server via HTTP.
// The server then handles storage (S3 or local ZFS).
// datasetName should be the ZFS dataset being backed up (e.g., "tank/data").
func (c *Client) SendIncrementalHTTP(snapshot *zfs.Dataset, datasetName, base string) error {
	estimatedSize := c.GetSnapshotSize(snapshot)

	// Determine if this is incremental or full
	isIncremental := base != ""

	// Request upload authorization from server
	uploadReq := map[string]interface{}{
		"client_id":      c.config.ClientID,
		"api_key":        c.config.APIKey,
		"dataset_name":   datasetName,
		"timestamp":      time.Now().Format(time.RFC3339),
		"compressed":     c.config.Compress,
		"estimated_size": estimatedSize,
		"incremental":    isIncremental,
		"base_snapshot":  base,
	}
	reqBody, err := json.Marshal(uploadReq)
	if err != nil {
		return fmt.Errorf("failed to encode upload request: %v", err)
	}

	uploadURL := c.config.ServerURL
	// Ensure proper URL format
	if !strings.HasSuffix(uploadURL, "/") {
		uploadURL += "/"
	}
	uploadURL += "upload"

	resp, err := http.Post(uploadURL, "application/json", bytes.NewBuffer(reqBody))
	if err != nil {
		return fmt.Errorf("failed to request upload: %v", err)
	}
	defer resp.Body.Close()

	var uploadResp struct {
		Success      bool   `json:"success"`
		Message      string `json:"message"`
		UploadURL    string `json:"upload_url"`
		UploadMethod string `json:"upload_method"`
		StorageKey   string `json:"storage_key"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&uploadResp); err != nil {
		return fmt.Errorf("failed to decode response: %v", err)
	}
	if !uploadResp.Success {
		return fmt.Errorf("upload not authorized: %s", uploadResp.Message)
	}

	fmt.Printf("→ Upload authorized\n")
	fmt.Printf(" Method: %s\n", uploadResp.UploadMethod)
	fmt.Printf(" Storage key: %s\n", uploadResp.StorageKey)

	// Stream to server via HTTP
	return c.streamToServer(snapshot, datasetName, base, uploadResp.UploadURL, uploadResp.StorageKey)
}
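
// The authorization exchange above is a single JSON round trip. An illustrative
// request/response pair (all values are made up; only the field names come from
// the map and struct in this file) might look like:
//
//	POST <ServerURL>/upload
//	{"client_id":"host-01","api_key":"secret","dataset_name":"tank/data",
//	 "timestamp":"2026-02-16T03:02:10+01:00","compressed":true,
//	 "estimated_size":1048576,"incremental":true,"base_snapshot":"tank/data#backup"}
//
//	200 OK
//	{"success":true,"message":"ok","upload_url":"/upload-stream/abc123",
//	 "upload_method":"POST","storage_key":"host-01/tank-data/2026-02-16"}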

// streamToServer streams a ZFS snapshot to the backup server via HTTP.
func (c *Client) streamToServer(snapshot *zfs.Dataset, datasetName, base, uploadURL, storageKey string) error {
	fmt.Printf("→ Streaming snapshot to server...\n")

	// Create ZFS send command
	var cmd *exec.Cmd
	if base != "" {
		// Incremental send from bookmark or snapshot
		cmd = exec.Command("zfs", "send", "-i", base, snapshot.Name)
	} else {
		// Full send
		cmd = exec.Command("zfs", "send", snapshot.Name)
	}

	zfsOut, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("failed to create pipe: %v", err)
	}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("failed to start zfs send: %v", err)
	}

	var reader io.Reader = zfsOut

	// Apply LZ4 compression if enabled
	if c.config.Compress {
		fmt.Printf(" Compressing with LZ4...\n")
		pr, pw := io.Pipe()
		lz4Writer := lz4.NewWriter(pw)
		if err := lz4Writer.Apply(lz4.BlockSizeOption(lz4.Block4Mb)); err != nil {
			cmd.Process.Kill()
			return fmt.Errorf("failed to configure lz4 writer: %v", err)
		}
		go func() {
			// Copy the zfs send stream through the LZ4 writer and propagate
			// any error to the reading side of the pipe.
			_, copyErr := io.Copy(lz4Writer, zfsOut)
			closeErr := lz4Writer.Close()
			if copyErr != nil {
				pw.CloseWithError(copyErr)
				return
			}
			pw.CloseWithError(closeErr) // a nil error means a clean EOF for the reader
		}()
		reader = pr
	}

	// Create HTTP request to server.
	// Build the full URL: the server may return either an absolute URL or a
	// path relative to the configured server URL.
	fullURL := uploadURL
	if !strings.HasPrefix(uploadURL, "http://") && !strings.HasPrefix(uploadURL, "https://") {
		fullURL = strings.TrimRight(c.config.ServerURL, "/")
		if !strings.HasPrefix(uploadURL, "/") {
			uploadURL = "/" + uploadURL
		}
		fullURL += uploadURL
	}
	fmt.Printf(" Streaming to: %s\n", fullURL)

	req, err := http.NewRequest("POST", fullURL, reader)
	if err != nil {
		return fmt.Errorf("failed to create request: %v", err)
	}

	// Set headers
	req.Header.Set("X-API-Key", c.config.APIKey)
	req.Header.Set("X-Storage-Key", storageKey)
	req.Header.Set("X-Dataset-Name", datasetName)
	req.Header.Set("X-Compressed", fmt.Sprintf("%v", c.config.Compress))
	req.Header.Set("X-Incremental", fmt.Sprintf("%v", base != ""))
	if base != "" {
		req.Header.Set("X-Base-Snapshot", base)
	}
	req.Header.Set("Content-Type", "application/octet-stream")

	// Send request with no timeout for large uploads
	httpClient := &http.Client{
		Timeout: 0,
	}
	httpResp, err := httpClient.Do(req)
	if err != nil {
		cmd.Process.Kill()
		return fmt.Errorf("failed to upload: %v", err)
	}
	defer httpResp.Body.Close()

	if httpResp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(httpResp.Body)
		return fmt.Errorf("upload failed with status %d: %s", httpResp.StatusCode, body)
	}
	if err := cmd.Wait(); err != nil {
		return fmt.Errorf("zfs send failed: %v", err)
	}

	// Parse response
	var result struct {
		Success bool   `json:"success"`
		Message string `json:"message"`
		Size    int64  `json:"size"`
	}
	if err := json.NewDecoder(httpResp.Body).Decode(&result); err != nil {
		return fmt.Errorf("failed to decode response: %v", err)
	}
	if !result.Success {
		return fmt.Errorf("upload failed: %s", result.Message)
	}

	fmt.Printf("✓ Snapshot uploaded successfully!\n")
	fmt.Printf(" Size: %.2f MB\n", float64(result.Size)/(1024*1024))
	return nil
}
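
// Note on the wire format: when Compress is enabled, the request body is an
// LZ4 frame stream written with 4 MiB blocks. A receiver written in Go could,
// for example, wrap the body in lz4.NewReader from github.com/pierrec/lz4/v4
// before piping it into "zfs receive"; the server in this repository may
// handle it differently.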

// GetStatus retrieves and displays the client's backup status from the server.
func (c *Client) GetStatus() error {
	url := fmt.Sprintf("%s/status?client_id=%s&api_key=%s",
		strings.TrimRight(c.config.ServerURL, "/"), c.config.ClientID, c.config.APIKey)

	resp, err := http.Get(url)
	if err != nil {
		return fmt.Errorf("failed to get status: %v", err)
	}
	defer resp.Body.Close()

	var status struct {
		Success        bool    `json:"success"`
		TotalSnapshots int     `json:"total_snapshots"`
		UsedBytes      int64   `json:"used_bytes"`
		MaxBytes       int64   `json:"max_bytes"`
		PercentUsed    float64 `json:"percent_used"`
		StorageType    string  `json:"storage_type"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
		return fmt.Errorf("failed to decode status: %v", err)
	}
	if !status.Success {
		return fmt.Errorf("status check failed")
	}

	fmt.Printf("\n=== Server Status ===\n")
	fmt.Printf("Storage Type: %s\n", status.StorageType)
	fmt.Printf("Total Snapshots: %d\n", status.TotalSnapshots)
	fmt.Printf("Used: %.2f GB / %.2f GB (%.1f%%)\n",
		float64(status.UsedBytes)/(1024*1024*1024),
		float64(status.MaxBytes)/(1024*1024*1024),
		status.PercentUsed)
	return nil
}