frp/server/service.go
百里(barry) c0b091f139
add proxy manager and monitor (#7)
* GitButler Integration Commit

This is an integration commit for the virtual branches that GitButler is tracking.

Due to GitButler managing multiple virtual branches, you cannot switch back and
forth between git branches and virtual branches easily. 

If you switch to another branch, GitButler will need to be reinitialized.
If you commit on this branch, GitButler will throw it away.

Here are the branches that are currently applied:
 - feat/frpcc (refs/gitbutler/feat/frpcc)
   branch head: e52195a01a6e3432cccc3ba952a8c940ad4d3fc6
   - test/main.go
   - go.sum
   - go.mod
   - server/service.go
For more information about what we're doing here, check out our docs:
https://docs.gitbutler.com/features/virtual-branches/integration-branch

* fix: barry 2024-07-03 15:36:17

* fix: barry 2024-07-03 15:44:12

* fix: barry 2024-07-03 15:46:25

* fix: barry 2024-07-03 16:52:13

* fix: barry 2024-07-03 17:30:53

* fix: barry 2024-07-03 17:42:01

* fix: barry 2024-07-03 18:43:39

* fix: barry 2024-07-03 19:36:06

* fix: barry 2024-07-03 19:43:47

* fix: barry 2024-07-04 10:51:06

* fix: barry 2024-07-04 12:29:01

* fix log and body

* fix rate calc

* fix: barry 2024-07-09 18:20:06

---------

Co-authored-by: GitButler <gitbutler@gitbutler.com>
2024-07-09 19:20:28 +08:00

808 lines
23 KiB
Go

// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"math"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/fatedier/frp/pkg/auth"
v1 "github.com/fatedier/frp/pkg/config/v1"
modelmetrics "github.com/fatedier/frp/pkg/metrics"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/nathole"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/ssh"
"github.com/fatedier/frp/pkg/transport"
httppkg "github.com/fatedier/frp/pkg/util/http"
"github.com/fatedier/frp/pkg/util/log"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/tcpmux"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/vhost"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/group"
"github.com/fatedier/frp/server/metrics"
"github.com/fatedier/frp/server/ports"
"github.com/fatedier/frp/server/proxy"
"github.com/fatedier/frp/server/visitor"
"github.com/fatedier/golib/crypto"
"github.com/fatedier/golib/net/mux"
fmux "github.com/hashicorp/yamux"
quic "github.com/quic-go/quic-go"
"github.com/r3labs/sse/v2"
"github.com/samber/lo"
)
const (
connReadTimeout time.Duration = 10 * time.Second
vhostReadWriteTimeout time.Duration = 30 * time.Second
forwardHost = "remote.agi7.ai"
forwardCookieName = "agi7.forward.auth"
sseName = "proxy_status"
)
func init() {
crypto.DefaultSalt = "frp"
// Disable quic-go's receive buffer warning.
os.Setenv("QUIC_GO_DISABLE_RECEIVE_BUFFER_WARNING", "true")
// Disable quic-go's ECN support by default. It may cause issues on certain operating systems.
if os.Getenv("QUIC_GO_DISABLE_ECN") == "" {
os.Setenv("QUIC_GO_DISABLE_ECN", "true")
}
}
// Server service
type Service struct {
// Dispatch connections to different handlers listen on same port
muxer *mux.Mux
// Accept connections from client
listener net.Listener
// Accept connections using kcp
kcpListener net.Listener
// Accept connections using quic
quicListener *quic.Listener
// Accept connections using websocket
websocketListener net.Listener
// Accept frp tls connections
tlsListener net.Listener
// Accept pipe connections from ssh tunnel gateway
sshTunnelListener *netpkg.InternalListener
// Manage all controllers
ctlManager *ControlManager
// Manage all proxies
pxyManager *proxy.Manager
// Manage all plugins
pluginManager *plugin.Manager
// HTTP vhost router
httpVhostRouter *vhost.Routers
// All resource managers and controllers
rc *controller.ResourceController
// web server for dashboard UI and apis
webServer *httppkg.Server
sshTunnelGateway *ssh.Gateway
// Verifies authentication based on selected method
authVerifier auth.Verifier
tlsConfig *tls.Config
cfg *v1.ServerConfig
// service context
ctx context.Context
// call cancel to stop service
cancel context.CancelFunc
ss *sse.Server
proxyTraffic sync.Map
}
func NewService(cfg *v1.ServerConfig) (*Service, error) {
tlsConfig, err := transport.NewServerTLSConfig(
cfg.Transport.TLS.CertFile,
cfg.Transport.TLS.KeyFile,
cfg.Transport.TLS.TrustedCaFile)
if err != nil {
return nil, err
}
var webServer *httppkg.Server
if cfg.WebServer.Port > 0 {
ws, err := httppkg.NewServer(cfg.WebServer)
if err != nil {
return nil, err
}
webServer = ws
modelmetrics.EnableMem()
if cfg.EnablePrometheus {
modelmetrics.EnablePrometheus()
}
}
server := sse.New()
server.AutoStream = true
svr := &Service{
ss: server,
ctlManager: NewControlManager(),
pxyManager: proxy.NewManager(),
pluginManager: plugin.NewManager(),
rc: &controller.ResourceController{
VisitorManager: visitor.NewManager(),
TCPPortManager: ports.NewManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
UDPPortManager: ports.NewManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
},
sshTunnelListener: netpkg.NewInternalListener(),
httpVhostRouter: vhost.NewRouters(),
authVerifier: auth.NewAuthVerifier(cfg.Auth),
webServer: webServer,
tlsConfig: tlsConfig,
cfg: cfg,
ctx: context.Background(),
}
if webServer != nil {
webServer.RouteRegister(svr.registerRouteHandlers)
}
// Create tcpmux httpconnect multiplexer.
if cfg.TCPMuxHTTPConnectPort > 0 {
var l net.Listener
address := net.JoinHostPort(cfg.ProxyBindAddr, strconv.Itoa(cfg.TCPMuxHTTPConnectPort))
l, err = net.Listen("tcp", address)
if err != nil {
return nil, fmt.Errorf("create server listener error, %v", err)
}
svr.rc.TCPMuxHTTPConnectMuxer, err = tcpmux.NewHTTPConnectTCPMuxer(l, cfg.TCPMuxPassthrough, vhostReadWriteTimeout)
if err != nil {
return nil, fmt.Errorf("create vhost tcpMuxer error, %v", err)
}
log.Infof("tcpmux httpconnect multiplexer listen on %s, passthough: %v", address, cfg.TCPMuxPassthrough)
}
// Init all plugins
for _, p := range cfg.HTTPPlugins {
svr.pluginManager.Register(plugin.NewHTTPPluginOptions(p))
log.Infof("plugin [%s] has been registered", p.Name)
}
svr.rc.PluginManager = svr.pluginManager
// Init group controller
svr.rc.TCPGroupCtl = group.NewTCPGroupCtl(svr.rc.TCPPortManager)
// Init HTTP group controller
svr.rc.HTTPGroupCtl = group.NewHTTPGroupController(svr.httpVhostRouter)
// Init TCP mux group controller
svr.rc.TCPMuxGroupCtl = group.NewTCPMuxGroupCtl(svr.rc.TCPMuxHTTPConnectMuxer)
// Init 404 not found page
vhost.NotFoundPagePath = cfg.Custom404Page
var (
httpMuxOn bool
httpsMuxOn bool
)
if cfg.BindAddr == cfg.ProxyBindAddr {
if cfg.BindPort == cfg.VhostHTTPPort {
httpMuxOn = true
}
if cfg.BindPort == cfg.VhostHTTPSPort {
httpsMuxOn = true
}
}
// Listen for accepting connections from client.
address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.BindPort))
ln, err := net.Listen("tcp", address)
if err != nil {
return nil, fmt.Errorf("create server listener error, %v", err)
}
svr.muxer = mux.NewMux(ln)
svr.muxer.SetKeepAlive(time.Duration(cfg.Transport.TCPKeepAlive) * time.Second)
go func() {
_ = svr.muxer.Serve()
}()
ln = svr.muxer.DefaultListener()
svr.listener = ln
log.Infof("frps tcp listen on %s", address)
// Listen for accepting connections from client using kcp protocol.
if cfg.KCPBindPort > 0 {
address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.KCPBindPort))
svr.kcpListener, err = netpkg.ListenKcp(address)
if err != nil {
return nil, fmt.Errorf("listen on kcp udp address %s error: %v", address, err)
}
log.Infof("frps kcp listen on udp %s", address)
}
if cfg.QUICBindPort > 0 {
address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.QUICBindPort))
quicTLSCfg := tlsConfig.Clone()
quicTLSCfg.NextProtos = []string{"frp"}
svr.quicListener, err = quic.ListenAddr(address, quicTLSCfg, &quic.Config{
MaxIdleTimeout: time.Duration(cfg.Transport.QUIC.MaxIdleTimeout) * time.Second,
MaxIncomingStreams: int64(cfg.Transport.QUIC.MaxIncomingStreams),
KeepAlivePeriod: time.Duration(cfg.Transport.QUIC.KeepalivePeriod) * time.Second,
})
if err != nil {
return nil, fmt.Errorf("listen on quic udp address %s error: %v", address, err)
}
log.Infof("frps quic listen on %s", address)
}
if cfg.SSHTunnelGateway.BindPort > 0 {
sshGateway, err := ssh.NewGateway(cfg.SSHTunnelGateway, cfg.ProxyBindAddr, svr.sshTunnelListener)
if err != nil {
return nil, fmt.Errorf("create ssh gateway error: %v", err)
}
svr.sshTunnelGateway = sshGateway
log.Infof("frps sshTunnelGateway listen on port %d", cfg.SSHTunnelGateway.BindPort)
}
// Listen for accepting connections from client using websocket protocol.
websocketPrefix := []byte("GET " + netpkg.FrpWebsocketPath)
websocketLn := svr.muxer.Listen(0, uint32(len(websocketPrefix)), func(data []byte) bool {
return bytes.Equal(data, websocketPrefix)
})
svr.websocketListener = netpkg.NewWebsocketListener(websocketLn)
// Create http vhost muxer.
if cfg.VhostHTTPPort > 0 {
rp := vhost.NewHTTPReverseProxy(vhost.HTTPReverseProxyOptions{
ResponseHeaderTimeoutS: cfg.VhostHTTPTimeout,
}, svr.httpVhostRouter)
svr.rc.HTTPReverseProxy = rp
address := net.JoinHostPort(cfg.ProxyBindAddr, strconv.Itoa(cfg.VhostHTTPPort))
server := &http.Server{
Addr: address,
Handler: &authMiddleware{
next: rp,
authVerify: svr.authVerifier.(*auth.JWTAuthSetterVerifier),
},
}
var l net.Listener
if httpMuxOn {
l = svr.muxer.ListenHttp(1)
} else {
l, err = net.Listen("tcp", address)
if err != nil {
return nil, fmt.Errorf("create vhost http listener error, %v", err)
}
}
go func() {
_ = server.Serve(l)
}()
log.Infof("http service listen on %s", address)
}
// Create https vhost muxer.
if cfg.VhostHTTPSPort > 0 {
var l net.Listener
if httpsMuxOn {
l = svr.muxer.ListenHttps(1)
} else {
address := net.JoinHostPort(cfg.ProxyBindAddr, strconv.Itoa(cfg.VhostHTTPSPort))
l, err = net.Listen("tcp", address)
if err != nil {
return nil, fmt.Errorf("create server listener error, %v", err)
}
log.Infof("https service listen on %s", address)
}
svr.rc.VhostHTTPSMuxer, err = vhost.NewHTTPSMuxer(l, vhostReadWriteTimeout)
if err != nil {
return nil, fmt.Errorf("create vhost httpsMuxer error, %v", err)
}
}
// frp tls listener
svr.tlsListener = svr.muxer.Listen(2, 1, func(data []byte) bool {
// tls first byte can be 0x16 only when vhost https port is not same with bind port
return int(data[0]) == netpkg.FRPTLSHeadByte || int(data[0]) == 0x16
})
// Create nat hole controller.
nc, err := nathole.NewController(time.Duration(cfg.NatHoleAnalysisDataReserveHours) * time.Hour)
if err != nil {
return nil, fmt.Errorf("create nat hole controller error, %v", err)
}
svr.rc.NatHoleController = nc
svr.checkProxyStatusTimer()
return svr, nil
}
func (svr *Service) Run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
svr.ctx = ctx
svr.cancel = cancel
// run dashboard web server.
if svr.webServer != nil {
go func() {
log.Infof("dashboard listen on %s", svr.webServer.Address())
if err := svr.webServer.Run(); err != nil {
log.Warnf("dashboard server exit with error: %v", err)
}
}()
}
go svr.HandleListener(svr.sshTunnelListener, true)
if svr.kcpListener != nil {
go svr.HandleListener(svr.kcpListener, false)
}
if svr.quicListener != nil {
go svr.HandleQUICListener(svr.quicListener)
}
go svr.HandleListener(svr.websocketListener, false)
go svr.HandleListener(svr.tlsListener, false)
if svr.rc.NatHoleController != nil {
go svr.rc.NatHoleController.CleanWorker(svr.ctx)
}
if svr.sshTunnelGateway != nil {
go svr.sshTunnelGateway.Run()
}
svr.HandleListener(svr.listener, false)
<-svr.ctx.Done()
// service context may not be canceled by svr.Close(), we should call it here to release resources
if svr.listener != nil {
svr.Close()
}
}
func (svr *Service) Close() error {
if svr.kcpListener != nil {
svr.kcpListener.Close()
svr.kcpListener = nil
}
if svr.quicListener != nil {
svr.quicListener.Close()
svr.quicListener = nil
}
if svr.websocketListener != nil {
svr.websocketListener.Close()
svr.websocketListener = nil
}
if svr.tlsListener != nil {
svr.tlsListener.Close()
svr.tlsConfig = nil
}
if svr.listener != nil {
svr.listener.Close()
svr.listener = nil
}
svr.ctlManager.Close()
if svr.cancel != nil {
svr.cancel()
}
return nil
}
func (svr *Service) handleConnection(ctx context.Context, conn net.Conn, internal bool) {
xl := xlog.FromContextSafe(ctx)
var (
rawMsg msg.Message
err error
)
_ = conn.SetReadDeadline(time.Now().Add(connReadTimeout))
if rawMsg, err = msg.ReadMsg(conn); err != nil {
log.Tracef("Failed to read message: %v", err)
conn.Close()
return
}
_ = conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Login:
// server plugin hook
content := &plugin.LoginContent{
Login: *m,
ClientAddress: conn.RemoteAddr().String(),
}
retContent, err := svr.pluginManager.Login(content)
if err == nil {
m = &retContent.Login
err = svr.RegisterControl(conn, m, internal)
}
// If login failed, send error message there.
// Otherwise send success message in control's work goroutine.
if err != nil {
xl.Warnf("register control error: %v", err)
_ = msg.WriteMsg(conn, &msg.LoginResp{
Version: version.Full(),
Error: util.GenerateResponseErrorString("register control error", err, lo.FromPtr(svr.cfg.DetailedErrorsToClient)),
})
conn.Close()
}
case *msg.NewWorkConn:
if err := svr.RegisterWorkConn(conn, m); err != nil {
conn.Close()
}
case *msg.NewVisitorConn:
if err = svr.RegisterVisitorConn(conn, m); err != nil {
xl.Warnf("register visitor conn error: %v", err)
_ = msg.WriteMsg(conn, &msg.NewVisitorConnResp{
ProxyName: m.ProxyName,
Error: util.GenerateResponseErrorString("register visitor conn error", err, lo.FromPtr(svr.cfg.DetailedErrorsToClient)),
})
conn.Close()
} else {
_ = msg.WriteMsg(conn, &msg.NewVisitorConnResp{
ProxyName: m.ProxyName,
Error: "",
})
}
default:
log.Warnf("Error message type for the new connection [%s]", conn.RemoteAddr().String())
conn.Close()
}
}
// HandleListener accepts connections from client and call handleConnection to handle them.
// If internal is true, it means that this listener is used for internal communication like ssh tunnel gateway.
// TODO(fatedier): Pass some parameters of listener/connection through context to avoid passing too many parameters.
func (svr *Service) HandleListener(l net.Listener, internal bool) {
// Listen for incoming connections from client.
for {
c, err := l.Accept()
if err != nil {
log.Warnf("Listener for incoming connections from client closed")
return
}
// inject xlog object into net.Conn context
xl := xlog.New()
ctx := context.Background()
c = netpkg.NewContextConn(xlog.NewContext(ctx, xl), c)
if !internal {
log.Tracef("start check TLS connection...")
originConn := c
forceTLS := svr.cfg.Transport.TLS.Force
var isTLS, custom bool
c, isTLS, custom, err = netpkg.CheckAndEnableTLSServerConnWithTimeout(c, svr.tlsConfig, forceTLS, connReadTimeout)
if err != nil {
log.Warnf("CheckAndEnableTLSServerConnWithTimeout error: %v", err)
originConn.Close()
continue
}
log.Tracef("check TLS connection success, isTLS: %v custom: %v internal: %v", isTLS, custom, internal)
}
// Start a new goroutine to handle connection.
go func(ctx context.Context, frpConn net.Conn) {
if lo.FromPtr(svr.cfg.Transport.TCPMux) && !internal {
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.KeepAliveInterval = time.Duration(svr.cfg.Transport.TCPMuxKeepaliveInterval) * time.Second
fmuxCfg.LogOutput = io.Discard
fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024
session, err := fmux.Server(frpConn, fmuxCfg)
if err != nil {
log.Warnf("Failed to create mux connection: %v", err)
frpConn.Close()
return
}
for {
stream, err := session.AcceptStreamWithContext(ctx)
if err != nil {
log.Debugf("Accept new mux stream error: %v", err)
session.Close()
return
}
go svr.handleConnection(ctx, stream, internal)
}
} else {
svr.handleConnection(ctx, frpConn, internal)
}
}(ctx, c)
}
}
func (svr *Service) HandleQUICListener(l *quic.Listener) {
// Listen for incoming connections from client.
for {
c, err := l.Accept(context.Background())
if err != nil {
log.Warnf("QUICListener for incoming connections from client closed")
return
}
// Start a new goroutine to handle connection.
go func(ctx context.Context, frpConn quic.Connection) {
for {
stream, err := frpConn.AcceptStream(context.Background())
if err != nil {
log.Debugf("Accept new quic mux stream error: %v", err)
_ = frpConn.CloseWithError(0, "")
return
}
go svr.handleConnection(ctx, netpkg.QuicStreamToNetConn(stream, frpConn), false)
}
}(context.Background(), c)
}
}
func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login, internal bool) error {
// If client's RunID is empty, it's a new client, we just create a new controller.
// Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one.
var err error
if loginMsg.RunID == "" {
loginMsg.RunID, err = util.RandID()
if err != nil {
return err
}
}
ctx := netpkg.NewContextFromConn(ctlConn)
xl := xlog.FromContextSafe(ctx)
xl.AppendPrefix(loginMsg.RunID)
ctx = xlog.NewContext(ctx, xl)
xl.Infof("client login info: ip [%s] version [%s] hostname [%s] os [%s] arch [%s]",
ctlConn.RemoteAddr().String(), loginMsg.Version, loginMsg.Hostname, loginMsg.Os, loginMsg.Arch)
// Check auth.
authVerifier := svr.authVerifier
if internal && loginMsg.ClientSpec.AlwaysAuthPass {
authVerifier = auth.AlwaysPassVerifier
}
if err := authVerifier.VerifyLogin(loginMsg); err != nil {
return err
}
// TODO(fatedier): use SessionContext
ctl, err := NewControl(ctx, svr.rc, svr.pxyManager, svr.pluginManager, authVerifier, ctlConn, !internal, loginMsg, svr.cfg)
if err != nil {
xl.Warnf("create new controller error: %v", err)
// don't return detailed errors to client
return fmt.Errorf("unexpected error when creating new controller")
}
if oldCtl := svr.ctlManager.Add(loginMsg.RunID, ctl); oldCtl != nil {
oldCtl.WaitClosed()
}
ctl.Start()
// for statistics
metrics.Server.NewClient()
go func() {
// block until control closed
ctl.WaitClosed()
svr.ctlManager.Del(loginMsg.RunID, ctl)
}()
return nil
}
// RegisterWorkConn register a new work connection to control and proxies need it.
func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn) error {
xl := netpkg.NewLogFromConn(workConn)
ctl, exist := svr.ctlManager.GetByID(newMsg.RunID)
if !exist {
xl.Warnf("No client control found for run id [%s]", newMsg.RunID)
return fmt.Errorf("no client control found for run id [%s]", newMsg.RunID)
}
// server plugin hook
content := &plugin.NewWorkConnContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
NewWorkConn: *newMsg,
}
retContent, err := svr.pluginManager.NewWorkConn(content)
if err == nil {
newMsg = &retContent.NewWorkConn
// Check auth.
err = ctl.authVerifier.VerifyNewWorkConn(newMsg)
}
if err != nil {
xl.Warnf("invalid NewWorkConn with run id [%s]", newMsg.RunID)
_ = msg.WriteMsg(workConn, &msg.StartWorkConn{
Error: util.GenerateResponseErrorString("invalid NewWorkConn", err, lo.FromPtr(svr.cfg.DetailedErrorsToClient)),
})
return fmt.Errorf("invalid NewWorkConn with run id [%s]", newMsg.RunID)
}
return ctl.RegisterWorkConn(workConn)
}
func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn) error {
visitorUser := ""
// TODO(deprecation): Compatible with old versions, can be without runID, user is empty. In later versions, it will be mandatory to include runID.
// If runID is required, it is not compatible with versions prior to v0.50.0.
if newMsg.RunID != "" {
ctl, exist := svr.ctlManager.GetByID(newMsg.RunID)
if !exist {
return fmt.Errorf("no client control found for run id [%s]", newMsg.RunID)
}
visitorUser = ctl.loginMsg.User
}
return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
newMsg.UseEncryption, newMsg.UseCompression, visitorUser)
}
type authMiddleware struct {
authVerify *auth.JWTAuthSetterVerifier
next http.Handler
}
func (m authMiddleware) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
if !strings.HasSuffix(request.Host, forwardHost) {
m.next.ServeHTTP(writer, request)
return
}
var domain = strings.SplitN(request.Host, ".", 2)[0]
var cookieName = fmt.Sprintf("%s.%s", forwardCookieName, domain)
cookie, err := request.Cookie(cookieName)
if err != nil {
writer.WriteHeader(http.StatusForbidden)
writer.Write([]byte(fmt.Sprintf("cookie not found, name=%s err=%s", cookieName, err.Error())))
return
}
var token = cookie.Value
claims, err := m.authVerify.GetVerifyData(token)
if err != nil {
writer.WriteHeader(http.StatusForbidden)
writer.Write([]byte(fmt.Sprintf("failed to verify auth, err=%s", err.Error())))
return
}
if !strings.HasPrefix(request.Host, fmt.Sprintf("%s.", claims["domain"])) {
writer.WriteHeader(http.StatusForbidden)
writer.Write([]byte("domain access deny"))
return
}
cookieData := request.Header.Get("Cookie")
var cc string
for _, v := range strings.Split(cookieData, ";") {
if strings.HasPrefix(v, cookieName) {
continue
}
cc += v + ";"
}
request.Header.Set("Cookie", cc)
m.next.ServeHTTP(writer, request)
}
func (svr *Service) checkProxyStatusTimer() {
go func() {
for {
select {
case <-svr.ctx.Done():
return
default:
}
func() {
// update proxy traffic every 15s, total 30m
var mapSet = make(map[string]bool)
for _, info := range svr.getProxyStatsByType("http") {
mapSet[info.Name] = true
if vv, ok := svr.proxyTraffic.Load(info.Name); ok {
vv.(*ringBuffer).Add(info.TodayTrafficOut)
} else {
svr.proxyTraffic.Store(info.Name, newRingBuffer(120, info.Name).Add(info.TodayTrafficOut))
}
}
log.Infof("check and record proxy traffic, proxy_count=%d", len(mapSet))
// delete old data
svr.proxyTraffic.Range(func(key, value any) bool {
if !mapSet[key.(string)] {
svr.proxyTraffic.Delete(key)
}
return true
})
}()
time.Sleep(time.Second * 15)
}
}()
go func() {
for {
select {
case <-svr.ctx.Done():
return
default:
}
func() {
var proxyList = svr.getProxyStatsByType("http")
log.Infof("publish proxy status, proxy_count=%d, stream=%v", len(proxyList), svr.ss.StreamExists(sseName))
for _, info := range proxyList {
var rate *float64
var pp, ok = svr.proxyTraffic.Load(info.Name)
if ok {
var rr = pp.(*ringBuffer).Rate()
if !math.IsNaN(rr) {
rate = lo.ToPtr(rr)
}
}
var dd = ProxyPublishInfo{
Name: info.Name,
LastStartTime: info.LastStartTime,
Offline: info.Status == "offline",
Time: time.Now().Unix(),
TrafficRate: rate,
}
md, err := json.Marshal(dd)
if err != nil {
log.Errorf("failed to encode json data, err=%s", err)
continue
}
svr.ss.Publish(sseName, &sse.Event{Data: md})
}
}()
time.Sleep(time.Second * 10)
}
}()
}