Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option func(*options)
Option is a function that configures the processor options.
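The Option signature above follows the common functional-options pattern. Here is a minimal sketch of how such options are typically applied. The real options struct is unexported, so its fields, the default values, and the lowercase helper names below are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// options is a hypothetical settings struct. The real package keeps its
// fields unexported, so these field names are assumptions.
type options struct {
	maxConcurrency int
	pollInterval   time.Duration
}

// Option mirrors the documented shape: a function that mutates *options.
type Option func(*options)

// withMaxConcurrency is an illustrative stand-in, not the library function.
func withMaxConcurrency(n int) Option {
	return func(o *options) { o.maxConcurrency = n }
}

// withPollInterval is an illustrative stand-in, not the library function.
func withPollInterval(d time.Duration) Option {
	return func(o *options) { o.pollInterval = d }
}

// newOptions starts from assumed defaults and applies each Option in order,
// so later options override earlier ones.
func newOptions(opts ...Option) options {
	o := options{maxConcurrency: 1, pollInterval: time.Second}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := newOptions(withMaxConcurrency(4), withPollInterval(250*time.Millisecond))
	fmt.Println(o.maxConcurrency, o.pollInterval) // prints "4 250ms"
}
```

A constructor written this way can gain new settings without changing its signature, which is likely why NewProcessor accepts a variadic list of Option values.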
func WithMaxConcurrency ¶
WithMaxConcurrency sets the maximum number of concurrent file processors.
func WithPollInterval ¶
WithPollInterval sets the poll interval for the consumer.
func WithStrategy ¶
WithStrategy sets the partition strategy.
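To make the two numeric settings concrete, the sketch below shows one common way a poll interval and a concurrency limit interact: a ticker triggers a scan, and a buffered channel acts as a semaphore around the work. This is not the library's implementation; poll, runBounded, observedPeak, and the list/process callbacks are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runBounded runs process on every item with at most maxConcurrency
// goroutines active at a time, and waits for all of them to finish.
func runBounded(items []string, maxConcurrency int, process func(string)) {
	sem := make(chan struct{}, maxConcurrency) // counting semaphore
	var wg sync.WaitGroup
	for _, it := range items {
		sem <- struct{}{} // blocks while maxConcurrency tasks are in flight
		wg.Add(1)
		go func(it string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot once process returns
			process(it)
		}(it)
	}
	wg.Wait()
}

// poll calls list once per interval and processes the result until ctx is done.
func poll(ctx context.Context, interval time.Duration, maxConcurrency int,
	list func() []string, process func(string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			runBounded(list(), maxConcurrency, process)
		}
	}
}

// observedPeak runs n short tasks through runBounded and reports the highest
// number that were seen running at the same moment.
func observedPeak(n, maxConcurrency int) int {
	var mu sync.Mutex
	running, peak := 0, 0
	runBounded(make([]string, n), maxConcurrency, func(string) {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		mu.Unlock()
		time.Sleep(time.Millisecond)
		mu.Lock()
		running--
		mu.Unlock()
	})
	return peak
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	var mu sync.Mutex
	handled := 0
	poll(ctx, 20*time.Millisecond, 4,
		func() []string { return []string{"a", "b", "c"} },
		func(string) { mu.Lock(); handled++; mu.Unlock() })
	fmt.Println("handled:", handled, "peak within limit:", observedPeak(20, 4) <= 4)
}
```

In this sketch a shorter interval reduces the delay before new work is noticed, while the semaphore size caps how much of that work runs at once.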
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor manages both writing and consuming of records.
Example ¶
ExampleProcessor demonstrates how to use the Processor type.
package main

import (
	"context"
	"fmt"
	"iter"
	"os"
	"time"

	"github.com/davidvella/xp"
	"github.com/davidvella/xp/handler"
	"github.com/davidvella/xp/partition"
	"github.com/davidvella/xp/partition/strategy/messagecount"
	"github.com/davidvella/xp/storage/local"
)

func main() {
	pendingDir, err := os.MkdirTemp("", "pending-*")
	if err != nil {
		fmt.Printf("Failed to create temp dir: %v\n", err)
		return
	}
	// RemoveAll also deletes any files left in the directory.
	defer os.RemoveAll(pendingDir)

	publishedDir, err := os.MkdirTemp("", "published-*")
	if err != nil {
		fmt.Printf("Failed to create temp dir: %v\n", err)
		return
	}
	defer os.RemoveAll(publishedDir)

	// Create the storage shared by the producer and the consumer
	storage := local.NewLocalStorage(pendingDir, publishedDir)

	// Create a handler that processes records
	h := handler.Func(func(_ context.Context, _ string, seq iter.Seq[partition.Record]) error {
		for rec := range seq {
			fmt.Printf("Processing record: %s\n", rec.GetID())
		}
		return nil
	})

	// Create a new processor with a message-count strategy
	proc, err := xp.NewProcessor(
		storage,
		storage,
		h,
		xp.WithStrategy(messagecount.NewStrategy(2)),
		xp.WithMaxConcurrency(4), // Run up to 4 file processors concurrently
		xp.WithPollInterval(250*time.Millisecond),
	)
	if err != nil {
		fmt.Printf("Failed to create processor: %v\n", err)
		return
	}

	// Handle some records
	ctx := context.Background()
	err = proc.Handle(ctx, &partition.RecordImpl{
		Data:         []byte("record1"),
		ID:           "t1",
		PartitionKey: "same",
		Timestamp:    time.Unix(1000, 0),
	})
	if err != nil {
		fmt.Printf("Failed to handle record: %v\n", err)
		return
	}

	err = proc.Handle(ctx, &partition.RecordImpl{
		Data:         []byte("record2"),
		ID:           "t2",
		PartitionKey: "same",
		Timestamp:    time.Unix(1001, 0),
	})
	if err != nil {
		fmt.Printf("Failed to handle record: %v\n", err)
		return
	}

	// Give the consumer time to poll and process the published records
	time.Sleep(time.Millisecond * 500)

	// Gracefully shut down the processor
	if err := proc.Stop(); err != nil {
		fmt.Printf("Failed to stop processor: %v\n", err)
		return
	}
}
Output:
Processing record: t1
Processing record: t2
func NewProcessor ¶
func NewProcessor(
	producerStorage processor.Storage,
	consumerStorage consumer.Storage,
	handler handler.Handler,
	opts ...Option,
) (*Processor, error)
NewProcessor creates a new processor instance with the given producer storage, consumer storage, and handler, configured by any supplied options.
Directories ¶
| Path | Synopsis |
|---|---|
| | Package compactor implements streaming compaction of multiple sorted sequences into a single SSTable. |
| | Package consumer provides functionality for processing published WAL (Write-Ahead Log) files in a concurrent and controlled manner. |
| | Package loser implements a tournament tree (also known as a loser tree) for efficiently merging multiple sorted sequences. |
| | Package priority implements a generic priority queue data structure that maintains a collection of key-value pairs ordered by priority. |
| | Package recordio implements a binary record format for storing and retrieving partition.Record instances. |
| | Package sstable implements a Sorted String Table (SSTable) for efficient storage and retrieval of key-value pairs. |
| storage | |
| | Package wal implements a Write-Ahead Log (WAL) for durable and atomic record storage. |
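Several of the packages listed above are built around merging multiple sorted sequences into one. The sketch below illustrates that idea using the standard library's container/heap in place of a loser tree, so it shows the general k-way merge technique and not the code in the loser or compactor packages. The cursor, minHeap, and mergeSorted names are hypothetical.

```go
package main

import (
	"container/heap"
	"fmt"
)

// cursor points at the next unread element of one sorted input.
type cursor struct {
	src []int
	pos int
}

// minHeap orders cursors by the element each one currently points at.
type minHeap []*cursor

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].src[h[i].pos] < h[j].src[h[j].pos] }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)        { *h = append(*h, x.(*cursor)) }
func (h *minHeap) Pop() any {
	old := *h
	n := len(old)
	c := old[n-1]
	*h = old[:n-1]
	return c
}

// mergeSorted merges already-sorted slices into one sorted slice.
func mergeSorted(inputs ...[]int) []int {
	h := &minHeap{}
	for _, in := range inputs {
		if len(in) > 0 {
			*h = append(*h, &cursor{src: in})
		}
	}
	heap.Init(h)

	var out []int
	for h.Len() > 0 {
		c := (*h)[0] // cursor holding the smallest head among all inputs
		out = append(out, c.src[c.pos])
		c.pos++
		if c.pos < len(c.src) {
			heap.Fix(h, 0) // head value changed; restore heap order
		} else {
			heap.Pop(h) // this input is exhausted; drop its cursor
		}
	}
	return out
}

func main() {
	fmt.Println(mergeSorted([]int{1, 4, 9}, []int{2, 3, 10}, []int{5}))
	// prints [1 2 3 4 5 9 10]
}
```

A heap and a loser tree both select the next element in O(log k) time for k inputs. The loser tree is generally described as needing fewer comparisons per step, which may be why the library provides a dedicated package for it.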