diff --git a/cluster/cluster.go b/cluster/cluster.go
new file mode 100644
index 0000000..444cb34
--- /dev/null
+++ b/cluster/cluster.go
@@ -0,0 +1,381 @@
+package cluster
+
+import (
+ "fmt"
+ "log"
+ "net"
+ "origin/rpc"
+ "origin/service"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+)
+
+//https://github.com/rocket049/rpc2d/blob/master/rpcnode.go
+//http://daizuozhuo.github.io/golang-rpc-practice/
+
+type RpcClient struct {
+ nodeid int
+ pclient *rpc.Client
+ serverAddr string
+ isConnect bool
+}
+
+type CCluster struct {
+ port int
+ cfg *ClusterConfig
+ nodeclient map[int]*RpcClient
+
+ reader net.Conn
+ writer net.Conn
+
+ LocalRpcClient *rpc.Client
+}
+
+func (slf *CCluster) ReadNodeInfo(nodeid int) error {
+ //连接Server结点
+ var err error
+
+ slf.cfg, err = ReadCfg("./config/cluster.json", nodeid)
+ if err != nil {
+ fmt.Printf("%v", err)
+ return nil
+ }
+
+ return nil
+}
+
+func (slf *CCluster) GetClusterClient(id int) *rpc.Client {
+ v, ok := slf.nodeclient[id]
+ if ok == false {
+ return nil
+ }
+
+ return v.pclient
+}
+
+func (slf *CCluster) GetClusterNode(strNodeName string) *CNodeCfg {
+ for _, value := range slf.cfg.NodeList {
+ if value.NodeName == strNodeName {
+ return &value
+ }
+ }
+
+ return nil
+}
+
+func (slf *CCluster) GetBindUrl() (string, error) {
+ return slf.cfg.currentNode.ServerAddr, nil
+}
+
+type CTestData struct {
+ Bbbb int64
+ Cccc int
+ Ddd string
+}
+
+func (slf *CCluster) AcceptRpc(tpcListen *net.TCPListener) error {
+ slf.reader, slf.writer = net.Pipe()
+ go rpc.ServeConn(slf.reader)
+ slf.LocalRpcClient = rpc.NewClient(slf.writer)
+
+ for {
+ conn, err := tpcListen.Accept()
+ if err != nil {
+ fmt.Print(err)
+ return err
+ }
+
+ //使用goroutine单独处理rpc连接请求
+ go rpc.ServeConn(conn)
+ }
+
+ return nil
+}
+
+func (slf *CCluster) ListenService() error {
+
+ bindStr, err := slf.GetBindUrl()
+ if err != nil {
+ return err
+ }
+
+ tcpaddr, err := net.ResolveTCPAddr("tcp4", bindStr)
+ if err != nil {
+ return err
+ }
+
+ tcplisten, err2 := net.ListenTCP("tcp", tcpaddr)
+ if err2 != nil {
+ return err2
+ }
+
+ go slf.AcceptRpc(tcplisten)
+ return nil
+}
+
+type CPing struct {
+ TimeStamp int64
+}
+
+type CPong struct {
+ TimeStamp int64
+}
+
+func (slf *CPing) Ping(ping *CPing, pong *CPong) error {
+ pong.TimeStamp = ping.TimeStamp
+ return nil
+}
+
+func (slf *CCluster) ConnService() error {
+ ping := CPing{0}
+ pong := CPong{0}
+ fmt.Println(rpc.RegisterName("CPing", &ping))
+
+ //连接集群服务器
+ for _, nodeList := range slf.cfg.mapClusterNodeService {
+ for _, node := range nodeList {
+ slf.nodeclient[node.NodeID] = &RpcClient{node.NodeID, nil, node.ServerAddr, false}
+ }
+ }
+
+ for {
+ for _, rpcClient := range slf.nodeclient {
+
+ //
+ if rpcClient.isConnect == true {
+ ping.TimeStamp = 0
+ err := rpcClient.pclient.Call("CPing.Ping", &ping, &pong)
+ if err != nil {
+ rpcClient.pclient.Close()
+ rpcClient.pclient = nil
+ rpcClient.isConnect = false
+ continue
+ }
+ continue
+ }
+
+ if rpcClient.pclient != nil {
+ rpcClient.pclient.Close()
+ rpcClient.pclient = nil
+ }
+
+ client, err := rpc.Dial("tcp", rpcClient.serverAddr)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ v, _ := slf.nodeclient[rpcClient.nodeid]
+ v.pclient = client
+ v.isConnect = true
+ }
+
+ time.Sleep(time.Second * 2)
+ }
+
+ return nil
+}
+
+func (slf *CCluster) Init() error {
+ if len(os.Args) < 2 {
+ return fmt.Errorf("param error not find NodeId=number")
+ }
+
+ parts := strings.Split(os.Args[1], "=")
+ if len(parts) < 2 {
+ return fmt.Errorf("param error not find NodeId=number")
+ }
+ if parts[0] != "NodeId" {
+ return fmt.Errorf("param error not find NodeId=number")
+ }
+
+ slf.nodeclient = make(map[int]*RpcClient)
+
+ //读取配置
+ ret, err := strconv.Atoi(parts[1])
+ if err != nil {
+ return err
+ }
+
+ return slf.ReadNodeInfo(ret)
+}
+
+func (slf *CCluster) Start() error {
+
+ service.InstanceServiceMgr().FetchService(slf.OnFetchService)
+
+ //监听服务
+ slf.ListenService()
+
+ //集群
+ go slf.ConnService()
+
+ return nil
+}
+
+//Node.servicename.methodname
+//servicename.methodname
+//_servicename.methodname
+func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply interface{}) error {
+ var callServiceName string
+ nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName)
+ if len(nodeidList) > 1 {
+ return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList))
+ }
+
+ for _, nodeid := range nodeidList {
+ if nodeid == slf.GetCurrentNodeId() {
+ return slf.LocalRpcClient.Call(callServiceName, args, reply)
+ } else {
+
+ pclient := slf.GetClusterClient(nodeid)
+
+ if pclient == nil {
+ return fmt.Errorf("Call: NodeId %d is not find.", nodeid)
+ }
+ err := pclient.Call(callServiceName, args, reply)
+ return err
+ }
+ }
+
+ return fmt.Errorf("Call: %s fail.", NodeServiceMethod)
+}
+
+func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *string) []int {
+ var nodename string
+ var servicename string
+ var methodname string
+ var nodeidList []int
+
+ parts := strings.Split(NodeServiceMethod, ".")
+ if len(parts) == 2 {
+ servicename = parts[0]
+ methodname = parts[1]
+ } else if len(parts) == 3 {
+ nodename = parts[0]
+ servicename = parts[1]
+ methodname = parts[2]
+ } else {
+ return nodeidList
+ }
+
+ if nodename == "" {
+ nodeidList = make([]int, 0)
+ if servicename[:1] == "_" {
+ servicename = servicename[1:]
+ nodeidList = append(nodeidList, slf.GetCurrentNodeId())
+ } else {
+ nodeidList = slf.cfg.GetIdByService(servicename)
+ }
+ } else {
+ nodeidList = slf.GetIdByNodeService(nodename, servicename)
+ }
+
+ *rpcServerMethod = servicename + "." + methodname
+ return nodeidList
+}
+
+func (slf *CCluster) Go(NodeServiceMethod string, args interface{}) error {
+ var callServiceName string
+ nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName)
+ if len(nodeidList) > 1 {
+ return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList))
+ }
+
+ for _, nodeid := range nodeidList {
+ if nodeid == slf.GetCurrentNodeId() {
+ slf.LocalRpcClient.Go(callServiceName, args, nil, nil)
+ return nil
+ } else {
+
+ pclient := slf.GetClusterClient(nodeid)
+
+ if pclient == nil {
+ return fmt.Errorf("Call: NodeId %d is not find.", nodeid)
+ }
+ pclient.Go(callServiceName, args, nil, nil)
+ return nil
+ }
+ }
+
+ return fmt.Errorf("Call: %s fail.", NodeServiceMethod)
+}
+func (slf *CCluster) CallNode(nodeid int, servicemethod string, args interface{}, reply interface{}) error {
+ pclient := slf.GetClusterClient(nodeid)
+ if pclient == nil {
+ return fmt.Errorf("Call: NodeId %d is not find.", nodeid)
+ }
+
+ err := pclient.Call(servicemethod, args, reply)
+ return err
+
+}
+
+func (slf *CCluster) GoNode(nodeid int, args interface{}, servicemethod string) error {
+ pclient := slf.GetClusterClient(nodeid)
+ if pclient == nil {
+ return fmt.Errorf("Call: NodeId %d is not find.", nodeid)
+ }
+
+ replyCall := pclient.Go(servicemethod, args, nil, nil)
+ //ret := <-replyCall.Done
+ if replyCall.Error != nil {
+ fmt.Print(replyCall.Error)
+ }
+
+ //fmt.Print(ret)
+ return nil
+
+}
+
+func (ws *CCluster) OnFetchService(iservice service.IService) error {
+ rpc.RegisterName(iservice.GetServiceName(), iservice)
+ return nil
+}
+
+//向远程服务器调用
+//Node.servicename.methodname
+//servicename.methodname
+func Call(NodeServiceMethod string, args interface{}, reply interface{}) error {
+ return InstanceClusterMgr().Call(NodeServiceMethod, args, reply)
+}
+
+func Go(NodeServiceMethod string, args interface{}) error {
+ return InstanceClusterMgr().Go(NodeServiceMethod, args)
+}
+
+func CallNode(NodeId int, servicemethod string, args interface{}, reply interface{}) error {
+ return InstanceClusterMgr().CallNode(NodeId, servicemethod, args, reply)
+}
+
+func GoNode(NodeId int, servicemethod string, args interface{}) error {
+ return InstanceClusterMgr().GoNode(NodeId, args, servicemethod)
+}
+
+func CastCall(NodeServiceMethod string, args interface{}, reply []interface{}) error {
+ return InstanceClusterMgr().Call(NodeServiceMethod, args, reply)
+}
+
+var _self *CCluster
+
+func InstanceClusterMgr() *CCluster {
+ if _self == nil {
+ _self = new(CCluster)
+ return _self
+ }
+ return _self
+}
+
+func (slf *CCluster) GetIdByNodeService(NodeName string, serviceName string) []int {
+ return slf.cfg.GetIdByNodeService(NodeName, serviceName)
+}
+
+func (slf *CCluster) HasLocalService(serviceName string) bool {
+ return slf.cfg.HasLocalService(serviceName)
+}
+
+func (slf *CCluster) GetCurrentNodeId() int {
+ return slf.cfg.currentNode.NodeID
+}
diff --git a/cluster/config.go b/cluster/config.go
new file mode 100644
index 0000000..02e7424
--- /dev/null
+++ b/cluster/config.go
@@ -0,0 +1,144 @@
+package cluster
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+)
+
+type CNodeCfg struct {
+ NodeID int
+ NodeName string
+
+ ServerAddr string
+ ServiceList []string
+ ClusterNode []string
+}
+
+type CNode struct {
+ NodeID int
+ NodeName string
+
+ ServerAddr string
+ ServiceList map[string]bool
+}
+
+type ClusterConfig struct {
+ NodeList []CNodeCfg
+
+ //通过id获取结点
+ mapIdNode map[int]CNode
+
+ //map[nodename][ {CNode} ]
+ mapClusterNodeService map[string][]CNode
+ mapClusterServiceNode map[string][]CNode
+ mapLocalService map[string]bool
+
+ currentNode CNode
+}
+
+// ReadCfg ...
+func ReadCfg(path string, nodeid int) (*ClusterConfig, error) {
+ c := &ClusterConfig{}
+
+ d, err := ioutil.ReadFile(path)
+ if err != nil {
+ return nil, err
+ }
+ err = json.Unmarshal(d, c)
+ if err != nil {
+ return nil, err
+ }
+
+ c.mapIdNode = make(map[int]CNode, 1)
+ c.mapClusterNodeService = make(map[string][]CNode, 1)
+ c.mapLocalService = make(map[string]bool)
+ c.mapClusterServiceNode = make(map[string][]CNode, 1)
+
+ //组装mapIdNode
+ var clusterNode []string
+ for _, v := range c.NodeList {
+
+ //1.取所有结点
+ mapservice := make(map[string]bool, 1)
+ c.mapIdNode[v.NodeID] = CNode{v.NodeID, v.NodeName, v.ServerAddr, mapservice}
+
+ if nodeid == v.NodeID {
+ c.currentNode = c.mapIdNode[v.NodeID]
+
+ //2.mapServiceNode map[string][]string
+ for _, s := range v.ServiceList {
+ mapservice[s] = true
+ c.mapLocalService[s] = true
+ }
+
+ for _, c := range v.ClusterNode {
+ clusterNode = append(clusterNode, c)
+ }
+ }
+ }
+
+ //组装mapClusterNodeService
+ for _, kc := range clusterNode {
+ for _, v := range c.NodeList {
+ if kc == v.NodeName {
+ //将自有连接的结点取详细信息
+ mapservice := make(map[string]bool, 1)
+ curNode := CNode{v.NodeID, v.NodeName, v.ServerAddr, mapservice}
+ for _, s := range v.ServiceList {
+ mapservice[s] = true
+ c.mapClusterServiceNode[s] = append(c.mapClusterServiceNode[s], curNode)
+ }
+ c.mapClusterNodeService[v.NodeName] = append(c.mapClusterNodeService[v.NodeName], curNode)
+ }
+
+ }
+ }
+
+ fmt.Println(c.mapIdNode)
+ fmt.Println(c.mapClusterNodeService)
+ fmt.Println(c.mapClusterServiceNode)
+ return c, nil
+}
+
+func (slf *ClusterConfig) GetIdByService(serviceName string) []int {
+ var nodeidlist []int
+ nodeidlist = make([]int, 0)
+
+ nodeList, ok := slf.mapClusterServiceNode[serviceName]
+ if ok == true {
+ for _, v := range nodeList {
+ nodeidlist = append(nodeidlist, v.NodeID)
+ }
+ }
+
+ return nodeidlist
+}
+
+func (slf *ClusterConfig) GetIdByNodeService(NodeName string, serviceName string) []int {
+ var nodeidlist []int
+ nodeidlist = make([]int, 0)
+
+ if NodeName == slf.currentNode.NodeName {
+ nodeidlist = append(nodeidlist, slf.currentNode.NodeID)
+ }
+
+ v, ok := slf.mapClusterNodeService[NodeName]
+ if ok == false {
+ return nodeidlist
+ }
+
+ for _, n := range v {
+ _, ok = n.ServiceList[serviceName]
+ if ok == true {
+ nodeidlist = append(nodeidlist, n.NodeID)
+ }
+ }
+
+ return nodeidlist
+}
+
+func (slf *ClusterConfig) HasLocalService(serviceName string) bool {
+ _, ok := slf.mapLocalService[serviceName]
+ return ok == true
+}
diff --git a/netbase/websocketserver.go b/netbase/websocketserver.go
new file mode 100644
index 0000000..faea3ed
--- /dev/null
+++ b/netbase/websocketserver.go
@@ -0,0 +1,182 @@
+package netbase
+
+import (
+ "errors"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/gorilla/websocket"
+)
+
+type Connection struct {
+ wsConnect *websocket.Conn
+ inChan chan []byte
+ outChan chan []byte
+ closeChan chan byte
+
+ mutex sync.Mutex // 对closeChan关闭上锁
+ isClosed bool // 防止closeChan被关闭多次
+}
+
+type CWebSocketServer struct {
+ Connection
+ bindurl string
+}
+
+func NewWebSocketServer(bindurl string) *CWebSocketServer {
+ wss := new(CWebSocketServer)
+ wss.bindurl = bindurl
+ return wss
+}
+
+func (wss *CWebSocketServer) Start() {
+
+ http.HandleFunc("/ws", wsHandler)
+ go http.ListenAndServe(wss.bindurl, nil)
+}
+
+var (
+ upgrader = websocket.Upgrader{
+ // 允许跨域
+ CheckOrigin: func(r *http.Request) bool {
+ return true
+ },
+ }
+)
+
+func wsHandler(w http.ResponseWriter, r *http.Request) {
+ // w.Write([]byte("hello"))
+ var (
+ wsConn *websocket.Conn
+ err error
+ conn *Connection
+ data []byte
+ )
+ // 完成ws协议的握手操作
+ // Upgrade:websocket
+ if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
+ return
+ }
+
+ if conn, err = InitConnection(wsConn); err != nil {
+ goto ERR
+ }
+
+ // 启动线程,不断发消息
+ go func() {
+ var (
+ err error
+ )
+ for {
+ if err = conn.WriteMessage([]byte("heartbeat")); err != nil {
+ return
+ }
+ time.Sleep(1 * time.Second)
+ }
+ }()
+
+ for {
+ if data, err = conn.ReadMessage(); err != nil {
+ goto ERR
+ }
+ if err = conn.WriteMessage(data); err != nil {
+ goto ERR
+ }
+ }
+
+ERR:
+ conn.Close()
+
+}
+
+func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
+ conn = &Connection{
+ wsConnect: wsConn,
+ inChan: make(chan []byte, 1000),
+ outChan: make(chan []byte, 1000),
+ closeChan: make(chan byte, 1),
+ }
+ // 启动读协程
+ go conn.readLoop()
+ // 启动写协程
+ go conn.writeLoop()
+ return
+}
+
+func (conn *Connection) ReadMessage() (data []byte, err error) {
+
+ select {
+ case data = <-conn.inChan:
+ case <-conn.closeChan:
+ err = errors.New("connection is closeed")
+ }
+ return
+}
+
+func (conn *Connection) WriteMessage(data []byte) (err error) {
+
+ select {
+ case conn.outChan <- data:
+ case <-conn.closeChan:
+ err = errors.New("connection is closeed")
+ }
+ return
+}
+
+func (conn *Connection) Close() {
+ // 线程安全,可多次调用
+ conn.wsConnect.Close()
+ // 利用标记,让closeChan只关闭一次
+ conn.mutex.Lock()
+ if !conn.isClosed {
+ close(conn.closeChan)
+ conn.isClosed = true
+ }
+ conn.mutex.Unlock()
+}
+
+// 内部实现
+func (conn *Connection) readLoop() {
+ var (
+ data []byte
+ err error
+ )
+ for {
+ if _, data, err = conn.wsConnect.ReadMessage(); err != nil {
+ goto ERR
+ }
+ //阻塞在这里,等待inChan有空闲位置
+ select {
+ case conn.inChan <- data:
+ case <-conn.closeChan: // closeChan 感知 conn断开
+ goto ERR
+ }
+
+ }
+
+ERR:
+ conn.Close()
+}
+
+func (conn *Connection) writeLoop() {
+ var (
+ data []byte
+ err error
+ )
+
+ for {
+ select {
+ case data = <-conn.outChan:
+ case <-conn.closeChan:
+ goto ERR
+ }
+ if err = conn.wsConnect.WriteMessage(websocket.TextMessage, data); err != nil {
+ goto ERR
+ }
+ }
+
+ERR:
+ conn.Close()
+
+}
diff --git a/rpc/client.go b/rpc/client.go
new file mode 100644
index 0000000..cad2d45
--- /dev/null
+++ b/rpc/client.go
@@ -0,0 +1,324 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+ "bufio"
+ "encoding/gob"
+ "errors"
+ "io"
+ "log"
+ "net"
+ "net/http"
+ "sync"
+)
+
+// ServerError represents an error that has been returned from
+// the remote side of the RPC connection.
+type ServerError string
+
+func (e ServerError) Error() string {
+ return string(e)
+}
+
+var ErrShutdown = errors.New("connection is shut down")
+
+// Call represents an active RPC.
+type Call struct {
+ ServiceMethod string // The name of the service and method to call.
+ Args interface{} // The argument to the function (*struct).
+ Reply interface{} // The reply from the function (*struct).
+ Error error // After completion, the error status.
+ Done chan *Call // Strobes when call is complete.
+}
+
+// Client represents an RPC Client.
+// There may be multiple outstanding Calls associated
+// with a single Client, and a Client may be used by
+// multiple goroutines simultaneously.
+type Client struct {
+ codec ClientCodec
+
+ reqMutex sync.Mutex // protects following
+ request Request
+
+ mutex sync.Mutex // protects following
+ seq uint64
+ pending map[uint64]*Call
+ closing bool // user has called Close
+ shutdown bool // server has told us to stop
+}
+
+// A ClientCodec implements writing of RPC requests and
+// reading of RPC responses for the client side of an RPC session.
+// The client calls WriteRequest to write a request to the connection
+// and calls ReadResponseHeader and ReadResponseBody in pairs
+// to read responses. The client calls Close when finished with the
+// connection. ReadResponseBody may be called with a nil
+// argument to force the body of the response to be read and then
+// discarded.
+// See NewClient's comment for information about concurrent access.
+type ClientCodec interface {
+ WriteRequest(*Request, interface{}) error
+ ReadResponseHeader(*Response) error
+ ReadResponseBody(interface{}) error
+
+ Close() error
+}
+
+func (client *Client) send(call *Call) {
+ client.reqMutex.Lock()
+ defer client.reqMutex.Unlock()
+
+ // Register this call.
+ client.mutex.Lock()
+ if client.shutdown || client.closing {
+ client.mutex.Unlock()
+ call.Error = ErrShutdown
+ call.done()
+ return
+ }
+ seq := client.seq
+ client.seq++
+ client.pending[seq] = call
+ client.mutex.Unlock()
+
+ // Encode and send the request.
+ client.request.Seq = seq
+ client.request.ServiceMethod = call.ServiceMethod
+ err := client.codec.WriteRequest(&client.request, call.Args)
+ if err != nil {
+ client.mutex.Lock()
+ call = client.pending[seq]
+ delete(client.pending, seq)
+ client.mutex.Unlock()
+ if call != nil {
+ call.Error = err
+ call.done()
+ }
+ }
+}
+
+func (client *Client) input() {
+ var err error
+ var response Response
+ for err == nil {
+ response = Response{}
+ err = client.codec.ReadResponseHeader(&response)
+ if err != nil {
+ break
+ }
+ seq := response.Seq
+ client.mutex.Lock()
+ call := client.pending[seq]
+ delete(client.pending, seq)
+ client.mutex.Unlock()
+
+ switch {
+ case call == nil:
+ // We've got no pending call. That usually means that
+ // WriteRequest partially failed, and call was already
+ // removed; response is a server telling us about an
+ // error reading request body. We should still attempt
+ // to read error body, but there's no one to give it to.
+ err = client.codec.ReadResponseBody(nil)
+ if err != nil {
+ err = errors.New("reading error body: " + err.Error())
+ }
+ case response.Error != "":
+ // We've got an error response. Give this to the request;
+ // any subsequent requests will get the ReadResponseBody
+ // error if there is one.
+ call.Error = ServerError(response.Error)
+ err = client.codec.ReadResponseBody(nil)
+ if err != nil {
+ err = errors.New("reading error body: " + err.Error())
+ }
+ call.done()
+ default:
+ err = client.codec.ReadResponseBody(call.Reply)
+ if err != nil {
+ call.Error = errors.New("reading body " + err.Error())
+ }
+ call.done()
+ }
+ }
+ // Terminate pending calls.
+ client.reqMutex.Lock()
+ client.mutex.Lock()
+ client.shutdown = true
+ closing := client.closing
+ if err == io.EOF {
+ if closing {
+ err = ErrShutdown
+ } else {
+ err = io.ErrUnexpectedEOF
+ }
+ }
+ for _, call := range client.pending {
+ call.Error = err
+ call.done()
+ }
+ client.mutex.Unlock()
+ client.reqMutex.Unlock()
+ if debugLog && err != io.EOF && !closing {
+ log.Println("rpc: client protocol error:", err)
+ }
+}
+
+func (call *Call) done() {
+ select {
+ case call.Done <- call:
+ // ok
+ default:
+ // We don't want to block here. It is the caller's responsibility to make
+ // sure the channel has enough buffer space. See comment in Go().
+ if debugLog {
+ log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
+ }
+ }
+}
+
+// NewClient returns a new Client to handle requests to the
+// set of services at the other end of the connection.
+// It adds a buffer to the write side of the connection so
+// the header and payload are sent as a unit.
+//
+// The read and write halves of the connection are serialized independently,
+// so no interlocking is required. However each half may be accessed
+// concurrently so the implementation of conn should protect against
+// concurrent reads or concurrent writes.
+func NewClient(conn io.ReadWriteCloser) *Client {
+ encBuf := bufio.NewWriter(conn)
+ client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
+ return NewClientWithCodec(client)
+}
+
+// NewClientWithCodec is like NewClient but uses the specified
+// codec to encode requests and decode responses.
+func NewClientWithCodec(codec ClientCodec) *Client {
+ client := &Client{
+ codec: codec,
+ pending: make(map[uint64]*Call),
+ }
+ go client.input()
+ return client
+}
+
+type gobClientCodec struct {
+ rwc io.ReadWriteCloser
+ dec *gob.Decoder
+ enc *gob.Encoder
+ encBuf *bufio.Writer
+}
+
+func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
+ if err = c.enc.Encode(r); err != nil {
+ return
+ }
+ if err = c.enc.Encode(body); err != nil {
+ return
+ }
+ return c.encBuf.Flush()
+}
+
+func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
+ return c.dec.Decode(r)
+}
+
+func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
+ return c.dec.Decode(body)
+}
+
+func (c *gobClientCodec) Close() error {
+ return c.rwc.Close()
+}
+
+// DialHTTP connects to an HTTP RPC server at the specified network address
+// listening on the default HTTP RPC path.
+func DialHTTP(network, address string) (*Client, error) {
+ return DialHTTPPath(network, address, DefaultRPCPath)
+}
+
+// DialHTTPPath connects to an HTTP RPC server
+// at the specified network address and path.
+func DialHTTPPath(network, address, path string) (*Client, error) {
+ var err error
+ conn, err := net.Dial(network, address)
+ if err != nil {
+ return nil, err
+ }
+ io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
+
+ // Require successful HTTP response
+ // before switching to RPC protocol.
+ resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
+ if err == nil && resp.Status == connected {
+ return NewClient(conn), nil
+ }
+ if err == nil {
+ err = errors.New("unexpected HTTP response: " + resp.Status)
+ }
+ conn.Close()
+ return nil, &net.OpError{
+ Op: "dial-http",
+ Net: network + " " + address,
+ Addr: nil,
+ Err: err,
+ }
+}
+
+// Dial connects to an RPC server at the specified network address.
+func Dial(network, address string) (*Client, error) {
+ conn, err := net.Dial(network, address)
+ if err != nil {
+ return nil, err
+ }
+ return NewClient(conn), nil
+}
+
+// Close calls the underlying codec's Close method. If the connection is already
+// shutting down, ErrShutdown is returned.
+func (client *Client) Close() error {
+ client.mutex.Lock()
+ if client.closing {
+ client.mutex.Unlock()
+ return ErrShutdown
+ }
+ client.closing = true
+ client.mutex.Unlock()
+ return client.codec.Close()
+}
+
+// Go invokes the function asynchronously. It returns the Call structure representing
+// the invocation. The done channel will signal when the call is complete by returning
+// the same Call object. If done is nil, Go will allocate a new channel.
+// If non-nil, done must be buffered or Go will deliberately crash.
+func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
+ call := new(Call)
+ call.ServiceMethod = serviceMethod
+ call.Args = args
+ call.Reply = reply
+ if done == nil {
+ done = make(chan *Call, 10) // buffered.
+ } else {
+ // If caller passes done != nil, it must arrange that
+ // done has enough buffer for the number of simultaneous
+ // RPCs that will be using that channel. If the channel
+ // is totally unbuffered, it's best not to run at all.
+ if cap(done) == 0 {
+ log.Panic("rpc: done channel is unbuffered")
+ }
+ }
+ call.Done = done
+ client.send(call)
+ return call
+}
+
+// Call invokes the named function, waits for it to complete, and returns its error status.
+func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
+ call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
+ return call.Error
+}
diff --git a/rpc/debug.go b/rpc/debug.go
new file mode 100644
index 0000000..a1d799f
--- /dev/null
+++ b/rpc/debug.go
@@ -0,0 +1,90 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+/*
+ Some HTML presented at http://machine:port/debug/rpc
+ Lists services, their methods, and some statistics, still rudimentary.
+*/
+
+import (
+ "fmt"
+ "html/template"
+ "net/http"
+ "sort"
+)
+
+const debugText = `
+
+ Services
+ {{range .}}
+
+ Service {{.Name}}
+
+
+ | Method | Calls |
+ {{range .Method}}
+
+ | {{.Name}}({{.Type.ArgType}}, {{.Type.ReplyType}}) error |
+ {{.Type.NumCalls}} |
+
+ {{end}}
+
+ {{end}}
+
+ `
+
+var debug = template.Must(template.New("RPC debug").Parse(debugText))
+
+// If set, print log statements for internal and I/O errors.
+var debugLog = false
+
+type debugMethod struct {
+ Type *methodType
+ Name string
+}
+
+type methodArray []debugMethod
+
+type debugService struct {
+ Service *service
+ Name string
+ Method methodArray
+}
+
+type serviceArray []debugService
+
+func (s serviceArray) Len() int { return len(s) }
+func (s serviceArray) Less(i, j int) bool { return s[i].Name < s[j].Name }
+func (s serviceArray) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (m methodArray) Len() int { return len(m) }
+func (m methodArray) Less(i, j int) bool { return m[i].Name < m[j].Name }
+func (m methodArray) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
+
+type debugHTTP struct {
+ *Server
+}
+
+// Runs at /debug/rpc
+func (server debugHTTP) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ // Build a sorted version of the data.
+ var services serviceArray
+ server.serviceMap.Range(func(snamei, svci interface{}) bool {
+ svc := svci.(*service)
+ ds := debugService{svc, snamei.(string), make(methodArray, 0, len(svc.method))}
+ for mname, method := range svc.method {
+ ds.Method = append(ds.Method, debugMethod{method, mname})
+ }
+ sort.Sort(ds.Method)
+ services = append(services, ds)
+ return true
+ })
+ sort.Sort(services)
+ err := debug.Execute(w, services)
+ if err != nil {
+ fmt.Fprintln(w, "rpc: error executing template:", err.Error())
+ }
+}
diff --git a/rpc/server.go b/rpc/server.go
new file mode 100644
index 0000000..bc2f9be
--- /dev/null
+++ b/rpc/server.go
@@ -0,0 +1,734 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+/*
+ Package rpc provides access to the exported methods of an object across a
+ network or other I/O connection. A server registers an object, making it visible
+ as a service with the name of the type of the object. After registration, exported
+ methods of the object will be accessible remotely. A server may register multiple
+ objects (services) of different types but it is an error to register multiple
+ objects of the same type.
+
+ Only methods that satisfy these criteria will be made available for remote access;
+ other methods will be ignored:
+
+ - the method's type is exported.
+ - the method is exported.
+ - the method has two arguments, both exported (or builtin) types.
+ - the method's second argument is a pointer.
+ - the method has return type error.
+
+ In effect, the method must look schematically like
+
+ func (t *T) MethodName(argType T1, replyType *T2) error
+
+ where T1 and T2 can be marshaled by encoding/gob.
+ These requirements apply even if a different codec is used.
+ (In the future, these requirements may soften for custom codecs.)
+
+ The method's first argument represents the arguments provided by the caller; the
+ second argument represents the result parameters to be returned to the caller.
+ The method's return value, if non-nil, is passed back as a string that the client
+ sees as if created by errors.New. If an error is returned, the reply parameter
+ will not be sent back to the client.
+
+ The server may handle requests on a single connection by calling ServeConn. More
+ typically it will create a network listener and call Accept or, for an HTTP
+ listener, HandleHTTP and http.Serve.
+
+ A client wishing to use the service establishes a connection and then invokes
+ NewClient on the connection. The convenience function Dial (DialHTTP) performs
+ both steps for a raw network connection (an HTTP connection). The resulting
+ Client object has two methods, Call and Go, that specify the service and method to
+ call, a pointer containing the arguments, and a pointer to receive the result
+ parameters.
+
+ The Call method waits for the remote call to complete while the Go method
+ launches the call asynchronously and signals completion using the Call
+ structure's Done channel.
+
+ Unless an explicit codec is set up, package encoding/gob is used to
+ transport the data.
+
+ Here is a simple example. A server wishes to export an object of type Arith:
+
+ package server
+
+ import "errors"
+
+ type Args struct {
+ A, B int
+ }
+
+ type Quotient struct {
+ Quo, Rem int
+ }
+
+ type Arith int
+
+ func (t *Arith) Multiply(args *Args, reply *int) error {
+ *reply = args.A * args.B
+ return nil
+ }
+
+ func (t *Arith) Divide(args *Args, quo *Quotient) error {
+ if args.B == 0 {
+ return errors.New("divide by zero")
+ }
+ quo.Quo = args.A / args.B
+ quo.Rem = args.A % args.B
+ return nil
+ }
+
+ The server calls (for HTTP service):
+
+ arith := new(Arith)
+ rpc.Register(arith)
+ rpc.HandleHTTP()
+ l, e := net.Listen("tcp", ":1234")
+ if e != nil {
+ log.Fatal("listen error:", e)
+ }
+ go http.Serve(l, nil)
+
+ At this point, clients can see a service "Arith" with methods "Arith.Multiply" and
+ "Arith.Divide". To invoke one, a client first dials the server:
+
+ client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
+ if err != nil {
+ log.Fatal("dialing:", err)
+ }
+
+ Then it can make a remote call:
+
+ // Synchronous call
+ args := &server.Args{7,8}
+ var reply int
+ err = client.Call("Arith.Multiply", args, &reply)
+ if err != nil {
+ log.Fatal("arith error:", err)
+ }
+ fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
+
+ or
+
+ // Asynchronous call
+ quotient := new(Quotient)
+ divCall := client.Go("Arith.Divide", args, quotient, nil)
+ replyCall := <-divCall.Done // will be equal to divCall
+ // check errors, print, etc.
+
+ A server implementation will often provide a simple, type-safe wrapper for the
+ client.
+
+ The net/rpc package is frozen and is not accepting new features.
+*/
+package rpc
+
+import (
+ "bufio"
+ "encoding/gob"
+ "errors"
+ "io"
+ "log"
+ "net"
+ "net/http"
+ "reflect"
+ "strings"
+ "sync"
+ "unicode"
+ "unicode/utf8"
+)
+
+const (
+ // Defaults used by HandleHTTP
+ DefaultRPCPath = "/_goRPC_"
+ DefaultDebugPath = "/debug/rpc"
+)
+
+// Precompute the reflect type for error. Can't use error directly
+// because Typeof takes an empty interface value. This is annoying.
+var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
+
+type methodType struct {
+ sync.Mutex // protects counters
+ method reflect.Method
+ ArgType reflect.Type
+ ReplyType reflect.Type
+ numCalls uint
+}
+
+type service struct {
+ name string // name of service
+ rcvr reflect.Value // receiver of methods for the service
+ typ reflect.Type // type of the receiver
+ method map[string]*methodType // registered methods
+}
+
+// Request is a header written before every RPC call. It is used internally
+// but documented here as an aid to debugging, such as when analyzing
+// network traffic.
+type Request struct {
+ ServiceMethod string // format: "Service.Method"
+ Seq uint64 // sequence number chosen by client
+ next *Request // for free list in Server
+}
+
+// Response is a header written before every RPC return. It is used internally
+// but documented here as an aid to debugging, such as when analyzing
+// network traffic.
+type Response struct {
+ ServiceMethod string // echoes that of the Request
+ Seq uint64 // echoes that of the request
+ Error string // error, if any.
+ next *Response // for free list in Server
+}
+
+// Server represents an RPC Server.
+type Server struct {
+ serviceMap sync.Map // map[string]*service
+ reqLock sync.Mutex // protects freeReq
+ freeReq *Request
+ respLock sync.Mutex // protects freeResp
+ freeResp *Response
+}
+
+// NewServer returns a new Server.
+func NewServer() *Server {
+ return &Server{}
+}
+
+// DefaultServer is the default instance of *Server.
+var DefaultServer = NewServer()
+
+// Is this an exported - upper case - name?
+func isExported(name string) bool {
+ rune, _ := utf8.DecodeRuneInString(name)
+ return unicode.IsUpper(rune)
+}
+
+// Is this type exported or a builtin?
+func isExportedOrBuiltinType(t reflect.Type) bool {
+ for t.Kind() == reflect.Ptr {
+ t = t.Elem()
+ }
+ // PkgPath will be non-empty even for an exported type,
+ // so we need to check the type name as well.
+ return isExported(t.Name()) || t.PkgPath() == ""
+}
+
+// Register publishes in the server the set of methods of the
+// receiver value that satisfy the following conditions:
+// - exported method of exported type
+// - two arguments, both of exported type
+// - the second argument is a pointer
+// - one return value, of type error
+// It returns an error if the receiver is not an exported type or has
+// no suitable methods. It also logs the error using package log.
+// The client accesses each method using a string of the form "Type.Method",
+// where Type is the receiver's concrete type.
+func (server *Server) Register(rcvr interface{}) error {
+ return server.register(rcvr, "", false)
+}
+
+// RegisterName is like Register but uses the provided name for the type
+// instead of the receiver's concrete type.
+func (server *Server) RegisterName(name string, rcvr interface{}) error {
+ return server.register(rcvr, name, true)
+}
+
+func (server *Server) register(rcvr interface{}, name string, useName bool) error {
+ s := new(service)
+ s.typ = reflect.TypeOf(rcvr)
+ s.rcvr = reflect.ValueOf(rcvr)
+ sname := reflect.Indirect(s.rcvr).Type().Name()
+ if useName {
+ sname = name
+ }
+ if sname == "" {
+ s := "rpc.Register: no service name for type " + s.typ.String()
+ log.Print(s)
+ return errors.New(s)
+ }
+
+ if !isExported(sname) && !useName {
+ s := "rpc.Register: type " + sname + " is not exported"
+ log.Print(s)
+ return errors.New(s)
+ }
+ s.name = sname
+
+ // Install the methods
+ s.method = suitableMethods(s.typ, true)
+
+ if len(s.method) == 0 {
+ str := ""
+
+ // To help the user, see if a pointer receiver would work.
+ method := suitableMethods(reflect.PtrTo(s.typ), false)
+ if len(method) != 0 {
+ str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
+ } else {
+ str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
+ }
+ //log.Print(str)
+
+ return errors.New(str)
+ }
+
+ if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
+ return errors.New("rpc: service already defined: " + sname)
+ }
+ return nil
+}
+
+// suitableMethods returns suitable Rpc methods of typ, it will report
+// error using log if reportErr is true.
+func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
+ methods := make(map[string]*methodType)
+ for m := 0; m < typ.NumMethod(); m++ {
+ method := typ.Method(m)
+ mtype := method.Type
+ mname := method.Name
+
+ if len(mname) > 4 && mname[:4] != "RPC_" {
+ continue
+ }
+
+ // Method must be exported.
+ if method.PkgPath != "" {
+ continue
+ }
+ // Method needs three ins: receiver, *args, *reply.
+ if mtype.NumIn() != 3 {
+ if reportErr {
+ //log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
+ }
+ continue
+ }
+ // First arg need not be a pointer.
+ argType := mtype.In(1)
+ if !isExportedOrBuiltinType(argType) {
+ if reportErr {
+ //log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
+ }
+ continue
+ }
+ // Second arg must be a pointer.
+ replyType := mtype.In(2)
+ if replyType.Kind() != reflect.Ptr {
+ if reportErr {
+ //log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
+ }
+ continue
+ }
+ // Reply type must be exported.
+ if !isExportedOrBuiltinType(replyType) {
+ if reportErr {
+ //log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
+ }
+ continue
+ }
+ // Method needs one out.
+ if mtype.NumOut() != 1 {
+ if reportErr {
+ //log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
+ }
+ continue
+ }
+ // The return type of the method must be error.
+ if returnType := mtype.Out(0); returnType != typeOfError {
+ if reportErr {
+ //log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
+ }
+ continue
+ }
+ methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
+ }
+ return methods
+}
+
+// A value sent as a placeholder for the server's response value when the server
+// receives an invalid request. It is never decoded by the client since the Response
+// contains an error when it is used.
+var invalidRequest = struct{}{}
+
+func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
+ resp := server.getResponse()
+ // Encode the response header
+ resp.ServiceMethod = req.ServiceMethod
+ if errmsg != "" {
+ resp.Error = errmsg
+ reply = invalidRequest
+ }
+ resp.Seq = req.Seq
+ sending.Lock()
+ err := codec.WriteResponse(resp, reply)
+ if debugLog && err != nil {
+ log.Println("rpc: writing response:", err)
+ }
+ sending.Unlock()
+ server.freeResponse(resp)
+}
+
+func (m *methodType) NumCalls() (n uint) {
+ m.Lock()
+ n = m.numCalls
+ m.Unlock()
+ return n
+}
+
+func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
+ if wg != nil {
+ defer wg.Done()
+ }
+ mtype.Lock()
+ mtype.numCalls++
+ mtype.Unlock()
+ function := mtype.method.Func
+ // Invoke the method, providing a new value for the reply.
+ returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
+ // The return value for the method is an error.
+ errInter := returnValues[0].Interface()
+ errmsg := ""
+ if errInter != nil {
+ errmsg = errInter.(error).Error()
+ }
+ server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
+ server.freeRequest(req)
+}
+
+type gobServerCodec struct {
+ rwc io.ReadWriteCloser
+ dec *gob.Decoder
+ enc *gob.Encoder
+ encBuf *bufio.Writer
+ closed bool
+}
+
+func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
+ return c.dec.Decode(r)
+}
+
+func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
+ return c.dec.Decode(body)
+}
+
+func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
+ if err = c.enc.Encode(r); err != nil {
+ if c.encBuf.Flush() == nil {
+ // Gob couldn't encode the header. Should not happen, so if it does,
+ // shut down the connection to signal that the connection is broken.
+ log.Println("rpc: gob error encoding response:", err)
+ c.Close()
+ }
+ return
+ }
+ if err = c.enc.Encode(body); err != nil {
+ if c.encBuf.Flush() == nil {
+ // Was a gob problem encoding the body but the header has been written.
+ // Shut down the connection to signal that the connection is broken.
+ log.Println("rpc: gob error encoding body:", err)
+ c.Close()
+ }
+ return
+ }
+ return c.encBuf.Flush()
+}
+
+func (c *gobServerCodec) Close() error {
+ if c.closed {
+ // Only call c.rwc.Close once; otherwise the semantics are undefined.
+ return nil
+ }
+ c.closed = true
+ return c.rwc.Close()
+}
+
+// ServeConn runs the server on a single connection.
+// ServeConn blocks, serving the connection until the client hangs up.
+// The caller typically invokes ServeConn in a go statement.
+// ServeConn uses the gob wire format (see package gob) on the
+// connection. To use an alternate codec, use ServeCodec.
+// See NewClient's comment for information about concurrent access.
+func (server *Server) ServeConn(conn io.ReadWriteCloser) {
+ buf := bufio.NewWriter(conn)
+ srv := &gobServerCodec{
+ rwc: conn,
+ dec: gob.NewDecoder(conn),
+ enc: gob.NewEncoder(buf),
+ encBuf: buf,
+ }
+ server.ServeCodec(srv)
+}
+
+// ServeCodec is like ServeConn but uses the specified codec to
+// decode requests and encode responses.
+func (server *Server) ServeCodec(codec ServerCodec) {
+ sending := new(sync.Mutex)
+ wg := new(sync.WaitGroup)
+ for {
+ service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
+ if err != nil {
+ if debugLog && err != io.EOF {
+ log.Println("rpc:", err)
+ }
+ if !keepReading {
+ break
+ }
+ // send a response if we actually managed to read a header.
+ if req != nil {
+ server.sendResponse(sending, req, invalidRequest, codec, err.Error())
+ server.freeRequest(req)
+ }
+ continue
+ }
+ wg.Add(1)
+ go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
+ }
+ // We've seen that there are no more requests.
+ // Wait for responses to be sent before closing codec.
+ wg.Wait()
+ codec.Close()
+}
+
+// ServeRequest is like ServeCodec but synchronously serves a single request.
+// It does not close the codec upon completion.
+func (server *Server) ServeRequest(codec ServerCodec) error {
+ sending := new(sync.Mutex)
+ service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
+ if err != nil {
+ if !keepReading {
+ return err
+ }
+ // send a response if we actually managed to read a header.
+ if req != nil {
+ server.sendResponse(sending, req, invalidRequest, codec, err.Error())
+ server.freeRequest(req)
+ }
+ return err
+ }
+ service.call(server, sending, nil, mtype, req, argv, replyv, codec)
+ return nil
+}
+
+func (server *Server) getRequest() *Request {
+ server.reqLock.Lock()
+ req := server.freeReq
+ if req == nil {
+ req = new(Request)
+ } else {
+ server.freeReq = req.next
+ *req = Request{}
+ }
+ server.reqLock.Unlock()
+ return req
+}
+
+func (server *Server) freeRequest(req *Request) {
+ server.reqLock.Lock()
+ req.next = server.freeReq
+ server.freeReq = req
+ server.reqLock.Unlock()
+}
+
+func (server *Server) getResponse() *Response {
+ server.respLock.Lock()
+ resp := server.freeResp
+ if resp == nil {
+ resp = new(Response)
+ } else {
+ server.freeResp = resp.next
+ *resp = Response{}
+ }
+ server.respLock.Unlock()
+ return resp
+}
+
+func (server *Server) freeResponse(resp *Response) {
+ server.respLock.Lock()
+ resp.next = server.freeResp
+ server.freeResp = resp
+ server.respLock.Unlock()
+}
+
+func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
+ service, mtype, req, keepReading, err = server.readRequestHeader(codec)
+ if err != nil {
+ if !keepReading {
+ return
+ }
+ // discard body
+ codec.ReadRequestBody(nil)
+ return
+ }
+
+ // Decode the argument value.
+ argIsValue := false // if true, need to indirect before calling.
+ if mtype.ArgType.Kind() == reflect.Ptr {
+ argv = reflect.New(mtype.ArgType.Elem())
+ } else {
+ argv = reflect.New(mtype.ArgType)
+ argIsValue = true
+ }
+ // argv guaranteed to be a pointer now.
+ if err = codec.ReadRequestBody(argv.Interface()); err != nil {
+ return
+ }
+ if argIsValue {
+ argv = argv.Elem()
+ }
+
+ replyv = reflect.New(mtype.ReplyType.Elem())
+
+ switch mtype.ReplyType.Elem().Kind() {
+ case reflect.Map:
+ replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
+ case reflect.Slice:
+ replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
+ }
+ return
+}
+
+func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
+ // Grab the request header.
+ req = server.getRequest()
+ err = codec.ReadRequestHeader(req)
+ if err != nil {
+ req = nil
+ if err == io.EOF || err == io.ErrUnexpectedEOF {
+ return
+ }
+ err = errors.New("rpc: server cannot decode request: " + err.Error())
+ return
+ }
+
+ // We read the header successfully. If we see an error now,
+ // we can still recover and move on to the next request.
+ keepReading = true
+
+ dot := strings.LastIndex(req.ServiceMethod, ".")
+ if dot < 0 {
+ err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
+ return
+ }
+ serviceName := req.ServiceMethod[:dot]
+ methodName := req.ServiceMethod[dot+1:]
+
+ // Look up the request.
+ svci, ok := server.serviceMap.Load(serviceName)
+ if !ok {
+ err = errors.New("rpc: can't find service " + req.ServiceMethod)
+ return
+ }
+ svc = svci.(*service)
+ mtype = svc.method[methodName]
+ if mtype == nil {
+ err = errors.New("rpc: can't find method " + req.ServiceMethod)
+ }
+ return
+}
+
+// Accept accepts connections on the listener and serves requests
+// for each incoming connection. Accept blocks until the listener
+// returns a non-nil error. The caller typically invokes Accept in a
+// go statement.
+func (server *Server) Accept(lis net.Listener) {
+ for {
+ conn, err := lis.Accept()
+ if err != nil {
+ log.Print("rpc.Serve: accept:", err.Error())
+ return
+ }
+ go server.ServeConn(conn)
+ }
+}
+
+// Register publishes the receiver's methods in the DefaultServer.
+func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
+
+// RegisterName is like Register but uses the provided name for the type
+// instead of the receiver's concrete type.
+func RegisterName(name string, rcvr interface{}) error {
+ return DefaultServer.RegisterName(name, rcvr)
+}
+
+// A ServerCodec implements reading of RPC requests and writing of
+// RPC responses for the server side of an RPC session.
+// The server calls ReadRequestHeader and ReadRequestBody in pairs
+// to read requests from the connection, and it calls WriteResponse to
+// write a response back. The server calls Close when finished with the
+// connection. ReadRequestBody may be called with a nil
+// argument to force the body of the request to be read and discarded.
+// See NewClient's comment for information about concurrent access.
+type ServerCodec interface {
+ ReadRequestHeader(*Request) error
+ ReadRequestBody(interface{}) error
+ WriteResponse(*Response, interface{}) error
+
+ // Close can be called multiple times and must be idempotent.
+ Close() error
+}
+
+// ServeConn runs the DefaultServer on a single connection.
+// ServeConn blocks, serving the connection until the client hangs up.
+// The caller typically invokes ServeConn in a go statement.
+// ServeConn uses the gob wire format (see package gob) on the
+// connection. To use an alternate codec, use ServeCodec.
+// See NewClient's comment for information about concurrent access.
+func ServeConn(conn io.ReadWriteCloser) {
+ DefaultServer.ServeConn(conn)
+}
+
+// ServeCodec is like ServeConn but uses the specified codec to
+// decode requests and encode responses.
+func ServeCodec(codec ServerCodec) {
+ DefaultServer.ServeCodec(codec)
+}
+
+// ServeRequest is like ServeCodec but synchronously serves a single request.
+// It does not close the codec upon completion.
+func ServeRequest(codec ServerCodec) error {
+ return DefaultServer.ServeRequest(codec)
+}
+
+// Accept accepts connections on the listener and serves requests
+// to DefaultServer for each incoming connection.
+// Accept blocks; the caller typically invokes it in a go statement.
+func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
+
+// Can connect to RPC service using HTTP CONNECT to rpcPath.
+var connected = "200 Connected to Go RPC"
+
+// ServeHTTP implements an http.Handler that answers RPC requests.
+func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ if req.Method != "CONNECT" {
+ w.Header().Set("Content-Type", "text/plain; charset=utf-8")
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ io.WriteString(w, "405 must CONNECT\n")
+ return
+ }
+ conn, _, err := w.(http.Hijacker).Hijack()
+ if err != nil {
+ log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
+ return
+ }
+ io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
+ server.ServeConn(conn)
+}
+
+// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
+// and a debugging handler on debugPath.
+// It is still necessary to invoke http.Serve(), typically in a go statement.
+func (server *Server) HandleHTTP(rpcPath, debugPath string) {
+ http.Handle(rpcPath, server)
+ http.Handle(debugPath, debugHTTP{server})
+}
+
+// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
+// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
+// It is still necessary to invoke http.Serve(), typically in a go statement.
+func HandleHTTP() {
+ DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
+}
diff --git a/server/server.go b/server/server.go
new file mode 100644
index 0000000..ac5f228
--- /dev/null
+++ b/server/server.go
@@ -0,0 +1,88 @@
+package server
+
+import (
+ "fmt"
+ "log"
+
+ "net/http"
+ _ "net/http/pprof"
+ "origin/cluster"
+ "origin/service"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+)
+
+type CExitCtl struct {
+ exit chan bool
+ waitGroup *sync.WaitGroup
+}
+
+type cserver struct {
+ CExitCtl
+ serviceManager service.IServiceManager
+ sigs chan os.Signal
+}
+
+func (s *cserver) Init() {
+ service.InitLog()
+ service.InstanceServiceMgr().Init()
+
+ s.exit = make(chan bool)
+ s.waitGroup = &sync.WaitGroup{}
+ s.sigs = make(chan os.Signal, 1)
+ signal.Notify(s.sigs, syscall.SIGINT, syscall.SIGTERM)
+}
+
+func (s *cserver) SetupService(services ...service.IService) {
+ for i := 0; i < len(services); i++ {
+ if cluster.InstanceClusterMgr().HasLocalService(services[i].GetServiceName()) == true {
+ service.InstanceServiceMgr().Setup(services[i])
+ }
+
+ }
+
+}
+
+func (s *cserver) Start() {
+ go func() {
+
+ log.Println(http.ListenAndServe("localhost:6060", nil))
+ }()
+
+ service.InstanceServiceMgr().Start(s.exit, s.waitGroup)
+ cluster.InstanceClusterMgr().Start()
+
+ select {
+ case <-s.sigs:
+ fmt.Println("收到信号推出程序")
+ }
+
+ s.Stop()
+}
+
+func (s *cserver) Stop() {
+ close(s.exit)
+ s.waitGroup.Wait()
+}
+
+func NewServer() *cserver {
+ err := cluster.InstanceClusterMgr().Init()
+ if err != nil {
+ fmt.Print(err)
+ return nil
+ }
+
+ return new(cserver)
+}
+
+func HasCmdParam(param string) bool {
+ for i := 0; i < len(os.Args); i++ {
+ if os.Args[i] == param {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/service/Service.go b/service/Service.go
new file mode 100644
index 0000000..e9117a0
--- /dev/null
+++ b/service/Service.go
@@ -0,0 +1,180 @@
+package service
+
+import (
+ "fmt"
+ "log"
+
+ "os"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+)
+
+type MethodInfo struct {
+ Fun reflect.Value
+ ParamList []reflect.Value
+ types reflect.Type
+}
+
+type IBaseModule interface {
+ SetModuleName(modulename string) bool
+ GetModuleName() string
+ OnInit() error
+ OnRun() error
+}
+
+type IService interface {
+ Init(Iservice interface{}, servicetype int) error
+ Run(service IService, exit chan bool, pwaitGroup *sync.WaitGroup) error
+ OnInit() error
+ OnEndInit() error
+ OnRun() error
+ OnDestory() error
+ OnFetchService(iservice IService) error
+ GetServiceType() int
+ GetServiceName() string
+ GetServiceId() int
+ IsTimeOutTick(microSecond int64) bool
+
+ AddModule(module IBaseModule, autorun bool) bool
+ GetModule(module string) IBaseModule
+ GetStatus() int
+}
+
+var Log *log.Logger
+var logFile *os.File
+
+func InitLog() {
+ fileName := "log.log"
+ var err error
+ logFile, err = os.Create(fileName)
+
+ if err != nil {
+ log.Fatalln("open file error")
+ }
+ Log = log.New(logFile, "", log.LstdFlags)
+}
+
+type BaseService struct {
+ serviceid int
+ servicename string
+ servicetype int
+
+ tickTime int64
+ statTm int64
+ Status int
+
+ modulelist []IBaseModule
+}
+
+type BaseModule struct {
+ modulename string
+}
+
+func (slf *BaseModule) SetModuleName(modulename string) bool {
+ slf.modulename = modulename
+ return true
+}
+
+func (slf *BaseModule) GetModuleName() string {
+ return slf.modulename
+}
+
+func (slf *BaseService) GetServiceId() int {
+ return slf.serviceid
+}
+
+func (slf *BaseService) GetServiceType() int {
+ return slf.servicetype
+}
+
+func (slf *BaseService) GetServiceName() string {
+ return slf.servicename
+}
+
+func (slf *BaseService) GetStatus() int {
+ return slf.Status
+}
+
+func (slf *BaseService) OnEndInit() error {
+ return nil
+}
+
+func (slf *BaseService) OnFetchService(iservice IService) error {
+ return nil
+}
+
+func (slf *BaseService) Init(Iservice interface{}, servicetype int) error {
+ slf.servicename = fmt.Sprintf("%T", Iservice)
+ parts := strings.Split(slf.servicename, ".")
+ if len(parts) != 2 {
+ err := fmt.Errorf("BaseService.Init: service name is error: %q", slf.servicename)
+ return err
+ }
+
+ slf.servicename = parts[1]
+ slf.servicetype = servicetype
+ slf.serviceid = InstanceServiceMgr().GenServiceID()
+
+ return nil
+}
+
+func (slf *BaseService) Run(service IService, exit chan bool, pwaitGroup *sync.WaitGroup) error {
+ defer pwaitGroup.Done()
+ for {
+ select {
+ case <-exit:
+ fmt.Println("stopping...")
+ return nil
+ default:
+ }
+ slf.tickTime = time.Now().UnixNano() / 1e6
+ service.OnRun()
+ slf.tickTime = time.Now().UnixNano() / 1e6
+ }
+
+ return nil
+}
+
+func (slf *BaseService) RPC_CheckServiceTickTimeOut(microSecond int64) error {
+
+ if slf.IsTimeOutTick(microSecond) == true {
+ Log.Printf("service:%s is timeout,state:%d", slf.GetServiceName(), slf.GetStatus())
+ }
+
+ return nil
+}
+
+func (slf *BaseService) IsTimeOutTick(microSecond int64) bool {
+
+ nowtm := time.Now().UnixNano() / 1e6
+ return nowtm-slf.tickTime >= microSecond
+
+}
+
+func (slf *BaseService) AddModule(module IBaseModule, autorun bool) bool {
+ typename := fmt.Sprintf("%v", reflect.TypeOf(module))
+ parts := strings.Split(typename, ".")
+ if len(parts) < 2 {
+ return false
+ }
+ module.SetModuleName(parts[1])
+ slf.modulelist = append(slf.modulelist, module)
+ module.OnInit()
+ if autorun == true {
+ go module.OnRun()
+ }
+
+ return true
+}
+
+func (slf *BaseService) GetModule(module string) IBaseModule {
+ for _, v := range slf.modulelist {
+ if v.GetModuleName() == module {
+ return v
+ }
+ }
+
+ return nil
+}
diff --git a/service/servicemanager.go b/service/servicemanager.go
new file mode 100644
index 0000000..913d698
--- /dev/null
+++ b/service/servicemanager.go
@@ -0,0 +1,119 @@
+package service
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+type IServiceManager interface {
+ Setup(s IService) bool
+ Init() bool
+ Start() bool
+ CreateServiceID() int
+}
+
+type CServiceManager struct {
+ //serviceList []IService
+ genserviceid int
+ localserviceMap map[string]IService
+}
+
+func (slf *CServiceManager) Setup(s IService) bool {
+
+ slf.localserviceMap[s.GetServiceName()] = s
+ return true
+}
+
+func (slf *CServiceManager) FindService(serviceName string) IService {
+ service, ok := slf.localserviceMap[serviceName]
+ if ok {
+ return service
+ }
+
+ return nil
+}
+
+type FetchService func(s IService) error
+
+func (slf *CServiceManager) FetchService(s FetchService) IService {
+ for _, se := range slf.localserviceMap {
+ s(se)
+ }
+
+ return nil
+}
+
+func (slf *CServiceManager) Init() bool {
+
+ for _, s := range slf.localserviceMap {
+ if s.OnInit() != nil {
+ return false
+ }
+ }
+
+ // 初始化结束
+ for _, s := range slf.localserviceMap {
+ if s.OnEndInit() != nil {
+ return false
+ }
+ }
+
+ return true
+}
+
+func (slf *CServiceManager) Start(exit chan bool, pwaitGroup *sync.WaitGroup) bool {
+ for _, s := range slf.localserviceMap {
+ pwaitGroup.Add(1)
+ go s.Run(s, exit, pwaitGroup)
+ }
+
+ pwaitGroup.Add(1)
+ go slf.CheckServiceTimeTimeout(exit, pwaitGroup)
+ return true
+}
+
+func (slf *CServiceManager) CheckServiceTimeTimeout(exit chan bool, pwaitGroup *sync.WaitGroup) {
+ defer pwaitGroup.Done()
+ for {
+ select {
+ case <-exit:
+ fmt.Println("CheckServiceTimeTimeout stopping...")
+ return
+ default:
+ }
+
+ for _, s := range slf.localserviceMap {
+
+ if s.IsTimeOutTick(20000) == true {
+ Log.Printf("service:%s is timeout,state:%d", s.GetServiceName(), s.GetStatus())
+ }
+ }
+ time.Sleep(2 * time.Second)
+ }
+
+}
+
+func (slf *CServiceManager) GenServiceID() int {
+ slf.genserviceid += 1
+ return slf.genserviceid
+}
+
+func (slf *CServiceManager) Get() bool {
+ for _, s := range slf.localserviceMap {
+ go s.OnRun()
+ }
+
+ return true
+}
+
+var _self *CServiceManager
+
+func InstanceServiceMgr() *CServiceManager {
+ if _self == nil {
+ _self = new(CServiceManager)
+ _self.localserviceMap = make(map[string]IService)
+ return _self
+ }
+ return _self
+}
diff --git a/sysmodule/basemodule.go b/sysmodule/basemodule.go
new file mode 100644
index 0000000..d68e6cf
--- /dev/null
+++ b/sysmodule/basemodule.go
@@ -0,0 +1 @@
+package sysmodule
diff --git a/sysmodule/timer.go.mv b/sysmodule/timer.go.mv
new file mode 100644
index 0000000..675c401
--- /dev/null
+++ b/sysmodule/timer.go.mv
@@ -0,0 +1,35 @@
+package wsservice
+
+import (
+ "origin/service"
+)
+
+//声明控制器函数Map类型变量
+
+type cWSService struct {
+ service.BaseService
+ port int
+}
+
+func (ws *cWSService) OnInit() error {
+ return nil
+}
+
+func (ws *cWSService) OnRun() error {
+
+ return nil
+}
+
+func (ws *cWSService) OnDestory() error {
+ return nil
+}
+
+func NewWSService(servicetype int) *cWSService {
+ wss := new(cWSService)
+ wss.Init(wss, servicetype)
+ return wss
+}
+
+func (ws *cWSService) RPC_TestMethod(a string, b int) error {
+ return nil
+}
diff --git a/sysmodule/websocketclient.go b/sysmodule/websocketclient.go
new file mode 100644
index 0000000..c2e794e
--- /dev/null
+++ b/sysmodule/websocketclient.go
@@ -0,0 +1,158 @@
+package sysmodule
+
+import (
+ "fmt"
+ "gorilla/websocket"
+ "log"
+ "net/http"
+ "net/url"
+
+ "time"
+)
+
+type IWebsocketClient interface {
+ Init(slf IWebsocketClient, strurl string, bproxy bool, timeoutsec time.Duration) error
+ Start() error
+ WriteMessage(msg []byte) error
+ OnDisconnect() error
+ OnConnected() error
+ OnReadMessage(msg []byte) error
+}
+
+type WebsocketClient struct {
+ WsDailer *websocket.Dialer
+ conn *websocket.Conn
+ url string
+ state int //0未连接状态 1连接状态
+ bwritemsg chan []byte
+ slf IWebsocketClient
+ timeoutsec time.Duration
+}
+
+func (ws *WebsocketClient) Init(slf IWebsocketClient, strurl string, bproxy bool, timeoutsec time.Duration) error {
+
+ ws.timeoutsec = timeoutsec
+ ws.slf = slf
+ if bproxy == true {
+ proxy := func(_ *http.Request) (*url.URL, error) {
+ return url.Parse("http://127.0.0.1:1080")
+ }
+
+ if timeoutsec > 0 {
+ tosec := timeoutsec * time.Second
+ ws.WsDailer = &websocket.Dialer{Proxy: proxy, HandshakeTimeout: tosec}
+ } else {
+ ws.WsDailer = &websocket.Dialer{Proxy: proxy}
+ }
+ } else {
+ if timeoutsec > 0 {
+ tosec := timeoutsec * time.Second
+ ws.WsDailer = &websocket.Dialer{HandshakeTimeout: tosec}
+ } else {
+ ws.WsDailer = &websocket.Dialer{}
+ }
+ }
+
+ ws.url = strurl
+ ws.bwritemsg = make(chan []byte, 1000)
+
+ return nil
+}
+
+func (ws *WebsocketClient) OnRun() error {
+ for {
+ if ws.state == 0 {
+ time.Sleep(1 * time.Second)
+ ws.StartConnect()
+ } else {
+ ws.conn.SetReadDeadline(time.Now().Add(ws.timeoutsec * time.Second))
+ _, message, err := ws.conn.ReadMessage()
+
+ if err != nil {
+ log.Printf("到服务器的连接断开 %+v\n", err)
+ ws.conn.Close()
+ ws.state = 0
+ ws.slf.OnDisconnect()
+ continue
+ }
+
+ ws.slf.OnReadMessage(message)
+ }
+ }
+
+ return nil
+}
+
+func (ws *WebsocketClient) StartConnect() error {
+
+ var err error
+ ws.conn, _, err = ws.WsDailer.Dial(ws.url, nil)
+ fmt.Printf("connecting %s, %+v\n", ws.url, err)
+ if err != nil {
+ return err
+ }
+
+ ws.state = 1
+ ws.slf.OnConnected()
+
+ return nil
+}
+
+func (ws *WebsocketClient) Start() error {
+ ws.state = 0
+ go ws.OnRun()
+ go ws.writeMsg()
+ return nil
+}
+
+//触发
+func (ws *WebsocketClient) writeMsg() error {
+ timerC := time.NewTicker(time.Second * 5).C
+ for {
+ if ws.state == 0 {
+ time.Sleep(1 * time.Second)
+ continue
+ }
+ select {
+ case <-timerC:
+ if ws.state == 1 {
+ ws.WriteMessage([]byte(`ping`))
+ }
+ case msg := <-ws.bwritemsg:
+ if ws.state == 1 {
+ ws.conn.SetWriteDeadline(time.Now().Add(ws.timeoutsec * time.Second))
+ err := ws.conn.WriteMessage(websocket.TextMessage, msg)
+
+ if err != nil {
+ fmt.Print(err)
+ ws.state = 0
+ ws.conn.Close()
+ ws.slf.OnDisconnect()
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
+func (ws *WebsocketClient) WriteMessage(msg []byte) error {
+ ws.bwritemsg <- msg
+ return nil
+}
+
+func (ws *WebsocketClient) OnDisconnect() error {
+
+ return nil
+}
+
+func (ws *WebsocketClient) OnConnected() error {
+
+ return nil
+}
+
+//触发
+func (ws *WebsocketClient) OnReadMessage(msg []byte) error {
+
+ return nil
+}
diff --git a/wsservice/wsservice.go b/wsservice/wsservice.go
new file mode 100644
index 0000000..675c401
--- /dev/null
+++ b/wsservice/wsservice.go
@@ -0,0 +1,35 @@
+package wsservice
+
+import (
+ "origin/service"
+)
+
+//声明控制器函数Map类型变量
+
+type cWSService struct {
+ service.BaseService
+ port int
+}
+
+func (ws *cWSService) OnInit() error {
+ return nil
+}
+
+func (ws *cWSService) OnRun() error {
+
+ return nil
+}
+
+func (ws *cWSService) OnDestory() error {
+ return nil
+}
+
+func NewWSService(servicetype int) *cWSService {
+ wss := new(cWSService)
+ wss.Init(wss, servicetype)
+ return wss
+}
+
+func (ws *cWSService) RPC_TestMethod(a string, b int) error {
+ return nil
+}