This commit is contained in:
lpf
2026-02-13 23:45:05 +08:00
parent 3f209b6486
commit b57b4b14e7
6 changed files with 266 additions and 7 deletions

View File

@@ -15,6 +15,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
@@ -31,6 +32,13 @@ import (
var errGatewayNotRunningSlash = errors.New("gateway not running")
const llmCallTimeout = 90 * time.Second
const perSessionQueueSize = 64
type sessionWorker struct {
queue chan bus.InboundMessage
cancelMu sync.Mutex
cancel context.CancelFunc
}
type AgentLoop struct {
bus *bus.MessageBus
@@ -45,6 +53,8 @@ type AgentLoop struct {
orchestrator *tools.Orchestrator
running atomic.Bool
compactionCfg config.ContextCompactionConfig
workersMu sync.Mutex
workers map[string]*sessionWorker
}
func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider, cs *cron.CronService) *AgentLoop {
@@ -127,6 +137,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
tools: toolsRegistry,
orchestrator: orchestrator,
compactionCfg: cfg.Agents.Defaults.ContextCompaction,
workers: make(map[string]*sessionWorker),
}
// 注入递归运行逻辑,使 subagent 具备 full tool-calling 能力
@@ -144,15 +155,111 @@ func (al *AgentLoop) Run(ctx context.Context) error {
for al.running.Load() {
select {
case <-ctx.Done():
al.stopAllWorkers()
return nil
default:
msg, ok := al.bus.ConsumeInbound(ctx)
if !ok {
al.stopAllWorkers()
return nil
}
response, err := al.processMessage(ctx, msg)
if isStopCommand(msg.Content) {
al.handleStopCommand(msg)
continue
}
al.enqueueMessage(ctx, msg)
}
}
return nil
}
func (al *AgentLoop) Stop() {
al.running.Store(false)
al.stopAllWorkers()
}
func isStopCommand(content string) bool {
return strings.EqualFold(strings.TrimSpace(content), "/stop")
}
func (al *AgentLoop) handleStopCommand(msg bus.InboundMessage) {
worker := al.getWorker(msg.SessionKey)
if worker == nil {
return
}
worker.cancelMu.Lock()
cancel := worker.cancel
worker.cancelMu.Unlock()
if cancel == nil {
return
}
cancel()
}
func (al *AgentLoop) enqueueMessage(ctx context.Context, msg bus.InboundMessage) {
worker := al.getOrCreateWorker(ctx, msg.SessionKey)
select {
case worker.queue <- msg:
case <-ctx.Done():
case <-time.After(2 * time.Second):
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: "Message queue is busy. Please try again shortly.",
})
}
}
func (al *AgentLoop) getWorker(sessionKey string) *sessionWorker {
al.workersMu.Lock()
defer al.workersMu.Unlock()
return al.workers[sessionKey]
}
func (al *AgentLoop) getOrCreateWorker(ctx context.Context, sessionKey string) *sessionWorker {
al.workersMu.Lock()
defer al.workersMu.Unlock()
if w, ok := al.workers[sessionKey]; ok {
return w
}
w := &sessionWorker{
queue: make(chan bus.InboundMessage, perSessionQueueSize),
}
al.workers[sessionKey] = w
go al.runSessionWorker(ctx, sessionKey, w)
return w
}
func (al *AgentLoop) runSessionWorker(ctx context.Context, sessionKey string, worker *sessionWorker) {
for {
select {
case <-ctx.Done():
al.clearWorkerCancel(worker)
al.removeWorker(sessionKey, worker)
return
case msg := <-worker.queue:
taskCtx, cancel := context.WithCancel(ctx)
worker.cancelMu.Lock()
worker.cancel = cancel
worker.cancelMu.Unlock()
response, err := al.processMessage(taskCtx, msg)
cancel()
al.clearWorkerCancel(worker)
if err != nil {
if errors.Is(err, context.Canceled) {
continue
}
response = fmt.Sprintf("Error processing message: %v", err)
}
@@ -165,12 +272,38 @@ func (al *AgentLoop) Run(ctx context.Context) error {
}
}
}
return nil
}
func (al *AgentLoop) Stop() {
al.running.Store(false)
func (al *AgentLoop) clearWorkerCancel(worker *sessionWorker) {
worker.cancelMu.Lock()
worker.cancel = nil
worker.cancelMu.Unlock()
}
func (al *AgentLoop) removeWorker(sessionKey string, worker *sessionWorker) {
al.workersMu.Lock()
defer al.workersMu.Unlock()
if cur, ok := al.workers[sessionKey]; ok && cur == worker {
delete(al.workers, sessionKey)
}
}
func (al *AgentLoop) stopAllWorkers() {
al.workersMu.Lock()
workers := make([]*sessionWorker, 0, len(al.workers))
for _, w := range al.workers {
workers = append(workers, w)
}
al.workersMu.Unlock()
for _, w := range workers {
w.cancelMu.Lock()
cancel := w.cancel
w.cancelMu.Unlock()
if cancel != nil {
cancel()
}
}
}
func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey string) (string, error) {
@@ -998,6 +1131,8 @@ func (al *AgentLoop) handleSlashCommand(content string) (bool, string, error) {
switch fields[0] {
case "/help":
return true, "Slash commands:\n/help\n/status\n/config get <path>\n/config set <path> <value>\n/reload\n/pipeline list\n/pipeline status <pipeline_id>\n/pipeline ready <pipeline_id>", nil
case "/stop":
return true, "Stop command is handled by queue runtime. Send /stop from your channel session to interrupt current response.", nil
case "/status":
cfg, err := config.LoadConfig(al.getConfigPathForCommands())
if err != nil {