Documentation
¶
Index ¶
- Constants
- Variables
- func ErrTimeout(op string, timer time.Duration) error
- func LogDeadLetters[T any](out chan<- T, err <-chan T) chan<- T
- func TypeOf[T any](category ...string) string
- func WithConfigFromEnv() opts.Option[Config]
- func WithLogStdErr() opts.Option[Config]
- type Bag
- type Config
- type Digest
- type Event
- type Meta
- type Msg
- type Policy
- type Retry
Constants ¶
// Names of the environment variables used to configure the kernel.
// They are the variables listed for WithConfigFromEnv, which documents
// that all timers are expressed in seconds.
const (
	// EnvConfigRealm is the environment variable for the realm.
	EnvConfigRealm = "CONFIG_SWARM_REALM"

	// EnvConfigAgent is the environment variable for the agent.
	EnvConfigAgent = "CONFIG_SWARM_AGENT"

	// EnvConfigPollFrequency is the environment variable for the poll frequency.
	EnvConfigPollFrequency = "CONFIG_SWARM_POLL_FREQUENCY"

	// EnvConfigTimeToFlight is the environment variable for the time to flight.
	EnvConfigTimeToFlight = "CONFIG_SWARM_TIME_TO_FLIGHT"

	// EnvConfigNetworkTimeout is the environment variable for the network timeout.
	EnvConfigNetworkTimeout = "CONFIG_SWARM_NETWORK_TIMEOUT"
)
Environment variables used to configure the kernel.
const ( ErrServiceIO = faults.Type("service i/o failed") ErrEnqueue = faults.Type("enqueue i/o failed") ErrEncoder = faults.Type("encoder failure") ErrDequeue = faults.Type("dequeue i/o failed") ErrDecoder = faults.Type("decoder failure") ErrRouting = faults.Safe1[string]("routing has failed (cat %s)") ErrCatUnknown = faults.Safe1[string]("unknown category %s") )
// Version is the version identifier of this package.
const Version = "v0.24.1"
Variables ¶
var ( // Unique identity of the realm (logical environment or world) where the event was created. // Useful to support deployment isolation (e.g., green/blue, canary) in event-driven systems. WithRealm = opts.ForName[Config, string]("Realm") // Source is a direct performer of the event. // A software service that emits action to the stream. // // Deprecated: use WithAgent instead. WithSource = opts.ForName[Config, string]("Agent") // Agent is a software service that consumes/emits events from/to the stream. WithAgent = opts.ForName[Config, string]("Agent") // Define I/O backoff strategy // * backoff.Const(t, n) retry operation for N times, with T wait time in between // * backoff.Linear(t, n) retry operation for N times, with linear increments by T on each step // * backoff.Exp(t, n, f) retry operation for N times, with exponential increments by T on each step // * backoff.Empty() no retry WithRetry = opts.ForType[Config, Retry]() // Configure broker to route global errors to channel WithStdErr = opts.ForType[Config, chan<- error]() // Number of poller in the system WithPollerPool = opts.ForName[Config, int]("PollerPool") // Frequency to poll broker api WithPollFrequency = opts.ForName[Config, time.Duration]("PollFrequency") // Time To Flight for message from broker API to consumer WithTimeToFlight = opts.ForName[Config, time.Duration]("TimeToFlight") // Timeout for Network I/O WithNetworkTimeout = opts.ForName[Config, time.Duration]("NetworkTimeout") // Configures capacity for emit channel at broker WithEmitCapacity = opts.ForName[Config, int]("CapOut") // Configures capacity for dead letter channel at broker WithDeadLetterCapacity = opts.ForName[Config, int]("CapDlq") // Configures capacity for receive channel at broker WithRecvCapacity = opts.ForName[Config, int]("CapRcv") // Configures capacity for acknowledge channel at broker WithAckCapacity = opts.ForName[Config, int]("CapAck") // AtMostOnce is best effort policy, where a message is published without any // 
formal acknowledgement of receipt, and it isn't replayed. // // The policy only impacts behavior of Golang channels created by the broker WithPolicyAtMostOnce = opts.ForName("CapRcv", func(c *Config, n int) error { c.Policy = PolicyAtMostOnce c.CapOut = n c.CapDlq = n c.CapRcv = n c.CapAck = n return nil }) // AtLeastOnce policy ensures delivery of the message to broker // // The policy only impacts behavior of Golang channels created by the broker WithPolicyAtLeastOnce = opts.ForName("CapRcv", func(c *Config, n int) error { c.Policy = PolicyAtLeastOnce c.CapOut = 0 c.CapDlq = 0 c.CapRcv = n c.CapAck = n return nil }, ) // Fail fast the message if category is not known to kernel. WithFailOnUnknownCategory = opts.ForName[Config, bool]("FailOnUnknownCategory") )
Functions ¶
func LogDeadLetters ¶ added in v0.13.0
func LogDeadLetters[T any](out chan<- T, err <-chan T) chan<- T
Consumes dead letter messages
swarm.LogDeadLetters(queue.Enqueue(...))
func WithConfigFromEnv ¶ added in v0.13.2
Configures from environment variables (all timers are in seconds): - CONFIG_SWARM_REALM - CONFIG_SWARM_AGENT - CONFIG_SWARM_POLL_FREQUENCY - CONFIG_SWARM_TIME_TO_FLIGHT - CONFIG_SWARM_NETWORK_TIMEOUT
func WithLogStdErr ¶ added in v0.13.0
Configure broker to log standard errors
Types ¶
type Bag ¶ added in v0.4.0
// Bag is an abstract container for an octet stream. It is used by the
// transport to abstract a message on the wire.
type Bag struct {
// Category is the message category (~ topic).
Category string
// Digest is the unique brief summary of the message.
Digest Digest
// Error is the error on the message processing.
Error error
// IOContext is the I/O context of the message, as obtained from the broker.
IOContext any
// Object is the raw content of the message.
Object []byte
}
Bag is an abstract container for an octet stream. It is used by the transport to abstract a message on the wire.
type Config ¶
// Config collects the configuration options: realm and agent identity,
// quality of service policy, channel capacities, retry policy, error
// channel, polling and timing parameters.
type Config struct {
// Realm is the unique identity of the realm (logical environment or world)
// where the event was created. Useful to support deployment isolation
// (e.g., green/blue, canary) in event-driven systems.
Realm string
// Agent is a direct performer of the event.
// It is a software service that consumes/emits events from/to the stream.
Agent string
// Policy is the quality of service policy.
Policy Policy
// Queue capacities, one per channel (set individually by WithEmitCapacity,
// WithDeadLetterCapacity, WithRecvCapacity and WithAckCapacity).
// CapOut is the capacity of the emit channel.
CapOut int
// CapDlq is the capacity of the dead letter channel.
CapDlq int
// CapRcv is the capacity of the receive channel.
CapRcv int
// CapAck is the capacity of the acknowledge channel.
CapAck int
// Backoff is the retry policy for service calls.
Backoff Retry
// StdErr is the standard error I/O channel.
StdErr chan<- error
// PollerPool is the size of the poller pool in the system.
PollerPool int
// PollFrequency is the frequency of polling the broker API.
PollFrequency time.Duration
// TimeToFlight is the time required by the client to acknowledge the message.
TimeToFlight time.Duration
// NetworkTimeout is the timeout for any network operation.
NetworkTimeout time.Duration
// FailOnUnknownCategory fails fast the message if its category is not
// known to the kernel.
FailOnUnknownCategory bool
}
type Digest ¶ added in v0.13.0
// Digest is a unique brief summary of the message, specific to the broker.
type Digest string
Unique brief summary of the message, specific to the broker
type Event ¶ added in v0.5.0
type Event[M, T any] struct { // Unique brief summary of the message Digest Digest `json:"-"` // Error on the message processing Error error `json:"-"` // I/O Context of the message, as obtained from broker IOContext any `json:"-"` Meta *M `json:"meta,omitempty"` Data *T `json:"data,omitempty"` }
Event defines immutable fact(s) placed into the queueing system. Event resembles the concept of Action as it is defined by schema.org.
> An action performed by a direct agent and indirect participants upon a direct object.
This type supports the development of event-driven solutions that treat data as a collection of immutable facts, which are queried and processed in real time. These applications process a logical log of events; each event defines a change to the current state of an object, i.e. which attributes were inserted, updated or deleted (a kind of diff). The event identifies the object that was changed using a unique identifier.
In contrast with other solutions, the event does not use the envelope approach. Instead, it places meta and data side by side, which keeps the event extensible.
type Meta ¶ added in v0.20.0
// Meta is the default metadata associated with an event.
type Meta struct {
// Sink for the event.
Sink curie.IRI `json:"sink,omitempty"`
// ID is the unique identity of the event.
// It is automatically defined by the library upon the transmission unless
// defined by the sender. Preserving the ID across a sequence of messages
// allows building request/response semantics.
ID string `json:"id,omitempty"`
// Type is the canonical IRI that defines the type of action.
// It is automatically defined by the library upon the transmission unless
// defined by the sender.
Type curie.IRI `json:"type,omitempty"`
// Realm is the unique identity of the realm (logical environment or world)
// where the event was created. Useful to support deployment isolation
// (e.g., green/blue, canary) in event-driven systems.
Realm curie.IRI `json:"realm,omitempty"`
// Agent is the direct performer of the event, a software service that emits
// the action to the stream. It is automatically defined by the library upon
// the transmission unless defined by the sender.
Agent curie.IRI `json:"agent,omitempty"`
// Created is the ISO8601 timestamp of when the action was created.
// It is automatically defined by the library upon the transmission.
Created time.Time `json:"created"`
// Target indicates the target performer of the event, a software service.
// NOTE(review): the original comment ended mid-sentence ("a software
// service that is able to"); confirm the intended wording.
Target curie.IRI `json:"target,omitempty"`
// Participant is an indirect participant, a user who initiated the event.
Participant curie.IRI `json:"participant,omitempty"`
}
The default metadata associated with event.
type Msg ¶
type Msg[T any] struct { // Message category ~ topic Category string // Unique brief summary of the message Digest Digest // Error on the message processing Error error // I/O Context of the message, as obtained from broker IOContext any // Message decoded content Object T }
Msg is a generic envelope type for incoming messages. It contains both the decoded object and the digest used to acknowledge the message.