diff --git a/cluster/cluster.go b/cluster/cluster.go index e1295b4..b4f1e9d 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -30,7 +30,8 @@ type CCluster struct { reader net.Conn writer net.Conn - LocalRpcClient *rpc.Client + LocalRpcClient *rpc.Client + innerLocalServiceList map[string]bool } func (slf *CCluster) ReadNodeInfo(nodeid int) error { @@ -297,7 +298,7 @@ func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *stri return nodeidList } -func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}) error { +func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool) error { var callServiceName string var serviceName string nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName, &serviceName) @@ -318,7 +319,7 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}) return fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid) } - replyCall := slf.LocalRpcClient.Go(callServiceName, args, nil, nil) + replyCall := slf.LocalRpcClient.Go(callServiceName, args, nil, nil, queueModle) if replyCall.Error != nil { service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error) } @@ -329,7 +330,7 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}) service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid) return fmt.Errorf("CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid) } - replyCall := pclient.Go(callServiceName, args, nil, nil) + replyCall := pclient.Go(callServiceName, args, nil, nil, queueModle) if replyCall.Error != nil { service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error) } @@ -355,14 +356,14 @@ func (slf *CCluster) CallNode(nodeid int, servicemethod string, args interface{} return err } -func (slf *CCluster) GoNode(nodeid int, args interface{}, servicemethod string) error { +func (slf *CCluster) GoNode(nodeid int, args interface{}, servicemethod string, queueModle bool) error { pclient := slf.GetClusterClient(nodeid) if pclient == nil { service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.GoNode(%d,%s) NodeId not find client", nodeid, servicemethod) return fmt.Errorf("CCluster.GoNode(%d,%s) NodeId not find client", nodeid, servicemethod) } - replyCall := pclient.Go(servicemethod, args, nil, nil) + replyCall := pclient.Go(servicemethod, args, nil, nil, queueModle) if replyCall.Error != nil { service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.GoNode(%d,%s) fail:%v", nodeid, servicemethod, replyCall.Error) } @@ -383,20 +384,32 @@ func Call(NodeServiceMethod string, args interface{}, reply interface{}) error { return InstanceClusterMgr().Call(NodeServiceMethod, args, reply) } -func Go(NodeServiceMethod string, args interface{}) error { - return InstanceClusterMgr().Go(false, NodeServiceMethod, args) -} - func CallNode(NodeId int, servicemethod string, args interface{}, reply interface{}) error { return InstanceClusterMgr().CallNode(NodeId, servicemethod, args, reply) } func GoNode(NodeId int, servicemethod string, args interface{}) error { - return InstanceClusterMgr().GoNode(NodeId, args, servicemethod) + return InstanceClusterMgr().GoNode(NodeId, args, servicemethod, false) +} + +func Go(NodeServiceMethod string, args interface{}) error { + return InstanceClusterMgr().Go(false, NodeServiceMethod, args, false) } func CastGo(NodeServiceMethod string, args interface{}) error { - return InstanceClusterMgr().Go(true, NodeServiceMethod, args) + return InstanceClusterMgr().Go(true, NodeServiceMethod, args, false) +} + +func GoNodeQueue(NodeId int, servicemethod string, args interface{}) error { + return InstanceClusterMgr().GoNode(NodeId, args, servicemethod, true) +} + +func GoQueue(NodeServiceMethod string, args interface{}) error { + return InstanceClusterMgr().Go(false, NodeServiceMethod, args, true) +} + +func CastGoQueue(NodeServiceMethod string, args interface{}) error { + return InstanceClusterMgr().Go(true, NodeServiceMethod, args, true) } var _self *CCluster @@ -404,6 +417,7 @@ var _self *CCluster func InstanceClusterMgr() *CCluster { if _self == nil { _self = new(CCluster) + _self.innerLocalServiceList = make(map[string]bool) return _self } return _self @@ -414,7 +428,8 @@ func (slf *CCluster) GetIdByNodeService(NodeName string, serviceName string) []i } func (slf *CCluster) HasLocalService(serviceName string) bool { - return slf.cfg.HasLocalService(serviceName) + _, ok := slf.innerLocalServiceList[serviceName] + return slf.cfg.HasLocalService(serviceName) || ok } func (slf *CCluster) HasInit(serviceName string) bool { @@ -424,3 +439,14 @@ func (slf *CCluster) HasInit(serviceName string) bool { func GetNodeId() int { return _self.cfg.currentNode.NodeID } + +func (slf *CCluster) AddLocalService(iservice service.IService) { + servicename := fmt.Sprintf("%T", iservice) + parts := strings.Split(servicename, ".") + if len(parts) != 2 { + service.GetLogger().Printf(service.LEVER_ERROR, "BaseService.Init: service name is error: %q", servicename) + } + + servicename = parts[1] + slf.innerLocalServiceList[servicename] = true +} diff --git a/originnode/node.go b/originnode/node.go index 88b2243..6aa9ccc 100644 --- a/originnode/node.go +++ b/originnode/node.go @@ -50,6 +50,9 @@ func (s *COriginNode) OpenDebugCheck(listenAddress string) { } func (s *COriginNode) SetupService(services ...service.IService) { + ppService := &sysservice.PProfService{} + services = append(services, ppService) + cluster.InstanceClusterMgr().AddLocalService(ppService) for i := 0; i < len(services); i++ { services[i].Init(services[i]) diff --git a/rpc/client.go b/rpc/client.go index b938502..bda7a33 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -70,7 +70,7 @@ type ClientCodec interface { Close() error } -func (client *Client) send(call *Call) { +func (client *Client) send(call *Call, queueMode bool) { client.reqMutex.Lock() defer client.reqMutex.Unlock() @@ -90,6 +90,7 @@ func (client *Client) send(call *Call) { // Encode and send the request. client.request.Seq = seq client.request.ServiceMethod = call.ServiceMethod + client.request.QueueMode = queueMode err := client.codec.WriteRequest(&client.request, call.Args) if err != nil { client.mutex.Lock() @@ -305,7 +306,7 @@ func (client *Client) Close() error { // the invocation. The done channel will signal when the call is complete by returning // the same Call object. If done is nil, Go will allocate a new channel. // If non-nil, done must be buffered or Go will deliberately crash. -func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { +func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call, queueMode bool) *Call { call := new(Call) call.ServiceMethod = serviceMethod call.Args = args @@ -322,16 +323,16 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface } } call.Done = done - client.send(call) + client.send(call, queueMode) return call } // Call invokes the named function, waits for it to complete, and returns its error status. func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { select { - case call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done: + case call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1), false).Done: return call.Error - case <-time.After(10 * time.Second): + case <-time.After(15 * time.Second): } //call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done diff --git a/rpc/server.go b/rpc/server.go index 06e0ce3..ab5e9fc 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -144,6 +144,7 @@ import ( runtimedebug "runtime/debug" orginservice "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/util" ) const ( @@ -178,6 +179,7 @@ type Request struct { ServiceMethod string // format: "Service.Method" Seq uint64 // sequence number chosen by client next *Request // for free list in Server + QueueMode bool } // Response is a header written before every RPC return. It is used internally @@ -190,6 +192,10 @@ type Response struct { next *Response // for free list in Server } +const ( + MAX_RPCDATA_QUEUE_COUNT = 10240 +) + // Server represents an RPC Server. type Server struct { serviceMap sync.Map // map[string]*service @@ -197,11 +203,28 @@ type Server struct { freeReq *Request respLock sync.Mutex // protects freeResp freeResp *Response + + mapCallQueue map[string]chan *CQueueRpcData +} + +type CQueueRpcData struct { + server *Server + sending *sync.Mutex + wg *sync.WaitGroup + mtype *methodType + req *Request + argv reflect.Value + replyv reflect.Value + codec ServerCodec + service *service } // NewServer returns a new Server. func NewServer() *Server { - return &Server{} + server := &Server{} + server.mapCallQueue = make(map[string]chan *CQueueRpcData) + + return server } // DefaultServer is the default instance of *Server. @@ -243,11 +266,30 @@ func (server *Server) RegisterName(name string, prefix string, rcvr interface{}) return server.register(rcvr, name, prefix, true) } +func (server *Server) ProcessQueue(name string) { + chanRpc, ok := server.mapCallQueue[name] + if ok == false { + orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "cannot find queue") + return + } + + for { + rpcData := <-chanRpc + rpcData.service.call(rpcData.server, rpcData.sending, rpcData.wg, rpcData.mtype, rpcData.req, rpcData.argv, rpcData.replyv, rpcData.codec) + } +} + func (server *Server) register(rcvr interface{}, name string, prefix string, useName bool) error { s := new(service) s.typ = reflect.TypeOf(rcvr) s.rcvr = reflect.ValueOf(rcvr) sname := reflect.Indirect(s.rcvr).Type().Name() + + _, ok := server.mapCallQueue[sname] + if ok == false { + server.mapCallQueue[sname] = make(chan *CQueueRpcData, 10240) + util.Go(server.ProcessQueue, sname) + } if useName { sname = name } @@ -495,9 +537,10 @@ func (server *Server) ServeCodec(codec ServerCodec) { sending := new(sync.Mutex) wg := new(sync.WaitGroup) for { - service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) + service, mtype, req, argv, replyv, keepReading, queueMode, err := server.readRequest(codec) if err != nil { if debugLog && err != io.EOF { + orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "rpc: %v", err) log.Println("rpc:", err) } if !keepReading { @@ -510,7 +553,28 @@ func (server *Server) ServeCodec(codec ServerCodec) { } continue } + + rpcData := &CQueueRpcData{server, sending, wg, mtype, req, argv, replyv, codec, service} + + if queueMode == true { + rpcChan, ok := server.mapCallQueue[service.name] + if ok == true { + if len(rpcChan) >= MAX_RPCDATA_QUEUE_COUNT { + orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s chan overload %d", service.name, MAX_RPCDATA_QUEUE_COUNT) + + continue + } + wg.Add(1) + rpcChan <- rpcData + continue + } else { + orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s call not find coroutines", service.name) + } + } + wg.Add(1) + //queueMode + //fmt.Print(queueMode) go service.call(server, sending, wg, mtype, req, argv, replyv, codec) } // We've seen that there are no more requests. @@ -523,7 +587,7 @@ func (server *Server) ServeCodec(codec ServerCodec) { // It does not close the codec upon completion. func (server *Server) ServeRequest(codec ServerCodec) error { sending := new(sync.Mutex) - service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) + service, mtype, req, argv, replyv, keepReading, _, err := server.readRequest(codec) if err != nil { if !keepReading { return err @@ -579,13 +643,13 @@ func (server *Server) freeResponse(resp *Response) { server.respLock.Unlock() } -func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) { - service, mtype, req, keepReading, err = server.readRequestHeader(codec) +func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, queueMode bool, err error) { + service, mtype, req, keepReading, queueMode, err = server.readRequestHeader(codec) if err != nil { if !keepReading { return - } - // discard body + } // discard body + codec.ReadRequestBody(nil) return } @@ -617,7 +681,7 @@ func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *m return } -func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) { +func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, queueMode bool, err error) { // Grab the request header. req = server.getRequest() err = codec.ReadRequestHeader(req) @@ -653,6 +717,7 @@ func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype if mtype == nil { err = errors.New("rpc: can't find method " + req.ServiceMethod) } + queueMode = req.QueueMode return }