mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-13 04:27:28 +08:00
1547 lines
42 KiB
Go
1547 lines
42 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/fs"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/YspCoder/clawgo/pkg/agent"
|
|
"github.com/YspCoder/clawgo/pkg/bus"
|
|
"github.com/YspCoder/clawgo/pkg/config"
|
|
"github.com/YspCoder/clawgo/pkg/cron"
|
|
"github.com/YspCoder/clawgo/pkg/nodes"
|
|
"github.com/YspCoder/clawgo/pkg/providers"
|
|
"github.com/YspCoder/clawgo/pkg/runtimecfg"
|
|
"github.com/YspCoder/clawgo/pkg/tools"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/pion/webrtc/v4"
|
|
)
|
|
|
|
type nodeRegisterOptions struct {
|
|
GatewayBase string
|
|
Token string
|
|
NodeToken string
|
|
ID string
|
|
Name string
|
|
Endpoint string
|
|
OS string
|
|
Arch string
|
|
Version string
|
|
Actions []string
|
|
Models []string
|
|
Tags []string
|
|
Agents []nodes.AgentInfo
|
|
Capabilities nodes.Capabilities
|
|
Watch bool
|
|
HeartbeatSec int
|
|
}
|
|
|
|
type nodeHeartbeatOptions struct {
|
|
GatewayBase string
|
|
Token string
|
|
ID string
|
|
}
|
|
|
|
type nodeWebRTCSession struct {
|
|
info nodes.NodeInfo
|
|
opts nodeRegisterOptions
|
|
client *http.Client
|
|
writeJSON func(interface{}) error
|
|
|
|
mu sync.Mutex
|
|
pc *webrtc.PeerConnection
|
|
dc *webrtc.DataChannel
|
|
}
|
|
|
|
type nodeLocalExecutor struct {
|
|
configPath string
|
|
workspace string
|
|
once sync.Once
|
|
loop *agent.AgentLoop
|
|
err error
|
|
}
|
|
|
|
var (
|
|
nodeLocalExecutorMu sync.Mutex
|
|
nodeLocalExecutors = map[string]*nodeLocalExecutor{}
|
|
nodeProviderFactory = providers.CreateProvider
|
|
nodeAgentLoopFactory = agent.NewAgentLoop
|
|
nodeLocalExecutorFactory = newNodeLocalExecutor
|
|
nodeCameraSnapFunc = captureNodeCameraSnapshot
|
|
nodeCameraClipFunc = captureNodeCameraClip
|
|
nodeScreenSnapFunc = captureNodeScreenSnapshot
|
|
nodeScreenRecordFunc = captureNodeScreenRecord
|
|
)
|
|
|
|
const nodeArtifactInlineLimit = 512 * 1024
|
|
|
|
func nodeCmd() {
|
|
args := os.Args[2:]
|
|
if len(args) == 0 {
|
|
printNodeHelp()
|
|
return
|
|
}
|
|
switch strings.ToLower(strings.TrimSpace(args[0])) {
|
|
case "register":
|
|
nodeRegisterCmd(args[1:])
|
|
case "heartbeat":
|
|
nodeHeartbeatCmd(args[1:])
|
|
case "help", "--help", "-h":
|
|
printNodeHelp()
|
|
default:
|
|
fmt.Printf("Unknown node command: %s\n", args[0])
|
|
printNodeHelp()
|
|
}
|
|
}
|
|
|
|
func printNodeHelp() {
|
|
fmt.Println("Node commands:")
|
|
fmt.Println(" clawgo node register [options]")
|
|
fmt.Println(" clawgo node heartbeat [options]")
|
|
fmt.Println()
|
|
fmt.Println("Register options:")
|
|
fmt.Println(" --gateway <url> Gateway base URL, e.g. http://host:18790")
|
|
fmt.Println(" --token <value> Gateway token (optional when gateway.token is empty)")
|
|
fmt.Println(" --node-token <value> Bearer token for this node endpoint (optional)")
|
|
fmt.Println(" --id <value> Node ID (default: hostname)")
|
|
fmt.Println(" --name <value> Node name (default: hostname)")
|
|
fmt.Println(" --endpoint <url> Public endpoint of this node")
|
|
fmt.Println(" --os <value> Reported OS (default: current runtime)")
|
|
fmt.Println(" --arch <value> Reported arch (default: current runtime)")
|
|
fmt.Println(" --version <value> Reported node version (default: current clawgo version)")
|
|
fmt.Println(" --actions <csv> Supported actions, e.g. run,agent_task")
|
|
fmt.Println(" --models <csv> Supported models, e.g. gpt-4o-mini")
|
|
fmt.Println(" --tags <csv> Node tags for dispatch policy, e.g. gpu,vision,build")
|
|
fmt.Println(" --capabilities <csv> Capability flags: run,invoke,model,camera,screen,location,canvas")
|
|
fmt.Println(" --watch Keep a websocket connection open and send heartbeats")
|
|
fmt.Println(" --heartbeat-sec <n> Heartbeat interval in seconds when --watch is set (default: 30)")
|
|
fmt.Println()
|
|
fmt.Println("Heartbeat options:")
|
|
fmt.Println(" --gateway <url> Gateway base URL")
|
|
fmt.Println(" --token <value> Gateway token")
|
|
fmt.Println(" --id <value> Node ID")
|
|
}
|
|
|
|
func nodeRegisterCmd(args []string) {
|
|
cfg, _ := loadConfig()
|
|
opts, err := parseNodeRegisterArgs(args, cfg)
|
|
if err != nil {
|
|
fmt.Printf("Error: %v\n", err)
|
|
printNodeHelp()
|
|
os.Exit(1)
|
|
}
|
|
client := &http.Client{Timeout: 20 * time.Second}
|
|
info := buildNodeInfo(opts)
|
|
ctx := context.Background()
|
|
if err := postNodeRegister(ctx, client, opts.GatewayBase, opts.Token, info); err != nil {
|
|
fmt.Printf("Error registering node: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
fmt.Printf("Node registered: %s -> %s\n", info.ID, opts.GatewayBase)
|
|
if !opts.Watch {
|
|
return
|
|
}
|
|
fmt.Printf("Heartbeat loop started: every %ds\n", opts.HeartbeatSec)
|
|
if err := runNodeHeartbeatLoop(client, opts, info); err != nil {
|
|
fmt.Printf("Heartbeat loop stopped: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func nodeHeartbeatCmd(args []string) {
|
|
cfg, _ := loadConfig()
|
|
opts, err := parseNodeHeartbeatArgs(args, cfg)
|
|
if err != nil {
|
|
fmt.Printf("Error: %v\n", err)
|
|
printNodeHelp()
|
|
os.Exit(1)
|
|
}
|
|
client := &http.Client{Timeout: 20 * time.Second}
|
|
if err := postNodeHeartbeat(context.Background(), client, opts.GatewayBase, opts.Token, opts.ID); err != nil {
|
|
fmt.Printf("Error sending heartbeat: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
fmt.Printf("Heartbeat sent: %s -> %s\n", opts.ID, opts.GatewayBase)
|
|
}
|
|
|
|
func parseNodeRegisterArgs(args []string, cfg *config.Config) (nodeRegisterOptions, error) {
|
|
host, _ := os.Hostname()
|
|
host = strings.TrimSpace(host)
|
|
if host == "" {
|
|
host = "node"
|
|
}
|
|
opts := nodeRegisterOptions{
|
|
GatewayBase: defaultGatewayBase(cfg),
|
|
Token: defaultGatewayToken(cfg),
|
|
ID: host,
|
|
Name: host,
|
|
OS: runtime.GOOS,
|
|
Arch: runtime.GOARCH,
|
|
Version: version,
|
|
HeartbeatSec: 30,
|
|
Capabilities: capabilitiesFromCSV("run,invoke,model"),
|
|
}
|
|
opts.Agents = nodeAgentsFromConfig(cfg)
|
|
for i := 0; i < len(args); i++ {
|
|
arg := strings.TrimSpace(args[i])
|
|
next := func() (string, error) {
|
|
if i+1 >= len(args) {
|
|
return "", fmt.Errorf("missing value for %s", arg)
|
|
}
|
|
i++
|
|
return strings.TrimSpace(args[i]), nil
|
|
}
|
|
switch arg {
|
|
case "--gateway":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.GatewayBase = v
|
|
case "--token":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.Token = v
|
|
case "--node-token":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.NodeToken = v
|
|
case "--id":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.ID = v
|
|
case "--name":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.Name = v
|
|
case "--endpoint":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.Endpoint = v
|
|
case "--os":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.OS = v
|
|
case "--arch":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.Arch = v
|
|
case "--version":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.Version = v
|
|
case "--actions":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.Actions = splitCSV(v)
|
|
case "--models":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.Models = splitCSV(v)
|
|
case "--tags":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.Tags = splitCSV(v)
|
|
case "--capabilities":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.Capabilities = capabilitiesFromCSV(v)
|
|
case "--watch":
|
|
opts.Watch = true
|
|
case "--heartbeat-sec":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
n, convErr := strconv.Atoi(v)
|
|
if convErr != nil || n <= 0 {
|
|
return opts, fmt.Errorf("invalid --heartbeat-sec: %s", v)
|
|
}
|
|
opts.HeartbeatSec = n
|
|
default:
|
|
return opts, fmt.Errorf("unknown option: %s", arg)
|
|
}
|
|
}
|
|
if strings.TrimSpace(opts.GatewayBase) == "" {
|
|
return opts, fmt.Errorf("--gateway is required")
|
|
}
|
|
if strings.TrimSpace(opts.ID) == "" {
|
|
return opts, fmt.Errorf("--id is required")
|
|
}
|
|
opts.GatewayBase = normalizeGatewayBase(opts.GatewayBase)
|
|
return opts, nil
|
|
}
|
|
|
|
func parseNodeHeartbeatArgs(args []string, cfg *config.Config) (nodeHeartbeatOptions, error) {
|
|
host, _ := os.Hostname()
|
|
host = strings.TrimSpace(host)
|
|
if host == "" {
|
|
host = "node"
|
|
}
|
|
opts := nodeHeartbeatOptions{
|
|
GatewayBase: defaultGatewayBase(cfg),
|
|
Token: defaultGatewayToken(cfg),
|
|
ID: host,
|
|
}
|
|
for i := 0; i < len(args); i++ {
|
|
arg := strings.TrimSpace(args[i])
|
|
next := func() (string, error) {
|
|
if i+1 >= len(args) {
|
|
return "", fmt.Errorf("missing value for %s", arg)
|
|
}
|
|
i++
|
|
return strings.TrimSpace(args[i]), nil
|
|
}
|
|
switch arg {
|
|
case "--gateway":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.GatewayBase = v
|
|
case "--token":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.Token = v
|
|
case "--id":
|
|
v, err := next()
|
|
if err != nil {
|
|
return opts, err
|
|
}
|
|
opts.ID = v
|
|
default:
|
|
return opts, fmt.Errorf("unknown option: %s", arg)
|
|
}
|
|
}
|
|
if strings.TrimSpace(opts.GatewayBase) == "" {
|
|
return opts, fmt.Errorf("--gateway is required")
|
|
}
|
|
if strings.TrimSpace(opts.ID) == "" {
|
|
return opts, fmt.Errorf("--id is required")
|
|
}
|
|
opts.GatewayBase = normalizeGatewayBase(opts.GatewayBase)
|
|
return opts, nil
|
|
}
|
|
|
|
func buildNodeInfo(opts nodeRegisterOptions) nodes.NodeInfo {
|
|
return nodes.NodeInfo{
|
|
ID: strings.TrimSpace(opts.ID),
|
|
Name: strings.TrimSpace(opts.Name),
|
|
Tags: append([]string(nil), opts.Tags...),
|
|
OS: strings.TrimSpace(opts.OS),
|
|
Arch: strings.TrimSpace(opts.Arch),
|
|
Version: strings.TrimSpace(opts.Version),
|
|
Endpoint: strings.TrimSpace(opts.Endpoint),
|
|
Token: strings.TrimSpace(opts.NodeToken),
|
|
Capabilities: opts.Capabilities,
|
|
Actions: append([]string(nil), opts.Actions...),
|
|
Models: append([]string(nil), opts.Models...),
|
|
Agents: append([]nodes.AgentInfo(nil), opts.Agents...),
|
|
}
|
|
}
|
|
|
|
func nodeAgentsFromConfig(cfg *config.Config) []nodes.AgentInfo {
|
|
if cfg == nil {
|
|
return nil
|
|
}
|
|
items := make([]nodes.AgentInfo, 0, len(cfg.Agents.Subagents))
|
|
for agentID, subcfg := range cfg.Agents.Subagents {
|
|
id := strings.TrimSpace(agentID)
|
|
if id == "" || !subcfg.Enabled {
|
|
continue
|
|
}
|
|
items = append(items, nodes.AgentInfo{
|
|
ID: id,
|
|
DisplayName: strings.TrimSpace(subcfg.DisplayName),
|
|
Role: strings.TrimSpace(subcfg.Role),
|
|
Type: strings.TrimSpace(subcfg.Type),
|
|
Transport: strings.TrimSpace(subcfg.Transport),
|
|
ParentAgentID: strings.TrimSpace(subcfg.ParentAgentID),
|
|
})
|
|
}
|
|
sort.Slice(items, func(i, j int) bool { return items[i].ID < items[j].ID })
|
|
return items
|
|
}
|
|
|
|
func runNodeHeartbeatLoop(client *http.Client, opts nodeRegisterOptions, info nodes.NodeInfo) error {
|
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
|
defer stop()
|
|
for {
|
|
if err := runNodeHeartbeatSocket(ctx, opts, info); err != nil {
|
|
if ctx.Err() != nil {
|
|
fmt.Println("Node heartbeat stopped")
|
|
return nil
|
|
}
|
|
fmt.Printf("Warning: node socket closed for %s: %v\n", info.ID, err)
|
|
}
|
|
if ctx.Err() != nil {
|
|
fmt.Println("Node heartbeat stopped")
|
|
return nil
|
|
}
|
|
if regErr := postNodeRegister(ctx, client, opts.GatewayBase, opts.Token, info); regErr != nil {
|
|
fmt.Printf("Warning: re-register failed for %s: %v\n", info.ID, regErr)
|
|
} else {
|
|
fmt.Printf("Node re-registered: %s\n", info.ID)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
fmt.Println("Node heartbeat stopped")
|
|
return nil
|
|
case <-time.After(2 * time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
func runNodeHeartbeatSocket(ctx context.Context, opts nodeRegisterOptions, info nodes.NodeInfo) error {
|
|
wsURL := nodeWebsocketURL(opts.GatewayBase)
|
|
headers := http.Header{}
|
|
if strings.TrimSpace(opts.Token) != "" {
|
|
headers.Set("Authorization", "Bearer "+strings.TrimSpace(opts.Token))
|
|
}
|
|
conn, _, err := websocket.DefaultDialer.DialContext(ctx, wsURL, headers)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
var writeMu sync.Mutex
|
|
writeJSON := func(v interface{}) error {
|
|
writeMu.Lock()
|
|
defer writeMu.Unlock()
|
|
_ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
return conn.WriteJSON(v)
|
|
}
|
|
writePing := func() error {
|
|
writeMu.Lock()
|
|
defer writeMu.Unlock()
|
|
return conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(10*time.Second))
|
|
}
|
|
|
|
if err := writeJSON(nodes.WireMessage{Type: "register", Node: &info}); err != nil {
|
|
return err
|
|
}
|
|
acks := make(chan nodes.WireAck, 8)
|
|
errs := make(chan error, 1)
|
|
client := &http.Client{Timeout: 20 * time.Second}
|
|
go readNodeSocketLoop(ctx, conn, writeJSON, client, info, opts, acks, errs)
|
|
if err := waitNodeAck(ctx, acks, errs, "registered", info.ID); err != nil {
|
|
return err
|
|
}
|
|
fmt.Printf("Node socket connected: %s\n", info.ID)
|
|
|
|
ticker := time.NewTicker(time.Duration(opts.HeartbeatSec) * time.Second)
|
|
pingTicker := time.NewTicker(nodeSocketPingInterval(opts.HeartbeatSec))
|
|
defer ticker.Stop()
|
|
defer pingTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case err := <-errs:
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
case <-pingTicker.C:
|
|
if err := writePing(); err != nil {
|
|
return err
|
|
}
|
|
case <-ticker.C:
|
|
if err := writeJSON(nodes.WireMessage{Type: "heartbeat", ID: info.ID}); err != nil {
|
|
return err
|
|
}
|
|
if err := waitNodeAck(ctx, acks, errs, "heartbeat", info.ID); err != nil {
|
|
return err
|
|
}
|
|
fmt.Printf("Heartbeat ok: %s\n", info.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func nodeSocketPingInterval(heartbeatSec int) time.Duration {
|
|
if heartbeatSec <= 0 {
|
|
return 25 * time.Second
|
|
}
|
|
interval := time.Duration(heartbeatSec) * time.Second / 2
|
|
if interval < 10*time.Second {
|
|
interval = 10 * time.Second
|
|
}
|
|
if interval > 25*time.Second {
|
|
interval = 25 * time.Second
|
|
}
|
|
return interval
|
|
}
|
|
|
|
func waitNodeAck(ctx context.Context, acks <-chan nodes.WireAck, errs <-chan error, expectedType, id string) error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case err := <-errs:
|
|
if err == nil {
|
|
return context.Canceled
|
|
}
|
|
return err
|
|
case ack := <-acks:
|
|
if !ack.OK {
|
|
if strings.TrimSpace(ack.Error) == "" {
|
|
ack.Error = "unknown websocket error"
|
|
}
|
|
return fmt.Errorf("%s", ack.Error)
|
|
}
|
|
ackType := strings.ToLower(strings.TrimSpace(ack.Type))
|
|
if expectedType != "" && ackType != strings.ToLower(strings.TrimSpace(expectedType)) {
|
|
continue
|
|
}
|
|
if strings.TrimSpace(id) != "" && strings.TrimSpace(ack.ID) != "" && strings.TrimSpace(ack.ID) != strings.TrimSpace(id) {
|
|
continue
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func readNodeSocketLoop(ctx context.Context, conn *websocket.Conn, writeJSON func(interface{}) error, client *http.Client, info nodes.NodeInfo, opts nodeRegisterOptions, acks chan<- nodes.WireAck, errs chan<- error) {
|
|
defer close(acks)
|
|
defer close(errs)
|
|
rtc := &nodeWebRTCSession{info: info, opts: opts, client: client, writeJSON: writeJSON}
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
errs <- nil
|
|
return
|
|
default:
|
|
}
|
|
_ = conn.SetReadDeadline(time.Now().Add(90 * time.Second))
|
|
_, data, err := conn.ReadMessage()
|
|
if err != nil {
|
|
errs <- err
|
|
return
|
|
}
|
|
var raw map[string]interface{}
|
|
if err := json.Unmarshal(data, &raw); err != nil {
|
|
continue
|
|
}
|
|
if _, hasOK := raw["ok"]; hasOK {
|
|
var ack nodes.WireAck
|
|
if err := json.Unmarshal(data, &ack); err == nil {
|
|
acks <- ack
|
|
}
|
|
continue
|
|
}
|
|
var msg nodes.WireMessage
|
|
if err := json.Unmarshal(data, &msg); err != nil {
|
|
continue
|
|
}
|
|
switch strings.ToLower(strings.TrimSpace(msg.Type)) {
|
|
case "node_request":
|
|
go handleNodeWireRequest(ctx, writeJSON, client, info, opts, msg)
|
|
case "signal_offer", "signal_answer", "signal_candidate":
|
|
if err := rtc.handleSignal(ctx, msg); err != nil {
|
|
fmt.Printf("Warning: node webrtc signal failed: %v\n", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *nodeWebRTCSession) handleSignal(ctx context.Context, msg nodes.WireMessage) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
switch strings.ToLower(strings.TrimSpace(msg.Type)) {
|
|
case "signal_offer":
|
|
pc, err := s.ensurePeerConnectionLocked()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var desc webrtc.SessionDescription
|
|
if err := mapWirePayload(msg.Payload, &desc); err != nil {
|
|
return err
|
|
}
|
|
if err := pc.SetRemoteDescription(desc); err != nil {
|
|
return err
|
|
}
|
|
answer, err := pc.CreateAnswer(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := pc.SetLocalDescription(answer); err != nil {
|
|
return err
|
|
}
|
|
return s.writeJSON(nodes.WireMessage{
|
|
Type: "signal_answer",
|
|
From: s.info.ID,
|
|
To: "gateway",
|
|
Session: strings.TrimSpace(msg.Session),
|
|
Payload: structToWirePayload(*pc.LocalDescription()),
|
|
})
|
|
case "signal_candidate":
|
|
if s.pc == nil {
|
|
return nil
|
|
}
|
|
var candidate webrtc.ICECandidateInit
|
|
if err := mapWirePayload(msg.Payload, &candidate); err != nil {
|
|
return err
|
|
}
|
|
return s.pc.AddICECandidate(candidate)
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (s *nodeWebRTCSession) ensurePeerConnectionLocked() (*webrtc.PeerConnection, error) {
|
|
if s.pc != nil {
|
|
return s.pc, nil
|
|
}
|
|
config := webrtc.Configuration{}
|
|
pc, err := webrtc.NewPeerConnection(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
|
if candidate == nil {
|
|
return
|
|
}
|
|
_ = s.writeJSON(nodes.WireMessage{
|
|
Type: "signal_candidate",
|
|
From: s.info.ID,
|
|
To: "gateway",
|
|
Session: s.info.ID,
|
|
Payload: structToWirePayload(candidate.ToJSON()),
|
|
})
|
|
})
|
|
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
|
switch state {
|
|
case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed, webrtc.PeerConnectionStateDisconnected:
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.pc != nil {
|
|
_ = s.pc.Close()
|
|
}
|
|
s.pc = nil
|
|
s.dc = nil
|
|
}
|
|
})
|
|
pc.OnDataChannel(func(dc *webrtc.DataChannel) {
|
|
s.mu.Lock()
|
|
s.dc = dc
|
|
s.mu.Unlock()
|
|
dc.OnMessage(func(message webrtc.DataChannelMessage) {
|
|
var msg nodes.WireMessage
|
|
if err := json.Unmarshal(message.Data, &msg); err != nil {
|
|
return
|
|
}
|
|
if strings.ToLower(strings.TrimSpace(msg.Type)) != "node_request" || msg.Request == nil {
|
|
return
|
|
}
|
|
go s.handleDataChannelRequest(context.Background(), dc, msg)
|
|
})
|
|
})
|
|
s.pc = pc
|
|
return pc, nil
|
|
}
|
|
|
|
func (s *nodeWebRTCSession) handleDataChannelRequest(ctx context.Context, dc *webrtc.DataChannel, msg nodes.WireMessage) {
|
|
resp := executeNodeRequest(ctx, s.client, s.info, s.opts, msg.Request)
|
|
b, err := json.Marshal(nodes.WireMessage{
|
|
Type: "node_response",
|
|
ID: msg.ID,
|
|
From: s.info.ID,
|
|
To: "gateway",
|
|
Session: strings.TrimSpace(msg.Session),
|
|
Response: &resp,
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
_ = dc.Send(b)
|
|
}
|
|
|
|
func handleNodeWireRequest(ctx context.Context, writeJSON func(interface{}) error, client *http.Client, info nodes.NodeInfo, opts nodeRegisterOptions, msg nodes.WireMessage) {
|
|
resp := executeNodeRequest(ctx, client, info, opts, msg.Request)
|
|
_ = writeJSON(nodes.WireMessage{
|
|
Type: "node_response",
|
|
ID: msg.ID,
|
|
From: info.ID,
|
|
To: strings.TrimSpace(msg.From),
|
|
Session: strings.TrimSpace(msg.Session),
|
|
Response: &resp,
|
|
})
|
|
}
|
|
|
|
func executeNodeRequest(ctx context.Context, client *http.Client, info nodes.NodeInfo, opts nodeRegisterOptions, req *nodes.Request) nodes.Response {
|
|
resp := nodes.Response{
|
|
OK: false,
|
|
Code: "invalid_request",
|
|
Node: info.ID,
|
|
Action: "",
|
|
Error: "request missing",
|
|
}
|
|
if req == nil {
|
|
return resp
|
|
}
|
|
next := *req
|
|
resp.Action = next.Action
|
|
switch strings.ToLower(strings.TrimSpace(next.Action)) {
|
|
case "agent_task":
|
|
execResp, err := executeNodeAgentTask(ctx, info, next)
|
|
if err == nil {
|
|
return execResp
|
|
}
|
|
if strings.TrimSpace(opts.Endpoint) == "" {
|
|
resp.Error = err.Error()
|
|
resp.Code = "local_runtime_error"
|
|
return resp
|
|
}
|
|
case "camera_snap":
|
|
execResp, err := executeNodeCameraSnap(ctx, info, next)
|
|
if err == nil {
|
|
return execResp
|
|
}
|
|
if strings.TrimSpace(opts.Endpoint) == "" {
|
|
resp.Error = err.Error()
|
|
resp.Code = "local_runtime_error"
|
|
return resp
|
|
}
|
|
case "screen_snapshot":
|
|
execResp, err := executeNodeScreenSnapshot(ctx, info, next)
|
|
if err == nil {
|
|
return execResp
|
|
}
|
|
if strings.TrimSpace(opts.Endpoint) == "" {
|
|
resp.Error = err.Error()
|
|
resp.Code = "local_runtime_error"
|
|
return resp
|
|
}
|
|
case "camera_clip":
|
|
execResp, err := executeNodeCameraClip(ctx, info, next)
|
|
if err == nil {
|
|
return execResp
|
|
}
|
|
if strings.TrimSpace(opts.Endpoint) == "" {
|
|
resp.Error = err.Error()
|
|
resp.Code = "local_runtime_error"
|
|
return resp
|
|
}
|
|
case "screen_record":
|
|
execResp, err := executeNodeScreenRecord(ctx, info, next)
|
|
if err == nil {
|
|
return execResp
|
|
}
|
|
if strings.TrimSpace(opts.Endpoint) == "" {
|
|
resp.Error = err.Error()
|
|
resp.Code = "local_runtime_error"
|
|
return resp
|
|
}
|
|
}
|
|
if strings.TrimSpace(opts.Endpoint) == "" {
|
|
resp.Error = "node endpoint not configured"
|
|
resp.Code = "endpoint_missing"
|
|
return resp
|
|
}
|
|
if next.Node == "" {
|
|
next.Node = info.ID
|
|
}
|
|
execResp, err := nodes.DoEndpointRequest(ctx, client, opts.Endpoint, opts.NodeToken, next)
|
|
if err != nil {
|
|
return nodes.Response{
|
|
OK: false,
|
|
Code: "transport_error",
|
|
Node: info.ID,
|
|
Action: next.Action,
|
|
Error: err.Error(),
|
|
}
|
|
}
|
|
if strings.TrimSpace(execResp.Node) == "" {
|
|
execResp.Node = info.ID
|
|
}
|
|
return execResp
|
|
}
|
|
|
|
func executeNodeAgentTask(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) {
|
|
executor, err := getNodeLocalExecutor()
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
loop, err := executor.Loop()
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
|
|
remoteAgentID := strings.TrimSpace(stringArg(req.Args, "remote_agent_id"))
|
|
if remoteAgentID == "" || strings.EqualFold(remoteAgentID, "main") {
|
|
sessionKey := fmt.Sprintf("node:%s:main", info.ID)
|
|
result, err := loop.ProcessDirectWithOptions(ctx, strings.TrimSpace(req.Task), sessionKey, "node", info.ID, "main", nil)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
artifacts, err := collectNodeArtifacts(executor.workspace, req.Args)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
return nodes.Response{
|
|
OK: true,
|
|
Code: "ok",
|
|
Node: info.ID,
|
|
Action: req.Action,
|
|
Payload: map[string]interface{}{
|
|
"transport": "clawgo-local",
|
|
"agent_id": "main",
|
|
"result": strings.TrimSpace(result),
|
|
"artifacts": artifacts,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
out, err := loop.HandleSubagentRuntime(ctx, "dispatch_and_wait", map[string]interface{}{
|
|
"task": strings.TrimSpace(req.Task),
|
|
"agent_id": remoteAgentID,
|
|
"channel": "node",
|
|
"chat_id": info.ID,
|
|
"wait_timeout_sec": float64(120),
|
|
})
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
payload, _ := out.(map[string]interface{})
|
|
result := strings.TrimSpace(fmt.Sprint(payload["merged"]))
|
|
if result == "" {
|
|
if reply, ok := payload["reply"].(*tools.RouterReply); ok {
|
|
result = strings.TrimSpace(reply.Result)
|
|
}
|
|
}
|
|
artifacts, err := collectNodeArtifacts(executor.workspace, req.Args)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
return nodes.Response{
|
|
OK: true,
|
|
Code: "ok",
|
|
Node: info.ID,
|
|
Action: req.Action,
|
|
Payload: map[string]interface{}{
|
|
"transport": "clawgo-local",
|
|
"agent_id": remoteAgentID,
|
|
"result": result,
|
|
"artifacts": artifacts,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func executeNodeCameraSnap(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) {
|
|
executor, err := getNodeLocalExecutor()
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
outputPath, err := nodeCameraSnapFunc(ctx, executor.workspace, req.Args)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
artifact, err := buildNodeArtifact(executor.workspace, outputPath)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
return nodes.Response{
|
|
OK: true,
|
|
Code: "ok",
|
|
Node: info.ID,
|
|
Action: req.Action,
|
|
Payload: map[string]interface{}{
|
|
"transport": "clawgo-local",
|
|
"media_type": "image",
|
|
"storage": artifact["storage"],
|
|
"artifacts": []map[string]interface{}{artifact},
|
|
"meta": map[string]interface{}{
|
|
"facing": stringArg(req.Args, "facing"),
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func executeNodeScreenSnapshot(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) {
|
|
executor, err := getNodeLocalExecutor()
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
outputPath, err := nodeScreenSnapFunc(ctx, executor.workspace, req.Args)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
artifact, err := buildNodeArtifact(executor.workspace, outputPath)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
return nodes.Response{
|
|
OK: true,
|
|
Code: "ok",
|
|
Node: info.ID,
|
|
Action: req.Action,
|
|
Payload: map[string]interface{}{
|
|
"transport": "clawgo-local",
|
|
"media_type": "image",
|
|
"storage": artifact["storage"],
|
|
"artifacts": []map[string]interface{}{artifact},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func executeNodeCameraClip(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) {
|
|
executor, err := getNodeLocalExecutor()
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
durationMs := durationArg(req.Args, "duration_ms", 3000)
|
|
outputPath, err := nodeCameraClipFunc(ctx, executor.workspace, req.Args)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
artifact, err := buildNodeArtifact(executor.workspace, outputPath)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
return nodes.Response{
|
|
OK: true,
|
|
Code: "ok",
|
|
Node: info.ID,
|
|
Action: req.Action,
|
|
Payload: map[string]interface{}{
|
|
"transport": "clawgo-local",
|
|
"media_type": "video",
|
|
"storage": artifact["storage"],
|
|
"duration_ms": durationMs,
|
|
"artifacts": []map[string]interface{}{artifact},
|
|
"meta": map[string]interface{}{
|
|
"facing": stringArg(req.Args, "facing"),
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func executeNodeScreenRecord(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) {
|
|
executor, err := getNodeLocalExecutor()
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
durationMs := durationArg(req.Args, "duration_ms", 3000)
|
|
outputPath, err := nodeScreenRecordFunc(ctx, executor.workspace, req.Args)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
artifact, err := buildNodeArtifact(executor.workspace, outputPath)
|
|
if err != nil {
|
|
return nodes.Response{}, err
|
|
}
|
|
return nodes.Response{
|
|
OK: true,
|
|
Code: "ok",
|
|
Node: info.ID,
|
|
Action: req.Action,
|
|
Payload: map[string]interface{}{
|
|
"transport": "clawgo-local",
|
|
"media_type": "video",
|
|
"storage": artifact["storage"],
|
|
"duration_ms": durationMs,
|
|
"artifacts": []map[string]interface{}{artifact},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func getNodeLocalExecutor() (*nodeLocalExecutor, error) {
|
|
key := strings.TrimSpace(getConfigPath())
|
|
if key == "" {
|
|
return nil, fmt.Errorf("config path is required")
|
|
}
|
|
nodeLocalExecutorMu.Lock()
|
|
defer nodeLocalExecutorMu.Unlock()
|
|
if existing := nodeLocalExecutors[key]; existing != nil {
|
|
return existing, nil
|
|
}
|
|
exec, err := nodeLocalExecutorFactory(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
nodeLocalExecutors[key] = exec
|
|
return exec, nil
|
|
}
|
|
|
|
func newNodeLocalExecutor(configPath string) (*nodeLocalExecutor, error) {
|
|
configPath = strings.TrimSpace(configPath)
|
|
if configPath == "" {
|
|
return nil, fmt.Errorf("config path is required")
|
|
}
|
|
return &nodeLocalExecutor{configPath: configPath}, nil
|
|
}
|
|
|
|
func (e *nodeLocalExecutor) Loop() (*agent.AgentLoop, error) {
|
|
if e == nil {
|
|
return nil, fmt.Errorf("node local executor is nil")
|
|
}
|
|
e.once.Do(func() {
|
|
prev := globalConfigPathOverride
|
|
globalConfigPathOverride = e.configPath
|
|
defer func() { globalConfigPathOverride = prev }()
|
|
|
|
cfg, err := loadConfig()
|
|
if err != nil {
|
|
e.err = err
|
|
return
|
|
}
|
|
runtimecfg.Set(cfg)
|
|
msgBus := bus.NewMessageBus()
|
|
cronStorePath := filepath.Join(filepath.Dir(e.configPath), "cron", "jobs.json")
|
|
cronService := cron.NewCronService(cronStorePath, nil)
|
|
configureCronServiceRuntime(cronService, cfg)
|
|
provider, err := nodeProviderFactory(cfg)
|
|
if err != nil {
|
|
e.err = err
|
|
return
|
|
}
|
|
e.workspace = cfg.WorkspacePath()
|
|
loop := nodeAgentLoopFactory(cfg, msgBus, provider, cronService)
|
|
loop.SetConfigPath(e.configPath)
|
|
e.loop = loop
|
|
})
|
|
if e.err != nil {
|
|
return nil, e.err
|
|
}
|
|
if e.loop == nil {
|
|
return nil, fmt.Errorf("node local executor unavailable")
|
|
}
|
|
return e.loop, nil
|
|
}
|
|
|
|
func stringArg(args map[string]interface{}, key string) string {
|
|
if len(args) == 0 {
|
|
return ""
|
|
}
|
|
value, ok := args[key]
|
|
if !ok || value == nil {
|
|
return ""
|
|
}
|
|
return strings.TrimSpace(fmt.Sprint(value))
|
|
}
|
|
|
|
func collectNodeArtifacts(workspace string, args map[string]interface{}) ([]map[string]interface{}, error) {
|
|
paths := stringListArg(args, "artifact_paths")
|
|
if len(paths) == 0 {
|
|
return []map[string]interface{}{}, nil
|
|
}
|
|
root := strings.TrimSpace(workspace)
|
|
if root == "" {
|
|
return nil, fmt.Errorf("workspace path not configured")
|
|
}
|
|
out := make([]map[string]interface{}, 0, len(paths))
|
|
for _, raw := range paths {
|
|
artifact, err := buildNodeArtifact(root, raw)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, artifact)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func buildNodeArtifact(workspace, rawPath string) (map[string]interface{}, error) {
|
|
rawPath = strings.TrimSpace(rawPath)
|
|
if rawPath == "" {
|
|
return nil, fmt.Errorf("artifact path is required")
|
|
}
|
|
clean := filepath.Clean(rawPath)
|
|
fullPath := clean
|
|
if !filepath.IsAbs(clean) {
|
|
fullPath = filepath.Join(workspace, clean)
|
|
}
|
|
fullPath = filepath.Clean(fullPath)
|
|
rel, err := filepath.Rel(workspace, fullPath)
|
|
if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) {
|
|
return nil, fmt.Errorf("artifact path escapes workspace: %s", rawPath)
|
|
}
|
|
info, err := os.Stat(fullPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if info.IsDir() {
|
|
return nil, fmt.Errorf("artifact path must be file: %s", rawPath)
|
|
}
|
|
artifact := map[string]interface{}{
|
|
"name": filepath.Base(fullPath),
|
|
"kind": nodeArtifactKindFromPath(fullPath),
|
|
"source_path": filepath.ToSlash(rel),
|
|
"size_bytes": info.Size(),
|
|
}
|
|
if mimeType := mimeTypeForPath(fullPath); mimeType != "" {
|
|
artifact["mime_type"] = mimeType
|
|
}
|
|
data, err := os.ReadFile(fullPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if shouldInlineAsText(fullPath, data, info.Mode()) {
|
|
artifact["storage"] = "inline"
|
|
artifact["content_text"] = string(data)
|
|
return artifact, nil
|
|
}
|
|
artifact["storage"] = "inline"
|
|
if len(data) > nodeArtifactInlineLimit {
|
|
data = data[:nodeArtifactInlineLimit]
|
|
artifact["truncated"] = true
|
|
}
|
|
artifact["content_base64"] = base64.StdEncoding.EncodeToString(data)
|
|
return artifact, nil
|
|
}
|
|
|
|
func stringListArg(args map[string]interface{}, key string) []string {
|
|
if len(args) == 0 {
|
|
return nil
|
|
}
|
|
items, ok := args[key].([]interface{})
|
|
if !ok {
|
|
return nil
|
|
}
|
|
out := make([]string, 0, len(items))
|
|
for _, item := range items {
|
|
value := strings.TrimSpace(fmt.Sprint(item))
|
|
if value == "" {
|
|
continue
|
|
}
|
|
out = append(out, value)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func mimeTypeForPath(path string) string {
|
|
switch strings.ToLower(filepath.Ext(path)) {
|
|
case ".md":
|
|
return "text/markdown"
|
|
case ".txt", ".log", ".json", ".yaml", ".yml", ".xml", ".csv":
|
|
return "text/plain"
|
|
case ".png":
|
|
return "image/png"
|
|
case ".jpg", ".jpeg":
|
|
return "image/jpeg"
|
|
case ".gif":
|
|
return "image/gif"
|
|
case ".webp":
|
|
return "image/webp"
|
|
case ".mp4":
|
|
return "video/mp4"
|
|
case ".mov":
|
|
return "video/quicktime"
|
|
case ".pdf":
|
|
return "application/pdf"
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func nodeArtifactKindFromPath(path string) string {
|
|
ext := strings.ToLower(filepath.Ext(path))
|
|
switch ext {
|
|
case ".png", ".jpg", ".jpeg", ".gif", ".webp":
|
|
return "image"
|
|
case ".mp4", ".mov", ".webm":
|
|
return "video"
|
|
case ".pdf":
|
|
return "document"
|
|
default:
|
|
return "file"
|
|
}
|
|
}
|
|
|
|
func shouldInlineAsText(path string, data []byte, mode fs.FileMode) bool {
|
|
if mode&fs.ModeType != 0 {
|
|
return false
|
|
}
|
|
switch strings.ToLower(filepath.Ext(path)) {
|
|
case ".md", ".txt", ".log", ".json", ".yaml", ".yml", ".xml", ".csv", ".go", ".ts", ".tsx", ".js", ".jsx", ".css", ".html", ".sh":
|
|
return len(data) <= nodeArtifactInlineLimit
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func captureNodeCameraSnapshot(ctx context.Context, workspace string, args map[string]interface{}) (string, error) {
|
|
outputPath, err := nodeMediaOutputPath(workspace, "camera", ".jpg", stringArg(args, "filename"))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
switch runtime.GOOS {
|
|
case "linux":
|
|
if _, err := os.Stat("/dev/video0"); err != nil {
|
|
return "", fmt.Errorf("camera device /dev/video0 not found")
|
|
}
|
|
if _, err := exec.LookPath("ffmpeg"); err != nil {
|
|
return "", fmt.Errorf("ffmpeg not installed")
|
|
}
|
|
cmd := exec.CommandContext(ctx, "ffmpeg", "-y", "-f", "video4linux2", "-i", "/dev/video0", "-vframes", "1", "-q:v", "2", outputPath)
|
|
if out, err := cmd.CombinedOutput(); err != nil {
|
|
return "", fmt.Errorf("camera capture failed: %v, output=%s", err, strings.TrimSpace(string(out)))
|
|
}
|
|
return outputPath, nil
|
|
case "darwin":
|
|
if _, err := exec.LookPath("imagesnap"); err != nil {
|
|
return "", fmt.Errorf("imagesnap not installed")
|
|
}
|
|
cmd := exec.CommandContext(ctx, "imagesnap", "-q", outputPath)
|
|
if out, err := cmd.CombinedOutput(); err != nil {
|
|
return "", fmt.Errorf("camera capture failed: %v, output=%s", err, strings.TrimSpace(string(out)))
|
|
}
|
|
return outputPath, nil
|
|
default:
|
|
return "", fmt.Errorf("camera_snap not supported on %s", runtime.GOOS)
|
|
}
|
|
}
|
|
|
|
func captureNodeScreenSnapshot(ctx context.Context, workspace string, args map[string]interface{}) (string, error) {
|
|
outputPath, err := nodeMediaOutputPath(workspace, "screen", ".png", stringArg(args, "filename"))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
switch runtime.GOOS {
|
|
case "darwin":
|
|
cmd := exec.CommandContext(ctx, "screencapture", "-x", outputPath)
|
|
if out, err := cmd.CombinedOutput(); err != nil {
|
|
return "", fmt.Errorf("screen capture failed: %v, output=%s", err, strings.TrimSpace(string(out)))
|
|
}
|
|
return outputPath, nil
|
|
case "linux":
|
|
candidates := [][]string{
|
|
{"grim", outputPath},
|
|
{"gnome-screenshot", "-f", outputPath},
|
|
{"scrot", outputPath},
|
|
{"import", "-window", "root", outputPath},
|
|
}
|
|
for _, candidate := range candidates {
|
|
if _, err := exec.LookPath(candidate[0]); err != nil {
|
|
continue
|
|
}
|
|
cmd := exec.CommandContext(ctx, candidate[0], candidate[1:]...)
|
|
if out, err := cmd.CombinedOutput(); err == nil {
|
|
return outputPath, nil
|
|
} else if strings.TrimSpace(string(out)) != "" {
|
|
continue
|
|
}
|
|
}
|
|
return "", fmt.Errorf("no supported screen capture command found (grim, gnome-screenshot, scrot, import)")
|
|
default:
|
|
return "", fmt.Errorf("screen_snapshot not supported on %s", runtime.GOOS)
|
|
}
|
|
}
|
|
|
|
func captureNodeCameraClip(ctx context.Context, workspace string, args map[string]interface{}) (string, error) {
|
|
outputPath, err := nodeMediaOutputPath(workspace, "camera", ".mp4", stringArg(args, "filename"))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
durationSec := fmt.Sprintf("%.3f", float64(durationArg(args, "duration_ms", 3000))/1000.0)
|
|
switch runtime.GOOS {
|
|
case "linux":
|
|
if _, err := os.Stat("/dev/video0"); err != nil {
|
|
return "", fmt.Errorf("camera device /dev/video0 not found")
|
|
}
|
|
if _, err := exec.LookPath("ffmpeg"); err != nil {
|
|
return "", fmt.Errorf("ffmpeg not installed")
|
|
}
|
|
cmd := exec.CommandContext(ctx, "ffmpeg", "-y", "-f", "video4linux2", "-t", durationSec, "-i", "/dev/video0", "-pix_fmt", "yuv420p", outputPath)
|
|
if out, err := cmd.CombinedOutput(); err != nil {
|
|
return "", fmt.Errorf("camera clip failed: %v, output=%s", err, strings.TrimSpace(string(out)))
|
|
}
|
|
return outputPath, nil
|
|
case "darwin":
|
|
if _, err := exec.LookPath("ffmpeg"); err != nil {
|
|
return "", fmt.Errorf("ffmpeg not installed")
|
|
}
|
|
cmd := exec.CommandContext(ctx, "ffmpeg", "-y", "-f", "avfoundation", "-t", durationSec, "-i", "0:none", "-pix_fmt", "yuv420p", outputPath)
|
|
if out, err := cmd.CombinedOutput(); err != nil {
|
|
return "", fmt.Errorf("camera clip failed: %v, output=%s", err, strings.TrimSpace(string(out)))
|
|
}
|
|
return outputPath, nil
|
|
default:
|
|
return "", fmt.Errorf("camera_clip not supported on %s", runtime.GOOS)
|
|
}
|
|
}
|
|
|
|
func captureNodeScreenRecord(ctx context.Context, workspace string, args map[string]interface{}) (string, error) {
|
|
outputPath, err := nodeMediaOutputPath(workspace, "screen", ".mp4", stringArg(args, "filename"))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
durationMs := durationArg(args, "duration_ms", 3000)
|
|
durationSec := fmt.Sprintf("%.3f", float64(durationMs)/1000.0)
|
|
durationWholeSec := strconv.Itoa((durationMs + 999) / 1000)
|
|
switch runtime.GOOS {
|
|
case "darwin":
|
|
if _, err := exec.LookPath("ffmpeg"); err == nil {
|
|
cmd := exec.CommandContext(ctx, "ffmpeg", "-y", "-f", "avfoundation", "-t", durationSec, "-i", "1:none", "-pix_fmt", "yuv420p", outputPath)
|
|
if out, err := cmd.CombinedOutput(); err == nil {
|
|
return outputPath, nil
|
|
} else if strings.TrimSpace(string(out)) != "" {
|
|
return "", fmt.Errorf("screen record failed: %v, output=%s", err, strings.TrimSpace(string(out)))
|
|
}
|
|
}
|
|
return "", fmt.Errorf("ffmpeg not installed")
|
|
case "linux":
|
|
candidates := [][]string{
|
|
{"ffmpeg", "-y", "-f", "x11grab", "-t", durationSec, "-i", os.Getenv("DISPLAY"), "-pix_fmt", "yuv420p", outputPath},
|
|
{"wf-recorder", "-f", outputPath, "-d", durationWholeSec},
|
|
}
|
|
for _, candidate := range candidates {
|
|
if candidate[0] == "ffmpeg" && strings.TrimSpace(os.Getenv("DISPLAY")) == "" {
|
|
continue
|
|
}
|
|
if _, err := exec.LookPath(candidate[0]); err != nil {
|
|
continue
|
|
}
|
|
cmd := exec.CommandContext(ctx, candidate[0], candidate[1:]...)
|
|
if out, err := cmd.CombinedOutput(); err == nil {
|
|
return outputPath, nil
|
|
} else if strings.TrimSpace(string(out)) != "" && candidate[0] == "ffmpeg" {
|
|
continue
|
|
}
|
|
}
|
|
return "", fmt.Errorf("no supported screen recorder found (ffmpeg x11grab or wf-recorder)")
|
|
default:
|
|
return "", fmt.Errorf("screen_record not supported on %s", runtime.GOOS)
|
|
}
|
|
}
|
|
|
|
func nodeMediaOutputPath(workspace, kind, ext, requested string) (string, error) {
|
|
root := strings.TrimSpace(workspace)
|
|
if root == "" {
|
|
return "", fmt.Errorf("workspace path not configured")
|
|
}
|
|
baseDir := filepath.Join(root, "artifacts", "node")
|
|
if err := os.MkdirAll(baseDir, 0755); err != nil {
|
|
return "", err
|
|
}
|
|
filename := strings.TrimSpace(requested)
|
|
if filename == "" {
|
|
filename = fmt.Sprintf("%s_%d%s", kind, time.Now().UnixNano(), ext)
|
|
}
|
|
filename = filepath.Clean(filename)
|
|
if filepath.IsAbs(filename) {
|
|
return "", fmt.Errorf("filename must be relative to workspace")
|
|
}
|
|
fullPath := filepath.Join(baseDir, filename)
|
|
fullPath = filepath.Clean(fullPath)
|
|
rel, err := filepath.Rel(root, fullPath)
|
|
if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) {
|
|
return "", fmt.Errorf("capture path escapes workspace")
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil {
|
|
return "", err
|
|
}
|
|
return fullPath, nil
|
|
}
|
|
|
|
func durationArg(args map[string]interface{}, key string, fallback int) int {
|
|
if len(args) == 0 {
|
|
return fallback
|
|
}
|
|
switch v := args[key].(type) {
|
|
case int:
|
|
if v > 0 {
|
|
return v
|
|
}
|
|
case int64:
|
|
if v > 0 {
|
|
return int(v)
|
|
}
|
|
case float64:
|
|
if v > 0 {
|
|
return int(v)
|
|
}
|
|
case json.Number:
|
|
if n, err := v.Int64(); err == nil && n > 0 {
|
|
return int(n)
|
|
}
|
|
}
|
|
return fallback
|
|
}
|
|
|
|
func structToWirePayload(v interface{}) map[string]interface{} {
|
|
b, _ := json.Marshal(v)
|
|
var out map[string]interface{}
|
|
_ = json.Unmarshal(b, &out)
|
|
if out == nil {
|
|
out = map[string]interface{}{}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func mapWirePayload(in map[string]interface{}, out interface{}) error {
|
|
if len(in) == 0 {
|
|
return fmt.Errorf("empty payload")
|
|
}
|
|
b, err := json.Marshal(in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return json.Unmarshal(b, out)
|
|
}
|
|
|
|
func postNodeRegister(ctx context.Context, client *http.Client, gatewayBase, token string, info nodes.NodeInfo) error {
|
|
return postNodeJSON(ctx, client, gatewayBase, token, "/nodes/register", info)
|
|
}
|
|
|
|
func postNodeHeartbeat(ctx context.Context, client *http.Client, gatewayBase, token, id string) error {
|
|
return postNodeJSON(ctx, client, gatewayBase, token, "/nodes/heartbeat", map[string]string{"id": strings.TrimSpace(id)})
|
|
}
|
|
|
|
func postNodeJSON(ctx context.Context, client *http.Client, gatewayBase, token, path string, payload interface{}) error {
|
|
body, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimRight(gatewayBase, "/")+path, bytes.NewReader(body))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
if strings.TrimSpace(token) != "" {
|
|
req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(token))
|
|
}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
var out bytes.Buffer
|
|
_, _ = out.ReadFrom(resp.Body)
|
|
msg := strings.TrimSpace(out.String())
|
|
if msg == "" {
|
|
msg = resp.Status
|
|
}
|
|
return fmt.Errorf("http %d: %s", resp.StatusCode, msg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func defaultGatewayBase(cfg *config.Config) string {
|
|
if raw := strings.TrimSpace(os.Getenv("CLAWGO_GATEWAY_URL")); raw != "" {
|
|
return normalizeGatewayBase(raw)
|
|
}
|
|
host := "127.0.0.1"
|
|
port := 18790
|
|
if cfg != nil {
|
|
if v := strings.TrimSpace(cfg.Gateway.Host); v != "" && v != "0.0.0.0" && v != "::" {
|
|
host = v
|
|
}
|
|
if cfg.Gateway.Port > 0 {
|
|
port = cfg.Gateway.Port
|
|
}
|
|
}
|
|
return fmt.Sprintf("http://%s:%d", host, port)
|
|
}
|
|
|
|
func defaultGatewayToken(cfg *config.Config) string {
|
|
if v := strings.TrimSpace(os.Getenv("CLAWGO_GATEWAY_TOKEN")); v != "" {
|
|
return v
|
|
}
|
|
if cfg == nil {
|
|
return ""
|
|
}
|
|
return strings.TrimSpace(cfg.Gateway.Token)
|
|
}
|
|
|
|
func normalizeGatewayBase(raw string) string {
|
|
raw = strings.TrimSpace(raw)
|
|
if raw == "" {
|
|
return ""
|
|
}
|
|
if !strings.HasPrefix(raw, "http://") && !strings.HasPrefix(raw, "https://") {
|
|
raw = "http://" + raw
|
|
}
|
|
return strings.TrimRight(raw, "/")
|
|
}
|
|
|
|
func nodeWebsocketURL(gatewayBase string) string {
|
|
base := normalizeGatewayBase(gatewayBase)
|
|
base = strings.TrimPrefix(base, "http://")
|
|
base = strings.TrimPrefix(base, "https://")
|
|
scheme := "ws://"
|
|
if strings.HasPrefix(strings.TrimSpace(gatewayBase), "https://") {
|
|
scheme = "wss://"
|
|
}
|
|
return scheme + base + "/nodes/connect"
|
|
}
|
|
|
|
func splitCSV(raw string) []string {
|
|
parts := strings.Split(raw, ",")
|
|
out := make([]string, 0, len(parts))
|
|
seen := map[string]bool{}
|
|
for _, part := range parts {
|
|
item := strings.TrimSpace(part)
|
|
if item == "" || seen[item] {
|
|
continue
|
|
}
|
|
seen[item] = true
|
|
out = append(out, item)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func capabilitiesFromCSV(raw string) nodes.Capabilities {
|
|
caps := nodes.Capabilities{}
|
|
for _, item := range splitCSV(raw) {
|
|
switch strings.ToLower(item) {
|
|
case "run":
|
|
caps.Run = true
|
|
case "invoke":
|
|
caps.Invoke = true
|
|
case "model", "agent_task":
|
|
caps.Model = true
|
|
case "camera":
|
|
caps.Camera = true
|
|
case "screen":
|
|
caps.Screen = true
|
|
case "location":
|
|
caps.Location = true
|
|
case "canvas":
|
|
caps.Canvas = true
|
|
}
|
|
}
|
|
return caps
|
|
}
|