mirror of
https://github.com/grdl/git-get.git
synced 2026-02-08 11:04:19 +00:00
Add a worker pool to status loader
This will limit a number of councurrently running git commands when loading statuses of a large number of repositories.
This commit is contained in:
@@ -12,6 +12,9 @@ import (
|
|||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Max number of concurrently running status loading workers.
|
||||||
|
const maxWorkers = 100
|
||||||
|
|
||||||
// errSkipNode is used as an error indicating that .git directory has been found.
|
// errSkipNode is used as an error indicating that .git directory has been found.
|
||||||
// It's handled by ErrorsCallback to tell the WalkCallback to skip this dir.
|
// It's handled by ErrorsCallback to tell the WalkCallback to skip this dir.
|
||||||
var errSkipNode = errors.New(".git directory found, skipping this node")
|
var errSkipNode = errors.New(".git directory found, skipping this node")
|
||||||
@@ -37,16 +40,18 @@ func Exists(path string) (bool, error) {
|
|||||||
return true, errors.Wrapf(errDirNoAccess, "can't access %s", path)
|
return true, errors.Wrapf(errDirNoAccess, "can't access %s", path)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RepoFinder finds git repositories inside a given path.
|
// RepoFinder finds git repositories inside a given path and loads their status.
|
||||||
type RepoFinder struct {
|
type RepoFinder struct {
|
||||||
root string
|
root string
|
||||||
repos []*Repo
|
repos []*Repo
|
||||||
|
maxWorkers int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRepoFinder returns a RepoFinder pointed at given root path.
|
// NewRepoFinder returns a RepoFinder pointed at given root path.
|
||||||
func NewRepoFinder(root string) *RepoFinder {
|
func NewRepoFinder(root string) *RepoFinder {
|
||||||
return &RepoFinder{
|
return &RepoFinder{
|
||||||
root: root,
|
root: root,
|
||||||
|
maxWorkers: maxWorkers,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -79,24 +84,28 @@ func (f *RepoFinder) Find() error {
|
|||||||
|
|
||||||
// LoadAll loads and returns sorted slice of statuses of all repositories found by RepoFinder.
|
// LoadAll loads and returns sorted slice of statuses of all repositories found by RepoFinder.
|
||||||
// If fetch equals true, it first fetches from the remote repo before loading the status.
|
// If fetch equals true, it first fetches from the remote repo before loading the status.
|
||||||
// Each repo is loaded concurrently in its own goroutine, with max 100 repos being loaded at the same time.
|
// Each repo is loaded concurrently by a separate worker, with max 100 workers being active at the same time.
|
||||||
func (f *RepoFinder) LoadAll(fetch bool) []*Status {
|
func (f *RepoFinder) LoadAll(fetch bool) []*Status {
|
||||||
var ss []*Status
|
var ss []*Status
|
||||||
|
|
||||||
loadedChan := make(chan *Status)
|
reposChan := make(chan *Repo, f.maxWorkers)
|
||||||
|
statusChan := make(chan *Status, f.maxWorkers)
|
||||||
|
|
||||||
for _, repo := range f.repos {
|
// Fire up workers. They listen on reposChan, load status and send the result to statusChan.
|
||||||
go func(repo *Repo) {
|
for i := 0; i < f.maxWorkers; i++ {
|
||||||
loadedChan <- repo.LoadStatus(fetch)
|
go statusWorker(fetch, reposChan, statusChan)
|
||||||
}(repo)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for l := range loadedChan {
|
// Start loading the slice of repos found by finder into the reposChan.
|
||||||
ss = append(ss, l)
|
// It runs in a goroutine so that as soon as repos appear on the channel they can be processed and sent to statusChan.
|
||||||
|
go loadRepos(f.repos, reposChan)
|
||||||
|
|
||||||
// Close the channel when all repos are loaded.
|
// Read statuses from the statusChan and add then to the result slice.
|
||||||
|
// Close the channel when all repos are loaded.
|
||||||
|
for status := range statusChan {
|
||||||
|
ss = append(ss, status)
|
||||||
if len(ss) == len(f.repos) {
|
if len(ss) == len(f.repos) {
|
||||||
close(loadedChan)
|
close(statusChan)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -108,6 +117,20 @@ func (f *RepoFinder) LoadAll(fetch bool) []*Status {
|
|||||||
return ss
|
return ss
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func loadRepos(repos []*Repo, reposChan chan<- *Repo) {
|
||||||
|
for _, repo := range repos {
|
||||||
|
reposChan <- repo
|
||||||
|
}
|
||||||
|
|
||||||
|
close(reposChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
func statusWorker(fetch bool, reposChan <-chan *Repo, statusChan chan<- *Status) {
|
||||||
|
for repo := range reposChan {
|
||||||
|
statusChan <- repo.LoadStatus(fetch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (f *RepoFinder) walkCb(path string, ent *godirwalk.Dirent) error {
|
func (f *RepoFinder) walkCb(path string, ent *godirwalk.Dirent) error {
|
||||||
// Do not traverse .git directories
|
// Do not traverse .git directories
|
||||||
if ent.IsDir() && ent.Name() == dotgit {
|
if ent.IsDir() && ent.Name() == dotgit {
|
||||||
|
|||||||
Reference in New Issue
Block a user