mirror of https://github.com/tw93/Mole.git synced 2026-02-11 22:19:00 +00:00

optimize analyze scanner concurrency and channel send semantics

tw93
2026-02-10 16:23:24 +08:00
parent 7d8bfbc9d9
commit 1a559f8563
2 changed files with 87 additions and 54 deletions

View File

@@ -10,7 +10,6 @@ import (
"os/exec"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
@@ -26,12 +25,33 @@ var scanGroup singleflight.Group
 // trySend attempts to send an item to a channel with a timeout.
 // Returns true if the item was sent, false if the timeout was reached.
 func trySend[T any](ch chan<- T, item T, timeout time.Duration) bool {
-    timer := time.NewTimer(timeout)
+    if timeout <= 0 {
+        select {
+        case ch <- item:
+            return true
+        default:
+            return false
+        }
+    }
+    select {
+    case ch <- item:
+        return true
+    default:
+    }
+    timer := time.NewTimer(timeout)
     defer func() {
         if !timer.Stop() {
-            <-timer.C
+            select {
+            case <-timer.C:
+            default:
+            }
         }
     }()
     select {
     case ch <- item:
         return true
     case <-timer.C:
         return false
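
Taken together, the rewritten trySend has three phases: a purely non-blocking attempt when timeout <= 0, a timer-free fast path when a receiver (or buffer slot) is already available, and only then a timer-backed wait whose deferred cleanup drains timer.C without blocking. Below is a standalone sketch of the same pattern; the demo harness is illustrative, not Mole's code:

package main

import (
    "fmt"
    "time"
)

// trySend mirrors the pattern above: cheap non-blocking attempts first,
// a timer only when a positive timeout actually requires waiting.
func trySend[T any](ch chan<- T, item T, timeout time.Duration) bool {
    if timeout <= 0 {
        // Send-or-drop semantics without ever allocating a timer.
        select {
        case ch <- item:
            return true
        default:
            return false
        }
    }
    // Fast path: a ready receiver (or free buffer slot) skips the timer.
    select {
    case ch <- item:
        return true
    default:
    }
    timer := time.NewTimer(timeout)
    defer func() {
        // Stop reports false once the timer has fired; the non-blocking
        // drain cannot hang even if the losing select branch already
        // consumed the value from timer.C.
        if !timer.Stop() {
            select {
            case <-timer.C:
            default:
            }
        }
    }()
    select {
    case ch <- item:
        return true
    case <-timer.C:
        return false
    }
}

func main() {
    ch := make(chan int, 1)
    fmt.Println(trySend(ch, 1, 0))                   // true: buffer slot free
    fmt.Println(trySend(ch, 2, 0))                   // false: full, no waiting
    fmt.Println(trySend(ch, 3, 50*time.Millisecond)) // false: times out after 50ms
}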
@@ -45,6 +65,8 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
     }
     var total int64
+    var localFilesScanned int64
+    var localBytesScanned int64
     // Keep Top N heaps.
     entriesHeap := &entryHeap{}
@@ -66,6 +88,7 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
         numWorkers = 1
     }
     sem := make(chan struct{}, numWorkers)
+    dirSem := make(chan struct{}, min(runtime.NumCPU()*2, maxDirWorkers))
     duSem := make(chan struct{}, min(4, runtime.NumCPU()))        // limits concurrent du processes
     duQueueSem := make(chan struct{}, min(4, runtime.NumCPU())*2) // limits how many goroutines may be waiting to run du
     var wg sync.WaitGroup
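
All four of these are buffered channels used as counting semaphores: the capacity is the maximum number of concurrent holders, a send acquires a slot, a receive releases it. The new dirSem is created once here and shared by every recursive calculateDirSizeConcurrent call, replacing the per-call sem each invocation used to allocate. A minimal sketch of the idiom, with 16 standing in for the maxDirWorkers constant (its real value is not shown in this diff):

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    // A buffered channel as a counting semaphore: capacity bounds the
    // number of concurrent holders. 16 is an assumed stand-in value.
    maxWorkers := min(runtime.NumCPU()*2, 16)
    sem := make(chan struct{}, maxWorkers)

    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        sem <- struct{}{} // acquire: blocks while maxWorkers goroutines run
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot
            _ = n * n                // placeholder for per-directory work
        }(i)
    }
    wg.Wait()
    fmt.Println("ran with at most", maxWorkers, "concurrent workers")
}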
@@ -169,7 +192,7 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
             } else if cached, err := loadCacheFromDisk(path); err == nil {
                 size = cached.TotalSize
             } else {
-                size = calculateDirSizeConcurrent(path, largeFileChan, &largeFileMinSize, duSem, duQueueSem, filesScanned, dirsScanned, bytesScanned, currentPath)
+                size = calculateDirSizeConcurrent(path, largeFileChan, &largeFileMinSize, dirSem, duSem, duQueueSem, filesScanned, dirsScanned, bytesScanned, currentPath)
             }
             atomic.AddInt64(&total, size)
             atomic.AddInt64(dirsScanned, 1)
@@ -221,7 +244,7 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
             defer wg.Done()
             defer func() { <-sem }()
-            size := calculateDirSizeConcurrent(path, largeFileChan, &largeFileMinSize, duSem, duQueueSem, filesScanned, dirsScanned, bytesScanned, currentPath)
+            size := calculateDirSizeConcurrent(path, largeFileChan, &largeFileMinSize, dirSem, duSem, duQueueSem, filesScanned, dirsScanned, bytesScanned, currentPath)
             atomic.AddInt64(&total, size)
             atomic.AddInt64(dirsScanned, 1)
@@ -243,8 +266,8 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
             // Actual disk usage for sparse/cloud files.
             size := getActualFileSize(fullPath, info)
             atomic.AddInt64(&total, size)
-            atomic.AddInt64(filesScanned, 1)
-            atomic.AddInt64(bytesScanned, size)
+            localFilesScanned++
+            localBytesScanned += size
             trySend(entryChan, dirEntry{
                 Name: child.Name(),
@@ -263,6 +286,13 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
         }
     }
+    if localFilesScanned > 0 {
+        atomic.AddInt64(filesScanned, localFilesScanned)
+    }
+    if localBytesScanned > 0 {
+        atomic.AddInt64(bytesScanned, localBytesScanned)
+    }
     wg.Wait()
     // Close channels and wait for collectors.
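
The flush above is the second half of the local-counter pattern: the walk increments plain int64 locals and publishes them with a single atomic.AddInt64 once the pass is done, instead of issuing one atomic per file. Per-file atomics on shared counters bounce the same cache line across every worker core; batching turns that into one contended write per pass, at the cost of progress readers seeing coarser-grained updates. A self-contained sketch (worker and item counts are illustrative):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var filesScanned int64 // shared, hot counter
    var wg sync.WaitGroup

    for w := 0; w < 8; w++ { // 8 workers, counts are illustrative
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Plain local increments: no cross-core cache-line traffic.
            var local int64
            for i := 0; i < 100_000; i++ {
                local++
            }
            // One contended atomic per worker instead of one per file.
            if local > 0 {
                atomic.AddInt64(&filesScanned, local)
            }
        }()
    }
    wg.Wait()
    fmt.Println(atomic.LoadInt64(&filesScanned)) // 800000
}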
@@ -403,7 +433,8 @@ func findLargeFilesWithSpotlight(root string, minSize int64) []fileEntry {
         return nil
     }
-    var files []fileEntry
+    h := &largeFileHeap{}
+    heap.Init(h)
     for line := range strings.Lines(strings.TrimSpace(string(output))) {
         if line == "" {
@@ -431,20 +462,23 @@ func findLargeFilesWithSpotlight(root string, minSize int64) []fileEntry {
         // Actual disk usage for sparse/cloud files.
         actualSize := getActualFileSize(line, info)
-        files = append(files, fileEntry{
+        candidate := fileEntry{
             Name: filepath.Base(line),
             Path: line,
             Size: actualSize,
-        })
+        }
+        if h.Len() < maxLargeFiles {
+            heap.Push(h, candidate)
+        } else if candidate.Size > (*h)[0].Size {
+            heap.Pop(h)
+            heap.Push(h, candidate)
+        }
     }
-    // Sort by size (descending).
-    sort.Slice(files, func(i, j int) bool {
-        return files[i].Size > files[j].Size
-    })
-    if len(files) > maxLargeFiles {
-        files = files[:maxLargeFiles]
+    files := make([]fileEntry, h.Len())
+    for i := len(files) - 1; i >= 0; i-- {
+        files[i] = heap.Pop(h).(fileEntry)
     }
     return files
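
Instead of appending every Spotlight hit and sorting at the end, the loop now keeps a bounded min-heap of the best maxLargeFiles candidates: the root is always the smallest retained entry, so a new file either displaces it or is dropped. That is O(n log N) time and O(N) memory versus O(n log n) and O(n) for sort-and-truncate, and it is what lets the sort import go away. largeFileHeap is presumably a container/heap min-heap over Size; a minimal analogue:

package main

import (
    "container/heap"
    "fmt"
)

type fileEntry struct {
    Name string
    Size int64
}

// minHeap keeps the smallest retained entry at the root, so (*h)[0]
// is always the next eviction candidate.
type minHeap []fileEntry

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].Size < h[j].Size }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)        { *h = append(*h, x.(fileEntry)) }
func (h *minHeap) Pop() any {
    old := *h
    x := old[len(old)-1]
    *h = old[:len(old)-1]
    return x
}

func topN(entries []fileEntry, n int) []fileEntry {
    h := &minHeap{}
    heap.Init(h)
    for _, e := range entries {
        if h.Len() < n {
            heap.Push(h, e)
        } else if e.Size > (*h)[0].Size {
            heap.Pop(h) // displace the smallest retained entry
            heap.Push(h, e)
        }
    }
    // Pops come out ascending; filling back-to-front restores the
    // descending order the removed sort.Slice used to produce.
    out := make([]fileEntry, h.Len())
    for i := len(out) - 1; i >= 0; i-- {
        out[i] = heap.Pop(h).(fileEntry)
    }
    return out
}

func main() {
    files := []fileEntry{{"a", 5}, {"b", 9}, {"c", 1}, {"d", 7}, {"e", 3}}
    fmt.Println(topN(files, 3)) // [{b 9} {d 7} {a 5}]
}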
@@ -461,19 +495,18 @@ func isInFoldedDir(path string) bool {
     return false
 }
-func calculateDirSizeConcurrent(root string, largeFileChan chan<- fileEntry, largeFileMinSize *int64, duSem, duQueueSem chan struct{}, filesScanned, dirsScanned, bytesScanned *int64, currentPath *atomic.Value) int64 {
+func calculateDirSizeConcurrent(root string, largeFileChan chan<- fileEntry, largeFileMinSize *int64, dirSem, duSem, duQueueSem chan struct{}, filesScanned, dirsScanned, bytesScanned *int64, currentPath *atomic.Value) int64 {
     children, err := os.ReadDir(root)
     if err != nil {
         return 0
     }
     var total int64
+    var localFilesScanned int64
+    var localDirsScanned int64
+    var localBytesScanned int64
     var wg sync.WaitGroup
-    // Limit concurrent subdirectory scans.
-    maxConcurrent := min(runtime.NumCPU()*2, maxDirWorkers)
-    sem := make(chan struct{}, maxConcurrent)
     for _, child := range children {
         fullPath := filepath.Join(root, child.Name())
@@ -484,12 +517,14 @@ func calculateDirSizeConcurrent(root string, largeFileChan chan<- fileEntry, lar
             }
             size := getActualFileSize(fullPath, info)
             total += size
-            atomic.AddInt64(filesScanned, 1)
-            atomic.AddInt64(bytesScanned, size)
+            localFilesScanned++
+            localBytesScanned += size
             continue
         }
         if child.IsDir() {
+            localDirsScanned++
             if shouldFoldDirWithPath(child.Name(), fullPath) {
                 duQueueSem <- struct{}{}
                 wg.Add(1)
@@ -508,21 +543,24 @@ func calculateDirSizeConcurrent(root string, largeFileChan chan<- fileEntry, lar
                         atomic.AddInt64(bytesScanned, size)
                     }
                     atomic.AddInt64(&total, size)
-                    atomic.AddInt64(dirsScanned, 1)
                 }(fullPath)
                 continue
             }
-            sem <- struct{}{}
-            wg.Add(1)
-            go func(path string) {
-                defer wg.Done()
-                defer func() { <-sem }()
-                size := calculateDirSizeConcurrent(path, largeFileChan, largeFileMinSize, duSem, duQueueSem, filesScanned, dirsScanned, bytesScanned, currentPath)
-                atomic.AddInt64(&total, size)
-                atomic.AddInt64(dirsScanned, 1)
-            }(fullPath)
+            select {
+            case dirSem <- struct{}{}:
+                wg.Add(1)
+                go func(path string) {
+                    defer wg.Done()
+                    defer func() { <-dirSem }()
+                    size := calculateDirSizeConcurrent(path, largeFileChan, largeFileMinSize, dirSem, duSem, duQueueSem, filesScanned, dirsScanned, bytesScanned, currentPath)
+                    atomic.AddInt64(&total, size)
+                }(fullPath)
+            default:
+                size := calculateDirSizeConcurrent(fullPath, largeFileChan, largeFileMinSize, dirSem, duSem, duQueueSem, filesScanned, dirsScanned, bytesScanned, currentPath)
+                atomic.AddInt64(&total, size)
+            }
             continue
         }
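
The select/default acquire is what makes one shared dirSem safe inside recursion. With a blocking acquire, every slot could end up held by ancestor calls that are waiting on their descendants, a classic recursive-semaphore deadlock; here a saturated semaphore just means the caller recurses inline on its own stack, which bounds total goroutines and applies natural backpressure. A runnable sketch of the spawn-or-inline pattern (the tree type and accounting are illustrative):

package main

import (
    "fmt"
    "sync"
)

type node struct {
    value    int
    children []*node
}

// visit fans out to a goroutine per child only while the shared
// semaphore has a free slot; otherwise it recurses inline. The
// non-blocking acquire is essential: blocking here could deadlock
// once every slot is held by ancestors waiting on descendants.
func visit(n *node, sem chan struct{}, sum *int64, mu *sync.Mutex) {
    mu.Lock()
    *sum += int64(n.value)
    mu.Unlock()

    var wg sync.WaitGroup
    for _, c := range n.children {
        select {
        case sem <- struct{}{}: // slot free: scan this subtree concurrently
            wg.Add(1)
            go func(c *node) {
                defer wg.Done()
                defer func() { <-sem }()
                visit(c, sem, sum, mu)
            }(c)
        default: // saturated: stay on the current goroutine's stack
            visit(c, sem, sum, mu)
        }
    }
    wg.Wait()
}

func main() {
    root := &node{1, []*node{{2, nil}, {3, []*node{{4, nil}}}}}
    sem := make(chan struct{}, 2)
    var sum int64
    var mu sync.Mutex
    visit(root, sem, &sum, &mu)
    fmt.Println(sum) // 10
}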
@@ -533,8 +571,8 @@ func calculateDirSizeConcurrent(root string, largeFileChan chan<- fileEntry, lar
         size := getActualFileSize(fullPath, info)
         total += size
-        atomic.AddInt64(filesScanned, 1)
-        atomic.AddInt64(bytesScanned, size)
+        localFilesScanned++
+        localBytesScanned += size
         if !shouldSkipFileForLargeTracking(fullPath) && largeFileMinSize != nil {
             minSize := atomic.LoadInt64(largeFileMinSize)
@@ -544,12 +582,23 @@ func calculateDirSizeConcurrent(root string, largeFileChan chan<- fileEntry, lar
         }
         // Update current path occasionally to prevent UI jitter.
-        if currentPath != nil && atomic.LoadInt64(filesScanned)%int64(batchUpdateSize) == 0 {
+        if currentPath != nil && localFilesScanned%int64(batchUpdateSize) == 0 {
             currentPath.Store(fullPath)
         }
     }
     wg.Wait()
+    if localFilesScanned > 0 {
+        atomic.AddInt64(filesScanned, localFilesScanned)
+    }
+    if localBytesScanned > 0 {
+        atomic.AddInt64(bytesScanned, localBytesScanned)
+    }
+    if localDirsScanned > 0 {
+        atomic.AddInt64(dirsScanned, localDirsScanned)
+    }
     return total
 }
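
The currentPath throttle now keys off the local counter as well: only every batchUpdateSize-th file in this pass stores a path into the atomic.Value the UI polls, which avoids both visual jitter and a global atomic load per file. A small sketch, with an assumed value for batchUpdateSize:

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    const batchUpdateSize = 4    // assumed value; the real constant is not shown
    var currentPath atomic.Value // polled by the UI goroutine

    var localFilesScanned int64
    for i := 1; i <= 10; i++ {
        path := fmt.Sprintf("/tmp/file-%d", i)
        localFilesScanned++
        // Publish every batchUpdateSize-th path only: readers see a
        // steady sample instead of a blur, and the hot loop does no
        // atomic work on the other iterations.
        if localFilesScanned%batchUpdateSize == 0 {
            currentPath.Store(path)
        }
    }
    fmt.Println(currentPath.Load()) // /tmp/file-8
}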
@@ -699,14 +748,6 @@ func getActualFileSize(_ string, info fs.FileInfo) int64 {
     return info.Size()
 }
-func getLastAccessTime(path string) time.Time {
-    info, err := os.Stat(path)
-    if err != nil {
-        return time.Time{}
-    }
-    return getLastAccessTimeFromInfo(info)
-}
 func getLastAccessTimeFromInfo(info fs.FileInfo) time.Time {
     stat, ok := info.Sys().(*syscall.Stat_t)
     if !ok {

View File

@@ -220,11 +220,7 @@ func (m model) View() string {
             if entry.IsDir && isCleanableDir(entry.Path) {
                 hintLabel = fmt.Sprintf("%s🧹%s", colorYellow, colorReset)
             } else {
-                lastAccess := entry.LastAccess
-                if lastAccess.IsZero() && entry.Path != "" {
-                    lastAccess = getLastAccessTime(entry.Path)
-                }
-                if unusedTime := formatUnusedTime(lastAccess); unusedTime != "" {
+                if unusedTime := formatUnusedTime(entry.LastAccess); unusedTime != "" {
                     hintLabel = fmt.Sprintf("%s%s%s", colorGray, unusedTime, colorReset)
                 }
             }
@@ -309,11 +305,7 @@ func (m model) View() string {
             if entry.IsDir && isCleanableDir(entry.Path) {
                 hintLabel = fmt.Sprintf("%s🧹%s", colorYellow, colorReset)
             } else {
-                lastAccess := entry.LastAccess
-                if lastAccess.IsZero() && entry.Path != "" {
-                    lastAccess = getLastAccessTime(entry.Path)
-                }
-                if unusedTime := formatUnusedTime(lastAccess); unusedTime != "" {
+                if unusedTime := formatUnusedTime(entry.LastAccess); unusedTime != "" {
                     hintLabel = fmt.Sprintf("%s%s%s", colorGray, unusedTime, colorReset)
                 }
             }
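
With the getLastAccessTime fallback deleted, View (a render path that runs on every frame) no longer issues an os.Stat per visible row; entry.LastAccess has to be filled in once at scan time instead. A sketch of resolving atime when the entry is built, assuming the darwin field names of syscall.Stat_t (Mole targets macOS; on Linux the field is Atim) and an illustrative dirEntry shape:

package main

import (
    "fmt"
    "os"
    "syscall"
    "time"
)

type dirEntry struct {
    Path       string
    LastAccess time.Time // resolved once at scan time, only read in View
}

// statLastAccess resolves atime when the entry is created, so rendering
// never touches the filesystem. Atimespec is the darwin field name.
func statLastAccess(path string) time.Time {
    info, err := os.Stat(path)
    if err != nil {
        return time.Time{}
    }
    st, ok := info.Sys().(*syscall.Stat_t)
    if !ok {
        return time.Time{}
    }
    return time.Unix(st.Atimespec.Sec, st.Atimespec.Nsec)
}

func main() {
    e := dirEntry{Path: os.TempDir()}
    e.LastAccess = statLastAccess(e.Path)
    fmt.Println(e.LastAccess)
}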