gliter

package module
v1.1.9
Published: Oct 19, 2025 License: MIT Imports: 9 Imported by: 0

README

Gliter ✨


Composable async & concurrency patterns for Go.
Write pipelines, worker pools, and async utilities without worrying about race conditions, deadlocks, or goroutine leaks.


Quick Start

Install:

go get github.com/arrno/gliter

Import:

import "github.com/arrno/gliter"

Overview

Gliter makes it easy to assemble normal business logic into complex async flows.

Instead of spending time debugging goroutines, channels, or leaks, you define your data flow declaratively and let Gliter handle the concurrency patterns.


Pipeline

Compose stages of functions into a branching async pipeline:

gliter.NewPipeline(streamTransactionsFromKafka).
    Stage(
        preprocessFeatures,
    ).
    Fork(
        runFraudModel,
        checkBusinessRules,
    ).
    Merge(aggregateResults).
    Fork(
        sendToAlertSystem,
        storeInDatabase,
    ).
    Run()

Key properties:

  • Data always flows downstream from the generator through each stage.
  • Side effects (DB writes, API calls, etc.) belong inside stage functions.
  • Errors short-circuit the pipeline automatically.
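
All of the examples below assume a generator like exampleGen. Per the NewPipeline signature, a generator is any func() (T, bool, error). Here is a minimal sketch, assuming the bool acts as an ok flag that turns false when the stream is exhausted:

func exampleGen() func() (int, bool, error) {
    data := []int{1, 2, 3, 4, 5}
    index := 0
    return func() (int, bool, error) {
        if index >= len(data) {
            return 0, false, nil // ok=false: stream exhausted
        }
        value := data[index]
        index++
        return value, true, nil
    }
}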

Stages and options:


Fork

Add multiple handlers in one stage to fork the pipeline (Fork is an alias for Stage):

gliter.NewPipeline(exampleGen()).
    Fork(
        exampleMid, // branch A
        exampleAlt, // branch B
    ).
    Stage(exampleEnd).
    Run()

👉 Each downstream stage is duplicated for each branch.
⚠️ If processing pointers, clone before mutating downstream.

For branching without duplicating streams, use WorkPool Stage, Option Stage, or InParallel.


WorkPool Stage

Fans out a handler function into N concurrent workers.
Each record is processed by exactly one worker (no cloning or duplication), then multiplexed onto the single downstream stream.

Configure behavior with options:

  • WithBuffer(M) → buffered channel capacity between upstream and workers
  • WithRetry(R) → automatic retries on failure

This gives fine-grained control over throughput, backpressure, and fault tolerance.

gliter.NewPipeline(exampleGen()).
    WorkPool(
        func(item int) (int, error) { return 1 + item, nil },
        3, // numWorkers
        gliter.WithBuffer(6),
        gliter.WithRetry(2),
    ).
    WorkPool(
        func(item int) (int, error) { return 2 + item, nil },
        6, // numWorkers
        gliter.WithBuffer(12),
        gliter.WithRetry(2),
    ).
    Run()

MixPool

Use when a worker pool needs distinct handlers but you still want automatic backpressure.
It's a convenience wrapper around WorkPool: pass a slice of handlers and Gliter will fan the stream across them while keeping worker semantics.

gliter.NewPipeline(exampleGen()).
    MixPool(
        []func(int) (int, error){
            func(item int) (int, error) { return item + 1, nil },
            func(item int) (int, error) { return item * 2, nil },
        },
        gliter.WithRetry(1),
    ).
    Run()

Throttle

Control concurrency when downstream stages overwhelm your DB or API:

gliter.NewPipeline(exampleGen()).
    Fork(exampleMid, exampleMid).
    Fork(exampleMid, exampleMid, exampleMid).
    Throttle(2).
    Stage(exampleEnd).
    Run()

Merge

Combine multiple branches into one:

gliter.NewPipeline(exampleGen()).
    Fork(exampleMid, exampleMid).
    Merge(func(items []int) ([]int, error) {
        sum := 0
        for _, item := range items {
            sum += item
        }
        return []int{sum}, nil
    }).
    Stage(exampleEnd).
    Run()

ForkOutIn

Shortcut for "fork, do a little work, then merge" flows.
Under the hood it's just a Fork followed by a Merge, so it's best suited to small aggregations.

gliter.NewPipeline(exampleGen()).
    ForkOutIn(
        []func(int) (int, error){
            func(item int) (int, error) { return item + 1, nil },
            func(item int) (int, error) { return item - 1, nil },
        },
        func(items []int) ([]int, error) { return []int{items[0] - items[1]}, nil },
    ).
    Run()

Stick with the dedicated Fork/Merge stages when you need more complex fan-out trees.


Batch

Batch records for bulk operations:

func exampleBatch(items []int) ([]int, error) {
    if err := storeToDB(items); err != nil {
        return nil, err
    }
    return items, nil
}

gliter.NewPipeline(exampleGen()).
    Stage(exampleMid).
    Batch(100, exampleBatch).
    Stage(exampleEnd).
    Run()

Option

Route each record to exactly one handler (no cloning):

gliter.NewPipeline(exampleGen()).
    Stage(exampleMid).
    Option(
        func(item int) (int, error) { return 1 + item, nil },
        func(item int) (int, error) { return 2 + item, nil },
        func(item int) (int, error) { return 3 + item, nil },
    ).
    Stage(exampleEnd).
    Run()

Buffer

Insert a buffer before a slow stage:

gliter.NewPipeline(exampleGen()).
    Stage(exampleMid).
    Buffer(5).
    Stage(exampleEnd).
    Run()

Context / Cancel

Use the WithContext option for timeout/cancel:

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Second)
defer cancel()

gliter.NewPipeline(
    exampleGen(),
    gliter.WithContext(ctx),
).
    Stage(exampleMid).
    Stage(exampleEnd).
    Run()

Count / Tally

Count items processed, either via config:

counts, err := gliter.NewPipeline(
    exampleGen(),
    gliter.WithReturnCount(),
).Run()

Or with a tally channel:

pipeline := gliter.NewPipeline(exampleGen())
tally := pipeline.Tally()
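
Tally returns a send-only chan<- int, so the counts presumably come from your own code. A hedged sketch, assuming stage functions may capture and write to the channel:

_, err := pipeline.
    Stage(func(item int) (int, error) {
        tally <- 1 // record one processed item
        return item, nil
    }).
    Run()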

Insight

Enable logging for debugging:

  • WithLogCount — summary counts
  • WithLogEmit — every emission
  • WithLogErr — errors
  • WithLogAll — all of the above
  • WithLogStep — interactive stepper

gliter.NewPipeline(
    exampleGen(),
    gliter.WithLogAll(),
).
    Fork(exampleMid, exampleAlt).
    Stage(exampleEnd).
    Run()

InParallel

Fan out tasks, run them concurrently, and collect the results in order:

tasks := []func() (string, error){
    func() (string, error) { return "Hello", nil },
    func() (string, error) { return ", ", nil },
    func() (string, error) { return "Async!", nil },
}

results, err := gliter.InParallel(tasks)

Also available: InParallelThrottle for token-bucket concurrency.
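
Given the documented signature, capping concurrency at two in-flight tasks looks like:

results, err := gliter.InParallelThrottle(2, tasks)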


Worker Pool (standalone)

Generic worker pools in one line:

results, errors := gliter.NewWorkerPool(3, handler).
    Push(0, 1, 2, 3, 4).
    Close().
    Collect()
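
Here handler can be any func(T) (R, error). For instance, a hypothetical doubling handler:

handler := func(item int) (int, error) {
    return item * 2, nil
}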

Supported WorkerPool Options:

  • WithRetry
  • WithBuffer

See ./examples/worker_pool/main.go for more.


Misc Utilities

  • ThrottleBy — custom throttling
  • TeeBy — channel forking
  • ReadOrDone, WriteOrDone, IterDone
  • Any — consolidate “done” channels
  • Multiplex — merge streams
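
A brief sketch combining a couple of these, based on the signatures documented below (channel names are illustrative):

done := make(chan any)
chA, chB := make(chan int), make(chan int)

// Multiplex merges both streams into one read-only channel.
merged := gliter.Multiplex(chA, chB)

// ReadOrDone blocks until a value arrives or 'done' closes.
val, ok := gliter.ReadOrDone(merged, done)
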
List Type (sync helpers)

val := gliter.
    List(0, 1, 2, 3, 4).
    Filter(func(i int) bool { return i%2 == 0 }).
    Map(func(val int) int { return val * 2 }).
    Reduce(func(acc *int, val int) { *acc += val }) // 12

Includes Filter, Map, Reduce, Find, Len, Reverse, At, Slice, etc.


Examples

See the ./examples directory (new, original, worker_pool) for runnable demos.


Contributing

PRs welcome! 🚀
If something feels missing, broken, or unclear, open an issue or submit a fix.

Documentation

Index

Constants

const (
	FORK stageType = iota
	THROTTLE
	BATCH
	BUFFER
	MERGE
	OPTION
	FAN_OUT
)
const MAX_PAD int = 20

Variables

var (
	ErrNoGenerator     error = errors.New("pipeline run error: invalid pipeline: no generator provided")
	ErrEmptyStage      error = errors.New("pipeline run error: empty stage: no functions provided in stage")
	ErrEmptyThrottle   error = errors.New("pipeline run error: empty throttle: throttle must be a positive value")
	ErrInvalidThrottle error = errors.New("pipeline run error: invalid throttle: throttle value cannot be higher than channel count")
	ErrNilFunc         error = errors.New("pipeline run error: invalid stage: Nil function")
	ErrContextCanceled error = errors.New("pipeline run error: context canceled")
	ErrWithRetry       error = errors.New("job run error: Handler returned error, will retry")
	ErrWithoutRetry    error = errors.New("job run error: Handler returned error")
)
var (
	ErrInvalidOperationNotRunning = errors.New("workerPool invalid operation error: workerPool is not running")
	ErrInvalidOperationRunning    = errors.New("workerPool invalid operation error: workerPool is running")
	ErrInvalidOperationHasRun     = errors.New("workerPool invalid operation error: workerPool has already run")
)

Functions

func Any

func Any(chans ...<-chan any) <-chan any

Any returns a channel that closes when any of the input channels closes.

func AnyWithCtx added in v1.1.4

func AnyWithCtx(ctx context.Context, chans ...<-chan any) <-chan any

func ChunkBy added in v0.1.6

func ChunkBy[T any](items []T, by int) [][]T
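
A hedged example, assuming 'by' is the chunk length and the final chunk may be shorter:

chunks := gliter.ChunkBy([]int{1, 2, 3, 4, 5}, 2)
// chunks: [[1 2] [3 4] [5]] (assumed semantics)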

func Flatten added in v0.1.6

func Flatten[T any](chunks [][]T) []T

func InParallel

func InParallel[T any](funcs []func() (T, error)) ([]T, error)

InParallel runs all the functions asynchronously and returns the results in order or the first error.

func InParallelThrottle added in v1.1.1

func InParallelThrottle[T any](throttle int, funcs []func() (T, error)) ([]T, error)

InParallelThrottle runs all the functions asynchronously and returns the results in order or the first error. No more than `throttle` threads will be active at any given time.

func IterDone added in v0.1.4

func IterDone[T any](read <-chan T, done <-chan any) <-chan T

IterDone combines a read channel and a done channel for convenient iteration. Iterate over the returned channel knowing the loop will exit when either read or done is closed.
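
For example (read, done, and process are illustrative):

for v := range gliter.IterDone(read, done) {
    process(v) // loop exits when read or done closes
}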

func Multiplex added in v1.1.1

func Multiplex[T any](inbound ...<-chan T) <-chan T

Multiplex merges any number of read channels into one consolidated read-only stream.

func ReadOrDone

func ReadOrDone[T any](read <-chan T, done <-chan any) (T, bool)

ReadOrDone blocks until it receives a value from 'read' or a signal from 'done'; the boolean reports whether a value was read.

func TeeBy

func TeeBy[T any](in <-chan T, done <-chan interface{}, n int) (outR []<-chan T)

TeeBy broadcasts every value received on the 'in' channel to all n output channels. This function returns when the 'in' channel is closed or a signal is received on 'done'.

func ThrottleBy

func ThrottleBy[T any](in []chan T, done <-chan interface{}, n int) (out []chan T)

ThrottleBy merges the output of the provided channels into n output channels. This function returns when the 'in' channels are closed or a signal is received on 'done'.

func WriteOrDone

func WriteOrDone[T any](val T, write chan<- T, done <-chan any) bool

WriteOrDone blocks until it sends to 'write' or receives from 'done'; the boolean reports whether the send succeeded.
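
For example (out and done are illustrative):

if ok := gliter.WriteOrDone(42, out, done); !ok {
    // done closed before the value could be sent
}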

Types

type Iter

type Iter[T any] struct {
	// contains filtered or unexported fields
}

func List

func List[T any](vals ...T) *Iter[T]

func MakeList

func MakeList[T any](length uint, capacity uint) *Iter[T]

func Map

func Map[T, R any](list Iter[T], f func(T) R) *Iter[R]

Map converts and transforms Iter[T] to Iter[R].

func NewList

func NewList[T any]() *Iter[T]

func SliceToList

func SliceToList[T any](s []T) *Iter[T]

func (Iter[T]) At

func (l Iter[T]) At(index int) T

func (Iter[T]) Delete

func (l Iter[T]) Delete(index int) Iter[T]

func (*Iter[T]) FPop

func (l *Iter[T]) FPop() T

FPop removes and returns the element at the front of the list.

func (Iter[T]) Filter

func (l Iter[T]) Filter(f func(val T) bool) *Iter[T]

func (Iter[T]) Find

func (l Iter[T]) Find(f func(val T) bool) (T, bool)

func (Iter[T]) Insert

func (l Iter[T]) Insert(index int, val T) Iter[T]

func (*Iter[T]) Iter

func (l *Iter[T]) Iter() []T

func (Iter[T]) Len

func (l Iter[T]) Len() int

func (Iter[T]) Map

func (l Iter[T]) Map(f func(val T) T) Iter[T]

Map transforms the values of Iter without changing the type. To change the type, see the standalone 'Map' function.

func (*Iter[T]) Pop

func (l *Iter[T]) Pop() T

func (*Iter[T]) Push

func (l *Iter[T]) Push(val T)

func (Iter[T]) Reduce

func (l Iter[T]) Reduce(f func(acc *T, val T)) *T

func (Iter[T]) Reverse

func (l Iter[T]) Reverse() Iter[T]

func (Iter[T]) Slice

func (l Iter[T]) Slice(start, stop int) Iter[T]

func (Iter[T]) Unwrap

func (l Iter[T]) Unwrap() []T

type Option added in v1.1.4

type Option func(*WorkerConfig)

func WithBuffer added in v1.1.4

func WithBuffer(buffer int) Option

func WithRetry added in v1.1.4

func WithRetry(attempts int) Option

func WithSize added in v1.1.4

func WithSize(size int) Option

type PLConfig

type PLConfig struct {
	LogAll      bool
	LogEmit     bool
	LogCount    bool
	ReturnCount bool
	LogStep     bool
	LogErr      bool
	// contains filtered or unexported fields
}

PLConfig controls optional pipeline behavior such as logging and count reporting.

type PLNode

type PLNode[T any] struct {
	// contains filtered or unexported fields
}

PLNode is a representation of a pipeline node for logging/insight-tracking.

func NewPLNode

func NewPLNode[T any]() *PLNode[T]

func NewPLNodeAs

func NewPLNodeAs[T any](id string, val T) *PLNode[T]

func (*PLNode[T]) CollectCount

func (n *PLNode[T]) CollectCount() []PLNodeCount

func (*PLNode[T]) Count

func (n *PLNode[T]) Count() PLNodeCount

func (*PLNode[T]) Inc

func (n *PLNode[T]) Inc()

func (*PLNode[T]) IncAs

func (n *PLNode[T]) IncAs(val T)

func (*PLNode[T]) IncAsBatch added in v0.1.3

func (n *PLNode[T]) IncAsBatch(val []T)

func (*PLNode[T]) Print

func (n *PLNode[T]) Print()

func (*PLNode[T]) PrintFullBF

func (n *PLNode[T]) PrintFullBF()

PrintFullBF will print the full PLNode tree breadth-first to stdout.

func (*PLNode[T]) Set

func (n *PLNode[T]) Set(val T)

func (*PLNode[T]) Spawn

func (n *PLNode[T]) Spawn() *PLNode[T]

func (*PLNode[T]) SpawnAs

func (n *PLNode[T]) SpawnAs(child *PLNode[T])

func (*PLNode[T]) State

func (n *PLNode[T]) State() (string, int, T)

func (*PLNode[T]) StateArr

func (n *PLNode[T]) StateArr() []string

type PLNodeCount

type PLNodeCount struct {
	NodeID string
	Count  int
}

type PLOption added in v1.1.4

type PLOption func(*PLConfig)

func WithContext added in v1.1.4

func WithContext(ctx context.Context) PLOption

func WithLogAll added in v1.1.4

func WithLogAll() PLOption

func WithLogCount added in v1.1.4

func WithLogCount() PLOption

func WithLogEmit added in v1.1.4

func WithLogEmit() PLOption

func WithLogErr added in v1.1.5

func WithLogErr() PLOption

func WithLogStep added in v1.1.4

func WithLogStep() PLOption

func WithReturnCount added in v1.1.4

func WithReturnCount() PLOption

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

Pipeline spawns threads for all stage functions and orchestrates channel signals between them.

func NewPipeline

func NewPipeline[T any](gen func() (T, bool, error), opts ...PLOption) *Pipeline[T]

func (*Pipeline[T]) Batch added in v0.1.3

func (p *Pipeline[T]) Batch(n int, f func(set []T) ([]T, error)) *Pipeline[T]

Batch pushes a special batch stage onto the pipeline of size n. A batch allows you to operate on a set of items. This is helpful for expensive operations such as DB writes.

func (*Pipeline[T]) Buffer added in v0.1.4

func (p *Pipeline[T]) Buffer(n uint) *Pipeline[T]

Buffer pushes a special buffer stage onto the pipeline of size n. A buffer stage is effectively a buffered channel of size n in between the previous and next stage.

func (*Pipeline[T]) Config

func (p *Pipeline[T]) Config(config PLConfig) *Pipeline[T]

func (*Pipeline[T]) Fork added in v1.1.9

func (p *Pipeline[T]) Fork(fs ...func(data T) (T, error)) *Pipeline[T]

Fork is a more explicit alias for Stage.

func (*Pipeline[T]) ForkOutIn added in v1.1.7

func (p *Pipeline[T]) ForkOutIn(funcs []func(data T) (T, error), merge func([]T) ([]T, error), opts ...Option) *Pipeline[T]

ForkOutIn is a Fork and a Merge stage combined.

func (*Pipeline[T]) Merge added in v1.1.0

func (p *Pipeline[T]) Merge(f func(data []T) ([]T, error)) *Pipeline[T]

Merge pushes a special merge stage onto the pipeline. A merge stage will merge all upstream branches into a single downstream branch with a merge function. Output is converted back into a stream.

func (*Pipeline[T]) MixPool added in v1.1.6

func (p *Pipeline[T]) MixPool(funcs []func(data T) (T, error), opts ...Option) *Pipeline[T]

MixPool is a WorkerPool with heterogeneous processing. The Size option is a no-op for MixPool.

func (*Pipeline[T]) Option added in v1.1.0

func (p *Pipeline[T]) Option(fs ...func(data T) (T, error)) *Pipeline[T]

Option pushes a special option stage onto the pipeline. An option stage is effectively a buffer of size N where N is the number of handler functions. A record emitted from an upstream branch has an equal chance of entering any one of the option functions which are ready to receive.

func (*Pipeline[T]) Run

func (p *Pipeline[T]) Run() ([]PLNodeCount, error)

Run builds and launches all the pipeline stages.

func (*Pipeline[T]) Stage

func (p *Pipeline[T]) Stage(fs ...func(data T) (T, error)) *Pipeline[T]

Stage pushes a new stage onto the pipeline. A stage should have > 0 transform functions. Each transform function beyond the first forks the pipeline into an additional downstream branch.

func (*Pipeline[T]) Tally

func (p *Pipeline[T]) Tally() chan<- int

func (*Pipeline[T]) Throttle

func (p *Pipeline[T]) Throttle(n uint) *Pipeline[T]

Throttle pushes a special throttle stage onto the pipeline. A throttle stage will converge all upstream branches into n downstream branches.

func (*Pipeline[T]) WorkPool added in v1.1.9

func (p *Pipeline[T]) WorkPool(f func(data T) (T, error), numWorkers uint, opts ...Option) *Pipeline[T]

WorkPool is a wrapper around an Option stage that allows for more control. N workers are spawned to wrap the given handler with B buffer and R retries where N is numWorkers, B is Option->Buffer, and R is Option->Retry.

func (*Pipeline[T]) WorkerPool added in v1.1.4

func (p *Pipeline[T]) WorkerPool(f func(data T) (T, error), opts ...Option) *Pipeline[T]

WorkerPool is a wrapper around an Option stage that allows for more control. N workers are spawned to wrap the given handler with B buffer and R retries where N is Option->Size, B is Option->Buffer, and R is Option->Retry.

type Stepper

type Stepper[T any] struct {
	// contains filtered or unexported fields
}

Stepper provides a central point to block all threads and await user intervention.

func NewStepper

func NewStepper[T any](root *PLNode[T]) *Stepper[T]

func (*Stepper[T]) Run

func (s *Stepper[T]) Run() (chan<- any, <-chan any)

Run launches a stepper thread and provides a signal channel for external threads to write to. When this function exits, the returned 'done' channel is closed.

type TokenBucket added in v1.1.1

type TokenBucket struct {
	// contains filtered or unexported fields
}

func NewTokenBucket added in v1.1.1

func NewTokenBucket(size int) *TokenBucket

func (*TokenBucket) Push added in v1.1.1

func (t *TokenBucket) Push()

func (*TokenBucket) Take added in v1.1.1

func (t *TokenBucket) Take()

type WorkerConfig added in v1.1.4

type WorkerConfig struct {
	// contains filtered or unexported fields
}

type WorkerPool added in v1.1.2

type WorkerPool[T, R any] struct {
	// contains filtered or unexported fields
}

func NewWorkerPool added in v1.1.2

func NewWorkerPool[T, R any](size int, handler func(T) (R, error), opts ...Option) *WorkerPool[T, R]

func (*WorkerPool[T, R]) Boot added in v1.1.2

func (b *WorkerPool[T, R]) Boot() *WorkerPool[T, R]

Boot spawns workers.

func (*WorkerPool[T, R]) Close added in v1.1.2

func (b *WorkerPool[T, R]) Close() *WorkerPool[T, R]

Close shuts down workers and waits for exit.

func (*WorkerPool[T, R]) Collect added in v1.1.3

func (b *WorkerPool[T, R]) Collect() (res []R, errs []error)

Collect clears and returns results and errors.

func (*WorkerPool[T, R]) IsErr added in v1.1.2

func (b *WorkerPool[T, R]) IsErr() bool

func (*WorkerPool[T, R]) Push added in v1.1.2

func (b *WorkerPool[T, R]) Push(items ...T) *WorkerPool[T, R]

func (*WorkerPool[T, R]) TakeErrors added in v1.1.2

func (b *WorkerPool[T, R]) TakeErrors() (errs []error)

TakeErrors clears and returns internal errors.

func (*WorkerPool[T, R]) TakeResults added in v1.1.2

func (b *WorkerPool[T, R]) TakeResults() (res []R)

TakeResults clears and returns internal results.

Directories

Path                 Synopsis
examples
    new              command
    original         command
    worker_pool      command
