go - GO的Kafka的问题,动态消费topic?

 

问题描述:

背景:不同的应用需要把不同的数据写入不同的表,所以规定了结构:
Topic是应用ID、key是表名、value是具体数据

主要需要实现的功能是:

1.到达指定条数时,执行批量插入数据库,未达到指定条数时,视为超时,不论多少条都执行插入
2.监听退出信号,有退出信号时,把map中的数据返回插入到数据库

遇到的问题:

项目启动会初始化执行KfkInitConsumerGroup、ConsumeMessages,因为这两个只会执行一次,所以订阅的Topic就不会更新了,当有新的Topic写入时,这个新的Topic不能被及时的订阅,不能正常消费了,求各位大佬指点怎么修改代码,能让Topic自动更新呢?让新的Topic也会被消费呢?下边是我的代码实现

目前使用的是 github.com/Shopify/sarama 库,具体实现代码:

package kfk

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "regexp"
    "sync"
    "syscall"
    "time"

    "github.com/Shopify/sarama"
)

type Consumer struct {
    brokers      []string
    groupID      string
    topics       []string
    stopSignal   chan struct{}
    client       sarama.Client
    config       *sarama.Config
    consumer     sarama.ConsumerGroup
    timeExec     time.Duration // 超时时间
    mutex        sync.Mutex    // 互斥锁
    messageLimit int           // 每分钟获取的消息数量限制
}

// KfkInitConsumerGroup initializes the consumer group.
func KfkInitConsumerGroup(brokers []string, groupID string, messageLimit int, timeExec time.Duration) (*Consumer, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = true
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange

    client, err := sarama.NewClient(brokers, config)
    if err != nil {
        log.Fatal(err)
    }

    consumer := &Consumer{
        brokers:      brokers,
        groupID:      groupID,
        config:       config,
        timeExec:     timeExec,
        messageLimit: messageLimit,
    }

    // 根据client创建consumerGroup
    fromConsumer, err := sarama.NewConsumerGroupFromClient(groupID, client)
    if err != nil {
        log.Fatalf("Error creating consumer group client: %v", err)
    }

    consumer.client = client
    consumer.consumer = fromConsumer
    consumer.stopSignal = make(chan struct{})
    return consumer, nil
}

// ConsumeMessages starts consuming messages.
func (c *Consumer) ConsumeMessages(handler func(map[string]map[string][][]byte) error) {

    // 设置捕捉中断信号的通道
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    messageMap := make(map[string]map[string][][]byte)
    messageCount := 0             // 消息计数器
    maxMessages := c.messageLimit // 每分钟获取的消息数量限制

    //设置消费者处理函数
    handlerFunc := &consumerHandler{
        handler:      handler,
        messageMap:   messageMap,
        maxMessages:  maxMessages,
        timeExec:     c.timeExec,
        messageCount: &messageCount,
    }

    // 创建一个等待组,用于等待消费者和信号监听器完成
    wg := sync.WaitGroup{}

    // 在启动消费者之前初始化主题列表
    topics, err := c.client.Topics()
    if err != nil {
        log.Fatalf("Error getting topics: %v", err)
    }
    filteredTopics := filterTopics(topics)
    c.mutex.Lock()
    c.topics = filteredTopics
    c.mutex.Unlock()

    // 启动单独的goroutine定期获取主题列表
    go c.refreshTopics()

    // 启动消费者协程
    wg.Add(1)
    go func() {
        defer wg.Done()

        c.mutex.Lock()
        topics := c.topics // 获取当前的主题列表
        c.mutex.Unlock()

        log.Printf("Start Topics: %v", topics)

        if err := c.consumer.Consume(context.Background(), topics, handlerFunc); err != nil {
            fmt.Printf("Error from consumer: %v\n", err)
        }
    }()

    // 启动信号监听器协程
    wg.Add(1)
    go func() {
        defer wg.Done()

        // 创建一个信号通道
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

        // 阻塞等待信号
        <-sigChan

        // 发送停止信号给消费者协程
        close(c.stopSignal)

        err := handlerFunc.handler(handlerFunc.messageMap)
        if err != nil {

        }

        // 退出程序
        os.Exit(0)
    }()

    // 等待消费者和信号监听器完成
    wg.Wait()
}

func (c *Consumer) refreshTopics() {
    ticker := time.NewTicker(5 * time.Second) // 定时器,每隔一分钟获取一次主题列表

    for {
        select {
        case <-ticker.C:
            topics, err := c.client.Topics()
            if err != nil {
                log.Printf("Error refreshing topics: %v", err)
                continue
            }

            filteredTopics := filterTopics(topics) // 过滤掉内部主题 "__consumer_offsets"

            log.Printf("All Topics: %v", filteredTopics)

            c.mutex.Lock()
            c.topics = filteredTopics // 更新主题列表
            c.mutex.Unlock()
        case <-c.stopSignal:
            ticker.Stop()
            return
        }
    }
}

// 过滤掉内部主题 "__consumer_offsets"
func filterTopics(topics []string) []string {
    filteredTopics := make([]string, 0)
    for _, topic := range topics {
        if topic != "__consumer_offsets" {
            filteredTopics = append(filteredTopics, topic)
        }
    }
    return filteredTopics
}

// consumerHandler implements sarama.ConsumerGroupHandler
type consumerHandler struct {
    messageCount *int
    maxMessages  int
    mutex        sync.Mutex    // 互斥锁
    timeExec     time.Duration // 超时时间
    ticker       *time.Ticker  // 定时器
    topicPattern *regexp.Regexp
    messageMap   map[string]map[string][][]byte
    handler      func(map[string]map[string][][]byte) error
}

// Setup is called when the consumer group session is established.
func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

// Cleanup is called when the consumer group session is terminated.
func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim is called when a new claim is received.
func (h *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

    h.ticker = time.NewTicker(h.timeExec)
    defer h.ticker.Stop()

    for {
        select {
        case <-h.ticker.C:
            h.mutex.Lock()

            if len(h.messageMap) > 0 {

                // 每30秒触发一次定时器
                go func(messageMap map[string]map[string][][]byte) {
                    err := h.handler(messageMap)
                    if err != nil {
                        // 错误处理,根据实际情况进行操作
                    }

                }(h.messageMap)

                *h.messageCount = 0 // 重置计数器

                h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap

            }

            h.mutex.Unlock()

        case message := <-claim.Messages():
            h.mutex.Lock() // 加锁

            topicMap := h.messageMap[message.Topic]
            if topicMap == nil {
                topicMap = make(map[string][][]byte)
                h.messageMap[message.Topic] = topicMap
            }

            keySlice := topicMap[string(message.Key)]
            keySlice = append(keySlice, message.Value)
            topicMap[string(message.Key)] = keySlice

            h.mutex.Unlock() // 解锁

            session.MarkMessage(message, "") // 标记消息为已消费

            h.mutex.Lock()
            // 计算数量
            *h.messageCount++
            if *h.messageCount >= h.maxMessages {

                go func(messageMap map[string]map[string][][]byte) {
                    err := h.handler(messageMap)
                    if err != nil {
                        // 错误处理,根据实际情况进行操作
                    }
                }(h.messageMap)

                *h.messageCount = 0 // 重置计数器

                h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap

            }
            h.mutex.Unlock()
        }
    }
}

------------------------------分割线------------------------------

按照一楼大哥的方式改了改,大概代码如下:

ConsumeMessages

// ConsumeMessages starts consuming messages.
func (c *Consumer) ConsumeMessages(handler func(map[string]map[string][][]byte) error) {

    // ···

    c.ctx, c.cancel = context.WithCancel(context.Background())

    //设置消费者处理函数
    handlerFunc := &consumerHandler{
        handler:      handler,
        messageMap:   messageMap,
        maxMessages:  maxMessages,
        timeExec:     c.timeExec,
        messageCount: &messageCount,
    }

    // 创建一个等待组,用于等待消费者和信号监听器完成
    wg := sync.WaitGroup{}

    // 在启动消费者之前初始化主题列表
    topics, err := c.client.Topics()
    if err != nil {
        log.Fatalf("Error getting topics: %v", err)
    }
    filteredTopics := filterTopics(topics)
    c.mutex.Lock()
    c.topics = filteredTopics
    c.mutex.Unlock()

    // 启动单独的goroutine定期获取主题列表
    go c.refreshTopics(handlerFunc)

    // 启动消费者协程
    wg.Add(1)
    go func() {
        defer wg.Done()

        c.mutex.Lock()
        topics := c.topics // 获取当前的主题列表
        c.mutex.Unlock()

        log.Printf("Start Topics: %v", topics)

        if err := c.consumer.Consume(c.ctx, topics, handlerFunc); err != nil {
            fmt.Printf("Error from consumer: %v\n", err)
        }
    }()

    // ···

    // 等待消费者和信号监听器完成
    wg.Wait()
}

refreshTopics

func (c *Consumer) refreshTopics(handlerFunc *consumerHandler) {

    var consumerShutdown sync.WaitGroup

    ticker := time.NewTicker(5 * time.Second) // 定时器,每隔一分钟获取一次主题列表

    for {
        select {
        case <-ticker.C:

            // 刷新元数据缓存
            if err := c.client.RefreshMetadata(); err != nil {
                log.Printf("Error refreshing metadata: %v", err)
                continue
            }

            topics, err := c.client.Topics()
            if err != nil {
                log.Printf("Error refreshing topics: %v", err)
                continue
            }

            filteredTopics := filterTopics(topics) // 过滤掉内部主题 "__consumer_offsets"
            log.Printf("All Topics: %v", c.topics)

            // 主题列表发生了变化
            if !util.EqualSlices(filteredTopics, c.topics) {

                c.mutex.Lock()
                c.topics = filteredTopics // 更新主题列表
                c.mutex.Unlock()

                consumerShutdown.Wait()

                // 关闭旧的消费者
                c.cancel()

                // 创建一个新的消费者
                c.ctx, c.cancel = context.WithCancel(context.Background())

                go func() {

                    consumerShutdown.Add(1) // 增加计数器

                    log.Printf("New Topics: %v", c.topics)

                    if err := c.consumer.Consume(c.ctx, c.topics, handlerFunc); err != nil {
                        fmt.Printf("Error from consumer: %v\n", err)
                    }

                    consumerShutdown.Done() // 减少计数器
                }()
            }
        case <-c.stopSignal:
            ticker.Stop()
            return
        }
    }
}

其他

// 过滤掉内部主题 "__consumer_offsets"
func filterTopics(topics []string) []string {
    filteredTopics := make([]string, 0)
    for _, topic := range topics {
        if topic != "__consumer_offsets" {
            filteredTopics = append(filteredTopics, topic)
        }
    }
    return filteredTopics
}

// consumerHandler implements sarama.ConsumerGroupHandler
type consumerHandler struct {
    messageCount *int
    maxMessages  int
    mutex        sync.Mutex    // 互斥锁
    timeExec     time.Duration // 超时时间
    ticker       *time.Ticker  // 定时器
    topicPattern *regexp.Regexp
    messageMap   map[string]map[string][][]byte
    handler      func(map[string]map[string][][]byte) error
}

// Setup is called when the consumer group session is established.
func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

// Cleanup is called when the consumer group session is terminated.
func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {

    fmt.Println("关闭消费者")
    return nil
}

// ConsumeClaim is called when a new claim is received.
func (h *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

    h.ticker = time.NewTicker(h.timeExec)
    defer h.ticker.Stop()

    for {
        select {
        case <-h.ticker.C:
            h.mutex.Lock()

            if len(h.messageMap) > 0 {

                fmt.Println(fmt.Sprintf("%s 超时,执行插入 %d", util.ZTime(""), *h.messageCount))

                // 每30秒触发一次定时器
                go func(messageMap map[string]map[string][][]byte) {
                    err := h.handler(messageMap)
                    if err != nil {
                        // 错误处理,根据实际情况进行操作
                    }

                }(h.messageMap)

                *h.messageCount = 0 // 重置计数器

                h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap

            }

            h.mutex.Unlock()

        case message := <-claim.Messages():
            h.mutex.Lock() // 加锁

            if message != nil {
                topicMap := h.messageMap[message.Topic]
                if topicMap == nil {
                    topicMap = make(map[string][][]byte)
                    h.messageMap[message.Topic] = topicMap
                }

                keySlice := topicMap[string(message.Key)]
                keySlice = append(keySlice, message.Value)
                topicMap[string(message.Key)] = keySlice

                h.mutex.Unlock() // 解锁

                session.MarkMessage(message, "") // 标记消息为已消费

                h.mutex.Lock()
                // 计算数量
                *h.messageCount++
                if *h.messageCount >= h.maxMessages {

                    fmt.Println(fmt.Sprintf("%s 到达阈值 %d,执行插入", util.ZTime(""), *h.messageCount))

                    go func(messageMap map[string]map[string][][]byte) {
                        err := h.handler(messageMap)
                        if err != nil {
                            // 错误处理,根据实际情况进行操作
                        }
                    }(h.messageMap)

                    *h.messageCount = 0 // 重置计数器

                    h.messageMap = make(map[string]map[string][][]byte) // 清空 messageMap

                }
                h.mutex.Unlock()
            }

        }
    }
}

修改之后可以正常关闭旧的消费者,启动新的消费者也没有报错,但是ConsumeClaim中的message := <-claim.Messages() 获取到的是nil,也就是message = nil,随后程序结束,请各位大佬帮帮忙解答一下,十分感谢


 

第 1 个答案:

type Consumer struct {
    // ...
    ctx    context.Context
    cancel context.CancelFunc
    // ...
}

func (c *Consumer) ConsumeMessages(handler func(map[string]map[string][][]byte) error) {
    // ...
    c.ctx, c.cancel = context.WithCancel(context.Background())

    if err := c.consumer.Consume(c.ctx, topics, handlerFunc); err != nil {
        fmt.Printf("Error from consumer: %v\n", err)
    }

    // ...
}

func (c *Consumer) refreshTopics() {
    // ...

    for {
        select {
        case <-ticker.C:
            // ...

            // 主题列表发生了变化
            if !reflect.DeepEqual(filteredTopics, c.topics) {
                c.mutex.Lock()
                c.topics = filteredTopics // 更新主题列表
                c.mutex.Unlock()

                // 关闭旧的消费者
                c.cancel()

                // 创建一个新的消费者
                c.ctx, c.cancel = context.WithCancel(context.Background())
                go c.ConsumeMessages(handler)
            }
        case <-c.stopSignal:
            ticker.Stop()
            return
        }
    }
}

chrome 将断点放在有async关键字的方法上时,按下一步按钮,它直接跳出了方法,而不是走下一步,要怎么处理呢?https://www ...