优化RankService排行榜服务

This commit is contained in:
orgin
2022-11-19 16:00:16 +08:00
parent 95b4e2f8de
commit 5bea050f63
7 changed files with 457 additions and 230 deletions

View File

@@ -15,7 +15,7 @@ var RankDataPool = sync.NewPoolEx(make(chan sync.IPoolData, 10240), func() sync.
type RankData struct {
*rpc.RankData
bRelease bool
ref bool
compareFunc func(other skip.Comparator) int
}

View File

@@ -0,0 +1,127 @@
package rankservice
import (
"container/heap"
"github.com/duanhf2012/origin/util/sync"
"time"
)
var expireDataPool = sync.NewPoolEx(make(chan sync.IPoolData, 10240), func() sync.IPoolData {
return &ExpireData{}
})
type ExpireData struct {
Index int
Key uint64
RefreshTimestamp int64
ref bool
}
type rankDataHeap struct {
rankDatas []*ExpireData
expireMs int64
mapExpireData map[uint64]*ExpireData
}
var expireData ExpireData
func (ed *ExpireData) Reset(){
*ed = expireData
}
func (ed *ExpireData) IsRef() bool{
return ed.ref
}
func (ed *ExpireData) Ref(){
ed.ref = true
}
func (ed *ExpireData) UnRef(){
ed.ref = false
}
func (rd *rankDataHeap) Init(maxRankDataCount int32,expireMs time.Duration){
rd.rankDatas = make([]*ExpireData,0,maxRankDataCount)
rd.expireMs = int64(expireMs)
rd.mapExpireData = make(map[uint64]*ExpireData,512)
heap.Init(rd)
}
func (rd *rankDataHeap) Len() int {
return len(rd.rankDatas)
}
func (rd *rankDataHeap) Less(i, j int) bool {
return rd.rankDatas[i].RefreshTimestamp < rd.rankDatas[j].RefreshTimestamp
}
func (rd *rankDataHeap) Swap(i, j int) {
rd.rankDatas[i], rd.rankDatas[j] = rd.rankDatas[j], rd.rankDatas[i]
rd.rankDatas[i].Index,rd.rankDatas[j].Index = i,j
}
func (rd *rankDataHeap) Push(x interface{}) {
ed := x.(*ExpireData)
ed.Index = len(rd.rankDatas)
rd.rankDatas = append(rd.rankDatas,ed)
}
func (rd *rankDataHeap) Pop() (ret interface{}) {
l := len(rd.rankDatas)
var retData *ExpireData
rd.rankDatas, retData = rd.rankDatas[:l-1], rd.rankDatas[l-1]
retData.Index = -1
ret = retData
return
}
func (rd *rankDataHeap) PopExpireKey() uint64{
if rd.Len() <= 0 {
return 0
}
if rd.rankDatas[0].RefreshTimestamp+rd.expireMs > time.Now().UnixNano() {
return 0
}
rankData := heap.Pop(rd).(*ExpireData)
delete(rd.mapExpireData,rankData.Key)
return rankData.Key
}
func (rd *rankDataHeap) PushOrRefreshExpireKey(key uint64){
//1.先删掉之前的
expData ,ok := rd.mapExpireData[key]
if ok == true {
expData.RefreshTimestamp = time.Now().UnixNano()
heap.Fix(rd,expData.Index)
return
}
//2.直接插入
expData = expireDataPool.Get().(*ExpireData)
expData.Key = key
expData.RefreshTimestamp = time.Now().UnixNano()
rd.mapExpireData[key] = expData
heap.Push(rd,expData)
}
func (rd *rankDataHeap) RemoveExpireKey(key uint64){
expData ,ok := rd.mapExpireData[key]
if ok == false {
return
}
delete(rd.mapExpireData,key)
heap.Remove(rd,expData.Index)
expireDataPool.Put(expData)
}

View File

@@ -4,23 +4,13 @@ import "github.com/duanhf2012/origin/service"
type RankDataChangeType int8
const (
RankDataNone RankDataChangeType = 0
RankDataAdd RankDataChangeType = 1 //数据插入
RankDataUpdate RankDataChangeType = 2 //数据更新
RankDataDelete RankDataChangeType = 3 //数据删除
)
type IRankSkip interface {
GetRankID() uint64
GetRankLen() uint64
}
// RankDataChangeCallBack 排行数据变化时调用
//type RankDataChangeCallBack interface {
// CB(iRankService service.IService, rankSkip IRankSkip, changeType RankDataChangeType, changed []*RankData)
//}
type IRankModule interface {
service.IModule

View File

@@ -3,22 +3,12 @@ package rankservice
import (
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/node"
"github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/service"
"time"
)
func init() {
node.Setup(&RankService{})
}
const PreMapRankSkipLen = 10
const ManualAddRankSkip = "RPC_ManualAddRankSkip"
const UpsetRank = "RPC_UpsetRank"
const DeleteRankDataByKey = "RPC_DeleteRankDataByKey"
const FindRankDataByKey = "RPC_FindRankDataByKey"
const FindRankDataByPos = "RPC_FindRankDataByPos"
const FindRankDataListStartTo = "RPC_FindRankDataListStartTo"
type RankService struct {
service.Service
@@ -68,7 +58,7 @@ func (rs *RankService) RPC_ManualAddRankSkip(addInfo *rpc.AddRankList, addResult
return fmt.Errorf("RPC_AddRankSkip must has rank id")
}
newSkip := NewRankSkip(addRankListData.IsDec, transformLevel(addRankListData.SkipListLevel), addRankListData.MaxRank)
newSkip := NewRankSkip(addRankListData.IsDec, transformLevel(addRankListData.SkipListLevel), addRankListData.MaxRank,time.Duration(addRankListData.ExpireMs)*time.Millisecond)
rs.mapRankSkip[addRankListData.RankId] = newSkip
addList = append(addList, addRankListData.RankId)
}
@@ -91,7 +81,7 @@ func (rs *RankService) RPC_UpsetRank(upsetInfo *rpc.UpsetRankData, upsetResult *
return nil
}
// RPC_DeleteRankDataByKey 按排行的key进行删除
// RPC_DeleteRankDataByKey 按key从排行榜中进行删除
func (rs *RankService) RPC_DeleteRankDataByKey(delInfo *rpc.DeleteByKey, delResult *rpc.RankResult) error {
rankSkip, ok := rs.mapRankSkip[delInfo.RankId]
if ok == false || rankSkip == nil {
@@ -107,49 +97,49 @@ func (rs *RankService) RPC_DeleteRankDataByKey(delInfo *rpc.DeleteByKey, delResu
return nil
}
// RPC_FindRankDataByKey 按key查找
// RPC_FindRankDataByKey 按key查找,返回对应的排行名次信息
func (rs *RankService) RPC_FindRankDataByKey(findInfo *rpc.FindRankDataByKey, findResult *rpc.RankPosData) error {
rankObj, ok := rs.mapRankSkip[findInfo.RankId]
if ok == false || rankObj == nil {
return fmt.Errorf("RPC_FindRankDataByKey[", findInfo.RankId, "] no this rank type")
}
findRankData, rankPos := rankObj.GetRankNodeData(findInfo.Key)
findRankData, rank := rankObj.GetRankNodeData(findInfo.Key)
if findRankData != nil {
findResult.Data = findRankData.Data
findResult.Key = findRankData.Key
findResult.SortData = findRankData.SortData
findResult.RankPos = rankPos
findResult.Rank = rank
}
return nil
}
// RPC_FindRankDataByPos 按pos查找
func (rs *RankService) RPC_FindRankDataByPos(findInfo *rpc.FindRankDataByPos, findResult *rpc.RankPosData) error {
// RPC_FindRankDataByRank 按pos查找
func (rs *RankService) RPC_FindRankDataByRank(findInfo *rpc.FindRankDataByRank, findResult *rpc.RankPosData) error {
rankObj, ok := rs.mapRankSkip[findInfo.RankId]
if ok == false || rankObj == nil {
return fmt.Errorf("RPC_FindRankDataByKey[", findInfo.RankId, "] no this rank type")
}
findRankData, rankPos := rankObj.GetRankNodeDataByPos(findInfo.Pos)
findRankData, rankPos := rankObj.GetRankNodeDataByRank(findInfo.Rank)
if findRankData != nil {
findResult.Data = findRankData.Data
findResult.Key = findRankData.Key
findResult.SortData = findRankData.SortData
findResult.RankPos = rankPos
findResult.Rank = rankPos
}
return nil
}
// RPC_FindRankDataListStartTo 按pos查找,start开始count个排行数据
func (rs *RankService) RPC_FindRankDataListStartTo(findInfo *rpc.FindRankDataListStartTo, findResult *rpc.RankDataList) error {
// RPC_FindRankDataListStartRank查找,从StartRank开始count个排行数据
func (rs *RankService) RPC_FindRankDataList(findInfo *rpc.FindRankDataList, findResult *rpc.RankDataList) error {
rankObj, ok := rs.mapRankSkip[findInfo.RankId]
if ok == false || rankObj == nil {
return fmt.Errorf("RPC_FindRankDataListStartTo[", findInfo.RankId, "] no this rank type")
}
findResult.RankDataCount = rankObj.GetRankLen()
return rankObj.GetRankDataFromToLimit(findInfo.StartPos, findInfo.Count, findResult)
return rankObj.GetRankDataFromToLimit(findInfo.StartRank, findInfo.Count, findResult)
}
func (rs *RankService) deleteRankList(delIdList []uint64) {
@@ -187,9 +177,14 @@ func (rs *RankService) dealCfg() error {
level, _ := mapCfg["SkipListLevel"].(float64)
isDec, _ := mapCfg["IsDec"].(bool)
maxRank, _ := mapCfg["MaxRank"].(float64)
expireMs, _ := mapCfg["ExpireMs"].(float64)
newSkip := NewRankSkip(isDec, transformLevel(int32(level)), uint64(maxRank))
newSkip := NewRankSkip(isDec, transformLevel(int32(level)), uint64(maxRank),time.Duration(expireMs)*time.Millisecond)
rs.mapRankSkip[uint64(rankId)] = newSkip
}
return nil
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/util/algorithms/skip"
"time"
)
type RankSkip struct {
@@ -12,19 +13,46 @@ type RankSkip struct {
skipList *skip.SkipList //跳表
mapRankData map[uint64]*RankData //排行数据map
maxLen uint64 //排行数据长度
expireMs time.Duration //有效时间
rankModule IRankModule
rankDataExpire rankDataHeap
}
const MaxPickExpireNum = 2
const (
RankDataNone RankDataChangeType = 0
RankDataAdd RankDataChangeType = 1 //数据插入
RankDataUpdate RankDataChangeType = 2 //数据更新
RankDataDelete RankDataChangeType = 3 //数据删除
)
// NewRankSkip 创建排行榜
func NewRankSkip(isDes bool, level interface{}, maxLen uint64) *RankSkip {
ret := &RankSkip{}
func NewRankSkip(isDes bool, level interface{}, maxLen uint64,expireMs time.Duration) *RankSkip {
rs := &RankSkip{}
ret.isDes = isDes
ret.skipList = skip.New(level)
ret.mapRankData = make(map[uint64]*RankData, 10240)
ret.maxLen = maxLen
rs.isDes = isDes
rs.skipList = skip.New(level)
rs.mapRankData = make(map[uint64]*RankData, 10240)
rs.maxLen = maxLen
rs.expireMs = expireMs
rs.rankDataExpire.Init(int32(maxLen),expireMs)
return ret
return rs
}
func (rs *RankSkip) pickExpireKey(){
if rs.expireMs == 0 {
return
}
for i:=1;i<MaxPickExpireNum;i++{
key := rs.rankDataExpire.PopExpireKey()
if key == 0 {
return
}
rs.DeleteRankData([]uint64{key})
}
}
func (rs *RankSkip) SetupRankModule(rankModule IRankModule) {
@@ -68,6 +96,8 @@ func (rs *RankSkip) UpsetRank(upsetRankData []*rpc.RankData) (addCount int32, mo
addCount = int32(len(addList))
modifyCount = int32(len(updateList))
rs.pickExpireKey()
return
}
@@ -90,13 +120,18 @@ func (rs *RankSkip) upsetRank(upsetData *rpc.RankData) (*RankData, RankDataChang
newRankData := NewRankData(rs.isDes, upsetData)
rs.skipList.Insert(newRankData)
rs.mapRankData[upsetData.Key] = newRankData
//刷新有效期
rs.rankDataExpire.PushOrRefreshExpireKey(upsetData.Key)
return newRankData, RankDataUpdate
}
if rs.checkCanInsert(upsetData) {
if rs.checkInsertAndReplace(upsetData) {
newRankData := NewRankData(rs.isDes, upsetData)
rs.skipList.Insert(newRankData)
rs.mapRankData[upsetData.Key] = newRankData
rs.rankDataExpire.PushOrRefreshExpireKey(upsetData.Key)
return newRankData, RankDataAdd
}
@@ -120,10 +155,12 @@ func (rs *RankSkip) DeleteRankData(delKeys []uint64) int32 {
//从排行榜中删除
for _, rankData := range removeRankData {
rs.skipList.Delete(rankData)
ReleaseRankData(rankData)
delete(rs.mapRankData, rankData.Key)
rs.rankDataExpire.RemoveExpireKey(rankData.Key)
ReleaseRankData(rankData)
}
rs.pickExpireKey()
return int32(len(removeRankData))
}
@@ -139,13 +176,13 @@ func (rs *RankSkip) GetRankNodeData(findKey uint64) (*RankData, uint64) {
}
// GetRankNodeDataByPos 获取,返回排名节点与名次
func (rs *RankSkip) GetRankNodeDataByPos(pos uint64) (*RankData, uint64) {
rankNode := rs.skipList.ByPosition(pos)
func (rs *RankSkip) GetRankNodeDataByRank(rank uint64) (*RankData, uint64) {
rankNode := rs.skipList.ByPosition(rank)
if rankNode == nil {
return nil, 0
}
return rankNode.(*RankData), pos
return rankNode.(*RankData), rank
}
// GetRankKeyPrevToLimit 获取key前count名的数据
@@ -166,7 +203,7 @@ func (rs *RankSkip) GetRankKeyPrevToLimit(findKey, count uint64, result *rpc.Ran
rankData := iter.Value().(*RankData)
result.RankPosDataList = append(result.RankPosDataList, &rpc.RankPosData{
Key: rankData.Key,
RankPos: rankPos - iterCount,
Rank: rankPos - iterCount,
SortData: rankData.SortData,
Data: rankData.Data,
})
@@ -194,7 +231,7 @@ func (rs *RankSkip) GetRankKeyNextToLimit(findKey, count uint64, result *rpc.Ran
rankData := iter.Value().(*RankData)
result.RankPosDataList = append(result.RankPosDataList, &rpc.RankPosData{
Key: rankData.Key,
RankPos: rankPos + iterCount,
Rank: rankPos + iterCount,
SortData: rankData.SortData,
Data: rankData.Data,
})
@@ -220,7 +257,7 @@ func (rs *RankSkip) GetRankDataFromToLimit(startPos, count uint64, result *rpc.R
rankData := iter.Value().(*RankData)
result.RankPosDataList = append(result.RankPosDataList, &rpc.RankPosData{
Key: rankData.Key,
RankPos: iterCount + startPos,
Rank: iterCount + startPos,
SortData: rankData.SortData,
Data: rankData.Data,
})
@@ -231,7 +268,7 @@ func (rs *RankSkip) GetRankDataFromToLimit(startPos, count uint64, result *rpc.R
}
// checkCanInsert 检查是否能插入
func (rs *RankSkip) checkCanInsert(upsetData *rpc.RankData) bool {
func (rs *RankSkip) checkInsertAndReplace(upsetData *rpc.RankData) bool {
//maxLen为0不限制长度
if rs.maxLen == 0 {
return true
@@ -254,9 +291,11 @@ func (rs *RankSkip) checkCanInsert(upsetData *rpc.RankData) bool {
//移除最后一位
//回调模块该RandData从排行中删除
rs.rankDataExpire.RemoveExpireKey(lastRankData.Key)
rs.rankModule.OnLeaveRank(rs, []*RankData{lastRankData})
rs.skipList.Delete(lastPosData)
delete(rs.mapRankData, lastRankData.Key)
ReleaseRankData(lastRankData)
return true
}