diff --git a/cluster/cluster.go b/cluster/cluster.go index c5d28bc..cfe6527 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -14,51 +14,54 @@ var configDir = "./config/" type SetupServiceFun func(s ...service.IService) type NodeStatus int -const( - Normal NodeStatus = 0 //正常 - Discard NodeStatus = 1 //丢弃 + +const ( + Normal NodeStatus = 0 //正常 + Discard NodeStatus = 1 //丢弃 ) type NodeInfo struct { - NodeId int - NodeName string - Private bool - ListenAddr string - ServiceList []string //所有的服务列表 + NodeId int + NodeName string + Private bool + ListenAddr string + MaxRpcParamLen uint32 //最大Rpc参数长度 + ServiceList []string //所有的服务列表 PublicServiceList []string //对外公开的服务列表 - DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选 - NeighborService []string - status NodeStatus + DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选 + NeighborService []string + status NodeStatus } type NodeRpcInfo struct { nodeInfo NodeInfo - client *rpc.Client + client *rpc.Client } var cluster Cluster + type Cluster struct { localNodeInfo NodeInfo //本结点配置信息 masterDiscoveryNodeList []NodeInfo //配置发现Master结点 - localServiceCfg map[string]interface{} //map[serviceName]配置数据* - mapRpc map[int] NodeRpcInfo //nodeId - serviceDiscovery IServiceDiscovery //服务发现接口 + localServiceCfg map[string]interface{} //map[serviceName]配置数据* + mapRpc map[int]NodeRpcInfo //nodeId + serviceDiscovery IServiceDiscovery //服务发现接口 - locker sync.RWMutex //结点与服务关系保护锁 - mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo - mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId] + locker sync.RWMutex //结点与服务关系保护锁 + mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo + mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId] - rpcServer rpc.Server - rpcEventLocker sync.RWMutex //Rpc事件监听保护锁 + rpcServer rpc.Server + rpcEventLocker sync.RWMutex //Rpc事件监听保护锁 mapServiceListenRpcEvent map[string]struct{} //ServiceName } -func GetCluster() *Cluster{ +func GetCluster() *Cluster { return &cluster } -func SetConfigDir(cfgDir string){ +func SetConfigDir(cfgDir string) { configDir = cfgDir } @@ -67,37 +70,37 @@ func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) { } func (cls *Cluster) Start() { - cls.rpcServer.Start(cls.localNodeInfo.ListenAddr) + cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen) } func (cls *Cluster) Stop() { cls.serviceDiscovery.OnNodeStop() } -func (cls *Cluster) DiscardNode(nodeId int){ +func (cls *Cluster) DiscardNode(nodeId int) { cls.locker.Lock() - nodeInfo,ok := cls.mapIdNode[nodeId] + nodeInfo, ok := cls.mapIdNode[nodeId] cls.locker.Unlock() - if ok==true && nodeInfo.status == Discard { - cls.DelNode(nodeId,true) + if ok == true && nodeInfo.status == Discard { + cls.DelNode(nodeId, true) } } -func (cls *Cluster) DelNode(nodeId int,immediately bool){ +func (cls *Cluster) DelNode(nodeId int, immediately bool) { //MasterDiscover结点与本地结点不删除 - if cls.GetMasterDiscoveryNodeInfo(nodeId)!=nil || nodeId == cls.localNodeInfo.NodeId { + if cls.GetMasterDiscoveryNodeInfo(nodeId) != nil || nodeId == cls.localNodeInfo.NodeId { return } cls.locker.Lock() - nodeInfo,ok := cls.mapIdNode[nodeId] + nodeInfo, ok := cls.mapIdNode[nodeId] if ok == false { cls.locker.Unlock() return } - rpc,ok := cls.mapRpc[nodeId] - for{ + rpc, ok := cls.mapRpc[nodeId] + for { //立即删除 if immediately || ok == false { break @@ -109,49 +112,48 @@ func (cls *Cluster) DelNode(nodeId int,immediately bool){ nodeInfo.status = Discard rpc.client.Unlock() cls.locker.Unlock() - log.SRelease("Discard node ",nodeInfo.NodeId," ",nodeInfo.ListenAddr) + log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr) return } rpc.client.Unlock() break } - for _,serviceName := range nodeInfo.ServiceList{ - cls.delServiceNode(serviceName,nodeId) + for _, serviceName := range nodeInfo.ServiceList { + cls.delServiceNode(serviceName, nodeId) } - delete(cls.mapIdNode,nodeId) - delete(cls.mapRpc,nodeId) + delete(cls.mapIdNode, nodeId) + delete(cls.mapRpc, nodeId) cls.locker.Unlock() if ok == true { rpc.client.Close(false) } - log.SRelease("remove node ",nodeInfo.NodeId," ",nodeInfo.ListenAddr) + log.SRelease("remove node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr) } -func (cls *Cluster) serviceDiscoveryDelNode (nodeId int,immediately bool){ +func (cls *Cluster) serviceDiscoveryDelNode(nodeId int, immediately bool) { if nodeId == 0 { return } - cls.DelNode(nodeId,immediately) + cls.DelNode(nodeId, immediately) } -func (cls *Cluster) delServiceNode(serviceName string,nodeId int){ - if nodeId == cls.localNodeInfo.NodeId{ +func (cls *Cluster) delServiceNode(serviceName string, nodeId int) { + if nodeId == cls.localNodeInfo.NodeId { return } mapNode := cls.mapServiceNode[serviceName] - delete(mapNode,nodeId) - if len(mapNode)==0 { - delete(cls.mapServiceNode,serviceName) + delete(mapNode, nodeId) + if len(mapNode) == 0 { + delete(cls.mapServiceNode, serviceName) } } - -func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ +func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { //本地结点不加入 if nodeInfo.NodeId == cls.localNodeInfo.NodeId { return @@ -161,55 +163,54 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ defer cls.locker.Unlock() //先清一次的NodeId对应的所有服务清理 - lastNodeInfo,ok := cls.mapIdNode[nodeInfo.NodeId] - if ok == true{ - for _,serviceName := range lastNodeInfo.ServiceList{ - cls.delServiceNode(serviceName,nodeInfo.NodeId) + lastNodeInfo, ok := cls.mapIdNode[nodeInfo.NodeId] + if ok == true { + for _, serviceName := range lastNodeInfo.ServiceList { + cls.delServiceNode(serviceName, nodeInfo.NodeId) } } //再重新组装 mapDuplicate := map[string]interface{}{} //预防重复数据 - for _,serviceName := range nodeInfo.PublicServiceList { - if _,ok := mapDuplicate[serviceName];ok == true { + for _, serviceName := range nodeInfo.PublicServiceList { + if _, ok := mapDuplicate[serviceName]; ok == true { //存在重复 log.SError("Bad duplicate Service Cfg.") continue } mapDuplicate[serviceName] = nil - if _,ok:=cls.mapServiceNode[serviceName];ok==false { - cls.mapServiceNode[serviceName] = make(map[int]struct{},1) + if _, ok := cls.mapServiceNode[serviceName]; ok == false { + cls.mapServiceNode[serviceName] = make(map[int]struct{}, 1) } cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{} } cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo - log.SRelease("Discovery nodeId: ",nodeInfo.NodeId," services:",nodeInfo.PublicServiceList) - + log.SRelease("Discovery nodeId: ", nodeInfo.NodeId, " services:", nodeInfo.PublicServiceList) + //已经存在连接,则不需要进行设置 - if _,rpcInfoOK := cls.mapRpc[nodeInfo.NodeId];rpcInfoOK == true { + if _, rpcInfoOK := cls.mapRpc[nodeInfo.NodeId]; rpcInfoOK == true { return } rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = *nodeInfo rpcInfo.client = &rpc.Client{} rpcInfo.client.TriggerRpcEvent = cls.triggerRpcEvent - rpcInfo.client.Connect(nodeInfo.NodeId,nodeInfo.ListenAddr) + rpcInfo.client.Connect(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen) cls.mapRpc[nodeInfo.NodeId] = rpcInfo - } -func (cls *Cluster) buildLocalRpc(){ +func (cls *Cluster) buildLocalRpc() { rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = cls.localNodeInfo rpcInfo.client = &rpc.Client{} - rpcInfo.client.Connect(rpcInfo.nodeInfo.NodeId,"") + rpcInfo.client.Connect(rpcInfo.nodeInfo.NodeId, "", 0) cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo } -func (cls *Cluster) Init(localNodeId int,setupServiceFun SetupServiceFun) error{ +func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error { //1.初始化配置 err := cls.InitCfg(localNodeId) if err != nil { @@ -220,10 +221,10 @@ func (cls *Cluster) Init(localNodeId int,setupServiceFun SetupServiceFun) error{ cls.buildLocalRpc() //2.安装服务发现结点 - cls.SetupServiceDiscovery(localNodeId,setupServiceFun) + cls.SetupServiceDiscovery(localNodeId, setupServiceFun) service.RegRpcEventFun = cls.RegRpcEvent - err = cls.serviceDiscovery.InitDiscovery(localNodeId,cls.serviceDiscoveryDelNode,cls.serviceDiscoverySetNodeInfo) + err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo) if err != nil { return err } @@ -231,12 +232,12 @@ func (cls *Cluster) Init(localNodeId int,setupServiceFun SetupServiceFun) error{ return nil } -func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool,bool){ - var localMaster bool //本结点是否为Master结点 - var hasMaster bool //是否配置Master服务 +func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool, bool) { + var localMaster bool //本结点是否为Master结点 + var hasMaster bool //是否配置Master服务 //遍历所有结点 - for _,nodeInfo := range cls.masterDiscoveryNodeList{ + for _, nodeInfo := range cls.masterDiscoveryNodeList { if nodeInfo.NodeId == localNodeId { localMaster = true } @@ -244,27 +245,27 @@ func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool,bool){ } //返回查询结果 - return localMaster,hasMaster + return localMaster, hasMaster } -func (cls *Cluster) appendService(serviceName string,bPublicService bool){ - cls.localNodeInfo.ServiceList = append(cls.localNodeInfo.ServiceList,serviceName) +func (cls *Cluster) appendService(serviceName string, bPublicService bool) { + cls.localNodeInfo.ServiceList = append(cls.localNodeInfo.ServiceList, serviceName) if bPublicService { - cls.localNodeInfo.PublicServiceList = append(cls.localNodeInfo.PublicServiceList,serviceName) + cls.localNodeInfo.PublicServiceList = append(cls.localNodeInfo.PublicServiceList, serviceName) } - if _,ok:=cls.mapServiceNode[serviceName];ok==false { + if _, ok := cls.mapServiceNode[serviceName]; ok == false { cls.mapServiceNode[serviceName] = map[int]struct{}{} } - cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId]= struct{}{} + cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{} } -func (cls *Cluster) GetDiscoveryNodeList() []NodeInfo{ +func (cls *Cluster) GetDiscoveryNodeList() []NodeInfo { return cls.masterDiscoveryNodeList } -func (cls *Cluster) GetMasterDiscoveryNodeInfo(nodeId int) *NodeInfo{ - for i:=0;i0 { +func GetRpcClient(nodeId int, serviceMethod string, clientList []*rpc.Client) (error, int) { + if nodeId > 0 { pClient := GetCluster().GetRpcClient(nodeId) - if pClient==nil { - return fmt.Errorf("cannot find nodeid %d!",nodeId),0 + if pClient == nil { + return fmt.Errorf("cannot find nodeid %d!", nodeId), 0 } clientList[0] = pClient - return nil,1 + return nil, 1 } - - findIndex := strings.Index(serviceMethod,".") - if findIndex==-1 { - return fmt.Errorf("servicemethod param %s is error!",serviceMethod),0 + findIndex := strings.Index(serviceMethod, ".") + if findIndex == -1 { + return fmt.Errorf("servicemethod param %s is error!", serviceMethod), 0 } serviceName := serviceMethod[:findIndex] //1.找到对应的rpcNodeid - return GetCluster().GetNodeIdByService(serviceName,clientList,true) + return GetCluster().GetNodeIdByService(serviceName, clientList, true) } -func GetRpcServer() *rpc.Server{ +func GetRpcServer() *rpc.Server { return &cluster.rpcServer } -func (cls *Cluster) IsNodeConnected (nodeId int) bool { +func (cls *Cluster) IsNodeConnected(nodeId int) bool { pClient := cls.GetRpcClient(nodeId) - return pClient!=nil && pClient.IsConnected() + return pClient != nil && pClient.IsConnected() } -func (cls *Cluster) triggerRpcEvent (bConnect bool,clientSeq uint32,nodeId int) { +func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int) { cls.locker.Lock() - nodeInfo,ok := cls.mapRpc[nodeId] - if ok == false || nodeInfo.client==nil || nodeInfo.client.GetClientSeq()!=clientSeq { + nodeInfo, ok := cls.mapRpc[nodeId] + if ok == false || nodeInfo.client == nil || nodeInfo.client.GetClientSeq() != clientSeq { cls.locker.Unlock() return } cls.locker.Unlock() cls.rpcEventLocker.Lock() - for serviceName,_:= range cls.mapServiceListenRpcEvent{ + for serviceName, _ := range cls.mapServiceListenRpcEvent { ser := service.GetService(serviceName) if ser == nil { - log.SError("cannot find service name ",serviceName) + log.SError("cannot find service name ", serviceName) continue } @@ -382,7 +382,7 @@ func (cls *Cluster) GetLocalNodeInfo() *NodeInfo { return &cls.localNodeInfo } -func (cls *Cluster) RegRpcEvent(serviceName string){ +func (cls *Cluster) RegRpcEvent(serviceName string) { cls.rpcEventLocker.Lock() if cls.mapServiceListenRpcEvent == nil { cls.mapServiceListenRpcEvent = map[string]struct{}{} @@ -392,27 +392,27 @@ func (cls *Cluster) RegRpcEvent(serviceName string){ cls.rpcEventLocker.Unlock() } -func (cls *Cluster) UnRegRpcEvent(serviceName string){ +func (cls *Cluster) UnRegRpcEvent(serviceName string) { cls.rpcEventLocker.Lock() - delete(cls.mapServiceListenRpcEvent,serviceName) + delete(cls.mapServiceListenRpcEvent, serviceName) cls.rpcEventLocker.Unlock() } -func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)){ +func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)) { cls.locker.Lock() - for nodeId,_:= range cls.mapIdNode { + for nodeId, _ := range cls.mapIdNode { fetchFun(nodeId) } cls.locker.Unlock() } -func HasService(nodeId int,serviceName string) bool{ +func HasService(nodeId int, serviceName string) bool { cluster.locker.RLock() defer cluster.locker.RUnlock() - mapNode,_ := cluster.mapServiceNode[serviceName] - if mapNode!=nil { - _,ok := mapNode[nodeId] + mapNode, _ := cluster.mapServiceNode[serviceName] + if mapNode != nil { + _, ok := mapNode[nodeId] return ok } diff --git a/cluster/dynamicdiscovery.go b/cluster/dynamicdiscovery.go index 222b38e..4f984a6 100644 --- a/cluster/dynamicdiscovery.go +++ b/cluster/dynamicdiscovery.go @@ -78,7 +78,8 @@ func (ds *DynamicDiscoveryMaster) OnStart() { nodeInfo.NodeName = localNodeInfo.NodeName nodeInfo.ListenAddr = localNodeInfo.ListenAddr nodeInfo.PublicServiceList = localNodeInfo.PublicServiceList - + nodeInfo.MaxRpcParamLen = localNodeInfo.MaxRpcParamLen + ds.addNodeInfo(&nodeInfo) } @@ -144,7 +145,7 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove nodeInfo.ServiceList = req.NodeInfo.PublicServiceList nodeInfo.PublicServiceList = req.NodeInfo.PublicServiceList nodeInfo.ListenAddr = req.NodeInfo.ListenAddr - + nodeInfo.MaxRpcParamLen = req.NodeInfo.MaxRpcParamLen //主动删除已经存在的结点,确保先断开,再连接 cluster.serviceDiscoveryDelNode(nodeInfo.NodeId, true) @@ -264,6 +265,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco nInfo.NodeId = nodeInfo.NodeId nInfo.NodeName = nodeInfo.NodeName nInfo.ListenAddr = nodeInfo.ListenAddr + nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen mapNodeInfo[nodeInfo.NodeId] = nInfo } @@ -324,6 +326,7 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { req.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId) req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr + req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen //MasterDiscoveryNode配置中没有配置NeighborService,则同步当前结点所有服务 if len(nodeInfo.NeighborService) == 0 { @@ -335,12 +338,12 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { //向Master服务同步本Node服务信息 err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) { if err != nil { - log.SError("call ",RegServiceDiscover," is fail :", err.Error()) + log.SError("call ", RegServiceDiscover, " is fail :", err.Error()) return } }) if err != nil { - log.SError("call ",RegServiceDiscover," is fail :", err.Error()) + log.SError("call ", RegServiceDiscover, " is fail :", err.Error()) } } @@ -373,6 +376,7 @@ func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo) { nInfo.NodeId = int(nodeInfo.NodeId) nInfo.NodeName = nodeInfo.NodeName nInfo.ListenAddr = nodeInfo.ListenAddr + nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen dc.funSetService(&nInfo) } diff --git a/rpc/client.go b/rpc/client.go index bffbe23..98357f2 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -18,7 +18,7 @@ import ( type Client struct { clientSeq uint32 - id int + id int bSelfNode bool network.TCPClient conn *network.TCPConn @@ -41,19 +41,24 @@ func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent { return client } -func (client *Client) Connect(id int,addr string) error { - client.clientSeq = atomic.AddUint32(&clientSeq,1) +func (client *Client) Connect(id int, addr string, maxRpcParamLen uint32) error { + client.clientSeq = atomic.AddUint32(&clientSeq, 1) client.id = id client.Addr = addr client.maxCheckCallRpcCount = 1000 - client.callRpcTimeout = 15*time.Second + client.callRpcTimeout = 15 * time.Second client.ConnNum = 1 - client.ConnectInterval = time.Second*2 + client.ConnectInterval = time.Second * 2 client.PendingWriteNum = 200000 client.AutoReconnect = true - client.LenMsgLen = 2 + client.LenMsgLen = 4 client.MinMsgLen = 2 - client.MaxMsgLen = math.MaxUint16 + if maxRpcParamLen > 0 { + client.MaxMsgLen = maxRpcParamLen + } else { + client.MaxMsgLen = math.MaxUint32 + } + client.NewAgent = client.NewClientAgent client.LittleEndian = LittleEndian client.ResetPending() @@ -67,13 +72,13 @@ func (client *Client) Connect(id int,addr string) error { return nil } -func (client *Client) startCheckRpcCallTimer(){ - t:=timer.NewTimer(5*time.Second) - for{ +func (client *Client) startCheckRpcCallTimer() { + t := timer.NewTimer(5 * time.Second) + for { select { - case cTimer:=<- t.C: - cTimer.SetupTimer(time.Now()) - client.checkRpcCallTimeout() + case cTimer := <-t.C: + cTimer.SetupTimer(time.Now()) + client.checkRpcCallTimeout() } } @@ -81,19 +86,19 @@ func (client *Client) startCheckRpcCallTimer(){ timer.ReleaseTimer(t) } -func (client *Client) makeCallFail(call *Call){ +func (client *Client) makeCallFail(call *Call) { client.removePending(call.Seq) - if call.callback!=nil && call.callback.IsValid() { + if call.callback != nil && call.callback.IsValid() { call.rpcHandler.PushRpcResponse(call) - }else{ + } else { call.done <- call } } -func (client *Client) checkRpcCallTimeout(){ +func (client *Client) checkRpcCallTimeout() { now := time.Now() - for i:=0;i< client.maxCheckCallRpcCount;i++ { + for i := 0; i < client.maxCheckCallRpcCount; i++ { client.pendingLock.Lock() pElem := client.pendingTimer.Front() if pElem == nil { @@ -103,7 +108,7 @@ func (client *Client) checkRpcCallTimeout(){ pCall := pElem.Value.(*Call) if now.Sub(pCall.callTime) > client.callRpcTimeout { strTimeout := strconv.FormatInt(int64(client.callRpcTimeout/time.Second), 10) - pCall.Err = errors.New("RPC call takes more than "+strTimeout+ " seconds") + pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds") client.makeCallFail(pCall) client.pendingLock.Unlock() continue @@ -112,21 +117,21 @@ func (client *Client) checkRpcCallTimeout(){ } } -func (client *Client) ResetPending(){ +func (client *Client) ResetPending() { client.pendingLock.Lock() if client.pending != nil { - for _,v := range client.pending { + for _, v := range client.pending { v.Value.(*Call).Err = errors.New("node is disconnect") v.Value.(*Call).done <- v.Value.(*Call) } } - client.pending = make(map[uint64]*list.Element,4096) + client.pending = make(map[uint64]*list.Element, 4096) client.pendingTimer = list.New() client.pendingLock.Unlock() } -func (client *Client) AddPending(call *Call){ +func (client *Client) AddPending(call *Call) { client.pendingLock.Lock() call.callTime = time.Now() elemTimer := client.pendingTimer.PushBack(call) @@ -134,7 +139,7 @@ func (client *Client) AddPending(call *Call){ client.pendingLock.Unlock() } -func (client *Client) RemovePending(seq uint64) *Call{ +func (client *Client) RemovePending(seq uint64) *Call { if seq == 0 { return nil } @@ -144,20 +149,20 @@ func (client *Client) RemovePending(seq uint64) *Call{ return call } -func (client *Client) removePending(seq uint64) *Call{ - v,ok := client.pending[seq] - if ok == false{ +func (client *Client) removePending(seq uint64) *Call { + v, ok := client.pending[seq] + if ok == false { return nil } call := v.Value.(*Call) client.pendingTimer.Remove(v) - delete(client.pending,seq) + delete(client.pending, seq) return call } -func (client *Client) FindPending(seq uint64) *Call{ +func (client *Client) FindPending(seq uint64) *Call { client.pendingLock.Lock() - v,ok := client.pending[seq] + v, ok := client.pending[seq] if ok == false { client.pendingLock.Unlock() return nil @@ -169,27 +174,27 @@ func (client *Client) FindPending(seq uint64) *Call{ return pCall } -func (client *Client) generateSeq() uint64{ - return atomic.AddUint64(&client.startSeq,1) +func (client *Client) generateSeq() uint64 { + return atomic.AddUint64(&client.startSeq, 1) } -func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error { +func (client *Client) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error { processorType, processor := GetProcessorType(args) - InParam,herr := processor.Marshal(args) + InParam, herr := processor.Marshal(args) if herr != nil { return herr } seq := client.generateSeq() - request:=MakeRpcRequest(processor,seq,0,serviceMethod,false,InParam) - bytes,err := processor.Marshal(request.RpcRequestData) + request := MakeRpcRequest(processor, seq, 0, serviceMethod, false, InParam) + bytes, err := processor.Marshal(request.RpcRequestData) ReleaseRpcRequest(request) if err != nil { return err } if client.conn == nil { - return errors.New("Rpc server is disconnect,call "+serviceMethod) + return errors.New("Rpc server is disconnect,call " + serviceMethod) } call := MakeCall() @@ -200,7 +205,7 @@ func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,call call.Seq = seq client.AddPending(call) - err = client.conn.WriteMsg([]byte{uint8(processorType)},bytes) + err = client.conn.WriteMsg([]byte{uint8(processorType)}, bytes) if err != nil { client.RemovePending(call.Seq) ReleaseCall(call) @@ -210,14 +215,14 @@ func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,call return nil } -func (client *Client) RawGo(processor IRpcProcessor,noReply bool,rpcMethodId uint32,serviceMethod string,args []byte,reply interface{}) *Call { +func (client *Client) RawGo(processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, args []byte, reply interface{}) *Call { call := MakeCall() call.ServiceMethod = serviceMethod call.Reply = reply call.Seq = client.generateSeq() - request := MakeRpcRequest(processor,call.Seq,rpcMethodId,serviceMethod,noReply,args) - bytes,err := processor.Marshal(request.RpcRequestData) + request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, args) + bytes, err := processor.Marshal(request.RpcRequestData) ReleaseRpcRequest(request) if err != nil { call.Seq = 0 @@ -227,7 +232,7 @@ func (client *Client) RawGo(processor IRpcProcessor,noReply bool,rpcMethodId uin if client.conn == nil { call.Seq = 0 - call.Err = errors.New(serviceMethod+" was called failed,rpc client is disconnect") + call.Err = errors.New(serviceMethod + " was called failed,rpc client is disconnect") return call } @@ -235,7 +240,7 @@ func (client *Client) RawGo(processor IRpcProcessor,noReply bool,rpcMethodId uin client.AddPending(call) } - err = client.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())},bytes) + err = client.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) if err != nil { client.RemovePending(call.Seq) call.Seq = 0 @@ -245,75 +250,75 @@ func (client *Client) RawGo(processor IRpcProcessor,noReply bool,rpcMethodId uin return call } -func (client *Client) Go(noReply bool,serviceMethod string, args interface{},reply interface{}) *Call { - _,processor := GetProcessorType(args) - InParam,err := processor.Marshal(args) +func (client *Client) Go(noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { + _, processor := GetProcessorType(args) + InParam, err := processor.Marshal(args) if err != nil { call := MakeCall() call.Err = err return call } - return client.RawGo(processor,noReply,0,serviceMethod,InParam,reply) + return client.RawGo(processor, noReply, 0, serviceMethod, InParam, reply) } -func (client *Client) Run(){ +func (client *Client) Run() { defer func() { if r := recover(); r != nil { buf := make([]byte, 4096) l := runtime.Stack(buf, false) errString := fmt.Sprint(r) - log.SError("core dump info[",errString,"]\n", string(buf[:l])) + log.SError("core dump info[", errString, "]\n", string(buf[:l])) } }() - client.TriggerRpcEvent(true,client.GetClientSeq(),client.GetId()) + client.TriggerRpcEvent(true, client.GetClientSeq(), client.GetId()) for { - bytes,err := client.conn.ReadMsg() + bytes, err := client.conn.ReadMsg() if err != nil { - log.SError("rpcClient ",client.Addr," ReadMsg error:",err.Error()) + log.SError("rpcClient ", client.Addr, " ReadMsg error:", err.Error()) return } processor := GetProcessor(bytes[0]) - if processor==nil { + if processor == nil { client.conn.ReleaseReadMsg(bytes) - log.SError("rpcClient ",client.Addr," ReadMsg head error:",err.Error()) + log.SError("rpcClient ", client.Addr, " ReadMsg head error:", err.Error()) return } //1.解析head response := RpcResponse{} - response.RpcResponseData =processor.MakeRpcResponse(0,"",nil) + response.RpcResponseData = processor.MakeRpcResponse(0, "", nil) err = processor.Unmarshal(bytes[1:], response.RpcResponseData) client.conn.ReleaseReadMsg(bytes) if err != nil { processor.ReleaseRpcResponse(response.RpcResponseData) - log.SError("rpcClient Unmarshal head error:",err.Error()) + log.SError("rpcClient Unmarshal head error:", err.Error()) continue } v := client.RemovePending(response.RpcResponseData.GetSeq()) if v == nil { - log.SError("rpcClient cannot find seq ",response.RpcResponseData.GetSeq()," in pending") - }else { + log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending") + } else { v.Err = nil - if len(response.RpcResponseData.GetReply()) >0 { - err = processor.Unmarshal(response.RpcResponseData.GetReply(),v.Reply) + if len(response.RpcResponseData.GetReply()) > 0 { + err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply) if err != nil { - log.SError("rpcClient Unmarshal body error:",err.Error()) + log.SError("rpcClient Unmarshal body error:", err.Error()) v.Err = err } } if response.RpcResponseData.GetErr() != nil { - v.Err= response.RpcResponseData.GetErr() + v.Err = response.RpcResponseData.GetErr() } - if v.callback!=nil && v.callback.IsValid() { - v.rpcHandler.PushRpcResponse(v) - }else{ + if v.callback != nil && v.callback.IsValid() { + v.rpcHandler.PushRpcResponse(v) + } else { v.done <- v } } @@ -322,19 +327,19 @@ func (client *Client) Run(){ } } -func (client *Client) OnClose(){ - client.TriggerRpcEvent(false,client.GetClientSeq(),client.GetId()) +func (client *Client) OnClose() { + client.TriggerRpcEvent(false, client.GetClientSeq(), client.GetId()) } func (client *Client) IsConnected() bool { - return client.bSelfNode || (client.conn!=nil && client.conn.IsConnected()==true) + return client.bSelfNode || (client.conn != nil && client.conn.IsConnected() == true) } -func (client *Client) GetId() int{ +func (client *Client) GetId() int { return client.id } -func (client *Client) Close(waitDone bool){ +func (client *Client) Close(waitDone bool) { client.TCPClient.Close(waitDone) } diff --git a/rpc/dynamicdiscover.pb.go b/rpc/dynamicdiscover.pb.go index b5f6e0a..1466067 100644 --- a/rpc/dynamicdiscover.pb.go +++ b/rpc/dynamicdiscover.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: rpc/dynamicdiscover.proto +// source: dynamicdiscover.proto package rpc @@ -26,8 +26,9 @@ type NodeInfo struct { NodeId int32 `protobuf:"varint,1,opt,name=NodeId,proto3" json:"NodeId,omitempty"` NodeName string `protobuf:"bytes,2,opt,name=NodeName,proto3" json:"NodeName,omitempty"` ListenAddr string `protobuf:"bytes,3,opt,name=ListenAddr,proto3" json:"ListenAddr,omitempty"` - Private bool `protobuf:"varint,4,opt,name=Private,proto3" json:"Private,omitempty"` - PublicServiceList []string `protobuf:"bytes,5,rep,name=PublicServiceList,proto3" json:"PublicServiceList,omitempty"` + MaxRpcParamLen uint32 `protobuf:"varint,4,opt,name=MaxRpcParamLen,proto3" json:"MaxRpcParamLen,omitempty"` + Private bool `protobuf:"varint,5,opt,name=Private,proto3" json:"Private,omitempty"` + PublicServiceList []string `protobuf:"bytes,6,rep,name=PublicServiceList,proto3" json:"PublicServiceList,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -37,7 +38,7 @@ func (m *NodeInfo) Reset() { *m = NodeInfo{} } func (m *NodeInfo) String() string { return proto.CompactTextString(m) } func (*NodeInfo) ProtoMessage() {} func (*NodeInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_9bfdd3ec0419520f, []int{0} + return fileDescriptor_c41e5a852f87626c, []int{0} } func (m *NodeInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -87,6 +88,13 @@ func (m *NodeInfo) GetListenAddr() string { return "" } +func (m *NodeInfo) GetMaxRpcParamLen() uint32 { + if m != nil { + return m.MaxRpcParamLen + } + return 0 +} + func (m *NodeInfo) GetPrivate() bool { if m != nil { return m.Private @@ -113,7 +121,7 @@ func (m *ServiceDiscoverReq) Reset() { *m = ServiceDiscoverReq{} } func (m *ServiceDiscoverReq) String() string { return proto.CompactTextString(m) } func (*ServiceDiscoverReq) ProtoMessage() {} func (*ServiceDiscoverReq) Descriptor() ([]byte, []int) { - return fileDescriptor_9bfdd3ec0419520f, []int{1} + return fileDescriptor_c41e5a852f87626c, []int{1} } func (m *ServiceDiscoverReq) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -164,7 +172,7 @@ func (m *SubscribeDiscoverNotify) Reset() { *m = SubscribeDiscoverNotify func (m *SubscribeDiscoverNotify) String() string { return proto.CompactTextString(m) } func (*SubscribeDiscoverNotify) ProtoMessage() {} func (*SubscribeDiscoverNotify) Descriptor() ([]byte, []int) { - return fileDescriptor_9bfdd3ec0419520f, []int{2} + return fileDescriptor_c41e5a852f87626c, []int{2} } func (m *SubscribeDiscoverNotify) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -232,7 +240,7 @@ func (m *Empty) Reset() { *m = Empty{} } func (m *Empty) String() string { return proto.CompactTextString(m) } func (*Empty) ProtoMessage() {} func (*Empty) Descriptor() ([]byte, []int) { - return fileDescriptor_9bfdd3ec0419520f, []int{3} + return fileDescriptor_c41e5a852f87626c, []int{3} } func (m *Empty) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -268,30 +276,31 @@ func init() { proto.RegisterType((*Empty)(nil), "rpc.Empty") } -func init() { proto.RegisterFile("rpc/dynamicdiscover.proto", fileDescriptor_9bfdd3ec0419520f) } +func init() { proto.RegisterFile("dynamicdiscover.proto", fileDescriptor_c41e5a852f87626c) } -var fileDescriptor_9bfdd3ec0419520f = []byte{ - // 305 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xd1, 0x4a, 0xc3, 0x30, - 0x18, 0x85, 0x89, 0x5d, 0xb7, 0xee, 0x57, 0x2f, 0x8c, 0xa0, 0x51, 0xa4, 0x94, 0x5e, 0x55, 0x90, - 0x0e, 0xf4, 0x01, 0x44, 0x99, 0xc2, 0x40, 0xc7, 0xc8, 0xee, 0xbc, 0x6b, 0x93, 0x0c, 0x02, 0x5d, - 0x13, 0xd3, 0x6c, 0xb0, 0x97, 0xf1, 0xd2, 0x67, 0xf1, 0xd2, 0x47, 0x90, 0x3d, 0x89, 0x2c, 0x66, - 0x73, 0x43, 0xf0, 0x2e, 0xdf, 0x7f, 0x72, 0xc2, 0x39, 0x7f, 0xe0, 0xcc, 0x68, 0xd6, 0xe3, 0x8b, - 0xba, 0x98, 0x4a, 0xc6, 0x65, 0xc3, 0xd4, 0x5c, 0x98, 0x5c, 0x1b, 0x65, 0x15, 0x0e, 0x8c, 0x66, - 0xe9, 0x3b, 0x82, 0x68, 0xa8, 0xb8, 0x18, 0xd4, 0x13, 0x85, 0x4f, 0xa0, 0xed, 0xce, 0x9c, 0xa0, - 0x04, 0x65, 0x21, 0xf5, 0x84, 0xcf, 0x7f, 0xee, 0x0c, 0x8b, 0xa9, 0x20, 0x7b, 0x09, 0xca, 0xba, - 0x74, 0xc3, 0x38, 0x06, 0x78, 0x92, 0x8d, 0x15, 0xf5, 0x1d, 0xe7, 0x86, 0x04, 0x4e, 0xdd, 0x9a, - 0x60, 0x02, 0x9d, 0x91, 0x91, 0xf3, 0xc2, 0x0a, 0xd2, 0x4a, 0x50, 0x16, 0xd1, 0x35, 0xe2, 0x2b, - 0x38, 0x1a, 0xcd, 0xca, 0x4a, 0xb2, 0xb1, 0x30, 0x73, 0xc9, 0xc4, 0xca, 0x44, 0xc2, 0x24, 0xc8, - 0xba, 0xf4, 0xaf, 0x90, 0xde, 0x02, 0xf6, 0xd8, 0xf7, 0x35, 0xa8, 0x78, 0xc5, 0x97, 0x10, 0xd5, - 0x3e, 0xbd, 0xcb, 0xbc, 0x7f, 0x7d, 0x98, 0x1b, 0xcd, 0xf2, 0x75, 0x25, 0xba, 0x91, 0xd3, 0x37, - 0x04, 0xa7, 0xe3, 0x59, 0xd9, 0x30, 0x23, 0xcb, 0xcd, 0x1b, 0x43, 0x65, 0xe5, 0x64, 0x81, 0x53, - 0x38, 0x78, 0x2e, 0x1a, 0xbb, 0xe2, 0xad, 0xfa, 0x3b, 0xb3, 0xd5, 0x72, 0x06, 0xcd, 0xe3, 0xac, - 0xaa, 0xdc, 0x0a, 0x22, 0xea, 0x09, 0x5f, 0x40, 0xb7, 0x2f, 0x2a, 0x6f, 0x0c, 0x9c, 0xf1, 0x77, - 0xb0, 0x13, 0xb0, 0x95, 0x04, 0xff, 0x05, 0xec, 0x40, 0xf8, 0x30, 0xd5, 0x76, 0x71, 0x7f, 0xfc, - 0xb1, 0x8c, 0xd1, 0xe7, 0x32, 0x46, 0x5f, 0xcb, 0x18, 0xbd, 0x84, 0x79, 0xcf, 0x68, 0x56, 0xb6, - 0xdd, 0xa7, 0xdd, 0x7c, 0x07, 0x00, 0x00, 0xff, 0xff, 0xa7, 0x79, 0x82, 0x2a, 0xd1, 0x01, 0x00, - 0x00, +var fileDescriptor_c41e5a852f87626c = []byte{ + // 324 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xdf, 0x4a, 0xc3, 0x30, + 0x14, 0xc6, 0x89, 0x5d, 0xb7, 0xee, 0xe8, 0x04, 0x23, 0x6a, 0x10, 0x29, 0xa5, 0x17, 0x52, 0x41, + 0x2a, 0xe8, 0x03, 0x88, 0x32, 0x85, 0xc1, 0x36, 0x46, 0x76, 0xe7, 0x5d, 0x9a, 0x66, 0x10, 0xe8, + 0x3f, 0xd3, 0x6c, 0xb8, 0x97, 0xf1, 0x79, 0xbc, 0x12, 0x1f, 0x41, 0xf6, 0x24, 0xb2, 0x98, 0xcd, + 0x4d, 0xc1, 0xbb, 0xfe, 0xbe, 0xaf, 0xe7, 0x70, 0xbe, 0x2f, 0x70, 0x94, 0xce, 0x0b, 0x96, 0x4b, + 0x9e, 0xca, 0x9a, 0x97, 0x33, 0xa1, 0xe2, 0x4a, 0x95, 0xba, 0xc4, 0x8e, 0xaa, 0x78, 0xf8, 0x8e, + 0xc0, 0x1b, 0x96, 0xa9, 0xe8, 0x15, 0x93, 0x12, 0x1f, 0x43, 0xd3, 0x7c, 0xa7, 0x04, 0x05, 0x28, + 0x72, 0xa9, 0x25, 0x7c, 0xfa, 0xfd, 0xcf, 0x90, 0xe5, 0x82, 0xec, 0x04, 0x28, 0x6a, 0xd3, 0x35, + 0x63, 0x1f, 0xa0, 0x2f, 0x6b, 0x2d, 0x8a, 0xbb, 0x34, 0x55, 0xc4, 0x31, 0xee, 0x86, 0x82, 0xcf, + 0x61, 0x7f, 0xc0, 0x5e, 0x68, 0xc5, 0x47, 0x4c, 0xb1, 0xbc, 0x2f, 0x0a, 0xd2, 0x08, 0x50, 0xd4, + 0xa1, 0xbf, 0x54, 0x4c, 0xa0, 0x35, 0x52, 0x72, 0xc6, 0xb4, 0x20, 0x6e, 0x80, 0x22, 0x8f, 0xae, + 0x10, 0x5f, 0xc2, 0xc1, 0x68, 0x9a, 0x64, 0x92, 0x8f, 0x85, 0x9a, 0x49, 0x2e, 0x96, 0xcb, 0x49, + 0x33, 0x70, 0xa2, 0x36, 0xfd, 0x6b, 0x84, 0xb7, 0x80, 0x2d, 0x76, 0x6d, 0x5c, 0x2a, 0x9e, 0xf1, + 0x05, 0x78, 0x85, 0x4d, 0x69, 0xb2, 0xed, 0x5e, 0x77, 0x62, 0x55, 0xf1, 0x78, 0x15, 0x9d, 0xae, + 0xed, 0xf0, 0x15, 0xc1, 0xc9, 0x78, 0x9a, 0xd4, 0x5c, 0xc9, 0x64, 0xbd, 0x63, 0x58, 0x6a, 0x39, + 0x99, 0xe3, 0x10, 0xf6, 0x06, 0xac, 0xd6, 0x4b, 0xde, 0xa8, 0x69, 0x4b, 0x5b, 0x96, 0xd8, 0xab, + 0x1f, 0xa7, 0x59, 0x66, 0xaa, 0xf2, 0xa8, 0x25, 0x7c, 0x06, 0xed, 0xae, 0xc8, 0xec, 0xa0, 0x63, + 0x06, 0x7f, 0x84, 0xad, 0x03, 0x1b, 0x81, 0xf3, 0xdf, 0x81, 0x2d, 0x70, 0x1f, 0xf2, 0x4a, 0xcf, + 0xef, 0x0f, 0xdf, 0x16, 0x3e, 0xfa, 0x58, 0xf8, 0xe8, 0x73, 0xe1, 0xa3, 0x27, 0x37, 0xbe, 0x52, + 0x15, 0x4f, 0x9a, 0xe6, 0x71, 0x6f, 0xbe, 0x02, 0x00, 0x00, 0xff, 0xff, 0xf9, 0xfb, 0xa9, 0x70, + 0xf5, 0x01, 0x00, 0x00, } func (m *NodeInfo) Marshal() (dAtA []byte, err error) { @@ -324,7 +333,7 @@ func (m *NodeInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { copy(dAtA[i:], m.PublicServiceList[iNdEx]) i = encodeVarintDynamicdiscover(dAtA, i, uint64(len(m.PublicServiceList[iNdEx]))) i-- - dAtA[i] = 0x2a + dAtA[i] = 0x32 } } if m.Private { @@ -335,6 +344,11 @@ func (m *NodeInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0 } i-- + dAtA[i] = 0x28 + } + if m.MaxRpcParamLen != 0 { + i = encodeVarintDynamicdiscover(dAtA, i, uint64(m.MaxRpcParamLen)) + i-- dAtA[i] = 0x20 } if len(m.ListenAddr) > 0 { @@ -514,6 +528,9 @@ func (m *NodeInfo) Size() (n int) { if l > 0 { n += 1 + l + sovDynamicdiscover(uint64(l)) } + if m.MaxRpcParamLen != 0 { + n += 1 + sovDynamicdiscover(uint64(m.MaxRpcParamLen)) + } if m.Private { n += 2 } @@ -703,6 +720,25 @@ func (m *NodeInfo) Unmarshal(dAtA []byte) error { m.ListenAddr = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxRpcParamLen", wireType) + } + m.MaxRpcParamLen = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDynamicdiscover + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MaxRpcParamLen |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Private", wireType) } @@ -722,7 +758,7 @@ func (m *NodeInfo) Unmarshal(dAtA []byte) error { } } m.Private = bool(v != 0) - case 5: + case 6: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field PublicServiceList", wireType) } diff --git a/rpc/dynamicdiscover.proto b/rpc/dynamicdiscover.proto index f5a80c9..65a91b6 100644 --- a/rpc/dynamicdiscover.proto +++ b/rpc/dynamicdiscover.proto @@ -6,8 +6,9 @@ message NodeInfo{ int32 NodeId = 1; string NodeName = 2; string ListenAddr = 3; - bool Private = 4; - repeated string PublicServiceList = 5; + uint32 MaxRpcParamLen = 4; + bool Private = 5; + repeated string PublicServiceList = 6; } //Client->Master diff --git a/rpc/gogorpc.pb.go b/rpc/gogorpc.pb.go index 6d2faa1..9c2f9aa 100644 --- a/rpc/gogorpc.pb.go +++ b/rpc/gogorpc.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: rpc/gogorpc.proto +// source: gogorpc.proto package rpc @@ -37,7 +37,7 @@ func (m *GoGoPBRpcRequestData) Reset() { *m = GoGoPBRpcRequestData{} } func (m *GoGoPBRpcRequestData) String() string { return proto.CompactTextString(m) } func (*GoGoPBRpcRequestData) ProtoMessage() {} func (*GoGoPBRpcRequestData) Descriptor() ([]byte, []int) { - return fileDescriptor_38afb24c36168563, []int{0} + return fileDescriptor_d0e25d3af112ec8f, []int{0} } func (m *GoGoPBRpcRequestData) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -114,7 +114,7 @@ func (m *GoGoPBRpcResponseData) Reset() { *m = GoGoPBRpcResponseData{} } func (m *GoGoPBRpcResponseData) String() string { return proto.CompactTextString(m) } func (*GoGoPBRpcResponseData) ProtoMessage() {} func (*GoGoPBRpcResponseData) Descriptor() ([]byte, []int) { - return fileDescriptor_38afb24c36168563, []int{1} + return fileDescriptor_d0e25d3af112ec8f, []int{1} } func (m *GoGoPBRpcResponseData) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -169,25 +169,25 @@ func init() { proto.RegisterType((*GoGoPBRpcResponseData)(nil), "rpc.GoGoPBRpcResponseData") } -func init() { proto.RegisterFile("rpc/gogorpc.proto", fileDescriptor_38afb24c36168563) } +func init() { proto.RegisterFile("gogorpc.proto", fileDescriptor_d0e25d3af112ec8f) } -var fileDescriptor_38afb24c36168563 = []byte{ - // 237 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2c, 0x2a, 0x48, 0xd6, - 0x4f, 0xcf, 0x4f, 0xcf, 0x2f, 0x2a, 0x48, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, - 0x2a, 0x48, 0x56, 0x5a, 0xc2, 0xc8, 0x25, 0xe2, 0x9e, 0xef, 0x9e, 0x1f, 0xe0, 0x14, 0x54, 0x90, - 0x1c, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2, 0x92, 0x58, 0x92, 0x28, 0x24, 0xc0, 0xc5, 0x1c, - 0x9c, 0x5a, 0x28, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x12, 0x04, 0x62, 0x0a, 0x29, 0x70, 0x71, 0x07, - 0x15, 0x24, 0xfb, 0xa6, 0x96, 0x64, 0xe4, 0xa7, 0x78, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xf0, - 0x06, 0x21, 0x0b, 0x09, 0xa9, 0x70, 0xf1, 0x06, 0xa7, 0x16, 0x95, 0x65, 0x26, 0xa7, 0x42, 0x84, - 0x24, 0x98, 0x15, 0x18, 0x35, 0x38, 0x83, 0x50, 0x05, 0x85, 0x24, 0xb8, 0xd8, 0xfd, 0xf2, 0x83, - 0x52, 0x0b, 0x72, 0x2a, 0x25, 0x58, 0x14, 0x18, 0x35, 0x38, 0x82, 0x60, 0x5c, 0x90, 0x8c, 0x67, - 0x5e, 0x40, 0x62, 0x51, 0x62, 0xae, 0x04, 0xab, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0x14, - 0xca, 0x25, 0x8a, 0xe4, 0xca, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x1c, 0xce, 0x14, 0xe1, 0x62, - 0x75, 0x2d, 0x2a, 0xca, 0x2f, 0x02, 0x3b, 0x90, 0x33, 0x08, 0xc2, 0x01, 0x89, 0x42, 0xac, 0x64, - 0x06, 0x1b, 0x0c, 0xe1, 0x38, 0x09, 0x9f, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, - 0x47, 0x72, 0x8c, 0x51, 0xac, 0x7a, 0xfa, 0x45, 0x05, 0xc9, 0x49, 0x6c, 0xe0, 0xe0, 0x31, 0x06, - 0x04, 0x00, 0x00, 0xff, 0xff, 0xc7, 0xfc, 0x50, 0x87, 0x33, 0x01, 0x00, 0x00, +var fileDescriptor_d0e25d3af112ec8f = []byte{ + // 233 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0xcf, 0x4f, 0xcf, + 0x2f, 0x2a, 0x48, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x56, 0x5a, + 0xc2, 0xc8, 0x25, 0xe2, 0x9e, 0xef, 0x9e, 0x1f, 0xe0, 0x14, 0x54, 0x90, 0x1c, 0x94, 0x5a, 0x58, + 0x9a, 0x5a, 0x5c, 0xe2, 0x92, 0x58, 0x92, 0x28, 0x24, 0xc0, 0xc5, 0x1c, 0x9c, 0x5a, 0x28, 0xc1, + 0xa8, 0xc0, 0xa8, 0xc1, 0x12, 0x04, 0x62, 0x0a, 0x29, 0x70, 0x71, 0x07, 0x15, 0x24, 0xfb, 0xa6, + 0x96, 0x64, 0xe4, 0xa7, 0x78, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xf0, 0x06, 0x21, 0x0b, 0x09, + 0xa9, 0x70, 0xf1, 0x06, 0xa7, 0x16, 0x95, 0x65, 0x26, 0xa7, 0x42, 0x84, 0x24, 0x98, 0x15, 0x18, + 0x35, 0x38, 0x83, 0x50, 0x05, 0x85, 0x24, 0xb8, 0xd8, 0xfd, 0xf2, 0x83, 0x52, 0x0b, 0x72, 0x2a, + 0x25, 0x58, 0x14, 0x18, 0x35, 0x38, 0x82, 0x60, 0x5c, 0x90, 0x8c, 0x67, 0x5e, 0x40, 0x62, 0x51, + 0x62, 0xae, 0x04, 0xab, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0x14, 0xca, 0x25, 0x8a, 0xe4, + 0xca, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x1c, 0xce, 0x14, 0xe1, 0x62, 0x75, 0x2d, 0x2a, 0xca, + 0x2f, 0x02, 0x3b, 0x90, 0x33, 0x08, 0xc2, 0x01, 0x89, 0x42, 0xac, 0x64, 0x06, 0x1b, 0x0c, 0xe1, + 0x38, 0x09, 0x9f, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x51, + 0xac, 0x7a, 0xfa, 0x45, 0x05, 0xc9, 0x49, 0x6c, 0xe0, 0xe0, 0x31, 0x06, 0x04, 0x00, 0x00, 0xff, + 0xff, 0x26, 0xcf, 0x31, 0x39, 0x2f, 0x01, 0x00, 0x00, } func (m *GoGoPBRpcRequestData) Marshal() (dAtA []byte, err error) { diff --git a/rpc/server.go b/rpc/server.go index eb7ee88..498db9d 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -10,15 +10,16 @@ import ( "reflect" "strings" ) + type RpcProcessorType uint8 const ( - RpcProcessorJson RpcProcessorType = 0 - RpcProcessorGoGoPB RpcProcessorType = 1 + RpcProcessorJson RpcProcessorType = 0 + RpcProcessorGoGoPB RpcProcessorType = 1 ) //var processor IRpcProcessor = &JsonProcessor{} -var arrayProcessor = []IRpcProcessor{&JsonProcessor{},&GoGoPBProcessor{}} +var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &GoGoPBProcessor{}} var arrayProcessorLen uint8 = 2 var LittleEndian bool @@ -35,22 +36,22 @@ type RpcAgent struct { } func AppendProcessor(rpcProcessor IRpcProcessor) { - arrayProcessor = append(arrayProcessor,rpcProcessor) + arrayProcessor = append(arrayProcessor, rpcProcessor) arrayProcessorLen++ } -func GetProcessorType(param interface{}) (RpcProcessorType,IRpcProcessor){ - for i:=uint8(1);i=arrayProcessorLen{ +func GetProcessor(processorType uint8) IRpcProcessor { + if processorType >= arrayProcessorLen { return nil } return arrayProcessor[processorType] @@ -61,16 +62,21 @@ func (server *Server) Init(rpcHandleFinder RpcHandleFinder) { server.rpcServer = &network.TCPServer{} } -func (server *Server) Start(listenAddr string) { - splitAddr := strings.Split(listenAddr,":") - if len(splitAddr)!=2{ - log.SFatal("listen addr is error :",listenAddr) - } +func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) { + splitAddr := strings.Split(listenAddr, ":") + if len(splitAddr) != 2 { + log.SFatal("listen addr is error :", listenAddr) + } - server.rpcServer.Addr = ":"+splitAddr[1] - server.rpcServer.LenMsgLen = 2 //uint16 + server.rpcServer.Addr = ":" + splitAddr[1] + server.rpcServer.LenMsgLen = 4 //uint16 server.rpcServer.MinMsgLen = 2 - server.rpcServer.MaxMsgLen = math.MaxUint16 + if maxRpcParamLen > 0 { + server.rpcServer.MaxMsgLen = maxRpcParamLen + } else { + server.rpcServer.MaxMsgLen = math.MaxUint32 + } + server.rpcServer.MaxConnNum = 10000 server.rpcServer.PendingWriteNum = 2000000 server.rpcServer.NewAgent = server.NewAgent @@ -80,63 +86,63 @@ func (server *Server) Start(listenAddr string) { func (agent *RpcAgent) OnDestroy() {} -func (agent *RpcAgent) WriteResponse(processor IRpcProcessor,serviceMethod string,seq uint64,reply interface{},rpcError RpcError) { +func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError) { var mReply []byte var errM error - if reply!=nil { - mReply,errM = processor.Marshal(reply) + if reply != nil { + mReply, errM = processor.Marshal(reply) if errM != nil { rpcError = ConvertError(errM) } } var rpcResponse RpcResponse - rpcResponse.RpcResponseData = processor.MakeRpcResponse(seq,rpcError,mReply) - bytes,errM := processor.Marshal(rpcResponse.RpcResponseData) + rpcResponse.RpcResponseData = processor.MakeRpcResponse(seq, rpcError, mReply) + bytes, errM := processor.Marshal(rpcResponse.RpcResponseData) defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData) if errM != nil { - log.SError("service method ",serviceMethod," Marshal error:",errM.Error()) + log.SError("service method ", serviceMethod, " Marshal error:", errM.Error()) return } - errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())},bytes) + errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) if errM != nil { - log.SError("Rpc ",serviceMethod," return is error:",errM.Error()) + log.SError("Rpc ", serviceMethod, " return is error:", errM.Error()) } } func (agent *RpcAgent) Run() { for { - data,err := agent.conn.ReadMsg() + data, err := agent.conn.ReadMsg() if err != nil { - log.SError("remoteAddress:",agent.conn.RemoteAddr().String(),",read message: ",err.Error()) + log.SError("remoteAddress:", agent.conn.RemoteAddr().String(), ",read message: ", err.Error()) //will close tcpconn break } processor := GetProcessor(data[0]) - if processor==nil { + if processor == nil { agent.conn.ReleaseReadMsg(data) - log.SError("remote rpc ",agent.conn.RemoteAddr()," cannot find processor:",data[0]) + log.SError("remote rpc ", agent.conn.RemoteAddr(), " cannot find processor:", data[0]) return } //解析head - req := MakeRpcRequest(processor,0,0,"",false,nil) - err = processor.Unmarshal(data[1:],req.RpcRequestData) + req := MakeRpcRequest(processor, 0, 0, "", false, nil) + err = processor.Unmarshal(data[1:], req.RpcRequestData) agent.conn.ReleaseReadMsg(data) if err != nil { log.SError("rpc Unmarshal request is error:", err.Error()) - if req.RpcRequestData.GetSeq()>0 { + if req.RpcRequestData.GetSeq() > 0 { rpcError := RpcError(err.Error()) - if req.RpcRequestData.IsNoReply()==false { + if req.RpcRequestData.IsNoReply() == false { agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) } ReleaseRpcRequest(req) continue - }else{ + } else { //will close tcpconn ReleaseRpcRequest(req) break @@ -144,10 +150,10 @@ func (agent *RpcAgent) Run() { } //交给程序处理 - serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(),".") + serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(), ".") if len(serviceMethod) < 1 { rpcError := RpcError("rpc request req.ServiceMethod is error") - if req.RpcRequestData.IsNoReply()==false { + if req.RpcRequestData.IsNoReply() == false { agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) } ReleaseRpcRequest(req) @@ -156,30 +162,30 @@ func (agent *RpcAgent) Run() { } rpcHandler := agent.rpcServer.rpcHandleFinder.FindRpcHandler(serviceMethod[0]) - if rpcHandler== nil { + if rpcHandler == nil { rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod())) - if req.RpcRequestData.IsNoReply()==false { - agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) + if req.RpcRequestData.IsNoReply() == false { + agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) } - log.SError("service method ",req.RpcRequestData.GetServiceMethod()," not config!") + log.SError("service method ", req.RpcRequestData.GetServiceMethod(), " not config!") ReleaseRpcRequest(req) continue } - if req.RpcRequestData.IsNoReply()==false { - req.requestHandle = func(Returns interface{},Err RpcError){ - agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err) + if req.RpcRequestData.IsNoReply() == false { + req.requestHandle = func(Returns interface{}, Err RpcError) { + agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), Returns, Err) ReleaseRpcRequest(req) } } - req.inParam,err = rpcHandler.UnmarshalInParam(req.rpcProcessor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetRpcMethodId(),req.RpcRequestData.GetInParam()) + req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam()) if err != nil { - rErr := "Call Rpc "+req.RpcRequestData.GetServiceMethod()+" Param error "+err.Error() - if req.requestHandle!=nil { + rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error() + if req.requestHandle != nil { req.requestHandle(nil, RpcError(rErr)) - }else{ + } else { ReleaseRpcRequest(req) } log.SError(rErr) @@ -191,7 +197,7 @@ func (agent *RpcAgent) Run() { rpcError := RpcError(err.Error()) if req.RpcRequestData.IsNoReply() { - agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError) + agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) } ReleaseRpcRequest(req) @@ -213,7 +219,7 @@ func (agent *RpcAgent) RemoteAddr() net.Addr { return agent.conn.RemoteAddr() } -func (agent *RpcAgent) Close() { +func (agent *RpcAgent) Close() { agent.conn.Close() } @@ -227,26 +233,25 @@ func (server *Server) NewAgent(c *network.TCPConn) network.Agent { return agent } -func (server *Server) myselfRpcHandlerGo(handlerName string,serviceMethod string, args interface{},reply interface{}) error { +func (server *Server) myselfRpcHandlerGo(handlerName string, serviceMethod string, args interface{}, reply interface{}) error { rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) - if rpcHandler== nil { - err := errors.New("service method "+serviceMethod+" not config!") + if rpcHandler == nil { + err := errors.New("service method " + serviceMethod + " not config!") log.SError(err.Error()) return err } - return rpcHandler.CallMethod(serviceMethod,args,reply) + return rpcHandler.CallMethod(serviceMethod, args, reply) } - -func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,rpcMethodId uint32,serviceMethod string, args interface{},reply interface{},rawArgs []byte) *Call { +func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call { pCall := MakeCall() pCall.Seq = client.generateSeq() rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) - if rpcHandler== nil { + if rpcHandler == nil { pCall.Seq = 0 - pCall.Err = errors.New("service method "+serviceMethod+" not config!") + pCall.Err = errors.New("service method " + serviceMethod + " not config!") log.SError(pCall.Err.Error()) pCall.done <- pCall @@ -254,14 +259,14 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien } if processor == nil { - _,processor = GetProcessorType(args) + _, processor = GetProcessorType(args) } - req := MakeRpcRequest(processor,0,rpcMethodId, serviceMethod,noReply,nil) + req := MakeRpcRequest(processor, 0, rpcMethodId, serviceMethod, noReply, nil) req.inParam = args req.localReply = reply - if rawArgs!=nil { + if rawArgs != nil { var err error - req.inParam,err = rpcHandler.UnmarshalInParam(processor,serviceMethod,rpcMethodId,rawArgs) + req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs) if err != nil { ReleaseRpcRequest(req) pCall.Err = err @@ -272,30 +277,30 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien if noReply == false { client.AddPending(pCall) - req.requestHandle = func(Returns interface{},Err RpcError){ + req.requestHandle = func(Returns interface{}, Err RpcError) { if reply != nil && Returns != reply && Returns != nil { byteReturns, err := req.rpcProcessor.Marshal(Returns) if err != nil { - log.SError("returns data cannot be marshal ",pCall.Seq) + log.SError("returns data cannot be marshal ", pCall.Seq) ReleaseRpcRequest(req) } err = req.rpcProcessor.Unmarshal(byteReturns, reply) if err != nil { - log.SError("returns data cannot be Unmarshal ",pCall.Seq) + log.SError("returns data cannot be Unmarshal ", pCall.Seq) ReleaseRpcRequest(req) } } v := client.RemovePending(pCall.Seq) if v == nil { - log.SError("rpcClient cannot find seq ",pCall.Seq," in pending") + log.SError("rpcClient cannot find seq ", pCall.Seq, " in pending") ReleaseRpcRequest(req) return } if len(Err) == 0 { pCall.Err = nil - }else{ + } else { pCall.Err = Err } pCall.done <- pCall @@ -313,16 +318,16 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien return pCall } -func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRpcHandler,noReply bool,handlerName string,serviceMethod string,args interface{},reply interface{},callback reflect.Value) error { +func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) error { rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) - if rpcHandler== nil { - err := errors.New("service method "+serviceMethod+" not config!") + if rpcHandler == nil { + err := errors.New("service method " + serviceMethod + " not config!") log.SError(err.Error()) return err } - _,processor := GetProcessorType(args) - req := MakeRpcRequest(processor,0,0,serviceMethod,noReply,nil) + _, processor := GetProcessorType(args) + req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil) req.inParam = args req.localReply = reply @@ -335,21 +340,21 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler pCall.Reply = reply client.AddPending(pCall) - req.requestHandle = func(Returns interface{},Err RpcError){ + req.requestHandle = func(Returns interface{}, Err RpcError) { v := client.RemovePending(callSeq) if v == nil { - log.SError("rpcClient cannot find seq ",pCall.Seq," in pending") + log.SError("rpcClient cannot find seq ", pCall.Seq, " in pending") //ReleaseCall(pCall) ReleaseRpcRequest(req) return } if len(Err) == 0 { pCall.Err = nil - }else{ + } else { pCall.Err = Err } - if Returns!=nil { + if Returns != nil { pCall.Reply = Returns } pCall.rpcHandler.PushRpcResponse(pCall)