Compare commits

...

12 Commits

Author SHA1 Message Date
origin 8c8d681093 Optimize message queue: increase the delay when there are no new messages 2023-01-28 16:25:30 +08:00
origin b8150cfc51 Extend index sort order 2023-01-13 10:59:36 +08:00
origin 3833884777 Optimize message queue 2022-12-29 14:54:54 +08:00
origin 66770f07a5 Optimize message queue 2022-12-09 12:42:47 +08:00
origin 76c8541b34 Improve message queue persistence logging 2022-12-01 14:19:02 +08:00
origin b1fee9bc57 Improve message queue service logging 2022-11-29 17:25:34 +08:00
origin 284d43dc71 Optimize rpc 2022-11-29 13:38:07 +08:00
origin fd43863b73 Optimize rpc 2022-11-29 09:50:27 +08:00
origin 1fcd870f1d Optimize message queue service 2022-11-26 16:15:41 +08:00
origin 11b78f84c4 Optimize rpc 2022-11-26 14:36:03 +08:00
origin 8c6ee24b16 Optimize rpc 2022-11-26 14:16:25 +08:00
origin ca23925796 Optimize persistent storage module 2022-11-25 11:45:35 +08:00
8 changed files with 133 additions and 81 deletions

View File

@@ -265,11 +265,35 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
return pCall
}
var iParam interface{}
if processor == nil {
_, processor = GetProcessorType(args)
}
if args != nil {
inParamValue := reflect.New(reflect.ValueOf(args).Type().Elem())
//args
//deep copy the input parameters
iParam = inParamValue.Interface()
bytes,err := processor.Marshal(args)
if err == nil {
err = processor.Unmarshal(bytes,iParam)
}
if err != nil {
pCall.Seq = 0
pCall.Err = errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
pCall.done <- pCall
log.SError(pCall.Err.Error())
return pCall
}
}
req := MakeRpcRequest(processor, 0, rpcMethodId, serviceMethod, noReply, nil)
req.inParam = args
req.inParam = iParam
req.localReply = reply
if rawArgs != nil {
var err error
@@ -335,8 +359,23 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
}
_, processor := GetProcessorType(args)
inParamValue := reflect.New(reflect.ValueOf(args).Type().Elem())
//args
//deep copy the input parameters
iParam := inParamValue.Interface()
bytes,err := processor.Marshal(args)
if err == nil {
err = processor.Unmarshal(bytes,iParam)
}
if err != nil {
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(errM.Error())
return errM
}
req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil)
req.inParam = args
req.inParam = iParam
req.localReply = reply
if noReply == false {
@@ -370,7 +409,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
}
}
err := rpcHandler.PushRpcRequest(req)
err = rpcHandler.PushRpcRequest(req)
if err != nil {
ReleaseRpcRequest(req)
return err
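
The change above makes selfNodeRpcHandlerGo and selfNodeRpcHandlerAsyncGo deep-copy the input argument through the processor's Marshal/Unmarshal before attaching it to the request, so the request no longer aliases the caller's struct. A minimal sketch of that round trip (not the framework's code), using encoding/json as a stand-in for the processor and a made-up LoginReq type:

```go
package main

import (
	"encoding/json" // stand-in for the processor's Marshal/Unmarshal
	"fmt"
	"reflect"
)

// LoginReq is a hypothetical argument type, not taken from the repository.
type LoginReq struct {
	UserId int
	Token  string
}

// deepCopy allocates a new value of the same pointer type as args and fills it
// through a marshal/unmarshal round trip, the same idea used in the diff.
func deepCopy(args interface{}) (interface{}, error) {
	dst := reflect.New(reflect.ValueOf(args).Type().Elem()).Interface()
	raw, err := json.Marshal(args)
	if err != nil {
		return nil, err
	}
	if err = json.Unmarshal(raw, dst); err != nil {
		return nil, err
	}
	return dst, nil
}

func main() {
	src := &LoginReq{UserId: 1, Token: "abc"}
	cp, err := deepCopy(src)
	if err != nil {
		panic(err)
	}
	src.Token = "changed"             // mutating the caller's struct afterwards...
	fmt.Println(cp.(*LoginReq).Token) // ...does not touch the copied request: prints "abc"
}
```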

View File

@@ -68,34 +68,39 @@ func (s *Session) NextSeq(db string, collection string, id interface{}) (int, er
after := options.After
updateOpts := options.FindOneAndUpdateOptions{ReturnDocument: &after}
err := s.Client.Database(db).Collection(collection).FindOneAndUpdate(ctxTimeout, bson.M{"_id": id}, bson.M{"$inc": bson.M{"Seq": 1}},&updateOpts).Decode(&res)
err := s.Client.Database(db).Collection(collection).FindOneAndUpdate(ctxTimeout, bson.M{"_id": id}, bson.M{"$inc": bson.M{"Seq": 1}}, &updateOpts).Decode(&res)
return res.Seq, err
}
//indexKeys[index][key fields of each index]
func (s *Session) EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool,sparse bool) error {
return s.ensureIndex(db, collection, indexKeys, bBackground, false,sparse)
// indexKeys[index][key fields of each index]
func (s *Session) EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error {
return s.ensureIndex(db, collection, indexKeys, bBackground, false, sparse, asc)
}
//indexKeys[index][key fields of each index]
func (s *Session) EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool,sparse bool) error {
return s.ensureIndex(db, collection, indexKeys, bBackground, true,sparse)
// indexKeys[index][key fields of each index]
func (s *Session) EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error {
return s.ensureIndex(db, collection, indexKeys, bBackground, true, sparse, asc)
}
//keys[index][key fields of each index]
func (s *Session) ensureIndex(db string, collection string, indexKeys [][]string, bBackground bool, unique bool,sparse bool) error {
// keys[index][key fields of each index]
func (s *Session) ensureIndex(db string, collection string, indexKeys [][]string, bBackground bool, unique bool, sparse bool, asc bool) error {
var indexes []mongo.IndexModel
for _, keys := range indexKeys {
keysDoc := bsonx.Doc{}
for _, key := range keys {
keysDoc = keysDoc.Append(key, bsonx.Int32(1))
if asc {
keysDoc = keysDoc.Append(key, bsonx.Int32(1))
} else {
keysDoc = keysDoc.Append(key, bsonx.Int32(-1))
}
}
options:= options.Index().SetUnique(unique).SetBackground(bBackground)
options := options.Index().SetUnique(unique).SetBackground(bBackground)
if sparse == true {
options.SetSparse(true)
}
indexes = append(indexes, mongo.IndexModel{Keys: keysDoc, Options:options })
indexes = append(indexes, mongo.IndexModel{Keys: keysDoc, Options: options})
}
ctxTimeout, cancel := context.WithTimeout(context.Background(), s.maxOperatorTimeOut)
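
The index helpers now take a trailing asc flag that maps every key of the index to 1 (ascending) or -1 (descending). A hedged usage sketch against the new signatures: the database and collection names below are placeholders, and the local interface only mirrors the two methods shown above.

```go
package mqsketch

// indexer mirrors just the two methods whose signatures changed above.
type indexer interface {
	EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error
	EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error
}

// ensureQueueIndexes shows the extra asc argument in use; "mq" and "Topic_Chat"
// are made-up names for illustration only.
func ensureQueueIndexes(s indexer) error {
	// unique ascending (+1) compound index on (Customer, Topic), built in the background, non-sparse
	if err := s.EnsureUniqueIndex("mq", "SysCustomer",
		[][]string{{"Customer", "Topic"}}, true, false, true); err != nil {
		return err
	}
	// descending (-1) single-key index on Seq, so range scans read the newest entries first
	return s.EnsureIndex("mq", "Topic_Chat",
		[][]string{{"Seq"}}, true, false, false)
}
```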

View File

@@ -25,6 +25,7 @@ type CustomerSubscriber struct {
customerId string
isStop int32 //exit flag
topicCache []TopicData // cache for messages fetched from the message queue
}
const DefaultOneBatchQuantity = 1000
@@ -79,6 +80,7 @@ func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandle
cs.StartIndex = uint64(zeroTime.Unix() << 32)
}
cs.topicCache = make([]TopicData, oneBatchQuantity)
return nil
}
@@ -156,14 +158,14 @@ func (cs *CustomerSubscriber) SubscribeRun() {
func (cs *CustomerSubscriber) subscribe() bool {
//look in memory first
topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex, cs.oneBatchQuantity)
topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity, cs.topicCache[:0])
if ret == true {
cs.publishToCustomer(topicData)
return true
}
//then look in the persisted data
topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity))
topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity),cs.topicCache[:0])
return cs.publishToCustomer(topicData)
}
@@ -188,7 +190,7 @@ func (cs *CustomerSubscriber) publishToCustomer(topicData []TopicData) bool {
if len(topicData) == 0 {
//no data at all, wait a second
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Second * 1)
return true
}
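
The subscriber now keeps a reusable topicCache and passes cs.topicCache[:0] into FindData and FindTopicData, so every poll appends into the same backing array instead of allocating a fresh slice. A small standalone sketch of that reslice-and-append pattern; fillBatch is a stand-in for the real lookup functions:

```go
package main

import "fmt"

// TopicData mirrors the queue element used above (fields simplified).
type TopicData struct {
	Seq     uint64
	RawData []byte
}

// fillBatch stands in for queue.FindData / dataPersist.FindTopicData: it only
// appends into the slice it is handed.
func fillBatch(buf []TopicData, n int) []TopicData {
	for i := 0; i < n; i++ {
		buf = append(buf, TopicData{Seq: uint64(i)})
	}
	return buf
}

func main() {
	cache := make([]TopicData, 0, 1000) // plays the role of cs.topicCache

	for poll := 0; poll < 3; poll++ {
		batch := fillBatch(cache[:0], 10)         // hand out a zero-length view of the same array
		fmt.Println(poll, len(batch), cap(batch)) // capacity stays 1000: no per-poll allocation
	}
}
```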

View File

@@ -49,13 +49,22 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32)
if findStartPos <= mq.tail {
findEndPos = mq.tail + 1
} else {
findEndPos = int32(cap(mq.topicQueue))
findEndPos = int32(len(mq.topicQueue))
}
if findStartPos >= findEndPos {
return nil, false
}
// if the requested Seq is smaller than the smallest Seq held in memory, report failure
if mq.topicQueue[findStartPos].Seq > startIndex {
return nil, false
}
//binary search for the position
pos := int32(algorithms.BiSearch(mq.topicQueue[findStartPos:findEndPos], startIndex, 1))
if pos == -1 {
return nil, true
return nil, false
}
pos += findStartPos
@@ -69,29 +78,31 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32)
}
// FindData: the returned []TopicData is the data found (nil means no data); the bool indicates whether the lookup should not be done in memory
func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bool) {
func (mq *MemoryQueue) FindData(startIndex uint64, limit int32, dataQueue []TopicData) ([]TopicData, bool) {
mq.locker.RLock()
defer mq.locker.RUnlock()
//when the queue is empty, the lookup should go to the database
if mq.head == mq.tail {
return nil, false
}
/*
//first check whether startIndex is larger than the first element
headTopic := (mq.head + 1) % int32(len(mq.topicQueue))
//in this case the data has to be fetched from persisted storage
if startIndex+1 > mq.topicQueue[headTopic].Seq {
return nil, false
} else if mq.head < mq.tail {
// the queue has not wrapped around
datas,ret := mq.findData(mq.head + 1, startIndex, limit)
if ret {
dataQueue = append(dataQueue, datas...)
}
return dataQueue, ret
} else {
// wrapped: search the rear section first
datas,ret := mq.findData(mq.head+1, startIndex, limit)
if ret {
dataQueue = append(dataQueue, datas...)
return dataQueue, ret
}
*/
retData, ret := mq.findData(mq.head+1, startIndex, limit)
if mq.head <= mq.tail || ret == true {
return retData, true
// not found in the rear section, search from the front
datas,ret = mq.findData(0, startIndex, limit)
dataQueue = append(dataQueue, datas...)
return dataQueue, ret
}
//normal case (head behind, tail in front): search from array index 0 up to tail
return mq.findData(0, startIndex, limit)
}
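
FindData now appends results into the caller-supplied dataQueue and, when the live region wraps around the end of the array, searches the ring buffer in two segments: first the rear part starting at head+1, then the front part starting at index 0. A simplified sketch of that two-segment lookup, using sort.Search in place of algorithms.BiSearch; the buffer layout in main is made up for illustration:

```go
package main

import (
	"fmt"
	"sort"
)

type TopicData struct{ Seq uint64 }

// searchFrom finds the first element of seg with Seq >= startIndex and appends
// up to limit entries to out; the bool reports whether the requested Seq is
// actually held in this segment.
func searchFrom(seg []TopicData, startIndex uint64, limit int, out []TopicData) ([]TopicData, bool) {
	if len(seg) == 0 || seg[0].Seq > startIndex {
		return out, false // requested Seq is older than anything in this segment
	}
	pos := sort.Search(len(seg), func(i int) bool { return seg[i].Seq >= startIndex })
	if pos == len(seg) {
		return out, false
	}
	end := pos + limit
	if end > len(seg) {
		end = len(seg)
	}
	return append(out, seg[pos:end]...), true
}

func main() {
	// ring buffer of capacity 8 whose live region wraps: Seqs 10..15 sit at
	// positions 5,6,7,0,1,2; head points just before position 5, tail at 2.
	ring := make([]TopicData, 8)
	for i, seq := range []uint64{13, 14, 15, 0, 0, 10, 11, 12} {
		ring[i].Seq = seq
	}
	head, tail := 4, 2
	out := make([]TopicData, 0, 4)

	// rear segment first; only when the start Seq is not found there, fall back
	// to the front segment, mirroring the flow of FindData above.
	out, ok := searchFrom(ring[head+1:], 14, 4, out)
	if !ok {
		out, ok = searchFrom(ring[:tail+1], 14, 4, out)
	}
	fmt.Println(ok, out) // true [{14} {15}]
}
```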

View File

@@ -15,8 +15,8 @@ type QueueDataPersist interface {
OnExit()
OnReceiveTopicData(topic string, topicData []TopicData) //called when pushed data is received
OnPushTopicDataToCustomer(topic string, topicData []TopicData) //callback when data is pushed to a Customer
PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) //persist data; return false on failure and the upper layer keeps retrying until success; it is recommended to count attempts inside the function and return true once the limit is exceeded
FindTopicData(topic string, startIndex uint64, limit int64) []TopicData //find data; the bool indicates whether the database lookup succeeded
PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //persist data; return false on failure and the upper layer keeps retrying until success; it is recommended to count attempts inside the function and return true once the limit is exceeded
FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData //find data; the bool indicates whether the database lookup succeeded
LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false means the fetch failed (usually a read error) and it will be retried; returns (0, true) if the entry does not exist
GetIndex(topicData *TopicData) uint64 //get the progress index from the topic data
PersistIndex(topic string, customerId string, index uint64) //persist the progress index

View File

@@ -20,8 +20,6 @@ type MongoPersist struct {
url string //connection url
dbName string //database name
retryCount int //number of retries when persisting to the database
topic []TopicData //temporary cache
}
const CustomerCollectName = "SysCustomer"
@@ -48,7 +46,7 @@ func (mp *MongoPersist) OnInit() error {
keys = append(keys, "Customer", "Topic")
IndexKey = append(IndexKey, keys)
s := mp.mongo.TakeSession()
if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true); err != nil {
if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true,true); err != nil {
log.SError("EnsureUniqueIndex is fail ", err.Error())
return err
}
@@ -85,14 +83,6 @@ func (mp *MongoPersist) ReadCfg() error {
return nil
}
func (mp *MongoPersist) getTopicBuff(limit int) []TopicData {
if cap(mp.topic) < limit {
mp.topic = make([]TopicData, limit)
}
return mp.topic[:0]
}
func (mp *MongoPersist) OnExit() {
}
@@ -123,7 +113,6 @@ func (mp *MongoPersist) OnReceiveTopicData(topic string, topicData []TopicData)
// OnPushTopicDataToCustomer is called when data is pushed to a Customer
func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData) {
}
// PersistTopicData persists data
@@ -142,20 +131,25 @@ func (mp *MongoPersist) persistTopicData(collectionName string, topicData []Topi
_, err := s.Collection(mp.dbName, collectionName).InsertMany(ctx, documents)
if err != nil {
log.SError("PersistTopicData InsertMany fail,collect name is ", collectionName)
log.SError("PersistTopicData InsertMany fail,collect name is ", collectionName," error:",err.Error())
//maximum number of retries on failure
return retryCount >= mp.retryCount
}
//log.SRelease("+++++++++====", time.Now().UnixNano())
return true
}
func (mp *MongoPersist) IsSameDay(timestamp1 int64,timestamp2 int64) bool{
t1 := time.Unix(timestamp1, 0)
t2 := time.Unix(timestamp2, 0)
return t1.Year() == t2.Year() && t1.Month() == t2.Month()&&t1.Day() == t2.Day()
}
// PersistTopicData persists data
func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) {
func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) {
if len(topicData) == 0 {
return nil, true
return nil, nil,true
}
preDate := topicData[0].Seq >> 32
@@ -163,7 +157,7 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re
for findPos = 1; findPos < len(topicData); findPos++ {
newDate := topicData[findPos].Seq >> 32
//the day has changed
if preDate != newDate {
if mp.IsSameDay(int64(preDate),int64(newDate)) == false {
break
}
}
@@ -172,15 +166,15 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re
ret := mp.persistTopicData(collectName, topicData[:findPos], retryCount)
//on failure, retry next time
if ret == false {
return nil, false
return nil, nil, false
}
//on success
return topicData[findPos:len(topicData)], true
return topicData[findPos:len(topicData)], topicData[0:findPos], true
}
// FindTopicData finds data
func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) {
func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) ([]TopicData, bool) {
s := mp.mongo.TakeSession()
@@ -222,7 +216,6 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
}
//serialize and return
topicBuff := mp.getTopicBuff(int(limit))
for i := 0; i < len(res); i++ {
rawData, errM := bson.Marshal(res[i])
if errM != nil {
@@ -257,7 +250,7 @@ func (mp *MongoPersist) getCollectCount(topic string,today string) (int64 ,error
}
// FindTopicData finds data
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData {
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) []TopicData {
//if a collection has no data, keep looking further back until the current position is reached
for days := 1; days <= MaxDays; days++ {
//whether the search can skip to the next day
@@ -281,7 +274,7 @@ func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int
}
//search forward starting from startIndex
topicData, isSucc := mp.findTopicData(topic, startIndex, limit)
topicData, isSucc := mp.findTopicData(topic, startIndex, limit,topicBuff)
//return when there is data or the database errors; after returning, the next round of the query traversal runs
if len(topicData) > 0 || isSucc == false {
return topicData
@@ -394,8 +387,7 @@ func (mp *MongoPersist) PersistIndex(topic string, customerId string, index uint
ctx, cancel := s.GetDefaultContext()
defer cancel()
ret, err := s.Collection(mp.dbName, CustomerCollectName).UpdateOne(ctx, condition, updata, UpdateOptionsOpts...)
fmt.Println(ret)
_, err := s.Collection(mp.dbName, CustomerCollectName).UpdateOne(ctx, condition, updata, UpdateOptionsOpts...)
if err != nil {
log.SError("PersistIndex fail :", err.Error())
}
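
PersistTopicData now also returns the chunk that was actually written, and the day boundary is decided by IsSameDay on the timestamp carried in the high 32 bits of Seq rather than by comparing the raw shifted values. A sketch of that split, with splitByDay as a hypothetical helper that mirrors the loop above:

```go
package main

import (
	"fmt"
	"time"
)

type TopicData struct{ Seq uint64 }

// isSameDay mirrors mp.IsSameDay: same calendar year, month and day.
func isSameDay(ts1, ts2 int64) bool {
	t1, t2 := time.Unix(ts1, 0), time.Unix(ts2, 0)
	return t1.Year() == t2.Year() && t1.Month() == t2.Month() && t1.Day() == t2.Day()
}

// splitByDay returns the prefix that belongs to the first element's day and the
// remainder that must go into the next day's collection.
func splitByDay(topicData []TopicData) (today, rest []TopicData) {
	if len(topicData) == 0 {
		return nil, nil
	}
	first := int64(topicData[0].Seq >> 32) // high 32 bits hold a Unix timestamp
	cut := len(topicData)
	for i := 1; i < len(topicData); i++ {
		if !isSameDay(first, int64(topicData[i].Seq>>32)) {
			cut = i
			break
		}
	}
	return topicData[:cut], topicData[cut:]
}

func main() {
	day1 := time.Date(2022, 11, 25, 23, 59, 0, 0, time.Local).Unix()
	day2 := time.Date(2022, 11, 26, 0, 1, 0, 0, time.Local).Unix()
	batch := []TopicData{{uint64(day1) << 32}, {uint64(day1)<<32 + 1}, {uint64(day2) << 32}}
	today, rest := splitByDay(batch)
	fmt.Println(len(today), len(rest)) // 2 1
}
```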

View File

@@ -27,7 +27,7 @@ func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData) {
}
}
func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, bool) {
func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, []TopicData, bool) {
return ss.dataPersist.PersistTopicData(topic, topics, retryCount)
}

View File

@@ -113,25 +113,28 @@ func (tr *TopicRoom) topicRoomRun() {
}
//if persisting fails, retry at most maxTryPersistNum times
var ret bool
for j := 0; j < maxTryPersistNum; {
for retryCount := 0; retryCount < maxTryPersistNum; {
//persist the data
stagingBuff, ret = tr.PersistTopicData(tr.topic, stagingBuff, j+1)
//if persisting succeeded and there are remaining batches, keep persisting
if ret == true && len(stagingBuff) > 0 {
//subsequent batches do not count against the retry limit
continue
}
//increment the counter, wait 100ms and retry
j += 1
if ret == false {
stagingBuff, savedBuff, ret := tr.PersistTopicData(tr.topic, stagingBuff, retryCount+1)
if ret == true {
// 1. put the successfully stored data into the in-memory queue
if len(savedBuff) > 0 {
tr.PushTopicDataToQueue(tr.topic, savedBuff)
}
// 2. if persisting succeeded and there are remaining batches, keep persisting
if ret == true && len(stagingBuff) > 0 {
continue
}
// 3. success, break out
break
} else {
//increment the counter, wait 100ms and retry
retryCount++
time.Sleep(time.Millisecond * 100)
continue
}
tr.PushTopicDataToQueue(tr.topic, stagingBuff)
break
}
}
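
The rewritten loop charges retries only to failed persist attempts: each successfully written chunk is pushed into the in-memory queue immediately, and a remaining day-split chunk re-enters the loop without consuming a retry. A sketch of that control flow; drainStagingBuff, persist and push are made-up names standing in for the loop body, PersistTopicData and PushTopicDataToQueue:

```go
package mqsketch

import "time"

type TopicData struct{ Seq uint64 }

// drainStagingBuff keeps persisting stagingBuff until it is empty or too many
// attempts have failed. persist returns (remaining chunk, chunk just written, ok),
// matching the new PersistTopicData shape.
func drainStagingBuff(stagingBuff []TopicData, maxTryPersistNum int,
	persist func(data []TopicData, try int) (rest, saved []TopicData, ok bool),
	push func([]TopicData)) {

	for retryCount := 0; retryCount < maxTryPersistNum; {
		rest, saved, ok := persist(stagingBuff, retryCount+1)
		if ok {
			if len(saved) > 0 {
				push(saved) // 1. make the stored chunk visible to in-memory readers
			}
			if len(rest) > 0 { // 2. next day's chunk: keep going, no retry charged
				stagingBuff = rest
				continue
			}
			break // 3. everything stored
		}
		retryCount++ // 4. failed attempt: count it, back off, then retry
		time.Sleep(time.Millisecond * 100)
	}
}
```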