新增rpc nats模块

This commit is contained in:
boyce
2024-04-23 10:44:21 +08:00
parent e36693eeff
commit 59efc05d24
16 changed files with 1101 additions and 658 deletions

View File

@@ -52,19 +52,21 @@ type Cluster struct {
localNodeInfo NodeInfo //本结点配置信息 localNodeInfo NodeInfo //本结点配置信息
discoveryInfo DiscoveryInfo //服务发现配置 discoveryInfo DiscoveryInfo //服务发现配置
rpcMode RpcMode
//masterDiscoveryNodeList []NodeInfo //配置发现Master结点 //masterDiscoveryNodeList []NodeInfo //配置发现Master结点
globalCfg interface{} //全局配置 globalCfg interface{} //全局配置
localServiceCfg map[string]interface{} //map[serviceName]配置数据* localServiceCfg map[string]interface{} //map[serviceName]配置数据*
serviceDiscovery IServiceDiscovery //服务发现接口 serviceDiscovery IServiceDiscovery //服务发现接口
locker sync.RWMutex //结点与服务关系保护锁 locker sync.RWMutex //结点与服务关系保护锁
mapRpc map[string]*NodeRpcInfo //nodeId mapRpc map[string]*NodeRpcInfo //nodeId
//mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string]map[string]struct{} //map[serviceName]map[NodeId] mapServiceNode map[string]map[string]struct{} //map[serviceName]map[NodeId]
rpcServer rpc.Server callSet rpc.CallSet
rpcNats rpc.RpcNats
rpcServer rpc.IServer
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁 rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
mapServiceListenRpcEvent map[string]struct{} //ServiceName mapServiceListenRpcEvent map[string]struct{} //ServiceName
mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName
@@ -82,11 +84,12 @@ func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) {
cluster.serviceDiscovery = serviceDiscovery cluster.serviceDiscovery = serviceDiscovery
} }
func (cls *Cluster) Start() { func (cls *Cluster) Start() error{
cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen) return cls.rpcServer.Start()
} }
func (cls *Cluster) Stop() { func (cls *Cluster) Stop() {
cls.rpcServer.Stop()
} }
func (cls *Cluster) DiscardNode(nodeId string) { func (cls *Cluster) DiscardNode(nodeId string) {
@@ -191,7 +194,12 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
//不存在时,则建立连接 //不存在时,则建立连接
rpcInfo := NodeRpcInfo{} rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = *nodeInfo rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent)
if cls.IsNatsMode() {
rpcInfo.client = cls.rpcNats.NewNatsClient(nodeInfo.NodeId, cls.GetLocalNodeInfo().NodeId,&cls.callSet)
}else{
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent,&cls.callSet)
}
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr)) log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr))
} }
@@ -204,7 +212,15 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er
return err return err
} }
cls.rpcServer.Init(cls) cls.callSet.Init()
if cls.IsNatsMode() {
cls.rpcNats.Init(cls.rpcMode.Nats.NatsUrl,cls.rpcMode.Nats.NoRandomize,cls.GetLocalNodeInfo().NodeId,cls.localNodeInfo.CompressBytesLen,cls)
cls.rpcServer = &cls.rpcNats
}else{
s := &rpc.Server{}
s.Init(cls.localNodeInfo.ListenAddr,cls.localNodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls)
cls.rpcServer = s
}
//2.安装服务发现结点 //2.安装服务发现结点
err = cls.setupDiscovery(localNodeId, setupServiceFun) err = cls.setupDiscovery(localNodeId, setupServiceFun)
@@ -225,7 +241,6 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er
return nil return nil
} }
func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler { func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler {
pService := service.GetService(serviceName) pService := service.GetService(serviceName)
if pService == nil { if pService == nil {
@@ -276,8 +291,8 @@ func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientL
return GetCluster().GetNodeIdByService(serviceName, clientList, filterRetire) return GetCluster().GetNodeIdByService(serviceName, clientList, filterRetire)
} }
func GetRpcServer() *rpc.Server { func GetRpcServer() rpc.IServer {
return &cluster.rpcServer return cluster.rpcServer
} }
func (cls *Cluster) IsNodeConnected(nodeId string) bool { func (cls *Cluster) IsNodeConnected(nodeId string) bool {

View File

@@ -15,7 +15,7 @@ func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode Fu
discovery.funSetNode = funSetNode discovery.funSetNode = funSetNode
//解析本地其他服务配置 //解析本地其他服务配置
_,nodeInfoList,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull) _,nodeInfoList,_,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -318,7 +318,6 @@ func (dc *OriginDiscoveryClient) addDiscoveryMaster() {
continue continue
} }
dc.funSetNode(&discoveryNodeList.MasterNodeList[i]) dc.funSetNode(&discoveryNodeList.MasterNodeList[i])
} }
} }
@@ -556,6 +555,10 @@ func (cls *Cluster) IsOriginMasterDiscoveryNode(nodeId string) bool {
} }
func (cls *Cluster) getOriginMasterDiscoveryNodeInfo(nodeId string) *NodeInfo { func (cls *Cluster) getOriginMasterDiscoveryNodeInfo(nodeId string) *NodeInfo {
if cls.discoveryInfo.Origin == nil {
return nil
}
for i := 0; i < len(cls.discoveryInfo.Origin.MasterNodeList); i++ { for i := 0; i < len(cls.discoveryInfo.Origin.MasterNodeList); i++ {
if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId { if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId {
return &cls.discoveryInfo.Origin.MasterNodeList[i] return &cls.discoveryInfo.Origin.MasterNodeList[i]

View File

@@ -9,6 +9,7 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"time" "time"
"errors"
) )
var json = jsoniter.ConfigCompatibleWithStandardLibrary var json = jsoniter.ConfigCompatibleWithStandardLibrary
@@ -44,7 +45,19 @@ type DiscoveryInfo struct {
Origin *OriginDiscovery //orign Origin *OriginDiscovery //orign
} }
type NatsConfig struct {
NatsUrl string
NoRandomize bool
}
type RpcMode struct {
Typ string `json:"Type"`
Nats NatsConfig
}
type NodeInfoList struct { type NodeInfoList struct {
RpcMode RpcMode
Discovery DiscoveryInfo Discovery DiscoveryInfo
NodeList []NodeInfo NodeList []NodeInfo
} }
@@ -183,14 +196,41 @@ func (cls *Cluster) readServiceConfig(filepath string) (interface{}, map[string]
return GlobalCfg, serviceConfig, mapNodeService, nil return GlobalCfg, serviceConfig, mapNodeService, nil
} }
func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []NodeInfo, error) { func (cls *Cluster) SetRpcMode(cfgRpcMode *RpcMode,rpcMode *RpcMode) error {
//忽略掉没有设置的配置
if cfgRpcMode.Typ == "" {
return nil
}
//不允许重复的配置Rpc模式
if cfgRpcMode.Typ != "" && rpcMode.Typ != ""{
return errors.New("repeat config RpcMode")
}
//检查Typ是否合法
if cfgRpcMode.Typ!="Nats" && cfgRpcMode.Typ!="Default" {
return fmt.Errorf("RpcMode %s is not support", rpcMode.Typ)
}
if cfgRpcMode.Typ == "Nats" && len(cfgRpcMode.Nats.NatsUrl)==0 {
return fmt.Errorf("nats rpc mode config NatsUrl is empty")
}
*rpcMode = *cfgRpcMode
return nil
}
func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []NodeInfo,RpcMode, error) {
var nodeInfoList []NodeInfo var nodeInfoList []NodeInfo
var discoveryInfo DiscoveryInfo var discoveryInfo DiscoveryInfo
var rpcMode RpcMode
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster" clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
fileInfoList, err := os.ReadDir(clusterCfgPath) fileInfoList, err := os.ReadDir(clusterCfgPath)
if err != nil { if err != nil {
return discoveryInfo, nil, fmt.Errorf("Read dir %s is fail :%+v", clusterCfgPath, err) return discoveryInfo, nil,rpcMode, fmt.Errorf("Read dir %s is fail :%+v", clusterCfgPath, err)
} }
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
@@ -200,12 +240,17 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
fileNodeInfoList, rerr := cls.ReadClusterConfig(filePath) fileNodeInfoList, rerr := cls.ReadClusterConfig(filePath)
if rerr != nil { if rerr != nil {
return discoveryInfo, nil, fmt.Errorf("read file path %s is error:%+v", filePath, rerr) return discoveryInfo, nil,rpcMode, fmt.Errorf("read file path %s is error:%+v", filePath, rerr)
}
err = cls.SetRpcMode(&fileNodeInfoList.RpcMode,&rpcMode)
if err != nil {
return discoveryInfo, nil,rpcMode, err
} }
err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery) err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery)
if err != nil { if err != nil {
return discoveryInfo,nil,err return discoveryInfo,nil,rpcMode,err
} }
for _, nodeInfo := range fileNodeInfoList.NodeList { for _, nodeInfo := range fileNodeInfoList.NodeList {
@@ -217,7 +262,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
} }
if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) { if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
return discoveryInfo, nil, fmt.Errorf("%d configurations were found for the configuration with node ID %d!", len(nodeInfoList), nodeId) return discoveryInfo, nil,rpcMode, fmt.Errorf("%d configurations were found for the configuration with node ID %d!", len(nodeInfoList), nodeId)
} }
for i, _ := range nodeInfoList { for i, _ := range nodeInfoList {
@@ -231,7 +276,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
} }
} }
return discoveryInfo, nodeInfoList, nil return discoveryInfo, nodeInfoList, rpcMode,nil
} }
func (cls *Cluster) readLocalService(localNodeId string) error { func (cls *Cluster) readLocalService(localNodeId string) error {
@@ -325,7 +370,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
func (cls *Cluster) parseLocalCfg() { func (cls *Cluster) parseLocalCfg() {
rpcInfo := NodeRpcInfo{} rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId) rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId,&cls.callSet)
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
@@ -338,18 +383,27 @@ func (cls *Cluster) parseLocalCfg() {
} }
} }
func (cls *Cluster) IsNatsMode() bool {
return cls.rpcMode.Typ == "Nats"
}
func (cls *Cluster) GetNatsUrl() string {
return cls.rpcMode.Nats.NatsUrl
}
func (cls *Cluster) InitCfg(localNodeId string) error { func (cls *Cluster) InitCfg(localNodeId string) error {
cls.localServiceCfg = map[string]interface{}{} cls.localServiceCfg = map[string]interface{}{}
cls.mapRpc = map[string]*NodeRpcInfo{} cls.mapRpc = map[string]*NodeRpcInfo{}
cls.mapServiceNode = map[string]map[string]struct{}{} cls.mapServiceNode = map[string]map[string]struct{}{}
//加载本地结点的NodeList配置 //加载本地结点的NodeList配置
discoveryInfo, nodeInfoList, err := cls.readLocalClusterConfig(localNodeId) discoveryInfo, nodeInfoList,rpcMode, err := cls.readLocalClusterConfig(localNodeId)
if err != nil { if err != nil {
return err return err
} }
cls.localNodeInfo = nodeInfoList[0] cls.localNodeInfo = nodeInfoList[0]
cls.discoveryInfo = discoveryInfo cls.discoveryInfo = discoveryInfo
cls.rpcMode = rpcMode
//读取本地服务配置 //读取本地服务配置
err = cls.readLocalService(localNodeId) err = cls.readLocalService(localNodeId)

View File

@@ -6,6 +6,8 @@ import (
"net" "net"
"sync" "sync"
"time" "time"
"fmt"
"errors"
) )
const( const(
@@ -36,15 +38,20 @@ type TCPServer struct {
MsgParser MsgParser
} }
func (server *TCPServer) Start() { func (server *TCPServer) Start() error{
server.init() err := server.init()
if err != nil {
return err
}
go server.run() go server.run()
return nil
} }
func (server *TCPServer) init() { func (server *TCPServer) init() error{
ln, err := net.Listen("tcp", server.Addr) ln, err := net.Listen("tcp", server.Addr)
if err != nil { if err != nil {
log.Fatal("Listen tcp fail",log.String("error", err.Error())) return fmt.Errorf("Listen tcp fail,error:%s",err.Error())
} }
if server.MaxConnNum <= 0 { if server.MaxConnNum <= 0 {
@@ -89,12 +96,14 @@ func (server *TCPServer) init() {
} }
if server.NewAgent == nil { if server.NewAgent == nil {
log.Fatal("NewAgent must not be nil") return errors.New("NewAgent must not be nil")
} }
server.ln = ln server.ln = ln
server.conns = make(ConnSet) server.conns = make(ConnSet)
server.MsgParser.init() server.MsgParser.init()
return nil
} }
func (server *TCPServer) SetNetMempool(mempool bytespool.IBytesMempool){ func (server *TCPServer) SetNetMempool(mempool bytespool.IBytesMempool){

View File

@@ -326,9 +326,7 @@ func startNode(args interface{}) error {
//5.运行集群 //5.运行集群
cluster.GetCluster().Start() cluster.GetCluster().Start()
//6.监听程序退出信号&性能报告 //6.监听程序退出信号&性能报告
bRun := true bRun := true
var pProfilerTicker *time.Ticker = &time.Ticker{} var pProfilerTicker *time.Ticker = &time.Ticker{}
@@ -352,9 +350,10 @@ func startNode(args interface{}) error {
} }
} }
cluster.GetCluster().Stop()
//7.退出 //7.退出
service.StopAllService() service.StopAllService()
cluster.GetCluster().Stop()
log.Info("Server is stop.") log.Info("Server is stop.")
log.Close() log.Close()

146
rpc/callset.go Normal file
View File

@@ -0,0 +1,146 @@
package rpc
import (
"errors"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/duanhf2012/origin/v2/log"
)
type CallSet struct {
pendingLock sync.RWMutex
startSeq uint64
pending map[uint64]*Call
callRpcTimeout time.Duration
maxCheckCallRpcCount int
callTimerHeap CallTimerHeap
}
func (cs *CallSet) Init(){
cs.pendingLock.Lock()
cs.callTimerHeap.Init()
cs.pending = make(map[uint64]*Call,4096)
cs.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
cs.callRpcTimeout = DefaultRpcTimeout
go cs.checkRpcCallTimeout()
cs.pendingLock.Unlock()
}
func (bc *CallSet) makeCallFail(call *Call) {
if call.callback != nil && call.callback.IsValid() {
call.rpcHandler.PushRpcResponse(call)
} else {
call.done <- call
}
}
func (bc *CallSet) checkRpcCallTimeout() {
for{
time.Sleep(DefaultCheckRpcCallTimeoutInterval)
for i := 0; i < bc.maxCheckCallRpcCount; i++ {
bc.pendingLock.Lock()
callSeq := bc.callTimerHeap.PopTimeout()
if callSeq == 0 {
bc.pendingLock.Unlock()
break
}
pCall := bc.pending[callSeq]
if pCall == nil {
bc.pendingLock.Unlock()
log.Error("call seq is not find",log.Uint64("seq", callSeq))
continue
}
delete(bc.pending,callSeq)
strTimeout := strconv.FormatInt(int64(pCall.TimeOut.Seconds()), 10)
pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds,method is "+pCall.ServiceMethod)
log.Error("call timeout",log.String("error",pCall.Err.Error()))
bc.makeCallFail(pCall)
bc.pendingLock.Unlock()
continue
}
}
}
func (bc *CallSet) AddPending(call *Call) {
bc.pendingLock.Lock()
if call.Seq == 0 {
bc.pendingLock.Unlock()
log.Stack("call is error.")
return
}
bc.pending[call.Seq] = call
bc.callTimerHeap.AddTimer(call.Seq,call.TimeOut)
bc.pendingLock.Unlock()
}
func (bc *CallSet) RemovePending(seq uint64) *Call {
if seq == 0 {
return nil
}
bc.pendingLock.Lock()
call := bc.removePending(seq)
bc.pendingLock.Unlock()
return call
}
func (bc *CallSet) removePending(seq uint64) *Call {
v, ok := bc.pending[seq]
if ok == false {
return nil
}
bc.callTimerHeap.Cancel(seq)
delete(bc.pending, seq)
return v
}
func (bc *CallSet) FindPending(seq uint64) (pCall *Call) {
if seq == 0 {
return nil
}
bc.pendingLock.Lock()
pCall = bc.pending[seq]
bc.pendingLock.Unlock()
return pCall
}
func (bc *CallSet) cleanPending(){
bc.pendingLock.Lock()
for {
callSeq := bc.callTimerHeap.PopFirst()
if callSeq == 0 {
break
}
pCall := bc.pending[callSeq]
if pCall == nil {
log.Error("call Seq is not find",log.Uint64("seq",callSeq))
continue
}
delete(bc.pending,callSeq)
pCall.Err = errors.New("nodeid is disconnect ")
bc.makeCallFail(pCall)
}
bc.pendingLock.Unlock()
}
func (bc *CallSet) generateSeq() uint64 {
return atomic.AddUint64(&bc.startSeq, 1)
}

View File

@@ -4,11 +4,9 @@ import (
"errors" "errors"
"github.com/duanhf2012/origin/v2/network" "github.com/duanhf2012/origin/v2/network"
"reflect" "reflect"
"strconv"
"sync"
"sync/atomic"
"time" "time"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"fmt"
) )
const( const(
@@ -17,8 +15,7 @@ const(
DefaultRpcMinMsgLen = 2 DefaultRpcMinMsgLen = 2
DefaultMaxCheckCallRpcCount = 1000 DefaultMaxCheckCallRpcCount = 1000
DefaultMaxPendingWriteNum = 200000 DefaultMaxPendingWriteNum = 200000
DefaultConnectInterval = 2*time.Second DefaultConnectInterval = 2*time.Second
DefaultCheckRpcCallTimeoutInterval = 1*time.Second DefaultCheckRpcCallTimeoutInterval = 1*time.Second
DefaultRpcTimeout = 15*time.Second DefaultRpcTimeout = 15*time.Second
@@ -26,29 +23,34 @@ const(
var clientSeq uint32 var clientSeq uint32
type IWriter interface {
WriteMsg (nodeId string,args ...[]byte) error
IsConnected() bool
}
type IRealClient interface { type IRealClient interface {
SetConn(conn *network.TCPConn) SetConn(conn *network.TCPConn)
Close(waitDone bool) Close(waitDone bool)
AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) AsyncCall(NodeId string,timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error)
Go(timeout time.Duration,rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call Go(NodeId string,timeout time.Duration,rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call RawGo(NodeId string,timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
IsConnected() bool IsConnected() bool
Run() Run()
OnClose() OnClose()
Bind(server IServer)
} }
type Client struct { type Client struct {
clientId uint32 clientId uint32
nodeId string targetNodeId string
pendingLock sync.RWMutex compressBytesLen int
startSeq uint64
pending map[uint64]*Call
callRpcTimeout time.Duration
maxCheckCallRpcCount int
callTimerHeap CallTimerHeap *CallSet
IRealClient IRealClient
} }
@@ -58,128 +60,218 @@ func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
return client return client
} }
func (bc *Client) makeCallFail(call *Call) { func (client *Client) GetTargetNodeId() string {
if call.callback != nil && call.callback.IsValid() { return client.targetNodeId
call.rpcHandler.PushRpcResponse(call)
} else {
call.done <- call
}
}
func (bc *Client) checkRpcCallTimeout() {
for{
time.Sleep(DefaultCheckRpcCallTimeoutInterval)
for i := 0; i < bc.maxCheckCallRpcCount; i++ {
bc.pendingLock.Lock()
callSeq := bc.callTimerHeap.PopTimeout()
if callSeq == 0 {
bc.pendingLock.Unlock()
break
}
pCall := bc.pending[callSeq]
if pCall == nil {
bc.pendingLock.Unlock()
log.Error("call seq is not find",log.Uint64("seq", callSeq))
continue
}
delete(bc.pending,callSeq)
strTimeout := strconv.FormatInt(int64(pCall.TimeOut.Seconds()), 10)
pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds,method is "+pCall.ServiceMethod)
log.Error("call timeout",log.String("error",pCall.Err.Error()))
bc.makeCallFail(pCall)
bc.pendingLock.Unlock()
continue
}
}
}
func (client *Client) InitPending() {
client.pendingLock.Lock()
client.callTimerHeap.Init()
client.pending = make(map[uint64]*Call,4096)
client.pendingLock.Unlock()
}
func (bc *Client) AddPending(call *Call) {
bc.pendingLock.Lock()
if call.Seq == 0 {
bc.pendingLock.Unlock()
log.Stack("call is error.")
return
}
bc.pending[call.Seq] = call
bc.callTimerHeap.AddTimer(call.Seq,call.TimeOut)
bc.pendingLock.Unlock()
}
func (bc *Client) RemovePending(seq uint64) *Call {
if seq == 0 {
return nil
}
bc.pendingLock.Lock()
call := bc.removePending(seq)
bc.pendingLock.Unlock()
return call
}
func (bc *Client) removePending(seq uint64) *Call {
v, ok := bc.pending[seq]
if ok == false {
return nil
}
bc.callTimerHeap.Cancel(seq)
delete(bc.pending, seq)
return v
}
func (bc *Client) FindPending(seq uint64) (pCall *Call) {
if seq == 0 {
return nil
}
bc.pendingLock.Lock()
pCall = bc.pending[seq]
bc.pendingLock.Unlock()
return pCall
}
func (bc *Client) cleanPending(){
bc.pendingLock.Lock()
for {
callSeq := bc.callTimerHeap.PopFirst()
if callSeq == 0 {
break
}
pCall := bc.pending[callSeq]
if pCall == nil {
log.Error("call Seq is not find",log.Uint64("seq",callSeq))
continue
}
delete(bc.pending,callSeq)
pCall.Err = errors.New("nodeid is disconnect ")
bc.makeCallFail(pCall)
}
bc.pendingLock.Unlock()
}
func (bc *Client) generateSeq() uint64 {
return atomic.AddUint64(&bc.startSeq, 1)
}
func (client *Client) GetNodeId() string {
return client.nodeId
} }
func (client *Client) GetClientId() uint32 { func (client *Client) GetClientId() uint32 {
return client.clientId return client.clientId
} }
func (client *Client) processRpcResponse(responseData []byte) error{
bCompress := (responseData[0]>>7) > 0
processor := GetProcessor(responseData[0]&0x7f)
if processor == nil {
//rc.conn.ReleaseReadMsg(responseData)
err:= errors.New(fmt.Sprintf("cannot find process %d",responseData[0]&0x7f))
log.Error(err.Error())
return err
}
//1.解析head
response := RpcResponse{}
response.RpcResponseData = processor.MakeRpcResponse(0, "", nil)
//解压缩
byteData := responseData[1:]
var compressBuff []byte
if bCompress == true {
var unCompressErr error
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
//rc.conn.ReleaseReadMsg(responseData)
err := fmt.Errorf("uncompressBlock failed,err :%s",unCompressErr.Error())
return err
}
byteData = compressBuff
}
err := processor.Unmarshal(byteData, response.RpcResponseData)
if cap(compressBuff) > 0 {
compressor.UnCompressBufferCollection(compressBuff)
}
//rc.conn.ReleaseReadMsg(bytes)
if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData)
log.Error("rpcClient Unmarshal head error",log.ErrorAttr("error",err))
return nil
}
v := client.RemovePending(response.RpcResponseData.GetSeq())
if v == nil {
log.Error("rpcClient cannot find seq",log.Uint64("seq",response.RpcResponseData.GetSeq()))
} else {
v.Err = nil
if len(response.RpcResponseData.GetReply()) > 0 {
err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply)
if err != nil {
log.Error("rpcClient Unmarshal body failed",log.ErrorAttr("error",err))
v.Err = err
}
}
if response.RpcResponseData.GetErr() != nil {
v.Err = response.RpcResponseData.GetErr()
}
if v.callback != nil && v.callback.IsValid() {
v.rpcHandler.PushRpcResponse(v)
} else {
v.done <- v
}
}
processor.ReleaseRpcResponse(response.RpcResponseData)
return nil
}
//func (rc *Client) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
// _, processor := GetProcessorType(args)
// InParam, err := processor.Marshal(args)
// if err != nil {
// log.Error("Marshal is fail",log.ErrorAttr("error",err))
// call := MakeCall()
// call.DoError(err)
// return call
// }
//
// return rc.RawGo(timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply)
//}
func (rc *Client) rawGo(nodeId string,w IWriter,timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
call := MakeCall()
call.ServiceMethod = serviceMethod
call.Reply = reply
call.Seq = rc.generateSeq()
call.TimeOut = timeout
request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs)
bytes, err := processor.Marshal(request.RpcRequestData)
ReleaseRpcRequest(request)
if err != nil {
call.Seq = 0
log.Error("marshal is fail",log.String("error",err.Error()))
call.DoError(err)
return call
}
if w == nil || w.IsConnected()==false {
call.Seq = 0
sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect")
log.Error("conn is disconnect",log.String("error",sErr.Error()))
call.DoError(sErr)
return call
}
var compressBuff[]byte
bCompress := uint8(0)
if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen {
var cErr error
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
call.Seq = 0
log.Error("compress fail",log.String("error",cErr.Error()))
call.DoError(cErr)
return call
}
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
if noReply == false {
rc.AddPending(call)
}
err = w.WriteMsg(nodeId,[]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if err != nil {
rc.RemovePending(call.Seq)
log.Error("WiteMsg is fail",log.ErrorAttr("error",err))
call.Seq = 0
call.DoError(err)
}
return call
}
func (rc *Client) asyncCall(nodeId string,w IWriter,timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) {
processorType, processor := GetProcessorType(args)
InParam, herr := processor.Marshal(args)
if herr != nil {
return emptyCancelRpc,herr
}
seq := rc.generateSeq()
request := MakeRpcRequest(processor, seq, 0, serviceMethod, false, InParam)
bytes, err := processor.Marshal(request.RpcRequestData)
ReleaseRpcRequest(request)
if err != nil {
return emptyCancelRpc,err
}
if w == nil || w.IsConnected()==false {
return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod)
}
var compressBuff[]byte
bCompress := uint8(0)
if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen {
var cErr error
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
return emptyCancelRpc,cErr
}
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
call := MakeCall()
call.Reply = replyParam
call.callback = &callback
call.rpcHandler = rpcHandler
call.ServiceMethod = serviceMethod
call.Seq = seq
call.TimeOut = timeout
rc.AddPending(call)
err = w.WriteMsg(nodeId,[]byte{uint8(processorType)|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if err != nil {
rc.RemovePending(call.Seq)
ReleaseCall(call)
return emptyCancelRpc,err
}
if cancelable {
rpcCancel := RpcCancel{CallSeq:seq,Cli: rc}
return rpcCancel.CancelRpc,nil
}
return emptyCancelRpc,nil
}

View File

@@ -37,7 +37,7 @@ func (lc *LClient) SetConn(conn *network.TCPConn){
func (lc *LClient) Close(waitDone bool){ func (lc *LClient) Close(waitDone bool){
} }
func (lc *LClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { func (lc *LClient) Go(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
pLocalRpcServer := rpcHandler.GetRpcServer()() pLocalRpcServer := rpcHandler.GetRpcServer()()
//判断是否是同一服务 //判断是否是同一服务
findIndex := strings.Index(serviceMethod, ".") findIndex := strings.Index(serviceMethod, ".")
@@ -70,7 +70,7 @@ func (lc *LClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool,
} }
func (rc *LClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call { func (rc *LClient) RawGo(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call {
pLocalRpcServer := rpcHandler.GetRpcServer()() pLocalRpcServer := rpcHandler.GetRpcServer()()
//服务自我调用 //服务自我调用
@@ -92,7 +92,7 @@ func (rc *LClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor
} }
func (lc *LClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{},cancelable bool) (CancelRpc,error) { func (lc *LClient) AsyncCall(nodeId string,timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{},cancelable bool) (CancelRpc,error) {
pLocalRpcServer := rpcHandler.GetRpcServer()() pLocalRpcServer := rpcHandler.GetRpcServer()()
//判断是否是同一服务 //判断是否是同一服务
@@ -119,17 +119,19 @@ func (lc *LClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi
return calcelRpc,nil return calcelRpc,nil
} }
func NewLClient(nodeId string) *Client{ func NewLClient(localNodeId string,callSet *CallSet) *Client{
client := &Client{} client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1) client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId client.targetNodeId = localNodeId
client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount //client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout //client.callRpcTimeout = DefaultRpcTimeout
lClient := &LClient{} lClient := &LClient{}
lClient.selfClient = client lClient.selfClient = client
client.IRealClient = lClient client.IRealClient = lClient
client.InitPending() client.CallSet = callSet
go client.checkRpcCallTimeout()
return client return client
} }
func (rc *LClient) Bind(server IServer){
}

291
rpc/lserver.go Normal file
View File

@@ -0,0 +1,291 @@
package rpc
import (
"errors"
"github.com/duanhf2012/origin/v2/log"
"reflect"
"time"
"strings"
"fmt"
)
type BaseServer struct {
localNodeId string
compressBytesLen int
rpcHandleFinder RpcHandleFinder
iServer IServer
}
func (ls *BaseServer) initBaseServer(compressBytesLen int,rpcHandleFinder RpcHandleFinder){
ls.compressBytesLen = compressBytesLen
ls.rpcHandleFinder = rpcHandleFinder
}
func (ls *BaseServer) myselfRpcHandlerGo(client *Client,handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error {
rpcHandler := ls.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.Error("service method not config",log.String("serviceMethod",serviceMethod))
return err
}
return rpcHandler.CallMethod(client,serviceMethod, args,callBack, reply)
}
func (ls *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call {
pCall := MakeCall()
pCall.Seq = client.generateSeq()
pCall.TimeOut = timeout
pCall.ServiceMethod = serviceMethod
rpcHandler := ls.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.Error("service method not config",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
pCall.Seq = 0
pCall.DoError(err)
return pCall
}
var iParam interface{}
if processor == nil {
_, processor = GetProcessorType(args)
}
if args != nil {
var err error
iParam,err = processor.Clone(args)
if err != nil {
sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.Error("deep copy inParam is failed",log.String("handlerName",handlerName),log.String("serviceMethod",serviceMethod))
pCall.Seq = 0
pCall.DoError(sErr)
return pCall
}
}
req := MakeRpcRequest(processor, 0, rpcMethodId, serviceMethod, noReply, nil)
req.inParam = iParam
req.localReply = reply
if rawArgs != nil {
var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil {
log.Error("unmarshalInParam is failed",log.String("serviceMethod",serviceMethod),log.Uint32("rpcMethodId",rpcMethodId),log.ErrorAttr("error",err))
pCall.Seq = 0
pCall.DoError(err)
ReleaseRpcRequest(req)
return pCall
}
}
if noReply == false {
client.AddPending(pCall)
callSeq := pCall.Seq
req.requestHandle = func(Returns interface{}, Err RpcError) {
if reply != nil && Returns != reply && Returns != nil {
byteReturns, err := req.rpcProcessor.Marshal(Returns)
if err != nil {
Err = ConvertError(err)
log.Error("returns data cannot be marshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err))
}else{
err = req.rpcProcessor.Unmarshal(byteReturns, reply)
if err != nil {
Err = ConvertError(err)
log.Error("returns data cannot be Unmarshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err))
}
}
}
ReleaseRpcRequest(req)
v := client.RemovePending(callSeq)
if v == nil {
log.Error("rpcClient cannot find seq",log.Uint64("seq",callSeq))
return
}
if len(Err) == 0 {
v.Err = nil
v.DoOK()
} else {
log.Error(Err.Error())
v.DoError(Err)
}
}
}
err := rpcHandler.PushRpcRequest(req)
if err != nil {
log.Error(err.Error())
pCall.DoError(err)
ReleaseRpcRequest(req)
}
return pCall
}
func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value,cancelable bool) (CancelRpc,error) {
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.Error(err.Error())
return emptyCancelRpc,err
}
_, processor := GetProcessorType(args)
iParam,err := processor.Clone(args)
if err != nil {
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.Error(errM.Error())
return emptyCancelRpc,errM
}
req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil)
req.inParam = iParam
req.localReply = reply
cancelRpc := emptyCancelRpc
var callSeq uint64
if noReply == false {
callSeq = client.generateSeq()
pCall := MakeCall()
pCall.Seq = callSeq
pCall.rpcHandler = callerRpcHandler
pCall.callback = &callback
pCall.Reply = reply
pCall.ServiceMethod = serviceMethod
pCall.TimeOut = timeout
client.AddPending(pCall)
rpcCancel := RpcCancel{CallSeq: callSeq,Cli: client}
cancelRpc = rpcCancel.CancelRpc
req.requestHandle = func(Returns interface{}, Err RpcError) {
v := client.RemovePending(callSeq)
if v == nil {
ReleaseRpcRequest(req)
return
}
if len(Err) == 0 {
v.Err = nil
} else {
v.Err = Err
}
if Returns != nil {
v.Reply = Returns
}
v.rpcHandler.PushRpcResponse(v)
ReleaseRpcRequest(req)
}
}
err = rpcHandler.PushRpcRequest(req)
if err != nil {
ReleaseRpcRequest(req)
if callSeq > 0 {
client.RemovePending(callSeq)
}
return emptyCancelRpc,err
}
return cancelRpc,nil
}
func (bs *BaseServer) processRpcRequest(data []byte,connTag string,wrResponse writeResponse) error{
bCompress := (data[0]>>7) > 0
processor := GetProcessor(data[0]&0x7f)
if processor == nil {
return errors.New("cannot find processor")
}
//解析head
var compressBuff []byte
byteData := data[1:]
if bCompress == true {
var unCompressErr error
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
return errors.New("UncompressBlock failed")
}
byteData = compressBuff
}
req := MakeRpcRequest(processor, 0, 0, "", false, nil)
err := processor.Unmarshal(byteData, req.RpcRequestData)
if cap(compressBuff) > 0 {
compressor.UnCompressBufferCollection(compressBuff)
}
if err != nil {
if req.RpcRequestData.GetSeq() > 0 {
rpcError := RpcError(err.Error())
if req.RpcRequestData.IsNoReply() == false {
wrResponse(processor,connTag, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
}
ReleaseRpcRequest(req)
return err
}
//交给程序处理
serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(), ".")
if len(serviceMethod) < 1 {
rpcError := RpcError("rpc request req.ServiceMethod is error")
if req.RpcRequestData.IsNoReply() == false {
wrResponse(processor,connTag, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
ReleaseRpcRequest(req)
log.Error("rpc request req.ServiceMethod is error")
return nil
}
rpcHandler := bs.rpcHandleFinder.FindRpcHandler(serviceMethod[0])
if rpcHandler == nil {
rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod()))
if req.RpcRequestData.IsNoReply() == false {
wrResponse(processor,connTag, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
log.Error("serviceMethod not config",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()))
ReleaseRpcRequest(req)
return nil
}
if req.RpcRequestData.IsNoReply() == false {
req.requestHandle = func(Returns interface{}, Err RpcError) {
wrResponse(processor,connTag, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), Returns, Err)
ReleaseRpcRequest(req)
}
}
req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam())
if err != nil {
rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error()
log.Error("call rpc param error",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()),log.ErrorAttr("error",err))
if req.requestHandle != nil {
req.requestHandle(nil, RpcError(rErr))
} else {
ReleaseRpcRequest(req)
}
return nil
}
err = rpcHandler.PushRpcRequest(req)
if err != nil {
rpcError := RpcError(err.Error())
if req.RpcRequestData.IsNoReply() {
wrResponse(processor, connTag,req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
ReleaseRpcRequest(req)
}
return nil
}

89
rpc/natsclient.go Normal file
View File

@@ -0,0 +1,89 @@
package rpc
import (
"github.com/duanhf2012/origin/v2/network"
"reflect"
"time"
"github.com/nats-io/nats.go"
"github.com/duanhf2012/origin/v2/log"
)
//跨结点连接的Client
type NatsClient struct {
localNodeId string
natsConn *nats.Conn
client *Client
}
func (nc *NatsClient) Start(natsConn *nats.Conn) error{
nc.natsConn = natsConn
_,err := nc.natsConn.QueueSubscribe("oc."+nc.localNodeId, "oc",nc.onSubscribe)
return err
}
func (nc *NatsClient) onSubscribe(msg *nats.Msg){
//处理消息
nc.client.processRpcResponse(msg.Data)
}
func (nc *NatsClient) SetConn(conn *network.TCPConn){
}
func (nc *NatsClient) Close(waitDone bool){
}
func (nc *NatsClient) Run(){
}
func (nc *NatsClient) OnClose(){
}
func (rc *NatsClient) Bind(server IServer){
s := server.(*NatsServer)
rc.natsConn = s.natsConn
}
func (rc *NatsClient) Go(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
_, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args)
if err != nil {
log.Error("Marshal is fail",log.ErrorAttr("error",err))
call := MakeCall()
call.DoError(err)
return call
}
return rc.client.rawGo(nodeId,rc,timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply)
}
func (rc *NatsClient) RawGo(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
return rc.client.rawGo(nodeId,rc,timeout,rpcHandler,processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
}
func (rc *NatsClient) AsyncCall(nodeId string,timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) {
cancelRpc,err := rc.client.asyncCall(nodeId,rc,timeout,rpcHandler, serviceMethod, callback, args, replyParam,cancelable)
if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
}
return cancelRpc,nil
}
func (rc *NatsClient) WriteMsg (nodeId string,args ...[]byte) error{
buff := make([]byte,0,4096)
for _,ar := range args {
buff = append(buff,ar...)
}
var msg nats.Msg
msg.Subject = "os."+nodeId
msg.Data = buff
msg.Header = nats.Header{}
msg.Header.Set("fnode",rc.localNodeId)
return rc.natsConn.PublishMsg(&msg)
}
func (rc *NatsClient) IsConnected() bool{
return rc.natsConn.Status() == nats.CONNECTED
}

119
rpc/natsserver.go Normal file
View File

@@ -0,0 +1,119 @@
package rpc
import (
"github.com/nats-io/nats.go"
"github.com/duanhf2012/origin/v2/log"
"time"
)
type NatsServer struct {
BaseServer
natsUrl string
natsConn *nats.Conn
NoRandomize bool
nodeSubTopic string
compressBytesLen int
}
func (ns *NatsServer) Start() error{
var err error
var options []nats.Option
options = append(options,nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Error("nats is disconnect",log.String("connUrl",nc.ConnectedUrl()))
// handle disconnect error event
}))
options = append(options,nats.ReconnectHandler(func(nc *nats.Conn) {
log.Error("nats is reconnection",log.String("connUrl",nc.ConnectedUrl()))
// handle reconnect event
}))
options = append(options,nats.MaxReconnects(-1))
options = append(options,nats.ReconnectWait(time.Second*3))
if ns.NoRandomize {
options = append(options,nats.DontRandomize())
}
ns.natsConn,err = nats.Connect(ns.natsUrl,options...)
if err != nil {
log.Error("Connect to nats fail",log.String("natsUrl",ns.natsUrl),log.ErrorAttr("err",err))
return err
}
//开始订阅
_,err = ns.natsConn.QueueSubscribe(ns.nodeSubTopic,"os", func(msg *nats.Msg) {
ns.processRpcRequest(msg.Data,msg.Header.Get("fnode"),ns.WriteResponse)
})
return err
}
func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string,serviceMethod string, seq uint64, reply interface{}, rpcError RpcError){
var mReply []byte
var err error
if reply != nil {
mReply, err = processor.Marshal(reply)
if err != nil {
rpcError = ConvertError(err)
}
}
var rpcResponse RpcResponse
rpcResponse.RpcResponseData = processor.MakeRpcResponse(seq, rpcError, mReply)
bytes, err := processor.Marshal(rpcResponse.RpcResponseData)
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if err != nil {
log.Error("mashal RpcResponseData failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
return
}
var compressBuff[]byte
bCompress := uint8(0)
if ns.compressBytesLen >0 && len(bytes) >= ns.compressBytesLen {
compressBuff,err = compressor.CompressBlock(bytes)
if err != nil {
log.Error("CompressBlock failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
return
}
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
sendData := make([]byte,0,4096)
byteTypeAndCompress := []byte{uint8(processor.GetProcessorType())|bCompress}
sendData = append(sendData,byteTypeAndCompress...)
sendData = append(sendData,bytes...)
err = ns.natsConn.PublishMsg(&nats.Msg{Subject: "oc."+nodeId, Data: sendData})
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if err != nil {
log.Error("WriteMsg error,Rpc return is fail",log.String("nodeId",nodeId),log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
}
}
func (ns *NatsServer) Stop(){
if ns.natsConn != nil {
ns.natsConn.Close()
}
}
func (ns *NatsServer) initServer(natsUrl string, noRandomize bool,localNodeId string,compressBytesLen int,rpcHandleFinder RpcHandleFinder){
ns.natsUrl = natsUrl
ns.NoRandomize = noRandomize
ns.localNodeId = localNodeId
ns.compressBytesLen = compressBytesLen
ns.initBaseServer(compressBytesLen,rpcHandleFinder)
ns.nodeSubTopic = "os."+localNodeId //服务器
}

View File

@@ -1,7 +1,6 @@
package rpc package rpc
import ( import (
"errors"
"fmt" "fmt"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/network" "github.com/duanhf2012/origin/v2/network"
@@ -14,7 +13,6 @@ import (
//跨结点连接的Client //跨结点连接的Client
type RClient struct { type RClient struct {
compressBytesLen int
selfClient *Client selfClient *Client
network.TCPClient network.TCPClient
conn *network.TCPConn conn *network.TCPConn
@@ -42,7 +40,11 @@ func (rc *RClient) SetConn(conn *network.TCPConn){
rc.Unlock() rc.Unlock()
} }
func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { func (rc *RClient) WriteMsg (nodeId string,args ...[]byte) error{
return rc.conn.WriteMsg(args...)
}
func (rc *RClient) Go(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args) InParam, err := processor.Marshal(args)
if err != nil { if err != nil {
@@ -52,74 +54,15 @@ func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool,
return call return call
} }
return rc.RawGo(timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply) return rc.selfClient.rawGo(nodeId,rc,timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply)
} }
func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call { func (rc *RClient) RawGo(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
call := MakeCall() return rc.selfClient.rawGo(nodeId,rc,timeout,rpcHandler,processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
call.ServiceMethod = serviceMethod
call.Reply = reply
call.Seq = rc.selfClient.generateSeq()
call.TimeOut = timeout
request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs)
bytes, err := processor.Marshal(request.RpcRequestData)
ReleaseRpcRequest(request)
if err != nil {
call.Seq = 0
log.Error("marshal is fail",log.String("error",err.Error()))
call.DoError(err)
return call
}
conn := rc.GetConn()
if conn == nil || conn.IsConnected()==false {
call.Seq = 0
sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect")
log.Error("conn is disconnect",log.String("error",sErr.Error()))
call.DoError(sErr)
return call
}
var compressBuff[]byte
bCompress := uint8(0)
if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen {
var cErr error
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
call.Seq = 0
log.Error("compress fail",log.String("error",cErr.Error()))
call.DoError(cErr)
return call
}
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
if noReply == false {
rc.selfClient.AddPending(call)
}
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if err != nil {
rc.selfClient.RemovePending(call.Seq)
log.Error("WiteMsg is fail",log.ErrorAttr("error",err))
call.Seq = 0
call.DoError(err)
}
return call
} }
func (rc *RClient) AsyncCall(nodeId string,timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) {
func (rc *RClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) { cancelRpc,err := rc.selfClient.asyncCall(nodeId,rc,timeout,rpcHandler, serviceMethod, callback, args, replyParam,cancelable)
cancelRpc,err := rc.asyncCall(timeout,rpcHandler, serviceMethod, callback, args, replyParam,cancelable)
if err != nil { if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)}) callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
} }
@@ -127,68 +70,6 @@ func (rc *RClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi
return cancelRpc,nil return cancelRpc,nil
} }
func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) {
processorType, processor := GetProcessorType(args)
InParam, herr := processor.Marshal(args)
if herr != nil {
return emptyCancelRpc,herr
}
seq := rc.selfClient.generateSeq()
request := MakeRpcRequest(processor, seq, 0, serviceMethod, false, InParam)
bytes, err := processor.Marshal(request.RpcRequestData)
ReleaseRpcRequest(request)
if err != nil {
return emptyCancelRpc,err
}
conn := rc.GetConn()
if conn == nil || conn.IsConnected()==false {
return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod)
}
var compressBuff[]byte
bCompress := uint8(0)
if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen {
var cErr error
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
return emptyCancelRpc,cErr
}
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
call := MakeCall()
call.Reply = replyParam
call.callback = &callback
call.rpcHandler = rpcHandler
call.ServiceMethod = serviceMethod
call.Seq = seq
call.TimeOut = timeout
rc.selfClient.AddPending(call)
err = conn.WriteMsg([]byte{uint8(processorType)|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if err != nil {
rc.selfClient.RemovePending(call.Seq)
ReleaseCall(call)
return emptyCancelRpc,err
}
if cancelable {
rpcCancel := RpcCancel{CallSeq:seq,Cli: rc.selfClient}
return rpcCancel.CancelRpc,nil
}
return emptyCancelRpc,nil
}
func (rc *RClient) Run() { func (rc *RClient) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@@ -199,7 +80,7 @@ func (rc *RClient) Run() {
} }
}() }()
rc.TriggerRpcConnEvent(true, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId()) rc.TriggerRpcConnEvent(true, rc.selfClient.GetClientId(), rc.selfClient.GetTargetNodeId())
for { for {
bytes, err := rc.conn.ReadMsg() bytes, err := rc.conn.ReadMsg()
if err != nil { if err != nil {
@@ -207,85 +88,27 @@ func (rc *RClient) Run() {
return return
} }
bCompress := (bytes[0]>>7) > 0 err = rc.selfClient.processRpcResponse(bytes)
processor := GetProcessor(bytes[0]&0x7f)
if processor == nil {
rc.conn.ReleaseReadMsg(bytes)
log.Error("cannot find process",log.Uint8("process type",bytes[0]&0x7f))
return
}
//1.解析head
response := RpcResponse{}
response.RpcResponseData = processor.MakeRpcResponse(0, "", nil)
//解压缩
byteData := bytes[1:]
var compressBuff []byte
if bCompress == true {
var unCompressErr error
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
rc.conn.ReleaseReadMsg(bytes)
log.Error("uncompressBlock failed",log.ErrorAttr("error",unCompressErr))
return
}
byteData = compressBuff
}
err = processor.Unmarshal(byteData, response.RpcResponseData)
if cap(compressBuff) > 0 {
compressor.UnCompressBufferCollection(compressBuff)
}
rc.conn.ReleaseReadMsg(bytes) rc.conn.ReleaseReadMsg(bytes)
if err != nil { if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData) return
log.Error("rpcClient Unmarshal head error",log.ErrorAttr("error",err))
continue
} }
v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq())
if v == nil {
log.Error("rpcClient cannot find seq",log.Uint64("seq",response.RpcResponseData.GetSeq()))
} else {
v.Err = nil
if len(response.RpcResponseData.GetReply()) > 0 {
err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply)
if err != nil {
log.Error("rpcClient Unmarshal body failed",log.ErrorAttr("error",err))
v.Err = err
}
}
if response.RpcResponseData.GetErr() != nil {
v.Err = response.RpcResponseData.GetErr()
}
if v.callback != nil && v.callback.IsValid() {
v.rpcHandler.PushRpcResponse(v)
} else {
v.done <- v
}
}
processor.ReleaseRpcResponse(response.RpcResponseData)
} }
} }
func (rc *RClient) OnClose() { func (rc *RClient) OnClose() {
rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId()) rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetTargetNodeId())
} }
func NewRClient(nodeId string, addr string, maxRpcParamLen uint32,compressBytesLen int,triggerRpcConnEvent TriggerRpcConnEvent) *Client{ func NewRClient(targetNodeId string, addr string, maxRpcParamLen uint32,compressBytesLen int,triggerRpcConnEvent TriggerRpcConnEvent,callSet *CallSet) *Client{
client := &Client{} client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1) client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId client.targetNodeId = targetNodeId
client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount //client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout //client.callRpcTimeout = DefaultRpcTimeout
client.compressBytesLen = compressBytesLen
c:= &RClient{} c:= &RClient{}
c.compressBytesLen = compressBytesLen
c.selfClient = client c.selfClient = client
c.Addr = addr c.Addr = addr
c.ConnectInterval = DefaultConnectInterval c.ConnectInterval = DefaultConnectInterval
@@ -306,8 +129,7 @@ func NewRClient(nodeId string, addr string, maxRpcParamLen uint32,compressBytesL
c.MaxMsgLen = math.MaxUint32 c.MaxMsgLen = math.MaxUint32
} }
client.IRealClient = c client.IRealClient = c
client.InitPending() client.CallSet = callSet
go client.checkRpcCallTimeout()
c.Start() c.Start()
return client return client
} }
@@ -318,3 +140,7 @@ func (rc *RClient) Close(waitDone bool) {
rc.selfClient.cleanPending() rc.selfClient.cleanPending()
} }
func (rc *RClient) Bind(server IServer){
}

View File

@@ -15,7 +15,7 @@ import (
const maxClusterNode int = 128 const maxClusterNode int = 128
type FuncRpcClient func(nodeId string, serviceMethod string,filterRetire bool, client []*Client) (error, int) type FuncRpcClient func(nodeId string, serviceMethod string,filterRetire bool, client []*Client) (error, int)
type FuncRpcServer func() *Server type FuncRpcServer func() IServer
const NodeIdNull = "" const NodeIdNull = ""
var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())
@@ -445,7 +445,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
//2.rpcClient调用 //2.rpcClient调用
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
pCall := pClientList[i].Go(DefaultRpcTimeout,handler.rpcHandler,true, serviceMethod, args, nil) pCall := pClientList[i].Go(pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,true, serviceMethod, args, nil)
if pCall.Err != nil { if pCall.Err != nil {
err = pCall.Err err = pCall.Err
} }
@@ -472,7 +472,7 @@ func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId string, serviceM
} }
pClient := pClientList[0] pClient := pClientList[0]
pCall := pClient.Go(timeout,handler.rpcHandler,false, serviceMethod, args, reply) pCall := pClient.Go(pClient.GetTargetNodeId(),timeout,handler.rpcHandler,false, serviceMethod, args, reply)
err = pCall.Done().Err err = pCall.Done().Err
pClient.RemovePending(pCall.Seq) pClient.RemovePending(pCall.Seq)
@@ -525,7 +525,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId string, ser
//2.rpcClient调用 //2.rpcClient调用
//如果调用本结点服务 //如果调用本结点服务
return pClientList[0].AsyncCall(timeout,handler.rpcHandler, serviceMethod, fVal, args, reply,false) return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(),timeout,handler.rpcHandler, serviceMethod, fVal, args, reply,false)
} }
func (handler *RpcHandler) GetName() string { func (handler *RpcHandler) GetName() string {
@@ -600,7 +600,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId s
//如果调用本结点服务 //如果调用本结点服务
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
//跨node调用 //跨node调用
pCall := handler.pClientList[i].RawGo(DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil) pCall := handler.pClientList[i].RawGo(handler.pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
if pCall.Err != nil { if pCall.Err != nil {
err = pCall.Err err = pCall.Err
} }

41
rpc/rpcnats.go Normal file
View File

@@ -0,0 +1,41 @@
package rpc
import "sync/atomic"
type RpcNats struct {
NatsServer
NatsClient
}
func (rn *RpcNats) Start() error{
err := rn.NatsServer.Start()
if err != nil {
return err
}
return rn.NatsClient.Start(rn.NatsServer.natsConn)
}
func (rn *RpcNats) Init(natsUrl string, noRandomize bool, nodeId string,compressBytesLen int,rpcHandleFinder RpcHandleFinder){
rn.NatsClient.localNodeId = nodeId
rn.NatsServer.initServer(natsUrl,noRandomize, nodeId,compressBytesLen,rpcHandleFinder)
rn.NatsServer.iServer = rn
}
func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet *CallSet) *Client{
var client Client
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.targetNodeId = targetNodeId
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
//client.callRpcTimeout = DefaultRpcTimeout
natsClient := &rn.NatsClient
natsClient.localNodeId = localNodeId
natsClient.client = &client
client.IRealClient = natsClient
client.CallSet = callSet
return &client
}

View File

@@ -1,7 +1,6 @@
package rpc package rpc
import ( import (
"errors"
"fmt" "fmt"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/network" "github.com/duanhf2012/origin/v2/network"
@@ -13,6 +12,7 @@ import (
"runtime" "runtime"
) )
const Default_ReadWriteDeadline = 15*time.Second
type RpcProcessorType uint8 type RpcProcessorType uint8
const ( const (
@@ -24,12 +24,25 @@ var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &PBProcessor{}}
var arrayProcessorLen uint8 = 2 var arrayProcessorLen uint8 = 2
var LittleEndian bool var LittleEndian bool
type IServer interface {
Start() error
Stop()
selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call
myselfRpcHandlerGo(client *Client,handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error
selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value,cancelable bool) (CancelRpc,error)
}
type writeResponse func(processor IRpcProcessor,connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError)
type Server struct { type Server struct {
BaseServer
functions map[interface{}]interface{} functions map[interface{}]interface{}
rpcHandleFinder RpcHandleFinder
rpcServer *network.TCPServer rpcServer *network.TCPServer
compressBytesLen int listenAddr string
maxRpcParamLen uint32
} }
type RpcAgent struct { type RpcAgent struct {
@@ -60,24 +73,25 @@ func GetProcessor(processorType uint8) IRpcProcessor {
return arrayProcessor[processorType] return arrayProcessor[processorType]
} }
func (server *Server) Init(rpcHandleFinder RpcHandleFinder) { func (server *Server) Init(listenAddr string, maxRpcParamLen uint32,compressBytesLen int,rpcHandleFinder RpcHandleFinder) {
server.rpcHandleFinder = rpcHandleFinder server.initBaseServer(compressBytesLen,rpcHandleFinder)
server.listenAddr = listenAddr
server.maxRpcParamLen = maxRpcParamLen
server.rpcServer = &network.TCPServer{} server.rpcServer = &network.TCPServer{}
} }
const Default_ReadWriteDeadline = 15*time.Second func (server *Server) Start() error{
splitAddr := strings.Split(server.listenAddr, ":")
func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressBytesLen int) {
splitAddr := strings.Split(listenAddr, ":")
if len(splitAddr) != 2 { if len(splitAddr) != 2 {
log.Fatal("listen addr is failed", log.String("listenAddr",listenAddr)) return fmt.Errorf("listen addr is failed,listenAddr:%s", server.listenAddr)
} }
server.rpcServer.Addr = ":" + splitAddr[1] server.rpcServer.Addr = ":" + splitAddr[1]
server.rpcServer.MinMsgLen = 2 server.rpcServer.MinMsgLen = 2
server.compressBytesLen = compressBytesLen server.compressBytesLen = server.compressBytesLen
if maxRpcParamLen > 0 { if server.maxRpcParamLen > 0 {
server.rpcServer.MaxMsgLen = maxRpcParamLen server.rpcServer.MaxMsgLen = server.maxRpcParamLen
} else { } else {
server.rpcServer.MaxMsgLen = math.MaxUint32 server.rpcServer.MaxMsgLen = math.MaxUint32
} }
@@ -90,12 +104,16 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressByt
server.rpcServer.ReadDeadline = Default_ReadWriteDeadline server.rpcServer.ReadDeadline = Default_ReadWriteDeadline
server.rpcServer.LenMsgLen = DefaultRpcLenMsgLen server.rpcServer.LenMsgLen = DefaultRpcLenMsgLen
server.rpcServer.Start() return server.rpcServer.Start()
}
func (server *Server) Stop(){
server.rpcServer.Close()
} }
func (agent *RpcAgent) OnDestroy() {} func (agent *RpcAgent) OnDestroy() {}
func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError) { func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError) {
var mReply []byte var mReply []byte
var errM error var errM error
@@ -159,102 +177,12 @@ func (agent *RpcAgent) Run() {
break break
} }
bCompress := (data[0]>>7) > 0 defer agent.conn.ReleaseReadMsg(data)
processor := GetProcessor(data[0]&0x7f) err = agent.rpcServer.processRpcRequest( data,"",agent.WriteResponse)
if processor == nil {
agent.conn.ReleaseReadMsg(data)
log.Warning("cannot find processor",log.String("RemoteAddr",agent.conn.RemoteAddr().String()))
return
}
//解析head
var compressBuff []byte
byteData := data[1:]
if bCompress == true {
var unCompressErr error
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
agent.conn.ReleaseReadMsg(data)
log.Error("UncompressBlock failed",log.String("RemoteAddr",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",unCompressErr))
return
}
byteData = compressBuff
}
req := MakeRpcRequest(processor, 0, 0, "", false, nil)
err = processor.Unmarshal(byteData, req.RpcRequestData)
if cap(compressBuff) > 0 {
compressor.UnCompressBufferCollection(compressBuff)
}
agent.conn.ReleaseReadMsg(data)
if err != nil { if err != nil {
log.Error("Unmarshal failed",log.String("RemoteAddr",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err)) log.Error("processRpcRequest is error",log.String("remoteAddress",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err))
if req.RpcRequestData.GetSeq() > 0 { //will close tcpconn
rpcError := RpcError(err.Error()) break
if req.RpcRequestData.IsNoReply() == false {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
ReleaseRpcRequest(req)
continue
} else {
ReleaseRpcRequest(req)
break
}
}
//交给程序处理
serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(), ".")
if len(serviceMethod) < 1 {
rpcError := RpcError("rpc request req.ServiceMethod is error")
if req.RpcRequestData.IsNoReply() == false {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
ReleaseRpcRequest(req)
log.Error("rpc request req.ServiceMethod is error")
continue
}
rpcHandler := agent.rpcServer.rpcHandleFinder.FindRpcHandler(serviceMethod[0])
if rpcHandler == nil {
rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod()))
if req.RpcRequestData.IsNoReply() == false {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
log.Error("serviceMethod not config",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()))
ReleaseRpcRequest(req)
continue
}
if req.RpcRequestData.IsNoReply() == false {
req.requestHandle = func(Returns interface{}, Err RpcError) {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), Returns, Err)
ReleaseRpcRequest(req)
}
}
req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam())
if err != nil {
rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error()
log.Error("call rpc param error",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()),log.ErrorAttr("error",err))
if req.requestHandle != nil {
req.requestHandle(nil, RpcError(rErr))
} else {
ReleaseRpcRequest(req)
}
continue
}
err = rpcHandler.PushRpcRequest(req)
if err != nil {
rpcError := RpcError(err.Error())
if req.RpcRequestData.IsNoReply() {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
ReleaseRpcRequest(req)
} }
} }
} }
@@ -287,174 +215,3 @@ func (server *Server) NewAgent(c *network.TCPConn) network.Agent {
return agent return agent
} }
func (server *Server) myselfRpcHandlerGo(client *Client,handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error {
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.Error("service method not config",log.String("serviceMethod",serviceMethod))
return err
}
return rpcHandler.CallMethod(client,serviceMethod, args,callBack, reply)
}
func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call {
pCall := MakeCall()
pCall.Seq = client.generateSeq()
pCall.TimeOut = timeout
pCall.ServiceMethod = serviceMethod
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.Error("service method not config",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
pCall.Seq = 0
pCall.DoError(err)
return pCall
}
var iParam interface{}
if processor == nil {
_, processor = GetProcessorType(args)
}
if args != nil {
var err error
iParam,err = processor.Clone(args)
if err != nil {
sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.Error("deep copy inParam is failed",log.String("handlerName",handlerName),log.String("serviceMethod",serviceMethod))
pCall.Seq = 0
pCall.DoError(sErr)
return pCall
}
}
req := MakeRpcRequest(processor, 0, rpcMethodId, serviceMethod, noReply, nil)
req.inParam = iParam
req.localReply = reply
if rawArgs != nil {
var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil {
log.Error("unmarshalInParam is failed",log.String("serviceMethod",serviceMethod),log.Uint32("rpcMethodId",rpcMethodId),log.ErrorAttr("error",err))
pCall.Seq = 0
pCall.DoError(err)
ReleaseRpcRequest(req)
return pCall
}
}
if noReply == false {
client.AddPending(pCall)
callSeq := pCall.Seq
req.requestHandle = func(Returns interface{}, Err RpcError) {
if reply != nil && Returns != reply && Returns != nil {
byteReturns, err := req.rpcProcessor.Marshal(Returns)
if err != nil {
Err = ConvertError(err)
log.Error("returns data cannot be marshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err))
}else{
err = req.rpcProcessor.Unmarshal(byteReturns, reply)
if err != nil {
Err = ConvertError(err)
log.Error("returns data cannot be Unmarshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err))
}
}
}
ReleaseRpcRequest(req)
v := client.RemovePending(callSeq)
if v == nil {
log.Error("rpcClient cannot find seq",log.Uint64("seq",callSeq))
return
}
if len(Err) == 0 {
v.Err = nil
v.DoOK()
} else {
log.Error(Err.Error())
v.DoError(Err)
}
}
}
err := rpcHandler.PushRpcRequest(req)
if err != nil {
log.Error(err.Error())
pCall.DoError(err)
ReleaseRpcRequest(req)
}
return pCall
}
func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value,cancelable bool) (CancelRpc,error) {
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.Error(err.Error())
return emptyCancelRpc,err
}
_, processor := GetProcessorType(args)
iParam,err := processor.Clone(args)
if err != nil {
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.Error(errM.Error())
return emptyCancelRpc,errM
}
req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil)
req.inParam = iParam
req.localReply = reply
cancelRpc := emptyCancelRpc
var callSeq uint64
if noReply == false {
callSeq = client.generateSeq()
pCall := MakeCall()
pCall.Seq = callSeq
pCall.rpcHandler = callerRpcHandler
pCall.callback = &callback
pCall.Reply = reply
pCall.ServiceMethod = serviceMethod
pCall.TimeOut = timeout
client.AddPending(pCall)
rpcCancel := RpcCancel{CallSeq: callSeq,Cli: client}
cancelRpc = rpcCancel.CancelRpc
req.requestHandle = func(Returns interface{}, Err RpcError) {
v := client.RemovePending(callSeq)
if v == nil {
ReleaseRpcRequest(req)
return
}
if len(Err) == 0 {
v.Err = nil
} else {
v.Err = Err
}
if Returns != nil {
v.Reply = Returns
}
v.rpcHandler.PushRpcResponse(v)
ReleaseRpcRequest(req)
}
}
err = rpcHandler.PushRpcRequest(req)
if err != nil {
ReleaseRpcRequest(req)
if callSeq > 0 {
client.RemovePending(callSeq)
}
return emptyCancelRpc,err
}
return cancelRpc,nil
}