Compare commits

...

12 Commits

Author SHA1 Message Date
origin
8c8d681093 优化消息队列-当没有新消息时加大延迟时间 2023-01-28 16:25:30 +08:00
origin
b8150cfc51 扩展索引排序 2023-01-13 10:59:36 +08:00
origin
3833884777 优化消息队列 2022-12-29 14:54:54 +08:00
orgin
66770f07a5 优化消息队列 2022-12-09 12:42:47 +08:00
orgin
76c8541b34 优化消息队列持久化日志 2022-12-01 14:19:02 +08:00
orgin
b1fee9bc57 优化消息队列服务日志 2022-11-29 17:25:34 +08:00
orgin
284d43dc71 优化rpc 2022-11-29 13:38:07 +08:00
orgin
fd43863b73 优化rpc 2022-11-29 09:50:27 +08:00
orgin
1fcd870f1d 优化消息队列服务 2022-11-26 16:15:41 +08:00
orgin
11b78f84c4 优化rpc 2022-11-26 14:36:03 +08:00
orgin
8c6ee24b16 优化rpc 2022-11-26 14:16:25 +08:00
orgin
ca23925796 优化持久化存储模块 2022-11-25 11:45:35 +08:00
8 changed files with 133 additions and 81 deletions

View File

@@ -265,11 +265,35 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
return pCall return pCall
} }
var iParam interface{}
if processor == nil { if processor == nil {
_, processor = GetProcessorType(args) _, processor = GetProcessorType(args)
} }
if args != nil {
inParamValue := reflect.New(reflect.ValueOf(args).Type().Elem())
//args
//复制输入参数
iParam = inParamValue.Interface()
bytes,err := processor.Marshal(args)
if err == nil {
err = processor.Unmarshal(bytes,iParam)
}
if err != nil {
pCall.Seq = 0
pCall.Err = errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
pCall.done <- pCall
log.SError(pCall.Err.Error())
return pCall
}
}
req := MakeRpcRequest(processor, 0, rpcMethodId, serviceMethod, noReply, nil) req := MakeRpcRequest(processor, 0, rpcMethodId, serviceMethod, noReply, nil)
req.inParam = args req.inParam = iParam
req.localReply = reply req.localReply = reply
if rawArgs != nil { if rawArgs != nil {
var err error var err error
@@ -335,8 +359,23 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
} }
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
inParamValue := reflect.New(reflect.ValueOf(args).Type().Elem())
//args
//复制输入参数
iParam := inParamValue.Interface()
bytes,err := processor.Marshal(args)
if err == nil {
err = processor.Unmarshal(bytes,iParam)
}
if err != nil {
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(errM.Error())
return errM
}
req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil) req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil)
req.inParam = args req.inParam = iParam
req.localReply = reply req.localReply = reply
if noReply == false { if noReply == false {
@@ -370,7 +409,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
} }
} }
err := rpcHandler.PushRpcRequest(req) err = rpcHandler.PushRpcRequest(req)
if err != nil { if err != nil {
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
return err return err

View File

@@ -68,34 +68,39 @@ func (s *Session) NextSeq(db string, collection string, id interface{}) (int, er
after := options.After after := options.After
updateOpts := options.FindOneAndUpdateOptions{ReturnDocument: &after} updateOpts := options.FindOneAndUpdateOptions{ReturnDocument: &after}
err := s.Client.Database(db).Collection(collection).FindOneAndUpdate(ctxTimeout, bson.M{"_id": id}, bson.M{"$inc": bson.M{"Seq": 1}},&updateOpts).Decode(&res) err := s.Client.Database(db).Collection(collection).FindOneAndUpdate(ctxTimeout, bson.M{"_id": id}, bson.M{"$inc": bson.M{"Seq": 1}}, &updateOpts).Decode(&res)
return res.Seq, err return res.Seq, err
} }
//indexKeys[索引][每个索引key字段] // indexKeys[索引][每个索引key字段]
func (s *Session) EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool,sparse bool) error { func (s *Session) EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error {
return s.ensureIndex(db, collection, indexKeys, bBackground, false,sparse) return s.ensureIndex(db, collection, indexKeys, bBackground, false, sparse, asc)
} }
//indexKeys[索引][每个索引key字段] // indexKeys[索引][每个索引key字段]
func (s *Session) EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool,sparse bool) error { func (s *Session) EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error {
return s.ensureIndex(db, collection, indexKeys, bBackground, true,sparse) return s.ensureIndex(db, collection, indexKeys, bBackground, true, sparse, asc)
} }
//keys[索引][每个索引key字段] // keys[索引][每个索引key字段]
func (s *Session) ensureIndex(db string, collection string, indexKeys [][]string, bBackground bool, unique bool,sparse bool) error { func (s *Session) ensureIndex(db string, collection string, indexKeys [][]string, bBackground bool, unique bool, sparse bool, asc bool) error {
var indexes []mongo.IndexModel var indexes []mongo.IndexModel
for _, keys := range indexKeys { for _, keys := range indexKeys {
keysDoc := bsonx.Doc{} keysDoc := bsonx.Doc{}
for _, key := range keys { for _, key := range keys {
keysDoc = keysDoc.Append(key, bsonx.Int32(1)) if asc {
keysDoc = keysDoc.Append(key, bsonx.Int32(1))
} else {
keysDoc = keysDoc.Append(key, bsonx.Int32(-1))
}
} }
options:= options.Index().SetUnique(unique).SetBackground(bBackground) options := options.Index().SetUnique(unique).SetBackground(bBackground)
if sparse == true { if sparse == true {
options.SetSparse(true) options.SetSparse(true)
} }
indexes = append(indexes, mongo.IndexModel{Keys: keysDoc, Options:options }) indexes = append(indexes, mongo.IndexModel{Keys: keysDoc, Options: options})
} }
ctxTimeout, cancel := context.WithTimeout(context.Background(), s.maxOperatorTimeOut) ctxTimeout, cancel := context.WithTimeout(context.Background(), s.maxOperatorTimeOut)

View File

@@ -25,6 +25,7 @@ type CustomerSubscriber struct {
customerId string customerId string
isStop int32 //退出标记 isStop int32 //退出标记
topicCache []TopicData // 从消息队列中取出来的消息的缓存
} }
const DefaultOneBatchQuantity = 1000 const DefaultOneBatchQuantity = 1000
@@ -79,6 +80,7 @@ func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandle
cs.StartIndex = uint64(zeroTime.Unix() << 32) cs.StartIndex = uint64(zeroTime.Unix() << 32)
} }
cs.topicCache = make([]TopicData, oneBatchQuantity)
return nil return nil
} }
@@ -156,14 +158,14 @@ func (cs *CustomerSubscriber) SubscribeRun() {
func (cs *CustomerSubscriber) subscribe() bool { func (cs *CustomerSubscriber) subscribe() bool {
//先从内存中查找 //先从内存中查找
topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex, cs.oneBatchQuantity) topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity, cs.topicCache[:0])
if ret == true { if ret == true {
cs.publishToCustomer(topicData) cs.publishToCustomer(topicData)
return true return true
} }
//从持久化数据中来找 //从持久化数据中来找
topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity)) topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity),cs.topicCache[:0])
return cs.publishToCustomer(topicData) return cs.publishToCustomer(topicData)
} }
@@ -188,7 +190,7 @@ func (cs *CustomerSubscriber) publishToCustomer(topicData []TopicData) bool {
if len(topicData) == 0 { if len(topicData) == 0 {
//没有任何数据待一秒吧 //没有任何数据待一秒吧
time.Sleep(time.Millisecond * 100) time.Sleep(time.Second * 1)
return true return true
} }

View File

@@ -49,13 +49,22 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32)
if findStartPos <= mq.tail { if findStartPos <= mq.tail {
findEndPos = mq.tail + 1 findEndPos = mq.tail + 1
} else { } else {
findEndPos = int32(cap(mq.topicQueue)) findEndPos = int32(len(mq.topicQueue))
}
if findStartPos >= findEndPos {
return nil, false
}
// 要取的Seq 比内存中最小的数据的Seq还小那么需要返回错误
if mq.topicQueue[findStartPos].Seq > startIndex {
return nil, false
} }
//二分查找位置 //二分查找位置
pos := int32(algorithms.BiSearch(mq.topicQueue[findStartPos:findEndPos], startIndex, 1)) pos := int32(algorithms.BiSearch(mq.topicQueue[findStartPos:findEndPos], startIndex, 1))
if pos == -1 { if pos == -1 {
return nil, true return nil, false
} }
pos += findStartPos pos += findStartPos
@@ -69,29 +78,31 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32)
} }
// FindData 返回参数[]TopicData 表示查找到的数据nil表示无数据。bool表示是否不应该在内存中来查 // FindData 返回参数[]TopicData 表示查找到的数据nil表示无数据。bool表示是否不应该在内存中来查
func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bool) { func (mq *MemoryQueue) FindData(startIndex uint64, limit int32, dataQueue []TopicData) ([]TopicData, bool) {
mq.locker.RLock() mq.locker.RLock()
defer mq.locker.RUnlock() defer mq.locker.RUnlock()
//队列为空时,应该从数据库查找 //队列为空时,应该从数据库查找
if mq.head == mq.tail { if mq.head == mq.tail {
return nil, false return nil, false
} } else if mq.head < mq.tail {
// 队列没有折叠
/* datas,ret := mq.findData(mq.head + 1, startIndex, limit)
//先判断startIndex是否比第一个元素要大 if ret {
headTopic := (mq.head + 1) % int32(len(mq.topicQueue)) dataQueue = append(dataQueue, datas...)
//此时需要从持久化数据中取 }
if startIndex+1 > mq.topicQueue[headTopic].Seq { return dataQueue, ret
return nil, false } else {
// 折叠先找后面的部分
datas,ret := mq.findData(mq.head+1, startIndex, limit)
if ret {
dataQueue = append(dataQueue, datas...)
return dataQueue, ret
} }
*/
retData, ret := mq.findData(mq.head+1, startIndex, limit) // 后面没找到,从前面开始找
if mq.head <= mq.tail || ret == true { datas,ret = mq.findData(0, startIndex, limit)
return retData, true dataQueue = append(dataQueue, datas...)
return dataQueue, ret
} }
//如果是正常head在后尾在前从数组0下标开始找到tail
return mq.findData(0, startIndex, limit)
} }

View File

@@ -15,8 +15,8 @@ type QueueDataPersist interface {
OnExit() OnExit()
OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时 OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时
OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调 OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调
PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) //持久化数据,失败则返回false上层会重复尝试直到成功建议在函数中加入次数超过次数则返回true PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //持久化数据,失败则返回false上层会重复尝试直到成功建议在函数中加入次数超过次数则返回true
FindTopicData(topic string, startIndex uint64, limit int64) []TopicData //查找数据,参数bool代表数据库查找是否成功 FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData //查找数据,参数bool代表数据库查找是否成功
LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败一般是读取错误会进行重试。如果不存在时返回(0,true) LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败一般是读取错误会进行重试。如果不存在时返回(0,true)
GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号 GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号
PersistIndex(topic string, customerId string, index uint64) //持久化进度索引号 PersistIndex(topic string, customerId string, index uint64) //持久化进度索引号

View File

@@ -20,8 +20,6 @@ type MongoPersist struct {
url string //连接url url string //连接url
dbName string //数据库名称 dbName string //数据库名称
retryCount int //落地数据库重试次数 retryCount int //落地数据库重试次数
topic []TopicData //用于临时缓存
} }
const CustomerCollectName = "SysCustomer" const CustomerCollectName = "SysCustomer"
@@ -48,7 +46,7 @@ func (mp *MongoPersist) OnInit() error {
keys = append(keys, "Customer", "Topic") keys = append(keys, "Customer", "Topic")
IndexKey = append(IndexKey, keys) IndexKey = append(IndexKey, keys)
s := mp.mongo.TakeSession() s := mp.mongo.TakeSession()
if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true); err != nil { if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true,true); err != nil {
log.SError("EnsureUniqueIndex is fail ", err.Error()) log.SError("EnsureUniqueIndex is fail ", err.Error())
return err return err
} }
@@ -85,14 +83,6 @@ func (mp *MongoPersist) ReadCfg() error {
return nil return nil
} }
func (mp *MongoPersist) getTopicBuff(limit int) []TopicData {
if cap(mp.topic) < limit {
mp.topic = make([]TopicData, limit)
}
return mp.topic[:0]
}
func (mp *MongoPersist) OnExit() { func (mp *MongoPersist) OnExit() {
} }
@@ -123,7 +113,6 @@ func (mp *MongoPersist) OnReceiveTopicData(topic string, topicData []TopicData)
// OnPushTopicDataToCustomer 当推送数据到Customer时回调 // OnPushTopicDataToCustomer 当推送数据到Customer时回调
func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData) { func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData) {
} }
// PersistTopicData 持久化数据 // PersistTopicData 持久化数据
@@ -142,20 +131,25 @@ func (mp *MongoPersist) persistTopicData(collectionName string, topicData []Topi
_, err := s.Collection(mp.dbName, collectionName).InsertMany(ctx, documents) _, err := s.Collection(mp.dbName, collectionName).InsertMany(ctx, documents)
if err != nil { if err != nil {
log.SError("PersistTopicData InsertMany fail,collect name is ", collectionName) log.SError("PersistTopicData InsertMany fail,collect name is ", collectionName," error:",err.Error())
//失败最大重试数量 //失败最大重试数量
return retryCount >= mp.retryCount return retryCount >= mp.retryCount
} }
//log.SRelease("+++++++++====", time.Now().UnixNano())
return true return true
} }
func (mp *MongoPersist) IsSameDay(timestamp1 int64,timestamp2 int64) bool{
t1 := time.Unix(timestamp1, 0)
t2 := time.Unix(timestamp2, 0)
return t1.Year() == t2.Year() && t1.Month() == t2.Month()&&t1.Day() == t2.Day()
}
// PersistTopicData 持久化数据 // PersistTopicData 持久化数据
func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) { func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) {
if len(topicData) == 0 { if len(topicData) == 0 {
return nil, true return nil, nil,true
} }
preDate := topicData[0].Seq >> 32 preDate := topicData[0].Seq >> 32
@@ -163,7 +157,7 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re
for findPos = 1; findPos < len(topicData); findPos++ { for findPos = 1; findPos < len(topicData); findPos++ {
newDate := topicData[findPos].Seq >> 32 newDate := topicData[findPos].Seq >> 32
//说明换天了 //说明换天了
if preDate != newDate { if mp.IsSameDay(int64(preDate),int64(newDate)) == false {
break break
} }
} }
@@ -172,15 +166,15 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re
ret := mp.persistTopicData(collectName, topicData[:findPos], retryCount) ret := mp.persistTopicData(collectName, topicData[:findPos], retryCount)
//如果失败,下次重试 //如果失败,下次重试
if ret == false { if ret == false {
return nil, false return nil, nil, false
} }
//如果成功 //如果成功
return topicData[findPos:len(topicData)], true return topicData[findPos:len(topicData)], topicData[0:findPos], true
} }
// FindTopicData 查找数据 // FindTopicData 查找数据
func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) { func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) ([]TopicData, bool) {
s := mp.mongo.TakeSession() s := mp.mongo.TakeSession()
@@ -222,7 +216,6 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
} }
//序列化返回 //序列化返回
topicBuff := mp.getTopicBuff(int(limit))
for i := 0; i < len(res); i++ { for i := 0; i < len(res); i++ {
rawData, errM := bson.Marshal(res[i]) rawData, errM := bson.Marshal(res[i])
if errM != nil { if errM != nil {
@@ -257,7 +250,7 @@ func (mp *MongoPersist) getCollectCount(topic string,today string) (int64 ,error
} }
// FindTopicData 查找数据 // FindTopicData 查找数据
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData { func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) []TopicData {
//某表找不到,一直往前找,找到当前置为止 //某表找不到,一直往前找,找到当前置为止
for days := 1; days <= MaxDays; days++ { for days := 1; days <= MaxDays; days++ {
//是否可以跳天 //是否可以跳天
@@ -281,7 +274,7 @@ func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int
} }
//从startIndex开始一直往后查 //从startIndex开始一直往后查
topicData, isSucc := mp.findTopicData(topic, startIndex, limit) topicData, isSucc := mp.findTopicData(topic, startIndex, limit,topicBuff)
//有数据或者数据库出错时返回,返回后,会进行下一轮的查询遍历 //有数据或者数据库出错时返回,返回后,会进行下一轮的查询遍历
if len(topicData) > 0 || isSucc == false { if len(topicData) > 0 || isSucc == false {
return topicData return topicData
@@ -394,8 +387,7 @@ func (mp *MongoPersist) PersistIndex(topic string, customerId string, index uint
ctx, cancel := s.GetDefaultContext() ctx, cancel := s.GetDefaultContext()
defer cancel() defer cancel()
ret, err := s.Collection(mp.dbName, CustomerCollectName).UpdateOne(ctx, condition, updata, UpdateOptionsOpts...) _, err := s.Collection(mp.dbName, CustomerCollectName).UpdateOne(ctx, condition, updata, UpdateOptionsOpts...)
fmt.Println(ret)
if err != nil { if err != nil {
log.SError("PersistIndex fail :", err.Error()) log.SError("PersistIndex fail :", err.Error())
} }

View File

@@ -27,7 +27,7 @@ func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData) {
} }
} }
func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, bool) { func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, []TopicData, bool) {
return ss.dataPersist.PersistTopicData(topic, topics, retryCount) return ss.dataPersist.PersistTopicData(topic, topics, retryCount)
} }

View File

@@ -113,25 +113,28 @@ func (tr *TopicRoom) topicRoomRun() {
} }
//如果落地失败最大重试maxTryPersistNum次数 //如果落地失败最大重试maxTryPersistNum次数
var ret bool for retryCount := 0; retryCount < maxTryPersistNum; {
for j := 0; j < maxTryPersistNum; {
//持久化处理 //持久化处理
stagingBuff, ret = tr.PersistTopicData(tr.topic, stagingBuff, j+1) stagingBuff, savedBuff, ret := tr.PersistTopicData(tr.topic, stagingBuff, retryCount+1)
//如果存档成功,并且有后续批次,则继续存档
if ret == true && len(stagingBuff) > 0 { if ret == true {
//二次存档不计次数 // 1. 把成功存储的数据放入内存中
continue if len(savedBuff) > 0 {
} tr.PushTopicDataToQueue(tr.topic, savedBuff)
}
//计数增加一次并且等待100ms继续重试
j += 1 // 2. 如果存档成功,并且有后续批次,则继续存档
if ret == false { if ret == true && len(stagingBuff) > 0 {
continue
}
// 3. 成功了,跳出
break
} else {
//计数增加一次并且等待100ms继续重试
retryCount++
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
continue
} }
tr.PushTopicDataToQueue(tr.topic, stagingBuff)
break
} }
} }