优化性能监控与rankservice持久化

This commit is contained in:
duanhf2012
2023-05-09 14:06:17 +08:00
parent 92fdb7860c
commit b392617d6e
4 changed files with 20 additions and 17 deletions

View File

@@ -12,7 +12,7 @@ import (
"github.com/duanhf2012/origin/util/queue" "github.com/duanhf2012/origin/util/queue"
) )
var idleTimeout = 2 * time.Second var idleTimeout = int64(2 * time.Second)
const maxTaskQueueSessionId = 10000 const maxTaskQueueSessionId = 10000
type dispatch struct { type dispatch struct {
@@ -47,7 +47,7 @@ func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan
func (d *dispatch) run() { func (d *dispatch) run() {
defer d.waitDispatch.Done() defer d.waitDispatch.Done()
timeout := time.NewTimer(idleTimeout) timeout := time.NewTimer(time.Duration(atomic.LoadInt64(&idleTimeout)))
for { for {
select { select {
@@ -65,9 +65,9 @@ func (d *dispatch) run() {
case <-timeout.C: case <-timeout.C:
d.processTimer() d.processTimer()
if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 { if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 {
idleTimeout = time.Millisecond * 10 atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 10))
} }
timeout.Reset(idleTimeout) timeout.Reset(time.Duration(atomic.LoadInt64(&idleTimeout)))
} }
} }

View File

@@ -1,11 +1,12 @@
package network package network
import ( import (
"errors"
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"net" "net"
"sync" "sync"
"sync/atomic"
"time" "time"
"errors"
) )
type ConnSet map[net.Conn]struct{} type ConnSet map[net.Conn]struct{}
@@ -14,7 +15,7 @@ type TCPConn struct {
sync.Mutex sync.Mutex
conn net.Conn conn net.Conn
writeChan chan []byte writeChan chan []byte
closeFlag bool closeFlag int32
msgParser *MsgParser msgParser *MsgParser
} }
@@ -49,7 +50,7 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser,writeDe
conn.Close() conn.Close()
tcpConn.Lock() tcpConn.Lock()
freeChannel(tcpConn) freeChannel(tcpConn)
tcpConn.closeFlag = true atomic.StoreInt32(&tcpConn.closeFlag,1)
tcpConn.Unlock() tcpConn.Unlock()
}() }()
@@ -60,9 +61,9 @@ func (tcpConn *TCPConn) doDestroy() {
tcpConn.conn.(*net.TCPConn).SetLinger(0) tcpConn.conn.(*net.TCPConn).SetLinger(0)
tcpConn.conn.Close() tcpConn.conn.Close()
if !tcpConn.closeFlag { if atomic.LoadInt32(&tcpConn.closeFlag)==0 {
close(tcpConn.writeChan) close(tcpConn.writeChan)
tcpConn.closeFlag = true atomic.StoreInt32(&tcpConn.closeFlag,1)
} }
} }
@@ -76,12 +77,12 @@ func (tcpConn *TCPConn) Destroy() {
func (tcpConn *TCPConn) Close() { func (tcpConn *TCPConn) Close() {
tcpConn.Lock() tcpConn.Lock()
defer tcpConn.Unlock() defer tcpConn.Unlock()
if tcpConn.closeFlag { if atomic.LoadInt32(&tcpConn.closeFlag)==1 {
return return
} }
tcpConn.doWrite(nil) tcpConn.doWrite(nil)
tcpConn.closeFlag = true atomic.StoreInt32(&tcpConn.closeFlag,1)
} }
func (tcpConn *TCPConn) GetRemoteIp() string { func (tcpConn *TCPConn) GetRemoteIp() string {
@@ -104,7 +105,7 @@ func (tcpConn *TCPConn) doWrite(b []byte) error{
func (tcpConn *TCPConn) Write(b []byte) error{ func (tcpConn *TCPConn) Write(b []byte) error{
tcpConn.Lock() tcpConn.Lock()
defer tcpConn.Unlock() defer tcpConn.Unlock()
if tcpConn.closeFlag || b == nil { if atomic.LoadInt32(&tcpConn.closeFlag)==1 || b == nil {
tcpConn.ReleaseReadMsg(b) tcpConn.ReleaseReadMsg(b)
return errors.New("conn is close") return errors.New("conn is close")
} }
@@ -133,14 +134,14 @@ func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){
} }
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
if tcpConn.closeFlag == true { if atomic.LoadInt32(&tcpConn.closeFlag) == 1 {
return errors.New("conn is close") return errors.New("conn is close")
} }
return tcpConn.msgParser.Write(tcpConn, args...) return tcpConn.msgParser.Write(tcpConn, args...)
} }
func (tcpConn *TCPConn) WriteRawMsg(args []byte) error { func (tcpConn *TCPConn) WriteRawMsg(args []byte) error {
if tcpConn.closeFlag == true { if atomic.LoadInt32(&tcpConn.closeFlag) == 1 {
return errors.New("conn is close") return errors.New("conn is close")
} }
@@ -149,7 +150,7 @@ func (tcpConn *TCPConn) WriteRawMsg(args []byte) error {
func (tcpConn *TCPConn) IsConnected() bool { func (tcpConn *TCPConn) IsConnected() bool {
return tcpConn.closeFlag == false return atomic.LoadInt32(&tcpConn.closeFlag) == 0
} }
func (tcpConn *TCPConn) SetReadDeadline(d time.Duration) { func (tcpConn *TCPConn) SetReadDeadline(d time.Duration) {

View File

@@ -193,9 +193,11 @@ func Report() {
record = prof.record record = prof.record
prof.record = list.New() prof.record = list.New()
callNum := prof.callNum
totalCostTime := prof.totalCostTime
prof.stackLocker.RUnlock() prof.stackLocker.RUnlock()
DefaultReportFunction(name,prof.callNum,prof.totalCostTime,record) DefaultReportFunction(name,callNum,totalCostTime,record)
} }
} }

View File

@@ -71,6 +71,7 @@ func (mp *MongoPersist) OnInit() error {
} }
//开启协程 //开启协程
mp.waitGroup.Add(1)
go mp.persistCoroutine() go mp.persistCoroutine()
return nil return nil
@@ -261,7 +262,6 @@ func (mp *MongoPersist) JugeTimeoutSave() bool{
} }
func (mp *MongoPersist) persistCoroutine(){ func (mp *MongoPersist) persistCoroutine(){
mp.waitGroup.Add(1)
defer mp.waitGroup.Done() defer mp.waitGroup.Done()
for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){ for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){
//间隔时间sleep //间隔时间sleep