flow

package module
v0.0.1
Published: Apr 21, 2025 License: MIT Imports: 7 Imported by: 0

README

flow


Flow is a Go library that provides primitives for building flow-based processing pipelines. It implements a pull-based streaming model that enables efficient data processing through composable components. The library allows you to create, transform, and consume data streams with built-in support for error handling, context cancellation, and type safety using Go generics.

Key Features

  • Pull-based streaming model for efficient data processing
  • Context-aware processing with cancellation support
  • Built-in error handling and propagation
  • Type-safe data processing using Go generics
  • Composable pipeline components
  • Support for both synchronous and asynchronous processing
  • Rich set of source constructors:
    • Create flows from slices, channels, iterables, time-based events, and numeric ranges
    • Merge multiple sources into a single flow
    • Clone streams for parallel processing
  • Comprehensive transformation operations:
    • Map: Transform data items
    • Filter/KeepIf/OmitIf: Include/exclude data based on predicates
    • Reduce: Aggregate data
    • FlatMap/Flatten: Expand data streams
    • Chunk: Group items into fixed-size chunks
    • SlidingWindow: Create overlapping windows of data
    • KeepDistinct: Remove duplicates
    • Limit: Apply rate limiting
    • Keep/Omit/KeepFirst/KeepLast: Control which items are processed
  • Flexible data sinks:
    • Collect results into slices
    • Send data to channels
    • Fan-out to distribute items to multiple flows
    • Discard items when only side effects matter

Installation

go get github.com/nisimpson/flow

Examples

Basic Flow Creation
package main

import (
    "context"
    "fmt"

    "github.com/nisimpson/flow"
)

func main() {
    // Create a flow from a slice of integers
    f := flow.NewFromItems(1, 2, 3, 4, 5)

    // Create a context
    ctx := context.Background()

    // Consume the flow and print each item
    for item := range f.Stream(ctx) {
        fmt.Println(item)
    }
}
Transforming Data with Map, Filter, and Reduce
package main

import (
    "context"
    "fmt"
    "strconv"

    "github.com/nisimpson/flow"
)

func main() {
    ctx := context.Background()

    // Create a flow and apply transformations
    f := flow.NewFromItems(1, 2, 3, 4, 5).Transform(
        // Double each number -> [2, 4, 6, 8, 10]
        flow.Map(func(ctx context.Context, n int) (int, error) {
            return n * 2, nil
        }),
        // Keep only numbers greater than 5 -> [6, 8, 10]
        flow.KeepIf(func(ctx context.Context, n int) bool {
            return n > 5
        }),
        // Reduce to a rolling sum of each value -> [6, 14, 24]
        flow.Reduce(func(ctx context.Context, acc, item int) (int, error) {
            return acc + item, nil
        }),
        // Keep the last value -> [24]
        flow.KeepLast(),
        // Convert to string -> ["24"]
        flow.Map(func(ctx context.Context, n int) (string, error) {
            return strconv.Itoa(n), nil
        }),
    )

    // Collect results into a slice
    results, err := flow.Collect[string](ctx, f)
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }

    fmt.Println(results) // ["24"]
}
Working with Channels
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/nisimpson/flow"
)

func main() {
    // Create a channel and send values to it
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 1; i <= 5; i++ {
            ch <- i
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // Create a flow from the channel
    f := flow.NewFromChannel(ch)

    // Create a context with timeout
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    // Process the flow
    for item := range f.Stream(ctx) {
        fmt.Printf("Received: %d\n", item)
    }
}
Chunking and Windowing
package main

import (
    "context"
    "fmt"

    "github.com/nisimpson/flow"
)

func main() {
    ctx := context.Background()

    // Create a flow with numbers 1 through 10
    f := flow.NewFromRange(1, 11, 1)

    // Group into chunks of 3
    chunks, err := flow.Collect[[]int](ctx, f.Transform(flow.Chunk[int](3)))
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }
    fmt.Println("Chunks:", chunks) // [[1 2 3], [4 5 6], [7 8 9], [10]]

    // Create sliding windows of size 3, stepping by 1
    windows, err := flow.Collect[[]int](ctx, flow.NewFromRange(1, 8, 1).Transform(
        flow.SlidingWindow[int](func(opts *flow.SlidingWindowOptions) {
            opts.WindowSize = 3
            opts.StepSize = 1
        }),
    ))
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }
    fmt.Println("Windows:", windows) // [[1 2 3], [2 3 4], [3 4 5], [4 5 6], [5 6 7]]
}
Error Handling
package main

import (
    "context"
    "errors"
    "fmt"

    "github.com/nisimpson/flow"
)

func main() {
    ctx := context.Background()

    // Create a flow with a transformation that might fail
    f := flow.NewFromItems(1, 2, 0, 4, 5).Transform(
        flow.Map(func(ctx context.Context, n int) (int, error) {
            if n == 0 {
                return 0, errors.New("division by zero")
            }
            return 10 / n, nil
        }),
    )

    // Collect results and handle errors
    results, err := flow.Collect[int](ctx, f)
    if err != nil {
        fmt.Printf("Error occurred: %v\n", err)
    } else {
        fmt.Println("Results:", results)
    }
}
Fan-Out and Fan-In
package main

import (
    "context"
    "fmt"

    "github.com/nisimpson/flow"
)

func main() {
    ctx := context.Background()

    // Create a flow with numbers
    f := flow.NewFromItems(1, 2, 3, 4, 5, 6)

    // Create a fan-out sink to separate odd and even numbers
    sink := &flow.FanOutSink[int]{
        Key: func(ctx context.Context, n int) string {
            if n%2 == 0 {
                return "even"
            }
            return "odd"
        },
    }

    // Execute the flow with the fan-out sink
    err := f.Collect(ctx, sink)
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }

    // Process each source separately
    for _, src := range sink.Sources() {
        items, _ := flow.Collect[int](ctx, src)
        fmt.Println("Source items:", items)
    }

    // Fan-in: Merge multiple flows
    f1 := flow.NewFromItems(1, 3, 5)
    f2 := flow.NewFromItems(2, 4, 6)
    merged := flow.Merge(f1, f2)

    mergedItems, _ := flow.Collect[int](ctx, merged)
    fmt.Println("Merged items:", mergedItems)
}
Rate Limiting
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/nisimpson/flow"
    "golang.org/x/time/rate"
)

func main() {
    ctx := context.Background()

    // Create a rate-limited flow (at most one item per millisecond, burst of 1)
    f := flow.NewFromItems(1, 2, 3, 4, 5).Transform(
        flow.Limit(rate.Every(time.Millisecond), 1),
    )

    start := time.Now()

    // Process the flow
    for item := range f.Stream(ctx) {
        fmt.Printf("Processed %d after %v\n", item, time.Since(start))
    }
}
Working with Iterables
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/nisimpson/flow"
)

// BatchDataIterator demonstrates fetching data in batches
type BatchDataIterator struct {
    currentBatch int
    batchSize    int
    maxBatches   int
    closed       bool
}

func (bi *BatchDataIterator) HasNext(ctx context.Context) bool {
    if bi.closed {
        return false
    }
    return bi.currentBatch < bi.maxBatches
}

func (bi *BatchDataIterator) Next(ctx context.Context) (any, error) {
    if bi.closed {
        return nil, fmt.Errorf("iterator is closed")
    }
    if bi.currentBatch >= bi.maxBatches {
        return nil, fmt.Errorf("no more batches")
    }

    // Simulate fetching a batch of data
    time.Sleep(100 * time.Millisecond)
    batch := fmt.Sprintf("Batch %d", bi.currentBatch)
    bi.currentBatch++
    return batch, nil
}

func (bi *BatchDataIterator) Close(ctx context.Context) {
    bi.closed = true
}

func main() {
    iterator := &BatchDataIterator{
        batchSize:  10,
        maxBatches: 5,
    }

    // Create a flow and process batches
    f := flow.NewFromIterable(iterator).Transform(
        flow.Map(func(ctx context.Context, batch string) (string, error) {
            return fmt.Sprintf("Processed %s", batch), nil
        }),
    )

    // Process results
    ctx := context.Background()
    for item := range f.Stream(ctx) {
        fmt.Println(item)
    }
}

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Overview

Package flow provides primitives for building flow-based processing pipelines. It implements a pull-based streaming model that enables efficient data processing through composable components.

Core Concepts

  • A Stream represents a sequence of values that can be iterated over using a pull-based model. Consumers request values through a yield function, and the stream continues producing values until either the source is exhausted or the consumer stops requesting values.

  • A Source is the fundamental building block for creating data pipelines. It provides a single method Stream that returns a Stream of values. Sources can be created from various inputs including slices, channels, iterables, and time-based events.

  • A Flow represents a factory function that creates a Source. Flows provide a way to create reusable pipeline components that can be composed together. They support operations like merging multiple sources and cloning streams.

Key Features

  • Context-aware processing with cancellation support
  • Built-in error handling and propagation
  • Type-safe data processing
  • Composable pipeline components
  • Support for both synchronous and asynchronous processing

Common Source Constructors

  • NewFromItems: Creates a Flow from a variadic list of items
  • NewFromChannel: Creates a Flow from a channel
  • NewFromIterable: Creates a Flow from an Iterable source
  • NewFromTicker: Creates a Flow that emits timestamps at specified intervals
  • NewFromRange: Creates a Flow that emits a sequence of integers
  • Merge: Combines multiple sources into a single Flow

Example Usage

Creating a simple stream:

stream := func(yield func(any) bool) {
    for i := 0; i < 5; i++ {
        if !yield(i) {
            return // Stop if the consumer stops requesting values
        }
    }
}

Creating and using a Flow:

// Create a Flow that emits numbers 1 through 5
flow := NewFromSourceFunc(func(ctx context.Context) Stream {
    return func(yield func(any) bool) {
        for i := 1; i <= 5; i++ {
            if ctx.Err() != nil {
                return
            }
            if !yield(i) {
                return
            }
        }
    }
})

// Use the flow
ctx := context.Background()
for item := range flow.Stream(ctx) {
    fmt.Println(item)
}

Error Handling

The package includes built-in error handling through context. Errors can be set using the SetError function and will be propagated through the pipeline:

if err != nil {
    SetError(ctx, err)
    continue
}

Thread Safety

All Flow operations are designed to be thread-safe when used with proper context cancellation. Multiple goroutines can safely consume from the same Flow using Clone() to create independent copies of the stream.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Collect

func Collect[T any](ctx context.Context, src Source) ([]T, error)

Collect is a convenience function that collects all items from a Source into a slice.

func SetError

func SetError(ctx context.Context, err error)

SetError adds an error to the FlowError stored in the context.
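
For example, a custom source might record a failure and stop, letting the error surface when the flow is collected (a sketch; fetchItem is a hypothetical helper, not part of this package):

src := SourceFunc(func(ctx context.Context) Stream {
    return func(yield func(any) bool) {
        item, err := fetchItem(ctx) // hypothetical data fetch
        if err != nil {
            SetError(ctx, err) // recorded error surfaces when the flow is collected
            return
        }
        yield(item)
    }
})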

Types

type ChannelSink

type ChannelSink[T any] chan<- T

ChannelSink is a Sinker that sends items from a Stream to a typed channel. It provides type-safe forwarding of items while respecting context cancellation.

func NewChannelSink

func NewChannelSink[T any](ch chan<- T) ChannelSink[T]

NewChannelSink creates a new ChannelSink from the provided channel.

func (ChannelSink[T]) Collect

func (c ChannelSink[T]) Collect(ctx context.Context, in Stream)

Collect implements the Sinker interface, sending items from the Stream to the underlying channel until there are no more items present or the provided context is cancelled.
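
Example (a sketch; the docs do not state whether the sink closes the channel, so it is closed explicitly here once Collect returns):

ch := make(chan int)
sink := NewChannelSink(ch)

// Run the flow in a separate goroutine so main can drain the channel
go func() {
    defer close(ch)
    _ = NewFromItems(1, 2, 3).Collect(ctx, sink)
}()

for n := range ch {
    fmt.Println(n) // 1, 2, 3
}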

type FanOutSink

type FanOutSink[T any] struct {
	// Flows maps keys to flow transformation functions. Each key represents a partition
	// and its associated function defines how items in that partition should be processed.
	Flows map[string]func(Source) Flow

	// Key determines the partition key for each item. It receives the context
	// and the item, returning a string key that determines which flow will process the item.
	Key func(context.Context, T) string
	// contains filtered or unexported fields
}

FanOutSink is a Sinker that distributes items from a Stream into multiple flows based on a key function. It allows for dynamic partitioning of data streams based on item characteristics.
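
Example (a minimal sketch; partitions are read back by key using Source after collection):

// Partition numbers by parity
sink := &FanOutSink[int]{
    Key: func(ctx context.Context, n int) string {
        if n%2 == 0 {
            return "even"
        }
        return "odd"
    },
}

if err := NewFromItems(1, 2, 3, 4, 5).Collect(ctx, sink); err != nil {
    log.Fatal(err)
}

evens, _ := Collect[int](ctx, sink.Source("even")) // [2 4]
odds, _ := Collect[int](ctx, sink.Source("odd"))   // [1 3 5]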

func (*FanOutSink[T]) Collect

func (f *FanOutSink[T]) Collect(ctx context.Context, in Stream)

Collect implements the Sinker interface. It partitions incoming items based on the key function and creates a separate Source for each partition.

func (FanOutSink[T]) Source

func (f FanOutSink[T]) Source(key string) Source

Source returns the Source associated with the given key after partitioning. If no Source exists for the key, an empty stream source is returned.

func (FanOutSink[T]) Sources

func (f FanOutSink[T]) Sources() []Source

Sources returns the list of Source partitions created during processing.

type Flow

type Flow func() Source

Flow represents a factory function that creates a Source. It provides a way to create reusable pipeline components that can be composed together.

func Merge

func Merge(sources ...Source) Flow

Merge combines multiple sources into a single Flow. It processes each Source sequentially, emitting all values from one source before moving to the next.

The function will process sources in order until either:

  • All sources are exhausted
  • The context is cancelled
  • The consumer stops accepting values
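
Example (a sketch based on the sequential ordering described above):

f1 := NewFromItems(1, 2)
f2 := NewFromItems(3, 4)

result, err := Collect[int](ctx, Merge(f1, f2))
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [1 2 3 4]; f1 is drained before f2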

func NewFromChannel

func NewFromChannel[T any](ch <-chan T) Flow

NewFromChannel creates a Flow from a channel. It provides a way to convert any typed channel for use in flow-based processing pipelines.

The flow will continue reading from the channel until it is closed or the context is cancelled.

Example:

// Create a channel and flow
ch := make(chan int)
flow := NewFromChannel(ch)

// Send values in a separate goroutine
go func() {
    defer close(ch)
    for i := 1; i <= 5; i++ {
        ch <- i
    }
}()

// Process values from the flow
ctx := context.Background()
for item := range flow.Stream(ctx) {
    fmt.Println(item)
}

func NewFromItems

func NewFromItems[T any](items ...T) Flow

NewFromItems creates a Flow from a variadic list of items. It provides a convenient way to create a flow from a known set of values of any type T.

Example:

// Create a Flow from integers
flow := NewFromItems(1, 2, 3, 4, 5)

// Use the flow
ctx := context.Background()
for item := range flow.Stream(ctx) {
    fmt.Println(item)
}

func NewFromIterable

func NewFromIterable(it Iterable) Flow

NewFromIterable creates a Flow from an Iterable source. It handles the conversion of the iteration-based interface to a stream-based flow.

func NewFromRange

func NewFromRange(start, end, step int) Flow

NewFromRange creates a Flow that emits a sequence of integers from start (inclusive) to end (exclusive), incrementing by the specified step value.

The function will stop emitting values when either:

  • The sequence reaches or exceeds the end value
  • The context is cancelled
  • The consumer stops accepting values
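
Example (a minimal sketch):

// Even numbers from 0 up to, but not including, 10
f := NewFromRange(0, 10, 2)

result, err := Collect[int](ctx, f)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [0 2 4 6 8]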

func NewFromSource

func NewFromSource(s Source) Flow

NewFromSource creates a Flow from an existing Source, allowing it to be used in flow-based processing pipelines.

Example:

// Create a simple source
source := SourceFunc(func(ctx context.Context) Stream {
    return func(yield func(any) bool) {
        for i := 1; i <= 3; i++ {
            if ctx.Err() != nil {
                return
            }
            if !yield(i) {
                return
            }
        }
    }
})

// Create a Flow from the source
flow := NewFromSource(source)

// Use the flow in a pipeline
ctx := context.Background()
for item := range flow.Stream(ctx) {
    fmt.Println(item)
}

func NewFromSourceFunc

func NewFromSourceFunc(fn func(context.Context) Stream) Flow

NewFromSourceFunc creates a Flow from a function that produces a Stream. This is a convenience function that wraps a stream-producing function into a flow, making it easier to create flows without explicitly implementing the Source interface.

Example:

// Create a Flow that emits numbers 1 through 5
flow := NewFromSourceFunc(func(ctx context.Context) Stream {
    return func(yield func(any) bool) {
        for i := 1; i <= 5; i++ {
            // Check for context cancellation
            if ctx.Err() != nil {
                return
            }
            // Emit the value; stop if the consumer stops requesting values
            if !yield(i) {
                return
            }
        }
    }
})

// Use the flow
ctx := context.Background()
for item := range flow.Stream(ctx) {
    fmt.Println(item)
}

func NewFromTicker

func NewFromTicker(interval time.Duration) Flow

NewFromTicker creates a Flow that emits timestamps at specified intervals. It continues to emit timestamps until either the context is cancelled or the consumer stops accepting values.

The timestamps are of type time.Time and represent the actual time when the interval elapsed. Note that the actual intervals may be slightly longer than specified due to system scheduling and processing delays.
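
Example (a sketch; assumes Keep stops pulling from the ticker once its quota of three items is reached):

// Emit a timestamp every 100ms, stopping after three ticks
f := NewFromTicker(100 * time.Millisecond).Transform(Keep(3))

ticks, err := Collect[time.Time](ctx, f)
if err != nil {
    log.Fatal(err)
}
fmt.Println(len(ticks)) // 3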

func (Flow) Clone

func (fn Flow) Clone() Flow

Clone creates a new Flow that produces the same values as the original.
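
Example (a sketch; per the Thread Safety notes, give each consumer its own clone):

f := NewFromItems(1, 2, 3)
a, b := f.Clone(), f.Clone()

first, _ := Collect[int](ctx, a)  // [1 2 3]
second, _ := Collect[int](ctx, b) // [1 2 3]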

func (Flow) Collect

func (fn Flow) Collect(ctx context.Context, sink Sinker) error

Collect executes the Flow by sending its output to the provided Sinker. It handles error propagation and context cancellation while processing the Stream.

The method will stop processing when either:

  • The flow completes successfully
  • The context is cancelled
  • An error occurs during processing
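
Example (a sketch pairing Collect with a SliceSink):

sink := NewSliceSink[int](nil)
if err := NewFromItems(1, 2, 3).Collect(ctx, sink); err != nil {
    log.Fatal(err)
}
fmt.Println(sink.Items()) // [1 2 3]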

func (Flow) Stream

func (fn Flow) Stream(ctx context.Context) Stream

Stream implements Source, returning the generated Stream from the Flow using the provided context.

func (Flow) Transform

func (fn Flow) Transform(pipes ...Transform) Flow

Transform applies a series of transformations to the Flow by joining the provided transformers and creating a new Flow with the combined Transform.

Example:

f := flow.NewFromItems(1, 2, 3, 4).Transform(
  Map(func(ctx context.Context, n int) (int, error) { return n * 2, nil }),
  Filter(func(ctx context.Context, n int) bool { return n > 5 }),
)

type FlowError

type FlowError struct {
	sync.Mutex // Embedded mutex for thread-safe operations
	// contains filtered or unexported fields
}

FlowError represents a collection of errors that occurred during Flow processing. It provides thread-safe error collection and aggregation capabilities.

func (*FlowError) Append

func (f *FlowError) Append(err error)

Append adds an error to the FlowError collection in a thread-safe manner. Nil errors won't contribute to the joined error text generated by FlowError.Error.

func (*FlowError) Error

func (f *FlowError) Error() string

Error implements the error interface, returning a string representation of all collected errors joined together.

type Iterable

type Iterable interface {
	// HasNext checks if there are more items available in the sequence.
	// The context parameter allows for cancellation of the check operation.
	HasNext(context.Context) bool

	// Next retrieves the next item in the sequence.
	// Returns the next item and any error that occurred during retrieval.
	// The context parameter allows for cancellation of the retrieval operation.
	Next(context.Context) (any, error)

	// Close performs cleanup of any resources used by the Iterable.
	// The context parameter allows for cancellation of the cleanup operation.
	Close(context.Context)
}

Iterable represents a sequence of values that can be traversed. It provides methods for checking if more values exist, retrieving the next value, and cleaning up resources.

Example:

type DBIterator struct {
	rows   *sql.Rows
	closed bool
}

func NewDBIterator(db *sql.DB, query string) (*DBIterator, error) {
	rows, err := db.Query(query)
	if err != nil {
		return nil, err
	}
	return &DBIterator{rows: rows}, nil
}

func (dbi *DBIterator) HasNext(ctx context.Context) bool {
	if dbi.closed {
		return false
	}
	return dbi.rows.Next()
}

func (dbi *DBIterator) Next(ctx context.Context) (any, error) {
	var data any
	err := dbi.rows.Scan(&data)
	return data, err
}

func (dbi *DBIterator) Close(ctx context.Context) {
	if !dbi.closed {
		dbi.rows.Close()
		dbi.closed = true
	}
}

type KeepDistinctOptions

type KeepDistinctOptions[T any] struct {
	// KeyFunction is a function that generates a string key for a given element.
	// This key is used to determine uniqueness. If two elements generate the
	// same key, they are considered duplicates.
	KeyFunction func(T) string
}

type Sinker

type Sinker interface {
	// Collect processes items from the provided [Stream] within the given context.
	// The implementation should respect context cancellation.
	Collect(ctx context.Context, stream Stream)
}

Sinker represents a destination for a Flow's output. It provides a way to consume and process values from a Stream.

func Discard

func Discard() Sinker

Discard creates a Sinker that consumes and discards all items from a Stream. It respects context cancellation while efficiently draining the stream without allocating memory for the items.

The function is useful for:

  • Draining streams when values are not needed
  • Testing flow processing without storing results
  • Completing flow processing when only side effects matter
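
Example (a sketch; the Map stage runs only for its side effect):

f := NewFromItems(1, 2, 3).Transform(
    Map(func(ctx context.Context, n int) (int, error) {
        fmt.Println("processing", n) // side effect only
        return n, nil
    }),
)

if err := f.Collect(ctx, Discard()); err != nil {
    log.Fatal(err)
}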

type SinkerFunc

type SinkerFunc func(context.Context, Stream)

SinkerFunc is a function type that implements the Sinker interface, allowing simple functions to be used as Sinkers.

func (SinkerFunc) Collect

func (fn SinkerFunc) Collect(ctx context.Context, in Stream)

Collect implements the Sinker interface for SinkerFunc.
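
Example (a sketch; Stream's underlying type is iter.Seq[any], so it can be ranged over directly):

// An ad hoc sink that prints each item as it arrives
printer := SinkerFunc(func(ctx context.Context, in Stream) {
    for item := range in {
        fmt.Println(item)
    }
})

if err := NewFromItems("a", "b", "c").Collect(ctx, printer); err != nil {
    log.Fatal(err)
}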

type SliceSink

type SliceSink[T any] struct {
	// contains filtered or unexported fields
}

SliceSink collects items from a Stream into a slice. It provides a type-safe way to accumulate items of type T into a slice while respecting context cancellation.

func NewSliceSink

func NewSliceSink[T any](s []T) *SliceSink[T]

NewSliceSink creates a new SliceSink with the provided initial slice. If nil is provided, a new empty slice will be created.

func (*SliceSink[T]) Collect

func (s *SliceSink[T]) Collect(ctx context.Context, in Stream)

Collect implements the Sinker interface. It collects items from the stream into the underlying slice, respecting context cancellation.

func (SliceSink[T]) Items

func (s SliceSink[T]) Items() []T

Items returns the underlying slice containing collected items.
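
Example (a sketch; assumes collected items are appended to the initial slice):

base := []int{0}
sink := NewSliceSink(base)

if err := NewFromItems(1, 2).Collect(ctx, sink); err != nil {
    log.Fatal(err)
}
fmt.Println(sink.Items()) // [0 1 2]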

type SlidingWindowOptions

type SlidingWindowOptions struct {
	WindowSize int // Sliding window size
	StepSize   int // Number of items to slide forward for each window
}

type Source

type Source interface {
	// Stream returns a [Stream] that produces values from this [Source]. The returned Stream
	// should continue producing values until either:
	//   - The source is exhausted
	//   - The provided context is cancelled
	//   - The consumer stops requesting values
	Stream(context.Context) Stream
}

Source defines an interface for producing streams of values. A Source is the fundamental building block for creating data pipelines. It provides a single method Stream that returns a Stream of values.

type SourceFunc

type SourceFunc func(context.Context) Stream

SourceFunc is a function type that implements the Source interface. It allows regular functions to be used as Sources by implementing the [Source.Stream] method.

func (SourceFunc) Stream

func (fn SourceFunc) Stream(ctx context.Context) Stream

Stream implements the Source interface for SourceFunc.

type Stream

type Stream iter.Seq[any]

Stream represents a sequence of any values that can be iterated over. It uses a pull-based iteration model where consumers request values through a yield function. The Stream will continue producing values until either the source is exhausted or the yield function returns false. The underlying function of Stream has the following signature:

func(yield func(any) bool)

Example creating a simple stream:

stream := func(yield func(any) bool) {
  for i := 0; i < 5; i++ {
    if !yield(i) {
      return // Stop if the consumer stops requesting values
    }
  }
}

Example consuming a stream using a range expression:

for item := range stream {
  fmt.Println(item)
  if item == 3 {
    break // Stop if condition is met
  }
}

func Empty

func Empty() Stream

Empty creates a Stream that produces no values. This can be useful as a default or placeholder source in situations where a no-op source is needed.

Example:

// Use Empty as a fallback flow
var flow Source
if someCondition {
    flow = NewFromItems(1, 2, 3)
} else {
    flow = Empty()
}

// The empty flow will produce no values
ctx := context.Background()
for item := range flow.Stream(ctx) {
    // This loop will not execute for Empty flow
}

func (Stream) Stream

func (s Stream) Stream(ctx context.Context) Stream

Stream implements Source, returning itself while ignoring context.

type Transform

type Transform func(Source) Source

Transform is a function type that transforms one Source into another Source. It represents a processing stage in a data pipeline that can modify, filter, or transform the data Stream.

func Chunk

func Chunk[T any](n int) Transform

Chunk creates a new Transform that groups items into fixed-size chunks. The last chunk may contain fewer items if the stream length is not divisible by n. Each chunk is emitted as a slice of items.

Example:

// Group numbers into chunks of 2
flow := NewFromItems(1, 2, 3, 4, 5).
    Transform(Chunk[int](2))

result, err := Collect[[]int](ctx, flow)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [[1, 2], [3, 4], [5]]

func Filter

func Filter[T any](fn func(context.Context, T) bool) Transform

Filter creates a new Transform that selectively includes items from the Stream based on the provided predicate function. Only items that cause the predicate to return true are included in the output stream.

Example:

// Keep only even numbers
flow := NewFromItems(1, 2, 3, 4, 5).
    Transform(Filter(func(ctx context.Context, n int) bool {
        return n%2 == 0
    }))

result, err := Collect[int](ctx, flow)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [2, 4]

func FlatMap

func FlatMap[T any, U any](fn func(context.Context, T) ([]U, error)) Transform

FlatMap creates a new Transform component that transforms items using the provided mapping function. Each input item is transformed into a slice of output items, which are then sent individually downstream.
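
Example (a sketch splitting lines into words):

f := NewFromItems("hello world", "go flow").Transform(
    FlatMap(func(ctx context.Context, line string) ([]string, error) {
        return strings.Split(line, " "), nil
    }),
)

result, err := Collect[string](ctx, f)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [hello world go flow]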

func Flatten

func Flatten[T []U, U any]() Transform

Flatten creates a new Transform component that receives a batch of items and flattens it, sending each item from the upstream batch individually downstream. It is the semantic equivalent of:

FlatMap(func(ctx context.Context, in []U) ([]U, error) { return in, nil })

For example, to flatten an upstream two-dimensional CSV into individual rows, call:

// flatten a 2-D array of strings
Flatten[[][]string]() // [][]string - Flatten -> []string

func Join

func Join(pipes ...Transform) Transform

Join combines multiple transformers into a single Transform by composing them in order. Each transformer's output becomes the input to the next transformer in the sequence.

Join returns Pass if no transformers are provided.

func Keep

func Keep(n int) Transform

Keep creates a new Transform that limits the stream to at most n items from the input stream. Once n items have been processed, any remaining input items are ignored.

Example:

// Keep first 3 items
f := NewFromItems(1, 2, 3, 4, 5).
    Transform(Keep(3))

results, err := Collect[int](ctx, f)
if err != nil {
    log.Fatal(err)
}
fmt.Println(results) // [1, 2, 3]

func KeepDistinct

func KeepDistinct[T any](opts ...func(*KeepDistinctOptions[T])) Transform

KeepDistinct creates a new Transform that filters out duplicate elements from the Stream. It accepts optional configuration functions to customize the Flow's behavior. By default, it uses the string representation of elements as the uniqueness key.

Example:

// Remove duplicate numbers
flow := NewFromItems(1, 2, 2, 3, 3, 4).
    Transform(KeepDistinct[int]())

result, err := Collect[int](ctx, flow)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [1, 2, 3, 4]

func KeepFirst

func KeepFirst() Transform

KeepFirst creates a new Transform that takes only the first item from the stream and terminates processing. It is semantically equivalent to Keep(1).

Example:

// Get first item from stream
f := NewFromItems(1, 2, 3, 4, 5).
    Transform(KeepFirst())

result, err := Collect[int](ctx, f)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [1]

func KeepIf

func KeepIf[T any](fn func(context.Context, T) bool) Transform

KeepIf creates a new Transform that selectively includes items from the stream based on the provided predicate function. It is an alias for Filter.

Example:

// Keep only positive numbers
flow := NewFromItems(-2, -1, 0, 1, 2).
    Transform(KeepIf(func(ctx context.Context, n int) bool {
        return n > 0
    }))

result, err := Collect[int](ctx, flow)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [1, 2]

func KeepLast

func KeepLast() Transform

KeepLast creates a new Transform that yields only the last item from the stream. If the stream is empty, no items are yielded.

Example:

// Get the last item from a stream of numbers
flow := NewFromItems(1, 2, 3, 4, 5).
    Transform(KeepLast())

result, err := Collect[int](ctx, flow)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [5]

func Limit

func Limit(limit rate.Limit, burst int) Transform

Limit creates a new Transform that applies rate limiting to the stream using the token bucket algorithm. Items are processed according to the specified rate limit and burst size.

Example:

// Process items at a rate of 10 per second with max burst of 20
flow := NewFromItems(1, 2, 3, 4, 5).
    Transform(Limit(rate.Limit(10), 20))

result, err := Collect[int](ctx, flow)
if err != nil {
    log.Fatal(err)
}

func Map

func Map[T any, U any](fn func(context.Context, T) (U, error)) Transform

Map creates a new Transform component that transforms items using the provided function. Each input item is transformed from type T to type U using function fn.
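
Example (a sketch converting integers to strings):

f := NewFromItems(1, 2, 3).Transform(
    Map(func(ctx context.Context, n int) (string, error) {
        return strconv.Itoa(n), nil
    }),
)

result, err := Collect[string](ctx, f)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [1 2 3]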

func OmitIf

func OmitIf[T any](fn func(context.Context, T) bool) Transform

OmitIf creates a new Transform that selectively excludes items from the Stream based on the provided predicate function. It is the inverse of KeepIf.

Example:

// Skip negative numbers
flow := NewFromItems(-2, -1, 0, 1, 2).
    Transform(OmitIf(func(ctx context.Context, n int) bool {
        return n < 0
    }))

result, err := Collect[int](ctx, flow)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [0, 1, 2]

func ParallelMap

func ParallelMap[T any, U any](n int, fn func(context.Context, T) (U, error)) Transform

ParallelMap creates a new Transform that transforms items in parallel using multiple workers. Each input item is transformed from type T to type U using function fn, with processing distributed across n concurrent workers.

Example:

// Process numbers in parallel
flow := NewFromItems(1, 2, 3, 4, 5).
    Transform(ParallelMap[int, int](3, func(ctx context.Context, n int) (int, error) {
        time.Sleep(time.Second) // Simulate work
        return n * 2, nil
    }))

result, err := Collect[int](ctx, flow)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [2, 4, 6, 8, 10] <- order is not guaranteed

func Pass

func Pass() Transform

Pass creates a new Transform component that forwards items without modification. This can be useful for debugging or when you need to maintain the Flow structure without processing.

func Reduce

func Reduce[T any](fn func(ctx context.Context, acc T, item T) (T, error)) Transform

Reduce creates a new Transform component that combines multiple items into one using the provided reduction function. Each input item is combined with the accumulator to produce a new accumulated value.

Example:

// Join strings with separator
flow := NewFromItems("Hello", "World", "!").
    Transform(
        Reduce(func(ctx context.Context, acc, item string) (string, error) {
            if acc == "" {
                return item, nil
            }
            return acc + " " + item, nil
        }),
    )

result, err := Collect[string](ctx, flow)
// Result: ["Hello", "Hello World", "Hello World !"]

func Skip

func Skip(n int) Transform

Skip creates a new Transform that skips the first n items from the input stream and forwards all remaining items. If n is less than or equal to 0, no items are dropped.

Example:

// Skip first 2 items and process the rest
f := NewFromItems(1, 2, 3, 4, 5).
    Transform(Skip(2))

result, err := Collect[int](ctx, f)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [3, 4, 5]

func SlidingWindow

func SlidingWindow[T any](opts ...func(*SlidingWindowOptions)) Transform

SlidingWindow creates a new Transform component that groups items using a sliding window. The window moves forward by StepSize items after each batch is emitted.

Example:

// Create windows of size 3, sliding by 1
flow := NewFromItems(1, 2, 3, 4, 5).
    Transform(SlidingWindow[int](func(opts *SlidingWindowOptions) {
        opts.WindowSize = 3
        opts.StepSize = 1
    }))

result, err := Collect[[]int](ctx, flow)
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
