From 5344da1276f365de3ac658dab9f84bf33bdcf60e Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Tue, 31 Mar 2020 18:29:00 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=87=AA=E5=AE=9A=E4=B9=89?= =?UTF-8?q?=E6=97=A5=E5=BF=97=EF=BC=8C=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E8=BF=94=E5=9B=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 3 ++- example/main.go | 47 ++++++++++++++++++++++++++++------------------ node/node.go | 10 +++++++++- rpc/client.go | 12 +++--------- rpc/rpchandler.go | 11 +++++++---- 5 files changed, 50 insertions(+), 33 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 64d4443..c8c5be5 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -93,6 +93,7 @@ func (slf *Cluster) Start() { slf.rpcServer.Start(slf.localNodeInfo.ListenAddr) } + func GetCluster() *Cluster{ return &cluster } @@ -143,4 +144,4 @@ func GetRpcClient(nodeId int,serviceMethod string) ([]*rpc.Client,error) { func GetRpcServer() *rpc.Server{ return &cluster.rpcServer -} \ No newline at end of file +} diff --git a/example/main.go b/example/main.go index 4637631..f668994 100644 --- a/example/main.go +++ b/example/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" "github.com/duanhf2012/origin/example/GateService" + "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/node" "github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/sysmodule" @@ -91,6 +92,7 @@ func (slf *Module4) OnRelease() { func (slf *TestServiceCall) OnInit() error { slf.AfterFunc(time.Second*1,slf.Run) + slf.AfterFunc(time.Second*1,slf.Test) moduleid1,_ = slf.AddModule(&Module1{}) moduleid2,_ = slf.AddModule(&Module2{}) fmt.Print(moduleid1,moduleid2) @@ -118,38 +120,47 @@ type Param struct { Pa []string } - -func (slf *TestServiceCall) Run(){ - //var ret int - //var input int = 10000 - //bT := time.Now() // 开始时间 - - //err := slf.Call("TestServiceCall.RPC_Test",&ret,&input) +var index int +func (slf *TestServiceCall) Test(){ + index += 1 var param Param param.A = 2342342341 param.B = "xxxxxxxxxxxxxxxxxxxxxxx" param.Pa = []string{"ccccc","asfsdfsdaf","bbadfsdf","ewrwefasdf","safsadfka;fksd"} - /* + param.Index = index + slf.AsyncCall("TestService1.RPC_Test",¶m, func(reply *Param, err error) { + fmt.Print(reply,"\n") + }) + slf.AfterFunc(time.Second*1,slf.Test) +} + +func (slf *TestServiceCall) Run(){ + //var ret int + var input int = 1000000 + //bT := time.Now() // 开始时间 + + //err := slf.Call("TestServiceCall.RPC_Test",&ret,&input) for i:=input;i>=0;i--{ + var param Param + param.A = 2342342341 + param.B = "xxxxxxxxxxxxxxxxxxxxxxx" + param.Pa = []string{"ccccc","asfsdfsdaf","bbadfsdf","ewrwefasdf","safsadfka;fksd"} param.Index = i + if param.Index == 0 { + fmt.Print(".......................\n") + } slf.AsyncCall("TestService1.RPC_Test",¶m, func(reply *Param, err error) { - if reply.Index == 0 || err != nil{ - eT := time.Since(bT) // 从开始到当前所消耗的时间 - fmt.Print(err,eT.Milliseconds()) - fmt.Print("xxxx..................",eT,err,"\n") - } - //fmt.Print(*reply,"\n",err) + log.Debug(" index %d ,err %+v",reply.Index,err) }) } -*/ + fmt.Print("finsh....") - - } func (slf *TestService1) RPC_Test(a *Param,b *Param) error { - *a = *b + //*a = *b + *b = *a return nil } diff --git a/node/node.go b/node/node.go index 6e8dfda..661b599 100644 --- a/node/node.go +++ b/node/node.go @@ -4,8 +4,10 @@ import ( "fmt" "github.com/duanhf2012/origin/cluster" "github.com/duanhf2012/origin/console" + "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/service" "io/ioutil" + syslog "log" "os" "os/signal" "strconv" @@ -84,6 +86,7 @@ func initNode(id int){ } func Start() { + SetSysLog("debug","./",syslog.Lshortfile|syslog.LstdFlags) console.RegisterCommand("start",startNode) console.RegisterCommand("stop",stopNode) err := console.Run(os.Args) @@ -131,4 +134,9 @@ func GetService(servicename string) service.IService { func SetConfigDir(configdir string){ cluster.SetConfigDir(configdir) -} \ No newline at end of file +} + +func SetSysLog(strLevel string, pathname string, flag int){ + logs,_:= log.New(strLevel,pathname,flag) + log.Export(logs) +} diff --git a/rpc/client.go b/rpc/client.go index 69c2bfb..48bf098 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -70,7 +70,6 @@ func (slf *Client) AsycGo(rpcHandler IRpcHandler,mutiCoroutine bool,serviceMetho request.Seq = slf.startSeq slf.pending[call.Seq] = call slf.pendingLock.Unlock() - request.ServiceMethod = serviceMethod var herr error request.InParam,herr = processor.Marshal(args) @@ -163,8 +162,8 @@ func (slf *Client) Run(){ for { bytes,err := slf.conn.ReadMsg() if err != nil { - slf.Close() - slf.Start() + log.Error("rpcClient %s ReadMsg error:%+v",slf.Addr,err) + return } //1.解析head respone := &RpcResponse{} @@ -198,12 +197,7 @@ func (slf *Client) Run(){ } } - } func (slf *Client) OnClose(){ - if slf.blocalhost== false{ - //关闭时,重新连接 - slf.Start() - } -} \ No newline at end of file +} diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index fa74232..8068566 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -179,13 +179,16 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { v.iparam = request.localParam } - + var oParam reflect.Value paramList = append(paramList,reflect.ValueOf(slf.GetRpcHandler())) //接受者 if request.localReply!=nil { - v.oParam = reflect.ValueOf(request.localReply) + oParam = reflect.ValueOf(request.localReply) + }else{ + oParam = reflect.New(v.oParam.Type().Elem()) } + paramList = append(paramList,reflect.ValueOf(v.iparam)) - paramList = append(paramList,v.oParam) //输出参数 + paramList = append(paramList,oParam) //输出参数 returnValues := v.method.Func.Call(paramList) errInter := returnValues[0].Interface() @@ -194,7 +197,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { } if request.requestHandle!=nil { - request.requestHandle(v.oParam.Interface(), err) + request.requestHandle(oParam.Interface(), err) } }