mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-04 06:54:45 +08:00
优化消息队列
This commit is contained in:
@@ -25,6 +25,7 @@ type CustomerSubscriber struct {
|
|||||||
customerId string
|
customerId string
|
||||||
|
|
||||||
isStop int32 //退出标记
|
isStop int32 //退出标记
|
||||||
|
topicCache []TopicData // 从消息队列中取出来的消息的缓存
|
||||||
}
|
}
|
||||||
|
|
||||||
const DefaultOneBatchQuantity = 1000
|
const DefaultOneBatchQuantity = 1000
|
||||||
@@ -79,6 +80,7 @@ func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandle
|
|||||||
cs.StartIndex = uint64(zeroTime.Unix() << 32)
|
cs.StartIndex = uint64(zeroTime.Unix() << 32)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cs.topicCache = make([]TopicData, oneBatchQuantity)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -156,7 +158,7 @@ func (cs *CustomerSubscriber) SubscribeRun() {
|
|||||||
|
|
||||||
func (cs *CustomerSubscriber) subscribe() bool {
|
func (cs *CustomerSubscriber) subscribe() bool {
|
||||||
//先从内存中查找
|
//先从内存中查找
|
||||||
topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity)
|
topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity, cs.topicCache[:0])
|
||||||
if ret == true {
|
if ret == true {
|
||||||
cs.publishToCustomer(topicData)
|
cs.publishToCustomer(topicData)
|
||||||
return true
|
return true
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FindData 返回参数[]TopicData 表示查找到的数据,nil表示无数据。bool表示是否不应该在内存中来查
|
// FindData 返回参数[]TopicData 表示查找到的数据,nil表示无数据。bool表示是否不应该在内存中来查
|
||||||
func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bool) {
|
func (mq *MemoryQueue) FindData(startIndex uint64, limit int32, dataQueue []TopicData) ([]TopicData, bool) {
|
||||||
mq.locker.RLock()
|
mq.locker.RLock()
|
||||||
defer mq.locker.RUnlock()
|
defer mq.locker.RUnlock()
|
||||||
|
|
||||||
@@ -87,15 +87,22 @@ func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bo
|
|||||||
return nil, false
|
return nil, false
|
||||||
} else if mq.head < mq.tail {
|
} else if mq.head < mq.tail {
|
||||||
// 队列没有折叠
|
// 队列没有折叠
|
||||||
return mq.findData(mq.head + 1, startIndex, limit)
|
datas,ret := mq.findData(mq.head + 1, startIndex, limit)
|
||||||
|
if ret {
|
||||||
|
dataQueue = append(dataQueue, datas...)
|
||||||
|
}
|
||||||
|
return dataQueue, ret
|
||||||
} else {
|
} else {
|
||||||
// 折叠先找后面的部分
|
// 折叠先找后面的部分
|
||||||
datas,ret := mq.findData(mq.head+1, startIndex, limit)
|
datas,ret := mq.findData(mq.head+1, startIndex, limit)
|
||||||
if ret {
|
if ret {
|
||||||
return datas, ret
|
dataQueue = append(dataQueue, datas...)
|
||||||
|
return dataQueue, ret
|
||||||
}
|
}
|
||||||
|
|
||||||
// 后面没找到,从前面开始找
|
// 后面没找到,从前面开始找
|
||||||
return mq.findData(0, startIndex, limit)
|
datas,ret = mq.findData(0, startIndex, limit)
|
||||||
|
dataQueue = append(dataQueue, datas...)
|
||||||
|
return dataQueue, ret
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user