From 1b21ae9ccb10d31b8287913efe0e326e40c99508 Mon Sep 17 00:00:00 2001 From: DBT Date: Thu, 12 Feb 2026 05:34:02 +0000 Subject: [PATCH] feat: implement parallel_fetch tool for concurrent web page retrieval --- pkg/agent/loop.go | 4 +- pkg/tools/parallel_fetch.go | 76 +++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 pkg/tools/parallel_fetch.go diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 764da2b..789c2b9 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -52,7 +52,9 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers braveAPIKey := cfg.Tools.Web.Search.APIKey toolsRegistry.Register(tools.NewWebSearchTool(braveAPIKey, cfg.Tools.Web.Search.MaxResults)) - toolsRegistry.Register(tools.NewWebFetchTool(50000)) + webFetchTool := tools.NewWebFetchTool(50000) + toolsRegistry.Register(webFetchTool) + toolsRegistry.Register(tools.NewParallelFetchTool(webFetchTool)) // Register message tool messageTool := tools.NewMessageTool() diff --git a/pkg/tools/parallel_fetch.go b/pkg/tools/parallel_fetch.go new file mode 100644 index 0000000..ed42ff9 --- /dev/null +++ b/pkg/tools/parallel_fetch.go @@ -0,0 +1,76 @@ +package tools + +import ( + "context" + "fmt" + "sync" +) + +type ParallelFetchTool struct { + fetcher *WebFetchTool +} + +func NewParallelFetchTool(fetcher *WebFetchTool) *ParallelFetchTool { + return &ParallelFetchTool{fetcher: fetcher} +} + +func (t *ParallelFetchTool) Name() string { + return "parallel_fetch" +} + +func (t *ParallelFetchTool) Description() string { + return "Fetch multiple URLs concurrently. Useful for comparing information across different sites or gathering diverse sources quickly." +} + +func (t *ParallelFetchTool) Parameters() map[string]interface{} { + return map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "urls": map[string]interface{}{ + "type": "array", + "items": map[string]interface{}{ + "type": "string", + }, + "description": "List of URLs to fetch", + }, + }, + "required": []string{"urls"}, + } +} + +func (t *ParallelFetchTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) { + urlsRaw, ok := args["urls"].([]interface{}) + if !ok { + return "", fmt.Errorf("urls must be an array") + } + + results := make([]string, len(urlsRaw)) + var wg sync.WaitGroup + + for i, u := range urlsRaw { + urlStr, ok := u.(string) + if !ok { + continue + } + + wg.Add(1) + go func(index int, url string) { + defer wg.Done() + res, err := t.fetcher.Execute(ctx, map[string]interface{}{"url": url}) + if err != nil { + results[index] = fmt.Sprintf("Error fetching %s: %v", url, err) + } else { + results[index] = res + } + }(i, urlStr) + } + + wg.Wait() + + var output string + for i, res := range results { + output += fmt.Sprintf("=== Result %d ===\n%s\n\n", i+1, res) + } + + return output, nil +}