mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-18 05:57:29 +08:00
chore: update project files
This commit is contained in:
@@ -2,10 +2,10 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
@@ -81,9 +81,6 @@ func gatewayCmd() {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
if shouldEmbedWhatsAppBridge(cfg) {
|
||||
cfg.Channels.WhatsApp.BridgeURL = embeddedWhatsAppBridgeURL(cfg)
|
||||
}
|
||||
|
||||
agentLoop, channelManager, err := buildGatewayRuntime(ctx, cfg, msgBus, cronService)
|
||||
if err != nil {
|
||||
@@ -175,19 +172,109 @@ func gatewayCmd() {
|
||||
}
|
||||
bindAgentLoopHandlers(agentLoop)
|
||||
var reloadMu sync.Mutex
|
||||
var applyReload func(forceRuntimeReload bool) error
|
||||
registryServer.SetConfigAfterHook(func(forceRuntimeReload bool) error {
|
||||
triggerReload := func(source string, forceRuntimeReload bool) error {
|
||||
reloadMu.Lock()
|
||||
defer reloadMu.Unlock()
|
||||
if applyReload == nil {
|
||||
return fmt.Errorf("reload handler not ready")
|
||||
fmt.Printf("\nReloading config (source=%s)...\n", strings.TrimSpace(source))
|
||||
newCfg, err := config.LoadConfig(getConfigPath())
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
return applyReload(forceRuntimeReload)
|
||||
})
|
||||
whatsAppBridge, whatsAppEmbedded := setupEmbeddedWhatsAppBridge(ctx, cfg)
|
||||
if whatsAppBridge != nil {
|
||||
registryServer.SetWhatsAppBridge(whatsAppBridge, embeddedWhatsAppBridgeBasePath)
|
||||
if strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "1") || strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "true") {
|
||||
applyMaximumPermissionPolicy(newCfg)
|
||||
}
|
||||
configureCronServiceRuntime(cronService, newCfg)
|
||||
heartbeatService.Stop()
|
||||
heartbeatService = buildHeartbeatService(newCfg, msgBus)
|
||||
if err := heartbeatService.Start(); err != nil {
|
||||
fmt.Printf("Error starting heartbeat service: %v\n", err)
|
||||
}
|
||||
|
||||
if !forceRuntimeReload && reflect.DeepEqual(cfg, newCfg) {
|
||||
fmt.Println("Config unchanged, skip reload")
|
||||
return nil
|
||||
}
|
||||
|
||||
if cfg.Gateway.Host != newCfg.Gateway.Host || cfg.Gateway.Port != newCfg.Gateway.Port {
|
||||
fmt.Printf("Warning: gateway host/port change detected (%s:%d -> %s:%d); restart required to rebind listener\n",
|
||||
cfg.Gateway.Host, cfg.Gateway.Port, newCfg.Gateway.Host, newCfg.Gateway.Port)
|
||||
}
|
||||
|
||||
runtimeSame := reflect.DeepEqual(cfg.Agents, newCfg.Agents) &&
|
||||
reflect.DeepEqual(cfg.Models, newCfg.Models) &&
|
||||
reflect.DeepEqual(cfg.Tools, newCfg.Tools) &&
|
||||
reflect.DeepEqual(cfg.Channels, newCfg.Channels)
|
||||
|
||||
if runtimeSame && !forceRuntimeReload {
|
||||
configureLogging(newCfg)
|
||||
sentinelService.Stop()
|
||||
sentinelService = sentinel.NewService(
|
||||
getConfigPath(),
|
||||
newCfg.WorkspacePath(),
|
||||
newCfg.Sentinel.IntervalSec,
|
||||
newCfg.Sentinel.AutoHeal,
|
||||
buildSentinelAlertHandler(newCfg, msgBus),
|
||||
)
|
||||
if newCfg.Sentinel.Enabled {
|
||||
sentinelService.SetManager(channelManager)
|
||||
sentinelService.Start()
|
||||
}
|
||||
cfg = newCfg
|
||||
runtimecfg.Set(cfg)
|
||||
registryServer.SetToken(cfg.Gateway.Token)
|
||||
registryServer.SetWorkspacePath(cfg.WorkspacePath())
|
||||
registryServer.SetLogFilePath(cfg.LogFilePath())
|
||||
fmt.Println("Config hot-reload applied (logging/metadata only)")
|
||||
return nil
|
||||
}
|
||||
|
||||
newAgentLoop, newChannelManager, err := buildGatewayRuntime(ctx, newCfg, msgBus, cronService)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init runtime: %w", err)
|
||||
}
|
||||
|
||||
channelManager.StopAll(ctx)
|
||||
agentLoop.Stop()
|
||||
channelManager = newChannelManager
|
||||
agentLoop = newAgentLoop
|
||||
cfg = newCfg
|
||||
runtimecfg.Set(cfg)
|
||||
bindAgentLoopHandlers(agentLoop)
|
||||
configureLogging(newCfg)
|
||||
registryServer.SetToken(cfg.Gateway.Token)
|
||||
registryServer.SetWorkspacePath(cfg.WorkspacePath())
|
||||
registryServer.SetLogFilePath(cfg.LogFilePath())
|
||||
if rawWeixin, ok := channelManager.GetChannel("weixin"); ok {
|
||||
if weixinChannel, ok := rawWeixin.(*channels.WeixinChannel); ok {
|
||||
weixinChannel.SetConfigPath(getConfigPath())
|
||||
registryServer.SetWeixinChannel(weixinChannel)
|
||||
}
|
||||
} else {
|
||||
registryServer.SetWeixinChannel(nil)
|
||||
}
|
||||
sentinelService.Stop()
|
||||
sentinelService = sentinel.NewService(
|
||||
getConfigPath(),
|
||||
newCfg.WorkspacePath(),
|
||||
newCfg.Sentinel.IntervalSec,
|
||||
newCfg.Sentinel.AutoHeal,
|
||||
buildSentinelAlertHandler(newCfg, msgBus),
|
||||
)
|
||||
if newCfg.Sentinel.Enabled {
|
||||
sentinelService.Start()
|
||||
}
|
||||
sentinelService.SetManager(channelManager)
|
||||
|
||||
if err := channelManager.StartAll(ctx); err != nil {
|
||||
return fmt.Errorf("start channels: %w", err)
|
||||
}
|
||||
go agentLoop.Run(ctx)
|
||||
fmt.Println("Config hot-reload applied")
|
||||
return nil
|
||||
}
|
||||
registryServer.SetConfigAfterHook(func(forceRuntimeReload bool) error {
|
||||
return triggerReload("api", forceRuntimeReload)
|
||||
})
|
||||
if rawWeixin, ok := channelManager.GetChannel("weixin"); ok {
|
||||
if weixinChannel, ok := rawWeixin.(*channels.WeixinChannel); ok {
|
||||
weixinChannel.SetConfigPath(getConfigPath())
|
||||
@@ -332,9 +419,9 @@ func gatewayCmd() {
|
||||
}
|
||||
})
|
||||
if err := registryServer.Start(ctx); err != nil {
|
||||
fmt.Printf("Error starting node registry server: %v\n", err)
|
||||
fmt.Printf("Error starting gateway server: %v\n", err)
|
||||
} else {
|
||||
fmt.Printf("Node registry server started on %s:%d\n", cfg.Gateway.Host, cfg.Gateway.Port)
|
||||
fmt.Printf("Gateway server started on %s:%d\n", cfg.Gateway.Host, cfg.Gateway.Port)
|
||||
}
|
||||
|
||||
if err := channelManager.StartAll(ctx); err != nil {
|
||||
@@ -345,137 +432,26 @@ func gatewayCmd() {
|
||||
go runGatewayStartupCompactionCheck(ctx, agentLoop)
|
||||
go runGatewayBootstrapInit(ctx, cfg, agentLoop)
|
||||
|
||||
stopConfigWatcher := startGatewayConfigWatcher(ctx, getConfigPath(), 500*time.Millisecond, 250*time.Millisecond, func() error {
|
||||
return triggerReload("watcher", false)
|
||||
})
|
||||
defer stopConfigWatcher()
|
||||
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, gatewayNotifySignals()...)
|
||||
applyReload = func(forceRuntimeReload bool) error {
|
||||
fmt.Println("\nReloading config...")
|
||||
newCfg, err := config.LoadConfig(getConfigPath())
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
if strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "1") || strings.EqualFold(strings.TrimSpace(os.Getenv(envRootGranted)), "true") {
|
||||
applyMaximumPermissionPolicy(newCfg)
|
||||
}
|
||||
configureCronServiceRuntime(cronService, newCfg)
|
||||
heartbeatService.Stop()
|
||||
heartbeatService = buildHeartbeatService(newCfg, msgBus)
|
||||
if err := heartbeatService.Start(); err != nil {
|
||||
fmt.Printf("Error starting heartbeat service: %v\n", err)
|
||||
}
|
||||
|
||||
if !forceRuntimeReload && reflect.DeepEqual(cfg, newCfg) {
|
||||
fmt.Println("Config unchanged, skip reload")
|
||||
return nil
|
||||
}
|
||||
|
||||
if cfg.Gateway.Host != newCfg.Gateway.Host || cfg.Gateway.Port != newCfg.Gateway.Port {
|
||||
fmt.Printf("Warning: gateway host/port change detected (%s:%d -> %s:%d); restart required to rebind listener\n",
|
||||
cfg.Gateway.Host, cfg.Gateway.Port, newCfg.Gateway.Host, newCfg.Gateway.Port)
|
||||
}
|
||||
|
||||
if shouldEmbedWhatsAppBridge(newCfg) {
|
||||
newCfg.Channels.WhatsApp.BridgeURL = embeddedWhatsAppBridgeURL(newCfg)
|
||||
}
|
||||
|
||||
runtimeSame := reflect.DeepEqual(cfg.Agents, newCfg.Agents) &&
|
||||
reflect.DeepEqual(cfg.Models, newCfg.Models) &&
|
||||
reflect.DeepEqual(cfg.Tools, newCfg.Tools) &&
|
||||
reflect.DeepEqual(cfg.Channels, newCfg.Channels)
|
||||
|
||||
if runtimeSame && !forceRuntimeReload {
|
||||
configureLogging(newCfg)
|
||||
sentinelService.Stop()
|
||||
sentinelService = sentinel.NewService(
|
||||
getConfigPath(),
|
||||
newCfg.WorkspacePath(),
|
||||
newCfg.Sentinel.IntervalSec,
|
||||
newCfg.Sentinel.AutoHeal,
|
||||
buildSentinelAlertHandler(newCfg, msgBus),
|
||||
)
|
||||
if newCfg.Sentinel.Enabled {
|
||||
sentinelService.SetManager(channelManager)
|
||||
sentinelService.Start()
|
||||
}
|
||||
cfg = newCfg
|
||||
runtimecfg.Set(cfg)
|
||||
registryServer.SetToken(cfg.Gateway.Token)
|
||||
registryServer.SetWorkspacePath(cfg.WorkspacePath())
|
||||
registryServer.SetLogFilePath(cfg.LogFilePath())
|
||||
fmt.Println("Config hot-reload applied (logging/metadata only)")
|
||||
return nil
|
||||
}
|
||||
|
||||
newAgentLoop, newChannelManager, err := buildGatewayRuntime(ctx, newCfg, msgBus, cronService)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init runtime: %w", err)
|
||||
}
|
||||
|
||||
channelManager.StopAll(ctx)
|
||||
agentLoop.Stop()
|
||||
if whatsAppBridge != nil {
|
||||
whatsAppBridge.Stop()
|
||||
}
|
||||
|
||||
newWhatsAppBridge, _ := setupEmbeddedWhatsAppBridge(ctx, newCfg)
|
||||
|
||||
channelManager = newChannelManager
|
||||
agentLoop = newAgentLoop
|
||||
cfg = newCfg
|
||||
whatsAppBridge = newWhatsAppBridge
|
||||
whatsAppEmbedded = newWhatsAppBridge != nil
|
||||
runtimecfg.Set(cfg)
|
||||
bindAgentLoopHandlers(agentLoop)
|
||||
configureLogging(newCfg)
|
||||
registryServer.SetToken(cfg.Gateway.Token)
|
||||
registryServer.SetWorkspacePath(cfg.WorkspacePath())
|
||||
registryServer.SetLogFilePath(cfg.LogFilePath())
|
||||
registryServer.SetWhatsAppBridge(whatsAppBridge, embeddedWhatsAppBridgeBasePath)
|
||||
if rawWeixin, ok := channelManager.GetChannel("weixin"); ok {
|
||||
if weixinChannel, ok := rawWeixin.(*channels.WeixinChannel); ok {
|
||||
weixinChannel.SetConfigPath(getConfigPath())
|
||||
registryServer.SetWeixinChannel(weixinChannel)
|
||||
}
|
||||
} else {
|
||||
registryServer.SetWeixinChannel(nil)
|
||||
}
|
||||
sentinelService.Stop()
|
||||
sentinelService = sentinel.NewService(
|
||||
getConfigPath(),
|
||||
newCfg.WorkspacePath(),
|
||||
newCfg.Sentinel.IntervalSec,
|
||||
newCfg.Sentinel.AutoHeal,
|
||||
buildSentinelAlertHandler(newCfg, msgBus),
|
||||
)
|
||||
if newCfg.Sentinel.Enabled {
|
||||
sentinelService.Start()
|
||||
}
|
||||
sentinelService.SetManager(channelManager)
|
||||
|
||||
if err := channelManager.StartAll(ctx); err != nil {
|
||||
return fmt.Errorf("start channels: %w", err)
|
||||
}
|
||||
go agentLoop.Run(ctx)
|
||||
fmt.Println("Config hot-reload applied")
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case sig := <-sigChan:
|
||||
switch {
|
||||
case isGatewayReloadSignal(sig):
|
||||
reloadMu.Lock()
|
||||
err := applyReload(false)
|
||||
reloadMu.Unlock()
|
||||
err := triggerReload("signal", false)
|
||||
if err != nil {
|
||||
fmt.Printf("Reload failed: %v\n", err)
|
||||
}
|
||||
default:
|
||||
fmt.Println("\nShutting down...")
|
||||
cancel()
|
||||
if whatsAppEmbedded && whatsAppBridge != nil {
|
||||
whatsAppBridge.Stop()
|
||||
}
|
||||
heartbeatService.Stop()
|
||||
sentinelService.Stop()
|
||||
cronService.Stop()
|
||||
@@ -488,8 +464,6 @@ func gatewayCmd() {
|
||||
}
|
||||
}
|
||||
|
||||
const embeddedWhatsAppBridgeBasePath = "/whatsapp"
|
||||
|
||||
func runGatewayStartupCompactionCheck(parent context.Context, agentLoop *agent.AgentLoop) {
|
||||
if agentLoop == nil {
|
||||
return
|
||||
@@ -542,6 +516,89 @@ func runGatewayBootstrapInit(parent context.Context, cfg *config.Config, agentLo
|
||||
logger.InfoC("gateway", logger.C0114)
|
||||
}
|
||||
|
||||
type configFileFingerprint struct {
|
||||
Size int64
|
||||
ModUnixNano int64
|
||||
SHA256 [32]byte
|
||||
}
|
||||
|
||||
func readConfigFileFingerprint(path string) (configFileFingerprint, error) {
|
||||
info, err := os.Stat(path)
|
||||
if err != nil {
|
||||
return configFileFingerprint{}, err
|
||||
}
|
||||
content, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return configFileFingerprint{}, err
|
||||
}
|
||||
return configFileFingerprint{
|
||||
Size: info.Size(),
|
||||
ModUnixNano: info.ModTime().UnixNano(),
|
||||
SHA256: sha256.Sum256(content),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (f configFileFingerprint) sameContent(other configFileFingerprint) bool {
|
||||
return f.Size == other.Size && f.SHA256 == other.SHA256
|
||||
}
|
||||
|
||||
func startGatewayConfigWatcher(ctx context.Context, configPath string, debounce, pollInterval time.Duration, onContentChanged func() error) func() {
|
||||
if debounce <= 0 {
|
||||
debounce = 500 * time.Millisecond
|
||||
}
|
||||
if pollInterval <= 0 {
|
||||
pollInterval = 250 * time.Millisecond
|
||||
}
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
ticker := time.NewTicker(pollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
last, err := readConfigFileFingerprint(configPath)
|
||||
haveLast := err == nil
|
||||
pending := false
|
||||
lastDetectedAt := time.Time{}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
current, err := readConfigFileFingerprint(configPath)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if !haveLast {
|
||||
last = current
|
||||
haveLast = true
|
||||
continue
|
||||
}
|
||||
if !current.sameContent(last) {
|
||||
last = current
|
||||
pending = true
|
||||
lastDetectedAt = time.Now()
|
||||
continue
|
||||
}
|
||||
if pending && !lastDetectedAt.IsZero() && time.Since(lastDetectedAt) >= debounce {
|
||||
pending = false
|
||||
if onContentChanged != nil {
|
||||
if err := onContentChanged(); err != nil {
|
||||
fmt.Printf("Config watcher reload failed: %v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return func() {
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func applyMaximumPermissionPolicy(cfg *config.Config) {
|
||||
cfg.Tools.Shell.Enabled = true
|
||||
cfg.Tools.Shell.Sandbox.Enabled = false
|
||||
@@ -1123,69 +1180,3 @@ func buildHeartbeatService(cfg *config.Config, msgBus *bus.MessageBus) *heartbea
|
||||
return "queued", nil
|
||||
}, hbInterval, cfg.Agents.Defaults.Heartbeat.Enabled, cfg.Agents.Defaults.Heartbeat.PromptTemplate)
|
||||
}
|
||||
|
||||
func setupEmbeddedWhatsAppBridge(ctx context.Context, cfg *config.Config) (*channels.WhatsAppBridgeService, bool) {
|
||||
if !shouldStartEmbeddedWhatsAppBridge(cfg) {
|
||||
return nil, false
|
||||
}
|
||||
cfg.Channels.WhatsApp.BridgeURL = embeddedWhatsAppBridgeURL(cfg)
|
||||
stateDir := filepath.Join(filepath.Dir(getConfigPath()), "channels", "whatsapp")
|
||||
svc := channels.NewWhatsAppBridgeService(fmt.Sprintf("%s:%d", cfg.Gateway.Host, cfg.Gateway.Port), stateDir, false)
|
||||
if err := svc.StartEmbedded(ctx); err != nil {
|
||||
fmt.Printf("Error starting embedded WhatsApp bridge: %v\n", err)
|
||||
return nil, false
|
||||
}
|
||||
return svc, true
|
||||
}
|
||||
|
||||
func shouldStartEmbeddedWhatsAppBridge(cfg *config.Config) bool {
|
||||
return cfg != nil && shouldEmbedWhatsAppBridge(cfg)
|
||||
}
|
||||
|
||||
func shouldEmbedWhatsAppBridge(cfg *config.Config) bool {
|
||||
raw := strings.TrimSpace(cfg.Channels.WhatsApp.BridgeURL)
|
||||
if raw == "" {
|
||||
return true
|
||||
}
|
||||
hostPort := comparableBridgeHostPort(raw)
|
||||
if hostPort == "" {
|
||||
return false
|
||||
}
|
||||
if hostPort == "127.0.0.1:3001" || hostPort == "localhost:3001" {
|
||||
return true
|
||||
}
|
||||
return hostPort == comparableGatewayHostPort(cfg.Gateway.Host, cfg.Gateway.Port)
|
||||
}
|
||||
|
||||
func embeddedWhatsAppBridgeURL(cfg *config.Config) string {
|
||||
host := strings.TrimSpace(cfg.Gateway.Host)
|
||||
switch host {
|
||||
case "", "0.0.0.0", "::", "[::]":
|
||||
host = "127.0.0.1"
|
||||
}
|
||||
return fmt.Sprintf("ws://%s:%d%s/ws", host, cfg.Gateway.Port, embeddedWhatsAppBridgeBasePath)
|
||||
}
|
||||
|
||||
func comparableBridgeHostPort(raw string) string {
|
||||
raw = strings.TrimSpace(raw)
|
||||
if raw == "" {
|
||||
return ""
|
||||
}
|
||||
if !strings.Contains(raw, "://") {
|
||||
return strings.ToLower(raw)
|
||||
}
|
||||
u, err := url.Parse(raw)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return strings.ToLower(strings.TrimSpace(u.Host))
|
||||
}
|
||||
|
||||
func comparableGatewayHostPort(host string, port int) string {
|
||||
host = strings.TrimSpace(strings.ToLower(host))
|
||||
switch host {
|
||||
case "", "0.0.0.0", "::", "[::]":
|
||||
host = "127.0.0.1"
|
||||
}
|
||||
return fmt.Sprintf("%s:%d", host, port)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user