diff --git a/client/control.go b/client/control.go index e1916890..b78680f7 100644 --- a/client/control.go +++ b/client/control.go @@ -51,6 +51,8 @@ type SessionContext struct { } type Control struct { + svr *Service + // service context ctx context.Context xl *xlog.Logger @@ -79,9 +81,10 @@ type Control struct { msgDispatcher *msg.Dispatcher } -func NewControl(ctx context.Context, sessionCtx *SessionContext) (*Control, error) { +func NewControl(ctx context.Context, sessionCtx *SessionContext, svr *Service) (*Control, error) { // new xlog instance ctl := &Control{ + svr: svr, ctx: ctx, xl: xlog.FromContextSafe(ctx), sessionCtx: sessionCtx, @@ -182,15 +185,29 @@ func (ctl *Control) handleNatHoleResp(m msg.Message) { } } +func (ctl *Control) handleCustom(m msg.Message) { + xl := ctl.xl + inMsg := m.(*msg.ClientProxyClose) + xl.Infof("client get close message: %s", inMsg) + ctl.svr.GracefulClose(0) +} + func (ctl *Control) handlePong(m msg.Message) { xl := ctl.xl inMsg := m.(*msg.Pong) + if inMsg.AuthErr != "" { + xl.Errorf("Pong message contains auth error: %s", inMsg.AuthErr) + ctl.svr.GracefulClose(0) + return + } + if inMsg.Error != "" { xl.Errorf("Pong message contains error: %s", inMsg.Error) ctl.closeSession() return } + ctl.lastPong.Store(time.Now()) xl.Debugf("receive heartbeat from server") } @@ -230,6 +247,7 @@ func (ctl *Control) registerMsgHandlers() { ctl.msgDispatcher.RegisterHandler(&msg.NewProxyResp{}, ctl.handleNewProxyResp) ctl.msgDispatcher.RegisterHandler(&msg.NatHoleResp{}, ctl.handleNatHoleResp) ctl.msgDispatcher.RegisterHandler(&msg.Pong{}, ctl.handlePong) + ctl.msgDispatcher.RegisterHandler(&msg.ClientProxyClose{}, ctl.handleCustom) } // headerWorker sends heartbeat to server and check heartbeat timeout. diff --git a/client/service.go b/client/service.go index 7a7f6dc8..5b1b0b5d 100644 --- a/client/service.go +++ b/client/service.go @@ -317,7 +317,7 @@ func (svr *Service) loopLoginUntilSuccess(maxInterval time.Duration, firstLoginE AuthSetter: svr.authSetter, Connector: connector, } - ctl, err := NewControl(svr.ctx, sessionCtx) + ctl, err := NewControl(svr.ctx, sessionCtx, svr) if err != nil { conn.Close() xl.Errorf("NewControl error: %v", err) diff --git a/go.mod b/go.mod index 84f8bae0..83d6cc41 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/pires/go-proxyproto v0.7.0 github.com/prometheus/client_golang v1.19.0 github.com/quic-go/quic-go v0.42.0 + github.com/r3labs/sse/v2 v2.10.0 github.com/rodaine/table v1.2.0 github.com/samber/lo v1.39.0 github.com/spf13/cobra v1.8.0 @@ -74,6 +75,7 @@ require ( golang.org/x/tools v0.17.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.33.0 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect diff --git a/go.sum b/go.sum index 6a94df6d..834231c5 100644 --- a/go.sum +++ b/go.sum @@ -114,6 +114,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM= github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M= +github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= +github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rodaine/table v1.2.0 h1:38HEnwK4mKSHQJIkavVj+bst1TEY7j9zhLMWu4QJrMA= @@ -132,6 +134,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= @@ -181,6 +184,7 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= @@ -266,6 +270,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/pkg/msg/handler.go b/pkg/msg/handler.go index cb1eb15a..007f9a14 100644 --- a/pkg/msg/handler.go +++ b/pkg/msg/handler.go @@ -17,6 +17,8 @@ package msg import ( "io" "reflect" + + "github.com/fatedier/golib/log" ) func AsyncHandler(f func(Message)) func(Message) { @@ -69,6 +71,7 @@ func (d *Dispatcher) readLoop() { return } + log.Debugf("receive message from server: %s, exists: %v", reflect.TypeOf(m), d.msgHandlers[reflect.TypeOf(m)] != nil) if handler, ok := d.msgHandlers[reflect.TypeOf(m)]; ok { handler(m) } else if d.defaultHandler != nil { diff --git a/pkg/msg/msg.go b/pkg/msg/msg.go index ab6d7d28..30e77840 100644 --- a/pkg/msg/msg.go +++ b/pkg/msg/msg.go @@ -38,6 +38,7 @@ const ( TypeNatHoleResp = 'm' TypeNatHoleSid = '5' TypeNatHoleReport = '6' + TypeClientProxyClose = '7' ) var msgTypeMap = map[byte]interface{}{ @@ -59,6 +60,7 @@ var msgTypeMap = map[byte]interface{}{ TypeNatHoleResp: NatHoleResp{}, TypeNatHoleSid: NatHoleSid{}, TypeNatHoleReport: NatHoleReport{}, + TypeClientProxyClose: ClientProxyClose{}, } var TypeNameNatHoleResp = reflect.TypeOf(&NatHoleResp{}).Elem().Name() @@ -178,7 +180,8 @@ type Ping struct { } type Pong struct { - Error string `json:"error,omitempty"` + Error string `json:"error,omitempty"` + AuthErr string `json:"auth_err,omitempty"` } type UDPPacket struct { @@ -243,3 +246,7 @@ type NatHoleReport struct { Sid string `json:"sid,omitempty"` Success bool `json:"success,omitempty"` } + +type ClientProxyClose struct { + Name string +} diff --git a/pkg/util/vhost/router.go b/pkg/util/vhost/router.go index 4df79aa7..37b122a4 100644 --- a/pkg/util/vhost/router.go +++ b/pkg/util/vhost/router.go @@ -93,6 +93,25 @@ func (r *Routers) Del(domain, location, httpUser string) { routersByHTTPUser[httpUser] = newVrs } +func (r *Routers) GetAll() (rr []map[string]any) { + r.mutex.RLock() + defer r.mutex.RUnlock() + + for _, v := range r.indexByDomain { + for _, v1 := range v { + for _, v2 := range v1 { + rr = append(rr, map[string]any{ + "domain": v2.domain, + "location": v2.location, + "httpUser": v2.httpUser, + "payload": v2.payload, + }) + } + } + } + return rr +} + func (r *Routers) Get(host, path, httpUser string) (vr *Router, exist bool) { host = strings.ToLower(host) diff --git a/server/control.go b/server/control.go index 0227549b..ad6ce6e2 100644 --- a/server/control.go +++ b/server/control.go @@ -367,6 +367,7 @@ func (ctl *Control) worker() { func (ctl *Control) registerMsgHandlers() { ctl.msgDispatcher.RegisterHandler(&msg.NewProxy{}, ctl.handleNewProxy) ctl.msgDispatcher.RegisterHandler(&msg.Ping{}, ctl.handlePing) + ctl.msgDispatcher.RegisterHandler(&msg.ClientProxyClose{}, ctl.handleCustom) ctl.msgDispatcher.RegisterHandler(&msg.NatHoleVisitor{}, msg.AsyncHandler(ctl.handleNatHoleVisitor)) ctl.msgDispatcher.RegisterHandler(&msg.NatHoleClient{}, msg.AsyncHandler(ctl.handleNatHoleClient)) ctl.msgDispatcher.RegisterHandler(&msg.NatHoleReport{}, msg.AsyncHandler(ctl.handleNatHoleReport)) @@ -408,6 +409,14 @@ func (ctl *Control) handleNewProxy(m msg.Message) { _ = ctl.msgDispatcher.Send(resp) } +func (ctl *Control) handleCustom(m msg.Message) { + inMsg := m.(*msg.ClientProxyClose) + + xl := ctl.xl + xl.Debugf("handle custom request") + _ = ctl.msgDispatcher.Send(inMsg) +} + func (ctl *Control) handlePing(m msg.Message) { xl := ctl.xl inMsg := m.(*msg.Ping) @@ -421,14 +430,19 @@ func (ctl *Control) handlePing(m msg.Message) { Ping: *inMsg, } retContent, err := ctl.pluginManager.Ping(content) + var authErr string if err == nil { inMsg = &retContent.Ping err = ctl.authVerifier.VerifyPing(inMsg) + if err != nil { + authErr = err.Error() + } } if err != nil { xl.Warnf("received invalid ping: %v", err) _ = ctl.msgDispatcher.Send(&msg.Pong{ - Error: util.GenerateResponseErrorString("invalid ping", err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient)), + Error: util.GenerateResponseErrorString("invalid ping", err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient)), + AuthErr: authErr, }) return } diff --git a/server/dashboard_api.go b/server/dashboard_api.go index 62415c96..004ac73e 100644 --- a/server/dashboard_api.go +++ b/server/dashboard_api.go @@ -17,6 +17,7 @@ package server import ( "cmp" "encoding/json" + "math" "net/http" "slices" @@ -26,6 +27,7 @@ import ( "github.com/fatedier/frp/pkg/config/types" v1 "github.com/fatedier/frp/pkg/config/v1" "github.com/fatedier/frp/pkg/metrics/mem" + "github.com/fatedier/frp/pkg/msg" httppkg "github.com/fatedier/frp/pkg/util/http" "github.com/fatedier/frp/pkg/util/log" netpkg "github.com/fatedier/frp/pkg/util/net" @@ -51,9 +53,11 @@ func (svr *Service) registerRouteHandlers(helper *httppkg.RouterRegisterHelper) } // apis + subRouter.HandleFunc("/api/sub", svr.apiServerSub) subRouter.HandleFunc("/api/serverinfo", svr.apiServerInfo).Methods("GET") subRouter.HandleFunc("/api/proxy/{type}", svr.apiProxyByType).Methods("GET") subRouter.HandleFunc("/api/proxy/{type}/{name}", svr.apiProxyByTypeAndName).Methods("GET") + subRouter.HandleFunc("/api/proxy/{type}/{name}/close", svr.apiCloseProxyByTypeAndName).Methods("POST") subRouter.HandleFunc("/api/traffic/{name}", svr.apiProxyTraffic).Methods("GET") subRouter.HandleFunc("/api/proxies", svr.deleteProxies).Methods("DELETE") @@ -95,6 +99,15 @@ func (svr *Service) healthz(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(200) } +func (svr *Service) apiServerSub(w http.ResponseWriter, r *http.Request) { + go func() { + <-r.Context().Done() + log.Infof("The client is disconnected here, name=%s", sseName) + }() + + svr.ss.ServeHTTP(w, r) +} + // /api/serverinfo func (svr *Service) apiServerInfo(w http.ResponseWriter, r *http.Request) { res := GeneralResponse{Code: 200} @@ -206,6 +219,7 @@ type ProxyStatsInfo struct { LastStartTime string `json:"lastStartTime"` LastCloseTime string `json:"lastCloseTime"` Status string `json:"status"` + TrafficOutList []int64 `json:"traffic_out_list"` } type GetProxyInfoResp struct { @@ -283,6 +297,43 @@ type GetProxyStatsResp struct { Status string `json:"status"` } +func (svr *Service) apiCloseProxyByTypeAndName(w http.ResponseWriter, r *http.Request) { + res := GeneralResponse{Code: 200} + params := mux.Vars(r) + name := params["name"] + + defer func() { + log.Infof("Http response [%s]: code [%d]", r.URL.Path, res.Code) + w.WriteHeader(res.Code) + if len(res.Msg) > 0 { + _, _ = w.Write([]byte(res.Msg)) + } + }() + log.Infof("Http request: [%s]", r.URL.Path) + + pxy, ok := svr.pxyManager.GetByName(name) + if !ok { + res.Code = 404 + res.Msg = "not found" + return + } + + cc, ok := svr.ctlManager.GetByID(pxy.GetUserInfo().RunID) + if !ok { + res.Code = 404 + res.Msg = "not found" + return + } + + err := cc.msgDispatcher.Send(&msg.ClientProxyClose{Name: name}) + if err != nil { + res.Code = 500 + res.Msg = err.Error() + } else { + res.Msg = "ok" + } +} + // /api/proxy/:type/:name func (svr *Service) apiProxyByTypeAndName(w http.ResponseWriter, r *http.Request) { res := GeneralResponse{Code: 200} @@ -405,3 +456,48 @@ func (svr *Service) deleteProxies(w http.ResponseWriter, r *http.Request) { cleared, total := mem.StatsCollector.ClearOfflineProxies() log.Infof("cleared [%d] offline proxies, total [%d] proxies", cleared, total) } + +func newRingBuffer(size int, name string) *ringBuffer { + return &ringBuffer{size: size, data: make([]int64, 0, size+1), name: name} +} + +type ringBuffer struct { + size int + data []int64 + name string +} + +func (r *ringBuffer) Rate() float64 { + var data = r.data + + if len(data) < r.size { + return math.NaN() + } + + var growthRate = (float64(data[len(data)-1]) - float64(data[0])) / float64(len(data)-1) * 100 + + log.Infof("proxy rate calc: rate=%f count=%d name=%s", growthRate, len(data), r.name) + + return growthRate +} + +func (r *ringBuffer) Add(d int64) *ringBuffer { + if len(r.data) < r.size { + r.data = append(r.data, d) + } else { + r.data = append(r.data[1:], d) + } + return r +} + +func (r *ringBuffer) Reset() { + r.data = r.data[:0] +} + +type ProxyPublishInfo struct { + Name string `json:"name"` + LastStartTime string `json:"lastStartTime"` + Time int64 `json:"time"` + Offline bool `json:"offline"` + TrafficRate *float64 `json:"traffic_rate,omitempty"` +} diff --git a/server/service.go b/server/service.go index 7e5106d7..4dfd57cc 100644 --- a/server/service.go +++ b/server/service.go @@ -18,21 +18,18 @@ import ( "bytes" "context" "crypto/tls" + "encoding/json" "fmt" "io" + "math" "net" "net/http" "os" "strconv" "strings" + "sync" "time" - "github.com/fatedier/golib/crypto" - "github.com/fatedier/golib/net/mux" - fmux "github.com/hashicorp/yamux" - quic "github.com/quic-go/quic-go" - "github.com/samber/lo" - "github.com/fatedier/frp/pkg/auth" v1 "github.com/fatedier/frp/pkg/config/v1" modelmetrics "github.com/fatedier/frp/pkg/metrics" @@ -55,6 +52,12 @@ import ( "github.com/fatedier/frp/server/ports" "github.com/fatedier/frp/server/proxy" "github.com/fatedier/frp/server/visitor" + "github.com/fatedier/golib/crypto" + "github.com/fatedier/golib/net/mux" + fmux "github.com/hashicorp/yamux" + quic "github.com/quic-go/quic-go" + "github.com/r3labs/sse/v2" + "github.com/samber/lo" ) const ( @@ -62,6 +65,7 @@ const ( vhostReadWriteTimeout time.Duration = 30 * time.Second forwardHost = "remote.agi7.ai" forwardCookieName = "agi7.forward.auth" + sseName = "proxy_status" ) func init() { @@ -128,6 +132,10 @@ type Service struct { ctx context.Context // call cancel to stop service cancel context.CancelFunc + + ss *sse.Server + + proxyTraffic sync.Map } func NewService(cfg *v1.ServerConfig) (*Service, error) { @@ -153,7 +161,12 @@ func NewService(cfg *v1.ServerConfig) (*Service, error) { } } + server := sse.New() + server.AutoStream = true + svr := &Service{ + ss: server, + ctlManager: NewControlManager(), pxyManager: proxy.NewManager(), pluginManager: plugin.NewManager(), @@ -342,6 +355,8 @@ func NewService(cfg *v1.ServerConfig) (*Service, error) { return nil, fmt.Errorf("create nat hole controller error, %v", err) } svr.rc.NatHoleController = nc + + svr.checkProxyStatusTimer() return svr, nil } @@ -673,10 +688,12 @@ func (m authMiddleware) ServeHTTP(writer http.ResponseWriter, request *http.Requ return } - cookie, err := request.Cookie(forwardCookieName) + var domain = strings.SplitN(request.Host, ".", 2)[0] + var cookieName = fmt.Sprintf("%s.%s", forwardCookieName, domain) + cookie, err := request.Cookie(cookieName) if err != nil { writer.WriteHeader(http.StatusForbidden) - writer.Write([]byte(err.Error())) + writer.Write([]byte(fmt.Sprintf("cookie not found, name=%s err=%s", cookieName, err.Error()))) return } @@ -684,24 +701,107 @@ func (m authMiddleware) ServeHTTP(writer http.ResponseWriter, request *http.Requ claims, err := m.authVerify.GetVerifyData(token) if err != nil { writer.WriteHeader(http.StatusForbidden) - writer.Write([]byte(err.Error())) + writer.Write([]byte(fmt.Sprintf("failed to verify auth, err=%s", err.Error()))) return } if !strings.HasPrefix(request.Host, fmt.Sprintf("%s.", claims["domain"])) { writer.WriteHeader(http.StatusForbidden) - writer.Write([]byte(fmt.Sprintf("domain access deny"))) + writer.Write([]byte("domain access deny")) return } cookieData := request.Header.Get("Cookie") var cc string for _, v := range strings.Split(cookieData, ";") { - if strings.HasPrefix(v, forwardCookieName) { + if strings.HasPrefix(v, cookieName) { continue } cc += v + ";" } request.Header.Set("Cookie", cc) + m.next.ServeHTTP(writer, request) } + +func (svr *Service) checkProxyStatusTimer() { + go func() { + for { + select { + case <-svr.ctx.Done(): + return + default: + } + + func() { + // update proxy traffic every 15s, total 30m + var mapSet = make(map[string]bool) + for _, info := range svr.getProxyStatsByType("http") { + mapSet[info.Name] = true + if vv, ok := svr.proxyTraffic.Load(info.Name); ok { + vv.(*ringBuffer).Add(info.TodayTrafficOut) + } else { + svr.proxyTraffic.Store(info.Name, newRingBuffer(120, info.Name).Add(info.TodayTrafficOut)) + } + } + + log.Infof("check and record proxy traffic, proxy_count=%d", len(mapSet)) + + // delete old data + svr.proxyTraffic.Range(func(key, value any) bool { + if !mapSet[key.(string)] { + svr.proxyTraffic.Delete(key) + } + return true + }) + }() + + time.Sleep(time.Second * 15) + } + }() + + go func() { + for { + select { + case <-svr.ctx.Done(): + return + default: + } + + func() { + var proxyList = svr.getProxyStatsByType("http") + + log.Infof("publish proxy status, proxy_count=%d, stream=%v", len(proxyList), svr.ss.StreamExists(sseName)) + + for _, info := range proxyList { + var rate *float64 + var pp, ok = svr.proxyTraffic.Load(info.Name) + if ok { + var rr = pp.(*ringBuffer).Rate() + if !math.IsNaN(rr) { + rate = lo.ToPtr(rr) + } + } + + var dd = ProxyPublishInfo{ + Name: info.Name, + LastStartTime: info.LastStartTime, + Offline: info.Status == "offline", + Time: time.Now().Unix(), + TrafficRate: rate, + } + + md, err := json.Marshal(dd) + if err != nil { + log.Errorf("failed to encode json data, err=%s", err) + continue + } + + svr.ss.Publish(sseName, &sse.Event{Data: md}) + } + }() + + time.Sleep(time.Second * 10) + } + }() +}