mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
1.优化网络库
2.rpc协议在宕机时不会导致整个rpc无法响应
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
type TcpSocketClient struct {
|
||||
conn net.Conn
|
||||
addr string
|
||||
}
|
||||
|
||||
func (slf *TcpSocketClient) Connect(addr string) error{
|
||||
@@ -22,13 +23,17 @@ func (slf *TcpSocketClient) Connect(addr string) error{
|
||||
return err
|
||||
}
|
||||
slf.conn = conn
|
||||
|
||||
slf.addr = addr
|
||||
|
||||
//
|
||||
return nil
|
||||
}
|
||||
|
||||
func (slf *TcpSocketClient) SendMsg(packtype uint16,message proto.Message) error{
|
||||
if slf.conn == nil {
|
||||
return fmt.Errorf("cannt connect %s",slf.addr)
|
||||
}
|
||||
|
||||
var msg MsgBasePack
|
||||
data,err := proto.Marshal(message)
|
||||
if err != nil {
|
||||
|
||||
@@ -247,7 +247,12 @@ func (slf *SClient) onsend(){
|
||||
}
|
||||
|
||||
pPackData := pack.(*MsgBasePack)
|
||||
slf.conn.Write(pPackData.Bytes())
|
||||
_,e := slf.conn.Write(pPackData.Bytes())
|
||||
if e!=nil {
|
||||
service.GetLogger().Printf(service.LEVER_DEBUG, "clent id %d write error...",slf.id)
|
||||
return
|
||||
}
|
||||
//fmt.Print("xxxxxxxxxxxxxxx:",n,e)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -143,7 +143,7 @@ func NewOriginNode() *COriginNode {
|
||||
|
||||
//安装系统服务
|
||||
syslogservice := &sysservice.LogService{}
|
||||
syslogservice.InitLog("syslog", fmt.Sprintf("syslog_%d", CurrentNodeId), sysmodule.LEVER_INFO)
|
||||
syslogservice.InitLog("syslog", fmt.Sprintf("syslog_%d", CurrentNodeId), sysmodule.LEVER_DEBUG)
|
||||
service.InstanceServiceMgr().Setup(syslogservice)
|
||||
|
||||
//初始化集群对象
|
||||
|
||||
@@ -324,7 +324,7 @@ func (server *Server) register(rcvr interface{}, name string, prefix string, use
|
||||
_, ok := server.mapCallQueue[sname]
|
||||
if ok == false {
|
||||
server.mapCallQueue[sname] = make(chan *CQueueRpcData, 10240)
|
||||
util.Go(server.ProcessQueue, sname)
|
||||
util.GoRecover(server.ProcessQueue,-1, sname)
|
||||
}
|
||||
if useName {
|
||||
sname = name
|
||||
@@ -612,7 +612,8 @@ func (server *Server) ServeCodec(codec ServerCodec) {
|
||||
wg.Add(1)
|
||||
//queueMode
|
||||
//fmt.Print(queueMode)
|
||||
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
|
||||
util.Go(service.call,server, sending, wg, mtype, req, argv, replyv, codec)
|
||||
//go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
|
||||
}
|
||||
// We've seen that there are no more requests.
|
||||
// Wait for responses to be sent before closing codec.
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
func F(callback interface{}, args ...interface{}) {
|
||||
func F(callback interface{},recoverNum int, args ...interface{}) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
var coreInfo string
|
||||
@@ -17,6 +17,11 @@ func F(callback interface{}, args ...interface{}) {
|
||||
} else {
|
||||
fmt.Print(coreInfo)
|
||||
}
|
||||
|
||||
if recoverNum==-1 ||recoverNum-1 >= 0 {
|
||||
recoverNum -= 1
|
||||
go F(callback,recoverNum, args...)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -33,5 +38,10 @@ func F(callback interface{}, args ...interface{}) {
|
||||
}
|
||||
|
||||
func Go(callback interface{}, args ...interface{}) {
|
||||
go F(callback, args...)
|
||||
go F(callback,0, args...)
|
||||
}
|
||||
|
||||
//-1表示一直恢复
|
||||
func GoRecover(callback interface{},recoverNum int, args ...interface{}) {
|
||||
go F(callback,recoverNum, args...)
|
||||
}
|
||||
Reference in New Issue
Block a user