mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-13 06:47:30 +08:00
296 lines
8.1 KiB
Go
296 lines
8.1 KiB
Go
package tools
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
)
|
|
|
|
type PipelineCreateTool struct {
|
|
orc *Orchestrator
|
|
}
|
|
|
|
func NewPipelineCreateTool(orc *Orchestrator) *PipelineCreateTool {
|
|
return &PipelineCreateTool{orc: orc}
|
|
}
|
|
|
|
func (t *PipelineCreateTool) Name() string { return "pipeline_create" }
|
|
|
|
func (t *PipelineCreateTool) Description() string {
|
|
return "Create a multi-agent pipeline with standardized task protocol (role/goal/dependencies/shared state)."
|
|
}
|
|
|
|
func (t *PipelineCreateTool) Parameters() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"type": "object",
|
|
"properties": map[string]interface{}{
|
|
"label": map[string]interface{}{
|
|
"type": "string",
|
|
"description": "Optional short pipeline label",
|
|
},
|
|
"objective": map[string]interface{}{
|
|
"type": "string",
|
|
"description": "Top-level objective for this pipeline",
|
|
},
|
|
"tasks": map[string]interface{}{
|
|
"type": "array",
|
|
"description": "Task list with id/role/goal/depends_on",
|
|
"items": map[string]interface{}{
|
|
"type": "object",
|
|
"properties": map[string]interface{}{
|
|
"id": map[string]interface{}{"type": "string"},
|
|
"role": map[string]interface{}{
|
|
"type": "string",
|
|
"description": "Agent role, e.g. research/coding/testing",
|
|
},
|
|
"goal": map[string]interface{}{"type": "string"},
|
|
"depends_on": map[string]interface{}{
|
|
"type": "array",
|
|
"items": map[string]interface{}{"type": "string"},
|
|
},
|
|
},
|
|
"required": []string{"id", "goal"},
|
|
},
|
|
},
|
|
},
|
|
"required": []string{"objective", "tasks"},
|
|
}
|
|
}
|
|
|
|
func (t *PipelineCreateTool) Execute(_ context.Context, args map[string]interface{}) (string, error) {
|
|
if t.orc == nil {
|
|
return "", fmt.Errorf("orchestrator is not configured")
|
|
}
|
|
|
|
objective, _ := args["objective"].(string)
|
|
label, _ := args["label"].(string)
|
|
|
|
rawTasks, ok := args["tasks"].([]interface{})
|
|
if !ok || len(rawTasks) == 0 {
|
|
return "", fmt.Errorf("tasks is required")
|
|
}
|
|
|
|
specs := make([]PipelineSpec, 0, len(rawTasks))
|
|
for i, item := range rawTasks {
|
|
m, ok := item.(map[string]interface{})
|
|
if !ok {
|
|
return "", fmt.Errorf("tasks[%d] must be object", i)
|
|
}
|
|
id, _ := m["id"].(string)
|
|
role, _ := m["role"].(string)
|
|
goal, _ := m["goal"].(string)
|
|
|
|
deps := make([]string, 0)
|
|
if rawDeps, ok := m["depends_on"].([]interface{}); ok {
|
|
for _, dep := range rawDeps {
|
|
if depS, ok := dep.(string); ok {
|
|
deps = append(deps, depS)
|
|
}
|
|
}
|
|
}
|
|
specs = append(specs, PipelineSpec{
|
|
ID: id,
|
|
Role: role,
|
|
Goal: goal,
|
|
DependsOn: deps,
|
|
})
|
|
}
|
|
|
|
p, err := t.orc.CreatePipeline(label, objective, "tool", "tool", specs)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return fmt.Sprintf("Pipeline created: %s (%d tasks)\nUse spawn with pipeline_id/task_id to run tasks.\nUse pipeline_dispatch to dispatch ready tasks.",
|
|
p.ID, len(p.Tasks)), nil
|
|
}
|
|
|
|
type PipelineStatusTool struct {
|
|
orc *Orchestrator
|
|
}
|
|
|
|
func NewPipelineStatusTool(orc *Orchestrator) *PipelineStatusTool {
|
|
return &PipelineStatusTool{orc: orc}
|
|
}
|
|
|
|
func (t *PipelineStatusTool) Name() string { return "pipeline_status" }
|
|
|
|
func (t *PipelineStatusTool) Description() string {
|
|
return "Get pipeline status, tasks progress, and shared state. If pipeline_id is empty, list recent pipelines."
|
|
}
|
|
|
|
func (t *PipelineStatusTool) Parameters() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"type": "object",
|
|
"properties": map[string]interface{}{
|
|
"pipeline_id": map[string]interface{}{
|
|
"type": "string",
|
|
"description": "Pipeline ID",
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (t *PipelineStatusTool) Execute(_ context.Context, args map[string]interface{}) (string, error) {
|
|
if t.orc == nil {
|
|
return "", fmt.Errorf("orchestrator is not configured")
|
|
}
|
|
pipelineID, _ := args["pipeline_id"].(string)
|
|
pipelineID = strings.TrimSpace(pipelineID)
|
|
|
|
if pipelineID == "" {
|
|
items := t.orc.ListPipelines()
|
|
if len(items) == 0 {
|
|
return "No pipelines found.", nil
|
|
}
|
|
var sb strings.Builder
|
|
sb.WriteString("Pipelines:\n")
|
|
for _, p := range items {
|
|
sb.WriteString(fmt.Sprintf("- %s [%s] %s\n", p.ID, p.Status, p.Label))
|
|
}
|
|
return sb.String(), nil
|
|
}
|
|
|
|
return t.orc.SnapshotJSON(pipelineID)
|
|
}
|
|
|
|
type PipelineStateSetTool struct {
|
|
orc *Orchestrator
|
|
}
|
|
|
|
func NewPipelineStateSetTool(orc *Orchestrator) *PipelineStateSetTool {
|
|
return &PipelineStateSetTool{orc: orc}
|
|
}
|
|
|
|
func (t *PipelineStateSetTool) Name() string { return "pipeline_state_set" }
|
|
|
|
func (t *PipelineStateSetTool) Description() string {
|
|
return "Set shared state key/value for a pipeline, allowing sub-agents to share intermediate results."
|
|
}
|
|
|
|
func (t *PipelineStateSetTool) Parameters() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"type": "object",
|
|
"properties": map[string]interface{}{
|
|
"pipeline_id": map[string]interface{}{"type": "string"},
|
|
"key": map[string]interface{}{"type": "string"},
|
|
"value": map[string]interface{}{
|
|
"description": "Any JSON-serializable value",
|
|
},
|
|
},
|
|
"required": []string{"pipeline_id", "key", "value"},
|
|
}
|
|
}
|
|
|
|
func (t *PipelineStateSetTool) Execute(_ context.Context, args map[string]interface{}) (string, error) {
|
|
if t.orc == nil {
|
|
return "", fmt.Errorf("orchestrator is not configured")
|
|
}
|
|
pipelineID, _ := args["pipeline_id"].(string)
|
|
key, _ := args["key"].(string)
|
|
value, ok := args["value"]
|
|
if !ok {
|
|
return "", fmt.Errorf("value is required")
|
|
}
|
|
if err := t.orc.SetSharedState(strings.TrimSpace(pipelineID), strings.TrimSpace(key), value); err != nil {
|
|
return "", err
|
|
}
|
|
return fmt.Sprintf("Updated pipeline shared state: %s.%s", pipelineID, key), nil
|
|
}
|
|
|
|
type PipelineDispatchTool struct {
|
|
orc *Orchestrator
|
|
spawn *SubagentManager
|
|
}
|
|
|
|
func NewPipelineDispatchTool(orc *Orchestrator, spawn *SubagentManager) *PipelineDispatchTool {
|
|
return &PipelineDispatchTool{orc: orc, spawn: spawn}
|
|
}
|
|
|
|
func (t *PipelineDispatchTool) Name() string { return "pipeline_dispatch" }
|
|
|
|
func (t *PipelineDispatchTool) Description() string {
|
|
return "Dispatch all dependency-ready tasks in a pipeline by spawning subagents automatically."
|
|
}
|
|
|
|
func (t *PipelineDispatchTool) Parameters() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"type": "object",
|
|
"properties": map[string]interface{}{
|
|
"pipeline_id": map[string]interface{}{
|
|
"type": "string",
|
|
"description": "Pipeline ID",
|
|
},
|
|
"max_dispatch": map[string]interface{}{
|
|
"type": "integer",
|
|
"description": "Maximum number of tasks to dispatch in this call (default 3)",
|
|
"default": 3,
|
|
},
|
|
},
|
|
"required": []string{"pipeline_id"},
|
|
}
|
|
}
|
|
|
|
func (t *PipelineDispatchTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) {
|
|
if t.orc == nil || t.spawn == nil {
|
|
return "", fmt.Errorf("pipeline dispatcher is not configured")
|
|
}
|
|
|
|
pipelineID, _ := args["pipeline_id"].(string)
|
|
pipelineID = strings.TrimSpace(pipelineID)
|
|
if pipelineID == "" {
|
|
return "", fmt.Errorf("pipeline_id is required")
|
|
}
|
|
|
|
maxDispatch := 3
|
|
if raw, ok := args["max_dispatch"].(float64); ok && raw > 0 {
|
|
maxDispatch = int(raw)
|
|
}
|
|
|
|
ready, err := t.orc.ReadyTasks(pipelineID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if len(ready) == 0 {
|
|
return fmt.Sprintf("No ready tasks for pipeline %s", pipelineID), nil
|
|
}
|
|
|
|
dispatched := 0
|
|
var lines []string
|
|
|
|
for _, task := range ready {
|
|
if dispatched >= maxDispatch {
|
|
break
|
|
}
|
|
shared := map[string]interface{}{}
|
|
if p, ok := t.orc.GetPipeline(pipelineID); ok {
|
|
for k, v := range p.SharedState {
|
|
shared[k] = v
|
|
}
|
|
}
|
|
|
|
payload := task.Goal
|
|
if len(shared) > 0 {
|
|
sharedJSON, _ := json.Marshal(shared)
|
|
payload = fmt.Sprintf("%s\n\nShared State:\n%s", payload, string(sharedJSON))
|
|
}
|
|
|
|
label := task.ID
|
|
if task.Role != "" {
|
|
label = fmt.Sprintf("%s:%s", task.Role, task.ID)
|
|
}
|
|
if _, err := t.spawn.Spawn(ctx, payload, label, "tool", "tool", pipelineID, task.ID); err != nil {
|
|
lines = append(lines, fmt.Sprintf("- %s failed: %v", task.ID, err))
|
|
continue
|
|
}
|
|
dispatched++
|
|
lines = append(lines, fmt.Sprintf("- %s dispatched", task.ID))
|
|
}
|
|
|
|
if len(lines) == 0 {
|
|
return "No tasks dispatched.", nil
|
|
}
|
|
return fmt.Sprintf("Pipeline %s dispatch result:\n%s", pipelineID, strings.Join(lines, "\n")), nil
|
|
}
|