mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-15 16:34:44 +08:00
Merge branch 'master' of https://github.com/duanhf2012/origin
This commit is contained in:
@@ -145,3 +145,8 @@ func GetRpcClient(nodeId int,serviceMethod string) ([]*rpc.Client,error) {
|
|||||||
func GetRpcServer() *rpc.Server{
|
func GetRpcServer() *rpc.Server{
|
||||||
return &cluster.rpcServer
|
return &cluster.rpcServer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (slf *Cluster) IsNodeConnected (nodeId int) bool {
|
||||||
|
pClient := slf.GetRpcClient(nodeId)
|
||||||
|
return pClient!=nil && pClient.IsConnected()
|
||||||
|
}
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
const Default_EventChannelLen = 10000
|
const Default_EventChannelLen = 10000
|
||||||
|
|
||||||
//事件接受器
|
//事件接受器
|
||||||
type EventReciver func(event *Event) error
|
type EventReciverFunc func(event *Event)
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
Type EventType
|
Type EventType
|
||||||
@@ -19,10 +19,12 @@ type Event struct {
|
|||||||
|
|
||||||
type IEventProcessor interface {
|
type IEventProcessor interface {
|
||||||
NotifyEvent(*Event)
|
NotifyEvent(*Event)
|
||||||
OnEventHandler(event *Event) error
|
|
||||||
SetEventReciver(eventProcessor IEventProcessor)
|
SetEventReciver(eventProcessor IEventProcessor)
|
||||||
GetEventReciver() IEventProcessor
|
GetEventReciver() IEventProcessor
|
||||||
SetEventChanNum(num int32) bool
|
SetEventChanNum(num int32) bool
|
||||||
|
RegEventReciverFunc(eventType EventType,reciverFunc EventReciverFunc)
|
||||||
|
UnRegEventReciverFun(eventType EventType)
|
||||||
}
|
}
|
||||||
|
|
||||||
type EventProcessor struct {
|
type EventProcessor struct {
|
||||||
@@ -32,6 +34,18 @@ type EventProcessor struct {
|
|||||||
|
|
||||||
eventChanNumLocker sync.RWMutex
|
eventChanNumLocker sync.RWMutex
|
||||||
eventChanNum int32
|
eventChanNum int32
|
||||||
|
mapEventReciverFunc map[EventType]EventReciverFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (slf *EventProcessor) RegEventReciverFunc(eventType EventType,reciverFunc EventReciverFunc){
|
||||||
|
if slf.mapEventReciverFunc == nil {
|
||||||
|
slf.mapEventReciverFunc = map[EventType]EventReciverFunc{}
|
||||||
|
}
|
||||||
|
slf.mapEventReciverFunc[eventType] = reciverFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (slf *EventProcessor) UnRegEventReciverFun(eventType EventType){
|
||||||
|
delete(slf.mapEventReciverFunc,eventType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *EventProcessor) NotifyEvent(pEvent *Event) {
|
func (slf *EventProcessor) NotifyEvent(pEvent *Event) {
|
||||||
@@ -41,9 +55,7 @@ func (slf *EventProcessor) NotifyEvent(pEvent *Event) {
|
|||||||
slf.EventChan <-pEvent
|
slf.EventChan <-pEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *EventProcessor) OnEventHandler(event *Event) error{
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (slf *EventProcessor) GetEventChan() chan *Event{
|
func (slf *EventProcessor) GetEventChan() chan *Event{
|
||||||
slf.eventChanNumLocker.Lock()
|
slf.eventChanNumLocker.Lock()
|
||||||
@@ -85,7 +97,7 @@ type IHttpEventData interface {
|
|||||||
Handle()
|
Handle()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *EventProcessor) EventHandler(processor IEventProcessor,ev *Event) {
|
func (slf *EventProcessor) EventHandler(ev *Event) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
buf := make([]byte, 4096)
|
buf := make([]byte, 4096)
|
||||||
@@ -99,7 +111,11 @@ func (slf *EventProcessor) EventHandler(processor IEventProcessor,ev *Event) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
processor.OnEventHandler(ev)
|
if fun,ok := slf.mapEventReciverFunc[ev.Type];ok == false{
|
||||||
|
return
|
||||||
|
}else{
|
||||||
|
fun(ev)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *EventProcessor) innerEventHandler(ev *Event) bool {
|
func (slf *EventProcessor) innerEventHandler(ev *Event) bool {
|
||||||
|
|||||||
@@ -9,6 +9,8 @@ import (
|
|||||||
"github.com/duanhf2012/origin/node"
|
"github.com/duanhf2012/origin/node"
|
||||||
"github.com/duanhf2012/origin/service"
|
"github.com/duanhf2012/origin/service"
|
||||||
"github.com/duanhf2012/origin/sysservice"
|
"github.com/duanhf2012/origin/sysservice"
|
||||||
|
"github.com/duanhf2012/origin/util/timer"
|
||||||
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
type GateService struct {
|
type GateService struct {
|
||||||
@@ -29,9 +31,17 @@ func (slf *GateService) OnInit() error{
|
|||||||
slf.httpRouter.GET("/get/query", slf.HttpTest)
|
slf.httpRouter.GET("/get/query", slf.HttpTest)
|
||||||
slf.httpRouter.POST("/post/query", slf.HttpTestPost)
|
slf.httpRouter.POST("/post/query", slf.HttpTestPost)
|
||||||
slf.httpRouter.SetServeFile(sysservice.METHOD_GET,"/img/head/","d:/img")
|
slf.httpRouter.SetServeFile(sysservice.METHOD_GET,"/img/head/","d:/img")
|
||||||
|
|
||||||
|
pCronExpr,_ := timer.NewCronExpr("0 * * * * *")
|
||||||
|
slf.CronFunc(pCronExpr,slf.Test)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (slf *GateService) Test(){
|
||||||
|
fmt.Print("xxxxx\n")
|
||||||
|
}
|
||||||
|
|
||||||
func (slf *GateService) HttpTest(session *sysservice.HttpSession) {
|
func (slf *GateService) HttpTest(session *sysservice.HttpSession) {
|
||||||
session.SetHeader("a","b")
|
session.SetHeader("a","b")
|
||||||
session.Write([]byte("this is a test"))
|
session.Write([]byte("this is a test"))
|
||||||
@@ -57,10 +67,11 @@ func (slf *GateService) HttpTestPost(session *sysservice.HttpSession) {
|
|||||||
|
|
||||||
testa.AA = 100
|
testa.AA = 100
|
||||||
testa.BB = "this is a test"
|
testa.BB = "this is a test"
|
||||||
session.WriteJson("asdasda")
|
session.WriteJsonDone(http.StatusOK,"asdasda")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *GateService) OnEventHandler(ev *event.Event) error{
|
func (slf *GateService) OnEventHandler(ev *event.Event) error{
|
||||||
|
|
||||||
if ev.Type == event.Sys_Event_Tcp_RecvPack {
|
if ev.Type == event.Sys_Event_Tcp_RecvPack {
|
||||||
pPack := ev.Data.(*sysservice.TcpPack)
|
pPack := ev.Data.(*sysservice.TcpPack)
|
||||||
slf.processor.Route(ev.Data,pPack.ClientId)
|
slf.processor.Route(ev.Data,pPack.ClientId)
|
||||||
|
|||||||
@@ -111,3 +111,7 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) {
|
|||||||
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
|
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
|
||||||
return tcpConn.msgParser.Write(tcpConn, args...)
|
return tcpConn.msgParser.Write(tcpConn, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tcpConn *TCPConn) IsConnected() bool {
|
||||||
|
return tcpConn.closeFlag == false
|
||||||
|
}
|
||||||
@@ -219,3 +219,7 @@ func (slf *Client) Run(){
|
|||||||
|
|
||||||
func (slf *Client) OnClose(){
|
func (slf *Client) OnClose(){
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (slf *Client) IsConnected() bool {
|
||||||
|
return slf.conn!=nil && slf.conn.IsConnected()==true
|
||||||
|
}
|
||||||
@@ -2,7 +2,6 @@ package service
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/duanhf2012/origin/event"
|
|
||||||
"github.com/duanhf2012/origin/log"
|
"github.com/duanhf2012/origin/log"
|
||||||
"github.com/duanhf2012/origin/profiler"
|
"github.com/duanhf2012/origin/profiler"
|
||||||
"github.com/duanhf2012/origin/rpc"
|
"github.com/duanhf2012/origin/rpc"
|
||||||
@@ -130,7 +129,7 @@ func (slf *Service) Run() {
|
|||||||
if slf.profiler!=nil {
|
if slf.profiler!=nil {
|
||||||
analyzer = slf.profiler.Push(fmt.Sprintf("Event_%d", int(ev.Type)))
|
analyzer = slf.profiler.Push(fmt.Sprintf("Event_%d", int(ev.Type)))
|
||||||
}
|
}
|
||||||
slf.EventHandler(slf.this.(event.IEventProcessor),ev)
|
slf.EventHandler(ev)
|
||||||
if analyzer!=nil {
|
if analyzer!=nil {
|
||||||
analyzer.Pop()
|
analyzer.Pop()
|
||||||
analyzer = nil
|
analyzer = nil
|
||||||
|
|||||||
@@ -121,6 +121,7 @@ func NewHttpHttpRouter(eventReciver event.IEventProcessor) IHttpRouter {
|
|||||||
|
|
||||||
|
|
||||||
func (slf *HttpSession) Query(key string) (string, bool) {
|
func (slf *HttpSession) Query(key string) (string, bool) {
|
||||||
|
|
||||||
if slf.mapParam == nil {
|
if slf.mapParam == nil {
|
||||||
slf.mapParam = make(map[string]string)
|
slf.mapParam = make(map[string]string)
|
||||||
|
|
||||||
@@ -158,6 +159,14 @@ func (slf *HttpSession) AddHeader(key, value string) {
|
|||||||
slf.w.Header().Add(key,value)
|
slf.w.Header().Add(key,value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (slf *HttpSession) GetHeader(key string) string{
|
||||||
|
return slf.r.Header.Get(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (slf *HttpSession) DelHeader(key string) {
|
||||||
|
slf.r.Header.Del(key)
|
||||||
|
}
|
||||||
|
|
||||||
func (slf *HttpSession) WriteStatusCode(statusCode int){
|
func (slf *HttpSession) WriteStatusCode(statusCode int){
|
||||||
slf.statusCode = statusCode
|
slf.statusCode = statusCode
|
||||||
}
|
}
|
||||||
@@ -166,15 +175,18 @@ func (slf *HttpSession) Write(msg []byte) {
|
|||||||
slf.msg = msg
|
slf.msg = msg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *HttpSession) WriteJson(msgJson interface{}) error {
|
func (slf *HttpSession) WriteJsonDone(statusCode int,msgJson interface{}) error {
|
||||||
msg, err := json.Marshal(msgJson)
|
msg, err := json.Marshal(msgJson)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
slf.Write(msg)
|
slf.Write(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
slf.Done()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
func (slf *HttpSession) flush() {
|
func (slf *HttpSession) flush() {
|
||||||
slf.w.WriteHeader(slf.statusCode)
|
slf.w.WriteHeader(slf.statusCode)
|
||||||
if slf.msg!=nil {
|
if slf.msg!=nil {
|
||||||
@@ -182,7 +194,7 @@ func (slf *HttpSession) flush() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *HttpSession) done(){
|
func (slf *HttpSession) Done(){
|
||||||
slf.sessionDone <- slf
|
slf.sessionDone <- slf
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -249,7 +261,7 @@ func (slf *HttpRouter) Router(session *HttpSession){
|
|||||||
if slf.httpFiltrateList!=nil {
|
if slf.httpFiltrateList!=nil {
|
||||||
for _,fun := range slf.httpFiltrateList{
|
for _,fun := range slf.httpFiltrateList{
|
||||||
if fun(session) == false {
|
if fun(session) == false {
|
||||||
session.done()
|
//session.done()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -268,7 +280,7 @@ func (slf *HttpRouter) Router(session *HttpSession){
|
|||||||
}
|
}
|
||||||
|
|
||||||
v.httpHandle(session)
|
v.httpHandle(session)
|
||||||
session.done()
|
//session.done()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -276,13 +288,13 @@ func (slf *HttpRouter) Router(session *HttpSession){
|
|||||||
idx := strings.Index(urlPath, k)
|
idx := strings.Index(urlPath, k)
|
||||||
if idx != -1 {
|
if idx != -1 {
|
||||||
session.fileData = v
|
session.fileData = v
|
||||||
session.done()
|
session.Done()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
session.WriteStatusCode(http.StatusNotFound)
|
session.WriteStatusCode(http.StatusNotFound)
|
||||||
session.done()
|
session.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *HttpService) SetHttpRouter(httpRouter IHttpRouter) {
|
func (slf *HttpService) SetHttpRouter(httpRouter IHttpRouter) {
|
||||||
|
|||||||
Reference in New Issue
Block a user