Documentation
¶
Overview ¶
Package sdq 提供简单高效的延迟队列实现 受 beanstalkd 启发,提供 topic、优先级、bury/kick、TTR 等特性
Index ¶
- Constants
- Variables
- func ValidateTopicName(name string) error
- type Config
- type Inspector
- func (i *Inspector) DeleteJob(id uint64) error
- func (i *Inspector) ForceDeleteJob(id uint64) error
- func (i *Inspector) GetJobBody(ctx context.Context, id uint64) ([]byte, error)
- func (i *Inspector) KickJob(id uint64) error
- func (i *Inspector) ListJobs(ctx context.Context, filter *JobMetaFilter) (*JobMetaList, error)
- func (i *Inspector) ListTopics() []string
- func (i *Inspector) ListTopicsPage(offset, limit int, order SortOrder) ([]string, int)
- func (i *Inspector) StartedAt() time.Time
- func (i *Inspector) Stats() *Stats
- func (i *Inspector) StatsJob(id uint64) (*JobMeta, error)
- func (i *Inspector) StatsTopic(name string) (*TopicStats, error)
- func (i *Inspector) StorageStats(ctx context.Context) (*StorageStats, error)
- func (i *Inspector) TickerStats() *TickerStats
- func (i *Inspector) TopicStats() []*TopicStats
- func (i *Inspector) TopicStatsPage(offset, limit int, order SortOrder) ([]*TopicStats, int)
- func (i *Inspector) WaitingStats() []WaitingStats
- type Job
- type JobMeta
- func (m *JobMeta) Clone() *JobMeta
- func (m *JobMeta) ReserveDeadline() time.Time
- func (m *JobMeta) ShouldBeReady(now time.Time) bool
- func (m *JobMeta) ShouldTimeout(now time.Time) bool
- func (m *JobMeta) TimeUntilReady(now time.Time) time.Duration
- func (m *JobMeta) TimeUntilTimeout(now time.Time) time.Duration
- type JobMetaFilter
- type JobMetaList
- type NewStorageFunc
- type NewTickerFunc
- type Queue
- func (q *Queue) Bury(id uint64, priority uint32) error
- func (q *Queue) Delete(id uint64) error
- func (q *Queue) ForceDelete(id uint64) error
- func (q *Queue) Kick(topic string, bound int) (int, error)
- func (q *Queue) KickJob(id uint64) error
- func (q *Queue) Peek(id uint64) (*Job, error)
- func (q *Queue) PeekBuried(topicName string) (*Job, error)
- func (q *Queue) PeekDelayed(topicName string) (*Job, error)
- func (q *Queue) PeekReady(topicName string) (*Job, error)
- func (q *Queue) Put(topic string, body []byte, priority uint32, delay, ttr time.Duration) (uint64, error)
- func (q *Queue) Release(id uint64, priority uint32, delay time.Duration) error
- func (q *Queue) Reserve(topics []string, timeout time.Duration) (*Job, error)
- func (q *Queue) Start() error
- func (q *Queue) StartWithOptions(opts StartOptions) error
- func (q *Queue) Stop() error
- func (q *Queue) Touch(id uint64, duration ...time.Duration) error
- func (q *Queue) TryReserve(topics []string) *JobMeta
- func (q *Queue) WaitForRecovery(timeout time.Duration) error
- type RecoveryCallback
- type RecoveryPhase
- type RecoveryProgress
- type RecoveryResult
- type SortOrder
- type StartOptions
- type State
- type Stats
- type Storage
- type StorageStats
- type Tickable
- type Ticker
- type TickerStats
- type TopicStats
- type WaitingStats
Constants ¶
const ( // MaxWaitersPerTopic 每个 topic 最大等待 worker 数 // 防止大量 worker 等待导致内存占用过高 MaxWaitersPerTopic = 1000 )
Variables ¶
var ( // ErrNotFound 任务不存在 ErrNotFound = errors.New("sdq: job not found") // ErrNotReserved 任务未被保留 ErrNotReserved = errors.New("sdq: job not reserved") // ErrNotBuried 任务未被埋葬 ErrNotBuried = errors.New("sdq: job not buried") // ErrInvalidState 任务状态无效 ErrInvalidState = errors.New("sdq: invalid job state") // ErrTimeout 操作超时 ErrTimeout = errors.New("sdq: timeout") // ErrInvalidTopic topic 名称无效(空、过长或包含非法字符) ErrInvalidTopic = errors.New("sdq: invalid topic name") // ErrMaxTopicsReached 达到最大 topic 数量 ErrMaxTopicsReached = errors.New("sdq: max topics reached") // ErrMaxJobsReached 达到最大任务数量 ErrMaxJobsReached = errors.New("sdq: max jobs reached") // ErrTouchLimitExceeded Touch 次数超限 ErrTouchLimitExceeded = errors.New("sdq: touch limit exceeded") // ErrInvalidTouchTime Touch 时间无效 ErrInvalidTouchTime = errors.New("sdq: invalid touch time") // ErrTooManyWaiters 等待队列已满 ErrTooManyWaiters = errors.New("sdq: too many waiters") // ErrInvalidTimeout timeout 必须大于 0 ErrInvalidTimeout = errors.New("sdq: timeout must be greater than 0") // ErrQueueStopped Queue 已停止,无法重新启动 ErrQueueStopped = errors.New("sdq: queue already stopped") // ErrInvalidConfig 配置无效 ErrInvalidConfig = errors.New("sdq: invalid config") )
var ( // ErrStorageClosed 存储已关闭 ErrStorageClosed = errors.New("sdq: storage closed") // ErrJobExists 任务已存在 ErrJobExists = errors.New("sdq: job already exists") )
Functions ¶
Types ¶
type Config ¶
type Config struct {
// DefaultTTR 默认 TTR
DefaultTTR time.Duration
// MaxJobSize 最大任务大小(字节)
MaxJobSize int
// Ticker 自定义 Ticker 实例(优先级高于 NewTickerFunc)
Ticker Ticker
// NewTickerFunc Ticker 构造函数(当 Ticker 为 nil 时使用)
NewTickerFunc NewTickerFunc
// MaxTouches 最大 Touch 次数
MaxTouches int
// MaxTouchDuration 最大延长时间
MaxTouchDuration time.Duration
// MinTouchInterval 最小 Touch 间隔
MinTouchInterval time.Duration
// MaxTopics 最大 topic 数(0 表示无限制)
MaxTopics int
// MaxJobsPerTopic 每个 topic 最大任务数(0 表示无限制)
MaxJobsPerTopic int
// EnableTopicCleanup 启用 Topic 惰性清理(定期清理空 Topic)
EnableTopicCleanup bool
// TopicCleanupInterval Topic 清理间隔(默认 1 小时)
TopicCleanupInterval time.Duration
// Storage 存储后端实例(优先级高于 NewStorageFunc)
Storage Storage
// NewStorageFunc Storage 构造函数(当 Storage 为 nil 时使用)
NewStorageFunc NewStorageFunc
// Logger 日志记录器(可选)
// 如果为 nil,将使用 slog.Default()
Logger *slog.Logger
}
Config Queue 配置
type Inspector ¶
type Inspector struct {
// contains filtered or unexported fields
}
Inspector 提供队列统计和监控功能
func (*Inspector) ForceDeleteJob ¶
ForceDeleteJob 强制删除指定的任务(支持任何状态:ready/delayed/reserved/buried)
func (*Inspector) GetJobBody ¶
GetJobBody 获取任务 Body 内容
func (*Inspector) ListJobs ¶
func (i *Inspector) ListJobs(ctx context.Context, filter *JobMetaFilter) (*JobMetaList, error)
ListJobs 查询任务元数据列表(支持过滤和分页) 这是一个底层方法,用于实现各种查询场景
func (*Inspector) ListTopics ¶
ListTopics 返回所有 topic 列表(已排序) 注意:当 topic 数量较多时,建议使用 ListTopicsPage 进行分页查询
func (*Inspector) ListTopicsPage ¶
ListTopicsPage 分页返回 topic 列表(已排序) offset: 起始位置(从 0 开始) limit: 返回数量 order: 排序方向(SortAsc 或 SortDesc) 返回: (topic 名称列表, 总数)
func (*Inspector) StatsTopic ¶
func (i *Inspector) StatsTopic(name string) (*TopicStats, error)
StatsTopic 返回 topic 统计信息
func (*Inspector) StorageStats ¶
func (i *Inspector) StorageStats(ctx context.Context) (*StorageStats, error)
StorageStats 返回存储统计信息
func (*Inspector) TickerStats ¶
func (i *Inspector) TickerStats() *TickerStats
TickerStats 返回定时器统计信息
func (*Inspector) TopicStats ¶
func (i *Inspector) TopicStats() []*TopicStats
TopicStats 返回所有 Topic 的统计信息(已排序) 注意:当 topic 数量较多时,建议使用 TopicStatsPage 进行分页查询
func (*Inspector) TopicStatsPage ¶
func (i *Inspector) TopicStatsPage(offset, limit int, order SortOrder) ([]*TopicStats, int)
TopicStatsPage 分页返回 Topic 的统计信息(已排序) offset: 起始位置(从 0 开始) limit: 返回数量 order: 排序方向(SortAsc 或 SortDesc) 返回: (统计信息列表, 总数)
func (*Inspector) WaitingStats ¶
func (i *Inspector) WaitingStats() []WaitingStats
WaitingStats 返回所有 topics 的等待队列统计
type Job ¶
type Job struct {
Meta *JobMeta // 元数据
// contains filtered or unexported fields
}
Job 完整任务 包含元数据和 Body Reserve 时才组装完整的 Job 返回给 Worker
func NewJobWithStorage ¶
NewJobWithStorage 创建任务(延迟加载模式) 注意:meta 应该已经是克隆的副本(由 TryReserve 返回) 此函数直接使用传入的 meta,不再重复克隆
type JobMeta ¶
type JobMeta struct {
// === 基本信息 ===
ID uint64 // 任务 ID
Topic string // 所属 topic
Priority uint32 // 优先级(数字越小优先级越高)
State State // 当前状态
// === 时间参数 ===
Delay time.Duration // 延迟时间(创建时指定)
TTR time.Duration // Time To Run - 执行超时时间
// === 时间戳 ===
CreatedAt time.Time // 创建时间
ReadyAt time.Time // 就绪时间(延迟任务的到期时间)
ReservedAt time.Time // 被保留时间
LastTouchAt time.Time // 最后一次 Touch 的时间
BuriedAt time.Time // 被埋葬时间
DeletedAt time.Time // 删除时间(软删除时使用)
// === 统计信息 ===
Reserves int // 被保留次数
Timeouts int // 超时次数
Releases int // 被释放次数
Buries int // 被埋葬次数
Kicks int // 被踢出次数
Touches int // Touch 次数(TTR 延长)
TotalTouchTime time.Duration // 累计延长的总时间(用于 MaxTouchDuration 检查)
}
JobMeta 任务元数据 轻量级结构,常驻内存用于调度 大小约 200 字节
func NewJobMeta ¶
NewJobMeta 创建新的任务元数据
func (*JobMeta) ReserveDeadline ¶
ReserveDeadline 返回保留任务的截止时间 只对 StateReserved 状态有效
func (*JobMeta) ShouldBeReady ¶
ShouldBeReady 判断延迟任务是否应该转为 Ready 状态 只对 StateDelayed 状态有效
func (*JobMeta) ShouldTimeout ¶
ShouldTimeout 判断保留任务是否应该超时转回 Ready 状态 只对 StateReserved 状态有效
func (*JobMeta) TimeUntilReady ¶
TimeUntilReady 返回延迟任务距离就绪的时间 只对 StateDelayed 状态有效
type JobMetaFilter ¶
type JobMetaFilter struct {
Topic string // 按 topic 过滤,空表示不过滤
State *State // 按状态过滤,nil 表示不过滤
Limit int // 返回数量限制,0 表示无限制
Offset int // 偏移量,用于分页
Cursor uint64 // 游标(任务 ID),用于游标分页,0 表示从头开始
}
JobMetaFilter 任务元数据过滤条件
type JobMetaList ¶
type JobMetaList struct {
Metas []*JobMeta // 任务元数据列表
Total int // 总数(可选,某些实现可能不支持)
HasMore bool // 是否还有更多数据
NextCursor uint64 // 下一页游标(最后一个任务的 ID)
}
JobMetaList 任务元数据列表结果
type NewStorageFunc ¶
NewStorageFunc Storage 构造函数
type NewTickerFunc ¶
NewTickerFunc Ticker 构造函数
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue 延迟队列 架构设计: - 每个 Topic 独立管理自己的 Ready/Delayed/Reserved/Buried 队列 - 只存储 JobMeta(轻量级),Body 按需从 Storage 加载 - Wheel Tick 负责定时通知 Topic 处理到期任务 - Storage 负责持久化
func (*Queue) PeekBuried ¶
PeekBuried 查看指定 topic 的下一个埋葬任务
func (*Queue) PeekDelayed ¶
PeekDelayed 查看指定 topic 的下一个将要就绪的延迟任务
func (*Queue) Put ¶
func (q *Queue) Put(topic string, body []byte, priority uint32, delay, ttr time.Duration) (uint64, error)
Put 添加任务到队列 topic: topic 名称,不能为空 body: 任务数据 priority: 优先级(数字越小优先级越高) delay: 延迟时间 ttr: 执行超时时间,0 使用默认值
func (*Queue) StartWithOptions ¶
func (q *Queue) StartWithOptions(opts StartOptions) error
StartWithOptions 使用选项启动 Queue 此方法是幂等的,多次调用只会启动一次
type RecoveryCallback ¶
type RecoveryCallback func(progress *RecoveryProgress)
RecoveryCallback 恢复进度回调函数 如果提供了回调,恢复过程中会调用它报告进度
type RecoveryPhase ¶
type RecoveryPhase int
RecoveryPhase 恢复阶段
const ( RecoveryPhaseStart RecoveryPhase = iota // 开始恢复 RecoveryPhaseRecovering // 恢复中 RecoveryPhaseComplete // 完成 RecoveryPhaseError // 错误 )
type RecoveryProgress ¶
type RecoveryProgress struct {
Phase RecoveryPhase // 当前阶段
Result *RecoveryResult // 恢复结果(仅在 Complete 阶段有值)
TotalJobs int // 总任务数
LoadedJobs int // 已加载任务数
FailedJobs int // 失败任务数
Error error // 错误信息(仅在 Error 阶段有值)
}
RecoveryProgress 恢复进度
type RecoveryResult ¶
type RecoveryResult struct {
MaxID uint64 // 最大任务 ID
TopicJobs map[string][]*JobMeta // topic -> jobs
TotalJobs int // 总任务数
FailedJobs int // 失败的任务数
}
RecoveryResult 恢复结果
type StartOptions ¶
type StartOptions struct {
// RecoveryCallback 恢复进度回调
// 如果提供了回调,会在恢复过程中调用它报告进度(Start, Recovering 阶段)
RecoveryCallback RecoveryCallback
}
Start 启动 Queue 如果配置了 Storage,会从 Storage 恢复任务 StartOptions 启动选项
type State ¶
type State int
State 任务状态
const ( // StateEnqueued 已入队,等待加载到内存 // 临时状态,任务刚创建时的初始状态 StateEnqueued State = iota // StateReady 就绪,等待 Reserve // 任务在 Topic.ReadyHeap 中,等待 Worker 拉取 StateReady // StateDelayed 延迟中,等待到期 // 任务在 Topic.DelayedHeap 中,到期后转为 Ready StateDelayed // StateReserved 已保留,Worker 处理中 // 任务在 Topic.ReservedMap 中,受 TTR 保护 StateReserved // StateBuried 已埋葬,暂时搁置 // 任务在 Topic.BuriedHeap 中,需要 Kick 才能恢复 StateBuried )
type Stats ¶
type Stats struct {
// 队列状态
TotalJobs int // 总任务数
ReadyJobs int // 就绪任务数
DelayedJobs int // 延迟任务数
ReservedJobs int // 保留任务数
BuriedJobs int // 埋葬任务数
Topics int // topic 数量
// 操作计数
Puts uint64 // Put 操作次数
Reserves uint64 // Reserve 操作次数
Deletes uint64 // Delete 操作次数
Releases uint64 // Release 操作次数
Buries uint64 // Bury 操作次数
Kicks uint64 // Kick 操作次数
Timeouts uint64 // 超时次数
Touches uint64 // Touch 操作次数
}
Stats 队列统计信息
type Storage ¶
type Storage interface {
// Name 返回存储名称
Name() string
// SaveJob 保存完整任务(元数据 + Body)
// 只在 Put 时调用,同时保存 meta 和 body
// Body 不可变,一旦保存就不会修改
// 如果任务已存在则返回 ErrJobExists
SaveJob(ctx context.Context, meta *JobMeta, body []byte) error
// UpdateJobMeta 更新任务元数据
// 只更新元数据(状态、统计等),不涉及 Body
// 如果任务不存在则返回 ErrNotFound
UpdateJobMeta(ctx context.Context, meta *JobMeta) error
// GetJobMeta 获取任务元数据
// 如果任务不存在则返回 ErrNotFound
GetJobMeta(ctx context.Context, id uint64) (*JobMeta, error)
// ScanJobMeta 扫描任务元数据
// 支持过滤、分页和游标
// filter 为 nil 时返回所有任务元数据(用于启动恢复)
// 启动恢复时:ScanJobMeta(ctx, nil) 加载所有元数据,不加载 Body
// 性能关键:100 万任务时只需 200MB 内存而不是 10GB
ScanJobMeta(ctx context.Context, filter *JobMetaFilter) (*JobMetaList, error)
// GetJobBody 获取任务 Body
// 如果任务不存在则返回 ErrNotFound
// Reserve 时才调用,按需加载
GetJobBody(ctx context.Context, id uint64) ([]byte, error)
// DeleteJob 删除任务(元数据 + Body)
// 如果任务不存在则返回 ErrNotFound
DeleteJob(ctx context.Context, id uint64) error
// CountJobs 统计任务数量
// filter 为 nil 时统计所有任务
CountJobs(ctx context.Context, filter *JobMetaFilter) (int, error)
// GetMaxJobID 获取最大任务 ID
// 用于快速启动时初始化 ID 生成器,避免扫描所有任务
// 如果没有任务则返回 0
GetMaxJobID(ctx context.Context) (uint64, error)
// Stats 返回存储统计信息
Stats(ctx context.Context) (*StorageStats, error)
// Close 关闭存储
Close() error
}
Storage 持久化存储接口 设计原则:元数据与 Body 分离存储 - 元数据:轻量级(~200B),常驻内存,用于调度,会更新 - Body:可能很大(KB~MB),按需加载,不可变
type StorageStats ¶
type StorageStats struct {
Name string // 存储名称
TotalJobs int64 // 总任务数
TotalTopics int // 总 topic 数
MetaSize int64 // 元数据存储大小(字节)
BodySize int64 // Body 存储大小(字节)
TotalSize int64 // 总存储大小(字节)
LastSaveTime int64 // 最后保存时间(Unix 时间戳)
LastLoadTime int64 // 最后加载时间(Unix 时间戳)
AvgMetaSize int64 // 平均元数据大小(字节)
AvgBodySize int64 // 平均 Body 大小(字节)
LoadedMetaSize int64 // 已加载元数据大小(字节)
LoadedBodySize int64 // 已加载 Body 大小(字节)
}
StorageStats 存储统计信息
type Tickable ¶
type Tickable interface {
// ProcessTick 处理 tick 通知
ProcessTick(now time.Time)
// NextTickTime 返回下一个需要 tick 的时间
// 返回 zero time 表示不需要 tick
NextTickTime() time.Time
// NeedsTick 是否需要 tick
NeedsTick() bool
}
Tickable 可被 tick 的对象接口 Topic 实现此接口以接收 tick 通知
type Ticker ¶
type Ticker interface {
// Name 返回定时器名称
Name() string
// Start 启动定时器
Start()
// Stop 停止定时器
Stop()
// Register 注册需要 tick 的对象
Register(name string, tickable Tickable)
// Unregister 取消注册
Unregister(name string)
// Wakeup 唤醒定时器(新任务加入时)
Wakeup()
// Stats 返回统计信息
Stats() *TickerStats
}
Ticker 定时器接口 负责定时触发已注册对象的 ProcessTick 方法
type TickerStats ¶
type TickerStats struct {
Name string // 定时器名称
RegisteredCount int // 注册的对象数量
NextTickTime time.Time // 下一个 tick 时间
TimeUntilTick time.Duration // 距离下一个 tick 的时间
}
TickerStats 定时器统计信息
type TopicStats ¶
type TopicStats struct {
Name string
ReadyJobs int
DelayedJobs int
ReservedJobs int
BuriedJobs int
TotalJobs int
}
TopicStats Topic 统计信息
type WaitingStats ¶
WaitingStats 等待队列统计信息
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
memorydynsleep
command
|
|
|
memorytimewheel
command
|
|
|
sqlitedynsleep
command
|
|
|
sqlitetimewheel
command
|
|
|
stability
command
stability 是一个带 Inspector 监控的稳定性测试程序
|
stability 是一个带 Inspector 监控的稳定性测试程序 |
|
webui
command
Package main 演示如何使用 webui 包提供的 HTTP API 和嵌入式前端界面
|
Package main 演示如何使用 webui 包提供的 HTTP API 和嵌入式前端界面 |
|
x
|
|
|
dynsleep
Package dynsleep provides a dynamic sleep ticker implementation.
|
Package dynsleep provides a dynamic sleep ticker implementation. |
|
memory
Package memory provides an in-memory storage implementation.
|
Package memory provides an in-memory storage implementation. |
|
metrics
Package metrics 提供 Prometheus metrics 支持
|
Package metrics 提供 Prometheus metrics 支持 |
|
sqlite
Package sqlite provides a SQLite storage implementation.
|
Package sqlite provides a SQLite storage implementation. |
|
timewheel
Package timewheel provides a time wheel ticker implementation.
|
Package timewheel provides a time wheel ticker implementation. |
|
webui
Package inspector 提供任务查询能力,方便实时 UI 展示
|
Package inspector 提供任务查询能力,方便实时 UI 展示 |