diff --git a/cmd/zfs-client/main.go b/cmd/zfs-client/main.go index 3ebace7..99441df 100644 --- a/cmd/zfs-client/main.go +++ b/cmd/zfs-client/main.go @@ -58,7 +58,7 @@ func printUsage() { fmt.Println("ZFS Snapshot Backup Client - Simple Version") fmt.Println("\nUsage: zfs-client [command]") fmt.Println("\nCommands:") - fmt.Println(" snap - Create snapshot and send to server (auto full/incremental)") + fmt.Println(" snap - Create snapshot and send to server") fmt.Println(" status - Check server connection and quota") fmt.Println(" help - Show this help message") fmt.Println("\nEnvironment Variables (can be set in .env file):") @@ -67,12 +67,6 @@ func printUsage() { fmt.Println(" SERVER_URL - Backup server URL (default: http://localhost:8080)") fmt.Println(" LOCAL_DATASET - ZFS dataset to backup (default: tank/data)") fmt.Println(" COMPRESS - Enable LZ4 compression (default: true)") - fmt.Println("\nS3 Configuration (for direct S3 uploads):") - fmt.Println(" S3_ENDPOINT - S3 endpoint URL (e.g., https://s3.amazonaws.com)") - fmt.Println(" S3_REGION - AWS region (default: us-east-1)") - fmt.Println(" S3_BUCKET - S3 bucket name (default: zfs-backups)") - fmt.Println(" S3_ACCESS_KEY - AWS access key") - fmt.Println(" S3_SECRET_KEY - AWS secret key") fmt.Println("\nExamples:") fmt.Println(" zfs-client snap") fmt.Println(" zfs-client status") diff --git a/internal/client/client.go b/internal/client/client.go index 5d611c0..9a3342a 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -4,27 +4,24 @@ package client import ( "bytes" - "context" "encoding/json" "fmt" "io" "net/http" - "os" "os/exec" - "strings" "time" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/mistifyio/go-zfs" "github.com/pierrec/lz4/v4" ) +// SnapshotResult contains the result of a snapshot creation and send operation. +type SnapshotResult struct { + FullBackup bool + Snapshot *zfs.Dataset +} + // Client handles snapshot backup operations to a remote server. -// It manages creating local ZFS snapshots and transmitting them -// to the backup server via HTTP or SSH. type Client struct { config *Config } @@ -34,11 +31,51 @@ func New(config *Config) *Client { return &Client{config: config} } +// CreateAndSend creates a snapshot and sends it to the backup server via HTTP. +// It automatically detects if this is a full or incremental backup: +// - If no bookmark exists, does a full backup +// - If bookmark exists, does an incremental backup from the bookmark +func (c *Client) CreateAndSend() (*SnapshotResult, error) { + // Check for existing bookmark to determine backup type + lastBookmark, err := c.GetLastBookmark() + if err != nil { + return nil, fmt.Errorf("failed to check bookmarks: %v", err) + } + + // Create new snapshot + snapshot, err := c.CreateSnapshot() + if err != nil { + return nil, fmt.Errorf("failed to create snapshot: %v", err) + } + + isFullBackup := lastBookmark == "" + if isFullBackup { + fmt.Println("→ No previous backup found, doing FULL backup...") + // Send as full (no base) + if err := c.SendIncrementalHTTP(snapshot, ""); err != nil { + return nil, fmt.Errorf("failed to send snapshot: %v", err) + } + } else { + fmt.Printf("→ Found previous backup, doing INCREMENTAL from %s...\n", lastBookmark) + // Send as incremental from bookmark + if err := c.SendIncrementalHTTP(snapshot, lastBookmark); err != nil { + return nil, fmt.Errorf("failed to send incremental: %v", err) + } + } + + // Create bookmark for future incremental backups + if err := c.CreateBookmark(snapshot); err != nil { + fmt.Printf("Warning: failed to create bookmark: %v\n", err) + } + + return &SnapshotResult{ + FullBackup: isFullBackup, + Snapshot: snapshot, + }, nil +} + // CreateSnapshot creates a local ZFS snapshot of the configured dataset. -// The snapshot is named with a timestamp for easy identification. -// Returns the created snapshot dataset or an error. func (c *Client) CreateSnapshot() (*zfs.Dataset, error) { - // Get the local dataset ds, err := zfs.GetDataset(c.config.LocalDataset) if err != nil { return nil, fmt.Errorf("failed to get dataset: %v", err) @@ -63,12 +100,14 @@ func (c *Client) GetSnapshotSize(snapshot *zfs.Dataset) int64 { return int64(snapshot.Used) } -// SendSnapshot sends a snapshot to the backup server. -// It first requests upload authorization, then streams the snapshot -// using the appropriate method (S3 or ZFS receive). -func (c *Client) SendSnapshot(snapshot *zfs.Dataset) error { +// SendIncrementalHTTP sends a snapshot to the server via HTTP. +// The server then handles storage (S3 or local ZFS). +func (c *Client) SendIncrementalHTTP(snapshot *zfs.Dataset, base string) error { estimatedSize := c.GetSnapshotSize(snapshot) + // Determine if this is incremental or full + isIncremental := base != "" + // Request upload authorization from server uploadReq := map[string]interface{}{ "client_id": c.config.ClientID, @@ -77,6 +116,8 @@ func (c *Client) SendSnapshot(snapshot *zfs.Dataset) error { "timestamp": time.Now().Format(time.RFC3339), "compressed": c.config.Compress, "estimated_size": estimatedSize, + "incremental": isIncremental, + "base_snapshot": base, } reqBody, _ := json.Marshal(uploadReq) @@ -86,7 +127,6 @@ func (c *Client) SendSnapshot(snapshot *zfs.Dataset) error { } defer resp.Body.Close() - // Parse server response var uploadResp struct { Success bool `json:"success"` Message string `json:"message"` @@ -107,50 +147,24 @@ func (c *Client) SendSnapshot(snapshot *zfs.Dataset) error { fmt.Printf(" Method: %s\n", uploadResp.UploadMethod) fmt.Printf(" Storage key: %s\n", uploadResp.StorageKey) - // Choose upload method based on server response - if uploadResp.UploadMethod == "s3" { - return c.streamToS3(snapshot, uploadResp.UploadURL, uploadResp.StorageKey) - } - return c.sendViaZFS(snapshot, uploadResp.StorageKey) + // Stream to server via HTTP + return c.streamToServer(snapshot, base, uploadResp.UploadURL, uploadResp.StorageKey) } -// streamToS3 streams a ZFS snapshot to S3 storage using AWS SDK. -// The snapshot is optionally compressed with LZ4 before transmission. -func (c *Client) streamToS3(snapshot *zfs.Dataset, uploadURL, storageKey string) error { - fmt.Printf("→ Uploading snapshot to S3...\n") - - // Ensure endpoint has valid URI scheme - endpoint := c.config.S3Endpoint - if endpoint != "" && !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") { - endpoint = "http://" + endpoint - } - - // Create AWS config - awsCfg, err := config.LoadDefaultConfig(context.TODO(), - config.WithRegion(c.config.S3Region), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( - c.config.S3AccessKey, - c.config.S3SecretKey, - "", - )), - ) - if err != nil { - return fmt.Errorf("failed to load AWS config: %v", err) - } - - // Determine if using custom endpoint (non-AWS) - customEndpoint := endpoint != "" && endpoint != "http://s3.amazonaws.com" && endpoint != "https://s3.amazonaws.com" - - // Create S3 client - s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { - if customEndpoint { - o.BaseEndpoint = aws.String(endpoint) - o.UsePathStyle = true // Required for MinIO compatible storage - } - }) +// streamToServer streams a ZFS snapshot to the backup server via HTTP. +func (c *Client) streamToServer(snapshot *zfs.Dataset, base, uploadURL, storageKey string) error { + fmt.Printf("→ Streaming snapshot to server...\n") // Create ZFS send command - cmd := exec.Command("zfs", "send", snapshot.Name) + var cmd *exec.Cmd + if base != "" { + // Incremental send from bookmark or snapshot + cmd = exec.Command("zfs", "send", "-i", base, snapshot.Name) + } else { + // Full send + cmd = exec.Command("zfs", "send", snapshot.Name) + } + zfsOut, err := cmd.StdoutPipe() if err != nil { return fmt.Errorf("failed to create pipe: %v", err) @@ -167,12 +181,10 @@ func (c *Client) streamToS3(snapshot *zfs.Dataset, uploadURL, storageKey string) fmt.Printf(" Compressing with LZ4...\n") pr, pw := io.Pipe() lz4Writer := lz4.NewWriter(pw) - lz4Writer.Apply(lz4.BlockSizeOption(lz4.BlockSize(4 * 1024 * 1024))) // 4MB blocks for better performance + lz4Writer.Apply(lz4.BlockSizeOption(lz4.BlockSize(4 * 1024 * 1024))) // 4MB blocks go func() { - // Copy zfs output to LZ4 writer io.Copy(lz4Writer, zfsOut) - // Close LZ4 writer first to flush, then close pipe lz4Writer.Close() pw.Close() }() @@ -180,113 +192,66 @@ func (c *Client) streamToS3(snapshot *zfs.Dataset, uploadURL, storageKey string) reader = pr } - // Upload to S3 using PutObject - _, err = s3Client.PutObject(context.TODO(), &s3.PutObjectInput{ - Bucket: aws.String(c.config.S3Bucket), - Key: aws.String(storageKey), - Body: reader, - ContentType: aws.String("application/octet-stream"), - }) + // Create HTTP request to server + req, err := http.NewRequest("POST", c.config.ServerURL+uploadURL, reader) + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + // Set headers + req.Header.Set("X-API-Key", c.config.APIKey) + req.Header.Set("X-Storage-Key", storageKey) + req.Header.Set("X-Dataset-Name", c.config.LocalDataset) + req.Header.Set("X-Compressed", fmt.Sprintf("%v", c.config.Compress)) + req.Header.Set("X-Incremental", fmt.Sprintf("%v", base != "")) + if base != "" { + req.Header.Set("X-Base-Snapshot", base) + } + req.Header.Set("Content-Type", "application/octet-stream") + + // Send request with no timeout for large uploads + httpClient := &http.Client{ + Timeout: 0, + } + + httpResp, err := httpClient.Do(req) + if err != nil { + cmd.Process.Kill() + return fmt.Errorf("failed to upload: %v", err) + } + defer httpResp.Body.Close() + + if httpResp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(httpResp.Body) + return fmt.Errorf("upload failed with status %d: %s", httpResp.StatusCode, body) + } - // Wait for zfs send to complete if err := cmd.Wait(); err != nil { return fmt.Errorf("zfs send failed: %v", err) } - if err != nil { - return fmt.Errorf("failed to upload to S3: %v", err) + // Parse response + var result struct { + Success bool `json:"success"` + Message string `json:"message"` + Size int64 `json:"size"` } - fmt.Printf("✓ Snapshot uploaded to S3 successfully!\n") + if err := json.NewDecoder(httpResp.Body).Decode(&result); err != nil { + return fmt.Errorf("failed to decode response: %v", err) + } + + if !result.Success { + return fmt.Errorf("upload failed: %s", result.Message) + } + + fmt.Printf("✓ Snapshot uploaded successfully!\n") + fmt.Printf(" Size: %.2f MB\n", float64(result.Size)/(1024*1024)) return nil } -// sendViaZFS sends a snapshot via traditional ZFS send/receive over SSH. -// This method is used when the server uses local ZFS storage. -func (c *Client) sendViaZFS(snapshot *zfs.Dataset, receivePath string) error { - fmt.Printf("→ Sending via ZFS send/receive...\n") - - // Extract server host from URL - serverHost := c.config.ServerURL - if len(serverHost) > 7 && strings.HasPrefix(serverHost, "http://") { - serverHost = serverHost[7:] - } else if len(serverHost) > 8 && strings.HasPrefix(serverHost, "https://") { - serverHost = serverHost[8:] - } - - // Remove port if present - if idx := strings.LastIndex(serverHost, ":"); idx > 0 { - serverHost = serverHost[:idx] - } - - // Execute ZFS send over SSH - cmd := exec.Command("sh", "-c", - fmt.Sprintf("zfs send %s | ssh %s 'zfs recv -F %s'", - snapshot.Name, serverHost, receivePath)) - - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - if err := cmd.Run(); err != nil { - return fmt.Errorf("failed to send snapshot: %v", err) - } - - fmt.Printf("✓ Snapshot sent successfully!\n") - return nil -} - -// SnapshotResult contains the result of a snapshot creation and send operation. -type SnapshotResult struct { - FullBackup bool - Snapshot *zfs.Dataset -} - -// CreateAndSend creates a snapshot and sends it to the backup server. -// It automatically detects if this is a full or incremental backup: -// - If no bookmark exists, does a full backup -// - If bookmark exists, does an incremental backup from the bookmark -func (c *Client) CreateAndSend() (*SnapshotResult, error) { - // Check for existing bookmark to determine backup type - lastBookmark, err := c.GetLastBookmark() - if err != nil { - return nil, fmt.Errorf("failed to check bookmarks: %v", err) - } - - // Create new snapshot - snapshot, err := c.CreateSnapshot() - if err != nil { - return nil, fmt.Errorf("failed to create snapshot: %v", err) - } - - isFullBackup := lastBookmark == "" - if isFullBackup { - fmt.Println("→ No previous backup found, doing FULL backup...") - // Send as full (no base) - if err := c.SendIncremental(snapshot, ""); err != nil { - return nil, fmt.Errorf("failed to send snapshot: %v", err) - } - } else { - fmt.Printf("→ Found previous backup, doing INCREMENTAL from %s...", lastBookmark) - // Send as incremental from bookmark - if err := c.SendIncremental(snapshot, lastBookmark); err != nil { - return nil, fmt.Errorf("failed to send incremental: %v", err) - } - } - - // Create bookmark for future incremental backups - if err := c.CreateBookmark(snapshot); err != nil { - fmt.Printf("Warning: failed to create bookmark: %v\n", err) - } - - return &SnapshotResult{ - FullBackup: isFullBackup, - Snapshot: snapshot, - }, nil -} - // GetStatus retrieves and displays the client's backup status from the server. -// Shows storage usage, quota, and snapshot count. func (c *Client) GetStatus() error { url := fmt.Sprintf("%s/status?client_id=%s&api_key=%s", c.config.ServerURL, c.config.ClientID, c.config.APIKey) diff --git a/internal/client/config.go b/internal/client/config.go index 546e9b2..20d7820 100644 --- a/internal/client/config.go +++ b/internal/client/config.go @@ -9,7 +9,6 @@ import ( ) // Config holds client-side configuration for connecting to the backup server. -// Note: Storage type is determined by the server, not the client. type Config struct { // ClientID is the unique identifier for this client ClientID string `json:"client_id"` @@ -21,16 +20,6 @@ type Config struct { LocalDataset string `json:"local_dataset"` // Compress enables LZ4 compression for transfers Compress bool `json:"compress"` - // S3Endpoint is the S3 endpoint URL (optional, for direct S3 uploads) - S3Endpoint string `json:"s3_endpoint"` - // S3Region is the AWS region - S3Region string `json:"s3_region"` - // S3Bucket is the S3 bucket name - S3Bucket string `json:"s3_bucket"` - // S3AccessKey is the AWS access key - S3AccessKey string `json:"s3_access_key"` - // S3SecretKey is the AWS secret key - S3SecretKey string `json:"s3_secret_key"` } // LoadConfig loads client configuration from environment variables and .env file. @@ -42,14 +31,9 @@ func LoadConfig() *Config { return &Config{ ClientID: getEnv("CLIENT_ID", "client1"), APIKey: getEnv("API_KEY", "secret123"), - ServerURL: getEnv("SERVER_URL", "http://backup-server:8080"), + ServerURL: getEnv("SERVER_URL", "http://localhost:8080"), LocalDataset: getEnv("LOCAL_DATASET", "tank/data"), Compress: getEnv("COMPRESS", "true") == "true", - S3Endpoint: getEnv("S3_ENDPOINT", ""), - S3Region: getEnv("S3_REGION", "us-east-1"), - S3Bucket: getEnv("S3_BUCKET", "zfs-backups"), - S3AccessKey: getEnv("S3_ACCESS_KEY", ""), - S3SecretKey: getEnv("S3_SECRET_KEY", ""), } } diff --git a/internal/client/snapshot.go b/internal/client/snapshot.go index 5973e29..b6e081f 100644 --- a/internal/client/snapshot.go +++ b/internal/client/snapshot.go @@ -3,23 +3,11 @@ package client import ( - "bytes" - "context" - "encoding/json" "fmt" - "io" - "net/http" - "os" "os/exec" "strings" - "time" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/mistifyio/go-zfs" - "github.com/pierrec/lz4/v4" ) // CreateBookmark creates a ZFS bookmark from a snapshot. @@ -83,203 +71,7 @@ func (c *Client) GetLastSnapshot() (*zfs.Dataset, error) { return snapshots[len(snapshots)-1], nil } -// SendIncremental sends an incremental stream from a bookmark or snapshot. -// If base is empty, sends a full stream. +// SendIncremental is kept for API compatibility - now just calls HTTP version func (c *Client) SendIncremental(snapshot *zfs.Dataset, base string) error { - estimatedSize := c.GetSnapshotSize(snapshot) - - // Determine if this is incremental or full - isIncremental := base != "" - var uploadMethod string - if isIncremental { - uploadMethod = "incremental" - } else { - uploadMethod = "full" - } - - // Request upload authorization from server - uploadReq := map[string]interface{}{ - "client_id": c.config.ClientID, - "api_key": c.config.APIKey, - "dataset_name": c.config.LocalDataset, - "timestamp": time.Now().Format(time.RFC3339), - "compressed": c.config.Compress, - "estimated_size": estimatedSize, - "incremental": isIncremental, - "base_snapshot": base, - } - - reqBody, _ := json.Marshal(uploadReq) - resp, err := http.Post(c.config.ServerURL+"/upload", "application/json", bytes.NewBuffer(reqBody)) - if err != nil { - return fmt.Errorf("failed to request upload: %v", err) - } - defer resp.Body.Close() - - var uploadResp struct { - Success bool `json:"success"` - Message string `json:"message"` - UploadURL string `json:"upload_url"` - UploadMethod string `json:"upload_method"` - StorageKey string `json:"storage_key"` - } - - if err := json.NewDecoder(resp.Body).Decode(&uploadResp); err != nil { - return fmt.Errorf("failed to decode response: %v", err) - } - - if !uploadResp.Success { - return fmt.Errorf("upload not authorized: %s", uploadResp.Message) - } - - fmt.Printf("→ Upload authorized\n") - fmt.Printf(" Method: %s\n", uploadResp.UploadMethod) - fmt.Printf(" Type: %s\n", uploadMethod) - fmt.Printf(" Storage key: %s\n", uploadResp.StorageKey) - - // Choose upload method based on server response - if uploadResp.UploadMethod == "s3" { - return c.streamIncrementalToS3(snapshot, base, uploadResp.UploadURL, uploadResp.StorageKey) - } - return c.sendIncrementalViaZFS(snapshot, base, uploadResp.StorageKey) -} - -// streamIncrementalToS3 streams an incremental ZFS snapshot to S3 using AWS SDK. -func (c *Client) streamIncrementalToS3(snapshot *zfs.Dataset, base, uploadURL, storageKey string) error { - fmt.Printf("→ Uploading snapshot to S3...\n") - - // Ensure endpoint has valid URI scheme - endpoint := c.config.S3Endpoint - if endpoint != "" && !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") { - endpoint = "http://" + endpoint - } - - // Create AWS config - awsCfg, err := config.LoadDefaultConfig(context.TODO(), - config.WithRegion(c.config.S3Region), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( - c.config.S3AccessKey, - c.config.S3SecretKey, - "", - )), - ) - if err != nil { - return fmt.Errorf("failed to load AWS config: %v", err) - } - - // Determine if using custom endpoint (non-AWS) - customEndpoint := endpoint != "" && endpoint != "http://s3.amazonaws.com" && endpoint != "https://s3.amazonaws.com" - - // Create S3 client - s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { - if customEndpoint { - o.BaseEndpoint = aws.String(endpoint) - o.UsePathStyle = true // Required for MinIO compatible storage - } - }) - - // Create ZFS send command - var cmd *exec.Cmd - if base != "" { - // Incremental send from bookmark or snapshot - fmt.Printf(" Base: %s\n", base) - cmd = exec.Command("zfs", "send", "-i", base, snapshot.Name) - } else { - // Full send - cmd = exec.Command("zfs", "send", snapshot.Name) - } - - zfsOut, err := cmd.StdoutPipe() - if err != nil { - return fmt.Errorf("failed to create pipe: %v", err) - } - - if err := cmd.Start(); err != nil { - return fmt.Errorf("failed to start zfs send: %v", err) - } - - var reader io.Reader = zfsOut - - // Apply LZ4 compression if enabled - if c.config.Compress { - fmt.Printf(" Compressing with LZ4...\n") - pr, pw := io.Pipe() - lz4Writer := lz4.NewWriter(pw) - lz4Writer.Apply(lz4.BlockSizeOption(lz4.BlockSize(4 * 1024 * 1024))) // 4MB blocks for better performance - - go func() { - // Copy zfs output to LZ4 writer - io.Copy(lz4Writer, zfsOut) - // Close LZ4 writer first to flush, then close pipe - lz4Writer.Close() - pw.Close() - }() - - reader = pr - } - - // Upload to S3 using PutObject - _, err = s3Client.PutObject(context.TODO(), &s3.PutObjectInput{ - Bucket: aws.String(c.config.S3Bucket), - Key: aws.String(storageKey), - Body: reader, - ContentType: aws.String("application/octet-stream"), - }) - - // Wait for zfs send to complete - if err := cmd.Wait(); err != nil { - return fmt.Errorf("zfs send failed: %v", err) - } - - if err != nil { - return fmt.Errorf("failed to upload to S3: %v", err) - } - - fmt.Printf("✓ Snapshot uploaded to S3 successfully!\n") - - return nil -} - -// sendIncrementalViaZFS sends an incremental snapshot via ZFS send/receive over SSH. -// This method is used when the server uses local ZFS storage. -func (c *Client) sendIncrementalViaZFS(snapshot *zfs.Dataset, base, receivePath string) error { - fmt.Printf("→ Sending via ZFS send/receive...\n") - - // Extract server host from URL - serverHost := c.config.ServerURL - if len(serverHost) > 7 && strings.HasPrefix(serverHost, "http://") { - serverHost = serverHost[7:] - } else if len(serverHost) > 8 && strings.HasPrefix(serverHost, "https://") { - serverHost = serverHost[8:] - } - - // Remove port if present - if idx := strings.LastIndex(serverHost, ":"); idx > 0 { - serverHost = serverHost[:idx] - } - - // Build zfs send command - var zfsSendCmd string - if base != "" { - // Incremental send - fmt.Printf(" Base: %s\n", base) - zfsSendCmd = fmt.Sprintf("zfs send -i %s %s", base, snapshot.Name) - } else { - // Full send - zfsSendCmd = fmt.Sprintf("zfs send %s", snapshot.Name) - } - - // Execute ZFS send over SSH - cmd := exec.Command("sh", "-c", - fmt.Sprintf("%s | ssh %s 'zfs recv -F %s'", zfsSendCmd, serverHost, receivePath)) - - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - if err := cmd.Run(); err != nil { - return fmt.Errorf("failed to send snapshot: %v", err) - } - - fmt.Printf("✓ Snapshot sent successfully!\n") - return nil + return c.SendIncrementalHTTP(snapshot, base) }