This commit is contained in:
2026-02-15 13:13:11 +01:00
parent 8b592db3dd
commit 2a5221c29a
8 changed files with 243 additions and 35 deletions

View File

@@ -238,6 +238,7 @@ func (s *Server) HandleUpload(w http.ResponseWriter, r *http.Request) {
Message: "Ready to receive snapshot",
UploadMethod: "zfs-receive",
StorageKey: snapshotName,
UploadURL: fmt.Sprintf("/upload-stream/%s", req.ClientID),
})
}
}
@@ -280,23 +281,48 @@ func (s *Server) HandleUploadStream(w http.ResponseWriter, r *http.Request) {
size = -1 // Use streaming upload for unknown size
}
if s.s3Backend == nil {
log.Printf("Error: S3 backend not initialized")
http.Error(w, "S3 backend not configured", http.StatusInternalServerError)
if s.s3Backend == nil && s.localBackend == nil {
log.Printf("Error: No storage backend configured")
http.Error(w, "No storage backend configured", http.StatusInternalServerError)
return
}
if err := s.s3Backend.Upload(ctx, storageKey, r.Body, size); err != nil {
log.Printf("Error uploading to S3: %v", err)
http.Error(w, "Upload failed", http.StatusInternalServerError)
// Determine storage type based on client configuration
client, err := s.db.GetClient(clientID)
if err != nil || client == nil {
http.Error(w, "Client not found", http.StatusNotFound)
return
}
// Get actual size after upload
actualSize, err := s.s3Backend.GetSize(ctx, storageKey)
if err != nil {
log.Printf("Error getting object size: %v", err)
var actualSize int64
// Handle based on storage type
if client.StorageType == "s3" && s.s3Backend != nil {
// Upload to S3
if err := s.s3Backend.Upload(ctx, storageKey, r.Body, size); err != nil {
log.Printf("Error uploading to S3: %v", err)
http.Error(w, "Upload failed", http.StatusInternalServerError)
return
}
// Get actual size after upload
actualSize, err = s.s3Backend.GetSize(ctx, storageKey)
if err != nil {
log.Printf("Error getting object size: %v", err)
actualSize = size
}
} else if client.StorageType == "local" && s.localBackend != nil {
// Upload to local ZFS
if err := s.localBackend.Receive(storageKey, r.Body, compressedStr == "true"); err != nil {
log.Printf("Error uploading to local ZFS: %v", err)
http.Error(w, "Upload failed", http.StatusInternalServerError)
return
}
actualSize = size
} else {
log.Printf("Error: Storage type %s not configured", client.StorageType)
http.Error(w, "Storage type not configured", http.StatusInternalServerError)
return
}
// Save metadata to database

View File

@@ -6,12 +6,15 @@ import (
"io"
"log"
"net/http"
"os"
"os/exec"
"strings"
"time"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/mistifyio/go-zfs"
"github.com/pierrec/lz4/v4"
)
// StorageBackend defines the interface for different storage types
@@ -138,6 +141,41 @@ func (l *LocalBackend) Upload(ctx context.Context, key string, data io.Reader, s
return fmt.Errorf("local backend upload not supported via storage interface, use zfs receive endpoint")
}
// Receive receives a ZFS snapshot stream and restores it to the local dataset
func (l *LocalBackend) Receive(snapshotName string, data io.Reader, compressed bool) error {
// Extract the target dataset from the snapshot name
// snapshotName format: dataset@name -> we want just the dataset part
parts := strings.Split(snapshotName, "@")
if len(parts) != 2 {
return fmt.Errorf("invalid snapshot name format: %s", snapshotName)
}
targetDataset := parts[0]
log.Printf("Receiving ZFS snapshot to %s (compressed: %v)", targetDataset, compressed)
// If compressed, decompress with LZ4 first
var reader io.Reader = data
if compressed {
lz4Reader := lz4.NewReader(data)
reader = lz4Reader
}
// Use go-zfs library to receive the snapshot (with -F force flag)
// Note: The library's ReceiveSnapshot doesn't support -F, so we use exec.Command
cmd := exec.Command("zfs", "receive", "-F", snapshotName)
cmd.Stdin = reader
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("zfs receive failed: %v", err)
}
log.Printf("Successfully received snapshot: %s", snapshotName)
return nil
}
// Download creates a zfs send stream
func (l *LocalBackend) Download(ctx context.Context, key string) (io.ReadCloser, error) {
cmd := exec.CommandContext(ctx, "zfs", "send", key)