update traffic last time (#8)

* refine SessionErrMsg

---------

Co-authored-by: Huge <huge@agi7.ai>
This commit is contained in:
百里(barry) 2024-07-11 18:36:52 +08:00 committed by GitHub
parent c0b091f139
commit fc764302ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 36 additions and 53 deletions

View File

@ -17,9 +17,9 @@ package server
import (
"cmp"
"encoding/json"
"math"
"net/http"
"slices"
"time"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -457,47 +457,26 @@ func (svr *Service) deleteProxies(w http.ResponseWriter, r *http.Request) {
log.Infof("cleared [%d] offline proxies, total [%d] proxies", cleared, total)
}
func newRingBuffer(size int, name string) *ringBuffer {
return &ringBuffer{size: size, data: make([]int64, 0, size+1), name: name}
type proxyTraffic struct {
lastTraffic int64
lastTrafficTime int64
}
type ringBuffer struct {
size int
data []int64
name string
}
func (r *ringBuffer) Rate() float64 {
var data = r.data
if len(data) < r.size {
return math.NaN()
}
var growthRate = (float64(data[len(data)-1]) - float64(data[0])) / float64(len(data)-1) * 100
log.Infof("proxy rate calc: rate=%f count=%d name=%s", growthRate, len(data), r.name)
return growthRate
}
func (r *ringBuffer) Add(d int64) *ringBuffer {
if len(r.data) < r.size {
r.data = append(r.data, d)
} else {
r.data = append(r.data[1:], d)
}
func (r *proxyTraffic) Set(d int64) *proxyTraffic {
if r.lastTraffic == d {
return r
}
func (r *ringBuffer) Reset() {
r.data = r.data[:0]
r.lastTraffic = d
r.lastTrafficTime = time.Now().Unix()
return r
}
type ProxyPublishInfo struct {
Name string `json:"name"`
LastStartTime string `json:"lastStartTime"`
// LastTrafficTime unix seconds
LastTrafficTime int64 `json:"last_traffic_time"`
Time int64 `json:"time"`
Offline bool `json:"offline"`
TrafficRate *float64 `json:"traffic_rate,omitempty"`
}

View File

@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net"
"net/http"
"os"
@ -56,7 +55,6 @@ import (
"github.com/fatedier/golib/net/mux"
fmux "github.com/hashicorp/yamux"
quic "github.com/quic-go/quic-go"
"github.com/r3labs/sse/v2"
"github.com/samber/lo"
)
@ -682,6 +680,12 @@ type authMiddleware struct {
next http.Handler
}
const SessionErrMsgFmt = "The session is expired or invalid. Please close the current page and go to the device page to retry. (code=%d)"
const (
CookieNotFound = 1
AuthFailed = 2
)
func (m authMiddleware) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
if !strings.HasSuffix(request.Host, forwardHost) {
m.next.ServeHTTP(writer, request)
@ -693,7 +697,8 @@ func (m authMiddleware) ServeHTTP(writer http.ResponseWriter, request *http.Requ
cookie, err := request.Cookie(cookieName)
if err != nil {
writer.WriteHeader(http.StatusForbidden)
writer.Write([]byte(fmt.Sprintf("cookie not found, name=%s err=%s", cookieName, err.Error())))
log.Errorf("cookie not found, name=%s err=%s", cookieName, err.Error())
writer.Write([]byte(fmt.Sprintf(SessionErrMsgFmt, CookieNotFound)))
return
}
@ -701,7 +706,8 @@ func (m authMiddleware) ServeHTTP(writer http.ResponseWriter, request *http.Requ
claims, err := m.authVerify.GetVerifyData(token)
if err != nil {
writer.WriteHeader(http.StatusForbidden)
writer.Write([]byte(fmt.Sprintf("failed to verify auth, err=%s", err.Error())))
log.Errorf("failed to verify auth, err=%s", err.Error())
writer.Write([]byte(fmt.Sprintf(SessionErrMsgFmt, AuthFailed)))
return
}
@ -731,6 +737,7 @@ func (svr *Service) checkProxyStatusTimer() {
case <-svr.ctx.Done():
return
default:
break
}
func() {
@ -739,9 +746,9 @@ func (svr *Service) checkProxyStatusTimer() {
for _, info := range svr.getProxyStatsByType("http") {
mapSet[info.Name] = true
if vv, ok := svr.proxyTraffic.Load(info.Name); ok {
vv.(*ringBuffer).Add(info.TodayTrafficOut)
vv.(*proxyTraffic).Set(info.TodayTrafficOut)
} else {
svr.proxyTraffic.Store(info.Name, newRingBuffer(120, info.Name).Add(info.TodayTrafficOut))
svr.proxyTraffic.Store(info.Name, new(proxyTraffic).Set(info.TodayTrafficOut))
}
}
@ -766,6 +773,7 @@ func (svr *Service) checkProxyStatusTimer() {
case <-svr.ctx.Done():
return
default:
break
}
func() {
@ -774,21 +782,17 @@ func (svr *Service) checkProxyStatusTimer() {
log.Infof("publish proxy status, proxy_count=%d, stream=%v", len(proxyList), svr.ss.StreamExists(sseName))
for _, info := range proxyList {
var rate *float64
var pp, ok = svr.proxyTraffic.Load(info.Name)
if ok {
var rr = pp.(*ringBuffer).Rate()
if !math.IsNaN(rr) {
rate = lo.ToPtr(rr)
}
if !ok {
continue
}
var rr = pp.(*proxyTraffic)
var dd = ProxyPublishInfo{
Name: info.Name,
LastStartTime: info.LastStartTime,
LastTrafficTime: rr.lastTrafficTime,
Offline: info.Status == "offline",
Time: time.Now().Unix(),
TrafficRate: rate,
}
md, err := json.Marshal(dd)