优化RPC

This commit is contained in:
orgin
2022-10-27 15:38:28 +08:00
parent 1eab31209c
commit fd364cf579
5 changed files with 34 additions and 24 deletions

View File

@@ -56,7 +56,7 @@ cluster.json如下
"ListenAddr":"127.0.0.1:8001", "ListenAddr":"127.0.0.1:8001",
"MaxRpcParamLen": 409600, "MaxRpcParamLen": 409600,
"NodeName": "Node_Test1", "NodeName": "Node_Test1",
"remark":"//以_打头的表示只在本机进程不对整个子网开", "remark":"//以_打头的表示只在本机进程不对整个子网开",
"ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","_TcpService","HttpService","WSService"] "ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","_TcpService","HttpService","WSService"]
}, },
{ {
@@ -65,7 +65,7 @@ cluster.json如下
"ListenAddr":"127.0.0.1:8002", "ListenAddr":"127.0.0.1:8002",
"MaxRpcParamLen": 409600, "MaxRpcParamLen": 409600,
"NodeName": "Node_Test1", "NodeName": "Node_Test1",
"remark":"//以_打头的表示只在本机进程不对整个子网开", "remark":"//以_打头的表示只在本机进程不对整个子网开",
"ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","TcpService","HttpService","WSService"] "ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","TcpService","HttpService","WSService"]
} }
] ]

View File

@@ -56,11 +56,11 @@ func (client *TCPClient) init() {
} }
if client.ReadDeadline == 0 { if client.ReadDeadline == 0 {
client.ReadDeadline = 15*time.Second client.ReadDeadline = 15*time.Second
log.SRelease("invalid ReadDeadline, reset to ", client.ReadDeadline,"s") log.SRelease("invalid ReadDeadline, reset to ", int64(client.ReadDeadline.Seconds()),"s")
} }
if client.WriteDeadline == 0 { if client.WriteDeadline == 0 {
client.WriteDeadline = 15*time.Second client.WriteDeadline = 15*time.Second
log.SRelease("invalid WriteDeadline, reset to ", client.WriteDeadline,"s") log.SRelease("invalid WriteDeadline, reset to ", int64(client.WriteDeadline.Seconds()),"s")
} }
if client.NewAgent == nil { if client.NewAgent == nil {
log.SFatal("NewAgent must not be nil") log.SFatal("NewAgent must not be nil")
@@ -79,6 +79,13 @@ func (client *TCPClient) init() {
client.msgParser = msgParser client.msgParser = msgParser
} }
func (client *TCPClient) GetCloseFlag() bool{
client.Lock()
defer client.Unlock()
return client.closeFlag
}
func (client *TCPClient) dial() net.Conn { func (client *TCPClient) dial() net.Conn {
for { for {
conn, err := net.Dial("tcp", client.Addr) conn, err := net.Dial("tcp", client.Addr)

View File

@@ -7,8 +7,8 @@ import (
"time" "time"
) )
const Default_ReadDeadline = 30 //30s const Default_ReadDeadline = time.Second*30 //30s
const Default_WriteDeadline = 30 //30s const Default_WriteDeadline = time.Second*30 //30s
const Default_MaxConnNum = 3000 const Default_MaxConnNum = 3000
const Default_PendingWriteNum = 10000 const Default_PendingWriteNum = 10000
const Default_LittleEndian = false const Default_LittleEndian = false
@@ -70,11 +70,11 @@ func (server *TCPServer) init() {
} }
if server.WriteDeadline == 0 { if server.WriteDeadline == 0 {
server.WriteDeadline = time.Second*Default_WriteDeadline server.WriteDeadline = Default_WriteDeadline
log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s") log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s")
} }
if server.ReadDeadline == 0 { if server.ReadDeadline == 0 {
server.ReadDeadline = time.Second*Default_ReadDeadline server.ReadDeadline = Default_ReadDeadline
log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s") log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s")
} }

View File

@@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/util/timer"
"math" "math"
"reflect" "reflect"
"runtime" "runtime"
@@ -32,6 +31,9 @@ type Client struct {
TriggerRpcEvent TriggerRpcEvent
} }
const MaxCheckCallRpcCount = 1000
const MaxPendingWriteNum = 200000
const ConnectInterval = 2*time.Second
var clientSeq uint32 var clientSeq uint32
func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent { func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
@@ -41,18 +43,23 @@ func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
return client return client
} }
func (client *Client) Connect(id int, addr string, maxRpcParamLen uint32) error { func (client *Client) Connect(id int, addr string, maxRpcParamLen uint32) error {
client.clientSeq = atomic.AddUint32(&clientSeq, 1) client.clientSeq = atomic.AddUint32(&clientSeq, 1)
client.id = id client.id = id
client.Addr = addr client.Addr = addr
client.maxCheckCallRpcCount = 1000 client.maxCheckCallRpcCount = MaxCheckCallRpcCount
client.callRpcTimeout = 15 * time.Second client.callRpcTimeout = 15 * time.Second
client.ConnNum = 1 client.ConnectInterval = ConnectInterval
client.ConnectInterval = time.Second * 2 client.PendingWriteNum = MaxPendingWriteNum
client.PendingWriteNum = 200000
client.AutoReconnect = true client.AutoReconnect = true
client.ConnNum = 1
client.LenMsgLen = 4 client.LenMsgLen = 4
client.MinMsgLen = 2 client.MinMsgLen = 2
client.ReadDeadline = Default_ReadWriteDeadline
client.WriteDeadline = Default_ReadWriteDeadline
if maxRpcParamLen > 0 { if maxRpcParamLen > 0 {
client.MaxMsgLen = maxRpcParamLen client.MaxMsgLen = maxRpcParamLen
} else { } else {
@@ -73,17 +80,13 @@ func (client *Client) Connect(id int, addr string, maxRpcParamLen uint32) error
} }
func (client *Client) startCheckRpcCallTimer() { func (client *Client) startCheckRpcCallTimer() {
t := timer.NewTimer(5 * time.Second)
for { for {
select { time.Sleep(5 * time.Second)
case cTimer := <-t.C: if client.GetCloseFlag() == true {
cTimer.SetupTimer(time.Now()) break
client.checkRpcCallTimeout()
} }
client.checkRpcCallTimeout()
} }
t.Cancel()
timer.ReleaseTimer(t)
} }
func (client *Client) makeCallFail(call *Call) { func (client *Client) makeCallFail(call *Call) {

View File

@@ -63,7 +63,7 @@ func (server *Server) Init(rpcHandleFinder RpcHandleFinder) {
server.rpcServer = &network.TCPServer{} server.rpcServer = &network.TCPServer{}
} }
const Default_ReadWriteDeadline = 10*time.Second const Default_ReadWriteDeadline = 15*time.Second
func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) { func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
splitAddr := strings.Split(listenAddr, ":") splitAddr := strings.Split(listenAddr, ":")
@@ -84,8 +84,8 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
server.rpcServer.PendingWriteNum = 2000000 server.rpcServer.PendingWriteNum = 2000000
server.rpcServer.NewAgent = server.NewAgent server.rpcServer.NewAgent = server.NewAgent
server.rpcServer.LittleEndian = LittleEndian server.rpcServer.LittleEndian = LittleEndian
server.rpcServer.WriteDeadline = network.Default_WriteDeadline server.rpcServer.WriteDeadline = Default_ReadWriteDeadline
server.rpcServer.ReadDeadline = network.Default_WriteDeadline server.rpcServer.ReadDeadline = Default_ReadWriteDeadline
server.rpcServer.Start() server.rpcServer.Start()
} }