Skip to content

Commit

Permalink
对注释进行了调整,功能不变
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanyang committed Aug 27, 2020
1 parent a1fe3bb commit 8232ed5
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 80 deletions.
25 changes: 11 additions & 14 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,51 @@ package coming

import "errors"

//访问Stop()接口失败,没有成功关闭
//
type StopRefuseError struct{}

func (StopRefuseError) Error() string {
return "Process refused to stop."
}

//TaskInterface contains methods of a task need implement
type TaskInterface interface {
//正常启动目标返回nil,否则返回错误信息,程序执行期间阻塞
Run() error
//Stop can stop task next running,and the running task will run before finished
//实现改接口,以调用Stop()进行停止当前携程,本质是在执行过程中,通过通道传递信号,使得协程停止
//执行stop方法,当程序正常停止时返回nil,当程序停止被拒绝,返回StopRefuseError,当程序停止
//成功,但是报错时,将返回对应的错误信息
Stop() error
//Start to run all tasks from header
//如果完成了Start()操作则返回nil,否则返回err信息
Start() error
//Pause to stop next task , resume to allow run
Pause()
//Pause to stop next task , resume to allow run
//Resume to allow run ,Pause to stop next task
Resume()
}

//BaseTask is an default task with method always return message not nil.
//it will make you only focus on the method you will use
type BaseTask struct{}

//正常启动目标返回nil,否则返回错误信息,程序执行期间阻塞
func (this BaseTask) Run() error {
func (bt BaseTask) Run() error {
return errors.New("you need override Run() method")
}

//实现改接口,以调用Stop()进行停止当前携程,本质是在执行过程中,通过通道传递信号,使得协程停止
//执行stop方法,当程序正常停止时返回nil,当程序停止被拒绝,返回StopRefuseError,当程序停止
//成功,但是报错时,将返回对应的错误信息
func (this BaseTask) Stop() error {
func (bt BaseTask) Stop() error {
return errors.New("you need override Stop() method")
}

//如果完成了Start()操作则返回nil,否则返回err信息
func (this BaseTask) Start() error {
func (bt BaseTask) Start() error {
return errors.New("you need override Start() method")
}

//Pause to stop next task , resume to allow run
func (this BaseTask) Pause() {
func (bt BaseTask) Pause() {
panic("you need override Pause() and Resume() method to use this function")
}

//Pause to stop next task , resume to allow run
func (this BaseTask) Resume() {
func (bt BaseTask) Resume() {
panic("you need override Pause() and Resume() method to use this function")
}
12 changes: 7 additions & 5 deletions schedule_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,43 @@ var (
locker sync.RWMutex
)

// NewSchedule create a new scheduled task and append to tasks,whitch will be checked every second.
// this schedule task can match any time in a year with minimum precision second
// 这个function新建一个简单定时任务到后台任务集,schedule可以通过month, day, weekday, hour, minute, second.
// 这6个字段匹配到一年中的任意时刻,最小精度为秒
// this function create a new scheduled task and append to tasks,whitch will be checked every second.
// this schedule task can match any time in a year with minimum precision second
func NewSchedule(month, day, weekday, hour, minute, second int8, name string, task func(time.Time)) {
cj := schedule{month, day, weekday, hour, minute, second, task}
locker.Lock()
defer locker.Unlock()
tasks[name] = cj
}

// RemoveSchedule can remove schedule task safely
func RemoveSchedule(name string) {
locker.Lock()
defer locker.Unlock()
delete(tasks, name)
}

// NewMonthlySchedule creates a new schedule task matched any exactly time in a month.
// 创建一个定时任务,匹配一个月中的任意时间,但时间必须是明确的
// this creates a new scheduled task matched any exactly time in a month.
func NewMonthlySchedule(day, hour, minute, second int8, name string, task func(time.Time)) {
NewSchedule(ANY, day, ANY, hour, minute, second, name, task)
}

// NewWeeklySchedule creates a new scheduled task matched any exactly time in a week.
// 创建一个定时任务,匹配一周中的任意时间,但时间必须是明确的
// this creates a new scheduled task matched any exactly time in a week.
func NewWeeklySchedule(weekday, hour, minute, second int8, name string, task func(time.Time)) {
NewSchedule(ANY, ANY, weekday, hour, minute, second, name, task)
}

// NewDailySchedule creates a new scheduled task matched any exactly time of day.
// 创建一个定时任务,匹配一天中的任意时间,但时间必须是明确的
// this creates a new scheduled task matched any exactly time of day.
func NewDailySchedule(hour, minute, second int8, name string, task func(time.Time)) {
NewSchedule(ANY, ANY, ANY, hour, minute, second, name, task)
}

//Matches will match now time to execute task function
func (sc schedule) Matches(t time.Time) (ok bool) {
ok = (sc.Month == ANY || sc.Month == int8(t.Month())) &&
(sc.Day == ANY || sc.Day == int8(t.Day())) &&
Expand Down
6 changes: 0 additions & 6 deletions schedule_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@ import (
"time"
)

/**
* @author anyang
* Email: 1300378587@qq.com
* Created Date:2020-06-21 11:35
*/

func TestNewSchedule(t *testing.T) {

NewDailySchedule(ANY, ANY, ANY, "小任务", func(t time.Time) {
Expand Down
112 changes: 57 additions & 55 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import (
"log"
)

//定义可执行的方法

//适合执行顺序长时间任务
// Task struct should be used for task that will running for long time so
// that you can use other methods like Pause() before finish
type Task struct {
BaseTask
status bool
Expand All @@ -21,13 +20,15 @@ type Task struct {
data map[int]interface{}
}

//func(this *Task,index int){
// TaskItem e.g.
// func(task *Task,index int){
// //do something
// //if there is result you need ,you can put it into this.data[index]
// this.data[index] = result
// //if there is result you need ,you can put it into task.data[index]
// task.data[index] = result
//}
type TaskItem func(*Task, int) error

// New create a new Task schedule,you can use method to operate you taskItem
func (Task) New() *Task {
return &Task{
status: false,
Expand All @@ -41,102 +42,103 @@ func (Task) New() *Task {
}
}

//
func (this *Task) Add(t TaskItem) *Task {
if this.works == nil {
// Add function add a TaskItem into Task
func (task *Task) Add(t TaskItem) *Task {
if task.works == nil {
panic("please use Task.New() create Task")
}
if this.status {
if task.status {
panic("This Task had start ,can not add")
}
this.works = append(this.works, t)
return this
task.works = append(task.works, t)
return task
}

//this function will start a waiting for run TaskItem of slice
func (this *Task) Start() error {
this.status = true
// Start function will start a waiting for run TaskItem of slice
func (task *Task) Start() error {
task.status = true
go func() {
<-this.start
for index, _ := range this.works {
<-task.start
for index, _ := range task.works {
select {
case <-this.stop:
case <-task.stop:
//关闭其他通道
close(this.start)
close(this.pause)
close(this.stop)
log.Print(&this, " has stop")
close(task.start)
close(task.pause)
close(task.stop)
log.Print(&task, " has stop")
goto END
case <-this.pause:
this.pause <- 2
case <-task.pause:
task.pause <- 2
default:
}
this.Exec(index)
task.Exec(index)
}
END:
this.finish = true
this.wait <- 1
this.status = false
task.finish = true
task.wait <- 1
task.status = false
}()
return nil
}

func (this *Task) Exec(index int) {
this.works[index](this, index)
// Exec will execute taskItem function in
func (task *Task) Exec(index int) {
task.works[index](task, index)
}

func (this *Task) WaitFinish() int {
result := <-this.wait
close(this.wait)
func (task *Task) WaitFinish() int {
result := <-task.wait
close(task.wait)
return result
}

//当需要再次执行Start时可以重新刷新,在此之前需要执行stop,并等待关闭完成
func (this *Task) Flush() bool {
if this.finish {
this.start = make(chan int, 1)
this.stop = make(chan int, 1)
this.pause = make(chan int)
this.wait = make(chan int)
this.status = false
this.finish = false
func (task *Task) Flush() bool {
if task.finish {
task.start = make(chan int, 1)
task.stop = make(chan int, 1)
task.pause = make(chan int)
task.wait = make(chan int)
task.status = false
task.finish = false
return true
}
return false
}

func (this *Task) Stop() error {
func (task *Task) Stop() error {
var err error
defer func() {
if err1 := recover(); err != nil {
err = fmt.Errorf("%v", err1)
}
}()
this.stop <- 1
task.stop <- 1
return err
}

func (this *Task) Run() error {
func (task *Task) Run() error {
var err error
if !this.status {
if !task.status {
err = errors.New("please do Start() function before Run()")
return err
}
this.start <- 1
task.start <- 1
return err
}

// 如果多处触发Pause()可能会导致阻塞,建议 go this.Pause(),利用channel阻塞进行暂停
func (this *Task) Pause() {
this.pause <- 1
// 如果多处触发Pause()可能会导致阻塞,建议 go task.Pause(),利用channel阻塞进行暂停
func (task *Task) Pause() {
task.pause <- 1
}

// 如果多处触发Resume()可能会导致阻塞,建议 go this.Pause()
func (this *Task) Resume() {
<-this.pause
// 如果多处触发Resume()可能会导致阻塞,建议 go task.Pause()
func (task *Task) Resume() {
<-task.pause
}

//this need user put result of TaskItem into data ,and key is index ,value is result
func (this *Task) Call() (map[int]interface{}, error) {
return this.data, nil
//task need user put result of TaskItem into data ,and key is index ,value is result
func (task *Task) Call() (map[int]interface{}, error) {
return task.data, nil
}

0 comments on commit 8232ed5

Please sign in to comment.