Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	BootstrapServers     string // Comma-separated list of brokers: "broker1:9092,broker2:9092"
	RequiredAcks         int    // e.g. 1 or kafka.RequireAll (see kafka-go docs)
	MaxRetries           int    // Maximum number of retry attempts
	MessageChannelBuffer int    // Size of the message channel buffer
	WorkerPoolSize       int    // Number of worker goroutines to process messages
	BatchSize            int    // Maximum number of messages to batch before sending
	BatchTimeoutMs       int    // Maximum time, in milliseconds, to wait before sending a batch
}
Config holds configuration options for the Kafka producer.
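As an illustration, a producer configuration might be populated as in the sketch below. The Config type is redeclared locally so the example compiles on its own, the values are illustrative rather than recommended defaults, and batchTimeout is a hypothetical helper that is not part of this package. It assumes, from the field name, that BatchTimeoutMs is expressed in milliseconds.

```go
package main

import "time"

// Config mirrors the documented struct. It is redeclared here only so the
// sketch compiles without importing the package.
type Config struct {
	BootstrapServers     string
	RequiredAcks         int
	MaxRetries           int
	MessageChannelBuffer int
	WorkerPoolSize       int
	BatchSize            int
	BatchTimeoutMs       int
}

// exampleConfig returns illustrative values, not recommended defaults.
func exampleConfig() Config {
	return Config{
		BootstrapServers:     "broker1:9092,broker2:9092",
		RequiredAcks:         1, // wait for the partition leader only
		MaxRetries:           3,
		MessageChannelBuffer: 1024,
		WorkerPoolSize:       4,
		BatchSize:            100,
		BatchTimeoutMs:       250,
	}
}

// batchTimeout is a hypothetical helper that converts the millisecond field
// into a time.Duration (assumption: the field is in milliseconds).
func batchTimeout(cfg Config) time.Duration {
	return time.Duration(cfg.BatchTimeoutMs) * time.Millisecond
}
```

Keeping the timeout as a plain integer makes the struct easy to fill from environment variables or flags; converting it once at construction time avoids repeating the unit arithmetic elsewhere.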
type Consumer ¶
type Consumer interface {
	// Start begins the consumption loop. It reads messages from the configured topics,
	// applies the provided handler (with built-in retry logic), and commits offsets (if using manual commits).
	// This method blocks until the context is cancelled or an unrecoverable error occurs.
	Start(ctx context.Context, handler func(ctx context.Context, msg *kafka.Message) error) error
	// Close gracefully shuts down the consumer.
	Close() error
}
Consumer defines the interface for consuming Kafka messages.
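The retry logic that Start applies around the handler is not shown on this page. One way such a loop could be structured is sketched below. Message is a stand-in for kafka.Message, and processWithRetry, Handler, and demoRetry are hypothetical names introduced only for this example; the real implementation may differ (for instance, it may add backoff between attempts).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a stand-in for kafka.Message so the sketch has no external
// dependencies.
type Message struct {
	Key, Value []byte
}

// Handler has the same shape as the handler passed to Consumer.Start.
type Handler func(ctx context.Context, msg *Message) error

// processWithRetry (hypothetical) calls handler until it succeeds, the
// context is cancelled, or maxAttempts is reached. It returns the number of
// attempts made and the final error, if any.
func processWithRetry(ctx context.Context, msg *Message, handler Handler, maxAttempts int) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // always try at least once
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err := ctx.Err(); err != nil {
			return attempt - 1, err
		}
		if lastErr = handler(ctx, msg); lastErr == nil {
			return attempt, nil
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

// demoRetry runs a handler that fails twice before succeeding.
func demoRetry(maxAttempts int) (int, error) {
	calls := 0
	flaky := func(ctx context.Context, msg *Message) error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	}
	return processWithRetry(context.Background(), &Message{Value: []byte("hello")}, flaky, maxAttempts)
}
```

With this sketch, demoRetry(5) succeeds on the third attempt, while demoRetry(2) exhausts its attempts and returns a wrapped error. A consumer configured with a DeadLetterTopic would presumably forward the message there at that point.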
func NewKafkaConsumer ¶
func NewKafkaConsumer(logger zerolog.Logger, cfg ConsumerConfig, producer Producer, metrics metrics.Metrics) (Consumer, error)
NewKafkaConsumer creates and configures a new Kafka consumer. It splits the comma-separated BootstrapServers into a slice of broker addresses.
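The splitting step can be sketched as follows. splitBrokers is a hypothetical helper, not a function exported by this package; trimming whitespace and dropping empty entries are assumptions about reasonable behaviour rather than something this page confirms.

```go
package main

import "strings"

// splitBrokers (hypothetical) turns a comma-separated broker list into a
// slice of addresses, trimming surrounding whitespace and skipping empty
// entries such as those left by a trailing comma.
func splitBrokers(bootstrapServers string) []string {
	var brokers []string
	for _, part := range strings.Split(bootstrapServers, ",") {
		if addr := strings.TrimSpace(part); addr != "" {
			brokers = append(brokers, addr)
		}
	}
	return brokers
}
```

For example, splitBrokers("broker1:9092, broker2:9092,") yields the two addresses "broker1:9092" and "broker2:9092".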
type ConsumerConfig ¶
type ConsumerConfig struct {
	BootstrapServers      string        // Comma-separated list of brokers: "broker1:9092,broker2:9092"
	GroupID               string        // Consumer group identifier
	Topic                 string        // Topic to subscribe to
	CommitInterval        time.Duration // If > 0, automatic commits occur at this interval; if 0, manual commits are required.
	MaxProcessingRetries  int           // Number of attempts to process a message before giving up
	DeadLetterTopic       string        // Topic name for the Dead Letter Queue (DLQ)
	ConsumerChannelBuffer int           // Size of the message channel buffer
	ConsumerWorkerPool    int           // Number of worker goroutines to process messages
}
ConsumerConfig holds configuration options for the Kafka consumer.
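The CommitInterval field selects between automatic and manual offset commits. A minimal sketch of that decision is shown below, using a trimmed local copy of the struct. usesManualCommits is a hypothetical helper, and treating negative intervals as manual is an assumption, since the field comment only covers zero and positive values.

```go
package main

import "time"

// ConsumerConfig is a trimmed local copy containing only the fields this
// sketch needs.
type ConsumerConfig struct {
	GroupID        string
	Topic          string
	CommitInterval time.Duration
}

// usesManualCommits (hypothetical) reports whether offsets must be committed
// explicitly. Negative intervals are treated like zero, which is an
// assumption of this sketch.
func usesManualCommits(cfg ConsumerConfig) bool {
	return cfg.CommitInterval <= 0
}
```

Leaving CommitInterval at its zero value therefore opts into manual commits, which is the safer default when a message must not be acknowledged before its handler has succeeded.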
type KafkaWriter ¶
type KafkaWriter interface {
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
	Close() error
}
KafkaWriter is an interface for writing messages to Kafka. It is used to abstract the kafka.Writer dependency for testing.
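To show why the abstraction helps in tests, here is a sketch of an in-memory test double. Message stands in for kafka.Message and Writer mirrors KafkaWriter with that stand-in type; fakeWriter and writeAndCount are hypothetical names used only in this example.

```go
package main

import (
	"context"
	"errors"
	"sync"
)

// Message is a stand-in for kafka.Message.
type Message struct {
	Topic      string
	Key, Value []byte
}

// Writer mirrors KafkaWriter, using the stand-in Message type.
type Writer interface {
	WriteMessages(ctx context.Context, msgs ...Message) error
	Close() error
}

// fakeWriter (hypothetical) records messages in memory instead of sending
// them to a broker.
type fakeWriter struct {
	mu      sync.Mutex
	written []Message
	closed  bool
}

func (w *fakeWriter) WriteMessages(ctx context.Context, msgs ...Message) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return errors.New("writer is closed")
	}
	w.written = append(w.written, msgs...)
	return nil
}

func (w *fakeWriter) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.closed = true
	return nil
}

// Compile-time check that fakeWriter satisfies the interface.
var _ Writer = (*fakeWriter)(nil)

// writeAndCount sends n messages through the interface and reports how many
// the fake recorded.
func writeAndCount(n int) (int, error) {
	fake := &fakeWriter{}
	var w Writer = fake
	for i := 0; i < n; i++ {
		msg := Message{Topic: "events", Value: []byte{byte(i)}}
		if err := w.WriteMessages(context.Background(), msg); err != nil {
			return len(fake.written), err
		}
	}
	return len(fake.written), w.Close()
}
```

A producer constructed with such a fake can be exercised without a running broker, and the recorded slice lets a test assert on exactly what would have been sent.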
type Producer ¶
type Producer interface {
	// Produce sends a message to the given topic. If sending fails, a retry is scheduled.
	// The returned channel receives the final result: nil if the message was produced
	// successfully, or a non-nil error if it could not be produced after all retries.
	Produce(ctx context.Context, topic string, key []byte, value []byte) <-chan error
	// Close gracefully shuts down the producer.
	// The context is used to enforce a timeout for the operation.
	Close(ctx context.Context) error
}
Producer is the interface for sending Kafka messages.
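Because Produce reports its outcome on a channel, callers usually want to wait for that result while still honouring context cancellation. The sketch below shows one way to do that. The Producer interface is redeclared locally, and stubProducer, produceAndWait, and demoProduce are hypothetical names, not part of this package.

```go
package main

import (
	"context"
	"errors"
)

// Producer mirrors the documented interface so the sketch is self-contained.
type Producer interface {
	Produce(ctx context.Context, topic string, key []byte, value []byte) <-chan error
	Close(ctx context.Context) error
}

// stubProducer (hypothetical) immediately delivers a preset result on a
// buffered channel instead of talking to Kafka.
type stubProducer struct{ result error }

func (p stubProducer) Produce(ctx context.Context, topic string, key []byte, value []byte) <-chan error {
	ch := make(chan error, 1)
	ch <- p.result
	return ch
}

func (p stubProducer) Close(ctx context.Context) error { return nil }

// produceAndWait (hypothetical) blocks until the delivery result arrives or
// the context is cancelled, whichever happens first.
func produceAndWait(ctx context.Context, p Producer, topic string, key, value []byte) error {
	select {
	case err := <-p.Produce(ctx, topic, key, value):
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoProduce exercises both the success and the failure path of the stub.
func demoProduce(fail bool) error {
	var result error
	if fail {
		result = errors.New("retries exhausted")
	}
	p := stubProducer{result: result}
	return produceAndWait(context.Background(), p, "events", []byte("k"), []byte("v"))
}
```

Callers that do not need delivery confirmation can simply discard the returned channel, at the cost of not learning about messages that failed after all retries.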
type Reader ¶
type Reader interface {
	// ReadMessage reads the next message from the topic.
	// This method blocks until a message is available or the context is cancelled.
	// If the context is cancelled, an error is returned.
	ReadMessage(ctx context.Context) (kafka.Message, error)
	// CommitMessages commits the provided messages.
	// This method is used for manual commits.
	// If the context is cancelled, an error is returned.
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
	// Close gracefully shuts down the reader.
	// This method is used to close the reader when the consumer is shut down.
	Close() error
}
Reader is an interface for reading messages from Kafka. It is used to abstract the kafka.Reader dependency for testing.
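A read-then-commit loop against this interface can be tested with an in-memory reader, as in the sketch below. Message stands in for kafka.Message, Reader mirrors the documented interface with that stand-in type, and sliceReader, drain, and demoDrain are hypothetical. Signalling exhaustion with io.EOF is a choice made for this fake.

```go
package main

import (
	"context"
	"errors"
	"io"
)

// Message is a stand-in for kafka.Message.
type Message struct {
	Offset int64
	Value  []byte
}

// Reader mirrors the documented interface, using the stand-in Message type.
type Reader interface {
	ReadMessage(ctx context.Context) (Message, error)
	CommitMessages(ctx context.Context, msgs ...Message) error
	Close() error
}

// sliceReader (hypothetical) serves messages from memory and records the
// offsets that were committed.
type sliceReader struct {
	pending   []Message
	committed []int64
}

func (r *sliceReader) ReadMessage(ctx context.Context) (Message, error) {
	if err := ctx.Err(); err != nil {
		return Message{}, err
	}
	if len(r.pending) == 0 {
		return Message{}, io.EOF // this fake signals exhaustion with io.EOF
	}
	msg := r.pending[0]
	r.pending = r.pending[1:]
	return msg, nil
}

func (r *sliceReader) CommitMessages(ctx context.Context, msgs ...Message) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	for _, m := range msgs {
		r.committed = append(r.committed, m.Offset)
	}
	return nil
}

func (r *sliceReader) Close() error { return nil }

// drain reads until io.EOF, committing each message manually after it has
// been read. It returns the number of messages committed.
func drain(ctx context.Context, r Reader) (int, error) {
	n := 0
	for {
		msg, err := r.ReadMessage(ctx)
		if errors.Is(err, io.EOF) {
			return n, nil
		}
		if err != nil {
			return n, err
		}
		if err := r.CommitMessages(ctx, msg); err != nil {
			return n, err
		}
		n++
	}
}

// demoDrain runs drain over three in-memory messages.
func demoDrain() (int, error) {
	r := &sliceReader{pending: []Message{{Offset: 0}, {Offset: 1}, {Offset: 2}}}
	defer r.Close()
	return drain(context.Background(), r)
}
```

In a real consumer the handler would run between ReadMessage and CommitMessages, so that an offset is only committed once its message has been processed.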