From e9a47ac02aafc2d6d381116141650d574a2f2d2a Mon Sep 17 00:00:00 2001 From: LPF Date: Tue, 10 Mar 2026 00:00:01 +0800 Subject: [PATCH] fix whatsapp --- Dockerfile | 35 +++++ cmd/clawgo/cmd_gateway.go | 95 +++++++++++++- docker-compose.yml | 14 ++ pkg/api/server.go | 48 +++++++ pkg/api/server_test.go | 47 +++++++ pkg/channels/whatsapp_bridge.go | 187 ++++++++++++++++++++------- pkg/channels/whatsapp_bridge_test.go | 10 ++ 7 files changed, 384 insertions(+), 52 deletions(-) create mode 100644 Dockerfile create mode 100644 docker-compose.yml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ec22176 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,35 @@ +FROM golang:1.25.5-bookworm AS builder + +WORKDIR /src + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . ./ + +RUN rm -rf cmd/clawgo/workspace \ + && mkdir -p cmd/clawgo/workspace \ + && cp -a workspace/. cmd/clawgo/workspace/ + +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -buildvcs=false -ldflags="-s -w" -o /out/clawgo ./cmd/clawgo + +FROM debian:bookworm-slim + +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates tzdata \ + && rm -rf /var/lib/apt/lists/* + +RUN useradd --create-home --shell /bin/sh clawgo + +USER clawgo +WORKDIR /home/clawgo + +COPY --from=builder /out/clawgo /usr/local/bin/clawgo + +ENV CLAWGO_CONFIG=/home/clawgo/.clawgo/config.json + +EXPOSE 18790 + +VOLUME ["/home/clawgo/.clawgo"] + +ENTRYPOINT ["/bin/sh", "-c", "if [ ! -f \"$CLAWGO_CONFIG\" ]; then /usr/local/bin/clawgo onboard; fi; exec /usr/local/bin/clawgo gateway run --config \"$CLAWGO_CONFIG\""] diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index ec12e12..adcb77a 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "net/url" "os" "os/exec" "os/signal" @@ -86,6 +87,9 @@ func gatewayCmd() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + if shouldEmbedWhatsAppBridge(cfg) { + cfg.Channels.WhatsApp.BridgeURL = embeddedWhatsAppBridgeURL(cfg) + } agentLoop, channelManager, err := buildGatewayRuntime(ctx, cfg, msgBus, cronService) if err != nil { @@ -125,10 +129,6 @@ func gatewayCmd() { fmt.Println("✓ Sentinel service started") } - if err := channelManager.StartAll(ctx); err != nil { - fmt.Printf("Error starting channels: %v\n", err) - } - registryServer := api.NewServer(cfg.Gateway.Host, cfg.Gateway.Port, cfg.Gateway.Token, nodes.DefaultManager()) configureGatewayNodeP2P := func(loop *agent.AgentLoop, server *api.Server, runtimeCfg *config.Config) { if loop == nil || server == nil || runtimeCfg == nil { @@ -223,6 +223,10 @@ func gatewayCmd() { registryServer.SetToolsCatalogHandler(func() interface{} { return agentLoop.GetToolCatalog() }) + whatsAppBridge, whatsAppEmbedded := setupEmbeddedWhatsAppBridge(ctx, cfg) + if whatsAppBridge != nil { + registryServer.SetWhatsAppBridge(whatsAppBridge, embeddedWhatsAppBridgeBasePath) + } registryServer.SetCronHandler(func(action string, args map[string]interface{}) (interface{}, error) { getStr := func(k string) string { v, _ := args[k].(string) @@ -366,6 +370,10 @@ func gatewayCmd() { fmt.Printf("✓ Node registry server started on %s:%d\n", cfg.Gateway.Host, cfg.Gateway.Port) } + if err := channelManager.StartAll(ctx); err != nil { + fmt.Printf("Error starting channels: %v\n", err) + } + go agentLoop.Run(ctx) go runGatewayStartupCompactionCheck(ctx, agentLoop) go runGatewayBootstrapInit(ctx, cfg, agentLoop) @@ -394,6 +402,10 @@ func gatewayCmd() { return } + if shouldEmbedWhatsAppBridge(newCfg) { + newCfg.Channels.WhatsApp.BridgeURL = embeddedWhatsAppBridgeURL(newCfg) + } + runtimeSame := reflect.DeepEqual(cfg.Agents, newCfg.Agents) && reflect.DeepEqual(cfg.Providers, newCfg.Providers) && reflect.DeepEqual(cfg.Tools, newCfg.Tools) && @@ -435,14 +447,22 @@ func gatewayCmd() { return } + newWhatsAppBridge, _ := setupEmbeddedWhatsAppBridge(ctx, newCfg) + channelManager.StopAll(ctx) agentLoop.Stop() + if whatsAppBridge != nil { + whatsAppBridge.Stop() + } channelManager = newChannelManager agentLoop = newAgentLoop cfg = newCfg + whatsAppBridge = newWhatsAppBridge + whatsAppEmbedded = newWhatsAppBridge != nil runtimecfg.Set(cfg) configureGatewayNodeP2P(agentLoop, registryServer, cfg) + registryServer.SetWhatsAppBridge(whatsAppBridge, embeddedWhatsAppBridgeBasePath) sentinelService.Stop() sentinelService = sentinel.NewService( getConfigPath(), @@ -483,6 +503,9 @@ func gatewayCmd() { default: fmt.Println("\nShutting down...") cancel() + if whatsAppEmbedded && whatsAppBridge != nil { + whatsAppBridge.Stop() + } heartbeatService.Stop() sentinelService.Stop() cronService.Stop() @@ -495,6 +518,8 @@ func gatewayCmd() { } } +const embeddedWhatsAppBridgeBasePath = "/whatsapp" + func runGatewayStartupCompactionCheck(parent context.Context, agentLoop *agent.AgentLoop) { if agentLoop == nil { return @@ -843,3 +868,65 @@ func buildHeartbeatService(cfg *config.Config, msgBus *bus.MessageBus) *heartbea return "queued", nil }, hbInterval, cfg.Agents.Defaults.Heartbeat.Enabled, cfg.Agents.Defaults.Heartbeat.PromptTemplate) } + +func setupEmbeddedWhatsAppBridge(ctx context.Context, cfg *config.Config) (*channels.WhatsAppBridgeService, bool) { + if cfg == nil || !cfg.Channels.WhatsApp.Enabled || !shouldEmbedWhatsAppBridge(cfg) { + return nil, false + } + cfg.Channels.WhatsApp.BridgeURL = embeddedWhatsAppBridgeURL(cfg) + stateDir := filepath.Join(filepath.Dir(getConfigPath()), "channels", "whatsapp") + svc := channels.NewWhatsAppBridgeService(fmt.Sprintf("%s:%d", cfg.Gateway.Host, cfg.Gateway.Port), stateDir, false) + if err := svc.StartEmbedded(ctx); err != nil { + fmt.Printf("Error starting embedded WhatsApp bridge: %v\n", err) + return nil, false + } + return svc, true +} + +func shouldEmbedWhatsAppBridge(cfg *config.Config) bool { + raw := strings.TrimSpace(cfg.Channels.WhatsApp.BridgeURL) + if raw == "" { + return true + } + hostPort := comparableBridgeHostPort(raw) + if hostPort == "" { + return false + } + if hostPort == "127.0.0.1:3001" || hostPort == "localhost:3001" { + return true + } + return hostPort == comparableGatewayHostPort(cfg.Gateway.Host, cfg.Gateway.Port) +} + +func embeddedWhatsAppBridgeURL(cfg *config.Config) string { + host := strings.TrimSpace(cfg.Gateway.Host) + switch host { + case "", "0.0.0.0", "::", "[::]": + host = "127.0.0.1" + } + return fmt.Sprintf("ws://%s:%d%s/ws", host, cfg.Gateway.Port, embeddedWhatsAppBridgeBasePath) +} + +func comparableBridgeHostPort(raw string) string { + raw = strings.TrimSpace(raw) + if raw == "" { + return "" + } + if !strings.Contains(raw, "://") { + return strings.ToLower(raw) + } + u, err := url.Parse(raw) + if err != nil { + return "" + } + return strings.ToLower(strings.TrimSpace(u.Host)) +} + +func comparableGatewayHostPort(host string, port int) string { + host = strings.TrimSpace(strings.ToLower(host)) + switch host { + case "", "0.0.0.0", "::", "[::]": + host = "127.0.0.1" + } + return fmt.Sprintf("%s:%d", host, port) +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..c9de9b1 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,14 @@ +services: + clawgo: + build: + context: . + dockerfile: Dockerfile + container_name: clawgo + restart: unless-stopped + ports: + - "18790:18790" + environment: + TZ: Asia/Shanghai + CLAWGO_CONFIG: /home/clawgo/.clawgo/config.json + volumes: + - ./.clawgo:/home/clawgo/.clawgo diff --git a/pkg/api/server.go b/pkg/api/server.go index a54a270..708ebe7 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -71,6 +71,8 @@ type Server struct { liveRuntimeOn bool liveSubagentMu sync.Mutex liveSubagents map[string]*liveSubagentGroup + whatsAppBridge *channels.WhatsAppBridgeService + whatsAppBase string } var nodesWebsocketUpgrader = websocket.Upgrader{ @@ -311,6 +313,42 @@ func (s *Server) SetNodeWebRTCTransport(t *nodes.WebRTCTransport) { func (s *Server) SetNodeP2PStatusHandler(fn func() map[string]interface{}) { s.nodeP2PStatus = fn } +func (s *Server) SetWhatsAppBridge(service *channels.WhatsAppBridgeService, basePath string) { + s.whatsAppBridge = service + s.whatsAppBase = strings.TrimSpace(basePath) +} + +func (s *Server) handleWhatsAppBridgeWS(w http.ResponseWriter, r *http.Request) { + if s.whatsAppBridge == nil { + http.Error(w, "whatsapp bridge unavailable", http.StatusServiceUnavailable) + return + } + s.whatsAppBridge.ServeWS(w, r) +} + +func (s *Server) handleWhatsAppBridgeStatus(w http.ResponseWriter, r *http.Request) { + if s.whatsAppBridge == nil { + http.Error(w, "whatsapp bridge unavailable", http.StatusServiceUnavailable) + return + } + s.whatsAppBridge.ServeStatus(w, r) +} + +func (s *Server) handleWhatsAppBridgeLogout(w http.ResponseWriter, r *http.Request) { + if s.whatsAppBridge == nil { + http.Error(w, "whatsapp bridge unavailable", http.StatusServiceUnavailable) + return + } + s.whatsAppBridge.ServeLogout(w, r) +} + +func joinServerRoute(base, endpoint string) string { + base = strings.TrimRight(strings.TrimSpace(base), "/") + if base == "" || base == "/" { + return "/" + strings.TrimPrefix(endpoint, "/") + } + return base + "/" + strings.TrimPrefix(endpoint, "/") +} func (s *Server) rememberNodeConnection(nodeID, connID string) { nodeID = strings.TrimSpace(nodeID) @@ -440,6 +478,16 @@ func (s *Server) Start(ctx context.Context) error { mux.HandleFunc("/webui/api/logs/stream", s.handleWebUILogsStream) mux.HandleFunc("/webui/api/logs/live", s.handleWebUILogsLive) mux.HandleFunc("/webui/api/logs/recent", s.handleWebUILogsRecent) + if strings.TrimSpace(s.whatsAppBase) != "" { + base := strings.TrimRight(strings.TrimSpace(s.whatsAppBase), "/") + if base == "" { + base = "/whatsapp" + } + mux.HandleFunc(base, s.handleWhatsAppBridgeWS) + mux.HandleFunc(joinServerRoute(base, "ws"), s.handleWhatsAppBridgeWS) + mux.HandleFunc(joinServerRoute(base, "status"), s.handleWhatsAppBridgeStatus) + mux.HandleFunc(joinServerRoute(base, "logout"), s.handleWhatsAppBridgeLogout) + } s.server = &http.Server{Addr: s.addr, Handler: mux} go func() { <-ctx.Done() diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index ebf58c5..3f0fee2 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -120,6 +120,53 @@ func TestHandleWebUIWhatsAppQR(t *testing.T) { } } +func TestHandleWebUIWhatsAppStatusWithNestedBridgePath(t *testing.T) { + t.Parallel() + + bridge := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/whatsapp/status": + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "state": "connected", + "connected": true, + "logged_in": true, + "bridge_addr": "127.0.0.1:7788", + "user_jid": "8613012345678@s.whatsapp.net", + "qr_available": false, + "last_event": "connected", + "updated_at": "2026-03-09T12:00:00+08:00", + }) + default: + http.NotFound(w, r) + } + })) + defer bridge.Close() + + tmp := t.TempDir() + cfgPath := filepath.Join(tmp, "config.json") + cfg := cfgpkg.DefaultConfig() + cfg.Logging.Enabled = false + cfg.Channels.WhatsApp.Enabled = true + cfg.Channels.WhatsApp.BridgeURL = "ws" + strings.TrimPrefix(bridge.URL, "http") + "/whatsapp/ws" + if err := cfgpkg.SaveConfig(cfgPath, cfg); err != nil { + t.Fatalf("save config: %v", err) + } + + srv := NewServer("127.0.0.1", 0, "", nil) + srv.SetConfigPath(cfgPath) + + req := httptest.NewRequest(http.MethodGet, "/webui/api/whatsapp/status", nil) + rec := httptest.NewRecorder() + srv.handleWebUIWhatsAppStatus(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String()) + } + if !strings.Contains(rec.Body.String(), `"bridge_running":true`) { + t.Fatalf("expected bridge_running=true, got: %s", rec.Body.String()) + } +} + func TestHandleWebUIConfigRequiresConfirmForProviderAPIBaseChange(t *testing.T) { t.Parallel() diff --git a/pkg/channels/whatsapp_bridge.go b/pkg/channels/whatsapp_bridge.go index 7eade47..dedab8a 100644 --- a/pkg/channels/whatsapp_bridge.go +++ b/pkg/channels/whatsapp_bridge.go @@ -67,6 +67,7 @@ type WhatsAppBridgeService struct { status WhatsAppBridgeStatus wsClientsMu sync.Mutex markReadFn func(ctx context.Context, ids []types.MessageID, timestamp time.Time, chat, sender types.JID) error + localOnly bool } type whatsappBridgeWSMessage struct { @@ -100,6 +101,34 @@ func NewWhatsAppBridgeService(addr, stateDir string, printQR bool) *WhatsAppBrid } func (s *WhatsAppBridgeService) Start(ctx context.Context) error { + if err := s.startRuntime(ctx); err != nil { + return err + } + + mux := http.NewServeMux() + s.RegisterRoutes(mux, "") + s.httpServer = &http.Server{ + Addr: s.addr, + Handler: mux, + } + + ln, err := net.Listen("tcp", s.addr) + if err != nil { + return fmt.Errorf("listen whatsapp bridge: %w", err) + } + + if err := s.httpServer.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil +} + +func (s *WhatsAppBridgeService) StartEmbedded(ctx context.Context) error { + s.localOnly = true + return s.startRuntime(ctx) +} + +func (s *WhatsAppBridgeService) startRuntime(ctx context.Context) error { if strings.TrimSpace(s.addr) == "" { return fmt.Errorf("bridge address is required") } @@ -116,26 +145,13 @@ func (s *WhatsAppBridgeService) Start(ctx context.Context) error { runCtx, cancel := context.WithCancel(ctx) s.cancel = cancel - mux := http.NewServeMux() - mux.HandleFunc("/", s.handleWS) - mux.HandleFunc("/ws", s.handleWS) - mux.HandleFunc("/status", s.handleStatus) - mux.HandleFunc("/logout", s.handleLogout) - s.httpServer = &http.Server{ - Addr: s.addr, - Handler: mux, - } - - ln, err := net.Listen("tcp", s.addr) - if err != nil { - return fmt.Errorf("listen whatsapp bridge: %w", err) - } - go func() { <-runCtx.Done() - shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _ = s.httpServer.Shutdown(shutdownCtx) + if s.httpServer != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = s.httpServer.Shutdown(shutdownCtx) + } s.closeWSClients() if s.client != nil { s.client.Disconnect() @@ -148,10 +164,6 @@ func (s *WhatsAppBridgeService) Start(ctx context.Context) error { go func() { _ = s.connectClient(runCtx) }() - - if err := s.httpServer.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { - return err - } return nil } @@ -161,6 +173,17 @@ func (s *WhatsAppBridgeService) Stop() { } } +func (s *WhatsAppBridgeService) RegisterRoutes(mux *http.ServeMux, basePath string) { + if mux == nil { + return + } + basePath = normalizeBridgeBasePath(basePath) + mux.HandleFunc(basePath, s.ServeWS) + mux.HandleFunc(joinBridgeRoute(basePath, "ws"), s.ServeWS) + mux.HandleFunc(joinBridgeRoute(basePath, "status"), s.ServeStatus) + mux.HandleFunc(joinBridgeRoute(basePath, "logout"), s.ServeLogout) +} + func (s *WhatsAppBridgeService) StatusSnapshot() WhatsAppBridgeStatus { s.statusMu.RLock() defer s.statusMu.RUnlock() @@ -421,11 +444,29 @@ func (s *WhatsAppBridgeService) handleWS(w http.ResponseWriter, r *http.Request) } } +func (s *WhatsAppBridgeService) ServeWS(w http.ResponseWriter, r *http.Request) { + s.wrapHandler(s.handleWS)(w, r) +} + +func (s *WhatsAppBridgeService) wrapHandler(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if s.localOnly && !isLoopbackRequest(r) { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + next(w, r) + } +} + func (s *WhatsAppBridgeService) handleStatus(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(s.StatusSnapshot()) } +func (s *WhatsAppBridgeService) ServeStatus(w http.ResponseWriter, r *http.Request) { + s.wrapHandler(s.handleStatus)(w, r) +} + func (s *WhatsAppBridgeService) handleLogout(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) @@ -795,36 +836,16 @@ func ParseWhatsAppBridgeListenAddr(raw string) (string, error) { return raw, nil } +func (s *WhatsAppBridgeService) ServeLogout(w http.ResponseWriter, r *http.Request) { + s.wrapHandler(s.handleLogout)(w, r) +} + func BridgeStatusURL(raw string) (string, error) { - raw = strings.TrimSpace(raw) - if raw == "" { - return "", fmt.Errorf("bridge url is required") - } - if !strings.Contains(raw, "://") { - raw = "ws://" + raw - } - u, err := url.Parse(raw) - if err != nil { - return "", fmt.Errorf("parse bridge url: %w", err) - } - switch u.Scheme { - case "wss": - u.Scheme = "https" - default: - u.Scheme = "http" - } - u.Path = "/status" - u.RawQuery = "" - u.Fragment = "" - return u.String(), nil + return bridgeEndpointURL(raw, "status") } func BridgeLogoutURL(raw string) (string, error) { - statusURL, err := BridgeStatusURL(raw) - if err != nil { - return "", err - } - return strings.TrimSuffix(statusURL, "/status") + "/logout", nil + return bridgeEndpointURL(raw, "logout") } func normalizeWhatsAppRecipientJID(raw string) (types.JID, error) { @@ -874,3 +895,73 @@ func extractWhatsAppMessageText(msg *waProto.Message) string { return "" } } + +func bridgeEndpointURL(raw, endpoint string) (string, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return "", fmt.Errorf("bridge url is required") + } + if !strings.Contains(raw, "://") { + raw = "ws://" + raw + } + u, err := url.Parse(raw) + if err != nil { + return "", fmt.Errorf("parse bridge url: %w", err) + } + switch u.Scheme { + case "wss": + u.Scheme = "https" + default: + u.Scheme = "http" + } + u.Path = bridgeSiblingPath(u.Path, endpoint) + u.RawQuery = "" + u.Fragment = "" + return u.String(), nil +} + +func bridgeSiblingPath(pathValue, endpoint string) string { + pathValue = strings.TrimSpace(pathValue) + if endpoint == "" { + endpoint = "status" + } + if pathValue == "" || pathValue == "/" { + return "/" + endpoint + } + trimmed := strings.TrimSuffix(pathValue, "/") + if strings.HasSuffix(trimmed, "/ws") { + return strings.TrimSuffix(trimmed, "/ws") + "/" + endpoint + } + return trimmed + "/" + endpoint +} + +func normalizeBridgeBasePath(basePath string) string { + basePath = strings.TrimSpace(basePath) + if basePath == "" || basePath == "/" { + return "/" + } + if !strings.HasPrefix(basePath, "/") { + basePath = "/" + basePath + } + return strings.TrimSuffix(basePath, "/") +} + +func joinBridgeRoute(basePath, endpoint string) string { + basePath = normalizeBridgeBasePath(basePath) + if basePath == "/" { + return "/" + strings.TrimPrefix(endpoint, "/") + } + return basePath + "/" + strings.TrimPrefix(endpoint, "/") +} + +func isLoopbackRequest(r *http.Request) bool { + if r == nil { + return false + } + host, _, err := net.SplitHostPort(strings.TrimSpace(r.RemoteAddr)) + if err != nil { + host = strings.TrimSpace(r.RemoteAddr) + } + ip := net.ParseIP(host) + return ip != nil && ip.IsLoopback() +} diff --git a/pkg/channels/whatsapp_bridge_test.go b/pkg/channels/whatsapp_bridge_test.go index 8b2fd6b..8855dbe 100644 --- a/pkg/channels/whatsapp_bridge_test.go +++ b/pkg/channels/whatsapp_bridge_test.go @@ -53,6 +53,16 @@ func TestBridgeStatusURL(t *testing.T) { } } +func TestBridgeStatusURLWithNestedPath(t *testing.T) { + got, err := BridgeStatusURL("ws://localhost:7788/whatsapp/ws") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != "http://localhost:7788/whatsapp/status" { + t.Fatalf("got %q", got) + } +} + func TestNormalizeWhatsAppRecipientJID(t *testing.T) { tests := []struct { input string