diff --git a/sysservice/messagequeueservice/CustomerSubscriber.go b/sysservice/messagequeueservice/CustomerSubscriber.go index f7a4deb..2ffbffc 100644 --- a/sysservice/messagequeueservice/CustomerSubscriber.go +++ b/sysservice/messagequeueservice/CustomerSubscriber.go @@ -25,6 +25,7 @@ type CustomerSubscriber struct { customerId string isStop int32 //退出标记 + topicCache []TopicData // 从消息队列中取出来的消息的缓存 } const DefaultOneBatchQuantity = 1000 @@ -79,6 +80,7 @@ func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandle cs.StartIndex = uint64(zeroTime.Unix() << 32) } + cs.topicCache = make([]TopicData, oneBatchQuantity) return nil } @@ -156,7 +158,7 @@ func (cs *CustomerSubscriber) SubscribeRun() { func (cs *CustomerSubscriber) subscribe() bool { //先从内存中查找 - topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity) + topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity, cs.topicCache[:0]) if ret == true { cs.publishToCustomer(topicData) return true diff --git a/sysservice/messagequeueservice/MemoryQueue.go b/sysservice/messagequeueservice/MemoryQueue.go index e533489..986fe46 100644 --- a/sysservice/messagequeueservice/MemoryQueue.go +++ b/sysservice/messagequeueservice/MemoryQueue.go @@ -78,7 +78,7 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32) } // FindData 返回参数[]TopicData 表示查找到的数据,nil表示无数据。bool表示是否不应该在内存中来查 -func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bool) { +func (mq *MemoryQueue) FindData(startIndex uint64, limit int32, dataQueue []TopicData) ([]TopicData, bool) { mq.locker.RLock() defer mq.locker.RUnlock() @@ -87,15 +87,22 @@ func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bo return nil, false } else if mq.head < mq.tail { // 队列没有折叠 - return mq.findData(mq.head + 1, startIndex, limit) + datas,ret := mq.findData(mq.head + 1, startIndex, limit) + if ret { + dataQueue = append(dataQueue, datas...) + } + return dataQueue, ret } else { // 折叠先找后面的部分 datas,ret := mq.findData(mq.head+1, startIndex, limit) if ret { - return datas, ret + dataQueue = append(dataQueue, datas...) + return dataQueue, ret } // 后面没找到,从前面开始找 - return mq.findData(0, startIndex, limit) + datas,ret = mq.findData(0, startIndex, limit) + dataQueue = append(dataQueue, datas...) + return dataQueue, ret } }