国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Golang > 正文

Go語言同步與異步執行多個任務封裝詳解(Runner和RunnerAsync)

2020-04-01 18:59:32
字體:
來源:轉載
供稿:網友

前言

同步適合多個連續執行的,每一步的執行依賴于上一步操作,異步執行則和任務執行順序無關(如從10個站點抓取數據)

同步執行類RunnerAsync

支持返回超時檢測,系統中斷檢測

錯誤常量定義

//超時錯誤var ErrTimeout = errors.New("received timeout")//操作系統系統中斷錯誤var ErrInterrupt = errors.New("received interrupt")

實現代碼如下

package taskimport ( "os" "time" "os/signal" "sync") //異步執行任務type Runner struct { //操作系統的信號檢測 interrupt chan os.Signal //記錄執行完成的狀態 complete chan error //超時檢測 timeout <-chan time.Time //保存所有要執行的任務,順序執行 tasks []func(id int) error waitGroup sync.WaitGroup lock sync.Mutex errs []error} //new一個Runner對象func NewRunner(d time.Duration) *Runner { return &Runner{ interrupt: make(chan os.Signal, 1), complete: make(chan error), timeout: time.After(d), waitGroup: sync.WaitGroup{}, lock: sync.Mutex{}, }} //添加一個任務func (this *Runner) Add(tasks ...func(id int) error) { this.tasks = append(this.tasks, tasks...)} //啟動Runner,監聽錯誤信息func (this *Runner) Start() error { //接收操作系統信號 signal.Notify(this.interrupt, os.Interrupt) //并發執行任務 go func() { this.complete <- this.Run() }() select { //返回執行結果 case err := <-this.complete: return err //超時返回 case <-this.timeout: return ErrTimeout }} //異步執行所有的任務func (this *Runner) Run() error { for id, task := range this.tasks { if this.gotInterrupt() {  return ErrInterrupt } this.waitGroup.Add(1) go func(id int) {  this.lock.Lock()  //執行任務  err := task(id)  //加鎖保存到結果集中  this.errs = append(this.errs, err)   this.lock.Unlock()  this.waitGroup.Done() }(id) } this.waitGroup.Wait()  return nil} //判斷是否接收到操作系統中斷信號func (this *Runner) gotInterrupt() bool { select { case <-this.interrupt: //停止接收別的信號 signal.Stop(this.interrupt) return true //正常執行 default: return false }} //獲取執行完的errorfunc (this *Runner) GetErrs() []error { return this.errs}

使用方法    

Add添加一個任務,任務為接收int類型的一個閉包

Start開始執行傷,返回一個error類型,nil為執行完畢, ErrTimeout代表執行超時,ErrInterrupt代表執行被中斷(類似Ctrl + C操作)

測試示例代碼

package taskimport ( "testing" "time" "fmt" "os" "runtime") func TestRunnerAsync_Start(t *testing.T) { //開啟多核 runtime.GOMAXPROCS(runtime.NumCPU()) //創建runner對象,設置超時時間 runner := NewRunnerAsync(8 * time.Second) //添加運行的任務 runner.Add( createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), ) fmt.Println("同步執行任務") //開始執行任務 if err := runner.Start(); err != nil { switch err { case ErrTimeout:  fmt.Println("執行超時")  os.Exit(1) case ErrInterrupt:  fmt.Println("任務被中斷")  os.Exit(2) } } t.Log("執行結束")} //創建要執行的任務func createTaskAsync() func(id int) { return func(id int) { fmt.Printf("正在執行%v個任務/n", id) //模擬任務執行,sleep兩秒 //time.Sleep(1 * time.Second) }}

執行結果  

同步執行任務正在執行0個任務正在執行1個任務正在執行2個任務正在執行3個任務正在執行4個任務正在執行5個任務正在執行6個任務正在執行7個任務正在執行8個任務正在執行9個任務正在執行10個任務正在執行11個任務正在執行12個任務 runnerAsync_test.go:49: 執行結束

異步執行類Runner

支持返回超時檢測,系統中斷檢測

實現代碼如下

package taskimport ( "os" "time" "os/signal" "sync") //異步執行任務type Runner struct { //操作系統的信號檢測 interrupt chan os.Signal //記錄執行完成的狀態 complete chan error //超時檢測 timeout <-chan time.Time //保存所有要執行的任務,順序執行 tasks []func(id int) error waitGroup sync.WaitGroup lock sync.Mutex errs []error} //new一個Runner對象func NewRunner(d time.Duration) *Runner { return &Runner{  interrupt: make(chan os.Signal, 1),  complete: make(chan error),  timeout: time.After(d),  waitGroup: sync.WaitGroup{},  lock:  sync.Mutex{}, }} //添加一個任務func (this *Runner) Add(tasks ...func(id int) error) { this.tasks = append(this.tasks, tasks...)} //啟動Runner,監聽錯誤信息func (this *Runner) Start() error { //接收操作系統信號 signal.Notify(this.interrupt, os.Interrupt) //并發執行任務 go func() {  this.complete <- this.Run() }() select { //返回執行結果 case err := <-this.complete:  return err  //超時返回 case <-this.timeout:  return ErrTimeout }} //異步執行所有的任務func (this *Runner) Run() error { for id, task := range this.tasks {  if this.gotInterrupt() {   return ErrInterrupt  }  this.waitGroup.Add(1)  go func(id int) {   this.lock.Lock()   //執行任務   err := task(id)   //加鎖保存到結果集中   this.errs = append(this.errs, err)   this.lock.Unlock()   this.waitGroup.Done()  }(id) } this.waitGroup.Wait() return nil} //判斷是否接收到操作系統中斷信號func (this *Runner) gotInterrupt() bool { select { case <-this.interrupt:  //停止接收別的信號  signal.Stop(this.interrupt)  return true  //正常執行 default:  return false }} //獲取執行完的errorfunc (this *Runner) GetErrs() []error { return this.errs}

使用方法    

Add添加一個任務,任務為接收int類型,返回類型error的一個閉包

Start開始執行傷,返回一個error類型,nil為執行完畢, ErrTimeout代表執行超時,ErrInterrupt代表執行被中斷(類似Ctrl + C操作)

getErrs獲取所有的任務執行結果

測試示例代碼

package taskimport ( "testing" "time" "fmt" "os" "runtime") func TestRunner_Start(t *testing.T) { //開啟多核心 runtime.GOMAXPROCS(runtime.NumCPU()) //創建runner對象,設置超時時間 runner := NewRunner(18 * time.Second) //添加運行的任務 runner.Add(  createTask(),  createTask(),  createTask(),  createTask(),  createTask(),  createTask(),  createTask(),  createTask(),  createTask(),  createTask(),  createTask(),  createTask(),  createTask(),  createTask(), ) fmt.Println("異步執行任務") //開始執行任務 if err := runner.Start(); err != nil {  switch err {  case ErrTimeout:   fmt.Println("執行超時")   os.Exit(1)  case ErrInterrupt:   fmt.Println("任務被中斷")   os.Exit(2)  } } t.Log("執行結束") t.Log(runner.GetErrs())} //創建要執行的任務func createTask() func(id int) error { return func(id int) error {  fmt.Printf("正在執行%v個任務/n", id)  //模擬任務執行,sleep  //time.Sleep(1 * time.Second)  return nil }}

執行結果

異步執行任務正在執行2個任務正在執行1個任務正在執行4個任務正在執行3個任務正在執行6個任務正在執行5個任務正在執行9個任務正在執行7個任務正在執行10個任務正在執行13個任務正在執行8個任務正在執行11個任務正在執行12個任務正在執行0個任務 runner_test.go:49: 執行結束 runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對VEVB武林網的支持。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 阿瓦提县| 临西县| 永平县| 五莲县| 招远市| 长武县| 绥阳县| 南开区| 南昌市| 碌曲县| 宜昌市| 阿合奇县| 临潭县| 拉萨市| 东乌珠穆沁旗| 乌鲁木齐市| 麻栗坡县| 顺义区| 公主岭市| 讷河市| 珲春市| 浦东新区| 聊城市| 梅州市| 五常市| 台山市| 山阴县| 婺源县| 眉山市| 甘孜县| 育儿| 临安市| 富顺县| 桂阳县| 岳阳市| 杭锦旗| 内乡县| 手机| 色达县| 彭山县| 兴仁县|