package server import ( "context" "crypto/sha256" "encoding/hex" "encoding/json" "fmt" "io" "log" "net/http" "os" "sort" "strings" "sync" "time" ) // Server manages snapshots from multiple clients with S3 support type Server struct { clients map[string]*ClientConfig snapshots map[string][]*SnapshotMetadata mu sync.RWMutex s3Backend *S3Backend localBackend *LocalBackend metadataFile string configFile string } // New creates a new snapshot server func New(configFile, metadataFile string, s3Backend *S3Backend, localBackend *LocalBackend) *Server { s := &Server{ clients: make(map[string]*ClientConfig), snapshots: make(map[string][]*SnapshotMetadata), s3Backend: s3Backend, localBackend: localBackend, metadataFile: metadataFile, configFile: configFile, } s.loadConfig() s.loadMetadata() return s } func (s *Server) loadConfig() { data, err := os.ReadFile(s.configFile) if err != nil { log.Printf("Warning: Could not read config file: %v", err) // Create default config s.clients["client1"] = &ClientConfig{ ClientID: "client1", APIKey: hashAPIKey("secret123"), MaxSizeBytes: 100 * 1024 * 1024 * 1024, Dataset: "backup/client1", Enabled: true, StorageType: "s3", } s.saveConfig() return } var clients []*ClientConfig if err := json.Unmarshal(data, &clients); err != nil { log.Printf("Error parsing config: %v", err) return } for _, client := range clients { s.clients[client.ClientID] = client } log.Printf("Loaded %d client configurations", len(s.clients)) } func (s *Server) saveConfig() { s.mu.RLock() defer s.mu.RUnlock() var clients []*ClientConfig for _, client := range s.clients { clients = append(clients, client) } data, err := json.MarshalIndent(clients, "", " ") if err != nil { log.Printf("Error marshaling config: %v", err) return } if err := os.WriteFile(s.configFile, data, 0600); err != nil { log.Printf("Error writing config: %v", err) } } func (s *Server) loadMetadata() { data, err := os.ReadFile(s.metadataFile) if err != nil { log.Printf("No existing metadata file, starting fresh") return } if err := json.Unmarshal(data, &s.snapshots); err != nil { log.Printf("Error parsing metadata: %v", err) return } totalSnapshots := 0 for _, snaps := range s.snapshots { totalSnapshots += len(snaps) } log.Printf("Loaded metadata for %d snapshots", totalSnapshots) } func (s *Server) saveMetadata() { s.mu.RLock() defer s.mu.RUnlock() data, err := json.MarshalIndent(s.snapshots, "", " ") if err != nil { log.Printf("Error marshaling metadata: %v", err) return } if err := os.WriteFile(s.metadataFile, data, 0600); err != nil { log.Printf("Error writing metadata: %v", err) } } func (s *Server) authenticate(clientID, apiKey string) bool { s.mu.RLock() defer s.mu.RUnlock() client, exists := s.clients[clientID] if !exists || !client.Enabled { return false } return client.APIKey == hashAPIKey(apiKey) } func (s *Server) getClientUsage(clientID string) int64 { s.mu.RLock() defer s.mu.RUnlock() var total int64 for _, snap := range s.snapshots[clientID] { total += snap.SizeBytes } return total } func (s *Server) canAcceptSnapshot(clientID string, estimatedSize int64) (bool, string) { s.mu.RLock() defer s.mu.RUnlock() client, exists := s.clients[clientID] if !exists { return false, "Client not found" } currentUsage := s.getClientUsage(clientID) if currentUsage+estimatedSize > client.MaxSizeBytes { return false, fmt.Sprintf("Quota exceeded: using %d/%d bytes", currentUsage, client.MaxSizeBytes) } return true, "OK" } func (s *Server) rotateSnapshots(clientID string) (int, int64) { // First pass: collect snapshots to delete while holding lock s.mu.Lock() client, exists := s.clients[clientID] if !exists { s.mu.Unlock() return 0, 0 } snapshots := s.snapshots[clientID] if len(snapshots) == 0 { s.mu.Unlock() return 0, 0 } // Sort by timestamp (oldest first) sort.Slice(snapshots, func(i, j int) bool { return snapshots[i].Timestamp.Before(snapshots[j].Timestamp) }) currentUsage := int64(0) for _, snap := range snapshots { currentUsage += snap.SizeBytes } // Collect snapshots to delete var toDelete []*SnapshotMetadata for currentUsage > client.MaxSizeBytes && len(snapshots) > 1 { oldest := snapshots[0] toDelete = append(toDelete, oldest) currentUsage -= oldest.SizeBytes snapshots = snapshots[1:] } // Update state before I/O s.snapshots[clientID] = snapshots s.mu.Unlock() if len(toDelete) == 0 { return 0, 0 } // Select appropriate backend var backend StorageBackend if client.StorageType == "s3" { backend = s.s3Backend } else { backend = s.localBackend } // Second pass: delete without holding lock deletedCount := 0 reclaimedBytes := int64(0) ctx := context.Background() for _, snap := range toDelete { if err := backend.Delete(ctx, snap.StorageKey); err != nil { log.Printf("Error deleting snapshot %s: %v", snap.StorageKey, err) continue } log.Printf("Rotated out snapshot: %s (freed %d bytes)", snap.StorageKey, snap.SizeBytes) reclaimedBytes += snap.SizeBytes deletedCount++ } // Save metadata after deletions s.saveMetadata() return deletedCount, reclaimedBytes } // HTTP Handlers // HandleUpload handles snapshot upload requests func (s *Server) HandleUpload(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } var req UploadRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { respondJSON(w, http.StatusBadRequest, UploadResponse{ Success: false, Message: "Invalid request", }) return } if !s.authenticate(req.ClientID, req.APIKey) { respondJSON(w, http.StatusUnauthorized, UploadResponse{ Success: false, Message: "Authentication failed", }) return } // Check quota estimatedSize := req.EstimatedSize if estimatedSize == 0 { estimatedSize = 1 * 1024 * 1024 * 1024 // Default 1GB estimate } canAccept, msg := s.canAcceptSnapshot(req.ClientID, estimatedSize) if !canAccept { respondJSON(w, http.StatusForbidden, UploadResponse{ Success: false, Message: msg, }) return } s.mu.RLock() client := s.clients[req.ClientID] s.mu.RUnlock() timestamp := time.Now().Format("2006-01-02_15:04:05") if client.StorageType == "s3" { // S3 upload storageKey := fmt.Sprintf("%s/%s_%s.zfs", req.ClientID, req.DatasetName, timestamp) if req.Compressed { storageKey += ".gz" } respondJSON(w, http.StatusOK, UploadResponse{ Success: true, Message: "Ready to receive snapshot", UploadMethod: "s3", StorageKey: storageKey, UploadURL: fmt.Sprintf("/upload-stream/%s", req.ClientID), }) } else { // Local ZFS receive snapshotName := fmt.Sprintf("%s@%s_%s", client.Dataset, req.ClientID, timestamp) respondJSON(w, http.StatusOK, UploadResponse{ Success: true, Message: "Ready to receive snapshot", UploadMethod: "zfs-receive", StorageKey: snapshotName, }) } } // HandleUploadStream handles streaming snapshot uploads func (s *Server) HandleUploadStream(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Extract client ID from URL parts := strings.Split(r.URL.Path, "/") if len(parts) < 3 { http.Error(w, "Invalid URL", http.StatusBadRequest) return } clientID := parts[2] // Get metadata from headers apiKey := r.Header.Get("X-API-Key") storageKey := r.Header.Get("X-Storage-Key") datasetName := r.Header.Get("X-Dataset-Name") compressedStr := r.Header.Get("X-Compressed") incrementalStr := r.Header.Get("X-Incremental") baseSnapshot := r.Header.Get("X-Base-Snapshot") if !s.authenticate(clientID, apiKey) { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } ctx := context.Background() // Upload to S3 size := r.ContentLength if size < 0 { size = 0 } if err := s.s3Backend.Upload(ctx, storageKey, r.Body, size); err != nil { log.Printf("Error uploading to S3: %v", err) http.Error(w, "Upload failed", http.StatusInternalServerError) return } // Get actual size after upload actualSize, err := s.s3Backend.GetSize(ctx, storageKey) if err != nil { log.Printf("Error getting object size: %v", err) actualSize = size } // Save metadata s.mu.Lock() metadata := &SnapshotMetadata{ ClientID: clientID, SnapshotID: storageKey, Timestamp: time.Now(), SizeBytes: actualSize, DatasetName: datasetName, StorageKey: storageKey, StorageType: "s3", Compressed: compressedStr == "true", Incremental: incrementalStr == "true", BaseSnapshot: baseSnapshot, } s.snapshots[clientID] = append(s.snapshots[clientID], metadata) s.mu.Unlock() s.saveMetadata() respondJSON(w, http.StatusOK, map[string]interface{}{ "success": true, "message": "Snapshot uploaded successfully", "size": actualSize, }) } // HandleStatus handles status requests func (s *Server) HandleStatus(w http.ResponseWriter, r *http.Request) { clientID := r.URL.Query().Get("client_id") apiKey := r.URL.Query().Get("api_key") if !s.authenticate(clientID, apiKey) { respondJSON(w, http.StatusUnauthorized, StatusResponse{Success: false}) return } s.mu.RLock() client := s.clients[clientID] snapshots := s.snapshots[clientID] s.mu.RUnlock() usedBytes := s.getClientUsage(clientID) percentUsed := float64(usedBytes) / float64(client.MaxSizeBytes) * 100 respondJSON(w, http.StatusOK, StatusResponse{ Success: true, TotalSnapshots: len(snapshots), UsedBytes: usedBytes, MaxBytes: client.MaxSizeBytes, PercentUsed: percentUsed, Snapshots: snapshots, StorageType: client.StorageType, }) } // HandleRotate handles snapshot rotation requests func (s *Server) HandleRotate(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } var req struct { ClientID string `json:"client_id"` APIKey string `json:"api_key"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "Invalid request", http.StatusBadRequest) return } if !s.authenticate(req.ClientID, req.APIKey) { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } deletedCount, reclaimedBytes := s.rotateSnapshots(req.ClientID) respondJSON(w, http.StatusOK, map[string]interface{}{ "success": true, "deleted_count": deletedCount, "reclaimed_bytes": reclaimedBytes, }) } // HandleDownload handles snapshot download requests func (s *Server) HandleDownload(w http.ResponseWriter, r *http.Request) { clientID := r.URL.Query().Get("client_id") apiKey := r.URL.Query().Get("api_key") snapshotID := r.URL.Query().Get("snapshot_id") if !s.authenticate(clientID, apiKey) { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } // Find snapshot metadata s.mu.RLock() client := s.clients[clientID] var targetSnapshot *SnapshotMetadata for _, snap := range s.snapshots[clientID] { if snap.SnapshotID == snapshotID { targetSnapshot = snap break } } s.mu.RUnlock() if targetSnapshot == nil { http.Error(w, "Snapshot not found", http.StatusNotFound) return } ctx := context.Background() var backend StorageBackend if client.StorageType == "s3" { backend = s.s3Backend } else { backend = s.localBackend } // Download from storage reader, err := backend.Download(ctx, targetSnapshot.StorageKey) if err != nil { log.Printf("Error downloading snapshot: %v", err) http.Error(w, "Download failed", http.StatusInternalServerError) return } defer reader.Close() // Stream to client w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", targetSnapshot.StorageKey)) if _, err := io.Copy(w, reader); err != nil { log.Printf("Error streaming snapshot: %v", err) } } // HandleHealth handles health check requests func (s *Server) HandleHealth(w http.ResponseWriter, r *http.Request) { respondJSON(w, http.StatusOK, map[string]interface{}{ "status": "healthy", "time": time.Now(), }) } // HandleRotationPolicy handles rotation policy requests from clients. // Returns the rotation policy configured for the client, if any. // If a policy is set, the client must use it and cannot override it. func (s *Server) HandleRotationPolicy(w http.ResponseWriter, r *http.Request) { clientID := r.URL.Query().Get("client_id") apiKey := r.URL.Query().Get("api_key") if !s.authenticate(clientID, apiKey) { respondJSON(w, http.StatusUnauthorized, RotationPolicyResponse{ Success: false, Message: "Authentication failed", }) return } s.mu.RLock() client, exists := s.clients[clientID] s.mu.RUnlock() if !exists { respondJSON(w, http.StatusNotFound, RotationPolicyResponse{ Success: false, Message: "Client not found", }) return } // Check if server-managed rotation policy is configured if client.RotationPolicy != nil { respondJSON(w, http.StatusOK, RotationPolicyResponse{ Success: true, Message: "Server-managed rotation policy", RotationPolicy: client.RotationPolicy, ServerManaged: true, }) return } // No server-managed policy - client can use its own defaults respondJSON(w, http.StatusOK, RotationPolicyResponse{ Success: true, Message: "No server-managed policy, client can use defaults", RotationPolicy: nil, ServerManaged: false, }) } // RegisterRoutes registers all HTTP routes func (s *Server) RegisterRoutes(mux *http.ServeMux) { mux.HandleFunc("/upload", s.HandleUpload) mux.HandleFunc("/upload-stream/", s.HandleUploadStream) mux.HandleFunc("/status", s.HandleStatus) mux.HandleFunc("/rotate", s.HandleRotate) mux.HandleFunc("/download", s.HandleDownload) mux.HandleFunc("/health", s.HandleHealth) mux.HandleFunc("/rotation-policy", s.HandleRotationPolicy) } func respondJSON(w http.ResponseWriter, status int, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(data) } func hashAPIKey(key string) string { hash := sha256.Sum256([]byte(key)) return hex.EncodeToString(hash[:]) }