diff --git a/pkg/git/finder.go b/pkg/git/finder.go index d997988..0ee90bd 100644 --- a/pkg/git/finder.go +++ b/pkg/git/finder.go @@ -12,6 +12,9 @@ import ( "github.com/pkg/errors" ) +// Max number of concurrently running status loading workers. +const maxWorkers = 100 + // errSkipNode is used as an error indicating that .git directory has been found. // It's handled by ErrorsCallback to tell the WalkCallback to skip this dir. var errSkipNode = errors.New(".git directory found, skipping this node") @@ -37,16 +40,18 @@ func Exists(path string) (bool, error) { return true, errors.Wrapf(errDirNoAccess, "can't access %s", path) } -// RepoFinder finds git repositories inside a given path. +// RepoFinder finds git repositories inside a given path and loads their status. type RepoFinder struct { - root string - repos []*Repo + root string + repos []*Repo + maxWorkers int } // NewRepoFinder returns a RepoFinder pointed at given root path. func NewRepoFinder(root string) *RepoFinder { return &RepoFinder{ - root: root, + root: root, + maxWorkers: maxWorkers, } } @@ -79,24 +84,28 @@ func (f *RepoFinder) Find() error { // LoadAll loads and returns sorted slice of statuses of all repositories found by RepoFinder. // If fetch equals true, it first fetches from the remote repo before loading the status. -// Each repo is loaded concurrently in its own goroutine, with max 100 repos being loaded at the same time. +// Each repo is loaded concurrently by a separate worker, with max 100 workers being active at the same time. func (f *RepoFinder) LoadAll(fetch bool) []*Status { var ss []*Status - loadedChan := make(chan *Status) + reposChan := make(chan *Repo, f.maxWorkers) + statusChan := make(chan *Status, f.maxWorkers) - for _, repo := range f.repos { - go func(repo *Repo) { - loadedChan <- repo.LoadStatus(fetch) - }(repo) + // Fire up workers. They listen on reposChan, load status and send the result to statusChan. + for i := 0; i < f.maxWorkers; i++ { + go statusWorker(fetch, reposChan, statusChan) } - for l := range loadedChan { - ss = append(ss, l) + // Start loading the slice of repos found by finder into the reposChan. + // It runs in a goroutine so that as soon as repos appear on the channel they can be processed and sent to statusChan. + go loadRepos(f.repos, reposChan) - // Close the channel when all repos are loaded. + // Read statuses from the statusChan and add then to the result slice. + // Close the channel when all repos are loaded. + for status := range statusChan { + ss = append(ss, status) if len(ss) == len(f.repos) { - close(loadedChan) + close(statusChan) } } @@ -108,6 +117,20 @@ func (f *RepoFinder) LoadAll(fetch bool) []*Status { return ss } +func loadRepos(repos []*Repo, reposChan chan<- *Repo) { + for _, repo := range repos { + reposChan <- repo + } + + close(reposChan) +} + +func statusWorker(fetch bool, reposChan <-chan *Repo, statusChan chan<- *Status) { + for repo := range reposChan { + statusChan <- repo.LoadStatus(fetch) + } +} + func (f *RepoFinder) walkCb(path string, ent *godirwalk.Dirent) error { // Do not traverse .git directories if ent.IsDir() && ent.Name() == dotgit {