xp

package module
Version: v0.1.0-alpha

Warning: This package is not in the latest version of its module.

Published: Dec 31, 2024 | License: MIT | Imports: 8 | Imported by: 0

README

XP


Because Windows XP was the best Microsoft Windows OS.

Windows are fundamental to processing unbounded data streams. They partition the stream into finite "buckets" over which computations such as counts or aggregations can run. This library implements basic windowing for streams using components borrowed from NoSQL storage engines: a sequential record format (recordio), a sorted string table (sstable), and a write-ahead log (wal). I developed it to make stream windowing possible without complex frameworks such as Apache Beam or Apache Flink.

Features

  • Window-based stream management: groups streaming records into finite windows so they can be processed as batches
  • Simple implementation: keeps both the design and its execution straightforward

Installation

go get github.com/davidvella/xp

Usage

A basic example:

package xp_test

import (
	"context"
	"fmt"
	"iter"
	"os"
	"time"

	"github.com/davidvella/xp"
	"github.com/davidvella/xp/handler"
	"github.com/davidvella/xp/partition"
	"github.com/davidvella/xp/partition/strategy/messagecount"
	"github.com/davidvella/xp/storage/local"
)

// ExampleProcessor demonstrates how to use the Processor type.
func ExampleProcessor() {
	pendingDir, err := os.MkdirTemp("", "pending-*")
	if err != nil {
		fmt.Printf("Failed to create temp dir: %v\n", err)
		return
	}
	defer os.RemoveAll(pendingDir) // the directory will not be empty, so os.Remove would fail

	publishedDir, err := os.MkdirTemp("", "published-*")
	if err != nil {
		fmt.Printf("Failed to create temp dir: %v\n", err)
		return
	}
	defer os.RemoveAll(publishedDir)

	// Create local storage backed by the pending and published directories
	storage := local.NewLocalStorage(pendingDir, publishedDir)

	// Create a handler that processes records
	h := handler.Func(func(_ context.Context, _ string, seq iter.Seq[partition.Record]) error {
		for rec := range seq {
			fmt.Printf("Processing record: %s\n", rec.GetID())
		}
		return nil
	})

	// Create a new processor with a message-count partition strategy (limit: 2)
	proc, err := xp.NewProcessor(
		storage,
		storage,
		h,
		xp.WithStrategy(messagecount.NewStrategy(2)),
		xp.WithMaxConcurrency(4), // Process up to 4 files concurrently
		xp.WithPollInterval(250*time.Millisecond),
	)
	if err != nil {
		fmt.Printf("Failed to create processor: %v\n", err)
		return
	}

	// Handle some records
	ctx := context.Background()
	err = proc.Handle(ctx, &partition.RecordImpl{
		Data:         []byte("record1"),
		ID:           "t1",
		PartitionKey: "same",
		Timestamp:    time.Unix(1000, 0),
	})
	if err != nil {
		fmt.Printf("Failed to handle record: %v\n", err)
		return
	}
	err = proc.Handle(ctx, &partition.RecordImpl{
		Data:         []byte("record2"),
		ID:           "t2",
		PartitionKey: "same",
		Timestamp:    time.Unix(1001, 0),
	})
	if err != nil {
		fmt.Printf("Failed to handle record: %v\n", err)
		return
	}

	// Give the consumer time to poll for and process published files
	time.Sleep(500 * time.Millisecond)

	// Gracefully shutdown the processor
	if err := proc.Stop(); err != nil {
		fmt.Printf("Failed to stop processor: %v\n", err)
		return
	}

	// Output:
	// Processing record: t1
	// Processing record: t2
}

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Why "XP"?

Named after Windows XP, arguably the most beloved Microsoft Windows operating system ever released.

Author

David Vella - @davidvella

Acknowledgments

  • Inspired by the legendary Windows XP
  • Uses a Go implementation of a tournament tree

Documentation


Types

type Option

type Option func(*options)

Option is a function that configures the processor options.

func WithMaxConcurrency

func WithMaxConcurrency(m int) Option

WithMaxConcurrency sets the maximum number of concurrent file processors.

func WithPollInterval

func WithPollInterval(interval time.Duration) Option

WithPollInterval sets the poll interval for the consumer.

func WithStrategy

func WithStrategy(strategy partition.Strategy) Option

WithStrategy sets the partition strategy.
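A partition strategy decides when the current window should close and a new one should start. The `messagecount` strategy used in the example presumably closes a window after a fixed number of records. The sketch below models that idea with a hypothetical `strategy` interface and `ShouldRotate` method; it is not the real `partition.Strategy` interface.

```go
package main

import "fmt"

// strategy is a hypothetical stand-in for partition.Strategy: it reports
// whether the current window should be closed before adding another record.
type strategy interface {
	ShouldRotate(recordsInWindow int) bool
}

// countStrategy closes a window once it holds max records.
type countStrategy struct{ max int }

func (s countStrategy) ShouldRotate(n int) bool { return n >= s.max }

// split applies a strategy to a sequence of record IDs and returns the windows.
func split(ids []string, s strategy) [][]string {
	var windows [][]string
	var current []string
	for _, id := range ids {
		if len(current) > 0 && s.ShouldRotate(len(current)) {
			windows = append(windows, current)
			current = nil
		}
		current = append(current, id)
	}
	if len(current) > 0 {
		windows = append(windows, current) // flush the final, possibly partial, window
	}
	return windows
}

func main() {
	fmt.Println(split([]string{"t1", "t2", "t3", "t4", "t5"}, countStrategy{max: 2}))
	// [[t1 t2] [t3 t4] [t5]]
}
```

Keeping the rotation rule behind an interface lets time-based or size-based strategies be swapped in without changing the code that writes records.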

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor manages both writing and consuming of records.

Example

ExampleProcessor demonstrates how to use the Processor type.

package main

import (
	"context"
	"fmt"
	"iter"
	"os"
	"time"

	"github.com/davidvella/xp"
	"github.com/davidvella/xp/handler"
	"github.com/davidvella/xp/partition"
	"github.com/davidvella/xp/partition/strategy/messagecount"
	"github.com/davidvella/xp/storage/local"
)

func main() {
	pendingDir, err := os.MkdirTemp("", "pending-*")
	if err != nil {
		fmt.Printf("Failed to create temp dir: %v\n", err)
		return
	}
	defer os.RemoveAll(pendingDir) // the directory will not be empty, so os.Remove would fail

	publishedDir, err := os.MkdirTemp("", "published-*")
	if err != nil {
		fmt.Printf("Failed to create temp dir: %v\n", err)
		return
	}
	defer os.RemoveAll(publishedDir)

	// Create local storage backed by the pending and published directories
	storage := local.NewLocalStorage(pendingDir, publishedDir)

	// Create a handler that processes records
	h := handler.Func(func(_ context.Context, _ string, seq iter.Seq[partition.Record]) error {
		for rec := range seq {
			fmt.Printf("Processing record: %s\n", rec.GetID())
		}
		return nil
	})

	// Create a new processor with a message-count partition strategy (limit: 2)
	proc, err := xp.NewProcessor(
		storage,
		storage,
		h,
		xp.WithStrategy(messagecount.NewStrategy(2)),
		xp.WithMaxConcurrency(4), // Process up to 4 files concurrently
		xp.WithPollInterval(250*time.Millisecond),
	)
	if err != nil {
		fmt.Printf("Failed to create processor: %v\n", err)
		return
	}

	// Handle some records
	ctx := context.Background()
	err = proc.Handle(ctx, &partition.RecordImpl{
		Data:         []byte("record1"),
		ID:           "t1",
		PartitionKey: "same",
		Timestamp:    time.Unix(1000, 0),
	})
	if err != nil {
		fmt.Printf("Failed to handle record: %v\n", err)
		return
	}
	err = proc.Handle(ctx, &partition.RecordImpl{
		Data:         []byte("record2"),
		ID:           "t2",
		PartitionKey: "same",
		Timestamp:    time.Unix(1001, 0),
	})
	if err != nil {
		fmt.Printf("Failed to handle record: %v\n", err)
		return
	}

	// Give the consumer time to poll for and process published files
	time.Sleep(500 * time.Millisecond)

	// Gracefully shutdown the processor
	if err := proc.Stop(); err != nil {
		fmt.Printf("Failed to stop processor: %v\n", err)
		return
	}

}
Output:

Processing record: t1
Processing record: t2

func NewProcessor

func NewProcessor(
	producerStorage processor.Storage,
	consumerStorage consumer.Storage,
	handler handler.Handler,
	opts ...Option,
) (*Processor, error)

NewProcessor creates a new processor instance with the given storage and handler.

func (*Processor) Handle

func (p *Processor) Handle(ctx context.Context, record partition.Record) error

Handle processes a single record through the producer.

func (*Processor) Stop

func (p *Processor) Stop() error

Stop gracefully shuts down both producer and consumer.

Directories

  • compactor: Package compactor implements streaming compaction of multiple sorted sequences into a single SSTable.
  • consumer: Package consumer provides functionality for processing published WAL (Write-Ahead Log) files in a concurrent and controlled manner.
  • loser: Package loser implements a tournament tree (also known as a loser tree) for efficiently merging multiple sorted sequences.
  • priority: Package priority implements a generic priority queue data structure that maintains a collection of key-value pairs ordered by priority.
  • recordio: Package recordio implements a binary record format for storing and retrieving partition.Record instances.
  • sstable: Package sstable implements a Sorted String Table (SSTable) for efficient storage and retrieval of key-value pairs.
  • storage
  • wal: Package wal implements a Write-Ahead Log (WAL) for durable and atomic record storage.
