This commit is contained in:
boyce
2019-07-12 10:21:48 +08:00
7 changed files with 141 additions and 14 deletions

View File

@@ -345,12 +345,19 @@ func (slf *CCluster) GetRpcClientByNodeId(nodeid int) *RpcClient {
}
func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool) error {
return slf.goImpl(bCast, NodeServiceMethod, args, queueModle, true)
}
func (slf *CCluster) goImpl(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool, log bool) error {
var callServiceName string
var serviceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName, &serviceName)
if len(nodeidList) < 1 {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) not find nodes.", NodeServiceMethod)
return fmt.Errorf("CCluster.Go(%s) not find nodes.", NodeServiceMethod)
err := fmt.Errorf("CCluster.Go(%s) not find nodes.", NodeServiceMethod)
if log {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, err.Error())
}
return err
}
if bCast == false && len(nodeidList) > 1 {
@@ -364,23 +371,38 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{},
return fmt.Errorf("CCluster.Go(%s) cannot find service %s", NodeServiceMethod, serviceName)
}
if iService.IsInit() == false {
service.GetLogger().Printf(sysmodule.LEVER_WARN, "CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid)
return fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid)
err := fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid)
if log {
service.GetLogger().Printf(sysmodule.LEVER_WARN, err.Error())
}
return err
}
replyCall := slf.LocalRpcClient.Go(callServiceName, args, nil, nil, queueModle)
if replyCall.Error != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error)
err := fmt.Errorf("CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error)
if log {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, err.Error())
} else {
return err
}
}
} else {
pclient := slf.GetClusterClient(nodeid)
if pclient == nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid)
return fmt.Errorf("CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid)
err := fmt.Errorf("CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid)
if log {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, err.Error())
}
return err
}
replyCall := pclient.Go(callServiceName, args, nil, nil, queueModle)
if replyCall.Error != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error)
err := fmt.Errorf("CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error)
if log {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, err.Error())
}
return err
}
}
}
@@ -455,6 +477,11 @@ func GoQueue(NodeServiceMethod string, args interface{}) error {
return InstanceClusterMgr().Go(false, NodeServiceMethod, args, true)
}
//在GoQueue的基础上增加是否写日志参数
func GoQueueEx(NodeServiceMethod string, args interface{}, log bool) error {
return InstanceClusterMgr().goImpl(false, NodeServiceMethod, args, true, log)
}
func CastGoQueue(NodeServiceMethod string, args interface{}) error {
return InstanceClusterMgr().Go(true, NodeServiceMethod, args, true)
}

View File

@@ -38,7 +38,7 @@ type WebsocketClient struct {
timeoutsec time.Duration
bRun bool
ping string
ping []byte
}
const (
@@ -71,12 +71,12 @@ func (ws *WebsocketClient) Init(slf IWebsocketClient, strurl, strProxyPath strin
}
ws.url = strurl
ws.ping = `ping`
ws.ping = []byte(`ping`)
return nil
}
func (ws *WebsocketClient) SetPing(ping string) {
ws.ping = ping
ws.ping = []byte(ping)
}
//OnRun ...
@@ -182,7 +182,7 @@ func (ws *WebsocketClient) writeMsg() error {
case <-timerC:
if ws.state == 2 {
err := ws.WriteMessage([]byte(ws.ping))
err := ws.WriteMessage(ws.ping)
if err != nil {
service.GetLogger().Printf(service.LEVER_WARN, "websocket client is disconnect [%s],information is %v", ws.url, err)
ws.state = 0

View File

@@ -23,6 +23,8 @@ type IWebsocketServer interface {
CreateClient(conn *websocket.Conn) *WSClient
Disconnect(clientid uint64)
ReleaseClient(pclient *WSClient)
Clients() []uint64
BroadcastMsg(messageType int, msg []byte) int
}
type IMessageReceiver interface {

View File

@@ -138,6 +138,7 @@ import (
"reflect"
"strings"
"sync"
"time"
"unicode"
"unicode/utf8"
@@ -273,7 +274,42 @@ func (server *Server) ProcessQueue(name string) {
return
}
//定时报告队列超负荷运行
const reportDur = time.Minute
chCap := int64(cap(chanRpc))
reportLimit := chCap * 1 / 2
errNum := int64(0)
lastSize := int64(0)
maxSize := int64(0)
totalSize := int64(0)
lastReportTime := time.Now()
for {
//定时报告channel有没有超负荷运行
if size := int64(len(chanRpc)); size >= reportLimit || lastSize >= reportLimit {
if size >= reportLimit {
errNum++
totalSize += size
if size > maxSize {
maxSize = size
}
}
dur := time.Now().Sub(lastReportTime)
if dur >= reportDur {
avgSize := int64(0)
if errNum > 0 {
avgSize = totalSize / errNum
}
orginservice.GetLogger().Printf(orginservice.LEVER_WARN, "RpcServer.ProcessQueue(%s) %d erros when channel size reaches %d/%d maxSize=%d avgSize=%d during last %s",
name, errNum, reportLimit, chCap, maxSize, avgSize, dur)
errNum = 0
maxSize = 0
totalSize = 0
lastSize = size
lastReportTime = time.Now()
}
}
rpcData := <-chanRpc
rpcData.service.call(rpcData.server, rpcData.sending, rpcData.wg, rpcData.mtype, rpcData.req, rpcData.argv, rpcData.replyv, rpcData.codec)
}
@@ -560,7 +596,8 @@ func (server *Server) ServeCodec(codec ServerCodec) {
rpcChan, ok := server.mapCallQueue[service.name]
if ok == true {
if len(rpcChan) >= MAX_RPCDATA_QUEUE_COUNT {
orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s chan overload %d", service.name, MAX_RPCDATA_QUEUE_COUNT)
//不在这里写日志了 否则RPC繁忙 这里会刷日志把磁盘刷爆 ProcessQueue会记录channel繁忙的日志
//orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s chan overload %d", service.name, MAX_RPCDATA_QUEUE_COUNT)
continue
}

View File

@@ -1,5 +1,9 @@
package service
import (
"fmt"
)
const (
LEVER_UNKNOW = 0
LEVER_DEBUG = 1
@@ -10,8 +14,24 @@ const (
LEVEL_MAX = 6
)
var defaultLogger = &LoggerFmt{}
type ILogger interface {
Printf(level uint, format string, v ...interface{})
Print(level uint, v ...interface{})
SetLogLevel(level uint)
}
type LoggerFmt struct {
}
func (slf *LoggerFmt) Printf(level uint, format string, v ...interface{}) {
fmt.Printf(format, v...)
fmt.Println("")
}
func (slf *LoggerFmt) Print(level uint, v ...interface{}) {
fmt.Println(v...)
}
func (slf *LoggerFmt) SetLogLevel(level uint) {
//do nothing
}

View File

@@ -98,7 +98,11 @@ func (slf *CServiceManager) GenServiceID() int {
}
func (slf *CServiceManager) GetLogger() ILogger {
return slf.logger
ret := slf.logger
if ret == nil {
ret = defaultLogger
}
return ret
}
var self *CServiceManager

View File

@@ -0,0 +1,37 @@
package servicelist
import (
"github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/service"
)
var node = func() *originnode.COriginNode {
//1.新建OrginNode结点
node := originnode.NewOriginNode()
if node == nil {
println("originnode.NewOriginNode fail")
return nil
}
return node
}()
var serviceList []service.IService
// 增加服务列表 在init中调用
// 因为是init的时候调用 所以不用锁
func PushService(s service.IService) {
serviceList = append(serviceList, s)
}
//在main中调用该函数即可加载所有service
//debugCheckUrl "localhost:6060"
func Start(debugCheckUrl string) {
node.OpenDebugCheck(debugCheckUrl)
node.SetupService(serviceList...)
//5.初始化结点
node.Init()
//6.开始结点
node.Start()
}