mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-15 00:04:46 +08:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c8d681093 | ||
|
|
b8150cfc51 | ||
|
|
3833884777 |
@@ -68,34 +68,39 @@ func (s *Session) NextSeq(db string, collection string, id interface{}) (int, er
|
|||||||
|
|
||||||
after := options.After
|
after := options.After
|
||||||
updateOpts := options.FindOneAndUpdateOptions{ReturnDocument: &after}
|
updateOpts := options.FindOneAndUpdateOptions{ReturnDocument: &after}
|
||||||
err := s.Client.Database(db).Collection(collection).FindOneAndUpdate(ctxTimeout, bson.M{"_id": id}, bson.M{"$inc": bson.M{"Seq": 1}},&updateOpts).Decode(&res)
|
err := s.Client.Database(db).Collection(collection).FindOneAndUpdate(ctxTimeout, bson.M{"_id": id}, bson.M{"$inc": bson.M{"Seq": 1}}, &updateOpts).Decode(&res)
|
||||||
return res.Seq, err
|
return res.Seq, err
|
||||||
}
|
}
|
||||||
|
|
||||||
//indexKeys[索引][每个索引key字段]
|
// indexKeys[索引][每个索引key字段]
|
||||||
func (s *Session) EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool,sparse bool) error {
|
func (s *Session) EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error {
|
||||||
return s.ensureIndex(db, collection, indexKeys, bBackground, false,sparse)
|
return s.ensureIndex(db, collection, indexKeys, bBackground, false, sparse, asc)
|
||||||
}
|
}
|
||||||
|
|
||||||
//indexKeys[索引][每个索引key字段]
|
// indexKeys[索引][每个索引key字段]
|
||||||
func (s *Session) EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool,sparse bool) error {
|
func (s *Session) EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error {
|
||||||
return s.ensureIndex(db, collection, indexKeys, bBackground, true,sparse)
|
return s.ensureIndex(db, collection, indexKeys, bBackground, true, sparse, asc)
|
||||||
}
|
}
|
||||||
|
|
||||||
//keys[索引][每个索引key字段]
|
// keys[索引][每个索引key字段]
|
||||||
func (s *Session) ensureIndex(db string, collection string, indexKeys [][]string, bBackground bool, unique bool,sparse bool) error {
|
func (s *Session) ensureIndex(db string, collection string, indexKeys [][]string, bBackground bool, unique bool, sparse bool, asc bool) error {
|
||||||
var indexes []mongo.IndexModel
|
var indexes []mongo.IndexModel
|
||||||
for _, keys := range indexKeys {
|
for _, keys := range indexKeys {
|
||||||
keysDoc := bsonx.Doc{}
|
keysDoc := bsonx.Doc{}
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
keysDoc = keysDoc.Append(key, bsonx.Int32(1))
|
if asc {
|
||||||
|
keysDoc = keysDoc.Append(key, bsonx.Int32(1))
|
||||||
|
} else {
|
||||||
|
keysDoc = keysDoc.Append(key, bsonx.Int32(-1))
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
options:= options.Index().SetUnique(unique).SetBackground(bBackground)
|
options := options.Index().SetUnique(unique).SetBackground(bBackground)
|
||||||
if sparse == true {
|
if sparse == true {
|
||||||
options.SetSparse(true)
|
options.SetSparse(true)
|
||||||
}
|
}
|
||||||
indexes = append(indexes, mongo.IndexModel{Keys: keysDoc, Options:options })
|
indexes = append(indexes, mongo.IndexModel{Keys: keysDoc, Options: options})
|
||||||
}
|
}
|
||||||
|
|
||||||
ctxTimeout, cancel := context.WithTimeout(context.Background(), s.maxOperatorTimeOut)
|
ctxTimeout, cancel := context.WithTimeout(context.Background(), s.maxOperatorTimeOut)
|
||||||
|
|||||||
@@ -163,9 +163,9 @@ func (cs *CustomerSubscriber) subscribe() bool {
|
|||||||
cs.publishToCustomer(topicData)
|
cs.publishToCustomer(topicData)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
//从持久化数据中来找
|
//从持久化数据中来找
|
||||||
topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity))
|
topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity),cs.topicCache[:0])
|
||||||
return cs.publishToCustomer(topicData)
|
return cs.publishToCustomer(topicData)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -190,7 +190,7 @@ func (cs *CustomerSubscriber) publishToCustomer(topicData []TopicData) bool {
|
|||||||
|
|
||||||
if len(topicData) == 0 {
|
if len(topicData) == 0 {
|
||||||
//没有任何数据待一秒吧
|
//没有任何数据待一秒吧
|
||||||
time.Sleep(time.Millisecond * 100)
|
time.Sleep(time.Second * 1)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ type QueueDataPersist interface {
|
|||||||
OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时
|
OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时
|
||||||
OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调
|
OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调
|
||||||
PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true
|
PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true
|
||||||
FindTopicData(topic string, startIndex uint64, limit int64) []TopicData //查找数据,参数bool代表数据库查找是否成功
|
FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData //查找数据,参数bool代表数据库查找是否成功
|
||||||
LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true)
|
LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true)
|
||||||
GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号
|
GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号
|
||||||
PersistIndex(topic string, customerId string, index uint64) //持久化进度索引号
|
PersistIndex(topic string, customerId string, index uint64) //持久化进度索引号
|
||||||
|
|||||||
@@ -20,8 +20,6 @@ type MongoPersist struct {
|
|||||||
url string //连接url
|
url string //连接url
|
||||||
dbName string //数据库名称
|
dbName string //数据库名称
|
||||||
retryCount int //落地数据库重试次数
|
retryCount int //落地数据库重试次数
|
||||||
|
|
||||||
topic []TopicData //用于临时缓存
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const CustomerCollectName = "SysCustomer"
|
const CustomerCollectName = "SysCustomer"
|
||||||
@@ -48,7 +46,7 @@ func (mp *MongoPersist) OnInit() error {
|
|||||||
keys = append(keys, "Customer", "Topic")
|
keys = append(keys, "Customer", "Topic")
|
||||||
IndexKey = append(IndexKey, keys)
|
IndexKey = append(IndexKey, keys)
|
||||||
s := mp.mongo.TakeSession()
|
s := mp.mongo.TakeSession()
|
||||||
if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true); err != nil {
|
if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true,true); err != nil {
|
||||||
log.SError("EnsureUniqueIndex is fail ", err.Error())
|
log.SError("EnsureUniqueIndex is fail ", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -85,14 +83,6 @@ func (mp *MongoPersist) ReadCfg() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MongoPersist) getTopicBuff(limit int) []TopicData {
|
|
||||||
if cap(mp.topic) < limit {
|
|
||||||
mp.topic = make([]TopicData, limit)
|
|
||||||
}
|
|
||||||
|
|
||||||
return mp.topic[:0]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mp *MongoPersist) OnExit() {
|
func (mp *MongoPersist) OnExit() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -184,7 +174,7 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FindTopicData 查找数据
|
// FindTopicData 查找数据
|
||||||
func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) {
|
func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) ([]TopicData, bool) {
|
||||||
s := mp.mongo.TakeSession()
|
s := mp.mongo.TakeSession()
|
||||||
|
|
||||||
|
|
||||||
@@ -226,7 +216,6 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
|
|||||||
}
|
}
|
||||||
|
|
||||||
//序列化返回
|
//序列化返回
|
||||||
topicBuff := mp.getTopicBuff(int(limit))
|
|
||||||
for i := 0; i < len(res); i++ {
|
for i := 0; i < len(res); i++ {
|
||||||
rawData, errM := bson.Marshal(res[i])
|
rawData, errM := bson.Marshal(res[i])
|
||||||
if errM != nil {
|
if errM != nil {
|
||||||
@@ -261,7 +250,7 @@ func (mp *MongoPersist) getCollectCount(topic string,today string) (int64 ,error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FindTopicData 查找数据
|
// FindTopicData 查找数据
|
||||||
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData {
|
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) []TopicData {
|
||||||
//某表找不到,一直往前找,找到当前置为止
|
//某表找不到,一直往前找,找到当前置为止
|
||||||
for days := 1; days <= MaxDays; days++ {
|
for days := 1; days <= MaxDays; days++ {
|
||||||
//是否可以跳天
|
//是否可以跳天
|
||||||
@@ -285,7 +274,7 @@ func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int
|
|||||||
}
|
}
|
||||||
|
|
||||||
//从startIndex开始一直往后查
|
//从startIndex开始一直往后查
|
||||||
topicData, isSucc := mp.findTopicData(topic, startIndex, limit)
|
topicData, isSucc := mp.findTopicData(topic, startIndex, limit,topicBuff)
|
||||||
//有数据或者数据库出错时返回,返回后,会进行下一轮的查询遍历
|
//有数据或者数据库出错时返回,返回后,会进行下一轮的查询遍历
|
||||||
if len(topicData) > 0 || isSucc == false {
|
if len(topicData) > 0 || isSucc == false {
|
||||||
return topicData
|
return topicData
|
||||||
|
|||||||
Reference in New Issue
Block a user