fix whatsapp

This commit is contained in:
LPF
2026-03-10 00:00:01 +08:00
parent 938e762c6b
commit e9a47ac02a
7 changed files with 384 additions and 52 deletions

View File

@@ -71,6 +71,8 @@ type Server struct {
liveRuntimeOn bool
liveSubagentMu sync.Mutex
liveSubagents map[string]*liveSubagentGroup
whatsAppBridge *channels.WhatsAppBridgeService
whatsAppBase string
}
var nodesWebsocketUpgrader = websocket.Upgrader{
@@ -311,6 +313,42 @@ func (s *Server) SetNodeWebRTCTransport(t *nodes.WebRTCTransport) {
func (s *Server) SetNodeP2PStatusHandler(fn func() map[string]interface{}) {
s.nodeP2PStatus = fn
}
func (s *Server) SetWhatsAppBridge(service *channels.WhatsAppBridgeService, basePath string) {
s.whatsAppBridge = service
s.whatsAppBase = strings.TrimSpace(basePath)
}
func (s *Server) handleWhatsAppBridgeWS(w http.ResponseWriter, r *http.Request) {
if s.whatsAppBridge == nil {
http.Error(w, "whatsapp bridge unavailable", http.StatusServiceUnavailable)
return
}
s.whatsAppBridge.ServeWS(w, r)
}
func (s *Server) handleWhatsAppBridgeStatus(w http.ResponseWriter, r *http.Request) {
if s.whatsAppBridge == nil {
http.Error(w, "whatsapp bridge unavailable", http.StatusServiceUnavailable)
return
}
s.whatsAppBridge.ServeStatus(w, r)
}
func (s *Server) handleWhatsAppBridgeLogout(w http.ResponseWriter, r *http.Request) {
if s.whatsAppBridge == nil {
http.Error(w, "whatsapp bridge unavailable", http.StatusServiceUnavailable)
return
}
s.whatsAppBridge.ServeLogout(w, r)
}
func joinServerRoute(base, endpoint string) string {
base = strings.TrimRight(strings.TrimSpace(base), "/")
if base == "" || base == "/" {
return "/" + strings.TrimPrefix(endpoint, "/")
}
return base + "/" + strings.TrimPrefix(endpoint, "/")
}
func (s *Server) rememberNodeConnection(nodeID, connID string) {
nodeID = strings.TrimSpace(nodeID)
@@ -440,6 +478,16 @@ func (s *Server) Start(ctx context.Context) error {
mux.HandleFunc("/webui/api/logs/stream", s.handleWebUILogsStream)
mux.HandleFunc("/webui/api/logs/live", s.handleWebUILogsLive)
mux.HandleFunc("/webui/api/logs/recent", s.handleWebUILogsRecent)
if strings.TrimSpace(s.whatsAppBase) != "" {
base := strings.TrimRight(strings.TrimSpace(s.whatsAppBase), "/")
if base == "" {
base = "/whatsapp"
}
mux.HandleFunc(base, s.handleWhatsAppBridgeWS)
mux.HandleFunc(joinServerRoute(base, "ws"), s.handleWhatsAppBridgeWS)
mux.HandleFunc(joinServerRoute(base, "status"), s.handleWhatsAppBridgeStatus)
mux.HandleFunc(joinServerRoute(base, "logout"), s.handleWhatsAppBridgeLogout)
}
s.server = &http.Server{Addr: s.addr, Handler: mux}
go func() {
<-ctx.Done()

View File

@@ -120,6 +120,53 @@ func TestHandleWebUIWhatsAppQR(t *testing.T) {
}
}
func TestHandleWebUIWhatsAppStatusWithNestedBridgePath(t *testing.T) {
t.Parallel()
bridge := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/whatsapp/status":
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"state": "connected",
"connected": true,
"logged_in": true,
"bridge_addr": "127.0.0.1:7788",
"user_jid": "8613012345678@s.whatsapp.net",
"qr_available": false,
"last_event": "connected",
"updated_at": "2026-03-09T12:00:00+08:00",
})
default:
http.NotFound(w, r)
}
}))
defer bridge.Close()
tmp := t.TempDir()
cfgPath := filepath.Join(tmp, "config.json")
cfg := cfgpkg.DefaultConfig()
cfg.Logging.Enabled = false
cfg.Channels.WhatsApp.Enabled = true
cfg.Channels.WhatsApp.BridgeURL = "ws" + strings.TrimPrefix(bridge.URL, "http") + "/whatsapp/ws"
if err := cfgpkg.SaveConfig(cfgPath, cfg); err != nil {
t.Fatalf("save config: %v", err)
}
srv := NewServer("127.0.0.1", 0, "", nil)
srv.SetConfigPath(cfgPath)
req := httptest.NewRequest(http.MethodGet, "/webui/api/whatsapp/status", nil)
rec := httptest.NewRecorder()
srv.handleWebUIWhatsAppStatus(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String())
}
if !strings.Contains(rec.Body.String(), `"bridge_running":true`) {
t.Fatalf("expected bridge_running=true, got: %s", rec.Body.String())
}
}
func TestHandleWebUIConfigRequiresConfirmForProviderAPIBaseChange(t *testing.T) {
t.Parallel()

View File

@@ -67,6 +67,7 @@ type WhatsAppBridgeService struct {
status WhatsAppBridgeStatus
wsClientsMu sync.Mutex
markReadFn func(ctx context.Context, ids []types.MessageID, timestamp time.Time, chat, sender types.JID) error
localOnly bool
}
type whatsappBridgeWSMessage struct {
@@ -100,6 +101,34 @@ func NewWhatsAppBridgeService(addr, stateDir string, printQR bool) *WhatsAppBrid
}
func (s *WhatsAppBridgeService) Start(ctx context.Context) error {
if err := s.startRuntime(ctx); err != nil {
return err
}
mux := http.NewServeMux()
s.RegisterRoutes(mux, "")
s.httpServer = &http.Server{
Addr: s.addr,
Handler: mux,
}
ln, err := net.Listen("tcp", s.addr)
if err != nil {
return fmt.Errorf("listen whatsapp bridge: %w", err)
}
if err := s.httpServer.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
}
func (s *WhatsAppBridgeService) StartEmbedded(ctx context.Context) error {
s.localOnly = true
return s.startRuntime(ctx)
}
func (s *WhatsAppBridgeService) startRuntime(ctx context.Context) error {
if strings.TrimSpace(s.addr) == "" {
return fmt.Errorf("bridge address is required")
}
@@ -116,26 +145,13 @@ func (s *WhatsAppBridgeService) Start(ctx context.Context) error {
runCtx, cancel := context.WithCancel(ctx)
s.cancel = cancel
mux := http.NewServeMux()
mux.HandleFunc("/", s.handleWS)
mux.HandleFunc("/ws", s.handleWS)
mux.HandleFunc("/status", s.handleStatus)
mux.HandleFunc("/logout", s.handleLogout)
s.httpServer = &http.Server{
Addr: s.addr,
Handler: mux,
}
ln, err := net.Listen("tcp", s.addr)
if err != nil {
return fmt.Errorf("listen whatsapp bridge: %w", err)
}
go func() {
<-runCtx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.httpServer.Shutdown(shutdownCtx)
if s.httpServer != nil {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.httpServer.Shutdown(shutdownCtx)
}
s.closeWSClients()
if s.client != nil {
s.client.Disconnect()
@@ -148,10 +164,6 @@ func (s *WhatsAppBridgeService) Start(ctx context.Context) error {
go func() {
_ = s.connectClient(runCtx)
}()
if err := s.httpServer.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
}
@@ -161,6 +173,17 @@ func (s *WhatsAppBridgeService) Stop() {
}
}
func (s *WhatsAppBridgeService) RegisterRoutes(mux *http.ServeMux, basePath string) {
if mux == nil {
return
}
basePath = normalizeBridgeBasePath(basePath)
mux.HandleFunc(basePath, s.ServeWS)
mux.HandleFunc(joinBridgeRoute(basePath, "ws"), s.ServeWS)
mux.HandleFunc(joinBridgeRoute(basePath, "status"), s.ServeStatus)
mux.HandleFunc(joinBridgeRoute(basePath, "logout"), s.ServeLogout)
}
func (s *WhatsAppBridgeService) StatusSnapshot() WhatsAppBridgeStatus {
s.statusMu.RLock()
defer s.statusMu.RUnlock()
@@ -421,11 +444,29 @@ func (s *WhatsAppBridgeService) handleWS(w http.ResponseWriter, r *http.Request)
}
}
func (s *WhatsAppBridgeService) ServeWS(w http.ResponseWriter, r *http.Request) {
s.wrapHandler(s.handleWS)(w, r)
}
func (s *WhatsAppBridgeService) wrapHandler(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if s.localOnly && !isLoopbackRequest(r) {
http.Error(w, "forbidden", http.StatusForbidden)
return
}
next(w, r)
}
}
func (s *WhatsAppBridgeService) handleStatus(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(s.StatusSnapshot())
}
func (s *WhatsAppBridgeService) ServeStatus(w http.ResponseWriter, r *http.Request) {
s.wrapHandler(s.handleStatus)(w, r)
}
func (s *WhatsAppBridgeService) handleLogout(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
@@ -795,36 +836,16 @@ func ParseWhatsAppBridgeListenAddr(raw string) (string, error) {
return raw, nil
}
func (s *WhatsAppBridgeService) ServeLogout(w http.ResponseWriter, r *http.Request) {
s.wrapHandler(s.handleLogout)(w, r)
}
func BridgeStatusURL(raw string) (string, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return "", fmt.Errorf("bridge url is required")
}
if !strings.Contains(raw, "://") {
raw = "ws://" + raw
}
u, err := url.Parse(raw)
if err != nil {
return "", fmt.Errorf("parse bridge url: %w", err)
}
switch u.Scheme {
case "wss":
u.Scheme = "https"
default:
u.Scheme = "http"
}
u.Path = "/status"
u.RawQuery = ""
u.Fragment = ""
return u.String(), nil
return bridgeEndpointURL(raw, "status")
}
func BridgeLogoutURL(raw string) (string, error) {
statusURL, err := BridgeStatusURL(raw)
if err != nil {
return "", err
}
return strings.TrimSuffix(statusURL, "/status") + "/logout", nil
return bridgeEndpointURL(raw, "logout")
}
func normalizeWhatsAppRecipientJID(raw string) (types.JID, error) {
@@ -874,3 +895,73 @@ func extractWhatsAppMessageText(msg *waProto.Message) string {
return ""
}
}
func bridgeEndpointURL(raw, endpoint string) (string, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return "", fmt.Errorf("bridge url is required")
}
if !strings.Contains(raw, "://") {
raw = "ws://" + raw
}
u, err := url.Parse(raw)
if err != nil {
return "", fmt.Errorf("parse bridge url: %w", err)
}
switch u.Scheme {
case "wss":
u.Scheme = "https"
default:
u.Scheme = "http"
}
u.Path = bridgeSiblingPath(u.Path, endpoint)
u.RawQuery = ""
u.Fragment = ""
return u.String(), nil
}
func bridgeSiblingPath(pathValue, endpoint string) string {
pathValue = strings.TrimSpace(pathValue)
if endpoint == "" {
endpoint = "status"
}
if pathValue == "" || pathValue == "/" {
return "/" + endpoint
}
trimmed := strings.TrimSuffix(pathValue, "/")
if strings.HasSuffix(trimmed, "/ws") {
return strings.TrimSuffix(trimmed, "/ws") + "/" + endpoint
}
return trimmed + "/" + endpoint
}
func normalizeBridgeBasePath(basePath string) string {
basePath = strings.TrimSpace(basePath)
if basePath == "" || basePath == "/" {
return "/"
}
if !strings.HasPrefix(basePath, "/") {
basePath = "/" + basePath
}
return strings.TrimSuffix(basePath, "/")
}
func joinBridgeRoute(basePath, endpoint string) string {
basePath = normalizeBridgeBasePath(basePath)
if basePath == "/" {
return "/" + strings.TrimPrefix(endpoint, "/")
}
return basePath + "/" + strings.TrimPrefix(endpoint, "/")
}
func isLoopbackRequest(r *http.Request) bool {
if r == nil {
return false
}
host, _, err := net.SplitHostPort(strings.TrimSpace(r.RemoteAddr))
if err != nil {
host = strings.TrimSpace(r.RemoteAddr)
}
ip := net.ParseIP(host)
return ip != nil && ip.IsLoopback()
}

View File

@@ -53,6 +53,16 @@ func TestBridgeStatusURL(t *testing.T) {
}
}
func TestBridgeStatusURLWithNestedPath(t *testing.T) {
got, err := BridgeStatusURL("ws://localhost:7788/whatsapp/ws")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got != "http://localhost:7788/whatsapp/status" {
t.Fatalf("got %q", got)
}
}
func TestNormalizeWhatsAppRecipientJID(t *testing.T) {
tests := []struct {
input string