mink

package module
v0.5.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

README

go-mink 🦫

A Comprehensive Event Sourcing & CQRS Toolkit for Go

Go Reference Go Report Card Build Status codecov License Go Version

Quality Gate Status Reliability Rating Security Rating Maintainability Rating

Bugs Vulnerabilities Technical Debt


🚀 Current Status: v0.5.0 (Phase 5 In Progress)

Phase 5 (Security & Advanced Patterns) progress:

  • ✅ Saga / Process Manager with compensation handling
  • ✅ Saga Store (PostgreSQL & Memory implementations)
  • ✅ Saga testing utilities
  • ✅ CLI tool with code generation & diagnostics (84.9% coverage)
  • 🔜 Outbox pattern for reliable messaging
  • 🔜 Field-level encryption (AWS KMS, HashiCorp Vault)
  • 🔜 GDPR compliance (crypto-shredding)

Previous phases included:

  • ✅ Event Store with optimistic concurrency (v0.1.0)
  • ✅ PostgreSQL & In-Memory adapters (v0.1.0)
  • ✅ Command Bus with middleware pipeline (v0.2.0)
  • ✅ Idempotency, Validation, Correlation tracking (v0.2.0)
  • ✅ Projection Engine & Read Models (v0.3.0)
  • ✅ Event subscriptions & checkpoint management (v0.3.0)
  • ✅ BDD testing fixtures, assertions, test containers (v0.4.0)
  • ✅ Prometheus metrics & OpenTelemetry tracing (v0.4.0)

What is go-mink?

go-mink is a batteries-included Event Sourcing and CQRS (Command Query Responsibility Segregation) library for Go. Inspired by MartenDB for .NET, go-mink brings the same developer-friendly experience to the Go ecosystem.

Why "go-mink"? Just as Marten (the animal) inspired the .NET library name, we chose go-mink - another member of the Mustelidae family - for our Go counterpart.

Vision

"Make Event Sourcing in Go as simple as using a traditional ORM"

go-mink aims to eliminate the boilerplate code typically required when implementing Event Sourcing in Go, while providing a pluggable architecture that allows teams to choose their preferred storage backends.

Key Features

Feature Status Description
🎯 Event Store ✅ v0.1.0 Append-only event storage with optimistic concurrency
🔌 PostgreSQL Adapter ✅ v0.1.0 Production-ready PostgreSQL support
🧪 Memory Adapter ✅ v0.1.0 In-memory adapter for testing
🧱 Aggregates ✅ v0.1.0 Base implementation with event application
📋 Command Bus ✅ v0.2.0 Full CQRS with command handlers and middleware
🔐 Idempotency ✅ v0.2.0 Prevent duplicate command processing
🔗 Correlation/Causation ✅ v0.2.0 Distributed tracing support
📖 Projections ✅ v0.3.0 Inline, async, and live projection engine
📊 Read Models ✅ v0.3.0 Generic repository with query builder
📡 Subscriptions ✅ v0.3.0 Catch-up and polling event subscriptions
🧪 Testing Utilities ✅ v0.4.0 BDD fixtures, assertions, test containers
📊 Observability ✅ v0.4.0 Prometheus metrics & OpenTelemetry tracing
📦 MessagePack ✅ v0.4.0 Alternative serializer for performance
🛠️ CLI Tool ✅ v0.5.0 Code generation, migrations, diagnostics (84.9% coverage)
Sagas ✅ v0.5.0 Process manager for long-running workflows
🔐 Security 🔜 v0.5.0 Field-level encryption and GDPR compliance
📤 Outbox Pattern 🔜 v0.5.0 Reliable event publishing to external systems

Quick Example

package main

import (
    "context"
    
    "github.com/AshkanYarmoradi/go-mink"
    "github.com/AshkanYarmoradi/go-mink/adapters/postgres"
)

func main() {
    ctx := context.Background()
    
    // Initialize PostgreSQL adapter
    adapter, _ := postgres.NewAdapter("postgres://localhost/mydb")
    defer adapter.Close()
    
    // Create event store
    store := mink.New(adapter)
    
    // Create and populate an aggregate
    order := NewOrder("order-123")
    order.Create("customer-456")
    order.AddItem("SKU-001", 2, 29.99)
    
    // Save aggregate (events are persisted)
    store.SaveAggregate(ctx, order)
    
    // Load aggregate (events are replayed)
    loaded := NewOrder("order-123")
    store.LoadAggregate(ctx, loaded)
}

CQRS with Command Bus (v0.2.0)

package main

import (
    "context"
    
    "github.com/AshkanYarmoradi/go-mink"
    "github.com/AshkanYarmoradi/go-mink/adapters/memory"
)

// Define a command
type CreateOrder struct {
    mink.CommandBase
    CustomerID string `json:"customerId"`
}

func (c CreateOrder) CommandType() string { return "CreateOrder" }
func (c CreateOrder) Validate() error {
    if c.CustomerID == "" {
        return mink.NewValidationError("CreateOrder", "CustomerID", "required")
    }
    return nil
}

func main() {
    ctx := context.Background()
    
    // Create command bus with middleware
    bus := mink.NewCommandBus()
    bus.Use(mink.ValidationMiddleware())
    bus.Use(mink.RecoveryMiddleware())
    bus.Use(mink.CorrelationIDMiddleware(nil))
    
    // Add idempotency (prevents duplicate processing)
    idempotencyStore := memory.NewIdempotencyStore()
    bus.Use(mink.IdempotencyMiddleware(mink.DefaultIdempotencyConfig(idempotencyStore)))
    
    // Register command handler
    bus.RegisterFunc("CreateOrder", func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
        c := cmd.(CreateOrder)
        // Process command...
        return mink.NewSuccessResult("order-123", 1), nil
    })
    
    // Dispatch command
    result, err := bus.Dispatch(ctx, CreateOrder{CustomerID: "cust-456"})
    if err != nil {
        panic(err)
    }
    
    fmt.Printf("Created order: %s (version %d)\n", result.AggregateID, result.Version)
}

Projections & Read Models (v0.3.0)

package main

import (
    "context"
    "time"

    "github.com/AshkanYarmoradi/go-mink"
    "github.com/AshkanYarmoradi/go-mink/adapters/memory"
)

// Define a read model
type OrderSummary struct {
    OrderID    string
    CustomerID string
    Status     string
    ItemCount  int
    Total      float64
}

// Define a projection
type OrderSummaryProjection struct {
    mink.ProjectionBase
    repo *mink.InMemoryRepository[OrderSummary]
}

func (p *OrderSummaryProjection) Apply(ctx context.Context, event mink.StoredEvent) error {
    // Transform events into read model updates
    // ...
    return nil
}

func main() {
    ctx := context.Background()

    // Create projection engine
    checkpointStore := memory.NewCheckpointStore()
    engine := mink.NewProjectionEngine(store,
        mink.WithCheckpointStore(checkpointStore),
    )

    // Register projections
    repo := mink.NewInMemoryRepository[OrderSummary](func(o *OrderSummary) string {
        return o.OrderID
    })
    engine.RegisterInline(&OrderSummaryProjection{repo: repo})

    // Start engine
    engine.Start(ctx)
    defer engine.Stop(ctx)

    // Query read models with fluent API
    orders, _ := repo.Query(ctx, mink.NewQuery().
        Where("Status", mink.Eq, "Pending").
        OrderByDesc("Total").
        WithLimit(10))

    // Rebuild projections when needed
    rebuilder := mink.NewProjectionRebuilder(store, checkpointStore)
    rebuilder.RebuildInline(ctx, projection, mink.RebuildOptions{BatchSize: 1000})
}

Testing Utilities (v0.4.0)

import (
    "github.com/AshkanYarmoradi/go-mink/testing/bdd"
    "github.com/AshkanYarmoradi/go-mink/testing/assertions"
    "github.com/AshkanYarmoradi/go-mink/testing/containers"
)

// BDD-style aggregate testing
func TestOrderCreation(t *testing.T) {
    order := NewOrder("order-123")

    bdd.Given(t, order).
        When(func() error {
            return order.Create("customer-456")
        }).
        Then(OrderCreated{OrderID: "order-123", CustomerID: "customer-456"})
}

// Event assertions
assertions.AssertEventTypes(t, events, "OrderCreated", "ItemAdded")

// PostgreSQL test containers
container := containers.StartPostgres(t)
db := container.MustDB(ctx)

Observability (v0.4.0)

import (
    "github.com/AshkanYarmoradi/go-mink/middleware/metrics"
    "github.com/AshkanYarmoradi/go-mink/middleware/tracing"
)

// Prometheus metrics
m := metrics.New(metrics.WithMetricsServiceName("order-service"))
m.MustRegister()
bus.Use(m.CommandMiddleware())

// OpenTelemetry tracing
tracer := tracing.NewTracer(tracing.WithServiceName("order-service"))
bus.Use(tracer.CommandMiddleware())

Installation

go get github.com/AshkanYarmoradi/go-mink
go get github.com/AshkanYarmoradi/go-mink/adapters/postgres

Documentation

Document Description
Introduction Problem statement and goals
Architecture System design and components
Event Store Event storage design
Read Models Projection system
Adapters Database adapter system
CLI Command-line tooling
API Design Public API reference
Roadmap Development phases
Advanced Patterns Commands, Sagas, Outbox
Security Encryption, GDPR, Versioning
Testing BDD fixtures and test utilities

License

Apache License 2.0 - See LICENSE for details.


go-mink - Event Sourcing for Go, Done Right.

Documentation

Overview

Package mink provides event sourcing and CQRS primitives for Go applications. It offers a simple, flexible API for building event-sourced systems with support for multiple database backends.

Package mink provides event sourcing and CQRS primitives for Go applications.

go-mink is an Event Sourcing library for Go that makes it easy to build applications using event sourcing patterns. It provides a simple API for storing events, loading aggregates, and projecting read models.

Quick Start

Create an event store with the in-memory adapter for development:

import (
    "github.com/AshkanYarmoradi/go-mink"
    "github.com/AshkanYarmoradi/go-mink/adapters/memory"
)

store := mink.New(memory.NewAdapter())

For production, use the PostgreSQL adapter:

import (
    "github.com/AshkanYarmoradi/go-mink"
    "github.com/AshkanYarmoradi/go-mink/adapters/postgres"
)

adapter, err := postgres.NewAdapter(ctx, connStr)
if err != nil {
    log.Fatal(err)
}
store := mink.New(adapter)

Defining Events

Events are simple structs that represent something that happened in your domain:

type OrderCreated struct {
    OrderID    string `json:"orderId"`
    CustomerID string `json:"customerId"`
}

type ItemAdded struct {
    OrderID  string  `json:"orderId"`
    SKU      string  `json:"sku"`
    Quantity int     `json:"quantity"`
    Price    float64 `json:"price"`
}

Register events with the store so they can be serialized and deserialized:

store.RegisterEvents(OrderCreated{}, ItemAdded{})

Defining Aggregates

Aggregates are domain objects that encapsulate business logic and generate events:

type Order struct {
    mink.AggregateBase
    CustomerID string
    Items      []OrderItem
    Status     string
}

func NewOrder(id string) *Order {
    return &Order{
        AggregateBase: mink.NewAggregateBase(id, "Order"),
    }
}

func (o *Order) Create(customerID string) {
    o.Apply(OrderCreated{OrderID: o.AggregateID(), CustomerID: customerID})
    o.CustomerID = customerID
    o.Status = "Created"
}

func (o *Order) ApplyEvent(event interface{}) error {
    switch e := event.(type) {
    case OrderCreated:
        o.CustomerID = e.CustomerID
        o.Status = "Created"
    case ItemAdded:
        o.Items = append(o.Items, OrderItem{SKU: e.SKU, Quantity: e.Quantity, Price: e.Price})
    }
    // NOTE: Version is managed automatically by LoadAggregate and SaveAggregate.
    // You do NOT need to call IncrementVersion() here.
    return nil
}

Saving and Loading Aggregates

Save aggregates to persist their uncommitted events:

order := NewOrder("order-123")
order.Create("customer-456")
order.AddItem("SKU-001", 2, 29.99)

err := store.SaveAggregate(ctx, order)

Load aggregates to rebuild state from events:

loaded := NewOrder("order-123")
err := store.LoadAggregate(ctx, loaded)
// loaded.Status == "Created"
// loaded.Items contains the added item
// loaded.Version() returns the number of events in the stream

Low-Level Event Operations

Append events directly to a stream:

events := []interface{}{
    OrderCreated{OrderID: "123", CustomerID: "456"},
    ItemAdded{OrderID: "123", SKU: "SKU-001", Quantity: 2, Price: 29.99},
}
err := store.Append(ctx, "Order-123", events)

Load events from a stream:

events, err := store.Load(ctx, "Order-123")

Optimistic Concurrency

Use expected versions to prevent concurrent modifications:

// Create new stream (must not exist)
err := store.Append(ctx, "Order-123", events, mink.ExpectVersion(mink.NoStream))

// Append to existing stream at specific version
err := store.Append(ctx, "Order-123", events, mink.ExpectVersion(1))

Version constants:

  • AnyVersion (-1): Skip version check
  • NoStream (0): Stream must not exist
  • StreamExists (-2): Stream must exist

Metadata

Add metadata to events for tracing and multi-tenancy:

metadata := mink.Metadata{}.
    WithUserID("user-123").
    WithCorrelationID("corr-456").
    WithTenantID("tenant-789")

err := store.Append(ctx, "Order-123", events, mink.WithAppendMetadata(metadata))

Commands and CQRS (v0.2.0)

Define commands to encapsulate user intentions:

type CreateOrder struct {
    mink.CommandBase
    CustomerID string `json:"customerId"`
}

func (c CreateOrder) CommandType() string { return "CreateOrder" }
func (c CreateOrder) Validate() error {
    if c.CustomerID == "" {
        return mink.NewValidationError("CustomerID", "required")
    }
    return nil
}

Create a command bus with middleware:

bus := mink.NewCommandBus()
bus.Use(mink.ValidationMiddleware())
bus.Use(mink.RecoveryMiddleware(func(err error) { log.Error(err) }))
bus.Use(mink.LoggingMiddleware(logger, nil))

Register command handlers:

bus.Register("CreateOrder", func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
    c := cmd.(CreateOrder)
    order := NewOrder(uuid.New().String())
    order.Create(c.CustomerID)
    if err := store.SaveAggregate(ctx, order); err != nil {
        return mink.NewErrorResult(err), err
    }
    return mink.NewSuccessResult(order.AggregateID(), order.Version()), nil
})

Dispatch commands:

result, err := bus.Dispatch(ctx, CreateOrder{CustomerID: "cust-123"})

Idempotency

Prevent duplicate command processing with idempotency:

idempotencyStore := memory.NewIdempotencyStore()
config := mink.DefaultIdempotencyConfig(idempotencyStore)
bus.Use(mink.IdempotencyMiddleware(config))

Make commands idempotent by implementing IdempotentCommand:

func (c CreateOrder) IdempotencyKey() string { return c.RequestID }

Index

Constants

View Source
const (
	// AnyVersion skips version checking, allowing append regardless of current version.
	AnyVersion int64 = -1

	// NoStream indicates the stream must not exist (for creating new streams).
	NoStream int64 = 0

	// StreamExists indicates the stream must exist (for appending to existing streams).
	StreamExists int64 = -2
)

Version constants for optimistic concurrency control.

View Source
const (
	SagaStatusStarted            = adapters.SagaStatusStarted
	SagaStatusRunning            = adapters.SagaStatusRunning
	SagaStatusCompleted          = adapters.SagaStatusCompleted
	SagaStatusFailed             = adapters.SagaStatusFailed
	SagaStatusCompensating       = adapters.SagaStatusCompensating
	SagaStatusCompensated        = adapters.SagaStatusCompensated
	SagaStatusCompensationFailed = adapters.SagaStatusCompensationFailed
)

Re-export saga status constants from adapters.

View Source
const (
	SagaStepPending     = adapters.SagaStepPending
	SagaStepRunning     = adapters.SagaStepRunning
	SagaStepCompleted   = adapters.SagaStepCompleted
	SagaStepFailed      = adapters.SagaStepFailed
	SagaStepCompensated = adapters.SagaStepCompensated
)

Re-export saga step status constants from adapters.

Variables

View Source
var (
	// ErrStreamNotFound indicates the requested stream does not exist.
	ErrStreamNotFound = adapters.ErrStreamNotFound

	// ErrConcurrencyConflict indicates an optimistic concurrency violation.
	ErrConcurrencyConflict = adapters.ErrConcurrencyConflict

	// ErrEventNotFound indicates the requested event does not exist.
	ErrEventNotFound = errors.New("mink: event not found")

	// ErrSerializationFailed indicates event serialization/deserialization failed.
	ErrSerializationFailed = errors.New("mink: serialization failed")

	// ErrEventTypeNotRegistered indicates an unknown event type was encountered.
	ErrEventTypeNotRegistered = errors.New("mink: event type not registered")

	// ErrNilAggregate indicates a nil aggregate was passed.
	ErrNilAggregate = errors.New("mink: nil aggregate")

	// ErrNilStore indicates a nil event store was passed.
	ErrNilStore = errors.New("mink: nil event store")

	// ErrEmptyStreamID indicates an empty stream ID was provided.
	ErrEmptyStreamID = adapters.ErrEmptyStreamID

	// ErrNoEvents indicates no events were provided for append.
	ErrNoEvents = adapters.ErrNoEvents

	// ErrInvalidVersion indicates an invalid version number was provided.
	ErrInvalidVersion = adapters.ErrInvalidVersion

	// ErrAdapterClosed indicates the adapter has been closed.
	ErrAdapterClosed = adapters.ErrAdapterClosed

	// ErrSubscriptionNotSupported indicates the adapter does not support subscriptions.
	ErrSubscriptionNotSupported = errors.New("mink: adapter does not support subscriptions")

	// ErrHandlerNotFound indicates no handler is registered for a command type.
	ErrHandlerNotFound = errors.New("mink: handler not found")

	// ErrValidationFailed indicates command validation failed.
	ErrValidationFailed = errors.New("mink: validation failed")

	// ErrCommandAlreadyProcessed indicates an idempotent command was already processed.
	ErrCommandAlreadyProcessed = errors.New("mink: command already processed")

	// ErrNilCommand indicates a nil command was passed.
	ErrNilCommand = errors.New("mink: nil command")

	// ErrHandlerPanicked indicates a handler panicked during execution.
	ErrHandlerPanicked = errors.New("mink: handler panicked")

	// ErrCommandBusClosed indicates the command bus has been closed.
	ErrCommandBusClosed = errors.New("mink: command bus closed")
)

Sentinel errors for common error conditions. Use errors.Is() to check for these errors. These errors are aliases to the adapters package errors for compatibility.

View Source
var (
	// ErrNilProjection indicates a nil projection was passed.
	ErrNilProjection = errors.New("mink: nil projection")

	// ErrEmptyProjectionName indicates a projection has no name.
	ErrEmptyProjectionName = errors.New("mink: projection name is required")

	// ErrProjectionNotFound indicates the requested projection does not exist.
	ErrProjectionNotFound = errors.New("mink: projection not found")

	// ErrProjectionAlreadyRegistered indicates a projection with the same name is already registered.
	ErrProjectionAlreadyRegistered = errors.New("mink: projection already registered")

	// ErrProjectionEngineAlreadyRunning indicates the projection engine is already running.
	ErrProjectionEngineAlreadyRunning = errors.New("mink: projection engine already running")

	// ErrProjectionEngineStopped indicates the projection engine has been stopped.
	ErrProjectionEngineStopped = errors.New("mink: projection engine stopped")

	// ErrNoCheckpointStore indicates no checkpoint store was configured.
	ErrNoCheckpointStore = errors.New("mink: checkpoint store is required")

	// ErrNotImplemented indicates a method is not implemented.
	ErrNotImplemented = errors.New("mink: not implemented")

	// ErrProjectionFailed indicates a projection failed to process an event.
	ErrProjectionFailed = errors.New("mink: projection failed")
)
View Source
var (
	// ErrNotFound indicates the requested entity was not found.
	ErrNotFound = errors.New("mink: not found")

	// ErrAlreadyExists indicates the entity already exists.
	ErrAlreadyExists = errors.New("mink: already exists")

	// ErrInvalidQuery indicates the query is invalid.
	ErrInvalidQuery = errors.New("mink: invalid query")
)

Repository errors

View Source
var (
	// ErrSagaNotFound indicates the requested saga does not exist.
	ErrSagaNotFound = adapters.ErrSagaNotFound

	// ErrSagaAlreadyExists indicates a saga with the same ID already exists.
	ErrSagaAlreadyExists = adapters.ErrSagaAlreadyExists

	// ErrSagaCompleted indicates the saga has already completed.
	ErrSagaCompleted = errors.New("mink: saga already completed")

	// ErrSagaFailed indicates the saga has failed.
	ErrSagaFailed = errors.New("mink: saga failed")

	// ErrSagaCompensating indicates the saga is currently compensating.
	ErrSagaCompensating = errors.New("mink: saga is compensating")

	// ErrNoSagaHandler indicates no handler is registered for the event type.
	ErrNoSagaHandler = errors.New("mink: no saga handler for event")
)

Saga-related sentinel errors.

Functions

func BuildStreamID

func BuildStreamID(aggregateType, aggregateID string) string

BuildStreamID creates a stream ID from an aggregate type and ID. This follows the convention: "{Type}-{ID}"

func CausationIDFromContext added in v0.1.10

func CausationIDFromContext(ctx context.Context) string

CausationIDFromContext returns the causation ID from context.

func CorrelationIDFromContext added in v0.1.10

func CorrelationIDFromContext(ctx context.Context) string

CorrelationIDFromContext returns the correlation ID from context.

func GenerateIdempotencyKey added in v0.1.10

func GenerateIdempotencyKey(cmd Command) string

GenerateIdempotencyKey generates an idempotency key from a command. The key is based on the command type and its JSON-serialized content.

func GetCommandType added in v0.1.10

func GetCommandType(cmd interface{}) string

GetCommandType returns the type name of a command using reflection. This is useful for commands that don't embed CommandBase.

func GetEventType

func GetEventType(event interface{}) string

GetEventType returns the event type name for the given event. It uses the struct name as the type name.

func GetIdempotencyKey added in v0.1.10

func GetIdempotencyKey(cmd Command) string

GetIdempotencyKey returns the idempotency key for a command. If the command implements IdempotentCommand, it uses that key. Otherwise, it generates a key from the command content.

func IdempotencyKeyFromField added in v0.1.10

func IdempotencyKeyFromField(fieldGetter func(Command) string) func(Command) string

IdempotencyKeyFromField extracts the idempotency key from a field in the command. If the field is empty, it falls back to GenerateIdempotencyKey.

func IdempotencyKeyPrefix added in v0.1.10

func IdempotencyKeyPrefix(prefix string) func(Command) string

IdempotencyKeyPrefix is a convenience function to create a prefixed idempotency key.

func RegisterGenericHandler added in v0.1.10

func RegisterGenericHandler[C Command](registry *HandlerRegistry, handler func(ctx context.Context, cmd C) (CommandResult, error))

RegisterGenericHandler is a convenience function to register a generic handler.

func SagaStateToJSON added in v0.5.0

func SagaStateToJSON(state *SagaState) ([]byte, error)

SagaStateToJSON converts saga state to JSON for persistence.

func ShouldHandleEventType added in v0.3.8

func ShouldHandleEventType(handledEvents []string, eventType string) bool

ShouldHandleEventType checks if an event type should be handled given a list of handled events. Returns true if handledEvents is empty (meaning all events are handled) or if eventType is in the list. This is a utility function used by projection engine and rebuilder to filter events.

func TenantIDFromContext added in v0.1.10

func TenantIDFromContext(ctx context.Context) string

TenantIDFromContext returns the tenant ID from context.

func Version

func Version() string

Version returns the library version string.

func WithCausationID added in v0.1.10

func WithCausationID(ctx context.Context, causationID string) context.Context

WithCausationID returns a context with the causation ID set.

func WithTenantID added in v0.1.10

func WithTenantID(ctx context.Context, tenantID string) context.Context

WithTenantID returns a context with the tenant ID set.

Types

type Aggregate

type Aggregate interface {
	// AggregateID returns the unique identifier for this aggregate instance.
	AggregateID() string

	// AggregateType returns the type/category of this aggregate (e.g., "Order", "Customer").
	AggregateType() string

	// Version returns the current version of the aggregate.
	// This is the number of events that have been applied.
	Version() int64

	// ApplyEvent applies an event to update the aggregate's state.
	// This method should be idempotent and deterministic.
	ApplyEvent(event interface{}) error

	// UncommittedEvents returns events that have been applied but not yet persisted.
	UncommittedEvents() []interface{}

	// ClearUncommittedEvents removes all uncommitted events after successful persistence.
	ClearUncommittedEvents()
}

Aggregate defines the interface for event-sourced aggregates. An aggregate is a domain object whose state is derived from a sequence of events.

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase provides a default partial implementation of the Aggregate interface. Embed this struct in your aggregate types to get default behavior.

func NewAggregateBase

func NewAggregateBase(id, aggregateType string) AggregateBase

NewAggregateBase creates a new AggregateBase with the given ID and type.

func (*AggregateBase) AggregateID

func (a *AggregateBase) AggregateID() string

AggregateID returns the aggregate's unique identifier.

func (*AggregateBase) AggregateType

func (a *AggregateBase) AggregateType() string

AggregateType returns the aggregate type.

func (*AggregateBase) Apply

func (a *AggregateBase) Apply(event interface{})

Apply records an event as uncommitted. This should be called by the aggregate after creating a new event. The aggregate should also update its internal state based on the event.

func (*AggregateBase) ClearUncommittedEvents

func (a *AggregateBase) ClearUncommittedEvents()

ClearUncommittedEvents removes all uncommitted events.

func (*AggregateBase) HasUncommittedEvents

func (a *AggregateBase) HasUncommittedEvents() bool

HasUncommittedEvents returns true if there are events waiting to be persisted.

func (*AggregateBase) IncrementVersion

func (a *AggregateBase) IncrementVersion()

IncrementVersion increments the aggregate version by 1.

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(id string)

SetID sets the aggregate's ID.

func (*AggregateBase) SetType

func (a *AggregateBase) SetType(t string)

SetType sets the aggregate type.

func (*AggregateBase) SetVersion

func (a *AggregateBase) SetVersion(v int64)

SetVersion sets the aggregate version.

func (*AggregateBase) StreamID

func (a *AggregateBase) StreamID() StreamID

StreamID returns the stream ID for this aggregate. The stream ID is composed of the aggregate type and ID.

func (*AggregateBase) UncommittedEvents

func (a *AggregateBase) UncommittedEvents() []interface{}

UncommittedEvents returns events that haven't been persisted yet.

func (*AggregateBase) Version

func (a *AggregateBase) Version() int64

Version returns the current version of the aggregate.

type AggregateCommand added in v0.1.10

type AggregateCommand interface {
	Command

	// AggregateID returns the ID of the aggregate this command targets.
	// Returns empty string for commands that create new aggregates.
	AggregateID() string
}

AggregateCommand is a command that targets a specific aggregate.

type AggregateFactory

type AggregateFactory func(id string) Aggregate

AggregateFactory creates new aggregate instances.

type AggregateHandler added in v0.1.10

type AggregateHandler[C AggregateCommand, A Aggregate] struct {
	// contains filtered or unexported fields
}

AggregateHandler is a handler that works with aggregates and an event store. It loads the aggregate, executes the command, and saves the results.

func NewAggregateHandler added in v0.1.10

func NewAggregateHandler[C AggregateCommand, A Aggregate](config AggregateHandlerConfig[C, A]) *AggregateHandler[C, A]

NewAggregateHandler creates a new AggregateHandler.

func (*AggregateHandler[C, A]) CommandType added in v0.1.10

func (h *AggregateHandler[C, A]) CommandType() string

CommandType returns the command type this handler processes.

func (*AggregateHandler[C, A]) Handle added in v0.1.10

func (h *AggregateHandler[C, A]) Handle(ctx context.Context, cmd Command) (CommandResult, error)

Handle loads the aggregate, executes the command, and saves the aggregate.

type AggregateHandlerConfig added in v0.1.10

type AggregateHandlerConfig[C AggregateCommand, A Aggregate] struct {
	Store     *EventStore
	Factory   func(id string) A
	Executor  func(ctx context.Context, agg A, cmd C) error
	NewIDFunc func() string
}

AggregateHandlerConfig configures an AggregateHandler.

type AggregateRoot

type AggregateRoot interface {
	Aggregate

	// GetID is an alias for AggregateID for DDD conventions.
	GetID() string
}

AggregateRoot is an extended interface that includes domain-driven design patterns.

type AppendOption

type AppendOption func(*appendConfig)

AppendOption configures an append operation.

func ExpectVersion

func ExpectVersion(v int64) AppendOption

ExpectVersion sets the expected stream version for optimistic concurrency.

func WithAppendMetadata

func WithAppendMetadata(m Metadata) AppendOption

WithMetadata sets metadata for all events in the append operation.

type AsyncOptions added in v0.3.0

type AsyncOptions struct {
	// BatchSize is the maximum number of events to process in a batch.
	// Default: 100
	BatchSize int

	// BatchTimeout is the maximum time to wait for a full batch.
	// Default: 1 second
	BatchTimeout time.Duration

	// PollInterval is how often to poll for new events when idle.
	// Default: 100ms
	PollInterval time.Duration

	// RetryPolicy defines how to handle errors.
	RetryPolicy RetryPolicy

	// MaxRetries is the maximum number of retries for failed events.
	// Default: 3
	MaxRetries int

	// StartFromBeginning starts processing from the beginning of the event stream.
	// If false, starts from the last checkpoint.
	// Default: false
	StartFromBeginning bool
}

AsyncOptions configures async projection behavior.

func DefaultAsyncOptions added in v0.3.0

func DefaultAsyncOptions() AsyncOptions

DefaultAsyncOptions returns the default async projection options.

type AsyncProjection added in v0.3.0

type AsyncProjection interface {
	Projection

	// Apply processes a single event asynchronously.
	Apply(ctx context.Context, event StoredEvent) error

	// ApplyBatch processes multiple events in a single batch for efficiency.
	// If not supported, implementations should process events sequentially.
	ApplyBatch(ctx context.Context, events []StoredEvent) error
}

AsyncProjection processes events asynchronously in the background. This provides eventual consistency but better write performance and scalability.

type AsyncProjectionBase added in v0.3.0

type AsyncProjectionBase struct {
	ProjectionBase
}

AsyncProjectionBase provides a default implementation of AsyncProjection. Embed this struct and override Apply to create async projections.

func NewAsyncProjectionBase added in v0.3.0

func NewAsyncProjectionBase(name string, handledEvents ...string) AsyncProjectionBase

NewAsyncProjectionBase creates a new AsyncProjectionBase.

func (*AsyncProjectionBase) ApplyBatch added in v0.3.0

func (p *AsyncProjectionBase) ApplyBatch(ctx context.Context, events []StoredEvent) error

ApplyBatch provides a default implementation that processes events sequentially. Override this method for custom batch processing logic.

type AsyncResult added in v0.5.2

type AsyncResult struct {
	// contains filtered or unexported fields
}

AsyncResult represents the result of an asynchronous operation. It provides methods to wait for completion and check the result.

func (*AsyncResult) Cancel added in v0.5.2

func (r *AsyncResult) Cancel()

Cancel cancels the async operation's context. This can be used to stop a long-running operation early.

func (*AsyncResult) Context added in v0.5.2

func (r *AsyncResult) Context() context.Context

Context returns the context for this async operation.

func (*AsyncResult) Done added in v0.5.2

func (r *AsyncResult) Done() <-chan struct{}

Done returns a channel that is closed when the operation completes. Use this in select statements for non-blocking wait patterns.

Example:

select {
case <-result.Done():
    if err := result.Err(); err != nil {
        log.Printf("Failed: %v", err)
    }
case <-time.After(5 * time.Second):
    result.Cancel()
    log.Println("Timed out")
}

func (*AsyncResult) Err added in v0.5.2

func (r *AsyncResult) Err() error

Err returns the error from the completed operation, or nil if not yet complete or successful.

func (*AsyncResult) IsComplete added in v0.5.2

func (r *AsyncResult) IsComplete() bool

IsComplete returns true if the operation has completed (successfully or with an error).

func (*AsyncResult) Wait added in v0.5.2

func (r *AsyncResult) Wait() error

Wait blocks until the operation completes and returns any error. This is a convenience method equivalent to <-result.Done(); return result.Err()

func (*AsyncResult) WaitWithTimeout added in v0.5.2

func (r *AsyncResult) WaitWithTimeout(timeout time.Duration) error

WaitWithTimeout blocks until the operation completes or the timeout expires. Returns context.DeadlineExceeded if the timeout is reached before completion.

type CatchupSubscription added in v0.3.0

type CatchupSubscription struct {
	// contains filtered or unexported fields
}

CatchupSubscription provides catch-up subscription functionality. It first reads historical events from the event store, then switches to polling for new events. This ensures no events are missed during the transition.

func NewCatchupSubscription added in v0.3.0

func NewCatchupSubscription(
	store *EventStore,
	fromPosition uint64,
	opts ...SubscriptionOptions,
) (*CatchupSubscription, error)

NewCatchupSubscription creates a new catch-up subscription. Call Start() to begin receiving events from the specified position.

func (*CatchupSubscription) Close added in v0.3.0

func (s *CatchupSubscription) Close() error

Close stops the subscription.

func (*CatchupSubscription) Err added in v0.3.0

func (s *CatchupSubscription) Err() error

Err returns any error that caused the subscription to close.

func (*CatchupSubscription) Events added in v0.3.0

func (s *CatchupSubscription) Events() <-chan StoredEvent

Events returns the channel for receiving events.

func (*CatchupSubscription) Position added in v0.3.0

func (s *CatchupSubscription) Position() uint64

Position returns the current position of the subscription.

func (*CatchupSubscription) Start added in v0.3.0

func (s *CatchupSubscription) Start(ctx context.Context, pollInterval time.Duration) error

Start begins the catch-up subscription with the specified poll interval. It first catches up on historical events, then polls for new events.

type CategoryFilter added in v0.3.0

type CategoryFilter struct {
	// contains filtered or unexported fields
}

CategoryFilter filters events by stream category.

func NewCategoryFilter added in v0.3.0

func NewCategoryFilter(category string) *CategoryFilter

NewCategoryFilter creates a filter that only matches events from streams in the category.

func (*CategoryFilter) Matches added in v0.3.0

func (f *CategoryFilter) Matches(event StoredEvent) bool

Matches returns true if the event's stream is in the category.

type Checkpoint added in v0.3.0

type Checkpoint struct {
	// ProjectionName is the name of the projection.
	ProjectionName string

	// Position is the global position of the last processed event.
	Position uint64

	// UpdatedAt is when the checkpoint was last updated.
	UpdatedAt time.Time
}

Checkpoint represents a stored checkpoint record.

type CheckpointStore added in v0.3.0

type CheckpointStore interface {
	// GetCheckpoint returns the last processed position for a projection.
	// Returns 0 if no checkpoint exists.
	GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)

	// SetCheckpoint stores the last processed position for a projection.
	SetCheckpoint(ctx context.Context, projectionName string, position uint64) error

	// DeleteCheckpoint removes the checkpoint for a projection.
	DeleteCheckpoint(ctx context.Context, projectionName string) error

	// GetAllCheckpoints returns checkpoints for all projections.
	GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)
}

CheckpointStore manages projection checkpoints. Checkpoints track the last processed position for each projection.

type Clearable added in v0.3.0

type Clearable interface {
	// Clear removes all data from the read model.
	Clear(ctx context.Context) error
}

Clearable is an interface for projections that can clear their read model.

type Command added in v0.1.10

type Command interface {
	// CommandType returns the type identifier for this command (e.g., "CreateOrder").
	CommandType() string

	// Validate checks if the command is valid.
	// Returns nil if valid, or an error describing validation failures.
	Validate() error
}

Command represents an intent to change state in the system. Commands are the write side of CQRS and should be validated before execution.

type CommandBase added in v0.1.10

type CommandBase struct {
	// CommandID is an optional unique identifier for this command instance.
	CommandID string `json:"commandId,omitempty"`

	// CorrelationID links related commands and events for distributed tracing.
	CorrelationID string `json:"correlationId,omitempty"`

	// CausationID identifies the event or command that caused this command.
	CausationID string `json:"causationId,omitempty"`

	// Metadata contains arbitrary key-value pairs for application-specific data.
	Metadata map[string]string `json:"metadata,omitempty"`
}

CommandBase provides a default partial implementation of Command. Embed this struct in your command types to get common functionality.

func (CommandBase) GetCausationID added in v0.1.10

func (c CommandBase) GetCausationID() string

GetCausationID returns the causation ID.

func (CommandBase) GetCommandID added in v0.1.10

func (c CommandBase) GetCommandID() string

GetCommandID returns the command ID.

func (CommandBase) GetCorrelationID added in v0.1.10

func (c CommandBase) GetCorrelationID() string

GetCorrelationID returns the correlation ID.

func (CommandBase) GetMetadata added in v0.1.10

func (c CommandBase) GetMetadata(key string) string

GetMetadata returns the value for a metadata key, or empty string if not found.

func (CommandBase) WithCausationID added in v0.1.10

func (c CommandBase) WithCausationID(id string) CommandBase

WithCausationID returns a copy of CommandBase with the causation ID set.

func (CommandBase) WithCommandID added in v0.1.10

func (c CommandBase) WithCommandID(id string) CommandBase

WithCommandID returns a copy of CommandBase with the command ID set.

func (CommandBase) WithCorrelationID added in v0.1.10

func (c CommandBase) WithCorrelationID(id string) CommandBase

WithCorrelationID returns a copy of CommandBase with the correlation ID set.

func (CommandBase) WithMetadata added in v0.1.10

func (c CommandBase) WithMetadata(key, value string) CommandBase

WithMetadata returns a copy of CommandBase with a metadata key-value pair added.

type CommandBus added in v0.1.10

type CommandBus struct {
	// contains filtered or unexported fields
}

CommandBus orchestrates command dispatching with middleware support. It routes commands to their handlers through a configurable middleware pipeline.

func NewCommandBus added in v0.1.10

func NewCommandBus(opts ...CommandBusOption) *CommandBus

NewCommandBus creates a new CommandBus with the given options.

func (*CommandBus) Close added in v0.1.10

func (b *CommandBus) Close() error

Close closes the command bus, preventing further dispatch operations.

func (*CommandBus) Dispatch added in v0.1.10

func (b *CommandBus) Dispatch(ctx context.Context, cmd Command) (CommandResult, error)

Dispatch sends a command through the middleware pipeline to its handler.

func (*CommandBus) DispatchAll added in v0.1.10

func (b *CommandBus) DispatchAll(ctx context.Context, cmds ...Command) ([]DispatchResult, error)

DispatchAll dispatches multiple commands and returns all results. Commands are dispatched sequentially in order.

func (*CommandBus) DispatchAsync added in v0.1.10

func (b *CommandBus) DispatchAsync(ctx context.Context, cmd Command) <-chan DispatchResult

DispatchAsync sends a command asynchronously and returns immediately. The result can be retrieved through the returned channel.

func (*CommandBus) HandlerCount added in v0.1.10

func (b *CommandBus) HandlerCount() int

HandlerCount returns the number of registered handlers.

func (*CommandBus) HasHandler added in v0.1.10

func (b *CommandBus) HasHandler(cmdType string) bool

HasHandler returns true if a handler is registered for the command type.

func (*CommandBus) IsClosed added in v0.1.10

func (b *CommandBus) IsClosed() bool

IsClosed returns true if the command bus has been closed.

func (*CommandBus) MiddlewareCount added in v0.1.10

func (b *CommandBus) MiddlewareCount() int

MiddlewareCount returns the number of registered middleware.

func (*CommandBus) Register added in v0.1.10

func (b *CommandBus) Register(handler CommandHandler)

Register adds a handler to the command bus.

func (*CommandBus) RegisterFunc added in v0.1.10

func (b *CommandBus) RegisterFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error))

RegisterFunc registers a handler function for a command type.

func (*CommandBus) Use added in v0.1.10

func (b *CommandBus) Use(middleware ...Middleware)

Use adds middleware to the command bus. Middleware is executed in the order it was added.

type CommandBusOption added in v0.1.10

type CommandBusOption func(*CommandBus)

CommandBusOption configures a CommandBus.

func WithHandlerRegistry added in v0.1.10

func WithHandlerRegistry(registry *HandlerRegistry) CommandBusOption

WithHandlerRegistry sets a custom handler registry.

func WithMiddleware added in v0.1.10

func WithMiddleware(middleware ...Middleware) CommandBusOption

WithMiddleware adds middleware to the command bus.

type CommandContext added in v0.1.10

type CommandContext struct {
	// Context is the standard Go context.
	Context context.Context

	// Command is the command being executed.
	Command Command

	// Result is the command execution result (set by handler).
	Result CommandResult

	// Metadata contains additional context data that can be set by middleware.
	Metadata map[string]interface{}
}

CommandContext carries command execution context through the middleware chain.

func NewCommandContext added in v0.1.10

func NewCommandContext(ctx context.Context, cmd Command) *CommandContext

NewCommandContext creates a new CommandContext.

func (*CommandContext) Get added in v0.1.10

func (c *CommandContext) Get(key string) (interface{}, bool)

Get retrieves a value from the context metadata.

func (*CommandContext) GetString added in v0.1.10

func (c *CommandContext) GetString(key string) string

GetString retrieves a string value from the context metadata.

func (*CommandContext) Set added in v0.1.10

func (c *CommandContext) Set(key string, value interface{})

Set stores a value in the context metadata.

func (*CommandContext) SetError added in v0.1.10

func (c *CommandContext) SetError(err error)

SetError sets an error result.

func (*CommandContext) SetResult added in v0.1.10

func (c *CommandContext) SetResult(result CommandResult)

SetResult sets the command execution result.

func (*CommandContext) SetSuccess added in v0.1.10

func (c *CommandContext) SetSuccess(aggregateID string, version int64)

SetSuccess sets a successful result.

type CommandDispatcher added in v0.1.10

type CommandDispatcher interface {
	// Dispatch sends a command to its handler and returns the result.
	Dispatch(ctx context.Context, cmd Command) (CommandResult, error)
}

CommandDispatcher can dispatch commands to handlers.

type CommandHandler added in v0.1.10

type CommandHandler interface {
	// CommandType returns the type of command this handler processes.
	CommandType() string

	// Handle processes the command and returns a result.
	Handle(ctx context.Context, cmd Command) (CommandResult, error)
}

CommandHandler is the interface for handling a specific command type. Handlers contain the business logic for processing commands.

type CommandHandlerFunc added in v0.1.10

type CommandHandlerFunc struct {
	// contains filtered or unexported fields
}

CommandHandlerFunc is a function type that implements CommandHandler.

func NewCommandHandlerFunc added in v0.1.10

func NewCommandHandlerFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error)) *CommandHandlerFunc

NewCommandHandlerFunc creates a new CommandHandlerFunc.

func (*CommandHandlerFunc) CommandType added in v0.1.10

func (h *CommandHandlerFunc) CommandType() string

CommandType returns the command type this handler processes.

func (*CommandHandlerFunc) Handle added in v0.1.10

Handle processes the command.

type CommandResult added in v0.1.10

type CommandResult struct {
	// Success indicates whether the command executed successfully.
	Success bool

	// AggregateID is the ID of the aggregate affected by the command.
	// For create commands, this is the ID of the newly created aggregate.
	AggregateID string

	// Version is the new version of the aggregate after command execution.
	Version int64

	// Data contains any additional result data.
	Data interface{}

	// Error contains the error if the command failed.
	Error error
}

CommandResult represents the result of command execution. It can contain either a successful result or an error.

func IdempotencyRecordToResult added in v0.1.10

func IdempotencyRecordToResult(r *IdempotencyRecord) CommandResult

IdempotencyRecordToResult converts the record to a CommandResult.

func NewErrorResult added in v0.1.10

func NewErrorResult(err error) CommandResult

NewErrorResult creates a failed CommandResult.

func NewSuccessResult added in v0.1.10

func NewSuccessResult(aggregateID string, version int64) CommandResult

NewSuccessResult creates a successful CommandResult.

func NewSuccessResultWithData added in v0.1.10

func NewSuccessResultWithData(aggregateID string, version int64, data interface{}) CommandResult

NewSuccessResultWithData creates a successful CommandResult with additional data.

func (CommandResult) IsError added in v0.1.10

func (r CommandResult) IsError() bool

IsError returns true if the command failed.

func (CommandResult) IsSuccess added in v0.1.10

func (r CommandResult) IsSuccess() bool

IsSuccess returns true if the command executed successfully.

type CompositeFilter added in v0.3.0

type CompositeFilter struct {
	// contains filtered or unexported fields
}

CompositeFilter combines multiple filters with AND logic.

func NewCompositeFilter added in v0.3.0

func NewCompositeFilter(filters ...EventFilter) *CompositeFilter

NewCompositeFilter creates a filter that matches only if all filters match.

func (*CompositeFilter) Matches added in v0.3.0

func (f *CompositeFilter) Matches(event StoredEvent) bool

Matches returns true if all filters match.

type ConcurrencyError

type ConcurrencyError struct {
	StreamID        string
	ExpectedVersion int64
	ActualVersion   int64
}

ConcurrencyError provides detailed information about a concurrency conflict.

func NewConcurrencyError

func NewConcurrencyError(streamID string, expected, actual int64) *ConcurrencyError

NewConcurrencyError creates a new ConcurrencyError.

func (*ConcurrencyError) Error

func (e *ConcurrencyError) Error() string

Error returns the error message.

func (*ConcurrencyError) Is

func (e *ConcurrencyError) Is(target error) bool

Is reports whether this error matches the target error.

func (*ConcurrencyError) Unwrap

func (e *ConcurrencyError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type ContextValueMiddleware added in v0.1.10

type ContextValueMiddleware struct {
	// contains filtered or unexported fields
}

ContextValueMiddleware adds values to the context.

func NewContextValueMiddleware added in v0.1.10

func NewContextValueMiddleware(key, value interface{}) *ContextValueMiddleware

NewContextValueMiddleware creates middleware that adds a value to the context.

func (*ContextValueMiddleware) Middleware added in v0.1.10

func (m *ContextValueMiddleware) Middleware() Middleware

Middleware returns the middleware function.

type DispatchResult added in v0.1.10

type DispatchResult struct {
	CommandResult
	Error error
}

DispatchResult contains the result of an asynchronous dispatch operation.

func (DispatchResult) IsSuccess added in v0.1.10

func (r DispatchResult) IsSuccess() bool

IsSuccess returns true if the dispatch was successful.

type Event

type Event struct {
	// ID is the globally unique event identifier.
	ID string

	// StreamID identifies the stream this event belongs to.
	StreamID string

	// Type is the event type identifier.
	Type string

	// Data is the deserialized event payload.
	Data interface{}

	// Metadata contains contextual information.
	Metadata Metadata

	// Version is the position within the stream (1-based).
	Version int64

	// GlobalPosition is the position across all streams.
	GlobalPosition uint64

	// Timestamp is when the event was stored.
	Timestamp time.Time
}

Event represents a deserialized event with its data as a Go type. This is the high-level representation used by applications.

func DeserializeEvent

func DeserializeEvent(serializer Serializer, stored StoredEvent) (Event, error)

DeserializeEvent is a convenience function that deserializes a StoredEvent to an Event.

func EventFromStored

func EventFromStored(stored StoredEvent, data interface{}) Event

EventFromStored creates an Event from a StoredEvent with deserialized data.

type EventApplier

type EventApplier func(aggregate interface{}, event interface{}) error

EventApplier is a function type for applying events to aggregates.

type EventData

type EventData struct {
	// Type is the event type identifier (e.g., "OrderCreated").
	Type string

	// Data is the serialized event payload.
	Data []byte

	// Metadata contains optional contextual information.
	Metadata Metadata
}

EventData represents an event to be stored. It contains the event type, serialized payload, and optional metadata.

func NewEventData

func NewEventData(eventType string, data []byte) EventData

NewEventData creates a new EventData with the given type and data.

func SerializeEvent

func SerializeEvent(serializer Serializer, event interface{}, metadata Metadata) (EventData, error)

SerializeEvent is a convenience function that serializes an event and returns EventData.

func (EventData) Validate

func (e EventData) Validate() error

Validate checks if the EventData is valid.

func (EventData) WithMetadata

func (e EventData) WithMetadata(m Metadata) EventData

WithMetadata returns a copy of EventData with the metadata set.

type EventFilter added in v0.3.0

type EventFilter interface {
	// Matches returns true if the event should be delivered.
	Matches(event StoredEvent) bool
}

EventFilter determines which events should be delivered.

type EventRegistry

type EventRegistry struct {
	// contains filtered or unexported fields
}

EventRegistry maps event type names to Go types. It is used by the JSONSerializer to deserialize events to the correct type.

func NewEventRegistry

func NewEventRegistry() *EventRegistry

NewEventRegistry creates a new empty EventRegistry.

func (*EventRegistry) Count

func (r *EventRegistry) Count() int

Count returns the number of registered event types.

func (*EventRegistry) Lookup

func (r *EventRegistry) Lookup(eventType string) (reflect.Type, bool)

Lookup returns the Go type for the given event type name. Returns nil and false if the type is not registered.

func (*EventRegistry) Register

func (r *EventRegistry) Register(eventType string, example interface{})

Register adds a mapping from eventType to the Go type of the example. The example should be a value (not a pointer) of the event type.

func (*EventRegistry) RegisterAll

func (r *EventRegistry) RegisterAll(examples ...interface{})

RegisterAll registers multiple events using their struct names as type names. Each example should be a value (not a pointer) of the event type.

func (*EventRegistry) RegisteredTypes

func (r *EventRegistry) RegisteredTypes() []string

RegisteredTypes returns a slice of all registered event type names.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore is the main entry point for event sourcing operations. It provides methods for appending events, loading aggregates, and managing streams.

func New

func New(adapter adapters.EventStoreAdapter, opts ...Option) *EventStore

New creates a new EventStore with the given adapter and options.

func (*EventStore) Adapter

func (s *EventStore) Adapter() adapters.EventStoreAdapter

Adapter returns the underlying adapter.

func (*EventStore) Append

func (s *EventStore) Append(ctx context.Context, streamID string, events []interface{}, opts ...AppendOption) error

Append stores events to the specified stream. Events can be Go structs which will be serialized using the configured serializer.

func (*EventStore) Close

func (s *EventStore) Close() error

Close releases resources held by the event store.

func (*EventStore) GetLastPosition

func (s *EventStore) GetLastPosition(ctx context.Context) (uint64, error)

GetLastPosition returns the global position of the last stored event.

func (*EventStore) GetStreamInfo

func (s *EventStore) GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)

GetStreamInfo returns metadata about a stream.

func (*EventStore) Initialize

func (s *EventStore) Initialize(ctx context.Context) error

Initialize sets up the required storage schema.

func (*EventStore) Load

func (s *EventStore) Load(ctx context.Context, streamID string) ([]Event, error)

Load retrieves all events from a stream.

func (*EventStore) LoadAggregate

func (s *EventStore) LoadAggregate(ctx context.Context, agg Aggregate) error

LoadAggregate loads an aggregate's state by replaying its events. The aggregate should be a new instance with its ID and type already set.

If the aggregate implements VersionSetter, the version will be set to the number of events loaded. This is required for proper optimistic concurrency control when saving the aggregate later.

Note: AggregateBase implements VersionSetter, so aggregates embedding AggregateBase will automatically have their version set correctly.

func (*EventStore) LoadEventsFromPosition added in v0.3.8

func (s *EventStore) LoadEventsFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)

LoadEventsFromPosition loads events starting from a global position. Returns ErrSubscriptionNotSupported if the adapter does not implement SubscriptionAdapter. This is a helper method used by ProjectionEngine and ProjectionRebuilder.

func (*EventStore) LoadFrom

func (s *EventStore) LoadFrom(ctx context.Context, streamID string, fromVersion int64) ([]Event, error)

LoadFrom retrieves events from a stream starting from the specified version.

func (*EventStore) LoadRaw

func (s *EventStore) LoadRaw(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)

LoadRaw retrieves raw (non-deserialized) events from a stream.

func (*EventStore) RegisterEvents

func (s *EventStore) RegisterEvents(events ...interface{})

RegisterEvents registers event types with the serializer. This is required for deserializing events back to their original types.

func (*EventStore) SaveAggregate

func (s *EventStore) SaveAggregate(ctx context.Context, agg Aggregate) error

SaveAggregate persists uncommitted events from an aggregate. The aggregate's version is used for optimistic concurrency control.

After a successful save, if the aggregate implements VersionSetter, the version will be updated to reflect the new stream version. This allows for subsequent modifications without reloading.

func (*EventStore) Serializer

func (s *EventStore) Serializer() Serializer

Serializer returns the event store's serializer.

type EventSubscriber added in v0.3.0

type EventSubscriber interface {
	// SubscribeAll subscribes to all events starting from the given position.
	SubscribeAll(ctx context.Context, fromPosition uint64, opts ...SubscriptionOptions) (Subscription, error)

	// SubscribeStream subscribes to events from a specific stream.
	SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...SubscriptionOptions) (Subscription, error)

	// SubscribeCategory subscribes to events from all streams in a category.
	SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...SubscriptionOptions) (Subscription, error)
}

EventSubscriber provides event subscription capabilities.

type EventTypeFilter added in v0.3.0

type EventTypeFilter struct {
	// contains filtered or unexported fields
}

EventTypeFilter filters events by type.

func NewEventTypeFilter added in v0.3.0

func NewEventTypeFilter(eventTypes ...string) *EventTypeFilter

NewEventTypeFilter creates a filter that only matches the specified event types.

func (*EventTypeFilter) Matches added in v0.3.0

func (f *EventTypeFilter) Matches(event StoredEvent) bool

Matches returns true if the event type is in the filter.

type EventTypeNotRegisteredError

type EventTypeNotRegisteredError struct {
	EventType string
}

EventTypeNotRegisteredError provides detailed information about an unregistered event type.

func NewEventTypeNotRegisteredError

func NewEventTypeNotRegisteredError(eventType string) *EventTypeNotRegisteredError

NewEventTypeNotRegisteredError creates a new EventTypeNotRegisteredError.

func (*EventTypeNotRegisteredError) Error

Error returns the error message.

func (*EventTypeNotRegisteredError) Is

func (e *EventTypeNotRegisteredError) Is(target error) bool

Is reports whether this error matches the target error.

func (*EventTypeNotRegisteredError) Unwrap

func (e *EventTypeNotRegisteredError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type Filter added in v0.3.0

type Filter struct {
	// Field is the field name to filter on.
	Field string

	// Op is the comparison operator.
	Op FilterOp

	// Value is the value to compare against.
	Value interface{}
}

Filter represents a query filter condition.

type FilterOp added in v0.3.0

type FilterOp string

FilterOp represents a filter operation.

const (
	// FilterOpEq matches equal values.
	FilterOpEq FilterOp = "="

	// FilterOpNe matches not equal values.
	FilterOpNe FilterOp = "!="

	// FilterOpGt matches greater than values.
	FilterOpGt FilterOp = ">"

	// FilterOpGte matches greater than or equal values.
	FilterOpGte FilterOp = ">="

	// FilterOpLt matches less than values.
	FilterOpLt FilterOp = "<"

	// FilterOpLte matches less than or equal values.
	FilterOpLte FilterOp = "<="

	// FilterOpIn matches any value in a list.
	FilterOpIn FilterOp = "IN"

	// FilterOpNotIn matches no value in a list.
	FilterOpNotIn FilterOp = "NOT IN"

	// FilterOpLike matches using SQL LIKE pattern.
	FilterOpLike FilterOp = "LIKE"

	// FilterOpIsNull matches null values.
	FilterOpIsNull FilterOp = "IS NULL"

	// FilterOpIsNotNull matches non-null values.
	FilterOpIsNotNull FilterOp = "IS NOT NULL"

	// FilterOpContains matches arrays containing a value.
	FilterOpContains FilterOp = "CONTAINS"

	// FilterOpBetween matches values between two bounds.
	FilterOpBetween FilterOp = "BETWEEN"
)

type GenericHandler added in v0.1.10

type GenericHandler[C Command] struct {
	// contains filtered or unexported fields
}

GenericHandler is a type-safe command handler for a specific command type. Use this to create handlers with compile-time type checking.

func NewGenericHandler added in v0.1.10

func NewGenericHandler[C Command](handler func(ctx context.Context, cmd C) (CommandResult, error)) *GenericHandler[C]

NewGenericHandler creates a new GenericHandler for the specified command type.

func (*GenericHandler[C]) CommandType added in v0.1.10

func (h *GenericHandler[C]) CommandType() string

CommandType returns the command type this handler processes.

func (*GenericHandler[C]) Handle added in v0.1.10

func (h *GenericHandler[C]) Handle(ctx context.Context, cmd Command) (CommandResult, error)

Handle processes the command with type checking.

type HandlerNotFoundError added in v0.1.10

type HandlerNotFoundError struct {
	CommandType string
}

HandlerNotFoundError provides detailed information about a missing handler.

func NewHandlerNotFoundError added in v0.1.10

func NewHandlerNotFoundError(cmdType string) *HandlerNotFoundError

NewHandlerNotFoundError creates a new HandlerNotFoundError.

func (*HandlerNotFoundError) Error added in v0.1.10

func (e *HandlerNotFoundError) Error() string

Error returns the error message.

func (*HandlerNotFoundError) Is added in v0.1.10

func (e *HandlerNotFoundError) Is(target error) bool

Is reports whether this error matches the target error.

func (*HandlerNotFoundError) Unwrap added in v0.1.10

func (e *HandlerNotFoundError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type HandlerRegistry added in v0.1.10

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

HandlerRegistry manages command handler registration and lookup.

func NewHandlerRegistry added in v0.1.10

func NewHandlerRegistry() *HandlerRegistry

NewHandlerRegistry creates a new HandlerRegistry.

func (*HandlerRegistry) Clear added in v0.1.10

func (r *HandlerRegistry) Clear()

Clear removes all handlers.

func (*HandlerRegistry) CommandTypes added in v0.1.10

func (r *HandlerRegistry) CommandTypes() []string

CommandTypes returns all registered command types.

func (*HandlerRegistry) Count added in v0.1.10

func (r *HandlerRegistry) Count() int

Count returns the number of registered handlers.

func (*HandlerRegistry) Get added in v0.1.10

func (r *HandlerRegistry) Get(cmdType string) CommandHandler

Get returns the handler for a command type. Returns nil if no handler is registered.

func (*HandlerRegistry) Has added in v0.1.10

func (r *HandlerRegistry) Has(cmdType string) bool

Has returns true if a handler is registered for the command type.

func (*HandlerRegistry) Register added in v0.1.10

func (r *HandlerRegistry) Register(handler CommandHandler)

Register adds a handler for a command type. If a handler is already registered for this type, it will be replaced.

func (*HandlerRegistry) RegisterFunc added in v0.1.10

func (r *HandlerRegistry) RegisterFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error))

RegisterFunc registers a handler function for a command type.

func (*HandlerRegistry) Remove added in v0.1.10

func (r *HandlerRegistry) Remove(cmdType string)

Remove removes a handler for a command type.

type IdempotencyConfig added in v0.1.10

type IdempotencyConfig struct {
	// Store is the idempotency store to use.
	Store IdempotencyStore

	// TTL is how long to keep idempotency records.
	// Default is 24 hours.
	TTL time.Duration

	// KeyGenerator generates idempotency keys from commands.
	// If nil, GetIdempotencyKey is used.
	KeyGenerator func(Command) string

	// StoreErrors determines if failed commands should be stored.
	// If true, replaying a failed command returns the same error.
	// If false, failed commands can be retried.
	// Default is false.
	StoreErrors bool

	// SkipCommands is a list of command types to skip idempotency checking.
	SkipCommands []string
}

IdempotencyConfig configures the idempotency middleware.

func DefaultIdempotencyConfig added in v0.1.10

func DefaultIdempotencyConfig(store IdempotencyStore) IdempotencyConfig

DefaultIdempotencyConfig returns a default idempotency configuration.

type IdempotencyRecord added in v0.1.10

type IdempotencyRecord = adapters.IdempotencyRecord

IdempotencyRecord stores information about a processed command.

func NewIdempotencyRecord added in v0.1.10

func NewIdempotencyRecord(key, cmdType string, result CommandResult, ttl time.Duration) *IdempotencyRecord

NewIdempotencyRecord creates a new IdempotencyRecord from a CommandResult.

type IdempotencyReplayError added in v0.1.10

type IdempotencyReplayError struct {
	Key     string
	Message string
}

IdempotencyReplayError indicates a command was already processed.

func (*IdempotencyReplayError) Error added in v0.1.10

func (e *IdempotencyReplayError) Error() string

func (*IdempotencyReplayError) Is added in v0.1.10

func (e *IdempotencyReplayError) Is(target error) bool

func (*IdempotencyReplayError) Unwrap added in v0.1.10

func (e *IdempotencyReplayError) Unwrap() error

type IdempotencyStore added in v0.1.10

type IdempotencyStore = adapters.IdempotencyStore

IdempotencyStore tracks processed commands to prevent duplicate processing.

type IdempotentCommand added in v0.1.10

type IdempotentCommand interface {
	Command

	// IdempotencyKey returns a unique key for deduplication.
	// Commands with the same key will only be processed once.
	IdempotencyKey() string
}

IdempotentCommand is a command that supports idempotency.

type InMemoryRepository added in v0.3.0

type InMemoryRepository[T any] struct {
	// contains filtered or unexported fields
}

InMemoryRepository provides an in-memory implementation of ReadModelRepository. Useful for testing and prototyping.

func NewInMemoryRepository added in v0.3.0

func NewInMemoryRepository[T any](getID func(*T) string) *InMemoryRepository[T]

NewInMemoryRepository creates a new in-memory repository. The getID function extracts the ID from a read model.

func (*InMemoryRepository[T]) Clear added in v0.3.0

func (r *InMemoryRepository[T]) Clear(ctx context.Context) error

Clear removes all read models.

func (*InMemoryRepository[T]) Count added in v0.3.0

func (r *InMemoryRepository[T]) Count(ctx context.Context, query Query) (int64, error)

Count returns the number of read models matching the query. Note: This basic implementation ignores query filters and returns total count. For production use with filtering, implement a database-backed repository.

func (*InMemoryRepository[T]) Delete added in v0.3.0

func (r *InMemoryRepository[T]) Delete(ctx context.Context, id string) error

Delete removes a read model by ID.

func (*InMemoryRepository[T]) DeleteMany added in v0.3.0

func (r *InMemoryRepository[T]) DeleteMany(ctx context.Context, query Query) (int64, error)

DeleteMany removes all read models matching the query. Note: This basic implementation ignores query filters and deletes all items when filters are provided. For production use with filtering, implement a database-backed repository.

func (*InMemoryRepository[T]) Exists added in v0.3.0

func (r *InMemoryRepository[T]) Exists(ctx context.Context, id string) (bool, error)

Exists checks if a read model with the given ID exists.

func (*InMemoryRepository[T]) Find added in v0.3.0

func (r *InMemoryRepository[T]) Find(ctx context.Context, query Query) ([]*T, error)

Find queries read models with the given criteria. Note: This is a basic implementation that doesn't support all filter operations.

func (*InMemoryRepository[T]) FindOne added in v0.3.0

func (r *InMemoryRepository[T]) FindOne(ctx context.Context, query Query) (*T, error)

FindOne returns the first read model matching the query.

func (*InMemoryRepository[T]) Get added in v0.3.0

func (r *InMemoryRepository[T]) Get(ctx context.Context, id string) (*T, error)

Get retrieves a read model by ID.

func (*InMemoryRepository[T]) GetAll added in v0.3.0

func (r *InMemoryRepository[T]) GetAll(ctx context.Context) ([]*T, error)

GetAll returns all read models in the repository.

func (*InMemoryRepository[T]) GetMany added in v0.3.0

func (r *InMemoryRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)

GetMany retrieves multiple read models by their IDs.

func (*InMemoryRepository[T]) Insert added in v0.3.0

func (r *InMemoryRepository[T]) Insert(ctx context.Context, model *T) error

Insert creates a new read model.

func (*InMemoryRepository[T]) Len added in v0.3.0

func (r *InMemoryRepository[T]) Len() int

Len returns the number of items in the repository.

func (*InMemoryRepository[T]) Update added in v0.3.0

func (r *InMemoryRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error

Update modifies an existing read model.

func (*InMemoryRepository[T]) Upsert added in v0.3.0

func (r *InMemoryRepository[T]) Upsert(ctx context.Context, model *T) error

Upsert creates or updates a read model.

type InlineProjection added in v0.3.0

type InlineProjection interface {
	Projection

	// Apply processes a single event within the event store transaction.
	// The projection should update its read model based on the event.
	Apply(ctx context.Context, event StoredEvent) error
}

InlineProjection processes events in the same transaction as the event store append. This provides strong consistency but may impact write performance.

type JSONSerializer

type JSONSerializer struct {
	// contains filtered or unexported fields
}

JSONSerializer is the default Serializer implementation using JSON encoding.

func NewJSONSerializer

func NewJSONSerializer() *JSONSerializer

NewJSONSerializer creates a new JSONSerializer with an empty registry.

func NewJSONSerializerWithRegistry

func NewJSONSerializerWithRegistry(registry *EventRegistry) *JSONSerializer

NewJSONSerializerWithRegistry creates a new JSONSerializer with the given registry.

func (*JSONSerializer) Deserialize

func (s *JSONSerializer) Deserialize(data []byte, eventType string) (interface{}, error)

Deserialize converts JSON bytes back to an event. If the event type is registered, returns a value of that type. Otherwise, returns a map[string]interface{}.

func (*JSONSerializer) Register

func (s *JSONSerializer) Register(eventType string, example interface{})

Register adds an event type to the serializer's registry.

func (*JSONSerializer) RegisterAll

func (s *JSONSerializer) RegisterAll(examples ...interface{})

RegisterAll registers multiple events using their struct names as type names.

func (*JSONSerializer) Registry

func (s *JSONSerializer) Registry() *EventRegistry

Registry returns the underlying EventRegistry.

func (*JSONSerializer) Serialize

func (s *JSONSerializer) Serialize(event interface{}) ([]byte, error)

Serialize converts an event to JSON bytes.

type LiveOptions added in v0.3.0

type LiveOptions struct {
	// BufferSize is the size of the event channel buffer.
	// Default: 1000
	BufferSize int
}

LiveOptions configures live projection behavior.

func DefaultLiveOptions added in v0.3.0

func DefaultLiveOptions() LiveOptions

DefaultLiveOptions returns the default live projection options.

type LiveProjection added in v0.3.0

type LiveProjection interface {
	Projection

	// OnEvent is called for each event in real-time.
	// This method should not block for long periods.
	OnEvent(ctx context.Context, event StoredEvent)

	// IsTransient returns true if this projection doesn't persist state.
	// Transient projections are not checkpointed.
	IsTransient() bool
}

LiveProjection receives events in real-time for dashboards and notifications. These projections are transient and don't persist state.

type LiveProjectionBase added in v0.3.0

type LiveProjectionBase struct {
	ProjectionBase
	// contains filtered or unexported fields
}

LiveProjectionBase provides a default implementation of LiveProjection. Embed this struct and override OnEvent to create live projections.

func NewLiveProjectionBase added in v0.3.0

func NewLiveProjectionBase(name string, transient bool, handledEvents ...string) LiveProjectionBase

NewLiveProjectionBase creates a new LiveProjectionBase.

func (*LiveProjectionBase) IsTransient added in v0.3.0

func (p *LiveProjectionBase) IsTransient() bool

IsTransient returns whether this projection is transient.

type Logger

type Logger interface {
	Debug(msg string, args ...interface{})
	Info(msg string, args ...interface{})
	Warn(msg string, args ...interface{})
	Error(msg string, args ...interface{})
}

Logger defines the logging interface for the event store.

type LoggingMiddleware added in v0.1.10

type LoggingMiddleware struct {
	// contains filtered or unexported fields
}

LoggingMiddleware logs command execution.

func NewLoggingMiddleware added in v0.1.10

func NewLoggingMiddleware(logger Logger) *LoggingMiddleware

NewLoggingMiddleware creates a new LoggingMiddleware.

func (*LoggingMiddleware) Middleware added in v0.1.10

func (m *LoggingMiddleware) Middleware() Middleware

Middleware returns the middleware function.

type Metadata

type Metadata struct {
	// CorrelationID links related events across services for distributed tracing.
	CorrelationID string `json:"correlationId,omitempty"`

	// CausationID identifies the event or command that caused this event.
	CausationID string `json:"causationId,omitempty"`

	// UserID identifies the user who triggered this event.
	UserID string `json:"userId,omitempty"`

	// TenantID identifies the tenant for multi-tenant applications.
	TenantID string `json:"tenantId,omitempty"`

	// Custom contains arbitrary key-value pairs for application-specific metadata.
	Custom map[string]string `json:"custom,omitempty"`
}

Metadata contains contextual information about an event. It supports distributed tracing, multi-tenancy, and custom key-value pairs.

func (Metadata) IsEmpty

func (m Metadata) IsEmpty() bool

IsEmpty reports whether the Metadata has no values set.

func (Metadata) WithCausationID

func (m Metadata) WithCausationID(id string) Metadata

WithCausationID returns a copy of Metadata with the causation ID set.

func (Metadata) WithCorrelationID

func (m Metadata) WithCorrelationID(id string) Metadata

WithCorrelationID returns a copy of Metadata with the correlation ID set.

func (Metadata) WithCustom

func (m Metadata) WithCustom(key, value string) Metadata

WithCustom returns a copy of Metadata with a custom key-value pair added.

func (Metadata) WithTenantID

func (m Metadata) WithTenantID(id string) Metadata

WithTenantID returns a copy of Metadata with the tenant ID set.

func (Metadata) WithUserID

func (m Metadata) WithUserID(id string) Metadata

WithUserID returns a copy of Metadata with the user ID set.

type MetricsCollector added in v0.1.10

type MetricsCollector interface {
	// RecordCommand records a command execution.
	RecordCommand(cmdType string, duration time.Duration, success bool, err error)
}

MetricsMiddleware collects metrics about command execution.

type Middleware added in v0.1.10

type Middleware func(next MiddlewareFunc) MiddlewareFunc

Middleware wraps a handler function with additional functionality.

func CausationIDMiddleware added in v0.1.10

func CausationIDMiddleware() Middleware

CausationIDMiddleware creates middleware that propagates causation IDs. The causation ID links events/commands to the command that caused them. This is essential for tracking the chain of causality in event sourcing.

func ChainMiddleware added in v0.1.10

func ChainMiddleware(middleware ...Middleware) Middleware

ChainMiddleware creates a single middleware from multiple middleware.

func CommandTypeMiddleware added in v0.1.10

func CommandTypeMiddleware(types []string, middleware Middleware) Middleware

CommandTypeMiddleware applies middleware only for specific command types.

func ConditionalMiddleware added in v0.1.10

func ConditionalMiddleware(condition func(Command) bool, middleware Middleware) Middleware

ConditionalMiddleware applies middleware only if the condition is true.

func CorrelationIDMiddleware added in v0.1.10

func CorrelationIDMiddleware(generator func() string) Middleware

CorrelationIDMiddleware creates middleware that propagates correlation IDs.

func IdempotencyMiddleware added in v0.1.10

func IdempotencyMiddleware(config IdempotencyConfig) Middleware

IdempotencyMiddleware creates middleware that prevents duplicate command processing.

func MetricsMiddleware added in v0.1.10

func MetricsMiddleware(collector MetricsCollector) Middleware

MetricsMiddleware creates middleware that records metrics.

func RecoveryMiddleware added in v0.1.10

func RecoveryMiddleware() Middleware

RecoveryMiddleware recovers from panics in handlers and returns them as errors. It captures a sanitized representation of the command data for debugging.

func RetryMiddleware added in v0.1.10

func RetryMiddleware(config RetryConfig) Middleware

RetryMiddleware creates middleware that retries failed commands.

func TenantMiddleware added in v0.1.10

func TenantMiddleware(extractor func(Command) string, required bool) Middleware

TenantMiddleware extracts and validates tenant ID.

func TimeoutMiddleware added in v0.1.10

func TimeoutMiddleware(timeout time.Duration) Middleware

TimeoutMiddleware adds a timeout to command execution.

func ValidationMiddleware added in v0.1.10

func ValidationMiddleware() Middleware

ValidationMiddleware validates commands before they reach the handler. If validation fails, the command is not dispatched.

type MiddlewareFunc added in v0.1.10

type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)

MiddlewareFunc is the function signature for command middleware.

type MultiValidationError added in v0.1.10

type MultiValidationError struct {
	// CommandType is the type of command that failed validation.
	CommandType string

	// Errors contains all validation errors.
	Errors []*ValidationError
}

MultiValidationError contains multiple validation errors.

func NewMultiValidationError added in v0.1.10

func NewMultiValidationError(cmdType string) *MultiValidationError

NewMultiValidationError creates a new MultiValidationError.

func (*MultiValidationError) Add added in v0.1.10

Add adds a validation error.

func (*MultiValidationError) AddField added in v0.1.10

func (e *MultiValidationError) AddField(field, message string)

AddField adds a validation error for a specific field.

func (*MultiValidationError) Error added in v0.1.10

func (e *MultiValidationError) Error() string

Error returns the error message.

func (*MultiValidationError) HasErrors added in v0.1.10

func (e *MultiValidationError) HasErrors() bool

HasErrors returns true if there are any validation errors.

func (*MultiValidationError) Is added in v0.1.10

func (e *MultiValidationError) Is(target error) bool

Is reports whether this error matches the target error.

func (*MultiValidationError) Unwrap added in v0.1.10

func (e *MultiValidationError) Unwrap() error

Unwrap returns the first error for errors.Unwrap().

type Option

type Option func(*EventStore)

Option configures an EventStore.

func WithLogger

func WithLogger(l Logger) Option

WithLogger sets a custom logger.

func WithSerializer

func WithSerializer(s Serializer) Option

WithSerializer sets a custom serializer.

type OrderBy added in v0.3.0

type OrderBy struct {
	// Field is the field name to sort by.
	Field string

	// Desc specifies descending order.
	Desc bool
}

OrderBy represents a sort order.

type PanicError added in v0.1.10

type PanicError struct {
	CommandType string
	Value       interface{}
	Stack       string
	// CommandData contains a sanitized JSON representation of the command for debugging.
	// Sensitive fields should be masked by the caller before setting this field.
	CommandData string
}

PanicError provides detailed information about a handler panic.

func NewPanicError added in v0.1.10

func NewPanicError(cmdType string, value interface{}, stack string) *PanicError

NewPanicError creates a new PanicError.

func NewPanicErrorWithCommand added in v0.1.10

func NewPanicErrorWithCommand(cmdType string, value interface{}, stack string, commandData string) *PanicError

NewPanicErrorWithCommand creates a new PanicError with command data for debugging. The commandData should be a sanitized representation of the command (sensitive fields masked).

func (*PanicError) Error added in v0.1.10

func (e *PanicError) Error() string

Error returns the error message.

func (*PanicError) Is added in v0.1.10

func (e *PanicError) Is(target error) bool

Is reports whether this error matches the target error.

func (*PanicError) Unwrap added in v0.1.10

func (e *PanicError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type ParallelRebuilder added in v0.3.0

type ParallelRebuilder struct {
	// contains filtered or unexported fields
}

ParallelRebuilder rebuilds multiple projections in parallel.

func NewParallelRebuilder added in v0.3.0

func NewParallelRebuilder(rebuilder *ProjectionRebuilder, concurrency int) *ParallelRebuilder

NewParallelRebuilder creates a new parallel rebuilder.

func (*ParallelRebuilder) RebuildAll added in v0.3.0

func (pr *ParallelRebuilder) RebuildAll(ctx context.Context, projections []AsyncProjection, opts ...RebuildOptions) error

RebuildAll rebuilds multiple async projections in parallel.

type PollingSubscription added in v0.3.0

type PollingSubscription struct {
	// contains filtered or unexported fields
}

PollingSubscription polls the event store for new events. This is a fallback when push-based subscriptions aren't available.

func NewPollingSubscription added in v0.3.0

func NewPollingSubscription(
	store *EventStore,
	fromPosition uint64,
	opts ...SubscriptionOptions,
) *PollingSubscription

NewPollingSubscription creates a new polling subscription.

func (*PollingSubscription) Close added in v0.3.0

func (s *PollingSubscription) Close() error

Close stops the subscription.

func (*PollingSubscription) Err added in v0.3.0

func (s *PollingSubscription) Err() error

Err returns any error that caused the subscription to close.

func (*PollingSubscription) Events added in v0.3.0

func (s *PollingSubscription) Events() <-chan StoredEvent

Events returns the channel for receiving events.

func (*PollingSubscription) Start added in v0.3.0

func (s *PollingSubscription) Start(ctx context.Context, pollInterval time.Duration)

Start begins polling for events.

type ProgressCallback added in v0.3.0

type ProgressCallback func(progress RebuildProgress)

ProgressCallback is called periodically during rebuild with progress updates.

type Projection added in v0.3.0

type Projection interface {
	// Name returns the unique identifier for this projection.
	// This name is used for checkpointing and management.
	Name() string

	// HandledEvents returns the list of event types this projection handles.
	// An empty list means the projection handles all event types.
	HandledEvents() []string
}

Projection is the base interface for all projection types. Projections transform events into optimized read models.

type ProjectionBase added in v0.3.0

type ProjectionBase struct {
	// contains filtered or unexported fields
}

ProjectionBase provides a default partial implementation of Projection. Embed this struct in your projection types to get common functionality.

func NewProjectionBase added in v0.3.0

func NewProjectionBase(name string, handledEvents ...string) ProjectionBase

NewProjectionBase creates a new ProjectionBase.

func (*ProjectionBase) HandledEvents added in v0.3.0

func (p *ProjectionBase) HandledEvents() []string

HandledEvents returns the list of event types this projection handles.

func (*ProjectionBase) HandlesEvent added in v0.3.0

func (p *ProjectionBase) HandlesEvent(eventType string) bool

HandlesEvent returns true if this projection handles the given event type.

func (*ProjectionBase) Name added in v0.3.0

func (p *ProjectionBase) Name() string

Name returns the projection name.

type ProjectionEngine added in v0.3.0

type ProjectionEngine struct {
	// contains filtered or unexported fields
}

ProjectionEngine manages the lifecycle of projections. It handles registration, starting, stopping, and monitoring projections.

func NewProjectionEngine added in v0.3.0

func NewProjectionEngine(store *EventStore, opts ...ProjectionEngineOption) *ProjectionEngine

NewProjectionEngine creates a new ProjectionEngine.

func (*ProjectionEngine) GetAllStatuses added in v0.3.0

func (e *ProjectionEngine) GetAllStatuses() []*ProjectionStatus

GetAllStatuses returns the status of all registered projections.

func (*ProjectionEngine) GetStatus added in v0.3.0

func (e *ProjectionEngine) GetStatus(name string) (*ProjectionStatus, error)

GetStatus returns the status of a projection by name.

func (*ProjectionEngine) IsRunning added in v0.3.0

func (e *ProjectionEngine) IsRunning() bool

IsRunning returns true if the engine is running.

func (*ProjectionEngine) NotifyLiveProjections added in v0.3.0

func (e *ProjectionEngine) NotifyLiveProjections(ctx context.Context, events []StoredEvent)

NotifyLiveProjections notifies all live projections of new events.

func (*ProjectionEngine) ProcessInlineProjections added in v0.3.0

func (e *ProjectionEngine) ProcessInlineProjections(ctx context.Context, events []StoredEvent) error

ProcessInlineProjections processes all inline projections for the given events. This is called by the event store after appending events.

func (*ProjectionEngine) RegisterAsync added in v0.3.0

func (e *ProjectionEngine) RegisterAsync(projection AsyncProjection, opts ...AsyncOptions) error

RegisterAsync registers an async projection with the given options. Async projections are processed in background workers.

func (*ProjectionEngine) RegisterInline added in v0.3.0

func (e *ProjectionEngine) RegisterInline(projection InlineProjection) error

RegisterInline registers an inline projection. Inline projections are processed synchronously with event appends.

func (*ProjectionEngine) RegisterLive added in v0.3.0

func (e *ProjectionEngine) RegisterLive(projection LiveProjection, opts ...LiveOptions) error

RegisterLive registers a live projection with optional configuration. Live projections receive events in real-time.

func (*ProjectionEngine) Start added in v0.3.0

func (e *ProjectionEngine) Start(ctx context.Context) error

Start starts the projection engine and all registered projections.

func (*ProjectionEngine) Stop added in v0.3.0

func (e *ProjectionEngine) Stop(ctx context.Context) error

Stop gracefully stops the projection engine.

func (*ProjectionEngine) Unregister added in v0.3.0

func (e *ProjectionEngine) Unregister(name string) error

Unregister removes a projection by name.

type ProjectionEngineOption added in v0.3.0

type ProjectionEngineOption func(*ProjectionEngine)

ProjectionEngineOption configures a ProjectionEngine.

func WithCheckpointStore added in v0.3.0

func WithCheckpointStore(store CheckpointStore) ProjectionEngineOption

WithCheckpointStore sets the checkpoint store for the engine.

func WithProjectionLogger added in v0.3.0

func WithProjectionLogger(logger Logger) ProjectionEngineOption

WithProjectionLogger sets the logger for the engine.

func WithProjectionMetrics added in v0.3.0

func WithProjectionMetrics(metrics ProjectionMetrics) ProjectionEngineOption

WithProjectionMetrics sets the metrics collector for the engine.

type ProjectionError added in v0.3.0

type ProjectionError struct {
	ProjectionName string
	EventType      string
	Position       uint64
	Cause          error
}

ProjectionError provides detailed information about a projection failure.

func NewProjectionError added in v0.3.0

func NewProjectionError(projectionName, eventType string, position uint64, cause error) *ProjectionError

NewProjectionError creates a new ProjectionError.

func (*ProjectionError) Error added in v0.3.0

func (e *ProjectionError) Error() string

Error returns the error message.

func (*ProjectionError) Is added in v0.3.0

func (e *ProjectionError) Is(target error) bool

Is reports whether this error matches the target error.

func (*ProjectionError) Unwrap added in v0.3.0

func (e *ProjectionError) Unwrap() error

Unwrap returns the underlying cause for errors.Unwrap().

type ProjectionMetrics added in v0.3.0

type ProjectionMetrics interface {
	// RecordEventProcessed records that an event was processed.
	RecordEventProcessed(projectionName, eventType string, duration time.Duration, success bool)

	// RecordBatchProcessed records that a batch of events was processed.
	RecordBatchProcessed(projectionName string, count int, duration time.Duration, success bool)

	// RecordCheckpoint records a checkpoint update.
	RecordCheckpoint(projectionName string, position uint64)

	// RecordError records a projection error.
	RecordError(projectionName string, err error)
}

ProjectionMetrics collects metrics about projection processing.

type ProjectionRebuilder added in v0.3.0

type ProjectionRebuilder struct {
	// contains filtered or unexported fields
}

ProjectionRebuilder rebuilds projections from scratch. It replays all events through a projection to reconstruct its read model.

func NewProjectionRebuilder added in v0.3.0

func NewProjectionRebuilder(store *EventStore, checkpointStore CheckpointStore, opts ...ProjectionRebuilderOption) *ProjectionRebuilder

NewProjectionRebuilder creates a new projection rebuilder.

func (*ProjectionRebuilder) RebuildAsync added in v0.3.0

func (r *ProjectionRebuilder) RebuildAsync(ctx context.Context, projection AsyncProjection, opts ...RebuildOptions) error

RebuildAsync rebuilds an async projection from scratch.

func (*ProjectionRebuilder) RebuildInline added in v0.3.0

func (r *ProjectionRebuilder) RebuildInline(ctx context.Context, projection InlineProjection, opts ...RebuildOptions) error

RebuildInline rebuilds an inline projection from scratch.

type ProjectionRebuilderOption added in v0.3.0

type ProjectionRebuilderOption func(*ProjectionRebuilder)

ProjectionRebuilderOption configures a ProjectionRebuilder.

func WithRebuilderBatchSize added in v0.3.0

func WithRebuilderBatchSize(size int) ProjectionRebuilderOption

WithRebuilderBatchSize sets the batch size for rebuilding.

func WithRebuilderLogger added in v0.3.0

func WithRebuilderLogger(logger Logger) ProjectionRebuilderOption

WithRebuilderLogger sets the logger for the rebuilder.

func WithRebuilderMetrics added in v0.3.0

func WithRebuilderMetrics(metrics ProjectionMetrics) ProjectionRebuilderOption

WithRebuilderMetrics sets the metrics collector for the rebuilder.

type ProjectionState added in v0.3.0

type ProjectionState string

ProjectionState represents the current state of a projection.

const (
	// ProjectionStateStopped indicates the projection is not running.
	ProjectionStateStopped ProjectionState = "stopped"

	// ProjectionStateRunning indicates the projection is actively processing events.
	ProjectionStateRunning ProjectionState = "running"

	// ProjectionStatePaused indicates the projection is paused.
	ProjectionStatePaused ProjectionState = "paused"

	// ProjectionStateFaulted indicates the projection has encountered an error.
	ProjectionStateFaulted ProjectionState = "faulted"

	// ProjectionStateRebuilding indicates the projection is being rebuilt.
	ProjectionStateRebuilding ProjectionState = "rebuilding"

	// ProjectionStateCatchingUp indicates the projection is catching up to current events.
	ProjectionStateCatchingUp ProjectionState = "catching_up"
)

type ProjectionStatus added in v0.3.0

type ProjectionStatus struct {
	// Name is the projection name.
	Name string

	// State is the current state of the projection.
	State ProjectionState

	// LastPosition is the global position of the last processed event.
	LastPosition uint64

	// EventsProcessed is the total number of events processed.
	EventsProcessed uint64

	// LastProcessedAt is when the last event was processed.
	LastProcessedAt time.Time

	// Error contains the error message if the projection is faulted.
	Error string

	// Lag is the number of events behind the head of the event store.
	Lag uint64

	// AverageLatency is the average time to process an event.
	AverageLatency time.Duration
}

ProjectionStatus provides detailed information about a projection's current state.

type Query added in v0.3.0

type Query struct {
	// Filters to apply.
	Filters []Filter

	// Ordering criteria.
	OrderBy []OrderBy

	// Maximum number of results to return.
	// 0 means no limit.
	Limit int

	// Number of results to skip.
	Offset int

	// IncludeCount includes the total count in paginated results.
	IncludeCount bool
}

Query represents a query for read models.

func NewQuery added in v0.3.0

func NewQuery() *Query

NewQuery creates a new empty Query.

func (*Query) And added in v0.3.0

func (q *Query) And(field string, op FilterOp, value interface{}) *Query

And is an alias for Where for readability.

func (*Query) Build added in v0.3.0

func (q *Query) Build() Query

Build returns a copy of the query (useful for chaining).

func (*Query) OrderByAsc added in v0.3.0

func (q *Query) OrderByAsc(field string) *Query

OrderByAsc adds ascending order.

func (*Query) OrderByDesc added in v0.3.0

func (q *Query) OrderByDesc(field string) *Query

OrderByDesc adds descending order.

func (*Query) Where added in v0.3.0

func (q *Query) Where(field string, op FilterOp, value interface{}) *Query

Where adds a filter condition.

func (*Query) WithCount added in v0.3.0

func (q *Query) WithCount() *Query

WithCount includes total count in results.

func (*Query) WithLimit added in v0.3.0

func (q *Query) WithLimit(limit int) *Query

WithLimit sets the maximum number of results.

func (*Query) WithOffset added in v0.3.0

func (q *Query) WithOffset(offset int) *Query

WithOffset sets the number of results to skip.

func (*Query) WithPagination added in v0.3.0

func (q *Query) WithPagination(page, pageSize int) *Query

WithPagination sets limit and offset for pagination.

type QueryResult added in v0.3.0

type QueryResult[T any] struct {
	// Items contains the matching read models.
	Items []*T

	// TotalCount is the total number of matching items (before pagination).
	// Only populated if IncludeCount was true in the query.
	TotalCount int64

	// HasMore indicates if there are more results beyond the limit.
	HasMore bool
}

QueryResult contains query results with optional count.

type ReadModelID added in v0.3.0

type ReadModelID interface {
	// GetID returns the read model's unique identifier.
	GetID() string
}

ReadModelID is an interface for read models that can return their ID.

type ReadModelRepository added in v0.3.0

type ReadModelRepository[T any] interface {
	// Get retrieves a read model by ID.
	// Returns ErrNotFound if not found.
	Get(ctx context.Context, id string) (*T, error)

	// GetMany retrieves multiple read models by their IDs.
	// Missing IDs are silently skipped.
	GetMany(ctx context.Context, ids []string) ([]*T, error)

	// Find queries read models with the given criteria.
	Find(ctx context.Context, query Query) ([]*T, error)

	// FindOne returns the first read model matching the query.
	// Returns ErrNotFound if no match.
	FindOne(ctx context.Context, query Query) (*T, error)

	// Count returns the number of read models matching the query.
	Count(ctx context.Context, query Query) (int64, error)

	// Insert creates a new read model.
	// Returns ErrAlreadyExists if ID already exists.
	Insert(ctx context.Context, model *T) error

	// Update modifies an existing read model.
	// Returns ErrNotFound if not found.
	Update(ctx context.Context, id string, updateFn func(*T)) error

	// Upsert creates or updates a read model.
	Upsert(ctx context.Context, model *T) error

	// Delete removes a read model by ID.
	// Returns ErrNotFound if not found.
	Delete(ctx context.Context, id string) error

	// DeleteMany removes all read models matching the query.
	// Returns the number of deleted models.
	DeleteMany(ctx context.Context, query Query) (int64, error)

	// Clear removes all read models.
	Clear(ctx context.Context) error
}

ReadModelRepository provides generic CRUD operations for read models. T is the read model type.

type RebuildOptions added in v0.3.0

type RebuildOptions struct {
	// DeleteCheckpoint deletes the existing checkpoint before rebuilding.
	// Default: true
	DeleteCheckpoint bool

	// ClearReadModel calls the projection's Clear method before rebuilding.
	// Only applicable for projections that implement Clearable.
	// Default: true
	ClearReadModel bool

	// ProgressCallback is called periodically with progress updates.
	ProgressCallback ProgressCallback

	// ProgressInterval is how often to call the progress callback.
	// Default: 1 second
	ProgressInterval time.Duration

	// FromPosition starts rebuilding from a specific position.
	// Default: 0 (from beginning)
	FromPosition uint64

	// ToPosition stops rebuilding at a specific position.
	// Default: 0 (to end)
	ToPosition uint64
}

RebuildOptions configures a projection rebuild.

func DefaultRebuildOptions added in v0.3.0

func DefaultRebuildOptions() RebuildOptions

DefaultRebuildOptions returns the default rebuild options.

type RebuildProgress added in v0.3.0

type RebuildProgress struct {
	// ProjectionName is the name of the projection being rebuilt.
	ProjectionName string

	// TotalEvents is the total number of events to process.
	TotalEvents uint64

	// ProcessedEvents is the number of events processed so far.
	ProcessedEvents uint64

	// CurrentPosition is the current global position.
	CurrentPosition uint64

	// StartedAt is when the rebuild started.
	StartedAt time.Time

	// Duration is the elapsed time.
	Duration time.Duration

	// EventsPerSecond is the processing rate.
	EventsPerSecond float64

	// EstimatedRemaining is the estimated time remaining.
	EstimatedRemaining time.Duration

	// Completed indicates if the rebuild is complete.
	Completed bool

	// Error contains any error that occurred.
	Error error
}

RebuildProgress tracks the progress of a projection rebuild.

type RetryConfig added in v0.1.10

type RetryConfig struct {
	// MaxAttempts is the maximum number of attempts (including the first one).
	MaxAttempts int

	// InitialDelay is the initial delay between retries.
	InitialDelay time.Duration

	// MaxDelay is the maximum delay between retries.
	MaxDelay time.Duration

	// Multiplier is the factor by which the delay increases on each retry.
	Multiplier float64

	// ShouldRetry determines if an error should be retried.
	// If nil, all errors are retried.
	ShouldRetry func(err error) bool
}

RetryMiddleware retries failed commands.

func DefaultRetryConfig added in v0.1.10

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns a default retry configuration.

type RetryPolicy added in v0.3.0

type RetryPolicy interface {
	// ShouldRetry returns true if the operation should be retried.
	ShouldRetry(attempt int, err error) bool

	// Delay returns the duration to wait before the next retry.
	Delay(attempt int) time.Duration
}

RetryPolicy defines how to handle retries for failed operations.

func ExponentialBackoffRetry added in v0.3.0

func ExponentialBackoffRetry(maxRetries int, baseDelay, maxDelay time.Duration) RetryPolicy

ExponentialBackoffRetry creates a new retry policy with exponential backoff.

func NoRetry added in v0.3.0

func NoRetry() RetryPolicy

NoRetry returns a retry policy that never retries.

type Saga added in v0.5.0

type Saga interface {
	// SagaID returns the unique identifier for this saga instance.
	SagaID() string

	// SagaType returns the type of this saga (e.g., "OrderFulfillment").
	SagaType() string

	// Status returns the current status of the saga.
	Status() SagaStatus

	// SetStatus sets the saga status.
	SetStatus(status SagaStatus)

	// CurrentStep returns the current step number (0-based).
	CurrentStep() int

	// SetCurrentStep sets the current step number.
	SetCurrentStep(step int)

	// CorrelationID returns the correlation ID for this saga.
	// Used to correlate events to this saga instance.
	CorrelationID() string

	// SetCorrelationID sets the correlation ID.
	SetCorrelationID(id string)

	// HandledEvents returns the list of event types this saga handles.
	HandledEvents() []string

	// HandleEvent processes an event and returns commands to dispatch.
	// The returned commands will be executed by the saga manager.
	HandleEvent(ctx context.Context, event StoredEvent) ([]Command, error)

	// Compensate is called when the saga needs to rollback.
	// It returns compensating commands to undo previous steps.
	Compensate(ctx context.Context, failedStep int, failureReason error) ([]Command, error)

	// IsComplete returns true if the saga has completed successfully.
	IsComplete() bool

	// StartedAt returns when the saga started.
	StartedAt() time.Time

	// SetStartedAt sets when the saga started.
	SetStartedAt(t time.Time)

	// CompletedAt returns when the saga completed (nil if not completed).
	CompletedAt() *time.Time

	// SetCompletedAt sets when the saga completed.
	SetCompletedAt(t *time.Time)

	// Data returns the saga's internal state as a map.
	// This is serialized and stored in the saga store.
	Data() map[string]interface{}

	// SetData restores the saga's internal state from a map.
	SetData(data map[string]interface{})

	// Version returns the saga version for optimistic concurrency.
	Version() int64

	// SetVersion sets the saga version.
	SetVersion(v int64)

	// IncrementVersion increments the saga version.
	IncrementVersion()
}

Saga defines the interface for saga implementations. A saga coordinates long-running business processes across multiple aggregates.

type SagaBase added in v0.5.0

type SagaBase struct {
	// contains filtered or unexported fields
}

SagaBase provides a default partial implementation of the Saga interface. Embed this struct in your saga types to get default behavior.

func NewSagaBase added in v0.5.0

func NewSagaBase(id, sagaType string) SagaBase

NewSagaBase creates a new SagaBase with the given ID and type.

func (*SagaBase) Complete added in v0.5.0

func (s *SagaBase) Complete()

Complete marks the saga as completed.

func (*SagaBase) CompletedAt added in v0.5.0

func (s *SagaBase) CompletedAt() *time.Time

CompletedAt returns when the saga completed.

func (*SagaBase) CorrelationID added in v0.5.0

func (s *SagaBase) CorrelationID() string

CorrelationID returns the correlation ID.

func (*SagaBase) CurrentStep added in v0.5.0

func (s *SagaBase) CurrentStep() int

CurrentStep returns the current step number.

func (*SagaBase) Fail added in v0.5.0

func (s *SagaBase) Fail()

Fail marks the saga as failed.

func (*SagaBase) IncrementVersion added in v0.5.0

func (s *SagaBase) IncrementVersion()

IncrementVersion increments the saga version.

func (*SagaBase) MarkCompensated added in v0.5.0

func (s *SagaBase) MarkCompensated()

MarkCompensated marks the saga as compensated.

func (*SagaBase) SagaID added in v0.5.0

func (s *SagaBase) SagaID() string

SagaID returns the saga's unique identifier.

func (*SagaBase) SagaType added in v0.5.0

func (s *SagaBase) SagaType() string

SagaType returns the saga type.

func (*SagaBase) SetCompletedAt added in v0.5.0

func (s *SagaBase) SetCompletedAt(t *time.Time)

SetCompletedAt sets when the saga completed.

func (*SagaBase) SetCorrelationID added in v0.5.0

func (s *SagaBase) SetCorrelationID(id string)

SetCorrelationID sets the correlation ID.

func (*SagaBase) SetCurrentStep added in v0.5.0

func (s *SagaBase) SetCurrentStep(step int)

SetCurrentStep sets the current step number.

func (*SagaBase) SetID added in v0.5.0

func (s *SagaBase) SetID(id string)

SetID sets the saga's ID.

func (*SagaBase) SetStartedAt added in v0.5.0

func (s *SagaBase) SetStartedAt(t time.Time)

SetStartedAt sets when the saga started.

func (*SagaBase) SetStatus added in v0.5.0

func (s *SagaBase) SetStatus(status SagaStatus)

SetStatus sets the saga status.

func (*SagaBase) SetType added in v0.5.0

func (s *SagaBase) SetType(t string)

SetType sets the saga type.

func (*SagaBase) SetVersion added in v0.5.0

func (s *SagaBase) SetVersion(v int64)

SetVersion sets the saga version.

func (*SagaBase) StartCompensation added in v0.5.0

func (s *SagaBase) StartCompensation()

StartCompensation marks the saga as compensating.

func (*SagaBase) StartedAt added in v0.5.0

func (s *SagaBase) StartedAt() time.Time

StartedAt returns when the saga started.

func (*SagaBase) Status added in v0.5.0

func (s *SagaBase) Status() SagaStatus

Status returns the current status.

func (*SagaBase) Version added in v0.5.0

func (s *SagaBase) Version() int64

Version returns the saga version.

type SagaCorrelation added in v0.5.0

type SagaCorrelation struct {
	// SagaType is the type of saga this correlation applies to.
	SagaType string

	// EventTypes are the event types that can start this saga.
	StartingEvents []string

	// CorrelationIDFunc extracts the correlation ID from an event.
	// This is used to find existing sagas or create new ones.
	CorrelationIDFunc func(event StoredEvent) string
}

SagaCorrelation provides strategies for correlating events to sagas.

type SagaFactory added in v0.5.0

type SagaFactory func(id string) Saga

SagaFactory creates new saga instances.

type SagaFailedError added in v0.5.0

type SagaFailedError struct {
	SagaID      string
	SagaType    string
	FailedStep  int
	Reason      string
	Recoverable bool
}

SagaFailedError provides detailed information about a saga failure.

func NewSagaFailedError added in v0.5.0

func NewSagaFailedError(sagaID, sagaType string, failedStep int, reason string, recoverable bool) *SagaFailedError

NewSagaFailedError creates a new SagaFailedError.

func (*SagaFailedError) Error added in v0.5.0

func (e *SagaFailedError) Error() string

Error returns the error message.

func (*SagaFailedError) Is added in v0.5.0

func (e *SagaFailedError) Is(target error) bool

Is reports whether this error matches the target error.

type SagaManager added in v0.5.0

type SagaManager struct {
	// contains filtered or unexported fields
}

SagaManager orchestrates saga lifecycle and event processing. It subscribes to events, routes them to appropriate sagas, and dispatches resulting commands.

Concurrency and Idempotency

SagaManager provides several mechanisms to ensure correct saga processing under concurrent access:

  1. Per-Saga Locking: Each saga ID has an associated mutex that serializes access. This prevents race conditions when the same event is delivered from multiple sources (e.g., pg_notify + polling) or when multiple events for the same saga arrive simultaneously.

  2. Fresh State Loading: Before processing each event, the saga state is loaded fresh from the store. This ensures terminal status checks and idempotency checks see the latest state.

  3. Event Idempotency: Processed events are tracked in the SagaState.ProcessedEvents field (not in the saga's Data map) to prevent duplicate processing on retries. This is handled transparently by the SagaManager - saga implementations don't need to preserve these internal tracking fields.

  4. Optimistic Concurrency: The saga store uses version-based optimistic concurrency control. On conflict, the event is retried with fresh state.

func NewSagaManager added in v0.5.0

func NewSagaManager(eventStore *EventStore, opts ...SagaManagerOption) *SagaManager

NewSagaManager creates a new SagaManager.

func (*SagaManager) FindSagaByCorrelationID added in v0.5.0

func (m *SagaManager) FindSagaByCorrelationID(ctx context.Context, correlationID string) (*SagaState, error)

FindSagaByCorrelationID finds a saga by its correlation ID.

func (*SagaManager) GetSaga added in v0.5.0

func (m *SagaManager) GetSaga(ctx context.Context, sagaID string) (*SagaState, error)

GetSaga retrieves a saga by its ID.

func (*SagaManager) IsRunning added in v0.5.0

func (m *SagaManager) IsRunning() bool

IsRunning returns true if the saga manager is running.

func (*SagaManager) Position added in v0.5.0

func (m *SagaManager) Position() uint64

Position returns the current event position.

func (*SagaManager) ProcessEvent added in v0.5.0

func (m *SagaManager) ProcessEvent(ctx context.Context, event StoredEvent) error

ProcessEvent manually processes a single event (for testing or manual replay).

func (*SagaManager) Register added in v0.5.0

func (m *SagaManager) Register(sagaType string, factory SagaFactory, correlation SagaCorrelation)

Register registers a saga type with its factory and correlation configuration.

func (*SagaManager) RegisterSimple added in v0.5.0

func (m *SagaManager) RegisterSimple(sagaType string, factory SagaFactory, startingEvents ...string)

RegisterSimple registers a saga with a simple correlation based on event stream ID.

func (*SagaManager) SetPosition added in v0.5.0

func (m *SagaManager) SetPosition(pos uint64)

SetPosition sets the starting position for event processing.

func (*SagaManager) Start added in v0.5.0

func (m *SagaManager) Start(ctx context.Context) error

Start begins processing events and routing them to sagas. This method blocks until the context is cancelled.

func (*SagaManager) StartAsync added in v0.5.2

func (m *SagaManager) StartAsync(ctx context.Context) *AsyncResult

StartAsync begins processing events and routing them to sagas in a background goroutine. It returns immediately with an AsyncResult that can be used to:

  • Wait for the saga manager to stop: result.Wait()
  • Wait with timeout: result.WaitWithTimeout(5 * time.Second)
  • Check if stopped: result.IsComplete()
  • Cancel the manager: result.Cancel()
  • Get the error: result.Err()

The saga manager will continue processing until:

  • The provided context is cancelled
  • result.Cancel() is called
  • An unrecoverable error occurs

Example:

result := manager.StartAsync(ctx)

// Do other work while saga manager runs in background...

// Later, when shutting down:
result.Cancel()
if err := result.WaitWithTimeout(10 * time.Second); err != nil {
    log.Printf("Saga manager shutdown: %v", err)
}

func (*SagaManager) StartSaga added in v0.5.2

func (m *SagaManager) StartSaga(ctx context.Context, sagaType string, triggerEvent StoredEvent) error

StartSaga manually triggers a new saga instance synchronously. The saga will be created and the trigger event will be processed immediately.

This is useful when you want to start a saga based on an external trigger rather than waiting for an event to flow through the event store subscription.

func (*SagaManager) StartSagaAsync added in v0.5.2

func (m *SagaManager) StartSagaAsync(ctx context.Context, sagaType string, triggerEvent StoredEvent) *AsyncResult

StartSagaAsync manually triggers a new saga instance asynchronously. The saga will be started and the first event will be processed in a background goroutine. Returns an AsyncResult that can be used to wait for the initial processing to complete.

This is useful when you want to start a saga based on an external trigger rather than waiting for an event to flow through the event store subscription.

Example:

result := manager.StartSagaAsync(ctx, "OrderFulfillment", initialEvent)
if err := result.WaitWithTimeout(5 * time.Second); err != nil {
    log.Printf("Failed to start saga: %v", err)
}

func (*SagaManager) Stop added in v0.5.0

func (m *SagaManager) Stop()

Stop gracefully stops the saga manager.

type SagaManagerOption added in v0.5.0

type SagaManagerOption func(*SagaManager)

SagaManagerOption configures a SagaManager.

func WithCommandBus added in v0.5.0

func WithCommandBus(bus *CommandBus) SagaManagerOption

WithCommandBus sets the command bus for dispatching commands.

func WithSagaLogger added in v0.5.0

func WithSagaLogger(logger Logger) SagaManagerOption

WithSagaLogger sets the logger.

func WithSagaPollInterval added in v0.5.0

func WithSagaPollInterval(d time.Duration) SagaManagerOption

WithSagaPollInterval sets the polling interval for event subscription.

func WithSagaRetryAttempts added in v0.5.0

func WithSagaRetryAttempts(attempts int) SagaManagerOption

WithSagaRetryAttempts sets the number of retry attempts for failed commands.

func WithSagaRetryDelay added in v0.5.0

func WithSagaRetryDelay(d time.Duration) SagaManagerOption

WithSagaRetryDelay sets the delay between retry attempts.

func WithSagaSerializer added in v0.5.0

func WithSagaSerializer(serializer Serializer) SagaManagerOption

WithSagaSerializer sets the serializer for saga data.

func WithSagaStore added in v0.5.0

func WithSagaStore(store SagaStore) SagaManagerOption

WithSagaStore sets the saga store.

type SagaNotFoundError added in v0.5.0

type SagaNotFoundError = adapters.SagaNotFoundError

SagaNotFoundError provides detailed information about a missing saga. This is a type alias to adapters.SagaNotFoundError for consistency.

type SagaState added in v0.5.0

type SagaState = adapters.SagaState

SagaState represents the persisted state of a saga.

func SagaStateFromJSON added in v0.5.0

func SagaStateFromJSON(data []byte) (*SagaState, error)

SagaStateFromJSON parses saga state from JSON.

type SagaStatus added in v0.5.0

type SagaStatus = adapters.SagaStatus

SagaStatus represents the current status of a saga.

type SagaStep added in v0.5.0

type SagaStep = adapters.SagaStep

SagaStep represents a single step in a saga.

type SagaStepStatus added in v0.5.0

type SagaStepStatus = adapters.SagaStepStatus

SagaStepStatus represents the status of a saga step.

type SagaStore added in v0.5.0

type SagaStore = adapters.SagaStore

SagaStore defines the interface for saga persistence.

type SerializationError

type SerializationError struct {
	EventType string
	Operation string // "serialize" or "deserialize"
	Cause     error
}

SerializationError provides detailed information about a serialization failure.

func NewSerializationError

func NewSerializationError(eventType, operation string, cause error) *SerializationError

NewSerializationError creates a new SerializationError.

func (*SerializationError) Error

func (e *SerializationError) Error() string

Error returns the error message.

func (*SerializationError) Is

func (e *SerializationError) Is(target error) bool

Is reports whether this error matches the target error.

func (*SerializationError) Unwrap

func (e *SerializationError) Unwrap() error

Unwrap returns the underlying cause for errors.Unwrap().

type Serializer

type Serializer interface {
	// Serialize converts an event to bytes.
	Serialize(event interface{}) ([]byte, error)

	// Deserialize converts bytes back to an event.
	// The eventType is used to determine the target type.
	Deserialize(data []byte, eventType string) (interface{}, error)
}

Serializer handles event payload serialization and deserialization.

type SimpleDispatcher added in v0.1.10

type SimpleDispatcher struct {
	// contains filtered or unexported fields
}

SimpleDispatcher is a basic dispatcher that forwards commands to handlers.

func NewSimpleDispatcher added in v0.1.10

func NewSimpleDispatcher(registry *HandlerRegistry) *SimpleDispatcher

NewSimpleDispatcher creates a new SimpleDispatcher.

func (*SimpleDispatcher) Dispatch added in v0.1.10

func (d *SimpleDispatcher) Dispatch(ctx context.Context, cmd Command) (CommandResult, error)

Dispatch sends a command to its handler.

type StoredEvent

type StoredEvent struct {
	// ID is the globally unique event identifier.
	ID string

	// StreamID identifies the stream this event belongs to.
	StreamID string

	// Type is the event type identifier.
	Type string

	// Data is the serialized event payload.
	Data []byte

	// Metadata contains contextual information.
	Metadata Metadata

	// Version is the position within the stream (1-based).
	Version int64

	// GlobalPosition is the position across all streams.
	GlobalPosition uint64

	// Timestamp is when the event was stored.
	Timestamp time.Time
}

StoredEvent represents a persisted event with all storage metadata.

type StreamID

type StreamID struct {
	// Category represents the aggregate type (e.g., "Order", "Customer").
	Category string

	// ID is the unique identifier within the category (e.g., "order-123").
	ID string
}

StreamID uniquely identifies an event stream. It consists of a category (aggregate type) and an instance ID.

func NewStreamID

func NewStreamID(category, id string) StreamID

NewStreamID creates a new StreamID from category and ID.

func ParseStreamID

func ParseStreamID(s string) (StreamID, error)

ParseStreamID parses a stream ID string in the format "Category-ID". Returns an error if the format is invalid.

func (StreamID) IsZero

func (s StreamID) IsZero() bool

IsZero reports whether the StreamID is empty.

func (StreamID) String

func (s StreamID) String() string

String returns the stream ID as "Category-ID".

func (StreamID) Validate

func (s StreamID) Validate() error

Validate checks if the StreamID is valid.

type StreamInfo

type StreamInfo struct {
	// StreamID is the stream identifier.
	StreamID string

	// Category is the stream category (aggregate type).
	Category string

	// Version is the current stream version.
	Version int64

	// EventCount is the number of events in the stream.
	EventCount int64

	// CreatedAt is when the stream was created.
	CreatedAt time.Time

	// UpdatedAt is when the stream was last modified.
	UpdatedAt time.Time
}

StreamInfo contains metadata about an event stream.

type StreamNotFoundError

type StreamNotFoundError struct {
	StreamID string
}

StreamNotFoundError provides detailed information about a missing stream.

func NewStreamNotFoundError

func NewStreamNotFoundError(streamID string) *StreamNotFoundError

NewStreamNotFoundError creates a new StreamNotFoundError.

func (*StreamNotFoundError) Error

func (e *StreamNotFoundError) Error() string

Error returns the error message.

func (*StreamNotFoundError) Is

func (e *StreamNotFoundError) Is(target error) bool

Is reports whether this error matches the target error.

func (*StreamNotFoundError) Unwrap

func (e *StreamNotFoundError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type Subscription added in v0.3.0

type Subscription interface {
	// Events returns the channel for receiving events.
	Events() <-chan StoredEvent

	// Close stops the subscription.
	Close() error

	// Err returns any error that caused the subscription to close.
	Err() error
}

Subscription represents an active event subscription.

type SubscriptionAdapter added in v0.3.0

type SubscriptionAdapter interface {
	// LoadFromPosition loads events starting from a global position.
	LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)

	// SubscribeAll subscribes to all events across all streams.
	SubscribeAll(ctx context.Context, fromPosition uint64) (<-chan StoredEvent, error)

	// SubscribeStream subscribes to events from a specific stream.
	SubscribeStream(ctx context.Context, streamID string, fromVersion int64) (<-chan StoredEvent, error)

	// SubscribeCategory subscribes to all events from streams in a category.
	SubscribeCategory(ctx context.Context, category string, fromPosition uint64) (<-chan StoredEvent, error)
}

SubscriptionAdapter provides methods for subscribing to event streams. This interface extends the basic EventStoreAdapter for subscription capabilities.

type SubscriptionOptions added in v0.3.0

type SubscriptionOptions struct {
	// BufferSize is the size of the event channel buffer.
	// Default: 256
	BufferSize int

	// Filter optionally filters which events are delivered.
	Filter EventFilter

	// RetryOnError determines whether to retry on transient errors.
	// Default: true
	RetryOnError bool

	// RetryInterval is the time to wait between retries.
	// Default: 1 second
	RetryInterval time.Duration

	// MaxRetries is the maximum number of retry attempts.
	// Default: 5
	MaxRetries int
}

SubscriptionOptions configures a subscription.

func DefaultSubscriptionOptions added in v0.3.0

func DefaultSubscriptionOptions() SubscriptionOptions

DefaultSubscriptionOptions returns the default subscription options.

type ValidationError added in v0.1.10

type ValidationError struct {
	// CommandType is the type of command that failed validation.
	CommandType string

	// Field is the field that failed validation (optional).
	Field string

	// Message describes the validation failure.
	Message string

	// Cause is the underlying error (optional).
	Cause error
}

ValidationError represents a command validation failure.

func NewValidationError added in v0.1.10

func NewValidationError(cmdType, field, message string) *ValidationError

NewValidationError creates a new ValidationError.

func NewValidationErrorWithCause added in v0.1.10

func NewValidationErrorWithCause(cmdType, field, message string, cause error) *ValidationError

NewValidationErrorWithCause creates a new ValidationError with an underlying cause.

func (*ValidationError) Error added in v0.1.10

func (e *ValidationError) Error() string

Error returns the error message.

func (*ValidationError) Is added in v0.1.10

func (e *ValidationError) Is(target error) bool

Is reports whether this error matches the target error.

func (*ValidationError) Unwrap added in v0.1.10

func (e *ValidationError) Unwrap() error

Unwrap returns the underlying cause for errors.Unwrap().

type Validator added in v0.1.10

type Validator interface {
	// Validate validates a command and returns validation errors.
	Validate(cmd Command) error
}

Validator provides command validation functionality.

type ValidatorFunc added in v0.1.10

type ValidatorFunc func(cmd Command) error

ValidatorFunc is a function that implements Validator.

func (ValidatorFunc) Validate added in v0.1.10

func (f ValidatorFunc) Validate(cmd Command) error

Validate implements Validator.

type VersionSetter added in v0.4.6

type VersionSetter interface {
	SetVersion(v int64)
}

VersionSetter is an optional interface that aggregates can implement to allow the EventStore to set their version during loading. This is used for optimistic concurrency control in SaveAggregate. AggregateBase implements this interface.

type VersionedAggregate

type VersionedAggregate interface {
	Aggregate

	// OriginalVersion returns the version when the aggregate was loaded.
	OriginalVersion() int64
}

VersionedAggregate provides versioning information for optimistic concurrency.

Directories

Path Synopsis
Package adapters provides interfaces for event store backends.
Package adapters provides interfaces for event store backends.
memory
Package memory provides an in-memory implementation of the event store adapter.
Package memory provides an in-memory implementation of the event store adapter.
postgres
Package postgres provides a PostgreSQL implementation of the event store adapter.
Package postgres provides a PostgreSQL implementation of the event store adapter.
cli
commands
Package commands provides the CLI command implementations for mink.
Package commands provides the CLI command implementations for mink.
config
Package config provides configuration management for the mink CLI.
Package config provides configuration management for the mink CLI.
styles
Package styles provides consistent styling for the go-mink CLI.
Package styles provides consistent styling for the go-mink CLI.
ui
Package ui provides reusable UI components for the go-mink CLI.
Package ui provides reusable UI components for the go-mink CLI.
cmd
mink command
mink is the command-line interface for the go-mink event sourcing library.
mink is the command-line interface for the go-mink event sourcing library.
examples
basic command
Package main demonstrates the basic usage of go-mink for event sourcing.
Package main demonstrates the basic usage of go-mink for event sourcing.
cqrs command
Package main demonstrates the CQRS and Command Bus features of go-mink (Phase 2).
Package main demonstrates the CQRS and Command Bus features of go-mink (Phase 2).
cqrs-postgres command
Package main demonstrates Phase 2 CQRS & Command Bus features with PostgreSQL.
Package main demonstrates Phase 2 CQRS & Command Bus features with PostgreSQL.
full-ecommerce command
Package main demonstrates a complete e-commerce order fulfillment system using go-mink.
Package main demonstrates a complete e-commerce order fulfillment system using go-mink.
metrics command
Example: Metrics Middleware
Example: Metrics Middleware
msgpack command
Example: MessagePack Serializer
Example: MessagePack Serializer
projections command
Package main demonstrates the projection and read model features of go-mink.
Package main demonstrates the projection and read model features of go-mink.
protobuf command
Example: Protocol Buffers Serializer
Example: Protocol Buffers Serializer
sagas command
Example: Saga (Process Manager) Pattern
Example: Saga (Process Manager) Pattern
tracing command
Example: Distributed Tracing with OpenTelemetry
Example: Distributed Tracing with OpenTelemetry
middleware
metrics
Package metrics provides Prometheus metrics integration for mink.
Package metrics provides Prometheus metrics integration for mink.
tracing
Package tracing provides OpenTelemetry integration for mink.
Package tracing provides OpenTelemetry integration for mink.
serializer
msgpack
Package msgpack provides a MessagePack serializer implementation for mink.
Package msgpack provides a MessagePack serializer implementation for mink.
protobuf
Package protobuf provides a Protocol Buffers serializer for mink events.
Package protobuf provides a Protocol Buffers serializer for mink events.
testing
assertions
Package assertions provides event assertion utilities for testing event-sourced systems.
Package assertions provides event assertion utilities for testing event-sourced systems.
bdd
Package bdd provides BDD-style test fixtures for event-sourced aggregates.
Package bdd provides BDD-style test fixtures for event-sourced aggregates.
containers
Package containers provides test container utilities for integration testing.
Package containers provides test container utilities for integration testing.
projections
Package projections provides testing utilities for projection development.
Package projections provides testing utilities for projection development.
sagas
Package sagas provides testing utilities for saga (process manager) development.
Package sagas provides testing utilities for saga (process manager) development.
testutil
Package testutil provides test utilities and fixtures for go-mink.
Package testutil provides test utilities and fixtures for go-mink.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL