From 07a102c6ea09fe51288d1f3a294546f97dcbef15 Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Fri, 11 Oct 2024 09:00:12 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=8F=91=E5=B8=83=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- util/pattern/pubsub/pubsub.go | 173 +++++++++++++++++++++++++++++ util/pattern/pubsub/pubsub_test.go | 54 +++++++++ 2 files changed, 227 insertions(+) create mode 100644 util/pattern/pubsub/pubsub.go create mode 100644 util/pattern/pubsub/pubsub_test.go diff --git a/util/pattern/pubsub/pubsub.go b/util/pattern/pubsub/pubsub.go new file mode 100644 index 0000000..604a2fb --- /dev/null +++ b/util/pattern/pubsub/pubsub.go @@ -0,0 +1,173 @@ +package pubsub + +import ( + "container/list" + "sync/atomic" +) + +type TopicType int +type Key uint64 + +type IBaseSubscriber interface { + OnSubscribe(key Key) + GetKey() Key +} + +type ISubscriber interface { + IBaseSubscriber + OnEvent(ctx ...any) +} + +type IPublisher interface { + Publish(topic TopicType, ctx ...any) + Subscribe(topic TopicType, sub ISubscriber) + UnSubscribe(topic TopicType) + UnSubscribeKey(key Key) +} + +var keyID uint64 + +func genKeyID() Key { + return Key(atomic.AddUint64(&keyID, 1)) +} + +type KeyData struct { + subscriber ISubscriber + topicType TopicType + keyElement *list.Element +} + +type SubscriberSet map[Key]KeyData +type TopicSet map[TopicType]*list.List + +type Publisher struct { + subscriberSet SubscriberSet + topicSet TopicSet +} + +func (set *SubscriberSet) init() { + *set = make(SubscriberSet, 64) +} + +func (set *SubscriberSet) add(keyElement *list.Element, topicType TopicType, subscriber ISubscriber) { + (*set)[keyElement.Value.(Key)] = KeyData{subscriber: subscriber, topicType: topicType, keyElement: keyElement} +} + +func (set *SubscriberSet) del(key Key) { + delete(*set, key) +} + +func (set *SubscriberSet) get(key Key) (KeyData, bool) { + keyData, ok := (*set)[key] + if !ok { + return keyData, false + } + + return keyData, true +} + +func (set *TopicSet) init() { + *set = make(TopicSet, 64) +} + +func (set *TopicSet) add(topic TopicType, key Key) *list.Element { + keyList := (*set)[topic] + if keyList == nil { + keyList = list.New() + (*set)[topic] = keyList + } + + return keyList.PushBack(key) +} + +func (set *TopicSet) del(topic TopicType, keyElement *list.Element) { + keyList := (*set)[topic] + if keyList == nil { + return + } + + keyList.Remove(keyElement) +} + +func (set *TopicSet) foreach(topic TopicType, cb func(key Key)) { + keyList := (*set)[topic] + if keyList == nil { + return + } + for e := keyList.Front(); e != nil; e = e.Next() { + cb(e.Value.(Key)) + } +} + +type BaseSubscriber struct { + key Key +} + +func (bs *BaseSubscriber) OnSubscribe(key Key) { + bs.key = key +} + +func (bs *BaseSubscriber) GetKey() Key { + return bs.key +} + +func (pub *Publisher) lazyInit() { + if pub.subscriberSet == nil { + pub.subscriberSet.init() + } + if pub.topicSet == nil { + pub.topicSet.init() + } +} + +func (pub *Publisher) add(topic TopicType, sub ISubscriber) Key { + key := genKeyID() + ele := pub.topicSet.add(topic, key) + pub.subscriberSet.add(ele, topic, sub) + + return key +} + +func (pub *Publisher) Publish(topic TopicType, ctx ...any) { + pub.lazyInit() + pub.topicSet.foreach(topic, func(key Key) { + keyData, ok := pub.subscriberSet.get(key) + if ok == false { + return + } + keyData.subscriber.OnEvent(ctx...) + }) +} + +func (pub *Publisher) Subscribe(topic TopicType, sub ISubscriber) bool { + if topic == 0 { + return false + } + + pub.lazyInit() + sub.OnSubscribe(pub.add(topic, sub)) + return true +} + +func (pub *Publisher) UnSubscribe(topic TopicType) { + keyList := pub.topicSet[topic] + if keyList == nil { + return + } + + for e := keyList.Front(); e != nil; e = e.Next() { + pub.subscriberSet.del(e.Value.(Key)) + } + + delete(pub.topicSet, topic) +} + +func (pub *Publisher) UnSubscribeKey(key Key) { + keyData, ok := pub.subscriberSet.get(key) + if ok == false { + return + } + + pub.topicSet.del(keyData.topicType, keyData.keyElement) + pub.subscriberSet.del(key) +} diff --git a/util/pattern/pubsub/pubsub_test.go b/util/pattern/pubsub/pubsub_test.go new file mode 100644 index 0000000..da5d63c --- /dev/null +++ b/util/pattern/pubsub/pubsub_test.go @@ -0,0 +1,54 @@ +package pubsub + +import ( + "testing" +) + +const ( + Invalid TopicType = iota + Topic1 + Topic2 +) + +var test *testing.T + +type Subscriber1 struct { + BaseSubscriber +} + +type Subscriber2 struct { + BaseSubscriber +} + +func (sub *Subscriber1) OnEvent(ctx ...any) { + test.Log("Subscriber1 OnEvent", " key ", sub.GetKey(), ctx) +} + +func (sub *Subscriber2) OnEvent(ctx ...any) { + test.Log("Subscriber2 OnEvent", " key ", sub.GetKey(), ctx) +} + +func TestPubSub(t *testing.T) { + test = t + var publisher Publisher + + // 创建3个订阅者 + var subscriber []ISubscriber + subscriber = append(subscriber, &Subscriber1{}, &Subscriber1{}, &Subscriber2{}) + + // 分别注册进Publisher中 + publisher.Subscribe(Topic1, subscriber[0]) + publisher.Subscribe(Topic1, subscriber[1]) + publisher.Subscribe(Topic2, subscriber[2]) + + // 发布订阅,两个Subscriber1都会调用OnEvent + publisher.Publish(Topic1, 1, 2, 3) + + // 删除订阅,Publish后,只有Subscriber1的key2收到 + publisher.UnSubscribeKey(subscriber[0].GetKey()) + publisher.Publish(Topic1, 1, 2, 3) + + // 删除Topic2,Publish将收不到 + publisher.UnSubscribe(Topic2) + publisher.Publish(Topic2, 1) +}