增加关闭socket function

This commit is contained in:
duanhf2012
2020-03-31 14:39:30 +08:00
parent f410afbd58
commit 5ae1e683c9
2 changed files with 55 additions and 25 deletions

View File

@@ -222,42 +222,52 @@ func (slf *RpcHandler) CallMethod(ServiceMethod string,param interface{},reply i
return err return err
} }
func (slf *RpcHandler) goRpc(nodeId int,serviceMethod string,mutiCoroutine bool,args interface{}) error { func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,mutiCoroutine bool,args interface{}) error {
pClientList,err := slf.funcRpcClient(nodeId,serviceMethod) pClientList,err := slf.funcRpcClient(nodeId,serviceMethod)
if err != nil { if err != nil {
log.Error("Call serviceMethod is error:%+v!",err) log.Error("Call serviceMethod is error:%+v!",err)
return err return err
} }
if len(pClientList) > 1 { if len(pClientList) > 1 && bCast == false{
log.Error("Cannot call more then 1 node!") log.Error("Cannot call more then 1 node!")
return fmt.Errorf("Cannot call more then 1 node!") return fmt.Errorf("Cannot call more then 1 node!")
} }
//2.rpcclient调用 //2.rpcclient调用
//如果调用本结点服务 //如果调用本结点服务
pClient := pClientList[0] for _,pClient := range pClientList {
if pClient.blocalhost == true { if pClient.blocalhost == true {
pLocalRpcServer:=slf.funcRpcServer() pLocalRpcServer:=slf.funcRpcServer()
//判断是否是同一服务 //判断是否是同一服务
sMethod := strings.Split(serviceMethod,".") sMethod := strings.Split(serviceMethod,".")
if len(sMethod)!=2 { if len(sMethod)!=2 {
err := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) serr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod)
log.Error("%+v",err) log.Error("%+v",serr)
return err if serr!= nil {
err = serr
}
continue
}
//调用自己rpcHandler处理器
if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用
//
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.rpcHandlerGo(true,mutiCoroutine,sMethod[0],sMethod[1],args,nil)
if pCall.Err!=nil {
err = pCall.Err
}
} }
//调用自己rpcHandler处理器
if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用 //跨node调用
// pCall := pClient.Go(true,mutiCoroutine,serviceMethod,args,nil)
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) if pCall.Err!=nil {
err = pCall.Err
} }
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.rpcHandlerGo(true,mutiCoroutine,sMethod[0],sMethod[1],args,nil)
return pCall.Err
} }
//跨node调用 return err
pCall := pClient.Go(true,mutiCoroutine,serviceMethod,args,nil)
return pCall.Err
} }
func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,mutiCoroutine bool,args interface{},reply interface{}) error { func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,mutiCoroutine bool,args interface{},reply interface{}) error {
@@ -379,11 +389,11 @@ func (slf *RpcHandler) GRCall(serviceMethod string,args interface{},reply interf
} }
func (slf *RpcHandler) Go(serviceMethod string,args interface{}) error { func (slf *RpcHandler) Go(serviceMethod string,args interface{}) error {
return slf.goRpc(0,serviceMethod,false,args) return slf.goRpc(false,0,serviceMethod,false,args)
} }
func (slf *RpcHandler) GRGo(serviceMethod string,args interface{}) error { func (slf *RpcHandler) GRGo(serviceMethod string,args interface{}) error {
return slf.goRpc(0,serviceMethod,true,args) return slf.goRpc(false,0,serviceMethod,true,args)
} }
@@ -397,6 +407,9 @@ func (slf *RpcHandler) CallNode(nodeId int,serviceMethod string,args interface{}
} }
func (slf *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface{}) error { func (slf *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface{}) error {
return slf.goRpc(nodeId,serviceMethod,false,args) return slf.goRpc(false,nodeId,serviceMethod,false,args)
} }
func (slf *RpcHandler) CastGo(serviceMethod string,args interface{}) {
slf.goRpc(true,0,serviceMethod,false,args)
}

View File

@@ -152,4 +152,21 @@ func (slf *TcpService) SendMsg(clientid uint64,msg interface{}) error{
return err return err
} }
return client.tcpConn.WriteMsg(bytes) return client.tcpConn.WriteMsg(bytes)
} }
func (slf *TcpService) Close(clientid uint64) {
//
slf.mapClientLocker.Lock()
defer slf.mapClientLocker.Unlock()
client,ok := slf.mapClient[clientid]
if ok == false{
return
}
if client.tcpConn!=nil {
client.tcpConn.Close()
}
return
}