swarm

package module
v0.24.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2025 License: Apache-2.0 Imports: 11 Imported by: 28

README

swarm

Go channels for distributed queueing and event-driven systems

sub-moduledocfeaturesabout
API & Kernel
AWS EventBridge
AWS DynamoDB Stream
AWS S3 Event
AWS SQS Events
AWS SQS and SQS FIFO
AWS WebSocket API
AWS SNS
AWS Kinesis Events
AWS Kinesis
AWS ElastiCache
MQTT


From Chaos to Channels

"Finally, messaging that feels like Go!" — Distributed systems, done right.

Writing distributed, event-driven systems in Go today is harder than it should be - a lesson learned from a decade building such systems in Erlang. Vendor APIs are clunky, non-idiomatic, and tightly coupled to specific messaging brokers.

swarm makes asynchronous, distributed messaging in Go idiomatic, testable, and portable by expressing queueing/event-driven systems through Go channels instead of vendor-specific APIs.

User Guide | Playground | Getting started | Examples | Philosophy

Quick Start

The example below shows the simplest way to enqueue and dequeue messages using AWS SQS.

package main

import (
  "github.com/fogfish/swarm"
  "github.com/fogfish/swarm/broker/sqs"
  "github.com/fogfish/swarm/emit"
  "github.com/fogfish/swarm/listen"
)

// Example message
type Order struct {
  ID     string  `json:"id"`
  Amount float64 `json:"amount"`
}

func main() {
  // create broker for AWS SQS
  q := sqs.Endpoint().Build("aws-sqs-queue-name")

  // create Golang channels
  rcv, ack := listen.Typed[Order](q.Listener)
  out := swarm.LogDeadLetters(emit.Typed[Order](q.Emitter))

  // Send messages
  out <- Order{ID: "123", Amount: 100.0}

  // use Golang channels for I/O
  go func() {
    for order := range rcv {
      processOrder(order.Object)
      ack <- order  // acknowledge processing
    }
  }()

  q.Await()
}

func processOrder(order Order) {
  // Your business logic here
}

That's it! You're now using distributed messaging with native Go channels.

Continue reading to understand the purpose of the library and why it exists.

What is swarm?

swarm is a Go library that solves the complexity of distributed, event-driven systems by abstracting external messaging queues (like AWS SQS, AWS EventBridge, RabbitMQ, Kafka, etc.) behind type-safe Go channels.

By aligning with Go's native concurrency model, swarm encourages developers to think in terms of message flows rather than request-response cycles, which reflects the asynchronous and unreliable reality of distributed systems. This mindset shift leads to more resilient, scalable, and maintainable architectures — where complexity is not hidden, but made explicit and manageable.

  • Immediate Productivity: Use Go channel patterns you already know, no need to learn new APIs.
  • Smooth Testing: Write unit tests with in-memory channels; switch to real systems for integration.
  • Future-Proof: Swap messaging technologies without changing your business logic.
  • Serverless-Ready: Designed for both long-running services and ephemeral serverless functions.
  • Scale-Out: Idiomatic architecture to build distributed topologies and scale-out Golang applications.

Think of swarm as net.Conn for distributed messaging: a universal, idiomatic interface you can depend on, regardless of the underlying transport.

Below we discussed why swarm exists and what problems it solves.

Why Choose swarm?

Traditional messaging libraries have fundamental issues

Traditional libraries With swarm
Different SDK for each broker One API for all brokers
Complex mocks for testing In-memory channels for tests
Vendor lock-in Portable across technologies
Sync APIs for async systems Native async with Go channels
Manual error handling Built-in retries & dead letters

swarm solves these problems by providing a universal, idiomatic interface built on Go's concurrency model — embracing the asynchronous nature of distributed systems, making message flows explicit, and hiding vendor-specific complexity behind familiar Go channels.

See practical scenarios for swarm in Storytelling.

Getting started

The library requires Go 1.24 or later due to usage of generic alias types.

The latest version of the library is available at main branch of this repository. All development, including new features and bug fixes, take place on the main branch using forking and pull requests as described in contribution guidelines. The stable version is available via Golang modules.

Use go get to retrieve the library and add it as dependency to your application.

go get -u github.com/fogfish/swarm
When to Use swarm

Perfect for:

  • Event-driven architectures - Decouple services with async messaging
  • Serverless applications - Zero-boilerplate event consumers on AWS Lambda
  • Microservices - Replace fragile HTTP calls with resilient messaging
  • High-throughput systems - Scale message processing horizontally
  • Multi-cloud applications - Abstract away broker differences

You'll love it if you:

  • Want type-safe messaging without vendor lock-in
  • Need to unit test message-driven code easily
  • Prefer Go channels over learning broker-specific APIs
  • Value clean, testable architecture patterns

Advanced Usage and Next steps

  • Learn why we built swarm.
  • User guide help you with adopting the library.
  • The library supports a variety of brokers out of the box. See the table at the beginning of this document for details. If you need to implement a broker that isn’t supported yet, refer to Bring Your Own Broker for guidance.

Real-World Success Stories

Here's how swarm solves actual production problems that teams face every day.

E-commerce Order Processing: The Cascading Failure Problem

Imagine you're building an e-commerce platform that processes thousands of orders per minute during Black Friday sales. Your order processing system needs to:

  • Validate payment information
  • Update inventory levels
  • Send confirmation emails
  • Update analytics systems
  • Trigger fulfillment processes

What are problems with the traditional approach:

  • Blocking Operations: Each service call blocks until completion, making the system slow during peak loads
  • Cascading Failures: If the email service is down, the entire order process fails, even though the core business logic succeeded
  • Poor Scalability: Cannot scale individual components independently
func ProcessOrder(order *Order) error {
  // Synchronous calls create cascading failures
  if err := paymentService.Charge(order.Payment); err != nil {
    return err // Customer sees error immediately
  }
    
  if err := inventoryService.Reserve(order.Items); err != nil {
    return err // Payment charged but inventory failed
  }
    
  if err := emailService.SendConfirmation(order); err != nil {
    return err // Everything fails if email is slow
  }
    
  return nil
}

Using the swarm library, we benefit from

  • Immediate Response: Customers get instant order confirmation
  • Fault Tolerance: If email service is down, orders still process successfully
  • Independent Scaling: Each service can scale based on its own load patterns
func ProcessOrder(order *Order) error {
  // Immediate response to customer
  if err := validateOrder(order); err != nil {
    return err
  }

  // Async processing via channels
  orderEvents <- OrderCreated{Order: order}
    
  return nil // Customer gets immediate confirmation
}

// Background processing
go func() {
  for event := range orderEvents {
    paymentQueue <- PaymentRequest{Order: event.Order}
    inventoryQueue <- InventoryUpdate{Order: event.Order}
    emailQueue <- EmailNotification{Order: event.Order}
  }
}()
Serverless Event Processing: The SDK Complexity Problem

You're building a serverless application on AWS that needs to process events from multiple sources - SQS queues, S3 bucket notifications, EventBridge events, and DynamoDB streams. Each service has its own SDK with different patterns.

// Different patterns for each service
func handleSQSEvent(event events.SQSEvent) error {
  for _, record := range event.Records {
    // SQS-specific parsing and acknowledgment
    body := record.Body
    receiptHandle := record.ReceiptHandle
    // ... SQS-specific error handling
  }
}

func handleS3Event(event events.S3Event) error {
  for _, record := range event.Records {
    // S3-specific parsing
    bucket := record.S3.Bucket.Name
    key := record.S3.Object.Key
    // ... S3-specific error handling
  }
}

Using the swarm library, we benefit from

  • Unified Interface: Same code patterns work across all AWS services
  • Easy Testing: Mock implementations for local development
  • Technology Agnostic: Can switch from SQS to EventBridge without code changes
events := eventbridge.Must(eventbridge.Listener().Build())
upload := s3.Must(s3.Listener().Build())
orders := sqs.Must(sqs.Listener().Build())

listen.Typed[UserEvent](events)
listen.Typed[DataUploaded](upload)
listen.Typed[Order](orders)
Microservices Communication: The Synchronous Trap

You have a microservices architecture where the user service, notification service, and analytics service need to communicate. Traditional REST APIs create tight coupling and cascading failures.

// Synchronous REST calls create brittleness
func CreateUser(user *User) error {
  // Save user
  if err := userDB.Save(user); err != nil {
    return err
  }

  // Synchronous calls to other services
  if err := notificationService.SendWelcomeEmail(user); err != nil {
    return err // User creation fails if email fails
  }

  if err := analyticsService.TrackUserCreated(user); err != nil {
    return err // User creation fails if analytics fails
  }

  return nil
}

Using the swarm library, we benefit from

  • Resilient: Core functionality works even if downstream services are down
  • Scalable: Services can process at their own pace
  • Evolvable: Easy to add new services without changing existing code
// Async communication via channels
func CreateUser(user *User) error {
  // Core business logic
  if err := userDB.Save(user); err != nil {
      return err
  }

  // Async notifications
  userEvents <- UserCreated{User: user}

  return nil // Immediate success
}

// Services consume events independently
go func() {
  for event := range userEvents {
    notificationQueue <- WelcomeEmail{User: event.User}
    analyticsQueue <- UserCreatedEvent{User: event.User}
  }
}()

How To Contribute

The library is Apache Version 2.0 licensed and accepts contributions via GitHub pull requests:

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Added some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

The build and testing process requires Go version 1.24 or later.

build and test library.

git clone https://github.com/fogfish/swarm
cd swarm
go test ./...
commit message

The commit message helps us to write a good release note, speed-up review process. The message should address two question what changed and why. The project follows the template defined by chapter Contributing to a Project of Git book.

bugs

If you experience any issues with the library, please let us know via GitHub issues. We appreciate detailed and accurate reports that help us to identity and replicate the issue.

Community & Support

License

See LICENSE

Documentation

Index

Constants

View Source
const (
	EnvConfigRealm          = "CONFIG_SWARM_REALM"
	EnvConfigAgent          = "CONFIG_SWARM_AGENT"
	EnvConfigPollFrequency  = "CONFIG_SWARM_POLL_FREQUENCY"
	EnvConfigTimeToFlight   = "CONFIG_SWARM_TIME_TO_FLIGHT"
	EnvConfigNetworkTimeout = "CONFIG_SWARM_NETWORK_TIMEOUT"
)

Environment variable to config kernel

View Source
const (
	ErrServiceIO  = faults.Type("service i/o failed")
	ErrEnqueue    = faults.Type("enqueue i/o failed")
	ErrEncoder    = faults.Type("encoder failure")
	ErrDequeue    = faults.Type("dequeue i/o failed")
	ErrDecoder    = faults.Type("decoder failure")
	ErrRouting    = faults.Safe1[string]("routing has failed (cat %s)")
	ErrCatUnknown = faults.Safe1[string]("unknown category %s")
)
View Source
const Version = "v0.24.1"

Variables

View Source
var (
	// Unique identity of the realm (logical environment or world) where the event was created.
	// Useful to support deployment isolation (e.g., green/blue, canary) in event-driven systems.
	WithRealm = opts.ForName[Config, string]("Realm")

	// Source is a direct performer of the event.
	// A software service that emits action to the stream.
	//
	// Deprecated: use WithAgent instead.
	WithSource = opts.ForName[Config, string]("Agent")

	// Agent is a software service that consumes/emits events from/to the stream.
	WithAgent = opts.ForName[Config, string]("Agent")

	// Define I/O backoff strategy
	// * backoff.Const(t, n) retry operation for N times, with T wait time in between
	// * backoff.Linear(t, n) retry operation for N times, with linear increments by T on each step
	// * backoff.Exp(t, n, f) retry operation for N times, with exponential increments by T on each step
	// * backoff.Empty() no retry
	WithRetry = opts.ForType[Config, Retry]()

	// Configure broker to route global errors to channel
	WithStdErr = opts.ForType[Config, chan<- error]()

	// Number of poller in the system
	WithPollerPool = opts.ForName[Config, int]("PollerPool")

	// Frequency to poll broker api
	WithPollFrequency = opts.ForName[Config, time.Duration]("PollFrequency")

	// Time To Flight for message from broker API to consumer
	WithTimeToFlight = opts.ForName[Config, time.Duration]("TimeToFlight")

	// Timeout for Network I/O
	WithNetworkTimeout = opts.ForName[Config, time.Duration]("NetworkTimeout")

	// Configures capacity for emit channel at broker
	WithEmitCapacity = opts.ForName[Config, int]("CapOut")

	// Configures capacity for dead letter channel at broker
	WithDeadLetterCapacity = opts.ForName[Config, int]("CapDlq")

	// Configures capacity for receive channel at broker
	WithRecvCapacity = opts.ForName[Config, int]("CapRcv")

	// Configures capacity for acknowledge channel at broker
	WithAckCapacity = opts.ForName[Config, int]("CapAck")

	// AtMostOnce is best effort policy, where a message is published without any
	// formal acknowledgement of receipt, and it isn't replayed.
	//
	// The policy only impacts behavior of Golang channels created by the broker
	WithPolicyAtMostOnce = opts.ForName("CapRcv",
		func(c *Config, n int) error {
			c.Policy = PolicyAtMostOnce
			c.CapOut = n
			c.CapDlq = n
			c.CapRcv = n
			c.CapAck = n
			return nil
		})

	// AtLeastOnce policy ensures delivery of the message to broker
	//
	// The policy only impacts behavior of Golang channels created by the broker
	WithPolicyAtLeastOnce = opts.ForName("CapRcv",
		func(c *Config, n int) error {
			c.Policy = PolicyAtLeastOnce
			c.CapOut = 0
			c.CapDlq = 0
			c.CapRcv = n
			c.CapAck = n
			return nil
		},
	)

	// Fail fast the message if category is not known to kernel.
	WithFailOnUnknownCategory = opts.ForName[Config, bool]("FailOnUnknownCategory")
)

Functions

func ErrTimeout added in v0.20.1

func ErrTimeout(op string, timer time.Duration) error

func LogDeadLetters added in v0.13.0

func LogDeadLetters[T any](out chan<- T, err <-chan T) chan<- T

Consumes dead letter messages

swarm.LogDeadLetters(queue.Enqueue(...))

func TypeOf added in v0.20.0

func TypeOf[T any](category ...string) string

TypeOf returns normalized name of the type T.

func WithConfigFromEnv added in v0.13.2

func WithConfigFromEnv() opts.Option[Config]

Configure from Environment, (all timers in seconds) - CONFIG_SWARM_REALM - CONFIG_SWARM_AGENT - CONFIG_SWARM_POLL_FREQUENCY - CONFIG_SWARM_TIME_TO_FLIGHT - CONFIG_SWARM_NETWORK_TIMEOUT

func WithLogStdErr added in v0.13.0

func WithLogStdErr() opts.Option[Config]

Configure broker to log standard errors

Types

type Bag added in v0.4.0

type Bag struct {
	// Message category ~ topic
	Category string

	// Unique brief summary of the message
	Digest Digest

	// Error on the message processing
	Error error

	// I/O Context of the message, as obtained from broker
	IOContext any

	// Message raw content
	Object []byte
}

Bag is an abstract container for octet stream. Bag is used by the transport to abstract message on the wire.

type Config

type Config struct {
	// Unique identity of the realm (logical environment or world) where the event was created.
	// Useful to support deployment isolation (e.g., green/blue, canary) in event-driven systems.
	Realm string

	// Agent is a direct performer of the event.
	// It is a software service that consumes/emits events from/to the stream.
	Agent string

	// Quality of Service Policy
	Policy Policy

	// Queue capacity (enhance with individual capacities)
	CapOut int
	CapDlq int
	CapRcv int
	CapAck int

	// Retry Policy for service calls
	Backoff Retry

	// Standard Error I/O channel
	StdErr chan<- error

	// Size of poller pool in the system
	PollerPool int

	// Frequency to poll broker api
	PollFrequency time.Duration

	// Time To Flight is a time required by the client to acknowledge the message
	TimeToFlight time.Duration

	// Timeout for any network operations
	NetworkTimeout time.Duration

	// Fail fast the message if category is not known to kernel.
	FailOnUnknownCategory bool
}

func NewConfig added in v0.9.0

func NewConfig() Config

type Digest added in v0.13.0

type Digest string

Unique brief summary of the message, specific to the broker

type Event added in v0.5.0

type Event[M, T any] struct {
	// Unique brief summary of the message
	Digest Digest `json:"-"`

	// Error on the message processing
	Error error `json:"-"`

	// I/O Context of the message, as obtained from broker
	IOContext any `json:"-"`

	Meta *M `json:"meta,omitempty"`
	Data *T `json:"data,omitempty"`
}

Event defines immutable fact(s) placed into the queueing system. Event resembles the concept of Action as it is defined by schema.org.

> An action performed by a direct agent and indirect participants upon a direct object.

This type supports development of event-driven solutions that treat data as a collection of immutable facts, which are queried and processed in real-time. These applications processes logical log of events, each event defines a change to current state of the object, i.e. which attributes were inserted, updated or deleted (a kind of diff). The event identifies the object that was changed together with using unique identifier.

In contrast with other solutions, the event does not uses envelop approach. Instead, it side-car meta and data each other, making extendible

func ToEvent added in v0.24.0

func ToEvent[M, T any](bag Bag, evt Event[M, T]) Event[M, T]

func (Event[M, T]) Fail added in v0.13.0

func (evt Event[M, T]) Fail(err error) Event[M, T]

Fail event with error

type Meta added in v0.20.0

type Meta struct {
	// Sink for the event
	Sink curie.IRI `json:"sink,omitempty"`

	//
	// Unique identity for event.
	// It is automatically defined by the library upon the transmission unless
	// defined by sender. Preserving ID across sequence of messages allows
	// building request/response semantic.
	ID string `json:"id,omitempty"`

	//
	// Canonical IRI that defines a type of action.
	// It is automatically defined by the library upon the transmission unless
	// defined by sender.
	Type curie.IRI `json:"type,omitempty"`

	// Unique identity of the realm (logical environment or world) where the event was created.
	// Useful to support deployment isolation (e.g., green/blue, canary) in event-driven systems.
	Realm curie.IRI `json:"realm,omitempty"`

	//
	// Direct performer of the event, a software service that emits action to
	// the stream. It is automatically defined by the library upon the transmission
	// unless defined by sender.
	Agent curie.IRI `json:"agent,omitempty"`

	//
	// ISO8601 timestamps when action has been created
	// It is automatically defined by the library upon the transmission
	Created time.Time `json:"created"`

	//
	// Indicates target performer of the event, a software service that is able to
	Target curie.IRI `json:"target,omitempty"`

	//
	// Indirect participants, a user who initiated an event.
	Participant curie.IRI `json:"participant,omitempty"`
}

The default metadata associated with event.

type Msg

type Msg[T any] struct {
	// Message category ~ topic
	Category string

	// Unique brief summary of the message
	Digest Digest

	// Error on the message processing
	Error error

	// I/O Context of the message, as obtained from broker
	IOContext any

	// Message decoded content
	Object T
}

Msg is a generic envelop type for incoming messages. It contains both decoded object and its digest used to acknowledge message.

func ToMsg added in v0.20.0

func ToMsg[T any](bag Bag, object T) Msg[T]

func (Msg[T]) Fail added in v0.12.0

func (msg Msg[T]) Fail(err error) Msg[T]

Fail message with error

type Policy added in v0.4.0

type Policy int

Grade of Service Policy

const (
	PolicyAtMostOnce Policy = iota
	PolicyAtLeastOnce
	PolicyExactlyOnce
)

type Retry added in v0.9.0

type Retry interface{ Retry(f func() error) error }

Directories

Path Synopsis
broker
embedded module
eventbridge module
eventddb module
events3 module
eventsqs module
sqs module
websocket module
qtest module
queue module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL