Merge pull request #2 from ma6254/master

add: proxy limit
This commit is contained in:
Akkariin Meiko 2019-09-07 01:06:23 +08:00 committed by GitHub
commit 23d36090a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 238 additions and 23 deletions

1
.gitignore vendored
View File

@ -27,6 +27,7 @@ _testmain.go
bin/ bin/
packages/ packages/
test/bin/ test/bin/
release/
# Cache # Cache
*.swp *.swp

View File

@ -8,13 +8,17 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"github.com/fatedier/frp/extend/limit"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/msg"
) )
// Service sakurafrp api servie
type Service struct { type Service struct {
Host url.URL Host url.URL
} }
// NewService crate sakurafrp api servie
func NewService(host string) (s *Service, err error) { func NewService(host string) (s *Service, err error) {
u, err := url.Parse(host) u, err := url.Parse(host)
if err != nil { if err != nil {
@ -142,12 +146,17 @@ func (s Service) CheckProxy(user string, pMsg *msg.NewProxy, timestamp int64, st
return true, nil return true, nil
} }
// GetProxyLimit 获取隧道限速信息
func (s Service) GetProxyLimit(pxyConf *config.BaseProxyConf) (inLimit, outLimit uint64, err error) {
return 1 * limit.MB, 1 * limit.MB, nil
}
func BoolToString(val bool) (str string) { func BoolToString(val bool) (str string) {
if val { if val {
return "true" return "true"
} else {
return "false"
} }
return "false"
} }
type ErrHTTPStatus struct { type ErrHTTPStatus struct {

42
extend/limit/limit.go Normal file
View File

@ -0,0 +1,42 @@
package limit
import (
"io"
frpNet "github.com/fatedier/frp/utils/net"
)
const (
B uint64 = 1 << (10 * (iota))
KB
MB
GB
TB
PB
EB
)
const burstLimit = 1024 * 1024 * 1024
type LimitConn struct {
frpNet.Conn
lr io.Reader
lw io.Writer
}
func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) LimitConn {
return LimitConn{
lr: NewReaderWithLimit(c, maxread*KB),
lw: NewWriterWithLimit(c, maxwrite*KB),
Conn: c,
}
}
func (c LimitConn) Read(p []byte) (n int, err error) {
return c.lr.Read(p)
}
func (c LimitConn) Write(p []byte) (n int, err error) {
return c.lw.Write(p)
}

63
extend/limit/reader.go Normal file
View File

@ -0,0 +1,63 @@
package limit
import (
"context"
"io"
"sync"
"time"
"golang.org/x/time/rate"
)
type Reader struct {
r io.Reader
limiter *rate.Limiter
ctx context.Context
mux sync.Mutex
}
// NewReader returns a reader that implements io.Reader with rate limiting.
func NewReader(r io.Reader) *Reader {
return &Reader{
r: r,
ctx: context.Background(),
mux: sync.Mutex{},
}
}
func NewReaderWithLimit(r io.Reader, speed uint64) *Reader {
rr := &Reader{
r: r,
ctx: context.Background(),
mux: sync.Mutex{},
}
rr.SetRateLimit(speed)
return rr
}
// SetRateLimit sets rate limit (bytes/sec) to the reader.
func (s *Reader) SetRateLimit(bytesPerSec uint64) {
s.mux.Lock()
defer s.mux.Unlock()
s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst
}
// Read reads bytes into p.
func (s *Reader) Read(p []byte) (int, error) {
s.mux.Lock()
defer s.mux.Unlock()
if s.limiter == nil {
return s.r.Read(p)
}
n, err := s.r.Read(p)
if err != nil {
return n, err
}
if err := s.limiter.WaitN(s.ctx, n); err != nil {
return n, err
}
return n, nil
}

17
extend/limit/w_test.go Normal file
View File

@ -0,0 +1,17 @@
package limit
import (
"fmt"
"net/http"
"testing"
)
func TestHttp(t *testing.T) {
http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
lw := NewWriterWithLimit(w, 10*KB)
for {
fmt.Fprintf(lw, "x")
}
})
http.ListenAndServe(":62542", nil)
}

63
extend/limit/writer.go Normal file
View File

@ -0,0 +1,63 @@
package limit
import (
"context"
"io"
"sync"
"time"
"golang.org/x/time/rate"
)
type Writer struct {
w io.Writer
limiter *rate.Limiter
ctx context.Context
mux sync.Mutex
}
// NewWriter returns a writer that implements io.Writer with rate limiting.
func NewWriter(w io.Writer) *Writer {
return &Writer{
w: w,
ctx: context.Background(),
mux: sync.Mutex{},
}
}
func NewWriterWithLimit(w io.Writer, speed uint64) *Writer {
ww := &Writer{
w: w,
ctx: context.Background(),
mux: sync.Mutex{},
}
ww.SetRateLimit(speed)
return ww
}
// SetRateLimit sets rate limit (bytes/sec) to the writer.
func (s *Writer) SetRateLimit(bytesPerSec uint64) {
s.mux.Lock()
defer s.mux.Unlock()
s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst
}
// Write writes bytes from p.
func (s *Writer) Write(p []byte) (int, error) {
s.mux.Lock()
defer s.mux.Unlock()
if s.limiter == nil {
return s.w.Write(p)
}
n, err := s.w.Write(p)
if err != nil {
return n, err
}
if err := s.limiter.WaitN(s.ctx, n); err != nil {
return n, err
}
return n, err
}

1
go.mod
View File

@ -29,4 +29,5 @@ require (
github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80
golang.org/x/text v0.3.2 // indirect golang.org/x/text v0.3.2 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
) )

4
go.sum
View File

@ -1,5 +1,6 @@
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatedier/beego v0.0.0-20171024143340-6c6a4f5bd5eb h1:wCrNShQidLmvVWn/0PikGmpdP0vtQmnvyRg3ZBEhczw= github.com/fatedier/beego v0.0.0-20171024143340-6c6a4f5bd5eb h1:wCrNShQidLmvVWn/0PikGmpdP0vtQmnvyRg3ZBEhczw=
github.com/fatedier/beego v0.0.0-20171024143340-6c6a4f5bd5eb/go.mod h1:wx3gB6dbIfBRcucp94PI9Bt3I0F2c/MyNEWuhzpWiwk= github.com/fatedier/beego v0.0.0-20171024143340-6c6a4f5bd5eb/go.mod h1:wx3gB6dbIfBRcucp94PI9Bt3I0F2c/MyNEWuhzpWiwk=
@ -27,6 +28,7 @@ github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc h1:lNOt1SMsgHX
github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc/go.mod h1:6/gX3+E/IYGa0wMORlSMla999awQFdbaeQCHjSMKIzY= github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc/go.mod h1:6/gX3+E/IYGa0wMORlSMla999awQFdbaeQCHjSMKIzY=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rakyll/statik v0.1.1 h1:fCLHsIMajHqD5RKigbFXpvX3dN7c80Pm12+NCrI3kvg= github.com/rakyll/statik v0.1.1 h1:fCLHsIMajHqD5RKigbFXpvX3dN7c80Pm12+NCrI3kvg=
github.com/rakyll/statik v0.1.1/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs= github.com/rakyll/statik v0.1.1/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs=
@ -58,4 +60,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@ -30,6 +30,7 @@ import (
"github.com/fatedier/frp/server/proxy" "github.com/fatedier/frp/server/proxy"
"github.com/fatedier/frp/server/stats" "github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/net"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/version" "github.com/fatedier/frp/utils/version"
"github.com/fatedier/golib/control/shutdown" "github.com/fatedier/golib/control/shutdown"
@ -37,6 +38,7 @@ import (
"github.com/fatedier/golib/errors" "github.com/fatedier/golib/errors"
"github.com/fatedier/frp/extend/api" "github.com/fatedier/frp/extend/api"
"github.com/fatedier/frp/extend/limit"
) )
type ControlManager struct { type ControlManager struct {
@ -420,6 +422,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
var pxyConf config.ProxyConf var pxyConf config.ProxyConf
s, err := api.NewService(g.GlbServerCfg.ApiBaseUrl) s, err := api.NewService(g.GlbServerCfg.ApiBaseUrl)
var workConn proxy.GetWorkConnFn = ctl.GetWorkConn
if err != nil { if err != nil {
return remoteAddr, err return remoteAddr, err
@ -437,6 +440,18 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
if !ok { if !ok {
return remoteAddr, fmt.Errorf("invalid proxy configuration") return remoteAddr, fmt.Errorf("invalid proxy configuration")
} }
in, out, err := s.GetProxyLimit(pxyConf.GetBaseInfo())
if err != nil {
return remoteAddr, err
}
workConn = func() (frpNet.Conn, error) {
fconn, err := ctl.GetWorkConn()
if err != nil {
return nil, err
}
return limit.NewLimitConn(in, out, fconn), nil
}
} }
// Load configures from NewProxy message and check. // Load configures from NewProxy message and check.
@ -447,7 +462,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
// NewProxy will return a interface Proxy. // NewProxy will return a interface Proxy.
// In fact it create different proxies by different proxy type, we just call run() here. // In fact it create different proxies by different proxy type, we just call run() here.
pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf) pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, workConn, pxyConf)
if err != nil { if err != nil {
return remoteAddr, err return remoteAddr, err
} }