task

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2025 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Alert

type Alert struct {
	Name      string   `bson:"name" json:"name"`           // 告警名称
	Condition string   `bson:"condition" json:"condition"` // 触发条件
	Channels  []string `bson:"channels" json:"channels"`   // 通知渠道
}

Alert is an alerting rule (告警规则).

type AlertingConfig

type AlertingConfig struct {
	Enabled bool    `bson:"enabled" json:"enabled"`
	Rules   []Alert `bson:"rules" json:"rules"` // 告警规则
}

AlertingConfig is the alerting configuration (告警配置).

type AuthConfig

type AuthConfig struct {
	Type   string         `bson:"type" json:"type"`     // 认证类型 (none, basic, bearer, oauth2, custom)
	Config map[string]any `bson:"config" json:"config"` // 认证配置
}

AuthConfig is the authentication configuration (认证配置).

type Batch

type Batch struct {
	ID        string    `json:"id"`
	Tasks     []*Task   `json:"tasks"`
	CreatedAt time.Time `json:"created_at"`
	Source    string    `json:"source"`
}

Batch represents a batch of tasks

func NewTaskBatch

func NewTaskBatch(tasks []*Task, source string) *Batch

NewTaskBatch creates a new task batch

func (*Batch) Filter

func (tb *Batch) Filter(predicate func(*Task) bool) []*Task

Filter filters tasks by predicate

func (*Batch) Size

func (tb *Batch) Size() int

Size returns the batch size.

type ConfigLoader

type ConfigLoader struct {
	// contains filtered or unexported fields
}

ConfigLoader loads task configurations from various sources

func NewConfigLoader

func NewConfigLoader(config *LoaderConfig) *ConfigLoader

NewConfigLoader creates a new configuration loader

func (*ConfigLoader) AddSource

func (cl *ConfigLoader) AddSource(source ConfigSource)

AddSource adds a configuration source

func (*ConfigLoader) Close

func (cl *ConfigLoader) Close()

Close closes the configuration loader

func (*ConfigLoader) LoadAll

func (cl *ConfigLoader) LoadAll(ctx context.Context) error

LoadAll loads all configurations from all sources

type ConfigSource

type ConfigSource interface {
	Name() string
	Load(ctx context.Context) ([]*TaskConfig, error)
}

ConfigSource represents a configuration source

type DateStrategy

type DateStrategy struct {
	Mode       string `bson:"mode" json:"mode"`                                   // 模式: single(单个日期), range(日期范围), custom(自定义)
	Format     string `bson:"format" json:"format"`                               // 日期格式,如 "2006-01-02"
	StartDays  int    `bson:"start_days" json:"start_days"`                       // 起始天数偏移(相对于今天,可为负数)
	EndDays    int    `bson:"end_days" json:"end_days"`                           // 结束天数偏移(相对于今天)
	DayStep    int    `bson:"day_step" json:"day_step"`                           // 步长(默认1,表示每天)
	CustomDays []int  `bson:"custom_days,omitempty" json:"custom_days,omitempty"` // 自定义天数偏移列表
}

DateStrategy is the strategy for generating date variables (日期变量策略).

type DeduplicationConfig

type DeduplicationConfig struct {
	Enabled    bool          `json:"enabled"`
	Strategy   string        `json:"strategy"` // url, content, custom
	Fields     []string      `json:"fields"`   // Fields to use for deduplication
	Expiration time.Duration `json:"expiration"`
}

DeduplicationConfig defines deduplication settings

type DefaultSeedExpander

type DefaultSeedExpander struct{}

DefaultSeedExpander is the default seed expander

func NewDefaultSeedExpander

func NewDefaultSeedExpander() *DefaultSeedExpander

NewDefaultSeedExpander creates a default seed expander

func (*DefaultSeedExpander) Expand

func (e *DefaultSeedExpander) Expand(data []byte) []*Task

Expand expands seed data into tasks

type DefaultTaskAllocator

type DefaultTaskAllocator struct {
	// contains filtered or unexported fields
}

DefaultTaskAllocator is the default task allocator

func NewDefaultTaskAllocator

func NewDefaultTaskAllocator(strategy string) *DefaultTaskAllocator

NewDefaultTaskAllocator creates a default task allocator

func (*DefaultTaskAllocator) Allocate

func (a *DefaultTaskAllocator) Allocate(ctx context.Context, instance *TaskInstance, nodes []NodeInfo) (*NodeInfo, error)

Allocate allocates a task to a node

func (*DefaultTaskAllocator) Release

func (a *DefaultTaskAllocator) Release(ctx context.Context, instance *TaskInstance) error

Release releases a task allocation

type DelayedQueue

type DelayedQueue struct {
	// contains filtered or unexported fields
}

DelayedQueue manages delayed task execution

func NewDelayedQueue

func NewDelayedQueue(queue Queue, client *redis.Client, prefix string) *DelayedQueue

NewDelayedQueue creates a new delayed queue

func (*DelayedQueue) ProcessDelayed

func (dq *DelayedQueue) ProcessDelayed(ctx context.Context) error

ProcessDelayed moves ready tasks to the main queue.

func (*DelayedQueue) PushDelayed

func (dq *DelayedQueue) PushDelayed(ctx context.Context, task *Task, delay time.Duration) error

PushDelayed adds a task with delay

func (*DelayedQueue) StartProcessor

func (dq *DelayedQueue) StartProcessor(ctx context.Context, interval time.Duration)

StartProcessor starts the delayed task processor

type DispatchStrategy

type DispatchStrategy string

DispatchStrategy defines how tasks are distributed

const (
	StrategyRoundRobin DispatchStrategy = "round_robin"
	StrategyLeastLoad  DispatchStrategy = "least_load"
	StrategyRandom     DispatchStrategy = "random"
	StrategyHash       DispatchStrategy = "hash"
	StrategySticky     DispatchStrategy = "sticky"
)

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher distributes tasks to worker nodes

func NewDispatcher

func NewDispatcher(config *DispatcherConfig) *Dispatcher

NewDispatcher creates a new task dispatcher

func (*Dispatcher) GetStats

func (d *Dispatcher) GetStats() map[string]interface{}

GetStats returns dispatcher statistics

func (*Dispatcher) RegisterNode

func (d *Dispatcher) RegisterNode(node *NodeInfo) error

RegisterNode registers a worker node

func (*Dispatcher) Start

func (d *Dispatcher) Start(ctx context.Context) error

Start starts the dispatcher

func (*Dispatcher) Stop

func (d *Dispatcher) Stop() error

Stop stops the dispatcher

func (*Dispatcher) Submit

func (d *Dispatcher) Submit(ctx context.Context, task *Task) error

Submit submits a task for dispatch

func (*Dispatcher) SubmitBatch

func (d *Dispatcher) SubmitBatch(ctx context.Context, tasks []*Task) error

SubmitBatch submits multiple tasks

func (*Dispatcher) UnregisterNode

func (d *Dispatcher) UnregisterNode(nodeID string) error

UnregisterNode unregisters a worker node

func (*Dispatcher) UpdateNodeHeartbeat

func (d *Dispatcher) UpdateNodeHeartbeat(nodeID string, load int) error

UpdateNodeHeartbeat updates node heartbeat

type DispatcherConfig

type DispatcherConfig struct {
	QueueType   QueueType
	Strategy    DispatchStrategy
	BatchSize   int
	MaxRetries  int
	Timeout     time.Duration
	RedisClient *redis.Client
	RedisPrefix string
	Logger      *slog.Logger
}

DispatcherConfig contains dispatcher configuration

type ErrorHandling

type ErrorHandling struct {
	Strategy      string   `bson:"strategy" json:"strategy"`             // 策略 (retry, skip, fail, notify)
	RetryStrategy string   `bson:"retry_strategy" json:"retry_strategy"` // 重试策略 (exponential, linear, fixed)
	MaxRetries    int      `bson:"max_retries" json:"max_retries"`       // 最大重试次数
	NotifyOn      []string `bson:"notify_on" json:"notify_on"`           // 通知条件
}

ErrorHandling is the error-handling configuration (错误处理配置).

type Event

type Event struct {
	Type      string                 `json:"type"`
	Source    string                 `json:"source"`
	Data      map[string]interface{} `json:"data"`
	Timestamp time.Time              `json:"timestamp"`
}

Event represents a trigger event

type ExecutionConfig

type ExecutionConfig struct {
	Mode          string        `bson:"mode" json:"mode"`                     // 执行模式 (single, batch, stream)
	BatchSize     int           `bson:"batch_size" json:"batch_size"`         // 批量大小
	Parallelism   int           `bson:"parallelism" json:"parallelism"`       // 并行度
	Dependencies  []string      `bson:"dependencies" json:"dependencies"`     // 依赖任务
	PreHooks      []HookConfig  `bson:"pre_hooks" json:"pre_hooks"`           // 前置钩子
	PostHooks     []HookConfig  `bson:"post_hooks" json:"post_hooks"`         // 后置钩子
	ErrorHandling ErrorHandling `bson:"error_handling" json:"error_handling"` // 错误处理
}

ExecutionConfig is the execution configuration (执行配置).

type Executor

type Executor interface {
	// Execute executes the task
	Execute(ctx context.Context, task *Task) (*TaskResult, error)

	// SetFetcher sets the fetcher
	SetFetcher(fetcher Fetcher)

	// SetExtractor sets the extractor
	SetExtractor(extractor Extractor)

	// SetPipeline sets the pipeline
	SetPipeline(pipeline Pipeline)

	// SetStorage sets the storage
	SetStorage(storage Storage)
}

Executor defines the interface for task execution

type ExtractRule

type ExtractRule struct {
	Field     string          `json:"field"`
	Selector  string          `json:"selector"`
	Type      string          `json:"type"` // css, xpath, json, regex
	Attribute string          `json:"attribute,omitempty"`
	Multiple  bool            `json:"multiple"`
	Required  bool            `json:"required"`
	Default   interface{}     `json:"default,omitempty"`
	Transform []TransformRule `json:"transform,omitempty"`
	Children  []ExtractRule   `json:"children,omitempty"` // For nested extraction
}

ExtractRule defines data extraction rules

type ExtractionConfig

type ExtractionConfig struct {
	Type      string           `bson:"type" json:"type"`           // 提取类型 (json, html, xml, regex, custom)
	Rules     []ExtractionRule `bson:"rules" json:"rules"`         // 提取规则
	Scripts   []Script         `bson:"scripts" json:"scripts"`     // 自定义脚本
	Templates []Template       `bson:"templates" json:"templates"` // 模板
}

ExtractionConfig is the data-extraction configuration; it supports any data format (数据提取配置 - 支持任何数据格式).

type ExtractionRule

type ExtractionRule struct {
	Field      string           `bson:"field" json:"field"`           // 字段名
	Path       string           `bson:"path" json:"path"`             // 路径表达式
	Type       string           `bson:"type" json:"type"`             // 数据类型
	Required   bool             `bson:"required" json:"required"`     // 是否必需
	Default    any              `bson:"default" json:"default"`       // 默认值
	Transform  []Transform      `bson:"transform" json:"transform"`   // 转换规则
	Validation []Validation     `bson:"validation" json:"validation"` // 验证规则
	Children   []ExtractionRule `bson:"children" json:"children"`     // 子规则(嵌套结构)
}

ExtractionRule is an extraction rule (提取规则).

type Extractor

type Extractor interface {
	// Extract extracts data from the response
	Extract(ctx context.Context, response *Response, rules []ExtractRule) (map[string]interface{}, error)

	// ExtractLinks extracts links for further crawling
	ExtractLinks(ctx context.Context, response *Response) ([]string, error)

	// GetType returns the extractor type (css, xpath, json, regex)
	GetType() string
}

Extractor defines the interface for data extraction

type Fetcher

type Fetcher interface {
	// Fetch fetches content from the given URL
	Fetch(ctx context.Context, task *Task) (*Response, error)

	// GetType returns the fetcher type (http, browser, api)
	GetType() string
}

Fetcher defines the interface for fetching content

type FileSource

type FileSource struct {
	// contains filtered or unexported fields
}

FileSource loads configurations from files

func NewFileSource

func NewFileSource(path string, logger *slog.Logger) *FileSource

NewFileSource creates a new file configuration source

func (*FileSource) Load

func (fs *FileSource) Load(ctx context.Context) ([]*TaskConfig, error)

Load loads configurations from file

func (*FileSource) Name

func (fs *FileSource) Name() string

Name returns the source name

type HookConfig

type HookConfig struct {
	Type   string         `bson:"type" json:"type"`     // 钩子类型 (http, script, function)
	Action string         `bson:"action" json:"action"` // 动作
	Config map[string]any `bson:"config" json:"config"` // 配置
}

HookConfig is the hook configuration (钩子配置).

type LinkRule

type LinkRule struct {
	Name      string      `json:"name"`
	Selector  string      `json:"selector"`
	Type      string      `json:"type"` // css, xpath, regex, json
	Attribute string      `json:"attribute"`
	Pattern   string      `json:"pattern"`   // URL pattern to match
	TaskType  Type        `json:"task_type"` // Type for generated tasks
	Priority  Priority    `json:"priority"`
	Depth     int         `json:"depth"`    // Max depth for this rule
	Metadata  interface{} `json:"metadata"` // Metadata to add to generated tasks
}

LinkRule defines rules for extracting and generating child tasks

type LoaderConfig

type LoaderConfig struct {
	Registry    *TaskRegistry
	Manager     *TaskManager
	Redis       *redis.Client
	Prefix      string
	Logger      *slog.Logger
	ConfigPaths []string
	WatchFiles  bool
}

LoaderConfig contains configuration loader settings

type LoggingConfig

type LoggingConfig struct {
	Level      string  `bson:"level" json:"level"`             // 日志级别
	SampleRate float64 `bson:"sample_rate" json:"sample_rate"` // 采样率
}

LoggingConfig is the logging configuration (日志配置).

type ManagerConfig

type ManagerConfig struct {
	Registry           *TaskRegistry
	Redis              *redis.Client
	Prefix             string
	Logger             *slog.Logger
	Allocator          TaskAllocator
	NodeManager        NodeManager
	MaxActiveTasks     int
	TaskTimeout        time.Duration
	CleanupInterval    time.Duration
	StateCheckInterval time.Duration
}

ManagerConfig contains task manager configuration

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

MemoryQueue is an in-memory task queue

func NewMemoryQueue

func NewMemoryQueue(capacity int) *MemoryQueue

NewMemoryQueue creates a new memory queue

func (*MemoryQueue) Clear

func (mq *MemoryQueue) Clear(ctx context.Context) error

Clear removes all tasks

func (*MemoryQueue) Peek

func (mq *MemoryQueue) Peek(ctx context.Context) (*Task, error)

Peek retrieves but doesn't remove a task

func (*MemoryQueue) Pop

func (mq *MemoryQueue) Pop(ctx context.Context) (*Task, error)

Pop retrieves and removes a task

func (*MemoryQueue) PopN

func (mq *MemoryQueue) PopN(ctx context.Context, n int) ([]*Task, error)

PopN retrieves and removes N tasks

func (*MemoryQueue) Push

func (mq *MemoryQueue) Push(ctx context.Context, task *Task) error

Push adds a task to the queue

func (*MemoryQueue) PushBatch

func (mq *MemoryQueue) PushBatch(ctx context.Context, tasks []*Task) error

PushBatch adds multiple tasks

func (*MemoryQueue) Size

func (mq *MemoryQueue) Size(ctx context.Context) (int64, error)

Size returns queue size

func (*MemoryQueue) Stats

func (mq *MemoryQueue) Stats(ctx context.Context) (*QueueStats, error)

Stats returns queue statistics

type MetricsConfig

type MetricsConfig struct {
	Enabled    bool              `bson:"enabled" json:"enabled"`
	Collectors []string          `bson:"collectors" json:"collectors"` // 收集器
	Labels     map[string]string `bson:"labels" json:"labels"`         // 标签
}

MetricsConfig is the metrics configuration (指标配置).

type MonitoringConfig

type MonitoringConfig struct {
	Metrics  MetricsConfig  `bson:"metrics" json:"metrics"`   // 指标配置
	Logging  LoggingConfig  `bson:"logging" json:"logging"`   // 日志配置
	Alerting AlertingConfig `bson:"alerting" json:"alerting"` // 告警配置
	Tracing  TracingConfig  `bson:"tracing" json:"tracing"`   // 追踪配置
}

MonitoringConfig is the monitoring configuration (监控配置).

type NodeInfo

type NodeInfo struct {
	ID            string    `json:"id"`
	Address       string    `json:"address"`
	Capacity      int       `json:"capacity"`
	CurrentLoad   int       `json:"current_load"`
	Status        string    `json:"status"`
	LastHeartbeat time.Time `json:"last_heartbeat"`
	Tags          []string  `json:"tags"`
	Capabilities  []string  `json:"capabilities"`
}

NodeInfo contains worker node information

type NodeManager

type NodeManager interface {
	GetAvailableNodes(ctx context.Context) ([]NodeInfo, error)
	GetNode(ctx context.Context, nodeID string) (*NodeInfo, error)
	UpdateNodeLoad(ctx context.Context, nodeID string, delta int) error
}

NodeManager manages node information

type PaginationConfig

type PaginationConfig struct {
	Enabled       bool   `bson:"enabled" json:"enabled"`
	Type          string `bson:"type" json:"type"`                     // 分页类型 (page, offset, cursor, token)
	StartParam    string `bson:"start_param" json:"start_param"`       // 起始参数名
	LimitParam    string `bson:"limit_param" json:"limit_param"`       // 限制参数名
	StartValue    any    `bson:"start_value" json:"start_value"`       // 起始值
	PageSize      int    `bson:"page_size" json:"page_size"`           // 页大小
	MaxPages      int    `bson:"max_pages" json:"max_pages"`           // 最大页数
	StopCondition string `bson:"stop_condition" json:"stop_condition"` // 停止条件
}

PaginationConfig is the pagination configuration (分页配置).

type Pipeline

type Pipeline interface {
	// Process processes the extracted data through the pipeline
	Process(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)

	// GetID returns the pipeline identifier
	GetID() string

	// Validate validates the data against pipeline rules
	Validate(data map[string]interface{}) error
}

Pipeline defines the interface for data processing pipeline

type Priority

type Priority int

Priority defines task priority levels

const (
	PriorityLow    Priority = 0
	PriorityNormal Priority = 1
	PriorityHigh   Priority = 2
	PriorityUrgent Priority = 3
)

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue manages tasks with priority

func NewPriorityQueue

func NewPriorityQueue() *PriorityQueue

NewPriorityQueue creates a new priority queue

func (*PriorityQueue) Clear

func (pq *PriorityQueue) Clear(ctx context.Context) error

Clear removes all tasks

func (*PriorityQueue) Peek

func (pq *PriorityQueue) Peek(ctx context.Context) (*Task, error)

Peek retrieves, but does not remove, the highest-priority task.

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop(ctx context.Context) (*Task, error)

Pop retrieves the highest-priority task.

func (*PriorityQueue) PopN

func (pq *PriorityQueue) PopN(ctx context.Context, n int) ([]*Task, error)

PopN retrieves the N highest-priority tasks.

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(ctx context.Context, task *Task) error

Push adds a task to the appropriate priority queue

func (*PriorityQueue) PushBatch

func (pq *PriorityQueue) PushBatch(ctx context.Context, tasks []*Task) error

PushBatch adds multiple tasks

func (*PriorityQueue) Size

func (pq *PriorityQueue) Size(ctx context.Context) (int64, error)

Size returns total queue size

func (*PriorityQueue) Stats

func (pq *PriorityQueue) Stats(ctx context.Context) (*QueueStats, error)

Stats returns queue statistics

type Processor

type Processor struct {
	Name   string         `bson:"name" json:"name"`     // 处理器名称
	Type   string         `bson:"type" json:"type"`     // 处理器类型
	Config map[string]any `bson:"config" json:"config"` // 处理器配置
}

Processor is a data processor (处理器).

type ProxyConfig

type ProxyConfig struct {
	Enabled  bool     `bson:"enabled" json:"enabled"`
	Type     string   `bson:"type" json:"type"` // 代理类型 (http, socks5)
	Rotation bool     `bson:"rotation" json:"rotation"`
	Servers  []string `bson:"servers" json:"servers"`
}

ProxyConfig is the proxy configuration (代理配置).

type Queue

type Queue interface {
	// Push adds a task to the queue
	Push(ctx context.Context, task *Task) error
	// PushBatch adds multiple tasks
	PushBatch(ctx context.Context, tasks []*Task) error
	// Pop retrieves and removes a task from the queue
	Pop(ctx context.Context) (*Task, error)
	// PopN retrieves and removes N tasks from the queue
	PopN(ctx context.Context, n int) ([]*Task, error)
	// Peek retrieves but doesn't remove a task
	Peek(ctx context.Context) (*Task, error)
	// Size returns queue size
	Size(ctx context.Context) (int64, error)
	// Clear removes all tasks
	Clear(ctx context.Context) error
	// Stats returns queue statistics
	Stats(ctx context.Context) (*QueueStats, error)
}

Queue is the interface implemented by task queues.

type QueueStats

type QueueStats struct {
	Total      int64              `json:"total"`
	ByStatus   map[Status]int64   `json:"by_status"`
	ByPriority map[Priority]int64 `json:"by_priority"`
	ByType     map[Type]int64     `json:"by_type"`
	OldestTask time.Time          `json:"oldest_task"`
	NewestTask time.Time          `json:"newest_task"`
}

QueueStats contains queue statistics

type QueueType

type QueueType string

QueueType defines queue types

const (
	QueueTypeMemory QueueType = "memory"
	QueueTypeRedis  QueueType = "redis"
)

type RateLimitConfig

type RateLimitConfig struct {
	Enabled         bool `bson:"enabled" json:"enabled"`
	RequestsPerMin  int  `bson:"requests_per_min" json:"requests_per_min"`
	RequestsPerHour int  `bson:"requests_per_hour" json:"requests_per_hour"`
	BurstSize       int  `bson:"burst_size" json:"burst_size"`
}

RateLimitConfig is the rate-limit configuration (速率限制配置).

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue is a Redis-based distributed queue

func NewRedisQueue

func NewRedisQueue(client *redis.Client, keyPrefix string) *RedisQueue

NewRedisQueue creates a new Redis queue

func (*RedisQueue) Clear

func (rq *RedisQueue) Clear(ctx context.Context) error

Clear removes all tasks

func (*RedisQueue) Peek

func (rq *RedisQueue) Peek(ctx context.Context) (*Task, error)

Peek retrieves, but does not remove, the highest-priority task.

func (*RedisQueue) Pop

func (rq *RedisQueue) Pop(ctx context.Context) (*Task, error)

Pop retrieves and removes the highest-priority task.

func (*RedisQueue) PopN

func (rq *RedisQueue) PopN(ctx context.Context, n int) ([]*Task, error)

PopN retrieves and removes N tasks

func (*RedisQueue) Push

func (rq *RedisQueue) Push(ctx context.Context, task *Task) error

Push adds a task to the Redis queue

func (*RedisQueue) PushBatch

func (rq *RedisQueue) PushBatch(ctx context.Context, tasks []*Task) error

PushBatch adds multiple tasks

func (*RedisQueue) Size

func (rq *RedisQueue) Size(ctx context.Context) (int64, error)

Size returns total queue size

func (*RedisQueue) Stats

func (rq *RedisQueue) Stats(ctx context.Context) (*QueueStats, error)

Stats returns queue statistics

type RedisSource

type RedisSource struct {
	// contains filtered or unexported fields
}

RedisSource loads configurations from Redis

func NewRedisSource

func NewRedisSource(redis *redis.Client, prefix string, logger *slog.Logger) *RedisSource

NewRedisSource creates a new Redis configuration source

func (*RedisSource) Load

func (rs *RedisSource) Load(ctx context.Context) ([]*TaskConfig, error)

Load loads configurations from Redis

func (*RedisSource) Name

func (rs *RedisSource) Name() string

Name returns the source name

type RequestTemplate

type RequestTemplate struct {
	Method      string            `bson:"method" json:"method"`             // 请求方法
	URL         string            `bson:"url" json:"url"`                   // URL模板(支持变量)
	Headers     map[string]string `bson:"headers" json:"headers"`           // 请求头
	Cookies     map[string]string `bson:"cookies" json:"cookies"`           // Cookies
	QueryParams map[string]any    `bson:"query_params" json:"query_params"` // 查询参数
	Body        any               `bson:"body" json:"body"`                 // 请求体(支持模板)
	Auth        AuthConfig        `bson:"auth" json:"auth"`                 // 认证配置
	Proxy       ProxyConfig       `bson:"proxy" json:"proxy"`               // 代理配置
	Variables   []Variable        `bson:"variables" json:"variables"`       // 变量定义
	Pagination  PaginationConfig  `bson:"pagination" json:"pagination"`     // 分页配置
}

RequestTemplate is a request template for generic HTTP, WebSocket, or custom protocols (请求模板 - 通用HTTP/WebSocket/自定义协议).

type Response

type Response struct {
	StatusCode int
	Headers    map[string][]string
	Body       []byte
	URL        string // Final URL after redirects

	// For browser-based fetching
	HTML       string
	Screenshot []byte

	// Metadata
	Duration int64 // Response time in milliseconds
	Size     int64 // Content size in bytes
}

Response represents the fetch response

type RoutingRule

type RoutingRule struct {
	Name      string
	Condition func(*Task) bool
	Target    string // Node ID or tag
	Priority  int
}

RoutingRule defines task routing rules

type Schedule

type Schedule struct {
	ID           string                 `json:"id"`
	Name         string                 `json:"name"`
	Type         ScheduleType           `json:"type"`
	Expression   string                 `json:"expression"` // Cron expression or interval
	TaskTemplate *Task                  `json:"task_template"`
	Enabled      bool                   `json:"enabled"`
	LastRun      *time.Time             `json:"last_run,omitempty"`
	NextRun      *time.Time             `json:"next_run,omitempty"`
	RunCount     int                    `json:"run_count"`
	MaxRuns      int                    `json:"max_runs"` // 0 for unlimited
	Metadata     map[string]interface{} `json:"metadata,omitempty"`
	CreatedAt    time.Time              `json:"created_at"`
	UpdatedAt    time.Time              `json:"updated_at"`
}

Schedule defines a task schedule

type ScheduleBuilder

type ScheduleBuilder struct {
	// contains filtered or unexported fields
}

ScheduleBuilder helps build schedules

func NewScheduleBuilder

func NewScheduleBuilder(name string) *ScheduleBuilder

NewScheduleBuilder creates a new schedule builder

func (*ScheduleBuilder) Build

func (sb *ScheduleBuilder) Build() *Schedule

Build builds the schedule

func (*ScheduleBuilder) WithCron

func (sb *ScheduleBuilder) WithCron(expression string) *ScheduleBuilder

WithCron sets the cron expression.

func (*ScheduleBuilder) WithEvent

func (sb *ScheduleBuilder) WithEvent(eventType string) *ScheduleBuilder

WithEvent sets the event trigger.

func (*ScheduleBuilder) WithInterval

func (sb *ScheduleBuilder) WithInterval(interval time.Duration) *ScheduleBuilder

WithInterval sets the interval.

func (*ScheduleBuilder) WithMaxRuns

func (sb *ScheduleBuilder) WithMaxRuns(maxRuns int) *ScheduleBuilder

WithMaxRuns sets the maximum number of runs.

func (*ScheduleBuilder) WithOnce

func (sb *ScheduleBuilder) WithOnce(at time.Time) *ScheduleBuilder

WithOnce sets one-time execution

func (*ScheduleBuilder) WithTask

func (sb *ScheduleBuilder) WithTask(task *Task) *ScheduleBuilder

WithTask sets the task template.

type ScheduleCond

type ScheduleCond struct {
	Type       string `bson:"type" json:"type"`             // 条件类型
	Expression string `bson:"expression" json:"expression"` // 条件表达式
}

ScheduleCond is a scheduling condition (调度条件).

type ScheduleConfig

type ScheduleConfig struct {
	Type       string         `bson:"type" json:"type"`             // 调度类型 (cron, interval, once, manual)
	Expression string         `bson:"expression" json:"expression"` // Cron表达式或间隔
	Timezone   string         `bson:"timezone" json:"timezone"`     // 时区
	StartTime  *time.Time     `bson:"start_time" json:"start_time"` // 开始时间
	EndTime    *time.Time     `bson:"end_time" json:"end_time"`     // 结束时间
	Conditions []ScheduleCond `bson:"conditions" json:"conditions"` // 执行条件
}

ScheduleConfig is the scheduling configuration (调度配置).

type ScheduleType

type ScheduleType string

ScheduleType defines schedule types

const (
	ScheduleTypeOnce     ScheduleType = "once"
	ScheduleTypeInterval ScheduleType = "interval"
	ScheduleTypeCron     ScheduleType = "cron"
	ScheduleTypeEvent    ScheduleType = "event"
)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages task scheduling

func NewScheduler

func NewScheduler(config *SchedulerConfig) *Scheduler

NewScheduler creates a new scheduler

func (*Scheduler) AddSchedule

func (s *Scheduler) AddSchedule(schedule *Schedule) error

AddSchedule adds a new schedule

func (*Scheduler) DisableSchedule

func (s *Scheduler) DisableSchedule(scheduleID string) error

DisableSchedule disables a schedule

func (*Scheduler) EnableSchedule

func (s *Scheduler) EnableSchedule(scheduleID string) error

EnableSchedule enables a schedule

func (*Scheduler) GetSchedule

func (s *Scheduler) GetSchedule(scheduleID string) (*Schedule, error)

GetSchedule returns a schedule by ID

func (*Scheduler) GetStats

func (s *Scheduler) GetStats() map[string]interface{}

GetStats returns scheduler statistics

func (*Scheduler) ListSchedules

func (s *Scheduler) ListSchedules() []*Schedule

ListSchedules returns all schedules

func (*Scheduler) RemoveSchedule

func (s *Scheduler) RemoveSchedule(scheduleID string) error

RemoveSchedule removes a schedule

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start starts the scheduler

func (*Scheduler) Stop

func (s *Scheduler) Stop() error

Stop stops the scheduler

func (*Scheduler) TriggerEvent

func (s *Scheduler) TriggerEvent(event *Event)

TriggerEvent triggers event-based schedules

func (*Scheduler) TriggerSchedule

func (s *Scheduler) TriggerSchedule(scheduleID string) error

TriggerSchedule manually triggers a schedule

type SchedulerConfig

type SchedulerConfig struct {
	Dispatcher *Dispatcher
	Logger     *slog.Logger
}

SchedulerConfig contains scheduler configuration

type Script

type Script struct {
	Language string `bson:"language" json:"language"` // 脚本语言 (javascript, python, lua)
	Code     string `bson:"code" json:"code"`         // 脚本代码
}

Script is a custom script (脚本).

type ScriptConfig

type ScriptConfig struct {
	ID          string                 `json:"id" yaml:"id"`
	Name        string                 `json:"name" yaml:"name"`
	Description string                 `json:"description" yaml:"description"`
	Language    string                 `json:"language" yaml:"language"` // lua, javascript
	Source      string                 `json:"source" yaml:"source"`     // inline, file, url
	Content     string                 `json:"content" yaml:"content"`
	File        string                 `json:"file" yaml:"file"`
	URL         string                 `json:"url" yaml:"url"`
	Parameters  map[string]interface{} `json:"parameters" yaml:"parameters"`
	Schedule    string                 `json:"schedule" yaml:"schedule"` // Cron expression
}

ScriptConfig represents a script configuration; the Language field selects the script language (lua or javascript).

type SeedExpander

type SeedExpander interface {
	Expand(data []byte) []*Task
}

SeedExpander expands seed data into new tasks.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the task service; it provides complete task-management functionality (任务服务 - 提供任务管理的完整功能).

func NewService

func NewService(db *mongo.Database) *Service

NewService creates a task service (创建任务服务).

func (*Service) CreateTask

func (s *Service) CreateTask(task *TaskDocument) (*TaskDocument, error)

CreateTask creates a task (创建任务).

func (*Service) DeleteTask

func (s *Service) DeleteTask(id string) error

DeleteTask deletes a task (删除任务).

func (*Service) DisableTask

func (s *Service) DisableTask(id string) error

DisableTask disables a task (禁用任务).

func (*Service) EnableTask

func (s *Service) EnableTask(id string) error

EnableTask enables a task (启用任务).

func (*Service) GetReadyTasks

func (s *Service) GetReadyTasks() ([]*TaskDocument, error)

GetReadyTasks returns the tasks that are waiting to be executed (获取待执行的任务).

func (*Service) GetTask

func (s *Service) GetTask(id string) (*TaskDocument, error)

GetTask returns a single task (获取单个任务).

func (*Service) ListTasks

func (s *Service) ListTasks(filter TaskFilter) ([]*TaskDocument, error)

ListTasks lists tasks (列出任务).

func (*Service) UpdateTask

func (s *Service) UpdateTask(id string, update bson.M) error

UpdateTask updates a task (更新任务).

func (*Service) UpdateTaskRaw

func (s *Service) UpdateTaskRaw(id primitive.ObjectID, update interface{}) (*mongo.UpdateResult, error)

UpdateTaskRaw updates a task using a raw update document (使用原始更新文档更新任务).

type Status

type Status string

Status defines task status

const (
	StatusPending   Status = "pending"
	StatusQueued    Status = "queued"
	StatusRunning   Status = "running"
	StatusCompleted Status = "completed"
	StatusFailed    Status = "failed"
	StatusRetrying  Status = "retrying"
	StatusCancelled Status = "cancelled"
)

type Storage

type Storage interface {
	// Save saves the processed data
	Save(ctx context.Context, data interface{}) error

	// SaveBatch saves multiple items
	SaveBatch(ctx context.Context, items []interface{}) error

	// GetType returns the storage type (mongodb, mysql, redis, file, etc.)
	GetType() string

	// Close closes the storage connection
	Close() error
}

Storage defines the interface for data storage

type StorageConfig

type StorageConfig struct {
	Type       string `json:"type"` // mongodb, mysql, redis, file, s3
	Database   string `json:"database,omitempty"`
	Collection string `json:"collection,omitempty"` // For MongoDB
	Table      string `json:"table,omitempty"`      // For SQL databases
	Bucket     string `json:"bucket,omitempty"`     // For S3
	Path       string `json:"path,omitempty"`       // For file storage
	Format     string `json:"format,omitempty"`     // json, csv, parquet

	// Additional configuration
	Options map[string]interface{} `json:"options,omitempty"`
}

StorageConfig defines storage configuration

type StorageConfiguration

type StorageConfiguration struct {
	Targets []StorageTarget `bson:"targets" json:"targets"` // 存储目标
}

StorageConfiguration 存储配置 - 支持多种存储后端

type StorageTarget

type StorageTarget struct {
	Name       string         `bson:"name" json:"name"`             // 目标名称
	Type       string         `bson:"type" json:"type"`             // 存储类型 (mongodb, mysql, redis, elasticsearch, file, s3)
	Purpose    string         `bson:"purpose" json:"purpose"`       // 用途 (primary, backup, cache, archive)
	Config     map[string]any `bson:"config" json:"config"`         // 存储配置
	Conditions []string       `bson:"conditions" json:"conditions"` // 存储条件
}

StorageTarget 存储目标

type Task

type Task struct {
	// Identity
	ID       string `json:"id"`
	ParentID string `json:"parent_id,omitempty"` // MongoDB task ID
	Name     string `json:"name,omitempty"`      // Task name from MongoDB
	URL      string `json:"url"`
	Hash     string `json:"hash"` // URL hash for deduplication
	Type     string `json:"type"` // Changed to string for flexibility
	Priority int    `json:"priority"`
	Status   Status `json:"status"`
	NodeID   string `json:"node_id,omitempty"` // Assigned node

	// Retry management
	RetryCount int           `json:"retry_count"`
	MaxRetries int           `json:"max_retries"`
	RetryDelay time.Duration `json:"retry_delay"`
	LastError  string        `json:"last_error,omitempty"`

	// Timing
	CreatedAt   time.Time  `json:"created_at"`
	QueuedAt    *time.Time `json:"queued_at,omitempty"`
	StartedAt   *time.Time `json:"started_at,omitempty"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
	ExpiresAt   *time.Time `json:"expires_at,omitempty"`

	// Configuration
	Method         string            `json:"method"` // GET, POST, etc.
	Headers        map[string]string `json:"headers,omitempty"`
	Body           []byte            `json:"body,omitempty"`
	Cookies        map[string]string `json:"cookies,omitempty"`
	Proxy          string            `json:"proxy,omitempty"`
	UserAgent      string            `json:"user_agent,omitempty"`
	Timeout        time.Duration     `json:"timeout"`
	FollowRedirect bool              `json:"follow_redirect"`

	// Data extraction
	ExtractRules []ExtractRule `json:"extract_rules,omitempty"`
	Pipeline     Pipeline      `json:"-"`                        // Pipeline interface (not serialized)
	PipelineID   string        `json:"pipeline_id,omitempty"`    // Pipeline identifier for reconstruction
	Storage      Storage       `json:"-"`                        // Storage interface (not serialized)
	StorageConf  StorageConfig `json:"storage_config,omitempty"` // Storage configuration

	// Metadata
	Metadata map[string]interface{} `json:"metadata,omitempty"`
	Tags     []string               `json:"tags,omitempty"`
	Depth    int                    `json:"depth"`            // Crawl depth from seed
	Source   string                 `json:"source,omitempty"` // Source identifier

	// Lua script support
	ProjectID string `json:"project_id,omitempty"` // Project identifier for Lua scripts
	LuaScript string `json:"lua_script,omitempty"` // Lua script name (not path)

	// Callbacks
	OnSuccess  string `json:"on_success,omitempty"`  // Webhook or queue for success
	OnFailure  string `json:"on_failure,omitempty"`  // Webhook or queue for failure
	OnComplete string `json:"on_complete,omitempty"` // Webhook or queue for completion
}

Task represents a crawler task

func FromJSON

func FromJSON(data []byte) (*Task, error)

FromJSON creates task from JSON

func NewSeedTask

func NewSeedTask(url string, extractRules []ExtractRule) *Task

NewSeedTask creates a seed task that generates more tasks

func NewTask

func NewTask(url string, taskType string, priority int) *Task

NewTask creates a new task

func (*Task) AddTag

func (t *Task) AddTag(tag string)

AddTag adds a tag to the task

func (*Task) CanRetry

func (t *Task) CanRetry() bool

CanRetry reports whether the task can be retried.

func (*Task) Cancel

func (t *Task) Cancel()

Cancel marks task as cancelled

func (*Task) Clone

func (t *Task) Clone() *Task

Clone creates a copy of the task

func (*Task) Complete

func (t *Task) Complete()

Complete marks task as completed

func (*Task) Duration

func (t *Task) Duration() time.Duration

Duration returns task execution duration

func (*Task) Fail

func (t *Task) Fail(err error)

Fail marks task as failed

func (*Task) GetCreatedAt

func (t *Task) GetCreatedAt() time.Time

GetCreatedAt returns creation time

func (*Task) GetID

func (t *Task) GetID() string

GetID returns task ID

func (*Task) GetMetadata

func (t *Task) GetMetadata() map[string]interface{}

GetMetadata returns all metadata

func (*Task) GetPriority

func (t *Task) GetPriority() int

GetPriority returns task priority

func (*Task) GetRetryCount

func (t *Task) GetRetryCount() int

GetRetryCount returns the task's retry count.

func (*Task) GetURL

func (t *Task) GetURL() string

GetURL returns task URL

func (*Task) HasTag

func (t *Task) HasTag(tag string) bool

HasTag reports whether the task has the given tag.

func (*Task) IncrementRetry

func (t *Task) IncrementRetry()

IncrementRetry increments the task's retry count.

func (*Task) IsExpired

func (t *Task) IsExpired() bool

IsExpired checks if task has expired

func (*Task) Queue

func (t *Task) Queue()

Queue marks task as queued

func (*Task) SetCookie

func (t *Task) SetCookie(key, value string)

SetCookie sets a single cookie

func (*Task) SetCookies

func (t *Task) SetCookies(cookies map[string]string)

SetCookies sets request cookies

func (*Task) SetError

func (t *Task) SetError(err error)

SetError sets the task's error message and increments the retry count if the task can still be retried.

func (*Task) SetHeader

func (t *Task) SetHeader(key, value string)

SetHeader sets a single header

func (*Task) SetHeaders

func (t *Task) SetHeaders(headers map[string]string)

SetHeaders sets request headers

func (*Task) SetMetadata

func (t *Task) SetMetadata(key string, value interface{})

SetMetadata sets a metadata value

func (*Task) Start

func (t *Task) Start()

Start marks task as started

func (*Task) ToJSON

func (t *Task) ToJSON() ([]byte, error)

ToJSON converts task to JSON

func (*Task) Validate

func (t *Task) Validate() error

Validate validates task configuration

func (*Task) WaitTime

func (t *Task) WaitTime() time.Duration

WaitTime returns the time the task spent waiting in the queue.

type TaskAllocator

type TaskAllocator interface {
	Allocate(ctx context.Context, instance *TaskInstance, nodes []NodeInfo) (*NodeInfo, error)
	Release(ctx context.Context, instance *TaskInstance) error
}

TaskAllocator allocates tasks to nodes

type TaskConfig

type TaskConfig struct {
	Type       string                 `json:"type" yaml:"type"` // definition, template, script
	Definition *TaskDef               `json:"definition" yaml:"definition"`
	Template   *TaskTemplate          `json:"template" yaml:"template"`
	Script     *ScriptConfig          `json:"script" yaml:"script"`
	Metadata   map[string]interface{} `json:"metadata" yaml:"metadata"`
}

TaskConfig represents a task configuration

type TaskConfiguration

type TaskConfiguration struct {
	Priority    int             `bson:"priority" json:"priority"`       // 优先级 (1-10)
	Timeout     int             `bson:"timeout" json:"timeout"`         // 超时时间(秒)
	MaxRetries  int             `bson:"max_retries" json:"max_retries"` // 最大重试次数
	RetryDelay  int             `bson:"retry_delay" json:"retry_delay"` // 重试延迟(秒)
	Concurrency int             `bson:"concurrency" json:"concurrency"` // 并发数
	RateLimit   RateLimitConfig `bson:"rate_limit" json:"rate_limit"`   // 速率限制
	Parameters  map[string]any  `bson:"parameters" json:"parameters"`   // 自定义参数
}

TaskConfiguration 通用任务配置

type TaskDef

type TaskDef struct {
	// Basic info
	ID          string `json:"id"`
	Name        string `json:"name"`
	Version     string `json:"version"`
	Description string `json:"description"`
	Author      string `json:"author"`
	Category    string `json:"category"` // seed, page, api, browser

	// Task type
	Type     Type     `json:"type"`
	Priority Priority `json:"priority"`

	// Request configuration
	Method         string            `json:"method"`
	Headers        map[string]string `json:"headers"`
	DefaultTimeout time.Duration     `json:"default_timeout"`
	MaxRetries     int               `json:"max_retries"`
	RetryInterval  time.Duration     `json:"retry_interval"`

	// Extraction rules
	ExtractRules []ExtractRule `json:"extract_rules"`
	LinkRules    []LinkRule    `json:"link_rules"` // For generating child tasks

	// Processing
	Pipeline      string              `json:"pipeline"`   // Pipeline configuration name
	Processors    []string            `json:"processors"` // Processor names
	Storage       string              `json:"storage"`    // Storage configuration name
	Deduplication DeduplicationConfig `json:"deduplication"`

	// Validation
	Validator TaskValidator `json:"-"`
	Schema    interface{}   `json:"schema"` // JSON schema for validation

	// Hooks
	Hooks TaskHooks `json:"-"`

	// Tags and metadata
	Tags     []string               `json:"tags"`
	Labels   map[string]string      `json:"labels"`
	Metadata map[string]interface{} `json:"metadata"`

	// Status
	Active    bool      `json:"active"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

TaskDef defines a type of crawler task (legacy)

func (*TaskDef) CreateTask

func (td *TaskDef) CreateTask(url string, metadata map[string]interface{}) (*Task, error)

CreateTask creates a task from this definition

func (*TaskDef) GenerateChildTasks

func (td *TaskDef) GenerateChildTasks(parentTask *Task, extractedData []map[string]interface{}) ([]*Task, error)

GenerateChildTasks generates child tasks based on link rules

func (*TaskDef) Validate

func (td *TaskDef) Validate() error

Validate validates the task definition

type TaskDocument

type TaskDocument struct {
	ID          primitive.ObjectID   `bson:"_id,omitempty" json:"id"`
	Name        string               `bson:"name" json:"name"`               // 任务名称
	Type        string               `bson:"type" json:"type"`               // 任务类型 (http, api, websocket, custom)
	Category    string               `bson:"category" json:"category"`       // 任务分类 (sports, news, e-commerce, etc)
	Description string               `bson:"description" json:"description"` // 任务描述
	Version     string               `bson:"version" json:"version"`         // 任务版本
	Status      TaskStatus           `bson:"status" json:"status"`           // 任务状态
	Config      TaskConfiguration    `bson:"config" json:"config"`           // 通用配置
	Schedule    ScheduleConfig       `bson:"schedule" json:"schedule"`       // 调度配置
	Execution   ExecutionConfig      `bson:"execution" json:"execution"`     // 执行配置
	Request     RequestTemplate      `bson:"request" json:"request"`         // 请求模板
	Extraction  ExtractionConfig     `bson:"extraction" json:"extraction"`   // 数据提取配置
	Transform   TransformConfig      `bson:"transform" json:"transform"`     // 数据转换配置
	Validation  ValidationConfig     `bson:"validation" json:"validation"`   // 数据验证配置
	Storage     StorageConfiguration `bson:"storage" json:"storage"`         // 存储配置
	Monitoring  MonitoringConfig     `bson:"monitoring" json:"monitoring"`   // 监控配置
	Metadata    map[string]any       `bson:"metadata" json:"metadata"`       // 元数据
	Tags        []string             `bson:"tags" json:"tags"`               // 标签

	// Lua脚本相关配置
	ProjectID string `bson:"project_id" json:"project_id"` // 项目ID(用于数据分组)
	LuaScript string `bson:"lua_script" json:"lua_script"` // Lua脚本名称标识(如: drip_ground_board)

	CreatedAt time.Time `bson:"created_at" json:"created_at"`
	UpdatedAt time.Time `bson:"updated_at" json:"updated_at"`
	CreatedBy string    `bson:"created_by" json:"created_by"`
	UpdatedBy string    `bson:"updated_by" json:"updated_by"`
}

TaskDocument MongoDB中的任务文档结构 - 完全通用的设计 (TaskDocument is the task document structure stored in MongoDB, a fully generic design.)

type TaskFilter

type TaskFilter struct {
	Type     string
	Category string
	Enabled  *bool
	Tags     []string
	SortBy   string
	SortDesc bool
	Limit    int
	Offset   int
}

TaskFilter 任务过滤器

type TaskHooks

type TaskHooks struct {
	BeforeExecute func(ctx context.Context, task *Task) error
	AfterExecute  func(ctx context.Context, task *Task, result interface{}) error
	OnSuccess     func(ctx context.Context, task *Task, data interface{}) error
	OnFailure     func(ctx context.Context, task *Task, err error) error
	OnRetry       func(ctx context.Context, task *Task, attempt int) error
}

TaskHooks defines lifecycle hooks

type TaskInstance

type TaskInstance struct {
	ID           string    `json:"id"`
	DefinitionID string    `json:"definition_id"`
	Task         *Task     `json:"task"`
	State        TaskState `json:"state"`
	NodeID       string    `json:"node_id"`
	WorkerID     string    `json:"worker_id"`

	// Context
	Context   map[string]interface{} `json:"context"`
	Variables map[string]interface{} `json:"variables"`

	// Timing
	CreatedAt   time.Time  `json:"created_at"`
	StartedAt   *time.Time `json:"started_at"`
	CompletedAt *time.Time `json:"completed_at"`

	// Results
	Result     interface{} `json:"result"`
	Error      string      `json:"error"`
	RetryCount int         `json:"retry_count"`

	// Dependencies
	Dependencies []string `json:"dependencies"`
	DependsOn    []string `json:"depends_on"`

	// Monitoring
	LastHeartbeat time.Time              `json:"last_heartbeat"`
	Progress      float64                `json:"progress"`
	Metrics       map[string]interface{} `json:"metrics"`
}

TaskInstance represents a running task instance

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager manages task instances and their lifecycle

func NewTaskManager

func NewTaskManager(config *ManagerConfig) *TaskManager

NewTaskManager creates a new task manager

func (*TaskManager) AllocateTask

func (tm *TaskManager) AllocateTask(ctx context.Context, instanceID string) error

AllocateTask allocates a task to a node

func (*TaskManager) CreateInstance

func (tm *TaskManager) CreateInstance(definitionID string, params map[string]interface{}) (*TaskInstance, error)

CreateInstance creates a new task instance

func (*TaskManager) CreateInstanceFromTemplate

func (tm *TaskManager) CreateInstanceFromTemplate(templateID string, params map[string]interface{}) (*TaskInstance, error)

CreateInstanceFromTemplate creates instance from template

func (*TaskManager) GetInstance

func (tm *TaskManager) GetInstance(instanceID string) (*TaskInstance, error)

GetInstance retrieves a task instance

func (*TaskManager) GetMetrics

func (tm *TaskManager) GetMetrics() map[string]interface{}

GetMetrics returns task manager metrics

func (*TaskManager) Start

func (tm *TaskManager) Start(ctx context.Context) error

Start starts the task manager

func (*TaskManager) Stop

func (tm *TaskManager) Stop() error

Stop stops the task manager

func (*TaskManager) UpdateProgress

func (tm *TaskManager) UpdateProgress(instanceID string, progress float64, metrics map[string]interface{}) error

UpdateProgress updates task progress

func (*TaskManager) UpdateState

func (tm *TaskManager) UpdateState(instanceID string, state TaskState, metadata map[string]interface{}) error

UpdateState updates task instance state

type TaskRegistry

type TaskRegistry struct {
	// contains filtered or unexported fields
}

TaskRegistry manages task definitions and templates

func NewTaskRegistry

func NewTaskRegistry(redisClient *redis.Client, prefix string, logger *slog.Logger) *TaskRegistry

NewTaskRegistry creates a new task registry

func (*TaskRegistry) CreateTaskFromTemplate

func (tr *TaskRegistry) CreateTaskFromTemplate(templateID string, params map[string]interface{}) (*Task, error)

CreateTaskFromTemplate creates a task from a template

func (*TaskRegistry) GetDefinition

func (tr *TaskRegistry) GetDefinition(id string) (*TaskDef, error)

GetDefinition retrieves a task definition

func (*TaskRegistry) GetTemplate

func (tr *TaskRegistry) GetTemplate(id string) (*TaskTemplate, error)

GetTemplate retrieves a task template

func (*TaskRegistry) GetValidator

func (tr *TaskRegistry) GetValidator(name string) (TaskValidator, error)

GetValidator gets a task validator

func (*TaskRegistry) ListDefinitions

func (tr *TaskRegistry) ListDefinitions() ([]*TaskDef, error)

ListDefinitions lists all task definitions

func (*TaskRegistry) LoadDefinitionsFromRedis

func (tr *TaskRegistry) LoadDefinitionsFromRedis(ctx context.Context) error

LoadDefinitionsFromRedis loads all definitions from Redis

func (*TaskRegistry) LoadTemplatesFromRedis

func (tr *TaskRegistry) LoadTemplatesFromRedis(ctx context.Context) error

LoadTemplatesFromRedis loads all templates from Redis

func (*TaskRegistry) RegisterDefinition

func (tr *TaskRegistry) RegisterDefinition(def *TaskDef) error

RegisterDefinition registers a task definition

func (*TaskRegistry) RegisterTemplate

func (tr *TaskRegistry) RegisterTemplate(tmpl *TaskTemplate) error

RegisterTemplate registers a task template

func (*TaskRegistry) RegisterValidator

func (tr *TaskRegistry) RegisterValidator(name string, validator TaskValidator)

RegisterValidator registers a task validator

type TaskResult

type TaskResult struct {
	TaskID  string                 `json:"task_id"`
	Status  string                 `json:"status"`
	Message string                 `json:"message,omitempty"`
	Data    map[string]interface{} `json:"data,omitempty"`
	Links   []string               `json:"links,omitempty"`
	Error   string                 `json:"error,omitempty"`

	// Metrics
	StartTime      int64 `json:"start_time"`
	EndTime        int64 `json:"end_time"`
	Duration       int64 `json:"duration"` // in milliseconds
	BytesFetched   int64 `json:"bytes_fetched"`
	ItemsExtracted int   `json:"items_extracted"`
}

TaskResult represents task execution result

type TaskRouter

type TaskRouter struct {
	// contains filtered or unexported fields
}

TaskRouter routes tasks based on rules

func (*TaskRouter) AddRule

func (tr *TaskRouter) AddRule(rule RoutingRule)

AddRule adds a routing rule

func (*TaskRouter) Route

func (tr *TaskRouter) Route(task *Task) string

Route determines where to route a task

type TaskScheduler

type TaskScheduler interface {
	// AddTask adds a task to the scheduler
	AddTask(task *Task) error

	// GetNextTask gets the next task to execute
	GetNextTask() (*Task, error)

	// UpdateTask updates task status
	UpdateTask(task *Task) error

	// GetStats returns scheduler statistics
	GetStats() map[string]interface{}
}

TaskScheduler defines the interface for task scheduling

type TaskState

type TaskState string

TaskState represents task execution state

const (
	TaskStateCreated   TaskState = "created"
	TaskStateAllocated TaskState = "allocated"
	TaskStateRunning   TaskState = "running"
	TaskStatePaused    TaskState = "paused"
	TaskStateCompleted TaskState = "completed"
	TaskStateFailed    TaskState = "failed"
	TaskStateCancelled TaskState = "cancelled"
	TaskStateTimeout   TaskState = "timeout"
)

type TaskStatus

type TaskStatus struct {
	Enabled      bool      `bson:"enabled" json:"enabled"`             // 是否启用
	LastRun      time.Time `bson:"last_run" json:"last_run"`           // 最后运行时间
	NextRun      time.Time `bson:"next_run" json:"next_run"`           // 下次运行时间
	RunCount     int64     `bson:"run_count" json:"run_count"`         // 运行次数
	SuccessCount int64     `bson:"success_count" json:"success_count"` // 成功次数
	FailureCount int64     `bson:"failure_count" json:"failure_count"` // 失败次数
	LastSuccess  time.Time `bson:"last_success" json:"last_success"`   // 最后成功时间
	LastFailure  time.Time `bson:"last_failure" json:"last_failure"`   // 最后失败时间
	ErrorMessage string    `bson:"error_message" json:"error_message"` // 错误信息
}

TaskStatus 任务状态

type TaskTemplate

type TaskTemplate struct {
	ID          string `json:"id"`
	Name        string `json:"name"`
	Description string `json:"description"`
	Definition  string `json:"definition"` // Definition ID

	// URL generation
	URLPattern string                 `json:"url_pattern"`
	URLParams  map[string]interface{} `json:"url_params"`

	// Overrides
	Headers    map[string]string      `json:"headers"`
	Metadata   map[string]interface{} `json:"metadata"`
	Priority   *Priority              `json:"priority"`
	MaxRetries *int                   `json:"max_retries"`

	// Schedule
	Schedule string `json:"schedule"` // Cron expression
	Enabled  bool   `json:"enabled"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

TaskTemplate is a template for creating tasks

type TaskValidator

type TaskValidator interface {
	Validate(task *Task) error
	ValidateData(data map[string]interface{}) error
}

TaskValidator validates tasks and the data extracted for them.

type Template

type Template struct {
	Name    string `bson:"name" json:"name"`       // 模板名称
	Content string `bson:"content" json:"content"` // 模板内容
}

Template 模板

type TracingConfig

type TracingConfig struct {
	Enabled     bool    `bson:"enabled" json:"enabled"`
	SampleRate  float64 `bson:"sample_rate" json:"sample_rate"`
	ServiceName string  `bson:"service_name" json:"service_name"`
}

TracingConfig 追踪配置

type Transform

type Transform struct {
	Type   string         `bson:"type" json:"type"`     // 转换类型
	Config map[string]any `bson:"config" json:"config"` // 转换配置
}

Transform 转换规则

type TransformConfig

type TransformConfig struct {
	Enabled    bool        `bson:"enabled" json:"enabled"`
	Processors []Processor `bson:"processors" json:"processors"` // 处理器链
}

TransformConfig 数据转换配置

type TransformRule

type TransformRule struct {
	Type   string                 `json:"type"` // trim, replace, regex, format, split, join
	Params map[string]interface{} `json:"params,omitempty"`
}

TransformRule defines data transformation

type Type

type Type string

Type defines task types

const (
	TypeSeed      Type = "seed"      // Seed URL that generates more tasks
	TypeDetail    Type = "detail"    // Detail page crawling
	TypeList      Type = "list"      // List page crawling
	TypeAPI       Type = "api"       // API endpoint crawling
	TypeBrowser   Type = "browser"   // Browser-based crawling (JS rendering)
	TypeAggregate Type = "aggregate" // Aggregate multiple sources
)

type Validation

type Validation struct {
	Type    string `bson:"type" json:"type"`       // 验证类型
	Field   string `bson:"field" json:"field"`     // 字段
	Rule    string `bson:"rule" json:"rule"`       // 规则表达式
	Message string `bson:"message" json:"message"` // 错误消息
}

Validation 验证规则

type ValidationConfig

type ValidationConfig struct {
	Enabled bool         `bson:"enabled" json:"enabled"`
	Rules   []Validation `bson:"rules" json:"rules"` // 验证规则
}

ValidationConfig 数据验证配置

type Variable

type Variable struct {
	Name       string        `bson:"name" json:"name"`                             // 变量名
	Type       string        `bson:"type" json:"type"`                             // 类型 (string, number, date, array)
	Source     string        `bson:"source" json:"source"`                         // 来源 (env, context, function, static)
	Expression string        `bson:"expression" json:"expression"`                 // 表达式
	Default    any           `bson:"default" json:"default"`                       // 默认值
	Strategy   *DateStrategy `bson:"strategy,omitempty" json:"strategy,omitempty"` // 日期策略(仅当Type为date时使用)
}

Variable 变量定义

type Writer

type Writer interface {
	io.Writer
	io.Closer

	// WriteRecord writes a single record
	WriteRecord(record interface{}) error

	// WriteBatch writes multiple records
	WriteBatch(records []interface{}) error

	// Flush flushes any buffered data
	Flush() error
}

Writer defines the interface for writing output

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL