新增CallEx接口

This commit is contained in:
boyce
2019-10-28 16:22:46 +08:00
parent e266c50ed6
commit 89328aa3da

View File

@@ -1,13 +1,13 @@
package cluster
import (
"errors"
"fmt"
"math/rand"
"net"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/duanhf2012/origin/rpc"
@@ -34,7 +34,6 @@ type CCluster struct {
writer net.Conn
LocalRpcClient *rpc.Client
localRpcLocker sync.Mutex
innerLocalServiceList map[string]bool
}
@@ -188,7 +187,7 @@ func (slf *CCluster) ConnService() error {
if slf.LocalRpcClient.IsClosed() {
slf.ReSetLocalRpcClient()
}
time.Sleep(time.Second * 2)
time.Sleep(time.Second * 4)
}
return nil
@@ -372,6 +371,8 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{},
return slf.goImpl(bCast, NodeServiceMethod, args, queueModle, true)
}
func (slf *CCluster) goImpl(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool, log bool) error {
var callServiceName string
var serviceName string
@@ -521,6 +522,94 @@ func Call(NodeServiceMethod string, args interface{}, reply interface{}) error {
return InstanceClusterMgr().Call(NodeServiceMethod, args, reply)
}
func (slf *CCluster) CallEx(NodeServiceMethod string, args interface{}, reply interface{}) *RpcCallResult {
return slf.rawcall(NodeServiceMethod, args, reply, false)
}
type RpcCallResult struct {
chanRet chan *rpc.Call
err error
rets *rpc.Call
}
func (slf *RpcCallResult) Make() {
slf.chanRet = make(chan *rpc.Call, 1)
slf.rets = nil
}
func (slf *RpcCallResult) WaitReturn(waittm time.Duration) error {
if slf.chanRet == nil {
return errors.New("cannot make rpccallresult")
}
if waittm <= 0 {
select {
case ret := <-slf.chanRet:
return ret.Error
}
} else {
//
select {
case ret := <-slf.chanRet:
return ret.Error
case <-time.After(waittm):
return errors.New("is time out")
}
}
return errors.New("unknow error.")
}
func (slf *CCluster) rawcall(NodeServiceMethod string, args interface{}, reply interface{}, queueModle bool) *RpcCallResult {
var rpcRet RpcCallResult
rpcRet.Make()
var callServiceName string
var serviceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName, &serviceName)
if len(nodeidList) > 1 || len(nodeidList) < 1 {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s) find nodes count %d is error.", NodeServiceMethod, len(nodeidList))
rpcRet.err = fmt.Errorf("CCluster.Call(%s) find nodes count %d is error.", NodeServiceMethod, len(nodeidList))
return &rpcRet
}
nodeid := nodeidList[0]
if nodeid == GetNodeId() {
//判断服务是否已经完成初始化
iService := service.InstanceServiceMgr().FindService(serviceName)
if iService == nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d cannot find service.", NodeServiceMethod, nodeid)
rpcRet.err = fmt.Errorf("CCluster.Call(%s): NodeId %d cannot find service..", NodeServiceMethod, nodeid)
return &rpcRet
}
if iService.IsInit() == false {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid)
rpcRet.err = fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid)
return &rpcRet
}
rpcRet.rets = slf.LocalRpcClient.Go(callServiceName, args, reply, rpcRet.chanRet, queueModle)
return &rpcRet
}
pclient := slf.GetClusterClient(nodeid)
if pclient == nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d is not find.", NodeServiceMethod, nodeid)
rpcRet.err = fmt.Errorf("CCluster.Call(%s): NodeId %d is not find.", NodeServiceMethod, nodeid)
return &rpcRet
}
rpcRet.rets = pclient.Go(callServiceName, args, reply, rpcRet.chanRet, queueModle)
return &rpcRet
}
func CallEx(NodeServiceMethod string, args interface{}, reply interface{}) *RpcCallResult {
return InstanceClusterMgr().rawcall(NodeServiceMethod, args, reply, false)
}
func CallNode(NodeId int, servicemethod string, args interface{}, reply interface{}) error {
return InstanceClusterMgr().CallNode(NodeId, servicemethod, args, reply)
}