core

package
v0.0.0-...-a79db5c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2018 License: BSD-2-Clause Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
// Job state identifiers.
const (
	JOB_STAT_INIT             = "init"        // initial state
	JOB_STAT_QUEUING          = "queuing"     // transition from "init" to "queued"
	JOB_STAT_QUEUED           = "queued"      // all tasks have been added to taskmap
	JOB_STAT_INPROGRESS       = "in-progress" // a first task went into state in-progress
	JOB_STAT_COMPLETED        = "completed"
	JOB_STAT_SUSPEND          = "suspend"
	JOB_STAT_FAILED_PERMANENT = "failed-permanent" // this specific error state can be triggered by the workflow software
	JOB_STAT_DELETED          = "deleted"
)
View Source
// Task state identifiers. The last three entries are marked deprecated by
// their own comments.
const (
	TASK_STAT_INIT             = "init"        // initial state on creation of a task
	TASK_STAT_PENDING          = "pending"     // a task that wants to be enqueued
	TASK_STAT_READY            = "ready"       // a task ready to be enqueued
	TASK_STAT_QUEUED           = "queued"      // a task for which workunits have been created/queued
	TASK_STAT_INPROGRESS       = "in-progress" // a first workunit has been checked out (this does not guarantee a workunit is running right now)
	TASK_STAT_SUSPEND          = "suspend"
	TASK_STAT_FAILED           = "failed"
	TASK_STAT_FAILED_PERMANENT = "failed-permanent" // on exit code 42
	TASK_STAT_COMPLETED        = "completed"
	TASK_STAT_SKIPPED          = "user_skipped" // deprecated
	TASK_STAT_FAIL_SKIP        = "skipped"      // deprecated
	TASK_STAT_PASSED           = "passed"       // deprecated ?
)
View Source
// Task type identifiers. The unknown type is represented by the empty string.
const (
	TASK_TYPE_UNKNOWN  = ""
	TASK_TYPE_SCATTER  = "scatter"
	TASK_TYPE_WORKFLOW = "workflow"
	TASK_TYPE_NORMAL   = "normal"
)
View Source
// Workunit state identifiers. Entries commented "client only" or "proxy only"
// are, per those comments, used only on that side.
const (
	WORK_STAT_INIT             = "init"             // initial state
	WORK_STAT_QUEUED           = "queued"           // after requeue ; after failures below max ; on WorkQueue.Add()
	WORK_STAT_RESERVED         = "reserved"         // short lived state between queued and checkout. when a worker checks the workunit out, the state is reserved.
	WORK_STAT_CHECKOUT         = "checkout"         // normal work checkout ; client registers that already has a workunit (e.g. after reboot of server)
	WORK_STAT_SUSPEND          = "suspend"          // on MAX_FAILURE ; on SuspendJob
	WORK_STAT_FAILED_PERMANENT = "failed-permanent" // app had exit code 42
	WORK_STAT_DONE             = "done"             // client only: done
	WORK_STAT_ERROR            = "fail"             // client only: workunit computation or IO error (variable was renamed to ERROR but not the string fail, to maintain backwards compatibility)
	WORK_STAT_PREPARED         = "prepared"         // client only: after argument parsing
	WORK_STAT_COMPUTED         = "computed"         // client only: after computation is done, before upload
	WORK_STAT_DISCARDED        = "discarded"        // client only: job / task suspended or server UUID changes
	WORK_STAT_PROXYQUEUED      = "proxyqueued"      // proxy only
)

Variables

View Source
// Package-level shared state. Only Service has an initializer here
// ("unknown"); the other variables start at their zero values in this
// declaration.
var (
	QMgr          ResourceMgr
	Service       string = "unknown"
	Self          *Client
	ProxyWorkChan chan bool
	Server_UUID   string
	JM            *JobMap
	Start_time    time.Time
)
View Source
// CGNameRegex matches non-empty strings that consist solely of ASCII letters,
// digits, underscores, hyphens and dots.
var CGNameRegex = regexp.MustCompile(`^[A-Za-z0-9\_\-\.]+$`)
View Source
// DocumentMaxByte is the maximum document size in bytes: 16 * 1024 * 1024,
// matching MongoDB's hard 16 MB limit on document size.
var DocumentMaxByte = 16 * 1024 * 1024

MongoDB has a hard limit of 16 MB on document size.

View Source
// JobInfoIndexes lists the job info fields that are indexed for search.
var JobInfoIndexes = []string{
	"name",
	"submittime",
	"completedtime",
	"pipeline",
	"clientgroups",
	"project",
	"service",
	"user",
	"priority",
	"userattr.submission",
}

indexed info fields for search

Functions

func CWL_input_check

func CWL_input_check(job_input *cwl.Job_document, cwl_workflow *cwl.Workflow) (err error)

func DBGetJobAcl

func DBGetJobAcl(job_id string) (_acl acl.Acl, err error)

func DbFindDistinct

func DbFindDistinct(q bson.M, d string) (results interface{}, err error)

func DbUpdateJobField

func DbUpdateJobField(job_id string, key string, value interface{}) (err error)

func DeleteClientGroup

func DeleteClientGroup(id string) (err error)

func Deserialize_b64

func Deserialize_b64(encoding string, target interface{}) (err error)

func GetAdminView

func GetAdminView(special string) (data []interface{}, err error)

Passes the admin view data request from the job controller through to db.go.

func GetJobCount

func GetJobCount(q bson.M) (count int, err error)

func GetJobIdByTaskId_deprecated

func GetJobIdByTaskId_deprecated(taskid string) (jobid string, err error)

func GetJobIdByWorkId_deprecated

func GetJobIdByWorkId_deprecated(workid string) (jobid string, err error)

func GetTaskIdByWorkId_deprecated

func GetTaskIdByWorkId_deprecated(workid string) (taskid string, err error)

func HasInfoField

func HasInfoField(a string) bool

func InitAwfMgr

func InitAwfMgr()

func InitClientGroupDB

func InitClientGroupDB()

func InitJobDB

func InitJobDB()

func InitProxyWorkChan

func InitProxyWorkChan()

func InitReaper

func InitReaper()

func InitResMgr

func InitResMgr(service string)

func IsFirstTask

func IsFirstTask(taskid string) bool

func IsValidUUID

func IsValidUUID(uuid string) bool

func NotifyWorkunitProcessed

func NotifyWorkunitProcessed(work *Workunit, perf *WorkPerf) (err error)

Functions for REST API communication (=deprecated=). Notifies the AWE server that a workunit has finished with status either "failed" or "done", including perf statistics if the status is "done".

func PushOutputData

func PushOutputData(work *Workunit) (size int64, err error)

deprecated, see cache.UploadOutputData

func ReloadFromDisk

func ReloadFromDisk(path string) (err error)

func RemoveWorkFromClient

func RemoveWorkFromClient(client *Client, workid Workunit_Unique_Identifier) (err error)

func SetClientProfile

func SetClientProfile(profile *Client)

func UpdateJobState_deprecated

func UpdateJobState_deprecated(jobid string, newstate string, oldstates []string) (err error)

Updates the job state to "newstate" only if the current state is one of the "oldstates". TODO: make this a job.SetState function.

Types

type BaseResponse

type BaseResponse struct {
	Status int      `json:"status"`
	Error  []string `json:"error"`
}

type CQMgr

type CQMgr struct {
	// contains filtered or unexported fields
}

this struct is embedded in ServerMgr

func (*CQMgr) AddClient

func (qm *CQMgr) AddClient(client *Client, lock bool) (err error)

func (*CQMgr) CheckClient

func (qm *CQMgr) CheckClient(client *Client) (ok bool, err error)

func (*CQMgr) CheckoutWorkunits

func (qm *CQMgr) CheckoutWorkunits(req_policy string, client_id string, client *Client, available_bytes int64, num int) (workunits []*Workunit, err error)

func (*CQMgr) ClientChecker

func (qm *CQMgr) ClientChecker()

func (*CQMgr) ClientHandle

func (qm *CQMgr) ClientHandle()

func (*CQMgr) ClientHeartBeat

func (qm *CQMgr) ClientHeartBeat(id string, cg *ClientGroup, workerstate WorkerState) (hbmsg HeartbeatInstructions, err error)

func (*CQMgr) ClientStatusChange_deprecated

func (qm *CQMgr) ClientStatusChange_deprecated(client *Client, new_status string, client_write_lock bool) (err error)

func (*CQMgr) DeleteClients

func (qm *CQMgr) DeleteClients(delete_clients []string)

func (*CQMgr) EnqueueWorkunit

func (qm *CQMgr) EnqueueWorkunit(work *Workunit) (err error)

func (*CQMgr) FetchDataToken

func (qm *CQMgr) FetchDataToken(workid string, clientid string) (token string, err error)

func (*CQMgr) GetAllClientsByUser

func (qm *CQMgr) GetAllClientsByUser(u *user.User) (clients []*Client, err error)

func (*CQMgr) GetClient

func (qm *CQMgr) GetClient(id string, lock_clientmap bool) (client *Client, ok bool, err error)

func (*CQMgr) GetClientByUser

func (qm *CQMgr) GetClientByUser(id string, u *user.User) (client *Client, err error)

func (*CQMgr) GetClientMap

func (qm *CQMgr) GetClientMap() *ClientMap

func (*CQMgr) GetWorkById

func (qm *CQMgr) GetWorkById(id Workunit_Unique_Identifier) (workunit *Workunit, err error)

func (*CQMgr) HasClient

func (qm *CQMgr) HasClient(id string, lock_clientmap bool) (has bool, err error)

func (*CQMgr) ListClients

func (qm *CQMgr) ListClients() (ids []string, err error)

func (*CQMgr) NotifyWorkStatus

func (qm *CQMgr) NotifyWorkStatus(notice Notice)

func (*CQMgr) ReQueueWorkunitByClient

func (qm *CQMgr) ReQueueWorkunitByClient(client *Client, client_write_lock bool) (err error)

func (*CQMgr) RegisterNewClient

func (qm *CQMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)

This can be a new client or an old client that re-registers

func (*CQMgr) RemoveClient

func (qm *CQMgr) RemoveClient(id string, lock bool) (err error)

lock is for clientmap

func (*CQMgr) ResumeClient

func (qm *CQMgr) ResumeClient(id string) (err error)

func (*CQMgr) ResumeClientByUser

func (qm *CQMgr) ResumeClientByUser(id string, u *user.User) (err error)

func (*CQMgr) ResumeSuspendedClients

func (qm *CQMgr) ResumeSuspendedClients() (count int, err error)

func (*CQMgr) ResumeSuspendedClientsByUser

func (qm *CQMgr) ResumeSuspendedClientsByUser(u *user.User) (count int)

func (*CQMgr) ShowWorkQueue

func (qm *CQMgr) ShowWorkQueue()

show functions used in debug

func (*CQMgr) ShowWorkunits

func (qm *CQMgr) ShowWorkunits(status string) (workunits []*Workunit, err error)

func (*CQMgr) ShowWorkunitsByUser

func (qm *CQMgr) ShowWorkunitsByUser(status string, u *user.User) (workunits []*Workunit)

func (*CQMgr) SuspendAllClients

func (qm *CQMgr) SuspendAllClients(reason string) (count int, err error)

func (*CQMgr) SuspendAllClientsByUser

func (qm *CQMgr) SuspendAllClientsByUser(u *user.User, reason string) (count int, err error)

func (*CQMgr) SuspendClient

func (qm *CQMgr) SuspendClient(id string, client *Client, reason string, client_write_lock bool) (err error)

use id OR client

func (*CQMgr) SuspendClientByUser

func (qm *CQMgr) SuspendClientByUser(id string, u *user.User, reason string) (err error)

func (*CQMgr) UpdateSubClients

func (qm *CQMgr) UpdateSubClients(id string, count int) (err error)

func (*CQMgr) UpdateSubClientsByUser

func (qm *CQMgr) UpdateSubClientsByUser(id string, count int, u *user.User)

type CWL_workunit

type CWL_workunit struct {
	Job_input          *cwl.Job_document         `bson:"job_input,omitempty" json:"job_input,omitempty" mapstructure:"job_input,omitempty"`
	Job_input_filename string                    `bson:"job_input_filename,omitempty" json:"job_input_filename,omitempty" mapstructure:"job_input_filename,omitempty"`
	CWL_tool           *cwl.CommandLineTool      `bson:"cwl_tool,omitempty" json:"cwl_tool,omitempty" mapstructure:"cwl_tool,omitempty"`
	CWL_tool_filename  string                    `bson:"cwl_tool_filename,omitempty" json:"cwl_tool_filename,omitempty" mapstructure:"cwl_tool_filename,omitempty"`
	Outputs            *cwl.Job_document         `bson:"outputs,omitempty" json:"outputs,omitempty" mapstructure:"outputs,omitempty"`
	OutputsExpected    *[]cwl.WorkflowStepOutput `bson:"outputs_expected,omitempty" json:"outputs_expected,omitempty" mapstructure:"outputs_expected,omitempty"` // this is the subset of outputs that are needed by the workflow
	Notice             `bson:",inline" json:",inline" mapstructure:",squash"`
}

func NewCWL_workunit

func NewCWL_workunit() *CWL_workunit

func NewCWL_workunit_from_interface

func NewCWL_workunit_from_interface(native interface{}) (workunit *CWL_workunit, err error)

type Client

// Client is the worker. It embeds RWMutex (excluded from BSON and JSON) and
// inlines WorkerRuntime and WorkerState into its serialised form. Last_failed
// and Tag are likewise excluded from both BSON and JSON.
type Client struct {
	RWMutex         `bson:"-" json:"-"`
	WorkerRuntime   `bson:",inline" json:",inline"`
	WorkerState     `bson:",inline" json:",inline"`
	RegTime         time.Time     `bson:"regtime" json:"regtime"`
	LastCompleted   time.Time     `bson:"lastcompleted" json:"lastcompleted"` // time at which a job was last completed (can be used to compute idle time)
	Serve_time      string        `bson:"serve_time" json:"serve_time"`
	Total_checkout  int           `bson:"total_checkout" json:"total_checkout"`
	Total_completed int           `bson:"total_completed" json:"total_completed"`
	Total_failed    int           `bson:"total_failed" json:"total_failed"`
	Skip_work       []string      `bson:"skip_work" json:"skip_work"`
	Last_failed     int           `bson:"-" json:"-"`
	Tag             bool          `bson:"-" json:"-"`
	Proxy           bool          `bson:"proxy" json:"proxy"`
	SubClients      int           `bson:"subclients" json:"subclients"`
	Online          bool          `bson:"online" json:"online"`                 // a state
	Suspended       bool          `bson:"suspended" json:"suspended"`           // a state
	Suspend_reason  string        `bson:"suspend_reason" json:"suspend_reason"` // a state
	Status          string        `bson:"Status" json:"Status"`                 // 1) suspended? 2) busy ? 3) online (call is idle) 4) offline
	Assigned_work   *WorkunitList `bson:"assigned_work" json:"assigned_work"`   // this is for exporting into json
	// contains filtered or unexported fields
}

this is the Worker

func NewClient

func NewClient() (client *Client)

func NewProfileClient

func NewProfileClient(filepath string) (client *Client, err error)

create Client object from json file

func (*Client) Add

func (this *Client) Add(workid Workunit_Unique_Identifier) (err error)

func (*Client) Append_Skip_work

func (cl *Client) Append_Skip_work(workid Workunit_Unique_Identifier, write_lock bool) (err error)

func (*Client) Contains_Skip_work_nolock

func (cl *Client) Contains_Skip_work_nolock(workid string) (c bool)

func (*Client) Get_Ack

func (cl *Client) Get_Ack() (ack CoAck, err error)

func (*Client) Get_Busy

func (cl *Client) Get_Busy(do_read_lock bool) (b bool, err error)

func (*Client) Get_Group

func (cl *Client) Get_Group(do_read_lock bool) (g string, err error)

func (*Client) Get_Id

func (cl *Client) Get_Id(do_read_lock bool) (s string, err error)

func (*Client) Get_Last_failed

func (cl *Client) Get_Last_failed() (count int, err error)

func (*Client) Get_New_Status

func (cl *Client) Get_New_Status(do_read_lock bool) (s string, err error)

this function should not be used internally, this is only for backwards-compatibility and human readability

func (*Client) Get_Suspended

func (cl *Client) Get_Suspended(do_read_lock bool) (s bool, err error)

func (*Client) Get_Total_checkout

func (cl *Client) Get_Total_checkout() (count int, err error)

func (*Client) Get_Total_completed

func (cl *Client) Get_Total_completed() (count int, err error)

func (*Client) Get_Total_failed

func (cl *Client) Get_Total_failed() (count int, err error)

func (*Client) Increment_last_failed

func (cl *Client) Increment_last_failed(write_lock bool) (value int, err error)

func (*Client) Increment_total_checkout

func (cl *Client) Increment_total_checkout(err error)

func (*Client) Increment_total_completed

func (cl *Client) Increment_total_completed() (err error)

func (*Client) Increment_total_failed

func (cl *Client) Increment_total_failed(write_lock bool) (err error)

func (*Client) Init

func (client *Client) Init()

invoked by NewClient or manually after unmarshalling

func (*Client) Marshal

func (cl *Client) Marshal() (result []byte, err error)

func (*Client) Resume

func (cl *Client) Resume(write_lock bool) (err error)

func (*Client) Set_Busy

func (cl *Client) Set_Busy(b bool, do_write_lock bool) (err error)

func (*Client) Set_Online

func (cl *Client) Set_Online(o bool, write_lock bool) (err error)

func (*Client) Set_Status_deprecated

func (cl *Client) Set_Status_deprecated(s string, write_lock bool) (err error)

func (*Client) Set_Suspended

func (cl *Client) Set_Suspended(s bool, reason string, write_lock bool) (err error)

func (*Client) Suspend

func (cl *Client) Suspend(reason string, write_lock bool) (err error)

func (*Client) Update_Status

func (cl *Client) Update_Status(write_lock bool) (err error)

type ClientGroup

type ClientGroup struct {
	Id           string                        `bson:"id" json:"id"`
	IP_CIDR      string                        `bson:"ip_cidr" json:"ip_cidr"`
	Name         string                        `bson:"name" json:"name"`
	Token        string                        `bson:"token" json:"token"`
	Acl          clientGroupAcl.ClientGroupAcl `bson:"acl" json:"-"`
	CreatedOn    time.Time                     `bson:"created_on" json:"created_on"`
	Expiration   time.Time                     `bson:"expiration" json:"expiration"`
	LastModified time.Time                     `bson:"last_modified" json:"last_modified"`
}

func CreateClientGroup

func CreateClientGroup(name string, u *user.User) (cg *ClientGroup, err error)

func LoadClientGroup

func LoadClientGroup(id string) (clientgroup *ClientGroup, err error)

func LoadClientGroupByName

func LoadClientGroupByName(name string) (clientgroup *ClientGroup, err error)

func LoadClientGroupByToken

func LoadClientGroupByToken(token string) (clientgroup *ClientGroup, err error)

func (*ClientGroup) Save

func (cg *ClientGroup) Save() (err error)

func (*ClientGroup) SetToken

func (cg *ClientGroup) SetToken()

type ClientGroups

type ClientGroups []ClientGroup

ClientGroup array type

func (*ClientGroups) GetPaginated

func (n *ClientGroups) GetPaginated(q bson.M, limit int, offset int, order string, direction string) (count int, err error)

type ClientMap

type ClientMap struct {
	RWMutex
	// contains filtered or unexported fields
}

func NewClientMap

func NewClientMap() *ClientMap

func (*ClientMap) Add

func (cl *ClientMap) Add(client *Client, lock bool) (err error)

func (*ClientMap) Delete

func (cl *ClientMap) Delete(client_id string, lock bool) (err error)

func (*ClientMap) Get

func (cl *ClientMap) Get(client_id string, lock bool) (client *Client, ok bool, err error)

func (*ClientMap) GetClientIds

func (cl *ClientMap) GetClientIds() (ids []string, err error)

func (*ClientMap) GetClients

func (cl *ClientMap) GetClients() (clients []*Client, err error)

func (*ClientMap) Has

func (cl *ClientMap) Has(client_id string, lock bool) (ok bool, err error)

type ClientMgr

type ClientMgr interface {
	RegisterNewClient(FormFiles, *ClientGroup) (*Client, error)
	ClientHeartBeat(string, *ClientGroup, WorkerState) (HeartbeatInstructions, error)
	GetClient(string, bool) (*Client, bool, error)
	GetClientByUser(string, *user.User) (*Client, error)
	//GetAllClients() []*Client
	GetClientMap() *ClientMap
	GetAllClientsByUser(*user.User) ([]*Client, error)
	//DeleteClient(*Client) error
	//DeleteClientById(string) error
	//DeleteClientByUser(string, *user.User) error
	SuspendClient(string, *Client, string, bool) error
	SuspendClientByUser(string, *user.User, string) error
	ResumeClient(string) error
	ResumeClientByUser(string, *user.User) error
	ResumeSuspendedClients() (int, error)
	ResumeSuspendedClientsByUser(*user.User) int
	SuspendAllClients(string) (int, error)
	SuspendAllClientsByUser(*user.User, string) (int, error)
	ClientChecker()
	UpdateSubClients(string, int) error
	UpdateSubClientsByUser(string, int, *user.User)
}

type ClientWorkMgr

type ClientWorkMgr interface {
	ClientMgr
	WorkMgr
}

type Clients

type Clients []*Client

func (*Clients) RLockRecursive

func (cs *Clients) RLockRecursive()

func (*Clients) RUnlockRecursive

func (cs *Clients) RUnlockRecursive()

type CoAck

type CoAck struct {
	// contains filtered or unexported fields
}

type CoReq

type CoReq struct {
	// contains filtered or unexported fields
}

type Command

// Command describes a command and its arguments, docker image settings and
// environment. ParsedArgs is excluded from BSON, JSON and mapstructure
// decoding; Local carries no struct tags.
type Command struct {
	Name          string   `bson:"name" json:"name" mapstructure:"name"`
	Args          string   `bson:"args" json:"args" mapstructure:"args"`
	ArgsArray     []string `bson:"args_array" json:"args_array" mapstructure:"args_array"`    // use this instead of Args, which is just a string
	Dockerimage   string   `bson:"Dockerimage" json:"Dockerimage" mapstructure:"Dockerimage"` // for Shock (TODO rename this !)
	DockerPull    string   `bson:"dockerPull" json:"dockerPull" mapstructure:"dockerPull"`    // docker pull
	Cmd_script    []string `bson:"cmd_script" json:"cmd_script" mapstructure:"cmd_script"`
	Environ       Envs     `bson:"environ" json:"environ" mapstructure:"environ"`
	HasPrivateEnv bool     `bson:"has_private_env" json:"has_private_env" mapstructure:"has_private_env"`
	Description   string   `bson:"description" json:"description" mapstructure:"description"`
	ParsedArgs    []string `bson:"-" json:"-" mapstructure:"-"`
	Local         bool     // indicates local execution, i.e. working directory is same as current working directory (do not delete !)
}

func NewCommand

func NewCommand(name string) *Command

type Command_p

type Command_p struct {
	Environ *Environ_p `json:"environ"`
}

type Environ_p

type Environ_p struct {
	Private map[string]string `json:"private"`
}

The following special types exist in order to unmarshal the private field Command.Environ.Private; they are kept in this file to reduce confusion.

type Envs

// Envs holds two string-to-string maps, Public and Private. Both are stored
// in BSON, but Private is tagged json:"-" and is therefore never emitted in
// JSON output.
type Envs struct {
	Public  map[string]string `bson:"public" json:"public"`
	Private map[string]string `bson:"private" json:"-"`
}

type Filter_work_stats

type Filter_work_stats struct {
	Total             int
	Skip_work         int
	Wrong_clientgroup int
	Wrong_app         int
}

type FormFile

type FormFile struct {
	Name     string
	Path     string
	Checksum map[string]string
}

type FormFiles

type FormFiles map[string]FormFile

type HeartbeatInstructions

type HeartbeatInstructions map[string]string //map[op]obj1,obj2 e.g. map[discard]=work1,work2

Heartbeat response from awe-server to awe-worker, used to issue operation requests to the client, e.g. to discard suspended workunits.

type Helper

type Helper struct {
	AWE_tasks *map[string]*Task
	// contains filtered or unexported fields
}

type IO

// IO describes a single input or output data item and where it is located
// (host, node, url). Fields tagged json:"-" are omitted from JSON output and
// fields tagged bson:"-" are not stored in BSON.
type IO struct {
	FileName      string                   `bson:"filename" json:"filename" mapstructure:"filename"`
	Name          string                   `bson:"name" json:"name" mapstructure:"name"`  // specifies abstract name of output as defined by the app
	AppPosition   int                      `bson:"appposition" json:"-" mapstructure:"-"` // specifies position in app output array
	Directory     string                   `bson:"directory" json:"directory" mapstructure:"directory"`
	Host          string                   `bson:"host" json:"host" mapstructure:"host"`
	Node          string                   `bson:"node" json:"node" mapstructure:"node"`
	Url           string                   `bson:"url"  json:"url" mapstructure:"url"` // can be shock or any other url
	Size          int64                    `bson:"size" json:"size" mapstructure:"size"`
	MD5           string                   `bson:"md5" json:"-" mapstructure:"-"`
	Cache         bool                     `bson:"cache" json:"cache" mapstructure:"cache"` // indicates that this file is "predata" that needs to be cached
	Origin        string                   `bson:"origin" json:"origin" mapstructure:"origin"`
	Path          string                   `bson:"-" json:"-" mapstructure:"-"`
	Optional      bool                     `bson:"optional" json:"-" mapstructure:"-"`
	Nonzero       bool                     `bson:"nonzero"  json:"nonzero" mapstructure:"nonzero"`
	DataToken     string                   `bson:"datatoken"  json:"-" mapstructure:"-"`
	Intermediate  bool                     `bson:"Intermediate"  json:"-" mapstructure:"-"`
	Temporary     bool                     `bson:"temporary"  json:"temporary" mapstructure:"temporary"`
	ShockFilename string                   `bson:"shockfilename" json:"shockfilename" mapstructure:"shockfilename"`
	ShockIndex    string                   `bson:"shockindex" json:"shockindex" mapstructure:"shockindex"` // on input it indicates that Shock node has to be indexed by AWE server
	AttrFile      string                   `bson:"attrfile" json:"attrfile" mapstructure:"attrfile"`
	NoFile        bool                     `bson:"nofile" json:"nofile" mapstructure:"nofile"`
	Delete        bool                     `bson:"delete" json:"delete" mapstructure:"delete"` // specifies that this is a temporary node, to be deleted from shock on job completion
	Type          string                   `bson:"type" json:"type" mapstructure:"type"`
	NodeAttr      map[string]interface{}   `bson:"nodeattr" json:"nodeattr" mapstructure:"nodeattr"` // specifies attribute data to be stored in shock node (output only)
	FormOptions   map[string]string        `bson:"formoptions" json:"formoptions" mapstructure:"formoptions"`
	Uncompress    string                   `bson:"uncompress" json:"uncompress" mapstructure:"uncompress"` // tells AWE client to uncompress this file, e.g. "gzip"
	Indexes       map[string]shock.IdxInfo `bson:"-" json:"-" mapstructure:"-"`                            // copy of shock node.Indexes
}

func NewIO

func NewIO() *IO

func (*IO) DataUrl

func (io *IO) DataUrl() (dataurl string, err error)

func (*IO) DeleteNode

func (io *IO) DeleteNode() (err error)

func (*IO) GetFileSize

func (io *IO) GetFileSize() (size int64, modified bool, err error)

func (*IO) GetIndexInfo

func (io *IO) GetIndexInfo(indextype string) (idxInfo shock.IdxInfo, hasIndex bool, err error)

func (*IO) GetShockNode

func (io *IO) GetShockNode() (node *shock.ShockNode, err error)

func (*IO) HasFile

func (io *IO) HasFile() bool

func (*IO) TotalUnits

func (io *IO) TotalUnits(indextype string) (count int, err error)

func (*IO) Url2Shock

func (io *IO) Url2Shock() (err error)

type IOmap

type IOmap map[string]*IO // [filename]attributes

Deprecated JobDep struct uses deprecated TaskDep struct which uses the deprecated IOmap. Maintained for backwards compatibility. Jobs that cannot be parsed into the Job struct, but can be parsed into the JobDep struct will be translated to the new Job struct. (=deprecated=)

func NewIOmap

func NewIOmap() IOmap

(=deprecated=)

func (IOmap) Add

func (i IOmap) Add(name string, host string, node string, md5 string, cache bool)

(=deprecated=)

func (IOmap) Find

func (i IOmap) Find(name string) *IO

(=deprecated=)

func (IOmap) Has

func (i IOmap) Has(name string) bool

(=deprecated=)

type Info

type Info struct {
	Name          string                 `bson:"name" json:"name" mapstructure:"name"`
	Xref          string                 `bson:"xref" json:"xref" mapstructure:"xref"`
	Service       string                 `bson:"service" json:"service" mapstructure:"service"`
	Project       string                 `bson:"project" json:"project" mapstructure:"project"`
	User          string                 `bson:"user" json:"user" mapstructure:"user"`
	Pipeline      string                 `bson:"pipeline" json:"pipeline" mapstructure:"pipeline"` // or workflow
	ClientGroups  string                 `bson:"clientgroups" json:"clientgroups" mapstructure:"clientgroups"`
	SubmitTime    time.Time              `bson:"submittime" json:"submittime" mapstructure:"submittime"`
	StartedTime   time.Time              `bson:"startedtime" json:"startedtime" mapstructure:"startedtime"`
	CompletedTime time.Time              `bson:"completedtime" json:"completedtime" mapstructure:"completedtime"`
	Priority      int                    `bson:"priority" json:"priority" mapstructure:"priority"`
	Auth          bool                   `bson:"auth" json:"auth" mapstructure:"auth"`
	DataToken     string                 `bson:"datatoken" json:"-" mapstructure:"-"`
	NoRetry       bool                   `bson:"noretry" json:"noretry" mapstructure:"noretry"`
	UserAttr      map[string]interface{} `bson:"userattr" json:"userattr" mapstructure:"userattr"`
	Description   string                 `bson:"description" json:"description" mapstructure:"description"`
	Tracking      bool                   `bson:"tracking" json:"tracking" mapstructure:"tracking"`
	StartAt       time.Time              `bson:"start_at" json:"start_at" mapstructure:"start_at"` // will start tasks at this timepoint or shortly after
}

job info

func NewInfo

func NewInfo() *Info

func (*Info) SetStartedTime

func (this *Info) SetStartedTime(jobid string, t time.Time) (err error)

type Job

type Job struct {
	JobRaw `bson:",inline"`
	Tasks  []*Task `bson:"tasks" json:"tasks"`
}

func CWL2AWE

func CWL2AWE(_user *user.User, files FormFiles, job_input *cwl.Job_document, cwl_workflow *cwl.Workflow, collection *cwl.CWL_collection) (job *Job, err error)

func CreateJobImport

func CreateJobImport(u *user.User, file FormFile) (job *Job, err error)

func CreateJobUpload

func CreateJobUpload(u *user.User, files FormFiles) (job *Job, err error)

func GetJob

func GetJob(id string) (job *Job, err error)

func JobDepToJob

func JobDepToJob(jobDep *JobDep) (job *Job, err error)

Takes the deprecated (version 1) Job struct and returns the version 2 Job struct or an error

func LoadJob

func LoadJob(id string) (job *Job, err error)

func NewJob

func NewJob() (job *Job)

func ReadJobFile

func ReadJobFile(filename string) (job *Job, err error)

func (*Job) AddTask

func (job *Job) AddTask(task *Task) (err error)

func (*Job) AddWorkflowInstance

func (job *Job) AddWorkflowInstance(id string, inputs cwl.Job_document, remain_tasks int) (err error)

func (*Job) Decrease_WorkflowInstance_RemainTasks

func (job *Job) Decrease_WorkflowInstance_RemainTasks(id string) (remain_tasks int, err error)

func (*Job) Delete

func (job *Job) Delete() (err error)

func (*Job) FilePath

func (job *Job) FilePath() (path string, err error)

func (*Job) GetDataToken

func (job *Job) GetDataToken() (token string)

func (*Job) GetJobLogs

func (job *Job) GetJobLogs() (jlog *JobLog, err error)

func (*Job) GetPrivateEnv

func (job *Job) GetPrivateEnv(taskid string) (env map[string]string, err error)

func (*Job) GetRemainTasks

func (job *Job) GetRemainTasks() (remain_tasks int, err error)

func (*Job) GetState

func (job *Job) GetState(do_lock bool) (state string, err error)

func (*Job) GetTasks

func (job *Job) GetTasks() (tasks []*Task, err error)

func (*Job) GetWorkflowInstance

func (job *Job) GetWorkflowInstance(id string, do_read_lock bool) (wi *WorkflowInstance, err error)

func (*Job) GetWorkflowInstanceIndex

func (job *Job) GetWorkflowInstanceIndex(id string, do_read_lock bool) (index int, err error)

func (*Job) IncrementRemainTasks

func (job *Job) IncrementRemainTasks(inc int) (err error)

func (*Job) IncrementResumed

func (job *Job) IncrementResumed(inc int) (err error)

func (*Job) Init

func (job *Job) Init() (changed bool, err error)

this has to be called after Unmarshalling from JSON

func (*Job) Mkdir

func (job *Job) Mkdir() (err error)

func (*Job) NumTask

func (job *Job) NumTask() int

func (*Job) Path

func (job *Job) Path() (path string, err error)

---Path functions

func (*Job) RLockRecursive

func (job *Job) RLockRecursive()

func (*Job) RUnlockRecursive

func (job *Job) RUnlockRecursive()

func (*Job) Rmdir

func (job *Job) Rmdir() (err error)

func (*Job) Save

func (job *Job) Save() (err error)

func (*Job) SaveToDisk

func (job *Job) SaveToDisk() (err error)

func (*Job) SetClientgroups

func (job *Job) SetClientgroups(clientgroups string) (err error)

func (*Job) SetDataToken

func (job *Job) SetDataToken(token string) (err error)

func (*Job) SetError

func (job *Job) SetError(newError *JobError) (err error)

func (*Job) SetExpiration

func (job *Job) SetExpiration(expire string) (err error)

func (*Job) SetFile

func (job *Job) SetFile(file FormFile) (err error)

func (*Job) SetPipeline

func (job *Job) SetPipeline(pipeline string) (err error)

func (*Job) SetPriority

func (job *Job) SetPriority(priority int) (err error)

func (*Job) SetRemainTasks

func (job *Job) SetRemainTasks(remain_tasks int) (err error)

func (*Job) SetState

func (job *Job) SetState(newState string, oldstates []string) (err error)

func (*Job) Set_WorkflowInstance_Outputs

func (job *Job) Set_WorkflowInstance_Outputs(id string, outputs cwl.Job_document) (err error)

func (*Job) TaskList

func (job *Job) TaskList() []*Task

---Task functions

func (*Job) UpdateFile

func (job *Job) UpdateFile(files FormFiles, field string) (err error)

---Script upload (e.g. field="upload")

type JobDep

// JobDep is the deprecated job representation: the shared JobRaw fields
// (inlined for bson) plus a list of deprecated TaskDep entries.
type JobDep struct {
	JobRaw `bson:",inline"`
	Tasks  []*TaskDep `bson:"tasks" json:"tasks"`
}

Deprecated: JobDep uses the deprecated TaskDep struct, which in turn uses the deprecated IOmap. It is maintained for backwards compatibility: jobs that cannot be parsed into the Job struct, but can be parsed into the JobDep struct, will be translated to the new Job struct. (=deprecated=)

func NewJobDep

func NewJobDep() (job *JobDep)

type JobError

// JobError is the error record attached to a job (JobRaw.Error, JobLog.Error)
// and passed to JobMgr.SuspendJob. Every field is a plain string stored under
// a lowercase bson/json key.
// NOTE(review): ClientFailed/WorkFailed/TaskFailed presumably hold the ids of
// the failing client, workunit and task — confirm against callers.
type JobError struct {
	ClientFailed string `bson:"clientfailed" json:"clientfailed"`
	WorkFailed   string `bson:"workfailed" json:"workfailed"`
	TaskFailed   string `bson:"taskfailed" json:"taskfailed"`
	ServerNotes  string `bson:"servernotes" json:"servernotes"`
	WorkNotes    string `bson:"worknotes" json:"worknotes"`
	AppError     string `bson:"apperror" json:"apperror"`
	Status       string `bson:"status" json:"status"`
}

type JobLog

// JobLog is a log-oriented view of a job: its id, state, last update time,
// error record, resume counter and one TaskLog per task.
type JobLog struct {
	Id         string     `bson:"id" json:"id"`
	State      string     `bson:"state" json:"state"`
	UpdateTime time.Time  `bson:"updatetime" json:"updatetime"`
	Error      *JobError  `bson:"error" json:"error"`
	Resumed    int        `bson:"resumed" json:"resumed"`
	Tasks      []*TaskLog `bson:"tasks" json:"tasks"`
}

type JobMap

// JobMap is a job container that embeds the package's RWMutex; its storage is
// unexported and is reached through the Add, Get, Get_List, Delete and Len
// methods.
type JobMap struct {
	RWMutex
	// contains filtered or unexported fields
}

func NewJobMap

func NewJobMap() (t *JobMap)

func (*JobMap) Add

func (jm *JobMap) Add(job *Job) (err error)

func (*JobMap) Delete

func (jm *JobMap) Delete(jobid string, lock bool) (err error)

func (*JobMap) Get

func (jm *JobMap) Get(jobid string, lock bool) (job *Job, ok bool, err error)

func (*JobMap) Get_List

func (jm *JobMap) Get_List(lock bool) (jobs []*Job, err error)

func (*JobMap) Len

func (jm *JobMap) Len() (length int, err error)

type JobMgr

// JobMgr is the job-management half of the queue manager API (it is embedded
// by ResourceMgr). It covers enqueueing, suspending, resuming, resubmitting,
// recovering, recomputing and deleting jobs, plus helpers that take a
// Workunit_Unique_Identifier for perf finalization, std-log saving and report
// retrieval.
type JobMgr interface {
	EnqueueTasksByJobId(string) error
	GetActiveJobs() map[string]bool
	IsJobRegistered(string) bool
	GetSuspendJobs() map[string]bool
	SuspendJob(string, *JobError) error
	ResumeSuspendedJobByUser(string, *user.User) error
	ResumeSuspendedJobsByUser(*user.User) int
	ResubmitJob(string) error
	DeleteJobByUser(string, *user.User, bool) error
	DeleteSuspendedJobsByUser(*user.User, bool) int
	DeleteZombieJobsByUser(*user.User, bool) int
	RecoverJob(string) error
	RecoverJobs() error
	FinalizeWorkPerf(Workunit_Unique_Identifier, string) error
	SaveStdLog(Workunit_Unique_Identifier, string, string) error
	GetReportMsg(Workunit_Unique_Identifier, string) (string, error)
	RecomputeJob(string, string) error
	UpdateQueueToken(*Job) error
}

type JobMin

// JobMin is a reduced job summary: id, name, size, submit and completion
// times, compute time, Task and State slices, and free-form user attributes.
// NOTE(review): the meaning of the Task ([]int) and State ([]string) entries
// is not visible here — confirm against the code that fills them.
type JobMin struct {
	Id            string                 `bson:"id" json:"id"`
	Name          string                 `bson:"name" json:"name"`
	Size          int64                  `bson:"size" json:"size"`
	SubmitTime    time.Time              `bson:"submittime" json:"submittime"`
	CompletedTime time.Time              `bson:"completedtime" json:"completedtime"`
	ComputeTime   int                    `bson:"computetime" json:"computetime"`
	Task          []int                  `bson:"task" json:"task"`
	State         []string               `bson:"state" json:"state"`
	UserAttr      map[string]interface{} `bson:"userattr" json:"userattr"`
}

type JobPerf

// JobPerf collects timing statistics for one job, together with per-task
// (Ptasks, stored as "task_stats") and per-workunit (Pworks, stored as
// "work_stats") statistics keyed by string.
// NOTE(review): Queued/Start/End look like Unix timestamps — confirm.
type JobPerf struct {
	Id     string               `bson:"id" json:"id"`
	Queued int64                `bson:"queued" json:"queued"`
	Start  int64                `bson:"start" json:"start"`
	End    int64                `bson:"end" json:"end"`
	Resp   int64                `bson:"resp" json:"resp"` // End - Queued
	Ptasks map[string]*TaskPerf `bson:"task_stats" json:"task_stats"`
	Pworks map[string]*WorkPerf `bson:"work_stats" json:"work_stats"`
}

func LoadJobPerf

func LoadJobPerf(id string) (perf *JobPerf, err error)

func NewJobPerf

func NewJobPerf(id string) *JobPerf

type JobRaw

// JobRaw holds the job fields that are independent of the task representation;
// the deprecated JobDep embeds it inline. It embeds RWMutex, so it must be
// passed by pointer.
//
// Fields tagged `bson:"-" json:"-"` (CWL_collection, CWL_workflow,
// WorkflowInstancesMap) are runtime-only and are never persisted or serialized.
//
// The json tags of IsCWL, CWL_objects and CWL_job_input must be properly
// quoted: a tag value without its closing quote is not parseable by
// reflect.StructTag, in which case encoding/json silently falls back to the Go
// field name instead of the intended snake_case key.
type JobRaw struct {
	RWMutex
	Id                   string                       `bson:"id" json:"id"` // uuid
	Acl                  acl.Acl                      `bson:"acl" json:"-"`
	Info                 *Info                        `bson:"info" json:"info"`
	Script               script                       `bson:"script" json:"-"`
	State                string                       `bson:"state" json:"state"`
	Registered           bool                         `bson:"registered" json:"registered"`
	RemainTasks          int                          `bson:"remaintasks" json:"remaintasks"`
	Expiration           time.Time                    `bson:"expiration" json:"expiration"` // 0 means no expiration
	UpdateTime           time.Time                    `bson:"updatetime" json:"updatetime"`
	Error                *JobError                    `bson:"error" json:"error"`         // error struct exists when in suspended state
	Resumed              int                          `bson:"resumed" json:"resumed"`     // number of times the job has been resumed from suspension
	ShockHost            string                       `bson:"shockhost" json:"shockhost"` // this is a fall-back default if not specified at a lower level
	IsCWL                bool                         `bson:"is_cwl" json:"is_cwl"`
	CwlVersion           cwl.CWLVersion               `bson:"cwl_version" json:"cwl_version"`
	CWL_objects          interface{}                  `bson:"cwl_objects" json:"cwl_objects"`
	CWL_job_input        interface{}                  `bson:"cwl_job_input" json:"cwl_job_input"` // has to be an array for mongo (id as key would not work)
	CWL_collection       *cwl.CWL_collection          `bson:"-" json:"-" yaml:"-" mapstructure:"-"`
	CWL_workflow         *cwl.Workflow                `bson:"-" json:"-" yaml:"-" mapstructure:"-"`
	WorkflowInstances    []interface{}                `bson:"workflow_instances" json:"workflow_instances" yaml:"workflow_instances" mapstructure:"workflow_instances"`
	WorkflowInstancesMap map[string]*WorkflowInstance `bson:"-" json:"-" yaml:"-" mapstructure:"-"`
}

func NewJobRaw

func NewJobRaw() (job *JobRaw)

func (*JobRaw) GetId

func (job *JobRaw) GetId(do_read_lock bool) (id string, err error)

type JobReaper

// JobReaper is a stateless type; its work is done by the Handle method.
type JobReaper struct{}

// Ttl is the package-level JobReaper instance. ExpireRegex matches an
// expiration spec consisting of one or more digits followed by exactly one of
// the unit letters M, H or D (nothing before or after).
// NOTE(review): the units are presumably minutes/hours/days — confirm against
// Job.SetExpiration.
var (
	Ttl         *JobReaper
	ExpireRegex = regexp.MustCompile(`^(\d+)(M|H|D)$`)
)

func NewJobReaper

func NewJobReaper() *JobReaper

func (*JobReaper) Handle

func (jr *JobReaper) Handle()

type Job_Acl

// Job_Acl holds only the ACL of a job; the field is stored in bson under "acl"
// and is excluded from JSON output.
type Job_Acl struct {
	Acl acl.Acl `bson:"acl" json:"-"`
}

type Job_p

// Job_p is a JSON-only view of a job that exposes just its tasks as Task_p
// values (each of which carries only a command).
type Job_p struct {
	Tasks []*Task_p `json:"tasks"`
}

type Jobs

// Jobs is a slice of job pointers; its query and locking helpers are defined
// as methods on *Jobs.
type Jobs []*Job

Job array type

func (*Jobs) GetAll

func (n *Jobs) GetAll(q bson.M, order string, direction string, do_init bool) (err error)

func (*Jobs) GetAllLimitOffset

func (n *Jobs) GetAllLimitOffset(q bson.M, limit int, offset int) (err error)

func (*Jobs) GetAllRecent

func (n *Jobs) GetAllRecent(q bson.M, recent int, do_init bool) (count int, err error)

func (*Jobs) GetAllUnsorted

func (n *Jobs) GetAllUnsorted(q bson.M) (err error)

func (*Jobs) GetJobAt

func (n *Jobs) GetJobAt(index int) *Job

func (*Jobs) GetPaginated

func (n *Jobs) GetPaginated(q bson.M, limit int, offset int, order string, direction string, do_init bool) (count int, err error)

func (*Jobs) Init

func (n *Jobs) Init() (changed_count int, err error)

func (*Jobs) Length

func (n *Jobs) Length() int

func (*Jobs) RLockRecursive

func (n *Jobs) RLockRecursive()

func (*Jobs) RUnlockRecursive

func (n *Jobs) RUnlockRecursive()

type Notice

// Notice is the message describing the outcome of a workunit (it is the
// argument of WorkMgr.NotifyWorkStatus and the result of NewNotice).
// Notes and Stderr carry no struct tags, so encoders use their Go field names.
type Notice struct {
	Id          Workunit_Unique_Identifier `bson:"id" json:"id" mapstructure:"id"` // redundant field, for reporting
	WorkerId    string                     `bson:"worker_id" json:"worker_id" mapstructure:"worker_id"`
	Results     *cwl.Job_document          `bson:"results" json:"results" mapstructure:"results"`                            // subset of tool_results with Shock URLs
	Status      string                     `bson:"status,omitempty" json:"status,omitempty" mapstructure:"status,omitempty"` // this is redundant as workunit already has state, but this is only used for transfer
	ComputeTime int                        `bson:"computetime,omitempty" json:"computetime,omitempty" mapstructure:"computetime,omitempty"`
	Notes       string
	Stderr      string
}

func NewNotice

func NewNotice(native interface{}) (workunit_result *Notice, err error)

type PartInfo

// PartInfo is the partition description of a task (TaskRaw.Partition): the
// input and index names, the total index count and the maximum part size in
// MB. Options is persisted in bson but hidden from JSON and mapstructure.
// NOTE(review): Input presumably names the task input to be partitioned
// ('partinfo.input' on POST) — confirm against Task.InitPartIndex.
type PartInfo struct {
	Input         string `bson:"input" json:"input" mapstructure:"input"`
	Index         string `bson:"index" json:"index" mapstructure:"index"`
	TotalIndex    int    `bson:"totalindex" json:"totalindex" mapstructure:"totalindex"`
	MaxPartSizeMB int    `bson:"maxpartsize_mb" json:"maxpartsize_mb" mapstructure:"maxpartsize_mb"`
	Options       string `bson:"options" json:"-" mapstructure:"-"`
}

type Pipeline

// Pipeline bundles an Info with a list of tasks.
// NOTE(review): Tasks holds Task values, not pointers, and Task embeds TaskRaw
// which embeds RWMutex; copying elements therefore copies the lock — confirm
// this is intended.
type Pipeline struct {
	Info  *Info  `bson:"info" json:"info"`
	Tasks []Task `bson:"tasks" json:"tasks"`
}

func NewPipeline

func NewPipeline() *Pipeline

type ProxyMgr

// ProxyMgr is a queue manager built solely on the embedded CQMgr.
// NOTE(review): its FinalizeWorkPerf, SaveStdLog, GetReportMsg, FetchDataToken
// and FetchPrivateEnv methods are declared with string / *Workunit parameters,
// whereas JobMgr and WorkMgr declare Workunit_Unique_Identifier in those
// positions; verify whether ProxyMgr is still meant to satisfy ResourceMgr.
type ProxyMgr struct {
	CQMgr
}

func NewProxyMgr

func NewProxyMgr() *ProxyMgr

func (*ProxyMgr) ClientChecker

func (qm *ProxyMgr) ClientChecker()

func (*ProxyMgr) ClientHandle

func (qm *ProxyMgr) ClientHandle()

func (*ProxyMgr) DeleteJobByUser

func (qm *ProxyMgr) DeleteJobByUser(jobid string, u *user.User, full bool) (err error)

func (*ProxyMgr) DeleteSuspendedJobsByUser

func (qm *ProxyMgr) DeleteSuspendedJobsByUser(u *user.User, full bool) (num int)

func (*ProxyMgr) DeleteZombieJobsByUser

func (qm *ProxyMgr) DeleteZombieJobsByUser(u *user.User, full bool) (num int)

func (*ProxyMgr) EnqueueTasksByJobId

func (qm *ProxyMgr) EnqueueTasksByJobId(jobid string) (err error)

func (*ProxyMgr) FetchDataToken

func (qm *ProxyMgr) FetchDataToken(workunit *Workunit, clientid string) (token string, err error)

func (*ProxyMgr) FetchPrivateEnv

func (qm *ProxyMgr) FetchPrivateEnv(workid string, clientid string) (env map[string]string, err error)

func (*ProxyMgr) FinalizeWorkPerf

func (qm *ProxyMgr) FinalizeWorkPerf(string, string) (err error)

func (*ProxyMgr) GetActiveJobs

func (qm *ProxyMgr) GetActiveJobs() map[string]bool

func (*ProxyMgr) GetJsonStatus

func (qm *ProxyMgr) GetJsonStatus() (status map[string]map[string]int, err error)

func (*ProxyMgr) GetQueue

func (qm *ProxyMgr) GetQueue(name string) interface{}

func (*ProxyMgr) GetReportMsg

func (qm *ProxyMgr) GetReportMsg(string, string) (report string, err error)

func (*ProxyMgr) GetSuspendJobs

func (qm *ProxyMgr) GetSuspendJobs() map[string]bool

func (*ProxyMgr) GetTextStatus

func (qm *ProxyMgr) GetTextStatus() string

func (*ProxyMgr) IsJobRegistered

func (qm *ProxyMgr) IsJobRegistered(id string) bool

func (*ProxyMgr) JobRegister

func (qm *ProxyMgr) JobRegister() (jid string, err error)

func (*ProxyMgr) Lock

func (qm *ProxyMgr) Lock()

func (*ProxyMgr) NoticeHandle

func (qm *ProxyMgr) NoticeHandle()

func (*ProxyMgr) QueueStatus

func (qm *ProxyMgr) QueueStatus() string

func (*ProxyMgr) RLock

func (qm *ProxyMgr) RLock()

func (*ProxyMgr) RUnlock

func (qm *ProxyMgr) RUnlock()

func (*ProxyMgr) RecomputeJob

func (qm *ProxyMgr) RecomputeJob(jobid string, stage string) (err error)

recompute jobs from specified task stage

func (*ProxyMgr) RecoverJob

func (qm *ProxyMgr) RecoverJob(id string) (err error)

recover job not in queue

func (*ProxyMgr) RecoverJobs

func (qm *ProxyMgr) RecoverJobs() (err error)

recover jobs not completed before awe-server restarts

func (*ProxyMgr) RegisterNewClient

func (qm *ProxyMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)

func (*ProxyMgr) ResubmitJob

func (qm *ProxyMgr) ResubmitJob(id string) (err error)

re-submit a job in db but not in the queue (caused by server restarting)

func (*ProxyMgr) ResumeQueue

func (qm *ProxyMgr) ResumeQueue()

func (*ProxyMgr) ResumeSuspendedJobByUser

func (qm *ProxyMgr) ResumeSuspendedJobByUser(id string, u *user.User) (err error)

resubmit a suspended job if user has rights

func (*ProxyMgr) ResumeSuspendedJobsByUser

func (qm *ProxyMgr) ResumeSuspendedJobsByUser(u *user.User) (num int)

func (*ProxyMgr) SaveStdLog

func (qm *ProxyMgr) SaveStdLog(string, string, string) (err error)

func (*ProxyMgr) SuspendJob

func (qm *ProxyMgr) SuspendJob(jobid string, jerror *JobError) (err error)

func (*ProxyMgr) SuspendQueue

func (qm *ProxyMgr) SuspendQueue()

func (*ProxyMgr) TaskHandle

func (qm *ProxyMgr) TaskHandle()

func (*ProxyMgr) Unlock

func (qm *ProxyMgr) Unlock()

func (*ProxyMgr) UpdateQueueLoop

func (qm *ProxyMgr) UpdateQueueLoop()

func (*ProxyMgr) UpdateQueueToken

func (qm *ProxyMgr) UpdateQueueToken(job *Job) (err error)

type RWMutex

// RWMutex is the package's own read/write lock type, embedded by JobRaw,
// TaskRaw, JobMap and TaskMap. Besides Lock/Unlock and RLock/RUnlock it offers
// named variants (LockNamed, RLockNamed/RUnlockNamed) and RCount/RList
// accessors. Name is excluded from bson and JSON.
type RWMutex struct {
	Name string `bson:"-" json:"-"`
	// contains filtered or unexported fields
}

func (*RWMutex) Init

func (m *RWMutex) Init(name string)

func (*RWMutex) Lock

func (m *RWMutex) Lock() (err error)

func (*RWMutex) LockNamed

func (m *RWMutex) LockNamed(name string) (err error)

func (*RWMutex) RCount

func (m *RWMutex) RCount() (c int)

func (*RWMutex) RList

func (m *RWMutex) RList() (list []string)

func (*RWMutex) RLock

func (m *RWMutex) RLock()

func (*RWMutex) RLockAnon

func (m *RWMutex) RLockAnon()

func (*RWMutex) RLockNamed

func (m *RWMutex) RLockNamed(name string) (rl ReadLock, err error)

func (*RWMutex) RUnlock

func (m *RWMutex) RUnlock()

func (*RWMutex) RUnlockAnon

func (m *RWMutex) RUnlockAnon()

func (*RWMutex) RUnlockNamed

func (m *RWMutex) RUnlockNamed(rl ReadLock)

func (*RWMutex) Unlock

func (m *RWMutex) Unlock()

type ReadLock

// ReadLock is the handle returned by RWMutex.RLockNamed and handed back to
// RWMutex.RUnlockNamed; its id is available through Get_Id.
type ReadLock struct {
	// contains filtered or unexported fields
}

func (*ReadLock) Get_Id

func (r *ReadLock) Get_Id() string

type RequestQueue

// RequestQueue is a queue of *CoReq requests (see Push and Pull) that embeds a
// sync.Mutex.
// NOTE(review): Push and Pull are declared on the value receiver
// (q RequestQueue), so each call operates on a copy of the struct including
// the embedded mutex; `go vet` (copylocks) reports this. Confirm whether
// pointer receivers were intended.
type RequestQueue struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewRequestQueue

func NewRequestQueue() (q *RequestQueue)

func (RequestQueue) Pull

func (q RequestQueue) Pull() (req *CoReq, err error)

func (RequestQueue) Push

func (q RequestQueue) Push(req *CoReq) (err error)

type ResourceMgr

// ResourceMgr is the full queue manager interface: it combines ClientWorkMgr
// and JobMgr with the handler loops (task, client, notice, queue update),
// status reporting, queue suspend/resume and the manager's own
// Lock/Unlock/RLock/RUnlock.
type ResourceMgr interface {
	ClientWorkMgr
	JobMgr
	TaskHandle()
	ClientHandle()
	UpdateQueueLoop()
	NoticeHandle()
	GetJsonStatus() (map[string]map[string]int, error)
	GetTextStatus() string
	QueueStatus() string
	GetQueue(string) interface{}
	SuspendQueue()
	ResumeQueue()
	Lock()
	Unlock()
	RLock()
	RUnlock()
}

type ServerMgr

// ServerMgr is the server-side queue manager: the embedded CQMgr plus a
// TaskMap of the tasks it manages and further unexported state.
type ServerMgr struct {
	CQMgr

	TaskMap TaskMap
	// contains filtered or unexported fields
}

func NewServerMgr

func NewServerMgr() *ServerMgr

func (*ServerMgr) ClientHandle

func (qm *ServerMgr) ClientHandle()

func (*ServerMgr) CreateAndEnqueueWorkunits

func (qm *ServerMgr) CreateAndEnqueueWorkunits(task *Task, job *Job) (err error)

func (*ServerMgr) CreateJobPerf

func (qm *ServerMgr) CreateJobPerf(jobid string)

---perf related methods

func (*ServerMgr) CreateTaskPerf

func (qm *ServerMgr) CreateTaskPerf(task *Task) (err error)

func (*ServerMgr) CreateWorkPerf

func (qm *ServerMgr) CreateWorkPerf(id Workunit_Unique_Identifier) (err error)

func (*ServerMgr) DeleteJobByUser

func (qm *ServerMgr) DeleteJobByUser(jobid string, u *user.User, full bool) (err error)

func (*ServerMgr) DeleteSuspendedJobsByUser

func (qm *ServerMgr) DeleteSuspendedJobsByUser(u *user.User, full bool) (num int)

func (*ServerMgr) DeleteZombieJobsByUser

func (qm *ServerMgr) DeleteZombieJobsByUser(u *user.User, full bool) (num int)

delete jobs in db with "queued" or "in-progress" state but not in the queue (zombie jobs) that user has access to

func (*ServerMgr) EnqueueTasksByJobId

func (qm *ServerMgr) EnqueueTasksByJobId(jobid string) (err error)

---task methods---- this is invoked after a job is uploaded and saved in mongo

func (*ServerMgr) FetchDataToken

func (qm *ServerMgr) FetchDataToken(work_id Workunit_Unique_Identifier, clientid string) (token string, err error)

--workunit methods (servermgr implementation)

func (*ServerMgr) FetchPrivateEnv

func (qm *ServerMgr) FetchPrivateEnv(id Workunit_Unique_Identifier, clientid string) (env map[string]string, err error)

func (*ServerMgr) FinalizeJobPerf

func (qm *ServerMgr) FinalizeJobPerf(jobid string)

func (*ServerMgr) FinalizeTaskPerf

func (qm *ServerMgr) FinalizeTaskPerf(task *Task) (err error)

TODO evaluate err

func (*ServerMgr) FinalizeWorkPerf

func (qm *ServerMgr) FinalizeWorkPerf(id Workunit_Unique_Identifier, reportfile string) (err error)

func (*ServerMgr) GetActiveJobs

func (qm *ServerMgr) GetActiveJobs() (ajobs map[string]bool)

func (*ServerMgr) GetJsonStatus

func (qm *ServerMgr) GetJsonStatus() (status map[string]map[string]int, err error)

func (*ServerMgr) GetQueue

func (qm *ServerMgr) GetQueue(name string) interface{}

func (*ServerMgr) GetReportMsg

func (qm *ServerMgr) GetReportMsg(id Workunit_Unique_Identifier, logname string) (report string, err error)

func (*ServerMgr) GetStepInputObjects

func (qm *ServerMgr) GetStepInputObjects(job *Job, task_id Task_Unique_Identifier, workflow_input_map map[string]cwl.CWLType, workflow_step *cwl.WorkflowStep) (workunit_input_map cwl.JobDocMap, err error)

func (*ServerMgr) GetSuspendJobs

func (qm *ServerMgr) GetSuspendJobs() (sjobs map[string]bool)

func (*ServerMgr) GetTextStatus

func (qm *ServerMgr) GetTextStatus() string

func (*ServerMgr) IsJobRegistered

func (qm *ServerMgr) IsJobRegistered(id string) bool

func (*ServerMgr) Lock

func (qm *ServerMgr) Lock()

func (*ServerMgr) LogJobPerf

func (qm *ServerMgr) LogJobPerf(jobid string)

func (*ServerMgr) NoticeHandle

func (qm *ServerMgr) NoticeHandle()

func (*ServerMgr) QueueStatus

func (qm *ServerMgr) QueueStatus() string

func (*ServerMgr) RLock

func (qm *ServerMgr) RLock()

func (*ServerMgr) RUnlock

func (qm *ServerMgr) RUnlock()

func (*ServerMgr) RecomputeJob

func (qm *ServerMgr) RecomputeJob(jobid string, stage string) (err error)

recompute job from specified task stage

func (*ServerMgr) RecoverJob

func (qm *ServerMgr) RecoverJob(id string) (err error)

recover a job in db that is missing from queue (caused by server restarting)

func (*ServerMgr) RecoverJobs

func (qm *ServerMgr) RecoverJobs() (err error)

recover jobs not completed before awe-server restarts

func (*ServerMgr) ResubmitJob

func (qm *ServerMgr) ResubmitJob(jobid string) (err error)

recompute job from beginning

func (*ServerMgr) ResumeQueue

func (qm *ServerMgr) ResumeQueue()

func (*ServerMgr) ResumeSuspendedJobByUser

func (qm *ServerMgr) ResumeSuspendedJobByUser(id string, u *user.User) (err error)

resubmit a suspended job if the user is authorized

func (*ServerMgr) ResumeSuspendedJobsByUser

func (qm *ServerMgr) ResumeSuspendedJobsByUser(u *user.User) (num int)

func (*ServerMgr) SaveStdLog

func (qm *ServerMgr) SaveStdLog(id Workunit_Unique_Identifier, logname string, tmppath string) (err error)

func (*ServerMgr) ShowTasks

func (qm *ServerMgr) ShowTasks()

show functions used in debug

func (*ServerMgr) SuspendJob

func (qm *ServerMgr) SuspendJob(jobid string, jerror *JobError) (err error)

use for JOB_STAT_SUSPEND and JOB_STAT_FAILED_PERMANENT

func (*ServerMgr) SuspendQueue

func (qm *ServerMgr) SuspendQueue()

func (*ServerMgr) TaskHandle

func (qm *ServerMgr) TaskHandle()

func (*ServerMgr) Unlock

func (qm *ServerMgr) Unlock()

func (*ServerMgr) UpdateJobPerfStartTime

func (qm *ServerMgr) UpdateJobPerfStartTime(jobid string)

func (*ServerMgr) UpdateJobTaskToInProgress

func (qm *ServerMgr) UpdateJobTaskToInProgress(works []*Workunit) (err error)

update job/task states from "queued" to "in-progress" once the first workunit is checked out

func (*ServerMgr) UpdateQueueLoop

func (qm *ServerMgr) UpdateQueueLoop()

func (*ServerMgr) UpdateQueueToken

func (qm *ServerMgr) UpdateQueueToken(job *Job) (err error)

update tokens for in-memory data structures

func (*ServerMgr) UpdateTaskPerfStartTime

func (qm *ServerMgr) UpdateTaskPerfStartTime(task *Task) (err error)

type StandardResponse

// StandardResponse is the JSON response envelope: a numeric status, an
// arbitrary data payload and a list of error strings. It is the response type
// returned by NotifyWorkunitProcessedWithLogs.
type StandardResponse struct {
	Status int         `json:"status"`
	Data   interface{} `json:"data"`
	Error  []string    `json:"error"`
}

func NotifyWorkunitProcessedWithLogs

func NotifyWorkunitProcessedWithLogs(work *Workunit, perf *WorkPerf, sendstdlogs bool) (response *StandardResponse, err error)

type StringLocked

// StringLocked is a string value paired with an embedded sync.Mutex; the value
// itself is unexported and is accessed through Get and Set.
type StringLocked struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*StringLocked) Get

func (s *StringLocked) Get() string

func (*StringLocked) Set

func (s *StringLocked) Set(value string)

type StructContainer

// StructContainer wraps an arbitrary value under the JSON key "data".
type StructContainer struct {
	Data interface{} `json:"data"`
}

type Task

// Task is the current task representation: the shared TaskRaw fields (inlined
// for bson) plus inputs, outputs and predata as slices of *IO.
type Task struct {
	TaskRaw `bson:",inline"`
	Inputs  []*IO `bson:"inputs" json:"inputs"`
	Outputs []*IO `bson:"outputs" json:"outputs"`
	Predata []*IO `bson:"predata" json:"predata"`
}

func CreateTasks

func CreateTasks(job *Job, workflow string, steps []cwl.WorkflowStep) (tasks []*Task, err error)

func NewTask

func NewTask(job *Job, workflow string, task_id string) (t *Task, err error)

currently this is only used to make a new task from a deprecated task

func (*Task) CollectDependencies

func (task *Task) CollectDependencies() (changed bool, err error)

populate DependsOn

func (*Task) CreateIndex

func (task *Task) CreateIndex() (err error)

checks and creates indices on shock node if needed

func (*Task) CreateWorkunits

func (task *Task) CreateWorkunits(qm *ServerMgr, job *Job) (wus []*Workunit, err error)

func (*Task) DeleteInput

func (task *Task) DeleteInput() (modified int)

func (*Task) DeleteOutput

func (task *Task) DeleteOutput() (modified int)

func (*Task) GetOutput

func (task *Task) GetOutput(filename string) (output *IO, err error)

func (*Task) GetOutputs

func (task *Task) GetOutputs() (outputs []*IO, err error)

func (*Task) GetTaskLogs

func (task *Task) GetTaskLogs() (tlog *TaskLog, err error)

func (*Task) IncrementComputeTime

func (task *Task) IncrementComputeTime(inc int) (err error)

func (*Task) IncrementRemainWork

func (task *Task) IncrementRemainWork(inc int, writelock bool) (remainwork int, err error)

func (*Task) Init

func (task *Task) Init(job *Job) (changed bool, err error)

func (*Task) InitPartIndex

func (task *Task) InitPartIndex() (err error)

Get part size based on partition/index info. This resets task.Partition when called. Only 1 task.Inputs is allowed unless 'partinfo.input' is specified on POST. If it fails to get index info, task.TotalWork is set to 1 and task.Partition is set to nil.

func (*Task) SetRemainWork

func (task *Task) SetRemainWork(num int, writelock bool) (err error)

func (*Task) SetTaskType

func (task *Task) SetTaskType(type_str string, writelock bool) (err error)

func (*Task) UpdateInputs

func (task *Task) UpdateInputs() (err error)

func (*Task) UpdateOutputs

func (task *Task) UpdateOutputs() (err error)

func (*Task) UpdatePredata

func (task *Task) UpdatePredata() (err error)

type TaskDep

// TaskDep is the deprecated task representation: the shared TaskRaw fields
// (inlined for bson) with inputs, outputs and predata held as IOmap values
// instead of the []*IO slices used by Task.
type TaskDep struct {
	TaskRaw `bson:",inline"`
	Inputs  IOmap `bson:"inputs" json:"inputs"`
	Outputs IOmap `bson:"outputs" json:"outputs"`
	Predata IOmap `bson:"predata" json:"predata"`
}

Deprecated: TaskDep is used by the deprecated JobDep struct and itself uses the deprecated IOmap. It is maintained for backwards compatibility: jobs that cannot be parsed into the Job struct, but can be parsed into the JobDep struct, will be translated to the new Job struct. (=deprecated=)

type TaskLog

// TaskLog is a log-oriented view of a task: id, state, total work count,
// completion date and one WorkLog per workunit. Note that the completion date
// is stored as "completedDate" in bson but "completeddate" in JSON.
type TaskLog struct {
	Id            string     `bson:"taskid" json:"taskid"`
	State         string     `bson:"state" json:"state"`
	TotalWork     int        `bson:"totalwork" json:"totalwork"`
	CompletedDate time.Time  `bson:"completedDate" json:"completeddate"`
	Workunits     []*WorkLog `bson:"workunits" json:"workunits"`
}

type TaskMap

// TaskMap is a task container that embeds the package's RWMutex; its storage
// is unexported and is reached through Add, Get, Has, GetTasks, Delete and
// Len, with tasks identified by Task_Unique_Identifier.
type TaskMap struct {
	RWMutex
	// contains filtered or unexported fields
}

func NewTaskMap

func NewTaskMap() (t *TaskMap)

func (*TaskMap) Add

func (tm *TaskMap) Add(task *Task) (err error)

func (*TaskMap) Delete

func (tm *TaskMap) Delete(taskid Task_Unique_Identifier) (task *Task, ok bool)

func (*TaskMap) Get

func (tm *TaskMap) Get(taskid Task_Unique_Identifier, lock bool) (task *Task, ok bool, err error)

func (*TaskMap) GetTasks

func (tm *TaskMap) GetTasks() (tasks []*Task, err error)

func (*TaskMap) Has

func (tm *TaskMap) Has(taskid Task_Unique_Identifier, lock bool) (ok bool, err error)

func (*TaskMap) Len

func (tm *TaskMap) Len() (length int, err error)

type TaskPerf

// TaskPerf collects timing statistics for one task plus the sizes of its
// input and output files.
// NOTE(review): Queued/Start/End look like Unix timestamps — confirm.
type TaskPerf struct {
	Queued       int64   `bson:"queued" json:"queued"`
	Start        int64   `bson:"start" json:"start"`
	End          int64   `bson:"end" json:"end"`
	Resp         int64   `bson:"resp" json:"resp"` // End - Queued
	InFileSizes  []int64 `bson:"size_infile" json:"size_infile"`
	OutFileSizes []int64 `bson:"size_outfile" json:"size_outfile"`
}

func NewTaskPerf

func NewTaskPerf(id string) *TaskPerf

type TaskRaw

// TaskRaw holds the task fields shared by Task and the deprecated TaskDep,
// both of which embed it inline. It embeds RWMutex (so it must not be copied)
// and Task_Unique_Identifier, whose fields are inlined into the bson document.
//
// Fields tagged `bson:"-" json:"-"` (Info, StepInput, StepOutput,
// Children_ptr, Finalizing) are runtime-only and are neither persisted nor
// serialized. Partition is persisted but hidden from JSON. Several date and
// CWL fields use camelCase bson keys but different JSON keys; the tags below
// are authoritative.
type TaskRaw struct {
	RWMutex                `bson:"-" json:"-"`
	Task_Unique_Identifier `bson:",inline"`

	Id                  string                   `bson:"taskid" json:"taskid"` // old-style
	TaskType            string                   `bson:"task_type" json:"task_type"`
	Info                *Info                    `bson:"-" json:"-"` // this is just a pointer to the job.Info
	Cmd                 *Command                 `bson:"cmd" json:"cmd"`
	Partition           *PartInfo                `bson:"partinfo" json:"-"`
	DependsOn           []string                 `bson:"dependsOn" json:"dependsOn"` // only needed if dependency cannot be inferred from Input.Origin
	TotalWork           int                      `bson:"totalwork" json:"totalwork"`
	MaxWorkSize         int                      `bson:"maxworksize"   json:"maxworksize"`
	RemainWork          int                      `bson:"remainwork" json:"remainwork"`
	State               string                   `bson:"state" json:"state"`
	CreatedDate         time.Time                `bson:"createdDate" json:"createddate"`
	StartedDate         time.Time                `bson:"startedDate" json:"starteddate"`
	CompletedDate       time.Time                `bson:"completedDate" json:"completeddate"`
	ComputeTime         int                      `bson:"computetime" json:"computetime"`
	UserAttr            map[string]interface{}   `bson:"userattr" json:"userattr"`
	ClientGroups        string                   `bson:"clientgroups" json:"clientgroups"`
	WorkflowStep        *cwl.WorkflowStep        `bson:"workflowStep" json:"workflowStep"` // CWL-only
	StepOutputInterface interface{}              `bson:"stepOutput" json:"stepOutput"`     // CWL-only
	StepInput           *cwl.Job_document        `bson:"-" json:"-"`                       // CWL-only
	StepOutput          *cwl.Job_document        `bson:"-" json:"-"`                       // CWL-only
	Scatter_task        bool                     `bson:"scatter_task" json:"scatter_task"` // CWL-only
	Children            []Task_Unique_Identifier `bson:"children" json:"children"`         // CWL-only
	Children_ptr        []*Task                  `bson:"-" json:"-"`                       // CWL-only
	Finalizing          bool                     `bson:"-" json:"-"`                       // CWL-only
}

func NewTaskRaw

func NewTaskRaw(task_id Task_Unique_Identifier, info *Info) (tr TaskRaw, err error)

func (*TaskRaw) Finalize

func (task *TaskRaw) Finalize() (ok bool, err error)

this function prevents a dead-lock when a sub-workflow task finalizes

func (*TaskRaw) GetChildren

func (task *TaskRaw) GetChildren(qm *ServerMgr) (children []*Task, err error)

func (*TaskRaw) GetDependsOn

func (task *TaskRaw) GetDependsOn() (dep []string, err error)

func (*TaskRaw) GetId

func (task *TaskRaw) GetId() (id Task_Unique_Identifier, err error)

func (*TaskRaw) GetJobId

func (task *TaskRaw) GetJobId() (id string, err error)

func (*TaskRaw) GetParent

func (task *TaskRaw) GetParent() (p string, err error)

func (*TaskRaw) GetState

func (task *TaskRaw) GetState() (state string, err error)

func (*TaskRaw) GetStateNamed

func (task *TaskRaw) GetStateNamed(name string) (state string, err error)

only for debugging purposes

func (*TaskRaw) GetTaskType

func (task *TaskRaw) GetTaskType() (type_str string, err error)

func (*TaskRaw) InitRaw

func (task *TaskRaw) InitRaw(job *Job) (changed bool, err error)

func (*TaskRaw) SetCompletedDate

func (task *TaskRaw) SetCompletedDate(t time.Time, lock bool) (err error)

func (*TaskRaw) SetCreatedDate

func (task *TaskRaw) SetCreatedDate(t time.Time) (err error)

func (*TaskRaw) SetStartedDate

func (task *TaskRaw) SetStartedDate(t time.Time) (err error)

func (*TaskRaw) SetState

func (task *TaskRaw) SetState(new_state string, write_lock bool) (err error)

func (*TaskRaw) SetStepOutput

func (task *TaskRaw) SetStepOutput(jd *cwl.Job_document, lock bool) (err error)

type Task_Unique_Identifier

// Task_Unique_Identifier identifies a task by the triple of task name, parent
// and job id. It can be built from its parts (New_Task_Unique_Identifier) or
// from an old-style id string (New_Task_Unique_Identifier_FromString).
type Task_Unique_Identifier struct {
	TaskName string `bson:"task_name" json:"task_name" mapstructure:"task_name"` // example: #main/filter
	Parent   string `bson:"parent" json:"parent" mapstructure:"parent"`
	JobId    string `bson:"jobid" json:"jobid" mapstructure:"jobid"`
}

func New_Task_Unique_Identifier

func New_Task_Unique_Identifier(jobid string, parent string, taskname string) (t Task_Unique_Identifier, err error)

func New_Task_Unique_Identifier_FromString

func New_Task_Unique_Identifier_FromString(old_style_id string) (t Task_Unique_Identifier, err error)

func (Task_Unique_Identifier) String

func (taskid Task_Unique_Identifier) String() (s string, err error)

type Task_p

// Task_p is a JSON-only view of a task that exposes just its command as a
// Command_p value; it is the element type of Job_p.Tasks.
type Task_p struct {
	Cmd *Command_p `json:"cmd"`
}

type WorkList

// WorkList is a slice of workunit pointers with Len and Swap methods, used by
// the queuing/prioritizing code.
type WorkList []*Workunit

queuing/prioritizing related functions

func (WorkList) Len

func (wl WorkList) Len() int

func (WorkList) Swap

func (wl WorkList) Swap(i, j int)

type WorkLog

// WorkLog is the log record of one workunit: its id (stored as "wuid"), its
// rank and a map of log texts keyed by string.
// NOTE(review): the Logs keys are presumably log names — confirm against
// NewWorkLog.
type WorkLog struct {
	Id   string            `bson:"wuid" json:"wuid"` // TODO change !
	Rank int               `bson:"rank" json:"rank"`
	Logs map[string]string `bson:"logs" json:"logs"`
}

func NewWorkLog

func NewWorkLog(id Workunit_Unique_Identifier) (wlog *WorkLog, err error)

type WorkMgr

// WorkMgr is the workunit-facing part of the queue manager API: looking up and
// listing workunits, checking them out for a client, receiving status notices,
// enqueueing workunits, and fetching the data token and private environment
// for a workunit identified by Workunit_Unique_Identifier.
type WorkMgr interface {
	GetWorkById(Workunit_Unique_Identifier) (*Workunit, error)
	ShowWorkunits(string) ([]*Workunit, error)
	ShowWorkunitsByUser(string, *user.User) []*Workunit
	CheckoutWorkunits(string, string, *Client, int64, int) ([]*Workunit, error)
	NotifyWorkStatus(Notice)
	EnqueueWorkunit(*Workunit) error
	FetchDataToken(Workunit_Unique_Identifier, string) (string, error)
	FetchPrivateEnv(Workunit_Unique_Identifier, string) (map[string]string, error)
}

type WorkPerf

// WorkPerf collects performance statistics for one workunit: server-side
// queue/done times, client-side checkout/deliver times, data-movement and
// runtime durations, memory usage, the client id and the sizes of data moved
// over the network. Units are given in the per-field comments where known.
type WorkPerf struct {
	Queued             int64   `bson:"queued" json:"queued"`                   // WQ (queued at server or client, depending on who creates it)
	Done               int64   `bson:"done" json:"done"`                       // WD (done at server)
	Resp               int64   `bson:"resp" json:"resp"`                       // Done - Queued (server metric)
	Checkout           int64   `bson:"checkout" json:"checkout"`               // checkout at client
	Deliver            int64   `bson:"deliver" json:"deliver"`                 // done at client
	ClientResp         int64   `bson:"clientresp" json:"clientresp"`           // Deliver - Checkout (client metric)
	PreDataIn          float64 `bson:"time_predata_in" json:"time_predata_in"` // time in seconds for downloading prerequisite data at client
	DataIn             float64 `bson:"time_data_in" json:"time_data_in"`       // time in seconds for input data move-in at client
	DataOut            float64 `bson:"time_data_out" json:"time_data_out"`     // time in seconds for output data move-out at client
	Runtime            int64   `bson:"runtime" json:"runtime"`                 // time in seconds for computation at client
	DockerPrep         int64   `bson:"dockerprep" json:"dockerprep"`           // time in seconds for docker preparation on client
	MaxMemUsage        int64   `bson:"max_mem_usage" json:"max_mem_usage"`     // maximum memory consumption
	MaxMemoryTotalRss  int64   `bson:"max_memory_total_rss" json:"max_memory_total_rss"`
	MaxMemoryTotalSwap int64   `bson:"max_memory_total_swap" json:"max_memory_total_swap"`
	ClientId           string  `bson:"client_id" json:"client_id"`
	PreDataSize        int64   `bson:"size_predata" json:"size_predata"` // predata moved over network
	InFileSize         int64   `bson:"size_infile" json:"size_infile"`   // input file moved over network
	OutFileSize        int64   `bson:"size_outfile" json:"size_outfile"` // output file moved over network
}

func NewWorkPerf

func NewWorkPerf() *WorkPerf

type WorkQueue

// WorkQueue groups workunits into three WorkunitMaps, one per state:
// queued, checked out and suspended.
type WorkQueue struct {
	Queue    WorkunitMap // WORK_STAT_QUEUED - waiting workunits
	Checkout WorkunitMap // WORK_STAT_CHECKOUT - workunits being checked out
	Suspend  WorkunitMap // WORK_STAT_SUSPEND - suspended workunits
	// contains filtered or unexported fields
}

func NewWorkQueue

func NewWorkQueue() *WorkQueue

func (*WorkQueue) Add

func (wq *WorkQueue) Add(workunit *Workunit) (err error)

func (*WorkQueue) Clean

func (wq *WorkQueue) Clean() (workunits []*Workunit)

func (*WorkQueue) Delete

func (wq *WorkQueue) Delete(id Workunit_Unique_Identifier) (err error)

func (*WorkQueue) Get

func (wq *WorkQueue) Get(id Workunit_Unique_Identifier) (w *Workunit, ok bool, err error)

func (*WorkQueue) GetAll

func (wq *WorkQueue) GetAll() (worklist []*Workunit, err error)

func (*WorkQueue) GetForJob

func (wq *WorkQueue) GetForJob(jobid string) (worklist []*Workunit, err error)

func (*WorkQueue) Has

func (wq *WorkQueue) Has(id Workunit_Unique_Identifier) (has bool, err error)

func (*WorkQueue) Len

func (wq *WorkQueue) Len() (int, error)

func (*WorkQueue) StatusChange

func (wq *WorkQueue) StatusChange(id Workunit_Unique_Identifier, workunit *Workunit, new_status string, reason string) (err error)

type WorkerRuntime

// WorkerRuntime holds the worker information that does not change at
// runtime (identity, host and software version fields).
type WorkerRuntime struct {
	Id            string   `bson:"id" json:"id"`     // this is a uuid (the only relevant identifier)
	Name          string   `bson:"name" json:"name"` // this can be anything you want
	Group         string   `bson:"group" json:"group"`
	User          string   `bson:"user" json:"user"`
	Domain        string   `bson:"domain" json:"domain"`
	InstanceId    string   `bson:"instance_id" json:"instance_id"`     // Openstack specific
	InstanceType  string   `bson:"instance_type" json:"instance_type"` // Openstack specific
	Host          string   `bson:"host" json:"host"`                   // deprecated
	Hostname      string   `bson:"hostname" json:"hostname"`
	Host_ip       string   `bson:"host_ip" json:"host_ip"` // Host can be physical machine or VM, whatever is helpful for management
	CPUs          int      `bson:"cores" json:"cores"`     // note: serialized under the key "cores", not "cpus"
	Apps          []string `bson:"apps" json:"apps"`
	GitCommitHash string   `bson:"git_commit_hash" json:"git_commit_hash"`
	Version       string   `bson:"version" json:"version"`
}

WorkerRuntime holds the worker info that does not change at runtime.

type WorkerState

// WorkerState holds the worker fields that change at runtime.
type WorkerState struct {
	Busy         bool          `bson:"busy" json:"busy"` // a state
	Current_work *WorkunitList `bson:"current_work" json:"current_work"` // list of workunit ids; presumably the work currently assigned to this worker (TODO confirm)
}

WorkerState holds the worker info that changes at runtime.

func NewWorkerState

func NewWorkerState() (ws *WorkerState)

type Workflow

// Workflow is the serializable description of a workflow: workflow info,
// job info, raw inputs, variables, a data server and the list of tasks.
type Workflow struct {
	WfInfo     awf_info          `bson:"workflow_info" json:"workflow_info"`
	JobInfo    awf_jobinfo       `bson:"job_info" json:"job_info"`
	RawInputs  map[string]string `bson:"raw_inputs" json:"raw_inputs"`
	Variables  map[string]string `bson:"variables" json:"variables"`
	DataServer string            `bson:"data_server" json:"data_server"`
	Tasks      []*awf_task       `bson:"tasks" json:"tasks"`
}

type WorkflowInstance

// WorkflowInstance pairs an id with the CWL job documents for its inputs
// and outputs, plus a task counter.
type WorkflowInstance struct {
	Id          string           `bson:"id" json:"id" mapstructure:"id"`
	Inputs      cwl.Job_document `bson:"inputs" json:"inputs" mapstructure:"inputs"`
	Outputs     cwl.Job_document `bson:"outputs" json:"outputs" mapstructure:"outputs"`
	RemainTasks int              `bson:"remaintasks" json:"remaintasks" mapstructure:"remaintasks"` // presumably the number of tasks not yet finished — TODO confirm
}

func NewWorkflowInstanceFromInterface

func NewWorkflowInstanceFromInterface(original interface{}) (wi WorkflowInstance, err error)

type WorkflowMgr

// WorkflowMgr manages named Workflow values; all of its state is unexported.
type WorkflowMgr struct {
	// contains filtered or unexported fields
}
var (
	// AwfMgr is a package-level *WorkflowMgr; where it is initialized is
	// not visible here.
	AwfMgr *WorkflowMgr
)

func NewWorkflowMgr

func NewWorkflowMgr() *WorkflowMgr

func (*WorkflowMgr) AddWorkflow

func (wfm *WorkflowMgr) AddWorkflow(name string, awf *Workflow)

func (*WorkflowMgr) GetAllWorkflows

func (wfm *WorkflowMgr) GetAllWorkflows() (workflows []*Workflow)

func (*WorkflowMgr) GetWorkflow

func (wfm *WorkflowMgr) GetWorkflow(name string) (awf *Workflow, err error)

func (*WorkflowMgr) LoadWorkflows

func (wfm *WorkflowMgr) LoadWorkflows() (err error)

type Workunit

// Workunit describes one unit of work. It embeds Workunit_Unique_Identifier
// as its identity and carries the command, inputs/outputs, state and
// bookkeeping fields. WorkPath and WorkPerf carry no bson/json tags.
type Workunit struct {
	Workunit_Unique_Identifier `bson:",inline" json:",inline" mapstructure:",squash"`
	Id                         string                 `bson:"id,omitempty" json:"id,omitempty" mapstructure:"id,omitempty"`       // global identifier: jobid_taskid_rank (for backwards compatibility only)
	WuId                       string                 `bson:"wuid,omitempty" json:"wuid,omitempty" mapstructure:"wuid,omitempty"` // deprecated !
	Info                       *Info                  `bson:"info,omitempty" json:"info,omitempty" mapstructure:"info,omitempty"`
	Inputs                     []*IO                  `bson:"inputs,omitempty" json:"inputs,omitempty" mapstructure:"inputs,omitempty"`
	Outputs                    []*IO                  `bson:"outputs,omitempty" json:"outputs,omitempty" mapstructure:"outputs,omitempty"`
	Predata                    []*IO                  `bson:"predata,omitempty" json:"predata,omitempty" mapstructure:"predata,omitempty"`
	Cmd                        *Command               `bson:"cmd,omitempty" json:"cmd,omitempty" mapstructure:"cmd,omitempty"`
	TotalWork                  int                    `bson:"totalwork,omitempty" json:"totalwork,omitempty" mapstructure:"totalwork,omitempty"`
	Partition                  *PartInfo              `bson:"part,omitempty" json:"part,omitempty" mapstructure:"part,omitempty"`
	State                      string                 `bson:"state,omitempty" json:"state,omitempty" mapstructure:"state,omitempty"`
	Failed                     int                    `bson:"failed,omitempty" json:"failed,omitempty" mapstructure:"failed,omitempty"`
	CheckoutTime               time.Time              `bson:"checkout_time,omitempty" json:"checkout_time,omitempty" mapstructure:"checkout_time,omitempty"`
	Client                     string                 `bson:"client,omitempty" json:"client,omitempty" mapstructure:"client,omitempty"`
	ComputeTime                int                    `bson:"computetime,omitempty" json:"computetime,omitempty" mapstructure:"computetime,omitempty"`
	ExitStatus                 int                    `bson:"exitstatus,omitempty" json:"exitstatus,omitempty" mapstructure:"exitstatus,omitempty"` // Linux Exit Status Code (0 is success)
	Notes                      []string               `bson:"notes,omitempty" json:"notes,omitempty" mapstructure:"notes,omitempty"`
	UserAttr                   map[string]interface{} `bson:"userattr,omitempty" json:"userattr,omitempty" mapstructure:"userattr,omitempty"`
	ShockHost                  string                 `bson:"shockhost,omitempty" json:"shockhost,omitempty" mapstructure:"shockhost,omitempty"` // specifies default Shock host for outputs
	CWL_workunit               *CWL_workunit          `bson:"cwl,omitempty" json:"cwl,omitempty" mapstructure:"cwl,omitempty"`
	WorkPath                   string                 // this is the working directory. If empty, it will be computed.
	WorkPerf                   *WorkPerf
}

func NewWorkunit

func NewWorkunit(qm *ServerMgr, task *Task, rank int, job *Job) (workunit *Workunit, err error)

func (*Workunit) CDworkpath

func (work *Workunit) CDworkpath() (err error)

func (*Workunit) GetId

func (w *Workunit) GetId() (id Workunit_Unique_Identifier)

func (*Workunit) GetNotes

func (work *Workunit) GetNotes() string

func (*Workunit) Mkdir

func (work *Workunit) Mkdir() (err error)

func (*Workunit) Part

func (work *Workunit) Part() (part string)

Part calculates the range of data parts for this workunit. Algorithm: distribute the indexed parts as evenly as possible across the workunits, e.g. with totalWork=4 and totalParts=10 the workunits get 3, 3, 2 and 2 parts.

func (*Workunit) Path

func (work *Workunit) Path() (path string, err error)

func (*Workunit) RemoveDir

func (work *Workunit) RemoveDir() (err error)

func (*Workunit) SetState

func (work *Workunit) SetState(new_state string, reason string) (err error)

type WorkunitList

// WorkunitList is a list of workunit identifiers. It embeds RWMutex
// (excluded from bson/json) and exposes the entries as strings in Data;
// further state is kept in unexported fields.
type WorkunitList struct {
	RWMutex `bson:"-" json:"-"`

	Data []string `json:"data"`
	// contains filtered or unexported fields
}

func NewWorkunitList

func NewWorkunitList() *WorkunitList

func (*WorkunitList) Add

func (cl *WorkunitList) Add(workid Workunit_Unique_Identifier) (err error)

Add always takes the lock; unlike Delete, it has no parameter to skip locking.

func (*WorkunitList) Delete

func (cl *WorkunitList) Delete(workid Workunit_Unique_Identifier, write_lock bool) (err error)

func (*WorkunitList) Delete_all

func (cl *WorkunitList) Delete_all(workid string, write_lock bool) (err error)

func (*WorkunitList) FillMap

func (cl *WorkunitList) FillMap() (err error)

FillMap is the opposite of sync: it takes the Data entries and copies them into the map.

func (*WorkunitList) Get_list

func (cl *WorkunitList) Get_list(do_read_lock bool) (assigned_work_ids []Workunit_Unique_Identifier, err error)

func (*WorkunitList) Get_string_list

func (cl *WorkunitList) Get_string_list(do_read_lock bool) (work_ids []string, err error)

func (*WorkunitList) Has

func (cl *WorkunitList) Has(workid Workunit_Unique_Identifier) (ok bool, err error)

func (*WorkunitList) Init

func (this *WorkunitList) Init(name string)

func (*WorkunitList) Length

func (cl *WorkunitList) Length(lock bool) (clength int, err error)

type WorkunitMap

// WorkunitMap embeds an RWMutex and maps a Workunit_Unique_Identifier to
// its *Workunit.
type WorkunitMap struct {
	RWMutex
	Map map[Workunit_Unique_Identifier]*Workunit
}

func NewWorkunitMap

func NewWorkunitMap() *WorkunitMap

func (*WorkunitMap) Delete

func (wm *WorkunitMap) Delete(id Workunit_Unique_Identifier) (err error)

func (*WorkunitMap) Get

func (wm *WorkunitMap) Get(id Workunit_Unique_Identifier) (workunit *Workunit, ok bool, err error)

func (*WorkunitMap) GetWorkunits

func (wm *WorkunitMap) GetWorkunits() (workunits []*Workunit, err error)

func (*WorkunitMap) Len

func (wm *WorkunitMap) Len() (length int, err error)

func (*WorkunitMap) Set

func (wm *WorkunitMap) Set(workunit *Workunit) (err error)

type Workunit_Unique_Identifier

// Workunit_Unique_Identifier identifies a workunit by the identifier of its
// task (embedded, inlined on serialization) plus a rank.
type Workunit_Unique_Identifier struct {
	Task_Unique_Identifier `bson:",inline" json:",inline" mapstructure:",squash"` // TaskName, Workflow, JobId
	Rank                   int                                                    `bson:"rank" json:"rank" mapstructure:"rank"` // this is the local identifier

}

func New_Workunit_Unique_Identifier

func New_Workunit_Unique_Identifier(task Task_Unique_Identifier, rank int) (wui Workunit_Unique_Identifier)

func New_Workunit_Unique_Identifier_FromString

func New_Workunit_Unique_Identifier_FromString(old_style_id string) (w Workunit_Unique_Identifier, err error)

func New_Workunit_Unique_Identifier_from_interface

func New_Workunit_Unique_Identifier_from_interface(original interface{}) (wui Workunit_Unique_Identifier, err error)

func (Workunit_Unique_Identifier) GetTask

func (Workunit_Unique_Identifier) String

func (w Workunit_Unique_Identifier) String() (work_str string, err error)

type WorkunitsSortby

// WorkunitsSortby bundles a slice of workunits with an Order and a
// Direction string. It has Len, Less and Swap methods, so it can be passed
// to the sort package.
type WorkunitsSortby struct {
	Order     string // presumably the field to sort by — TODO confirm against Less
	Direction string // presumably ascending/descending — TODO confirm against Less
	Workunits []*Workunit
}

func (WorkunitsSortby) Len

func (w WorkunitsSortby) Len() int

func (WorkunitsSortby) Less

func (w WorkunitsSortby) Less(i, j int) bool

func (WorkunitsSortby) Swap

func (w WorkunitsSortby) Swap(i, j int)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL