Files
zfs/internal/server/storage.go
2026-02-15 12:58:05 +01:00

211 lines
5.4 KiB
Go

package server
import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"os/exec"
	"time"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/mistifyio/go-zfs"
)
// StorageBackend defines the interface for different storage types.
//
// Implementations in this file: S3Backend (remote object storage via
// minio-go) and LocalBackend (local ZFS datasets/snapshots).
type StorageBackend interface {
	// Upload stores size bytes read from data under key.
	Upload(ctx context.Context, key string, data io.Reader, size int64) error
	// Download returns a stream of the object's bytes; the caller must Close it.
	Download(ctx context.Context, key string) (io.ReadCloser, error)
	// Delete removes the object identified by key.
	Delete(ctx context.Context, key string) error
	// List returns the keys of all objects whose name starts with prefix.
	List(ctx context.Context, prefix string) ([]string, error)
	// GetSize reports the stored size of the object in bytes.
	GetSize(ctx context.Context, key string) (int64, error)
}
// S3Backend implements StorageBackend for S3-compatible storage using minio-go.
type S3Backend struct {
	client     *minio.Client // shared minio client, created once in NewS3Backend
	bucketName string        // bucket that all keys for this backend live in
}
// NewS3Backend creates a new S3 storage backend using minio-go.
//
// It builds a client against endpoint with static V4 credentials, then
// verifies connectivity by checking for bucketName, creating the bucket
// (default options/region) if it does not exist yet. All failures are
// returned wrapped so callers can inspect the cause with errors.Is/As.
func NewS3Backend(endpoint, accessKey, secretKey, bucketName string, useSSL bool) (*S3Backend, error) {
	// Explicit transport so connection-pooling limits are visible.
	// TLSClientConfig is left unset: nil means "use the default config",
	// so setting it to nil explicitly was a no-op.
	transport := &http.Transport{
		Proxy:           http.ProxyFromEnvironment,
		IdleConnTimeout: 90 * time.Second,
		// Connection pooling
		MaxIdleConns:        10,
		MaxIdleConnsPerHost: 10,
	}

	client, err := minio.New(endpoint, &minio.Options{
		Creds:     credentials.NewStaticV4(accessKey, secretKey, ""),
		Secure:    useSSL,
		Transport: transport,
	})
	if err != nil {
		// %w (not %v) preserves the error chain for callers.
		return nil, fmt.Errorf("creating S3 client: %w", err)
	}

	// Ensure the bucket exists before handing the backend to callers.
	ctx := context.Background()
	exists, err := client.BucketExists(ctx, bucketName)
	if err != nil {
		return nil, fmt.Errorf("checking bucket %q: %w", bucketName, err)
	}
	if !exists {
		if err := client.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{}); err != nil {
			return nil, fmt.Errorf("creating bucket %q: %w", bucketName, err)
		}
		log.Printf("Created S3 bucket: %s", bucketName)
	}

	return &S3Backend{
		client:     client,
		bucketName: bucketName,
	}, nil
}
// Upload uploads data to S3 using minio-go.
// The stream is written as a generic binary object; multipart uploads
// use 10MB parts.
func (s *S3Backend) Upload(ctx context.Context, key string, data io.Reader, size int64) error {
	opts := minio.PutObjectOptions{
		ContentType: "application/octet-stream",
		PartSize:    10 * 1024 * 1024, // 10MB parts
	}
	_, err := s.client.PutObject(ctx, s.bucketName, key, data, size, opts)
	return err
}
// Download retrieves data from S3.
// The returned stream must be closed by the caller.
func (s *S3Backend) Download(ctx context.Context, key string) (io.ReadCloser, error) {
	object, err := s.client.GetObject(ctx, s.bucketName, key, minio.GetObjectOptions{})
	if err != nil {
		return nil, err
	}
	// *minio.Object satisfies io.ReadCloser.
	return object, nil
}
// Delete removes an object from S3.
func (s *S3Backend) Delete(ctx context.Context, key string) error {
	opts := minio.RemoveObjectOptions{}
	return s.client.RemoveObject(ctx, s.bucketName, key, opts)
}
// List returns all objects with the given prefix.
// Listing is recursive, so keys under "subdirectories" are included.
// Iteration stops at the first listing error.
func (s *S3Backend) List(ctx context.Context, prefix string) ([]string, error) {
	opts := minio.ListObjectsOptions{
		Prefix:    prefix,
		Recursive: true,
	}

	var keys []string
	for info := range s.client.ListObjects(ctx, s.bucketName, opts) {
		if info.Err != nil {
			return nil, info.Err
		}
		keys = append(keys, info.Key)
	}
	return keys, nil
}
// GetSize returns the size of an object in S3, via a HEAD-style stat.
func (s *S3Backend) GetSize(ctx context.Context, key string) (int64, error) {
	stat, err := s.client.StatObject(ctx, s.bucketName, key, minio.StatObjectOptions{})
	if err != nil {
		return 0, err
	}
	return stat.Size, nil
}
// LocalBackend implements StorageBackend for local ZFS storage.
type LocalBackend struct {
	// baseDataset is the root dataset for this backend.
	// NOTE(review): not read by any method visible in this file — confirm
	// whether callers rely on it elsewhere.
	baseDataset string
}

// NewLocalBackend creates a new local ZFS storage backend rooted at baseDataset.
func NewLocalBackend(baseDataset string) *LocalBackend {
	backend := &LocalBackend{baseDataset: baseDataset}
	return backend
}
// Upload is not supported for the local backend; data reaches local
// datasets through the zfs receive endpoint instead of this interface.
// It always returns an error.
func (l *LocalBackend) Upload(ctx context.Context, key string, data io.Reader, size int64) error {
	// errors.New instead of fmt.Errorf: the message has no format verbs
	// (staticcheck S1039); message text unchanged.
	return errors.New("local backend upload not supported via storage interface, use zfs receive endpoint")
}
// Download creates a zfs send stream for the dataset/snapshot named by key.
// The returned ReadCloser must be closed by the caller so the child
// process is reaped (see cmdReadCloser).
func (l *LocalBackend) Download(ctx context.Context, key string) (io.ReadCloser, error) {
	cmd := exec.CommandContext(ctx, "zfs", "send", key)

	pipe, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	if err = cmd.Start(); err != nil {
		// os/exec closes the parent pipe ends itself when Start fails.
		return nil, err
	}
	return &cmdReadCloser{stdout: pipe, cmd: cmd}, nil
}
// Delete destroys the ZFS dataset named by key (default destroy semantics:
// no recursion, no force).
func (l *LocalBackend) Delete(ctx context.Context, key string) error {
	dataset, err := zfs.GetDataset(key)
	if err != nil {
		return err
	}
	return dataset.Destroy(zfs.DestroyDefault)
}
// List returns the names of all snapshots under the given prefix
// (interpreted by go-zfs as the dataset to list snapshots of).
func (l *LocalBackend) List(ctx context.Context, prefix string) ([]string, error) {
	snapshots, err := zfs.Snapshots(prefix)
	if err != nil {
		return nil, err
	}
	// Pre-size: the result count is already known, avoiding append growth.
	// (With zero snapshots this yields an empty, non-nil slice.)
	names := make([]string, 0, len(snapshots))
	for _, snap := range snapshots {
		names = append(names, snap.Name)
	}
	return names, nil
}
// GetSize returns the used size of a ZFS dataset in bytes.
func (l *LocalBackend) GetSize(ctx context.Context, key string) (int64, error) {
	dataset, err := zfs.GetDataset(key)
	if err != nil {
		return 0, err
	}
	// Used is an unsigned property in go-zfs; the conversion assumes the
	// value fits in int64 — TODO confirm for very large datasets.
	return int64(dataset.Used), nil
}
// cmdReadCloser wraps stdout pipe to properly wait for command completion
type cmdReadCloser struct {
stdout io.ReadCloser
cmd *exec.Cmd
closed bool
}
func (c *cmdReadCloser) Read(p []byte) (int, error) {
return c.stdout.Read(p)
}
func (c *cmdReadCloser) Close() error {
if c.closed {
return nil
}
c.closed = true
err := c.stdout.Close()
waitErr := c.cmd.Wait()
if err != nil {
return err
}
return waitErr
}