diff --git a/concurrent/dispatch.go b/concurrent/dispatch.go index 64935a4..0f29c18 100644 --- a/concurrent/dispatch.go +++ b/concurrent/dispatch.go @@ -12,7 +12,7 @@ import ( "github.com/duanhf2012/origin/util/queue" ) -var idleTimeout = 2 * time.Second +var idleTimeout = int64(2 * time.Second) const maxTaskQueueSessionId = 10000 type dispatch struct { @@ -47,7 +47,7 @@ func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan func (d *dispatch) run() { defer d.waitDispatch.Done() - timeout := time.NewTimer(idleTimeout) + timeout := time.NewTimer(time.Duration(atomic.LoadInt64(&idleTimeout))) for { select { @@ -65,9 +65,9 @@ func (d *dispatch) run() { case <-timeout.C: d.processTimer() if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 { - idleTimeout = time.Millisecond * 10 + atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 10)) } - timeout.Reset(idleTimeout) + timeout.Reset(time.Duration(atomic.LoadInt64(&idleTimeout))) } } diff --git a/network/tcp_conn.go b/network/tcp_conn.go index 0ebb9b7..b309b0d 100644 --- a/network/tcp_conn.go +++ b/network/tcp_conn.go @@ -1,11 +1,12 @@ package network import ( + "errors" "github.com/duanhf2012/origin/log" "net" "sync" + "sync/atomic" "time" - "errors" ) type ConnSet map[net.Conn]struct{} @@ -14,7 +15,7 @@ type TCPConn struct { sync.Mutex conn net.Conn writeChan chan []byte - closeFlag bool + closeFlag int32 msgParser *MsgParser } @@ -49,7 +50,7 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser,writeDe conn.Close() tcpConn.Lock() freeChannel(tcpConn) - tcpConn.closeFlag = true + atomic.StoreInt32(&tcpConn.closeFlag,1) tcpConn.Unlock() }() @@ -60,9 +61,9 @@ func (tcpConn *TCPConn) doDestroy() { tcpConn.conn.(*net.TCPConn).SetLinger(0) tcpConn.conn.Close() - if !tcpConn.closeFlag { + if atomic.LoadInt32(&tcpConn.closeFlag)==0 { close(tcpConn.writeChan) - tcpConn.closeFlag = true + atomic.StoreInt32(&tcpConn.closeFlag,1) } } @@ -76,12 +77,12 @@ func (tcpConn *TCPConn) Destroy() { func (tcpConn *TCPConn) Close() { tcpConn.Lock() defer tcpConn.Unlock() - if tcpConn.closeFlag { + if atomic.LoadInt32(&tcpConn.closeFlag)==1 { return } tcpConn.doWrite(nil) - tcpConn.closeFlag = true + atomic.StoreInt32(&tcpConn.closeFlag,1) } func (tcpConn *TCPConn) GetRemoteIp() string { @@ -104,7 +105,7 @@ func (tcpConn *TCPConn) doWrite(b []byte) error{ func (tcpConn *TCPConn) Write(b []byte) error{ tcpConn.Lock() defer tcpConn.Unlock() - if tcpConn.closeFlag || b == nil { + if atomic.LoadInt32(&tcpConn.closeFlag)==1 || b == nil { tcpConn.ReleaseReadMsg(b) return errors.New("conn is close") } @@ -133,14 +134,14 @@ func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){ } func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { - if tcpConn.closeFlag == true { + if atomic.LoadInt32(&tcpConn.closeFlag) == 1 { return errors.New("conn is close") } return tcpConn.msgParser.Write(tcpConn, args...) } func (tcpConn *TCPConn) WriteRawMsg(args []byte) error { - if tcpConn.closeFlag == true { + if atomic.LoadInt32(&tcpConn.closeFlag) == 1 { return errors.New("conn is close") } @@ -149,7 +150,7 @@ func (tcpConn *TCPConn) WriteRawMsg(args []byte) error { func (tcpConn *TCPConn) IsConnected() bool { - return tcpConn.closeFlag == false + return atomic.LoadInt32(&tcpConn.closeFlag) == 0 } func (tcpConn *TCPConn) SetReadDeadline(d time.Duration) { diff --git a/profiler/profiler.go b/profiler/profiler.go index 15d847b..e110a8c 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -193,9 +193,11 @@ func Report() { record = prof.record prof.record = list.New() + callNum := prof.callNum + totalCostTime := prof.totalCostTime prof.stackLocker.RUnlock() - DefaultReportFunction(name,prof.callNum,prof.totalCostTime,record) + DefaultReportFunction(name,callNum,totalCostTime,record) } } diff --git a/sysservice/rankservice/MongodbPersist.go b/sysservice/rankservice/MongodbPersist.go index bb62e21..8947c76 100644 --- a/sysservice/rankservice/MongodbPersist.go +++ b/sysservice/rankservice/MongodbPersist.go @@ -71,6 +71,7 @@ func (mp *MongoPersist) OnInit() error { } //开启协程 + mp.waitGroup.Add(1) go mp.persistCoroutine() return nil @@ -261,7 +262,6 @@ func (mp *MongoPersist) JugeTimeoutSave() bool{ } func (mp *MongoPersist) persistCoroutine(){ - mp.waitGroup.Add(1) defer mp.waitGroup.Done() for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){ //间隔时间sleep