This commit is contained in:
lpf
2026-02-21 10:10:26 +08:00
parent 736cc18ffe
commit 6724d0e10f

View File

@@ -34,7 +34,6 @@ import (
var errGatewayNotRunningSlash = errors.New("gateway not running")
const perSessionQueueSize = 64
const autoLearnDefaultInterval = 10 * time.Minute
const autoLearnMinInterval = 30 * time.Second
const autonomyDefaultIdleInterval = 30 * time.Minute
@@ -67,9 +66,11 @@ const compactionAttemptTimeout = 8 * time.Second
const compactionRetryPerCandidate = 1
type sessionWorker struct {
queue chan bus.InboundMessage
cancelMu sync.Mutex
cancel context.CancelFunc
cancelMu sync.Mutex
cancel context.CancelFunc
queueMu sync.Mutex
queue []bus.InboundMessage
queueNotify chan struct{}
}
type autoLearner struct {
@@ -250,14 +251,6 @@ type runControlIntentLLMResponse struct {
}
var defaultParallelSafeToolNames = []string{"read_file", "list_files", "find_files", "grep_files", "memory_search", "web_search", "repo_map", "system_info"}
var autonomyIntentKeywords = []string{
"autonomy", "autonomous", "autonomy mode", "self-driven", "self driven",
"自主", "自驱", "自主模式", "自动执行", "自治模式",
}
var autoLearnIntentKeywords = []string{
"auto-learn", "autolearn", "learning loop", "learn loop",
"自学习", "学习循环", "自动学习", "学习模式",
}
type stageReporter struct {
onUpdate func(content string)
@@ -685,7 +678,11 @@ func isStopCommand(content string) bool {
}
func (al *AgentLoop) handleStopCommand(msg bus.InboundMessage) {
worker := al.getWorker(msg.SessionKey)
al.cancelSessionWorkerRun(msg.SessionKey)
}
func (al *AgentLoop) cancelSessionWorkerRun(sessionKey string) {
worker := al.getWorker(sessionKey)
if worker == nil {
return
}
@@ -702,18 +699,9 @@ func (al *AgentLoop) handleStopCommand(msg bus.InboundMessage) {
}
func (al *AgentLoop) enqueueMessage(ctx context.Context, msg bus.InboundMessage) {
_ = ctx
worker := al.getOrCreateWorker(ctx, msg.SessionKey)
select {
case worker.queue <- msg:
case <-ctx.Done():
case <-time.After(5 * time.Second):
al.bus.PublishOutbound(bus.OutboundMessage{
Buttons: nil,
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: "The message queue is currently busy. Please try again shortly.",
})
}
worker.enqueue(msg)
}
func (al *AgentLoop) getWorker(sessionKey string) *sessionWorker {
@@ -731,7 +719,8 @@ func (al *AgentLoop) getOrCreateWorker(ctx context.Context, sessionKey string) *
}
w := &sessionWorker{
queue: make(chan bus.InboundMessage, perSessionQueueSize),
queue: make([]bus.InboundMessage, 0, 16),
queueNotify: make(chan struct{}, 1),
}
al.workers[sessionKey] = w
@@ -741,58 +730,94 @@ func (al *AgentLoop) getOrCreateWorker(ctx context.Context, sessionKey string) *
func (al *AgentLoop) runSessionWorker(ctx context.Context, sessionKey string, worker *sessionWorker) {
for {
select {
case <-ctx.Done():
msg, ok := worker.dequeue(ctx)
if !ok {
al.clearWorkerCancel(worker)
al.removeWorker(sessionKey, worker)
return
case msg := <-worker.queue:
func() {
taskCtx, cancel := context.WithCancel(ctx)
taskCtx, tokenTotals := withTokenUsageTotals(taskCtx)
worker.cancelMu.Lock()
worker.cancel = cancel
worker.cancelMu.Unlock()
}
defer func() {
cancel()
al.clearWorkerCancel(worker)
if r := recover(); r != nil {
logger.ErrorCF("agent", "Session worker recovered from panic", map[string]interface{}{
"session_key": sessionKey,
"panic": fmt.Sprintf("%v", r),
})
}
}()
func() {
taskCtx, cancel := context.WithCancel(ctx)
taskCtx, tokenTotals := withTokenUsageTotals(taskCtx)
worker.cancelMu.Lock()
worker.cancel = cancel
worker.cancelMu.Unlock()
response, err := al.processMessage(taskCtx, msg)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
response = al.formatProcessingErrorMessage(taskCtx, msg, err)
}
if response != "" && shouldPublishSyntheticResponse(msg) {
if al != nil && al.sessions != nil && tokenTotals != nil {
al.sessions.AddTokenUsage(
msg.SessionKey,
tokenTotals.input,
tokenTotals.output,
tokenTotals.total,
)
}
response += formatTokenUsageSuffix(
tokenTotals,
)
al.bus.PublishOutbound(bus.OutboundMessage{
Buttons: nil,
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: response,
defer func() {
cancel()
al.clearWorkerCancel(worker)
if r := recover(); r != nil {
logger.ErrorCF("agent", "Session worker recovered from panic", map[string]interface{}{
"session_key": sessionKey,
"panic": fmt.Sprintf("%v", r),
})
}
}()
response, err := al.processMessage(taskCtx, msg)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
response = al.formatProcessingErrorMessage(taskCtx, msg, err)
}
if response != "" && shouldPublishSyntheticResponse(msg) {
if al != nil && al.sessions != nil && tokenTotals != nil {
al.sessions.AddTokenUsage(
msg.SessionKey,
tokenTotals.input,
tokenTotals.output,
tokenTotals.total,
)
}
response += formatTokenUsageSuffix(
tokenTotals,
)
al.bus.PublishOutbound(bus.OutboundMessage{
Buttons: nil,
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: response,
})
}
}()
}
}
func (w *sessionWorker) enqueue(msg bus.InboundMessage) {
if w == nil {
return
}
w.queueMu.Lock()
w.queue = append(w.queue, msg)
w.queueMu.Unlock()
select {
case w.queueNotify <- struct{}{}:
default:
}
}
func (w *sessionWorker) dequeue(ctx context.Context) (bus.InboundMessage, bool) {
if w == nil {
return bus.InboundMessage{}, false
}
for {
w.queueMu.Lock()
if len(w.queue) > 0 {
msg := w.queue[0]
w.queue[0] = bus.InboundMessage{}
w.queue = w.queue[1:]
w.queueMu.Unlock()
return msg, true
}
w.queueMu.Unlock()
select {
case <-ctx.Done():
return bus.InboundMessage{}, false
case <-w.queueNotify:
}
}
}
@@ -1059,11 +1084,12 @@ func (al *AgentLoop) maybeRunAutonomyRound(msg bus.InboundMessage) bool {
}
delete(al.autonomyBySess, msg.SessionKey)
al.autonomyMu.Unlock()
al.cancelSessionWorkerRun(msg.SessionKey)
al.controlMetricAdd(&al.controlStats.autonomyStoppedByGuard, 1)
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: "Autonomy mode stopped automatically because background rounds stalled repeatedly.",
Content: al.autonomyGuardStopMessage(msg.SessionKey),
})
return false
}
@@ -1083,7 +1109,7 @@ func (al *AgentLoop) maybeRunAutonomyRound(msg bus.InboundMessage) bool {
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: "Autonomy mode paused automatically after many unattended rounds. Send a new request to continue.",
Content: al.autonomyGuardPauseMessage(msg.SessionKey),
})
return false
}
@@ -1132,6 +1158,61 @@ func (al *AgentLoop) finishAutonomyRound(sessionKey string) {
}
}
func (al *AgentLoop) autonomyGuardStopMessage(sessionKey string) string {
return al.renderControlNotice(
sessionKey,
"Explain autonomy mode stopped automatically because background rounds stalled repeatedly, and suggest sending a new request to continue.",
"自主模式已因后台轮次连续停滞而自动停止;如需继续,请发送新的请求。",
)
}
func (al *AgentLoop) autonomyGuardPauseMessage(sessionKey string) string {
return al.renderControlNotice(
sessionKey,
"Explain autonomy mode paused automatically after too many unattended rounds, and suggest sending a new request to continue.",
"自主模式已在长时间无用户输入后自动暂停;如需继续,请发送新的请求。",
)
}
func (al *AgentLoop) autoLearnGuardStopMessage(sessionKey string) string {
return al.renderControlNotice(
sessionKey,
"Explain auto-learn stopped automatically after reaching the unattended round limit.",
"自动学习已因达到无人值守轮次上限而自动停止。",
)
}
func (al *AgentLoop) renderControlNotice(sessionKey, requirement, fallback string) string {
if al == nil {
return fallback
}
ctx, cancel := context.WithTimeout(
withUserLanguageHint(context.Background(), sessionKey, ""),
8*time.Second,
)
defer cancel()
systemPrompt := al.withBootstrapPolicy(`Generate one short user-facing notification sentence.
Requirements:
- Use natural language only.
- ` + strings.TrimSpace(requirement))
resp, err := al.callLLMWithModelFallback(ctx, []providers.Message{
{Role: "system", Content: systemPrompt},
}, nil, map[string]interface{}{
"max_tokens": 80,
"temperature": 0.2,
})
if err == nil && resp != nil {
text := strings.TrimSpace(resp.Content)
if text != "" {
return text
}
}
return fallback
}
func buildAutonomyFollowUpPrompt(round int, focus string, reportDue bool) string {
focus = strings.TrimSpace(focus)
if focus == "" && reportDue {
@@ -1199,7 +1280,7 @@ func (al *AgentLoop) runAutoLearnerLoop(ctx context.Context, msg bus.InboundMess
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: "Auto-learn stopped automatically after reaching the unattended round limit.",
Content: al.autoLearnGuardStopMessage(msg.SessionKey),
})
return false
}
@@ -2005,6 +2086,11 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
})
if isAutonomySyntheticMessage(msg) {
// Autonomy may have been stopped while synthetic messages are still queued.
// Drop stale synthetic rounds so they don't block real user turns.
if !al.isAutonomyEnabled(msg.SessionKey) {
return "", nil
}
defer al.finishAutonomyRound(msg.SessionKey)
}
@@ -4801,9 +4887,6 @@ func isExplicitRunCommand(content string) bool {
}
func (al *AgentLoop) detectAutonomyIntent(ctx context.Context, content string) (autonomyIntent, intentDetectionOutcome) {
if !shouldAttemptAutonomyIntentInference(content) {
return autonomyIntent{}, intentDetectionOutcome{}
}
if intent, confidence, ok := al.inferAutonomyIntent(ctx, content); ok {
al.controlMetricAdd(&al.controlStats.intentAutonomyMatched, 1)
return intent, intentDetectionOutcome{matched: true, confidence: confidence}
@@ -4813,9 +4896,6 @@ func (al *AgentLoop) detectAutonomyIntent(ctx context.Context, content string) (
}
func (al *AgentLoop) detectAutoLearnIntent(ctx context.Context, content string) (autoLearnIntent, intentDetectionOutcome) {
if !shouldAttemptAutoLearnIntentInference(content) {
return autoLearnIntent{}, intentDetectionOutcome{}
}
if intent, confidence, ok := al.inferAutoLearnIntent(ctx, content); ok {
al.controlMetricAdd(&al.controlStats.intentAutoLearnMatched, 1)
return intent, intentDetectionOutcome{matched: true, confidence: confidence}
@@ -4885,14 +4965,6 @@ Rules:
return intent, parsed.Confidence, true
}
func shouldAttemptAutonomyIntentInference(content string) bool {
lower := strings.ToLower(strings.TrimSpace(content))
if lower == "" {
return false
}
return containsAnySubstring(lower, autonomyIntentKeywords...)
}
func extractJSONObject(text string) string {
s := strings.TrimSpace(text)
if s == "" {
@@ -4974,14 +5046,6 @@ Rules:
return intent, parsed.Confidence, true
}
func shouldAttemptAutoLearnIntentInference(content string) bool {
lower := strings.ToLower(strings.TrimSpace(content))
if lower == "" {
return false
}
return containsAnySubstring(lower, autoLearnIntentKeywords...)
}
func (al *AgentLoop) inferTaskExecutionDirectives(ctx context.Context, content string) (taskExecutionDirectives, bool) {
text := strings.TrimSpace(content)
if text == "" || len(text) > 1200 {