Documentation
¶
Overview ¶
Example (ProduceAndConsume) ¶
cc := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
queue, err := rsmq.New(rsmq.Options{
Client: cc,
Topic: "example",
ConsumeOpts: rsmq.ConsumeOpts{
ConsumerGroup: "task_group",
AutoCreateGroup: true,
MaxConcurrency: 1,
},
})
if err != nil {
log.Fatalf("Failed to create queue: %v", err)
}
defer queue.Close()
// Produce tasks
for i := 0; i < 10; i++ {
task := &rsmq.Message{
Payload: json.RawMessage(fmt.Sprintf(`{"message": "Hello %d"}`, i)),
}
err := queue.Add(context.Background(), task)
if err != nil {
log.Printf("Failed to enqueue task: %v", err)
}
}
// Consume tasks
go func() {
err := queue.Consume(
context.Background(),
func(ctx context.Context, task *rsmq.Message) error {
var payload map[string]interface{}
_ = json.Unmarshal(task.Payload, &payload)
fmt.Printf("Processing task, payload: %v\n", payload)
return nil
},
)
if err != nil {
log.Fatalf("Error consuming tasks: %v", err)
}
}()
time.Sleep(time.Second)
Output: Processing task, payload: map[message:Hello 0] Processing task, payload: map[message:Hello 1] Processing task, payload: map[message:Hello 2] Processing task, payload: map[message:Hello 3] Processing task, payload: map[message:Hello 4] Processing task, payload: map[message:Hello 5] Processing task, payload: map[message:Hello 6] Processing task, payload: map[message:Hello 7] Processing task, payload: map[message:Hello 8] Processing task, payload: map[message:Hello 9]
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( // MessagingRsmqSystem is the messaging system for rsmq MessagingRsmqSystem = attribute.Key("messaging.system").String("rsmq") // MessagingRsmqMessageTopic is the messaging topic for rsmq MessagingRsmqMessageTopic = attribute.Key("messaging.rsmq.message.topic") // MessagingRsmqMessageGroup is the messaging group for rsmq MessagingRsmqMessageGroup = attribute.Key("messaging.rsmq.message.group") // MessagingRsmqMessageID is the messaging ID for rsmq MessagingRsmqMessageTag = attribute.Key("messaging.rsmq.message.tag") // MessagingRsmqMessageDeliveryTimestamp is the messaging delivery timestamp for rsmq MessagingRsmqMessageDeliveryTimestamp = attribute.Key("messaging.rsmq.message.delivery_timestamp") )
Functions ¶
Types ¶
type BatchMessageHandler ¶
BatchMessageHandler is a function that processes a batch of messages and returns a list of errors
type ConsumeOpts ¶
type ConsumeOpts struct {
// ConsumerGroup is the name of the consumer group
// Must be set if consuming messages
ConsumerGroup string
// ConsumerID is the unique identifier for the consumer
// Default is generated based on hostname and process ID
ConsumerID string
// BatchSize is the number of messages to consume in a single batch
// If set, the consumer will consume messages in batches
BatchSize int64
// MaxBlockDuration is the maximum time to block while waiting for messages
// If set, the consumer will block for the specified duration
MaxBlockDuration time.Duration
// AutoCreateGroup determines whether the consumer group should be created automatically
// If set, the consumer group will be created if it does not exist
AutoCreateGroup bool
// MaxConcurrency is the maximum number of messages to process concurrently
// If set, the messages will be processed concurrently up to the limit
MaxConcurrency uint32
// ConsumerIdleTimeout is the maximum time a consumer can be idle before being removed
// If set, the idle consumers will be removed periodically
ConsumerIdleTimeout time.Duration
// MaxRetryLimit is the maximum number of times a message can be retried
// If set, the message will be re-queued with an exponential backoff
MaxRetryLimit uint32
// RetryTimeWait is the time to wait before retrying a message
// The time to wait is calculated as 2^retryCount * RetryTimeWait
RetryTimeWait time.Duration
// PendingTimeout is the time to wait before a pending message is re-queued
// If set, the pending messages will be re-queued after the timeout
PendingTimeout time.Duration
// IdleConsumerCleanInterval is the interval to clean idle consumers
// If set, the idle consumers will be removed periodically
IdleConsumerCleanInterval time.Duration
// RateLimit is the maximum number of messages to consume per second
// If set, the rate limiter will be used to limit the number of messages consumed
RateLimit int
// SubExpression is the sub expression to filter messages, default is "*"
// e.g. "tag1||tag2||tag3"
SubExpression string
// CloseTimeout is the timeout to wait for the consumer to close
// Default is 5 seconds
CloseTimeout time.Duration
}
ConsumeOpts represents options for consuming messages
type MessageHandler ¶
MessageHandler is a function that processes a message and returns a result
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
MessageQueue manages message production and consumption
func (*MessageQueue) Add ¶
func (mq *MessageQueue) Add(ctx context.Context, message *Message) error
Add adds a new message to the queue
func (*MessageQueue) BatchConsume ¶
func (mq *MessageQueue) BatchConsume(ctx context.Context, handler BatchMessageHandler) error
BatchConsume starts consuming messages from the queue in batches
func (*MessageQueue) Consume ¶
func (mq *MessageQueue) Consume(ctx context.Context, handler MessageHandler) error
Consume starts consuming messages from the queue
type Options ¶
type Options struct {
// Client is the Redis client
// Must be set
Client redis.Cmdable
// Topic is the topic name of the message
// Must be set
Topic string
// RetentionOpts represents options for retention policy
RetentionOpts RetentionOpts
// TracerProvider is the OpenTelemetry tracer provider
TracerProvider trace.TracerProvider
// ConsumeOpts represents options for consuming messages
ConsumeOpts ConsumeOpts
}
type RetentionOpts ¶
type RetentionOpts struct {
// MaxLen is the maximum length of the stream
// Default is 20,000,000
MaxLen int64
// MaxRetentionTime is the maximum retention time of the stream
// Default is 168 hours
MaxRetentionTime time.Duration
// CheckRetentionInterval is the interval to check retention time
// Default is 5 minutes
CheckRetentionInterval time.Duration
}
RetentionOpts represents options for retention policy
Click to show internal directories.
Click to hide internal directories.
