Files
clawgo/cmd/gateway_runtime.go
2026-05-10 17:43:06 +08:00

342 lines
10 KiB
Go

package main
import (
"context"
"fmt"
"strings"
"time"
"github.com/YspCoder/clawgo/pkg/agent"
"github.com/YspCoder/clawgo/pkg/api"
"github.com/YspCoder/clawgo/pkg/bus"
"github.com/YspCoder/clawgo/pkg/channels"
"github.com/YspCoder/clawgo/pkg/config"
"github.com/YspCoder/clawgo/pkg/cron"
"github.com/YspCoder/clawgo/pkg/heartbeat"
"github.com/YspCoder/clawgo/pkg/logger"
"github.com/YspCoder/clawgo/pkg/providers"
"github.com/YspCoder/clawgo/pkg/sentinel"
)
type gatewayRuntimeState struct {
cfg *config.Config
agentLoop *agent.AgentLoop
channelManager *channels.Manager
heartbeatService *heartbeat.HeartbeatService
sentinelService *sentinel.Service
}
func buildGatewayRuntime(ctx context.Context, cfg *config.Config, msgBus *bus.MessageBus, cronService *cron.CronService) (*agent.AgentLoop, *channels.Manager, error) {
provider, err := providers.CreateProvider(cfg)
if err != nil {
return nil, nil, fmt.Errorf("create provider: %w", err)
}
agentLoop := agent.NewAgentLoop(cfg, msgBus, provider, cronService)
agentLoop.SetConfigPath(getConfigPath())
startupInfo := agentLoop.GetStartupInfo()
toolsInfo := startupInfo["tools"].(map[string]interface{})
skillsInfo := startupInfo["skills"].(map[string]interface{})
fmt.Println("\nAgent Status:")
fmt.Printf(" - Tools: %d loaded\n", toolsInfo["count"])
fmt.Printf(" - Skills: %d/%d available\n", skillsInfo["available"], skillsInfo["total"])
logger.InfoCF("agent", logger.C0098,
map[string]interface{}{
"tools_count": toolsInfo["count"],
"skills_total": skillsInfo["total"],
"skills_available": skillsInfo["available"],
})
channelManager, err := channels.NewManager(cfg, msgBus)
if err != nil {
return nil, nil, fmt.Errorf("create channel manager: %w", err)
}
return agentLoop, channelManager, nil
}
func bindAgentLoopHandlers(registryServer *api.Server, loop *agent.AgentLoop) {
registryServer.SetChatHandler(func(cctx context.Context, sessionKey, content string) (string, error) {
if strings.TrimSpace(content) == "" {
return "", nil
}
return loop.ProcessDirect(cctx, content, sessionKey)
})
registryServer.SetChatHistoryHandler(func(query api.ChatHistoryQuery) []map[string]interface{} {
h := loop.GetSessionHistory(query.Session)
if query.Around > 0 || query.Before > 0 || query.After > 0 || query.Limit > 0 {
h = loop.GetSessionHistoryWindow(query.Session, query.Around, query.Before, query.After, query.Limit)
}
out := make([]map[string]interface{}, 0, len(h))
for _, m := range h {
entry := map[string]interface{}{"role": m.Role, "content": m.Content}
if strings.TrimSpace(m.ToolCallID) != "" {
entry["tool_call_id"] = m.ToolCallID
}
if len(m.ToolCalls) > 0 {
entry["tool_calls"] = m.ToolCalls
}
out = append(out, entry)
}
return out
})
registryServer.SetSessionSearchHandler(func(query api.SessionSearchQuery) []map[string]interface{} {
excludeKey := ""
if query.ExcludeCurrent {
excludeKey = strings.TrimSpace(query.Session)
}
results := loop.SearchSessions(query.Query, query.Kinds, excludeKey, query.Limit)
out := make([]map[string]interface{}, 0, len(results))
for _, item := range results {
entry := map[string]interface{}{
"key": item.Key,
"kind": item.Kind,
"updated_at": item.UpdatedAt.UnixMilli(),
"summary": item.Summary,
"score": item.Score,
}
snippets := make([]map[string]interface{}, 0, len(item.Snippets))
for _, snippet := range item.Snippets {
snippets = append(snippets, map[string]interface{}{
"seq": snippet.Seq,
"role": snippet.Role,
"segment": snippet.Segment,
"content": snippet.Content,
})
}
entry["snippets"] = snippets
out = append(out, entry)
}
return out
})
registryServer.SetToolsCatalogHandler(func() interface{} {
return loop.GetToolCatalog()
})
}
func bindCronHandler(registryServer *api.Server, cronService *cron.CronService) {
registryServer.SetCronHandler(func(action string, args map[string]interface{}) (interface{}, error) {
getStr := func(k string) string {
v, _ := args[k].(string)
return strings.TrimSpace(v)
}
getBoolPtr := func(k string) *bool {
v, ok := args[k].(bool)
if !ok {
return nil
}
vv := v
return &vv
}
switch strings.ToLower(strings.TrimSpace(action)) {
case "", "list":
return cronService.ListJobs(true), nil
case "get":
id := getStr("id")
if id == "" {
return nil, fmt.Errorf("id required")
}
j := cronService.GetJob(id)
if j == nil {
return nil, fmt.Errorf("job not found: %s", id)
}
return j, nil
case "create":
name := getStr("name")
if name == "" {
name = "webui-cron"
}
msg := getStr("message")
if msg == "" {
return nil, fmt.Errorf("message required")
}
schedule := cron.CronSchedule{}
if expr := getStr("expr"); expr != "" {
schedule.Expr = expr
} else {
// Backward compatibility for older clients.
kind := strings.ToLower(getStr("kind"))
switch kind {
case "every":
everyMS, ok := args["everyMs"].(float64)
if !ok || int64(everyMS) <= 0 {
return nil, fmt.Errorf("expr required")
}
ev := int64(everyMS)
schedule.Kind = "every"
schedule.EveryMS = &ev
case "once", "at":
atMS, ok := args["atMs"].(float64)
var at int64
if !ok || int64(atMS) <= 0 {
at = time.Now().Add(1 * time.Minute).UnixMilli()
} else {
at = int64(atMS)
}
schedule.Kind = "at"
schedule.AtMS = &at
default:
return nil, fmt.Errorf("expr required")
}
}
deliver := false
if v, ok := args["deliver"].(bool); ok {
deliver = v
}
return cronService.AddJob(name, schedule, msg, deliver, getStr("channel"), getStr("to"))
case "update":
id := getStr("id")
if id == "" {
return nil, fmt.Errorf("id required")
}
in := cron.UpdateJobInput{}
if v := getStr("name"); v != "" {
in.Name = &v
}
if v := getStr("message"); v != "" {
in.Message = &v
}
if p := getBoolPtr("enabled"); p != nil {
in.Enabled = p
}
if p := getBoolPtr("deliver"); p != nil {
in.Deliver = p
}
if v := getStr("channel"); v != "" {
in.Channel = &v
}
if v := getStr("to"); v != "" {
in.To = &v
}
if expr := getStr("expr"); expr != "" {
s := cron.CronSchedule{Expr: expr}
in.Schedule = &s
} else if kind := strings.ToLower(getStr("kind")); kind != "" {
// Backward compatibility for older clients.
s := cron.CronSchedule{Kind: kind}
switch kind {
case "every":
if everyMS, ok := args["everyMs"].(float64); ok && int64(everyMS) > 0 {
ev := int64(everyMS)
s.EveryMS = &ev
} else {
return nil, fmt.Errorf("expr required")
}
case "once", "at":
s.Kind = "at"
if atMS, ok := args["atMs"].(float64); ok && int64(atMS) > 0 {
at := int64(atMS)
s.AtMS = &at
} else {
at := time.Now().Add(1 * time.Minute).UnixMilli()
s.AtMS = &at
}
default:
return nil, fmt.Errorf("expr required")
}
in.Schedule = &s
}
return cronService.UpdateJob(id, in)
case "delete":
id := getStr("id")
return map[string]interface{}{"deleted": cronService.RemoveJob(id), "id": id}, nil
case "enable":
id := getStr("id")
j := cronService.EnableJob(id, true)
return map[string]interface{}{"ok": j != nil, "id": id}, nil
case "disable":
id := getStr("id")
j := cronService.EnableJob(id, false)
return map[string]interface{}{"ok": j != nil, "id": id}, nil
default:
return nil, fmt.Errorf("unsupported cron action: %s", action)
}
})
}
func normalizeCronTargetChatID(channel, chatID string) string {
ch := strings.ToLower(strings.TrimSpace(channel))
target := strings.TrimSpace(chatID)
if ch == "" || target == "" {
return target
}
prefix := ch + ":"
if strings.HasPrefix(strings.ToLower(target), prefix) {
return strings.TrimSpace(target[len(prefix):])
}
return target
}
func dispatchCronJob(msgBus *bus.MessageBus, job *cron.CronJob) string {
if job == nil {
return ""
}
message := strings.TrimSpace(job.Payload.Message)
if message == "" {
return ""
}
targetChannel := strings.TrimSpace(job.Payload.Channel)
targetChatID := normalizeCronTargetChatID(targetChannel, job.Payload.To)
if targetChannel != "" && targetChatID != "" {
msgBus.PublishOutbound(bus.OutboundMessage{
Channel: targetChannel,
ChatID: targetChatID,
Content: message,
})
if job.Payload.Deliver {
return "delivered"
}
return "delivered_targeted"
}
msgBus.PublishInbound(bus.InboundMessage{
Channel: "system",
SenderID: "cron",
ChatID: "internal:cron",
Content: message,
SessionKey: fmt.Sprintf("cron:%s", job.ID),
Metadata: map[string]string{
"trigger": "cron",
"job_id": job.ID,
},
})
return "scheduled"
}
func configureCronServiceRuntime(cs *cron.CronService, cfg *config.Config) {
if cs == nil || cfg == nil {
return
}
cs.SetRuntimeOptions(cron.RuntimeOptions{
RunLoopMinSleep: time.Duration(cfg.Cron.MinSleepSec) * time.Second,
RunLoopMaxSleep: time.Duration(cfg.Cron.MaxSleepSec) * time.Second,
RetryBackoffBase: time.Duration(cfg.Cron.RetryBackoffBaseSec) * time.Second,
RetryBackoffMax: time.Duration(cfg.Cron.RetryBackoffMaxSec) * time.Second,
MaxConsecutiveFailureRetries: int64(cfg.Cron.MaxConsecutiveFailureRetries),
MaxWorkers: cfg.Cron.MaxWorkers,
})
}
func buildHeartbeatService(cfg *config.Config, msgBus *bus.MessageBus) *heartbeat.HeartbeatService {
hbInterval := cfg.Agents.Defaults.Heartbeat.EverySec
if hbInterval <= 0 {
hbInterval = 30 * 60
}
return heartbeat.NewHeartbeatService(cfg.WorkspacePath(), func(prompt string) (string, error) {
msgBus.PublishInbound(bus.InboundMessage{
Channel: "system",
SenderID: "heartbeat",
ChatID: "internal:heartbeat",
Content: prompt,
SessionKey: "heartbeat:default",
Metadata: map[string]string{
"trigger": "heartbeat",
},
})
return "queued", nil
}, hbInterval, cfg.Agents.Defaults.Heartbeat.Enabled, cfg.Agents.Defaults.Heartbeat.PromptTemplate)
}