From 0123f718423d369707e07d0c5b17380d4eb0a677 Mon Sep 17 00:00:00 2001 From: Tw93 Date: Fri, 12 Dec 2025 14:10:01 +0800 Subject: [PATCH] feat: parallelize directory scanning, track top-N entries and large files with heaps, and update skip-dir constants --- cmd/analyze/constants.go | 5 ++ cmd/analyze/heap.go | 44 +++++++++++ cmd/analyze/scanner.go | 163 +++++++++++++++++++++++---------------- 3 files changed, 144 insertions(+), 68 deletions(-) create mode 100644 cmd/analyze/heap.go diff --git a/cmd/analyze/constants.go b/cmd/analyze/constants.go index 8694854..19331ed 100644 --- a/cmd/analyze/constants.go +++ b/cmd/analyze/constants.go @@ -169,11 +169,16 @@ var skipSystemDirs = map[string]bool{ "bin": true, "etc": true, "var": true, + "opt": false, // User might want to specifically check opt + "usr": false, // User might check usr + "Volumes": true, // Skip external drives by default when scanning root + "Network": true, // Skip network mounts ".vol": true, ".Spotlight-V100": true, ".fseventsd": true, ".DocumentRevisions-V100": true, ".TemporaryItems": true, + ".MobileBackups": true, // Time Machine local snapshots } var skipExtensions = map[string]bool{ diff --git a/cmd/analyze/heap.go b/cmd/analyze/heap.go new file mode 100644 index 0000000..e2f0d14 --- /dev/null +++ b/cmd/analyze/heap.go @@ -0,0 +1,44 @@ +package main + +// entryHeap implements heap.Interface for a min-heap of dirEntry (sorted by Size) +// Since we want Top N Largest, we use a Min Heap of size N. +// When adding a new item: +// 1. If heap size < N: push +// 2. If heap size == N and item > min (root): pop min, push item +// The heap will thus maintain the largest N items. 
+type entryHeap []dirEntry + +func (h entryHeap) Len() int { return len(h) } +func (h entryHeap) Less(i, j int) bool { return h[i].Size < h[j].Size } // Min-heap based on Size +func (h entryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *entryHeap) Push(x interface{}) { + *h = append(*h, x.(dirEntry)) +} + +func (h *entryHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// largeFileHeap implements heap.Interface for fileEntry +type largeFileHeap []fileEntry + +func (h largeFileHeap) Len() int { return len(h) } +func (h largeFileHeap) Less(i, j int) bool { return h[i].Size < h[j].Size } +func (h largeFileHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *largeFileHeap) Push(x interface{}) { + *h = append(*h, x.(fileEntry)) +} + +func (h *largeFileHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/cmd/analyze/scanner.go b/cmd/analyze/scanner.go index b49f1f8..ad2064d 100644 --- a/cmd/analyze/scanner.go +++ b/cmd/analyze/scanner.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "container/heap" "context" "fmt" "io/fs" @@ -29,8 +30,14 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in } var total int64 - entries := make([]dirEntry, 0, len(children)) - largeFiles := make([]fileEntry, 0, maxLargeFiles*2) + + // Use heaps to track Top N items, drastically reducing memory usage + // for directories with millions of files + entriesHeap := &entryHeap{} + heap.Init(entriesHeap) + + largeFilesHeap := &largeFileHeap{} + heap.Init(largeFilesHeap) // Use worker pool for concurrent directory scanning // For I/O-bound operations, use more workers than CPU count @@ -54,19 +61,31 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in entryChan := make(chan dirEntry, len(children)) largeFileChan := make(chan fileEntry, maxLargeFiles*2) - // Start goroutines to collect from 
channels + // Start goroutines to collect from channels into heaps var collectorWg sync.WaitGroup collectorWg.Add(2) go func() { defer collectorWg.Done() for entry := range entryChan { - entries = append(entries, entry) + // Maintain Top N Heap for entries + if entriesHeap.Len() < maxEntries { + heap.Push(entriesHeap, entry) + } else if entry.Size > (*entriesHeap)[0].Size { + heap.Pop(entriesHeap) + heap.Push(entriesHeap, entry) + } } }() go func() { defer collectorWg.Done() for file := range largeFileChan { - largeFiles = append(largeFiles, file) + // Maintain Top N Heap for large files + if largeFilesHeap.Len() < maxLargeFiles { + heap.Push(largeFilesHeap, file) + } else if file.Size > (*largeFilesHeap)[0].Size { + heap.Pop(largeFilesHeap) + heap.Push(largeFilesHeap, file) + } } }() @@ -113,7 +132,7 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in // Try du command first for folded dirs (much faster) size, err := getDirectorySizeFromDu(path) if err != nil || size <= 0 { - // Fallback to walk if du fails + // Fallback to concurrent walk if du fails size = calculateDirSizeFast(path, filesScanned, dirsScanned, bytesScanned, currentPath) } atomic.AddInt64(&total, size) @@ -182,11 +201,15 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in close(largeFileChan) collectorWg.Wait() - sort.Slice(entries, func(i, j int) bool { - return entries[i].Size > entries[j].Size - }) - if len(entries) > maxEntries { - entries = entries[:maxEntries] + // Convert Heaps to sorted slices (Descending order) + entries := make([]dirEntry, entriesHeap.Len()) + for i := len(entries) - 1; i >= 0; i-- { + entries[i] = heap.Pop(entriesHeap).(dirEntry) + } + + largeFiles := make([]fileEntry, largeFilesHeap.Len()) + for i := len(largeFiles) - 1; i >= 0; i-- { + largeFiles[i] = heap.Pop(largeFilesHeap).(fileEntry) } // Try to use Spotlight (mdfind) for faster large file discovery @@ -194,18 +217,15 @@ func scanPathConcurrent(root 
string, filesScanned, dirsScanned, bytesScanned *in // if Spotlight is unavailable or fails. The fallback is intentionally silent // because users only care about correct results, not the method used. if spotlightFiles := findLargeFilesWithSpotlight(root, minLargeFileSize); len(spotlightFiles) > 0 { + // Spotlight results are already sorted (top N, descending) + // Use them in place of scanned large files largeFiles = spotlightFiles - } else { - // Use files collected during scanning (fallback path) - // Sort and trim large files collected from scanning - sort.Slice(largeFiles, func(i, j int) bool { - return largeFiles[i].Size > largeFiles[j].Size - }) - if len(largeFiles) > maxLargeFiles { - largeFiles = largeFiles[:maxLargeFiles] - } } + // Both code paths leave largeFiles sorted in descending size order: + // the scan path drains the min-heap back-to-front into the slice, and + // Spotlight returns its matches already sorted, so no re-sort is needed. + return scanResult{ Entries: entries, LargeFiles: largeFiles, @@ -242,70 +262,77 @@ func shouldSkipFileForLargeTracking(path string) bool { return skipExtensions[ext] } -// calculateDirSizeFast performs fast directory size calculation without detailed tracking or large file detection. -// Updates progress counters in batches to reduce atomic operation overhead. 
+// calculateDirSizeFast performs concurrent directory size calculation using os.ReadDir +// This is a faster fallback than filepath.WalkDir when du fails func calculateDirSizeFast(root string, filesScanned, dirsScanned, bytesScanned *int64, currentPath *string) int64 { var total int64 - var localFiles, localDirs int64 - var batchBytes int64 + var wg sync.WaitGroup // Create context with timeout ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - walkFunc := func(path string, d fs.DirEntry, err error) error { - // Check for timeout + // Limit total concurrency for this walk + concurrency := runtime.NumCPU() * 4 + if concurrency > 64 { + concurrency = 64 + } + sem := make(chan struct{}, concurrency) + + var walk func(string) + walk = func(dirPath string) { select { case <-ctx.Done(): - return ctx.Err() + return default: } - if err != nil { - return nil - } - if d.IsDir() { - localDirs++ - // Batch update every N dirs to reduce atomic operations - if localDirs%batchUpdateSize == 0 { - atomic.AddInt64(dirsScanned, batchUpdateSize) - localDirs = 0 - } - return nil - } - info, err := d.Info() - if err != nil { - return nil - } - // Get actual disk usage for sparse files and cloud files - size := getActualFileSize(path, info) - total += size - batchBytes += size - localFiles++ + if currentPath != nil { - *currentPath = path + *currentPath = dirPath } - // Batch update every N files to reduce atomic operations - if localFiles%batchUpdateSize == 0 { - atomic.AddInt64(filesScanned, batchUpdateSize) - atomic.AddInt64(bytesScanned, batchBytes) - localFiles = 0 - batchBytes = 0 + + entries, err := os.ReadDir(dirPath) + if err != nil { + return + } + + var localBytes, localFiles int64 + + for _, entry := range entries { + if entry.IsDir() { + // Directories: recurse concurrently + wg.Add(1) + // Capture loop variable + subDir := filepath.Join(dirPath, entry.Name()) + go func(p string) { + defer wg.Done() + sem <- struct{}{} // Acquire token 
+ defer func() { <-sem }() // Release token + walk(p) + }(subDir) + atomic.AddInt64(dirsScanned, 1) + } else { + // Files: process immediately + info, err := entry.Info() + if err == nil { + size := getActualFileSize(filepath.Join(dirPath, entry.Name()), info) + localBytes += size + localFiles++ + } + } + } + + if localBytes > 0 { + atomic.AddInt64(&total, localBytes) + atomic.AddInt64(bytesScanned, localBytes) + } + if localFiles > 0 { + atomic.AddInt64(filesScanned, localFiles) } - return nil } - _ = filepath.WalkDir(root, walkFunc) - - // Final update for remaining counts - if localFiles > 0 { - atomic.AddInt64(filesScanned, localFiles) - } - if localDirs > 0 { - atomic.AddInt64(dirsScanned, localDirs) - } - if batchBytes > 0 { - atomic.AddInt64(bytesScanned, batchBytes) - } + walk(root) + wg.Wait() return total }