diff --git a/api.go b/api.go index cfd6637..3b8e349 100644 --- a/api.go +++ b/api.go @@ -2,54 +2,51 @@ package coming import "errors" -//访问Stop()接口失败,没有成功关闭 -// -type StopRefuseError struct{} - -func (StopRefuseError) Error() string { - return "Process refused to stop." -} - +//TaskInterface contains methods of a task need implement type TaskInterface interface { //正常启动目标返回nil,否则返回错误信息,程序执行期间阻塞 Run() error + //Stop can stop task next running,and the running task will run before finished //实现改接口,以调用Stop()进行停止当前携程,本质是在执行过程中,通过通道传递信号,使得协程停止 //执行stop方法,当程序正常停止时返回nil,当程序停止被拒绝,返回StopRefuseError,当程序停止 //成功,但是报错时,将返回对应的错误信息 Stop() error + //Start to run all tasks from header //如果完成了Start()操作则返回nil,否则返回err信息 Start() error //Pause to stop next task , resume to allow run Pause() - //Pause to stop next task , resume to allow run + //Resume to allow run ,Pause to stop next task Resume() } +//BaseTask is an default task with method always return message not nil. +//it will make you only focus on the method you will use type BaseTask struct{} //正常启动目标返回nil,否则返回错误信息,程序执行期间阻塞 -func (this BaseTask) Run() error { +func (bt BaseTask) Run() error { return errors.New("you need override Run() method") } //实现改接口,以调用Stop()进行停止当前携程,本质是在执行过程中,通过通道传递信号,使得协程停止 //执行stop方法,当程序正常停止时返回nil,当程序停止被拒绝,返回StopRefuseError,当程序停止 //成功,但是报错时,将返回对应的错误信息 -func (this BaseTask) Stop() error { +func (bt BaseTask) Stop() error { return errors.New("you need override Stop() method") } //如果完成了Start()操作则返回nil,否则返回err信息 -func (this BaseTask) Start() error { +func (bt BaseTask) Start() error { return errors.New("you need override Start() method") } //Pause to stop next task , resume to allow run -func (this BaseTask) Pause() { +func (bt BaseTask) Pause() { panic("you need override Pause() and Resume() method to use this function") } //Pause to stop next task , resume to allow run -func (this BaseTask) Resume() { +func (bt BaseTask) Resume() { panic("you need override Pause() and Resume() method to use this function") } diff --git a/schedule_task.go b/schedule_task.go index 1e2586b..c609632 100644 --- a/schedule_task.go +++ b/schedule_task.go @@ -18,10 +18,10 @@ var ( locker sync.RWMutex ) +// NewSchedule create a new scheduled task and append to tasks,whitch will be checked every second. +// this schedule task can match any time in a year with minimum precision second // 这个function新建一个简单定时任务到后台任务集,schedule可以通过month, day, weekday, hour, minute, second. // 这6个字段匹配到一年中的任意时刻,最小精度为秒 -// this function create a new scheduled task and append to tasks,whitch will be checked every second. -// this schedule task can match any time in a year with minimum precision second func NewSchedule(month, day, weekday, hour, minute, second int8, name string, task func(time.Time)) { cj := schedule{month, day, weekday, hour, minute, second, task} locker.Lock() @@ -29,30 +29,32 @@ func NewSchedule(month, day, weekday, hour, minute, second int8, name string, ta tasks[name] = cj } +// RemoveSchedule can remove schedule task safely func RemoveSchedule(name string) { locker.Lock() defer locker.Unlock() delete(tasks, name) } +// NewMonthlySchedule creates a new schedule task matched any exactly time in a month. // 创建一个定时任务,匹配一个月中的任意时间,但时间必须是明确的 -// this creates a new scheduled task matched any exactly time in a month. func NewMonthlySchedule(day, hour, minute, second int8, name string, task func(time.Time)) { NewSchedule(ANY, day, ANY, hour, minute, second, name, task) } +// NewWeeklySchedule creates a new scheduled task matched any exactly time in a week. // 创建一个定时任务,匹配一周中的任意时间,但时间必须是明确的 -// this creates a new scheduled task matched any exactly time in a week. func NewWeeklySchedule(weekday, hour, minute, second int8, name string, task func(time.Time)) { NewSchedule(ANY, ANY, weekday, hour, minute, second, name, task) } +// NewDailySchedule creates a new scheduled task matched any exactly time of day. // 创建一个定时任务,匹配一天中的任意时间,但时间必须是明确的 -// this creates a new scheduled task matched any exactly time of day. func NewDailySchedule(hour, minute, second int8, name string, task func(time.Time)) { NewSchedule(ANY, ANY, ANY, hour, minute, second, name, task) } +//Matches will match now time to execute task function func (sc schedule) Matches(t time.Time) (ok bool) { ok = (sc.Month == ANY || sc.Month == int8(t.Month())) && (sc.Day == ANY || sc.Day == int8(t.Day())) && diff --git a/schedule_task_test.go b/schedule_task_test.go index 90bfc68..892ce5c 100644 --- a/schedule_task_test.go +++ b/schedule_task_test.go @@ -6,12 +6,6 @@ import ( "time" ) -/** - * @author anyang - * Email: 1300378587@qq.com - * Created Date:2020-06-21 11:35 - */ - func TestNewSchedule(t *testing.T) { NewDailySchedule(ANY, ANY, ANY, "小任务", func(t time.Time) { diff --git a/task.go b/task.go index b086d4f..ee541a9 100644 --- a/task.go +++ b/task.go @@ -6,9 +6,8 @@ import ( "log" ) -//定义可执行的方法 - -//适合执行顺序长时间任务 +// Task struct should be used for task that will running for long time so +// that you can use other methods like Pause() before finish type Task struct { BaseTask status bool @@ -21,13 +20,15 @@ type Task struct { data map[int]interface{} } -//func(this *Task,index int){ +// TaskItem e.g. +// func(task *Task,index int){ // //do something -// //if there is result you need ,you can put it into this.data[index] -// this.data[index] = result +// //if there is result you need ,you can put it into task.data[index] +// task.data[index] = result //} type TaskItem func(*Task, int) error +// New create a new Task schedule,you can use method to operate you taskItem func (Task) New() *Task { return &Task{ status: false, @@ -41,102 +42,103 @@ func (Task) New() *Task { } } -// -func (this *Task) Add(t TaskItem) *Task { - if this.works == nil { +// Add function add a TaskItem into Task +func (task *Task) Add(t TaskItem) *Task { + if task.works == nil { panic("please use Task.New() create Task") } - if this.status { + if task.status { panic("This Task had start ,can not add") } - this.works = append(this.works, t) - return this + task.works = append(task.works, t) + return task } -//this function will start a waiting for run TaskItem of slice -func (this *Task) Start() error { - this.status = true +// Start function will start a waiting for run TaskItem of slice +func (task *Task) Start() error { + task.status = true go func() { - <-this.start - for index, _ := range this.works { + <-task.start + for index, _ := range task.works { select { - case <-this.stop: + case <-task.stop: //关闭其他通道 - close(this.start) - close(this.pause) - close(this.stop) - log.Print(&this, " has stop") + close(task.start) + close(task.pause) + close(task.stop) + log.Print(&task, " has stop") goto END - case <-this.pause: - this.pause <- 2 + case <-task.pause: + task.pause <- 2 default: } - this.Exec(index) + task.Exec(index) } END: - this.finish = true - this.wait <- 1 - this.status = false + task.finish = true + task.wait <- 1 + task.status = false }() return nil } -func (this *Task) Exec(index int) { - this.works[index](this, index) +// Exec will execute taskItem function in +func (task *Task) Exec(index int) { + task.works[index](task, index) } -func (this *Task) WaitFinish() int { - result := <-this.wait - close(this.wait) +func (task *Task) WaitFinish() int { + result := <-task.wait + close(task.wait) return result } //当需要再次执行Start时可以重新刷新,在此之前需要执行stop,并等待关闭完成 -func (this *Task) Flush() bool { - if this.finish { - this.start = make(chan int, 1) - this.stop = make(chan int, 1) - this.pause = make(chan int) - this.wait = make(chan int) - this.status = false - this.finish = false +func (task *Task) Flush() bool { + if task.finish { + task.start = make(chan int, 1) + task.stop = make(chan int, 1) + task.pause = make(chan int) + task.wait = make(chan int) + task.status = false + task.finish = false return true } return false } -func (this *Task) Stop() error { +func (task *Task) Stop() error { var err error defer func() { if err1 := recover(); err != nil { err = fmt.Errorf("%v", err1) } }() - this.stop <- 1 + task.stop <- 1 return err } -func (this *Task) Run() error { +func (task *Task) Run() error { var err error - if !this.status { + if !task.status { err = errors.New("please do Start() function before Run()") return err } - this.start <- 1 + task.start <- 1 return err } -// 如果多处触发Pause()可能会导致阻塞,建议 go this.Pause(),利用channel阻塞进行暂停 -func (this *Task) Pause() { - this.pause <- 1 +// 如果多处触发Pause()可能会导致阻塞,建议 go task.Pause(),利用channel阻塞进行暂停 +func (task *Task) Pause() { + task.pause <- 1 } -// 如果多处触发Resume()可能会导致阻塞,建议 go this.Pause() -func (this *Task) Resume() { - <-this.pause +// 如果多处触发Resume()可能会导致阻塞,建议 go task.Pause() +func (task *Task) Resume() { + <-task.pause } -//this need user put result of TaskItem into data ,and key is index ,value is result -func (this *Task) Call() (map[int]interface{}, error) { - return this.data, nil +//task need user put result of TaskItem into data ,and key is index ,value is result +func (task *Task) Call() (map[int]interface{}, error) { + return task.data, nil }