From de7771da54a46129da8209a05876006e618da837 Mon Sep 17 00:00:00 2001 From: Craig O'Donnell Date: Mon, 30 Jan 2023 23:15:09 -0500 Subject: [PATCH] limiter at proxy level --- server/proxy/http.go | 13 +++++++++++++ server/proxy/https.go | 5 +++++ server/proxy/proxy.go | 16 ++++++++++++---- server/proxy/stcp.go | 5 +++++ server/proxy/sudp.go | 5 +++++ server/proxy/tcp.go | 5 +++++ server/proxy/tcpmux.go | 5 +++++ server/proxy/udp.go | 12 ++++++++++++ server/proxy/xtcp.go | 5 +++++ 9 files changed, 67 insertions(+), 4 deletions(-) diff --git a/server/proxy/http.go b/server/proxy/http.go index e70cb65f..0f25c8c4 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -20,8 +20,10 @@ import ( "strings" frpIo "github.com/fatedier/golib/io" + "golang.org/x/time/rate" "github.com/fatedier/frp/pkg/config" + "github.com/fatedier/frp/pkg/util/limit" frpNet "github.com/fatedier/frp/pkg/util/net" "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/vhost" @@ -135,6 +137,10 @@ func (pxy *HTTPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *HTTPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err error) { xl := pxy.xl rAddr, errRet := net.ResolveTCPAddr("tcp", remoteAddr) @@ -160,6 +166,13 @@ func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err err if pxy.cfg.UseCompression { rwc = frpIo.WithCompression(rwc) } + + if pxy.GetLimiter() != nil { + rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error { + return rwc.Close() + }) + } + workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn) workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn) metrics.Server.OpenConnection(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType) diff --git a/server/proxy/https.go b/server/proxy/https.go index 42ecf35d..238f0489 100644 --- a/server/proxy/https.go +++ b/server/proxy/https.go @@ -20,6 +20,7 @@ import ( "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/vhost" + "golang.org/x/time/rate" ) type HTTPSProxy struct { @@ -74,6 +75,10 @@ func (pxy *HTTPSProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *HTTPSProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *HTTPSProxy) Close() { pxy.BaseProxy.Close() } diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 2bc81df6..9d46b96e 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -47,6 +47,7 @@ type Proxy interface { GetUsedPortsNum() int GetResourceController() *controller.ResourceController GetUserInfo() plugin.UserInfo + GetLimiter() *rate.Limiter Close() } @@ -58,6 +59,7 @@ type BaseProxy struct { poolCount int getWorkConnFn GetWorkConnFn serverCfg config.ServerCommonConf + limiter *rate.Limiter userInfo plugin.UserInfo mu sync.RWMutex @@ -189,6 +191,13 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf, ) (pxy Proxy, err error) { xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName) + + var limiter *rate.Limiter + limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes() + if limitBytes > 0 { + limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes)) + } + basePxy := BaseProxy{ name: pxyConf.GetBaseInfo().ProxyName, rc: rc, @@ -196,6 +205,7 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso poolCount: poolCount, getWorkConnFn: getWorkConnFn, serverCfg: serverCfg, + limiter: limiter, xl: xl, ctx: xlog.NewContext(ctx, xl), userInfo: userInfo, @@ -290,10 +300,8 @@ func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.Serv local = frpIo.WithCompression(local) } - limitBytes := cfg.BandwidthLimit.Bytes() - if limitBytes > 0 { - limiter := rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes)) - local = frpIo.WrapReadWriteCloser(limit.NewReader(local, limiter), limit.NewWriter(local, limiter), func() error { + if pxy.GetLimiter() != nil { + local = frpIo.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error { return local.Close() }) } diff --git a/server/proxy/stcp.go b/server/proxy/stcp.go index 5ac47eaa..fd45e69b 100644 --- a/server/proxy/stcp.go +++ b/server/proxy/stcp.go @@ -16,6 +16,7 @@ package proxy import ( "github.com/fatedier/frp/pkg/config" + "golang.org/x/time/rate" ) type STCPProxy struct { @@ -41,6 +42,10 @@ func (pxy *STCPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *STCPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *STCPProxy) Close() { pxy.BaseProxy.Close() pxy.rc.VisitorManager.CloseListener(pxy.GetName()) diff --git a/server/proxy/sudp.go b/server/proxy/sudp.go index c4dba6d6..b327b8a2 100644 --- a/server/proxy/sudp.go +++ b/server/proxy/sudp.go @@ -16,6 +16,7 @@ package proxy import ( "github.com/fatedier/frp/pkg/config" + "golang.org/x/time/rate" ) type SUDPProxy struct { @@ -42,6 +43,10 @@ func (pxy *SUDPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *SUDPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *SUDPProxy) Close() { pxy.BaseProxy.Close() pxy.rc.VisitorManager.CloseListener(pxy.GetName()) diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index 0cf9c5f9..feae2c17 100644 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -20,6 +20,7 @@ import ( "strconv" "github.com/fatedier/frp/pkg/config" + "golang.org/x/time/rate" ) type TCPProxy struct { @@ -74,6 +75,10 @@ func (pxy *TCPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *TCPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *TCPProxy) Close() { pxy.BaseProxy.Close() if pxy.cfg.Group == "" { diff --git a/server/proxy/tcpmux.go b/server/proxy/tcpmux.go index b812e601..7de43ef5 100644 --- a/server/proxy/tcpmux.go +++ b/server/proxy/tcpmux.go @@ -23,6 +23,7 @@ import ( "github.com/fatedier/frp/pkg/consts" "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/vhost" + "golang.org/x/time/rate" ) type TCPMuxProxy struct { @@ -94,6 +95,10 @@ func (pxy *TCPMuxProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *TCPMuxProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *TCPMuxProxy) Close() { pxy.BaseProxy.Close() } diff --git a/server/proxy/udp.go b/server/proxy/udp.go index 53865540..3a136c39 100644 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -24,10 +24,12 @@ import ( "github.com/fatedier/golib/errors" frpIo "github.com/fatedier/golib/io" + "golang.org/x/time/rate" "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/proto/udp" + "github.com/fatedier/frp/pkg/util/limit" frpNet "github.com/fatedier/frp/pkg/util/net" "github.com/fatedier/frp/server/metrics" ) @@ -198,6 +200,12 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) { rwc = frpIo.WithCompression(rwc) } + if pxy.GetLimiter() != nil { + rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error { + return rwc.Close() + }) + } + pxy.workConn = frpNet.WrapReadWriteCloserToConn(rwc, workConn) ctx, cancel := context.WithCancel(context.Background()) go workConnReaderFn(pxy.workConn) @@ -225,6 +233,10 @@ func (pxy *UDPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *UDPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *UDPProxy) Close() { pxy.mu.Lock() defer pxy.mu.Unlock() diff --git a/server/proxy/xtcp.go b/server/proxy/xtcp.go index a1b45d54..b6c7be3a 100644 --- a/server/proxy/xtcp.go +++ b/server/proxy/xtcp.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/fatedier/golib/errors" + "golang.org/x/time/rate" "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/msg" @@ -88,6 +89,10 @@ func (pxy *XTCPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *XTCPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *XTCPProxy) Close() { pxy.BaseProxy.Close() pxy.rc.NatHoleController.CloseClient(pxy.GetName())