diff --git a/sysmodule/mysqlmondule/mysqlmodule.go b/sysmodule/mysqlmondule/mysqlmodule.go index f4cd5e1..8dbb297 100644 --- a/sysmodule/mysqlmondule/mysqlmodule.go +++ b/sysmodule/mysqlmondule/mysqlmodule.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "github.com/duanhf2012/origin/log" - "math/rand" "net/url" "reflect" "strconv" @@ -19,10 +18,6 @@ import ( type SyncFun func() -const ( - MAX_EXECUTE_FUN = 10000 -) - type DBExecute struct { syncExecuteFun chan SyncFun syncExecuteExit chan bool @@ -41,35 +36,19 @@ type MySQLModule struct { username string password string dbname string - maxconn int - PrintTime time.Duration - + slowDuration time.Duration pingCoroutine PingExecute - - syncCoroutineNum int - executeList []DBExecute waitGroup sync.WaitGroup } // Tx ... type Tx struct { tx *sql.Tx - PrintTime time.Duration + slowDuration time.Duration } // DBResult ... type DBResult struct { - Err error - LastInsertID int64 - RowsAffected int64 - res *sql.Rows - // 解码数据相关设置 - tag string - blur bool -} - -// DBResult ... -type DBResultEx struct { LastInsertID int64 RowsAffected int64 @@ -78,294 +57,79 @@ type DBResultEx struct { } type DataSetList struct { - dataSetList []DBResultEx + dataSetList []DBResult currentDataSetIdx int32 tag string blur bool } -// SyncDBResult ... -type SyncDBResult struct { - sres chan DBResult + +type dbcontrol interface { + Exec(query string, args ...interface{}) (sql.Result, error) + Query(query string, args ...interface{}) (*sql.Rows, error) } -type SyncDBResultExCallBack func(dataList *DataSetList, err error) -type SyncQueryDBResultEx struct { - sres chan *DataSetList - err chan error -} - -func (slf *SyncQueryDBResultEx) Get(timeoutMs int) (*DataSetList, error) { - timerC := time.NewTicker(time.Millisecond * time.Duration(timeoutMs)).C - select { - case <-timerC: - break - case err := <-slf.err: - dataset := <-slf.sres - return dataset, err - } - - return nil, fmt.Errorf("Getting the return result timeout [%d]ms", timeoutMs) -} - -type SyncExecuteDBResult struct { - sres chan *DBResultEx - err chan error -} - -func (slf *SyncExecuteDBResult) Get(timeoutMs int) (*DBResultEx, error) { - timerC := time.NewTicker(time.Millisecond * time.Duration(timeoutMs)).C - select { - case <-timerC: - break - case err := <-slf.err: - dataset := <-slf.sres - return dataset, err - } - - return nil, fmt.Errorf("Getting the return result timeout [%d]ms", timeoutMs) -} - -func (slf *MySQLModule) RunPing() { - for { - select { - case <-slf.pingCoroutine.pintExit: - log.Error("RunPing stopping %s...", fmt.Sprintf("%T", slf)) - return - case <-slf.pingCoroutine.tickerPing.C: - if slf.db != nil { - slf.db.Ping() - } - } - } -} - -func (slf *MySQLModule) Init(maxConn, executeNum int, url string, userName string, password string, dbname string) error { +func (slf *MySQLModule) Init( url string, userName string, password string, dbname string,maxConn int) error { slf.url = url - slf.maxconn = maxConn slf.username = userName slf.password = password slf.dbname = dbname - slf.syncCoroutineNum = executeNum - - if executeNum <= 0 { - return fmt.Errorf("executeNum mast more than zero:%d", executeNum) - } - - slf.executeList = []DBExecute{} - for i := 0; i < executeNum; i++ { - itemInfo := DBExecute{syncExecuteFun:make(chan SyncFun, MAX_EXECUTE_FUN), syncExecuteExit:make(chan bool, 1)} - slf.executeList = append(slf.executeList, itemInfo) - } slf.pingCoroutine = PingExecute{tickerPing : time.NewTicker(5*time.Second), pintExit : make(chan bool, 1)} - rand.Seed(time.Now().Unix()) - - return slf.Connect(slf.maxconn) + return slf.connect(maxConn) } -func (slf *MySQLModule) OnInit() error { - for i := 0; i < slf.syncCoroutineNum; i++ { - go slf.RunExecuteDBCoroutine(i) - } - go slf.RunPing() - - return nil +func (slf *MySQLModule) SetQuerySlowTime(slowDuration time.Duration) { + slf.slowDuration = slowDuration } -func (slf *MySQLModule) OnRelease() { - for i := 0; i < slf.syncCoroutineNum; i++ { - close(slf.executeList[i].syncExecuteExit) - } +func (slf *MySQLModule) Query(strQuery string, args ...interface{}) (*DataSetList, error) { + return query(slf.slowDuration,slf.db,strQuery,args...) } -//Close ... -func (slf *DBResult) Close() { - if slf.res != nil { - slf.res.Close() - } +// Exec ... +func (slf *MySQLModule) Exec(strSql string, args ...interface{}) (*DBResult, error) { + return exec(slf.slowDuration,slf.db,strSql,args...) } -//NextResult ... -func (slf *DBResult) NextResult() bool { - if slf.Err != nil || slf.res == nil { - return false - } - return slf.res.NextResultSet() -} - -// SetSpecificTag ... -func (slf *DBResult) SetSpecificTag(tag string) *DBResult { - slf.tag = tag - return slf -} - -// SetBlurMode ... -func (slf *DBResult) SetBlurMode(blur bool) *DBResult { - slf.blur = blur - return slf -} - -// UnMarshal ... -func (slf *DBResult) UnMarshal(out interface{}) error { - if slf.Err != nil { - return slf.Err - } - tbm, err := dbResult2Map(slf.res) +// Begin starts a transaction. +func (slf *MySQLModule) Begin() (*Tx, error) { + var txDBMoudule Tx + txdb, err := slf.db.Begin() if err != nil { - return err + log.Error("Begin error:%s", err.Error()) + return &txDBMoudule, err } - //fmt.Println(tbm) - v := reflect.ValueOf(out) - if v.Kind() != reflect.Ptr { - return errors.New("interface must be a pointer") - } - if v.Elem().Kind() == reflect.Struct { - if len(tbm) != 1 { - return fmt.Errorf("数据结果集的长度不匹配 len=%d", len(tbm)) - } - return slf.mapSingle2interface(tbm[0], v) - } - if v.Elem().Kind() == reflect.Slice { - return slf.mapSlice2interface(tbm, out) - } - return fmt.Errorf("错误的数据类型 %v", v.Elem().Kind()) + txDBMoudule.slowDuration = slf.slowDuration + txDBMoudule.tx = txdb + return &txDBMoudule, nil } -func dbResult2Map(rows *sql.Rows) ([]map[string]string, error) { - columns, err := rows.Columns() - if err != nil { - return nil, err - } - count := len(columns) - tableData := make([]map[string]string, 0) - values := make([]string, count) - valuePtrs := make([]interface{}, count) - for rows.Next() { - for i := 0; i < count; i++ { - valuePtrs[i] = &values[i] - } - err := rows.Scan(valuePtrs...) - if err != nil { - fmt.Println(err) - } - entry := make(map[string]string) - for i, col := range columns { - entry[strings.ToLower(col)] = values[i] - } - tableData = append(tableData, entry) - } - return tableData, nil +// Rollback aborts the transaction. +func (slf *Tx) Rollback() error { + return slf.tx.Rollback() } -func (slf *DBResult) mapSingle2interface(m map[string]string, v reflect.Value) error { - t := v.Type() - val := v.Elem() - typ := t.Elem() - - if !val.IsValid() { - return errors.New("数据类型不正确") - } - - for i := 0; i < val.NumField(); i++ { - value := val.Field(i) - kind := value.Kind() - tag := typ.Field(i).Tag.Get(slf.tag) - if tag == "" { - tag = typ.Field(i).Name - } - - if tag != "" && tag != "-" { - vtag := strings.Split(strings.ToLower(tag), ",") - meta, ok := m[vtag[0]] - if !ok { - if !slf.blur { - return fmt.Errorf("没有在结果集中找到对应的字段 %s", tag) - } - continue - } - if !value.CanSet() { - return errors.New("结构体字段没有读写权限") - } - if len(meta) == 0 { - continue - } - switch kind { - case reflect.String: - value.SetString(meta) - case reflect.Float32, reflect.Float64: - f, err := strconv.ParseFloat(meta, 64) - if err != nil { - return err - } - value.SetFloat(f) - case reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Int: - integer64, err := strconv.ParseInt(meta, 10, 64) - if err != nil { - return err - } - value.SetInt(integer64) - case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uint: - integer64, err := strconv.ParseUint(meta, 10, 64) - if err != nil { - return err - } - value.SetUint(integer64) - case reflect.Bool: - b, err := strconv.ParseBool(meta) - if err != nil { - return err - } - value.SetBool(b) - default: - return errors.New("数据库映射存在不识别的数据类型") - } - } - } - return nil +// Commit commits the transaction. +func (slf *Tx) Commit() error { + return slf.tx.Commit() } -func (slf *MySQLModule) SetQuerySlowTime(Time time.Duration) { - slf.PrintTime = Time + +// QueryEx executes a query that return rows. +func (slf *Tx) Query(strQuery string, args ...interface{}) (*DataSetList, error) { + return query(slf.slowDuration,slf.tx,strQuery,args...) } -func (slf *MySQLModule) IsPrintTimeLog(Time time.Duration) bool { - if slf.PrintTime != 0 && Time >= slf.PrintTime { - return true - } - return false +// Exec executes a query that doesn't return rows. +func (slf *Tx) Exec(strSql string, args ...interface{}) (*DBResult, error) { + return exec(slf.slowDuration,slf.tx,strSql,args...) } -func (slf *DBResult) mapSlice2interface(data []map[string]string, in interface{}) error { - length := len(data) - - if length > 0 { - v := reflect.ValueOf(in).Elem() - newv := reflect.MakeSlice(v.Type(), 0, length) - v.Set(newv) - v.SetLen(length) - - for i := 0; i < length; i++ { - idxv := v.Index(i) - if idxv.Kind() == reflect.Ptr { - newObj := reflect.New(idxv.Type().Elem()) - v.Index(i).Set(newObj) - idxv = newObj - } else { - idxv = idxv.Addr() - } - err := slf.mapSingle2interface(data[i], idxv) - if err != nil { - return err - } - } - } - return nil -} // Connect ... -func (slf *MySQLModule) Connect(maxConn int) error { +func (slf *MySQLModule) connect(maxConn int) error { cmd := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=true&loc=%s&readTimeout=30s&timeout=15s&writeTimeout=30s", slf.username, slf.password, @@ -387,24 +151,26 @@ func (slf *MySQLModule) Connect(maxConn int) error { db.SetMaxIdleConns(maxConn) db.SetConnMaxLifetime(time.Second * 90) + go slf.runPing() + return nil } -// Get ... -func (slf *SyncDBResult) Get(timeoutMs int) DBResult { - timerC := time.NewTicker(time.Millisecond * time.Duration(timeoutMs)).C - select { - case <-timerC: - break - case rsp := <-slf.sres: - return rsp - } - return DBResult{ - Err: fmt.Errorf("Getting the return result timeout [%d]ms", timeoutMs), +func (slf *MySQLModule) runPing() { + for { + select { + case <-slf.pingCoroutine.pintExit: + log.Error("RunPing stopping %s...", fmt.Sprintf("%T", slf)) + return + case <-slf.pingCoroutine.tickerPing.C: + if slf.db != nil { + slf.db.Ping() + } + } } } -func (slf *MySQLModule) CheckArgs(args ...interface{}) error { +func checkArgs(args ...interface{}) error { for _, val := range args { if reflect.TypeOf(val).Kind() == reflect.String { retVal := val.(string) @@ -447,58 +213,38 @@ func (slf *MySQLModule) CheckArgs(args ...interface{}) error { return nil } -// Query ... -func (slf *MySQLModule) Query(query string, args ...interface{}) DBResult { - if slf.CheckArgs(args) != nil { - ret := DBResult{} - log.Error("CheckArgs is error :%s", query) - ret.Err = fmt.Errorf("CheckArgs is error!") - return ret - } - if slf.db == nil { - ret := DBResult{} - log.Error("cannot connect database:%s", query) - ret.Err = fmt.Errorf("cannot connect database!") - return ret - } - rows, err := slf.db.Query(query, args...) - if err != nil { - log.Error("Query:%s(%v)", query, err) - } - - return DBResult{ - Err: err, - res: rows, - tag: "json", - blur: true, +func checkSlow(slowDuration time.Duration,Time time.Duration) bool { + if slowDuration != 0 && Time >=slowDuration { + return true } + return false } -func (slf *MySQLModule) QueryEx(query string, args ...interface{}) (*DataSetList, error) { +func query(slowDuration time.Duration,db dbcontrol,strQuery string, args ...interface{}) (*DataSetList, error) { datasetList := DataSetList{} datasetList.tag = "json" datasetList.blur = true - if slf.CheckArgs(args) != nil { - log.Error("CheckArgs is error :%s", query) + if checkArgs(args) != nil { + log.Error("CheckArgs is error :%s", strQuery) return &datasetList, fmt.Errorf("CheckArgs is error!") } - if slf.db == nil { - log.Error("cannot connect database:%s", query) + if db == nil { + log.Error("cannot connect database:%s", strQuery) return &datasetList, fmt.Errorf("cannot connect database!") } TimeFuncStart := time.Now() - rows, err := slf.db.Query(query, args...) - TimeFuncPass := time.Since(TimeFuncStart) + rows, err := db.Query(strQuery, args...) + timeFuncPass := time.Since(TimeFuncStart) - if slf.IsPrintTimeLog(TimeFuncPass) { - log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", TimeFuncPass, query, args) + if checkSlow(slowDuration,timeFuncPass) { + log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strQuery, args) } if err != nil { - log.Error("Query:%s(%v)", query, err) + log.Error("Query:%s(%v)", strQuery, err) if rows != nil { rows.Close() } @@ -507,7 +253,7 @@ func (slf *MySQLModule) QueryEx(query string, args ...interface{}) (*DataSetList defer rows.Close() for { - dbResult := DBResultEx{} + dbResult := DBResult{} //取出当前结果集所有行 for rows.Next() { if dbResult.RowInfo == nil { @@ -539,7 +285,7 @@ func (slf *MySQLModule) QueryEx(query string, args ...interface{}) (*DataSetList if hasRet == false { if rows.Err() != nil { - log.Error( "Query:%s(%+v)", query, rows) + log.Error( "Query:%s(%+v)", strQuery, rows) } break } @@ -548,76 +294,26 @@ func (slf *MySQLModule) QueryEx(query string, args ...interface{}) (*DataSetList return &datasetList, nil } -// SyncQuery ... -func (slf *MySQLModule) SyncQuery(queryHas int, query string, args ...interface{}) SyncQueryDBResultEx { - ret := SyncQueryDBResultEx{ - sres: make(chan *DataSetList, 1), - err: make(chan error, 1), - } - - chanIndex := queryHas % len(slf.executeList) - if chanIndex < 0 { - chanIndex = rand.Intn(len(slf.executeList)) - } - - if len(slf.executeList[chanIndex].syncExecuteFun) >= MAX_EXECUTE_FUN { - dbret := DataSetList{} - ret.err <- fmt.Errorf("chan is full,sql:%s", query) - ret.sres <- &dbret - - return ret - } - - slf.executeList[chanIndex].syncExecuteFun <- func() { - rsp, err := slf.QueryEx(query, args...) - ret.err <- err - ret.sres <- rsp - } - - return ret -} - -func (slf *MySQLModule) AsyncQuery(call SyncDBResultExCallBack, queryHas int, query string, args ...interface{}) error { - chanIndex := queryHas % len(slf.executeList) - if chanIndex < 0 { - chanIndex = rand.Intn(len(slf.executeList)) - } - - if len(slf.executeList[chanIndex].syncExecuteFun) >= MAX_EXECUTE_FUN { - return fmt.Errorf("chan is full,sql:%s", query) - } - - slf.executeList[chanIndex].syncExecuteFun <- func() { - rsp, err := slf.QueryEx(query, args...) - call(rsp, err) - return - } - - return nil -} - -// Exec ... -func (slf *MySQLModule) Exec(query string, args ...interface{}) (*DBResultEx, error) { - ret := &DBResultEx{} - if slf.db == nil { - log.Error("cannot connect database:%s", query) +func exec(slowDuration time.Duration,db dbcontrol,strSql string, args ...interface{}) (*DBResult, error) { + ret := &DBResult{} + if db == nil { + log.Error("cannot connect database:%s", strSql) return ret, fmt.Errorf("cannot connect database!") } - if slf.CheckArgs(args) != nil { - log.Error("CheckArgs is error :%s", query) - //return ret, fmt.Errorf("cannot connect database!") + if checkArgs(args) != nil { + log.Error("CheckArgs is error :%s", strSql) return ret, fmt.Errorf("CheckArgs is error!") } TimeFuncStart := time.Now() - res, err := slf.db.Exec(query, args...) - TimeFuncPass := time.Since(TimeFuncStart) - if slf.IsPrintTimeLog(TimeFuncPass) { - log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", TimeFuncPass, query, args) + res, err := db.Exec(strSql, args...) + timeFuncPass := time.Since(TimeFuncStart) + if checkSlow(slowDuration,timeFuncPass) { + log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strSql, args) } if err != nil { - log.Error("Exec:%s(%v)", query, err) + log.Error("Exec:%s(%v)", strSql, err) return nil, err } @@ -627,74 +323,6 @@ func (slf *MySQLModule) Exec(query string, args ...interface{}) (*DBResultEx, er return ret, nil } -// SyncExec ... -func (slf *MySQLModule) SyncExec(queryHas int, query string, args ...interface{}) *SyncExecuteDBResult { - ret := &SyncExecuteDBResult{ - sres: make(chan *DBResultEx, 1), - err: make(chan error, 1), - } - - chanIndex := queryHas % len(slf.executeList) - if chanIndex < 0 { - chanIndex = rand.Intn(len(slf.executeList)) - } - - if len(slf.executeList[chanIndex].syncExecuteFun) >= MAX_EXECUTE_FUN { - ret.err <- fmt.Errorf("chan is full,sql:%s", query) - return ret - } - - slf.executeList[chanIndex].syncExecuteFun <- func() { - rsp, err := slf.Exec(query, args...) - if err != nil { - ret.err <- err - return - } - - ret.sres <- rsp - return - } - - return ret -} - -func (slf *MySQLModule) AsyncExec(call SyncDBResultExCallBack, queryHas int, query string, args ...interface{}) error { - chanIndex := queryHas % len(slf.executeList) - if chanIndex < 0 { - chanIndex = rand.Intn(len(slf.executeList)) - } - - if len(slf.executeList[chanIndex].syncExecuteFun) >= MAX_EXECUTE_FUN { - return fmt.Errorf("chan is full,sql:%s", query) - } - - slf.executeList[chanIndex].syncExecuteFun <- func() { - rsp, err := slf.Exec(query, args...) - - data := DataSetList{tag:"json", blur:true, dataSetList:[]DBResultEx{}} - data.dataSetList = append(data.dataSetList, *rsp) - call(&data, err) - - return - } - - return nil -} - -func (slf *MySQLModule) RunExecuteDBCoroutine(has int) { - slf.waitGroup.Add(1) - defer slf.waitGroup.Done() - for { - select { - case <-slf.executeList[has].syncExecuteExit: - log.Error("stopping module %s...", fmt.Sprintf("%T", slf)) - return - case fun := <-slf.executeList[has].syncExecuteFun: - fun() - } - } -} - func (slf *DataSetList) UnMarshal(args ...interface{}) error { if len(slf.dataSetList) != len(args) { return errors.New(fmt.Sprintf("Data set len(%d,%d) is not equal to args!", len(slf.dataSetList), len(args))) @@ -841,211 +469,3 @@ func (slf *DataSetList) rowData2interface(rowIdx int, m map[string][]interface{} } return nil } - -// Begin starts a transaction. -func (slf *MySQLModule) Begin() (*Tx, error) { - var txDBMoudule Tx - txdb, err := slf.db.Begin() - if err != nil { - log.Error("Begin error:%s", err.Error()) - return &txDBMoudule, err - } - txDBMoudule.tx = txdb - return &txDBMoudule, nil -} - -// Rollback aborts the transaction. -func (slf *Tx) Rollback() error { - return slf.tx.Rollback() -} - -// Commit commits the transaction. -func (slf *Tx) Commit() error { - return slf.tx.Commit() -} - -// CheckArgs... -func (slf *Tx) CheckArgs(args ...interface{}) error { - for _, val := range args { - if reflect.TypeOf(val).Kind() == reflect.String { - retVal := val.(string) - if strings.Contains(retVal, "-") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - if strings.Contains(retVal, "#") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - if strings.Contains(retVal, "&") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - if strings.Contains(retVal, "=") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - if strings.Contains(retVal, "%") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - if strings.Contains(retVal, "'") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - if strings.Contains(strings.ToLower(retVal), "delete ") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - if strings.Contains(strings.ToLower(retVal), "truncate ") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - if strings.Contains(strings.ToLower(retVal), " or ") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - if strings.Contains(strings.ToLower(retVal), "from ") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - if strings.Contains(strings.ToLower(retVal), "set ") == true { - return fmt.Errorf("CheckArgs is error arg is %+v", retVal) - } - } - } - - return nil -} - -// Query executes a query that returns rows, typically a SELECT. -func (slf *Tx) Query(query string, args ...interface{}) DBResult { - if slf.CheckArgs(args) != nil { - ret := DBResult{} - log.Error("CheckArgs is error :%s", query) - ret.Err = fmt.Errorf("CheckArgs is error!") - return ret - } - - if slf.tx == nil { - ret := DBResult{} - log.Error("cannot connect database:%s", query) - ret.Err = fmt.Errorf("cannot connect database!") - return ret - } - - rows, err := slf.tx.Query(query, args...) - if err != nil { - log.Error("Tx Query:%s(%v)", query, err) - } - - return DBResult{ - Err: err, - res: rows, - tag: "json", - blur: true, - } -} - -// IsPrintTimeLog... -func (slf *Tx) IsPrintTimeLog(Time time.Duration) bool { - if slf.PrintTime != 0 && Time >= slf.PrintTime { - return true - } - return false -} - -// QueryEx executes a query that return rows. -func (slf *Tx) QueryEx(query string, args ...interface{}) (*DataSetList, error) { - datasetList := DataSetList{} - datasetList.tag = "json" - datasetList.blur = true - - if slf.CheckArgs(args) != nil { - log.Error("CheckArgs is error :%s", query) - return &datasetList, fmt.Errorf("CheckArgs is error!") - } - - if slf.tx == nil { - log.Error("cannot connect database:%s", query) - return &datasetList, fmt.Errorf("cannot connect database!") - } - - TimeFuncStart := time.Now() - rows, err := slf.tx.Query(query, args...) - TimeFuncPass := time.Since(TimeFuncStart) - - if slf.IsPrintTimeLog(TimeFuncPass) { - log.Error("Tx QueryEx Time %s , Query :%s , args :%+v", TimeFuncPass, query, args) - } - if err != nil { - log.Error("Tx Query:%s(%v)", query, err) - if rows != nil { - rows.Close() - } - return &datasetList, err - } - defer rows.Close() - - for { - dbResult := DBResultEx{} - //取出当前结果集所有行 - for rows.Next() { - if dbResult.RowInfo == nil { - dbResult.RowInfo = make(map[string][]interface{}) - } - //RowInfo map[string][][]sql.NullString //map[fieldname][row][column]sql.NullString - colField, err := rows.Columns() - if err != nil { - return &datasetList, err - } - count := len(colField) - valuePtrs := make([]interface{}, count) - for i := 0; i < count; i++ { - valuePtrs[i] = &sql.NullString{} - } - rows.Scan(valuePtrs...) - - for idx, fieldname := range colField { - fieldRowData := dbResult.RowInfo[strings.ToLower(fieldname)] - fieldRowData = append(fieldRowData, valuePtrs[idx]) - dbResult.RowInfo[strings.ToLower(fieldname)] = fieldRowData - } - dbResult.rowNum += 1 - } - - datasetList.dataSetList = append(datasetList.dataSetList, dbResult) - //取下一个结果集 - hasRet := rows.NextResultSet() - - if hasRet == false { - if rows.Err() != nil { - log.Error("Tx Query:%s(%+v)", query, rows) - } - break - } - } - - return &datasetList, nil -} - -// Exec executes a query that doesn't return rows. -func (slf *Tx) Exec(query string, args ...interface{}) (*DBResultEx, error) { - ret := &DBResultEx{} - if slf.tx == nil { - log.Error("cannot connect database:%s", query) - return ret, fmt.Errorf("cannot connect database!") - } - - if slf.CheckArgs(args) != nil { - log.Error("CheckArgs is error :%s", query) - //return ret, fmt.Errorf("cannot connect database!") - return ret, fmt.Errorf("CheckArgs is error!") - } - - TimeFuncStart := time.Now() - res, err := slf.tx.Exec(query, args...) - TimeFuncPass := time.Since(TimeFuncStart) - if slf.IsPrintTimeLog(TimeFuncPass) { - log.Error("Tx QueryEx Time %s , Query :%s , args :%+v", TimeFuncPass, query, args) - } - if err != nil { - log.Error("Tx Exec:%s(%v)", query, err) - return nil, err - } - - ret.LastInsertID, _ = res.LastInsertId() - ret.RowsAffected, _ = res.RowsAffected() - - return ret, nil -} diff --git a/sysmodule/mysqlmondule/mysqlmodule_test.go b/sysmodule/mysqlmondule/mysqlmodule_test.go new file mode 100644 index 0000000..8152933 --- /dev/null +++ b/sysmodule/mysqlmondule/mysqlmodule_test.go @@ -0,0 +1,48 @@ +package mysqlmondule + +import ( + "testing" + "fmt" + "time" +) + +func Test_Example(t *testing.T) { + //初始化连接 + test := MySQLModule{} + err := test.Init("127.0.0.1:3306","root","123456","dbname",10) + if err !=nil { + fmt.Print(err) + return + } + + //设置1秒慢查询监控 + test.SetQuerySlowTime(1*time.Second) + + //查询语句 + //存储过程可以call sp_querytest(?,?,?)方式 + result,err := test.Query("select Id,Name from UserInfo where SvrAreaId=?",18) + if err!=nil { + fmt.Print(err) + }else{ + dbRet := []struct { + Id int `json:"Id"` + Name string `json:"Name"` + }{} + + //从结构集中返序列化数据到结构体切片中,UnMarshal可以支持多个结果集 + err = result.UnMarshal(&dbRet) + if err !=nil { + fmt.Print(err) + } + } + + //exec sql + //存储过程可以call sp_updatetest(?,?,?)方式 + execResult,execErr := test.Exec("update UserInfo set Name=? where Id=?","nickname",1000) + if execErr!=nil { + fmt.Print(execErr) + }else{ + fmt.Print(*execResult) + } + +} \ No newline at end of file