优化排行榜RankService

This commit is contained in:
orgin
2022-11-23 16:31:15 +08:00
parent c7e0fcbdbb
commit 975cf93d58
5 changed files with 78 additions and 102 deletions

View File

@@ -14,8 +14,9 @@ import (
"time"
)
const batchRemoveNum = 100
const MaxDays = 180
const batchRemoveNum = 128 //一切删除的最大数量
// RankDataDB 排行表数据
type RankDataDB struct {
Id uint64 `bson:"_id,omitempty"`
RefreshTime int64 `bson:"RefreshTime,omitempty"`
@@ -23,49 +24,52 @@ type RankDataDB struct {
Data []byte `bson:"Data,omitempty"`
}
// MongoPersist持久化Module
type MongoPersist struct {
service.Module
mongo mongodbmodule.MongoModule
url string //连接url
url string //Mongodb连接url
dbName string //数据库名称
SaveInterval time.Duration //落地数据库时间间隔
waitGroup sync.WaitGroup
sync.Mutex
mapRemoveRankData map[uint64]map[uint64]struct{} //将要删除的排行数据 map[RankId]map[Key]struct{}
mapUpsertRankData map[uint64]map[uint64]RankData //需要upsert的排行数据 map[RankId][key]RankData
//mapRankCfg map[uint64]string //map[RankId]RankCollectName
mapRankSkip map[uint64]IRankSkip
maxRetrySaveCount int //存档重试次数
mapRankSkip map[uint64]IRankSkip //所有的排行榜对象map[RankId]IRankSkip
maxRetrySaveCount int //存档重试次数
retryTimeIntervalMs time.Duration //重试时间间隔
stop int32
}
const CustomerCollectName = "SysCustomer"
lastSaveTime time.Time //最后一次存档时间
stop int32 //是否停服
waitGroup sync.WaitGroup //等待停服
}
func (mp *MongoPersist) OnInit() error {
mp.mapRemoveRankData = map[uint64]map[uint64]struct{}{}
mp.mapUpsertRankData = map[uint64]map[uint64]RankData{}
//mp.mapRankCfg = map[uint64]string{}
mp.mapRankSkip = map[uint64]IRankSkip{}
if errC := mp.ReadCfg(); errC != nil {
return errC
}
//初始化MongoDB
err := mp.mongo.Init(mp.url, time.Second*15)
if err != nil {
return err
}
//开始运行
err = mp.mongo.Start()
if err != nil {
log.SError("start dbService[", mp.dbName, "], url[", mp.url, "] init error:", err.Error())
return err
}
//开启协程
coroutine.GoRecover(mp.persistCoroutine,-1)
return nil
}
@@ -76,7 +80,6 @@ func (mp *MongoPersist) ReadCfg() error {
return fmt.Errorf("RankService config is error")
}
//读取数据库配置
saveMongoCfg,ok := mapDBServiceCfg["SaveMongo"]
if ok == false {
@@ -87,8 +90,7 @@ func (mp *MongoPersist) ReadCfg() error {
if ok == false {
return fmt.Errorf("RankService.SaveMongo config is error")
}
//parse MsgRouter
url, ok := mongodbCfg["Url"]
if ok == false {
return fmt.Errorf("RankService.SaveMongo.Url config is error")
@@ -120,7 +122,6 @@ func (mp *MongoPersist) ReadCfg() error {
}
mp.retryTimeIntervalMs = time.Duration(retryTimeIntervalMs.(float64))*time.Millisecond
return nil
}
@@ -190,17 +191,6 @@ func (mp *MongoPersist) loadFromDB(rankId uint64,rankCollectName string) error{
return nil
}
func (mp *MongoPersist) OnEnterRank(rankSkip IRankSkip, enterData *RankData){
mp.Lock()
defer mp.Unlock()
delete(mp.mapRemoveRankData,enterData.Key)
mp.lazyInitUpsertMap(rankSkip.GetRankID())
mp.mapUpsertRankData[rankSkip.GetRankID()][enterData.Key] = *enterData
//mp.SaveToDB()
}
func (mp *MongoPersist) lazyInitRemoveMap(rankId uint64){
if mp.mapRemoveRankData[rankId] == nil {
mp.mapRemoveRankData[rankId] = make(map[uint64]struct{},256)
@@ -213,25 +203,37 @@ func (mp *MongoPersist) lazyInitUpsertMap(rankId uint64){
}
}
func (mp *MongoPersist) OnEnterRank(rankSkip IRankSkip, enterData *RankData){
mp.Lock()
defer mp.Unlock()
delete(mp.mapRemoveRankData,enterData.Key)
mp.lazyInitUpsertMap(rankSkip.GetRankID())
mp.mapUpsertRankData[rankSkip.GetRankID()][enterData.Key] = *enterData
}
func (mp *MongoPersist) OnLeaveRank(rankSkip IRankSkip, leaveData *RankData){
mp.Lock()
defer mp.Unlock()
//先删掉更新中的数据
delete(mp.mapUpsertRankData,leaveData.Key)
mp.lazyInitRemoveMap(rankSkip.GetRankID())
mp.mapRemoveRankData[rankSkip.GetRankID()][leaveData.Key] = struct{}{}
//先删掉更新中的数据
delete(mp.mapUpsertRankData,leaveData.Key)
mp.lazyInitRemoveMap(rankSkip.GetRankID())
mp.mapRemoveRankData[rankSkip.GetRankID()][leaveData.Key] = struct{}{}
}
func (mp *MongoPersist) OnChangeRankData(rankSkip IRankSkip, changeData *RankData){
mp.Lock()
defer mp.Unlock()
//先删掉要删除的数据
delete(mp.mapRemoveRankData,changeData.Key)
//更新数据
mp.lazyInitUpsertMap(rankSkip.GetRankID())
mp.mapUpsertRankData[rankSkip.GetRankID()][changeData.Key] = *changeData
//先删掉要删除的数据
delete(mp.mapRemoveRankData,changeData.Key)
//更新数据
mp.lazyInitUpsertMap(rankSkip.GetRankID())
mp.mapUpsertRankData[rankSkip.GetRankID()][changeData.Key] = *changeData
}
//停存持久化到DB
@@ -240,21 +242,38 @@ func (mp *MongoPersist) OnStop(mapRankSkip map[uint64]*RankSkip){
mp.waitGroup.Wait()
}
func (mp *MongoPersist) JugeTimeoutSave() bool{
timeout := time.Now()
isTimeOut := timeout.Sub(mp.lastSaveTime) >= mp.SaveInterval
if isTimeOut == true {
mp.lastSaveTime = timeout
}
return isTimeOut
}
func (mp *MongoPersist) persistCoroutine(){
mp.waitGroup.Add(1)
defer mp.waitGroup.Done()
for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){
//间隔时间sleep
time.Sleep(mp.SaveInterval)
time.Sleep(time.Second*1)
//没有持久化数据continue
if mp.hasPersistData() == false {
continue
}
if mp.JugeTimeoutSave() == false{
continue
}
//存档数据到数据库
mp.saveToDB()
}
//退出时存一次档
mp.saveToDB()
}
func (mp *MongoPersist) hasPersistData() bool{
@@ -281,6 +300,7 @@ func (mp *MongoPersist) saveToDB(){
for len(mapRemoveRankData) >0 {
mp.removeRankDataToDB(mapRemoveRankData)
}
}
func (mp *MongoPersist) removeToDB(collectName string,keys []uint64) error{
@@ -289,6 +309,7 @@ func (mp *MongoPersist) removeToDB(collectName string,keys []uint64) error{
defer cancel()
condition := bson.D{{Key: "_id", Value: bson.M{"$in": keys}}}
_, err := s.Collection(mp.dbName, collectName).DeleteMany(ctx, condition)
if err != nil {
log.SError("MongoPersist DeleteMany fail,collect name is ", collectName)
@@ -305,8 +326,8 @@ func (mp *MongoPersist) removeRankData(rankId uint64,keys []uint64) bool {
return false
}
//不成功则重试maxRetrySaveCount次
for i:=0;i<mp.maxRetrySaveCount;i++{
//不成功则一直重试
if mp.removeToDB(rank.GetRankName(),keys)!= nil {
time.Sleep(mp.retryTimeIntervalMs)
continue

View File

@@ -2,9 +2,7 @@ package rankservice
import (
"container/heap"
"github.com/duanhf2012/origin/util/sync"
"time"
)
@@ -97,7 +95,7 @@ func (rd *rankDataHeap) PushOrRefreshExpireKey(key uint64,refreshTimestamp int64
//1.先删掉之前的
expData ,ok := rd.mapExpireData[key]
if ok == true {
expData.RefreshTimestamp = time.Now().UnixNano()
expData.RefreshTimestamp = refreshTimestamp
heap.Fix(rd,expData.Index)
return
}

View File

@@ -7,17 +7,13 @@ import (
type RankDataChangeType int8
type IRankSkip interface {
GetRankID() uint64
GetRankName() string
GetRankLen() uint64
UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fromLoad bool) (*RankData, RankDataChangeType)
UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fromLoad bool) RankDataChangeType
}
type IRankModule interface {
service.IModule

View File

@@ -9,7 +9,6 @@ import (
)
const PreMapRankSkipLen = 10
type RankService struct {
service.Service
@@ -33,8 +32,6 @@ func (rs *RankService) OnInit() error {
return err
}
return nil
}
@@ -51,26 +48,6 @@ func (rs *RankService) SetupRankModule(rankModule IRankModule) {
rs.rankModule = rankModule
}
// RPC_ManualAddRankSkip 提供手动添加排行榜
func (rs *RankService) RPC_ManualAddRankSkip(addInfo *rpc.AddRankList, addResult *rpc.RankResult) error {
addList := make([]uint64, 0, PreMapRankSkipLen)
for _, addRankListData := range addInfo.AddList {
if addRankListData.RankId == 0 {
rs.deleteRankList(addList)
return fmt.Errorf("RPC_AddRankSkip must has rank id")
}
newSkip := NewRankSkip(addRankListData.RankId,"",addRankListData.IsDec, transformLevel(addRankListData.SkipListLevel), addRankListData.MaxRank,time.Duration(addRankListData.ExpireMs)*time.Millisecond)
newSkip.SetupRankModule(rs.rankModule)
rs.mapRankSkip[addRankListData.RankId] = newSkip
addList = append(addList, addRankListData.RankId)
}
addResult.AddCount = 1
return nil
}
// RPC_UpsetRank 更新排行榜
func (rs *RankService) RPC_UpsetRank(upsetInfo *rpc.UpsetRankData, upsetResult *rpc.RankResult) error {
rankSkip, ok := rs.mapRankSkip[upsetInfo.RankId]

View File

@@ -19,7 +19,7 @@ type RankSkip struct {
rankDataExpire rankDataHeap
}
const MaxPickExpireNum = 2
const MaxPickExpireNum = 128
const (
RankDataNone RankDataChangeType = 0
RankDataAdd RankDataChangeType = 1 //数据插入
@@ -48,7 +48,7 @@ func (rs *RankSkip) pickExpireKey(){
return
}
for i:=1;i<MaxPickExpireNum;i++{
for i:=1;i<=MaxPickExpireNum;i++{
key := rs.rankDataExpire.PopExpireKey()
if key == 0 {
return
@@ -78,40 +78,21 @@ func (rs *RankSkip) GetRankLen() uint64 {
}
func (rs *RankSkip) UpsetRankList(upsetRankData []*rpc.RankData) (addCount int32, modifyCount int32) {
addList := make([]*RankData, 0, 1)
updateList := make([]*RankData, 0, 1)
for _, upsetData := range upsetRankData {
changeData, changeType := rs.UpsetRank(upsetData,time.Now().UnixNano(),false)
if changeData == nil {
continue
}
switch changeType {
case RankDataAdd:
addList = append(addList, changeData)
case RankDataUpdate:
updateList = append(updateList, changeData)
}
changeType := rs.UpsetRank(upsetData,time.Now().UnixNano(),false)
if changeType == RankDataAdd{
addCount+=1
} else if changeType == RankDataUpdate{
modifyCount+=1
}
}
/*
if len(addList) > 0 {
rs.rankModule.OnEnterRank(rs, addList)
}
if len(updateList) > 0 {
rs.rankModule.OnChangeRankData(rs, updateList)
}
*/
addCount = int32(len(addList))
modifyCount = int32(len(updateList))
rs.pickExpireKey()
return
}
// UpsetRank 更新玩家排行数据,返回变化后的数据及变化类型
func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fromLoad bool) (*RankData, RankDataChangeType) {
func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fromLoad bool) RankDataChangeType {
rankNode, ok := rs.mapRankData[upsetData.Key]
if ok == true {
//找到的情况对比排名数据是否有变化,无变化进行data更新,有变化则进行删除更新
@@ -122,7 +103,8 @@ func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fro
if fromLoad == false {
rs.rankModule.OnChangeRankData(rs,rankNode)
}
return nil, RankDataNone
rs.rankDataExpire.PushOrRefreshExpireKey(upsetData.Key,refreshTimestamp)
return RankDataUpdate
}
if upsetData.Data == nil {
@@ -139,9 +121,9 @@ func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fro
rs.rankDataExpire.PushOrRefreshExpireKey(upsetData.Key,refreshTimestamp)
if fromLoad == false {
rs.rankModule.OnChangeRankData(rs, rankNode)
rs.rankModule.OnChangeRankData(rs, newRankData)
}
return newRankData, RankDataUpdate
return RankDataUpdate
}
if rs.checkInsertAndReplace(upsetData) {
@@ -154,10 +136,10 @@ func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fro
rs.rankModule.OnEnterRank(rs, newRankData)
}
return newRankData, RankDataAdd
return RankDataAdd
}
return nil, RankDataNone
return RankDataNone
}
// DeleteRankData 删除排行数据
@@ -178,7 +160,6 @@ func (rs *RankSkip) DeleteRankData(delKeys []uint64) int32 {
ReleaseRankData(rankData)
}
rs.pickExpireKey()
return removeRankData
}
@@ -189,12 +170,14 @@ func (rs *RankSkip) GetRankNodeData(findKey uint64) (*RankData, uint64) {
return nil, 0
}
rs.pickExpireKey()
_, index := rs.skipList.GetWithPosition(rankNode)
return rankNode, index+1
}
// GetRankNodeDataByPos 获取,返回排名节点与名次
func (rs *RankSkip) GetRankNodeDataByRank(rank uint64) (*RankData, uint64) {
rs.pickExpireKey()
rankNode := rs.skipList.ByPosition(rank-1)
if rankNode == nil {
return nil, 0
@@ -266,6 +249,7 @@ func (rs *RankSkip) GetRankDataFromToLimit(startPos, count uint64, result *rpc.R
return nil
}
rs.pickExpireKey()
if result.RankDataCount < startPos {
startPos = result.RankDataCount - 1
}