Compare commits

...

4 Commits

Author SHA1 Message Date
duanhf2012
1cf071a444 优化日志 2024-09-20 15:17:44 +08:00
duanhf2012
e6c09064bf 美化代码 2024-09-18 16:19:20 +08:00
duanhf2012
84ab0cb84a 新增定时器组关闭接口 2024-09-18 11:51:17 +08:00
duanhf2012
22fe00173b 新增FrameTimer模块,支持暂停、恢复、加速 2024-09-18 11:35:43 +08:00
8 changed files with 416 additions and 17 deletions

View File

@@ -297,12 +297,12 @@ func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientL
if nodeId != rpc.NodeIdNull {
pClient,retire := GetCluster().GetRpcClient(nodeId)
if pClient == nil {
return fmt.Errorf("cannot find nodeid %d!", nodeId), nil
return fmt.Errorf("cannot find nodeid %s", nodeId), nil
}
//如果需要筛选掉退休结点
if filterRetire == true && retire == true {
return fmt.Errorf("cannot find nodeid %d!", nodeId), nil
return fmt.Errorf("cannot find nodeid %s", nodeId), nil
}
clientList = append(clientList,pClient)
@@ -466,7 +466,7 @@ func (cls *Cluster) GetNodeInfo(nodeId string) (NodeInfo,bool) {
return nodeInfo.nodeInfo,true
}
func (dc *Cluster) CanDiscoveryService(fromMasterNodeId string,serviceName string) bool{
func (cls *Cluster) CanDiscoveryService(fromMasterNodeId string,serviceName string) bool{
canDiscovery := true
splitServiceName := strings.Split(serviceName,":")
@@ -474,16 +474,16 @@ func (dc *Cluster) CanDiscoveryService(fromMasterNodeId string,serviceName strin
serviceName = splitServiceName[0]
}
for i:=0;i<len(dc.GetLocalNodeInfo().DiscoveryService);i++{
masterNodeId := dc.GetLocalNodeInfo().DiscoveryService[i].MasterNodeId
for i:=0;i<len(cls.GetLocalNodeInfo().DiscoveryService);i++{
masterNodeId := cls.GetLocalNodeInfo().DiscoveryService[i].MasterNodeId
//无效的配置,则跳过
if masterNodeId == rpc.NodeIdNull && len(dc.GetLocalNodeInfo().DiscoveryService[i].ServiceList)==0 {
if masterNodeId == rpc.NodeIdNull && len(cls.GetLocalNodeInfo().DiscoveryService[i].ServiceList)==0 {
continue
}
canDiscovery = false
if masterNodeId == fromMasterNodeId || masterNodeId == rpc.NodeIdNull {
for _,discoveryService := range dc.GetLocalNodeInfo().DiscoveryService[i].ServiceList {
for _,discoveryService := range cls.GetLocalNodeInfo().DiscoveryService[i].ServiceList {
if discoveryService == serviceName {
return true
}

View File

@@ -18,6 +18,7 @@ const (
Sys_Event_Retire EventType = -11
Sys_Event_EtcdDiscovery EventType = -12
Sys_Event_Gin_Event EventType = -13
Sys_Event_FrameTick EventType = -14
Sys_Event_User_Define EventType = 1
)

View File

@@ -328,13 +328,13 @@ func startNode(args interface{}) error {
myName, mErr := sysprocess.GetMyProcessName()
//当前进程名获取失败,不应该发生
if mErr != nil {
log.SInfo("get my process's name is error,", mErr.Error())
log.Info("get my process's name is error",log.ErrorAttr("err", mErr))
os.Exit(-1)
}
//进程id存在而且进程名也相同被认为是当前进程重复运行
if cErr == nil && name == myName {
log.SInfo(fmt.Sprintf("repeat runs are not allowed,node is %s,processid is %d",strNodeId,processId))
log.Info("repeat runs are not allowed",log.String("nodeId",strNodeId),log.Int("processId",processId))
os.Exit(-1)
}
break

View File

@@ -145,14 +145,12 @@ func DefaultReportFunction(name string,callNum int,costTime time.Duration,record
return
}
var strReport string
strReport = "Profiler report tag "+name+":\n"
var average int64
if callNum>0 {
average = costTime.Milliseconds()/int64(callNum)
}
strReport += fmt.Sprintf("process count %d,take time %d Milliseconds,average %d Milliseconds/per.\n",callNum,costTime.Milliseconds(),average)
log.Info("Profiler report tag "+name,log.Int("process count",callNum),log.Int64("take time",costTime.Milliseconds()),log.Int64("average",average))
elem := record.Front()
var strTypes string
for elem!=nil {
@@ -163,11 +161,9 @@ func DefaultReportFunction(name string,callNum int,costTime time.Duration,record
strTypes = "slow process"
}
strReport += fmt.Sprintf("%s:%s is take %d Milliseconds\n",strTypes,pRecord.RecordName,pRecord.CostTime.Milliseconds())
log.Info("Profiler report type",log.String("Types",strTypes),log.String("RecordName",pRecord.RecordName),log.Int64("take time",pRecord.CostTime.Milliseconds()))
elem = elem.Next()
}
log.SInfo("report",strReport)
}
func Report() {

View File

@@ -0,0 +1,203 @@
package frametimer
import (
"container/heap"
"context"
"errors"
"github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log"
"time"
)
type TimerCB func(context.Context, FrameTimerID)
type _timerHeap struct {
timers []*timerData
}
func (h *_timerHeap) Len() int {
return len(h.timers)
}
func (h *_timerHeap) Less(i, j int) bool {
return h.timers[i].frameNum < h.timers[j].frameNum
}
func (h *_timerHeap) Swap(i, j int) {
h.timers[i], h.timers[j] = h.timers[j], h.timers[i]
h.timers[i].idx = i
h.timers[j].idx = j
}
func (h *_timerHeap) Push(x interface{}) {
td := x.(*timerData)
h.timers = append(h.timers, td)
td.idx = len(h.timers) - 1
}
func (h *_timerHeap) Pop() (ret interface{}) {
l := len(h.timers)
h.timers, ret = h.timers[:l-1], h.timers[l-1]
return
}
type FrameGroup struct {
groupID FrameGroupID
timerHeap _timerHeap
ft *FrameTimer
preTickGlobalFrameNum FrameNumber // 上次tick全局帧
preGlobalFrameNum FrameNumber // 上次设置的全局帧,用于更新FrameTimer.mapFrameGroup关系
frameNum FrameNumber // 当前帧
pause bool // 暂停状态
multiple uint8 // 位数默认1倍只允许1-5倍数
}
func (fg *FrameGroup) init() {
fg.timerHeap.timers = make([]*timerData, 0, 512)
fg.groupID = fg.ft.genGroupID()
fg.multiple = 1
heap.Init(&fg.timerHeap)
}
func (fg *FrameGroup) convertGlobalFrameNum(frameNum FrameNumber) FrameNumber {
return fg.ft.getGlobalFrameNumber() + (frameNum-fg.frameNum)/FrameNumber(fg.multiple)
}
func (fg *FrameGroup) refreshMinFrame() {
if fg.timerHeap.Len() == 0 || fg.pause {
return
}
globalFrameNum := fg.convertGlobalFrameNum(fg.timerHeap.timers[0].frameNum)
fg.ft.refreshGroupMinFrame(fg.groupID, fg.preGlobalFrameNum, globalFrameNum)
fg.preGlobalFrameNum = globalFrameNum
}
func (fg *FrameGroup) tick(globalFrame FrameNumber) {
fg.frameNum = fg.frameNum + (globalFrame-fg.preTickGlobalFrameNum)*FrameNumber(fg.multiple)
fg.preTickGlobalFrameNum = globalFrame
fg.onceTick()
fg.refreshMinFrame()
}
func (fg *FrameGroup) onceTick() {
for fg.timerHeap.Len() > 0 {
if fg.timerHeap.timers[0].frameNum > fg.frameNum {
break
}
t := heap.Pop(&fg.timerHeap).(*timerData)
ev := event.NewEvent()
ev.Type = event.Sys_Event_FrameTick
ev.Data = t
fg.ft.NotifyEvent(ev)
fg.ft.removeTimerData(t.timerID)
if t.tickerFrameNum != 0 {
fg.addTicker(t.timerID, t.tickerFrameNum, t.ctx, t.cb)
}
}
}
func (fg *FrameGroup) addTimer(timerID FrameTimerID, frame FrameNumber, ctx context.Context, cb TimerCB) {
nextFrame := fg.frameNum + frame
td := fg.ft.addTimerData(timerID, nextFrame, 0, ctx, cb)
heap.Push(&fg.timerHeap, td)
}
func (fg *FrameGroup) addTicker(timerID FrameTimerID, frame FrameNumber, ctx context.Context, cb TimerCB) {
nextFrame := fg.frameNum + frame
td := fg.ft.addTimerData(timerID, nextFrame, frame, ctx, cb)
heap.Push(&fg.timerHeap, td)
}
// SetMultiple 设置倍数允许倍数范围1-5
func (fg *FrameGroup) SetMultiple(multiple uint8) error {
if fg.multiple == multiple {
return nil
}
if multiple < 0 || multiple > maxMultiple {
return errors.New("invalid multiplier")
}
fg.multiple = multiple
fg.refreshMinFrame()
return nil
}
// FrameAfterFunc 创建After定时器
func (fg *FrameGroup) FrameAfterFunc(timerID *FrameTimerID, d time.Duration, ctx context.Context, cb TimerCB) {
fg.ft.locker.Lock()
defer fg.ft.locker.Unlock()
frame := FrameNumber(d / fg.ft.oneFrameTime)
newTimerID := fg.ft.genTimerID()
fg.addTimer(newTimerID, frame, ctx, cb)
*timerID = newTimerID
fg.refreshMinFrame()
}
// FrameNewTicker 创建Ticker定时器
func (fg *FrameGroup) FrameNewTicker(timerID *FrameTimerID, d time.Duration, ctx context.Context, cb TimerCB) {
fg.ft.locker.Lock()
defer fg.ft.locker.Unlock()
frame := FrameNumber(d / fg.ft.oneFrameTime)
newTimerID := fg.ft.genTimerID()
fg.addTicker(newTimerID, frame, ctx, cb)
*timerID = newTimerID
fg.refreshMinFrame()
}
// Pause 暂停定时器组
func (fg *FrameGroup) Pause() {
fg.ft.locker.Lock()
defer fg.ft.locker.Unlock()
fg.pause = true
fg.ft.removeGroup(fg.groupID, fg.preGlobalFrameNum)
fg.preGlobalFrameNum = 0
}
// Resume 唤醒定时器组
func (fg *FrameGroup) Resume() {
fg.ft.locker.Lock()
defer fg.ft.locker.Unlock()
fg.pause = false
fg.refreshMinFrame()
fg.preTickGlobalFrameNum = fg.ft.globalFrameNum
}
// CancelTimer 关闭定时器
func (fg *FrameGroup) CancelTimer(timerID FrameTimerID) {
fg.ft.locker.Lock()
defer fg.ft.locker.Unlock()
td := fg.ft.getTimerData(timerID)
if td == nil {
log.Error("cannot find timer", log.Uint64("timerID", uint64(timerID)))
return
}
heap.Remove(&fg.timerHeap, td.idx)
fg.ft.removeGroup(fg.groupID, fg.preGlobalFrameNum)
fg.preGlobalFrameNum = 0
fg.refreshMinFrame()
}
func (fg *FrameGroup) Close(){
fg.ft.removeGroup(fg.groupID, fg.preGlobalFrameNum)
delete(fg.ft.mapGroup,fg.groupID)
}

View File

@@ -0,0 +1,199 @@
package frametimer
import (
"context"
"github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/service"
"sync"
"time"
)
const defaultFps = 50
const defaultSleepInterval = time.Millisecond * 3
const maxFps = 1000
const maxMultiple = 5
type FrameGroupID uint64
type FrameTimerID uint64
type FrameNumber uint64
type timerData struct {
frameNum FrameNumber
timerID FrameTimerID
idx int
cb TimerCB
tickerFrameNum FrameNumber
ctx context.Context
}
type FrameTimer struct {
service.Module
fps uint32
oneFrameTime time.Duration
ticker *time.Ticker
mapFrameGroup map[FrameNumber]map[FrameGroupID]struct{}
mapGroup map[FrameGroupID]*FrameGroup
globalFrameNum FrameNumber // 当前帧
maxTimerID FrameTimerID
maxGroupID FrameGroupID
mapTimer map[FrameTimerID]*timerData
timerDataPool *sync.Pool
locker sync.Mutex
sleepInterval time.Duration
}
func (ft *FrameTimer) getTimerData(timerID FrameTimerID) *timerData {
return ft.mapTimer[timerID]
}
func (ft *FrameTimer) addTimerData(timerID FrameTimerID, frameNum FrameNumber, tickerFrameNum FrameNumber, ctx context.Context, cb TimerCB) *timerData {
td := ft.timerDataPool.Get().(*timerData)
td.timerID = timerID
td.frameNum = frameNum
td.cb = cb
td.idx = -1
td.tickerFrameNum = tickerFrameNum
ft.mapTimer[timerID] = td
return td
}
func (ft *FrameTimer) removeTimerData(timerID FrameTimerID) {
td := ft.mapTimer[timerID]
if td == nil {
return
}
ft.timerDataPool.Put(td)
}
func (ft *FrameTimer) genGroupID() FrameGroupID {
ft.maxGroupID++
return ft.maxGroupID
}
func (ft *FrameTimer) genTimerID() FrameTimerID {
ft.maxTimerID++
return ft.maxTimerID
}
func (ft *FrameTimer) getGlobalFrameNumber() FrameNumber {
return ft.globalFrameNum
}
func (ft *FrameTimer) frameTick() {
preFrameNum := ft.globalFrameNum
ft.globalFrameNum++
ft.locker.Lock()
defer ft.locker.Unlock()
for i := preFrameNum; i <= ft.globalFrameNum; i++ {
mapGroup := ft.mapFrameGroup[i]
delete(ft.mapFrameGroup, i)
for groupID := range mapGroup {
group := ft.mapGroup[groupID]
if group == nil {
continue
}
group.tick(ft.globalFrameNum)
}
}
}
func (ft *FrameTimer) removeGroup(groupID FrameGroupID, frameNum FrameNumber) {
delete(ft.mapFrameGroup[frameNum], groupID)
}
func (ft *FrameTimer) refreshGroupMinFrame(groupID FrameGroupID, preFrameNum FrameNumber, newFrameNum FrameNumber) {
ft.removeGroup(groupID, preFrameNum)
mapGroup := ft.mapFrameGroup[newFrameNum]
if mapGroup == nil {
mapGroup = make(map[FrameGroupID]struct{}, 6)
ft.mapFrameGroup[newFrameNum] = mapGroup
}
mapGroup[groupID] = struct{}{}
}
func (ft *FrameTimer) OnInit() error {
ft.mapFrameGroup = make(map[FrameNumber]map[FrameGroupID]struct{}, 1024)
ft.mapGroup = make(map[FrameGroupID]*FrameGroup, 1024)
ft.mapTimer = make(map[FrameTimerID]*timerData, 2048)
ft.timerDataPool = &sync.Pool{
New: func() any {
return &timerData{}
},
}
if ft.fps == 0 {
ft.fps = defaultFps
}
ft.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_FrameTick, ft.GetEventHandler(), func(e event.IEvent) {
ev := e.(*event.Event)
td, ok := ev.Data.(*timerData)
if !ok {
log.Error("convert *timerData error")
return
}
td.cb(td.ctx, td.timerID)
event.DeleteEvent(e)
})
ft.oneFrameTime = time.Second / time.Duration(ft.fps)
ft.ticker = time.NewTicker(ft.oneFrameTime)
if ft.sleepInterval == 0 {
ft.sleepInterval = defaultSleepInterval
}
go func() {
preTime := time.Now()
var preFrame FrameNumber
for {
time.Sleep(ft.sleepInterval)
frameMax := FrameNumber(time.Now().Sub(preTime) / ft.oneFrameTime)
for i := preFrame + 1; i <= frameMax; i++ {
ft.frameTick()
}
preFrame = frameMax
}
}()
return nil
}
// SetFps 设置帧率越大误差越低。如果有倍数加速需求可以适当加大fps以减少误差。默认50fps
func (ft *FrameTimer) SetFps(fps uint32) {
if fps > maxFps {
fps = maxFps
}
ft.fps = fps
}
// SetAccuracyInterval 设置时间间隔精度在循环中sleep该时间进行判断。实际上因为sleep有误差所以暂时不使用fps得出。默认为3ms
func (ft *FrameTimer) SetAccuracyInterval(interval time.Duration) {
ft.sleepInterval = interval
}
// NewGroup 创建定时器组
func (ft *FrameTimer) NewGroup() *FrameGroup {
var group FrameGroup
group.ft = ft
group.init()
ft.locker.Lock()
defer ft.locker.Unlock()
ft.mapGroup[group.groupID] = &group
return &group
}

View File

@@ -90,7 +90,7 @@ func (gm *GinModule) StartTLS(certFile, keyFile string) {
func (gm *GinModule) Stop(ctx context.Context) {
if err := gm.srv.Shutdown(ctx); err != nil {
log.SError("Server Shutdown", slog.Any("error", err))
log.Error("Server Shutdown", slog.Any("error", err))
}
}

View File

@@ -44,7 +44,7 @@ func Logger() gin.HandlerFunc {
// 响应状态码
statusCode := c.Writer.Status()
log.SDebug(fmt.Sprintf(
log.Debug(fmt.Sprintf(
"%s | %3d | %s %10s | \033[44;37m%-6s\033[0m %s %s | %10v | \"%s\" \"%s\"",
colorForStatus(statusCode),
statusCode,