Documentation
¶
Overview ¶
Package mink provides event sourcing and CQRS primitives for Go applications. It offers a simple, flexible API for building event-sourced systems with support for multiple database backends.
Package mink provides event sourcing and CQRS primitives for Go applications.
go-mink is an Event Sourcing library for Go that makes it easy to build applications using event sourcing patterns. It provides a simple API for storing events, loading aggregates, and projecting read models.
Quick Start ¶
Create an event store with the in-memory adapter for development:
import (
"github.com/AshkanYarmoradi/go-mink"
"github.com/AshkanYarmoradi/go-mink/adapters/memory"
)
store := mink.New(memory.NewAdapter())
For production, use the PostgreSQL adapter:
import (
"github.com/AshkanYarmoradi/go-mink"
"github.com/AshkanYarmoradi/go-mink/adapters/postgres"
)
adapter, err := postgres.NewAdapter(ctx, connStr)
if err != nil {
log.Fatal(err)
}
store := mink.New(adapter)
Defining Events ¶
Events are simple structs that represent something that happened in your domain:
type OrderCreated struct {
OrderID string `json:"orderId"`
CustomerID string `json:"customerId"`
}
type ItemAdded struct {
OrderID string `json:"orderId"`
SKU string `json:"sku"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
Register events with the store so they can be serialized and deserialized:
store.RegisterEvents(OrderCreated{}, ItemAdded{})
Defining Aggregates ¶
Aggregates are domain objects that encapsulate business logic and generate events:
type Order struct {
mink.AggregateBase
CustomerID string
Items []OrderItem
Status string
}
func NewOrder(id string) *Order {
return &Order{
AggregateBase: mink.NewAggregateBase(id, "Order"),
}
}
func (o *Order) Create(customerID string) {
o.Apply(OrderCreated{OrderID: o.AggregateID(), CustomerID: customerID})
o.CustomerID = customerID
o.Status = "Created"
}
func (o *Order) ApplyEvent(event interface{}) error {
switch e := event.(type) {
case OrderCreated:
o.CustomerID = e.CustomerID
o.Status = "Created"
case ItemAdded:
o.Items = append(o.Items, OrderItem{SKU: e.SKU, Quantity: e.Quantity, Price: e.Price})
}
// NOTE: Version is managed automatically by LoadAggregate and SaveAggregate.
// You do NOT need to call IncrementVersion() here.
return nil
}
Saving and Loading Aggregates ¶
Save aggregates to persist their uncommitted events:
order := NewOrder("order-123")
order.Create("customer-456")
order.AddItem("SKU-001", 2, 29.99)
err := store.SaveAggregate(ctx, order)
Load aggregates to rebuild state from events:
loaded := NewOrder("order-123")
err := store.LoadAggregate(ctx, loaded)
// loaded.Status == "Created"
// loaded.Items contains the added item
// loaded.Version() returns the number of events in the stream
Low-Level Event Operations ¶
Append events directly to a stream:
events := []interface{}{
OrderCreated{OrderID: "123", CustomerID: "456"},
ItemAdded{OrderID: "123", SKU: "SKU-001", Quantity: 2, Price: 29.99},
}
err := store.Append(ctx, "Order-123", events)
Load events from a stream:
events, err := store.Load(ctx, "Order-123")
Optimistic Concurrency ¶
Use expected versions to prevent concurrent modifications:
// Create new stream (must not exist) err := store.Append(ctx, "Order-123", events, mink.ExpectVersion(mink.NoStream)) // Append to existing stream at specific version err := store.Append(ctx, "Order-123", events, mink.ExpectVersion(1))
Version constants:
- AnyVersion (-1): Skip version check
- NoStream (0): Stream must not exist
- StreamExists (-2): Stream must exist
Metadata ¶
Add metadata to events for tracing and multi-tenancy:
metadata := mink.Metadata{}.
WithUserID("user-123").
WithCorrelationID("corr-456").
WithTenantID("tenant-789")
err := store.Append(ctx, "Order-123", events, mink.WithAppendMetadata(metadata))
Commands and CQRS (v0.2.0) ¶
Define commands to encapsulate user intentions:
type CreateOrder struct {
mink.CommandBase
CustomerID string `json:"customerId"`
}
func (c CreateOrder) CommandType() string { return "CreateOrder" }
func (c CreateOrder) Validate() error {
if c.CustomerID == "" {
return mink.NewValidationError("CustomerID", "required")
}
return nil
}
Create a command bus with middleware:
bus := mink.NewCommandBus()
bus.Use(mink.ValidationMiddleware())
bus.Use(mink.RecoveryMiddleware(func(err error) { log.Error(err) }))
bus.Use(mink.LoggingMiddleware(logger, nil))
Register command handlers:
bus.Register("CreateOrder", func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
c := cmd.(CreateOrder)
order := NewOrder(uuid.New().String())
order.Create(c.CustomerID)
if err := store.SaveAggregate(ctx, order); err != nil {
return mink.NewErrorResult(err), err
}
return mink.NewSuccessResult(order.AggregateID(), order.Version()), nil
})
Dispatch commands:
result, err := bus.Dispatch(ctx, CreateOrder{CustomerID: "cust-123"})
Idempotency ¶
Prevent duplicate command processing with idempotency:
idempotencyStore := memory.NewIdempotencyStore() config := mink.DefaultIdempotencyConfig(idempotencyStore) bus.Use(mink.IdempotencyMiddleware(config))
Make commands idempotent by implementing IdempotentCommand:
func (c CreateOrder) IdempotencyKey() string { return c.RequestID }
Index ¶
- Constants
- Variables
- func BuildStreamID(aggregateType, aggregateID string) string
- func CausationIDFromContext(ctx context.Context) string
- func CorrelationIDFromContext(ctx context.Context) string
- func GenerateIdempotencyKey(cmd Command) string
- func GetCommandType(cmd interface{}) string
- func GetEventType(event interface{}) string
- func GetIdempotencyKey(cmd Command) string
- func IdempotencyKeyFromField(fieldGetter func(Command) string) func(Command) string
- func IdempotencyKeyPrefix(prefix string) func(Command) string
- func RegisterGenericHandler[C Command](registry *HandlerRegistry, ...)
- func SagaStateToJSON(state *SagaState) ([]byte, error)
- func ShouldHandleEventType(handledEvents []string, eventType string) bool
- func TenantIDFromContext(ctx context.Context) string
- func Version() string
- func WithCausationID(ctx context.Context, causationID string) context.Context
- func WithTenantID(ctx context.Context, tenantID string) context.Context
- type Aggregate
- type AggregateBase
- func (a *AggregateBase) AggregateID() string
- func (a *AggregateBase) AggregateType() string
- func (a *AggregateBase) Apply(event interface{})
- func (a *AggregateBase) ClearUncommittedEvents()
- func (a *AggregateBase) HasUncommittedEvents() bool
- func (a *AggregateBase) IncrementVersion()
- func (a *AggregateBase) SetID(id string)
- func (a *AggregateBase) SetType(t string)
- func (a *AggregateBase) SetVersion(v int64)
- func (a *AggregateBase) StreamID() StreamID
- func (a *AggregateBase) UncommittedEvents() []interface{}
- func (a *AggregateBase) Version() int64
- type AggregateCommand
- type AggregateFactory
- type AggregateHandler
- type AggregateHandlerConfig
- type AggregateRoot
- type AppendOption
- type AsyncOptions
- type AsyncProjection
- type AsyncProjectionBase
- type AsyncResult
- type CatchupSubscription
- type CategoryFilter
- type Checkpoint
- type CheckpointStore
- type Clearable
- type Command
- type CommandBase
- func (c CommandBase) GetCausationID() string
- func (c CommandBase) GetCommandID() string
- func (c CommandBase) GetCorrelationID() string
- func (c CommandBase) GetMetadata(key string) string
- func (c CommandBase) WithCausationID(id string) CommandBase
- func (c CommandBase) WithCommandID(id string) CommandBase
- func (c CommandBase) WithCorrelationID(id string) CommandBase
- func (c CommandBase) WithMetadata(key, value string) CommandBase
- type CommandBus
- func (b *CommandBus) Close() error
- func (b *CommandBus) Dispatch(ctx context.Context, cmd Command) (CommandResult, error)
- func (b *CommandBus) DispatchAll(ctx context.Context, cmds ...Command) ([]DispatchResult, error)
- func (b *CommandBus) DispatchAsync(ctx context.Context, cmd Command) <-chan DispatchResult
- func (b *CommandBus) HandlerCount() int
- func (b *CommandBus) HasHandler(cmdType string) bool
- func (b *CommandBus) IsClosed() bool
- func (b *CommandBus) MiddlewareCount() int
- func (b *CommandBus) Register(handler CommandHandler)
- func (b *CommandBus) RegisterFunc(cmdType string, ...)
- func (b *CommandBus) Use(middleware ...Middleware)
- type CommandBusOption
- type CommandContext
- func (c *CommandContext) Get(key string) (interface{}, bool)
- func (c *CommandContext) GetString(key string) string
- func (c *CommandContext) Set(key string, value interface{})
- func (c *CommandContext) SetError(err error)
- func (c *CommandContext) SetResult(result CommandResult)
- func (c *CommandContext) SetSuccess(aggregateID string, version int64)
- type CommandDispatcher
- type CommandHandler
- type CommandHandlerFunc
- type CommandResult
- type CompositeFilter
- type ConcurrencyError
- type ContextValueMiddleware
- type DispatchResult
- type Event
- type EventApplier
- type EventData
- type EventFilter
- type EventRegistry
- type EventStore
- func (s *EventStore) Adapter() adapters.EventStoreAdapter
- func (s *EventStore) Append(ctx context.Context, streamID string, events []interface{}, ...) error
- func (s *EventStore) Close() error
- func (s *EventStore) GetLastPosition(ctx context.Context) (uint64, error)
- func (s *EventStore) GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)
- func (s *EventStore) Initialize(ctx context.Context) error
- func (s *EventStore) Load(ctx context.Context, streamID string) ([]Event, error)
- func (s *EventStore) LoadAggregate(ctx context.Context, agg Aggregate) error
- func (s *EventStore) LoadEventsFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)
- func (s *EventStore) LoadFrom(ctx context.Context, streamID string, fromVersion int64) ([]Event, error)
- func (s *EventStore) LoadRaw(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)
- func (s *EventStore) RegisterEvents(events ...interface{})
- func (s *EventStore) SaveAggregate(ctx context.Context, agg Aggregate) error
- func (s *EventStore) Serializer() Serializer
- type EventSubscriber
- type EventTypeFilter
- type EventTypeNotRegisteredError
- type Filter
- type FilterOp
- type GenericHandler
- type HandlerNotFoundError
- type HandlerRegistry
- func (r *HandlerRegistry) Clear()
- func (r *HandlerRegistry) CommandTypes() []string
- func (r *HandlerRegistry) Count() int
- func (r *HandlerRegistry) Get(cmdType string) CommandHandler
- func (r *HandlerRegistry) Has(cmdType string) bool
- func (r *HandlerRegistry) Register(handler CommandHandler)
- func (r *HandlerRegistry) RegisterFunc(cmdType string, ...)
- func (r *HandlerRegistry) Remove(cmdType string)
- type IdempotencyConfig
- type IdempotencyRecord
- type IdempotencyReplayError
- type IdempotencyStore
- type IdempotentCommand
- type InMemoryRepository
- func (r *InMemoryRepository[T]) Clear(ctx context.Context) error
- func (r *InMemoryRepository[T]) Count(ctx context.Context, query Query) (int64, error)
- func (r *InMemoryRepository[T]) Delete(ctx context.Context, id string) error
- func (r *InMemoryRepository[T]) DeleteMany(ctx context.Context, query Query) (int64, error)
- func (r *InMemoryRepository[T]) Exists(ctx context.Context, id string) (bool, error)
- func (r *InMemoryRepository[T]) Find(ctx context.Context, query Query) ([]*T, error)
- func (r *InMemoryRepository[T]) FindOne(ctx context.Context, query Query) (*T, error)
- func (r *InMemoryRepository[T]) Get(ctx context.Context, id string) (*T, error)
- func (r *InMemoryRepository[T]) GetAll(ctx context.Context) ([]*T, error)
- func (r *InMemoryRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
- func (r *InMemoryRepository[T]) Insert(ctx context.Context, model *T) error
- func (r *InMemoryRepository[T]) Len() int
- func (r *InMemoryRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error
- func (r *InMemoryRepository[T]) Upsert(ctx context.Context, model *T) error
- type InlineProjection
- type JSONSerializer
- func (s *JSONSerializer) Deserialize(data []byte, eventType string) (interface{}, error)
- func (s *JSONSerializer) Register(eventType string, example interface{})
- func (s *JSONSerializer) RegisterAll(examples ...interface{})
- func (s *JSONSerializer) Registry() *EventRegistry
- func (s *JSONSerializer) Serialize(event interface{}) ([]byte, error)
- type LiveOptions
- type LiveProjection
- type LiveProjectionBase
- type Logger
- type LoggingMiddleware
- type Metadata
- type MetricsCollector
- type Middleware
- func CausationIDMiddleware() Middleware
- func ChainMiddleware(middleware ...Middleware) Middleware
- func CommandTypeMiddleware(types []string, middleware Middleware) Middleware
- func ConditionalMiddleware(condition func(Command) bool, middleware Middleware) Middleware
- func CorrelationIDMiddleware(generator func() string) Middleware
- func IdempotencyMiddleware(config IdempotencyConfig) Middleware
- func MetricsMiddleware(collector MetricsCollector) Middleware
- func RecoveryMiddleware() Middleware
- func RetryMiddleware(config RetryConfig) Middleware
- func TenantMiddleware(extractor func(Command) string, required bool) Middleware
- func TimeoutMiddleware(timeout time.Duration) Middleware
- func ValidationMiddleware() Middleware
- type MiddlewareFunc
- type MultiValidationError
- func (e *MultiValidationError) Add(err *ValidationError)
- func (e *MultiValidationError) AddField(field, message string)
- func (e *MultiValidationError) Error() string
- func (e *MultiValidationError) HasErrors() bool
- func (e *MultiValidationError) Is(target error) bool
- func (e *MultiValidationError) Unwrap() error
- type Option
- type OrderBy
- type PanicError
- type ParallelRebuilder
- type PollingSubscription
- type ProgressCallback
- type Projection
- type ProjectionBase
- type ProjectionEngine
- func (e *ProjectionEngine) GetAllStatuses() []*ProjectionStatus
- func (e *ProjectionEngine) GetStatus(name string) (*ProjectionStatus, error)
- func (e *ProjectionEngine) IsRunning() bool
- func (e *ProjectionEngine) NotifyLiveProjections(ctx context.Context, events []StoredEvent)
- func (e *ProjectionEngine) ProcessInlineProjections(ctx context.Context, events []StoredEvent) error
- func (e *ProjectionEngine) RegisterAsync(projection AsyncProjection, opts ...AsyncOptions) error
- func (e *ProjectionEngine) RegisterInline(projection InlineProjection) error
- func (e *ProjectionEngine) RegisterLive(projection LiveProjection, opts ...LiveOptions) error
- func (e *ProjectionEngine) Start(ctx context.Context) error
- func (e *ProjectionEngine) Stop(ctx context.Context) error
- func (e *ProjectionEngine) Unregister(name string) error
- type ProjectionEngineOption
- type ProjectionError
- type ProjectionMetrics
- type ProjectionRebuilder
- type ProjectionRebuilderOption
- type ProjectionState
- type ProjectionStatus
- type Query
- func (q *Query) And(field string, op FilterOp, value interface{}) *Query
- func (q *Query) Build() Query
- func (q *Query) OrderByAsc(field string) *Query
- func (q *Query) OrderByDesc(field string) *Query
- func (q *Query) Where(field string, op FilterOp, value interface{}) *Query
- func (q *Query) WithCount() *Query
- func (q *Query) WithLimit(limit int) *Query
- func (q *Query) WithOffset(offset int) *Query
- func (q *Query) WithPagination(page, pageSize int) *Query
- type QueryResult
- type ReadModelID
- type ReadModelRepository
- type RebuildOptions
- type RebuildProgress
- type RetryConfig
- type RetryPolicy
- type Saga
- type SagaBase
- func (s *SagaBase) Complete()
- func (s *SagaBase) CompletedAt() *time.Time
- func (s *SagaBase) CorrelationID() string
- func (s *SagaBase) CurrentStep() int
- func (s *SagaBase) Fail()
- func (s *SagaBase) IncrementVersion()
- func (s *SagaBase) MarkCompensated()
- func (s *SagaBase) SagaID() string
- func (s *SagaBase) SagaType() string
- func (s *SagaBase) SetCompletedAt(t *time.Time)
- func (s *SagaBase) SetCorrelationID(id string)
- func (s *SagaBase) SetCurrentStep(step int)
- func (s *SagaBase) SetID(id string)
- func (s *SagaBase) SetStartedAt(t time.Time)
- func (s *SagaBase) SetStatus(status SagaStatus)
- func (s *SagaBase) SetType(t string)
- func (s *SagaBase) SetVersion(v int64)
- func (s *SagaBase) StartCompensation()
- func (s *SagaBase) StartedAt() time.Time
- func (s *SagaBase) Status() SagaStatus
- func (s *SagaBase) Version() int64
- type SagaCorrelation
- type SagaFactory
- type SagaFailedError
- type SagaManager
- func (m *SagaManager) FindSagaByCorrelationID(ctx context.Context, correlationID string) (*SagaState, error)
- func (m *SagaManager) GetSaga(ctx context.Context, sagaID string) (*SagaState, error)
- func (m *SagaManager) IsRunning() bool
- func (m *SagaManager) Position() uint64
- func (m *SagaManager) ProcessEvent(ctx context.Context, event StoredEvent) error
- func (m *SagaManager) Register(sagaType string, factory SagaFactory, correlation SagaCorrelation)
- func (m *SagaManager) RegisterSimple(sagaType string, factory SagaFactory, startingEvents ...string)
- func (m *SagaManager) SetPosition(pos uint64)
- func (m *SagaManager) Start(ctx context.Context) error
- func (m *SagaManager) StartAsync(ctx context.Context) *AsyncResult
- func (m *SagaManager) StartSaga(ctx context.Context, sagaType string, triggerEvent StoredEvent) error
- func (m *SagaManager) StartSagaAsync(ctx context.Context, sagaType string, triggerEvent StoredEvent) *AsyncResult
- func (m *SagaManager) Stop()
- type SagaManagerOption
- func WithCommandBus(bus *CommandBus) SagaManagerOption
- func WithSagaLogger(logger Logger) SagaManagerOption
- func WithSagaPollInterval(d time.Duration) SagaManagerOption
- func WithSagaRetryAttempts(attempts int) SagaManagerOption
- func WithSagaRetryDelay(d time.Duration) SagaManagerOption
- func WithSagaSerializer(serializer Serializer) SagaManagerOption
- func WithSagaStore(store SagaStore) SagaManagerOption
- type SagaNotFoundError
- type SagaState
- type SagaStatus
- type SagaStep
- type SagaStepStatus
- type SagaStore
- type SerializationError
- type Serializer
- type SimpleDispatcher
- type StoredEvent
- type StreamID
- type StreamInfo
- type StreamNotFoundError
- type Subscription
- type SubscriptionAdapter
- type SubscriptionOptions
- type ValidationError
- type Validator
- type ValidatorFunc
- type VersionSetter
- type VersionedAggregate
Constants ¶
const ( // AnyVersion skips version checking, allowing append regardless of current version. AnyVersion int64 = -1 // NoStream indicates the stream must not exist (for creating new streams). NoStream int64 = 0 // StreamExists indicates the stream must exist (for appending to existing streams). StreamExists int64 = -2 )
Version constants for optimistic concurrency control.
const ( SagaStatusStarted = adapters.SagaStatusStarted SagaStatusRunning = adapters.SagaStatusRunning SagaStatusCompleted = adapters.SagaStatusCompleted SagaStatusFailed = adapters.SagaStatusFailed SagaStatusCompensating = adapters.SagaStatusCompensating SagaStatusCompensated = adapters.SagaStatusCompensated SagaStatusCompensationFailed = adapters.SagaStatusCompensationFailed )
Re-export saga status constants from adapters.
const ( SagaStepPending = adapters.SagaStepPending SagaStepRunning = adapters.SagaStepRunning SagaStepCompleted = adapters.SagaStepCompleted SagaStepFailed = adapters.SagaStepFailed SagaStepCompensated = adapters.SagaStepCompensated )
Re-export saga step status constants from adapters.
Variables ¶
var ( // ErrStreamNotFound indicates the requested stream does not exist. ErrStreamNotFound = adapters.ErrStreamNotFound // ErrConcurrencyConflict indicates an optimistic concurrency violation. ErrConcurrencyConflict = adapters.ErrConcurrencyConflict // ErrEventNotFound indicates the requested event does not exist. ErrEventNotFound = errors.New("mink: event not found") // ErrSerializationFailed indicates event serialization/deserialization failed. ErrSerializationFailed = errors.New("mink: serialization failed") // ErrEventTypeNotRegistered indicates an unknown event type was encountered. ErrEventTypeNotRegistered = errors.New("mink: event type not registered") // ErrNilAggregate indicates a nil aggregate was passed. ErrNilAggregate = errors.New("mink: nil aggregate") // ErrNilStore indicates a nil event store was passed. ErrNilStore = errors.New("mink: nil event store") // ErrEmptyStreamID indicates an empty stream ID was provided. ErrEmptyStreamID = adapters.ErrEmptyStreamID // ErrNoEvents indicates no events were provided for append. ErrNoEvents = adapters.ErrNoEvents // ErrInvalidVersion indicates an invalid version number was provided. ErrInvalidVersion = adapters.ErrInvalidVersion // ErrAdapterClosed indicates the adapter has been closed. ErrAdapterClosed = adapters.ErrAdapterClosed // ErrSubscriptionNotSupported indicates the adapter does not support subscriptions. ErrSubscriptionNotSupported = errors.New("mink: adapter does not support subscriptions") // ErrHandlerNotFound indicates no handler is registered for a command type. ErrHandlerNotFound = errors.New("mink: handler not found") // ErrValidationFailed indicates command validation failed. ErrValidationFailed = errors.New("mink: validation failed") // ErrCommandAlreadyProcessed indicates an idempotent command was already processed. ErrCommandAlreadyProcessed = errors.New("mink: command already processed") // ErrNilCommand indicates a nil command was passed. ErrNilCommand = errors.New("mink: nil command") // ErrHandlerPanicked indicates a handler panicked during execution. ErrHandlerPanicked = errors.New("mink: handler panicked") // ErrCommandBusClosed indicates the command bus has been closed. ErrCommandBusClosed = errors.New("mink: command bus closed") )
Sentinel errors for common error conditions. Use errors.Is() to check for these errors. These errors are aliases to the adapters package errors for compatibility.
var ( // ErrNilProjection indicates a nil projection was passed. ErrNilProjection = errors.New("mink: nil projection") // ErrEmptyProjectionName indicates a projection has no name. ErrEmptyProjectionName = errors.New("mink: projection name is required") // ErrProjectionNotFound indicates the requested projection does not exist. ErrProjectionNotFound = errors.New("mink: projection not found") // ErrProjectionAlreadyRegistered indicates a projection with the same name is already registered. ErrProjectionAlreadyRegistered = errors.New("mink: projection already registered") // ErrProjectionEngineAlreadyRunning indicates the projection engine is already running. ErrProjectionEngineAlreadyRunning = errors.New("mink: projection engine already running") // ErrProjectionEngineStopped indicates the projection engine has been stopped. ErrProjectionEngineStopped = errors.New("mink: projection engine stopped") // ErrNoCheckpointStore indicates no checkpoint store was configured. ErrNoCheckpointStore = errors.New("mink: checkpoint store is required") // ErrNotImplemented indicates a method is not implemented. ErrNotImplemented = errors.New("mink: not implemented") // ErrProjectionFailed indicates a projection failed to process an event. ErrProjectionFailed = errors.New("mink: projection failed") )
var ( // ErrNotFound indicates the requested entity was not found. ErrNotFound = errors.New("mink: not found") // ErrAlreadyExists indicates the entity already exists. ErrAlreadyExists = errors.New("mink: already exists") // ErrInvalidQuery indicates the query is invalid. ErrInvalidQuery = errors.New("mink: invalid query") )
Repository errors
var ( // ErrSagaNotFound indicates the requested saga does not exist. ErrSagaNotFound = adapters.ErrSagaNotFound // ErrSagaAlreadyExists indicates a saga with the same ID already exists. ErrSagaAlreadyExists = adapters.ErrSagaAlreadyExists // ErrSagaCompleted indicates the saga has already completed. ErrSagaCompleted = errors.New("mink: saga already completed") // ErrSagaFailed indicates the saga has failed. ErrSagaFailed = errors.New("mink: saga failed") // ErrSagaCompensating indicates the saga is currently compensating. ErrSagaCompensating = errors.New("mink: saga is compensating") // ErrNoSagaHandler indicates no handler is registered for the event type. ErrNoSagaHandler = errors.New("mink: no saga handler for event") )
Saga-related sentinel errors.
Functions ¶
func BuildStreamID ¶
BuildStreamID creates a stream ID from an aggregate type and ID. This follows the convention: "{Type}-{ID}"
func CausationIDFromContext ¶ added in v0.1.10
CausationIDFromContext returns the causation ID from context.
func CorrelationIDFromContext ¶ added in v0.1.10
CorrelationIDFromContext returns the correlation ID from context.
func GenerateIdempotencyKey ¶ added in v0.1.10
GenerateIdempotencyKey generates an idempotency key from a command. The key is based on the command type and its JSON-serialized content.
func GetCommandType ¶ added in v0.1.10
func GetCommandType(cmd interface{}) string
GetCommandType returns the type name of a command using reflection. This is useful for commands that don't embed CommandBase.
func GetEventType ¶
func GetEventType(event interface{}) string
GetEventType returns the event type name for the given event. It uses the struct name as the type name.
func GetIdempotencyKey ¶ added in v0.1.10
GetIdempotencyKey returns the idempotency key for a command. If the command implements IdempotentCommand, it uses that key. Otherwise, it generates a key from the command content.
func IdempotencyKeyFromField ¶ added in v0.1.10
IdempotencyKeyFromField extracts the idempotency key from a field in the command. If the field is empty, it falls back to GenerateIdempotencyKey.
func IdempotencyKeyPrefix ¶ added in v0.1.10
IdempotencyKeyPrefix is a convenience function to create a prefixed idempotency key.
func RegisterGenericHandler ¶ added in v0.1.10
func RegisterGenericHandler[C Command](registry *HandlerRegistry, handler func(ctx context.Context, cmd C) (CommandResult, error))
RegisterGenericHandler is a convenience function to register a generic handler.
func SagaStateToJSON ¶ added in v0.5.0
SagaStateToJSON converts saga state to JSON for persistence.
func ShouldHandleEventType ¶ added in v0.3.8
ShouldHandleEventType checks if an event type should be handled given a list of handled events. Returns true if handledEvents is empty (meaning all events are handled) or if eventType is in the list. This is a utility function used by projection engine and rebuilder to filter events.
func TenantIDFromContext ¶ added in v0.1.10
TenantIDFromContext returns the tenant ID from context.
func WithCausationID ¶ added in v0.1.10
WithCausationID returns a context with the causation ID set.
Types ¶
type Aggregate ¶
type Aggregate interface {
// AggregateID returns the unique identifier for this aggregate instance.
AggregateID() string
// AggregateType returns the type/category of this aggregate (e.g., "Order", "Customer").
AggregateType() string
// Version returns the current version of the aggregate.
// This is the number of events that have been applied.
Version() int64
// ApplyEvent applies an event to update the aggregate's state.
// This method should be idempotent and deterministic.
ApplyEvent(event interface{}) error
// UncommittedEvents returns events that have been applied but not yet persisted.
UncommittedEvents() []interface{}
// ClearUncommittedEvents removes all uncommitted events after successful persistence.
ClearUncommittedEvents()
}
Aggregate defines the interface for event-sourced aggregates. An aggregate is a domain object whose state is derived from a sequence of events.
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase provides a default partial implementation of the Aggregate interface. Embed this struct in your aggregate types to get default behavior.
func NewAggregateBase ¶
func NewAggregateBase(id, aggregateType string) AggregateBase
NewAggregateBase creates a new AggregateBase with the given ID and type.
func (*AggregateBase) AggregateID ¶
func (a *AggregateBase) AggregateID() string
AggregateID returns the aggregate's unique identifier.
func (*AggregateBase) AggregateType ¶
func (a *AggregateBase) AggregateType() string
AggregateType returns the aggregate type.
func (*AggregateBase) Apply ¶
func (a *AggregateBase) Apply(event interface{})
Apply records an event as uncommitted. This should be called by the aggregate after creating a new event. The aggregate should also update its internal state based on the event.
func (*AggregateBase) ClearUncommittedEvents ¶
func (a *AggregateBase) ClearUncommittedEvents()
ClearUncommittedEvents removes all uncommitted events.
func (*AggregateBase) HasUncommittedEvents ¶
func (a *AggregateBase) HasUncommittedEvents() bool
HasUncommittedEvents returns true if there are events waiting to be persisted.
func (*AggregateBase) IncrementVersion ¶
func (a *AggregateBase) IncrementVersion()
IncrementVersion increments the aggregate version by 1.
func (*AggregateBase) SetID ¶
func (a *AggregateBase) SetID(id string)
SetID sets the aggregate's ID.
func (*AggregateBase) SetType ¶
func (a *AggregateBase) SetType(t string)
SetType sets the aggregate type.
func (*AggregateBase) SetVersion ¶
func (a *AggregateBase) SetVersion(v int64)
SetVersion sets the aggregate version.
func (*AggregateBase) StreamID ¶
func (a *AggregateBase) StreamID() StreamID
StreamID returns the stream ID for this aggregate. The stream ID is composed of the aggregate type and ID.
func (*AggregateBase) UncommittedEvents ¶
func (a *AggregateBase) UncommittedEvents() []interface{}
UncommittedEvents returns events that haven't been persisted yet.
func (*AggregateBase) Version ¶
func (a *AggregateBase) Version() int64
Version returns the current version of the aggregate.
type AggregateCommand ¶ added in v0.1.10
type AggregateCommand interface {
Command
// AggregateID returns the ID of the aggregate this command targets.
// Returns empty string for commands that create new aggregates.
AggregateID() string
}
AggregateCommand is a command that targets a specific aggregate.
type AggregateFactory ¶
AggregateFactory creates new aggregate instances.
type AggregateHandler ¶ added in v0.1.10
type AggregateHandler[C AggregateCommand, A Aggregate] struct { // contains filtered or unexported fields }
AggregateHandler is a handler that works with aggregates and an event store. It loads the aggregate, executes the command, and saves the results.
func NewAggregateHandler ¶ added in v0.1.10
func NewAggregateHandler[C AggregateCommand, A Aggregate](config AggregateHandlerConfig[C, A]) *AggregateHandler[C, A]
NewAggregateHandler creates a new AggregateHandler.
func (*AggregateHandler[C, A]) CommandType ¶ added in v0.1.10
func (h *AggregateHandler[C, A]) CommandType() string
CommandType returns the command type this handler processes.
func (*AggregateHandler[C, A]) Handle ¶ added in v0.1.10
func (h *AggregateHandler[C, A]) Handle(ctx context.Context, cmd Command) (CommandResult, error)
Handle loads the aggregate, executes the command, and saves the aggregate.
type AggregateHandlerConfig ¶ added in v0.1.10
type AggregateHandlerConfig[C AggregateCommand, A Aggregate] struct { Store *EventStore Factory func(id string) A Executor func(ctx context.Context, agg A, cmd C) error NewIDFunc func() string }
AggregateHandlerConfig configures an AggregateHandler.
type AggregateRoot ¶
type AggregateRoot interface {
Aggregate
// GetID is an alias for AggregateID for DDD conventions.
GetID() string
}
AggregateRoot is an extended interface that includes domain-driven design patterns.
type AppendOption ¶
type AppendOption func(*appendConfig)
AppendOption configures an append operation.
func ExpectVersion ¶
func ExpectVersion(v int64) AppendOption
ExpectVersion sets the expected stream version for optimistic concurrency.
func WithAppendMetadata ¶
func WithAppendMetadata(m Metadata) AppendOption
WithMetadata sets metadata for all events in the append operation.
type AsyncOptions ¶ added in v0.3.0
type AsyncOptions struct {
// BatchSize is the maximum number of events to process in a batch.
// Default: 100
BatchSize int
// BatchTimeout is the maximum time to wait for a full batch.
// Default: 1 second
BatchTimeout time.Duration
// PollInterval is how often to poll for new events when idle.
// Default: 100ms
PollInterval time.Duration
// RetryPolicy defines how to handle errors.
RetryPolicy RetryPolicy
// MaxRetries is the maximum number of retries for failed events.
// Default: 3
MaxRetries int
// StartFromBeginning starts processing from the beginning of the event stream.
// If false, starts from the last checkpoint.
// Default: false
StartFromBeginning bool
}
AsyncOptions configures async projection behavior.
func DefaultAsyncOptions ¶ added in v0.3.0
func DefaultAsyncOptions() AsyncOptions
DefaultAsyncOptions returns the default async projection options.
type AsyncProjection ¶ added in v0.3.0
type AsyncProjection interface {
Projection
// Apply processes a single event asynchronously.
Apply(ctx context.Context, event StoredEvent) error
// ApplyBatch processes multiple events in a single batch for efficiency.
// If not supported, implementations should process events sequentially.
ApplyBatch(ctx context.Context, events []StoredEvent) error
}
AsyncProjection processes events asynchronously in the background. This provides eventual consistency but better write performance and scalability.
type AsyncProjectionBase ¶ added in v0.3.0
type AsyncProjectionBase struct {
ProjectionBase
}
AsyncProjectionBase provides a default implementation of AsyncProjection. Embed this struct and override Apply to create async projections.
func NewAsyncProjectionBase ¶ added in v0.3.0
func NewAsyncProjectionBase(name string, handledEvents ...string) AsyncProjectionBase
NewAsyncProjectionBase creates a new AsyncProjectionBase.
func (*AsyncProjectionBase) ApplyBatch ¶ added in v0.3.0
func (p *AsyncProjectionBase) ApplyBatch(ctx context.Context, events []StoredEvent) error
ApplyBatch provides a default implementation that processes events sequentially. Override this method for custom batch processing logic.
type AsyncResult ¶ added in v0.5.2
type AsyncResult struct {
// contains filtered or unexported fields
}
AsyncResult represents the result of an asynchronous operation. It provides methods to wait for completion and check the result.
func (*AsyncResult) Cancel ¶ added in v0.5.2
func (r *AsyncResult) Cancel()
Cancel cancels the async operation's context. This can be used to stop a long-running operation early.
func (*AsyncResult) Context ¶ added in v0.5.2
func (r *AsyncResult) Context() context.Context
Context returns the context for this async operation.
func (*AsyncResult) Done ¶ added in v0.5.2
func (r *AsyncResult) Done() <-chan struct{}
Done returns a channel that is closed when the operation completes. Use this in select statements for non-blocking wait patterns.
Example:
select {
case <-result.Done():
if err := result.Err(); err != nil {
log.Printf("Failed: %v", err)
}
case <-time.After(5 * time.Second):
result.Cancel()
log.Println("Timed out")
}
func (*AsyncResult) Err ¶ added in v0.5.2
func (r *AsyncResult) Err() error
Err returns the error from the completed operation, or nil if not yet complete or successful.
func (*AsyncResult) IsComplete ¶ added in v0.5.2
func (r *AsyncResult) IsComplete() bool
IsComplete returns true if the operation has completed (successfully or with an error).
func (*AsyncResult) Wait ¶ added in v0.5.2
func (r *AsyncResult) Wait() error
Wait blocks until the operation completes and returns any error. This is a convenience method equivalent to <-result.Done(); return result.Err()
func (*AsyncResult) WaitWithTimeout ¶ added in v0.5.2
func (r *AsyncResult) WaitWithTimeout(timeout time.Duration) error
WaitWithTimeout blocks until the operation completes or the timeout expires. Returns context.DeadlineExceeded if the timeout is reached before completion.
type CatchupSubscription ¶ added in v0.3.0
type CatchupSubscription struct {
// contains filtered or unexported fields
}
CatchupSubscription provides catch-up subscription functionality. It first reads historical events from the event store, then switches to polling for new events. This ensures no events are missed during the transition.
func NewCatchupSubscription ¶ added in v0.3.0
func NewCatchupSubscription( store *EventStore, fromPosition uint64, opts ...SubscriptionOptions, ) (*CatchupSubscription, error)
NewCatchupSubscription creates a new catch-up subscription. Call Start() to begin receiving events from the specified position.
func (*CatchupSubscription) Close ¶ added in v0.3.0
func (s *CatchupSubscription) Close() error
Close stops the subscription.
func (*CatchupSubscription) Err ¶ added in v0.3.0
func (s *CatchupSubscription) Err() error
Err returns any error that caused the subscription to close.
func (*CatchupSubscription) Events ¶ added in v0.3.0
func (s *CatchupSubscription) Events() <-chan StoredEvent
Events returns the channel for receiving events.
func (*CatchupSubscription) Position ¶ added in v0.3.0
func (s *CatchupSubscription) Position() uint64
Position returns the current position of the subscription.
type CategoryFilter ¶ added in v0.3.0
type CategoryFilter struct {
// contains filtered or unexported fields
}
CategoryFilter filters events by stream category.
func NewCategoryFilter ¶ added in v0.3.0
func NewCategoryFilter(category string) *CategoryFilter
NewCategoryFilter creates a filter that only matches events from streams in the category.
func (*CategoryFilter) Matches ¶ added in v0.3.0
func (f *CategoryFilter) Matches(event StoredEvent) bool
Matches returns true if the event's stream is in the category.
type Checkpoint ¶ added in v0.3.0
type Checkpoint struct {
// ProjectionName is the name of the projection.
ProjectionName string
// Position is the global position of the last processed event.
Position uint64
// UpdatedAt is when the checkpoint was last updated.
UpdatedAt time.Time
}
Checkpoint represents a stored checkpoint record.
type CheckpointStore ¶ added in v0.3.0
type CheckpointStore interface {
// GetCheckpoint returns the last processed position for a projection.
// Returns 0 if no checkpoint exists.
GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)
// SetCheckpoint stores the last processed position for a projection.
SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
// DeleteCheckpoint removes the checkpoint for a projection.
DeleteCheckpoint(ctx context.Context, projectionName string) error
// GetAllCheckpoints returns checkpoints for all projections.
GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)
}
CheckpointStore manages projection checkpoints. Checkpoints track the last processed position for each projection.
type Clearable ¶ added in v0.3.0
type Clearable interface {
// Clear removes all data from the read model.
Clear(ctx context.Context) error
}
Clearable is an interface for projections that can clear their read model.
type Command ¶ added in v0.1.10
type Command interface {
// CommandType returns the type identifier for this command (e.g., "CreateOrder").
CommandType() string
// Validate checks if the command is valid.
// Returns nil if valid, or an error describing validation failures.
Validate() error
}
Command represents an intent to change state in the system. Commands are the write side of CQRS and should be validated before execution.
type CommandBase ¶ added in v0.1.10
type CommandBase struct {
// CommandID is an optional unique identifier for this command instance.
CommandID string `json:"commandId,omitempty"`
// CorrelationID links related commands and events for distributed tracing.
CorrelationID string `json:"correlationId,omitempty"`
// CausationID identifies the event or command that caused this command.
CausationID string `json:"causationId,omitempty"`
// Metadata contains arbitrary key-value pairs for application-specific data.
Metadata map[string]string `json:"metadata,omitempty"`
}
CommandBase provides a default partial implementation of Command. Embed this struct in your command types to get common functionality.
func (CommandBase) GetCausationID ¶ added in v0.1.10
func (c CommandBase) GetCausationID() string
GetCausationID returns the causation ID.
func (CommandBase) GetCommandID ¶ added in v0.1.10
func (c CommandBase) GetCommandID() string
GetCommandID returns the command ID.
func (CommandBase) GetCorrelationID ¶ added in v0.1.10
func (c CommandBase) GetCorrelationID() string
GetCorrelationID returns the correlation ID.
func (CommandBase) GetMetadata ¶ added in v0.1.10
func (c CommandBase) GetMetadata(key string) string
GetMetadata returns the value for a metadata key, or empty string if not found.
func (CommandBase) WithCausationID ¶ added in v0.1.10
func (c CommandBase) WithCausationID(id string) CommandBase
WithCausationID returns a copy of CommandBase with the causation ID set.
func (CommandBase) WithCommandID ¶ added in v0.1.10
func (c CommandBase) WithCommandID(id string) CommandBase
WithCommandID returns a copy of CommandBase with the command ID set.
func (CommandBase) WithCorrelationID ¶ added in v0.1.10
func (c CommandBase) WithCorrelationID(id string) CommandBase
WithCorrelationID returns a copy of CommandBase with the correlation ID set.
func (CommandBase) WithMetadata ¶ added in v0.1.10
func (c CommandBase) WithMetadata(key, value string) CommandBase
WithMetadata returns a copy of CommandBase with a metadata key-value pair added.
type CommandBus ¶ added in v0.1.10
type CommandBus struct {
// contains filtered or unexported fields
}
CommandBus orchestrates command dispatching with middleware support. It routes commands to their handlers through a configurable middleware pipeline.
func NewCommandBus ¶ added in v0.1.10
func NewCommandBus(opts ...CommandBusOption) *CommandBus
NewCommandBus creates a new CommandBus with the given options.
func (*CommandBus) Close ¶ added in v0.1.10
func (b *CommandBus) Close() error
Close closes the command bus, preventing further dispatch operations.
func (*CommandBus) Dispatch ¶ added in v0.1.10
func (b *CommandBus) Dispatch(ctx context.Context, cmd Command) (CommandResult, error)
Dispatch sends a command through the middleware pipeline to its handler.
func (*CommandBus) DispatchAll ¶ added in v0.1.10
func (b *CommandBus) DispatchAll(ctx context.Context, cmds ...Command) ([]DispatchResult, error)
DispatchAll dispatches multiple commands and returns all results. Commands are dispatched sequentially in order.
func (*CommandBus) DispatchAsync ¶ added in v0.1.10
func (b *CommandBus) DispatchAsync(ctx context.Context, cmd Command) <-chan DispatchResult
DispatchAsync sends a command asynchronously and returns immediately. The result can be retrieved through the returned channel.
func (*CommandBus) HandlerCount ¶ added in v0.1.10
func (b *CommandBus) HandlerCount() int
HandlerCount returns the number of registered handlers.
func (*CommandBus) HasHandler ¶ added in v0.1.10
func (b *CommandBus) HasHandler(cmdType string) bool
HasHandler returns true if a handler is registered for the command type.
func (*CommandBus) IsClosed ¶ added in v0.1.10
func (b *CommandBus) IsClosed() bool
IsClosed returns true if the command bus has been closed.
func (*CommandBus) MiddlewareCount ¶ added in v0.1.10
func (b *CommandBus) MiddlewareCount() int
MiddlewareCount returns the number of registered middleware.
func (*CommandBus) Register ¶ added in v0.1.10
func (b *CommandBus) Register(handler CommandHandler)
Register adds a handler to the command bus.
func (*CommandBus) RegisterFunc ¶ added in v0.1.10
func (b *CommandBus) RegisterFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error))
RegisterFunc registers a handler function for a command type.
func (*CommandBus) Use ¶ added in v0.1.10
func (b *CommandBus) Use(middleware ...Middleware)
Use adds middleware to the command bus. Middleware is executed in the order it was added.
type CommandBusOption ¶ added in v0.1.10
type CommandBusOption func(*CommandBus)
CommandBusOption configures a CommandBus.
func WithHandlerRegistry ¶ added in v0.1.10
func WithHandlerRegistry(registry *HandlerRegistry) CommandBusOption
WithHandlerRegistry sets a custom handler registry.
func WithMiddleware ¶ added in v0.1.10
func WithMiddleware(middleware ...Middleware) CommandBusOption
WithMiddleware adds middleware to the command bus.
type CommandContext ¶ added in v0.1.10
type CommandContext struct {
// Context is the standard Go context.
Context context.Context
// Command is the command being executed.
Command Command
// Result is the command execution result (set by handler).
Result CommandResult
// Metadata contains additional context data that can be set by middleware.
Metadata map[string]interface{}
}
CommandContext carries command execution context through the middleware chain.
func NewCommandContext ¶ added in v0.1.10
func NewCommandContext(ctx context.Context, cmd Command) *CommandContext
NewCommandContext creates a new CommandContext.
func (*CommandContext) Get ¶ added in v0.1.10
func (c *CommandContext) Get(key string) (interface{}, bool)
Get retrieves a value from the context metadata.
func (*CommandContext) GetString ¶ added in v0.1.10
func (c *CommandContext) GetString(key string) string
GetString retrieves a string value from the context metadata.
func (*CommandContext) Set ¶ added in v0.1.10
func (c *CommandContext) Set(key string, value interface{})
Set stores a value in the context metadata.
func (*CommandContext) SetError ¶ added in v0.1.10
func (c *CommandContext) SetError(err error)
SetError sets an error result.
func (*CommandContext) SetResult ¶ added in v0.1.10
func (c *CommandContext) SetResult(result CommandResult)
SetResult sets the command execution result.
func (*CommandContext) SetSuccess ¶ added in v0.1.10
func (c *CommandContext) SetSuccess(aggregateID string, version int64)
SetSuccess sets a successful result.
type CommandDispatcher ¶ added in v0.1.10
type CommandDispatcher interface {
// Dispatch sends a command to its handler and returns the result.
Dispatch(ctx context.Context, cmd Command) (CommandResult, error)
}
CommandDispatcher can dispatch commands to handlers.
type CommandHandler ¶ added in v0.1.10
type CommandHandler interface {
// CommandType returns the type of command this handler processes.
CommandType() string
// Handle processes the command and returns a result.
Handle(ctx context.Context, cmd Command) (CommandResult, error)
}
CommandHandler is the interface for handling a specific command type. Handlers contain the business logic for processing commands.
type CommandHandlerFunc ¶ added in v0.1.10
type CommandHandlerFunc struct {
// contains filtered or unexported fields
}
CommandHandlerFunc is a function type that implements CommandHandler.
func NewCommandHandlerFunc ¶ added in v0.1.10
func NewCommandHandlerFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error)) *CommandHandlerFunc
NewCommandHandlerFunc creates a new CommandHandlerFunc.
func (*CommandHandlerFunc) CommandType ¶ added in v0.1.10
func (h *CommandHandlerFunc) CommandType() string
CommandType returns the command type this handler processes.
func (*CommandHandlerFunc) Handle ¶ added in v0.1.10
func (h *CommandHandlerFunc) Handle(ctx context.Context, cmd Command) (CommandResult, error)
Handle processes the command.
type CommandResult ¶ added in v0.1.10
type CommandResult struct {
// Success indicates whether the command executed successfully.
Success bool
// AggregateID is the ID of the aggregate affected by the command.
// For create commands, this is the ID of the newly created aggregate.
AggregateID string
// Version is the new version of the aggregate after command execution.
Version int64
// Data contains any additional result data.
Data interface{}
// Error contains the error if the command failed.
Error error
}
CommandResult represents the result of command execution. It can contain either a successful result or an error.
func IdempotencyRecordToResult ¶ added in v0.1.10
func IdempotencyRecordToResult(r *IdempotencyRecord) CommandResult
IdempotencyRecordToResult converts the record to a CommandResult.
func NewErrorResult ¶ added in v0.1.10
func NewErrorResult(err error) CommandResult
NewErrorResult creates a failed CommandResult.
func NewSuccessResult ¶ added in v0.1.10
func NewSuccessResult(aggregateID string, version int64) CommandResult
NewSuccessResult creates a successful CommandResult.
func NewSuccessResultWithData ¶ added in v0.1.10
func NewSuccessResultWithData(aggregateID string, version int64, data interface{}) CommandResult
NewSuccessResultWithData creates a successful CommandResult with additional data.
func (CommandResult) IsError ¶ added in v0.1.10
func (r CommandResult) IsError() bool
IsError returns true if the command failed.
func (CommandResult) IsSuccess ¶ added in v0.1.10
func (r CommandResult) IsSuccess() bool
IsSuccess returns true if the command executed successfully.
type CompositeFilter ¶ added in v0.3.0
type CompositeFilter struct {
// contains filtered or unexported fields
}
CompositeFilter combines multiple filters with AND logic.
func NewCompositeFilter ¶ added in v0.3.0
func NewCompositeFilter(filters ...EventFilter) *CompositeFilter
NewCompositeFilter creates a filter that matches only if all filters match.
func (*CompositeFilter) Matches ¶ added in v0.3.0
func (f *CompositeFilter) Matches(event StoredEvent) bool
Matches returns true if all filters match.
type ConcurrencyError ¶
ConcurrencyError provides detailed information about a concurrency conflict.
func NewConcurrencyError ¶
func NewConcurrencyError(streamID string, expected, actual int64) *ConcurrencyError
NewConcurrencyError creates a new ConcurrencyError.
func (*ConcurrencyError) Error ¶
func (e *ConcurrencyError) Error() string
Error returns the error message.
func (*ConcurrencyError) Is ¶
func (e *ConcurrencyError) Is(target error) bool
Is reports whether this error matches the target error.
func (*ConcurrencyError) Unwrap ¶
func (e *ConcurrencyError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
type ContextValueMiddleware ¶ added in v0.1.10
type ContextValueMiddleware struct {
// contains filtered or unexported fields
}
ContextValueMiddleware adds values to the context.
func NewContextValueMiddleware ¶ added in v0.1.10
func NewContextValueMiddleware(key, value interface{}) *ContextValueMiddleware
NewContextValueMiddleware creates middleware that adds a value to the context.
func (*ContextValueMiddleware) Middleware ¶ added in v0.1.10
func (m *ContextValueMiddleware) Middleware() Middleware
Middleware returns the middleware function.
type DispatchResult ¶ added in v0.1.10
type DispatchResult struct {
CommandResult
Error error
}
DispatchResult contains the result of an asynchronous dispatch operation.
func (DispatchResult) IsSuccess ¶ added in v0.1.10
func (r DispatchResult) IsSuccess() bool
IsSuccess returns true if the dispatch was successful.
type Event ¶
type Event struct {
// ID is the globally unique event identifier.
ID string
// StreamID identifies the stream this event belongs to.
StreamID string
// Type is the event type identifier.
Type string
// Data is the deserialized event payload.
Data interface{}
// Metadata contains contextual information.
Metadata Metadata
// Version is the position within the stream (1-based).
Version int64
// GlobalPosition is the position across all streams.
GlobalPosition uint64
// Timestamp is when the event was stored.
Timestamp time.Time
}
Event represents a deserialized event with its data as a Go type. This is the high-level representation used by applications.
func DeserializeEvent ¶
func DeserializeEvent(serializer Serializer, stored StoredEvent) (Event, error)
DeserializeEvent is a convenience function that deserializes a StoredEvent to an Event.
func EventFromStored ¶
func EventFromStored(stored StoredEvent, data interface{}) Event
EventFromStored creates an Event from a StoredEvent with deserialized data.
type EventApplier ¶
type EventApplier func(aggregate interface{}, event interface{}) error
EventApplier is a function type for applying events to aggregates.
type EventData ¶
type EventData struct {
// Type is the event type identifier (e.g., "OrderCreated").
Type string
// Data is the serialized event payload.
Data []byte
// Metadata contains optional contextual information.
Metadata Metadata
}
EventData represents an event to be stored. It contains the event type, serialized payload, and optional metadata.
func NewEventData ¶
NewEventData creates a new EventData with the given type and data.
func SerializeEvent ¶
func SerializeEvent(serializer Serializer, event interface{}, metadata Metadata) (EventData, error)
SerializeEvent is a convenience function that serializes an event and returns EventData.
func (EventData) WithMetadata ¶
WithMetadata returns a copy of EventData with the metadata set.
type EventFilter ¶ added in v0.3.0
type EventFilter interface {
// Matches returns true if the event should be delivered.
Matches(event StoredEvent) bool
}
EventFilter determines which events should be delivered.
type EventRegistry ¶
type EventRegistry struct {
// contains filtered or unexported fields
}
EventRegistry maps event type names to Go types. It is used by the JSONSerializer to deserialize events to the correct type.
func NewEventRegistry ¶
func NewEventRegistry() *EventRegistry
NewEventRegistry creates a new empty EventRegistry.
func (*EventRegistry) Count ¶
func (r *EventRegistry) Count() int
Count returns the number of registered event types.
func (*EventRegistry) Lookup ¶
func (r *EventRegistry) Lookup(eventType string) (reflect.Type, bool)
Lookup returns the Go type for the given event type name. Returns nil and false if the type is not registered.
func (*EventRegistry) Register ¶
func (r *EventRegistry) Register(eventType string, example interface{})
Register adds a mapping from eventType to the Go type of the example. The example should be a value (not a pointer) of the event type.
func (*EventRegistry) RegisterAll ¶
func (r *EventRegistry) RegisterAll(examples ...interface{})
RegisterAll registers multiple events using their struct names as type names. Each example should be a value (not a pointer) of the event type.
func (*EventRegistry) RegisteredTypes ¶
func (r *EventRegistry) RegisteredTypes() []string
RegisteredTypes returns a slice of all registered event type names.
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore is the main entry point for event sourcing operations. It provides methods for appending events, loading aggregates, and managing streams.
func New ¶
func New(adapter adapters.EventStoreAdapter, opts ...Option) *EventStore
New creates a new EventStore with the given adapter and options.
func (*EventStore) Adapter ¶
func (s *EventStore) Adapter() adapters.EventStoreAdapter
Adapter returns the underlying adapter.
func (*EventStore) Append ¶
func (s *EventStore) Append(ctx context.Context, streamID string, events []interface{}, opts ...AppendOption) error
Append stores events to the specified stream. Events can be Go structs which will be serialized using the configured serializer.
func (*EventStore) Close ¶
func (s *EventStore) Close() error
Close releases resources held by the event store.
func (*EventStore) GetLastPosition ¶
func (s *EventStore) GetLastPosition(ctx context.Context) (uint64, error)
GetLastPosition returns the global position of the last stored event.
func (*EventStore) GetStreamInfo ¶
func (s *EventStore) GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)
GetStreamInfo returns metadata about a stream.
func (*EventStore) Initialize ¶
func (s *EventStore) Initialize(ctx context.Context) error
Initialize sets up the required storage schema.
func (*EventStore) LoadAggregate ¶
func (s *EventStore) LoadAggregate(ctx context.Context, agg Aggregate) error
LoadAggregate loads an aggregate's state by replaying its events. The aggregate should be a new instance with its ID and type already set.
If the aggregate implements VersionSetter, the version will be set to the number of events loaded. This is required for proper optimistic concurrency control when saving the aggregate later.
Note: AggregateBase implements VersionSetter, so aggregates embedding AggregateBase will automatically have their version set correctly.
func (*EventStore) LoadEventsFromPosition ¶ added in v0.3.8
func (s *EventStore) LoadEventsFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)
LoadEventsFromPosition loads events starting from a global position. Returns ErrSubscriptionNotSupported if the adapter does not implement SubscriptionAdapter. This is a helper method used by ProjectionEngine and ProjectionRebuilder.
func (*EventStore) LoadFrom ¶
func (s *EventStore) LoadFrom(ctx context.Context, streamID string, fromVersion int64) ([]Event, error)
LoadFrom retrieves events from a stream starting from the specified version.
func (*EventStore) LoadRaw ¶
func (s *EventStore) LoadRaw(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)
LoadRaw retrieves raw (non-deserialized) events from a stream.
func (*EventStore) RegisterEvents ¶
func (s *EventStore) RegisterEvents(events ...interface{})
RegisterEvents registers event types with the serializer. This is required for deserializing events back to their original types.
func (*EventStore) SaveAggregate ¶
func (s *EventStore) SaveAggregate(ctx context.Context, agg Aggregate) error
SaveAggregate persists uncommitted events from an aggregate. The aggregate's version is used for optimistic concurrency control.
After a successful save, if the aggregate implements VersionSetter, the version will be updated to reflect the new stream version. This allows for subsequent modifications without reloading.
func (*EventStore) Serializer ¶
func (s *EventStore) Serializer() Serializer
Serializer returns the event store's serializer.
type EventSubscriber ¶ added in v0.3.0
type EventSubscriber interface {
// SubscribeAll subscribes to all events starting from the given position.
SubscribeAll(ctx context.Context, fromPosition uint64, opts ...SubscriptionOptions) (Subscription, error)
// SubscribeStream subscribes to events from a specific stream.
SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...SubscriptionOptions) (Subscription, error)
// SubscribeCategory subscribes to events from all streams in a category.
SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...SubscriptionOptions) (Subscription, error)
}
EventSubscriber provides event subscription capabilities.
type EventTypeFilter ¶ added in v0.3.0
type EventTypeFilter struct {
// contains filtered or unexported fields
}
EventTypeFilter filters events by type.
func NewEventTypeFilter ¶ added in v0.3.0
func NewEventTypeFilter(eventTypes ...string) *EventTypeFilter
NewEventTypeFilter creates a filter that only matches the specified event types.
func (*EventTypeFilter) Matches ¶ added in v0.3.0
func (f *EventTypeFilter) Matches(event StoredEvent) bool
Matches returns true if the event type is in the filter.
type EventTypeNotRegisteredError ¶
type EventTypeNotRegisteredError struct {
EventType string
}
EventTypeNotRegisteredError provides detailed information about an unregistered event type.
func NewEventTypeNotRegisteredError ¶
func NewEventTypeNotRegisteredError(eventType string) *EventTypeNotRegisteredError
NewEventTypeNotRegisteredError creates a new EventTypeNotRegisteredError.
func (*EventTypeNotRegisteredError) Error ¶
func (e *EventTypeNotRegisteredError) Error() string
Error returns the error message.
func (*EventTypeNotRegisteredError) Is ¶
func (e *EventTypeNotRegisteredError) Is(target error) bool
Is reports whether this error matches the target error.
func (*EventTypeNotRegisteredError) Unwrap ¶
func (e *EventTypeNotRegisteredError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
type Filter ¶ added in v0.3.0
type Filter struct {
// Field is the field name to filter on.
Field string
// Op is the comparison operator.
Op FilterOp
// Value is the value to compare against.
Value interface{}
}
Filter represents a query filter condition.
type FilterOp ¶ added in v0.3.0
type FilterOp string
FilterOp represents a filter operation.
const ( // FilterOpEq matches equal values. FilterOpEq FilterOp = "=" // FilterOpNe matches not equal values. FilterOpNe FilterOp = "!=" // FilterOpGt matches greater than values. FilterOpGt FilterOp = ">" // FilterOpGte matches greater than or equal values. FilterOpGte FilterOp = ">=" // FilterOpLt matches less than values. FilterOpLt FilterOp = "<" // FilterOpLte matches less than or equal values. FilterOpLte FilterOp = "<=" // FilterOpIn matches any value in a list. FilterOpIn FilterOp = "IN" // FilterOpNotIn matches no value in a list. FilterOpNotIn FilterOp = "NOT IN" // FilterOpLike matches using SQL LIKE pattern. FilterOpLike FilterOp = "LIKE" // FilterOpIsNull matches null values. FilterOpIsNull FilterOp = "IS NULL" // FilterOpIsNotNull matches non-null values. FilterOpIsNotNull FilterOp = "IS NOT NULL" // FilterOpContains matches arrays containing a value. FilterOpContains FilterOp = "CONTAINS" // FilterOpBetween matches values between two bounds. FilterOpBetween FilterOp = "BETWEEN" )
type GenericHandler ¶ added in v0.1.10
type GenericHandler[C Command] struct { // contains filtered or unexported fields }
GenericHandler is a type-safe command handler for a specific command type. Use this to create handlers with compile-time type checking.
func NewGenericHandler ¶ added in v0.1.10
func NewGenericHandler[C Command](handler func(ctx context.Context, cmd C) (CommandResult, error)) *GenericHandler[C]
NewGenericHandler creates a new GenericHandler for the specified command type.
func (*GenericHandler[C]) CommandType ¶ added in v0.1.10
func (h *GenericHandler[C]) CommandType() string
CommandType returns the command type this handler processes.
func (*GenericHandler[C]) Handle ¶ added in v0.1.10
func (h *GenericHandler[C]) Handle(ctx context.Context, cmd Command) (CommandResult, error)
Handle processes the command with type checking.
type HandlerNotFoundError ¶ added in v0.1.10
type HandlerNotFoundError struct {
CommandType string
}
HandlerNotFoundError provides detailed information about a missing handler.
func NewHandlerNotFoundError ¶ added in v0.1.10
func NewHandlerNotFoundError(cmdType string) *HandlerNotFoundError
NewHandlerNotFoundError creates a new HandlerNotFoundError.
func (*HandlerNotFoundError) Error ¶ added in v0.1.10
func (e *HandlerNotFoundError) Error() string
Error returns the error message.
func (*HandlerNotFoundError) Is ¶ added in v0.1.10
func (e *HandlerNotFoundError) Is(target error) bool
Is reports whether this error matches the target error.
func (*HandlerNotFoundError) Unwrap ¶ added in v0.1.10
func (e *HandlerNotFoundError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
type HandlerRegistry ¶ added in v0.1.10
type HandlerRegistry struct {
// contains filtered or unexported fields
}
HandlerRegistry manages command handler registration and lookup.
func NewHandlerRegistry ¶ added in v0.1.10
func NewHandlerRegistry() *HandlerRegistry
NewHandlerRegistry creates a new HandlerRegistry.
func (*HandlerRegistry) Clear ¶ added in v0.1.10
func (r *HandlerRegistry) Clear()
Clear removes all handlers.
func (*HandlerRegistry) CommandTypes ¶ added in v0.1.10
func (r *HandlerRegistry) CommandTypes() []string
CommandTypes returns all registered command types.
func (*HandlerRegistry) Count ¶ added in v0.1.10
func (r *HandlerRegistry) Count() int
Count returns the number of registered handlers.
func (*HandlerRegistry) Get ¶ added in v0.1.10
func (r *HandlerRegistry) Get(cmdType string) CommandHandler
Get returns the handler for a command type. Returns nil if no handler is registered.
func (*HandlerRegistry) Has ¶ added in v0.1.10
func (r *HandlerRegistry) Has(cmdType string) bool
Has returns true if a handler is registered for the command type.
func (*HandlerRegistry) Register ¶ added in v0.1.10
func (r *HandlerRegistry) Register(handler CommandHandler)
Register adds a handler for a command type. If a handler is already registered for this type, it will be replaced.
func (*HandlerRegistry) RegisterFunc ¶ added in v0.1.10
func (r *HandlerRegistry) RegisterFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error))
RegisterFunc registers a handler function for a command type.
func (*HandlerRegistry) Remove ¶ added in v0.1.10
func (r *HandlerRegistry) Remove(cmdType string)
Remove removes a handler for a command type.
type IdempotencyConfig ¶ added in v0.1.10
type IdempotencyConfig struct {
// Store is the idempotency store to use.
Store IdempotencyStore
// TTL is how long to keep idempotency records.
// Default is 24 hours.
TTL time.Duration
// KeyGenerator generates idempotency keys from commands.
// If nil, GetIdempotencyKey is used.
KeyGenerator func(Command) string
// StoreErrors determines if failed commands should be stored.
// If true, replaying a failed command returns the same error.
// If false, failed commands can be retried.
// Default is false.
StoreErrors bool
// SkipCommands is a list of command types to skip idempotency checking.
SkipCommands []string
}
IdempotencyConfig configures the idempotency middleware.
func DefaultIdempotencyConfig ¶ added in v0.1.10
func DefaultIdempotencyConfig(store IdempotencyStore) IdempotencyConfig
DefaultIdempotencyConfig returns a default idempotency configuration.
type IdempotencyRecord ¶ added in v0.1.10
type IdempotencyRecord = adapters.IdempotencyRecord
IdempotencyRecord stores information about a processed command.
func NewIdempotencyRecord ¶ added in v0.1.10
func NewIdempotencyRecord(key, cmdType string, result CommandResult, ttl time.Duration) *IdempotencyRecord
NewIdempotencyRecord creates a new IdempotencyRecord from a CommandResult.
type IdempotencyReplayError ¶ added in v0.1.10
IdempotencyReplayError indicates a command was already processed.
func (*IdempotencyReplayError) Error ¶ added in v0.1.10
func (e *IdempotencyReplayError) Error() string
func (*IdempotencyReplayError) Is ¶ added in v0.1.10
func (e *IdempotencyReplayError) Is(target error) bool
func (*IdempotencyReplayError) Unwrap ¶ added in v0.1.10
func (e *IdempotencyReplayError) Unwrap() error
type IdempotencyStore ¶ added in v0.1.10
type IdempotencyStore = adapters.IdempotencyStore
IdempotencyStore tracks processed commands to prevent duplicate processing.
type IdempotentCommand ¶ added in v0.1.10
type IdempotentCommand interface {
Command
// IdempotencyKey returns a unique key for deduplication.
// Commands with the same key will only be processed once.
IdempotencyKey() string
}
IdempotentCommand is a command that supports idempotency.
type InMemoryRepository ¶ added in v0.3.0
type InMemoryRepository[T any] struct { // contains filtered or unexported fields }
InMemoryRepository provides an in-memory implementation of ReadModelRepository. Useful for testing and prototyping.
func NewInMemoryRepository ¶ added in v0.3.0
func NewInMemoryRepository[T any](getID func(*T) string) *InMemoryRepository[T]
NewInMemoryRepository creates a new in-memory repository. The getID function extracts the ID from a read model.
func (*InMemoryRepository[T]) Clear ¶ added in v0.3.0
func (r *InMemoryRepository[T]) Clear(ctx context.Context) error
Clear removes all read models.
func (*InMemoryRepository[T]) Count ¶ added in v0.3.0
Count returns the number of read models matching the query. Note: This basic implementation ignores query filters and returns total count. For production use with filtering, implement a database-backed repository.
func (*InMemoryRepository[T]) Delete ¶ added in v0.3.0
func (r *InMemoryRepository[T]) Delete(ctx context.Context, id string) error
Delete removes a read model by ID.
func (*InMemoryRepository[T]) DeleteMany ¶ added in v0.3.0
DeleteMany removes all read models matching the query. Note: This basic implementation ignores query filters and deletes all items when filters are provided. For production use with filtering, implement a database-backed repository.
func (*InMemoryRepository[T]) Exists ¶ added in v0.3.0
Exists checks if a read model with the given ID exists.
func (*InMemoryRepository[T]) Find ¶ added in v0.3.0
func (r *InMemoryRepository[T]) Find(ctx context.Context, query Query) ([]*T, error)
Find queries read models with the given criteria. Note: This is a basic implementation that doesn't support all filter operations.
func (*InMemoryRepository[T]) FindOne ¶ added in v0.3.0
func (r *InMemoryRepository[T]) FindOne(ctx context.Context, query Query) (*T, error)
FindOne returns the first read model matching the query.
func (*InMemoryRepository[T]) Get ¶ added in v0.3.0
func (r *InMemoryRepository[T]) Get(ctx context.Context, id string) (*T, error)
Get retrieves a read model by ID.
func (*InMemoryRepository[T]) GetAll ¶ added in v0.3.0
func (r *InMemoryRepository[T]) GetAll(ctx context.Context) ([]*T, error)
GetAll returns all read models in the repository.
func (*InMemoryRepository[T]) GetMany ¶ added in v0.3.0
func (r *InMemoryRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
GetMany retrieves multiple read models by their IDs.
func (*InMemoryRepository[T]) Insert ¶ added in v0.3.0
func (r *InMemoryRepository[T]) Insert(ctx context.Context, model *T) error
Insert creates a new read model.
func (*InMemoryRepository[T]) Len ¶ added in v0.3.0
func (r *InMemoryRepository[T]) Len() int
Len returns the number of items in the repository.
type InlineProjection ¶ added in v0.3.0
type InlineProjection interface {
Projection
// Apply processes a single event within the event store transaction.
// The projection should update its read model based on the event.
Apply(ctx context.Context, event StoredEvent) error
}
InlineProjection processes events in the same transaction as the event store append. This provides strong consistency but may impact write performance.
type JSONSerializer ¶
type JSONSerializer struct {
// contains filtered or unexported fields
}
JSONSerializer is the default Serializer implementation using JSON encoding.
func NewJSONSerializer ¶
func NewJSONSerializer() *JSONSerializer
NewJSONSerializer creates a new JSONSerializer with an empty registry.
func NewJSONSerializerWithRegistry ¶
func NewJSONSerializerWithRegistry(registry *EventRegistry) *JSONSerializer
NewJSONSerializerWithRegistry creates a new JSONSerializer with the given registry.
func (*JSONSerializer) Deserialize ¶
func (s *JSONSerializer) Deserialize(data []byte, eventType string) (interface{}, error)
Deserialize converts JSON bytes back to an event. If the event type is registered, returns a value of that type. Otherwise, returns a map[string]interface{}.
func (*JSONSerializer) Register ¶
func (s *JSONSerializer) Register(eventType string, example interface{})
Register adds an event type to the serializer's registry.
func (*JSONSerializer) RegisterAll ¶
func (s *JSONSerializer) RegisterAll(examples ...interface{})
RegisterAll registers multiple events using their struct names as type names.
func (*JSONSerializer) Registry ¶
func (s *JSONSerializer) Registry() *EventRegistry
Registry returns the underlying EventRegistry.
func (*JSONSerializer) Serialize ¶
func (s *JSONSerializer) Serialize(event interface{}) ([]byte, error)
Serialize converts an event to JSON bytes.
type LiveOptions ¶ added in v0.3.0
type LiveOptions struct {
// BufferSize is the size of the event channel buffer.
// Default: 1000
BufferSize int
}
LiveOptions configures live projection behavior.
func DefaultLiveOptions ¶ added in v0.3.0
func DefaultLiveOptions() LiveOptions
DefaultLiveOptions returns the default live projection options.
type LiveProjection ¶ added in v0.3.0
type LiveProjection interface {
Projection
// OnEvent is called for each event in real-time.
// This method should not block for long periods.
OnEvent(ctx context.Context, event StoredEvent)
// IsTransient returns true if this projection doesn't persist state.
// Transient projections are not checkpointed.
IsTransient() bool
}
LiveProjection receives events in real-time for dashboards and notifications. These projections are transient and don't persist state.
type LiveProjectionBase ¶ added in v0.3.0
type LiveProjectionBase struct {
ProjectionBase
// contains filtered or unexported fields
}
LiveProjectionBase provides a default implementation of LiveProjection. Embed this struct and override OnEvent to create live projections.
func NewLiveProjectionBase ¶ added in v0.3.0
func NewLiveProjectionBase(name string, transient bool, handledEvents ...string) LiveProjectionBase
NewLiveProjectionBase creates a new LiveProjectionBase.
func (*LiveProjectionBase) IsTransient ¶ added in v0.3.0
func (p *LiveProjectionBase) IsTransient() bool
IsTransient returns whether this projection is transient.
type Logger ¶
type Logger interface {
Debug(msg string, args ...interface{})
Info(msg string, args ...interface{})
Warn(msg string, args ...interface{})
Error(msg string, args ...interface{})
}
Logger defines the logging interface for the event store.
type LoggingMiddleware ¶ added in v0.1.10
type LoggingMiddleware struct {
// contains filtered or unexported fields
}
LoggingMiddleware logs command execution.
func NewLoggingMiddleware ¶ added in v0.1.10
func NewLoggingMiddleware(logger Logger) *LoggingMiddleware
NewLoggingMiddleware creates a new LoggingMiddleware.
func (*LoggingMiddleware) Middleware ¶ added in v0.1.10
func (m *LoggingMiddleware) Middleware() Middleware
Middleware returns the middleware function.
type Metadata ¶
type Metadata struct {
// CorrelationID links related events across services for distributed tracing.
CorrelationID string `json:"correlationId,omitempty"`
// CausationID identifies the event or command that caused this event.
CausationID string `json:"causationId,omitempty"`
// UserID identifies the user who triggered this event.
UserID string `json:"userId,omitempty"`
// TenantID identifies the tenant for multi-tenant applications.
TenantID string `json:"tenantId,omitempty"`
// Custom contains arbitrary key-value pairs for application-specific metadata.
Custom map[string]string `json:"custom,omitempty"`
}
Metadata contains contextual information about an event. It supports distributed tracing, multi-tenancy, and custom key-value pairs.
func (Metadata) WithCausationID ¶
WithCausationID returns a copy of Metadata with the causation ID set.
func (Metadata) WithCorrelationID ¶
WithCorrelationID returns a copy of Metadata with the correlation ID set.
func (Metadata) WithCustom ¶
WithCustom returns a copy of Metadata with a custom key-value pair added.
func (Metadata) WithTenantID ¶
WithTenantID returns a copy of Metadata with the tenant ID set.
func (Metadata) WithUserID ¶
WithUserID returns a copy of Metadata with the user ID set.
type MetricsCollector ¶ added in v0.1.10
type MetricsCollector interface {
// RecordCommand records a command execution.
RecordCommand(cmdType string, duration time.Duration, success bool, err error)
}
MetricsMiddleware collects metrics about command execution.
type Middleware ¶ added in v0.1.10
type Middleware func(next MiddlewareFunc) MiddlewareFunc
Middleware wraps a handler function with additional functionality.
func CausationIDMiddleware ¶ added in v0.1.10
func CausationIDMiddleware() Middleware
CausationIDMiddleware creates middleware that propagates causation IDs. The causation ID links events/commands to the command that caused them. This is essential for tracking the chain of causality in event sourcing.
func ChainMiddleware ¶ added in v0.1.10
func ChainMiddleware(middleware ...Middleware) Middleware
ChainMiddleware creates a single middleware from multiple middleware.
func CommandTypeMiddleware ¶ added in v0.1.10
func CommandTypeMiddleware(types []string, middleware Middleware) Middleware
CommandTypeMiddleware applies middleware only for specific command types.
func ConditionalMiddleware ¶ added in v0.1.10
func ConditionalMiddleware(condition func(Command) bool, middleware Middleware) Middleware
ConditionalMiddleware applies middleware only if the condition is true.
func CorrelationIDMiddleware ¶ added in v0.1.10
func CorrelationIDMiddleware(generator func() string) Middleware
CorrelationIDMiddleware creates middleware that propagates correlation IDs.
func IdempotencyMiddleware ¶ added in v0.1.10
func IdempotencyMiddleware(config IdempotencyConfig) Middleware
IdempotencyMiddleware creates middleware that prevents duplicate command processing.
func MetricsMiddleware ¶ added in v0.1.10
func MetricsMiddleware(collector MetricsCollector) Middleware
MetricsMiddleware creates middleware that records metrics.
func RecoveryMiddleware ¶ added in v0.1.10
func RecoveryMiddleware() Middleware
RecoveryMiddleware recovers from panics in handlers and returns them as errors. It captures a sanitized representation of the command data for debugging.
func RetryMiddleware ¶ added in v0.1.10
func RetryMiddleware(config RetryConfig) Middleware
RetryMiddleware creates middleware that retries failed commands.
func TenantMiddleware ¶ added in v0.1.10
func TenantMiddleware(extractor func(Command) string, required bool) Middleware
TenantMiddleware extracts and validates tenant ID.
func TimeoutMiddleware ¶ added in v0.1.10
func TimeoutMiddleware(timeout time.Duration) Middleware
TimeoutMiddleware adds a timeout to command execution.
func ValidationMiddleware ¶ added in v0.1.10
func ValidationMiddleware() Middleware
ValidationMiddleware validates commands before they reach the handler. If validation fails, the command is not dispatched.
type MiddlewareFunc ¶ added in v0.1.10
type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
MiddlewareFunc is the function signature for command middleware.
type MultiValidationError ¶ added in v0.1.10
type MultiValidationError struct {
// CommandType is the type of command that failed validation.
CommandType string
// Errors contains all validation errors.
Errors []*ValidationError
}
MultiValidationError contains multiple validation errors.
func NewMultiValidationError ¶ added in v0.1.10
func NewMultiValidationError(cmdType string) *MultiValidationError
NewMultiValidationError creates a new MultiValidationError.
func (*MultiValidationError) Add ¶ added in v0.1.10
func (e *MultiValidationError) Add(err *ValidationError)
Add adds a validation error.
func (*MultiValidationError) AddField ¶ added in v0.1.10
func (e *MultiValidationError) AddField(field, message string)
AddField adds a validation error for a specific field.
func (*MultiValidationError) Error ¶ added in v0.1.10
func (e *MultiValidationError) Error() string
Error returns the error message.
func (*MultiValidationError) HasErrors ¶ added in v0.1.10
func (e *MultiValidationError) HasErrors() bool
HasErrors returns true if there are any validation errors.
func (*MultiValidationError) Is ¶ added in v0.1.10
func (e *MultiValidationError) Is(target error) bool
Is reports whether this error matches the target error.
func (*MultiValidationError) Unwrap ¶ added in v0.1.10
func (e *MultiValidationError) Unwrap() error
Unwrap returns the first error for errors.Unwrap().
type Option ¶
type Option func(*EventStore)
Option configures an EventStore.
func WithSerializer ¶
func WithSerializer(s Serializer) Option
WithSerializer sets a custom serializer.
type OrderBy ¶ added in v0.3.0
type OrderBy struct {
// Field is the field name to sort by.
Field string
// Desc specifies descending order.
Desc bool
}
OrderBy represents a sort order.
type PanicError ¶ added in v0.1.10
type PanicError struct {
CommandType string
Value interface{}
Stack string
// CommandData contains a sanitized JSON representation of the command for debugging.
// Sensitive fields should be masked by the caller before setting this field.
CommandData string
}
PanicError provides detailed information about a handler panic.
func NewPanicError ¶ added in v0.1.10
func NewPanicError(cmdType string, value interface{}, stack string) *PanicError
NewPanicError creates a new PanicError.
func NewPanicErrorWithCommand ¶ added in v0.1.10
func NewPanicErrorWithCommand(cmdType string, value interface{}, stack string, commandData string) *PanicError
NewPanicErrorWithCommand creates a new PanicError with command data for debugging. The commandData should be a sanitized representation of the command (sensitive fields masked).
func (*PanicError) Error ¶ added in v0.1.10
func (e *PanicError) Error() string
Error returns the error message.
func (*PanicError) Is ¶ added in v0.1.10
func (e *PanicError) Is(target error) bool
Is reports whether this error matches the target error.
func (*PanicError) Unwrap ¶ added in v0.1.10
func (e *PanicError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
type ParallelRebuilder ¶ added in v0.3.0
type ParallelRebuilder struct {
// contains filtered or unexported fields
}
ParallelRebuilder rebuilds multiple projections in parallel.
func NewParallelRebuilder ¶ added in v0.3.0
func NewParallelRebuilder(rebuilder *ProjectionRebuilder, concurrency int) *ParallelRebuilder
NewParallelRebuilder creates a new parallel rebuilder.
func (*ParallelRebuilder) RebuildAll ¶ added in v0.3.0
func (pr *ParallelRebuilder) RebuildAll(ctx context.Context, projections []AsyncProjection, opts ...RebuildOptions) error
RebuildAll rebuilds multiple async projections in parallel.
type PollingSubscription ¶ added in v0.3.0
type PollingSubscription struct {
// contains filtered or unexported fields
}
PollingSubscription polls the event store for new events. This is a fallback when push-based subscriptions aren't available.
func NewPollingSubscription ¶ added in v0.3.0
func NewPollingSubscription( store *EventStore, fromPosition uint64, opts ...SubscriptionOptions, ) *PollingSubscription
NewPollingSubscription creates a new polling subscription.
func (*PollingSubscription) Close ¶ added in v0.3.0
func (s *PollingSubscription) Close() error
Close stops the subscription.
func (*PollingSubscription) Err ¶ added in v0.3.0
func (s *PollingSubscription) Err() error
Err returns any error that caused the subscription to close.
func (*PollingSubscription) Events ¶ added in v0.3.0
func (s *PollingSubscription) Events() <-chan StoredEvent
Events returns the channel for receiving events.
type ProgressCallback ¶ added in v0.3.0
type ProgressCallback func(progress RebuildProgress)
ProgressCallback is called periodically during rebuild with progress updates.
type Projection ¶ added in v0.3.0
type Projection interface {
// Name returns the unique identifier for this projection.
// This name is used for checkpointing and management.
Name() string
// HandledEvents returns the list of event types this projection handles.
// An empty list means the projection handles all event types.
HandledEvents() []string
}
Projection is the base interface for all projection types. Projections transform events into optimized read models.
type ProjectionBase ¶ added in v0.3.0
type ProjectionBase struct {
// contains filtered or unexported fields
}
ProjectionBase provides a default partial implementation of Projection. Embed this struct in your projection types to get common functionality.
func NewProjectionBase ¶ added in v0.3.0
func NewProjectionBase(name string, handledEvents ...string) ProjectionBase
NewProjectionBase creates a new ProjectionBase.
func (*ProjectionBase) HandledEvents ¶ added in v0.3.0
func (p *ProjectionBase) HandledEvents() []string
HandledEvents returns the list of event types this projection handles.
func (*ProjectionBase) HandlesEvent ¶ added in v0.3.0
func (p *ProjectionBase) HandlesEvent(eventType string) bool
HandlesEvent returns true if this projection handles the given event type.
func (*ProjectionBase) Name ¶ added in v0.3.0
func (p *ProjectionBase) Name() string
Name returns the projection name.
type ProjectionEngine ¶ added in v0.3.0
type ProjectionEngine struct {
// contains filtered or unexported fields
}
ProjectionEngine manages the lifecycle of projections. It handles registration, starting, stopping, and monitoring projections.
func NewProjectionEngine ¶ added in v0.3.0
func NewProjectionEngine(store *EventStore, opts ...ProjectionEngineOption) *ProjectionEngine
NewProjectionEngine creates a new ProjectionEngine.
func (*ProjectionEngine) GetAllStatuses ¶ added in v0.3.0
func (e *ProjectionEngine) GetAllStatuses() []*ProjectionStatus
GetAllStatuses returns the status of all registered projections.
func (*ProjectionEngine) GetStatus ¶ added in v0.3.0
func (e *ProjectionEngine) GetStatus(name string) (*ProjectionStatus, error)
GetStatus returns the status of a projection by name.
func (*ProjectionEngine) IsRunning ¶ added in v0.3.0
func (e *ProjectionEngine) IsRunning() bool
IsRunning returns true if the engine is running.
func (*ProjectionEngine) NotifyLiveProjections ¶ added in v0.3.0
func (e *ProjectionEngine) NotifyLiveProjections(ctx context.Context, events []StoredEvent)
NotifyLiveProjections notifies all live projections of new events.
func (*ProjectionEngine) ProcessInlineProjections ¶ added in v0.3.0
func (e *ProjectionEngine) ProcessInlineProjections(ctx context.Context, events []StoredEvent) error
ProcessInlineProjections processes all inline projections for the given events. This is called by the event store after appending events.
func (*ProjectionEngine) RegisterAsync ¶ added in v0.3.0
func (e *ProjectionEngine) RegisterAsync(projection AsyncProjection, opts ...AsyncOptions) error
RegisterAsync registers an async projection with the given options. Async projections are processed in background workers.
func (*ProjectionEngine) RegisterInline ¶ added in v0.3.0
func (e *ProjectionEngine) RegisterInline(projection InlineProjection) error
RegisterInline registers an inline projection. Inline projections are processed synchronously with event appends.
func (*ProjectionEngine) RegisterLive ¶ added in v0.3.0
func (e *ProjectionEngine) RegisterLive(projection LiveProjection, opts ...LiveOptions) error
RegisterLive registers a live projection with optional configuration. Live projections receive events in real-time.
func (*ProjectionEngine) Start ¶ added in v0.3.0
func (e *ProjectionEngine) Start(ctx context.Context) error
Start starts the projection engine and all registered projections.
func (*ProjectionEngine) Stop ¶ added in v0.3.0
func (e *ProjectionEngine) Stop(ctx context.Context) error
Stop gracefully stops the projection engine.
func (*ProjectionEngine) Unregister ¶ added in v0.3.0
func (e *ProjectionEngine) Unregister(name string) error
Unregister removes a projection by name.
type ProjectionEngineOption ¶ added in v0.3.0
type ProjectionEngineOption func(*ProjectionEngine)
ProjectionEngineOption configures a ProjectionEngine.
func WithCheckpointStore ¶ added in v0.3.0
func WithCheckpointStore(store CheckpointStore) ProjectionEngineOption
WithCheckpointStore sets the checkpoint store for the engine.
func WithProjectionLogger ¶ added in v0.3.0
func WithProjectionLogger(logger Logger) ProjectionEngineOption
WithProjectionLogger sets the logger for the engine.
func WithProjectionMetrics ¶ added in v0.3.0
func WithProjectionMetrics(metrics ProjectionMetrics) ProjectionEngineOption
WithProjectionMetrics sets the metrics collector for the engine.
type ProjectionError ¶ added in v0.3.0
ProjectionError provides detailed information about a projection failure.
func NewProjectionError ¶ added in v0.3.0
func NewProjectionError(projectionName, eventType string, position uint64, cause error) *ProjectionError
NewProjectionError creates a new ProjectionError.
func (*ProjectionError) Error ¶ added in v0.3.0
func (e *ProjectionError) Error() string
Error returns the error message.
func (*ProjectionError) Is ¶ added in v0.3.0
func (e *ProjectionError) Is(target error) bool
Is reports whether this error matches the target error.
func (*ProjectionError) Unwrap ¶ added in v0.3.0
func (e *ProjectionError) Unwrap() error
Unwrap returns the underlying cause for errors.Unwrap().
type ProjectionMetrics ¶ added in v0.3.0
type ProjectionMetrics interface {
// RecordEventProcessed records that an event was processed.
RecordEventProcessed(projectionName, eventType string, duration time.Duration, success bool)
// RecordBatchProcessed records that a batch of events was processed.
RecordBatchProcessed(projectionName string, count int, duration time.Duration, success bool)
// RecordCheckpoint records a checkpoint update.
RecordCheckpoint(projectionName string, position uint64)
// RecordError records a projection error.
RecordError(projectionName string, err error)
}
ProjectionMetrics collects metrics about projection processing.
type ProjectionRebuilder ¶ added in v0.3.0
type ProjectionRebuilder struct {
// contains filtered or unexported fields
}
ProjectionRebuilder rebuilds projections from scratch. It replays all events through a projection to reconstruct its read model.
func NewProjectionRebuilder ¶ added in v0.3.0
func NewProjectionRebuilder(store *EventStore, checkpointStore CheckpointStore, opts ...ProjectionRebuilderOption) *ProjectionRebuilder
NewProjectionRebuilder creates a new projection rebuilder.
func (*ProjectionRebuilder) RebuildAsync ¶ added in v0.3.0
func (r *ProjectionRebuilder) RebuildAsync(ctx context.Context, projection AsyncProjection, opts ...RebuildOptions) error
RebuildAsync rebuilds an async projection from scratch.
func (*ProjectionRebuilder) RebuildInline ¶ added in v0.3.0
func (r *ProjectionRebuilder) RebuildInline(ctx context.Context, projection InlineProjection, opts ...RebuildOptions) error
RebuildInline rebuilds an inline projection from scratch.
type ProjectionRebuilderOption ¶ added in v0.3.0
type ProjectionRebuilderOption func(*ProjectionRebuilder)
ProjectionRebuilderOption configures a ProjectionRebuilder.
func WithRebuilderBatchSize ¶ added in v0.3.0
func WithRebuilderBatchSize(size int) ProjectionRebuilderOption
WithRebuilderBatchSize sets the batch size for rebuilding.
func WithRebuilderLogger ¶ added in v0.3.0
func WithRebuilderLogger(logger Logger) ProjectionRebuilderOption
WithRebuilderLogger sets the logger for the rebuilder.
func WithRebuilderMetrics ¶ added in v0.3.0
func WithRebuilderMetrics(metrics ProjectionMetrics) ProjectionRebuilderOption
WithRebuilderMetrics sets the metrics collector for the rebuilder.
type ProjectionState ¶ added in v0.3.0
type ProjectionState string
ProjectionState represents the current state of a projection.
const ( // ProjectionStateStopped indicates the projection is not running. ProjectionStateStopped ProjectionState = "stopped" // ProjectionStateRunning indicates the projection is actively processing events. ProjectionStateRunning ProjectionState = "running" // ProjectionStatePaused indicates the projection is paused. ProjectionStatePaused ProjectionState = "paused" // ProjectionStateFaulted indicates the projection has encountered an error. ProjectionStateFaulted ProjectionState = "faulted" // ProjectionStateRebuilding indicates the projection is being rebuilt. ProjectionStateRebuilding ProjectionState = "rebuilding" // ProjectionStateCatchingUp indicates the projection is catching up to current events. ProjectionStateCatchingUp ProjectionState = "catching_up" )
type ProjectionStatus ¶ added in v0.3.0
type ProjectionStatus struct {
// Name is the projection name.
Name string
// State is the current state of the projection.
State ProjectionState
// LastPosition is the global position of the last processed event.
LastPosition uint64
// EventsProcessed is the total number of events processed.
EventsProcessed uint64
// LastProcessedAt is when the last event was processed.
LastProcessedAt time.Time
// Error contains the error message if the projection is faulted.
Error string
// Lag is the number of events behind the head of the event store.
Lag uint64
// AverageLatency is the average time to process an event.
AverageLatency time.Duration
}
ProjectionStatus provides detailed information about a projection's current state.
type Query ¶ added in v0.3.0
type Query struct {
// Filters to apply.
Filters []Filter
// Ordering criteria.
OrderBy []OrderBy
// Maximum number of results to return.
// 0 means no limit.
Limit int
// Number of results to skip.
Offset int
// IncludeCount includes the total count in paginated results.
IncludeCount bool
}
Query represents a query for read models.
func (*Query) OrderByAsc ¶ added in v0.3.0
OrderByAsc adds ascending order.
func (*Query) OrderByDesc ¶ added in v0.3.0
OrderByDesc adds descending order.
func (*Query) WithOffset ¶ added in v0.3.0
WithOffset sets the number of results to skip.
func (*Query) WithPagination ¶ added in v0.3.0
WithPagination sets limit and offset for pagination.
type QueryResult ¶ added in v0.3.0
type QueryResult[T any] struct { // Items contains the matching read models. Items []*T // TotalCount is the total number of matching items (before pagination). // Only populated if IncludeCount was true in the query. TotalCount int64 // HasMore indicates if there are more results beyond the limit. HasMore bool }
QueryResult contains query results with optional count.
type ReadModelID ¶ added in v0.3.0
type ReadModelID interface {
// GetID returns the read model's unique identifier.
GetID() string
}
ReadModelID is an interface for read models that can return their ID.
type ReadModelRepository ¶ added in v0.3.0
type ReadModelRepository[T any] interface { // Get retrieves a read model by ID. // Returns ErrNotFound if not found. Get(ctx context.Context, id string) (*T, error) // GetMany retrieves multiple read models by their IDs. // Missing IDs are silently skipped. GetMany(ctx context.Context, ids []string) ([]*T, error) // Find queries read models with the given criteria. Find(ctx context.Context, query Query) ([]*T, error) // FindOne returns the first read model matching the query. // Returns ErrNotFound if no match. FindOne(ctx context.Context, query Query) (*T, error) // Count returns the number of read models matching the query. Count(ctx context.Context, query Query) (int64, error) // Insert creates a new read model. // Returns ErrAlreadyExists if ID already exists. Insert(ctx context.Context, model *T) error // Update modifies an existing read model. // Returns ErrNotFound if not found. Update(ctx context.Context, id string, updateFn func(*T)) error // Upsert creates or updates a read model. Upsert(ctx context.Context, model *T) error // Delete removes a read model by ID. // Returns ErrNotFound if not found. Delete(ctx context.Context, id string) error // DeleteMany removes all read models matching the query. // Returns the number of deleted models. DeleteMany(ctx context.Context, query Query) (int64, error) // Clear removes all read models. Clear(ctx context.Context) error }
ReadModelRepository provides generic CRUD operations for read models. T is the read model type.
type RebuildOptions ¶ added in v0.3.0
type RebuildOptions struct {
// DeleteCheckpoint deletes the existing checkpoint before rebuilding.
// Default: true
DeleteCheckpoint bool
// ClearReadModel calls the projection's Clear method before rebuilding.
// Only applicable for projections that implement Clearable.
// Default: true
ClearReadModel bool
// ProgressCallback is called periodically with progress updates.
ProgressCallback ProgressCallback
// ProgressInterval is how often to call the progress callback.
// Default: 1 second
ProgressInterval time.Duration
// FromPosition starts rebuilding from a specific position.
// Default: 0 (from beginning)
FromPosition uint64
// ToPosition stops rebuilding at a specific position.
// Default: 0 (to end)
ToPosition uint64
}
RebuildOptions configures a projection rebuild.
func DefaultRebuildOptions ¶ added in v0.3.0
func DefaultRebuildOptions() RebuildOptions
DefaultRebuildOptions returns the default rebuild options.
type RebuildProgress ¶ added in v0.3.0
type RebuildProgress struct {
// ProjectionName is the name of the projection being rebuilt.
ProjectionName string
// TotalEvents is the total number of events to process.
TotalEvents uint64
// ProcessedEvents is the number of events processed so far.
ProcessedEvents uint64
// CurrentPosition is the current global position.
CurrentPosition uint64
// StartedAt is when the rebuild started.
StartedAt time.Time
// Duration is the elapsed time.
Duration time.Duration
// EventsPerSecond is the processing rate.
EventsPerSecond float64
// EstimatedRemaining is the estimated time remaining.
EstimatedRemaining time.Duration
// Completed indicates if the rebuild is complete.
Completed bool
// Error contains any error that occurred.
Error error
}
RebuildProgress tracks the progress of a projection rebuild.
type RetryConfig ¶ added in v0.1.10
type RetryConfig struct {
// MaxAttempts is the maximum number of attempts (including the first one).
MaxAttempts int
// InitialDelay is the initial delay between retries.
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries.
MaxDelay time.Duration
// Multiplier is the factor by which the delay increases on each retry.
Multiplier float64
// ShouldRetry determines if an error should be retried.
// If nil, all errors are retried.
ShouldRetry func(err error) bool
}
RetryMiddleware retries failed commands.
func DefaultRetryConfig ¶ added in v0.1.10
func DefaultRetryConfig() RetryConfig
DefaultRetryConfig returns a default retry configuration.
type RetryPolicy ¶ added in v0.3.0
type RetryPolicy interface {
// ShouldRetry returns true if the operation should be retried.
ShouldRetry(attempt int, err error) bool
// Delay returns the duration to wait before the next retry.
Delay(attempt int) time.Duration
}
RetryPolicy defines how to handle retries for failed operations.
func ExponentialBackoffRetry ¶ added in v0.3.0
func ExponentialBackoffRetry(maxRetries int, baseDelay, maxDelay time.Duration) RetryPolicy
ExponentialBackoffRetry creates a new retry policy with exponential backoff.
func NoRetry ¶ added in v0.3.0
func NoRetry() RetryPolicy
NoRetry returns a retry policy that never retries.
type Saga ¶ added in v0.5.0
type Saga interface {
// SagaID returns the unique identifier for this saga instance.
SagaID() string
// SagaType returns the type of this saga (e.g., "OrderFulfillment").
SagaType() string
// Status returns the current status of the saga.
Status() SagaStatus
// SetStatus sets the saga status.
SetStatus(status SagaStatus)
// CurrentStep returns the current step number (0-based).
CurrentStep() int
// SetCurrentStep sets the current step number.
SetCurrentStep(step int)
// CorrelationID returns the correlation ID for this saga.
// Used to correlate events to this saga instance.
CorrelationID() string
// SetCorrelationID sets the correlation ID.
SetCorrelationID(id string)
// HandledEvents returns the list of event types this saga handles.
HandledEvents() []string
// HandleEvent processes an event and returns commands to dispatch.
// The returned commands will be executed by the saga manager.
HandleEvent(ctx context.Context, event StoredEvent) ([]Command, error)
// Compensate is called when the saga needs to rollback.
// It returns compensating commands to undo previous steps.
Compensate(ctx context.Context, failedStep int, failureReason error) ([]Command, error)
// IsComplete returns true if the saga has completed successfully.
IsComplete() bool
// StartedAt returns when the saga started.
StartedAt() time.Time
// SetStartedAt sets when the saga started.
SetStartedAt(t time.Time)
// CompletedAt returns when the saga completed (nil if not completed).
CompletedAt() *time.Time
// SetCompletedAt sets when the saga completed.
SetCompletedAt(t *time.Time)
// Data returns the saga's internal state as a map.
// This is serialized and stored in the saga store.
Data() map[string]interface{}
// SetData restores the saga's internal state from a map.
SetData(data map[string]interface{})
// Version returns the saga version for optimistic concurrency.
Version() int64
// SetVersion sets the saga version.
SetVersion(v int64)
// IncrementVersion increments the saga version.
IncrementVersion()
}
Saga defines the interface for saga implementations. A saga coordinates long-running business processes across multiple aggregates.
type SagaBase ¶ added in v0.5.0
type SagaBase struct {
// contains filtered or unexported fields
}
SagaBase provides a default partial implementation of the Saga interface. Embed this struct in your saga types to get default behavior.
func NewSagaBase ¶ added in v0.5.0
NewSagaBase creates a new SagaBase with the given ID and type.
func (*SagaBase) Complete ¶ added in v0.5.0
func (s *SagaBase) Complete()
Complete marks the saga as completed.
func (*SagaBase) CompletedAt ¶ added in v0.5.0
CompletedAt returns when the saga completed.
func (*SagaBase) CorrelationID ¶ added in v0.5.0
CorrelationID returns the correlation ID.
func (*SagaBase) CurrentStep ¶ added in v0.5.0
CurrentStep returns the current step number.
func (*SagaBase) IncrementVersion ¶ added in v0.5.0
func (s *SagaBase) IncrementVersion()
IncrementVersion increments the saga version.
func (*SagaBase) MarkCompensated ¶ added in v0.5.0
func (s *SagaBase) MarkCompensated()
MarkCompensated marks the saga as compensated.
func (*SagaBase) SetCompletedAt ¶ added in v0.5.0
SetCompletedAt sets when the saga completed.
func (*SagaBase) SetCorrelationID ¶ added in v0.5.0
SetCorrelationID sets the correlation ID.
func (*SagaBase) SetCurrentStep ¶ added in v0.5.0
SetCurrentStep sets the current step number.
func (*SagaBase) SetStartedAt ¶ added in v0.5.0
SetStartedAt sets when the saga started.
func (*SagaBase) SetStatus ¶ added in v0.5.0
func (s *SagaBase) SetStatus(status SagaStatus)
SetStatus sets the saga status.
func (*SagaBase) SetVersion ¶ added in v0.5.0
SetVersion sets the saga version.
func (*SagaBase) StartCompensation ¶ added in v0.5.0
func (s *SagaBase) StartCompensation()
StartCompensation marks the saga as compensating.
func (*SagaBase) Status ¶ added in v0.5.0
func (s *SagaBase) Status() SagaStatus
Status returns the current status.
type SagaCorrelation ¶ added in v0.5.0
type SagaCorrelation struct {
// SagaType is the type of saga this correlation applies to.
SagaType string
// EventTypes are the event types that can start this saga.
StartingEvents []string
// CorrelationIDFunc extracts the correlation ID from an event.
// This is used to find existing sagas or create new ones.
CorrelationIDFunc func(event StoredEvent) string
}
SagaCorrelation provides strategies for correlating events to sagas.
type SagaFactory ¶ added in v0.5.0
SagaFactory creates new saga instances.
type SagaFailedError ¶ added in v0.5.0
type SagaFailedError struct {
SagaID string
SagaType string
FailedStep int
Reason string
Recoverable bool
}
SagaFailedError provides detailed information about a saga failure.
func NewSagaFailedError ¶ added in v0.5.0
func NewSagaFailedError(sagaID, sagaType string, failedStep int, reason string, recoverable bool) *SagaFailedError
NewSagaFailedError creates a new SagaFailedError.
func (*SagaFailedError) Error ¶ added in v0.5.0
func (e *SagaFailedError) Error() string
Error returns the error message.
func (*SagaFailedError) Is ¶ added in v0.5.0
func (e *SagaFailedError) Is(target error) bool
Is reports whether this error matches the target error.
type SagaManager ¶ added in v0.5.0
type SagaManager struct {
// contains filtered or unexported fields
}
SagaManager orchestrates saga lifecycle and event processing. It subscribes to events, routes them to appropriate sagas, and dispatches resulting commands.
Concurrency and Idempotency ¶
SagaManager provides several mechanisms to ensure correct saga processing under concurrent access:
Per-Saga Locking: Each saga ID has an associated mutex that serializes access. This prevents race conditions when the same event is delivered from multiple sources (e.g., pg_notify + polling) or when multiple events for the same saga arrive simultaneously.
Fresh State Loading: Before processing each event, the saga state is loaded fresh from the store. This ensures terminal status checks and idempotency checks see the latest state.
Event Idempotency: Processed events are tracked in the SagaState.ProcessedEvents field (not in the saga's Data map) to prevent duplicate processing on retries. This is handled transparently by the SagaManager - saga implementations don't need to preserve these internal tracking fields.
Optimistic Concurrency: The saga store uses version-based optimistic concurrency control. On conflict, the event is retried with fresh state.
func NewSagaManager ¶ added in v0.5.0
func NewSagaManager(eventStore *EventStore, opts ...SagaManagerOption) *SagaManager
NewSagaManager creates a new SagaManager.
func (*SagaManager) FindSagaByCorrelationID ¶ added in v0.5.0
func (m *SagaManager) FindSagaByCorrelationID(ctx context.Context, correlationID string) (*SagaState, error)
FindSagaByCorrelationID finds a saga by its correlation ID.
func (*SagaManager) IsRunning ¶ added in v0.5.0
func (m *SagaManager) IsRunning() bool
IsRunning returns true if the saga manager is running.
func (*SagaManager) Position ¶ added in v0.5.0
func (m *SagaManager) Position() uint64
Position returns the current event position.
func (*SagaManager) ProcessEvent ¶ added in v0.5.0
func (m *SagaManager) ProcessEvent(ctx context.Context, event StoredEvent) error
ProcessEvent manually processes a single event (for testing or manual replay).
func (*SagaManager) Register ¶ added in v0.5.0
func (m *SagaManager) Register(sagaType string, factory SagaFactory, correlation SagaCorrelation)
Register registers a saga type with its factory and correlation configuration.
func (*SagaManager) RegisterSimple ¶ added in v0.5.0
func (m *SagaManager) RegisterSimple(sagaType string, factory SagaFactory, startingEvents ...string)
RegisterSimple registers a saga with a simple correlation based on event stream ID.
func (*SagaManager) SetPosition ¶ added in v0.5.0
func (m *SagaManager) SetPosition(pos uint64)
SetPosition sets the starting position for event processing.
func (*SagaManager) Start ¶ added in v0.5.0
func (m *SagaManager) Start(ctx context.Context) error
Start begins processing events and routing them to sagas. This method blocks until the context is cancelled.
func (*SagaManager) StartAsync ¶ added in v0.5.2
func (m *SagaManager) StartAsync(ctx context.Context) *AsyncResult
StartAsync begins processing events and routing them to sagas in a background goroutine. It returns immediately with an AsyncResult that can be used to:
- Wait for the saga manager to stop: result.Wait()
- Wait with timeout: result.WaitWithTimeout(5 * time.Second)
- Check if stopped: result.IsComplete()
- Cancel the manager: result.Cancel()
- Get the error: result.Err()
The saga manager will continue processing until:
- The provided context is cancelled
- result.Cancel() is called
- An unrecoverable error occurs
Example:
result := manager.StartAsync(ctx)
// Do other work while saga manager runs in background...
// Later, when shutting down:
result.Cancel()
if err := result.WaitWithTimeout(10 * time.Second); err != nil {
log.Printf("Saga manager shutdown: %v", err)
}
func (*SagaManager) StartSaga ¶ added in v0.5.2
func (m *SagaManager) StartSaga(ctx context.Context, sagaType string, triggerEvent StoredEvent) error
StartSaga manually triggers a new saga instance synchronously. The saga will be created and the trigger event will be processed immediately.
This is useful when you want to start a saga based on an external trigger rather than waiting for an event to flow through the event store subscription.
func (*SagaManager) StartSagaAsync ¶ added in v0.5.2
func (m *SagaManager) StartSagaAsync(ctx context.Context, sagaType string, triggerEvent StoredEvent) *AsyncResult
StartSagaAsync manually triggers a new saga instance asynchronously. The saga will be started and the first event will be processed in a background goroutine. Returns an AsyncResult that can be used to wait for the initial processing to complete.
This is useful when you want to start a saga based on an external trigger rather than waiting for an event to flow through the event store subscription.
Example:
result := manager.StartSagaAsync(ctx, "OrderFulfillment", initialEvent)
if err := result.WaitWithTimeout(5 * time.Second); err != nil {
log.Printf("Failed to start saga: %v", err)
}
func (*SagaManager) Stop ¶ added in v0.5.0
func (m *SagaManager) Stop()
Stop gracefully stops the saga manager.
type SagaManagerOption ¶ added in v0.5.0
type SagaManagerOption func(*SagaManager)
SagaManagerOption configures a SagaManager.
func WithCommandBus ¶ added in v0.5.0
func WithCommandBus(bus *CommandBus) SagaManagerOption
WithCommandBus sets the command bus for dispatching commands.
func WithSagaLogger ¶ added in v0.5.0
func WithSagaLogger(logger Logger) SagaManagerOption
WithSagaLogger sets the logger.
func WithSagaPollInterval ¶ added in v0.5.0
func WithSagaPollInterval(d time.Duration) SagaManagerOption
WithSagaPollInterval sets the polling interval for event subscription.
func WithSagaRetryAttempts ¶ added in v0.5.0
func WithSagaRetryAttempts(attempts int) SagaManagerOption
WithSagaRetryAttempts sets the number of retry attempts for failed commands.
func WithSagaRetryDelay ¶ added in v0.5.0
func WithSagaRetryDelay(d time.Duration) SagaManagerOption
WithSagaRetryDelay sets the delay between retry attempts.
func WithSagaSerializer ¶ added in v0.5.0
func WithSagaSerializer(serializer Serializer) SagaManagerOption
WithSagaSerializer sets the serializer for saga data.
func WithSagaStore ¶ added in v0.5.0
func WithSagaStore(store SagaStore) SagaManagerOption
WithSagaStore sets the saga store.
type SagaNotFoundError ¶ added in v0.5.0
type SagaNotFoundError = adapters.SagaNotFoundError
SagaNotFoundError provides detailed information about a missing saga. This is a type alias to adapters.SagaNotFoundError for consistency.
type SagaState ¶ added in v0.5.0
SagaState represents the persisted state of a saga.
func SagaStateFromJSON ¶ added in v0.5.0
SagaStateFromJSON parses saga state from JSON.
type SagaStatus ¶ added in v0.5.0
type SagaStatus = adapters.SagaStatus
SagaStatus represents the current status of a saga.
type SagaStepStatus ¶ added in v0.5.0
type SagaStepStatus = adapters.SagaStepStatus
SagaStepStatus represents the status of a saga step.
type SerializationError ¶
type SerializationError struct {
EventType string
Operation string // "serialize" or "deserialize"
Cause error
}
SerializationError provides detailed information about a serialization failure.
func NewSerializationError ¶
func NewSerializationError(eventType, operation string, cause error) *SerializationError
NewSerializationError creates a new SerializationError.
func (*SerializationError) Error ¶
func (e *SerializationError) Error() string
Error returns the error message.
func (*SerializationError) Is ¶
func (e *SerializationError) Is(target error) bool
Is reports whether this error matches the target error.
func (*SerializationError) Unwrap ¶
func (e *SerializationError) Unwrap() error
Unwrap returns the underlying cause for errors.Unwrap().
type Serializer ¶
type Serializer interface {
// Serialize converts an event to bytes.
Serialize(event interface{}) ([]byte, error)
// Deserialize converts bytes back to an event.
// The eventType is used to determine the target type.
Deserialize(data []byte, eventType string) (interface{}, error)
}
Serializer handles event payload serialization and deserialization.
type SimpleDispatcher ¶ added in v0.1.10
type SimpleDispatcher struct {
// contains filtered or unexported fields
}
SimpleDispatcher is a basic dispatcher that forwards commands to handlers.
func NewSimpleDispatcher ¶ added in v0.1.10
func NewSimpleDispatcher(registry *HandlerRegistry) *SimpleDispatcher
NewSimpleDispatcher creates a new SimpleDispatcher.
func (*SimpleDispatcher) Dispatch ¶ added in v0.1.10
func (d *SimpleDispatcher) Dispatch(ctx context.Context, cmd Command) (CommandResult, error)
Dispatch sends a command to its handler.
type StoredEvent ¶
type StoredEvent struct {
// ID is the globally unique event identifier.
ID string
// StreamID identifies the stream this event belongs to.
StreamID string
// Type is the event type identifier.
Type string
// Data is the serialized event payload.
Data []byte
// Metadata contains contextual information.
Metadata Metadata
// Version is the position within the stream (1-based).
Version int64
// GlobalPosition is the position across all streams.
GlobalPosition uint64
// Timestamp is when the event was stored.
Timestamp time.Time
}
StoredEvent represents a persisted event with all storage metadata.
type StreamID ¶
type StreamID struct {
// Category represents the aggregate type (e.g., "Order", "Customer").
Category string
// ID is the unique identifier within the category (e.g., "order-123").
ID string
}
StreamID uniquely identifies an event stream. It consists of a category (aggregate type) and an instance ID.
func NewStreamID ¶
NewStreamID creates a new StreamID from category and ID.
func ParseStreamID ¶
ParseStreamID parses a stream ID string in the format "Category-ID". Returns an error if the format is invalid.
type StreamInfo ¶
type StreamInfo struct {
// StreamID is the stream identifier.
StreamID string
// Category is the stream category (aggregate type).
Category string
// Version is the current stream version.
Version int64
// EventCount is the number of events in the stream.
EventCount int64
// CreatedAt is when the stream was created.
CreatedAt time.Time
// UpdatedAt is when the stream was last modified.
UpdatedAt time.Time
}
StreamInfo contains metadata about an event stream.
type StreamNotFoundError ¶
type StreamNotFoundError struct {
StreamID string
}
StreamNotFoundError provides detailed information about a missing stream.
func NewStreamNotFoundError ¶
func NewStreamNotFoundError(streamID string) *StreamNotFoundError
NewStreamNotFoundError creates a new StreamNotFoundError.
func (*StreamNotFoundError) Error ¶
func (e *StreamNotFoundError) Error() string
Error returns the error message.
func (*StreamNotFoundError) Is ¶
func (e *StreamNotFoundError) Is(target error) bool
Is reports whether this error matches the target error.
func (*StreamNotFoundError) Unwrap ¶
func (e *StreamNotFoundError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
type Subscription ¶ added in v0.3.0
type Subscription interface {
// Events returns the channel for receiving events.
Events() <-chan StoredEvent
// Close stops the subscription.
Close() error
// Err returns any error that caused the subscription to close.
Err() error
}
Subscription represents an active event subscription.
type SubscriptionAdapter ¶ added in v0.3.0
type SubscriptionAdapter interface {
// LoadFromPosition loads events starting from a global position.
LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)
// SubscribeAll subscribes to all events across all streams.
SubscribeAll(ctx context.Context, fromPosition uint64) (<-chan StoredEvent, error)
// SubscribeStream subscribes to events from a specific stream.
SubscribeStream(ctx context.Context, streamID string, fromVersion int64) (<-chan StoredEvent, error)
// SubscribeCategory subscribes to all events from streams in a category.
SubscribeCategory(ctx context.Context, category string, fromPosition uint64) (<-chan StoredEvent, error)
}
SubscriptionAdapter provides methods for subscribing to event streams. This interface extends the basic EventStoreAdapter for subscription capabilities.
type SubscriptionOptions ¶ added in v0.3.0
type SubscriptionOptions struct {
// BufferSize is the size of the event channel buffer.
// Default: 256
BufferSize int
// Filter optionally filters which events are delivered.
Filter EventFilter
// RetryOnError determines whether to retry on transient errors.
// Default: true
RetryOnError bool
// RetryInterval is the time to wait between retries.
// Default: 1 second
RetryInterval time.Duration
// MaxRetries is the maximum number of retry attempts.
// Default: 5
MaxRetries int
}
SubscriptionOptions configures a subscription.
func DefaultSubscriptionOptions ¶ added in v0.3.0
func DefaultSubscriptionOptions() SubscriptionOptions
DefaultSubscriptionOptions returns the default subscription options.
type ValidationError ¶ added in v0.1.10
type ValidationError struct {
// CommandType is the type of command that failed validation.
CommandType string
// Field is the field that failed validation (optional).
Field string
// Message describes the validation failure.
Message string
// Cause is the underlying error (optional).
Cause error
}
ValidationError represents a command validation failure.
func NewValidationError ¶ added in v0.1.10
func NewValidationError(cmdType, field, message string) *ValidationError
NewValidationError creates a new ValidationError.
func NewValidationErrorWithCause ¶ added in v0.1.10
func NewValidationErrorWithCause(cmdType, field, message string, cause error) *ValidationError
NewValidationErrorWithCause creates a new ValidationError with an underlying cause.
func (*ValidationError) Error ¶ added in v0.1.10
func (e *ValidationError) Error() string
Error returns the error message.
func (*ValidationError) Is ¶ added in v0.1.10
func (e *ValidationError) Is(target error) bool
Is reports whether this error matches the target error.
func (*ValidationError) Unwrap ¶ added in v0.1.10
func (e *ValidationError) Unwrap() error
Unwrap returns the underlying cause for errors.Unwrap().
type Validator ¶ added in v0.1.10
type Validator interface {
// Validate validates a command and returns validation errors.
Validate(cmd Command) error
}
Validator provides command validation functionality.
type ValidatorFunc ¶ added in v0.1.10
ValidatorFunc is a function that implements Validator.
func (ValidatorFunc) Validate ¶ added in v0.1.10
func (f ValidatorFunc) Validate(cmd Command) error
Validate implements Validator.
type VersionSetter ¶ added in v0.4.6
type VersionSetter interface {
SetVersion(v int64)
}
VersionSetter is an optional interface that aggregates can implement to allow the EventStore to set their version during loading. This is used for optimistic concurrency control in SaveAggregate. AggregateBase implements this interface.
type VersionedAggregate ¶
type VersionedAggregate interface {
Aggregate
// OriginalVersion returns the version when the aggregate was loaded.
OriginalVersion() int64
}
VersionedAggregate provides versioning information for optimistic concurrency.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package adapters provides interfaces for event store backends.
|
Package adapters provides interfaces for event store backends. |
|
memory
Package memory provides an in-memory implementation of the event store adapter.
|
Package memory provides an in-memory implementation of the event store adapter. |
|
postgres
Package postgres provides a PostgreSQL implementation of the event store adapter.
|
Package postgres provides a PostgreSQL implementation of the event store adapter. |
|
cli
|
|
|
commands
Package commands provides the CLI command implementations for mink.
|
Package commands provides the CLI command implementations for mink. |
|
config
Package config provides configuration management for the mink CLI.
|
Package config provides configuration management for the mink CLI. |
|
styles
Package styles provides consistent styling for the go-mink CLI.
|
Package styles provides consistent styling for the go-mink CLI. |
|
ui
Package ui provides reusable UI components for the go-mink CLI.
|
Package ui provides reusable UI components for the go-mink CLI. |
|
cmd
|
|
|
mink
command
mink is the command-line interface for the go-mink event sourcing library.
|
mink is the command-line interface for the go-mink event sourcing library. |
|
examples
|
|
|
basic
command
Package main demonstrates the basic usage of go-mink for event sourcing.
|
Package main demonstrates the basic usage of go-mink for event sourcing. |
|
cqrs
command
Package main demonstrates the CQRS and Command Bus features of go-mink (Phase 2).
|
Package main demonstrates the CQRS and Command Bus features of go-mink (Phase 2). |
|
cqrs-postgres
command
Package main demonstrates Phase 2 CQRS & Command Bus features with PostgreSQL.
|
Package main demonstrates Phase 2 CQRS & Command Bus features with PostgreSQL. |
|
full-ecommerce
command
Package main demonstrates a complete e-commerce order fulfillment system using go-mink.
|
Package main demonstrates a complete e-commerce order fulfillment system using go-mink. |
|
metrics
command
Example: Metrics Middleware
|
Example: Metrics Middleware |
|
msgpack
command
Example: MessagePack Serializer
|
Example: MessagePack Serializer |
|
projections
command
Package main demonstrates the projection and read model features of go-mink.
|
Package main demonstrates the projection and read model features of go-mink. |
|
protobuf
command
Example: Protocol Buffers Serializer
|
Example: Protocol Buffers Serializer |
|
sagas
command
Example: Saga (Process Manager) Pattern
|
Example: Saga (Process Manager) Pattern |
|
tracing
command
Example: Distributed Tracing with OpenTelemetry
|
Example: Distributed Tracing with OpenTelemetry |
|
middleware
|
|
|
metrics
Package metrics provides Prometheus metrics integration for mink.
|
Package metrics provides Prometheus metrics integration for mink. |
|
tracing
Package tracing provides OpenTelemetry integration for mink.
|
Package tracing provides OpenTelemetry integration for mink. |
|
serializer
|
|
|
msgpack
Package msgpack provides a MessagePack serializer implementation for mink.
|
Package msgpack provides a MessagePack serializer implementation for mink. |
|
protobuf
Package protobuf provides a Protocol Buffers serializer for mink events.
|
Package protobuf provides a Protocol Buffers serializer for mink events. |
|
testing
|
|
|
assertions
Package assertions provides event assertion utilities for testing event-sourced systems.
|
Package assertions provides event assertion utilities for testing event-sourced systems. |
|
bdd
Package bdd provides BDD-style test fixtures for event-sourced aggregates.
|
Package bdd provides BDD-style test fixtures for event-sourced aggregates. |
|
containers
Package containers provides test container utilities for integration testing.
|
Package containers provides test container utilities for integration testing. |
|
projections
Package projections provides testing utilities for projection development.
|
Package projections provides testing utilities for projection development. |
|
sagas
Package sagas provides testing utilities for saga (process manager) development.
|
Package sagas provides testing utilities for saga (process manager) development. |
|
testutil
Package testutil provides test utilities and fixtures for go-mink.
|
Package testutil provides test utilities and fixtures for go-mink. |