参考:https://www.cnblogs.com/gwyy/p/13266589.html
import "github.com/Shopify/sarama"
有几种类型的生产者:
sarama.NewSyncProducer() //同步发送者
sarama.NewAsyncProducer() //异步发送者
同步模式:
// main demonstrates the synchronous producer: SendMessage blocks until the
// broker acknowledges the message and returns its partition and offset.
func main() {
	config := sarama.NewConfig() // instantiate a sarama Config
	// Required for SyncProducer: delivery reports must be enabled.
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewRandomPartitioner // random partitioner
	client, err := sarama.NewClient([]string{"10.180.18.60:9092"}, config)
	if err != nil {
		panic(err)
	}
	// Deferred only after the err check — the original deferred Close before
	// checking err, which would dereference a nil client on failure.
	defer client.Close()
	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		panic(err)
	}
	defer producer.Close() // the original leaked the producer
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{Topic: "liangtian_topic", Key: nil, Value: sarama.StringEncoder("hahaha")})
	if err != nil {
		log.Fatalf("unable to produce message: %q", err)
	}
	fmt.Println("partition", partition)
	fmt.Println("offset", offset)
}
异步模式:
异步模式,顾名思义就是produce一个message之后不等待发送完成返回;这样调用者可以继续做其他的工作。
config := sarama.NewConfig()
// config.Producer.Return.Successes = true
// was: []{"localhost:9092"} — invalid Go syntax, the element type is required
client, err := sarama.NewClient([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatalf("unable to create kafka client: %q", err)
}
// was: sarama.NewAsyncProducerFromClient — the call parentheses were missing
producer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
	log.Fatalf("unable to create kafka producer: %q", err)
}
defer producer.Close()
text := fmt.Sprintf("message %08d", i)
// Input() returns a channel; sending on it queues the message without blocking
// for the broker's acknowledgement.
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
// wait response
select {
//case msg := <-producer.Successes():
//	log.Printf("Produced message successes: [%s]\n", msg.Value)
case err := <-producer.Errors():
	log.Println("Produced message failure: ", err)
default:
	log.Println("Produced message default")
}
...
关于异步producer注意事项:
异步模式produce一个消息后,缺省并不会报告成功状态。
// NOTE(review): this snippet intentionally demonstrates a hang. With success
// reporting disabled below, producer.Successes() never delivers, and
// producer.Errors() stays silent unless a failure occurs — so the select
// blocks forever on a successful send (explained in the paragraph after it).
config.Producer.Return.Successes = false
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
}
则这段代码会挂住,因为设置没有要求返回成功config.Producer.Return.Successes = false,那么在select等待的时候producer.Successes()不会返回,producer.Errors()也不会返回(假设没有错误发生),就挂在这儿。当然可以加一个default分支绕过去,就不会挂住了:
// The default branch makes the select non-blocking, so this variant no longer
// hangs when neither channel has anything to deliver.
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
default:
log.Println("Produced message default")
}
如果打开了Return.Successes配置,则上述代码段等同于同步方式
// With Return.Successes enabled, blocking on Successes()/Errors() after every
// send makes this asynchronous producer behave like the synchronous one.
config.Producer.Return.Successes = true
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
}
如果打开了Return.Successes配置,而又没有producer.Successes()提取,那么Successes()这个chan消息会被写满。
// Pitfall demo: Return.Successes is enabled but nothing ever reads
// producer.Successes(), so the successes channel fills up and eventually
// blocks the producer (see the paragraph that follows).
config.Producer.Return.Successes = true
...
// was: "Reade to Produced message" — typo in the log text
log.Printf("Ready to produce message: [%s]\n", text)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n", text)
// wait response
select {
//case msg := <-producer.Successes():
//	log.Printf("Produced message successes: [%s]\n", msg.Value)
case err := <-producer.Errors():
	log.Println("Produced message failure: ", err)
default:
	log.Println("Produced message default")
}
写满的结果就是不能再写入了,导致后面的Return.Successes消息丢失, 而且producer也会挂住,因为共享的buffer被占满了,大量的Return.Successes没有被消耗掉。
在produce第00000608个message的时候被挂住了,因为消息缓冲满了;这个缓冲的大小是可配的(可能是这个MaxRequestSize?),但是不管大小是多少,如果没有去提取Success消息最终都会被占满的。结论就是说配置config.Producer.Return.Successes = true和操作<-producer.Successes()必须配套使用;配置成true,那么就要去读取Successes,如果配置成false,则不能去读取Successes。
消费者
每一个Topic的分区只能被一个消费组中的一个消费者所消费。一个消费者可以同时消费多个topic
// consumerGroupHandler implements sarama.ConsumerGroupHandler via the
// Setup/Cleanup/ConsumeClaim methods defined below.
type consumerGroupHandler struct{
// name labels this consumer's log output (e.g. "c1").
name string
}
// main1 starts three consumer groups (c1, c2, c3) sharing one client, waits
// for all three consume goroutines to start, then blocks until SIGINT.
func main1() {
	var wg sync.WaitGroup
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = false
	config.Version = sarama.V0_10_2_0
	// Start from the oldest retained offset when no committed offset exists.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	client, err := sarama.NewClient([]string{"10.180.18.60:9092"}, config)
	if err != nil {
		panic(err)
	}
	// Deferred only after the err check — the original deferred Close before
	// checking err, risking a nil dereference on connection failure.
	defer client.Close()
	group1, err := sarama.NewConsumerGroupFromClient("c1", client)
	if err != nil {
		panic(err)
	}
	defer group1.Close()
	group2, err := sarama.NewConsumerGroupFromClient("c2", client)
	if err != nil {
		panic(err)
	}
	defer group2.Close()
	group3, err := sarama.NewConsumerGroupFromClient("c3", client)
	if err != nil {
		panic(err)
	}
	defer group3.Close()
	wg.Add(3)
	go consume(&group1, &wg, "c1")
	go consume(&group2, &wg, "c2")
	go consume(&group3, &wg, "c3")
	// Each consume calls wg.Done as soon as it starts, so this only waits
	// for the goroutines to launch, not for them to finish.
	wg.Wait()
	// Block until Ctrl-C / SIGINT (plain receive instead of a one-case select).
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
}
// consume joins the given consumer group under the supplied name and consumes
// "liangtian_topic" forever, rejoining each time a rebalance ends the session.
func consume(group *sarama.ConsumerGroup,wg *sync.WaitGroup, name string) {
	fmt.Println(name + "start")
	// Tell the caller this consumer goroutine is up and running.
	wg.Done()
	ctx := context.Background()
	// A single Consume call may cover several topics,
	// e.g. []string{"tiantian_topic1", "tiantian_topic2"}.
	topics := []string{"liangtian_topic"}
	handler := consumerGroupHandler{name: name}
	for {
		// Consume returns when the session ends (e.g. a rebalance);
		// loop so the consumer rejoins the group.
		if err := (*group).Consume(ctx, topics, handler); err != nil {
			panic(err)
		}
	}
}
// Setup runs before claims are consumed in a new session; nothing to initialize here.
func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
// Cleanup runs after all ConsumeClaim loops exit; nothing to release here.
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
// ConsumeClaim drains the messages of one claimed partition, prints each one
// tagged with this consumer's name, and marks it consumed for offset commit.
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n",h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
// manually mark the message as consumed
sess.MarkMessage(msg, "")
}
return nil
}
// handleErrors drains the consumer group's error channel and logs every error
// it delivers (requires config.Consumer.Return.Errors = true).
func handleErrors(group *sarama.ConsumerGroup,wg *sync.WaitGroup ){
	// Tell the caller this error-watcher goroutine is up and running.
	wg.Done()
	errCh := (*group).Errors()
	for e := range errCh {
		fmt.Println("ERROR", e)
	}
}
普通消费者(我姑且这么说)。有些情况下我们有些消费者是没有消费组的,正常的消费者可自动分配分区到消费者并且组中消费者新增或删除会自动触发负载均衡的消费组。 但在某些情况下,却想要更简单的东西。有时你知道你有一个单一的消费者总是需要从主题中的所有分区读取数据,或者从一个主题特定分区读取数据。在这种情况下没有理由需要组或负载均衡,只需订阅特定的主题或分区,不时地消费消息并提交偏移量。 但是有个注意的点。除了没有负载均衡以及需要手动查找分区,一切看起来都很正常。请记住,如果有人向主题添加新分区,消费者不会收到通知。所以应用程序需要自行处理:要么定期调用consumer.Partitions()检查分区列表,要么在管理员添加分区时记得同步更新消费者。还要注意的是消费者可以订阅主题(成为一个消费组的一部分),或者分配自己的分区,但不能两者兼得。下面可以看看代码。一般不这么用。一般都用消费组+消费者
// main consumes every partition of "liangtian_topic" with a standalone
// consumer (no consumer group), starting from the oldest available offset,
// and blocks until all partition consumers stop.
func main() {
	var wg sync.WaitGroup
	// create the standalone consumer
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	client, err := sarama.NewClient([]string{"10.180.18.60:9092"}, config)
	if err != nil {
		panic(err)
	}
	// Deferred only after the err check — the original deferred Close before
	// checking err, risking a nil dereference on connection failure.
	defer client.Close()
	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		panic(err)
	}
	// Single Close via defer — the original also called Close explicitly at
	// the end, closing the consumer twice.
	defer consumer.Close()
	// look up the topic's partition IDs, e.g. [0 1 2]
	partitionList, err := consumer.Partitions("liangtian_topic")
	if err != nil {
		fmt.Println("failed to get the list of partitions", err)
		// was: fell through and continued with an empty partition list
		return
	}
	fmt.Println(partitionList)
	// Start one goroutine per partition. Range over the partition IDs
	// themselves — the original ranged over slice indexes, which is only
	// correct while the IDs happen to be 0..n-1.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition("liangtian_topic", partition, sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
				fmt.Println()
			}
		}(pc)
	}
	//time.Sleep(time.Hour)
	wg.Wait()
}