sdq

package module
v0.0.0-...-bda256d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2025 License: MIT Imports: 10 Imported by: 0

README

SDQ - Simple Delay Queue

Go Reference Go Report Card

简洁高效的延迟队列,基于 Go 实现,受 beanstalkd 启发。

特性

  • 多 Topic 隔离 - 支持多个命名队列
  • 优先级调度 - 数字越小优先级越高
  • 延迟执行 - 支持任务延迟执行和指定时间执行
  • TTR 保护 - Time-To-Run 机制防止任务丢失
  • 任务管理 - 支持 Bury/Kick/Release/Touch 操作
  • 持久化 - 可选的 Memory/SQLite 存储
  • 高性能 - 批量写入优化,内存队列调度
  • Pull 模式 - Worker 主动拉取,支持多 worker 并发
  • 类型安全 - 高级 Task API 提供编译时类型检查

安装

go get go-slim.dev/sdq

快速开始

基础用法
package main

import (
    "time"
    "go-slim.dev/sdq"
    "go-slim.dev/sdq/x/memory"
)

func main() {
    // 创建队列
    config := sdq.DefaultConfig()
    config.Storage = memory.New()

    q, _ := sdq.New(config)
    _ = q.Start()
    defer q.Stop()

    // 发布任务
    _, _ = q.Put("email", []byte("send email to [email protected]"),
        10,              // priority
        0,               // delay
        60*time.Second,  // TTR
    )

    // 消费任务
    job, _ := q.Reserve([]string{"email"}, 5*time.Second)
    println(string(job.Body()))
    _ = job.Delete()
}
Task API(推荐)
package main

import (
    "context"
    "time"
    "go-slim.dev/sdq"
    "go-slim.dev/sdq/x/memory"
    "go-slim.dev/sdq/x/task"
)

type EmailTask struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
}

var sendEmail = task.Register("send-email", &task.Config{
    Handler: func(ctx context.Context, data EmailTask) error {
        println("Sending email to:", data.To)
        return nil
    },
    TTR: 60 * time.Second,
})

func main() {
    config := sdq.DefaultConfig()
    config.Storage = memory.New()
    q, _ := sdq.New(config)
    _ = q.Start()
    defer q.Stop()

    sendEmail.SetQueue(q)

    // 发布任务
    _ = sendEmail.Publish(context.Background(), EmailTask{
        To:      "[email protected]",
        Subject: "Hello",
    })

    // 启动 Worker
    worker := task.NewWorker(q)
    _ = worker.Start(3)
    defer worker.Stop()
}

核心概念

Job 状态流转
Put → [Delayed] → Ready → Reserve → Reserved → Delete
                    ↑                    ↓
                    ←── Release/Timeout ──
                                         ↓
                                       Bury → Kick →
操作
操作 说明
Put 发布任务
Reserve 获取任务(阻塞等待)
Delete 删除任务
Release 释放任务(重新入队)
Bury 埋葬任务(暂时搁置)
Kick 唤醒埋葬的任务
Touch 延长 TTR

配置

config := sdq.Config{
    DefaultTTR:   60 * time.Second,  // 默认超时时间
    MaxJobSize:   64 * 1024,         // 最大任务大小
    MaxTouches:   10,                // Touch 最大次数
}
Storage
import (
    "go-slim.dev/sdq/x/memory"
    "go-slim.dev/sdq/x/sqlite"
)

// Memory(开发测试)
config.Storage = memory.New()

// SQLite(生产环境)
// 需要引入驱动:github.com/mattn/go-sqlite3 或 modernc.org/sqlite
config.Storage, _ = sqlite.New("./jobs.db")

文档

详细使用指南请参阅 GUIDE.md,包含:

  • 完整配置说明
  • Task API 详解
  • Worker 工作原理
  • 持久化与恢复
  • 最佳实践
  • 架构设计

License

MIT License

Documentation

Overview

Package sdq 提供简单高效的延迟队列实现 受 beanstalkd 启发,提供 topic、优先级、bury/kick、TTR 等特性

Index

Constants

View Source
const (
	// MaxWaitersPerTopic 每个 topic 最大等待 worker 数
	// 防止大量 worker 等待导致内存占用过高
	MaxWaitersPerTopic = 1000
)

Variables

View Source
var (
	// ErrNotFound 任务不存在
	ErrNotFound = errors.New("sdq: job not found")
	// ErrNotReserved 任务未被保留
	ErrNotReserved = errors.New("sdq: job not reserved")
	// ErrNotBuried 任务未被埋葬
	ErrNotBuried = errors.New("sdq: job not buried")
	// ErrInvalidState 任务状态无效
	ErrInvalidState = errors.New("sdq: invalid job state")
	// ErrTimeout 操作超时
	ErrTimeout = errors.New("sdq: timeout")
	// ErrInvalidTopic topic 名称无效(空、过长或包含非法字符)
	ErrInvalidTopic = errors.New("sdq: invalid topic name")
	// ErrMaxTopicsReached 达到最大 topic 数量
	ErrMaxTopicsReached = errors.New("sdq: max topics reached")
	// ErrMaxJobsReached 达到最大任务数量
	ErrMaxJobsReached = errors.New("sdq: max jobs reached")
	// ErrTouchLimitExceeded Touch 次数超限
	ErrTouchLimitExceeded = errors.New("sdq: touch limit exceeded")
	// ErrInvalidTouchTime Touch 时间无效
	ErrInvalidTouchTime = errors.New("sdq: invalid touch time")
	// ErrTooManyWaiters 等待队列已满
	ErrTooManyWaiters = errors.New("sdq: too many waiters")
	// ErrInvalidTimeout timeout 必须大于 0
	ErrInvalidTimeout = errors.New("sdq: timeout must be greater than 0")
	// ErrQueueStopped Queue 已停止,无法重新启动
	ErrQueueStopped = errors.New("sdq: queue already stopped")
	// ErrInvalidConfig 配置无效
	ErrInvalidConfig = errors.New("sdq: invalid config")
)
View Source
var (
	// ErrStorageClosed 存储已关闭
	ErrStorageClosed = errors.New("sdq: storage closed")
	// ErrJobExists 任务已存在
	ErrJobExists = errors.New("sdq: job already exists")
)

Functions

func ValidateTopicName

func ValidateTopicName(name string) error

ValidateTopicName 验证 topic 名称

Types

type Config

type Config struct {
	// DefaultTTR 默认 TTR
	DefaultTTR time.Duration
	// MaxJobSize 最大任务大小(字节)
	MaxJobSize int

	// Ticker 自定义 Ticker 实例(优先级高于 NewTickerFunc)
	Ticker Ticker
	// NewTickerFunc Ticker 构造函数(当 Ticker 为 nil 时使用)
	NewTickerFunc NewTickerFunc

	// MaxTouches 最大 Touch 次数
	MaxTouches int
	// MaxTouchDuration 最大延长时间
	MaxTouchDuration time.Duration
	// MinTouchInterval 最小 Touch 间隔
	MinTouchInterval time.Duration

	// MaxTopics 最大 topic 数(0 表示无限制)
	MaxTopics int
	// MaxJobsPerTopic 每个 topic 最大任务数(0 表示无限制)
	MaxJobsPerTopic int
	// EnableTopicCleanup 启用 Topic 惰性清理(定期清理空 Topic)
	EnableTopicCleanup bool
	// TopicCleanupInterval Topic 清理间隔(默认 1 小时)
	TopicCleanupInterval time.Duration

	// Storage 存储后端实例(优先级高于 NewStorageFunc)
	Storage Storage
	// NewStorageFunc Storage 构造函数(当 Storage 为 nil 时使用)
	NewStorageFunc NewStorageFunc

	// Logger 日志记录器(可选)
	// 如果为 nil,将使用 slog.Default()
	Logger *slog.Logger
}

Config Queue 配置

func DefaultConfig

func DefaultConfig() Config

DefaultConfig 返回默认配置

type Inspector

type Inspector struct {
	// contains filtered or unexported fields
}

Inspector 提供队列统计和监控功能

func NewInspector

func NewInspector(queue *Queue) *Inspector

NewInspector 创建新的 Inspector

func (*Inspector) DeleteJob

func (i *Inspector) DeleteJob(id uint64) error

DeleteJob 删除指定的任务(仅限已保留状态)

func (*Inspector) ForceDeleteJob

func (i *Inspector) ForceDeleteJob(id uint64) error

ForceDeleteJob 强制删除指定的任务(支持任何状态:ready/delayed/reserved/buried)

func (*Inspector) GetJobBody

func (i *Inspector) GetJobBody(ctx context.Context, id uint64) ([]byte, error)

GetJobBody 获取任务 Body 内容

func (*Inspector) KickJob

func (i *Inspector) KickJob(id uint64) error

KickJob 踢出指定的埋葬任务(使其重新进入 ready 队列)

func (*Inspector) ListJobs

func (i *Inspector) ListJobs(ctx context.Context, filter *JobMetaFilter) (*JobMetaList, error)

ListJobs 查询任务元数据列表(支持过滤和分页) 这是一个底层方法,用于实现各种查询场景

func (*Inspector) ListTopics

func (i *Inspector) ListTopics() []string

ListTopics 返回所有 topic 列表(已排序) 注意:当 topic 数量较多时,建议使用 ListTopicsPage 进行分页查询

func (*Inspector) ListTopicsPage

func (i *Inspector) ListTopicsPage(offset, limit int, order SortOrder) ([]string, int)

ListTopicsPage 分页返回 topic 列表(已排序) offset: 起始位置(从 0 开始) limit: 返回数量 order: 排序方向(SortAsc 或 SortDesc) 返回: (topic 名称列表, 总数)

func (*Inspector) StartedAt

func (i *Inspector) StartedAt() time.Time

StartedAt 返回队列启动时间

func (*Inspector) Stats

func (i *Inspector) Stats() *Stats

Stats 返回整体统计信息的快照

func (*Inspector) StatsJob

func (i *Inspector) StatsJob(id uint64) (*JobMeta, error)

StatsJob 返回指定任务的元数据

func (*Inspector) StatsTopic

func (i *Inspector) StatsTopic(name string) (*TopicStats, error)

StatsTopic 返回 topic 统计信息

func (*Inspector) StorageStats

func (i *Inspector) StorageStats(ctx context.Context) (*StorageStats, error)

StorageStats 返回存储统计信息

func (*Inspector) TickerStats

func (i *Inspector) TickerStats() *TickerStats

TickerStats 返回定时器统计信息

func (*Inspector) TopicStats

func (i *Inspector) TopicStats() []*TopicStats

TopicStats 返回所有 Topic 的统计信息(已排序) 注意:当 topic 数量较多时,建议使用 TopicStatsPage 进行分页查询

func (*Inspector) TopicStatsPage

func (i *Inspector) TopicStatsPage(offset, limit int, order SortOrder) ([]*TopicStats, int)

TopicStatsPage 分页返回 Topic 的统计信息(已排序) offset: 起始位置(从 0 开始) limit: 返回数量 order: 排序方向(SortAsc 或 SortDesc) 返回: (统计信息列表, 总数)

func (*Inspector) WaitingStats

func (i *Inspector) WaitingStats() []WaitingStats

WaitingStats 返回所有 topics 的等待队列统计

type Job

type Job struct {
	Meta *JobMeta // 元数据
	// contains filtered or unexported fields
}

Job 完整任务 包含元数据和 Body Reserve 时才组装完整的 Job 返回给 Worker

func NewJob

func NewJob(meta *JobMeta, body []byte, queue *Queue) *Job

NewJob 创建完整任务(立即加载模式) storage 为 nil 表示 body 已传入,GetBody 会直接返回

func NewJobWithStorage

func NewJobWithStorage(meta *JobMeta, storage Storage, queue *Queue) *Job

NewJobWithStorage 创建任务(延迟加载模式) 注意:meta 应该已经是克隆的副本(由 TryReserve 返回) 此函数直接使用传入的 meta,不再重复克隆

func (*Job) Body

func (j *Job) Body() []byte

Body 返回任务 Body(便捷方法,忽略错误) 如果加载失败,返回 nil 建议使用 GetBody() 以获得错误信息

func (*Job) Bury

func (j *Job) Bury(priority uint32) error

Bury 埋葬已保留的任务 priority: 新的优先级

func (*Job) Delete

func (j *Job) Delete() error

Delete 删除任务(必须是已保留状态)

func (*Job) GetBody

func (j *Job) GetBody() ([]byte, error)

GetBody 获取任务 Body(延迟加载,并发安全)

func (*Job) Kick

func (j *Job) Kick() error

Kick 踢出埋葬任务(将此任务从埋葬状态恢复) 注意:此方法只能用于已埋葬的任务

func (*Job) Release

func (j *Job) Release(priority uint32, delay time.Duration) error

Release 释放已保留的任务 priority: 新的优先级 delay: 延迟时间

func (*Job) Touch

func (j *Job) Touch(duration ...time.Duration) error

Touch 延长任务的 TTR 支持两种模式: - Touch() - 重置为原始 TTR - Touch(duration) - 延长指定时间

type JobMeta

type JobMeta struct {
	// === 基本信息 ===
	ID       uint64 // 任务 ID
	Topic    string // 所属 topic
	Priority uint32 // 优先级(数字越小优先级越高)
	State    State  // 当前状态

	// === 时间参数 ===
	Delay time.Duration // 延迟时间(创建时指定)
	TTR   time.Duration // Time To Run - 执行超时时间

	// === 时间戳 ===
	CreatedAt   time.Time // 创建时间
	ReadyAt     time.Time // 就绪时间(延迟任务的到期时间)
	ReservedAt  time.Time // 被保留时间
	LastTouchAt time.Time // 最后一次 Touch 的时间
	BuriedAt    time.Time // 被埋葬时间
	DeletedAt   time.Time // 删除时间(软删除时使用)

	// === 统计信息 ===
	Reserves       int           // 被保留次数
	Timeouts       int           // 超时次数
	Releases       int           // 被释放次数
	Buries         int           // 被埋葬次数
	Kicks          int           // 被踢出次数
	Touches        int           // Touch 次数(TTR 延长)
	TotalTouchTime time.Duration // 累计延长的总时间(用于 MaxTouchDuration 检查)
}

JobMeta 任务元数据 轻量级结构,常驻内存用于调度 大小约 200 字节

func NewJobMeta

func NewJobMeta(id uint64, topic string, priority uint32, delay, ttr time.Duration) *JobMeta

NewJobMeta 创建新的任务元数据

func (*JobMeta) Clone

func (m *JobMeta) Clone() *JobMeta

Clone 克隆元数据(用于返回副本,避免外部修改)

func (*JobMeta) ReserveDeadline

func (m *JobMeta) ReserveDeadline() time.Time

ReserveDeadline 返回保留任务的截止时间 只对 StateReserved 状态有效

func (*JobMeta) ShouldBeReady

func (m *JobMeta) ShouldBeReady(now time.Time) bool

ShouldBeReady 判断延迟任务是否应该转为 Ready 状态 只对 StateDelayed 状态有效

func (*JobMeta) ShouldTimeout

func (m *JobMeta) ShouldTimeout(now time.Time) bool

ShouldTimeout 判断保留任务是否应该超时转回 Ready 状态 只对 StateReserved 状态有效

func (*JobMeta) TimeUntilReady

func (m *JobMeta) TimeUntilReady(now time.Time) time.Duration

TimeUntilReady 返回延迟任务距离就绪的时间 只对 StateDelayed 状态有效

func (*JobMeta) TimeUntilTimeout

func (m *JobMeta) TimeUntilTimeout(now time.Time) time.Duration

TimeUntilTimeout 返回保留任务距离超时的时间 只对 StateReserved 状态有效

type JobMetaFilter

type JobMetaFilter struct {
	Topic  string // 按 topic 过滤,空表示不过滤
	State  *State // 按状态过滤,nil 表示不过滤
	Limit  int    // 返回数量限制,0 表示无限制
	Offset int    // 偏移量,用于分页
	Cursor uint64 // 游标(任务 ID),用于游标分页,0 表示从头开始
}

JobMetaFilter 任务元数据过滤条件

type JobMetaList

type JobMetaList struct {
	Metas      []*JobMeta // 任务元数据列表
	Total      int        // 总数(可选,某些实现可能不支持)
	HasMore    bool       // 是否还有更多数据
	NextCursor uint64     // 下一页游标(最后一个任务的 ID)
}

JobMetaList 任务元数据列表结果

type NewStorageFunc

type NewStorageFunc func(ctx context.Context, config *Config) (Storage, error)

NewStorageFunc Storage 构造函数

type NewTickerFunc

type NewTickerFunc func(ctx context.Context, config *Config) (Ticker, error)

NewTickerFunc Ticker 构造函数

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue 延迟队列 架构设计: - 每个 Topic 独立管理自己的 Ready/Delayed/Reserved/Buried 队列 - 只存储 JobMeta(轻量级),Body 按需从 Storage 加载 - Wheel Tick 负责定时通知 Topic 处理到期任务 - Storage 负责持久化

func New

func New(config Config) (*Queue, error)

New 创建新的 Queue 实例

func (*Queue) Bury

func (q *Queue) Bury(id uint64, priority uint32) error

Bury 埋葬已保留的任务

func (*Queue) Delete

func (q *Queue) Delete(id uint64) error

Delete 删除任务(必须是已保留状态)

func (*Queue) ForceDelete

func (q *Queue) ForceDelete(id uint64) error

ForceDelete 强制删除任务(支持任何状态)

func (*Queue) Kick

func (q *Queue) Kick(topic string, bound int) (int, error)

Kick 踢出埋葬的任务 topic: topic 名称 bound: 最多踢出的任务数

func (*Queue) KickJob

func (q *Queue) KickJob(id uint64) error

KickJob 踢出指定的埋葬任务

func (*Queue) Peek

func (q *Queue) Peek(id uint64) (*Job, error)

Peek 查看任务但不保留

func (*Queue) PeekBuried

func (q *Queue) PeekBuried(topicName string) (*Job, error)

PeekBuried 查看指定 topic 的下一个埋葬任务

func (*Queue) PeekDelayed

func (q *Queue) PeekDelayed(topicName string) (*Job, error)

PeekDelayed 查看指定 topic 的下一个将要就绪的延迟任务

func (*Queue) PeekReady

func (q *Queue) PeekReady(topicName string) (*Job, error)

PeekReady 查看指定 topic 的下一个就绪任务

func (*Queue) Put

func (q *Queue) Put(topic string, body []byte, priority uint32, delay, ttr time.Duration) (uint64, error)

Put 添加任务到队列 topic: topic 名称,不能为空 body: 任务数据 priority: 优先级(数字越小优先级越高) delay: 延迟时间 ttr: 执行超时时间,0 使用默认值

func (*Queue) Release

func (q *Queue) Release(id uint64, priority uint32, delay time.Duration) error

Release 释放已保留的任务 id: 任务 ID priority: 新的优先级 delay: 延迟时间

func (*Queue) Reserve

func (q *Queue) Reserve(topics []string, timeout time.Duration) (*Job, error)

Reserve 从指定 topics 保留一个任务 topics: topic 名称列表,不能为空 timeout: 等待超时时间,0 表示立即返回

func (*Queue) Start

func (q *Queue) Start() error

Start 启动 Queue(使用异步恢复)

func (*Queue) StartWithOptions

func (q *Queue) StartWithOptions(opts StartOptions) error

StartWithOptions 使用选项启动 Queue 此方法是幂等的,多次调用只会启动一次

func (*Queue) Stop

func (q *Queue) Stop() error

Stop 停止 Queue 停止后无法重新启动,需要创建新的 Queue 实例 此方法是幂等的,多次调用只会停止一次

func (*Queue) Touch

func (q *Queue) Touch(id uint64, duration ...time.Duration) error

Touch 延长任务的 TTR 支持两种模式: - Touch(id) - 重置为原始 TTR - Touch(id, duration) - 延长指定时间

func (*Queue) TryReserve

func (q *Queue) TryReserve(topics []string) *JobMeta

TryReserve 尝试立即预留任务

func (*Queue) WaitForRecovery

func (q *Queue) WaitForRecovery(timeout time.Duration) error

WaitForRecovery 等待恢复完成 timeout: 超时时间,0 表示无限等待 返回: 如果超时返回 ErrTimeout,如果队列已关闭返回 context.Canceled

type RecoveryCallback

type RecoveryCallback func(progress *RecoveryProgress)

RecoveryCallback 恢复进度回调函数 如果提供了回调,恢复过程中会调用它报告进度

type RecoveryPhase

type RecoveryPhase int

RecoveryPhase 恢复阶段

const (
	RecoveryPhaseStart      RecoveryPhase = iota // 开始恢复
	RecoveryPhaseRecovering                      // 恢复中
	RecoveryPhaseComplete                        // 完成
	RecoveryPhaseError                           // 错误
)

type RecoveryProgress

type RecoveryProgress struct {
	Phase      RecoveryPhase   // 当前阶段
	Result     *RecoveryResult // 恢复结果(仅在 Complete 阶段有值)
	TotalJobs  int             // 总任务数
	LoadedJobs int             // 已加载任务数
	FailedJobs int             // 失败任务数
	Error      error           // 错误信息(仅在 Error 阶段有值)
}

RecoveryProgress 恢复进度

type RecoveryResult

type RecoveryResult struct {
	MaxID      uint64                // 最大任务 ID
	TopicJobs  map[string][]*JobMeta // topic -> jobs
	TotalJobs  int                   // 总任务数
	FailedJobs int                   // 失败的任务数
}

RecoveryResult 恢复结果

type SortOrder

type SortOrder int

SortOrder 排序方向

const (
	SortAsc  SortOrder = iota // 升序(默认)
	SortDesc                  // 降序
)

type StartOptions

type StartOptions struct {
	// RecoveryCallback 恢复进度回调
	// 如果提供了回调,会在恢复过程中调用它报告进度(Start, Recovering 阶段)
	RecoveryCallback RecoveryCallback
}

Start 启动 Queue 如果配置了 Storage,会从 Storage 恢复任务 StartOptions 启动选项

type State

type State int

State 任务状态

const (
	// StateEnqueued 已入队,等待加载到内存
	// 临时状态,任务刚创建时的初始状态
	StateEnqueued State = iota

	// StateReady 就绪,等待 Reserve
	// 任务在 Topic.ReadyHeap 中,等待 Worker 拉取
	StateReady

	// StateDelayed 延迟中,等待到期
	// 任务在 Topic.DelayedHeap 中,到期后转为 Ready
	StateDelayed

	// StateReserved 已保留,Worker 处理中
	// 任务在 Topic.ReservedMap 中,受 TTR 保护
	StateReserved

	// StateBuried 已埋葬,暂时搁置
	// 任务在 Topic.BuriedHeap 中,需要 Kick 才能恢复
	StateBuried
)

func (State) String

func (s State) String() string

String 返回状态的字符串表示

type Stats

type Stats struct {
	// 队列状态
	TotalJobs    int // 总任务数
	ReadyJobs    int // 就绪任务数
	DelayedJobs  int // 延迟任务数
	ReservedJobs int // 保留任务数
	BuriedJobs   int // 埋葬任务数
	Topics       int // topic 数量

	// 操作计数
	Puts     uint64 // Put 操作次数
	Reserves uint64 // Reserve 操作次数
	Deletes  uint64 // Delete 操作次数
	Releases uint64 // Release 操作次数
	Buries   uint64 // Bury 操作次数
	Kicks    uint64 // Kick 操作次数
	Timeouts uint64 // 超时次数
	Touches  uint64 // Touch 操作次数
}

Stats 队列统计信息

type Storage

type Storage interface {
	// Name 返回存储名称
	Name() string

	// SaveJob 保存完整任务(元数据 + Body)
	// 只在 Put 时调用,同时保存 meta 和 body
	// Body 不可变,一旦保存就不会修改
	// 如果任务已存在则返回 ErrJobExists
	SaveJob(ctx context.Context, meta *JobMeta, body []byte) error

	// UpdateJobMeta 更新任务元数据
	// 只更新元数据(状态、统计等),不涉及 Body
	// 如果任务不存在则返回 ErrNotFound
	UpdateJobMeta(ctx context.Context, meta *JobMeta) error

	// GetJobMeta 获取任务元数据
	// 如果任务不存在则返回 ErrNotFound
	GetJobMeta(ctx context.Context, id uint64) (*JobMeta, error)

	// ScanJobMeta 扫描任务元数据
	// 支持过滤、分页和游标
	// filter 为 nil 时返回所有任务元数据(用于启动恢复)
	// 启动恢复时:ScanJobMeta(ctx, nil) 加载所有元数据,不加载 Body
	// 性能关键:100 万任务时只需 200MB 内存而不是 10GB
	ScanJobMeta(ctx context.Context, filter *JobMetaFilter) (*JobMetaList, error)

	// GetJobBody 获取任务 Body
	// 如果任务不存在则返回 ErrNotFound
	// Reserve 时才调用,按需加载
	GetJobBody(ctx context.Context, id uint64) ([]byte, error)

	// DeleteJob 删除任务(元数据 + Body)
	// 如果任务不存在则返回 ErrNotFound
	DeleteJob(ctx context.Context, id uint64) error

	// CountJobs 统计任务数量
	// filter 为 nil 时统计所有任务
	CountJobs(ctx context.Context, filter *JobMetaFilter) (int, error)

	// GetMaxJobID 获取最大任务 ID
	// 用于快速启动时初始化 ID 生成器,避免扫描所有任务
	// 如果没有任务则返回 0
	GetMaxJobID(ctx context.Context) (uint64, error)

	// Stats 返回存储统计信息
	Stats(ctx context.Context) (*StorageStats, error)

	// Close 关闭存储
	Close() error
}

Storage 持久化存储接口 设计原则:元数据与 Body 分离存储 - 元数据:轻量级(~200B),常驻内存,用于调度,会更新 - Body:可能很大(KB~MB),按需加载,不可变

type StorageStats

type StorageStats struct {
	Name           string // 存储名称
	TotalJobs      int64  // 总任务数
	TotalTopics    int    // 总 topic 数
	MetaSize       int64  // 元数据存储大小(字节)
	BodySize       int64  // Body 存储大小(字节)
	TotalSize      int64  // 总存储大小(字节)
	LastSaveTime   int64  // 最后保存时间(Unix 时间戳)
	LastLoadTime   int64  // 最后加载时间(Unix 时间戳)
	AvgMetaSize    int64  // 平均元数据大小(字节)
	AvgBodySize    int64  // 平均 Body 大小(字节)
	LoadedMetaSize int64  // 已加载元数据大小(字节)
	LoadedBodySize int64  // 已加载 Body 大小(字节)
}

StorageStats 存储统计信息

type Tickable

type Tickable interface {
	// ProcessTick 处理 tick 通知
	ProcessTick(now time.Time)

	// NextTickTime 返回下一个需要 tick 的时间
	// 返回 zero time 表示不需要 tick
	NextTickTime() time.Time

	// NeedsTick 是否需要 tick
	NeedsTick() bool
}

Tickable 可被 tick 的对象接口 Topic 实现此接口以接收 tick 通知

type Ticker

type Ticker interface {
	// Name 返回定时器名称
	Name() string

	// Start 启动定时器
	Start()

	// Stop 停止定时器
	Stop()

	// Register 注册需要 tick 的对象
	Register(name string, tickable Tickable)

	// Unregister 取消注册
	Unregister(name string)

	// Wakeup 唤醒定时器(新任务加入时)
	Wakeup()

	// Stats 返回统计信息
	Stats() *TickerStats
}

Ticker 定时器接口 负责定时触发已注册对象的 ProcessTick 方法

type TickerStats

type TickerStats struct {
	Name            string        // 定时器名称
	RegisteredCount int           // 注册的对象数量
	NextTickTime    time.Time     // 下一个 tick 时间
	TimeUntilTick   time.Duration // 距离下一个 tick 的时间
}

TickerStats 定时器统计信息

type TopicStats

type TopicStats struct {
	Name         string
	ReadyJobs    int
	DelayedJobs  int
	ReservedJobs int
	BuriedJobs   int
	TotalJobs    int
}

TopicStats Topic 统计信息

type WaitingStats

type WaitingStats struct {
	Topic          string // Topic 名称
	WaitingWorkers int    // 等待的 worker 数量
}

WaitingStats 等待队列统计信息

Directories

Path Synopsis
examples
memorydynsleep command
memorytimewheel command
sqlitedynsleep command
sqlitetimewheel command
stability command
stability 是一个带 Inspector 监控的稳定性测试程序
stability 是一个带 Inspector 监控的稳定性测试程序
webui command
Package main 演示如何使用 webui 包提供的 HTTP API 和嵌入式前端界面
Package main 演示如何使用 webui 包提供的 HTTP API 和嵌入式前端界面
x
dynsleep
Package dynsleep provides a dynamic sleep ticker implementation.
Package dynsleep provides a dynamic sleep ticker implementation.
memory
Package memory provides an in-memory storage implementation.
Package memory provides an in-memory storage implementation.
metrics
Package metrics 提供 Prometheus metrics 支持
Package metrics 提供 Prometheus metrics 支持
sqlite
Package sqlite provides a SQLite storage implementation.
Package sqlite provides a SQLite storage implementation.
timewheel
Package timewheel provides a time wheel ticker implementation.
Package timewheel provides a time wheel ticker implementation.
webui
Package inspector 提供任务查询能力,方便实时 UI 展示
Package inspector 提供任务查询能力,方便实时 UI 展示

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL