diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 2a6fd8f..6499e4b 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -63,6 +63,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers sessionsManager := session.NewSessionManager(filepath.Join(filepath.Dir(cfg.WorkspacePath()), "sessions")) toolsRegistry := tools.NewToolRegistry() + processManager := tools.NewProcessManager() readTool := tools.NewReadFileTool(workspace) writeTool := tools.NewWriteFileTool(workspace) listTool := tools.NewListDirTool(workspace) @@ -73,7 +74,8 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers toolsRegistry.Register(tools.NewAliasTool("read", "Read file content (OpenClaw-compatible alias of read_file)", readTool, map[string]string{"file_path": "path"})) toolsRegistry.Register(tools.NewAliasTool("write", "Write file content (OpenClaw-compatible alias of write_file)", writeTool, map[string]string{"file_path": "path"})) toolsRegistry.Register(tools.NewAliasTool("edit", "Edit file content (OpenClaw-compatible alias of edit_file)", tools.NewEditFileTool(workspace), map[string]string{"file_path": "path", "old_string": "oldText", "new_string": "newText"})) - toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace)) + toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace, processManager)) + toolsRegistry.Register(tools.NewProcessTool(processManager)) if cs != nil { toolsRegistry.Register(tools.NewRemindTool(cs)) diff --git a/pkg/tools/process_manager.go b/pkg/tools/process_manager.go new file mode 100644 index 0000000..3c85edb --- /dev/null +++ b/pkg/tools/process_manager.go @@ -0,0 +1,166 @@ +package tools + +import ( + "bytes" + "fmt" + "os/exec" + "sort" + "strconv" + "sync" + "sync/atomic" + "time" +) + +type processSession struct { + ID string + Command string + StartedAt time.Time + EndedAt time.Time + ExitCode *int + cmd *exec.Cmd + done chan struct{} + mu sync.RWMutex + log bytes.Buffer +} + +type ProcessManager struct { + mu sync.RWMutex + sessions map[string]*processSession + seq uint64 +} + +func NewProcessManager() *ProcessManager { + return &ProcessManager{sessions: map[string]*processSession{}} +} + +func (m *ProcessManager) Start(command, cwd string) (string, error) { + id := "p-" + strconv.FormatUint(atomic.AddUint64(&m.seq, 1), 10) + cmd := exec.Command("sh", "-c", command) + if cwd != "" { + cmd.Dir = cwd + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return "", err + } + stderr, err := cmd.StderrPipe() + if err != nil { + return "", err + } + s := &processSession{ID: id, Command: command, StartedAt: time.Now().UTC(), cmd: cmd, done: make(chan struct{})} + + m.mu.Lock() + m.sessions[id] = s + m.mu.Unlock() + + if err := cmd.Start(); err != nil { + m.mu.Lock() + delete(m.sessions, id) + m.mu.Unlock() + return "", err + } + + go m.capture(s, stdout) + go m.capture(s, stderr) + go func() { + err := cmd.Wait() + code := 0 + if err != nil { + if ee, ok := err.(*exec.ExitError); ok { + code = ee.ExitCode() + } else { + code = -1 + } + } + s.mu.Lock() + s.EndedAt = time.Now().UTC() + s.ExitCode = &code + s.mu.Unlock() + close(s.done) + }() + + return id, nil +} + +func (m *ProcessManager) capture(s *processSession, r interface{ Read([]byte) (int, error) }) { + buf := make([]byte, 2048) + for { + n, err := r.Read(buf) + if n > 0 { + s.mu.Lock() + _, _ = s.log.Write(buf[:n]) + s.mu.Unlock() + } + if err != nil { + return + } + } +} + +func (m *ProcessManager) List() []map[string]interface{} { + m.mu.RLock() + items := make([]*processSession, 0, len(m.sessions)) + for _, s := range m.sessions { + items = append(items, s) + } + m.mu.RUnlock() + sort.Slice(items, func(i, j int) bool { return items[i].StartedAt.After(items[j].StartedAt) }) + out := make([]map[string]interface{}, 0, len(items)) + for _, s := range items { + s.mu.RLock() + running := s.ExitCode == nil + code := interface{}(nil) + if s.ExitCode != nil { + code = *s.ExitCode + } + out = append(out, map[string]interface{}{"id": s.ID, "command": s.Command, "running": running, "exit_code": code, "started_at": s.StartedAt.Format(time.RFC3339)}) + s.mu.RUnlock() + } + return out +} + +func (m *ProcessManager) Get(id string) (*processSession, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + s, ok := m.sessions[id] + return s, ok +} + +func (m *ProcessManager) Log(id string, offset, limit int) (string, error) { + s, ok := m.Get(id) + if !ok { + return "", fmt.Errorf("session not found: %s", id) + } + s.mu.RLock() + defer s.mu.RUnlock() + b := s.log.Bytes() + if offset < 0 { + offset = 0 + } + if offset > len(b) { + offset = len(b) + } + end := len(b) + if limit > 0 && offset+limit < end { + end = offset + limit + } + return string(b[offset:end]), nil +} + +func (m *ProcessManager) Kill(id string) error { + s, ok := m.Get(id) + if !ok { + return fmt.Errorf("session not found: %s", id) + } + s.mu.RLock() + cmd := s.cmd + running := s.ExitCode == nil + s.mu.RUnlock() + if !running { + return nil + } + if cmd.Process == nil { + return fmt.Errorf("process not started") + } + return cmd.Process.Kill() +} diff --git a/pkg/tools/process_tool.go b/pkg/tools/process_tool.go new file mode 100644 index 0000000..b7e8fbb --- /dev/null +++ b/pkg/tools/process_tool.go @@ -0,0 +1,84 @@ +package tools + +import ( + "context" + "encoding/json" + "time" +) + +type ProcessTool struct{ m *ProcessManager } + +func NewProcessTool(m *ProcessManager) *ProcessTool { return &ProcessTool{m: m} } +func (t *ProcessTool) Name() string { return "process" } +func (t *ProcessTool) Description() string { + return "Manage background exec sessions: list, poll, log, kill" +} +func (t *ProcessTool) Parameters() map[string]interface{} { + return map[string]interface{}{"type": "object", "properties": map[string]interface{}{ + "action": map[string]interface{}{"type": "string", "description": "list|poll|log|kill"}, + "session_id": map[string]interface{}{"type": "string"}, + "offset": map[string]interface{}{"type": "integer"}, + "limit": map[string]interface{}{"type": "integer"}, + "timeout_ms": map[string]interface{}{"type": "integer"}, + }, "required": []string{"action"}} +} + +func (t *ProcessTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) { + action, _ := args["action"].(string) + sid, _ := args["session_id"].(string) + if sid == "" { + sid, _ = args["sessionId"].(string) + } + switch action { + case "list": + b, _ := json.Marshal(t.m.List()) + return string(b), nil + case "log": + off := toInt(args["offset"]) + lim := toInt(args["limit"]) + return t.m.Log(sid, off, lim) + case "kill": + if err := t.m.Kill(sid); err != nil { + return "", err + } + return "killed", nil + case "poll": + timeout := toInt(args["timeout_ms"]) + if timeout < 0 { + timeout = 0 + } + s, ok := t.m.Get(sid) + if !ok { + return "", nil + } + if timeout > 0 { + select { + case <-s.done: + case <-time.After(time.Duration(timeout) * time.Millisecond): + case <-ctx.Done(): + } + } + s.mu.RLock() + defer s.mu.RUnlock() + resp := map[string]interface{}{"id": s.ID, "running": s.ExitCode == nil, "started_at": s.StartedAt.Format(time.RFC3339)} + if s.ExitCode != nil { + resp["exit_code"] = *s.ExitCode + resp["ended_at"] = s.EndedAt.Format(time.RFC3339) + } + b, _ := json.Marshal(resp) + return string(b), nil + default: + return "", nil + } +} + +func toInt(v interface{}) int { + switch x := v.(type) { + case float64: + return int(x) + case int: + return x + default: + return 0 + } +} diff --git a/pkg/tools/shell.go b/pkg/tools/shell.go index bc82f39..bc6e219 100644 --- a/pkg/tools/shell.go +++ b/pkg/tools/shell.go @@ -20,14 +20,16 @@ type ExecTool struct { timeout time.Duration sandboxEnabled bool sandboxImage string + procManager *ProcessManager } -func NewExecTool(cfg config.ShellConfig, workspace string) *ExecTool { +func NewExecTool(cfg config.ShellConfig, workspace string, pm *ProcessManager) *ExecTool { return &ExecTool{ workingDir: workspace, timeout: cfg.Timeout, sandboxEnabled: cfg.Sandbox.Enabled, sandboxImage: cfg.Sandbox.Image, + procManager: pm, } } @@ -51,6 +53,10 @@ func (t *ExecTool) Parameters() map[string]interface{} { "type": "string", "description": "Optional working directory for the command", }, + "background": map[string]interface{}{ + "type": "boolean", + "description": "Run command in background and return session id", + }, }, "required": []string{"command"}, } @@ -74,6 +80,17 @@ func (t *ExecTool) Execute(ctx context.Context, args map[string]interface{}) (st } } + if bg, _ := args["background"].(bool); bg { + if t.procManager == nil { + return "", fmt.Errorf("background process manager not configured") + } + sid, err := t.procManager.Start(command, cwd) + if err != nil { + return "", err + } + return fmt.Sprintf("{\"session_id\":%q,\"running\":true}", sid), nil + } + if t.sandboxEnabled { return t.executeInSandbox(ctx, command, cwd) }