diff --git a/.gitignore b/.gitignore index 56a043c7..ca8dfff7 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ _testmain.go bin/ packages/ test/bin/ +release/ # Cache *.swp diff --git a/extend/api/api.go b/extend/api/api.go index 4b3fe58d..b0185583 100755 --- a/extend/api/api.go +++ b/extend/api/api.go @@ -7,14 +7,18 @@ import ( "net/http" "net/url" "strconv" - + + "github.com/fatedier/frp/extend/limit" + "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" ) +// Service sakurafrp api servie type Service struct { Host url.URL } +// NewService crate sakurafrp api servie func NewService(host string) (s *Service, err error) { u, err := url.Parse(host) if err != nil { @@ -67,52 +71,52 @@ func (s Service) CheckProxy(user string, pMsg *msg.NewProxy, timestamp int64, st if err != nil { return false, err } - + headers, err := json.Marshal(pMsg.Headers) if err != nil { return false, err } - + locations, err := json.Marshal(pMsg.Locations) if err != nil { return false, err } - + values := url.Values{} - + // API Basic values.Set("action", "checkproxy") values.Set("user", user) values.Set("timestamp", fmt.Sprintf("%d", timestamp)) values.Set("apitoken", stk) - + // Proxies basic info values.Set("proxy_name", pMsg.ProxyName) values.Set("proxy_type", pMsg.ProxyType) values.Set("use_encryption", BoolToString(pMsg.UseEncryption)) values.Set("use_compression", BoolToString(pMsg.UseCompression)) - + // Http Proxies values.Set("domain", string(domains)) values.Set("subdomain", pMsg.SubDomain) - + // Headers values.Set("locations", string(locations)) values.Set("http_user", pMsg.HttpUser) values.Set("http_pwd", pMsg.HttpPwd) values.Set("host_header_rewrite", pMsg.HostHeaderRewrite) values.Set("headers", string(headers)) - + // Tcp & Udp & Stcp values.Set("remote_port", strconv.Itoa(pMsg.RemotePort)) - + // Stcp & Xtcp values.Set("sk", pMsg.Sk) - + // Load balance values.Set("group", pMsg.Group) values.Set("group_key", pMsg.GroupKey) - + s.Host.RawQuery = values.Encode() defer func(u *url.URL) { u.RawQuery = "" @@ -142,12 +146,17 @@ func (s Service) CheckProxy(user string, pMsg *msg.NewProxy, timestamp int64, st return true, nil } +// GetProxyLimit 获取隧道限速信息 +func (s Service) GetProxyLimit(pxyConf *config.BaseProxyConf) (inLimit, outLimit uint64, err error) { + return 1 * limit.MB, 1 * limit.MB, nil +} + func BoolToString(val bool) (str string) { if val { return "true" - } else { - return "false" } + return "false" + } type ErrHTTPStatus struct { diff --git a/extend/limit/limit.go b/extend/limit/limit.go new file mode 100644 index 00000000..1c0f9d67 --- /dev/null +++ b/extend/limit/limit.go @@ -0,0 +1,42 @@ +package limit + +import ( + "io" + + frpNet "github.com/fatedier/frp/utils/net" +) + +const ( + B uint64 = 1 << (10 * (iota)) + KB + MB + GB + TB + PB + EB +) + +const burstLimit = 1024 * 1024 * 1024 + +type LimitConn struct { + frpNet.Conn + + lr io.Reader + lw io.Writer +} + +func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) LimitConn { + return LimitConn{ + lr: NewReaderWithLimit(c, maxread*KB), + lw: NewWriterWithLimit(c, maxwrite*KB), + Conn: c, + } +} + +func (c LimitConn) Read(p []byte) (n int, err error) { + return c.lr.Read(p) +} + +func (c LimitConn) Write(p []byte) (n int, err error) { + return c.lw.Write(p) +} diff --git a/extend/limit/reader.go b/extend/limit/reader.go new file mode 100644 index 00000000..cac8fb30 --- /dev/null +++ b/extend/limit/reader.go @@ -0,0 +1,63 @@ +package limit + +import ( + "context" + "io" + "sync" + "time" + + "golang.org/x/time/rate" +) + +type Reader struct { + r io.Reader + limiter *rate.Limiter + ctx context.Context + mux sync.Mutex +} + +// NewReader returns a reader that implements io.Reader with rate limiting. +func NewReader(r io.Reader) *Reader { + return &Reader{ + r: r, + ctx: context.Background(), + mux: sync.Mutex{}, + } +} + +func NewReaderWithLimit(r io.Reader, speed uint64) *Reader { + rr := &Reader{ + r: r, + ctx: context.Background(), + mux: sync.Mutex{}, + } + rr.SetRateLimit(speed) + return rr +} + +// SetRateLimit sets rate limit (bytes/sec) to the reader. +func (s *Reader) SetRateLimit(bytesPerSec uint64) { + s.mux.Lock() + defer s.mux.Unlock() + + s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) + s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst +} + +// Read reads bytes into p. +func (s *Reader) Read(p []byte) (int, error) { + s.mux.Lock() + defer s.mux.Unlock() + + if s.limiter == nil { + return s.r.Read(p) + } + n, err := s.r.Read(p) + if err != nil { + return n, err + } + if err := s.limiter.WaitN(s.ctx, n); err != nil { + return n, err + } + return n, nil +} diff --git a/extend/limit/w_test.go b/extend/limit/w_test.go new file mode 100644 index 00000000..88c22c9a --- /dev/null +++ b/extend/limit/w_test.go @@ -0,0 +1,17 @@ +package limit + +import ( + "fmt" + "net/http" + "testing" +) + +func TestHttp(t *testing.T) { + http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) { + lw := NewWriterWithLimit(w, 10*KB) + for { + fmt.Fprintf(lw, "x") + } + }) + http.ListenAndServe(":62542", nil) +} diff --git a/extend/limit/writer.go b/extend/limit/writer.go new file mode 100644 index 00000000..628e2233 --- /dev/null +++ b/extend/limit/writer.go @@ -0,0 +1,63 @@ +package limit + +import ( + "context" + "io" + "sync" + "time" + + "golang.org/x/time/rate" +) + +type Writer struct { + w io.Writer + limiter *rate.Limiter + ctx context.Context + mux sync.Mutex +} + +// NewWriter returns a writer that implements io.Writer with rate limiting. +func NewWriter(w io.Writer) *Writer { + return &Writer{ + w: w, + ctx: context.Background(), + mux: sync.Mutex{}, + } +} + +func NewWriterWithLimit(w io.Writer, speed uint64) *Writer { + ww := &Writer{ + w: w, + ctx: context.Background(), + mux: sync.Mutex{}, + } + ww.SetRateLimit(speed) + return ww +} + +// SetRateLimit sets rate limit (bytes/sec) to the writer. +func (s *Writer) SetRateLimit(bytesPerSec uint64) { + s.mux.Lock() + defer s.mux.Unlock() + + s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) + s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst +} + +// Write writes bytes from p. +func (s *Writer) Write(p []byte) (int, error) { + s.mux.Lock() + defer s.mux.Unlock() + + if s.limiter == nil { + return s.w.Write(p) + } + n, err := s.w.Write(p) + if err != nil { + return n, err + } + if err := s.limiter.WaitN(s.ctx, n); err != nil { + return n, err + } + return n, err +} diff --git a/go.mod b/go.mod index 81de8abe..80dd249d 100644 --- a/go.mod +++ b/go.mod @@ -29,4 +29,5 @@ require ( github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 golang.org/x/text v0.3.2 // indirect + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 ) diff --git a/go.sum b/go.sum index 3eeac454..be5d5439 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatedier/beego v0.0.0-20171024143340-6c6a4f5bd5eb h1:wCrNShQidLmvVWn/0PikGmpdP0vtQmnvyRg3ZBEhczw= github.com/fatedier/beego v0.0.0-20171024143340-6c6a4f5bd5eb/go.mod h1:wx3gB6dbIfBRcucp94PI9Bt3I0F2c/MyNEWuhzpWiwk= @@ -27,6 +28,7 @@ github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc h1:lNOt1SMsgHX github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc/go.mod h1:6/gX3+E/IYGa0wMORlSMla999awQFdbaeQCHjSMKIzY= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rakyll/statik v0.1.1 h1:fCLHsIMajHqD5RKigbFXpvX3dN7c80Pm12+NCrI3kvg= github.com/rakyll/statik v0.1.1/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs= @@ -58,4 +60,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/server/control.go b/server/control.go index c1ec52b7..6d65ae7f 100755 --- a/server/control.go +++ b/server/control.go @@ -30,13 +30,15 @@ import ( "github.com/fatedier/frp/server/proxy" "github.com/fatedier/frp/server/stats" "github.com/fatedier/frp/utils/net" + frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/version" "github.com/fatedier/golib/control/shutdown" "github.com/fatedier/golib/crypto" "github.com/fatedier/golib/errors" - + "github.com/fatedier/frp/extend/api" + "github.com/fatedier/frp/extend/limit" ) type ControlManager struct { @@ -418,27 +420,40 @@ func (ctl *Control) manager() { func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) { var pxyConf config.ProxyConf - + s, err := api.NewService(g.GlbServerCfg.ApiBaseUrl) - + var workConn proxy.GetWorkConnFn = ctl.GetWorkConn + if err != nil { return remoteAddr, err } - + if g.GlbServerCfg.EnableApi { - + nowTime := time.Now().Unix() ok, err := s.CheckProxy(ctl.loginMsg.User, pxyMsg, nowTime, g.GlbServerCfg.ApiToken) - + if err != nil { return remoteAddr, err } - + if !ok { return remoteAddr, fmt.Errorf("invalid proxy configuration") } + + in, out, err := s.GetProxyLimit(pxyConf.GetBaseInfo()) + if err != nil { + return remoteAddr, err + } + workConn = func() (frpNet.Conn, error) { + fconn, err := ctl.GetWorkConn() + if err != nil { + return nil, err + } + return limit.NewLimitConn(in, out, fconn), nil + } } - + // Load configures from NewProxy message and check. pxyConf, err = config.NewProxyConfFromMsg(pxyMsg) if err != nil { @@ -447,7 +462,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err // NewProxy will return a interface Proxy. // In fact it create different proxies by different proxy type, we just call run() here. - pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf) + pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, workConn, pxyConf) if err != nil { return remoteAddr, err }