新增工程

This commit is contained in:
boyce
2019-01-24 16:22:18 +08:00
parent b6add79322
commit 0d4522433d
13 changed files with 2471 additions and 0 deletions

381
cluster/cluster.go Normal file
View File

@@ -0,0 +1,381 @@
package cluster
import (
"fmt"
"log"
"net"
"origin/rpc"
"origin/service"
"os"
"strconv"
"strings"
"time"
)
//https://github.com/rocket049/rpc2d/blob/master/rpcnode.go
//http://daizuozhuo.github.io/golang-rpc-practice/
type RpcClient struct {
nodeid int
pclient *rpc.Client
serverAddr string
isConnect bool
}
type CCluster struct {
port int
cfg *ClusterConfig
nodeclient map[int]*RpcClient
reader net.Conn
writer net.Conn
LocalRpcClient *rpc.Client
}
func (slf *CCluster) ReadNodeInfo(nodeid int) error {
//连接Server结点
var err error
slf.cfg, err = ReadCfg("./config/cluster.json", nodeid)
if err != nil {
fmt.Printf("%v", err)
return nil
}
return nil
}
func (slf *CCluster) GetClusterClient(id int) *rpc.Client {
v, ok := slf.nodeclient[id]
if ok == false {
return nil
}
return v.pclient
}
func (slf *CCluster) GetClusterNode(strNodeName string) *CNodeCfg {
for _, value := range slf.cfg.NodeList {
if value.NodeName == strNodeName {
return &value
}
}
return nil
}
func (slf *CCluster) GetBindUrl() (string, error) {
return slf.cfg.currentNode.ServerAddr, nil
}
type CTestData struct {
Bbbb int64
Cccc int
Ddd string
}
func (slf *CCluster) AcceptRpc(tpcListen *net.TCPListener) error {
slf.reader, slf.writer = net.Pipe()
go rpc.ServeConn(slf.reader)
slf.LocalRpcClient = rpc.NewClient(slf.writer)
for {
conn, err := tpcListen.Accept()
if err != nil {
fmt.Print(err)
return err
}
//使用goroutine单独处理rpc连接请求
go rpc.ServeConn(conn)
}
return nil
}
func (slf *CCluster) ListenService() error {
bindStr, err := slf.GetBindUrl()
if err != nil {
return err
}
tcpaddr, err := net.ResolveTCPAddr("tcp4", bindStr)
if err != nil {
return err
}
tcplisten, err2 := net.ListenTCP("tcp", tcpaddr)
if err2 != nil {
return err2
}
go slf.AcceptRpc(tcplisten)
return nil
}
type CPing struct {
TimeStamp int64
}
type CPong struct {
TimeStamp int64
}
func (slf *CPing) Ping(ping *CPing, pong *CPong) error {
pong.TimeStamp = ping.TimeStamp
return nil
}
func (slf *CCluster) ConnService() error {
ping := CPing{0}
pong := CPong{0}
fmt.Println(rpc.RegisterName("CPing", &ping))
//连接集群服务器
for _, nodeList := range slf.cfg.mapClusterNodeService {
for _, node := range nodeList {
slf.nodeclient[node.NodeID] = &RpcClient{node.NodeID, nil, node.ServerAddr, false}
}
}
for {
for _, rpcClient := range slf.nodeclient {
//
if rpcClient.isConnect == true {
ping.TimeStamp = 0
err := rpcClient.pclient.Call("CPing.Ping", &ping, &pong)
if err != nil {
rpcClient.pclient.Close()
rpcClient.pclient = nil
rpcClient.isConnect = false
continue
}
continue
}
if rpcClient.pclient != nil {
rpcClient.pclient.Close()
rpcClient.pclient = nil
}
client, err := rpc.Dial("tcp", rpcClient.serverAddr)
if err != nil {
log.Println(err)
continue
}
v, _ := slf.nodeclient[rpcClient.nodeid]
v.pclient = client
v.isConnect = true
}
time.Sleep(time.Second * 2)
}
return nil
}
func (slf *CCluster) Init() error {
if len(os.Args) < 2 {
return fmt.Errorf("param error not find NodeId=number")
}
parts := strings.Split(os.Args[1], "=")
if len(parts) < 2 {
return fmt.Errorf("param error not find NodeId=number")
}
if parts[0] != "NodeId" {
return fmt.Errorf("param error not find NodeId=number")
}
slf.nodeclient = make(map[int]*RpcClient)
//读取配置
ret, err := strconv.Atoi(parts[1])
if err != nil {
return err
}
return slf.ReadNodeInfo(ret)
}
func (slf *CCluster) Start() error {
service.InstanceServiceMgr().FetchService(slf.OnFetchService)
//监听服务
slf.ListenService()
//集群
go slf.ConnService()
return nil
}
//Node.servicename.methodname
//servicename.methodname
//_servicename.methodname
func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply interface{}) error {
var callServiceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName)
if len(nodeidList) > 1 {
return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList))
}
for _, nodeid := range nodeidList {
if nodeid == slf.GetCurrentNodeId() {
return slf.LocalRpcClient.Call(callServiceName, args, reply)
} else {
pclient := slf.GetClusterClient(nodeid)
if pclient == nil {
return fmt.Errorf("Call: NodeId %d is not find.", nodeid)
}
err := pclient.Call(callServiceName, args, reply)
return err
}
}
return fmt.Errorf("Call: %s fail.", NodeServiceMethod)
}
func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *string) []int {
var nodename string
var servicename string
var methodname string
var nodeidList []int
parts := strings.Split(NodeServiceMethod, ".")
if len(parts) == 2 {
servicename = parts[0]
methodname = parts[1]
} else if len(parts) == 3 {
nodename = parts[0]
servicename = parts[1]
methodname = parts[2]
} else {
return nodeidList
}
if nodename == "" {
nodeidList = make([]int, 0)
if servicename[:1] == "_" {
servicename = servicename[1:]
nodeidList = append(nodeidList, slf.GetCurrentNodeId())
} else {
nodeidList = slf.cfg.GetIdByService(servicename)
}
} else {
nodeidList = slf.GetIdByNodeService(nodename, servicename)
}
*rpcServerMethod = servicename + "." + methodname
return nodeidList
}
func (slf *CCluster) Go(NodeServiceMethod string, args interface{}) error {
var callServiceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName)
if len(nodeidList) > 1 {
return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList))
}
for _, nodeid := range nodeidList {
if nodeid == slf.GetCurrentNodeId() {
slf.LocalRpcClient.Go(callServiceName, args, nil, nil)
return nil
} else {
pclient := slf.GetClusterClient(nodeid)
if pclient == nil {
return fmt.Errorf("Call: NodeId %d is not find.", nodeid)
}
pclient.Go(callServiceName, args, nil, nil)
return nil
}
}
return fmt.Errorf("Call: %s fail.", NodeServiceMethod)
}
func (slf *CCluster) CallNode(nodeid int, servicemethod string, args interface{}, reply interface{}) error {
pclient := slf.GetClusterClient(nodeid)
if pclient == nil {
return fmt.Errorf("Call: NodeId %d is not find.", nodeid)
}
err := pclient.Call(servicemethod, args, reply)
return err
}
func (slf *CCluster) GoNode(nodeid int, args interface{}, servicemethod string) error {
pclient := slf.GetClusterClient(nodeid)
if pclient == nil {
return fmt.Errorf("Call: NodeId %d is not find.", nodeid)
}
replyCall := pclient.Go(servicemethod, args, nil, nil)
//ret := <-replyCall.Done
if replyCall.Error != nil {
fmt.Print(replyCall.Error)
}
//fmt.Print(ret)
return nil
}
func (ws *CCluster) OnFetchService(iservice service.IService) error {
rpc.RegisterName(iservice.GetServiceName(), iservice)
return nil
}
//向远程服务器调用
//Node.servicename.methodname
//servicename.methodname
func Call(NodeServiceMethod string, args interface{}, reply interface{}) error {
return InstanceClusterMgr().Call(NodeServiceMethod, args, reply)
}
func Go(NodeServiceMethod string, args interface{}) error {
return InstanceClusterMgr().Go(NodeServiceMethod, args)
}
func CallNode(NodeId int, servicemethod string, args interface{}, reply interface{}) error {
return InstanceClusterMgr().CallNode(NodeId, servicemethod, args, reply)
}
func GoNode(NodeId int, servicemethod string, args interface{}) error {
return InstanceClusterMgr().GoNode(NodeId, args, servicemethod)
}
func CastCall(NodeServiceMethod string, args interface{}, reply []interface{}) error {
return InstanceClusterMgr().Call(NodeServiceMethod, args, reply)
}
var _self *CCluster
func InstanceClusterMgr() *CCluster {
if _self == nil {
_self = new(CCluster)
return _self
}
return _self
}
func (slf *CCluster) GetIdByNodeService(NodeName string, serviceName string) []int {
return slf.cfg.GetIdByNodeService(NodeName, serviceName)
}
func (slf *CCluster) HasLocalService(serviceName string) bool {
return slf.cfg.HasLocalService(serviceName)
}
func (slf *CCluster) GetCurrentNodeId() int {
return slf.cfg.currentNode.NodeID
}

144
cluster/config.go Normal file
View File

@@ -0,0 +1,144 @@
package cluster
import (
"encoding/json"
"fmt"
"io/ioutil"
)
type CNodeCfg struct {
NodeID int
NodeName string
ServerAddr string
ServiceList []string
ClusterNode []string
}
type CNode struct {
NodeID int
NodeName string
ServerAddr string
ServiceList map[string]bool
}
type ClusterConfig struct {
NodeList []CNodeCfg
//通过id获取结点
mapIdNode map[int]CNode
//map[nodename][ {CNode} ]
mapClusterNodeService map[string][]CNode
mapClusterServiceNode map[string][]CNode
mapLocalService map[string]bool
currentNode CNode
}
// ReadCfg ...
func ReadCfg(path string, nodeid int) (*ClusterConfig, error) {
c := &ClusterConfig{}
d, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
err = json.Unmarshal(d, c)
if err != nil {
return nil, err
}
c.mapIdNode = make(map[int]CNode, 1)
c.mapClusterNodeService = make(map[string][]CNode, 1)
c.mapLocalService = make(map[string]bool)
c.mapClusterServiceNode = make(map[string][]CNode, 1)
//组装mapIdNode
var clusterNode []string
for _, v := range c.NodeList {
//1.取所有结点
mapservice := make(map[string]bool, 1)
c.mapIdNode[v.NodeID] = CNode{v.NodeID, v.NodeName, v.ServerAddr, mapservice}
if nodeid == v.NodeID {
c.currentNode = c.mapIdNode[v.NodeID]
//2.mapServiceNode map[string][]string
for _, s := range v.ServiceList {
mapservice[s] = true
c.mapLocalService[s] = true
}
for _, c := range v.ClusterNode {
clusterNode = append(clusterNode, c)
}
}
}
//组装mapClusterNodeService
for _, kc := range clusterNode {
for _, v := range c.NodeList {
if kc == v.NodeName {
//将自有连接的结点取详细信息
mapservice := make(map[string]bool, 1)
curNode := CNode{v.NodeID, v.NodeName, v.ServerAddr, mapservice}
for _, s := range v.ServiceList {
mapservice[s] = true
c.mapClusterServiceNode[s] = append(c.mapClusterServiceNode[s], curNode)
}
c.mapClusterNodeService[v.NodeName] = append(c.mapClusterNodeService[v.NodeName], curNode)
}
}
}
fmt.Println(c.mapIdNode)
fmt.Println(c.mapClusterNodeService)
fmt.Println(c.mapClusterServiceNode)
return c, nil
}
func (slf *ClusterConfig) GetIdByService(serviceName string) []int {
var nodeidlist []int
nodeidlist = make([]int, 0)
nodeList, ok := slf.mapClusterServiceNode[serviceName]
if ok == true {
for _, v := range nodeList {
nodeidlist = append(nodeidlist, v.NodeID)
}
}
return nodeidlist
}
func (slf *ClusterConfig) GetIdByNodeService(NodeName string, serviceName string) []int {
var nodeidlist []int
nodeidlist = make([]int, 0)
if NodeName == slf.currentNode.NodeName {
nodeidlist = append(nodeidlist, slf.currentNode.NodeID)
}
v, ok := slf.mapClusterNodeService[NodeName]
if ok == false {
return nodeidlist
}
for _, n := range v {
_, ok = n.ServiceList[serviceName]
if ok == true {
nodeidlist = append(nodeidlist, n.NodeID)
}
}
return nodeidlist
}
func (slf *ClusterConfig) HasLocalService(serviceName string) bool {
_, ok := slf.mapLocalService[serviceName]
return ok == true
}

182
netbase/websocketserver.go Normal file
View File

@@ -0,0 +1,182 @@
package netbase
import (
"errors"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
type Connection struct {
wsConnect *websocket.Conn
inChan chan []byte
outChan chan []byte
closeChan chan byte
mutex sync.Mutex // 对closeChan关闭上锁
isClosed bool // 防止closeChan被关闭多次
}
type CWebSocketServer struct {
Connection
bindurl string
}
func NewWebSocketServer(bindurl string) *CWebSocketServer {
wss := new(CWebSocketServer)
wss.bindurl = bindurl
return wss
}
func (wss *CWebSocketServer) Start() {
http.HandleFunc("/ws", wsHandler)
go http.ListenAndServe(wss.bindurl, nil)
}
var (
upgrader = websocket.Upgrader{
// 允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
func wsHandler(w http.ResponseWriter, r *http.Request) {
// w.Write([]byte("hello"))
var (
wsConn *websocket.Conn
err error
conn *Connection
data []byte
)
// 完成ws协议的握手操作
// Upgrade:websocket
if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
return
}
if conn, err = InitConnection(wsConn); err != nil {
goto ERR
}
// 启动线程,不断发消息
go func() {
var (
err error
)
for {
if err = conn.WriteMessage([]byte("heartbeat")); err != nil {
return
}
time.Sleep(1 * time.Second)
}
}()
for {
if data, err = conn.ReadMessage(); err != nil {
goto ERR
}
if err = conn.WriteMessage(data); err != nil {
goto ERR
}
}
ERR:
conn.Close()
}
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
conn = &Connection{
wsConnect: wsConn,
inChan: make(chan []byte, 1000),
outChan: make(chan []byte, 1000),
closeChan: make(chan byte, 1),
}
// 启动读协程
go conn.readLoop()
// 启动写协程
go conn.writeLoop()
return
}
func (conn *Connection) ReadMessage() (data []byte, err error) {
select {
case data = <-conn.inChan:
case <-conn.closeChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection) WriteMessage(data []byte) (err error) {
select {
case conn.outChan <- data:
case <-conn.closeChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection) Close() {
// 线程安全,可多次调用
conn.wsConnect.Close()
// 利用标记让closeChan只关闭一次
conn.mutex.Lock()
if !conn.isClosed {
close(conn.closeChan)
conn.isClosed = true
}
conn.mutex.Unlock()
}
// 内部实现
func (conn *Connection) readLoop() {
var (
data []byte
err error
)
for {
if _, data, err = conn.wsConnect.ReadMessage(); err != nil {
goto ERR
}
//阻塞在这里等待inChan有空闲位置
select {
case conn.inChan <- data:
case <-conn.closeChan: // closeChan 感知 conn断开
goto ERR
}
}
ERR:
conn.Close()
}
func (conn *Connection) writeLoop() {
var (
data []byte
err error
)
for {
select {
case data = <-conn.outChan:
case <-conn.closeChan:
goto ERR
}
if err = conn.wsConnect.WriteMessage(websocket.TextMessage, data); err != nil {
goto ERR
}
}
ERR:
conn.Close()
}

324
rpc/client.go Normal file
View File

@@ -0,0 +1,324 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rpc
import (
"bufio"
"encoding/gob"
"errors"
"io"
"log"
"net"
"net/http"
"sync"
)
// ServerError represents an error that has been returned from
// the remote side of the RPC connection.
type ServerError string
func (e ServerError) Error() string {
return string(e)
}
var ErrShutdown = errors.New("connection is shut down")
// Call represents an active RPC.
type Call struct {
ServiceMethod string // The name of the service and method to call.
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call // Strobes when call is complete.
}
// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
codec ClientCodec
reqMutex sync.Mutex // protects following
request Request
mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
}
// A ClientCodec implements writing of RPC requests and
// reading of RPC responses for the client side of an RPC session.
// The client calls WriteRequest to write a request to the connection
// and calls ReadResponseHeader and ReadResponseBody in pairs
// to read responses. The client calls Close when finished with the
// connection. ReadResponseBody may be called with a nil
// argument to force the body of the response to be read and then
// discarded.
// See NewClient's comment for information about concurrent access.
type ClientCodec interface {
WriteRequest(*Request, interface{}) error
ReadResponseHeader(*Response) error
ReadResponseBody(interface{}) error
Close() error
}
func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
// We've got no pending call. That usually means that
// WriteRequest partially failed, and call was already
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// Terminate pending calls.
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if debugLog && err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
if debugLog {
log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
}
}
}
// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
//
// The read and write halves of the connection are serialized independently,
// so no interlocking is required. However each half may be accessed
// concurrently so the implementation of conn should protect against
// concurrent reads or concurrent writes.
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
type gobClientCodec struct {
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
}
func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
if err = c.enc.Encode(r); err != nil {
return
}
if err = c.enc.Encode(body); err != nil {
return
}
return c.encBuf.Flush()
}
func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
return c.dec.Decode(r)
}
func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
return c.dec.Decode(body)
}
func (c *gobClientCodec) Close() error {
return c.rwc.Close()
}
// DialHTTP connects to an HTTP RPC server at the specified network address
// listening on the default HTTP RPC path.
func DialHTTP(network, address string) (*Client, error) {
return DialHTTPPath(network, address, DefaultRPCPath)
}
// DialHTTPPath connects to an HTTP RPC server
// at the specified network address and path.
func DialHTTPPath(network, address, path string) (*Client, error) {
var err error
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
// Require successful HTTP response
// before switching to RPC protocol.
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
if err == nil && resp.Status == connected {
return NewClient(conn), nil
}
if err == nil {
err = errors.New("unexpected HTTP response: " + resp.Status)
}
conn.Close()
return nil, &net.OpError{
Op: "dial-http",
Net: network + " " + address,
Addr: nil,
Err: err,
}
}
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}
// Close calls the underlying codec's Close method. If the connection is already
// shutting down, ErrShutdown is returned.
func (client *Client) Close() error {
client.mutex.Lock()
if client.closing {
client.mutex.Unlock()
return ErrShutdown
}
client.closing = true
client.mutex.Unlock()
return client.codec.Close()
}
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}

90
rpc/debug.go Normal file
View File

@@ -0,0 +1,90 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rpc
/*
Some HTML presented at http://machine:port/debug/rpc
Lists services, their methods, and some statistics, still rudimentary.
*/
import (
"fmt"
"html/template"
"net/http"
"sort"
)
const debugText = `<html>
<body>
<title>Services</title>
{{range .}}
<hr>
Service {{.Name}}
<hr>
<table>
<th align=center>Method</th><th align=center>Calls</th>
{{range .Method}}
<tr>
<td align=left font=fixed>{{.Name}}({{.Type.ArgType}}, {{.Type.ReplyType}}) error</td>
<td align=center>{{.Type.NumCalls}}</td>
</tr>
{{end}}
</table>
{{end}}
</body>
</html>`
var debug = template.Must(template.New("RPC debug").Parse(debugText))
// If set, print log statements for internal and I/O errors.
var debugLog = false
type debugMethod struct {
Type *methodType
Name string
}
type methodArray []debugMethod
type debugService struct {
Service *service
Name string
Method methodArray
}
type serviceArray []debugService
func (s serviceArray) Len() int { return len(s) }
func (s serviceArray) Less(i, j int) bool { return s[i].Name < s[j].Name }
func (s serviceArray) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (m methodArray) Len() int { return len(m) }
func (m methodArray) Less(i, j int) bool { return m[i].Name < m[j].Name }
func (m methodArray) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
type debugHTTP struct {
*Server
}
// Runs at /debug/rpc
func (server debugHTTP) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Build a sorted version of the data.
var services serviceArray
server.serviceMap.Range(func(snamei, svci interface{}) bool {
svc := svci.(*service)
ds := debugService{svc, snamei.(string), make(methodArray, 0, len(svc.method))}
for mname, method := range svc.method {
ds.Method = append(ds.Method, debugMethod{method, mname})
}
sort.Sort(ds.Method)
services = append(services, ds)
return true
})
sort.Sort(services)
err := debug.Execute(w, services)
if err != nil {
fmt.Fprintln(w, "rpc: error executing template:", err.Error())
}
}

734
rpc/server.go Normal file
View File

@@ -0,0 +1,734 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
/*
Package rpc provides access to the exported methods of an object across a
network or other I/O connection. A server registers an object, making it visible
as a service with the name of the type of the object. After registration, exported
methods of the object will be accessible remotely. A server may register multiple
objects (services) of different types but it is an error to register multiple
objects of the same type.
Only methods that satisfy these criteria will be made available for remote access;
other methods will be ignored:
- the method's type is exported.
- the method is exported.
- the method has two arguments, both exported (or builtin) types.
- the method's second argument is a pointer.
- the method has return type error.
In effect, the method must look schematically like
func (t *T) MethodName(argType T1, replyType *T2) error
where T1 and T2 can be marshaled by encoding/gob.
These requirements apply even if a different codec is used.
(In the future, these requirements may soften for custom codecs.)
The method's first argument represents the arguments provided by the caller; the
second argument represents the result parameters to be returned to the caller.
The method's return value, if non-nil, is passed back as a string that the client
sees as if created by errors.New. If an error is returned, the reply parameter
will not be sent back to the client.
The server may handle requests on a single connection by calling ServeConn. More
typically it will create a network listener and call Accept or, for an HTTP
listener, HandleHTTP and http.Serve.
A client wishing to use the service establishes a connection and then invokes
NewClient on the connection. The convenience function Dial (DialHTTP) performs
both steps for a raw network connection (an HTTP connection). The resulting
Client object has two methods, Call and Go, that specify the service and method to
call, a pointer containing the arguments, and a pointer to receive the result
parameters.
The Call method waits for the remote call to complete while the Go method
launches the call asynchronously and signals completion using the Call
structure's Done channel.
Unless an explicit codec is set up, package encoding/gob is used to
transport the data.
Here is a simple example. A server wishes to export an object of type Arith:
package server
import "errors"
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
The server calls (for HTTP service):
arith := new(Arith)
rpc.Register(arith)
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
At this point, clients can see a service "Arith" with methods "Arith.Multiply" and
"Arith.Divide". To invoke one, a client first dials the server:
client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
Then it can make a remote call:
// Synchronous call
args := &server.Args{7,8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
or
// Asynchronous call
quotient := new(Quotient)
divCall := client.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
// check errors, print, etc.
A server implementation will often provide a simple, type-safe wrapper for the
client.
The net/rpc package is frozen and is not accepting new features.
*/
package rpc
import (
"bufio"
"encoding/gob"
"errors"
"io"
"log"
"net"
"net/http"
"reflect"
"strings"
"sync"
"unicode"
"unicode/utf8"
)
const (
// Defaults used by HandleHTTP
DefaultRPCPath = "/_goRPC_"
DefaultDebugPath = "/debug/rpc"
)
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type methodType struct {
sync.Mutex // protects counters
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
}
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}
// Request is a header written before every RPC call. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
}
// Response is a header written before every RPC return. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *Response // for free list in Server
}
// Server represents an RPC Server.
type Server struct {
serviceMap sync.Map // map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{}
}
// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// Register publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
// - exported method of exported type
// - two arguments, both of exported type
// - the second argument is a pointer
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
// The client accesses each method using a string of the form "Type.Method",
// where Type is the receiver's concrete type.
func (server *Server) Register(rcvr interface{}) error {
return server.register(rcvr, "", false)
}
// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (server *Server) RegisterName(name string, rcvr interface{}) error {
return server.register(rcvr, name, true)
}
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
if useName {
sname = name
}
if sname == "" {
s := "rpc.Register: no service name for type " + s.typ.String()
log.Print(s)
return errors.New(s)
}
if !isExported(sname) && !useName {
s := "rpc.Register: type " + sname + " is not exported"
log.Print(s)
return errors.New(s)
}
s.name = sname
// Install the methods
s.method = suitableMethods(s.typ, true)
if len(s.method) == 0 {
str := ""
// To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(s.typ), false)
if len(method) != 0 {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
} else {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
}
//log.Print(str)
return errors.New(str)
}
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
return nil
}
// suitableMethods returns suitable Rpc methods of typ, it will report
// error using log if reportErr is true.
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
if len(mname) > 4 && mname[:4] != "RPC_" {
continue
}
// Method must be exported.
if method.PkgPath != "" {
continue
}
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 {
if reportErr {
//log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
}
continue
}
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
if reportErr {
//log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
}
continue
}
// Second arg must be a pointer.
replyType := mtype.In(2)
if replyType.Kind() != reflect.Ptr {
if reportErr {
//log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
}
continue
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
if reportErr {
//log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
}
continue
}
// Method needs one out.
if mtype.NumOut() != 1 {
if reportErr {
//log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
}
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
if reportErr {
//log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
}
continue
}
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
// A value sent as a placeholder for the server's response value when the server
// receives an invalid request. It is never decoded by the client since the Response
// contains an error when it is used.
var invalidRequest = struct{}{}
func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
resp := server.getResponse()
// Encode the response header
resp.ServiceMethod = req.ServiceMethod
if errmsg != "" {
resp.Error = errmsg
reply = invalidRequest
}
resp.Seq = req.Seq
sending.Lock()
err := codec.WriteResponse(resp, reply)
if debugLog && err != nil {
log.Println("rpc: writing response:", err)
}
sending.Unlock()
server.freeResponse(resp)
}
func (m *methodType) NumCalls() (n uint) {
m.Lock()
n = m.numCalls
m.Unlock()
return n
}
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
if wg != nil {
defer wg.Done()
}
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}
type gobServerCodec struct {
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
closed bool
}
func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
return c.dec.Decode(r)
}
func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
return c.dec.Decode(body)
}
func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
if err = c.enc.Encode(r); err != nil {
if c.encBuf.Flush() == nil {
// Gob couldn't encode the header. Should not happen, so if it does,
// shut down the connection to signal that the connection is broken.
log.Println("rpc: gob error encoding response:", err)
c.Close()
}
return
}
if err = c.enc.Encode(body); err != nil {
if c.encBuf.Flush() == nil {
// Was a gob problem encoding the body but the header has been written.
// Shut down the connection to signal that the connection is broken.
log.Println("rpc: gob error encoding body:", err)
c.Close()
}
return
}
return c.encBuf.Flush()
}
func (c *gobServerCodec) Close() error {
if c.closed {
// Only call c.rwc.Close once; otherwise the semantics are undefined.
return nil
}
c.closed = true
return c.rwc.Close()
}
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
wg.Add(1)
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
// We've seen that there are no more requests.
// Wait for responses to be sent before closing codec.
wg.Wait()
codec.Close()
}
// ServeRequest is like ServeCodec but synchronously serves a single request.
// It does not close the codec upon completion.
func (server *Server) ServeRequest(codec ServerCodec) error {
sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if !keepReading {
return err
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
return err
}
service.call(server, sending, nil, mtype, req, argv, replyv, codec)
return nil
}
func (server *Server) getRequest() *Request {
server.reqLock.Lock()
req := server.freeReq
if req == nil {
req = new(Request)
} else {
server.freeReq = req.next
*req = Request{}
}
server.reqLock.Unlock()
return req
}
func (server *Server) freeRequest(req *Request) {
server.reqLock.Lock()
req.next = server.freeReq
server.freeReq = req
server.reqLock.Unlock()
}
func (server *Server) getResponse() *Response {
server.respLock.Lock()
resp := server.freeResp
if resp == nil {
resp = new(Response)
} else {
server.freeResp = resp.next
*resp = Response{}
}
server.respLock.Unlock()
return resp
}
func (server *Server) freeResponse(resp *Response) {
server.respLock.Lock()
resp.next = server.freeResp
server.freeResp = resp
server.respLock.Unlock()
}
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
if err != nil {
if !keepReading {
return
}
// discard body
codec.ReadRequestBody(nil)
return
}
// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
if mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(mtype.ArgType.Elem())
} else {
argv = reflect.New(mtype.ArgType)
argIsValue = true
}
// argv guaranteed to be a pointer now.
if err = codec.ReadRequestBody(argv.Interface()); err != nil {
return
}
if argIsValue {
argv = argv.Elem()
}
replyv = reflect.New(mtype.ReplyType.Elem())
switch mtype.ReplyType.Elem().Kind() {
case reflect.Map:
replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
}
return
}
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
// Grab the request header.
req = server.getRequest()
err = codec.ReadRequestHeader(req)
if err != nil {
req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
err = errors.New("rpc: server cannot decode request: " + err.Error())
return
}
// We read the header successfully. If we see an error now,
// we can still recover and move on to the next request.
keepReading = true
dot := strings.LastIndex(req.ServiceMethod, ".")
if dot < 0 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
return
}
serviceName := req.ServiceMethod[:dot]
methodName := req.ServiceMethod[dot+1:]
// Look up the request.
svci, ok := server.serviceMap.Load(serviceName)
if !ok {
err = errors.New("rpc: can't find service " + req.ServiceMethod)
return
}
svc = svci.(*service)
mtype = svc.method[methodName]
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)
}
return
}
// Accept accepts connections on the listener and serves requests
// for each incoming connection. Accept blocks until the listener
// returns a non-nil error. The caller typically invokes Accept in a
// go statement.
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("rpc.Serve: accept:", err.Error())
return
}
go server.ServeConn(conn)
}
}
// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func RegisterName(name string, rcvr interface{}) error {
return DefaultServer.RegisterName(name, rcvr)
}
// A ServerCodec implements reading of RPC requests and writing of
// RPC responses for the server side of an RPC session.
// The server calls ReadRequestHeader and ReadRequestBody in pairs
// to read requests from the connection, and it calls WriteResponse to
// write a response back. The server calls Close when finished with the
// connection. ReadRequestBody may be called with a nil
// argument to force the body of the request to be read and discarded.
// See NewClient's comment for information about concurrent access.
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(interface{}) error
WriteResponse(*Response, interface{}) error
// Close can be called multiple times and must be idempotent.
Close() error
}
// ServeConn runs the DefaultServer on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
func ServeConn(conn io.ReadWriteCloser) {
DefaultServer.ServeConn(conn)
}
// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func ServeCodec(codec ServerCodec) {
DefaultServer.ServeCodec(codec)
}
// ServeRequest is like ServeCodec but synchronously serves a single request.
// It does not close the codec upon completion.
func ServeRequest(codec ServerCodec) error {
return DefaultServer.ServeRequest(codec)
}
// Accept accepts connections on the listener and serves requests
// to DefaultServer for each incoming connection.
// Accept blocks; the caller typically invokes it in a go statement.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
// Can connect to RPC service using HTTP CONNECT to rpcPath.
var connected = "200 Connected to Go RPC"
// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must CONNECT\n")
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn)
}
// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
// and a debugging handler on debugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
http.Handle(rpcPath, server)
http.Handle(debugPath, debugHTTP{server})
}
// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func HandleHTTP() {
DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}

88
server/server.go Normal file
View File

@@ -0,0 +1,88 @@
package server
import (
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"origin/cluster"
"origin/service"
"os"
"os/signal"
"sync"
"syscall"
)
type CExitCtl struct {
exit chan bool
waitGroup *sync.WaitGroup
}
type cserver struct {
CExitCtl
serviceManager service.IServiceManager
sigs chan os.Signal
}
func (s *cserver) Init() {
service.InitLog()
service.InstanceServiceMgr().Init()
s.exit = make(chan bool)
s.waitGroup = &sync.WaitGroup{}
s.sigs = make(chan os.Signal, 1)
signal.Notify(s.sigs, syscall.SIGINT, syscall.SIGTERM)
}
func (s *cserver) SetupService(services ...service.IService) {
for i := 0; i < len(services); i++ {
if cluster.InstanceClusterMgr().HasLocalService(services[i].GetServiceName()) == true {
service.InstanceServiceMgr().Setup(services[i])
}
}
}
func (s *cserver) Start() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
service.InstanceServiceMgr().Start(s.exit, s.waitGroup)
cluster.InstanceClusterMgr().Start()
select {
case <-s.sigs:
fmt.Println("收到信号推出程序")
}
s.Stop()
}
func (s *cserver) Stop() {
close(s.exit)
s.waitGroup.Wait()
}
func NewServer() *cserver {
err := cluster.InstanceClusterMgr().Init()
if err != nil {
fmt.Print(err)
return nil
}
return new(cserver)
}
func HasCmdParam(param string) bool {
for i := 0; i < len(os.Args); i++ {
if os.Args[i] == param {
return true
}
}
return false
}

180
service/Service.go Normal file
View File

@@ -0,0 +1,180 @@
package service
import (
"fmt"
"log"
"os"
"reflect"
"strings"
"sync"
"time"
)
type MethodInfo struct {
Fun reflect.Value
ParamList []reflect.Value
types reflect.Type
}
type IBaseModule interface {
SetModuleName(modulename string) bool
GetModuleName() string
OnInit() error
OnRun() error
}
type IService interface {
Init(Iservice interface{}, servicetype int) error
Run(service IService, exit chan bool, pwaitGroup *sync.WaitGroup) error
OnInit() error
OnEndInit() error
OnRun() error
OnDestory() error
OnFetchService(iservice IService) error
GetServiceType() int
GetServiceName() string
GetServiceId() int
IsTimeOutTick(microSecond int64) bool
AddModule(module IBaseModule, autorun bool) bool
GetModule(module string) IBaseModule
GetStatus() int
}
var Log *log.Logger
var logFile *os.File
func InitLog() {
fileName := "log.log"
var err error
logFile, err = os.Create(fileName)
if err != nil {
log.Fatalln("open file error")
}
Log = log.New(logFile, "", log.LstdFlags)
}
type BaseService struct {
serviceid int
servicename string
servicetype int
tickTime int64
statTm int64
Status int
modulelist []IBaseModule
}
type BaseModule struct {
modulename string
}
func (slf *BaseModule) SetModuleName(modulename string) bool {
slf.modulename = modulename
return true
}
func (slf *BaseModule) GetModuleName() string {
return slf.modulename
}
func (slf *BaseService) GetServiceId() int {
return slf.serviceid
}
func (slf *BaseService) GetServiceType() int {
return slf.servicetype
}
func (slf *BaseService) GetServiceName() string {
return slf.servicename
}
func (slf *BaseService) GetStatus() int {
return slf.Status
}
func (slf *BaseService) OnEndInit() error {
return nil
}
func (slf *BaseService) OnFetchService(iservice IService) error {
return nil
}
func (slf *BaseService) Init(Iservice interface{}, servicetype int) error {
slf.servicename = fmt.Sprintf("%T", Iservice)
parts := strings.Split(slf.servicename, ".")
if len(parts) != 2 {
err := fmt.Errorf("BaseService.Init: service name is error: %q", slf.servicename)
return err
}
slf.servicename = parts[1]
slf.servicetype = servicetype
slf.serviceid = InstanceServiceMgr().GenServiceID()
return nil
}
func (slf *BaseService) Run(service IService, exit chan bool, pwaitGroup *sync.WaitGroup) error {
defer pwaitGroup.Done()
for {
select {
case <-exit:
fmt.Println("stopping...")
return nil
default:
}
slf.tickTime = time.Now().UnixNano() / 1e6
service.OnRun()
slf.tickTime = time.Now().UnixNano() / 1e6
}
return nil
}
func (slf *BaseService) RPC_CheckServiceTickTimeOut(microSecond int64) error {
if slf.IsTimeOutTick(microSecond) == true {
Log.Printf("service:%s is timeout,state:%d", slf.GetServiceName(), slf.GetStatus())
}
return nil
}
func (slf *BaseService) IsTimeOutTick(microSecond int64) bool {
nowtm := time.Now().UnixNano() / 1e6
return nowtm-slf.tickTime >= microSecond
}
func (slf *BaseService) AddModule(module IBaseModule, autorun bool) bool {
typename := fmt.Sprintf("%v", reflect.TypeOf(module))
parts := strings.Split(typename, ".")
if len(parts) < 2 {
return false
}
module.SetModuleName(parts[1])
slf.modulelist = append(slf.modulelist, module)
module.OnInit()
if autorun == true {
go module.OnRun()
}
return true
}
func (slf *BaseService) GetModule(module string) IBaseModule {
for _, v := range slf.modulelist {
if v.GetModuleName() == module {
return v
}
}
return nil
}

119
service/servicemanager.go Normal file
View File

@@ -0,0 +1,119 @@
package service
import (
"fmt"
"sync"
"time"
)
type IServiceManager interface {
Setup(s IService) bool
Init() bool
Start() bool
CreateServiceID() int
}
type CServiceManager struct {
//serviceList []IService
genserviceid int
localserviceMap map[string]IService
}
func (slf *CServiceManager) Setup(s IService) bool {
slf.localserviceMap[s.GetServiceName()] = s
return true
}
func (slf *CServiceManager) FindService(serviceName string) IService {
service, ok := slf.localserviceMap[serviceName]
if ok {
return service
}
return nil
}
type FetchService func(s IService) error
func (slf *CServiceManager) FetchService(s FetchService) IService {
for _, se := range slf.localserviceMap {
s(se)
}
return nil
}
func (slf *CServiceManager) Init() bool {
for _, s := range slf.localserviceMap {
if s.OnInit() != nil {
return false
}
}
// 初始化结束
for _, s := range slf.localserviceMap {
if s.OnEndInit() != nil {
return false
}
}
return true
}
func (slf *CServiceManager) Start(exit chan bool, pwaitGroup *sync.WaitGroup) bool {
for _, s := range slf.localserviceMap {
pwaitGroup.Add(1)
go s.Run(s, exit, pwaitGroup)
}
pwaitGroup.Add(1)
go slf.CheckServiceTimeTimeout(exit, pwaitGroup)
return true
}
func (slf *CServiceManager) CheckServiceTimeTimeout(exit chan bool, pwaitGroup *sync.WaitGroup) {
defer pwaitGroup.Done()
for {
select {
case <-exit:
fmt.Println("CheckServiceTimeTimeout stopping...")
return
default:
}
for _, s := range slf.localserviceMap {
if s.IsTimeOutTick(20000) == true {
Log.Printf("service:%s is timeout,state:%d", s.GetServiceName(), s.GetStatus())
}
}
time.Sleep(2 * time.Second)
}
}
func (slf *CServiceManager) GenServiceID() int {
slf.genserviceid += 1
return slf.genserviceid
}
func (slf *CServiceManager) Get() bool {
for _, s := range slf.localserviceMap {
go s.OnRun()
}
return true
}
var _self *CServiceManager
func InstanceServiceMgr() *CServiceManager {
if _self == nil {
_self = new(CServiceManager)
_self.localserviceMap = make(map[string]IService)
return _self
}
return _self
}

1
sysmodule/basemodule.go Normal file
View File

@@ -0,0 +1 @@
package sysmodule

35
sysmodule/timer.go.mv Normal file
View File

@@ -0,0 +1,35 @@
package wsservice
import (
"origin/service"
)
//声明控制器函数Map类型变量
type cWSService struct {
service.BaseService
port int
}
func (ws *cWSService) OnInit() error {
return nil
}
func (ws *cWSService) OnRun() error {
return nil
}
func (ws *cWSService) OnDestory() error {
return nil
}
func NewWSService(servicetype int) *cWSService {
wss := new(cWSService)
wss.Init(wss, servicetype)
return wss
}
func (ws *cWSService) RPC_TestMethod(a string, b int) error {
return nil
}

View File

@@ -0,0 +1,158 @@
package sysmodule
import (
"fmt"
"gorilla/websocket"
"log"
"net/http"
"net/url"
"time"
)
type IWebsocketClient interface {
Init(slf IWebsocketClient, strurl string, bproxy bool, timeoutsec time.Duration) error
Start() error
WriteMessage(msg []byte) error
OnDisconnect() error
OnConnected() error
OnReadMessage(msg []byte) error
}
type WebsocketClient struct {
WsDailer *websocket.Dialer
conn *websocket.Conn
url string
state int //0未连接状态 1连接状态
bwritemsg chan []byte
slf IWebsocketClient
timeoutsec time.Duration
}
func (ws *WebsocketClient) Init(slf IWebsocketClient, strurl string, bproxy bool, timeoutsec time.Duration) error {
ws.timeoutsec = timeoutsec
ws.slf = slf
if bproxy == true {
proxy := func(_ *http.Request) (*url.URL, error) {
return url.Parse("http://127.0.0.1:1080")
}
if timeoutsec > 0 {
tosec := timeoutsec * time.Second
ws.WsDailer = &websocket.Dialer{Proxy: proxy, HandshakeTimeout: tosec}
} else {
ws.WsDailer = &websocket.Dialer{Proxy: proxy}
}
} else {
if timeoutsec > 0 {
tosec := timeoutsec * time.Second
ws.WsDailer = &websocket.Dialer{HandshakeTimeout: tosec}
} else {
ws.WsDailer = &websocket.Dialer{}
}
}
ws.url = strurl
ws.bwritemsg = make(chan []byte, 1000)
return nil
}
func (ws *WebsocketClient) OnRun() error {
for {
if ws.state == 0 {
time.Sleep(1 * time.Second)
ws.StartConnect()
} else {
ws.conn.SetReadDeadline(time.Now().Add(ws.timeoutsec * time.Second))
_, message, err := ws.conn.ReadMessage()
if err != nil {
log.Printf("到服务器的连接断开 %+v\n", err)
ws.conn.Close()
ws.state = 0
ws.slf.OnDisconnect()
continue
}
ws.slf.OnReadMessage(message)
}
}
return nil
}
func (ws *WebsocketClient) StartConnect() error {
var err error
ws.conn, _, err = ws.WsDailer.Dial(ws.url, nil)
fmt.Printf("connecting %s, %+v\n", ws.url, err)
if err != nil {
return err
}
ws.state = 1
ws.slf.OnConnected()
return nil
}
func (ws *WebsocketClient) Start() error {
ws.state = 0
go ws.OnRun()
go ws.writeMsg()
return nil
}
//触发
func (ws *WebsocketClient) writeMsg() error {
timerC := time.NewTicker(time.Second * 5).C
for {
if ws.state == 0 {
time.Sleep(1 * time.Second)
continue
}
select {
case <-timerC:
if ws.state == 1 {
ws.WriteMessage([]byte(`ping`))
}
case msg := <-ws.bwritemsg:
if ws.state == 1 {
ws.conn.SetWriteDeadline(time.Now().Add(ws.timeoutsec * time.Second))
err := ws.conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
fmt.Print(err)
ws.state = 0
ws.conn.Close()
ws.slf.OnDisconnect()
}
}
}
}
return nil
}
func (ws *WebsocketClient) WriteMessage(msg []byte) error {
ws.bwritemsg <- msg
return nil
}
func (ws *WebsocketClient) OnDisconnect() error {
return nil
}
func (ws *WebsocketClient) OnConnected() error {
return nil
}
//触发
func (ws *WebsocketClient) OnReadMessage(msg []byte) error {
return nil
}

35
wsservice/wsservice.go Normal file
View File

@@ -0,0 +1,35 @@
package wsservice
import (
"origin/service"
)
//声明控制器函数Map类型变量
type cWSService struct {
service.BaseService
port int
}
func (ws *cWSService) OnInit() error {
return nil
}
func (ws *cWSService) OnRun() error {
return nil
}
func (ws *cWSService) OnDestory() error {
return nil
}
func NewWSService(servicetype int) *cWSService {
wss := new(cWSService)
wss.Init(wss, servicetype)
return wss
}
func (ws *cWSService) RPC_TestMethod(a string, b int) error {
return nil
}