优化异步接口

This commit is contained in:
boyce
2019-02-18 13:26:01 +08:00
parent e3e16eefb4
commit 0347e7652c

View File

@@ -14,6 +14,12 @@ import (
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
) )
type SyncFun func()
const (
MAX_EXECUTE_FUN = 10000
)
// DBModule ... // DBModule ...
type DBModule struct { type DBModule struct {
service.BaseModule service.BaseModule
@@ -22,6 +28,9 @@ type DBModule struct {
UserName string UserName string
Password string Password string
DBName string DBName string
syncExecuteFun chan SyncFun
syncCoroutineNum int
} }
// DBResult ... // DBResult ...
@@ -37,7 +46,8 @@ type DBResult struct {
//OnInit ... //OnInit ...
func (slf *DBModule) OnInit() error { func (slf *DBModule) OnInit() error {
return slf.Connect(100) slf.syncExecuteFun = make(chan SyncFun, MAX_EXECUTE_FUN)
return slf.Connect(10)
} }
// Next ... // Next ...
@@ -238,6 +248,10 @@ func (slf *DBModule) Connect(maxConn int) error {
db.SetMaxIdleConns(maxConn) db.SetMaxIdleConns(maxConn)
db.SetConnMaxLifetime(time.Second * 90) db.SetConnMaxLifetime(time.Second * 90)
slf.syncCoroutineNum = maxConn
for i := 0; i < slf.syncCoroutineNum; i++ {
go slf.RunExecuteDBCoroutine()
}
return nil return nil
} }
@@ -274,10 +288,20 @@ func (slf *DBModule) SyncQuery(query string, args ...interface{}) SyncDBResult {
ret := SyncDBResult{ ret := SyncDBResult{
sres: make(chan DBResult, 1), sres: make(chan DBResult, 1),
} }
go func() {
if len(slf.syncExecuteFun) >= MAX_EXECUTE_FUN {
dbret := DBResult{}
dbret.Err = fmt.Errorf("chan is full,sql:%s", query)
ret.sres <- dbret
return ret
}
slf.syncExecuteFun <- func() {
rsp := slf.Query(query, args...) rsp := slf.Query(query, args...)
ret.sres <- rsp ret.sres <- rsp
}() }
return ret return ret
} }
@@ -301,9 +325,36 @@ func (slf *DBModule) SyncExec(query string, args ...interface{}) SyncDBResult {
ret := SyncDBResult{ ret := SyncDBResult{
sres: make(chan DBResult, 1), sres: make(chan DBResult, 1),
} }
go func() {
if len(slf.syncExecuteFun) >= MAX_EXECUTE_FUN {
dbret := DBResult{}
dbret.Err = fmt.Errorf("chan is full,sql:%s", query)
ret.sres <- dbret
return ret
}
slf.syncExecuteFun <- func() {
rsp := slf.Exec(query, args...) rsp := slf.Exec(query, args...)
ret.sres <- rsp ret.sres <- rsp
}() }
return ret return ret
} }
func (slf *DBModule) RunExecuteDBCoroutine() {
slf.WaitGroup.Add(1)
defer slf.WaitGroup.Done()
for {
select {
case <-slf.ExitChan:
service.GetLogger().Printf(LEVER_WARN, "stopping module %s...", fmt.Sprintf("%T", slf))
fmt.Println("stopping module %s...", fmt.Sprintf("%T", slf))
return
case fun := <-slf.syncExecuteFun:
fun()
default:
}
}
}