Documentation
Overview ¶
Package flow provides primitives for building flow-based processing pipelines. It implements a pull-based streaming model that enables efficient data processing through composable components.
Core Concepts ¶
A Stream represents a sequence of values that can be iterated over using a pull-based model. The consumer supplies a yield function that the stream calls once per value, and the stream keeps producing values until either the source is exhausted or yield returns false because the consumer wants no more values.
A Source is the fundamental building block for creating data pipelines. It provides a single method, Stream, that returns a Stream of values. Sources can be created from various inputs, including slices, channels, iterables, and time-based events.
A Flow represents a factory function that creates a Source. Flows provide a way to create reusable pipeline components that can be composed together. They support operations like merging multiple sources and cloning streams.
Key Features ¶
- Context-aware processing with cancellation support
- Built-in error handling and propagation
- Type-safe data processing
- Composable pipeline components
- Support for both synchronous and asynchronous processing
Common Source Constructors ¶
- NewFromItems: Creates a Flow from a variadic list of items
- NewFromChannel: Creates a Flow from a channel
- NewFromIterable: Creates a Flow from an Iterable source
- NewFromTicker: Creates a Flow that emits timestamps at specified intervals
- NewFromRange: Creates a Flow that emits a sequence of integers
- Merge: Combines multiple sources into a single Flow
Example Usage ¶
Creating a simple stream:
stream := func(yield func(any) bool) {
	for i := 0; i < 5; i++ {
		if !yield(i) {
			return // Stop: the consumer wants no more values
		}
	}
}
Creating and using a Flow:
// Create a Flow that emits numbers 1 through 5
flow := NewFromSourceFunc(func(ctx context.Context) Stream {
	return func(yield func(any) bool) {
		for i := 1; i <= 5; i++ {
			if ctx.Err() != nil {
				return
			}
			if !yield(i) {
				return
			}
		}
	}
})
// Use the flow
ctx := context.Background()
for item := range flow.Stream(ctx) {
	fmt.Println(item)
}
Error Handling ¶
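The package includes built-in error handling through the context. Errors recorded with the SetError function are propagated through the pipeline. A typical use inside a stage's processing loop:
// inside a stage's processing loop
if err != nil {
	SetError(ctx, err) // record the error for the pipeline
	continue           // move on to the next item
}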
Thread Safety ¶
All Flow operations are designed to be thread-safe when used with proper context cancellation. Multiple goroutines can safely consume from the same Flow using Clone() to create independent copies of the stream.
Index ¶
- func Collect[T any](ctx context.Context, src Source) ([]T, error)
- func SetError(ctx context.Context, err error)
- type ChannelSink
- type FanOutSink
- type Flow
- func Merge(sources ...Source) Flow
- func NewFromChannel[T any](ch <-chan T) Flow
- func NewFromItems[T any](items ...T) Flow
- func NewFromIterable(it Iterable) Flow
- func NewFromRange(start, end, step int) Flow
- func NewFromSource(s Source) Flow
- func NewFromSourceFunc(fn func(context.Context) Stream) Flow
- func NewFromTicker(interval time.Duration) Flow
- type FlowError
- type Iterable
- type KeepDistinctOptions
- type Sinker
- type SinkerFunc
- type SliceSink
- type SlidingWindowOptions
- type Source
- type SourceFunc
- type Stream
- type Transform
- func Chunk[T any](n int) Transform
- func Filter[T any](fn func(context.Context, T) bool) Transform
- func FlatMap[T any, U any](fn func(context.Context, T) ([]U, error)) Transform
- func Flatten[T []U, U any]() Transform
- func Join(pipes ...Transform) Transform
- func Keep(n int) Transform
- func KeepDistinct[T any](opts ...func(*KeepDistinctOptions[T])) Transform
- func KeepFirst() Transform
- func KeepIf[T any](fn func(context.Context, T) bool) Transform
- func KeepLast() Transform
- func Limit(limit rate.Limit, burst int) Transform
- func Map[T any, U any](fn func(context.Context, T) (U, error)) Transform
- func OmitIf[T any](fn func(context.Context, T) bool) Transform
- func ParallelMap[T any, U any](n int, fn func(context.Context, T) (U, error)) Transform
- func Pass() Transform
- func Reduce[T any](fn func(ctx context.Context, acc T, item T) (T, error)) Transform
- func Skip(n int) Transform
- func SlidingWindow[T any](opts ...func(*SlidingWindowOptions)) Transform
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ChannelSink ¶
type ChannelSink[T any] chan<- T
ChannelSink is a Sinker that sends items from a Stream to a typed channel. It provides type-safe forwarding of items while respecting context cancellation.
func NewChannelSink ¶
func NewChannelSink[T any](ch chan<- T) ChannelSink[T]
NewChannelSink creates a new ChannelSink from the provided channel.
type FanOutSink ¶
type FanOutSink[T any] struct {
	// Flows maps keys to flow transformation functions. Each key represents a partition
	// and its associated function defines how items in that partition should be processed.
	Flows map[string]func(Source) Flow
	// Key determines the partition key for each item. It receives the context
	// and the item, returning a string key that determines which flow will process the item.
	Key func(context.Context, T) string
	// contains filtered or unexported fields
}
FanOutSink is a Sinker that distributes items from a Stream into multiple flows based on a key function. It allows for dynamic partitioning of data streams based on item characteristics.
func (*FanOutSink[T]) Collect ¶
func (f *FanOutSink[T]) Collect(ctx context.Context, in Stream)
Collect implements the Sinker interface. It partitions incoming items based on the Key function and creates a separate Source for each partition.
func (FanOutSink[T]) Source ¶
func (f FanOutSink[T]) Source(key string) Source
Source returns the Source associated with the given key after partitioning. If no Source exists for the key, an empty stream source is returned.
func (FanOutSink[T]) Sources ¶
func (f FanOutSink[T]) Sources() []Source
Sources returns the list of Source partitions created during processing.
type Flow ¶
type Flow func() Source
Flow represents a factory function that creates a Source. It provides a way to create reusable pipeline components that can be composed together.
func Merge ¶
func Merge(sources ...Source) Flow
Merge combines multiple sources into a single Flow. It processes each Source sequentially, emitting all values from one source before moving to the next.
The function will process sources in order until either:
- All sources are exhausted
- The context is cancelled
- The consumer stops accepting values
func NewFromChannel ¶
func NewFromChannel[T any](ch <-chan T) Flow
NewFromChannel creates a Flow from a channel. It provides a way to convert any typed channel for use in flow-based processing pipelines.
The flow will continue reading from the channel until it is closed or the context is cancelled.
Example:
// Create a channel and flow
ch := make(chan int)
flow := NewFromChannel(ch)
// Send values in a separate goroutine
go func() {
	defer close(ch)
	for i := 1; i <= 5; i++ {
		ch <- i
	}
}()
// Process values from the flow
ctx := context.Background()
for item := range flow.Stream(ctx) {
	fmt.Println(item)
}
func NewFromItems ¶
func NewFromItems[T any](items ...T) Flow
NewFromItems creates a Flow from a variadic list of items. It provides a convenient way to create a flow from a known set of values of any type T.
Example:
// Create a Flow from integers
flow := NewFromItems(1, 2, 3, 4, 5)
// Use the flow
ctx := context.Background()
for item := range flow.Stream(ctx) {
	fmt.Println(item)
}
func NewFromIterable ¶
func NewFromIterable(it Iterable) Flow
NewFromIterable creates a Flow from an Iterable source. It handles the conversion of the iteration-based interface to a stream-based flow.
func NewFromRange ¶
func NewFromRange(start, end, step int) Flow
NewFromRange creates a Flow that emits a sequence of integers from start (inclusive) to end (exclusive), incrementing by the specified step value.
The function will stop emitting values when either:
- The sequence reaches or exceeds the end value
- The context is cancelled
- The consumer stops accepting values
func NewFromSource ¶
func NewFromSource(s Source) Flow
NewFromSource creates a Flow from an existing Source, allowing it to be used in flow-based processing pipelines.
Example:
// Create a simple source
source := SourceFunc(func(ctx context.Context) Stream {
	return func(yield func(any) bool) {
		for i := 1; i <= 3; i++ {
			if ctx.Err() != nil {
				return
			}
			if !yield(i) {
				return
			}
		}
	}
})
// Create a Flow from the source
flow := NewFromSource(source)
// Use the flow in a pipeline
ctx := context.Background()
for item := range flow.Stream(ctx) {
	fmt.Println(item)
}
func NewFromSourceFunc ¶
func NewFromSourceFunc(fn func(context.Context) Stream) Flow
NewFromSourceFunc creates a Flow from a function that produces a Stream. This is a convenience function that wraps a stream-producing function into a flow, making it easier to create flows without explicitly implementing the Source interface.
Example:
// Create a Flow that emits numbers 1 through 5
flow := NewFromSourceFunc(func(ctx context.Context) Stream {
	return func(yield func(any) bool) {
		for i := 1; i <= 5; i++ {
			// Check for context cancellation
			if ctx.Err() != nil {
				return
			}
			// Emit value, stop if consumer requests
			if !yield(i) {
				return
			}
		}
	}
})
// Use the flow
ctx := context.Background()
for item := range flow.Stream(ctx) {
	fmt.Println(item)
}
func NewFromTicker ¶
func NewFromTicker(interval time.Duration) Flow
NewFromTicker creates a Flow that emits timestamps at specified intervals. It continues to emit timestamps until either the context is cancelled or the consumer stops accepting values.
The timestamps are of type time.Time and represent the actual time when the interval elapsed. Note that the actual intervals may be slightly longer than specified due to system scheduling and processing delays.
func (Flow) Collect ¶
Collect executes the Flow by sending its output to the provided Sinker. It handles error propagation and context cancellation while processing the Stream.
The method will stop processing when either:
- The flow completes successfully
- The context is cancelled
- An error occurs during processing
func (Flow) Stream ¶
Stream implements Source, returning the generated Stream from the Flow using the provided context.
func (Flow) Transform ¶
Transform applies a series of transformations to the Flow by joining the provided transformers and creating a new Flow with the combined Transform.
Example:
f := NewFromItems(1, 2, 3, 4).Transform(
	Map(func(ctx context.Context, n int) (int, error) { return n * 2, nil }),
	Filter(func(ctx context.Context, n int) bool { return n > 5 }),
)
// f emits 6 and 8: each item is doubled, then values <= 5 are dropped
type FlowError ¶
type FlowError struct {
	sync.Mutex // Embedded mutex for thread-safe operations
	// contains filtered or unexported fields
}
FlowError represents a collection of errors that occurred during Flow processing. It provides thread-safe error collection and aggregation capabilities.
func (*FlowError) Append ¶
Append adds an error to the FlowError collection in a thread-safe manner. Nil errors won't contribute to the joined error text generated by FlowError.Error.
type Iterable ¶
type Iterable interface {
	// HasNext checks if there are more items available in the sequence.
	// The context parameter allows for cancellation of the check operation.
	HasNext(context.Context) bool
	// Next retrieves the next item in the sequence.
	// Returns the next item and any error that occurred during retrieval.
	// The context parameter allows for cancellation of the retrieval operation.
	Next(context.Context) (any, error)
	// Close performs cleanup of any resources used by the Iterable.
	// The context parameter allows for cancellation of the cleanup operation.
	Close(context.Context)
}
Iterable represents a sequence of values that can be traversed. It provides methods for checking if more values exist, retrieving the next value, and cleaning up resources.
Example:
// DBIterator adapts *sql.Rows to Iterable.
// Next scans into a single value, so it expects single-column queries.
type DBIterator struct {
	rows   *sql.Rows
	closed bool
}
func NewDBIterator(ctx context.Context, db *sql.DB, query string) (*DBIterator, error) {
	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	return &DBIterator{rows: rows}, nil
}
// HasNext advances the cursor, so call it exactly once before each Next.
func (dbi *DBIterator) HasNext(ctx context.Context) bool {
	if dbi.closed || ctx.Err() != nil {
		return false
	}
	return dbi.rows.Next()
}
func (dbi *DBIterator) Next(ctx context.Context) (any, error) {
	var data any
	err := dbi.rows.Scan(&data)
	return data, err
}
func (dbi *DBIterator) Close(ctx context.Context) {
	if !dbi.closed {
		dbi.rows.Close()
		dbi.closed = true
	}
}
type KeepDistinctOptions ¶
type Sinker ¶
type Sinker interface {
// Collect processes items from the provided [Stream] within the given context.
// The implementation should respect context cancellation.
Collect(ctx context.Context, stream Stream)
}
Sinker represents a destination for a Flow's output. It provides a way to consume and process values from a Stream.
func Discard ¶
func Discard() Sinker
Discard creates a Sinker that consumes and discards all items from a Stream. It respects context cancellation while efficiently draining the stream without allocating memory for the items.
The function is useful for:
- Draining streams when values are not needed
- Testing flow processing without storing results
- Completing flow processing when only side effects matter
type SinkerFunc ¶
SinkerFunc is a function type that implements the Sinker interface, allowing simple functions to be used as Sinkers.
func (SinkerFunc) Collect ¶
func (fn SinkerFunc) Collect(ctx context.Context, in Stream)
Collect implements the Sinker interface for SinkerFunc.
type SliceSink ¶
type SliceSink[T any] struct {
	// contains filtered or unexported fields
}
SliceSink collects items from a Stream into a slice. It provides a type-safe way to accumulate items of type T into a slice while respecting context cancellation.
func NewSliceSink ¶
NewSliceSink creates a new SliceSink with the provided initial slice. If nil is provided, a new empty slice will be created.
type SlidingWindowOptions ¶
type Source ¶
type Source interface {
// Stream returns a [Stream] that produces values from this [Source]. The returned Stream
// should continue producing values until either:
// - The source is exhausted
// - The provided context is cancelled
// - The consumer stops requesting values
Stream(context.Context) Stream
}
Source defines an interface for producing streams of values. A Source is the fundamental building block for creating data pipelines. It provides a single method, Stream, that returns a Stream of values.
type SourceFunc ¶
SourceFunc is a function type that implements the Source interface. It allows regular functions to be used as Sources by implementing the [Source.Stream] method.
func (SourceFunc) Stream ¶
func (fn SourceFunc) Stream(ctx context.Context) Stream
Stream implements the Source interface for SourceFunc.
type Stream ¶
Stream represents a sequence of values of type any that can be iterated over. It uses a pull-based iteration model where consumers request values through a yield function. The Stream will continue producing values until either the source is exhausted or the yield function returns false. The underlying function of Stream has the following signature:
func(yield func(any) bool)
Example creating a simple stream:
stream := func(yield func(any) bool) {
	for i := 0; i < 5; i++ {
		if !yield(i) {
			return // Stop: the consumer wants no more values
		}
	}
}
Example consuming a stream using a range expression:
for item := range stream {
	fmt.Println(item)
	if item == 3 {
		break // Stop if condition is met
	}
}
func Empty ¶
func Empty() Stream
Empty creates a Stream that produces no values. It is useful as a default or placeholder wherever a no-op stream is needed.
Example:
// Use Empty as a fallback stream
ctx := context.Background()
var stream Stream
if someCondition {
	stream = NewFromItems(1, 2, 3).Stream(ctx)
} else {
	stream = Empty()
}
// The loop body never runs when stream is Empty()
for item := range stream {
	fmt.Println(item)
}
type Transform ¶
Transform is a function type that transforms one Source into another Source. It represents a processing stage in a data pipeline that can modify, filter, or transform the data Stream.
func Chunk ¶
func Chunk[T any](n int) Transform
Chunk creates a new Transform that groups items into fixed-size chunks. The last chunk may contain fewer items if the stream length is not divisible by n. Each chunk is emitted as a slice of items.
Example:
// Group numbers into chunks of 2
flow := NewFromItems(1, 2, 3, 4, 5).
	Transform(Chunk[int](2))
result, err := Collect[[]int](ctx, flow)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result) // [[1 2] [3 4] [5]]
func Filter ¶
func Filter[T any](fn func(context.Context, T) bool) Transform
Filter creates a new Transform that selectively includes items from the Stream based on the provided predicate function. Only items that cause the predicate to return true are included in the output stream.
Example:
// Keep only even numbers
flow := NewFromItems(1, 2, 3, 4, 5).
	Transform(Filter(func(ctx context.Context, n int) bool {
		return n%2 == 0
	}))
result, err := Collect[int](ctx, flow)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result) // [2 4]
func FlatMap ¶
func FlatMap[T any, U any](fn func(context.Context, T) ([]U, error)) Transform
FlatMap creates a new Transform component that transforms items using the provided mapping function. Each input item is transformed into a slice of output items, which are then sent individually downstream.
func Flatten ¶
func Flatten[T []U, U any]() Transform
Flatten creates a new Transform component that receives a batch of items and flattens it, sending each item from the upstream batch individually downstream. It is the semantic equivalent of:
FlatMap(func(ctx context.Context, in []U) ([]U, error) { return in, nil })
For example, to process an upstream CSV batch (a [][]string) row by row, in index order, call:
// Flatten a 2-D array of strings: [][]string -> Flatten -> []string
Flatten[[][]string]()
func Join ¶
func Join(pipes ...Transform) Transform
Join combines multiple transformers into a single Transform by composing them in order. Each transformer's output becomes the input to the next transformer in the sequence.
Join returns Pass if no transformers are provided.
func Keep ¶
func Keep(n int) Transform
Keep creates a new Transform that limits the stream to at most n items from the input stream. Once n items have been processed, any remaining input items are ignored.
Example:
// Keep first 3 items
f := NewFromItems(1, 2, 3, 4, 5).
	Transform(Keep(3))
results, err := Collect[int](ctx, f)
if err != nil {
	log.Fatal(err)
}
fmt.Println(results) // [1 2 3]
func KeepDistinct ¶
func KeepDistinct[T any](opts ...func(*KeepDistinctOptions[T])) Transform
KeepDistinct creates a new Transform that filters out duplicate elements from the Stream. It accepts optional configuration functions to customize its behavior. By default, it uses the string representation of each element as the uniqueness key.
Example:
// Remove duplicate numbers
flow := NewFromItems(1, 2, 2, 3, 3, 4).
	Transform(KeepDistinct[int]())
result, err := Collect[int](ctx, flow)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result) // [1 2 3 4]
func KeepFirst ¶
func KeepFirst() Transform
KeepFirst creates a new Transform that takes only the first item from the stream and terminates processing. It is semantically equivalent to Keep(1).
Example:
// Get first item from stream
f := NewFromItems(1, 2, 3, 4, 5).
	Transform(KeepFirst())
result, err := Collect[int](ctx, f)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result) // [1]
func KeepIf ¶
func KeepIf[T any](fn func(context.Context, T) bool) Transform
KeepIf creates a new Transform that selectively includes items from the stream based on the provided predicate function. It is an alias for Filter.
Example:
// Keep only positive numbers
flow := NewFromItems(-2, -1, 0, 1, 2).
	Transform(KeepIf(func(ctx context.Context, n int) bool {
		return n > 0
	}))
result, err := Collect[int](ctx, flow)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result) // [1 2]
func KeepLast ¶
func KeepLast() Transform
KeepLast creates a new Transform that yields only the last item from the stream. If the stream is empty, no items are yielded.
Example:
// Get the last item from a stream of numbers
flow := NewFromItems(1, 2, 3, 4, 5).
	Transform(KeepLast())
result, err := Collect[int](ctx, flow)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result) // [5]
func Limit ¶
func Limit(limit rate.Limit, burst int) Transform
Limit creates a new Transform that applies rate limiting to the stream using the token bucket algorithm. Items are processed according to the specified rate limit and burst size.
Example:
// Process items at a rate of 10 per second with max burst of 20
flow := NewFromItems(1, 2, 3, 4, 5).
	Transform(Limit(rate.Limit(10), 20))
result, err := Collect[int](ctx, flow)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result)
func Map ¶
func Map[T any, U any](fn func(context.Context, T) (U, error)) Transform
Map creates a new Transform component that transforms items using the provided function. Each input item is transformed from type T to type U using function fn.
func OmitIf ¶
func OmitIf[T any](fn func(context.Context, T) bool) Transform
OmitIf creates a new Transform that selectively excludes items from the Stream based on the provided predicate function. It is the inverse of KeepIf.
Example:
// Skip negative numbers
flow := NewFromItems(-2, -1, 0, 1, 2).
	Transform(OmitIf(func(ctx context.Context, n int) bool {
		return n < 0
	}))
result, err := Collect[int](ctx, flow)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result) // [0 1 2]
func ParallelMap ¶
func ParallelMap[T any, U any](n int, fn func(context.Context, T) (U, error)) Transform
ParallelMap creates a new Transform that transforms items in parallel using multiple workers. Each input item is transformed from type T to type U using function fn, with processing distributed across n concurrent workers.
Example:
// Process numbers in parallel with 3 workers
flow := NewFromItems(1, 2, 3, 4, 5).
	Transform(ParallelMap[int, int](3, func(ctx context.Context, n int) (int, error) {
		time.Sleep(time.Second) // Simulate work
		return n * 2, nil
	}))
result, err := Collect[int](ctx, flow)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result) // e.g. [2 4 6 8 10]; order is not guaranteed
func Pass ¶
func Pass() Transform
Pass creates a new Transform component that forwards items without modification. This can be useful for debugging or when you need to maintain the Flow structure without processing.
func Reduce ¶
func Reduce[T any](fn func(ctx context.Context, acc T, item T) (T, error)) Transform
Reduce creates a new Transform component that combines multiple items into one using the provided reduction function. Each input item is combined with the accumulator to produce a new accumulated value.
Example:
// Join strings with separator
flow := NewFromItems("Hello", "World", "!").
	Transform(
		Reduce(func(ctx context.Context, acc, item string) (string, error) {
			if acc == "" {
				return item, nil
			}
			return acc + " " + item, nil
		}),
	)
result, err := Collect[string](ctx, flow)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result)
// result: []string{"Hello", "Hello World", "Hello World !"}
func Skip ¶
func Skip(n int) Transform
Skip creates a new Transform that skips the first n items from the input stream and forwards all remaining items. If n is less than or equal to 0, no items are dropped.
Example:
// Skip first 2 items and process the rest
f := NewFromItems(1, 2, 3, 4, 5).
	Transform(Skip(2))
result, err := Collect[int](ctx, f)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result) // [3 4 5]
func SlidingWindow ¶
func SlidingWindow[T any](opts ...func(*SlidingWindowOptions)) Transform
SlidingWindow creates a new Transform component that groups items using a sliding window. The window moves forward by StepSize items after each batch is emitted.
Example:
// Create windows of size 3, sliding by 1
flow := NewFromItems(1, 2, 3, 4, 5).
	Transform(SlidingWindow[int](func(opts *SlidingWindowOptions) {
		opts.WindowSize = 3
		opts.StepSize = 1
	}))
result, err := Collect[[]int](ctx, flow)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result) // [[1 2 3] [2 3 4] [3 4 5]]