H5W3
当前位置:H5W3 > go > 正文

【go】kafka多个消费者只有一个消费

使用goalng “github.com/Shopify/sarama”这个库

我现在有两个消费者,两个topic,kafka分区设定是50个,但是在实际运行过程中只有一个消费者在消费,关闭当前能消费的消费者,另外一个也可以消费,但是同时运行是就不行

看网上的文档说的是因为分区不够的原因,但是我现在设定了50个分区,只有两个消费者
代码1

    producerConfig := sarama.NewConfig()
producerConfig.Producer.Partitioner = sarama.NewHashPartitioner
producerConfig.Producer.Return.Successes = true
producerConfig.Producer.Timeout = 5 * time.Second
producer, err = sarama.NewSyncProducer([broker实例地址], producerConfig)
kafka_msg := &sarama.ProducerMessage{
Topic:topic,
Key:sarama.StringEncoder(key),
Value: sarama.StringEncoder(value),
}
partition, offset, err := producer.SendMessage(kafka_msg)

这样,投递成功之后返回的分区id永远是0,我怀疑是不是因为我分区设置值没有生效,

,但是因为是第一次用kafka,所以不确定,

于是使用下一个方式
代码2

    producerConfig := sarama.NewConfig()
// producerConfig.Producer.Partitioner = sarama.NewHashPartitioner
producerConfig.Producer.Return.Successes = true
producerConfig.Producer.Timeout = 5 * time.Second
producer, err = sarama.NewSyncProducer([broker实例地址], producerConfig)
kafka_msg := &sarama.ProducerMessage{
Topic:topic,
Key:sarama.StringEncoder(key),
Value: sarama.StringEncoder(value),
Partition: 7 ,
}
partition, offset, err := producer.SendMessage(kafka_msg)

这样设置后投递成功分区返回也是0。

我想确认的是,在分区设置生效的前提先,如果我用代码2来写,最终投递到的分区是不是7 ?

我现在怀疑是因为分区配置没有生效
请大神解答

kafka里面有单播和广播的区别,对一条消息来说,同一个消费组内的消费者有竞态关系,只有一个消费者能消费,这个是单播;同样,对一条消息,不同消费组的消费者都可以同时消费,这是多播。假如你想让两个消费者都能同时消费到消息,你可以将这两个消费者放在不同的消费组,这个需要消费端的groupId属性来设置。

一个分区只能被消费者组中的一个消费者消费,我猜你这里是因为每次发送的消息都发送到了同一个分区

你的两个消费者应该是在同一个分组中,在kafka中,一条消息可以被多个分组消费,但是只能被一个分组中的一个消费者消费。

sarama包中,在初始化producer时,需要指定所用的分区器(partitioner).
粘一段源码:
【go】kafka多个消费者只有一个消费

他不像Java producer中的DefaultPatitioner,会按顺序(指定的partition、key hash、roundRobin)选择分区。
所以你如果想指定分区的话。

producerConfig.Producer.Partitioner = sarama.NewHashPartitioner

改成

producerConfig.Producer.Partitioner = sarama.NewManualPartitioner

回答

本文地址:H5W3 » 【go】kafka多个消费者只有一个消费

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址