82 lines
2.4 KiB
Go
Executable File
82 lines
2.4 KiB
Go
Executable File
package retriever
|
|
|
|
import (
|
|
"fmt"
|
|
"runtime"
|
|
"sync"
|
|
)
|
|
|
|
// Retrievable is the contract every item handed to a Retriever has to
// satisfy.
type Retrievable interface {
	// Retrieve fetches the item's data from its source (typically the
	// network). It returns the data as a byte slice, or nil together with an
	// error. Retrieve does not persist anything itself: to write the fetched
	// data somewhere, call Retrieve from inside a WorkerFunc.
	Retrieve() ([]byte, error)
}
|
|
|
|
// Retriever processes a batch of Retrievable items in parallel. It reports
// failures on an error channel and progress on a done channel whose values
// are the number of bytes written for each finished item.
type Retriever struct {
	// retrieveErrors carries failures reported by workers.
	retrieveErrors chan error

	// retrieveDone carries the byte count reported for each successfully
	// processed item.
	retrieveDone chan int
}
|
|
|
|
// New returns a new Retriever with a base path set.
|
|
func New() *Retriever {
|
|
return &Retriever{
|
|
retrieveErrors: make(chan error, 1),
|
|
retrieveDone: make(chan int)}
|
|
}
|
|
|
|
// GetErrorChan returns an error channel.
|
|
//
|
|
// An error channel holds just one error and hangs after that. So it is considered
|
|
// as a "soft panic".
|
|
func (r *Retriever) GetErrorChan() chan error {
|
|
return r.retrieveErrors
|
|
}
|
|
|
|
// GetDoneChan returns a channel that is used to track a progress. Its value
|
|
// is an amount of bytes written for an item.
|
|
func (r *Retriever) GetDoneChan() chan int {
|
|
return r.retrieveDone
|
|
}
|
|
|
|
// WorkerFunc accepts a Retrievable item and a basePath where an item should be
|
|
// placed to. And it returns how much was written and an error if something
|
|
// went wrong.
|
|
type WorkerFunc func(item Retrievable, basePath string) (int, error)
|
|
|
|
// Run runs in parallel a number of CPU threads * 2 WorkerFunc functions.
|
|
// If an error occured it stops and returns that error.
|
|
func (r *Retriever) Run(items []Retrievable, worker WorkerFunc, basePath string) error {
|
|
var workersGroup sync.WaitGroup
|
|
retrieveLimiter := make(chan struct{}, runtime.NumCPU()*2)
|
|
for _, item := range items {
|
|
if len(r.retrieveErrors) > 0 {
|
|
workersGroup.Wait()
|
|
return <-r.retrieveErrors
|
|
}
|
|
retrieveLimiter <- struct{}{}
|
|
workersGroup.Add(1)
|
|
go func(item Retrievable) {
|
|
bytesLen, err := worker(item, basePath)
|
|
workersGroup.Done()
|
|
<-retrieveLimiter
|
|
if err != nil {
|
|
r.retrieveErrors <- fmt.Errorf("failed to retrieve: %s", err)
|
|
return
|
|
}
|
|
r.retrieveDone <- bytesLen
|
|
}(item)
|
|
}
|
|
workersGroup.Wait()
|
|
return nil
|
|
}
|
|
|
|
// Close the channels.
|
|
func (r *Retriever) Close() {
|
|
close(r.retrieveDone)
|
|
close(r.retrieveErrors)
|
|
}
|