diff --git a/sysmodule/mongodbmodule/mongodbmodule.go b/sysmodule/mongodbmodule/mongodbmodule.go index a882538..5088287 100644 --- a/sysmodule/mongodbmodule/mongodbmodule.go +++ b/sysmodule/mongodbmodule/mongodbmodule.go @@ -68,34 +68,39 @@ func (s *Session) NextSeq(db string, collection string, id interface{}) (int, er after := options.After updateOpts := options.FindOneAndUpdateOptions{ReturnDocument: &after} - err := s.Client.Database(db).Collection(collection).FindOneAndUpdate(ctxTimeout, bson.M{"_id": id}, bson.M{"$inc": bson.M{"Seq": 1}},&updateOpts).Decode(&res) + err := s.Client.Database(db).Collection(collection).FindOneAndUpdate(ctxTimeout, bson.M{"_id": id}, bson.M{"$inc": bson.M{"Seq": 1}}, &updateOpts).Decode(&res) return res.Seq, err } -//indexKeys[索引][每个索引key字段] -func (s *Session) EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool,sparse bool) error { - return s.ensureIndex(db, collection, indexKeys, bBackground, false,sparse) +// indexKeys[索引][每个索引key字段] +func (s *Session) EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error { + return s.ensureIndex(db, collection, indexKeys, bBackground, false, sparse, asc) } -//indexKeys[索引][每个索引key字段] -func (s *Session) EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool,sparse bool) error { - return s.ensureIndex(db, collection, indexKeys, bBackground, true,sparse) +// indexKeys[索引][每个索引key字段] +func (s *Session) EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error { + return s.ensureIndex(db, collection, indexKeys, bBackground, true, sparse, asc) } -//keys[索引][每个索引key字段] -func (s *Session) ensureIndex(db string, collection string, indexKeys [][]string, bBackground bool, unique bool,sparse bool) error { +// keys[索引][每个索引key字段] +func (s *Session) ensureIndex(db string, collection string, indexKeys [][]string, bBackground bool, unique bool, sparse bool, asc bool) error { var indexes []mongo.IndexModel for _, keys := range indexKeys { keysDoc := bsonx.Doc{} for _, key := range keys { - keysDoc = keysDoc.Append(key, bsonx.Int32(1)) + if asc { + keysDoc = keysDoc.Append(key, bsonx.Int32(1)) + } else { + keysDoc = keysDoc.Append(key, bsonx.Int32(-1)) + } + } - options:= options.Index().SetUnique(unique).SetBackground(bBackground) + options := options.Index().SetUnique(unique).SetBackground(bBackground) if sparse == true { options.SetSparse(true) } - indexes = append(indexes, mongo.IndexModel{Keys: keysDoc, Options:options }) + indexes = append(indexes, mongo.IndexModel{Keys: keysDoc, Options: options}) } ctxTimeout, cancel := context.WithTimeout(context.Background(), s.maxOperatorTimeOut) diff --git a/sysservice/messagequeueservice/CustomerSubscriber.go b/sysservice/messagequeueservice/CustomerSubscriber.go index 2ffbffc..43746c3 100644 --- a/sysservice/messagequeueservice/CustomerSubscriber.go +++ b/sysservice/messagequeueservice/CustomerSubscriber.go @@ -163,9 +163,9 @@ func (cs *CustomerSubscriber) subscribe() bool { cs.publishToCustomer(topicData) return true } - + //从持久化数据中来找 - topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity)) + topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity),cs.topicCache[:0]) return cs.publishToCustomer(topicData) } @@ -190,7 +190,7 @@ func (cs *CustomerSubscriber) publishToCustomer(topicData []TopicData) bool { if len(topicData) == 0 { //没有任何数据待一秒吧 - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Second * 1) return true } diff --git a/sysservice/messagequeueservice/MessageQueueService.go b/sysservice/messagequeueservice/MessageQueueService.go index 2a694cf..77e6032 100644 --- a/sysservice/messagequeueservice/MessageQueueService.go +++ b/sysservice/messagequeueservice/MessageQueueService.go @@ -16,7 +16,7 @@ type QueueDataPersist interface { OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时 OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调 PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true - FindTopicData(topic string, startIndex uint64, limit int64) []TopicData //查找数据,参数bool代表数据库查找是否成功 + FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData //查找数据,参数bool代表数据库查找是否成功 LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true) GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号 PersistIndex(topic string, customerId string, index uint64) //持久化进度索引号 diff --git a/sysservice/messagequeueservice/MongoPersist.go b/sysservice/messagequeueservice/MongoPersist.go index 062d578..c329ccf 100644 --- a/sysservice/messagequeueservice/MongoPersist.go +++ b/sysservice/messagequeueservice/MongoPersist.go @@ -20,8 +20,6 @@ type MongoPersist struct { url string //连接url dbName string //数据库名称 retryCount int //落地数据库重试次数 - - topic []TopicData //用于临时缓存 } const CustomerCollectName = "SysCustomer" @@ -48,7 +46,7 @@ func (mp *MongoPersist) OnInit() error { keys = append(keys, "Customer", "Topic") IndexKey = append(IndexKey, keys) s := mp.mongo.TakeSession() - if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true); err != nil { + if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true,true); err != nil { log.SError("EnsureUniqueIndex is fail ", err.Error()) return err } @@ -85,14 +83,6 @@ func (mp *MongoPersist) ReadCfg() error { return nil } -func (mp *MongoPersist) getTopicBuff(limit int) []TopicData { - if cap(mp.topic) < limit { - mp.topic = make([]TopicData, limit) - } - - return mp.topic[:0] -} - func (mp *MongoPersist) OnExit() { } @@ -184,7 +174,7 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re } // FindTopicData 查找数据 -func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) { +func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) ([]TopicData, bool) { s := mp.mongo.TakeSession() @@ -226,7 +216,6 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int } //序列化返回 - topicBuff := mp.getTopicBuff(int(limit)) for i := 0; i < len(res); i++ { rawData, errM := bson.Marshal(res[i]) if errM != nil { @@ -261,7 +250,7 @@ func (mp *MongoPersist) getCollectCount(topic string,today string) (int64 ,error } // FindTopicData 查找数据 -func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData { +func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) []TopicData { //某表找不到,一直往前找,找到当前置为止 for days := 1; days <= MaxDays; days++ { //是否可以跳天 @@ -285,7 +274,7 @@ func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int } //从startIndex开始一直往后查 - topicData, isSucc := mp.findTopicData(topic, startIndex, limit) + topicData, isSucc := mp.findTopicData(topic, startIndex, limit,topicBuff) //有数据或者数据库出错时返回,返回后,会进行下一轮的查询遍历 if len(topicData) > 0 || isSucc == false { return topicData