mirror of
https://github.com/tw93/Mole.git
synced 2026-02-04 08:06:43 +00:00
refactor: simplify channel send logic with trySend function
This commit is contained in:
@@ -23,6 +23,21 @@ import (
|
||||
|
||||
var scanGroup singleflight.Group
|
||||
|
||||
// trySend attempts to send an item to a channel with a timeout.
|
||||
// Returns true if the item was sent, false if the timeout was reached.
|
||||
func trySend[T any](ch chan<- T, item T, timeout time.Duration) bool {
|
||||
timer := time.NewTimer(timeout)
|
||||
select {
|
||||
case ch <- item:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
return true
|
||||
case <-timer.C:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *int64, currentPath *atomic.Value) (scanResult, error) {
|
||||
children, err := os.ReadDir(root)
|
||||
if err != nil {
|
||||
@@ -119,42 +134,13 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
|
||||
size := getActualFileSize(fullPath, info)
|
||||
atomic.AddInt64(&total, size)
|
||||
|
||||
// Reuse timer to reduce GC pressure
|
||||
timer := time.NewTimer(0)
|
||||
// Ensure timer is drained immediately since we start with 0
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case entryChan <- dirEntry{
|
||||
trySend(entryChan, dirEntry{
|
||||
Name: child.Name() + " →",
|
||||
Path: fullPath,
|
||||
Size: size,
|
||||
IsDir: isDir,
|
||||
LastAccess: getLastAccessTimeFromInfo(info),
|
||||
}:
|
||||
default:
|
||||
// If channel is full, use timer to wait with timeout
|
||||
timer.Reset(100 * time.Millisecond)
|
||||
select {
|
||||
case entryChan <- dirEntry{
|
||||
Name: child.Name() + " →",
|
||||
Path: fullPath,
|
||||
Size: size,
|
||||
IsDir: isDir,
|
||||
LastAccess: getLastAccessTimeFromInfo(info),
|
||||
}:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
case <-timer.C:
|
||||
// Skip if channel is blocked
|
||||
}
|
||||
}
|
||||
}, 100*time.Millisecond)
|
||||
continue
|
||||
|
||||
}
|
||||
@@ -188,20 +174,13 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
|
||||
atomic.AddInt64(&total, size)
|
||||
atomic.AddInt64(dirsScanned, 1)
|
||||
|
||||
timer := time.NewTimer(100 * time.Millisecond)
|
||||
select {
|
||||
case entryChan <- dirEntry{
|
||||
trySend(entryChan, dirEntry{
|
||||
Name: name,
|
||||
Path: path,
|
||||
Size: size,
|
||||
IsDir: true,
|
||||
LastAccess: time.Time{},
|
||||
}:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
}, 100*time.Millisecond)
|
||||
}(child.Name(), fullPath)
|
||||
continue
|
||||
}
|
||||
@@ -225,20 +204,13 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
|
||||
atomic.AddInt64(&total, size)
|
||||
atomic.AddInt64(dirsScanned, 1)
|
||||
|
||||
timer := time.NewTimer(100 * time.Millisecond)
|
||||
select {
|
||||
case entryChan <- dirEntry{
|
||||
trySend(entryChan, dirEntry{
|
||||
Name: name,
|
||||
Path: path,
|
||||
Size: size,
|
||||
IsDir: true,
|
||||
LastAccess: time.Time{},
|
||||
}:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
}, 100*time.Millisecond)
|
||||
}(child.Name(), fullPath)
|
||||
continue
|
||||
}
|
||||
@@ -253,20 +225,13 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
|
||||
atomic.AddInt64(&total, size)
|
||||
atomic.AddInt64(dirsScanned, 1)
|
||||
|
||||
timer := time.NewTimer(100 * time.Millisecond)
|
||||
select {
|
||||
case entryChan <- dirEntry{
|
||||
trySend(entryChan, dirEntry{
|
||||
Name: name,
|
||||
Path: path,
|
||||
Size: size,
|
||||
IsDir: true,
|
||||
LastAccess: time.Time{},
|
||||
}:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
}, 100*time.Millisecond)
|
||||
}(child.Name(), fullPath)
|
||||
continue
|
||||
}
|
||||
@@ -281,35 +246,19 @@ func scanPathConcurrent(root string, filesScanned, dirsScanned, bytesScanned *in
|
||||
atomic.AddInt64(filesScanned, 1)
|
||||
atomic.AddInt64(bytesScanned, size)
|
||||
|
||||
// Single-use timer for main loop (less pressure than tight loop above)
|
||||
// But let's be consistent and optimized
|
||||
timer := time.NewTimer(100 * time.Millisecond)
|
||||
select {
|
||||
case entryChan <- dirEntry{
|
||||
trySend(entryChan, dirEntry{
|
||||
Name: child.Name(),
|
||||
Path: fullPath,
|
||||
Size: size,
|
||||
IsDir: false,
|
||||
LastAccess: getLastAccessTimeFromInfo(info),
|
||||
}:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
}, 100*time.Millisecond)
|
||||
|
||||
// Track large files only.
|
||||
if !shouldSkipFileForLargeTracking(fullPath) {
|
||||
minSize := atomic.LoadInt64(&largeFileMinSize)
|
||||
if size >= minSize {
|
||||
timer.Reset(100 * time.Millisecond)
|
||||
select {
|
||||
case largeFileChan <- fileEntry{Name: child.Name(), Path: fullPath, Size: size}:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
trySend(largeFileChan, fileEntry{Name: child.Name(), Path: fullPath, Size: size}, 100*time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -519,15 +468,6 @@ func calculateDirSizeConcurrent(root string, largeFileChan chan<- fileEntry, lar
|
||||
maxConcurrent := min(runtime.NumCPU()*2, maxDirWorkers)
|
||||
sem := make(chan struct{}, maxConcurrent)
|
||||
|
||||
// Reuse timer for large file sends
|
||||
timer := time.NewTimer(0)
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
for _, child := range children {
|
||||
fullPath := filepath.Join(root, child.Name())
|
||||
|
||||
@@ -593,14 +533,7 @@ func calculateDirSizeConcurrent(root string, largeFileChan chan<- fileEntry, lar
|
||||
if !shouldSkipFileForLargeTracking(fullPath) && largeFileMinSize != nil {
|
||||
minSize := atomic.LoadInt64(largeFileMinSize)
|
||||
if size >= minSize {
|
||||
timer.Reset(100 * time.Millisecond)
|
||||
select {
|
||||
case largeFileChan <- fileEntry{Name: child.Name(), Path: fullPath, Size: size}:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
trySend(largeFileChan, fileEntry{Name: child.Name(), Path: fullPath, Size: size}, 100*time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user