go - GO的Kafka的问题,动态消费topic?
问题描述:
背景:不同的应用需要把不同的数据写入不同的表,所以规定了结构:
Topic
是应用ID、key
是表名、value
是具体数据
主要需要实现的功能是:
1.到达指定条数时,执行批量插入数据库,未达到指定条数时,视为超时,不论多少条都执行插入
2.监听退出信号,有退出信号时,把map中的数据返回插入到数据库
遇到的问题:
项目启动会初始化执行KfkInitConsumerGroup、ConsumeMessages
,因为这两个只会执行一次,所以订阅的Topic就不会更新了,当有新的Topic写入时,这个新的Topic不能被及时的订阅,不能正常消费了,求各位大佬指点怎么修改代码,能让Topic自动更新呢?让新的Topic也会被消费呢?下边是我的代码实现
目前使用的是 github.com/Shopify/sarama
库,具体实现代码:
package kfk import ( "context" "fmt" "log" "os" "os/signal" "regexp" "sync" "syscall" "time" "github.com/Shopify/sarama" ) type Consumer struct { brokers []string groupID string topics []string stopSignal chan struct{} client sarama.Client config *sarama.Config consumer sarama.ConsumerGroup timeExec time.Duration // 超时时间 mutex sync.Mutex // 互斥锁 messageLimit int // 每分钟获取的消息数量限制 } // KfkInitConsumerGroup initializes the consumer group. func KfkInitConsumerGroup(brokers []string, groupID string, messageLimit int, timeExec time.Duration) (*Consumer, error) { config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = true config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange client, err := sarama.NewClient(brokers, config) if err != nil { log.Fatal(err) } consumer := &Consumer{ brokers: brokers, groupID: groupID, config: config, timeExec: timeExec, messageLimit: messageLimit, } // 根据client创建consumerGroup fromConsumer, err := sarama.NewConsumerGroupFromClient(groupID, client) if err != nil { log.Fatalf("Error creating consumer group client: %v", err) } consumer.client = client consumer.consumer = fromConsumer consumer.stopSignal = make(chan struct{}) return consumer, nil } // ConsumeMessages starts consuming messages. func (c *Consumer) ConsumeMessages(handler func(map[string]map[string][][]byte) error) { // 设置捕捉中断信号的通道 sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) messageMap := make(map[string]map[string][][]byte) messageCount := 0 // 消息计数器 maxMessages := c.messageLimit // 每分钟获取的消息数量限制 //设置消费者处理函数 handlerFunc := &consumerHandler{ handler: handler, messageMap: messageMap, maxMessages: maxMessages, timeExec: c.timeExec, messageCount: &messageCount, } // 创建一个等待组,用于等待消费者和信号监听器完成 wg := sync.WaitGroup{} // 在启动消费者之前初始化主题列表 topics, err := c.client.Topics() if err != nil { log.Fatalf("Error getting topics: %v", err) } filteredTopics := filterTopics(topics) c.mutex.Lock() c.topics = filteredTopics c.mutex.Unlock() // 启动单独的goroutine定期获取主题列表 go c.refreshTopics() // 启动消费者协程 wg.Add(1) go func() { defer wg.Done() c.mutex.Lock() topics := c.topics // 获取当前的主题列表 c.mutex.Unlock() log.Printf("Start Topics: %v", topics) if err := c.consumer.Consume(context.Background(), topics, handlerFunc); err != nil { fmt.Printf("Error from consumer: %v\n", err) } }() // 启动信号监听器协程 wg.Add(1) go func() { defer wg.Done() // 创建一个信号通道 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // 阻塞等待信号 <-sigChan // 发送停止信号给消费者协程 close(c.stopSignal) err := handlerFunc.handler(handlerFunc.messageMap) if err != nil { } // 退出程序 os.Exit(0) }() // 等待消费者和信号监听器完成 wg.Wait() } func (c *Consumer) refreshTopics() { ticker := time.NewTicker(5 * time.Second) // 定时器,每隔一分钟获取一次主题列表 for { select { case <-ticker.C: topics, err := c.client.Topics() if err != nil { log.Printf("Error refreshing topics: %v", err) continue } filteredTopics := filterTopics(topics) // 过滤掉内部主题 "__consumer_offsets" log.Printf("All Topics: %v", filteredTopics) c.mutex.Lock() c.topics = filteredTopics // 更新主题列表 c.mutex.Unlock() case <-c.stopSignal: ticker.Stop() return } } } // 过滤掉内部主题 "__consumer_offsets" func filterTopics(topics []string) []string { filteredTopics := make([]string, 0) for _, topic := range topics { if topic != "__consumer_offsets" { filteredTopics = append(filteredTopics, topic) } } return filteredTopics } // consumerHandler implements sarama.ConsumerGroupHandler type consumerHandler struct { messageCount *int maxMessages int mutex sync.Mutex // 互斥锁 timeExec time.Duration // 超时时间 ticker *time.Ticker // 定时器 topicPattern *regexp.Regexp messageMap map[string]map[string][][]byte handler func(map[string]map[string][][]byte) error } // Setup is called when the consumer group session is established. func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } // Cleanup is called when the consumer group session is terminated. func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } // ConsumeClaim is called when a new claim is received. func (h *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { h.ticker = time.NewTicker(h.timeExec) defer h.ticker.Stop() for { select { case <-h.ticker.C: h.mutex.Lock() if len(h.messageMap) > 0 { // 每30秒触发一次定时器 go func(messageMap map[string]map[string][][]byte) { err := h.handler(messageMap) if err != nil { // 错误处理,根据实际情况进行操作 } }(h.messageMap) *h.messageCount = 0 // 重置计数器 h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap } h.mutex.Unlock() case message := <-claim.Messages(): h.mutex.Lock() // 加锁 topicMap := h.messageMap[message.Topic] if topicMap == nil { topicMap = make(map[string][][]byte) h.messageMap[message.Topic] = topicMap } keySlice := topicMap[string(message.Key)] keySlice = append(keySlice, message.Value) topicMap[string(message.Key)] = keySlice h.mutex.Unlock() // 解锁 session.MarkMessage(message, "") // 标记消息为已消费 h.mutex.Lock() // 计算数量 *h.messageCount++ if *h.messageCount >= h.maxMessages { go func(messageMap map[string]map[string][][]byte) { err := h.handler(messageMap) if err != nil { // 错误处理,根据实际情况进行操作 } }(h.messageMap) *h.messageCount = 0 // 重置计数器 h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap } h.mutex.Unlock() } } }
------------------------------分割线------------------------------
按照一楼大哥的方式改了改,大概代码如下:
ConsumeMessages
// ConsumeMessages starts consuming messages. func (c *Consumer) ConsumeMessages(handler func(map[string]map[string][][]byte) error) { // ··· c.ctx, c.cancel = context.WithCancel(context.Background()) //设置消费者处理函数 handlerFunc := &consumerHandler{ handler: handler, messageMap: messageMap, maxMessages: maxMessages, timeExec: c.timeExec, messageCount: &messageCount, } // 创建一个等待组,用于等待消费者和信号监听器完成 wg := sync.WaitGroup{} // 在启动消费者之前初始化主题列表 topics, err := c.client.Topics() if err != nil { log.Fatalf("Error getting topics: %v", err) } filteredTopics := filterTopics(topics) c.mutex.Lock() c.topics = filteredTopics c.mutex.Unlock() // 启动单独的goroutine定期获取主题列表 go c.refreshTopics(handlerFunc) // 启动消费者协程 wg.Add(1) go func() { defer wg.Done() c.mutex.Lock() topics := c.topics // 获取当前的主题列表 c.mutex.Unlock() log.Printf("Start Topics: %v", topics) if err := c.consumer.Consume(c.ctx, topics, handlerFunc); err != nil { fmt.Printf("Error from consumer: %v\n", err) } }() // ··· // 等待消费者和信号监听器完成 wg.Wait() }
refreshTopics
func (c *Consumer) refreshTopics(handlerFunc *consumerHandler) { var consumerShutdown sync.WaitGroup ticker := time.NewTicker(5 * time.Second) // 定时器,每隔一分钟获取一次主题列表 for { select { case <-ticker.C: // 刷新元数据缓存 if err := c.client.RefreshMetadata(); err != nil { log.Printf("Error refreshing metadata: %v", err) continue } topics, err := c.client.Topics() if err != nil { log.Printf("Error refreshing topics: %v", err) continue } filteredTopics := filterTopics(topics) // 过滤掉内部主题 "__consumer_offsets" log.Printf("All Topics: %v", c.topics) // 主题列表发生了变化 if !util.EqualSlices(filteredTopics, c.topics) { c.mutex.Lock() c.topics = filteredTopics // 更新主题列表 c.mutex.Unlock() consumerShutdown.Wait() // 关闭旧的消费者 c.cancel() // 创建一个新的消费者 c.ctx, c.cancel = context.WithCancel(context.Background()) go func() { consumerShutdown.Add(1) // 增加计数器 log.Printf("New Topics: %v", c.topics) if err := c.consumer.Consume(c.ctx, c.topics, handlerFunc); err != nil { fmt.Printf("Error from consumer: %v\n", err) } consumerShutdown.Done() // 减少计数器 }() } case <-c.stopSignal: ticker.Stop() return } } }
其他
// 过滤掉内部主题 "__consumer_offsets" func filterTopics(topics []string) []string { filteredTopics := make([]string, 0) for _, topic := range topics { if topic != "__consumer_offsets" { filteredTopics = append(filteredTopics, topic) } } return filteredTopics } // consumerHandler implements sarama.ConsumerGroupHandler type consumerHandler struct { messageCount *int maxMessages int mutex sync.Mutex // 互斥锁 timeExec time.Duration // 超时时间 ticker *time.Ticker // 定时器 topicPattern *regexp.Regexp messageMap map[string]map[string][][]byte handler func(map[string]map[string][][]byte) error } // Setup is called when the consumer group session is established. func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } // Cleanup is called when the consumer group session is terminated. func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error { fmt.Println("关闭消费者") return nil } // ConsumeClaim is called when a new claim is received. func (h *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { h.ticker = time.NewTicker(h.timeExec) defer h.ticker.Stop() for { select { case <-h.ticker.C: h.mutex.Lock() if len(h.messageMap) > 0 { fmt.Println(fmt.Sprintf("%s 超时,执行插入 %d", util.ZTime(""), *h.messageCount)) // 每30秒触发一次定时器 go func(messageMap map[string]map[string][][]byte) { err := h.handler(messageMap) if err != nil { // 错误处理,根据实际情况进行操作 } }(h.messageMap) *h.messageCount = 0 // 重置计数器 h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap } h.mutex.Unlock() case message := <-claim.Messages(): h.mutex.Lock() // 加锁 if message != nil { topicMap := h.messageMap[message.Topic] if topicMap == nil { topicMap = make(map[string][][]byte) h.messageMap[message.Topic] = topicMap } keySlice := topicMap[string(message.Key)] keySlice = append(keySlice, message.Value) topicMap[string(message.Key)] = keySlice h.mutex.Unlock() // 解锁 session.MarkMessage(message, "") // 标记消息为已消费 h.mutex.Lock() // 计算数量 *h.messageCount++ if *h.messageCount >= h.maxMessages { fmt.Println(fmt.Sprintf("%s 到达阈值 %d,执行插入", util.ZTime(""), *h.messageCount)) go func(messageMap map[string]map[string][][]byte) { err := h.handler(messageMap) if err != nil { // 错误处理,根据实际情况进行操作 } }(h.messageMap) *h.messageCount = 0 // 重置计数器 h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap } h.mutex.Unlock() } } } }
修改之后可以正常关闭旧的消费者,启动新的消费者也没有报错,但是ConsumeClaim
中的message := <-claim.Messages()
获取到的是nil,也就是message = nil
,随后程序结束,请各位大佬帮帮忙解答一下,十分感谢
第 1 个答案:
type Consumer struct { // ... ctx context.Context cancel context.CancelFunc // ... } func (c *Consumer) ConsumeMessages(handler func(map[string]map[string][][]byte) error) { // ... c.ctx, c.cancel = context.WithCancel(context.Background()) if err := c.consumer.Consume(c.ctx, topics, handlerFunc); err != nil { fmt.Printf("Error from consumer: %v\n", err) } // ... } func (c *Consumer) refreshTopics() { // ... for { select { case <-ticker.C: // ... // 主题列表发生了变化 if !reflect.DeepEqual(filteredTopics, c.topics) { c.mutex.Lock() c.topics = filteredTopics // 更新主题列表 c.mutex.Unlock() // 关闭旧的消费者 c.cancel() // 创建一个新的消费者 c.ctx, c.cancel = context.WithCancel(context.Background()) go c.ConsumeMessages(handler) } case <-c.stopSignal: ticker.Stop() return } } }
chrome 将断点放在有async关键字的方法上时,按下一步按钮,它直接跳出了方法,而不是走下一步,要怎么处理呢?https://www ...