fix
@@ -134,9 +134,11 @@ func (c *Client) streamToS3(snapshot *zfs.Dataset, uploadURL, storageKey string)
 	gzWriter := gzip.NewWriter(pw)
 
 	go func() {
-		defer pw.Close()
-		defer gzWriter.Close()
+		// Copy zfs output to gzip writer
 		io.Copy(gzWriter, zfsOut)
+		// Close gzip writer first to flush footer, then close pipe
+		gzWriter.Close()
+		pw.Close()
 	}()
 
 	reader = pr
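The comment added here ("close gzip writer first to flush footer, then close pipe") is about gzip.Writer's trailing CRC-32/length footer: it is only written by Close, and it has to go into the pipe before pw.Close delivers EOF to the reader. A minimal, self-contained sketch of the same ordering, using stand-in data rather than the zfs send stream (none of this code is from the repository); the same change is applied to streamIncrementalToS3 further down:

package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

func main() {
	pr, pw := io.Pipe()

	go func() {
		gzWriter := gzip.NewWriter(pw)
		// Stand-in for copying the zfs send output into the gzip writer.
		io.Copy(gzWriter, strings.NewReader("example snapshot data"))
		// Close the gzip writer first so the footer is flushed into the
		// pipe, then close the pipe to deliver EOF to the reader.
		gzWriter.Close()
		pw.Close()
	}()

	// The reader side (here: decompress and print) sees a complete gzip stream.
	gzReader, err := gzip.NewReader(pr)
	if err != nil {
		panic(err)
	}
	data, err := io.ReadAll(gzReader)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}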
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"io"
 	"net/http"
 	"os"
+	"os/exec"
 	"sort"
 	"strings"
@@ -186,7 +187,11 @@ func (c *Client) SendIncremental(snapshot *zfs.Dataset, base string) error {
 	fmt.Printf(" Type: %s\n", uploadMethod)
 	fmt.Printf(" Storage key: %s\n", uploadResp.StorageKey)
 
-	return c.streamIncrementalToS3(snapshot, base, uploadResp.UploadURL, uploadResp.StorageKey)
+	// Choose upload method based on server response
+	if uploadResp.UploadMethod == "s3" {
+		return c.streamIncrementalToS3(snapshot, base, uploadResp.UploadURL, uploadResp.StorageKey)
+	}
+	return c.sendIncrementalViaZFS(snapshot, base, uploadResp.StorageKey)
 }
 
 // streamIncrementalToS3 streams an incremental ZFS snapshot to S3.
@@ -222,9 +227,11 @@ func (c *Client) streamIncrementalToS3(snapshot *zfs.Dataset, base, uploadURL, s
 	gzWriter := gzip.NewWriter(pw)
 
 	go func() {
-		defer pw.Close()
-		defer gzWriter.Close()
+		// Copy zfs output to gzip writer
 		io.Copy(gzWriter, zfsOut)
+		// Close gzip writer first to flush footer, then close pipe
+		gzWriter.Close()
+		pw.Close()
 	}()
 
 	reader = pr
@@ -289,6 +296,50 @@ func (c *Client) streamIncrementalToS3(snapshot *zfs.Dataset, base, uploadURL, s
 	return nil
 }
 
+// sendIncrementalViaZFS sends an incremental snapshot via ZFS send/receive over SSH.
+// This method is used when the server uses local ZFS storage.
+func (c *Client) sendIncrementalViaZFS(snapshot *zfs.Dataset, base, receivePath string) error {
+	fmt.Printf("-> Sending via ZFS send/receive...\n")
+
+	// Extract server host from URL
+	serverHost := c.config.ServerURL
+	if len(serverHost) > 7 && strings.HasPrefix(serverHost, "http://") {
+		serverHost = serverHost[7:]
+	} else if len(serverHost) > 8 && strings.HasPrefix(serverHost, "https://") {
+		serverHost = serverHost[8:]
+	}
+
+	// Remove port if present
+	if idx := strings.LastIndex(serverHost, ":"); idx > 0 {
+		serverHost = serverHost[:idx]
+	}
+
+	// Build zfs send command
+	var zfsSendCmd string
+	if base != "" {
+		// Incremental send
+		fmt.Printf(" Base: %s\n", base)
+		zfsSendCmd = fmt.Sprintf("zfs send -i %s %s", base, snapshot.Name)
+	} else {
+		// Full send
+		zfsSendCmd = fmt.Sprintf("zfs send %s", snapshot.Name)
+	}
+
+	// Execute ZFS send over SSH
+	cmd := exec.Command("sh", "-c",
+		fmt.Sprintf("%s | ssh %s 'zfs recv -F %s'", zfsSendCmd, serverHost, receivePath))
+
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to send snapshot: %v", err)
+	}
+
+	fmt.Printf("Snapshot sent successfully!\n")
+	return nil
+}
+
 // RotateLocalSnapshots removes old snapshots based on the retention policy.
 // This is similar to zfs-auto-snapshot's rotation behavior.
 func (c *Client) RotateLocalSnapshots(policy *SnapshotPolicy) error {
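The host extraction in sendIncrementalViaZFS above trims "http://"/"https://" and a trailing port by hand. An alternative sketch using the standard library's URL parser instead of manual slicing; the helper name is made up for illustration and is not part of the commit:

package main

import (
	"fmt"
	"net/url"
)

// hostFromServerURL is a hypothetical helper: it derives the bare host the
// same way sendIncrementalViaZFS does, but via net/url.
func hostFromServerURL(serverURL string) (string, error) {
	u, err := url.Parse(serverURL)
	if err != nil {
		return "", err
	}
	// Hostname() drops any ":port" suffix (and brackets around IPv6 hosts).
	return u.Hostname(), nil
}

func main() {
	host, err := hostFromServerURL("https://backup.example.com:8443")
	if err != nil {
		panic(err)
	}
	fmt.Println(host) // backup.example.com
}

One caveat: url.Parse only yields a host when the value carries a scheme, so a bare "host:port" in c.config.ServerURL would still need the manual handling the commit uses.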
@@ -289,7 +289,8 @@ func (s *Server) HandleUpload(w http.ResponseWriter, r *http.Request) {
 
 	timestamp := time.Now().Format("2006-01-02_15:04:05")
 
-	if client.StorageType == "s3" {
+	// Check if S3 backend is available for S3 storage type
+	if client.StorageType == "s3" && s.s3Backend != nil {
 		// S3 upload
 		storageKey := fmt.Sprintf("%s/%s_%s.zfs", req.ClientID, req.DatasetName, timestamp)
 		if req.Compressed {
@@ -303,6 +304,12 @@ func (s *Server) HandleUpload(w http.ResponseWriter, r *http.Request) {
 			StorageKey: storageKey,
 			UploadURL: fmt.Sprintf("/upload-stream/%s", req.ClientID),
 		})
+	} else if client.StorageType == "s3" && s.s3Backend == nil {
+		// S3 requested but not configured
+		respondJSON(w, http.StatusInternalServerError, UploadResponse{
+			Success: false,
+			Message: "S3 storage requested but S3 backend is not configured on server",
+		})
 	} else {
 		// Local ZFS receive
 		snapshotName := fmt.Sprintf("%s@%s_%s", client.Dataset, req.ClientID, timestamp)
@@ -346,9 +353,17 @@ func (s *Server) HandleUploadStream(w http.ResponseWriter, r *http.Request) {
 	ctx := context.Background()
 
 	// Upload to S3
+	// When using chunked transfer (io.Pipe), ContentLength is -1
+	// MinIO requires -1 for unknown size to use streaming upload
 	size := r.ContentLength
 	if size < 0 {
-		size = 0
+		size = -1 // Use streaming upload for unknown size
 	}
 
+	if s.s3Backend == nil {
+		log.Printf("Error: S3 backend not initialized")
+		http.Error(w, "S3 backend not configured", http.StatusInternalServerError)
+		return
+	}
+
 	if err := s.s3Backend.Upload(ctx, storageKey, r.Body, size); err != nil {
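The comment about MinIO and unknown sizes refers to behaviour of the minio-go client: an object size of -1 makes it stream the body as a multipart upload instead of requiring the length up front. The s3Backend.Upload wrapper itself is not shown in this diff; assuming it sits on top of minio-go v7, the underlying call looks roughly like this (endpoint, credentials, bucket and key are placeholders):

package main

import (
	"context"
	"log"
	"os"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	// Placeholder endpoint and credentials.
	client, err := minio.New("minio.example.com:9000", &minio.Options{
		Creds:  credentials.NewStaticV4("ACCESS_KEY", "SECRET_KEY", ""),
		Secure: false,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Stream stdin (a stand-in for the request body) into a placeholder
	// bucket and key. Passing -1 as the object size tells minio-go the
	// length is unknown, so it performs a streaming multipart upload.
	info, err := client.PutObject(context.Background(), "backups",
		"client-1/dataset.zfs.gz", os.Stdin, -1,
		minio.PutObjectOptions{ContentType: "application/gzip"})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("uploaded %s (%d bytes)", info.Key, info.Size)
}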