From 66770f07a59b9ccfbefc683109f7f04fc51f089a Mon Sep 17 00:00:00 2001 From: orgin Date: Fri, 9 Dec 2022 12:42:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../messagequeueservice/CustomerSubscriber.go | 4 +++- sysservice/messagequeueservice/MemoryQueue.go | 15 +++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sysservice/messagequeueservice/CustomerSubscriber.go b/sysservice/messagequeueservice/CustomerSubscriber.go index f7a4deb..2ffbffc 100644 --- a/sysservice/messagequeueservice/CustomerSubscriber.go +++ b/sysservice/messagequeueservice/CustomerSubscriber.go @@ -25,6 +25,7 @@ type CustomerSubscriber struct { customerId string isStop int32 //退出标记 + topicCache []TopicData // 从消息队列中取出来的消息的缓存 } const DefaultOneBatchQuantity = 1000 @@ -79,6 +80,7 @@ func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandle cs.StartIndex = uint64(zeroTime.Unix() << 32) } + cs.topicCache = make([]TopicData, oneBatchQuantity) return nil } @@ -156,7 +158,7 @@ func (cs *CustomerSubscriber) SubscribeRun() { func (cs *CustomerSubscriber) subscribe() bool { //先从内存中查找 - topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity) + topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity, cs.topicCache[:0]) if ret == true { cs.publishToCustomer(topicData) return true diff --git a/sysservice/messagequeueservice/MemoryQueue.go b/sysservice/messagequeueservice/MemoryQueue.go index e533489..986fe46 100644 --- a/sysservice/messagequeueservice/MemoryQueue.go +++ b/sysservice/messagequeueservice/MemoryQueue.go @@ -78,7 +78,7 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32) } // FindData 返回参数[]TopicData 表示查找到的数据,nil表示无数据。bool表示是否不应该在内存中来查 -func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bool) { +func (mq *MemoryQueue) FindData(startIndex uint64, limit int32, dataQueue []TopicData) ([]TopicData, bool) { mq.locker.RLock() defer mq.locker.RUnlock() @@ -87,15 +87,22 @@ func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bo return nil, false } else if mq.head < mq.tail { // 队列没有折叠 - return mq.findData(mq.head + 1, startIndex, limit) + datas,ret := mq.findData(mq.head + 1, startIndex, limit) + if ret { + dataQueue = append(dataQueue, datas...) + } + return dataQueue, ret } else { // 折叠先找后面的部分 datas,ret := mq.findData(mq.head+1, startIndex, limit) if ret { - return datas, ret + dataQueue = append(dataQueue, datas...) + return dataQueue, ret } // 后面没找到,从前面开始找 - return mq.findData(0, startIndex, limit) + datas,ret = mq.findData(0, startIndex, limit) + dataQueue = append(dataQueue, datas...) + return dataQueue, ret } }