mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
新增发布订阅模式
This commit is contained in:
173
util/pattern/pubsub/pubsub.go
Normal file
173
util/pattern/pubsub/pubsub.go
Normal file
@@ -0,0 +1,173 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type TopicType int
|
||||
type Key uint64
|
||||
|
||||
type IBaseSubscriber interface {
|
||||
OnSubscribe(key Key)
|
||||
GetKey() Key
|
||||
}
|
||||
|
||||
type ISubscriber interface {
|
||||
IBaseSubscriber
|
||||
OnEvent(ctx ...any)
|
||||
}
|
||||
|
||||
type IPublisher interface {
|
||||
Publish(topic TopicType, ctx ...any)
|
||||
Subscribe(topic TopicType, sub ISubscriber)
|
||||
UnSubscribe(topic TopicType)
|
||||
UnSubscribeKey(key Key)
|
||||
}
|
||||
|
||||
var keyID uint64
|
||||
|
||||
func genKeyID() Key {
|
||||
return Key(atomic.AddUint64(&keyID, 1))
|
||||
}
|
||||
|
||||
type KeyData struct {
|
||||
subscriber ISubscriber
|
||||
topicType TopicType
|
||||
keyElement *list.Element
|
||||
}
|
||||
|
||||
type SubscriberSet map[Key]KeyData
|
||||
type TopicSet map[TopicType]*list.List
|
||||
|
||||
type Publisher struct {
|
||||
subscriberSet SubscriberSet
|
||||
topicSet TopicSet
|
||||
}
|
||||
|
||||
func (set *SubscriberSet) init() {
|
||||
*set = make(SubscriberSet, 64)
|
||||
}
|
||||
|
||||
func (set *SubscriberSet) add(keyElement *list.Element, topicType TopicType, subscriber ISubscriber) {
|
||||
(*set)[keyElement.Value.(Key)] = KeyData{subscriber: subscriber, topicType: topicType, keyElement: keyElement}
|
||||
}
|
||||
|
||||
func (set *SubscriberSet) del(key Key) {
|
||||
delete(*set, key)
|
||||
}
|
||||
|
||||
func (set *SubscriberSet) get(key Key) (KeyData, bool) {
|
||||
keyData, ok := (*set)[key]
|
||||
if !ok {
|
||||
return keyData, false
|
||||
}
|
||||
|
||||
return keyData, true
|
||||
}
|
||||
|
||||
func (set *TopicSet) init() {
|
||||
*set = make(TopicSet, 64)
|
||||
}
|
||||
|
||||
func (set *TopicSet) add(topic TopicType, key Key) *list.Element {
|
||||
keyList := (*set)[topic]
|
||||
if keyList == nil {
|
||||
keyList = list.New()
|
||||
(*set)[topic] = keyList
|
||||
}
|
||||
|
||||
return keyList.PushBack(key)
|
||||
}
|
||||
|
||||
func (set *TopicSet) del(topic TopicType, keyElement *list.Element) {
|
||||
keyList := (*set)[topic]
|
||||
if keyList == nil {
|
||||
return
|
||||
}
|
||||
|
||||
keyList.Remove(keyElement)
|
||||
}
|
||||
|
||||
func (set *TopicSet) foreach(topic TopicType, cb func(key Key)) {
|
||||
keyList := (*set)[topic]
|
||||
if keyList == nil {
|
||||
return
|
||||
}
|
||||
for e := keyList.Front(); e != nil; e = e.Next() {
|
||||
cb(e.Value.(Key))
|
||||
}
|
||||
}
|
||||
|
||||
type BaseSubscriber struct {
|
||||
key Key
|
||||
}
|
||||
|
||||
func (bs *BaseSubscriber) OnSubscribe(key Key) {
|
||||
bs.key = key
|
||||
}
|
||||
|
||||
func (bs *BaseSubscriber) GetKey() Key {
|
||||
return bs.key
|
||||
}
|
||||
|
||||
func (pub *Publisher) lazyInit() {
|
||||
if pub.subscriberSet == nil {
|
||||
pub.subscriberSet.init()
|
||||
}
|
||||
if pub.topicSet == nil {
|
||||
pub.topicSet.init()
|
||||
}
|
||||
}
|
||||
|
||||
func (pub *Publisher) add(topic TopicType, sub ISubscriber) Key {
|
||||
key := genKeyID()
|
||||
ele := pub.topicSet.add(topic, key)
|
||||
pub.subscriberSet.add(ele, topic, sub)
|
||||
|
||||
return key
|
||||
}
|
||||
|
||||
func (pub *Publisher) Publish(topic TopicType, ctx ...any) {
|
||||
pub.lazyInit()
|
||||
pub.topicSet.foreach(topic, func(key Key) {
|
||||
keyData, ok := pub.subscriberSet.get(key)
|
||||
if ok == false {
|
||||
return
|
||||
}
|
||||
keyData.subscriber.OnEvent(ctx...)
|
||||
})
|
||||
}
|
||||
|
||||
func (pub *Publisher) Subscribe(topic TopicType, sub ISubscriber) bool {
|
||||
if topic == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
pub.lazyInit()
|
||||
sub.OnSubscribe(pub.add(topic, sub))
|
||||
return true
|
||||
}
|
||||
|
||||
func (pub *Publisher) UnSubscribe(topic TopicType) {
|
||||
keyList := pub.topicSet[topic]
|
||||
if keyList == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for e := keyList.Front(); e != nil; e = e.Next() {
|
||||
pub.subscriberSet.del(e.Value.(Key))
|
||||
}
|
||||
|
||||
delete(pub.topicSet, topic)
|
||||
}
|
||||
|
||||
func (pub *Publisher) UnSubscribeKey(key Key) {
|
||||
keyData, ok := pub.subscriberSet.get(key)
|
||||
if ok == false {
|
||||
return
|
||||
}
|
||||
|
||||
pub.topicSet.del(keyData.topicType, keyData.keyElement)
|
||||
pub.subscriberSet.del(key)
|
||||
}
|
||||
54
util/pattern/pubsub/pubsub_test.go
Normal file
54
util/pattern/pubsub/pubsub_test.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
const (
|
||||
Invalid TopicType = iota
|
||||
Topic1
|
||||
Topic2
|
||||
)
|
||||
|
||||
var test *testing.T
|
||||
|
||||
type Subscriber1 struct {
|
||||
BaseSubscriber
|
||||
}
|
||||
|
||||
type Subscriber2 struct {
|
||||
BaseSubscriber
|
||||
}
|
||||
|
||||
func (sub *Subscriber1) OnEvent(ctx ...any) {
|
||||
test.Log("Subscriber1 OnEvent", " key ", sub.GetKey(), ctx)
|
||||
}
|
||||
|
||||
func (sub *Subscriber2) OnEvent(ctx ...any) {
|
||||
test.Log("Subscriber2 OnEvent", " key ", sub.GetKey(), ctx)
|
||||
}
|
||||
|
||||
func TestPubSub(t *testing.T) {
|
||||
test = t
|
||||
var publisher Publisher
|
||||
|
||||
// 创建3个订阅者
|
||||
var subscriber []ISubscriber
|
||||
subscriber = append(subscriber, &Subscriber1{}, &Subscriber1{}, &Subscriber2{})
|
||||
|
||||
// 分别注册进Publisher中
|
||||
publisher.Subscribe(Topic1, subscriber[0])
|
||||
publisher.Subscribe(Topic1, subscriber[1])
|
||||
publisher.Subscribe(Topic2, subscriber[2])
|
||||
|
||||
// 发布订阅,两个Subscriber1都会调用OnEvent
|
||||
publisher.Publish(Topic1, 1, 2, 3)
|
||||
|
||||
// 删除订阅,Publish后,只有Subscriber1的key2收到
|
||||
publisher.UnSubscribeKey(subscriber[0].GetKey())
|
||||
publisher.Publish(Topic1, 1, 2, 3)
|
||||
|
||||
// 删除Topic2,Publish将收不到
|
||||
publisher.UnSubscribe(Topic2)
|
||||
publisher.Publish(Topic2, 1)
|
||||
}
|
||||
Reference in New Issue
Block a user