diff --git a/sysservice/rankservice/MongodbPersist.go b/sysservice/rankservice/MongodbPersist.go index 3f0b785..2f549bd 100644 --- a/sysservice/rankservice/MongodbPersist.go +++ b/sysservice/rankservice/MongodbPersist.go @@ -14,8 +14,9 @@ import ( "time" ) -const batchRemoveNum = 100 -const MaxDays = 180 +const batchRemoveNum = 128 //一切删除的最大数量 + +// RankDataDB 排行表数据 type RankDataDB struct { Id uint64 `bson:"_id,omitempty"` RefreshTime int64 `bson:"RefreshTime,omitempty"` @@ -23,49 +24,52 @@ type RankDataDB struct { Data []byte `bson:"Data,omitempty"` } +// MongoPersist持久化Module type MongoPersist struct { service.Module mongo mongodbmodule.MongoModule - url string //连接url + url string //Mongodb连接url dbName string //数据库名称 SaveInterval time.Duration //落地数据库时间间隔 - waitGroup sync.WaitGroup sync.Mutex mapRemoveRankData map[uint64]map[uint64]struct{} //将要删除的排行数据 map[RankId]map[Key]struct{} mapUpsertRankData map[uint64]map[uint64]RankData //需要upsert的排行数据 map[RankId][key]RankData - //mapRankCfg map[uint64]string //map[RankId]RankCollectName - mapRankSkip map[uint64]IRankSkip - maxRetrySaveCount int //存档重试次数 + mapRankSkip map[uint64]IRankSkip //所有的排行榜对象map[RankId]IRankSkip + maxRetrySaveCount int //存档重试次数 retryTimeIntervalMs time.Duration //重试时间间隔 - stop int32 -} -const CustomerCollectName = "SysCustomer" + lastSaveTime time.Time //最后一次存档时间 + + stop int32 //是否停服 + waitGroup sync.WaitGroup //等待停服 +} func (mp *MongoPersist) OnInit() error { mp.mapRemoveRankData = map[uint64]map[uint64]struct{}{} mp.mapUpsertRankData = map[uint64]map[uint64]RankData{} - //mp.mapRankCfg = map[uint64]string{} mp.mapRankSkip = map[uint64]IRankSkip{} if errC := mp.ReadCfg(); errC != nil { return errC } + //初始化MongoDB err := mp.mongo.Init(mp.url, time.Second*15) if err != nil { return err } + //开始运行 err = mp.mongo.Start() if err != nil { log.SError("start dbService[", mp.dbName, "], url[", mp.url, "] init error:", err.Error()) return err } + //开启协程 coroutine.GoRecover(mp.persistCoroutine,-1) return nil } @@ -76,7 +80,6 @@ func (mp *MongoPersist) ReadCfg() error { return fmt.Errorf("RankService config is error") } - //读取数据库配置 saveMongoCfg,ok := mapDBServiceCfg["SaveMongo"] if ok == false { @@ -87,8 +90,7 @@ func (mp *MongoPersist) ReadCfg() error { if ok == false { return fmt.Errorf("RankService.SaveMongo config is error") } - - //parse MsgRouter + url, ok := mongodbCfg["Url"] if ok == false { return fmt.Errorf("RankService.SaveMongo.Url config is error") @@ -120,7 +122,6 @@ func (mp *MongoPersist) ReadCfg() error { } mp.retryTimeIntervalMs = time.Duration(retryTimeIntervalMs.(float64))*time.Millisecond - return nil } @@ -190,17 +191,6 @@ func (mp *MongoPersist) loadFromDB(rankId uint64,rankCollectName string) error{ return nil } -func (mp *MongoPersist) OnEnterRank(rankSkip IRankSkip, enterData *RankData){ - mp.Lock() - defer mp.Unlock() - - delete(mp.mapRemoveRankData,enterData.Key) - - mp.lazyInitUpsertMap(rankSkip.GetRankID()) - mp.mapUpsertRankData[rankSkip.GetRankID()][enterData.Key] = *enterData - //mp.SaveToDB() -} - func (mp *MongoPersist) lazyInitRemoveMap(rankId uint64){ if mp.mapRemoveRankData[rankId] == nil { mp.mapRemoveRankData[rankId] = make(map[uint64]struct{},256) @@ -213,25 +203,37 @@ func (mp *MongoPersist) lazyInitUpsertMap(rankId uint64){ } } +func (mp *MongoPersist) OnEnterRank(rankSkip IRankSkip, enterData *RankData){ + mp.Lock() + defer mp.Unlock() + + delete(mp.mapRemoveRankData,enterData.Key) + + mp.lazyInitUpsertMap(rankSkip.GetRankID()) + mp.mapUpsertRankData[rankSkip.GetRankID()][enterData.Key] = *enterData +} + + func (mp *MongoPersist) OnLeaveRank(rankSkip IRankSkip, leaveData *RankData){ mp.Lock() defer mp.Unlock() - //先删掉更新中的数据 - delete(mp.mapUpsertRankData,leaveData.Key) - mp.lazyInitRemoveMap(rankSkip.GetRankID()) - mp.mapRemoveRankData[rankSkip.GetRankID()][leaveData.Key] = struct{}{} + //先删掉更新中的数据 + delete(mp.mapUpsertRankData,leaveData.Key) + mp.lazyInitRemoveMap(rankSkip.GetRankID()) + mp.mapRemoveRankData[rankSkip.GetRankID()][leaveData.Key] = struct{}{} } func (mp *MongoPersist) OnChangeRankData(rankSkip IRankSkip, changeData *RankData){ mp.Lock() defer mp.Unlock() - //先删掉要删除的数据 - delete(mp.mapRemoveRankData,changeData.Key) - //更新数据 - mp.lazyInitUpsertMap(rankSkip.GetRankID()) - mp.mapUpsertRankData[rankSkip.GetRankID()][changeData.Key] = *changeData + //先删掉要删除的数据 + delete(mp.mapRemoveRankData,changeData.Key) + + //更新数据 + mp.lazyInitUpsertMap(rankSkip.GetRankID()) + mp.mapUpsertRankData[rankSkip.GetRankID()][changeData.Key] = *changeData } //停存持久化到DB @@ -240,21 +242,38 @@ func (mp *MongoPersist) OnStop(mapRankSkip map[uint64]*RankSkip){ mp.waitGroup.Wait() } +func (mp *MongoPersist) JugeTimeoutSave() bool{ + timeout := time.Now() + isTimeOut := timeout.Sub(mp.lastSaveTime) >= mp.SaveInterval + if isTimeOut == true { + mp.lastSaveTime = timeout + } + + return isTimeOut +} + func (mp *MongoPersist) persistCoroutine(){ mp.waitGroup.Add(1) defer mp.waitGroup.Done() for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){ //间隔时间sleep - time.Sleep(mp.SaveInterval) + time.Sleep(time.Second*1) //没有持久化数据continue if mp.hasPersistData() == false { continue } + if mp.JugeTimeoutSave() == false{ + continue + } + //存档数据到数据库 mp.saveToDB() } + + //退出时存一次档 + mp.saveToDB() } func (mp *MongoPersist) hasPersistData() bool{ @@ -281,6 +300,7 @@ func (mp *MongoPersist) saveToDB(){ for len(mapRemoveRankData) >0 { mp.removeRankDataToDB(mapRemoveRankData) } + } func (mp *MongoPersist) removeToDB(collectName string,keys []uint64) error{ @@ -289,6 +309,7 @@ func (mp *MongoPersist) removeToDB(collectName string,keys []uint64) error{ defer cancel() condition := bson.D{{Key: "_id", Value: bson.M{"$in": keys}}} + _, err := s.Collection(mp.dbName, collectName).DeleteMany(ctx, condition) if err != nil { log.SError("MongoPersist DeleteMany fail,collect name is ", collectName) @@ -305,8 +326,8 @@ func (mp *MongoPersist) removeRankData(rankId uint64,keys []uint64) bool { return false } + //不成功则重试maxRetrySaveCount次 for i:=0;i 0 { - rs.rankModule.OnEnterRank(rs, addList) - } - - if len(updateList) > 0 { - rs.rankModule.OnChangeRankData(rs, updateList) - } -*/ - addCount = int32(len(addList)) - modifyCount = int32(len(updateList)) - rs.pickExpireKey() return } // UpsetRank 更新玩家排行数据,返回变化后的数据及变化类型 -func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fromLoad bool) (*RankData, RankDataChangeType) { +func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fromLoad bool) RankDataChangeType { rankNode, ok := rs.mapRankData[upsetData.Key] if ok == true { //找到的情况对比排名数据是否有变化,无变化进行data更新,有变化则进行删除更新 @@ -122,7 +103,8 @@ func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fro if fromLoad == false { rs.rankModule.OnChangeRankData(rs,rankNode) } - return nil, RankDataNone + rs.rankDataExpire.PushOrRefreshExpireKey(upsetData.Key,refreshTimestamp) + return RankDataUpdate } if upsetData.Data == nil { @@ -139,9 +121,9 @@ func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fro rs.rankDataExpire.PushOrRefreshExpireKey(upsetData.Key,refreshTimestamp) if fromLoad == false { - rs.rankModule.OnChangeRankData(rs, rankNode) + rs.rankModule.OnChangeRankData(rs, newRankData) } - return newRankData, RankDataUpdate + return RankDataUpdate } if rs.checkInsertAndReplace(upsetData) { @@ -154,10 +136,10 @@ func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fro rs.rankModule.OnEnterRank(rs, newRankData) } - return newRankData, RankDataAdd + return RankDataAdd } - return nil, RankDataNone + return RankDataNone } // DeleteRankData 删除排行数据 @@ -178,7 +160,6 @@ func (rs *RankSkip) DeleteRankData(delKeys []uint64) int32 { ReleaseRankData(rankData) } - rs.pickExpireKey() return removeRankData } @@ -189,12 +170,14 @@ func (rs *RankSkip) GetRankNodeData(findKey uint64) (*RankData, uint64) { return nil, 0 } + rs.pickExpireKey() _, index := rs.skipList.GetWithPosition(rankNode) return rankNode, index+1 } // GetRankNodeDataByPos 获取,返回排名节点与名次 func (rs *RankSkip) GetRankNodeDataByRank(rank uint64) (*RankData, uint64) { + rs.pickExpireKey() rankNode := rs.skipList.ByPosition(rank-1) if rankNode == nil { return nil, 0 @@ -266,6 +249,7 @@ func (rs *RankSkip) GetRankDataFromToLimit(startPos, count uint64, result *rpc.R return nil } + rs.pickExpireKey() if result.RankDataCount < startPos { startPos = result.RankDataCount - 1 }