apply go-level optimizations for lock-free snapshots, pooling, worker flow and typed errors

This commit is contained in:
DBT
2026-02-24 09:43:40 +00:00
parent 162909864a
commit dd705e5e93
4 changed files with 73 additions and 49 deletions

View File

@@ -11,6 +11,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"clawgo/pkg/bus" "clawgo/pkg/bus"
"clawgo/pkg/config" "clawgo/pkg/config"
@@ -27,6 +28,7 @@ type Manager struct {
dispatchSem chan struct{} dispatchSem chan struct{}
outboundLimit *rate.Limiter outboundLimit *rate.Limiter
mu sync.RWMutex mu sync.RWMutex
snapshot atomic.Value // map[string]Channel
} }
type asyncTask struct { type asyncTask struct {
@@ -42,6 +44,7 @@ func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error
dispatchSem: make(chan struct{}, 32), dispatchSem: make(chan struct{}, 32),
outboundLimit: rate.NewLimiter(rate.Limit(40), 80), outboundLimit: rate.NewLimiter(rate.Limit(40), 80),
} }
m.snapshot.Store(map[string]Channel{})
if err := m.initChannels(); err != nil { if err := m.initChannels(); err != nil {
return nil, err return nil, err
@@ -159,10 +162,19 @@ func (m *Manager) initChannels() error {
logger.InfoCF("channels", "Channel initialization completed", map[string]interface{}{ logger.InfoCF("channels", "Channel initialization completed", map[string]interface{}{
"enabled_channels": len(m.channels), "enabled_channels": len(m.channels),
}) })
m.refreshSnapshot()
return nil return nil
} }
func (m *Manager) refreshSnapshot() {
next := make(map[string]Channel, len(m.channels))
for k, v := range m.channels {
next[k] = v
}
m.snapshot.Store(next)
}
func (m *Manager) StartAll(ctx context.Context) error { func (m *Manager) StartAll(ctx context.Context) error {
m.mu.Lock() m.mu.Lock()
if len(m.channels) == 0 { if len(m.channels) == 0 {
@@ -276,9 +288,8 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
return return
} }
m.mu.RLock() cur, _ := m.snapshot.Load().(map[string]Channel)
channel, exists := m.channels[msg.Channel] channel, exists := cur[msg.Channel]
m.mu.RUnlock()
if !exists { if !exists {
logger.WarnCF("channels", "Unknown channel for outbound message", map[string]interface{}{ logger.WarnCF("channels", "Unknown channel for outbound message", map[string]interface{}{
@@ -323,29 +334,24 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
} }
func (m *Manager) GetChannel(name string) (Channel, bool) { func (m *Manager) GetChannel(name string) (Channel, bool) {
m.mu.RLock() cur, _ := m.snapshot.Load().(map[string]Channel)
defer m.mu.RUnlock() channel, ok := cur[name]
channel, ok := m.channels[name]
return channel, ok return channel, ok
} }
func (m *Manager) GetStatus() map[string]interface{} { func (m *Manager) GetStatus() map[string]interface{} {
m.mu.RLock() cur, _ := m.snapshot.Load().(map[string]Channel)
defer m.mu.RUnlock() status := make(map[string]interface{}, len(cur))
for name := range cur {
status := make(map[string]interface{})
for name := range m.channels {
status[name] = map[string]interface{}{} status[name] = map[string]interface{}{}
} }
return status return status
} }
func (m *Manager) GetEnabledChannels() []string { func (m *Manager) GetEnabledChannels() []string {
m.mu.RLock() cur, _ := m.snapshot.Load().(map[string]Channel)
defer m.mu.RUnlock() names := make([]string, 0, len(cur))
for name := range cur {
names := make([]string, 0, len(m.channels))
for name := range m.channels {
names = append(names, name) names = append(names, name)
} }
return names return names
@@ -355,12 +361,14 @@ func (m *Manager) RegisterChannel(name string, channel Channel) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
m.channels[name] = channel m.channels[name] = channel
m.refreshSnapshot()
} }
func (m *Manager) UnregisterChannel(name string) { func (m *Manager) UnregisterChannel(name string) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
delete(m.channels, name) delete(m.channels, name)
m.refreshSnapshot()
} }
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error { func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {

8
pkg/tools/errors.go Normal file
View File

@@ -0,0 +1,8 @@
package tools
import "errors"
var (
ErrUnsupportedAction = errors.New("unsupported action")
ErrMissingField = errors.New("missing required field")
)

View File

@@ -3,6 +3,9 @@ package tools
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"sync"
"clawgo/pkg/bus" "clawgo/pkg/bus"
) )
@@ -14,6 +17,8 @@ type MessageTool struct {
defaultChatID string defaultChatID string
} }
var buttonRowPool = sync.Pool{New: func() interface{} { return make([]bus.Button, 0, 8) }}
func NewMessageTool() *MessageTool { func NewMessageTool() *MessageTool {
return &MessageTool{} return &MessageTool{}
} }
@@ -93,6 +98,7 @@ func (t *MessageTool) SetSendCallback(callback SendCallback) {
func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) { func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) {
action, _ := args["action"].(string) action, _ := args["action"].(string)
action = strings.ToLower(strings.TrimSpace(action))
if action == "" { if action == "" {
action = "send" action = "send"
} }
@@ -106,22 +112,22 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{})
switch action { switch action {
case "send": case "send":
if content == "" { if content == "" {
return "", fmt.Errorf("message/content is required for action=send") return "", fmt.Errorf("%w: message/content for action=send", ErrMissingField)
} }
case "edit": case "edit":
if messageID == "" || content == "" { if messageID == "" || content == "" {
return "", fmt.Errorf("message_id and message/content are required for action=edit") return "", fmt.Errorf("%w: message_id and message/content for action=edit", ErrMissingField)
} }
case "delete": case "delete":
if messageID == "" { if messageID == "" {
return "", fmt.Errorf("message_id is required for action=delete") return "", fmt.Errorf("%w: message_id for action=delete", ErrMissingField)
} }
case "react": case "react":
if messageID == "" || emoji == "" { if messageID == "" || emoji == "" {
return "", fmt.Errorf("message_id and emoji are required for action=react") return "", fmt.Errorf("%w: message_id and emoji for action=react", ErrMissingField)
} }
default: default:
return fmt.Sprintf("Unsupported action: %s", action), nil return "", fmt.Errorf("%w: %s", ErrUnsupportedAction, action)
} }
channel, _ := args["channel"].(string) channel, _ := args["channel"].(string)
@@ -149,7 +155,8 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{})
if btns, ok := args["buttons"].([]interface{}); ok { if btns, ok := args["buttons"].([]interface{}); ok {
for _, row := range btns { for _, row := range btns {
if rowArr, ok := row.([]interface{}); ok { if rowArr, ok := row.([]interface{}); ok {
var buttonRow []bus.Button pooled := buttonRowPool.Get().([]bus.Button)
buttonRow := pooled[:0]
for _, b := range rowArr { for _, b := range rowArr {
if bMap, ok := b.(map[string]interface{}); ok { if bMap, ok := b.(map[string]interface{}); ok {
text, _ := bMap["text"].(string) text, _ := bMap["text"].(string)
@@ -160,8 +167,10 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]interface{})
} }
} }
if len(buttonRow) > 0 { if len(buttonRow) > 0 {
buttons = append(buttons, buttonRow) copied := append([]bus.Button(nil), buttonRow...)
buttons = append(buttons, copied)
} }
buttonRowPool.Put(buttonRow[:0])
} }
} }
} }

View File

@@ -4,32 +4,38 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
"clawgo/pkg/logger" "clawgo/pkg/logger"
) )
type ToolRegistry struct { type ToolRegistry struct {
tools map[string]Tool tools map[string]Tool
mu sync.RWMutex mu sync.RWMutex
snapshot atomic.Value // map[string]Tool (copy-on-write)
} }
func NewToolRegistry() *ToolRegistry { func NewToolRegistry() *ToolRegistry {
return &ToolRegistry{ r := &ToolRegistry{tools: make(map[string]Tool)}
tools: make(map[string]Tool), r.snapshot.Store(map[string]Tool{})
} return r
} }
func (r *ToolRegistry) Register(tool Tool) { func (r *ToolRegistry) Register(tool Tool) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.tools[tool.Name()] = tool r.tools[tool.Name()] = tool
next := make(map[string]Tool, len(r.tools))
for k, v := range r.tools {
next[k] = v
}
r.snapshot.Store(next)
} }
func (r *ToolRegistry) Get(name string) (Tool, bool) { func (r *ToolRegistry) Get(name string) (Tool, bool) {
r.mu.RLock() cur, _ := r.snapshot.Load().(map[string]Tool)
defer r.mu.RUnlock() tool, ok := cur[name]
tool, ok := r.tools[name]
return tool, ok return tool, ok
} }
@@ -73,11 +79,9 @@ func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string
} }
func (r *ToolRegistry) GetDefinitions() []map[string]interface{} { func (r *ToolRegistry) GetDefinitions() []map[string]interface{} {
r.mu.RLock() cur, _ := r.snapshot.Load().(map[string]Tool)
defer r.mu.RUnlock() definitions := make([]map[string]interface{}, 0, len(cur))
for _, tool := range cur {
definitions := make([]map[string]interface{}, 0, len(r.tools))
for _, tool := range r.tools {
definitions = append(definitions, ToolToSchema(tool)) definitions = append(definitions, ToolToSchema(tool))
} }
return definitions return definitions
@@ -85,11 +89,9 @@ func (r *ToolRegistry) GetDefinitions() []map[string]interface{} {
// List returns a list of all registered tool names. // List returns a list of all registered tool names.
func (r *ToolRegistry) List() []string { func (r *ToolRegistry) List() []string {
r.mu.RLock() cur, _ := r.snapshot.Load().(map[string]Tool)
defer r.mu.RUnlock() names := make([]string, 0, len(cur))
for name := range cur {
names := make([]string, 0, len(r.tools))
for name := range r.tools {
names = append(names, name) names = append(names, name)
} }
return names return names
@@ -97,19 +99,16 @@ func (r *ToolRegistry) List() []string {
// Count returns the number of registered tools. // Count returns the number of registered tools.
func (r *ToolRegistry) Count() int { func (r *ToolRegistry) Count() int {
r.mu.RLock() cur, _ := r.snapshot.Load().(map[string]Tool)
defer r.mu.RUnlock() return len(cur)
return len(r.tools)
} }
// GetSummaries returns human-readable summaries of all registered tools. // GetSummaries returns human-readable summaries of all registered tools.
// Returns a slice of "name - description" strings. // Returns a slice of "name - description" strings.
func (r *ToolRegistry) GetSummaries() []string { func (r *ToolRegistry) GetSummaries() []string {
r.mu.RLock() cur, _ := r.snapshot.Load().(map[string]Tool)
defer r.mu.RUnlock() summaries := make([]string, 0, len(cur))
for _, tool := range cur {
summaries := make([]string, 0, len(r.tools))
for _, tool := range r.tools {
summaries = append(summaries, fmt.Sprintf("- `%s` - %s", tool.Name(), tool.Description())) summaries = append(summaries, fmt.Sprintf("- `%s` - %s", tool.Name(), tool.Description()))
} }
return summaries return summaries