Mirror of https://github.com/tw93/Mole.git
feat: Parallelize metric collection, enhance app protection with receipt file scanning, and update cleanup tasks
@@ -2,6 +2,7 @@ package main
 
 import (
     "bytes"
+    "container/heap"
     "context"
    "fmt"
     "io/fs"
@@ -29,8 +30,14 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
     }
 
     var total int64
-    entries := make([]dirEntry, 0, len(children))
-    largeFiles := make([]fileEntry, 0, maxLargeFiles*2)
+
+    // Use heaps to track the top-N items, drastically reducing memory usage
+    // for directories with millions of files
+    entriesHeap := &entryHeap{}
+    heap.Init(entriesHeap)
+
+    largeFilesHeap := &largeFileHeap{}
+    heap.Init(largeFilesHeap)
 
     // Use worker pool for concurrent directory scanning
     // For I/O-bound operations, use more workers than CPU count
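Note: the entryHeap and largeFileHeap types are declared elsewhere in the file and do not appear in this diff. As a rough, self-contained sketch of the pattern the hunks below rely on: a container/heap min-heap keyed on size keeps only the N largest items, because the smallest retained item sits at the root and is evicted whenever a larger candidate arrives. The field names on dirEntry here are assumptions for illustration.

// Minimal sketch of a size-keyed top-N min-heap; the real entryHeap and
// largeFileHeap types in this file may differ.
package main

import (
    "container/heap"
    "fmt"
)

// dirEntry is assumed to carry at least a name and a size.
type dirEntry struct {
    Name string
    Size int64
}

// entryHeap is a min-heap ordered by Size: the smallest retained entry is
// always at index 0, so evicting it on overflow is O(log n).
type entryHeap []dirEntry

func (h entryHeap) Len() int           { return len(h) }
func (h entryHeap) Less(i, j int) bool { return h[i].Size < h[j].Size }
func (h entryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *entryHeap) Push(x any)        { *h = append(*h, x.(dirEntry)) }
func (h *entryHeap) Pop() any {
    old := *h
    n := len(old)
    item := old[n-1]
    *h = old[:n-1]
    return item
}

func main() {
    const maxEntries = 3
    h := &entryHeap{}
    heap.Init(h)
    for _, e := range []dirEntry{{"a", 5}, {"b", 1}, {"c", 9}, {"d", 7}, {"e", 2}} {
        if h.Len() < maxEntries {
            heap.Push(h, e)
        } else if e.Size > (*h)[0].Size {
            heap.Pop(h) // evict the current smallest
            heap.Push(h, e)
        }
    }
    // Drain the min-heap back-to-front to get a descending slice,
    // exactly as the scan code does after collection finishes.
    sorted := make([]dirEntry, h.Len())
    for i := len(sorted) - 1; i >= 0; i-- {
        sorted[i] = heap.Pop(h).(dirEntry)
    }
    fmt.Println(sorted) // [{c 9} {d 7} {a 5}]
}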
@@ -54,19 +61,31 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
     entryChan := make(chan dirEntry, len(children))
     largeFileChan := make(chan fileEntry, maxLargeFiles*2)
 
-    // Start goroutines to collect from channels
+    // Start goroutines to collect from channels into heaps
     var collectorWg sync.WaitGroup
     collectorWg.Add(2)
     go func() {
         defer collectorWg.Done()
         for entry := range entryChan {
-            entries = append(entries, entry)
+            // Maintain the top-N heap for entries
+            if entriesHeap.Len() < maxEntries {
+                heap.Push(entriesHeap, entry)
+            } else if entry.Size > (*entriesHeap)[0].Size {
+                heap.Pop(entriesHeap)
+                heap.Push(entriesHeap, entry)
+            }
         }
     }()
     go func() {
         defer collectorWg.Done()
         for file := range largeFileChan {
-            largeFiles = append(largeFiles, file)
+            // Maintain the top-N heap for large files
+            if largeFilesHeap.Len() < maxLargeFiles {
+                heap.Push(largeFilesHeap, file)
+            } else if file.Size > (*largeFilesHeap)[0].Size {
+                heap.Pop(largeFilesHeap)
+                heap.Push(largeFilesHeap, file)
+            }
         }
     }()
 
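The collector pair above is a standard Go fan-in: scanning workers publish into buffered channels, dedicated goroutines fold the stream into the heaps, and collectorWg lets the caller wait until both streams are fully drained before reading results. A minimal, self-contained sketch of that shape (the names here are illustrative, not from this codebase):

// Illustrative producer/collector shape: workers send results into a
// buffered channel, one collector goroutine drains it, and a WaitGroup
// gates reading the aggregate until the drain has finished.
package main

import (
    "fmt"
    "sync"
)

func main() {
    results := make(chan int64, 16) // buffered so producers rarely block

    var collectorWg sync.WaitGroup
    var total int64
    collectorWg.Add(1)
    go func() {
        defer collectorWg.Done()
        for size := range results { // loop exits when results is closed
            total += size
        }
    }()

    var producerWg sync.WaitGroup
    for i := int64(1); i <= 4; i++ {
        producerWg.Add(1)
        go func(n int64) {
            defer producerWg.Done()
            results <- n * 100
        }(i)
    }

    producerWg.Wait() // all producers done; safe to close the channel
    close(results)
    collectorWg.Wait() // collector has drained everything
    fmt.Println(total) // 1000
}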
@@ -113,7 +132,7 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
             // Try du command first for folded dirs (much faster)
             size, err := getDirectorySizeFromDu(path)
             if err != nil || size <= 0 {
-                // Fallback to walk if du fails
+                // Fallback to concurrent walk if du fails
                 size = calculateDirSizeFast(path, filesScanned, dirsScanned, bytesScanned, currentPath)
             }
             atomic.AddInt64(&total, size)
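getDirectorySizeFromDu is referenced but not part of this diff. For illustration only, one plausible shape for such a helper is sketched below: it shells out to du -sk (summarize, report 1024-byte blocks, portable across macOS and Linux) and converts blocks to bytes. The flags and parsing are assumptions, not the project's actual implementation.

// Hypothetical sketch of a du-backed directory sizer; the real
// getDirectorySizeFromDu in this repo may use different flags or parsing.
package main

import (
    "fmt"
    "os/exec"
    "strconv"
    "strings"
)

func getDirectorySizeFromDu(path string) (int64, error) {
    // -s: summarize the whole tree, -k: report usage in 1024-byte blocks
    out, err := exec.Command("du", "-sk", path).Output()
    if err != nil {
        return 0, err
    }
    fields := strings.Fields(string(out))
    if len(fields) == 0 {
        return 0, fmt.Errorf("unexpected du output: %q", out)
    }
    blocks, err := strconv.ParseInt(fields[0], 10, 64)
    if err != nil {
        return 0, err
    }
    return blocks * 1024, nil
}

func main() {
    size, err := getDirectorySizeFromDu("/tmp")
    fmt.Println(size, err)
}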
@@ -182,11 +201,15 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
     close(largeFileChan)
     collectorWg.Wait()
 
-    sort.Slice(entries, func(i, j int) bool {
-        return entries[i].Size > entries[j].Size
-    })
-    if len(entries) > maxEntries {
-        entries = entries[:maxEntries]
+    // Convert heaps to slices sorted in descending order
+    entries := make([]dirEntry, entriesHeap.Len())
+    for i := len(entries) - 1; i >= 0; i-- {
+        entries[i] = heap.Pop(entriesHeap).(dirEntry)
     }
+
+    largeFiles := make([]fileEntry, largeFilesHeap.Len())
+    for i := len(largeFiles) - 1; i >= 0; i-- {
+        largeFiles[i] = heap.Pop(largeFilesHeap).(fileEntry)
+    }
 
     // Try to use Spotlight (mdfind) for faster large file discovery
@@ -194,18 +217,15 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
     // if Spotlight is unavailable or fails. The fallback is intentionally silent
     // because users only care about correct results, not the method used.
     if spotlightFiles := findLargeFilesWithSpotlight(root, minLargeFileSize); len(spotlightFiles) > 0 {
         // Spotlight results are already sorted top N
         // Use them in place of scanned large files
         largeFiles = spotlightFiles
     } else {
         // Use files collected during scanning (fallback path)
-        // Sort and trim large files collected from scanning
-        sort.Slice(largeFiles, func(i, j int) bool {
-            return largeFiles[i].Size > largeFiles[j].Size
-        })
-        if len(largeFiles) > maxLargeFiles {
-            largeFiles = largeFiles[:maxLargeFiles]
-        }
     }
 
+    // No extra sort is needed in either branch: Spotlight already returns
+    // results in descending size order, and draining the min-heap back to
+    // front above produced a descending slice as well.
 
     return scanResult{
         Entries:    entries,
         LargeFiles: largeFiles,
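findLargeFilesWithSpotlight is likewise outside this diff. On macOS the Spotlight index can be queried from the command line with mdfind; the sketch below shows one hedged way such a lookup could work. The kMDItemFSSize metadata attribute and the mdfind -onlyin flag are real, but the function shape, sorting, and error handling are assumptions, not the project's implementation.

// Hypothetical Spotlight lookup: query the index for files larger than
// minSize under root. The real findLargeFilesWithSpotlight may differ.
package main

import (
    "fmt"
    "os"
    "os/exec"
    "sort"
    "strings"
)

type fileEntry struct {
    Path string
    Size int64
}

func findLargeFilesWithSpotlight(root string, minSize int64) []fileEntry {
    query := fmt.Sprintf("kMDItemFSSize > %d", minSize)
    out, err := exec.Command("mdfind", "-onlyin", root, query).Output()
    if err != nil {
        return nil // silently fall back, as the comment above describes
    }
    var files []fileEntry
    for _, path := range strings.Split(strings.TrimSpace(string(out)), "\n") {
        if path == "" {
            continue
        }
        if info, err := os.Stat(path); err == nil {
            files = append(files, fileEntry{Path: path, Size: info.Size()})
        }
    }
    // Return results in descending size order, matching the scan path.
    sort.Slice(files, func(i, j int) bool { return files[i].Size > files[j].Size })
    return files
}

func main() {
    for _, f := range findLargeFilesWithSpotlight(os.Getenv("HOME"), 1<<30) {
        fmt.Println(f.Size, f.Path)
    }
}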
@@ -242,70 +262,77 @@ func shouldSkipFileForLargeTracking(path string) bool {
     return skipExtensions[ext]
 }
 
-// calculateDirSizeFast performs fast directory size calculation without detailed tracking or large file detection.
-// Updates progress counters in batches to reduce atomic operation overhead.
+// calculateDirSizeFast performs concurrent directory size calculation using os.ReadDir.
+// This is a faster fallback than filepath.WalkDir when du fails.
 func calculateDirSizeFast(root string, filesScanned, dirsScanned, bytesScanned *int64, currentPath *string) int64 {
     var total int64
-    var localFiles, localDirs int64
-    var batchBytes int64
+    var wg sync.WaitGroup
 
     // Create context with timeout
     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
     defer cancel()
 
-    walkFunc := func(path string, d fs.DirEntry, err error) error {
-        // Check for timeout
+    // Limit total concurrency for this walk
+    concurrency := runtime.NumCPU() * 4
+    if concurrency > 64 {
+        concurrency = 64
+    }
+    sem := make(chan struct{}, concurrency)
+
+    var walk func(string)
+    walk = func(dirPath string) {
         select {
         case <-ctx.Done():
-            return ctx.Err()
+            return
         default:
         }
-        if err != nil {
-            return nil
-        }
-        if d.IsDir() {
-            localDirs++
-            // Batch update every N dirs to reduce atomic operations
-            if localDirs%batchUpdateSize == 0 {
-                atomic.AddInt64(dirsScanned, batchUpdateSize)
-                localDirs = 0
-            }
-            return nil
-        }
-        info, err := d.Info()
-        if err != nil {
-            return nil
-        }
-        // Get actual disk usage for sparse files and cloud files
-        size := getActualFileSize(path, info)
-        total += size
-        batchBytes += size
-        localFiles++
 
         if currentPath != nil {
-            *currentPath = path
+            *currentPath = dirPath
         }
-        // Batch update every N files to reduce atomic operations
-        if localFiles%batchUpdateSize == 0 {
-            atomic.AddInt64(filesScanned, batchUpdateSize)
-            atomic.AddInt64(bytesScanned, batchBytes)
-            localFiles = 0
-            batchBytes = 0
-        }
-        return nil
+
+        entries, err := os.ReadDir(dirPath)
+        if err != nil {
+            return
+        }
+
+        var localBytes, localFiles int64
+
+        for _, entry := range entries {
+            if entry.IsDir() {
+                // Directories: recurse concurrently
+                wg.Add(1)
+                // Capture loop variable
+                subDir := filepath.Join(dirPath, entry.Name())
+                go func(p string) {
+                    defer wg.Done()
+                    sem <- struct{}{}        // Acquire token
+                    defer func() { <-sem }() // Release token
+                    walk(p)
+                }(subDir)
+                atomic.AddInt64(dirsScanned, 1)
+            } else {
+                // Files: process immediately
+                info, err := entry.Info()
+                if err == nil {
+                    size := getActualFileSize(filepath.Join(dirPath, entry.Name()), info)
+                    localBytes += size
+                    localFiles++
+                }
+            }
+        }
+
+        if localBytes > 0 {
+            atomic.AddInt64(&total, localBytes)
+            atomic.AddInt64(bytesScanned, localBytes)
+        }
+        if localFiles > 0 {
+            atomic.AddInt64(filesScanned, localFiles)
+        }
     }
 
-    _ = filepath.WalkDir(root, walkFunc)
-
-    // Final update for remaining counts
-    if localFiles > 0 {
-        atomic.AddInt64(filesScanned, localFiles)
-    }
-    if localDirs > 0 {
-        atomic.AddInt64(dirsScanned, localDirs)
-    }
-    if batchBytes > 0 {
-        atomic.AddInt64(bytesScanned, batchBytes)
-    }
+    walk(root)
+    wg.Wait()
 
     return total
 }
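The new calculateDirSizeFast combines three patterns worth isolating: an I/O-oriented concurrency cap (NumCPU()*4, clamped to 64, since directory reads block on disk rather than CPU), a buffered channel used as a counting semaphore to bound in-flight goroutines, and per-directory local counters flushed with a single atomic add instead of one atomic operation per file. A condensed, runnable sketch of that skeleton, without the timeout and progress plumbing:

// Condensed sketch of the bounded concurrent walk above: recursion fans
// out one goroutine per subdirectory, a buffered channel caps how many
// run at once, and byte counts accumulate locally per directory before a
// single atomic flush.
package main

import (
    "fmt"
    "os"
    "path/filepath"
    "runtime"
    "sync"
    "sync/atomic"
)

func dirSize(root string) int64 {
    // I/O-bound work tolerates more workers than CPUs, but cap it so
    // huge trees cannot spawn unbounded goroutines.
    concurrency := runtime.NumCPU() * 4
    if concurrency > 64 {
        concurrency = 64
    }
    sem := make(chan struct{}, concurrency)

    var total int64
    var wg sync.WaitGroup

    var walk func(string)
    walk = func(dir string) {
        entries, err := os.ReadDir(dir)
        if err != nil {
            return // unreadable directories are simply skipped
        }
        var localBytes int64
        for _, entry := range entries {
            if entry.IsDir() {
                wg.Add(1)
                go func(p string) {
                    defer wg.Done()
                    sem <- struct{}{}        // acquire a slot
                    defer func() { <-sem }() // release it
                    walk(p)
                }(filepath.Join(dir, entry.Name()))
            } else if info, err := entry.Info(); err == nil {
                localBytes += info.Size()
            }
        }
        if localBytes > 0 {
            atomic.AddInt64(&total, localBytes) // one flush per directory
        }
    }

    walk(root)
    wg.Wait()
    return atomic.LoadInt64(&total)
}

func main() {
    fmt.Println(dirSize(os.TempDir()))
}

One design note grounded in the diff itself: a goroutine holds a semaphore slot only while synchronously reading one directory and spawning children, never while waiting on them, so the walk cannot deadlock even when the tree is deeper than the concurrency limit.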