Golang同步和异步执行

同步适合多个连续执行的,每一步的执行依赖于上一步操作,异步执行则和任务执行顺序无关(如从10个站点抓取数据)

同步执行类RunnerAsync

支持返回超时检测,系统中断检测

错误常量定义,task/err.go

package taskimport "errors"//超时错误
var ErrTimeout = errors.New("received timeout")//操作系统系统中断错误
var ErrInterrupt = errors.New("received interrupt")

实现代码如下,task/runner_async.go

package taskimport ("os""os/signal""time"
)//同步执行任务
type RunnerAsync struct {//操作系统的信号检测interrupt chan os.Signal//记录执行完成的状态complete chan error//超时检测timeout <-chan time.Time//保存所有要执行的任务,顺序执行tasks []func(id int)
}//new一个RunnerAsync对象
func NewRunnerAsync(d time.Duration) *RunnerAsync {return &RunnerAsync{interrupt: make(chan os.Signal, 1),complete:  make(chan error),timeout:   time.After(d),}
}//添加一个任务
func (this *RunnerAsync) Add(tasks ...func(id int)) {this.tasks = append(this.tasks, tasks...)
}//启动RunnerAsync,监听错误信息
func (this *RunnerAsync) Start() error {//接收操作系统信号signal.Notify(this.interrupt, os.Interrupt)//执行任务go func() {this.complete <- this.Run()}()select {//返回执行结果case err := <-this.complete:return err//超时返回case <-this.timeout:return ErrTimeout}
}//顺序执行所有的任务
func (this *RunnerAsync) Run() error {for id, task := range this.tasks {if this.gotInterrupt() {return ErrInterrupt}//执行任务task(id)}return nil
}//判断是否接收到操作系统中断信号
func (this *RunnerAsync) gotInterrupt() bool {select {case <-this.interrupt://停止接收别的信号signal.Stop(this.interrupt)return true//正常执行default:return false}
}

使用方法    

Add添加一个任务,任务为接收int类型的一个闭包

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

测试代码

task/runner_async_test.go

package taskimport ("fmt""os""runtime""testing""time"
)func TestRunnerAsync_Start(t *testing.T) {//开启多核runtime.GOMAXPROCS(runtime.NumCPU())//创建runner对象,设置超时时间runner := NewRunnerAsync(8 * time.Second)//添加运行的任务runner.Add(createTaskAsync(),createTaskAsync(),createTaskAsync(),createTaskAsync(),createTaskAsync(),createTaskAsync(),createTaskAsync(),createTaskAsync(),createTaskAsync(),createTaskAsync(),createTaskAsync(),createTaskAsync(),createTaskAsync(),)fmt.Println("同步执行任务")//开始执行任务if err := runner.Start(); err != nil {switch err {case ErrTimeout:fmt.Println("执行超时")os.Exit(1)case ErrInterrupt:fmt.Println("任务被中断")os.Exit(2)}}t.Log("执行结束")}//创建要执行的任务
func createTaskAsync() func(id int) {return func(id int) {fmt.Printf("正在执行%v个任务\n", id)//模拟任务执行,sleep两秒//time.Sleep(1 * time.Second)}
}

执行结果

同步执行任务
正在执行0个任务
正在执行1个任务
正在执行2个任务
正在执行3个任务
正在执行4个任务
正在执行5个任务
正在执行6个任务
正在执行7个任务
正在执行8个任务
正在执行9个任务
正在执行10个任务
正在执行11个任务
正在执行12个任务

异步执行类Runner

支持返回超时检测,系统中断检测

实现代码如下,task/runner.go

package taskimport ("os""time""os/signal""sync"
)//异步执行任务
type Runner struct {//操作系统的信号检测interrupt chan os.Signal//记录执行完成的状态complete chan error//超时检测timeout <-chan time.Time//保存所有要执行的任务,顺序执行tasks []func(id int) errorwaitGroup sync.WaitGrouplock sync.Mutexerrs []error
}//new一个Runner对象
func NewRunner(d time.Duration) *Runner {return &Runner{interrupt: make(chan os.Signal, 1),complete:  make(chan error),timeout:   time.After(d),waitGroup: sync.WaitGroup{},lock:      sync.Mutex{},}
}//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {this.tasks = append(this.tasks, tasks...)
}//启动Runner,监听错误信息
func (this *Runner) Start() error {//接收操作系统信号signal.Notify(this.interrupt, os.Interrupt)//并发执行任务go func() {this.complete <- this.Run()}()select {//返回执行结果case err := <-this.complete:return err//超时返回case <-this.timeout:return ErrTimeout}
}//异步执行所有的任务
func (this *Runner) Run() error {for id, task := range this.tasks {if this.gotInterrupt() {return ErrInterrupt}this.waitGroup.Add(1)go func(id int) {this.lock.Lock()//执行任务err := task(id)//加锁保存到结果集中this.errs = append(this.errs, err)this.lock.Unlock()this.waitGroup.Done()}(id)}this.waitGroup.Wait()return nil
}//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {select {case <-this.interrupt://停止接收别的信号signal.Stop(this.interrupt)return true//正常执行default:return false}
}//获取执行完的error
func (this *Runner) GetErrs() []error {return this.errs
}

使用方法    

Add添加一个任务,任务为接收int类型,返回类型error的一个闭包

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

getErrs获取所有的任务执行结果

测试示例代码

task/runner_test.go

package taskimport ("testing""time""fmt""os""runtime"
)func TestRunner_Start(t *testing.T) {//开启多核心runtime.GOMAXPROCS(runtime.NumCPU())//创建runner对象,设置超时时间runner := NewRunner(18 * time.Second)//添加运行的任务runner.Add(createTask(),createTask(),createTask(),createTask(),createTask(),createTask(),createTask(),createTask(),createTask(),createTask(),createTask(),createTask(),createTask(),createTask(),)fmt.Println("异步执行任务")//开始执行任务if err := runner.Start(); err != nil {switch err {case ErrTimeout:fmt.Println("执行超时")os.Exit(1)case ErrInterrupt:fmt.Println("任务被中断")os.Exit(2)}}t.Log("执行结束")t.Log(runner.GetErrs())}//创建要执行的任务
func createTask() func(id int) error {return func(id int) error {fmt.Printf("正在执行%v个任务\n", id)//模拟任务执行,sleep//time.Sleep(1 * time.Second)return nil}
}

执行结果

异步执行任务
正在执行2个任务
正在执行1个任务
正在执行4个任务
正在执行3个任务
正在执行6个任务
正在执行5个任务
正在执行9个任务
正在执行7个任务
正在执行10个任务
正在执行13个任务
正在执行8个任务
正在执行11个任务
正在执行12个任务
正在执行0个任务 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部