limiter at proxy level

This commit is contained in:
Craig O'Donnell 2023-01-30 23:15:09 -05:00
parent cb2e1cfffa
commit de7771da54
9 changed files with 67 additions and 4 deletions

View File

@ -20,8 +20,10 @@ import (
"strings" "strings"
frpIo "github.com/fatedier/golib/io" frpIo "github.com/fatedier/golib/io"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/util/limit"
frpNet "github.com/fatedier/frp/pkg/util/net" frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/vhost" "github.com/fatedier/frp/pkg/util/vhost"
@ -135,6 +137,10 @@ func (pxy *HTTPProxy) GetConf() config.ProxyConf {
return pxy.cfg return pxy.cfg
} }
func (pxy *HTTPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err error) { func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err error) {
xl := pxy.xl xl := pxy.xl
rAddr, errRet := net.ResolveTCPAddr("tcp", remoteAddr) rAddr, errRet := net.ResolveTCPAddr("tcp", remoteAddr)
@ -160,6 +166,13 @@ func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err err
if pxy.cfg.UseCompression { if pxy.cfg.UseCompression {
rwc = frpIo.WithCompression(rwc) rwc = frpIo.WithCompression(rwc)
} }
if pxy.GetLimiter() != nil {
rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error {
return rwc.Close()
})
}
workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn) workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn) workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
metrics.Server.OpenConnection(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType) metrics.Server.OpenConnection(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)

View File

@ -20,6 +20,7 @@ import (
"github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/vhost" "github.com/fatedier/frp/pkg/util/vhost"
"golang.org/x/time/rate"
) )
type HTTPSProxy struct { type HTTPSProxy struct {
@ -74,6 +75,10 @@ func (pxy *HTTPSProxy) GetConf() config.ProxyConf {
return pxy.cfg return pxy.cfg
} }
func (pxy *HTTPSProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *HTTPSProxy) Close() { func (pxy *HTTPSProxy) Close() {
pxy.BaseProxy.Close() pxy.BaseProxy.Close()
} }

View File

@ -47,6 +47,7 @@ type Proxy interface {
GetUsedPortsNum() int GetUsedPortsNum() int
GetResourceController() *controller.ResourceController GetResourceController() *controller.ResourceController
GetUserInfo() plugin.UserInfo GetUserInfo() plugin.UserInfo
GetLimiter() *rate.Limiter
Close() Close()
} }
@ -58,6 +59,7 @@ type BaseProxy struct {
poolCount int poolCount int
getWorkConnFn GetWorkConnFn getWorkConnFn GetWorkConnFn
serverCfg config.ServerCommonConf serverCfg config.ServerCommonConf
limiter *rate.Limiter
userInfo plugin.UserInfo userInfo plugin.UserInfo
mu sync.RWMutex mu sync.RWMutex
@ -189,6 +191,13 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso
getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf, getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf,
) (pxy Proxy, err error) { ) (pxy Proxy, err error) {
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName) xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName)
var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
if limitBytes > 0 {
limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}
basePxy := BaseProxy{ basePxy := BaseProxy{
name: pxyConf.GetBaseInfo().ProxyName, name: pxyConf.GetBaseInfo().ProxyName,
rc: rc, rc: rc,
@ -196,6 +205,7 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso
poolCount: poolCount, poolCount: poolCount,
getWorkConnFn: getWorkConnFn, getWorkConnFn: getWorkConnFn,
serverCfg: serverCfg, serverCfg: serverCfg,
limiter: limiter,
xl: xl, xl: xl,
ctx: xlog.NewContext(ctx, xl), ctx: xlog.NewContext(ctx, xl),
userInfo: userInfo, userInfo: userInfo,
@ -290,10 +300,8 @@ func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.Serv
local = frpIo.WithCompression(local) local = frpIo.WithCompression(local)
} }
limitBytes := cfg.BandwidthLimit.Bytes() if pxy.GetLimiter() != nil {
if limitBytes > 0 { local = frpIo.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
limiter := rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
local = frpIo.WrapReadWriteCloser(limit.NewReader(local, limiter), limit.NewWriter(local, limiter), func() error {
return local.Close() return local.Close()
}) })
} }

View File

@ -16,6 +16,7 @@ package proxy
import ( import (
"github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/config"
"golang.org/x/time/rate"
) )
type STCPProxy struct { type STCPProxy struct {
@ -41,6 +42,10 @@ func (pxy *STCPProxy) GetConf() config.ProxyConf {
return pxy.cfg return pxy.cfg
} }
func (pxy *STCPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *STCPProxy) Close() { func (pxy *STCPProxy) Close() {
pxy.BaseProxy.Close() pxy.BaseProxy.Close()
pxy.rc.VisitorManager.CloseListener(pxy.GetName()) pxy.rc.VisitorManager.CloseListener(pxy.GetName())

View File

@ -16,6 +16,7 @@ package proxy
import ( import (
"github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/config"
"golang.org/x/time/rate"
) )
type SUDPProxy struct { type SUDPProxy struct {
@ -42,6 +43,10 @@ func (pxy *SUDPProxy) GetConf() config.ProxyConf {
return pxy.cfg return pxy.cfg
} }
func (pxy *SUDPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *SUDPProxy) Close() { func (pxy *SUDPProxy) Close() {
pxy.BaseProxy.Close() pxy.BaseProxy.Close()
pxy.rc.VisitorManager.CloseListener(pxy.GetName()) pxy.rc.VisitorManager.CloseListener(pxy.GetName())

View File

@ -20,6 +20,7 @@ import (
"strconv" "strconv"
"github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/config"
"golang.org/x/time/rate"
) )
type TCPProxy struct { type TCPProxy struct {
@ -74,6 +75,10 @@ func (pxy *TCPProxy) GetConf() config.ProxyConf {
return pxy.cfg return pxy.cfg
} }
func (pxy *TCPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *TCPProxy) Close() { func (pxy *TCPProxy) Close() {
pxy.BaseProxy.Close() pxy.BaseProxy.Close()
if pxy.cfg.Group == "" { if pxy.cfg.Group == "" {

View File

@ -23,6 +23,7 @@ import (
"github.com/fatedier/frp/pkg/consts" "github.com/fatedier/frp/pkg/consts"
"github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/vhost" "github.com/fatedier/frp/pkg/util/vhost"
"golang.org/x/time/rate"
) )
type TCPMuxProxy struct { type TCPMuxProxy struct {
@ -94,6 +95,10 @@ func (pxy *TCPMuxProxy) GetConf() config.ProxyConf {
return pxy.cfg return pxy.cfg
} }
func (pxy *TCPMuxProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *TCPMuxProxy) Close() { func (pxy *TCPMuxProxy) Close() {
pxy.BaseProxy.Close() pxy.BaseProxy.Close()
} }

View File

@ -24,10 +24,12 @@ import (
"github.com/fatedier/golib/errors" "github.com/fatedier/golib/errors"
frpIo "github.com/fatedier/golib/io" frpIo "github.com/fatedier/golib/io"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp" "github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
frpNet "github.com/fatedier/frp/pkg/util/net" frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/server/metrics" "github.com/fatedier/frp/server/metrics"
) )
@ -198,6 +200,12 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
rwc = frpIo.WithCompression(rwc) rwc = frpIo.WithCompression(rwc)
} }
if pxy.GetLimiter() != nil {
rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error {
return rwc.Close()
})
}
pxy.workConn = frpNet.WrapReadWriteCloserToConn(rwc, workConn) pxy.workConn = frpNet.WrapReadWriteCloserToConn(rwc, workConn)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go workConnReaderFn(pxy.workConn) go workConnReaderFn(pxy.workConn)
@ -225,6 +233,10 @@ func (pxy *UDPProxy) GetConf() config.ProxyConf {
return pxy.cfg return pxy.cfg
} }
func (pxy *UDPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *UDPProxy) Close() { func (pxy *UDPProxy) Close() {
pxy.mu.Lock() pxy.mu.Lock()
defer pxy.mu.Unlock() defer pxy.mu.Unlock()

View File

@ -18,6 +18,7 @@ import (
"fmt" "fmt"
"github.com/fatedier/golib/errors" "github.com/fatedier/golib/errors"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/msg"
@ -88,6 +89,10 @@ func (pxy *XTCPProxy) GetConf() config.ProxyConf {
return pxy.cfg return pxy.cfg
} }
func (pxy *XTCPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *XTCPProxy) Close() { func (pxy *XTCPProxy) Close() {
pxy.BaseProxy.Close() pxy.BaseProxy.Close()
pxy.rc.NatHoleController.CloseClient(pxy.GetName()) pxy.rc.NatHoleController.CloseClient(pxy.GetName())