// Package retriever runs retrieval jobs for a list of items in parallel and
// reports progress and failures over channels.
package retriever

import (
	"fmt"
	"runtime"
	"sync"
)

// Retrievable must be implemented by any type used with Retriever.
type Retrievable interface {
	// Retrieve fetches some data from somewhere (usually from the network) and
	// returns it as a slice of bytes, or nil and an error. It does not store
	// the data anywhere: to write the retrieved data, call this method from a
	// WorkerFunc.
	Retrieve() ([]byte, error)
}

// Retriever retrieves a batch of items in parallel. It exposes two channels:
// one that carries worker errors and one that carries, per finished item, the
// number of bytes written.
type Retriever struct {
	// retrieveErrors carries worker failures; New creates it with capacity 1.
	retrieveErrors chan error
	// retrieveDone carries the byte count of each finished item; New creates
	// it unbuffered, so a value is handed over only when a receiver is ready.
	retrieveDone chan int
}

// New returns a Retriever whose error channel buffers a single error and whose
// done channel is unbuffered.
func New() *Retriever {
	return &Retriever{
		retrieveErrors: make(chan error, 1),
		retrieveDone:   make(chan int)}
}

// GetErrorChan returns the channel on which worker errors are published.
//
// The channel has room for exactly one buffered error, so only the first
// failure is guaranteed to fit. An error is therefore treated as a
// "soft panic": the first one is the one that matters.
func (r *Retriever) GetErrorChan() chan error {
	return r.retrieveErrors
}

// GetDoneChan returns the channel that is used to track progress. Each value
// received from it is the number of bytes written for one item.
func (r *Retriever) GetDoneChan() chan int {
	return r.retrieveDone
}

// WorkerFunc accepts a Retrievable item and the basePath under which the item
// should be placed. It returns how many bytes were written and an error if
// something went wrong.
type WorkerFunc func(item Retrievable, basePath string) (int, error)

// Run calls worker for every item, running at most runtime.NumCPU()*2 calls
// concurrently. Once it notices that a worker has failed it starts no further
// workers, waits for the ones already running and returns that error.
func (r *Retriever) Run(items []Retrievable, worker WorkerFunc, basePath string) error { var workersGroup sync.WaitGroup retrieveLimiter := make(chan struct{}, runtime.NumCPU()*2) for _, item := range items { if len(r.retrieveErrors) > 0 { workersGroup.Wait() return <-r.retrieveErrors } retrieveLimiter <- struct{}{} workersGroup.Add(1) go func(item Retrievable) { bytesLen, err := worker(item, basePath) workersGroup.Done() <-retrieveLimiter if err != nil { r.retrieveErrors <- fmt.Errorf("failed to retrieve: %s", err) return } r.retrieveDone <- bytesLen }(item) } workersGroup.Wait() return nil } // Close the channels. func (r *Retriever) Close() { close(r.retrieveDone) close(r.retrieveErrors) }