1
0
mccl/pkg/retriever/retriever.go

82 lines
2.4 KiB
Go
Executable File

package retriever
import (
"fmt"
"runtime"
"sync"
)
// Retrievable must be implemented by any type used with Retriever. It is the
// item type accepted by WorkerFunc and by Retriever.Run.
type Retrievable interface {
// Retrieve only fetches the item's data from its source (usually the
// network) and returns it as a slice of bytes, or nil and an error. It is
// not meant to store anything: if the retrieved data has to be written
// somewhere, call Retrieve from inside a WorkerFunc and write it there.
Retrieve() ([]byte, error)
}
// Retriever retrieves a batch of Retrievable items in parallel. Failures are
// reported through an error channel and progress through a done channel that
// carries the number of bytes written for each item.
type Retriever struct {
// retrieveErrors carries errors reported by workers. New creates it with a
// buffer of one element.
retrieveErrors chan error
// retrieveDone carries the byte count returned by the worker for each item
// that was processed without an error. New creates it unbuffered.
retrieveDone chan int
}
// New returns a ready-to-use Retriever. Its error channel is buffered for a
// single error and its done channel is unbuffered.
func New() *Retriever {
	r := new(Retriever)
	r.retrieveErrors = make(chan error, 1)
	r.retrieveDone = make(chan int)
	return r
}
// GetErrorChan returns the channel on which worker errors are reported.
//
// The channel created by New has room for exactly one buffered error, so only
// a single failure can be pending at a time. An error on this channel is
// therefore considered a "soft panic".
//
// Run receives from this same channel to obtain the error it returns, so a
// caller that also receives from it competes with Run for that error.
func (r *Retriever) GetErrorChan() chan error {
return r.retrieveErrors
}
// GetDoneChan returns the channel that is used to track progress. Each value
// is the number of bytes written for one item, as returned by the worker.
//
// The channel created by New is unbuffered, so a worker's send blocks until
// the value has been received.
func (r *Retriever) GetDoneChan() chan int {
return r.retrieveDone
}
// WorkerFunc processes a single Retrievable item. basePath is the location
// the item should be placed under; Run passes its own basePath argument
// through unchanged. It returns the number of bytes written and a non-nil
// error if something went wrong.
type WorkerFunc func(item Retrievable, basePath string) (int, error)
// Run calls worker once for every item, running at most runtime.NumCPU()*2
// calls concurrently, and passes basePath through to each call unchanged.
//
// As soon as a failure has been reported no further items are started. Run
// then waits for the workers that are still in flight and returns the
// reported error, which wraps the worker's error with the prefix
// "failed to retrieve: ". Only one error is kept per call: failures that
// happen while an error is already pending are discarded. Run returns nil
// when no worker reported an error.
//
// The error travels through the channel returned by GetErrorChan, so a caller
// that receives from that channel while Run is executing may take the error
// Run would otherwise return.
//
// For every item that succeeds, the worker's byte count is sent on the done
// channel. That send happens after the worker has been counted as finished,
// so Run itself never waits for the done channel to be drained; a receiver
// has to stay active until it has one value per successful item.
func (r *Retriever) Run(items []Retrievable, worker WorkerFunc, basePath string) error {
	var workersGroup sync.WaitGroup
	// The limiter's capacity bounds the number of workers running at once.
	retrieveLimiter := make(chan struct{}, runtime.NumCPU()*2)

	for _, item := range items {
		// Stop scheduling new work once any worker has reported a failure.
		if len(r.retrieveErrors) > 0 {
			break
		}
		retrieveLimiter <- struct{}{}
		workersGroup.Add(1)
		go func(item Retrievable) {
			bytesLen, err := worker(item, basePath)
			if err != nil {
				// Report the failure before Done so that it is visible to Run
				// once Wait returns. The send must not block: the channel may
				// already hold an earlier error, and blocking here would keep
				// Wait from ever returning.
				select {
				case r.retrieveErrors <- fmt.Errorf("failed to retrieve: %w", err):
				default:
				}
			}
			workersGroup.Done()
			<-retrieveLimiter
			if err == nil {
				// Progress is published after Done on purpose: Run must not
				// depend on somebody reading the done channel.
				r.retrieveDone <- bytesLen
			}
		}(item)
	}

	workersGroup.Wait()

	// Every worker has finished, so any reported error is in the buffer by
	// now. Receive without blocking in case nothing failed or another
	// receiver already took the error.
	select {
	case err := <-r.retrieveErrors:
		return err
	default:
		return nil
	}
}
// Close closes the done channel first and then the error channel.
//
// NOTE(review): in Go, closing an already closed channel or sending on a
// closed channel panics, so Close presumably has to be called exactly once
// and only after every worker has finished sending — confirm with callers.
func (r *Retriever) Close() {
close(r.retrieveDone)
close(r.retrieveErrors)
}