mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
66 lines
1.4 KiB
Go
66 lines
1.4 KiB
Go
package kafkamodule
|
|
|
|
import "github.com/IBM/sarama"
|
|
|
|
type KafkaAdmin struct {
|
|
sarama.ClusterAdmin
|
|
|
|
mapTopic map[string]sarama.TopicDetail
|
|
}
|
|
|
|
func (ka *KafkaAdmin) Setup(kafkaVersion string, addrs []string) error {
|
|
config := sarama.NewConfig()
|
|
var err error
|
|
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ka.ClusterAdmin, err = sarama.NewClusterAdmin(addrs, config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ka.mapTopic, err = ka.GetTopics()
|
|
if err != nil {
|
|
ka.ClusterAdmin.Close()
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ka *KafkaAdmin) RefreshTopic() error {
|
|
var err error
|
|
ka.mapTopic, err = ka.GetTopics()
|
|
|
|
return err
|
|
}
|
|
|
|
func (ka *KafkaAdmin) HasTopic(topic string) bool {
|
|
_, ok := ka.mapTopic[topic]
|
|
|
|
return ok
|
|
}
|
|
|
|
func (ka *KafkaAdmin) GetTopicDetail(topic string) *sarama.TopicDetail {
|
|
topicDetail, ok := ka.mapTopic[topic]
|
|
if ok == false {
|
|
return nil
|
|
}
|
|
|
|
return &topicDetail
|
|
}
|
|
|
|
func (ka *KafkaAdmin) GetTopics() (map[string]sarama.TopicDetail, error) {
|
|
return ka.ListTopics()
|
|
}
|
|
|
|
// CreateTopic 创建主题
|
|
// numPartitions分区数
|
|
// replicationFactor副本数
|
|
// validateOnly参数执行操作时只进行参数验证而不实际执行操作
|
|
func (ka *KafkaAdmin) CreateTopic(topic string, numPartitions int32, replicationFactor int16, validateOnly bool) error {
|
|
return ka.ClusterAdmin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: numPartitions, ReplicationFactor: replicationFactor}, validateOnly)
|
|
}
|