From 0d4522433d8cebef18b46f09b490405c4afe42af Mon Sep 17 00:00:00 2001 From: boyce Date: Thu, 24 Jan 2019 16:22:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=B7=A5=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 381 ++++++++++++++++++ cluster/config.go | 144 +++++++ netbase/websocketserver.go | 182 +++++++++ rpc/client.go | 324 ++++++++++++++++ rpc/debug.go | 90 +++++ rpc/server.go | 734 +++++++++++++++++++++++++++++++++++ server/server.go | 88 +++++ service/Service.go | 180 +++++++++ service/servicemanager.go | 119 ++++++ sysmodule/basemodule.go | 1 + sysmodule/timer.go.mv | 35 ++ sysmodule/websocketclient.go | 158 ++++++++ wsservice/wsservice.go | 35 ++ 13 files changed, 2471 insertions(+) create mode 100644 cluster/cluster.go create mode 100644 cluster/config.go create mode 100644 netbase/websocketserver.go create mode 100644 rpc/client.go create mode 100644 rpc/debug.go create mode 100644 rpc/server.go create mode 100644 server/server.go create mode 100644 service/Service.go create mode 100644 service/servicemanager.go create mode 100644 sysmodule/basemodule.go create mode 100644 sysmodule/timer.go.mv create mode 100644 sysmodule/websocketclient.go create mode 100644 wsservice/wsservice.go diff --git a/cluster/cluster.go b/cluster/cluster.go new file mode 100644 index 0000000..444cb34 --- /dev/null +++ b/cluster/cluster.go @@ -0,0 +1,381 @@ +package cluster + +import ( + "fmt" + "log" + "net" + "origin/rpc" + "origin/service" + "os" + "strconv" + "strings" + "time" +) + +//https://github.com/rocket049/rpc2d/blob/master/rpcnode.go +//http://daizuozhuo.github.io/golang-rpc-practice/ + +type RpcClient struct { + nodeid int + pclient *rpc.Client + serverAddr string + isConnect bool +} + +type CCluster struct { + port int + cfg *ClusterConfig + nodeclient map[int]*RpcClient + + reader net.Conn + writer net.Conn + + LocalRpcClient *rpc.Client +} + +func (slf *CCluster) ReadNodeInfo(nodeid int) error { + //连接Server结点 + var err error + + slf.cfg, err = ReadCfg("./config/cluster.json", nodeid) + if err != nil { + fmt.Printf("%v", err) + return nil + } + + return nil +} + +func (slf *CCluster) GetClusterClient(id int) *rpc.Client { + v, ok := slf.nodeclient[id] + if ok == false { + return nil + } + + return v.pclient +} + +func (slf *CCluster) GetClusterNode(strNodeName string) *CNodeCfg { + for _, value := range slf.cfg.NodeList { + if value.NodeName == strNodeName { + return &value + } + } + + return nil +} + +func (slf *CCluster) GetBindUrl() (string, error) { + return slf.cfg.currentNode.ServerAddr, nil +} + +type CTestData struct { + Bbbb int64 + Cccc int + Ddd string +} + +func (slf *CCluster) AcceptRpc(tpcListen *net.TCPListener) error { + slf.reader, slf.writer = net.Pipe() + go rpc.ServeConn(slf.reader) + slf.LocalRpcClient = rpc.NewClient(slf.writer) + + for { + conn, err := tpcListen.Accept() + if err != nil { + fmt.Print(err) + return err + } + + //使用goroutine单独处理rpc连接请求 + go rpc.ServeConn(conn) + } + + return nil +} + +func (slf *CCluster) ListenService() error { + + bindStr, err := slf.GetBindUrl() + if err != nil { + return err + } + + tcpaddr, err := net.ResolveTCPAddr("tcp4", bindStr) + if err != nil { + return err + } + + tcplisten, err2 := net.ListenTCP("tcp", tcpaddr) + if err2 != nil { + return err2 + } + + go slf.AcceptRpc(tcplisten) + return nil +} + +type CPing struct { + TimeStamp int64 +} + +type CPong struct { + TimeStamp int64 +} + +func (slf *CPing) Ping(ping *CPing, pong *CPong) error { + pong.TimeStamp = ping.TimeStamp + return nil +} + +func (slf *CCluster) ConnService() error { + ping := CPing{0} + pong := CPong{0} + fmt.Println(rpc.RegisterName("CPing", &ping)) + + //连接集群服务器 + for _, nodeList := range slf.cfg.mapClusterNodeService { + for _, node := range nodeList { + slf.nodeclient[node.NodeID] = &RpcClient{node.NodeID, nil, node.ServerAddr, false} + } + } + + for { + for _, rpcClient := range slf.nodeclient { + + // + if rpcClient.isConnect == true { + ping.TimeStamp = 0 + err := rpcClient.pclient.Call("CPing.Ping", &ping, &pong) + if err != nil { + rpcClient.pclient.Close() + rpcClient.pclient = nil + rpcClient.isConnect = false + continue + } + continue + } + + if rpcClient.pclient != nil { + rpcClient.pclient.Close() + rpcClient.pclient = nil + } + + client, err := rpc.Dial("tcp", rpcClient.serverAddr) + if err != nil { + log.Println(err) + continue + } + + v, _ := slf.nodeclient[rpcClient.nodeid] + v.pclient = client + v.isConnect = true + } + + time.Sleep(time.Second * 2) + } + + return nil +} + +func (slf *CCluster) Init() error { + if len(os.Args) < 2 { + return fmt.Errorf("param error not find NodeId=number") + } + + parts := strings.Split(os.Args[1], "=") + if len(parts) < 2 { + return fmt.Errorf("param error not find NodeId=number") + } + if parts[0] != "NodeId" { + return fmt.Errorf("param error not find NodeId=number") + } + + slf.nodeclient = make(map[int]*RpcClient) + + //读取配置 + ret, err := strconv.Atoi(parts[1]) + if err != nil { + return err + } + + return slf.ReadNodeInfo(ret) +} + +func (slf *CCluster) Start() error { + + service.InstanceServiceMgr().FetchService(slf.OnFetchService) + + //监听服务 + slf.ListenService() + + //集群 + go slf.ConnService() + + return nil +} + +//Node.servicename.methodname +//servicename.methodname +//_servicename.methodname +func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply interface{}) error { + var callServiceName string + nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName) + if len(nodeidList) > 1 { + return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList)) + } + + for _, nodeid := range nodeidList { + if nodeid == slf.GetCurrentNodeId() { + return slf.LocalRpcClient.Call(callServiceName, args, reply) + } else { + + pclient := slf.GetClusterClient(nodeid) + + if pclient == nil { + return fmt.Errorf("Call: NodeId %d is not find.", nodeid) + } + err := pclient.Call(callServiceName, args, reply) + return err + } + } + + return fmt.Errorf("Call: %s fail.", NodeServiceMethod) +} + +func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *string) []int { + var nodename string + var servicename string + var methodname string + var nodeidList []int + + parts := strings.Split(NodeServiceMethod, ".") + if len(parts) == 2 { + servicename = parts[0] + methodname = parts[1] + } else if len(parts) == 3 { + nodename = parts[0] + servicename = parts[1] + methodname = parts[2] + } else { + return nodeidList + } + + if nodename == "" { + nodeidList = make([]int, 0) + if servicename[:1] == "_" { + servicename = servicename[1:] + nodeidList = append(nodeidList, slf.GetCurrentNodeId()) + } else { + nodeidList = slf.cfg.GetIdByService(servicename) + } + } else { + nodeidList = slf.GetIdByNodeService(nodename, servicename) + } + + *rpcServerMethod = servicename + "." + methodname + return nodeidList +} + +func (slf *CCluster) Go(NodeServiceMethod string, args interface{}) error { + var callServiceName string + nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName) + if len(nodeidList) > 1 { + return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList)) + } + + for _, nodeid := range nodeidList { + if nodeid == slf.GetCurrentNodeId() { + slf.LocalRpcClient.Go(callServiceName, args, nil, nil) + return nil + } else { + + pclient := slf.GetClusterClient(nodeid) + + if pclient == nil { + return fmt.Errorf("Call: NodeId %d is not find.", nodeid) + } + pclient.Go(callServiceName, args, nil, nil) + return nil + } + } + + return fmt.Errorf("Call: %s fail.", NodeServiceMethod) +} +func (slf *CCluster) CallNode(nodeid int, servicemethod string, args interface{}, reply interface{}) error { + pclient := slf.GetClusterClient(nodeid) + if pclient == nil { + return fmt.Errorf("Call: NodeId %d is not find.", nodeid) + } + + err := pclient.Call(servicemethod, args, reply) + return err + +} + +func (slf *CCluster) GoNode(nodeid int, args interface{}, servicemethod string) error { + pclient := slf.GetClusterClient(nodeid) + if pclient == nil { + return fmt.Errorf("Call: NodeId %d is not find.", nodeid) + } + + replyCall := pclient.Go(servicemethod, args, nil, nil) + //ret := <-replyCall.Done + if replyCall.Error != nil { + fmt.Print(replyCall.Error) + } + + //fmt.Print(ret) + return nil + +} + +func (ws *CCluster) OnFetchService(iservice service.IService) error { + rpc.RegisterName(iservice.GetServiceName(), iservice) + return nil +} + +//向远程服务器调用 +//Node.servicename.methodname +//servicename.methodname +func Call(NodeServiceMethod string, args interface{}, reply interface{}) error { + return InstanceClusterMgr().Call(NodeServiceMethod, args, reply) +} + +func Go(NodeServiceMethod string, args interface{}) error { + return InstanceClusterMgr().Go(NodeServiceMethod, args) +} + +func CallNode(NodeId int, servicemethod string, args interface{}, reply interface{}) error { + return InstanceClusterMgr().CallNode(NodeId, servicemethod, args, reply) +} + +func GoNode(NodeId int, servicemethod string, args interface{}) error { + return InstanceClusterMgr().GoNode(NodeId, args, servicemethod) +} + +func CastCall(NodeServiceMethod string, args interface{}, reply []interface{}) error { + return InstanceClusterMgr().Call(NodeServiceMethod, args, reply) +} + +var _self *CCluster + +func InstanceClusterMgr() *CCluster { + if _self == nil { + _self = new(CCluster) + return _self + } + return _self +} + +func (slf *CCluster) GetIdByNodeService(NodeName string, serviceName string) []int { + return slf.cfg.GetIdByNodeService(NodeName, serviceName) +} + +func (slf *CCluster) HasLocalService(serviceName string) bool { + return slf.cfg.HasLocalService(serviceName) +} + +func (slf *CCluster) GetCurrentNodeId() int { + return slf.cfg.currentNode.NodeID +} diff --git a/cluster/config.go b/cluster/config.go new file mode 100644 index 0000000..02e7424 --- /dev/null +++ b/cluster/config.go @@ -0,0 +1,144 @@ +package cluster + +import ( + "encoding/json" + "fmt" + "io/ioutil" +) + +type CNodeCfg struct { + NodeID int + NodeName string + + ServerAddr string + ServiceList []string + ClusterNode []string +} + +type CNode struct { + NodeID int + NodeName string + + ServerAddr string + ServiceList map[string]bool +} + +type ClusterConfig struct { + NodeList []CNodeCfg + + //通过id获取结点 + mapIdNode map[int]CNode + + //map[nodename][ {CNode} ] + mapClusterNodeService map[string][]CNode + mapClusterServiceNode map[string][]CNode + mapLocalService map[string]bool + + currentNode CNode +} + +// ReadCfg ... +func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { + c := &ClusterConfig{} + + d, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + err = json.Unmarshal(d, c) + if err != nil { + return nil, err + } + + c.mapIdNode = make(map[int]CNode, 1) + c.mapClusterNodeService = make(map[string][]CNode, 1) + c.mapLocalService = make(map[string]bool) + c.mapClusterServiceNode = make(map[string][]CNode, 1) + + //组装mapIdNode + var clusterNode []string + for _, v := range c.NodeList { + + //1.取所有结点 + mapservice := make(map[string]bool, 1) + c.mapIdNode[v.NodeID] = CNode{v.NodeID, v.NodeName, v.ServerAddr, mapservice} + + if nodeid == v.NodeID { + c.currentNode = c.mapIdNode[v.NodeID] + + //2.mapServiceNode map[string][]string + for _, s := range v.ServiceList { + mapservice[s] = true + c.mapLocalService[s] = true + } + + for _, c := range v.ClusterNode { + clusterNode = append(clusterNode, c) + } + } + } + + //组装mapClusterNodeService + for _, kc := range clusterNode { + for _, v := range c.NodeList { + if kc == v.NodeName { + //将自有连接的结点取详细信息 + mapservice := make(map[string]bool, 1) + curNode := CNode{v.NodeID, v.NodeName, v.ServerAddr, mapservice} + for _, s := range v.ServiceList { + mapservice[s] = true + c.mapClusterServiceNode[s] = append(c.mapClusterServiceNode[s], curNode) + } + c.mapClusterNodeService[v.NodeName] = append(c.mapClusterNodeService[v.NodeName], curNode) + } + + } + } + + fmt.Println(c.mapIdNode) + fmt.Println(c.mapClusterNodeService) + fmt.Println(c.mapClusterServiceNode) + return c, nil +} + +func (slf *ClusterConfig) GetIdByService(serviceName string) []int { + var nodeidlist []int + nodeidlist = make([]int, 0) + + nodeList, ok := slf.mapClusterServiceNode[serviceName] + if ok == true { + for _, v := range nodeList { + nodeidlist = append(nodeidlist, v.NodeID) + } + } + + return nodeidlist +} + +func (slf *ClusterConfig) GetIdByNodeService(NodeName string, serviceName string) []int { + var nodeidlist []int + nodeidlist = make([]int, 0) + + if NodeName == slf.currentNode.NodeName { + nodeidlist = append(nodeidlist, slf.currentNode.NodeID) + } + + v, ok := slf.mapClusterNodeService[NodeName] + if ok == false { + return nodeidlist + } + + for _, n := range v { + _, ok = n.ServiceList[serviceName] + if ok == true { + nodeidlist = append(nodeidlist, n.NodeID) + } + } + + return nodeidlist +} + +func (slf *ClusterConfig) HasLocalService(serviceName string) bool { + _, ok := slf.mapLocalService[serviceName] + return ok == true +} diff --git a/netbase/websocketserver.go b/netbase/websocketserver.go new file mode 100644 index 0000000..faea3ed --- /dev/null +++ b/netbase/websocketserver.go @@ -0,0 +1,182 @@ +package netbase + +import ( + "errors" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type Connection struct { + wsConnect *websocket.Conn + inChan chan []byte + outChan chan []byte + closeChan chan byte + + mutex sync.Mutex // 对closeChan关闭上锁 + isClosed bool // 防止closeChan被关闭多次 +} + +type CWebSocketServer struct { + Connection + bindurl string +} + +func NewWebSocketServer(bindurl string) *CWebSocketServer { + wss := new(CWebSocketServer) + wss.bindurl = bindurl + return wss +} + +func (wss *CWebSocketServer) Start() { + + http.HandleFunc("/ws", wsHandler) + go http.ListenAndServe(wss.bindurl, nil) +} + +var ( + upgrader = websocket.Upgrader{ + // 允许跨域 + CheckOrigin: func(r *http.Request) bool { + return true + }, + } +) + +func wsHandler(w http.ResponseWriter, r *http.Request) { + // w.Write([]byte("hello")) + var ( + wsConn *websocket.Conn + err error + conn *Connection + data []byte + ) + // 完成ws协议的握手操作 + // Upgrade:websocket + if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil { + return + } + + if conn, err = InitConnection(wsConn); err != nil { + goto ERR + } + + // 启动线程,不断发消息 + go func() { + var ( + err error + ) + for { + if err = conn.WriteMessage([]byte("heartbeat")); err != nil { + return + } + time.Sleep(1 * time.Second) + } + }() + + for { + if data, err = conn.ReadMessage(); err != nil { + goto ERR + } + if err = conn.WriteMessage(data); err != nil { + goto ERR + } + } + +ERR: + conn.Close() + +} + +func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) { + conn = &Connection{ + wsConnect: wsConn, + inChan: make(chan []byte, 1000), + outChan: make(chan []byte, 1000), + closeChan: make(chan byte, 1), + } + // 启动读协程 + go conn.readLoop() + // 启动写协程 + go conn.writeLoop() + return +} + +func (conn *Connection) ReadMessage() (data []byte, err error) { + + select { + case data = <-conn.inChan: + case <-conn.closeChan: + err = errors.New("connection is closeed") + } + return +} + +func (conn *Connection) WriteMessage(data []byte) (err error) { + + select { + case conn.outChan <- data: + case <-conn.closeChan: + err = errors.New("connection is closeed") + } + return +} + +func (conn *Connection) Close() { + // 线程安全,可多次调用 + conn.wsConnect.Close() + // 利用标记,让closeChan只关闭一次 + conn.mutex.Lock() + if !conn.isClosed { + close(conn.closeChan) + conn.isClosed = true + } + conn.mutex.Unlock() +} + +// 内部实现 +func (conn *Connection) readLoop() { + var ( + data []byte + err error + ) + for { + if _, data, err = conn.wsConnect.ReadMessage(); err != nil { + goto ERR + } + //阻塞在这里,等待inChan有空闲位置 + select { + case conn.inChan <- data: + case <-conn.closeChan: // closeChan 感知 conn断开 + goto ERR + } + + } + +ERR: + conn.Close() +} + +func (conn *Connection) writeLoop() { + var ( + data []byte + err error + ) + + for { + select { + case data = <-conn.outChan: + case <-conn.closeChan: + goto ERR + } + if err = conn.wsConnect.WriteMessage(websocket.TextMessage, data); err != nil { + goto ERR + } + } + +ERR: + conn.Close() + +} diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 0000000..cad2d45 --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,324 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package rpc + +import ( + "bufio" + "encoding/gob" + "errors" + "io" + "log" + "net" + "net/http" + "sync" +) + +// ServerError represents an error that has been returned from +// the remote side of the RPC connection. +type ServerError string + +func (e ServerError) Error() string { + return string(e) +} + +var ErrShutdown = errors.New("connection is shut down") + +// Call represents an active RPC. +type Call struct { + ServiceMethod string // The name of the service and method to call. + Args interface{} // The argument to the function (*struct). + Reply interface{} // The reply from the function (*struct). + Error error // After completion, the error status. + Done chan *Call // Strobes when call is complete. +} + +// Client represents an RPC Client. +// There may be multiple outstanding Calls associated +// with a single Client, and a Client may be used by +// multiple goroutines simultaneously. +type Client struct { + codec ClientCodec + + reqMutex sync.Mutex // protects following + request Request + + mutex sync.Mutex // protects following + seq uint64 + pending map[uint64]*Call + closing bool // user has called Close + shutdown bool // server has told us to stop +} + +// A ClientCodec implements writing of RPC requests and +// reading of RPC responses for the client side of an RPC session. +// The client calls WriteRequest to write a request to the connection +// and calls ReadResponseHeader and ReadResponseBody in pairs +// to read responses. The client calls Close when finished with the +// connection. ReadResponseBody may be called with a nil +// argument to force the body of the response to be read and then +// discarded. +// See NewClient's comment for information about concurrent access. +type ClientCodec interface { + WriteRequest(*Request, interface{}) error + ReadResponseHeader(*Response) error + ReadResponseBody(interface{}) error + + Close() error +} + +func (client *Client) send(call *Call) { + client.reqMutex.Lock() + defer client.reqMutex.Unlock() + + // Register this call. + client.mutex.Lock() + if client.shutdown || client.closing { + client.mutex.Unlock() + call.Error = ErrShutdown + call.done() + return + } + seq := client.seq + client.seq++ + client.pending[seq] = call + client.mutex.Unlock() + + // Encode and send the request. + client.request.Seq = seq + client.request.ServiceMethod = call.ServiceMethod + err := client.codec.WriteRequest(&client.request, call.Args) + if err != nil { + client.mutex.Lock() + call = client.pending[seq] + delete(client.pending, seq) + client.mutex.Unlock() + if call != nil { + call.Error = err + call.done() + } + } +} + +func (client *Client) input() { + var err error + var response Response + for err == nil { + response = Response{} + err = client.codec.ReadResponseHeader(&response) + if err != nil { + break + } + seq := response.Seq + client.mutex.Lock() + call := client.pending[seq] + delete(client.pending, seq) + client.mutex.Unlock() + + switch { + case call == nil: + // We've got no pending call. That usually means that + // WriteRequest partially failed, and call was already + // removed; response is a server telling us about an + // error reading request body. We should still attempt + // to read error body, but there's no one to give it to. + err = client.codec.ReadResponseBody(nil) + if err != nil { + err = errors.New("reading error body: " + err.Error()) + } + case response.Error != "": + // We've got an error response. Give this to the request; + // any subsequent requests will get the ReadResponseBody + // error if there is one. + call.Error = ServerError(response.Error) + err = client.codec.ReadResponseBody(nil) + if err != nil { + err = errors.New("reading error body: " + err.Error()) + } + call.done() + default: + err = client.codec.ReadResponseBody(call.Reply) + if err != nil { + call.Error = errors.New("reading body " + err.Error()) + } + call.done() + } + } + // Terminate pending calls. + client.reqMutex.Lock() + client.mutex.Lock() + client.shutdown = true + closing := client.closing + if err == io.EOF { + if closing { + err = ErrShutdown + } else { + err = io.ErrUnexpectedEOF + } + } + for _, call := range client.pending { + call.Error = err + call.done() + } + client.mutex.Unlock() + client.reqMutex.Unlock() + if debugLog && err != io.EOF && !closing { + log.Println("rpc: client protocol error:", err) + } +} + +func (call *Call) done() { + select { + case call.Done <- call: + // ok + default: + // We don't want to block here. It is the caller's responsibility to make + // sure the channel has enough buffer space. See comment in Go(). + if debugLog { + log.Println("rpc: discarding Call reply due to insufficient Done chan capacity") + } + } +} + +// NewClient returns a new Client to handle requests to the +// set of services at the other end of the connection. +// It adds a buffer to the write side of the connection so +// the header and payload are sent as a unit. +// +// The read and write halves of the connection are serialized independently, +// so no interlocking is required. However each half may be accessed +// concurrently so the implementation of conn should protect against +// concurrent reads or concurrent writes. +func NewClient(conn io.ReadWriteCloser) *Client { + encBuf := bufio.NewWriter(conn) + client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} + return NewClientWithCodec(client) +} + +// NewClientWithCodec is like NewClient but uses the specified +// codec to encode requests and decode responses. +func NewClientWithCodec(codec ClientCodec) *Client { + client := &Client{ + codec: codec, + pending: make(map[uint64]*Call), + } + go client.input() + return client +} + +type gobClientCodec struct { + rwc io.ReadWriteCloser + dec *gob.Decoder + enc *gob.Encoder + encBuf *bufio.Writer +} + +func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) { + if err = c.enc.Encode(r); err != nil { + return + } + if err = c.enc.Encode(body); err != nil { + return + } + return c.encBuf.Flush() +} + +func (c *gobClientCodec) ReadResponseHeader(r *Response) error { + return c.dec.Decode(r) +} + +func (c *gobClientCodec) ReadResponseBody(body interface{}) error { + return c.dec.Decode(body) +} + +func (c *gobClientCodec) Close() error { + return c.rwc.Close() +} + +// DialHTTP connects to an HTTP RPC server at the specified network address +// listening on the default HTTP RPC path. +func DialHTTP(network, address string) (*Client, error) { + return DialHTTPPath(network, address, DefaultRPCPath) +} + +// DialHTTPPath connects to an HTTP RPC server +// at the specified network address and path. +func DialHTTPPath(network, address, path string) (*Client, error) { + var err error + conn, err := net.Dial(network, address) + if err != nil { + return nil, err + } + io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") + + // Require successful HTTP response + // before switching to RPC protocol. + resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) + if err == nil && resp.Status == connected { + return NewClient(conn), nil + } + if err == nil { + err = errors.New("unexpected HTTP response: " + resp.Status) + } + conn.Close() + return nil, &net.OpError{ + Op: "dial-http", + Net: network + " " + address, + Addr: nil, + Err: err, + } +} + +// Dial connects to an RPC server at the specified network address. +func Dial(network, address string) (*Client, error) { + conn, err := net.Dial(network, address) + if err != nil { + return nil, err + } + return NewClient(conn), nil +} + +// Close calls the underlying codec's Close method. If the connection is already +// shutting down, ErrShutdown is returned. +func (client *Client) Close() error { + client.mutex.Lock() + if client.closing { + client.mutex.Unlock() + return ErrShutdown + } + client.closing = true + client.mutex.Unlock() + return client.codec.Close() +} + +// Go invokes the function asynchronously. It returns the Call structure representing +// the invocation. The done channel will signal when the call is complete by returning +// the same Call object. If done is nil, Go will allocate a new channel. +// If non-nil, done must be buffered or Go will deliberately crash. +func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { + call := new(Call) + call.ServiceMethod = serviceMethod + call.Args = args + call.Reply = reply + if done == nil { + done = make(chan *Call, 10) // buffered. + } else { + // If caller passes done != nil, it must arrange that + // done has enough buffer for the number of simultaneous + // RPCs that will be using that channel. If the channel + // is totally unbuffered, it's best not to run at all. + if cap(done) == 0 { + log.Panic("rpc: done channel is unbuffered") + } + } + call.Done = done + client.send(call) + return call +} + +// Call invokes the named function, waits for it to complete, and returns its error status. +func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { + call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done + return call.Error +} diff --git a/rpc/debug.go b/rpc/debug.go new file mode 100644 index 0000000..a1d799f --- /dev/null +++ b/rpc/debug.go @@ -0,0 +1,90 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package rpc + +/* + Some HTML presented at http://machine:port/debug/rpc + Lists services, their methods, and some statistics, still rudimentary. +*/ + +import ( + "fmt" + "html/template" + "net/http" + "sort" +) + +const debugText = ` + + Services + {{range .}} +
+ Service {{.Name}} +
+ + + {{range .Method}} + + + + + {{end}} +
MethodCalls
{{.Name}}({{.Type.ArgType}}, {{.Type.ReplyType}}) error{{.Type.NumCalls}}
+ {{end}} + + ` + +var debug = template.Must(template.New("RPC debug").Parse(debugText)) + +// If set, print log statements for internal and I/O errors. +var debugLog = false + +type debugMethod struct { + Type *methodType + Name string +} + +type methodArray []debugMethod + +type debugService struct { + Service *service + Name string + Method methodArray +} + +type serviceArray []debugService + +func (s serviceArray) Len() int { return len(s) } +func (s serviceArray) Less(i, j int) bool { return s[i].Name < s[j].Name } +func (s serviceArray) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (m methodArray) Len() int { return len(m) } +func (m methodArray) Less(i, j int) bool { return m[i].Name < m[j].Name } +func (m methodArray) Swap(i, j int) { m[i], m[j] = m[j], m[i] } + +type debugHTTP struct { + *Server +} + +// Runs at /debug/rpc +func (server debugHTTP) ServeHTTP(w http.ResponseWriter, req *http.Request) { + // Build a sorted version of the data. + var services serviceArray + server.serviceMap.Range(func(snamei, svci interface{}) bool { + svc := svci.(*service) + ds := debugService{svc, snamei.(string), make(methodArray, 0, len(svc.method))} + for mname, method := range svc.method { + ds.Method = append(ds.Method, debugMethod{method, mname}) + } + sort.Sort(ds.Method) + services = append(services, ds) + return true + }) + sort.Sort(services) + err := debug.Execute(w, services) + if err != nil { + fmt.Fprintln(w, "rpc: error executing template:", err.Error()) + } +} diff --git a/rpc/server.go b/rpc/server.go new file mode 100644 index 0000000..bc2f9be --- /dev/null +++ b/rpc/server.go @@ -0,0 +1,734 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +/* + Package rpc provides access to the exported methods of an object across a + network or other I/O connection. A server registers an object, making it visible + as a service with the name of the type of the object. After registration, exported + methods of the object will be accessible remotely. A server may register multiple + objects (services) of different types but it is an error to register multiple + objects of the same type. + + Only methods that satisfy these criteria will be made available for remote access; + other methods will be ignored: + + - the method's type is exported. + - the method is exported. + - the method has two arguments, both exported (or builtin) types. + - the method's second argument is a pointer. + - the method has return type error. + + In effect, the method must look schematically like + + func (t *T) MethodName(argType T1, replyType *T2) error + + where T1 and T2 can be marshaled by encoding/gob. + These requirements apply even if a different codec is used. + (In the future, these requirements may soften for custom codecs.) + + The method's first argument represents the arguments provided by the caller; the + second argument represents the result parameters to be returned to the caller. + The method's return value, if non-nil, is passed back as a string that the client + sees as if created by errors.New. If an error is returned, the reply parameter + will not be sent back to the client. + + The server may handle requests on a single connection by calling ServeConn. More + typically it will create a network listener and call Accept or, for an HTTP + listener, HandleHTTP and http.Serve. + + A client wishing to use the service establishes a connection and then invokes + NewClient on the connection. The convenience function Dial (DialHTTP) performs + both steps for a raw network connection (an HTTP connection). The resulting + Client object has two methods, Call and Go, that specify the service and method to + call, a pointer containing the arguments, and a pointer to receive the result + parameters. + + The Call method waits for the remote call to complete while the Go method + launches the call asynchronously and signals completion using the Call + structure's Done channel. + + Unless an explicit codec is set up, package encoding/gob is used to + transport the data. + + Here is a simple example. A server wishes to export an object of type Arith: + + package server + + import "errors" + + type Args struct { + A, B int + } + + type Quotient struct { + Quo, Rem int + } + + type Arith int + + func (t *Arith) Multiply(args *Args, reply *int) error { + *reply = args.A * args.B + return nil + } + + func (t *Arith) Divide(args *Args, quo *Quotient) error { + if args.B == 0 { + return errors.New("divide by zero") + } + quo.Quo = args.A / args.B + quo.Rem = args.A % args.B + return nil + } + + The server calls (for HTTP service): + + arith := new(Arith) + rpc.Register(arith) + rpc.HandleHTTP() + l, e := net.Listen("tcp", ":1234") + if e != nil { + log.Fatal("listen error:", e) + } + go http.Serve(l, nil) + + At this point, clients can see a service "Arith" with methods "Arith.Multiply" and + "Arith.Divide". To invoke one, a client first dials the server: + + client, err := rpc.DialHTTP("tcp", serverAddress + ":1234") + if err != nil { + log.Fatal("dialing:", err) + } + + Then it can make a remote call: + + // Synchronous call + args := &server.Args{7,8} + var reply int + err = client.Call("Arith.Multiply", args, &reply) + if err != nil { + log.Fatal("arith error:", err) + } + fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply) + + or + + // Asynchronous call + quotient := new(Quotient) + divCall := client.Go("Arith.Divide", args, quotient, nil) + replyCall := <-divCall.Done // will be equal to divCall + // check errors, print, etc. + + A server implementation will often provide a simple, type-safe wrapper for the + client. + + The net/rpc package is frozen and is not accepting new features. +*/ +package rpc + +import ( + "bufio" + "encoding/gob" + "errors" + "io" + "log" + "net" + "net/http" + "reflect" + "strings" + "sync" + "unicode" + "unicode/utf8" +) + +const ( + // Defaults used by HandleHTTP + DefaultRPCPath = "/_goRPC_" + DefaultDebugPath = "/debug/rpc" +) + +// Precompute the reflect type for error. Can't use error directly +// because Typeof takes an empty interface value. This is annoying. +var typeOfError = reflect.TypeOf((*error)(nil)).Elem() + +type methodType struct { + sync.Mutex // protects counters + method reflect.Method + ArgType reflect.Type + ReplyType reflect.Type + numCalls uint +} + +type service struct { + name string // name of service + rcvr reflect.Value // receiver of methods for the service + typ reflect.Type // type of the receiver + method map[string]*methodType // registered methods +} + +// Request is a header written before every RPC call. It is used internally +// but documented here as an aid to debugging, such as when analyzing +// network traffic. +type Request struct { + ServiceMethod string // format: "Service.Method" + Seq uint64 // sequence number chosen by client + next *Request // for free list in Server +} + +// Response is a header written before every RPC return. It is used internally +// but documented here as an aid to debugging, such as when analyzing +// network traffic. +type Response struct { + ServiceMethod string // echoes that of the Request + Seq uint64 // echoes that of the request + Error string // error, if any. + next *Response // for free list in Server +} + +// Server represents an RPC Server. +type Server struct { + serviceMap sync.Map // map[string]*service + reqLock sync.Mutex // protects freeReq + freeReq *Request + respLock sync.Mutex // protects freeResp + freeResp *Response +} + +// NewServer returns a new Server. +func NewServer() *Server { + return &Server{} +} + +// DefaultServer is the default instance of *Server. +var DefaultServer = NewServer() + +// Is this an exported - upper case - name? +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} + +// Is this type exported or a builtin? +func isExportedOrBuiltinType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +// The client accesses each method using a string of the form "Type.Method", +// where Type is the receiver's concrete type. +func (server *Server) Register(rcvr interface{}) error { + return server.register(rcvr, "", false) +} + +// RegisterName is like Register but uses the provided name for the type +// instead of the receiver's concrete type. +func (server *Server) RegisterName(name string, rcvr interface{}) error { + return server.register(rcvr, name, true) +} + +func (server *Server) register(rcvr interface{}, name string, useName bool) error { + s := new(service) + s.typ = reflect.TypeOf(rcvr) + s.rcvr = reflect.ValueOf(rcvr) + sname := reflect.Indirect(s.rcvr).Type().Name() + if useName { + sname = name + } + if sname == "" { + s := "rpc.Register: no service name for type " + s.typ.String() + log.Print(s) + return errors.New(s) + } + + if !isExported(sname) && !useName { + s := "rpc.Register: type " + sname + " is not exported" + log.Print(s) + return errors.New(s) + } + s.name = sname + + // Install the methods + s.method = suitableMethods(s.typ, true) + + if len(s.method) == 0 { + str := "" + + // To help the user, see if a pointer receiver would work. + method := suitableMethods(reflect.PtrTo(s.typ), false) + if len(method) != 0 { + str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)" + } else { + str = "rpc.Register: type " + sname + " has no exported methods of suitable type" + } + //log.Print(str) + + return errors.New(str) + } + + if _, dup := server.serviceMap.LoadOrStore(sname, s); dup { + return errors.New("rpc: service already defined: " + sname) + } + return nil +} + +// suitableMethods returns suitable Rpc methods of typ, it will report +// error using log if reportErr is true. +func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType { + methods := make(map[string]*methodType) + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + mtype := method.Type + mname := method.Name + + if len(mname) > 4 && mname[:4] != "RPC_" { + continue + } + + // Method must be exported. + if method.PkgPath != "" { + continue + } + // Method needs three ins: receiver, *args, *reply. + if mtype.NumIn() != 3 { + if reportErr { + //log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn()) + } + continue + } + // First arg need not be a pointer. + argType := mtype.In(1) + if !isExportedOrBuiltinType(argType) { + if reportErr { + //log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType) + } + continue + } + // Second arg must be a pointer. + replyType := mtype.In(2) + if replyType.Kind() != reflect.Ptr { + if reportErr { + //log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType) + } + continue + } + // Reply type must be exported. + if !isExportedOrBuiltinType(replyType) { + if reportErr { + //log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType) + } + continue + } + // Method needs one out. + if mtype.NumOut() != 1 { + if reportErr { + //log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut()) + } + continue + } + // The return type of the method must be error. + if returnType := mtype.Out(0); returnType != typeOfError { + if reportErr { + //log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType) + } + continue + } + methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType} + } + return methods +} + +// A value sent as a placeholder for the server's response value when the server +// receives an invalid request. It is never decoded by the client since the Response +// contains an error when it is used. +var invalidRequest = struct{}{} + +func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) { + resp := server.getResponse() + // Encode the response header + resp.ServiceMethod = req.ServiceMethod + if errmsg != "" { + resp.Error = errmsg + reply = invalidRequest + } + resp.Seq = req.Seq + sending.Lock() + err := codec.WriteResponse(resp, reply) + if debugLog && err != nil { + log.Println("rpc: writing response:", err) + } + sending.Unlock() + server.freeResponse(resp) +} + +func (m *methodType) NumCalls() (n uint) { + m.Lock() + n = m.numCalls + m.Unlock() + return n +} + +func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) { + if wg != nil { + defer wg.Done() + } + mtype.Lock() + mtype.numCalls++ + mtype.Unlock() + function := mtype.method.Func + // Invoke the method, providing a new value for the reply. + returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv}) + // The return value for the method is an error. + errInter := returnValues[0].Interface() + errmsg := "" + if errInter != nil { + errmsg = errInter.(error).Error() + } + server.sendResponse(sending, req, replyv.Interface(), codec, errmsg) + server.freeRequest(req) +} + +type gobServerCodec struct { + rwc io.ReadWriteCloser + dec *gob.Decoder + enc *gob.Encoder + encBuf *bufio.Writer + closed bool +} + +func (c *gobServerCodec) ReadRequestHeader(r *Request) error { + return c.dec.Decode(r) +} + +func (c *gobServerCodec) ReadRequestBody(body interface{}) error { + return c.dec.Decode(body) +} + +func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) { + if err = c.enc.Encode(r); err != nil { + if c.encBuf.Flush() == nil { + // Gob couldn't encode the header. Should not happen, so if it does, + // shut down the connection to signal that the connection is broken. + log.Println("rpc: gob error encoding response:", err) + c.Close() + } + return + } + if err = c.enc.Encode(body); err != nil { + if c.encBuf.Flush() == nil { + // Was a gob problem encoding the body but the header has been written. + // Shut down the connection to signal that the connection is broken. + log.Println("rpc: gob error encoding body:", err) + c.Close() + } + return + } + return c.encBuf.Flush() +} + +func (c *gobServerCodec) Close() error { + if c.closed { + // Only call c.rwc.Close once; otherwise the semantics are undefined. + return nil + } + c.closed = true + return c.rwc.Close() +} + +// ServeConn runs the server on a single connection. +// ServeConn blocks, serving the connection until the client hangs up. +// The caller typically invokes ServeConn in a go statement. +// ServeConn uses the gob wire format (see package gob) on the +// connection. To use an alternate codec, use ServeCodec. +// See NewClient's comment for information about concurrent access. +func (server *Server) ServeConn(conn io.ReadWriteCloser) { + buf := bufio.NewWriter(conn) + srv := &gobServerCodec{ + rwc: conn, + dec: gob.NewDecoder(conn), + enc: gob.NewEncoder(buf), + encBuf: buf, + } + server.ServeCodec(srv) +} + +// ServeCodec is like ServeConn but uses the specified codec to +// decode requests and encode responses. +func (server *Server) ServeCodec(codec ServerCodec) { + sending := new(sync.Mutex) + wg := new(sync.WaitGroup) + for { + service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) + if err != nil { + if debugLog && err != io.EOF { + log.Println("rpc:", err) + } + if !keepReading { + break + } + // send a response if we actually managed to read a header. + if req != nil { + server.sendResponse(sending, req, invalidRequest, codec, err.Error()) + server.freeRequest(req) + } + continue + } + wg.Add(1) + go service.call(server, sending, wg, mtype, req, argv, replyv, codec) + } + // We've seen that there are no more requests. + // Wait for responses to be sent before closing codec. + wg.Wait() + codec.Close() +} + +// ServeRequest is like ServeCodec but synchronously serves a single request. +// It does not close the codec upon completion. +func (server *Server) ServeRequest(codec ServerCodec) error { + sending := new(sync.Mutex) + service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) + if err != nil { + if !keepReading { + return err + } + // send a response if we actually managed to read a header. + if req != nil { + server.sendResponse(sending, req, invalidRequest, codec, err.Error()) + server.freeRequest(req) + } + return err + } + service.call(server, sending, nil, mtype, req, argv, replyv, codec) + return nil +} + +func (server *Server) getRequest() *Request { + server.reqLock.Lock() + req := server.freeReq + if req == nil { + req = new(Request) + } else { + server.freeReq = req.next + *req = Request{} + } + server.reqLock.Unlock() + return req +} + +func (server *Server) freeRequest(req *Request) { + server.reqLock.Lock() + req.next = server.freeReq + server.freeReq = req + server.reqLock.Unlock() +} + +func (server *Server) getResponse() *Response { + server.respLock.Lock() + resp := server.freeResp + if resp == nil { + resp = new(Response) + } else { + server.freeResp = resp.next + *resp = Response{} + } + server.respLock.Unlock() + return resp +} + +func (server *Server) freeResponse(resp *Response) { + server.respLock.Lock() + resp.next = server.freeResp + server.freeResp = resp + server.respLock.Unlock() +} + +func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) { + service, mtype, req, keepReading, err = server.readRequestHeader(codec) + if err != nil { + if !keepReading { + return + } + // discard body + codec.ReadRequestBody(nil) + return + } + + // Decode the argument value. + argIsValue := false // if true, need to indirect before calling. + if mtype.ArgType.Kind() == reflect.Ptr { + argv = reflect.New(mtype.ArgType.Elem()) + } else { + argv = reflect.New(mtype.ArgType) + argIsValue = true + } + // argv guaranteed to be a pointer now. + if err = codec.ReadRequestBody(argv.Interface()); err != nil { + return + } + if argIsValue { + argv = argv.Elem() + } + + replyv = reflect.New(mtype.ReplyType.Elem()) + + switch mtype.ReplyType.Elem().Kind() { + case reflect.Map: + replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem())) + case reflect.Slice: + replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0)) + } + return +} + +func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) { + // Grab the request header. + req = server.getRequest() + err = codec.ReadRequestHeader(req) + if err != nil { + req = nil + if err == io.EOF || err == io.ErrUnexpectedEOF { + return + } + err = errors.New("rpc: server cannot decode request: " + err.Error()) + return + } + + // We read the header successfully. If we see an error now, + // we can still recover and move on to the next request. + keepReading = true + + dot := strings.LastIndex(req.ServiceMethod, ".") + if dot < 0 { + err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod) + return + } + serviceName := req.ServiceMethod[:dot] + methodName := req.ServiceMethod[dot+1:] + + // Look up the request. + svci, ok := server.serviceMap.Load(serviceName) + if !ok { + err = errors.New("rpc: can't find service " + req.ServiceMethod) + return + } + svc = svci.(*service) + mtype = svc.method[methodName] + if mtype == nil { + err = errors.New("rpc: can't find method " + req.ServiceMethod) + } + return +} + +// Accept accepts connections on the listener and serves requests +// for each incoming connection. Accept blocks until the listener +// returns a non-nil error. The caller typically invokes Accept in a +// go statement. +func (server *Server) Accept(lis net.Listener) { + for { + conn, err := lis.Accept() + if err != nil { + log.Print("rpc.Serve: accept:", err.Error()) + return + } + go server.ServeConn(conn) + } +} + +// Register publishes the receiver's methods in the DefaultServer. +func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) } + +// RegisterName is like Register but uses the provided name for the type +// instead of the receiver's concrete type. +func RegisterName(name string, rcvr interface{}) error { + return DefaultServer.RegisterName(name, rcvr) +} + +// A ServerCodec implements reading of RPC requests and writing of +// RPC responses for the server side of an RPC session. +// The server calls ReadRequestHeader and ReadRequestBody in pairs +// to read requests from the connection, and it calls WriteResponse to +// write a response back. The server calls Close when finished with the +// connection. ReadRequestBody may be called with a nil +// argument to force the body of the request to be read and discarded. +// See NewClient's comment for information about concurrent access. +type ServerCodec interface { + ReadRequestHeader(*Request) error + ReadRequestBody(interface{}) error + WriteResponse(*Response, interface{}) error + + // Close can be called multiple times and must be idempotent. + Close() error +} + +// ServeConn runs the DefaultServer on a single connection. +// ServeConn blocks, serving the connection until the client hangs up. +// The caller typically invokes ServeConn in a go statement. +// ServeConn uses the gob wire format (see package gob) on the +// connection. To use an alternate codec, use ServeCodec. +// See NewClient's comment for information about concurrent access. +func ServeConn(conn io.ReadWriteCloser) { + DefaultServer.ServeConn(conn) +} + +// ServeCodec is like ServeConn but uses the specified codec to +// decode requests and encode responses. +func ServeCodec(codec ServerCodec) { + DefaultServer.ServeCodec(codec) +} + +// ServeRequest is like ServeCodec but synchronously serves a single request. +// It does not close the codec upon completion. +func ServeRequest(codec ServerCodec) error { + return DefaultServer.ServeRequest(codec) +} + +// Accept accepts connections on the listener and serves requests +// to DefaultServer for each incoming connection. +// Accept blocks; the caller typically invokes it in a go statement. +func Accept(lis net.Listener) { DefaultServer.Accept(lis) } + +// Can connect to RPC service using HTTP CONNECT to rpcPath. +var connected = "200 Connected to Go RPC" + +// ServeHTTP implements an http.Handler that answers RPC requests. +func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != "CONNECT" { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.WriteHeader(http.StatusMethodNotAllowed) + io.WriteString(w, "405 must CONNECT\n") + return + } + conn, _, err := w.(http.Hijacker).Hijack() + if err != nil { + log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error()) + return + } + io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n") + server.ServeConn(conn) +} + +// HandleHTTP registers an HTTP handler for RPC messages on rpcPath, +// and a debugging handler on debugPath. +// It is still necessary to invoke http.Serve(), typically in a go statement. +func (server *Server) HandleHTTP(rpcPath, debugPath string) { + http.Handle(rpcPath, server) + http.Handle(debugPath, debugHTTP{server}) +} + +// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer +// on DefaultRPCPath and a debugging handler on DefaultDebugPath. +// It is still necessary to invoke http.Serve(), typically in a go statement. +func HandleHTTP() { + DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath) +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..ac5f228 --- /dev/null +++ b/server/server.go @@ -0,0 +1,88 @@ +package server + +import ( + "fmt" + "log" + + "net/http" + _ "net/http/pprof" + "origin/cluster" + "origin/service" + "os" + "os/signal" + "sync" + "syscall" +) + +type CExitCtl struct { + exit chan bool + waitGroup *sync.WaitGroup +} + +type cserver struct { + CExitCtl + serviceManager service.IServiceManager + sigs chan os.Signal +} + +func (s *cserver) Init() { + service.InitLog() + service.InstanceServiceMgr().Init() + + s.exit = make(chan bool) + s.waitGroup = &sync.WaitGroup{} + s.sigs = make(chan os.Signal, 1) + signal.Notify(s.sigs, syscall.SIGINT, syscall.SIGTERM) +} + +func (s *cserver) SetupService(services ...service.IService) { + for i := 0; i < len(services); i++ { + if cluster.InstanceClusterMgr().HasLocalService(services[i].GetServiceName()) == true { + service.InstanceServiceMgr().Setup(services[i]) + } + + } + +} + +func (s *cserver) Start() { + go func() { + + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + + service.InstanceServiceMgr().Start(s.exit, s.waitGroup) + cluster.InstanceClusterMgr().Start() + + select { + case <-s.sigs: + fmt.Println("收到信号推出程序") + } + + s.Stop() +} + +func (s *cserver) Stop() { + close(s.exit) + s.waitGroup.Wait() +} + +func NewServer() *cserver { + err := cluster.InstanceClusterMgr().Init() + if err != nil { + fmt.Print(err) + return nil + } + + return new(cserver) +} + +func HasCmdParam(param string) bool { + for i := 0; i < len(os.Args); i++ { + if os.Args[i] == param { + return true + } + } + + return false +} diff --git a/service/Service.go b/service/Service.go new file mode 100644 index 0000000..e9117a0 --- /dev/null +++ b/service/Service.go @@ -0,0 +1,180 @@ +package service + +import ( + "fmt" + "log" + + "os" + "reflect" + "strings" + "sync" + "time" +) + +type MethodInfo struct { + Fun reflect.Value + ParamList []reflect.Value + types reflect.Type +} + +type IBaseModule interface { + SetModuleName(modulename string) bool + GetModuleName() string + OnInit() error + OnRun() error +} + +type IService interface { + Init(Iservice interface{}, servicetype int) error + Run(service IService, exit chan bool, pwaitGroup *sync.WaitGroup) error + OnInit() error + OnEndInit() error + OnRun() error + OnDestory() error + OnFetchService(iservice IService) error + GetServiceType() int + GetServiceName() string + GetServiceId() int + IsTimeOutTick(microSecond int64) bool + + AddModule(module IBaseModule, autorun bool) bool + GetModule(module string) IBaseModule + GetStatus() int +} + +var Log *log.Logger +var logFile *os.File + +func InitLog() { + fileName := "log.log" + var err error + logFile, err = os.Create(fileName) + + if err != nil { + log.Fatalln("open file error") + } + Log = log.New(logFile, "", log.LstdFlags) +} + +type BaseService struct { + serviceid int + servicename string + servicetype int + + tickTime int64 + statTm int64 + Status int + + modulelist []IBaseModule +} + +type BaseModule struct { + modulename string +} + +func (slf *BaseModule) SetModuleName(modulename string) bool { + slf.modulename = modulename + return true +} + +func (slf *BaseModule) GetModuleName() string { + return slf.modulename +} + +func (slf *BaseService) GetServiceId() int { + return slf.serviceid +} + +func (slf *BaseService) GetServiceType() int { + return slf.servicetype +} + +func (slf *BaseService) GetServiceName() string { + return slf.servicename +} + +func (slf *BaseService) GetStatus() int { + return slf.Status +} + +func (slf *BaseService) OnEndInit() error { + return nil +} + +func (slf *BaseService) OnFetchService(iservice IService) error { + return nil +} + +func (slf *BaseService) Init(Iservice interface{}, servicetype int) error { + slf.servicename = fmt.Sprintf("%T", Iservice) + parts := strings.Split(slf.servicename, ".") + if len(parts) != 2 { + err := fmt.Errorf("BaseService.Init: service name is error: %q", slf.servicename) + return err + } + + slf.servicename = parts[1] + slf.servicetype = servicetype + slf.serviceid = InstanceServiceMgr().GenServiceID() + + return nil +} + +func (slf *BaseService) Run(service IService, exit chan bool, pwaitGroup *sync.WaitGroup) error { + defer pwaitGroup.Done() + for { + select { + case <-exit: + fmt.Println("stopping...") + return nil + default: + } + slf.tickTime = time.Now().UnixNano() / 1e6 + service.OnRun() + slf.tickTime = time.Now().UnixNano() / 1e6 + } + + return nil +} + +func (slf *BaseService) RPC_CheckServiceTickTimeOut(microSecond int64) error { + + if slf.IsTimeOutTick(microSecond) == true { + Log.Printf("service:%s is timeout,state:%d", slf.GetServiceName(), slf.GetStatus()) + } + + return nil +} + +func (slf *BaseService) IsTimeOutTick(microSecond int64) bool { + + nowtm := time.Now().UnixNano() / 1e6 + return nowtm-slf.tickTime >= microSecond + +} + +func (slf *BaseService) AddModule(module IBaseModule, autorun bool) bool { + typename := fmt.Sprintf("%v", reflect.TypeOf(module)) + parts := strings.Split(typename, ".") + if len(parts) < 2 { + return false + } + module.SetModuleName(parts[1]) + slf.modulelist = append(slf.modulelist, module) + module.OnInit() + if autorun == true { + go module.OnRun() + } + + return true +} + +func (slf *BaseService) GetModule(module string) IBaseModule { + for _, v := range slf.modulelist { + if v.GetModuleName() == module { + return v + } + } + + return nil +} diff --git a/service/servicemanager.go b/service/servicemanager.go new file mode 100644 index 0000000..913d698 --- /dev/null +++ b/service/servicemanager.go @@ -0,0 +1,119 @@ +package service + +import ( + "fmt" + "sync" + "time" +) + +type IServiceManager interface { + Setup(s IService) bool + Init() bool + Start() bool + CreateServiceID() int +} + +type CServiceManager struct { + //serviceList []IService + genserviceid int + localserviceMap map[string]IService +} + +func (slf *CServiceManager) Setup(s IService) bool { + + slf.localserviceMap[s.GetServiceName()] = s + return true +} + +func (slf *CServiceManager) FindService(serviceName string) IService { + service, ok := slf.localserviceMap[serviceName] + if ok { + return service + } + + return nil +} + +type FetchService func(s IService) error + +func (slf *CServiceManager) FetchService(s FetchService) IService { + for _, se := range slf.localserviceMap { + s(se) + } + + return nil +} + +func (slf *CServiceManager) Init() bool { + + for _, s := range slf.localserviceMap { + if s.OnInit() != nil { + return false + } + } + + // 初始化结束 + for _, s := range slf.localserviceMap { + if s.OnEndInit() != nil { + return false + } + } + + return true +} + +func (slf *CServiceManager) Start(exit chan bool, pwaitGroup *sync.WaitGroup) bool { + for _, s := range slf.localserviceMap { + pwaitGroup.Add(1) + go s.Run(s, exit, pwaitGroup) + } + + pwaitGroup.Add(1) + go slf.CheckServiceTimeTimeout(exit, pwaitGroup) + return true +} + +func (slf *CServiceManager) CheckServiceTimeTimeout(exit chan bool, pwaitGroup *sync.WaitGroup) { + defer pwaitGroup.Done() + for { + select { + case <-exit: + fmt.Println("CheckServiceTimeTimeout stopping...") + return + default: + } + + for _, s := range slf.localserviceMap { + + if s.IsTimeOutTick(20000) == true { + Log.Printf("service:%s is timeout,state:%d", s.GetServiceName(), s.GetStatus()) + } + } + time.Sleep(2 * time.Second) + } + +} + +func (slf *CServiceManager) GenServiceID() int { + slf.genserviceid += 1 + return slf.genserviceid +} + +func (slf *CServiceManager) Get() bool { + for _, s := range slf.localserviceMap { + go s.OnRun() + } + + return true +} + +var _self *CServiceManager + +func InstanceServiceMgr() *CServiceManager { + if _self == nil { + _self = new(CServiceManager) + _self.localserviceMap = make(map[string]IService) + return _self + } + return _self +} diff --git a/sysmodule/basemodule.go b/sysmodule/basemodule.go new file mode 100644 index 0000000..d68e6cf --- /dev/null +++ b/sysmodule/basemodule.go @@ -0,0 +1 @@ +package sysmodule diff --git a/sysmodule/timer.go.mv b/sysmodule/timer.go.mv new file mode 100644 index 0000000..675c401 --- /dev/null +++ b/sysmodule/timer.go.mv @@ -0,0 +1,35 @@ +package wsservice + +import ( + "origin/service" +) + +//声明控制器函数Map类型变量 + +type cWSService struct { + service.BaseService + port int +} + +func (ws *cWSService) OnInit() error { + return nil +} + +func (ws *cWSService) OnRun() error { + + return nil +} + +func (ws *cWSService) OnDestory() error { + return nil +} + +func NewWSService(servicetype int) *cWSService { + wss := new(cWSService) + wss.Init(wss, servicetype) + return wss +} + +func (ws *cWSService) RPC_TestMethod(a string, b int) error { + return nil +} diff --git a/sysmodule/websocketclient.go b/sysmodule/websocketclient.go new file mode 100644 index 0000000..c2e794e --- /dev/null +++ b/sysmodule/websocketclient.go @@ -0,0 +1,158 @@ +package sysmodule + +import ( + "fmt" + "gorilla/websocket" + "log" + "net/http" + "net/url" + + "time" +) + +type IWebsocketClient interface { + Init(slf IWebsocketClient, strurl string, bproxy bool, timeoutsec time.Duration) error + Start() error + WriteMessage(msg []byte) error + OnDisconnect() error + OnConnected() error + OnReadMessage(msg []byte) error +} + +type WebsocketClient struct { + WsDailer *websocket.Dialer + conn *websocket.Conn + url string + state int //0未连接状态 1连接状态 + bwritemsg chan []byte + slf IWebsocketClient + timeoutsec time.Duration +} + +func (ws *WebsocketClient) Init(slf IWebsocketClient, strurl string, bproxy bool, timeoutsec time.Duration) error { + + ws.timeoutsec = timeoutsec + ws.slf = slf + if bproxy == true { + proxy := func(_ *http.Request) (*url.URL, error) { + return url.Parse("http://127.0.0.1:1080") + } + + if timeoutsec > 0 { + tosec := timeoutsec * time.Second + ws.WsDailer = &websocket.Dialer{Proxy: proxy, HandshakeTimeout: tosec} + } else { + ws.WsDailer = &websocket.Dialer{Proxy: proxy} + } + } else { + if timeoutsec > 0 { + tosec := timeoutsec * time.Second + ws.WsDailer = &websocket.Dialer{HandshakeTimeout: tosec} + } else { + ws.WsDailer = &websocket.Dialer{} + } + } + + ws.url = strurl + ws.bwritemsg = make(chan []byte, 1000) + + return nil +} + +func (ws *WebsocketClient) OnRun() error { + for { + if ws.state == 0 { + time.Sleep(1 * time.Second) + ws.StartConnect() + } else { + ws.conn.SetReadDeadline(time.Now().Add(ws.timeoutsec * time.Second)) + _, message, err := ws.conn.ReadMessage() + + if err != nil { + log.Printf("到服务器的连接断开 %+v\n", err) + ws.conn.Close() + ws.state = 0 + ws.slf.OnDisconnect() + continue + } + + ws.slf.OnReadMessage(message) + } + } + + return nil +} + +func (ws *WebsocketClient) StartConnect() error { + + var err error + ws.conn, _, err = ws.WsDailer.Dial(ws.url, nil) + fmt.Printf("connecting %s, %+v\n", ws.url, err) + if err != nil { + return err + } + + ws.state = 1 + ws.slf.OnConnected() + + return nil +} + +func (ws *WebsocketClient) Start() error { + ws.state = 0 + go ws.OnRun() + go ws.writeMsg() + return nil +} + +//触发 +func (ws *WebsocketClient) writeMsg() error { + timerC := time.NewTicker(time.Second * 5).C + for { + if ws.state == 0 { + time.Sleep(1 * time.Second) + continue + } + select { + case <-timerC: + if ws.state == 1 { + ws.WriteMessage([]byte(`ping`)) + } + case msg := <-ws.bwritemsg: + if ws.state == 1 { + ws.conn.SetWriteDeadline(time.Now().Add(ws.timeoutsec * time.Second)) + err := ws.conn.WriteMessage(websocket.TextMessage, msg) + + if err != nil { + fmt.Print(err) + ws.state = 0 + ws.conn.Close() + ws.slf.OnDisconnect() + } + } + } + } + + return nil +} + +func (ws *WebsocketClient) WriteMessage(msg []byte) error { + ws.bwritemsg <- msg + return nil +} + +func (ws *WebsocketClient) OnDisconnect() error { + + return nil +} + +func (ws *WebsocketClient) OnConnected() error { + + return nil +} + +//触发 +func (ws *WebsocketClient) OnReadMessage(msg []byte) error { + + return nil +} diff --git a/wsservice/wsservice.go b/wsservice/wsservice.go new file mode 100644 index 0000000..675c401 --- /dev/null +++ b/wsservice/wsservice.go @@ -0,0 +1,35 @@ +package wsservice + +import ( + "origin/service" +) + +//声明控制器函数Map类型变量 + +type cWSService struct { + service.BaseService + port int +} + +func (ws *cWSService) OnInit() error { + return nil +} + +func (ws *cWSService) OnRun() error { + + return nil +} + +func (ws *cWSService) OnDestory() error { + return nil +} + +func NewWSService(servicetype int) *cWSService { + wss := new(cWSService) + wss.Init(wss, servicetype) + return wss +} + +func (ws *cWSService) RPC_TestMethod(a string, b int) error { + return nil +}