mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-17 09:27:28 +08:00
align webui APIs with latest frontend (skills CRUD, stream/logs format, cron normalization, nodes delete)
This commit is contained in:
@@ -226,6 +226,9 @@ func gatewayCmd() {
|
|||||||
if kind == "" {
|
if kind == "" {
|
||||||
kind = "every"
|
kind = "every"
|
||||||
}
|
}
|
||||||
|
if kind == "once" {
|
||||||
|
kind = "at"
|
||||||
|
}
|
||||||
msg := getStr("message")
|
msg := getStr("message")
|
||||||
if msg == "" {
|
if msg == "" {
|
||||||
return nil, fmt.Errorf("message required")
|
return nil, fmt.Errorf("message required")
|
||||||
@@ -241,10 +244,12 @@ func gatewayCmd() {
|
|||||||
}
|
}
|
||||||
if kind == "at" {
|
if kind == "at" {
|
||||||
atMS, ok := args["atMs"].(float64)
|
atMS, ok := args["atMs"].(float64)
|
||||||
|
var at int64
|
||||||
if !ok || int64(atMS) <= 0 {
|
if !ok || int64(atMS) <= 0 {
|
||||||
return nil, fmt.Errorf("atMs required for kind=at")
|
at = time.Now().Add(1 * time.Minute).UnixMilli()
|
||||||
|
} else {
|
||||||
|
at = int64(atMS)
|
||||||
}
|
}
|
||||||
at := int64(atMS)
|
|
||||||
schedule.AtMS = &at
|
schedule.AtMS = &at
|
||||||
}
|
}
|
||||||
if kind == "cron" {
|
if kind == "cron" {
|
||||||
@@ -284,6 +289,9 @@ func gatewayCmd() {
|
|||||||
in.To = &v
|
in.To = &v
|
||||||
}
|
}
|
||||||
if kind := getStr("kind"); kind != "" {
|
if kind := getStr("kind"); kind != "" {
|
||||||
|
if kind == "once" {
|
||||||
|
kind = "at"
|
||||||
|
}
|
||||||
s := cron.CronSchedule{Kind: kind}
|
s := cron.CronSchedule{Kind: kind}
|
||||||
if kind == "every" {
|
if kind == "every" {
|
||||||
if everyMS, ok := args["everyMs"].(float64); ok && int64(everyMS) > 0 {
|
if everyMS, ok := args["everyMs"].(float64); ok && int64(everyMS) > 0 {
|
||||||
@@ -295,6 +303,9 @@ func gatewayCmd() {
|
|||||||
if atMS, ok := args["atMs"].(float64); ok && int64(atMS) > 0 {
|
if atMS, ok := args["atMs"].(float64); ok && int64(atMS) > 0 {
|
||||||
at := int64(atMS)
|
at := int64(atMS)
|
||||||
s.AtMS = &at
|
s.AtMS = &at
|
||||||
|
} else {
|
||||||
|
at := time.Now().Add(1 * time.Minute).UnixMilli()
|
||||||
|
s.AtMS = &at
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if kind == "cron" {
|
if kind == "cron" {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package nodes
|
package nodes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -10,6 +11,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
@@ -380,14 +382,12 @@ func (s *RegistryServer) handleWebUIChatStream(w http.ResponseWriter, r *http.Re
|
|||||||
prompt += "[file: " + strings.TrimSpace(body.Media) + "]"
|
prompt += "[file: " + strings.TrimSpace(body.Media) + "]"
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "text/event-stream")
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||||
w.Header().Set("Cache-Control", "no-cache")
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
w.Header().Set("Connection", "keep-alive")
|
w.Header().Set("Connection", "keep-alive")
|
||||||
_, _ = w.Write([]byte("event: start\ndata: {\"ok\":true}\n\n"))
|
|
||||||
flusher.Flush()
|
|
||||||
resp, err := s.onChat(r.Context(), session, prompt)
|
resp, err := s.onChat(r.Context(), session, prompt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_, _ = w.Write([]byte("event: error\ndata: {\"error\":\"" + strings.ReplaceAll(err.Error(), "\"", "'") + "\"}\n\n"))
|
_, _ = w.Write([]byte("Error: " + err.Error()))
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -397,13 +397,9 @@ func (s *RegistryServer) handleWebUIChatStream(w http.ResponseWriter, r *http.Re
|
|||||||
if end > len(resp) {
|
if end > len(resp) {
|
||||||
end = len(resp)
|
end = len(resp)
|
||||||
}
|
}
|
||||||
part := strings.ReplaceAll(resp[i:end], "\n", "\\n")
|
_, _ = w.Write([]byte(resp[i:end]))
|
||||||
part = strings.ReplaceAll(part, "\"", "'")
|
|
||||||
_, _ = w.Write([]byte("event: delta\ndata: {\"text\":\"" + part + "\"}\n\n"))
|
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
_, _ = w.Write([]byte("event: done\ndata: {\"ok\":true}\n\n"))
|
|
||||||
flusher.Flush()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *RegistryServer) handleWebUINodes(w http.ResponseWriter, r *http.Request) {
|
func (s *RegistryServer) handleWebUINodes(w http.ResponseWriter, r *http.Request) {
|
||||||
@@ -467,9 +463,9 @@ func (s *RegistryServer) handleWebUICron(w http.ResponseWriter, r *http.Request)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if action == "list" {
|
if action == "list" {
|
||||||
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "jobs": res})
|
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "jobs": normalizeCronJobs(res)})
|
||||||
} else {
|
} else {
|
||||||
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "job": res})
|
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "job": normalizeCronJob(res)})
|
||||||
}
|
}
|
||||||
case http.MethodPost:
|
case http.MethodPost:
|
||||||
args := map[string]interface{}{}
|
args := map[string]interface{}{}
|
||||||
@@ -488,7 +484,7 @@ func (s *RegistryServer) handleWebUICron(w http.ResponseWriter, r *http.Request)
|
|||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "result": res})
|
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "result": normalizeCronJob(res)})
|
||||||
default:
|
default:
|
||||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||||
}
|
}
|
||||||
@@ -500,28 +496,30 @@ func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Reques
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
skillsDir := filepath.Join(strings.TrimSpace(s.workspacePath), "skills")
|
skillsDir := filepath.Join(strings.TrimSpace(s.workspacePath), "skills")
|
||||||
if skillsDir == "" {
|
if strings.TrimSpace(skillsDir) == "" {
|
||||||
http.Error(w, "workspace not configured", http.StatusInternalServerError)
|
http.Error(w, "workspace not configured", http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
_ = os.MkdirAll(skillsDir, 0755)
|
||||||
|
|
||||||
switch r.Method {
|
switch r.Method {
|
||||||
case http.MethodGet:
|
case http.MethodGet:
|
||||||
entries, err := os.ReadDir(skillsDir)
|
entries, err := os.ReadDir(skillsDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
|
||||||
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "skills": []interface{}{}})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
type skillItem struct {
|
type skillItem struct {
|
||||||
Name string `json:"name"`
|
ID string `json:"id"`
|
||||||
Enabled bool `json:"enabled"`
|
Name string `json:"name"`
|
||||||
UpdateChecked bool `json:"update_checked"`
|
Description string `json:"description"`
|
||||||
RemoteFound bool `json:"remote_found,omitempty"`
|
Tools []string `json:"tools"`
|
||||||
RemoteVersion string `json:"remote_version,omitempty"`
|
SystemPrompt string `json:"system_prompt,omitempty"`
|
||||||
CheckError string `json:"check_error,omitempty"`
|
Enabled bool `json:"enabled"`
|
||||||
|
UpdateChecked bool `json:"update_checked"`
|
||||||
|
RemoteFound bool `json:"remote_found,omitempty"`
|
||||||
|
RemoteVersion string `json:"remote_version,omitempty"`
|
||||||
|
CheckError string `json:"check_error,omitempty"`
|
||||||
}
|
}
|
||||||
items := make([]skillItem, 0, len(entries))
|
items := make([]skillItem, 0, len(entries))
|
||||||
checkUpdates := strings.TrimSpace(r.URL.Query().Get("check_updates")) != "0"
|
checkUpdates := strings.TrimSpace(r.URL.Query().Get("check_updates")) != "0"
|
||||||
@@ -532,7 +530,14 @@ func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Reques
|
|||||||
name := e.Name()
|
name := e.Name()
|
||||||
enabled := !strings.HasSuffix(name, ".disabled")
|
enabled := !strings.HasSuffix(name, ".disabled")
|
||||||
baseName := strings.TrimSuffix(name, ".disabled")
|
baseName := strings.TrimSuffix(name, ".disabled")
|
||||||
it := skillItem{Name: baseName, Enabled: enabled, UpdateChecked: checkUpdates}
|
desc, tools, sys := readSkillMeta(filepath.Join(skillsDir, name, "SKILL.md"))
|
||||||
|
if desc == "" || len(tools) == 0 || sys == "" {
|
||||||
|
d2, t2, s2 := readSkillMeta(filepath.Join(skillsDir, baseName, "SKILL.md"))
|
||||||
|
if desc == "" { desc = d2 }
|
||||||
|
if len(tools) == 0 { tools = t2 }
|
||||||
|
if sys == "" { sys = s2 }
|
||||||
|
}
|
||||||
|
it := skillItem{ID: baseName, Name: baseName, Description: desc, Tools: tools, SystemPrompt: sys, Enabled: enabled, UpdateChecked: checkUpdates}
|
||||||
if checkUpdates {
|
if checkUpdates {
|
||||||
found, version, checkErr := queryClawHubSkillVersion(r.Context(), baseName)
|
found, version, checkErr := queryClawHubSkillVersion(r.Context(), baseName)
|
||||||
it.RemoteFound = found
|
it.RemoteFound = found
|
||||||
@@ -545,22 +550,26 @@ func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Reques
|
|||||||
}
|
}
|
||||||
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "skills": items, "source": "clawhub"})
|
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "skills": items, "source": "clawhub"})
|
||||||
case http.MethodPost:
|
case http.MethodPost:
|
||||||
var body struct {
|
var body map[string]interface{}
|
||||||
Action string `json:"action"`
|
|
||||||
Name string `json:"name"`
|
|
||||||
}
|
|
||||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||||
http.Error(w, "invalid json", http.StatusBadRequest)
|
http.Error(w, "invalid json", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
action := strings.ToLower(strings.TrimSpace(body.Action))
|
action, _ := body["action"].(string)
|
||||||
name := strings.TrimSpace(body.Name)
|
action = strings.ToLower(strings.TrimSpace(action))
|
||||||
|
id, _ := body["id"].(string)
|
||||||
|
name, _ := body["name"].(string)
|
||||||
|
if strings.TrimSpace(name) == "" {
|
||||||
|
name = id
|
||||||
|
}
|
||||||
|
name = strings.TrimSpace(name)
|
||||||
if name == "" {
|
if name == "" {
|
||||||
http.Error(w, "name required", http.StatusBadRequest)
|
http.Error(w, "name required", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
enabledPath := filepath.Join(skillsDir, name)
|
enabledPath := filepath.Join(skillsDir, name)
|
||||||
disabledPath := enabledPath + ".disabled"
|
disabledPath := enabledPath + ".disabled"
|
||||||
|
|
||||||
switch action {
|
switch action {
|
||||||
case "enable":
|
case "enable":
|
||||||
if _, err := os.Stat(disabledPath); err == nil {
|
if _, err := os.Stat(disabledPath); err == nil {
|
||||||
@@ -569,6 +578,7 @@ func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Reques
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true})
|
||||||
case "disable":
|
case "disable":
|
||||||
if _, err := os.Stat(enabledPath); err == nil {
|
if _, err := os.Stat(enabledPath); err == nil {
|
||||||
if err := os.Rename(enabledPath, disabledPath); err != nil {
|
if err := os.Rename(enabledPath, disabledPath); err != nil {
|
||||||
@@ -576,16 +586,158 @@ func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Reques
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true})
|
||||||
|
case "create", "update":
|
||||||
|
desc, _ := body["description"].(string)
|
||||||
|
sys, _ := body["system_prompt"].(string)
|
||||||
|
var toolsList []string
|
||||||
|
if arr, ok := body["tools"].([]interface{}); ok {
|
||||||
|
for _, v := range arr {
|
||||||
|
if sv, ok := v.(string); ok && strings.TrimSpace(sv) != "" {
|
||||||
|
toolsList = append(toolsList, strings.TrimSpace(sv))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if action == "create" {
|
||||||
|
if _, err := os.Stat(enabledPath); err == nil {
|
||||||
|
http.Error(w, "skill already exists", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(filepath.Join(enabledPath, "scripts"), 0755); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
skillMD := buildSkillMarkdown(name, desc, toolsList, sys)
|
||||||
|
if err := os.WriteFile(filepath.Join(enabledPath, "SKILL.md"), []byte(skillMD), 0644); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true})
|
||||||
default:
|
default:
|
||||||
http.Error(w, "unsupported action", http.StatusBadRequest)
|
http.Error(w, "unsupported action", http.StatusBadRequest)
|
||||||
|
}
|
||||||
|
case http.MethodDelete:
|
||||||
|
id := strings.TrimSpace(r.URL.Query().Get("id"))
|
||||||
|
if id == "" {
|
||||||
|
http.Error(w, "id required", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true})
|
pathA := filepath.Join(skillsDir, id)
|
||||||
|
pathB := pathA + ".disabled"
|
||||||
|
deleted := false
|
||||||
|
if err := os.RemoveAll(pathA); err == nil {
|
||||||
|
deleted = true
|
||||||
|
}
|
||||||
|
if err := os.RemoveAll(pathB); err == nil {
|
||||||
|
deleted = true
|
||||||
|
}
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "deleted": deleted, "id": id})
|
||||||
default:
|
default:
|
||||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func buildSkillMarkdown(name, desc string, tools []string, systemPrompt string) string {
|
||||||
|
if strings.TrimSpace(desc) == "" {
|
||||||
|
desc = "No description provided."
|
||||||
|
}
|
||||||
|
t := strings.Join(tools, ", ")
|
||||||
|
return fmt.Sprintf(`---
|
||||||
|
name: %s
|
||||||
|
description: %s
|
||||||
|
---
|
||||||
|
|
||||||
|
# %s
|
||||||
|
|
||||||
|
%s
|
||||||
|
|
||||||
|
## Tools
|
||||||
|
%s
|
||||||
|
|
||||||
|
## System Prompt
|
||||||
|
%s
|
||||||
|
`, name, desc, name, desc, t, systemPrompt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func readSkillMeta(path string) (desc string, tools []string, systemPrompt string) {
|
||||||
|
b, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, ""
|
||||||
|
}
|
||||||
|
s := string(b)
|
||||||
|
reDesc := regexp.MustCompile(`(?m)^description:\s*(.+)$`)
|
||||||
|
reTools := regexp.MustCompile(`(?m)^##\s*Tools\s*$`)
|
||||||
|
rePrompt := regexp.MustCompile(`(?m)^##\s*System Prompt\s*$`)
|
||||||
|
if m := reDesc.FindStringSubmatch(s); len(m) > 1 {
|
||||||
|
desc = strings.TrimSpace(m[1])
|
||||||
|
}
|
||||||
|
if loc := reTools.FindStringIndex(s); loc != nil {
|
||||||
|
block := s[loc[1]:]
|
||||||
|
if p := rePrompt.FindStringIndex(block); p != nil {
|
||||||
|
block = block[:p[0]]
|
||||||
|
}
|
||||||
|
for _, line := range strings.Split(block, "\n") {
|
||||||
|
v := strings.TrimSpace(strings.TrimPrefix(strings.TrimSpace(line), "-"))
|
||||||
|
if v != "" {
|
||||||
|
tools = append(tools, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if loc := rePrompt.FindStringIndex(s); loc != nil {
|
||||||
|
systemPrompt = strings.TrimSpace(s[loc[1]:])
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func normalizeCronJob(v interface{}) map[string]interface{} {
|
||||||
|
if v == nil {
|
||||||
|
return map[string]interface{}{}
|
||||||
|
}
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return map[string]interface{}{"raw": fmt.Sprintf("%v", v)}
|
||||||
|
}
|
||||||
|
var m map[string]interface{}
|
||||||
|
if err := json.Unmarshal(b, &m); err != nil {
|
||||||
|
return map[string]interface{}{"raw": string(b)}
|
||||||
|
}
|
||||||
|
out := map[string]interface{}{}
|
||||||
|
for k, val := range m {
|
||||||
|
out[k] = val
|
||||||
|
}
|
||||||
|
if sch, ok := m["schedule"].(map[string]interface{}); ok {
|
||||||
|
if kind, ok := sch["kind"]; ok { out["kind"] = kind }
|
||||||
|
if every, ok := sch["everyMs"]; ok { out["everyMs"] = every }
|
||||||
|
if expr, ok := sch["expr"]; ok { out["expr"] = expr }
|
||||||
|
if at, ok := sch["atMs"]; ok { out["atMs"] = at }
|
||||||
|
}
|
||||||
|
if payload, ok := m["payload"].(map[string]interface{}); ok {
|
||||||
|
if msg, ok := payload["message"]; ok { out["message"] = msg }
|
||||||
|
if d, ok := payload["deliver"]; ok { out["deliver"] = d }
|
||||||
|
if c, ok := payload["channel"]; ok { out["channel"] = c }
|
||||||
|
if to, ok := payload["to"]; ok { out["to"] = to }
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeCronJobs(v interface{}) []map[string]interface{} {
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return []map[string]interface{}{}
|
||||||
|
}
|
||||||
|
var arr []interface{}
|
||||||
|
if err := json.Unmarshal(b, &arr); err != nil {
|
||||||
|
return []map[string]interface{}{}
|
||||||
|
}
|
||||||
|
out := make([]map[string]interface{}, 0, len(arr))
|
||||||
|
for _, it := range arr {
|
||||||
|
out = append(out, normalizeCronJob(it))
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
func queryClawHubSkillVersion(ctx context.Context, skill string) (found bool, version string, err error) {
|
func queryClawHubSkillVersion(ctx context.Context, skill string) (found bool, version string, err error) {
|
||||||
skill = strings.TrimSpace(skill)
|
skill = strings.TrimSpace(skill)
|
||||||
if skill == "" {
|
if skill == "" {
|
||||||
@@ -743,21 +895,28 @@ func (s *RegistryServer) handleWebUILogsStream(w http.ResponseWriter, r *http.Re
|
|||||||
http.Error(w, "stream unsupported", http.StatusInternalServerError)
|
http.Error(w, "stream unsupported", http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.Header().Set("Content-Type", "text/event-stream")
|
w.Header().Set("Content-Type", "application/x-ndjson")
|
||||||
w.Header().Set("Cache-Control", "no-cache")
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
w.Header().Set("Connection", "keep-alive")
|
w.Header().Set("Connection", "keep-alive")
|
||||||
buf := make([]byte, 4096)
|
|
||||||
|
reader := bufio.NewReader(f)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-r.Context().Done():
|
case <-r.Context().Done():
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
n, err := f.Read(buf)
|
line, err := reader.ReadString('\n')
|
||||||
if n > 0 {
|
if len(line) > 0 {
|
||||||
chunk := strings.ReplaceAll(string(buf[:n]), "\n", "\\n")
|
trim := strings.TrimRight(line, "\r\n")
|
||||||
chunk = strings.ReplaceAll(chunk, "\"", "'")
|
if trim != "" {
|
||||||
_, _ = w.Write([]byte("event: log\ndata: {\"chunk\":\"" + chunk + "\"}\n\n"))
|
if json.Valid([]byte(trim)) {
|
||||||
flusher.Flush()
|
_, _ = w.Write([]byte(trim + "\n"))
|
||||||
|
} else {
|
||||||
|
fallback, _ := json.Marshal(map[string]interface{}{"time": time.Now().UTC().Format(time.RFC3339), "level": "INFO", "msg": trim})
|
||||||
|
_, _ = w.Write(append(fallback, '\n'))
|
||||||
|
}
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|||||||
Reference in New Issue
Block a user