diff --git a/sysmodule/kafkamodule/Admin.go b/sysmodule/kafkamodule/Admin.go
new file mode 100644
index 0000000..5896ede
--- /dev/null
+++ b/sysmodule/kafkamodule/Admin.go
@@ -0,0 +1,65 @@
+package kafkamodule
+
+import "github.com/IBM/sarama"
+
+type KafkaAdmin struct {
+	sarama.ClusterAdmin
+
+	mapTopic map[string]sarama.TopicDetail
+}
+
+func (ka *KafkaAdmin) Setup(kafkaVersion string, addrs []string) error {
+	config := sarama.NewConfig()
+	var err error
+	config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
+	if err != nil {
+		return err
+	}
+
+	ka.ClusterAdmin, err = sarama.NewClusterAdmin(addrs, config)
+	if err != nil {
+		return err
+	}
+
+	ka.mapTopic, err = ka.GetTopics()
+	if err != nil {
+		ka.ClusterAdmin.Close()
+		return err
+	}
+
+	return nil
+}
+
+func (ka *KafkaAdmin) RefreshTopic() error {
+	var err error
+	ka.mapTopic, err = ka.GetTopics()
+
+	return err
+}
+
+func (ka *KafkaAdmin) HasTopic(topic string) bool {
+	_, ok := ka.mapTopic[topic]
+
+	return ok
+}
+
+func (ka *KafkaAdmin) GetTopicDetail(topic string) *sarama.TopicDetail {
+	topicDetail, ok := ka.mapTopic[topic]
+	if !ok {
+		return nil
+	}
+
+	return &topicDetail
+}
+
+func (ka *KafkaAdmin) GetTopics() (map[string]sarama.TopicDetail, error) {
+	return ka.ListTopics()
+}
+
+// CreateTopic creates a topic.
+// numPartitions is the number of partitions.
+// replicationFactor is the number of replicas.
+// When validateOnly is true, the request is only validated and the topic is not actually created.
+func (ka *KafkaAdmin) CreateTopic(topic string, numPartitions int32, replicationFactor int16, validateOnly bool) error {
+	return ka.ClusterAdmin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: numPartitions, ReplicationFactor: replicationFactor}, validateOnly)
+}
diff --git a/sysmodule/kafkamodule/Consumer.go b/sysmodule/kafkamodule/Consumer.go
new file mode 100644
index 0000000..ec448fc
--- /dev/null
+++ b/sysmodule/kafkamodule/Consumer.go
@@ -0,0 +1,289 @@
+package kafkamodule
+
+import (
+	"context"
+	"fmt"
+	"github.com/IBM/sarama"
+	"github.com/duanhf2012/origin/v2/log"
+	"sync"
+	"time"
+)
+
+type ConsumerGroup struct {
+	sarama.ConsumerGroup
+	waitGroup sync.WaitGroup
+
+	chanExit chan error
+	cancel   context.CancelFunc
+	groupId  string
+}
+
+func NewConsumerConfig(kafkaVersion string, assignor string, offsetsInitial int64) (*sarama.Config, error) {
+	var err error
+
+	config := sarama.NewConfig()
+	config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
+	if err != nil {
+		return nil, err
+	}
+
+	config.Consumer.Offsets.Initial = offsetsInitial
+	// Offsets are committed manually in MsgData.flush once the receiver has
+	// accepted a batch, so auto-commit is disabled.
+	config.Consumer.Offsets.AutoCommit.Enable = false
+
+	switch assignor {
+	case "sticky":
+		// Sticky round-robin: after a rebalance, existing assignments are kept
+		// where possible and partitions are only peeled off the tail.
+		// Topic T0{P0,P1,P2,P3,P4,P5}, consumers C1, C2.
+		// Before rebalance (same as round-robin):
+		//   C1: T0{P0} T0{P2} T0{P4}
+		//   C2: T0{P1} T0{P3} T0{P5}
+		// After rebalance (consumer C3 added):
+		//   C1: T0{P0} T0{P2}
+		//   C2: T0{P1} T0{P3}
+		//   C3: T0{P4} T0{P5}
+		// Partitions move until the per-consumer counts differ by at most 1.
+		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
+	case "roundrobin":
+		// Round-robin: partitions are dealt out one at a time.
+		// Topics T0{P0,P1,P2} and T1{P0,P1,P2,P3}, consumers C1, C2:
+		//   C1: T0{P0} T0{P2} T1{P1} T1{P3}
+		//   C2: T0{P1} T1{P0} T1{P2}
+		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
+	case "range":
+		// The default: each topic's partitions are split into contiguous ranges.
+		// Topics T0{P0..P5} and T1{P0..P3}, consumers C1, C2.
+		// T0 has 6 partitions / 2 consumers = 3 per consumer;
+		// T1 has 4 partitions / 2 consumers = 2 per consumer.
+		//   C1: T0{P0,P1,P2} T1{P0,P1}
+		//   C2: T0{P3,P4,P5} T1{P2,P3}
+		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
+	default:
+		return nil, fmt.Errorf("unrecognized consumer group partition assignor: %s", assignor)
+	}
+
+	return config, nil
+}
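+
+// A minimal usage sketch, assuming a local broker; the address, topic, group
+// ID, and myReceiver value below are illustrative, not defined by this module:
+//
+//	cfg, err := NewConsumerConfig("3.3.1", "range", sarama.OffsetNewest)
+//	if err != nil {
+//		// handle the error
+//	}
+//	var cg ConsumerGroup
+//	err = cg.Setup([]string{"127.0.0.1:9092"}, []string{"my_topic"}, "my_group",
+//		cfg, time.Second, 100, myReceiver) // myReceiver implements IMsgReceiver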
+
+type IMsgReceiver interface {
+	// Receiver is handed a batch of consumed messages; returning false tells
+	// the handler to retry the same batch.
+	Receiver(msgs []*sarama.ConsumerMessage) bool
+}
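+
+// A minimal sketch of an implementation; the PrintReceiver name is an
+// illustrative assumption:
+//
+//	type PrintReceiver struct{}
+//
+//	func (pr *PrintReceiver) Receiver(msgs []*sarama.ConsumerMessage) bool {
+//		for _, m := range msgs {
+//			fmt.Printf("topic=%s partition=%d offset=%d value=%s\n",
+//				m.Topic, m.Partition, m.Offset, string(m.Value))
+//		}
+//		return true // false would make the handler re-deliver this batch
+//	}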
+
+func (c *ConsumerGroup) Setup(addr []string, topics []string, groupId string, config *sarama.Config, receiverInterval time.Duration, maxReceiverNum int, msgReceiver IMsgReceiver) error {
+	var err error
+	c.ConsumerGroup, err = sarama.NewConsumerGroup(addr, groupId, config)
+	if err != nil {
+		return err
+	}
+	c.groupId = groupId
+	c.chanExit = make(chan error, 1)
+
+	var handler ConsumerGroupHandler
+	handler.receiver = msgReceiver
+	handler.maxReceiverNum = maxReceiverNum
+	handler.receiverInterval = receiverInterval
+	handler.chanExit = c.chanExit
+
+	var ctx context.Context
+	ctx, c.cancel = context.WithCancel(context.Background())
+
+	c.waitGroup.Add(1)
+	go func() {
+		defer c.waitGroup.Done()
+
+		for {
+			// Consume must be called in a loop: it returns whenever a
+			// server-side rebalance happens and the session is recreated.
+			if err := c.Consume(ctx, topics, &handler); err != nil {
+				// If the handler's Setup fails, the error surfaces here.
+				log.Error("Error from consumer", log.Any("err", err))
+				// Unblock our own Setup if it is still waiting; drop the
+				// signal otherwise, since chanExit has a buffer of one.
+				select {
+				case c.chanExit <- err:
+				default:
+				}
+				return
+			}
+
+			// A cancelled context signals that the consumer should stop.
+			if ctx.Err() != nil {
+				log.Info("consumer stop", log.Any("info", ctx.Err()))
+				return
+			}
+		}
+	}()
+
+	// Block until the first session has been set up, or setup failed.
+	err = <-c.chanExit
+	return err
+}
+
+func (c *ConsumerGroup) Close() {
+	log.Info("close consumerGroup")
+	// 1. cancel the consume context
+	c.cancel()
+
+	// 2. close the underlying connection
+	err := c.ConsumerGroup.Close()
+	if err != nil {
+		log.Error("close consumerGroup fail", log.Any("err", err.Error()))
+	}
+
+	// 3. wait for the consume goroutine to exit
+	c.waitGroup.Wait()
+}
+
+type ConsumerGroupHandler struct {
+	receiver IMsgReceiver
+
+	receiverInterval time.Duration
+	maxReceiverNum   int
+
+	mapTopicData map[string]*MsgData // map[topic]*MsgData
+	mx           sync.Mutex
+
+	chanExit    chan error
+	isRebalance bool // false until the first session has been set up
+}
+
+type MsgData struct {
+	sync.Mutex
+	msg []*sarama.ConsumerMessage
+
+	mapPartitionOffset map[int32]int64
+}
+
+func (ch *ConsumerGroupHandler) Flush(session sarama.ConsumerGroupSession, topic string) {
+	if topic != "" {
+		msgData := ch.GetMsgData(topic)
+		msgData.flush(session, ch.receiver, topic)
+		return
+	}
+
+	for tp, msgData := range ch.mapTopicData {
+		msgData.flush(session, ch.receiver, tp)
+	}
+}
+
+func (ch *ConsumerGroupHandler) GetMsgData(topic string) *MsgData {
+	ch.mx.Lock()
+	defer ch.mx.Unlock()
+
+	msgData := ch.mapTopicData[topic]
+	if msgData == nil {
+		msgData = &MsgData{}
+		msgData.msg = make([]*sarama.ConsumerMessage, 0, ch.maxReceiverNum)
+		ch.mapTopicData[topic] = msgData
+	}
+
+	return msgData
+}
+
+func (md *MsgData) flush(session sarama.ConsumerGroupSession, receiver IMsgReceiver, topic string) {
+	if len(md.msg) == 0 {
+		return
+	}
+
+	// Hand the batch to the receiver; retry until it reports success.
+	for !receiver.Receiver(md.msg) {
+	}
+
+	for pId, offset := range md.mapPartitionOffset {
+		session.MarkOffset(topic, pId, offset+1, "")
+		log.Info(fmt.Sprintf("topic %s,pid %d,offset %d", topic, pId, offset+1))
+	}
+	session.Commit()
+
+	// reset the batch
+	md.msg = md.msg[:0]
+	clear(md.mapPartitionOffset)
+}
+
+func (md *MsgData) appendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage, receiver IMsgReceiver, maxReceiverNum int) {
+	md.Lock()
+	defer md.Unlock()
+
+	// Offsets received for a partition only ever increase, so the last one wins.
+	if md.mapPartitionOffset == nil {
+		md.mapPartitionOffset = make(map[int32]int64, 10)
+	}
+
+	md.mapPartitionOffset[msg.Partition] = msg.Offset
+
+	md.msg = append(md.msg, msg)
+	if len(md.msg) < maxReceiverNum {
+		return
+	}
+
+	md.flush(session, receiver, msg.Topic)
+}
+
+func (ch *ConsumerGroupHandler) AppendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
+	dataMsg := ch.GetMsgData(msg.Topic)
+	dataMsg.appendMsg(session, msg, ch.receiver, ch.maxReceiverNum)
+}
+
+func (ch *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
+	ch.mapTopicData = make(map[string]*MsgData, 128)
+
+	// Signal ConsumerGroup.Setup that the first session is ready; later
+	// sessions are rebalances and must not signal again.
+	if !ch.isRebalance {
+		ch.chanExit <- nil
+	}
+
+	ch.isRebalance = true
+	return nil
+}
+
+func (ch *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+	ch.Flush(session, "")
+	return nil
+}
+
+func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	ticker := time.NewTicker(ch.receiverInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case msg, ok := <-claim.Messages():
+			if !ok {
+				log.SWarning("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
+				return nil
+			}
+			ch.AppendMsg(session, msg)
+		case <-ticker.C:
+			ch.Flush(session, claim.Topic())
+		case <-session.Context().Done():
+			return nil
+		}
+	}
+}
+
+/*
+Alibaba Cloud parameter notes: https://sls.aliyun.com/doc/oscompatibledemo/sarama_go_kafka_consume.html
+
+	conf.Net.TLS.Enable = true
+	conf.Net.SASL.Enable = true
+	conf.Net.SASL.User = project
+	conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
+	conf.Net.SASL.Mechanism = "PLAIN"
+
+	conf.Consumer.Fetch.Min = 1
+	conf.Consumer.Fetch.Default = 1024 * 1024
+	conf.Consumer.Retry.Backoff = 2 * time.Second
+	conf.Consumer.MaxWaitTime = 250 * time.Millisecond
+	conf.Consumer.MaxProcessingTime = 100 * time.Millisecond
+	conf.Consumer.Return.Errors = false
+	conf.Consumer.Offsets.AutoCommit.Enable = true
+	conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
+	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
+	conf.Consumer.Offsets.Retry.Max = 3
+*/
diff --git a/sysmodule/kafkamodule/Producer.go b/sysmodule/kafkamodule/Producer.go
new file mode 100644
index 0000000..a5409d4
--- /dev/null
+++ b/sysmodule/kafkamodule/Producer.go
@@ -0,0 +1,146 @@
+package kafkamodule
+
+import (
+	"context"
+	"github.com/IBM/sarama"
+	"github.com/duanhf2012/origin/v2/log"
+	"github.com/duanhf2012/origin/v2/service"
+	"time"
+)
+
+type IProducer interface {
+}
+
+type SyncProducer struct {
+}
+
+type AsyncProducer struct {
+}
+
+type Producer struct {
+	service.Module
+
+	sarama.SyncProducer
+
+	sarama.AsyncProducer
+}
+
+// NewProducerConfig builds a producer config.
+// kafkaVersion is the Kafka broker version.
+// returnErr and returnSucc control whether errors and successes are reported on the Errors and Successes channels.
+// requiredAcks:
+//	-1 (sarama.WaitForAll)   wait until all in-sync replicas have acknowledged; strongest guarantee
+//	 1 (sarama.WaitForLocal) wait for the leader only; the default
+//	 0 (sarama.NoResponse)   no acknowledgement; highest throughput
+// idempotent enables the idempotent producer, which guarantees messages are written in order and exactly once.
+// partitioner constructs the partitioner used to pick a partition for each message; the default hashes the message key.
+func NewProducerConfig(kafkaVersion string, returnErr bool, returnSucc bool, requiredAcks sarama.RequiredAcks, idempotent bool,
+	partitioner sarama.PartitionerConstructor) (*sarama.Config, error) {
+	config := sarama.NewConfig()
+	var err error
+	config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
+	if err != nil {
+		return nil, err
+	}
+
+	config.Producer.Return.Errors = returnErr
+	config.Producer.Return.Successes = returnSucc
+	config.Producer.RequiredAcks = requiredAcks
+	config.Producer.Partitioner = partitioner
+	config.Producer.Timeout = 10 * time.Second
+
+	config.Producer.Idempotent = idempotent
+	if idempotent {
+		// The idempotent producer allows at most one in-flight request.
+		config.Net.MaxOpenRequests = 1
+	}
+	return config, nil
+}
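+
+// A minimal usage sketch of the synchronous path, assuming a local broker;
+// the address and topic are illustrative:
+//
+//	cfg, err := NewProducerConfig("3.3.1", true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
+//	if err != nil {
+//		// handle the error
+//	}
+//	var p Producer
+//	if err = p.SyncSetup([]string{"127.0.0.1:9092"}, cfg); err != nil {
+//		// handle the error
+//	}
+//	partition, offset, err := p.SendMessage(&sarama.ProducerMessage{
+//		Topic: "my_topic",
+//		Value: sarama.StringEncoder("hello"),
+//	})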
+
+func (p *Producer) SyncSetup(addr []string, config *sarama.Config) error {
+	var err error
+	p.SyncProducer, err = sarama.NewSyncProducer(addr, config)
+
+	return err
+}
+
+func (p *Producer) AsyncSetup(addr []string, config *sarama.Config) error {
+	var err error
+	p.AsyncProducer, err = sarama.NewAsyncProducer(addr, config)
+	if err != nil {
+		return err
+	}
+
+	go p.asyncRun()
+	return nil
+}
+
+func (p *Producer) asyncRun() {
+	for {
+		select {
+		case sm, ok := <-p.Successes():
+			if !ok {
+				// the producer has been closed
+				return
+			}
+			if sm.Metadata == nil {
+				break
+			}
+			asyncReturn := sm.Metadata.(*AsyncReturn)
+			asyncReturn.chanReturn <- asyncReturn
+		case em, ok := <-p.Errors():
+			if !ok {
+				// the producer has been closed
+				return
+			}
+			log.Error("async kafkamodule error", log.ErrorAttr("err", em.Err))
+			if em.Msg.Metadata == nil {
+				break
+			}
+			asyncReturn := em.Msg.Metadata.(*AsyncReturn)
+			asyncReturn.Err = em.Err
+			asyncReturn.chanReturn <- asyncReturn
+		}
+	}
+}
+
+type AsyncReturn struct {
+	Msg        *sarama.ProducerMessage
+	Err        error
+	chanReturn chan *AsyncReturn
+}
+
+// WaitOk blocks until the broker has acknowledged the message or ctx is done.
+func (ar *AsyncReturn) WaitOk(ctx context.Context) (*sarama.ProducerMessage, error) {
+	select {
+	case <-ar.chanReturn:
+		return ar.Msg, ar.Err
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+// AsyncSendMessage queues msg and returns an AsyncReturn that can be waited on.
+func (p *Producer) AsyncSendMessage(msg *sarama.ProducerMessage) *AsyncReturn {
+	asyncReturn := AsyncReturn{Msg: msg, chanReturn: make(chan *AsyncReturn, 1)}
+	msg.Metadata = &asyncReturn
+	p.AsyncProducer.Input() <- msg
+
+	return &asyncReturn
+}
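+
+// A minimal sketch of the asynchronous round trip; the topic is illustrative:
+//
+//	r := p.AsyncSendMessage(&sarama.ProducerMessage{
+//		Topic: "my_topic",
+//		Value: sarama.StringEncoder("hello"),
+//	})
+//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+//	defer cancel()
+//	msg, err := r.WaitOk(ctx) // blocks until the broker acks or ctx expires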
+
+// AsyncPushMessage queues msg fire-and-forget; no AsyncReturn is attached.
+func (p *Producer) AsyncPushMessage(msg *sarama.ProducerMessage) {
+	p.AsyncProducer.Input() <- msg
+}
+
+func (p *Producer) Close() {
+	if p.SyncProducer != nil {
+		p.SyncProducer.Close()
+		p.SyncProducer = nil
+	}
+
+	if p.AsyncProducer != nil {
+		p.AsyncProducer.Close()
+		p.AsyncProducer = nil
+	}
+}
+
+func (p *Producer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
+	return p.SyncProducer.SendMessage(msg)
+}
+
+func (p *Producer) SendMessages(msgs []*sarama.ProducerMessage) error {
+	return p.SyncProducer.SendMessages(msgs)
+}
diff --git a/sysmodule/kafkamodule/ProducerAndConsumer_test.go b/sysmodule/kafkamodule/ProducerAndConsumer_test.go
new file mode 100644
index 0000000..57c7024
--- /dev/null
+++ b/sysmodule/kafkamodule/ProducerAndConsumer_test.go
@@ -0,0 +1,151 @@
+package kafkamodule
+
+import (
+	"context"
+	"fmt"
+	"github.com/IBM/sarama"
+	"testing"
+	"time"
+)
+
+// Notes on the parameter and mechanism names: https://blog.csdn.net/u013311345/article/details/129217728
+type MsgReceiver struct {
+	t *testing.T
+}
+
+func (mr *MsgReceiver) Receiver(msgs []*sarama.ConsumerMessage) bool {
+	for _, m := range msgs {
+		mr.t.Logf("time:%s, topic:%s, partition:%d, offset:%d, key:%s, value:%s", time.Now().Format("2006-01-02 15:04:05.000"), m.Topic, m.Partition, m.Offset, m.Key, string(m.Value))
+	}
+
+	return true
+}
+
+var addr = []string{"192.168.13.24:9092", "192.168.13.24:9093", "192.168.13.24:9094", "192.168.13.24:9095"}
+var topicName = []string{"test_topic_1", "test_topic_2"}
+var kafkaVersion = "3.3.1"
+
+func producer(t *testing.T) {
+	var admin KafkaAdmin
+	err := admin.Setup(kafkaVersion, addr)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, tName := range topicName {
+		if !admin.HasTopic(tName) {
+			err = admin.CreateTopic(tName, 2, 2, false)
+			t.Log(err)
+		}
+	}
+
+	var pd Producer
+	cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = pd.SyncSetup(addr, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	now := time.Now()
+
+	//msgs := make([]*sarama.ProducerMessage, 0, 20000)
+	for i := 0; i < 20000; i++ {
+		var msg sarama.ProducerMessage
+		msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
+		msg.Topic = topicName[0]
+		msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
+		pd.SendMessage(&msg)
+		//msgs = append(msgs, &msg)
+	}
+	//err = pd.SendMessages(msgs)
+	//t.Log(err)
+	t.Log(time.Since(now).Milliseconds())
+	pd.Close()
+}
+
+func producerAsync(t *testing.T) {
+	var admin KafkaAdmin
+	err := admin.Setup(kafkaVersion, addr)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, tName := range topicName {
+		if !admin.HasTopic(tName) {
+			err = admin.CreateTopic(tName, 10, 2, false)
+			t.Log(err)
+		}
+	}
+
+	var pd Producer
+	cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = pd.AsyncSetup(addr, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	now := time.Now()
+
+	msgs := make([]*AsyncReturn, 0, 200000)
+	for i := 0; i < 200000; i++ {
+		var msg sarama.ProducerMessage
+		msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
+		msg.Topic = topicName[0]
+		msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
+
+		r := pd.AsyncSendMessage(&msg)
+		msgs = append(msgs, r)
+	}
+
+	for _, r := range msgs {
+		r.WaitOk(context.Background())
+		//t.Log(m, e)
+	}
+	t.Log(time.Since(now).Milliseconds())
+
+	pd.Close()
+}
+
+func consumer(t *testing.T) {
+	var admin KafkaAdmin
+	err := admin.Setup(kafkaVersion, addr)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, tName := range topicName {
+		if !admin.HasTopic(tName) {
+			err = admin.CreateTopic(tName, 10, 2, false)
+			t.Log(err)
+		}
+	}
+
+	var cg ConsumerGroup
+	cfg, err := NewConsumerConfig(kafkaVersion, "sticky", sarama.OffsetOldest)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = cg.Setup(addr, topicName, "test_groupId", cfg, 50*time.Second, 10, &MsgReceiver{t: t})
+	t.Log(err)
+	time.Sleep(10000 * time.Second)
+	cg.Close()
+}
+
+func TestConsumerAndProducer(t *testing.T) {
+	producerAsync(t)
+	//go producer(t)
+	//producer(t)
+	//consumer(t)
+}
diff --git a/sysmodule/kafkamodule/Sasl.go b/sysmodule/kafkamodule/Sasl.go
new file mode 100644
index 0000000..c9bbb69
--- /dev/null
+++ b/sysmodule/kafkamodule/Sasl.go
@@ -0,0 +1,7 @@
+package kafkamodule
+
+// Sasl holds SASL authentication settings for a Kafka connection.
+type Sasl struct {
+	UserName   string `json:"UserName"`
+	Passwd     string `json:"Passwd"`
+	InstanceId string `json:"InstanceId"`
+}
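+
+// A sketch of how these fields might be applied to a sarama.Config; the PLAIN
+// mechanism mirrors the Alibaba Cloud example in Consumer.go and is an
+// assumption, not a requirement of this struct:
+//
+//	conf.Net.SASL.Enable = true
+//	conf.Net.SASL.User = sasl.UserName
+//	conf.Net.SASL.Password = sasl.Passwd
+//	conf.Net.SASL.Mechanism = sarama.SASLTypePlaintext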