Documentation
¶
Index ¶
- type Alert
- type AlertingConfig
- type AuthConfig
- type Batch
- type ConfigLoader
- type ConfigSource
- type DateStrategy
- type DeduplicationConfig
- type DefaultSeedExpander
- type DefaultTaskAllocator
- type DelayedQueue
- type DispatchStrategy
- type Dispatcher
- func (d *Dispatcher) GetStats() map[string]interface{}
- func (d *Dispatcher) RegisterNode(node *NodeInfo) error
- func (d *Dispatcher) Start(ctx context.Context) error
- func (d *Dispatcher) Stop() error
- func (d *Dispatcher) Submit(ctx context.Context, task *Task) error
- func (d *Dispatcher) SubmitBatch(ctx context.Context, tasks []*Task) error
- func (d *Dispatcher) UnregisterNode(nodeID string) error
- func (d *Dispatcher) UpdateNodeHeartbeat(nodeID string, load int) error
- type DispatcherConfig
- type ErrorHandling
- type Event
- type ExecutionConfig
- type Executor
- type ExtractRule
- type ExtractionConfig
- type ExtractionRule
- type Extractor
- type Fetcher
- type FileSource
- type HookConfig
- type LinkRule
- type LoaderConfig
- type LoggingConfig
- type ManagerConfig
- type MemoryQueue
- func (mq *MemoryQueue) Clear(ctx context.Context) error
- func (mq *MemoryQueue) Peek(ctx context.Context) (*Task, error)
- func (mq *MemoryQueue) Pop(ctx context.Context) (*Task, error)
- func (mq *MemoryQueue) PopN(ctx context.Context, n int) ([]*Task, error)
- func (mq *MemoryQueue) Push(ctx context.Context, task *Task) error
- func (mq *MemoryQueue) PushBatch(ctx context.Context, tasks []*Task) error
- func (mq *MemoryQueue) Size(ctx context.Context) (int64, error)
- func (mq *MemoryQueue) Stats(ctx context.Context) (*QueueStats, error)
- type MetricsConfig
- type MonitoringConfig
- type NodeInfo
- type NodeManager
- type PaginationConfig
- type Pipeline
- type Priority
- type PriorityQueue
- func (pq *PriorityQueue) Clear(ctx context.Context) error
- func (pq *PriorityQueue) Peek(ctx context.Context) (*Task, error)
- func (pq *PriorityQueue) Pop(ctx context.Context) (*Task, error)
- func (pq *PriorityQueue) PopN(ctx context.Context, n int) ([]*Task, error)
- func (pq *PriorityQueue) Push(ctx context.Context, task *Task) error
- func (pq *PriorityQueue) PushBatch(ctx context.Context, tasks []*Task) error
- func (pq *PriorityQueue) Size(ctx context.Context) (int64, error)
- func (pq *PriorityQueue) Stats(ctx context.Context) (*QueueStats, error)
- type Processor
- type ProxyConfig
- type Queue
- type QueueStats
- type QueueType
- type RateLimitConfig
- type RedisQueue
- func (rq *RedisQueue) Clear(ctx context.Context) error
- func (rq *RedisQueue) Peek(ctx context.Context) (*Task, error)
- func (rq *RedisQueue) Pop(ctx context.Context) (*Task, error)
- func (rq *RedisQueue) PopN(ctx context.Context, n int) ([]*Task, error)
- func (rq *RedisQueue) Push(ctx context.Context, task *Task) error
- func (rq *RedisQueue) PushBatch(ctx context.Context, tasks []*Task) error
- func (rq *RedisQueue) Size(ctx context.Context) (int64, error)
- func (rq *RedisQueue) Stats(ctx context.Context) (*QueueStats, error)
- type RedisSource
- type RequestTemplate
- type Response
- type RoutingRule
- type Schedule
- type ScheduleBuilder
- func (sb *ScheduleBuilder) Build() *Schedule
- func (sb *ScheduleBuilder) WithCron(expression string) *ScheduleBuilder
- func (sb *ScheduleBuilder) WithEvent(eventType string) *ScheduleBuilder
- func (sb *ScheduleBuilder) WithInterval(interval time.Duration) *ScheduleBuilder
- func (sb *ScheduleBuilder) WithMaxRuns(maxRuns int) *ScheduleBuilder
- func (sb *ScheduleBuilder) WithOnce(at time.Time) *ScheduleBuilder
- func (sb *ScheduleBuilder) WithTask(task *Task) *ScheduleBuilder
- type ScheduleCond
- type ScheduleConfig
- type ScheduleType
- type Scheduler
- func (s *Scheduler) AddSchedule(schedule *Schedule) error
- func (s *Scheduler) DisableSchedule(scheduleID string) error
- func (s *Scheduler) EnableSchedule(scheduleID string) error
- func (s *Scheduler) GetSchedule(scheduleID string) (*Schedule, error)
- func (s *Scheduler) GetStats() map[string]interface{}
- func (s *Scheduler) ListSchedules() []*Schedule
- func (s *Scheduler) RemoveSchedule(scheduleID string) error
- func (s *Scheduler) Start(ctx context.Context) error
- func (s *Scheduler) Stop() error
- func (s *Scheduler) TriggerEvent(event *Event)
- func (s *Scheduler) TriggerSchedule(scheduleID string) error
- type SchedulerConfig
- type Script
- type ScriptConfig
- type SeedExpander
- type Service
- func (s *Service) CreateTask(task *TaskDocument) (*TaskDocument, error)
- func (s *Service) DeleteTask(id string) error
- func (s *Service) DisableTask(id string) error
- func (s *Service) EnableTask(id string) error
- func (s *Service) GetReadyTasks() ([]*TaskDocument, error)
- func (s *Service) GetTask(id string) (*TaskDocument, error)
- func (s *Service) ListTasks(filter TaskFilter) ([]*TaskDocument, error)
- func (s *Service) UpdateTask(id string, update bson.M) error
- func (s *Service) UpdateTaskRaw(id primitive.ObjectID, update interface{}) (*mongo.UpdateResult, error)
- type Status
- type Storage
- type StorageConfig
- type StorageConfiguration
- type StorageTarget
- type Task
- func (t *Task) AddTag(tag string)
- func (t *Task) CanRetry() bool
- func (t *Task) Cancel()
- func (t *Task) Clone() *Task
- func (t *Task) Complete()
- func (t *Task) Duration() time.Duration
- func (t *Task) Fail(err error)
- func (t *Task) GetCreatedAt() time.Time
- func (t *Task) GetID() string
- func (t *Task) GetMetadata() map[string]interface{}
- func (t *Task) GetPriority() int
- func (t *Task) GetRetryCount() int
- func (t *Task) GetURL() string
- func (t *Task) HasTag(tag string) bool
- func (t *Task) IncrementRetry()
- func (t *Task) IsExpired() bool
- func (t *Task) Queue()
- func (t *Task) SetCookie(key, value string)
- func (t *Task) SetCookies(cookies map[string]string)
- func (t *Task) SetError(err error)
- func (t *Task) SetHeader(key, value string)
- func (t *Task) SetHeaders(headers map[string]string)
- func (t *Task) SetMetadata(key string, value interface{})
- func (t *Task) Start()
- func (t *Task) ToJSON() ([]byte, error)
- func (t *Task) Validate() error
- func (t *Task) WaitTime() time.Duration
- type TaskAllocator
- type TaskConfig
- type TaskConfiguration
- type TaskDef
- type TaskDocument
- type TaskFilter
- type TaskHooks
- type TaskInstance
- type TaskManager
- func (tm *TaskManager) AllocateTask(ctx context.Context, instanceID string) error
- func (tm *TaskManager) CreateInstance(definitionID string, params map[string]interface{}) (*TaskInstance, error)
- func (tm *TaskManager) CreateInstanceFromTemplate(templateID string, params map[string]interface{}) (*TaskInstance, error)
- func (tm *TaskManager) GetInstance(instanceID string) (*TaskInstance, error)
- func (tm *TaskManager) GetMetrics() map[string]interface{}
- func (tm *TaskManager) Start(ctx context.Context) error
- func (tm *TaskManager) Stop() error
- func (tm *TaskManager) UpdateProgress(instanceID string, progress float64, metrics map[string]interface{}) error
- func (tm *TaskManager) UpdateState(instanceID string, state TaskState, metadata map[string]interface{}) error
- type TaskRegistry
- func (tr *TaskRegistry) CreateTaskFromTemplate(templateID string, params map[string]interface{}) (*Task, error)
- func (tr *TaskRegistry) GetDefinition(id string) (*TaskDef, error)
- func (tr *TaskRegistry) GetTemplate(id string) (*TaskTemplate, error)
- func (tr *TaskRegistry) GetValidator(name string) (TaskValidator, error)
- func (tr *TaskRegistry) ListDefinitions() ([]*TaskDef, error)
- func (tr *TaskRegistry) LoadDefinitionsFromRedis(ctx context.Context) error
- func (tr *TaskRegistry) LoadTemplatesFromRedis(ctx context.Context) error
- func (tr *TaskRegistry) RegisterDefinition(def *TaskDef) error
- func (tr *TaskRegistry) RegisterTemplate(tmpl *TaskTemplate) error
- func (tr *TaskRegistry) RegisterValidator(name string, validator TaskValidator)
- type TaskResult
- type TaskRouter
- type TaskScheduler
- type TaskState
- type TaskStatus
- type TaskTemplate
- type TaskValidator
- type Template
- type TracingConfig
- type Transform
- type TransformConfig
- type TransformRule
- type Type
- type Validation
- type ValidationConfig
- type Variable
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Alert ¶
type Alert struct {
Name string `bson:"name" json:"name"` // 告警名称
Condition string `bson:"condition" json:"condition"` // 触发条件
Channels []string `bson:"channels" json:"channels"` // 通知渠道
}
Alert 告警规则
type AlertingConfig ¶
type AlertingConfig struct {
Enabled bool `bson:"enabled" json:"enabled"`
Rules []Alert `bson:"rules" json:"rules"` // 告警规则
}
AlertingConfig 告警配置
type AuthConfig ¶
type AuthConfig struct {
Type string `bson:"type" json:"type"` // 认证类型 (none, basic, bearer, oauth2, custom)
Config map[string]any `bson:"config" json:"config"` // 认证配置
}
AuthConfig 认证配置
type Batch ¶
type Batch struct {
ID string `json:"id"`
Tasks []*Task `json:"tasks"`
CreatedAt time.Time `json:"created_at"`
Source string `json:"source"`
}
Batch represents a batch of tasks
func NewTaskBatch ¶
NewTaskBatch creates a new task batch
type ConfigLoader ¶
type ConfigLoader struct {
// contains filtered or unexported fields
}
ConfigLoader loads task configurations from various sources
func NewConfigLoader ¶
func NewConfigLoader(config *LoaderConfig) *ConfigLoader
NewConfigLoader creates a new configuration loader
func (*ConfigLoader) AddSource ¶
func (cl *ConfigLoader) AddSource(source ConfigSource)
AddSource adds a configuration source
type ConfigSource ¶
type ConfigSource interface {
Name() string
Load(ctx context.Context) ([]*TaskConfig, error)
}
ConfigSource represents a configuration source
type DateStrategy ¶
type DateStrategy struct {
Mode string `bson:"mode" json:"mode"` // 模式: single(单个日期), range(日期范围), custom(自定义)
Format string `bson:"format" json:"format"` // 日期格式,如 "2006-01-02"
StartDays int `bson:"start_days" json:"start_days"` // 起始天数偏移(相对于今天,可为负数)
EndDays int `bson:"end_days" json:"end_days"` // 结束天数偏移(相对于今天)
DayStep int `bson:"day_step" json:"day_step"` // 步长(默认1,表示每天)
CustomDays []int `bson:"custom_days,omitempty" json:"custom_days,omitempty"` // 自定义天数偏移列表
}
DateStrategy 日期变量策略
type DeduplicationConfig ¶
type DeduplicationConfig struct {
Enabled bool `json:"enabled"`
Strategy string `json:"strategy"` // url, content, custom
Fields []string `json:"fields"` // Fields to use for deduplication
Expiration time.Duration `json:"expiration"`
}
DeduplicationConfig defines deduplication settings
type DefaultSeedExpander ¶
type DefaultSeedExpander struct{}
DefaultSeedExpander is the default seed expander
func NewDefaultSeedExpander ¶
func NewDefaultSeedExpander() *DefaultSeedExpander
NewDefaultSeedExpander creates a default seed expander
func (*DefaultSeedExpander) Expand ¶
func (e *DefaultSeedExpander) Expand(data []byte) []*Task
Expand expands seed data into tasks
type DefaultTaskAllocator ¶
type DefaultTaskAllocator struct {
// contains filtered or unexported fields
}
DefaultTaskAllocator is the default task allocator
func NewDefaultTaskAllocator ¶
func NewDefaultTaskAllocator(strategy string) *DefaultTaskAllocator
NewDefaultTaskAllocator creates a default task allocator
func (*DefaultTaskAllocator) Allocate ¶
func (a *DefaultTaskAllocator) Allocate(ctx context.Context, instance *TaskInstance, nodes []NodeInfo) (*NodeInfo, error)
Allocate allocates a task to a node
func (*DefaultTaskAllocator) Release ¶
func (a *DefaultTaskAllocator) Release(ctx context.Context, instance *TaskInstance) error
Release releases a task allocation
type DelayedQueue ¶
type DelayedQueue struct {
// contains filtered or unexported fields
}
DelayedQueue manages delayed task execution
func NewDelayedQueue ¶
func NewDelayedQueue(queue Queue, client *redis.Client, prefix string) *DelayedQueue
NewDelayedQueue creates a new delayed queue
func (*DelayedQueue) ProcessDelayed ¶
func (dq *DelayedQueue) ProcessDelayed(ctx context.Context) error
ProcessDelayed moves ready tasks to the main queue
func (*DelayedQueue) PushDelayed ¶
PushDelayed adds a task with a delay
func (*DelayedQueue) StartProcessor ¶
func (dq *DelayedQueue) StartProcessor(ctx context.Context, interval time.Duration)
StartProcessor starts the delayed task processor
type DispatchStrategy ¶
type DispatchStrategy string
DispatchStrategy defines how tasks are distributed
const (
	StrategyRoundRobin DispatchStrategy = "round_robin"
	StrategyLeastLoad  DispatchStrategy = "least_load"
	StrategyRandom     DispatchStrategy = "random"
	StrategyHash       DispatchStrategy = "hash"
	StrategySticky     DispatchStrategy = "sticky"
)
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher distributes tasks to worker nodes
func NewDispatcher ¶
func NewDispatcher(config *DispatcherConfig) *Dispatcher
NewDispatcher creates a new task dispatcher
func (*Dispatcher) GetStats ¶
func (d *Dispatcher) GetStats() map[string]interface{}
GetStats returns dispatcher statistics
func (*Dispatcher) RegisterNode ¶
func (d *Dispatcher) RegisterNode(node *NodeInfo) error
RegisterNode registers a worker node
func (*Dispatcher) Start ¶
func (d *Dispatcher) Start(ctx context.Context) error
Start starts the dispatcher
func (*Dispatcher) Submit ¶
func (d *Dispatcher) Submit(ctx context.Context, task *Task) error
Submit submits a task for dispatch
func (*Dispatcher) SubmitBatch ¶
func (d *Dispatcher) SubmitBatch(ctx context.Context, tasks []*Task) error
SubmitBatch submits multiple tasks
func (*Dispatcher) UnregisterNode ¶
func (d *Dispatcher) UnregisterNode(nodeID string) error
UnregisterNode unregisters a worker node
func (*Dispatcher) UpdateNodeHeartbeat ¶
func (d *Dispatcher) UpdateNodeHeartbeat(nodeID string, load int) error
UpdateNodeHeartbeat updates node heartbeat
type DispatcherConfig ¶
type DispatcherConfig struct {
QueueType QueueType
Strategy DispatchStrategy
BatchSize int
MaxRetries int
Timeout time.Duration
RedisClient *redis.Client
RedisPrefix string
Logger *slog.Logger
}
DispatcherConfig contains dispatcher configuration
type ErrorHandling ¶
type ErrorHandling struct {
Strategy string `bson:"strategy" json:"strategy"` // 策略 (retry, skip, fail, notify)
RetryStrategy string `bson:"retry_strategy" json:"retry_strategy"` // 重试策略 (exponential, linear, fixed)
MaxRetries int `bson:"max_retries" json:"max_retries"` // 最大重试次数
NotifyOn []string `bson:"notify_on" json:"notify_on"` // 通知条件
}
ErrorHandling 错误处理配置
type Event ¶
type Event struct {
Type string `json:"type"`
Source string `json:"source"`
Data map[string]interface{} `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
Event represents a trigger event
type ExecutionConfig ¶
type ExecutionConfig struct {
Mode string `bson:"mode" json:"mode"` // 执行模式 (single, batch, stream)
BatchSize int `bson:"batch_size" json:"batch_size"` // 批量大小
Parallelism int `bson:"parallelism" json:"parallelism"` // 并行度
Dependencies []string `bson:"dependencies" json:"dependencies"` // 依赖任务
PreHooks []HookConfig `bson:"pre_hooks" json:"pre_hooks"` // 前置钩子
PostHooks []HookConfig `bson:"post_hooks" json:"post_hooks"` // 后置钩子
ErrorHandling ErrorHandling `bson:"error_handling" json:"error_handling"` // 错误处理
}
ExecutionConfig 执行配置
type Executor ¶
type Executor interface {
// Execute executes the task
Execute(ctx context.Context, task *Task) (*TaskResult, error)
// SetFetcher sets the fetcher
SetFetcher(fetcher Fetcher)
// SetExtractor sets the extractor
SetExtractor(extractor Extractor)
// SetPipeline sets the pipeline
SetPipeline(pipeline Pipeline)
// SetStorage sets the storage
SetStorage(storage Storage)
}
Executor defines the interface for task execution
type ExtractRule ¶
type ExtractRule struct {
Field string `json:"field"`
Selector string `json:"selector"`
Type string `json:"type"` // css, xpath, json, regex
Attribute string `json:"attribute,omitempty"`
Multiple bool `json:"multiple"`
Required bool `json:"required"`
Default interface{} `json:"default,omitempty"`
Transform []TransformRule `json:"transform,omitempty"`
Children []ExtractRule `json:"children,omitempty"` // For nested extraction
}
ExtractRule defines data extraction rules
type ExtractionConfig ¶
type ExtractionConfig struct {
Type string `bson:"type" json:"type"` // 提取类型 (json, html, xml, regex, custom)
Rules []ExtractionRule `bson:"rules" json:"rules"` // 提取规则
Scripts []Script `bson:"scripts" json:"scripts"` // 自定义脚本
Templates []Template `bson:"templates" json:"templates"` // 模板
}
ExtractionConfig 数据提取配置 - 支持任何数据格式
type ExtractionRule ¶
type ExtractionRule struct {
Field string `bson:"field" json:"field"` // 字段名
Path string `bson:"path" json:"path"` // 路径表达式
Type string `bson:"type" json:"type"` // 数据类型
Required bool `bson:"required" json:"required"` // 是否必需
Default any `bson:"default" json:"default"` // 默认值
Transform []Transform `bson:"transform" json:"transform"` // 转换规则
Validation []Validation `bson:"validation" json:"validation"` // 验证规则
Children []ExtractionRule `bson:"children" json:"children"` // 子规则(嵌套结构)
}
ExtractionRule 提取规则
type Extractor ¶
type Extractor interface {
// Extract extracts data from the response
Extract(ctx context.Context, response *Response, rules []ExtractRule) (map[string]interface{}, error)
// ExtractLinks extracts links for further crawling
ExtractLinks(ctx context.Context, response *Response) ([]string, error)
// GetType returns the extractor type (css, xpath, json, regex)
GetType() string
}
Extractor defines the interface for data extraction
type Fetcher ¶
type Fetcher interface {
// Fetch fetches content from the given URL
Fetch(ctx context.Context, task *Task) (*Response, error)
// GetType returns the fetcher type (http, browser, api)
GetType() string
}
Fetcher defines the interface for fetching content
type FileSource ¶
type FileSource struct {
// contains filtered or unexported fields
}
FileSource loads configurations from files
func NewFileSource ¶
func NewFileSource(path string, logger *slog.Logger) *FileSource
NewFileSource creates a new file configuration source
func (*FileSource) Load ¶
func (fs *FileSource) Load(ctx context.Context) ([]*TaskConfig, error)
Load loads configurations from file
type HookConfig ¶
type HookConfig struct {
Type string `bson:"type" json:"type"` // 钩子类型 (http, script, function)
Action string `bson:"action" json:"action"` // 动作
Config map[string]any `bson:"config" json:"config"` // 配置
}
HookConfig 钩子配置
type LinkRule ¶
type LinkRule struct {
Name string `json:"name"`
Selector string `json:"selector"`
Type string `json:"type"` // css, xpath, regex, json
Attribute string `json:"attribute"`
Pattern string `json:"pattern"` // URL pattern to match
TaskType Type `json:"task_type"` // Type for generated tasks
Priority Priority `json:"priority"`
Depth int `json:"depth"` // Max depth for this rule
Metadata interface{} `json:"metadata"` // Metadata to add to generated tasks
}
LinkRule defines rules for extracting and generating child tasks
type LoaderConfig ¶
type LoaderConfig struct {
Registry *TaskRegistry
Manager *TaskManager
Redis *redis.Client
Prefix string
Logger *slog.Logger
ConfigPaths []string
WatchFiles bool
}
LoaderConfig contains configuration loader settings
type LoggingConfig ¶
type LoggingConfig struct {
Level string `bson:"level" json:"level"` // 日志级别
SampleRate float64 `bson:"sample_rate" json:"sample_rate"` // 采样率
}
LoggingConfig 日志配置
type ManagerConfig ¶
type ManagerConfig struct {
Registry *TaskRegistry
Redis *redis.Client
Prefix string
Logger *slog.Logger
Allocator TaskAllocator
NodeManager NodeManager
MaxActiveTasks int
TaskTimeout time.Duration
CleanupInterval time.Duration
StateCheckInterval time.Duration
}
ManagerConfig contains task manager configuration
type MemoryQueue ¶
type MemoryQueue struct {
// contains filtered or unexported fields
}
MemoryQueue is an in-memory task queue
func NewMemoryQueue ¶
func NewMemoryQueue(capacity int) *MemoryQueue
NewMemoryQueue creates a new memory queue
func (*MemoryQueue) Clear ¶
func (mq *MemoryQueue) Clear(ctx context.Context) error
Clear removes all tasks
func (*MemoryQueue) Peek ¶
func (mq *MemoryQueue) Peek(ctx context.Context) (*Task, error)
Peek retrieves but doesn't remove a task
func (*MemoryQueue) Pop ¶
func (mq *MemoryQueue) Pop(ctx context.Context) (*Task, error)
Pop retrieves and removes a task
func (*MemoryQueue) Push ¶
func (mq *MemoryQueue) Push(ctx context.Context, task *Task) error
Push adds a task to the queue
func (*MemoryQueue) PushBatch ¶
func (mq *MemoryQueue) PushBatch(ctx context.Context, tasks []*Task) error
PushBatch adds multiple tasks
func (*MemoryQueue) Size ¶
func (mq *MemoryQueue) Size(ctx context.Context) (int64, error)
Size returns queue size
func (*MemoryQueue) Stats ¶
func (mq *MemoryQueue) Stats(ctx context.Context) (*QueueStats, error)
Stats returns queue statistics
type MetricsConfig ¶
type MetricsConfig struct {
Enabled bool `bson:"enabled" json:"enabled"`
Collectors []string `bson:"collectors" json:"collectors"` // 收集器
Labels map[string]string `bson:"labels" json:"labels"` // 标签
}
MetricsConfig 指标配置
type MonitoringConfig ¶
type MonitoringConfig struct {
Metrics MetricsConfig `bson:"metrics" json:"metrics"` // 指标配置
Logging LoggingConfig `bson:"logging" json:"logging"` // 日志配置
Alerting AlertingConfig `bson:"alerting" json:"alerting"` // 告警配置
Tracing TracingConfig `bson:"tracing" json:"tracing"` // 追踪配置
}
MonitoringConfig 监控配置
type NodeInfo ¶
type NodeInfo struct {
ID string `json:"id"`
Address string `json:"address"`
Capacity int `json:"capacity"`
CurrentLoad int `json:"current_load"`
Status string `json:"status"`
LastHeartbeat time.Time `json:"last_heartbeat"`
Tags []string `json:"tags"`
Capabilities []string `json:"capabilities"`
}
NodeInfo contains worker node information
type NodeManager ¶
type NodeManager interface {
GetAvailableNodes(ctx context.Context) ([]NodeInfo, error)
GetNode(ctx context.Context, nodeID string) (*NodeInfo, error)
UpdateNodeLoad(ctx context.Context, nodeID string, delta int) error
}
NodeManager manages node information
type PaginationConfig ¶
type PaginationConfig struct {
Enabled bool `bson:"enabled" json:"enabled"`
Type string `bson:"type" json:"type"` // 分页类型 (page, offset, cursor, token)
StartParam string `bson:"start_param" json:"start_param"` // 起始参数名
LimitParam string `bson:"limit_param" json:"limit_param"` // 限制参数名
StartValue any `bson:"start_value" json:"start_value"` // 起始值
PageSize int `bson:"page_size" json:"page_size"` // 页大小
MaxPages int `bson:"max_pages" json:"max_pages"` // 最大页数
StopCondition string `bson:"stop_condition" json:"stop_condition"` // 停止条件
}
PaginationConfig 分页配置
type Pipeline ¶
type Pipeline interface {
// Process processes the extracted data through the pipeline
Process(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)
// GetID returns the pipeline identifier
GetID() string
// Validate validates the data against pipeline rules
Validate(data map[string]interface{}) error
}
Pipeline defines the interface for data processing pipeline
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue manages tasks with priority
func NewPriorityQueue ¶
func NewPriorityQueue() *PriorityQueue
NewPriorityQueue creates a new priority queue
func (*PriorityQueue) Clear ¶
func (pq *PriorityQueue) Clear(ctx context.Context) error
Clear removes all tasks
func (*PriorityQueue) Peek ¶
func (pq *PriorityQueue) Peek(ctx context.Context) (*Task, error)
Peek retrieves but doesn't remove the highest priority task
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop(ctx context.Context) (*Task, error)
Pop retrieves and removes the highest priority task
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(ctx context.Context, task *Task) error
Push adds a task to the appropriate priority queue
func (*PriorityQueue) PushBatch ¶
func (pq *PriorityQueue) PushBatch(ctx context.Context, tasks []*Task) error
PushBatch adds multiple tasks
func (*PriorityQueue) Size ¶
func (pq *PriorityQueue) Size(ctx context.Context) (int64, error)
Size returns total queue size
func (*PriorityQueue) Stats ¶
func (pq *PriorityQueue) Stats(ctx context.Context) (*QueueStats, error)
Stats returns queue statistics
type Processor ¶
type Processor struct {
Name string `bson:"name" json:"name"` // 处理器名称
Type string `bson:"type" json:"type"` // 处理器类型
Config map[string]any `bson:"config" json:"config"` // 处理器配置
}
Processor 处理器
type ProxyConfig ¶
type ProxyConfig struct {
Enabled bool `bson:"enabled" json:"enabled"`
Type string `bson:"type" json:"type"` // 代理类型 (http, socks5)
Rotation bool `bson:"rotation" json:"rotation"`
Servers []string `bson:"servers" json:"servers"`
}
ProxyConfig 代理配置
type Queue ¶
type Queue interface {
// Push adds a task to the queue
Push(ctx context.Context, task *Task) error
// PushBatch adds multiple tasks
PushBatch(ctx context.Context, tasks []*Task) error
// Pop retrieves and removes a task from the queue
Pop(ctx context.Context) (*Task, error)
// PopN retrieves and removes N tasks from the queue
PopN(ctx context.Context, n int) ([]*Task, error)
// Peek retrieves but doesn't remove a task
Peek(ctx context.Context) (*Task, error)
// Size returns queue size
Size(ctx context.Context) (int64, error)
// Clear removes all tasks
Clear(ctx context.Context) error
// Stats returns queue statistics
Stats(ctx context.Context) (*QueueStats, error)
}
Queue is the interface implemented by task queues
type QueueStats ¶
type QueueStats struct {
Total int64 `json:"total"`
ByStatus map[Status]int64 `json:"by_status"`
ByPriority map[Priority]int64 `json:"by_priority"`
ByType map[Type]int64 `json:"by_type"`
OldestTask time.Time `json:"oldest_task"`
NewestTask time.Time `json:"newest_task"`
}
QueueStats contains queue statistics
type RateLimitConfig ¶
type RateLimitConfig struct {
Enabled bool `bson:"enabled" json:"enabled"`
RequestsPerMin int `bson:"requests_per_min" json:"requests_per_min"`
RequestsPerHour int `bson:"requests_per_hour" json:"requests_per_hour"`
BurstSize int `bson:"burst_size" json:"burst_size"`
}
RateLimitConfig 速率限制配置
type RedisQueue ¶
type RedisQueue struct {
// contains filtered or unexported fields
}
RedisQueue is a Redis-based distributed queue
func NewRedisQueue ¶
func NewRedisQueue(client *redis.Client, keyPrefix string) *RedisQueue
NewRedisQueue creates a new Redis queue
func (*RedisQueue) Clear ¶
func (rq *RedisQueue) Clear(ctx context.Context) error
Clear removes all tasks
func (*RedisQueue) Peek ¶
func (rq *RedisQueue) Peek(ctx context.Context) (*Task, error)
Peek retrieves but doesn't remove highest priority task
func (*RedisQueue) Pop ¶
func (rq *RedisQueue) Pop(ctx context.Context) (*Task, error)
Pop retrieves and removes highest priority task
func (*RedisQueue) Push ¶
func (rq *RedisQueue) Push(ctx context.Context, task *Task) error
Push adds a task to the Redis queue
func (*RedisQueue) PushBatch ¶
func (rq *RedisQueue) PushBatch(ctx context.Context, tasks []*Task) error
PushBatch adds multiple tasks
func (*RedisQueue) Size ¶
func (rq *RedisQueue) Size(ctx context.Context) (int64, error)
Size returns total queue size
func (*RedisQueue) Stats ¶
func (rq *RedisQueue) Stats(ctx context.Context) (*QueueStats, error)
Stats returns queue statistics
type RedisSource ¶
type RedisSource struct {
// contains filtered or unexported fields
}
RedisSource loads configurations from Redis
func NewRedisSource ¶
NewRedisSource creates a new Redis configuration source
func (*RedisSource) Load ¶
func (rs *RedisSource) Load(ctx context.Context) ([]*TaskConfig, error)
Load loads configurations from Redis
type RequestTemplate ¶
type RequestTemplate struct {
Method string `bson:"method" json:"method"` // 请求方法
URL string `bson:"url" json:"url"` // URL模板(支持变量)
Headers map[string]string `bson:"headers" json:"headers"` // 请求头
Cookies map[string]string `bson:"cookies" json:"cookies"` // Cookies
QueryParams map[string]any `bson:"query_params" json:"query_params"` // 查询参数
Body any `bson:"body" json:"body"` // 请求体(支持模板)
Auth AuthConfig `bson:"auth" json:"auth"` // 认证配置
Proxy ProxyConfig `bson:"proxy" json:"proxy"` // 代理配置
Variables []Variable `bson:"variables" json:"variables"` // 变量定义
Pagination PaginationConfig `bson:"pagination" json:"pagination"` // 分页配置
}
RequestTemplate 请求模板 - 通用HTTP/WebSocket/自定义协议
type Response ¶
type Response struct {
StatusCode int
Headers map[string][]string
Body []byte
URL string // Final URL after redirects
// For browser-based fetching
HTML string
Screenshot []byte
// Metadata
Duration int64 // Response time in milliseconds
Size int64 // Content size in bytes
}
Response represents the fetch response
type RoutingRule ¶
type RoutingRule struct {
Name string
Condition func(*Task) bool
Target string // Node ID or tag
Priority int
}
RoutingRule defines task routing rules
type Schedule ¶
type Schedule struct {
ID string `json:"id"`
Name string `json:"name"`
Type ScheduleType `json:"type"`
Expression string `json:"expression"` // Cron expression or interval
TaskTemplate *Task `json:"task_template"`
Enabled bool `json:"enabled"`
LastRun *time.Time `json:"last_run,omitempty"`
NextRun *time.Time `json:"next_run,omitempty"`
RunCount int `json:"run_count"`
MaxRuns int `json:"max_runs"` // 0 for unlimited
Metadata map[string]interface{} `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
Schedule defines a task schedule
type ScheduleBuilder ¶
type ScheduleBuilder struct {
// contains filtered or unexported fields
}
ScheduleBuilder helps build schedules
func NewScheduleBuilder ¶
func NewScheduleBuilder(name string) *ScheduleBuilder
NewScheduleBuilder creates a new schedule builder
func (*ScheduleBuilder) Build ¶
func (sb *ScheduleBuilder) Build() *Schedule
Build builds the schedule
func (*ScheduleBuilder) WithCron ¶
func (sb *ScheduleBuilder) WithCron(expression string) *ScheduleBuilder
WithCron sets cron expression
func (*ScheduleBuilder) WithEvent ¶
func (sb *ScheduleBuilder) WithEvent(eventType string) *ScheduleBuilder
WithEvent sets event trigger
func (*ScheduleBuilder) WithInterval ¶
func (sb *ScheduleBuilder) WithInterval(interval time.Duration) *ScheduleBuilder
WithInterval sets interval
func (*ScheduleBuilder) WithMaxRuns ¶
func (sb *ScheduleBuilder) WithMaxRuns(maxRuns int) *ScheduleBuilder
WithMaxRuns sets maximum runs
func (*ScheduleBuilder) WithOnce ¶
func (sb *ScheduleBuilder) WithOnce(at time.Time) *ScheduleBuilder
WithOnce sets one-time execution
func (*ScheduleBuilder) WithTask ¶
func (sb *ScheduleBuilder) WithTask(task *Task) *ScheduleBuilder
WithTask sets task template
type ScheduleCond ¶
type ScheduleCond struct {
Type string `bson:"type" json:"type"` // 条件类型
Expression string `bson:"expression" json:"expression"` // 条件表达式
}
ScheduleCond 调度条件
type ScheduleConfig ¶
type ScheduleConfig struct {
Type string `bson:"type" json:"type"` // 调度类型 (cron, interval, once, manual)
Expression string `bson:"expression" json:"expression"` // Cron表达式或间隔
Timezone string `bson:"timezone" json:"timezone"` // 时区
StartTime *time.Time `bson:"start_time" json:"start_time"` // 开始时间
EndTime *time.Time `bson:"end_time" json:"end_time"` // 结束时间
Conditions []ScheduleCond `bson:"conditions" json:"conditions"` // 执行条件
}
ScheduleConfig 调度配置
type ScheduleType ¶
type ScheduleType string
ScheduleType defines schedule types
const (
	ScheduleTypeOnce     ScheduleType = "once"
	ScheduleTypeInterval ScheduleType = "interval"
	ScheduleTypeCron     ScheduleType = "cron"
	ScheduleTypeEvent    ScheduleType = "event"
)
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages task scheduling
func NewScheduler ¶
func NewScheduler(config *SchedulerConfig) *Scheduler
NewScheduler creates a new scheduler
func (*Scheduler) AddSchedule ¶
func (s *Scheduler) AddSchedule(schedule *Schedule) error
AddSchedule adds a new schedule
func (*Scheduler) DisableSchedule ¶
func (s *Scheduler) DisableSchedule(scheduleID string) error
DisableSchedule disables a schedule
func (*Scheduler) EnableSchedule ¶
func (s *Scheduler) EnableSchedule(scheduleID string) error
EnableSchedule enables a schedule
func (*Scheduler) GetSchedule ¶
func (s *Scheduler) GetSchedule(scheduleID string) (*Schedule, error)
GetSchedule returns a schedule by ID
func (*Scheduler) ListSchedules ¶
ListSchedules returns all schedules
func (*Scheduler) RemoveSchedule ¶
RemoveSchedule removes a schedule
func (*Scheduler) TriggerEvent ¶
TriggerEvent triggers event-based schedules
func (*Scheduler) TriggerSchedule ¶
TriggerSchedule manually triggers a schedule
type SchedulerConfig ¶
type SchedulerConfig struct {
Dispatcher *Dispatcher
Logger *slog.Logger
}
SchedulerConfig contains scheduler configuration
type Script ¶
type Script struct {
Language string `bson:"language" json:"language"` // 脚本语言 (javascript, python, lua)
Code string `bson:"code" json:"code"` // 脚本代码
}
Script 脚本
type ScriptConfig ¶
type ScriptConfig struct {
ID string `json:"id" yaml:"id"`
Name string `json:"name" yaml:"name"`
Description string `json:"description" yaml:"description"`
Language string `json:"language" yaml:"language"` // lua, javascript
Source string `json:"source" yaml:"source"` // inline, file, url
Content string `json:"content" yaml:"content"`
File string `json:"file" yaml:"file"`
URL string `json:"url" yaml:"url"`
Parameters map[string]interface{} `json:"parameters" yaml:"parameters"`
Schedule string `json:"schedule" yaml:"schedule"` // Cron expression
}
ScriptConfig represents a script configuration (Lua or JavaScript, as selected by the Language field)
type SeedExpander ¶
SeedExpander expands seed tasks into new tasks
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service 任务服务 - 提供任务管理的完整功能
func (*Service) CreateTask ¶
func (s *Service) CreateTask(task *TaskDocument) (*TaskDocument, error)
CreateTask 创建任务
func (*Service) GetReadyTasks ¶
func (s *Service) GetReadyTasks() ([]*TaskDocument, error)
GetReadyTasks 获取待执行的任务
func (*Service) GetTask ¶
func (s *Service) GetTask(id string) (*TaskDocument, error)
GetTask 获取单个任务
func (*Service) ListTasks ¶
func (s *Service) ListTasks(filter TaskFilter) ([]*TaskDocument, error)
ListTasks 列出任务
func (*Service) UpdateTask ¶
UpdateTask 更新任务
func (*Service) UpdateTaskRaw ¶
func (s *Service) UpdateTaskRaw(id primitive.ObjectID, update interface{}) (*mongo.UpdateResult, error)
UpdateTaskRaw 使用原始更新文档更新任务
type Storage ¶
type Storage interface {
// Save saves the processed data
Save(ctx context.Context, data interface{}) error
// SaveBatch saves multiple items
SaveBatch(ctx context.Context, items []interface{}) error
// GetType returns the storage type (mongodb, mysql, redis, file, etc.)
GetType() string
// Close closes the storage connection
Close() error
}
Storage defines the interface for data storage
type StorageConfig ¶
type StorageConfig struct {
Type string `json:"type"` // mongodb, mysql, redis, file, s3
Database string `json:"database,omitempty"`
Collection string `json:"collection,omitempty"` // For MongoDB
Table string `json:"table,omitempty"` // For SQL databases
Bucket string `json:"bucket,omitempty"` // For S3
Path string `json:"path,omitempty"` // For file storage
Format string `json:"format,omitempty"` // json, csv, parquet
// Additional configuration
Options map[string]interface{} `json:"options,omitempty"`
}
StorageConfig defines storage configuration
type StorageConfiguration ¶
type StorageConfiguration struct {
Targets []StorageTarget `bson:"targets" json:"targets"` // 存储目标
}
StorageConfiguration 存储配置 - 支持多种存储后端
type StorageTarget ¶
type StorageTarget struct {
Name string `bson:"name" json:"name"` // 目标名称
Type string `bson:"type" json:"type"` // 存储类型 (mongodb, mysql, redis, elasticsearch, file, s3)
Purpose string `bson:"purpose" json:"purpose"` // 用途 (primary, backup, cache, archive)
Config map[string]any `bson:"config" json:"config"` // 存储配置
Conditions []string `bson:"conditions" json:"conditions"` // 存储条件
}
StorageTarget 存储目标
type Task ¶
type Task struct {
// Identity
ID string `json:"id"`
ParentID string `json:"parent_id,omitempty"` // MongoDB task ID
Name string `json:"name,omitempty"` // Task name from MongoDB
URL string `json:"url"`
Hash string `json:"hash"` // URL hash for deduplication
Type string `json:"type"` // Changed to string for flexibility
Priority int `json:"priority"`
Status Status `json:"status"`
NodeID string `json:"node_id,omitempty"` // Assigned node
// Retry management
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
RetryDelay time.Duration `json:"retry_delay"`
LastError string `json:"last_error,omitempty"`
// Timing
CreatedAt time.Time `json:"created_at"`
QueuedAt *time.Time `json:"queued_at,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
ExpiresAt *time.Time `json:"expires_at,omitempty"`
// Configuration
Method string `json:"method"` // GET, POST, etc.
Headers map[string]string `json:"headers,omitempty"`
Body []byte `json:"body,omitempty"`
Cookies map[string]string `json:"cookies,omitempty"`
Proxy string `json:"proxy,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
Timeout time.Duration `json:"timeout"`
FollowRedirect bool `json:"follow_redirect"`
// Data extraction
ExtractRules []ExtractRule `json:"extract_rules,omitempty"`
Pipeline Pipeline `json:"-"` // Pipeline interface (not serialized)
PipelineID string `json:"pipeline_id,omitempty"` // Pipeline identifier for reconstruction
Storage Storage `json:"-"` // Storage interface (not serialized)
StorageConf StorageConfig `json:"storage_config,omitempty"` // Storage configuration
// Metadata
Metadata map[string]interface{} `json:"metadata,omitempty"`
Tags []string `json:"tags,omitempty"`
Depth int `json:"depth"` // Crawl depth from seed
Source string `json:"source,omitempty"` // Source identifier
// Lua script support
ProjectID string `json:"project_id,omitempty"` // Project identifier for Lua scripts
LuaScript string `json:"lua_script,omitempty"` // Lua script name (not path)
// Callbacks
OnSuccess string `json:"on_success,omitempty"` // Webhook or queue for success
OnFailure string `json:"on_failure,omitempty"` // Webhook or queue for failure
OnComplete string `json:"on_complete,omitempty"` // Webhook or queue for completion
}
Task represents a crawler task
func NewSeedTask ¶
func NewSeedTask(url string, extractRules []ExtractRule) *Task
NewSeedTask creates a seed task that generates more tasks
func (*Task) GetCreatedAt ¶
GetCreatedAt returns creation time
func (*Task) GetMetadata ¶
GetMetadata returns all metadata
func (*Task) SetCookies ¶
SetCookies sets request cookies
func (*Task) SetHeaders ¶
SetHeaders sets request headers
func (*Task) SetMetadata ¶
SetMetadata sets a metadata value
type TaskAllocator ¶
type TaskAllocator interface {
Allocate(ctx context.Context, instance *TaskInstance, nodes []NodeInfo) (*NodeInfo, error)
Release(ctx context.Context, instance *TaskInstance) error
}
TaskAllocator allocates tasks to nodes
type TaskConfig ¶
type TaskConfig struct {
Type string `json:"type" yaml:"type"` // definition, template, script
Definition *TaskDef `json:"definition" yaml:"definition"`
Template *TaskTemplate `json:"template" yaml:"template"`
Script *ScriptConfig `json:"script" yaml:"script"`
Metadata map[string]interface{} `json:"metadata" yaml:"metadata"`
}
TaskConfig represents a task configuration
type TaskConfiguration ¶
type TaskConfiguration struct {
Priority int `bson:"priority" json:"priority"` // 优先级 (1-10)
Timeout int `bson:"timeout" json:"timeout"` // 超时时间(秒)
MaxRetries int `bson:"max_retries" json:"max_retries"` // 最大重试次数
RetryDelay int `bson:"retry_delay" json:"retry_delay"` // 重试延迟(秒)
Concurrency int `bson:"concurrency" json:"concurrency"` // 并发数
RateLimit RateLimitConfig `bson:"rate_limit" json:"rate_limit"` // 速率限制
Parameters map[string]any `bson:"parameters" json:"parameters"` // 自定义参数
}
TaskConfiguration 通用任务配置
type TaskDef ¶
type TaskDef struct {
// Basic info
ID string `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
Description string `json:"description"`
Author string `json:"author"`
Category string `json:"category"` // seed, page, api, browser
// Task type
Type Type `json:"type"`
Priority Priority `json:"priority"`
// Request configuration
Method string `json:"method"`
Headers map[string]string `json:"headers"`
DefaultTimeout time.Duration `json:"default_timeout"`
MaxRetries int `json:"max_retries"`
RetryInterval time.Duration `json:"retry_interval"`
// Extraction rules
ExtractRules []ExtractRule `json:"extract_rules"`
LinkRules []LinkRule `json:"link_rules"` // For generating child tasks
// Processing
Pipeline string `json:"pipeline"` // Pipeline configuration name
Processors []string `json:"processors"` // Processor names
Storage string `json:"storage"` // Storage configuration name
Deduplication DeduplicationConfig `json:"deduplication"`
// Validation
Validator TaskValidator `json:"-"`
Schema interface{} `json:"schema"` // JSON schema for validation
// Hooks
Hooks TaskHooks `json:"-"`
// Tags and metadata
Tags []string `json:"tags"`
Labels map[string]string `json:"labels"`
Metadata map[string]interface{} `json:"metadata"`
// Status
Active bool `json:"active"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
TaskDef defines a type of crawler task (legacy)
func (*TaskDef) CreateTask ¶
CreateTask creates a task from this definition
type TaskDocument ¶
type TaskDocument struct {
ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
Name string `bson:"name" json:"name"` // 任务名称
Type string `bson:"type" json:"type"` // 任务类型 (http, api, websocket, custom)
Category string `bson:"category" json:"category"` // 任务分类 (sports, news, e-commerce, etc)
Description string `bson:"description" json:"description"` // 任务描述
Version string `bson:"version" json:"version"` // 任务版本
Status TaskStatus `bson:"status" json:"status"` // 任务状态
Config TaskConfiguration `bson:"config" json:"config"` // 通用配置
Schedule ScheduleConfig `bson:"schedule" json:"schedule"` // 调度配置
Execution ExecutionConfig `bson:"execution" json:"execution"` // 执行配置
Request RequestTemplate `bson:"request" json:"request"` // 请求模板
Extraction ExtractionConfig `bson:"extraction" json:"extraction"` // 数据提取配置
Transform TransformConfig `bson:"transform" json:"transform"` // 数据转换配置
Validation ValidationConfig `bson:"validation" json:"validation"` // 数据验证配置
Storage StorageConfiguration `bson:"storage" json:"storage"` // 存储配置
Monitoring MonitoringConfig `bson:"monitoring" json:"monitoring"` // 监控配置
Metadata map[string]any `bson:"metadata" json:"metadata"` // 元数据
Tags []string `bson:"tags" json:"tags"` // 标签
// Lua脚本相关配置
ProjectID string `bson:"project_id" json:"project_id"` // 项目ID(用于数据分组)
LuaScript string `bson:"lua_script" json:"lua_script"` // Lua脚本名称标识(如: drip_ground_board)
CreatedAt time.Time `bson:"created_at" json:"created_at"`
UpdatedAt time.Time `bson:"updated_at" json:"updated_at"`
CreatedBy string `bson:"created_by" json:"created_by"`
UpdatedBy string `bson:"updated_by" json:"updated_by"`
}
TaskDocument MongoDB中的任务文档结构 - 完全通用的设计
type TaskFilter ¶
type TaskFilter struct {
Type string
Category string
Enabled *bool
Tags []string
SortBy string
SortDesc bool
Limit int
Offset int
}
TaskFilter 任务过滤器
type TaskHooks ¶
type TaskHooks struct {
BeforeExecute func(ctx context.Context, task *Task) error
AfterExecute func(ctx context.Context, task *Task, result interface{}) error
OnSuccess func(ctx context.Context, task *Task, data interface{}) error
OnFailure func(ctx context.Context, task *Task, err error) error
OnRetry func(ctx context.Context, task *Task, attempt int) error
}
TaskHooks defines lifecycle hooks
type TaskInstance ¶
type TaskInstance struct {
ID string `json:"id"`
DefinitionID string `json:"definition_id"`
Task *Task `json:"task"`
State TaskState `json:"state"`
NodeID string `json:"node_id"`
WorkerID string `json:"worker_id"`
// Context
Context map[string]interface{} `json:"context"`
Variables map[string]interface{} `json:"variables"`
// Timing
CreatedAt time.Time `json:"created_at"`
StartedAt *time.Time `json:"started_at"`
CompletedAt *time.Time `json:"completed_at"`
// Results
Result interface{} `json:"result"`
Error string `json:"error"`
RetryCount int `json:"retry_count"`
// Dependencies
Dependencies []string `json:"dependencies"`
DependsOn []string `json:"depends_on"`
// Monitoring
LastHeartbeat time.Time `json:"last_heartbeat"`
Progress float64 `json:"progress"`
Metrics map[string]interface{} `json:"metrics"`
}
TaskInstance represents a running task instance
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager manages task instances and their lifecycle
func NewTaskManager ¶
func NewTaskManager(config *ManagerConfig) *TaskManager
NewTaskManager creates a new task manager
func (*TaskManager) AllocateTask ¶
func (tm *TaskManager) AllocateTask(ctx context.Context, instanceID string) error
AllocateTask allocates a task to a node
func (*TaskManager) CreateInstance ¶
func (tm *TaskManager) CreateInstance(definitionID string, params map[string]interface{}) (*TaskInstance, error)
CreateInstance creates a new task instance
func (*TaskManager) CreateInstanceFromTemplate ¶
func (tm *TaskManager) CreateInstanceFromTemplate(templateID string, params map[string]interface{}) (*TaskInstance, error)
CreateInstanceFromTemplate creates instance from template
func (*TaskManager) GetInstance ¶
func (tm *TaskManager) GetInstance(instanceID string) (*TaskInstance, error)
GetInstance retrieves a task instance
func (*TaskManager) GetMetrics ¶
func (tm *TaskManager) GetMetrics() map[string]interface{}
GetMetrics returns task manager metrics
func (*TaskManager) Start ¶
func (tm *TaskManager) Start(ctx context.Context) error
Start starts the task manager
func (*TaskManager) UpdateProgress ¶
func (tm *TaskManager) UpdateProgress(instanceID string, progress float64, metrics map[string]interface{}) error
UpdateProgress updates task progress
func (*TaskManager) UpdateState ¶
func (tm *TaskManager) UpdateState(instanceID string, state TaskState, metadata map[string]interface{}) error
UpdateState updates task instance state
type TaskRegistry ¶
type TaskRegistry struct {
// contains filtered or unexported fields
}
TaskRegistry manages task definitions and templates
func NewTaskRegistry ¶
NewTaskRegistry creates a new task registry
func (*TaskRegistry) CreateTaskFromTemplate ¶
func (tr *TaskRegistry) CreateTaskFromTemplate(templateID string, params map[string]interface{}) (*Task, error)
CreateTaskFromTemplate creates a task from a template
func (*TaskRegistry) GetDefinition ¶
func (tr *TaskRegistry) GetDefinition(id string) (*TaskDef, error)
GetDefinition retrieves a task definition
func (*TaskRegistry) GetTemplate ¶
func (tr *TaskRegistry) GetTemplate(id string) (*TaskTemplate, error)
GetTemplate retrieves a task template
func (*TaskRegistry) GetValidator ¶
func (tr *TaskRegistry) GetValidator(name string) (TaskValidator, error)
GetValidator gets a task validator
func (*TaskRegistry) ListDefinitions ¶
func (tr *TaskRegistry) ListDefinitions() ([]*TaskDef, error)
ListDefinitions lists all task definitions
func (*TaskRegistry) LoadDefinitionsFromRedis ¶
func (tr *TaskRegistry) LoadDefinitionsFromRedis(ctx context.Context) error
LoadDefinitionsFromRedis loads all definitions from Redis
func (*TaskRegistry) LoadTemplatesFromRedis ¶
func (tr *TaskRegistry) LoadTemplatesFromRedis(ctx context.Context) error
LoadTemplatesFromRedis loads all templates from Redis
func (*TaskRegistry) RegisterDefinition ¶
func (tr *TaskRegistry) RegisterDefinition(def *TaskDef) error
RegisterDefinition registers a task definition
func (*TaskRegistry) RegisterTemplate ¶
func (tr *TaskRegistry) RegisterTemplate(tmpl *TaskTemplate) error
RegisterTemplate registers a task template
func (*TaskRegistry) RegisterValidator ¶
func (tr *TaskRegistry) RegisterValidator(name string, validator TaskValidator)
RegisterValidator registers a task validator
type TaskResult ¶
type TaskResult struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
Data map[string]interface{} `json:"data,omitempty"`
Links []string `json:"links,omitempty"`
Error string `json:"error,omitempty"`
// Metrics
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
Duration int64 `json:"duration"` // in milliseconds
BytesFetched int64 `json:"bytes_fetched"`
ItemsExtracted int `json:"items_extracted"`
}
TaskResult represents task execution result
type TaskRouter ¶
type TaskRouter struct {
// contains filtered or unexported fields
}
TaskRouter routes tasks based on rules
func (*TaskRouter) AddRule ¶
func (tr *TaskRouter) AddRule(rule RoutingRule)
AddRule adds a routing rule
func (*TaskRouter) Route ¶
func (tr *TaskRouter) Route(task *Task) string
Route determines where to route a task
type TaskScheduler ¶
type TaskScheduler interface {
// AddTask adds a task to the scheduler
AddTask(task *Task) error
// GetNextTask gets the next task to execute
GetNextTask() (*Task, error)
// UpdateTask updates task status
UpdateTask(task *Task) error
// GetStats returns scheduler statistics
GetStats() map[string]interface{}
}
TaskScheduler defines the interface for task scheduling
type TaskState ¶
type TaskState string
TaskState represents task execution state
const (
	TaskStateCreated   TaskState = "created"
	TaskStateAllocated TaskState = "allocated"
	TaskStateRunning   TaskState = "running"
	TaskStatePaused    TaskState = "paused"
	TaskStateCompleted TaskState = "completed"
	TaskStateFailed    TaskState = "failed"
	TaskStateCancelled TaskState = "cancelled"
	TaskStateTimeout   TaskState = "timeout"
)
type TaskStatus ¶
type TaskStatus struct {
Enabled bool `bson:"enabled" json:"enabled"` // 是否启用
LastRun time.Time `bson:"last_run" json:"last_run"` // 最后运行时间
NextRun time.Time `bson:"next_run" json:"next_run"` // 下次运行时间
RunCount int64 `bson:"run_count" json:"run_count"` // 运行次数
SuccessCount int64 `bson:"success_count" json:"success_count"` // 成功次数
FailureCount int64 `bson:"failure_count" json:"failure_count"` // 失败次数
LastSuccess time.Time `bson:"last_success" json:"last_success"` // 最后成功时间
LastFailure time.Time `bson:"last_failure" json:"last_failure"` // 最后失败时间
ErrorMessage string `bson:"error_message" json:"error_message"` // 错误信息
}
TaskStatus 任务状态
type TaskTemplate ¶
type TaskTemplate struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Definition string `json:"definition"` // Definition ID
// URL generation
URLPattern string `json:"url_pattern"`
URLParams map[string]interface{} `json:"url_params"`
// Overrides
Headers map[string]string `json:"headers"`
Metadata map[string]interface{} `json:"metadata"`
Priority *Priority `json:"priority"`
MaxRetries *int `json:"max_retries"`
// Schedule
Schedule string `json:"schedule"` // Cron expression
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
TaskTemplate is a template for creating tasks
type TaskValidator ¶
type TaskValidator interface {
Validate(task *Task) error
ValidateData(data map[string]interface{}) error
}
TaskValidator validates task data
type Template ¶
type Template struct {
Name string `bson:"name" json:"name"` // 模板名称
Content string `bson:"content" json:"content"` // 模板内容
}
Template 模板
type TracingConfig ¶
type TracingConfig struct {
Enabled bool `bson:"enabled" json:"enabled"`
SampleRate float64 `bson:"sample_rate" json:"sample_rate"`
ServiceName string `bson:"service_name" json:"service_name"`
}
TracingConfig 追踪配置
type Transform ¶
type Transform struct {
Type string `bson:"type" json:"type"` // 转换类型
Config map[string]any `bson:"config" json:"config"` // 转换配置
}
Transform 转换规则
type TransformConfig ¶
type TransformConfig struct {
Enabled bool `bson:"enabled" json:"enabled"`
Processors []Processor `bson:"processors" json:"processors"` // 处理器链
}
TransformConfig 数据转换配置
type TransformRule ¶
type TransformRule struct {
Type string `json:"type"` // trim, replace, regex, format, split, join
Params map[string]interface{} `json:"params,omitempty"`
}
TransformRule defines data transformation
type Type ¶
type Type string
Type defines task types
const (
	TypeSeed      Type = "seed"      // Seed URL that generates more tasks
	TypeDetail    Type = "detail"    // Detail page crawling
	TypeList      Type = "list"      // List page crawling
	TypeAPI       Type = "api"       // API endpoint crawling
	TypeBrowser   Type = "browser"   // Browser-based crawling (JS rendering)
	TypeAggregate Type = "aggregate" // Aggregate multiple sources
)
type Validation ¶
type Validation struct {
Type string `bson:"type" json:"type"` // 验证类型
Field string `bson:"field" json:"field"` // 字段
Rule string `bson:"rule" json:"rule"` // 规则表达式
Message string `bson:"message" json:"message"` // 错误消息
}
Validation 验证规则
type ValidationConfig ¶
type ValidationConfig struct {
Enabled bool `bson:"enabled" json:"enabled"`
Rules []Validation `bson:"rules" json:"rules"` // 验证规则
}
ValidationConfig 数据验证配置
type Variable ¶
type Variable struct {
Name string `bson:"name" json:"name"` // 变量名
Type string `bson:"type" json:"type"` // 类型 (string, number, date, array)
Source string `bson:"source" json:"source"` // 来源 (env, context, function, static)
Expression string `bson:"expression" json:"expression"` // 表达式
Default any `bson:"default" json:"default"` // 默认值
Strategy *DateStrategy `bson:"strategy,omitempty" json:"strategy,omitempty"` // 日期策略(仅当Type为date时使用)
}
Variable 变量定义
type Writer ¶
type Writer interface {
io.Writer
io.Closer
// WriteRecord writes a single record
WriteRecord(record interface{}) error
// WriteBatch writes multiple records
WriteBatch(records []interface{}) error
// Flush flushes any buffered data
Flush() error
}
Writer defines the interface for writing output