go rpc调用新增队列模式

This commit is contained in:
boyce
2019-04-04 22:13:06 +08:00
parent f45e60ff49
commit 1d607d9018
4 changed files with 121 additions and 26 deletions

View File

@@ -30,7 +30,8 @@ type CCluster struct {
reader net.Conn
writer net.Conn
LocalRpcClient *rpc.Client
LocalRpcClient *rpc.Client
innerLocalServiceList map[string]bool
}
func (slf *CCluster) ReadNodeInfo(nodeid int) error {
@@ -297,7 +298,7 @@ func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *stri
return nodeidList
}
func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}) error {
func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool) error {
var callServiceName string
var serviceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName, &serviceName)
@@ -318,7 +319,7 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{})
return fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid)
}
replyCall := slf.LocalRpcClient.Go(callServiceName, args, nil, nil)
replyCall := slf.LocalRpcClient.Go(callServiceName, args, nil, nil, queueModle)
if replyCall.Error != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error)
}
@@ -329,7 +330,7 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{})
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid)
return fmt.Errorf("CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid)
}
replyCall := pclient.Go(callServiceName, args, nil, nil)
replyCall := pclient.Go(callServiceName, args, nil, nil, queueModle)
if replyCall.Error != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error)
}
@@ -355,14 +356,14 @@ func (slf *CCluster) CallNode(nodeid int, servicemethod string, args interface{}
return err
}
func (slf *CCluster) GoNode(nodeid int, args interface{}, servicemethod string) error {
func (slf *CCluster) GoNode(nodeid int, args interface{}, servicemethod string, queueModle bool) error {
pclient := slf.GetClusterClient(nodeid)
if pclient == nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.GoNode(%d,%s) NodeId not find client", nodeid, servicemethod)
return fmt.Errorf("CCluster.GoNode(%d,%s) NodeId not find client", nodeid, servicemethod)
}
replyCall := pclient.Go(servicemethod, args, nil, nil)
replyCall := pclient.Go(servicemethod, args, nil, nil, queueModle)
if replyCall.Error != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.GoNode(%d,%s) fail:%v", nodeid, servicemethod, replyCall.Error)
}
@@ -383,20 +384,32 @@ func Call(NodeServiceMethod string, args interface{}, reply interface{}) error {
return InstanceClusterMgr().Call(NodeServiceMethod, args, reply)
}
func Go(NodeServiceMethod string, args interface{}) error {
return InstanceClusterMgr().Go(false, NodeServiceMethod, args)
}
func CallNode(NodeId int, servicemethod string, args interface{}, reply interface{}) error {
return InstanceClusterMgr().CallNode(NodeId, servicemethod, args, reply)
}
func GoNode(NodeId int, servicemethod string, args interface{}) error {
return InstanceClusterMgr().GoNode(NodeId, args, servicemethod)
return InstanceClusterMgr().GoNode(NodeId, args, servicemethod, false)
}
func Go(NodeServiceMethod string, args interface{}) error {
return InstanceClusterMgr().Go(false, NodeServiceMethod, args, false)
}
func CastGo(NodeServiceMethod string, args interface{}) error {
return InstanceClusterMgr().Go(true, NodeServiceMethod, args)
return InstanceClusterMgr().Go(true, NodeServiceMethod, args, false)
}
func GoNodeQueue(NodeId int, servicemethod string, args interface{}) error {
return InstanceClusterMgr().GoNode(NodeId, args, servicemethod, true)
}
func GoQueue(NodeServiceMethod string, args interface{}) error {
return InstanceClusterMgr().Go(false, NodeServiceMethod, args, true)
}
func CastGoQueue(NodeServiceMethod string, args interface{}) error {
return InstanceClusterMgr().Go(true, NodeServiceMethod, args, true)
}
var _self *CCluster
@@ -404,6 +417,7 @@ var _self *CCluster
func InstanceClusterMgr() *CCluster {
if _self == nil {
_self = new(CCluster)
_self.innerLocalServiceList = make(map[string]bool)
return _self
}
return _self
@@ -414,7 +428,8 @@ func (slf *CCluster) GetIdByNodeService(NodeName string, serviceName string) []i
}
func (slf *CCluster) HasLocalService(serviceName string) bool {
return slf.cfg.HasLocalService(serviceName)
_, ok := slf.innerLocalServiceList[serviceName]
return slf.cfg.HasLocalService(serviceName) || ok
}
func (slf *CCluster) HasInit(serviceName string) bool {
@@ -424,3 +439,14 @@ func (slf *CCluster) HasInit(serviceName string) bool {
func GetNodeId() int {
return _self.cfg.currentNode.NodeID
}
func (slf *CCluster) AddLocalService(iservice service.IService) {
servicename := fmt.Sprintf("%T", iservice)
parts := strings.Split(servicename, ".")
if len(parts) != 2 {
service.GetLogger().Printf(service.LEVER_ERROR, "BaseService.Init: service name is error: %q", servicename)
}
servicename = parts[1]
slf.innerLocalServiceList[servicename] = true
}

View File

@@ -50,6 +50,9 @@ func (s *COriginNode) OpenDebugCheck(listenAddress string) {
}
func (s *COriginNode) SetupService(services ...service.IService) {
ppService := &sysservice.PProfService{}
services = append(services, ppService)
cluster.InstanceClusterMgr().AddLocalService(ppService)
for i := 0; i < len(services); i++ {
services[i].Init(services[i])

View File

@@ -70,7 +70,7 @@ type ClientCodec interface {
Close() error
}
func (client *Client) send(call *Call) {
func (client *Client) send(call *Call, queueMode bool) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
@@ -90,6 +90,7 @@ func (client *Client) send(call *Call) {
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
client.request.QueueMode = queueMode
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
@@ -305,7 +306,7 @@ func (client *Client) Close() error {
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call, queueMode bool) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
@@ -322,16 +323,16 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface
}
}
call.Done = done
client.send(call)
client.send(call, queueMode)
return call
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
select {
case call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done:
case call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1), false).Done:
return call.Error
case <-time.After(10 * time.Second):
case <-time.After(15 * time.Second):
}
//call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done

View File

@@ -144,6 +144,7 @@ import (
runtimedebug "runtime/debug"
orginservice "github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/util"
)
const (
@@ -178,6 +179,7 @@ type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
QueueMode bool
}
// Response is a header written before every RPC return. It is used internally
@@ -190,6 +192,10 @@ type Response struct {
next *Response // for free list in Server
}
const (
MAX_RPCDATA_QUEUE_COUNT = 10240
)
// Server represents an RPC Server.
type Server struct {
serviceMap sync.Map // map[string]*service
@@ -197,11 +203,28 @@ type Server struct {
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
mapCallQueue map[string]chan *CQueueRpcData
}
type CQueueRpcData struct {
server *Server
sending *sync.Mutex
wg *sync.WaitGroup
mtype *methodType
req *Request
argv reflect.Value
replyv reflect.Value
codec ServerCodec
service *service
}
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{}
server := &Server{}
server.mapCallQueue = make(map[string]chan *CQueueRpcData)
return server
}
// DefaultServer is the default instance of *Server.
@@ -243,11 +266,30 @@ func (server *Server) RegisterName(name string, prefix string, rcvr interface{})
return server.register(rcvr, name, prefix, true)
}
func (server *Server) ProcessQueue(name string) {
chanRpc, ok := server.mapCallQueue[name]
if ok == false {
orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "cannot find queue")
return
}
for {
rpcData := <-chanRpc
rpcData.service.call(rpcData.server, rpcData.sending, rpcData.wg, rpcData.mtype, rpcData.req, rpcData.argv, rpcData.replyv, rpcData.codec)
}
}
func (server *Server) register(rcvr interface{}, name string, prefix string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
_, ok := server.mapCallQueue[sname]
if ok == false {
server.mapCallQueue[sname] = make(chan *CQueueRpcData, 10240)
util.Go(server.ProcessQueue, sname)
}
if useName {
sname = name
}
@@ -495,9 +537,10 @@ func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
service, mtype, req, argv, replyv, keepReading, queueMode, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "rpc: %v", err)
log.Println("rpc:", err)
}
if !keepReading {
@@ -510,7 +553,28 @@ func (server *Server) ServeCodec(codec ServerCodec) {
}
continue
}
rpcData := &CQueueRpcData{server, sending, wg, mtype, req, argv, replyv, codec, service}
if queueMode == true {
rpcChan, ok := server.mapCallQueue[service.name]
if ok == true {
if len(rpcChan) >= MAX_RPCDATA_QUEUE_COUNT {
orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s chan overload %d", service.name, MAX_RPCDATA_QUEUE_COUNT)
continue
}
wg.Add(1)
rpcChan <- rpcData
continue
} else {
orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s call not find coroutines", service.name)
}
}
wg.Add(1)
//queueMode
//fmt.Print(queueMode)
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
// We've seen that there are no more requests.
@@ -523,7 +587,7 @@ func (server *Server) ServeCodec(codec ServerCodec) {
// It does not close the codec upon completion.
func (server *Server) ServeRequest(codec ServerCodec) error {
sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
service, mtype, req, argv, replyv, keepReading, _, err := server.readRequest(codec)
if err != nil {
if !keepReading {
return err
@@ -579,13 +643,13 @@ func (server *Server) freeResponse(resp *Response) {
server.respLock.Unlock()
}
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, queueMode bool, err error) {
service, mtype, req, keepReading, queueMode, err = server.readRequestHeader(codec)
if err != nil {
if !keepReading {
return
}
// discard body
} // discard body
codec.ReadRequestBody(nil)
return
}
@@ -617,7 +681,7 @@ func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *m
return
}
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, queueMode bool, err error) {
// Grab the request header.
req = server.getRequest()
err = codec.ReadRequestHeader(req)
@@ -653,6 +717,7 @@ func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)
}
queueMode = req.QueueMode
return
}