webui: add realtime logs stream and APIs for stream chat/channels/skills/exec approvals

This commit is contained in:
DBT
2026-02-25 16:58:17 +00:00
parent 8c319ce6ed
commit ed60785fda
2 changed files with 289 additions and 0 deletions

View File

@@ -179,6 +179,8 @@ func gatewayCmd() {
registryServer := nodes.NewRegistryServer(cfg.Gateway.Host, cfg.Gateway.Port, cfg.Gateway.Token, nodes.DefaultManager())
registryServer.SetConfigPath(getConfigPath())
registryServer.SetWorkspacePath(cfg.WorkspacePath())
registryServer.SetLogFilePath(cfg.LogFilePath())
registryServer.SetWebUIDir(filepath.Join(cfg.WorkspacePath(), "webui-dist"))
registryServer.SetChatHandler(func(cctx context.Context, sessionKey, content string) (string, error) {
if strings.TrimSpace(content) == "" {
@@ -186,6 +188,9 @@ func gatewayCmd() {
}
return agentLoop.ProcessDirect(cctx, content, sessionKey)
})
registryServer.SetChannelsHandler(func() interface{} {
return channelManager.GetStatus()
})
registryServer.SetConfigAfterHook(func() {
_ = syscall.Kill(os.Getpid(), syscall.SIGHUP)
})

View File

@@ -20,9 +20,12 @@ type RegistryServer struct {
mgr *Manager
server *http.Server
configPath string
workspacePath string
logFilePath string
onChat func(ctx context.Context, sessionKey, content string) (string, error)
onConfigAfter func()
onCron func(action string, args map[string]interface{}) (interface{}, error)
onChannels func() interface{}
webUIDir string
}
@@ -38,9 +41,12 @@ func NewRegistryServer(host string, port int, token string, mgr *Manager) *Regis
}
func (s *RegistryServer) SetConfigPath(path string) { s.configPath = strings.TrimSpace(path) }
func (s *RegistryServer) SetWorkspacePath(path string) { s.workspacePath = strings.TrimSpace(path) }
func (s *RegistryServer) SetLogFilePath(path string) { s.logFilePath = strings.TrimSpace(path) }
func (s *RegistryServer) SetChatHandler(fn func(ctx context.Context, sessionKey, content string) (string, error)) {
s.onChat = fn
}
func (s *RegistryServer) SetChannelsHandler(fn func() interface{}) { s.onChannels = fn }
func (s *RegistryServer) SetConfigAfterHook(fn func()) { s.onConfigAfter = fn }
func (s *RegistryServer) SetCronHandler(fn func(action string, args map[string]interface{}) (interface{}, error)) {
s.onCron = fn
@@ -62,9 +68,14 @@ func (s *RegistryServer) Start(ctx context.Context) error {
mux.HandleFunc("/webui/", s.handleWebUIAsset)
mux.HandleFunc("/webui/api/config", s.handleWebUIConfig)
mux.HandleFunc("/webui/api/chat", s.handleWebUIChat)
mux.HandleFunc("/webui/api/chat/stream", s.handleWebUIChatStream)
mux.HandleFunc("/webui/api/upload", s.handleWebUIUpload)
mux.HandleFunc("/webui/api/nodes", s.handleWebUINodes)
mux.HandleFunc("/webui/api/cron", s.handleWebUICron)
mux.HandleFunc("/webui/api/channels", s.handleWebUIChannels)
mux.HandleFunc("/webui/api/skills", s.handleWebUISkills)
mux.HandleFunc("/webui/api/exec_approvals", s.handleWebUIExecApprovals)
mux.HandleFunc("/webui/api/logs/stream", s.handleWebUILogsStream)
s.server = &http.Server{Addr: s.addr, Handler: mux}
go func() {
<-ctx.Done()
@@ -332,6 +343,71 @@ func (s *RegistryServer) handleWebUIChat(w http.ResponseWriter, r *http.Request)
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "reply": resp, "session": session})
}
func (s *RegistryServer) handleWebUIChatStream(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if s.onChat == nil {
http.Error(w, "chat handler not configured", http.StatusInternalServerError)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "stream unsupported", http.StatusInternalServerError)
return
}
var body struct {
Session string `json:"session"`
Message string `json:"message"`
Media string `json:"media"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
session := strings.TrimSpace(body.Session)
if session == "" {
session = "webui:default"
}
prompt := strings.TrimSpace(body.Message)
if strings.TrimSpace(body.Media) != "" {
if prompt != "" {
prompt += "\n"
}
prompt += "[file: " + strings.TrimSpace(body.Media) + "]"
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
_, _ = w.Write([]byte("event: start\ndata: {\"ok\":true}\n\n"))
flusher.Flush()
resp, err := s.onChat(r.Context(), session, prompt)
if err != nil {
_, _ = w.Write([]byte("event: error\ndata: {\"error\":\"" + strings.ReplaceAll(err.Error(), "\"", "'") + "\"}\n\n"))
flusher.Flush()
return
}
chunk := 180
for i := 0; i < len(resp); i += chunk {
end := i + chunk
if end > len(resp) {
end = len(resp)
}
part := strings.ReplaceAll(resp[i:end], "\n", "\\n")
part = strings.ReplaceAll(part, "\"", "'")
_, _ = w.Write([]byte("event: delta\ndata: {\"text\":\"" + part + "\"}\n\n"))
flusher.Flush()
}
_, _ = w.Write([]byte("event: done\ndata: {\"ok\":true}\n\n"))
flusher.Flush()
}
func (s *RegistryServer) handleWebUINodes(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
@@ -398,6 +474,214 @@ func (s *RegistryServer) handleWebUICron(w http.ResponseWriter, r *http.Request)
}
}
func (s *RegistryServer) handleWebUIChannels(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
res := map[string]interface{}{"ok": true, "channels": []interface{}{}}
if s.onChannels != nil {
res["channels"] = s.onChannels()
}
_ = json.NewEncoder(w).Encode(res)
}
func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
skillsDir := filepath.Join(strings.TrimSpace(s.workspacePath), "skills")
if skillsDir == "" {
http.Error(w, "workspace not configured", http.StatusInternalServerError)
return
}
switch r.Method {
case http.MethodGet:
entries, err := os.ReadDir(skillsDir)
if err != nil {
if os.IsNotExist(err) {
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "skills": []interface{}{}})
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
type skillItem struct {
Name string `json:"name"`
Enabled bool `json:"enabled"`
}
items := make([]skillItem, 0, len(entries))
for _, e := range entries {
if !e.IsDir() {
continue
}
name := e.Name()
enabled := !strings.HasSuffix(name, ".disabled")
items = append(items, skillItem{Name: strings.TrimSuffix(name, ".disabled"), Enabled: enabled})
}
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "skills": items})
case http.MethodPost:
var body struct {
Action string `json:"action"`
Name string `json:"name"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
action := strings.ToLower(strings.TrimSpace(body.Action))
name := strings.TrimSpace(body.Name)
if name == "" {
http.Error(w, "name required", http.StatusBadRequest)
return
}
enabledPath := filepath.Join(skillsDir, name)
disabledPath := enabledPath + ".disabled"
switch action {
case "enable":
if _, err := os.Stat(disabledPath); err == nil {
if err := os.Rename(disabledPath, enabledPath); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
case "disable":
if _, err := os.Stat(enabledPath); err == nil {
if err := os.Rename(enabledPath, disabledPath); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
default:
http.Error(w, "unsupported action", http.StatusBadRequest)
return
}
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true})
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *RegistryServer) handleWebUIExecApprovals(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if strings.TrimSpace(s.configPath) == "" {
http.Error(w, "config path not set", http.StatusInternalServerError)
return
}
b, err := os.ReadFile(s.configPath)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var cfg map[string]interface{}
if err := json.Unmarshal(b, &cfg); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if r.Method == http.MethodGet {
toolsMap, _ := cfg["tools"].(map[string]interface{})
shellMap, _ := toolsMap["shell"].(map[string]interface{})
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "exec_approvals": shellMap})
return
}
if r.Method == http.MethodPost {
var body map[string]interface{}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
toolsMap, _ := cfg["tools"].(map[string]interface{})
if toolsMap == nil {
toolsMap = map[string]interface{}{}
cfg["tools"] = toolsMap
}
shellMap, _ := toolsMap["shell"].(map[string]interface{})
if shellMap == nil {
shellMap = map[string]interface{}{}
toolsMap["shell"] = shellMap
}
for k, v := range body {
shellMap[k] = v
}
out, _ := json.MarshalIndent(cfg, "", " ")
tmp := s.configPath + ".tmp"
if err := os.WriteFile(tmp, out, 0644); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := os.Rename(tmp, s.configPath); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if s.onConfigAfter != nil {
s.onConfigAfter()
}
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "reloaded": true})
return
}
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
func (s *RegistryServer) handleWebUILogsStream(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
path := strings.TrimSpace(s.logFilePath)
if path == "" {
http.Error(w, "log path not configured", http.StatusInternalServerError)
return
}
f, err := os.Open(path)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer f.Close()
fi, _ := f.Stat()
if fi != nil {
_, _ = f.Seek(fi.Size(), io.SeekStart)
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "stream unsupported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
buf := make([]byte, 4096)
for {
select {
case <-r.Context().Done():
return
default:
n, err := f.Read(buf)
if n > 0 {
chunk := strings.ReplaceAll(string(buf[:n]), "\n", "\\n")
chunk = strings.ReplaceAll(chunk, "\"", "'")
_, _ = w.Write([]byte("event: log\ndata: {\"chunk\":\"" + chunk + "\"}\n\n"))
flusher.Flush()
}
if err != nil {
time.Sleep(500 * time.Millisecond)
}
}
}
}
func (s *RegistryServer) checkAuth(r *http.Request) bool {
if s.token == "" {
return true