feat: expand node artifact operations and retention

This commit is contained in:
lpf
2026-03-09 10:46:22 +08:00
parent be2e025fe5
commit ba3be33c91
22 changed files with 2724 additions and 151 deletions

View File

@@ -7,6 +7,8 @@ import (
"bytes"
"compress/gzip"
"context"
"crypto/sha1"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
@@ -33,32 +35,35 @@ import (
)
type Server struct {
addr string
token string
mgr *nodes.Manager
server *http.Server
nodeConnMu sync.Mutex
nodeConnIDs map[string]string
nodeSockets map[string]*nodeSocketConn
nodeWebRTC *nodes.WebRTCTransport
nodeP2PStatus func() map[string]interface{}
gatewayVersion string
webuiVersion string
configPath string
workspacePath string
logFilePath string
onChat func(ctx context.Context, sessionKey, content string) (string, error)
onChatHistory func(sessionKey string) []map[string]interface{}
onConfigAfter func()
onCron func(action string, args map[string]interface{}) (interface{}, error)
onSubagents func(ctx context.Context, action string, args map[string]interface{}) (interface{}, error)
onToolsCatalog func() interface{}
webUIDir string
ekgCacheMu sync.Mutex
ekgCachePath string
ekgCacheStamp time.Time
ekgCacheSize int64
ekgCacheRows []map[string]interface{}
addr string
token string
mgr *nodes.Manager
server *http.Server
nodeConnMu sync.Mutex
nodeConnIDs map[string]string
nodeSockets map[string]*nodeSocketConn
nodeWebRTC *nodes.WebRTCTransport
nodeP2PStatus func() map[string]interface{}
artifactStatsMu sync.Mutex
artifactStats map[string]interface{}
gatewayVersion string
webuiVersion string
configPath string
workspacePath string
logFilePath string
onChat func(ctx context.Context, sessionKey, content string) (string, error)
onChatHistory func(sessionKey string) []map[string]interface{}
onConfigAfter func()
onCron func(action string, args map[string]interface{}) (interface{}, error)
onSubagents func(ctx context.Context, action string, args map[string]interface{}) (interface{}, error)
onNodeDispatch func(ctx context.Context, req nodes.Request, mode string) (nodes.Response, error)
onToolsCatalog func() interface{}
webUIDir string
ekgCacheMu sync.Mutex
ekgCachePath string
ekgCacheStamp time.Time
ekgCacheSize int64
ekgCacheRows []map[string]interface{}
}
var nodesWebsocketUpgrader = websocket.Upgrader{
@@ -74,11 +79,12 @@ func NewServer(host string, port int, token string, mgr *nodes.Manager) *Server
port = 7788
}
return &Server{
addr: fmt.Sprintf("%s:%d", addr, port),
token: strings.TrimSpace(token),
mgr: mgr,
nodeConnIDs: map[string]string{},
nodeSockets: map[string]*nodeSocketConn{},
addr: fmt.Sprintf("%s:%d", addr, port),
token: strings.TrimSpace(token),
mgr: mgr,
nodeConnIDs: map[string]string{},
nodeSockets: map[string]*nodeSocketConn{},
artifactStats: map[string]interface{}{},
}
}
@@ -114,6 +120,9 @@ func (s *Server) SetCronHandler(fn func(action string, args map[string]interface
func (s *Server) SetSubagentHandler(fn func(ctx context.Context, action string, args map[string]interface{}) (interface{}, error)) {
s.onSubagents = fn
}
func (s *Server) SetNodeDispatchHandler(fn func(ctx context.Context, req nodes.Request, mode string) (nodes.Response, error)) {
s.onNodeDispatch = fn
}
func (s *Server) SetToolsCatalogHandler(fn func() interface{}) { s.onToolsCatalog = fn }
func (s *Server) SetWebUIDir(dir string) { s.webUIDir = strings.TrimSpace(dir) }
func (s *Server) SetGatewayVersion(v string) { s.gatewayVersion = strings.TrimSpace(v) }
@@ -227,6 +236,12 @@ func (s *Server) Start(ctx context.Context) error {
mux.HandleFunc("/webui/api/upload", s.handleWebUIUpload)
mux.HandleFunc("/webui/api/nodes", s.handleWebUINodes)
mux.HandleFunc("/webui/api/node_dispatches", s.handleWebUINodeDispatches)
mux.HandleFunc("/webui/api/node_dispatches/replay", s.handleWebUINodeDispatchReplay)
mux.HandleFunc("/webui/api/node_artifacts", s.handleWebUINodeArtifacts)
mux.HandleFunc("/webui/api/node_artifacts/export", s.handleWebUINodeArtifactsExport)
mux.HandleFunc("/webui/api/node_artifacts/download", s.handleWebUINodeArtifactDownload)
mux.HandleFunc("/webui/api/node_artifacts/delete", s.handleWebUINodeArtifactDelete)
mux.HandleFunc("/webui/api/node_artifacts/prune", s.handleWebUINodeArtifactPrune)
mux.HandleFunc("/webui/api/cron", s.handleWebUICron)
mux.HandleFunc("/webui/api/skills", s.handleWebUISkills)
mux.HandleFunc("/webui/api/sessions", s.handleWebUISessions)
@@ -1075,14 +1090,121 @@ func (s *Server) webUINodesPayload(ctx context.Context) map[string]interface{} {
if s.nodeP2PStatus != nil {
p2p = s.nodeP2PStatus()
}
dispatches := s.webUINodesDispatchPayload(12)
return map[string]interface{}{
"nodes": list,
"trees": s.buildNodeAgentTrees(ctx, list),
"p2p": p2p,
"dispatches": s.webUINodesDispatchPayload(12),
"nodes": list,
"trees": s.buildNodeAgentTrees(ctx, list),
"p2p": p2p,
"dispatches": dispatches,
"alerts": s.webUINodeAlertsPayload(list, p2p, dispatches),
"artifact_retention": s.artifactStatsSnapshot(),
}
}
func (s *Server) webUINodeAlertsPayload(nodeList []nodes.NodeInfo, p2p map[string]interface{}, dispatches []map[string]interface{}) []map[string]interface{} {
alerts := make([]map[string]interface{}, 0)
for _, node := range nodeList {
nodeID := strings.TrimSpace(node.ID)
if nodeID == "" || nodeID == "local" {
continue
}
if !node.Online {
alerts = append(alerts, map[string]interface{}{
"severity": "critical",
"kind": "node_offline",
"node": nodeID,
"title": "Node offline",
"detail": fmt.Sprintf("node %s is offline", nodeID),
})
}
}
if sessions, ok := p2p["nodes"].([]map[string]interface{}); ok {
for _, session := range sessions {
appendNodeSessionAlert(&alerts, session)
}
} else if sessions, ok := p2p["nodes"].([]interface{}); ok {
for _, raw := range sessions {
if session, ok := raw.(map[string]interface{}); ok {
appendNodeSessionAlert(&alerts, session)
}
}
}
failuresByNode := map[string]int{}
for _, row := range dispatches {
nodeID := strings.TrimSpace(fmt.Sprint(row["node"]))
if nodeID == "" {
continue
}
if ok, _ := row["ok"].(bool); ok {
continue
}
failuresByNode[nodeID]++
}
for nodeID, count := range failuresByNode {
if count < 2 {
continue
}
alerts = append(alerts, map[string]interface{}{
"severity": "warning",
"kind": "dispatch_failures",
"node": nodeID,
"title": "Repeated dispatch failures",
"detail": fmt.Sprintf("node %s has %d recent failed dispatches", nodeID, count),
"count": count,
})
}
return alerts
}
func appendNodeSessionAlert(alerts *[]map[string]interface{}, session map[string]interface{}) {
nodeID := strings.TrimSpace(fmt.Sprint(session["node"]))
if nodeID == "" {
return
}
status := strings.ToLower(strings.TrimSpace(fmt.Sprint(session["status"])))
retryCount := int(int64Value(session["retry_count"]))
lastError := strings.TrimSpace(fmt.Sprint(session["last_error"]))
switch {
case status == "failed" || status == "closed":
*alerts = append(*alerts, map[string]interface{}{
"severity": "critical",
"kind": "p2p_session_down",
"node": nodeID,
"title": "P2P session down",
"detail": firstNonEmptyString(lastError, fmt.Sprintf("node %s p2p session is %s", nodeID, status)),
})
case retryCount >= 3 || (status == "connecting" && retryCount >= 2):
*alerts = append(*alerts, map[string]interface{}{
"severity": "warning",
"kind": "p2p_session_unstable",
"node": nodeID,
"title": "P2P session unstable",
"detail": firstNonEmptyString(lastError, fmt.Sprintf("node %s p2p session retry_count=%d", nodeID, retryCount)),
"count": retryCount,
})
}
}
func int64Value(v interface{}) int64 {
switch value := v.(type) {
case int:
return int64(value)
case int32:
return int64(value)
case int64:
return value
case float32:
return int64(value)
case float64:
return int64(value)
case json.Number:
if n, err := value.Int64(); err == nil {
return n
}
}
return 0
}
func (s *Server) webUINodesDispatchPayload(limit int) []map[string]interface{} {
workspace := strings.TrimSpace(s.workspacePath)
if workspace == "" {
@@ -1115,6 +1237,381 @@ func (s *Server) webUINodesDispatchPayload(limit int) []map[string]interface{} {
return out
}
func (s *Server) webUINodeArtifactsPayload(limit int) []map[string]interface{} {
return s.webUINodeArtifactsPayloadFiltered("", "", "", limit)
}
func (s *Server) webUINodeArtifactsPayloadFiltered(nodeFilter, actionFilter, kindFilter string, limit int) []map[string]interface{} {
nodeFilter = strings.TrimSpace(nodeFilter)
actionFilter = strings.TrimSpace(actionFilter)
kindFilter = strings.TrimSpace(kindFilter)
rows, _ := s.readNodeDispatchAuditRows()
if len(rows) == 0 {
return []map[string]interface{}{}
}
out := make([]map[string]interface{}, 0, limit)
for rowIndex := len(rows) - 1; rowIndex >= 0; rowIndex-- {
row := rows[rowIndex]
artifacts, _ := row["artifacts"].([]interface{})
for artifactIndex, raw := range artifacts {
artifact, ok := raw.(map[string]interface{})
if !ok {
continue
}
item := map[string]interface{}{
"id": buildNodeArtifactID(row, artifact, artifactIndex),
"time": row["time"],
"node": row["node"],
"action": row["action"],
"used_transport": row["used_transport"],
"ok": row["ok"],
"error": row["error"],
}
for _, key := range []string{"name", "kind", "mime_type", "storage", "path", "url", "content_text", "content_base64", "source_path", "size_bytes"} {
if value, ok := artifact[key]; ok {
item[key] = value
}
}
if nodeFilter != "" && !strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["node"])), nodeFilter) {
continue
}
if actionFilter != "" && !strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["action"])), actionFilter) {
continue
}
if kindFilter != "" && !strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["kind"])), kindFilter) {
continue
}
out = append(out, item)
if limit > 0 && len(out) >= limit {
return out
}
}
}
return out
}
func (s *Server) readNodeDispatchAuditRows() ([]map[string]interface{}, string) {
workspace := strings.TrimSpace(s.workspacePath)
if workspace == "" {
return nil, ""
}
path := filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl")
data, err := os.ReadFile(path)
if err != nil {
return nil, path
}
lines := strings.Split(strings.TrimSpace(string(data)), "\n")
rows := make([]map[string]interface{}, 0, len(lines))
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
row := map[string]interface{}{}
if err := json.Unmarshal([]byte(line), &row); err != nil {
continue
}
rows = append(rows, row)
}
return rows, path
}
func buildNodeArtifactID(row, artifact map[string]interface{}, artifactIndex int) string {
seed := fmt.Sprintf("%v|%v|%v|%d|%v|%v|%v",
row["time"], row["node"], row["action"], artifactIndex,
artifact["name"], artifact["source_path"], artifact["path"],
)
sum := sha1.Sum([]byte(seed))
return fmt.Sprintf("%x", sum[:8])
}
func sanitizeZipEntryName(name string) string {
name = strings.TrimSpace(name)
if name == "" {
return "artifact.bin"
}
name = strings.ReplaceAll(name, "\\", "/")
name = filepath.Base(name)
name = strings.Map(func(r rune) rune {
switch {
case r >= 'a' && r <= 'z':
return r
case r >= 'A' && r <= 'Z':
return r
case r >= '0' && r <= '9':
return r
case r == '.', r == '-', r == '_':
return r
default:
return '_'
}
}, name)
if strings.Trim(name, "._") == "" {
return "artifact.bin"
}
return name
}
func (s *Server) findNodeArtifactByID(id string) (map[string]interface{}, bool) {
for _, item := range s.webUINodeArtifactsPayload(10000) {
if strings.TrimSpace(fmt.Sprint(item["id"])) == id {
return item, true
}
}
return nil, false
}
func resolveArtifactPath(workspace, raw string) string {
raw = strings.TrimSpace(raw)
if raw == "" {
return ""
}
if filepath.IsAbs(raw) {
clean := filepath.Clean(raw)
if info, err := os.Stat(clean); err == nil && !info.IsDir() {
return clean
}
return ""
}
root := strings.TrimSpace(workspace)
if root == "" {
return ""
}
clean := filepath.Clean(filepath.Join(root, raw))
if rel, err := filepath.Rel(root, clean); err != nil || strings.HasPrefix(rel, "..") {
return ""
}
if info, err := os.Stat(clean); err == nil && !info.IsDir() {
return clean
}
return ""
}
func readArtifactBytes(workspace string, item map[string]interface{}) ([]byte, string, error) {
if content := strings.TrimSpace(fmt.Sprint(item["content_base64"])); content != "" {
raw, err := base64.StdEncoding.DecodeString(content)
if err != nil {
return nil, "", err
}
return raw, strings.TrimSpace(fmt.Sprint(item["mime_type"])), nil
}
for _, rawPath := range []string{fmt.Sprint(item["source_path"]), fmt.Sprint(item["path"])} {
if path := resolveArtifactPath(workspace, rawPath); path != "" {
b, err := os.ReadFile(path)
if err != nil {
return nil, "", err
}
return b, strings.TrimSpace(fmt.Sprint(item["mime_type"])), nil
}
}
if contentText := fmt.Sprint(item["content_text"]); strings.TrimSpace(contentText) != "" {
return []byte(contentText), "text/plain; charset=utf-8", nil
}
return nil, "", fmt.Errorf("artifact content unavailable")
}
func (s *Server) filteredNodeDispatches(nodeFilter, actionFilter string, limit int) []map[string]interface{} {
items := s.webUINodesDispatchPayload(limit)
if nodeFilter == "" && actionFilter == "" {
return items
}
out := make([]map[string]interface{}, 0, len(items))
for _, item := range items {
if nodeFilter != "" && !strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["node"])), nodeFilter) {
continue
}
if actionFilter != "" && !strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["action"])), actionFilter) {
continue
}
out = append(out, item)
}
return out
}
func filteredNodeAlerts(alerts []map[string]interface{}, nodeFilter string) []map[string]interface{} {
if nodeFilter == "" {
return alerts
}
out := make([]map[string]interface{}, 0, len(alerts))
for _, item := range alerts {
if strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["node"])), nodeFilter) {
out = append(out, item)
}
}
return out
}
func (s *Server) setArtifactStats(summary map[string]interface{}) {
s.artifactStatsMu.Lock()
defer s.artifactStatsMu.Unlock()
if summary == nil {
s.artifactStats = map[string]interface{}{}
return
}
copySummary := make(map[string]interface{}, len(summary))
for k, v := range summary {
copySummary[k] = v
}
s.artifactStats = copySummary
}
func (s *Server) artifactStatsSnapshot() map[string]interface{} {
s.artifactStatsMu.Lock()
defer s.artifactStatsMu.Unlock()
out := make(map[string]interface{}, len(s.artifactStats))
for k, v := range s.artifactStats {
out[k] = v
}
return out
}
func (s *Server) nodeArtifactRetentionConfig() cfgpkg.GatewayNodesArtifactsConfig {
cfg := cfgpkg.DefaultConfig()
if strings.TrimSpace(s.configPath) != "" {
if loaded, err := cfgpkg.LoadConfig(s.configPath); err == nil && loaded != nil {
cfg = loaded
}
}
return cfg.Gateway.Nodes.Artifacts
}
func (s *Server) applyNodeArtifactRetention() map[string]interface{} {
retention := s.nodeArtifactRetentionConfig()
if !retention.Enabled || !retention.PruneOnRead || retention.KeepLatest <= 0 {
summary := map[string]interface{}{
"enabled": retention.Enabled,
"keep_latest": retention.KeepLatest,
"retain_days": retention.RetainDays,
"prune_on_read": retention.PruneOnRead,
"pruned": 0,
"last_run_at": time.Now().UTC().Format(time.RFC3339),
}
s.setArtifactStats(summary)
return summary
}
items := s.webUINodeArtifactsPayload(0)
cutoff := time.Time{}
if retention.RetainDays > 0 {
cutoff = time.Now().UTC().Add(-time.Duration(retention.RetainDays) * 24 * time.Hour)
}
pruned := 0
prunedByAge := 0
prunedByCount := 0
for index, item := range items {
drop := false
dropByAge := false
if !cutoff.IsZero() {
if tm, err := time.Parse(time.RFC3339, strings.TrimSpace(fmt.Sprint(item["time"]))); err == nil && tm.Before(cutoff) {
drop = true
dropByAge = true
}
}
if !drop && index >= retention.KeepLatest {
drop = true
}
if !drop {
continue
}
_, deletedAudit, _ := s.deleteNodeArtifact(strings.TrimSpace(fmt.Sprint(item["id"])))
if deletedAudit {
pruned++
if dropByAge {
prunedByAge++
} else {
prunedByCount++
}
}
}
summary := map[string]interface{}{
"enabled": true,
"keep_latest": retention.KeepLatest,
"retain_days": retention.RetainDays,
"prune_on_read": retention.PruneOnRead,
"pruned": pruned,
"pruned_by_age": prunedByAge,
"pruned_by_count": prunedByCount,
"remaining": len(s.webUINodeArtifactsPayload(0)),
"last_run_at": time.Now().UTC().Format(time.RFC3339),
}
s.setArtifactStats(summary)
return summary
}
func (s *Server) deleteNodeArtifact(id string) (bool, bool, error) {
id = strings.TrimSpace(id)
if id == "" {
return false, false, fmt.Errorf("id is required")
}
rows, auditPath := s.readNodeDispatchAuditRows()
if len(rows) == 0 || auditPath == "" {
return false, false, fmt.Errorf("artifact audit is empty")
}
deletedFile := false
deletedAudit := false
for rowIndex, row := range rows {
artifacts, _ := row["artifacts"].([]interface{})
if len(artifacts) == 0 {
continue
}
nextArtifacts := make([]interface{}, 0, len(artifacts))
for artifactIndex, raw := range artifacts {
artifact, ok := raw.(map[string]interface{})
if !ok {
nextArtifacts = append(nextArtifacts, raw)
continue
}
if buildNodeArtifactID(row, artifact, artifactIndex) != id {
nextArtifacts = append(nextArtifacts, artifact)
continue
}
for _, rawPath := range []string{fmt.Sprint(artifact["source_path"]), fmt.Sprint(artifact["path"])} {
if path := resolveArtifactPath(s.workspacePath, rawPath); path != "" {
if err := os.Remove(path); err == nil {
deletedFile = true
break
}
}
}
deletedAudit = true
}
if deletedAudit {
row["artifacts"] = nextArtifacts
row["artifact_count"] = len(nextArtifacts)
kinds := make([]string, 0, len(nextArtifacts))
for _, raw := range nextArtifacts {
if artifact, ok := raw.(map[string]interface{}); ok {
if kind := strings.TrimSpace(fmt.Sprint(artifact["kind"])); kind != "" {
kinds = append(kinds, kind)
}
}
}
if len(kinds) > 0 {
row["artifact_kinds"] = kinds
} else {
delete(row, "artifact_kinds")
}
rows[rowIndex] = row
break
}
}
if !deletedAudit {
return false, false, fmt.Errorf("artifact not found")
}
var buf bytes.Buffer
for _, row := range rows {
encoded, err := json.Marshal(row)
if err != nil {
continue
}
buf.Write(encoded)
buf.WriteByte('\n')
}
if err := os.WriteFile(auditPath, buf.Bytes(), 0644); err != nil {
return deletedFile, false, err
}
return deletedFile, true, nil
}
func (s *Server) webUISessionsPayload() map[string]interface{} {
sessionsDir := filepath.Join(filepath.Dir(s.workspacePath), "agents", "main", "sessions")
_ = os.MkdirAll(sessionsDir, 0755)
@@ -1607,6 +2104,325 @@ func (s *Server) handleWebUINodeDispatches(w http.ResponseWriter, r *http.Reques
})
}
func (s *Server) handleWebUINodeDispatchReplay(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if s.onNodeDispatch == nil {
http.Error(w, "node dispatch handler not configured", http.StatusServiceUnavailable)
return
}
var body struct {
Node string `json:"node"`
Action string `json:"action"`
Mode string `json:"mode"`
Task string `json:"task"`
Model string `json:"model"`
Args map[string]interface{} `json:"args"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
req := nodes.Request{
Node: strings.TrimSpace(body.Node),
Action: strings.TrimSpace(body.Action),
Task: body.Task,
Model: body.Model,
Args: body.Args,
}
if req.Node == "" || req.Action == "" {
http.Error(w, "node and action are required", http.StatusBadRequest)
return
}
resp, err := s.onNodeDispatch(r.Context(), req, strings.TrimSpace(body.Mode))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"ok": true,
"result": resp,
})
}
func (s *Server) handleWebUINodeArtifacts(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
limit := 200
if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" {
if n, err := strconv.Atoi(raw); err == nil && n > 0 {
if n > 1000 {
n = 1000
}
limit = n
}
}
retentionSummary := s.applyNodeArtifactRetention()
nodeFilter := strings.TrimSpace(r.URL.Query().Get("node"))
actionFilter := strings.TrimSpace(r.URL.Query().Get("action"))
kindFilter := strings.TrimSpace(r.URL.Query().Get("kind"))
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"ok": true,
"items": s.webUINodeArtifactsPayloadFiltered(nodeFilter, actionFilter, kindFilter, limit),
"artifact_retention": retentionSummary,
})
}
func (s *Server) handleWebUINodeArtifactsExport(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
retentionSummary := s.applyNodeArtifactRetention()
limit := 200
if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" {
if n, err := strconv.Atoi(raw); err == nil && n > 0 {
if n > 1000 {
n = 1000
}
limit = n
}
}
nodeFilter := strings.TrimSpace(r.URL.Query().Get("node"))
actionFilter := strings.TrimSpace(r.URL.Query().Get("action"))
kindFilter := strings.TrimSpace(r.URL.Query().Get("kind"))
artifacts := s.webUINodeArtifactsPayloadFiltered(nodeFilter, actionFilter, kindFilter, limit)
dispatches := s.filteredNodeDispatches(nodeFilter, actionFilter, limit)
payload := s.webUINodesPayload(r.Context())
nodeList, _ := payload["nodes"].([]nodes.NodeInfo)
p2p, _ := payload["p2p"].(map[string]interface{})
alerts := filteredNodeAlerts(s.webUINodeAlertsPayload(nodeList, p2p, dispatches), nodeFilter)
var archive bytes.Buffer
zw := zip.NewWriter(&archive)
writeJSON := func(name string, value interface{}) error {
entry, err := zw.Create(name)
if err != nil {
return err
}
enc := json.NewEncoder(entry)
enc.SetIndent("", " ")
return enc.Encode(value)
}
manifest := map[string]interface{}{
"generated_at": time.Now().UTC().Format(time.RFC3339),
"filters": map[string]interface{}{
"node": nodeFilter,
"action": actionFilter,
"kind": kindFilter,
"limit": limit,
},
"artifact_count": len(artifacts),
"dispatch_count": len(dispatches),
"alert_count": len(alerts),
"retention": retentionSummary,
}
if err := writeJSON("manifest.json", manifest); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := writeJSON("dispatches.json", dispatches); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := writeJSON("alerts.json", alerts); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := writeJSON("artifacts.json", artifacts); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
for _, item := range artifacts {
name := sanitizeZipEntryName(firstNonEmptyString(
fmt.Sprint(item["name"]),
fmt.Sprint(item["source_path"]),
fmt.Sprint(item["path"]),
fmt.Sprintf("%s.bin", fmt.Sprint(item["id"])),
))
raw, _, err := readArtifactBytes(s.workspacePath, item)
entryName := filepath.ToSlash(filepath.Join("files", fmt.Sprintf("%s-%s", fmt.Sprint(item["id"]), name)))
if err != nil || len(raw) == 0 {
entryName = filepath.ToSlash(filepath.Join("files", fmt.Sprintf("%s-metadata.json", fmt.Sprint(item["id"]))))
raw, err = json.MarshalIndent(item, "", " ")
if err != nil {
continue
}
}
entry, err := zw.Create(entryName)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err := entry.Write(raw); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
if err := zw.Close(); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
filename := "node-artifacts-export.zip"
if nodeFilter != "" {
filename = fmt.Sprintf("node-artifacts-%s.zip", sanitizeZipEntryName(nodeFilter))
}
w.Header().Set("Content-Type", "application/zip")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filename))
w.WriteHeader(http.StatusOK)
_, _ = w.Write(archive.Bytes())
}
func (s *Server) handleWebUINodeArtifactDownload(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
id := strings.TrimSpace(r.URL.Query().Get("id"))
if id == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
item, ok := s.findNodeArtifactByID(id)
if !ok {
http.Error(w, "artifact not found", http.StatusNotFound)
return
}
name := strings.TrimSpace(fmt.Sprint(item["name"]))
if name == "" {
name = "artifact"
}
mimeType := strings.TrimSpace(fmt.Sprint(item["mime_type"]))
if mimeType == "" {
mimeType = "application/octet-stream"
}
if contentB64 := strings.TrimSpace(fmt.Sprint(item["content_base64"])); contentB64 != "" {
payload, err := base64.StdEncoding.DecodeString(contentB64)
if err != nil {
http.Error(w, "invalid inline artifact payload", http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", mimeType)
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", name))
_, _ = w.Write(payload)
return
}
for _, rawPath := range []string{fmt.Sprint(item["source_path"]), fmt.Sprint(item["path"])} {
if path := resolveArtifactPath(s.workspacePath, rawPath); path != "" {
http.ServeFile(w, r, path)
return
}
}
if contentText := fmt.Sprint(item["content_text"]); strings.TrimSpace(contentText) != "" {
w.Header().Set("Content-Type", mimeType)
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", name))
_, _ = w.Write([]byte(contentText))
return
}
http.Error(w, "artifact content unavailable", http.StatusNotFound)
}
func (s *Server) handleWebUINodeArtifactDelete(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var body struct {
ID string `json:"id"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
deletedFile, deletedAudit, err := s.deleteNodeArtifact(strings.TrimSpace(body.ID))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"ok": true,
"id": strings.TrimSpace(body.ID),
"deleted_file": deletedFile,
"deleted_audit": deletedAudit,
})
}
func (s *Server) handleWebUINodeArtifactPrune(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var body struct {
Node string `json:"node"`
Action string `json:"action"`
Kind string `json:"kind"`
KeepLatest int `json:"keep_latest"`
Limit int `json:"limit"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
limit := body.Limit
if limit <= 0 || limit > 5000 {
limit = 5000
}
keepLatest := body.KeepLatest
if keepLatest < 0 {
keepLatest = 0
}
items := s.webUINodeArtifactsPayloadFiltered(strings.TrimSpace(body.Node), strings.TrimSpace(body.Action), strings.TrimSpace(body.Kind), limit)
pruned := 0
deletedFiles := 0
for index, item := range items {
if index < keepLatest {
continue
}
deletedFile, deletedAudit, err := s.deleteNodeArtifact(strings.TrimSpace(fmt.Sprint(item["id"])))
if err != nil || !deletedAudit {
continue
}
pruned++
if deletedFile {
deletedFiles++
}
}
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"ok": true,
"pruned": pruned,
"deleted_files": deletedFiles,
"kept": keepLatest,
})
}
func (s *Server) buildNodeAgentTrees(ctx context.Context, nodeList []nodes.NodeInfo) []map[string]interface{} {
trees := make([]map[string]interface{}, 0, len(nodeList))
localRegistry := s.fetchRegistryItems(ctx)

View File

@@ -1,10 +1,12 @@
package api
import (
"archive/zip"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
@@ -432,7 +434,10 @@ func TestHandleWebUILogsLive(t *testing.T) {
func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) {
t.Parallel()
srv := NewServer("127.0.0.1", 0, "", nodes.NewManager())
mgr := nodes.NewManager()
mgr.Upsert(nodes.NodeInfo{ID: "edge-b", Name: "Edge B"})
mgr.MarkOffline("edge-b")
srv := NewServer("127.0.0.1", 0, "", mgr)
workspace := t.TempDir()
srv.SetWorkspacePath(workspace)
if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0755); err != nil {
@@ -446,6 +451,9 @@ func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) {
"enabled": true,
"transport": "webrtc",
"active_sessions": 2,
"nodes": []map[string]interface{}{
{"node": "edge-b", "status": "connecting", "retry_count": 3, "last_error": "signal timeout"},
},
}
})
@@ -463,6 +471,10 @@ func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) {
if p2p == nil || p2p["transport"] != "webrtc" {
t.Fatalf("expected p2p summary, got %+v", body)
}
alerts, _ := body["alerts"].([]interface{})
if len(alerts) == 0 {
t.Fatalf("expected node alerts, got %+v", body)
}
dispatches, _ := body["dispatches"].([]interface{})
if len(dispatches) != 1 {
t.Fatalf("expected dispatch audit rows, got %+v", body["dispatches"])
@@ -473,3 +485,273 @@ func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) {
t.Fatalf("expected artifact previews in dispatch row, got %+v", first)
}
}
func TestHandleWebUINodeDispatchReplay(t *testing.T) {
t.Parallel()
srv := NewServer("127.0.0.1", 0, "", nodes.NewManager())
srv.SetNodeDispatchHandler(func(ctx context.Context, req nodes.Request, mode string) (nodes.Response, error) {
if req.Node != "edge-a" || req.Action != "screen_snapshot" || mode != "auto" {
t.Fatalf("unexpected replay request: %+v mode=%s", req, mode)
}
if fmt.Sprint(req.Args["quality"]) != "high" {
t.Fatalf("unexpected args: %+v", req.Args)
}
return nodes.Response{
OK: true,
Node: req.Node,
Action: req.Action,
Payload: map[string]interface{}{
"used_transport": "webrtc",
},
}, nil
})
body := `{"node":"edge-a","action":"screen_snapshot","mode":"auto","args":{"quality":"high"}}`
req := httptest.NewRequest(http.MethodPost, "/webui/api/node_dispatches/replay", strings.NewReader(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
srv.handleWebUINodeDispatchReplay(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String())
}
if !strings.Contains(rec.Body.String(), `"used_transport":"webrtc"`) {
t.Fatalf("expected replay result body, got: %s", rec.Body.String())
}
}
func TestHandleWebUINodeArtifactsListAndDelete(t *testing.T) {
t.Parallel()
srv := NewServer("127.0.0.1", 0, "", nodes.NewManager())
workspace := t.TempDir()
srv.SetWorkspacePath(workspace)
if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0o755); err != nil {
t.Fatalf("mkdir memory: %v", err)
}
artifactPath := filepath.Join(workspace, "artifact.txt")
if err := os.WriteFile(artifactPath, []byte("artifact-body"), 0o644); err != nil {
t.Fatalf("write artifact: %v", err)
}
auditLine := fmt.Sprintf("{\"time\":\"2026-03-09T00:00:00Z\",\"node\":\"edge-a\",\"action\":\"run\",\"artifacts\":[{\"name\":\"artifact.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"source_path\":\"%s\",\"size_bytes\":13}]}\n", artifactPath)
if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte(auditLine), 0o644); err != nil {
t.Fatalf("write audit: %v", err)
}
listReq := httptest.NewRequest(http.MethodGet, "/webui/api/node_artifacts", nil)
listRec := httptest.NewRecorder()
srv.handleWebUINodeArtifacts(listRec, listReq)
if listRec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", listRec.Code)
}
var listBody map[string]interface{}
if err := json.Unmarshal(listRec.Body.Bytes(), &listBody); err != nil {
t.Fatalf("decode list body: %v", err)
}
items, _ := listBody["items"].([]interface{})
if len(items) != 1 {
t.Fatalf("expected 1 artifact, got %+v", listBody)
}
item, _ := items[0].(map[string]interface{})
artifactID := strings.TrimSpace(fmt.Sprint(item["id"]))
if artifactID == "" {
t.Fatalf("expected artifact id, got %+v", item)
}
deleteReq := httptest.NewRequest(http.MethodPost, "/webui/api/node_artifacts/delete", strings.NewReader(fmt.Sprintf(`{"id":"%s"}`, artifactID)))
deleteReq.Header.Set("Content-Type", "application/json")
deleteRec := httptest.NewRecorder()
srv.handleWebUINodeArtifactDelete(deleteRec, deleteReq)
if deleteRec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", deleteRec.Code, deleteRec.Body.String())
}
if _, err := os.Stat(artifactPath); !os.IsNotExist(err) {
t.Fatalf("expected artifact file removed, stat err=%v", err)
}
}
func TestHandleWebUINodeArtifactsExport(t *testing.T) {
t.Parallel()
srv := NewServer("127.0.0.1", 0, "", nodes.NewManager())
workspace := t.TempDir()
srv.SetWorkspacePath(workspace)
if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0o755); err != nil {
t.Fatalf("mkdir memory: %v", err)
}
auditLine := "{\"time\":\"2026-03-09T00:00:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"shot.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"Y2FwdHVyZQ==\",\"size_bytes\":7}]}\n"
if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte(auditLine), 0o644); err != nil {
t.Fatalf("write audit: %v", err)
}
srv.mgr.Upsert(nodes.NodeInfo{ID: "edge-a", Name: "Edge A", Online: true})
req := httptest.NewRequest(http.MethodGet, "/webui/api/node_artifacts/export?node=edge-a&action=screen_snapshot&kind=text", nil)
rec := httptest.NewRecorder()
srv.handleWebUINodeArtifactsExport(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String())
}
if got := rec.Header().Get("Content-Type"); !strings.Contains(got, "application/zip") {
t.Fatalf("expected zip response, got %q", got)
}
zr, err := zip.NewReader(bytes.NewReader(rec.Body.Bytes()), int64(rec.Body.Len()))
if err != nil {
t.Fatalf("open zip: %v", err)
}
seen := map[string]bool{}
for _, file := range zr.File {
seen[file.Name] = true
}
for _, required := range []string{"manifest.json", "dispatches.json", "alerts.json", "artifacts.json"} {
if !seen[required] {
t.Fatalf("missing zip entry %q in %+v", required, seen)
}
}
foundFile := false
for _, file := range zr.File {
if !strings.HasPrefix(file.Name, "files/") {
continue
}
foundFile = true
rc, err := file.Open()
if err != nil {
t.Fatalf("open artifact file: %v", err)
}
body, _ := io.ReadAll(rc)
_ = rc.Close()
if string(body) != "capture" {
t.Fatalf("unexpected artifact payload %q", string(body))
}
}
if !foundFile {
t.Fatalf("expected exported artifact file in zip")
}
}
func TestHandleWebUINodeArtifactsPrune(t *testing.T) {
t.Parallel()
srv := NewServer("127.0.0.1", 0, "", nodes.NewManager())
workspace := t.TempDir()
srv.SetWorkspacePath(workspace)
if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0o755); err != nil {
t.Fatalf("mkdir memory: %v", err)
}
auditLines := strings.Join([]string{
"{\"time\":\"2026-03-09T00:00:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"one.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"b25l\"}]}",
"{\"time\":\"2026-03-09T00:01:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"two.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"dHdv\"}]}",
"{\"time\":\"2026-03-09T00:02:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"three.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"dGhyZWU=\"}]}",
}, "\n") + "\n"
if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte(auditLines), 0o644); err != nil {
t.Fatalf("write audit: %v", err)
}
req := httptest.NewRequest(http.MethodPost, "/webui/api/node_artifacts/prune", strings.NewReader(`{"node":"edge-a","action":"screen_snapshot","kind":"text","keep_latest":1}`))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
srv.handleWebUINodeArtifactPrune(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String())
}
items := srv.webUINodeArtifactsPayloadFiltered("edge-a", "screen_snapshot", "text", 10)
if len(items) != 1 {
t.Fatalf("expected 1 remaining artifact, got %d", len(items))
}
if got := fmt.Sprint(items[0]["name"]); got != "three.txt" {
t.Fatalf("expected newest artifact to remain, got %q", got)
}
}
func TestHandleWebUINodeArtifactsAppliesRetentionConfig(t *testing.T) {
t.Parallel()
srv := NewServer("127.0.0.1", 0, "", nodes.NewManager())
workspace := t.TempDir()
srv.SetWorkspacePath(workspace)
if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0o755); err != nil {
t.Fatalf("mkdir memory: %v", err)
}
cfg := cfgpkg.DefaultConfig()
cfg.Gateway.Nodes.Artifacts.Enabled = true
cfg.Gateway.Nodes.Artifacts.KeepLatest = 1
cfg.Gateway.Nodes.Artifacts.PruneOnRead = true
cfgPath := filepath.Join(workspace, "config.json")
if err := cfgpkg.SaveConfig(cfgPath, cfg); err != nil {
t.Fatalf("save config: %v", err)
}
srv.SetConfigPath(cfgPath)
auditLines := strings.Join([]string{
"{\"time\":\"2026-03-09T00:00:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"one.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"b25l\"}]}",
"{\"time\":\"2026-03-09T00:01:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"two.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"dHdv\"}]}",
}, "\n") + "\n"
if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte(auditLines), 0o644); err != nil {
t.Fatalf("write audit: %v", err)
}
req := httptest.NewRequest(http.MethodGet, "/webui/api/node_artifacts", nil)
rec := httptest.NewRecorder()
srv.handleWebUINodeArtifacts(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String())
}
items := srv.webUINodeArtifactsPayload(10)
if len(items) != 1 {
t.Fatalf("expected retention to keep 1 artifact, got %d", len(items))
}
if got := fmt.Sprint(items[0]["name"]); got != "two.txt" {
t.Fatalf("expected newest artifact to remain, got %q", got)
}
stats := srv.artifactStatsSnapshot()
if fmt.Sprint(stats["pruned"]) == "" || fmt.Sprint(stats["pruned"]) == "0" {
t.Fatalf("expected retention stats to record pruned artifacts, got %+v", stats)
}
if fmt.Sprint(stats["keep_latest"]) != "1" {
t.Fatalf("expected keep_latest in stats, got %+v", stats)
}
}
func TestHandleWebUINodeArtifactsAppliesRetentionDays(t *testing.T) {
t.Parallel()
srv := NewServer("127.0.0.1", 0, "", nodes.NewManager())
workspace := t.TempDir()
srv.SetWorkspacePath(workspace)
if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0o755); err != nil {
t.Fatalf("mkdir memory: %v", err)
}
cfg := cfgpkg.DefaultConfig()
cfg.Gateway.Nodes.Artifacts.Enabled = true
cfg.Gateway.Nodes.Artifacts.KeepLatest = 10
cfg.Gateway.Nodes.Artifacts.RetainDays = 1
cfg.Gateway.Nodes.Artifacts.PruneOnRead = true
cfgPath := filepath.Join(workspace, "config.json")
if err := cfgpkg.SaveConfig(cfgPath, cfg); err != nil {
t.Fatalf("save config: %v", err)
}
srv.SetConfigPath(cfgPath)
oldTime := time.Now().UTC().Add(-48 * time.Hour).Format(time.RFC3339)
newTime := time.Now().UTC().Add(-2 * time.Hour).Format(time.RFC3339)
auditLines := strings.Join([]string{
fmt.Sprintf("{\"time\":%q,\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"old.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"b2xk\"}]}", oldTime),
fmt.Sprintf("{\"time\":%q,\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"fresh.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"ZnJlc2g=\"}]}", newTime),
}, "\n") + "\n"
if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte(auditLines), 0o644); err != nil {
t.Fatalf("write audit: %v", err)
}
req := httptest.NewRequest(http.MethodGet, "/webui/api/node_artifacts", nil)
rec := httptest.NewRecorder()
srv.handleWebUINodeArtifacts(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String())
}
items := srv.webUINodeArtifactsPayload(10)
if len(items) != 1 {
t.Fatalf("expected retention days to keep 1 artifact, got %d", len(items))
}
if got := fmt.Sprint(items[0]["name"]); got != "fresh.txt" {
t.Fatalf("expected fresh artifact to remain, got %q", got)
}
}