package server

import (
	"context"
	"crypto/sha256"
	"crypto/subtle"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
	"time"
)

// Server manages snapshots from multiple clients with S3 support.
//
// It holds the metadata database plus the optional S3 and local storage
// backends; either backend pointer may be nil when it is not configured.
type Server struct {
	db           *Database
	s3Backend    *S3Backend
	localBackend *LocalBackend
}

// New creates a new snapshot server with SQLite database.
//
// It opens the database at dbPath, makes sure a default client and a default
// admin exist, and cleans expired sessions. If either default record cannot
// be created the database is closed again and the wrapped error is returned.
func New(dbPath string, s3Backend *S3Backend, localBackend *LocalBackend) (*Server, error) {
	db, err := NewDatabase(dbPath)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize database: %w", err)
	}

	// Create default client if none exists.
	if err := db.CreateDefaultClient(); err != nil {
		db.Close()
		return nil, fmt.Errorf("failed to create default client: %w", err)
	}

	// Create default admin if none exists.
	if err := db.CreateDefaultAdmin(); err != nil {
		db.Close()
		return nil, fmt.Errorf("failed to create default admin: %w", err)
	}

	// Clean expired sessions.
	db.CleanExpiredSessions()

	return &Server{
		db:           db,
		s3Backend:    s3Backend,
		localBackend: localBackend,
	}, nil
}

// Close closes the database connection.
func (s *Server) Close() error {
	return s.db.Close()
}

// authenticate reports whether clientID names an existing, enabled client
// whose stored API key equals the hash of apiKey.
func (s *Server) authenticate(clientID, apiKey string) bool {
	client, err := s.db.GetClient(clientID)
	if err != nil || client == nil {
		return false
	}
	if !client.Enabled {
		return false
	}
	// Compare in constant time so response timing does not reveal how many
	// leading characters of the stored hash a guess matched.
	want := []byte(client.APIKey)
	got := []byte(hashAPIKey(apiKey))
	return subtle.ConstantTimeCompare(want, got) == 1
}

// getClientUsage returns the usage the database reports for clientID.
// A lookup failure is logged and whatever value the database call returned
// alongside the error is passed through unchanged.
func (s *Server) getClientUsage(clientID string) int64 {
	usage, err := s.db.GetClientUsage(clientID)
	if err != nil {
		log.Printf("Error getting usage for client %s: %v", clientID, err)
	}
	return usage
}

// canAcceptSnapshot reports whether a snapshot of estimatedSize bytes fits in
// the client's quota. The second return value is "OK" on success or a
// human-readable reason for the rejection.
func (s *Server) canAcceptSnapshot(clientID string, estimatedSize int64) (bool, string) {
	client, err := s.db.GetClient(clientID)
	if err != nil || client == nil {
		return false, "Client not found"
	}

	currentUsage := s.getClientUsage(clientID)
	// Written as a subtraction so that a very large estimatedSize cannot
	// overflow int64 and wrap around to a value that passes the check.
	if estimatedSize > client.MaxSizeBytes-currentUsage {
		return false, fmt.Sprintf("Quota exceeded: using %d/%d bytes", currentUsage, client.MaxSizeBytes)
	}
	return true, "OK"
}

// rotateSnapshots deletes the client's oldest snapshots until its usage is
// back under quota. It returns the number of snapshots removed and the number
// of bytes reclaimed; only snapshots whose storage object AND database record
// were both deleted are counted.
func (s *Server) rotateSnapshots(clientID string) (int, int64) {
	client, err := s.db.GetClient(clientID)
	if err != nil || client == nil {
		return 0, 0
	}

	currentUsage := s.getClientUsage(clientID)
	if currentUsage <= client.MaxSizeBytes {
		return 0, 0
	}

	// Calculate how many bytes we need to free.
	bytesToFree := currentUsage - client.MaxSizeBytes

	// Get oldest snapshots (up to 100) and select until we would be under quota.
	snapshots, err := s.db.GetOldestSnapshots(clientID, 100)
	if err != nil {
		log.Printf("Error getting oldest snapshots: %v", err)
		return 0, 0
	}

	var (
		toDelete     []*SnapshotMetadata
		plannedBytes int64
	)
	for _, snap := range snapshots {
		if plannedBytes >= bytesToFree {
			break
		}
		toDelete = append(toDelete, snap)
		plannedBytes += snap.SizeBytes
	}
	if len(toDelete) == 0 {
		return 0, 0
	}

	// Select appropriate backend.
	// NOTE(review): S3 is preferred whenever it is configured, without
	// consulting the client's or the snapshot's storage type, unlike
	// HandleDownload — confirm this is intended.
	var backend StorageBackend
	switch {
	case s.s3Backend != nil:
		backend = s.s3Backend
	case s.localBackend != nil:
		backend = s.localBackend
	default:
		log.Printf("No storage backend available for rotation")
		return 0, 0
	}

	// Delete snapshots, tallying only what was really removed.
	var (
		deletedCount   int
		reclaimedBytes int64
	)
	ctx := context.Background()
	for _, snap := range toDelete {
		if err := backend.Delete(ctx, snap.StorageKey); err != nil {
			log.Printf("Error deleting snapshot %s: %v", snap.StorageKey, err)
			continue
		}
		if err := s.db.DeleteSnapshot(clientID, snap.SnapshotID); err != nil {
			log.Printf("Error deleting snapshot record %s: %v", snap.SnapshotID, err)
			continue
		}
		log.Printf("Rotated out snapshot: %s (freed %d bytes)", snap.StorageKey, snap.SizeBytes)
		deletedCount++
		reclaimedBytes += snap.SizeBytes
	}

	return deletedCount, reclaimedBytes
}

// HTTP Handlers

// HandleUpload handles snapshot upload requests.
//
// It accepts a POST with a JSON UploadRequest, authenticates the client,
// checks the quota against the estimated size and answers with the upload
// method and storage key the client should use. No snapshot data is
// transferred by this handler.
func (s *Server) HandleUpload(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var req UploadRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		respondJSON(w, http.StatusBadRequest, UploadResponse{
			Success: false,
			Message: "Invalid request",
		})
		return
	}

	if !s.authenticate(req.ClientID, req.APIKey) {
		respondJSON(w, http.StatusUnauthorized, UploadResponse{
			Success: false,
			Message: "Authentication failed",
		})
		return
	}

	// Check quota. A missing or negative estimate falls back to the default;
	// a negative value must not be allowed to shrink the apparent usage.
	estimatedSize := req.EstimatedSize
	if estimatedSize <= 0 {
		estimatedSize = 1 * 1024 * 1024 * 1024 // Default 1GB estimate
	}

	if ok, msg := s.canAcceptSnapshot(req.ClientID, estimatedSize); !ok {
		respondJSON(w, http.StatusForbidden, UploadResponse{
			Success: false,
			Message: msg,
		})
		return
	}

	client, err := s.db.GetClient(req.ClientID)
	if err != nil || client == nil {
		respondJSON(w, http.StatusInternalServerError, UploadResponse{
			Success: false,
			Message: "Failed to get client configuration",
		})
		return
	}

	timestamp := time.Now().Format("2006-01-02_15:04:05")

	switch {
	case client.StorageType == "s3" && s.s3Backend != nil:
		// S3 upload: the key is namespaced by client ID.
		storageKey := fmt.Sprintf("%s/%s_%s.zfs", req.ClientID, req.DatasetName, timestamp)
		if req.Compressed {
			storageKey += ".lz4"
		}
		respondJSON(w, http.StatusOK, UploadResponse{
			Success:      true,
			Message:      "Ready to receive snapshot",
			UploadMethod: "s3",
			StorageKey:   storageKey,
			UploadURL:    fmt.Sprintf("/upload-stream/%s", req.ClientID),
		})

	case client.StorageType == "s3":
		// S3 requested but not configured.
		respondJSON(w, http.StatusInternalServerError, UploadResponse{
			Success: false,
			Message: "S3 storage requested but S3 backend is not configured on server",
		})

	default:
		// Local ZFS receive.
		snapshotName := fmt.Sprintf("%s@%s_%s", client.Dataset, req.ClientID, timestamp)
		respondJSON(w, http.StatusOK, UploadResponse{
			Success:      true,
			Message:      "Ready to receive snapshot",
			UploadMethod: "zfs-receive",
			StorageKey:   snapshotName,
		})
	}
}

// HandleUploadStream handles streaming snapshot uploads.
//
// The client ID is the third segment of the URL path; the API key, storage
// key and snapshot attributes come from X-* request headers. The request body
// is streamed to the S3 backend and a metadata record is saved afterwards.
func (s *Server) HandleUploadStream(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Extract client ID from URL.
	parts := strings.Split(r.URL.Path, "/")
	if len(parts) < 3 {
		http.Error(w, "Invalid URL", http.StatusBadRequest)
		return
	}
	clientID := parts[2]

	// Get metadata from headers.
	apiKey := r.Header.Get("X-API-Key")
	storageKey := r.Header.Get("X-Storage-Key")
	datasetName := r.Header.Get("X-Dataset-Name")
	compressedStr := r.Header.Get("X-Compressed")
	incrementalStr := r.Header.Get("X-Incremental")
	baseSnapshot := r.Header.Get("X-Base-Snapshot")

	if !s.authenticate(clientID, apiKey) {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	// The storage key is client-supplied. HandleUpload issues S3 keys of the
	// form "<clientID>/...", so anything else is rejected to stop one client
	// from writing into another client's key space.
	if storageKey == "" {
		http.Error(w, "Missing storage key", http.StatusBadRequest)
		return
	}
	if !strings.HasPrefix(storageKey, clientID+"/") {
		http.Error(w, "Storage key does not belong to client", http.StatusForbidden)
		return
	}

	if s.s3Backend == nil {
		log.Printf("Error: S3 backend not initialized")
		http.Error(w, "S3 backend not configured", http.StatusInternalServerError)
		return
	}

	// Tie the upload to the request so it is cancelled with the request.
	ctx := r.Context()

	// Upload to S3.
	// When using chunked transfer (io.Pipe), ContentLength is -1.
	// MinIO requires -1 for unknown size to use streaming upload.
	size := r.ContentLength
	if size < 0 {
		size = -1 // Use streaming upload for unknown size
	}

	if err := s.s3Backend.Upload(ctx, storageKey, r.Body, size); err != nil {
		log.Printf("Error uploading to S3: %v", err)
		http.Error(w, "Upload failed", http.StatusInternalServerError)
		return
	}

	// Get actual size after upload; fall back to the declared length.
	actualSize, err := s.s3Backend.GetSize(ctx, storageKey)
	if err != nil {
		log.Printf("Error getting object size: %v", err)
		actualSize = size
	}
	if actualSize < 0 {
		// Never record the "unknown size" sentinel as a byte count.
		actualSize = 0
	}

	// Save metadata to database.
	metadata := &SnapshotMetadata{
		ClientID:     clientID,
		SnapshotID:   storageKey,
		Timestamp:    time.Now(),
		SizeBytes:    actualSize,
		DatasetName:  datasetName,
		StorageKey:   storageKey,
		StorageType:  "s3",
		Compressed:   compressedStr == "true",
		Incremental:  incrementalStr == "true",
		BaseSnapshot: baseSnapshot,
	}
	if err := s.db.SaveSnapshot(metadata); err != nil {
		// Without the record the snapshot cannot be found through
		// HandleDownload, so do not report success. The uploaded object is
		// left in place.
		log.Printf("Error saving snapshot metadata: %v", err)
		http.Error(w, "Failed to record snapshot metadata", http.StatusInternalServerError)
		return
	}

	respondJSON(w, http.StatusOK, map[string]interface{}{
		"success": true,
		"message": "Snapshot uploaded successfully",
		"size":    actualSize,
	})
}

// HandleStatus handles status requests.
//
// It authenticates via the client_id and api_key query parameters and
// responds with the client's snapshot list, usage and quota.
func (s *Server) HandleStatus(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	clientID := query.Get("client_id")
	apiKey := query.Get("api_key")

	if !s.authenticate(clientID, apiKey) {
		respondJSON(w, http.StatusUnauthorized, StatusResponse{Success: false})
		return
	}

	client, err := s.db.GetClient(clientID)
	if err != nil || client == nil {
		respondJSON(w, http.StatusInternalServerError, StatusResponse{Success: false})
		return
	}

	snapshots, err := s.db.GetSnapshotsByClient(clientID)
	if err != nil {
		log.Printf("Error getting snapshots: %v", err)
		snapshots = []*SnapshotMetadata{}
	}

	usedBytes := s.getClientUsage(clientID)

	// Guard the division: with a zero quota the result would be NaN or Inf,
	// which encoding/json refuses to encode.
	var percentUsed float64
	if client.MaxSizeBytes > 0 {
		percentUsed = float64(usedBytes) / float64(client.MaxSizeBytes) * 100
	}

	respondJSON(w, http.StatusOK, StatusResponse{
		Success:        true,
		TotalSnapshots: len(snapshots),
		UsedBytes:      usedBytes,
		MaxBytes:       client.MaxSizeBytes,
		PercentUsed:    percentUsed,
		Snapshots:      snapshots,
		StorageType:    client.StorageType,
	})
}

// HandleRotate handles snapshot rotation requests.
//
// It accepts a POST with JSON credentials, runs rotateSnapshots for the
// client and reports how many snapshots and bytes were removed.
func (s *Server) HandleRotate(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var req struct {
		ClientID string `json:"client_id"`
		APIKey   string `json:"api_key"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request", http.StatusBadRequest)
		return
	}

	if !s.authenticate(req.ClientID, req.APIKey) {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	deletedCount, reclaimedBytes := s.rotateSnapshots(req.ClientID)

	respondJSON(w, http.StatusOK, map[string]interface{}{
		"success":         true,
		"deleted_count":   deletedCount,
		"reclaimed_bytes": reclaimedBytes,
	})
}

// HandleDownload handles snapshot download requests.
//
// It authenticates via query parameters, looks up the snapshot named by
// snapshot_id for that client and streams its contents from the storage
// backend as an attachment.
func (s *Server) HandleDownload(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	clientID := query.Get("client_id")
	apiKey := query.Get("api_key")
	snapshotID := query.Get("snapshot_id")

	if !s.authenticate(clientID, apiKey) {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	// Find snapshot metadata.
	client, err := s.db.GetClient(clientID)
	if err != nil || client == nil {
		http.Error(w, "Client not found", http.StatusNotFound)
		return
	}

	targetSnapshot, err := s.db.GetSnapshotByID(clientID, snapshotID)
	if err != nil || targetSnapshot == nil {
		http.Error(w, "Snapshot not found", http.StatusNotFound)
		return
	}

	var backend StorageBackend
	switch {
	case client.StorageType == "s3" && s.s3Backend != nil:
		backend = s.s3Backend
	case s.localBackend != nil:
		backend = s.localBackend
	default:
		http.Error(w, "No storage backend available", http.StatusInternalServerError)
		return
	}

	// Download from storage, bound to the request's context.
	reader, err := backend.Download(r.Context(), targetSnapshot.StorageKey)
	if err != nil {
		log.Printf("Error downloading snapshot: %v", err)
		http.Error(w, "Download failed", http.StatusInternalServerError)
		return
	}
	defer reader.Close()

	// Stream to client. The filename is quoted because storage keys contain
	// characters such as '/', ':' and '@' that are not valid in a bare token.
	w.Header().Set("Content-Type", "application/octet-stream")
	w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", targetSnapshot.StorageKey))

	if _, err := io.Copy(w, reader); err != nil {
		log.Printf("Error streaming snapshot: %v", err)
	}
}

// HandleHealth handles health check requests.
// It always answers 200 with a "healthy" status and the current server time.
func (s *Server) HandleHealth(w http.ResponseWriter, r *http.Request) {
	respondJSON(w, http.StatusOK, map[string]interface{}{
		"status": "healthy",
		"time":   time.Now(),
	})
}

// HandleRotationPolicy handles rotation policy requests from clients.
// Returns the rotation policy configured for the client, if any.
// If a policy is set, the client must use it and cannot override it.
func (s *Server) HandleRotationPolicy(w http.ResponseWriter, r *http.Request) { clientID := r.URL.Query().Get("client_id") apiKey := r.URL.Query().Get("api_key") if !s.authenticate(clientID, apiKey) { respondJSON(w, http.StatusUnauthorized, RotationPolicyResponse{ Success: false, Message: "Authentication failed", }) return } client, err := s.db.GetClient(clientID) if err != nil || client == nil { respondJSON(w, http.StatusNotFound, RotationPolicyResponse{ Success: false, Message: "Client not found", }) return } // Check if server-managed rotation policy is configured if client.RotationPolicy != nil { respondJSON(w, http.StatusOK, RotationPolicyResponse{ Success: true, Message: "Server-managed rotation policy", RotationPolicy: client.RotationPolicy, ServerManaged: true, }) return } // No server-managed policy - client can use its own defaults respondJSON(w, http.StatusOK, RotationPolicyResponse{ Success: true, Message: "No server-managed policy, client can use defaults", RotationPolicy: nil, ServerManaged: false, }) } // RegisterRoutes registers all HTTP routes func (s *Server) RegisterRoutes(mux *http.ServeMux) { // Client API routes mux.HandleFunc("/upload", s.HandleUpload) mux.HandleFunc("/upload-stream/", s.HandleUploadStream) mux.HandleFunc("/status", s.HandleStatus) mux.HandleFunc("/rotate", s.HandleRotate) mux.HandleFunc("/download", s.HandleDownload) mux.HandleFunc("/health", s.HandleHealth) mux.HandleFunc("/rotation-policy", s.HandleRotationPolicy) mux.HandleFunc("/client/change-password", s.handleClientChangePassword) // Admin API routes mux.HandleFunc("/admin/login", s.handleAdminLogin) mux.HandleFunc("/admin/logout", s.handleAdminLogout) mux.HandleFunc("/admin/check", s.handleAdminCheck) mux.HandleFunc("/admin/clients", s.handleAdminGetClients) mux.HandleFunc("/admin/client", s.handleAdminGetClient) mux.HandleFunc("/admin/client/create", s.handleAdminCreateClient) mux.HandleFunc("/admin/client/update", s.handleAdminUpdateClient) 
mux.HandleFunc("/admin/client/delete", s.handleAdminDeleteClient) mux.HandleFunc("/admin/client/reset-password", s.handleAdminResetClientPassword) mux.HandleFunc("/admin/snapshots", s.handleAdminGetSnapshots) mux.HandleFunc("/admin/snapshot/delete", s.handleAdminDeleteSnapshot) mux.HandleFunc("/admin/stats", s.handleAdminGetStats) mux.HandleFunc("/admin/admins", s.handleAdminGetAdmins) mux.HandleFunc("/admin/admin/create", s.handleAdminCreateAdmin) mux.HandleFunc("/admin/admin/delete", s.handleAdminDeleteAdmin) mux.HandleFunc("/admin/admin/password", s.handleAdminChangePassword) // Admin static files mux.HandleFunc("/admin/static/", s.handleAdminStatic) // Admin UI (static files served from /admin/) mux.HandleFunc("/admin/", s.handleAdminUI) } func respondJSON(w http.ResponseWriter, status int, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(data) } func hashAPIKey(key string) string { hash := sha256.Sum256([]byte(key)) return hex.EncodeToString(hash[:]) }