【go】golang 并发问题

allMap存储的是一个任务列表,KEY标记了这个任务类型,Value对应的是任务的参数, 现在我需要并发处理这些任务 。 开发过程中使用了如下两种方法,效果并不好,感觉自己没有领会到golang并发处理的思想 ; 下面是我的几点体会和疑惑,希望得到各位大神的指导。

方式一

    // allMap 中存储了任务列表

// Task 定义如下

type Task struct {

Params interface{}

ResultChan chan []byte

// Wg *sync.WaitGroup

}

Params是参数,ResultChan是处理完毕之后,将结果写入到ResultChan中 ;

// 并发 处理任务

for key, value := range allMap {

go func(k string, v interface{}) {

log.Debug("k : " , k )

if k == tools.REQUEST_BAOJIE {

// A

log.Debug("baojie elem len : ", len(value))

one_task = &service.Task{

Params: v,

ResultChan: make(chan []byte, len(value)),

//Wg : new(sync.WaitGroup) ,

}

// B

log.Debugf("1 one_task : %+v ", one_task)

// AddTask函数逻辑会处理one_task,处理完毕之后,将结果写入到one_task结构体的ResultChan字段;

service.AddTask(one_task)

} else if k == tools.REQUEST {

}

}(key, value)

}

// C

log.Debugf("2 one_task : %+v ", one_task)

// 接收结果

go func() {

for item := range one_task.ResultChan {

log.Debug("Receive data From ResultChan : ", string(item))

}

log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")

}()

这种方式的弊端,太依赖程序执行的先后顺序了,测试的过程中,发现当C发生在A和B之前时,会使接收结果goroutinue访问ResultChan成员发生奔溃,因为此时ResultChan还没有申请空间。

方案一解决方案:
service.AddTask(one_task) 函数再加一个参数,chan <- interface{} , AddTask处理完之后,将结果写入到这个通道里面,接收结果协程监听该通道,然后读取结果。

方式二

延迟并发时机

    for k, v := range allMap {

//go func(k string, v interface{}) {

log.Debug("k : ", k)

if k == tools.REQUEST {

// A

log.Debug("baojie elem len : ", len(v))

one_task = &service.Task{

Params: v,

ResultChan: make(chan []byte, len(v)),

//Wg : new(sync.WaitGroup) ,

}

// B

log.Debugf("1 one_task : %+v ", one_task)

go service.AddTask(one_task)

} else if k == tools.REQUEST_TCP {

}

//}(key, value)

}

// C

log.Debugf("2 one_task : %+v ", one_task)

// 接收结果

go func() {

for item := range one_task.ResultChan {

log.Debug("Receive data From ResultChan : ", string(item))

}

log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")

}()

这样,就保证了C必须发生在A、B之后,这样一来,ResultChan一定先初始化了,等待AddTask后面的协程往里面写入数据,接收结果协程就会读取出来。

问题1

问题来了,既然方式一存在问题,那么方式二中是否在效率上有何弊端呢 ?

我这样写并发的逻辑是否有问题 ?

问题2

这种思想是否可取

var task Task ;

// 提交任务 线程

for key , value := range allMap{

task := Task{

params : value ,

result : make(chan interface{} , len(value) ) , // value 是一个list

}

go processOneByOne(key ,value) // 这种方式是不是开启了很多协程? len(allmap)

}

// 取结果

for result := range task.result {

// get result from chann

// to do

}

``

## 问题3

计划使用一个全局的chan,processOneByOne业务函数处理完毕之后,将结果写到该chan中,然后监听这个chann,从chann中获取结果

处理流程大致:

demo.go

func TodoWork(){

go func(){

for key ,value := range allMap{

processOneByOne(key , value )

}

}()

for item := range task.ResultChan {

// 问题一、 这里如何保证item就是上面那个key value的结果,而不是其他的KEY、value对应的结果

// 问题二、 当TodoWork在多进程环境下面时,是否存在竞争问题?

println(item)

}

}

task.go

var (

ResultChan chan interface{} 

)

func init(){

ResultChan = make( chan interface{} , 100 ) 

}

func processOneByOne( key string , value interface{} ) {

// 处理任务

// ....

// 写入结果

// 问题三、怎么关闭ResultChan , 如果不关闭,是不是goroutine泄漏问题啊 ?

ResultChan <- "Hello World"

}

### 问题描述

### 问题出现的环境背景及自己尝试过哪些方法

### 相关代码

// 请把代码文本粘贴到下方(请勿用图片代替代码)

### 你期待的结果是什么?实际看到的错误信息又是什么?

### 题目描述

### 题目来源及自己的思路

### 相关代码

// 请把代码文本粘贴到下方(请勿用图片代替代码)

### 你期待的结果是什么?实际看到的错误信息又是什么?

### 问题描述

### 问题出现的环境背景及自己尝试过哪些方法

### 相关代码

// 请把代码文本粘贴到下方(请勿用图片代替代码)

### 你期待的结果是什么?实际看到的错误信息又是什么?

  1. 你方式一和二中的示例代码one_task都是局部变量,C和后面的接收结果协程,都访问不到。
  2. 遍历map的效率比slice的效率低很多,建议把allMap转换为slice结构的存储,里面存储Task。
  3. golang的并发是CSP模型,以通道为核心,存在执行顺序的地方,使用channel会非常合适,如果C必须在A、B后面,建议使用channel解决顺序问题。
  4. 每个task都建立一个协程处理是golang的做法,只要它们之间没有执行顺序的依赖。
  5. 没必要为每个task都建立一个channel,除非你也不晓得这个task要产生多少数据。

如果是我,我会采用这种模式:

// Task 定义如下

type Task struct {

Key keyType

Params interface{}

}

var ResultChan chan []byte

var Wg sync.WaitGroup

// 分发任务

for _, task := range taskSlice {

Wg.Add(1)

go task.Do(ResultChan, &Wg)

}

// 接收每个任务的结果

go func() {

for item := range ResultChan {

log.Debug("Receive data From ResultChan : ", string(item))

}

log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")

}()

// 等待接收,关闭通道

Wg.Wait()

close(ResultChan)

想太多了,怕同时起多了协程,可以定义协程长度,更甚至采用协程池。
简单写个实现:

type task struct{

name strig

params interface{}

result []byte

}

var gt = make(chan int,10)//同时只允许10个task一起执行

var tkr = make(chan task,10)

go func(){

for _,t := range allTaskMap {

gt <- 1

go func(curtask task){

//处理task,

tkr <- curtask //处理完,可以使用channel 将结果传输出去

<-gt//处理完毕 退出

}(t)

}

}()

for {

select{

case rt := <- tkr:

//处理结果

}

}

实际上很少用map去存储task列表,可以考虑使用channel传入task

@timmu 谢谢你的回答,我想问下gt通道在这里的作用。

你在初始化one_task之后就开始接收结果不就好了吗?然后这种代码少用if else太丑了,switch case会美观,而且性能更好,还有就是,既然是循环,你还是在循环里面执行函数会好一点

    // allMap 中存储了任务列表

// Task 定义如下

type Task struct {

Params interface{}

ResultChan chan []byte

// Wg *sync.WaitGroup

}

Params是参数,ResultChan是处理完毕之后,将结果写入到ResultChan中 ;

// 并发 处理任务

for key, value := range allMap {

go func(k string, v interface{}) {

log.Debug("k : " , k )

switch k {

case tools.REQUEST_BAOJIE:

// A

log.Debug("baojie elem len : ", len(value))

one_task = &service.Task{

Params: v,

ResultChan: make(chan []byte, len(value)),

//Wg : new(sync.WaitGroup) ,

}

// 接收结果

go func() {

for item := range one_task.ResultChan {

log.Debug("Receive data From ResultChan : ", string(item))

}

log.Debug("Process ", tools.REQUEST_BAOJIE, " end ")

}()

// B

log.Debugf("1 one_task : %+v ", one_task)

// AddTask函数逻辑会处理one_task,处理完毕之后,将结果写入到one_task结构体的ResultChan字段;

service.AddTask(one_task)

case tools.REQUEST:

}(key, value)

}

// C

log.Debugf("2 one_task : %+v ", one_task)

回答

以上是 【go】golang 并发问题 的全部内容, 来源链接: www.h5w3.com/113626.html

回到顶部