Documentation ¶
Index ¶
- Constants
- Variables
- func Any(chans ...<-chan any) <-chan any
- func AnyWithCtx(ctx context.Context, chans ...<-chan any) <-chan any
- func ChunkBy[T any](items []T, by int) [][]T
- func Flatten[T any](chunks [][]T) []T
- func InParallel[T any](funcs []func() (T, error)) ([]T, error)
- func InParallelThrottle[T any](throttle int, funcs []func() (T, error)) ([]T, error)
- func IterDone[T any](read <-chan T, done <-chan any) <-chan T
- func Multiplex[T any](inbound ...<-chan T) <-chan T
- func ReadOrDone[T any](read <-chan T, done <-chan any) (T, bool)
- func TeeBy[T any](in <-chan T, done <-chan interface{}, n int) (outR []<-chan T)
- func ThrottleBy[T any](in []chan T, done <-chan interface{}, n int) (out []chan T)
- func WriteOrDone[T any](val T, write chan<- T, done <-chan any) bool
- type Iter
- func (l Iter[T]) At(index int) T
- func (l Iter[T]) Delete(index int) Iter[T]
- func (l *Iter[T]) FPop() T
- func (l Iter[T]) Filter(f func(val T) bool) *Iter[T]
- func (l Iter[T]) Find(f func(val T) bool) (T, bool)
- func (l Iter[T]) Insert(index int, val T) Iter[T]
- func (l *Iter[T]) Iter() []T
- func (l Iter[T]) Len() int
- func (l Iter[T]) Map(f func(val T) T) Iter[T]
- func (l *Iter[T]) Pop() T
- func (l *Iter[T]) Push(val T)
- func (l Iter[T]) Reduce(f func(acc *T, val T)) *T
- func (l Iter[T]) Reverse() Iter[T]
- func (l Iter[T]) Slice(start, stop int) Iter[T]
- func (l Iter[T]) Unwrap() []T
- type Option
- type PLConfig
- type PLNode
- func (n *PLNode[T]) CollectCount() []PLNodeCount
- func (n *PLNode[T]) Count() PLNodeCount
- func (n *PLNode[T]) Inc()
- func (n *PLNode[T]) IncAs(val T)
- func (n *PLNode[T]) IncAsBatch(val []T)
- func (n *PLNode[T]) Print()
- func (n *PLNode[T]) PrintFullBF()
- func (n *PLNode[T]) Set(val T)
- func (n *PLNode[T]) Spawn() *PLNode[T]
- func (n *PLNode[T]) SpawnAs(child *PLNode[T])
- func (n *PLNode[T]) State() (string, int, T)
- func (n *PLNode[T]) StateArr() []string
- type PLNodeCount
- type PLOption
- type Pipeline
- func (p *Pipeline[T]) Batch(n int, f func(set []T) ([]T, error)) *Pipeline[T]
- func (p *Pipeline[T]) Buffer(n uint) *Pipeline[T]
- func (p *Pipeline[T]) Config(config PLConfig) *Pipeline[T]
- func (p *Pipeline[T]) Fork(fs ...func(data T) (T, error)) *Pipeline[T]
- func (p *Pipeline[T]) ForkOutIn(funcs []func(data T) (T, error), merge func([]T) ([]T, error), opts ...Option) *Pipeline[T]
- func (p *Pipeline[T]) Merge(f func(data []T) ([]T, error)) *Pipeline[T]
- func (p *Pipeline[T]) MixPool(funcs []func(data T) (T, error), opts ...Option) *Pipeline[T]
- func (p *Pipeline[T]) Option(fs ...func(data T) (T, error)) *Pipeline[T]
- func (p *Pipeline[T]) Run() ([]PLNodeCount, error)
- func (p *Pipeline[T]) Stage(fs ...func(data T) (T, error)) *Pipeline[T]
- func (p *Pipeline[T]) Tally() chan<- int
- func (p *Pipeline[T]) Throttle(n uint) *Pipeline[T]
- func (p *Pipeline[T]) WorkPool(f func(data T) (T, error), numWorkers uint, opts ...Option) *Pipeline[T]
- func (p *Pipeline[T]) WorkerPool(f func(data T) (T, error), opts ...Option) *Pipeline[T]
- type Stepper
- type TokenBucket
- type WorkerConfig
- type WorkerPool
- func (b *WorkerPool[T, R]) Boot() *WorkerPool[T, R]
- func (b *WorkerPool[T, R]) Close() *WorkerPool[T, R]
- func (b *WorkerPool[T, R]) Collect() (res []R, errs []error)
- func (b *WorkerPool[T, R]) IsErr() bool
- func (b *WorkerPool[T, R]) Push(items ...T) *WorkerPool[T, R]
- func (b *WorkerPool[T, R]) TakeErrors() (errs []error)
- func (b *WorkerPool[T, R]) TakeResults() (res []R)
Constants ¶
const (
	FORK stageType = iota
	THROTTLE
	BATCH
	BUFFER
	MERGE
	OPTION
	FAN_OUT
)
const MAX_PAD int = 20
Variables ¶
var (
	ErrNoGenerator     error = errors.New("pipeline run error: invalid pipeline: no generator provided")
	ErrEmptyStage      error = errors.New("pipeline run error: empty stage: no functions provided in stage")
	ErrEmptyThrottle   error = errors.New("pipeline run error: empty throttle: throttle must be a positive value")
	ErrInvalidThrottle error = errors.New("pipeline run error: invalid throttle: throttle value cannot be higher than channel count")
	ErrNilFunc         error = errors.New("pipeline run error: invalid stage: Nil function")
	ErrContextCanceled error = errors.New("pipeline run error: context canceled")
	ErrWithRetry       error = errors.New("job run error: Handler returned error, will retry")
	ErrWithoutRetry    error = errors.New("job run error: Handler returned error")
)
var (
	ErrInvalidOperationNotRunning = errors.New("workerPool invalid operation error: workerPool is not running")
	ErrInvalidOperationRunning    = errors.New("workerPool invalid operation error: workerPool is running")
	ErrInvalidOperationHasRun     = errors.New("workerPool invalid operation error: workerPool has already run")
)
Functions ¶
func Any ¶
func Any(chans ...<-chan any) <-chan any
func AnyWithCtx ¶ added in v1.1.4
func AnyWithCtx(ctx context.Context, chans ...<-chan any) <-chan any
func InParallel ¶
func InParallel[T any](funcs []func() (T, error)) ([]T, error)
InParallel runs all the functions concurrently and returns their results in order, or the first error encountered.
func InParallelThrottle ¶ added in v1.1.1
func InParallelThrottle[T any](throttle int, funcs []func() (T, error)) ([]T, error)
InParallelThrottle runs all the functions concurrently and returns their results in order, or the first error encountered. No more than `throttle` goroutines will be active at any given time.
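A minimal sketch of both calls; the jobs here are stand-ins for real work such as HTTP or DB lookups:

jobs := []func() (string, error){
	func() (string, error) { return "alpha", nil },
	func() (string, error) { return "beta", nil },
	func() (string, error) { return "gamma", nil },
}

// Results are returned in input order regardless of completion order.
res, err := InParallel(jobs)
if err != nil {
	log.Fatal(err)
}
fmt.Println(res) // [alpha beta gamma]

// Same semantics, but at most two jobs run concurrently.
res, err = InParallelThrottle(2, jobs)
fmt.Println(res, err)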
func IterDone ¶ added in v0.1.4
func IterDone[T any](read <-chan T, done <-chan any) <-chan T
IterDone combines a read channel and a done channel for convenient iteration. Range over the returned channel knowing the loop will exit when either 'read' or 'done' is closed.
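For example (the producer goroutine is illustrative):

done := make(chan any)
defer close(done)

src := make(chan int)
go func() {
	defer close(src)
	for i := 0; i < 3; i++ {
		src <- i
	}
}()

var read <-chan int = src // IterDone takes a receive-only channel
for v := range IterDone(read, done) {
	fmt.Println(v) // loop ends when src closes, or when done closes first
}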
func Multiplex ¶ added in v1.1.1
func Multiplex[T any](inbound ...<-chan T) <-chan T
Multiplex merges any number of read channels into one consolidated read-only stream.
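A sketch with two short-lived producers, assuming the consolidated stream closes once every inbound channel has closed:

produce := func(v int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		ch <- v
	}()
	return ch
}

// Values from both producers arrive interleaved on one stream.
for v := range Multiplex(produce(1), produce(2)) {
	fmt.Println(v)
}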
func ReadOrDone ¶
func ReadOrDone[T any](read <-chan T, done <-chan any) (T, bool)
ReadOrDone blocks until it receives from 'read' or from 'done'. It returns the value read and a boolean that is true only if the read succeeded.
func TeeBy ¶
func TeeBy[T any](in <-chan T, done <-chan interface{}, n int) (outR []<-chan T)
TeeBy broadcasts every value received on the 'in' channel to all n output channels. This function returns when the 'in' channel is closed or a signal is received on 'done'.
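A sketch fanning one value out to three readers; note that every output must be consumed for the tee to make progress:

done := make(chan interface{})
defer close(done)

in := make(chan string)
go func() {
	defer close(in)
	in <- "event"
}()

var wg sync.WaitGroup
for i, out := range TeeBy[string](in, done, 3) {
	wg.Add(1)
	go func(i int, out <-chan string) {
		defer wg.Done()
		for msg := range out {
			fmt.Printf("reader %d got %q\n", i, msg)
		}
	}(i, out)
}
wg.Wait()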
func ThrottleBy ¶
func ThrottleBy[T any](in []chan T, done <-chan interface{}, n int) (out []chan T)
ThrottleBy merges the output of the provided channels into n output channels. This function returns when the 'in' channels are closed or a signal is received on 'done'.
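A sketch funneling four producers down to two consumers, assuming the output channels close once all inputs are drained:

done := make(chan interface{})
defer close(done)

in := make([]chan int, 4)
for i := range in {
	in[i] = make(chan int)
	go func(i int) {
		in[i] <- i
		close(in[i])
	}(i)
}

var wg sync.WaitGroup
for _, out := range ThrottleBy(in, done, 2) {
	wg.Add(1)
	go func(out chan int) {
		defer wg.Done()
		for v := range out {
			fmt.Println(v)
		}
	}(out)
}
wg.Wait()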
func WriteOrDone ¶
func WriteOrDone[T any](val T, write chan<- T, done <-chan any) bool
WriteOrDone blocks until it sends 'val' to 'write' or receives from 'done'. It returns true only if the write succeeded.
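For example:

done := make(chan any)
ch := make(chan int, 1)

// Directional views of ch, matching the parameter types.
var w chan<- int = ch
var r <-chan int = ch

if WriteOrDone(42, w, done) {
	fmt.Println("sent")
}
if v, ok := ReadOrDone(r, done); ok {
	fmt.Println("received", v)
}
close(done) // unblocks any call still waiting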
Types ¶
type Iter ¶
type Iter[T any] struct {
	// contains filtered or unexported fields
}
func SliceToList ¶
type PLConfig ¶
type PLConfig struct {
	LogAll      bool
	LogEmit     bool
	LogCount    bool
	ReturnCount bool
	LogStep     bool
	LogErr      bool
	// contains filtered or unexported fields
}
PLConfig controls pipeline logging and count-reporting behavior.
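The field meanings below are inferred from their names and from Run's return type; verify against the source. Applied to a pipeline p built elsewhere:

cfg := PLConfig{
	LogCount:    true, // log per-node emit counts (name-inferred)
	ReturnCount: true, // have Run return its []PLNodeCount
}
p = p.Config(cfg)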
type PLNode ¶
type PLNode[T any] struct {
	// contains filtered or unexported fields
}
PLNode is a representation of a pipeline node for logging/insight-tracking.
func NewPLNodeAs ¶
func (*PLNode[T]) CollectCount ¶
func (n *PLNode[T]) CollectCount() []PLNodeCount
func (*PLNode[T]) Count ¶
func (n *PLNode[T]) Count() PLNodeCount
func (*PLNode[T]) IncAsBatch ¶ added in v0.1.3
func (n *PLNode[T]) IncAsBatch(val []T)
func (*PLNode[T]) PrintFullBF ¶
func (n *PLNode[T]) PrintFullBF()
PrintFullBF will print the full PLNode tree breadth-first to stdout.
type PLNodeCount ¶
type PLOption ¶ added in v1.1.4
type PLOption func(*PLConfig)
func WithContext ¶ added in v1.1.4
func WithLogAll ¶ added in v1.1.4
func WithLogAll() PLOption
func WithLogCount ¶ added in v1.1.4
func WithLogCount() PLOption
func WithLogEmit ¶ added in v1.1.4
func WithLogEmit() PLOption
func WithLogErr ¶ added in v1.1.5
func WithLogErr() PLOption
func WithLogStep ¶ added in v1.1.4
func WithLogStep() PLOption
func WithReturnCount ¶ added in v1.1.4
func WithReturnCount() PLOption
type Pipeline ¶
type Pipeline[T any] struct {
	// contains filtered or unexported fields
}
Pipeline spawns a goroutine for each stage function and orchestrates the channel signals between them.
func NewPipeline ¶
func (*Pipeline[T]) Batch ¶ added in v0.1.3
func (p *Pipeline[T]) Batch(n int, f func(set []T) ([]T, error)) *Pipeline[T]
Batch pushes a special batch stage of size n onto the pipeline. A batch stage lets you operate on a set of up to n items at once, which is helpful for expensive operations such as DB writes.
func (*Pipeline[T]) Buffer ¶ added in v0.1.4
func (p *Pipeline[T]) Buffer(n uint) *Pipeline[T]
Buffer pushes a special buffer stage of size n onto the pipeline. A buffer stage is effectively a buffered channel of size n between the previous and next stages.
func (*Pipeline[T]) ForkOutIn ¶ added in v1.1.7
func (p *Pipeline[T]) ForkOutIn(funcs []func(data T) (T, error), merge func([]T) ([]T, error), opts ...Option) *Pipeline[T]
ForkOutIn combines a Fork stage and a Merge stage.
func (*Pipeline[T]) Merge ¶ added in v1.1.0
func (p *Pipeline[T]) Merge(f func(data []T) ([]T, error)) *Pipeline[T]
Merge pushes a special merge stage onto the pipeline. A merge stage merges all upstream branches into a single downstream branch using the merge function; its output is converted back into a stream.
func (*Pipeline[T]) MixPool ¶ added in v1.1.6
func (p *Pipeline[T]) MixPool(funcs []func(data T) (T, error), opts ...Option) *Pipeline[T]
MixPool is a WorkerPool with heterogeneous processing. The Size option is a no-op for MixPool.
func (*Pipeline[T]) Option ¶ added in v1.1.0
func (p *Pipeline[T]) Option(fs ...func(data T) (T, error)) *Pipeline[T]
Option pushes a special option stage onto the pipeline. An option stage is effectively a buffer of size N, where N is the number of handler functions. A record emitted from an upstream branch has an equal chance of entering any of the option functions that are ready to receive.
func (*Pipeline[T]) Run ¶
func (p *Pipeline[T]) Run() ([]PLNodeCount, error)
Run builds and launches all the pipeline stages.
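A sketch of the builder flow. NewPipeline's argument is assumed here to be a generator feeding values into the pipeline (the ErrNoGenerator variable implies one is required); check the constructor's documentation for its actual signature.

// gen is a hypothetical generator; NewPipeline's real parameters may differ.
p := NewPipeline(gen).
	Stage(
		func(n int) (int, error) { return n * 2, nil }, // branch 1
		func(n int) (int, error) { return n + 1, nil }, // branch 2: forks the pipeline
	).
	Throttle(1). // converge the two branches back into one
	Batch(10, func(set []int) ([]int, error) {
		return set, nil // e.g. one DB write per batch of 10
	})

counts, err := p.Run()
if err != nil {
	log.Fatal(err)
}
fmt.Println(counts) // per-node counts, as []PLNodeCount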
func (*Pipeline[T]) Stage ¶
func (p *Pipeline[T]) Stage(fs ...func(data T) (T, error)) *Pipeline[T]
Stage pushes a new stage onto the pipeline. A stage requires at least one transform function; each transform function beyond the first forks the pipeline into an additional downstream branch.
func (*Pipeline[T]) Throttle ¶
func (p *Pipeline[T]) Throttle(n uint) *Pipeline[T]
Throttle pushes a special throttle stage onto the pipeline. A throttle stage converges all upstream branches into n downstream branches.
func (*Pipeline[T]) WorkPool ¶ added in v1.1.9
func (p *Pipeline[T]) WorkPool(f func(data T) (T, error), numWorkers uint, opts ...Option) *Pipeline[T]
WorkPool is a wrapper around an Option stage that allows for more control. N workers are spawned to wrap the given handler with B buffer and R retries, where N is numWorkers, B is Option->Buffer, and R is Option->Retry.
func (*Pipeline[T]) WorkerPool ¶ added in v1.1.4
func (p *Pipeline[T]) WorkerPool(f func(data T) (T, error), opts ...Option) *Pipeline[T]
WorkerPool is a wrapper around an Option stage that allows for more control. N workers are spawned to wrap the given handler with B buffer and R retries, where N is Option->Size, B is Option->Buffer, and R is Option->Retry.
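Continuing the builder sketch under Run, a work-pool stage with an explicit worker count (the handler is illustrative):

// Eight workers run the handler concurrently; buffering and retry behavior
// come from the Option arguments (constructors not shown in this index).
p = p.WorkPool(func(n int) (int, error) {
	return n * n, nil
}, 8)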
type Stepper ¶
type Stepper[T any] struct {
	// contains filtered or unexported fields
}
Stepper provides a central point at which to block all goroutines and await user intervention.
func NewStepper ¶
type TokenBucket ¶ added in v1.1.1
type TokenBucket struct {
	// contains filtered or unexported fields
}
func NewTokenBucket ¶ added in v1.1.1
func NewTokenBucket(size int) *TokenBucket
func (*TokenBucket) Push ¶ added in v1.1.1
func (t *TokenBucket) Push()
func (*TokenBucket) Take ¶ added in v1.1.1
func (t *TokenBucket) Take()
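The semantics below are assumed from the method names (Take blocks until a token is available, Push returns one, and a new bucket starts full); verify against the source:

bucket := NewTokenBucket(3) // at most 3 concurrent holders, if the bucket starts full

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		bucket.Take()       // block until a token is free
		defer bucket.Push() // return the token when done
		fmt.Println("working:", i)
	}(i)
}
wg.Wait()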
type WorkerConfig ¶ added in v1.1.4
type WorkerConfig struct {
	// contains filtered or unexported fields
}
type WorkerPool ¶ added in v1.1.2
type WorkerPool[T, R any] struct {
	// contains filtered or unexported fields
}
func NewWorkerPool ¶ added in v1.1.2
func NewWorkerPool[T, R any](size int, handler func(T) (R, error), opts ...Option) *WorkerPool[T, R]
func (*WorkerPool[T, R]) Boot ¶ added in v1.1.2
func (b *WorkerPool[T, R]) Boot() *WorkerPool[T, R]
Boot spawns workers.
func (*WorkerPool[T, R]) Close ¶ added in v1.1.2
func (b *WorkerPool[T, R]) Close() *WorkerPool[T, R]
Close shuts down workers and waits for exit.
func (*WorkerPool[T, R]) Collect ¶ added in v1.1.3
func (b *WorkerPool[T, R]) Collect() (res []R, errs []error)
Collect clears and returns results and errors.
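A lifecycle sketch; the call order is an assumption consistent with the ErrInvalidOperation* variables above:

pool := NewWorkerPool(4, func(n int) (string, error) {
	return fmt.Sprintf("job-%d", n), nil
})

pool.Boot()           // spawn the 4 workers
pool.Push(1, 2, 3, 4) // queue work; Push returns the pool, so calls chain
pool.Close()          // stop the workers and wait for them to exit

res, errs := pool.Collect() // drain accumulated results and errors
fmt.Println(res, errs)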
func (*WorkerPool[T, R]) IsErr ¶ added in v1.1.2
func (b *WorkerPool[T, R]) IsErr() bool
func (*WorkerPool[T, R]) Push ¶ added in v1.1.2
func (b *WorkerPool[T, R]) Push(items ...T) *WorkerPool[T, R]
func (*WorkerPool[T, R]) TakeErrors ¶ added in v1.1.2
func (b *WorkerPool[T, R]) TakeErrors() (errs []error)
TakeErrors clears and returns internal errors.
func (*WorkerPool[T, R]) TakeResults ¶ added in v1.1.2
func (b *WorkerPool[T, R]) TakeResults() (res []R)
TakeResults clears and returns internal results.