mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
152 lines
3.3 KiB
Go
152 lines
3.3 KiB
Go
package kafkamodule
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"github.com/IBM/sarama"
|
||
"testing"
|
||
"time"
|
||
)
|
||
|
||
// 对各参数和机制名称的说明:https://blog.csdn.net/u013311345/article/details/129217728
|
||
type MsgReceiver struct {
|
||
t *testing.T
|
||
}
|
||
|
||
func (mr *MsgReceiver) Receiver(msgs []*sarama.ConsumerMessage) bool {
|
||
for _, m := range msgs {
|
||
mr.t.Logf("time:%s, topic:%s, partition:%d, offset:%d, key:%s, value:%s", time.Now().Format("2006-01-02 15:04:05.000"), m.Topic, m.Partition, m.Offset, m.Key, string(m.Value))
|
||
}
|
||
|
||
return true
|
||
}
|
||
|
||
var addr = []string{"192.168.13.24:9092", "192.168.13.24:9093", "192.168.13.24:9094", "192.168.13.24:9095"}
|
||
var topicName = []string{"test_topic_1", "test_topic_2"}
|
||
var kafkaVersion = "3.3.1"
|
||
|
||
func producer(t *testing.T) {
|
||
var admin KafkaAdmin
|
||
err := admin.Setup(kafkaVersion, addr)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
for _, tName := range topicName {
|
||
if admin.HasTopic(tName) == false {
|
||
err = admin.CreateTopic(tName, 2, 2, false)
|
||
t.Log(err)
|
||
}
|
||
}
|
||
|
||
var pd Producer
|
||
cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
err = pd.SyncSetup(addr, cfg)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
now := time.Now()
|
||
|
||
//msgs := make([]*sarama.ProducerMessage, 0, 20000)
|
||
for i := 0; i < 20000; i++ {
|
||
var msg sarama.ProducerMessage
|
||
msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
|
||
msg.Topic = topicName[0]
|
||
msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
|
||
pd.SendMessage(&msg)
|
||
//msgs = append(msgs, &msg)
|
||
}
|
||
//err = pd.SendMessages(msgs)
|
||
//t.Log(err)
|
||
t.Log(time.Now().Sub(now).Milliseconds())
|
||
pd.Close()
|
||
}
|
||
|
||
func producer_async(t *testing.T) {
|
||
var admin KafkaAdmin
|
||
err := admin.Setup(kafkaVersion, addr)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
for _, tName := range topicName {
|
||
if admin.HasTopic(tName) == false {
|
||
err = admin.CreateTopic(tName, 10, 2, false)
|
||
t.Log(err)
|
||
}
|
||
}
|
||
|
||
var pd Producer
|
||
cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
err = pd.ASyncSetup(addr, cfg)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
now := time.Now()
|
||
|
||
msgs := make([]*AsyncReturn, 0, 20000)
|
||
for i := 0; i < 200000; i++ {
|
||
var msg sarama.ProducerMessage
|
||
msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
|
||
msg.Topic = topicName[0]
|
||
msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
|
||
|
||
r := pd.AsyncSendMessage(&msg)
|
||
msgs = append(msgs, r)
|
||
}
|
||
//err = pd.SendMessages(msgs)
|
||
//t.Log(err)
|
||
|
||
for _, r := range msgs {
|
||
r.WaitOk(context.Background())
|
||
//t.Log(m, e)
|
||
}
|
||
t.Log(time.Now().Sub(now).Milliseconds())
|
||
|
||
time.Sleep(1000 * time.Second)
|
||
pd.Close()
|
||
}
|
||
|
||
func consumer(t *testing.T) {
|
||
var admin KafkaAdmin
|
||
err := admin.Setup(kafkaVersion, addr)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
for _, tName := range topicName {
|
||
if admin.HasTopic(tName) == false {
|
||
err = admin.CreateTopic(tName, 10, 2, false)
|
||
t.Log(err)
|
||
}
|
||
}
|
||
|
||
var cg ConsumerGroup
|
||
cfg, err := NewConsumerConfig(kafkaVersion, "sticky", sarama.OffsetOldest)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
err = cg.Setup(addr, topicName, "test_groupId", cfg, 50*time.Second, 10, &MsgReceiver{t: t})
|
||
t.Log(err)
|
||
time.Sleep(10000 * time.Second)
|
||
cg.Close()
|
||
}
|
||
|
||
func TestConsumerAndProducer(t *testing.T) {
|
||
producer_async(t)
|
||
//go producer(t)
|
||
//producer(t)
|
||
//consumer(t)
|
||
}
|